Big Data: Hadoop MapReduce Exercises

Date: 2022-07-28

MapReduce Exercises

Before You Begin

Hadoop is already installed and files can be uploaded to HDFS. Hadoop version: 2.7.
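
If you prefer to stage the sample input from code rather than with the hdfs shell, a minimal sketch using the HDFS Java API looks like this. The class name Upload, the local file words.txt, and the target directory /count are illustrative assumptions chosen to match the jobs below:

import java.net.URI;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class Upload {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // connect to the NameNode address used throughout this article
        FileSystem fs = FileSystem.get(URI.create("hdfs://192.168.19.4:9000"), conf);
        // copy a local sample file (assumed name) into the /count input directory
        fs.copyFromLocalFile(new Path("words.txt"), new Path("/count/words.txt"));
        fs.close();
    }
}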

Project Dependencies
<properties>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    <hadoop.version>2.7.2</hadoop.version>
</properties>


<dependencies>
    <dependency>
        <groupId>junit</groupId>
        <artifactId>junit</artifactId>
        <version>4.13</version>
    </dependency>
    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-hdfs</artifactId>
        <version>${hadoop.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-common</artifactId>
        <version>${hadoop.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-annotations</artifactId>
        <version>${hadoop.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-auth</artifactId>
        <version>${hadoop.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-yarn-api</artifactId>
        <version>${hadoop.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-yarn-common</artifactId>
        <version>${hadoop.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-mapreduce-client-core</artifactId>
        <version>${hadoop.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-mapreduce-client-common</artifactId>
        <version>${hadoop.version}</version>
    </dependency>

</dependencies>

Word Count

Requirement: given the following data, count how many times each word occurs.

hello word
hello page
123456 789
生如夏花 死如秋叶

mapper

import java.io.IOException;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class WordCountMapper extends Mapper<LongWritable, Text, Text, LongWritable> {
    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        // split each input line on spaces and emit (word, 1) for every token
        String line = value.toString();
        String[] datas = line.split(" ");
        for (String data : datas) {
            context.write(new Text(data), new LongWritable(1));
        }
    }
}

reduce

import java.io.IOException;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class WordCountReducer extends Reducer<Text, LongWritable, Text, LongWritable> {

    @Override
    protected void reduce(Text key, Iterable<LongWritable> values, Context context) throws IOException, InterruptedException {
        // sum up the 1s emitted by the mapper for this word
        long count = 0;
        for (LongWritable value : values) {
            count += value.get();
        }
        context.write(key, new LongWritable(count));
    }
}

App

public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    Job job = Job.getInstance(conf);

    // set the driver class
    job.setJarByClass(App.class);
    // wordCount
    job.setMapperClass(WordCountMapper.class);
    job.setReducerClass(WordCountReducer.class);

    // set the map and final output key/value types
    job.setMapOutputKeyClass(Text.class);
    job.setMapOutputValueClass(LongWritable.class);
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(LongWritable.class);

    // set the input and output paths
    FileInputFormat.setInputPaths(job, new Path("hdfs://192.168.19.4:9000/count"));
    FileOutputFormat.setOutputPath(job, new Path("hdfs://192.168.19.4:9000/count/result"));
    job.waitForCompletion(true);
}
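
One practical note that applies to every App driver in this article: MapReduce refuses to start if the output directory already exists, and all of the examples write to /count/result. A small sketch using the standard FileSystem API, which you could place before waitForCompletion to clear a stale result directory (assuming conf is the job's Configuration):

// remove the output directory left over from a previous run, if any (recursive delete)
FileSystem fs = FileSystem.get(java.net.URI.create("hdfs://192.168.19.4:9000"), conf);
Path out = new Path("/count/result");
if (fs.exists(out)) {
    fs.delete(out, true);
}

Also note that result sits under the input directory /count, so a rerun can trip over the previous output when the input directory is listed; writing results to a sibling directory is the safer choice.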

Deduplication

Requirement: given the following data, remove the duplicate records.

192.168.234.21
192.168.234.22
192.168.234.21
192.168.234.21
192.168.234.23
192.168.234.21
192.168.234.21
192.168.234.21
192.168.234.25
192.168.234.21
192.168.234.21
192.168.234.26
192.168.234.21
192.168.234.27
192.168.234.21
192.168.234.27
192.168.234.21
192.168.234.29
192.168.234.21
192.168.234.26
192.168.234.21
192.168.234.25
192.168.234.25
192.168.234.21
192.168.234.22
192.168.234.21

mapper

public class DistinctMapper extends Mapper<LongWritable,Text,Text,NullWritable> {
    @Override
    protected void map(LongWritable key, Text value, Mapper<LongWritable,Text,Text,NullWritable>.Context context) throws IOException, InterruptedException {
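        // the whole line is the key and the value is null; identical lines collapse
        // into a single group during the shuffle, which is what performs the dedup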
        context.write(value,NullWritable.get());
    }
}

reduce

public class DistinctReduce extends Reducer<Text,NullWritable,Text,NullWritable> {
    @Override
    protected void reduce(Text key, Iterable<NullWritable> values, Reducer<Text,NullWritable,Text,NullWritable>.Context context) throws IOException, InterruptedException {
        context.write(key,NullWritable.get());
    }
}

App

public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    Job job = Job.getInstance(conf);

    // set the driver class
    job.setJarByClass(App.class);
    job.setMapperClass(DistinctMapper.class);
    job.setReducerClass(DistinctReduce.class);
    job.setMapOutputKeyClass(Text.class);
    job.setMapOutputValueClass(NullWritable.class);
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(NullWritable.class);

    // set the input and output paths
    FileInputFormat.setInputPaths(job, new Path("hdfs://192.168.19.4:9000/count"));
    FileOutputFormat.setOutputPath(job, new Path("hdfs://192.168.19.4:9000/count/result"));
    job.waitForCompletion(true);
}

Computing the Average

Requirement: given the following data, compute each person's average score.

tom 69
tom 84
tom 68
jary 89
jary 90
jary 81
jary 35
rose 23
rose 100
rose 230

mapper

public class AvgMapper extends Mapper<LongWritable, Text,Text, FloatWritable> {
    @Override
    protected void map(LongWritable key, Text value,Mapper<LongWritable, Text,Text,FloatWritable>.Context context) throws IOException, InterruptedException {
        String line = value.toString();
        String[] datas = line.split(" ");
        String name = datas[0];
        int score = Integer.parseInt(datas[1]);

        context.write(new Text(name),new FloatWritable(score));
    }
}

reduce

public class AvgReduce extends Reducer<Text, FloatWritable, Text, FloatWritable> {
    @Override
    protected void reduce(Text key, Iterable<FloatWritable> values, Context context) throws IOException, InterruptedException {
        // sum the scores and count them, then divide to get the average
        int count = 0;
        float sum = 0f;

        for (FloatWritable value : values) {
            sum += value.get();
            count += 1;
        }
        context.write(key, new FloatWritable(sum / count));
    }
}

App

public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    Job job = Job.getInstance(conf);

    // set the driver class
    job.setJarByClass(App.class);
    job.setMapperClass(AvgMapper.class);
    job.setReducerClass(AvgReduce.class);
    job.setMapOutputKeyClass(Text.class);
    job.setMapOutputValueClass(FloatWritable.class);
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(FloatWritable.class);

    // set the input and output paths
    FileInputFormat.setInputPaths(job, new Path("hdfs://192.168.19.4:9000/count"));
    FileOutputFormat.setOutputPath(job, new Path("hdfs://192.168.19.4:9000/count/result"));
    job.waitForCompletion(true);
}

Maximum Value

Requirement: suppose we need to process a batch of weather data in the following format:

Records are stored as ASCII, one per line; each line in the sample below is 22 characters long.

Characters 9 through 12 (1-based) hold the year, and characters 19 through 22 hold the temperature. Find the maximum temperature for each year.

2329999919500515070000
9909999919500515120022
9909999919500515180011
9509999919490324120111
6509999919490324180078
9909999919370515070001
9909999919370515120002
9909999919450515180001
6509999919450324120002
8509999919450324180078
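
Since Java's substring uses 0-based, end-exclusive indices, it is worth checking the mapper's offsets against the 1-based positions above. A quick standalone sketch on the first record (the class name OffsetCheck is just for illustration):

public class OffsetCheck {
    public static void main(String[] args) {
        String record = "2329999919500515070000";
        // characters 9-12 (1-based) -> substring(8, 12)
        System.out.println(record.substring(8, 12));  // 1950
        // characters 19-22 (1-based) -> substring(18, 22)
        System.out.println(record.substring(18, 22)); // 0000
    }
}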

mapper

public class MaxMapper extends Mapper<LongWritable, Text,Text, IntWritable> {
    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        String str = value.toString();
        String year = str.substring(8,12);
        int temp = Integer.parseInt(str.substring(18,22));
        context.write(new Text(year),new IntWritable(temp));
    }
}

reduce

public class MaxReduce extends Reducer<Text, IntWritable, Text, IntWritable> {
    @Override
    protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
        // start from MIN_VALUE so the logic also holds if temperatures can be negative
        int max = Integer.MIN_VALUE;
        for (IntWritable temp : values) {
            if (max < temp.get()) {
                max = temp.get();
            }
        }
        context.write(key, new IntWritable(max));
    }
}

App

public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    Job job = Job.getInstance(conf);

    // set the driver class
    job.setJarByClass(App.class);
    job.setMapperClass(MaxMapper.class);
    job.setReducerClass(MaxReduce.class);
    job.setMapOutputKeyClass(Text.class);
    job.setMapOutputValueClass(IntWritable.class);
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(IntWritable.class);

    // set the input and output paths
    FileInputFormat.setInputPaths(job, new Path("hdfs://192.168.19.4:9000/count"));
    FileOutputFormat.setOutputPath(job, new Path("hdfs://192.168.19.4:9000/count/result"));
    job.waitForCompletion(true);
}

Traffic Totals

Requirement: given the following data, compute each user's total traffic usage.

12321445 zs bj 343
12321312 ww sh 234
12321445 zs bj 343
12321312 ww cd 234
12345678 zs cd 156

Create a FlowPojo class

public class FlowPojo implements Writable {

    private String name;
    private String phone;
    private String addr;
    private long flow;

    public FlowPojo() {
    }

    public FlowPojo(String name, String phone, String addr, long flow) {
        this.name = name;
        this.phone = phone;
        this.addr = addr;
        this.flow = flow;
    }

    public String getName() {
        return name;
    }

    public void setName(String name) {
        this.name = name;
    }

    public String getPhone() {
        return phone;
    }

    public void setPhone(String phone) {
        this.phone = phone;
    }

    public String getAddr() {
        return addr;
    }

    public void setAddr(String addr) {
        this.addr = addr;
    }

    public long getFlow() {
        return flow;
    }

    public void setFlow(long flow) {
        this.flow = flow;
    }

    @Override
    public String toString() {
        return "[" +
                "name:'" + name + ''' +
                ", phone:'" + phone + ''' +
                ", addr:'" + addr + ''' +
                ", flow:" + flow +
                ']';
    }

    @Override
    public void write(DataOutput dataOutput) throws IOException {
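        // field order here is the contract: readFields below must read the fields back in exactly this order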
        dataOutput.writeUTF(name);
        dataOutput.writeUTF(phone);
        dataOutput.writeUTF(addr);
        dataOutput.writeLong(flow);
    }

    @Override
    public void readFields(DataInput dataInput) throws IOException {
        this.name = dataInput.readUTF();
        this.phone = dataInput.readUTF();
        this.addr = dataInput.readUTF();
        this.flow = dataInput.readLong();
    }
}

mapper

public class FlowMapper extends Mapper<LongWritable, Text,Text,FlowPojo> {
    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        String str = value.toString();
        String[] datas = str.split(" ");
        String phone = datas[0];
        String name = datas[1];
        String addr = datas[2];
        Long flow = Long.parseLong(datas[3]);
        FlowPojo flowPojo = new FlowPojo(name, phone, addr, flow);
        context.write(new Text(phone),flowPojo);
    }
}

reduce

public class FlowReduce extends Reducer<Text, FlowPojo, Text, FlowPojo> {
    @Override
    protected void reduce(Text key, Iterable<FlowPojo> values, Context context) throws IOException, InterruptedException {
        FlowPojo flowPojo = new FlowPojo();
        long flow = 0L;
        for (FlowPojo value : values) {
            flow += value.getFlow();
            // copy the identity fields once; they are the same for every record in the group
            if (flowPojo.getName() == null) {
                flowPojo.setName(value.getName());
                flowPojo.setPhone(value.getPhone());
                flowPojo.setAddr(value.getAddr());
            }
        }
        flowPojo.setFlow(flow);
        context.write(key, flowPojo);
    }
}

App

public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    Job job = Job.getInstance(conf);

    // set the driver class
    job.setJarByClass(App.class);
    job.setMapperClass(FlowMapper.class);
    job.setReducerClass(FlowReduce.class);
    job.setMapOutputKeyClass(Text.class);
    // the value type is the custom Writable (FlowPojo), not the mapper class
    job.setMapOutputValueClass(FlowPojo.class);
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(FlowPojo.class);

    // set the input and output paths
    FileInputFormat.setInputPaths(job, new Path("hdfs://192.168.19.4:9000/count"));
    FileOutputFormat.setOutputPath(job, new Path("hdfs://192.168.19.4:9000/count/result"));
    job.waitForCompletion(true);
}

Partitioning Example

Requirement: given data of the form `month profit` (one record per line), compute each month's total profit, partitioning by month.

mapper

public class ProfitMapper extends Mapper<LongWritable, Text,Text, FloatWritable> {
    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        String[] datas = value.toString().split(" ");
        String month = datas[0];
        float profit = Float.parseFloat(datas[1]);
        context.write(new Text(month),new FloatWritable(profit));
    }
}

reduce

public class ProfitReduce extends Reducer<Text, FloatWritable,Text,FloatWritable> {
    @Override
    protected void reduce(Text key, Iterable<FloatWritable> values, Context context) throws IOException, InterruptedException {
        float count= 0f;
        for (FloatWritable value : values) {
            count+=value.get();
        }
        context.write(key,new FloatWritable(count));
    }
}

App

public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    Job job = Job.getInstance(conf);

    // set the driver class
    job.setJarByClass(App.class);
    // three reduce tasks; with the default HashPartitioner the months are spread across them by key hash
    job.setNumReduceTasks(3);
    job.setMapperClass(ProfitMapper.class);
    job.setReducerClass(ProfitReduce.class);
    job.setMapOutputKeyClass(Text.class);
    job.setMapOutputValueClass(FloatWritable.class);
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(FloatWritable.class);

    // set the input and output paths
    FileInputFormat.setInputPaths(job, new Path("hdfs://192.168.19.4:9000/count"));
    FileOutputFormat.setOutputPath(job, new Path("hdfs://192.168.19.4:9000/count/result"));
    job.waitForCompletion(true);
}

Custom Partitioner

Requirement: given the following data, partition by region and compute the traffic totals.

13877779999 bj zs 2145
13766668888 sh ls 1028
13766668888 sh ls 9987
13877779999 bj zs 5678
13544445555 sz ww 10577
13877779999 sh zs 2145
13766668888 sh ls 9987

FlowPartition

public class FlowPartition extends Partitioner<Text, FlowPojo> {
    @Override
    public int getPartition(Text text, FlowPojo flowPojo, int numPartitions) {
        // route records by region; every value returned here must stay below the reduce task count (4 in the App below)
        if (flowPojo.getAddr().equals("bj")) {
            return 0;
        } else if (flowPojo.getAddr().equals("sh")) {
            return 1;
        } else if (flowPojo.getAddr().equals("sz")) {
            return 2;
        } else {
            return 3;
        }
    }
}

mapper

public class FlowsMapper extends Mapper<LongWritable, Text,Text, FlowPojo> {
    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        String[] datas = value.toString().split(" ");
        FlowPojo flowPojo = new FlowPojo();
        flowPojo.setPhone(datas[0]);
        flowPojo.setAddr(datas[1]);
        flowPojo.setName(datas[2]);
        flowPojo.setFlow(Long.parseLong(datas[3]));
        context.write(new Text(flowPojo.getName()),flowPojo);
    }
}

reduce

public class FlowsReduce extends Reducer<Text, FlowPojo, Text,FlowPojo> {
    @Override
    protected void reduce(Text key, Iterable<FlowPojo> values, Context context) throws IOException, InterruptedException {
        FlowPojo flowPojo = new FlowPojo();
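        // note: Hadoop reuses the value instance across iterations of this loop,
        // so copy the field values out rather than keeping a reference to `value`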
        for (FlowPojo value : values) {
            flowPojo.setFlow(flowPojo.getFlow()+value.getFlow());
            flowPojo.setName(value.getName());
            flowPojo.setAddr(value.getAddr());
            flowPojo.setPhone(value.getPhone());
        }
        context.write(key,flowPojo);
    }
}

App

public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    Job job = Job.getInstance(conf);

    // set the driver class
    job.setJarByClass(App.class);
    // register the custom partitioner; the reduce task count must cover all four partitions it can return
    job.setPartitionerClass(FlowPartition.class);
    job.setNumReduceTasks(4);
    job.setMapperClass(FlowsMapper.class);
    job.setReducerClass(FlowsReduce.class);
    job.setMapOutputKeyClass(Text.class);
    job.setMapOutputValueClass(FlowPojo.class);
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(FlowPojo.class);

    // set the input and output paths
    FileInputFormat.setInputPaths(job, new Path("hdfs://192.168.19.4:9000/count"));
    FileOutputFormat.setOutputPath(job, new Path("hdfs://192.168.19.4:9000/count/result"));
    job.waitForCompletion(true);
}
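
A quick way to sanity-check the partitioner without launching a job is to call it directly. A minimal illustrative sketch (the class name PartitionCheck is an assumption; FlowPartition and FlowPojo are the classes above):

import org.apache.hadoop.io.Text;

public class PartitionCheck {
    public static void main(String[] args) {
        FlowPartition partition = new FlowPartition();
        // constructor order is (name, phone, addr, flow)
        FlowPojo pojo = new FlowPojo("zs", "13877779999", "bj", 2145);
        // args: key, value, number of reduce tasks; "bj" should land in partition 0
        System.out.println(partition.getPartition(new Text("zs"), pojo, 4)); // prints 0
    }
}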

Summing Across Multiple Files

Requirement: there are three score files, chinese.txt, english.txt, and math.txt. Compute each student's three-month total for every subject.

chinese.txt

1 zhang 89
2 zhang 73
3 zhang 67
1 wang 49
2 wang 83
3 wang 27

english.txt

1 zhang 55
2 zhang 69
3 zhang 75
1 wang 44
2 wang 64
3 wang 86

math.txt

1 zhang 85
2 zhang 59
3 zhang 95
1 wang 74
2 wang 67
3 wang 96

Create a StudentPojo class

public class StudentPojo implements Writable {

    private String name;
    private int math;
    private int chinese;
    private int english;

    public StudentPojo() {
    }

    public StudentPojo(String name, int math, int chinese, int english) {
        this.name = name;
        this.math = math;
        this.chinese = chinese;
        this.english = english;
    }

    @Override
    public String toString() {
        return "[" +
                "name:'" + name + ''' +
                ", math:" + math +
                ", chinese:" + chinese +
                ", english:" + english +
                ']';
    }

    public String getName() {
        return name;
    }

    public void setName(String name) {
        this.name = name;
    }

    public int getMath() {
        return math;
    }

    public void setMath(int math) {
        this.math = math;
    }

    public int getChinese() {
        return chinese;
    }

    public void setChinese(int chinese) {
        this.chinese = chinese;
    }

    public int getEnglish() {
        return english;
    }

    public void setEnglish(int english) {
        this.english = english;
    }

    @Override
    public void write(DataOutput dataOutput) throws IOException {
        dataOutput.writeUTF(name);
        dataOutput.writeInt(math);
        dataOutput.writeInt(chinese);
        dataOutput.writeInt(english);
    }

    @Override
    public void readFields(DataInput dataInput) throws IOException {
        this.name = dataInput.readUTF();
        this.math = dataInput.readInt();
        this.chinese = dataInput.readInt();
        this.english = dataInput.readInt();
    }
}

mapper

public class ScoreMapper extends Mapper<LongWritable, Text,Text, StudentPojo> {
    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        String[] datas = value.toString().split(" ");
        // ask the input split which file this record came from, to tell the subjects apart
        FileSplit fileSplit = (FileSplit) context.getInputSplit();
        String course = fileSplit.getPath().getName();
        StudentPojo studentPojo = new StudentPojo();
        studentPojo.setName(datas[1]);

        if ("math.txt".equals(course)){
            studentPojo.setMath(Integer.parseInt(datas[2]));
        }
        if ("chinese.txt".equals(course)){
            studentPojo.setChinese(Integer.parseInt(datas[2]));
        }
        if ("english.txt".equals(course)){
            studentPojo.setEnglish(Integer.parseInt(datas[2]));
        }
        context.write(new Text(datas[1]),studentPojo);
    }
}

reduce

public class ScoreReduce extends Reducer<Text, StudentPojo,Text,StudentPojo> {
    @Override
    protected void reduce(Text key, Iterable<StudentPojo> values, Context context) throws IOException, InterruptedException {
        StudentPojo studentPojo = new StudentPojo();
        studentPojo.setName(key.toString());
        for (StudentPojo value : values) {
            studentPojo.setMath(studentPojo.getMath()+value.getMath());
            studentPojo.setChinese(studentPojo.getChinese()+value.getChinese());
            studentPojo.setEnglish(studentPojo.getEnglish()+value.getEnglish());
        }
        context.write(key,studentPojo);
    }
}

App

public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    Job job = Job.getInstance(conf);

    // set the driver class
    job.setJarByClass(App.class);
    job.setMapperClass(ScoreMapper.class);
    job.setReducerClass(ScoreReduce.class);
    job.setMapOutputKeyClass(Text.class);
    job.setMapOutputValueClass(StudentPojo.class);
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(StudentPojo.class);

    // set the input and output paths
    FileInputFormat.setInputPaths(job, new Path("hdfs://192.168.19.4:9000/count"));
    FileOutputFormat.setOutputPath(job, new Path("hdfs://192.168.19.4:9000/count/result"));
    job.waitForCompletion(true);
}

Sorting

Requirement: given the following data, sort the movies by popularity in descending order.

惊天破 72
机械师2 83
奇异博士 67
但丁密码 79
比利林恩的中场战事 94
侠探杰克:永不回头 68
龙珠Z:复活的弗利萨 79
长城 56
夺路而逃 69 
神奇动物在哪里 57
驴得水 68
我不是潘金莲 56
你的名字 77
大闹天竺 96
捉迷藏 78
凶手还未睡 23
魔发精灵 68
勇士之门 35
罗曼蒂克消亡史 67
小明和他的小伙伴们 36

MoviePojo

public class MoviePojo implements WritableComparable<MoviePojo> {
    private String name;
    private int hot;

    public MoviePojo() {
    }

    public MoviePojo(String name, int hot) {
        this.name = name;
        this.hot = hot;
    }

    @Override
    public String toString() {
        return "name:'" + name + ''' +
                ", hot:" + hot ;
    }

    public String getName() {
        return name;
    }

    public void setName(String name) {
        this.name = name;
    }

    public int getHot() {
        return hot;
    }

    public void setHot(int hot) {
        this.hot = hot;
    }

    @Override
    public int compareTo(MoviePojo o) {
        // larger hot value sorts first (descending); Integer.compare avoids overflow from subtraction
        return Integer.compare(o.getHot(), this.hot);
    }

    @Override
    public void write(DataOutput dataOutput) throws IOException {
        dataOutput.writeUTF(this.name);
        dataOutput.writeInt(this.hot);
    }

    @Override
    public void readFields(DataInput dataInput) throws IOException {
        this.name = dataInput.readUTF();
        this.hot = dataInput.readInt();
    }
}
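
MapReduce sorts map output keys with the key class's compareTo during the shuffle, so no reducer is needed for the sort itself. A tiny illustrative check of the ordering (the class name CompareCheck is an assumption):

public class CompareCheck {
    public static void main(String[] args) {
        MoviePojo a = new MoviePojo("大闹天竺", 96);
        MoviePojo b = new MoviePojo("长城", 56);
        // a negative result means a sorts before b, i.e. descending by hot
        System.out.println(a.compareTo(b)); // negative
    }
}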

mapper

public class MovieMapper extends Mapper<LongWritable, Text,MoviePojo, NullWritable> {
    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        String[] datas = value.toString().split(" ");
        MoviePojo moviePojo = new MoviePojo();
        moviePojo.setName(datas[0]);
        moviePojo.setHot(Integer.parseInt(datas[1]));
        context.write(moviePojo,NullWritable.get());
    }
}

App

public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    Job job = Job.getInstance(conf);

    // set the driver class
    job.setJarByClass(App.class);
    job.setMapperClass(MovieMapper.class);
    // no reducer is set: the identity reducer runs, and the output comes out
    // sorted by MoviePojo's compareTo, which is applied during the shuffle
    job.setOutputKeyClass(MoviePojo.class);
    job.setOutputValueClass(NullWritable.class);

    // set the input and output paths
    FileInputFormat.setInputPaths(job, new Path("hdfs://192.168.19.4:9000/count"));
    FileOutputFormat.setOutputPath(job, new Path("hdfs://192.168.19.4:9000/count/result"));
    job.waitForCompletion(true);
}

Total Order Sort

Requirement: given the numbers below, process them with 3 reduce tasks so that the three result files, taken together, are globally ordered.

Values below 100 go to the first partition, values from 100 to 999 to the second, and values of 1000 and above to the third. Because every key in an earlier partition is smaller than every key in a later one, and each reducer's output is itself sorted by key, concatenating part-r-00000 through part-r-00002 yields a totally ordered result.

82 239 231
23 22 213
123 232 124
213 3434 232
4546 565 123
231 231
2334 231
1123 5656 657
12313 4324 213
123 2 232 32
343 123 4535
12321 3442 453
1233 342 453
1231 322 452
232 343 455
3123 3434 3242

TotalsortPartition

public class TotalsortPartition extends Partitioner<LongWritable, IntWritable> {
    @Override
    public int getPartition(LongWritable longWritable, IntWritable intWritable, int numPartitions) {
        // range partitioning: earlier partitions hold strictly smaller keys
        long val = longWritable.get();
        if (val < 100) {
            return 0;
        } else if (val < 1000) {
            return 1;
        } else {
            return 2;
        }
    }
}

mapper

public class TotalsortMapper extends Mapper<LongWritable, Text,LongWritable, IntWritable> {
    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        String[] datas = value.toString().split(" ");
        for (String data : datas) {
            context.write(new LongWritable(Long.valueOf(data)),new IntWritable(1));
        }
    }
}

reduce

public class TotalsortReduce extends Reducer<LongWritable, IntWritable,LongWritable,IntWritable> {
    @Override
    protected void reduce(LongWritable key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
        int count = 0;
        for (IntWritable value : values) {
            count+=value.get();
        }
        context.write(key,new IntWritable(count));
    }
}

App

public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    Job job = Job.getInstance(conf);

    // set the driver class
    job.setJarByClass(App.class);
    // the range partitioner plus three reduce tasks gives three output files that are globally ordered
    job.setPartitionerClass(TotalsortPartition.class);
    job.setNumReduceTasks(3);
    job.setMapperClass(TotalsortMapper.class);
    job.setReducerClass(TotalsortReduce.class);
    job.setMapOutputKeyClass(LongWritable.class);
    job.setMapOutputValueClass(IntWritable.class);
    job.setOutputKeyClass(LongWritable.class);
    job.setOutputValueClass(IntWritable.class);

    // set the input and output paths
    FileInputFormat.setInputPaths(job, new Path("hdfs://192.168.19.4:9000/count"));
    FileOutputFormat.setOutputPath(job, new Path("hdfs://192.168.19.4:9000/count/result"));
    job.waitForCompletion(true);
}