[DB] Flink

时间:2021-07-29
本文章向大家介绍[DB] Flink,主要包括[DB] Flink使用实例、应用技巧、基本知识点总结和需要注意事项,具有一定的参考价值,需要的朋友可以参考一下。

概述

  • 流式计算,本质上是增量计算,需要不断查询过去的状态

概念

  • Streams(流):分为有界流(固定大小,不随时间增加而增长)和无界流(随时间增加而增长),
  • State(状态):在进行流式计算过程中的信息,用于容错恢复和持久化
  • Time(时间):支持Event time、Ingestion time、Processing time等,用来判断业务状态是否滞后或延迟
  • API:分为SQL/Table API、DataStream API、ProcessFunction三层

集群

  • JobManager:集群管理者,负责调度任务,协调checkpoints、协调故障恢复,收集Job状态,管理TaskManager
  • TaskManager:实际执行计算的Worker,在其上执行Flink Job 的一组Task,将所在节点的服务器信息如内存、磁盘、任务运行情况等向JobManager汇报
  • Clinent:将任务提交到集群,根据用户参数选择提交模式(yarn per job,stand-alone,yarn-session)

模型

  • DataStream 的编程模型包括四个部分:Environment、DataSource、Transformation、Sink
  • DataSource(数据源):文件、Collection、Socket、自定义
  • Sink(数据目标):Kafka、Elasticsearch、RabbitMQ、Cassandra、Redis
  • 每个数据流起始于一个或多个Source,并终止于一个或多个Sink

资源

  • 一个TaskManager就是一个JVM进程,会用独立的线程来执行Task
  • 每个TaskManager为集群提供Slot,每个task slot代表了TaskManager的一个固定大小的资源子集,slot数一般为每个节点的cpu核数
  • 一个Flink程序由多个任务组成(source、transformation和 sink)
  • 一个任务由多个并行的实例(线程)来执行,一个任务的并行实例 (线程) 数目就被称为该任务的并行度

优点

  • 架构:主从模式
  • 容错:基于两阶段提交,实现了精确的一次处理语义
  • 反压:当消费者速度低于生产者时,需要消费者将信息反馈给生产者,使二者速度匹配,Flink使用分布式阻塞队列实现

连接器

  • Kafka
  • Redis
  • ElasticSearch

算子

  • Map:接受一个元素作为输入,根据开发者自定义的逻辑处理后输出
 1 class StreamingDemo {
 2     public static void main(String[] args) throws Exception {
 3 
 4         StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
 5         //获取数据源
 6         DataStreamSource<MyStreamingSource.Item> items = env.addSource(new MyStreamingSource()).setParallelism(1); 
 7         //Map
 8         SingleOutputStreamOperator<Object> mapItems = items.map(new MapFunction<MyStreamingSource.Item, Object>() {
 9             @Override
10             public Object map(MyStreamingSource.Item item) throws Exception {
11                 return item.getName();
12             }
13         });
14         //打印结果
15         mapItems.print().setParallelism(1);
16         String jobName = "user defined streaming source";
17         env.execute(jobName);
18     }
19 }
View Code
  • FlatMap:接受一个元素,返回0到多个元素,和Map的区别是,当返回值是列表时,FlatMap会将列表平铺,以单个元素的形式输出
1 SingleOutputStreamOperator<Object> flatMapItems = items.flatMap(new FlatMapFunction<MyStreamingSource.Item, Object>() {
2     @Override
3     public void flatMap(MyStreamingSource.Item item, Collector<Object> collector) throws Exception {
4         String name = item.getName();
5         collector.collect(name);
6     }
7 });
View Code
  • Filter:过滤掉不需要的数据,每个元素都会被Filter处理,如果Filter函数返回true则保留,否则丢弃
1 SingleOutputStreamOperator<MyStreamingSource.Item> filterItems = items.filter(new FilterFunction<MyStreamingSource.Item>() {
2     @Override
3     public boolean filter(MyStreamingSource.Item item) throws Exception {
4 
5         return item.getId() % 2 == 0;
6     }
7 });
View Code
  • KeyBy:根据数据的某种属性分组,然后对不同的组采取不同的处理方式
  • Aggregations:聚合函数,常见的有sum、max、min等,需要指定一个key进行聚合
  • Reduce:按照用户自定义逻辑进行分组聚合

状态

  • Flink框架的计算是有状态的
  • 状态即中间计算结果,是在流处理过程中需要记住的数据,包括业务数据和元数据
  • 状态存储在JVM中
  • Flink支持不同类型的状态,对状态的持久化提供专门机制和状态管理器
  • 对于任何一个状态数据,可以设置过期时间(TTL)
  • 基本类型:是否按照某个key进行分区
    • Keyed State:每个key都有自己的状态
    • Operator State(Keyed State):每个算子实例共享一个状态

容错

  • Checkpoint

窗口

  • 滚动窗口
  • 滑动窗口
  • 会话窗口

时间

  • 生成时间
  • 接入时间
  • 处理时间

