Kettle构建Hadoop ETL实践(五):数据抽取

时间:2022-07-26
本文章向大家介绍Kettle构建Hadoop ETL实践(五):数据抽取,主要内容包括其使用实例、应用技巧、基本知识点总结和需要注意事项,具有一定的参考价值,需要的朋友可以参考一下。

目录

一、Kettle数据抽取概览

1. 文件抽取

(1)处理文本文件

(2)处理XML文件

2. 数据库抽取

二、变化数据捕获

1. 基于源数据的CDC

2. 基于触发器的CDC

3. 基于快照的CDC

4. 基于日志的CDC

三、使用Sqoop抽取数据

1. Sqoop简介

2. 使用Sqoop抽取数据

3. Sqoop优化

(1)调整Sqoop命令行参数

(2)调整数据库

四、小结


本篇介绍如何利用Kettle提供的转换步骤和作业项实现Hadoop数据仓库的数据抽取,即ETL过程中的Extract部分。首先简述Kettle中几种抽取数据的组件,然后讲述变化数据捕获(Change Data Capture,CDC),以及Kettle如何支持不同的CDC技术。Hadoop生态圈中的Sqoop工具可以直接在关系数据库和HDFS或Hive之间互导数据,而Kettle支持Sqoop输入、输出作业项。最后我们使用Kettle里的Sqoop作业项以及基于时间戳的CDC转换实现销售订单示例的数据抽取过程,将MySQL中的源数据抽取到Hive的rds数据库中。

数据抽取是一个艰难的工作,因为数据源是多样和复杂的。在传统数据仓库环境下,数据通常来源于事务类应用系统,大部分这类系统都是把数据存储在MySQL、Oracle或SQL Server等关系数据库中。一般要从业务角度进行抽取,这也是一个挑战,从技术上来看,最好能使用JDBC直连数据库。如果数据库不是关系型的或者没有可用的驱动,一般就需要使用具有固定分隔符的文本文件来获取数据。还有一种情况是数据属于外部系统,不能直连,使用文本文件交换数据是唯一选择。除此之外,Kettle提供了几种方法来访问互联网数据,如通过RSS或者Salesforce.com网站直连,或者通过Web Service等。

一、Kettle数据抽取概览

Kettle大部分数据抽取类的步骤都放在“输入”类别下。输入类的步骤,顾名思义就是从外部数据源抽取数据,把数据输入到Kettle的数据流中。Kettle 8.3的输入类下有37个步骤,其中最常用的是“文本文件输入”和“表输入”。一般来说准备要读取的数据(尤其是文件类数据)的功能往往在作业里完成,实际读取数据才在转换这一层。各个步骤和作业项的功能选项,大都能直接从选项名称了解其含义。详细说明可使用Kettle在线帮助文档。在菜单条上选择“帮助” -> “显示欢迎屏幕” -> “Documentation”打开在线帮助文档。

1. 文件抽取

Kettle在转换里提供了文件基本的读写操作,对于文件的其它操作(移动、复制、创建、删除、比较、压缩、解压缩等)都在“文件管理”作业项中。在使用“文本文件输出”步骤前,不必先创建一个文件。如果文件不存在,该步骤会自动创建一个。下面介绍两种最常用的处理场景,即从文本文件与XML文件抽取数据。

(1)处理文本文件

文本文件可能是使用ETL工具处理的最简单的一种数据源,读写文本文件没有太多技巧。文本文件易于交换,压缩比较高,任何文本编辑器都可以用于打开文本文件。总体说有以下两类文本文件:

  • 固定分隔符文件:这种文件里,每列都由特定字符分隔。通常这类文件也称为CSV(逗号分隔值)文件或TSV(制表符分隔值)文件。
  • 固定宽度文件:每列都有指定的长度。尽管固定宽度文件的格式非常明确,但也需要一些时间来定义。Kettle在“固定宽度文件输入”的“获取字段”选项里提供了一些辅助工具,但如果要在分隔符文件和固定宽度文件之间选择,最好还是选择分隔符文件。

对于这两种文件,都可以选择文件编码。UTF-8是通常情况下的标准编码格式,但其它编码格式,如ANSI或UTF-8-BOM也在广泛使用。为了正常读取文件内容,必须要设置正确的文件编码。文件编辑软件能够查看文件编码,如使用Notepad++打开文件,选择“编码”菜单即可查看或修改当前文件的编码。

“CSV文件输入”是基本的文本文件输入步骤,CSV文件是一种用具有固定列分隔符的文本文件。在处理这种文件之前要确定分隔符和字段。“CSV文件输入”步骤和与之相似的“固定宽度文件输入”步骤都不太适合一次处理多个文件,这两个步骤其实都是“文本文件输入”步骤的简化版。“文本文件输入”步骤是一个功能强大的步骤,也是处理文本文件的首选步骤。其主要功能如下:

  • 从前一个步骤读取文件名。
  • 一次运行读取多个文件。
  • 从.zip或.gzip压缩文件中读取文件。
  • 不用指定文件结构就可以显示文件内容。注意需要指定文件格式(DOS、UNIX或Mixed),因为Kettle需要知道文件的换行符。在DOS用rn代表换行,UNIX只用n代表换行。
  • 指定转义字符,用来读取字段数据里包含分隔符的字段。通常的转义字符是反斜线()。
  • 错误处理。
  • 过滤。
  • 指定本地化的日期格式。

当然使用这些功能是有代价的,“文本文件输入”步骤比“CSV文件输入”步骤和“固定宽度文件输入”步骤需要占用更多内存和CPU处理能力。

下面看一个Kettle处理的常见场景。假设有一组zip压缩文件,每个zip文件中包含若干文本文件,所有文本文件具有相同的格式。需求是将文本文件中的记录抽取到数据库表中,并且标明每条记录所属的文本文件和zip文件。在“Kettle构建Hadoop ETL实践(一):ETL与Kettle”里介绍Kettle虚拟文件系统时,我们知道了Kettle使用Apache的通用VFS作为文件处理接口,能够直接读取zip压缩包中的多个文件,本例将使用这一特性。

我们用的例子文件是a.zip和b.zip,a.zip中包含1.txt和2.txt两个文件,b.zip中包含3.txt和4.txt两个文件。文本文件具有三个字段,以逗号作为列分隔符。4个文本文件的内容如下,反斜杠是转义字符:

# 1.txt
11,1a,aa,2020-01-01 01:01:01
12,1b,bb,2020-01-01 02:02:02
13,1c,cc,2020-01-01 03:03:03

# 2.txt
21,2a,aa,2020-02-02 01:01:01
22,2b,bb,2020-02-02 02:02:02
23,2c,cc,2020-02-02 03:03:03

# 3.txt
31,3a,aa,2020-03-03 01:01:01
32,3b,bb,2020-03-03 02:02:02
33,3c,cc,2020-03-03 03:03:03

# 4.txt
41,4a,aa,2020-04-04 01:01:01
42,4b,bb,2020-04-04 02:02:02
43,4c,cc,2020-04-04 03:03:03

创建的目标表如下,c1、c2、c3三个字段对应分别对应文本文件中的三列,c4字段存储记录所属的文件名:

create table t_txt (
  c1 int(11) default null,
  c2 varchar(20) default null,
  c3 datetime default null,
  c4 varchar(100) default null);

创建的Kettle转换如图5-1所示,包含“自定义常量数据”、“获取文件名”、“文本文件输入”、“表输出”四个步骤。

图5-1 从文本文件抽取数据

“自定义常量数据”步骤用于定义zip和txt的文件名。当然也可以直接在“获取文件名”步骤中的“文件或目录”写死所要读取的文件名。这里使用“自定义常量数据”步骤的目的是想使输入的文件名参数化,当需要从不同的文件抽取时,只需修改这个步骤,而后面的步骤都不用变更。

在“自定义常量数据”步骤里的“元数据”标签页中创建两个字符串类型的字段zip和txt,然后在“数据”标签页中给这两个字段赋值如图5-2所示。注意两个字段值的写法。zip字段以zip协议开头,后面是zip文件的绝对路径,以‘!/’结尾。txt字段值为正则表达式,表示zip包中所有‘.txt’后缀的文件。

图5-2 在“自定义常量数据”步骤中设置文件名

“获取文件名”步骤的设置如图5-3所示。选中“文件名定义在字段里”选项,“从字段获取文件名”选择“zip”,“从字段获取通配符”选择“txt”。这两个字段的值从前一步骤传递过来。

图5-3 “获取文件名”步骤

