hadoop2.7.3源码解析之HA架构分析

时间:2022-07-25
本文章向大家介绍hadoop2.7.3源码解析之HA架构分析,主要内容包括其使用实例、应用技巧、基本知识点总结和需要注意事项,具有一定的参考价值,需要的朋友可以参考一下。

@

  • ZKFailoverController
    • 概述
    • 启动
  • HealthMonitor
  • ActiveStandbyElector

整体架构概述

在hadoop 1.0的时候,hadoop集群只有一个namenode,一旦namenode挂掉了,整个集群就会不可用,hadoop 的HA机制(High Availability)就是为了解决上述问题而产生的。

在HA机制中,总共会有两个namenode,一个是active的,对外提供服务,一个是standby状态,作为备份的namenode。datanode会同时向两个namenode发送注册,块汇报等信息,以保持两个namenode的命名空间的数据的一致性,以便在出现问题的时候可以快速切换。但是datanode只会执行active的namenode的命令。

为了使active namenode和standby namenode能够保持命名空间的数据一致,他们会与一组独立的日志节点JournalNode交互(org.apache.hadoop.hdfs.qjournal.server.JournalNode),当遇到需要更新editlog的时候,active namenode会同时向这些JournalNode写日志,记录更改的内容,而stanby节点会定期的从这些JournalNode来更新日志,同步自己的命名空间,来保持和active namenode数据的一致性。

ZKFailoverController作为主备namenode的控制器,负责对namenode进行监控和主备切换,目前这个切换是依赖于zookeeper的,当然目前也支持手工切换主备namenode。

整体的架构图如下所示:

这里写图片描述

具体分析

ZKFailoverController

概述

ZKFailoverController是作为一个独立的线程启动的,在hadoop集群的bin目录的hdfs脚本中,我们看到具体的启动类是ZKFailoverController 的子类DFSZKFailoverController。

在ZKFailoverController的内部维护着两个重要的类,HealthMonitor和ActiveStandbyElector ,HealthMonitor主要用于namenode的健康检查,一旦检查到active的namenode出现异常,就会调用ActiveStandbyElector相关方法进行主备选举,选举成功之后,进行相应的主备切换。

启动

启动方法自然是DFSZKFailoverController的main方法.

  public static void main(String args[])
      throws Exception {
    if (DFSUtil.parseHelpArgument(args, 
        ZKFailoverController.USAGE, System.out, true)) {
      System.exit(0);
    }
    
    GenericOptionsParser parser = new GenericOptionsParser(
        new HdfsConfiguration(), args);
    //构造了DFSZKFailoverController对象
    DFSZKFailoverController zkfc = DFSZKFailoverController.create(
        parser.getConfiguration());
    int retCode = 0;
    try {
      //run方法其实是调用了父类的run方法。
      retCode = zkfc.run(parser.getRemainingArgs());
    } catch (Throwable t) {
      LOG.fatal("Got a fatal error, exiting now", t);
    }
    System.exit(retCode);
  }

通过跟踪代码最终的启动方法在父类的doRun中。

具体的代码如下:

  private int doRun(String[] args)
      throws HadoopIllegalArgumentException, IOException, InterruptedException {
    try {
      initZK();
    } catch (KeeperException ke) {
      LOG.fatal("Unable to start failover controller. Unable to connect "
          + "to ZooKeeper quorum at " + zkQuorum + ". Please check the "
          + "configured value for " + ZK_QUORUM_KEY + " and ensure that "
          + "ZooKeeper is running.");
      return ERR_CODE_NO_ZK;
    }



    initRPC();
    initHM();
    startRPC();
    try {
      mainLoop();
    } finally {
      rpcServer.stopAndJoin();
      
      elector.quitElection(true);
      healthMonitor.shutdown();
      healthMonitor.join();
    }
    return 0;
  }

在这里,首先会通过initZK方法初始化zookeeper,在initZK方法的底部,构造了用于主备选举的类


    elector = new ActiveStandbyElector(zkQuorum,
        zkTimeout, getParentZnode(), zkAcls, zkAuths,
        new ElectorCallbacks(), maxRetryNum);

接下来通过initRPC()方法初始化相应的rpc服务,然后通过initHM()方法来初始化HealthMonitor,并加入回调函数,HealthMonitor监测namenode的状态,通过这个回调函数来通知ZKFailoverController,以便做相应的主备选举工作。

  private void initHM() {
    healthMonitor = new HealthMonitor(conf, localTarget);
    healthMonitor.addCallback(new HealthCallbacks());
    healthMonitor.addServiceStateCallback(new ServiceStateCallBacks());
    healthMonitor.start();
  }

HealthMonitor

启动的时候会启动一个内部线程MonitorDaemon去监测namenode的运行状态,主要状态是HealthMonitor内部的HealthMonitor.State 中的几个状态


  @InterfaceAudience.Private
  public enum State {
    /**
     * The health monitor is still starting up. 正在启动,初始化中
     */
    INITIALIZING,

    /**
     * The service is not responding to health check RPCs.无响应
     */
    SERVICE_NOT_RESPONDING,

    /**
     * The service is connected and healthy. 可以连接并且处于健康状态
     */
    SERVICE_HEALTHY,
    
    /**
     * The service is running but unhealthy. 服务运行但是处于不健康状态
     */
    SERVICE_UNHEALTHY,
    
    /**
     * 监控线程自身有问题
     * The health monitor itself failed unrecoverably and can
     * no longer provide accurate information.
     */
    HEALTH_MONITOR_FAILED;
  }

