Developing Custom Functions in Presto

Date: 2020-03-24

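1 Introduction to Presto

Presto is a distributed query engine open-sourced by Facebook, and it plays an important role in interactive query workloads. As more and more people use SQL on Presto to analyze data, we found that some business logic needs to be packaged as functions similar to Hive UDFs. This makes SQL users more productive and keeps the UDFs available in the Hive and Presto environments consistent.

1.1 Presto Function Types

Presto functions fall into three broad categories: scalar, aggregation, and window.

1) Scalar functions. Put simply, a scalar function is a static Java method that holds no state of its own.

2) Aggregation functions, which accumulate state, such as count and avg. On a single node the state could simply be kept in a variable. Presto, however, is a distributed engine and state is transferred between nodes, so it must be serialized into Presto's internal format before it can be sent.

3) Window functions, similar to the window functions in Spark SQL.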

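2 Implementing Custom Functions

Official documentation: https://prestodb.github.io/docs/current/develop/functions.html

2.1 Implementing a Custom Scalar Function

2.1.1 Define a Java Class

1) Mark the static method that implements the business logic with the @ScalarFunction annotation.

2) Describe what the function does with @Description. This text is shown by SHOW FUNCTIONS.

3) Mark the function's return type with @SqlType. The example returns a string, so the type is StandardTypes.VARCHAR.

4) The Java method's return value must use Presto's internal representation, so a string must be returned as a Slice. Slices.utf8Slice conveniently converts a String to a Slice.

Example code: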

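// Package names follow the com.facebook.presto.spi layout; they may differ in other Presto versions.
import com.facebook.presto.spi.function.Description;
import com.facebook.presto.spi.function.ScalarFunction;
import com.facebook.presto.spi.function.SqlType;
import com.facebook.presto.spi.type.StandardTypes;
import io.airlift.slice.Slice;
import io.airlift.slice.Slices;
import java.text.DateFormat;
import java.text.SimpleDateFormat;
import java.util.Date;

public class LiulishuoFunctions {

    public static final String DATE_FORMAT = "yyyy-MM-dd";

    @ScalarFunction
    @Description("hive to_date function")
    @SqlType(StandardTypes.VARCHAR)
    public static Slice to_date(@SqlType(StandardTypes.TIMESTAMP) long input) {
        final DateFormat format = new SimpleDateFormat(DATE_FORMAT);
        return Slices.utf8Slice(format.format(new Date(input)));
    }
}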
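
SimpleDateFormat formats in the JVM's default time zone, so the same timestamp can yield different dates on servers configured differently. As a minimal sketch that is independent of Presto, the formatting step could use java.time with an explicit zone instead. The class name DateFormatSketch, the method formatDate, the choice of UTC, and the assumption that the input is epoch milliseconds are all made for illustration; none of them come from the original code.

```java
import java.time.Instant;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;

// Hypothetical helper for illustration; not part of the Presto API.
final class DateFormatSketch {
    // DateTimeFormatter is immutable and thread-safe, so one shared instance is enough.
    private static final DateTimeFormatter FORMAT =
            DateTimeFormatter.ofPattern("yyyy-MM-dd").withZone(ZoneOffset.UTC);

    // Formats epoch milliseconds as a yyyy-MM-dd date in UTC (assumed zone).
    static String formatDate(long epochMillis) {
        return FORMAT.format(Instant.ofEpochMilli(epochMillis));
    }

    public static void main(String[] args) {
        System.out.println(formatDate(0L));             // 1970-01-01
        System.out.println(formatDate(1585008000000L)); // 2020-03-24
    }
}
```

Inside a scalar function the resulting String would still have to be wrapped with Slices.utf8Slice, as in the original example.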

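2.1.2 The Presto Plugin Mechanism

Unlike Hive, Presto does not let you configure custom UDFs directly; they have to be delivered through its plugin mechanism. The plugin mechanism is at the core of Presto's ability to integrate many data sources: by implementing different plugins, Presto lets users run JOINs and other computations across different kinds of data sources. All data sources inside Presto, such as MySQL, Hive, and HBase, are implemented as plugins. Besides loading Connectors to access data sources, the plugin mechanism also loads UDFs through a FunctionFactory. A Presto Plugin follows the Java ServiceLoader convention, so implementing one is straightforward.

Implement the Plugin interface, for example: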

import com.facebook.presto.spi.Plugin;
import com.google.common.collect.ImmutableSet;

import java.util.Set;

public class PrestoFunctionsPlugin implements Plugin {
    @Override
    public Set<Class<?>> getFunctions() {
        // Every class that defines custom functions is registered here.
        return ImmutableSet.<Class<?>>builder()
                .add(PvFlowStatsAggregation.class)
                .add(AvgAggregationDemo.class)
                .build();
    }
}

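Finally, package the code as a jar and upload it to Presto's plugin directory. Presto must be restarted before the custom functions in the jar are loaded.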

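2.2 Custom Aggregation Functions

2.2.1 How It Works

Presto breaks the execution of an aggregation function into three steps:

1. input(state, data): the input function is run for every row. This happens in parallel on every node that holds data, and produces multiple accumulated partial states.

2. combine(state1, state2): merges the states from all nodes. It is run repeatedly until all states have been merged into one final state, which holds the result of the aggregation function.

3. output(final_state, out): writes the final result to a BlockBuilder.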
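
The three steps can be sketched in plain Java without any Presto classes. The example below computes an average over rows that live on two simulated nodes; the class AvgStepsSketch, its State class, and the accumulate helper are hypothetical names introduced only for this illustration, and a returned null stands in for SQL NULL.

```java
import java.util.Arrays;
import java.util.List;

// Plain-Java illustration of input / combine / output; no Presto classes are used.
final class AvgStepsSketch {
    // Accumulated state: running sum and row count.
    static final class State {
        double sum;
        long count;
    }

    // input(state, data): called once per row on the node that holds the row.
    static void input(State state, double value) {
        state.sum += value;
        state.count++;
    }

    // combine(state1, state2): folds another partial state into this one.
    static void combine(State state, State other) {
        state.sum += other.sum;
        state.count += other.count;
    }

    // output(final_state): produces the result; null stands in for SQL NULL.
    static Double output(State state) {
        if (state.count == 0) {
            return null;
        }
        return state.sum / state.count;
    }

    // Simulates one node accumulating its local rows.
    static State accumulate(List<Double> rows) {
        State state = new State();
        for (double row : rows) {
            input(state, row);
        }
        return state;
    }

    public static void main(String[] args) {
        State node1 = accumulate(Arrays.asList(1.0, 2.0, 3.0));
        State node2 = accumulate(Arrays.asList(4.0, 5.0));
        combine(node1, node2);
        System.out.println(output(node1)); // 3.0
    }
}
```

Note that the state keeps the sum and the count rather than a running average: two partial averages cannot be merged correctly without knowing how many rows each one covers.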

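2.2.2 Implementation Steps

1. Define a Java class and mark it as an aggregation function with @AggregationFunction.

2. Mark the computation, merge, and final output functions with @InputFunction, @CombineFunction, and @OutputFunction respectively, and register the aggregation class in the Plugin.

3. Define a State interface that extends AccumulatorState and declares get and set methods.

4. Use @AccumulatorStateMetadata to supply the serializer class (stateSerializerClass) and the factory class (stateFactoryClass). Both classes have to be written by hand.

Core code example: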

@AggregationFunction("pv_stats")
public class PvFlowStatsAggregation {
    private PvFlowStatsAggregation() {}

    @AccumulatorStateMetadata(stateSerializerClass = PvFlowStatsStateSerializer.class,
            stateFactoryClass = PvFlowStatsFactory.class)
    public interface State extends AccumulatorState {
        PvFlowStats get();

        void set(PvFlowStats value);
    }

    // Overload without an end URL
    @InputFunction
    public static void input(@AggregationState State state,
                             @SqlType(StandardTypes.BIGINT) long id,
                             @SqlType(StandardTypes.VARBINARY) Slice serialisedTree,
                             @SqlType(StandardTypes.VARCHAR) Slice pvOrder) {
        handleDataInput(state, id, serialisedTree, pvOrder, null);
    }

    // Overload with an end URL
    @InputFunction
    public static void input(@AggregationState State state,
                             @SqlType(StandardTypes.BIGINT) long id,
                             @SqlType(StandardTypes.VARBINARY) Slice serialisedTree,
                             @SqlType(StandardTypes.VARCHAR) Slice pvOrder,
                             @SqlType(StandardTypes.VARCHAR) Slice endUrl) {
        handleDataInput(state, id, serialisedTree, pvOrder, endUrl);
    }

    private static void handleDataInput(State state, long id, Slice serialisedTree, Slice pvOrder, Slice endUrl) {
        PvFlowStats stats = state.get();
        if (stats == null) {
            stats = new PvFlowStats();
            state.set(stats);
        }
        // ...... (the rest of the input handling is elided in the original)
    }

    @CombineFunction
    public static void combine(@AggregationState State state, State other) {
        PvFlowStats input = other.get();
        PvFlowStats previous = state.get();
        if (previous == null) {
            state.set(input);
        } else {
            previous.mergeWith(input);
        }
    }

    @OutputFunction(StandardTypes.VARCHAR)
    public static void output(@AggregationState State state, BlockBuilder out) {
        PvFlowStats stats = state.get();
        if (stats == null) {
            out.appendNull();
            return;
        }
        // Compute the statistics
        Slice result = stats.statisticNextPage();
        if (result == null) {
            out.appendNull();
        } else {
            VarcharType.VARCHAR.writeSlice(out, result);
        }
    }
}

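2.2.3 Complex Data Types (list, map, or custom classes)

For complex types you need to write your own serializer and factory classes, and implement serialization and deserialization of the state class yourself.

An example follows.

Main class: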

/**
 * Input:
 *   id | value
 *  ----+-------
 *    2 | ddd
 *    2 | ddd
 *    1 | bbb
 *    1 | bbb
 *    1 | ccc
 *    1 | aaa
 *    1 | bbb
 *    2 | aaa
 *    2 | ccc
 *    1 | ccc
 *
 * Returns:
 *   [{id:1,{aaa:1,ccc:2,bbb:3},{id:2,{aaa:1,ccc:1,ddd:2}]
 */
@AggregationFunction("presto_collect")
public class CollectListAggregation {

    @AccumulatorStateMetadata(stateSerializerClass = CollectListStatsSerializer.class,
            stateFactoryClass = CollectListStatsFactory.class)
    public interface CollectState extends AccumulatorState {
        CollectListStats get();

        void set(CollectListStats value);
    }

    @InputFunction
    public static void input(@AggregationState CollectState state,
                             @SqlType(StandardTypes.VARCHAR) Slice id,
                             @SqlType(StandardTypes.VARCHAR) Slice key) {
        try {
            CollectListStats stats = state.get();
            if (stats == null) {
                stats = new CollectListStats();
                state.set(stats);
            }
            int inputId = Integer.parseInt(id.toStringUtf8());
            String inputKey = key.toStringUtf8();
            stats.addCollectList(inputId, inputKey, 1);
        } catch (Exception e) {
            // Pass the exception as the cause so the stack trace is preserved
            throw new RuntimeException("input err", e);
        }
    }

    @CombineFunction
    public static void combine(@AggregationState CollectState state, CollectState otherState) {
        try {
            CollectListStats collectListStats = state.get();
            CollectListStats oCollectListStats = otherState.get();
            if (collectListStats == null) {
                state.set(oCollectListStats);
            } else {
                collectListStats.mergeWith(oCollectListStats);
            }
        } catch (Exception e) {
            throw new RuntimeException("combine err", e);
        }
    }

    @OutputFunction(StandardTypes.VARCHAR)
    public static void output(@AggregationState CollectState state, BlockBuilder out) {
        try {
            CollectListStats stats = state.get();
            if (stats == null) {
                out.appendNull();
                return;
            }
            // Build the result
            Slice result = stats.getCollectResult();
            if (result == null) {
                out.appendNull();
            } else {
                VarcharType.VARCHAR.writeSlice(out, result);
            }
        } catch (Exception e) {
            throw new RuntimeException("output err", e);
        }
    }
}

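The main class is fairly simple: it only needs input, combine, and output.

The data-holding class: this class has to implement serialization and deserialization of the data, which is the most important and most tedious part. An example is shown here. The key points are that you must manage the storage space and the order of the data yourself, and read the fields back in the same order they were written. For a string, write its length first and then its bytes; when reading, read the length first, then read that many bytes, and finally convert them to a string.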

import io.airlift.slice.SizeOf;
import io.airlift.slice.Slice;
import io.airlift.slice.SliceInput;
import io.airlift.slice.SliceOutput;
import io.airlift.slice.Slices;
import org.openjdk.jol.info.ClassLayout;

import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;

public class CollectListStats {
    private static final int INSTANCE_SIZE = ClassLayout.parseClass(CollectListStats.class).instanceSize();
    // <id, <key, value>>
    private Map<Integer, Map<String, Integer>> collectContainer = new HashMap<>();
    private long contentEstimatedSize = 0;
    // Total number of UTF-8 bytes over all stored keys
    private int keyByteLen = 0;
    // Total number of (id, key) entries
    private int keyListLen = 0;

    CollectListStats() {
    }

    CollectListStats(Slice serialized) {
        deserialize(serialized);
    }

    void addCollectList(Integer id, String key, int value) {
        Map<String, Integer> tmpMap = collectContainer.get(id);
        if (tmpMap == null) {
            tmpMap = new HashMap<>();
            collectContainer.put(id, tmpMap);
            contentEstimatedSize += SizeOf.SIZE_OF_INT;
        }
        Integer previous = tmpMap.get(key);
        if (previous != null) {
            tmpMap.put(key, previous + value);
        } else {
            tmpMap.put(key, value);
            // Use an explicit charset so every node encodes keys the same way
            int keyBytes = key.getBytes(StandardCharsets.UTF_8).length;
            contentEstimatedSize += keyBytes + SizeOf.SIZE_OF_INT;
            keyByteLen += keyBytes;
            keyListLen++;
        }
    }

    // Output format: [{id:1,{aaa:3,fadf:6},{id:2,{...}]
    Slice getCollectResult() {
        try {
            StringBuilder jsonStr = new StringBuilder();
            jsonStr.append("[");
            int collectLength = collectContainer.entrySet().size();
            for (Map.Entry<Integer, Map<String, Integer>> mapEntry : collectContainer.entrySet()) {
                Integer id = mapEntry.getKey();
                Map<String, Integer> vMap = mapEntry.getValue();
                jsonStr.append("{id:").append(id).append(",{");
                int vLength = vMap.entrySet().size();
                for (Map.Entry<String, Integer> vEntry : vMap.entrySet()) {
                    jsonStr.append(vEntry.getKey()).append(":").append(vEntry.getValue());
                    vLength--;
                    if (vLength != 0) {
                        jsonStr.append(",");
                    }
                }
                jsonStr.append("}");
                collectLength--;
                if (collectLength != 0) {
                    jsonStr.append(",");
                }
            }
            jsonStr.append("]");
            return Slices.utf8Slice(jsonStr.toString());
        } catch (Exception e) {
            throw new RuntimeException("get CollectResult err", e);
        }
    }

    public void deserialize(Slice serialized) {
        try {
            SliceInput input = serialized.getInput();
            // Number of entries in the outer map
            int collectStatsEntrySize = input.readInt();
            for (int collectCnt = 0; collectCnt < collectStatsEntrySize; collectCnt++) {
                int id = input.readInt();
                int keyEntrySize = input.readInt();
                for (int idCnt = 0; idCnt < keyEntrySize; idCnt++) {
                    // Length first, then that many bytes
                    int keyBytesLen = input.readInt();
                    byte[] keyBytes = new byte[keyBytesLen];
                    for (int byteIdx = 0; byteIdx < keyBytesLen; byteIdx++) {
                        keyBytes[byteIdx] = input.readByte();
                    }
                    String key = new String(keyBytes, StandardCharsets.UTF_8);
                    int value = input.readInt();
                    addCollectList(id, key, value);
                }
            }
        } catch (Exception e) {
            throw new RuntimeException("deserialize err", e);
        }
    }

    public Slice serialize() {
        // Must equal exactly the number of bytes appended to the SliceOutput
        int requiredBytes =
                SizeOf.SIZE_OF_INT                                    // number of ids
                + collectContainer.size() * SizeOf.SIZE_OF_INT * 2    // per id: id value and key-entry count
                + keyListLen * SizeOf.SIZE_OF_INT                     // per key: length of the key bytes
                + keyByteLen                                          // total key bytes
                + keyListLen * SizeOf.SIZE_OF_INT;                    // per key: value
        try {
            SliceOutput builder = Slices.allocate(requiredBytes).getOutput();
            // Number of ids: written once, because deserialize() reads it once.
            // (The original wrote it inside the loop, which only works for a single id.)
            builder.appendInt(collectContainer.size());
            for (Map.Entry<Integer, Map<String, Integer>> entry : collectContainer.entrySet()) {
                // id value
                builder.appendInt(entry.getKey());
                Map<String, Integer> kMap = entry.getValue();
                builder.appendInt(kMap.entrySet().size());
                for (Map.Entry<String, Integer> vEntry : kMap.entrySet()) {
                    byte[] keyBytes = vEntry.getKey().getBytes(StandardCharsets.UTF_8);
                    builder.appendInt(keyBytes.length);
                    builder.appendBytes(keyBytes);
                    builder.appendInt(vEntry.getValue());
                }
            }
            return builder.getUnderlyingSlice();
        } catch (Exception e) {
            throw new RuntimeException("serialize err  requiredBytes = " + requiredBytes
                    + " keyByteLen = " + keyByteLen + " keyListLen = " + keyListLen, e);
        }
    }

    long estimatedInMemorySize() {
        return INSTANCE_SIZE + contentEstimatedSize;
    }

    void mergeWith(CollectListStats other) {
        if (other == null) {
            return;
        }
        for (Map.Entry<Integer, Map<String, Integer>> cEntry : other.collectContainer.entrySet()) {
            Integer id = cEntry.getKey();
            for (Map.Entry<String, Integer> kEntry : cEntry.getValue().entrySet()) {
                addCollectList(id, kEntry.getKey(), kEntry.getValue());
            }
        }
    }
}
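
The length-prefix convention used by serialize and deserialize can be tried out in isolation. The following is a minimal sketch that uses java.nio.ByteBuffer as a stand-in for SliceOutput and SliceInput; the class LengthPrefixedSketch and its simplified single-map layout are assumptions for illustration, not the layout used by CollectListStats.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical stand-alone example; ByteBuffer replaces Slice for illustration.
final class LengthPrefixedSketch {
    // Layout: [entry count][key length][key bytes][value] repeated per entry
    static byte[] serialize(Map<String, Integer> counts) {
        int required = Integer.BYTES; // entry count
        for (String key : counts.keySet()) {
            required += Integer.BYTES + key.getBytes(StandardCharsets.UTF_8).length + Integer.BYTES;
        }
        ByteBuffer buffer = ByteBuffer.allocate(required);
        buffer.putInt(counts.size());
        for (Map.Entry<String, Integer> entry : counts.entrySet()) {
            byte[] keyBytes = entry.getKey().getBytes(StandardCharsets.UTF_8);
            buffer.putInt(keyBytes.length); // length first
            buffer.put(keyBytes);           // then the bytes
            buffer.putInt(entry.getValue());
        }
        return buffer.array();
    }

    // Reads the fields back in exactly the order they were written.
    static Map<String, Integer> deserialize(byte[] data) {
        ByteBuffer buffer = ByteBuffer.wrap(data);
        Map<String, Integer> counts = new LinkedHashMap<>();
        int entries = buffer.getInt();
        for (int i = 0; i < entries; i++) {
            byte[] keyBytes = new byte[buffer.getInt()];
            buffer.get(keyBytes);
            int value = buffer.getInt();
            counts.put(new String(keyBytes, StandardCharsets.UTF_8), value);
        }
        return counts;
    }

    public static void main(String[] args) {
        Map<String, Integer> counts = new LinkedHashMap<>();
        counts.put("aaa", 1);
        counts.put("ccc", 2);
        counts.put("bbb", 3);
        byte[] bytes = serialize(counts);
        System.out.println(bytes.length);       // 37 = 4 + 3 * (4 + 3 + 4)
        System.out.println(deserialize(bytes)); // {aaa=1, ccc=2, bbb=3}
    }
}
```

The required size is computed before any byte is written, which mirrors the requiredBytes calculation: a fixed-size buffer leaves no room for a mismatch between the estimate and what is actually appended.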

Serializer class:

import com.facebook.presto.spi.block.Block;
import com.facebook.presto.spi.block.BlockBuilder;
import com.facebook.presto.spi.function.AccumulatorStateSerializer;
import com.facebook.presto.spi.type.Type;

import static com.facebook.presto.spi.type.VarbinaryType.VARBINARY;

public class CollectListStatsSerializer implements AccumulatorStateSerializer<CollectListAggregation.CollectState> {
    @Override
    public Type getSerializedType() {
        return VARBINARY;
    }

    @Override
    public void serialize(CollectListAggregation.CollectState state, BlockBuilder out) {
        if (state.get() == null) {
            out.appendNull();
        } else {
            VARBINARY.writeSlice(out, state.get().serialize());
        }
    }

    @Override
    public void deserialize(Block block, int index, CollectListAggregation.CollectState state) {
        state.set(new CollectListStats(VARBINARY.getSlice(block, index)));
    }
}

Factory class:

public class CollectListStatsFactory implements AccumulatorStateFactory<CollectListAggregation.CollectState> {
    @Override
    public CollectListAggregation.CollectState createSingleState() {
        return new SingleState();
    }

    @Override
    public Class<? extends CollectListAggregation.CollectState> getSingleStateClass() {
        return SingleState.class;
    }

    @Override
    public CollectListAggregation.CollectState createGroupedState() {
        return new GroupState();
    }

    @Override
    public Class<? extends CollectListAggregation.CollectState> getGroupedStateClass() {
        return GroupState.class;
    }

    // State used for GROUP BY queries: one slot per group, selected by groupId
    public static class GroupState implements GroupedAccumulatorState, CollectListAggregation.CollectState {
        private final ObjectBigArray<CollectListStats> collectStatsList = new ObjectBigArray<>();
        private long size;
        private long groupId;

        @Override
        public void setGroupId(long groupId) {
            this.groupId = groupId;
        }

        @Override
        public void ensureCapacity(long size) {
            collectStatsList.ensureCapacity(size);
        }

        @Override
        public CollectListStats get() {
            return collectStatsList.get(groupId);
        }

        @Override
        public void set(CollectListStats value) {
            CollectListStats previous = get();
            if (previous != null) {
                size -= previous.estimatedInMemorySize();
            }
            collectStatsList.set(groupId, value);
            // combine() may pass a null state, so guard before reading its size
            if (value != null) {
                size += value.estimatedInMemorySize();
            }
        }

        @Override
        public long getEstimatedSize() {
            return size + collectStatsList.sizeOf();
        }
    }

    // State used when there is no grouping: a single value
    public static class SingleState implements CollectListAggregation.CollectState {
        private CollectListStats stats;

        @Override
        public CollectListStats get() {
            return stats;
        }

        @Override
        public void set(CollectListStats value) {
            stats = value;
        }

        @Override
        public long getEstimatedSize() {
            if (stats == null) {
                return 0;
            }
            return stats.estimatedInMemorySize();
        }
    }
}
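
As the GroupState code suggests, a grouped state keeps the values of all groups in one object and uses the current group id to pick the slot that get and set operate on. A simplified plain-Java analogue might look like the sketch below; GroupedStateSketch and GroupedCounter are hypothetical names, and an ArrayList stands in for ObjectBigArray purely for illustration.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical analogue of a grouped accumulator state; not Presto code.
final class GroupedStateSketch {
    static final class GroupedCounter {
        // One slot per group; null means the group has no state yet.
        private final List<Long> counts = new ArrayList<>();
        private int groupId;

        // Selects the slot that get() and set() operate on.
        void setGroupId(int groupId) {
            this.groupId = groupId;
        }

        // Makes sure slots 0 .. size-1 exist.
        void ensureCapacity(int size) {
            while (counts.size() < size) {
                counts.add(null);
            }
        }

        Long get() {
            return counts.get(groupId);
        }

        void set(Long value) {
            counts.set(groupId, value);
        }
    }

    // A counting "input" step written against the currently selected group.
    static void increment(GroupedCounter state) {
        Long current = state.get();
        state.set(current == null ? 1L : current + 1);
    }

    public static void main(String[] args) {
        GroupedCounter state = new GroupedCounter();
        state.ensureCapacity(3);
        state.setGroupId(0);
        increment(state);
        state.setGroupId(2);
        increment(state);
        increment(state);
        state.setGroupId(0);
        System.out.println(state.get()); // 1
        state.setGroupId(2);
        System.out.println(state.get()); // 2
    }
}
```

Keeping every group in one object avoids allocating a separate state instance per group, which matters when a GROUP BY produces a very large number of groups.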

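2.2.4 Using Slice to Improve Performance

Operating on memory through Slice is efficient, because Slice implements memory copies with Unsafe#copyMemory.

The trade-off is that the stored data has to be managed by hand: you must track the capacity and the used length of the data, grow the buffer when needed, and so on.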

public class RouteUserAggregationBase {

    // ...... other static fields (elided in the original)

    /**
     * Slice state: the buffer that holds the intermediate data
     */
    public interface SliceState extends AccumulatorState {
        Slice getSlice();

        void setSlice(Slice slice);
    }
}

@AggregationFunction("function_name") // placeholder for the SQL function name
public class RouteUserGroupAggregation extends RouteUserAggregationBase {

    /** Initial capacity, in bytes, of the buffer body **/
    private static final int STORED_DATA_BODY_INIT_BYTE_SIZE = 64;

    /** Offsets of the metadata fields in the buffer header **/
    private static final int VALUES_OFFSET_HEADER_BYTE_LEN = 0;
    private static final int VALUES_OFFSET_BODY_BYTE_SIZE = 4;
    private static final int VALUES_OFFSET_BODY_BYTE_USED = 8;

    private static final int VALUES_OFFSET_CONTAIN_TARGET_EVENT = 12;

    private static final int VALUES_OFFSET_TARGET_EVENT_TYPE = 13;
    private static final int VALUES_OFFSET_ROUTE_INTERVAL = 17;
    private static final int VALUES_OFFSET_TARGET_EVENT_LEN = 21;
    private static final int VALUES_OFFSET_TARGET_EVENT_BYTES = 25;

    @InputFunction
    public static void input(SliceState state,
                             // target event
                             @SqlType(StandardTypes.VARCHAR) Slice targetEvent,
                             // target event type
                             @SqlType(StandardTypes.BIGINT) long targetType,
                             // event interval
                             @SqlType(StandardTypes.BIGINT) long eventInterval,
                             // current event name
                             @SqlType(StandardTypes.VARCHAR) Slice currEvent,
                             // current event time
                             @SqlType(StandardTypes.BIGINT) long eventTime) {

        handleInput(state, targetEvent, (int) targetType, (int) eventInterval, currEvent, (int) eventTime, null, null);
    }

    private static void handleInput(SliceState state, Slice targetEvent, int targetType, int eventInterval, Slice currEvent, int eventTime, Slice groupByEvent, Slice groupByProp) {
        // Fetch the buffered data
        Slice storedData = state.getSlice();

        // Initialize the header with the values that never change: target event, target type, interval
        if (storedData == null) {
            /*
                    header length in bytes
                    body capacity in bytes
                    body bytes used
                    whether the target event has been seen
                    target event type
                    event time interval
             */
            int headerByteLen = SizeOf.SIZE_OF_INT
                    + SizeOf.SIZE_OF_INT
                    + SizeOf.SIZE_OF_INT
                    + SizeOf.SIZE_OF_BYTE
                    + SizeOf.SIZE_OF_INT
                    + SizeOf.SIZE_OF_INT;
            int targetLength = SizeOf.SIZE_OF_INT + targetEvent.length();
            headerByteLen += targetLength;

            storedData = Slices.allocate(headerByteLen + STORED_DATA_BODY_INIT_BYTE_SIZE);
            storedData.setInt(VALUES_OFFSET_HEADER_BYTE_LEN, headerByteLen);
            storedData.setInt(VALUES_OFFSET_BODY_BYTE_SIZE, STORED_DATA_BODY_INIT_BYTE_SIZE);
            storedData.setInt(VALUES_OFFSET_BODY_BYTE_USED, 0);
            // Whether the target event has been seen
            storedData.setByte(VALUES_OFFSET_CONTAIN_TARGET_EVENT, 0);
            // Cache the constant parameters
            storedData.setInt(VALUES_OFFSET_TARGET_EVENT_TYPE, targetType);
            storedData.setInt(VALUES_OFFSET_ROUTE_INTERVAL, eventInterval);

            storedData.setInt(VALUES_OFFSET_TARGET_EVENT_LEN, targetEvent.length());
            storedData.setBytes(VALUES_OFFSET_TARGET_EVENT_BYTES, targetEvent);
        }

        int headerByteLen = storedData.getInt(VALUES_OFFSET_HEADER_BYTE_LEN);
        int bodyByteSize = storedData.getInt(VALUES_OFFSET_BODY_BYTE_SIZE);
        int bodyByteUsed = storedData.getInt(VALUES_OFFSET_BODY_BYTE_USED);

        // Mark that the target event has been seen
        if (currEvent.toStringUtf8().equals(targetEvent.toStringUtf8())) {
            storedData.setByte(VALUES_OFFSET_CONTAIN_TARGET_EVENT, 1);
        }
        // If grouping is enabled and the current event is the grouping event,
        // concatenate the group value onto the event name
        if (groupByEvent != null && groupByEvent.toStringUtf8().equals(currEvent.toStringUtf8())) {
            String newEventKey = currEvent.toStringUtf8() + EVENT_CONCAT_GROUP_VALUE + groupByProp.toStringUtf8();
            currEvent = Slices.utf8Slice(newEventKey);
        }

        // Entry size: the entry length (int), eventTime (int), and the bytes of the current event
        int entryByteLen = SizeOf.SIZE_OF_INT * 2 + currEvent.length();
        if (bodyByteUsed + entryByteLen > bodyByteSize) {
            // Grow: keep doubling until the new entry fits
            // (a single doubling, as in the original, is not enough for a very long event name)
            int newBodyByteSize = bodyByteSize * 2;
            while (bodyByteUsed + entryByteLen > newBodyByteSize) {
                newBodyByteSize *= 2;
            }
            Slice newStoredData = Slices.allocate(headerByteLen + newBodyByteSize);
            // Copy the old data into the new Slice, then record the new capacity
            newStoredData.setBytes(0, storedData.getBytes());
            newStoredData.setInt(VALUES_OFFSET_BODY_BYTE_SIZE, newBodyByteSize);
            storedData = newStoredData;
        }
        // Locate the write position
        int writePos = headerByteLen + bodyByteUsed;
        storedData.setInt(writePos, entryByteLen);
        writePos += SizeOf.SIZE_OF_INT;
        storedData.setInt(writePos, eventTime);
        writePos += SizeOf.SIZE_OF_INT;
        storedData.setBytes(writePos, currEvent);
        storedData.setInt(VALUES_OFFSET_BODY_BYTE_USED, bodyByteUsed + entryByteLen);
        // Store the updated buffer
        state.setSlice(storedData);
    }

    @CombineFunction
    public static void combine(SliceState state, SliceState other) {
        // Fetch the buffered data
        Slice storedData = state.getSlice();
        Slice otherStoredData = other.getSlice();

        // Nothing to merge
        if (otherStoredData == null) {
            return;
        }

        // Merge the buffers
        if (storedData == null) {
            state.setSlice(otherStoredData);
        } else {
            int headerByteLen = storedData.getInt(VALUES_OFFSET_HEADER_BYTE_LEN);
            int bodyByteSize = storedData.getInt(VALUES_OFFSET_BODY_BYTE_SIZE);
            int bodyByteUsed = storedData.getInt(VALUES_OFFSET_BODY_BYTE_USED);
            int otherHeaderByteLen = otherStoredData.getInt(VALUES_OFFSET_HEADER_BYTE_LEN);
            int otherBodyByteSize = otherStoredData.getInt(VALUES_OFFSET_BODY_BYTE_SIZE);
            int otherBodyByteUsed = otherStoredData.getInt(VALUES_OFFSET_BODY_BYTE_USED);
            byte containTargetEvent = 0;
            if (storedData.getByte(VALUES_OFFSET_CONTAIN_TARGET_EVENT) == 1 || otherStoredData.getByte(VALUES_OFFSET_CONTAIN_TARGET_EVENT) == 1) {
                containTargetEvent = 1;
            }
            Slice finalStoredData;
            int finalBodyByteUsed = bodyByteUsed + otherBodyByteUsed;
            if (bodyByteSize >= finalBodyByteUsed) {
                // The left buffer is large enough. Only the body (event names and times) is copied, not the header
                storedData.setBytes(headerByteLen + bodyByteUsed, otherStoredData, otherHeaderByteLen, otherBodyByteUsed);
                storedData.setInt(VALUES_OFFSET_BODY_BYTE_USED, finalBodyByteUsed);
                finalStoredData = storedData;
            } else if (otherBodyByteSize >= finalBodyByteUsed) {
                // The right buffer is large enough
                otherStoredData.setBytes(otherHeaderByteLen + otherBodyByteUsed, storedData, headerByteLen, bodyByteUsed);
                otherStoredData.setInt(VALUES_OFFSET_BODY_BYTE_USED, finalBodyByteUsed);
                finalStoredData = otherStoredData;
            } else {
                // Grow
                int newBodyByteSize = bodyByteSize;
                while (newBodyByteSize < finalBodyByteUsed) {
                    newBodyByteSize *= 2;
                }
                Slice newStoredData = Slices.allocate(headerByteLen + newBodyByteSize);
                newStoredData.setBytes(0, storedData.getBytes());
                newStoredData.setInt(VALUES_OFFSET_BODY_BYTE_SIZE, newBodyByteSize);
                storedData = newStoredData;

                storedData.setBytes(headerByteLen + bodyByteUsed, otherStoredData, otherHeaderByteLen, otherBodyByteUsed);
                storedData.setInt(VALUES_OFFSET_BODY_BYTE_USED, finalBodyByteUsed);
                finalStoredData = storedData;
            }
            // Whether the target event has been seen
            finalStoredData.setByte(VALUES_OFFSET_CONTAIN_TARGET_EVENT, containTargetEvent);
            state.setSlice(finalStoredData);
        }
    }

    @OutputFunction(StandardTypes.VARCHAR)
    public static void output(@AggregationState SliceState state, BlockBuilder out) {
        // Fetch the buffered data
        Slice storedData = state.getSlice();

        // No data, or the target event never occurred
        if ((storedData == null) || (storedData.getByte(VALUES_OFFSET_CONTAIN_TARGET_EVENT) == 0)) {
            out.appendNull();
            return;
        }
        // Match routes
        Slice makeRoute = makeRoute(storedData);
        if (makeRoute == null) {
            out.appendNull();
        } else {
            VarcharType.VARCHAR.writeSlice(out, makeRoute);
        }
    }

    private static Slice makeRoute(Slice storedData) {
        // Read the header fields
        int interval = storedData.getInt(VALUES_OFFSET_ROUTE_INTERVAL);
        int targetType = storedData.getInt(VALUES_OFFSET_TARGET_EVENT_TYPE);
        int targetLength = storedData.getInt(VALUES_OFFSET_TARGET_EVENT_LEN);
        String targetEvent = new String(storedData.getBytes(VALUES_OFFSET_TARGET_EVENT_BYTES, targetLength));
        List<Slice> timeEventSeries = new ArrayList<>();
        int headerByteLen = storedData.getInt(VALUES_OFFSET_HEADER_BYTE_LEN);
        int bodyByteUsed = storedData.getInt(VALUES_OFFSET_BODY_BYTE_USED);
        int bound = headerByteLen + bodyByteUsed;
        int idx = headerByteLen;
        while (idx < bound) {
            // Read each entry: time and event name
            int entryByteLen = storedData.getInt(idx);
            Slice entry = storedData.slice(idx + SizeOf.SIZE_OF_INT, entryByteLen - SizeOf.SIZE_OF_INT);
            idx += entryByteLen;
            timeEventSeries.add(entry);
        }
        // Processing logic
        // ...... (elided in the original; it produces the `routes` collection used next)
        // Build the return value
        Slice result = null;
        if (routes.size() > 0) {
            for (String route : routes) {
                Slice routeSlice = Slices.utf8Slice(route);
                Slice routeInfo = Slices.allocate(SizeOf.SIZE_OF_INT + routeSlice.length());
                routeInfo.setInt(0, routeSlice.length());
                routeInfo.setBytes(4, routeSlice);
                if (result == null) {
                    result = routeInfo;
                } else {
                    Slice newSlice = Slices.allocate(result.length() + routeInfo.length());
                    newSlice.setBytes(0, result);
                    newSlice.setBytes(result.length(), routeInfo, 0, routeInfo.length());
                    result = newSlice;
                }
            }
        }
        return result;
    }
}
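
The header-plus-growable-body layout can be experimented with outside Presto. The sketch below is a reduced, hypothetical version that uses java.nio.ByteBuffer instead of Slice and a two-field header (body capacity and bytes used); the class GrowableBufferSketch, its offsets, and the sample events are assumptions made for illustration and do not reproduce the full header of RouteUserGroupAggregation.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

// Hypothetical stand-alone example of a manually managed, growable buffer.
final class GrowableBufferSketch {
    // Header layout assumed for this sketch: [body capacity][body bytes used]
    static final int OFFSET_BODY_SIZE = 0;
    static final int OFFSET_BODY_USED = 4;
    static final int HEADER_LEN = 8;

    static ByteBuffer create(int initialBodySize) {
        ByteBuffer buffer = ByteBuffer.allocate(HEADER_LEN + initialBodySize);
        buffer.putInt(OFFSET_BODY_SIZE, initialBodySize);
        buffer.putInt(OFFSET_BODY_USED, 0);
        return buffer;
    }

    // Appends [entry length][time][event bytes]; returns the (possibly reallocated) buffer.
    static ByteBuffer append(ByteBuffer buffer, int time, String event) {
        byte[] eventBytes = event.getBytes(StandardCharsets.UTF_8);
        int entryLen = Integer.BYTES * 2 + eventBytes.length;
        int bodySize = buffer.getInt(OFFSET_BODY_SIZE);
        int bodyUsed = buffer.getInt(OFFSET_BODY_USED);
        if (bodyUsed + entryLen > bodySize) {
            // Double the capacity until the new entry fits
            int newBodySize = Math.max(bodySize, 1);
            while (bodyUsed + entryLen > newBodySize) {
                newBodySize *= 2;
            }
            ByteBuffer grown = ByteBuffer.allocate(HEADER_LEN + newBodySize);
            // Copy the header and the used part of the body, then record the new capacity
            System.arraycopy(buffer.array(), 0, grown.array(), 0, HEADER_LEN + bodyUsed);
            grown.putInt(OFFSET_BODY_SIZE, newBodySize);
            buffer = grown;
        }
        int writePos = HEADER_LEN + bodyUsed;
        buffer.putInt(writePos, entryLen);
        buffer.putInt(writePos + Integer.BYTES, time);
        System.arraycopy(eventBytes, 0, buffer.array(), writePos + 2 * Integer.BYTES, eventBytes.length);
        buffer.putInt(OFFSET_BODY_USED, bodyUsed + entryLen);
        return buffer;
    }

    public static void main(String[] args) {
        ByteBuffer buffer = create(16);
        buffer = append(buffer, 100, "login"); // 13-byte entry, fits in 16
        buffer = append(buffer, 200, "pay");   // 11-byte entry, 24 > 16, so the body grows to 32
        System.out.println(buffer.getInt(OFFSET_BODY_SIZE)); // 32
        System.out.println(buffer.getInt(OFFSET_BODY_USED)); // 24
    }
}
```

Because append may allocate a new buffer, the caller has to keep the returned reference, which is the same reason handleInput ends by calling state.setSlice(storedData).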

Original article: https://www.cnblogs.com/lrxvx/p/12558902.html