flink实战-模拟简易双11实时统计大屏
背景
在大数据的实时处理中,实时的大屏展示已经成了一个很重要的展示项,比如最有名的双十一大屏实时销售总价展示。除了这个,还有一些其他场景的应用,比如我们在我们的后台系统实时的展示我们网站当前的pv、uv等等,其实做法都是类似的。
今天我们就做一个最简单的模拟电商统计大屏的小例子,我们抽取一下最简单的需求。
- 实时计算出当天零点截止到当前时间的销售总额
- 计算出各个分类的销售top3
- 每秒钟更新一次统计结果
实例讲解
构造数据
首先我们通过自定义source 模拟订单的生成,生成了一个Tuple2,第一个元素是分类,第二个元素表示这个分类下产生的订单金额,金额我们通过随机生成.
/**
* 模拟生成某一个分类下的订单生成
*/
public static class MySource implements SourceFunction<Tuple2<String,Double>>{
private volatile boolean isRunning = true;
private Random random = new Random();
String category[] = {
"女装", "男装",
"图书", "家电",
"洗护", "美妆",
"运动", "游戏",
"户外", "家具",
"乐器", "办公"
};
@Override
public void run(SourceContext<Tuple2<String,Double>> ctx) throws Exception{
while (isRunning){
Thread.sleep(10);
//某一个分类
String c = category[(int) (Math.random() * (category.length - 1))];
//某一个分类下产生了price的成交订单
double price = random.nextDouble() * 100;
ctx.collect(Tuple2.of(c, price));
}
}
@Override
public void cancel(){
isRunning = false;
}
}
构造统计结果类
public static class CategoryPojo{
// 分类名称
private String category;
// 改分类总销售额
private double totalPrice;
// 截止到当前时间的时间
private String dateTime;
getter and setter ........
}
定义窗口和触发器
DataStream<CategoryPojo> result = dataStream.keyBy(0)
.window(TumblingProcessingTimeWindows.of(Time.days(
1), Time.hours(-8)))
.trigger(ContinuousProcessingTimeTrigger.of(Time.seconds(
1)))
.aggregate(
new PriceAggregate(),
new WindowResult()
);
首先我们定义一个窗口期是一天的滚动窗口,然后设置一个1秒钟的触发器,之后进行聚合计算.
集合计算
private static class PriceAggregate
implements AggregateFunction<Tuple2<String,Double>,Double,Double>{
@Override
public Double createAccumulator(){
return 0D;
}
@Override
public Double add(Tuple2<String,Double> value, Double accumulator){
return accumulator + value.f1;
}
@Override
public Double getResult(Double accumulator){
return accumulator;
}
@Override
public Double merge(Double a, Double b){
return a + b;
}
}
聚合计算也比较简单,其实就是对price的简单sum操作
收集窗口结果数据
private static class WindowResult
implements WindowFunction<Double,CategoryPojo,Tuple,TimeWindow>{
SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
@Override
public void apply(
Tuple key,
TimeWindow window,
Iterable<Double> input,
Collector<CategoryPojo> out) throws Exception{
CategoryPojo categoryPojo = new CategoryPojo();
categoryPojo.setCategory(((Tuple1<String>) key).f0);
BigDecimal bg = new BigDecimal(input.iterator().next());
double p = bg.setScale(2, BigDecimal.ROUND_HALF_UP).doubleValue();
categoryPojo.setTotalPrice(p);
categoryPojo.setDateTime(simpleDateFormat.format(new Date()));
out.collect(categoryPojo);
}
}
我们最聚合的结果进行简单的封装,封装成CategoryPojo类以便后续处理
使用聚合窗口的结果
result.keyBy("dateTime")
.window(TumblingProcessingTimeWindows.of(Time.seconds(
1)))
.process(new WindowResultProcess());
接下来我们要使用上面聚合的结果,所以我们使用上面的window聚合结果流又定义了时间是1秒的滚动窗口.
如何使用窗口的结果,可以参考flink的官网[1]
结果统计
接下来我们做最后的结果统计,在这里,我们会把各个分类的总价加起来,就是全站的总销量金额,然后我们同时使用优先级队列计算出分类销售的Top3,打印出结果,在生产过程中我们可以把这个结果数据发到hbase或者redis等外部存储,以供前端的实时页面展示。
private static class WindowResultProcess
extends ProcessWindowFunction<CategoryPojo,Object,Tuple,TimeWindow>{
@Override
public void process(
Tuple tuple,
Context context,
Iterable<CategoryPojo> elements,
Collector<Object> out) throws Exception{
String date = ((Tuple1<String>) tuple).f0;
Queue<CategoryPojo> queue = new PriorityQueue<>(
3,
(o1, o2)->o1.getTotalPrice() >= o2.getTotalPrice() ? 1 : -1);
double price = 0D;
Iterator<CategoryPojo> iterator = elements.iterator();
int s = 0;
while (iterator.hasNext()){
CategoryPojo categoryPojo = iterator.next();
if (queue.size() < 3){
queue.add(categoryPojo);
} else {
CategoryPojo tmp = queue.peek();
if (categoryPojo.getTotalPrice() > tmp.getTotalPrice()){
queue.poll();
queue.add(categoryPojo);
}
}
price += categoryPojo.getTotalPrice();
}
List<String> list = queue.stream()
.sorted((o1, o2)->o1.getTotalPrice() <=
o2.getTotalPrice() ? 1 : -1)
.map(f->"(分类:" + f.getCategory() + " 销售额:" +
f.getTotalPrice() + ")")
.collect(
Collectors.toList());
System.out.println("时间 : " + date + " 总价 : " + price + " top3 " +
StringUtils.join(list, ","));
System.out.println("-------------");
}
}
示例运行结果
3> CategoryPojo{category='户外', totalPrice=734.45, dateTime=2020-06-13 22:55:34}
2> CategoryPojo{category='游戏', totalPrice=862.86, dateTime=2020-06-13 22:55:34}
4> CategoryPojo{category='洗护', totalPrice=926.83, dateTime=2020-06-13 22:55:34}
3> CategoryPojo{category='运动', totalPrice=744.98, dateTime=2020-06-13 22:55:34}
2> CategoryPojo{category='乐器', totalPrice=648.81, dateTime=2020-06-13 22:55:34}
4> CategoryPojo{category='图书', totalPrice=1010.12, dateTime=2020-06-13 22:55:34}
1> CategoryPojo{category='家具', totalPrice=880.35, dateTime=2020-06-13 22:55:34}
3> CategoryPojo{category='家电', totalPrice=1225.34, dateTime=2020-06-13 22:55:34}
2> CategoryPojo{category='男装', totalPrice=796.06, dateTime=2020-06-13 22:55:34}
1> CategoryPojo{category='女装', totalPrice=1018.88, dateTime=2020-06-13 22:55:34}
1> CategoryPojo{category='美妆', totalPrice=768.37, dateTime=2020-06-13 22:55:34}
时间 : 2020-06-13 22:55:34 总价 : 9617.050000000001 top3 (分类:家电 销售额:1225.34),(分类:女装 销售额:1018.88),(分类:图书 销售额:1010.12)
完整的代码请参考
https://github.com/zhangjun0x01/bigdata-examples/blob/master/flink/src/main/java/windows/BigScreem.java
参考资料 【1】https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/operators/windows.html#working-with-window-results
- JavaScript 教程
- JavaScript 编辑工具
- JavaScript 与HTML
- JavaScript 与Java
- JavaScript 数据结构
- JavaScript 基本数据类型
- JavaScript 特殊数据类型
- JavaScript 运算符
- JavaScript typeof 运算符
- JavaScript 表达式
- JavaScript 类型转换
- JavaScript 基本语法
- JavaScript 注释
- Javascript 基本处理流程
- Javascript 选择结构
- Javascript if 语句
- Javascript if 语句的嵌套
- Javascript switch 语句
- Javascript 循环结构
- Javascript 循环结构实例
- Javascript 跳转语句
- Javascript 控制语句总结
- Javascript 函数介绍
- Javascript 函数的定义
- Javascript 函数调用
- Javascript 几种特殊的函数
- JavaScript 内置函数简介
- Javascript eval() 函数
- Javascript isFinite() 函数
- Javascript isNaN() 函数
- parseInt() 与 parseFloat()
- escape() 与 unescape()
- Javascript 字符串介绍
- Javascript length属性
- javascript 字符串函数
- Javascript 日期对象简介
- Javascript 日期对象用途
- Date 对象属性和方法
- Javascript 数组是什么
- Javascript 创建数组
- Javascript 数组赋值与取值
- Javascript 数组属性和方法
- pytest文档57-单元测试代码覆盖率(pytest-cov)
- pytest文档58-随机执行测试用例(pytest-random-order)
- Kubernetes探针踩坑记
- 大揭秘| 我司项目组Gitlab Flow && DevOps流程
- 离线安装Superset 0.37(截图详细版)
- 如何高速转储、索引和第7层网络流量过滤?
- 爬虫 | JS逆向某验滑动加密(二)
- 闲聊 Kotlin-Native (0) - 我们为什么应该关注一下 Kotlin Native?
- 哈佛大学单细胞课程|笔记汇总 (五)
- 通过源码理解IGMP v1的实现(基于linux1.2.13)
- 微服务下数据一致性的几种实现方式
- 关于mac electron设备权限申请的方法
- 两种实现方式 | 如何查看消费者组的消费情况
- 一致性hash算法(golang)
- 微服务安全吗?