Flume-Hbase-Sink针对不同版本flume与HBase的适配研究与经验总结

时间:2022-04-23
本文章向大家介绍Flume-Hbase-Sink针对不同版本flume与HBase的适配研究与经验总结,主要内容包括一、Flume 的HBaseSinks 详细介绍、二、两大类HBasesinks的详细用法、三、使用flume-hbase-sink的常见错误总结、四、总结flume与HBase版本适配问题&&用户自定义HBase 的column、【附录:源码解读】、2、HBasesink---RegexHbaseEventSerializer、3、AsyncHBasesink---SimpleAsyncHbaseEventSerializer、基本概念、基础应用、原理机制和需要注意的事项等,并结合实例形式分析了其使用技巧,希望通过本文能帮助到大家理解应用这部分内容。

导语:本文细致而全面地讲解使用flume输出数据到HBase的三种不同 Flume-Hbase-Sink 之间的差异性,以及技术细节。并且透彻而全面地总结了不同版本flume和HBase之间的兼容性问题。 为了更加详细说明三种不同hbasesink的差异性,本文在附录附上详细的源码解读。

一、Flume 的HBaseSinks 详细介绍

Flume 有两大类 HBasesinks: HBaseSink (org.apache.flume.sink.hbase.HBaseSink) 和 AsyncHBaseSink (org.apache.flume.sink.hbase.AsyncHBaseSink) 。

1.1、HBasesink

提供两种序列化模式:

1.1.1、SimpleHbaseEventSerializer

将整个事件event的body部分当做完整的一列写入hbase,因此在插入HBase的时候,一个event的body只能被插入一个column;

1.1.2、RegexHbaseEventSerializer

根据正则表达式将event 的body拆分到不同的列当中,因此在插入HBase的时候,支持用户自定义插入同一个rowkey对应的同一个columnFamily 的多个column。

【优点】

(a) 安全性较高:支持secure HBase clusters (FLUME-1626) ,支持往secure hbase写数据(hbase可以开启kerberos校验);

(b) 支持0.96及以上版本的HBase 的IPC通信----- the new HBase IPC which was introduced in HBase 0.96 and up。

【缺点】

性能没有AsyncHBaseSink高。因为HBaseSink采用阻塞调用(blocking calls),而AsyncHBaseSink采用非阻塞调用(non-blocking calls)。

1.2、AsyncHBaseSink

目前只提供一种序列化模式:SimpleAsyncHbaseEventSerializer:

将整个事件event的body部分当做完整的一列写入hbase,因此在插入HBase的时候,一个event的body只能被插入一个column。

【优点】

AsyncHBaseSink采用非阻塞调用(non-blocking calls),因此,性能比HBaseSink高;

【缺点】

(a) 不支持secure HBase clusters (FLUME-1626),不支持往secure hbase写数据;

(b) 不支持0.96及以上版本的HBase 的IPC通信----- the new HBase IPC which was introduced in HBase 0.96 and up。

二、两大类HBasesinks的详细用法

2.1 HBasesink--SimpleHbaseEventSerializer

Required properties 如下表格所示:

Property Name

Default

Description

channel

type

The component type name, needs to be org.apache.flume.sink.HBaseSink

table

The name of the table in Hbase to write to.

columnFamily

The column family in Hbase to write to.

batchSize

100

Number of events to be written per txn.

serializer

org.apache.flume.sink.hbase.SimpleHbaseEventSerializer;org.apache.flume.sink.hbase.RegexHbaseEventSerializer

two serializers are provided with flume inHBasesink

serializer.*

Properties to be passed to the serializer

如下是展示如何使用 HBasesink--SimpleHbaseEventSerializer:

agenttest.channels = memoryChannel-1
agenttest.sinks = hbaseSink-1
agenttest.sinks.hbaseSink-1.type = org.apache.flume.sink.hbase.HBaseSink
agenttest.sinks.hbaseSink-1.table = test_hbase_table  //HBase表名
agenttest.sinks.hbaseSink-1.columnFamily = familycolumn-1  //HBase表的列族名称
agenttest.sinks.hbaseSink-1.serializer= org.apache.flume.sink.hbase.SimpleHbaseEventSerializer
agenttest.sinks.hbaseSink-1.serializer.payloadColumn = columnname  //HBase表的列族下的某个列名称
agenttest.sinks.hbaseSink-1.channels = memoryChannel-1

