hadoop 里执行 MapReduce 任务的几种常见方式
说明:
测试文件:
echo -e "aatbb tccnbbtcctdd" > 3.txt
hadoop fs -put 3.txt /tmp/3.txt
全文的例子均以该文件做测试用例,统计单词出现的次数(WordCount)。
1、原生态的方式:java 源码编译打包成jar包后,由 hadoop 脚本调度执行,举例:
import java.io.IOException;
import java.util.StringTokenizer;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;
public class WordCount {
public static class TokenizerMapper extends Mapper<Object, Text, Text, IntWritable> {
/**
* LongWritable, IntWritable, Text 均是 Hadoop 中实现的用于封装 Java 数据类型的类,
* 这些类实现了WritableComparable接口, 都能够被串行化从而便于在分布式环境中进行数据交换,
* 你可以将它们分别视为long,int,String 的替代品。
*/
// IntWritable one 相当于 java 原生类型 int 1
private final static IntWritable one = new IntWritable(1);
private Text word = new Text();
public void map(Object key, Text value, Context context) throws IOException, InterruptedException {
// 每行记录都会调用 map 方法处理,此处是每行都被分词
StringTokenizer itr = new StringTokenizer(value.toString());
while (itr.hasMoreTokens()) {
word.set(itr.nextToken());
// 输出每个词及其出现的次数 1,类似 <word1,1><word2,1><word1,1>
context.write(word, one);
}
}
}
public static class IntSumReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
private IntWritable result = new IntWritable();
public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException,
InterruptedException {
// key 相同的键值对会被分发到同一个 reduce中处理
// 例如 <word1,<1,1>>在 reduce1 中处理,而<word2,<1>> 会在 reduce2 中处理
int sum = 0;
// 相同的key(单词)的出现次数会被 sum 累加
for (IntWritable val : values) {
sum += val.get();
}
result.set(sum);
// 1个 reduce 处理完1 个键值对后,会输出其 key(单词)对应的结果(出现次数)
context.write(key, result);
}
}
public static void main(String[] args) throws Exception {
Configuration conf = new Configuration();
// 多队列hadoop集群中,设置使用的队列
conf.set("mapred.job.queue.name", "regular");
// 之所以此处不直接用 argv[1] 这样的,是为了排除掉运行时的集群属性参数,例如队列参数,
// 得到用户输入的纯参数,如路径信息等
String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
for (String argsStr : otherArgs) {
System.out.println("-->> " + argsStr);
}
if (otherArgs.length < 2) {
System.err.println("Usage: wordcount <in> <out>");
System.exit(2);
}
Job job = new Job(conf, "word count");
job.setJarByClass(WordCount.class);
// map、reduce 输入输出类
job.setMapperClass(TokenizerMapper.class);
job.setCombinerClass(IntSumReducer.class);
job.setReducerClass(IntSumReducer.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
// 输入输出路径
FileInputFormat.addInputPath(job, new Path(otherArgs[1]));
FileOutputFormat.setOutputPath(job, new Path(otherArgs[2]));
// 多子job的类中,可以保证各个子job串行执行
System.exit(job.waitForCompletion(true) ? 0 : 1);
}
}
执行:
bin/hadoop jar /tmp/wordcount.jar WordCount /tmp/3.txt /tmp/5
结果:
hadoop fs -cat /tmp/5/*
aa 1
bb 2
cc 2
dd 1
参考资料:
Hadoop - Map/Reduce 通过WordCount例子的变化来了解新版hadoop接口的变化
http://blog.csdn.net/derekjiang/article/details/6836209
Hadoop示例程序WordCount运行及详解
http://samuschen.iteye.com/blog/763940
官方的 wordcount v1.0 例子
http://hadoop.apache.org/docs/r1.1.1/mapred_tutorial.html#Example%3A+WordCount+v1.0
2、基于 MR 的数据流 Like SQL 脚本开发语言:pig
A1 = load '/data/3.txt';
A = stream A1 through `sed "s/t/ /g"`;
B = foreach A generate flatten(TOKENIZE((chararray)$0)) as word;
C = filter B by word matches '\w+';
D = group C by word;
E = foreach D generate COUNT(C), group;
dump E;
注意:不同分隔符对load及后面的$0的影响。
详情请见:
https://gist.github.com/186460
http://www.slideshare.net/erikeldridge/a-brief-handson-introduction-to-hadoop-pig
3、构建数据仓库的类 SQL 开发语言:hive
create table textlines(text string);
load data inpath '/data/3.txt' overwrite into table textlines;
SELECT wordColumn, count(1) FROM textlines LATERAL VIEW explode(split(text,'t+')) wordTable AS wordColumn GROUP BY wordColumn;
详情请见:
http://my.oschina.net/leejun2005/blog/83045
http://blog.csdn.net/techdo/article/details/7433222
4、跨平台的脚本语言:python
map:
#!/usr/bin/python
import os,re,sys
for line in sys.stdin:
for i in line.strip().split("t"):
print i
reduce:
#!/usr/bin/python
import os,re,sys
arr = {}
for words in sys.stdin:
word = words.strip()
if word not in arr:
arr[word] = 1
else:
arr[word] += 1
for k, v in arr.items():
print str(k) + ": " + str(v)
最后在shell下执行:
hadoop jar $HADOOP_HOME/contrib/streaming/hadoop-streaming-0.20.203.0.jar -file map.py -file reduce.py -mapper map.py -reducer reduce.py -input /data/3.txt -output /data/py
注意:脚本开头需要显示指定何种解释器以及赋予脚本执行权限
详情请见:
http://blog.csdn.net/jiedushi/article/details/7390015
5、Linux 下的瑞士军刀:shell 脚本
map:
#!/bin/bash
tr 't' 'n'
reduce:
#!/bin/bash
sort|uniq -c
最后在shell下执行:
june@deepin:~/hadoop/hadoop-0.20.203.0/tmp>
hadoop jar $HADOOP_HOME/contrib/streaming/hadoop-streaming-0.20.203.0.jar -file map.py -file reduce.py -mapper map.py -reducer reduce.py -input /data/3.txt -output /data/py
packageJobJar: [map.py, reduce.py, /home/june/data_hadoop/tmp/hadoop-unjar2676221286002400849/] [] /tmp/streamjob8722854685251202950.jar tmpDir=null
12/10/14 21:57:00 INFO mapred.FileInputFormat: Total input paths to process : 1
12/10/14 21:57:00 INFO streaming.StreamJob: getLocalDirs(): [/home/june/data_hadoop/tmp/mapred/local]
12/10/14 21:57:00 INFO streaming.StreamJob: Running job: job_201210141552_0041
12/10/14 21:57:00 INFO streaming.StreamJob: To kill this job, run:
12/10/14 21:57:00 INFO streaming.StreamJob: /home/june/hadoop/hadoop-0.20.203.0/bin/../bin/hadoop job -Dmapred.job.tracker=localhost:9001 -kill job_201210141552_0041
12/10/14 21:57:00 INFO streaming.StreamJob: Tracking URL: http://localhost:50030/jobdetails.jsp?jobid=job_201210141552_0041
12/10/14 21:57:01 INFO streaming.StreamJob: map 0% reduce 0%
12/10/14 21:57:13 INFO streaming.StreamJob: map 67% reduce 0%
12/10/14 21:57:19 INFO streaming.StreamJob: map 100% reduce 0%
12/10/14 21:57:22 INFO streaming.StreamJob: map 100% reduce 22%
12/10/14 21:57:31 INFO streaming.StreamJob: map 100% reduce 100%
12/10/14 21:57:37 INFO streaming.StreamJob: Job complete: job_201210141552_0041
12/10/14 21:57:37 INFO streaming.StreamJob: Output: /data/py
june@deepin:~/hadoop/hadoop-0.20.203.0/tmp>
hadoop fs -cat /data/py/part-00000
1 aa
1 bb
1 bb
2 cc
1 dd
june@deepin:~/hadoop/hadoop-0.20.203.0/tmp>
特别提示:上述有些方法对字段后的空格忽略或计算,请注意仔细甄别。
说明:列举了上述几种方法主要是给大家一个不同的思路,
在解决问题的过程中,开发效率、执行效率都是我们需要考虑的,不要太局限某一种方法了。
- php性能监测模块XHProf
- BZOJ 1022: [SHOI2008]小约翰的游戏John (Anti-nim)
- 洛谷P2252 取石子游戏(威佐夫博弈)
- HDU 3032 Nim or not Nim?(Multi-Nim)
- POJ 2311 Cutting Game(二维SG+Multi-Nim)
- js去掉html标签和去掉字符串文本的所有的空格
- php操作memcache的使用测试总结
- linux awk命令详解
- php str_split 解决中文
- PHP汉字转拼音函数
- 51NOD 1185 威佐夫游戏 V2(威佐夫博弈)
- HDU 1527 取石子游戏(威佐夫博弈)
- PHP文件操作类
- Linux添加/删除用户和用户组
- JavaScript 教程
- JavaScript 编辑工具
- JavaScript 与HTML
- JavaScript 与Java
- JavaScript 数据结构
- JavaScript 基本数据类型
- JavaScript 特殊数据类型
- JavaScript 运算符
- JavaScript typeof 运算符
- JavaScript 表达式
- JavaScript 类型转换
- JavaScript 基本语法
- JavaScript 注释
- Javascript 基本处理流程
- Javascript 选择结构
- Javascript if 语句
- Javascript if 语句的嵌套
- Javascript switch 语句
- Javascript 循环结构
- Javascript 循环结构实例
- Javascript 跳转语句
- Javascript 控制语句总结
- Javascript 函数介绍
- Javascript 函数的定义
- Javascript 函数调用
- Javascript 几种特殊的函数
- JavaScript 内置函数简介
- Javascript eval() 函数
- Javascript isFinite() 函数
- Javascript isNaN() 函数
- parseInt() 与 parseFloat()
- escape() 与 unescape()
- Javascript 字符串介绍
- Javascript length属性
- javascript 字符串函数
- Javascript 日期对象简介
- Javascript 日期对象用途
- Date 对象属性和方法
- Javascript 数组是什么
- Javascript 创建数组
- Javascript 数组赋值与取值
- Javascript 数组属性和方法
- OpenWRT通过3G Modem加asterisk将GSM通话转为SIP By HKL,
- Coding通过Jenkins生成jekyll并发布到腾讯云对象存储Qcloud COS By HKL,
- mybatis 实用技巧:<trim prefix="where" prefixOverrides="and|or">
- OpenWRT配置Webdav(s)共享文件 By HKL,
- OpenWRT配置Apache Webdav By HKL,
- 我向面试官讲解了单例模式,他对我竖起了大拇指
- 47 张图带你 MySQL 进阶!!!
- 新特性解读 | InnoDB-Cluster 扫盲-日常运维
- Laravel 框架实现无限极分类
- 这样设置IDEA,让你爽到飞起!
- 这些年,我写过的BUG(一)
- Selenium处理下拉列表
- 掌握好这几个css属性,少写100行js代码
- 计算机基础知识总结与操作系统 PDF 下载
- Kafka工作流程及文件存储机制