
UnsupportedOperationException: No Encoder found for org.apache.spark.sql.Row

0 followers

I am trying to create a DataFrame. It seems Spark cannot create a DataFrame from the scala.Tuple2 type. How can I do this? I am new to Scala and Spark.

Below is part of the error stack trace from running the code:

Exception in thread "main" java.lang.UnsupportedOperationException: No Encoder found for org.apache.spark.sql.Row
- field (class: "org.apache.spark.sql.Row", name: "_1")
- root class: "scala.Tuple2"
    at org.apache.spark.sql.catalyst.ScalaReflection$$anonfun$org$apache$spark$sql$catalyst$ScalaReflection$$serializerFor$1.apply(ScalaReflection.scala:666)
    ..........  
    org.apache.spark.sql.catalyst.encoders.ExpressionEncoder$.apply(ExpressionEncoder.scala:71)
    at org.apache.spark.sql.Encoders$.product(Encoders.scala:275)
    at org.apache.spark.sql.SparkSession.createDataFrame(SparkSession.scala:299)
    at SparkMapReduce$.runMapReduce(SparkMapReduce.scala:46)
    at Entrance$.queryLoader(Entrance.scala:64)
    at Entrance$.paramsParser(Entrance.scala:43)
    at Entrance$.main(Entrance.scala:30)
    at Entrance.main(Entrance.scala)
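
The overload being hit here is SparkSession.createDataFrame(rdd) for a Product type, which derives an encoder reflectively and has no encoder for Row fields. A minimal sketch that reproduces the same exception (the object and variable names are mine, for illustration only):

import org.apache.spark.sql.{Row, SparkSession}

object Repro {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder.master("local[*]").appName("repro").getOrCreate()
    // An RDD of (Row, Int) pairs, shaped like the reduceByKey output in the question
    val pairs = spark.sparkContext.parallelize(Seq((Row("a"), 1)))
    // Fails at runtime with: UnsupportedOperationException: No Encoder found for org.apache.spark.sql.Row
    val df = spark.createDataFrame(pairs)
    spark.stop()
  }
}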

Below is part of the code from the program. The problem is on the line above the comment made of exclamation marks:

import org.apache.log4j.{Level, Logger}
import org.apache.spark.sql.{SaveMode, SparkSession}
import org.apache.spark.sql.functions.split
import org.apache.spark.sql.functions._
import org.apache.spark.sql.DataFrame

object SparkMapReduce {
  Logger.getLogger("org.spark_project").setLevel(Level.WARN)
  Logger.getLogger("org.apache").setLevel(Level.WARN)
  Logger.getLogger("akka").setLevel(Level.WARN)
  Logger.getLogger("com").setLevel(Level.WARN)

  def runMapReduce(spark: SparkSession, pointPath: String, rectanglePath: String): DataFrame = {
    var pointDf = spark.read.format("csv").option("delimiter",",").option("header","false").load(pointPath)
    pointDf = pointDf.toDF()
    pointDf.createOrReplaceTempView("points")
    pointDf = spark.sql("select ST_Point(cast(points._c0 as Decimal(24,20)),cast(points._c1 as Decimal(24,20))) as point from points")
    pointDf.createOrReplaceTempView("pointsDf")
    // pointDf.show()

    var rectangleDf = spark.read.format("csv").option("delimiter",",").option("header","false").load(rectanglePath)
    rectangleDf = rectangleDf.toDF()
    rectangleDf.createOrReplaceTempView("rectangles")
    rectangleDf = spark.sql("select ST_PolygonFromEnvelope(cast(rectangles._c0 as Decimal(24,20)),cast(rectangles._c1 as Decimal(24,20)), cast(rectangles._c2 as Decimal(24,20)), cast(rectangles._c3 as Decimal(24,20))) as rectangle from rectangles")
    rectangleDf.createOrReplaceTempView("rectanglesDf")
    // rectangleDf.show()

    val joinDf = spark.sql("select rectanglesDf.rectangle as rectangle, pointsDf.point as point from rectanglesDf, pointsDf where ST_Contains(rectanglesDf.rectangle, pointsDf.point)")
    joinDf.createOrReplaceTempView("joinDf")
    // joinDf.show()

    import spark.implicits._
    val joinRdd = joinDf.rdd
    val resmap = joinRdd.map(x => (x, 1))
    val reduced = resmap.reduceByKey(_ + _)
    val final_datablock = reduced.collect()
    val trying: List[Float] = List()
    print(final_datablock)
    // .toDF("rectangles", "count")
    // val dataframe_final1 = spark.createDataFrame(reduced)
    val dataframe_final2 = spark.createDataFrame(reduced).toDF("rectangles", "count")
    // ^ !!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!Line above creates problem

    // You need to complete this part
    var result = spark.emptyDataFrame
    return result // You need to change this part
  }
}
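
One possible workaround (a sketch of mine, not from the post): map the Row key to an encodable type, such as its string form, before converting, so that Spark's built-in product encoder for (String, Int) applies:

// Sketch with assumed names: render each joined Row key as a string first
val encodableRdd = reduced.map { case (row, count) => (row.toString, count) }
val dataframe_final2 = spark.createDataFrame(encodableRdd).toDF("rectangles", "count")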
1 comment
Hey Vishwad, can you share your input and expected output?
scala
apache-spark
Vishwad
Posted on 2021-10-23
1 answer
Majid Hajibaba
Posted on 2021-10-23
0 upvotes

The first column of your reduced RDD has type Row, and you did not specify it when converting from the RDD to a DataFrame. A DataFrame must have a schema. So you need to define a proper schema for your RDD and convert it to a DataFrame with the following method:

createDataFrame(RDD<Row> rowRDD, StructType schema)
val schema = new StructType()
  .add(Array(
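
The snippet is cut off at this point in the snapshot. Putting the answer's idea together, a minimal self-contained sketch (the string-typed key, the field names, and the rowRdd helper are my assumptions, not the original answer's code):

import org.apache.spark.sql.Row
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}

// Turn each (Row, Int) pair into a flat Row matching the schema below.
// The geometry key is rendered as a string here; with a spatial library
// such as Sedona/GeoSpark you could keep it as a geometry column instead.
val rowRdd = reduced.map { case (key, count) => Row(key.toString, count) }

val schema = new StructType()
  .add(StructField("rectangles", StringType, nullable = true))
  .add(StructField("count", IntegerType, nullable = false))

val dataframe_final2 = spark.createDataFrame(rowRdd, schema)
dataframe_final2.show()

Unlike the product-encoder overload that threw the exception, createDataFrame(rowRDD, schema) takes the schema explicitly, so no encoder for Row needs to be derived.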