Hadoop source code: HDFS startup flow, part 3: the heartbeat mechanism

Date: 2021-07-21

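Once Hadoop has started the NameNode and the DataNodes, how do the two sides work together? How does a DataNode register with the NameNode? How does it report its data? And how does the NameNode send commands back to the DataNode?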

Heartbeat basics

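A heartbeat is the periodic report that each worker node (DataNode) in HDFS sends to the NameNode. The DataNode reports its health, load and similar state, and picks up commands from the NameNode to execute locally. This keeps the NameNode, the commander of HDFS, informed about the state of the whole cluster and lets it direct the DataNodes to serve external read/write requests or internal tasks such as load balancing.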

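In addition, when the cluster starts, the NameNode calls loadNamesystem(conf); from NameNode#initialize, which loads the fsimage and edits files from disk and initializes FSNamesystem, FSDirectory, LeaseManager and so on. Information about the DataNodes, however, is not persisted in the NameNode's local file system; it is rebuilt dynamically on every startup.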

That information is exactly what each DataNode reports to the NameNode through heartbeats after it joins the cluster.
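To make the idea concrete, here is a minimal sketch of a liveness table that lives only in memory and is rebuilt purely from incoming heartbeats. It is not NameNode code: HeartbeatTable, onHeartbeat, isAlive and the expiry window are names and parameters assumed for illustration.

```java
import java.util.HashMap;
import java.util.Map;

/** Illustrative only: an in-memory table rebuilt purely from heartbeats. */
final class HeartbeatTable {
  // Last time (ms, from any monotonic clock) each node was heard from.
  private final Map<String, Long> lastSeenMs = new HashMap<>();
  private final long expiryMs;

  HeartbeatTable(long expiryMs) {
    this.expiryMs = expiryMs;
  }

  /** Record a heartbeat; unknown nodes are simply added to the table. */
  void onHeartbeat(String nodeId, long nowMs) {
    lastSeenMs.put(nodeId, nowMs);
  }

  /** A node counts as alive if it reported within the expiry window. */
  boolean isAlive(String nodeId, long nowMs) {
    Long seen = lastSeenMs.get(nodeId);
    return seen != null && nowMs - seen <= expiryMs;
  }

  public static void main(String[] args) {
    HeartbeatTable table = new HeartbeatTable(30_000);
    table.onHeartbeat("dn-1", 0);
    System.out.println(table.isAlive("dn-1", 10_000)); // inside the window
    System.out.println(table.isAlive("dn-1", 40_000)); // window exceeded
    System.out.println(table.isAlive("dn-2", 0));      // never reported
  }
}
```

Nothing here is written to disk: after a restart the table is empty, and it fills up again only as nodes report in, which is the property the paragraph above describes.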

BlockPoolManager

Looking at the DataNode startup source, DataNode has a private member private BlockPoolManager blockPoolManager;, which is initialized in DataNode#startDataNode:

    // Instantiate the BlockPoolManager
    blockPoolManager = new BlockPoolManager(this);
    blockPoolManager.refreshNamenodes(getConf());

Next, step into the BlockPoolManager class and look at its class comment and the BlockPoolManager(DataNode dn) constructor:

/**
 * Manages the BPOfferService objects for the data node.
 * Creation, removal, starting, stopping, shutdown on BPOfferService
 * objects must be done via APIs in this class.
 */
@InterfaceAudience.Private
class BlockPoolManager {
  private static final Logger LOG = DataNode.LOG;

  // Map from nameserviceId to BPOfferService
  private final Map<String, BPOfferService> bpByNameserviceId =
    Maps.newHashMap();
  // Map from blockPoolId to BPOfferService
  private final Map<String, BPOfferService> bpByBlockPoolId =
    Maps.newHashMap();
  // All BPOfferService instances
  private final List<BPOfferService> offerServices =
      new CopyOnWriteArrayList<>();

  private final DataNode dn;

  //This lock is used only to ensure exclusion of refreshNamenodes
  private final Object refreshNamenodesLock = new Object();
  
  BlockPoolManager(DataNode dn) {
    this.dn = dn;
  }
  
  // ... remaining code omitted
}

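As shown, the constructor only stores the DataNode reference in a private field. The class comment tells us that BlockPoolManager manages every BPOfferService on the DataNode: their whole lifecycle and all operations on them must go through BlockPoolManager.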

The BPOfferService class

Next, the class definition and fields of BPOfferService:

/**
 * One instance per block-pool/namespace on the DN, which handles the
 * heartbeats to the active and standby NNs for that namespace.
 * This class manages an instance of {@link BPServiceActor} for each NN,
 * and delegates calls to both NNs. 
 * It also maintains the state about which of the NNs is considered active.
 */
@InterfaceAudience.Private
class BPOfferService {
  static final Logger LOG = DataNode.LOG;
  
  /**
   * Information about the namespace that this service
   * is registering with. This is assigned after
   * the first phase of the handshake.
   */
  NamespaceInfo bpNSInfo;

  /**
   * The registration information for this block pool.
   * This is assigned after the second phase of the
   * handshake.
   */
  volatile DatanodeRegistration bpRegistration;

  private final String nameserviceId;
  private volatile String bpId;
  private final DataNode dn;

  /**
   * A reference to the BPServiceActor associated with the currently
   * ACTIVE NN. In the case that all NameNodes are in STANDBY mode,
   * this can be null. If non-null, this must always refer to a member
   * of the {@link #bpServices} list.
   */
  private BPServiceActor bpServiceToActive = null;
  
  /**
   * The list of all actors for namenodes in this nameservice, regardless
   * of their active or standby states.
   */
  private final List<BPServiceActor> bpServices =
    new CopyOnWriteArrayList<BPServiceActor>();

  /**
   * Each time we receive a heartbeat from a NN claiming to be ACTIVE,
   * we record that NN's most recent transaction ID here, so long as it
   * is more recent than the previous value. This allows us to detect
   * split-brain scenarios in which a prior NN is still asserting its
   * ACTIVE state but with a too-low transaction ID. See HDFS-2627
   * for details. 
   */
  private long lastActiveClaimTxId = -1;

  // Locks
  private final ReentrantReadWriteLock mReadWriteLock =
      new ReentrantReadWriteLock();
  private final Lock mReadLock  = mReadWriteLock.readLock();
  private final Lock mWriteLock = mReadWriteLock.writeLock();

  // utility methods to acquire and release read lock and write lock
  void readLock() {
    mReadLock.lock();
  }

  void readUnlock() {
    mReadLock.unlock();
  }

  void writeLock() {
    mWriteLock.lock();
  }

  void writeUnlock() {
    mWriteLock.unlock();
  }

  BPOfferService(
      final String nameserviceId, List<String> nnIds,
      List<InetSocketAddress> nnAddrs,
      List<InetSocketAddress> lifelineNnAddrs,
      DataNode dn) {
    Preconditions.checkArgument(!nnAddrs.isEmpty(),
        "Must pass at least one NN.");
    Preconditions.checkArgument(nnAddrs.size() == lifelineNnAddrs.size(),
        "Must pass same number of NN addresses and lifeline addresses.");
    this.nameserviceId = nameserviceId;
    this.dn = dn;
    // One BPServiceActor per NameNode
    for (int i = 0; i < nnAddrs.size(); ++i) {
      this.bpServices.add(new BPServiceActor(nameserviceId, nnIds.get(i),
          nnAddrs.get(i), lifelineNnAddrs.get(i), this));
    }
  }
  // ... remaining code omitted
}

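From the code, BPOfferService is one instance per block pool (namespace) on the DataNode. It handles the heartbeats from that namespace to its active and standby NameNodes. The class manages one BPServiceActor per NameNode and also tracks which NameNode is currently active.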

The BPServiceActor class

Each BPOfferService (one per block pool/namespace) holds one BPServiceActor per NameNode. Here is that class's definition:

/**
 * A thread per active or standby namenode to perform:
 * <ul>
 * <li> Pre-registration handshake with namenode</li>
 * <li> Registration with namenode</li>
 * <li> Send periodic heartbeats to the namenode</li>
 * <li> Handle commands received from the namenode</li>
 * </ul>
 */
@InterfaceAudience.Private
class BPServiceActor implements Runnable {
  // ... remaining code omitted
  
  BPServiceActor(String serviceId, String nnId, InetSocketAddress nnAddr,
      InetSocketAddress lifelineNnAddr, BPOfferService bpos) {
    this.bpos = bpos;
    this.dn = bpos.getDataNode();
    this.nnAddr = nnAddr;
    this.lifelineSender = lifelineNnAddr != null ?
        new LifelineSender(lifelineNnAddr) : null;
    this.initialRegistrationComplete = lifelineNnAddr != null ?
        new CountDownLatch(1) : null;
    this.dnConf = dn.getDnConf();
    this.ibrManager = new IncrementalBlockReportManager(
        dnConf.ibrInterval,
        dn.getMetrics());
    prevBlockReportId = ThreadLocalRandom.current().nextLong();
    fullBlockReportLeaseId = 0;
    scheduler = new Scheduler(dnConf.heartBeatInterval,
        dnConf.getLifelineIntervalMs(), dnConf.blockReportInterval,
        dnConf.outliersReportIntervalMs);
    // get the value of maxDataLength.
    this.maxDataLength = dnConf.getMaxDataLength();
    if (serviceId != null) {
      this.serviceId = serviceId;
    }
    if (nnId != null) {
      this.nnId = nnId;
    }
    commandProcessingThread = new CommandProcessingThread(this);
    commandProcessingThread.start();
  }
  // ... remaining code omitted
}

So BPServiceActor is the worker thread that talks to one specific NameNode; the class comment lists its responsibilities explicitly.

DataNode#createDataNode

Finally, back to the DataNode#createDataNode method:

  
  /** Instantiate & Start a single datanode daemon and wait for it to
   * finish.
   *  If this thread is specifically interrupted, it will stop waiting.
   */
  @VisibleForTesting
  @InterfaceAudience.Private
  public static DataNode createDataNode(String args[], Configuration conf,
      SecureResources resources) throws IOException {
    // Instantiate the DataNode
    DataNode dn = instantiateDataNode(args, conf, resources);
    if (dn != null) {
      // Start the DataNode daemons
      dn.runDatanodeDaemon();
    }
    return dn;
  }

  public void runDatanodeDaemon() throws IOException {
    blockPoolManager.startAll();

    // start dataXceiveServer
    dataXceiverServer.start();
    if (localDataXceiverServer != null) {
      localDataXceiverServer.start();
    }
    ipcServer.setTracer(tracer);
    ipcServer.start();
    startPlugins(getConf());
  }

This is where blockPoolManager.startAll(); is called, followed by a chain of start() calls:

  // BlockPoolManager#startAll()
  synchronized void startAll() throws IOException {
    try {
      UserGroupInformation.getLoginUser().doAs(
          new PrivilegedExceptionAction<Object>() {
            @Override
            public Object run() throws Exception {
              for (BPOfferService bpos : offerServices) {
                bpos.start();
              }
              return null;
            }
          });
    } catch (InterruptedException ex) {
      IOException ioe = new IOException();
      ioe.initCause(ex.getCause());
      throw ioe;
    }
  }

  // BPOfferService#start()
  //This must be called only by blockPoolManager
  void start() {
    for (BPServiceActor actor : bpServices) {
      actor.start();
    }
  }
	
  // BPServiceActor#start()
  //This must be called only by BPOfferService
  void start() {
    if ((bpThread != null) && (bpThread.isAlive())) {
      //Thread is started already
      return;
    }
    bpThread = new Thread(this);
    bpThread.setDaemon(true); // needed for JUnit testing

    if (lifelineSender != null) {
      lifelineSender.start();
    }
    bpThread.start();
  }

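The chain ends in BPServiceActor#start(), which starts the actor's own thread and the lifeline sender thread. After that, DataNode#secureMain calls datanode.join(); to wait for these child threads to finish.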

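So the overall structure of the heartbeat mechanism is:

  1. Every DataNode has one BlockPoolManager instance.

  2. Each BlockPoolManager manages the BPOfferService instances for all nameservices.

  3. Each BPOfferService manages the BPServiceActor worker threads from its namespace to all of its NameNodes: one active NN plus any number of standby NNs.

  4. A BPServiceActor is the worker thread that communicates with one specific NameNode, sends heartbeats and receives the commands returned in response.

The overall flow of the heartbeat mechanism is:

  1. DataNode#startDataNode instantiates the BlockPoolManager.

  2. DataNode#startDataNode calls BlockPoolManager#refreshNamenodes to refresh the NameNodes of each nameservice, create the corresponding BPOfferService and BPServiceActor instances, and then connect to the NameNodes.

  3. DataNode#createDataNode calls BlockPoolManager#startAll to start all heartbeat-related threads.

  4. DataNode#secureMain calls datanode.join() to wait until the heartbeat threads are stopped.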
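The ownership hierarchy described above can be modelled in a few lines. This is only a sketch with stand-in classes: HeartbeatTopology, OfferService, Actor, build and actorCount are hypothetical names, not Hadoop types, and no threads are started.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Illustrative model of the manager -> service -> actor hierarchy. */
final class HeartbeatTopology {
  /** Stand-in for BPServiceActor: one per NameNode. */
  static final class Actor {
    final String nnId;
    Actor(String nnId) { this.nnId = nnId; }
  }

  /** Stand-in for BPOfferService: one per nameservice. */
  static final class OfferService {
    final String nameserviceId;
    final List<Actor> actors = new ArrayList<>();
    OfferService(String nameserviceId, List<String> nnIds) {
      this.nameserviceId = nameserviceId;
      for (String nnId : nnIds) {
        actors.add(new Actor(nnId));
      }
    }
  }

  /** Stand-in for BlockPoolManager: one OfferService per nameservice. */
  static List<OfferService> build(Map<String, List<String>> nnIdsByNs) {
    List<OfferService> services = new ArrayList<>();
    for (Map.Entry<String, List<String>> e : nnIdsByNs.entrySet()) {
      services.add(new OfferService(e.getKey(), e.getValue()));
    }
    return services;
  }

  /** Number of actor threads a single DataNode would end up running. */
  static int actorCount(List<OfferService> services) {
    int n = 0;
    for (OfferService s : services) {
      n += s.actors.size();
    }
    return n;
  }

  public static void main(String[] args) {
    Map<String, List<String>> conf = new LinkedHashMap<>();
    conf.put("ns1", Arrays.asList("nn1", "nn2"));
    conf.put("ns2", Arrays.asList("nn1", "nn2", "nn3"));
    List<OfferService> services = build(conf);
    System.out.println(services.size() + " services, "
        + actorCount(services) + " actors");
  }
}
```

The point of the model is the multiplication: a DataNode in a federated HA cluster runs one worker per (nameservice, NameNode) pair.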

Heartbeat mechanism: code walkthrough

Now let's look at how the heartbeat mechanism is actually implemented.

DataNode#startDataNode

First, the part of the DataNode startup flow that sets up the heartbeat mechanism:

  // This method starts the data node with the specified conf. If conf's CONFIG_PROPERTY_SIMULATED property is set, a simulated storage based data node is created.
  void startDataNode(List<StorageLocation> dataDirectories,
                     SecureResources resources
                     ) throws IOException {

    // ...... see the previous post in this series for the rest of this method
    
    // Initialize following the namespace (nameservice) -> NameNode structure
    blockPoolManager = new BlockPoolManager(this);
    // Heartbeat management
    blockPoolManager.refreshNamenodes(getConf());

    // ......
  }

The BlockPoolManager constructor does nothing more than this.dn = dn;.

BlockPoolManager#refreshNamenodes

The heartbeat management call is the part to focus on:

  void refreshNamenodes(Configuration conf)
      throws IOException {
    // DFSConfigKeys.DFS_NAMESERVICES: the dfs.nameservices setting, null by default
    LOG.info("Refresh request received for nameservices: " +
        conf.get(DFSConfigKeys.DFS_NAMESERVICES));

    Map<String, Map<String, InetSocketAddress>> newAddressMap = null;
    Map<String, Map<String, InetSocketAddress>> newLifelineAddressMap = null;

    try {
      // Get the InetSocketAddresses of the NameNodes that manage this cluster (setting: dfs.namenode.servicerpc-address).
      // The result has the shape Map<nameserviceId, Map<namenodeId, InetSocketAddress>>
      newAddressMap =
          DFSUtil.getNNServiceRpcAddressesForCluster(conf);
      // Get the InetSocketAddresses of the lifeline RPC servers on the NameNodes (setting: dfs.namenode.lifeline.rpc-address)
      newLifelineAddressMap =
          DFSUtil.getNNLifelineRpcAddressesForCluster(conf);
    } catch (IOException ioe) {
      LOG.warn("Unable to get NameNode addresses.", ioe);
    }

    if (newAddressMap == null || newAddressMap.isEmpty()) {
      throw new IOException("No services to connect, missing NameNode " +
          "address.");
    }

    synchronized (refreshNamenodesLock) {
      doRefreshNamenodes(newAddressMap, newLifelineAddressMap);
    }
  }

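refreshNamenodes builds the cluster's Map<nameserviceId, Map<namenodeId, InetSocketAddress>> from the configuration, plus a map of the same shape for the lifeline addresses. It then calls doRefreshNamenodes to perform the actual refresh.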
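To picture the shape of that nested map, here is a hedged sketch that groups flat key/value settings by nameservice. It assumes the suffixed key form used in HA configurations (dfs.namenode.servicerpc-address.<nsId>.<nnId>) and values of the form host:port. NnAddressMapSketch and parse are hypothetical; the real DFSUtil handles defaults, fallbacks and non-HA setups that this sketch ignores.

```java
import java.net.InetSocketAddress;
import java.util.LinkedHashMap;
import java.util.Map;

/** Illustrative only: builds Map<nsId, Map<nnId, address>> from flat settings. */
final class NnAddressMapSketch {
  // Prefix taken from the setting named in the text; the parsing is an assumption.
  static final String PREFIX = "dfs.namenode.servicerpc-address.";

  static Map<String, Map<String, InetSocketAddress>> parse(
      Map<String, String> conf) {
    Map<String, Map<String, InetSocketAddress>> result = new LinkedHashMap<>();
    for (Map.Entry<String, String> e : conf.entrySet()) {
      if (!e.getKey().startsWith(PREFIX)) {
        continue; // unrelated setting
      }
      String[] ids = e.getKey().substring(PREFIX.length()).split("\\.");
      String[] hostPort = e.getValue().split(":");
      if (ids.length != 2 || hostPort.length != 2) {
        continue; // sketch: only "<nsId>.<nnId>" = "host:port" is handled
      }
      // createUnresolved avoids a DNS lookup, which keeps the sketch offline.
      InetSocketAddress addr = InetSocketAddress.createUnresolved(
          hostPort[0], Integer.parseInt(hostPort[1]));
      result.computeIfAbsent(ids[0], k -> new LinkedHashMap<>())
          .put(ids[1], addr);
    }
    return result;
  }

  public static void main(String[] args) {
    Map<String, String> conf = new LinkedHashMap<>();
    conf.put(PREFIX + "ns1.nn1", "host1.example.com:8040");
    conf.put(PREFIX + "ns1.nn2", "host2.example.com:8040");
    conf.put(PREFIX + "ns2.nn1", "host3.example.com:8040");
    System.out.println(parse(conf));
  }
}
```

The host names and port in main are made-up sample values.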

BlockPoolManager#doRefreshNamenodes

  private void doRefreshNamenodes(
      Map<String, Map<String, InetSocketAddress>> addrMap,
      Map<String, Map<String, InetSocketAddress>> lifelineAddrMap)
      throws IOException {
    assert Thread.holdsLock(refreshNamenodesLock);

    Set<String> toRefresh = Sets.newLinkedHashSet();
    Set<String> toAdd = Sets.newLinkedHashSet();
    Set<String> toRemove;
    
    synchronized (this) {
      // Step 1. For each of the new nameservices, figure out whether
      // it's an update of the set of NNs for an existing NS,
      // or an entirely new nameservice.
      for (String nameserviceId : addrMap.keySet()) {
        if (bpByNameserviceId.containsKey(nameserviceId)) {
          toRefresh.add(nameserviceId);
        } else {
          toAdd.add(nameserviceId);
        }
      }

      // Step 2. Any nameservices we currently have but are no longer present need to be removed.
      // (That is: present in bpByNameserviceId but absent from the configured addrMap.)
      toRemove = Sets.newHashSet(Sets.difference(
          bpByNameserviceId.keySet(), addrMap.keySet()));
      
      assert toRefresh.size() + toAdd.size() ==
        addrMap.size() :
          "toAdd: " + Joiner.on(",").useForNull("<default>").join(toAdd) +
          "  toRemove: " + Joiner.on(",").useForNull("<default>").join(toRemove) +
          "  toRefresh: " + Joiner.on(",").useForNull("<default>").join(toRefresh);

      
      // Step 3. Start new nameservices
      if (!toAdd.isEmpty()) {
        LOG.info("Starting BPOfferServices for nameservices: " +
            Joiner.on(",").useForNull("<default>").join(toAdd));
      
        for (String nsToAdd : toAdd) {
          Map<String, InetSocketAddress> nnIdToAddr = addrMap.get(nsToAdd);
          Map<String, InetSocketAddress> nnIdToLifelineAddr =
              lifelineAddrMap.get(nsToAdd);
          ArrayList<InetSocketAddress> addrs =
              Lists.newArrayListWithCapacity(nnIdToAddr.size());
          ArrayList<String> nnIds =
              Lists.newArrayListWithCapacity(nnIdToAddr.size());
          ArrayList<InetSocketAddress> lifelineAddrs =
              Lists.newArrayListWithCapacity(nnIdToAddr.size());
          for (String nnId : nnIdToAddr.keySet()) {
            addrs.add(nnIdToAddr.get(nnId));
            nnIds.add(nnId);
            lifelineAddrs.add(nnIdToLifelineAddr != null ?
                nnIdToLifelineAddr.get(nnId) : null);
          }
          // Create the new BPOfferService
          BPOfferService bpos = createBPOS(nsToAdd, nnIds, addrs,
              lifelineAddrs);
          // Register the new bpos in the collections
          bpByNameserviceId.put(nsToAdd, bpos);
          offerServices.add(bpos);
        }
      }
      // Start everything
      startAll();
    }

    // Step 4. Shut down old nameservices. This happens outside
    // of the synchronized(this) lock since they need to call
    // back to .remove() from another thread
    if (!toRemove.isEmpty()) {
      LOG.info("Stopping BPOfferServices for nameservices: " +
          Joiner.on(",").useForNull("<default>").join(toRemove));
      
      for (String nsToRemove : toRemove) {
        BPOfferService bpos = bpByNameserviceId.get(nsToRemove);
        bpos.stop();
        bpos.join();
        // they will call remove on their own
        // Roughly, the execution path is:
        // bpos.stop() -> actor.stop(); -> shouldServiceRun = false;
        // bpos.join() -> actor.join(); -> bpThread.join();
        // -> shouldRun() returns false in BPServiceActor#run, so the finally block runs BPServiceActor#cleanUp
        // -> BPOfferService#shutdownActor -> DataNode#shutdownBlockPool -> BlockPoolManager#remove
      }
    }
    
    // Step 5. Update nameservices whose NN list has changed
    if (!toRefresh.isEmpty()) {
      LOG.info("Refreshing list of NNs for nameservices: " +
          Joiner.on(",").useForNull("<default>").join(toRefresh));
      
      for (String nsToRefresh : toRefresh) {
        BPOfferService bpos = bpByNameserviceId.get(nsToRefresh);
        Map<String, InetSocketAddress> nnIdToAddr = addrMap.get(nsToRefresh);
        Map<String, InetSocketAddress> nnIdToLifelineAddr =
            lifelineAddrMap.get(nsToRefresh);
        ArrayList<InetSocketAddress> addrs =
            Lists.newArrayListWithCapacity(nnIdToAddr.size());
        ArrayList<InetSocketAddress> lifelineAddrs =
            Lists.newArrayListWithCapacity(nnIdToAddr.size());
        ArrayList<String> nnIds = Lists.newArrayListWithCapacity(
            nnIdToAddr.size());
        for (String nnId : nnIdToAddr.keySet()) {
          addrs.add(nnIdToAddr.get(nnId));
          lifelineAddrs.add(nnIdToLifelineAddr != null ?
              nnIdToLifelineAddr.get(nnId) : null);
          nnIds.add(nnId);
        }
        try {
          UserGroupInformation.getLoginUser()
              .doAs(new PrivilegedExceptionAction<Object>() {
                @Override
                public Object run() throws Exception {
                  bpos.refreshNNList(nsToRefresh, nnIds, addrs, lifelineAddrs);
                  return null;
                }
              });
        } catch (InterruptedException ex) {
          IOException ioe = new IOException();
          ioe.initCause(ex.getCause());
          throw ioe;
        }
      }
    }
  }

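According to the upstream comments there are five steps. Steps 1 and 2 compare the nameservices that refreshNamenodes assembled from the configuration with the already-connected bpByNameserviceId map, and sort the differences into three groups: toRefresh, toAdd and toRemove.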
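Steps 1 and 2 are a plain set partition, which the following sketch reproduces with java.util collections instead of Guava. RefreshPartitionSketch is a hypothetical class written for this post; only the three group names come from the Hadoop code above.

```java
import java.util.Arrays;
import java.util.LinkedHashSet;
import java.util.Set;

/** Illustrative only: splits nameservices into toAdd / toRefresh / toRemove. */
final class RefreshPartitionSketch {
  final Set<String> toAdd = new LinkedHashSet<>();
  final Set<String> toRefresh = new LinkedHashSet<>();
  final Set<String> toRemove = new LinkedHashSet<>();

  RefreshPartitionSketch(Set<String> current, Set<String> configured) {
    // Step 1: every configured nameservice is either already known or new.
    for (String ns : configured) {
      if (current.contains(ns)) {
        toRefresh.add(ns);
      } else {
        toAdd.add(ns);
      }
    }
    // Step 2: whatever we currently run but is no longer configured goes away.
    toRemove.addAll(current);
    toRemove.removeAll(configured);
  }

  public static void main(String[] args) {
    Set<String> current = new LinkedHashSet<>(Arrays.asList("ns1", "ns2"));
    Set<String> configured = new LinkedHashSet<>(Arrays.asList("ns2", "ns3"));
    RefreshPartitionSketch p = new RefreshPartitionSketch(current, configured);
    System.out.println("toAdd=" + p.toAdd + " toRefresh=" + p.toRefresh
        + " toRemove=" + p.toRemove);
  }
}
```

The invariant asserted in doRefreshNamenodes holds here as well: toRefresh.size() + toAdd.size() always equals the number of configured nameservices.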

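Step 3 starts all new nameservices. Its code falls into three parts: the first collects the required arguments, the second creates a new BPOfferService and stores the new bpos in the member collections, and the third starts all the bpos that have been created.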

Creating a new BPOfferService

First, what does the second part, BPOfferService bpos = createBPOS(nsToAdd, nnIds, addrs, lifelineAddrs);, actually do?

  protected BPOfferService createBPOS(
      final String nameserviceId,
      List<String> nnIds,
      List<InetSocketAddress> nnAddrs,
      List<InetSocketAddress> lifelineNnAddrs) {
    return new BPOfferService(nameserviceId, nnIds, nnAddrs, lifelineNnAddrs,
        dn);
  }

This method was evidently split out to make testing easier; it simply calls the BPOfferService constructor.

The BPOfferService constructor

BPOfferService(
    final String nameserviceId, List<String> nnIds,
    List<InetSocketAddress> nnAddrs,
    List<InetSocketAddress> lifelineNnAddrs,
    DataNode dn) {
  // At least one NameNode address must be supplied
  Preconditions.checkArgument(!nnAddrs.isEmpty(),
      "Must pass at least one NN.");
  // The number of NameNode addresses must equal the number of lifeline addresses
  Preconditions.checkArgument(nnAddrs.size() == lifelineNnAddrs.size(),
      "Must pass same number of NN addresses and lifeline addresses.");
  this.nameserviceId = nameserviceId;
  this.dn = dn;
  // Create one BPServiceActor per NameNode and add it to bpServices
  for (int i = 0; i < nnAddrs.size(); ++i) {
    this.bpServices.add(new BPServiceActor(nameserviceId, nnIds.get(i),
        nnAddrs.get(i), lifelineNnAddrs.get(i), this));
  }
}

Apart from the checks and assignments, it just calls the BPServiceActor constructor once per NameNode. On to that constructor.

The BPServiceActor constructor

  BPServiceActor(String serviceId, String nnId, InetSocketAddress nnAddr,
      InetSocketAddress lifelineNnAddr, BPOfferService bpos) {
    this.bpos = bpos;
    this.dn = bpos.getDataNode();
    this.nnAddr = nnAddr;
    this.lifelineSender = lifelineNnAddr != null ?
        new LifelineSender(lifelineNnAddr) : null;
    this.initialRegistrationComplete = lifelineNnAddr != null ?
        new CountDownLatch(1) : null;
    this.dnConf = dn.getDnConf();
    // Create the manager for incremental block reports (IBRs)
    this.ibrManager = new IncrementalBlockReportManager(
        dnConf.ibrInterval,
        dn.getMetrics());
    prevBlockReportId = ThreadLocalRandom.current().nextLong();
    fullBlockReportLeaseId = 0;
    // Instantiate the Scheduler: a utility class that wraps the timestamp arithmetic for scheduling heartbeats and block reports
    scheduler = new Scheduler(dnConf.heartBeatInterval,
        dnConf.getLifelineIntervalMs(), dnConf.blockReportInterval,
        dnConf.outliersReportIntervalMs);
    // get the value of maxDataLength.
    // Read ipc.maximum.data.length, the largest request the server will accept. The default is 128 * 1024 * 1024 (128 MB)
    this.maxDataLength = dnConf.getMaxDataLength();
    if (serviceId != null) {
      this.serviceId = serviceId;
    }
    if (nnId != null) {
      this.nnId = nnId;
    }
    // Instantiate the CommandProcessingThread, which processes commands asynchronously (its constructor also sets the thread's daemon flag)
    commandProcessingThread = new CommandProcessingThread(this);
    commandProcessingThread.start();
  }

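So the second part of step 3 instantiates all required BPOfferService and BPServiceActor objects, and along the way also creates the incremental block report manager, the Scheduler helper for timestamp calculations, and a few daemon threads.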

BlockPoolManager#startAll

Next, what the all-important startAll method does:

  synchronized void startAll() throws IOException {
    try {
      UserGroupInformation.getLoginUser().doAs(
          new PrivilegedExceptionAction<Object>() {
            @Override
            public Object run() throws Exception {
              for (BPOfferService bpos : offerServices) {
                bpos.start();
              }
              return null;
            }
          });
    } catch (InterruptedException ex) {
      IOException ioe = new IOException();
      ioe.initCause(ex.getCause());
      throw ioe;
    }
  }

Its core job is to start every bpos that has been instantiated. Following the call into BPOfferService#start:

  void start() {
    for (BPServiceActor actor : bpServices) {
      actor.start();
    }
  }

The bpos in turn starts every BPServiceActor it has instantiated. On to BPServiceActor#start:

  //This must be called only by BPOfferService
  void start() {
    if ((bpThread != null) && (bpThread.isAlive())) {
      //Thread is started already
      return;
    }
    bpThread = new Thread(this);
    bpThread.setDaemon(true); // needed for JUnit testing

    if (lifelineSender != null) {
      lifelineSender.start();
    }
    bpThread.start();
  }

This starts bpThread and lifelineSender. We follow them in execution order.

lifelineSender.start()

LifelineSender is an inner class of BPServiceActor that implements Runnable and Closeable.

First, the start() method of LifelineSender:

    public void start() {
      // Create a thread, passing this LifelineSender as the target
      lifelineThread = new Thread(this,
          formatThreadName("lifeline", lifelineNnAddr));
      // Mark it as a daemon thread
      lifelineThread.setDaemon(true);
      lifelineThread.setUncaughtExceptionHandler(
          new Thread.UncaughtExceptionHandler() {
            @Override
            public void uncaughtException(Thread thread, Throwable t) {
              LOG.error(thread + " terminating on unexpected exception", t);
            }
          });
      // Ends up invoking LifelineSender#run()
      lifelineThread.start();
    }

The method creates a daemon thread with itself as the target and calls the thread's start(), which in turn invokes the target's run(). Here is the LifelineSender#run method that gets executed:

   @Override
    public void run() {
      // The lifeline RPC depends on registration with the NameNode, so wait for initial registration to complete.
      while (shouldRun()) {
        try {
          initialRegistrationComplete.await();
          break;
        } catch (InterruptedException e) {
          // The only way thread interruption can happen while waiting on this
          // latch is if the state of the actor has been updated to signal
          // shutdown.  The next loop's call to shouldRun() will return false,
          // and the thread will finish.
          Thread.currentThread().interrupt();
        }
      }

      // After initial NameNode registration has completed, execute the main
      // loop for sending periodic lifeline RPCs if needed.  This is done in a
      // second loop to avoid a pointless wait on the above latch in every
      // iteration of the main loop.
      while (shouldRun()) {
        try {
          if (lifelineNamenode == null) {
            lifelineNamenode = dn.connectToLifelineNN(lifelineNnAddr);
          }
          // Send a lifeline message if one is due
          sendLifelineIfDue();
          Thread.sleep(scheduler.getLifelineWaitTime());
        } catch (InterruptedException e) {
          Thread.currentThread().interrupt();
        } catch (IOException e) {
          LOG.warn("IOException in LifelineSender for " + BPServiceActor.this,
              e);
        }
      }

      LOG.info("LifelineSender for " + BPServiceActor.this + " exiting.");
    }

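The method first blocks the thread until initial registration completes (the handshake logic in bpThread), and only then starts sending lifeline messages to the NameNode.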
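The gating pattern used here, one thread parked on a CountDownLatch until another thread finishes a handshake, can be tried in isolation. The sketch below is not Hadoop code: LatchGateSketch, run and the counters are hypothetical, and the "handshake" is just a flag set on the calling thread.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

/** Illustrative only: a sender thread that may not send before registration. */
final class LatchGateSketch {
  /** Returns {sends performed, sends that happened before registration}. */
  static int[] run(int sends) {
    CountDownLatch registered = new CountDownLatch(1);
    AtomicBoolean registrationDone = new AtomicBoolean(false);
    AtomicInteger sent = new AtomicInteger();
    AtomicInteger early = new AtomicInteger();

    Thread sender = new Thread(() -> {
      try {
        registered.await(); // park until the "handshake" has finished
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
        return;
      }
      for (int i = 0; i < sends; i++) {
        if (!registrationDone.get()) {
          early.incrementAndGet(); // would indicate a broken gate
        }
        sent.incrementAndGet();
      }
    }, "lifeline-sketch");
    sender.setDaemon(true);
    sender.start();

    registrationDone.set(true); // the handshake completes on this thread
    registered.countDown();     // open the gate exactly once
    try {
      sender.join();
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new IllegalStateException(e);
    }
    return new int[] {sent.get(), early.get()};
  }

  public static void main(String[] args) {
    int[] result = run(3);
    System.out.println("sent=" + result[0] + " early=" + result[1]);
  }
}
```

Because countDown() happens-before a successful await() returns, the sender can never observe registrationDone as false, so the early counter stays at zero.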

LifelineSender#sendLifelineIfDue

Now the lifeline sending logic in detail:

    private void sendLifelineIfDue() throws IOException {
      // Record the current (monotonic) time
      long startTime = scheduler.monotonicNow();
      if (!scheduler.isLifelineDue(startTime)) {
        if (LOG.isDebugEnabled()) {
          LOG.debug("Skipping sending lifeline for " + BPServiceActor.this
              + ", because it is not due.");
        }
        return;
      }
      if (dn.areHeartbeatsDisabledForTests()) {
        if (LOG.isDebugEnabled()) {
          LOG.debug("Skipping sending lifeline for " + BPServiceActor.this
              + ", because heartbeats are disabled for tests.");
        }
        return;
      }
      // Send the lifeline
      sendLifeline();
      // Record lifeline metrics
      dn.getMetrics().addLifeline(scheduler.monotonicNow() - startTime,
          getRpcMetricSuffix());
      // Schedule the next send
      scheduler.scheduleNextLifeline(scheduler.monotonicNow());
    }
    private void sendLifeline() throws IOException {
      // Get the DataNode's storage utilization reports
      StorageReport[] reports =
          dn.getFSDataset().getStorageReports(bpos.getBlockPoolId());
      if (LOG.isDebugEnabled()) {
        LOG.debug("Sending lifeline with " + reports.length + " storage " +
                  " reports from service actor: " + BPServiceActor.this);
      }
      // Summarize the DataNode's volume failures
      VolumeFailureSummary volumeFailureSummary = dn.getFSDataset()
          .getVolumeFailureSummary();
      int numFailedVolumes = volumeFailureSummary != null ?
          volumeFailureSummary.getFailedStorageLocations().length : 0;
      // Send the lifeline.
      // For background on lifelines see this post: https://blog.csdn.net/Androidlushangderen/article/details/53783641
      // For the NameNode side see NameNodeRpcServer#sendLifeline
      lifelineNamenode.sendLifeline(bpRegistration,
                                    reports,
                                    dn.getFSDataset().getCacheCapacity(),
                                    dn.getFSDataset().getCacheUsed(),
                                    dn.getXmitsInProgress(),
                                    dn.getXceiverCount(),
                                    numFailedVolumes,
                                    volumeFailureSummary);
    }
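The "send only if due, then schedule the next one" logic above boils down to a little timestamp arithmetic. Here is a minimal sketch with the clock passed in explicitly so it can be tested deterministically. LifelineScheduleSketch and sendIfDue are hypothetical; the other method names mirror the Scheduler calls seen in the listing, but the bodies are assumptions rather than Hadoop's implementation.

```java
/** Illustrative only: due-check and rescheduling for a periodic message. */
final class LifelineScheduleSketch {
  private final long intervalMs;
  private long nextLifelineMs;

  LifelineScheduleSketch(long intervalMs, long startMs) {
    this.intervalMs = intervalMs;
    this.nextLifelineMs = startMs; // assumption: first lifeline is due at once
  }

  boolean isLifelineDue(long nowMs) {
    return nowMs >= nextLifelineMs;
  }

  void scheduleNextLifeline(long baseMs) {
    nextLifelineMs = baseMs + intervalMs;
  }

  /** How long the sender thread should sleep before checking again. */
  long getLifelineWaitTime(long nowMs) {
    return Math.max(0, nextLifelineMs - nowMs);
  }

  /** Same control flow as sendLifelineIfDue; returns true if we "sent". */
  boolean sendIfDue(long nowMs) {
    if (!isLifelineDue(nowMs)) {
      return false; // skipped, not due yet
    }
    // The real method performs the RPC and records metrics at this point.
    scheduleNextLifeline(nowMs);
    return true;
  }

  public static void main(String[] args) {
    LifelineScheduleSketch s = new LifelineScheduleSketch(9_000, 0);
    System.out.println(s.sendIfDue(0));
    System.out.println(s.sendIfDue(1_000));
    System.out.println(s.getLifelineWaitTime(1_000));
  }
}
```

Using a monotonic clock for nowMs, as the listing does with scheduler.monotonicNow(), keeps the schedule immune to wall-clock adjustments.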

bpThread#start

bpThread is created with new Thread(this);, so its target is the BPServiceActor itself. Here is BPServiceActor's run() method:

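  // No matter what kind of exception occurs, keep retrying offerService(). That is the loop that connects to the NameNode and provides basic DataNode functionality.
  // It only stops when "shouldRun" or "shouldServiceRun" is turned off, which can happen at shutdown or because of refreshNamenodes.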
  @Override
  public void run() {
    LOG.info(this + " starting to offer service");

    try {
      while (true) {
        // init stuff
        try {
          // setup storage
          // Connect to the NameNode and perform the handshake
          connectToNNAndHandshake();
          break;
        } catch (IOException ioe) {
          // Initial handshake, storage recovery or registration failed
          runningState = RunningState.INIT_FAILED;
          if (shouldRetryInit()) {
            // Retry until all namenode's of BPOS failed initialization
            LOG.error("Initialization failed for " + this + " "
                + ioe.getLocalizedMessage());
            sleepAndLogInterrupts(5000, "initializing");
          } else {
            runningState = RunningState.FAILED;
            LOG.error("Initialization failed for " + this + ". Exiting. ", ioe);
            return;
          }
        }
      }

      runningState = RunningState.RUNNING;
      // Handshake done; lifelines may start now
      if (initialRegistrationComplete != null) {
        initialRegistrationComplete.countDown();
      }

      while (shouldRun()) {
        try {
          // Main loop of each BP thread: keeps calling the remote NameNode until shutdown
          offerService();
        } catch (Exception ex) {
          LOG.error("Exception in BPOfferService for " + this, ex);
          sleepAndLogInterrupts(5000, "offering service");
        }
      }
      runningState = RunningState.EXITED;
    } catch (Throwable ex) {
      LOG.warn("Unexpected exception in block pool " + this, ex);
      runningState = RunningState.FAILED;
    } finally {
      LOG.warn("Ending block pool service for: " + this);
      // Once the loop ends, clean up connections and so on; this eventually calls BlockPoolManager#remove
      cleanUp();
    }
  }

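The method does two main things: it connects to the NameNode and performs the handshake, and then it runs offerService, which keeps calling the NameNode until the service is stopped.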
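The two phases of run(), retry the initialization and then keep serving while surviving exceptions, can be sketched without any Hadoop types. Everything below is hypothetical (ActorLoopSketch, Step, initWithRetry, serveUntilStopped); in particular a fixed maxAttempts stands in for shouldRetryInit(), and the 5-second sleeps are left out.

```java
import java.util.function.BooleanSupplier;

/** Illustrative only: init-with-retry followed by a fault-tolerant loop. */
final class ActorLoopSketch {
  /** Hypothetical hook for "handshake" or "offer service" work. */
  interface Step {
    void run() throws Exception;
  }

  /** Phase 1: returns the attempt that succeeded, or -1 if all failed. */
  static int initWithRetry(Step handshake, int maxAttempts) {
    for (int attempt = 1; attempt <= maxAttempts; attempt++) {
      try {
        handshake.run();
        return attempt;
      } catch (Exception e) {
        // The real loop logs the failure and sleeps before retrying.
      }
    }
    return -1;
  }

  /** Phase 2: keeps calling service until shouldRun is false; counts failures. */
  static int serveUntilStopped(Step service, BooleanSupplier shouldRun) {
    int failures = 0;
    while (shouldRun.getAsBoolean()) {
      try {
        service.run();
      } catch (Exception e) {
        failures++; // one bad round must not kill the loop
      }
    }
    return failures;
  }

  public static void main(String[] args) {
    int[] calls = {0};
    int attempt = initWithRetry(() -> {
      if (++calls[0] < 3) {
        throw new java.io.IOException("NameNode not reachable yet");
      }
    }, 5);
    System.out.println("handshake succeeded on attempt " + attempt);
  }
}
```

Separating the phases this way makes the asymmetry visible: an initialization failure can end the thread, while a failure inside the service loop only costs one round.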

BPServiceActor#connectToNNAndHandshake

The handshake, in outline:

  private void connectToNNAndHandshake() throws IOException {
    // get NN proxy
    // DatanodeProtocolClientSideTranslatorPB is the client-side translator that converts
    // requests made on the DatanodeProtocol interface into calls on the RPC server implementing DatanodeProtocolPB.
    bpNamenode = dn.connectToNN(nnAddr);

    // First phase of the handshake with NN - get the namespace info.
    NamespaceInfo nsInfo = retrieveNamespaceInfo();

    // Verify that this matches the other NN in this HA pair.
    // This also initializes our block pool in the DN if we are
    // the first NN connection for this BP.
    bpos.verifyAndSetNamespaceInfo(this, nsInfo);

    /* set thread name again to include NamespaceInfo when it's available. */
    this.bpThread.setName(formatThreadName("heartbeating", nnAddr));

    // Second phase of the handshake with the NN.
    register(nsInfo);
  }

Phase one:

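  // Performs the first part of the handshake with the NameNode. This calls versionRequest to determine the NN's namespace and version info.
  // It retries automatically until the NN responds or the DN is shutting down.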
  @VisibleForTesting
  NamespaceInfo retrieveNamespaceInfo() throws IOException {
    NamespaceInfo nsInfo = null;
    while (shouldRun()) {
      try {
        // NamespaceInfo is what the NameNode returns in response to the DataNode handshake
        nsInfo = bpNamenode.versionRequest();
        LOG.debug(this + " received versionRequest response: " + nsInfo);
        break;
      } catch(SocketTimeoutException e) {  // namenode is busy
        LOG.warn("Problem connecting to server: " + nnAddr);
      } catch(IOException e ) {  // namenode is not available
        LOG.warn("Problem connecting to server: " + nnAddr);
      }
      
      // try again in a second
      // Actually retries after five seconds; the upstream comment above looks outdated
      sleepAndLogInterrupts(5000, "requesting version info from NN");
    }
    
    if (nsInfo != null) {
      checkNNVersion(nsInfo);
    } else {
      throw new IOException("DN shut down before block pool connected");
    }
    return nsInfo;
  }

Phase two:

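  // Registers one block pool with the corresponding NameNode.
  // The bpDatanode needs to register with the NameNode on startup in order to
  // 1) report which storage it is now serving;
  // 2) receive a registration ID issued by the NameNode to identify registered DataNodes.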
  void register(NamespaceInfo nsInfo) throws IOException {
    // The handshake() phase loaded the block pool storage
    // off disk - so update the bpRegistration object from that info
    DatanodeRegistration newBpRegistration = bpos.createRegistration();

    LOG.info(this + " beginning handshake with NN");

    while (shouldRun()) {
      try {
        // Use returned registration from namenode with updated fields
        newBpRegistration = bpNamenode.registerDatanode(newBpRegistration);
        newBpRegistration.setNamespaceInfo(nsInfo);
        bpRegistration = newBpRegistration;
        break;
      } catch(EOFException e) {  // namenode might have just restarted
        LOG.info("Problem connecting to server: " + nnAddr + " :"
            + e.getLocalizedMessage());
      } catch(SocketTimeoutException e) {  // namenode is busy
        LOG.info("Problem connecting to server: " + nnAddr);
      } catch(RemoteException e) {
        LOG.warn("RemoteException in register", e);
        throw e;
      } catch(IOException e) {
        LOG.warn("Problem connecting to server: " + nnAddr);
      }
      // Try again in a second
      sleepAndLogInterrupts(1000, "connecting to server");
    }

    if (bpRegistration == null) {
      throw new IOException("DN shut down before block pool registered");
    }

    LOG.info(this + " successfully registered with NN");
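    // After a BPServiceActor has registered successfully with the NN, it calls this method
    // to verify that the NN it connected to is consistent with the other NNs serving the block pool.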
    bpos.registrationSucceeded(this, bpRegistration);

    // reset lease id whenever registered to NN.
    // ask for a new lease id at the next heartbeat.
    fullBlockReportLeaseId = 0;

    // random short delay - helps scatter the BR from all DNs
    scheduler.scheduleBlockReport(dnConf.initialBlockReportDelayMs, true);
  }
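
The last line of register() schedules the first block report with a random delay, so that DataNodes that registered at the same moment do not all report at once. The sketch below shows only that idea. BlockReportJitter and firstReportTime are hypothetical names, and the way the random offset is drawn is an assumption; the real Scheduler#scheduleBlockReport may compute it differently.

```java
import java.util.Random;

// Hypothetical illustration of a jittered first block report; not Hadoop code.
class BlockReportJitter {

  // Returns the time of the first block report: `nowMs` plus a random
  // offset in [0, maxDelayMs). With no delay configured, report right away.
  static long firstReportTime(long nowMs, long maxDelayMs, Random rng) {
    if (maxDelayMs <= 0) {
      return nowMs;
    }
    return nowMs + (long) (rng.nextDouble() * maxDelayMs);
  }

  public static void main(String[] args) {
    Random rng = new Random();
    // Three DataNodes registering at the same instant get different report times.
    for (int dn = 1; dn <= 3; dn++) {
      System.out.println("dn" + dn + " reports at t="
          + firstReportTime(0L, 10_000L, rng) + " ms");
    }
  }
}
```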

BPServiceActor#offerService

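This method runs in a loop and keeps sending heartbeats and block reports to the NameNode.

It also sends a full block report (FBR) at startup. After each pass it sleeps until the next heartbeat is due, or until an incremental block report has to go out, and then repeats.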

  private void offerService() throws Exception {
    LOG.info("For namenode " + nnAddr + " using"
        + " BLOCKREPORT_INTERVAL of " + dnConf.blockReportInterval + "msecs"
        + " CACHEREPORT_INTERVAL of " + dnConf.cacheReportInterval + "msecs"
        + " Initial delay: " + dnConf.initialBlockReportDelayMs + "msecs"
        + "; heartBeatInterval=" + dnConf.heartBeatInterval
        + (lifelineSender != null ?
            "; lifelineIntervalMs=" + dnConf.getLifelineIntervalMs() : ""));

    //
    // Now loop for a long time....
    //
    while (shouldRun()) {
      try {
        DataNodeFaultInjector.get().startOfferService();
        final long startTime = scheduler.monotonicNow();

        //
        // Every so often, send heartbeat or block-report
        final boolean sendHeartbeat = scheduler.isHeartbeatDue(startTime);
        HeartbeatResponse resp = null;
        if (sendHeartbeat) {
          //
          // All heartbeat messages include following info:
          // -- Datanode name
          // -- data transfer port
          // -- Total capacity
          // -- Bytes remaining
          boolean requestBlockReportLease = (fullBlockReportLeaseId == 0) &&
                  scheduler.isBlockReportDue(startTime);
          if (!dn.areHeartbeatsDisabledForTests()) {
            // Send the heartbeat
            resp = sendHeartBeat(requestBlockReportLease);
            assert resp != null;
            if (resp.getFullBlockReportLeaseId() != 0) {
              if (fullBlockReportLeaseId != 0) {
                LOG.warn(nnAddr + " sent back a full block report lease " +
                        "ID of 0x" +
                        Long.toHexString(resp.getFullBlockReportLeaseId()) +
                        ", but we already have a lease ID of 0x" +
                        Long.toHexString(fullBlockReportLeaseId) + ". " +
                        "Overwriting old lease ID.");
              }
              fullBlockReportLeaseId = resp.getFullBlockReportLeaseId();
            }
            dn.getMetrics().addHeartbeat(scheduler.monotonicNow() - startTime,
                getRpcMetricSuffix());

            // If the state of this NN has changed (eg STANDBY->ACTIVE) then let the BPOfferService update itself.
            //
            // Important that this happens before processCommand below,
            // since the first heartbeat to a new active might have commands that we should actually process.
            bpos.updateActorStatesFromHeartbeat(
                this, resp.getNameNodeHaState());
            state = resp.getNameNodeHaState().getState();

            if (state == HAServiceState.ACTIVE) {
              handleRollingUpgradeStatus(resp);
            }
            commandProcessingThread.enqueue(resp.getCommands());
          }
        }
        if (!dn.areIBRDisabledForTests() &&
            (ibrManager.sendImmediately()|| sendHeartbeat)) {
          // Send incremental block reports (IBRs) to the NameNode
          ibrManager.sendIBRs(bpNamenode, bpRegistration,
              bpos.getBlockPoolId(), getRpcMetricSuffix());
        }
        // DatanodeCommand: base class for DataNode commands, issued by the NameNode to tell the DataNode what to do.
        List<DatanodeCommand> cmds = null;
        boolean forceFullBr =
            scheduler.forceFullBlockReport.getAndSet(false);
        if (forceFullBr) {
          LOG.info("Forcing a full block report to " + nnAddr);
        }
        if ((fullBlockReportLeaseId != 0) || forceFullBr) {
          // Send a full block report to the NameNode
          cmds = blockReport(fullBlockReportLeaseId);
          fullBlockReportLeaseId = 0;
        }
        commandProcessingThread.enqueue(cmds);

        if (!dn.areCacheReportsDisabledForTests()) {
          // Send the cache report
          DatanodeCommand cmd = cacheReport();
          commandProcessingThread.enqueue(cmd);
        }

        if (sendHeartbeat) {
          dn.getMetrics().addHeartbeatTotal(
              scheduler.monotonicNow() - startTime, getRpcMetricSuffix());
        }

        // There is no work to do;  sleep until heartbeat timer elapses,  or work arrives, and then iterate again.
        ibrManager.waitTillNextIBR(scheduler.getHeartbeatWaitTime());
      } catch(RemoteException re) {
        String reClass = re.getClassName();
        if (UnregisteredNodeException.class.getName().equals(reClass) ||
            DisallowedDatanodeException.class.getName().equals(reClass) ||
            IncorrectVersionException.class.getName().equals(reClass)) {
          LOG.warn(this + " is shutting down", re);
          shouldServiceRun = false;
          return;
        }
        LOG.warn("RemoteException in offerService", re);
        sleepAfterException();
      } catch (IOException e) {
        LOG.warn("IOException in offerService", e);
        sleepAfterException();
      } finally {
        DataNodeFaultInjector.get().endOfferService();
      }
      processQueueMessages();
    } // while (shouldRun())
  } // offerService
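
The lease handling in the loop is easy to miss among the metrics and HA code. Reduced to its core: a heartbeat asks for a full block report lease only when no lease is held and a report is due, the NameNode answers with a non-zero lease ID, the full block report is sent with that ID, and the ID is reset to 0. The sketch below replays just that flow. OfferServiceSketch and FakeNameNode are hypothetical names, time is passed in explicitly instead of being read from a clock, and rescheduling of the next report is simplified.

```java
// Hypothetical, simplified replay of the heartbeat / lease flow; not Hadoop code.
class OfferServiceSketch {

  // Stand-in for the NameNode RPC interface (assumption for this sketch).
  interface FakeNameNode {
    long heartbeat(boolean requestLease); // returns a lease ID, or 0 for none
    void blockReport(long leaseId);
  }

  private final long heartbeatIntervalMs;
  private final long blockReportIntervalMs;
  private long nextHeartbeatMs = 0;      // first heartbeat is due immediately
  private long nextBlockReportMs = 0;    // first full report is due immediately
  private long fullBlockReportLeaseId = 0;

  OfferServiceSketch(long heartbeatIntervalMs, long blockReportIntervalMs) {
    this.heartbeatIntervalMs = heartbeatIntervalMs;
    this.blockReportIntervalMs = blockReportIntervalMs;
  }

  // One pass of the loop body at time `nowMs`.
  void iterate(long nowMs, FakeNameNode nn) {
    boolean sendHeartbeat = nextHeartbeatMs - nowMs <= 0;
    if (sendHeartbeat) {
      // Ask for a lease only if none is held and a full report is due.
      boolean requestLease =
          fullBlockReportLeaseId == 0 && nextBlockReportMs - nowMs <= 0;
      long lease = nn.heartbeat(requestLease);
      nextHeartbeatMs = nowMs + heartbeatIntervalMs;
      if (lease != 0) {
        fullBlockReportLeaseId = lease;
      }
    }
    if (fullBlockReportLeaseId != 0) {
      nn.blockReport(fullBlockReportLeaseId);
      fullBlockReportLeaseId = 0; // a new lease is needed for the next report
      nextBlockReportMs = nowMs + blockReportIntervalMs;
    }
  }

  public static void main(String[] args) {
    FakeNameNode nn = new FakeNameNode() {
      public long heartbeat(boolean requestLease) {
        System.out.println("heartbeat, requestLease=" + requestLease);
        return requestLease ? 42L : 0L;
      }
      public void blockReport(long leaseId) {
        System.out.println("full block report with lease " + leaseId);
      }
    };
    OfferServiceSketch actor = new OfferServiceSketch(3000L, 60_000L);
    actor.iterate(0L, nn);    // heartbeat with lease request, then full report
    actor.iterate(1000L, nn); // nothing is due
    actor.iterate(3000L, nn); // plain heartbeat, no lease request
  }
}
```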

doRefreshNamenodes step 4

Step 4 shuts down the nameservices that are no longer needed. It is worth looking at why remove() ends up being called automatically:

    // Step 4. Shut down old nameservices. This happens outside
    // of the synchronized(this) lock since they need to call
    // back to .remove() from another thread
    if (!toRemove.isEmpty()) {
      LOG.info("Stopping BPOfferServices for nameservices: " +
          Joiner.on(",").useForNull("<default>").join(toRemove));
      
      for (String nsToRemove : toRemove) {
        BPOfferService bpos = bpByNameserviceId.get(nsToRemove);
        bpos.stop();
        bpos.join();
        // they will call remove on their own
        // The call chain is roughly:
        // bpos.stop() -> actor.stop(); -> shouldServiceRun = false;
        // bpos.join() -> actor.join(); -> bpThread.join();
        // -> in BPServiceActor#run, shouldRun() returns false, so BPServiceActor#cleanUp in the finally block runs
        // -> BPOfferService#shutdownActor -> DataNode#shutdownBlockPool -> BlockPoolManager#remove
      }
    }
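
Why stop() and join() have to run outside the manager's lock becomes clearer with a small model. In the sketch below, ManagerSketch and Worker are hypothetical stand-ins for BlockPoolManager and the actor thread, and the assumption is that remove() is synchronized on the manager. The worker thread calls remove() from its finally block, so if shutDown() held the manager's lock while waiting in join(), the worker could never enter remove() and both threads would wait forever.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical model of the stop/join/remove callback chain; not Hadoop code.
class ManagerSketch {
  private final Map<String, Worker> byName = new HashMap<>();

  synchronized void add(String key, Worker w) { byName.put(key, w); }
  synchronized void remove(String key) { byName.remove(key); }
  synchronized Worker get(String key) { return byName.get(key); }
  synchronized int size() { return byName.size(); }

  // Deliberately NOT synchronized: the worker needs the lock for remove()
  // while this thread is blocked in join().
  void shutDown(String key) throws InterruptedException {
    Worker w = get(key);  // lock is released again after the lookup
    w.stopRunning();      // like actor.stop(): flips the run flag
    w.join();             // like bpThread.join(): waits for cleanup to finish
  }

  static class Worker extends Thread {
    private volatile boolean shouldRun = true;
    private final String key;
    private final ManagerSketch manager;

    Worker(String key, ManagerSketch manager) {
      this.key = key;
      this.manager = manager;
    }

    void stopRunning() { shouldRun = false; }

    @Override
    public void run() {
      try {
        while (shouldRun) {
          Thread.sleep(5); // placeholder for the real service loop
        }
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
      } finally {
        manager.remove(key); // the worker unregisters itself
      }
    }
  }

  public static void main(String[] args) throws InterruptedException {
    ManagerSketch manager = new ManagerSketch();
    Worker w = new Worker("ns1", manager);
    manager.add("ns1", w);
    w.start();
    manager.shutDown("ns1");
    System.out.println("registered workers left: " + manager.size());
  }
}
```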

doRefreshNamenodes step 5

// Step 5. Update nameservices whose NN list has changed
if (!toRefresh.isEmpty()) {
  // Log which nameservices are having their NN lists refreshed
  LOG.info("Refreshing list of NNs for nameservices: " +
      Joiner.on(",").useForNull("<default>").join(toRefresh));
  
  for (String nsToRefresh : toRefresh) {
    BPOfferService bpos = bpByNameserviceId.get(nsToRefresh);
    Map<String, InetSocketAddress> nnIdToAddr = addrMap.get(nsToRefresh);
    Map<String, InetSocketAddress> nnIdToLifelineAddr =
        lifelineAddrMap.get(nsToRefresh);
    ArrayList<InetSocketAddress> addrs =
        Lists.newArrayListWithCapacity(nnIdToAddr.size());
    ArrayList<InetSocketAddress> lifelineAddrs =
        Lists.newArrayListWithCapacity(nnIdToAddr.size());
    ArrayList<String> nnIds = Lists.newArrayListWithCapacity(
        nnIdToAddr.size());
    for (String nnId : nnIdToAddr.keySet()) {
      addrs.add(nnIdToAddr.get(nnId));
      lifelineAddrs.add(nnIdToLifelineAddr != null ?
          nnIdToLifelineAddr.get(nnId) : null);
      nnIds.add(nnId);
    }
    try {
      UserGroupInformation.getLoginUser()
          .doAs(new PrivilegedExceptionAction<Object>() {
            @Override
            public Object run() throws Exception {
              bpos.refreshNNList(nsToRefresh, nnIds, addrs, lifelineAddrs);
              return null;
            }
          });
    } catch (InterruptedException ex) {
      IOException ioe = new IOException();
      ioe.initCause(ex.getCause());
      throw ioe;
    }
  }
}

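Apart from assembling the required arguments, the key step here is the call to bpos#refreshNNList, which updates the actors by adding the new NNs first and then removing the stale ones.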

  void refreshNNList(String serviceId, List<String> nnIds,
      ArrayList<InetSocketAddress> addrs,
      ArrayList<InetSocketAddress> lifelineAddrs) throws IOException {
    Set<InetSocketAddress> oldAddrs = Sets.newHashSet();
    for (BPServiceActor actor : bpServices) {
      oldAddrs.add(actor.getNNSocketAddress());
    }
    Set<InetSocketAddress> newAddrs = Sets.newHashSet(addrs);
    
    // Process added NNs
    Set<InetSocketAddress> addedNNs = Sets.difference(newAddrs, oldAddrs);
    for (InetSocketAddress addedNN : addedNNs) {
      BPServiceActor actor = new BPServiceActor(serviceId,
          nnIds.get(addrs.indexOf(addedNN)), addedNN,
          lifelineAddrs.get(addrs.indexOf(addedNN)), this);
      actor.start();
      bpServices.add(actor);
    }

    // Process removed NNs
    Set<InetSocketAddress> removedNNs = Sets.difference(oldAddrs, newAddrs);
    for (InetSocketAddress removedNN : removedNNs) {
      for (BPServiceActor actor : bpServices) {
        if (actor.getNNSocketAddress().equals(removedNN)) {
          actor.stop();
          shutdownActor(actor);
          break;
        }
      }
    }
  }
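
The add-then-remove update boils down to two set differences. The following sketch reproduces that calculation with plain java.util collections instead of Guava's Sets.difference, and with strings in place of InetSocketAddress. NnListDiff, added and removed are hypothetical names used only for this illustration.

```java
import java.util.Arrays;
import java.util.Collection;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

// Hypothetical illustration of the two set differences in refreshNNList.
class NnListDiff {

  // Addresses present in the new configuration but not served yet.
  static Set<String> added(Collection<String> oldAddrs, Collection<String> newAddrs) {
    Set<String> result = new LinkedHashSet<>(newAddrs);
    result.removeAll(oldAddrs);
    return result;
  }

  // Addresses currently served but missing from the new configuration.
  static Set<String> removed(Collection<String> oldAddrs, Collection<String> newAddrs) {
    Set<String> result = new LinkedHashSet<>(oldAddrs);
    result.removeAll(newAddrs);
    return result;
  }

  public static void main(String[] args) {
    List<String> oldAddrs = Arrays.asList("nn1:8020", "nn2:8020");
    List<String> newAddrs = Arrays.asList("nn2:8020", "nn3:8020");
    // Actors are started for the added NNs first, then stale ones are stopped.
    System.out.println("start actors for: " + added(oldAddrs, newAddrs));
    System.out.println("stop actors for:  " + removed(oldAddrs, newAddrs));
  }
}
```

An NN that appears in both lists, nn2:8020 in this example, is in neither difference, so its actor keeps running untouched across the refresh.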

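That is roughly how the heartbeat mechanism looks in the source. Topics still left open include the detailed handling on each side of the DataNode/NameNode exchange, how the NameNode sends commands to the DataNode, and how the DataNode executes them. I will fill these in when time allows.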

Original article: https://www.cnblogs.com/zuojing/p/15039167.html