浅谈Hadoop Distcp工具的InputFormat

时间:2022-06-06
本文章向大家介绍浅谈Hadoop Distcp工具的InputFormat,主要内容包括其使用实例、应用技巧、基本知识点总结和需要注意事项,具有一定的参考价值,需要的朋友可以参考一下。

导语

从Hadoop的出现到现在已经超过十年了,它在大数据领域扮演着一个重要的角色,相信在Hadoop的使用过程中,或多或少的都会用到自带的一个常用工具,就是Hadoop的distcp工具,这篇文章就是简单的方式去了解他的拷贝策略原理。

背景

在集群迁移或者数据跨集群同步的过程中,必要少不了数据拷贝的动作,在同一个集群内,跨NameSpace的数据拷贝,你可以使用distcp,你也可以自己实现类似facebook提供的fastcopy的拷贝(社区好像没提供),那么在使用distcp工具的过程中,其中的一些参数到底影响了什么,他是一个怎样的原理,今天就和大家简单的分享下。

我们在命令行执行hadoop distcp命令回车,就会看到他所支持的很多参数,其中在命令行拷贝策略(-strategy)选项中,有两个参数可选参数:dynamic,uniformsize。在默认情况下使用的是uniformsize,含义是distcp的每个map会相对均衡去复制数据量大小的文件。

我们通过查看源码容易可以看出,除了命令行选项之外,distcp还能默认的去加载distcp-default.xml,我们可以放置到$HADOOP_CONF_DIR下,我们可以配置相对常用的参数到这个文件中。

  public DistCp(Configuration configuration, DistCpOptions inputOptions) throws Exception {
    Configuration config = new Configuration(configuration);
    config.addResource(DISTCP_DEFAULT_XML);
    setConf(config);
    this.inputOptions = inputOptions;
    this.metaFolder   = createMetaFolderPath();
  }

除此之外,我们还从默认的配置当中,看到了两个参数:

<property>
<name>distcp.dynamic.strategy.impl</name>
<value>org.apache.hadoop.tools.mapred.lib.DynamicInputFormat</value>
<description>Implementation of dynamic input format</description>
</property>

<property>
<name>distcp.static.strategy.impl</name>
<value>org.apache.hadoop.tools.mapred.UniformSizeInputFormat</value>
<description>Implementation of static input format</description>
</property>

这个就是上述说的两种命令行策略的参数模式。通过命名可以很容易可以看出,其实这就是两个InputFormat的实现类,distcp任务(其实也就是MR任务),通过配置命令行或者参数指定使用不同的inputFormat生成不同的splits,从而实现不同的拷贝文件的逻辑。

然而,既然有两个选项,那他们的区别在哪呢?我们可以简单的看看下图的一个整体结构

DynamicInputFormat

对于DynamicInputFormat来说,有几个重要的相关的类:DynamicRecordReader,DynamicInputChunk,DynamicInputChunkContext。

对于distcp任务,会先生成一个copy-listing文件,该文件包含复制文件的列表等信息,DynamicInputFormat的getSplits方法就是将这些切分为不同chunk,然后分配到不同的task中。

在切分copy-listing文件到不同的chunk当中,其中有几个变量,numMaps和numRecords得到splitRatio的比例,也就是算出平均每个map处理多少个chunk,然后通过总的records数量,算出每个chunk中有多少条records

static int getSplitRatio(int nMaps, int nRecords, Configuration conf) {
int maxChunksIdeal = getMaxChunksIdeal(conf);
int minRecordsPerChunk = getMinRecordsPerChunk(conf);
int splitRatio = getSplitRatio(conf);

if (nMaps == 1) {
LOG.warn("nMaps == 1. Why use DynamicInputFormat?");
return 1;
}

if (nMaps > maxChunksIdeal)
return splitRatio;

int nPickups = (int)Math.ceil((float)maxChunksIdeal/nMaps);
int nRecordsPerChunk = (int)Math.ceil((float)nRecords/(nMaps*nPickups));

return nRecordsPerChunk < minRecordsPerChunk ?
splitRatio : nPickups;
}

最终会将所有的record放到不同的chunk中,在hdfs上会在对应目录行程对应的文件类似fileList.seq.chunk.0000x