下一步骤是“文本文件输入”步骤。首先要确定文件的结构,打开“文本文件输入”步骤设置对话框,在“文件”标签页中点击“浏览”按钮,找到其中一个zip文件,然后点击“增加”按钮把这个文件添加到“选中的文件”列表中,如“zip:/root/kettle_hadoop/5/a.zip!/”。现在可以点击“文件”标签页中的“显示文件内容”按钮打开这个文件,可以看到这个文件的列分隔符、是否带有表头和封闭符等信息。我们可以使用这些信息来设置“内容”标签页里的选项,本例具体如图5-4所示。

图5-4 “内容”标签页定义文本文件格式

定义完文件格式后,再选择“字段”标签页并点击“获取字段”按钮。Kettle会尽量判断出每个字段的数据类型,本例中如图5-5所示。

图5-5 自动获取文本文件字段

为了验证设置的正确性,点击“预览记录”按钮,如果出现预览的数据,说明设置正确。下一步需要把“获取文件名”步骤和“文本文件输入”步骤连接起来。回到“文本文件输入”步骤的“文件”标签页,选中“从以前的步骤接受文件名”和“从以前的步骤接受字段名”,并选中“获取文件名”步骤作为文件名的来源,选中filename字段作为文件名的字段,该字段由“获取文件名”步骤所生成。注意现在不能再使用“预览记录”选项,只能在该步骤上选择转换里的预览。

我们注意到在“文本文件输入”步骤里也有路径和文件名正则表达式选项,但最好把选择文件的过程单独放在“获取文件名”步骤里。因为“获取文件名”步骤可以从前面的步骤获得路径名和文件名的正则表达式,这样比较灵活。而且“文本文件输入”步骤本身不能获取到文件名。 最后一个步骤是“表输出”,将文件内容装载到数据库表中。在该步骤中勾选“指定数据库字段”选项,然后在“数据库字段”标签页点击“获取字段”按钮,在“插入的字段”列表中将会出现前面步骤数据流中的所有字段。只需要选择表字段及其对应的流字段,本例中为:

c1    Field_000
c2    Field_001
c3    Field_002
c4    filename

保存并执行该转换后,t_txt表中数据如下:

mysql> select * from t_txt;
+------+-------+---------------------+-----------------------------------------------+
| c1   | c2    | c3                  | c4                                            |
+------+-------+---------------------+-----------------------------------------------+
|   11 | 1a,aa | 2020-01-01 01:01:01 | zip:file:///root/kettle_hadoop/5/a.zip!/1.txt |
|   12 | 1b,bb | 2020-01-01 02:02:02 | zip:file:///root/kettle_hadoop/5/a.zip!/1.txt |
|   13 | 1c,cc | 2020-01-01 03:03:03 | zip:file:///root/kettle_hadoop/5/a.zip!/1.txt |
|   21 | 2a,aa | 2020-02-02 01:01:01 | zip:file:///root/kettle_hadoop/5/a.zip!/2.txt |
|   22 | 2b,bb | 2020-02-02 02:02:02 | zip:file:///root/kettle_hadoop/5/a.zip!/2.txt |
|   23 | 2c,cc | 2020-02-02 03:03:03 | zip:file:///root/kettle_hadoop/5/a.zip!/2.txt |
|   31 | 3a,aa | 2020-03-03 01:01:01 | zip:file:///root/kettle_hadoop/5/b.zip!/3.txt |
|   32 | 3b,bb | 2020-03-03 02:02:02 | zip:file:///root/kettle_hadoop/5/b.zip!/3.txt |
|   33 | 3c,cc | 2020-03-03 03:03:03 | zip:file:///root/kettle_hadoop/5/b.zip!/3.txt |
|   41 | 4a,aa | 2020-04-04 01:01:01 | zip:file:///root/kettle_hadoop/5/b.zip!/4.txt |
|   42 | 4b,bb | 2020-04-04 02:02:02 | zip:file:///root/kettle_hadoop/5/b.zip!/4.txt |
|   43 | 4c,cc | 2020-04-04 03:03:03 | zip:file:///root/kettle_hadoop/5/b.zip!/4.txt |
+------+-------+---------------------+-----------------------------------------------+
12 rows in set (0.00 sec)

(2)处理XML文件

XML是扩展标识语言(eXtensible Markup Language)的缩写,是一种在平面文件中定义数据结构和内容的开放标准。XML格式非常流行,很多系统都使用这种格式交换数据。XML实际是一种遵照规范的结构化的文本文件,可以使用文本编辑器打开。Kettle里有四种验证XML数据是否有效的方法。

  • 验证XML文件是否有效:只验证XML是否有完整的开始和结束标签,及各层嵌套的结构是否完整。
  • DTD验证:检查XML文件的结构是否符合DTD(Data Type Definition)文件的要求。DTD可以是一个独立的文件,也可以包含在XML文件中。
  • XSD验证(作业):检查XML文件的结构是否符合XML Schema定义文件的要求。
  • XSD验证(转换):和上面相同,但XML是在数据流的字段里(如数据库的列里包含XML格式数据)。

可以使用“Get data from XML”步骤读取XML文件。读取XML文件的主要障碍就是分析嵌套的文件结构。从这个步骤输出的数据流是平面的没有嵌套的数据结构,可以存储在关系数据库中。与之相反,“Add XML”步骤用来把平面数据构造成嵌套形式的XML格式数据。

如果想把XML转成其它格式,如另一种格式的XML文件、平面文件或HTML文件,要使用“XSL transformation”步骤。XSL是扩展样式语言(eXtensible Stylesheet Language)的缩写,这是一种用来转换XML文档的XML语言。转换里的“XSD validator”步骤验证数据流里的XML格式的数据,作业里的“XSD validator”作业项用于验证一个完整的XML文件。 XML是一种非常灵活的格式,可以用来表达很多种数据结构,下面看一个简单的示例。首先准备一个XML文档,然后创建一个转换,从该文档抽取数据,并把数据保存在一个MySQL表中。最后再创建一个功能相反的转换,从MySQL表中抽取数据并保存成XML文件。

示例XML文档sample.xml的内容如下:

<rows>
  <info>
    <infodata user="user1">
      <data>data1</data>
    </infodata>
    <infodata user="user2">
      <data>data2</data>
    </infodata>
  </info>

  <row>
    <parameter>
      <user>user1</user>
      <password>pass1</password>
    </parameter>
    <parameter>
      <user>user2</user>
      <password>pass2</password>
    </parameter>
    <parameter>
      <user>user3</user>
      <password>pass3</password>
    </parameter>    
  </row>
</rows>

<rows>节点下包括了一个<info>节点和一个<row>节点。这两个节点分别又包含一组<infodata>节点和一组<parameter>节点。<infodata>节点具有属性user。<parameter>节点下的<user>节点包括了某个<infodata>节点的user属性值。

对应MySQL表t_xml结构如下:

mysql> desc t_xml;
+----------+-------------+------+-----+---------+-------+
| Field    | Type        | Null | Key | Default | Extra |
+----------+-------------+------+-----+---------+-------+
| rn       | int(11)     | YES  |     | NULL    |       |
| username | varchar(20) | YES  |     | NULL    |       |
| pass     | varchar(20) | YES  |     | NULL    |       |
| info     | varchar(20) | YES  |     | NULL    |       |
| xmlfile  | varchar(50) | YES  |     | NULL    |       |
+----------+-------------+------+-----+---------+-------+
5 rows in set (0.01 sec)

rn存储记录行号,username和pass字段分别存储XML文档中<user>和<password>节点的值,info字段保存<data>节点的值,xmlfile保存XML文件名。

如图5-6所示的转换从sample.xml文件抽取数据并转载到数据库表中。

图5-6 抽取XML文件数据

这个转换只有“Get data from XML”和“表输出”两个步骤。“Get data from XML”步骤从静态XML文件读取数据,并输出XML节点值,本质上是将一个层次结构平面化展开的过程。

在该步骤的“文件”标签页选择要读取的XML文件。点击“浏览”按钮选择本地的sample.xml文件,然后点击“增加”按钮,/root/kettle_hadoop/5/sample.xml将出现在“选中的文件和目录”列表中。“内容”标签页定义XML文件格式,如图5-7所示。

图5-7 定义XML文件格式

标签页里最重要的属性是“循环读取路径”。这里需要设置一个XPath表达式。XPath表达式将从XML文档中过滤出一个节点集,就是XML节点的一个集合。集合里的每一个节点都将被解析为一行记录,并放到输出流中。本例中设置为/rows/row/parameter。如果已经在“文件”标签页中指定了一个XML文件,可以点击“获取XML文档的所有路径”按钮帮助设置XPath属性。这个按钮获取了XML文档里的全部路径,如图5-8所示。

