Spark Kubernetes 的源码分析系列 - scheduler

时间:2022-07-22
本文章向大家介绍Spark Kubernetes 的源码分析系列 - scheduler,主要内容包括其使用实例、应用技巧、基本知识点总结和需要注意事项,具有一定的参考价值,需要的朋友可以参考一下。

1 Overview

这一块代码可以理解为 Spark 是如何实现一个基于 K8S 的调度器,来调度生成 Executor Pod 的。

2 分析

/path/to/spark/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler
└── cluster
    └── k8s
        ├── ExecutorPodStates.scala
        ├── ExecutorPodsAllocator.scala
        ├── ExecutorPodsLifecycleManager.scala
        ├── ExecutorPodsPollingSnapshotSource.scala
        ├── ExecutorPodsSnapshot.scala
        ├── ExecutorPodsSnapshotsStore.scala
        ├── ExecutorPodsSnapshotsStoreImpl.scala
        ├── ExecutorPodsWatchSnapshotSource.scala
        ├── KubernetesClusterManager.scala
        ├── KubernetesClusterSchedulerBackend.scala
        └── KubernetesExecutorBuilder.scala

2 directories, 11 files

2.1 KubernetesExecutorBuilder

由于上篇文章主要介绍了 Driver 的 Pod 是如何生成的,在讲 scheduler 之前,先补充一下 Executor 的配置步骤。重点代码在下面这个 features 里。步骤跟 Driver 类似,但是少了一些,剩下的就是 一个 Basic 的配置,当然是包含 Pod 或者 Container 的一些 meta 信息。此外,跟 ApiServer 交互请求 Executor Pod 的时候也需要 K8S 的安全认证的机制。然后就是类似 Env 和本地目录挂载的一些配置。

val features = Seq(
  new BasicExecutorFeatureStep(conf, secMgr),
  new ExecutorKubernetesCredentialsFeatureStep(conf),
  new MountSecretsFeatureStep(conf),
  new EnvSecretsFeatureStep(conf),
  new LocalDirsFeatureStep(conf),
  new MountVolumesFeatureStep(conf))

2.2 KubernetesClusterManager

这个是 Spark 这一段,关于 K8S 集群作为 resource manager 的一个管理中心。这个类是继承了 ExternalClusterManager 接口的,主要是控制生成 schedulerBackend 对象。

2.3 KubernetesClusterSchedulerBackend

这是 K8S 集群调度器的封装,SchedulerBackend,简称 SB 就好了…SB 主要是包含了申请 request 和删除 remove Executor 的逻辑。

// 这里是指定初始申请的 Executor 的数量,可以通过 conf 来配置
private val initialExecutors = SchedulerBackendUtils.getInitialTargetExecutorNumber(conf)
// 这个是 Executor 出问题 debug 的关键
// 默认情况下 Executor 退出后,会由 Spark 的 K8S 客户端主动进行删除
// 所以 Executor 的日志就找不到了
// 开启这个配置 spark.kubernetes.executor.deleteOnTermination
// 这样 Executor 即时 Failed 了,他的 Pod 也不会被自动删除
private val shouldDeleteExecutors = conf.get(KUBERNETES_DELETE_EXECUTORS)

// 移除 Executor 的逻辑,上面说到的 Pod 被删除就是这里的 delete 导致的
override def doKillExecutors(executorIds: Seq[String]): Future[Boolean] = Future[Boolean] {
  logInfo("do send request to kill executors!")
  kubernetesClient
    .pods()
    .withLabel(SPARK_APP_ID_LABEL, applicationId())
    .withLabel(SPARK_ROLE_LABEL, SPARK_POD_EXECUTOR_ROLE)
    .withLabelIn(SPARK_EXECUTOR_ID_LABEL, executorIds: _*)
    .delete()
  // Don't do anything else - let event handling from the Kubernetes API do the Spark changes
}

2.4 ExecutorPodsSnapshotsStore

这个接口是用于管理 Executor Pod,下面简称 EP…EP 的状态,并且用 ExecutorPodsSnapshot 的数据结构来记录变化的情况。

2.5 ExecutorPodsSnapshot

ExecutorPodsSnapshot 是关于 Spark App 在集群里 EP 的状态的不可变视图。

private[spark] case class ExecutorPodsSnapshot(executorPods: Map[Long, ExecutorPodState]) {

  import ExecutorPodsSnapshot._
  
  // 核心方法,witUpdate 通过传入 Pod 参数,通过 new 一个 EP snapshot 视图来记录 EP 的状态,本质上一个 Map(Executor id -> Executor Pod 状态) 的数据结构
  def withUpdate(updatedPod: Pod): ExecutorPodsSnapshot = {
    val newExecutorPods = executorPods ++ toStatesByExecutorId(Seq(updatedPod))
    new ExecutorPodsSnapshot(newExecutorPods)
  }
}

2.6 ExecutorPodsSnapshotsStoreImpl

这是 ExecutorPodsSnapshotsStore 的实现类。下面一段是理解整个 scheduler 的关键,所以建议拿着英文注释认真看一遍,大概就能理解了。

