Flink源码解读系列 | 任务提交流程

时间:2022-07-25
本文章向大家介绍Flink源码解读系列 | 任务提交流程,主要内容包括其使用实例、应用技巧、基本知识点总结和需要注意事项,具有一定的参考价值,需要的朋友可以参考一下。

Flink在1.10版本对整个作业提交流程有了较大改动,详情请见FLIP-73。本文基于1.10对作业提交的关键流程进行分析,不深究。 入口: 依旧是main函数最后env.execute();

public JobExecutionResult execute(String jobName) throws Exception {
    Preconditions.checkNotNull(jobName, "Streaming Job name should not be null.");

    return execute(getStreamGraph(jobName));//构造streamGraph
  }
  
public JobExecutionResult execute(StreamGraph streamGraph) throws Exception {
    final JobClient jobClient = executeAsync(streamGraph);//异步执行,返回JobClient对象
    //......省略部分代码
}

根据execution.target配置获取对应PipelineExecutorFactory

public JobClient executeAsync(StreamGraph streamGraph) throws Exception {
    checkNotNull(streamGraph, "StreamGraph cannot be null.");
    checkNotNull(configuration.get(DeploymentOptions.TARGET), "No execution.target specified in your configuration file.");

    final PipelineExecutorFactory executorFactory =
      executorServiceLoader.getExecutorFactory(configuration);

    checkNotNull(
      executorFactory,
      "Cannot find compatible factory for specified execution.target (=%s)",
      configuration.get(DeploymentOptions.TARGET));

    CompletableFuture<JobClient> jobClientFuture = executorFactory
      .getExecutor(configuration)//PipelineExecutor
      .execute(streamGraph, configuration);
      //......
}

PipelineExecutor接口有多种实现,以LocalExecutor为例,Pipeline是一个空接口为了把 StreamGraph(stream 程序) 和 Plan (batch 程序)抽象到一起。

public CompletableFuture<JobClient> execute(Pipeline pipeline, Configuration configuration) throws Exception {
    checkNotNull(pipeline);
    checkNotNull(configuration);

    // we only support attached execution with the local executor.
    checkState(configuration.getBoolean(DeploymentOptions.ATTACHED));

    final JobGraph jobGraph = getJobGraph(pipeline, configuration);//根据streamGraph生成jobGraph
    final MiniCluster miniCluster = startMiniCluster(jobGraph, configuration);//启动MiniCluster (这个启动过程比较复杂,包括启动rpcservice、haservice、taskmanager、resourcemanager等)
    final MiniClusterClient clusterClient = new MiniClusterClient(configuration, miniCluster);

    CompletableFuture<JobID> jobIdFuture = clusterClient.submitJob(jobGraph);//通过MiniClusterClient提交job

    jobIdFuture
        .thenCompose(clusterClient::requestJobResult)
        .thenAccept((jobResult) -> clusterClient.shutDownCluster());

    return jobIdFuture.thenApply(jobID ->
        new ClusterClientJobClientAdapter<>(() -> clusterClient, jobID));
  }

通过RPC方式提交jobGraph

public CompletableFuture<JobSubmissionResult> submitJob(JobGraph jobGraph) {
    //......
    final CompletableFuture<Acknowledge> acknowledgeCompletableFuture = jarUploadFuture
      .thenCombine(
        dispatcherGatewayFuture,
        (Void ack, DispatcherGateway dispatcherGateway) -> dispatcherGateway.submitJob(jobGraph, rpcTimeout))
      .thenCompose(Function.identity());
    //......
  }
public CompletableFuture<Acknowledge> submitJob(JobGraph jobGraph, Time timeout) {
    //......
        return internalSubmitJob(jobGraph);
    //.......
  }
private CompletableFuture<Acknowledge> internalSubmitJob(JobGraph jobGraph) {
    final CompletableFuture<Acknowledge> persistAndRunFuture = waitForTerminatingJobManager(jobGraph.getJobID(), jobGraph, this::persistAndRunJob)
      .thenApply(ignored -> Acknowledge.get());
  //......
}
private CompletableFuture<Void> persistAndRunJob(JobGraph jobGraph) throws Exception {
    jobGraphWriter.putJobGraph(jobGraph);

    final CompletableFuture<Void> runJobFuture = runJob(jobGraph);//run job
}

创建JobManagerRunner

private CompletableFuture<Void> runJob(JobGraph jobGraph) {
    Preconditions.checkState(!jobManagerRunnerFutures.containsKey(jobGraph.getJobID()));
    //创建JobManagerRunner
    final CompletableFuture<JobManagerRunner> jobManagerRunnerFuture = createJobManagerRunner(jobGraph);
    jobManagerRunnerFutures.put(jobGraph.getJobID(), jobManagerRunnerFuture);

    return jobManagerRunnerFuture
      .thenApply(FunctionUtils.uncheckedFunction(this::startJobManagerRunner))
      .thenApply(FunctionUtils.nullFn())
      .whenCompleteAsync(
        (ignored, throwable) -> {
          if (throwable != null) {
            jobManagerRunnerFutures.remove(jobGraph.getJobID());
          }
        },
        getMainThreadExecutor());
}
private JobManagerRunner startJobManagerRunner(JobManagerRunner jobManagerRunner) throws Exception {
  //......
  jobManagerRunner.start();
  //......
}

启动的 JobManagerRunner 会竞争 leader ,一旦被选举为 leader,就会启动一个 JobMaster。

@Override
  public void start() throws Exception {
    try {
      leaderElectionService.start(this);
    } catch (Exception e) {
      log.error("Could not start the JobManager because the leader election service did not start.", e);
      throw new Exception("Could not start the leader election service.", e);
    }
  }
