Flink Table读取外部数据(File、Kafka)以及查询转换
时间:2021-01-12
本文章向大家介绍Flink Table读取外部数据(File、Kafka)以及查询转换,主要包括Flink Table读取外部数据(File、Kafka)以及查询转换使用实例、应用技巧、基本知识点总结和需要注意事项,具有一定的参考价值,需要的朋友可以参考一下。
知识点
FlinkTable步骤: // 1、创建表的执行环境 val tableEnv = ... // 2、创建一张表,用于读取数据 tableEnv.connect(...).createTemporaryTable("inputTable") // 3、1通过 Table API 查询算子,得到一张结果表 val result = tableEnv.from("inputTable").select(...) // 3、2通过 SQL 查询语句,得到一张结果表 val sqlResult = tableEnv.sqlQuery("SELECT ... FROM inputTable ...") // 4、注册一张表,用于把计算结果输出 tableEnv.connect(...).createTemporaryTable("outputTable") // 5、将结果表写入输出表中 result.insertInto("outputTable")
1、CSV文件依赖
<dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-csv</artifactId> <version>1.10.1</version> </dependency>
2、代码案例
package table import org.apache.flink.api.scala.ExecutionEnvironment import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment import org.apache.flink.table.api.{DataTypes, EnvironmentSettings, Table, TableEnvironment} import org.apache.flink.table.api.scala.{BatchTableEnvironment, StreamTableEnvironment} import org.apache.flink.table.descriptors.{Csv, FileSystem, Kafka, OldCsv, Schema} import org.apache.flink.table.api.scala._ import org.apache.flink.streaming.api.scala._ /** * @author yangwj * @date 2021/1/12 21:53 * @version 1.0 */ object TableApiTest { def main(args: Array[String]): Unit = { //1、创建表执行环境、就得使用流式环境 val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment env.setParallelism(1) val tableEnv: StreamTableEnvironment = StreamTableEnvironment.create(env) /** //1、1老版本planner的流处理 val setttings = EnvironmentSettings.newInstance() .useOldPlanner() .inStreamingMode() .build() val oldStreamTableEnv: StreamTableEnvironment = StreamTableEnvironment.create(env, setttings) //1.2老版本的批处理 val batchEnv = ExecutionEnvironment.getExecutionEnvironment val oldBatchTableEnv: BatchTableEnvironment = BatchTableEnvironment.create(batchEnv) //1.1新版本,基于blink planner的流处理 val blinkStreamSettings: EnvironmentSettings = EnvironmentSettings.newInstance() .useBlinkPlanner() .inStreamingMode() .build() val blinkStreamTableEnv = StreamTableEnvironment.create(env,blinkStreamSettings) //1.2新版本,基于blink planner的批处理 val blinkBatchSettings: EnvironmentSettings = EnvironmentSettings.newInstance() .useBlinkPlanner() .inBatchMode() .build() val blinkBatchTableEnv = TableEnvironment.create(blinkBatchSettings) **/ //2、连接外部系统,读取数据,注册表 //2.1读取文件 val inputFile:String = "G:\\Java\\Flink\\guigu\\flink\\src\\main\\resources\\sensor.txt" tableEnv.connect(new FileSystem().path(inputFile)) // new OldCsv()是一个非标的格式描述 .withFormat(new Csv()) .withSchema(new Schema().field("id",DataTypes.STRING()) .field("timestamp",DataTypes.BIGINT()) .field("temperature",DataTypes.DOUBLE()) ) .createTemporaryTable("inputTable") val inputTable: Table = tableEnv.from("inputTable") inputTable.toAppendStream[(String,Long,Double)].print("result") //2.2读取kafka数据 tableEnv.connect(new Kafka() .version("0.11") .topic("Demo") .property("zookeeper.connect","localhost:2181") .property("bootstrap.servers","localhost:9092") ) .withFormat(new Csv()) .withSchema(new Schema().field("id",DataTypes.STRING()) .field("timestamp",DataTypes.BIGINT()) .field("temperature",DataTypes.DOUBLE()) ).createTemporaryTable("kafkaTable") val kafkaTable: Table = tableEnv.from("kafkaTable") kafkaTable.toAppendStream[(String,Long,Double)].print("kafkaResult") //3、查询转换 //3.1 使用table api val sensorTable: Table = tableEnv.from("inputTable") val apiResult: Table = sensorTable.select('id, 'temperature) .filter('id === "sensor_1") //3.2sql实现 val sqlResult: Table = tableEnv.sqlQuery( """ |select id ,temperature |from inputTable |where id = 'sensor_1' """.stripMargin) apiResult.toAppendStream[(String, Double)].print("apiResult") sqlResult.toAppendStream[(String, Double)].print("sqlResult") env.execute("table api test") } }
- JavaScript 教程
- JavaScript 编辑工具
- JavaScript 与HTML
- JavaScript 与Java
- JavaScript 数据结构
- JavaScript 基本数据类型
- JavaScript 特殊数据类型
- JavaScript 运算符
- JavaScript typeof 运算符
- JavaScript 表达式
- JavaScript 类型转换
- JavaScript 基本语法
- JavaScript 注释
- Javascript 基本处理流程
- Javascript 选择结构
- Javascript if 语句
- Javascript if 语句的嵌套
- Javascript switch 语句
- Javascript 循环结构
- Javascript 循环结构实例
- Javascript 跳转语句
- Javascript 控制语句总结
- Javascript 函数介绍
- Javascript 函数的定义
- Javascript 函数调用
- Javascript 几种特殊的函数
- JavaScript 内置函数简介
- Javascript eval() 函数
- Javascript isFinite() 函数
- Javascript isNaN() 函数
- parseInt() 与 parseFloat()
- escape() 与 unescape()
- Javascript 字符串介绍
- Javascript length属性
- javascript 字符串函数
- Javascript 日期对象简介
- Javascript 日期对象用途
- Date 对象属性和方法
- Javascript 数组是什么
- Javascript 创建数组
- Javascript 数组赋值与取值
- Javascript 数组属性和方法
- python爬虫获取淘宝天猫商品详细参数
- vue将对象新增的属性添加到检测序列的方法
- Java写出生肖年判断
- 解决Vue不能检测数组或对象变动的问题
- php实现生成带二维码图片并强制下载功能
- python正则表达式爬取猫眼电影top100
- vue watch监听对象及对应值的变化详解
- 使用watch监听路由变化和watch监听对象的实例
- python爬虫爬取淘宝商品信息(selenum+phontomjs)
- 基于Spring Batch向Elasticsearch批量导入数据示例
- 浅谈VUE监听窗口变化事件的问题
- JS抛物线动画实例制作
- 解决tomcat在Debug模式下无法启动问题
- 解决VUE框架 导致绑定事件的阻止冒泡失效问题
- JavaScript中利用Array filter() 方法压缩稀疏数组