spark源码系列之累加器实现机制及自定义累加器
一,基本概念
累加器是Spark的一种变量,顾名思义该变量只能增加。有以下特点:
1,累加器只能在Driver端构建及并只能是Driver读取结果,Task只能累加。
2,累加器不会改变Spark Lazy计算的特点。只会在Job触发的时候进行相关累加操作。
3,现有累加器的类型。
二,累加器的使用
Driver端初始化,并在Action之后获取值。
val accum = sc.accumulator(0, "test Accumulator")
accum.value
Executor端进行计算
accum+=1;
三,累加器的重点类
Class Accumulator extends Accumulable
主要是实现了累加器的初始化及封装了相关的累加器操作方法。同时在类对象构建的时候向我们的Accumulators注册了累加器。累加器的add操作的返回值类型和我们传入的值类型可以不一样。所以,我们一定要定义好如何累加和合并值。也即add方法
object Accumulators:
该方法在Driver端管理着我们的累加器,也包含了特定累加器的聚合操作。
trait AccumulatorParam[T] extends AccumulableParam[T, T]:
AccumulatorParam的addAccumulator操作的泛型封装,具体的实现还是要再具体实现类里面实现addInPlace方法。
object AccumulatorParam:
主要是进行隐式类型转换的操作。
TaskContextImpl:
在Executor端管理着我们的累加器。
四,累加器的源码解析
1,Driver端的初始化
val accum = sc.accumulator(0, "test Accumulator")
val acc = new Accumulator(initialValue, param, Some(name))
主要是在Accumulable(Accumulator)中调用了,这样我们就可以使用Accumulator使用了。
Accumulators.register(this)
2,Executor端的反序列化得到我们对象的过程
首先,我们的value_ 可以看到其并不支持序列化
@volatile @transient private var value_ : R = initialValue // Current value on master
其初始化是在我们反序列化的时候做的,反序列化还完成了Accumulator向我们的TaskContextImpl的注册
反序列化是在调用ResultTask的RunTask方法的时候做的
val (rdd, func) = ser.deserialize[(RDD[T], (TaskContext, Iterator[T]) => U)](
ByteBuffer.wrap(taskBinary.value), Thread.currentThread.getContextClassLoader)
过程中会调用
private def readObject(in: ObjectInputStream): Unit = Utils.tryOrIOException {
in.defaultReadObject()
value_ = zero
deserialized = true
// Automatically register the accumulator when it is deserialized with the task closure.
//
// Note internal accumulators sent with task are deserialized before the TaskContext is created
// and are registered in the TaskContext constructor. Other internal accumulators, such SQL
// metrics, still need to register here.
val taskContext = TaskContext.get()
if (taskContext != null) {
taskContext.registerAccumulator(this)
}
}
3,累加器的累加
accum+=1;
param.addAccumulator(value_, term)
根据不同的累加器参数有不同的实现AccumulableParam
如,int类型。最终调用的AccumulatorParam特质的addAccumulator方法。
trait AccumulatorParam[T] extends AccumulableParam[T, T] {
def addAccumulator(t1: T, t2: T): T = {
addInPlace(t1, t2)
}
}
然后,调用的是各个具体实现的addInPlace方法
implicit object IntAccumulatorParam extends AccumulatorParam[Int] {
def addInPlace(t1: Int, t2: Int): Int = t1 + t2
def zero(initialValue: Int): Int = 0
}
返回后更新了我们的Accumulators的value_的值。
4,Accumulator的各个节点累加的之后的聚合操作
在Task类的run方法里面得到并返回的
(runTask(context), context.collectAccumulators())
最终在DAGScheduler里面调用了updateAccumulators(event)
在updateAccumulators方法中
Accumulators.add(event.accumUpdates)
具体内容如下:
def add(values: Map[Long, Any]): Unit = synchronized {
for ((id, value) <- values) {
if (originals.contains(id)) {
// Since we are now storing weak references, we must check whether the underlying data
// is valid.
originals(id).get match {
case Some(accum) => accum.asInstanceOf[Accumulable[Any, Any]] ++= value
case None =>
throw new IllegalAccessError("Attempted to access garbage collected Accumulator.")
}
} else {
logWarning(s"Ignoring accumulator update for unknown accumulator id $id")
}
}
}
5,最后我们就可以获取到累加器的值了
accum.value
五,累加器使用注意事项
累加器不会改变我们RDD的Lazy的特性,之后再Action之后完成计算和更新。
但是假如出现两个Action公用一个转化操作,如map,在map里面进行累加器累加,那么每次action都会累加,造成某些我们不需要的结果。
六,自定义累加器
自定义累加器输出
七,总结
主要牵涉点就是序列化及类加载执行,这是深入玩spark的必须。
- 碎片化 | 第四阶段-42-校验验证码-视频
- Angularjs中UI Router超级详细的教程{{上}}
- 【LEETCODE】模拟面试-120- Triangle
- Spring Cloud Edgware新特性之四:Zuul routes端点功能增强
- Spring Cloud Edgware新特性之三:使用配置属性自定义Feign的行为
- 碎片化 | 第四阶段-43-struts2拦截器interceptor-视频
- Spring Cloud Edgware新特性之二:如何配置Zuul的Hystrix线程池
- 碎片化 | 第四阶段-44-struts2注解使用-视频
- Spring Cloud Edgware新特性之:解决Eureka中Jersey 1.x版本过旧的问题-不使用Jersey
- 如何自定义微服务的Instance ID
- 怎样做情感分析
- Python|编写自己的类
- 碎片化 | 第四阶段-45-session为空问题解决-视频
- 以后有机会写框架用得着的
- JavaScript 教程
- JavaScript 编辑工具
- JavaScript 与HTML
- JavaScript 与Java
- JavaScript 数据结构
- JavaScript 基本数据类型
- JavaScript 特殊数据类型
- JavaScript 运算符
- JavaScript typeof 运算符
- JavaScript 表达式
- JavaScript 类型转换
- JavaScript 基本语法
- JavaScript 注释
- Javascript 基本处理流程
- Javascript 选择结构
- Javascript if 语句
- Javascript if 语句的嵌套
- Javascript switch 语句
- Javascript 循环结构
- Javascript 循环结构实例
- Javascript 跳转语句
- Javascript 控制语句总结
- Javascript 函数介绍
- Javascript 函数的定义
- Javascript 函数调用
- Javascript 几种特殊的函数
- JavaScript 内置函数简介
- Javascript eval() 函数
- Javascript isFinite() 函数
- Javascript isNaN() 函数
- parseInt() 与 parseFloat()
- escape() 与 unescape()
- Javascript 字符串介绍
- Javascript length属性
- javascript 字符串函数
- Javascript 日期对象简介
- Javascript 日期对象用途
- Date 对象属性和方法
- Javascript 数组是什么
- Javascript 创建数组
- Javascript 数组赋值与取值
- Javascript 数组属性和方法
- Qt音视频开发29-Onvif云台控制
- 十大高性能开发
- 【云+社区年度征文】java agent及字节码技术得到DNS时间流程尝试
- 短视频商城源码,制作彩色验证码
- ubuntu下的mysql安装与使用
- 排序算法
- 并发编程框架Disruptor之高性能设计
- 3分钟短文:Laravel说要用软删除,可不要真删
- leetcode队列之设计循环双端队列
- MySQL8.0的binlog详解
- Flutter NestedScrollView实现的一个经典滑动折叠头部图片的效果
- Maven 编译拷贝资源的时候提示 UTF-8 编码信息
- Qt音视频开发30-Onvif事件订阅
- 干掉 Navicat:正版,MySQL 官方客户端真香!
- CentOS 搭建 K8S 环境教程,一次性成功,收藏了!