【Flink】第二十三篇:join 之 temporal join

时间:2022-10-13
本文章向大家介绍【Flink】第二十三篇:join 之 temporal join,主要内容包括传统 join 方式、Processing Time Temporal Join、基本概念、基础应用、原理机制和需要注意的事项等,并结合实例形式分析了其使用技巧,希望通过本文能帮助到大家理解应用这部分内容。

相关推荐:

【Flink】第十篇:join 之 regular join

【Flink】第十一篇:join 之 interval join

继以上 Flink Join 两篇文章之后探讨最后一类Flink的Join:temporal join。

传统 join 方式

传统的离线 Batch SQL (面向有界数据集的 SQL) 有三种基础的实现方式,分别是 Nested-loop Join、Sort-Merge Join 和 Hash Join。

1. Nested-loop Join 最为简单直接,将两个数据集加载到内存,并用内嵌遍历的方式来逐个比较两个数据集内的元素是否符合 Join 条件。Nested-loop Join 的时间效率以及空间效率都是最低的,可以使用:table.exec.disabled-operators:NestedLoopJoin 来禁用。

2. Sort-Merge Join 分为 Sort 和 Merge 两个阶段:首先将两个数据集进行分别排序,然后再对两个有序数据集分别进行遍历和匹配,类似于归并排序的合并。(Sort-Merge Join 要求对两个数据集进行排序,但是如果两个输入是有序的数据集,则可以作为一种优化方案)。

3. Hash Join 同样分为两个阶段:首先将一个数据集转换为 Hash Table,然后遍历另外一个数据集元素并与 Hash Table 内的元素进行匹配。

  • 第一阶段和第一个数据集分别称为 build 阶段和 build table;
  • 第二个阶段和第二个数据集分别称为 probe 阶段和 probe table。

Hash Join 效率较高但是对空间要求较大,通常是作为 Join 其中一个表为适合放入内存的小表的情况下的优化方案 (并不是不允许溢写磁盘)。

注意:Sort-Merge Join 和 Hash Join 只适用于 Equi-Join ( Join 条件均使用等于作为比较算子)。

Flink SQL 流批一体的核心是:流表二象性。围绕这一核心有若干概念,例如,动态表(Dynamic Table)/时态表(Temporal Table)、版本(Version)、版本表(Version Table)、普通表、连续查询、物化视图/虚拟视图、CDC(Change Data Capture)、Changelog Stream。

  1. 将流转换为动态表。
  2. 在动态表上计算一个连续查询,生成一个新的动态表。
  3. 生成的动态表被转换回流。

理解:流和表只是数据在特定场景下的两种形态(联想到光的波粒二象性?笔者已经傻傻分不清)

temporal join

Flink Join 主要包含:

  • Event Time Temporal Join
  • Processing Time Temporal Join

语法(SQL 2011 标准):

SELECT [column_list]
FROM table1 [AS <alias1>]
[LEFT] JOIN table2 FOR SYSTEM_TIME AS OF table1.{ proctime | rowtime } [AS <alias2>]
ON table1.column-name1 = table2.column-name1

其中,

  • 左表:任意表(探针侧,probe site)
  • 右表:版本表(versioned table)/普通表(构建侧,build side)

本文主要探索Event Time temporal join的一些设计特性,即右侧是版本表的join。

Event Time Temporal Join

一个典型的场景是订单和汇率,下方示例展示了一个append-only 订单表Orders 与一个不断改变的汇率表 RatesHistory 的 Join 操作:

SELECT * FROM Orders;

rowtime amount currency
======= ====== =========
10:15        2 Euro
10:30        1 US Dollar
10:32       50 Yen
10:52        3 Euro
11:04        5 US Dollar

RatesHistory 表示不断变化的汇率信息。汇率以日元为基准(即 Yen 永远为 1)。

SELECT * FROM RatesHistory;

rowtime currency   rate
======= ======== ======
09:00   US Dollar   102
09:00   Euro        114
09:00   Yen           1
10:45   Euro        116
11:15   Euro        119
11:49   Pounds      108

