Spark Streaming Aggregation Operations
Following up on the previous post, ailx10: Spark Streaming Mapping Operations, let's now look at aggregation operations.
- Dataset: Reddit comments dataset download
- Timestamp modification tool: NewFileTime download
First, count: it counts the number of elements in each RDD of the DStream, i.e., how many lines each batch contains.
val recCount = comments.count()
recCount.print()
The result: there are 10170 lines.
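As a side note (a sketch of my own, not part of the book's code): count() behaves much like mapping every record to 1 and summing with reduce, except that count() still reports 0 for an empty batch while the manual version emits nothing. The name manualCount is hypothetical.
val manualCount = comments.map(_ => 1L).reduce(_ + _) // same per-batch figure as count()
manualCount.print()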
Next, countByValue: it counts the number of occurrences of each distinct element in each RDD of the DStream.
val recCountValue = comments.countByValue()
recCountValue.print(3)
The result is as follows (showing the first 3 entries):
({"gilded":0,"retrieved_on":1473821342,"distinguished":null,"author":"[deleted]","author_flair_text":null,"id":"c1xox","edited":false,"parent_id":"t3_1xja","subreddit":"reddit.com","created_utc":1139832463,"author_flair_css_class":null,"score":-1,"ups":-1,"controversiality":1,"body":"That gets a Chuck Norris on the scale of badass-ity.","stickied":false,"link_id":"t3_1xja","subreddit_id":"t5_6"},1)
({"edited":false,"parent_id":"t1_c27oi","id":"c27xs","author":"[deleted]","author_flair_text":null,"retrieved_on":1473821535,"distinguished":null,"gilded":0,"stickied":false,"link_id":"t3_5y9xd","subreddit_id":"t5_6","controversiality":0,"body":"[deleted]","created_utc":1140818793,"author_flair_css_class":null,"score":-6,"ups":-6,"subreddit":"reddit.com"},1)
({"subreddit":"reddit.com","score":2,"ups":2,"author_flair_css_class":null,"created_utc":1139006212,"body":"I don't think this is the authorative debunking it purports to be. Look at some of the conclusions:\r\n\r\n> Whoever claims to be on a perpetual polyphasic schedule must be either suffering from a sleep disorder, or be a liar, a mutant, or a person with a mulishly stubborn iron-will that lets him plod through the daily torture of sleep deprivation.\r\n\r\n> All the hype surrounding polyphasic sleep can be delegated to the same lunatic basket as miracle diets, scientology, homeopathy, water magnetizers, creation \"science\", electrolytic detoxifiers, [...]\r\n\r\nI think the good doctor has spoken too hastily, considering the growing number of people who have managed to sustain polyphasic sleep (of the uberman kind) for long periods of time (e.g. http://www.stevepavlina.com/blog/2006/02/polyphasic-mutants/ but also several people on the uberman Yahoo group).","controversiality":0,"subreddit_id":"t5_6","link_id":"t3_18m5","stickied":false,"gilded":0,"distinguished":null,"retrieved_on":1473820919,"author":"Luc","author_flair_text":null,"parent_id":"t3_18m5","edited":false,"id":"c18sq"},1)
...
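For intuition, countByValue() is essentially a map to (element, 1L) pairs followed by reduceByKey, which is why each distinct JSON line above is printed together with its count. A hand-rolled sketch of my own, assuming the same comments DStream (manualCountValue is a hypothetical name):
val manualCountValue = comments.map(line => (line, 1L)).reduceByKey(_ + _) // pair each line with 1, then sum per distinct line
manualCountValue.print(3)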
Finally, reduce: it aggregates the values in each RDD into a single value by repeatedly applying reduceFunc to pairs of values; here we use it to compute the total number of words across all comment bodies in each batch.
val totalWords = comments.map(rec => ((parse(rec) \ "body").values.toString))
.flatMap(body => body.split(" "))
.map(word => 1)
.reduce(_ + _)
totalWords.print()
The result: 417373 words in total.
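An equivalent formulation (my own variant, under the same assumptions; totalWordsAlt is a hypothetical name): since count() already maps each element to one and sums, the explicit map(word => 1).reduce(_ + _) step can be replaced by a plain count() on the flattened words.
val totalWordsAlt = comments.map(rec => (parse(rec) \ "body").values.toString)
  .flatMap(body => body.split(" "))
  .count() // counts the words directly, giving the same per-batch total
totalWordsAlt.print()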
The complete code is as follows:
package org.apress.prospark

import org.apache.spark.SparkContext
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{ Seconds, StreamingContext }
import org.apache.hadoop.io.{ Text, LongWritable }
import org.apache.hadoop.fs.Path
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat
import org.json4s._
import org.json4s.native.JsonMethods._
object Study3Dstream {
  def main(args: Array[String]) {
    val inputPath = "C:\\study\\pro_spark_streaming\\3\\bz\\"
    val conf = new SparkConf()
      .setAppName("AILX10")
      .setJars(SparkContext.jarOfClass(this.getClass).toSeq)
      .setMaster("local")
    val ssc = new StreamingContext(conf, Seconds(1))

    // Read the Reddit comment files as text; newFilesOnly = false also picks up files already in the directory.
    val comments = ssc.fileStream[LongWritable, Text, TextInputFormat](
      inputPath, (f: Path) => true, newFilesOnly = false).map(pair => pair._2.toString)

    // count: number of lines (comments) in each batch
    val recCount = comments.count()
    recCount.print()

    // countByValue: occurrences of each distinct line in each batch
    val recCountValue = comments.countByValue()
    recCountValue.print(3)

    // reduce: total number of words across all comment bodies in each batch
    val totalWords = comments.map(rec => (parse(rec) \ "body").values.toString)
      .flatMap(body => body.split(" "))
      .map(word => 1)
      .reduce(_ + _)
    totalWords.print()

    ssc.start()
    ssc.awaitTermination()
  }
}
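If you want to exercise count, countByValue, and reduce without downloading the Reddit dump, a small self-contained test can be driven from an in-memory queue. This is my own sketch: the object name AggregationQueueDemo and the sample strings are made up, and it simply replays fake "comment" lines through the same three operators.
import scala.collection.mutable
import org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.{ Seconds, StreamingContext }

object AggregationQueueDemo {
  def main(args: Array[String]) {
    val conf = new SparkConf().setAppName("AggregationQueueDemo").setMaster("local[2]")
    val ssc = new StreamingContext(conf, Seconds(1))

    // queueStream dequeues one RDD of fake "comment" lines per batch interval.
    val queue = mutable.Queue[RDD[String]]()
    val lines = ssc.queueStream(queue)

    lines.count().print()                                          // lines per batch
    lines.countByValue().print(3)                                  // occurrences of each distinct line
    lines.flatMap(_.split(" ")).map(_ => 1).reduce(_ + _).print()  // words per batch

    queue += ssc.sparkContext.makeRDD(Seq("a b c", "a b c", "d e"))

    ssc.start()
    ssc.awaitTerminationOrTimeout(5000)
    ssc.stop()
  }
}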