Flink教程-flink 1.11 流式数据ORC格式写入file
- StreamingFileSink简介
- 写入orc工厂类
- 向量化操作
- 构造OrcBulkWriterFactory
- 实例讲解
- 构造source
- 构造OrcBulkWriterFactory
- 构造StreamingFileSink
在flink中,StreamingFileSink是一个很重要的把流式数据写入文件系统的sink,可以支持写入行格式(json,csv等)的数据,以及列格式(orc、parquet)的数据。 hive作为一个广泛的数据存储,而ORC作为hive经过特殊优化的列式存储格式,在hive的存储格式中占有很重要的地位。今天我们主要讲一下使用StreamingFileSink将流式数据以ORC的格式写入文件系统,这个功能是flink 1.11版本开始支持的。
StreamingFileSink简介
StreamingFileSink提供了两个静态方法来构造相应的sink,forRowFormat用来构造写入行格式数据的sink,forBulkFormat方法用来构造写入列格式数据的sink,
我们看一下方法forBulkFormat。
public static <IN> StreamingFileSink.DefaultBulkFormatBuilder<IN> forBulkFormat(
final Path basePath, final BulkWriter.Factory<IN> writerFactory) {
return new StreamingFileSink.DefaultBulkFormatBuilder<>(basePath, writerFactory, new DateTimeBucketAssigner<>());
}
这里需要两个参数,第一个是一个写入的路径,第二个是一个用于创建writer的实现BulkWriter.Factory接口的工厂类。
写入orc工厂类
首先我们要引入相应的pom
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-orc_2.11</artifactId>
<version>1.11.0</version>
</dependency>
flink为我们提供了写入orc格式的工厂类OrcBulkWriterFactory,我们简单看下这个工厂类的一些变量。
@PublicEvolving
public class OrcBulkWriterFactory<T> implements BulkWriter.Factory<T> {
private static final Path FIXED_PATH = new Path(".");
private final Vectorizer<T> vectorizer;
private final Properties writerProperties;
private final Map<String, String> confMap;
private OrcFile.WriterOptions writerOptions;
public OrcBulkWriterFactory(Vectorizer<T> vectorizer) {
this(vectorizer, new Configuration());
}
public OrcBulkWriterFactory(Vectorizer<T> vectorizer, Configuration configuration) {
this(vectorizer, null, configuration);
}
public OrcBulkWriterFactory(Vectorizer<T> vectorizer, Properties writerProperties, Configuration configuration) {
...................
}
.............
}
向量化操作
flink使用了hive的VectorizedRowBatch来写入ORC格式的数据,所以需要把输入数据组织成VectorizedRowBatch对象,而这个转换的功能就是由OrcBulkWriterFactory中的变量---也就是抽象类Vectorizer类完成的,主要实现的方法就是org.apache.flink.orc.vector.Vectorizer#vectorize方法。
- 在flink中,提供了一个支持RowData输入格式的RowDataVectorizer,在方法vectorize中,根据不同的类型,将输入的RowData格式的数据转成VectorizedRowBatch类型。
@Override
public void vectorize(RowData row, VectorizedRowBatch batch) {
int rowId = batch.size++;
for (int i = 0; i < row.getArity(); ++i) {
setColumn(rowId, batch.cols[i], fieldTypes[i], row, i);
}
}
- 如果用户想将自己的输入格式以orc格式写入,那么需要继承抽象类Vectorizer,并且实现自己的转换方法vectorize。
- 如果用户在写入orc文件之后,想添加一些自己的元数据信息,可以覆盖org.apache.flink.orc.vector.Vectorizer#addUserMetadata方法来添加相应的信息。
构造OrcBulkWriterFactory
工厂类一共提供了三个构造方法,我们看到最全的一个构造方法一共接受三个参数,第一个就是我们上面讲到的Vectorizer对象,第二个是一个写入orc格式的配置属性,第三个是hadoop的配置文件.
写入的配置来自https://orc.apache.org/docs/hive-config.html,具体可以是以下的值.
key |
缺省值 |
注释 |
---|---|---|
orc.compress |
ZLIB |
high level compression = {NONE, ZLIB, SNAPPY} |
orc.compress.size |
262,144 |
compression chunk size |
orc.stripe.size |
67,108,864 |
memory buffer in bytes for writing |
orc.row.index.stride |
10,000 |
number of rows between index entries |
orc.create.index |
true |
create indexes? |
orc.bloom.filter.columns |
”” |
comma separated list of column names |
orc.bloom.filter.fpp |
0.05 |
bloom filter false positive rate |
实例讲解
最后,我们通过一个简单的实例来讲解一下具体的使用。
构造source
首先我们自定义一个source,模拟生成RowData数据,我们这个也比较简单,主要是生成了一个int和double类型的随机数.
public static class MySource implements SourceFunction<RowData>{
@Override
public void run(SourceContext<RowData> sourceContext) throws Exception{
while (true){
GenericRowData rowData = new GenericRowData(2);
rowData.setField(0, (int) (Math.random() * 100));
rowData.setField(1, Math.random() * 100);
sourceContext.collect(rowData);
Thread.sleep(10);
}
}
@Override
public void cancel(){
}
}
构造OrcBulkWriterFactory
接下来定义构造OrcBulkWriterFactory需要的参数。
//写入orc格式的属性
final Properties writerProps = new Properties();
writerProps.setProperty("orc.compress", "LZ4");
//定义类型和字段名
LogicalType[] orcTypes = new LogicalType[]{new IntType(), new DoubleType()};
String[] fields = new String[]{"a1", "b2"};
TypeDescription typeDescription = OrcSplitReaderUtil.logicalTypeToOrcType(RowType.of(
orcTypes,
fields));
//构造工厂类OrcBulkWriterFactory
final OrcBulkWriterFactory<RowData> factory = new OrcBulkWriterFactory<>(
new RowDataVectorizer(typeDescription.toString(), orcTypes),
writerProps,
new Configuration());
构造StreamingFileSink
StreamingFileSink orcSink = StreamingFileSink
.forBulkFormat(new Path("file:///tmp/aaaa"), factory)
.build();
完整的代码请参考:https://github.com/zhangjun0x01/bigdata-examples/blob/master/flink/src/main/java/connectors/filesystem/StreamingWriteFileOrc.java
- JavaScript 教程
- JavaScript 编辑工具
- JavaScript 与HTML
- JavaScript 与Java
- JavaScript 数据结构
- JavaScript 基本数据类型
- JavaScript 特殊数据类型
- JavaScript 运算符
- JavaScript typeof 运算符
- JavaScript 表达式
- JavaScript 类型转换
- JavaScript 基本语法
- JavaScript 注释
- Javascript 基本处理流程
- Javascript 选择结构
- Javascript if 语句
- Javascript if 语句的嵌套
- Javascript switch 语句
- Javascript 循环结构
- Javascript 循环结构实例
- Javascript 跳转语句
- Javascript 控制语句总结
- Javascript 函数介绍
- Javascript 函数的定义
- Javascript 函数调用
- Javascript 几种特殊的函数
- JavaScript 内置函数简介
- Javascript eval() 函数
- Javascript isFinite() 函数
- Javascript isNaN() 函数
- parseInt() 与 parseFloat()
- escape() 与 unescape()
- Javascript 字符串介绍
- Javascript length属性
- javascript 字符串函数
- Javascript 日期对象简介
- Javascript 日期对象用途
- Date 对象属性和方法
- Javascript 数组是什么
- Javascript 创建数组
- Javascript 数组赋值与取值
- Javascript 数组属性和方法
- PHP实现唤起微信支付功能
- PHP封装的mysqli数据库操作类示例
- PHP-FPM 的管理和配置详解
- PHP基于curl实现模拟微信浏览器打开微信链接的方法示例
- 实例讲解PHP表单验证功能
- python如何从键盘获取输入实例
- 使用Keras实现Tensor的相乘和相加代码
- php无限级分类实现评论及回复功能
- php获取手机端的号码以及ip地址实例代码
- PHP数组遍历的几种常见方式总结
- 详解php协程知识点
- php curl简单采集图片生成base64编码(并附curl函数参数说明)
- PHP通过get方法获得form表单数据方法总结
- PHP filesize函数用法浅析
- PHP中创建和编辑Excel表格的方法