Flink 流计算算子函数详解

时间:2022-07-22
本文章向大家介绍Flink 流计算算子函数详解,主要内容包括其使用实例、应用技巧、基本知识点总结和需要注意事项,具有一定的参考价值,需要的朋友可以参考一下。

Flink 的算子函数和spark的大致一样,但是由于其是流处理的模式,所有还要有需要加强理解的地方

  1. Map, FlaMap 做一对一,一对多映射
  2. Reuce 多对一进行聚合
  3. 聚合函数,sum,min,minBy,MaxBy 等
  4. keyBy 按Key进行分组 名字不一样但是操作一样。
  1. 窗口函数: 窗口函数用于对每一个key开窗口,windowsAll 全体元素开窗口
text.keyBy(0).window(TumblingEventTimeWindows.of(Time.seconds(5)))
text.windowAll(TumblingEventTimeWindows.of(Time.seconds(5)))

窗口函数实际上分为滚动时间窗口,滑动时间窗口,会话窗口

滚动时间窗口不会发生重叠, 滑动时间窗口,当步长小于窗口大小,就会重叠。

会话窗口是根据相邻时间间隔确定窗口边界

全局窗口必须定义触发器

在窗口内也可以进行其他的操作

  1. 窗口连接

两个数据源相同窗口内的连接

text.join(windowCounts)
        .where()
        .equalTo()
        .window(TumblingEventTimeWindows.of(Time.seconds(5)))
        .apply((e1,e2) => e1 + "," + e2)
0 1      2
0 1   2  3

0,1 0,1 1,0 1,0      2,2  3,2   一个窗口一个窗口内连接

间隔连接,以一个元素去连接一个窗口形成锥形

  text.keyBy(0).intervalJoin(windowCounts.keyBy(0))
      .between(Time.milliseconds(-2),Time.milliseconds(2))
        .process(new ProcessJoinFunction[Integer,Integer,String] {
          override def processElement(in1: Integer, in2: Integer, context: ProcessJoinFunction[Integer, Integer, String]#Context, collector: Collector[String]): Unit = {
            collector.collect(in1 + "," + in2)
          }
        })
    ```
    ```
0  1         6

0     2     3

以2 进行聚合 2,0 2,1 
  1. 数据分区

数据分区的好处是,如果分区数和算子数一致,则他们会直接运行到一个节点,通过内存进行传输,减少网络带宽的压力

自定义分区 :

text.partitionCustom(partitioner,"key")

使用shuffle() 进行均匀分区

text.shuffle()`

使用负载均衡的轮询调度算法进行数据分区

text.rebalance

可伸缩动态分区,使数据尽可能在一个slot内流转,减少网络开销

dataStream.rescale()

广播分区,每一个元素广播到下一个节点

text.broadcast()
  1. 资源共享

Flink 将多个任务连接成一个任务在一个线程中执行,以实现资源共享

(1) 创建链, 开启作业优化

dataStream.map(..).map(...).startNewChain().map(...)

(2) Slot共享组

在同一个组所有任务在同一个实例中运行

dataStream.map(...).slotSharingGroup("name")

(3) 关闭作业优化

dataStream.map(...).disableChaining()
  1. RichFunction函数

处理函数生命周期和获取函数上下文能力的算子

@Public
public abstract class AbstractRichFunction implements RichFunction, Serializable {
    private static final long serialVersionUID = 1L;
    private transient RuntimeContext runtimeContext;

    public AbstractRichFunction() {
    }
    
    public void setRuntimeContext(RuntimeContext t) {
        this.runtimeContext = t;
    }
    // 得到上下文函数 
    public RuntimeContext getRuntimeContext() {
        if (this.runtimeContext != null) {
            return this.runtimeContext;
        } else {
            throw new IllegalStateException("The runtime context has not been initialized.");
        }
    }

    public IterationRuntimeContext getIterationRuntimeContext() {
        if (this.runtimeContext == null) {
            throw new IllegalStateException("The runtime context has not been initialized.");
        } else if (this.runtimeContext instanceof IterationRuntimeContext) {
            return (IterationRuntimeContext)this.runtimeContext;
        } else {
            throw new IllegalStateException("This stub is not part of an iteration step function.");
        }
    }
    // 自定义初始化和销毁函数
    public void open(Configuration parameters) throws Exception {
    }

    public void close() throws Exception {
    }
}
  1. 触发器

基于事件的触发器

(1)onElement 窗口没收到一个元素,调用该方法

(2)onProcessingTime 根据注册处理时间进行触发,定时可以参数设定

(3)onEventTime 根据注册事件时间进行触发,定时可以参数设定

(4)onMerge 两个窗口合并时触发

  1. 清除器

在触发器后函数执行窗口前或者后执行清除的操作

evictor()可以在触发器后,窗口执行前或者后都可以触发

  1. 状态分类
val env = StreamExecutionEnvironment.getExecutionEnvironment()

env.setStateBackend(...)

设置状态后端,内存,JVM堆内存,JVM堆外内存,

9.检查点

检查点是Flink实现 exactly-once 语义的核心机制,启用检测点,需要:

(1) 支持时空穿梭的外部数据源, kafka 和 分布式文件系统

(2) 可持久化状态的外部存储, 如分布式文件系统。

检查点默认是关闭的,启用检查点需要配置 一致性级别, exactly-once

检测超时时间,

Kafka进行流计算实例

  1. 创建连接器

添加kafka source

  // 设置配置文件
          val properties = new Properties()
          properties.setProperty("bootstraps.servers","localhost:9092")
          properties.setProperty("zookeeper.connect","localhost:2181")

          // 设置消费组
          properties.setProperty("group.id","test")
          val myConsumer = new FlinkKafkaConsumer09[String]("topic",
                           new SimpleStringSchema(),properties)
          stream = env.addSource(myConsumer)
  1. 创建反序列化器
// https://mvnrepository.com/artifact/org.apache.flink/flink-avro
compile group: 'org.apache.flink', name: 'flink-avro', version: '1.7.1'
  1. 设置消息起始位置的偏移 设置 据上一次的偏移位置
myConsumer.setStartFromGroupOffsets()

// 从最早和最晚开始记录

myConsumer.setStartFromEarliest()

myConsumer.setStartFromLatest()

// 从固定的时间点开始

myConsumer.setStartFromTimestamp(23L)

// 设置分区的起始位置

val specificStartOffsets = new java.util.HashMap[]
  1. 设置检查点
env.enableCheckpointing(5000)