flume安装及配置介绍(二)

时间:2022-05-06
本文章向大家介绍flume安装及配置介绍(二),主要内容包括其使用实例、应用技巧、基本知识点总结和需要注意事项,具有一定的参考价值,需要的朋友可以参考一下。

注: 环境: skylin-linux

Flume的下载方式:  

wget http://www.apache.org/dyn/closer.lua/flume/1.6.0/apache-flume-1.6.0-bin.tar.

下载完成之后,使用tar进行解压

tar -zvxf  apache-flume-1.6..0-bin.tar.

进入flume的conf配置包中,使用命令touch flume.conf,然后cp flume-conf.properties.template flume.conf

 使vim/gedit flume.conf 编辑配置文件,需要说明的的是,Flume conf文件用的是Java版的property文件的key-value键值对模式.

  在Flume配置文件中,我们需要

     1. 需要命名当前使用的Agent的名称.

     2. 命名Agent下的source的名字.

     3. 命名Agent下的channal的名字.

     4. 命名Agent下的sink的名字.

     5. 将source和sink通过channal绑定起来.

一般来说,在Flume中会存在着多个Agent,所以我们需要给它们分别取一个名字来区分它们,注意名字不要相同,名字保持唯一!

例如:

#Agent取名为 agent_name
#source 取名为 source_name ,一次类推
agent_name.source = source_name
agent_name.channels = channel_name
agent_name.sinks = sink_name

上图对应的是单个Agent,单个sink,单个channel情况,如下图

如果我们需要在一个Agent上配置n个sink,m个channel(n>1, m>1),

那么只需要这样配置即可:

#Agent取名为 agent_name
#source 取名为 source_name ,一次类推
agent_name.source = source_name ,source_name1
agent_name.channels = channel_name,channel_name1
agent_name.sinks = sink_name,sink_name1

上面的配置就表示一个Agent中有两个 source,sink,channel的情况,如图所示

以上是对多sink,channel,source情况,对于 多个Agent,只需要给每个Agent取一个独一无二的名字即可!

Flume支持各种各样的sources,sinks,channels,它们支持的类型如下:

Sources

Channels

Sinks

Avro Source Thrift Source Exec Source JMS Source Spooling Directory Source Twitter 1% firehose Source Kafka Source NetCat Source Sequence Generator Source Syslog Sources Syslog TCP Source Multiport Syslog TCP Source Syslog UDP Source HTTP Source Stress Source Legacy Sources Thrift Legacy Source Custom Source Scribe Source

Memory Channel JDBC Channel Kafka Channel File Channel Spillable Memory Channel Pseudo Transaction Channel

HDFS Sink Hive Sink Logger Sink Avro Sink Thrift Sink IRC Sink File Roll Sink Null Sink HBaseSink AsyncHBaseSink MorphlineSolrSink ElasticSearchSink Kite Dataset Sink Kafka Sink

  • Avro Source
  • Thrift Source
  • Exec Source
  • JMS Source
  • Spooling Directory Source
  • Twitter 1% firehose Source
  • Kafka Source
  • NetCat Source
  • Sequence Generator Source
  • Syslog Sources
  • Syslog TCP Source
  • Multiport Syslog TCP Source
  • Syslog UDP Source
  • HTTP Source
  • Stress Source
  • Legacy Sources
  • Thrift Legacy Source
  • Custom Source
  • Scribe Source
  • Memory Channel
  • JDBC Channel
  • Kafka Channel
  • File Channel
  • Spillable Memory Channel
  • Pseudo Transaction Channel
  • HDFS Sink
  • Hive Sink
  • Logger Sink
  • Avro Sink
  • Thrift Sink
  • IRC Sink
  • File Roll Sink
  • Null Sink
  • HBaseSink
  • AsyncHBaseSink
  • MorphlineSolrSink
  • ElasticSearchSink
  • Kite Dataset Sink
  • Kafka Sink

 以上的类型,你可以根据自己的需求来搭配组合使用,当然如果你愿意,你可以为所欲为的搭配.比如我们使用Avro source类型,采用Memory channel,使用HDFS sink存储,那我们的配置可以接着上的配置这样写

#Agent取名为 agent_name
#source 取名为 source_name ,一次类推
agent_name.source = Avro
agent_name.channels = MemoryChannel
agent_name.sinks = HDFS

当你命名好Agent的组成部分后,你还需要对Agent的组成sources , sinks, channles去一一描述. 下面我们来逐一的细说;

Source的配置

注: 需要特别说明,在Agent中对于存在的N(N>1)个source,其中的每一个source都需要单独进行配置,首先我们需要对source的type进行设置,然后在对每一个type进行对应的属性设置.其通用的模式如下:

agent_name.sources. source_name.type = value 
agent_name.sources. source_name.property2 = value 
agent_name.sources. source_name.property3 = value 

具体的例子,比如我们Source选用的是Avro模式

#Agent取名为 agent_name
#source 取名为 source_name ,一次类推
agent_name.source = Avro
agent_name.channels = MemoryChannel
agent_name.sinks = HDFS