注:当指定存入到HBase表的某个列族的指定列column时,不能写成:

agenttest.sinks.hbaseSink-1.columnName = columnname
或者:
agenttest.sinks.hbaseSink-1.column = columnname

这些都是网上的错误写法!另外两个序列化模式也是不能这样使用。

2.2 HBasesink--RegexHbaseEventSerializer

如下是展示如何使用 HBasesink--RegexHbaseEventSerializer(使用正则匹配切割event,然后存入HBase表的多个列):

agenttest.channels = memoryChannel-2
agenttest.sinks = hbaseSink-2
agenttest.sinks.hbaseSink-2.type = org.apache.flume.sink.hbase.HBaseSink
agenttest.sinks.hbaseSink-2.table = test_hbase_table
agenttest.sinks.hbaseSink-2.columnFamily = familycolumn-2
agenttest.sinks.hbaseSink-2.serializer= org.apache.flume.sink.hbase.RegexHbaseEventSerializer
// 比如我要对nginx日志做分割,然后按列存储HBase,正则匹配分成的列为: ([xxx] [yyy] [zzz] [nnn] ...) 这种格式, 所以用下面的正则:
agent.sinks.hbaseSink-2.serializer.regex = \[(.*?)\]\ \[(.*?)\]\ \[(.*?)\]\ \[(.*?)\]
// 指定上面正则匹配到的数据对应的hbase的familycolumn-2 列族下的4个cloumn列名
agent.sinks.hbaseSink-2.serializer.colNames = column-1,column-2,column-3,column-4
#agent.sinks.hbaseSink-2.serializer.payloadColumn = test
agenttest.sinks.hbaseSink-2.channels = memoryChannel-

2.3 AsyncHBaseSink--SimpleAsyncHbaseEventSerializer

Required properties 如下表格所示:

Property Name

Default

Description

channel

type

The component type name, needs to be org.apache.flume.sink.AsyncHBaseSink

table

The name of the table in Hbase to write to.

columnFamily

The column family in Hbase to write to.

batchSize

100

Number of events to be written per txn.

timeout

The length of time (in milliseconds) the sink waits for acks from hbase for all events in a transaction. If no timeout is specified, the sink will wait forever.

serializer

org.apache.flume.sink.hbase.SimpleAsyncHbaseEventSerializer

serializer.*

Properties to be passed to the serializer.

如下是展示如何使用 AsyncHBaseSink--SimpleAsyncHbaseEventSerializer:

agenttest.channels = memoryChannel-3
agenttest.sinks = hbaseSink-3
agenttest.sinks.hbaseSink-3.type = org.apache.flume.sink.hbase.AsyncHBaseSink
agenttest.sinks.hbaseSink-3.table = test_hbase_table
agenttest.sinks.hbaseSink-3.columnFamily = familycolumn-3
agenttest.sinks.hbaseSink-3.serializer = org.apache.flume.sink.hbase.SimpleAsyncHbaseEventSerializer
agenttest.sinks.hbaseSink-3.serializer.payloadColumn = columnname  //HBase表的列族下的某个列名称
agenttest.sinks.hbaseSink-3.channels = memoryChannel-3

如果读者感兴趣,可以仔细阅读Apache flume官网,上面有一些更加详细的信息:

http://archive.cloudera.com/cdh/3/flume-ng/FlumeUserGuide.html

三、使用flume-hbase-sink的常见错误总结

3.1、无HBase读写权限

如果提交./flume-ng 任务的用户没有HBase的读写权限,那么就会出现无权限读写HBase:

[ERROR - org.apache.flume.lifecycle.LifecycleSupervisor$MonitorRunnable.run(LifecycleSupervisor.java:253)]Unable to start SinkRunner: { policy:org.apache.flume.sink.DefaultSinkProcessor@f46fdc1 counterGroup:{ name:null counters:{} } } - Exception follows.
org.apache.flume.FlumeException: Could not start sink. Table or column family does not exist in Hbase (Permission denied).

【解决方法】

