flink cep 案例之机架温度监控报警

时间:2022-07-25
本文章向大家介绍flink cep 案例之机架温度监控报警,主要内容包括其使用实例、应用技巧、基本知识点总结和需要注意事项,具有一定的参考价值,需要的朋友可以参考一下。

FlinkCEP是在Flink之上实现的复杂事件处理库。它提供了丰富的API,允许您在不停止的事件流中检测事件模式,并对复杂事件做相应处理。模式匹配是复杂事件处理的一个有力的保障,应用场景包括受一系列事件驱动的各种业务流程,例如在正常的网略行为中侦测异常行为;在金融应用中查找价格、交易量和其他行为的模式。

特点:

  • 复杂性:多个流join,窗口聚合,事件序列或patterns检测
  • 低延迟:秒或毫秒级别,比如做信用卡盗刷检测,或攻击检测
  • 高吞吐:每秒上万条消息

在这篇博客中,我们将通过一个案例来讲解flink CEP的使用。 案例来源于官网博客:https://flink.apache.org/news/2016/04/06/cep-monitoring.html

输入事件流由来自一组机架的温度和功率事件组成。目标是检测 当机架过热时我们需要发出警告和报警。

我们通过自定义的source来模拟生成机架的温度,然后定义以下的规则来生成警告和报警

  • 警告:某机架在10秒内连续两次上报的温度超过阈值;
  • 报警:某机架在20秒内连续两次匹配警告;

首先我们定义一个监控事件

注意要重写里面的hashcode方法和equal方法

来自官网:The events in the DataStream to which you want to apply pattern matching must implement proper equals() and hashCode() methods because FlinkCEP uses them for comparing and matching events.

public abstract class MonitoringEvent {
    private int rackID;

    public MonitoringEvent(int rackID) {
        this.rackID = rackID;
    }

    public int getRackID() {
        return rackID;
    }

    public void setRackID(int rackID) {
        this.rackID = rackID;
    }

    @Override
    public boolean equals(Object obj) {
        if (obj instanceof MonitoringEvent) {
            MonitoringEvent monitoringEvent = (MonitoringEvent) obj;
            return monitoringEvent.canEquals(this) && rackID == monitoringEvent.rackID;
        } else {
            return false;
        }
    }

    @Override
    public int hashCode() {
        return rackID;
    }

    public boolean canEquals(Object obj) {
        return obj instanceof MonitoringEvent;
    }
}




public class TemperatureEvent extends MonitoringEvent {
    private double temperature;
    ...
}

public class PowerEvent extends MonitoringEvent {
    private double voltage;
    ...
}

我们通过自定义的source来模拟生成MonitoringEvent数据。



        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Use ingestion time => TimeCharacteristic == EventTime + IngestionTimeExtractor
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
        // Input stream of monitoring events
        DataStream<MonitoringEvent> inputEventStream = env
                .addSource(new MonitoringEventSource(
                        MAX_RACK_ID,
                        PAUSE,
                        TEMPERATURE_RATIO,
                        POWER_STD,
                        POWER_MEAN,
                        TEMP_STD,
                        TEMP_MEAN))
                .assignTimestampsAndWatermarks(new IngestionTimeExtractor<>());

接下来定义模式,在10秒钟之内连续两个event的温度超过阈值


       // Warning pattern: Two consecutive temperature events whose temperature is higher than the given threshold
        // appearing within a time interval of 10 seconds
        Pattern<MonitoringEvent, ?> warningPattern = Pattern.<MonitoringEvent>begin("first")
                .subtype(TemperatureEvent.class)
                .where(new IterativeCondition<TemperatureEvent>() {
                    private static final long serialVersionUID = -6301755149429716724L;

                    @Override
                    public boolean filter(TemperatureEvent value, Context<TemperatureEvent> ctx) throws Exception {
                        return value.getTemperature() >= TEMPERATURE_THRESHOLD;
                    }
                })
                .next("second")  //紧接着上一个事件
                
                
                .subtype(TemperatureEvent.class)
                .where(new IterativeCondition<TemperatureEvent>() {
                    private static final long serialVersionUID = 2392863109523984059L;

                    @Override
                    public boolean filter(TemperatureEvent value, Context<TemperatureEvent> ctx) throws Exception {
                        return value.getTemperature() >= TEMPERATURE_THRESHOLD;
                    }
                })
                .within(Time.seconds(10));
                

使用报警模式和输入流生成模式流

     // Create a pattern stream from our warning pattern
        PatternStream<MonitoringEvent> tempPatternStream = CEP.pattern(
                inputEventStream.keyBy("rackID"),
                warningPattern);

使用select方法为每个匹配的报警模式生成相应的报警。其中返回值是一个map,key是我们定义的模式,value是匹配的事件列表。


    // Generate temperature warnings for each matched warning pattern
        DataStream<TemperatureWarning> warnings = tempPatternStream.select(
                (Map<String, List<MonitoringEvent>> pattern) -> {
                    TemperatureEvent first = (TemperatureEvent) pattern.get("first").get(0);
                    TemperatureEvent second = (TemperatureEvent) pattern.get("second").get(0);

                    return new TemperatureWarning(first.getRackID(), (first.getTemperature() + second.getTemperature()) / 2);
                }
        );

以上我们最后生成了相应的用于警告的DataStream类型的数据流warnings,接下来我们使用这个警告流来生成我们的报警流,即在20秒内连续两次发生警告。


   // Alert pattern: Two consecutive temperature warnings appearing within a time interval of 20 seconds
        Pattern<TemperatureWarning, ?> alertPattern = Pattern.<TemperatureWarning>begin("first")
                .next("second")
                .within(Time.seconds(20));

然后通过上面的报警模式alertPattern和警告流warnings生成我们的报警流alertPatternStream。


   // Create a pattern stream from our alert pattern
        PatternStream<TemperatureWarning> alertPatternStream = CEP.pattern(
                warnings.keyBy("rackID"),
                alertPattern);

最后当收集到的两次警告中,第一次警告的平均温度小于第二次的时候,生成报警,封装TemperatureAlert信息返回。


  // Generate a temperature alert only if the second temperature warning's average temperature is higher than
        // first warning's temperature
        DataStream<TemperatureAlert> alerts = alertPatternStream.flatSelect(
                (Map<String, List<TemperatureWarning>> pattern, Collector<TemperatureAlert> out) -> {
                    TemperatureWarning first = pattern.get("first").get(0);
                    TemperatureWarning second = pattern.get("second").get(0);

                    if (first.getAverageTemperature() < second.getAverageTemperature()) {
                        out.collect(new TemperatureAlert(first.getRackID()));
                    }
                },
                TypeInformation.of(TemperatureAlert.class));

最后我们将报警流和警告流输出,当然我们也可以对这两个流做其他的操作,比如发到报警系统等。

   // Print the warning and alert events to stdout
        warnings.print();
        alerts.print();

参考: [1] https://ci.apache.org/projects/flink/flink-docs-release-1.7/dev/libs/cep.html [2] https://flink.apache.org/news/2016/04/06/cep-monitoring.html