框架 | Spark中的combineByKey
在数据分析中,处理Key,Value的Pair数据是极为常见的场景,例如我们可以针对这样的数据进行分组、聚合或者将两个包含Pair数据的RDD根据key进行join。从函数的抽象层面看,这些操作具有共同的特征,都是将类型为RDD[(K,V)]的数据处理为RDD[(K,C)]。这里的V和C可以是相同类型,也可以是不同类型。这种数据处理操作并非单纯的对Pair的value进行map,而是针对不同的key值对原有的value进行联合(Combine)。因而,不仅类型可能不同,元素个数也可能不同。
Spark为此提供了一个高度抽象的操作combineByKey。该方法的定义如下所示:
def combineByKey[C](createCombiner: V => C,
mergeValue: (C, V) => C,
mergeCombiners: (C, C) => C,
partitioner: Partitioner,
mapSideCombine: Boolean = true,
serializer: Serializer = null): RDD[(K, C)] = {
//实现略
}
函数式风格与命令式风格不同之处在于它说明了代码做了什么(what to do),而不是怎么做(how to do)。combineByKey函数主要接受了三个函数作为参数,分别为createCombiner、mergeValue、mergeCombiners。这三个函数足以说明它究竟做了什么。理解了这三个函数,就可以很好地理解combineByKey。
combineByKey是将RDD[(K,V)]combine为RDD[(K,C)],因此,首先需要提供一个函数,能够完成从V到C的combine,称之为combiner。如果V和C类型一致,则函数为V => V。倘若C是一个集合,例如Iterable[V],则createCombiner为V => Iterable[V]。
mergeValue则是将原RDD中Pair的Value合并为操作后的C类型数据。合并操作的实现决定了结果的运算方式。所以,mergeValue更像是声明了一种合并方式,它是由整个combine运算的结果来导向的。函数的输入为原RDD中Pair的V,输出为结果RDD中Pair的C。
最后的mergeCombiners则会根据每个Key所对应的多个C,进行归并。
让我们将combineByKey想象成是一个超级酷的果汁机。它能同时接受各种各样的水果,然后聪明地按照水果的种类分别榨出不同的果汁。苹果归苹果汁,橙子归橙汁,西瓜归西瓜汁。我们为水果定义类型为Fruit,果汁定义为Juice,那么combineByKey就是将RDD[(String, Fruit)]combine为RDD[(String, Juice)]。
注意,在榨果汁前,水果可能有很多,即使是相同类型的水果,也会作为不同的RDD元素:
("apple", apple1), ("orange", orange1), ("apple", apple2)
combine的结果是每种水果只有一杯果汁(只是容量不同罢了):
("apple", appleJuice), ("orange", orangeJuice)
这个果汁机由什么元件构成呢?首先,它需要一个元件提供将各种水果榨为各种果汁的功能;其次,它需要提供将果汁进行混合的功能;最后,为了避免混合错误,还得提供能够根据水果类型进行混合的功能。注意第二个函数和第三个函数的区别,前者只提供混合功能,即能够将不同容器的果汁装到一个容器中,而后者的输入已有一个前提,那就是已经按照水果类型放到不同的区域,果汁机在混合果汁时,并不会混淆不同区域的果汁。
果汁机的功能类似于groupByKey+foldByKey操作。它可以调用combineByKey函数:
case class Fruit(kind: String, weight: Int) {
def makeJuice:Juice = Juice(weight * 100)
}
case class Juice(volumn: Int) {
def add(j: Juice):Juice = Juice(volumn + j.volumn)
}
val apple1 = Fruit("apple", 5)
val apple2 = Fruit("apple", 8)
val orange1 = Fruit("orange", 10)
val fruit = sc.parallelize(List(("apple", apple1) , ("orange", orange1) , ("apple", apple2)))
val juice = fruit.combineByKey(
f => f.makeJuice,
(j:Juice,f) => j.add(f.makeJuice),
(j1:Juice,j2:Juice) => j1.add(j2)
)
执行juice.collect,结果为:
Array[(String, Juice)] = Array((orange,Juice(1000)), (apple,Juice(1300)))
RDD中有许多针对Pair RDD的操作在内部实现都调用了combineByKey函数。例如groupByKey:
class PairRDDFunctions[K, V](self: RDD[(K, V)])
(implicit kt: ClassTag[K], vt: ClassTag[V], ord: Ordering[K] = null)
extends Logging
with SparkHadoopMapReduceUtil
with Serializable {
def groupByKey(partitioner: Partitioner): RDD[(K, Iterable[V])] = {
val createCombiner = (v: V) => CompactBuffer(v)
val mergeValue = (buf: CompactBuffer[V], v: V) => buf += v
val mergeCombiners = (c1: CompactBuffer[V], c2: CompactBuffer[V]) => c1 ++= c2
val bufs = combineByKey[CompactBuffer[V]](
createCombiner, mergeValue, mergeCombiners, partitioner, mapSideCombine=false)
bufs.asInstanceOf[RDD[(K, Iterable[V])]]
}
}
groupByKey函数针对PairRddFunctions的RDD[(K, V)]按照key对value进行分组。它在内部调用了combineByKey函数,传入的三个函数分别承担了如下职责:
- createCombiner是将原RDD中的K类型转换为Iterable[V]类型,实现为CompactBuffer。
- mergeValue实则就是将原RDD的元素追加到CompactBuffer中,即将追加操作(+=)视为合并操作。
- mergeCombiners则负责针对每个key值所对应的Iterable[V],提供合并功能。
再例如,我们要针对科目对成绩求平均值:
val scores = sc.parallelize(List(("chinese", 88.0) , ("chinese", 90.5) , ("math", 60.0), ("math", 87.0)))
平均值并不能一次获得,而是需要求得各个科目的总分以及科目的数量。因此,我们需要针对scores进行combine,从(String, Float)combine为(String, (Float, Int))。调用combineByKey函数后,我们可以再通过map来获得平均值。代码如下:
val avg = scores.combineByKey(
(v) => (v, 1),
(acc: (Float, Int), v) => (acc._1 + v, acc._2 + 1),
(acc1:(Float, Int), acc2:(Float, Int)) => (acc1._1 + acc2._1, acc1._2 + acc2._2)
).map{ case (key, value) => (key, value._1 / value._2.toFloat) }
除了可以进行group、average之外,根据传入的函数实现不同,我们还可以利用combineByKey完成诸如aggregate、fold等操作。这是一个高度的抽象,但从声明的角度来看,却又不需要了解过多的实现细节。这正是函数式编程的魅力。
- 【52ABP实战教程】0.3-- 从GitHub推送代码回VSTS实现双向同步
- css绝对定位如何在不同分辨率下的电脑正常显示定位位置?
- nvm安装node和npm,个人踩坑记录
- clang_intprt_t类型探究
- 学习zepto.js(Hello World)
- JS中函数声明与函数表达式的异同
- [技巧]看我如何通过Weeman+Ettercap拿下路由器管理权限
- 一分钟理清Vue-cli 代码构建步骤。
- 点击图片放大至原始图片大小
- 替代jquery1.9版本以前的toggle事件函数(开关)
- 总结CSS3新特性(Animation篇)
- Scrapy爬虫入门
- 移动端页面按手机屏幕分辨率自动缩放的js
- PYTHON黑帽编程 4.1 SNIFFER(嗅探器)之数据捕获--补充
- JavaScript 教程
- JavaScript 编辑工具
- JavaScript 与HTML
- JavaScript 与Java
- JavaScript 数据结构
- JavaScript 基本数据类型
- JavaScript 特殊数据类型
- JavaScript 运算符
- JavaScript typeof 运算符
- JavaScript 表达式
- JavaScript 类型转换
- JavaScript 基本语法
- JavaScript 注释
- Javascript 基本处理流程
- Javascript 选择结构
- Javascript if 语句
- Javascript if 语句的嵌套
- Javascript switch 语句
- Javascript 循环结构
- Javascript 循环结构实例
- Javascript 跳转语句
- Javascript 控制语句总结
- Javascript 函数介绍
- Javascript 函数的定义
- Javascript 函数调用
- Javascript 几种特殊的函数
- JavaScript 内置函数简介
- Javascript eval() 函数
- Javascript isFinite() 函数
- Javascript isNaN() 函数
- parseInt() 与 parseFloat()
- escape() 与 unescape()
- Javascript 字符串介绍
- Javascript length属性
- javascript 字符串函数
- Javascript 日期对象简介
- Javascript 日期对象用途
- Date 对象属性和方法
- Javascript 数组是什么
- Javascript 创建数组
- Javascript 数组赋值与取值
- Javascript 数组属性和方法
- 为什么我的Redis这么“慢”?
- Nginx系列:数据压缩
- Ray,面向新兴AI应用的分布式框架
- 60.Vue export default 和 export 的使用方式
- Idea开发maven插件
- redis实战 migrate异常NOAUTH Authentication required.
- linux内存使用情况分析(free + top)
- Centos7 python3安装
- crontab JAVA_HOME not found
- Centos7 源码安装mysql5.6
- mysql登录时报socket找不到终极解决方案
- Grafana将数据库由sqlite3改为mysql
- Python自学成才之路 带有参数的装饰器
- Grafana 将默认的元数据库 sqlite 改为 mysql
- 使用IDEA整合spring4+spring mvc+hibernate