Flink SQL 自定义函数指南 - 以读取 GBK 编码的数据库为例
背景介绍
近期我们遇到了一位客户提出的问题:MySQL 建表时,数据库表定义的字符集是 latin1,里面的数据是以 GBK 编码的方式写入的。当 Flink 的 JDBC Connector 在读取此维表时,输出数据的中文出现了乱码现象,如下图:
原因分析
对于 Oceanus 平台而言,内部的数据处理都是以 Unicode 为标准的。对于非 Unicode 的字符集,在 JDBC Connector 读取时,可能会出现各种异常情况,即使 JDBC 连接 URL 参数中指定了characterEncoding
也无法避免中文乱码问题。
对于 MySQL 数据而言,最怕的不是数据乱码,而是变成问号 (????)。通常来讲,如果遇到了全是问号的情况,则数据基本无法还原了;而对于乱码来说,很可能源数据还在,只是编码选错了,通过恰当的解码方式,还是有希望恢复原有的数据。
因此我们需要编写一个 UDF(用户自定义函数),将 JDBC Connector 读到的 Latin1(这里实际上是 GBK)数据进行解码。
首先我们来看一下数据库中的原始数据(首先需要将终端的编码改为 GBK,否则显示的仍然是乱码):
以 id 为 1 的数据为例,这里喵的 GBK 编码是0xDF 0xF7
。
那问题来了,既然 Flink 并没有报类型错误,说明输入输出还是当作字符串看待的,只是字符串没有经过妥善解码。那 Flink 将其读取成了什么呢?我们来写一个 UDF 自定义函数看看。
UDF 编写
对于这种编解码的场景,适合使用 Flink 的标量函数(Scalar Function),即单条记录进,单条记录出,无需聚合等复杂操作。
在当前的流计算 Oceanus 版本中,已经支持通过CREATE TEMPORARY SYSTEM FUNCTION
的方式来 声明 UDF。声明 UDF 后,在 程序包管理 界面,可以上传具体的实现类 JAR 包。
我们先编写一个打印出 String 里每个 Char 内容的函数,类名为DecodeLatin1
.
初步代码
请先在 pom.xml 中引入 Flink 相关依赖,随后可以开始编写 UDF:
package com.tencent.cloud.oceanus.udf;
import org.apache.flink.table.functions.ScalarFunction;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.stream.IntStream;
public class DecodeLatin1 extends ScalarFunction {
private static final Logger LOGGER = LoggerFactory.getLogger(DecodeLatin1.class);
public String eval(String input) {
char[] inputCharArray = input.toCharArray(); // 不能用 getBytes() 方法, 否则原始内容会被再次编码
IntStream.range(0, inputCharArray.length).forEach(i -> {
LOGGER.info("{}", Integer.toHexString(inputCharArray[i]));
});
return input;
}
}
编写完成并打包后,可以将程序包上传(对于自建的 Flink 集群,则是放入 Flink 的 lib 目录):
随后可以在 SQL 代码中,引用这个程序包:
作业提交运行后,我们可以尝试读取 id=1 的数据,发现打印出来的日志里,字符串中实际上保留了原始字符的 GBK 编码,只是没有经过妥善解码,导致输出时误当作 Unicode 处理了。
另外还注意到,对于原始 Latin1 而言,每个字符占 1 个字节,而这里 Java String 中使用的是 Char 结构,每个字符占了 2 个字节,且高位字节恒为 0. 此猜想在 这篇 MySQL 官方文档 中得到了验证。
那么给我们的启示是:可以直接将 char[] 数组转为等长的 byte[] 数组,而不能按照传统思路,创建一个长度为 char[] 数组两倍的 byte[] 数组。
改版后的代码
按照上面的思路,我们重新实现了一版,该版本可以实现解码并重新生成正确 String。
package com.tencent.cloud.oceanus.udf;
import org.apache.flink.table.functions.ScalarFunction;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.UnsupportedEncodingException;
import java.util.stream.IntStream;
/**
* 如果 JDBC 数据库的 VARCHAR 为 Latin1 (或 GBK 等) 编码
* 可以使用这个函数转换为标准字符串
*
* SQL 代码声明方式:
* CREATE TEMPORARY SYSTEM FUNCTION DECODE_LATIN1 AS 'com.tencent.cloud.oceanus.udf.DecodeLatin1' LANGUAGE JAVA;
*/
public class DecodeLatin1 extends ScalarFunction {
private static final Logger LOGGER = LoggerFactory.getLogger(DecodeLatin1.class);
public String eval(String input) {
return eval(input, "latin1");
}
public String eval(String input, String fromCharset) {
char[] inputCharArray = input.toCharArray();
// JDBC Driver 读取的 Latin1 字符, 高 8 位都是 0x00, 因此只考虑低 8 位即可, byte 和 char 数据部分等长, 长度无需乘以二
byte[] inputBytes = new byte[inputCharArray.length];
IntStream.range(0, inputCharArray.length).forEach(i -> {
inputBytes[i] = (byte) inputCharArray[i];
LOGGER.debug("{}", String.format("0x%02X ", inputBytes[i]));
});
try {
return new String(inputBytes, fromCharset);
} catch (UnsupportedEncodingException e) {
// 与 GET_JSON_OBJECT 的异常处理方式保持一致, 遇到异常数据时输出 null, 避免日志过量打印
LOGGER.debug("Unsupported charset {} for input string {}", fromCharset, input, e);
return null;
}
}
}
上传新版的 UDF,然后再次运行(注意本次增加了一个新字段FromCharset
,表示解码使用的实际字符集):
然后我们再读取数据库中 id 为 1 的数据,现在输出就正常了:
总结
在遇到数据乱码等原生 Flink 无法轻易解决的问题时,可以尝试自定义函数来定位和排查,一旦确认问题根源,可以同样使用自定义函数来对数据进行校正。大大扩展了 Flink SQL 的功能。
另外,程序包可以分版本在不同的作业之间复用,基础包(UDF)和业务包(调用 UDF 的主程序)可以实现解耦。如果有更优化的实现,可以只更新基础包,避免对业务包的改动引入的风险。
- Elasticsearch索引增量统计及定时邮件实现
- 机器学习中的特征空间
- 简单易学的机器学习算法——马尔可夫链蒙特卡罗方法MCMC
- 推荐算法——基于图的推荐算法PersonalRank算法
- 推荐算法——非负矩阵分解(NMF)
- 【Go 语言社区】转-golang windows 判断锁屏
- 【Go 语言社区】单点redis 持久化在高并发下存在延迟情况
- 设计模式(2)-策略模式之多用组合少用继承
- Golang获取随机端口和本机ip地址
- 设计模式(3)-装扮你的类(装饰模式)
- [Go 语言社区]服务器读取配置文件只-json数据
- gsoap开发webservice
- [Go 语言社区]测试模块之---utf8例子
- org.hibernate.type.StringType cannot be cast to org.hibernate.type.VersionType
- MySQL 教程
- MySQL 安装
- MySQL 管理与配置
- MySQL PHP 语法
- MySQL 连接
- MySQL 创建数据库
- MySQL 删除数据库
- MySQL 选择数据库
- MySQL 数据类型
- MySQL 创建数据表
- MySQL 删除数据表
- MySQL 插入数据
- MySQL 查询数据
- MySQL where 子句
- MySQL UPDATE 查询
- MySQL DELETE 语句
- MySQL LIKE 子句
- mysql order by
- Mysql Join的使用
- MySQL NULL 值处理
- MySQL 正则表达式
- MySQL 事务
- MySQL ALTER命令
- MySQL 索引
- MySQL 临时表
- MySQL 复制表
- 查看MySQL 元数据
- MySQL 序列 AUTO_INCREMENT
- MySQL 处理重复数据
- MySQL 及 SQL 注入
- MySQL 导出数据
- MySQL 导入数据
- MYSQL 函数大全
- MySQL Group By 实例讲解
- MySQL Max()函数实例讲解
- mysql count函数实例
- MYSQL UNION和UNION ALL实例
- MySQL IN 用法
- MySQL between and 实例讲解
- Android使用CrashHandler来获取应用的crash信息的方法
- 数据魔术师小白零基础实现简单人脸识别
- Android编程实现XML解析与保存的三种方法详解
- 浅谈关于Android路由的实现
- Android中EditText禁止输入表情的实例代码
- Android仿微信右滑返回功能的实例代码
- 算法复现·推荐算法 | DeepFM for CTR Prediction
- Android 中Notification弹出通知实现代码
- Android编程实现添加低电流提醒功能的方法
- Android头像上传功能的实现代码(获取头像加剪切)
- Android自定义View画圆功能
- Android打包版本号设置方法
- Android编程实现获取系统内存、CPU使用率及状态栏高度的方法示例
- Android 6.0开发实现关机菜单添加重启按钮的方法
- 基于Socket.IO实现Android聊天功能代码示例