在本文我们将详细了解什么是 Spark RDD、什么是Spark RDD 的转换操作、什么是Spark RDD 执行操作、RDD转换操作和执行操作具体有哪些函数?
一、Spark RDD 操作
Apache Spark RDD操作有两种类型——转换(Transformations)和执行(Actions)。转换是一个从现有的RDD生成新的RDD的函数,但是当我们处理实际的数据集时,就会使用执行操作。当生成结果后触发执行操作时,不会像转换操作那样生产新的RDD。
二、Apache Spark RDD操作
在开始了解 Spark RDD 操作之前,我们有必要对 Spark RDD是什么进行了解。
Apache Spark RDD支持两种类型的操作:
三、RDD转换操作(RDD Transformation)
Spark 转换操作是一个从现有的 RDD 生成新 RDD 的函数。它以 RDD 作为输入参数,生成并输出一个或多个RDD。每当我们使用任何一个转换操作函数的时候,它都会生成新的 RDD。由于 RDD 本质上是不可变的,因此输入的 RDD 也是不可变的。
调用转换操作函数会构建了一个 RDD 谱系,其中包含最终 RDD 的整个父 RDD。RDD 谱系,也称为 RDD 运算符图或RDD 依赖图。这是一个逻辑执行计划,即它是 RDD 的整个父 RDD 的 有向无环图(DAG) 。
转换操作本质上是一种懒加载,也就是说不会立即执行转换函数,只有当我们调用执行操作函数的时候才会执行转换操作函数。转换操作最基本最常用的两个函数是:map() 和 fliter() 函数。转换后生成的 RDD 始终与其父 RDD 不同。转换后的RDD 可能变得比小(如:filter(), count(), distinct(), sample()),也可能比大(如:flatMap(), union(), Cartesian())甚至大小一样的(如:map())。
转换操作有两种类型:
窄转换(Narrow transformation)
在窄转换中,计算单个分区中的记录所需的所有元素都位于父RDD的单个分区中。使用分区的有限子集来计算结果。窄转换是调用map()、filter()函数后生成的结果。
在 RDD 转换中有各种各样的函数。让我们通过例子来看看RDD转换操作函数。
3.1 map(func)
map(func) 函数遍历 RDD 中的每一行并拆分为新的 RDD。使用接受任何函数作为输入参数的 map() 函数,并且该函数(入参函数)作用到 RDD 的每个元素上面。
在 map() 函数中,我们可以灵活地选择 RDD 的输入和返回类型。例如,我们可以将字符串作为RDD的输入类型,在应用map() 函数之后,返回的 RDD 可以是布尔类型。
例如,在 RDD{1, 2, 3, 4, 5} 中,如果我们应用 rdd .map(x=>x+2) 后,将得到(3,4,5,6,7)的结果。
Map() 函数示例:
import org.apache.spark.SparkContext
import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession
object mapTest{
def main(args: Array[String]) = {
val spark = SparkSession.builder.appName("函数示例").master("local").getOrCreate()
val data = spark.read.textFile("spark_test.txt").rdd
val mapFile = data.map(line => (line, line.length))
mapFile.foreach(println)
复制代码
“spark_test.txt”文件内容如下:
The map function iterates over every line in RDD and split into new RDD. Using map() transformation we take in any function, and that function is applied to every element of RDD.In the map, we have the flexibility that the input and the return type of RDD may differ from each other. For example, we can have input RDD type as String, after applying the map() function the return RDD can be Boolean.
3.2 flatMap(func)
借助于 flatMap() 函数,对于每个输入元素,我们在输出的 RDD 中都有许多元素。flatMap() 最简单的用法是将每个输入字符串拆分为单词。Map 和 flatMap 的相似之处在于它们从输入RDD中获取一条线并在该线上应用传入的函数。map() 和flatMap() 之间的 关键区别是 map() 只返回一个元素,而 flatMap() 可以返回一个元素列表 。
flatMap() 函数示例:
val spark = SparkSession.builder().appName("函数示例").master("local").getOrCreate()
val data = spark.read.textFile("spark_test.txt").rdd
val flatmapFile = data.flatMap(lines => lines.split(" "))
flatmapFile.foreach(println)
复制代码
-** Note -** 在上面的代码中,flatMap() 函数按空格对每一行字符串进行拆分。
3.3 filter(func)
Spark RDD filter() 函数返回一个新的 RDD,其中仅包含满足谓词的元素。这是一个窄转换操作,因为它不会把数据从一个分区映射到多个分区。例如,假设 RDD 包含前五个自然数(1、2、3、4和5),并且谓词检查偶数。那么过滤器后的结果 RDD 中就只包含偶数,即2和4。
filter() 函数示例:
val spark = SparkSession.builder().appName("函数示例").master("local").getOrCreate()
val data = spark.read.textFile("spark_test.txt").rdd
val mapFile = data.flatMap(lines => lines.split(" ")).filter(value => value=="map")
println(mapFile.count())
**Note - **在上面的代码中,flatMap 函数将每一行映射为单词,然后从 mapFile 过滤包含“ map”的行后,使用count() 执行对单词“ map”进行计数。
3.4 mapPartitions(func)
MapPartition() 函数 将源RDD的每个分区转换为包含许多元素的结果(也可能一个元素也没有)。在 mapPartition() 中,map() 函数同时作用到每个分区上。MapPartition 类似于 map,但不同之处在于它在RDD的每个分区(块)上单独运行。
3.5 mapPartitionWithIndex()
它就像 mapPartition;除了 mapPartition 之外,它还为 func 提供一个表示分区索引的整数值,map() 依次应用于分区索引。
3.6 union(dataset)
使用 union() 函数,我们可以在新的 RDD 中获得两个RDD的元素。这个函数的关键规则是两个 RDD 应该是同一种类型的。
例如,RDD1的元素是(Spark、Spark、Hadoop、Flink), RDD2的元素是(Big data、Spark、Flink),因此生成的rdd1.union(rdd2) 将包含元素(Spark、Spark、Spark、Hadoop、Flink、Flink、Big data)。
union() 函数示例:
val spark = SparkSession.builder().appName("函数示例").master("local").getOrCreate()
val rdd1 = spark.sparkContext.parallelize(Seq((1, "jan", 2016), (3, "nov", 2014), (16, "fed", 2014)))
val rdd2 = spark.sparkContext.parallelize(Seq((5, "dec", 2014), (17, "sep", 2015)))
val rdd3 = spark.sparkContext.parallelize(Seq((6, "dec", 2011), (16, "may", 2015)))
val rddUnion = rdd1.union(rdd2).union(rdd3)
rddUnion.foreach(println)
Note - 在上面的代码中,union() 操作将返回一个新的数据集,该数据集包含源数据集(rdd1)中的元素和参数(rdd2和rdd3)的并集。
3.7 intersection(其他dataset)
使用 intersection() 函数,我们只能在新的RDD中获得两个RDD的公共元素。这个函数的关键规则是两个RDD应该是同一种类型的。
看一个示例,RDD1的元素为(Spark,Spark,Hadoop,Flink),而RDD2的元素为(Big data,Spark,Flink),因此生成的 rdd1.intersection(rdd2) 生成的结果中就只有元素(spark)。
intersection() 函数示例:
val spark = SparkSession.builder().appName("函数示例").master("local").getOrCreate()
val rdd1 = spark.sparkContext.parallelize(Seq((1,"jan",2016),(3,"nov",2014), (16,"feb",2014)))
val rdd2 = spark.sparkContext.parallelize(Seq((5,"dec",2014),(1,"jan",2016)))
val comman = rdd1.intersection(rdd2)
comman.foreach(println)
Note - intersection() 函数返回一个新的RDD。 它包含rdd1和rdd2中相同的元素。
3.8 distinct()
distinct() 函数返回一个新的数据集,其中包含源数据集的不同元素。它最重要的特点就是:删除重复数据。
例如,如果RDD有元素(Spark、Spark、Hadoop、Flink),那么rdd .distinct() 最终的结果中就只包含元素(Spark、Hadoop、Flink),而将相同的元素“Spark”删掉一个。
distinct() 函数示例:
val spark = SparkSession.builder().appName("函数示例").master("local").getOrCreate()
val rdd1 = spark.sparkContext.parallelize(Seq((1,"jan",2016),(3,"nov",2014),(16,"feb",2014),(3,"nov",2014)))
val result = rdd1.distinct()
println(result.collect().mkString(", "))
Note - 在上面的示例中,distinct() 函数将删除重复记录,即(3,"nov",2014)。
结果输出为:
(3,nov,2014), (16,feb,2014), (1,jan,2016)
复制代码
3.9 groupByKey()
当我们在一个(K, V)对的数据集上使用 groupByKey() 时,数据将根据另一个RDD中的键值K进行混洗。在这种转换中,许多不必要的数据将通过网络传输。
当一个执行器上的数据超出内存中能够容纳的数据时,Spark 提供了将数据保存到磁盘的功能。单击此链接可以详细了解RDD缓存和持久性机制。
groupByKey() 函数示例:
val spark = SparkSession.builder().appName("函数示例").master("local").getOrCreate()
val data = spark.sparkContext.parallelize(Array(('k',5),('s',3),('s',4),('p',7),('p',5),('t',8),('k',6)),3)
val group = data.groupByKey().collect()
group.foreach(println)
Note - groupByKey() 函数将根据相同的键(字母表)对整数进行分组,然后 collect() 操作将以数组的形式返回数据集的所有元素。
结果输出为:
(s,CompactBuffer(3, 4))
(p,CompactBuffer(7, 5))
(t,CompactBuffer(8))
(k,CompactBuffer(5, 6))
复制代码
3.10 reduceByKey(func, [numTasks])
当我们在数据集(K, V)上使用 reduceByKey 时,在数据混洗之前,将具有相同键的同一机器上的键值对进行组合。
reduceByKey() 函数示例:
val spark = SparkSession.builder().appName("函数示例").master("local").getOrCreate()
val words = Array("one","two","two","four","five","six","six","eight","nine","ten")
val data = spark.sparkContext.parallelize(words).map(w => (w,1)).reduceByKey(_ + _)
data.foreach(println)
Note - 上面的代码将并行化字符串数组,然后将每一个字符串出现的次数设置为1,最后 reduceByKey 把具有相同键(k)的值进行相加。
结果输出为:
(two,2)
(eight,1)
(one,1)
(nine,1)
(six,2)
(five,1)
(ten,1)
(four,1)
复制代码
3.11 sortByKey()
当我们将 sortByKey() 函数应用于键值对(K, V)数据集时,根据另一个RDD中的键K排序生成新的数据。
sortByKey() 函数示例:
val spark = SparkSession.builder().appName("函数示例").master("local").getOrCreate()
val data = spark.sparkContext.parallelize(Seq(("maths",52), ("english",75), ("science",82), ("computer",65), ("maths",85)))
val sorted = data.sortByKey()
sorted.foreach(println)
Note - 在上面的代码中,sortByKey() 转换函数将以键值对(K,V)中的K(即字符串)进行升序排序。
结果输出为:
(computer,65)
(english,75)
(maths,52)
(maths,85)
(science,82)
复制代码
3.12 join()
Join 是数据库术语。它两个表中的相同字段进行合并。Spark 中的 join() 操作是在 pair RDD 上定义的。pair RDD 也是RDD,其中每个元素都是以元组类型的形式出现。其中第一个元素是键,第二个元素是值。
使用键控制数据的好处是我们可以将数据组合在一起。join() 函数就是基于键组合两个数据集的操作。
join() 函数示例:
val spark = SparkSession.builder().appName("函数示例").master("local").getOrCreate()
val data = spark.sparkContext.parallelize(Array(('A',1),('b',2),('c',3)))
val data2 =spark.sparkContext.parallelize(Array(('A',4),('A',6),('b',7),('c',3),('c',8)))
val result = data.join(data2)
println(result.collect().mkString(","))
Note - join() 转换函数是基于 Key 连接两个不同的RDD。
结果输出为:
(b,(2,7)),(A,(1,4)),(A,(1,6)),(c,(3,3)),(c,(3,8))
复制代码
3.13 coalesce()
为了避免完全打乱数据,我们可以使用 coalesce() 函数。在 coalesce() 中,我们使用现有分区,以便减少数据的混排,使用这个特性我们可以减少分区的数量。假设我们有四个节点,而我们现在只需要两个节点。然后,多余节点的数据将保存到我们保留的节点上。
coalesce() 函数示例:
val spark = SparkSession.builder().appName("函数示例").master("local").getOrCreate()
val rdd1 = spark.sparkContext.parallelize(Array("jan","feb","mar","april","may","jun"),3)
val result = rdd1.coalesce(2)
result.foreach(println)
Note - coalesce() 函数将源RDD的分区数量减少到coalesce参数中设置的分区数。
结果输出为:
复制代码
四、RDD执行操作(RDD Action)
转换操作函数从一个RDD转换成另一个 RDD,但当我们处理实际的数据集时会调用执行函数进行数据的处理。在结果之后触发执行操作函数时,不会像转换操作函数那样生成新的RDD。因此,执行操作函数是提供非RDD值的Spark RDD操作。
执行函数生成的值将被存储到驱动程序或外部存储系统中。因此形成了RDD的惰性特性。执行操作函数是从执行器向驱动程序发送数据的方法之一,执行器代理执行操作任务;而驱动程序是一个协调工作人员和任务执行的JVM进程。Spark的部分执行函数有如下这些:
4.1 count()
count() 函数 返回 RDD 中的元素数量。
例如,在这个RDD的值为{1、2、2、3、4、5、5、6}中,rdd .count() 执行后的结果为 8。
count() 函数示例:
val spark = SparkSession.builder().appName("函数示例").master("local").getOrCreate()
val data = spark.read.textFile("E:/Workspaces/IdeaProjects/sparkDemo/src/main/resources/spark_test.txt").rdd
val mapFile = data.flatMap(lines => lines.split(" ")).filter(value => value=="map")
println(mapFile.count())
**Note - **在上面的代码中,flatMap() 函数将行映射为一个个单词,并使用 count() 操作函数从 mapFile 中过滤包含map的行之后对单词map进行计数。
4.2 collect()
collect() 函数是最常见也是最简单的执行操作函数,它将整个RDD的内容返回给驱动程序。当内存能容下整个RDD的大小时可以使用 collect() 函数进行单元测试。因此,很容易将RDD的结果与预期结果进行比较。
但 collect() 函数有一个约束,即所有数据都应该适合于计算,然后复制到驱动程序。
collect() 函数示例:
val spark = SparkSession.builder().appName("函数示例").master("local").getOrCreate()
val data1 = spark.sparkContext.parallelize(Array(('A',1),('b',2),('c',3)))
val data2 =spark.sparkContext.parallelize(Array(('A',4),('A',6),('b',7),('c',3),('c',8)))
val result = data1.join(data2)
println(result.collect().mkString(","))
Note - 上面代码中,join() 转换将基于相同的键(字母)连接两个RDD之后,collect() 操作将以数组的形式返回数据集的所有元素。
结果输出为:
(b,(2,7)),(A,(1,4)),(A,(1,6)),(c,(3,3)),(c,(3,8))
复制代码
4.3 take(n)
take(n) 从RDD中返回 n 个元素。它尝试减少其访问的分区数量,因此它表示一个有偏差的集合。我们不知道元素的顺序。
例如,RDD {1,2,2,3,4,5,5,6} 执行“rdd.take(4)”后的出结果可能为 {2,2,3,4}。
take(n) 函数示例:
val spark = SparkSession.builder().appName("函数示例").master("local").getOrCreate()
val data = spark.sparkContext.parallelize(Array(('k',5),('s',3),('s',4),('p',7),('p',5),('t',8),('k',6)),3)
val group = data.groupByKey().collect()
val twoRec = group.take(2)
twoRec.foreach(println)
Note - take(2) 操作将返回一个数组,其中包含在take参数中定义的数据集的前n个元素。
结果输出为:
(s,CompactBuffer(3, 4))
(p,CompactBuffer(7, 5))
复制代码
4.4 top()
如果我们的RDD已经排序,那么我们可以使用 top() 函数从 RDD 中提取 top 元素。top() 函数使用默认的数据排序。
top() 函数示例:
val path = "E:/Workspaces/IdeaProjects/sparkDemo/src/main/resources/spark_test.txt"
val spark = SparkSession.builder().appName("函数示例").master("local").getOrCreate()
val data = spark.read.textFile(path).rdd
val mapFile = data.map(line => (line,line.length))
val res = mapFile.top(2)
res.foreach(println)
Note - map() 操作将映射每一行及其长度。top(2) 将从 mapFile 返回2条记录,默认顺序。
4.5 countByValue()
countByValue() 返回每个元素在RDD中出现的总次数。
例如,在这个RDD的值为{1,2,2,3,4,5,5,6}中,rdd.countbyvalue() 将返回结果 {(1,1),(2,2),(3,1),(4,1),(5,2),(6,1)。
countByValue() 函数示例:
val path = "E:/Workspaces/IdeaProjects/sparkDemo/src/main/resources/spark_test.txt"
val spark = SparkSession.builder().appName("函数示例").master("local").getOrCreate()
val data = spark.read.textFile(path).rdd
val result= data.map(line => (line,line.length)).countByValue()
result.foreach(println)
Note - countByValue() 操作将对每个键的计数封装成一个hashmap (K, Int)返回。
4.6 reduce()
reduce() 函数从 RDD 中取两个元素作为输入参数,然后输出与输入元素类型相同的结果。我们可以往 RDD 中添加元素,然后统计单词数量。它接受交换与结合运算作为参数。
reduce() 函数示例:
val rdd1 = spark.sparkContext.parallelize(List(20,32,45,62,8,5))
val sum = rdd1.reduce(_+_)
println(sum)
Note - 上面代码中的reduce()操作将源RDD的元素进行求和。
4.7 fold()
fold() 函数的概念类似于reduce() 函数。fold() 操作需要从一个初始的“种子”值开始,并以该值作为上下文,处理集合中的每个元素。fold() 和 reduce() 之间的关键区别是,reduce() 为空集合会抛出异常,而 fold() 定义空集合。
例如,加法的默认是0;乘法的默认是1。fold() 的返回类型与我们正在操作的RDD元素的返回类型相同。
例如,rdd.fold(0)((x, y) => x + y)。
fold() 函数示例:
val rdd1 = spark.sparkContext.parallelize(List(("maths", 80),("science", 90)))
val additionalMarks = ("extra", 4)
val sum = rdd1.fold(additionalMarks){ (acc, marks) => val add = acc._2 + marks._2
("total", add)
println(sum)
Note - 在上面的代码中,additionalMarks是一个初始值。此值与rdd1每个元素中为 int 的值进行相加。
4.8 aggregate()
aggregate() 函数的灵活性:可以从输入类型获取不同的数据类型。aggregate() 使用两个函数来生成最终结果。通过第一个函数,我们将来自RDD的元素与累加器组合起来,然后通过第二个函数,我们将累加器组合起来。因此,总的来说,我们提供了初始值为零的返回类型。
4.9 foreach()
当我们想要在RDD的每个元素上进行操作,但又不想让它给驱动程序返回值的时候。在这种情况下,foreach() 函数起到了作用。例如,将一条记录插入数据库。
foreach() 函数示例:
val data = spark.sparkContext.parallelize(Array(('k',5),('s',3),('s',4),('p',7),('p',5),('t',8),('k',6)),3)
val group = data.groupByKey().collect()
group.foreach(println)
Note - foreach() 操作对数据集组的每个元素上应用println()函数。
总之,在对一个RDD应用转换操作韩式时,会创建生成另一个RDD。因此,RDD在本质上是不可变的。在RDD上应用执行操作函数时,将计算出最终结果。这种延迟计算减少了计算开销,使系统运行更高效。
字节跳动云原生计算