编写一个直接在Yarn上运行的程序
我们知道基于mapReduce框架的分布式程序的编写,在这种框架下我们不需要考虑申请资源,只需要安照mapreduce框架的要求,直接编写Map函数和reduce函数即可。如何在Yarn上直接编写应用程序呢?
要想在Yarn上编写应用程序,需要编写两个组件,Client和ApplicationMaster. 例如,JobClient和MRAppMaster是Yarn专门为Mapreduce设计实现的两个Client和ApplicationMaster组件。
客户端负责向ResourceManager提交ApplicationMaster,并查询应用程序的状态。
ApplicationManager负责向ResourceManager申请资源(返回以Container形式),并与NodeManager通信以启动各个Container,同时负责监控运行的状态,并在失败时候重新申请资源。
client设计
客户端负责向ResourceManager提交ApplicationMaster,并查询应用程序的状态。
- 客户端通过RPC与ResourceManager通信获取appId和可分配的资源等信息
- 客户端通过RPC将ApplicationMaster提交到ResourceManager
客户端将启动ApplicationMaster所需要的信息打包到ApplicationSubmisionContext中 主要包括:application_id,name,priority,queue,user,unmanagered_am等
也需要提提供ApplicationClient接口实现,以供返回信息,包括集群信息,节点信息,kill信息,运行状态
当然这些程序可以使用java的RPC进行编程,Yarn提供了YarnClient类的封装编程库,使用maven载入。
<!-- https://mvnrepository.com/artifact/org.apache.hadoop/hadoop-yarn-client -->
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-yarn-client</artifactId>
<version>3.2.0</version>
</dependency>
public class YarnClientDemo {
//创建一个yarnclient客户断
private YarnClient client;
//这里配置Yarn集群信息或者采用加在yarn-site.xml文件形式
private Configuration conf;
public void initClient() throws IOException, YarnException {
client = YarnClient.createYarnClient();
client.init(conf);
//启动YarnClient
client.start();
//获取一个applicationID
YarnClientApplication app = client.createApplication();
//构造applicationsubmit用于提交
ApplicationSubmissionContext appContext = app.getApplicationSubmissionContext();
//获取appid
ApplicationId appId = appContext.getApplicationId();
//获取资源信息
Resource resource = appContext.getResource();
//设置app信息
appContext.setApplicationName("statistic app");
appContext.setQueue("background");
appContext.setUnmanagedAM(false);
client.submitApplication(appContext);
}
}
ApplicationMaster设计
ApplicationManager负责向ResourceManager申请资源(返回以Container形式),并与NodeManager通信以启动各个Container,同时负责监控运行的状态,并在失败时候重新申请资源。
所以我们分为AM-RM交互和AM-NM交互
AM-RM
- ApplicationMaster通过RPC向ResourceManager注册
ApplicationMaster启动时向ResourceManager注册,注册信息封装RegisterApplicationMasterRequest中,主要信息有host, rp_port,tracking_url追踪URL.
注册成功后收到一个RegisterApplicationMasterResponse返回值,主要信息有,maximumCapability最大可用资源,client_to_am_token_master_key等信息
- ApplicationMaster通过RPC向ResourceManager申请资源
ApplicationMaster将要请求的资源封装为AllocateRequest参数,主要包括,priority,resource_name,capability,num_containers资源数目,进度,请求加入黑名单的机器。用户可以将一个机器加入黑名单使,RM不把该机器资源分配给本程序。
ApplicationMaster调用后会收到AllocationResponse类型返回信息。主要包括,reponse_id,allocated_containers,limit,状态等信息。
- ApplicationMaster通过RPC告诉ResourceManager程序运行完毕,退出
ApplicationMaster与ResourceManager交互由AMRMClientImpl和AMRMClientAsync实现,但是AMRMClientImpl是阻塞的,AMRMClientAsync是非阻塞的
public class MyCallByHandler implements AMRMClientAsync.CallbackHandler
{
//配置yarnconf信息
private YarnConfiguration conf;
public void onContainersCompleted(List<ContainerStatus> list) {
}
public void onContainersAllocated(List<Container> list) {
//构建句柄
AMRMClientAsync.CallbackHandler callbackHandler = new MyCallByHandler();
AMRMClientAsync<ContainerRequest> asyncClient = AMRMClientAsync.createAMRMClientAsync(1000, callbackHandler);
asyncClient.init(conf);
asyncClient.start();
try {
//三个需要填的参数是name,port,url
RegisterApplicationMasterResponse response = asyncClient.registerApplicationMaster("statistic app",5200,"url");
asyncClient.addContainerRequest(ContainerRequest.newBuilder().build());
} catch (YarnException e) {
e.printStackTrace();
} catch (IOException e) {
e.printStackTrace();
}
}
public void onShutdownRequest() {
}
public void onNodesUpdated(List<NodeReport> list) {
}
public float getProgress() {
return 0;
}
public void onError(Throwable throwable) {
}
}
上简单实现构建的代表
AM-NM
- ApplicationMaster将申请到的资源二次分配给内部的任务,并通过RPC与NodeManager通信,启动Container。
该过程传参为StartContainerRequest,主要包括localResources,environment,container_token等信息,返回一个StartContainerResponse,主要包括services_meta_data,成功或失败请求值
- ApplicationMaster向NodeManager询问container的运行状态,失败会重新申请资源
- Container运行完成,ApplicationMaster通过RPC释放Container
ApplicationMaster与ResourceManager交互由NMClientImpl和NMClientAsync实现,但是NMClientImpl是阻塞的,NMClientAsync是非阻塞的
实现方法差不多,实现NMClientAsync接口。
Yarn实现了DistributionShell的实例
DistributionShell 是Yarn自带的Application实现的例子,可以运行shell命令,代码也不多
1)构造RPC句柄。
利用Hadoop RPC接口创建一个可以直接与ResourceManager交互的RPC client句柄applicationsManager:
private void connectToASM() throws IOException {
YarnConfiguration yarnConf = new YarnConfiguration(conf);
InetSocketAddress rmAddress = yarnConf.getSocketAddr(
YarnConfiguration.RM_ADDRESS,
YarnConfiguration.DEFAULT_RM_ADDRESS,
YarnConfiguration.DEFAULT_RM_PORT);
LOG.info(“Connecting to ResourceManager at ” + rmAddress);
applicationsManager = ((ClientRMProtocol) rpc.getProxy(
ClientRMProtocol.class, rmAddress, conf));
}
(2)获取application id。
与ResourceManager通信,请求application id:
GetNewApplicationRequest request = Records.newRecord(GetNewApplicationRequest.class);
GetNewApplicationResponse response = applicationsManager.getNewApplication(request);
(3)构造ContainerLaunchContext。
构造一个用于运行ApplicationMaster的container,container相关信息被封装到ContainerLaunchContext对象中:
ContainerLaunchContext amContainer = Records.newRecord(ContainerLaunchContext.class);
//添加本地资源
//填充localResources
amContainer.setLocalResources(localResources);
//添加运行ApplicationMaster所需的环境变量
Map<String, String> env = new HashMap<String, String>();
//填充env
amContainer.setEnvironment(env);
//添加启动ApplicationMaster的命令
//填充commands;
amContainer.setCommands(commands);
//设置ApplicationMaster所需的资源
amContainer.setResource(capability);
(4)构造ApplicationSubmissionContext。
构造一个用于提交ApplicationMaster的ApplicationSubmissionContext:
ApplicationSubmissionContext appContext =
Records.newRecord(ApplicationSubmissionContext.class);
//设置application id,调用GetNewApplicationResponse#getApplicationId()
appContext.setApplicationId(appId);
//设置Application名称:“DistributedShell”
appContext.setApplicationName(appName);
//设置前面创建的container
appContext.setAMContainerSpec(amContainer);
//设置application的优先级,默认是0
pri.setPriority(amPriority);
//设置application的所在队列,默认是”"
appContext.setQueue(amQueue);
//设置application的所属用户,默认是”"
appContext.setUser(amUser);
(5)提交ApplicationMaster。
将ApplicationMaster提交到ResourceManager上,从而完成作业提交功能:
applicationsManager.submitApplication(appRequest);
(6) 显示应用程序运行状态。
为了让用户知道应用程序进度,Client会每隔几秒在shell终端上打印一次应用程序运行状态:
while (true) {
Thread.sleep(1000);
GetApplicationReportRequest reportRequest =
Records.newRecord(GetApplicationReportRequest.class);
reportRequest.setApplicationId(appId);
GetApplicationReportResponse reportResponse =
applicationsManager.getApplicationReport(reportRequest);
ApplicationReport report = reportResponse.getApplicationReport();
//打印report内容
…
YarnApplicationState state = report.getYarnApplicationState();
FinalApplicationStatus dsStatus = report.getFinalApplicationStatus();
if (YarnApplicationState.FINISHED == state) {
if (FinalApplicationStatus.SUCCEEDED == dsStatus) {
return true;
} else {
return false;
}
} else if (YarnApplicationState.KILLED == state
|| YarnApplicationState.FAILED == state) {
return false;
}
}
- SDP(11):MongoDB-Engine功能实现
- SDP(10):文本式大数据运算环境-MongoDB-Engine功能设计
- Kaggle Titanic 生存预测比赛超完整笔记(下)
- SDP(9):MongoDB-Scala - data access and modeling
- 数据清理的遗留问题处理(r6笔记第87天)
- 一次DB time抖动发现的expdp的bug(r6笔记第86天)
- Python中map函数
- 10g,11g中数据库静默安装中的细小差别(r6笔记第85天)
- SDP(8):文本式数据库-MongoDB-Scala基本操作
- SDP(7):Cassandra- Cassandra-Engine:Streaming
- TensorFlow实现神经网络入门篇
- 27.反射,类加载器,设计模式,jdk新特性
- SDP(6):分布式数据库运算环境- Cassandra-Engine
- 配置dg broker的问题分析及修复(r6笔记第84天)
- JavaScript 教程
- JavaScript 编辑工具
- JavaScript 与HTML
- JavaScript 与Java
- JavaScript 数据结构
- JavaScript 基本数据类型
- JavaScript 特殊数据类型
- JavaScript 运算符
- JavaScript typeof 运算符
- JavaScript 表达式
- JavaScript 类型转换
- JavaScript 基本语法
- JavaScript 注释
- Javascript 基本处理流程
- Javascript 选择结构
- Javascript if 语句
- Javascript if 语句的嵌套
- Javascript switch 语句
- Javascript 循环结构
- Javascript 循环结构实例
- Javascript 跳转语句
- Javascript 控制语句总结
- Javascript 函数介绍
- Javascript 函数的定义
- Javascript 函数调用
- Javascript 几种特殊的函数
- JavaScript 内置函数简介
- Javascript eval() 函数
- Javascript isFinite() 函数
- Javascript isNaN() 函数
- parseInt() 与 parseFloat()
- escape() 与 unescape()
- Javascript 字符串介绍
- Javascript length属性
- javascript 字符串函数
- Javascript 日期对象简介
- Javascript 日期对象用途
- Date 对象属性和方法
- Javascript 数组是什么
- Javascript 创建数组
- Javascript 数组赋值与取值
- Javascript 数组属性和方法