103_MapReduce编程框架

时间:2021-08-08
本文章向大家介绍103_MapReduce编程框架,主要包括103_MapReduce编程框架使用实例、应用技巧、基本知识点总结和需要注意事项,具有一定的参考价值,需要的朋友可以参考一下。

1 MapReduce思想

MapReduce思想在生活中处处可见。我们或多或少都曾接触过这种思想。MapReduce的思想核心是分而治之,充分利用了并行处理的优势。即使是发布过论文实现分布式计算的谷歌也只是实现了这种思想,而不是自己原创。

MapReduce任务过程是分为两个处理阶段:

  • Map阶段:Map阶段的主要作用是“分”,即把复杂的任务分解为若干个“简单的任务”来并行处理。Map阶段的这些任务可以并行计算,彼此间没有依赖关系。
  • Reduce阶段:Reduce阶段的主要作用是“合”,即对map阶段的结果进行全局汇总。

2 官方WordCount案例源码解析

WordCount.java

package org.apache.hadoop.examples;

import java.io.IOException;
import java.io.PrintStream;
import java.util.StringTokenizer;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Mapper.Context;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.Reducer.Context;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;

public class WordCount
{
  public static class TokenizerMapper
    extends Mapper<Object, Text, Text, IntWritable>
  {
    private static final IntWritable one = new IntWritable(1);
    private Text word = new Text();
    
    public void map(Object key, Text value, Mapper<Object, Text, Text, IntWritable>.Context context)
      throws IOException, InterruptedException
    {
      StringTokenizer itr = new StringTokenizer(value.toString());
      while (itr.hasMoreTokens())
      {
        this.word.set(itr.nextToken());
        context.write(this.word, one);
      }
    }
  }
  
  public static class IntSumReducer
    extends Reducer<Text, IntWritable, Text, IntWritable>
  {
    private IntWritable result = new IntWritable();
    
    public void reduce(Text key, Iterable<IntWritable> values, Reducer<Text, IntWritable, Text, IntWritable>.Context context)
      throws IOException, InterruptedException
    {
      int sum = 0;
      for (IntWritable val : values) {
        sum += val.get();
      }
      this.result.set(sum);
      context.write(key, this.result);
    }
  }
  
  public static void main(String[] args)
    throws Exception
  {
    Configuration conf = new Configuration();
    String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
    if (otherArgs.length < 2)
    {
      System.err.println("Usage: wordcount <in> [<in>...] <out>");
      System.exit(2);
    }
    Job job = Job.getInstance(conf, "word count");
    job.setJarByClass(WordCount.class);
    job.setMapperClass(TokenizerMapper.class);
    job.setCombinerClass(IntSumReducer.class);
    job.setReducerClass(IntSumReducer.class);
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(IntWritable.class);
    for (int i = 0; i < otherArgs.length - 1; i++) {
      FileInputFormat.addInputPath(job, new Path(otherArgs[i]));
    }
    FileOutputFormat.setOutputPath(job, new Path(otherArgs[(otherArgs.length - 1)]));
    
    System.exit(job.waitForCompletion(true) ? 0 : 1);
  }
}

经过查看分析官方WordCount案例源码,我们发现一个统计单词数量的MapReduce程序的代码由三个部分组成:

  • Mapper类
  • Reducer类
  • 运行作业的代码(Driver)

Mapper类继承了org.apache.hadoop.mapreduce.Mapper类并重写了其中的map方法,Reducer类继承了org.apache.hadoop.mapreduce.Reducer类并重写了其中的reduce方法。

重写的Map方法作用:map方法中的逻辑 是用户希望mr程序中 map阶段如何处理的逻辑。

重写的Reduce方法作用:reduce方法中的逻辑 是用户希望mr程序中 reduce阶段如何处理的逻辑。

2.1 Hadoop序列化

什么是Hadoop序列化?

Hadoop序列指:当我们通过网络通信传输数据时 或者 把对象持久化到文件时,需要把对象序列化成二进制的结构的过程。

观察源码时发现自定义Mapper类与自定义Reducer类都有泛型类型约束,比如自定义Mapper有四个形
参类型,但是形参类型并不是常见的java基本类型。

为什么Hadoop要选择建立自己的序列化格式而不使用java自带的serializable呢?

  • 序列化在分布式程序中非常重要,在Hadoop中,集群中多个节点的进程间的通信是通过RPC(远
    程过程调用:Remote Procedure Call)实现;RPC将消息序列化成二进制流发送到远程节点,远
    程节点再将接收到的二进制数据反序列化为原始的消息,因此RPC往往追求如下特点:
    • 紧凑:数据更紧凑,能充分利用网络带宽资源
    • 快速:序列化和反序列化的性能开销更低
  • Hadoop使用的是自己的序列化格式Writable,它比java的序列化serialization更紧凑且速度更快。一个对象使用Serializable序列化后,会携带很多额外信息比如校验信息、Header、继承体系等。

Java基本类型与Hadoop常用序列化类型:

Java基本类型 Hadoop Writable类型
boolean BooleanWritable
byte ByteWritable
int IntWritable
float FloatWritable
long LongWritable
double DoubleWritable
String Text
array ArrayWritable
map MapWritable

3 MapReduce编程规范及示例编写

3.1 Mapper类

  • 用户自定义一个Mapper类继承Hadoop的Mapper类
  • Mapper的输入数据是key-value对的形式(类型可以自定义)
  • Map阶段的业务逻辑定义在map()方法中
  • Mapper的输出数据是key-value对的形式(类型可以自定义)

注意:map()方法是对输入的一个key-value对调用一次!

3.2 Reducer类

  • 用户自定义Reducer类要继承Hadoop的Reducer类
  • Reducer的输入数据类型对应Mapper的输出数据类型(key-value对)
  • Reducer的业务逻辑写在reduce()方法中
  • Reduce()方法是对相同key的一组key-value对调用执行一次

3.3 Driver阶段

创建提交YARN集群运行的Job对象,其中封装了MapReduce程序运行所需要的相关参数入输入数据路径,输出数据路径等,也相当于是一个YARN集群的客户端,主要作用就是提交我们MapReduce程序运行。

3.4 WordCount代码实现

3.4.1 需求分析

给定一个文本文件,统计文件中每一个单词出现的总次数

输入数据:wc.txt

输出:

apache 2
clickhouse 2
hadoop 1
mapreduce 1
spark 2
xiaoming 1

3.4.2 具体步骤

按照MapReduce编程规范,分别编写Mapper,Reducer,Driver。

3.4.2.1 新建maven工程

  1. 导入hadoop依赖

pom.xml

    <dependencies>

        <dependency>
            <groupId>org.apache.logging.log4j</groupId>
            <artifactId>log4j-core</artifactId>
            <version>2.8.2</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-common</artifactId>
            <version>2.9.2</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-client</artifactId>
            <version>2.9.2</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-hdfs</artifactId>
            <version>2.9.2</version>
        </dependency>
    </dependencies>

    <!--maven打包插件 -->
    <build>
        <plugins>
            <plugin>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>2.3.2</version>
                <configuration>
                    <source>1.8</source>
                    <target>1.8</target>
                </configuration>
            </plugin>
            <plugin>
                <artifactId>maven-assembly-plugin </artifactId>
                <configuration>
                    <descriptorRefs>
                        <descriptorRef>jar-with-dependencies</descriptorRef>
                    </descriptorRefs>

                </configuration>
                <executions>
                    <execution>
                        <id>make-assembly</id>
                        <phase>package</phase>
                        <goals>
                            <goal>single</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>

注意:以上依赖第一次需要联网下载!

  1. 添加log4j.properties
log4j.rootLogger=INFO, stdout
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=%d %p [%c] - %m%n
log4j.appender.logfile=org.apache.log4j.FileAppender
log4j.appender.logfile.File=target/spring.log
log4j.appender.logfile.layout=org.apache.log4j.PatternLayout
log4j.appender.logfile.layout.ConversionPattern=%d %p [%c] - %m%n

3.4.2.2 整体思路梳理(仿照源码)

Map阶段:

  1. map()方法中把传入的数据转为String类型
  2. 根据空格切分出单词
  3. 输出<单词,1>

Reduce阶段:

  1. 汇总各个key(单词)的个数,遍历value数据进行累加
  2. 输出key的总数

Driver

  1. 获取配置文件对象,获取job对象实例
  2. 指定程序jar的本地路径
  3. 指定Mapper/Reducer类
  4. 指定Mapper输出的kv数据类型
  5. 指定最终输出的kv数据类型
  6. 指定job处理的原始数据路径
  7. 指定job输出结果路径
  8. 提交作业

3.4.2.3 编写Mapper类

WordCountMapper.java

package com.lagou.mr.wc;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

public class WordCountMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
    Text k = new Text();
    IntWritable v = new IntWritable(1);

    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        //1 获取第一行
        String line = value.toString();

        //2 切割
        String[] words = line.split(" ");

        //3 输出
        for (String word : words) {
            k.set(word);
            context.write(k, v);
        }
    }
}

