I'm getting an exception when executing spark2-submit on my Hadoop cluster while reading a directory of .json files from HDFS, and I have no idea how to resolve it. I have found a few questions about this on several boards, but none of them are popular or have an answer.
I tried explicitly importing org.apache.spark.sql.execution.datasources.json.JsonFileFormat, but alongside the SparkSession import it seems redundant, so it isn't recognised. I can, however, confirm that both of these classes are available:
val json:org.apache.spark.sql.execution.datasources.json.JsonDataSource
val json:org.apache.spark.sql.execution.datasources.json.JsonFileFormat
Stack Trace:
Exception in thread "main" org.apache.spark.sql.AnalysisException: Multiple sources found for json (org.apache.spark.sql.execution.datasources.json.JsonFileFormat, org.apache.spark.sql.execution.datasources.json.DefaultSource), please specify the fully qualified class name.;
at org.apache.spark.sql.execution.datasources.DataSource$.lookupDataSource(DataSource.scala:670)
at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:190)
at org.apache.spark.sql.DataFrameReader.json(DataFrameReader.scala:397)
at org.apache.spark.sql.DataFrameReader.json(DataFrameReader.scala:340)
at jsonData.HdfsReader$.readJsonToDataFrame(HdfsReader.scala:45)
at jsonData.HdfsReader$.process(HdfsReader.scala:52)
at exp03HDFS.StartExperiment03$.main(StartExperiment03.scala:41)
at exp03HDFS.StartExperiment03.main(StartExperiment03.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.apache.spark.deploy.JavaMainApplication.start(SparkApplication.scala:52)
at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:894)
at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:198)
at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:228)
at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:137)
at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
HdfsReader:
import java.net.URI
import org.apache.hadoop.fs.{LocatedFileStatus, RemoteIterator}
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.SparkSession
import pipelines.ContentPipeline

object HdfsReader {

  // `spark` (the SparkSession), `JsonFileUtils` and `outputFolder` are presumably
  // defined elsewhere in the project and are not shown here.

  def readJsonToDataFrame(inputDir: String, multiline: Boolean = true, verbose: Boolean = false)
  : DataFrame = {
    val multiline_df = spark.read.option("multiline", value = true).json(inputDir)
    multiline_df.show(false)
    if (verbose) multiline_df.show(truncate = true)
    multiline_df
  }

  def process(path: URI) = {
    val dataFrame = readJsonToDataFrame(path.toString, verbose = true)
    val contentDataFrame = ContentPipeline.getContentOfText(dataFrame)
    val newDataFrame = dataFrame.join(contentDataFrame, "text").distinct()
    JsonFileUtils.saveAsJson(newDataFrame, outputFolder)
  }
}
build.sbt
version := "0.1"
scalaVersion := "2.11.8" //same version hadoop uses
libraryDependencies ++= Seq(
"org.apache.spark" %% "spark-core" % "2.3.0", //same version hadoop uses
"com.johnsnowlabs.nlp" %% "spark-nlp" % "2.3.0",
"org.apache.spark" %% "spark-sql" % "2.3.0",
"org.apache.spark" %% "spark-mllib" % "2.3.0",
"org.scalactic" %% "scalactic" % "3.2.0",
"org.scalatest" %% "scalatest" % "3.2.0" % "test",
"com.lihaoyi" %% "upickle" % "0.7.1")
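For comparison, dataFrame1 below is presumably the plain .json(...) read that the question already uses:

val dataFrame1 = spark
  .read
  .option("multiline", value = true)
  .json(inputDir)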
val dataFrame2 = spark
  .read
  .format("org.apache.spark.sql.execution.datasources.json.JsonFileFormat")
  .option("multiline", value = true)
  .load(inputDir)
These two calls do essentially the same thing: they read a whole directory of *.json files into a DataFrame. The only difference is that dataFrame1 makes an assumption about the data source you want to use and looks it up in org.apache.spark.sql.execution.datasources.json.
You don't want that, because when Spark tries to resolve the json source from that package it finds two candidates:
val json:org.apache.spark.sql.execution.datasources.json.JsonDataSource
val json:org.apache.spark.sql.execution.datasources.json.JsonFileFormat
There is, however, an option that lets you specify the source explicitly in case of conflicts. This is where you use format(source).load(path) to read the files with a specific data source.
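Applied to the question's readJsonToDataFrame, a minimal sketch of the fix could look like this (the fully qualified source name is taken from the error message; spark is assumed to be the SparkSession already in scope):

def readJsonToDataFrame(inputDir: String, verbose: Boolean = false): DataFrame = {
  // Naming the source explicitly avoids the ambiguous lookup between
  // JsonFileFormat and DefaultSource.
  val df = spark.read
    .format("org.apache.spark.sql.execution.datasources.json.JsonFileFormat")
    .option("multiline", value = true)
    .load(inputDir)
  if (verbose) df.show(truncate = false)
  df
}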