Hadoop Series: Classic MapReduce Code Examples, Part 1
Date: 2022-04-25
This article walks through classic MapReduce (MR) code examples, covering usage, practical techniques, key points, and pitfalls to watch for.
VII. Classic MapReduce Cases
1. Website analytics case
1) Analysis
Visits per province:
provinceId --> key
1 --> value
<provinceId, list(1,1,1,1,1,)>
Database:
Dimension table tb_province_info with columns:
provinceId
provinceName
provinceXxx
Example results:
Jiangsu -> 2098
Shanghai -> 34563
2) Program
i. Define the Mapper class and its map method
ii. Define the Reducer class and its reduce method
iii. Define the run method
iv. Define the main method
v. Set up counters (inside the Mapper class)
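The aggregation described in the analysis above — each map call emits <provinceId, 1> and the reduce step sums the list of 1s per key — can be sketched without a cluster in plain Java. The class and sample province IDs below are hypothetical, for illustration only:

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class ProvinceCountSketch {
    // Simulates map + shuffle + reduce: emit (provinceId, 1) per visit,
    // group by key, then sum the 1s.
    static Map<String, Integer> countByProvince(List<String> provinceIds) {
        Map<String, Integer> counts = new LinkedHashMap<>();
        for (String id : provinceIds) {        // "map" phase: one record per visit
            counts.merge(id, 1, Integer::sum); // "reduce" phase: sum the 1s per key
        }
        return counts;
    }

    public static void main(String[] args) {
        List<String> visits = List.of("js", "sh", "js", "js", "sh");
        System.out.println(countByProvince(visits)); // {js=3, sh=2}
    }
}
```

In the real job, the grouping is done by the shuffle phase and the summation happens in the Reducer; counters would be incremented inside the Mapper as step v describes.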
3) Export the jar and run it
i. Package the jar with Eclipse
ii. Run it with the YARN command
$ bin/yarn jar .....
2. Secondary sort
http://blog.csdn.net/u014729236/article/details/46327335
1) The IntPair class
package com.hadoop.mr.sort;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.WritableComparable;
public class IntPair implements WritableComparable<IntPair> {
private IntWritable first;
private IntWritable second;
public void set(IntWritable first, IntWritable second) {
this.first = first;
this.second = second;
}
//Note: a no-argument constructor is required; otherwise reflection will fail.
public IntPair() {
set(new IntWritable(), new IntWritable());
}
public IntPair(int first, int second) {
set(new IntWritable(first), new IntWritable(second));
}
public IntPair(IntWritable first, IntWritable second) {
set(first, second);
}
public IntWritable getFirst() {
return first;
}
public void setFirst(IntWritable first) {
this.first = first;
}
public IntWritable getSecond() {
return second;
}
public void setSecond(IntWritable second) {
this.second = second;
}
@Override
public void write(DataOutput out) throws IOException {
first.write(out);
second.write(out);
}
@Override
public void readFields(DataInput in) throws IOException {
first.readFields(in);
second.readFields(in);
}
@Override
public int hashCode() {
return first.hashCode() * 163 + second.hashCode();
}
@Override
public boolean equals(Object o) {
if (o instanceof IntPair) {
IntPair tp = (IntPair) o;
return first.equals(tp.first) && second.equals(tp.second);
}
return false;
}
@Override
public String toString() {
return first + "\t" + second;
}
@Override
public int compareTo(IntPair tp) {
int cmp = first.compareTo(tp.first);
if (cmp != 0) {
return cmp;
}
return second.compareTo(tp.second);
}
}
2) The SecondarySort class
package com.hadoop.mr.sort;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Partitioner;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
public class SecondarySort {
static class TheMapper extends Mapper<LongWritable, Text, IntPair, NullWritable> {
@Override
protected void map(LongWritable key, Text value, Context context)
throws IOException, InterruptedException {
String[] fields = value.toString().split("\t");
int field1 = Integer.parseInt(fields[0]);
int field2 = Integer.parseInt(fields[1]);
context.write(new IntPair(field1,field2), NullWritable.get());
}
}
static class TheReducer extends Reducer<IntPair, NullWritable,IntPair, NullWritable> {
//private static final Text SEPARATOR = new Text("------------------------------------------------");
@Override
protected void reduce(IntPair key, Iterable<NullWritable> values, Context context)
throws IOException, InterruptedException {
context.write(key, NullWritable.get());
}
}
public static class FirstPartitioner extends Partitioner<IntPair, NullWritable> {
@Override
public int getPartition(IntPair key, NullWritable value, int numPartitions) {
return Math.abs(key.getFirst().get()) % numPartitions;
}
}
//Without this class, both columns sort in ascending order by default. This class sorts the first column ascending and the second column descending.
public static class KeyComparator extends WritableComparator {
//The no-argument constructor is required; otherwise an error is thrown.
protected KeyComparator() {
super(IntPair.class, true);
}
@Override
public int compare(WritableComparable a, WritableComparable b) {
IntPair ip1 = (IntPair) a;
IntPair ip2 = (IntPair) b;
//Sort the first column in ascending order
int cmp = ip1.getFirst().compareTo(ip2.getFirst());
if (cmp != 0) {
return cmp;
}
//When the first columns are equal, sort the second column in descending order
return -ip1.getSecond().compareTo(ip2.getSecond());
}
}
/* public static class GroupComparator extends WritableComparator {
//The no-argument constructor is required; otherwise an error is thrown.
protected GroupComparator() {
super(IntPair.class, true);
}
@Override
public int compare(WritableComparable a, WritableComparable b) {
IntPair ip1 = (IntPair) a;
IntPair ip2 = (IntPair) b;
return ip1.getFirst().compareTo(ip2.getFirst());
}
}*/
//Entry point
public static void main(String[] args) throws Exception {
Configuration conf = new Configuration();
Job job = Job.getInstance(conf);
job.setJarByClass(SecondarySort.class);
//Configure the Mapper
job.setMapperClass(TheMapper.class);
//When the Mapper's output key/value types match the Reducer's output types, the following two lines can be omitted.
//job.setMapOutputKeyClass(IntPair.class);
//job.setMapOutputValueClass(NullWritable.class);
FileInputFormat.setInputPaths(job, new Path(args[0]));
//Configure the partitioner
job.setPartitionerClass(FirstPartitioner.class);
//Sort the keys during the map phase
job.setSortComparatorClass(KeyComparator.class);
//job.setGroupingComparatorClass(GroupComparator.class);
//Configure the Reducer
job.setReducerClass(TheReducer.class);
job.setOutputKeyClass(IntPair.class);
job.setOutputValueClass(NullWritable.class);
FileOutputFormat.setOutputPath(job, new Path(args[1]));
//Set the number of reducers
int reduceNum = 1;
if(args.length >= 3 && args[2] != null){
reduceNum = Integer.parseInt(args[2]);
}
job.setNumReduceTasks(reduceNum);
job.waitForCompletion(true);
}
}
3) Testing
Package the job as secsort.jar. It reads input from /test/secsortdata on HDFS, writes the MapReduce output to /test/secsortresult8, and starts 1 reducer:
hadoop jar secsort.jar /test/secsortdata /test/secsortresult8 1
Test result:
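No output was captured here, but the ordering the job produces — first column ascending, and second column descending within equal first values — can be checked locally with a plain comparator. The sample pairs below are made up for illustration:

```java
import java.util.Arrays;
import java.util.Comparator;

public class SortOrderDemo {
    public static void main(String[] args) {
        int[][] pairs = {{3, 1}, {1, 5}, {3, 9}, {1, 2}};
        // Same ordering as KeyComparator: first column ascending,
        // second column descending when the first columns are equal.
        Comparator<int[]> byFirstAsc = Comparator.comparingInt(p -> p[0]);
        Comparator<int[]> bySecondDesc = (a, b) -> Integer.compare(b[1], a[1]);
        Arrays.sort(pairs, byFirstAsc.thenComparing(bySecondDesc));
        for (int[] p : pairs) {
            System.out.println(p[0] + "\t" + p[1]); // 1 5, 1 2, 3 9, 3 1
        }
    }
}
```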
3. Secondary sort (variant 2)
1) The IntPair class
package com.hadoop.mr.sort;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import org.apache.hadoop.io.WritableComparable;
public class IntPair implements WritableComparable<IntPair> {
private int first = 0;
private int second = 0;
public void set(int first, int second) {
this.first = first;
this.second = second;
}
// Note: a no-argument constructor is required; otherwise reflection will fail.
public IntPair() {
}
public IntPair(int first, int second) {
set(first, second);
}
public int getFirst() {
return first;
}
public void setFirst(int first) {
this.first = first;
}
public int getSecond() {
return second;
}
public void setSecond(int second) {
this.second = second;
}
@Override
public void write(DataOutput out) throws IOException {
out.writeInt(first); // writeInt, not write(int): write(int) emits only the low-order byte
out.writeInt(second);
}
@Override
public void readFields(DataInput in) throws IOException {
first = in.readInt();
second = in.readInt();
}
@Override
public int hashCode() {
return first * 163 + second; // combine both fields, mirroring the IntWritable version
}
@Override
public boolean equals(Object right) {
if (right instanceof IntPair) {
IntPair r = (IntPair) right;
return r.getFirst() == first && r.getSecond() == second;
} else {
return false;
}
}
// This is the key part: when the framework sorts keys, it calls this compareTo method.
@Override
public int compareTo(IntPair o) {
// First column ascending; Integer.compare avoids overflow from subtraction
int cmp = Integer.compare(first, o.getFirst());
if (cmp != 0) {
return cmp;
}
// Second column descending
return Integer.compare(o.getSecond(), second);
}
}
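A quick round-trip shows why `writeInt` matters in the `write` method above: `DataOutput.write(int)` writes only the low-order byte, so any value above 255 would be silently corrupted on deserialization. A minimal sketch with plain `java.io` streams:

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;

public class WriteIntDemo {
    public static void main(String[] args) throws IOException {
        ByteArrayOutputStream buf = new ByteArrayOutputStream();
        DataOutputStream out = new DataOutputStream(buf);
        out.writeInt(300); // 4 bytes: the full int survives
        out.write(300);    // 1 byte: only 300 & 0xFF == 44 survives
        out.flush();

        DataInputStream in = new DataInputStream(
                new ByteArrayInputStream(buf.toByteArray()));
        System.out.println(in.readInt());          // 300
        System.out.println(in.readUnsignedByte()); // 44
    }
}
```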
2) The SecondarySort class
package com.hadoop.mr.sort;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Partitioner;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
public class SecondarySort {
static class TheMapper extends Mapper<LongWritable, Text, IntPair, NullWritable> {
@Override
protected void map(LongWritable key, Text value, Context context)
throws IOException, InterruptedException {
String[] fields = value.toString().split("\t");
int field1 = Integer.parseInt(fields[0]);
int field2 = Integer.parseInt(fields[1]);
context.write(new IntPair(field1,field2), NullWritable.get());
}
}
static class TheReducer extends Reducer<IntPair, NullWritable,IntPair, NullWritable> {
//private static final Text SEPARATOR = new Text("------------------------------------------------");
@Override
protected void reduce(IntPair key, Iterable<NullWritable> values, Context context)
throws IOException, InterruptedException {
context.write(key, NullWritable.get());
}
}
public static class FirstPartitioner extends Partitioner<IntPair, NullWritable> {
@Override
public int getPartition(IntPair key, NullWritable value, int numPartitions) {
//getFirst() returns a plain int in this variant, so no .get() call is needed
return Math.abs(key.getFirst()) % numPartitions;
}
}
//Without this class, both columns sort in ascending order by default. This class sorts the first column ascending and the second column descending.
public static class KeyComparator extends WritableComparator {
//The no-argument constructor is required; otherwise an error is thrown.
protected KeyComparator() {
super(IntPair.class, true);
}
@Override
public int compare(WritableComparable a, WritableComparable b) {
IntPair ip1 = (IntPair) a;
IntPair ip2 = (IntPair) b;
//Sort the first column in ascending order (getFirst() returns int here, so use Integer.compare)
int cmp = Integer.compare(ip1.getFirst(), ip2.getFirst());
if (cmp != 0) {
return cmp;
}
//When the first columns are equal, sort the second column in descending order
return Integer.compare(ip2.getSecond(), ip1.getSecond());
}
}
/* public static class GroupComparator extends WritableComparator {
//The no-argument constructor is required; otherwise an error is thrown.
protected GroupComparator() {
super(IntPair.class, true);
}
@Override
public int compare(WritableComparable a, WritableComparable b) {
IntPair ip1 = (IntPair) a;
IntPair ip2 = (IntPair) b;
return Integer.compare(ip1.getFirst(), ip2.getFirst());
}
}*/
//Entry point
public static void main(String[] args) throws Exception {
Configuration conf = new Configuration();
Job job = Job.getInstance(conf);
job.setJarByClass(SecondarySort.class);
//Configure the Mapper
job.setMapperClass(TheMapper.class);
//When the Mapper's output key/value types match the Reducer's output types, the following two lines can be omitted.
//job.setMapOutputKeyClass(IntPair.class);
//job.setMapOutputValueClass(NullWritable.class);
FileInputFormat.setInputPaths(job, new Path(args[0]));
//Configure the partitioner
job.setPartitionerClass(FirstPartitioner.class);
//Sort the keys during the map phase
job.setSortComparatorClass(KeyComparator.class);
//job.setGroupingComparatorClass(GroupComparator.class);
//Configure the Reducer
job.setReducerClass(TheReducer.class);
job.setOutputKeyClass(IntPair.class);
job.setOutputValueClass(NullWritable.class);
FileOutputFormat.setOutputPath(job, new Path(args[1]));
//Set the number of reducers
int reduceNum = 1;
if(args.length >= 3 && args[2] != null){
reduceNum = Integer.parseInt(args[2]);
}
job.setNumReduceTasks(reduceNum);
job.waitForCompletion(true);
}
}
PS: Secondary sort in Scala
package com.spark.secondApp
import org.apache.spark.{SparkContext, SparkConf}
object SecondarySort {
def main(args: Array[String]) {
val conf = new SparkConf().setAppName("SecondarySort").setMaster("local")
val sc = new SparkContext(conf)
val file = sc.textFile("hdfs://worker02:9000/test/secsortdata")
val rdd = file.map(line => line.split("\t")).
map(x => (x(0), x(1))).groupByKey().
sortByKey(true).map(x => (x._1, x._2.toList.sortWith(_ > _))) // note: values sort descending as strings
val rdd2 = rdd.flatMap{
x =>
val len = x._2.length
val array = new Array[(String,String)](len)
for(i <- 0 until len) {
array(i) = (x._1,x._2(i))
}
array
}
rdd2.foreach(println) // emit the result; the original code built rdd2 but never output it
sc.stop()
}
}
A follow-up post will cover the classic MR join case.