Controls the propagation of the Spark application’s executor pods state to subscribers that react to that state. Roughly follows a producer-consumer model. Producers report states of executor pods, and these states are then published to consumers that can perform any actions in response to these states. Producers push updates in one of two ways. An incremental update sent by updatePod() represents a known new state of a single executor pod. A full sync sent by replaceSnapshot() indicates that the passed pods are all of the most up to date states of all executor pods for the application. The combination of the states of all executor pods for the application is collectively known as a snapshot. The store keeps track of the most up to date snapshot, and applies updates to that most recent snapshot - either by incrementally updating the snapshot with a single new pod state, or by replacing the snapshot entirely on a full sync. Consumers, or subscribers, register that they want to be informed about all snapshots of the executor pods. Every time the store replaces its most up to date snapshot from either an incremental update or a full sync, the most recent snapshot after the update is posted to the subscriber’s buffer. Subscribers receive blocks of snapshots produced by the producers in time-windowed chunks. Each subscriber can choose to receive their snapshot chunks at different time intervals.

以上就是他的设计思想,简单来说就是依照生产消费者模式,订阅者订阅的是 EP 的状态,而这个状态是上文提到的 Snapshot。

SNAPSHOT_LOCK // 锁
subscribers // 订阅者
pollingTasks // ?
currentSnapshot // 当前的 Snapshot

2.7 ExecutorPodsWatchSnapshotSource

这里面主要是继承 K8S 客户端的一个 Wathcher 监听器,主要监听 Pod 的事件。因为 EP 被增删改出错,等都需要被 SB 感知。

enum Action {
  ADDED, MODIFIED, DELETED, ERROR
}

2.8 ExecutorPodsPollingSnapshotSource

这个是通过 K8S client 轮询 ApiServer 获取 Pod 状态并且保存到 Snapshot 里的过程。

private class PollRunnable(applicationId: String) extends Runnable {

  override def run(): Unit = Utils.tryLogNonFatalError {
    logDebug(s"Resynchronizing full executor pod state from Kubernetes.")
    // 核心方法,将得到的 Pod 的状态,通过 replaceSnapshot 来记录
    snapshotsStore.replaceSnapshot(kubernetesClient
      .pods()
      .withLabel(SPARK_APP_ID_LABEL, applicationId)
      .withLabel(SPARK_ROLE_LABEL, SPARK_POD_EXECUTOR_ROLE)
      .list()
      .getItems
      .asScala)
  }
}

// 轮询默认是30s一次
private val pollingInterval = conf.get(KUBERNETES_EXECUTOR_API_POLLING_INTERVAL)

2.9 ExecutorPodsLifecycleManager

这个就是一个 EP 生命周期的 Manager,本质上 Pod 是创建在 K8S 集群的,Driver Pod 对 EP 的管理需要通过 K8S 的 ApiServer,而当 Pod 发生状态改变了,对应的也要告知 Driver。

private def onNewSnapshots(
    schedulerBackend: KubernetesClusterSchedulerBackend,
    snapshots: Seq[ExecutorPodsSnapshot]): Unit = {
  val execIdsRemovedInThisRound = mutable.HashSet.empty[Long]
  snapshots.foreach { snapshot =>
    snapshot.executorPods.foreach { case (execId, state) =>
      state match {
        case deleted@PodDeleted(_) =>
          logDebug(s"Snapshot reported deleted executor with id $execId," +
            s" pod name ${state.pod.getMetadata.getName}")
          removeExecutorFromSpark(schedulerBackend, deleted, execId)
          execIdsRemovedInThisRound += execId
        case failed@PodFailed(_) =>
          logDebug(s"Snapshot reported failed executor with id $execId," +
            s" pod name ${state.pod.getMetadata.getName}")
          onFinalNonDeletedState(failed, execId, schedulerBackend, execIdsRemovedInThisRound)
        case succeeded@PodSucceeded(_) =>
          logDebug(s"Snapshot reported succeeded executor with id $execId," +
            s" pod name ${state.pod.getMetadata.getName}. Note that succeeded executors are" +
            s" unusual unless Spark specifically informed the executor to exit.")
          onFinalNonDeletedState(succeeded, execId, schedulerBackend, execIdsRemovedInThisRound)
        case _ =>
      }
    }
  }

  if (snapshots.nonEmpty) {
    val latestSnapshot = snapshots.last
    (schedulerBackend.getExecutorIds().map(_.toLong).toSet
      -- latestSnapshot.executorPods.keySet
      -- execIdsRemovedInThisRound).foreach { missingExecutorId =>
      if (removedExecutorsCache.getIfPresent(missingExecutorId) == null) {
        val exitReasonMessage = s"The executor with ID $missingExecutorId was not found in the" +
          s" cluster but we didn't get a reason why. Marking the executor as failed. The" +
          s" executor may have been deleted but the driver missed the deletion event."
        logDebug(exitReasonMessage)

        val exitReason = ExecutorExited(
          UNKNOWN_EXIT_CODE,
          exitCausedByApp = false,
          exitReasonMessage)
        schedulerBackend.doRemoveExecutor(missingExecutorId.toString, exitReason)
        execIdsRemovedInThisRound += missingExecutorId
      }
    }
  }

  if (execIdsRemovedInThisRound.nonEmpty) {
    logDebug(s"Removed executors with ids ${execIdsRemovedInThisRound.mkString(",")}" +
      s" from Spark that were either found to be deleted or non-existent in the cluster.")
  }
}

3 Summary

Scheduler 的粗浅分析就到这里,其实不是太难理解的,调度器的功能就是找到给 Driver 分配和在合适的时候移除 Executor,至于如何找合适的节点来跑 Executor,那是 K8S 的事情,这里是把 K8S 作为一个外部的集群模式,具体的调度工作是交给 K8S 的。