
Structured Streaming Error Log: Overloaded method foreachBatch with alternatives

  • 0. Preface
  • 1. The Error
  • 2. Code and Error Message
  • 3. Cause and Fix
  • 4. Reference Link

    0. Preface

  • Spark: 3.0.0
  • Scala: 2.12
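
    For reference, a minimal sbt build matching these versions could look like the sketch below; the exact Scala patch version and the mysql-connector-java artifact (needed for the JDBC sink used later) are assumptions, not details from the original post.

    // build.sbt: a minimal sketch for Spark 3.0.0 on Scala 2.12
    ThisBuild / scalaVersion := "2.12.10"

    libraryDependencies ++= Seq(
      "org.apache.spark" %% "spark-sql" % "3.0.0",
      // Assumption: JDBC driver for the MySQL sink; match it to your server version
      "mysql" % "mysql-connector-java" % "8.0.21"
    )
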
    1. The Error

    overloaded method value foreachBatch with alternatives:

    2. Code and Error Message

    Two variants of the program failed to compile. For each, the compiler output is shown first, followed by the code that produced it.

    Error:(48, 12) overloaded method value foreachBatch with alternatives:
      (function: org.apache.spark.api.java.function.VoidFunction2[org.apache.spark.sql.Dataset[org.apache.spark.sql.Row],java.lang.Long])org.apache.spark.sql.streaming.DataStreamWriter[org.apache.spark.sql.Row]
      (function: (org.apache.spark.sql.Dataset[org.apache.spark.sql.Row], scala.Long) => Unit)org.apache.spark.sql.streaming.DataStreamWriter[org.apache.spark.sql.Row]
    cannot be applied to ((org.apache.spark.sql.Dataset[org.apache.spark.sql.Row], Any) => org.apache.spark.sql.Dataset[org.apache.spark.sql.Row])
        .foreachBatch((df, batchId) => {

    import java.util.Properties
    import org.apache.spark.sql.streaming.{StreamingQuery, Trigger}
    import org.apache.spark.sql.{DataFrame, SparkSession}

    object ForeachBatchSink1 {
        def main(args: Array[String]): Unit = {
            val spark: SparkSession = SparkSession
                .builder()
                .master("local[*]")
                .appName("ForeachSink1")
                .getOrCreate()
            import spark.implicits._

            val lines: DataFrame = spark.readStream
                .format("socket") // set the data source
                .option("host", "cluster01")
                .option("port", 10000)
                .load

            val props = new Properties()
            props.setProperty("user", "root")
            props.setProperty("password", "1234")

            val query: StreamingQuery = lines.writeStream
                .outputMode("update")
                .foreachBatch((df, batchId) => { // ERROR: parameter types cannot be inferred
                    val result = df.as[String].flatMap(_.split("\\W+")).groupBy("value").count()
                    result.persist()
                    result.write.mode("overwrite").jdbc("jdbc:mysql://cluster01:3306/test", "wc", props)
                    result.write.mode("overwrite").json("./foreach1")
                    result.unpersist()
                })
                // .trigger(Trigger.ProcessingTime(0))
                .trigger(Trigger.Continuous(10))
                .start

            query.awaitTermination()
        }
    }
    
    Error:(43, 12) overloaded method value foreachBatch with alternatives:
      (function: org.apache.spark.api.java.function.VoidFunction2[org.apache.spark.sql.Dataset[org.apache.spark.sql.Row],java.lang.Long])org.apache.spark.sql.streaming.DataStreamWriter[org.apache.spark.sql.Row]
      (function: (org.apache.spark.sql.Dataset[org.apache.spark.sql.Row], scala.Long) => Unit)org.apache.spark.sql.streaming.DataStreamWriter[org.apache.spark.sql.Row]
    cannot be applied to ((org.apache.spark.sql.Dataset[org.apache.spark.sql.Row], Any) => org.apache.spark.sql.DataFrame)
        .foreachBatch((df, batchId) => {
    import java.util.Properties
    import org.apache.spark.sql.streaming.{StreamingQuery, Trigger}
    import org.apache.spark.sql.{DataFrame, SparkSession}

    object ForeachBatchSink {
        def main(args: Array[String]): Unit = {
            val spark: SparkSession = SparkSession
                .builder()
                .master("local[*]")
                .appName("ForeachSink")
                .getOrCreate()
            import spark.implicits._

            val lines: DataFrame = spark.readStream
                .format("socket") // set the data source
                .option("host", "cluster01")
                .option("port", 10000)
                .load

            val props = new Properties()
            props.setProperty("user", "root")
            props.setProperty("password", "1234")

            val query: StreamingQuery = lines.writeStream
                .outputMode("complete")
                .foreachBatch((df, batchId) => { // ERROR: same inference failure
                    val result = df.as[String].flatMap(_.split("\\W+")).groupBy("value").count()
                    result.persist()
                    result.write.mode("overwrite").jdbc("jdbc:mysql://cluster01:3306/test", "wc", props)
                    result.write.mode("overwrite").json("./foreach")
                    result.unpersist()
                })
                .start

            query.awaitTermination()
        }
    }
    

    3. Cause and Fix

    Scala 2.12 resolves the overloaded foreachBatch() method differently from Scala 2.11. Since 2.12, a Scala lambda can also be converted to a Java functional interface (SAM conversion), so both alternatives listed in the error message, the Scala (Dataset[Row], Long) => Unit overload and the Java VoidFunction2[Dataset[Row], java.lang.Long] overload, can accept the untyped lambda (df, batchId) => {...}. With no single expected type, the compiler cannot infer the lambda's parameter types (hence the Any in the message) and overload resolution fails. The fix is to annotate the parameters explicitly, (df: Dataset[Row], batchId: Long) => ..., so that only the Scala overload applies.
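
    The ambiguity is easy to reproduce without Spark. Below is a minimal sketch; VoidFn2 is a hypothetical stand-in for org.apache.spark.api.java.function.VoidFunction2, and all other names are made up for illustration.

    // A Java-style single-abstract-method (SAM) interface, like Spark's VoidFunction2
    trait VoidFn2[T1, T2] { def call(t1: T1, t2: T2): Unit }

    object Overloads {
        // Two overloads mirroring DataStreamWriter.foreachBatch
        def foreachBatch(f: (String, Long) => Unit): Unit = f("batch", 0L)
        def foreachBatch(f: VoidFn2[String, java.lang.Long]): Unit = f.call("batch", 0L)

        // Fails on Scala 2.12: both overloads can take a lambda, so there is no
        // single expected type from which to infer the parameter types.
        // foreachBatch((s, n) => println(s + n))

        // Compiles: with explicit parameter types only the Scala overload applies,
        // because (String, Long) => Unit does not match call(String, java.lang.Long).
        def demo(): Unit = foreachBatch((s: String, n: Long) => println(s + n))
    }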

    The corrected code is as follows:

    import java.util.Properties
    import org.apache.spark.sql.streaming.StreamingQuery
    import org.apache.spark.sql.{DataFrame, Dataset, Row, SparkSession}

    object ForeachBatchSink {
        // Writes one micro-batch to MySQL and to JSON files; skips empty batches
        def myFun(df: Dataset[Row], batchId: Long, props: Properties): Unit = {
            println("BatchId = " + batchId)
            if (df.count() != 0) {
                df.persist()
                df.write.mode("overwrite").jdbc("jdbc:mysql://cluster01:3306/test", "wc", props)
                df.write.mode("overwrite").json("./StructedStreaming_sink-ForeachBatchSink")
                df.unpersist()
            }
        }

        def main(args: Array[String]): Unit = {
            val spark: SparkSession = SparkSession
              .builder()
              .master("local[2]")
              .appName("ForeachBatchSink")
              .getOrCreate()
            import spark.implicits._

            val lines: DataFrame = spark.readStream
              .format("socket") // TODO set the data source
              .option("host", "cluster01")
              .option("port", 10000)
              .load

            val wordCount: DataFrame = lines.as[String]
              .flatMap(_.split("\\W+"))
              .groupBy("value")
              .count() // columns: value, count

            val props = new Properties()
            props.setProperty("user", "root")
            props.setProperty("password", "1234")

            val query: StreamingQuery = wordCount.writeStream
              .outputMode("complete")
              .foreachBatch((df: Dataset[Row], batchId: Long) => { // explicit types fix the error
                  myFun(df, batchId, props)
              })
              .start

            query.awaitTermination()
        }
    }
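
    If you prefer not to annotate the lambda inline, binding the handler to an explicitly typed function value also works: a plain Function2 value matches only the Scala overload, so nothing is left to infer. A sketch under that assumption, reusing wordCount, myFun and props from the listing above:

    // The declared Function2 type does the disambiguation instead of inline annotations
    val batchFn: (Dataset[Row], Long) => Unit = myFun(_, _, props)

    val queryAlt: StreamingQuery = wordCount.writeStream
        .outputMode("complete")
        .foreachBatch(batchFn)
        .start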
    
    import java.util.Properties
    import org.apache.spark.sql.streaming.{StreamingQuery, Trigger}
    import org.apache.spark.sql.{DataFrame, Dataset, Row, SparkSession}

    object ForeachBatchSink1 {
        // Aggregates the raw lines of one micro-batch, then writes to MySQL and JSON
        def myFun(df: Dataset[Row], batchId: Long, props: Properties, spark: SparkSession): Unit = {
            import spark.implicits._
            println("BatchId = " + batchId)
            if (df.count() != 0) {
                val result = df.as[String].flatMap(_.split("\\W+")).groupBy("value").count()
                result.persist()
                result.write.mode("overwrite").jdbc("jdbc:mysql://cluster01:3306/test", "wc", props)
                result.write.mode("overwrite").json("./StructedStreaming_sink-ForeachBatchSink1")
                result.unpersist()
            }
        }

        def main(args: Array[String]): Unit = {
            val spark: SparkSession = SparkSession
              .builder()
              .master("local[2]")
              .appName("ForeachBatchSink1")
              .getOrCreate()
            import spark.implicits._

            val lines: DataFrame = spark.readStream
              .format("socket") // TODO set the data source
              .option("host", "cluster01")
              .option("port", 10000)
              .load

            val props = new Properties()
            props.setProperty("user", "root")
            props.setProperty("password", "1234")

            val query: StreamingQuery = lines.writeStream
              .outputMode("update")
              .foreachBatch((df: Dataset[Row], batchId: Long) => { // explicit types fix the error
                  myFun(df, batchId, props, spark)
              })
              // NOTE: per the Spark docs, foreachBatch is not supported with continuous
              // processing; use a micro-batch trigger if this fails at runtime
              .trigger(Trigger.Continuous(10))
              .start

            query.awaitTermination()
        }
    }
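
    Alternatively, the first overload listed in the error message can be targeted directly through Spark's Java functional interface; this also compiles on Scala 2.12 because no inference is involved. A sketch reusing lines, myFun, props and spark from the listing above:

    import org.apache.spark.api.java.function.VoidFunction2
    import org.apache.spark.sql.{Dataset, Row}

    // An explicit VoidFunction2 instance selects the Java overload unambiguously
    val javaBatchFn = new VoidFunction2[Dataset[Row], java.lang.Long] {
        override def call(df: Dataset[Row], batchId: java.lang.Long): Unit =
            myFun(df, batchId, props, spark) // java.lang.Long unboxes to scala.Long
    }

    val queryAlt: StreamingQuery = lines.writeStream
        .outputMode("update")
        .foreachBatch(javaBatchFn)
        .start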
    

    4. Reference Link

    blog.csdn.net/Shockang/ar…
