Structured Streaming Error Log: overloaded method foreachBatch with alternatives
0. Preface
1. The Error
2. Code and Error Messages
3. Cause and Fix
4. References
0. Preface
Spark: 3.0.0
Scala: 2.12
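For anyone reproducing this, a minimal sbt build matching the versions above might look like the following (the build file layout and the MySQL driver version are assumptions, not taken from the article; only spark-sql is strictly required for Structured Streaming):

// build.sbt - a minimal sketch, not the article's original build file
scalaVersion := "2.12.10"

libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-sql" % "3.0.0",   // Structured Streaming ships with spark-sql
  "mysql" % "mysql-connector-java" % "5.1.47"    // JDBC driver for the MySQL sink used below
)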
1. The Error
overloaded method value foreachBatch with alternatives:
2. Code and Error Messages
The first program fails to compile with:

Error:(48, 12) overloaded method value foreachBatch with alternatives:
  (function: org.apache.spark.api.java.function.VoidFunction2[org.apache.spark.sql.Dataset[org.apache.spark.sql.Row],java.lang.Long])org.apache.spark.sql.streaming.DataStreamWriter[org.apache.spark.sql.Row]
  (function: (org.apache.spark.sql.Dataset[org.apache.spark.sql.Row], scala.Long) => Unit)org.apache.spark.sql.streaming.DataStreamWriter[org.apache.spark.sql.Row]
 cannot be applied to ((org.apache.spark.sql.Dataset[org.apache.spark.sql.Row], Any) => org.apache.spark.sql.Dataset[org.apache.spark.sql.Row])
      .foreachBatch((df, batchId) => {
The code that produced it (ForeachBatchSink1):

import java.util.Properties

import org.apache.spark.sql.streaming.{StreamingQuery, Trigger}
import org.apache.spark.sql.{DataFrame, SparkSession}

object ForeachBatchSink1 {
  def main(args: Array[String]): Unit = {
    val spark: SparkSession = SparkSession
      .builder()
      .master("local[*]")
      .appName("ForeachSink1")
      .getOrCreate()
    import spark.implicits._

    val lines: DataFrame = spark.readStream
      .format("socket") // set the data source
      .option("host", "cluster01")
      .option("port", 10000)
      .load

    val props = new Properties()
    props.setProperty("user", "root")
    props.setProperty("password", "1234")

    val query: StreamingQuery = lines.writeStream
      .outputMode("update")
      // Untyped lambda: under Scala 2.12 this does not match either overload of foreachBatch
      .foreachBatch((df, batchId) => {
        val result = df.as[String].flatMap(_.split("\\W+")).groupBy("value").count()
        result.persist()
        result.write.mode("overwrite").jdbc("jdbc:mysql://cluster01:3306/test", "wc", props)
        result.write.mode("overwrite").json("./foreach1")
        result.unpersist()
      })
      .trigger(Trigger.Continuous(10))
      .start

    query.awaitTermination()
  }
}
The second program fails the same way:

Error:(43, 12) overloaded method value foreachBatch with alternatives:
  (function: org.apache.spark.api.java.function.VoidFunction2[org.apache.spark.sql.Dataset[org.apache.spark.sql.Row],java.lang.Long])org.apache.spark.sql.streaming.DataStreamWriter[org.apache.spark.sql.Row]
  (function: (org.apache.spark.sql.Dataset[org.apache.spark.sql.Row], scala.Long) => Unit)org.apache.spark.sql.streaming.DataStreamWriter[org.apache.spark.sql.Row]
 cannot be applied to ((org.apache.spark.sql.Dataset[org.apache.spark.sql.Row], Any) => org.apache.spark.sql.DataFrame)
      .foreachBatch((df, batchId) => {
The code that produced it (ForeachBatchSink):

import java.util.Properties

import org.apache.spark.sql.streaming.{StreamingQuery, Trigger}
import org.apache.spark.sql.{DataFrame, SparkSession}

object ForeachBatchSink {
  def main(args: Array[String]): Unit = {
    val spark: SparkSession = SparkSession
      .builder()
      .master("local[*]")
      .appName("ForeachSink")
      .getOrCreate()
    import spark.implicits._

    val lines: DataFrame = spark.readStream
      .format("socket") // set the data source
      .option("host", "cluster01")
      .option("port", 10000)
      .load

    val props = new Properties()
    props.setProperty("user", "root")
    props.setProperty("password", "1234")

    val query: StreamingQuery = lines.writeStream
      .outputMode("complete")
      // Untyped lambda: the same overload-resolution failure as above
      .foreachBatch((df, batchId) => {
        val result = df.as[String].flatMap(_.split("\\W+")).groupBy("value").count()
        result.persist()
        result.write.mode("overwrite").jdbc("jdbc:mysql://cluster01:3306/test", "wc", props)
        result.write.mode("overwrite").json("./foreach")
        result.unpersist()
      })
      .start

    query.awaitTermination()
  }
}
3. Cause and Fix
The foreachBatch() call resolves differently under Scala 2.12 than under Scala 2.11. In Scala 2.12 a lambda can also be converted to a Java functional interface, so an untyped lambda such as (df, batchId) => { ... } is a candidate for both overloads of foreachBatch: the Java VoidFunction2 variant and the Scala (Dataset[Row], Long) => Unit variant. On top of that, the lambda's last expression returns a DataFrame, so its inferred type (Dataset[Row], Any) => DataFrame matches neither signature exactly and the compiler rejects the call. The fix is to annotate the lambda's parameter types explicitly and have the batch function return Unit, for example by moving the body into a helper method.
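To isolate the difference, here is a minimal sketch (not the article's code; `stream` is a placeholder for any streaming DataFrame) contrasting the failing and the working call:

import org.apache.spark.sql.{DataFrame, Dataset, Row}

// `stream` stands for any streaming DataFrame (assumption for illustration).
def startQuery(stream: DataFrame) = {
  // Rejected under Scala 2.12: the untyped lambda is inferred as
  // (Dataset[Row], Any) => DataFrame, which matches neither overload.
  // stream.writeStream.foreachBatch((df, batchId) => df.limit(10))

  // Accepted: explicit parameter types and a Unit-returning body select the
  // (Dataset[Row], Long) => Unit overload.
  stream.writeStream
    .foreachBatch((df: Dataset[Row], batchId: Long) => {
      println(s"batch $batchId: ${df.count()} rows")
    })
    .start()
}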
The corrected code is as follows.
import java.util.Properties

import org.apache.spark.sql.streaming.StreamingQuery
import org.apache.spark.sql.{DataFrame, Dataset, Row, SparkSession}

object ForeachBatchSink {

  // Batch function with an explicit (Dataset[Row], Long) signature and a Unit result
  def myFun(df: Dataset[Row], batchId: Long, props: Properties): Unit = {
    println("BatchId" + batchId)
    if (df.count() != 0) {
      df.persist()
      df.write.mode("overwrite").jdbc("jdbc:mysql://cluster01:3306/test", "wc", props)
      df.write.mode("overwrite").json("./StructedStreaming_sink-ForeachBatchSink")
      df.unpersist()
    }
  }

  def main(args: Array[String]): Unit = {
    val spark: SparkSession = SparkSession
      .builder()
      .master("local[2]")
      .appName("ForeachBatchSink")
      .getOrCreate()
    import spark.implicits._

    val lines: DataFrame = spark.readStream
      .format("socket") // TODO set the data source
      .option("host", "cluster01")
      .option("port", 10000)
      .load

    val wordCount: DataFrame = lines.as[String]
      .flatMap(_.split("\\W+"))
      .groupBy("value")
      .count() // value, count

    val props = new Properties()
    props.setProperty("user", "root")
    props.setProperty("password", "1234")

    val query: StreamingQuery = wordCount.writeStream
      .outputMode("complete")
      // Explicitly typed lambda selects the (Dataset[Row], Long) => Unit overload
      .foreachBatch((df: Dataset[Row], batchId: Long) => {
        myFun(df, batchId, props)
      })
      .start

    query.awaitTermination()
  }
}
And the corrected ForeachBatchSink1:

import java.util.Properties

import org.apache.spark.sql.streaming.{StreamingQuery, Trigger}
import org.apache.spark.sql.{DataFrame, Dataset, Row, SparkSession}

object ForeachBatchSink1 {

  // Here the word count is computed inside the batch function on each micro-batch
  def myFun(df: Dataset[Row], batchId: Long, props: Properties, spark: SparkSession): Unit = {
    import spark.implicits._
    println("BatchId = " + batchId)
    if (df.count() != 0) {
      val result = df.as[String].flatMap(_.split("\\W+")).groupBy("value").count()
      result.persist()
      result.write.mode("overwrite").jdbc("jdbc:mysql://cluster01:3306/test", "wc", props)
      result.write.mode("overwrite").json("./StructedStreaming_sink-ForeachBatchSink1")
      result.unpersist()
    }
  }

  def main(args: Array[String]): Unit = {
    val spark: SparkSession = SparkSession
      .builder()
      .master("local[2]")
      .appName("ForeachBatchSink1")
      .getOrCreate()
    import spark.implicits._

    val lines: DataFrame = spark.readStream
      .format("socket") // TODO set the data source
      .option("host", "cluster01")
      .option("port", 10000)
      .load

    val props = new Properties()
    props.setProperty("user", "root")
    props.setProperty("password", "1234")

    val query: StreamingQuery = lines.writeStream
      .outputMode("update")
      // Explicitly typed lambda selects the (Dataset[Row], Long) => Unit overload
      .foreachBatch((df: Dataset[Row], batchId: Long) => {
        myFun(df, batchId, props, spark)
      })
      .trigger(Trigger.Continuous(10))
      .start

    query.awaitTermination()
  }
}
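Once either query is running, the MySQL sink can be spot-checked from a separate batch job (a sketch; the URL, table, and credentials simply mirror the properties used above):

import java.util.Properties

import org.apache.spark.sql.SparkSession

object CheckWordCount {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName("CheckWordCount").getOrCreate()

    val props = new Properties()
    props.setProperty("user", "root")
    props.setProperty("password", "1234")

    // Read back the `wc` table that foreachBatch overwrites on every batch.
    spark.read.jdbc("jdbc:mysql://cluster01:3306/test", "wc", props).show()

    spark.stop()
  }
}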
4. References
blog.csdn.net/Shockang/ar…