Spark 结构化API DataFrames

结构化API

结构化 API 是处理各种数据的工具,从非结构化 log 文件到半结构化 CSV 文件和高度结构化的 Parquet 文件, spark 中三种主要的三类结构化 API 为:

  • Datasets
  • DataFrames
  • SQL tables and views
  • App dag stage task

    DataFrame Dataset

    DataFrame、 Dataset 是(分布式)1-。4类表集合具有定义良好的行和列
    DataFrame 无类型(runtime check)
    Dataset 有类型 (compile time)

    对于 Spark 来说, DataFrame 是类型为 Row Dataset

  • immutable
  • lazily evaluated plans
  • .Scala type reference

    Schemas

    模式定义了一个 DataFrame 的列名和类型。
    可以手动定义 schema 或者 schema on read
    Spark 类型直接映射到Spark维护的不同语言 api ,在 Scala、Java、Python、SQL R 中,每种 api 都有一个查询表,简单的说最终代码 使用纯 spark 执行( Spark’s internal Catalyst representation)

    结构化```API的执行流程

  • 编写 DataFrame/Dataset/SQL 代码.
  • 如果代码是正确的spark将其转化为 Logical Plan .
  • Logical Plan 转为 Physical Plan
    4. Spark 在集群上执行 Physical Plan ( RDD 操作)

    catalogy 是一个包含所有 table DataFrame 信息的仓库,用于check代码是否有问题 (eg: table column 不存在)
    check 通过的plan 通过 Catalyst Optimizer 优化
    用户可以扩展 Catalyst 自定义优化规则

    Physical Planning

    org.apache.spark.sql.types.StructType = ...
    StructType(StructField(DEST_COUNTRY_NAME,StringType,true),
    StructField(ORIGIN_COUNTRY_NAME,StringType,true),
    StructField(count,LongType,true))
    spark 可以根据文件的前几行推断出schema(schema on read)
    schemaStructType 实例
    StructType  由一个StructFields 构成
    Boolean代表这个列是否可以为空
    

    手动指定Schema

    // in Scala
    import org.apache.spark.sql.types.{StructField, StructType, StringType, LongType}
    import org.apache.spark.sql.types.Metadata
    val myManualSchema = StructType(Array(StructField("DEST_COUNTRY_NAME", StringType,true),
      StructField("ORIGIN_COUNTRY_NAME", StringType, true),
      StructField("count", LongType, false,
      Metadata.fromJson("{\"hello\":\"world\"}"))
    val df = spark.read.format("json").schema(myManualSchema).load("/data/flight-data/json/2015-summary.json")
    

    Columns

    spark来说, 列是一种逻辑结构,它仅表示通过表达式按每个记录计算的值. 这意味着要为列赋实值,我们需要有一行;为了得到一行,我们需要一个DataFrame

    构造和引用列的两种最基本的方式:
    colcolumn 方法

    Column作为表达式

    expr函数实际上可以解析字符串中的转换和列引用,然后可以将它们传递到进一步的转换中,下面三者等价:

    col("someCol") - 5
    expr("someCol - 5")
    expr("someCol") - 5
    

    Columns 只是表达式
    这些列和这些列的转换编译成与解析表达式相同的逻辑计划

    这意味着您可以将表达式编写为DataFrame代码或SQL表达式,并获得完全相同的性能特征

    from pyspark.sql.functions import expr, col, column
    df.select(
    expr("DEST_COUNTRY_NAME"),
    col("DEST_COUNTRY_NAME"),
    column("DEST_COUNTRY_NAME"))\
    .show(2)
    

    Records and Rows

    spark中一行使用Row对象表示,Spark使用列表达式操作行对象,以生成可用的值,行对象在内部表示字节数组。字节数组接口永远不会显示给用户,因为我们只使用列表达式来操作它们

    df.show(2)
    +-----------------+-------------------+-----+
    |DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|
    +-----------------+-------------------+-----+
    |    United States|            Romania|   15|
    |    United States|            Croatia|    1|
    +-----------------+-------------------+-----+
    

    select and selectExpr

    df.select("DEST_COUNTRY_NAME").show(2)
    -- in SQL
    SELECT DEST_COUNTRY_NAME FROM dfTable LIMIT 2
    # select muti columns
    df.select("DEST_COUNTRY_NAME", "ORIGIN_COUNTRY_NAME").show(2)
    # select function favor
    df.select(expr("DEST_COUNTRY_NAME").alias("dest")).show(2)
    # select sql favor
    df.select(expr("DEST_COUNTRY_NAME AS destination")).show(2)
    # selectExpr
    df.selectExpr("DEST_COUNTRY_NAME as dest", "ORIGIN_COUNTRY_NAME").show(2)
    # selectExpr  opens up the true power of Spark
    df.selectExpr("*",
                  "(DEST_COUNTRY_NAME=ORIGIN_COUNTRY_NAME) as withinCountry").show(2)
    df.selectExpr("avg(count)", "count(distinct(DEST_COUNTRY_NAME))").show(1)
    
    df.select(expr("*"), lit(1).alias("One")).show(2)
    
    df.withColumn("numberOne", lit(1)).show(2)
    -- in SQL
    SELECT *, 1 as numberOne FROM dfTable LIMIT 2
    df.withColumn("withinCountry", expr("ORIGIN_COUNTRY_NAME == DEST_COUNTRY_NAME"))
    .show(2)
    //rename
    df.withColumn("Destination", expr("DEST_COUNTRY_NAME")).columns
    
    df.withColumnRenamed("DEST_COUNTRY_NAME", "dest").columns
    

    保留字符和关键字

    withColumn不用特殊处理
    selectExpr需要加上`

    dfWithLongColumnName = df.withColumn("this is a long column", expr("ORIGIN_COUNTRY_NAME"))
    dfWithLongColumnName.selectExpr("`this is a long column` as `long column`").show(2)
    

    大小写敏感性

    默认spark大小写不敏感,可以通过配置开启大小写敏感

    set spark.sql.caseSensitive true
    
    df.drop("ORIGIN_COUNTRY_NAME").columns
    

    改变列的类型cast

    df.withColumn("count2", col("count").cast("long"))
    -- in SQL
    SELECT *, cast(count as long) AS count2 FROM dfTabl
    

    wherefilter有一样的过滤功能

    df.filter(col("count") < 2).sho
    df.where("count < 2").show(2)
    

    spark自动在同一时间执行所有过滤操作,而不管过滤器的顺序如何。这意味着,如果您想指定多个过滤器,只需按顺序将它们链接起来,其余的由Spark处理

    df.where(col("count") < 2).where(col("ORIGIN_COUNTRY_NAME") != "Croatia")\
    .show(2)
    

    Unique

    df.select("ORIGIN_COUNTRY_NAME", "DEST_COUNTRY_NAME").distinct().count()
    -- in SQL
    SELECT COUNT(DISTINCT(ORIGIN_COUNTRY_NAME, DEST_COUNTRY_NAME)) FROM dfTable
    
    dataFrames = df.randomSplit([0.25, 0.75], seed)
    dataFrames[0].count() > dataFrames[1].count() # False
    

    Union

    from pyspark.sql import Row
    schema = df.schema
    newRows = [
    Row("New Country", "Other Country", 5L),
    Row("New Country 2", "Other Country 3", 1L)
    parallelizedRows = spark.sparkContext.parallelize(newRows)
    newDF = spark.createDataFrame(parallelizedRows, schema)
    df.union(newDF)\
    .where("count = 1")\
    .where(col("ORIGIN_COUNTRY_NAME") != "United States")\
    .show()
    

    sortorderBy 有相同的排序功能

    df.sort("count").show(5)
    df.orderBy("count", "DEST_COUNTRY_NAME").show(5)
    df.orderBy(col("count"), col("DEST_COUNTRY_NAME")).show(5)
    from pyspark.sql.functions import desc, asc
    df.orderBy(expr("count desc")).show(2)
    df.orderBy(col("count").desc(), col("DEST_COUNTRY_NAME").asc()).show(2)
    -- in SQL
    SELECT * FROM dfTable ORDER BY count DESC, DEST_COUNTRY_NAME ASC LIMIT 2
    

    一个高级技巧是使用asc_nulls_firstdesc_nulls_first、asc_nulls_lastdesc_nulls_last来指定希望DataFrame空值按顺序出现在哪里

    出于优化目的,有时建议先对每个分区排序然后执行之后的transformations

    spark.read.format("json").load("/data/flight-data/json/*-summary.json")\
    .sortWithinPartitions("count")
    

    Limit

    df.limit(5).show()
    -- in SQL
    SELECT * FROM dfTable LIMIT 6
    df.orderBy(expr("count desc")).limit(6).show()
    -- in SQL
    SELECT * FROM dfTable ORDER BY count desc LIMIT 6
    

    重新分区和合并

    Sparkdriver程序中维护集群的状态。有时您需要向driver程序收集一些数据,以便在本地机器上对其进行操作
    到目前为止,我们还没有明确定义这个操作。然而,我们使用了几种不同的方法来实现这一点,它们实际上都是一样的
    collect从整个DataFrame获取所有数据,take获取前N行,show以表格样式打印

    collectDF = df.limit(10)
    collectDF.take(5) # take works with an Integer count