结构化API
          结构化
          
           API
          
          是处理各种数据的工具,从非结构化
          
           log
          
          文件到半结构化
          
           CSV
          
          文件和高度结构化的
          
           Parquet
          
          文件,
          
           spark
          
          中三种主要的三类结构化
          
           API
          
          为:
         
           Datasets
          
         
           DataFrames
          
         
           SQL tables and views
          
         App dag stage task
DataFrame Dataset
          
           DataFrame、 Dataset
          
          是(分布式)1-。4类表集合具有定义良好的行和列
          
          
           DataFrame
          
          无类型(runtime check)
          
          
           Dataset
          
          有类型 (compile time)
         
          对于
          
           Spark
          
          来说,
          
           DataFrame
          
          是类型为
          
           Row
          
          的
          
           Dataset
          
          
         
           immutable
          
         
           lazily evaluated plans
          
         .Scala type reference
Schemas
          模式定义了一个
          
           DataFrame
          
          的列名和类型。
          
          可以手动定义
          
           schema
          
          或者
          
           schema on read
          
          
          
           Spark
          
          类型直接映射到Spark维护的不同语言
          
           api
          
          ,在
          
           Scala、Java、Python、SQL
          
          和
          
           R
          
          中,每种
          
           api
          
          都有一个查询表,简单的说最终代码 使用纯
          
           spark
          
          执行( Spark’s internal Catalyst representation)
         
