flink的DataStreamAPI
一、WordCount流程
1 import org.apache.flink.streaming.api.scala._ 2 3 object StreamWordCount { 4 def main(args:Array[String]):Unit={ 5 //创建流处理的执行环境 6 val env=StreamExecutionEnvironment.getExecutionEnvironment; 7 8 //接受一个socket文本流即创建数据源 9 val dataStream=env.socketTextStream("localhost",7777); 10 11 //对每条数据进行处理 12 val wordCountDataStream=dataStream.flatMap(_.split(" ")) 13 .filter(_.nonEmpty) 14 .map(line=>(line,1)) 15 .keyBy(line=>line._1) 16 .sum(1); 17 //输出结果,可以直接输出也可以将处理的结果存储到外部系统中如kafka 18 wordCountDataStream.print(); 19 //flink的操作是惰性的,需要启动executor。 20 env.execute("stream WC job") 21 } 22 }
二、流程解析:env —> transform —> source
(1)创建环境Environment
1、getExecutionEnvironment:创建一个执行环境,表示当前执行程序的上下文。 如果程序是独立调用的,则此方法返回本地执行环境;如果从命令行客户端调用程序以提交到集群,则此方法返回此集群的执行环境,getExecutionEnvironment会根据查询运行的方式决定返回什么样的运行环境,是最常用的一种创建执行环境的方式。
val env= ExecutionEnvironment.getExecutionEnvironment,如果没有设置并行度,会以flink-conf.yaml中的配置为准,默认是1。
2、createLocalEnvironment:返回本地执行环境,需要在调用时指定默认的并行度。
val env = StreamExecutionEnvironment.createLocalEnvironment(1),返回本地执行环境,需要在调用时指定默认的并行度。
3、createRemoteEnvironment:返回集群执行环境,将Jar提交到远程服务器。需要在调用时指定JobManager的IP和端口号,并指定要在集群中运行的Jar包。
val env = ExecutionEnvironment.createRemoteEnvironment("jobmanager-hostname", 6123,"C://jar//flink//wordcount.jar")
(2)transform算子操作
1、map:val streamMap = stream.map { x => x * 2 }
2、flatMap:val streamFlatMap = stream.flatMap(x=>x.split(",")) //花括号与小括号均可以。
3、filter:val streamFilter=stream.filter(x=>x%2==1)
4、keyBy:DataSet 中使用 groupBy 指定 key,而在 DataStream 中使用 keyBy 指定 key,DataStream → KeyedStream:逻辑上将一个流拆分成不相交的分区,每个分区包含具有相同hash(key)的元素。keyBy指定key的三种方法
1)根据字段位置keyBy(0),主要对tuple类型,pojo类会出错,注意:这个是相对于最外元素而言。对于tuple类型还可以指定key的位置keyBy(x=>x._1)
2)根据字段名称,主要是pojo类。stream.map(x=>Person(x,2)).keyBy("name"),也可以多字段分区如keyBy("name","age")
3)自定义keyselector
5、Reduce:KeyedStream → DataStream:一个分组数据流的聚合操作,合并当前的元素和上次聚合的结果,产生一个新的值。
val sumLambdaStream = dataStream.keyBy("name").reduce((s1, s2) => Score(s1.name, "Sum", s1.score + s2.score))
val inputDataSet=env.fromCollection(List(1,2,3,4,5)).reduce((x,y)=>x+y).print()
6、
7、
8、
(3)
getExecutionEnvironment
原文地址:https://www.cnblogs.com/hdc520/p/12988862.html
- JavaScript 教程
- JavaScript 编辑工具
- JavaScript 与HTML
- JavaScript 与Java
- JavaScript 数据结构
- JavaScript 基本数据类型
- JavaScript 特殊数据类型
- JavaScript 运算符
- JavaScript typeof 运算符
- JavaScript 表达式
- JavaScript 类型转换
- JavaScript 基本语法
- JavaScript 注释
- Javascript 基本处理流程
- Javascript 选择结构
- Javascript if 语句
- Javascript if 语句的嵌套
- Javascript switch 语句
- Javascript 循环结构
- Javascript 循环结构实例
- Javascript 跳转语句
- Javascript 控制语句总结
- Javascript 函数介绍
- Javascript 函数的定义
- Javascript 函数调用
- Javascript 几种特殊的函数
- JavaScript 内置函数简介
- Javascript eval() 函数
- Javascript isFinite() 函数
- Javascript isNaN() 函数
- parseInt() 与 parseFloat()
- escape() 与 unescape()
- Javascript 字符串介绍
- Javascript length属性
- javascript 字符串函数
- Javascript 日期对象简介
- Javascript 日期对象用途
- Date 对象属性和方法
- Javascript 数组是什么
- Javascript 创建数组
- Javascript 数组赋值与取值
- Javascript 数组属性和方法
- 将二进制文件加入VC资源后释放执行
- JavaScript中的函数式编程
- JavaScript中的compose函数和pipe函数
- 常用JS函数-数组扁平化,缓存函数,柯里化函数,防抖和节流函数
- JavaScript单元测试及原理
- 前端进阶知识汇总
- 前端也能学算法:由浅入深讲解动态规划
- 轻松理解JS中的面向对象,顺便搞懂prototype和__proto__
- 前端也能学算法:由浅入深讲解贪心算法
- web.py指南性说明
- this到底指向啥?看完这篇就知道了!
- 学以致用:手把手教你撸一个工具库并打包发布,顺便解决JS小数计算不准问题
- python 实现 php 的 var_dump 功能
- RSA初探,聊聊怎么破解HTTPS
- 深入解析Underscore.js源码架构