flink实战-使用自定义聚合函数统计网站TP指标
- 背景
- 自定义聚合函数
- 实例讲解
背景
在网站性能测试中,我们经常会选择 TP50、TP95 或者 TP99 等作为性能指标。接下来我们讲讲这些指标的含义、以及在flink中如何实时统计:
- TP50,top percent 50,即 50% 的数据都满足某一条件;
- TP95,top percent 95,即 95% 的数据都满足某一条件;
- TP99,top percent 99,即 99% 的数据都满足某一条件;
我们举一个例子,我们要统计网站一分钟之内的的响应时间的TP90,正常的处理逻辑就是把这一分钟之内所有的网站的响应时间从小到大排序,然后计算出总条数count,然后计算出排名在90%的响应时间是多少(count*0.9),就是我们要的值。
自定义聚合函数
这个需求很明显就是一个使用聚合函数来做的案例,Flink中提供了大量的聚合函数,比如count,max,min等等,但是对于这个需求,却无法满足,所以我们需要自定义一个聚合函数来实现我们的需求。
在前段时间,我们聊了聊flink的聚合算子,具体可参考: flink实战-聊一聊flink中的聚合算子 , 聚合算子是我们在写代码的时候用来实现一个聚合功能,聚合函数其实和聚合算子类似,只不过聚合函数用于在写sql的时候使用。
自定义聚合函数需要继承抽象类org.apache.flink.table.functions.AggregateFunction。并实现下面几个方法。
- createAccumulator():这个方法会在一次聚合操作的开始调用一次,主要用于构造一个Accumulator,用于存储在聚合过程中的临时对象。
- accumulate() 这个方法,每来一条数据会调用一次这个方法,我们就在这个方法里实现我们的聚合函数的具体逻辑。
- getValue() 这个方法是在聚合结束以后,对中间结果做处理,然后将结果返回,最终sql中得到的结果数据就是这个值。
实例讲解
对于TP指标,正常的思路我们可以先创建一个临时变量,里面有一个list,每来一个数据,就放到这个list里面,在getValue方法里,进行排序,取相应的TP值。
但是这种思路会有一个问题,就是如果要聚合的时间范围内,数据过多的话。就会在list存储大量的数据,会造成checkpoint过大,时间过长,最后导致程序失败。得不到正确的结果。
所以我们需要换一个思路,既然最后我们想要的是一个有序列表,那么我们是不是可以把这个list结构优化一下,使用Treemap来存储,map的key就是指标,比如响应时间。value就是对应的指标出现的次数。这样getValue方法里,只需要将map的value值累加,就能得到总数count,然后计算出来相应的tp值的位置position,最后我们再从头累加map的value,直到累加结果大于相应的位置position,则map的key即为所求。
示例如下:我们先构建一个source,只是随机生成一个变量,网站的相应时间response_time。
String sql = "CREATE TABLE source (n" +
" response_time INT,n" +
" ts AS localtimestamp,n" +
" WATERMARK FOR ts AS ts," +
"proctime as proctime()n" +
") WITH (n" +
" 'connector' = 'datagen',n" +
" 'rows-per-second'='1000',n" +
" 'fields.response_time.min'='1',n" +
" 'fields.response_time.max'='1000'" +
")";
定义一个聚合函数用的临时变量:
public static class TPAccum{
public Integer tp;
public Map<Integer,Integer> map = new HashMap<>();
}
实现自定义聚合函数类
public static class TP extends AggregateFunction<Integer,TPAccum>{
@Override
public TPAccum createAccumulator(){
return new TPAccum();
}
@Override
public Integer getValue(TPAccum acc){
if (acc.map.size() == 0){
return null;
} else {
Map<Integer,Integer> map = new TreeMap<>(acc.map);
int sum = map.values().stream().reduce(0, Integer::sum);
int tp = acc.tp;
int responseTime = 0;
int p = 0;
Double d = sum * (tp / 100D);
for (Map.Entry<Integer,Integer> entry: map.entrySet()){
p += entry.getValue();
int position = d.intValue() - 1;
if (p >= position){
responseTime = entry.getKey();
break;
}
}
return responseTime;
}
}
public void accumulate(TPAccum acc, Integer iValue, Integer tp){
acc.tp = tp;
if (acc.map.containsKey(iValue)){
acc.map.put(iValue, acc.map.get(iValue) + 1);
} else {
acc.map.put(iValue, 1);
}
}
}
实际的查询sql如下:
String sqlSelect =
"select TUMBLE_START(proctime,INTERVAL '1' SECOND) as starttime,mytp(response_time,50) from source" +
" group by TUMBLE(proctime,INTERVAL '1' SECOND)";
完整代码请参考: https://github.com/zhangjun0x01/bigdata-examples/blob/master/flink/src/main/java/function/UdafTP.java
- JavaScript 教程
- JavaScript 编辑工具
- JavaScript 与HTML
- JavaScript 与Java
- JavaScript 数据结构
- JavaScript 基本数据类型
- JavaScript 特殊数据类型
- JavaScript 运算符
- JavaScript typeof 运算符
- JavaScript 表达式
- JavaScript 类型转换
- JavaScript 基本语法
- JavaScript 注释
- Javascript 基本处理流程
- Javascript 选择结构
- Javascript if 语句
- Javascript if 语句的嵌套
- Javascript switch 语句
- Javascript 循环结构
- Javascript 循环结构实例
- Javascript 跳转语句
- Javascript 控制语句总结
- Javascript 函数介绍
- Javascript 函数的定义
- Javascript 函数调用
- Javascript 几种特殊的函数
- JavaScript 内置函数简介
- Javascript eval() 函数
- Javascript isFinite() 函数
- Javascript isNaN() 函数
- parseInt() 与 parseFloat()
- escape() 与 unescape()
- Javascript 字符串介绍
- Javascript length属性
- javascript 字符串函数
- Javascript 日期对象简介
- Javascript 日期对象用途
- Date 对象属性和方法
- Javascript 数组是什么
- Javascript 创建数组
- Javascript 数组赋值与取值
- Javascript 数组属性和方法
- PHP动态特性的捕捉与逃逸
- ES6 随性学习之 新增数据类型 Symbol
- 最简单入门深度学习
- 持续集成-Jenkins常用插件安装
- 小白学PyTorch | 15 TF2实现一个简单的服装分类任务
- 小白学PyTorch | 16 TF2读取图片的方法
- 小白学PyTorch | 17 TFrec文件的创建与读取
- 小白学PyTorch | 18 TF2构建自定义模型
- 扩展之Tensorflow2.0 | 19 TF2模型的存储与载入
- 扩展之Tensorflow2.0 | 20 TF2的eager模式与求导
- Django JSONField SQL注入漏洞(CVE-2019-14234)分析与影响
- 扩展之Tensorflow2.0 | 21 Keras的API详解(上)卷积、激活、初始化、正则
- 持续代码质量管理-SonarQube-7.3部署
- 攻击Scrapyd爬虫
- 【webpack】从vue-cli 2x 到 3x 迁移与实践