5分钟Flink - 流处理API转换算子集合

时间:2022-07-24
本文章向大家介绍5分钟Flink - 流处理API转换算子集合,主要内容包括其使用实例、应用技巧、基本知识点总结和需要注意事项,具有一定的参考价值,需要的朋友可以参考一下。

本文总结了Flink Streaming的算子操作,统统简单实现一次算子操作类型,更加熟悉了Flink带来的便利,有时间可以浏览一次,理解一次,后面具体使用的时候,可以进行查看

Operators将一个或多个DataStream转换为新的DataStream。Flink程序可以将多种转换组合成复杂的数据流拓扑。

版本:Flink 1.10.0 语言:Scala

以下实现都使用了Scala语言,有需要Java版本的,可以直接官网查看

下面包含三部分,分别为

a. DataStream Transformations

b. Physical partitioning

c. Task chaining and resource groups

1. DataStream Transformations

Map DataStream → DataStream

取一个元素并产生一个元素。一个映射函数,将输入流的值加倍:

dataStream.map { x => x * 2 }

FlatMap DataStream → DataStream

取一个元素并产生零个,一个或多个元素。平面图功能,可将句子拆分为单词:

dataStream.flatMap { str => str.split(" ") }
Filter DataStream → DataStream

为每个元素评估一个布尔函数,并保留该函数返回true的布尔函数。过滤出零值的过滤器:

dataStream.filter { _ != 0 }
KeyBy DataStream → KeyedStream

在逻辑上将流划分为不相交的分区,每个分区都包含同一键的元素。在内部,这是通过哈希分区实现的。此转换返回KeyedStream

dataStream.keyBy("someKey") // Key by field "someKey"
dataStream.keyBy(0) // Key by the first element of a Tuple
Reduce KeyedStream → DataStream

对键控数据流进行“滚动”压缩。将当前元素与最后一个减小的值合并并发出新值。一个reduce函数,用于创建部分和流

keyedStream.reduce { _ + _ }
Fold KeyedStream → DataStream

带有初始值的键控数据流上的“滚动”折叠。将当前元素与上一个折叠值组合在一起并发出新值。折叠函数,应用于序列(1,2,3,4,5)时,会发出序列“ start-1”,“ start-1-2”,“ start-1-2-3”,...根据相同的Key进行不断的折叠,新的key会进行新的折叠

val result: DataStream[String] =
    keyedStream.fold("start")((str, i) => { str + "-" + i })
Aggregations KeyedStream → DataStream

在键控数据流上滚动聚合。min和minBy之间的区别是min返回最小值,而minBy返回该字段中具有最小值的元素(与max和maxBy相同).

keyedStream.sum(0)
keyedStream.sum("key")
keyedStream.min(0)
keyedStream.min("key")
keyedStream.max(0)
keyedStream.max("key")
keyedStream.minBy(0)
keyedStream.minBy("key")
keyedStream.maxBy(0)
keyedStream.maxBy("key")
Window KeyedStream → WindowedStream

可以在已经分区的KeyedStreams上定义Windows。Windows根据某些特征将每个键中的数据分组(例如,最近5秒钟内到达的数据).

dataStream.keyBy(0).window(TumblingEventTimeWindows.of(Time.seconds(5))) // Last 5 seconds of data
WindowAll DataStream → AllWindowedStream

Windows可以在常规DataStreams上定义。Windows会根据某些特征(例如,最近5秒钟内到达的数据)对所有流事件进行分组。警告:*在许多情况下,这是非并行*转换。所有记录将被收集到windowAll运算符的一项任务中.

dataStream.windowAll(TumblingEventTimeWindows.of(Time.seconds(5))) // Last 5 seconds of data
Window Apply WindowedStream → DataStream AllWindowedream → DataStream

将一般功能应用于整个窗口。下面是一个手动求和窗口元素的函数。注意:如果使用windowAll转换,则需要使用AllWindowFunction代替.

windowedStream.apply { WindowFunction }

// applying an AllWindowFunction on non-keyed window stream
allWindowedStream.apply { AllWindowFunction }
Window Reduce WindowedStream → DataStream

将功能化约简函数应用于窗口并返回缩减后的值.

windowedStream.reduce { _ + _ }
Window Fold WindowedStream → DataStream

Applies a functional fold function to the window and returns the folded value. The example function, when applied on the sequence (1,2,3,4,5), folds the sequence into the string "start-1-2-3-4-5":

val result: DataStream[String] =
    windowedStream.fold("start", (str, i) => { str + "-" + i })
Aggregations on windows WindowedStream → DataStream

聚合窗口的内容。min和minBy之间的区别是min返回最小值,而minBy返回该字段中具有最小值的元素(与max和maxBy相同).

windowedStream.sum(0)
windowedStream.sum("key")
windowedStream.min(0)
windowedStream.min("key")
windowedStream.max(0)
windowedStream.max("key")
windowedStream.minBy(0)
windowedStream.minBy("key")
windowedStream.maxBy(0)
windowedStream.maxBy("key")
Union DataStream* → DataStream