3.4.2.4 编写Reducer类

WordcountReducer.java

package com.lagou.mr.wc;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

public class WordCountReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
    int sum;
    IntWritable v = new IntWritable();

    @Override
    protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
        //1 累加求和
        sum = 0;
        for (IntWritable count : values) {
            sum += count.get();
        }

        //2 输出
        v.set(sum);
        context.write(key, v);
    }
}

3.4.2.5 编写Driver驱动类

WordcountDriver.java

package com.lagou.mr.wc;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;

public class WordCountDriver {
    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        //1 获取配置信息以及封装任务
        Configuration configuration = new Configuration();
        Job job = Job.getInstance(configuration);

        //2 设置jar加载路径
        job.setJarByClass(WordCountDriver.class);

        //3 设置map和reduce类
        job.setMapperClass(WordCountMapper.class);
        job.setReducerClass(WordCountReducer.class);

        //4 设置map输出
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);

        //5 设置最终输出kv类型
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);

        //6 设置输入和输出路径
        FileInputFormat.setInputPaths(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        //7 提交
        boolean result = job.waitForCompletion(true);

        System.exit(result ? 0 : 1);
    }
}

3.4.2.6 运行任务

  1. 本地模式

在program arguments设置参数

运行结束,去输出结果路径查看结果

注意本地idea运行mr任务与集群没有任何关系,没有提交任务到yarn集群,是在本地使用多线程
方式模拟的mr的运行。

  1. Yarn集群模式
  • (1)把程序打成jar包,改名为wc.jar,上传到Hadoop集群

找到Maven的默认配置文件路径

在settings.xml文件中配置阿里云的镜像

<?xml version="1.0" encoding="UTF-8"?>
<settings xmlns="http://maven.apache.org/SETTINGS/1.0.0"
          xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
          xsi:schemaLocation="http://maven.apache.org/SETTINGS/1.0.0 http://maven.apache.org/xsd/settings-1.0.0.xsd">

    <!-- 配置本地仓库地址 -->
    <localRepository>C:\Users\海潮明月\.m2\repository</localRepository>

    <!-- 配置下载Jar包的镜像仓库地址 -->
    <mirrors>

        <mirror>
            <id>alimaven</id>
            <name>aliyun maven</name>
            <url>http://maven.aliyun.com/nexus/content/groups/public/</url>
            <mirrorOf>central</mirrorOf>
        </mirror>

        <mirror>
            <id>uk</id>
            <name>Human Readable Name for this Mirror.</name>
            <url>http://uk.maven.org/maven2/</url>
            <mirrorOf>central</mirrorOf>
        </mirror>

        <mirror>
            <id>nexus</id>
            <name>internal nexus repository</name>
            <url>http://repo.maven.apache.org/maven2</url>
            <mirrorOf>central</mirrorOf>
        </mirror>
    </mirrors>

    <profiles>
        <!-- 下载源代码和Javadoc -->
        <profile>
            <id>downloadSources</id>
            <properties>
                <downloadSources>true</downloadSources>
                <downloadJavadocs>true</downloadJavadocs>
            </properties>
        </profile>
    </profiles>

    <!-- 激活在profiles的配置项 -->
    <activeProfiles>
        <activeProfile>downloadSources</activeProfile>
    </activeProfiles>

</settings>

settings.xml配置好以后,如果发现Maven的Plugins缺少包,可以重新导一下包

随后先清理一下

最后执行打包操作

打包成功

jar包的路径在这里

选择合适的jar包上传到服务器本地(如:linux121)

将原始数据文件上传导hdfs

  • (2)使用Hadoop 命令提交任务运行

    hadoop jar wc.jar com.lagou.mr.wc.WordCountDriver /wc_input/wc.txt /wc_output
    

Yarn集群任务运行成功展示图

4 序列化Writable接口

基本序列化类型往往不能满足所有需求,比如在Hadoop框架内部传递一个自定义bean对象,那么该对
象就需要实现Writable序列化接口。

4.1 实现Writable序列化步骤如下

  1. 必须实现Writable接口

  2. 反序列化时,需要反射调用空参构造函数,所以必须有空参构造

    public CustomBean() {
      super();
    }
    
  3. 重写序列化方法

    @Override
    public void write(DataOutput out) throws IOException {
        ....
    }
    
  4. 重写反序列化方法

    @Override
    public void readFields(DataInput in) throws IOException {
        ....
    }
    
  5. 反序列化的字段顺序和序列化字段的顺序必须完全一致

  6. 为了方便展示结果数据,需要重写bean对象的toString()方法,可以自定义分隔符

  7. 如果自定义Bean对象需要放在Mapper输出KV中的K,则该对象还需实现Comparable接口,因为MapReduce框中的Shuffle过程要求对key必须能排序!!

    @Override
    public int compareTo(CustomBean o) {
        // 自定义排序规则
        return this.num > o.getNum() ? -1 : 1;
    }
    

4.2 Writable接口案例需求

4.2.1 需求分析

统计每台智能音箱设备内容播放时长。

原始日志格式:

001   001577c3  kar_890809     120.196.100.99     1116             954             200
日志id  设备id  appkey(合作硬件厂商)  网络ip        自有内容时长(秒)  第三方内容时长(秒)   网络状态码

输出结果:

001577c3     11160            9540         20700
设备id    自有内容时长(秒)  第三方内容时长(秒)   总时长

4.2.2 编写MapReduce程序

  1. 创建SpeakBean对象

    SpeakBean.java

    package com.lagou.mr.speak;
    
    import org.apache.hadoop.io.Writable;
    
    import java.io.DataInput;
    import java.io.DataOutput;
    import java.io.IOException;
    
    /**
     * SpeakBean是一个自定义类型,
     * 目的是封装map输出kv中value的类型,
     * 需要实现Writable序列化接口
     */
    public class SpeakBean implements Writable {
        //定义属性
        private Long selfDuration;      //自由内容时长
        private Long thirdPartDuration; //第三方内容时长
        private String deviceId;        //设备id
        private Long sumDuration;       //总时长(=自由内容时长+第三方内容时长)
    
        //准备一个空参构造方法
        public SpeakBean() {
        }
    
        //准备一个有参构造方法
        public SpeakBean(Long selfDuration, Long thirdPartDuration, String deviceId) {
            this.selfDuration = selfDuration;
            this.thirdPartDuration = thirdPartDuration;
            this.deviceId = deviceId;
            this.sumDuration = this.selfDuration + this.thirdPartDuration; //总时长=自由内容时长+第三方内容时长
        }
    
        /**
         * 序列化方法:将目标数据封装到dataOutput中
         *
         * @param dataOutput
         * @throws IOException
         */
        @Override
        public void write(DataOutput dataOutput) throws IOException {
            dataOutput.writeLong(selfDuration);      //long类型用writeLong方法封装
            dataOutput.writeLong(thirdPartDuration); //long类型用writeLong方法封装
            dataOutput.writeUTF(deviceId);           //String类型用writeUTF方法封装
            dataOutput.writeLong(sumDuration);       //long类型用writeLong方法封装
        }
    
        /**
         * 反序列化方法:通过dataInput读取序列化后的目标数据
         *
         * @param dataInput
         * @throws IOException
         */
        @Override
        public void readFields(DataInput dataInput) throws IOException {
            this.selfDuration = dataInput.readLong();      //long类型用readLong方法读取
            this.thirdPartDuration = dataInput.readLong(); //long类型用readLong方法读取
            this.deviceId = dataInput.readUTF();           //String类型用readUTF方法读取
            this.sumDuration = dataInput.readLong();       //long类型用readLong方法读取
        }
    
        public Long getSelfDuration() {
            return selfDuration;
        }
    
        public void setSelfDuration(Long selfDuration) {
            this.selfDuration = selfDuration;
        }
    
        public Long getThirdPartDuration() {
            return thirdPartDuration;
        }
    
        public void setThirdPartDuration(Long thirdPartDuration) {
            this.thirdPartDuration = thirdPartDuration;
        }
    
        public String getDeviceId() {
            return deviceId;
        }
    
        public void setDeviceId(String deviceId) {
            this.deviceId = deviceId;
        }
    
        public Long getSumDuration() {
            return sumDuration;
        }
    
        public void setSumDuration(Long sumDuration) {
            this.sumDuration = sumDuration;
        }
    
        /**
         * 为了方便查看,需重写toString方法:用制表符分隔每一列数据
         *
         * @return 返回SpeakBean对象的私有属性
         */
        @Override
        public String toString() {
            return selfDuration + "\t"
                    + thirdPartDuration + "\t"
                    + deviceId + "\t"
                    + sumDuration;
        }
    }
    
  2. 编写Mapper类

    SpeakMapper.java

    package com.lagou.mr.speak;
    
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Mapper;
    
    import java.io.IOException;
    
    /**
     * 一共四个参数:分为两对kv
     * 第一对kv是map输入参数的kv类型:k代指一行文本的偏移量,v代指一行文本内容
     * 第二对kv是map输出参数的kv类型:k代指map输出的key类型,v代指map输出的value类型
     */
    public class SpeakMapper extends Mapper<LongWritable, Text, Text, SpeakBean> {
        /*
        1 转换接收到的text数据为String类型
        2 按照制表符进行切分:自由内容时长 第三方内容时长 设备id  -->  将目标数据封装为自定义类型SpeakBean
        3 直接输出:k-->设备id  value-->SpeakBean对象
         */
        Text device_id = new Text();
    
        @Override
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            //1 转换接收到的text数据为String类型
            final String line = value.toString();
    
            //2 按照制表符进行切分,获取目标数据:自由内容时长 第三方内容时长 设备id
            final String[] fields = line.split("\t");      //切分输入的整行数据
            String selfDuration = fields[fields.length - 3];     //自由内容时长
            String thirdPartDuration = fields[fields.length - 2];//第三方内容时长
            String deviceId = fields[1];                         //设备id
            //2.1 将目标数据封装到自定义Bean对象中
            final SpeakBean speakBean = new SpeakBean(Long.parseLong(selfDuration), Long.parseLong(thirdPartDuration), deviceId);
    
            //3 直接输出:k-->设备id  value-->SpeakBean对象
            device_id.set(deviceId);
            context.write(device_id, speakBean);
        }
    }
    
  3. 编写Reducer

    SpeakReducer.java

    package com.lagou.mr.speak;
    
    import org.apache.hadoop.mapreduce.Reducer;
    
    import javax.xml.soap.Text;
    import java.io.IOException;
    
    public class SpeakReducer extends Reducer<Text, SpeakBean, Text, SpeakBean> {
        @Override
        protected void reduce(Text key, Iterable<SpeakBean> values, Context context) throws IOException, InterruptedException {
            //定义时长累加的初始值
            Long self_duration = 0L;
            Long third_part_duration = 0L;
    
            //reduce方法的key:map输出的某一个key
            //reduce方法的value:map输出的kv对中,key相同的value组成的一个集合
            //reduce逻辑:遍历迭代器累加时长,得到各自的总时长
            for (SpeakBean speakBean : values) {
                final Long selfDuration = speakBean.getSelfDuration();
                final Long thirdPartDuration = speakBean.getThirdPartDuration();
                self_duration += selfDuration;
                third_part_duration += third_part_duration;
            }
    
            //将目标数据封装成SpeakBean对象进行输出
            final SpeakBean bean = new SpeakBean(self_duration, third_part_duration, key.toString());
            context.write(key, bean);
        }
    }
    
  4. 编写驱动

    SpeakDriver.java

    package com.lagou.mr.speak;
    
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    
    import java.io.IOException;
    
    public class SpeakDriver {
        public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
            final Configuration conf = new Configuration();
            final Job job = Job.getInstance(conf, "SpeakDriver");
    
            //设置jar包本地路径
            job.setJarByClass(SpeakDriver.class);
    
            //使用的mapper和reducer
            job.setMapperClass(SpeakMapper.class);
            job.setReducerClass(SpeakReducer.class);
    
            //map的输出kv类型
            job.setMapOutputKeyClass(Text.class);
            job.setMapOutputValueClass(SpeakBean.class);
    
            //设置reduce输出
            job.setOutputKeyClass(Text.class);
            job.setOutputValueClass(SpeakBean.class);
    
            //读取的数据路径
            FileInputFormat.setInputPaths(job, new Path(args[0]));
            FileOutputFormat.setOutputPath(job, new Path(args[1]));
    
            //提交任务
            final boolean flag = job.waitForCompletion(true);
            System.exit(flag ? 0 : 1);
        }
    }
    

