添加链接
link之家
链接快照平台
  • 输入网页链接,自动生成快照
  • 标签化管理网页链接

本文主要介绍spark的Scala编程中的循环处理DataFrame的方法,本文主要是在写数据自动化测试的一个demo,大体的思路就是配置好了测试用例,现在需要批量执行测试用例。

目前主要有如下三个方法:

  1. For/While循环
  2. 多线程
  3. DataFrame的foreachPartition函数

这里给出来的都是代码片段,如果需要演示上面集中方法,那么需要在本地先构造好初始的DataFrame,然后可以借鉴这里给出来的方法。

标题For/While循环

通过Scala的循环处理方式,指的就是先把DataFrame的集合处理成Row对象数组,可以使用collect()函数,然后在通过for/while循环来完成,具体代码如下所示:

    val caseSqlArray = caseDataUpdateInTimeDf.select($"id", $"case_sql").collect()
      //.where($"id" === 2).collect()
        //|| $"id" === 9 || $"id" === 10 || $"id" === 11 || $"id" === 12).collect()
    var caseSqlRowsDf: DataFrame = null
    // 因为这里需要逐个执行spark-sql,然后需要将逐个执行的SQL合并起来,因此循环合并dataframe
    for( caseSqlRow <- caseSqlArray ){
      if (caseSqlRowsDf == null)
        caseSqlRowsDf = sparkSession.sql(caseSqlRow.get(1).toString).withColumn("id", lit(caseSqlRow.get(0)))
        caseSqlRowsDf = caseSqlRowsDf.union(sparkSession.sql(caseSqlRow.get(1).toString)
          .withColumn("id", lit(caseSqlRow.get(0))))

caseDataUpdateInTimeDf是你自己定义的DataFrame,具体如何创建DataFrame可以参考我的相关博客《基于spark的Scala编程—DataFrame操作之select

可以通过Scala多线程编程来实现循环处理DataFrame的每一个Row,具体代码如下:

    val caseSqlArray = caseDataUpdateInTimeDf.select($"id", $"case_sql").collect()
    val caseList = new util.ArrayList[Future[DataFrame]]()
    val caseExecutors = Executors.newFixedThreadPool(caseSqlArray.length)
    for( caseSqlRow <- caseSqlArray ){
      val task = caseExecutors.submit(new Callable[DataFrame] {
        override def call(): DataFrame ={
          sparkSession.sql(caseSqlRow.get(1).toString).withColumn("id", lit(caseSqlRow.get(0)))
      caseList.add(task)
    // 合并多线程的结果
    import scala.collection.JavaConversions._
    for(testCase <- caseList){
      try {
        if (caseSqlRowsDf == null)
          caseSqlRowsDf = testCase.get()
          caseSqlRowsDf = caseSqlRowsDf.union(testCase.get())
      } catch {
        case e: InterruptedException =>
          LOGGER.error("获取多线程结果失败,异常终端", e)
        case e: ExecutionException =>
          LOGGER.error("获取多线程结果失败", e)
      } finally{
        //启动一次顺序关闭,执行以前提交的任务,但不接受新任务
        caseExecutors.shutdown();
    caseSqlRowsDf.show()

但是在当前场景下,用for循环和用多线程没有什么大的差别,因为spark.sql是非阻塞的,所以它本质上都是在并行执行任务。

DataFrame的foreachPartition函数

第三个方法是使用DataFrame的foreachPartition函数,具体代码如下:

    // 通过Scala的foreach函数,发现在foreach里面执行spark.sql会报错
    var caseSqlRowsDf: DataFrame = null
    caseDataUpdateInTimeDf.select($"id", $"case_sql")
      .where($"id" === 9 || $"id" === 10 || $"id" === 11 || $"id" === 12)
      .foreachPartition(iterator => {
        while (iterator.hasNext){
          val caseRow = iterator.next()
          if (caseSqlRowsDf == null)
            caseSqlRowsDf = sparkSession.sql(caseRow.getString(1)).withColumn("id", lit(caseRow.get(0)))
            caseSqlRowsDf = caseSqlRowsDf.union(
              sparkSession.sql(caseRow.getString(1)).withColumn("id", lit(caseRow.get(0)))

不过这种方法会报错,因为是在dataframe里面再执行spark.sql,如果是执行普通的Scala函数来处理相关数据是没有问题的,具体原因还没有找到,后期找到具体原因了再来更新。

分别出现在spark的哪些版本? RDD是spark一开始就提出的概念,DataFramespark1.3.0版本提出来的,spark1.6.0版本又引入了DateSet的,但是在spark2.0版本中,Dat... 在贴代码之前先介绍一下DataFrame与DataSet,以下介绍内容来自以下博客:https://www.cnblogs.com/seaspring/p/5831677.html DataFrame DataFrame是一个分布式集合,其中数据逻辑存储结构为有名字的列。它概念上等价于关系数据库中的表,一个列名对应很多列值,但底层做了更多的优化。DataFrame可以从很多数据源构建,比如:已... .appName("Spark SQL basic example") .enableHiveSupport() //.config("spark.some.config.op... Spark已更新至2.x,DataFrame归DataSet管了,因此API也相应统一。本文不再适用2.0.0及以上版本。 DataFrame原生支持直接输出到JDBC,但如果目标表有自增字段(比如id),那么DataFrame就不能直接进行写入了。因为DataFrame.write().jdbc()要求DataFrame的schema与目标表的表结构必须完全一致(... Spark已更新至2.x,DataFrame归DataSet管了,因此API也相应统一。本文不再适用2.0.0及以上版本。DataFrame原生支持直接输出到JDBC,但如果目标表有自增字段(比如id),那么DataFrame就不能直接进行写入了。因为DataFrame.write().jdbc()要求DataFrame的schema与目标表的表结构必须完全一致(甚至字段顺序都要一致),否则会抛异... //读取目录文件 scala> val df = spark.read.json("file:///opt/module/spark/mycode/a.json") df: org.apache.spark.sql.DataFrame = [age: bigint, name: string] //创建临时视图 scala> df.createOr import org.apache.log4j.{Level, Logger} import org.apache.spark.rdd.RDD import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType} import org.apache.spark.sql.{DataFrame, 在SparkDataFrame是一种以RDD为基础的分布式数据集,类似于传统数据库的二维表格。和python的Pandas的DataFrame非常类似。DataFrame和RDD的区别主要在于,DataFrame带有Schema元信息,即DataFrame锁表示的二维表格数据集的每一列都带有名称和类型。DataSet是分布式的数据集合,在Spark1.6中添加的一个新的抽象,是DataFrame的一个扩展。DataSet和DataFrame的关系DataSet的特性。... SparkSQL随机DataFrame/DataSet数据源query查询用户数据(Java版/Scala版) SparkSQL入门小demo,主要操作是构造DataFrame/Dataset,以及通过它们去执行Sql 一、以下为Java版本的Demo Java版本(DataSourceJava.java、App.java) DataSourceJava.java package top.it10... Dataset[String] to Data[case class] dataset[String] to Dataset[ThrDynamicRowV001] `val ds: Dataset[ThrDynamicRowV001] = spark.read.textFile(inputThrFile).map(row => { val split_str = row.split(",") for (i <- 0 to 13) { if (split_str(i).isEmpt 我之前写的代码大体功能如下: val map= new mutable.HashMap[String, String] val df: DataFrame = DbDataApi.requestColMetaInfo(dataId) df.foreach(row =>{ map.put(row.getAs[String](fieldName = "colName"),row.getAs[String](fieldName = "col1")) 运行后发现 在最近项目中,因为由于数据量不是特别大并且内存充足,所以采用了foreachPartition代替了foreach,使用了mapPartition代替了map。下面给大家讲解一下,关于他们之间的区别。 map是spark中非常强大的一个算子,可以对RDD中每个元素进行转换,文件中的每行数据都会返回一个数组对象。而mapPartition一下处理的是一个分区中的数据,所以在数据量并不是很大的情...