FlinkSQL Practice Notes 4 -- How to Join a Dimension Table That Updates in Real Time
Date: 2022-01-24
1. Background
For a dimension table that is updated at irregular intervals, which component should serve as the FlinkSQL source table: HBase, Kafka, or MySQL? Which of these options produces correct results?
We also have to decide whether the fact table should join against historical versions of the dimension table, or only against its latest version.
The tests below aim at joining only the latest version of the dimension table.
2. Practice
2.1 Using a Kafka compacted topic as the dimension table
(1) Change an ordinary Kafka topic into a compacted topic
bin/kafka-topics.sh --alter --topic my_topic_name --zookeeper my_zookeeper:2181 --config cleanup.policy=compact
Note: newer Kafka versions no longer accept --config with kafka-topics.sh --alter; the equivalent there is:
bin/kafka-configs.sh --bootstrap-server my_broker:9092 --alter --entity-type topics --entity-name my_topic_name --add-config cleanup.policy=compact
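Compaction means Kafka eventually retains only the newest record per key, which is exactly the "latest version only" semantics we want for the dimension table. As a rough illustration (plain Java, not Kafka internals; the class and method names here are made up for this sketch):

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Toy model of Kafka log compaction: only the latest value per key survives. */
public class CompactionSketch {

    /** Replays key/value records in order; a later record overwrites an earlier one with the same key. */
    public static Map<String, String> compact(List<String[]> records) {
        Map<String, String> latest = new LinkedHashMap<>();
        for (String[] kv : records) {
            latest.put(kv[0], kv[1]);
        }
        return latest;
    }

    public static void main(String[] args) {
        // Two versions of key "2" are produced; after compaction only the newer one remains.
        System.out.println(compact(List.of(
                new String[]{"2", "name4442"},
                new String[]{"3", "name4443"},
                new String[]{"2", "name5552"})));
    }
}
```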
(2) Kafka producer code
// Build and send the messages (both the key and the value are JSON strings)
DateTimeFormatter dtf = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.nnnnnnnnn");
for (int i = 2; i < 8; i++) {
    JSONObject json1 = new JSONObject();
    json1.put("key", i + "");
    //json.put("update_time", dtf.format(LocalDateTime.now()));
    JSONObject json = new JSONObject();
    json.put("id", i + "");
    json.put("name", "name444" + i);
    ProducerRecord<String, String> record = new ProducerRecord<String, String>(
            "flinksqldim",
            json1.toJSONString(),
            json.toJSONString()
    );
    // Send the message
    producer.send(record);
}
(3) FlinkSQL main code
// Create the execution environment
//EnvironmentSettings settings = EnvironmentSettings.inStreamingMode();
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
EnvironmentSettings settings = EnvironmentSettings.newInstance()
.useBlinkPlanner()
.inStreamingMode()
.build();
TableEnvironment tableEnv = StreamTableEnvironment.create(env, settings);
// Map the Kafka topic to a temporary source table
tableEnv.executeSql(
"CREATE TABLE sensor_source(" +
" id STRING, " +
" name STRING, " +
" o_time TIMESTAMP(3), " +
" WATERMARK FOR o_time AS o_time " +
" ) WITH (" +
" 'connector' = 'kafka'," +
" 'topic' = 'flinksqldemo'," +
" 'properties.bootstrap.servers' = 'ip:port'," +
" 'properties.group.id' = 'flinksqlCount'," +
" 'scan.startup.mode' = 'earliest-offset'," +
" 'format' = 'json')"
);
// Map the Kafka data to the dimension table - a dimension table that changes in real time
tableEnv.executeSql(
"CREATE TABLE dim_source (" +
" id STRING," +
" name STRING," +
" update_time TIMESTAMP(3) METADATA FROM 'timestamp' VIRTUAL, " +
" WATERMARK FOR update_time AS update_time, " +
" PRIMARY KEY (id) NOT ENFORCED" +
") WITH (" +
" 'connector' = 'upsert-kafka'," +
" 'topic' = 'flinksqldim'," +
" 'properties.bootstrap.servers' = 'ip:port'," +
" 'properties.group.id' = 'flinksqlDim'," +
" 'key.format' = 'json'," +
" 'value.format' = 'json')"
);
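The upsert-kafka connector interprets the keyed topic as a changelog: the first record for a key is an insert, a later record with the same key retracts the old row and adds the new one, and a record with a null value deletes the key. A toy Java sketch of those semantics (an illustration only, not the connector's actual implementation; all names here are invented):

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Toy model of how upsert-kafka turns keyed records into a changelog stream. */
public class UpsertChangelog {

    public static List<String> toChangelog(List<String[]> records) {
        Map<String, String> state = new HashMap<>(); // last known value per key
        List<String> ops = new ArrayList<>();
        for (String[] kv : records) {
            String key = kv[0], value = kv[1];
            String old = state.get(key);
            if (value == null) {            // tombstone: delete the key
                if (old != null) {
                    ops.add("-D " + key + "=" + old);
                    state.remove(key);
                }
            } else if (old == null) {       // first value for this key: insert
                ops.add("+I " + key + "=" + value);
                state.put(key, value);
            } else {                        // new value: retract old row, add new row
                ops.add("-U " + key + "=" + old);
                ops.add("+U " + key + "=" + value);
                state.put(key, value);
            }
        }
        return ops;
    }

    public static void main(String[] args) {
        System.out.println(toChangelog(java.util.Arrays.asList(
                new String[]{"2", "name4442"},
                new String[]{"2", "name5552"},   // update: emits -U then +U
                new String[]{"2", null})));      // tombstone: emits -D
    }
}
```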
// Map the MySQL table to a temporary sink table
String mysql_sql = "CREATE TABLE mysql_sink (" +
" name STRING," +
" cnt BIGINT," +
" PRIMARY KEY (name) NOT ENFORCED" +
") WITH (" +
" 'connector' = 'jdbc'," +
" 'url' = 'jdbc:mysql://ip:port/kafka?serverTimezone=UTC'," +
" 'table-name' = 'count_info'," +
" 'username' = 'xxx'," +
" 'password' = 'xxx'" +
")";
tableEnv.executeSql(mysql_sql);
// Insert data
TableResult tableResult = tableEnv.executeSql(
"INSERT INTO mysql_sink " +
"SELECT b.name, count(*) as cnt " +
"FROM sensor_source as a " +
"INNER JOIN dim_source as b " +
"on a.id = b.id " +
"where a.id > 3 " +
"group by b.name "
// "order by name "
);
System.out.println(tableResult.getJobClient().get().getJobStatus());
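Because dim_source is a changelog source, the GROUP BY receives a retraction whenever a key's name changes, so the per-name counts get corrected instead of accumulating stale versions. A toy Java sketch of retraction-aware counting (an illustration under that assumption, not Flink's internal aggregation code):

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Toy model of a GROUP BY count over a changelog input:
 *  +I/+U rows increment the count for a name, -U/-D rows decrement it. */
public class ChangelogCount {

    public static Map<String, Long> count(List<String[]> changelog) {
        Map<String, Long> cnt = new HashMap<>();
        for (String[] row : changelog) {
            String op = row[0], name = row[1];
            long delta = (op.equals("+I") || op.equals("+U")) ? 1 : -1;
            cnt.merge(name, delta, Long::sum);
            if (cnt.get(name) == 0) cnt.remove(name); // drop groups whose count reached zero
        }
        return cnt;
    }

    public static void main(String[] args) {
        System.out.println(count(List.of(
                new String[]{"+I", "name4444"},
                new String[]{"-U", "name4444"},  // old name retracted after an update
                new String[]{"+U", "name5554"})));
    }
}
```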
3. Failed attempt
3.1 Using a regular join
Kafka producer code
for (int i = 1; i < 10; i++) {
    //json.put("update_time", dtf.format(LocalDateTime.now()));
    JSONObject json = new JSONObject();
    json.put("id", i + "");
    json.put("name", "name555" + i);
    ProducerRecord<Integer, String> record = new ProducerRecord<Integer, String>(
            "flinksqldim2",
            i,
            json.toJSONString()
    );
    // Send the message
    Future<RecordMetadata> future = producer.send(record);
}
FlinkSQL processing code
// Create the execution environment
//EnvironmentSettings settings = EnvironmentSettings.inStreamingMode();
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
EnvironmentSettings settings = EnvironmentSettings.newInstance()
.useBlinkPlanner()
.inStreamingMode()
.build();
TableEnvironment tableEnv = StreamTableEnvironment.create(env, settings);
// Map the Kafka topic to a temporary source table
tableEnv.executeSql(
"CREATE TABLE sensor_source(" +
"id STRING, " +
"name STRING, " +
"o_time TIMESTAMP(3), " +
" WATERMARK FOR o_time AS o_time " +
") WITH (" +
" 'connector' = 'kafka'," +
" 'topic' = 'flinksqldemo'," +
" 'properties.bootstrap.servers' = 'ip:port'," +
" 'properties.group.id' = 'flinksqlCount'," +
" 'scan.startup.mode' = 'earliest-offset'," +
" 'format' = 'json')"
);
// Map the Kafka data to the dimension table - changes in real time, but NOT a compacted topic
tableEnv.executeSql(
"CREATE TABLE dim_source ( " +
" id STRING, " +
" name STRING, " +
" update_time TIMESTAMP(3) METADATA FROM 'timestamp' VIRTUAL, " +
" WATERMARK FOR update_time AS update_time " +
") WITH (" +
" 'connector' = 'kafka'," +
" 'topic' = 'flinksqldim2'," +
" 'properties.bootstrap.servers' = 'ip:port'," +
" 'properties.group.id' = 'flinksqlDim'," +
" 'scan.startup.mode' = 'earliest-offset'," +
" 'format' = 'json')"
);
// Map the MySQL table to a temporary sink table
String mysql_sql = "CREATE TABLE mysql_sink (" +
" name STRING," +
" cnt BIGINT," +
" PRIMARY KEY (name) NOT ENFORCED" +
") WITH (" +
" 'connector' = 'jdbc'," +
" 'url' = 'jdbc:mysql://ip:port/kafka?serverTimezone=UTC'," +
" 'table-name' = 'count_info'," +
" 'username' = 'xxx'," +
" 'password' = 'xxx'" +
")";
tableEnv.executeSql(mysql_sql);
// Insert data
TableResult tableResult = tableEnv.executeSql(
"INSERT INTO mysql_sink " +
"SELECT b.name, count(*) as cnt " +
"FROM sensor_source a " +
"JOIN dim_source b " +
"on a.id = b.id " +
"where a.id > 3 " +
"group by b.name "
);
System.out.println(tableResult.getJobClient().get().getJobStatus());
After the dimension stream had been updated a few times, the data in the result table count_info was wrong. The reason: a regular join keeps every dimension record it has ever received in its join state, so a fact row matches all historical versions of a key rather than only the latest one, and the counts are inflated accordingly.
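This failure mode can be reproduced with a toy simulation: a regular join has no notion of "latest version", so every dim record stays in state and a single fact row can match several versions of the same key (plain Java illustration; all names are invented):

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

/** Toy model of a regular stream-stream join: every dim record ever received
 *  stays in join state, so a fact row matches ALL historical versions of its key. */
public class RegularJoinSketch {

    public static List<String> join(List<String[]> facts, List<String[]> dimState) {
        List<String> out = new ArrayList<>();
        for (String[] fact : facts) {
            for (String[] dim : dimState) {   // no "latest version only" filter here
                if (fact[0].equals(dim[0])) {
                    out.add(fact[0] + "->" + dim[1]);
                }
            }
        }
        return out;
    }

    public static void main(String[] args) {
        // One fact row with id=5, but the dim state holds two versions of id=5:
        // the join emits two matches, so a downstream count(*) is inflated.
        System.out.println(join(
                Collections.singletonList(new String[]{"5"}),
                List.of(new String[]{"5", "name5555"}, new String[]{"5", "name6665"})));
    }
}
```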
Original article: https://www.cnblogs.com/route/p/15840551.html