Spark内核源码解析九:executor原理解析和源码解析
时间:2020-05-12
本文章向大家介绍Spark内核源码解析九:executor原理解析和源码解析,主要包括Spark内核源码解析九:executor原理解析和源码解析使用实例、应用技巧、基本知识点总结和需要注意事项,具有一定的参考价值,需要的朋友可以参考一下。
1、executor注册机制
executor启动的是一个CoarseGrainedExecutorBackend这样一个进程,
// Make fake resource offers on just one executor def makeOffers(executorId: String) { val executorData = executorDataMap(executorId) launchTasks(scheduler.resourceOffers( Seq(new WorkerOffer(executorId, executorData.executorHost, executorData.freeCores)))) } // Launch tasks returned by a set of resource offers def launchTasks(tasks: Seq[Seq[TaskDescription]]) { for (task <- tasks.flatten) { val ser = SparkEnv.get.closureSerializer.newInstance() val serializedTask = ser.serialize(task) if (serializedTask.limit >= akkaFrameSize - AkkaUtils.reservedSizeBytes) { val taskSetId = scheduler.taskIdToTaskSetId(task.taskId) scheduler.activeTaskSets.get(taskSetId).foreach { taskSet => try { var msg = "Serialized task %s:%d was %d bytes, which exceeds max allowed: " + "spark.akka.frameSize (%d bytes) - reserved (%d bytes). Consider increasing " + "spark.akka.frameSize or using broadcast variables for large values." msg = msg.format(task.taskId, task.index, serializedTask.limit, akkaFrameSize, AkkaUtils.reservedSizeBytes) taskSet.abort(msg) } catch { case e: Exception => logError("Exception in error callback", e) } } } else { val executorData = executorDataMap(task.executorId) executorData.freeCores -= scheduler.CPUS_PER_TASK executorData.executorActor ! LaunchTask(new SerializableBuffer(serializedTask)) } } }
override def preStart() { logInfo("Connecting to driver: " + driverUrl) driver = context.actorSelection(driverUrl) // 向driver发送反向注册消息 driver ! RegisterExecutor(executorId, hostPort, cores, extractLogUrls) context.system.eventStream.subscribe(self, classOf[RemotingLifecycleEvent]) }
override def receiveWithLogging = { // Driver注册executor成功后,会发送回来RegisteredExecutor消息, // Executor会创建executor作为执行句柄,其实他的大部分功能都是通过Executor实现的 case RegisteredExecutor => logInfo("Successfully registered with driver") val (hostname, _) = Utils.parseHostPort(hostPort) executor = new Executor(executorId, hostname, env, userClassPath, isLocal = false) case RegisterExecutorFailed(message) => logError("Slave registration failed: " + message) System.exit(1) // 启动task case LaunchTask(data) => if (executor == null) { logError("Received LaunchTask command but executor was null") System.exit(1) } else { // 反序列化task val ser = env.closureSerializer.newInstance() val taskDesc = ser.deserialize[TaskDescription](data.value) logInfo("Got assigned task " + taskDesc.taskId) executor.launchTask(this, taskId = taskDesc.taskId, attemptNumber = taskDesc.attemptNumber, taskDesc.name, taskDesc.serializedTask) } case KillTask(taskId, _, interruptThread) => if (executor == null) { logError("Received KillTask command but executor was null") System.exit(1) } else { executor.killTask(taskId, interruptThread) } case x: DisassociatedEvent => if (x.remoteAddress == driver.anchorPath.address) { logError(s"Driver $x disassociated! Shutting down.") System.exit(1) } else { logWarning(s"Received irrelevant DisassociatedEvent $x") } case StopExecutor => logInfo("Driver commanded a shutdown") executor.stop() context.stop(self) context.system.shutdown() }
2、task启动机制
case LaunchTask(data) => if (executor == null) { logError("Received LaunchTask command but executor was null") System.exit(1) } else { // 反序列化task val ser = env.closureSerializer.newInstance() val taskDesc = ser.deserialize[TaskDescription](data.value) logInfo("Got assigned task " + taskDesc.taskId) executor.launchTask(this, taskId = taskDesc.taskId, attemptNumber = taskDesc.attemptNumber, taskDesc.name, taskDesc.serializedTask) }
// 对于每一个task都会创建一个taskRunner,继承了java的Runnable接口可以作为一个线程任务进行调用,后面会放入线程池中进行执行 def launchTask( context: ExecutorBackend, taskId: Long, attemptNumber: Int, taskName: String, serializedTask: ByteBuffer) { val tr = new TaskRunner(context, taskId = taskId, attemptNumber = attemptNumber, taskName, serializedTask) // 放入内存缓存 runningTasks.put(taskId, tr) threadPool.execute(tr) }
原文地址:https://www.cnblogs.com/xiaofeiyang/p/12877868.html
- 移除WordPress 仪表盘首页的“插件”“其它WordPress 新闻”小工具
- 解决VMware 7在Windows 7上无法上网的问题
- Windows Server 2008群集仲裁机制
- [C#2] 5-迭代器
- 服务器未能识别 HTTP 标头 SOAPAction 的值
- 实用代码-C#获取本机网络适配器信息及MAC地址
- WordPress 自定义 login (登录页面)CSS 样式
- [C#1] 12-特性
- HTTP Basic Authentication for RESTFul Service
- [C#2] 4-可空类型、静态类
- jquery 操作css 尺寸
- Windows 7上IIS出现http 500错误
- [C#2] 2-匿名方法
- jquery 操作css 选择器
- JavaScript 教程
- JavaScript 编辑工具
- JavaScript 与HTML
- JavaScript 与Java
- JavaScript 数据结构
- JavaScript 基本数据类型
- JavaScript 特殊数据类型
- JavaScript 运算符
- JavaScript typeof 运算符
- JavaScript 表达式
- JavaScript 类型转换
- JavaScript 基本语法
- JavaScript 注释
- Javascript 基本处理流程
- Javascript 选择结构
- Javascript if 语句
- Javascript if 语句的嵌套
- Javascript switch 语句
- Javascript 循环结构
- Javascript 循环结构实例
- Javascript 跳转语句
- Javascript 控制语句总结
- Javascript 函数介绍
- Javascript 函数的定义
- Javascript 函数调用
- Javascript 几种特殊的函数
- JavaScript 内置函数简介
- Javascript eval() 函数
- Javascript isFinite() 函数
- Javascript isNaN() 函数
- parseInt() 与 parseFloat()
- escape() 与 unescape()
- Javascript 字符串介绍
- Javascript length属性
- javascript 字符串函数
- Javascript 日期对象简介
- Javascript 日期对象用途
- Date 对象属性和方法
- Javascript 数组是什么
- Javascript 创建数组
- Javascript 数组赋值与取值
- Javascript 数组属性和方法