mr编程技巧总结

  • 结合业务设计Map输出的key和v,利用key相同则去往同一个reduce的特点
  • map()方法中获取到只是一行文本数据尽量不做聚合运算
  • reduce()方法的参数要清楚含义

5 MapReduce原理分析

5.1 MapTask运行机制详解

MapTask流程

详细步骤:

  1. 首先,读取数据组件InputFormat(默认TextInputFormat)会通过getSplits方法对输入目录中文件进行逻辑切片规划得到splits,有多少个split就对应启动多少个MapTask。split与block的对应关系默认是一对一。

  2. 将输入文件切分为splits之后,由RecordReader对象(默认LineRecordReader)进行读取,以\n作为分隔符,读取一行数据,返回<key,value>。Key表示每行首字符偏移值,value表示这一行文本内容。

  3. 读取split返回<key,value>,进入用户自己继承的Mapper类中,执行用户重写的map函数。RecordReader读取一行这里调用一次。

  4. map逻辑执行完之后,将map的每条结果通过context.write进行collect数据收集。在collect中,会先对其进行分区处理,默认使用HashPartitioner。

  • MapReduce提供的Partitioner接口,它的作用就是根据key或value及reduce的数量来决定当前的这对输出数据最终应该交由哪个reduce task处理。默认对key进行hash后再以reduce task数量取余。默认的取余方式只是为了平均reduce的处理能力,如果用户自己对Partitioner有需求,可以订制并设置到job上。
  1. 接下来,会将数据写入内存,内存中这片区域叫做环形缓冲区,缓冲区的作用是批量收集map结果,减少磁盘IO的影响。我们的key/value对以及Partition的结果都会被写入缓冲区。当然写入之前,key与value值都会被序列化成字节数组。
  • 环形缓冲区其实是一个数组,数组中存放着key、value的序列化数据和key、value的元数据信息,包括partition、key的起始位置、value的起始位置以及value的长度。环形结构是一个抽象概念。

  • 缓冲区是有大小限制,默认是100MB。当map task的输出结果很多时,就可能会撑爆内存,所以需要在一定条件下将缓冲区中的数据临时写入磁盘,然后重新利用这块缓冲区。这个从内存往磁盘写数据的过程被称为Spill,中文可译为溢写。这个溢写是由单独线程来完成,不影响往缓冲区写map结果的线程。溢写线程启动时不应该阻止map的结果输出,所以整个缓冲区有个溢写的比例spill.percent。这个比例默认是0.8,也就是当缓冲区的数据已经达到阈值(buffer size * spillpercent = 100MB * 0.8 = 80MB),溢写线程启动,锁定这80MB的内存,执行溢写过程。Map task的输出结果还可以往剩下的20MB内存中写,互不影响。

  1. 当溢写线程启动后,需要对这80MB空间内的key做排序(Sort)。排序是MapReduce模型默认的行为

    • 如果job设置过Combiner,那么现在就是使用Combiner的时候了。将有相同key的key/value对的value加起来,减少溢写到磁盘的数据量。Combiner会优化MapReduce的中间结果,所以它在整个模型中会多次使用。
    • 那哪些场景才能使用Combiner呢?从这里分析,Combiner的输出是Reducer的输入,Combiner绝不能改变最终的计算结果。Combiner只应该用于那种Reduce的输入key/value与输出key/value类型完全一致,且不影响最终结果的场景。比如累加,最大值等。Combiner的使用一定得慎重,如果用好,它对job执行效率有帮助,反之会影响reduce的最终结果。
  2. 合并溢写文件:每次溢写会在磁盘上生成一个临时文件(写之前判断是否有combiner),如果map的输出结果真的很大,有多次这样的溢写发生,磁盘上相应的就会有多个临时文件存在。当整个数据处理结束之后开始对磁盘中的临时文件进行merge合并,因为最终的文件只有一个,写入磁盘,并且为这个文件提供了一个索引文件,以记录每个reduce对应数据的偏移量。

至此map整个阶段结束!

MapTask的一些配置:

官方参考地址:https://hadoop.apache.org/docs/r2.9.2/hadoop-mapreduce-client/hadoop-mapreduce-client-core/mapred-default.xml

5.2 MapTask的并行度

  1. MapTask并行度思考

    MapTask的并行度决定Map阶段的任务处理并发度,从而影响到整个Job的处理速度。

    思考:MapTask并行任务是否越多越好呢?哪些因素影响了MapTask并行度?

  2. MapTask并行度决定机制

    • 数据块:Block是HDFS物理上把数据分成一块一块。
    • 切片:数据切片只是在逻辑上对输入进行分片,并不会在磁盘上将其切分成片进行存储。

5.2.1 切片机制源码阅读

切片机制默认就是128M

问:MapTask并行度是不是越多越好呢?

答:不是,如果一个文件仅仅比128M大一点点也被当成一个split来对待,而不是多个split。

MR框架在并行运算的同时也会消耗更多资源,并行度越高资源消耗也越高,假设129M文件分为两个分
片,一个是128M,一个是1M;

对于1M的切片的Map task来说,太浪费资源。

问:129M的文件在Hdfs存储的时候会不会切成两块?

答:在hdfs存储时会将129M的文件切分为两个block块128+1进行存储

5.3 ReduceTask 工作机制