在MonitorDaemon的run方法中

    @Override
    public void run() {
      while (shouldRun) {
        try { 
          //尝试去连接知道连上为止
          loopUntilConnected();
          //处理健康检查
          doHealthChecks();
        } catch (InterruptedException ie) {
          Preconditions.checkState(!shouldRun,
              "Interrupted but still supposed to run");
        }
      }
    }

进入doHealthChecks方法.

  private void doHealthChecks() throws InterruptedException {
    while (shouldRun) {
      HAServiceStatus status = null;
      boolean healthy = false;
      try {
        status = proxy.getServiceStatus();
        
        //调用和namenode交互的协议HAServiceProtocol的monitorHealth方法发送远程的rpc请求检查namenode的状态,具体的实现方法是NameNodeRpcServer中的同名方法
        
        proxy.monitorHealth();
        healthy = true;
      } catch (Throwable t) {
        if (isHealthCheckFailedException(t)) {
          //服务运行但是出于不健康状态
          LOG.warn("Service health check failed for " + targetToMonitor
              + ": " + t.getMessage());
          enterState(State.SERVICE_UNHEALTHY);
        } else {
         //无响应
          LOG.warn("Transport-level exception trying to monitor health of " +
              targetToMonitor + ": " + t.getCause() + " " + t.getLocalizedMessage());
          RPC.stopProxy(proxy);
          proxy = null;
          enterState(State.SERVICE_NOT_RESPONDING);
          Thread.sleep(sleepAfterDisconnectMillis);
          return;
        }
      }
      
      if (status != null) {
        setLastServiceStatus(status);
      }
      正常状态
      if (healthy) {
        enterState(State.SERVICE_HEALTHY);
      }

      Thread.sleep(checkIntervalMillis);
    }
  }

不管是健康状态还是不健康状态,都会调用enterState方法,在enterState方法中,会对新旧进行判断,如果不相等,则对调用相应的回调方法进行处理。 比如原来的状态是健康状态,后来namenode挂掉了,则是非健康状态了,这时候就要调用ZKFailoverController的相应方法进行主备选举了。


  private synchronized void enterState(State newState) {
    if (newState != state) {
      LOG.info("Entering state " + newState);
      state = newState;
      synchronized (callbacks) {
        for (Callback cb : callbacks) {
          cb.enteredState(newState);
        }
      }
    }
  }

ActiveStandbyElector

当HealthMonitor检测到active 的状态和上次不一样的时候,会回调ZKFailoverController中的enteredState方法,

  /**
   * Callbacks from HealthMonitor
   */
  class HealthCallbacks implements HealthMonitor.Callback {
    @Override
    public void enteredState(HealthMonitor.State newState) {
      setLastHealthState(newState);
      recheckElectability();
    }
  }
  

在recheckElectability中通过switch来判断当前的状态,如果当前namenode是cHealthMonitor.State.SERVICE_HEALTHY状态,则调用ActiveStandbyElector的joinElectio方法进行namenode的主备选举工作,最终调用了方法在zookeeper上创建了一个临时节点。

  private void createLockNodeAsync() {
    zkClient.create(zkLockFilePath, appData, zkAcl, CreateMode.EPHEMERAL,
        this, zkClient);
  }

在zk上创建节点之后,会回调org.apache.hadoop.ha.ActiveStandbyElector.processResult(int, String, Object, String)方法,来进行相应的处理,如果是选主成功,则调用becomeActive方法将自己变成active节点,否则调用becomeStandby变成standby节点


  /**
   * interface implementation of Zookeeper callback for create
   */
  @Override
  public synchronized void processResult(int rc, String path, Object ctx,
      String name) {
    if (isStaleClient(ctx)) return;
    LOG.debug("CreateNode result: " + rc + " for path: " + path
        + " connectionState: " + zkConnectionState +
        "  for " + this);

    Code code = Code.get(rc);
    if (isSuccess(code)) {
      // we successfully created the znode. we are the leader. start monitoring
      if (becomeActive()) {
        monitorActiveStatus();
      } else {
        reJoinElectionAfterFailureToBecomeActive();
      }
      return;
    }

    if (isNodeExists(code)) {
      if (createRetryCount == 0) {
        ............
        becomeStandby();
      }
........................
    }

    .............................
  }

在becomeActive中,会先隔离原来的的active的namenode,然后回调ZKFailoverController.ElectorCallbacks中的becomeActive,然后通过和namenode交互的协议HAServiceProtocol中的transitionToActive方法转换为active的namenode。

becomeStandby会回调 ZKFailoverController 的 becomeStandby 方法,然后调用 HAServiceProtocol接口的 transitionToStandby 方法,将 NameNode 转换为 Standby 状态。