一文搞懂 Flink Timer
什么是 Timer
顾名思义就是 Flink 内部的定时器,与 key 和 timestamp 相关,相同的 key 和 timestamp 只有一个与之对应的 timer。timer 本质上是通过 ScheduledThreadPoolExecutor.schedule 来实现的
Flink synchronizes invocations of onTimer() and processElement(). Hence, users do not have to worry about concurrent modification of state.
真实的事件样例,告诉我 只要 key 不同是不会并发修改的,如果一直都是完全相同的 key ,比如我的 key 一直都是 1,完全是会并发修改的。key 的重复性越高,并发修改的可能性就越大,除非 rocksdb 自身保证同一个每个 key ( rocksdb key ) 的事务。
Timer 的使用
public class KeyedProcessFunctionImp extends KeyedProcessFunction<String, Tuple2<String, Object>, Tuple2<String, String>> {
@Override
public void open(Configuration parameters) throws Exception {
}
@Override
public void close() throws Exception {
}
@Override
public void processElement(Tuple2<String, Object> stringObjectTuple2, Context context, Collector<Tuple2<String, String>> collector) throws Exception {
System.out.println("注册一个 timer");
long currentProcessingTime = context.timerService().currentProcessingTime() / 1000 * 1000 + 60 * 1000;
context.timerService().registerProcessingTimeTimer(currentProcessingTime);
}
@Override
//TODO timer 与 process 同时发生
public void onTimer(long timestamp, OnTimerContext ctx, Collector<Tuple2<String, String>> out) throws Exception {
System.out.println("我是一个 timer");
}
}
Timer的存储
Timer 会存储到 key state backend 中,并且会做 checkpoint ,失败会恢复。
Timer的源码分析
context.timerService().registerProcessingTimeTimer(currentProcessingTime); 会直接调用 InternalTimerServiceImpl.registerProcessingTimeTimer 方法
public void registerProcessingTimeTimer(N namespace, long time) {
InternalTimer<K, N> oldHead = processingTimeTimersQueue.peek();
if (processingTimeTimersQueue.add(new TimerHeapInternalTimer<>(time, (K) keyContext.getCurrentKey(), namespace))) {
long nextTriggerTime = oldHead != null ? oldHead.getTimestamp() : Long.MAX_VALUE;
// check if we need to re-schedule our timer to earlier
if (time < nextTriggerTime) {
if (nextTimer != null) {
nextTimer.cancel(false);
}
//registerProcessingTimeTimer 定时调用 onProcessingTimer 调用,
// 最终调用 triggerTarget.onProcessingTimer,比如 windowOperator.onProcessingTimer
// ScheduledThreadPoolExecutor.schedule
nextTimer = processingTimeService.registerTimer(time, this);
}
}
}
processingTimeTimersQueue 可以保证相同的 key 和 time 对应的 timer 只会注册一次。我们以 rocksdb 为例看细节
@Override
// 按照时间戳的顺序添加的,时间戳越大优先级越低
public boolean add(@Nonnull E toAdd) {
//会依据条件将 rocksdb 中 store 的 timer 存储到 orderedCache 中
checkRefillCacheFromStore();
final byte[] toAddBytes = serializeElement(toAdd);
// 默认 128
// orderedCache 通过 treeSet 进行操作的
final boolean cacheFull = orderedCache.isFull();
if ((!cacheFull && allElementsInCache) ||
LEXICOGRAPHIC_BYTE_COMPARATOR.compare(toAddBytes, orderedCache.peekLast()) < 0) {
if (cacheFull) {
// we drop the element with lowest priority from the cache
orderedCache.pollLast();
// the dropped element is now only in the store
allElementsInCache = false;
}
if (orderedCache.add(toAddBytes)) {
// write-through sync
addToRocksDB(toAddBytes);
if (toAddBytes == orderedCache.peekFirst()) {
peekCache = null;
return true;
}
}
} else {
// we only added to the store
addToRocksDB(toAddBytes);
allElementsInCache = false;
}
return false;
}
orderedCache 是通过 treeSet 来实现的,所以 time + key + namespace (非window 是固定不变的) 为 treeMap 的 key 。新来的 timer 除了添加到 orderedCache 外还会添加到 rocksdb。 添加完成之后,就正式开始注册 定时任务了。当定时任务开始执行时,调用
@Override
// registerProcessingTimeTimer 定时调用 onProcessingTime
// time 设定的那个 timestamp
public void onProcessingTime(long time) throws Exception {
// null out the timer in case the Triggerable calls registerProcessingTimeTimer()
// inside the callback.
nextTimer = null;
InternalTimer<K, N> timer;
// 小于这个 time 的所有 timer 都会被触发
while ((timer = processingTimeTimersQueue.peek()) != null && timer.getTimestamp() <= time) {
processingTimeTimersQueue.poll();
keyContext.setCurrentKey(timer.getKey());
// windowOperator onProcessingTime
// 自己定义的 timer
triggerTarget.onProcessingTime(timer);
}
if (timer != null && nextTimer == null) {
//再次创建 timer
nextTimer = processingTimeService.registerTimer(timer.getTimestamp(), this);
}
}
至此的话,自己写的 Timer 方法就被执行了。 当然还有一些更细节的东西,比如 timer restore ,timer snapshot ,startTimerService 等读者自己可以依需查看
Timer的其他情况
Window Operator 的 Timer 与此类似
- 浅谈MySQL的事务隔离级别
- 国内环境下前端网页开发的几个“中国特色”代码
- 从源码的角度再看 React JS 中的 setState
- Sass 与Compass 在WordPress 主题开发中的运用
- Python爬虫Scrapy入门看这篇就够了
- Clef:为你的WordPress 站点添加两步验证
- JavaScript 基础(六) 数组方法 闭包
- 【译】WordPress 中的50个过滤器(4):第21-30个过滤器
- 【译】WordPress 中的50个过滤器(3):第11-20个过滤器
- 【译】WordPress 中的50个过滤器(2):先介绍10个过滤器
- 【译】WordPress 中的50个过滤器(1):何为过滤器?
- 哪种芯片架构将成为人工智能时代的开路先锋
- 算法系列(三)
- Facebook、Google、Amazon 是如何高效开会的
- JavaScript 教程
- JavaScript 编辑工具
- JavaScript 与HTML
- JavaScript 与Java
- JavaScript 数据结构
- JavaScript 基本数据类型
- JavaScript 特殊数据类型
- JavaScript 运算符
- JavaScript typeof 运算符
- JavaScript 表达式
- JavaScript 类型转换
- JavaScript 基本语法
- JavaScript 注释
- Javascript 基本处理流程
- Javascript 选择结构
- Javascript if 语句
- Javascript if 语句的嵌套
- Javascript switch 语句
- Javascript 循环结构
- Javascript 循环结构实例
- Javascript 跳转语句
- Javascript 控制语句总结
- Javascript 函数介绍
- Javascript 函数的定义
- Javascript 函数调用
- Javascript 几种特殊的函数
- JavaScript 内置函数简介
- Javascript eval() 函数
- Javascript isFinite() 函数
- Javascript isNaN() 函数
- parseInt() 与 parseFloat()
- escape() 与 unescape()
- Javascript 字符串介绍
- Javascript length属性
- javascript 字符串函数
- Javascript 日期对象简介
- Javascript 日期对象用途
- Date 对象属性和方法
- Javascript 数组是什么
- Javascript 创建数组
- Javascript 数组赋值与取值
- Javascript 数组属性和方法
- 「Mysql索引原理(十六)」维护索引和表-更新索引统计信息
- 「Mysql索引原理(十七)」维护索引和表-减少索引和数据的碎片
- 「通信框架Netty4 源码解读(一)」起步,关于IO的简单总结,模拟一个redis客户端
- Unet实现文档图像去噪、去水印
- 「influxDB 原理与实践(一)」安装部署,实现基础的添加删除查询功能
- 「influxDB 原理与实践(二)」详解influxDB的写入与查询
- Nginx系列:https配置
- 笛卡尔积、等值连接、自然连接、外连接一文看懂
- nginx系列:常用利用shell统计日志
- Nginx系列:图片过滤处理
- Nginx系列:几款负载均衡第三方插件的安装与使用
- 「高并发通信框架Netty4 源码解读(三)」NIO缓冲区Buffer详解
- UML类图符号:各种关系说明以及举例
- 「高并发通信框架Netty4 源码解读(四)」NIO缓冲区之字节缓冲区ByteBuffer详解
- 「influxDB 原理与实践(三)」连续查询