Reduce大致分为copy、sort、reduce三个阶段,重点在前两个阶段。copy阶段包含一个eventFetcher来获取已完成的map列表,由Fetcher线程去copy数据,在此过程中会启动两个merge线程,分别为inMemoryMergeronDiskMerger,分别将内存中的数据merge到磁盘和将磁盘中的数据进行merge。待数据copy完成之后,copy阶段就完成了,开始进行sort阶段,sort阶段主要是执行finalMerge操作,是纯粹的sort阶段,sort完成之后就是reduce阶段,调用用户定义的reduce函数进行处理。

详细步骤:

  • Copy阶段,简单地拉取数据。Reduce进程启动一些数据copy线程(Fetcher),通过HTTP方式请求maptask获取属于自己的文件。
  • Merge阶段。这里的merge如map端的merge动作,只是数组中存放的是不同map端copy来的数值。Copy过来的数据会先放入内存缓冲区中,这里的缓冲区大小要比map端的更为灵活。merge有三种形式:内存到内存;内存到磁盘;磁盘到磁盘。默认情况下第一种形式不启用。当内存中的数据量到达一定阈值,就启动内存到磁盘的merge。与map 端类似,这也是溢写的过程,这个过程中如果你设置有Combiner,也是会启用的,然后在磁盘中生成了众多的溢写文件。第二种merge方式一直在运行,直到没有map端的数据时才结束,然后启动第三种磁盘到磁盘的merge方式生成最终的文件。
  • 合并排序。把分散的数据合并成一个大的数据后,还会再对合并后的数据排序。
  • 对排序后的键值对调用reduce方法,键相等的键值对调用一次reduce方法,每次调用会产生零个或者多个键值对,最后把这些输出的键值对写入到HDFS文件中。

5.4 ReduceTask并行度

ReduceTask的并行度同样影响整个Job的执行并发度和执行效率,但与MapTask的并发数由切片数决定不同,ReduceTask数量的决定是可以直接手动设置:

// 默认值是1,手动设置为4
job.setNumReduceTasks(4);

注意事项:

  1. ReduceTask=0,表示没有Reduce阶段,输出文件数和MapTask数量保持一致;
  2. ReduceTask数量不设置默认就是一个,输出文件数量为1个;
  3. 如果数据分布不均匀,可能在Reduce阶段产生倾斜;

5.5 Shuffle机制

map阶段处理的数据如何传递给reduce阶段,是MapReduce框架中最关键的一个流程,这个流程就叫
shuffle。

shuffle:洗牌、发牌——(核心机制:数据分区,排序,分组,combine,合并等过程)

5.5.1 MapReduce的分区与reduceTask的数量

在MapReduce中,通过我们指定分区,会将同一个分区的数据发送到同一个reduce当中进行处理(默认是key相同去往同个分区),例如我们为了数据的统计,我们可以把一批类似的数据发送到同一个reduce当中去,在同一个reduce当中统计相同类型的数据,

如何才能保证相同key的数据去往同个reduce呢?只需要保证相同key的数据分发到同个分区即可。结合以上原理分析我们知道MR程序shuffle机制默认就是这种规则!!

1 分区源码

翻阅源码验证以上规则,MR程序默认使用的HashPartitioner,保证了相同的key去往同个分区!!

2 自定义分区

实际生产中需求变化多端,默认分区规则往往不能满足需求,需要结合业务逻辑来灵活控制分区规则以
及分区数量!!

如何制定自己需要的分区规则?

具体步骤:

  1. 自定义类继承Partitioner,重写getPartition()方法
  2. 在Driver驱动中,指定使用自定义Partitioner
  3. 在Driver驱动中,要根据自定义Partitioner的逻辑设置相应数量的ReduceTask数量

实例:按照不同的appkey把记录输出到不同的分区中

  • 原始日志格式:
001   001577c3  kar_890809     120.196.100.99     1116             954             200
日志id  设备id  appkey(合作硬件厂商)  网络ip        自有内容时长(秒)  第三方内容时长(秒)   网络状态码
  • 输出结果:
根据appkey把不同厂商的日志数据分别输出到不同的文件中
  • 需求分析:

    • 面对业务需求,结合mr的特点,来设计map输出的kv,以及reduce输出的kv数据。
    • 一个ReduceTask对应一个输出文件,因为在shuffle机制中每个reduceTask拉取的都是某一个分区的数
      据,一个分区对应一个输出文件。
    • 结合appkey的前缀相同的特点,同时不能使用默认分区规则,而是使用自定义分区器,只要appkey前
      缀相同则数据进入同个分区。
  • 整体思路:

    • Mapper
      1. 读取一行文本,按照制表符切分
      2. 解析出appkey字段,其余数据封装为PartitionBean对象(实现序列化Writable接口)
      3. 设计map()输出的kv,key-->appkey(依靠该字段完成分区),PartitionBean对象作为Value输出
    • Partition
      • 自定义分区器,实现按照appkey字段的前缀来区分所属分区
    • Reduce
      1. reduce()正常输出即可,无需进行聚合操作
    • Driver
      1. 在原先设置job属性的同时增加设置使用自定义分区器
      2. 注意设置ReduceTask的数量(与分区数量保持一致)

PartitionBean.java

package com.lagou.mr.partition;

import org.apache.hadoop.io.Writable;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

public class PartitionBean implements Writable {
    private String id;              //日志id
    private String deviceId;        //设备id
    private String appkey;          //appkey厂商id
    private String ip;              //ip地址
    private Long selfDuration;      //自有内容播放时长
    private Long thirdPartDuration; //第三方内容时长
    private String status;          //状态码

    public PartitionBean() {
    }

    public PartitionBean(String id, String deviceId, String appkey, String ip, Long selfDuration, Long thirdPartDuration, String status) {
        this.id = id;
        this.deviceId = deviceId;
        this.appkey = appkey;
        this.ip = ip;
        this.selfDuration = selfDuration;
        this.thirdPartDuration = thirdPartDuration;
        this.status = status;
    }

    //序列化方法
    @Override
    public void write(DataOutput dataOutput) throws IOException {
        dataOutput.writeUTF(id);
        dataOutput.writeUTF(deviceId);
        dataOutput.writeUTF(appkey);
        dataOutput.writeUTF(ip);
        dataOutput.writeLong(selfDuration);
        dataOutput.writeLong(thirdPartDuration);
        dataOutput.writeUTF(status);
    }

    //反序列化方法
    @Override
    public void readFields(DataInput dataInput) throws IOException {
        this.id = dataInput.readUTF();
        this.deviceId = dataInput.readUTF();
        this.appkey = dataInput.readUTF();
        this.ip = dataInput.readUTF();
        this.selfDuration = dataInput.readLong();
        this.thirdPartDuration = dataInput.readLong();
        this.status = dataInput.readUTF();
    }

    public String getId() {
        return id;
    }

    public void setId(String id) {
        this.id = id;
    }

    public String getDeviceId() {
        return deviceId;
    }

    public void setDeviceId(String deviceId) {
        this.deviceId = deviceId;
    }

    public String getAppkey() {
        return appkey;
    }

    public void setAppkey(String appkey) {
        this.appkey = appkey;
    }

    public String getIp() {
        return ip;
    }

    public void setIp(String ip) {
        this.ip = ip;
    }

    public Long getSelfDuration() {
        return selfDuration;
    }

    public void setSelfDuration(Long selfDuration) {
        this.selfDuration = selfDuration;
    }

    public Long getThirdPartDuration() {
        return thirdPartDuration;
    }

    public void setThirdPartDuration(Long thirdPartDuration) {
        this.thirdPartDuration = thirdPartDuration;
    }

    public String getStatus() {
        return status;
    }

    public void setStatus(String status) {
        this.status = status;
    }

    @Override
    public String toString() {
        return id + '\t'
                + deviceId + '\t'
                + appkey + '\t'
                + ip + '\t'
                + selfDuration + '\t'
                + thirdPartDuration + '\t'
                + status;
    }

}

PartitionMapper.java

package com.lagou.mr.partition;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

public class PartitionMapper extends Mapper<LongWritable, Text, Text, PartitionBean> {
    final PartitionBean bean = new PartitionBean();
    final Text k = new Text();

    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        final String[] fields = value.toString().split("\t");
        String appkey = fields[2];

        bean.setId(fields[0]);
        bean.setDeviceId(fields[1]);
        bean.setAppkey(fields[2]);
        bean.setIp(fields[3]);
        bean.setSelfDuration(Long.parseLong(fields[4]));
        bean.setThirdPartDuration(Long.parseLong(fields[5]));
        bean.setStatus(fields[6]);

        k.set(appkey);
        context.write(k, bean);
    }
}

CustomPartitioner.java

package com.lagou.mr.partition;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Partitioner;

public class CustomPartitioner extends Partitioner<Text, PartitionBean> {
    @Override
    public int getPartition(Text text, PartitionBean partitionBean, int countOfPartitions) {
        int partition = 0;

        if (text.toString().equals("kar")) {
            partition = 0;
        } else if (text.toString().equals("pandora")) {
            partition = 1;
        } else {
            partition = 2;
        }
        return partition;
    }
}

PartitionReducer.java

package com.lagou.mr.partition;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

