详解flink中Look up维表的使用
- 背景
- LookupableTableSource
- 实例讲解
- 源码解析
- JdbcTableSource
- JdbcLookupFunction
背景
在流式计算中,维表是一个很常见的概念,一般用于sql的join中,对流式数据进行数据补全,比如我们的source stream是来自日志的订单数据,但是日志中我们只是记录了订单商品的id,并没有其他的信息,但是我们把数据存入数仓进行数据分析的时候,却需要商品名称、价格等等其他的信息,这种问题我们可以在进行流处理的时候通过查询维表的方式对数据进行数据补全。
维表一般存储在外部存储中,比如mysql、hbase、redis等等,今天我们以mysql为例,讲讲flink中维表的使用。
LookupableTableSource
在flink中提供了一个LookupableTableSource,可以用于实现维表,也就是我们可以通过某几个key列去查询外部存储来获取相关的信息用于补全stream的数据。
public interface LookupableTableSource<T> extends TableSource<T> {
TableFunction<T> getLookupFunction(String[] lookupKeys);
AsyncTableFunction<T> getAsyncLookupFunction(String[] lookupKeys);
boolean isAsyncEnabled();
}
我们看到,LookupableTableSource有三个方法
- getLookupFunction:用于同步查询维表的数据,返回一个TableFunction,所以本质上还是通过用户自定义 UDTF来实现的。
- getAsyncLookupFunction:用于异步查询维表的数据,该方法返回一个对象
- isAsyncEnabled:默认情况下是同步查询,如果要开启异步查询,这个方法需要返回true
在flink里,我们看到实现了这个接口的主要有四个类,JdbcTableSource,HBaseTableSource,CsvTableSource,HiveTableSource,今天我们主要以jdbc为例讲讲如何进行维表查询。
实例讲解
接下来我们讲一个小例子,首先定义一下stream source,我们使用flink 1.11提供的datagen来生成数据。
我们来模拟生成用户的数据,这里只生成的用户的id,范围在1-100之间。
CREATE TABLE datagen (
userid int,
proctime as PROCTIME()
) WITH (
'connector' = 'datagen',
'rows-per-second'='100',
'fields.userid.kind'='random',
'fields.userid.min'='1',
'fields.userid.max'='100'
)
datagen具体的使用方法可以参考:
聊聊flink 1.11 中的随机数据生成器-DataGen connector
然后再创建一个mysql维表信息:
CREATE TABLE dim_mysql (
id int,
name STRING,
PRIMARY KEY (id) NOT ENFORCED
) WITH (
'connector' = 'jdbc',
'url' = 'jdbc:mysql://localhost:3306/test',
'table-name' = 'userinfo',
'username' = 'root',
'password' = 'root'
)
我们这个mysql表中样例数据如下:
最后执行sql查询,流表关联维表:
SELECT * FROM datagen LEFT JOIN dim_mysql FOR SYSTEM_TIME AS OF datagen.proctime ON datagen.userid = dim_mysql.id
结果示例如下:
3> 53,2020-09-03T07:19:34.565,null,null
3> 73,2020-09-03T07:19:34.566,null,null
1> 14,2020-09-03T07:19:34.566,14,aaddda
2> 11,2020-09-03T07:19:34.566,null,null
4> 8,2020-09-03T07:19:34.566,8,name8
1> 61,2020-09-03T07:19:34.567,null,null
3> 12,2020-09-03T07:19:34.567,12,aaa
2> 99,2020-09-03T07:19:34.567,null,null
4> 37,2020-09-03T07:19:34.568,null,null
2> 13,2020-09-03T07:19:34.569,13,aaddda
3> 6,2020-09-03T07:19:34.568,6,name6
我们看到对于维表中存在的数据,已经关联出来了,对于维表中没有的数据,显示为null
完整代码请参考: https://github.com/zhangjun0x01/bigdata-examples/blob/master/flink/src/main/java/dimension/JdbcDim.java
源码解析
JdbcTableSource
以jdbc为例,我们来看看flink底层是怎么做的。
JdbcTableSource#isAsyncEnabled方法返回的是false,也就是不支持异步的查询,所以进入JdbcTableSource#getLookupFunction方法。
@Override
public TableFunction<Row> getLookupFunction(String[] lookupKeys) {
final RowTypeInfo rowTypeInfo = (RowTypeInfo) fromDataTypeToLegacyInfo(producedDataType);
return JdbcLookupFunction.builder()
.setOptions(options)
.setLookupOptions(lookupOptions)
.setFieldTypes(rowTypeInfo.getFieldTypes())
.setFieldNames(rowTypeInfo.getFieldNames())
.setKeyNames(lookupKeys)
.build();
}
最终是构造了一个JdbcLookupFunction对象,
- options是连接jdbc的一些参数,比如user、pass、url等。
- lookupOptions是一些有关维表的参数,主要是缓存的大小、超时时间等。
- lookupKeys也就是要去关联查询维表的字段。
JdbcLookupFunction
所以我们来看看JdbcLookupFunction类,这个JdbcLookupFunction是一个TableFunction的子类,具体的TableFunction的使用可以参考这个文章:
一个TableFunction最核心的就是eval方法,在这个方法里,做的主要的工作就是通过传进来的多个keys拼接成sql去来查询数据,首先查询的是缓存,缓存有数据就直接返回,缓存没有的话再去查询数据库,然后再将查询的结果返回并放入缓存,下次查询的时候直接查询缓存。
为什么要加一个缓存呢?默认情况下是不开启缓存的,每来一个查询,都会给维表发送一个请求去查询,如果数据量比较大的话,势必会给存储维表的系统造成一定的压力,所以flink提供了一个LRU缓存,查询维表的时候,先查询缓存,缓存没有再去查询外部系统,但是如果有一个数据查询频率比较高,一直被命中,就无法获取新数据了。所以缓存还要加一个超时时间,过了这个时间,把这个数据强制删除,去外部系统查询新的数据。
具体的怎么开启缓存呢?我们看下JdbcLookupFunction#open方法
@Override
public void open(FunctionContext context) throws Exception {
try {
establishConnectionAndStatement();
this.cache = cacheMaxSize == -1 || cacheExpireMs == -1 ? null : CacheBuilder.newBuilder()
.expireAfterWrite(cacheExpireMs, TimeUnit.MILLISECONDS)
.maximumSize(cacheMaxSize)
.build();
} catch (SQLException sqe) {
throw new IllegalArgumentException("open() failed.", sqe);
} catch (ClassNotFoundException cnfe) {
throw new IllegalArgumentException("JDBC driver class not found.", cnfe);
}
}
也就是说cacheMaxSize和cacheExpireMs需要同时设置,就会构造一个缓存对象cache来缓存数据.这两个参数对应的DDL的属性就是lookup.cache.max-rows和lookup.cache.ttl
对于具体的缓存的大小和超时时间的设置,用户需要根据自身的情况来自己定义,在数据的准确性和系统的吞吐量之间做一个权衡。
- 总结:JDK1.5-JDK1.8各个新特性
- hihoCoder #1094 : Lost in the City(枚举,微软苏州校招笔试 12月27日 )
- HDU 2563 统计问题(递归,思维题)
- python接口自动化9-https请求(SSL)
- Windows环境下php开启GD库的方法
- python接口自动化10-token登录
- HDU 1000 A + B Problem(指针版)
- Java 10 已发布!时隔 6 月带来 109 项新特性
- STL中的nth_element()方法的使用
- C++queue容器学习(详解)
- 牛客面经 |这可能不只是一篇面经
- 图的基本算法(BFS和DFS)
- C++STL中set的使用策略(详解)
- Codeforces Round #409 (rated, Div. 2, based on VK Cup 2017 Round 2)(A.思维题,B.思维题)
- JavaScript 教程
- JavaScript 编辑工具
- JavaScript 与HTML
- JavaScript 与Java
- JavaScript 数据结构
- JavaScript 基本数据类型
- JavaScript 特殊数据类型
- JavaScript 运算符
- JavaScript typeof 运算符
- JavaScript 表达式
- JavaScript 类型转换
- JavaScript 基本语法
- JavaScript 注释
- Javascript 基本处理流程
- Javascript 选择结构
- Javascript if 语句
- Javascript if 语句的嵌套
- Javascript switch 语句
- Javascript 循环结构
- Javascript 循环结构实例
- Javascript 跳转语句
- Javascript 控制语句总结
- Javascript 函数介绍
- Javascript 函数的定义
- Javascript 函数调用
- Javascript 几种特殊的函数
- JavaScript 内置函数简介
- Javascript eval() 函数
- Javascript isFinite() 函数
- Javascript isNaN() 函数
- parseInt() 与 parseFloat()
- escape() 与 unescape()
- Javascript 字符串介绍
- Javascript length属性
- javascript 字符串函数
- Javascript 日期对象简介
- Javascript 日期对象用途
- Date 对象属性和方法
- Javascript 数组是什么
- Javascript 创建数组
- Javascript 数组赋值与取值
- Javascript 数组属性和方法
- 招商银行校招题二
- 小程序工程化系列(一):文件依赖分析
- ucgui在windows上的移植,及为go语言打造简易跨平台GUI的想法
- React setState 是异步执行还是同步执行?
- sm2,sm3,sm4国密算法的纯c语言版本,使用于任何嵌入式平台
- sm2国密算法的纯c语言版本,使用于单片机平台(静态内存分配)
- 面试:mysql 事务和锁的解释
- 【STM32F407开发板用户手册】第35章 STM32F407的FSMC总线应用之驱动AD7606(8通道同步采样, 16bit, 正负10V)
- 玩转easyARM imax283A开发版(二),移植NES模拟器并增加按键驱动,让板子可以玩超级玛丽游戏
- 完了!TCP出了大事!
- redis高并发高可用
- 嵌入式linux之go语言开发(九)关于嵌入式GUI
- docker入门总结,从使用的角度谈起
- 使用 Go 语言开发 Android 应用的正确姿势探索
- Android的配置文件操作的完美封装(使用注解 反射让配置文件操作如此清晰和简单)