在Kettle里使用时间戳实现变化数据捕获(CDC)
1. 建立测试表,插入数据。
use test;
create table t_color (
id int unsigned not null auto_increment primary key,
color varchar(10),
create_date datetime,
last_update timestamp
) engine=myisam;
insert into t_color (color,create_date) values('black',now()),('green',now()),('red',now()),('blue',now());
select * from t_color;
2. 建立参数表存储最后一次的抽取时间。
use test;
-- 创建时间戳表
create table cdc_time (
last_load datetime,
current_load datetime) engine=myisam;
-- 初始化数据
insert into cdc_time values ('1971-01-01 00:00:01','1971-01-01 00:00:01');
select * from cdc_time;
3. 创建初始化时间戳转换
说明:
把current_load时间设置成作业的开始时间。通过“获取系统信息”完成这一功能,在这个步骤里创建一个“系统日期(变)”类型的字段,字段名是sysdate。然后创建一个“插入/更新”步骤,把“获取系统信息”步骤和“插入/更新”步骤连接起来。在“插入/更新”步骤的“更新字段”部分里,用流里的字段“sysdate”去更新表里的字段“current_load”。另外还要设置“用来查询的关键字”部分,把表的“current_load”的条件设置为“is not null”即可。
4. 创建查询变化数据的转换
说明: 从t_color表里抽取数据的查询语句使用开始日期和结束日期,左边闭区间,右边开区间。查询条件类似下面的语句: (create_date >= last_load and create_date < current_load) or (last_update >= last_load and last_update < current_load)
这里需要两个表输入步骤,一个用来从cdc_time表中抽取时间,另一个从t_color表中抽取需要的数据。另外再看查询条件,可以发现last_load和current_load分别出现两次。就是说在第一个表输入步骤中,这些时间值需要被抽取出来两次。
select
last_load last1,
current_load cur1,
last_load last2,
current_load cur2
from cdc_time;
在t_color表输入步骤里,选中“替换 sql 语句里的变量”,在“从步骤插入数据”下拉列表里选中上个表输入步骤。在select语句里写入下面的查询条件:
where (create_date >= ? and create_date < ?) or (last_update >= ? and last_update < ?) 前一个步骤传来的参数将替换上面语句里的问号,第一个问号的值是last1,第二个问号的值是cur1,等等。 通过比较create_date和last_update的值是否相等,可以判断出是新增的还是更改的数据。 case when create_date = last_update then 'new' else 'changed' end as flagfield 把变更数据输出到文本文件里。
5. 创建更新参数表的转换
说明:
如果转换中没有发生任何错误,要把current_load字段里的值复制到last_load字段里。如果转换中发生了错误,时间戳需要保持不变。把current_load字段里的值复制到last_load字段里需要“执行sql语句”步骤,脚本如下:
update cdc_time set last_load = current_load;
cdc_time表里之所以要有两个字段,是因为在加载过程中,会有新的数据被插入或更新,为避免脏读或死锁的情况,最好给create和update时间戳设定一个上限条件,也就是这里的current_load字段。 6. 创建作业
7. 测试 -- 运行作业 -- 查看diff文件
-- 查看cdc_time表
mysql> select * from cdc_time; +---------------------+---------------------+ | last_load | current_load | +---------------------+---------------------+ | 2014-12-16 11:10:05 | 2014-12-16 11:10:05 | +---------------------+---------------------+ 1 row in set (0.00 sec)
-- 修改数据
delete from t_color where id=3;
update t_color set color='Grey' where id=1;
insert into t_color (color,create_date) values('Yellow',now());
commit;
-- 运行作业 -- 查看diff文件
-- 查看cdc_time表
mysql> select * from cdc_time; +---------------------+---------------------+ | last_load | current_load | +---------------------+---------------------+ | 2014-12-16 11:16:02 | 2014-12-16 11:16:02 | +---------------------+---------------------+ 1 row in set (0.00 sec)
8. 总结
基于源数据的CDC要求源数据里有相关的属性列,ETL过程可以利用这些属性列,来判断出哪些数据是增量数据。最常见的属性列有以下两种:
- 时间戳:这种方法至少需要一个更新时间戳,但最好有两个时间戳:一个插入时间戳,记录数据行什么时候创建;一个更新时间戳,记录数据行什么时候最后一次更新。
- 序列:大多数数据库都有自增序列。如果数据库表用到了这种序列,就可以很容易识别出新插入的数据。
这两种方法都需要一个额外的数据库表来存储上一次更新时间或上一次抽取的最后一个序列号。在实践中,一般是在一个独立的模式下或在数据缓冲区里创建这个参数表,不能在数据仓库里创建,更不能在数据集市里创建。基于时间戳和自增序列的方法是CDC最简单的实现方式,所以也是最常用的方法。但是它的缺点也是很明显的,主要如下:
- 区分插入操作和更新操作:只有当源系统包含了插入时间戳和更新时间戳两个字段,才能区别插入和更新,否则无法区分。
- 删除记录的操作:不能捕获到删除操作,除非是逻辑删除,即记录没有真的删除,只是做了逻辑上的标志。
- 多次更新检测:如果在一次同步周期内,数据被更新了多次,只能同步最后一次更新操作,中间的更新操作都丢失了。
- 实时能力:时间戳和基于序列的数据抽取一般适用于批量操作,不适合于实时场景下的数据加载。
- JavaScript 教程
- JavaScript 编辑工具
- JavaScript 与HTML
- JavaScript 与Java
- JavaScript 数据结构
- JavaScript 基本数据类型
- JavaScript 特殊数据类型
- JavaScript 运算符
- JavaScript typeof 运算符
- JavaScript 表达式
- JavaScript 类型转换
- JavaScript 基本语法
- JavaScript 注释
- Javascript 基本处理流程
- Javascript 选择结构
- Javascript if 语句
- Javascript if 语句的嵌套
- Javascript switch 语句
- Javascript 循环结构
- Javascript 循环结构实例
- Javascript 跳转语句
- Javascript 控制语句总结
- Javascript 函数介绍
- Javascript 函数的定义
- Javascript 函数调用
- Javascript 几种特殊的函数
- JavaScript 内置函数简介
- Javascript eval() 函数
- Javascript isFinite() 函数
- Javascript isNaN() 函数
- parseInt() 与 parseFloat()
- escape() 与 unescape()
- Javascript 字符串介绍
- Javascript length属性
- javascript 字符串函数
- Javascript 日期对象简介
- Javascript 日期对象用途
- Date 对象属性和方法
- Javascript 数组是什么
- Javascript 创建数组
- Javascript 数组赋值与取值
- Javascript 数组属性和方法
- 一个比较自闭的SpringIOC问题
- 我在大厂写React,学到了什么?
- leetcode(4)寻找正序数组中位数
- jvm源码解析(二)HashMap
- 硬件笔记(23)---- PCB的保护走线
- TabLayout+ViewPager实现切页的示例代码
- jvm源码分析(四)ThreadPoolExecutor
- python zip,lambda,map函数代码实例
- python实现控制台输出彩色字体
- Mysql免安装版的使用
- jvm源码解析(八)动态代理是如何实现的,JDK Proxy于CGLib有什么区别
- jvm源码解析(五)synchronized和ReentrantLock
- 手把手教你构建自定义的Mimikatz二进制文件
- APACHE OFBIZ XMLRPC远程代码执行漏洞分析
- Intelspy:一款功能强大的自动化网络侦察扫描工具