BigData--分布式流数据流引擎Apache Flink

时间:2022-07-25
本文章向大家介绍BigData--分布式流数据流引擎Apache Flink,主要内容包括其使用实例、应用技巧、基本知识点总结和需要注意事项,具有一定的参考价值,需要的朋友可以参考一下。

官网:https://flink.apache.org/

一、Flink的重要特点

1)事件驱动型(Event-driven)

  • 事件驱动的应用程序是一个有状态的应用程序,它从一个或多个事件流接收事件,并通过触发计算、状态更新或外部操作对传入事件作出反应。
  • 事件驱动应用程序是传统应用程序设计的一种发展,它具有分离的计算和数据存储层。在这种体系结构中,应用程序从远程事务数据库读取数据并将其持久化。
  • 相反,事件驱动应用程序基于有状态流处理应用程序。在这个设计中,数据和计算被放在同一个位置,从而产生本地(内存或磁盘)数据访问。容错是通过定期将检查点写入远程持久存储来实现的。下图描述了传统应用程序体系结构与事件驱动应用程序之间的区别。

kafka作为消息队列就是一种典型的事件驱动型应用。

2) 流、批(stream,micro-batching)

Spark中,一切都是批次组成的,离线数据是一个大批次,实时数据是一个个无限的小批次组成的。 Flink中,一切都是由流组成的,离线数据是有界限的流,实时数据是一个没有界限的流,这就是所谓的有界流和无界流。

3)分层API

越顶层越抽象,最高层级的抽象是SQL。 越底层越具体

二、Flink使用(word count)

1、设置pom文件

注意下面的依赖设置,使用的是scala 2.12.x版本,Flink版本为1.10.1

xml

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>cn.buildworld.flink</groupId>
    <artifactId>FlinkTrain</artifactId>
    <version>1.0-SNAPSHOT</version>

    <dependencies>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-scala_2.12</artifactId>
            <version>1.10.1</version>
        </dependency>
        <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-streaming-scala -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-scala_2.12</artifactId>
            <version>1.10.1</version>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <!-- 该插件用于将Scala代码编译成class文件 -->
            <plugin>
                <groupId>net.alchim31.maven</groupId>
                <artifactId>scala-maven-plugin</artifactId>
                <version>4.4.0</version>
                <executions>
                    <execution>
                        <!-- 声明绑定到maven的compile阶段 -->
                        <goals>
                            <goal>compile</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>

            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-assembly-plugin</artifactId>
                <version>3.0.0</version>
                <configuration>
                    <descriptorRefs>
                        <descriptorRef>jar-with-dependencies</descriptorRef>
                    </descriptorRefs>
                </configuration>
                <executions>
                    <execution>
                        <id>make-assembly</id>
                        <phase>package</phase>
                        <goals>
                            <goal>single</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>

</project>

2、编写scala代码

1)批处理 wordcount

scala

package cn.buildworld.flink

import org.apache.flink.api.scala.{DataSet, ExecutionEnvironment}
import org.apache.flink.api.scala._

// 批处理的word count
object WordCount {
  def main(args: Array[String]): Unit = {

    //创建一个批处理的执行环境
    val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment

    //从文件中读取数据
    val inputPath = "D:\Java\project\Scala\FlinkTrain\src\main\resources\hello.txt"

    val dataSet: DataSet[String] = env.readTextFile(inputPath)

    // 对数据进行转换处理统计,先分词,再按照word进行分组,最后进行聚合统计

    val resultDataSet: DataSet[(String, Int)] = dataSet
      .flatMap(_.split(" "))
      .map((_, 1))
      .groupBy(0) //以第一个元素为key进行分组
      .sum(1) //对所有数据的第二个元素求和

    resultDataSet.print()
  }
}
2)流处理wordcount

超级简单,比sparkstreaming的流式处理简单多了!!!

scala

import org.apache.flink.streaming.api.scala._

/**
 * 流处理的word count
 *
 */
object WordCountByStream {
  def main(args: Array[String]): Unit = {

    //创建一个批处理的执行环境
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment

    // 设置并行度
    env.setParallelism(6)

    //从端口中读取数据
    val dataSet: DataStream[String] = env.socketTextStream("192.168.162.102", 7777)

    // 对数据进行转换处理统计,先分词,再按照word进行分组,最后进行聚合统计

    val resultDataSet = dataSet
      .flatMap(_.split(" "))
      .filter(_.nonEmpty)
      .map((_, 1))
      .keyBy(0) //以第一个元素为key进行分组
      .sum(1) //对所有数据的第二个元素求和

    resultDataSet.print()

    // 启动任务执行
    env.execute()
  }
}

补充

scala

import org.apache.flink.api.java.utils.ParameterTool

//可以冲启动参数里面读取指定的参数
val parameterTool: ParameterTool = ParameterTool.fromArgs(args)
val host: String = parameterTool.get("host")
val port: Int = parameterTool.getInt("port")