图5-8 获取全部路径的选择列表

“内容”标签页里还包括以下属性。

  • 编码:用来定义XML文档的编码。如果XML文档本身没有指定编码,就要用到这个选项。通常情况下,XML文档的编码在文件头定义,例如:<?xml version="1.0" encoding="UTF-8"?>
  • 考虑命名空间:如果文档使用了命名空间,需要选中该选项。
  • 忽略注释:通常情况下,XML注释也被看作是节点,如果要忽略注释节点,要选中该选项。
  • 验证XML:如果想在抽取数据前使用DTD验证,要选中该选项。
  • 使用标记:该选项用于“字段”标签页的设置,在后面讨论。
  • 忽略空文件:如果指定的文件是空,不会抛出异常。
  • 如果没有文件不要报告错误:如果指定的文件不存在,不会抛出异常。
  • 限制:限制生成的最大记录行数,默认值为0,意味着对每一个抽取到的XML节点都生成一条记录。
  • 用于截取数据的XML路径(大文件):一般情况下,XML文档一次性读如内存,读取路径XPath表达式可以应用于整个文档。但如果XML文档非常大,XPath表达式匹配到的所有XML节点不能一次放入内存中,此时就需要指定另一个XPath表达式把XML文档分成多块,就是这里的XML截取路径。这个用于把XML文档分块的XPath路径不支持全部的XPath语法,只能使用斜线分隔的节点名这种语法格式,不支持命名空间和谓词表达式。另外截取路径XPath必须是读取路径的上一级或同级目录。
  • 输出中包括文件名/文件名字段:如果使用XML文件作为源,该选项可以在输出流中增加一个字段保存XML文件名。“文件名字段”选项设置新增字段的字段名。
  • 输出中包括行号/行数字段:该选项可以为每一个数据行生成一个序列号。“行数字段”选项设置行号字段的字段名。
  • 将文件增加到结果文件中:如果使用了XML文件,选中该选项把文件添加到结果文件列表中。在父作业中就可以再处理这个文件。

在“内容”标签页中已经使用XPath表达式匹配了XML节点集。“字段”标签页用来从XML节点抽取字段,如图5-9所示。

图5-9 定义抽取的字段

列表中的前两行是点击“获取字段”自动得到的。“名称”列用来设置要抽取的字段名。“XML路径”列使用XPath表达式指定从哪里获得字段的值。XPath表达式用来匹配XML数据行里的字段。下面详细说一下第三行data字段获取。

“字段”标签页里的XPath表达式支持一种非标准化的称为token的扩展形式。token用来参数化XPath表达式,它可以把字段值绑定到XPath表达式里。本例中data字段的XPath是../../info/infodata[@user=@_user-]/data。../代表的返回上一层,当前路径是/rows/row/parameter,因此对应的绝对路经为/rows/info。infodata[@user=@_user-]这一段指的是infodata目录下满足条件的用户,也就是token的作用所在。@user所引用的是infodata节点的user属性值,表达式@_user-就是token,这个token包括一个@符号,一条下划线,然后是字段名user,最后是一个短横线。可以看到token的功能和数据库中表join相似,user1的用户名密码等属性没有和data数据在一个读取路径下,那么通过token我们就可以像表一样给它们连接起来,得到user1的数值data1。

如果要使用token,需要选中“内容”标签页里的“使用标记”复选框。另外使用token有以下几个限制:

  • XML文档中被引用的节点(<infodata>)必须出现在引用它的节点(<user>)之前。
  • token里使用的字段(本例的user)必须出现在使用token的字段(本例的data)之前。
  • token语法只对“字段”标签页中的XPath表达式有效,不能用于“内容”标签页中的XPath表达式。

本例中的第二个步骤是“表输出”,只要连接到目标数据库表,勾选“指定数据库字段”选项,然后在“数据库字段”标签页定义表字段与流字段的关系如下:

username    user
pass        password
info        data
xmlfile     filename
rn          rn

保存并成功执行转换后,表t_xml数据如下:

mysql> select * from test.t_xml;
+------+----------+-------+-------+----------------------------------+
| rn   | username | pass  | info  | xmlfile                          |
+------+----------+-------+-------+----------------------------------+
|    1 | user1    | pass1 | data1 | /root/kettle_hadoop/5/sample.xml |
|    2 | user2    | pass2 | data2 | /root/kettle_hadoop/5/sample.xml |
|    3 | user3    | pass3 | NULL  | /root/kettle_hadoop/5/sample.xml |
+------+----------+-------+-------+----------------------------------+
3 rows in set (0.00 sec)

图5-10所示的转换执行一个反向的过程,读取数据库表数据,然后用数据生成XML节点。“表输入”步骤连接的数据库表就是上个转换所装载的t_xml。

图5-10 生成XML节点

“Add XML”步骤用于生成XML节点。对输入流里的每一行,该步骤会添加一个包含XML字符串的新字段,并把这一行发送到下一个步骤中。在配置对话框里有“内容”和“字段”两个标签页,可以设置生成的XML节点的名称、属性、内容等。本例中的内容标签页各选项值如下:

  • 编码:UTF-8
  • Output Value:xmlvaluename
  • 根XML元素:ROW
  • Omit XML header:勾选
  • Omit null values from XML result:勾掉

“内容”这个标签名字有一点令人迷惑,它实际用于设置生成的XML节点的属性,而不是它的内容。“编码”下拉列表用来指定一个编码(默认UTF-8)。“Output Value”属性设置保存XML节点的字段名。“根XML元素”属性设置XML节点的名称。注意,节点名称目前是一个字符串常量,不能指定一个字段来动态设置节点名称。“Omit XML header”复选框用来只生成XML片段,以后合并到其它XML文档中。对于最外层的节点来说,一定要清除这个选项,以便生成带有XML定义的XML文档。“Omit null values from XML result”复选框可以用来控制对NULL的展现,是对文档内容的设置。

“字段”标签页用来控制如何使用输入流字段生成XML文档的内容或属性。可以通过点击“获取字段”按钮,自动得到从前面的步骤输出的字段,本例中为表t_xml的 rn、username、pass、info四个字段,如图5-11所示。

图5-11 “Add XML”步骤的“字段”标签页

输入流字段可以通过四种方式来构成XML文档。

  • 生成“根XML元素”的子节点,把字段内容作为子节点的内容。表格中的“Element name”用来设置节点名。
  • 生成“根XML元素”的属性,把字段内容作为属性的内容。这种方式需要把表格里的“属性”列设置为Y,并把“Attribute parent name”列留空。
  • 把字段内容作为“根XML元素”的文本内容。这种方式的配置和上面的第一种方式的配置非常类似。唯一的不同之处是必须使用“根XML元素”的名字作为节点的名字。尽管配置变化不大,最后效果相差却很大:不会生成子节点,字段的值作为“根XML元素”节点的内容。
  • 生成“根XML元素”的子节点,把字段内容作为子节点的属性。这种方式的配置和第二种方式类似。不同之处就是需要在“Attribute parent name”列中输入要设置的节点的名字。

如果字段中有NULL值,默认情况下会产生一个空节点或属性值。可以选中“内容”标签页中的“Omit null values from XML result”选项来忽略这样的节点或属性值。

执行转换后,xmlvaluename字段的值如下,可以点击“Add XML”步骤右键菜单中的Preview菜单项来查看。

<Row><rn>1</rn><username>user1</username><pass>pass1</pass><info>data1</info></Row>
<Row><rn>2</rn><username>user2</username><pass>pass2</pass><info>data2</info></Row>
<Row><rn>3</rn><username>user3</username><pass>pass3</pass><info></info></Row>

2. 数据库抽取

本节讨论如何从传统关系型数据库抽取数据,从“表输入”步骤开始,用示例解释这个步骤里的参数和变量如何工作。源数据表就用处理文本文件时创建的t_txt表。“表输入”步骤的功能实际上是向所连接的数据库发送select查询语句,并将查询结果返回到输出流中。

可以有两种参数化的查询方法:使用参数和使用变量替换。使用参数的方法需要在“表输入”步骤前面有一个步骤,用来给“表输入”步骤提供一个或多个参数,这些参数替换“表输入”步骤的SQL语句里的问号。这种方法的配置窗口如图5-12所示。

图5-12 参数化查询

这个例子中的“自定义常量数据”步骤定义了两个常量a和b,数据类型分别是String和Date,两个常量的数据这就是后面“表输入”步骤查询语句中替换两个问号的数据。例如在“自定义常量数据”步骤的“数据”标签页中给常量a和b分别赋值‘a’和‘2020-02-02’,则转换执行时,“表输入”步骤的查询语句实际为:

