Flink之Transform操作
时间:2020-01-09
本文章向大家介绍Flink之Transform操作,主要包括Flink之Transform操作使用实例、应用技巧、基本知识点总结和需要注意事项,具有一定的参考价值,需要的朋友可以参考一下。
import org.apache.flink.api.common.functions.FilterFunction
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
object TransformTest {
def main(args: Array[String]): Unit = {
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setParallelism(1) //设置全局并行度为1
import org.apache.flink.api.scala._
val streamFromFile = env.readTextFile("sensor1.txt")
//1.基本转换算子和简单聚合算子
val dataStream = streamFromFile.map(data => {
val dataArray = data.split(",")
SensorReading(dataArray(0).trim, dataArray(1).trim.toLong, dataArray(2).trim.toDouble)
}
)
//注意观察结果,flink是来一条数据处理一条,所以不会只看到最终求和后的结果
dataStream.keyBy(0).sum(2).print()
dataStream.keyBy("id").sum("temperature").print() //方式二
//例子:输出当前传感器最新的温度加10,而时间戳是上一次数据的时间戳加1
dataStream.keyBy(0).reduce((x, y) => SensorReading(x.id, x.timestamp + 1, y.temperature + 10)).print() //x和y分别代表当前值和新来的值
//2.多流转换算子
//split分流
val splitStream = dataStream.split(sensorData => {
if (sensorData.temperature > 30)
Seq("high")
else
Seq("low")
})
val high = splitStream.select("high")
val low = splitStream.select("low")
val all = splitStream.select("high", "low")
high.print("high temperature")
low.print("low temperature")
all.print("all")
//合并: connect和union
/*
* 1.union之前两个流的类型必须是一样, connect可以不一样, 在之后的coMap中再去调整成为一样的
* 2. connect只能操作两个流, union可以操作多个
*/
val warning = high.map(x => (x.id, x.temperature))
val connectedStream = warning.connect(low)
val coMap = connectedStream.map(
warningData => (warningData._1, warningData._2, "warning"),
safeData => (safeData.id, "safe")
)
coMap.print()
val unionStream = high.union(low)
unionStream.print()
// streamFromFile.map(data => {
// val len = data.split(",")
// len(0) + " " + len(1)
// }).print()
//自定义函数类
dataStream.filter(new MyFilter).print()
env.execute("transform test")
}
}
class MyFilter() extends FilterFunction[SensorReading] {
override def filter(t: SensorReading): Boolean = {
t.id.startsWith("sensor_1")
}
}
原文地址:https://www.cnblogs.com/wddqy/p/12172284.html
- ADO.NET入门教程(二)了解.NET数据提供程序
- ADO.NET入门教程(三) 连接字符串,你小觑了吗?
- ADO.NET入门教程(四) 品味Connection对象
- ADO.NET入门教程(五) 细说数据库连接池
- ADO.NET入门教程(六) 谈谈Command对象与数据检索
- ADO.NET入门教程(七) 谈谈Command对象高级应用
- ADO.NET入门教程(八) 深入理解DataAdapter(上)
- 深入理解DIP、IoC、DI以及IoC容器
- C#委托使用详解(Delegates)
- LINC switch系列之安装指南
- 设计模式成长记(一) 抽象工厂模式(Abstract Factory)
- 基于Open vSwitch的VxLAN隧道实验网络
- 来来来,快来围观那个Kotlin
- 怎样提高网站访问速度缩短网页加载时间
- JavaScript 教程
- JavaScript 编辑工具
- JavaScript 与HTML
- JavaScript 与Java
- JavaScript 数据结构
- JavaScript 基本数据类型
- JavaScript 特殊数据类型
- JavaScript 运算符
- JavaScript typeof 运算符
- JavaScript 表达式
- JavaScript 类型转换
- JavaScript 基本语法
- JavaScript 注释
- Javascript 基本处理流程
- Javascript 选择结构
- Javascript if 语句
- Javascript if 语句的嵌套
- Javascript switch 语句
- Javascript 循环结构
- Javascript 循环结构实例
- Javascript 跳转语句
- Javascript 控制语句总结
- Javascript 函数介绍
- Javascript 函数的定义
- Javascript 函数调用
- Javascript 几种特殊的函数
- JavaScript 内置函数简介
- Javascript eval() 函数
- Javascript isFinite() 函数
- Javascript isNaN() 函数
- parseInt() 与 parseFloat()
- escape() 与 unescape()
- Javascript 字符串介绍
- Javascript length属性
- javascript 字符串函数
- Javascript 日期对象简介
- Javascript 日期对象用途
- Date 对象属性和方法
- Javascript 数组是什么
- Javascript 创建数组
- Javascript 数组赋值与取值
- Javascript 数组属性和方法
- 使用Apache commons-cli包进行命令行参数解析的示例代码
- 详解如何在Ubuntu 16.04上增加Swap分区
- Mac本地文件上传到CentOS云服务器方法
- linux中把.c的文件编译成.so文件
- Ubuntu16.04 中 locate文件查找命令
- Ubuntu 16.04与Apache虚拟主机配置的步骤详解
- Linux删除目录下的文件的10种方法小结
- 利用Linux防火墙隔离本地欺骗地址的方法详解
- 视图在SQL中的作用是什么,它是怎样工作的?
- Linux命令行上如何使用日历详解
- 在Linux下修改和重置root密码的方法(超简单)
- 在Centos上搭建Maven中央仓库的方法
- 详解Ubuntu16.04启动器图标异常解决方法
- Linux(ubuntu)下实现增加/删除文件权限
- Vim中文件编码处理与重新打开乱码文件详解