Hadoop2.8.5 容器的投运

时间:2019-01-10
本文章向大家介绍Hadoop2.8.5 容器的投运,主要包括Hadoop2.8.5 容器的投运使用实例、应用技巧、基本知识点总结和需要注意事项,具有一定的参考价值,需要的朋友可以参考一下。

上一篇我们考察了YARN调度系统的容器周转和分配,RM受理作业后就为该作业分配容器,最后由发射架(ApplicationMasterLauncher)将容器发送到对岸的NodeManager上,现在我们来看NM收到容器后如何启动JVM并创建AM作为作业的领头人,之后的事情就交给了AM。今天我们就来考察容器投运到NM这一过程。为了投运一个作业,RM 首先得在某个 NodeManager 节点上启动一个进程作为这个作业的主管,扮演类似于项目组长那样的角色,这就是 ApplicationMaster 即 AM ,对于 MapReduce 作业就是 MRAppMaster 。后者是前者的一种特例,其 Application 是专门进行 MapReduce 计算的 App 。然而,虽然这个 App 本身是多任务的作业,其 ApplicationMaster 或 MRAppMaster 却只是单任务的作业,所以 ApplicationMaster 或 MRAppMaster 的投运其实只相当于一个单任务的投运。而且, RM 之于 ApplicationMaster 的投运又恰如 ApplicationMaster 之于具体计算任务(如 MapTask )的投运,其作用和过程都很相似。

1. 客户端(RM)投运容器

resourcemanager\amlauncher\ApplicationMasterLauncher.java
AM发射架,发射容器

public class ApplicationMasterLauncher extends AbstractService implements EventHandler<AMLauncherEvent> {

  /**
   * Queues an AMLauncher task (built by createRunnableLauncher with event type LAUNCH) for the
   * given application attempt. The task itself runs later on the launcher pool.
   */
  private void launch(RMAppAttempt application) {
    masterEvents.add(createRunnableLauncher(application, AMLauncherEventType.LAUNCH));
  }

  /**
   * Drains masterEvents: blocks on take() and submits each queued task to launcherPool
   * (where AMLauncher.run() executes). Exits when the thread is interrupted, either by the
   * isInterrupted() check or by take() throwing InterruptedException.
   */
  private class LauncherThread extends Thread {
    @Override
    public void run() {
      try {
        while (!isInterrupted()) {
          launcherPool.execute(masterEvents.take());
        }
      } catch (InterruptedException e) {
        // Interrupted while waiting for work: stop the launcher loop.
        return;
      }
    }
  }

}

server\resourcemanager\amlauncher\AMLauncher.java
容器发射任务

public class AMLauncher implements Runnable {
  //RPC通讯协议
  private ContainerManagementProtocol containerMgrProxy;
  
  @SuppressWarnings("unchecked")
  public void run() {
    switch (eventType) {
    case LAUNCH:
      try {
        launch();
        //AppAttempt 转入 LAUNCHED,RMAppAttemptImpl 会启动一个 AMLivelinessMonitor 监视 AM 是否存活
        handler.handle(new RMAppAttemptEvent(application.getAppAttemptId(),
            RMAppAttemptEventType.LAUNCHED));
      } catch(Exception ie) {
        ......
      }
      break;
    }
  }
  