public class PartitionReducer extends Reducer<Text, PartitionBean, Text, PartitionBean> {
    @Override
    protected void reduce(Text key, Iterable<PartitionBean> values, Context context) throws IOException, InterruptedException {
        //无需进行聚合运算,直接输出即可
        for (PartitionBean bean : values) {
            context.write(key, bean);
        }
    }
}

PartitionDriver.java

package com.lagou.mr.partition;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;

public class PartitionDriver {
    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        //1 获取配置文件
        final Configuration configuration = new Configuration();
        //2 获取job实例
        final Job job = Job.getInstance(configuration);

        //3 设置任务相关参数
        job.setJarByClass(PartitionDriver.class);
        job.setMapperClass(PartitionMapper.class);
        job.setReducerClass(PartitionReducer.class);

        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(PartitionBean.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(PartitionBean.class);

        //4 设置使用自定义分区器
        job.setPartitionerClass(CustomPartitioner.class);

        //5 指定reducetask的数量与分区数量保持一致,分区数量是3
        job.setNumReduceTasks(3);//reducetask不设置默认是1个

        //6 指定输入和输出数据路径
        FileInputFormat.setInputPaths(job, new Path("H:\\hadoop\\learningCode\\mapreduce\\wordcount\\input\\speak.data"));
        FileOutputFormat.setOutputPath(job, new Path("H:\\hadoop\\learningCode\\mapreduce\\wordcount\\output\\partition\\out"));

        //7 提交任务
        final boolean flag = job.waitForCompletion(true);

        System.exit(flag ? 0 : 1);
    }
}

5.5.2 MapReduce中的Combiner

combiner运行机制:

  1. Combiner是MR程序中Mapper和Reducer之外的一种组件
  2. Combiner组件的父类就是Reducer
  3. Combiner和reducer的区别在于运行的位置
  4. Combiner是在每一个maptask所在的节点运行;
  5. Combiner的意义就是对每一个maptask的输出进行局部汇总,以减小网络传输量。
  6. Combiner能够应用的前提是不能影响最终的业务逻辑,此外,Combiner的输出kv应该跟reducer的输入kv类型要对应起来。
#### 举例说明:
假设一个计算平均值的MR任务
Map阶段:2个MapTask
	MapTask1输出数据:10,5,15 如果使用Combiner:(10+5+15)/3=10
	MapTask2输出数据:2,6     如果使用Combiner:(2+6)/2=4
Reduce阶段汇总:(10+4/2=7
而正确结果应该是:(10+5+15+2+6/5=7.6

## 所以Combiner不能用于计算平均值
  • 自定义Combiner实现步骤
    • 自定义一个Combiner继承Reducer,重写Reduce方法
    • 在驱动(Driver)设置使用Combiner(默认是不适用Combiner组件)

1 改造WordCount程序

WordCountCombiner.java

package com.lagou.mr.wc;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

public class WordCountCombiner extends Reducer<Text, IntWritable, NullWritable, IntWritable> {
    final IntWritable total = new IntWritable();

    @Override
    protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
        int num = 0;
        //进行局部汇总,逻辑与reduce方法保持一致
        for (IntWritable value : values) {
            final int i = value.get();
            num += 1;
        }

        total.set(num);
        context.write(NullWritable.get(), total);
    }
}

WordCountDriver.java

在驱动(Driver)设置使用Combiner:

job.setCombinerClass(WordcountCombiner.class);

验证结果:

观察程序运行日志

如果直接使用WordCountReducer作为Combiner使用是否可以?

直接使用Reducer作为Combiner组件来使用是可以的!

5.6 MapReduce中的排序

排序是MapReduce框架中最重要的操作之一。

MapTask和ReduceTask均会对数据按照key进行排序。该操作属于Hadoop的默认行为。任何应用程序中的数据均会被排序,而不管逻辑上是否需要。默认排序是按照字典顺序排序,且实现该排序的方法是快速排序。

  • MapTask
    • 它会将处理的结果暂时放到环形缓冲区中,当环形缓冲区使用率达到阈值后,再对缓冲区中的数据进行一次快速排序,并将这些有序数据溢写到磁盘上,
    • 溢写完毕后,它会对磁盘上所有文件进行归并排序。
  • ReduceTask
    • 当所有数据拷贝完毕后,ReduceTask统一对内存和磁盘上的所有数据进行一次归并排序。
  1. 部分排序

    MapReduce根据输入记录的键对数据集排序。保证输出的每个文件内部有序。

  2. 全排序

    最终输出结果只有一个文件,且文件内部有序。实现方式是只设置一个ReduceTask。但该方法在处理大型文件时效率极低,因为一台机器处理所有文件,完全丧失了MapReduce所提供的并行架构。

  3. 辅助排序:( GroupingComparator分组)
    在Reduce端对key进行分组。应用于:在接收的key为bean对象时,想让一个或几个字段相同(全部字段比较不相同)的key进入到同一个reduce方法时,可以采用分组排序。

  4. 二次排序.
    在自定义排序过程中,如果compareTo中的判断条件为两个即为二次排序。

5.6.1 WritableComparable

Bean对象如果作为Map输出的key时,需要实现WritableComparable接口并重写compareTo方法指定
排序规则

1 全排序

基于统计的播放时长案例的输出结果对总时长进行排序

实现全局排序只能设置一个ReduceTask!!

播放时长案例输出结果:

00fdaf3 33180 33420 00fdaf3 66600
00wersa4 30689 35191 00wersa4 65880
0a0fe2 43085 44254 0a0fe2 87339
0ad0s7 31702 29183 0ad0s7 60885
0sfs01 31883 29101 0sfs01 60984
a00df6s 33239 36882 a00df6s 70121
adfd00fd5 30727 31491 adfd00fd5 62218

需求分析

如何设计map()方法输出的key,value

MR框架中shuffle阶段的排序是默认行为,不管你是否需要都会进行排序。

key:把所有字段封装成为一个bean对象,并且指定bean对象作为key输出,如果作为key输出,需要实现排序接口,指定自己的排序规则;

具体步骤:

  • Mapper

    1. 读取结果文件,按照制表符进行切分
    2. 解析出相应字段封装为SpeakBean
    3. SpeakBean实现WritableComparable接口重写compareTo方法
    4. map()方法输出kv;key-->SpeakBean,value-->NullWritable.get()
  • Reducer

    1. 循环遍历输出

SpeakBeanSort.java

package com.lagou.mr.sort;

import org.apache.hadoop.io.WritableComparable;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.util.Objects;

public class SpeakBeanSort implements WritableComparable<SpeakBeanSort> {
    //定义属性
    private Long selfDrutation;     //自有内容播放时长
    private Long thirdPartDuration; //第三方内容播放时长
    private String deviceId;        //设备id
    private Long sumDuration;       //总时长

    public SpeakBeanSort() {
    }

    public SpeakBeanSort(Long selfDrutation, Long thirdPartDuration, String deviceId, Long sumDuration) {
        this.selfDrutation = selfDrutation;
        this.thirdPartDuration = thirdPartDuration;
        this.deviceId = deviceId;
        this.sumDuration = sumDuration;
    }

    public Long getSelfDrutation() {
        return selfDrutation;
    }

    public void setSelfDrutation(Long selfDrutation) {
        this.selfDrutation = selfDrutation;
    }

    public Long getThirdPartDuration() {
        return thirdPartDuration;
    }

    public void setThirdPartDuration(Long thirdPartDuration) {
        this.thirdPartDuration = thirdPartDuration;
    }

    public String getDeviceId() {
        return deviceId;
    }

    public void setDeviceId(String deviceId) {
        this.deviceId = deviceId;
    }

    public Long getSumDuration() {
        return sumDuration;
    }

    public void setSumDuration(Long sumDuration) {
        this.sumDuration = sumDuration;
    }

    //序列化方法
    @Override
    public void write(DataOutput dataOutput) throws IOException {
        dataOutput.writeLong(selfDrutation);
        dataOutput.writeLong(thirdPartDuration);
        dataOutput.writeUTF(deviceId);
        dataOutput.writeLong(sumDuration);
    }

    //反序列化方法
    @Override
    public void readFields(DataInput dataInput) throws IOException {
        this.selfDrutation = dataInput.readLong();
        this.thirdPartDuration = dataInput.readLong();
        this.deviceId = dataInput.readUTF();
        this.sumDuration = dataInput.readLong();
    }

    /**
     * 指定排序规则,我们希望按照总时长进行排序
     *
     * @param o
     * @return 返回值有三种:0相等 1小于 -1大于
     */
    @Override
    public int compareTo(SpeakBeanSort o) {
        System.out.println("compareTo 方法执行了...");
        //指定按照bean对象的总时长字段的值进行比较
        if (this.sumDuration > o.sumDuration) {
            return -1;
        } else if (this.sumDuration < o.sumDuration) {
            return 1;
        } else {
            return 0; //如果相等,可以在这里加入第二个判断条件,进行二次排序
        }
    }

    @Override
    public boolean equals(Object o) {
        System.out.println("equals方法执行了...");
        return super.equals(o);
    }

    @Override
    public int hashCode() {
        return Objects.hash(selfDrutation, thirdPartDuration, deviceId, sumDuration);
    }

    @Override
    public String toString() {
        return selfDrutation + "\t"
                + thirdPartDuration + "\t"
                + deviceId + '\'' + "\t"
                + sumDuration;
    }
}

SortMapper.java

package com.lagou.mr.sort;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

public class SortMapper extends Mapper<LongWritable, Text, SpeakBeanSort, NullWritable> {
    final SpeakBeanSort beanSort = new SpeakBeanSort();

    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        //读取一行文本,转为字符串,切分
        final String[] fields = value.toString().split("\t");

        //解析出各个字段封装成SpeakBean对象
        beanSort.setDeviceId(fields[0]);
        beanSort.setSelfDrutation(Long.parseLong(fields[1]));
        beanSort.setThirdPartDuration(Long.parseLong(fields[2]));
        beanSort.setSumDuration(Long.parseLong(fields[4]));

        //SpeakBeanSort作为key输出
        context.write(beanSort, NullWritable.get());
    }
}

