今天主要介绍一下如何将 Spark dataframe 的数据转成 json 数据。用到的是 scala 提供的 json 处理的 api。
用过 Spark SQL 应该知道,Spark dataframe 本身有提供一个 api 可以供我们将数据转成一个 JsonArray,我们可以在 spark-shell 里头举个栗子来看一下。
def main(args: Array[String]): Unit = {
import org.apache.spark.sql.SparkSession
val spark = SparkSession.builder()
.master("local[*]").appName("test")
.getOrCreate();
//提供隐式转换功能,比如将 Rdd 转为 dataframe
import spark.implicits._
val df:DataFrame = spark.sparkContext.parallelize(Array(("abc",2),("efg",4))).toDF()
df.show()
/*-------------show -----------
+---+---+
| _1| _2|
+---+---+
|abc| 2|
|efg| 4|
+---+---+
//这里使用 dataframe Api 转换成 jsonArray
val jsonStr:String = df.toJSON.collectAsList.toString
println(jsonStr)
/*--------------- json String-------------
[{"_1":"abc","_2":2}, {"_1":"efg","_2":4}]
可以发现,我们可以使用 dataframe 提供的 api 直接将 dataframe 转换成 jsonArray 的形式,但这样子却有些冗余。以上面的例子来说,很多时候我要的不是这样的形式。
[{"_1":"abc","_2":2}, {"_1":"efg","_2":4}]
而是下面这种形式。
[{"abc":2}, {"efg":4}]
这才是我们通常会使用到的 json 格式。以 dataframe 的 api 转换而成的 json 明显太过冗余。为此,我们需要借助一些 json 处理的包,本着能懒则懒的原则,直接使用 scala 提供的 json 处理包。
import org.apache.spark.sql.DataFrame
import scala.util.parsing.json.{JSONArray, JSONObject}
object DFTest {
def main(args: Array[String]): Unit = {
import org.apache.spark.sql.SparkSession
val spark = SparkSession.builder()
.master("local[*]").appName("test")
.getOrCreate();
//提供隐式转换功能,比如将 Rdd 转为 dataframe
import spark.implicits._
val df:DataFrame = spark.sparkContext.parallelize(Array(("abc",2),("efg",4))).toDF()
df.show()
/*-------------show -----------
+---+---+
| _1| _2|
+---+---+
|abc| 2|
|efg| 4|
+---+---+
//接下来不一样了
val df2Array:Array[Tuple2[String,Int]] = df.collect().map{
case org.apache.spark.sql.Row(x:String,y:Int) => (x,y)}
val jsonData:Array[JSONObject] = df2Array.map{ i =>
new JSONObject(Map(i._1 -> i._2))
val jsonArray:JSONArray = new JSONArray(jsonData.toList)
println(jsonArray)
/*-----------jsonArray------------
[{"abc" : 2}, {"efg" : 4}]