基于上述信息,欲计算 Orders 表中所有交易量并全部转换成日元。

例如,09:0010:45 间欧元对日元的汇率是 11410:4511:15 间为 116,10:45 以后是119。如果要将10:52的这笔订单进行汇率转换,最终选择 10:45这个版本,

由于temporal join设计很多特定的影响因素,以以下测试用例探索join规则:

左流(主表、探针侧):

create table left_upsert (
    id string,
    op_ts timestamp(3),
    primary key(id) not enforced,
    watermark for op_ts as op_ts - intervcal '0' second
) with (
    'connector' = 'upsert-kafka',
    'properties.bootstrap.servers' = '...',
    'topic' = '...'
    'key.format' = 'json',
    'value.format' = 'json',
    'properties.group.id' = '...'
)

右流(维表、构建侧):

create table right_upsert (
    id string,
    op_ts timestamp(3),
    primary key(id) not enforced,
    watermark for op_ts as op_ts - intervcal '0' second
) with (
    'connector' = 'upsert-kafka',
    'properties.bootstrap.servers' = '...',
    'topic' = '...'
    'key.format' = 'json',
    'value.format' = 'json',
    'properties.group.id' = '...'
)

1. 支持inner join, left join

2. 右流版本表既要定义为事件时间(水位线)也要定义主键;左流需要定义为事件时间(水位线)。

其实,版本表的特点是可以追溯历史版本,所以,时间和主键是必须要同时具备的。

3. 关联等式条件必须有维表的主键,但是可以加入其它辅助条件,例如,

on left_upsert.id = right_upsert.id and left_upsert.id <> '2'

4. 水位线起到一个触发写出的作用,在写出之前,左右流的元素在缓存中join。

例如,测试数据及 join sql, 程序 join 结果如下,

Left:
    key                    value                     produce seq
{"id":"1"}  {"id":"1","op_ts":"1970-01-03 00:00:00"}      1     --- watermark
{"id":"2"}  {"id":"2","op_ts":"1970-01-01 01:00:00"}      3
{"id":"3"}  {"id":"3","op_ts":"1970-01-04 00:00:00"}      6     --- watermark

Right:
    key                     value                    produce seq
{"id":"1"}  {"id":"1","op_ts":"1970-01-03 00:00:00"}      2     --- watermark
{"id":"2"}  {"id":"2","op_ts":"1970-01-01 00:00:00"}      4
{"id":"2"}  {"id":"2","op_ts":"1970-01-01 02:00:00"}      5
{"id":"3"}  {"id":"3","op_ts":"1970-01-04 00:00:00"}      7     --- watermark

join sql:
select * from left_upsert as l 
left join right_upsert for system_time as of l.op_ts as r 
on l.id = r.id

结果:
+----+------+-------------------+-------+-------------------+
| op |   id |             op_ts |   id0 |            op_ts0 |
+----+------+-------------------+-------+-------------------+
| +I |    1 |  1970-01-03T00:00 |     1 |  1970-01-03T00:00 |
| +I |    2 |  1970-01-01T01:00 |     2 |  1970-01-01T00:00 |
| +I |    3 |  1970-01-04T00:00 |     3 |  1970-01-04T00:00 |

1和2消息将水位线提升到先生产1970-01-03 00:00:00,会触发join写出

| +I |    1 |  1970-01-03T00:00 |     1 |  1970-01-03T00:00 |

紧接着按照测试数据的produce seq顺序发出测试数据,当在6和7测试数据发出后,又触发一次写出:

| +I |    2 |  1970-01-01T01:00 |     2 |  1970-01-01T00:00 |
| +I |    3 |  1970-01-04T00:00 |     3 |  1970-01-04T00:00 |

此时,会将内存中缓存的以下join结果也写出,

| +I |    2 |  1970-01-01T01:00 |     2 |  1970-01-01T00:00 |

并且可以看到join的时间版本也符合之前的规则。

5. 左流元素才会触发join的作用,join的结果只会看到从左流探针侧触发的join。

例如,测试数据及 join sql, 程序 join 结果如下,