SortReducer.java

package com.lagou.mr.sort;

import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

public class SortReducer extends Reducer<SpeakBeanSort, NullWritable, SpeakBeanSort, NullWritable> {
    @Override
    protected void reduce(SpeakBeanSort key, Iterable<NullWritable> values, Context context) throws IOException, InterruptedException {
        //为了避免前面compareTo方法导致总流量相等被当成对象相等,而合并了key,所以遍历values获取每个key(bean对象)
        for (NullWritable value : values) {
            context.write(key, value);
        }
    }
}

SortDriver.java

package com.lagou.mr.sort;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;

public class SortDriver {
    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
         /*
        1. 获取配置文件对象,获取job对象实例
        2. 指定程序jar的本地路径
        3. 指定Mapper/Reducer类
        4. 指定Mapper输出的kv数据类型
        5. 指定最终输出的kv数据类型
        6. 指定job处理的原始数据路径
        7. 指定job输出结果路径
        8. 提交作业
         */

        final Configuration configuration = new Configuration();
        final Job job = Job.getInstance(configuration, "SortDriver");

        job.setJarByClass(SortDriver.class);
        job.setMapperClass(SortMapper.class);
        job.setReducerClass(SortReducer.class);

        job.setMapOutputKeyClass(SpeakBeanSort.class);
        job.setMapOutputValueClass(NullWritable.class);

        job.setOutputKeyClass(SpeakBeanSort.class);
        job.setOutputValueClass(NullWritable.class);

        job.setNumReduceTasks(1);

        FileInputFormat.setInputPaths(job, new Path(""));
        FileOutputFormat.setOutputPath(job, new Path(""));

        final boolean flag = job.waitForCompletion(true);
        System.exit(flag ? 0 : 1);
    }
}

总结

  1. 自定义对象作为Map的key输出时,需要实现WritableComparable接口,排序:重写compareTo()方法,序列化以及反序列化方法
  2. 再次理解reduce()方法的参数;reduce()方法是map输出的kv中key相同的kv中的v组成一个集合调用一次reduce()方法,选择遍历values得到所有的key
  3. 默认reduceTask数量是1个
  4. 对于全局排序需要保证只有一个reduceTask

2 分区排序(默认的分区规则,区内有序)

5.6.2 GroupingComparator

GroupingComparator是mapreduce当中reduce端的一个功能组件,主要的作用是决定哪些数据作为一组,调用一次reduce的逻辑,默认是每个不同的key,作为多个不同的组,每个组调用一次reduce逻辑,我们可以自定义GroupingComparator实现不同的key作为同一个组,调用一次reduce逻辑。

  1. 需求

    原始数据

    订单id 商品id 成交金额
    Order_0000001 Pdt_01 222.8
    Order_0000001 Pdt_05 25.8
    Order_0000002 Pdt_03 522.8
    Order_0000002 Pdt_04 122.4
    Order_0000002 Pdt_05 722.4
    Order_0000003 Pdt_01 232.8

    需要求出每一个订单中成交金额最大的一笔交易。

  2. 实现思路

    Mapper

    • 读取一行文本数据,切分出每个字段;
    • 订单id和金额封装为一个Bean对象,Bean对象的排序规则指定为先按照订单Id排序,订单id相等再按照金额降序排;
    • map()方法输出kv;key-->bean对象,value-->NullWritable.get();

    Shuffle

    • 指定分区器,保证相同订单id的数据去往同个分区(自定义分区器)

    • 指定GroupingComparator,分组规则指定只要订单Id相等则认为属于同一组;

    Reduce

    • 每个reduce()方法写出一组key的第一个

参考代码:

OrderBean

  • OrderBean定义两个字段,一个字段是orderId,第二个字段是金额(注意金额一定要使用Double或者DoubleWritable类型,否则没法按照金额顺序排序)。排序规则指定为先按照订单Id排序,订单Id相等再按照金额降序排!!

OrderBean.java

package com.lagou.mr.group;

import org.apache.hadoop.io.WritableComparable;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

public class OrderBean implements WritableComparable<OrderBean> {
    private String orderId; //订单id
    private Double price;   //金额

    public OrderBean() {
    }

    public OrderBean(String orderId, Double price) {
        this.orderId = orderId;
        this.price = price;
    }

    public String getOrderId() {
        return orderId;
    }

    public void setOrderId(String orderId) {
        this.orderId = orderId;
    }

    public Double getPrice() {
        return price;
    }

    public void setPrice(Double price) {
        this.price = price;
    }


    /**
     * 序列化
     *
     * @param dataOutput
     * @throws IOException
     */
    @Override
    public void write(DataOutput dataOutput) throws IOException {
        dataOutput.writeUTF(orderId);
        dataOutput.writeDouble(price);
    }

    /**
     * 反序列化
     *
     * @param dataInput
     * @throws IOException
     */
    @Override
    public void readFields(DataInput dataInput) throws IOException {
        this.orderId = dataInput.readUTF();
        this.price = dataInput.readDouble();
    }


    /**
     * 指定排序规则,先按照订单id比较在按照金额比较,按照金额降序排序
     *
     * @param o
     * @return
     */
    @Override
    public int compareTo(OrderBean o) {
        int res = -this.price.compareTo(o.getPrice());
        System.out.println(res);
        return res;
    }

    /**
     * 重写toString方法
     *
     * @return
     */
    @Override
    public String toString() {
        return orderId + '\t' + price;
    }
}

自定义分区器

  • 保证ID相同的订单去往同个分区最终去往同一个Reduce中

CustomPartitioner.java

package com.lagou.mr.group;

import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Partitioner;

public class CustomPartitioner extends Partitioner<OrderBean, NullWritable> {
    @Override
    public int getPartition(OrderBean orderBean, NullWritable nullWritable, int numPartitions) {
        //订单id相同的数据进入同个分区
        return (orderBean.getOrderId().hashCode() & Integer.MAX_VALUE) % numPartitions;
    }
}

自定义GroupingComparator

  • 保证id相同的订单进入一个分组中,进入分组的数据已经是按照金额降序排序。reduce()方法取出第一个即是金额最高的交易

CustomGroupingComparator.java

package com.lagou.mr.group;

import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;

public class CustomGroupingComparator extends WritableComparator {
    public CustomGroupingComparator() {
        super(OrderBean.class, true);
    }

    //重写其中的compare方法,通过这个方法来让mr接受 orderid相同则两个对象相等的规则,key相等


    @Override
    public int compare(WritableComparable a, WritableComparable b) {
        //a和b是orderbean对象
        //比较两个对象的orderid
        final OrderBean o1 = (OrderBean) a;
        final OrderBean o2 = (OrderBean) b;
        final int i = o1.getOrderId().compareTo(o2.getOrderId());
        return i; // 0 1 -1
    }
}

Mapper

GroupMapper.java

package com.lagou.mr.group;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

public class GroupMapper extends Mapper<LongWritable, Text, OrderBean, NullWritable> {
    OrderBean bean = new OrderBean();

    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        final String[] fields = value.toString().split("\t");
        //订单id与金额封装为一个orderBean对象
        bean.setOrderId(fields[0]);
        bean.setPrice(Double.parseDouble(fields[2]));
        context.write(bean, NullWritable.get());
    }
}

Reducer

GroupReducer.java

package com.lagou.mr.group;

import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

public class GroupReducer extends Reducer<OrderBean, NullWritable, OrderBean, NullWritable> {
    //key:reduce方法的key注意是一组相同的kv的第一个key作为传入reduce方法的key,因为我们已经指定了排序的规则
    //按照金额降序排列,则第一个key就是金额最大的交易数据
    //value:一组相同的key的kv对中v的集合

    //对于如何判断key是否相同,自定义对象是需要我们指定一个规则,这个规则通过GroupingComparator来指定

    @Override
    protected void reduce(OrderBean key, Iterable<NullWritable> values, Context context) throws IOException, InterruptedException {
        //直接输出key就是金额最大的交易
        context.write(key, NullWritable.get());
    }
}

