5分钟Flink - 流处理API转换算子集合
本文总结了Flink Streaming的算子操作,统统简单实现一次算子操作类型,更加熟悉了Flink带来的便利,有时间可以浏览一次,理解一次,后面具体使用的时候,可以进行查看
Operators将一个或多个DataStream转换为新的DataStream。Flink程序可以将多种转换组合成复杂的数据流拓扑。
版本:Flink 1.10.0 语言:Scala
以下实现都使用了Scala语言,有需要Java版本的,可以直接官网查看
下面包含三部分,分别为
a. DataStream Transformations
b. Physical partitioning
c. Task chaining and resource groups
1. DataStream Transformations
Map DataStream → DataStream
取一个元素并产生一个元素。一个映射函数,将输入流的值加倍:
dataStream.map { x => x * 2 }
FlatMap DataStream → DataStream
取一个元素并产生零个,一个或多个元素。平面图功能,可将句子拆分为单词:
dataStream.flatMap { str => str.split(" ") }
Filter DataStream → DataStream
为每个元素评估一个布尔函数,并保留该函数返回true的布尔函数。过滤出零值的过滤器:
dataStream.filter { _ != 0 }
KeyBy DataStream → KeyedStream
在逻辑上将流划分为不相交的分区,每个分区都包含同一键的元素。在内部,这是通过哈希分区实现的。此转换返回KeyedStream
dataStream.keyBy("someKey") // Key by field "someKey"
dataStream.keyBy(0) // Key by the first element of a Tuple
Reduce KeyedStream → DataStream
对键控数据流进行“滚动”压缩。将当前元素与最后一个减小的值合并并发出新值。一个reduce函数,用于创建部分和流
keyedStream.reduce { _ + _ }
Fold KeyedStream → DataStream
带有初始值的键控数据流上的“滚动”折叠。将当前元素与上一个折叠值组合在一起并发出新值。折叠函数,应用于序列(1,2,3,4,5)时,会发出序列“ start-1”,“ start-1-2”,“ start-1-2-3”,...根据相同的Key进行不断的折叠,新的key会进行新的折叠
val result: DataStream[String] =
keyedStream.fold("start")((str, i) => { str + "-" + i })
Aggregations KeyedStream → DataStream
在键控数据流上滚动聚合。min和minBy之间的区别是min返回最小值,而minBy返回该字段中具有最小值的元素(与max和maxBy相同).
keyedStream.sum(0)
keyedStream.sum("key")
keyedStream.min(0)
keyedStream.min("key")
keyedStream.max(0)
keyedStream.max("key")
keyedStream.minBy(0)
keyedStream.minBy("key")
keyedStream.maxBy(0)
keyedStream.maxBy("key")
Window KeyedStream → WindowedStream
可以在已经分区的KeyedStreams上定义Windows。Windows根据某些特征将每个键中的数据分组(例如,最近5秒钟内到达的数据).
dataStream.keyBy(0).window(TumblingEventTimeWindows.of(Time.seconds(5))) // Last 5 seconds of data
WindowAll DataStream → AllWindowedStream
Windows可以在常规DataStreams上定义。Windows会根据某些特征(例如,最近5秒钟内到达的数据)对所有流事件进行分组。警告:*在许多情况下,这是非并行*转换。所有记录将被收集到windowAll运算符的一项任务中.
dataStream.windowAll(TumblingEventTimeWindows.of(Time.seconds(5))) // Last 5 seconds of data
Window Apply WindowedStream → DataStream AllWindowedream → DataStream
将一般功能应用于整个窗口。下面是一个手动求和窗口元素的函数。注意:如果使用windowAll转换,则需要使用AllWindowFunction代替.
windowedStream.apply { WindowFunction }
// applying an AllWindowFunction on non-keyed window stream
allWindowedStream.apply { AllWindowFunction }
Window Reduce WindowedStream → DataStream
将功能化约简函数应用于窗口并返回缩减后的值.
windowedStream.reduce { _ + _ }
Window Fold WindowedStream → DataStream
Applies a functional fold function to the window and returns the folded value. The example function, when applied on the sequence (1,2,3,4,5), folds the sequence into the string "start-1-2-3-4-5":
val result: DataStream[String] =
windowedStream.fold("start", (str, i) => { str + "-" + i })
Aggregations on windows WindowedStream → DataStream
聚合窗口的内容。min和minBy之间的区别是min返回最小值,而minBy返回该字段中具有最小值的元素(与max和maxBy相同).
windowedStream.sum(0)
windowedStream.sum("key")
windowedStream.min(0)
windowedStream.min("key")
windowedStream.max(0)
windowedStream.max("key")
windowedStream.minBy(0)
windowedStream.minBy("key")
windowedStream.maxBy(0)
windowedStream.maxBy("key")
Union DataStream* → DataStream
两个或多个数据流的并集,创建一个包含所有流中所有元素的新流。注意:如果您将数据流与其自身合并,则在结果流中每个元素将获得两次.
dataStream.union(otherStream1, otherStream2, ...)
Window Join DataStream,DataStream → DataStream
在给定键和公共窗口上连接两个数据流
dataStream.join(otherStream)
.where(<key selector>).equalTo(<key selector>)
.window(TumblingEventTimeWindows.of(Time.seconds(3)))
.apply { ... }
Window CoGroup DataStream,DataStream → DataStream
dataStream.coGroup(otherStream)
.where(0).equalTo(1)
.window(TumblingEventTimeWindows.of(Time.seconds(3)))
.apply {}
Connect DataStream,DataStream → ConnectedStreams
“连接”两个保持其类型的数据流,从而允许两个流之间共享状态.注意1. Union之前两个流的类型必须是一样,Connect可以不一样,在之后的coMap中再去调整成为一样的。2. Connect只能操作两个流,Union可以操作多个。
someStream : DataStream[Int] = ...
otherStream : DataStream[String] = ...
val connectedStreams = someStream.connect(otherStream)
CoMap, CoFlatMap ConnectedStreams → DataStream
与连接的数据流上的map和flatMap相似
connectedStreams.map(
(_ : Int) => true,
(_ : String) => false
)
connectedStreams.flatMap(
(_ : Int) => true,
(_ : String) => false
)
Split DataStream → SplitStream
Split the stream into two or more streams according to some criterion.
val split = someDataStream.split(
(num: Int) =>
(num % 2) match {
case 0 => List("even")
case 1 => List("odd")
}
)
Select SplitStream → DataStream
从拆分流中选择一个或多个流.
val even = split select "even"
val odd = split select "odd"
val all = split.select("even","odd")
Iterate DataStream → IterativeStream → DataStream
通过将一个运算符的输出重定向到某个先前的运算符,在流中创建“反馈”循环。这对于定义不断更新模型的算法特别有用。以下代码从流开始,并连续应用迭代主体。大于0的元素将被发送回反馈通道,其余元素将被转发到下游.
initialStream.iterate {
iteration => {
val iterationBody = iteration.map {/*do something*/}
(iterationBody.filter(_ > 0), iterationBody.filter(_ <= 0))
}
}
Extract Timestamps DataStream → DataStream
从记录中提取时间戳,以便与使用事件时间语义的窗口一起使用。参见事件时间.
stream.assignTimestamps { timestampExtractor }
2. Physical partitioning
Custom partitioning DataStream → DataStream
使用用户定义的分区程序为每个元素选择目标任务.
dataStream.partitionCustom(partitioner, "someKey")
dataStream.partitionCustom(partitioner, 0)
Random partitioning DataStream → DataStream
根据均匀分布对元素进行随机划分.
dataStream.shuffle()
Rebalancing (Round-robin partitioning) DataStream → DataStream
分区元素轮循,每个分区创建相等的负载。在存在数据偏斜的情况下对性能优化有用.
dataStream.rebalance()
Rescaling DataStream → DataStream
将元素循环地分区到下游操作的子集。如果您希望拥有管道,例如,从源的每个并行实例散开到几个映射器的子集以分配负载,但又不希望 rebalance() 引起完全的重新平衡,则这很有用。这将仅需要本地数据传输,而不需要通过网络传输数据,这取决于其他配置值,例如TaskManager的插槽数。上游操作向其发送元素的下游操作的子集取决于两个上游操作的并行度和下游操作。例如,如果上游操作具有并行性2,而下游操作具有并行性4,则一个上游操作将元素分配给两个下游操作,而另一个上游操作将分配给另外两个下游操作。另一方面,如果下游操作具有并行性2而上游操作具有并行性4,则两个上游操作将分配给一个下游操作,而其他两个上游操作将分配给其他下游操作。彼此不是整数倍,一个或几个下游操作将具有与上游操作不同的输入数量。请参见此图以查看上例中的连接模式:
dataStream.rescale()
Broadcasting DataStream → DataStream
向每个分区广播元素.
dataStream.broadcast()
3. Task chaining and resource groups
Start new chain
从此运算符开始,开始新的链。两个映射器将被链接,并且过滤器将不会链接到第一个映射器.
someStream.filter(...).map(...).startNewChain().map(...)
Disable chaining
禁止将链路OperateChain的连接操作
someStream.map(...).disableChaining()
Set slot sharing group
设置操作的插槽共享组。Flink会将具有相同插槽共享组的操作放入同一插槽,同时将没有插槽共享组的操作保留在其他插槽中。这可以用来隔离插槽。如果所有输入操作都在同一插槽共享组中,则插槽共享组将从输入操作继承。默认插槽共享组的名称为“ default”,可以通过调用slotSharingGroup(“ default”)将操作显式放入该组中。.
someStream.filter(...).slotSharingGroup("name")
作者:Johngo
- 响应式编程的实践
- S2-045 原理初步分析(CVE-2017-5638)
- 如何在HDFS上查看YARN历史作业运行日志
- 面向流的设计思想
- 天才第一步 Docker 纸尿裤
- 如何迁移Kudu1.2的WAL和Data目录
- WordPress REST API 内容注入漏洞分析
- 如何为Kerberos环境的CDH集群在线扩容数据节点
- WinDbg 漏洞分析调试(一)
- MySQL MVCC(多版本控制)
- 如何在集群外节点跨网段向HDFS写数据
- 如何使用HAProxy实现Kerberos环境下的Impala负载均衡
- Hue中使用Oozie创建Shell工作流在脚本中切换不同用户
- Python学习(七):模块 优雅的封装
- JavaScript 教程
- JavaScript 编辑工具
- JavaScript 与HTML
- JavaScript 与Java
- JavaScript 数据结构
- JavaScript 基本数据类型
- JavaScript 特殊数据类型
- JavaScript 运算符
- JavaScript typeof 运算符
- JavaScript 表达式
- JavaScript 类型转换
- JavaScript 基本语法
- JavaScript 注释
- Javascript 基本处理流程
- Javascript 选择结构
- Javascript if 语句
- Javascript if 语句的嵌套
- Javascript switch 语句
- Javascript 循环结构
- Javascript 循环结构实例
- Javascript 跳转语句
- Javascript 控制语句总结
- Javascript 函数介绍
- Javascript 函数的定义
- Javascript 函数调用
- Javascript 几种特殊的函数
- JavaScript 内置函数简介
- Javascript eval() 函数
- Javascript isFinite() 函数
- Javascript isNaN() 函数
- parseInt() 与 parseFloat()
- escape() 与 unescape()
- Javascript 字符串介绍
- Javascript length属性
- javascript 字符串函数
- Javascript 日期对象简介
- Javascript 日期对象用途
- Date 对象属性和方法
- Javascript 数组是什么
- Javascript 创建数组
- Javascript 数组赋值与取值
- Javascript 数组属性和方法
- 基于Android studio3.6的JNI教程之ncnn人脸检测mtcnn功能
- Kotlin 使用Lambda来设置回调的操作
- Kotlin之自定义 Live Templates详解(模板代码)
- Android Studio设置颜色拾色器工具Color Picker教程
- Kotlin中常见的符号详解
- Kotlin中实体类的创建方式
- Android自定义流式布局/自动换行布局实例
- Android 中 MD5 的几种生成方式(小结)
- Flutter 日期时间DatePicker控件及国际化
- 解决Kotlin 类在实现多个接口,覆写多个接口中相同方法冲突的问题
- Kotlin 匿名类实现接口和抽象类的区别详解
- android实现微信朋友圈发布动态功能
- 基于Android studio3.6的JNI教程之helloworld思路详解
- 基于Android studio3.6的JNI教程之opencv实例详解
- AndroidStudio代码达到指定字符长度时自动换行实例