将CST_NO 列
var neg_tmp = data_tmp.select("CST_NO").collect().map(_(0)).toList
println(neg_tmp.length)
// 取第一行 neg_tmp(0)
var neg_list = neg_tmp(0).toString.split(",")
println(neg_list)
neg_tmp: List[Any] = List(ke,sun,tian,sun)
neg_list: Array[String] = Array(ke, sun, tian, sun)
参考博客: 点击传送
List去重
1, 最简单直接办法是用distinct
scala> val l = List(1,2,3,3,4,4,5,5,6,6,6,8,9)
l: List[Int] = List(1, 2, 3, 3, 4, 4, 5, 5, 6, 6, 6, 8, 9)
scala> l.distinct
res32: List[Int] = List(1, 2, 3, 4, 5, 6, 8, 9)
2, toSet
scala> l.toSet.toList
res33: List[Int] = List(5, 1, 6, 9, 2, 3, 8, 4)
参考博客: 点击传送
// 注 表格里值一定要统一格式 ,全转化为String(null除外,没意义) 如果没有则toDF方法报错
var lst = List[String]("57.54", "trusfortMeans", null, "20190720", "5852.00", null, null, "25.77", null)
var name_list = List("idm", "CO", "distrn","dayId", "Ant", "CLP", "CAC", "PE_num","CE")
import org.apache.spark.sql.functions._
import org.apache.spark.ml._
var df = List((lst.toArray)).toDF("features")
//df: org.apache.spark.sql.DataFrame = [id: int, features: vector]
df.show()
+--------------------+
| features|
+--------------------+
|[57.54, trusfortM...|
+--------------------+
// name_list为列名 lst为一行的值
// 注 表格里值一定要统一格式 ,全转化为String(null除外,没意义) 如果没有则toDF方法报错
var lst = List[String]("57.54", "trusfortMeans", null, "20190720", "5852.00", null, null, "25.77", null)
var name_list = List("idm", "CO", "distrn","dayId", "Ant", "CLP", "CAC", "PE_num","CE")
import org.apache.spark.sql.functions._
import org.apache.spark.ml._
var df = List((lst.toArray)).toDF("features")
//df: org.apache.spark.sql.DataFrame = [id: int, features: vector]
df.show()
// +--------------------+
// | features|
// +--------------------+
// |[57.54, trusfortM...|
// +--------------------+
// sizeof `elements` should be equal to the number of entries in column `features`
val elements = name_list.toArray
// Create a SQL-like expression using the array
val sqlExpr = elements.zipWithIndex.map{ case (alias, idx) => col("features").getItem(idx).as(alias) }
// Extract Elements from dfArr
df = df.select(sqlExpr : _*)
df.show()
df: org.apache.spark.sql.DataFrame = [features: array<string>]
+--------------------+
| features|
+--------------------+
|[57.54, trusfortM...|
+--------------------+
df: org.apache.spark.sql.DataFrame = [idm: string, CO: string ... 7 more fields]
+-----+-------------+------+--------+-------+----+----+------+----+
| idm| CO|distrn| dayId| Ant| CLP| CAC|PE_num| CE|
+-----+-------------+------+--------+-------+----+----+------+----+
|57.54|trusfortMeans| null|20190720|5852.00|null|null| 25.77|null|
+-----+-------------+------+--------+-------+----+----+------+----+
参考链接:点击传送
import spark.implicits._var data_csv = Seq( ("ke,sun"), ("tian,sun")).toDF("CST_NO") +--------+| CST_NO|+--------+| ke,sun||tian,sun|+--------+将CST_NO 列var kkk = data_csv.select("...
Spark中将将数据Array或者list转为dataFrame编造数据这里写自定义目录标题欢迎使用Markdown编辑器新的改变功能快捷键合理的创建标题,有助于目录的生成如何改变文本的样式插入链接与图片如何插入一段漂亮的代码片生成一个适合你的列表创建一个表格设定内容居中、居左、居右SmartyPants创建一个自定义列表如何创建一个注脚注释也是必不可少的KaTeX数学公式新的甘特图功能,丰富你的文章UML 图表FLowchart流程图导出与导入导出导入
其实只是为了编造数据,找了几篇都不满意。项目中使用的
import org.apache.log4j.{Level, Logger}
import org.apache.spark.sql.SQLContext
import org.apache.spark.{SparkConf, SparkContext}
import scala.collection.mutable.ArrayBuffer
问题:遇到了一个问题是需要根据DataFrame中的某一列把对应另一列的数据由列转为行。
一、数据展示
a = pd.DataFrame({'name1':['a','a','b','c','b','a','b','a','c','c'],
'name2':['张三','张三','张三','李四','李四','李四','王五','王五','王五','王五']})
display(a)
二、数据展示
dataframe是pyspark中常见的数据类型,一般从load的sql中读取。有时候输入数据源并非sql,这时如何处理呢?
具体转化示例
list转化为dataframe
先将list转化为 dataframe
import pandas as pd
data_list = [['wer', 1], ['asd', 2]]
panda_df = pd.DataFrame(data_list, columns=['col_name1', 'col_name2'])
# 此处要注意panda和pand
SparkSql 数据类型转换1、SparkSql数据类型1.1数字类型1.2复杂类型2、Spark Sql数据类型和Scala数据类型对比3、Spark Sql数据类型转换案例3.1获取Column类3.2测试数据准备3.3spark入口代码3.4测试默认数据类型3.5把数值型的列转为IntegerType3.6Column类cast方法的两种重载
原文作者:SunnyRivers
原文地址...
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.{Encoders, SparkSession}
object DataSetTest {
case class Person(name:String, age:Long)
可以使用 `agg` 函数和 `join` 函数将多行合并为一行。
例如,假设你有一个名为 `df` 的 DataFrame,其中有一个名为 `group_col` 的列需要进行分组,并且需要将其他列合并为一行。
你可以使用以下代码将多行合并为一行:
```python
# 定义自定义函数,用于将多行合并为一行
def join_rows(group):
# 将每个分组中的多行合并为一行
row = group.iloc[0]
for col in group.columns:
if col != 'group_col':
row[col] = ', '.join(str(x) for x in group[col])
return row
# 对 DataFrame 进行分组并将多行合并为一行
result_df = df.groupby('group_col').agg(join_rows).reset_index()
在这个例子中,`join_rows` 函数用于将多行合并为一行,`agg` 函数用于对 DataFrame 进行分组并应用 `join_rows` 函数,在最后使用 `reset_index` 函数将分组的列重新变为普通列。