
1. Generating an RDD


  • From a collection, via the parallelize/makeRDD operators; commonly used for testing
  • From the file system, via operators such as textFile()
  • From another RDD, via transformation operators (a minimal sketch of all three follows this list)
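
A minimal sketch of the three approaches, assuming a SparkSession named spark like the one built in section 4 (the file path is a placeholder):

// 1. From a collection (makeRDD is an equivalent alias), commonly used for testing
val fromCollection = spark.sparkContext.parallelize(Seq(1, 2, 3, 4))

// 2. From the file system (the path is a placeholder)
val fromFile = spark.sparkContext.textFile("path/to/people.txt")

// 3. From another RDD, via a transformation operator
val doubled = fromCollection.map(_ * 2)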


2. Generating a DataFrame


  • Read directly from the file system, e.g. (format and path are placeholders):
val df = spark.read.format("json").load("path/to/people.json")
  • Convert from an RDD
  • Convert from a DataSet (a short sketch of these routes follows this list)
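
A short sketch of the other two routes, assuming a SparkSession named spark (both are covered in detail in sections 4 and 6):

import spark.implicits._

// Convert from an RDD of tuples (the path is a placeholder)
val fromRdd = spark.sparkContext.textFile("path/to/people.txt")
        .map(_.split(","))
        .map(x => (x(0), x(1).trim))
        .toDF("name", "age")

// Convert from a DataSet (ds here is a hypothetical Dataset)
// val fromDs = ds.toDF()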


3. Generating a DataSet
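
  • Convert from an RDD with toDS() or spark.createDataset(rdd) (see section 5)
  • Convert from a DataFrame with df.as[T], where T is a case class (see section 6)
  • Read directly from the file system, e.g. spark.read.textFile(path), which yields a Dataset[String]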



4. Converting between RDD and DataFrame


First, initialize the SparkSession, the unified entry point for Spark SQL.

Java:

package com.kfk.spark.common;

import org.apache.spark.sql.SparkSession;

/**
 * @author : 蔡政洁
 * @email : caizhengjie888@icloud.com
 * @date : 2020/12/2
 * @time : 10:01 PM
 */
public class CommSparkSession {
    public static SparkSession getSparkSession(){
        SparkSession spark = SparkSession.builder()
                .appName("CommSparkSession")
                .master("local")
                .config("spark.sql.warehouse.dir","/Users/caizhengjie/Document/spark/spark-warehouse")
                .getOrCreate();
        return spark;
    }
}

Scala:

package com.kfk.spark.common

import org.apache.spark.sql.SparkSession

/**
 * @author : 蔡政洁
 * @email : caizhengjie888@icloud.com
 * @date : 2020/12/2
 * @time : 10:02 PM
 */
object CommSparkSessionScala {
    def getSparkSession(): SparkSession = {
        val spark = SparkSession
                .builder
                .appName("CommSparkSessionScala")
                .master("local")
                .config("spark.sql.warehouse.dir", "/Users/caizhengjie/Document/spark/spark-warehouse")
                .getOrCreate
        spark
    }
}

(1) RDD to DataFrame

Option 1: pass the field names directly to toDF

Data source:

Michael, 29
Andy, 30
Justin, 19

Using toDF requires importing the implicit conversions:

import spark.implicits._

val path = CommScala.fileDirPath + "people.txt"
val personDf1 = spark.sparkContext.textFile(path).map(line => line.split(",")).map(x => {
    (x(0), x(1).trim)
}).toDF("name", "age")
personDf1.show()
/**
 * +-------+---+
 * |   name|age|
 * +-------+---+
 * |Michael| 29|
 * |   Andy| 30|
 * | Justin| 19|
 * +-------+---+
 */

Option 2: via reflection

Java implementation

Spark SQL supports automatically converting an RDD of JavaBeans into a DataFrame.

package com.kfk.spark.sql;

import java.io.Serializable;

/**
 * @author : 蔡政洁
 * @email : caizhengjie888@icloud.com
 * @date : 2020/12/2
 * @time : 10:15 PM
 */
public class Person implements Serializable {
    private String name;
    private long age;

    public Person(String name, long age) {
        this.name = name;
        this.age = age;
    }

    public Person() {
    }

    public String getName() {
        return name;
    }

    public void setName(String name) {
        this.name = name;
    }

    public long getAge() {
        return age;
    }

    public void setAge(long age) {
        this.age = age;
    }
}
package com.kfk.spark.sql;

import com.kfk.spark.common.Comm;
import com.kfk.spark.common.CommSparkSession;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

/**
 * @author : 蔡政洁
 * @email : caizhengjie888@icloud.com
 * @date : 2020/12/2
 * @time : 10:07 PM
 */
public class RDDToDFReflectionJava {
    public static void main(String[] args) {
        SparkSession spark = CommSparkSession.getSparkSession();

        /**
         * Data source:
         * Michael, 29
         * Andy, 30
         * Justin, 19
         */
        String filePath = Comm.fileDirPath + "people.txt";

        // Convert the file into an RDD of Person beans
        JavaRDD<Person> personrdd = spark.read().textFile(filePath).javaRDD().map(line -> {
            String name = line.split(",")[0];
            long age = Long.parseLong(line.split(",")[1].trim());
            Person person = new Person(name, age);
            return person;
        });

        // Convert the RDD into a DataFrame via reflection
        Dataset<Row> personDf = spark.createDataFrame(personrdd, Person.class);
        personDf.show();
        /**
         * +-------+---+
         * |   name|age|
         * +-------+---+
         * |Michael| 29|
         * |   Andy| 30|
         * | Justin| 19|
         * +-------+---+
         */

        // Create a temporary view
        personDf.createOrReplaceTempView("person");
        Dataset<Row> resultDf = spark.sql("select * from person a where a.age > 20");
        resultDf.show();
        /**
         * +-------+---+
         * |   name|age|
         * +-------+---+
         * |Michael| 29|
         * |   Andy| 30|
         * +-------+---+
         */
    }
}

Scala implementation

In Scala, reflection works through a case class:

package com.kfk.spark.sql

import com.kfk.spark.common.{CommScala, CommSparkSessionScala}

/**
 * @author : 蔡政洁
 * @email : caizhengjie888@icloud.com
 * @date : 2020/12/3
 * @time : 6:32 PM
 */
object RDDToDFReflectionScala {

    case class Person(name : String, age : Long)

    def main(args: Array[String]): Unit = {
        val spark = CommSparkSessionScala.getSparkSession()

        /**
         * Data source:
         * Michael, 29
         * Andy, 30
         * Justin, 19
         */
        val path = CommScala.fileDirPath + "people.txt"

        // Convert the RDD into a DataFrame via case-class reflection
        import spark.implicits._
        val personDf = spark.sparkContext.textFile(path).map(line => line.split(",")).map(x => {
            Person(x(0), x(1).trim.toLong)
        }).toDF()
        personDf.show()

        // Or pass the field names directly into toDF
        val personDf1 = spark.sparkContext.textFile(path).map(line => line.split(",")).map(x => {
            (x(0), x(1).trim)
        }).toDF("name", "age")
        personDf1.show()
        /**
         * +-------+---+
         * |   name|age|
         * +-------+---+
         * |Michael| 29|
         * |   Andy| 30|
         * | Justin| 19|
         * +-------+---+
         */

        // Create a temporary view
        personDf.createOrReplaceTempView("person")
        val resultDf = spark.sql("select * from person a where a.age > 20")
        resultDf.show()
        /**
         * +-------+---+
         * |   name|age|
         * +-------+---+
         * |Michael| 29|
         * |   Andy| 30|
         * +-------+---+
         */
    }
}

Option 3: constructing the schema programmatically

Java implementation

package com.kfk.spark.sql;

import com.kfk.spark.common.Comm;
import com.kfk.spark.common.CommSparkSession;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;
import java.util.ArrayList;
import java.util.List;

/**
 * @author : 蔡政洁
 * @email : caizhengjie888@icloud.com
 * @date : 2020/12/3
 * @time : 9:32 PM
 */
public class RDDToDFProgramJava {
    public static void main(String[] args) {
        SparkSession spark = CommSparkSession.getSparkSession();

        /**
         * Data source:
         * Michael, 29
         * Andy, 30
         * Justin, 19
         */
        String filePath = Comm.fileDirPath + "people.txt";

        // age is kept as a string here; the Scala version below uses LongType instead
        List<StructField> fields = new ArrayList<StructField>();
        StructField structField_Name = DataTypes.createStructField("name", DataTypes.StringType, true);
        StructField structField_Age = DataTypes.createStructField("age", DataTypes.StringType, true);
        fields.add(structField_Name);
        fields.add(structField_Age);

        // Construct the schema
        StructType scheme = DataTypes.createStructType(fields);

        // Convert the RDD of lines into an RDD of Rows
        JavaRDD<Row> personRdd = spark.read().textFile(filePath).javaRDD().map(line -> {
            String[] attributes = line.split(",");
            return RowFactory.create(attributes[0], attributes[1].trim());
        });

        // Create the DataFrame from the RDD and the schema
        Dataset<Row> personDataFrame = spark.createDataFrame(personRdd, scheme);
        personDataFrame.show();
        /**
         * +-------+---+
         * |   name|age|
         * +-------+---+
         * |Michael| 29|
         * |   Andy| 30|
         * | Justin| 19|
         * +-------+---+
         */

        // Create a temporary view
        personDataFrame.createOrReplaceTempView("person");
        Dataset<Row> resultDataFrame = spark.sql("select * from person a where a.age > 20");
        resultDataFrame.show();
        /**
         * +-------+---+
         * |   name|age|
         * +-------+---+
         * |Michael| 29|
         * |   Andy| 30|
         * +-------+---+
         */
    }
}

Scala implementation

package com.kfk.spark.sql

import com.kfk.spark.common.{CommScala, CommSparkSessionScala}
import org.apache.spark.sql.{Dataset, Row}
import org.apache.spark.sql.types.{DataTypes, StructField, StructType}

/**
 * @author : 蔡政洁
 * @email : caizhengjie888@icloud.com
 * @date : 2020/12/3
 * @time : 10:05 PM
 */
object RDDToDFProgramScala {
    def main(args: Array[String]): Unit = {
        val spark = CommSparkSessionScala.getSparkSession()

        /**
         * Data source:
         * Michael, 29
         * Andy, 30
         * Justin, 19
         */
        val path = CommScala.fileDirPath + "people.txt"

        // Construct the schema
        val scheme = StructType(Array(
            StructField("name", DataTypes.StringType, true),
            StructField("age", DataTypes.LongType, true)
        ))

        // Convert the RDD of lines into an RDD[Row]
        val rdd = spark.sparkContext.textFile(path).map(line => {
            line.split(",")
        }).map(x => {
            Row(x(0), x(1).trim.toLong)
        })

        // Create the DataFrame from the RDD and the schema
        val personDataFrame = spark.createDataFrame(rdd, scheme)
        personDataFrame.show()
        /**
         * +-------+---+
         * |   name|age|
         * +-------+---+
         * |Michael| 29|
         * |   Andy| 30|
         * | Justin| 19|
         * +-------+---+
         */

        // Create a temporary view
        personDataFrame.createOrReplaceTempView("person")
        val resultDataFrame = spark.sql("select * from person a where a.age > 20")
        resultDataFrame.show()
        /**
         * +-------+---+
         * |   name|age|
         * +-------+---+
         * |Michael| 29|
         * |   Andy| 30|
         * +-------+---+
         */

        for (elem <- resultDataFrame.collect()) {
            System.out.println(elem)
        }
        /**
         * [Michael,29]
         * [Andy,30]
         */
    }
}

(2) DataFrame to RDD

Convert a DataFrame to an RDD with df.rdd (Scala) or df.javaRDD() (Java).

Java implementation

// Convert the DataFrame into an RDD
JavaRDD<Person> resultRdd = resultDf.javaRDD().map(line -> {
    Person person = new Person();
    person.setName(line.getAs("name"));
    person.setAge(line.getAs("age"));
    return person;
});

for (Person person : resultRdd.collect()){
    System.out.println(person.getName() + " : " + person.getAge());
}
/**
 * Michael : 29
 * Andy : 30
 */

Scala implementation

// Convert the DataFrame into an RDD
val resultRdd = resultDf.rdd.map(row => {
    val name = row.getAs[String]("name")
    val age = row.getAs[Long]("age")
    Person(name, age)
})

for (elem <- resultRdd.collect()) {
    System.out.println(elem.name + " : " + elem.age)
}
/**
 * Michael : 29
 * Andy : 30
 */


5. Converting between RDD and DataSet


(1) RDD to DataSet

Option 1: the toDS() operator

Using the toDS() operator requires importing the implicit conversions. The schema comes from the case class, as in the reflection approach for RDD-to-DataFrame (schema information cannot be passed into toDS() directly).

package com.kfk.spark.sql

import com.kfk.spark.common.{CommScala, CommSparkSessionScala}

/**
 * @author : 蔡政洁
 * @email : caizhengjie888@icloud.com
 * @date : 2020/12/5
 * @time : 4:10 PM
 */
object DataSetDemoScala {

    case class Person(name: String, age: Int)

    def main(args: Array[String]): Unit = {
        val spark = CommSparkSessionScala.getSparkSession()

        /**
         * Data source:
         * Michael, 29
         * Andy, 30
         * Justin, 19
         */
        val path = CommScala.fileDirPath + "people.txt"

        // Using toDS() requires importing the implicit conversions
        import spark.implicits._

        // Option 1: convert the RDD into a DataSet with toDS()
        val dataDS = spark.sparkContext.textFile(path).map(line => line.split(",")).map(x => {
            Person(x(0), x(1).trim.toInt)
        }).toDS()
        dataDS.show()
        /**
         * +-------+---+
         * |   name|age|
         * +-------+---+
         * |Michael| 29|
         * |   Andy| 30|
         * | Justin| 19|
         * +-------+---+
         */
    }
}

Option 2: spark.createDataset(rdd)

package com.kfk.spark.sql

import com.kfk.spark.common.{CommScala, CommSparkSessionScala}

/**
 * @author : 蔡政洁
 * @email : caizhengjie888@icloud.com
 * @date : 2020/12/5
 * @time : 4:10 PM
 */
object DataSetDemoScala {

    case class Person(name: String, age: Int)

    def main(args: Array[String]): Unit = {
        val spark = CommSparkSessionScala.getSparkSession()

        /**
         * Data source:
         * Michael, 29
         * Andy, 30
         * Justin, 19
         */
        val path = CommScala.fileDirPath + "people.txt"

        // createDataset also requires the implicit encoders
        import spark.implicits._

        // Option 2: convert the RDD into a DataSet with spark.createDataset(rdd)
        val rdd = spark.sparkContext.textFile(path).map(line => {
            line.split(",")
        }).map(x => {
            Person(x(0), x(1).trim.toInt)
        })
        val dataDS1 = spark.createDataset(rdd)
        dataDS1.show()
        /**
         * +-------+---+
         * |   name|age|
         * +-------+---+
         * |Michael| 29|
         * |   Andy| 30|
         * | Justin| 19|
         * +-------+---+
         */
    }
}

(2) DataSet to RDD

ds.rdd


// Convert the DataSet into an RDD
val rdd1 = dataDS1.rdd
for (elem <- rdd1.collect()) {
    System.out.println(elem.name + " : " + elem.age)
}
/**
 * Michael : 29
 * Andy : 30
 * Justin : 19
 */


6. Converting between DataFrame and DataSet


(1) DataFrame to DataSet

A DataSet carries the most information and is the most complex of the three, so converting an RDD or a DataFrame into a DataSet requires a case class; going the other way, from the richer DataSet down to a simpler RDD or DataFrame, is just a direct method call.

case class xxx()
df.as[xxx]
package com.kfk.spark.sql

import com.kfk.spark.common.{Comm, CommScala, CommSparkSessionScala}

/**
 * @author : 蔡政洁
 * @email : caizhengjie888@icloud.com
 * @date : 2020/12/5
 * @time : 5:35 PM
 */
object DataFrameToDataSetScala {

    case class Person(name: String, age: Long)

    def main(args: Array[String]): Unit = {
        val spark = CommSparkSessionScala.getSparkSession()
        import spark.implicits._

        /**
         * Data source:
         * Michael, 29
         * Andy, 30
         * Justin, 19
         */
        val path = CommScala.fileDirPath + "people.txt"

        val df = spark.sparkContext.textFile(path).map(line => line.split(",")).map(x => Person(x(0), x(1).trim.toLong)).toDF()
        df.show()

        // Convert the DataFrame into a DataSet
        val personDS = df.as[Person]
        personDS.show()
        personDS.printSchema()

        // Convert the DataSet back into a DataFrame
        val personDF = personDS.toDF()
        personDF.show()
        personDF.printSchema()
        /**
         * +-------+---+
         * |   name|age|
         * +-------+---+
         * |Michael| 29|
         * |   Andy| 30|
         * | Justin| 19|
         * +-------+---+
         * root
         *  |-- name: string (nullable = true)
         *  |-- age: long (nullable = false)
         */
    }
}

(2) DataSet to DataFrame

ds.toDF()
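
As the listing above already shows, toDF() on a DataSet needs no case class or implicits. A minimal sketch using the personDS from the previous listing (the new column names are illustrative):

// Drop the typed view: the result is an untyped DataFrame (Dataset[Row])
val personDF2 = personDS.toDF()

// toDF can also assign new column names in the same call
val renamedDF = personDS.toDF("person_name", "person_age")
renamedDF.printSchema()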

