Flink教程-flink 1.11 流式数据ORC格式写入file

时间:2022-07-25
本文章向大家介绍Flink教程-flink 1.11 流式数据ORC格式写入file,主要内容包括其使用实例、应用技巧、基本知识点总结和需要注意事项,具有一定的参考价值,需要的朋友可以参考一下。
  • StreamingFileSink简介
  • 写入orc工厂类
    • 向量化操作
    • 构造OrcBulkWriterFactory
  • 实例讲解
    • 构造source
    • 构造OrcBulkWriterFactory
    • 构造StreamingFileSink

在flink中,StreamingFileSink是一个很重要的把流式数据写入文件系统的sink,可以支持写入行格式(json,csv等)的数据,以及列格式(orc、parquet)的数据。 hive作为一个广泛的数据存储,而ORC作为hive经过特殊优化的列式存储格式,在hive的存储格式中占有很重要的地位。今天我们主要讲一下使用StreamingFileSink将流式数据以ORC的格式写入文件系统,这个功能是flink 1.11版本开始支持的。

StreamingFileSink简介

StreamingFileSink提供了两个静态方法来构造相应的sink,forRowFormat用来构造写入行格式数据的sink,forBulkFormat方法用来构造写入列格式数据的sink,

我们看一下方法forBulkFormat。

 public static <IN> StreamingFileSink.DefaultBulkFormatBuilder<IN> forBulkFormat(
   final Path basePath, final BulkWriter.Factory<IN> writerFactory) {
  return new StreamingFileSink.DefaultBulkFormatBuilder<>(basePath, writerFactory, new DateTimeBucketAssigner<>());
 }

这里需要两个参数,第一个是一个写入的路径,第二个是一个用于创建writer的实现BulkWriter.Factory接口的工厂类。

写入orc工厂类

首先我们要引入相应的pom

<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-orc_2.11</artifactId>
  <version>1.11.0</version>
</dependency>

flink为我们提供了写入orc格式的工厂类OrcBulkWriterFactory,我们简单看下这个工厂类的一些变量。

@PublicEvolving
public class OrcBulkWriterFactory<T> implements BulkWriter.Factory<T> {

 private static final Path FIXED_PATH = new Path(".");

 private final Vectorizer<T> vectorizer;
 private final Properties writerProperties;
 private final Map<String, String> confMap;
 private OrcFile.WriterOptions writerOptions;
 
 public OrcBulkWriterFactory(Vectorizer<T> vectorizer) {
  this(vectorizer, new Configuration());
 }
 public OrcBulkWriterFactory(Vectorizer<T> vectorizer, Configuration configuration) {
  this(vectorizer, null, configuration);
 }
 public OrcBulkWriterFactory(Vectorizer<T> vectorizer, Properties writerProperties, Configuration configuration) {
        ...................
 }
    .............
}

向量化操作

flink使用了hive的VectorizedRowBatch来写入ORC格式的数据,所以需要把输入数据组织成VectorizedRowBatch对象,而这个转换的功能就是由OrcBulkWriterFactory中的变量---也就是抽象类Vectorizer类完成的,主要实现的方法就是org.apache.flink.orc.vector.Vectorizer#vectorize方法。

  • 在flink中,提供了一个支持RowData输入格式的RowDataVectorizer,在方法vectorize中,根据不同的类型,将输入的RowData格式的数据转成VectorizedRowBatch类型。
 @Override
 public void vectorize(RowData row, VectorizedRowBatch batch) {
  int rowId = batch.size++;
  for (int i = 0; i < row.getArity(); ++i) {
   setColumn(rowId, batch.cols[i], fieldTypes[i], row, i);
  }
 }
  • 如果用户想将自己的输入格式以orc格式写入,那么需要继承抽象类Vectorizer,并且实现自己的转换方法vectorize。
  • 如果用户在写入orc文件之后,想添加一些自己的元数据信息,可以覆盖org.apache.flink.orc.vector.Vectorizer#addUserMetadata方法来添加相应的信息。

构造OrcBulkWriterFactory

工厂类一共提供了三个构造方法,我们看到最全的一个构造方法一共接受三个参数,第一个就是我们上面讲到的Vectorizer对象,第二个是一个写入orc格式的配置属性,第三个是hadoop的配置文件.

写入的配置来自https://orc.apache.org/docs/hive-config.html,具体可以是以下的值.

key

缺省值

注释

orc.compress

ZLIB

high level compression = {NONE, ZLIB, SNAPPY}

orc.compress.size

262,144

compression chunk size

orc.stripe.size

67,108,864

memory buffer in bytes for writing

orc.row.index.stride

10,000

number of rows between index entries

orc.create.index

true

create indexes?

orc.bloom.filter.columns

””

comma separated list of column names

orc.bloom.filter.fpp

0.05

bloom filter false positive rate

实例讲解

最后,我们通过一个简单的实例来讲解一下具体的使用。

构造source

首先我们自定义一个source,模拟生成RowData数据,我们这个也比较简单,主要是生成了一个int和double类型的随机数.

 public static class MySource implements SourceFunction<RowData>{
  @Override
  public void run(SourceContext<RowData> sourceContext) throws Exception{
   while (true){
    GenericRowData rowData = new GenericRowData(2);
    rowData.setField(0, (int) (Math.random() * 100));
    rowData.setField(1, Math.random() * 100);
    sourceContext.collect(rowData);
    Thread.sleep(10);
   }
  }
  @Override
  public void cancel(){

  }
 }

构造OrcBulkWriterFactory

接下来定义构造OrcBulkWriterFactory需要的参数。

  //写入orc格式的属性
  final Properties writerProps = new Properties();
  writerProps.setProperty("orc.compress", "LZ4");

  //定义类型和字段名
  LogicalType[] orcTypes = new LogicalType[]{new IntType(), new DoubleType()};
  String[] fields = new String[]{"a1", "b2"};
  TypeDescription typeDescription = OrcSplitReaderUtil.logicalTypeToOrcType(RowType.of(
    orcTypes,
    fields));

  //构造工厂类OrcBulkWriterFactory
  final OrcBulkWriterFactory<RowData> factory = new OrcBulkWriterFactory<>(
    new RowDataVectorizer(typeDescription.toString(), orcTypes),
    writerProps,
    new Configuration());

构造StreamingFileSink

  StreamingFileSink orcSink = StreamingFileSink
    .forBulkFormat(new Path("file:///tmp/aaaa"), factory)
    .build();

完整的代码请参考:https://github.com/zhangjun0x01/bigdata-examples/blob/master/flink/src/main/java/connectors/filesystem/StreamingWriteFileOrc.java