import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SaveMode
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe
import org.apache.spark.streaming.kafka010.KafkaUtils
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent

object KafkaSparkHdfs {
  val sparkConf = new SparkConf().setMaster("local[*]").setAppName("SparkKafka")
  sparkConf.set("spark.driver.allowMultipleContexts", "true")
  val sc = new SparkContext(sparkConf)

  def main(args: Array[String]): Unit = {
    val sqlContext = new org.apache.spark.sql.SQLContext(sc)
    import sqlContext.implicits._

    val ssc = new StreamingContext(sparkConf, Seconds(20))

    val kafkaParams = Map[String, Object](
      "bootstrap.servers" -> "localhost:9092",
      "key.deserializer" -> classOf[StringDeserializer],
      "value.deserializer" -> classOf[StringDeserializer],
      "group.id" -> "stream3",
      "auto.offset.reset" -> "latest",
      "enable.auto.commit" -> (false: java.lang.Boolean)
    )

    val topics = Array("fridaydata")
    val stream = KafkaUtils.createDirectStream[String, String](
      ssc, PreferConsistent, Subscribe[String, String](topics, kafkaParams)
    )

    // word count over each 20-second micro-batch
    val lines = stream.map(consumerRecord => consumerRecord.value)
    val words = lines.flatMap(_.split(" "))
    val wordMap = words.map(word => (word, 1))
    val wordCount = wordMap.reduceByKey(_ + _)

    // append each batch's result to HDFS
    wordCount.foreachRDD(rdd => {
      val dataframe = rdd.toDF()
      dataframe.write
        .mode(SaveMode.Append)
        .save("hdfs://localhost:9000/newfile24")
    })

    ssc.start()
    ssc.awaitTermination()
  }
}

The folder is created but the file is not written.

The program is getting terminated with the following error:

    18/06/22 16:14:41 ERROR Executor: Exception in task 0.0 in stage 0.0 (TID 0)
    java.util.NoSuchElementException: None.get
    at scala.None$.get(Option.scala:347)
    at scala.None$.get(Option.scala:345)
    at org.apache.spark.storage.BlockInfoManager.releaseAllLocksForTask(BlockInfoManager.scala:343)
    at org.apache.spark.storage.BlockManager.releaseAllLocksForTask(BlockManager.scala:670)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:289)
    at java.lang.Thread.run(Thread.java:748)
    18/06/22 16:14:41 WARN TaskSetManager: Lost task 0.0 in stage 0.0 (TID 0, localhost, executor driver): java.util.NoSuchElementException: None.get
    at scala.None$.get(Option.scala:347)
    at scala.None$.get(Option.scala:345)
    at org.apache.spark.storage.BlockInfoManager.releaseAllLocksForTask(BlockInfoManager.scala:343)
    at org.apache.spark.storage.BlockManager.releaseAllLocksForTask(BlockManager.scala:670)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:289)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)

In my pom I am using the following dependencies (an sbt equivalent is sketched after this list):

  • spark-core_2.11
  • spark-sql_2.11
  • spark-streaming_2.11
  • spark-streaming-kafka-0-10_2.11
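
For reference only, a hypothetical sbt equivalent of those Maven coordinates; the Spark version shown (2.3.1) is an assumption and should match the release actually installed:

// Hypothetical sbt declaration; %% appends the Scala suffix (_2.11 here)
// based on the project's scalaVersion. Version 2.3.1 is an assumed example.
libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-core"                 % "2.3.1",
  "org.apache.spark" %% "spark-sql"                  % "2.3.1",
  "org.apache.spark" %% "spark-streaming"            % "2.3.1",
  "org.apache.spark" %% "spark-streaming-kafka-0-10" % "2.3.1"
)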
    The error is due to trying to run multiple Spark contexts at the same time. Setting allowMultipleContexts to true is mostly used for testing purposes and its use is discouraged. The solution to your problem is therefore to make sure that the same SparkContext is used everywhere. From the code we can see that the SparkContext (sc) is used to create an SQLContext, which is fine. However, when creating the StreamingContext it is not used; instead, the SparkConf is used.

    By looking at the documentation we see:

    Create a StreamingContext by providing the configuration necessary for a new SparkContext

    In other words, by passing a SparkConf as the parameter, a new SparkContext will be created. Now there are two separate contexts.

    The easiest solution here would be to continue using the same context as before. Change the line creating the StreamingContext to:

    val ssc = new StreamingContext(sc, Seconds(20))
    

    Note: In newer versions of Spark (2.0+) use SparkSession instead. A new streaming context can then be created using StreamingContext(spark.sparkContext, ...). It can look as follows:

    val spark = SparkSession.builder()
      .master("local[*]")
      .appName("SparkKafka")
      .getOrCreate()
    import spark.implicits._
    val ssc = new StreamingContext(spark.sparkContext, Seconds(20))
    Yeah, it is working, but I have used append mode; whenever new data comes it is not writing into the same file, it is creating a new file. @Shaido
    – andani
    Jun 25, 2018 at 5:28

    Is there any way we can keep writing into the same file? Because it is streaming, if it emits every minute it keeps creating n number of files.
    – andani
    Jun 25, 2018 at 5:37

    @andani: Checking if that can be done. Spark will save separate partitions as separate files, so not sure if it can be done. However, you can read all the files by doing spark.read.parquet("path").
    – Shaido
    Jun 25, 2018 at 5:45
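
    For illustration, a hedged sketch of the read-back Shaido mentions, assuming the batches were written in Parquet (the default format of save) under the path used in the question, and that a SparkSession named spark is available:

    // Each micro-batch writes its own part files under the directory;
    // reading the directory gathers them all back into one DataFrame.
    val allBatches = spark.read.parquet("hdfs://localhost:9000/newfile24")
    allBatches.show()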
    

    While reducing the number of files might be tempting in many scenarios, it should be done if and only if the amount of data is low enough for the nodes to handle (which is clearly not the case here).

    Furthermore, let me quote the documentation:

    However, if you're doing a drastic coalesce, e.g. to numPartitions = 1, this may result in your computation taking place on fewer nodes than you like (e.g. one node in the case of numPartitions = 1). To avoid this, you can call repartition. This will add a shuffle step, but means the current upstream partitions will be executed in parallel (per whatever the current partitioning is).

    The conclusion is that you should adjust the parameter according to the expected amount of data and the desired parallelism. coalesce(1) as such is rarely useful in practice, especially in a streaming context, where data properties can differ over time.
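
    As a hedged sketch only (not part of the original answer), the per-batch write from the question could control its output file count roughly like this; the partition count of 4 is an arbitrary assumption to be tuned to the batch size and cluster:

    import org.apache.spark.sql.SaveMode

    // repartition adds a shuffle but keeps the upstream work parallel;
    // coalesce(1) would funnel the whole batch through a single task.
    wordCount.foreachRDD { rdd =>
      val df = rdd.toDF("word", "count") // relies on the implicits imported in the question's code
      df.repartition(4)
        .write
        .mode(SaveMode.Append)
        .parquet("hdfs://localhost:9000/newfile24")
    }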
