Flink源码解读系列 | 任务提交流程
时间:2022-07-25
本文章向大家介绍Flink源码解读系列 | 任务提交流程,主要内容包括其使用实例、应用技巧、基本知识点总结和需要注意事项,具有一定的参考价值,需要的朋友可以参考一下。
Flink在1.10版本对整个作业提交流程有了较大改动,详情请见FLIP-73。本文基于1.10对作业提交的关键流程进行分析,不深究。 入口: 依旧是main函数最后env.execute();
public JobExecutionResult execute(String jobName) throws Exception {
Preconditions.checkNotNull(jobName, "Streaming Job name should not be null.");
return execute(getStreamGraph(jobName));//构造streamGraph
}
public JobExecutionResult execute(StreamGraph streamGraph) throws Exception {
final JobClient jobClient = executeAsync(streamGraph);//异步执行,返回JobClient对象
//......省略部分代码
}
根据execution.target配置获取对应PipelineExecutorFactory
public JobClient executeAsync(StreamGraph streamGraph) throws Exception {
checkNotNull(streamGraph, "StreamGraph cannot be null.");
checkNotNull(configuration.get(DeploymentOptions.TARGET), "No execution.target specified in your configuration file.");
final PipelineExecutorFactory executorFactory =
executorServiceLoader.getExecutorFactory(configuration);
checkNotNull(
executorFactory,
"Cannot find compatible factory for specified execution.target (=%s)",
configuration.get(DeploymentOptions.TARGET));
CompletableFuture<JobClient> jobClientFuture = executorFactory
.getExecutor(configuration)//PipelineExecutor
.execute(streamGraph, configuration);
//......
}
PipelineExecutor接口有多种实现,以LocalExecutor为例,Pipeline是一个空接口为了把 StreamGraph(stream 程序) 和 Plan (batch 程序)抽象到一起。
public CompletableFuture<JobClient> execute(Pipeline pipeline, Configuration configuration) throws Exception {
checkNotNull(pipeline);
checkNotNull(configuration);
// we only support attached execution with the local executor.
checkState(configuration.getBoolean(DeploymentOptions.ATTACHED));
final JobGraph jobGraph = getJobGraph(pipeline, configuration);//根据streamGraph生成jobGraph
final MiniCluster miniCluster = startMiniCluster(jobGraph, configuration);//启动MiniCluster (这个启动过程比较复杂,包括启动rpcservice、haservice、taskmanager、resourcemanager等)
final MiniClusterClient clusterClient = new MiniClusterClient(configuration, miniCluster);
CompletableFuture<JobID> jobIdFuture = clusterClient.submitJob(jobGraph);//通过MiniClusterClient提交job
jobIdFuture
.thenCompose(clusterClient::requestJobResult)
.thenAccept((jobResult) -> clusterClient.shutDownCluster());
return jobIdFuture.thenApply(jobID ->
new ClusterClientJobClientAdapter<>(() -> clusterClient, jobID));
}
通过RPC方式提交jobGraph
public CompletableFuture<JobSubmissionResult> submitJob(JobGraph jobGraph) {
//......
final CompletableFuture<Acknowledge> acknowledgeCompletableFuture = jarUploadFuture
.thenCombine(
dispatcherGatewayFuture,
(Void ack, DispatcherGateway dispatcherGateway) -> dispatcherGateway.submitJob(jobGraph, rpcTimeout))
.thenCompose(Function.identity());
//......
}
public CompletableFuture<Acknowledge> submitJob(JobGraph jobGraph, Time timeout) {
//......
return internalSubmitJob(jobGraph);
//.......
}
private CompletableFuture<Acknowledge> internalSubmitJob(JobGraph jobGraph) {
final CompletableFuture<Acknowledge> persistAndRunFuture = waitForTerminatingJobManager(jobGraph.getJobID(), jobGraph, this::persistAndRunJob)
.thenApply(ignored -> Acknowledge.get());
//......
}
private CompletableFuture<Void> persistAndRunJob(JobGraph jobGraph) throws Exception {
jobGraphWriter.putJobGraph(jobGraph);
final CompletableFuture<Void> runJobFuture = runJob(jobGraph);//run job
}
创建JobManagerRunner
private CompletableFuture<Void> runJob(JobGraph jobGraph) {
Preconditions.checkState(!jobManagerRunnerFutures.containsKey(jobGraph.getJobID()));
//创建JobManagerRunner
final CompletableFuture<JobManagerRunner> jobManagerRunnerFuture = createJobManagerRunner(jobGraph);
jobManagerRunnerFutures.put(jobGraph.getJobID(), jobManagerRunnerFuture);
return jobManagerRunnerFuture
.thenApply(FunctionUtils.uncheckedFunction(this::startJobManagerRunner))
.thenApply(FunctionUtils.nullFn())
.whenCompleteAsync(
(ignored, throwable) -> {
if (throwable != null) {
jobManagerRunnerFutures.remove(jobGraph.getJobID());
}
},
getMainThreadExecutor());
}
private JobManagerRunner startJobManagerRunner(JobManagerRunner jobManagerRunner) throws Exception {
//......
jobManagerRunner.start();
//......
}
启动的 JobManagerRunner 会竞争 leader ,一旦被选举为 leader,就会启动一个 JobMaster。
@Override
public void start() throws Exception {
try {
leaderElectionService.start(this);
} catch (Exception e) {
log.error("Could not start the JobManager because the leader election service did not start.", e);
throw new Exception("Could not start the leader election service.", e);
}
}
public void start(LeaderContender contender) throws Exception {
checkNotNull(contender);
addContender(this, contender);//参与leader竞争
}
/**
* Callback from leader contenders when they start their service.
*/
private void addContender(EmbeddedLeaderElectionService service, LeaderContender contender) {
//......
if (!allLeaderContenders.add(service)) {//添加到竞争者集合
throw new IllegalStateException("leader election service was added to this service multiple times");
}
//一旦竞争leader成功,则更新leader
updateLeader().whenComplete((aVoid, throwable) -> {
if (throwable != null) {
fatalError(throwable);
}
});
//.......
}
private CompletableFuture<Void> updateLeader() {
//.......
if (allLeaderContenders.isEmpty()) {
// no new leader available, tell everyone that there is no leader currently
return notifyAllListeners(null, null);
}
else {//上面已经添加了一个竞争者,走else逻辑
// propose a leader and ask it
final UUID leaderSessionId = UUID.randomUUID();//生成leaderSessionId
EmbeddedLeaderElectionService leaderService = allLeaderContenders.iterator().next();
currentLeaderSessionId = leaderSessionId;
currentLeaderProposed = leaderService;
currentLeaderProposed.isLeader = true;
LOG.info("Proposing leadership to contender {}", leaderService.contender.getDescription());
return execute(new GrantLeadershipCall(leaderService.contender, leaderSessionId, LOG));//启动一个线程
}
}
private static class GrantLeadershipCall implements Runnable {
public void run() {
try {
contender.grantLeadership(leaderSessionId);
}
catch (Throwable t) {
logger.warn("Error granting leadership to contender", t);
contender.handleError(t instanceof Exception ? (Exception) t : new Exception(t));
}
}
}
public void grantLeadership(final UUID leaderSessionID) {
synchronized (lock) {
if (shutdown) {
log.info("JobManagerRunner already shutdown.");
return;
}
leadershipOperation = leadershipOperation.thenCompose(
(ignored) -> {
synchronized (lock) {
return verifyJobSchedulingStatusAndStartJobManager(leaderSessionID);//启动jobManager
}
});
handleException(leadershipOperation, "Could not start the job manager.");
}
}
private CompletableFuture<Void> verifyJobSchedulingStatusAndStartJobManager(UUID leaderSessionId) {
final CompletableFuture<JobSchedulingStatus> jobSchedulingStatusFuture = getJobSchedulingStatus();
return jobSchedulingStatusFuture.thenCompose(
jobSchedulingStatus -> {
if (jobSchedulingStatus == JobSchedulingStatus.DONE) {
return jobAlreadyDone();
} else {
return startJobMaster(leaderSessionId);
}
});
}
private CompletionStage<Void> startJobMaster(UUID leaderSessionId) {
log.info("JobManager runner for job {} ({}) was granted leadership with session id {} at {}.",
jobGraph.getName(), jobGraph.getJobID(), leaderSessionId, jobMasterService.getAddress());
try {
runningJobsRegistry.setJobRunning(jobGraph.getJobID());
} catch (IOException e) {
return FutureUtils.completedExceptionally(
new FlinkException(
String.format("Failed to set the job %s to running in the running jobs registry.", jobGraph.getJobID()),
e));
}
final CompletableFuture<Acknowledge> startFuture;
try {
startFuture = jobMasterService.start(new JobMasterId(leaderSessionId));//启动 JobMaster
} catch (Exception e) {
return FutureUtils.completedExceptionally(new FlinkException("Failed to start the JobMaster.", e));
}
final CompletableFuture<JobMasterGateway> currentLeaderGatewayFuture = leaderGatewayFuture;
return startFuture.thenAcceptAsync(
(Acknowledge ack) -> confirmLeaderSessionIdIfStillLeader(
leaderSessionId,
jobMasterService.getAddress(),
currentLeaderGatewayFuture),
executor);
}
JobMaster启动之后会和ResourceManager通信
public CompletableFuture<Acknowledge> start(final JobMasterId newJobMasterId) throws Exception {
// make sure we receive RPC and async calls
start();
return callAsyncWithoutFencing(() -> startJobExecution(newJobMasterId), RpcUtils.INF_TIMEOUT);
}
private Acknowledge startJobExecution(JobMasterId newJobMasterId) throws Exception {
//.......
startJobMasterServices();
resetAndStartScheduler();//这里开始正式的任务调度过程了
//......
}
private void startJobMasterServices() throws Exception {
startHeartbeatServices();
// start the slot pool make sure the slot pool now accepts messages for this leader
slotPool.start(getFencingToken(), getAddress(), getMainThreadExecutor());
scheduler.start(getMainThreadExecutor());
//TODO: Remove once the ZooKeeperLeaderRetrieval returns the stored address upon start
// try to reconnect to previously known leader
reconnectToResourceManager(new FlinkException("Starting JobMaster component."));//和ResourceManager建立连接
// job is ready to go, try to establish connection with resource manager
// - activate leader retrieval for the resource manager
// - on notification of the leader, the connection will be established and
// the slot pool will start requesting slots
resourceManagerLeaderRetriever.start(new ResourceManagerLeaderListener());
}
之后就是任务的调度过程了,下次再说。
总结: 本文主要分析了flink job提交的流程。从streamgraph、jobgraph生成,启动MiniCluster,通过RPC把jobGraph提交给Dispacher,然后启动jobMaster竞争leader并和ResourceManager建立通信。
- 聚合索引(clustered index) / 非聚合索引(nonclustered index)
- 域名资讯:单词域名can.com以15.5万美金成功交易
- jQuery无缝图片横向(水平)/竖向(垂直)滚动
- Centos下MooseFS(MFS)分布式存储共享环境部署记录
- MFS+Keepalived双机高可用热备方案操作记录
- Docker容器学习梳理-容器时间跟宿主机时间同步
- AS1.0(2.0)中的XML示例
- kvm虚拟机日常操作命令梳理
- mongodb 总结
- 关于微信小程序内置组件swiper,circular使用分享
- zabbix问题记录
- MSDTC 故障排除
- 洪泰智造工场&腾讯云创业加速营全球招募
- MySQL存储引擎之Myisam和Innodb总结性梳理
- JavaScript 教程
- JavaScript 编辑工具
- JavaScript 与HTML
- JavaScript 与Java
- JavaScript 数据结构
- JavaScript 基本数据类型
- JavaScript 特殊数据类型
- JavaScript 运算符
- JavaScript typeof 运算符
- JavaScript 表达式
- JavaScript 类型转换
- JavaScript 基本语法
- JavaScript 注释
- Javascript 基本处理流程
- Javascript 选择结构
- Javascript if 语句
- Javascript if 语句的嵌套
- Javascript switch 语句
- Javascript 循环结构
- Javascript 循环结构实例
- Javascript 跳转语句
- Javascript 控制语句总结
- Javascript 函数介绍
- Javascript 函数的定义
- Javascript 函数调用
- Javascript 几种特殊的函数
- JavaScript 内置函数简介
- Javascript eval() 函数
- Javascript isFinite() 函数
- Javascript isNaN() 函数
- parseInt() 与 parseFloat()
- escape() 与 unescape()
- Javascript 字符串介绍
- Javascript length属性
- javascript 字符串函数
- Javascript 日期对象简介
- Javascript 日期对象用途
- Date 对象属性和方法
- Javascript 数组是什么
- Javascript 创建数组
- Javascript 数组赋值与取值
- Javascript 数组属性和方法
- Activity启动时生命周期汇总
- 「程序员」Flutter:从网络获取数据遇到的坑
- 程序员:拿到新电脑如何配置Git环境
- What?数据量巨大还不分库分表?JDBC 入门与项目实战
- 简书:如何去掉图片下面烦人的“图片发自简书App”
- 命令行shell复制并以当前时间重命名文件夹
- 如何获取PHP命令行参数
- 学习PHP弱引用的知识
- 「okhttp」Gradle引用改jar包引用(一波三折)
- 「问答」解决CSV文件用Excel打开乱码问题
- 「问答」解决jar包运行时相对路径问题
- 「Eclipse」生成能用命令行运行的jar包
- 「AndroidStudio」fastjson导包报错:Could not resolve com.alibaba:fastjson:1.1.56.android
- 「Android」通过注解自动生成类文件:APT实战(AbstractProcessor)
- 五、开始Github和码云之旅,新手如何上路