  //发射容器
  private void launch() throws IOException, YarnException {
    connect(); //RPC
    ContainerId masterContainerID = masterContainer.getId();
    //ApplicationSubmissionContext 来自用户的作业提交
    ApplicationSubmissionContext applicationContext =application.getSubmissionContext(); 
    // ContainerLaunchContext 用于让 NM 节点创建进程以执行任务
    ContainerLaunchContext launchContext = createAMContainerLaunchContext(applicationContext, masterContainerID);
    //RPC请求
    StartContainerRequest scRequest = StartContainerRequest.newInstance(launchContext,
          masterContainer.getContainerToken());
    List<StartContainerRequest> list = new ArrayList<StartContainerRequest>();
    list.add(scRequest);
    StartContainersRequest allRequests = StartContainersRequest.newInstance(list);
    //RPC启动容器
    StartContainersResponse response = containerMgrProxy.startContainers(allRequests);
    }
  }
  
  private void connect() throws IOException {
    ContainerId masterContainerID = masterContainer.getId();
    containerMgrProxy = getContainerMgrProxy(masterContainerID);
  }

  protected ContainerManagementProtocol getContainerMgrProxy(final ContainerId containerId) {
    //主容器中有NM的 NodeId
    final NodeId node = masterContainer.getNodeId();
    //还有对方的 IP 地址和端口号
    final InetSocketAddress containerManagerConnectAddress =
        NetUtils.createSocketAddrForHost(node.getHost(), node.getPort());
    //创建底层 RPC
    final YarnRPC rpc = getYarnRPC();
    //返回代理,这个时候NM作为SERVER端
    //CLIENT : ContainerManagementProtocolPBClientImpl
    return NMProxy.createNMProxy(conf, ContainerManagementProtocol.class,
        currentUser, rpc, containerManagerConnectAddress);
  }
}

ContainerManagementProtocolPBClientImpl.java

public class ContainerManagementProtocolPBClientImpl implements ContainerManagementProtocol, Closeable {
  /**
   * Client-side stub: sends a batch of start-container requests to the NodeManager over
   * protobuf RPC.
   *
   * @param requests the requests to send; expected to be a StartContainersRequestPBImpl
   * @return the NodeManager's reply wrapped as a StartContainersResponsePBImpl (never null)
   * @throws IOException if the RPC fails. An IOException cause of the ServiceException is
   *     rethrown as-is; any other cause is wrapped with the ServiceException chained.
   */
  public StartContainersResponse startContainers(StartContainersRequest requests) throws IOException {
    // Extract the protobuf message backing the request record.
    StartContainersRequestProto requestProto = ((StartContainersRequestPBImpl) requests).getProto();
    try {
      // proxy is the protobuf RPC stub for the NM (declared outside this excerpt).
      return new StartContainersResponsePBImpl(proxy.startContainers(null, requestProto));
    } catch (ServiceException e) {
      // Previously this returned null, hiding the failure from callers. Surface it instead.
      Throwable cause = e.getCause();
      if (cause instanceof IOException) {
        throw (IOException) cause;
      }
      throw new IOException(e);
    }
  }
}

2. 服务端(NM)启动容器

ContainerManagementProtocolPBServiceImpl.java

public class ContainerManagementProtocolPBServiceImpl implements ContainerManagementProtocolPB  {
  /**
   * Server-side (NodeManager) RPC entry point. It unwraps the protobuf request, delegates to
   * the real ContainerManagementProtocol implementation, and re-wraps the result as protobuf.
   *
   * @throws ServiceException wrapping any YarnException or IOException raised by the delegate
   */
  @Override
  public StartContainersResponseProto startContainers(RpcController arg0,
      StartContainersRequestProto proto) throws ServiceException {
    final StartContainersRequestPBImpl wrappedRequest = new StartContainersRequestPBImpl(proto);
    final StartContainersResponse result;
    try {
      // real is the NM's ContainerManagerImpl.
      result = real.startContainers(wrappedRequest);
    } catch (YarnException yarnError) {
      throw new ServiceException(yarnError);
    } catch (IOException ioError) {
      throw new ServiceException(ioError);
    }
    return ((StartContainersResponsePBImpl) result).getProto();
  }
}

ContainerManagerImpl.java

