I'm getting an exception when executing spark2-submit on my Hadoop cluster while reading a directory of .json files from HDFS, and I have no idea how to resolve it. I have found a few questions about this on several boards, but none of them popular or with an answer.
I tried importing org.apache.spark.sql.execution.datasources.json.JsonFileFormat explicitly, but it seems redundant next to importing SparkSession, and it doesn't get recognised anyway. I can, however, confirm that both of these classes are available:
val json:org.apache.spark.sql.execution.datasources.json.JsonDataSource
val json:org.apache.spark.sql.execution.datasources.json.JsonFileFormat
Stack Trace:
Exception in thread "main" org.apache.spark.sql.AnalysisException: Multiple sources found for json (org.apache.spark.sql.execution.datasources.json.JsonFileFormat, org.apache.spark.sql.execution.datasources.json.DefaultSource), please specify the fully qualified class name.;
at org.apache.spark.sql.execution.datasources.DataSource$.lookupDataSource(DataSource.scala:670)
at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:190)
at org.apache.spark.sql.DataFrameReader.json(DataFrameReader.scala:397)
at org.apache.spark.sql.DataFrameReader.json(DataFrameReader.scala:340)
at jsonData.HdfsReader$.readJsonToDataFrame(HdfsReader.scala:45)
at jsonData.HdfsReader$.process(HdfsReader.scala:52)
at exp03HDFS.StartExperiment03$.main(StartExperiment03.scala:41)
at exp03HDFS.StartExperiment03.main(StartExperiment03.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.apache.spark.deploy.JavaMainApplication.start(SparkApplication.scala:52)
at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:894)
at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:198)
at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:228)
at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:137)
at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
HdfsReader:
import java.net.URI
import org.apache.hadoop.fs.{LocatedFileStatus, RemoteIterator}
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.SparkSession
import pipelines.ContentPipeline
object HdfsReader {

  // Assumption: the SparkSession used below; it is not shown in the original snippet.
  val spark: SparkSession = SparkSession.builder().getOrCreate()

  def readJsonToDataFrame(inputDir: String, multiline: Boolean = true, verbose: Boolean = false): DataFrame = {
    val multiline_df = spark.read.option("multiline", value = multiline).json(inputDir)
    if (verbose) multiline_df.show(truncate = false)
    multiline_df
  }

  def process(path: URI): Unit = {
    val dataFrame = readJsonToDataFrame(path.toString, verbose = true)
    val contentDataFrame = ContentPipeline.getContentOfText(dataFrame)
    val newDataFrame = dataFrame.join(contentDataFrame, "text").distinct()
    // JsonFileUtils and outputFolder come from elsewhere in the project (not shown here).
    JsonFileUtils.saveAsJson(newDataFrame, outputFolder)
  }
}
build.sbt
version := "0.1"
scalaVersion := "2.11.8" //same version hadoop uses
libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-core" % "2.3.0", //same version hadoop uses
  "com.johnsnowlabs.nlp" %% "spark-nlp" % "2.3.0",
  "org.apache.spark" %% "spark-sql" % "2.3.0",
  "org.apache.spark" %% "spark-mllib" % "2.3.0",
  "org.scalactic" %% "scalactic" % "3.2.0",
  "org.scalatest" %% "scalatest" % "3.2.0" % "test",
  "com.lihaoyi" %% "upickle" % "0.7.1")
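For comparison, dataFrame1 below is the implicit read that the question's readJsonToDataFrame uses (reconstructed here so both variants sit side by side), and dataFrame2 that follows names the source explicitly:
val dataFrame1 = spark
  .read
  .option("multiline", value = true)
  .json(inputDir)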
val dataFrame2 = spark
  .read
  .format("org.apache.spark.sql.execution.datasources.json.JsonFileFormat")
  .option("multiline", value = true)
  .load(inputDir)
These two snippets do essentially the same thing: they read a whole directory of *.json files into a DataFrame. The only difference is that dataFrame1 makes assumptions about the data source you're going to use and looks it up in org.apache.spark.sql.execution.datasources.json. You don't want that here, because resolving json against that package turns up two sources:
val json:org.apache.spark.sql.execution.datasources.json.JsonDataSource
val json:org.apache.spark.sql.execution.datasources.json.JsonFileFormat
There is, however, an option that lets you specify the source explicitly in case of conflicts. This is where you use spark.read.format(source).load(path) to pick a specific data source for reading the files.
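Applied to the question's HdfsReader, a minimal sketch of readJsonToDataFrame with the source named explicitly (assuming the same spark session, signature, and option handling as in the question) would look like this:
def readJsonToDataFrame(inputDir: String, multiline: Boolean = true, verbose: Boolean = false): DataFrame = {
  // Name the json source fully so Spark does not have to resolve the short name
  // "json", which is ambiguous on this classpath.
  val df = spark.read
    .format("org.apache.spark.sql.execution.datasources.json.JsonFileFormat")
    .option("multiline", value = multiline)
    .load(inputDir)
  if (verbose) df.show(truncate = false)
  df
}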