#——————————sourcec配置——————————————#
agent_name.source.Avro.type = avro
agent_name.source.Avro.bind = localhost
agent_name.source.Avro.port = 9696
#将source绑定到MemoryChannel管道上
agent_name.source.Avro.channels = MemoryChannel 

Channels的配置

 Flume在source和sink配间提供各种管道(channels)来传递数据.因而和source一样,它也需要配置属性,同source一样,对于N(N>0)个channels,

需要单个对它们注意设置属性,它们的通用模板为:

agent_name.channels.channel_name.type = value 
agent_name.channels.channel_name. property2 = value 
agent_name.channels.channel_name. property3 = value 

具体的例子,假如我们选用memory channel类型,那么我先要配置管道的类型

agent_name.channels.MemoryChannel.type = memory

但是我们现在只是设置好了管道自个儿属性,我们还需要将其和sink,source链接起来,也就是绑定,绑定设置如下,我们可以分别写在source,sink处,也可以集中写在channel处

agent_name.sources.Avro.channels = MemoryChannel
agent_name.sinks.HDFS.channels =  MemoryCHannel

Sink的配置

sink的配置和Source配置类似,它的通用格式:

agent_name.sinks. sink_name.type = value 
agent_name.sinks. sink_name.property2 = value 
agent_name.sinks. sink_name.property3 = value

具体例子,比如我们设置Sink类型为HDFS ,那么我们的配置单就如下:

agent_name.sinks.HDFS.type = hdfs
agent_name.sinks.HDFS.path = HDFS‘s path

以上就是对Flume的配置文件详细介绍,下面在补全一张完整的配置图:

# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements.  See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership.  The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License.  You may obtain a copy of the License at
#
#  http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied.  See the License for the
# specific language governing permissions and limitations
# under the License.


# The configuration file needs to define the sources, 
# the channels and the sinks.
# Sources, channels and sinks are defined per agent, 
# in this case called 'agent'

#define agent
agent.sources = seqGenSrc
agent.channels = memoryChannel
agent.sinks = loggerSink kafkaSink

#
# For each one of the sources, the type is defined
#默认模式 agent.sources.seqGenSrc.type = seq / netcat / avro
agent.sources.seqGenSrc.type = avro
agent.sources.seqGenSrc.bind = localhost
agent.sources.seqGenSrc.port = 9696
#####数据来源####
#agent.sources.seqGenSrc.coommand = tail -F /home/gongxijun/Qunar/data/data.log

# The channel can be defined as follows.
agent.sources.seqGenSrc.channels = memoryChannel

#+++++++++++++++定义sink+++++++++++++++++++++#
# Each sink's type must be defined


agent.sinks.loggerSink.type = logger
agent.sinks.loggerSink.type = hbase   
agent.sinks.loggerSink.channel = memoryChannel
#表名
agent.sinks.loggerSink.table = flume
#列名
agent.sinks.loggerSink.columnFamily= gxjun
agent.sinks.loggerSink.serializer = org.apache.flume.sink.hbase.MyHbaseEventSerializer 
#agent.sinks.loggerSink.serializer  = org.apache.flume.sink.hbase.RegexHbaseEventSerializer
agent.sinks.loggerSink.zookeeperQuorum=localhost:2181
agent.sinks.loggerSink.znodeParent= /hbase

#Specify the channel the sink should use
agent.sinks.loggerSink.channel = memoryChannel 

# Each channel's type is defined.
#memory
agent.channels.memoryChannel.type = memory
agent.channels.memortChhannel.keep-alive = 10

# Other config values specific to each type of channel(sink or source)
# can be defined as well
# In this case, it specifies the capacity of the memory channel
#agent.channels.memoryChannel.checkpointDir = /home/gongxijun/Qunar/data
#agent.channels.memoryChannel.dataDirs = /home/gongxijun/Qunar/data , /home/gongxijun/Qunar/tmpData
agent.channels.memoryChannel.capacity = 10000000
agent.channels.memoryChannel.transactionCapacity = 10000



#define the sink2 kafka

#+++++++++++++++定义sink+++++++++++++++++++++#
# Each sink's type must be defined


agent.sinks.kafkaSink.type = logger
agent.sinks.kafkaSink.type = org.apache.flume.sink.kafka.KafkaSink

agent.sinks.kafkaSink.channel = memoryChannel
#agent.sinks.kafkaSink.server=localhost:9092
agent.sinks.kafkaSink.topic= kafka-topic
agent.sinks.kafkaSink.batchSize = 20
agent.sinks.kafkaSink.brokerList = localhost:9092
#Specify the channel the sink should use
agent.sinks.kafkaSink.channel = memoryChannel 

该配置类型如下如所示:

参考资料:

http://www.tutorialspoint.com/apache_flume/apache_flume_configuration.htm

作者: 龚细军

引用请注明出处:http://www.cnblogs.com/gongxijun/p/5661037.html