public class ContainerManagerImpl extends CompositeService implements ContainerManagementProtocol,
    EventHandler<ContainerManagerEvent> {
    /**
     * NodeManager-side handler for a batch of start-container requests (the RPC service layer
     * delegates here). Each request is passed to startContainerInternal while holding the
     * monitor of this.context. The response carries the aux-service metadata plus the
     * succeeded/failed container collections.
     *
     * NOTE(review): abridged excerpt. nmTokenIdentifier, containerTokenIdentifier,
     * succeededContainers and failedContainers are not declared in the visible code. Also,
     * containerId is never assigned before succeededContainers.add(containerId), so as shown
     * it adds null. Presumably token parsing/validation and failure bookkeeping were elided;
     * confirm against the full source.
     */
    public StartContainersResponse startContainers(
      StartContainersRequest requests) throws YarnException, IOException {
    synchronized (this.context) {
      for (StartContainerRequest request : requests
          .getStartContainerRequests()) {
        ContainerId containerId = null;
        try {
          // Process the request: register the container (and its application if new).
          startContainerInternal(nmTokenIdentifier, containerTokenIdentifier,
              request);
          succeededContainers.add(containerId);
        } catch (YarnException e) {
          // NOTE(review): empty as shown. A failed request lands in neither
          // succeededContainers nor failedContainers. Presumably the real code records it in
          // failedContainers; confirm.
        }
      }
      return StartContainersResponse
          .newInstance(getAuxServiceMetaData(), succeededContainers,
              failedContainers);
    }
  }
  
  /**
   * Initializes one container on this NM.
   *
   * It creates a ContainerImpl from the request's ContainerLaunchContext. Then, unless the
   * service has stopped (checked under readLock), it:
   * - registers an ApplicationImpl for the application the first time it is seen, storing it
   *   in the NM state store and firing ApplicationInitEvent;
   * - stores the container in the NM state store;
   * - hands the container to the application via ApplicationContainerInitEvent;
   * - updates metrics.
   * If serviceStopped is true, nothing is registered and no exception is thrown.
   *
   * NOTE(review): credentials, applicationID, appAcls and logAggregationContext are not
   * declared in this excerpt; presumably derived from the request/token in elided code.
   * containerIdStr is unused in the visible code.
   */
  private void startContainerInternal(NMTokenIdentifier nmTokenIdentifier,
      ContainerTokenIdentifier containerTokenIdentifier,
      StartContainerRequest request) throws YarnException, IOException {
    // Container id, taken from the container token.
    ContainerId containerId = containerTokenIdentifier.getContainerID();
    String containerIdStr = containerId.toString();
    String user = containerTokenIdentifier.getApplicationSubmitter();
    // Recover the ContainerLaunchContext (CLC) from the request.
    ContainerLaunchContext launchContext = request.getContainerLaunchContext();
    // Create the NM-side ContainerImpl object for this container.
    Container container = new ContainerImpl(getConfig(), this.dispatcher,
            launchContext, credentials, metrics, containerTokenIdentifier, context);
    this.readLock.lock();
    try {
      if (!serviceStopped) {
        // Create an ApplicationImpl. Note that this dispatcher is ContainerManagerImpl.dispatcher.
        // The object is built unconditionally but only kept if putIfAbsent finds no existing entry.
        Application application =
            new ApplicationImpl(dispatcher, user, applicationID, credentials, context);
        if (null == context.getApplications().putIfAbsent(applicationID, application)) {
          // First container of this application on this NM: persist the application...
          context.getNMStateStore().storeApplication(applicationID,
              buildAppProto(applicationID, user, credentials, appAcls,
                logAggregationContext));
          // ...and fire an INIT_APPLICATION event to drive this ApplicationImpl's state machine.
          dispatcher.getEventHandler().handle(
            new ApplicationInitEvent(applicationID, appAcls,
              logAggregationContext));
        }

        this.context.getNMStateStore().storeContainer(containerId,
            containerTokenIdentifier.getVersion(), request);
         // Carry the new Container in an ApplicationEventType.INIT_CONTAINER event,
         // handing the Container over to the ApplicationImpl.
        dispatcher.getEventHandler().handle(
          new ApplicationContainerInitEvent(container));
        // Bookkeeping for statistics.
        metrics.launchedContainer();
        metrics.allocateContainer(containerTokenIdentifier.getResource());
      }
    } finally {
      this.readLock.unlock();
    }
  }
}

server\nodemanager\containermanager\application\ApplicationImpl.java

public class ApplicationImpl implements Application {
    // Transition rule: NEW --INIT_APPLICATION--> INITING, handled by AppInitTransition.
    // NOTE(review): pseudo-code as transcribed (no receiver, no semicolon, "INIT _ APPLICATION",
    // "newAppInitTransition ()"). Presumably one link of a state-machine-factory
    // .addTransition(...) chain taking `new AppInitTransition()`; confirm against the source.
	addTransition ( ApplicationState.NEW , ApplicationState.INITING ,
                    ApplicationEventType.INIT _ APPLICATION , newAppInitTransition ())
                    
