spark分区与任务切分

时间:2022-07-22
本文章向大家介绍spark分区与任务切分,主要内容包括其使用实例、应用技巧、基本知识点总结和需要注意事项,具有一定的参考价值,需要的朋友可以参考一下。

我们都知道在spark中,RDD是其基本的抽象数据集,其中每个RDD由多个Partition组成。在job的运行期间,参与运算的Parttion数据分布在多台机器中,进行并行计算,所以分区是计算大数据量的措施。

分区数越多越好吗?

不是的,分区数太多意味着任务数太多,每次调度任务也是很耗时的,所以分区数太多会导致总体耗时增多。

分区太少有什么影响?

分区数太少的话,会导致一些结点没有分配到任务;另一方面,分区数少则每个分区要处理的数据量就会增大,从而对每个结点的内存要求就会提高;还有分区数不合理,会导致数据倾斜问题。分区的目的就是要避免存在单任务处理时间过长。

合理的分区数是多少?如何设置?

总核数=executor-cores * num-executor?

一般合理的分区数设置为总核数的2~3倍

分区数就是任务数吗?

一般来说任务数对应为分区数量,默认情况下为每一个HDFS分区创建一个分区,默认为128MB,但如果文件中的行太长(比块大小更长),则分区将会更少。RDD创建与HDFS分区一致数量的分区。

当使用textFile压缩文件(file.txt.gz不是file.txt或类似的)时,Spark禁用拆分,这使得只有1个分区的RDD(因为对gzip文件的读取无法并行化)。在这种情况下,要更改应该重新分区的分区数

但有时候你需要为你的应用程序,调整分区的大小,或者使用另一种分区方案。

设置多大分区数 ?

Spark只能为RDD的每个分区运行1个并发任务,最多可以为集群中的核心数量。因此,如果您有一个包含50个内核的群集,您希望您的RDD至少有50个分区(可能是该分区的2-3倍)。

此外,分区数决定了将RDD保存到文件的操作生成的文件数。

划分RDD:repartition

repartition(numPartitions: Int)
rdd = sc.textFile('demo.gz')
rdd = rdd.repartition(100)

请注意,Spark禁用拆分压缩文件,并创建只有1个分区的RDD。在这种情况下,使用sc.textFile('demo.gz')和重新分区是有帮助的,rdd.repartition(100)

rdd.repartition(N)做一个shuffle分割数据来匹配N

划分RDD:coalesce

coalesce(numPartitions: Int, shuffle: Boolean = false)

该coalesce转变是用来改变分区的数量。它可以根据标志触发RDD混洗shuffle(默认情况下禁用,即false)。

shuffle = true 和repartition是一致的。

分区的3种方式

1.HashPartitioner

 val counts = sc.parallelize(List((1,'a'),(1,'aa'),(2,'b'),(2,'bb'),(3,'c')), 3).partitionBy(new HashPartitioner(3))

HashPartitioner确定分区的方式:partition = key.hashCode () % numPartitions

2.RangePartitioner

val counts = sc.parallelize(List((1,'a'),(1,'aa'),(2,'b'),(2,'bb'),(3,'c')), 3).partitionBy(new RangePartitioner(3,counts))

RangePartitioner会对key值进行排序,然后将key值被划分成3份key值集合。 3.CustomPartitioner

val counts = sc.parallelize(List((1,'a'),(1,'aa'),(2,'b'),(2,'bb'),(3,'c')), 3).partitionBy(new CustomPartitioner(3))

class CustomPartitioner(numParts: Int) extends Partitioner {
override def numPartitions: Int = numParts
override def getPartition(key: Any): Int =
{
      if(key==1)){
   0
}else if 
(key==2){
1}else{ 2 }} override def equals(AcadGild: Any): Boolean = AcadGild match { case test: CustomPartitioner => test.numPartitions == numPartitions case _ => false }}

CustomPartitioner可以根据自己具体的应用需求,自定义分区。