flink实战教程-使用set实时计算当天网站uv
时间:2022-07-25
本文章向大家介绍flink实战教程-使用set实时计算当天网站uv,主要内容包括其使用实例、应用技巧、基本知识点总结和需要注意事项,具有一定的参考价值,需要的朋友可以参考一下。
背景
对于web网站,我们一般会有这样的需求,实时的计算出来当天网站的uv,尽可能快的展示出来。今天我们就讲一下基于java的set集合做一下实时uv的统计。
简易需求:
- 实时计算出当天零点截止到当前时间各个端(android,ios,h5)下的uv
- 每秒钟更新一次统计结果
案例讲解
模拟source
首先我们模拟生成一下最简单的数据,生成一个flink的二元组Tuple2.分别表示分类和用户id
public static class MySource implements SourceFunction<Tuple2<String,Integer>>{
private volatile boolean isRunning = true;
String category[] = {"Android", "IOS", "H5"};
@Override
public void run(SourceContext<Tuple2<String,Integer>> ctx) throws Exception{
while (isRunning){
Thread.sleep(10);
//具体是哪个端的用户
String type = category[(int) (Math.random() * (category.length))];
//随机生成10000以内的int类型数据作为userid
int userid = (int) (Math.random() * 10000);
ctx.collect(Tuple2.of(type, userid));
}
}
@Override
public void cancel(){
isRunning = false;
}
}
定义窗口
接下来我们定义一个周期是一天的滑动窗口,因为我们要每秒钟输出窗口的数据,所以我们紧接着窗口定义了一个1秒的触发器。
DataStream<Tuple2<String,Integer>> dataStream = env.addSource(new MySource());
dataStream.keyBy(0).window(TumblingProcessingTimeWindows.of(Time.days(1), Time.hours(-8)))
.trigger(ContinuousProcessingTimeTrigger.of(Time.seconds(1)))
.aggregate(new MyAggregate(),new WindowResult())
.print();
自定义聚合算子
接下来我们自定义一个聚合算子来实现该功能。
对于聚合算子的理解可以参考这个文章:
https://mp.weixin.qq.com/s/ZCWexNGzhSchRpxipa1x-g
public static class MyAggregate
implements AggregateFunction<Tuple2<String,Integer>,Set<Integer>,Integer>{
@Override
public Set<Integer> createAccumulator(){
return new HashSet<>();
}
@Override
public Set<Integer> add(Tuple2<String,Integer> value, Set<Integer> accumulator){
accumulator.add(value.f1);
return accumulator;
}
@Override
public Integer getResult(Set<Integer> accumulator){
return accumulator.size();
}
@Override
public Set<Integer> merge(Set<Integer> a, Set<Integer> b){
a.addAll(b);
return a;
}
}
处理输出结果
我们这里将结果输出到控制台,实际的生产中我们可以将数据写入redis或者hbase等。
1> Result{, dateTime='2020-06-21 19:23:30'type='IOS', uv=136}
2> Result{, dateTime='2020-06-21 19:23:30'type='Android', uv=150}
1> Result{, dateTime='2020-06-21 19:23:30'type='H5', uv=134}
1> Result{, dateTime='2020-06-21 19:23:31'type='IOS', uv=164}
2> Result{, dateTime='2020-06-21 19:23:31'type='Android', uv=177}
1> Result{, dateTime='2020-06-21 19:23:31'type='H5', uv=167}
2> Result{, dateTime='2020-06-21 19:23:32'type='Android', uv=205}
1> Result{, dateTime='2020-06-21 19:23:32'type='IOS', uv=193}
1> Result{, dateTime='2020-06-21 19:23:32'type='H5', uv=198}
完整代码请参考 https://github.com/zhangjun0x01/bigdata-examples/blob/master/flink/src/main/java/windows/RealTimePvUv_Set.java
- JavaScript 教程
- JavaScript 编辑工具
- JavaScript 与HTML
- JavaScript 与Java
- JavaScript 数据结构
- JavaScript 基本数据类型
- JavaScript 特殊数据类型
- JavaScript 运算符
- JavaScript typeof 运算符
- JavaScript 表达式
- JavaScript 类型转换
- JavaScript 基本语法
- JavaScript 注释
- Javascript 基本处理流程
- Javascript 选择结构
- Javascript if 语句
- Javascript if 语句的嵌套
- Javascript switch 语句
- Javascript 循环结构
- Javascript 循环结构实例
- Javascript 跳转语句
- Javascript 控制语句总结
- Javascript 函数介绍
- Javascript 函数的定义
- Javascript 函数调用
- Javascript 几种特殊的函数
- JavaScript 内置函数简介
- Javascript eval() 函数
- Javascript isFinite() 函数
- Javascript isNaN() 函数
- parseInt() 与 parseFloat()
- escape() 与 unescape()
- Javascript 字符串介绍
- Javascript length属性
- javascript 字符串函数
- Javascript 日期对象简介
- Javascript 日期对象用途
- Date 对象属性和方法
- Javascript 数组是什么
- Javascript 创建数组
- Javascript 数组赋值与取值
- Javascript 数组属性和方法