data_path = "hdfs://..."
df = spark.read.parquet(data_path)
3. Basic Operations
3.1 Creating a SparkSession object
Before doing anything else, create a SparkSession object (the entry point for running Spark code; you can think of it as your interactive handle to Spark):
See: pyspark.sql module
from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .master("local") \
    .appName("Word Count") \
    .config("spark.some.config.option", "some-value") \
    .getOrCreate()
If you hit the following error:
Traceback (most recent call last):
File "/Users/my_name/caogao/code_test_1/code_test_pyspark.py" , line 5 , in <module>
spark = SparkSession.builder.master("local" ).getOrCreate()
File "/Users/my_name/opt/anaconda3/envs/py3.7/lib/python3.7/site-packages/pyspark/sql/session.py" , line 186 , in getOrCreate
sc = SparkContext.getOrCreate(sparkConf)
File "/Users/my_name/opt/anaconda3/envs/py3.7/lib/python3.7/site-packages/pyspark/context.py" , line 376 , in getOrCreate
SparkContext(conf=conf or SparkConf())
File "/Users/my_name/opt/anaconda3/envs/py3.7/lib/python3.7/site-packages/pyspark/context.py" , line 136 , in __init__
conf, jsc, profiler_cls)
File "/Users/my_name/opt/anaconda3/envs/py3.7/lib/python3.7/site-packages/pyspark/context.py" , line 198 , in _do_init
self._jsc = jsc or self._initialize_context(self._conf._jconf)
File "/Users/my_name/opt/anaconda3/envs/py3.7/lib/python3.7/site-packages/pyspark/context.py" , line 315 , in _initialize_context
return self._jvm.JavaSparkContext(jconf)
File "/Users/my_name/opt/anaconda3/envs/py3.7/lib/python3.7/site-packages/py4j/java_gateway.py" , line 1569 , in __call__
answer, self._gateway_client, None , self._fqn)
File "/Users/my_name/opt/anaconda3/envs/py3.7/lib/python3.7/site-packages/py4j/protocol.py" , line 328 , in get_return_value
format (target_id, "." , name), value)
py4j.protocol.Py4JJavaError: An error occurred while calling None .org.apache.spark.api.java.JavaSparkContext.
: java.net.BindException: Can't assign requested address: Service ' sparkDriver' failed after 16 retries (on a random free port)! Consider explicitly setting the appropriate binding address for the service ' sparkDriver' (for example spark.driver.bindAddress for SparkDriver) to the correct binding address.
then add the following code at the top of your script:
import pyspark

conf = pyspark.SparkConf().set('spark.driver.host', '127.0.0.1')
sc = pyspark.SparkContext(master='local', appName='myAppName', conf=conf)
Reference: solution
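Alternatively, since the error message itself points at spark.driver.bindAddress, you can usually set the binding address directly on the SparkSession builder; a minimal sketch (adjust the address to your environment):

# bind the driver to localhost so Spark does not try to resolve the machine hostname
from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .master("local") \
    .appName("Word Count") \
    .config("spark.driver.bindAddress", "127.0.0.1") \
    .config("spark.driver.host", "127.0.0.1") \
    .getOrCreate()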
3.2 Creating a mock data table
test = []
test.append((1, 'age', '30', 50, 40))
test.append((1, 'city', 'beijing', 50, 40))
test.append((1, 'gender', 'fale', 50, 40))
test.append((1, 'height', '172cm', 50, 40))
test.append((1, 'weight', '70kg', 50, 40))
test.append((2, 'age', '26', 100, 80))
test.append((2, 'city', 'beijing', 100, 80))
test.append((2, 'gender', 'fale', 100, 80))
test.append((2, 'height', '170cm', 100, 80))
test.append((2, 'weight', '65kg', 100, 80))
test.append((3, 'age', '35', 99, 99))
test.append((3, 'city', 'nanjing', 99, 99))
test.append((3, 'gender', 'female', 99, 99))
test.append((3, 'height', '161cm', 99, 99))
test.append((3, 'weight', '50kg', 99, 99))
df = spark.createDataFrame(test,
    ['user_id', 'attr_name', 'attr_value', 'income', 'expenses'])

df = spark.createDataFrame([('1', 'Joe', '70000', '1'), ('2', 'Henry', '80000', None)],
    ['Id', 'Name', 'Sallary', 'DepartmentId'])
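By default createDataFrame infers the column types from the data; if you want explicit control over the types, you can pass a StructType schema instead. A minimal sketch (the types chosen here are just an assumption):

from pyspark.sql.types import StructType, StructField, StringType, LongType

schema = StructType([
    StructField("Id", StringType(), True),
    StructField("Name", StringType(), True),
    StructField("Sallary", LongType(), True),
    StructField("DepartmentId", StringType(), True),
])
df_typed = spark.createDataFrame(
    [('1', 'Joe', 70000, '1'), ('2', 'Henry', 80000, None)], schema)
df_typed.printSchema()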
3.3 Querying
3.3.1 Row-level query operations
1. Printing data
df.show() prints the first 20 rows by default; you can also pass the number of rows you want.
If some values are very long, PySpark truncates them so they do not print in full; in that case use df.show(truncate=False).
>>> df.show()
+-------+---------+----------+------+--------+
|user_id|attr_name|attr_value|income|expenses|
+-------+---------+----------+------+--------+
| 1 | age| 30 | 50 | 40 |
| 1 | city| beijing| 50 | 40 |
| 1 | gender| fale| 50 | 40 |
| 1 | height| 172cm| 50 | 40 |
| 1 | weight| 70kg| 50 | 40 |
| 2 | age| 26 | 100 | 80 |
| 2 | city| beijing| 100 | 80 |
| 2 | gender| fale| 100 | 80 |
| 2 | height| 170cm| 100 | 80 |
| 2 | weight| 65kg| 100 | 80 |
| 3 | age| 35 | 99 | 99 |
| 3 | city| nanjing| 99 | 99 |
| 3 | gender| female| 99 | 99 |
| 3 | height| 161cm| 99 | 99 |
| 3 | weight| 50kg| 99 | 99 |
+-------+---------+----------+------+--------+
>>> df.show(3 )
+-------+---------+----------+------+--------+
|user_id|attr_name|attr_value|income|expenses|
+-------+---------+----------+------+--------+
| 1 | age| 30 | 50 | 40 |
| 1 | city| beijing| 50 | 40 |
| 1 | gender| fale| 50 | 40 |
+-------+---------+----------+------+--------+
only showing top 3 rows
2. Printing the schema
>>> df.printSchema()
root
 |-- user_id: long (nullable = true)
 |-- attr_name: string (nullable = true)
 |-- attr_value: string (nullable = true)
 |-- income: long (nullable = true)
 |-- expenses: long (nullable = true)
3. Counting rows
>>> df.count()
4. Fetching the first few rows to the driver
>>> list = df.head(3 )
>>> df.head(3 )
[Row(user_id=1 , attr_name=u'age' , attr_value=u'30' , income=50 , expenses=40 ), Row(user_id=1 , attr_name=u'city' , attr_value=u'beijing' , income=50 , expenses=40 ), Row(user_id=1 , attr_name=u'gender' , attr_value=u'fale' , income=50 , expenses=40 )]
>>> df.take(5 )
[Row(user_id=1 , attr_name=u'age' , attr_value=u'30' , income=50 , expenses=40 ), Row(user_id=1 , attr_name=u'city' , attr_value=u'beijing' , income=50 , expenses=40 ), Row(user_id=1 , attr_name=u'gender' , attr_value=u'fale' , income=50 , expenses=40 ), Row(user_id=1 , attr_name=u'height' , attr_value=u'172cm' , income=50 , expenses=40 ), Row(user_id=1 , attr_name=u'weight' , attr_value=u'70kg' , income=50 , expenses=40 )]
5. Selecting rows where a column is null
>>> from pyspark.sql.functions import isnull
>>> df = df.filter (isnull("income" ))
>>> df.show()
+-------+---------+----------+------+--------+
|user_id|attr_name|attr_value|income|expenses|
+-------+---------+----------+------+--------+
+-------+---------+----------+------+--------+
6. Returning a Python list whose elements are Row objects:
>>> df.collect()
[Row(user_id=1 , attr_name=u'age' , attr_value=u'30' , income=50 , expenses=40 ), Row(user_id=1 , attr_name=u'city' , attr_value=u'beijing' , income=50 , expenses=40 ), Row(user_id=1 , attr_name=u'gender' , attr_value=u'fale' , income=50 , expenses=40 ), Row(user_id=1 , attr_name=u'height' , attr_value=u'172cm' , income=50 , expenses=40 ), Row(user_id=1 , attr_name=u'weight' , attr_value=u'70kg' , income=50 , expenses=40 ), Row(user_id=2 , attr_name=u'age' , attr_value=u'26' , income=100 , expenses=80 ), Row(user_id=2 , attr_name=u'city' , attr_value=u'beijing' , income=100 , expenses=80 ), Row(user_id=2 , attr_name=u'gender' , attr_value=u'fale' , income=100 , expenses=80 ), Row(user_id=2 , attr_name=u'height' , attr_value=u'170cm' , income=100 , expenses=80 ), Row(user_id=2 , attr_name=u'weight' , attr_value=u'65kg' , income=100 , expenses=80 ), Row(user_id=3 , attr_name=u'age' , attr_value=u'35' , income=99 , expenses=99 ), Row(user_id=3 , attr_name=u'city' , attr_value=u'nanjing' , income=99 , expenses=99 ), Row(user_id=3 , attr_name=u'gender' , attr_value=u'female' , income=99 , expenses=99 ), Row(user_id=3 , attr_name=u'height' , attr_value=u'161cm' , income=99 , expenses=99 ), Row(user_id=3 , attr_name=u'weight' , attr_value=u'50kg' , income=99 , expenses=99 )]
Note: this pulls all of the data to the driver and returns it as a Python list. You can index into the list to get a Row, and index into a Row to get individual values.
>>> list = df.collect()
>>> list [0 ]
Row(user_id=1 , attr_name=u'age' , attr_value=u'30' , income=50 , expenses=40 )
>>> list [0 ][1 ]
u'age'
7. Summary statistics
>>> df.describe().show()
+-------+------------------+---------+------------------+-----------------+------------------+
|summary| user_id|attr_name| attr_value| income| expenses|
+-------+------------------+---------+------------------+-----------------+------------------+
| count| 15 | 15 | 15 | 15 | 15 |
| mean| 2.0 | null|30.333333333333332 | 83.0 | 73.0 |
| stddev|0.8451542547285166 | null| 4.509249752822894 |24.15722311383137 |25.453037988757707 |
| min | 1 | age| 161cm| 50 | 40 |
| max | 3 | weight| nanjing| 100 | 99 |
+-------+------------------+---------+------------------+-----------------+------------------+
8. Deduplication (set operations)
distinct() takes no arguments.
>>> df.select('user_id').distinct().show()
+-------+
|user_id|
+-------+
| 1 |
| 3 |
| 2 |
+-------+
Deduplicate and count (col1, col2 and req_df below are placeholders for your own columns and DataFrame):
from pyspark.sql import functions as F

df.groupBy("col1").agg(F.countDistinct("col2")).orderBy("col1", ascending=False).show()

df1 = req_df.filter("col1=1").select("col2").dropDuplicates(subset=["col2"])
df1.count()
dfn = req_df.filter("col1=n").select("col2").dropDuplicates(subset=["col2"])
dfn.count()
If you want to deduplicate on specific columns, use dropDuplicates() with the subset argument:
A = [("A", 1, "AAA", "AAAAA"), ("A", 2, "AAA", "AAAAA")]
df = spark.createDataFrame(A, ['name', 'id', "name1", "name2"])
df.show()
+----+---+-----+-----+
|name| id |name1|name2|
+----+---+-----+-----+
| A| 1 | AAA|AAAAA|
| A| 2 | AAA|AAAAA|
+----+---+-----+-----+
df.dropDuplicates().show()
+----+---+-----+-----+
|name| id |name1|name2|
+----+---+-----+-----+
| A| 2 | AAA|AAAAA|
| A| 1 | AAA|AAAAA|
+----+---+-----+-----+
df.dropDuplicates(subset=["name" , "name1" , "name2" ]).show()
+----+---+-----+-----+
|name| id |name1|name2|
+----+---+-----+-----+
| A| 1 | AAA|AAAAA|
+----+---+-----+-----+
df.dropDuplicates(subset=[c for c in df.columns if c != "id" ]).show()
+----+---+-----+-----+
|name| id |name1|name2|
+----+---+-----+-----+
| A| 1 | AAA|AAAAA|
+----+---+-----+-----+
3.3.2 Column-level operations
1. Selecting one or more columns: select
Generally speaking, select and selectExpr behave the same; for the differences see Spark---DataFrame学习(二)——select、selectExpr函数.
df.select("age" ).show()
df["age" ]
df.age
df.select(“name”)
df.select(df[‘name’], df[‘age’]+1 )
df.select(df.a, df.b, df.c)
df.select(df["a" ], df["b" ], df["c" ])
2. where: selecting rows by condition (filter and where are equivalent)
Syntax: where(conditionExpr: String)
Pass in a filter expression; and and or are allowed. The result is again a DataFrame.
Note: the string value b must be quoted.
>>> df.where("id = 1 or c1 = 'b'" ).show()
+-------+---------+----------+------+--------+
| id |attr_name|attr_value|income| c1 |
+-------+---------+----------+------+--------+
| 1 | age| 30 | 50 | c|
| 2 | city| beijing| 50 | b|
| 2 | gender| fale| 50 | b|
| 3 | height| 172cm| 50 | b|
| 4 | weight| 70kg| 50 | b|
+-------+---------+----------+------+--------+
3. filter: selecting rows by condition (filter and where are equivalent)
Note: filter can be written in several ways; the first form below is recommended.
df.filter ("id = 1 or c1 = 'b'" ).show()
df.filter ((df.id =="1" ) & (df.c1=="b" ))
df.filter ((df.id =="1" ) | (df.c1=="b" ))
df.filter ('id=="1"' ).filter ('c1=="b"' )
df.filter ("id == 1 or c1 == 'b'" )
For boolean columns:
A = [('Pirate', True), ('Monkey', False), ('Ninja', True), ('Dodo', False), ('Spa', False)]
df = spark.createDataFrame(A, ['name', 'is_boy'])
df.show()
+------+------+
| name|is_boy|
+------+------+
|Pirate| true|
|Monkey| false|
| Ninja| true|
| Dodo| false|
| Spa| false|
+------+------+
df.filter ("is_boy=True" ).show()
+------+------+
| name|is_boy|
+------+------+
|Pirate| true|
| Ninja| true|
+------+------+
df.filter ("is_boy=true" ).show()
+------+------+
| name|is_boy|
+------+------+
|Pirate| true|
| Ninja| true|
+------+------+
df.filter ("is_boy" ).show()
+------+------+
| name|is_boy|
+------+------+
|Pirate| true|
| Ninja| true|
+------+------+
df.filter ("is_boy=False" ).show()
+------+------+
| name|is_boy|
+------+------+
|Monkey| false|
| Dodo| false|
| Spa| false|
+------+------+
For null values, there are two equivalent ways:
import pyspark.sql.functions as F
df_.show()
+----+-----+
|name|value|
+----+-----+
| a| null|
| b| 2 |
| c| null|
+----+-----+
df_.filter ("value is null" ).show()
df_.filter (F.col("value" ).isNull()).show()
+----+-----+
|name|value|
+----+-----+
| a| null|
| c| null|
+----+-----+
df_.filter ("value is not null" ).show()
df_.filter (F.col("value" ).isNotNull()).show()
+----+-----+
|name|value|
+----+-----+
| b| 2 |
+----+-----+
For empty strings (which are not null):
df_.show()
+----+-----+
|name|value|
+----+-----+
| a| |
| b| 2 |
| c| |
+----+-----+
df_.filter ("value=''" ).show()
+----+-----+
|name|value|
+----+-----+
| a| |
| c| |
+----+-----+
3.3.3 Sorting
1. orderBy: sort by the given column(s); ascending by default.
>>> df.orderBy(df.income.desc()).show()
+-------+---------+----------+------+--------+
|user_id|attr_name|attr_value|income|expenses|
+-------+---------+----------+------+--------+
| 2 | gender| fale| 100 | 80 |
| 2 | weight| 65kg| 100 | 80 |
| 2 | height| 170cm| 100 | 80 |
| 2 | age| 26 | 100 | 80 |
| 2 | city| beijing| 100 | 80 |
| 3 | gender| female| 99 | 99 |
| 3 | age| 35 | 99 | 99 |
| 3 | height| 161cm| 99 | 99 |
| 3 | weight| 50kg| 99 | 99 |
| 3 | city| nanjing| 99 | 99 |
| 1 | age| 30 | 50 | 40 |
| 1 | height| 172cm| 50 | 40 |
| 1 | city| beijing| 50 | 40 |
| 1 | weight| 70kg| 50 | 40 |
| 1 | gender| fale| 50 | 40 |
+-------+---------+----------+------+--------+
3.3.4 Sampling
sample draws a random sample: withReplacement = True/False controls whether sampling is done with replacement, 0.2 is the fraction of rows to sample, and 42 is the seed.
t1 = train.sample(False, 0.2, 42)
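A minimal runnable sketch on the mock df built above (sampling is approximate, so the number of returned rows varies between runs):

# sample roughly 20% of the rows, without replacement, with a fixed seed
sampled = df.sample(withReplacement=False, fraction=0.2, seed=42)
sampled.show()
sampled.count()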
3.4 Adding, dropping, and modifying columns
Use the withColumn method to add a column.
Add a column named newCol whose value is 0 for every row:
from pyspark.sql.functions import lit
df.withColumn('newCol', lit(0)).show()
+---+-----+-------+------------+------+
| Id| Name|Sallary|DepartmentId|newCol|
+---+-----+-------+------------+------+
| 1 | Joe| 70000 | 1 | 0 |
| 2 |Henry| 80000 | null| 0 |
+---+-----+-------+------------+------+
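Dropping and modifying columns follow the same pattern; a short sketch (withColumn with an existing column name overwrites that column, and the cast to long here is just an illustrative assumption):

from pyspark.sql import functions as F

# drop a column
df.drop('DepartmentId').show()
# modify a column in place: cast the string salary to a numeric type
df.withColumn('Sallary', F.col('Sallary').cast('long')).printSchema()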
Renaming columns (see: pyspark系列--dataframe基础; the color_df used below, with color and length columns, is not defined in this post and presumably comes from that reference):
data = spark.createDataFrame(data=[("Alberto", 2), ("Dakota", 2)],
                             schema=['name', 'length'])
data.show()
data.printSchema()
color_df2 = color_df.selectExpr('cast(color as long) as color2', 'length as length2')
color_df2.show()

color_df2 = color_df.withColumnRenamed('color', 'color2')\
    .withColumnRenamed('length', 'length2')
color_df2.show()

color_df.select(color_df.color.alias('color2')).show()
3.5 groupBy: grouped aggregation
In [63]: df.groupby('Sallary').count().show()
+-------+-----+
|Sallary|count|
+-------+-----+
|  70000|    1|
|  80000|    1|
+-------+-----+
Note: the correct parameter name is ascending. If you misspell it (for example asending), no error is raised because the unknown keyword is silently ignored, but the sort will not behave as intended, so watch out!
Sort ascending: ascending=True
Sort descending: ascending=False
valuesA = [('Pirate', 'boy', 1), ('Monkey', 'girl', 2), ('Monkey', 'boy', 3), ('Ninja', 'girl', 3), ('Spa', 'boy', 4), ('Spa', 'boy', 5), ('Spa', 'girl', 7)]
df = spark.createDataFrame(valuesA, ['name', 'sex', 'value'])
In [8 ]: df.show()
+------+----+-----+
| name| sex|value|
+------+----+-----+
|Pirate| boy| 1 |
|Monkey|girl| 2 |
|Monkey| boy| 3 |
| Ninja|girl| 3 |
| Spa| boy| 4 |
| Spa| boy| 5 |
| Spa|girl| 7 |
+------+----+-----+
df.groupBy("name" , "sex" ).count().orderBy("count" , ascending=False ).show()
+------+----+-----+
| name| sex|count|
+------+----+-----+
| Spa| boy| 2 |
|Monkey| boy| 1 |
| Spa|girl| 1 |
|Monkey|girl| 1 |
|Pirate| boy| 1 |
| Ninja|girl| 1 |
+------+----+-----+
collect_set and collect_list
from pyspark.sql import functions as F
df.show()
+---+-----+----+
| id |value|name|
+---+-----+----+
| a| null| Leo|
| a| 11 |null|
| a| 11 |Mike|
| a| 22 | Leo|
+---+-----+----+
df.groupBy("id" ).agg(F.sum ("value" ).alias("value_sum" ), F.collect_set("value" ).alias("value_collect_set" ), F.collect_list("name" ).alias("name_collect_list" )).show()
+---+---------+-----------------+-----------------+
| id |value_sum|value_collect_set|name_collect_list|
+---+---------+-----------------+-----------------+
| a| 44 | [22 , 11 ]| [Leo, Mike, Leo]|
+---+---------+-----------------+-----------------+
About pk_key
When pk_key is a list of column names, you can unpack it with a star, *pk_key (passing the list without the star also seems to work; I have not yet seen a case where omitting the star fails).
See the note below on the single star in Python (unpacking an argument list).
df1.show()
+----+-----+
|name|value|
+----+-----+
| a| 1 |
| a| 2 |
| a| 2 |
+----+-----+
df1.groupBy(*pk).agg(F.sum ("value" )).show()
+----+-----+----------+
|name|value|sum (value)|
+----+-----+----------+
| a| 1 | 1 |
| a| 2 | 4 |
+----+-----+----------+
df1.groupBy(pk).agg(F.sum ("value" )).show()
+----+-----+----------+
|name|value|sum (value)|
+----+-----+----------+
| a| 1 | 1 |
| a| 2 | 4 |
+----+-----+----------+
The single star *
Reference: python 中单* 和双 ** 的用法
One use of the single star is to unpack an argument list:
def func(a, b):
    print(a, b)

param = [1, 2]
func(*param)
3.6 join operations
If you need to keep every row of the left table, use a left join; otherwise an ordinary (inner) join is enough.
With a left join, when the left table is larger than the right one, the rows that find no match come back with null in the right-hand columns; you usually want to replace those nulls with another value (for example 0) to make later computation easier:
df = df.join(df1, 't_id', 'left').withColumn('is_name', F.coalesce('my_col', F.lit(0))).drop('my_col')
Join two tables that share a column "ad_id".
We want the ocpc_type for each ad_id in the first table, so we look it up in the second table.
df = spark.createDataFrame([('1' , 'Joe' ), ('4' , 'Henry' ), ('1' , 'Nan' ), ('4' , 'Hesssnry' )], ['ad_id' , 'Name' ])
df2 = spark.createDataFrame([('1' , 'A' ), ('4' , 'B' ), ('5' , 'C' )], ['ad_id' , 'ocpc_type' ])
df3 = df2.join(df, on='ad_id' , how='left' )
df3.show()
+-----+---------+--------+
|ad_id|ocpc_type| Name|
+-----+---------+--------+
| 5 | C| null|
| 1 | A| Joe|
| 1 | A| Nan|
| 4 | B| Henry|
| 4 | B|Hesssnry|
+-----+---------+--------+
df3.filter ('ocpc_type == "A"' ).show()
+-----+---------+----+
|ad_id|ocpc_type|Name|
+-----+---------+----+
| 1 | A| Joe|
| 1 | A| Nan|
+-----+---------+----+
df3 = df.join(df2, on='ad_id' , how='left' )
df3.show()
+-----+--------+---------+
|ad_id| Name|ocpc_type|
+-----+--------+---------+
| 1 | Joe| A|
| 1 | Nan| A|
| 4 | Henry| B|
| 4 |Hesssnry| B|
+-----+--------+---------+
You can think of it this way: whichever table comes first in the join is the primary one, and the other table supplements it.
left_semi: keeps the rows of df1 whose key also appears in df2 (returning only df1's columns).
left_anti: keeps the rows of df1 whose key does not appear in df2 (the remainder, again only df1's columns).
df1.show()
+---+---+
| id |num|
+---+---+
| A| 1 |
| B| 2 |
| C| 3 |
+---+---+
df2.show()
+---+---+
| id |num|
+---+---+
| C| 33 |
| D| 4 |
| E| 5 |
+---+---+
df1.join(df2, "id" , "left" ).show()
+---+---+----+
| id |num| num|
+---+---+----+
| B| 2 |null|
| C| 3 | 33 |
| A| 1 |null|
+---+---+----+
df1.join(df2, "id" , "left_semi" ).show()
+---+---+
| id |num|
+---+---+
| C| 3 |
+---+---+
df1.join(df2, "id" , "left_anti" ).show()
+---+---+
| id |num|
+---+---+
| B| 2 |
| A| 1 |
+---+---+
If the join key (pk_key) has duplicate values, the join produces a combinatorial blow-up of matched rows; make sure neither side has duplicate join keys.
valuesA = [('Pirate' ,1 ),('Monkey' ,2 ),('Monkey' ,3 ),('Ninja' ,3 ),('Spaghetti' ,4 )]
TableA = spark.createDataFrame(valuesA,['name' ,'id' ])
valuesB = [('Rutabaga' ,11 ) ,('Monkey' ,22 ) ,('Monkey' ,222 ),('Ninja' ,33 ),('Darth Vader' ,44 )]
TableB = spark.createDataFrame(valuesB,['name' ,'id2' ])
TableA.join(TableB,on='name' ).show(50 ,False )
+------+---+---+
|name |id |id2|
+------+---+---+
|Ninja |3 |33 |
|Monkey|2 |222 |
|Monkey|2 |22 |
|Monkey|3 |222 |
|Monkey|3 |22 |
+------+---+---+
TableA.join(TableB,on='name' ,how='left' ).show(50 ,False )
+---------+---+----+
|name |id |id2 |
+---------+---+----+
|Spaghetti|4 |null|
|Ninja |3 |33 |
|Pirate |1 |null|
|Monkey |2 |22 |
|Monkey |2 |222 |
|Monkey |3 |22 |
|Monkey |3 |222 |
+---------+---+----+
TableA.dropDuplicates().join(TableB.dropDuplicates(),on='name' ).show(50 ,False )
+------+---+---+
|name |id |id2|
+------+---+---+
|Ninja |3 |33 |
|Monkey|3 |22 |
|Monkey|3 |222 |
|Monkey|2 |22 |
|Monkey|2 |222 |
+------+---+---+
In [25 ]: valuesC = [('Pirate' ,1 ),('Monkey' ,222 ),('Monkey' ,111 ),('Ninja' ,3 ),('Spaghetti' ,4 )]
In [26 ]: TableC = spark.createDataFrame(valuesC,['name' ,'id' ])
In [27 ]: TableC.show()
+---------+---+
| name| id |
+---------+---+
| Pirate| 1 |
| Monkey|222 |
| Monkey|111 |
| Ninja| 3 |
|Spaghetti| 4 |
+---------+---+
In [28 ]: TableC.dropDuplicates().show()
+---------+---+
| name| id |
+---------+---+
| Pirate| 1 |
| Ninja| 3 |
| Monkey|111 |
| Monkey|222 |
|Spaghetti| 4 |
+---------+---+
----------------------------------------------------------------------------------------------
In [23 ]: valuesC = [('Pirate' ,1 ),('Monkey' ,222 ),('Monkey' ,222 ),('Ninja' ,3 ),('Spaghetti' ,4 )]
In [24 ]: TableC = spark.createDataFrame(valuesC,['name' ,'id' ])
In [25 ]: TableC.show()
+---------+---+
| name| id |
+---------+---+
| Pirate| 1 |
| Monkey|222 |
| Monkey|222 |
| Ninja| 3 |
|Spaghetti| 4 |
+---------+---+
In [26 ]: TableC.dropDuplicates().show()
+---------+---+
| name| id |
+---------+---+
| Pirate| 1 |
| Ninja| 3 |
| Monkey|222 |
|Spaghetti| 4 |
+---------+---+
Note: if the two tables share a column name other than the join key, the joined result contains duplicate columns.
df1 = spark.createDataFrame([("A" , 1 ), ("B" , 2 )], ["name" , "num" ])
df1.show()
+----+---+
|name|num|
+----+---+
| A| 1 |
| B| 2 |
+----+---+
df2 = spark.createDataFrame([("A" , 1 ), ("B" , 2 ), ("C" , 3 )], ["name" , "num" ])
df2.show()
+----+---+
|name|num|
+----+---+
| A| 1 |
| B| 2 |
| C| 3 |
+----+---+
df3 = df1.join(df2, "name" )
df3.show()
+----+---+---+
|name|num|num|
+----+---+---+
| B| 2 | 2 |
| A| 1 | 1 |
+----+---+---+
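One common way around this (a sketch, not the only option) is to rename the clashing column on one side before joining, or to drop the copy that came from one side after the join:

# rename the clashing column before joining
df1.join(df2.withColumnRenamed("num", "num2"), "name").show()
# or drop the copy that came from df2 after the join
df1.join(df2, "name").drop(df2["num"]).show()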
3.7 More complex usage examples
from pyspark.sql import functions as F
coalesce (similar to COALESCE in MySQL)
It returns the first non-null value among its arguments, for example:
MySQL:
SELECT COALESCE(NULL, NULL, 1);            -- returns 1
SELECT COALESCE(NULL, NULL, NULL, NULL);   -- returns NULL
SELECT COALESCE(a, b, c);
Spark:
df = df.join(df1, 't_id', 'left').withColumn('is_name', F.coalesce('my_col', F.lit(0))).drop('my_col')
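A minimal runnable sketch of the same pattern (the table and column names here are made up for illustration):

left = spark.createDataFrame([("t1",), ("t2",)], ["t_id"])
right = spark.createDataFrame([("t1", 5)], ["t_id", "my_col"])
# rows of `left` with no match get null in my_col; coalesce replaces that null with 0
left.join(right, "t_id", "left") \
    .withColumn("is_name", F.coalesce("my_col", F.lit(0))) \
    .drop("my_col").show()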
Converting a Unix timestamp to a date
Note: this uses Spark SQL expression syntax rather than Python syntax; see the Spark SQL docs.
valuesA = [('Pirate', 1609785094), ('Monkey', 1609785094), ('Monkey', 1609785094), ('Ninja', 1609785094), ('Spaghetti', 0)]
TableA = spark.createDataFrame(valuesA, ['name', 'time'])
new_time = F.expr("FROM_UNIXTIME(`time`, 'yyyy-MM-dd')")
df2 = TableA.where(new_time == "2021-01-01")
df2.show()
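Equivalently, you can stay in the DataFrame functions API with F.from_unixtime (a sketch; the resulting date depends on the session time zone):

TableA.withColumn("date", F.from_unixtime("time", "yyyy-MM-dd")).show()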
In plain Python, the equivalent string-format conversion looks like this:
import time
dt = "2016-05-05 20:28:54"
timeArray = time.strptime(dt, "%Y-%m-%d %H:%M:%S")
dt_new = time.strftime("%Y%m%d-%H:%M:%S", timeArray)
print(dt_new)
3.8 Checking whether two DataFrames are identical
Reference: Spark sql实战--如何比较两个dataframe是否相等
a = [('Pirate' ,1 ),('Monkey' ,2 )]
A = spark.createDataFrame(a,['name' ,'id' ])
In [3 ]: A.show()
+------+---+
| name| id |
+------+---+
|Pirate| 1 |
|Monkey| 2 |
+------+---+
b = [('Monkey' ,2 ),('Pirate' ,1 )]
B = spark.createDataFrame(b,['name' ,'id' ])
In [6 ]: B.show()
+------+---+
| name| id |
+------+---+
|Monkey| 2 |
|Pirate| 1 |
+------+---+
def match_df(df1, df2):
    count1 = len(df1.subtract(df2).take(1))
    count2 = len(df2.subtract(df1).take(1))
    return count1 == 0 and count2 == 0

print(match_df(A, B))
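Note that subtract compares the two DataFrames as sets, so duplicate rows are ignored. On Spark 2.4+ you could use exceptAll instead, which keeps duplicates (a sketch, assuming that Spark version):

def match_df_with_duplicates(df1, df2):
    # exceptAll keeps duplicates, so [x, x] vs [x] counts as different
    return df1.exceptAll(df2).rdd.isEmpty() and df2.exceptAll(df1).rdd.isEmpty()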
3.9 Intersection, union, and difference
1. Intersection, union, and difference
Difference (like SQL EXCEPT):
df1.subtract(df2)
In [31 ]: df1.show()
+-----+
|value|
+-----+
| 1 |
| 2 |
| 3 |
+-----+
In [32 ]: df2.show()
+-----+
|value|
+-----+
| 2 |
| 3 |
| 4 |
+-----+
In [33 ]: df1.subtract(df2).show()
+-----+
|value|
+-----+
| 1 |
+-----+
df1.intersect(df2)             # intersection
df1.union(df2)                 # union, keeps duplicates (like SQL UNION ALL)
df1.union(df2).distinct()      # union then deduplicate (like SQL UNION)
A union pitfall
The two tables being unioned must have exactly the same columns in exactly the same order! union does not match columns by name; it simply stacks the two tables positionally.
df1 = spark.createDataFrame([("A" , 1 , 0 ), ("B" , 1 , 0 )], ["id" , "is_girl" , "is_boy" ])
+---+-------+------+
| id |is_girl|is_boy|
+---+-------+------+
| A| 1 | 0 |
| B| 1 | 0 |
+---+-------+------+
df2 = spark.createDataFrame([("C" , 1 , 0 ), ("D" , 1 , 0 )], ["id" , "is_boy" , "is_girl" ])
+---+------+-------+
| id |is_boy|is_girl|
+---+------+-------+
| C| 1 | 0 |
| D| 1 | 0 |
+---+------+-------+
df1.union(df2).show()
+---+-------+------+
| id |is_girl|is_boy|
+---+-------+------+
| A| 1 | 0 |
| B| 1 | 0 |
| C| 1 | 0 |
| D| 1 | 0 |
+---+-------+------+
df1.selectExpr("id" , "is_girl" , "is_boy" ).union(df2.selectExpr("id" , "is_girl" , "is_boy" )).show()
+---+-------+------+
| id |is_girl|is_boy|
+---+-------+------+
| A| 1 | 0 |
| B| 1 | 0 |
| C| 0 | 1 |
| D| 0 | 1 |
+---+-------+------+
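On Spark 2.3+ you can avoid the manual column reordering with unionByName, which matches columns by name (a sketch, assuming that version):

df1.unionByName(df2).show()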
2. The difference between join and intersect
df1 = spark.createDataFrame([("A" , 1 ), ("A" , 11 ), ("B" , 2 ), ("B" , 3 )], ["name" , "num" ]).select("name" )
df2 = spark.createDataFrame([("A" , 1 ), ("B" , 2 ), ("B" , 3 )], ["name" , "num" ]).select("name" )
df1.show()
+----+
|name|
+----+
| A|
| A|
| B|
| B|
+----+
df2.show()
+----+
|name|
+----+
| A|
| B|
| B|
+----+
In [28 ]: df1.intersect(df2).show()
+----+
|name|
+----+
| B|
| A|
+----+
In [29 ]: df1.join(df2, "name" ).show()
+----+
|name|
+----+
| B|
| B|
| B|
| B|
| A|
| A|
+----+
In [30 ]: df1.dropDuplicates().join(df2.dropDuplicates(), "name" ).show()
+----+
|name|
+----+
| B|
| A|
+----+
3.10 Computing the mean and sum of a column
valuesA = [('Pirate' ,1 ),('Monkey' ,2 ),('Monkey' ,3 ),('Ninja' ,3 ),('Spaghetti' ,4 )]
A = spark.createDataFrame(valuesA,['name' ,'id' ])
A.agg({'id' : 'avg' }).show()
+-------+
|avg(id )|
+-------+
| 2.6 |
+-------+
A.agg({'id' : 'sum' }).show()
+-------+
|sum (id )|
+-------+
| 13 |
+-------+
from pyspark.sql import functions as F
A.agg(F.avg('id' ).alias('id_avg' )).show()
+------+
|id_avg|
+------+
| 2.6 |
+------+
A.agg(F.sum ('id' ).alias('id_sum' )).show()
+------+
|id_sum|
+------+
| 13 |
+------+
Example: computing each row's share of the column total
collect can be used to pull the aggregated value (here, the total) back to the driver.
import pyspark.sql.functions as F
A = [[1 ,'CAT1' ,10 ], [2 , 'CAT2' , 20 ], [3 , 'CAT3' , 70 ]]
df = spark.createDataFrame(A, ['id' , 'cate' , 'value' ])
df.show()
+---+----+-----+
| id |cate|value|
+---+----+-----+
| 1 |CAT1| 10 |
| 2 |CAT2| 20 |
| 3 |CAT3| 70 |
+---+----+-----+
df.agg(F.sum ("value" )).show()
+----------+
|sum (value)|
+----------+
| 100 |
+----------+
df.groupBy("cate" ).sum ("value" ).show()
+----+----------+
|cate|sum (value)|
+----+----------+
|CAT2| 20 |
|CAT1| 10 |
|CAT3| 70 |
+----+----------+
df.groupBy("value" ).sum ().collect()
Out[36 ]:
[Row(value=10 , sum (id )=1 , sum (value)=10 ),
Row(value=20 , sum (id )=2 , sum (value)=20 ),
Row(value=70 , sum (id )=3 , sum (value)=70 )]
df.groupBy("value" ).sum ().collect()[0 ][1 ]
Out[37 ]: 1
df.groupBy("value" ).sum ().collect()[0 ][2 ]
Out[38 ]: 10
df.agg({"value" :"sum" }).collect()
Out[39 ]: Row(sum (value)=100 )
df.agg({"value" :"sum" }).collect()[0 ][0 ]
Out[41 ]: 100
df.agg(F.sum ("value" )).collect()[0 ][0 ]
Out[47 ]: 100
Now compute the ratios:
value_sum = df.agg(F.sum ("value" )).collect()[0 ][0 ]
df2 = df.withColumn("sum" , F.lit(value_sum))
df2.show()
+---+----+-----+---+
| id |cate|value|sum |
+---+----+-----+---+
| 1 |CAT1| 10 |100 |
| 2 |CAT2| 20 |100 |
| 3 |CAT3| 70 |100 |
+---+----+-----+---+
df2 = df2.withColumn("ratio" , F.round (F.col("value" ) / F.col("sum" ), 3 ))
df2.show()
+---+----+-----+---+-----+
| id |cate|value|sum |ratio|
+---+----+-----+---+-----+
| 1 |CAT1| 10 |100 | 0.1 |
| 2 |CAT2| 20 |100 | 0.2 |
| 3 |CAT3| 70 |100 | 0.7 |
+---+----+-----+---+-----+
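If you would rather not collect the total to the driver, the same ratio can be computed with a window spanning the whole DataFrame (a sketch; an empty partitionBy() pulls all rows into a single partition, and Spark will warn about that):

from pyspark.sql import Window

w = Window.partitionBy()  # one window covering the entire DataFrame
df.withColumn("ratio", F.round(F.col("value") / F.sum("value").over(w), 3)).show()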
4. Advanced operations
4.1 concat_ws: combining columns
concat_ws
import pyspark.sql.functions as F
df1.show()
+----+-----+
|name|value|
+----+-----+
| a| 1 |
| b| 2 |
+----+-----+
df1.select(F.concat_ws("_" , F.col("name" ), F.col("value" ).alias("name_value" )), "name" ).show()
+-----------------------------------------+----+
|concat_ws(_, name, value AS `name_value`)|name|
+-----------------------------------------+----+
| a_1| a|
| b_2| b|
+-----------------------------------------+----+
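Note that in the call above the alias is attached to the value column inside concat_ws, which is why the output column gets the unwieldy auto-generated name. To name the combined column itself, put the alias on the concat_ws result (a sketch):

df1.select(F.concat_ws("_", "name", "value").alias("name_value"), "name").show()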
4.2 udf: custom user-defined functions
Reference: 【Pyspark】UDF函数的使用、UDF传入多个参数、UDF传出多个参数、传入特殊数据类型
from pyspark.sql.types import ArrayType, IntegerType
from pyspark.sql import functions as F
A = [("a" , [1 ,2 ,3 ], [10 , 20 , 30 ]), ("b" , [4 , 5 , 6 ], [100 , 200 , 300 ])]
df1 = spark.createDataFrame(A, ["name" , "value1" , "value2" ])
df1.show()
+----+---------+---------------+
|name| value1| value2|
+----+---------+---------------+
| a|[1 , 2 , 3 ]| [10 , 20 , 30 ]|
| b|[4 , 5 , 6 ]|[100 , 200 , 300 ]|
+----+---------+---------------+
def func(list1, list2):
    # list1 and list2 receive the values of the two array columns of each row
    list3 = []
    for i, j in zip(list1, list2):
        list3.append(i * j)
    return list3

func_udf = F.udf(func, ArrayType(IntegerType()))
df2 = df1.withColumn("new_col", func_udf("value1", "value2"))
df2.show()
+----+---------+---------------+-----------------+
|name| value1| value2| new_col|
+----+---------+---------------+-----------------+
| a|[1 , 2 , 3 ]| [10 , 20 , 30 ]| [10 , 40 , 90 ]|
| b|[4 , 5 , 6 ]|[100 , 200 , 300 ]|[400 , 1000 , 1800 ]|
+----+---------+---------------+-----------------+
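To pass a constant into a UDF alongside the column values, wrap it in F.lit so it arrives as a (constant) column; a sketch with a made-up scale factor:

def scale(values, factor):
    # multiply every element of the array column by the constant factor
    return [v * factor for v in values]

scale_udf = F.udf(scale, ArrayType(IntegerType()))
df1.withColumn("scaled", scale_udf("value1", F.lit(10))).show()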
4.3 window: ranking within groups
References: Spark Window 入门介绍, Spark Window Functions-PySpark(窗口函数)
The task is to partition the rows into groups and then sort within each group: find the top-ranked student in each subject.
from pyspark.sql import Window
from pyspark.sql import functions as F
df = spark.createDataFrame((
["A" , 1 , "Science" , 20 ],
["B" , 1 , "Science" , 80 ],
["C" , 2 , "Science" , 90 ],
["D" , 2 , "Science" , 40 ],
["E" , 3 , "Science" , 60 ],
["F" , 4 , "Art" , 60 ],
["G" , 4 , "Art" , 50 ],
["H" , 5 , "Art" , 90 ],
["I" , 5 , "Art" , 100 ],
["J" , 6 , "Art" , 20 ],
), ["name" , "class" , "subject" , "score" ])
window = Window.partitionBy("subject" ).orderBy(F.desc("score" ))
df = df.withColumn("rank" , F.row_number().over(window))
+----+-----+-------+-----+----+
|name|class |subject|score|rank|
+----+-----+-------+-----+----+
| C| 2 |Science| 90 | 1 |
| B| 1 |Science| 80 | 2 |
| E| 3 |Science| 60 | 3 |
| D| 2 |Science| 40 | 4 |
| A| 1 |Science| 20 | 5 |
| I| 5 | Art| 100 | 1 |
| H| 5 | Art| 90 | 2 |
| F| 4 | Art| 60 | 3 |
| G| 4 | Art| 50 | 4 |
| J| 6 | Art| 20 | 5 |
+----+-----+-------+-----+----+
df.filter ("rank=1" ).show()
+----+-----+-------+-----+----+
|name|class |subject|score|rank|
+----+-----+-------+-----+----+
| C| 2 |Science| 90 | 1 |
| I| 5 | Art| 100 | 1 |
+----+-----+-------+-----+----+
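The same window mechanism also works for aggregates, for example attaching each subject's average score to every row (a sketch):

subject_window = Window.partitionBy("subject")
df.withColumn("subject_avg", F.avg("score").over(subject_window)).show()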
4.4 Garbled Chinese characters
Prefix the string literal with u, i.e. u"中文", and the problem goes away.
In [61 ]: A = [("a" , "快手" ), ("b" , "抖音" )]
In [62 ]: df__ = spark.createDataFrame(A, ["id" , "name" ])
In [63 ]: df__.show()
+---+------+
| id | name|
+---+------+
| a|å¿«æ|
| b|æé³|
+---+------+
In [58 ]: A = [("a" , u"快手" ), ("b" , u"抖音" )]
In [59 ]: df__ = spark.createDataFrame(A, ["id" , "name" ])
In [60 ]: df__.show()
+---+----+
| id |name|
+---+----+
| a| 快手|
| b| 抖音|
+---+----+
4.5 split: splitting strings
from pyspark.sql.functions import split
from pyspark.sql import functions as F
A = [("A" , "20%" ), ("B" , "18%" )]
df_ = spark.createDataFrame(A, ["name" , "ratio1" ])
df2_ = df_.withColumn('ratio1_new' , split(F.col("ratio1" ), "%" ).getItem(0 ) * F.lit(0.01 ))
df2_.show()
+----+------+----------+
|name|ratio1|ratio1_new|
+----+------+----------+
| A| 20 %| 0.2 |
| B| 18 %| 0.18 |
+----+------+----------+
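An alternative that avoids split is to strip the percent sign with regexp_replace and cast the result (a sketch):

df_.withColumn("ratio1_new",
               F.regexp_replace("ratio1", "%", "").cast("double") / 100).show()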
5. Value replacement
5.1 Replacing nulls
Method 1: the fillna function
A = [("a" , 1 , None ), ("b" , None , 2 ), ("c" , None , None )]
df_ = spark.createDataFrame(A, ["name" , "value1" , "value2" ])
df_.show()
+----+------+------+
|name|value1|value2|
+----+------+------+
| a| 1 | null|
| b| null| 2 |
| c| null| null|
+----+------+------+
df_.fillna({"value1" : 0.0 , "value2" : 11.0 }).show()
+----+------+------+
|name|value1|value2|
+----+------+------+
| a| 1 | 11 |
| b| 0 | 2 |
| c| 0 | 11 |
+----+------+------+
Method 2: when ... otherwise
This approach supports more complex replacement logic.
from pyspark.sql import functions as F
A = [("a" , 1 , None ), ("b" , None , 2 ), ("c" , None , None )]
df_ = spark.createDataFrame(A, ["name" , "value1" , "value2" ])
df_.show()
+----+------+------+
|name|value1|value2|
+----+------+------+
| a| 1 | null|
| b| null| 2 |
| c| null| null|
+----+------+------+
df_.withColumn("value3" , F.when(F.col("value1" )<10 , F.lit(10 )).otherwise(F.lit(-10 ))).show()
+----+------+------+------+
|name|value1|value2|value3|
+----+------+------+------+
| a| 1 | null| 10 |
| b| null| 2 | -10 |
| c| null| null| -10 |
+----+------+------+------+
df_.withColumn("value1" ,F.when(F.col("value1" ).isNull(),F.lit(0.0 )).otherwise(F.lit(F.col("value1" ))))\
.withColumn("value2" , F.when(F.col("value2" ).isNull(),F.lit(11 )).otherwise(F.lit(F.col("value2" )))).show()
+----+------+------+
|name|value1|value2|
+----+------+------+
| a| 1.0 | 11 |
| b| 0.0 | 2 |
| c| 0.0 | 11 |
+----+------+------+
group1 = ["a" ]
group2 = ["b" ]
df = df.withColumn("group" , F.when(F.col("name" ).isin(group1), F.lit("goup_1" )).when(F.col("name" ).isin(group2), F.lit("goup_2" )).otherwise(F.lit("group_other" )))
+----+------+------+-----------+
|name|value1|value2| group|
+----+------+------+-----------+
| a| 1 | null| goup_1|
| b| null| 2 | goup_2|
| c| null| null|group_other|
+----+------+------+-----------+
df = df.withColumn("value" , F.when(F.col("value1" )==1.0 , F.lit("value_is_1" )).otherwise(F.lit("value_is_other" )))
+----+------+------+--------------+
|name|value1|value2| value|
+----+------+------+--------------+
| a| 1 | null| value_is_1|
| b| null| 2 |value_is_other|
| c| null| null|value_is_other|
+----+------+------+--------------+
For boolean values
Use the double equals sign "==":
df.show()
+----+-----+
|name|value|
+----+-----+
| a| true|
| b|false|
| c| true|
+----+-----+
df.withColumn("value_new" , F.when(F.col("value" )==True , F.lit(1 )).otherwise(F.lit(0 ))).show()
+----+-----+---------+
|name|value|value_new|
+----+-----+---------+
| a| true| 1 |
| b|false| 0 |
| c| true| 1 |
+----+-----+---------+
References:
pyspark.sql api 文档
Spark-SQL之DataFrame操作大全
Spark 2.2.x 中文文档
Pyspark数据基础操作集合(DataFrame)
PySpark-DataFrame各种常用操作举例
(超详细)PySpark︱DataFrame操作指南:增/删/改/查/合并/统计与数据处理
pyspark.sql module
Spark---DataFrame学习(二)——select、selectExpr函数