需要为用户赋予HBase读写权限,或者超级管理员权限。

3.2、低版本flume使用错误的序列化模式,导致与HBase版本接口不匹配

本文作者使用 flume-1.6.0 的RegexHbaseEventSerializer(属于 HBasesink)对HBase-1.1.3 和 HBase-1.2.1进行插入数据,出现以下错误:

2016-12-22 12:14:50 ERROR [SinkRunner-PollingRunner-DefaultSinkProcessor] (org.apache.flume.sink.hbase.HBaseSink.process:351)  - Failed to commit transaction.Transaction rolled back.
java.lang.NoSuchMethodError: org.apache.hadoop.hbase.client.Put.setWriteToWAL(Z)V
    at org.apache.flume.sink.hbase.HBaseSink$3.run(HBaseSink.java:377)
    at org.apache.flume.sink.hbase.HBaseSink$3.run(HBaseSink.java:372)
    at org.apache.flume.auth.SimpleAuthenticator.execute(SimpleAuthenticator.java:50)
    at org.apache.flume.sink.hbase.HBaseSink.putEventsAndCommit(HBaseSink.java:372)
    at org.apache.flume.sink.hbase.HBaseSink.process(HBaseSink.java:342)
    at org.apache.flume.sink.DefaultSinkProcessor.process(DefaultSinkProcessor.java:68)
    at org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:147)
    at java.lang.Thread.run(Thread.java:745)

错误信息提示:

java.lang.NoSuchMethodError: org.apache.hadoop.hbase.client.Put.setWriteToWAL(Z)V

  查看源码,SimpleHbaseEventSerializer和 RegexHbaseEventSerializer的getActions函数产生的是put对象实例,也就是org.apache.hadoop.hbase.client.Put实例(想要了解更详细的内容,可以阅读本文的【附录:源码解读】章节)。org.apache.hadoop.hbase.client.Put里的确包含setWriteToWAL(boolean write)这个函数。新版本的hbase(0.98以上版本),setWriteToWAL(boolean write)方法改变了返回值,从void 变成了 Mutation。而flume-1.6.0以及以下版本,无法适配setWriteToWAL(boolean write)的改变,从而导致出错。

与SimpleHbaseEventSerializer和 RegexHbaseEventSerializer不同的是,SimpleAsyncHbaseEventSerializer的getActions函数不是产生put实例,而是生成PutRequest实例。而PutRequest实例是可以与任意版本的HBase接口适配的。

想要了解更详细的内容,可以阅读本文的【附录:源码解读】章节。

【解决方法】

(1) 如果不改变flume的版本,那么需要将HBase降级到0.98 及以下版本;

(2) 如果不改变HBase版本,需要将flume升级到 1.7.0 及以上版本。

四、总结flume与HBase版本适配问题&&用户自定义HBase 的column

总结:经过上述解读,以及作者本人验证,有以下几条经验总结:

4.1 flume与HBase版本适配问题

4.1.1 对于HBasesink

(a) 对于Flume-1.6.0 及以下版本:HBasesink目前只支持往 HBase-0.98 版本及以下版本写入数据,当HBase超过0.98版本,1.6.0 及以下版本的Flume则不支持往HBase写入数据;

(b) 对于Flume-1.7.0 及以上版本:HBasesink目前支持往0.98及以上版本的HBase写入数据(当然也支持往0.98及以下版本的HBase写入数据);

4.1.2 对于AsyncHBaseSink

(a) 支持所有版本的HBase写入数据。

(b) 不支持0.96及以上版本的HBase 的IPC通信方式------ the new HBase IPC which was introduced in HBase 0.96 and up。

4.2 flume-hbase-sink支持用户自定义HBase的column

4.2.1 对于HBasesink

(a)序列化模式SimpleHbaseEventSerializer

将整个事件event的body部分当做完整的一列写入hbase,因此在插入HBase的时候,一个event的body只能被插入一个column;

(b) 序列化模式RegexHbaseEventSerializer

根据正则表达式将event body拆分到不同的列当中,因此在插入HBase的时候,支持用户自定义插入同一个rowkey对应的同一个columnFamily 的多个column。

4.2.2 对于AsyncHBaseSink

