flink教程-基于flink 1.11 使 sql客户端支持执行sql文件
- 背景
- 源码修改
- CliOptionsParser.java
- CliOptions.java
- SqlClient.java
- SqlClient#executeFile
- 社区现状
背景
目前flink的sql客户端提供了一种交互式的sql查询服务,用户可以使用sql客户端执行一些sql的批任务或者流任务。但是当我想执行一些sql的定时任务时,flink却没有提供一个合适的方式,所以综合考虑了一下,我决定在sql的客户端基础上给加一个 '-filename (-f)' 参数,就像类似'hive -f abc.sql' 一样,可以执行一批sql任务。
源码修改
目前我只是想通过sql客户端执行一些批任务,再加上flink sql 客户端本身的一些设计,所以目前修改后的sql client 执行sql文件的时候支持 SET,DDL,INSERT INTO SELECT ...等语句,其他比如select暂不支持。
修改后执行的方式为:
/home/flink/bin/sql-client.sh embedded -f flink.sql
CliOptionsParser.java
在这个sql 客户端参数解析类里添加一个选项,用于解析-f参数。
public static final Option OPTION_FILENAME = Option
.builder("f")
.required(false)
.longOpt("filename")
.numberOfArgs(1)
.argName("the path of the sql file")
.desc("SQL from files")
.build();
CliOptions.java
在这里添加一个变量filename
private final String filename;
SqlClient.java
在SqlClient里添加对于-filename的处理
if (options.getUpdateStatement() != null){
// execute update statement
final boolean success = cli.submitUpdate(options.getUpdateStatement());
if (!success) {
throw new SqlClientException("Could not submit given SQL update statement to cluster.");
}
} else if (options.getFilename() != null){
final boolean success = cli.executeFile(options.getFilename());
if (!success) {
throw new SqlClientException("Could not submit given SQL file to cluster.");
}
} else {
cli.open();
}
SqlClient#executeFile
添加具体的执行sql文件的方法,sql文件里的所有sql以分号切分,然后分别判断是什么类型,调用不同的方法来执行。
public boolean executeFile(String filename){
File file = new File(filename);
if (!file.exists()){
printError("the file do not exist");
return false;
} else {
String statement = null;
try {
statement = FileUtils.readFileToString(file);
} catch (IOException e){
printError("read the sql file error , " + e.getMessage());
return false;
}
String[] sqls = statement.split(";");
for (String sql : sqls){
if (sql == null || "".equals(sql.trim())){
continue;
}
final Optional<SqlCommandCall> parsedStatement = parseCommand(sql);
if (parsedStatement.isPresent()){
SqlCommandCall cmdCall = parsedStatement.get();
switch (cmdCall.command) {
case SET:
callSet(cmdCall);
break;
................
case INSERT_INTO:
case INSERT_OVERWRITE:
callInsert(cmdCall);
break;
case CREATE_TABLE:
callDdl(cmdCall.operands[0], CliStrings.MESSAGE_TABLE_CREATED);
break;
.....................
throw new SqlClientException("Unsupported command: " + cmdCall.command);
}
}
}
}
return true;
}
完整的代码请参考:
https://github.com/zhangjun0x01/flink/tree/release-1.11.0-sqlclient
社区现状
这个问题目前社区有一个相关的issue,但是不知道为什么迟迟没有更新和发布。 https://issues.apache.org/jira/browse/FLINK-12828
我看了一下这个相关的pr,感觉有些问题,比如sql文件没法支持多个sql,如果复用了原来的CliClient#callCommand方法,有些sql是没法执行的,比如clear、select等等,因为select在flink的客户端是开启了一个新的窗口来显示select的结果,但是我们需要的是执行一个sql文件。
所以我自己基于源码改了一版。
- linuxmint下pycharm创建桌面快捷方式
- hdu 1811 Rank of Tetris (并查集+拓扑排序)
- Pycharm常用技巧
- hdu 1598 find the most comfortable road(枚举+卡鲁斯卡尔最小生成树)
- 查询IP地址归属详情
- oracle commit详解
- hdu 4315 Climbing the Hill(阶梯博弈转nim博弈)
- iftop实时网络流量监控工具的安装使用
- hdu 3908 Triple(组合计数、容斥原理)
- hdu 4034 Graph (floyd的深入理解)
- hdu 4033Regular Polygon(二分+余弦定理)
- Debian8配置SSH允许root登陆
- hdu 4405Aeroplane chess(概率DP)
- hdu 3853LOOPS (概率DP)
- JavaScript 教程
- JavaScript 编辑工具
- JavaScript 与HTML
- JavaScript 与Java
- JavaScript 数据结构
- JavaScript 基本数据类型
- JavaScript 特殊数据类型
- JavaScript 运算符
- JavaScript typeof 运算符
- JavaScript 表达式
- JavaScript 类型转换
- JavaScript 基本语法
- JavaScript 注释
- Javascript 基本处理流程
- Javascript 选择结构
- Javascript if 语句
- Javascript if 语句的嵌套
- Javascript switch 语句
- Javascript 循环结构
- Javascript 循环结构实例
- Javascript 跳转语句
- Javascript 控制语句总结
- Javascript 函数介绍
- Javascript 函数的定义
- Javascript 函数调用
- Javascript 几种特殊的函数
- JavaScript 内置函数简介
- Javascript eval() 函数
- Javascript isFinite() 函数
- Javascript isNaN() 函数
- parseInt() 与 parseFloat()
- escape() 与 unescape()
- Javascript 字符串介绍
- Javascript length属性
- javascript 字符串函数
- Javascript 日期对象简介
- Javascript 日期对象用途
- Date 对象属性和方法
- Javascript 数组是什么
- Javascript 创建数组
- Javascript 数组赋值与取值
- Javascript 数组属性和方法
- 这一次搞懂SpringMVC原理
- 这一次搞懂Spring Web零xml配置原理以及父子容器关系
- 这一次搞懂SpringBoot核心原理(自动配置、事件驱动、Condition)
- 全网最深分析SpringBoot MVC自动配置失效的原因
- Mybatis源码初探——优雅精良的骨架
- 深入Mybatis源码——配置解析
- 深入Mybatis源码——执行流程
- Mybatis插件扩展以及与Spring整合原理
- 你所不知道的Spring的@Autowired实现细节
- modbus-RTU-crc16——c语言
- KEIL 生成 Binaxf 文件
- Istio可观测性
- 大点干!早点散----------使用Haproxy搭建web群集
- 嵌入式系统FreeRTOS — 互斥信号量
- 面试题系列第6篇:JVM字符串常量池及String的intern方法详解?