Left:
    key                     value                    produce seq
{"id":"1"}  {"id":"1","op_ts":"1970-01-03 00:00:00"}    1     --- watermark
{"id":"2"}  {"id":"2","op_ts":"1970-01-01 01:00:00"}    3
{"id":"3"}  {"id":"3","op_ts":"1970-01-04 00:00:00"}    7     --- watermark

Right:
    key                     value                    produce seq
{"id":"1"}  {"id":"1","op_ts":"1970-01-03 00:00:00"}    2     --- watermark
{"id":"2"}  {"id":"2","op_ts":"1970-01-01 00:00:00"}    4
{"id":"2"}  {"id":"2","op_ts":"1970-01-01 02:00:00"}    5
{"id":"2"}  null                                        6
{"id":"3"}  {"id":"3","op_ts":"1970-01-04 00:00:00"}    8     --- watermark

join sql:
select * from left_upsert as l 
left join right_upsert for system_time as of l.op_ts as r 
on l.id = r.id

结果:
+----+------+-------------------+------+-------------------+
| op |   id |             op_ts |  id0 |            op_ts0 |
+----+------+-------------------+------+-------------------+
| +I |    1 |  1970-01-03T00:00 |    1 |  1970-01-03T00:00 |
| +I |    2 |  1970-01-01T01:00 |    2 |  1970-01-01T00:00 |
| +I |    3 |  1970-01-04T00:00 |    3 |  1970-01-04T00:00 |

在produce seq为6的数据发出之前,内存中左流的id=2的元素与右流的id=2的1970-01-01 00:00:00版本join,当右流{"id":"2"} null发出后,语义上理解{"id":"2"} {"id":"2","op_ts":"1970-01-01 02:00:00"}这条数据应该被撤回,但是从join结果看,并不是这样的:

| +I |    2 |  1970-01-01T01:00 |    2 |  1970-01-01T00:00 |

说明,{"id":"2"} null并没有触发join结果的更新,这也说明了右流是不会触发join结果的更新的。

如果将上述左流测试数据{"id":"3"} {"id":"3","op_ts":"1970-01-04 00:00:00"}改为,

{"id":"2"}  {"id":"2","op_ts":"1970-01-04 00:00:00"}

测试结果中,id=2的join结果变为,

+----+-----+------------------+--------+-------------------+
| op |  id |            op_ts |    id0 |            op_ts0 |
+----+-----+------------------+--------+-------------------+
| +I |   1 | 1970-01-03T00:00 |      1 |  1970-01-03T00:00 |
| +I |   2 | 1970-01-01T01:00 |      2 |  1970-01-01T00:00 |
| -U |   2 | 1970-01-01T01:00 |      2 |  1970-01-01T00:00 |
| +U |   2 | 1970-01-04T00:00 | (NULL) |            (NULL) |

从以上这个输出也可看出,

6. 在缓存中的join结果没有merge,而是将每次触发join的结果依次输出。

7. 当触发写出后,在缓存中只保留元素最新版本,过期版本将删除。

总结

  • 支持inner join, left join。
  • 右流版本表既要定义为事件时间(水位线)也要定义主键;左流需要定义为事件时间(水位线)。
  • 关联等式条件必须有维表的主键,但是可以加入其它辅助条件。
  • 水位线起到一个触发写出的作用,在写出之前,左右流的元素在缓存中join。
  • 左流元素才会触发join的作用,join的结果只会看到从左流探针侧触发的join。
  • 在缓存中的join结果没有merge,而是将每次触发join的结果依次输出。
  • 当触发写出后,在缓存中只保留元素最新版本,过期版本将删除。

以上,可以看出Event Time Temporal Join的适用场景比较特殊,因为构建侧的维表的数据流必须是【缓慢变化维】,否则无法确join的合适的时间版本,并且水位线无法推进。

Processing Time Temporal Join

Processing Time Temporal Join用于和以处理时间作为时间属性的构建侧流表进行Join,这种维表通常我们用HBase、MySQL此类具有Lookup能力的表进行Join。

语法同Event Time Temporal Join,在此不做赘述。