public void start(LeaderContender contender) throws Exception {
      checkNotNull(contender);
      addContender(this, contender);//参与leader竞争
    }
/**
   * Callback from leader contenders when they start their service.
   */
  private void addContender(EmbeddedLeaderElectionService service, LeaderContender contender) {
    //......
      if (!allLeaderContenders.add(service)) {//添加到竞争者集合
          throw new IllegalStateException("leader election service was added to this service multiple times");
        }
    //一旦竞争leader成功,则更新leader
    updateLeader().whenComplete((aVoid, throwable) -> {
          if (throwable != null) {
            fatalError(throwable);
          }
        });
    //.......
}
private CompletableFuture<Void> updateLeader() {
//.......
    if (allLeaderContenders.isEmpty()) {
        // no new leader available, tell everyone that there is no leader currently
        return notifyAllListeners(null, null);
      }
      else {//上面已经添加了一个竞争者,走else逻辑
        // propose a leader and ask it
        final UUID leaderSessionId = UUID.randomUUID();//生成leaderSessionId 
        EmbeddedLeaderElectionService leaderService = allLeaderContenders.iterator().next();

        currentLeaderSessionId = leaderSessionId;
        currentLeaderProposed = leaderService;
        currentLeaderProposed.isLeader = true;

        LOG.info("Proposing leadership to contender {}", leaderService.contender.getDescription());

        return execute(new GrantLeadershipCall(leaderService.contender, leaderSessionId, LOG));//启动一个线程
      }
}
private static class GrantLeadershipCall implements Runnable {
public void run() {
      try {
        contender.grantLeadership(leaderSessionId);
      }
      catch (Throwable t) {
        logger.warn("Error granting leadership to contender", t);
        contender.handleError(t instanceof Exception ? (Exception) t : new Exception(t));
      }
    }
}

public void grantLeadership(final UUID leaderSessionID) {
    synchronized (lock) {
      if (shutdown) {
        log.info("JobManagerRunner already shutdown.");
        return;
      }

      leadershipOperation = leadershipOperation.thenCompose(
        (ignored) -> {
          synchronized (lock) {
            return verifyJobSchedulingStatusAndStartJobManager(leaderSessionID);//启动jobManager
          }
        });

      handleException(leadershipOperation, "Could not start the job manager.");
    }
  }
private CompletableFuture<Void> verifyJobSchedulingStatusAndStartJobManager(UUID leaderSessionId) {
    final CompletableFuture<JobSchedulingStatus> jobSchedulingStatusFuture = getJobSchedulingStatus();

    return jobSchedulingStatusFuture.thenCompose(
      jobSchedulingStatus -> {
        if (jobSchedulingStatus == JobSchedulingStatus.DONE) {
          return jobAlreadyDone();
        } else {
          return startJobMaster(leaderSessionId);
        }
      });
  }

  private CompletionStage<Void> startJobMaster(UUID leaderSessionId) {
    log.info("JobManager runner for job {} ({}) was granted leadership with session id {} at {}.",
      jobGraph.getName(), jobGraph.getJobID(), leaderSessionId, jobMasterService.getAddress());

    try {
      runningJobsRegistry.setJobRunning(jobGraph.getJobID());
    } catch (IOException e) {
      return FutureUtils.completedExceptionally(
        new FlinkException(
          String.format("Failed to set the job %s to running in the running jobs registry.", jobGraph.getJobID()),
          e));
    }

    final CompletableFuture<Acknowledge> startFuture;
    try {
      startFuture = jobMasterService.start(new JobMasterId(leaderSessionId));//启动 JobMaster
    } catch (Exception e) {
      return FutureUtils.completedExceptionally(new FlinkException("Failed to start the JobMaster.", e));
    }

    final CompletableFuture<JobMasterGateway> currentLeaderGatewayFuture = leaderGatewayFuture;
    return startFuture.thenAcceptAsync(
      (Acknowledge ack) -> confirmLeaderSessionIdIfStillLeader(
        leaderSessionId,
        jobMasterService.getAddress(),
        currentLeaderGatewayFuture),
      executor);
  }

JobMaster启动之后会和ResourceManager通信

public CompletableFuture<Acknowledge> start(final JobMasterId newJobMasterId) throws Exception {
    // make sure we receive RPC and async calls
    start();

    return callAsyncWithoutFencing(() -> startJobExecution(newJobMasterId), RpcUtils.INF_TIMEOUT);
  }
private Acknowledge startJobExecution(JobMasterId newJobMasterId) throws Exception {

    //.......

    startJobMasterServices();
    resetAndStartScheduler();//这里开始正式的任务调度过程了

    //......
  }
private void startJobMasterServices() throws Exception {
    startHeartbeatServices();

    // start the slot pool make sure the slot pool now accepts messages for this leader
    slotPool.start(getFencingToken(), getAddress(), getMainThreadExecutor());
    scheduler.start(getMainThreadExecutor());

    //TODO: Remove once the ZooKeeperLeaderRetrieval returns the stored address upon start
    // try to reconnect to previously known leader
    reconnectToResourceManager(new FlinkException("Starting JobMaster component."));//和ResourceManager建立连接

    // job is ready to go, try to establish connection with resource manager
    //   - activate leader retrieval for the resource manager
    //   - on notification of the leader, the connection will be established and
    //     the slot pool will start requesting slots
    resourceManagerLeaderRetriever.start(new ResourceManagerLeaderListener());
  }

之后就是任务的调度过程了,下次再说。

总结: 本文主要分析了flink job提交的流程。从streamgraph、jobgraph生成,启动MiniCluster,通过RPC把jobGraph提交给Dispacher,然后启动jobMaster竞争leader并和ResourceManager建立通信。