flink sql实时计算当天pv写入mysql
时间:2022-07-25
本文章向大家介绍flink sql实时计算当天pv写入mysql,主要内容包括其使用实例、应用技巧、基本知识点总结和需要注意事项,具有一定的参考价值,需要的朋友可以参考一下。
今天我们主要来讲一个很简单但是很常见的需求,实时计算出网站当天的pv值,然后将结果实时更新到mysql数据库,以供前端查询显示。
接下来我们看看如何用flink sql来实现这个简单的功能。
首先我们还是使用datagen生成测试数据,随机生成一些用户id
String sourceSql = "CREATE TABLE datagen (n" +
" userid int,n" +
" proctime as PROCTIME()n" +
") WITH (n" +
" 'connector' = 'datagen',n" +
" 'rows-per-second'='100',n" +
" 'fields.userid.kind'='random',n" +
" 'fields.userid.min'='1',n" +
" 'fields.userid.max'='100'n" +
")";
定义mysql的sink,这里mysql是作为了一个upsert的sink,所以必须要一个主键,在mysql建表的时候我们指定了当天的日期作为主键,mysql ddl如下
CREATE TABLE `pv` (
`day_str` varchar(100) NOT NULL,
`pv` bigint(10) DEFAULT NULL,
PRIMARY KEY (`day_str`)
)
Flink中的ddl要和mysql中对的上,也要指定主键。
String mysqlsql = "CREATE TABLE pv (n" +
" day_str STRING,n" +
" pv bigINT,n" +
" PRIMARY KEY (day_str) NOT ENFORCEDn" +
") WITH (n" +
" 'connector' = 'jdbc',n" +
" 'username' = 'root',n" +
" 'password' = 'root',n" +
" 'url' = 'jdbc:mysql://localhost:3306/test',n" +
" 'table-name' = 'pv'n" +
")";
接下来我们写一个简单的查询:
tEnv.executeSql("insert into pv SELECT DATE_FORMAT(proctime, 'yyyy-MM-dd') as day_str, count(*) n" +
"FROM datagen n" +
"GROUP BY DATE_FORMAT(proctime, 'yyyy-MM-dd')");
可能对于以前一直做批处理的同学来说会感到疑惑,对于流式处理来说,group by将会返回一个可撤回流(RetractStream),转化成datastream,将会得到一个Tuple2<Boolean, T>对象,这个对象第一个字段如果是false表示数据要撤回,true表示数据是我们新添加的,第二个字段是实际的数据。在这里,我们将这个实时更新的结果写入到了mysql。这样mysql表,每天就会只有一个数据,系统会不断地更新pv字段。
类似的需求我们还可以使用flink的窗口来实现,定义一个窗口周期是一天的窗口,然后自定义一个触发器,比如每秒钟触发一次,然后将结果输出写入第三方sink,可以参考下 【flink实战-模拟简易双11实时统计大屏】
由于笔者水平有限,也难免有错误,请大家不吝赐教,更多信息,也请关注我的公众号【大数据技术与应用实战】,谢谢。
- MySQL 教程
- MySQL 安装
- MySQL 管理与配置
- MySQL PHP 语法
- MySQL 连接
- MySQL 创建数据库
- MySQL 删除数据库
- MySQL 选择数据库
- MySQL 数据类型
- MySQL 创建数据表
- MySQL 删除数据表
- MySQL 插入数据
- MySQL 查询数据
- MySQL where 子句
- MySQL UPDATE 查询
- MySQL DELETE 语句
- MySQL LIKE 子句
- mysql order by
- Mysql Join的使用
- MySQL NULL 值处理
- MySQL 正则表达式
- MySQL 事务
- MySQL ALTER命令
- MySQL 索引
- MySQL 临时表
- MySQL 复制表
- 查看MySQL 元数据
- MySQL 序列 AUTO_INCREMENT
- MySQL 处理重复数据
- MySQL 及 SQL 注入
- MySQL 导出数据
- MySQL 导入数据
- MYSQL 函数大全
- MySQL Group By 实例讲解
- MySQL Max()函数实例讲解
- mysql count函数实例
- MYSQL UNION和UNION ALL实例
- MySQL IN 用法
- MySQL between and 实例讲解