MapReduce之WritableComparable排序
时间:2022-07-22
本文章向大家介绍MapReduce之WritableComparable排序,主要内容包括其使用实例、应用技巧、基本知识点总结和需要注意事项,具有一定的参考价值,需要的朋友可以参考一下。
@
目录
- 排序概述
- 获取Mapper输出的key的比较器(源码)
- 案例实操(区内排序)
- 自定义排序器,使用降序
- Key实现Comparable进行比较
排序概述
- 排序是MapReduce框架中最重要的操作之一。
- Map Task和ReduceTask均会默认对数据按照key进行排序。该操作属于Hadoop的默认行为。任何应用程序中的数据均会被排序,而不管逻辑上是否需要。
- 黑默认排序是按照字典顺序排序,且实现该排序的方法是快速排序。
- 对于
MapTask
,它会将处理的结果暂时放到一个缓冲区中,当缓冲区使用率达到一定阈值后,再对缓冲区中的数据进行一次排序,并将这些有序数据写到磁盘上,而当数据处理完毕后,它会对磁盘上所有文件进行一次合并,以将这些文件合并成一个大的有序文件。 - 对于
ReduceTask
,它从每个MapTak上远程拷贝相应的数据文件,如果文件大小超过一定阑值,则放到磁盘上,否则放到内存中。如果磁盘上文件数目达到一定阈值,则进行一次合并以生成一个更大文件;如果内存中文件大小或者数目超过一定阈值,则进行一次合并后将数据写到磁盘上。当所有数据拷贝完毕后,ReduceTask统一对内存和磁盘上的所有数据进行一次归并排序。 - 排序器:排序器影响的是排序的速度(效率,对什么排序?),QuickSorter
- 比较器:比较器影响的是排序的结果(按照什么规则排序)
获取Mapper输出的key的比较器(源码)
public RawComparator getOutputKeyComparator() {
// 从配置中获取mapreduce.job.output.key.comparator.class的值,必须是RawComparator类型,如果没有配置,默认为null
Class<? extends RawComparator> theClass = getClass(JobContext.KEY_COMPARATOR, null, RawComparator.class);
// 一旦用户配置了此参数,实例化一个用户自定义的比较器实例
if (theClass != null){
return ReflectionUtils.newInstance(theClass, this);
}
//用户没有配置,判断Mapper输出的key的类型是否是WritableComparable的子类,如果不是,就抛异常,如果是,系统会自动为我们提供一个key的比较器
return WritableComparator.get(getMapOutputKeyClass().asSubclass(WritableComparable.class), this);
}
案例实操(区内排序)
需求 对每个手机号按照上行流量和下行流量的总和进行内部排序。
思考
因为Map Task和ReduceTask均会默认对数据按照key进行排序,所以需要把流量总和设置为Key
,手机号等其他内容设置为value
FlowBeanMapper.java
public class FlowBeanMapper extends Mapper<LongWritable, Text, LongWritable, Text>{
private LongWritable out_key=new LongWritable();
private Text out_value=new Text();
@Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
String[] words = value.toString().split("t");
//封装总流量为key
out_key.set(Long.parseLong(words[3]));//切分后,流量和的下标为3
//封装其他内容为value
out_value.set(words[0]+"t"+words[1]+"t"+words[2]);
context.write(out_key, out_value);
}
}
FlowBeanReducer.java
public class FlowBeanReducer extends Reducer<LongWritable, Text, Text, LongWritable>{
@Override
protected void reduce(LongWritable key, Iterable<Text> values,
Reducer<LongWritable, Text, Text, LongWritable>.Context context) throws IOException, InterruptedException {
for (Text value : values) {
context.write(value, key);
}
}
}
FlowBeanDriver.java
public class FlowBeanDriver {
public static void main(String[] args) throws Exception {
Path inputPath=new Path("E:\mroutput\flowbean");
Path outputPath=new Path("e:/mroutput/flowbeanSort1");
//作为整个Job的配置
Configuration conf = new Configuration();
//保证输出目录不存在
FileSystem fs=FileSystem.get(conf);
if (fs.exists(outputPath)) {
fs.delete(outputPath, true);
}
// ①创建Job
Job job = Job.getInstance(conf);
// ②设置Job
// 设置Job运行的Mapper,Reducer类型,Mapper,Reducer输出的key-value类型
job.setMapperClass(FlowBeanMapper.class);
job.setReducerClass(FlowBeanReducer.class);
// Job需要根据Mapper和Reducer输出的Key-value类型准备序列化器,通过序列化器对输出的key-value进行序列化和反序列化
// 如果Mapper和Reducer输出的Key-value类型一致,直接设置Job最终的输出类型
//由于Mapper和Reducer输出的Key-value类型不一致(maper输出类型是long-text,而reducer是text-value)
//所以需要额外设定
job.setMapOutputKeyClass(LongWritable.class);
job.setMapOutputValueClass(Text.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(LongWritable.class);
// 设置输入目录和输出目录
FileInputFormat.setInputPaths(job, inputPath);
FileOutputFormat.setOutputPath(job, outputPath);
// 默认升序排,可以设置使用自定义的比较器
//job.setSortComparatorClass(DecreasingComparator.class);
// ③运行Job
job.waitForCompletion(true);
}
}
运行结果(默认升序排)
自定义排序器,使用降序
- 方法一:自定义类,这个类必须是
RawComparator
类型,通过设置mapreduce.job.output.key.comparator.class
自定义的类的类型。 自定义类时,可以继承WriableComparator
类,也可以实现RawCompartor
调用方法时,先调用RawCompartor. compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2)
,再调用RawCompartor.compare()
- 方法二:定义Mapper输出的key,让key实现
WritableComparable
,实现CompareTo()
MyDescComparator.java
public class MyDescComparator extends WritableComparator{
@Override
public int compare(byte[] b1, int s1, int l1,byte[] b2, int s2, int l2) {
long thisValue = readLong(b1, s1);
long thatValue = readLong(b2, s2);
//这里把第一个-1改成1,把第二个1改成-1,就是降序排
return (thisValue<thatValue ? 1 : (thisValue==thatValue ? 0 : -1));
}
}
运行结果
Key实现Comparable进行比较
思路二:把map输出时的key封装为一个bean
,这个key包含上行流量、下行流量、总流量,value
只有手机号
FlowBean.java
public class FlowBean implements WritableComparable<FlowBean>{
private long upFlow;
private long downFlow;
private Long sumFlow;
public FlowBean() {
}
public long getUpFlow() {
return upFlow;
}
public void setUpFlow(long upFlow) {
this.upFlow = upFlow;
}
public long getDownFlow() {
return downFlow;
}
public void setDownFlow(long downFlow) {
this.downFlow = downFlow;
}
public long getSumFlow() {
return sumFlow;
}
public void setSumFlow(long sumFlow) {
this.sumFlow = sumFlow;
}
// 序列化 在写出属性时,如果为引用数据类型,属性不能为null
@Override
public void write(DataOutput out) throws IOException {
out.writeLong(upFlow);
out.writeLong(downFlow);
out.writeLong(sumFlow);
}
//反序列化 序列化和反序列化的顺序要一致
@Override
public void readFields(DataInput in) throws IOException {
upFlow=in.readLong();
downFlow=in.readLong();
sumFlow=in.readLong();
}
@Override
public String toString() {
return upFlow + "t" + downFlow + "t" + sumFlow;
}
// 系统封装的比较器在对比key时,调用key的compareTo进行比较
// 降序比较总流量
@Override
public int compareTo(FlowBean o) {
return -this.sumFlow.compareTo(o.getSumFlow());
}
}
FlowBeanMapper.java
public class FlowBeanMapper extends Mapper<LongWritable, Text, FlowBean, Text>{
private FlowBean out_key=new FlowBean();
private Text out_value=new Text();
@Override
protected void map(LongWritable key, Text value, Context context)
throws IOException, InterruptedException {
String[] words = value.toString().split("t");
//封装总流量为key
out_key.setUpFlow(Long.parseLong(words[1]));
out_key.setDownFlow(Long.parseLong(words[2]));
out_key.setSumFlow(Long.parseLong(words[3]));
out_value.set(words[0]);
context.write(out_key, out_value);
}
}
FlowBeanReducer.java
public class FlowBeanReducer extends Reducer<FlowBean, Text, Text, FlowBean>{
@Override
protected void reduce(FlowBean key, Iterable<Text> values,
Reducer<FlowBean, Text, Text, FlowBean>.Context context) throws IOException, InterruptedException {
for (Text value : values) {
context.write(value, key);
}
}
}
FlowBeanDriver.java
public class FlowBeanDriver {
public static void main(String[] args) throws Exception {
Path inputPath=new Path("E:\mroutput\flowbean");
Path outputPath=new Path("e:/mroutput/flowbeanSort2");
//作为整个Job的配置
Configuration conf = new Configuration();
//保证输出目录不存在
FileSystem fs=FileSystem.get(conf);
if (fs.exists(outputPath)) {
fs.delete(outputPath, true);
}
// ①创建Job
Job job = Job.getInstance(conf);
// ②设置Job
// 设置Job运行的Mapper,Reducer类型,Mapper,Reducer输出的key-value类型
job.setMapperClass(FlowBeanMapper.class);
job.setReducerClass(FlowBeanReducer.class);
// Job需要根据Mapper和Reducer输出的Key-value类型准备序列化器,通过序列化器对输出的key-value进行序列化和反序列化
// 如果Mapper和Reducer输出的Key-value类型一致,直接设置Job最终的输出类型
job.setMapOutputKeyClass(FlowBean.class);
job.setMapOutputValueClass(Text.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(FlowBean.class);
// 设置输入目录和输出目录
FileInputFormat.setInputPaths(job, inputPath);
FileOutputFormat.setOutputPath(job, outputPath);
// ③运行Job
job.waitForCompletion(true);
}
}
- kotlin到底好在哪里?
- Django 1.10中文文档-第一个应用Part6-静态文件
- Django 1.10中文文档-第一个应用Part5-测试
- 设计模式(5)-己所不欲,施之于人(代理模式)
- Python标准库笔记(4) — collections模块
- 使用captcha模块生成图形验证码
- 设计模式(6)-装饰器(认识程序中的装饰器)
- Selenium Webdriver常用方法
- 设计模式(7)-模板(从事务处理应用的模板)
- Python NLP入门教程
- 设计模式(8)-状态模式(关注状态之间的变化)
- Python标准库笔记(6) — struct模块
- Golang中image/jpeg包和image/png包用法
- Python Webdriver 重新使用已经打开的浏览器实例
- JavaScript 教程
- JavaScript 编辑工具
- JavaScript 与HTML
- JavaScript 与Java
- JavaScript 数据结构
- JavaScript 基本数据类型
- JavaScript 特殊数据类型
- JavaScript 运算符
- JavaScript typeof 运算符
- JavaScript 表达式
- JavaScript 类型转换
- JavaScript 基本语法
- JavaScript 注释
- Javascript 基本处理流程
- Javascript 选择结构
- Javascript if 语句
- Javascript if 语句的嵌套
- Javascript switch 语句
- Javascript 循环结构
- Javascript 循环结构实例
- Javascript 跳转语句
- Javascript 控制语句总结
- Javascript 函数介绍
- Javascript 函数的定义
- Javascript 函数调用
- Javascript 几种特殊的函数
- JavaScript 内置函数简介
- Javascript eval() 函数
- Javascript isFinite() 函数
- Javascript isNaN() 函数
- parseInt() 与 parseFloat()
- escape() 与 unescape()
- Javascript 字符串介绍
- Javascript length属性
- javascript 字符串函数
- Javascript 日期对象简介
- Javascript 日期对象用途
- Date 对象属性和方法
- Javascript 数组是什么
- Javascript 创建数组
- Javascript 数组赋值与取值
- Javascript 数组属性和方法
- Kafka 独立消费者
- 【iOS】记录iOS14以及xcode12 遇到的问题
- 如果你还在犹豫要不要入行,请先看看我的IT入坑记【技术创作101训练营】
- Session、Cookie、Token 【浅谈三者之间的那点事】
- Python 微信机器人-20行代码实现斗图功能,简单易懂,全是干货!斗图啦API调用方法
- 浅谈布隆过滤器
- Python 技术篇-获取图片GPS信息,锁定追踪图片拍摄地点、拍摄时间
- 测试工具 - Postman接口测试入门使用手册,Postman如何进行数据关联、自动更新cookies、简单编程
- 白盒测试工具 - sonar的安装、配置与使用入门手册,用sonar检查代码质量实战演示
- Chmod -R 777 / 误操作恢复教程
- 最全总结 | 聊聊 Python 数据处理全家桶(Redis篇)
- 虚拟机安装mikrotik-ROS
- 搬砖武士|手把手教你在容器服务 TKE 上使用 LB直通 Pod
- linux安装snmp服务-ubuntu
- 企业微信机器人