一文搞懂 Flink Timer

时间:2022-07-24
本文章向大家介绍一文搞懂 Flink Timer,主要内容包括其使用实例、应用技巧、基本知识点总结和需要注意事项,具有一定的参考价值,需要的朋友可以参考一下。

什么是 Timer

顾名思义就是 Flink 内部的定时器,与 key 和 timestamp 相关,相同的 key 和 timestamp 只有一个与之对应的 timer。timer 本质上是通过 ScheduledThreadPoolExecutor.schedule 来实现的

Flink synchronizes invocations of onTimer() and processElement(). Hence, users do not have to worry about concurrent modification of state.

真实的事件样例,告诉我 只要 key 不同是不会并发修改的,如果一直都是完全相同的 key ,比如我的 key 一直都是 1,完全是会并发修改的。key 的重复性越高,并发修改的可能性就越大,除非 rocksdb 自身保证同一个每个 key ( rocksdb key ) 的事务。

Timer 的使用

public class KeyedProcessFunctionImp extends KeyedProcessFunction<String, Tuple2<String, Object>, Tuple2<String, String>> {
		@Override
	public void open(Configuration parameters) throws Exception {
	}
	
	@Override
	public void close() throws Exception {
	}
	
	@Override
	public void processElement(Tuple2<String, Object> stringObjectTuple2, Context context, Collector<Tuple2<String, String>> collector) throws Exception {
		System.out.println("注册一个 timer");
		long currentProcessingTime = context.timerService().currentProcessingTime() / 1000 * 1000 + 60 * 1000;
		context.timerService().registerProcessingTimeTimer(currentProcessingTime);
			
	}
	
	
	@Override
	//TODO timer 与 process 同时发生
	public void onTimer(long timestamp, OnTimerContext ctx, Collector<Tuple2<String, String>> out) throws Exception {
		System.out.println("我是一个 timer");
		}
	}

Timer的存储

Timer 会存储到 key state backend 中,并且会做 checkpoint ,失败会恢复。

Timer的源码分析

context.timerService().registerProcessingTimeTimer(currentProcessingTime); 会直接调用 InternalTimerServiceImpl.registerProcessingTimeTimer 方法

public void registerProcessingTimeTimer(N namespace, long time) {
		InternalTimer<K, N> oldHead = processingTimeTimersQueue.peek();
		if (processingTimeTimersQueue.add(new TimerHeapInternalTimer<>(time, (K) keyContext.getCurrentKey(), namespace))) {
			long nextTriggerTime = oldHead != null ? oldHead.getTimestamp() : Long.MAX_VALUE;
			// check if we need to re-schedule our timer to earlier
			if (time < nextTriggerTime) {
				if (nextTimer != null) {
					nextTimer.cancel(false);
				}
				//registerProcessingTimeTimer 定时调用 onProcessingTimer 调用,
				// 最终调用 triggerTarget.onProcessingTimer,比如 windowOperator.onProcessingTimer
				// ScheduledThreadPoolExecutor.schedule
				nextTimer = processingTimeService.registerTimer(time, this);
			}
		}
	}

processingTimeTimersQueue 可以保证相同的 key 和 time 对应的 timer 只会注册一次。我们以 rocksdb 为例看细节

@Override
	// 按照时间戳的顺序添加的,时间戳越大优先级越低
	public boolean add(@Nonnull E toAdd) {
		
		//会依据条件将 rocksdb 中 store 的 timer 存储到 orderedCache 中
		checkRefillCacheFromStore();

		final byte[] toAddBytes = serializeElement(toAdd);

		// 默认 128
		// orderedCache 通过 treeSet 进行操作的
		final boolean cacheFull = orderedCache.isFull();

		if ((!cacheFull && allElementsInCache) ||
			LEXICOGRAPHIC_BYTE_COMPARATOR.compare(toAddBytes, orderedCache.peekLast()) < 0) {

			if (cacheFull) {
				// we drop the element with lowest priority from the cache
				orderedCache.pollLast();
				// the dropped element is now only in the store
				allElementsInCache = false;
			}

			if (orderedCache.add(toAddBytes)) {
				// write-through sync
				addToRocksDB(toAddBytes);
				if (toAddBytes == orderedCache.peekFirst()) {
					peekCache = null;
					return true;
				}
			}
		} else {
			// we only added to the store
			addToRocksDB(toAddBytes);
			allElementsInCache = false;
		}
		return false;
	}

orderedCache 是通过 treeSet 来实现的,所以 time + key + namespace (非window 是固定不变的) 为 treeMap 的 key 。新来的 timer 除了添加到 orderedCache 外还会添加到 rocksdb。 添加完成之后,就正式开始注册 定时任务了。当定时任务开始执行时,调用

@Override
	// registerProcessingTimeTimer 定时调用 onProcessingTime
	// time 设定的那个 timestamp
	public void onProcessingTime(long time) throws Exception {
		// null out the timer in case the Triggerable calls registerProcessingTimeTimer()
		// inside the callback.
		nextTimer = null;

		InternalTimer<K, N> timer;

		// 小于这个 time 的所有 timer 都会被触发
		while ((timer = processingTimeTimersQueue.peek()) != null && timer.getTimestamp() <= time) {
			processingTimeTimersQueue.poll();
			keyContext.setCurrentKey(timer.getKey());
			// windowOperator onProcessingTime
			// 自己定义的 timer
			triggerTarget.onProcessingTime(timer);
		}

		if (timer != null && nextTimer == null) {
			//再次创建 timer
			nextTimer = processingTimeService.registerTimer(timer.getTimestamp(), this);
		}
	}

至此的话,自己写的 Timer 方法就被执行了。 当然还有一些更细节的东西,比如 timer restore ,timer snapshot ,startTimerService 等读者自己可以依需查看

Timer的其他情况

Window Operator 的 Timer 与此类似