聊聊flink 1.11 中的随机数据生成器-DataGen connector

时间:2022-07-25
本文章向大家介绍聊聊flink 1.11 中的随机数据生成器-DataGen connector,主要内容包括其使用实例、应用技巧、基本知识点总结和需要注意事项,具有一定的参考价值,需要的朋友可以参考一下。
  • 使用
  • 示例
  • 源码解析
    • 创建TableSource
    • 数据生成器DataGenerator
    • DataGenTableSource

使用

在flink 1.11中,内置提供了一个DataGen 连接器,主要是用于生成一些随机数,用于在没有数据源的时候,进行流任务的测试以及性能测试等。下面我们简单的聊聊如何来使用以及底层源码是如何实现的。

具体的使用方法可以先看下官网的概述。

https://ci.apache.org/projects/flink/flink-docs-release-1.11/zh/dev/table/connectors/datagen.html

之后我来做下简单的概述,以及一些注意点:

  • 目前随机生成只支持基本数据类型:数字类型(TINYINT、SMALLINT、int、bigint、FLOAT、double)、字符串类型(VARCHAR、char),以及boolean类型。
  • 目前有两种数据生成器,一种是随机生成器(默认),这个是无界的,另一个是序列生成器,是有界的。
  • 字段中只要有一个是按序列生成的,也就是有界的,程序就会在序列结束的时候退出。如果所有字段都是随机生成的,则程序最终不会结束。

示例

我们摘抄下官网的例子,然后做下解释。

CREATE TABLE datagen (
 f_sequence INT,
 f_random INT,
 f_random_str STRING,
 ts AS localtimestamp,
 WATERMARK FOR ts AS ts
) WITH (
 'connector' = 'datagen',

 -- optional options --

 'rows-per-second'='5',

 'fields.f_sequence.kind'='sequence',
 'fields.f_sequence.start'='1',
 'fields.f_sequence.end'='1000',

 'fields.f_random.min'='1',
 'fields.f_random.max'='1000',

 'fields.f_random_str.length'='10'
)
  • DDL的with属性中,除了connector是必填之外,其他都是可选的。
  • rows-per-second 每秒生成的数据条数
  • f_sequence字段的生成策略是按序列生成,并且指定了起始值,所以该程序将会在到达序列的结束值之后退出
  • f_random 字段是按照随机生成,并指定随机生成的范围
  • f_random_str是一个字符串类型,属性中指定了随机生成字符串的长度是10
  • ts列是一个计算列,返回当前的时间.

源码解析

创建TableSource

既然是一个source conncector,那么就有对应的TableSourceFactory和TableSource,这个datagen所对应的就是DataGenTableSourceFactory和DataGenTableSource,接下来我们从这里作为入口来看看底层是怎么实现的。

我们首先进入DataGenTableSourceFactory#createDynamicTableSource。

	@Override
	public DynamicTableSource createDynamicTableSource(Context context) {
		Configuration options = new Configuration();
		context.getCatalogTable().getOptions().forEach(options::setString);

		TableSchema tableSchema = TableSchemaUtils.getPhysicalSchema(context.getCatalogTable().getSchema());

		DataGenerator[] fieldGenerators = new DataGenerator[tableSchema.getFieldCount()];
		for (int i = 0; i < fieldGenerators.length; i++) {
			fieldGenerators[i] = createDataGenerator(
					tableSchema.getFieldName(i).get(),
					tableSchema.getFieldDataType(i).get(),
					options);
		}

		return new DataGenTableSource(fieldGenerators, tableSchema, options.get(ROWS_PER_SECOND));
	}

我们看到,在这个方法里,首先获取到包含字段名和类型的TableSchema对象,然后构造了一个长度是字段个数的DataGenerator数组,之后根据每个字段的类型、以及相应的属性参数来依次构造对应的数据生成器。在最后构造了DataGenTableSource对象。

数据生成器DataGenerator

DataGenerator是一个接口,是有状态的可扩展的数据生成器,它有两个抽象的实现类,一个是RandomGenerator,一个是SequenceGenerator,系统根据每个字段的配置选项,也就是"fields.#.kind"来决定采用哪种生成策略为字段生成数据。代码如下:

见DataGenTableSourceFactory#createDataGenerator方法

	private DataGenerator createDataGenerator(String name, DataType type, ReadableConfig options) {
		String genType = options.get(
				key(FIELDS + "." + name + "." + KIND).stringType().defaultValue(RANDOM));
		switch (genType) {
			case RANDOM:
				return createRandomGenerator(name, type, options);
			case SEQUENCE:
				return createSequenceGenerator(name, type, options);
			default:
				throw new ValidationException("Unsupported generator type: " + genType);
		}
	}

我们进入createRandomGenerator方法,看到系统会根据字段的类型来调用不同的静态方法并且根据配置的最大和最小值来生成所要的数据。

	private DataGenerator createRandomGenerator(String name, DataType type, ReadableConfig options) {
		ConfigOption<Integer> lenKey = key(FIELDS + "." + name + "." + LENGTH)
				.intType().defaultValue(100);
		OptionBuilder minKey = key(FIELDS + "." + name + "." + MIN);
		OptionBuilder maxKey = key(FIELDS + "." + name + "." + MAX);
		switch (type.getLogicalType().getTypeRoot()) {
			case BOOLEAN:
				return RandomGenerator.booleanGenerator();
		    ...............................
	}

createSequenceGenerator方法的实现和这个方法类似。

DataGenTableSource

最后我们来看看DataGenTableSource,在这里,核心主要是createSource方法,用来创建一个flink的具体的source。

	DataGeneratorSource<RowData> createSource() {
			return new DataGeneratorSource<>(
					new RowGenerator(fieldGenerators, schema.getFieldNames()),
					rowsPerSecond);
		}

我们看到在构造DataGeneratorSource的时候,使用了RowGenerator对象,在这个RowGenerator对象里,主要是通过hasNext来判断程序是否结束,使用next方法来获取下一条数据。

		@Override
		public boolean hasNext() {
			for (DataGenerator generator : fieldGenerators) {
				if (!generator.hasNext()) {
					return false;
				}
			}
			return true;
		}

		@Override
		public RowData next() {
			GenericRowData row = new GenericRowData(fieldNames.length);
			for (int i = 0; i < fieldGenerators.length; i++) {
				row.setField(i, fieldGenerators[i].next());
			}
			return row;
		}

在hasNext方法里,对所有字段的数据生成器进行判断,只要是有一个结束了,那么就返回false,程序结束,在next方法获取数据的时候,对于每个字段,依次使用对应的DataGenerator来生成一条数据,然后把所生成的数据构造成一个RowData对象,也就是我们要输出的结果。