添加链接
link之家
链接快照平台
  • 输入网页链接,自动生成快照
  • 标签化管理网页链接
Build In Functions

Build In Functions

前言

需要处理的数据结构往往是复杂的,在 Spark 中该如何操作 Map,Array,struct 这些结构呢?Spark 已经为我们提供了很多内置函数来处理这一切。这些函数大多定义在 org.apache.saprk.sql.functions。

日期处理

获取当前 date,timestamp

current_date,current_timestamp,包括下面说的处理函数可以直接在 sql 语句中使用,spark.sql("Your SQL")

scala> spark.emptyDataFrame.withColumn("current_date", current_date).schema
res3: org.apache.spark.sql.types.StructType = StructType(StructField(current_date,DateType,false))
scala> spark.range(1).select(current_date).show()
+--------------+
|current_date()|
+--------------+
|    2020-03-17|
+--------------+
scala> spark.range(1).select(current_timestamp).show()
+--------------------+
| current_timestamp()|
+--------------------+
|2020-03-17 10:37:...|
+--------------------+
scala> spark.range(1).select(current_timestamp).show(false)
+-----------------------+
|current_timestamp()    |
+-----------------------+
|2020-03-17 10:39:53.896|
+-----------------------+

column 转换到 date,timestamp,时间戳

to_date,to_timestamp 接受 date,timestamp ,string 类型的参数。如果参数为 string,指定可选的 format,转换失败不会报错,会返回 null。默认的 format 是 yyyy-MM-dd HH:mm:ss。

unix_timestamp 返回当前时间戳或者根据指定字段生成时间戳,这个经常会用到。

from_unixtime 转换时间戳到 timestamp

scala> spark.range(1).select(current_date).withColumn("to_timestamp", to_timestamp($"current_date()", "yyyy-MM-dd HH:mm:ss")).show()
+--------------+-------------------+
|current_date()|       to_timestamp|
+--------------+-------------------+
|    2020-03-17|2020-03-17 00:00:00|
+--------------+-------------------+
scala> spark.range(1).select(current_timestamp).withColumn("to_date", to_date($"current_timestamp()")).show()
+--------------------+----------+
| current_timestamp()|   to_date|
+--------------------+----------+
|2020-03-17 10:43:...|2020-03-17|
+--------------------+----------+
scala> Seq("2020-03-17 00:00:00").toDF("time").withColumn("unix_timestamp", unix_timestamp($"time")).show()
+-------------------+--------------+
|               time|unix_timestamp|
+-------------------+--------------+
|2020-03-17 00:00:00|    1584374400|
+-------------------+--------------+
scala> spark.range(1).select(unix_timestamp).show()
+--------------------------------------------------------+
|unix_timestamp(current_timestamp(), yyyy-MM-dd HH:mm:ss)|
+--------------------------------------------------------+
|                                              1584413996|
+--------------------------------------------------------+
scala> spark.sql("select current_date as current_date").show()
+------------+
|current_date|
+------------+
|  2020-03-17|
+------------+
scala> Seq(1582720143).toDF("time").withColumn("from_unixtime", from_unixtime($"time")).show()
+----------+-------------------+
|      time|      from_unixtime|
+----------+-------------------+
|1582720143|2020-02-26 20:29:03|
+----------+-------------------+

格式化日期

date_format 用来格式化日期

scala> Seq("2020-03-17 00:00:00").toDF("time").withColumn("date_foramt", date_format($"time", "yyyy/MM/dd hh:mm:ss")).show()
+-------------------+-------------------+
|               time|        date_foramt|
+-------------------+-------------------+
|2020-03-17 00:00:00|2020/03/17 12:00:00|
+-------------------+-------------------+

复杂数据结构处理

就跟我们平常使用这些数据结构一样,取元素,追加元素,求长度,根据里面的数据做处理,修改数组,记录状态等。 org.apache.spark.sql.functions 中 @group 为 collection_funcs 的函数就是用来实现这些功能的。下面只介绍下经常遇到和使用的函数,更多的可以去看源码。

处理 Array

Method Description
array_contains 某个值是否在 array 中
array_join 类似于 Python 中的 “,”.join。指定分隔符和可选的空值替换符连接 array 中的值
element_at 按索引取值,索引从 1 开始。
explode 平铺数组中内容到多行
reverse 反转数组
array_zip |合并数组到结构体,array1 array2 → array

这里 explode 比较重要,像一个用户的多个事件被存在了数组里面,就可以使用 explode 提取出来,变成一行一个事件处理。explode 变多行,那么我想把数组里面的值依次取出来变成多列呢?

这是一个问题,你可以去了解下 transform,想想如何实现。

再有一些比较复杂的操作,比如我们想通过数组前面的均值来计算填充 null 值,这时候你就需要用 UDF 自己写处理逻辑来实现了。

