
I'm getting an exception when executing spark2-submit on my Hadoop cluster while reading a directory of .json files from HDFS, and I have no idea how to resolve it.

I have found a few questions about this on several boards, but none of them were popular or had an answer.

I tried explicitly importing org.apache.spark.sql.execution.datasources.json.JsonFileFormat, but next to importing SparkSession that seems redundant, and it isn't getting recognised anyway.

I can, however, confirm that both of these classes are available:

val json:org.apache.spark.sql.execution.datasources.json.JsonDataSource
val json:org.apache.spark.sql.execution.datasources.json.JsonFileFormat

Stack Trace:

Exception in thread "main" org.apache.spark.sql.AnalysisException: Multiple sources found for json (org.apache.spark.sql.execution.datasources.json.JsonFileFormat, org.apache.spark.sql.execution.datasources.json.DefaultSource), please specify the fully qualified class name.;
    at org.apache.spark.sql.execution.datasources.DataSource$.lookupDataSource(DataSource.scala:670)
    at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:190)
    at org.apache.spark.sql.DataFrameReader.json(DataFrameReader.scala:397)
    at org.apache.spark.sql.DataFrameReader.json(DataFrameReader.scala:340)
    at jsonData.HdfsReader$.readJsonToDataFrame(HdfsReader.scala:45)
    at jsonData.HdfsReader$.process(HdfsReader.scala:52)
    at exp03HDFS.StartExperiment03$.main(StartExperiment03.scala:41)
    at exp03HDFS.StartExperiment03.main(StartExperiment03.scala)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.apache.spark.deploy.JavaMainApplication.start(SparkApplication.scala:52)
    at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:894)
    at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:198)
    at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:228)
    at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:137)
    at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)

HdfsReader:

import java.net.URI
import org.apache.hadoop.fs.{LocatedFileStatus, RemoteIterator}
import org.apache.spark.sql.{DataFrame, SparkSession}
import pipelines.ContentPipeline

object HdfsReader {

  // The session created by spark2-submit; getOrCreate() picks it up on the cluster.
  private val spark: SparkSession = SparkSession.builder().getOrCreate()

  def readJsonToDataFrame(inputDir: String, multiline: Boolean = true, verbose: Boolean = false)
  : DataFrame = {
    // Reads the whole directory of *.json files into one DataFrame (HdfsReader.scala:45 in the stack trace).
    val multilineDf = spark.read.option("multiline", multiline).json(inputDir)
    if (verbose) multilineDf.show(truncate = false)
    multilineDf
  }

  def process(path: URI): Unit = {
    val dataFrame = readJsonToDataFrame(path.toString, verbose = true)
    val contentDataFrame = ContentPipeline.getContentOfText(dataFrame)
    val newDataFrame = dataFrame.join(contentDataFrame, "text").distinct()
    // JsonFileUtils and outputFolder are defined elsewhere in the project.
    JsonFileUtils.saveAsJson(newDataFrame, outputFolder)
  }
}

build.sbt

version := "0.1"
scalaVersion := "2.11.8" //same version hadoop uses
libraryDependencies ++=Seq(
  "org.apache.spark" %% "spark-core" % "2.3.0", //same version hadoop uses
  "com.johnsnowlabs.nlp" %% "spark-nlp" % "2.3.0",
  "org.apache.spark" %% "spark-sql" % "2.3.0",
  "org.apache.spark" %% "spark-mllib" % "2.3.0",
  "org.scalactic" %% "scalactic" % "3.2.0",
  "org.scalatest" %% "scalatest" % "3.2.0" % "test",
  "com.lihaoyi" %% "upickle" % "0.7.1")
Need to use either Spark 2 or 3 but not both, i.e. get rid of the duplicated jars. Maybe your cluster has Spark 3 installed? It seems spark-nlp supports only Spark 2 at the moment. – ollik1, Jul 5, 2020 at 20:49

I checked every version in the build.sbt to be compatible with the cluster and I'm still getting the same exact error. – blkpingu, Jul 7, 2020 at 15:25
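
As ollik1's comment suggests, the usual cause of "Multiple sources found for json" is that the application jar ships its own copy of Spark's classes next to the ones already installed on the cluster. One common remedy is to mark the Spark modules as provided, so only the cluster's Spark ends up on the runtime classpath. A sketch of that build.sbt change, assuming the cluster really does ship Spark 2.3.0:

libraryDependencies ++= Seq(
  // "provided": compile against these, but let spark2-submit supply them at runtime
  "org.apache.spark" %% "spark-core"  % "2.3.0" % "provided",
  "org.apache.spark" %% "spark-sql"   % "2.3.0" % "provided",
  "org.apache.spark" %% "spark-mllib" % "2.3.0" % "provided",
  // spark-nlp still gets bundled into the application jar
  "com.johnsnowlabs.nlp" %% "spark-nlp" % "2.3.0"
)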
Answer:

val dataFrame1 = spark
  .read
  .option("multiline", value = true)
  .json(inputDir)

val dataFrame2 = spark
  .read
  .format("org.apache.spark.sql.execution.datasources.json.JsonFileFormat")
  .option("multiline", value = true)
  .load(inputDir)

These two calls do essentially the same thing:

They read a whole directory of *.json files into a DataFrame.

The only difference is that dataFrame1 makes an assumption about the data source you're going to use and looks it up in org.apache.spark.sql.execution.datasources.json.

You don't want that here, because when Spark tries to resolve json from this classpath it finds two sources:

val json:org.apache.spark.sql.execution.datasources.json.JsonDataSource
val json:org.apache.spark.sql.execution.datasources.json.JsonFileFormat

There is, however, an option that lets you specify the source explicitly in case of conflicts.

This is where you use format(source).load(path) to pick a specific data source implementation to read the files with.
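
If you want to see the collision for yourself: Spark resolves a short name like json by scanning the classpath with Java's ServiceLoader for DataSourceRegister implementations. A small diagnostic sketch (run on the driver; with the duplicated jars it should print both classes from the error message):

import java.util.ServiceLoader
import org.apache.spark.sql.sources.DataSourceRegister
import scala.collection.JavaConverters._

// List every data source on the classpath that registers the short name "json".
// A clean classpath prints exactly one class; the AnalysisException above implies two.
ServiceLoader.load(classOf[DataSourceRegister]).asScala
  .filter(_.shortName().equalsIgnoreCase("json"))
  .foreach(r => println(r.getClass.getName))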