    /**
     * Handles INIT_APPLICATION. It stores the application's ACLs, registers them with the ACLs
     * manager, records the log-aggregation context, and posts a LogHandlerAppStartedEvent.
     */
    static class AppInitTransition implements
      SingleArcTransition<ApplicationImpl, ApplicationEvent> {
    @Override
    public void transition(ApplicationImpl app, ApplicationEvent event) {
      ApplicationInitEvent initEvent = (ApplicationInitEvent)event;
      app.applicationACLs = initEvent.getApplicationACLs();
      app.aclsManager.addApplication(app.getAppId(), app.applicationACLs);
      // Drive the log handler's state machine, starting the run-log (LOG) mechanism for
      // this specific app.
      app.logAggregationContext = initEvent.getLogAggregationContext();
      app.dispatcher.getEventHandler().handle(
          new LogHandlerAppStartedEvent(app.appId, app.user,
              app.credentials, app.applicationACLs,
              app.logAggregationContext));
    }
  }

  // Transition rule: INITING --INIT_CONTAINER--> INITING, handled by InitContainerTransition.
  // NOTE(review): "ddTransition" looks like a transcription typo for addTransition. This line
  // is pseudo-code in the same way as the rule above.
  ddTransition ( ApplicationState.INITING , ApplicationState.INITING ,
				 ApplicationEventType.INIT _ CONTAINER ,newInitContainerTransition ())
				 
  /**
   * Handles INIT_CONTAINER. It stores the incoming container in this ApplicationImpl's
   * containers map, keyed by container id. The rest of the transition is elided in this
   * excerpt.
   */
  static class InitContainerTransition implements
      SingleArcTransition<ApplicationImpl, ApplicationEvent> {
    @Override
    public void transition(ApplicationImpl app, ApplicationEvent event) {
      ApplicationContainerInitEvent initEvent =
        (ApplicationContainerInitEvent) event;
      Container container = initEvent.getContainer();
      // Put the container into this ApplicationImpl's containers collection.
      app.containers.put(container.getContainerId(), container);
      ......
    }
  }
  
  /**
   * Starts resource localization for the application by posting an
   * ApplicationLocalizationEvent of type INIT_APPLICATION_RESOURCES. The transition rule that
   * wires this class in is not shown in this excerpt.
   */
  static class AppLogInitDoneTransition implements
      SingleArcTransition<ApplicationImpl, ApplicationEvent> {
    @Override
    public void transition(ApplicationImpl app, ApplicationEvent event) {
      app.dispatcher.getEventHandler().handle(
          new ApplicationLocalizationEvent(
              LocalizationEventType.INIT_APPLICATION_RESOURCES, app));
    }
  }
}

server\nodemanager\containermanager\localizer\ResourceLocalizationService.java
资源本地化服务(ResourceLocalizationService):负责应用和容器所需资源的本地化(下载到本节点)、跟踪与清理