结构化```API的执行流程
           DataFrame/Dataset/SQL
          
          代码.
         
           Logical Plan
          
          .
         
          Logical Plan
         
         转为
         
          Physical Plan
         
         4.
          Spark
         
         在集群上执行
         
          Physical Plan
         
         (
         
          RDD
         
         操作)
         
          
           catalogy
          
          是一个包含所有
          
           table
          
          和
          
           DataFrame
          
          信息的仓库,用于check代码是否有问题 (eg:
          
           table column
          
          不存在)
          
          
           check
          
          通过的plan 通过
          
           Catalyst Optimizer
          
          优化
          
          用户可以扩展
          
           Catalyst
          
          自定义优化规则
         
Physical Planning
org.apache.spark.sql.types.StructType = ...
StructType(StructField(DEST_COUNTRY_NAME,StringType,true),
StructField(ORIGIN_COUNTRY_NAME,StringType,true),
StructField(count,LongType,true))
spark 可以根据文件的前几行推断出schema(schema on read)
schema 是 StructType 实例
StructType  由一个StructFields 构成
Boolean代表这个列是否可以为空
手动指定Schema
// in Scala
import org.apache.spark.sql.types.{StructField, StructType, StringType, LongType}
import org.apache.spark.sql.types.Metadata
val myManualSchema = StructType(Array(StructField("DEST_COUNTRY_NAME", StringType,true),
  StructField("ORIGIN_COUNTRY_NAME", StringType, true),
  StructField("count", LongType, false,
  Metadata.fromJson("{\"hello\":\"world\"}"))
val df = spark.read.format("json").schema(myManualSchema).load("/data/flight-data/json/2015-summary.json")
Columns
对spark来说, 列是一种逻辑结构,它仅表示通过表达式按每个记录计算的值. 这意味着要为列赋实值,我们需要有一行;为了得到一行,我们需要一个DataFrame
构造和引用列的两种最基本的方式:
col 或column 方法
Column作为表达式
expr函数实际上可以解析字符串中的转换和列引用,然后可以将它们传递到进一步的转换中,下面三者等价:
col("someCol") - 5
expr("someCol - 5")
expr("someCol") - 5
Columns 只是表达式
这些列和这些列的转换编译成与解析表达式相同的逻辑计划
这意味着您可以将表达式编写为DataFrame代码或SQL表达式,并获得完全相同的性能特征
from pyspark.sql.functions import expr, col, column
df.select(
expr("DEST_COUNTRY_NAME"),
col("DEST_COUNTRY_NAME"),
column("DEST_COUNTRY_NAME"))\
.show(2)
Records and Rows
spark中一行使用Row对象表示,Spark使用列表达式操作行对象,以生成可用的值,行对象在内部表示字节数组。字节数组接口永远不会显示给用户,因为我们只使用列表达式来操作它们
df.show(2)
+-----------------+-------------------+-----+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|
+-----------------+-------------------+-----+
|    United States|            Romania|   15|
|    United States|            Croatia|    1|
+-----------------+-------------------+-----+
select and selectExpr
df.select("DEST_COUNTRY_NAME").show(2)
-- in SQL
SELECT DEST_COUNTRY_NAME FROM dfTable LIMIT 2
# select muti columns
df.select("DEST_COUNTRY_NAME", "ORIGIN_COUNTRY_NAME").show(2)
# select function favor
df.select(expr("DEST_COUNTRY_NAME").alias("dest")).show(2)
# select sql favor
df.select(expr("DEST_COUNTRY_NAME AS destination")).show(2)
# selectExpr
df.selectExpr("DEST_COUNTRY_NAME as dest", "ORIGIN_COUNTRY_NAME").show(2)
# selectExpr  opens up the true power of Spark
df.selectExpr("*",
              "(DEST_COUNTRY_NAME=ORIGIN_COUNTRY_NAME) as withinCountry").show(2)
df.selectExpr("avg(count)", "count(distinct(DEST_COUNTRY_NAME))").show(1)
df.select(expr("*"), lit(1).alias("One")).show(2)
df.withColumn("numberOne", lit(1)).show(2)
-- in SQL
SELECT *, 1 as numberOne FROM dfTable LIMIT 2
df.withColumn("withinCountry", expr("ORIGIN_COUNTRY_NAME == DEST_COUNTRY_NAME"))
.show(2)
//rename
df.withColumn("Destination", expr("DEST_COUNTRY_NAME")).columns
df.withColumnRenamed("DEST_COUNTRY_NAME", "dest").columns
保留字符和关键字
withColumn不用特殊处理
selectExpr需要加上`
dfWithLongColumnName = df.withColumn("this is a long column", expr("ORIGIN_COUNTRY_NAME"))
dfWithLongColumnName.selectExpr("`this is a long column` as `long column`").show(2)
大小写敏感性
默认spark大小写不敏感,可以通过配置开启大小写敏感
set spark.sql.caseSensitive true
df.drop("ORIGIN_COUNTRY_NAME").columns
改变列的类型cast
df.withColumn("count2", col("count").cast("long"))
-- in SQL
SELECT *, cast(count as long) AS count2 FROM dfTabl
where和filter有一样的过滤功能
df.filter(col("count") < 2).sho
df.where("count < 2").show(2)
spark自动在同一时间执行所有过滤操作,而不管过滤器的顺序如何。这意味着,如果您想指定多个过滤器,只需按顺序将它们链接起来,其余的由Spark处理
df.where(col("count") < 2).where(col("ORIGIN_COUNTRY_NAME") != "Croatia")\
.show(2)
Unique
df.select("ORIGIN_COUNTRY_NAME", "DEST_COUNTRY_NAME").distinct().count()
-- in SQL
SELECT COUNT(DISTINCT(ORIGIN_COUNTRY_NAME, DEST_COUNTRY_NAME)) FROM dfTable
dataFrames = df.randomSplit([0.25, 0.75], seed)
dataFrames[0].count() > dataFrames[1].count() # False
Union
from pyspark.sql import Row
schema = df.schema
newRows = [
Row("New Country", "Other Country", 5L),
Row("New Country 2", "Other Country 3", 1L)
parallelizedRows = spark.sparkContext.parallelize(newRows)
newDF = spark.createDataFrame(parallelizedRows, schema)
df.union(newDF)\
.where("count = 1")\
.where(col("ORIGIN_COUNTRY_NAME") != "United States")\
.show()
sort和orderBy  有相同的排序功能
df.sort("count").show(5)
df.orderBy("count", "DEST_COUNTRY_NAME").show(5)
df.orderBy(col("count"), col("DEST_COUNTRY_NAME")).show(5)
from pyspark.sql.functions import desc, asc
df.orderBy(expr("count desc")).show(2)
df.orderBy(col("count").desc(), col("DEST_COUNTRY_NAME").asc()).show(2)
-- in SQL
SELECT * FROM dfTable ORDER BY count DESC, DEST_COUNTRY_NAME ASC LIMIT 2
一个高级技巧是使用asc_nulls_first、desc_nulls_first、asc_nulls_last或desc_nulls_last来指定希望DataFrame空值按顺序出现在哪里
出于优化目的,有时建议先对每个分区排序然后执行之后的transformations
spark.read.format("json").load("/data/flight-data/json/*-summary.json")\
.sortWithinPartitions("count")
Limit
df.limit(5).show()
-- in SQL
SELECT * FROM dfTable LIMIT 6
df.orderBy(expr("count desc")).limit(6).show()
-- in SQL
SELECT * FROM dfTable ORDER BY count desc LIMIT 6
重新分区和合并
Spark在driver程序中维护集群的状态。有时您需要向driver程序收集一些数据,以便在本地机器上对其进行操作
到目前为止,我们还没有明确定义这个操作。然而,我们使用了几种不同的方法来实现这一点,它们实际上都是一样的
collect从整个DataFrame获取所有数据,take获取前N行,show以表格样式打印
collectDF = df.limit(10)
collectDF.take(5) # take works with an Integer count


