Hadoop 2.8.5: Launching a Container
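In the previous article we examined how the YARN scheduler turns over and allocates containers. Once the RM accepts a job, it allocates containers for it, and the AM launcher finally sends a container to a NodeManager on the other side. This article examines the next step, launching that container on the NM: how the NM, on receiving the container, starts a JVM that runs the AM as the job's leader. From that point on, the AM takes over.

To launch a job, the RM first has to start a process on some NodeManager node to act as the job's manager, in a role similar to a project team leader. This process is the ApplicationMaster (AM); for a MapReduce job it is the MRAppMaster. The latter is a special case of the former whose Application is dedicated to MapReduce computation. Although the App itself is a multi-task job, its ApplicationMaster (or MRAppMaster) is a single-task job, so launching the ApplicationMaster amounts to launching a single task. Moreover, the RM's role in launching the ApplicationMaster mirrors the ApplicationMaster's role in launching concrete compute tasks such as MapTask: both the purpose and the process are very similar.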
1. Client side (RM): launching the container
resourcemanager\amlauncher\ApplicationMasterLauncher.java
The AM launcher, which launches the AM container
public class ApplicationMasterLauncher extends AbstractService implements EventHandler<AMLauncherEvent> {
    // createRunnableLauncher creates an AMLauncher task, which is then queued
    private void launch(RMAppAttempt application) {
        Runnable launcher = createRunnableLauncher(application,
            AMLauncherEventType.LAUNCH);
        masterEvents.add(launcher);
    }
    private class LauncherThread extends Thread {
        @Override
        public void run() {
            while (!this.isInterrupted()) {
                Runnable toLaunch;
                try {
                    toLaunch = masterEvents.take();
                    launcherPool.execute(toLaunch); // runs AMLauncher.run()
                } catch (InterruptedException e) {
                    return;
                }
            }
        }
    }
}
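The launcher follows a classic producer/consumer pattern. `launch()` only enqueues a task. A single `LauncherThread` then takes tasks off `masterEvents` and hands them to `launcherPool`, presumably so that the caller never blocks on the RPC to an NM. Below is a minimal, self-contained sketch of that pattern built on `java.util.concurrent`. `LauncherQueueSketch` and its `demo` helper are hypothetical names, and the pool size of 2 is an arbitrary assumption, not Hadoop's setting.

```java
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical analogue of ApplicationMasterLauncher; not Hadoop code.
class LauncherQueueSketch {
    // Pending launch tasks, like masterEvents
    private final BlockingQueue<Runnable> masterEvents = new LinkedBlockingQueue<>();
    // Worker pool, like launcherPool (size 2 is an arbitrary choice)
    private final ExecutorService launcherPool = Executors.newFixedThreadPool(2);
    // Single consumer thread, like LauncherThread
    private final Thread launcherThread = new Thread(this::drainLoop, "launcher");

    private void drainLoop() {
        while (!Thread.currentThread().isInterrupted()) {
            try {
                Runnable toLaunch = masterEvents.take(); // blocks until a task is queued
                launcherPool.execute(toLaunch);          // in YARN this would run AMLauncher.run()
            } catch (InterruptedException e) {
                return; // stop() interrupted the thread: leave the loop
            }
        }
    }

    void start() { launcherThread.start(); }

    // Producer side, like launch(RMAppAttempt): enqueue and return immediately
    void launch(Runnable task) { masterEvents.add(task); }

    void stop() throws InterruptedException {
        launcherThread.interrupt();
        launcherThread.join();
        launcherPool.shutdown();
        launcherPool.awaitTermination(5, TimeUnit.SECONDS);
    }

    // Queues n fake attempt launches and returns the ids that were executed
    static List<String> demo(int n) {
        LauncherQueueSketch sketch = new LauncherQueueSketch();
        List<String> launched = new CopyOnWriteArrayList<>();
        CountDownLatch done = new CountDownLatch(n);
        sketch.start();
        for (int i = 0; i < n; i++) {
            String id = "attempt_" + i;
            sketch.launch(() -> { launched.add(id); done.countDown(); });
        }
        try {
            done.await(5, TimeUnit.SECONDS); // wait until every queued task has run
            sketch.stop();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return launched;
    }
}
```

Because the pool runs tasks concurrently, the order of the returned ids is not guaranteed. Only the set of ids is.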
server\resourcemanager\amlauncher\AMLauncher.java
The container launch task
public class AMLauncher implements Runnable {
    // RPC protocol proxy used to talk to the NM
    private ContainerManagementProtocol containerMgrProxy;
    @SuppressWarnings("unchecked")
    public void run() {
        switch (eventType) {
        case LAUNCH:
            try {
                launch();
                // The AppAttempt moves to LAUNCHED; RMAppAttemptImpl then starts an
                // AMLivelinessMonitor to check whether the AM is still alive
                handler.handle(new RMAppAttemptEvent(application.getAppAttemptId(),
                    RMAppAttemptEventType.LAUNCHED));
            } catch (Exception ie) {
                // ......
            }
            break;
        }
    }
    // Launch the container
    private void launch() throws IOException, YarnException {
        connect(); // set up the RPC proxy
        ContainerId masterContainerID = masterContainer.getId();
        // The ApplicationSubmissionContext comes from the user's job submission
        ApplicationSubmissionContext applicationContext = application.getSubmissionContext();
        // The ContainerLaunchContext tells the NM how to create the process that runs the task
        ContainerLaunchContext launchContext = createAMContainerLaunchContext(applicationContext, masterContainerID);
        // Build the RPC request
        StartContainerRequest scRequest = StartContainerRequest.newInstance(launchContext,
            masterContainer.getContainerToken());
        List<StartContainerRequest> list = new ArrayList<StartContainerRequest>();
        list.add(scRequest);
        StartContainersRequest allRequests = StartContainersRequest.newInstance(list);
        // Start the container over RPC
        StartContainersResponse response = containerMgrProxy.startContainers(allRequests);
    }
    private void connect() throws IOException {
        ContainerId masterContainerID = masterContainer.getId();
        containerMgrProxy = getContainerMgrProxy(masterContainerID);
    }
    protected ContainerManagementProtocol getContainerMgrProxy(final ContainerId containerId) {
        // The master container carries the NM's NodeId...
        final NodeId node = masterContainer.getNodeId();
        // ...which gives the NM's host and port
        final InetSocketAddress containerManagerConnectAddress =
            NetUtils.createSocketAddrForHost(node.getHost(), node.getPort());
        // Create the underlying RPC
        final YarnRPC rpc = getYarnRPC();
        // Return the proxy; here the NM acts as the server
        // Client side: ContainerManagementProtocolPBClientImpl
        return NMProxy.createNMProxy(conf, ContainerManagementProtocol.class,
            currentUser, rpc, containerManagerConnectAddress);
    }
}
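`launch()` always sends a batch, even though the AM needs only one container. A single `StartContainerRequest` is wrapped in a list and then in a `StartContainersRequest`. Once the call returns without an exception, `run()` reports `LAUNCHED`. The sketch below shows that flow under stated assumptions:

- `OneRequest`, `BatchRequest` and `ContainerManagerStub` are hypothetical stand-ins for the YARN records and protocol.
- An event callback replaces the RM's dispatcher.
- The failure path, which the excerpt elides, is simply left to propagate.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical stand-in for StartContainerRequest
final class OneRequest {
    final String containerId;
    final String launchContext; // stands in for ContainerLaunchContext
    OneRequest(String containerId, String launchContext) {
        this.containerId = containerId;
        this.launchContext = launchContext;
    }
}

// Hypothetical stand-in for StartContainersRequest
final class BatchRequest {
    final List<OneRequest> requests;
    BatchRequest(List<OneRequest> requests) {
        this.requests = Collections.unmodifiableList(new ArrayList<>(requests));
    }
}

// Hypothetical stand-in for ContainerManagementProtocol
interface ContainerManagerStub {
    List<String> startContainers(BatchRequest batch); // ids of started containers
}

final class AmLaunchSketch {
    // Like launch(): wrap one request into a single-element batch and send it
    static List<String> launch(String masterContainerId, String clc, ContainerManagerStub proxy) {
        List<OneRequest> list = new ArrayList<>();
        list.add(new OneRequest(masterContainerId, clc));
        return proxy.startContainers(new BatchRequest(list));
    }

    // Like run(): report LAUNCHED only if launch() returned without throwing
    static void run(String masterContainerId, String clc, ContainerManagerStub proxy,
                    Consumer<String> events) {
        launch(masterContainerId, clc, proxy);
        events.accept("LAUNCHED");
    }
}
```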
ContainerManagementProtocolPBClientImpl.java
public class ContainerManagementProtocolPBClientImpl implements ContainerManagementProtocol, Closeable {
    // Client side: start containers
    public StartContainersResponse startContainers(StartContainersRequest requests)
            throws YarnException, IOException {
        // Get the protobuf message of the request
        StartContainersRequestProto requestProto = ((StartContainersRequestPBImpl) requests).getProto();
        try {
            // Sent through the RPC proxy (a ContainerManagementProtocolPB stub)
            return new StartContainersResponsePBImpl(proxy.startContainers(null, requestProto));
        } catch (ServiceException e) {
            // Unwrap the server-side exception and rethrow it as YarnException/IOException
            RPCUtil.unwrapAndThrowException(e);
            return null;
        }
    }
}
2. Server side (NM): starting the container
ContainerManagementProtocolPBServiceImpl.java
public class ContainerManagementProtocolPBServiceImpl implements ContainerManagementProtocolPB {
    @Override
    public StartContainersResponseProto startContainers(RpcController arg0,
            StartContainersRequestProto proto) throws ServiceException {
        StartContainersRequestPBImpl request = new StartContainersRequestPBImpl(proto);
        try {
            // real is the ContainerManagerImpl instance
            StartContainersResponse response = real.startContainers(request);
            return ((StartContainersResponsePBImpl) response).getProto();
        } catch (YarnException e) {
            throw new ServiceException(e);
        } catch (IOException e) {
            throw new ServiceException(e);
        }
    }
}
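Client and server form a mirror pair around the wire format. The client stub turns the request object into a protobuf message and calls the RPC proxy. The server-side adapter turns the message back into a request object, delegates to `real`, and wraps checked exceptions in `ServiceException`.

The sketch below reproduces that layering in plain Java, with these simplifications:

- A comma-separated `String` stands in for the protobuf message.
- A direct method call stands in for the network.
- All type names are hypothetical.
- The client-side unwrapping is an assumption about how the wrapped error is surfaced to the caller.

```java
import java.util.Arrays;
import java.util.List;

// Plays the role of YarnException (hypothetical)
class AppException extends Exception {
    AppException(String msg) { super(msg); }
}

// Plays the role of ServiceException (hypothetical)
class WireException extends Exception {
    WireException(Throwable cause) { super(cause); }
}

// Like ContainerManagementProtocol: object-level API
interface StartProtocol {
    List<String> startContainers(List<String> ids) throws AppException;
}

// Like ContainerManagementProtocolPB: wire-level API
interface StartProtocolWire {
    String startContainers(String proto) throws WireException;
}

// Server side: decode the wire form, call the real implementation, wrap errors
class StartServiceAdapter implements StartProtocolWire {
    private final StartProtocol real;
    StartServiceAdapter(StartProtocol real) { this.real = real; }

    @Override
    public String startContainers(String proto) throws WireException {
        List<String> request = Arrays.asList(proto.split(","));
        try {
            return String.join(",", real.startContainers(request));
        } catch (AppException e) {
            throw new WireException(e);
        }
    }
}

// Client side: encode the request, call the proxy, unwrap errors
class StartClientStub implements StartProtocol {
    private final StartProtocolWire proxy;
    StartClientStub(StartProtocolWire proxy) { this.proxy = proxy; }

    @Override
    public List<String> startContainers(List<String> ids) throws AppException {
        try {
            return Arrays.asList(proxy.startContainers(String.join(",", ids)).split(","));
        } catch (WireException e) {
            if (e.getCause() instanceof AppException) {
                throw (AppException) e.getCause(); // surface the original error
            }
            throw new IllegalStateException(e);
        }
    }
}
```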
ContainerManagerImpl.java
public class ContainerManagerImpl extends CompositeService implements ContainerManagementProtocol,
        EventHandler<ContainerManagerEvent> {
    // Start containers
    public StartContainersResponse startContainers(
            StartContainersRequest requests) throws YarnException, IOException {
        synchronized (this.context) {
            for (StartContainerRequest request : requests
                    .getStartContainerRequests()) {
                ContainerId containerId = null;
                try {
                    // (abridged) containerId and containerTokenIdentifier are taken
                    // from the request's container token before this point
                    // Handle the request
                    startContainerInternal(nmTokenIdentifier, containerTokenIdentifier,
                        request);
                    succeededContainers.add(containerId);
                } catch (YarnException e) {
                    // (abridged) the failure is recorded in failedContainers
                }
            }
            return StartContainersResponse
                .newInstance(getAuxServiceMetaData(), succeededContainers,
                    failedContainers);
        }
    }
    // Initialize the container
    private void startContainerInternal(NMTokenIdentifier nmTokenIdentifier,
            ContainerTokenIdentifier containerTokenIdentifier,
            StartContainerRequest request) throws YarnException, IOException {
        // Container ID
        ContainerId containerId = containerTokenIdentifier.getContainerID();
        String containerIdStr = containerId.toString();
        String user = containerTokenIdentifier.getApplicationSubmitter();
        // Recover the ContainerLaunchContext (CLC) from the request
        ContainerLaunchContext launchContext = request.getContainerLaunchContext();
        // Create a ContainerImpl object on the NM node
        Container container = new ContainerImpl(getConfig(), this.dispatcher,
            launchContext, credentials, metrics, containerTokenIdentifier, context);
        this.readLock.lock();
        try {
            if (!serviceStopped) {
                // Create an ApplicationImpl; note that this dispatcher is ContainerManagerImpl.dispatcher
                Application application =
                    new ApplicationImpl(dispatcher, user, applicationID, credentials, context);
                // putIfAbsent returns null only for the first container of this application
                if (null == context.getApplications().putIfAbsent(applicationID, application)) {
                    context.getNMStateStore().storeApplication(applicationID,
                        buildAppProto(applicationID, user, credentials, appAcls,
                            logAggregationContext));
                    // Fire an INIT_APPLICATION event to drive this ApplicationImpl's state machine
                    dispatcher.getEventHandler().handle(
                        new ApplicationInitEvent(applicationID, appAcls,
                            logAggregationContext));
                }
                this.context.getNMStateStore().storeContainer(containerId,
                    containerTokenIdentifier.getVersion(), request);
                // Wrap the new Container in an ApplicationEventType.INIT_CONTAINER event,
                // which hands the Container over to its ApplicationImpl
                dispatcher.getEventHandler().handle(
                    new ApplicationContainerInitEvent(container));
                // For metrics
                metrics.launchedContainer();
                metrics.allocateContainer(containerTokenIdentifier.getResource());
            }
        } finally {
            this.readLock.unlock();
        }
    }
}
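Two details of `ContainerManagerImpl` are worth isolating:

- Each request in the batch is handled independently, so one bad container ends up in the failed list without aborting the others.
- `putIfAbsent` ensures that only the first container of an application creates the `ApplicationImpl` and fires `INIT_APPLICATION`, while every container fires `INIT_CONTAINER`.

The sketch below is a hypothetical simplification. Requests are `{containerId, appId, user}` string triples, a `null` user stands in for an invalid container token, and events are recorded in a list instead of going through a dispatcher.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

// Hypothetical analogue of startContainers()/startContainerInternal(); not Hadoop code.
class NmStartSketch {
    final ConcurrentMap<String, String> applications = new ConcurrentHashMap<>(); // appId -> user
    final List<String> events = new ArrayList<>();            // stands in for the dispatcher
    final List<String> succeeded = new ArrayList<>();
    final Map<String, String> failed = new LinkedHashMap<>(); // containerId -> error message

    // Each entry is {containerId, appId, user}
    synchronized void startContainers(List<String[]> requests) {
        for (String[] r : requests) {
            String containerId = r[0];
            try {
                startContainerInternal(containerId, r[1], r[2]);
                succeeded.add(containerId);
            } catch (IllegalArgumentException e) {
                failed.put(containerId, e.getMessage()); // the rest of the batch continues
            }
        }
    }

    private void startContainerInternal(String containerId, String appId, String user) {
        if (user == null) {
            throw new IllegalArgumentException("invalid container token for " + containerId);
        }
        // Only the first container of an application creates it and fires INIT_APPLICATION
        if (applications.putIfAbsent(appId, user) == null) {
            events.add("INIT_APPLICATION " + appId);
        }
        // Every container is handed to its application via INIT_CONTAINER
        events.add("INIT_CONTAINER " + containerId);
    }
}
```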
server\nodemanager\containermanager\application\ApplicationImpl.java
public class ApplicationImpl implements Application {
    // Transition rule
    addTransition(ApplicationState.NEW, ApplicationState.INITING,
        ApplicationEventType.INIT_APPLICATION, new AppInitTransition())
    static class AppInitTransition implements
            SingleArcTransition<ApplicationImpl, ApplicationEvent> {
        @Override
        public void transition(ApplicationImpl app, ApplicationEvent event) {
            ApplicationInitEvent initEvent = (ApplicationInitEvent) event;
            app.applicationACLs = initEvent.getApplicationACLs();
            app.aclsManager.addApplication(app.getAppId(), app.applicationACLs);
            // Trigger the log handler's state machine to start the run-log (LOG) mechanism for this App
            app.logAggregationContext = initEvent.getLogAggregationContext();
            app.dispatcher.getEventHandler().handle(
                new LogHandlerAppStartedEvent(app.appId, app.user,
                    app.credentials, app.applicationACLs,
                    app.logAggregationContext));
        }
    }
    addTransition(ApplicationState.INITING, ApplicationState.INITING,
        ApplicationEventType.INIT_CONTAINER, new InitContainerTransition())
    static class InitContainerTransition implements
            SingleArcTransition<ApplicationImpl, ApplicationEvent> {
        @Override
        public void transition(ApplicationImpl app, ApplicationEvent event) {
            ApplicationContainerInitEvent initEvent =
                (ApplicationContainerInitEvent) event;
            Container container = initEvent.getContainer();
            // Put the container into this ApplicationImpl's containers map
            app.containers.put(container.getContainerId(), container);
            // ......
        }
    }
    // Once log handling for the app is initialized, request localization of its resources
    static class AppLogInitDoneTransition implements
            SingleArcTransition<ApplicationImpl, ApplicationEvent> {
        @Override
        public void transition(ApplicationImpl app, ApplicationEvent event) {
            app.dispatcher.getEventHandler().handle(
                new ApplicationLocalizationEvent(
                    LocalizationEventType.INIT_APPLICATION_RESOURCES, app));
        }
    }
}
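Each `addTransition(from, to, eventType, hook)` line registers one arc of the application's state machine. An event is accepted only if an arc exists for the current state. Below is a minimal table-driven sketch of that idea using `EnumMap`. It is not YARN's `StateMachineFactory`: the hooks are omitted, and only the two arcs shown in the code above are registered.

```java
import java.util.EnumMap;
import java.util.Map;

// Hypothetical table-driven state machine in the spirit of addTransition(); not YARN code.
class AppStateTableSketch {
    enum State { NEW, INITING }
    enum EventType { INIT_APPLICATION, INIT_CONTAINER }

    // (current state, event type) -> next state; hooks are omitted in this sketch
    private final Map<State, Map<EventType, State>> table = new EnumMap<>(State.class);
    private State current = State.NEW;

    AppStateTableSketch addTransition(State from, State to, EventType on) {
        table.computeIfAbsent(from, s -> new EnumMap<EventType, State>(EventType.class)).put(on, to);
        return this; // returning this allows chained registration
    }

    State handle(EventType event) {
        Map<EventType, State> arcs = table.get(current);
        if (arcs == null || !arcs.containsKey(event)) {
            throw new IllegalStateException("Invalid event " + event + " in state " + current);
        }
        current = arcs.get(event);
        return current;
    }

    // Registers only the two arcs shown in the ApplicationImpl excerpt
    static AppStateTableSketch build() {
        return new AppStateTableSketch()
            .addTransition(State.NEW, State.INITING, EventType.INIT_APPLICATION)
            .addTransition(State.INITING, State.INITING, EventType.INIT_CONTAINER);
    }
}
```

In this sketch an event with no registered arc raises an `IllegalStateException` instead of silently changing state.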
server\nodemanager\containermanager\localizer\ResourceLocalizationService.java
The resource localization service, which handles the tracking of local resources
public class ResourceLocalizationService extends CompositeService
        implements EventHandler<LocalizationEvent>, LocalizationProtocol {
    // Handle localization events
    public void handle(LocalizationEvent event) {
        // TODO: create log dir as $logdir/$user/$appId
        switch (event.getType()) {
        case INIT_APPLICATION_RESOURCES:
            handleInitApplicationResources(
                ((ApplicationLocalizationEvent) event).getApplication());
            break;
        case INIT_CONTAINER_RESOURCES:
            handleInitContainerResources((ContainerLocalizationRequestEvent) event);
            break;
        case CONTAINER_RESOURCES_LOCALIZED:
            handleContainerResourcesLocalized((ContainerLocalizationEvent) event);
            break;
        case CACHE_CLEANUP:
            handleCacheCleanup();
            break;
        case CLEANUP_CONTAINER_RESOURCES:
            handleCleanupContainerResources((ContainerLocalizationCleanupEvent) event);
            break;
        case DESTROY_APPLICATION_RESOURCES:
            handleDestroyApplicationResources(
                ((ApplicationLocalizationEvent) event).getApplication());
            break;
        default:
            throw new YarnRuntimeException("Unknown localization event: " + event);
        }
    }
    // The fourth argument of the LocalResourcesTrackerImpl constructor is
    // useLocalCacheDirectoryManager, i.e. whether to use LocalCacheDirectoryManager
    // to manage the local cache directories
    @SuppressWarnings("unchecked")
    private void handleInitApplicationResources(Application app) {
        String userName = app.getUser();
        // Create a per-user LocalResourcesTrackerImpl and put it into privateRsrc
        privateRsrc.putIfAbsent(userName, new LocalResourcesTrackerImpl(userName,
            null, dispatcher, true, super.getConfig(), stateStore, dirsHandler));
        String appIdStr = app.getAppId().toString();
        // Create another, per-application LocalResourcesTrackerImpl and put it into appRsrc
        appRsrc.putIfAbsent(appIdStr, new LocalResourcesTrackerImpl(app.getUser(),
            app.getAppId(), dispatcher, false, super.getConfig(), stateStore,
            dirsHandler));
        // This is in fact an ApplicationEventType.APPLICATION_INITED event
        dispatcher.getEventHandler().handle(new ApplicationInitedEvent(
            app.getAppId()));
    }
}
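The two trackers differ in scope. `privateRsrc` is keyed by user, so all applications of the same user share one tracker, created with the cache directory manager enabled. `appRsrc` gets a fresh tracker per application. The sketch below models only that bookkeeping. `TrackerSketch` is a hypothetical stand-in for `LocalResourcesTrackerImpl` that merely records its scope, and the returned string stands in for the `APPLICATION_INITED` event.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

// Hypothetical stand-in for LocalResourcesTrackerImpl: records only its scope.
final class TrackerSketch {
    final String user;
    final String appId;               // null for the per-user (private) tracker
    final boolean useCacheDirManager; // mirrors the 4th constructor argument
    TrackerSketch(String user, String appId, boolean useCacheDirManager) {
        this.user = user;
        this.appId = appId;
        this.useCacheDirManager = useCacheDirManager;
    }
}

class LocalizationSketch {
    final ConcurrentMap<String, TrackerSketch> privateRsrc = new ConcurrentHashMap<>(); // keyed by user
    final ConcurrentMap<String, TrackerSketch> appRsrc = new ConcurrentHashMap<>();     // keyed by appId

    // Analogue of handleInitApplicationResources(); returns the event it would fire
    String initApplicationResources(String user, String appId) {
        privateRsrc.putIfAbsent(user, new TrackerSketch(user, null, true));
        appRsrc.putIfAbsent(appId, new TrackerSketch(user, appId, false));
        return "APPLICATION_INITED " + appId;
    }
}
```

Like the original code, the sketch constructs the tracker before calling `putIfAbsent`, so a spare object is built and discarded when the key already exists. `computeIfAbsent` would avoid that allocation.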
public class ApplicationImpl implements Application {
    // Transition rule
    addTransition(ApplicationState.INITING, ApplicationState.RUNNING,
        ApplicationEventType.APPLICATION_INITED, new AppInitDoneTransition())
    @SuppressWarnings("unchecked")
    static class AppInitDoneTransition implements
            SingleArcTransition<ApplicationImpl, ApplicationEvent> {
        @Override
        public void transition(ApplicationImpl app, ApplicationEvent event) {
            // Fire an INIT_CONTAINER event for every container in the containers map
            for (Container container : app.containers.values()) {
                app.dispatcher.getEventHandler().handle(new ContainerInitEvent(
                    container.getContainerId()));
            }
        }
    }
}
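Together with `InitContainerTransition`, this transition implements a simple buffer. Containers that arrive while the application is still `INITING` are only parked in `containers`. Once `APPLICATION_INITED` moves the application to `RUNNING`, each parked container receives its own `INIT_CONTAINER` event. Below is a hypothetical sketch of that buffering. Events are recorded as strings, and containers arriving after `RUNNING` are deliberately left out.

```java
import java.util.ArrayList;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

// Hypothetical simplification of ApplicationImpl's container buffering; not YARN code.
class AppBufferSketch {
    enum State { INITING, RUNNING }

    State state = State.INITING;
    final Set<String> containers = new LinkedHashSet<>(); // buffered container ids, in arrival order
    final List<String> emitted = new ArrayList<>();       // events sent to containers

    // INIT_CONTAINER while INITING: only remember the container
    void onInitContainer(String containerId) {
        containers.add(containerId);
    }

    // APPLICATION_INITED: INITING -> RUNNING, then one INIT_CONTAINER per buffered container
    void onApplicationInited() {
        if (state != State.INITING) {
            throw new IllegalStateException("APPLICATION_INITED is only valid in INITING");
        }
        state = State.RUNNING;
        for (String id : containers) {
            emitted.add("INIT_CONTAINER " + id);
        }
    }
}
```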
server\nodemanager\containermanager\container\ContainerImpl.java
Container initialization
public class ContainerImpl implements Container {
    // Transition rule
    addTransition(ContainerState.NEW, EnumSet.of(ContainerState.LOCALIZING, ContainerState.LOCALIZED,
        ContainerState.LOCALIZATION_FAILED, ContainerState.DONE),
        ContainerEventType.INIT_CONTAINER, new RequestResourcesTransition())
    @SuppressWarnings("unchecked") // dispatcher not typed
    static class RequestResourcesTransition implements
            MultipleArcTransition<ContainerImpl, ContainerEvent, ContainerState> {
        @Override
        public ContainerState transition(ContainerImpl container,
                ContainerEvent event) {
            // The container was recovered after an NM restart and had already completed
            if (container.recoveredStatus == RecoveredContainerStatus.COMPLETED) {
                // Send ApplicationContainerFinishedEvent and related events
                container.sendFinishedEvents();
                return ContainerState.DONE;
            } else if (container.recoveredAsKilled &&
                    container.recoveredStatus == RecoveredContainerStatus.REQUESTED) {
                // container was killed but never launched
                container.metrics.killedContainer();
                container.metrics.releaseContainer(container.resource);
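`RequestResourcesTransition` is a multiple-arc transition: its hook chooses the target state at runtime, and that state must be one of those listed in `EnumSet.of(...)`. The sketch below illustrates only the mechanism. The `recoveredCompleted -> DONE` branch follows the code above. The rule "resources to localize leads to LOCALIZING, none leads to LOCALIZED" is an assumption made for illustration, and all names are hypothetical.

```java
import java.util.EnumSet;
import java.util.Set;

// Hypothetical sketch of a multiple-arc transition; not YARN code.
class MultiArcSketch {
    enum ContainerState { NEW, LOCALIZING, LOCALIZED, LOCALIZATION_FAILED, DONE }

    // Targets declared for INIT_CONTAINER from NEW, as in the addTransition() above
    static final Set<ContainerState> ALLOWED = EnumSet.of(ContainerState.LOCALIZING,
        ContainerState.LOCALIZED, ContainerState.LOCALIZATION_FAILED, ContainerState.DONE);

    // Hook: only the recovered-as-COMPLETED branch comes from the excerpt;
    // the resource-count rule is an illustrative assumption
    static ContainerState requestResources(boolean recoveredCompleted, int resourcesToLocalize) {
        if (recoveredCompleted) {
            return ContainerState.DONE;
        }
        return resourcesToLocalize > 0 ? ContainerState.LOCALIZING : ContainerState.LOCALIZED;
    }

    // The machine validates that the hook's choice was declared as a possible target
    static ContainerState onInitContainer(boolean recoveredCompleted, int resourcesToLocalize) {
        ContainerState next = requestResources(recoveredCompleted, resourcesToLocalize);
        if (!ALLOWED.contains(next)) {
            throw new IllegalStateException("Transition to " + next + " was not declared");
        }
        return next;
    }
}
```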