Spark Job-Stage-Task实例理解
Spark Job-Stage-Task实例理解
基于一个word count的简单例子理解Job、Stage、Task的关系,以及各自产生的方式和对并行、分区等的联系;
相关概念
- Job:Job是由Action触发的,因此一个Job包含一个Action和N个Transform操作;
- Stage:Stage是由于shuffle操作而进行划分的Task集合,Stage的划分是根据其宽窄依赖关系;
- Task:最小执行单元,因为每个Task只是负责一个分区的数据 处理,因此一般有多少个分区就有多少个Task,这一类的Task其实是在不同的分区上执行一样的动作;
例子代码
'''
DAG: Job vs Stage vs Task
'''
# 初始化spark环境
from pyspark import SparkContext,SparkConf
conf = SparkConf()
conf.setMaster('local').setAppName('Job vs Stage vs Task')
sc = SparkContext(conf=conf)
alpha_rdd1 = sc.parallelize(['a c','a b','b c','b d','c d'],10)
word_count1 = alpha_rdd1.flatMap(lambda a:a.split(' ')).map(lambda a:(a,1)).reduceByKey(lambda x,y:x+y)
alpha_rdd2 = sc.parallelize(['a c','a b','b c','b d','c d'],10)
word_count2 = alpha_rdd2.flatMap(lambda a:a.split(' ')).map(lambda a:(a,1)).reduceByKey(lambda x,y:x+y)
word_count1.join(word_count2).collect()
print('END')
input() # input是方便脚本运行不会终止导致web ui不能正常浏览
可以看到,主要的数据处理逻辑分为三部分,分别是两个word count,以及最后对两个结果的join,事实上这也对应了3个stage,下面是代码与stage的对应图,注意图中的并行关系:
从图中可以看出,原代码只有一个action(collect),因此只有一个Job,这个Job被换分为3个Stage,划分原因是有shuffle出现(reductByKey),而明显看出的是Stage 0和Stage 1互相没有依赖关系,因此可以并行,而Stage 2则是依赖于0和1的,因此会最后一个执行;
Spark Web UI
下面通过Web UI来进一步查看Job、Stage、Task的关系;
从上图看到,只有一个已完成的Job,该Job包含3个Stage,30个Task(注意之前的代码里parallelize设置的分区数为10,3*10=30);
上图表示该Job的运行时间线图,可以明显的看到Stage0和Stage1在时间上有大部分重叠,也就是并行进行,而Stage2是在Stage1结束后才开始,因为Stage0结束的更早,这里对于依赖关系的展示还是很明显的;
另外,对于stage0和stage1,虽然处理的数据量很小,但是依然可以看出二者的运行时间比较接近,也就是没有明显的数据偏斜的情况出现,当然,这里因为只是测试数据,而真实场景下很容易出现个别stage执行时间远远超过其他的stage,导致整体的时间被拖长;
上图是该Job对应的DAG可视化图,它是直接的对Stage以及Stage间的依赖关系进行展示,也验证了我们之前的分析,这里每个Stage还可以继续点进去;
上图中可以更清晰的看到,每个Stage中都包含10个Task,其实就是对应10个partition,对于Stage0和Stage1,他们都是在shuffle前的Stage,因此他们都有Shuffle Write的动作,大小都是514,而Stage2则是join这两部分数据,因此有Shuffle Read动作,大小而前二者之和,也就是1028;
- Google工程师:谷歌翻译在几个月内效果明显提升的秘诀
- 回调与并发: 通过实例剖析WCF基于ConcurrencyMode.Reentrant模式下的并发控制机制
- EnterLib PIAB又一个BUG?[续]——这是一个致命的BUG
- 年终盘点2017年发生在上海的科技大新闻
- 数字供应链第六章-网络风险、知识产权盗窃、合规和数据挖掘业务合同
- 使命必达: 深入剖析WCF的可靠会话[实例篇](内含美女图片,定力差者慎入)
- 快速全面构建大数据认知体系
- 谈谈分布式事务之二:基于DTC的分布式事务管理模型[上篇]
- ConcurrencyMode.Multiple 模式下的WCF服务就一定是并发执行的吗:探讨同步上下文对并发的影响[下篇]
- 谈谈分布式事务之二:基于DTC的分布式事务管理模型[上篇]
- 学习SpringMVC——拦截器
- 学习SpringMVC——国际化+上传+下载
- 行业研究:大数据(一)
- 控制并发访问的三道屏障: WCF限流(Throttling)体系探秘[下篇]
- JavaScript 教程
- JavaScript 编辑工具
- JavaScript 与HTML
- JavaScript 与Java
- JavaScript 数据结构
- JavaScript 基本数据类型
- JavaScript 特殊数据类型
- JavaScript 运算符
- JavaScript typeof 运算符
- JavaScript 表达式
- JavaScript 类型转换
- JavaScript 基本语法
- JavaScript 注释
- Javascript 基本处理流程
- Javascript 选择结构
- Javascript if 语句
- Javascript if 语句的嵌套
- Javascript switch 语句
- Javascript 循环结构
- Javascript 循环结构实例
- Javascript 跳转语句
- Javascript 控制语句总结
- Javascript 函数介绍
- Javascript 函数的定义
- Javascript 函数调用
- Javascript 几种特殊的函数
- JavaScript 内置函数简介
- Javascript eval() 函数
- Javascript isFinite() 函数
- Javascript isNaN() 函数
- parseInt() 与 parseFloat()
- escape() 与 unescape()
- Javascript 字符串介绍
- Javascript length属性
- javascript 字符串函数
- Javascript 日期对象简介
- Javascript 日期对象用途
- Date 对象属性和方法
- Javascript 数组是什么
- Javascript 创建数组
- Javascript 数组赋值与取值
- Javascript 数组属性和方法
- Cocos2d-js中的简易MVC框架(二)数据模型Model
- Python 还能实现图片去雾?FFA 去雾算法、暗通道去雾算法用起来! | 附代码
- 探秘计算机视觉中的注意力机制
- 采用 Vue 编写的功能强大的 Swagger-ui 页面
- Spring Data REST不完全指南(三)
- Spark Kafka 基于Direct自己管理offset
- 使用Reactor响应式编程
- 多场景下MySQL临时表的作用
- Flink 自定义触发器实现带超时时间的 CountWindow
- 聊聊Spring Boot Actuator
- [译]按功能(特性)分包
- Spring Boot Admin简介及实践
- Spring Boot Admin实现服务健康预警
- 除了FastJson,你也应该了解一下Jackson(二)
- 除了FastJson,你也应该了解一下Jackson(一)