博客是我工作的好帮手,遇到困难就来博客找资料
分类: 系统运维
2017-02-08 10:44:30
1.如果是格式化成Json的話直接
val rdd = df.toJSON.rdd
2.如果要指定格式需要自定义函数如下:
//格式化具体字段条目
def formatItem(p:(StructField,Any)):String={
p match {
case (sf,a) =>
sf.dataType match {
case StringType => "\"" + sf.name + "\":\"" + a + "\""
case IntegerType => "\"" + sf.name + "\":" + a
case LongType => "\"" + sf.name + "\":" + a
case StructType(s) => "\"" + sf.name + "\":" + formatStruct(s, a.asInstanceOf[Row])
}
}
}
//格式化整行数据格式
def formatStruct(schema:Seq[StructField],r:Row)= {
val paired = schema.zip(r.toSeq)
"{" + paired.foldLeft("")((s,p) => (if(s == "") "" else (s + ", ")) + formatItem(p)) + "}"
}
//格式化整个DF
def formatDataFrame(st:StructType,srdd:DataFrame)={
srdd.rdd.map(formatStruct(st.fields,_))
}
调用示例:
val strings = formatDataFrame(df.schema, df)
strings.foreach { println }
1.RDD -> Dataset
val ds = rdd.toDS()
2.RDD -> DataFrame
val df = spark.read.json(rdd)
3.Dataset -> RDD
val rdd = ds.rdd
4.Dataset -> DataFrame
val df = ds.toDF()
5.DataFrame -> RDD
val rdd = df.toJSON.rdd
6.DataFrame -> Dataset
val ds = df.toJSON
转载于http://www.cnblogs.com/ciade/