Spark SQL中Not in Subquery为何低效以及如何规避
首先看个Not in Subquery的SQL:
// test_partition1 和 test_partition2为Hive外部分区表
select * from test_partition1 t1 where t1.id not in (select id from test_partition2);
对应的完整的逻辑计划和物理计划为:
== Parsed Logical Plan ==
'Project [*]
+- 'Filter NOT 't1.id IN (list#3 [])
: +- 'Project ['id]
: +- 'UnresolvedRelation `test_partition2`
+- 'SubqueryAlias `t1`
+- 'UnresolvedRelation `test_partition1`
== Analyzed Logical Plan ==
id: string, name: string, dt: string
Project [id#4, name#5, dt#6]
+- Filter NOT id#4 IN (list#3 [])
: +- Project [id#7]
: +- SubqueryAlias `default`.`test_partition2`
: +- HiveTableRelation `default`.`test_partition2`, org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe, [id#7, name#8], [dt#9]
+- SubqueryAlias `t1`
+- SubqueryAlias `default`.`test_partition1`
+- HiveTableRelation `default`.`test_partition1`, org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe, [id#4, name#5], [dt#6]
== Optimized Logical Plan ==
Join LeftAnti, ((id#4 = id#7) || isnull((id#4 = id#7)))
:- HiveTableRelation `default`.`test_partition1`, org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe, [id#4, name#5], [dt#6]
+- Project [id#7]
+- HiveTableRelation `default`.`test_partition2`, org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe, [id#7, name#8], [dt#9]
== Physical Plan ==
BroadcastNestedLoopJoin BuildRight, LeftAnti, ((id#4 = id#7) || isnull((id#4 = id#7)))
:- Scan hive default.test_partition1 [id#4, name#5, dt#6], HiveTableRelation `default`.`test_partition1`, org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe, [id#4, name#5], [dt#6]
+- BroadcastExchange IdentityBroadcastMode
+- Scan hive default.test_partition2 [id#7], HiveTableRelation `default`.`test_partition2`, org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe, [id#7, name#8], [dt#9]
通过上述逻辑计划和物理计划可以看出,Spark SQL在对not in subquery处理,从逻辑计划转换为物理计划时,会最终选择BroadcastNestedLoopJoin(对应到Spark源码中BroadcastNestedLoopJoinExec.scala)策略。
提起BroadcastNestedLoopJoin,不得不提Nested Loop Join,它在很多RDBMS中得到应用,比如mysql。它的工作方式是循环从一张表(outer table)中读取数据,然后访问另一张表(inner table,通常有索引),将outer表中的每一条数据与inner表中的数据进行join,类似一个嵌套的循环并且在循环的过程中进行数据的比对校验是否满足一定条件。
对于被连接的数据集较小的情况下,Nested Loop Join是个较好的选择。但是当数据集非常大时,从它的执行原理可知,效率会很低甚至可能影响整个服务的稳定性。
而Spark SQL中的BroadcastNestedLoopJoin就类似于Nested Loop Join,只不过加上了广播表(build table)而已。
BroadcastNestedLoopJoin是一个低效的物理执行计划,内部实现将子查询(select id from test_partition2)进行广播,然后test_partition1每一条记录通过loop遍历广播的数据去匹配是否满足一定条件。
private def leftExistenceJoin(
// 广播的数据
relation: Broadcast[Array[InternalRow]],
exists: Boolean): RDD[InternalRow] = {
assert(buildSide == BuildRight)
/* streamed对应物理计划中:
Scan hive default.test_partition1 [id#4, name#5, dt#6], HiveTableRelation `default`.`test_partition1`, org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe, [id#4, name#5], [dt#6]
*/
streamed.execute().mapPartitionsInternal { streamedIter =>
val buildRows = relation.value
val joinedRow = new JoinedRow
// 条件是否定义。此处为Some(((id#4 = id#7) || isnull((id#4 = id#7))))
if (condition.isDefined) {
streamedIter.filter(l =>
// exists主要是为了根据joinType来进一步条件判断数据的返回与否,此处joinType为LeftAnti
buildRows.exists(r => boundCondition(joinedRow(l, r))) == exists
)
// else
} else if (buildRows.nonEmpty == exists) {
streamedIter
} else {
Iterator.empty
}
}
}
由于BroadcastNestedLoopJoin的低效率执行,可能导致长时间占用executor资源,影响集群性能。同时,因为子查询的结果集要进行广播,如果数据量特别大,对driver端也是一个严峻的考验,极有可能带来OOM的风险。因此,在实际生产中,要尽可能利用其他效率相对高的SQL来避免使用Not in Subquery。
虽然通过改写Not in Subquery的SQL,进行低效率的SQL到高效率的SQL过渡,能够避免上面所说的问题。但是这往往建立在我们发现任务执行慢甚至失败,然后排查任务中的SQL,发现"问题"SQL的前提下。那么如何在任务执行前,就"检查"出这样的SQL,从而进行提前预警呢?
这里笔者给出一个思路,就是解析Spark SQL计划,根据Spark SQL的join策略匹配条件等,来判断任务中是否使用了低效的Not in Subquery进行预警,然后通知业务方进行修改。同时,我们在实际完成数据的ETL处理等分析时,也要事前避免类似的低性能SQL。
- NGS基础 - FASTQ格式解释和质量评估
- 数据结构之串
- 生信宝典之傻瓜式 (一) 如何提取指定位置的基因组序列
- 注意map<> 的[]
- 生信宝典之傻瓜式 (二) 如何快速查找指定基因的调控网络
- React Native组件只Image
- 数据结构之线性表
- 生信宝典之傻瓜式 (三) 我的基因在哪里发光 - 如何查找基因在发表研究中的表达
- 谈谈 char *num="123";和char num[4]="123";的区别
- 未越狱的iPhone/iPad也中招:走近强大的间谍软件XAgent与MadCap
- 【年末收藏】17个新手常见Python运行时错误
- C++ STL之priority_queue
- 链表的相关操作
- 《笨办法学Python》 第0课手记
- JavaScript 教程
- JavaScript 编辑工具
- JavaScript 与HTML
- JavaScript 与Java
- JavaScript 数据结构
- JavaScript 基本数据类型
- JavaScript 特殊数据类型
- JavaScript 运算符
- JavaScript typeof 运算符
- JavaScript 表达式
- JavaScript 类型转换
- JavaScript 基本语法
- JavaScript 注释
- Javascript 基本处理流程
- Javascript 选择结构
- Javascript if 语句
- Javascript if 语句的嵌套
- Javascript switch 语句
- Javascript 循环结构
- Javascript 循环结构实例
- Javascript 跳转语句
- Javascript 控制语句总结
- Javascript 函数介绍
- Javascript 函数的定义
- Javascript 函数调用
- Javascript 几种特殊的函数
- JavaScript 内置函数简介
- Javascript eval() 函数
- Javascript isFinite() 函数
- Javascript isNaN() 函数
- parseInt() 与 parseFloat()
- escape() 与 unescape()
- Javascript 字符串介绍
- Javascript length属性
- javascript 字符串函数
- Javascript 日期对象简介
- Javascript 日期对象用途
- Date 对象属性和方法
- Javascript 数组是什么
- Javascript 创建数组
- Javascript 数组赋值与取值
- Javascript 数组属性和方法
- Java Jar源码反编译工具对比
- 深入理解JVM虚拟机---垃圾回收与内存分配
- rxjs pipe和filter组合的一个实际例子的单步调试
- SAP Spartacus基于travis的持续集成
- Angular里如何测试一个具有外部依赖的Component
- Angular Component的DOM单元测试
- 对具有依赖的Angular服务进行单元测试的几种方式
- 使用TestBed测试具有依赖关系的Angular服务
- 使用jasmine.createSpyObj测试具有依赖关系的Angular服务
- 使用setup函数替代beforeEach函数进行Angular单元测试
- 对Angular使用了HttpClient的服务进行单元测试
- Elasticsearch中什么是 tokenizer、analyzer、filter ?
- ElasticSearch Snowball token filter
- Hibernate入门篇(三)——编写第一个Hibernate例子
- RabbitMQ与Kafka选型对比