Flink SQL and Table

Date: 2020-05-22

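Flink provides four levels of abstraction for developing streaming and batch programs. From lowest to highest they are: stateful stream processing (ProcessFunction), the core DataStream / DataSet APIs, the Table API, and SQL.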

Please note that the Table API and SQL are not yet feature complete and are being actively developed. Not all operations are supported by every combination of [Table API, SQL] and [stream, batch] input.

In other words, the Flink Table & SQL feature set is still being completed and keeps changing from release to release.

Flink’s SQL support is based on Apache Calcite which implements the SQL standard

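Flink delegates SQL parsing, validation, and query optimization to Apache Calcite; the resulting plan is then executed on Flink's own runtime.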

Queries specified in either interface have the same semantics and specify the same result regardless whether the input is a batch input (DataSet) or a stream input (DataStream).

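At the programming-model level Flink offers two separate APIs, DataStream and DataSet, so batch and streaming are not truly unified: users still end up writing and maintaining two sets of code. With the addition of Flink Table & SQL, Flink can be said to have achieved batch/stream unification in practice, at least to some degree.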

Starting from Flink 1.9, Flink provides two different planner implementations for evaluating Table & SQL API programs: the Blink planner and the old planner that was available before Flink 1.9. Planners are responsible for translating relational operators into an executable, optimized Flink job. Both of the planners come with different optimization rules and runtime classes. They may also differ in the set of supported features.

Attention: For production use cases, we recommend the old planner that was present before Flink 1.9 for now.

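To summarize: starting with Flink 1.9 there are two planner implementations for evaluating Table & SQL API programs, the Blink planner and the old planner that existed before Flink 1.9. A planner translates relational operations into an optimized, executable job. (Spark's Catalyst optimizer is a useful point of comparison.)

For the main differences between the two planners, see the official documentation: https://ci.apache.org/projects/flink/flink-docs-release-1.10/zh/dev/table/common.html#main-differences-between-the-two-planners

Note: for production use, the old planner from before Flink 1.9 is the recommended choice. The Maven dependencies for the two planners are: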

<!-- Either... (for the old planner that was available before Flink 1.9) -->
<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-table-planner_2.11</artifactId>
  <version>1.10.0</version>
  <scope>provided</scope>
</dependency>
<!-- or.. (for the new Blink planner) -->
<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-table-planner-blink_2.11</artifactId>
  <version>1.10.0</version>
  <scope>provided</scope>
</dependency>

 

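Whether it is a batch query or a streaming query, the SQL text is first parsed by the corresponding planner's parser into a SQL syntax tree (a SqlNode tree). From that tree a logical execution plan (Logical Plan) is generated, the logical plan is optimized into a physical execution plan that can actually run, and the physical plan is handed to the DataSet API (batch SQL) or the DataStream API (streaming SQL) for execution. The overall flow is quite similar to Spark and Hive.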
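The stages can be sketched with a toy planner in plain Java. This is only an illustration, not Flink or Calcite code: `ToyPlanner`, `Node`, `parse`, `optimize`, and `toPhysical` are hypothetical names, the "parser" only understands `SELECT cols FROM table [WHERE cond]`, and the single rewrite rule (fusing a filter and a projection into one `Calc` operator) is merely modeled on the `DataStreamCalc` node that appears in the explain output later in this article.

```java
import java.util.ArrayList;
import java.util.List;

public class ToyPlanner {
    /** One operator of a plan, e.g. Scan(Orders) or Filter(a > 1). Hypothetical type. */
    public static final class Node {
        public final String op;
        public final String arg;

        public Node(String op, String arg) {
            this.op = op;
            this.arg = arg;
        }

        @Override
        public String toString() {
            return op + "(" + arg + ")";
        }
    }

    /** Parse + logical plan: turns "SELECT cols FROM table [WHERE cond]" into operators, scan first. */
    public static List<Node> parse(String sql) {
        String s = sql.trim();
        String upper = s.toUpperCase();
        int from = upper.indexOf(" FROM ");
        int where = upper.indexOf(" WHERE ");
        String cols = s.substring("SELECT ".length(), from).trim();
        String table = (where < 0 ? s.substring(from + 6) : s.substring(from + 6, where)).trim();

        List<Node> plan = new ArrayList<>();
        plan.add(new Node("Scan", table));
        if (where >= 0) {
            plan.add(new Node("Filter", s.substring(where + 7).trim()));
        }
        plan.add(new Node("Project", cols));
        return plan;
    }

    /** Optimization: one rule that fuses a Filter directly followed by a Project into a Calc. */
    public static List<Node> optimize(List<Node> logical) {
        List<Node> out = new ArrayList<>();
        for (int i = 0; i < logical.size(); i++) {
            Node n = logical.get(i);
            boolean fusable = n.op.equals("Filter")
                    && i + 1 < logical.size()
                    && logical.get(i + 1).op.equals("Project");
            if (fusable) {
                out.add(new Node("Calc", "select=[" + logical.get(i + 1).arg + "], where=[" + n.arg + "]"));
                i++; // the Project was consumed by the fusion
            } else {
                out.add(n);
            }
        }
        return out;
    }

    /** Physical plan: binds each operator to a runtime, DataStream for streaming or DataSet for batch. */
    public static List<String> toPhysical(List<Node> optimized, boolean streaming) {
        String prefix = streaming ? "DataStream" : "DataSet";
        List<String> physical = new ArrayList<>();
        for (Node n : optimized) {
            physical.add(prefix + n);
        }
        return physical;
    }

    public static void main(String[] args) {
        List<Node> logical = parse("SELECT cID, cName FROM Orders WHERE cCountry = 'FRANCE'");
        List<Node> optimized = optimize(logical);
        System.out.println("logical:   " + logical);
        System.out.println("optimized: " + optimized);
        System.out.println("physical:  " + toPhysical(optimized, true));
    }
}
```

A real planner applies many such rules and a cost model; the point here is only that the same logical plan can be bound to either a batch or a streaming runtime at the last step.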

The Table API and SQL programming model

For both batch and stream processing, Table API and SQL programs follow the same pattern.

// create a TableEnvironment for a specific planner, batch or streaming
TableEnvironment tableEnv = ...; // see "Create a TableEnvironment" section

// create an input Table
tableEnv.connect(...).createTemporaryTable("table1");
// register an output Table
tableEnv.connect(...).createTemporaryTable("outputTable");

// create a Table object from a Table API query
Table tapiResult = tableEnv.from("table1").select(...);
// create a Table object from a SQL query
Table sqlResult  = tableEnv.sqlQuery("SELECT ... FROM table1 ... ");

// emit a Table API result Table to a TableSink, same for SQL result
tapiResult.insertInto("outputTable");

// execute
tableEnv.execute("java_job");

 

Does this remind you of Dataset and SQL programming in Spark?

Creating a TableEnvironment

The TableEnvironment is mainly responsible for:

  • Registering a Table in the internal catalog

  • Registering catalogs

  • Loading pluggable modules

  • Executing SQL queries

  • Registering a user-defined (scalar, table, or aggregation) function

  • Converting a DataStream or DataSet into a Table

  • Holding a reference to an ExecutionEnvironment or StreamExecutionEnvironment

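Examples follow for four cases: Flink streaming query, Flink batch query, Blink streaming query, and Blink batch query.

The only difference between the "Flink" and "Blink" variants is the choice of planner, and in practice we should state explicitly which planner to use.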

If both planner jars are on the classpath (the default behavior), you should explicitly set which planner to use in the current program.

// **********************
// FLINK STREAMING QUERY
// **********************
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.java.StreamTableEnvironment;

EnvironmentSettings fsSettings = EnvironmentSettings.newInstance().useOldPlanner().inStreamingMode().build();
StreamExecutionEnvironment fsEnv = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment fsTableEnv = StreamTableEnvironment.create(fsEnv, fsSettings);
// or TableEnvironment fsTableEnv = TableEnvironment.create(fsSettings);

// ******************
// FLINK BATCH QUERY
// ******************
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.table.api.java.BatchTableEnvironment;

ExecutionEnvironment fbEnv = ExecutionEnvironment.getExecutionEnvironment();
BatchTableEnvironment fbTableEnv = BatchTableEnvironment.create(fbEnv);

// **********************
// BLINK STREAMING QUERY
// **********************
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.java.StreamTableEnvironment;

StreamExecutionEnvironment bsEnv = StreamExecutionEnvironment.getExecutionEnvironment();
EnvironmentSettings bsSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
StreamTableEnvironment bsTableEnv = StreamTableEnvironment.create(bsEnv, bsSettings);
// or TableEnvironment bsTableEnv = TableEnvironment.create(bsSettings);

// ******************
// BLINK BATCH QUERY
// ******************
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

EnvironmentSettings bbSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inBatchMode().build();
TableEnvironment bbTableEnv = TableEnvironment.create(bbSettings);

 

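Creating tables (Create Tables in the Catalog)

A table can be either virtual (VIEWS) or regular (TABLES). VIEWS can be created from an existing Table object, usually the result of a Table API or SQL query. TABLES generally describe external data, such as a file, a database table, or a message queue.

Temporary and permanent tables

A temporary table is tied to the lifecycle of a single Flink session and is always stored in memory.

A permanent table requires a catalog (for example Hive Metastore) to manage its metadata. Such a table exists until it is explicitly dropped.

Creating a virtual table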

// get a TableEnvironment
TableEnvironment tableEnv = ...; // see "Create a TableEnvironment" section
// table is the result of a simple projection query
Table projTable = tableEnv.from("X").select(...);

// register the Table projTable as table "projectedTable"
tableEnv.createTemporaryView("projectedTable", projTable);

 

Creating a table through a connector

For example, from an external relational database or from Kafka.

tableEnvironment
  .connect(...)
  .withFormat(...)
  .withSchema(...)
  .inAppendMode()
  .createTemporaryTable("MyTable")

 

Querying a table (Query a Table)

Querying with the Table API

// get a TableEnvironment
TableEnvironment tableEnv = ...; // see "Create a TableEnvironment" section
// register Orders table
// scan registered Orders table
Table orders = tableEnv.from("Orders");
// compute revenue for all customers from France
Table revenue = orders
  .filter("cCountry === 'FRANCE'")
  .groupBy("cID, cName")
  .select("cID, cName, revenue.sum AS revSum");

// emit or convert Table
// execute query

 

Querying with SQL

// get a TableEnvironment
TableEnvironment tableEnv = ...; // see "Create a TableEnvironment" section
// register Orders table
// compute revenue for all customers from France
Table revenue = tableEnv.sqlQuery(
    "SELECT cID, cName, SUM(revenue) AS revSum " +
    "FROM Orders " +
    "WHERE cCountry = 'FRANCE' " +
    "GROUP BY cID, cName"
  );

// emit or convert Table
// execute query

 

Updating a table with SQL

// get a TableEnvironment
TableEnvironment tableEnv = ...; // see "Create a TableEnvironment" section
// register "Orders" table
// register "RevenueFrance" output table
// compute revenue for all customers from France and emit to "RevenueFrance"
tableEnv.sqlUpdate(
    "INSERT INTO RevenueFrance " +
    "SELECT cID, cName, SUM(revenue) AS revSum " +
    "FROM Orders " +
    "WHERE cCountry = 'FRANCE' " +
    "GROUP BY cID, cName"
  );

// execute query

 

In addition, the Table API and SQL can be freely mixed in the same program.
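Whichever style (or mix of styles) is used, the revenue query above computes the same result. As a reference for its semantics only, here is a minimal plain-Java sketch with no Flink involved. The `RevenueByCustomer` class, the `Order` row type, and the sample rows are assumptions made up for this illustration; only the column names come from the query.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

public class RevenueByCustomer {
    /** Hypothetical row type standing in for the Orders table. */
    public static final class Order {
        public final long cID;
        public final String cName;
        public final String cCountry;
        public final double revenue;

        public Order(long cID, String cName, String cCountry, double revenue) {
            this.cID = cID;
            this.cName = cName;
            this.cCountry = cCountry;
            this.revenue = revenue;
        }
    }

    /**
     * Equivalent of:
     * SELECT cID, cName, SUM(revenue) AS revSum FROM Orders
     * WHERE cCountry = 'FRANCE' GROUP BY cID, cName
     * The result maps the composite key "cID,cName" to revSum.
     */
    public static Map<String, Double> revenueFrance(List<Order> orders) {
        Map<String, Double> revSum = new TreeMap<>();
        for (Order o : orders) {
            if (!"FRANCE".equals(o.cCountry)) {
                continue; // WHERE cCountry = 'FRANCE'
            }
            String groupKey = o.cID + "," + o.cName; // GROUP BY cID, cName
            revSum.merge(groupKey, o.revenue, Double::sum); // SUM(revenue)
        }
        return revSum;
    }

    public static void main(String[] args) {
        // Made-up sample rows, for illustration only.
        List<Order> orders = Arrays.asList(
                new Order(1, "Alice", "FRANCE", 10.0),
                new Order(2, "Bob", "GERMANY", 7.0),
                new Order(1, "Alice", "FRANCE", 5.5));
        System.out.println(revenueFrance(orders));
    }
}
```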

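Emitting a table (Emit a Table / TableSink)

A TableSink can write to a variety of file formats, such as CSV, Apache Parquet, or Apache Avro; to other storage systems, such as JDBC, Apache HBase, Apache Cassandra, or Elasticsearch; or to messaging systems, such as Apache Kafka or RabbitMQ.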

// get a TableEnvironment
TableEnvironment tableEnv = ...; // see "Create a TableEnvironment" section
// create an output Table
final Schema schema = new Schema()
    .field("a", DataTypes.INT())
    .field("b", DataTypes.STRING())
    .field("c", DataTypes.LONG());

tableEnv.connect(new FileSystem("/path/to/file"))
    .withFormat(new Csv().fieldDelimiter('|').deriveSchema())
    .withSchema(schema)
    .createTemporaryTable("CsvSinkTable");

// compute a result Table using Table API operators and/or SQL queries
Table result = ...
// emit the result Table to the registered TableSink
result.insertInto("CsvSinkTable");

// execute the program

 

Viewing the execution plan (Explaining a Table)

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);

DataStream<Tuple2<Integer, String>> stream1 = env.fromElements(new Tuple2<>(1, "hello"));
DataStream<Tuple2<Integer, String>> stream2 = env.fromElements(new Tuple2<>(1, "hello"));

Table table1 = tEnv.fromDataStream(stream1, "count, word");
Table table2 = tEnv.fromDataStream(stream2, "count, word");
Table table = table1
  .where("LIKE(word, 'F%')")
  .unionAll(table2);

String explanation = tEnv.explain(table);
System.out.println(explanation);

 

Result:

== Abstract Syntax Tree ==
LogicalUnion(all=[true])
  LogicalFilter(condition=[LIKE($1, _UTF-16LE'F%')])
    FlinkLogicalDataStreamScan(id=[1], fields=[count, word])
  FlinkLogicalDataStreamScan(id=[2], fields=[count, word])

== Optimized Logical Plan ==
DataStreamUnion(all=[true], union all=[count, word])
  DataStreamCalc(select=[count, word], where=[LIKE(word, _UTF-16LE'F%')])
    DataStreamScan(id=[1], fields=[count, word])
  DataStreamScan(id=[2], fields=[count, word])

== Physical Execution Plan ==
Stage 1 : Data Source
    content : collect elements with CollectionInputFormat

Stage 2 : Data Source
    content : collect elements with CollectionInputFormat

    Stage 3 : Operator
        content : from: (count, word)
        ship_strategy : REBALANCE

        Stage 4 : Operator
            content : where: (LIKE(word, _UTF-16LE'F%')), select: (count, word)
            ship_strategy : FORWARD

            Stage 5 : Operator
                content : from: (count, word)
                ship_strategy : REBALANCE

 

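Dynamic tables (Dynamic Table)

Unlike a traditional SQL query over a table, Flink Table & SQL on streaming data works on input that is changing all the time, which is why the concept of a dynamic table exists.

A dynamic table is queried just like a static table, except that the SQL runs as a continuous query and never terminates.

The relationship between streams, dynamic tables, and continuous queries is as follows:

  1. The stream is converted into a dynamic table.

  2. A continuous query is evaluated on the dynamic table, producing a new dynamic table.

  3. The resulting dynamic table is converted back into a stream.

As an example, take a real-time stream of user click events with the following schema: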

[
  user:  VARCHAR,   // the name of the user
  cTime: TIMESTAMP, // the time when the URL was accessed
  url:   VARCHAR    // the URL that was accessed by the user
]

 

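To query it with SQL, the click event stream first has to be converted into a table: every record in the stream is treated as a row inserted into that table.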
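This insert-only interpretation can be sketched in plain Java. It is a conceptual model, not Flink API: `ClicksTable`, `Click`, `onEvent`, and `snapshot` are hypothetical names, `cTime` is kept as text for brevity, and the sample events are made up for illustration.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

public class ClicksTable {
    /** One click event; the fields mirror the schema above. */
    public static final class Click {
        public final String user;
        public final String cTime;
        public final String url;

        public Click(String user, String cTime, String url) {
            this.user = user;
            this.cTime = cTime;
            this.url = url;
        }

        @Override
        public String toString() {
            return "(" + user + ", " + cTime + ", " + url + ")";
        }
    }

    private final List<Click> rows = new ArrayList<>();

    /** Every record arriving on the stream is treated as an INSERT into the table. */
    public void onEvent(Click click) {
        rows.add(click);
    }

    /** Returns a read-only copy of the table as it looks right now. */
    public List<Click> snapshot() {
        return Collections.unmodifiableList(new ArrayList<>(rows));
    }

    public static void main(String[] args) {
        ClicksTable clicks = new ClicksTable();
        Click[] stream = {
            new Click("Mary", "12:00:00", "./home"),
            new Click("Bob", "12:00:00", "./cart"),
            new Click("Mary", "12:00:05", "./prod?id=1")
        };
        for (Click event : stream) {
            clicks.onEvent(event);
            System.out.println("table now has " + clicks.snapshot().size() + " row(s): " + clicks.snapshot());
        }
    }
}
```

The table only ever grows here, and a snapshot taken earlier keeps showing the table as it was at that moment.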

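At any point in time, the result of a continuous query is semantically equivalent to the result of running the same query in batch mode on a snapshot of the input table. For example, based on the click event table clicks, we can count the number of clicks per user.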
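The equivalence can be checked with a small plain-Java model of that per-user count (roughly `SELECT user, COUNT(url) FROM clicks GROUP BY user`). This is a sketch under assumptions, not Flink code: `ContinuousClickCount`, `onInsert`, and `batchCount` are hypothetical names, each row is a `String[]` of `{user, cTime, url}`, and the sample rows are invented.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

public class ContinuousClickCount {
    /** Batch-style evaluation: recompute the counts from scratch over a finite snapshot. */
    public static Map<String, Long> batchCount(List<String[]> snapshot) {
        Map<String, Long> result = new TreeMap<>();
        for (String[] row : snapshot) {
            result.merge(row[0], 1L, Long::sum); // row[0] is the user column
        }
        return result;
    }

    /** State of the continuous query, updated incrementally as rows arrive. */
    private final Map<String, Long> counts = new TreeMap<>();

    /** Processes one inserted row and returns a copy of the current result table. */
    public Map<String, Long> onInsert(String[] row) {
        counts.merge(row[0], 1L, Long::sum);
        return new TreeMap<>(counts);
    }

    public static void main(String[] args) {
        // Made-up click events: {user, cTime, url}.
        List<String[]> clicks = Arrays.asList(
                new String[] {"Mary", "12:00:00", "./home"},
                new String[] {"Bob", "12:00:00", "./cart"},
                new String[] {"Mary", "12:00:05", "./prod?id=1"});

        ContinuousClickCount query = new ContinuousClickCount();
        List<String[]> seen = new ArrayList<>();
        for (String[] click : clicks) {
            seen.add(click); // the input table's snapshot after this insert
            Map<String, Long> continuous = query.onInsert(click);
            boolean sameAsBatch = continuous.equals(batchCount(seen));
            System.out.println(continuous + " matches batch result on snapshot: " + sameAsBatch);
        }
    }
}
```

The continuous version never rescans the input. It keeps only the running counts as state and updates a result row whenever a new click arrives for a user it has already seen.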

To be continued...

 
