Spark源码系列之foreach和foreachPartition的区别
一,基本使用
1,RDD分布式数据集的五大特性
1),A list of partitions(一系列的分区)
2),A function for computing each split(计算每个分片的方法)
3),A list of dependencies on other RDDs(一系列的依赖RDD)
4),Optionally, a Partitioner for key-value RDDs (e.g. to say that the RDD is hash-partitioned)
(可选,对于key-value类型的RDD都会有一个分区器)
5),Optionally, a list of preferred locations to compute each split on (e.g. block locations for an HDFS file)(可选,最佳位置)
2,RDD的操作类型:
Transformations:转换操作,lazy型,不会触发计算
Action:触发job
Persist:缓存,也不会触发job,在第一次触发job之后才会真正进行缓存。
3,RDD的计算
RDD的计算实际上我们可以分为两个大部分:
1),Driver端的计算
主要是stage划分,task的封装,task调度执行
2),Executor端的计算
真正的计算开始,默认情况下每个cpu运行一个task。一个task实际上就是一个分区,我们的方法无论是转换算子里封装的,还是action算子里封装的都是此时在一个task里面计算一个分区的数据。
下面就那这两个例子,开始讲解吧,针对转换类型的操作可以类比查看。
jsonRDD.foreach(each=>{
//连接数据库
//插入数据库
//关闭数据库连接
})
jsonRDD.foreachPartition(partition=>{
//此处连接上数据库
partition.foreach(each=>
//插入数据
})
//关闭数据库连接
})
这两个算子里面,上述我说的”我们的方法是”,每个算子圆括号内部的所有内容。
二,源码相关
1,第一次封装
/**
* Applies a function f to all elements of this RDD.
*/
def foreach(f: T => Unit): Unit = withScope {
val cleanF = sc.clean(f)
sc.runJob(this, (iter: Iterator[T]) => iter.foreach(cleanF))
}
/**
* Applies a function f to each partition of this RDD.
*/
def foreachPartition(f: Iterator[T] => Unit): Unit = withScope {
val cleanF = sc.clean(f)
sc.runJob(this, (iter: Iterator[T]) => cleanF(iter))
}
可以看到方法通过clean操作(清理闭包,为序列化和网络传输做准备),进行了一层匿名函数的封装,
针对foreach方法,是我们的方法被传入了迭代器的foreach(每个元素遍历执行一次函数),
而对于foreachpartiton方法是迭代器被传入了我们的方法(每个分区执行一次函数,我们获取迭代器后需要自行进行迭代处理,也即上述第二个demo的partition.foreach)。
2,第二次封装
这次很统一就在
/**
* Run a job on a given set of partitions of an RDD, but take a function of type
* `Iterator[T] => U` instead of `(TaskContext, Iterator[T]) => U`.
*/
def runJob[T, U: ClassTag](
rdd: RDD[T],
func: Iterator[T] => U,
partitions: Seq[Int]): Array[U] = {
val cleanedFunc = clean(func)
runJob(rdd, (ctx: TaskContext, it: Iterator[T]) => cleanedFunc(it), partitions)
}
就是讲上述进一步封装的方法进一步按照匿名函数封装
(ctx: TaskContext, it: Iterator[T]) => cleanedFunc(it)
3,执行的时候
Spark的Task类型我们用到的也就两个
1),ShuffleMapTask
2),ResultTask
Action算子的方法执行是在ResultTask中执行的,也即ResultTask的runTask方法。
首先反序列化得到我们的方法(2步骤封装的)和RDD,然后执行。传入的是迭代器
val (rdd, func) = ser.deserialize[(RDD[T], (TaskContext, Iterator[T]) => U)](
ByteBuffer.wrap(taskBinary.value), Thread.currentThread.getContextClassLoader)
_executorDeserializeTime = System.currentTimeMillis() - deserializeStartTime
metrics = Some(context.taskMetrics)
func(context, rdd.iterator(partition, context))
三,总结
RDD.foreach(foreachFunction)
RDD.foreachPatition(foreachPartitionFunction)
经过第二步的分析我们可以理解,展开之后实际上就是
RDD的每个分区的iterator(集合):
iterator.foreach(foreachFunction)
foreachPartitionFunction(iterator)
这就很明显了,假如我们的Function中有数据库,网络TCP等IO链接,文件流等等的创建关闭操作,采用foreachPatition方法,针对每个分区集合进行计算,更能提高我们的性能。
- javascript 红皮高程(16)
- javascript 红皮高程(15)
- javascript 红皮高程(21)-- 乘性操作符
- javascript 红皮高程(20)-- 逻辑或
- javascript 红皮高程(19)-- 逻辑与
- 技术分享 | 浅谈 RAS
- Sniper-OJ 练习平台多题WriteUp
- 怎么能学透一个知识点
- Jarvis-OJ平台多题WriteUp分享
- 会员提问 之 JS中的私有方法有什么意义?
- 本周末的QQ群视频--还是电商网站的事
- 【译】使用Apache的mod重写来保护你的C2 Empire
- 大白话,设计一个购物车对象
- 【译】Cromos – 下载并注入代码到谷歌 Chrome 浏览器扩展中
- JavaScript 教程
- JavaScript 编辑工具
- JavaScript 与HTML
- JavaScript 与Java
- JavaScript 数据结构
- JavaScript 基本数据类型
- JavaScript 特殊数据类型
- JavaScript 运算符
- JavaScript typeof 运算符
- JavaScript 表达式
- JavaScript 类型转换
- JavaScript 基本语法
- JavaScript 注释
- Javascript 基本处理流程
- Javascript 选择结构
- Javascript if 语句
- Javascript if 语句的嵌套
- Javascript switch 语句
- Javascript 循环结构
- Javascript 循环结构实例
- Javascript 跳转语句
- Javascript 控制语句总结
- Javascript 函数介绍
- Javascript 函数的定义
- Javascript 函数调用
- Javascript 几种特殊的函数
- JavaScript 内置函数简介
- Javascript eval() 函数
- Javascript isFinite() 函数
- Javascript isNaN() 函数
- parseInt() 与 parseFloat()
- escape() 与 unescape()
- Javascript 字符串介绍
- Javascript length属性
- javascript 字符串函数
- Javascript 日期对象简介
- Javascript 日期对象用途
- Date 对象属性和方法
- Javascript 数组是什么
- Javascript 创建数组
- Javascript 数组赋值与取值
- Javascript 数组属性和方法
- R语言中的Theil-Sen回归分析
- R语言关于回归系数的解释
- R语言对二分连续变量进行逻辑回归数据分析
- SAS中用单因素ANOVA研究不同疗法对焦虑症的有效性
- R语言逻辑回归预测分析付费用户
- R语言中使用多重聚合预测算法(MAPA)进行时间序列分析
- R语言中的岭回归、套索回归、主成分回归:线性模型选择和正则化
- R语言基于树的方法:决策树,随机森林,套袋Bagging,增强树
- R语言无监督学习:PCA主成分分析可视化
- 如何用r语言制作交互可视化报告图表
- R语言大数据分析纽约市的311万条投诉统计可视化与时间序列分析
- R语言动态可视化:制作历史全球平均温度的累积动态折线图动画gif视频图
- R语言里的非线性模型:多项式回归、局部样条、平滑样条、广义加性模型分析
- 使用R语言进行机制检测的隐马尔可夫模型HMM
- 【Kubernetes】Octant再探...