1. Creating RDDs
- Created from a collection with the parallelize/makeRDD operators; commonly used for testing
- Created by reading data from a file system with operators such as textFile()
- Created from another RDD via transformation operators (see the sketch below)
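A minimal runnable sketch of the three routes (a sketch, not from the original post; the object name and file path are illustrative):

```scala
import org.apache.spark.sql.SparkSession

object RDDCreationSketch {
    def main(args: Array[String]): Unit = {
        val spark = SparkSession.builder.appName("RDDCreationSketch").master("local").getOrCreate()
        val sc = spark.sparkContext

        // 1) From a collection -- handy for tests; makeRDD is an equivalent alternative
        val fromCollection = sc.parallelize(Seq(1, 2, 3, 4))
        val fromMakeRDD = sc.makeRDD(Seq("a", "b", "c"))

        // 2) From a file system path (local, HDFS, S3, ...)
        val fromFile = sc.textFile("/tmp/people.txt")

        // 3) From another RDD via a transformation
        val doubled = fromCollection.map(_ * 2)

        println(doubled.collect().mkString(", "))  // 2, 4, 6, 8
        spark.stop()
    }
}
```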
A DataFrame, by contrast, is usually obtained through the DataSource API:

```scala
val df = spark.read.format("json").load(path)  // "json" is one example source
```
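A few shorthand readers for the built-in sources, as a sketch (the paths are illustrative and not from the original post):

```scala
val jsonDf    = spark.read.json("/tmp/people.json")
val csvDf     = spark.read.option("header", "true").csv("/tmp/people.csv")  // first line as column names
val parquetDf = spark.read.parquet("/tmp/people.parquet")
```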
Either way, the first step is to initialize a SparkSession, the unified entry point of Spark SQL. The helper below is reused in all the examples that follow.
Java:
```java
package com.kfk.spark.common;

import org.apache.spark.sql.SparkSession;

/**
 * @author : 蔡政洁
 * @email : caizhengjie888@icloud.com
 * @date : 2020/12/2
 * @time : 10:01 PM
 */
public class CommSparkSession {
    public static SparkSession getSparkSession() {
        SparkSession spark = SparkSession.builder()
                .appName("CommSparkSession")
                .master("local")
                .config("spark.sql.warehouse.dir", "/Users/caizhengjie/Document/spark/spark-warehouse")
                .getOrCreate();
        return spark;
    }
}
```
Scala:
```scala
package com.kfk.spark.common

import org.apache.spark.sql.SparkSession

/**
 * @author : 蔡政洁
 * @email : caizhengjie888@icloud.com
 * @date : 2020/12/2
 * @time : 10:02 PM
 */
object CommSparkSessionScala {
    def getSparkSession(): SparkSession = {
        val spark = SparkSession
            .builder
            .appName("CommSparkSessionScala")
            .master("local")
            .config("spark.sql.warehouse.dir", "/Users/caizhengjie/Document/spark/spark-warehouse")
            .getOrCreate
        spark
    }
}
```
(1) RDD to DataFrame
Option 1: pass the column names directly to toDF
Source data (people.txt):

```
Michael, 29
Andy, 30
Justin, 19
```
Using toDF requires importing the implicit conversions first:

```scala
import spark.implicits._

val path = CommScala.fileDirPath + "people.txt"
val personDf1 = spark.sparkContext.textFile(path).map(line => line.split(",")).map(x => {
    (x(0), x(1).trim)
}).toDF("name", "age")
personDf1.show()
/**
 * +-------+---+
 * |   name|age|
 * +-------+---+
 * |Michael| 29|
 * |   Andy| 30|
 * | Justin| 19|
 * +-------+---+
 */
```
Option 2: via reflection
Java implementation
Spark SQL can automatically convert an RDD of JavaBeans into a DataFrame.
```java
package com.kfk.spark.sql;

import java.io.Serializable;

/**
 * @author : 蔡政洁
 * @email : caizhengjie888@icloud.com
 * @date : 2020/12/2
 * @time : 10:15 PM
 */
public class Person implements Serializable {
    private String name;
    private long age;

    public Person(String name, long age) {
        this.name = name;
        this.age = age;
    }

    public Person() {
    }

    public String getName() {
        return name;
    }

    public void setName(String name) {
        this.name = name;
    }

    public long getAge() {
        return age;
    }

    public void setAge(long age) {
        this.age = age;
    }
}
```
```java
package com.kfk.spark.sql;

import com.kfk.spark.common.Comm;
import com.kfk.spark.common.CommSparkSession;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

/**
 * @author : 蔡政洁
 * @email : caizhengjie888@icloud.com
 * @date : 2020/12/2
 * @time : 10:07 PM
 */
public class RDDToDFReflectionJava {
    public static void main(String[] args) {
        SparkSession spark = CommSparkSession.getSparkSession();

        /**
         * Source data:
         * Michael, 29
         * Andy, 30
         * Justin, 19
         */
        String filePath = Comm.fileDirPath + "people.txt";

        // Convert the file into an RDD of Person beans
        JavaRDD<Person> personRdd = spark.read().textFile(filePath).javaRDD().map(line -> {
            String name = line.split(",")[0];
            long age = Long.parseLong(line.split(",")[1].trim());
            return new Person(name, age);
        });

        // Convert the RDD into a DataFrame via reflection
        Dataset<Row> personDf = spark.createDataFrame(personRdd, Person.class);
        personDf.show();
        /**
         * +-------+---+
         * |   name|age|
         * +-------+---+
         * |Michael| 29|
         * |   Andy| 30|
         * | Justin| 19|
         * +-------+---+
         */

        // Register a temporary view
        personDf.createOrReplaceTempView("person");
        Dataset<Row> resultDf = spark.sql("select * from person a where a.age > 20");
        resultDf.show();
        /**
         * +-------+---+
         * |   name|age|
         * +-------+---+
         * |Michael| 29|
         * |   Andy| 30|
         * +-------+---+
         */
    }
}
```
Scala implementation
In Scala, the reflection-based approach uses a case class.
```scala
package com.kfk.spark.sql

import com.kfk.spark.common.{CommScala, CommSparkSessionScala}

/**
 * @author : 蔡政洁
 * @email : caizhengjie888@icloud.com
 * @date : 2020/12/3
 * @time : 6:32 PM
 */
object RDDToDFReflectionScala {
    case class Person(name: String, age: Long)

    def main(args: Array[String]): Unit = {
        val spark = CommSparkSessionScala.getSparkSession()

        /**
         * Source data:
         * Michael, 29
         * Andy, 30
         * Justin, 19
         */
        val path = CommScala.fileDirPath + "people.txt"

        // Convert the RDD into a DataFrame via case-class reflection
        import spark.implicits._
        val personDf = spark.sparkContext.textFile(path).map(line => line.split(",")).map(x => {
            Person(x(0), x(1).trim.toLong)
        }).toDF()
        personDf.show()

        // Alternatively, pass the column names directly to toDF
        val personDf1 = spark.sparkContext.textFile(path).map(line => line.split(",")).map(x => {
            (x(0), x(1).trim)
        }).toDF("name", "age")
        personDf1.show()
        /**
         * +-------+---+
         * |   name|age|
         * +-------+---+
         * |Michael| 29|
         * |   Andy| 30|
         * | Justin| 19|
         * +-------+---+
         */

        // Register a temporary view
        personDf.createOrReplaceTempView("person")
        val resultDf = spark.sql("select * from person a where a.age > 20")
        resultDf.show()
        /**
         * +-------+---+
         * |   name|age|
         * +-------+---+
         * |Michael| 29|
         * |   Andy| 30|
         * +-------+---+
         */
    }
}
```
Option 3: programmatically constructing a schema
Java implementation
```java
package com.kfk.spark.sql;

import com.kfk.spark.common.Comm;
import com.kfk.spark.common.CommSparkSession;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;

import java.util.ArrayList;
import java.util.List;

/**
 * @author : 蔡政洁
 * @email : caizhengjie888@icloud.com
 * @date : 2020/12/3
 * @time : 9:32 PM
 */
public class RDDToDFProgramJava {
    public static void main(String[] args) {
        SparkSession spark = CommSparkSession.getSparkSession();

        /**
         * Source data:
         * Michael, 29
         * Andy, 30
         * Justin, 19
         */
        String filePath = Comm.fileDirPath + "people.txt";

        // Build the schema; age is kept as a string here, and Spark SQL
        // casts it implicitly for the numeric comparison below
        List<StructField> fields = new ArrayList<StructField>();
        StructField structFieldName = DataTypes.createStructField("name", DataTypes.StringType, true);
        StructField structFieldAge = DataTypes.createStructField("age", DataTypes.StringType, true);
        fields.add(structFieldName);
        fields.add(structFieldAge);
        StructType schema = DataTypes.createStructType(fields);

        // Convert the file (people.txt) into an RDD of Row
        JavaRDD<Row> personRdd = spark.read().textFile(filePath).javaRDD().map(line -> {
            String[] attributes = line.split(",");
            return RowFactory.create(attributes[0], attributes[1].trim());
        });

        // Create the DataFrame from the RDD and the schema
        Dataset<Row> personDataFrame = spark.createDataFrame(personRdd, schema);
        personDataFrame.show();
        /**
         * +-------+---+
         * |   name|age|
         * +-------+---+
         * |Michael| 29|
         * |   Andy| 30|
         * | Justin| 19|
         * +-------+---+
         */

        // Register a temporary view
        personDataFrame.createOrReplaceTempView("person");
        Dataset<Row> resultDataFrame = spark.sql("select * from person a where a.age > 20");
        resultDataFrame.show();
        /**
         * +-------+---+
         * |   name|age|
         * +-------+---+
         * |Michael| 29|
         * |   Andy| 30|
         * +-------+---+
         */
    }
}
```
Scala implementation
```scala
package com.kfk.spark.sql

import com.kfk.spark.common.{CommScala, CommSparkSessionScala}
import org.apache.spark.sql.{Dataset, Row}
import org.apache.spark.sql.types.{DataTypes, StructField, StructType}

/**
 * @author : 蔡政洁
 * @email : caizhengjie888@icloud.com
 * @date : 2020/12/3
 * @time : 10:05 PM
 */
object RDDToDFProgramScala {
    def main(args: Array[String]): Unit = {
        val spark = CommSparkSessionScala.getSparkSession()

        /**
         * Source data:
         * Michael, 29
         * Andy, 30
         * Justin, 19
         */
        val path = CommScala.fileDirPath + "people.txt"

        // Build the schema
        val schema = StructType(Array(
            StructField("name", DataTypes.StringType, true),
            StructField("age", DataTypes.LongType, true)
        ))

        // Convert the file (people.txt) into an RDD[Row]
        val rdd = spark.sparkContext.textFile(path).map(line => {
            line.split(",")
        }).map(x => {
            Row(x(0), x(1).trim.toLong)
        })

        // Create the DataFrame from the RDD and the schema
        val personDataFrame = spark.createDataFrame(rdd, schema)
        personDataFrame.show()
        /**
         * +-------+---+
         * |   name|age|
         * +-------+---+
         * |Michael| 29|
         * |   Andy| 30|
         * | Justin| 19|
         * +-------+---+
         */

        // Register a temporary view
        personDataFrame.createOrReplaceTempView("person")
        val resultDataFrame = spark.sql("select * from person a where a.age > 20")
        resultDataFrame.show()
        /**
         * +-------+---+
         * |   name|age|
         * +-------+---+
         * |Michael| 29|
         * |   Andy| 30|
         * +-------+---+
         */

        for (elem <- resultDataFrame.collect()) {
            println(elem)
        }
        /**
         * [Michael,29]
         * [Andy,30]
         */
    }
}
```
(2) DataFrame to RDD
Use `df.rdd` (Scala) or `df.javaRDD()` (Java) to turn a DataFrame back into an RDD.
Java implementation
```java
// Convert the DataFrame back into an RDD
JavaRDD<Person> resultRdd = resultDf.javaRDD().map(line -> {
    Person person = new Person();
    person.setName(line.getAs("name"));
    person.setAge(line.getAs("age"));
    return person;
});

for (Person person : resultRdd.collect()) {
    System.out.println(person.getName() + " : " + person.getAge());
}
/**
 * Michael : 29
 * Andy : 30
 */
```
Scala implementation
```scala
// Convert the DataFrame back into an RDD
val resultRdd = resultDf.rdd.map(row => {
    val name = row.getAs[String]("name")
    val age = row.getAs[Long]("age")
    Person(name, age)
})

for (elem <- resultRdd.collect()) {
    println(elem.name + " : " + elem.age)
}
/**
 * Michael : 29
 * Andy : 30
 */
```
5. Converting between RDD and DataSet
(1) RDD to DataSet
Option 1: the toDS() operator
Using toDS() requires importing the implicit conversions. The schema is taken from the case class, just as in the reflection-based RDD-to-DataFrame approach (schema information cannot be passed to toDS() directly).
```scala
package com.kfk.spark.sql

import com.kfk.spark.common.{CommScala, CommSparkSessionScala}

/**
 * @author : 蔡政洁
 * @email : caizhengjie888@icloud.com
 * @date : 2020/12/5
 * @time : 4:10 PM
 */
object DataSetDemoScala {
    case class Person(name: String, age: Int)

    def main(args: Array[String]): Unit = {
        val spark = CommSparkSessionScala.getSparkSession()

        /**
         * Source data:
         * Michael, 29
         * Andy, 30
         * Justin, 19
         */
        val path = CommScala.fileDirPath + "people.txt"

        // toDS() requires importing the implicit conversions
        import spark.implicits._

        // Option 1: convert the RDD to a DataSet with toDS()
        val dataDS = spark.sparkContext.textFile(path).map(line => line.split(",")).map(x => {
            Person(x(0), x(1).trim.toInt)
        }).toDS()
        dataDS.show()
        /**
         * +-------+---+
         * |   name|age|
         * +-------+---+
         * |Michael| 29|
         * |   Andy| 30|
         * | Justin| 19|
         * +-------+---+
         */
    }
}
```
Option 2: spark.createDataset(rdd)
```scala
package com.kfk.spark.sql

import com.kfk.spark.common.{CommScala, CommSparkSessionScala}

/**
 * @author : 蔡政洁
 * @email : caizhengjie888@icloud.com
 * @date : 2020/12/5
 * @time : 4:10 PM
 */
object DataSetDemoScala {
    case class Person(name: String, age: Int)

    def main(args: Array[String]): Unit = {
        val spark = CommSparkSessionScala.getSparkSession()

        /**
         * Source data:
         * Michael, 29
         * Andy, 30
         * Justin, 19
         */
        val path = CommScala.fileDirPath + "people.txt"

        // createDataset also needs the implicit encoders
        import spark.implicits._

        // Option 2: convert the RDD to a DataSet with spark.createDataset(rdd)
        val rdd = spark.sparkContext.textFile(path).map(line => {
            line.split(",")
        }).map(x => {
            Person(x(0), x(1).trim.toInt)
        })
        val dataDS1 = spark.createDataset(rdd)
        dataDS1.show()
        /**
         * +-------+---+
         * |   name|age|
         * +-------+---+
         * |Michael| 29|
         * |   Andy| 30|
         * | Justin| 19|
         * +-------+---+
         */
    }
}
```
(2) DataSet to RDD
```scala
// Convert the DataSet back into an RDD
val rdd1 = dataDS1.rdd
for (elem <- rdd1.collect()) {
    println(elem.name + " : " + elem.age)
}
/**
 * Michael : 29
 * Andy : 30
 * Justin : 19
 */
```
6. Converting between DataFrame and DataSet
(1) DataFrame to DataSet
The DataSet carries the most information and is the most constrained of the three types, so converting an RDD or a DataFrame into a DataSet requires a case class; the richer DataSet converts down to a plain RDD or DataFrame with a single method call:

```scala
case class Xxx(/* fields matching the DataFrame columns */)
df.as[Xxx]
```
```scala
package com.kfk.spark.sql

import com.kfk.spark.common.{Comm, CommScala, CommSparkSessionScala}

/**
 * @author : 蔡政洁
 * @email : caizhengjie888@icloud.com
 * @date : 2020/12/5
 * @time : 5:35 PM
 */
object DataFrameToDataSetScala {
    case class Person(name: String, age: Long)

    def main(args: Array[String]): Unit = {
        val spark = CommSparkSessionScala.getSparkSession()
        import spark.implicits._

        /**
         * Source data:
         * Michael, 29
         * Andy, 30
         * Justin, 19
         */
        val path = CommScala.fileDirPath + "people.txt"

        val df = spark.sparkContext.textFile(path)
            .map(line => line.split(","))
            .map(x => Person(x(0), x(1).trim.toLong))
            .toDF()
        df.show()

        // Convert the DataFrame to a DataSet
        val personDS = df.as[Person]
        personDS.show()
        personDS.printSchema()

        // Convert the DataSet back to a DataFrame
        val personDF = personDS.toDF()
        personDF.show()
        personDF.printSchema()
        /**
         * +-------+---+
         * |   name|age|
         * +-------+---+
         * |Michael| 29|
         * |   Andy| 30|
         * | Justin| 19|
         * +-------+---+
         *
         * root
         * |-- name: string (nullable = true)
         * |-- age: long (nullable = false)
         */
    }
}
```
(2) DataSet to DataFrame
The reverse direction needs no case class: calling toDF() on the DataSet is enough, as already shown in the listing above.
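A minimal sketch of that direction, reusing the personDS value from the listing above (the new variable and column names are illustrative):

```scala
// toDF() drops the typed Person view and yields an untyped DataFrame (Dataset[Row])
val personDF2 = personDS.toDF()
personDF2.printSchema()

// toDF can also rename the columns on the way out
val renamed = personDS.toDF("personName", "personAge")
renamed.show()
```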