Flink 批处理算子详解

时间:2022-07-22
本文章向大家介绍Flink 批处理算子详解,主要内容包括其使用实例、应用技巧、基本知识点总结和需要注意事项,具有一定的参考价值,需要的朋友可以参考一下。

批处理程序的结果

  1. 获取运行时
 val env = ExecutionEnvironment.getExecutionEnvironment
  1. 添加Source
val text = env.fromElements("who's there","I think I hear")
  1. 定义算子转换函数
    text.flatMap{_.toLowerCase.split("\w+") filter(_.nonEmpty)}
        .map((_,1))
        .groupBy(0)
        .sum(1)
  1. 定义Sink
counts.print();
  1. 启动程序
env.execute("Kafka Dataset WordCount")

source 定义

// 递归定义整个目录下的所有文件

    val parameter = new Configuration
    parameter.setBoolean("recursive.file.enumeration",true)
    env.readTextFile("file://path/with/files").withParameters(parameter)

算子

Aggregate

    val input: DataSet[(Int,String,Double)] = env.fromElements(
      (1,"hello",4),
      (1,"hello",5),
      (2,"hello",5),
      (3,"word",6),
      (3,"word",6)
    )
    val value = input.groupBy(1).aggregate(Aggregations.SUM,0).aggregate(Aggregations.MIN,2)

连接

连接分为内连接和外连接,外连接分为左外连接,右外连接和内连接

    val input1 = env.fromElements((1,"hello"),(2,"hello"))
    val input2 = env.fromElements(("hello",1),("word",2))
    
    val result = input1.join(input2).where(0).equalTo(1)

广播变量

  1. 动态数据共享。 算子间共享输入和配置参数是静态的,广播变量共享的数据是动态的

广播变量编程步骤:

(1)创建广播变量。

val toBroadcast = env.fromElements(1,2,3);

(2) 注册广播变量

利用 RichFunction 自定义算子函数,注册广播变量

val toBroadcast = env.fromElements(1,2,3);
  val toBroadcast:DataSet[Int] = env.fromElements(1,2,3);

    toBroadcast.map(new RichMapFunction[String,String]() {
      var broadcastSet  = null

      override def open(parameters: Configuration): Unit = {
        // 读取广播变量
        broadcastSet = getRuntimeContext.getBroadcastVariable[String]("broadcastSetName").get(0)
      }
      override def map(value: String): String = {
        
      }
      // 注册广播变量
    }).withBroadcastSet(toBroadcast,"broadcastSetName");