SELECT
  c1
, c2
, c3
, c4
FROM t_txt
where c2 like concat('%','a','%') and c3 >='2020-02-02';

点击“表输入”步骤右键的Preview菜单项预览数据,显示如下:

21    2a,aa    2020/02/02 01:01:01.000000000    zip:file:///root/kettle_hadoop/5/a.zip!/2.txt
31    3a,aa    2020/03/03 01:01:01.000000000    zip:file:///root/kettle_hadoop/5/b.zip!/3.txt
41    4a,aa    2020/04/04 01:01:01.000000000    zip:file:///root/kettle_hadoop/5/b.zip!/4.txt

“表输入”步骤中的主要选项含义如下。

  • 允许简易转换:选中此选项后,在可能情况下避免转换进行不必要的数据类型转换,可以显著提高性能。
  • 替换SQL语句里的变量:选择此选项可替换脚本中的变量。此特性提供了使用变量替换的测试功能。
  • 从步骤插入数据:选择提供替换SQL语句中问号参数数据的步骤。
  • 执行每一行:选择此选项可对每一输入行执行查询。
  • 记录数量限制:指定要从数据库中读取的行数,缺省值0表示读取所有行。

本例的“自定义常量数据”步骤只用来演示,实际使用中,最好用其它步骤替换它。在本篇后面的CDC部分能看到一个类似的例子。

第二种参数化查询方法是使用变量,变量要在使用变量的转换之前的转换中进行设置。设置变量的转换如图5-13所示,设置变量的转换往往是作业里的第一个转换。

图5-13 设置变量的转换

两个变量var_c2和var_c3的值来自前面的“自定义常量数据”步骤里a和b定义的值。在后面转换的“表输入”步骤中可以使用这些变量,查询里的变量名被变量的值替换。使用变量的表输入步骤如图5-14所示。

图5-14 使用变量的表输入步骤

为了查看转换的执行结果,使用“文本文件输出”步骤将表输入步骤的查询结果写入一个文本文件。上面两个转换都在一个作业里,图5-15显示了这两个转换,第一个转换时设置变量,第二个转换使用变量作为表输入步骤的参数。

图5-15 使用变量的作业

本例中常量a和b的值分别为‘b’和‘2020-01-01’。执行作业后,生成的文本文件内容如下:

[root@localhost 5]# more file.txt 
12    1b,bb    2020-01-01 02:02:02    zip:file:///root/kettle_hadoop/5/a.zip!/1.txt
22    2b,bb    2020-02-02 02:02:02    zip:file:///root/kettle_hadoop/5/a.zip!/2.txt
32    3b,bb    2020-03-03 02:02:02    zip:file:///root/kettle_hadoop/5/b.zip!/3.txt
42    4b,bb    2020-04-04 02:02:02    zip:file:///root/kettle_hadoop/5/b.zip!/4.txt

二、变化数据捕获

抽取数据是ETL处理过程的第一个步骤,也是数据仓库中最重要和最具有挑战性的部分,适当的数据抽取是成功建立数据仓库的关键。从源抽取数据导入数据仓库或过渡区有两种方式,可以从源把数据抓取出来(拉),也可以请求源把数据发送(推)到数据仓库。影响选择数据抽取方式的一个重要因素是操作型系统的可用性和数据量,这是抽取整个数据还是仅仅抽取自最后一次抽取以来的变化数据的基础。我们考虑以下两个问题:

  • 需要抽取哪部分源数据加载到数据仓库?有两种可选方式,完全抽取和变化数据捕获。
  • 数据抽取的方向是什么?有两种方式,拉模式,即数据仓库主动去源系统拉取数据;推模式,由源系统将自己的数据推送给数据仓库。

对于第二个问题来说,通常要改变或增加操作型业务系统的功能是非常困难的,这种困难不仅是技术上的,还有来自于业务系统用户及其开发者的阻力。理论上讲,数据仓库不应该要求对源系统做任何改造,实际上也很少由源系统推数据给数据仓库。因此对这个问题的答案比较明确,大都采用拉数据模式。下面我们着重讨论第一个问题。

如果数据量很小并且易处理,一般来说采取完全源数据抽取,就是将所有的文件记录或所有的数据库表数据抽取至数据仓库。这种方式适合基础编码类型的源数据,比如邮政编码、学历、民族等。基础编码型源数据通常是维度表的数据来源。如果源数据量很大,抽取全部数据是不可行的,那么只能抽取变化的源数据,即最后一次抽取以来发生了变化的数据。这种数据抽取模式称为变化数据捕获,简称CDC,常被用于抽取操作型系统的事务数据,比如销售订单、用户注册,或各种类型的应用日志记录等。

CDC大体可以分为两种,一种是侵入式的,另一种是非侵入式的。所谓侵入式的是指CDC操作会给源系统带来性能的影响。只要CDC操作以任何一种方式对源库执行了SQL语句,就可以认为是侵入式的CDC。常用的四种CDC方法是:基于源数据的CDC、基于触发器的CDC、基于快照的CDC、基于日志的CDC,其中前三种是侵入性的。表5-1总结了四种CDC方案的特点。

源数据

触发器

快照

日志

能区分插入/更新

周期内,检测到多次更新

能检测到删除

不具有侵入性

支持实时

不依赖数据库

表5-1 四种CDC方案比较

1. 基于源数据的CDC

基于源数据的CDC要求源数据里有相关的属性字段,抽取过程可以利用这些属性字段来判断哪些数据是增量数据。最常见的属性字段有以下两种。

  • 时间戳:这种方法至少需要一个更新时间戳,但最好有两个,一个插入时间戳,表示记录何时创建,一个更新时间戳,表示记录最后一次更新的时间。
  • 序列:大多数数据库系统都提供自增功能。如果数据库表列被定义成自增的,就可以很容易地根据该列识别出新插入的数据。

这种方法的实现较为简单,假设表t1中有一个时间戳字段last_inserted,t2表中有一个自增序列字段id,则下面SQL语句的查询结果就是新增的数据,其中{last_load_time}和{last_load_id}分别表示ETL系统中记录的最后一次数据装载时间和最大自增序列号。

select * from t1 where last_inserted > {last_load_time};
select * from t2 where id > {last_load_id};

通常需要建立一个额外的数据库表存储上一次更新时间或上一次抽取的最后一个序列号。实践中,一般是在一个独立的模式下或在数据过渡区里创建这个参数表。下面来看Kettle里使用时间戳方式CDC的例子。前一篇建立的ETL示例模型中,source.sales_order表的entry_date字段表示订单录入的时间。我们还需要把上一次装载时间存储在属性文件或参数表里。先使用下面的脚本在hive里的rds库中建立一个名为cdc_time的时间戳表,并设置初始数据。

use rds;  
  
drop table if exists cdc_time ;  
create table cdc_time  
( id int, last_load date, current_load date)  
clustered by (id) into 1 buckets     
stored as orc tblproperties ('transactional'='true');

insert into table cdc_time select 1, '1971-01-01', '1971-01-01' ;

后面的Kettle转换中需要对cdc_time执行行级更新,因此该表必须分桶、使用ORC格式、设置支持事务。id字段用于分桶,不做更新操作。时间戳有last_load和current_load两个字段。之所以需要两个字段,是因为抽取到的数据可能会多于本次需要处理的数据。比如,两点执行ETL过程,则零点到两点这两个小时的数据不会在本次处理。为了确定这个截至时间点,需要给时间戳设定一个上限条件,即这里的current_load字段值。本示例的时间粒度为每天,时间戳只要保留日期部分即可,因此数据类型选为date。最开始这个两个时间戳都设置成一个早于所有业务数据的时间,当开始装载时,current_load时间戳设置为当前时间。

该表的逻辑描述如下。 1. 装载作业开始后,作业要先把current_load设置成作业的开始日期,可以通过如图5-16的“设置系统日期”转换实现。

图5-16 “设置系统日期”转换

在“获取系统信息”步骤里创建一个当前日期的字段cur_date,以及一个前一天的日期pre_date字段,然后将两个字段的数据复制分发到“插入/更新”步骤和“字段选择”步骤。“插入/更新”步骤的“更新字段”部分,用流里的字段“cur_date”去更新表里的字段“current_load”。另外还要设置“用来查询的关键字”部分,把表的“current_load”条件设置为“IS NOT NULL”即可。其含义是当“current_load”为空时执行插入,否则执行更新操作。

