mapreduce数据处理——统计排序
时间:2019-11-14
本文章向大家介绍mapreduce数据处理——统计排序,主要包括mapreduce数据处理——统计排序使用实例、应用技巧、基本知识点总结和需要注意事项,具有一定的参考价值,需要的朋友可以参考一下。
接上篇https://www.cnblogs.com/sengzhao666/p/11850849.html
2、数据处理:
·统计最受欢迎的视频/文章的Top10访问次数 (id)
·按照地市统计最受欢迎的Top10课程 (ip)
·按照流量统计最受欢迎的Top10课程 (traffic)
分两步:
统计;排序
初始文件部分样例:
1.192.25.84 2016-11-10-00:01:14 10 54 video 5551 1.194.144.222 2016-11-10-00:01:20 10 54 video 3589 1.194.187.2 2016-11-10-00:01:05 10 54 video 2212 1.203.177.243 2016-11-10-00:01:18 10 6050 video 7361 1.203.177.243 2016-11-10-00:01:19 10 72 video 7361 1.203.177.243 2016-11-10-00:01:22 10 6050 video 7361 1.30.162.63 2016-11-10-00:01:46 10 54 video 3639 1.84.205.195 2016-11-10-00:01:12 10 54 video 1412
统计:
package priv.tzk.mapreduce.dataProcess.visits; import java.io.IOException; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.Reducer; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.input.TextInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat; public class DataVisits { public static String INPUT_PATH="/home/hadoop/out"; public static String OUTPUT_PATH="hdfs://localhost:9000/mapReduce/mymapreduce1/out"; public static class Map extends Mapper<Object,Text,Text,IntWritable>{ //将输入输出作为string类型,对应Text类型 private static Text newKey=new Text(); //每一行作为一个数据 public void map(Object key, Text value, Context context) throws IOException, InterruptedException{ String line=value.toString();//转为字符串类型 //System.out.println(line); if(!("".equals(line)))//增加控制语句,使得line为”“时能够停止。否则不符合reduce接受的数据不会执行reduce { String arr[]=line.split("\t");//splite是按照输入的值拆分成数组 newKey.set(arr[5]); int click=1; context.write(newKey,new IntWritable(click)); //System.out.println(newKey+" "+new IntWritable(click)); } } } public static class Reduce extends Reducer<Text,IntWritable,Text,IntWritable>{ public void reduce(Text key,Iterable<IntWritable> values,Context context) throws IOException,InterruptedException{ int count=0; for(IntWritable val:values) { //Iterable迭代器 count++; } context.write(key,new IntWritable(count)); //System.out.println("reduceStart"); } } public static void main(String[] args) throws IOException,ClassNotFoundException,InterruptedException{ Configuration conf=new Configuration(); System.out.println("start"); Job job=Job.getInstance(conf); job.setJobName("MyAverage"); //Job job =new Job(conf,"MyAverage"); job.setJarByClass(DataVisits.class); job.setMapperClass(Map.class); job.setReducerClass(Reduce.class); job.setOutputKeyClass(Text.class); job.setOutputValueClass(IntWritable.class);//设置map的输出格式 job.setInputFormatClass(TextInputFormat.class); job.setOutputFormatClass(TextOutputFormat.class); Path outputpath=new Path(OUTPUT_PATH); Path inputpath=new Path(INPUT_PATH); FileInputFormat.addInputPath(job,inputpath ); FileOutputFormat.setOutputPath(job,outputpath); boolean flag = job.waitForCompletion(true); System.out.println(flag); System.exit(flag? 0 : 1); } }
统计部分结果样例:
10061 1 10077 1 10198 1 10290 1 10314 1 10324 1 1034 1 10400 1 10421 1 10427 1 10450 1 10505 1 10506 7 10511 1
针对统计结果排序:
package priv.tzk.mapreduce.dataProcess.visits; import java.io.IOException; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.io.WritableComparable; import org.apache.hadoop.io.WritableComparator; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.Reducer; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.input.TextInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat; public class visitsSort { public static String INPUT_PATH="/home/hadoop/visits_out"; public static String OUTPUT_PATH="hdfs://localhost:9000/mapReduce/mymapreduce1/out1"; public static class Sort extends WritableComparator { public Sort(){ //这里就是看你map中填的输出key是什么数据类型,就给什么类型 super(IntWritable.class,true); } @Override public int compare(WritableComparable a, WritableComparable b) { return -a.compareTo(b);//加个负号就是倒序,把负号去掉就是正序。 } } public static class Map extends Mapper<Object,Text,IntWritable,Text>{ //将输入输出作为string类型,对应Text类型 private static Text mid=new Text(); private static IntWritable num=new IntWritable(); public void map(Object key, Text value, Context context) throws IOException, InterruptedException{ String line=value.toString();//转为字符串类型 if(!("".equals(line)))//增加控制语句,使得line为”“时能够停止。否则不符合reduce接受的数据不会执行reduce { String arr[]=line.split("\t");//splite是按照输入的值拆分成数组 mid.set(arr[0]); num.set(Integer.parseInt(arr[1])); context.write(num,mid); } } } //MapReduce框架默认排序规则。它是按照key值进行排序的 public static class Reduce extends Reducer<IntWritable,Text,IntWritable,Text>{ private static int i=0; public void reduce(IntWritable key,Iterable<Text> values,Context context) throws IOException,InterruptedException{ for(Text val:values) { //Iterable迭代器 if(i<10) { i++; context.write(key, val); } } //System.out.println("reduceStart"); } } public static void main(String[] args) throws IOException,ClassNotFoundException,InterruptedException{ Configuration conf=new Configuration(); System.out.println("start"); Job job=Job.getInstance(conf); //Job job =new Job(conf,""); job.setJarByClass(visitsSort.class); job.setMapperClass(Map.class); job.setReducerClass(Reduce.class); job.setSortComparatorClass(Sort.class); //设置map的输出格式 job.setOutputKeyClass(IntWritable.class); job.setOutputValueClass(Text.class); job.setInputFormatClass(TextInputFormat.class); job.setOutputFormatClass(TextOutputFormat.class); Path outputpath=new Path(OUTPUT_PATH); Path inputpath=new Path(INPUT_PATH); FileInputFormat.addInputPath(job,inputpath ); FileOutputFormat.setOutputPath(job,outputpath); boolean flag = job.waitForCompletion(true); System.out.println(flag); System.exit(flag? 0 : 1); } }
排序结果:
31 2402 19 1309 18 3078 18 2801 16 5683 16 3369 16 1336 16 4018 15 11239 15 13098
原文地址:https://www.cnblogs.com/sengzhao666/p/11862763.html
- JavaScript 教程
- JavaScript 编辑工具
- JavaScript 与HTML
- JavaScript 与Java
- JavaScript 数据结构
- JavaScript 基本数据类型
- JavaScript 特殊数据类型
- JavaScript 运算符
- JavaScript typeof 运算符
- JavaScript 表达式
- JavaScript 类型转换
- JavaScript 基本语法
- JavaScript 注释
- Javascript 基本处理流程
- Javascript 选择结构
- Javascript if 语句
- Javascript if 语句的嵌套
- Javascript switch 语句
- Javascript 循环结构
- Javascript 循环结构实例
- Javascript 跳转语句
- Javascript 控制语句总结
- Javascript 函数介绍
- Javascript 函数的定义
- Javascript 函数调用
- Javascript 几种特殊的函数
- JavaScript 内置函数简介
- Javascript eval() 函数
- Javascript isFinite() 函数
- Javascript isNaN() 函数
- parseInt() 与 parseFloat()
- escape() 与 unescape()
- Javascript 字符串介绍
- Javascript length属性
- javascript 字符串函数
- Javascript 日期对象简介
- Javascript 日期对象用途
- Date 对象属性和方法
- Javascript 数组是什么
- Javascript 创建数组
- Javascript 数组赋值与取值
- Javascript 数组属性和方法
- setTimeout 是到了xx ms 就执行吗,了解浏览器的 Event-Loop 机制
- 使用ABAP的RTTI和Java反射机制访问static private属性
- ABAP面试问题 - 不使用加减乘除等操作比较两个整数大小
- SAP订单上Shipping抬头和行项目字段的持久化实现原理
- SAP CRM订单模型CRMD_SHIPPING的单元测试方法
- 给SAP WebClient UI的表格行项目增添PDF预览功能
- 如何将ABAP透明表的内容导入PostgreSQL数据库
- 使用代理模式改善SAP UI5应用的图片加载体验
- 如何使用ABAP open SQL的locator
- dotnet 新 SDK Style 项目格式如何使用 InternalsVisibleToAttribute 功能
- WPF dotnet core 如何开启 Pointer 消息的支持
- web Storage的特点
- 四、实现跨域访问
- hadoop集群搭建
- java和node.js使用md5算法实现对数据的加密与加盐操作