Driver

GroupDriver.java

package com.lagou.mr.group;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;

public class GroupDriver {
    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
       /*
        1. 获取配置文件对象,获取job对象实例
        2. 指定程序jar的本地路径
        3. 指定Mapper/Reducer类
        4. 指定Mapper输出的kv数据类型
        5. 指定最终输出的kv数据类型
        6. 指定job处理的原始数据路径
        7. 指定job输出结果路径
        8. 提交作业
        */

        //1. 获取配置文件对象,获取job对象实例
        final Configuration configuration = new Configuration();
        final Job job = Job.getInstance(configuration, "GroupDriver");

        //2. 指定程序jar的本地路径
        job.setJarByClass(GroupDriver.class);

        //3. 指定Mapper/Reducer类
        job.setMapperClass(GroupMapper.class);
        job.setReducerClass(GroupReducer.class);

        //4. 指定Mapper输出的kv数据类型
        job.setMapOutputKeyClass(OrderBean.class);
        job.setMapOutputValueClass(NullWritable.class);

        //5. 指定最终输出的kv数据类型
        job.setOutputKeyClass(OrderBean.class);
        job.setMapOutputValueClass(NullWritable.class);

        //指定分区器
        job.setPartitionerClass(CustomPartitioner.class);
        //指定使用groupingcomparator
        job.setGroupingComparatorClass(CustomGroupingComparator.class);

        //6. 指定job处理的原始数据路径
        FileInputFormat.setInputPaths(job, new Path("H:\\hadoop\\learningCode\\mapreduce\\wordcount\\input\\GroupingComparator"));

        //7. 指定job输出结果路径
        FileOutputFormat.setOutputPath(job, new Path("H:\\hadoop\\learningCode\\mapreduce\\wordcount\\output\\group\\out"));

        //指定reducetask的数量,不要使用默认的一个,分区效果不明显
        job.setNumReduceTasks(2);

        //8. 提交作业
        final boolean flag = job.waitForCompletion(true);
        System.exit(flag ? 0 : 1);
    }
}

5.7 MapReduce Join实战

5.7.1 MR reduce端join

1.1 需求分析

需求:

投递行为数据表deliver_info:

userId positionId date
1001 177725422 2020-01-03
1002 177725422 2020-01-04
1003 177725433 2020-01-03

职位表position:

id positionName
177725422 产品经理
177725433 大数据开发工程师

假如数据量巨大,两表的数据是以文件的形式存储在HDFS中,需要用mapreduce程序来实现一下SQL查询运算

1.2 代码实现

通过将关联的条件作为map输出的key,将两表满足join条件的数据并携带数据所来源的文件信息,发往同一个reduce task,在reduce中进行数据的串联

Bean

DeliverBean.java

package com.lagou.mr.reduce_join;

import org.apache.hadoop.io.Writable;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

public class DeliverBean implements Writable {
    private String userId;
    private String positionId;
    private String date;
    private String positionName;
    //判断是投递数据还是职位数据标识
    private String flag;

    public String getUserId() {
        return userId;
    }

    public void setUserId(String userId) {
        this.userId = userId;
    }

    public String getPositionId() {
        return positionId;
    }

    public void setPositionId(String positionId) {
        this.positionId = positionId;
    }

    public String getDate() {
        return date;
    }

    public void setDate(String date) {
        this.date = date;
    }

    public String getPositionName() {
        return positionName;
    }

    public void setPositionName(String positionName) {
        this.positionName = positionName;
    }

    public String getFlag() {
        return flag;
    }

    public void setFlag(String flag) {
        this.flag = flag;
    }

    /**
     * 序列化
     *
     * @param dataOutput
     * @throws IOException
     */
    @Override
    public void write(DataOutput dataOutput) throws IOException {
        dataOutput.writeUTF(userId);
        dataOutput.writeUTF(positionId);
        dataOutput.writeUTF(date);
        dataOutput.writeUTF(positionName);
        dataOutput.writeUTF(flag);
    }

    /**
     * 反序列化
     *
     * @param dataInput
     * @throws IOException
     */
    @Override
    public void readFields(DataInput dataInput) throws IOException {
        this.userId = dataInput.readUTF();
        this.positionId = dataInput.readUTF();
        this.date = dataInput.readUTF();
        this.positionName = dataInput.readUTF();
        this.flag = dataInput.readUTF();
    }

    @Override
    public String toString() {
        return userId + '\t' +
                positionId + '\t' +
                date + '\t' +
                positionName + '\t' +
                flag;
    }
}
Mapper

ReduceJoinMapper.java

package com.lagou.mr.reduce_join;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;

import java.io.IOException;

/*
输出kv类型:
    k: positionId
    v: deliverBean
 */
public class ReduceJoinMapper extends Mapper<LongWritable, Text, Text, DeliverBean> {
    String name = "";
    Text k = new Text();
    //读取的是投递行为数据
    DeliverBean bean = new DeliverBean();

    /**
     * map任务启动时初始化执行一次
     *
     * @param context
     * @throws IOException
     * @throws InterruptedException
     */
    @Override
    protected void setup(Context context) throws IOException, InterruptedException {
        InputSplit inputSplit = context.getInputSplit();
        FileSplit split = (FileSplit) inputSplit;
        name = split.getPath().getName();
        System.out.println("context name: " + name);
    }

    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        String line = value.toString();
        String[] arr = line.split("\t");

        if (name.startsWith("deliver_info")) {
            //读取的是投递行为数据表
            bean.setUserId(arr[0]);
            bean.setPositionId(arr[1]);
            bean.setDate(arr[2]);
            bean.setPositionName("");
            bean.setFlag("deliver");
        } else {
            //读取的是职位表
            bean.setUserId("");
            bean.setPositionId(arr[0]);
            bean.setDate("");
            bean.setPositionName(arr[1]);
            bean.setFlag("position");
        }
        k.set(bean.getPositionId());
        context.write(k, bean);
    }
}
Reducer

ReduceJoinReducer.java

package com.lagou.mr.reduce_join;

import org.apache.commons.beanutils.BeanUtils;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;
import java.lang.reflect.InvocationTargetException;
import java.util.ArrayList;

public class ReduceJoinReducer extends Reducer<Text, DeliverBean, DeliverBean, NullWritable> {
    @Override
    protected void reduce(Text key, Iterable<DeliverBean> values, Context context) throws IOException, InterruptedException {
        ArrayList<DeliverBean> deBeans = new ArrayList<>();//相同position的bean对象放在一起(1个职位数据,n个投递行为数据)
        DeliverBean positionBean = new DeliverBean();
        for (DeliverBean bean : values) {
            String flag = bean.getFlag();
            if (flag.equalsIgnoreCase("deliver")) { //投递行为数据
                //此处不能直接把bean对象添加到debeans中,需要将其深度拷贝到newBean中才行
                DeliverBean newBean = new DeliverBean();
                try {
                    BeanUtils.copyProperties(newBean, bean);
                    deBeans.add(newBean);
                } catch (IllegalAccessException e) {
                    e.printStackTrace();
                } catch (InvocationTargetException e) {
                    e.printStackTrace();
                }
            } else {
                try {
                    BeanUtils.copyProperties(positionBean, bean);
                } catch (IllegalAccessException e) {
                    e.printStackTrace();
                } catch (InvocationTargetException e) {
                    e.printStackTrace();
                }
            }
        }
        for (DeliverBean bean : deBeans) {
            bean.setPositionName(positionBean.getPositionName());
            context.write(bean, NullWritable.get());
        }
    }
}
Driver

ReduceJoinDriver.java

package com.lagou.mr.reduce_join;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;

public class ReduceJoinDriver {
    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        final Configuration configuration = new Configuration();
        final Job job = Job.getInstance(configuration, "ReduceJoinDriver");

        job.setJarByClass(ReduceJoinDriver.class);
        job.setMapperClass(ReduceJoinMapper.class);
        job.setReducerClass(ReduceJoinReducer.class);

        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(DeliverBean.class);

        job.setOutputKeyClass(DeliverBean.class);
        job.setOutputValueClass(NullWritable.class);

        FileInputFormat.setInputPaths(job, new Path("H:\\hadoop\\learningCode\\mapreduce\\wordcount\\input\\ReduceJoin"));
        FileOutputFormat.setOutputPath(job, new Path("H:\\hadoop\\learningCode\\mapreduce\\wordcount\\output\\ReduceJoin\\out"));

        final boolean flag = job.waitForCompletion(true);
        System.exit(flag ? 0 : 1);
    }
}

缺点:这种方式中,join的操作是在reduce阶段完成,reduce端的处理压力太大,map节点的运算负载则很低,资源利用率不高,且在reduce阶段极易产生数据倾斜
解决方案: map端join实现方式

5.7.2 MR map端join

2.1 需求分析

适用于关联表中有小表的情形;
可以将小表分发到所有的map节点,这样,map节点就可以在本地对自己所读到的大表数据进行join并输出最终结果,可以大大提高join操作的并发度,加快处理速度