在“字段选择”步骤的“元数据”标签页中,修改pre_date字段的类型为“Date”,格式为“yyyy-MM-dd”。格式化的前一天日期值传递给“设置变量”步骤,该步骤将pre_date字段值定义为一个变量PRE_DATE,用于将日期拼接到上传至HDFS的文件名中。变量活动类型(作用域)为“Valid in the root job”,即调用该转换的所有作业均可使用该变量。

2. 从sales_order表里抽取数据的查询使用开始日期和结束日期,如图5-17所示的“装载销售订单表”转换所示。

图5-17 “装载销售订单表”转换

“表输入”步骤获取到cdc_time表的last_load和current_load日期。“数据库连接步骤”用前一步骤获得的last_load和current_load值替换查询语句中问号标识的参数。通过比较表字段entry_date的值判断新增的数据。这里假设源系统中销售订单记录一旦入库就不再改变,或者可以忽略改变。也就是说销售订单是一个随时间变化单向追加数据的表。sales_order表中有两个关于时间的字段,order_date表示订单时间,entry_date表示订单数据实际插入表里的时间,在后面第九篇“(九)事实表技术”讨论“迟到的事实”时就会看到两个时间可能不同。那么用哪个字段作为CDC的时间戳呢?设想这样的情况,一个销售订单的订单时间是2020年1月1日,实际插入表里的时间是2020年1月2日,ETL每天0点执行,抽取前一天的数据。如果按order_date抽取数据,条件为where order_date >= '2020-01-02' AND order_date < '2020-01-03',则2020年1月3日0点执行的ETL不会捕获到这个新增的订单数据,所以应该以entry_date作为CDC的时间戳。

最后将新增数据通过“Hadoop file output”步骤上传到rds.sales_order表对应的HDFS目录下。“文件”标签页中的“Hadoop Cluster”选择CDH631,“Folder/File”输入“/user/hive/warehouse/rds.db/sales_order/sales_order_{PRE_DATE}”,其中{PRE_DATE}引用的就是图5-16中“设置变量”步骤定义的变量。“内容”标签页指定分隔符为逗号,格式选择Unix,编码为UTF-8。“字段”标签页选择sales_order表中全部六个字段。

3. 如果转换中没有发生任何错误,要把current_load字段里的值复制到last_load字段里,用如图5-18所示的“SQL”作业项实现。如果转换中发生了错误,时间戳需要保持不变,以便后面再次执行。

图5-18 更新last_load的“SQL”作业项

将上述转换和作业项放到一个作业中,如图5-19所示。

图5-19 基于时间戳的CDC作业

首次作业成功执行后,hive表sales_order所对应的HDFS目录下生成了一个带有前一天日期的文件:

[root@manager~]#hdfs dfs -ls /user/hive/warehouse/rds.db/sales_order/
Found 1 items
-rw-r--r--   3 root hive       5892 2020-09-28 13:38 /user/hive/warehouse/rds.db/sales_order/sales_order_2020-09-24.txt
[root@manager~]#

rds.sales_order装载全部100条销售订单记录,rds.cdc_time的last_load和current_load均更新为当前日期:

hive> use rds;
OK
hive> select * from sales_order;
OK
1    6    2    2020-03-01 20:13:34    2020-03-01 20:13:34    3777.00
2    4    2    2020-03-03 19:07:07    2020-03-03 19:07:07    9227.00
...
99    3    1    2020-08-29 01:20:11    2020-08-29 01:20:11    9058.00
100    1    2    2020-08-31 09:43:38    2020-08-31 09:43:38    5607.00
Time taken: 2.41 seconds, Fetched: 100 row(s)
hive> select * from cdc_time;
OK
1    2020-09-25    2020-09-25
hive>

基于时间戳和自增序列的方法是CDC最简单的实现方式,也是最常用的方法,但它的缺点也很明显,主要如下:

  • 不能区分插入和更新操作。只有当源系统包含了插入时间戳和更新时间戳两个字段,才能区别插入和更新,否则不能区分。
  • 不能记录删除记录的操作。不能捕获到删除操作,除非是逻辑删除,即记录没有被真的删除,只是做了逻辑上的删除标志。
  • 无法识别多次更新。如果在一次同步周期内,数据被更新了多次,只能同步最后一次更新操作,中间的更行操作将会丢失。
  • 不具有实时能力。时间戳和基于序列的数据抽取一般适用于批量操作,不适合于实时场景下的数据抽取。

这种方法是具有侵入性的,如果操作型系统中没有时间戳或时间戳信息是不可用的,那么不得不通过修改源系统把时间戳包含进去,要求修改操作型系统的表包含一个新的时间戳字段。有些数据库系统可以自动维护timestamp类型的值。如在MySQL中只要如下定义,当执行insert或update操作时,所影响数据行的ts字段将会自动更新为当前时间:

alter table t1 add column ts timestamp default current_timestamp on update current_timestamp;

而有些数据库系统,需要建立一个触发器,在修改一行时更新时间戳字段的值。下面是一个Oracle数据库的例子。当t1表上执行了insert或update操作时,触发器会将last_updated字段更新为当前系统时间。

alter table t1 add last_updated date;

create or replace trigger trigger_on_t1_change
   before insert or update
   on t1
   for each row
begin
   :new.last_updated := sysdate;
end;
/

在实施这些操作前必须被源系统的拥有者所接受,并且要仔细评估对源系统产生的影响。

2. 基于触发器的CDC

当执行INSERT、UPDATE、DELETE这些SQL语句时,可以激活数据库里的触发器,并执行一些动作,就是说触发器可以用来捕获变更的数据并把数据保存到中间临时表里。然后这些变更的数据再从临时表中取出,被抽取到数据仓库的过渡区里。大多数场合下,不允许向操作型数据库里添加触发器(业务数据库的变动通常都异常慎重),而且这种方法会降低系统的性能,所以此方法用的并不是很多。

作为直接在源数据库上建立触发器的替代方案,可以使用源数据库的复制功能,把源数据库上的数据复制到备库上,在备库上建立触发器以提供CDC功能。尽管这种方法看上去过程冗余,且需要额外的存储空间,但实际上这种方法非常有效,而且没有侵入性。复制是大部分数据库系统的标准功能,如MySQL、Oracle和SQL Server等都有各自的数据复制方案。

一个类似于内部触发器的例子是Oracle的物化视图日志。这种日志被物化视图用来识别改变的数据,并且这种日志对象能够被最终用户访问。一个物化视图日志可以建立在每一个需要捕获变化数据的源表上。之后任何时间在源表上对任何数据行做修改时,都有一条记录插入到物化视图日志中表示这一行被修改了。如果想使用基于触发器的CDC机制,并且源数据库是Oracle,这种物化视图日志方案是很方便的。物化视图日志依赖于触发器,但是它们提供了一个益处是,建立和维护这个变化数据捕获系统已经由Oracle自动管理了。我们甚至可以在物化视图上建立自己的触发器,每次物化视图刷新时,触发器基于刷新时间点的物化视图日志归并结果,在一些场景下(只要记录两次刷新时间点数据的差异,不需要记录两次刷新之间的历史变化)可以简化应用处理。下面是一个Oracle物化视图的例子。每条数据的变化可以查询物化视图日志表mlog$_tbl1,两个刷新时间点之间的数据差异,可以查询mv_tbl1_tri表。

-- 建立mv测试表  
create table tbl1(a number,b varchar2 (20));  
create unique index tbl1_pk on tbl1 (a);  
alter table tbl1 add (constraint tbl1_pl primary key(a));  
-- 建立mv日志,单一表聚合视图的快速刷新需要指定including new values子句  
create materialized view log on tbl1 including new values;  
-- 建立mv  
create materialized view mv_tbl1 build immediate refresh fast  
start with to_date('2013-06-01 08:00:00','yyyy-mm-dd hh24:mi:ss')  
next sysdate + 1/24  
as select * from tbl1;  
-- 建立trigger测试表  
create table mv_tbl1_tri (a number,b varchar (20),c varchar (20));  
-- 建立trigger  
create or replace trigger tri_mv  
   after delete or insert or update  
   on mv_tbl1  
   referencing new as new old as old  
   for each row  
begin  
   case  
      when inserting then  
         insert into mv_tbl1_tri values (:new.a, :new.b, 'insert');  
      when updating then  
         insert into mv_tbl1_tri values (:new.a, :new.b, 'update');  
      when deleting then  
         insert into mv_tbl1_tri values (:old.a, :old.b, 'delete');  
   end case;  
exception  
   when others then  
      raise;  
end tri_mv;  
/  
-- 对表tbl1进行一系列增删改操作
-- ...
 
-- 手工刷新mv  
exec dbms_mview.refresh('mv_tbl1');  
-- 查看物化视图日志
select * from mlog$_tbl1;  
-- 检查trigger测试表  
select * from mv_tbl1_tri; 

