Flink 自定义触发器实现带超时时间的 CountWindow
时间:2022-07-22
本文章向大家介绍Flink 自定义触发器实现带超时时间的 CountWindow,主要内容包括其使用实例、应用技巧、基本知识点总结和需要注意事项,具有一定的参考价值,需要的朋友可以参考一下。
Flink 的 window 有两个基本款,TimeWindow 和 CountWindow。 TimeWindow 是到时间就触发窗口,CountWindow 是到数量就触发。
如果我需要到时间就触发,并且到时间之前如果已经积累了足够数量的数据;或者在限定时间内没有积累足够数量的数据,我依然希望触发窗口业务,那么就需要自定义触发器。
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.common.state.ReducingState;
import org.apache.flink.api.common.state.ReducingStateDescriptor;
import org.apache.flink.api.common.typeutils.base.LongSerializer;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.windowing.triggers.Trigger;
import org.apache.flink.streaming.api.windowing.triggers.TriggerResult;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* 带超时的计数窗口触发器
*/
public class CountTriggerWithTimeout<T> extends Trigger<T, TimeWindow> {
private static Logger LOG = LoggerFactory.getLogger(CountTriggerWithTimeout.class);
/**
* 窗口最大数据量
*/
private int maxCount;
/**
* event time / process time
*/
private TimeCharacteristic timeType;
/**
* 用于储存窗口当前数据量的状态对象
*/
private ReducingStateDescriptor<Long> countStateDescriptor =
new ReducingStateDescriptor("counter", new Sum(), LongSerializer.INSTANCE);
public CountTriggerWithTimeout(int maxCount, TimeCharacteristic timeType) {
this.maxCount = maxCount;
this.timeType = timeType;
}
private TriggerResult fireAndPurge(TimeWindow window, TriggerContext ctx) throws Exception {
clear(window, ctx);
return TriggerResult.FIRE_AND_PURGE;
}
@Override
public TriggerResult onElement(T element, long timestamp, TimeWindow window, TriggerContext ctx) throws Exception {
ReducingState<Long> countState = ctx.getPartitionedState(countStateDescriptor);
countState.add(1L);
if (countState.get() >= maxCount) {
LOG.info("fire with count: " + countState.get());
return fireAndPurge(window, ctx);
}
if (timestamp >= window.getEnd()) {
LOG.info("fire with tiem: " + timestamp);
return fireAndPurge(window, ctx);
} else {
return TriggerResult.CONTINUE;
}
}
@Override
public TriggerResult onProcessingTime(long time, TimeWindow window, TriggerContext ctx) throws Exception {
if (timeType != TimeCharacteristic.ProcessingTime) {
return TriggerResult.CONTINUE;
}
if (time >= window.getEnd()) {
return TriggerResult.CONTINUE;
} else {
LOG.info("fire with process tiem: " + time);
return fireAndPurge(window, ctx);
}
}
@Override
public TriggerResult onEventTime(long time, TimeWindow window, TriggerContext ctx) throws Exception {
if (timeType != TimeCharacteristic.EventTime) {
return TriggerResult.CONTINUE;
}
if (time >= window.getEnd()) {
return TriggerResult.CONTINUE;
} else {
LOG.info("fire with event tiem: " + time);
return fireAndPurge(window, ctx);
}
}
@Override
public void clear(TimeWindow window, TriggerContext ctx) throws Exception {
ReducingState<Long> countState = ctx.getPartitionedState(countStateDescriptor);
countState.clear();
}
/**
* 计数方法
*/
class Sum implements ReduceFunction<Long> {
@Override
public Long reduce(Long value1, Long value2) throws Exception {
return value1 + value2;
}
}
}
使用示例(超时时间 10 秒,数据量上限 1000):
stream
.timeWindowAll(Time.seconds(10))
.trigger(
new CountTriggerWithTimeout(1000, TimeCharacteristic.ProcessingTime)
)
.process(new XxxxWindowProcessFunction())
.addSink(new XxxSinkFunction())
.name("Xxx");
即可。
- 来一波Linux中查看cpu、磁盘、内存、网络的命令
- PXE+kickstart网络安装CentOS7.4系统及过程中各种报错
- [MSDN]通过避免下列 10 个常见 ASP.NET 缺陷使网站平稳运行
- Flask-配置与调试
- PXE+kickstart网络安装CentOS7.4系统及过程中各种报错
- MSDN官方的ASP.Net异步页面的经典示例代码
- Nginx反向代理、负载均衡功能
- 2018即将面临的12个云安全风险
- Flask快速入门,知识整理
- python的reduce()函数
- 企业级memcached缓存数据库结合php使用与web管理memcached
- python中星号的意义(**字典,*列表或元组)
- Python 实现抽象类的两种方式+邮件提醒+动态导入模块+反射(参考Django中间件源码)
- Python 实现抽象类的两种方式+邮件提醒+动态导入模块+反射(参考Django中间件源码)
- JavaScript 教程
- JavaScript 编辑工具
- JavaScript 与HTML
- JavaScript 与Java
- JavaScript 数据结构
- JavaScript 基本数据类型
- JavaScript 特殊数据类型
- JavaScript 运算符
- JavaScript typeof 运算符
- JavaScript 表达式
- JavaScript 类型转换
- JavaScript 基本语法
- JavaScript 注释
- Javascript 基本处理流程
- Javascript 选择结构
- Javascript if 语句
- Javascript if 语句的嵌套
- Javascript switch 语句
- Javascript 循环结构
- Javascript 循环结构实例
- Javascript 跳转语句
- Javascript 控制语句总结
- Javascript 函数介绍
- Javascript 函数的定义
- Javascript 函数调用
- Javascript 几种特殊的函数
- JavaScript 内置函数简介
- Javascript eval() 函数
- Javascript isFinite() 函数
- Javascript isNaN() 函数
- parseInt() 与 parseFloat()
- escape() 与 unescape()
- Javascript 字符串介绍
- Javascript length属性
- javascript 字符串函数
- Javascript 日期对象简介
- Javascript 日期对象用途
- Date 对象属性和方法
- Javascript 数组是什么
- Javascript 创建数组
- Javascript 数组赋值与取值
- Javascript 数组属性和方法
- dubbo 启动Failed to save registry store file报错
- 大数据的列式存储格式:Parquet
- springBoot 入门(六)—— 整合Spring框架开启自带的任务调度器执行任务(注解方式)
- java字节流入门(缓冲输出流)
- EsotericSoftware Kryo —— 官方(1)
- Java的I/O类库的基本架构一句话介绍
- 树莓派综合项目1:智能温度测量系统实验
- 一句话ListenableFuture简介
- Google Guava Cache 使用
- Linux下安装maven3.6.2遇到的问题
- Unity3D网络通讯(五)--Socket通讯之Udp通讯
- IP不能作为rocketmq 的topic
- Unity3D网络通讯(六)-- UnityWebRequest实现WebService通讯
- Java Maven编译时没问题,运行时报java.lang.NoSuchMethodError
- 树莓派基础实验34:L298N模块驱动直流电机实验