聊聊flink 1.11 中的随机数据生成器-DataGen connector
- 使用
- 示例
- 源码解析
- 创建TableSource
- 数据生成器DataGenerator
- DataGenTableSource
使用
在flink 1.11中,内置提供了一个DataGen 连接器,主要是用于生成一些随机数,用于在没有数据源的时候,进行流任务的测试以及性能测试等。下面我们简单的聊聊如何来使用以及底层源码是如何实现的。
具体的使用方法可以先看下官网的概述。
https://ci.apache.org/projects/flink/flink-docs-release-1.11/zh/dev/table/connectors/datagen.html
之后我来做下简单的概述,以及一些注意点:
- 目前随机生成只支持基本数据类型:数字类型(TINYINT、SMALLINT、int、bigint、FLOAT、double)、字符串类型(VARCHAR、char),以及boolean类型。
- 目前有两种数据生成器,一种是随机生成器(默认),这个是无界的,另一个是序列生成器,是有界的。
- 字段中只要有一个是按序列生成的,也就是有界的,程序就会在序列结束的时候退出。如果所有字段都是随机生成的,则程序最终不会结束。
示例
我们摘抄下官网的例子,然后做下解释。
CREATE TABLE datagen (
f_sequence INT,
f_random INT,
f_random_str STRING,
ts AS localtimestamp,
WATERMARK FOR ts AS ts
) WITH (
'connector' = 'datagen',
-- optional options --
'rows-per-second'='5',
'fields.f_sequence.kind'='sequence',
'fields.f_sequence.start'='1',
'fields.f_sequence.end'='1000',
'fields.f_random.min'='1',
'fields.f_random.max'='1000',
'fields.f_random_str.length'='10'
)
- DDL的with属性中,除了connector是必填之外,其他都是可选的。
- rows-per-second 每秒生成的数据条数
- f_sequence字段的生成策略是按序列生成,并且指定了起始值,所以该程序将会在到达序列的结束值之后退出
- f_random 字段是按照随机生成,并指定随机生成的范围
- f_random_str是一个字符串类型,属性中指定了随机生成字符串的长度是10
- ts列是一个计算列,返回当前的时间.
源码解析
创建TableSource
既然是一个source conncector,那么就有对应的TableSourceFactory和TableSource,这个datagen所对应的就是DataGenTableSourceFactory和DataGenTableSource,接下来我们从这里作为入口来看看底层是怎么实现的。
我们首先进入DataGenTableSourceFactory#createDynamicTableSource。
@Override
public DynamicTableSource createDynamicTableSource(Context context) {
Configuration options = new Configuration();
context.getCatalogTable().getOptions().forEach(options::setString);
TableSchema tableSchema = TableSchemaUtils.getPhysicalSchema(context.getCatalogTable().getSchema());
DataGenerator[] fieldGenerators = new DataGenerator[tableSchema.getFieldCount()];
for (int i = 0; i < fieldGenerators.length; i++) {
fieldGenerators[i] = createDataGenerator(
tableSchema.getFieldName(i).get(),
tableSchema.getFieldDataType(i).get(),
options);
}
return new DataGenTableSource(fieldGenerators, tableSchema, options.get(ROWS_PER_SECOND));
}
我们看到,在这个方法里,首先获取到包含字段名和类型的TableSchema对象,然后构造了一个长度是字段个数的DataGenerator数组,之后根据每个字段的类型、以及相应的属性参数来依次构造对应的数据生成器。在最后构造了DataGenTableSource对象。
数据生成器DataGenerator
DataGenerator是一个接口,是有状态的可扩展的数据生成器,它有两个抽象的实现类,一个是RandomGenerator,一个是SequenceGenerator,系统根据每个字段的配置选项,也就是"fields.#.kind"来决定采用哪种生成策略为字段生成数据。代码如下:
见DataGenTableSourceFactory#createDataGenerator方法
private DataGenerator createDataGenerator(String name, DataType type, ReadableConfig options) {
String genType = options.get(
key(FIELDS + "." + name + "." + KIND).stringType().defaultValue(RANDOM));
switch (genType) {
case RANDOM:
return createRandomGenerator(name, type, options);
case SEQUENCE:
return createSequenceGenerator(name, type, options);
default:
throw new ValidationException("Unsupported generator type: " + genType);
}
}
我们进入createRandomGenerator方法,看到系统会根据字段的类型来调用不同的静态方法并且根据配置的最大和最小值来生成所要的数据。
private DataGenerator createRandomGenerator(String name, DataType type, ReadableConfig options) {
ConfigOption<Integer> lenKey = key(FIELDS + "." + name + "." + LENGTH)
.intType().defaultValue(100);
OptionBuilder minKey = key(FIELDS + "." + name + "." + MIN);
OptionBuilder maxKey = key(FIELDS + "." + name + "." + MAX);
switch (type.getLogicalType().getTypeRoot()) {
case BOOLEAN:
return RandomGenerator.booleanGenerator();
...............................
}
createSequenceGenerator方法的实现和这个方法类似。
DataGenTableSource
最后我们来看看DataGenTableSource,在这里,核心主要是createSource方法,用来创建一个flink的具体的source。
DataGeneratorSource<RowData> createSource() {
return new DataGeneratorSource<>(
new RowGenerator(fieldGenerators, schema.getFieldNames()),
rowsPerSecond);
}
我们看到在构造DataGeneratorSource的时候,使用了RowGenerator对象,在这个RowGenerator对象里,主要是通过hasNext来判断程序是否结束,使用next方法来获取下一条数据。
@Override
public boolean hasNext() {
for (DataGenerator generator : fieldGenerators) {
if (!generator.hasNext()) {
return false;
}
}
return true;
}
@Override
public RowData next() {
GenericRowData row = new GenericRowData(fieldNames.length);
for (int i = 0; i < fieldGenerators.length; i++) {
row.setField(i, fieldGenerators[i].next());
}
return row;
}
在hasNext方法里,对所有字段的数据生成器进行判断,只要是有一个结束了,那么就返回false,程序结束,在next方法获取数据的时候,对于每个字段,依次使用对应的DataGenerator来生成一条数据,然后把所生成的数据构造成一个RowData对象,也就是我们要输出的结果。
- Bash漏洞再次演进:缓冲区溢出导致远程任意命令执行
- Pandas,让Python像R一样处理数据,但快
- ViewPager 实现 Galler 效果, 中间大图显示,两边小图展示
- 最小生成树-Prim算法和Kruskal算法
- Bash漏洞批量检测工具与修复方案
- 组合模式
- 用Fiddler在Android上抓包(Http+https)
- Architecture Components ViewModel的控制。
- React Native之AppRegistry模块
- 揭秘:如何分分钟黑掉你的eBay账号
- C++初始化列表
- React Native调试心得
- R语言学习 - 热图简化
- Java 8新特性
- JavaScript 教程
- JavaScript 编辑工具
- JavaScript 与HTML
- JavaScript 与Java
- JavaScript 数据结构
- JavaScript 基本数据类型
- JavaScript 特殊数据类型
- JavaScript 运算符
- JavaScript typeof 运算符
- JavaScript 表达式
- JavaScript 类型转换
- JavaScript 基本语法
- JavaScript 注释
- Javascript 基本处理流程
- Javascript 选择结构
- Javascript if 语句
- Javascript if 语句的嵌套
- Javascript switch 语句
- Javascript 循环结构
- Javascript 循环结构实例
- Javascript 跳转语句
- Javascript 控制语句总结
- Javascript 函数介绍
- Javascript 函数的定义
- Javascript 函数调用
- Javascript 几种特殊的函数
- JavaScript 内置函数简介
- Javascript eval() 函数
- Javascript isFinite() 函数
- Javascript isNaN() 函数
- parseInt() 与 parseFloat()
- escape() 与 unescape()
- Javascript 字符串介绍
- Javascript length属性
- javascript 字符串函数
- Javascript 日期对象简介
- Javascript 日期对象用途
- Date 对象属性和方法
- Javascript 数组是什么
- Javascript 创建数组
- Javascript 数组赋值与取值
- Javascript 数组属性和方法
- 还在使用Future轮询获取结果吗?CompletionService快来了解下。
- R语言通过伽玛与对数正态分布假设下的广义线性模型对大额索赔进行评估预测
- R语言精算学:使用链梯法Chain Ladder和泊松定律模拟和预测未来赔款数据
- 微服务[学成在线] day19:分布式事务
- 微服务[学成在线] day20:项目部署与持续集成(DevOps)
- R语言中回归模型预测的不同类型置信区间应用比较分析
- R语言进阶之坐标轴和文本
- R语言广义线性模型(GLM)广义相加模型(GAM):多元平滑回归分析保险投资风险敞口
- 面试高频题:springBoot自动装配的原理你能说出来吗?
- R语言巨灾风险下再保险合同定价研究案例:广义线性模型和帕累托分布分析
- nodejs搭建mqtt服务器
- R语言中GLM(广义线性模型),非线性和异方差可视化分析
- 解决java的http请求库dongliu.requests请求结果中文乱码的问题
- 保姆级教程,手把手教你实现一个SpringBoot的starter
- 微服务[学成在线] day05:消息中间件 RabbitMQ