3. 基于快照的CDC

如果没有时间戳,也不允许使用触发器,就要使用快照表了。可以通过比较源表和快照表来获得数据变化。快照就是一次性抽取源系统中的全部数据,把这些数据装载到数据仓库的过渡区中。下次需要同步时,再从源系统中抽取全部数据,并把全部数据也放到数据仓库的过渡区中,作为这个表的第二个版本,然后再比较这两个版本的数据,从而找到变化。

有多个方法可以获得这两个版本数据的差异。假设表有两个列id和name,id是主键列。该表的第一、二个版本的快照表名为snapshot_1、snapshot_2。下面的SQL语句在主键id列上做全外链接,并根据主键比较的结果增加一个标志字段,I表示新增,U表示更新,D代表删除,N代表没有变化。外层查询过滤掉没有变化的记录。

select * from 
(select case when t2.id is null then 'D'
             when t1.id is null then 'I'
                when t1.name <> t2.name then 'U'
             else 'N'
            end as flag,
           case when t2.id is null then t1.id else t2.id end as id, t2.name
   from snapshot_1 t1 full outer join snapshot_2 t2 on t1.id = t2.id) a
 where flag <> 'N';

当然,这样的SQL语句需要数据库支持全外链接,对于MySQL这样不支持全外链接的数据库,可以使用类似下面的SQL语句:

select 'U' as flag, t2.id as id, t2.name as name
  from snapshot_1 t1 inner join snapshot_2 t2 on t1.id = t2.id
 where t1.name != t2.name
 union all 
select 'D' as flag, t1.id as id, t1.name as name
  from snapshot_1 t1 left join snapshot_2 t2 on t1.id = t2.id
 where t2.id is null
 union all 
select 'I' as flag, t2.id as id, t2.name as name
  from snapshot_2 t2 left join snapshot_1 t1 on t2.id = t1.id
 where t1.id is null;

Kettle里的“合并记录”步骤能够比较两个表的差异。该步骤读取两个使用关键字排序的输入数据流,并基于数据流里的关键字比较其它字段。可以选择要比较的字段,并设置一个标志字段,作为比较结果输出字段。我们用示例模型里的source.sales_order表做个例子。

1. 先把source.sales_order表复制到另一个数据库中。

create table test.sales_order select * from source.sales_order;

2. 创建一个用于快照CDC的转换,如图5-20所示。

图5-20 用于快照CDC的转换

创建两个“表输入”步骤,一个连接source.sales_order,另一个连接test.sales_order,SQL查询语句如下:

SELECT
  order_number
, customer_number
, product_code
, order_date
, entry_date
, order_amount
FROM sales_order order by order_number;

然后添加一个“合并记录”步骤,如图5-21所示。

图5-21 “合并记录”步骤设置

把两个表输入步骤都连接到“合并记录”步骤,在步骤中在选择新旧数据源,设置标志字段名,该字段的值为new、changed、deleted或identical,分别标识新增、修改、删除和没有变化的记录。另外设置主键字段和需要比较的字段。

为了过滤没有发生变化的数据,在后面加一个“过滤记录”步骤,过滤条件是“flagfield=identical”,把所有没有变化的数据发送到“空操作”步骤,把新增、修改、删除的数据发送到“数据同步”步骤,该步骤可以根据标志字段自动进行新增、修改、删除等操作。“一般”和“高级”标签页的配置如图5-22所示。

图5-22 “数据同步”步骤设置

根据数据流中flagfield字段的值决定要执行的插入、更新或删除操作。当目标表test.sales_order中的order_number与数据流order_number相同时,更新目标表的全部六个字段。

3. 验证转换 初始source.sales_order和test.sales_order两个表数据相同:

+--------------+-----------------+--------------+---------------------+---------------------+--------------+
| order_number | customer_number | product_code | order_date          | entry_date          | order_amount |
+--------------+-----------------+--------------+---------------------+---------------------+--------------+
...
|           98 |               2 |            1 | 2020-08-27 14:02:35 | 2020-08-27 14:02:35 |      8144.00 |
|           99 |               3 |            1 | 2020-08-29 01:20:11 | 2020-08-29 01:20:11 |      9058.00 |
|          100 |               1 |            2 | 2020-08-31 09:43:38 | 2020-08-31 09:43:38 |      5607.00 |
+--------------+-----------------+--------------+---------------------+---------------------+--------------+
100 rows in set (0.00 sec)

对source.sales_order数据做一些修改:

insert into source.sales_order values (101,1,1,now(),now(),100);
delete from source.sales_order where order_number=99;
update source.sales_order set order_amount=5606 where order_number=100;

预览“合并记录”步骤的数据:

order_number    customer_number    product_code    order_date    entry_date    order_amount    flagfield
...
98    2    1    2020/08/27 14:02:35.000000000    2020/08/27 14:02:35.000000000    8144.0    identical
99    3    1    2020/08/29 01:20:11.000000000    2020/08/29 01:20:11.000000000    9058.0    deleted
100    1    2    2020/08/31 09:43:38.000000000    2020/08/31 09:43:38.000000000    5606.0    changed
101    1    1    2020/09/24 16:53:56.000000000    2020/09/24 16:53:56.000000000    100.0    new

成功执行转换后,test.sales_order的数据已经和source.sales_order同步:

+--------------+-----------------+--------------+---------------------+---------------------+--------------+
| order_number | customer_number | product_code | order_date          | entry_date          | order_amount |
+--------------+-----------------+--------------+---------------------+---------------------+--------------+
...
|           98 |               2 |            1 | 2020-08-27 14:02:35 | 2020-08-27 14:02:35 |      8144.00 |
|          100 |               1 |            2 | 2020-08-31 09:43:38 | 2020-08-31 09:43:38 |      5606.00 |
|          101 |               1 |            1 | 2020-09-24 16:53:56 | 2020-09-24 16:53:56 |       100.00 |
+--------------+-----------------+--------------+---------------------+---------------------+--------------+
100 rows in set (0.00 sec)

4. 恢复原数据

insert into source.sales_order values (99,3,1,'2020-08-29 01:20:11','2020-08-29 01:20:11',9058);
update source.sales_order set order_amount=5607 where order_number=100;
delete from source.sales_order where order_number=101;

基于快照的CDC可以检测到插入、更新和删除的数据,这是相对于基于时间戳的CDC方案的优点。它的缺点是需要大量的存储空间来保存快照,因为比较的是两个全量数据集合。同样的原因,当表很大时,这种查询会有比较严重的性能问题。

4. 基于日志的CDC

最复杂的和最没有侵入性的CDC方法是基于日志的方式。数据库会把每个插入、更新、删除操作记录到日志里。如使用MySQL数据库,只要在数据库服务器中启用二进制日志(设置log_bin服务器系统变量),之后就可以实时从数据库日志中读取到所有数据库写操作,并使用这些操作来更新数据仓库中的数据。现在十分流行的canal就是基于这个原理,将自己模拟成一个从库,接收主库的二进制日志,从而捕获数据变化。

也可以手工解析二进制日志,将其转为可以理解的格式,然后再把里面的操作按照顺序读取出来。MySQL提供了一个叫做mysqlbinlog的日志读取工具。这个工具可以把二进制的日志格式转换为可读的格式,然后就可以把这种格式的输出保存到文本文件里,或者直接把这种格式的日志应用到MySQL客户端用于数据还原操作。mysqlbinlog工具有很多命令行参数,其中最重要的一组参数可以设置开始/截止时间戳,这样能够只从日志里截取一段时间的日志。另外,日志里的每个日志项都有一个序列号,也可以用来做偏移操作。MySQL的日志提供了上述两种方式来防止CDC过程发生重复或丢失数据的情况。下面是使用mysqlbinlog的两个例子。

mysqlbinlog --start-position=120 jbms_binlog.000002 | mysql -u root -p123456
mysqlbinlog --start-date="2011-02-27 13:10:12" --stop-date="2011-02-27 13:47:21" jbms_binlog.000002 > temp/002.txt

第一条命令将jbms_binlog.000002文件中从120偏移量以后的操作应用到一个MySQL数据库中。第二条命令将jbms_binlog.000002文件中一段时间的操作格式化输出到一个文本文件中。

其它数据库也有类似的方法,下面再来看一个使用Oracle日志分析的实例。有个项目提出的需求是这样的:部署两个相同的Oracle数据库A、B,两个库之间没有网络连接,要定期把A库里的数据复制到B库。要求:1. 应用程序不做修改。2. 实现增量数据更新,并且不允许重复数据导入。

