数据库中间件 MyCAT 源码解析 —— 分片结果合并(一)
1. 概述
相信很多同学看过 MySQL 各种优化的文章,里面 99% 会提到:单表数据量大了,需要进行分片(水平拆分 or 垂直拆分)。分片之后,业务上必然面临的场景:跨分片的数据合并。今天我们就一起来瞅瞅 MyCAT 是如何实现分片结果合并。
跨分片查询大体流程如下:
flow
和 《【单库单表】查询》 不同的两个过程:
- 【2】多分片执行 SQL
- 【4】合并多分片结果
下面,我们来逐条讲解这两个过程。
2. 多分片执行 SQL
execute_sql
经过 SQL 解析后,计算出需要执行 SQL 的分片节点,遍历分片节点发送 SQL 进行执行。
核心代码:
- MultiNodeQueryHandler.java#execute(...)
SQL 解析 详细过程,我们另开文章,避免内容过多,影响大家对 分片结果合并 流程和逻辑的理解。
3. 合并多分片结果
handle_response
和 《【单库单表】查询》 不同,多个分片节点都会分别响应 记录头(header) 和 记录行(row) 。在开始分析 MyCAT 是怎么合并多分片结果之前,我们先来回想下 SQL 的执行顺序。
FROM // [1] 选择表
WHERE // [2] 过滤表
GROUP BY // [3] 分组SELECT // [4] 普通字段,max / min / avg / sum / count 等函数,distinctHAVING // [5] 再过滤表ORDER BY // [6] 排序LIMIT // [7] 分页
3.1 记录头(header)
多个分片节点响应时,会响应多次 记录头(header) 。MyCAT 在实际处理时,只处理第一个返回的 记录头(header) 。因此,在使用时要保证表的 Schema 相同。
分片节点响应的 记录头(header) 可以直接返回 MySQL Client 吗?答案是不可以。AVG
函数 是特殊情况,MyCAT 需要将 AVG
拆成 SUM
+ COUNT
进行计算。举个例子:
// [1] MySQL Client => MyCAT :
SELECT AVG(age) FROM student;
// [2] MyCAT => MySQL Server :
SELECT SUM(age) AS AVG0SUM, COUNT(age) AS AVG0COUNT FROM student;
// [3] 最终:AVG(age) = SUM(age) AS AVG0SUM / COUNT(age)
核心代码:
- MultiNodeQueryHandler.java#fieldEofResponse(...)。
3.2 记录行(row)
3.1 AbstractDataNodeMerge
MyCAT 对分片结果合并通过 AbstractDataNodeMerge
子类来完成。
merge_service
AbstractDataNodeMerge
:
- -packs :待合并记录行(row)队列。队列尾部插入
END_FLAG_PACK
表示队列已结束。 - -running :合并逻辑是否正在执行中的标记。
- ~onRowMetaData(...) :根据记录列信息(ColMeta)构建对应的排序组件和聚合组件。需要子类进行实现。
- ~onNewRecord(...) :插入记录行(row) 到
packs
。 - ~outputMergeResult(...) :插入
END_FLAG_PACK
到packs
。 - ~run(...) :执行合并分片结果逻辑,并将合并结果返回给 MySQL Client。需要子类进行实现。
AbstractDataNodeMerge_run.png
通过 running
标记保证同一条 SQL 同时只有一个线程正在执行,并且不需要等到每个分片结果都返回就可以执行聚合逻辑。当然,排序逻辑需要等到所有分片结果都返回才可以执行。
核心代码:
- AbstractDataNodeMerge.java
- DataNodeMergeManager.java#run(...)
3.2 DataNodeMergeManager
AbstractDataNodeMerge
有两种子类实现:
-
DataMergeService
:基于堆内内存合并分片结果。 -
DataNodeMergeManager
:基于堆外内存合并分片结果。
目前官方默认配置使用 DataNodeMergeManager
。主要有如下优点:
- 可以使用更大的内存空间。当并发量大或者数据量大时,更大的内存空间意味着更好的性能。
- 减少 GC 暂停时间。记录行(row)对象小且重用性很低,需要能够进行类似 C / C++ 的自主内存释放。
- 更快的内存复制和读取速度,对排序和聚合带来很好的提速。
如果对堆外内存不太了解,推荐阅读如下文章:
- 《从0到1起步-跟我进入堆外内存的奇妙世界》
- 《堆内内存还是堆外内存?》
- 《JAVA堆外内存》
- 《JVM源码分析之堆外内存完全解读》
本文主要分析 DataNodeMergeManager
实现,DataMergeService
可以自己阅读或者等待后续文章(?欢迎订阅我的公众号噢)。
DataNodeMergeManager
有三个组件:
-
globalSorter
:UnsafeExternalRowSorter
=> 实现记录行(row)合并并排序逻辑。 -
globalMergeResult
:UnsafeExternalRowSorter
=> 实现记录行(row)合并不排序逻辑。 -
unsafeRowGrouper
:UnsafeRowGrouper
=> 实现记录行(row)聚合逻辑。
DataNodeMergeManager#run(...)
逻辑如下:
- [1] 写入记录行(row)到
UnsafeRow
。 - [2] 根据情况将
UnsafeRow
插入对应组件。 - [3] 当所有
UnsafeRow
插入完后,根据情况使用组件聚合、排序。
是否排序 |
是否聚合 |
依赖组件 |
[2] |
[3] |
---|---|---|---|---|
否 |
否 |
globalSorter |
插入 globalSorter |
使用 globalSorter 合并并排序 |
是 |
否 |
globalMergeResult |
插入 globalMergeResult |
使用 globalMergeResult 合并不排序 |
否 |
是 |
unsafeRowGrouper + globalSorter |
插入 unsafeRowGrouper 进行聚合 |
使用 globalSorter 合并并排序 |
是 |
是 |
unsafeRowGrouper + globalMergeResult |
插入 unsafeRowGrouper 进行聚合 |
使用 globalMergeResult 合并不排序 |
核心代码:
- DataNodeMergeManager.java。
?看到这里,可能很多同学都有点懵逼,问题不大,我们继续往下瞅。
3.3 UnsafeRow
unsafe_row
记录行(row)写到 UnsafeRow
的 baseObject
属性,结构如下:
unsafe_row_object
unsafe_row_2.png
- 拆分成三个区域,每个区域按照格子记录信息,每个格子 64bits(8 Bytes)。
- 记录行(row)按照字段顺序位置记录到
baseObject
。 - [1] 空标记位区域 :标记字段对应的值是否为 NULL。
- 当字段对应的值为 NULL 时,其对应的字段顺序对应的 bit 设置为 1。举个例子,第 0 个位置字段为 NULL,则第一个格子对应的 64 bits 从右边第一个 bit 设置为 1。
- 因为每个格子是 64 bits,每 64 个字段占用一个格子,不满一个格子,按照一个格子计算。因此,该区域的长度(
bitSetWidthInBytes
) = 字段占用的格子数 * 64 bits。
- [2] 位置长度区域 :记录字段对应的值在
[3]区域
所在的位置和长度。- 每个字段记录
[2]区域
的位置 =baseOffset
+bitSetWidthInBytes
+ 8 Bytes * 字段顺序。 - 占用一个格子,前 32 bits 为
[3]区域
的位置,后 32 bits 为字段对应的值长度。
- 每个字段记录
- [3] 值区域 :记录字段对应的值。
- 每个字段对应的值占用格子数 = 字段对应的值长度 / 8 Byte,如果无法整除再 + 1。
- 因为字段对应的值可能无法刚好占满每个格子,未使用的 bit 用 0 占位。
写入 UnsafeRow
,MyCAT 可以顺序访问每个字段,而不需要在记录行(row)进行遍历。
?日常开发使用位操作的机会比较少,可能较为难理解,需要反复理解下,相信会获得很大启发。恩,该部分代码引用自开源运算框架 Spark
,是不是更加有动力列?。
核心代码:
- UnsafeRow.java
- BufferHolder.java
- UnsafeRowWriter.java
3.4 UnsafeExternalRowSorter
如果使用 Java 实现 SELECT * FROM student ORDER BY age desc, nickname asc
,不考虑算法优化的情况下,我们可以简单如下实现:
Collections.sort(students, new Comparator<Comparable>() { @Override
public int compare(Student o1, Student o2) { int cmp = compare(o2.age, o1.age); return cmp != 0 ? cmp : compare(o1.nickname, o2.nickname);
}
}
});
从功能上,UnsafeExternalRowSorter
是这么实现排序逻辑。当然肯定的是,不是这么“简单”的实现。
sorter_write
UnsafeRow
会写入到两个地方:
-
List<MemoryBlock>
:内存块数组。当前MemoryBlock
无法容纳写入的UnsafeRow
时,生成新的MemoryBlock
提供写入。每条UnsafeRow
存储在MemoryBlock
由 长度 + 字节内容 组成。 -
LongArray
:每条UnsafeRow
存储在LongArray
由两部分组成:address + prefix。-
address
:UnsafeRow
存储在List<MemoryBlock>
的位置。前 13 bits 记录所在MemoryBlock
的 index,后 51 bit 记录在MemoryBlock
的 offset。 -
prefix
:UnsafeRow
第一个排序字段值前 64 bits 计算的值。
-
UnsafeExternalRowSorter
排序实现方式 :提供 TimSort 和 RadixSort 两种排序算法,前者为默认实现。TimSort 折半查找时,使用 LongArray
,先比较 prefix
,若相等,则顺序对比每个排序字段直到不等,提升计算效率。插入操作在 LongArray
操作,List<MemoryBlock>
只作为原始数据。
另外,当需要排序特别大的数据量时,会使用存储数据到文件进行排序。限于笔者暂时未阅读该处源码,后续会另开文章分析。?
核心源码:
- UnsafeExternalRowSorter.java
- UnsafeExternalRowSorter.java
- TimSort.java
3.5 UnsafeRowGrouper
如果使用 Java 实现 SELECT nickname, COUNT(*) FROM student group by nickname
,不考虑算法优化的情况下,我们可以简单如下实现:
Map<String, List<Object>> map = new HashMap<>();// 聚合for (student : students) { if (map.contains(student.nickname)) {
map.put(student.nickname, map.get(student.nickname).get(1) + 1);
} else {
List<Object> value = new Array<>();
value.add(nickname);
value.add(1);
map.put(student.nickname, value);
}
}// 输出for (value : map.values) {
System.out.println(value);
}
从功能上,UnsafeRowGrouper
是这么实现排序逻辑。当然肯定的是,也不是这么“简单”的实现。
?具体怎么实现的呢?我们在《MyCAT 源码解析 —— 分片结果合并(二)》继续分析。
- 在Linux系统运行WinForm程序
- 将ZIP文件添加到程序集资源文件然后在运行时解压文件
- Android中App安装位置详解
- Java面试题系列之基础部分(二)——每天学5个问题
- Java面试题系列之基础部分(四)——每天学5个问题
- 使用ORM框架,必须迁就数据库的设计吗?
- 使用OQL+SQLMAP解决ORM多表复杂的查询问题
- PostgreSQL的.NET驱动程序Npgsql中参数对象的一个Bug
- 和Emoji相关的那些开源项目
- PostgreSQL的PDF.NET驱动程序构建过程
- 基于Docker的PHP开发环境
- 以太坊·物流场景初探
- Python接口自动化-3-POST请求
- 【Python环境】Python中的结构化数据分析利器-Pandas简介
- JavaScript 教程
- JavaScript 编辑工具
- JavaScript 与HTML
- JavaScript 与Java
- JavaScript 数据结构
- JavaScript 基本数据类型
- JavaScript 特殊数据类型
- JavaScript 运算符
- JavaScript typeof 运算符
- JavaScript 表达式
- JavaScript 类型转换
- JavaScript 基本语法
- JavaScript 注释
- Javascript 基本处理流程
- Javascript 选择结构
- Javascript if 语句
- Javascript if 语句的嵌套
- Javascript switch 语句
- Javascript 循环结构
- Javascript 循环结构实例
- Javascript 跳转语句
- Javascript 控制语句总结
- Javascript 函数介绍
- Javascript 函数的定义
- Javascript 函数调用
- Javascript 几种特殊的函数
- JavaScript 内置函数简介
- Javascript eval() 函数
- Javascript isFinite() 函数
- Javascript isNaN() 函数
- parseInt() 与 parseFloat()
- escape() 与 unescape()
- Javascript 字符串介绍
- Javascript length属性
- javascript 字符串函数
- Javascript 日期对象简介
- Javascript 日期对象用途
- Date 对象属性和方法
- Javascript 数组是什么
- Javascript 创建数组
- Javascript 数组赋值与取值
- Javascript 数组属性和方法
- Centos7中添加、删除Swap交换分区的方法
- Bash中文件描述符的详细介绍
- Linux 块设备驱动代码编写
- ubuntu中终端命令提示符太长的修改方法汇总
- CentOS 6.5 环境实现本地局域网搭建YUM的方法【基于FTP】
- iPhone手机越狱-逆向砸壳-代码注入
- Flutter基础widgets教程-SizedBox篇
- 详解linux 看门狗驱动编写
- CentOS 6.5平台本地YUM配置的方法
- Linux环境(CentOS6.7 64位)下安装subversion1.9.5的方法
- CentOS 6.5平台实现快速部署FTP的方法
- Linux系统中sudo命令的十个技巧总结
- 详解linux电源管理驱动编写
- CentOS6.5系统简单安装与配置Nginx服务器的方法
- 详解linux 摄像头驱动编写