Spark函数讲解: combineByKey
1、背景
在数据分析中,处理Key,Value的Pair数据是极为常见的场景,例如我们可以针对这样的数据进行分组、聚合或者将两个包含Pair数据的RDD根据key进行join。从函数的抽象层面看,这些操作具有共同的特征,都是将类型为RDD[(K,V)]的数据处理为RDD[(K,C)]。这里的V和C可以是相同类型,也可以是不同类型。这种数据处理操作并非单纯的对Pair的value进行map,而是针对不同的key值对原有的value进行联合(Combine)。因而,不仅类型可能不同,元素个数也可能不同。
combineByKey()是最为常用的基于键进行聚合的函数。大多数基于键聚合的函数都是用它实现的。和aggregate()一样,combineByKey()可以让用户返回与输入数据的类型不同的返回值。
Spark为此提供了一个高度抽象的操作combineByKey。该方法的定义如下所示:
def combineByKey[C](
//在找到给定分区中第一次碰到的key(在RDD元素中)时被调用。此方法为这个key初始化一个累加器。
createCombiner: V => C,
//当累加器已经存在的时候(也就是上面那个key的累加器)调用。
mergeValue: (C, V) => C,
// 如果哪个key跨多个分区,该参数就会被调用。
mergeCombiners: (C, C) => C,
partitioner: Partitioner,
mapSideCombine: Boolean = true,
serializer: Serializer = null
): RDD[(K, C)] = { //实现略 }
函数式风格与命令式风格不同之处在于它说明了代码做了什么(what to do),而不是怎么做(how to do)。combineByKey函数主要接受了三个函数作为参数,分别为createCombiner、mergeValue、mergeCombiners。这三个函数足以说明它究竟做了什么。理解了这三个函数,就可以很好地理解combineByKey。
2、原理
由于combineByKey()会遍历分区中的所有元素,因此每个元素的键要么还没有遇到过,要么就和之前的某个元素的键相同。
- 如果这是一个新的元素,combineByKey()会使用一个叫作createCombiner()的函数来创建那个键对应的累加器的初始值。需要注意的是,这一过程会在每个分区中第一次出现各个键时发生,而不是在整个RDD中第一次出现一个键时发生。
- 如果这是一个在处理当前分区之前已经遇到的键,它会使用mergeValue()方法将该键的累加器对应的当前值与这个新的值进行合并。
- 由于每个分区都是独立处理的,因此对于同一个键可以有多个累加器。如果有两个或者更多的分区都有对应同一个键的累加器,就需要使用用户提供的mergeCombiners()方法将各个分区的结果进行合并。
3、示例:
让我们来计算每一项科目的平均值
// 关闭 spark-shell INFO/DEBUG 调试信息
scala> sc.setLogLevel("WARN")
scala> val inputrdd = sc.parallelize(Seq(
("maths", 50), ("maths", 60),
("english", 65),
("physics", 66), ("physics", 61), ("physics", 87)),
1)
inputrdd: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[41] at parallelize at <console>:27
scala> inputrdd.getNumPartitions
res55: Int = 1
scala> val reduced = inputrdd.combineByKey(
(mark) => {
println(s"Create combiner -> ${mark}")
(mark, 1)
},
(acc: (Int, Int), v) => {
println(s"""Merge value : (${acc._1} + ${v}, ${acc._2} + 1)""")
(acc._1 + v, acc._2 + 1)
},
(acc1: (Int, Int), acc2: (Int, Int)) => {
println(s"""Merge Combiner : (${acc1._1} + ${acc2._1}, ${acc1._2} + ${acc2._2})""")
(acc1._1 + acc2._1, acc1._2 + acc2._2)
}
)
reduced: org.apache.spark.rdd.RDD[(String, (Int, Int))] = ShuffledRDD[42] at combineByKey at <console>:29
scala> reduced.collect()
Create combiner -> 50
Merge value : (50 + 60, 1 + 1)
Create combiner -> 65
Create combiner -> 66
Merge value : (66 + 61, 1 + 1)
Merge value : (127 + 87, 2 + 1)
res56: Array[(String, (Int, Int))] = Array((maths,(110,2)), (physics,(214,3)), (english,(65,1)))
scala> val result = reduced.mapValues(x => x._1 / x._2.toFloat)
result: org.apache.spark.rdd.RDD[(String, Float)] = MapPartitionsRDD[43] at mapValues at <console>:31
scala> result.collect()
res57: Array[(String, Float)] = Array((maths,55.0), (physics,71.333336), (english,65.0))
注意:本例中因为只有一个分区所以 mergeCombiners 并没有用到,你也可以通过下面的代码从另外角度来验证:
scala> var rdd1 = sc.makeRDD(Array(("A",1),("A",2),("B",1),("B",2),("C",1)))
rdd1: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[64] at makeRDD at :21
scala> rdd1.getNumPartitions
res18: Int = 64
scala> rdd1.combineByKey(
(v : Int) => v + "_",
(c : String, v : Int) => c + "@" + v,
(c1 : String, c2 : String) => c1 + "$" + c2
).collect
res60: Array[(String, String)] = Array((A,2_$1_), (B,1_$2_), (C,1_))
在此例中,因为分区多而记录少,可以看做每条记录都跨分区了,所以没有机会用到 mergeValue,最后直接 mergeCombiners 得到结果 。
除了可以进行group、average之外,根据传入的函数实现不同,我们还可以利用combineByKey完成诸如aggregate、fold等操作。这是一个高度的抽象,但从声明的角度来看,却又不需要了解过多的实现细节。这正是函数式编程的魅力。
Refer:
[1] Spark函数讲解:combineByKey
http://bihell.com/2017/03/14/Combiner-in-Pair-RDDs-combineByKey/
[2] combineByKey操作
https://zhangyi.gitbooks.io/spark-in-action/content/chapter2/combinebykey.html
[3] Spark算子执行流程详解之五
http://blog.csdn.net/wl044090432/article/details/59483319
[4] Spark算子:RDD键值转换操作(2)–combineByKey、foldByKey
http://lxw1234.com/archives/2015/07/358.htm
- 学习SVM(四) 理解SVM中的支持向量(Support Vector)
- 转行数据挖掘和机器学习(四)
- 了解ASP.NET MVC几种ActionResult的本质:JavaScriptResult & JsonResult
- 学习SVM(五)理解线性SVM的松弛因子
- 了解ASP.NET MVC几种ActionResult的本质:EmptyResult & ContentResult
- 可视化(番外篇)——SWT总结
- 新年必看!预测2018年将是区块链爆发的行情
- 探秘Tomcat(一)——Myeclipse中导入Tomcat源码
- 解决Myeclipse下Debug出现Source not found以及sql server中导入数据报错
- Hadoop阅读笔记(七)——代理模式
- 认识ASP.NET MVC的5种AuthorizationFilter
- 2017十大“最差”密码出炉
- SVG图形绘制入门第一弹
- 使用自定义标记来构建页面
- JavaScript 教程
- JavaScript 编辑工具
- JavaScript 与HTML
- JavaScript 与Java
- JavaScript 数据结构
- JavaScript 基本数据类型
- JavaScript 特殊数据类型
- JavaScript 运算符
- JavaScript typeof 运算符
- JavaScript 表达式
- JavaScript 类型转换
- JavaScript 基本语法
- JavaScript 注释
- Javascript 基本处理流程
- Javascript 选择结构
- Javascript if 语句
- Javascript if 语句的嵌套
- Javascript switch 语句
- Javascript 循环结构
- Javascript 循环结构实例
- Javascript 跳转语句
- Javascript 控制语句总结
- Javascript 函数介绍
- Javascript 函数的定义
- Javascript 函数调用
- Javascript 几种特殊的函数
- JavaScript 内置函数简介
- Javascript eval() 函数
- Javascript isFinite() 函数
- Javascript isNaN() 函数
- parseInt() 与 parseFloat()
- escape() 与 unescape()
- Javascript 字符串介绍
- Javascript length属性
- javascript 字符串函数
- Javascript 日期对象简介
- Javascript 日期对象用途
- Date 对象属性和方法
- Javascript 数组是什么
- Javascript 创建数组
- Javascript 数组赋值与取值
- Javascript 数组属性和方法
- Flutter “孔雀开屏”动画效果
- Flutter 使用Navigator进行局部跳转页面
- Flutter 动画鼻祖之CustomPaint
- 【Flutter实战】Flutter 中那么多组件,难道要都学一遍?
- 【Flutter组件终结篇】332个组件 658页PDF
- Kubernetes在pod中配置hosts解析域名
- 【Flutter 实战】简约而不简单的计算器
- Flutter 中渐变的高级用法
- 【Flutter实战】动画核心(1/2)
- 【Flutter实战】动画核心(2/2)
- Flutter 1.17 新 Material motion 规范的预构建动画
- Canonical通过Flutter启用Linux桌面应用程序支持
- Flutter 快捷开发 Mac Android Studio 篇
- TRTC Android端开发接入学习之互动直播(七)
- Flutter 实现酷炫的3D效果