# -*- coding: utf-8 -*-
from pyspark.sql import SparkSession
from pyspark.sql.functions import collect_list
from pyspark.sql.functions import concat, concat_ws, lit
import os
# Create a Spark session.
# NOTE(review): idiomatic PySpark is `SparkSession.builder` (the lowercase
# accessor) rather than instantiating the nested `Builder` class directly;
# both resolve to a Builder here, but confirm against the installed version.
spark = SparkSession. \
Builder(). \
appName('sql'). \
master('local'). \
getOrCreate()
# Load the space-delimited friend-pair file; with header=false the two
# columns arrive as _c0 (person) and _c1 (direct friend).
# NOTE(review): `os` is imported above but never used in this script.
df = spark.read.format("csv"). \
option("header", "false"). \
option("delimiter"," "). \
load("file:///home/software/20220126/pre.txt")
# Two renamed copies of the same pair table, for the self-join below.
df1 = df.withColumnRenamed('_c0','id1').withColumnRenamed('_c1','id2_1')
df2 = df.withColumnRenamed('_c0','id1').withColumnRenamed('_c1','id2_2')
# Distinct list of all person ids; `df` is deliberately reassigned here and
# reused at the final join to keep only ids present in the input.
df = df1.drop('id2_1').distinct()
# First, self-join on t1.id1 = t2.id1 (and later t1.id2 != t2.id2) to build
# the candidate rows: two people who share a common friend id1.
df3 = df1.join(df2,df1.id1 == df2.id1 ,'inner').select(df1.id1, df1.id2_1, df2.id2_2)
df4 = df3.select("id1", "id2_1", "id2_2").where(" id2_1 != id2_2 ")
# Next, drop the redundant id1 column: id2_1 may have many id2_2 indirect
# friends, and the same pair can arise via several common friends, so dedupe.
df5 = df4.drop('id1')
df6 = df5.distinct()
# The result is id2_1's indirect-friend pairs, but it may still contain
# id2_1's DIRECT friends; remove those by positional set subtraction
# (subtract matches columns by position: (id2_1,id2_2) vs (id1,id2_1)).
df7 = df6.select("id2_1", "id2_2").subtract(df1.select("id1", "id2_1"))
# The plain collect_list version below fails on CSV write because the CSV
# data source does not support array<string> (see the note further down);
# hence the concat_ws/concat formatting into a single string column.
# df8 = df7.groupby('id2_1').agg(collect_list(df7["id2_2"]).alias("id2_2_new"))
df8 = df7.groupby('id2_1').agg(
concat_ws(
" | ",
collect_list(
concat(lit("("), concat_ws(", ", 'id2_2'), lit(")"))
)).alias("id2_2_new"))
# Keep only rows whose id2_1 actually appears as an id in the input.
df9 = df8.join(df, df.id1 == df8.id2_1,'inner').select(df8.id2_1, df8.id2_2_new)
df9.show()
# Write the result as a single CSV part file (coalesce to 1 partition).
# NOTE(review): no mode("overwrite") is set, so a rerun against an existing
# output directory will raise AnalysisException — confirm intended behavior.
file1=r"file:///home/software/20220126/output"
df10 = df9.coalesce(numPartitions= 1)
df10.write.csv(file1)
# Stop the Spark session.
spark.stop()
运行截图:
option("header", "false").
option("delimiter"," ").
load("file:///home/software/20220126/pre.txt")
2.2.2 collect_list使用注意
df8 = df7.groupby('id2_1').agg(collect_list(df7["id2_2"]).alias("id2_2_new"))
df8 = df7.groupby('id2_1').agg(
concat_ws(
" | ",
collect_list(
concat(lit("("), concat_ws(", ", 'id2_2'), lit(")"))
)).alias("id2_2_new"))
注释的df8输出会报错:
pyspark.sql.utils.AnalysisException: u'CSV data source does not support array<string> data type.;'
2.2.3 getPythonAuthSocketTimeout does not exist in the JVM
运行代码报错:
py4j.protocol.Py4JError: org.apache.spark.api.python.PythonUtils.getPythonAuthSocketTimeout does not exist in the JVM
网上搜索了一下,spark安装也没问题,python安装也没有问题,只是python找不到spark,此时需要安装findspark包
pip install findspark
然后在程序中添加以下代码
import findspark
findspark.init()