Oracle提供了DBMS_LOGMNR系统包可以分析归档日志。我们只要将A库的归档日志文件通过离线介质拷贝到B库中,再在B库上使用DBMS_LOGMNR解析归档日志,最后将格式化后的输出应用于B库。使用DBMS_LOGMNR分析归档日志并redo变化的方案如下:

  1. A库上线前数据库需要启用归档日志。
  2. 每次同步数据时对A库先执行一次日志切换,然后拷贝归档日志文件到B库,拷贝后删除A库的归档日志。
  3. 在B库上使用DBMS_LOGMNR分析归档日志文件并重做变化。

因为网不通,手工拷贝文件的工作不可避免,所以可以认为上述步骤均为手工操作。第1步为上线前的数据库准备,是一次性工作;第2、3步为周期性工作。对于第3步,可以用PL/SQL脚本实现。首先在B库机器上上规划好目录,这里D:logmine为主目录,D:logmineredo_log存放从A库拷贝来的归档日志文件。然后在B库上执行一次初始化对象脚本,建立一个外部表,存储归档日志文件名称。

create or replace directory logfilename_dir as 'D:logmine';  
grant read, write on directory logfilename_dir to u1;  
  
conn user1/password1  
  
begin  
   excute immediate 'create table logname_ext (logfile_name varchar2(300)) organization external (type oracle_loader default directory data_dir logfilename_dir location (''log_file_name.txt''))';  
exception when others then  
   if sqlcode = -955 then -- 名称已由现有对象使用  
      null;  
   else  
      raise;  
   end if;   
end;  
/

每次数据同步时要做的工作是:

  1. 拷贝A库归档日志文件到B的D:logmineredo_log目录。
  2. 执行D:logminecreate_ext_table.bat。
  3. 前面步骤成功执行后,删除第1步拷贝的归档日志文件。

create_ext_table.bat脚本文件内容如下:

echo off  
dir /a-d /b /s D:logmineredo_log*.log > D:logminelog_file_name.txt  
sqlplus user1/password1 @D:logminecreate_ext_table.sql

create_ext_table.sql脚本文件的内容如下:

begin  
   for x in (select logfile_name from logname_ext) loop  
       dbms_logmnr.add_logfile(x.logfile_name);  
   end loop;  
end;  
/  
  
execute dbms_logmnr.start_logmnr(options => dbms_logmnr.committed_data_only);  
  
begin  
   for x in (select sql_redo   
               from v$logmnr_contents 
            -- 只应用U1用户模式的数据变化,一定要按提交的SCN排序 
            where table_space != 'SYSTEM' and instr(sql_redo,'"U1".') > 0  
            order by commit_scn)  
   loop  
      execute immediate x.sql_redo;  
   end loop;  
end;  
/  
  
exit;

使用基于数据库的日志工具也有缺陷,即只能用来处理一种特定的数据库,如果要在异构的数据库环境下使用基于日志的CDC方法,就要使用Oracle GoldenGate之类的商业软件。

三、使用Sqoop抽取数据

有了前面的讨论和实验,我们现在已经可以处理从源系统获取数据的各种情况。回想上一篇建立的销售订单示例,源系统的MySQL数据库中已经添加好测试数据,Hive中建立了rds数据库作为过渡区,dw库存储维度表和事实表。这里我们将使用一种新的工具将MySQL数据抽取到Hive的rds库中,它就是Sqoop。

1. Sqoop简介

Sqoop是一个在Hadoop与结构化数据存储(如关系数据库)之间高效传输大批量数据的工具。它在2012年3月被成功孵化,现在已是Apache的顶级项目。Sqoop有Sqoop1和Sqoop2两代,Sqoop1最后的稳定版本是1.4.7,Sqoop2最后版本是1.99.6。需要注意的是,1.99.6与1.4.7并不兼容,而且截止目前为止,1.99.6并不完善,不推荐在生产环境中部署。

第一代Sqoop的设计目标很简单:

  • 在企业级数据仓库、关系数据库、文档系统和HDFS、HBase或Hive之间导入导出数据。
  • 基于客户端的模型。
  • 连接器使用厂商提供的驱动。
  • 没有集中的元数据存储。
  • 只有Map任务,没有Reduce任务,数据传输和转化都由Mappers提供。
  • 可以使用Oozie调度和管理Sqoop作业。

Sqoop1是用Java开发的,完全客户端驱动,严重依赖于JDBC,可以使用简单的命令行命令导入导出数据。例如:

# 把MySQL中testdb.PERSON表的数据导入HDFS
sqoop import --connect jdbc:mysql://localhost/testdb --table PERSON --username test --password 123456

上面这条命令形成一系列任务:

  • 生成MySQL的SQL代码。
  • 执行MySQL的SQL代码。
  • 生成Map作业。
  • 执行Map作业。
  • 数据传输到HDFS。
# 将HDFS上/user/localadmin/CLIENTS目录下的文件导出到MySQL的testdb.CLIENTS_INTG表中
sqoop export --connect jdbc:mysql://localhost/testdb --table CLIENTS_INTG --username test --password 123456 --export-dir /user/localadmin/CLIENTS

上面这条命令形成一系列任务:

  • 生成Map作业。
  • 执行Map作业。
  • 从HDFS的/user/localadmin/CLIENTS路径传输数据。
  • 生成MySQL的SQL代码。
  • 向MySQL的testdb.CLIENTS_INTG表插入数据

Sqoop1有许多简单易用的特性,如可以在命令行指定直接导入至Hive或HDFS。连接器可以连接大部分流行的数据库:Oracle、SQLServer、MySQL、Teradata、PostgreSQL等。Sqoop1的主要问题包括:繁多的命令行参数;不安全的连接方式,如直接在命令行写密码等;没有元数据存储,只能本地配置和管理,使复用受到限制。

Sqoop2体系结构比Sqoop1复杂得多,它被设计用来解决Sqoop1的问题,主要体现在易用性、可扩展性和安全性三个方面。

易用性 Sqoop1需要客户端的安装和配置,而Sqoop2是在服务器端安装和配置。这意味着连接器只在一个地方统一配置,由管理员角色管理,操作员角色使用。类似地,只需要在一台服务器上配置JDBC驱动和数据库连接。Sqoop2还有一个基于Web的服务:前端是命令行接口(CLI)和浏览器,后端是一个元数据知识库。用户可以通过交互式的Web接口进行导入导出,避免了错误选项和繁冗步骤。Sqoop2还在服务器端整合了Hive和HBase。Oozie通过REST API管理Sqoop任务,这样当安装一个新的Sqoop连接器后,无需在Oozie中安装它。

可扩展性 在Sqoop2中,连接器不再受限于JDBC的SQL语法,如不必指定database、table等,甚至可以定义自己使用的SQL方言。例如,Couchbase不需要指定表名,只需在充填或卸载操作时重载它。通用的功能将从连接器中抽取出来,使之只负责数据传输。在Reduce阶段实现通用功能,确保连接器可以从将来的功能性开发中受益。连接器不再需要提供与其它系统整合等下游功能,因此,连接器的开发者不再需要了解所有Sqoop支持的特性。

安全性 Sqoop1用户是通过执行sqoop命令运行Sqoop。Sqoop作业的安全性主要由是否对执行Sqoop的用户信任所决定。Sqoop2将作为基于应用的服务,通过按不同角色连接对象,支持对外部系统的安全访问。为了进一步安全,Sqoop2不再允许生成代码、请求直接访问Hive或HBase,也不对运行的作业开放访问所有客户端的权限。Sqoop2将连接作为一级对象,包含证书的连接一旦生成,可以被不同的导入导出作业多次使用。连接由管理员生成,被操作员使用,因此避免了最终用户的权限泛滥。此外,连接可以被限制只能进行某些基本操作,如导入导出,还可通过限制同一时间打开连接的总数和一个禁止连接的选项来管理资源。

当前的Sqoop2还缺少Sqoop1的某些特性,因此Cloudera的建议是,只有当Sqoop2完全满足需要的特性时才使用它,否则继续使用Sqoop1。CDH 6.3.1中只包含Sqoop1,版本为1.4.7。

2. 使用Sqoop抽取数据

在销售订单示例中使用Sqoop1进行数据抽取。表5-2汇总了示例中维度表和事实表用到的源数据表及其抽取模式。

源数据表

rds库中的表

dw库中的表

抽取模式

customer

customer

customer_dim

整体、拉取

product

product

product_dim

整体、拉取

sales_order

sales_order

order_dim、sales_order_fact

基于时间戳的CDC、拉取

表5-2 销售订单抽取模式

