Flink中Watermark定时生成源码分析
时间:2021-07-26
本文章向大家介绍Flink中Watermark定时生成源码分析,主要包括Flink中Watermark定时生成源码分析使用实例、应用技巧、基本知识点总结和需要注意事项,具有一定的参考价值,需要的朋友可以参考一下。
watermark的生成策略有两种:一种是周期性生成,另外一种是根据特定标记生成。在实际使用中大多数情况下会选择周期性生成方式也就是AssignerWithPeriodicWatermarks方式,使用方式如下:
//指定为evenTime时间语义 env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) //生成watermark的周期 env.getConfig.setAutoWatermarkInterval(watermarkInterval) //指定方式 dataStream.assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor[Element](Time.seconds(allowDealy)) { override def extractTimestamp(element: Element): Long = element.dT })
assignTimestampsAndWatermarks 可以理解为是一个算子转换操作,等同于map/window一样理解,可以为其设置并行度、名称,也是一个transformation/operator,
public SingleOutputStreamOperator<T> assignTimestampsAndWatermarks( AssignerWithPeriodicWatermarks<T> timestampAndWatermarkAssigner) { final int inputParallelism = getTransformation().getParallelism(); final AssignerWithPeriodicWatermarks<T> cleanedAssigner = clean(timestampAndWatermarkAssigner); TimestampsAndPeriodicWatermarksOperator<T> operator = new TimestampsAndPeriodicWatermarksOperator<>(cleanedAssigner); return transform("Timestamps/Watermarks", getTransformation().getOutputType(), operator) .setParallelism(inputParallelism); }
在生成的jobGraph中,也是作为其中的一部分:
默认的名称就是 Timestamps/Watermarks。
接下来深入分析其使用的StreamOperator类型TimestampsAndPeriodicWatermarksOperator,其继承了AbstractUdfStreamOperator,实现了OneInputStreamOperator接口与ProcessingTimeCallback接口,具体包含的方法:
open方法:
public void open() throws Exception { super.open(); //初始化默认当前watermark currentWatermark = Long.MIN_VALUE; //生成watermark周期时间配置 watermarkInterval = getExecutionConfig().getAutoWatermarkInterval(); //注册定时其配置 if (watermarkInterval > 0) { long now = getProcessingTimeService().getCurrentProcessingTime(); getProcessingTimeService().registerTimer(now + watermarkInterval, this); } }
最重要的就是getProcessingTimeService().registerTimer 注册一个watermarkInterval后触发的定时器,传入回调参数是this,也就是会调用当前对象的onProcessingTime方法(关于这部分知识可以查看Flink的定时系列)。
processElement方法:
public void processElement(StreamRecord<T> element) throws Exception { final long newTimestamp = userFunction.extractTimestamp(element.getValue(), element.hasTimestamp() ? element.getTimestamp() : Long.MIN_VALUE); output.collect(element.replace(element.getValue(), newTimestamp)); }
提取当前的事件时间,在BoundedOutOfOrdernessTimestampExtractor中会保存当前最大的事件时间。
onProcessingTime方法:
public void onProcessingTime(long timestamp) throws Exception { // register next timer Watermark newWatermark = userFunction.getCurrentWatermark(); //当新的watermark大于当前的watermark if (newWatermark != null && newWatermark.getTimestamp() > currentWatermark) { currentWatermark = newWatermark.getTimestamp(); //将符合要求的watermark发送出去 output.emitWatermark(newWatermark); } //注册下一次触发时间 long now = getProcessingTimeService().getCurrentProcessingTime(); getProcessingTimeService().registerTimer(now + watermarkInterval, this); }
该方法表示的就是定时回调的方法,将符合要求的watermark发送出去并且注册下一个定时器。另外该方法与processElement方法是两个互斥的方法,内部使用了同一把锁做同步控制。
processWatermark方法:
public void processWatermark(Watermark mark) throws Exception { if (mark.getTimestamp() == Long.MAX_VALUE && currentWatermark != Long.MAX_VALUE) { currentWatermark = Long.MAX_VALUE; output.emitWatermark(mark); } }
用来处理上游发送过来的watermark,可以认为不做任何处理,下游的watermark只与其上游最近的生成方式相关。
原文地址:https://www.cnblogs.com/gentlescholar/p/15061000.html
- hql中setDate和setTimeStamp的区别
- Spring源码解析——如何阅读源码
- java获取当前时间和前一天日期
- 国内首个二代基因测序 FPGA 加速方案-背后的技术
- Java基础04 封装与接口
- Maven那点事儿(Eclipse版)
- 【Spring开发】—— Spring注入静态变量
- java判断list为空
- Java基础01 从HelloWorld到面向对象
- Java基础02 方法与数据成员
- 【Spring实战】—— 13 AspectJ注解切面
- EasyUI日期选择框
- Java基础03 构造器与方法重载
- web.xml is missing and <failOnMissingWebXml> is set to true
- JavaScript 教程
- JavaScript 编辑工具
- JavaScript 与HTML
- JavaScript 与Java
- JavaScript 数据结构
- JavaScript 基本数据类型
- JavaScript 特殊数据类型
- JavaScript 运算符
- JavaScript typeof 运算符
- JavaScript 表达式
- JavaScript 类型转换
- JavaScript 基本语法
- JavaScript 注释
- Javascript 基本处理流程
- Javascript 选择结构
- Javascript if 语句
- Javascript if 语句的嵌套
- Javascript switch 语句
- Javascript 循环结构
- Javascript 循环结构实例
- Javascript 跳转语句
- Javascript 控制语句总结
- Javascript 函数介绍
- Javascript 函数的定义
- Javascript 函数调用
- Javascript 几种特殊的函数
- JavaScript 内置函数简介
- Javascript eval() 函数
- Javascript isFinite() 函数
- Javascript isNaN() 函数
- parseInt() 与 parseFloat()
- escape() 与 unescape()
- Javascript 字符串介绍
- Javascript length属性
- javascript 字符串函数
- Javascript 日期对象简介
- Javascript 日期对象用途
- Date 对象属性和方法
- Javascript 数组是什么
- Javascript 创建数组
- Javascript 数组赋值与取值
- Javascript 数组属性和方法
- 使用IDEA写Python之pytest环境搭建及第一个程序编写
- PAT (Basic Level) Practice (中文)1026 程序运行时间
- PAT (Basic Level) Practice (中文)1091 N-自守数
- PAT (Basic Level) Practice (中文)1007 素数对猜想
- PAT (Basic Level) Practice (中文)1019 数字黑洞
- PAT (Basic Level) Practice (中文)1022 D进制的A+B
- 记一次线上商城系统高并发的优化
- 15个必须知道的JavaScript数组方法
- RedLock究竟是不是Redis分布式锁分布式环境下的银弹?
- PAT (Basic Level) Practice (中文)1023 组个最小数
- PAT (Basic Level) Practice (中文)1041 考试座位号
- PAT (Basic Level) Practice (中文)1042 字符统计
- 搭建 Apache Jmeter 分布式压测与监控,真那么难搞定?|实战干货
- PAT (Basic Level) Practice (中文)1056 组合数的和
- PAT (Basic Level) Practice (中文)1057 数零壹