Flink 批处理算子详解
时间:2022-07-22
本文章向大家介绍Flink 批处理算子详解,主要内容包括其使用实例、应用技巧、基本知识点总结和需要注意事项,具有一定的参考价值,需要的朋友可以参考一下。
批处理程序的结果
- 获取运行时
val env = ExecutionEnvironment.getExecutionEnvironment
- 添加Source
val text = env.fromElements("who's there","I think I hear")
- 定义算子转换函数
text.flatMap{_.toLowerCase.split("\w+") filter(_.nonEmpty)}
.map((_,1))
.groupBy(0)
.sum(1)
- 定义Sink
counts.print();
- 启动程序
env.execute("Kafka Dataset WordCount")
source 定义
// 递归定义整个目录下的所有文件
val parameter = new Configuration
parameter.setBoolean("recursive.file.enumeration",true)
env.readTextFile("file://path/with/files").withParameters(parameter)
算子
Aggregate
val input: DataSet[(Int,String,Double)] = env.fromElements(
(1,"hello",4),
(1,"hello",5),
(2,"hello",5),
(3,"word",6),
(3,"word",6)
)
val value = input.groupBy(1).aggregate(Aggregations.SUM,0).aggregate(Aggregations.MIN,2)
连接
连接分为内连接和外连接,外连接分为左外连接,右外连接和内连接
val input1 = env.fromElements((1,"hello"),(2,"hello"))
val input2 = env.fromElements(("hello",1),("word",2))
val result = input1.join(input2).where(0).equalTo(1)
广播变量
- 动态数据共享。 算子间共享输入和配置参数是静态的,广播变量共享的数据是动态的
广播变量编程步骤:
(1)创建广播变量。
val toBroadcast = env.fromElements(1,2,3);
(2) 注册广播变量
利用 RichFunction 自定义算子函数,注册广播变量
val toBroadcast = env.fromElements(1,2,3);
val toBroadcast:DataSet[Int] = env.fromElements(1,2,3);
toBroadcast.map(new RichMapFunction[String,String]() {
var broadcastSet = null
override def open(parameters: Configuration): Unit = {
// 读取广播变量
broadcastSet = getRuntimeContext.getBroadcastVariable[String]("broadcastSetName").get(0)
}
override def map(value: String): String = {
}
// 注册广播变量
}).withBroadcastSet(toBroadcast,"broadcastSetName");
- 用数据来聊聊国产电影~
- 如何买卖股票?不要慌,我有妙招!
- 2017.11.7解题报告
- TensorFlow从0到1 - 11 - 74行Python实现手写体数字识别
- 让priority_queue支持小根堆的几种方法
- 一招解决4道leetcode hard题,动态规划在字符串匹配问题中的应用
- 细数Python中的数据类型以及他们的方法
- 洛谷 P3807 【模板】卢卡斯定理
- 数据城堡参赛代码实战篇(六)---使用sklearn进行数据标准化及参数寻优
- 震惊!Vector两行代码求逆序对,六行代码过普通平衡树
- 数据城堡参赛代码实战篇(五)---使用sklearn解决分类问题
- 洛谷P1894 [USACO4.2]完美的牛栏The Perfect Stall
- [编程经验]Python生成器、迭代器与yield语句小结
- TensorFlow从0到1 - 12 - TensorFlow构建3层NN玩转MNIST
- JavaScript 教程
- JavaScript 编辑工具
- JavaScript 与HTML
- JavaScript 与Java
- JavaScript 数据结构
- JavaScript 基本数据类型
- JavaScript 特殊数据类型
- JavaScript 运算符
- JavaScript typeof 运算符
- JavaScript 表达式
- JavaScript 类型转换
- JavaScript 基本语法
- JavaScript 注释
- Javascript 基本处理流程
- Javascript 选择结构
- Javascript if 语句
- Javascript if 语句的嵌套
- Javascript switch 语句
- Javascript 循环结构
- Javascript 循环结构实例
- Javascript 跳转语句
- Javascript 控制语句总结
- Javascript 函数介绍
- Javascript 函数的定义
- Javascript 函数调用
- Javascript 几种特殊的函数
- JavaScript 内置函数简介
- Javascript eval() 函数
- Javascript isFinite() 函数
- Javascript isNaN() 函数
- parseInt() 与 parseFloat()
- escape() 与 unescape()
- Javascript 字符串介绍
- Javascript length属性
- javascript 字符串函数
- Javascript 日期对象简介
- Javascript 日期对象用途
- Date 对象属性和方法
- Javascript 数组是什么
- Javascript 创建数组
- Javascript 数组赋值与取值
- Javascript 数组属性和方法