spark分区与任务切分
我们都知道在spark中,RDD是其基本的抽象数据集,其中每个RDD由多个Partition组成。在job的运行期间,参与运算的Parttion数据分布在多台机器中,进行并行计算,所以分区是计算大数据量的措施。
分区数越多越好吗?
不是的,分区数太多意味着任务数太多,每次调度任务也是很耗时的,所以分区数太多会导致总体耗时增多。
分区太少有什么影响?
分区数太少的话,会导致一些结点没有分配到任务;另一方面,分区数少则每个分区要处理的数据量就会增大,从而对每个结点的内存要求就会提高;还有分区数不合理,会导致数据倾斜问题。分区的目的就是要避免存在单任务处理时间过长。
合理的分区数是多少?如何设置?
总核数=executor-cores * num-executor?
一般合理的分区数设置为总核数的2~3倍
分区数就是任务数吗?
一般来说任务数对应为分区数量,默认情况下为每一个HDFS分区创建一个分区,默认为128MB,但如果文件中的行太长(比块大小更长),则分区将会更少。RDD创建与HDFS分区一致数量的分区。
当使用textFile压缩文件(file.txt.gz不是file.txt或类似的)时,Spark禁用拆分,这使得只有1个分区的RDD(因为对gzip文件的读取无法并行化)。在这种情况下,要更改应该重新分区的分区数
但有时候你需要为你的应用程序,调整分区的大小,或者使用另一种分区方案。
设置多大分区数 ?
Spark只能为RDD的每个分区运行1个并发任务,最多可以为集群中的核心数量。因此,如果您有一个包含50个内核的群集,您希望您的RDD至少有50个分区(可能是该分区的2-3倍)。
此外,分区数决定了将RDD保存到文件的操作生成的文件数。
划分RDD:repartition
repartition(numPartitions: Int)
rdd = sc.textFile('demo.gz')
rdd = rdd.repartition(100)
请注意,Spark禁用拆分压缩文件,并创建只有1个分区的RDD。在这种情况下,使用sc.textFile('demo.gz')和重新分区是有帮助的,rdd.repartition(100)
rdd.repartition(N)做一个shuffle分割数据来匹配N
划分RDD:coalesce
coalesce(numPartitions: Int, shuffle: Boolean = false)
该coalesce转变是用来改变分区的数量。它可以根据标志触发RDD混洗shuffle(默认情况下禁用,即false)。
shuffle = true 和repartition是一致的。
分区的3种方式
1.HashPartitioner
val counts = sc.parallelize(List((1,'a'),(1,'aa'),(2,'b'),(2,'bb'),(3,'c')), 3).partitionBy(new HashPartitioner(3))
HashPartitioner确定分区的方式:partition = key.hashCode () % numPartitions
2.RangePartitioner
val counts = sc.parallelize(List((1,'a'),(1,'aa'),(2,'b'),(2,'bb'),(3,'c')), 3).partitionBy(new RangePartitioner(3,counts))
RangePartitioner会对key值进行排序,然后将key值被划分成3份key值集合。 3.CustomPartitioner
val counts = sc.parallelize(List((1,'a'),(1,'aa'),(2,'b'),(2,'bb'),(3,'c')), 3).partitionBy(new CustomPartitioner(3))
class CustomPartitioner(numParts: Int) extends Partitioner {
override def numPartitions: Int = numParts
override def getPartition(key: Any): Int =
{
if(key==1)){
0
}else if
(key==2){
1}else{ 2 }} override def equals(AcadGild: Any): Boolean = AcadGild match { case test: CustomPartitioner => test.numPartitions == numPartitions case _ => false }}
CustomPartitioner可以根据自己具体的应用需求,自定义分区。
- Python iterator迭代器
- How To Implement The Decision Tree Algorithm From Scratch In Python (从零开始在Python中实现决策树算法)
- 『教程』微信小程序webview的使用
- How to Save an ARIMA Time Series Forecasting Model in Python (如何在Python中保存ARIMA时间序列预测模型)
- Decision Trees in Apache Spark (Apache Spark中的决策树)
- Feature Selection For Machine Learning in Python (Python机器学习中的特征选择)
- 简约的JAVA版本MapReduce和日常No.25
- 根据职位说明使用机器学习来检索相关简历
- 微信小游戏初体验
- 一行Spark代码的诞生记(深度剖析Spark架构)
- Ray:AI的分布式系统
- Spring Boot 中使用 MongoDB 增删改查
- 来人啊给我炸了那个Java虚拟机No.46
- 机器学习虾扯淡之Logistic回归No.44
- JavaScript 教程
- JavaScript 编辑工具
- JavaScript 与HTML
- JavaScript 与Java
- JavaScript 数据结构
- JavaScript 基本数据类型
- JavaScript 特殊数据类型
- JavaScript 运算符
- JavaScript typeof 运算符
- JavaScript 表达式
- JavaScript 类型转换
- JavaScript 基本语法
- JavaScript 注释
- Javascript 基本处理流程
- Javascript 选择结构
- Javascript if 语句
- Javascript if 语句的嵌套
- Javascript switch 语句
- Javascript 循环结构
- Javascript 循环结构实例
- Javascript 跳转语句
- Javascript 控制语句总结
- Javascript 函数介绍
- Javascript 函数的定义
- Javascript 函数调用
- Javascript 几种特殊的函数
- JavaScript 内置函数简介
- Javascript eval() 函数
- Javascript isFinite() 函数
- Javascript isNaN() 函数
- parseInt() 与 parseFloat()
- escape() 与 unescape()
- Javascript 字符串介绍
- Javascript length属性
- javascript 字符串函数
- Javascript 日期对象简介
- Javascript 日期对象用途
- Date 对象属性和方法
- Javascript 数组是什么
- Javascript 创建数组
- Javascript 数组赋值与取值
- Javascript 数组属性和方法