目前只提供一种序列化模式:SimpleAsyncHbaseEventSerializer:

将整个事件event的body部分当做完整的一列写入hbase,因此在插入HBase的时候,一个event的body只能被插入一个column。

【附录:源码解读】

1、HBasesink---SimpleHbaseEventSerializer

SimpleHbaseEventSerializer类中包括的函数有:

l  SimpleHbaseEventSerialzer函数:这是构造函数

l  configure(ComponentConfiguration conf)函数:这是配置函数,目前是空白。

l  close()函数:这是个关闭函数,估计是用于关闭调用资源的。

l  configure(Context context)函数:配置函数,主要作用是从flume的配置文件中读取信息。

l  initalize(Event event, byte[] cf): 从event中读取内容,并配置HBase的column family

l  getActions():创建put对象,每个put对应着hbase的一行数据。

l  getIncrements():将hbase的自增列加1

Configure(Context context)函数

 // 读取flume配置文件中的rowPrefix,rowPrefix的默认值是default 
·         rowPrefix = context.getString("rowPrefix", "default"); 
·         // 读取flume配置文件中的incrementRow,默认值是inRow
·         incrementRow = 
·             context.getString("incrementRow", "incRow").getBytes(Charsets.UTF_8); 
·         // 读取flume配置文件中的suffix,默认值是uuid 
·         String suffix = context.getString("suffix", "uuid"); 
·         // 读取flume配置文件的payloadColumn,默认之是pCol。payloadColumn对应这hbase的列名 
·         String payloadColumn = context.getString("payloadColumn","pCol"); 
·         // 读取flume配置文件中的incrementColumn,默认值是iCol 
·         String incColumn = context.getString("incrementColumn","iCol"); 
·         if(payloadColumn != null && !payloadColumn.isEmpty()) { 
·           // 这几行代码是配置hbase中的rowkey前缀     
·           if(suffix.equals("timestamp")){ 
·             keyType = KeyType.TS; 
·           } else if (suffix.equals("random")) { 
·             keyType = KeyType.RANDOM; 
·           } else if(suffix.equals("nano")){ 
·             keyType = KeyType.TSNANO; 
·           } else { 
·             keyType = KeyType.UUID; 
·           } 
·           plCol = payloadColumn.getBytes(Charsets.UTF_8); 
·         } 
·         if(incColumn != null && !incColumn.isEmpty()) { 
·           incCol = incColumn.getBytes(Charsets.UTF_8); 
·         } 

对于Configure函数,主要需要说明的flume配置文件和代码之间的对应关系。

比如,如果你在flume的配置文件中有一行如:a1.sinks.k1.serializer.payloadColumn=colName。 那么Configure函数中的context.getString("payloadColumn", "pCol")的返回值就是colName.

同样如果你设置 a1.sinks.k1.serializer.rowPrefix=123456, 那么context.getString("rowPrefix", "default")的返回值就是123456.

initalize(Event event, byte[] cf)函数

·         public void initialize(Event event, byte[] cf) { 
·             this.payload = event.getBody();
·             this.cf = cf; 
·           }

这个函数代码简单,cf表示hbase中的column family; event是flume的一个事件,是flume数据流中的一个data object。如果flume的source是文本文件的话,文件中的每一行就会产生一个flume event。

getActions()函数

·         public List<Row> getActions() throws FlumeException { 
·           List<Row> actions = new LinkedList<Row>(); 
·           if(plCol != null){ 
·             byte[] rowKey; 
·             try { 
·               // 配置rowkey,具体靠参考SimpleRowKeyGenerator类 
·               if (keyType == KeyType.TS) { 
·                 rowKey = SimpleRowKeyGenerator.getTimestampKey(rowPrefix); 
·               } else if(keyType == KeyType.RANDOM) { 
·                 rowKey = SimpleRowKeyGenerator.getRandomKey(rowPrefix); 
·               } else if(keyType == KeyType.TSNANO) { 
·                 rowKey = SimpleRowKeyGenerator.getNanoTimestampKey(rowPrefix); 
·               } else { 
·                 rowKey = SimpleRowKeyGenerator.getUUIDKey(rowPrefix); 
·               } 
·               // 创建rowkey的一个put 
·               Put put = new Put(rowKey); 
·               // 在put中添加一列数据。columnfamily是cf,colunname是plCol,value是payload。 
·               // plCol是payloadColumn的byte形式。而payloadColumn初始化于Configure函数,来自于flume的配置文件 
·               // payload初始化于initalize函数,来自于event 
·               put.add(cf, plCol, payload); 
·               actions.add(put); 
·             } catch (Exception e){ 
·               throw new FlumeException("Could not get row key!", e); 
·             }
·           } 
·           return actions; 
·         } 

