mapreduce工作原理及如何在mapreduce中实现二次排序

时间:2018-11-15
本文章向大家介绍mapreduce工作原理及如何在mapreduce中实现二次排序,需要的朋友可以参考一下

什么是二次排序

待排序的数据具有多个字段,首先对第一个字段排序,再对第一字段相同的行按照第二字段排序,第二次排序不破坏第一次排序的结果,这个过程就称为二次排序

如何在mapreduce中实现二次排序

mapreduce的工作原理

MR的工作原理如下图(如果看不清可右键新标签页查看):

图片部分数据参考自:https://www.bbsmax.com/A/KE5Qjg6qdL/

示例数据以及需求

为了方便分析整个过程,这里以如下数据作为示例,现在假设给出如下的数据,数据有两列,每列都是整形数据

7 5
3 95
1 5
2 7
1 2
4 62
4 13
2 99
1 8
7 8888

要求输出结果为:

------------------------------------------------
1 2
1 5
1 8
------------------------------------------------
2 7
2 99
------------------------------------------------
3 95
------------------------------------------------
4 13
4 62
------------------------------------------------
7 5
7 8888

可以看到这就是一个二次排序的过程。

思路一

假设每一行以空格划分的两个Int型数据分别为Int1、Int2,那么最简单的思路是:Mapper以每一行数据作为输入,输出键值对为<Int1, Int2>,由于我们知道在reducer运行之前,数据会先按照Key也就是Int1排序,那么Int1相同的数据就将合并到一起供同一个Reducer进行处理,那么我们便可以在reduce函数中对输入的<Int1, [Int2-list]>按照Int2升序操作即可。

现在来分析一下,在这个思路下,一个Reducer要接收一个给定Key的所有值并对其进行内部排序,如果数据量大的话,那显然这会耗尽机器的内存,对于实际运用,这是不可取的思路。

思路二

 仔细观察MR的原理图就可以发现,MR的分区、排序、分组等操作都是针对Key进行的,既然我们想要对两个字段都进行排序,那么可以将Int1和Int2组合成新的Key,原来的Value保持不变(不过这时候Value其实都不重要了,因为Key就包含了原来键值对的所有信息了,所以Value其实也可以设置为Null,这里就选择保持Value不变的方式进行操作),这样一来按照MR的原理图来看,对于新Key,

  1. 其分区逻辑为:只对Int1进行分区(默认的分区操作是以整个Key进行哈希操作的,这就可能把有同样Int1的组合Key发送给不同的Reducer,这显然不是我们想要的);
  2. 其排序逻辑为:先对Int1排序,在Int1相同的基础上对Int2排序(即是二次排序的逻辑);
  3. 其分组逻辑为:只对Int1进行分组(保持与原逻辑一致,Int1相同的数据可以在一次reduce函数被调用时一同被处理)。

下面开始讲解要实现的代码。

 一、自定义组合Key

在MR中,所有的Key值类型都必须实现WritableComparable接口,使其支持可序列化(用于写入磁盘)和可比较(用于排序)。

  • 要是可序列化的就得实现readFiels()和write()这两个序列化和反序列化函数
  • 要是可比较的就得实现compareTo()函数,该函数即是排序规则的实现

代码实现如下:

public static class IntPair 
                      implements WritableComparable<IntPair> {
    private int first = 0;
    private int second = 0;

    public void set(int left, int right) {
      first = left;
      second = right;
    }
    public int getFirst() {
      return first;
    }
    public int getSecond() {
      return second;
    }

    @Override
    public void readFields(DataInput in) throws IOException {
      first = in.readInt();
      second = in.readInt();
    }

    @Override
    public void write(DataOutput out) throws IOException {
      out.writeInt(first);
      out.writeInt(second);
    }

    @Override
    public int compareTo(IntPair other) {
      if (first != other.first) {
        return first < other.first ? -1 : 1;
      } else if (second != other.second) {
        return second < other.second ? -1 : 1;
      } else {
        return 0;
      }
    }
  }

二、实现组合Key的分区逻辑

 这里有两种实现方式,实现其一就可以实现目的。

实现方式一:自定义分区类

public static class FirstPartitioner extends Partitioner<IntPair,IntWritable>{
    @Override
    public int getPartition(IntPair key, IntWritable value, 
                            int numPartitions) {
      return Math.abs(key.getFirst() % numPartitions); 
} 
}

由于分区只针对Int1,所以这里进行哈希时只使用到了Key.getFirst()。由于getPartition()函数返回的是用于标记键值对属于哪个分区的int型数据,而分区数就等于Reducer的数目,所以这里要先取模。同时为了保证结果为证,这里最后要取最绝对值。

实现方式二:重载组合Key的hashCode()函数以及equals()函数

 以下代码在组合Key——IntPair中实现。

@Override
public int hashCode() {
  return first;
}

@Override
public boolean equals(Object other) {
  if (other instanceof IntPair) {
    IntPair o = (IntPair) other;
    return o.first == first && o.second == second;
  } else {
    return false;
  }
}

在Java中hashCode()函数和equals函数基本上是成对实现的,关于hashCode()函数的设计方式可参考:hashCode 方法及 equals 方法的规范