Flink SQL 自定义 Sink
时间:2022-07-28
本文章向大家介绍Flink SQL 自定义 Sink,主要内容包括其使用实例、应用技巧、基本知识点总结和需要注意事项,具有一定的参考价值,需要的朋友可以参考一下。
1.背景
内部要做 Flink SQL 平台。本文以自定义 Redis Sink 为例,说明 Flink SQL 如何自定义 Sink,以及自定义完成后如何使用。本文基于 Flink 1.11。
2.步骤
- implements DynamicTableSinkFactory
- implements DynamicTableSink
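- 创建 Redis Sink
- 在 resources/META-INF/services/org.apache.flink.table.factories.Factory 文件中写入 RedisTableSinkFactory 的全限定类名:Flink 通过 Java SPI 发现自定义 Factory,不注册的话 'connector'='redis' 会因找不到 factory 而报错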
3.自定义 sink 代码
import com.ishansong.bigdata.common.util.redis.RedisUtil;
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.ReadableConfig;
import org.apache.flink.streaming.api.functions.sink.PrintSinkFunction;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.apache.flink.table.connector.ChangelogMode;
import org.apache.flink.table.connector.sink.DynamicTableSink;
import org.apache.flink.table.connector.sink.DynamicTableSink.DataStructureConverter;
import org.apache.flink.table.connector.sink.SinkFunctionProvider;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.factories.DynamicTableSinkFactory;
import org.apache.flink.table.factories.FactoryUtil;
import org.apache.flink.table.types.DataType;
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.types.Row;
import org.apache.flink.types.RowKind;
import redis.clients.jedis.JedisCluster;
import java.util.*;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import static org.apache.flink.configuration.ConfigOptions.key;
/**
* @author shengjk1
* @date 2020/10/16
*/
public class RedisTableSinkFactory implements DynamicTableSinkFactory {
public static final String IDENTIFIER = "redis";
public static final ConfigOption<String> HOST_PORT = key("hostPort")
.stringType()
.noDefaultValue()
.withDescription("redis host and port,");
public static final ConfigOption<String> PASSWORD = key("password")
.stringType()
.noDefaultValue()
.withDescription("redis password");
public static final ConfigOption<Integer> EXPIRE_TIME = key("expireTime")
.intType()
.noDefaultValue()
.withDescription("redis key expire time");
public static final ConfigOption<String> KEY_TYPE = key("keyType")
.stringType()
.noDefaultValue()
.withDescription("redis key type,such as hash,string and so on ");
public static final ConfigOption<String> KEY_TEMPLATE = key("keyTemplate")
.stringType()
.noDefaultValue()
.withDescription("redis key template ");
public static final ConfigOption<String> FIELD_TEMPLATE = key("fieldTemplate")
.stringType()
.noDefaultValue()
.withDescription("redis field template ");
public static final ConfigOption<String> VALUE_NAMES = key("valueNames")
.stringType()
.noDefaultValue()
.withDescription("redis value name ");
@Override
// 当 connector 与 IDENTIFIER 一直才会找到 RedisTableSinkFactory 通过
public String factoryIdentifier() {
return IDENTIFIER;
}
@Override
public Set<ConfigOption<?>> requiredOptions() {
return new HashSet<>();
}
@Override
//我们自己定义的所有选项 (with 后面的 ) 都会在这里获取
public Set<ConfigOption<?>> optionalOptions() {
Set<ConfigOption<?>> options = new HashSet<>();
options.add(HOST_PORT);
options.add(PASSWORD);
options.add(EXPIRE_TIME);
options.add(KEY_TYPE);
options.add(KEY_TEMPLATE);
options.add(FIELD_TEMPLATE);
options.add(VALUE_NAMES);
return options;
}
@Override
public DynamicTableSink createDynamicTableSink(Context context) {
FactoryUtil.TableFactoryHelper helper = FactoryUtil.createTableFactoryHelper(this, context);
helper.validate();
ReadableConfig options = helper.getOptions();
return new RedisSink(
context.getCatalogTable().getSchema().toPhysicalRowDataType(),
options);
}
private static class RedisSink implements DynamicTableSink {
private final DataType type;
private final ReadableConfig options;
private RedisSink(DataType type, ReadableConfig options) {
this.type = type;
this.options = options;
}
@Override
//ChangelogMode
public ChangelogMode getChangelogMode(ChangelogMode requestedMode) {
return requestedMode;
}
@Override
//具体运行的地方,真正开始调用用户自己定义的 streaming sink ,建立 sql 与 streaming 的联系
public SinkRuntimeProvider getSinkRuntimeProvider(Context context) {
DataStructureConverter converter = context.createDataStructureConverter(type);
return SinkFunctionProvider.of(new RowDataPrintFunction(converter, options, type));
}
@Override
// sink 可以不用实现,主要用来 source 的谓词下推
public DynamicTableSink copy() {
return new RedisSink(type, options);
}
@Override
public String asSummaryString() {
return "redis";
}
}
/**
同 flink streaming 自定义 sink ,只不过我们这次处理的是 RowData,不细说
*/
private static class RowDataPrintFunction extends RichSinkFunction<RowData> {
private static final long serialVersionUID = 1L;
private final DataStructureConverter converter;
private final ReadableConfig options;
private final DataType type;
private RowType logicalType;
private HashMap<String, Integer> fields;
private JedisCluster jedisCluster;
private RowDataPrintFunction(
DataStructureConverter converter, ReadableConfig options, DataType type) {
this.converter = converter;
this.options = options;
this.type = type;
}
@Override
public void open(Configuration parameters) throws Exception {
super.open(parameters);
logicalType = (RowType) type.getLogicalType();
fields = new HashMap<>();
List<RowType.RowField> rowFields = logicalType.getFields();
int size = rowFields.size();
for (int i = 0; i < size; i++) {
fields.put(rowFields.get(i).getName(), i);
}
jedisCluster = RedisUtil.getJedisCluster(options.get(HOST_PORT));
}
@Override
/*
2> +I(1,30017323,1101)
2> -U(1,30017323,1101)
2> +U(2,30017323,1101)
2> -U(2,30017323,1101)
2> +U(3,30017323,1101)
2> -U(3,30017323,1101)
2> +U(4,30017323,1101)
3> -U(3,980897,3208)
3> +U(4,980897,3208)
*/
public void invoke(RowData rowData, Context context) {
RowKind rowKind = rowData.getRowKind();
Row data = (Row) converter.toExternal(rowData);
if (rowKind.equals(RowKind.UPDATE_AFTER) || rowKind.equals(RowKind.INSERT)) {
String keyTemplate = options.get(KEY_TEMPLATE);
if (Objects.isNull(keyTemplate) || keyTemplate.trim().length() == 0) {
throw new NullPointerException(" keyTemplate is null or keyTemplate is empty");
}
if (keyTemplate.contains("${")) {
String[] split = keyTemplate.split("\$\{");
keyTemplate = "";
for (String s : split) {
if (s.contains("}")) {
String filedName = s.substring(0, s.length() - 1);
int index = fields.get(filedName);
keyTemplate = keyTemplate + data.getField(index).toString();
} else {
keyTemplate = keyTemplate + s;
}
}
}
String keyType = options.get(KEY_TYPE);
String valueNames = options.get(VALUE_NAMES);
// type=hash must need fieldTemplate
if ("hash".equalsIgnoreCase(keyType)) {
String fieldTemplate = options.get(FIELD_TEMPLATE);
if (fieldTemplate.contains("${")) {
String[] split = fieldTemplate.split("\$\{");
fieldTemplate = "";
for (String s : split) {
if (s.contains("}")) {
String fieldName = s.substring(0, s.length() - 1);
int index = fields.get(fieldName);
fieldTemplate = fieldTemplate + data.getField(index).toString();
} else {
fieldTemplate = fieldTemplate + s;
}
}
}
//fieldName = fieldTemplate-valueName
if (valueNames.contains(",")) {
HashMap<String, String> map = new HashMap<>();
String[] fieldNames = valueNames.split(",");
for (String fieldName : fieldNames) {
String value = data.getField(fields.get(fieldName)).toString();
map.put(fieldTemplate + "_" + fieldName, value);
}
jedisCluster.hset(keyTemplate, map);
} else {
jedisCluster.hset(keyTemplate, fieldTemplate + "_" + valueNames, data.getField(fields.get(valueNames)).toString());
}
} else if ("set".equalsIgnoreCase(keyType)) {
jedisCluster.set(keyTemplate, data.getField(fields.get(valueNames)).toString());
} else if ("sadd".equalsIgnoreCase(keyType)) {
jedisCluster.sadd(keyTemplate, data.getField(fields.get(valueNames)).toString());
} else if ("zadd".equalsIgnoreCase(keyType)) {
jedisCluster.sadd(keyTemplate, data.getField(fields.get(valueNames)).toString());
} else {
throw new IllegalArgumentException(" not find this keyType:" + keyType);
}
if (Objects.nonNull(options.get(EXPIRE_TIME))) {
jedisCluster.expire(keyTemplate, options.get(EXPIRE_TIME));
}
}
}
}
}
4.使用 Redis Sink
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.api.java.typeutils.TupleTypeInfo;
import org.apache.flink.api.scala.typeutils.Types;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.environment.ExecutionCheckpointingOptions;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import java.time.Duration;
/**
* @author shengjk1
* @date 2020/9/25
*/
public class SqlKafka {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
EnvironmentSettings environmentSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env, environmentSettings);
// enable checkpointing
Configuration configuration = tableEnv.getConfig().getConfiguration();
configuration.set(
ExecutionCheckpointingOptions.CHECKPOINTING_MODE, CheckpointingMode.EXACTLY_ONCE);
configuration.set(
ExecutionCheckpointingOptions.CHECKPOINTING_INTERVAL, Duration.ofSeconds(10));
String sql = "CREATE TABLE sourcedata (`id` bigint,`status` int,`city_id` bigint,`courier_id` bigint,info_index int,order_id bigint,tableName String" +
") WITH (" +
"'connector' = 'kafka','topic' = 'xxx'," +
"'properties.bootstrap.servers' = 'xxx','properties.group.id' = 'testGroup'," +
"'format' = 'json','scan.startup.mode' = 'earliest-offset')";
tableEnv.executeSql(sql);
//15017284 distinct
Table bigtable = tableEnv.sqlQuery("select distinct a.id,a.courier_id,a.status,a.city_id,b.info_index from (select id,status,city_id,courier_id from sourcedata where tableName = 'orders' and status=60)a join (select " +
" order_id,max(info_index)info_index from sourcedata where tableName = 'infos' group by order_id )b on a.id=b.order_id");
sql = "CREATE TABLE redis (info_index BIGINT,courier_id BIGINT,city_id BIGINT" +
") WITH (" +
"'connector' = 'redis'," +
"'hostPort'='xxx'," +
"'keyType'='hash'," +
"'keyTemplate'='test2_${city_id}'," +
"'fieldTemplate'='test2_${courier_id}'," +
"'valueNames'='info_index,city_id'," +
"'expireTime'='1000')";
tableEnv.executeSql(sql);
Table resultTable = tableEnv.sqlQuery("select sum(info_index)info_index,courier_id,city_id from " + bigtable + " group by city_id,courier_id");
TupleTypeInfo<Tuple3<Long, Long, Long>> tupleType = new TupleTypeInfo<>(
Types.LONG(),
Types.LONG(),
Types.LONG());
tableEnv.toRetractStream(resultTable, tupleType).print("===== ");
tableEnv.executeSql("INSERT INTO redis SELECT info_index,courier_id,city_id FROM " + resultTable);
env.execute("");
}
}
5.详细解释
-- Example DDL for the redis connector.
-- Flink SQL needs WITH ( ... ) rather than braces, allows no trailing comma after the last
-- option, and uses BIGINT rather than long.
create table test(
`id` bigint,
`url` string,
`day` string,
`pv` bigint,
`uv` bigint
) with (
'connector'='redis',
'hostPort'='xxx',
'password'='',
'expireTime'='100',
'keyType'='hash',
'keyTemplate'='test_${id}',
'fieldTemplate'='${day}',
'valueNames'='pv,uv'
)
redis result: 假设 id=1 day=20201016 pv=20,uv=20
hash
test_1  20201016_pv 20, 20201016_uv 20(hash 的 field 名为 fieldTemplate 渲染结果 + "_" + valueName)
参数解释:
connector 固定写为 'redis',与 RedisTableSinkFactory.IDENTIFIER 一致
hostPort redis 的地址
password redis 的密码
expireTime redis key 过期时间,单位为 s
keyType 写入 redis 的方式,目前有 hash(HSET)、set(字符串 SET)、sadd(集合 SADD)、zadd
keyTemplate redis key 的表达式,如 test_${id} 注意 id 为表的字段名
fieldTemplate redis keyType==hash 时,此选项为必选,表达式规则同 keyTemplate
valueNames 作为 redis value 写入的字段名;仅当 keyType 为 hash 时可以有多个,用英文逗号分隔
6.原理
- 整个流程如图,CatalogTable —> DynamicTableSource and DynamicTableSink 这个过程中,其实是通过 DynamicTableSourceFactory and DynamicTableSinkFactory 起到了一个桥梁的作用
- (Source/Sink)Factory 通过 connector='xxx' 找到,理论上会做三种操作:1. validate options;2. configure encoding/decoding formats(if required);3. create a parameterized instance of the table connector。其中 formats 是通过 format='xxx' 找到的
- DynamicTableSource DynamicTableSink 官网虽说可以看做是有状态的,但是否真的有状态取决于具体实现的 source 和 sink
- 生成 Runtime logic:Runtime logic 由 Flink core connector interfaces(如 InputFormat 或 SourceFunction)实现,例如 kafka 对应的实现是 FlinkKafkaConsumer;这些实现又被抽象为 *Provider,然后开始执行 *Provider
- *Provider 是连接 SQL 与 Streaming 代码级别的桥梁
7.参考
https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/sourceSinks.html
- 一次浴火重生的MySQL优化(EXPLAIN命令详解)
- 简单聊聊不可或缺的Nginx反向代理服务器--实现负载均衡【上篇】
- Java设计模式之适配器设计模式(项目升级案例)
- Java设计模式之模板方法设计模式(银行计息案例)
- 多线程之策略模式
- 文件上传的动作不能太俗,必须页面无刷新上传
- 这次真的忽略了一些ActiveMQ内心的娇艳
- 多线程编程:阻塞、并发队列的使用总结
- 多线程编程:多线程并发制单的开发记录【一】
- 如何使用线程锁来提高多线程并发效率
- 如何在分布式环境中同步solr索引库和缓存信息
- 如何在分布式环境中同步solr索引库和缓存信息
- Info模式下的隐形杀手(SpringMVC同时使用<mvc:resources.../>和FormattingConversionServiceFactoryBean时出现的问题)
- 关于web.xml3.0启动报错
- MySQL 教程
- MySQL 安装
- MySQL 管理与配置
- MySQL PHP 语法
- MySQL 连接
- MySQL 创建数据库
- MySQL 删除数据库
- MySQL 选择数据库
- MySQL 数据类型
- MySQL 创建数据表
- MySQL 删除数据表
- MySQL 插入数据
- MySQL 查询数据
- MySQL where 子句
- MySQL UPDATE 查询
- MySQL DELETE 语句
- MySQL LIKE 子句
- mysql order by
- Mysql Join的使用
- MySQL NULL 值处理
- MySQL 正则表达式
- MySQL 事务
- MySQL ALTER命令
- MySQL 索引
- MySQL 临时表
- MySQL 复制表
- 查看MySQL 元数据
- MySQL 序列 AUTO_INCREMENT
- MySQL 处理重复数据
- MySQL 及 SQL 注入
- MySQL 导出数据
- MySQL 导入数据
- MYSQL 函数大全
- MySQL Group By 实例讲解
- MySQL Max()函数实例讲解
- mysql count函数实例
- MYSQL UNION和UNION ALL实例
- MySQL IN 用法
- MySQL between and 实例讲解
- 【Python】秀儿!两行代码制作你的专属动态二维码
- Windows 技术篇-网卡物理(MAC)地址查看方法
- Go使用工厂方法实例结构体
- Java交互界面实现计算器开发设计【附函数源码】
- Python 技术篇-读取本地文件的字节数
- Golang Flag包的参数格式问题
- Python 技术篇-对音频、图片等文件进行base64编码和解码
- 图片:“给你五十行代码把我变成字符画!” 程序:“太多了,一半都用不完!”
- HTML布局标记和列表标记
- 使用frp搭建内网穿透
- CNS图表复现03—单细胞区分免疫细胞和肿瘤细胞
- C#记事本项目开发,一个可以实现批量操作的记事本!【附源码】
- Python 库引用问题:name 'json' is not defined,原因及解决办法
- Java集合类之Collection接口,集合的“爸爸”接口了解一下?
- Python 微信机器人-下载微信接收到的语音、图片等资源