getActions函数,它生成一个put对象实例,put最后插入到hbase中。需要注意的是put实例中所有的数据来源。        plCol来自于payloadColumn, payloadColumn来自于flume的配置文件;cf也是来自于flume配置文件;payload来自于event。

plCol对应hbase中的colum, cf对应hbase中的columnfamily,payload对应hbase中的value。

2、HBasesink---RegexHbaseEventSerializer

RegexHbaseEventSerializer 的源码和 SimpleHbaseEventSerializer 差不多,主要在于以下几个区别:

(1) RegexHbaseEventSerializer.configure(Context context):

此Serializer根据正则可以写入多列:

public void configure(Context context) {
    String regex = context.getString(REGEX_CONFIG, REGEX_DEFAULT);  //获取配置文件中的正则表达式,默认是“(.*)”
    regexIgnoreCase = context.getBoolean(IGNORE_CASE_CONFIG,
        INGORE_CASE_DEFAULT);  //是否忽略大小写
    inputPattern = Pattern.compile(regex, Pattern.DOTALL + (regexIgnoreCase ? Pattern.CASE_INSENSITIVE : 0));  //将给定的正则表达式编译到具有给定标志的模式中
    String colNameStr = context.getString(COL_NAME_CONFIG, COLUMN_NAME_DEFAULT);  //获取配置文件中的列名
    String[] columnNames = colNameStr.split(",");  //分割列名获得列名数组
    for (String s: columnNames) {
      colNames.add(s.getBytes(Charsets.UTF_8));
    }
  }

(2) RegexHbaseEventSerializer.getActions()方法

首先会做一些判断匹配成功否?匹配出的个数和指定的列数相同否?,然后是获取rowkey,这里的rowkey是[time in millis]-[random key]-[nonce]三部分组成的字符串。剩下的是依次匹配列组成Put,返回List<Row> actions。

(3) RegexHbaseEventSerializer.getIncrements()

直接返回一个没有数据的List<Increment>,即不设置计数器。

3、AsyncHBasesink---SimpleAsyncHbaseEventSerializer

SimpleAsyncHbaseEventSerializer类和SimpleHbaseEventSerializer的主要区别在于getActions函数。

SimpleAsyncHbaseEventSerializer:getActions()函数

·         public List<PutRequest> getActions() { 
·           List<PutRequest> actions = new ArrayList<PutRequest>(); 
·           if(payloadColumn != null){ 
·             byte[] rowKey; 
·             try { 
·               switch (keyType) { 
·                 case TS: 
·                   rowKey = SimpleRowKeyGenerator.getTimestampKey(rowPrefix); 
·                   break; 
·                 case TSNANO: 
·                   rowKey = SimpleRowKeyGenerator.getNanoTimestampKey(rowPrefix); 
·                   break; 
·                 case RANDOM: 
·                   rowKey = SimpleRowKeyGenerator.getRandomKey(rowPrefix); 
·                   break; 
·                 default: 
·                   rowKey = SimpleRowKeyGenerator.getUUIDKey(rowPrefix); 
·                   break; 
·               } 
·               // table是hbase中的表名 
·               // rowkey是hbase中的rowkey 
·               // cf是hbase中的columnFamily 
·               // payloadColum是hbase中的column 
·               // payload是hbase中的value 
·               PutRequest putRequest =  new PutRequest(table, rowKey, cf, 
·                   payloadColumn, payload); 
·               actions.add(putRequest); 
·             } catch (Exception e){ 
·               throw new FlumeException("Could not get row key!", e); 
·             } 
·           } 
·           return actions; 
·         } 

 与SimpleHbaseEventSerializer的getActions的不同,不是产生put实例,而是生成PutRequest实例。