自定义InputFormat案例实操
时间:2019-10-09
本文章向大家介绍自定义InputFormat案例实操,主要包括自定义InputFormat案例实操使用实例、应用技巧、基本知识点总结和需要注意事项,具有一定的参考价值,需要的朋友可以参考一下。
引言:
无论HDFS还是MapReduce,在处理小文件时效率都非常低,但又难免面临处理大量小文件的场景,此时,就需要有相应解决方案。可以自定义InputFormat实现小文件的合并。(对外是一个整文件,对内仍是原先的小文件,节省MapTask)
需求如下:
将多个小文件合并成一个SequenceFile文件(SequenceFile文件是Hadoop用来存储二进制形式的key-value对的文件格式),SequenceFile里面存储着多个文件,存储的形式为文件路径+名称为key,文件内容为value。
(1)输入数据
(2)期望输出文件格式
步骤:
程序实现
(1)自定义InputFromat
package cn.mark.mrInputFormat; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.BytesWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.InputSplit; import org.apache.hadoop.mapreduce.JobContext; import org.apache.hadoop.mapreduce.RecordReader; import org.apache.hadoop.mapreduce.TaskAttemptContext; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import java.io.IOException; //存储的形式为文件路径+名称为key,文件内容为value。读全部文件用到流,byte //故 输入Key类型为Text,输入Value类型为BytesWritable public class WholeFileInputformat extends FileInputFormat<Text, BytesWritable> { // 定义类继承FileInputFormat @Override protected boolean isSplitable(JobContext context, Path filename) { return false;//单个文件不允许再切片 } @Override public RecordReader<Text, BytesWritable> createRecordReader(InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException { WholeRecordReader recordReader = new WholeRecordReader(); recordReader.initialize(split,context); return recordReader; } }
(2)自定义RecordReader类(核心)
package cn.mark.mrInputFormat; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.BytesWritable; import org.apache.hadoop.io.IOUtils; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.InputSplit; import org.apache.hadoop.mapreduce.RecordReader; import org.apache.hadoop.mapreduce.TaskAttemptContext; import org.apache.hadoop.mapreduce.lib.input.FileSplit; import java.io.IOException; //RecordReader<Text, BytesWritable> 固有的输入KV格式 public class WholeRecordReader extends RecordReader<Text, BytesWritable> { // 主要针对缺什么补什么 FileSplit split; Configuration configuration; Text k = new Text(); BytesWritable v = new BytesWritable(); // 标记位 boolean isProgress = true; //************************************ @Override public void initialize(InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException { // 初始化 this.split = (FileSplit) split; configuration = context.getConfiguration(); } @Override public boolean nextKeyValue() throws IOException, InterruptedException { // 核心业务逻辑处理 对key和value进行封装 if (isProgress){ /** The number of bytes in the file to process. 获取文件字节的全部数量 public long getLength() { return fs.getLength(); } */ byte[] buf = new byte[(int) split.getLength()]; // 1.获取fs对象 Path path = split.getPath(); FileSystem fs = path.getFileSystem(configuration); // 2.获取输入流 FSDataInputStream fis = fs.open(path); // 3.拷贝 // readFully(InputStream in, byte buf[], int off, int len) // 4各参数: 1.要读的流 2.目的地 3.读的大小的起始位置 4.读的长度 /** * Reads len bytes in a loop. * @param in InputStream to read from * @param buf The buffer to fill * @param off offset from the buffer :缓冲区的偏移量,即开始位置 * @param len the length of bytes to read 先开辟一段相应长度的字节缓冲区,再读内容进去 */ IOUtils.readFully(fis,buf,0,buf.length); // 4.封装v v是文件的内容 /**Set the value to a copy of the given byte range * @param newData the new values to copy in * @param offset the offset in newData to start at * @param length the number of bytes to copy public void set(byte[] newData, int offset, int length) 再将之前的缓冲区内容通过set方法,设置成v的值 */ v.set(buf,0,buf.length); // 5.封装K ,k本身就是路径名 path.toString():既有路径又有文件名称 k.set(path.toString()); // 6.关闭资源 IOUtils.closeStream(fis); // 能进来说明能读到数据,而且每次调用nextKeyValue函数时候是说明已经新读一个文件, // 本WholeRecordReader类会重新创建对象,重新初始化,isProgress都会重新设为true isProgress = false;//说明本文件已经读完! return true; //只有return true才会执行下面的函数 /** public void run(Context context) throws IOException, InterruptedException { setup(context); try { while (context.nextKeyValue()) { //<往常只读一行,有数据则true> map(context.getCurrentKey(), context.getCurrentValue(), context); } } finally { cleanup(context); } } 下如果只写ture 则会无限循环,如果只写false则会不进循环,不进行读写操作, 故需要一个标记*/ } return false; } @Override public Text getCurrentKey() throws IOException, InterruptedException { return k; } @Override public BytesWritable getCurrentValue() throws IOException, InterruptedException { return v; } //*********************************** @Override public float getProgress() throws IOException, InterruptedException { return 0; } @Override public void close() throws IOException { } }
(3)编写SequenceFileMapper类处理流程
package cn.mark.mrInputFormat; import org.apache.hadoop.io.BytesWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Mapper; import java.io.IOException; public class SequenceFileMapper extends Mapper<Text, BytesWritable,Text,BytesWritable> { @Override protected void map(Text key, BytesWritable value, Context context) throws IOException, InterruptedException { // 不是一次读取一行,是一次读取一整个文件 context.write(key,value); } }
(4)编写SequenceFileReducer类处理流程
package cn.mark.mrInputFormat; import org.apache.hadoop.io.BytesWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Reducer; import java.io.IOException; // 要知道传过来的是什么数据及其类型 // 传过来a.txt , b.txt 输出 : <路径名文件名,文件内容> public class SequenceFileReducer extends Reducer<Text, BytesWritable,Text, BytesWritable> { @Override protected void reduce(Text key, Iterable<BytesWritable> values, Context context) throws IOException, InterruptedException { // 循环写出 每次都是一个文件的全部内容 for (BytesWritable value : values) { context.write(key,value); } } }
(5)编写SequenceFileDriver类处理流程
package cn.mark.mrInputFormat; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.BytesWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat; import java.io.IOException; public class SequenceFileDriver { public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException { // 输入输出路径需要根据自己电脑上实际的输入输出路径设置 args = new String[] { "C:\\Users\\Administrator\\Downloads\\input\\123", "C:\\Users\\Administrator\\Downloads\\input\\output" }; // 1 获取job对象 Configuration conf = new Configuration(); Job job = Job.getInstance(conf); // 2 设置jar包存储位置、关联自定义的mapper和reducer job.setJarByClass(SequenceFileDriver.class); job.setMapperClass(SequenceFileMapper.class); job.setReducerClass(SequenceFileReducer.class); // 7设置输入的inputFormat job.setInputFormatClass(WholeFileInputformat.class); // 8设置输出的outputFormat 默认是Text.class job.setOutputFormatClass(SequenceFileOutputFormat.class); // 3 设置map输出端的kv类型 job.setMapOutputKeyClass(Text.class); job.setMapOutputValueClass(BytesWritable.class); // 4 设置最终输出端的kv类型 job.setOutputKeyClass(Text.class); job.setOutputValueClass(BytesWritable.class); // 5 设置输入输出路径 FileInputFormat.setInputPaths(job, new Path(args[0])); FileOutputFormat.setOutputPath(job, new Path(args[1])); // 6 提交job job.waitForCompletion(true); } }
ok!
原文地址:https://www.cnblogs.com/Mark-blog/p/11644209.html
- python selenium2示例 - email发送
- pyhton-----break语句
- python unittest使用基本过程
- 基于unittest集成你的selenium2测试
- Selenium Webdriver Desired Capabilities
- 在Selenium Webdriver中使用XPath Contains、Sibling函数定位
- Python多线程Selenium跨浏览器测试
- Python Selenium设计模式-POM
- 基于Excel参数化你的Selenium2测试
- 创建你的第一个webdriver python代码
- Python Selenium Webdriver安装手册
- 工具篇 - HTTP协议报文结构及示例03
- 工具篇 - JMeter目录及关键配置分析02
- python unittest之加载及跳过测试方法和示例
- JavaScript 教程
- JavaScript 编辑工具
- JavaScript 与HTML
- JavaScript 与Java
- JavaScript 数据结构
- JavaScript 基本数据类型
- JavaScript 特殊数据类型
- JavaScript 运算符
- JavaScript typeof 运算符
- JavaScript 表达式
- JavaScript 类型转换
- JavaScript 基本语法
- JavaScript 注释
- Javascript 基本处理流程
- Javascript 选择结构
- Javascript if 语句
- Javascript if 语句的嵌套
- Javascript switch 语句
- Javascript 循环结构
- Javascript 循环结构实例
- Javascript 跳转语句
- Javascript 控制语句总结
- Javascript 函数介绍
- Javascript 函数的定义
- Javascript 函数调用
- Javascript 几种特殊的函数
- JavaScript 内置函数简介
- Javascript eval() 函数
- Javascript isFinite() 函数
- Javascript isNaN() 函数
- parseInt() 与 parseFloat()
- escape() 与 unescape()
- Javascript 字符串介绍
- Javascript length属性
- javascript 字符串函数
- Javascript 日期对象简介
- Javascript 日期对象用途
- Date 对象属性和方法
- Javascript 数组是什么
- Javascript 创建数组
- Javascript 数组赋值与取值
- Javascript 数组属性和方法
- Xcode清理模拟器文件
- 【Android 音视频开发打怪升级:FFmpeg音视频编解码篇】六、FFmpeg简单合成MP4:视屏解封与重新封装
- 32.opengl高级光照-延迟着色法
- Tsunami:一款功能强大的通用网络安全扫描工具
- Hive查看表/分区更新时间
- 直接通过手机抓取GPS的qxdm日志
- uniapp常用提示框uni.showToast(OBJECT)
- MySQL 索引(3)
- TS 设计模式04 - 适配器模式
- WebRTC | 原理、架构、框架目录、运行机制、核心类、PeerConnection调用过程等详解
- sql注入总结笔记
- WebRTC | Web服务器原理、Nodejs工作原理、Nodejs事件处理流程、V8引擎等要点解析
- OpenGL ES 3.0 | 围绕HelloTriangle实战案例 展开 渲染流程分析
- 基于 Kotlin + Netty 实现一个简单的 TCP 自定义协议
- dnslog带出——sqli-labs第8关