必读|spark的重分区及排序
时间:2022-06-20
本文章向大家介绍必读|spark的重分区及排序,主要内容包括其使用实例、应用技巧、基本知识点总结和需要注意事项,具有一定的参考价值,需要的朋友可以参考一下。
前几天,有人在星球里,问了一个有趣的算子,也即是RepartitionAndSortWithinPartitions。当时浪尖也在星球里讲了一下,整个关于分区排序的内容。今天,在这里给大家分享一下。
昨天说了,mapPartitions 的使用技巧。大家应该都知道mapPartitions值针对整个分区执行map操作。而且对于PairRDD的分区默认是基于hdfs的物理块,当然不可分割的话就是hdfs的文件个数。但是我们也可以给partitionBy 算子传入HashPartitioner,来给RDD进行重新分区,而且会使得key的hashcode相同的数据落到同一个分区。
spark 1.2之后引入了一个高质量的算子repartitionAndSortWithinPartitions 。该算子为spark的Shuffle增加了sort。假如,后面再跟mapPartitions算子的话,其算子就是针对已经按照key排序的分区,这就有点像mr的意思了。与groupbykey不同的是,数据不会一次装入内存,而是使用迭代器一次一条记录从磁盘加载。这种方式最小化了内存压力。
repartitionAndSortWithinPartitions 也可以用于二次排序。
下面举个简单的例子。
import org.apache.spark.Partitioner
class KeyBasePartitioner(partitions: Int) extends Partitioner {
override def numPartitions: Int = partitions
override def getPartition(key: Any): Int = {
val k = key.asInstanceOf[Int]
Math.abs(k.hashCode() % numPartitions)
}
}
import org.apache.spark.SparkContext._
sc.textFile("file:///opt/hadoop/spark-2.3.1/README.md").flatMap(_.split("\s+")).map((_,1)).reduceByKey(_+_).map(each=>(each._2,each._1))
implicit val caseInsensitiveOrdering = new Ordering[Int] {
override def compare(a: Int, b: Int) = b.compareTo(a)
}
// Sort by key, using
res7.repartitionAndSortWithinPartitions(new KeyBasePartitioner(3)).saveAsTextFile("file:///opt/output/")
结果,可以看到每个分区都是有效的。
mdhdeMacBook-Pro-3:output mdh$ pwd
/opt/output
mdhdeMacBook-Pro-3:output mdh$ ls
_SUCCESS part-00000 part-00001 part-00002
mdhdeMacBook-Pro-3:output mdh$ head -n 10 part-00000
(24,the)
(12,for)
(9,##)
(9,and)
(6,is)
(6,in)
(3,general)
(3,documentation)
(3,example)
(3,how)
mdhdeMacBook-Pro-3:output mdh$ head -n 10 part-00001
(16,Spark)
(7,can)
(7,run)
(7,on)
(4,build)
(4,Please)
(4,with)
(4,also)
(4,if)
(4,including)
mdhdeMacBook-Pro-3:output mdh$ head -n 10 part-00002
(47,)
(17,to)
(8,a)
(5,using)
(5,of)
(2,Python)
(2,locally)
(2,This)
(2,Hive)
(2,SparkPi)
mdhdeMacBook-Pro-3:output mdh$
- JavaScript 教程
- JavaScript 编辑工具
- JavaScript 与HTML
- JavaScript 与Java
- JavaScript 数据结构
- JavaScript 基本数据类型
- JavaScript 特殊数据类型
- JavaScript 运算符
- JavaScript typeof 运算符
- JavaScript 表达式
- JavaScript 类型转换
- JavaScript 基本语法
- JavaScript 注释
- Javascript 基本处理流程
- Javascript 选择结构
- Javascript if 语句
- Javascript if 语句的嵌套
- Javascript switch 语句
- Javascript 循环结构
- Javascript 循环结构实例
- Javascript 跳转语句
- Javascript 控制语句总结
- Javascript 函数介绍
- Javascript 函数的定义
- Javascript 函数调用
- Javascript 几种特殊的函数
- JavaScript 内置函数简介
- Javascript eval() 函数
- Javascript isFinite() 函数
- Javascript isNaN() 函数
- parseInt() 与 parseFloat()
- escape() 与 unescape()
- Javascript 字符串介绍
- Javascript length属性
- javascript 字符串函数
- Javascript 日期对象简介
- Javascript 日期对象用途
- Date 对象属性和方法
- Javascript 数组是什么
- Javascript 创建数组
- Javascript 数组赋值与取值
- Javascript 数组属性和方法
- 【Code】GraphSAGE 源码解析
- Kafka常见的导致重复消费原因和解决方案
- 近30个MySQL常用函数,必须推荐!
- 搞定 CompletableFuture,并发异步编程和编写串行程序还有什么区别?你们要的多图长文
- 用注解实现 MyBatis 开发
- MyBatis 实现数据的增删改查
- 0790-5.16.2-NameNode服务的edits不同步异常
- 0789-不停止MySQL服务重做备库的方法
- 防盗链Apache和Nginx配置对比
- Python 类特殊方法__getitem__
- 前端自动化测试探索和实践
- 我的开发日记(十四)
- 测试梗--欢迎补充
- Vue 3 正式进入 RC 阶段!
- 广义线性模型应用举例之泊松回归及R计算