spark import org.apache.spark.sql.functions._ 示例用法源码权威资料
spark sql import org.apache.spark.sql.functions.示例用法源码权威资料
源自专栏《 SparkML:Spark ML、原理、床头书、调优、Graphx、pyspark、sparkSQL、yarn集群、源码解析等系列专栏目录 》【持续更新中,收藏关注楼主就不会错过更多优质spark资料】
object functions概览
/**
* DataFrame操作中常用的函数。使用这里定义的函数可以提供更多的编译时安全性,以确保函数存在。
* Spark还包括了一些不太常见的内置函数,这些函数在这里没有定义。您仍然可以使用`functions.expr()` API来访问它们(以及这里定义的所有函数),
* 并通过SQL表达式字符串来调用它们。您可以在SQL API文档中找到完整的函数列表。
* 例如,`isnan`是在这里定义的一个函数。您可以使用`isnan(col("myCol"))`来调用`isnan`函数。这样,编程语言的编译器就会确保`isnan`
* 存在并且具有正确的形式。您也可以使用`expr("isnan(myCol)")`函数来调用相同的函数。在这种情况下,当Spark分析查询时,它自己会确保`isnan`存在。
* `regr_count`是一个内置函数的示例,但在这里没有定义,因为它不常用。要调用它,请使用`expr("regr_count(yCol, xCol)")`。
* @groupname udf_funcs UDF函数
* @groupname agg_funcs 聚合函数
* @groupname datetime_funcs 日期时间函数
* @groupname sort_funcs 排序函数
* @groupname normal_funcs 非聚合函数
* @groupname math_funcs 数学函数
* @groupname misc_funcs 杂项函数
* @groupname window_funcs 窗口函数
* @groupname string_funcs 字符串函数
* @groupname collection_funcs 集合函数
* @groupname Ungrouped DataFrames的支持函数
* @since 1.3.0
@InterfaceStability.Stable
object functions {
private def withExpr(expr: Expression): Column = Column(expr)
private def withAggregateFunction(
func: AggregateFunction,
isDistinct: Boolean = false): Column = {
Column(func.toAggregateExpression(isDistinct))
}
排序函数
//////////////////////////////////////////////////////////////////////////////////////////////
// 排序函数
//////////////////////////////////////////////////////////////////////////////////////////////
* 返回基于列的升序排序表达式。
* {{{
* df.sort(asc("dept"), desc("age"))
* }}}
* @group sort_funcs
* @since 1.3.0
def asc(columnName: String): Column = Column(columnName).asc
* 返回基于列的升序排序表达式,空值会排在非空值之前。
* {{{
* df.sort(asc_nulls_first("dept"), desc("age"))
* }}}
* @group sort_funcs
* @since 2.1.0
def asc_nulls_first(columnName: String): Column = Column(columnName).asc_nulls_first
* 返回基于列的升序排序表达式,空值会排在非空值之后。
* {{{
* df.sort(asc_nulls_last("dept"), desc("age"))
* }}}
* @group sort_funcs
* @since 2.1.0
def asc_nulls_last(columnName: String): Column = Column(columnName).asc_nulls_last
* 返回基于列的降序排序表达式。
* {{{
* df.sort(asc("dept"), desc("age"))
* }}}
* @group sort_funcs
* @since 1.3.0
def desc(columnName: String): Column = Column(columnName).desc
* 返回基于列的降序排序表达式,空值会排在非空值之前。
* {{{
* df.sort(asc("dept"), desc_nulls_first("age"))
* }}}
* @group sort_funcs
* @since 2.1.0
def desc_nulls_first(columnName: String): Column = Column(columnName).desc_nulls_first
* 返回基于列的降序排序表达式,空值会排在非空值之后。
* {{{
* df.sort(asc("dept"), desc_nulls_last("age"))
* }}}
* @group sort_funcs
* @since 2.1.0
def desc_nulls_last(columnName: String): Column = Column(columnName).desc_nulls_last
agg_funcs 聚合函数
//////////////////////////////////////////////////////////////////////////////////////////////
// 聚合函数
//////////////////////////////////////////////////////////////////////////////////////////////
* @group agg_funcs
* @since 1.3.0
@deprecated("Use approx_count_distinct", "2.1.0")
def approxCountDistinct(e: Column): Column = approx_count_distinct(e)
* @group agg_funcs
* @since 1.3.0
@deprecated("Use approx_count_distinct", "2.1.0")
def approxCountDistinct(columnName: String): Column = approx_count_distinct(columnName)
* @group agg_funcs
* @since 1.3.0
@deprecated("Use approx_count_distinct", "2.1.0")
def approxCountDistinct(e: Column, rsd: Double): Column = approx_count_distinct(e, rsd)
* @group agg_funcs
* @since 1.3.0
@deprecated("Use approx_count_distinct", "2.1.0")
def approxCountDistinct(columnName: String, rsd: Double): Column = {
approx_count_distinct(Column(columnName), rsd)
* 聚合函数:返回组中不同项的近似数量。
* @group agg_funcs
* @since 2.1.0
def approx_count_distinct(e: Column): Column = withAggregateFunction {
HyperLogLogPlusPlus(e.expr)
* 聚合函数:返回组中不同项的近似数量。
* @group agg_funcs
* @since 2.1.0
def approx_count_distinct(columnName: String): Column = approx_count_distinct(column(columnName))
* 聚合函数:返回组中不同项的近似数量。
* @param rsd 允许的最大估计误差(默认值 = 0.05)
* @group agg_funcs
* @since 2.1.0
def approx_count_distinct(e: Column, rsd: Double): Column = withAggregateFunction {
HyperLogLogPlusPlus(e.expr, rsd, 0, 0)
* 聚合函数:返回组中不同项的近似数量。
* @param rsd 允许的最大估计误差(默认值 = 0.05)
* @group agg_funcs
* @since 2.1.0
def approx_count_distinct(columnName: String, rsd: Double): Column = {
approx_count_distinct(Column(columnName), rsd)
* 聚合函数:返回组中值的平均值。
* @group agg_funcs
* @since 1.3.0
def avg(e: Column): Column = withAggregateFunction { Average(e.expr) }
* 聚合函数:返回组中值的平均值。
* @group agg_funcs
* @since 1.3.0
def avg(columnName: String): Column = avg(Column(columnName))
* 聚合函数:返回一个带有重复值的对象列表。
* @note 此函数是非确定性的,因为收集结果的顺序取决于行的顺序,在洗牌后可能是非确定性的。
* @group agg_funcs
* @since 1.6.0
def collect_list(e: Column): Column = withAggregateFunction { CollectList(e.expr) }
* 聚合函数:返回一个带有重复值的对象列表。
* @note 此函数是非确定性的,因为收集结果的顺序取决于行的顺序,在洗牌后可能是非确定性的。
* @group agg_funcs
* @since 1.6.0
def collect_list(columnName: String): Column = collect_list(Column(columnName))
* 聚合函数:返回一个去除重复元素的对象集合。
* @note 此函数是非确定性的,因为收集结果的顺序取决于行的顺序,在洗牌后可能是非确定性的。
* @group agg_funcs
* @since 1.6.0
def collect_set(e: Column): Column = withAggregateFunction { CollectSet(e.expr) }
* 聚合函数:返回一个去除重复元素的对象集合。
* @note 此函数是非确定性的,因为收集结果的顺序取决于行的顺序,在洗牌后可能是非确定性的。
* @group agg_funcs
* @since 1.6.0
def collect_set(columnName: String): Column = collect_set(Column(columnName))
* 聚合函数:返回两列的皮尔逊相关系数。
* @group agg_funcs
* @since 1.6.0
def corr(column1: Column, column2: Column): Column = withAggregateFunction {
Corr(column1.expr, column2.expr)
* 聚合函数:返回两列的皮尔逊相关系数。
* @group agg_funcs
* @since 1.6.0
def corr(columnName1: String, columnName2: String): Column = {
corr(Column(columnName1), Column(columnName2))
* 聚合函数:返回组中项的数量。
* @group agg_funcs
* @since 1.3.0
def count(e: Column): Column = withAggregateFunction {
e.expr match {
// 将count(*)转换为count(1)
case s: Star => Count(Literal(1))
case _ => Count(e.expr)
* 聚合函数:返回分组中的项目数。
* @group agg_funcs
* @since 1.3.0
def count(columnName: String): TypedColumn[Any, Long] =
count(Column(columnName)).as(ExpressionEncoder[Long]())
* 聚合函数:返回分组中不同项目的数量。
* @group agg_funcs
* @since 1.3.0
@scala.annotation.varargs
def countDistinct(expr: Column, exprs: Column*): Column = {
withAggregateFunction(Count.apply((expr +: exprs).map(_.expr)), isDistinct = true)
* 聚合函数:返回分组中不同项目的数量。
* @group agg_funcs
* @since 1.3.0
@scala.annotation.varargs
def countDistinct(columnName: String, columnNames: String*): Column =
countDistinct(Column(columnName), columnNames.map(Column.apply) : _*)
* 聚合函数:返回两列的总体协方差。
* @group agg_funcs
* @since 2.0.0
def covar_pop(column1: Column, column2: Column): Column = withAggregateFunction {
CovPopulation(column1.expr, column2.expr)
* 聚合函数:返回两列的总体协方差。
* @group agg_funcs
* @since 2.0.0
def covar_pop(columnName1: String, columnName2: String): Column = {
covar_pop(Column(columnName1), Column(columnName2))
* 聚合函数:返回两列的样本协方差。
* @group agg_funcs
* @since 2.0.0
def covar_samp(column1: Column, column2: Column): Column = withAggregateFunction {
CovSample(column1.expr, column2.expr)
* 聚合函数:返回两列的样本协方差。
* @group agg_funcs
* @since 2.0.0
def covar_samp(columnName1: String, columnName2: String): Column = {
covar_samp(Column(columnName1), Column(columnName2))
* 聚合函数:返回分组中的第一个值。
* 默认情况下,该函数返回它看到的第一个值。当 ignoreNulls 设置为 true 时,它将返回它看到的第一个非空值。
* 如果所有值都为 null,则返回 null。
* 注意:由于结果取决于行的顺序,所以该函数是不确定性的,在洗牌后可能是不确定性的。
* @group agg_funcs
* @since 2.0.0
def first(e: Column, ignoreNulls: Boolean): Column = withAggregateFunction {
new First(e.expr, Literal(ignoreNulls))
* 聚合函数:返回分组中某列的第一个值。
* 默认情况下,该函数返回它看到的第一个值。当 ignoreNulls 设置为 true 时,它将返回它看到的第一个非空值。
* 如果所有值都为 null,则返回 null。
* 注意:由于结果取决于行的顺序,所以该函数是不确定性的,在洗牌后可能是不确定性的。
* @group agg_funcs
* @since 2.0.0
def first(columnName: String, ignoreNulls: Boolean): Column = {
first(Column(columnName), ignoreNulls)
* 聚合函数:返回分组中的第一个值。
* 默认情况下,该函数返回它看到的第一个值。当 ignoreNulls 设置为 true 时,它将返回它看到的第一个非空值。
* 如果所有值都为 null,则返回 null。
* 注意:由于结果取决于行的顺序,所以该函数是不确定性的,在洗牌后可能是不确定性的。
* @group agg_funcs
* @since 1.3.0
def first(e: Column): Column = first(e, ignoreNulls = false)
* 聚合函数:返回分组中某列的第一个值。
* 默认情况下,该函数返回它看到的第一个值。当 ignoreNulls 设置为 true 时,它将返回它看到的第一个非空值。
* 如果所有值都为 null,则返回 null。
* 注意:由于结果取决于行的顺序,所以该函数是不确定性的,在洗牌后可能是不确定性的。
* @group agg_funcs
* @since 1.3.0
def first(columnName: String): Column = first(Column(columnName))
* 聚合函数:指示在 GROUP BY 列表中的指定列是否已聚合,
* 返回结果集中的聚合为 1,未聚合为 0。
* @group agg_funcs
* @since 2.0.0
def grouping(e: Column): Column = Column(Grouping(e.expr))
* 聚合函数:指示在 GROUP BY 列表中的指定列是否已聚合,
* 返回结果集中的聚合为 1,未聚合为 0。
* @group agg_funcs
* @since 2.0.0
def grouping(columnName: String): Column = grouping(Column(columnName))
* 聚合函数:返回分组级别,等于
* (grouping(c1) <<; (n-1)) + (grouping(c2) <<; (n-2)) + ... + grouping(cn)
* 注意:列列表应与分组列完全匹配,或为空(表示所有分组列)。
* @group agg_funcs
* @since 2.0.0
def grouping_id(cols: Column*): Column = Column(GroupingID(cols.map(_.expr)))
* 聚合函数:返回分组级别,等于
* (grouping(c1) <<; (n-1)) + (grouping(c2) <<; (n-2)) + ... + grouping(cn)
* 注意:列列表应与分组列完全匹配。
* @group agg_funcs
* @since 2.0.0
def grouping_id(colName: String, colNames: String*): Column = {
grouping_id((Seq(colName) ++ colNames).map(n => Column(n)) : _*)
* 聚合函数:返回分组中值的峰度。
* @group agg_funcs
* @since 1.6.0
def kurtosis(e: Column): Column = withAggregateFunction { Kurtosis(e.expr) }
* 聚合函数:返回分组中值的峰度。
* @group agg_funcs
* @since 1.6.0
def kurtosis(columnName: String): Column = kurtosis(Column(columnName))
* 聚合函数:返回分组中的最后一个值。
* 默认情况下,该函数返回它看到的最后一个值。当 ignoreNulls 设置为 true 时,它将返回它看到的最后一个非空值。
* 如果所有值都为 null,则返回 null。
* 注意:由于结果取决于行的顺序,所以该函数是不确定性的,在洗牌后可能是不确定性的。
* @group agg_funcs
* @since 2.0.0
def last(e: Column, ignoreNulls: Boolean): Column = withAggregateFunction {
new Last(e.expr, Literal(ignoreNulls))
* 聚合函数:返回分组中某列的最后一个值。
* 默认情况下,该函数返回它看到的最后一个值。当 ignoreNulls 设置为 true 时,它将返回它看到的最后一个非空值。
* 如果所有值都为 null,则返回 null。
* 注意:由于结果取决于行的顺序,所以该函数是不确定性的,在洗牌后可能是不确定性的。
* @group agg_funcs
* @since 2.0.0
def last(columnName: String, ignoreNulls: Boolean): Column = {
last(Column(columnName), ignoreNulls)
* 聚合函数:返回分组中的最后一个值。
* 默认情况下,该函数返回它看到的最后一个值。当 ignoreNulls 设置为 true 时,它将返回它看到的最后一个非空值。
* 如果所有值都为 null,则返回 null。
* 注意:由于结果取决于行的顺序,所以该函数是不确定性的,在洗牌后可能是不确定性的。
* @group agg_funcs
* @since 1.3.0
def last(e: Column): Column = last(e, ignoreNulls = false)
* 聚合函数:返回分组中某列的最后一个值。
* 默认情况下,该函数返回它看到的最后一个值。当 ignoreNulls 设置为 true 时,它将返回它看到的最后一个非空值。
* 如果所有值都为 null,则返回 null。
* 注意:由于结果取决于行的顺序,所以该函数是不确定性的,在洗牌后可能是不确定性的。
* @group agg_funcs
* @since 1.3.0
def last(columnName: String): Column = last(Column(columnName), ignoreNulls = false)
* 聚合函数:返回表达式在分组中的最大值。
* @group agg_funcs
* @since 1.3.0
def max(e: Column): Column = withAggregateFunction { Max(e.expr) }
* 聚合函数:返回某列在分组中的最大值。
* @group agg_funcs
* @since 1.3.0
def max(columnName: String): Column = max(Column(columnName))
* 聚合函数:返回分组中值的平均值。
* avg 的别名。
* @group agg_funcs
* @since 1.4.0
def mean(e: Column): Column = avg(e)
* 聚合函数:返回分组中某列的平均值。
* avg 的别名。
* @group agg_funcs
* @since 1.4.0
def mean(columnName: String): Column = avg(columnName)
* 聚合函数:返回表达式在分组中的最小值。
* @group agg_funcs
* @since 1.3.0
def min(e: Column): Column = withAggregateFunction { Min(e.expr) }
* 聚合函数:返回某列在分组中的最小值。
* @group agg_funcs
* @since 1.3.0
def min(columnName: String): Column = min(Column(columnName))
* 聚合函数:返回分组中值的偏度。
* @group agg_funcs
* @since 1.6.0
def skewness(e: Column): Column = withAggregateFunction { Skewness(e.expr) }
* 聚合函数:返回分组中值的偏度。
* @group agg_funcs
* @since 1.6.0
def skewness(columnName: String): Column = skewness(Column(columnName))
* 聚合函数:`stddev_samp` 的别名。
* @group agg_funcs
* @since 1.6.0
def stddev(e: Column): Column = withAggregateFunction { StddevSamp(e.expr) }
* 聚合函数:`stddev_samp` 的别名。
* @group agg_funcs
* @since 1.6.0
def stddev(columnName: String): Column = stddev(Column(columnName))
* 聚合函数:返回表达式在分组中的样本标准差。
* @group agg_funcs
* @since 1.6.0
def stddev_samp(e: Column): Column = withAggregateFunction { StddevSamp(e.expr) }
* 聚合函数:返回表达式在分组中的样本标准差。
* @group agg_funcs
* @since 1.6.0
def stddev_samp(columnName: String): Column = stddev_samp(Column(columnName))
* 聚合函数:返回表达式在分组中的总体标准差。
* @group agg_funcs
* @since 1.6.0
def stddev_pop(e: Column): Column = withAggregateFunction { StddevPop(e.expr) }
* 聚合函数:返回表达式在分组中的总体标准差。
* @group agg_funcs
* @since 1.6.0
def stddev_pop(columnName: String): Column = stddev_pop(Column(columnName))
* 聚合函数:返回表达式中所有值的总和。
* @group agg_funcs
* @since 1.3.0
def sum(e: Column): Column = withAggregateFunction { Sum(e.expr) }
* 聚合函数:返回某列中所有值的总和。
* @group agg_funcs
* @since 1.3.0
def sum(columnName: String): Column = sum(Column(columnName))
* 聚合函数:返回表达式中不同值的总和。
* @group agg_funcs
* @since 1.3.0
def sumDistinct(e: Column): Column = withAggregateFunction(Sum(e.expr), isDistinct = true)
* 聚合函数:返回某列中不同值的总和。
* @group agg_funcs
* @since 1.3.0
def sumDistinct(columnName: String): Column = sumDistinct(Column(columnName))
* 聚合函数:别名为 `var_samp`。
* @group agg_funcs
* @since 1.6.0
def variance(e: Column): Column = withAggregateFunction { VarianceSamp(e.expr) }
* 聚合函数:别名为 `var_samp`。
* @group agg_funcs
* @since 1.6.0
def variance(columnName: String): Column = variance(Column(columnName))
* 聚合函数:返回分组中值的无偏方差。
* @group agg_funcs
* @since 1.6.0
def var_samp(e: Column): Column = withAggregateFunction { VarianceSamp(e.expr) }
* 聚合函数:返回分组中值的无偏方差。
* @group agg_funcs
* @since 1.6.0
def var_samp(columnName: String): Column = var_samp(Column(columnName))
* 聚合函数:返回分组中值的总体方差。
* @group agg_funcs
* @since 1.6.0
def var_pop(e: Column): Column = withAggregateFunction { VariancePop(e.expr) }
* 聚合函数:返回分组中值的总体方差。
* @group agg_funcs
* @since 1.6.0
def var_pop(columnName: String): Column = var_pop(Column(columnName))
窗口函数
//////////////////////////////////////////////////////////////////////////////////////////////
// 窗口函数
//////////////////////////////////////////////////////////////////////////////////////////////
* 此函数在Spark 2.4中已弃用。有关更多信息,请参见SPARK-25842。
* @group window_funcs
* @since 2.3.0
@deprecated("Use Window.unboundedPreceding", "2.4.0")
def unboundedPreceding(): Column = Column(UnboundedPreceding)
* 此函数在Spark 2.4中已弃用。有关更多信息,请参见SPARK-25842。
* @group window_funcs
* @since 2.3.0
@deprecated("Use Window.unboundedFollowing", "2.4.0")
def unboundedFollowing(): Column = Column(UnboundedFollowing)
* 此函数在Spark 2.4中已弃用。有关更多信息,请参见SPARK-25842。
* @group window_funcs
* @since 2.3.0
@deprecated("Use Window.currentRow", "2.4.0")
def currentRow(): Column = Column(CurrentRow)
* 窗口函数:返回窗口分区内值的累计分布,即当前行之前(包括当前行)的行的比例。
* {{{
* N = 窗口分区中的总行数
* cumeDist(x) = 在 x 之前(包括 x)的值的数量 / N
* }}}
* @group window_funcs
* @since 1.6.0
def cume_dist(): Column = withExpr { new CumeDist }
* 窗口函数:返回窗口分区内行的等级,没有间隔。
* 排名和密集排名之间的区别是,当存在并列时,密集排名不会在排名序列中留下空白。
* 也就是说,如果使用密集排名对竞争进行排名,并且有三个人并列为第二名,那么所有三个人都将被认为是第二名,
* 下一个人将被认为是第三名。而排名将给出连续的数字,使得在并列后来的第三名(在并列后)将被标记为第五名。
* 这相当于SQL中的DENSE_RANK函数。
* @group window_funcs
* @since 1.6.0
def dense_rank(): Column = withExpr { new DenseRank }
* 窗口函数:返回当前行之前的 `offset` 行的值,如果当前行之前的行数少于 `offset` 行,则返回 `null`。
* 例如,`lag(a, b)` 将在窗口分区的任何给定点返回前一行。
* 这相当于SQL中的LAG函数。
* @group window_funcs
* @since 1.4.0
def lag(e: Column, offset: Int): Column = lag(e, offset, null)
* 窗口函数:返回当前行之前的 `offset` 行的值,如果当前行之前的行数少于 `offset` 行,则返回 `null`。
* 例如,`lag(a, b)` 将在窗口分区的任何给定点返回前一行。
* 这相当于SQL中的LAG函数。
* @group window_funcs
* @since 1.4.0
def lag(columnName: String, offset: Int): Column = lag(columnName, offset, null)
* 窗口函数:返回当前行之前的 `offset` 行的值,如果当前行之前的行数少于 `offset` 行,则返回 `defaultValue`。
* 例如,`lag(a, b, c)` 将在窗口分区的任何给定点返回前一行。
* 这相当于SQL中的LAG函数。
* @group window_funcs
* @since 1.4.0
def lag(columnName: String, offset: Int, defaultValue: Any): Column = {
lag(Column(columnName), offset, defaultValue)
* 窗口函数:返回当前行之前的 `offset` 行的值,如果当前行之前的行数少于 `offset` 行,则返回 `defaultValue`。
* 例如,`lag(a, b, c)` 将在窗口分区的任何给定点返回前一行。
* 这相当于SQL中的LAG函数。
* @group window_funcs
* @since 1.4.0
def lag(e: Column, offset: Int, defaultValue: Any): Column = withExpr {
Lag(e.expr, Literal(offset), Literal(defaultValue))
* 窗口函数:返回当前行之后的 `offset` 行的值,如果当前行之后的行数少于 `offset` 行,则返回 `null`。
* 例如,`lead(a, b)` 将在窗口分区的任何给定点返回下一行。
* 这相当于SQL中的LEAD函数。
* @group window_funcs
* @since 1.4.0
def lead(columnName: String, offset: Int): Column = { lead(columnName, offset, null) }
* 窗口函数:返回当前行之后的 `offset` 行的值,如果当前行之后的行数少于 `offset` 行,则返回 `null`。
* 例如,`lead(a, b)` 将在窗口分区的任何给定点返回下一行。
* 这相当于SQL中的LEAD函数。
* @group window_funcs
* @since 1.4.0
def lead(e: Column, offset: Int): Column = { lead(e, offset, null) }
* 窗口函数:返回当前行之后的 `offset` 行的值,如果当前行之后的行数少于 `offset` 行,则返回 `defaultValue`。
* 例如,`lead(a, b, c)` 将在窗口分区的任何给定点返回下一行。
* 这相当于SQL中的LEAD函数。
* @group window_funcs
* @since 1.4.0
def lead(columnName: String, offset: Int, defaultValue: Any): Column = {
lead(Column(columnName), offset, defaultValue)
* 窗口函数:返回当前行之后的 `offset` 行的值,如果当前行之后的行数少于 `offset` 行,则返回 `defaultValue`。
* 例如,`lead(a, b, c)` 将在窗口分区的任何给定点返回下一行。
* 这相当于SQL中的LEAD函数。
* @group window_funcs
* @since 1.4.0
def lead(e: Column, offset: Int, defaultValue: Any): Column = withExpr {
Lead(e.expr, Literal(offset), Literal(defaultValue))
* 窗口函数:返回一个NTILE组ID(从1到 `n`)在有序窗口分区内。
* 例如,如果 `n` 是4,第一四分位数将得到值1,第二四分位数将得到值2,第三四分位数将得到值3,
* 最后四分位数将得到值4。
* 这相当于SQL中的NTILE函数。
* @group window_funcs
* @since 1.4.0
def ntile(n: Int): Column = withExpr { new NTile(Literal(n)) }
* 窗口函数:返回行在窗口分区内的相对排名(即百分位数)。
* 计算方法为:
* {{{
* (窗口分区中的行的排名 - 1)/ (窗口分区中的行数 - 1)
* }}}
* 这相当于SQL中的PERCENT_RANK函数。
* @group window_funcs
* @since 1.6.0
def percent_rank(): Column = withExpr { new PercentRank }
* 窗口函数:返回行在窗口分区内的排名。
* 排名和密集排名之间的区别是,当存在并列时,密集排名不会在排名序列中留下空白。
* 也就是说,如果使用密集排名对竞争进行排名,并且有三个人并列为第二名,那么所有三个人都将被认为是第二名,
* 下一个人将被认为是第三名。而排名将给出连续的数字,使得在并列后来的第三名(在并列后)将被标记为第五名。
* 这相当于SQL中的RANK函数。
* @group window_funcs
* @since 1.4.0
def rank(): Column = withExpr { new Rank }
* 窗口函数:在窗口分区内创建一个从1开始的顺序号。
* @group window_funcs
* @since 1.6.0
def row_number(): Column = withExpr { RowNumber() }
非聚合函数
//////////////////////////////////////////////////////////////////////////////////////////////
// 非聚合函数
//////////////////////////////////////////////////////////////////////////////////////////////
* 根据给定的列名返回一个基于该列名的[[Column]]。
* @group normal_funcs
* @since 1.3.0
def col(colName: String): Column = Column(colName)
* 根据给定的列名返回一个基于该列名的[[Column]]。[[col]]的别名。
* @group normal_funcs
* @since 1.3.0
def column(colName: String): Column = Column(colName)
* 创建一个表示字面值的[[Column]]。
* 如果传入的对象已经是一个[[Column]],则直接返回该对象。如果对象是一个Scala Symbol,则也将其转换为[[Column]]。
* 否则,将创建一个新的[[Column]]来表示字面值。
* @group normal_funcs
* @since 1.3.0
def lit(literal: Any): Column = typedLit(literal)
* 创建一个表示字面值的[[Column]]。
* 如果传入的对象已经是一个[[Column]],则直接返回该对象。如果对象是一个Scala Symbol,则也将其转换为[[Column]]。
* 否则,将创建一个新的[[Column]]来表示字面值。与[[lit]]函数的区别在于,此函数可以处理参数化的scala类型,例如:List、Seq和Map。
* @group normal_funcs
* @since 2.2.0
def typedLit[T : TypeTag](literal: T): Column = literal match {
case c: Column => c
case s: Symbol => new ColumnName(s.name)
case _ => Column(Literal.create(literal))
* 创建一个新的数组列。输入列必须具有相同的数据类型。
* @group normal_funcs
* @since 1.4.0
@scala.annotation.varargs
def array(cols: Column*): Column = withExpr { CreateArray(cols.map(_.expr)) }
* 创建一个新的数组列。输入列必须具有相同的数据类型。
* @group normal_funcs
* @since 1.4.0
@scala.annotation.varargs
def array(colName: String, colNames: String*): Column = {
array((colName +: colNames).map(col) : _*)
* 创建一个新的映射列。输入列必须按键值对进行分组,例如(key1,value1,key2,value2,...)。
* 键列必须具有相同的数据类型,并且不能为 null。值列必须具有相同的数据类型。
* @group normal_funcs
* @since 2.0
@scala.annotation.varargs
def map(cols: Column*): Column = withExpr { CreateMap(cols.map(_.expr)) }
* 创建一个新的映射列。第一列中的数组用作键。第二列中的数组用作值。键中的所有元素都不应为 null。
* @group normal_funcs
* @since 2.4
def map_from_arrays(keys: Column, values: Column): Column = withExpr {
MapFromArrays(keys.expr, values.expr)
* 将一个DataFrame标记为在广播连接中可用的小型数据集。
* 下面的示例使用 `joinKey` 将右侧的DataFrame标记为广播哈希连接。
* {{{
* // left和right是DataFrames
* left.join(broadcast(right), "joinKey")
* }}}
* @group normal_funcs
* @since 1.5.0
def broadcast[T](df: Dataset[T]): Dataset[T] = {
Dataset[T](df.sparkSession,
ResolvedHint(df.logicalPlan, HintInfo(broadcast = true)))(df.exprEnc)
* 返回第一个非空的列,如果所有输入都为null,则返回null。
* 例如,`coalesce(a, b, c)` 如果a不为null,则返回a,
* 如果a为null且b不为null,则返回b,如果a和b都为null但c不为null,则返回c。
* @group normal_funcs
* @since 1.3.0
@scala.annotation.varargs
def coalesce(e: Column*): Column = withExpr { Coalesce(e.map(_.expr)) }
* 创建一个表示当前Spark任务的文件名的字符串列。
* @group normal_funcs
* @since 1.6.0
def input_file_name(): Column = withExpr { InputFileName() }
* 如果列为NaN,则返回true。
* @group normal_funcs
* @since 1.6.0
def isnan(e: Column): Column = withExpr { IsNaN(e.expr) }
* 如果列为空,则返回true。
* @group normal_funcs
* @since 1.6.0
def isnull(e: Column): Column = withExpr { IsNull(e.expr) }
* 生成单调递增的64位整数的列表达式。
* 生成的ID保证是单调递增且唯一的,但不是连续的。
* 当前实现将分区ID放在高31位中,并将每个分区中的记录编号放在低33位中。
* 假设数据框具有少于10亿个分区,并且每个分区具有小于80亿条记录。
* 例如,考虑一个包含两个分区的DataFrame,每个分区有3条记录。
* 此表达式将返回以下ID:
* {{{
* 0, 1, 2, 8589934592 (1L << 33), 8589934593, 8589934594.
* }}}
* @group normal_funcs
* @since 1.4.0
@deprecated("Use monotonically_increasing_id()", "2.0.0")
def monotonicallyIncreasingId(): Column = monotonically_increasing_id()
* 返回一个生成单调递增的64位整数的列表达式。
* 生成的ID保证是单调递增且唯一的,但不是连续的。
* 当前实现将分区ID放在高31位中,并将每个分区中的记录编号放在低33位中。
* 假设数据框具有少于10亿个分区,并且每个分区具有小于80亿条记录。
* 例如,考虑一个包含两个分区的DataFrame,每个分区有3条记录。
* 此表达式将返回以下ID:
* {{{
* 0, 1, 2, 8589934592 (1L << 33), 8589934593, 8589934594.
* }}}
* @group normal_funcs
* @since 1.6.0
def monotonically_increasing_id(): Column = withExpr { MonotonicallyIncreasingID() }
* 如果列1不为NaN,则返回列1,否则返回列2。
* 输入列应为浮点列(DoubleType或FloatType)。
* @group normal_funcs
* @since 1.5.0
def nanvl(col1: Column, col2: Column): Column = withExpr { NaNvl(col1.expr, col2.expr) }
* 一元减号,即对表达式取反。
* {{{
* // 选择amount列并对所有值取反。
* // Scala:
* df.select(-df("amount"))
* // Java:
* df.select(negate(df.col("amount")));
* }}}
* @group normal_funcs
* @since 1.3.0
def negate(e: Column): Column = -e
* 布尔表达式的取反,即 NOT。
* {{{
* // Scala:选择不活动的行(isActive === false)
* df.filter(!df("isActive"))
* // Java:
* df.filter(not(df.col("isActive")));
* }}}
* @group normal_funcs
* @since 1.3.0
def not(e: Column): Column = !e
* 生成一个具有独立且来自U[0.0, 1.0]的i.i.d.样本的随机列。
* @note 一般情况下,该函数是非确定性的。
* @group normal_funcs
* @since 1.4.0
def rand(seed: Long): Column = withExpr { Rand(seed) }
* 生成一个具有独立且来自U[0.0, 1.0]的i.i.d.样本的随机列。
* @note 一般情况下,该函数是非确定性的。
* @group normal_funcs
* @since 1.4.0
def rand(): Column = rand(Utils.random.nextLong)
* 生成一个具有独立且来自标准正态分布的i.i.d.样本的随机列。
* @note 一般情况下,该函数是非确定性的。
* @group normal_funcs
* @since 1.4.0
def randn(seed: Long): Column = withExpr { Randn(seed) }
* 生成一个具有独立且来自标准正态分布的i.i.d.样本的随机列。
* @note 一般情况下,该函数是非确定性的。
* @group normal_funcs
* @since 1.4.0
def randn(): Column = randn(Utils.random.nextLong)
* 分区ID。
* @note 由于它取决于数据分区和任务调度,因此这是非确定性的。
* @group normal_funcs
* @since 1.6.0
def spark_partition_id(): Column = withExpr { SparkPartitionID() }
* 创建一个新的结构列。
* 如果输入列是`DataFrame`中的列,或者是已命名的导出列表达式,
* 则它的名称将保留为StructField的名称;
* 否则,新生成的StructField的名称将自动生成为`col`加上后缀`index + 1`,
* 即col1、col2、col3等。
* @group normal_funcs
* @since 1.4.0
@scala.annotation.varargs
def struct(cols: Column*): Column = withExpr { CreateStruct(cols.map(_.expr)) }
* 创建一个新的结构列,由多个输入列组成。
* @group normal_funcs
* @since 1.4.0
@scala.annotation.varargs
def struct(colName: String, colNames: String*): Column = {
struct((colName +: colNames).map(col) : _*)
* 计算一系列条件,并返回多个可能结果表达式之一。
* 如果最后没有定义otherwise,则对于未匹配的条件返回null。
* {{{
* // 示例:将性别字符串列编码为整数。
* // Scala:
* people.select(when(people("gender") === "male", 0)
* .when(people("gender") === "female", 1)
* .otherwise(2))
* // Java:
* people.select(when(col("gender").equalTo("male"), 0)
* .when(col("gender").equalTo("female"), 1)
* .otherwise(2))
* }}}
* @group normal_funcs
* @since 1.4.0
def when(condition: Column, value: Any): Column = withExpr {
CaseWhen(Seq((condition.expr, lit(value).expr)))
* 计算一个数的按位取反(~)。
* @group normal_funcs
* @since 1.4.0
def bitwiseNOT(e: Column): Column = withExpr { BitwiseNot(e.expr) }
* 将表达式字符串解析为表示其列的列,类似于[[Dataset#selectExpr]]。
* {{{
* // 获取每个长度的单词数
* df.groupBy(expr("length(word)")).count()
* }}}
* @group normal_funcs
def expr(expr: String): Column = {
val parser = SparkSession.getActiveSession.map(_.sessionState.sqlParser).getOrElse {
new SparkSqlParser(new SQLConf)
Column(parser.parseExpression(expr))
}
数学函数
/**
* 计算给定值的平方根。
* @group math_funcs
* @since 1.3.0
def sqrt(e: Column): Column = withExpr { Sqrt(e.expr) }
* 计算给定值的平方根。
* @group math_funcs
* @since 1.5.0
def sqrt(colName: String): Column = sqrt(Column(colName))
//////////////////////////////////////////////////////////////////////////////////////////////
// Math Functions
//////////////////////////////////////////////////////////////////////////////////////////////
* 计算数值的绝对值。
* @group math_funcs
* @since 1.3.0
def abs(e: Column): Column = withExpr { Abs(e.expr) }
* @return 返回`e`的反余弦值(以弧度为单位),如同通过`java.lang.Math.acos`计算得到的那样
* @group math_funcs
* @since 1.4.0
def acos(e: Column): Column = withExpr { Acos(e.expr) }
* @return 返回`columnName`的反余弦值,如同通过`java.lang.Math.acos`计算得到的那样
* @group math_funcs
* @since 1.4.0
def acos(columnName: String): Column = acos(Column(columnName))
* @return 返回`e`的反正弦值(以弧度为单位),如同通过`java.lang.Math.asin`计算得到的那样
* @group math_funcs
* @since 1.4.0
def asin(e: Column): Column = withExpr { Asin(e.expr) }
* @return 返回`columnName`的反正弦值,如同通过`java.lang.Math.asin`计算得到的那样
* @group math_funcs
* @since 1.4.0
def asin(columnName: String): Column = asin(Column(columnName))
* @return 返回`e`的反正切值,如同通过`java.lang.Math.atan`计算得到的那样
* @group math_funcs
* @since 1.4.0
def atan(e: Column): Column = withExpr { Atan(e.expr) }
* @return 返回`columnName`的反正切值,如同通过`java.lang.Math.atan`计算得到的那样
* @group math_funcs
* @since 1.4.0
def atan(columnName: String): Column = atan(Column(columnName))
* @param y y轴上的坐标
* @param x x轴上的坐标
* @return 对应于笛卡尔坐标中的点(x,y)的极坐标中的<r,θ>组件的<θ>值,
* 如同通过`java.lang.Math.atan2`计算得到的那样
* @group math_funcs
* @since 1.4.0
def atan2(y: Column, x: Column): Column = withExpr { Atan2(y.expr, x.expr) }
* @param y y轴上的坐标
* @param xName x轴上的坐标
* @return 对应于笛卡尔坐标中的点(x,y)的极坐标中的<r,θ>组件的<θ>值,
* 如同通过`java.lang.Math.atan2`计算得到的那样
* @group math_funcs
* @since 1.4.0
def atan2(y: Column, xName: String): Column = atan2(y, Column(xName))
* @param yName y轴上的坐标
* @param x x轴上的坐标
* @return 对应于笛卡尔坐标中的点(x,y)的极坐标中的<r,θ>组件的<θ>值,
* 如同通过`java.lang.Math.atan2`计算得到的那样
* @group math_funcs
* @since 1.4.0
def atan2(yName: String, x: Column): Column = atan2(Column(yName), x)
* @param yName y轴上的坐标
* @param xName x轴上的坐标
* @return 对应于笛卡尔坐标中的点(x,y)的极坐标中的<r,θ>组件的<θ>值,
* 如同通过`java.lang.Math.atan2`计算得到的那样
* @group math_funcs
* @since 1.4.0
def atan2(yName: String, xName: String): Column =
atan2(Column(yName), Column(xName))
* @param y y轴上的坐标
* @param xValue x轴上的坐标
* @return 对应于笛卡尔坐标中的点(x,y)的极坐标中的<r,θ>组件的<θ>值,
* 如同通过`java.lang.Math.atan2`计算得到的那样
* @group math_funcs
* @since 1.4.0
def atan2(y: Column, xValue: Double): Column = atan2(y, lit(xValue))
* @param yName y轴上的坐标
* @param xValue x轴上的坐标
* @return 对应于笛卡尔坐标中的点(x,y)的极坐标中的<r,θ>组件的<θ>值,
* 如同通过`java.lang.Math.atan2`计算得到的那样
* @group math_funcs
* @since 1.4.0
def atan2(yName: String, xValue: Double): Column = atan2(Column(yName), xValue)
* @param yValue y轴上的坐标
* @param x x轴上的坐标
* @return 对应于笛卡尔坐标中的点(x,y)的极坐标中的<r,θ>组件的<θ>值,
* 如同通过`java.lang.Math.atan2`计算得到的那样
* @group math_funcs
* @since 1.4.0
def atan2(yValue: Double, x: Column): Column = atan2(lit(yValue), x)
* @param yValue y轴上的坐标
* @param xName x轴上的坐标
* @return 对应于笛卡尔坐标中的点(x,y)的极坐标中的<r,θ>组件的<θ>值,
* 如同通过`java.lang.Math.atan2`计算得到的那样
* @group math_funcs
* @since 1.4.0
def atan2(yValue: Double, xName: String): Column = atan2(yValue, Column(xName))
* 返回具有给定长整型列的二进制值的字符串表示形式。
* 例如,bin("12")返回"1100"。
* @group math_funcs
* @since 1.5.0
def bin(e: Column): Column = withExpr { Bin(e.expr) }
* 返回具有给定长整型列的二进制值的字符串表示形式。
* 例如,bin("12")返回"1100"。
* @group math_funcs
* @since 1.5.0
def bin(columnName: String): Column = bin(Column(columnName))
* 计算给定值的立方根。
* @group math_funcs
* @since 1.4.0
def cbrt(e: Column): Column = withExpr { Cbrt(e.expr) }
* 计算给定列的立方根。
* @group math_funcs
* @since 1.4.0
def cbrt(columnName: String): Column = cbrt(Column(columnName))
* 计算给定值的上限。
* @group math_funcs
* @since 1.4.0
def ceil(e: Column): Column = withExpr { Ceil(e.expr) }
* 计算给定列的上限。
* @group math_funcs
* @since 1.4.0
def ceil(columnName: String): Column = ceil(Column(columnName))
* 将一个字符串列中的数字从一种进制转换为另一种进制。
* @group math_funcs
* @since 1.5.0
def conv(num: Column, fromBase: Int, toBase: Int): Column = withExpr {
Conv(num.expr, lit(fromBase).expr, lit(toBase).expr)
* 返回角度的余弦值,类似于`java.lang.Math.cos`计算得到的值。
* @group math_funcs
* @since 1.4.0
def cos(e: Column): Column = withExpr { Cos(e.expr) }
* 返回角度的余弦值,类似于`java.lang.Math.cos`计算得到的值。
* @group math_funcs
* @since 1.4.0
def cos(columnName: String): Column = cos(Column(columnName))
* 返回给定值的双曲余弦值,类似于`java.lang.Math.cosh`计算得到的值。
* @group math_funcs
* @since 1.4.0
def cosh(e: Column): Column = withExpr { Cosh(e.expr) }
* 返回给定值的双曲余弦值,类似于`java.lang.Math.cosh`计算得到的值。
* @group math_funcs
* @since 1.4.0
def cosh(columnName: String): Column = cosh(Column(columnName))
* 计算给定值的指数。
* @group math_funcs
* @since 1.4.0
def exp(e: Column): Column = withExpr { Exp(e.expr) }
* 计算给定列的指数。
* @group math_funcs
* @since 1.4.0
def exp(columnName: String): Column = exp(Column(columnName))
* 计算给定值减一的指数。
* @group math_funcs
* @since 1.4.0
def expm1(e: Column): Column = withExpr { Expm1(e.expr) }
* 计算给定列减一的指数。
* @group math_funcs
* @since 1.4.0
def expm1(columnName: String): Column = expm1(Column(columnName))
* 计算给定值的阶乘。
* @group math_funcs
* @since 1.5.0
def factorial(e: Column): Column = withExpr { Factorial(e.expr) }
* 计算给定值的下舍整数。
* @group math_funcs
* @since 1.4.0
def floor(e: Column): Column = withExpr { Floor(e.expr) }
* 计算给定列的下舍整数。
* @group math_funcs
* @since 1.4.0
def floor(columnName: String): Column = floor(Column(columnName))
* 返回值列表中的最大值,跳过null值。该函数至少需要2个参数。当所有参数都为null时返回null。
* @group normal_funcs
* @since 1.5.0
@scala.annotation.varargs
def greatest(exprs: Column*): Column = withExpr { Greatest(exprs.map(_.expr)) }
* 返回列名列表中的最大值,跳过null值。该函数至少需要2个参数。当所有参数都为null时返回null。
* @group normal_funcs
* @since 1.5.0
@scala.annotation.varargs
def greatest(columnName: String, columnNames: String*): Column = {
greatest((columnName +: columnNames).map(Column.apply): _*)
* 计算给定列的十六进制值。
* @group math_funcs
* @since 1.5.0
def hex(column: Column): Column = withExpr { Hex(column.expr) }
* 对十六进制值进行解码。将每对字符解释为一个十六进制数字,并转换为相应的字节表示形式。
* @group math_funcs
* @since 1.5.0
def unhex(column: Column): Column = withExpr { Unhex(column.expr) }
* 计算 `sqrt(a^2^ + b^2^)` ,避免溢出或下溢。
* @group math_funcs
* @since 1.4.0
def hypot(l: Column, r: Column): Column = withExpr { Hypot(l.expr, r.expr) }
* 计算 `sqrt(a^2^ + b^2^)` ,避免溢出或下溢。
* @group math_funcs
* @since 1.4.0
def hypot(l: Column, rightName: String): Column = hypot(l, Column(rightName))
* 计算 `sqrt(a^2^ + b^2^)` ,避免溢出或下溢。
* @group math_funcs
* @since 1.4.0
def hypot(leftName: String, r: Column): Column = hypot(Column(leftName), r)
* 计算 `sqrt(a^2^ + b^2^)` ,避免溢出或下溢。
* @group math_funcs
* @since 1.4.0
def hypot(leftName: String, rightName: String): Column =
hypot(Column(leftName), Column(rightName))
* 计算 `sqrt(a^2^ + b^2^)` ,避免溢出或下溢。
* @group math_funcs
* @since 1.4.0
def hypot(l: Column, r: Double): Column = hypot(l, lit(r))
* 计算 `sqrt(a^2^ + b^2^)` ,避免溢出或下溢。
* @group math_funcs
* @since 1.4.0
def hypot(leftName: String, r: Double): Column = hypot(Column(leftName), r)
* 计算 `sqrt(a^2^ + b^2^)` ,避免溢出或下溢。
* @group math_funcs
* @since 1.4.0
def hypot(l: Double, r: Column): Column = hypot(lit(l), r)
* 计算 `sqrt(a^2^ + b^2^)` ,避免溢出或下溢。
* @group math_funcs
* @since 1.4.0
def hypot(l: Double, rightName: String): Column = hypot(l, Column(rightName))
* 返回值列表中的最小值,跳过null值。该函数至少需要2个参数。当所有参数都为null时返回null。
* @group normal_funcs
* @since 1.5.0
@scala.annotation.varargs
def least(exprs: Column*): Column = withExpr { Least(exprs.map(_.expr)) }
* 返回列名列表中的最小值,跳过null值。该函数至少需要2个参数。当所有参数都为null时返回null。
* @group normal_funcs
* @since 1.5.0
@scala.annotation.varargs
def least(columnName: String, columnNames: String*): Column = {
least((columnName +: columnNames).map(Column.apply): _*)
* 计算给定值的自然对数。
* @group math_funcs
* @since 1.4.0
def log(e: Column): Column = withExpr { Log(e.expr) }
* 计算给定列的自然对数。
* @group math_funcs
* @since 1.4.0
def log(columnName: String): Column = log(Column(columnName))
* 返回以给定底数为基的对数值。
* @group math_funcs
* @since 1.4.0
def log(base: Double, a: Column): Column = withExpr { Logarithm(lit(base).expr, a.expr) }
* 返回以给定底数为基的对数值。
* @group math_funcs
* @since 1.4.0
def log(base: Double, columnName: String): Column = log(base, Column(columnName))
* 计算给定值的以10为底的对数。
* @group math_funcs
* @since 1.4.0
def log10(e: Column): Column = withExpr { Log10(e.expr) }
* 计算给定列的以10为底的对数。
* @group math_funcs
* @since 1.4.0
def log10(columnName: String): Column = log10(Column(columnName))
* 计算给定值加一的自然对数。
* @group math_funcs
* @since 1.4.0
def log1p(e: Column): Column = withExpr { Log1p(e.expr) }
* 计算给定列加一的自然对数。
* @group math_funcs
* @since 1.4.0
def log1p(columnName: String): Column = log1p(Column(columnName))
* 计算给定列的以2为底的对数。
* @group math_funcs
* @since 1.5.0
def log2(expr: Column): Column = withExpr { Log2(expr.expr) }
* 计算给定值的以2为底的对数。
* @group math_funcs
* @since 1.5.0
def log2(columnName: String): Column = log2(Column(columnName))
* 返回第一个参数的第二个参数次幂的值。
* @group math_funcs
* @since 1.4.0
def pow(l: Column, r: Column): Column = withExpr { Pow(l.expr, r.expr) }
* 返回第一个参数的第二个参数次幂的值。
* @group math_funcs
* @since 1.4.0
def pow(l: Column, rightName: String): Column = pow(l, Column(rightName))
* 返回第一个参数的第二个参数次幂的值。
* @group math_funcs
* @since 1.4.0
def pow(leftName: String, r: Column): Column = pow(Column(leftName), r)
* 返回第一个参数的第二个参数次幂的值。
* @group math_funcs
* @since 1.4.0
def pow(leftName: String, rightName: String): Column = pow(Column(leftName), Column(rightName))
* 返回第一个参数的第二个参数次幂的值。
* @group math_funcs
* @since 1.4.0
def pow(l: Column, r: Double): Column = pow(l, lit(r))
* 返回第一个参数的第二个参数次幂的值。
* @group math_funcs
* @since 1.4.0
def pow(leftName: String, r: Double): Column = pow(Column(leftName), r)
* 返回第一个参数的第二个参数次幂的值。
* @group math_funcs
* @since 1.4.0
def pow(l: Double, r: Column): Column = pow(lit(l), r)
* 返回第一个参数的第二个参数次幂的值。
* @group math_funcs
* @since 1.4.0
def pow(l: Double, rightName: String): Column = pow(l, Column(rightName))
* 返回除法中的正数被除数模除除数的正数余数。
* @group math_funcs
* @since 1.5.0
def pmod(dividend: Column, divisor: Column): Column = withExpr {
Pmod(dividend.expr, divisor.expr)
* 返回最接近参数的整数,四舍五入到最接近的整数。
* @group math_funcs
* @since 1.4.0
def rint(e: Column): Column = withExpr { Rint(e.expr) }
* 返回最接近参数的整数,四舍五入到最接近的整数。
* @group math_funcs
* @since 1.4.0
def rint(columnName: String): Column = rint(Column(columnName))
* 返回列 `e` 的值向0舍入到0位小数的结果,使用HALF_UP舍入模式。
* @group math_funcs
* @since 1.5.0
def round(e: Column): Column = round(e, 0)
* 将列 `e` 的值舍入到 `scale` 位小数,使用HALF_UP舍入模式,
* 如果 `scale` 大于等于0或者在 `scale` 小于0时取整数部分。
* @group math_funcs
* @since 1.5.0
def round(e: Column, scale: Int): Column = withExpr { Round(e.expr, Literal(scale)) }
* 返回列 `e` 的值向0舍入到0位小数的结果,使用HALF_EVEN舍入模式。
* @group math_funcs
* @since 2.0.0
def bround(e: Column): Column = bround(e, 0)
* 将列 `e` 的值舍入到 `scale` 位小数,使用HALF_EVEN舍入模式,
* 如果 `scale` 大于等于0或者在 `scale` 小于0时取整数部分。
* @group math_funcs
* @since 2.0.0
def bround(e: Column, scale: Int): Column = withExpr { BRound(e.expr, Literal(scale)) }
* 将给定值向左移动 `numBits` 位。如果给定值是long类型,则返回long类型,否则返回整型。
* @group math_funcs
* @since 1.5.0
def shiftLeft(e: Column, numBits: Int): Column = withExpr { ShiftLeft(e.expr, lit(numBits).expr) }
* (有符号)将给定值向右移动 `numBits` 位。如果给定值是long类型,则返回long类型,否则返回整型。
* @group math_funcs
* @since 1.5.0
def shiftRight(e: Column, numBits: Int): Column = withExpr {
ShiftRight(e.expr, lit(numBits).expr)
* 无符号地将给定值向右移动 `numBits` 位。如果给定值是long类型,则返回long类型,否则返回整型。
* @group math_funcs
* @since 1.5.0
def shiftRightUnsigned(e: Column, numBits: Int): Column = withExpr {
ShiftRightUnsigned(e.expr, lit(numBits).expr)
* 计算给定值的符号。
* @group math_funcs
* @since 1.4.0
def signum(e: Column): Column = withExpr { Signum(e.expr) }
* 计算给定列的符号。
* @group math_funcs
* @since 1.4.0
def signum(columnName: String): Column = signum(Column(columnName))
* 返回角度的正弦值,类似于`java.lang.Math.sin`计算得到的值。
* @group math_funcs
* @since 1.4.0
def sin(e: Column): Column = withExpr { Sin(e.expr) }
* 返回角度的正弦值,类似于`java.lang.Math.sin`计算得到的值。
* @group math_funcs
* @since 1.4.0
def sin(columnName: String): Column = sin(Column(columnName))
* 返回给定值的双曲正弦值,类似于`java.lang.Math.sinh`计算得到的值。
* @group math_funcs
* @since 1.4.0
def sinh(e: Column): Column = withExpr { Sinh(e.expr) }
* 返回给定值的双曲正弦值,类似于`java.lang.Math.sinh`计算得到的值。
* @param columnName 双曲角
* @return 给定值的双曲正弦值,类似于`java.lang.Math.sinh`计算得到的值。
* @group math_funcs
* @since 1.4.0
def sinh(columnName: String): Column = sinh(Column(columnName))
* 返回角度的正切值,类似于`java.lang.Math.tan`计算得到的值。
* @param e 弧度角
* @return 角度的正切值,类似于`java.lang.Math.tan`计算得到的值。
* @group math_funcs
* @since 1.4.0
def tan(e: Column): Column = withExpr { Tan(e.expr) }
* 返回角度的正切值,类似于`java.lang.Math.tan`计算得到的值。
* @param columnName 弧度角
* @return 角度的正切值,类似于`java.lang.Math.tan`计算得到的值。
* @group math_funcs
* @since 1.4.0
def tan(columnName: String): Column = tan(Column(columnName))
* 返回给定值的双曲正切值,类似于`java.lang.Math.tanh`计算得到的值。
* @param e 双曲角
* @return 给定值的双曲正切值,类似于`java.lang.Math.tanh`计算得到的值。
* @group math_funcs
* @since 1.4.0
def tanh(e: Column): Column = withExpr { Tanh(e.expr) }
* 返回给定值的双曲正切值,类似于`java.lang.Math.tanh`计算得到的值。
* @param columnName 双曲角
* @return 给定值的双曲正切值,类似于`java.lang.Math.tanh`计算得到的值。
* @group math_funcs
* @since 1.4.0
def tanh(columnName: String): Column = tanh(Column(columnName))
* @group math_funcs
* @since 1.4.0
* @deprecated("Use degrees", "2.1.0")
* 将弧度转换为度数。
def toDegrees(e: Column): Column = degrees(e)
* @group math_funcs
* @since 1.4.0
* @deprecated("Use degrees", "2.1.0")
* 将弧度转换为度数。
def toDegrees(columnName: String): Column = degrees(Column(columnName))
* 将以弧度表示的角度转换为大致相等的以度数表示的角度。
* @param e 弧度角
* @return 以度数表示的角度,类似于`java.lang.Math.toDegrees`
* @group math_funcs
* @since 2.1.0
def degrees(e: Column): Column = withExpr { ToDegrees(e.expr) }
* 将以弧度表示的角度转换为大致相等的以度数表示的角度。
* @param columnName 弧度角
* @return 以度数表示的角度,类似于`java.lang.Math.toDegrees`
* @group math_funcs
* @since 2.1.0
def degrees(columnName: String): Column = degrees(Column(columnName))
* @group math_funcs
* @since 1.4.0
* @deprecated("Use radians", "2.1.0")
* 将度数转换为弧度。
def toRadians(e: Column): Column = radians(e)
* @group math_funcs
* @since 1.4.0
* @deprecated("Use radians", "2.1.0")
* 将度数转换为弧度。
def toRadians(columnName: String): Column = radians(Column(columnName))
* 将以度数表示的角度转换为大致相等的以弧度表示的角度。
* @param e 度数角
* @return 以弧度表示的角度,类似于`java.lang.Math.toRadians`
* @group math_funcs
* @since 2.1.0
def radians(e: Column): Column = withExpr { ToRadians(e.expr) }
* 将以度数表示的角度转换为大致相等的以弧度表示的角度。
* @param columnName 度数角
* @return 以弧度表示的角度,类似于`java.lang.Math.toRadians`
* @group math_funcs
* @since 2.1.0
def radians(columnName: String): Column = radians(Column(columnName))
misc_funcs 杂项函数
//////////////////////////////////////////////////////////////////////////////////////////////
// Misc functions
//////////////////////////////////////////////////////////////////////////////////////////////
* 计算二进制列的MD5摘要,并将结果作为32个字符的十六进制字符串返回。
* @group misc_funcs
* @since 1.5.0
def md5(e: Column): Column = withExpr { Md5(e.expr) }
* 计算二进制列的SHA-1摘要,并将结果作为40个字符的十六进制字符串返回。
* @group misc_funcs
* @since 1.5.0
def sha1(e: Column): Column = withExpr { Sha1(e.expr) }
* 计算二进制列的SHA-2系列散列函数,并将结果作为十六进制字符串返回。
* @param e 要计算SHA-2的列
* @param numBits 224、256、384或512中的一个
* @group misc_funcs
* @since 1.5.0
def sha2(e: Column, numBits: Int): Column = {
require(Seq(0, 224, 256, 384, 512).contains(numBits),
s"numBits $numBits is not in the permitted values (0, 224, 256, 384, 512)")
withExpr { Sha2(e.expr, lit(numBits).expr) }
* 计算二进制列的循环冗余校验值(CRC32),并将结果作为bigint返回。
* @group misc_funcs
* @since 1.5.0
def crc32(e: Column): Column = withExpr { Crc32(e.expr) }
* 计算给定列的哈希码,并将结果作为int列返回。
* @group misc_funcs
* @since 2.0.0
@scala.annotation.varargs
def hash(cols: Column*): Column = withExpr {
new Murmur3Hash(cols.map(_.expr))
}
string_funcs 字符串函数
//////////////////////////////////////////////////////////////////////////////////////////////
// String functions
//////////////////////////////////////////////////////////////////////////////////////////////
* 计算字符串列第一个字符的数字值,并将结果作为int列返回。
* @group string_funcs
* @since 1.5.0
def ascii(e: Column): Column = withExpr { Ascii(e.expr) }
* 计算二进制列的BASE64编码,并将结果作为字符串列返回。
* 这是unbase64的反操作。
* @group string_funcs
* @since 1.5.0
def base64(e: Column): Column = withExpr { Base64(e.expr) }
* 将多个输入字符串列连接成单个字符串列,使用给定的分隔符。
* @group string_funcs
* @since 1.5.0
@scala.annotation.varargs
def concat_ws(sep: String, exprs: Column*): Column = withExpr {
ConcatWs(Literal.create(sep, StringType) +: exprs.map(_.expr))
* 使用提供的字符集('US-ASCII', 'ISO-8859-1', 'UTF-8', 'UTF-16BE', 'UTF-16LE', 'UTF-16'之一)
* 将第一个参数解码为字符串。如果任一参数为空,则结果也为空。
* @group string_funcs
* @since 1.5.0
def decode(value: Column, charset: String): Column = withExpr {
Decode(value.expr, lit(charset).expr)
* 使用提供的字符集('US-ASCII', 'ISO-8859-1', 'UTF-8', 'UTF-16BE', 'UTF-16LE', 'UTF-16'之一)
* 将第一个参数编码为二进制。如果任一参数为空,则结果也为空。
* @group string_funcs
* @since 1.5.0
def encode(value: Column, charset: String): Column = withExpr {
Encode(value.expr, lit(charset).expr)
* 格式化数字列x为类似于'#,###,###.##'的格式,四舍五入到d位小数,
* 使用HALF_EVEN舍入模式,并将结果作为字符串列返回。
* 如果d为0,则结果没有小数点或小数部分。
* 如果d小于0,则结果将为空。
* @group string_funcs
* @since 1.5.0
def format_number(x: Column, d: Int): Column = withExpr {
FormatNumber(x.expr, lit(d).expr)
* 使用printf样式格式化参数,并将结果作为字符串列返回。
* @group string_funcs
* @since 1.5.0
@scala.annotation.varargs
def format_string(format: String, arguments: Column*): Column = withExpr {
FormatString((lit(format) +: arguments).map(_.expr): _*)
* 返回一个新的字符串列,其中每个单词的首字母转换为大写。
* 单词由空格分隔。
* 例如,"hello world"将变为"Hello World"。
* @group string_funcs
* @since 1.5.0
def initcap(e: Column): Column = withExpr { InitCap(e.expr) }
* 定位给定字符串中子字符串的第一个出现位置。
* 如果任一参数为空,则返回null。
* 注意:位置不是从零开始,而是从1开始的索引。如果在str中找不到substr,则返回0。
* @group string_funcs
* @since 1.5.0
def instr(str: Column, substring: String): Column = withExpr {
StringInstr(str.expr, lit(substring).expr)
* 计算给定字符串的字符长度或给定二进制字符串的字节数。
* 字符串的长度包括尾部的空格。二进制字符串的长度包括二进制零。
* @group string_funcs
* @since 1.5.0
def length(e: Column): Column = withExpr { Length(e.expr) }
* 将字符串列转换为小写。
* @group string_funcs
* @since 1.3.0
def lower(e: Column): Column = withExpr { Lower(e.expr) }
* 计算两个给定字符串列的Levenshtein距离。
* @group string_funcs
* @since 1.5.0
def levenshtein(l: Column, r: Column): Column = withExpr { Levenshtein(l.expr, r.expr) }
* 在给定字符串中定位子字符串的第一个出现位置。
* 注意:位置不是从零开始,而是从1开始的索引。如果在str中找不到substr,则返回0。
* @group string_funcs
* @since 1.5.0
def locate(substr: String, str: Column): Column = withExpr {
new StringLocate(lit(substr).expr, str.expr)
* 在给定字符串中定位子字符串的第一个出现位置,位置从pos开始。
* 注意:位置不是从零开始,而是从1开始的索引。如果在str中找不到substr,则返回0。
* @group string_funcs
* @since 1.5.0
def locate(substr: String, str: Column, pos: Int): Column = withExpr {
StringLocate(lit(substr).expr, str.expr, lit(pos).expr)
* 使用pad右填充字符串列,使其长度达到len。如果字符串列超过len,则返回值将被截断为len个字符。
* @group string_funcs
* @since 1.5.0
def lpad(str: Column, len: Int, pad: String): Column = withExpr {
StringLPad(str.expr, lit(len).expr, lit(pad).expr)
* 从指定字符串值的左端删除空格。
* @group string_funcs
* @since 1.5.0
def ltrim(e: Column): Column = withExpr {StringTrimLeft(e.expr) }
* 从指定字符串列的左端删除指定的字符字符串。
* @group string_funcs
* @since 2.3.0
def ltrim(e: Column, trimString: String): Column = withExpr {
StringTrimLeft(e.expr, Literal(trimString))
* 从指定字符串表达式中提取与Java正则表达式匹配的特定组。如果正则表达式未匹配或指定的组未匹配,则返回空字符串。
* @group string_funcs
* @since 1.5.0
def regexp_extract(e: Column, exp: String, groupIdx: Int): Column = withExpr {
RegExpExtract(e.expr, lit(exp).expr, lit(groupIdx).expr)
* 使用rep替换指定字符串值的所有子字符串。
* @group string_funcs
* @since 1.5.0
def regexp_replace(e: Column, pattern: String, replacement: String): Column = withExpr {
RegExpReplace(e.expr, lit(pattern).expr, lit(replacement).expr)
* 使用rep替换指定字符串值的所有子字符串。
* @group string_funcs
* @since 2.1.0
def regexp_replace(e: Column, pattern: Column, replacement: Column): Column = withExpr {
RegExpReplace(e.expr, pattern.expr, replacement.expr)
* 解码一个BASE64编码的字符串列,并将其作为二进制列返回。
* 这是base64的反操作。
* @group string_funcs
* @since 1.5.0
def unbase64(e: Column): Column = withExpr { UnBase64(e.expr) }
* 使用pad右填充字符串列,使其长度达到len。如果字符串列超过len,则返回值将被截断为len个字符。
* @group string_funcs
* @since 1.5.0
def rpad(str: Column, len: Int, pad: String): Column = withExpr {
StringRPad(str.expr, lit(len).expr, lit(pad).expr)
* 将字符串列重复n次,并将其作为新的字符串列返回。
* @group string_funcs
* @since 1.5.0
def repeat(str: Column, n: Int): Column = withExpr {
StringRepeat(str.expr, lit(n).expr)
* 从指定字符串值的右端删除空格。
* @group string_funcs
* @since 1.5.0
def rtrim(e: Column): Column = withExpr { StringTrimRight(e.expr) }
* 从指定字符串列的右端删除指定的字符字符串。
* @group string_funcs
* @since 2.3.0
def rtrim(e: Column, trimString: String): Column = withExpr {
StringTrimRight(e.expr, Literal(trimString))
* 返回指定表达式的Soundex代码。
* @group string_funcs
* @since 1.5.0
def soundex(e: Column): Column = withExpr { SoundEx(e.expr) }
* 使用pattern(pattern是一个正则表达式)在str周围拆分。
* 注意:pattern是正则表达式的字符串表示形式。
* @group string_funcs
* @since 1.5.0
def split(str: Column, pattern: String): Column = withExpr {
StringSplit(str.expr, lit(pattern).expr)
* 返回字符串列的子串,子串从`pos`开始,长度为`len`
* 当str是String类型时,或者返回二进制数组的切片,该切片从`pos`开始,长度为`len`,当str是Binary类型时。
* @note 位置不是基于0的,而是基于1的索引。
* @group string_funcs
* @since 1.5.0
def substring(str: Column, pos: Int, len: Int): Column = withExpr {
Substring(str.expr, lit(pos).expr, lit(len).expr)
* 返回给定字符串中出现分隔符delim之前的子字符串。
* 如果count是正数,则返回分隔符的左侧部分(从左开始计数)。
* 如果count是负数,则返回分隔符的右侧部分(从右开始计数)。
* substring_index在搜索delim时执行区分大小写的匹配。
* @group string_funcs
def substring_index(str: Column, delim: String, count: Int): Column = withExpr {
SubstringIndex(str.expr, lit(delim).expr, lit(count).expr)
* 将源字符串中的任何字符替换为replaceString中的字符。
* replaceString中的字符与matchingString中的字符相对应。
* 当源字符串中的字符与matchingString中的字符匹配时,将进行替换。
* @group string_funcs
* @since 1.5.0
def translate(src: Column, matchingString: String, replaceString: String): Column = withExpr {
StringTranslate(src.expr, lit(matchingString).expr, lit(replaceString).expr)
* 去掉字符串列两端的空格。
* @group string_funcs
* @since 1.5.0
def trim(e: Column): Column = withExpr { StringTrim(e.expr) }
* 去掉指定字符串列两端的指定字符。
* @group string_funcs
* @since 2.3.0
def trim(e: Column, trimString: String): Column = withExpr {
StringTrim(e.expr, Literal(trimString))
* 将字符串列转换为大写。
* @group string_funcs
* @since 1.3.0
def upper(e: Column): Column = withExpr { Upper(e.expr) }
日期时间函数
//////////////////////////////////////////////////////////////////////////////////////////////
// DateTime functions
//////////////////////////////////////////////////////////////////////////////////////////////
* 返回`startDate`之后`numMonths`个月的日期。
* @param startDate 一个日期、时间戳或字符串。如果是字符串,则数据必须以可以转换为日期的格式,例如`yyyy-MM-dd`或`yyyy-MM-dd HH:mm:ss.SSSS`
* @param numMonths 要添加到`startDate`的月数,可以为负数以减去月数
* @return 一个日期,如果`startDate`是无法转换为日期的字符串,则返回null
* @group datetime_funcs
* @since 1.5.0
def add_months(startDate: Column, numMonths: Int): Column = withExpr {
AddMonths(startDate.expr, Literal(numMonths))
* 返回当前日期作为日期列。
* @group datetime_funcs
* @since 1.5.0
def current_date(): Column = withExpr { CurrentDate() }
* 返回当前时间戳作为时间戳列。
* @group datetime_funcs
* @since 1.5.0
def current_timestamp(): Column = withExpr { CurrentTimestamp() }
* 将日期/时间戳/字符串转换为指定格式的字符串值。
* 有关有效日期和时间格式模式,请参见[[java.text.SimpleDateFormat]]
* @param dateExpr 日期、时间戳或字符串。如果是字符串,则数据必须以可以转换为时间戳的格式,例如`yyyy-MM-dd`或`yyyy-MM-dd HH:mm:ss.SSSS`
* @param format 模式 `dd.MM.yyyy`将返回如`18.03.1993`的字符串
* @return 一个字符串,如果`dateExpr`是无法转换为时间戳的字符串,则返回null
* @note 尽可能使用专用函数(例如[[year]]),因为它们能从特殊实现中受益。
* @throws IllegalArgumentException 如果`format`模式无效
* @group datetime_funcs
* @since 1.5.0
def date_format(dateExpr: Column, format: String): Column = withExpr {
DateFormatClass(dateExpr.expr, Literal(format))
* 返回`start`之后`days`天的日期。
* @param start 一个日期、时间戳或字符串。如果是字符串,则数据必须以可以转换为日期的格式,例如`yyyy-MM-dd`或`yyyy-MM-dd HH:mm:ss.SSSS`
* @param days 要添加到`start`的天数,可以为负数以减去天数
* @return 一个日期,如果`start`是无法转换为日期的字符串,则返回null
* @group datetime_funcs
* @since 1.5.0
def date_add(start: Column, days: Int): Column = withExpr { DateAdd(start.expr, Literal(days)) }
* 返回`start`之前`days`天的日期。
* @param start 一个日期、时间戳或字符串。如果是字符串,则数据必须以可以转换为日期的格式,例如`yyyy-MM-dd`或`yyyy-MM-dd HH:mm:ss.SSSS`
* @param days 要减去`start`的天数,可以为负数以添加天数
* @return 一个日期,如果`start`是无法转换为日期的字符串,则返回null
* @group datetime_funcs
* @since 1.5.0
def date_sub(start: Column, days: Int): Column = withExpr { DateSub(start.expr, Literal(days)) }
* 返回从`start`到`end`的天数。
* 只考虑输入的日期部分。例如:
* {{{
* dateddiff("2018-01-10 00:00:00", "2018-01-09 23:59:59")
* // 返回1
* }}}
* @param end 一个日期、时间戳或字符串。如果是字符串,则数据必须以可以转换为日期的格式,例如`yyyy-MM-dd`或`yyyy-MM-dd HH:mm:ss.SSSS`
* @param start 一个日期、时间戳或字符串。如果是字符串,则数据必须以可以转换为日期的格式,例如`yyyy-MM-dd`或`yyyy-MM-dd HH:mm:ss.SSSS`
* @return 一个整数,如果`end`或`start`是无法转换为日期的字符串,则返回null。如果`end`在`start`之前,则返回负数
* @group datetime_funcs
* @since 1.5.0
def datediff(end: Column, start: Column): Column = withExpr { DateDiff(end.expr, start.expr) }
* 从给定日期/时间戳/字符串中提取年份作为整数。
* @return 一个整数,如果输入是无法转换为日期的字符串,则返回null
* @group datetime_funcs
* @since 1.5.0
def year(e: Column): Column = withExpr { Year(e.expr) }
* 从给定日期/时间戳/字符串中提取季度作为整数。
* @return 一个整数,如果输入是无法转换为日期的字符串,则返回null
* @group datetime_funcs
* @since 1.5.0
def quarter(e: Column): Column = withExpr { Quarter(e.expr) }
* 从给定日期/时间戳/字符串中提取月份作为整数。
* @return 一个整数,如果输入是无法转换为日期的字符串,则返回null
* @group datetime_funcs
* @since 1.5.0
def month(e: Column): Column = withExpr { Month(e.expr) }
* 从给定日期/时间戳/字符串中提取星期几作为整数。
* 从星期天到星期六分别为1到7。
* @return 一个整数,如果输入是无法转换为日期的字符串,则返回null
* @group datetime_funcs
* @since 2.3.0
def dayofweek(e: Column): Column = withExpr { DayOfWeek(e.expr) }
* 从给定日期/时间戳/字符串中提取月份中的某天作为整数。
* @return 一个整数,如果输入是无法转换为日期的字符串,则返回null
* @group datetime_funcs
* @since 1.5.0
def dayofmonth(e: Column): Column = withExpr { DayOfMonth(e.expr) }
* 从给定日期/时间戳/字符串中提取年份中的某天作为整数。
* @return 一个整数,如果输入是无法转换为日期的字符串,则返回null
* @group datetime_funcs
* @since 1.5.0
def dayofyear(e: Column): Column = withExpr { DayOfYear(e.expr) }
* 从给定日期/时间戳/字符串中提取小时作为整数。
* @return 一个整数,如果输入是无法转换为日期的字符串,则返回null
* @group datetime_funcs
* @since 1.5.0
def hour(e: Column): Column = withExpr { Hour(e.expr) }
* 返回给定日期所在月份的最后一天。
* 例如,输入"2015-07-27"返回"2015-07-31",因为7月31日是2015年7月的最后一天。
* @param e 一个日期、时间戳或字符串。如果是字符串,则数据必须以可以转换为日期的格式,例如`yyyy-MM-dd`或`yyyy-MM-dd HH:mm:ss.SSSS`
* @return 一个日期,如果输入是无法转换为日期的字符串,则返回null
* @group datetime_funcs
* @since 1.5.0
def last_day(e: Column): Column = withExpr { LastDay(e.expr) }
* 从给定日期/时间戳/字符串中提取分钟作为整数。
* @return 一个整数,如果输入是无法转换为日期的字符串,则返回null
* @group datetime_funcs
* @since 1.5.0
def minute(e: Column): Column = withExpr { Minute(e.expr) }
* 返回`start`和`end`之间的月份数。
* 如果两个输入具有相同的日期,或者两者都是各自月份的最后一天,将返回一个整数。
* 否则,假设每个月有31天,计算差异。
* 例如:
* {{{
* months_between("2017-11-14", "2017-07-14") // 返回4.0
* months_between("2017-01-01", "2017-01-10") // 返回0.29032258
* months_between("2017-06-01", "2017-06-16 12:00:00") // 返回-0.5
* }}}
* @param end 一个日期、时间戳或字符串。如果是字符串,则数据必须以可以转换为时间戳的格式,例如`yyyy-MM-dd`或`yyyy-MM-dd HH:mm:ss.SSSS`
* @param start 一个日期、时间戳或字符串。如果是字符串,则数据必须以可以转换为时间戳的格式,例如`yyyy-MM-dd`或`yyyy-MM-dd HH:mm:ss.SSSS`
* @return 一个双精度浮点数,如果`end`或`start`是无法转换为时间戳的字符串,则返回null。如果`end`在`start`之前,则返回负数
* @group datetime_funcs
* @since 1.5.0
def months_between(end: Column, start: Column): Column = withExpr {
new MonthsBetween(end.expr, start.expr)
* 返回`start`和`end`之间的月份数。如果将`roundOff`设置为true,则结果将四舍五入到8位小数;否则不进行四舍五入。
* @group datetime_funcs
* @since 2.4.0
def months_between(end: Column, start: Column, roundOff: Boolean): Column = withExpr {
MonthsBetween(end.expr, start.expr, lit(roundOff).expr)
* 返回给定日期的下一个星期几之后的日期。
* 例如,`next_day('2015-07-27', "Sunday")`返回2015-08-02,因为这是2015年7月27日之后的第一个星期日。
* @param date 一个日期、时间戳或字符串。如果是字符串,则数据必须以可以转换为日期的格式,例如`yyyy-MM-dd`或`yyyy-MM-dd HH:mm:ss.SSSS`
* @param dayOfWeek 不区分大小写,接受:"Mon", "Tue", "Wed", "Thu", "Fri", "Sat", "Sun"
* @return 一个日期,如果`date`是无法转换为日期的字符串或`dayOfWeek`是无效值,则返回null
* @group datetime_funcs
* @since 1.5.0
def next_day(date: Column, dayOfWeek: String): Column = withExpr {
NextDay(date.expr, lit(dayOfWeek).expr)
* 从给定日期/时间戳/字符串中提取秒数作为整数。
* @return 一个整数,如果输入是无法转换为时间戳的字符串,则返回null
* @group datetime_funcs
* @since 1.5.0
def second(e: Column): Column = withExpr { Second(e.expr) }
* 从给定日期/时间戳/字符串中提取年份中的某周作为整数。
* 周被认为从星期一开始,第1周是具有大于3天的第一周,这是ISO 8601定义的
* @return 一个整数,如果输入是无法转换为日期的字符串,则返回null
* @group datetime_funcs
* @since 1.5.0
def weekofyear(e: Column): Column = withExpr { WeekOfYear(e.expr) }
* 将从Unix纪元(1970-01-01 00:00:00 UTC)到的秒数转换为表示该时刻的时间戳的字符串,
* 该时刻是在当前系统时区中的`yyyy-MM-dd HH:mm:ss`格式。
* @param ut 可以转换为long类型的数字,例如字符串或整数。可以为负数以表示Unix纪元之前的时间戳
* @return 一个字符串,如果输入是无法转换为long的字符串,则返回null
* @group datetime_funcs
* @since 1.5.0
def from_unixtime(ut: Column): Column = withExpr {
FromUnixTime(ut.expr, Literal("yyyy-MM-dd HH:mm:ss"))
* 将给定的秒数从Unix纪元(1970-01-01 00:00:00 UTC)转换为表示该时刻的时间戳的字符串,
* 该时刻是在当前系统时区中的给定格式。
* 可参考[[java.text.SimpleDateFormat]]获取有效的日期和时间格式模式
* @param ut 可以转换为long类型的数字,例如字符串或整数。可以为负数以表示Unix纪元之前的时间戳
* @param f 日期时间格式模式,用于将输入格式化为字符串
* @return 一个字符串,如果`ut`是无法转换为long的字符串,则返回null,或者`f`是无效的日期时间格式模式
* @group datetime_funcs
* @since 1.5.0
def from_unixtime(ut: Column, f: String): Column = withExpr {
FromUnixTime(ut.expr, Literal(f))
* 返回当前Unix时间戳(以秒为单位)作为长整型。
* @note 在同一查询中,所有`unix_timestamp`的调用返回相同的值
* (即当前时间戳是在查询评估开始时计算的)。
* @group datetime_funcs
* @since 1.5.0
def unix_timestamp(): Column = withExpr {
UnixTimestamp(CurrentTimestamp(), Literal("yyyy-MM-dd HH:mm:ss"))
* 将格式为`yyyy-MM-dd HH:mm:ss`的时间字符串转换为Unix时间戳(以秒为单位),
* 使用默认时区和默认区域设置。
* @param s 一个日期、时间戳或字符串。如果是字符串,则数据必须以可以转换为时间戳的格式,例如`yyyy-MM-dd`或`yyyy-MM-dd HH:mm:ss.SSSS`
* @return 一个长整型,如果输入是无法转换为正确格式的字符串,则返回null
* @group datetime_funcs
* @since 1.5.0
def unix_timestamp(s: Column): Column = withExpr {
UnixTimestamp(s.expr, Literal("yyyy-MM-dd HH:mm:ss"))
* 将给定日期/时间戳/字符串按照给定模式转换为Unix时间戳(以秒为单位)。
* 可参考[[java.text.SimpleDateFormat]]获取有效的日期和时间格式模式
* @param s 一个日期、时间戳或字符串。如果是字符串,则数据必须以可以转换为时间戳的格式,例如`yyyy-MM-dd`或`yyyy-MM-dd HH:mm:ss.SSSS`
* @param p 字符串格式详细说明`s`在`s`是字符串时的格式
* @return 一个长整型,如果`s`是无法转换为时间戳的字符串,则返回null。或者`p`是无效的格式
* @group datetime_funcs
* @since 1.5.0
def unix_timestamp(s: Column, p: String): Column = withExpr { UnixTimestamp(s.expr, Literal(p)) }
* 根据`TimestampType`将列转换为时间戳。
* @param s 一个日期、时间戳或字符串。如果是字符串,则数据必须以可以转换为时间戳的格式,例如`yyyy-MM-dd`或`yyyy-MM-dd HH:mm:ss.SSSS`
* @return 一个时间戳,如果输入是无法转换为时间戳的字符串,则返回null
* @group datetime_funcs
* @since 2.2.0
def to_timestamp(s: Column): Column = withExpr {
new ParseToTimestamp(s.expr)
* 将给定格式的时间字符串转换为时间戳。
* 可参考[[java.text.SimpleDateFormat]]获取有效的日期和时间格式模式
* @param s 一个日期、时间戳或字符串。如果是字符串,则数据必须以可以转换为时间戳的格式,例如`yyyy-MM-dd`或`yyyy-MM-dd HH:mm:ss.SSSS`
* @param fmt 字符串格式详细说明`s`在`s`是字符串时的格式
* @return 一个时间戳,如果`s`是无法转换为时间戳的字符串,则返回null,或者`fmt`是无效的格式
* @group datetime_funcs
* @since 2.2.0
def to_timestamp(s: Column, fmt: String): Column = withExpr {
new ParseToTimestamp(s.expr, Literal(fmt))
* 根据`DateType`将列转换为`DateType`。
* @group datetime_funcs
* @since 1.5.0
def to_date(e: Column): Column = withExpr { new ParseToDate(e.expr) }
* 将列转换为指定格式的`DateType`
* 可参考[[java.text.SimpleDateFormat]]获取有效的日期和时间格式模式
* @param e 一个日期、时间戳或字符串。如果是字符串,则数据必须以可以转换为日期的格式,例如`yyyy-MM-dd`或`yyyy-MM-dd HH:mm:ss.SSSS`
* @param fmt 字符串格式详细说明`e`在`e`是字符串时的格式
* @return 一个日期,如果`e`是无法转换为日期的字符串,则返回null,或者`fmt`是无效的格式
* @group datetime_funcs
* @since 2.2.0
def to_date(e: Column, fmt: String): Column = withExpr {
new ParseToDate(e.expr, Literal(fmt))
* 将日期按照给定格式截断到指定单位。
* 例如,`trunc("2018-11-19 12:01:19", "year")`返回2018-01-01
* @param date 一个日期、时间戳或字符串。如果是字符串,则数据必须以可以转换为日期的格式,例如`yyyy-MM-dd`或`yyyy-MM-dd HH:mm:ss.SSSS`
* @param format: 'year', 'yyyy', 'yy' 表示按年截断,
* 或 'month', 'mon', 'mm' 表示按月截断
* @return 一个日期,如果`date`是无法转换为日期的字符串,则返回null,或者`format`是无效的值
* @group datetime_funcs
* @since 1.5.0
def trunc(date: Column, format: String): Column = withExpr {
TruncDate(date.expr, Literal(format))
* 将时间戳按照给定格式截断到指定单位。
* 例如,`date_tunc("2018-11-19 12:01:19", "year")`返回2018-01-01 00:00:00
* @param format: 'year', 'yyyy', 'yy' 表示按年截断,
* 'month', 'mon', 'mm' 表示按月截断,
* 'day', 'dd' 表示按日截断,
* 其他选项为:'second', 'minute', 'hour', 'week', 'month', 'quarter'
* @param timestamp 一个日期、时间戳或字符串。如果是字符串,则数据必须以可以转换为时间戳的格式,例如`yyyy-MM-dd`或`yyyy-MM-dd HH:mm:ss.SSSS`
* @return 一个时间戳,如果`timestamp`是无法转换为时间戳的字符串,则返回null,或者`format`是无效的值
* @group datetime_funcs
* @since 2.3.0
def date_trunc(format: String, timestamp: Column): Column = withExpr {
TruncTimestamp(Literal(format), timestamp.expr)
* 给定一个类似于'2017-07-14 02:40:00.0'的时间戳,在UTC中解释它,并将该时间作为给定时区中的时间戳呈现。
* 例如,'GMT+1'将产生'2017-07-14 03:40:00.0'。
* @param ts 一个日期、时间戳或字符串。如果是字符串,则数据必须以可以转换为时间戳的格式,例如`yyyy-MM-dd`或`yyyy-MM-dd HH:mm:ss.SSSS`
* @param tz 详细说明输入应调整为的时区的字符串,例如`Europe/London`、`PST`或`GMT+5`
* @return 一个时间戳,如果`ts`是无法转换为时间戳的字符串,则返回null,或者`tz`是无效的值
* @group datetime_funcs
* @since 1.5.0
def from_utc_timestamp(ts: Column, tz: String): Column = withExpr {
FromUTCTimestamp(ts.expr, Literal(tz))
* 给定一个类似于'2017-07-14 02:40:00.0'的时间戳,在UTC中解释它,并将该时间作为给定时区中的时间戳呈现。
* 例如,'GMT+1'将产生'2017-07-14 03:40:00.0'。
* @group datetime_funcs
* @since 2.4.0
def from_utc_timestamp(ts: Column, tz: Column): Column = withExpr {
FromUTCTimestamp(ts.expr, tz.expr)
* 给定一个类似于'2017-07-14 02:40:00.0'的时间戳,在给定时区中解释它,并将该时间作为UTC中的时间戳呈现。
* 例如,'GMT+1'将产生'2017-07-14 01:40:00.0'。
* @param ts 一个日期、时间戳或字符串。如果是字符串,则数据必须以可以转换为时间戳的格式,例如`yyyy-MM-dd`或`yyyy-MM-dd HH:mm:ss.SSSS`
* @param tz 详细说明输入所属的时区的字符串,例如`Europe/London`、`PST`或`GMT+5`
* @return 一个时间戳,如果`ts`是无法转换为时间戳的字符串,则返回null,或者`tz`是无效的值
* @group datetime_funcs
* @since 1.5.0
def to_utc_timestamp(ts: Column, tz: String): Column = withExpr {
ToUTCTimestamp(ts.expr, Literal(tz))
* 给定一个类似于'2017-07-14 02:40:00.0'的时间戳,在给定时区中解释它,并将该时间作为UTC中的时间戳呈现。
* 例如,'GMT+1'将产生'2017-07-14 01:40:00.0'。
* @group datetime_funcs
* @since 2.4.0
def to_utc_timestamp(ts: Column, tz: Column): Column = withExpr {
ToUTCTimestamp(ts.expr, tz.expr)
* 将行按照给定时间戳列分桶到一个或多个时间窗口中。窗口的起始时间是包含的,但窗口的结束时间是不包含的,
* 例如12:05将在窗口[12:05,12:10)中,但不在[12:00,12:05)中。窗口可以支持微秒精度。不支持按月份的窗口。
* 下面的示例将每10秒开始于整点后5秒的一个一分钟窗口的平均股票价格:
* {{{
* val df = ... // schema => timestamp: TimestampType, stockId: StringType, price: DoubleType
* df.groupBy(window($"time", "1 minute", "10 seconds", "5 seconds"), $"stockId")
* .agg(mean("price"))
* }}}
* 窗口将会如下所示:
* {{{
* 09:00:05-09:01:05
* 09:00:15-09:01:15
* 09:00:25-09:01:25 ...
* }}}
* 对于流式查询,可以使用函数`current_timestamp`在处理时间上生成窗口。
* @param timeColumn 时间戳列或表达式,用于按时间进行窗口化。时间列必须是TimestampType类型。
* @param windowDuration 字符串,指定窗口的宽度,例如`10 minutes`、`1 second`。请检查`org.apache.spark.unsafe.types.CalendarInterval`以获取有效的持续时间标识符。注意,持续时间是固定的时间长度,并且不随时间变化(根据日历)而变化。例如,`1 day`始终表示86,400,000毫秒,而不是日历天。
* @param slideDuration 字符串,指定窗口的滑动间隔,例如`1 minute`。每个`slideDuration`将生成一个新的窗口。必须小于或等于`windowDuration`。请检查`org.apache.spark.unsafe.types.CalendarInterval`以获取有效的持续时间标识符。此持续时间同样是绝对的,并且不随时间变化(根据日历)而变化。
* @param startTime 与1970-01-01 00:00:00 UTC相对偏移量,用于开始窗口间隔。例如,为了使小时滚动窗口从整点后15分钟开始,例如12:15-13:15、13:15-14:15...提供`startTime`作为`15 minutes`。
* @group datetime_funcs
* @since 2.0.0
def window(
timeColumn: Column,
windowDuration: String,
slideDuration: String,
startTime: String): Column = {
withExpr {
TimeWindow(timeColumn.expr, windowDuration, slideDuration, startTime)
}.as("window")
* 将行按照给定时间戳列分桶到一个或多个时间窗口中。窗口的起始时间是包含的,但窗口的结束时间是不包含的,
* 例如12:05将在窗口[12:05,12:10)中,但不在[12:00,12:05)中。窗口可以支持微秒精度。不支持按月份的窗口。
* 窗口的开始时间从1970-01-01 00:00:00 UTC开始。
* 下面的示例将每10秒生成一个一分钟窗口的平均股票价格:
* {{{
* val df = ... // schema => timestamp: TimestampType, stockId: StringType, price: DoubleType
* df.groupBy(window($"time", "1 minute", "10 seconds"), $"stockId")
* .agg(mean("price"))
* }}}
* 窗口将会如下所示:
* {{{
* 09:00:00-09:01:00
* 09:00:10-09:01:10
* 09:00:20-09:01:20 ...
* }}}
* 对于流式查询,可以使用函数`current_timestamp`根据处理时间生成窗口。
* @param timeColumn 时间戳列或表达式,用于按时间进行窗口化。时间列必须是TimestampType类型。
* @param windowDuration 字符串,指定窗口的宽度,例如`10 minutes`、`1 second`。请检查`org.apache.spark.unsafe.types.CalendarInterval`以获取有效的持续时间标识符。注意,持续时间是固定的时间长度,并且不随时间变化(根据日历)而变化。例如,`1 day`始终表示86,400,000毫秒,而不是日历天。
* @param slideDuration 字符串,指定窗口的滑动间隔,例如`1 minute`。每个`slideDuration`将生成一个新的窗口。必须小于或等于`windowDuration`。请检查`org.apache.spark.unsafe.types.CalendarInterval`以获取有效的持续时间标识符。此持续时间同样是绝对的,并且不随时间变化(根据日历)而变化。
* @group datetime_funcs
* @since 2.0.0
def window(timeColumn: Column, windowDuration: String, slideDuration: String): Column = {
window(timeColumn, windowDuration, slideDuration, "0 second")
* 给定一个时间戳列,生成滚动窗口。窗口的起始时间是包含的,但窗口的结束时间是不包含的,
* 例如12:05将在窗口[12:05,12:10)中,但不在[12:00,12:05)中。窗口可以支持微秒精度。不支持按月份的窗口。
* 窗口的开始时间从1970-01-01 00:00:00 UTC开始。
* 下面的示例将每一分钟生成一个平均股票价格的滚动窗口:
* {{{
* val df = ... // schema => timestamp: TimestampType, stockId: StringType, price: DoubleType
* df.groupBy(window($"time", "1 minute"), $"stockId")
* .agg(mean("price"))
* }}}
* 窗口将会如下所示:
* {{{
* 09:00:00-09:01:00
* 09:01:00-09:02:00
* 09:02:00-09:03:00 ...
* }}}
* 对于流式查询,可以使用函数`current_timestamp`根据处理时间生成窗口。
* @param timeColumn 时间戳列或表达式,用于按时间进行窗口化。时间列必须是TimestampType类型。
* @param windowDuration 字符串,指定窗口的宽度,例如`10 minutes`、`1 second`。请检查`org.apache.spark.unsafe.types.CalendarInterval`以获取有效的持续时间标识符。
* @group datetime_funcs
* @since 2.0.0
def window(timeColumn: Column, windowDuration: String): Column = {
window(timeColumn, windowDuration, windowDuration, "0 second")
}
collection_funcs 集合函数
//////////////////////////////////////////////////////////////////////////////////////////////
// 集合函数
//////////////////////////////////////////////////////////////////////////////////////////////
* 如果数组为null,则返回null;如果数组包含`value`,则返回true;否则返回false。
* @param column 数组列
* @param value 值
* @group collection_funcs
* @since 1.5.0
def array_contains(column: Column, value: Any): Column = withExpr {
ArrayContains(column.expr, lit(value).expr)
* 如果`a1`和`a2`至少有一个非空元素,则返回`true`。如果两个数组都非空且其中任何一个包含`null`,则返回`null`。否则返回`false`。
* @param a1 数组列1
* @param a2 数组列2
* @group collection_funcs
* @since 2.4.0
def arrays_overlap(a1: Column, a2: Column): Column = withExpr {
ArraysOverlap(a1.expr, a2.expr)
* 返回包含`x`从索引`start`开始(如果`start`为负数,则从末尾开始)的指定长度`length`的所有元素的数组。
* @param x 列
* @param start 起始位置
* @param length 长度
* @group collection_funcs
* @since 2.4.0
def slice(x: Column, start: Int, length: Int): Column = withExpr {
Slice(x.expr, Literal(start), Literal(length))
* 使用`delimiter`将`column`的元素连接起来。将null值替换为`nullReplacement`。
* @param column 列
* @param delimiter 分隔符
* @param nullReplacement 空值替换
* @group collection_funcs
* @since 2.4.0
def array_join(column: Column, delimiter: String, nullReplacement: String): Column = withExpr {
ArrayJoin(column.expr, Literal(delimiter), Some(Literal(nullReplacement)))
* 使用`delimiter`将`column`的元素连接起来。
* @param column 列
* @param delimiter 分隔符
* @group collection_funcs
* @since 2.4.0
def array_join(column: Column, delimiter: String): Column = withExpr {
ArrayJoin(column.expr, Literal(delimiter), None)
* 将多个输入列连接成一个单独的列。函数适用于字符串、二进制和兼容的数组列。
* @param exprs 输入列
* @group collection_funcs
* @since 1.5.0
@scala.annotation.varargs
def concat(exprs: Column*): Column = withExpr { Concat(exprs.map(_.expr)) }
* 返回在给定数组中找到的第一个值的位置(作为长整型)。如果任一参数为null,则返回null。
* 注意:位置不是基于0的索引,而是从1开始的索引。如果在数组中找不到值,则返回0。
* @param column 数组列
* @param value 值
* @group collection_funcs
* @since 2.4.0
def array_position(column: Column, value: Any): Column = withExpr {
ArrayPosition(column.expr, lit(value).expr)
* 如果列是数组,则返回数组中给定索引处的元素。如果列是映射,则返回映射中给定键的值。
* @param column 列
* @param value 索引或键的值
* @group collection_funcs
* @since 2.4.0
def element_at(column: Column, value: Any): Column = withExpr {
ElementAt(column.expr, lit(value).expr)
* 按升序对输入数组进行排序。输入数组的元素必须是可排序的。将null元素放在返回的数组的末尾。
* @param e 数组列
* @group collection_funcs
* @since 2.4.0
def array_sort(e: Column): Column = withExpr { ArraySort(e.expr) }
* 从给定数组中删除等于element的所有元素。
* @param column 数组列
* @param element 要删除的元素
* @group collection_funcs
* @since 2.4.0
def array_remove(column: Column, element: Any): Column = withExpr {
ArrayRemove(column.expr, lit(element).expr)
* 从数组中删除重复的值。
* @param e 数组列
* @group collection_funcs
* @since 2.4.0
def array_distinct(e: Column): Column = withExpr { ArrayDistinct(e.expr) }
* 返回两个数组的交集(不包含重复的元素)。
* @param col1 数组列1
* @param col2 数组列2
* @group collection_funcs
* @since 2.4.0
def array_intersect(col1: Column, col2: Column): Column = withExpr {
ArrayIntersect(col1.expr, col2.expr)
* 返回两个数组的并集(不包含重复的元素)。
* @param col1 数组列1
* @param col2 数组列2
* @group collection_funcs
* @since 2.4.0
def array_union(col1: Column, col2: Column): Column = withExpr {
ArrayUnion(col1.expr, col2.expr)
* 返回第一个数组中但不在第二个数组中的元素(不包含重复的元素)。结果中元素的顺序不确定。
* @param col1 数组列1
* @param col2 数组列2
* @group collection_funcs
* @since 2.4.0
def array_except(col1: Column, col2: Column): Column = withExpr {
ArrayExcept(col1.expr, col2.expr)
* 根据给定的数组或映射列,为每个元素创建一个新行。
* @param e 数组或映射列
* @group collection_funcs
* @since 1.3.0
def explode(e: Column): Column = withExpr { Explode(e.expr) }
* 根据给定的数组或映射列,为每个元素创建一个新行。如果数组或映射为null或空,则返回null。
* @param e 数组或映射列
* @group collection_funcs
* @since 2.2.0
def explode_outer(e: Column): Column = withExpr { GeneratorOuter(Explode(e.expr)) }
* 根据给定的数组或映射列和元素的位置,为每个元素创建一个新行。
* @param e 数组或映射列
* @group collection_funcs
* @since 2.1.0
def posexplode(e: Column): Column = withExpr { PosExplode(e.expr) }
* 根据给定的数组或映射列和元素的位置,为每个元素创建一个新行。如果数组或映射为null或空,则返回(null, null)。
* @param e 数组或映射列
* @group collection_funcs
* @since 2.2.0
def posexplode_outer(e: Column): Column = withExpr { GeneratorOuter(PosExplode(e.expr)) }
* 根据指定的json path从json字符串中提取json对象,并返回提取后的json字符串。
* 如果输入的json字符串无效,则返回null。
* @param e json列
* @param path json path
* @group collection_funcs
* @since 1.6.0
def get_json_object(e: Column, path: String): Column = withExpr {
GetJsonObject(e.expr, lit(path).expr)
* 根据给定字段名称为JSON列创建一个新的行。
* @param json 包含JSON数据的列
* @param fields 字段名称
* @group collection_funcs
* @since 1.6.0
@scala.annotation.varargs
def json_tuple(json: Column, fields: String*): Column = withExpr {
require(fields.nonEmpty, "至少需要一个字段名称。")
JsonTuple(json.expr +: fields.map(Literal.apply))
* (仅适用于Scala)将包含JSON字符串的列解析为具有指定模式的`StructType`。
* 如果无法解析字符串,则返回null。
* @param e 包含JSON数据的字符串列
* @param schema 解析json字符串时使用的模式
* @param options 控制如何解析json的选项。接受与json数据源相同的选项。
* @group collection_funcs
* @since 2.1.0
def from_json(e: Column, schema: StructType, options: Map[String, String]): Column =
from_json(e, schema.asInstanceOf[DataType], options)
* (仅适用于Java)将包含JSON字符串的列解析为具有指定模式的`StructType`。
* 如果无法解析字符串,则返回null。
* @param e 包含JSON数据的字符串列
* @param schema 解析json字符串时使用的模式
* @param options 控制如何解析json的选项。接受与json数据源相同的选项。
* @group collection_funcs
* @since 2.1.0
def from_json(e: Column, schema: StructType, options: java.util.Map[String, String]): Column =
from_json(e, schema, options.asScala.toMap)
* (仅适用于Scala)将包含JSON字符串的列解析为`MapType`,键类型为`StringType`,
* 值类型为指定模式的`StructType`或`ArrayType`。
* 如果无法解析字符串,则返回null。
* @param e 包含JSON数据的字符串列
* @param schema 解析json字符串时使用的模式
* @param options 控制如何解析json的选项。接受与json数据源相同的选项。
* @group collection_funcs
* @since 2.2.0
def from_json(e: Column, schema: DataType, options: Map[String, String]): Column = withExpr {
JsonToStructs(schema, options, e.expr)
* (仅适用于Java)将包含JSON字符串的列解析为`MapType`,键类型为`StringType`,
* 值类型为指定模式的`StructType`或`ArrayType`。
* 如果无法解析字符串,则返回null。
* @param e 包含JSON数据的字符串列
* @param schema 解析json字符串时使用的模式
* @param options 控制如何解析json的选项。接受与json数据源相同的选项。
* @group collection_funcs
* @since 2.2.0
def from_json(e: Column, schema: DataType, options: java.util.Map[String, String]): Column =
from_json(e, schema, options.asScala.toMap)
* 将包含JSON字符串的列解析为具有指定模式的`StructType`。
* 如果无法解析字符串,则返回null。
* @param e 包含JSON数据的字符串列
* @param schema 解析json字符串时使用的模式
* @group collection_funcs
* @since 2.1.0
def from_json(e: Column, schema: StructType): Column =
from_json(e, schema, Map.empty[String, String])
* (仅适用于Java)将包含JSON字符串的列解析为具有指定模式的`StructType`。
* 如果无法解析字符串,则返回null。
* @param e 包含JSON数据的字符串列
* @param schema 解析json字符串时使用的模式
* @group collection_funcs
* @since 2.1.0
def from_json(e: Column, schema: DataType): Column =
from_json(e, schema, Map.empty[String, String])
* 将包含JSON字符串的列解析为具有指定模式的`MapType`,键类型为`StringType`,
* 值类型为指定模式的`StructType`或`ArrayType`。
* 如果无法解析字符串,则返回null。
* @param e 包含JSON数据的字符串列
* @param schema 解析json字符串时使用的模式
* @group collection_funcs
* @since 2.4.0
def from_json(e: Column, schema: Column): Column = {
from_json(e, schema, Map.empty[String, String].asJava)
* 解析JSON字符串并推断其DDL格式的模式。
* @param json JSON字符串
* @group collection_funcs
* @since 2.4.0
def schema_of_json(json: String): Column = schema_of_json(lit(json))
* 解析JSON字符串并推断其DDL格式的模式。
* @param json 包含JSON字符串的列
* @group collection_funcs
* @since 2.4.0
def schema_of_json(json: Column): Column = withExpr(new SchemaOfJson(json.expr))
* (仅适用于Scala)将包含`StructType`、`ArrayType`或`MapType`的列转换为具有指定模式的JSON字符串。
* 在不支持的类型的情况下抛出异常。
* @param e 包含结构体、数组或映射的列
* @param options 控制如何将结构列转换为JSON字符串的选项。接受与json数据源相同的选项。
* @group collection_funcs
* @since 2.1.0
def to_json(e: Column, options: Map[String, String]): Column = withExpr {
StructsToJson(options, e.expr)
* (仅适用于Java)将包含`StructType`、`ArrayType`或`MapType`的列转换为具有指定模式的JSON字符串。
* 在不支持的类型的情况下抛出异常。
* @param e 包含结构体、数组或映射的列
* @param options 控制如何将结构列转换为JSON字符串的选项。接受与json数据源相同的选项。
* @group collection_funcs
* @since 2.1.0
def to_json(e: Column, options: java.util.Map[String, String]): Column =
to_json(e, options.asScala.toMap)
* 将包含`StructType`、`ArrayType`或`MapType`的列转换为具有指定模式的JSON字符串。
* 在不支持的类型的情况下抛出异常。
* @param e 包含结构体、数组或映射的列
* @group collection_funcs
* @since 2.1.0
def to_json(e: Column): Column =
to_json(e, Map.empty[String, String])
* 返回数组或映射的长度。
* @group collection_funcs
* @since 1.5.0
def size(e: Column): Column = withExpr { Size(e.expr) }
* 按升序对给定列中的数组进行排序,根据数组元素的自然顺序。
* null元素将放在返回的数组的开头。
* @group collection_funcs
* @since 1.5.0
def sort_array(e: Column): Column = sort_array(e, asc = true)
* 根据给定列中的数组按升序或降序排序,根据数组元素的自然顺序。
* null元素将按升序放在返回的数组的开头,按降序放在末尾。
* @group collection_funcs
* @since 1.5.0
def sort_array(e: Column, asc: Boolean): Column = withExpr { SortArray(e.expr, lit(asc).expr) }
* 返回数组中的最小值。
* @group collection_funcs
* @since 2.4.0
def array_min(e: Column): Column = withExpr { ArrayMin(e.expr) }
* 返回数组中的最大值。
* @group collection_funcs
* @since 2.4.0
def array_max(e: Column): Column = withExpr { ArrayMax(e.expr) }
* 返回给定数组的随机排列。
* 注意:该函数是非确定性的。
* @group collection_funcs
* @since 2.4.0
def shuffle(e: Column): Column = withExpr { Shuffle(e.expr) }
* 返回一个反转的字符串或数组,或者元素顺序相反的数组。
* @group collection_funcs
* @since 1.5.0
def reverse(e: Column): Column = withExpr { Reverse(e.expr) }
* 从数组的数组中创建一个单一的数组。如果嵌套数组的结构深度大于两级,则仅删除一级嵌套。
* @group collection_funcs
* @since 2.4.0
def flatten(e: Column): Column = withExpr { Flatten(e.expr) }
* 从start到stop生成一个整数序列,步长为step。
* @group collection_funcs
* @since 2.4.0
def sequence(start: Column, stop: Column, step: Column): Column = withExpr {
new Sequence(start.expr, stop.expr, step.expr)
* 从start到stop生成一个整数序列,
* 如果start小于等于stop,则步长为1;否则为-1。
* @group collection_funcs
* @since 2.4.0
def sequence(start: Column, stop: Column): Column = withExpr {
new Sequence(start.expr, stop.expr)
* 创建一个包含左参数重复右参数次数的数组。
* @group collection_funcs
* @since 2.4.0
def array_repeat(left: Column, right: Column): Column = withExpr {
ArrayRepeat(left.expr, right.expr)
* 创建一个包含左参数重复右参数次数的数组。
* @group collection_funcs
* @since 2.4.0
def array_repeat(e: Column, count: Int): Column = array_repeat(e, lit(count))
* 返回包含映射键的无序数组。
* @group collection_funcs
* @since 2.3.0
def map_keys(e: Column): Column = withExpr { MapKeys(e.expr) }
* 返回包含映射值的无序数组。
* @group collection_funcs
* @since 2.3.0
def map_values(e: Column): Column = withExpr { MapValues(e.expr) }
* 从给定的条目数组创建一个map。
* @group collection_funcs
* @since 2.4.0
def map_from_entries(e: Column): Column = withExpr { MapFromEntries(e.expr) }
* 返回合并所有给定映射的结果。
* @group collection_funcs
* @since 2.4.0
def map_concat(cols: Column*): Column = withExpr { MapConcat(cols.map(_.expr)) }
udf函数
// scalastyle:off line.size.limit
// scalastyle:off parameter.number
/* 使用以下代码生成:
(0 to 10).foreach { x =>
val types = (1 to x).foldRight("RT")((i, s) => {s"A$i, $s"})
val typeTags = (1 to x).map(i => s"A$i: TypeTag").foldLeft("RT: TypeTag")(_ + ", " + _)
val inputSchemas = (1 to x).foldRight("Nil")((i, s) => {s"Try(ScalaReflection.schemaFor(typeTag[A$i])).toOption :: $s"})
println(s"""
| * 将具有$x个参数的Scala闭包定义为用户定义的函数(UDF)。
| * 数据类型根据Scala闭包的签名自动推断。默认情况下,返回的UDF是确定性的。
| * 要将其更改为不确定性,请调用API `UserDefinedFunction.asNondeterministic()`。
| * @group udf_funcs
| * @since 1.3.0
|def udf[$typeTags](f: Function$x[$types]): UserDefinedFunction = {
| val ScalaReflection.Schema(dataType, nullable) = ScalaReflection.schemaFor[RT]
| val inputSchemas = $inputSchemas
| val udf = SparkUserDefinedFunction.create(f, dataType, inputSchemas)
| if (nullable) udf else udf.asNonNullable()
|}""".stripMargin)
(0 to 10).foreach { i =>
val extTypeArgs = (0 to i).map(_ => "_").mkString(", ")
val anyTypeArgs = (0 to i).map(_ => "Any").mkString(", ")
val anyCast = s".asInstanceOf[UDF$i[$anyTypeArgs]]"
val anyParams = (1 to i).map(_ => "_: Any").mkString(", ")
val funcCall = if (i == 0) "() => func" else "func"
println(s"""
| * 将Java UDF$i实例定义为用户定义的函数(UDF)。
| * 调用者必须指定输出数据类型,不进行自动输入类型强制转换。
| * 默认情况下,返回的UDF是确定性的。要将其更改为不确定性,请调用API `UserDefinedFunction.asNondeterministic()`。
| * @group udf_funcs
| * @since 2.3.0
|def udf(f: UDF$i[$extTypeArgs], returnType: DataType): UserDefinedFunction = {
| val func = f$anyCast.call($anyParams)
| SparkUserDefinedFunction.create($funcCall, returnType, inputSchemas = Seq.fill($i)(None))
|}""".stripMargin)
//////////////////////////////////////////////////////////////////////////////////////////////
// Scala UDF functions
//////////////////////////////////////////////////////////////////////////////////////////////
* 将具有0个参数的Scala闭包定义为用户定义的函数(UDF)。
* 数据类型根据Scala闭包的签名自动推断。默认情况下,返回的UDF是确定性的。
* 要将其更改为不确定性,请调用API `UserDefinedFunction.asNondeterministic()`。
* @group udf_funcs
* @since 1.3.0
def udf[RT: TypeTag](f: Function0[RT]): UserDefinedFunction = {
val ScalaReflection.Schema(dataType, nullable) = ScalaReflection.schemaFor[RT]
val inputSchemas = Nil
val udf = SparkUserDefinedFunction.create(f, dataType, inputSchemas)
if (nullable) udf else udf.asNonNullable()
* 将具有1个参数的Scala闭包定义为用户定义的函数(UDF)。
* 数据类型根据Scala闭包的签名自动推断。默认情况下,返回的UDF是确定性的。
* 要将其更改为不确定性,请调用API `UserDefinedFunction.asNondeterministic()`。
* @group udf_funcs
* @since 1.3.0
def udf[RT: TypeTag, A1: TypeTag](f: Function1[A1, RT]): UserDefinedFunction = {
val ScalaReflection.Schema(dataType, nullable) = ScalaReflection.schemaFor[RT]
val inputSchemas = Try(ScalaReflection.schemaFor(typeTag[A1])).toOption :: Nil
val udf = SparkUserDefinedFunction.create(f, dataType, inputSchemas)
if (nullable) udf else udf.asNonNullable()
* 将具有6个参数的Scala闭包定义为用户定义的函数(UDF)。
* 数据类型根据Scala闭包的签名自动推断。默认情况下,返回的UDF是确定性的。
* 要将其更改为不确定性,请调用API `UserDefinedFunction.asNondeterministic()`。
* @group udf_funcs
* @since 1.3.0
def udf[RT: TypeTag, A1: TypeTag, A2: TypeTag, A3: TypeTag, A4: TypeTag, A5: TypeTag, A6: TypeTag](f: Function6[A1, A2, A3, A4, A5, A6, RT]): UserDefinedFunction = {
val ScalaReflection.Schema(dataType, nullable) = ScalaReflection.schemaFor[RT]
val inputSchemas = Try(ScalaReflection.schemaFor(typeTag[A1])).toOption :: Try(ScalaReflection.schemaFor(typeTag[A2])).toOption :: Try(ScalaReflection.schemaFor(typeTag[A3])).toOption :: Try(ScalaReflection.schemaFor(typeTag[A4])).toOption :: Try(ScalaReflection.schemaFor(typeTag[A5])).toOption :: Try(ScalaReflection.schemaFor(typeTag[A6])).toOption :: Nil
val udf = SparkUserDefinedFunction.create(f, dataType, inputSchemas)
if (nullable) udf else udf.asNonNullable()
* 将具有7个参数的Scala闭包定义为用户定义的函数(UDF)。
* 数据类型根据Scala闭包的签名自动推断。默认情况下,返回的UDF是确定性的。
* 要将其更改为不确定性,请调用API `UserDefinedFunction.asNondeterministic()`。
* @group udf_funcs
* @since 1.3.0
def udf[RT: TypeTag, A1: TypeTag, A2: TypeTag, A3: TypeTag, A4: TypeTag, A5: TypeTag, A6: TypeTag, A7: TypeTag](f: Function7[A1, A2, A3, A4, A5, A6, A7, RT]): UserDefinedFunction = {
val ScalaReflection.Schema(dataType, nullable) = ScalaReflection.schemaFor[RT]
val inputSchemas = Try(ScalaReflection.schemaFor(typeTag[A1])).toOption :: Try(ScalaReflection.schemaFor(typeTag[A2])).toOption :: Try(ScalaReflection.schemaFor(typeTag[A3])).toOption :: Try(ScalaReflection.schemaFor(typeTag[A4])).toOption :: Try(ScalaReflection.schemaFor(typeTag[A5])).toOption :: Try(ScalaReflection.schemaFor(typeTag[A6])).toOption :: Try(ScalaReflection.schemaFor(typeTag[A7])).toOption :: Nil
val udf = SparkUserDefinedFunction.create(f, dataType, inputSchemas)
if (nullable) udf else udf.asNonNullable()
* 将具有8个参数的Scala闭包定义为用户定义的函数(UDF)。
* 数据类型根据Scala闭包的签名自动推断。默认情况下,返回的UDF是确定性的。
* 要将其更改为不确定性,请调用API `UserDefinedFunction.asNondeterministic()`。
* @group udf_funcs
* @since 1.3.0
def udf[RT: TypeTag, A1: TypeTag, A2: TypeTag, A3: TypeTag, A4: TypeTag, A5: TypeTag, A6: TypeTag, A7: TypeTag, A8: TypeTag](f: Function8[A1, A2, A3, A4, A5, A6, A7, A8, RT]): UserDefinedFunction = {
val ScalaReflection.Schema(dataType, nullable) = ScalaReflection.schemaFor[RT]
val inputSchemas = Try(ScalaReflection.schemaFor(typeTag[A1])).toOption :: Try(ScalaReflection.schemaFor(typeTag[A2])).toOption :: Try(ScalaReflection.schemaFor(typeTag[A3])).toOption :: Try(ScalaReflection.schemaFor(typeTag[A4])).toOption :: Try(ScalaReflection.schemaFor(typeTag[A5])).toOption :: Try(ScalaReflection.schemaFor(typeTag[A6])).toOption :: Try(ScalaReflection.schemaFor(typeTag[A7])).toOption :: Try(ScalaReflection.schemaFor(typeTag[A8])).toOption :: Nil
val udf = SparkUserDefinedFunction.create(f, dataType, inputSchemas)
if (nullable) udf else udf.asNonNullable()
* 将具有9个参数的Scala闭包定义为用户定义的函数(UDF)。
* 数据类型根据Scala闭包的签名自动推断。默认情况下,返回的UDF是确定性的。
* 要将其更改为不确定性,请调用API `UserDefinedFunction.asNondeterministic()`。
* @group udf_funcs
* @since 1.3.0
def udf[RT: TypeTag, A1: TypeTag, A2: TypeTag, A3: TypeTag, A4: TypeTag, A5: TypeTag, A6: TypeTag, A7: TypeTag, A8: TypeTag, A9: TypeTag](f: Function9[A1, A2, A3, A4, A5, A6, A7, A8, A9, RT]): UserDefinedFunction = {
val ScalaReflection.Schema(dataType, nullable) = ScalaReflection.schemaFor[RT]
val inputSchemas = Try(ScalaReflection.schemaFor(typeTag[A1])).toOption :: Try(ScalaReflection.schemaFor(typeTag[A2])).toOption :: Try(ScalaReflection.schemaFor(typeTag[A3])).toOption :: Try(ScalaReflection.schemaFor(typeTag[A4])).toOption :: Try(ScalaReflection.schemaFor(typeTag[A5])).toOption :: Try(ScalaReflection.schemaFor(typeTag[A6])).toOption :: Try(ScalaReflection.schemaFor(typeTag[A7])).toOption :: Try(ScalaReflection.schemaFor(typeTag[A8])).toOption :: Try(ScalaReflection.schemaFor(typeTag[A9])).toOption :: Nil
val udf = SparkUserDefinedFunction.create(f, dataType, inputSchemas)
if (nullable) udf else udf.asNonNullable()
* 将具有10个参数的Scala闭包定义为用户定义的函数(UDF)。
* 数据类型根据Scala闭包的签名自动推断。默认情况下,返回的UDF是确定性的。
* 要将其更改为不确定性,请调用API `UserDefinedFunction.asNondeterministic()`。
* @group udf_funcs
* @since 1.3.0
def udf[RT: TypeTag, A1: TypeTag, A2: TypeTag, A3: TypeTag, A4: TypeTag, A5: TypeTag, A6: TypeTag, A7: TypeTag, A8: TypeTag, A9: TypeTag, A10: TypeTag](f: Function10[A1, A2, A3, A4, A5, A6, A7, A8, A9, A10, RT]): UserDefinedFunction = {
val ScalaReflection.Schema(dataType, nullable) = ScalaReflection.schemaFor[RT]
val inputSchemas = Try(ScalaReflection.schemaFor(typeTag[A1])).toOption :: Try(ScalaReflection.schemaFor(typeTag[A2])).toOption :: Try(ScalaReflection.schemaFor(typeTag[A3])).toOption :: Try(ScalaReflection.schemaFor(typeTag[A4])).toOption :: Try(ScalaReflection.schemaFor(typeTag[A5])).toOption :: Try(ScalaReflection.schemaFor(typeTag[A6])).toOption :: Try(ScalaReflection.schemaFor(typeTag[A7])).toOption :: Try(ScalaReflection.schemaFor(typeTag[A8])).toOption :: Try(ScalaReflection.schemaFor(typeTag[A9])).toOption :: Try(ScalaReflection.schemaFor(typeTag[A10])).toOption :: Nil
val udf = SparkUserDefinedFunction.create(f, dataType, inputSchemas)
if (nullable) udf else udf.asNonNullable()
//////////////////////////////////////////////////////////////////////////////////////////////
// Java UDF functions
//////////////////////////////////////////////////////////////////////////////////////////////
* 将Java UDF0实例定义为用户定义的函数(UDF)。
* 调用者必须指定输出数据类型,不进行自动输入类型强制转换。
* 默认情况下,返回的UDF是确定性的。要将其更改为不确定性,请调用API `UserDefinedFunction.asNondeterministic()`。
* @group udf_funcs
* @since 2.3.0
def udf(f: UDF0[_], returnType: DataType): UserDefinedFunction = {
val func = f.asInstanceOf[UDF0[Any]].call()
SparkUserDefinedFunction.create(() => func, returnType, inputSchemas = Seq.fill(0)(None))
* 将Java UDF1实例定义为用户定义的函数(UDF)。
* 调用者必须指定输出数据类型,不进行自动输入类型强制转换。
* 默认情况下,返回的UDF是确定性的。要将其更改为不确定性,请调用API `UserDefinedFunction.asNondeterministic()`。
* @group udf_funcs
* @since 2.3.0
def udf(f: UDF1[_, _], returnType: DataType): UserDefinedFunction = {
val func = f.asInstanceOf[UDF1[Any, Any]].call(_: Any)
SparkUserDefinedFunction.create(func, returnType, inputSchemas = Seq.fill(1)(None))
* 将Java UDF2实例定义为用户定义的函数(UDF)。
* 调用者必须指定输出数据类型,不进行自动输入类型强制转换。
* 默认情况下,返回的UDF是确定性的。要将其更改为不确定性,请调用API `UserDefinedFunction.asNondeterministic()`。
* @group udf_funcs
* @since 2.3.0
def udf(f: UDF2[_, _, _], returnType: DataType): UserDefinedFunction = {
val func = f.asInstanceOf[UDF2[Any, Any, Any]].call(_: Any, _: Any)
SparkUserDefinedFunction.create(func, returnType, inputSchemas = Seq.fill(2)(None))
* 将Java UDF3实例定义为用户定义的函数(UDF)。
* 调用者必须指定输出数据类型,不进行自动输入类型强制转换。
* 默认情况下,返回的UDF是确定性的。要将其更改为不确定性,请调用API `UserDefinedFunction.asNondeterministic()`。
* @group udf_funcs
* @since 2.3.0
def udf(f: UDF3[_, _, _, _], returnType: DataType): UserDefinedFunction = {
val func = f.asInstanceOf[UDF3[Any, Any, Any, Any]].call(_: Any, _: Any, _: Any)
SparkUserDefinedFunction.create(func, returnType, inputSchemas = Seq.fill(3)(None))
* 将Java UDF4实例定义为用户定义的函数(UDF)。
* 调用者必须指定输出数据类型,不进行自动输入类型强制转换。
* 默认情况下,返回的UDF是确定性的。要将其更改为不确定性,请调用API `UserDefinedFunction.asNondeterministic()`。
* @group udf_funcs
* @since 2.3.0
def udf(f: UDF4[_, _, _, _, _], returnType: DataType): UserDefinedFunction = {
val func = f.asInstanceOf[UDF4[Any, Any, Any, Any, Any]].call(_: Any, _: Any, _: Any, _: Any)
SparkUserDefinedFunction.create(func, returnType, inputSchemas = Seq.fill(4)(None))
* 将Java UDF5实例定义为用户定义的函数(UDF)。
* 调用者必须指定输出数据类型,不进行自动输入类型强制转换。
* 默认情况下,返回的UDF是确定性的。要将其更改为不确定性,请调用API `UserDefinedFunction.asNondeterministic()`。
* @group udf_funcs
* @since 2.3.0
def udf(f: UDF5[_, _, _, _, _, _], returnType: DataType): UserDefinedFunction = {
val func = f.asInstanceOf[UDF5[Any, Any, Any, Any, Any, Any]].call(_: Any, _: Any, _: Any, _: Any, _: Any)
SparkUserDefinedFunction.create(func, returnType, inputSchemas = Seq.fill(5)(None))
* 将Java UDF6实例定义为用户定义的函数(UDF)。
* 调用者必须指定输出数据类型,不进行自动输入类型强制转换。
* 默认情况下,返回的UDF是确定性的。要将其更改为不确定性,请调用API `UserDefinedFunction.asNondeterministic()`。
* @group udf_funcs
* @since 2.3.0
def udf(f: UDF6[_, _, _, _, _, _, _], returnType: DataType): UserDefinedFunction = {
val func = f.asInstanceOf[UDF6[Any, Any, Any, Any, Any, Any, Any]].call(_: Any, _: Any, _: Any, _: Any, _: Any, _: Any)
SparkUserDefinedFunction.create(func, returnType, inputSchemas = Seq.fill(6)(None))
* 将Java UDF7实例定义为用户定义的函数(UDF)。
* 调用者必须指定输出数据类型,不进行自动输入类型强制转换。
* 默认情况下,返回的UDF是确定性的。要将其更改为不确定性,请调用API `UserDefinedFunction.asNondeterministic()`。
* @group udf_funcs
* @since 2.3.0
def udf(f: UDF7[_, _, _, _, _, _, _, _], returnType: DataType): UserDefinedFunction = {
val func = f.asInstanceOf[UDF7[Any, Any, Any, Any, Any, Any, Any, Any]].call(_: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any)
SparkUserDefinedFunction.create(func, returnType, inputSchemas = Seq.fill(7)(None))
* 将Java UDF8实例定义为用户定义的函数(UDF)。
* 调用者必须指定输出数据类型,不进行自动输入类型强制转换。
* 默认情况下,返回的UDF是确定性的。要将其更改为不确定性,请调用API `UserDefinedFunction.asNondeterministic()`。
* @group udf_funcs
* @since 2.3.0
def udf(f: UDF8[_, _, _, _, _, _, _, _, _], returnType: DataType): UserDefinedFunction = {
val func = f.asInstanceOf[UDF8[Any, Any, Any, Any, Any, Any, Any, Any, Any]].call(_: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any)
SparkUserDefinedFunction.create(func, returnType, inputSchemas = Seq.fill(8)(None))
* 将Java UDF9实例定义为用户定义的函数(UDF)。
* 调用者必须指定输出数据类型,不进行自动输入类型强制转换。
* 默认情况下,返回的UDF是确定性的。要将其更改为不确定性,请调用API `UserDefinedFunction.asNondeterministic()`。
* @group udf_funcs
* @since 2.3.0
def udf(f: UDF9[_, _, _, _, _, _, _, _, _, _], returnType: DataType): UserDefinedFunction = {
val func = f.asInstanceOf[UDF9[Any, Any, Any, Any, Any, Any, Any, Any, Any, Any]].call(_: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any)
SparkUserDefinedFunction.create(func, returnType, inputSchemas = Seq.fill(9)(None))
* 将Java UDF10实例定义为用户定义的函数(UDF)。
* 调用者必须指定输出数据类型,不进行自动输入类型强制转换。
* 默认情况下,返回的UDF是确定性的。要将其更改为不确定性,请调用API `UserDefinedFunction.asNondeterministic()`。
* @group udf_funcs
* @since 2.3.0
def udf(f: UDF10[_, _, _, _, _, _, _, _, _, _, _], returnType: DataType): UserDefinedFunction = {
val func = f.asInstanceOf[UDF10[Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any]].call(_: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any)
SparkUserDefinedFunction.create(func, returnType, inputSchemas = Seq.fill(10)(None))
// scalastyle:on parameter.number
// scalastyle:on line.size.limit
* 使用Scala闭包定义确定性的用户定义函数(UDF)。
* 调用者必须指定输出数据类型,不进行自动输入类型强制转换。
* @param f Scala闭包
* @param dataType UDF的输出数据类型
* @group udf_funcs
* @since 2.0.0
def udf(f: AnyRef, dataType: DataType): UserDefinedFunction = {
// TODO: should call SparkUserDefinedFunction.create() instead but inputSchemas is currently
// unavailable. We may need to create type-safe overloaded versions of udf() methods.
new UserDefinedFunction(f, dataType, inputTypes = None)
* 调用用户定义函数(UDF)。
* 示例:
* {{{
* import org.apache.spark.sql._
* val df = Seq(("id1", 1), ("id2", 4), ("id3", 5)).toDF("id", "value")
* val spark = df.sparkSession
* spark.udf.register("simpleUDF", (v: Int) => v * v)
* df.select($"id", callUDF("simpleUDF", $"value"))
* }}}
* @group udf_funcs
* @since 1.5.0