drwx------ - hadoop supergroup 0 2018-05-13 17:50 /emr/hadoop-yarn/staging/hadoop/.staging/_distcp1061656248
drwx------ - hadoop supergroup 0 2018-05-13 17:50 /emr/hadoop-yarn/staging/hadoop/.staging/_distcp1061656248/chunkDir
-rw-r--r-- 1 hadoop supergroup 1504 2018-05-13 17:50 /emr/hadoop-yarn/staging/hadoop/.staging/_distcp1061656248/chunkDir/fileList.seq.chunk.00002
-rw-r--r-- 1 hadoop supergroup 1486 2018-05-13 17:50 /emr/hadoop-yarn/staging/hadoop/.staging/_distcp1061656248/chunkDir/fileList.seq.chunk.00003
-rw-r--r-- 1 hadoop supergroup 1646 2018-05-13 17:50 /emr/hadoop-yarn/staging/hadoop/.staging/_distcp1061656248/chunkDir/task_1526024399954_0017_m_000000
-rw-r--r-- 1 hadoop supergroup 1524 2018-05-13 17:50 /emr/hadoop-yarn/staging/hadoop/.staging/_distcp1061656248/chunkDir/task_1526024399954_0017_m_000001
-rw-r--r-- 1 hadoop supergroup 6686 2018-05-13 17:50 /emr/hadoop-yarn/staging/hadoop/.staging/_distcp1061656248/fileList.seq
-rw-r--r-- 1 hadoop supergroup 5906 2018-05-13 17:50 /emr/hadoop-yarn/staging/hadoop/.staging/_distcp1061656248/fileList.seq_sorted

然后map通过DynamicRecordReader去读取数据的时候就会将对应的chunk文件修改为task(task_1526024399954_0017_m_000000)名字,所以通过上面的文件夹输入可以看出,这时有两个map正在对数据进行拷贝,执行速度快的map会继续读取未被领取的chunk进行拷贝,这就让速度快的map可以对更多的数据进行拷贝

UniformSizeInputFormat

对于默认的UniformSizeInputFormat,他的实现方式比DynamicInputFormat简单,原理其实就是得到总的totalSizeBytes,然后除以map数量得到平均每个map处理多少数据,然后当文件的大小加起来大于nBytesPerSplit的时候,就形成一个split,这样是希望每个map处理的数据差距不会太大。

带宽控制

带宽控制主要实现在ThrottledInputStream当中,他在hadoop除了在distcp之外,也用在了NameNode之间的FSImage传输等场景上的使用,原理就是,他继承了原有的InputStream并在每次读取的时候进行每秒获取字节的速率检查(throttle),如果超过,则进行sleep:

  /**
   * Read bytes starting from the specified position. This requires rawStream is
   * an instance of {@link PositionedReadable}.
   */
  public int read(long position, byte[] buffer, int offset, int length)
      throws IOException {
    if (!(rawStream instanceof PositionedReadable)) {
      throw new UnsupportedOperationException(
          "positioned read is not supported by the internal stream");
    }
    throttle();
    int readLen = ((PositionedReadable) rawStream).read(position, buffer,
        offset, length);
    if (readLen != -1) {
      bytesRead += readLen;
    }
    return readLen;
  }

  private void throttle() throws IOException {
    while (getBytesPerSec() > maxBytesPerSec) {
      try {
        Thread.sleep(SLEEP_DURATION_MS);
        totalSleepTime += SLEEP_DURATION_MS;
      } catch (InterruptedException e) {
        throw new IOException("Thread aborted", e);
      }
    }
  }

  /**
   * Getter for the read-rate from this stream, since creation.
   * Calculated as bytesRead/elapsedTimeSinceStart.
   * @return Read rate, in bytes/sec.
   */
  public long getBytesPerSec() {
    long elapsed = (System.currentTimeMillis() - startTime) / 1000;
    if (elapsed == 0) {
      return bytesRead;
    } else {
      return bytesRead / elapsed;
    }
  }

总结:

除了本文说的参数之外,我们平时在数据拷贝的过程中,我们还可以综合的通过控制map的数量,控制带宽阈值去减少这个过程对线上系统的影响,其中还有update参数等等,我们可以按照自身的业务需求去调整自身的参数,从而达到一个相对最优的数据拷贝效果。