public class ResourceLocalizationService extends CompositeService
    implements EventHandler<LocalizationEvent>, LocalizationProtocol {

  /**
   * Routes a localization event to the handler for its type. Each case casts the event to the
   * concrete event class that type carries.
   *
   * @throws YarnRuntimeException for an event type not handled below
   */
  public void handle(LocalizationEvent event) {
    // TODO: create log dir as $logdir/$user/$appId
    switch (event.getType()) {
    case INIT_APPLICATION_RESOURCES:
      handleInitApplicationResources(((ApplicationLocalizationEvent) event).getApplication());
      return;
    case INIT_CONTAINER_RESOURCES:
      handleInitContainerResources((ContainerLocalizationRequestEvent) event);
      return;
    case CONTAINER_RESOURCES_LOCALIZED:
      handleContainerResourcesLocalized((ContainerLocalizationEvent) event);
      return;
    case CACHE_CLEANUP:
      handleCacheCleanup();
      return;
    case CLEANUP_CONTAINER_RESOURCES:
      handleCleanupContainerResources((ContainerLocalizationCleanupEvent) event);
      return;
    case DESTROY_APPLICATION_RESOURCES:
      handleDestroyApplicationResources(((ApplicationLocalizationEvent) event).getApplication());
      return;
    default:
      throw new YarnRuntimeException("Unknown localization event: " + event);
    }
  }

  /**
   * Registers the resource trackers for a newly initialized application, then posts
   * ApplicationInitedEvent for it (which, per the article, arrives as APPLICATION_INITED).
   *
   * Two LocalResourcesTrackerImpl instances are built:
   * - a per-user tracker (no app id), put into privateRsrc under the user name;
   * - a per-application tracker, put into appRsrc under the app id string.
   * The 4th constructor argument is useLocalCacheDirectoryManager: true for the user tracker,
   * false for the app tracker. Both trackers are constructed eagerly; putIfAbsent keeps any
   * tracker already registered under the same key.
   */
  @SuppressWarnings("unchecked")
  private void handleInitApplicationResources(Application app) {
    final String user = app.getUser();
    final LocalResourcesTrackerImpl userTracker = new LocalResourcesTrackerImpl(user,
        null, dispatcher, true, super.getConfig(), stateStore, dirsHandler);
    privateRsrc.putIfAbsent(user, userTracker);

    final String appKey = app.getAppId().toString();
    final LocalResourcesTrackerImpl appTracker = new LocalResourcesTrackerImpl(user,
        app.getAppId(), dispatcher, false, super.getConfig(), stateStore, dirsHandler);
    appRsrc.putIfAbsent(appKey, appTracker);

    dispatcher.getEventHandler().handle(new ApplicationInitedEvent(app.getAppId()));
  }
}
public class ApplicationImpl implements Application {
   // Transition rule: INITING --APPLICATION_INITED--> RUNNING, handled by AppInitDoneTransition.
   // NOTE(review): pseudo-code as transcribed (no receiver, no semicolon, "APPLICATION _ INITED",
   // "newAppInitDoneTransition ()"). Presumably `new AppInitDoneTransition()` in a state-machine
   // .addTransition(...) chain; confirm against the source.
   addTransition ( ApplicationState.INITING , ApplicationState.RUNNING ,
                   ApplicationEventType.APPLICATION _ INITED , newAppInitDoneTransition ())
  /**
   * Runs once the application's resources are initialized. It posts a ContainerInitEvent
   * (INIT_CONTAINER) for every container currently held in app.containers.
   */
  @SuppressWarnings("unchecked")
   static class AppInitDoneTransition implements
      SingleArcTransition<ApplicationImpl, ApplicationEvent> {
    @Override
    public void transition(ApplicationImpl app, ApplicationEvent event) {
      // For each container in the containers collection, fire an INIT_CONTAINER event.
      for (Container container : app.containers.values()) {
        app.dispatcher.getEventHandler().handle(new ContainerInitEvent(
              container.getContainerId()));
      }
    }
  }
}

server\nodemanager\containermanager\container\ContainerImpl.java
容器初始化

public class ContainerImpl implements Container {
   //跳变规则
  addTransition(ContainerState.NEW, EnumSet.of(ContainerState.LOCALIZING, ContainerState.LOCALIZED,
                 ContainerState.LOCALIZATION_FAILED, ContainerState.DONE),
        ContainerEventType.INIT_CONTAINER, new RequestResourcesTransition())
  @SuppressWarnings("unchecked") // dispatcher not typed
  static class RequestResourcesTransition implements
      MultipleArcTransition<ContainerImpl,ContainerEvent,ContainerState> {
    @Override
    public ContainerState transition(ContainerImpl container,
        ContainerEvent event) {
      //容器的恢复已经完成(以前保存了容器内容,但是执行失败,需要重执)
      if (container.recoveredStatus == RecoveredContainerStatus.COMPLETED) {
        //发送 ApplicationContainerFinishedEvent 等事件
        container.sendFinishedEvents();
        return ContainerState.DONE;
      } else if (container.recoveredAsKilled &&
          container.recoveredStatus == RecoveredContainerStatus.REQUESTED) {
        // container was killed but never launched
        container.metrics.killedContainer();
        container.metrics.releaseContainer(container.resource);