Spark executor模块① - 主要类以及创建 AppClient
本文为 Spark 2.0 源码分析笔记,由于源码只包含 standalone 模式下完整的 executor 相关代码,所以本文主要针对 standalone 模式下的 executor 模块,文中内容若不特意说明均为 standalone 模式内容
在 executor 模块中,最重要的几个类(或接口、trait)是:
- AppClient:在 Standalone 模式下的实现是
StandaloneAppClient
类 - SchedulerBackend:SchedulerBackend 是一个 trait,在 Standalone 模式下的实现是
StandaloneSchedulerBackend
类 - TaskScheduler:TaskScheduler 也是一个 trait,当前,在所有模式下的实现均为
TaskSchedulerImpl
类
接下来先简要介绍这几个类的作用以及各自主要的成员和方法,这是理解之后内容的基础
StandaloneAppClient(AppClient)
StandaloneAppClient 主要有以下几个作用:
- 向 master 注册 application
- 接收并处理来自 master 的各种消息,如
RegisteredApplication
、ApplicationRemoved
、ExecutorAdded
等 - 调用 SchedulerBackend 回调接口以通知各种重要的 event,比如:Application 失败、添加了 executor、executor 更新等
主要成员
-
private val REGISTRATION_TIMEOUT_SECONDS = 20
:注册 application 的超时 -
private val REGISTRATION_RETRIES = 3
:注册 application 的最大重试次数 -
endpoint: ClientEndpoint
:ClientEndpoint 为 StandaloneAppClient 内部嵌套类,主要用来:- 通过向 master 发送
RegisterApplication
消息来注册 application - 接收来自 master 的消息并处理,消息包括
-
RegisteredApplication
:application 已成功注册 -
ApplicationRemoved
:application 已移除 -
ExecutorAdded
:有新增加的 Executor -
ExecutorUpdated
:Executor 发生资源更新 -
MasterChanged
:master 改变
-
- 接收来自 StandaloneAppClient 发送的消息并处理,包括:
-
StopAppClient
:StandaloneAppClient stop 时通知 ClientEndpoint 也进行 stop 并反注册 application -
RequestExecutors
:StandaloneAppClient 在注册完 Application 后通过 ClientEndpoint 向 master 为执行 Application 的 tasks 申请资源 -
KillExecutors
:StandaloneAppClient 通过 ClientEndpoint 向 master 发送消息来 kill executor
-
- 通过向 master 发送
主要方法
-
def start()
:启动 StandaloneAppClient -
def requestTotalExecutors(requestedTotal: Int): Boolean
:为 application 向 master 申请指定总数的 executors -
def killExecutors(executorIds: Seq[String]): Boolean
:通过 ClientEndpoint 向 master 发送消息来 kill 一组 executors
SchedulerBackend
SchedulerBackend 在 Standalone 模式下的 SchedulerBackend 的实现是 StandaloneSchedulerBackend,但是从大体的作用上来说,各个模式下的 SchedulerBackend 作用是相同的,主要为:
- 当有新的 task 提交或资源更新时,查找各个节点空闲资源,并确定在哪个 executor 上启动哪个 task 的对应关系,对应的方法是
def reviveOffers(): Unit
- 被 TaskScheduler 调用来 kill task,对应的方法是
def killTask(...): Unit
TaskScheduler
低等级的 task 调度接口,当前只有 TaskSchedulerImpl 这一个实现。该接口支持在不同的部署模式下工作。每个 SparkContext(application) 对应唯一的一个 TaskScheduler。 TaskScheduler 从 DAGScheduler 的每一个 stage 获取 tasks,并负责发送到集群去执行这些 tasks,在失败的时候重试,并减轻掉队情况。TaskScheduler 会返回 events 给 DAGScheduler。
主要方法
-
def rootPool: Pool
:返回 root 调度对列 -
def schedulingMode: SchedulingMode
:调度模式 -
def submitTasks(taskSet: TaskSet)
:提交任务去集群执行 -
def cancelTasks(stageId: Int, interruptThread: Boolean)
:取消一个 stage 对应的 tasks -
def executorHeartbeatReceived(...)
:接收到 executor 心跳信息 -
def executorLost(executorId: String, reason: ExecutorLossReason)
:处理 executor lost
以上简要的介绍了 AppClient、SchedulerBackend、TaskScheduler 几个接口,其中 SchedulerBackend 和 TaskScheduler 接口实例是在 SparkContext 构造函数中创建的,而 AppClient 实例是在 SchedulerBackend 构造函数中被创建。
AppClient 的创建与启动
AppClient 的创建与启动也比较简单,主要流程如下:
- 在 SparkContext 的构造函数中,调用
val (sched, ts) = SparkContext.createTaskScheduler(this, master, deployMode)
来通过 master url 来创建相应模式下的 SchedulerBackend 实例 sched 以及 TaskSchedulerImpl 实例 ts(我们假定这里创建的 sched 是 StandaloneScheduler 类型的) - 随后,依然是在 SparkContext 的构造函数中,TaskScheduler 实例 ts 调用其 start 方法,在该 start 方法中会调用 SchedulerBackend 实例 sched 的 start 方法(所以,你也可以从这里知道 TaskScheduler 的实现中是包含 SchedulerBackend 的实例的)
- 在 SchedulerBackend 的 start 方法中会创建其嵌套类 ClientEndpoint 对象
- 在将 ClientEndpoint 对象注册给 rpcEnv 的过程中 ClientEndpoint 对象会收到 OnStart 消息并处理,处理过程主要就是持有 ApplicationDescription(主要包括name, maxCores, memoryPerExecutorMB, 启动命令行, appUiUrl等) 来向 Master 注册 application
再次说明,以上内容若无特别说明均指 Standalone 模式下的。本文简要的分析了几个关键类以及 AppClient 是如何启动的,更详细的剖析会在后面的文章中说明。
- 关关的刷题日记03—Leetcode 448. Find All Numbers Disappeared in an Array
- 关关的刷题日记04——Leetcode 283. Move Zeroes
- UESTC 1591 An easy problem A【线段树点更新裸题】
- 关关的刷题日记05 —— Leetcode 219. Contains Duplicate II
- 关关的刷题日记05 —— Leetcode 217. Contains Duplicate 方法1和方法2
- HDU 2602 Bone Collector(01背包裸题)
- Appium+python自动化13-native和webview切换
- HDU 2639 Bone Collector II(01背包变形【第K大最优解】)
- 专知内容生产基石-数据爬取采集利器WebCollector 介绍
- python实现字符串模糊匹配
- 动态规划之01背包详解【解题报告】
- hihoCoder #1078 : 线段树的区间修改(线段树区间更新板子题)
- HDU 2546 饭卡(01背包裸题)
- 漫谈文件系统
- JavaScript 教程
- JavaScript 编辑工具
- JavaScript 与HTML
- JavaScript 与Java
- JavaScript 数据结构
- JavaScript 基本数据类型
- JavaScript 特殊数据类型
- JavaScript 运算符
- JavaScript typeof 运算符
- JavaScript 表达式
- JavaScript 类型转换
- JavaScript 基本语法
- JavaScript 注释
- Javascript 基本处理流程
- Javascript 选择结构
- Javascript if 语句
- Javascript if 语句的嵌套
- Javascript switch 语句
- Javascript 循环结构
- Javascript 循环结构实例
- Javascript 跳转语句
- Javascript 控制语句总结
- Javascript 函数介绍
- Javascript 函数的定义
- Javascript 函数调用
- Javascript 几种特殊的函数
- JavaScript 内置函数简介
- Javascript eval() 函数
- Javascript isFinite() 函数
- Javascript isNaN() 函数
- parseInt() 与 parseFloat()
- escape() 与 unescape()
- Javascript 字符串介绍
- Javascript length属性
- javascript 字符串函数
- Javascript 日期对象简介
- Javascript 日期对象用途
- Date 对象属性和方法
- Javascript 数组是什么
- Javascript 创建数组
- Javascript 数组赋值与取值
- Javascript 数组属性和方法
- centos内核的删除或修改
- chkconfig学习笔记
- 3分钟短文:说说Laravel通用缓存Cache的使用技巧
- 【技术创作101训练营】想用代码改变世界?先用好Git和Github!
- 绘制散点图(克利夫兰系列)
- 绘制分组散点图(克里夫兰点图)
- ggplot2绘制玫瑰图
- 绘制极坐标系条形图
- 四步重新认识冗余机器人的控制器设计
- 人脸识别接入常见问题汇总
- TKE上关于postStart 和preStop使用
- 文字识别接入常见问题
- 从 1 到 0 构建博客项目(2) -- 操作系统篇(2)--定制Centos
- 使用Angular依赖注入自定义SAP Spartacus的ProductAdapter
- 获取SAP Spartacus当前显示产品json数据的又一办法