对于customer、product这两个表采用整体拉取的方式抽数据。ETL通常是按一个固定的时间间隔,周期性定时执行的,因此对于整体拉取的方式而言,每次导入的数据需要覆盖上次导入的数据。Kettle作业中的“Sqoop import”作业项,可以调用Sqoop命令,从关系数据库抽取数据到HDFS或hive表。我们使用该作业项将源库中的customer、product两表数据全量覆盖导入hive表所对应的HDFS目录,而调用图5-19所示的作业,实现对sales_order表的增量数据导入。整体作业如图5-23所示。

图5-23 将数据从source库抽取到rds库的作业

“Sqoop import customer”作业项选项设置如图5-24所示。

图5-24 “Sqoop import customer”作业项设置

源库表为MySQL的customer表,目标为CDH631集群中,hive库表rds.customer所对应的HDFS目录/user/hive/warehouse/rds.db/customer。点击“Advanced Options”,将显示所有Sqoop所支持的命令行参数。通过点击“List View”或“Command Line View”图标,参数将分别以列表或命令行形式展现。这里只需设置“delete-target-dir”参数的值为true。Sqoop import要求目标HDFS的目录为空,为了能够幂等执行作业,需要设置delete-target-dir参数。所谓幂等操作指的是其执行任意多次所产生的影响均与一次执行的影响相同。这样就能在导入失败或修复bug后可以再次执行该操作,而不用担心重复执行会对系统造成数据混乱。定义好的作业项等价于以下sqoop命令:

sqoop import --connect jdbc:mysql://node3:3306/source --delete-target-dir --password 123456 --table customer --target-dir /user/hive/warehouse/rds.db/customer --username root

“Sqoop import product”作业项只是将源和目标表换成了product,其它都与“Sqoop import customer”相同。“load_sales_order”子作业调用图5-19所示的基于时间戳的CDC作业,向rds.sales_order表增量装载数据。

下面测试增量导入。前面介绍基于时间戳的CDC时,我们已经首次执行过装载sales_order表的作业,cdc_time表的日期为'2020-09-25'。现在向MySQL源库增加两条数据:

use source;
set @customer_number := floor(1 + rand() * 6);    
set @product_code := floor(1 + rand() * 2);    
set @order_date := from_unixtime(unix_timestamp('2020-09-26') + rand() * (unix_timestamp('2020-09-27') - unix_timestamp('2020-09-26')));  
set @amount := floor(1000 + rand() * 9000);  
  
insert into sales_order   
values (101,@customer_number,@product_code,@order_date,@order_date,@amount);  
  
set @customer_number := floor(1 + rand() * 6);  
set @product_code := floor(1 + rand() * 2);  
set @order_date := from_unixtime(unix_timestamp('2020-09-27') + rand() * (unix_timestamp('2020-09-28') - unix_timestamp('2020-09-27')));  
set @amount := floor(1000 + rand() * 9000);  
  
insert into sales_order   
values (102,@customer_number,@product_code,@order_date,@order_date,@amount);  
  
commit;

上面的语句向sales_order插入了两条记录,一条是9月26日的,另一条是9月27日的:

+--------------+-----------------+--------------+---------------------+---------------------+--------------+
| order_number | customer_number | product_code | order_date          | entry_date          | order_amount |
+--------------+-----------------+--------------+---------------------+---------------------+--------------+
...
|          101 |               4 |            1 | 2020-09-26 21:51:18 | 2020-09-26 21:51:18 |      3402.00 |
|          102 |               4 |            1 | 2020-09-27 06:15:43 | 2020-09-27 06:15:43 |      6963.00 |
+--------------+-----------------+--------------+---------------------+---------------------+--------------+
102 rows in set (0.01 sec)

下面执行图5-23所示的Kettle作业。customer、product重新全量覆盖装载数据,sales_order表只装载最新的两条数据。作业成功执行后,HDFS目录/user/hive/warehouse/rds.db/sales_order/下有两个文件:

[root@manager~]#hdfs dfs -ls /user/hive/warehouse/rds.db/sales_order/
Found 2 items
-rw-r--r--   3 root hive       5892 2020-09-28 13:38 /user/hive/warehouse/rds.db/sales_order/sales_order_2020-09-24.txt
-rw-r--r--   3 root hive        120 2020-09-28 15:32 /user/hive/warehouse/rds.db/sales_order/sales_order_2020-09-27.txt
[root@manager~]#

rds.sales_order表数据如下:

hive> select * from rds.sales_order;
OK
1    6    2    2020-03-01 20:13:34    2020-03-01 20:13:34    3777.00
2    4    2    2020-03-03 19:07:07    2020-03-03 19:07:07    9227.00
...
101    4    1    2020-09-26 21:51:18    2020-09-26 21:51:18    3402.00
102    4    1    2020-09-27 06:15:43    2020-09-27 06:15:43    6963.00
Time taken: 3.168 seconds, Fetched: 102 row(s)
hive>

时间戳表rds.cdc_time数据也已经更新为当前日期:

hive> select * from rds.cdc_time;
OK
1    2020-09-28    2020-09-28
Time taken: 1.369 seconds, Fetched: 1 row(s)
hive>

作业的执行结果符合预期。

3. Sqoop优化

当使用Sqoop在关系数据库和HDFS之间传输数据时,有多个因素影响其性能。可以通过调整Sqoop命令行参数或数据库参数优化Sqoop的性能。本节简要描述这两种优化方法。

(1)调整Sqoop命令行参数

可以调整下面的Sqoop参数优化性能。

  • batch:该参数的语法是--batch,指示使用批处理模式执行底层的SQL语句。在导出数据时,该参数能够将相关的SQL语句组合在一起批量执行。也可以使用有效的API在JDBC接口中配置批处理参数。
  • boundary-query:指定导入数据的范围值。当仅使用split-by参数指定的分隔列不是最优时,可以使用boundary-query参数指定任意返回两个数字列的查询。它的语法如下:--boundary-query select min(id), max(id) from <tablename>。在配置boundary-query参数时,查询语句中必须连同表名一起指定min(id)和max(id)。如果没有配置该参数,缺省时Sqoop使用select min(<split-by>), max(<split-by>) from <tablename>查询找出分隔列的边界值。
  • direct:该参数的语法是--direct,指示在导入数据时使用关系数据库自带的工具(如果存在的话),如MySQL的mysqlimport。这样可以比jdbc连接的方式更为高效地将数据导入到关系数据库中。
  • Dsqoop.export.records.per.statement:在导出数据时,可以将Dsqoop.export.records.per.statement参数与批处理参数结合在一起使用。该参数指示在一条insert语句插入的行数。当指定了这个参数时,Sqoop运行下面的插入语句:INSERT INTO table VALUES (...), (...), (...),...;某些情况下这可以提升近一倍的性能。
  • fetch-size:导入数据时,指示每次从数据库读取的记录数。使用下面的语法:--fetch-size=<n>,其中<n>表示Sqoop每次必须取回的记录数,缺省值为1000。可以基于读取的数据量、可用的内存和带宽大小适当增加fetch-size的值。某些情况下这可以提25%的性能。
  • num-mappers:该参数的语法为--num-mappers <number of map tasks>,用于指定并行数据导入的map任务数,缺省值为4。应该将该值设置成低于数据库所支持的最大连接数。
  • split-by:该参数的语法为--split-by <column name>,指定用于Sqoop分隔工作单元的列名,不能与--autoreset-to-one-mapper选项一起使用。如果不指定列名,Sqoop基于主键列分隔工作单元。

(2)调整数据库

为了优化关系数据库的性能,可执行下面的任务:

  • 为精确调整查询,分析数据库统计信息。
  • 将不同的表空间存储到不同的物理硬盘。
  • 预判数据库的增长。
  • 使用explain plan类似的语句调整查询语句。
  • 导入导出数据时禁用外键约束。
  • 导入数据前删除索引,导入完成后再重建。
  • 优化JDBC URL连接参数。
  • 确定使用最好的连接接口。

四、小结

本篇中用我们介绍了如何使用Kettle完成数据抽取任务。包括两种最常用的从文件抽取数据的场景,即把文本文件或XML文件作为输入。我们还说明了两种参数化数据库查询的方法,即使用参数和变量。变化数据捕获(CDC)是一项就有挑战性的工作,时间戳、触发器、快照表、日志是常用的四种常用变化数据捕获方法。Sqoop是一个在Hadoop与结构化数据存储,如关系数据库之间高效传输大批量数据的工具,支持全量和增量数据抽取。Kettle中包含了Sqoop import和Sqoop export作业项,用于从Kettle执行Sqoop命令。