MapReduce之自定义InputFormat
时间:2022-07-22
本文章向大家介绍MapReduce之自定义InputFormat,主要内容包括其使用实例、应用技巧、基本知识点总结和需要注意事项,具有一定的参考价值,需要的朋友可以参考一下。
在企业开发中,Hadoop框架自带的
InputFormat
类型不能满足所有应用场景,需要自定义InputFormat来解决实际问题。 自定义InputFormat步骤如下:
- (1)自定义一个类继承
FilelnputFormat
。 - (2)自定义一个类继承
RecordReader
,实现一次读取一个完整文件,将文件名为key,文件内容为value。 - (3)在输出时使用
SequenceFileOutPutFormat
输出合并文件。
无论HDFS还是MapReduce,在处理小文件时效率都非常低,但又难免面临处理大量小文件的场景,此时,就需要有相应解决方案。可以自定义InputFormat实现小文件的合并。
1. 需求
将多个小文件合并成一个SequenceFile
文件(SequenceFile文件是Hadoop用来存储二进制形式的key-value(bytes) 对的文件格式),SequenceFile里面存储着多个文件,存储的形式为文件路径+名称为key,文件内容为value。
(1)输入数据
(2)期望输出文件格式
2. 需求分析
- 自定义一个类继承
FileInputFormat
(1)重写isSplitable()
方法,返回false
,让文件不可切,整个文件作为1片。 (2)重写createRecordReader(),返回自定义的RecordReader对象 - 自定义一个类继承
RecordReader
在RecordReader中,nextKeyValue
()是最重要的方法,返回当前读取到的key-value
,如果读到返回true
,调用Mapper的map()来处理,否则返回false
3. 编写程序
MyInputFormat.java
/*
* 1. 改变切片策略,一个文件固定切1片,通过指定文件不可切
*
* 2. 提供RR ,这个RR读取切片的文件名作为key,读取切片的内容封装到bytes作为value
*/
public class MyInputFormat extends FileInputFormat {
@Override
public RecordReader createRecordReader(InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException {
return new MyRecordReader();
}
@Override
protected boolean isSplitable(JobContext context, Path filename) {
return false;
}
}
MyRecordReader.java
/*
* RecordReader从MapTask处理的当前切片中读取数据
*
* XXXContext都是Job的上下文,通过XXXContext可以获取Job的配置Configuration对象
*/
public class MyRecordReader extends RecordReader {
private Text key;
private BytesWritable value;
private String filename;
private int length;
private FileSystem fs;
private Path path;
private FSDataInputStream is;
private boolean flag=true;
// MyRecordReader在创建后,在进入Mapper的run()之前,自动调用
// 文件的所有内容设置为1个切片,切片的长度等于文件的长度
@Override
public void initialize(InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException {
FileSplit fileSplit=(FileSplit) split;
filename=fileSplit.getPath().getName();
length=(int) fileSplit.getLength();
path=fileSplit.getPath();
//获取当前Job的配置对象
Configuration conf = context.getConfiguration();
//获取当前Job使用的文件系统
fs=FileSystem.get(conf);
is = fs.open(path);
}
// 读取一组输入的key-value,读到返回true,否则返回false
// 将文件的名称封装为key,将文件的内容封装为BytesWritable类型的value,返回true
// 第二次调用nextKeyValue()返回false
@Override
public boolean nextKeyValue() throws IOException, InterruptedException {
if (flag) {
//实例化对象
if (key==null) {
key=new Text();
}
if (value==null) {
value=new BytesWritable();
}
//赋值
//将文件名封装到key中
key.set(filename);
// 将文件的内容读取到BytesWritable中
byte [] content=new byte[length];
IOUtils.readFully(is, content, 0, length);
value.set(content, 0, length);
flag=false;
return true;
}
return false;
}
//返回当前读取到的key-value中的key
@Override
public Object getCurrentKey() throws IOException, InterruptedException {
return key;
}
//返回当前读取到的key-value中的value
@Override
public Object getCurrentValue() throws IOException, InterruptedException {
return value;
}
//返回读取切片的进度
@Override
public float getProgress() throws IOException, InterruptedException {
return 0;
}
// 在Mapper的输入关闭时调用,清理工作
@Override
public void close() throws IOException {
if (is != null) {
IOUtils.closeStream(is);
}
if (fs !=null) {
fs.close();
}
}
}
CustomIFMapper.java
public class CustomIFMapper extends Mapper<Text, BytesWritable, Text, BytesWritable>{
}
CustomIFReducer.java
public class CustomIFReducer extends Reducer<Text, BytesWritable, Text, BytesWritable>{
}
CustomIFDriver.java
public class CustomIFDriver {
public static void main(String[] args) throws Exception {
Path inputPath=new Path("e:/mrinput/custom");
Path outputPath=new Path("e:/mroutput/custom");
//作为整个Job的配置
Configuration conf = new Configuration();
//保证输出目录不存在
FileSystem fs=FileSystem.get(conf);
if (fs.exists(outputPath)) {
fs.delete(outputPath, true);
}
// 创建Job
Job job = Job.getInstance(conf);
// 设置Job运行的Mapper,Reducer类型,Mapper,Reducer输出的key-value类型
job.setMapperClass(CustomIFMapper.class);
job.setReducerClass(CustomIFReducer.class);
// Job需要根据Mapper和Reducer输出的Key-value类型准备序列化器,通过序列化器对输出的key-value进行序列化和反序列化
// 如果Mapper和Reducer输出的Key-value类型一致,直接设置Job最终的输出类型
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(BytesWritable.class);
// 设置输入目录和输出目录
FileInputFormat.setInputPaths(job, inputPath);
FileOutputFormat.setOutputPath(job, outputPath);
// 设置输入和输出格式
job.setInputFormatClass(MyInputFormat.class);
job.setOutputFormatClass(SequenceFileOutputFormat.class);
// ③运行Job
job.waitForCompletion(true);
}
}
- 数据城堡参赛代码实战篇(五)---使用sklearn解决分类问题
- 洛谷P1894 [USACO4.2]完美的牛栏The Perfect Stall
- [编程经验]Python生成器、迭代器与yield语句小结
- TensorFlow从0到1 - 12 - TensorFlow构建3层NN玩转MNIST
- 数据城堡参赛代码实战篇(四)---使用pandas合并数据表
- HDU 2586 How far away ?
- HDU 3078 Network
- 数据城堡参赛代码实战篇(三)---我们来探究一个深奥的问题!
- 数据城堡参赛代码实战篇(二)---使用pandas进行数据去重
- 洛谷P3375 【模板】KMP字符串匹配
- Day5下午解题报告1
- [编程经验] Python中处理时间的方法小结
- 数据城堡参赛代码实战篇(一)---手把手教你使用pandas
- [编程经验] SciPy之图像处理小结
- JavaScript 教程
- JavaScript 编辑工具
- JavaScript 与HTML
- JavaScript 与Java
- JavaScript 数据结构
- JavaScript 基本数据类型
- JavaScript 特殊数据类型
- JavaScript 运算符
- JavaScript typeof 运算符
- JavaScript 表达式
- JavaScript 类型转换
- JavaScript 基本语法
- JavaScript 注释
- Javascript 基本处理流程
- Javascript 选择结构
- Javascript if 语句
- Javascript if 语句的嵌套
- Javascript switch 语句
- Javascript 循环结构
- Javascript 循环结构实例
- Javascript 跳转语句
- Javascript 控制语句总结
- Javascript 函数介绍
- Javascript 函数的定义
- Javascript 函数调用
- Javascript 几种特殊的函数
- JavaScript 内置函数简介
- Javascript eval() 函数
- Javascript isFinite() 函数
- Javascript isNaN() 函数
- parseInt() 与 parseFloat()
- escape() 与 unescape()
- Javascript 字符串介绍
- Javascript length属性
- javascript 字符串函数
- Javascript 日期对象简介
- Javascript 日期对象用途
- Date 对象属性和方法
- Javascript 数组是什么
- Javascript 创建数组
- Javascript 数组赋值与取值
- Javascript 数组属性和方法
- FreeRTOS内核应用开发手记
- 移植FreeRTOS后运行,卡在uxDeletedTasksWaitingCleanUp
- 99%的人都不知道内网、外网、宽带、带宽、流量、网速之间的区别与联系
- 什么是前缀树--打开了我的新思路
- 一个案例搞懂原码、反码、补码,不懂得请看过来
- 人人都在用,但你却不知道它背后发生了什么——浏览器的工作原理:浏览器幕后揭秘
- 这有一把钥匙,打开MySQL死锁问题!
- 普通人如何全面了解大数据的特点,意义和发展前景
- 面试官问我Linux下常见网络命令
- 最全常用User-Agent
- 聊到JVM(还怕面试官问JVM吗?)
- Android.location.Address类方法获取GPS定位信息
- Python二叉树详解笔记
- 《剑指offer》第七天:二叉树的下一个结点
- 后台登录微信并定时发送消息,消息包括农历、阴历、天气;自动监测是否断线,支持邮箱发送二维码登录;适合于挂在服务器上运行