scala> val array = Seq("1 2 3 4 5").toDF("id_string").withColumn("id_array", split($"id_string", " "))
array: org.apache.spark.sql.DataFrame = [id_string: string, id_array: array<string>]
scala> array.show()
+---------+---------------+
|id_string|       id_array|
+---------+---------------+
|1 2 3 4 5|[1, 2, 3, 4, 5]|
+---------+---------------+
scala> array.withColumn("head", element_at($"id_array", 1)).show()
+---------+---------------+----+
|id_string|       id_array|head|
+---------+---------------+----+
|1 2 3 4 5|[1, 2, 3, 4, 5]|   1|
+---------+---------------+----+
scala> array.withColumn("array_contains", array_contains($"id_array", "1")).show()
+---------+---------------+--------------+
|id_string|       id_array|array_contains|
+---------+---------------+--------------+
|1 2 3 4 5|[1, 2, 3, 4, 5]|          true|
+---------+---------------+--------------+
scala> array.withColumn("array_join", array_join($"id_array", ",")).show()
+---------+---------------+----------+
|id_string|       id_array|array_join|
+---------+---------------+----------+
|1 2 3 4 5|[1, 2, 3, 4, 5]| 1,2,3,4,5|
+---------+---------------+----------+
scala> array.select(explode($"id_array")).show()
+---+
|col|
+---+
|  1|
|  2|
|  3|
|  4|
|  5|
+---+
scala> val df = spark.sql("select arrays_zip(array(1,2,3),array('4','5')) as array_zip")
df: org.apache.spark.sql.DataFrame = [array_zip: array<struct<0:int,1:string>>]
scala> df.show(false)
+----------------------+
|array_zip             |
+----------------------+
|[[1, 4], [2, 5], [3,]]|
+----------------------+

处理 Map

Method Description
map_keys 返回 map 的 key
map_values 返回 map 的 value
map_from_arrays 接收两个数组,第一个数组为 key 数组,第二个数组为 value 数组,key 数组不允许存在空值。
getField / getItem 根据 key 取值

不同于结构体,map 里面的数据类型要求是一致的。

scala> val map = Seq(Map(1 -> "1"), Map(2 -> "2")).toDF("map")
map: org.apache.spark.sql.DataFrame = [map: map<int,string>]
scala> map.show()
+--------+
|     map|
+--------+
|[1 -> 1]|
|[2 -> 2]|
+--------+
scala> map.withColumn("getField", $"map".getItem(1)).show()
+--------+--------+
|     map|getField|
+--------+--------+
|[1 -> 1]|       1|
|[2 -> 2]|    null|
+--------+--------+

处理 struct

类似于 C++ 中的结构体。在 Scala 中可以用 case class 实现。取出数据可以用 getField。如果你想构造自己的结构体,可以直接使用 struct 函数,接收多列组合成一列,组合列的类型是结构体。如果你想展开结构体,可以直接在 select 中使用 * 通配符。

scala> val simple_struct = df.select(explode($"array_zip"))
simple_struct: org.apache.spark.sql.DataFrame = [col: struct<0: int, 1: string>]
scala> simple_struct.select("col.*").show()
+---+----+
|  0|   1|
+---+----+
|  1|   4|
|  2|   5|
|  3|null|
+---+----+
scala> spark.sql("select struct(1, 2, 3, 4,  5)").show()
+---------------------------------------------------------+
|named_struct(col1, 1, col2, 2, col3, 3, col4, 4, col5, 5)|
+---------------------------------------------------------+
|                                          [1, 2, 3, 4, 5]|
+---------------------------------------------------------+

处理 Json

Spark 在读取 json 数据格式的时候会自动解析出各列。有时候你可能需要在数据中生成 json 字符串。这里会用到 to_json。

聚合计算

聚合计算主要是 sum,min,max,avg,countDistinct 等等,常和 groupBy,agg 等结合使用。在后面的 Aggregate 中详细讲

其他

这里的其他函数指的是 org.apache.spark.sql.functions 中 @group 为 normal_funcs 的函数。介绍一下常用的。

Method Description
lit 生成一个 Column,该 Column 的值是不变的
typedLit 生成一个 Column,该 Column 的值是不变的,和上面的区别是支持 Scala 中的 Seq,Map,List 类型
struct 生成结构体
when 类似于 sql 中的 when,和 otherwise 结合使用

tip:对于 lit,我们在指定 track 的类型。对于 typedLit,我们可以用于指定生成一个空的 properties。对与 struct,我们用来选取指定字段构造 properties。这里直接拿一段 Spark SQL 的代码举例。

pvInfoDF
      .select(cols.map(col(_)): _*)
      .filter(adjust_valid)
      .filter(delimiter_valid)
      .filter(!col("$ip").contains(","))
      .filter($"channel_id" isin (channel: _*))
      .na.replace(cols, Map("-" -> ""))
      .withColumn("original_id", UDF.getUUID($"original_id"))
      .withColumn("distinct_id", when($"distinct_id".notEqual(""), $"distinct_id").otherwise($"original_id"))
      .withColumn("hour", UDF.intUDF($"hour"))
      .withColumn("type", lit("track"))
      .withColumn("time_free", lit(true))