图解 DataX 核心设计原理
DataX 是阿里巴巴开源的一个异构数据源离线同步工具,致力于实现包括关系型数据库(MySQL、Oracle 等)、HDFS、Hive、ODPS、HBase、FTP 等各种异构数据源之间稳定高效的数据同步功能。
前段时间我在 K8s 相关文章中有提到过数据同步的项目,该项目就是基于 DataX 内核构建的,由于公司数据同步的需求,还需要在 DataX 原有的基础上支持增量同步功能,同时支持分布式调度,在「使用 K8s 进行作业调度实战分享」这篇文章中已经详细描述其中的实现。
基于我在项目中对 DataX 的实践过程,给大家分享我所理解的 DataX 核心设计原理。
设计理念
异构数据源离线同步是将源端数据同步到目的端,但是端与端的数据源类型种类繁多,在没有 DataX 之前,端与端的链路将组成一个复杂的网状结构,非常零散无法将同步核心逻辑抽象出来,DataX 的理念就是作为一个同步核心载体连接连接各类数据源,当我们需要数据同步时,只需要以插件的形式接入到 DataX 即可,将复杂的网状结构链路变成了一个星型结构,如下图所示:
架构设计
用过 IDEA 的小伙都知道,IDEA 有很多非常棒的插件,用户可根据自身编程需求,下载相关的插件,DataX 也是使用这种可插拔的设计,采用了 Framework + Plugin 的架构设计,如下图所示:
有了插件,DataX 可支持任意数据源到数据源,只要实现了 Reader/Writer Plugin,官方已经实现了主流的数据源插件,比如 MySQL、Oracle、SQLServer 等,当然我们也可以开发一个 DataX 插件。
核心概念
DataX 核心主要由 Job、Task Group、Task、Channel 等概念组成:
1、Job
在 DataX 中用来描述一个源端到一个目的端的同步作业,是 DataX 数据同步面向用户的最小业务单元。一个Job 对应 一个 JobContainer, JobContainer 负责 Job 的全局切分、调度、前置语句和后置语句等工作。
2、Task Group
一组 Task 的集合,根据 DataX 的公平分配策略,公平地分配 Task 到对应的 TaskGroup 中。一个 TaskGroup 对应一个 TaskGroupContainer,负责执行一组 Task。
3、Task
Job 的最小执行单元,一个 Job 可根据 Reader 端切分策略,且分成若干个 Task,以便于并发执行。
Job、Task Group、Task 三者之间的关系可以用如下图表示:
根据切分策略将一个 Job 切分成多个 Task,根据分配策略将多个 Task 组成一个 TaskGroup。
4、Channel
DataX 会单独启动一条线程运行运行一个 Task,而 Task 会持有一个 Channel,用作 Reader 与 Writer 的数据传输媒介,DataX 的数据流向都是按照 Reader—>Channel—>Writer
的方向流转,用如下图表示:
Channel 作为传输通道,即能充当缓冲层,同时还能对数据传输进行限流操作。
5、Transformer
DataX 的 transformer 模式同时还提供了强大的数据转换功能,DataX 默认提供了丰富的数据转换实现类,用户还可以根据项目自身需求,扩展数据转换。
调度流程
DataX 将用户的 job.json 同步作业配置解析成一个 Job,DataX 通过 JobContainer 完成全局切分、调度、前置语句和后置语句等工作,整体调度流程用如下图表示:
1、切分策略
1)计算并发量(即 needChannelNumber 大小)
DataX有流控模式,其中,可以设置 bps 限速,tps 限速:
- bps 限速:needChannelNumber = 总 byteLimit / 单个 Channel byteLimit
- tps 限速:needChannelNumber = 总 recordLimit / 单个 Channel recordLimit
如果以上都没有设置,则会根据用户在 job.setting.speed.channel
配置的并发数量设置 needChannelNumber。
2)根据 needChannelNumber 将 Job 切分成多个 Task
这个步骤的具体切分逻辑交由相关插件去完成,例如 Rdb 对数据的拆分主要分成两类:
- 如果用户配置了具体的 Table 数量,那么就按照 Table 为最小单元进行拆分(即一个 Table 对应一个 Task),并生成对应的 querySql;
- 如果用户还配置了 splitPk,则会根据 splitPk 进行切分,具体逻辑是根据 splitPk 区间对 Table 进行拆分,并生成对应的 querySql。
2、公平分配策略
DataX 在执行调度之前,会调用 JobAssignUtil#assignFairly
方法对切分好的 Task 公平分配给每个 TaskGroup。
在分配之前,会计算 TaskGroup 的数量,具体公式:
int taskGroupNumber = (int) Math.ceil(1.0 * channelNumber / channelsPerTaskGroup);
channelNumber 即为在切分策略中根据用户配置计算得到的 needChannelNumber 并发数量大小,channelsPerTaskGroup 为每个 TaskGroup 需要的并发数量,默认为 5。
求出 TaskGroup 的数量之后,就会执行公平分配策略,将 Task 平均分配个每个 TaskGroup,最后执行调度,完成整个同步作业。举个公平分配策略的例子:
假设 A 库有表 0、1、2,B 库上有表 3、4,C 库上有表 5、6、7,如果此时有 4 个 TaskGroup,则 assign 后的结果为:
taskGroup-0: 0, 4,
taskGroup-1: 3, 6,
taskGroup-2: 5, 2,
taskGroup-3: 1, 7
举个例子来描述 Job、Task、Task Group 之间的关系:
用户构建了一个数据同步作业,该作业的目的是将 MySql 的 100 张表同步到 Oracle 库中,假设此时用户设置了 20 个并发(即 channelNumber=20):
- DataX 根据表的数量切分成 100 个 Task;
- DataX 默认给每个 TaskGroup 分配 5 个 Channel,因此 taskGroupNumber = channelNumber / channelsPerTaskGroup = 20 / 5 = 4;
- 根据 DataX 的公平分配策略,会将 100 个 Task 平均分配给每个 TaskGroup,因此每个 TaskGroup 处理 taskNumber / taskGroupNumber = 100 / 4 = 25 个 Task。
以上的例子用如下图表示:
由于一个 Channel 对应一个线程执行,因此 DataX 的线程模型可以用如下图表示:
- 几十条业务线日志系统如何收集处理?
- 0基础搭建Hadoop大数据处理-编程
- 0基础搭建Hadoop大数据处理-集群安装
- Validation of viewstate MAC failed 解决办法
- springmvc注入类 NoUniqueBeanDefinitionException: No qualifying bean of type [] is defined: expected sin
- springmvc注入类 NoUniqueBeanDefinitionException: No qualifying bean of type [] is defined: expected sin
- idea启动多个tomcat失败
- Log4Net 生成多个文件、文件名累加解决方法
- 【C#|.NET】lock(this)其实是个坑
- SpringMVC过程中@RequestBody接收Json的问题 总是报415
- 如何开发自己的搜索帝国之安装ik分词器
- 如何开发自己的搜索帝国之ES图形化Kibana安装与使用
- 高可用高性能分布式文件系统FastDFS进阶keepalived+nginx对多tracker进行高可用热备
- 分布式文件系统FastDFS如何做到高可用
- JavaScript 教程
- JavaScript 编辑工具
- JavaScript 与HTML
- JavaScript 与Java
- JavaScript 数据结构
- JavaScript 基本数据类型
- JavaScript 特殊数据类型
- JavaScript 运算符
- JavaScript typeof 运算符
- JavaScript 表达式
- JavaScript 类型转换
- JavaScript 基本语法
- JavaScript 注释
- Javascript 基本处理流程
- Javascript 选择结构
- Javascript if 语句
- Javascript if 语句的嵌套
- Javascript switch 语句
- Javascript 循环结构
- Javascript 循环结构实例
- Javascript 跳转语句
- Javascript 控制语句总结
- Javascript 函数介绍
- Javascript 函数的定义
- Javascript 函数调用
- Javascript 几种特殊的函数
- JavaScript 内置函数简介
- Javascript eval() 函数
- Javascript isFinite() 函数
- Javascript isNaN() 函数
- parseInt() 与 parseFloat()
- escape() 与 unescape()
- Javascript 字符串介绍
- Javascript length属性
- javascript 字符串函数
- Javascript 日期对象简介
- Javascript 日期对象用途
- Date 对象属性和方法
- Javascript 数组是什么
- Javascript 创建数组
- Javascript 数组赋值与取值
- Javascript 数组属性和方法
- 解决 Error starting userland proxy: listen tcp 0.0.0.0:6379: bind: address already in use
- Qt官方示例-菜单栏
- 解决SpringBoot集成支付宝支付中文订单描述验签错误问题
- Linux挂载
- 一些字符处理函数
- SDAccel矩阵乘法优化(二)
- SDAccel矩阵乘法优化(三)
- matplotlib基础绘图命令之scatter
- SDAccel矩阵乘法优化(四)
- SDAccel结构实现之移位寄存器篇
- QML二维码生成器
- 精选MAC应用推荐,让你搬砖效率翻倍!
- SDAccel结构实现之脉动阵列篇
- SDAccel存储模型详解
- caffe详解之数据层