添加链接
link之家
链接快照平台
  • 输入网页链接,自动生成快照
  • 标签化管理网页链接

Spark的分区数对于Spark性能调优很重要,如果分区数少,那么并行执行的task就少,比如分区数为1,即使你分配的Executor很多,而实际执行的Executor只有1个,如果数据量很大的话,那么任务执行的就很慢,因此熟悉各种情况下默认的分区数对于Spark调优就很有必要了,特别是执行完算子返回的结果分区数为1的情况,更需要特别注意。

二、默认最小分区数:defaultMinPartitions

一般生产环境指定的分区数都会大于2,而且此参数只是指定最小分区数
sc.defaultMinPartitions=min(sc.defaultParallelism,2)
也就是sc.defaultMinPartitions只有两个值1和2,当sc.defaultParallelism>1时值为2,当sc.defaultParallelism=1时,值为1。
上面的公式是在源码里定义的(均在类SparkContext里):

* Default min number of partitions for Hadoop RDDs when not given by user * Notice that we use math.min so the "defaultMinPartitions" cannot be higher than 2. * The reasons for this are discussed in https://github.com/mesos/spark/pull/718 def defaultMinPartitions : Int = math . min ( defaultParallelism , 2 )

三、默认并行度设置:defaultParallelism

  • 在文件spark-defaults.conf中可以设置:spark.default.parallelism=20
  • SparkConf对象设置config(“spark.default.parallelism”, 20)
  • 提交命令设置:spark-submit --conf spark.default.parallelism=20

默认并行度优先级与Flink类似也是,任务代码>提交命令>配置文件。如果不设置默认并行度,shell和local模式下默认为cpu核心数,local[n]默认为n,local[*]为核心数

yarn模式下为分配的所有的Executor的cpu核数的总和或者2,两者取最大值,yarn模式时使用的cpu核数为虚拟的cpu核数,和实际cpu的核数有偏差

  • spark-submit --conf spark.default.parallelism=2 --num-executors $1 --executor-cores 1 --executor-memory 640M --master yarn --class com.heroking.spark.WordCount spark-word-count.jar

四、textFile读取hdfs文件默认分区数

这个地方看有好多人说是默认block数有多少就有多少个分区,其实不太对(或者可能以前老版本的是这样的,新版本的不是这样的)我也没有去深究老版本。

  • 1.如果textFile指定分区数量为0或者1的话,defaultMinPartitions值为1,则有多少个文件,就会有多少个分区。

  • 2.如果不指定默认分区数量,则默认分区数量为2,则会根据所有文件字节大小totalSize除以分区数量partitons的值goalSize,然后比较goalSize和hdfs指定分块大小(这里是32M)作比较,以较小的最为goalSize作为切分大小,对每个文件进行切分,若文件大于大于goalSize,则会生成该文件大小/goalSize + 1个分区。

  • 3.如果指定分区数量大于等于2,则默认分区数量为指定值,生成分区数量规则同2中的规则。

  • 读取hdfs上的其它文件和spark sql读取的df默认分区数基本和上述规则一致
    sc.parallelize()创建RDD
    默认分区数等于sc.defaultParallelism,指定参数时分区数值等于参数值。

五、分区数为1的情况

上面已经讲过几个分区数为1(当默认的并行度大于1时)的情况:
1、spark.read.csv()读取本地文件
2、读取关系型数据库表
上面是从外部数据源加载进来就为1的情况,还有就是对df或rdd进行转换操作之后的分区数为1的情况:
1、df.limit(n)

六、合理的设置分区数

根据自己集群的情况和数据大小等合理设置分区的数目,对于Spark性能调优很有必要,根据前面讲的可知,可通过配置spark.default.parallelism、传参设置分区数,遇到那些分区数为1的特殊算子可以利用repartition()进行重新分区即可。

val line RDD : RDD [String] = sc.textFile("./aa,txt") 默认 分区 (并行度):def defaultMinPartitions: Int = math.min(totalCores, 2) totalCores:任务运行的总核 源码如下: 2、从集合(内存)中 创建 RDD 例如:Yarn、Standalone模式下 val value RDD : RDD .. 3)从其他 RDD 创建 (执行转换算子的 候) 1)从集合(内存)中 创建 方法:parallelize、make RDD 1、首先来看一下这种方式 创建 RDD 是怎样的 分区 规则 object test02_ RDD DefalutPatirion { def main(arg RDD 分区 原则是 分区 的个 尽量等于集群中的CPU核心(Core) 目。各种模式下的 默认 分区 目如下(1) Local模式: 默认 为本地机器的CPU 目,若设置了local[N].则 默认 为N.(2) Standalone或者Yarn模式:在“集群中所有CPU核 总和"和“2”这两者中取较大值作为 默认 值。(3) Mesos 模式: 默认 分区 是8. Spark 框架为 RDD 提供了两种 分区 方式,分别是哈希 分区 (HashPartitioner)和范围 分区 (RangePartitioner)。 Spark 也支持自定义 分区 spark 在处理的 据在内部是分partition的。 除非是在本地新建的list 组才需要使用parallelize。保存在hdfs中的文件,在使用 spark 处理的 候是 默认 分partition的。 我们可以使用getNumPartitions()获取当前 rdd 的partition的信息。 通过glom()函 能够获取到分partition的 rdd 信息 我们在处理 据的一般使用的map函... rdd 是弹性分布式 据集, 分区 是对 rdd 据的划分。 分区 之后,job并行度增大。一个 分区 对应一个任务。           什么是任务,任务是job的执行逻辑单元。task会在excutor中执行。           当 Spark 读取这些文件作为输入 ,会根据具体 据格式对应的InputFormat进行解析,一般是将若干个Block合并成一... spark cache: 1,cache 方法不是被调用 立即缓存,而是触发后面的action ,该 RDD 将会被缓存在计算节点的内存中,并供后面重用 2, cache 是调用的 persist() 默认 情况下 persist() 会把 据以序列化的形式缓存在 JVM 的堆空间中 3,cache 默认 的存储级别都是仅在内存存储一份, Spark 的存储级别还有好多种,存储级别... 1 cache(), persist()和unpersist() 原文链接: Spark DataFrame Cache and Persist Explained spark 中DataFrame或Dataset里的cache()方法 默认 存储等级为MEMORY_AND_DISK,这跟 RDD .cache()的存储等级MEMORY_ONLY是不一样的。理由是重新计算内存中的表的代价是昂贵的。MEMORY_AND_DISK表示如果内存中缓存不下,就存在磁盘上。 spark 的dataset类中的cache()方法内部 新手首先要明白几个配置: spark .default.parallelism:( 默认 的并发 )     如果配置文件 spark -default.conf中没有显示的配置,则按照如下规则取值:     本地模式(不会启动executor,由 Spark Submit进程生成指定 量的线程 来并发): spark -shell                ...