flink 1.11.2 学习笔记(1)-wordCount
时间:2022-07-26
本文章向大家介绍flink 1.11.2 学习笔记(1)-wordCount,主要内容包括其使用实例、应用技巧、基本知识点总结和需要注意事项,具有一定的参考价值,需要的朋友可以参考一下。
一、pom依赖
1 <?xml version="1.0" encoding="UTF-8"?>
2 <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
3 xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
4 <modelVersion>4.0.0</modelVersion>
5
6 <groupId>com.cnblogs.yjmyzz</groupId>
7 <artifactId>spring-boot-demo</artifactId>
8 <version>0.0.1-SNAPSHOT</version>
9
10 <properties>
11 <java.version>1.8</java.version>
12 <flink.version>1.11.2</flink.version>
13 </properties>
14
15 <dependencies>
16
17 <!--lombok可选-->
18 <dependency>
19 <groupId>org.projectlombok</groupId>
20 <artifactId>lombok</artifactId>
21 <version>1.18.12</version>
22 </dependency>
23
24 <!-- spring应用最小依赖-可选 -->
25 <dependency>
26 <groupId>org.springframework</groupId>
27 <artifactId>spring-context</artifactId>
28 <version>5.2.4.RELEASE</version>
29 </dependency>
30
31 <!-- flink -->
32 <dependency>
33 <groupId>org.apache.flink</groupId>
34 <artifactId>flink-core</artifactId>
35 <version>${flink.version}</version>
36 </dependency>
37
38 <dependency>
39 <groupId>org.apache.flink</groupId>
40 <artifactId>flink-java</artifactId>
41 <version>${flink.version}</version>
42 </dependency>
43
44 <dependency>
45 <groupId>org.apache.flink</groupId>
46 <artifactId>flink-scala_2.12</artifactId>
47 <version>${flink.version}</version>
48 </dependency>
49
50 <dependency>
51 <groupId>org.apache.flink</groupId>
52 <artifactId>flink-clients_2.12</artifactId>
53 <version>${flink.version}</version>
54 </dependency>
55
56 <dependency>
57 <groupId>org.apache.flink</groupId>
58 <artifactId>flink-test-utils-junit</artifactId>
59 <version>${flink.version}</version>
60 </dependency>
61 </dependencies>
62
63 <build>
64 <plugins>
65 <plugin>
66 <artifactId>maven-compiler-plugin</artifactId>
67 <version>3.1</version>
68 <configuration>
69 <source>1.8</source>
70 <target>1.8</target>
71 </configuration>
72 </plugin>
73
74 <!-- Scala Compiler -->
75 <plugin>
76 <groupId>net.alchim31.maven</groupId>
77 <artifactId>scala-maven-plugin</artifactId>
78 <version>4.4.0</version>
79 <executions>
80 <!-- Run scala compiler in the process-resources phase, so that dependencies on
81 scala classes can be resolved later in the (Java) compile phase -->
82 <execution>
83 <id>scala-compile-first</id>
84 <phase>process-resources</phase>
85 <goals>
86 <goal>compile</goal>
87 </goals>
88 </execution>
89 </executions>
90 <configuration>
91 <jvmArgs>
92 <jvmArg>-Xms128m</jvmArg>
93 <jvmArg>-Xmx512m</jvmArg>
94 </jvmArgs>
95 </configuration>
96 </plugin>
97
98 </plugins>
99 </build>
100
101 </project>
二、WordCount(批处理版本)
1 import org.apache.flink.api.common.functions.FlatMapFunction;
2 import org.apache.flink.api.java.DataSet;
3 import org.apache.flink.api.java.ExecutionEnvironment;
4 import org.apache.flink.api.java.tuple.Tuple2;
5 import org.apache.flink.util.Collector;
6
7 /**
8 * WordCount批处理版本
9 */
10 public class BatchWordCount {
11
12 public static void main(String[] args) throws Exception {
13
14 // 1 设置环境
15 final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
16
17 // 2. 定义数据
18 String wordFilePath = "/Users/jimmy/Downloads/word.txt";
19 DataSet<String> text = env.readTextFile(wordFilePath);
20
21 // 3. 处理逻辑
22 DataSet<Tuple2<String, Integer>> counts =
23
24 text.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
25 @Override
26 public void flatMap(String value, Collector<Tuple2<String, Integer>> out) throws Exception {
27 //将每行按word拆分
28 String[] tokens = value.toLowerCase().split("\W+");
29
30 //收集(类似:map-reduce思路)
31 for (String token : tokens) {
32 if (token.length() > 0) {
33 out.collect(new Tuple2<>(token, 1));
34 }
35 }
36 }
37 })
38 //按Tuple2里的第0项,即:word分组
39 .groupBy(0)
40 //然后对Tuple2里的第1项求合
41 .sum(1);
42
43 // 4. 打印结果
44 counts.print();
45
46 }
47 }
注:数据文件/Users/jimmy/Downloads/word.txt的位置,大家可根据实际情况调整,该文件的内容类似:
hello java
hello flink
三、WordCount(流式处理版本)
1 import org.apache.flink.api.common.functions.FlatMapFunction;
2 import org.apache.flink.api.java.tuple.Tuple2;
3 import org.apache.flink.streaming.api.datastream.DataStream;
4 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
5 import org.apache.flink.util.Collector;
6
7 public class StreamWordCount {
8
9 public static void main(String[] args) throws Exception {
10 // 1 设置环境
11 final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
12
13 // 2. 定义数据
14 DataStream<String> text = env.socketTextStream("127.0.0.1", 9999);
15
16 // 3. 处理逻辑
17 DataStream<Tuple2<String, Integer>> counts =
18
19 text.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
20 @Override
21 public void flatMap(String value, Collector<Tuple2<String, Integer>> out) throws Exception {
22 //将每行按word拆分
23 String[] tokens = value.toLowerCase().split("\b+");
24
25 //收集(类似:map-reduce思路)
26 for (String token : tokens) {
27 if (token.trim().length() > 0) {
28 out.collect(new Tuple2<>(token.trim(), 1));
29 }
30 }
31 }
32 })
33 //按Tuple2里的第0项,即:word分组
34 .keyBy(value -> value.f0)
35 //然后对Tuple2里的第1项求合
36 .sum(1);
37
38 // 4. 打印结果
39 counts.print();
40
41 // execute program
42 env.execute("Streaming WordCount");
43
44 }
45 }
注:运行前,先在终端命令行输入nc -l 9999,开启一个tcp server作为流式处理的数据源,然后再运行上面的代码,然后命令行输入一些文本,即可看到输出
- Linux下性能调试工具-top和sar运维笔记
- Apache+wsgi+flask部署
- “勒索病毒”到底会勒索啥,尽可以做到让全球对之恐惧无奈!
- 解决win10 关键错误开始菜单和cortana无法工作 的问题(转-真的成功了)
- “AS3.0高级动画编程”学习:第二章转向行为(下)
- windows系统中eclipse C开发环境的架设
- 5个酷毙的Python工具
- ”盒模型“之如何防止边框和内边距把元素撑开
- excel中的不同类型图表叠加
- 这几天遇到的关于IE6/sql2008/win2003的奇怪bug
- 基于Web的工作流管理系统的设计与实现
- 这是对position讲解最通俗易懂的版本了。
- 你到底该如何看待比特币?
- OpenApplus小程序容器
- JavaScript 教程
- JavaScript 编辑工具
- JavaScript 与HTML
- JavaScript 与Java
- JavaScript 数据结构
- JavaScript 基本数据类型
- JavaScript 特殊数据类型
- JavaScript 运算符
- JavaScript typeof 运算符
- JavaScript 表达式
- JavaScript 类型转换
- JavaScript 基本语法
- JavaScript 注释
- Javascript 基本处理流程
- Javascript 选择结构
- Javascript if 语句
- Javascript if 语句的嵌套
- Javascript switch 语句
- Javascript 循环结构
- Javascript 循环结构实例
- Javascript 跳转语句
- Javascript 控制语句总结
- Javascript 函数介绍
- Javascript 函数的定义
- Javascript 函数调用
- Javascript 几种特殊的函数
- JavaScript 内置函数简介
- Javascript eval() 函数
- Javascript isFinite() 函数
- Javascript isNaN() 函数
- parseInt() 与 parseFloat()
- escape() 与 unescape()
- Javascript 字符串介绍
- Javascript length属性
- javascript 字符串函数
- Javascript 日期对象简介
- Javascript 日期对象用途
- Date 对象属性和方法
- Javascript 数组是什么
- Javascript 创建数组
- Javascript 数组赋值与取值
- Javascript 数组属性和方法
- 语雀自动同步到hexo博客
- 推荐 3 款超好用的 Docker 图形化管理工具
- python标准库之glob介绍
- Python 为什么只需一条语句“a,b=b,a”,就能直接交换两个变量?
- 使用List中的remove方法遇到的坑,不信你没有踩过!
- python opencv 图像尺寸变换
- OpenCv保存图像
- 机器学习|KNN
- docker 查看容器日志
- consul配置ACL
- CentOS7.5更改python版本后及yum不能用的解决办法,非编译!
- nginx 配置websocket
- PostgreSQL10分区表性能研究报告
- linux文件目录管理基本命令总结
- Linux中的硬链接与软链接?