A Simple MapReduce Program (Hadoop 2.2.0)

Date: 2018-11-21

This article walks through MapReduce in Hadoop 2.2.0 in detail. Please pay attention to the version: the source code can differ between Hadoop releases.

1. Getting the Source Code

You can download HBase:

HBase: hbase-0.98.9-hadoop2-bin.tar.gz

It bundles the Hadoop 2.2.0 jar files together with their source code.

2. The WordCount Example

Before going into the details, let's look at an example. Suppose a file has the following content:

hello hongten 1
hello hongten 2
hello hongten 3
hello hongten 4
hello hongten 5
......
......

Every line of the file contains one hello and one hongten, followed by a number that increases line by line.

We want to count how many times each word appears in this file (many similar examples can be found online).

First we need to generate the file; the following Java code can be used for that:

import java.io.BufferedWriter;
import java.io.File;
import java.io.FileWriter;

/**
 * @author Hongten
 * @created 11 Nov 2018
 */
public class GenerateWord {

    public static void main(String[] args) throws Exception {

        // Number of lines to generate; the loop below writes num - 1 lines.
        int num = 12000000;

        StringBuilder sb = new StringBuilder();
        for (int i = 1; i < num; i++) {
            sb.append("hello").append(" ").append("hongten").append(" ").append(i).append("\n");
        }

        // Write everything to /root/word.txt in one go.
        File writename = new File("/root/word.txt");
        writename.createNewFile();
        BufferedWriter out = new BufferedWriter(new FileWriter(writename));
        out.write(sb.toString());
        out.flush();
        out.close();
        System.out.println("done.");
    }
}

On the Linux machine, compile GenerateWord.java:

javac GenerateWord.java

Compilation produces GenerateWord.class; then run

java GenerateWord

After waiting a while, the file is generated (roughly 252 MB).

Next, we write the map, reduce and client implementations for the word count.

Project structure

There are three Java files in total: the client (WordCount.java), the Mapper (MyMapper.java) and the Reducer (MyReducer.java).

The client

First we create a Configuration and a Job, then call the various job setters; only the final job.waitForCompletion() call actually triggers the submission.

In other words, the client holds all the configuration needed for the distributed run and, at the very end, performs the submit action.

package com.b510.hongten.hadoop;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

/**
 * @author Hongten
 * @created 11 Nov 2018
 */
public class WordCount {

    public static void main(String[] args) throws Exception {
        // Read the configuration files
        Configuration conf = new Configuration();
        // Create the job
        Job job = Job.getInstance(conf);

        // Create a new Job
        job.setJarByClass(WordCount.class);

        // Specify various job-specific parameters
        job.setJobName("wordcount");

        job.setMapperClass(MyMapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);

        job.setReducerClass(MyReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);

        // job.setInputPath(new Path("/usr/input/wordcount"));
        // job.setOutputPath(new Path("/usr/output/wordcount"));

        FileInputFormat.addInputPath(job, new Path("/usr/input/wordcount1"));

        // Delete the output directory if it already exists
        Path output = new Path("/usr/output/wordcount");
        if (output.getFileSystem(conf).exists(output)) {
            output.getFileSystem(conf).delete(output, true);
        }

        FileOutputFormat.setOutputPath(job, output);

        // Submit the job, then poll for progress until the job is complete
        job.waitForCompletion(true);

    }
}
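One design note: the input and output paths are hard-coded above to keep the example focused. In practice they are usually taken from the command-line arguments instead; a minimal, purely illustrative variant:

// Hypothetical variant: read the paths from the program arguments, e.g.
//   hadoop jar wordcount.jar com.b510.hongten.hadoop.WordCount <input_dir> <output_dir>
FileInputFormat.addInputPath(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1]));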

The custom Mapper

package com.b510.hongten.hadoop;

import java.io.IOException;
import java.util.StringTokenizer;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

/**
 * @author Hongten
 * @created 11 Nov 2018
 */
public class MyMapper extends Mapper<Object, Text, Text, IntWritable> {

    private final static IntWritable one = new IntWritable(1);
    private Text word = new Text();

    public void map(Object key, Text value, Context context) throws IOException, InterruptedException {
        StringTokenizer itr = new StringTokenizer(value.toString());
        while (itr.hasMoreTokens()) {
            word.set(itr.nextToken());
            context.write(word, one);
        }
    }

}

The custom Reducer

package com.b510.hongten.hadoop;

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

/**
 * @author Hongten
 * @created 11 Nov 2018
 */
public class MyReducer extends Reducer<Text, IntWritable, Text, IntWritable> {

    private IntWritable result = new IntWritable();

    public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
        int sum = 0;
        for (IntWritable val : values) {
            sum += val.get();
        }
        result.set(sum);
        context.write(key, result);
    }

}

Run the job and check the results

cd /home/hadoop-2.5/bin/

-- Create the input directory
./hdfs dfs -mkdir -p /usr/input/wordcount1

-- Put the test file into the input directory
./hdfs dfs -put /root/word.txt /usr/input/wordcount1

-- Run the job
./hadoop jar /root/wordcount.jar com.b510.hongten.hadoop.WordCount

-- Download the result files from HDFS
./hdfs dfs -get /usr/output/wordcount/* ~/

-- Show the last 5 lines of the result file
tail -n5 /root/part-r-00000

Results

The YARN web UI shows how long the job took to run.

It started at 11:47:46 and finished at 11:56:48, 9 min 2 s in total. (This was run in a virtual machine on my own computer; on a real cluster it should be much faster.)

Number of input records: 12000000 - 1 = 11,999,999 lines.

3. Client Source Code Analysis

After the distributed job has been configured on the client, the last thing executed is

// Submit the job, then poll for progress until the job is complete
job.waitForCompletion(true);

So what exactly happens inside waitForCompletion()?

// We pass verbose=true
public boolean waitForCompletion(boolean verbose
                                   ) throws IOException, InterruptedException,
                                            ClassNotFoundException {
    if (state == JobState.DEFINE) {
        // Submit the job
      submit();
    }
    //verbose=true
    if (verbose) {
        // Monitor the job and print its progress.
        // This is why we see so much output on the client while the job runs;
        // with verbose=false we would see no job progress output.
      monitorAndPrintJob();
    } else {
      // get the completion poll interval from the client.
      int completionPollIntervalMillis = 
        Job.getCompletionPollInterval(cluster.getConf());
      while (!isComplete()) {
        try {
          Thread.sleep(completionPollIntervalMillis);
        } catch (InterruptedException ie) {
        }
      }
    }
    // Return whether the job was successful
    return isSuccessful();
  }

The most important call in this method is submit(), which submits the distributed job, so let's step into submit().

public void submit() 
        throws IOException, InterruptedException, ClassNotFoundException {
   ensureState(JobState.DEFINE);
   // Switch to the new API; we are using the Hadoop 2.2.0 API, which differs from the old (1.x) API
   setUseNewAPI();
   // Connect to the cluster; the cluster responds and allocates a job ID
   connect();
   final JobSubmitter submitter = 
       getJobSubmitter(cluster.getFileSystem(), cluster.getClient());
   status = ugi.doAs(new PrivilegedExceptionAction<JobStatus>() {
     public JobStatus run() throws IOException, InterruptedException, 
     ClassNotFoundException {
         // Submit the job
         /*
         Internal method for submitting jobs to the system. 

         The job submission process involves: 
         1. Checking the input and output specifications of the job. 
         2. Computing the InputSplits for the job. 
         3. Setup the requisite accounting information for the DistributedCache of the job, if necessary. 
         4. Copying the job's jar and configuration to the map-reduce system directory on the distributed file-system. 
         5. Submitting the job to the JobTracker and optionally monitoring it's status. 
         */
         // This method does five things:
         // 1. Check the job's input and output specifications
         // 2. Compute the input splits for the job
         // 3./4. Copy the job's resources (jar and configuration) to the staging directory
         // 5. Submit the job and monitor its status
         // Note: since M/R 2.x there is no JobTracker anymore.
         //JobTracker is no longer used since M/R 2.x. 
         //This is a dummy JobTracker class, which is used to be compatible with M/R 1.x applications.
       return submitter.submitJobInternal(Job.this, cluster);
     }
   });
   state = JobState.RUNNING;
   LOG.info("The url to track the job: " + getTrackingURL());
  }

So we need to step into submitter.submitJobInternal() to see how it is implemented.

// This method does five things:
// 1. Check the job's input and output specifications
// 2. Compute the input splits for the job
// 3./4. Copy the job's resources (jar and configuration) to the staging directory
// 5. Submit the job and monitor its status
// Note: since M/R 2.x there is no JobTracker anymore.
JobStatus submitJobInternal(Job job, Cluster cluster) 
  throws ClassNotFoundException, InterruptedException, IOException {

    //validate the jobs output specs 
    checkSpecs(job);
    
    Path jobStagingArea = JobSubmissionFiles.getStagingDir(cluster, 
                                                     job.getConfiguration());
    //configure the command line options correctly on the submitting dfs
    Configuration conf = job.getConfiguration();
    InetAddress ip = InetAddress.getLocalHost();
    if (ip != null) {
      submitHostAddress = ip.getHostAddress();
      submitHostName = ip.getHostName();
      conf.set(MRJobConfig.JOB_SUBMITHOST,submitHostName);
      conf.set(MRJobConfig.JOB_SUBMITHOSTADDR,submitHostAddress);
    }
    JobID jobId = submitClient.getNewJobID();
    // Set the job ID
    job.setJobID(jobId);
    Path submitJobDir = new Path(jobStagingArea, jobId.toString());
    JobStatus status = null;
    try {
      conf.set(MRJobConfig.USER_NAME,
          UserGroupInformation.getCurrentUser().getShortUserName());
      conf.set("hadoop.http.filter.initializers", 
          "org.apache.hadoop.yarn.server.webproxy.amfilter.AmFilterInitializer");
      conf.set(MRJobConfig.MAPREDUCE_JOB_DIR, submitJobDir.toString());
      LOG.debug("Configuring job " + jobId + " with " + submitJobDir 
          + " as the submit dir");
      // get delegation token for the dir
      TokenCache.obtainTokensForNamenodes(job.getCredentials(),
          new Path[] { submitJobDir }, conf);
      
      populateTokenCache(conf, job.getCredentials());

      // generate a secret to authenticate shuffle transfers
      if (TokenCache.getShuffleSecretKey(job.getCredentials()) == null) {
        KeyGenerator keyGen;
        try {
          keyGen = KeyGenerator.getInstance(SHUFFLE_KEYGEN_ALGORITHM);
          keyGen.init(SHUFFLE_KEY_LENGTH);
        } catch (NoSuchAlgorithmException e) {
          throw new IOException("Error generating shuffle secret key", e);
        }
        SecretKey shuffleKey = keyGen.generateKey();
        TokenCache.setShuffleSecretKey(shuffleKey.getEncoded(),
            job.getCredentials());
      }

      copyAndConfigureFiles(job, submitJobDir);
      Path submitJobFile = JobSubmissionFiles.getJobConfPath(submitJobDir);
      
      // Create the splits for the job
      LOG.debug("Creating splits at " + jtFs.makeQualified(submitJobDir));
      // Write the split information; this is the method we care about most :))
      int maps = writeSplits(job, submitJobDir);
      conf.setInt(MRJobConfig.NUM_MAPS, maps);
      LOG.info("number of splits:" + maps);

      // write "queue admins of the queue to which job is being submitted"
      // to job file.
      String queue = conf.get(MRJobConfig.QUEUE_NAME,
          JobConf.DEFAULT_QUEUE_NAME);
      AccessControlList acl = submitClient.getQueueAdmins(queue);
      conf.set(toFullPropertyName(queue,
          QueueACL.ADMINISTER_JOBS.getAclName()), acl.getAclString());

      // removing jobtoken referrals before copying the jobconf to HDFS
      // as the tasks don't need this setting, actually they may break
      // because of it if present as the referral will point to a
      // different job.
      TokenCache.cleanUpTokenReferral(conf);

      if (conf.getBoolean(
          MRJobConfig.JOB_TOKEN_TRACKING_IDS_ENABLED,
          MRJobConfig.DEFAULT_JOB_TOKEN_TRACKING_IDS_ENABLED)) {
        // Add HDFS tracking ids
        ArrayList<String> trackingIds = new ArrayList<String>();
        for (Token<? extends TokenIdentifier> t :
            job.getCredentials().getAllTokens()) {
          trackingIds.add(t.decodeIdentifier().getTrackingId());
        }
        conf.setStrings(MRJobConfig.JOB_TOKEN_TRACKING_IDS,
            trackingIds.toArray(new String[trackingIds.size()]));
      }

      // Write job file to submit dir
      writeConf(conf, submitJobFile);
      
      //
      // Now, actually submit the job (using the submit name)
      //
      // Only here is the job actually submitted
      printTokens(jobId, job.getCredentials());
      status = submitClient.submitJob(
          jobId, submitJobDir.toString(), job.getCredentials());
      if (status != null) {
        return status;
      } else {
        throw new IOException("Could not launch job");
      }
    } finally {
      if (status == null) {
        LOG.info("Cleaning up the staging area " + submitJobDir);
        if (jtFs != null && submitJobDir != null)
          jtFs.delete(submitJobDir, true);

      }
    }
  }

What we care about here is

int maps = writeSplits(job, submitJobDir);

Step into the writeSplits() method:

private int writeSplits(org.apache.hadoop.mapreduce.JobContext job,
          Path jobSubmitDir) throws IOException,
          InterruptedException, ClassNotFoundException {
    // The configuration can be obtained from the job
    JobConf jConf = (JobConf)job.getConfiguration();
    int maps;
    if (jConf.getUseNewMapper()) {
        // We are on Hadoop 2.x with the new API,
        // so the new split-writing method is used
      maps = writeNewSplits(job, jobSubmitDir);
    } else {
        // The old split-writing method
      maps = writeOldSplits(jConf, jobSubmitDir);
    }
    return maps;
  }

Since we are on a 2.x release, writeNewSplits() is the method that runs.

@SuppressWarnings("unchecked")
private <T extends InputSplit>
int writeNewSplits(JobContext job, Path jobSubmitDir) throws IOException,
    InterruptedException, ClassNotFoundException {
    // The configuration can be obtained from the job
  Configuration conf = job.getConfiguration();
  // Create the InputFormat via reflection;
  // the default returned here is TextInputFormat
  InputFormat<?, ?> input =
    ReflectionUtils.newInstance(job.getInputFormatClass(), conf);  // ==  1  ==
  
  // Let the InputFormat compute the splits
  List<InputSplit> splits = input.getSplits(job);                  // ==  2  ==
  T[] array = (T[]) splits.toArray(new InputSplit[splits.size()]);

  // sort the splits into order based on size, so that the biggest
  // go first
  Arrays.sort(array, new SplitComparator());
  JobSplitWriter.createSplitFiles(jobSubmitDir, conf, 
      jobSubmitDir.getFileSystem(conf), array);
  return array.length;
}

At '== 1 ==' the InputFormat is obtained; step into job.getInputFormatClass():

@SuppressWarnings("unchecked")
public Class<? extends InputFormat<?,?>> getInputFormatClass() 
   throws ClassNotFoundException {
    // If INPUT_FORMAT_CLASS_ATTR (mapreduce.job.inputformat.class) is not set
    // in the configuration, TextInputFormat is returned;
    // if it is set, the configured class is returned.
    // In other words, the default is TextInputFormat.
  return (Class<? extends InputFormat<?,?>>) 
    conf.getClass(INPUT_FORMAT_CLASS_ATTR, TextInputFormat.class);
}

So the default InputFormat used by the system is TextInputFormat.
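If a different input format were ever needed, the client could set this property explicitly. A small illustrative sketch (not needed for our WordCount, where the default TextInputFormat already suits line-oriented text; KeyValueTextInputFormat is just one possible alternative):

// Optional: override the default input format from the client.
job.setInputFormatClass(org.apache.hadoop.mapreduce.lib.input.KeyValueTextInputFormat.class);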

At '== 2 ==' the InputFormat computes the splits; let's step into its getSplits() method:

public List<InputSplit> getSplits(JobContext job) throws IOException {
    //minSize = Math.max(1, 1L)=1
    long minSize = Math.max(getFormatMinSplitSize(), getMinSplitSize(job));      // == A ==
    //maxSize = Long.MAX_VALUE
    long maxSize = getMaxSplitSize(job);                                         // == B ==

    // generate splits
    List<InputSplit> splits = new ArrayList<InputSplit>();
    // Get the list of input files
    List<FileStatus> files = listStatus(job);
    // Iterate over the input files
    for (FileStatus file: files) {
      // Handle the files one at a time,
      // computing the splits for each file
      Path path = file.getPath();
      // File length
      long length = file.getLen();
      if (length != 0) {
        BlockLocation[] blkLocations;
        if (file instanceof LocatedFileStatus) {
          blkLocations = ((LocatedFileStatus) file).getBlockLocations();
        } else {
            // Get the FileSystem from the path
          FileSystem fs = path.getFileSystem(job.getConfiguration());
          // Get the block locations of the whole file
          blkLocations = fs.getFileBlockLocations(file, 0, length);
        }
        // Can the file be split?
        if (isSplitable(job, path)) {
          // Splittable
          // Block size of the file
          long blockSize = file.getBlockSize();
          // splitSize = blockSize:
          // by default, the split size equals the block size
          long splitSize = computeSplitSize(blockSize, minSize, maxSize);          // == C == 

          long bytesRemaining = length;
          // SPLIT_SLOP = 1.1: keep splitting while the remaining bytes exceed
          // splitSize by more than 10%; the final remainder becomes one split.
          while (((double) bytesRemaining)/splitSize > SPLIT_SLOP) {
            // Index of the block containing this offset
            int blkIndex = getBlockIndex(blkLocations, length-bytesRemaining);     // == D ==
            // Record the split details
            splits.add(makeSplit(path, length-bytesRemaining, splitSize,
                                     blkLocations[blkIndex].getHosts()));
            bytesRemaining -= splitSize;
          }

          if (bytesRemaining != 0) {
            int blkIndex = getBlockIndex(blkLocations, length-bytesRemaining);
            splits.add(makeSplit(path, length-bytesRemaining, bytesRemaining,
                       blkLocations[blkIndex].getHosts()));
          }
        } else { // not splitable
            // Not splittable: the whole file becomes a single split
          splits.add(makeSplit(path, 0, length, blkLocations[0].getHosts()));
        }
      } else { 
        //Create empty hosts array for zero length files
        splits.add(makeSplit(path, 0, length, new String[0]));
      }
    }
    // Save the number of input files for metrics/loadgen
    job.getConfiguration().setLong(NUM_INPUT_FILES, files.size());
    LOG.debug("Total # of splits: " + splits.size());
    return splits;
  }

Looking at '== A ==': getFormatMinSplitSize() returns 1 and getMinSplitSize() returns 1L, so minSize = 1.

protected long getFormatMinSplitSize() {
    return 1;
  }

public static long getMinSplitSize(JobContext job) {
    // If SPLIT_MINSIZE (mapreduce.input.fileinputformat.split.minsize) is set
    // in the configuration, use that value; otherwise return the default 1L.
    // We did not configure it, so 1L is returned.
    return job.getConfiguration().getLong(SPLIT_MINSIZE, 1L);
  }

Looking at '== B ==', getMaxSplitSize() returns Long.MAX_VALUE (we did not configure SPLIT_MAXSIZE).

public static long getMaxSplitSize(JobContext context) {
    // If SPLIT_MAXSIZE (mapreduce.input.fileinputformat.split.maxsize) is set
    // in the configuration, use that value; otherwise return Long.MAX_VALUE.
    // We did not configure it, so Long.MAX_VALUE is returned.
    return context.getConfiguration().getLong(SPLIT_MAXSIZE, 
                                              Long.MAX_VALUE);
  }

Looking at '== C ==', since we configured nothing, the split size equals the block size: Math.max(1, Math.min(Long.MAX_VALUE, blockSize)) = blockSize, which is 128 MB by default.

//minSize=1
//maxSize=Long.MAX_VALUE
protected long computeSplitSize(long blockSize, long minSize,
        long maxSize) {
    //Math.min(maxSize, blockSize) -> Math.min(Long.MAX_VALUE, blockSize) -> blockSize
    //Math.max(minSize, blockSize) -> Math.max(1, blockSize) -> blockSize
    return Math.max(minSize, Math.min(maxSize, blockSize));
}

Looking at '== D ==', the block index is found from the offset. For example, with 128 MB blocks, an offset of 256 MB lies inside the third block, so index 2 is returned.

protected int getBlockIndex(BlockLocation[] blkLocations, 
        long offset) {
    // Find the index of the block that contains the given offset
    for (int i = 0 ; i < blkLocations.length; i++) {
        // is the offset inside this block?
        if ((blkLocations[i].getOffset() <= offset) &&
        (offset < blkLocations[i].getOffset() + blkLocations[i].getLength())){
            return i;
        }
    }
    BlockLocation last = blkLocations[blkLocations.length -1];
    long fileLength = last.getOffset() + last.getLength() -1;
    throw new IllegalArgumentException("Offset " + offset + 
                     " is outside of file (0.." +
                     fileLength + ")");
}

4. Summary

In plain language, the above can be summarized by the following cases (originally illustrated with a figure):

The default block size is 128 MB, and when nothing else is configured the split size equals the block size.

Type 1: the file is 45 MB, smaller than the default 128 MB block size, so it occupies a single block.

Split info: path, 0, 45, [3, 8, 10]

That is: the file location path, offset 0, split length 45 (MB), and the block location list [3, 8, 10], meaning this file (block) is stored on datanode3, datanode8 and datanode10 in HDFS.

Type 2: the file is 128 MB, exactly the default block size; it is not divided into two blocks and is handled just like Type 1.

Type 3: the file is 414 MB, larger than the default 128 MB, so when it is uploaded to HDFS it is cut into 128 MB blocks one after another until only 30 MB remains, which becomes the last block. Each split entry again consists of the file location path, an offset, a split length, and the block locations, so this file yields (path, 0, 128), (path, 128, 128), (path, 256, 128) and (path, 384, 30), each with its host list. We call this series of entries the file's split list.
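Concretely, each entry in this split list is a FileSplit created by makeSplit(). A rough sketch of what one entry from the Type 1 case would carry (the file name and host names are invented for illustration):

// Illustrative only: the information carried by a single split entry
InputSplit split = new FileSplit(
        new Path("/usr/input/wordcount1/word.txt"),                // file path (hypothetical)
        0,                                                         // byte offset of the split within the file
        45L * 1024 * 1024,                                         // split length (45 MB)
        new String[] { "datanode3", "datanode8", "datanode10" });  // hosts storing the block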

Once the client has the file's split list, it submits the list to the distributed framework, which then processes each split.

 

5. The Mapper in Detail

5.1. Map Input

A map task opens an input stream from HDFS and seeks to its split's offset. Except for the first split, every split starts processing from its second line: the first (possibly partial) line is skipped, because the previous split reads one line past its own end and has already handled it.
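The idea behind this line-boundary handling, shown as a simplified, self-contained sketch (this is not the real LineRecordReader source, only an illustration of the rule "skip the first line unless you are the first split, and read one line past your end"):

import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;

public class SplitLineDemo {

    // Read one "split" of a line-oriented byte stream the way MapReduce does.
    static void readSplit(byte[] file, long start, long length) throws IOException {
        long end = start + length;
        InputStream in = new ByteArrayInputStream(file);
        in.skip(start);
        long pos = start;
        if (start != 0) {
            // Not the first split: skip the (possibly partial) first line;
            // the previous split is responsible for it.
            int b;
            while ((b = in.read()) != -1) {
                pos++;
                if (b == '\n') break;
            }
        }
        // Keep reading as long as the current line STARTS inside the split (pos <= end).
        // The last line may run past 'end'; the next split skips it, so every
        // line is processed exactly once.
        while (pos <= end) {
            StringBuilder line = new StringBuilder();
            int b = in.read();
            if (b == -1) break;
            while (b != -1 && b != '\n') {
                line.append((char) b);
                b = in.read();
            }
            pos += line.length() + 1;
            System.out.println("split(" + start + "," + length + ") read: " + line);
        }
    }

    public static void main(String[] args) throws IOException {
        byte[] data = "hello hongten 1\nhello hongten 2\nhello hongten 3\n".getBytes();
        // Pretend the 48-byte file is stored as pieces of 20, 20 and 8 bytes.
        readSplit(data, 0, 20);
        readSplit(data, 20, 20);
        readSplit(data, 40, data.length - 40);
    }
}

Running this prints each of the three input lines exactly once, even though the split boundaries fall in the middle of lines.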

The run() method lives in org.apache.hadoop.mapred.MapTask:

//org.apache.hadoop.mapred.MapTask
public void run(final JobConf job, final TaskUmbilicalProtocol umbilical)
        throws IOException, ClassNotFoundException, InterruptedException {
    this.umbilical = umbilical;

    if (isMapTask()) {
        // If there are no reducers then there won't be any sort. Hence the map 
        // phase will govern the entire attempt's progress.
      // The number of reducers can be set on the client, e.g.
      // job.setNumReduceTasks(10);
      // If there are no reducers, there is only a map phase,
      if (conf.getNumReduceTasks() == 0) {
          // so this branch is taken
        mapPhase = getProgress().addPhase("map", 1.0f);
      } else {
        // If there are reducers then the entire attempt's progress will be 
        // split between the map phase (67%) and the sort phase (33%).
        // As soon as there is a reduce phase,
        mapPhase = getProgress().addPhase("map", 0.667f);
        // a sort phase has to be added as well
        sortPhase  = getProgress().addPhase("sort", 0.333f);
      }
    }
    TaskReporter reporter = startReporter(umbilical);
 
    boolean useNewApi = job.getUseNewMapper();
    initialize(job, getJobID(), reporter, useNewApi);

    // check if it is a cleanupJobTask
    if (jobCleanup) {
      runJobCleanupTask(umbilical, reporter);
      return;
    }
    if (jobSetup) {
      runJobSetupTask(umbilical, reporter);
      return;
    }
    if (taskCleanup) {
      runTaskCleanupTask(umbilical, reporter);
      return;
    }

    // Use the new API?
    if (useNewApi) {
        // We are using the new Mapper
      runNewMapper(job, splitMetaInfo, umbilical, reporter);
    } else {
      runOldMapper(job, splitMetaInfo, umbilical, reporter);
    }
    done(umbilical, reporter);
  }

Stepping into runNewMapper(), we can see the map task's overall flow:

1. Initialize the input

2. Call org.apache.hadoop.mapreduce.Mapper.run()

3. Update the task status

4. Close the input

5. Close the output

@SuppressWarnings("unchecked")
private <INKEY,INVALUE,OUTKEY,OUTVALUE>
void runNewMapper(final