水位

  • 由于网络延迟等因素,事件数据往往不能即使传递至Flink系统中,导致系统的不稳定或数据乱序
  • 衡量数据处理进度,确保事件数据全部到达Flink系统,即使乱序或迟到,也能像预期一样计算出正确和连续的结果
  • 任何Event进入Flink系统,都会根据当前最大事件时间产生Watermarks时间戳

广播变量

  • 允许在每台机器上保持一个只读的缓存变量,即一个公共的共享变量
  • 可以把一个dataset数据集广播出去,然后不同的task在节点上都能获取到

案例

  • 安装flink
    • tar -zxvf flink-1.9.2-bin-scala_2.11.tgz -C ~/training/
  • 修改flink配置文件
    • vim flink-conf.yaml
  • 启动hadoop,zookeeper,flink
    •  bin/start-cluster.sh
  • socket数据源
    • nc -lk 9999
  • 在idea中创建maven工程,开发计数程序

FlinkStreaming.scala

 1 package com.kaikeba.demo1
 2 
 3 import org.apache.flink.runtime.state.filesystem.FsStateBackend
 4 import org.apache.flink.streaming.api.CheckpointingMode
 5 import org.apache.flink.streaming.api.environment.CheckpointConfig.ExternalizedCheckpointCleanup
 6 import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}
 7 import org.apache.log4j.{Level, Logger}
 8 
 9 //导入隐式转换的包
10 import org.apache.flink.api.scala._
11 
12 /**
13   * flink接受socket数据,进行单词计数
14   */
15 object FlinkStream {
16   Logger.getLogger("org").setLevel(Level.ERROR)
17 
18   def main(args: Array[String]): Unit = {
19       //todo:1、构建流处理的环境
20       val environment: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
21 
22      //todo:2、从socket获取数据
23        val sourceStream: DataStream[String] = environment.socketTextStream("bigdata111",9999)
24 
25     //todo:3、对数据进行处理     hadoop spark
26     val result: DataStream[(String, Int)] = sourceStream
27                                                         .flatMap(x => x.split(" ")) //按照空格切分
28                                                         .map(x => (x, 1))   //每个单词计为1
29                                                         .keyBy(0)         //按照下标为0的单词进行分组
30                                                         .sum(1)           //按照下标为1累加相同单词出现的1
31 
32 
33     //todo: 4、对数据进行打印  sink
34     result.print()
35 
36 
37     //todo: 5、开启任务
38     environment.execute("FlinkStream")
39   }
40 
41 }
View Code

FlinkWordCount.scala

 1 package com.kaikeba.demo1
 2 
 3 import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}
 4 import org.apache.flink.streaming.api.windowing.time.Time
 5 
 6 /**
 7   * 使用滑动窗口
 8   * 每隔1秒钟统计最近2秒钟的每个单词出现的次数
 9   */
10 object FlinkStream {
11 
12   def main(args: Array[String]): Unit = {
13       //构建流处理的环境
14         val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
15 
16      //从socket获取数据
17        val sourceStream: DataStream[String] = env.socketTextStream("node01",9999)
18 
19      //导入隐式转换的包
20       import org.apache.flink.api.scala._
21 
22      //对数据进行处理
23      val result: DataStream[(String, Int)] = sourceStream
24           .flatMap(x => x.split(" ")) //按照空格切分
25           .map(x => (x, 1))           //每个单词计为1
26           .keyBy(0)                   //按照下标为0的单词进行分组      
27           .timeWindow(Time.seconds(2),Time.seconds(1)) //每隔1s处理2s的数据
28           .sum(1)            //按照下标为1累加相同单词出现的次数
29 
30         //对数据进行打印
31         result.print()
32 
33         //开启任务
34          env.execute("FlinkStream")
35       }
36 
37 }
View Code
  • 打包jar文件,提交到yarn  
    • ~/training/flink-1.9.2/bin/flink run -m yarn-cluster -yjm 1024 -c com.kaikeba.demo1.FlinkStream original-flask_demo-1.0-SNAPSHOT.jar
  • 查看结果
    • http://bigdata111:8088

参考

Spark Streaming 和 Flink 

https://blog.csdn.net/csdnnews/article/details/81518143

读写MySQL

https://blog.csdn.net/hyy1568786/article/details/105886518/

Flink 和 kafka

https://blog.csdn.net/SqrsCbrOnly1/article/details/100011933

State

https://blog.csdn.net/mhaiy24/article/details/102707958

Flink 广播

https://blog.csdn.net/nazeniwaresakini/article/details/107404951

https://www.jianshu.com/p/520376ae837e

Flink 状态

https://blog.csdn.net/mhaiy24/article/details/102707958

Flink入门到项目

https://blog.csdn.net/lp284558195/article/details/92798595

Flink 使用 broadcast 实现维表或配置的实时更新

https://blog.csdn.net/tzs_1041218129/article/details/105283325

flink+kafka实现wordcount实时计算+错误解决方案

https://blog.csdn.net/xiaoyutongxue6/article/details/88861087

flink流处理访问mysql

https://blog.csdn.net/u012447842/article/details/89175772

原文地址:https://www.cnblogs.com/cxc1357/p/12709836.html