两个或多个数据流的并集,创建一个包含所有流中所有元素的新流。注意:如果您将数据流与其自身合并,则在结果流中每个元素将获得两次.

dataStream.union(otherStream1, otherStream2, ...)
Window Join DataStream,DataStream → DataStream

在给定键和公共窗口上连接两个数据流

dataStream.join(otherStream)
    .where(<key selector>).equalTo(<key selector>)
    .window(TumblingEventTimeWindows.of(Time.seconds(3)))
    .apply { ... }
Window CoGroup DataStream,DataStream → DataStream
dataStream.coGroup(otherStream)
    .where(0).equalTo(1)
    .window(TumblingEventTimeWindows.of(Time.seconds(3)))
    .apply {}
Connect DataStream,DataStream → ConnectedStreams

“连接”两个保持其类型的数据流,从而允许两个流之间共享状态.注意1. Union之前两个流的类型必须是一样,Connect可以不一样,在之后的coMap中再去调整成为一样的。2. Connect只能操作两个流,Union可以操作多个。

someStream : DataStream[Int] = ...
otherStream : DataStream[String] = ...

val connectedStreams = someStream.connect(otherStream)
CoMap, CoFlatMap ConnectedStreams → DataStream

与连接的数据流上的map和flatMap相似

connectedStreams.map(
    (_ : Int) => true,
    (_ : String) => false
)
connectedStreams.flatMap(
    (_ : Int) => true,
    (_ : String) => false
)
Split DataStream → SplitStream

Split the stream into two or more streams according to some criterion.

val split = someDataStream.split(
  (num: Int) =>
    (num % 2) match {
      case 0 => List("even")
      case 1 => List("odd")
    }
)
Select SplitStream → DataStream

从拆分流中选择一个或多个流.

val even = split select "even"
val odd = split select "odd"
val all = split.select("even","odd")
Iterate DataStream → IterativeStream → DataStream

通过将一个运算符的输出重定向到某个先前的运算符,在流中创建“反馈”循环。这对于定义不断更新模型的算法特别有用。以下代码从流开始,并连续应用迭代主体。大于0的元素将被发送回反馈通道,其余元素将被转发到下游.

initialStream.iterate {
  iteration => {
    val iterationBody = iteration.map {/*do something*/}
    (iterationBody.filter(_ > 0), iterationBody.filter(_ <= 0))
  }
}
Extract Timestamps DataStream → DataStream

从记录中提取时间戳,以便与使用事件时间语义的窗口一起使用。参见事件时间.

stream.assignTimestamps { timestampExtractor }

2. Physical partitioning

Custom partitioning DataStream → DataStream

使用用户定义的分区程序为每个元素选择目标任务.

dataStream.partitionCustom(partitioner, "someKey")
dataStream.partitionCustom(partitioner, 0)
Random partitioning DataStream → DataStream

根据均匀分布对元素进行随机划分.

dataStream.shuffle()
Rebalancing (Round-robin partitioning) DataStream → DataStream

分区元素轮循,每个分区创建相等的负载。在存在数据偏斜的情况下对性能优化有用.

dataStream.rebalance()
Rescaling DataStream → DataStream

将元素循环地分区到下游操作的子集。如果您希望拥有管道,例如,从源的每个并行实例散开到几个映射器的子集以分配负载,但又不希望 rebalance() 引起完全的重新平衡,则这很有用。这将仅需要本地数据传输,而不需要通过网络传输数据,这取决于其他配置值,例如TaskManager的插槽数。上游操作向其发送元素的下游操作的子集取决于两个上游操作的并行度和下游操作。例如,如果上游操作具有并行性2,而下游操作具有并行性4,则一个上游操作将元素分配给两个下游操作,而另一个上游操作将分配给另外两个下游操作。另一方面,如果下游操作具有并行性2而上游操作具有并行性4,则两个上游操作将分配给一个下游操作,而其他两个上游操作将分配给其他下游操作。彼此不是整数倍,一个或几个下游操作将具有与上游操作不同的输入数量。请参见此图以查看上例中的连接模式:

dataStream.rescale()
Broadcasting DataStream → DataStream

向每个分区广播元素.

dataStream.broadcast()

3. Task chaining and resource groups

Start new chain

从此运算符开始,开始新的链。两个映射器将被链接,并且过滤器将不会链接到第一个映射器.

someStream.filter(...).map(...).startNewChain().map(...)
Disable chaining

禁止将链路OperateChain的连接操作

someStream.map(...).disableChaining()
Set slot sharing group

设置操作的插槽共享组。Flink会将具有相同插槽共享组的操作放入同一插槽,同时将没有插槽共享组的操作保留在其他插槽中。这可以用来隔离插槽。如果所有输入操作都在同一插槽共享组中,则插槽共享组将从输入操作继承。默认插槽共享组的名称为“ default”,可以通过调用slotSharingGroup(“ default”)将操作显式放入该组中。.

someStream.filter(...).slotSharingGroup("name")

作者:Johngo