Spark RDD Partitions and Dependencies, Explained by Example
Date: 2022-06-07
Example (Spark shell session):
scala> val textFileRDD = sc.textFile("/Users/zhuweibin/Downloads/hive_04053f79f32b414a9cf5ab0d4a3c9daf.txt")
15/08/03 07:00:08 INFO MemoryStore: ensureFreeSpace(57160) called with curMem=0, maxMem=278019440
15/08/03 07:00:08 INFO MemoryStore: Block broadcast_0 stored as values in memory (estimated size 55.8 KB, free 265.1 MB)
15/08/03 07:00:08 INFO MemoryStore: ensureFreeSpace(17237) called with curMem=57160, maxMem=278019440
15/08/03 07:00:08 INFO MemoryStore: Block broadcast_0_piece0 stored as bytes in memory (estimated size 16.8 KB, free 265.1 MB)
15/08/03 07:00:08 INFO BlockManagerInfo: Added broadcast_0_piece0 in memory on localhost:51675 (size: 16.8 KB, free: 265.1 MB)
15/08/03 07:00:08 INFO SparkContext: Created broadcast 0 from textFile at <console>:21
textFileRDD: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[1] at textFile at <console>:21
scala> println( textFileRDD.partitions.size )
15/08/03 07:00:09 INFO FileInputFormat: Total input paths to process : 1
2
scala> textFileRDD.partitions.foreach { partition =>
| println("index:" + partition.index + " hashCode:" + partition.hashCode())
| }
index:0 hashCode:1681
index:1 hashCode:1682
scala> println("dependencies:" + textFileRDD.dependencies)
dependencies:List(org.apache.spark.OneToOneDependency@543669de)
scala> println( textFileRDD )
MapPartitionsRDD[1] at textFile at <console>:21
scala> textFileRDD.dependencies.foreach { dep =>
| println("dependency type:" + dep.getClass)
| println("dependency RDD:" + dep.rdd)
| println("dependency partitions:" + dep.rdd.partitions)
| println("dependency partitions size:" + dep.rdd.partitions.length)
| }
dependency type:class org.apache.spark.OneToOneDependency
dependency RDD:/Users/zhuweibin/Downloads/hive_04053f79f32b414a9cf5ab0d4a3c9daf.txt HadoopRDD[0] at textFile at <console>:21
dependency partitions:[Lorg.apache.spark.Partition;@c197f46
dependency partitions size:2
scala>
scala> val flatMapRDD = textFileRDD.flatMap(_.split(" "))
flatMapRDD: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[2] at flatMap at <console>:23
scala> println( flatMapRDD )
MapPartitionsRDD[2] at flatMap at <console>:23
scala> flatMapRDD.dependencies.foreach { dep =>
| println("dependency type:" + dep.getClass)
| println("dependency RDD:" + dep.rdd)
| println("dependency partitions:" + dep.rdd.partitions)
| println("dependency partitions size:" + dep.rdd.partitions.length)
| }
dependency type:class org.apache.spark.OneToOneDependency
dependency RDD:MapPartitionsRDD[1] at textFile at <console>:21
dependency partitions:[Lorg.apache.spark.Partition;@c197f46
dependency partitions size:2
scala>
scala> val mapRDD = flatMapRDD.map(word => (word, 1))
mapRDD: org.apache.spark.rdd.RDD[(String, Int)] = MapPartitionsRDD[3] at map at <console>:25
scala> println( mapRDD )
MapPartitionsRDD[3] at map at <console>:25
scala> mapRDD.dependencies.foreach { dep =>
| println("dependency type:" + dep.getClass)
| println("dependency RDD:" + dep.rdd)
| println("dependency partitions:" + dep.rdd.partitions)
| println("dependency partitions size:" + dep.rdd.partitions.length)
| }
dependency type:class org.apache.spark.OneToOneDependency
dependency RDD:MapPartitionsRDD[2] at flatMap at <console>:23
dependency partitions:[Lorg.apache.spark.Partition;@c197f46
dependency partitions size:2
scala>
scala>
scala> val counts = mapRDD.reduceByKey(_ + _)
counts: org.apache.spark.rdd.RDD[(String, Int)] = ShuffledRDD[4] at reduceByKey at <console>:27
scala> println( counts )
ShuffledRDD[4] at reduceByKey at <console>:27
scala> counts.dependencies.foreach { dep =>
| println("dependency type:" + dep.getClass)
| println("dependency RDD:" + dep.rdd)
| println("dependency partitions:" + dep.rdd.partitions)
| println("dependency partitions size:" + dep.rdd.partitions.length)
| }
dependency type:class org.apache.spark.ShuffleDependency
dependency RDD:MapPartitionsRDD[3] at map at <console>:25
dependency partitions:[Lorg.apache.spark.Partition;@c197f46
dependency partitions size:2
scala>
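The session above shows that the first three RDDs have a OneToOneDependency on their parent, while the reduceByKey result has a ShuffleDependency. The following toy simulation uses plain Scala collections, not Spark, to show why. A dataset is modelled as a Vector of partitions. The names `Partitions`, `narrow` and `shuffleReduce` are hypothetical, and the two input partitions stand in for the two splits of the text file. The flatMap and map steps each build output partition i from input partition i alone. The reduce step must gather every occurrence of a key from all input partitions.

```scala
// Toy simulation (plain Scala, no Spark): a dataset is a Vector of partitions,
// and each partition is a Vector of records.
type Partitions[T] = Vector[Vector[T]]

// Narrow step: output partition i is built only from input partition i,
// the situation a OneToOneDependency describes.
def narrow[A, B](in: Partitions[A])(f: A => IterableOnce[B]): Partitions[B] =
  in.map(part => part.flatMap(f))

// Non-negative key hash modulo the number of output partitions.
def bucketOf(key: Any, numOut: Int): Int = {
  val raw = key.hashCode % numOut
  if (raw < 0) raw + numOut else raw
}

// Wide step: each input partition splits its pairs into `numOut` buckets by key
// ("map side"); output partition r then gathers bucket r from EVERY input
// partition ("reduce side"), which is what makes a shuffle necessary.
def shuffleReduce[K, V](in: Partitions[(K, V)], numOut: Int)(op: (V, V) => V): Partitions[(K, V)] = {
  val buckets: Vector[Vector[Vector[(K, V)]]] = in.map { part =>
    Vector.tabulate(numOut)(r => part.filter { case (k, _) => bucketOf(k, numOut) == r })
  }
  Vector.tabulate(numOut) { r =>
    buckets.flatMap(_(r)).groupMapReduce(_._1)(_._2)(op).toVector
  }
}

// Two input partitions, standing in for the two splits of the text file.
val lines: Partitions[String] = Vector(
  Vector("a b", "b c"),
  Vector("c a", "a")
)
val words  = narrow(lines)(line => line.split(" ").toSeq) // like flatMap
val pairs  = narrow(words)(word => Seq((word, 1)))         // like map
val counts = shuffleReduce(pairs, numOut = 2)(_ + _)       // like reduceByKey
println(counts)
```

The narrow steps never change the number of partitions or move records between them. That is why Spark can pipeline them. After the shuffle, each key ends up in exactly one output partition.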
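The Spark shell output shows that, for any RDD x, x.dependencies holds one Dependency object for each RDD that x depends on directly (one or more). How do these objects describe the relationships between RDDs? Let dependency be an element of dependencies:
- The class of dependency tells whether it is a narrow or a wide dependency. It is either a NarrowDependency subclass, such as the OneToOneDependency seen above, or a ShuffleDependency.
- A NarrowDependency provides the method
def getParents(partitionId: Int): Seq[Int]
which returns the indices of the parent RDD's partitions that a given child partition depends on. OneToOneDependency simply returns the same index. A ShuffleDependency has no such method, because after a shuffle a child partition may need data from every parent partition.
- dependency.rdd is the parent RDD that the child depends on. The child RDD's compute function describes how a child partition is computed from the parent's partitions.
- The parent RDD has its own dependencies with the same properties, and these identify the grandparent RDD. Combining dependency.getParents with the parent RDD's compute shows how to trace back from the parent to the grandparent. Repeating this step leads all the way back to the first RDD.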
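The recursive walk described in this list can be sketched with a toy model in plain Scala. `ToyRDD`, `Dep`, `OneToOne`, `Shuffle`, `sourcePartitions` and `lineage` are hypothetical stand-ins, not Spark classes. Only the shape of `getParents` mirrors NarrowDependency.getParents. The shuffled RDD is assumed to have 2 partitions, because the transcript does not print that number.

```scala
// Toy model (plain Scala, no Spark): these classes are hypothetical stand-ins.
sealed trait Dep { def parent: ToyRDD }

// Narrow, one-to-one: child partition i needs only parent partition i,
// the same contract as Spark's OneToOneDependency.getParents.
final case class OneToOne(parent: ToyRDD) extends Dep {
  def getParents(partitionId: Int): Seq[Int] = List(partitionId)
}

// Wide: no getParents; any child partition may need every parent partition.
final case class Shuffle(parent: ToyRDD) extends Dep

final case class ToyRDD(id: Int, name: String, numPartitions: Int, deps: Seq[Dep])

// Follow dependencies recursively until RDDs without parents (data sources)
// are reached; returns (rddId, partitionIndex) pairs of those source partitions.
def sourcePartitions(rdd: ToyRDD, p: Int): Set[(Int, Int)] =
  if (rdd.deps.isEmpty) Set((rdd.id, p))
  else rdd.deps.flatMap {
    case d: OneToOne => d.getParents(p).flatMap(pp => sourcePartitions(d.parent, pp))
    case d: Shuffle  => (0 until d.parent.numPartitions).flatMap(pp => sourcePartitions(d.parent, pp))
  }.toSet

// Lists the chain from child to first RDD, named the way println(rdd) shows them.
def lineage(rdd: ToyRDD): List[String] =
  s"${rdd.name}[${rdd.id}]" :: rdd.deps.toList.flatMap(d => lineage(d.parent))

val hadoop = ToyRDD(0, "HadoopRDD", 2, Nil)
val text   = ToyRDD(1, "MapPartitionsRDD", 2, Seq(OneToOne(hadoop)))
val flat   = ToyRDD(2, "MapPartitionsRDD", 2, Seq(OneToOne(text)))
val mapped = ToyRDD(3, "MapPartitionsRDD", 2, Seq(OneToOne(flat)))
val counts = ToyRDD(4, "ShuffledRDD", 2, Seq(Shuffle(mapped))) // 2 partitions assumed

println(lineage(counts).mkString(" <- "))
println(sourcePartitions(mapped, 1))
println(sourcePartitions(counts, 0))
```

In this model, partition 1 of the map result traces back to a single source partition. Partition 0 of the shuffled result traces back to both.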
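So if computing a partition of some RDD fails, how far back does the recomputation have to go? The RDDs printed with println in the Spark shell session above are: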
MapPartitionsRDD[1] at textFile at <console>:21
MapPartitionsRDD[2] at flatMap at <console>:23
MapPartitionsRDD[3] at map at <console>:25
ShuffledRDD[4] at reduceByKey at <console>:27
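Every RDD carries a numeric ID (the number in brackets). Each backtracking step yields one or more parent RDDs, and the system checks whether the needed data of that RDD still exists, i.e. whether it has been cached. If it has, backtracking stops there. Otherwise it continues upward until it reaches an RDD whose data is available, or the data source of the very first RDD. A checkpointed RDD is also a stopping point. A ShuffledRDD such as ShuffledRDD[4] normally reads the map output already written for its ShuffleDependency, and its parents are recomputed only if that shuffle output has been lost.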
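The stopping rule can be sketched as a toy model. `Node` and `recomputePlan` are hypothetical. The model covers only a linear chain of narrow dependencies, and "cached" is a plain boolean flag, as if flatMapRDD.cache() had been called and its blocks were still in memory. Spark's actual behaviour involves the scheduler and the block manager and also handles checkpoints and shuffle output, none of which is modelled here.

```scala
// Hypothetical toy node: at most one narrow parent, plus a flag saying
// whether the needed partition of this RDD is still cached.
final case class Node(id: Int, name: String, parent: Option[Node], cached: Boolean)

// Returns the recovery steps for a failed partition of `failed`, in execution
// order: first where the data is read from, then each RDD that is recomputed.
def recomputePlan(failed: Node): List[String] = {
  @annotation.tailrec
  def loop(current: Option[Node], acc: List[String]): List[String] = current match {
    case Some(n) if n.cached         => s"read ${n.name}[${n.id}] from cache" :: acc
    case Some(n) if n.parent.isEmpty => s"read ${n.name}[${n.id}] from its data source" :: acc
    case Some(n)                     => loop(n.parent, s"recompute ${n.name}[${n.id}]" :: acc)
    case None                        => acc
  }
  loop(failed.parent, List(s"recompute ${failed.name}[${failed.id}]"))
}

val hadoop = Node(0, "HadoopRDD", None, cached = false)
val text   = Node(1, "MapPartitionsRDD", Some(hadoop), cached = false)
val flat   = Node(2, "MapPartitionsRDD", Some(text), cached = true) // assume flatMapRDD.cache()
val mapped = Node(3, "MapPartitionsRDD", Some(flat), cached = false)

recomputePlan(mapped).foreach(println)
```

With MapPartitionsRDD[2] cached, only MapPartitionsRDD[3] is recomputed. Clearing the flag on MapPartitionsRDD[2] pushes the walk all the way back to the HadoopRDD.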