MapReduce中的自定义多目录/文件名输出HDFS
最近考虑到这样一个需求:
需要把原始的日志文件用hadoop做清洗后,按业务线输出到不同的目录下去,以供不同的部门业务线使用。
这个需求需要用到MultipleOutputFormat和MultipleOutputs来实现自定义多目录、文件的输出。
需要注意的是,在hadoop 0.21.x之前和之后的使用方式是不一样的:
hadoop 0.21 之前的API 中有 org.apache.hadoop.mapred.lib.MultipleOutputFormat 和 org.apache.hadoop.mapred.lib.MultipleOutputs,而到了 0.21 之后 的API为 org.apache.hadoop.mapreduce.lib.output.MultipleOutputs ,
新版的API 整合了上面旧API两个的功能,没有了MultipleOutputFormat。
本文将给出新旧两个版本的API code。
1、旧版0.21.x之前的版本:
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapred.TextInputFormat;
import org.apache.hadoop.mapred.lib.MultipleTextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
public class MultiFile extends Configured implements Tool {
public static class MapClass extends MapReduceBase implements
Mapper<LongWritable, Text, NullWritable, Text> {
@Override
public void map(LongWritable key, Text value,
OutputCollector<NullWritable, Text> output, Reporter reporter)
throws IOException {
output.collect(NullWritable.get(), value);
}
}
// MultipleTextOutputFormat 继承自MultipleOutputFormat,实现输出文件的分类
public static class PartitionByCountryMTOF extends
MultipleTextOutputFormat<NullWritable, Text> { // key is
// NullWritable,
// value is Text
protected String generateFileNameForKeyValue(NullWritable key,
Text value, String filename) {
String[] arr = value.toString().split(",", -1);
String country = arr[4].substring(1, 3); // 获取country的名称
return country + "/" + filename;
}
}
// 此处不使用reducer
/*
* public static class Reducer extends MapReduceBase implements
* org.apache.hadoop.mapred.Reducer<LongWritable, Text, NullWritable, Text>
* {
*
* @Override public void reduce(LongWritable key, Iterator<Text> values,
* OutputCollector<NullWritable, Text> output, Reporter reporter) throws
* IOException { // TODO Auto-generated method stub
*
* }
*
* }
*/
@Override
public int run(String[] args) throws Exception {
Configuration conf = getConf();
JobConf job = new JobConf(conf, MultiFile.class);
Path in = new Path(args[0]);
Path out = new Path(args[1]);
FileInputFormat.setInputPaths(job, in);
FileOutputFormat.setOutputPath(job, out);
job.setJobName("MultiFile");
job.setMapperClass(MapClass.class);
job.setInputFormat(TextInputFormat.class);
job.setOutputFormat(PartitionByCountryMTOF.class);
job.setOutputKeyClass(NullWritable.class);
job.setOutputValueClass(Text.class);
job.setNumReduceTasks(0);
JobClient.runJob(job);
return 0;
}
public static void main(String[] args) throws Exception {
int res = ToolRunner.run(new Configuration(), new MultiFile(), args);
System.exit(res);
}
}
测试数据及结果:
hadoop fs -cat /tmp/multiTest.txt
5765303,1998,14046,1996,"AD","",,1,12,42,5,59,11,1,0.4545,0,0,1,67.3636,,,,
5785566,1998,14088,1996,"AD","",,1,9,441,6,69,3,0,1,,0.6667,,4.3333,,,,
5894770,1999,14354,1997,"AD","",,1,,82,5,51,4,0,1,,0.625,,7.5,,,,
5765303,1998,14046,1996,"CN","",,1,12,42,5,59,11,1,0.4545,0,0,1,67.3636,,,,
5785566,1998,14088,1996,"CN","",,1,9,441,6,69,3,0,1,,0.6667,,4.3333,,,,
5894770,1999,14354,1997,"CN","",,1,,82,5,51,4,0,1,,0.625,,7.5,,,,
from:
MultipleOutputFormat Example
http://mazd1002.blog.163.com/blog/static/665749652011102553947492/
2、新版0.21.x及之后的版本:
public class TestwithMultipleOutputs extends Configured implements Tool {
public static class MapClass extends Mapper<LongWritable,Text,Text,IntWritable> {
private MultipleOutputs<Text,IntWritable> mos;
protected void setup(Context context) throws IOException,InterruptedException {
mos = new MultipleOutputs<Text,IntWritable>(context);
}
public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException{
String line = value.toString();
String[] tokens = line.split("-");
mos.write("MOSInt",new Text(tokens[0]), new IntWritable(Integer.parseInt(tokens[1]))); //(第一处)
mos.write("MOSText", new Text(tokens[0]),tokens[2]); //(第二处)
mos.write("MOSText", new Text(tokens[0]),line,tokens[0]+"/"); //(第三处)同时也可写到指定的文件或文件夹中
}
protected void cleanup(Context context) throws IOException,InterruptedException {
mos.close();
}
}
public int run(String[] args) throws Exception {
Configuration conf = getConf();
Job job = new Job(conf,"word count with MultipleOutputs");
job.setJarByClass(TestwithMultipleOutputs.class);
Path in = new Path(args[0]);
Path out = new Path(args[1]);
FileInputFormat.setInputPaths(job, in);
FileOutputFormat.setOutputPath(job, out);
job.setMapperClass(MapClass.class);
job.setNumReduceTasks(0);
MultipleOutputs.addNamedOutput(job,"MOSInt",TextOutputFormat.class,Text.class,IntWritable.class);
MultipleOutputs.addNamedOutput(job,"MOSText",TextOutputFormat.class,Text.class,Text.class);
System.exit(job.waitForCompletion(true)?0:1);
return 0;
}
public static void main(String[] args) throws Exception {
int res = ToolRunner.run(new Configuration(), new TestwithMultipleOutputs(), args);
System.exit(res);
}
}
测试的数据:
abc-1232-hdf abc-123-rtd ioj-234-grjth ntg-653-sdgfvd kju-876-btyun bhm-530-bhyt hfter-45642-bhgf bgrfg-8956-fmgh jnhdf-8734-adfbgf ntg-68763-nfhsdf ntg-98634-dehuy hfter-84567-drhuk
结果截图:(结果输出到/test/testMOSout)
PS:遇到的一个问题:
如果没有mos.close(), 程序运行中会出现异常:
12/05/21 20:12:47 WARN hdfs.DFSClient: DataStreamer Exception:
org.apache.hadoop.ipc.RemoteException:org.apache.hadoop.hdfs.server.namenode.LeaseExpiredException: No lease on /test/mosreduce/_temporary/_attempt_local_0001_r_000000_0/h-r-00000 File does not exist. [Lease. Holder: DFSClient_-352105532, pendingcreates: 5]
from:
MultipleOutputFormat和MultipleOutputs
http://www.cnblogs.com/liangzh/archive/2012/05/22/2512264.html
Hadoop利用Partitioner对输出文件分类(改写partition,路由到指定的文件中)
http://superlxw1234.iteye.com/blog/1495465
http://ghost-face.iteye.com/blog/1869926
更多参考&推荐阅读:
1、【Hadoop】利用MultipleOutputs,MultiOutputFormat实现以不同格式输出到多个文件
http://www.cnblogs.com/iDonal/archive/2012/08/07/2626588.html
2、cdh3u3 hadoop 0.20.2 MultipleOutputs 多输出文件初探
http://my.oschina.net/wangjiankui/blog/49521
3、使用MultipleOutputs
http://blog.163.com/ecy_fu/blog/static/444512620101274344951/
4、Hadoop reduce多个输出
http://blog.csdn.net/inte_sleeper/article/details/7042020
5、Hadoop 0.20.2中怎么使用MultipleOutputFormat实现多文件输出和完全自定义文件名
http://www.cnblogs.com/flying5/archive/2011/05/04/2078407.html
6、Hadoop OutputFormat浅析
http://zhb-mccoy.iteye.com/blog/1591635
7、others:
https://sites.google.com/site/hadoopandhive/home/how-to-write-output-to-multiple-named-files-in-hadoop-using-multipletextoutputformat https://issues.apache.org/jira/browse/HADOOP-3149 http://grokbase.com/t/hadoop/common-user/112ewx7s15/could-i-write-outputs-in-multiple-directories
8、MultipleOutputs 官方范例
http://hadoop.apache.org/docs/stable/api/org/apache/hadoop/mapreduce/lib/output/MultipleOutputs.html
9、多数据源输入:MultipleInputs
https://groups.google.com/forum/#!topic/nosql-databases/SH61smOV-mo
http://bigdataprocessing.wordpress.com/2012/07/27/hadoop-hbase-mapreduce-examples/
http://hbase.apache.org/book/mapreduce.example.html
10、Hadoop多文件输出:MultipleOutputFormat和MultipleOutputs深究
http://www.iteblog.com/archives/842 (一)
http://www.iteblog.com/archives/848 (二)
- WordPress免插件仅修改代码去掉评论/留言里的链接
- 阅读Ext 学习Javascript(一)Core/Ext.js
- 利用腾讯的ip地址库做ip地址定位
- WordPress登陆不了后台的原因及解决方法(登陆界面不断返回)
- select元素的options.add 与 insertbefore的区别
- 如何去理解 拓扑排序算法
- WordPress免插件仅代码实现文章归档(模板页面)I
- Bing Map App 开发 还没入门遇见错误无法继续
- 使用MongoDB的支持Linq 驱动NoRM
- Ext的组件模型印象
- 2018年预计仍将持续活跃走高的数字货币都有哪些?
- Ext整体印象
- 重磅干货:AI场景的价值体现——视觉 AI 技术如何落地?
- 将单机版短信猫变成网络版
- JavaScript 教程
- JavaScript 编辑工具
- JavaScript 与HTML
- JavaScript 与Java
- JavaScript 数据结构
- JavaScript 基本数据类型
- JavaScript 特殊数据类型
- JavaScript 运算符
- JavaScript typeof 运算符
- JavaScript 表达式
- JavaScript 类型转换
- JavaScript 基本语法
- JavaScript 注释
- Javascript 基本处理流程
- Javascript 选择结构
- Javascript if 语句
- Javascript if 语句的嵌套
- Javascript switch 语句
- Javascript 循环结构
- Javascript 循环结构实例
- Javascript 跳转语句
- Javascript 控制语句总结
- Javascript 函数介绍
- Javascript 函数的定义
- Javascript 函数调用
- Javascript 几种特殊的函数
- JavaScript 内置函数简介
- Javascript eval() 函数
- Javascript isFinite() 函数
- Javascript isNaN() 函数
- parseInt() 与 parseFloat()
- escape() 与 unescape()
- Javascript 字符串介绍
- Javascript length属性
- javascript 字符串函数
- Javascript 日期对象简介
- Javascript 日期对象用途
- Date 对象属性和方法
- Javascript 数组是什么
- Javascript 创建数组
- Javascript 数组赋值与取值
- Javascript 数组属性和方法
- Typecho配置多国语支持,检查客户端语言自动切换翻译
- NgModule imports定义的运行时数据结构
- 用Windows电脑训练深度学习模型?超详细配置教程来了
- Angular No provider for EffectsRootModule错误消息
- Leetcode No.15 三数之和
- 正则表达式介绍与使用
- Angular StoreFeatureModule
- Angular Component之间的事件通知机制
- 如何将你的Python项目全面自动化?
- 正则表达式介绍与使用
- Angular ERROR NullInjectorError: R3InjectorError(AppModule)的错误分析
- 2.4-Air302(NB-IOT)-基础外设-延时,定时器
- Shell正则表达式一览表
- YAML基础语法
- 详解Kubernetes存储体系