添加链接
link之家
链接快照平台
  • 输入网页链接,自动生成快照
  • 标签化管理网页链接
  • 发布文章
  • 消息中心
点赞
收藏
评论
分享
原创

spark-sql优化简述

数据仓库服务 大数据
2023-08-28 10:25:14
113
0

1、自适应中reduce参数控制

spark.sql.adaptive.shuffle.targetPostShuffleInputSize用于控制任务Shuffle后的目标输入大小(以字节为单位)。
spark.sql.adaptive.minNumPostShufflePartitions用于控制自适应执行中使用的shuffle后最小的分区数,可用于控制最小并行度。
spark.sql.adaptive.maxNumPostShufflePartitions来控制Shuffle后分区的最大数量。

2、合理设置单partition读取数据量

SET spark.sql.files.maxPartitionBytes=xxxx;

3、合理设置shuffle partition的数量

SET spark.sql.shuffle.partitions=xxxx

4、使用coalesce & repartition调整partition数量

SELECT /*+ COALESCE(3) */ * FROM EMP_TABLE
SELECT /*+ REPARTITION(3) */ * FROM EMP_TABLE
SELECT /*+ REPARTITION(c) */ * FROM EMP_TABLE
SELECT /*+ REPARTITION(3, dept_col) */ * FROM EMP_TABLE
SELECT /*+ REPARTITION_BY_RANGE(dept_col) */ * FROM EMP_TABLE
SELECT /*+ REPARTITION_BY_RANGE(3, dept_col) */ * FROM EMP_TABLE

5、使用broadcast join

6、开启Adaptive Query Execution(Spark 3.0)

6.1、动态合并分区: spark会根据分区的数据量将小数据量的多个分区合并成一个分区,可以提高资源的利用率
spark.sql.adaptive.enabled: 是否开启AQE优化
spark.sql.adaptive.coalescePartitions.enabled: 是否开启动态合并分区
spark.sql.adaptive.coalescePartitions.initialPartitionNum: 初始分区数
spark.sql.adaptive.advisoryPartitionSizeInBytes 合并分区的推荐目标大小
spark.sql.adaptive.coalescePartitions.minPartitionNum: 合并之后的最小分区数

当RDD的分区数处于spark.sql.adaptive.coalescePartitions.initialPartitionNum与spark.sql.adaptive.coalescePartitions.minPartitionNum范围内才会合并
spark.sql.adaptive.advisoryPartitionSizeInBytes: 合并分区之后,分区的数据量的预期大小

6.2、动态切换join策略: 在join的时候,会动态选择性能最高的join策略,提高效率
spark.sql.adaptive.enabled: 是否开启AQE优化
spark.sql.adaptive.localShuffleReader.enabled:在不需要进行shuffle重分区时,尝试使用本地shuffle读取器。将sort-meger join 转换为广播join

6.3、动态申请资源: 当计算过程中资源不足会自动申请资源
spark.sql.adaptive.enabled: 是否开启AQE优化
spark.dynamicAllocation.enabled: 是否开启动态资源申请
spark.dynamicAllocation.shuffleTracking.enabled: 是否开启shuffle状态跟踪

6.4、动态join数据倾斜: join的时候如果出现了数据倾斜,会动态调整分区的数据量,优化数据倾斜导致的性能问题。
spark.sql.adaptive.enabled: 是否开启AQE优化
倾斜的膨胀系数:spark.sql.adaptive.skewJoin.skewedPartitionFactor:N
倾斜的最低阈值:spark.sql.adaptive.skewJoin.skewedPartitionThresholdInBytes:M
拆分粒度,以字节为单位:spark.sql.adaptive.advisoryPartitionSizeInBytes
G [代表优化之后,分区数数据的预期大小]

sparksql判断出现数据倾斜的依据[需要两个条件同时满足]:
当某个分区处理的数据量>= N * 所有task处理数据量的中位数
当某个分区处理的数据量>= M

7、文件与分区

SET spark.sql.files.maxPartitionBytes=xxx  //读取文件的时候一个分区接受多少数据;
spark.sql.files.openCostInBytes//文件打开的开销,通俗理解就是小文件合并的阈值

8、CBO优化

spark.sql.cbo.enabled: 是否开启cbo优化
spark.sql.cbo.joinReorder.enabled: 是否调整多表Join的顺序
spark.sql.cbo.joinReorder.dp.threshold: 设置多表jion的表数量的阈值,一旦join的表数量超过该阈值则不优化多表join的顺序

9、hints优化

hints预防主要用在分区和join上。

Partitioning Hints Types:COALESCE,REPARTITION,REPARTITION_BY_RANGE

Join Hints Types:BROADCAST,MERGE,SHUFFLE_HASH,SHUFFLE_REPLICATE_NL

SELECT /*+ COALESCE(3) */ * FROM t;
SELECT /*+ REPARTITION(3) */ * FROM t;
SELECT /*+ REPARTITION(c) */ * FROM t;
SELECT /*+ REPARTITION(3, c) */ * FROM t;
SELECT /*+ REPARTITION_BY_RANGE(c) */ * FROM t;
SELECT /*+ REPARTITION_BY_RANGE(3, c) */ * FROM t;

## Join Hints for broadcast join
SELECT /*+ BROADCAST(t1) */ * FROM t1 INNER JOIN t2 ON t1.key = t2.key;
SELECT /*+ BROADCASTJOIN (t1) */ * FROM t1 left JOIN t2 ON t1.key = t2.key;
SELECT /*+ MAPJOIN(t2) */ * FROM t1 right JOIN t2 ON t1.key = t2.key;

-- Join Hints for shuffle sort merge join
SELECT /*+ SHUFFLE_MERGE(t1) */ * FROM t1 INNER JOIN t2 ON t1.key = t2.key;
SELECT /*+ MERGEJOIN(t2) */ * FROM t1 INNER JOIN t2 ON t1.key = t2.key;
SELECT /*+ MERGE(t1) */ * FROM t1 INNER JOIN t2 ON t1.key = t2.key;

## Join Hints for shuffle hash join
SELECT /*+ SHUFFLE_HASH(t1) */ * FROM t1 INNER JOIN t2 ON t1.key = t2.key;

## Join Hints for shuffle-and-replicate nested loop join
SELECT /*+ SHUFFLE_REPLICATE_NL(t1) */ * FROM t1 INNER JOIN t2 ON t1.key = t2.key;

## When different join strategy hints are specified on both sides of a join, Spark
## prioritizes the BROADCAST hint over the MERGE hint over the SHUFFLE_HASH hint
## over the SHUFFLE_REPLICATE_NL hint.
## Spark will issue Warning in the following example
## org.apache.spark.sql.catalyst.analysis.HintErrorLogger: Hint (strategy=merge)
## is overridden by another hint and will not take effect.
SELECT /*+ BROADCAST(t1), MERGE(t1, t2) */ * FROM t1 INNER JOIN t2 ON t1.key = t2.key;

10、缓存表

对于一条SQL语句中可能多次使用到的表,可以对其进行缓存,使用SQLContext.cacheTable(TableName)或者DataFrame.cache即可,SparkSQL会用内存列存储的格式进行表的缓存,然后SparkSQL就可以仅仅扫描需要使用的列,并且自动优化压缩,来最小化内存的使用和GC的开销,SQLContext.uncacheTable(tableName)可以将表从缓存中移除,使用SQLContext.setConf()设置,可以通过

spark.sql.inMemoryColumnarStorage.batchSize

这个参数,默认10000,配置列存储单位。