2.2 代码实现

  • 在Mapper的setup阶段,将文件读取到缓存集合中

  • 在驱动函数中加载缓存

    // 缓存普通文件到Task运行节点
    job.addCacheFile(new URI("file:///e:/cache/position.txt"));
    
DeliverBean

DeliverBean.java

package com.lagou.mr.map_join;

import org.apache.hadoop.io.Writable;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

public class DeliverBean implements Writable {
    private String userId;
    private String positionId;
    private String date;
    private String positionName;
    //判断是投递数据还是职位数据标识
    private String flag;

    public DeliverBean() {
    }

    public String getUserId() {
        return userId;
    }

    public void setUserId(String userId) {
        this.userId = userId;
    }

    public String getPositionId() {
        return positionId;
    }

    public void setPositionId(String positionId) {
        this.positionId = positionId;
    }

    public String getDate() {
        return date;
    }

    public void setDate(String date) {
        this.date = date;
    }

    public String getPositionName() {
        return positionName;
    }

    public void setPositionName(String positionName) {
        this.positionName = positionName;
    }

    public String getFlag() {
        return flag;
    }

    public void setFlag(String flag) {
        this.flag = flag;
    }

    /**
     * 序列化
     *
     * @param dataOutput
     * @throws IOException
     */
    @Override
    public void write(DataOutput dataOutput) throws IOException {
        dataOutput.writeUTF(userId);
        dataOutput.writeUTF(positionId);
        dataOutput.writeUTF(date);
        dataOutput.writeUTF(positionName);
        dataOutput.writeUTF(flag);
    }

    /**
     * 反序列化
     *
     * @param dataInput
     * @throws IOException
     */
    @Override
    public void readFields(DataInput dataInput) throws IOException {
        this.userId = dataInput.readUTF();
        this.positionId = dataInput.readUTF();
        this.date = dataInput.readUTF();
        this.positionName = dataInput.readUTF();
        this.flag = dataInput.readUTF();
    }

    @Override
    public String toString() {
        return userId + '\t' +
                positionId + '\t' +
                date + '\t' +
                positionName + '\t' +
                flag;
    }
}
Mapper

MapJoinMapper.java

package com.lagou.mr.map_join;

import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.BufferedReader;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStreamReader;
import java.util.HashMap;

/*
使用map端join完成投递行为与与职位数据的关联
 map端缓存所有的 职位数据(关联表)
 map方法读取的文件数据是 投递行为数据(主表)
 基于投递行为数据的positionId去缓存中查询出positionName,输出即可
 这个job中无需reducetask,setnumreducetask为0
*/
public class MapJoinMapper extends Mapper<LongWritable, Text, Text, NullWritable> {
    HashMap<String, String> hashMap = new HashMap<>();
    Text k = new Text();

    /**
     * 加载职位数据
     *
     * @param context
     * @throws IOException
     * @throws InterruptedException
     */
    @Override
    protected void setup(Context context) throws IOException, InterruptedException {
        //读取缓存文件
        InputStreamReader inputStreamReader = new InputStreamReader(new FileInputStream("position.txt"), "UTF-8");
        BufferedReader reader = new BufferedReader(inputStreamReader);

        //读取职位数据解析为kv类型(hashmap): k -- positionId, value -- positionName
        String line;
        while (StringUtils.isNotEmpty(line = reader.readLine())) {
            String[] fields = line.split("\t");
            hashMap.put(fields[0], fields[1]);
        }
        reader.lines();
    }

    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        String line = value.toString();
        String[] arr = line.split("\t");
        String positionName = hashMap.get(arr[1]);
        k.set(line + "\t" + positionName);
        context.write(k, NullWritable.get());
    }
}
Driver

MapJoinDriver.java

package com.lagou.mr.map_join;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;

public class MapJoinDriver {
    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException, URISyntaxException {

        final Configuration conf = new Configuration();
        final Job job = Job.getInstance(conf, "MapJoinDriver");

        job.setJarByClass(MapJoinDriver.class);
        job.setMapperClass(MapJoinMapper.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(NullWritable.class);

        FileInputFormat.setInputPaths(job, new Path("H:\\hadoop\\learningCode\\mapreduce\\wordcount\\input\\ReduceJoin\\deliver_info.txt")); //指定读取数据的原始路径
        FileOutputFormat.setOutputPath(job, new Path("H:\\hadoop\\learningCode\\mapreduce\\wordcount\\output\\MapJoin\\out")); //指定结果数据输出路径

        //设置加载缓存文件
        job.addCacheFile(new URI("file:///H:/hadoop/learningCode/mapreduce/wordcount/input/ReduceJoin/position.txt"));

        //设置reducetask数量为0
        job.setNumReduceTasks(0);

        final boolean flag = job.waitForCompletion(true);
        System.exit(flag ? 0 : 1);//jvm退出:正常退出0,非0值则是错误退出
    }
}

5.7.3 数据倾斜解决方案

  • 什么是数据倾斜?

    • 数据倾斜无非就是大量的相同key被partition分配到一个分区里。
  • 现象

    • 绝大多数task执行得都非常快,但个别task执行的极慢。甚至失败!
  • 通用解决方案:

    • 对key增加随机数

5.8 MapReduce读取和输出数据

5.8.1 InputFormat

运行MapReduce程序时,输入的文件格式包括:基于行的日志文件、二进制格式文件、数据库表等。那么,针对不同的数据类型,MapReduce是如何读取这些数据的呢?
InputFormat是MapReduce框架用来读取数据的类。

InputFormat常见子类包括:

  • TextInputFormat(普通文本文件,MR框架默认的读取实现类型)
  • KeyValueTextInputFormat(读取一行文本数据按照指定分隔符,把数据封装为kv类型)
  • NLineInputF ormat(读取数据按照行数进行划分分片)
  • CombineTextInputFormat(合并小文件,避免启动过多MapTask任务)
  • 自定义InputFormat
  1. CombineTextInputFormat案例

MR框架默认的TextInputFormat切片机制按文件划分切片,文件无论大小,都是单独一个切片,
然后由一个MapTask处理,如果有大量小文件,就对应的会生成并启动大量的 MapTask,而每个
MapTask处理的数据量很小,大量时间浪费在初始化资源启动收回等阶段,这种方式导致资源利用
率不高。

CombineTextInputFormat用于小文件过多的场景,它可以将多个小文件从逻辑上划分成一个切
片,这样多个小文件就可以交给一个MapTask处理,提高资源利用率。

需求:将输入数据中的多个小文件合并为一个切片处理。
准备:运行WordCount案例,准备多个小文件
具体使用方式:

//如果不设置InputFormat,它默认用的是TextInputFormat.class
job.setInputFormatClass(CombineTextInputFormat.class);
//虚拟存储切片最大值设置4MB
CombineTextInputFormat.setMaxInputSplitSize(job, 4194304);

随后验证切片数量的变化。

CombineTextInputFormat切片原理:

  • 切片生成过程分为两部分:虚拟存储过程和切片过程
    假设设置setMaxInputSplitSize值为4MB
    四个小文件分别是:1.txt-->2M 2.txt-->7M 3.txt-->0.3M 4.txt--->8.2M
    • 虚拟存储过程:把输入目录下所有文件大小,依次和设置的setMaxInputSplitSize值进行比
      较,如果不大于设置的最大值,逻辑上划分一个块。如果输入文件大于设置的最大值且大于
      两倍,那么以最大值切割一块;当剩余数据大小超过设置的最大值且不大于最大值2倍,此时
      将文件均分成2个虚拟存储块(防止出现太小切片)。
      比如如setMaxInputSplitSize值为4M,输入文件大小为8.02M,则先逻辑上分出一个4M的
      块。剩余的大小为4.02M,如果按照4M逻辑划分,就会出现0.02M的非常小的虚拟存储文
      件,所以将剩余的4.02M文件切分成(2.01M和2.01M)两个文件。

      1.txt-->2M;2M<4M;一个块;

      2.txt-->7M;7M>4M,但是不大于两倍,均匀分成两块;两块:每块3.5M;

      3.txt-->0.3M;0.3<4M ,0.3M<4M ,一个块

      4.txt-->8.2M;大于最大值且大于两倍;一个4M的块,剩余4.2M分成两块,每块2.1M

      所有块信息:2M,3.5M,3.5M,0.3M,4M,2.1M,2.1M 共7个虚拟存储块

    • 切片过程

      • 判断虚拟存储的文件大小是否大于setMaxInputSplitSize值,大于等于则单独形成一个切片。
      • 如果不大于则跟下一个虚拟存储文件进行合并,共同形成一个切片。
      • 按照之前输入文件:有4个小文件大小分别为2M、7M、0.3M以及8.2M这四个小文件,
        则虚拟存储之后形成7个文件块,大小分别为:
        2M,3.5M,3.5M,0.3M,4M,2.1M,2.1M

原文地址:https://www.cnblogs.com/haitaoli/p/15114487.html