基于Flink快速开发实时TopN程序最简单的思路
时间:2022-07-26
本文章向大家介绍基于Flink快速开发实时TopN程序最简单的思路,主要内容包括其使用实例、应用技巧、基本知识点总结和需要注意事项,具有一定的参考价值,需要的朋友可以参考一下。
TopN 是统计报表和大屏非常常见的功能,主要用来实时计算排行榜。流式的TopN可以使业务方在内存中按照某个统计指标(如出现次数)计算排名并快速出发出更新后的排行榜。
我们以统计词频为例展示一下如何快速开发一个计算TopN的flink程序。
Flink支持各种各样的流数据接口作为数据的数据源,本次demo我们采用内置的socketTextStream作为数据数据源。
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime); //以processtime作为时间语义
DataStream<String> text = env.socketTextStream(hostName, port); //监听指定socket端口作为输入
与离线wordcount类似,程序首先需要把输入的整句文字按照分隔符split成一个一个单词,然后按照单词为key实现累加。
DataStream<Tuple2<String, Integer>> ds = text
.flatMap(new LineSplitter()); //将输入语句split成一个一个单词并初始化count值为1的Tuple2<String, Integer>类型
private static final class LineSplitter implements
FlatMapFunction<String, Tuple2<String, Integer>> {
@Override
public void flatMap(String value, Collector<Tuple2<String, Integer>> out) {
// normalize and split the line
String[] tokens = value.toLowerCase().split("\W+");
// emit the pairs
for (String token : tokens) {
if (token.length() > 0) {
out.collect(new Tuple2<String, Integer>(token, 1));
}
}
}
}
DataStream<Tuple2<String, Integer>> wcount = ds
.keyBy(0) //按照Tuple2<String, Integer>的第一个元素为key,也就是单词
.window(SlidingProcessingTimeWindows.of(Time.seconds(600),Time.seconds(20)))
//key之后的元素进入一个总时间长度为600s,每20s向后滑动一次的滑动窗口
.sum(1);// 将相同的key的元素第二个count值相加
全局TopN
数据流经过前面的处理后会每20s计算一次各个单词的count值并发送到下游窗口。
DataStream<Tuple2<String, Integer>> ret = wcount
.windowAll(TumblingProcessingTimeWindows.of(Time.seconds(20)))
//所有key元素进入一个20s长的窗口(选20秒是因为上游窗口每20s计算一轮数据,topN窗口一次计算只统计一个窗口时间内的变化)
.process(new TopNAllFunction(5));//计算该窗口TopN
windowAll是一个全局并发为1的特殊操作,也就是所有元素都会进入到一个窗口内进行计算。
private static class TopNAllFunction
extends
ProcessAllWindowFunction<Tuple2<String, Integer>, Tuple2<String, Integer>, TimeWindow> {
private int topSize = 10;
public TopNAllFunction(int topSize) {
// TODO Auto-generated constructor stub
this.topSize = topSize;
}
@Override
public void process(
ProcessAllWindowFunction<Tuple2<String, Integer>, Tuple2<String, Integer>, TimeWindow>.Context arg0,
Iterable<Tuple2<String, Integer>> input,
Collector<Tuple2<String, Integer>> out) throws Exception {
// TODO Auto-generated method stub
TreeMap<Integer, Tuple2<String, Integer>> treemap = new TreeMap<Integer, Tuple2<String, Integer>>(
new Comparator<Integer>() {
@Override
public int compare(Integer y, Integer x) {
// TODO Auto-generated method stub
return (x < y) ? -1 : 1;
}
}); //treemap按照key降序排列,相同count值不覆盖
for (Tuple2<String, Integer> element : input) {
treemap.put(element.f1, element);
if (treemap.size() > topSize) { //只保留前面TopN个元素
treemap.pollLastEntry();
}
}
for (Entry<Integer, Tuple2<String, Integer>> entry : treemap
.entrySet()) {
out.collect(entry.getValue());
}
}
}
分组TopN
在部分场景下,用户希望根据不同的分组进行排序,计算出每个分组的一个排行榜。
wcount.keyBy(new TupleKeySelectorByStart()) // 按照首字母分组
.window(TumblingProcessingTimeWindows.of(Time.seconds(20))) //20s窗口统计上游数据
.process(new TopNFunction(5)) //分组TopN统计
private static class TupleKeySelectorByStart implements
KeySelector<Tuple2<String, Integer>, String> {
@Override
public String getKey(Tuple2<String, Integer> value) throws Exception {
// TODO Auto-generated method stub
return value.f0.substring(0, 1); //取首字母做key
}
}
/**
*
*针对keyby window的TopN函数,继承自ProcessWindowFunction
*
*/
private static class TopNFunction
extends
ProcessWindowFunction<Tuple2<String, Integer>, Tuple2<String, Integer>, String, TimeWindow> {
private int topSize = 10;
public TopNFunction(int topSize) {
// TODO Auto-generated constructor stub
this.topSize = topSize;
}
@Override
public void process(
String arg0,
ProcessWindowFunction<Tuple2<String, Integer>, Tuple2<String, Integer>, String, TimeWindow>.Context arg1,
Iterable<Tuple2<String, Integer>> input,
Collector<Tuple2<String, Integer>> out) throws Exception {
// TODO Auto-generated method stub
TreeMap<Integer, Tuple2<String, Integer>> treemap = new TreeMap<Integer, Tuple2<String, Integer>>(
new Comparator<Integer>() {
@Override
public int compare(Integer y, Integer x) {
// TODO Auto-generated method stub
return (x < y) ? -1 : 1;
}
});
for (Tuple2<String, Integer> element : input) {
treemap.put(element.f1, element);
if (treemap.size() > topSize) {
treemap.pollLastEntry();
}
}
for (Entry<Integer, Tuple2<String, Integer>> entry : treemap
.entrySet()) {
out.collect(entry.getValue());
}
}
}
上面的代码实现了按照首字母分组,取每组元素count最高的TopN方法。
嵌套TopN
全局topN的缺陷是,由于windowall是一个全局并发为1的操作,所有的数据只能汇集到一个节点进行 TopN 的计算,那么计算能力就会受限于单台机器,容易产生数据热点问题。
解决思路就是使用嵌套 TopN,或者说两层 TopN。在原先的 TopN 前面,再加一层 TopN,用于分散热点。例如可以先加一层分组 TopN,第一层会计算出每一组的 TopN,而后在第二层中进行合并汇总,得到最终的全网TopN。第二层虽然仍是单点,但是大量的计算量由第一层分担了,而第一层是可以水平扩展的。
- 注册中心 Eureka 源码解析 —— Eureka-Client 初始化(二)之 EurekaClientConfig
- Golang 中"泛型"的支持
- 编码习惯之Controller规范
- Go-简洁的并发
- 多线程基础之Runnable/Thread与Callable
- 使用Ldoc给Lua生成文档
- MySQL 读写分离
- Mac必备软件集之Brew
- spring系列之自定义扩展PropertyPlaceHolderConfigurer
- 并发编程之ReentrantLock
- 【学术】一篇关于机器学习中的稀疏矩阵的介绍
- 浅谈java中extends与implements的区别
- 并发编程之读写锁
- 类数组借用数组方法
- JavaScript 教程
- JavaScript 编辑工具
- JavaScript 与HTML
- JavaScript 与Java
- JavaScript 数据结构
- JavaScript 基本数据类型
- JavaScript 特殊数据类型
- JavaScript 运算符
- JavaScript typeof 运算符
- JavaScript 表达式
- JavaScript 类型转换
- JavaScript 基本语法
- JavaScript 注释
- Javascript 基本处理流程
- Javascript 选择结构
- Javascript if 语句
- Javascript if 语句的嵌套
- Javascript switch 语句
- Javascript 循环结构
- Javascript 循环结构实例
- Javascript 跳转语句
- Javascript 控制语句总结
- Javascript 函数介绍
- Javascript 函数的定义
- Javascript 函数调用
- Javascript 几种特殊的函数
- JavaScript 内置函数简介
- Javascript eval() 函数
- Javascript isFinite() 函数
- Javascript isNaN() 函数
- parseInt() 与 parseFloat()
- escape() 与 unescape()
- Javascript 字符串介绍
- Javascript length属性
- javascript 字符串函数
- Javascript 日期对象简介
- Javascript 日期对象用途
- Date 对象属性和方法
- Javascript 数组是什么
- Javascript 创建数组
- Javascript 数组赋值与取值
- Javascript 数组属性和方法
- SAP Spartacus 读取payment detail数据的API
- SRCMS 多处越权+权限提升管理员漏洞
- SAP Spartacus把指定产品添加到购物车的API
- MyBatis源码解析之基础模块—Plugin
- php框架slim架构上存在XXE漏洞(XXE的典型存在形式)
- 个人博客搭建
- 安全箱子的秘密
- Linux 机器 CPU 毛刺问题排查
- phpwind 利用哈希长度扩展攻击进行getshell
- dotnet 构建 SourceRoot items must include at least one top-level item when DeterministicSourcePaths is
- Magicodes.IE 2.4版本发布
- EC2磁盘扩容-DiskPressure
- 谈一谈php://filter的妙用
- 新型php漏洞挖掘之debug导致的安全漏洞(Edusoho)
- CVE-2016-3714 - ImageMagick 命令执行分析