Hadoop数据分析平台实战——150Flume介绍离线数据分析平台实战——150Flume介绍

时间:2022-05-07
本文章向大家介绍Hadoop数据分析平台实战——150Flume介绍离线数据分析平台实战——150Flume介绍,主要内容包括离线数据分析平台实战——150Flume介绍、项目用到的配置文件、复杂的配置方法、基本概念、基础应用、原理机制和需要注意的事项等,并结合实例形式分析了其使用技巧,希望通过本文能帮助到大家理解应用这部分内容。

离线数据分析平台实战——150Flume介绍

Nginx介绍

Nginx是一款轻量级的Web 服务器/反向代理服务器及电子邮件(IMAP/POP3)代理服务器。 其特点是占有内存少,并发能力强,事实上nginx的并发能力确实在同类型的网页服务器中表现较好。 一般情况下,我们会将nginx服务器作为一个静态资源的访问容器。

Nginx安装步骤

Nginx安装步骤如下:(使用yum命令安装)

  1. 使用root用户登录。
  2. 查看nginx信息,命令:yum info nginx.
  3. 如果查看nginx信息提示nginx找不到,那么可以通过修改rpm源来进行后续步骤,执行命令:rpm -ivh http://nginx.org/packages/centos/6/noarch/RPMS/nginx-release-centos-6-0.el6.ngx.noarch.rpm
  4. 在查看nginx信息。
  5. 安装,命令: yum install nginx。在安装过程中直接输入y。
  6. 启动nginx,命令:service nginx start
  7. 访问http://192.168.0.120查看nginx的web页面。

image.png

image.png

image.png

Flume介绍

Flume是Apache基金会组织的一个提供的高可用的,高可靠的,分布式的海量日志采集、聚合和传输的系统, Flume支持在日志系统中定制各类数据发送方,用于收集数据; 同时,Flume提供对数据进行简单处理,并写到各种数据接受方(可定制)的能力。

当前Flume有两个版本, Flume0.9x版本之前的统称为Flume-og, Flume1.X版本被统称为Flume-ng。

参考文档:http://archive.cloudera.com/cdh5/cdh/5/flume-ng-1.5.0-cdh5.3.6/FlumeUserGuide.html

Flume-og和Flume-ng的区别

主要区别如下:

  1. Flume-og中采用master结构,为了保证数据的一致性,引入zookeeper进行管理。Flume-ng中取消了集中master机制和zookeeper管理机制,变成了一个纯粹的传输工具。
  2. Flume-ng中采用不同的线程进行数据的读写操作;在Flume-og中,读数据和写数据是由同一个线程操作的,如果写出比较慢的话,可能会阻塞flume的接收数据的能力。

Flume结构

Flume中以Agent为基本单位,一个agent可以包括source、channel、sink,三种组件都可以有多个。 其中source组件主要功能是接收外部数据,并将数据传递到channel中; sink组件主要功能是发送flume接收到的数据到目的地; channel的主要作用就是数据传输和保存的一个作用。 Flume主要分为三类结构:单agent结构、多agent链式结构和多路复用agent结构。

单agent结构

image.png

多agent链式结构

image.png

多路复用agent结构

image.png

Source介绍

Source的主要作用是接收客户端发送的数据,并将数据发送到channel中,source和channel之间的关系是多对多关系,不过一般情况下使用一个source对应多个channel。 通过名称区分不同的source。 Flume常用source有:Avro Source、Thrift Source、Exec Source、Kafka Source、Netcat Source等。 设置格式如下:

<agent-name>.sources.<source_name>.type=指定类型
<agent-name>.sources.<source_name>.channels=channels
.... 其他对应source类型需要的参数

Channel介绍

Channel的主要作用是提供一个数据传输通道,提供数据传输和数据存储(可选)等功能。 source将数据放到channel中,sink从channel中拿数据。 通过不同的名称来区分channel。 Flume常用channel有:Memory Channel、JDBC Channel、Kafka Channel、File Channel等。设置格式如下:

<agent-name>.channels.<channel_name>.type=指定类型
.... 其他对应channel类型需要的参数

Sink介绍

Sink的主要作用是定义数据写出方式,一般情况下sink从channel中获取数据,然后将数据写出到file、hdfs或者网络上。 channel和sink之间的关系是一对多的关系。 通过不同的名称来区分sink。 Flume常用sink有:Hdfs Sink、Hive Sink、File Sink、HBase Sink、Avro Sink、Thrift Sink、Logger Sink等。 设置格式如下:

<agent-name>.sinks.<sink_name1>.type=指定类型
<agent-name>.sinks.<sink_name1>.channel=<channe_name>
.... 其他对应sink类型需要的参数

Flume安装

安装步骤如下:

  1. 下载flume:wget http://archive.cloudera.com/cdh5/cdh/5/flume-ng-1.5.0-cdh5.3.6.tar.gz
  2. 解压flume。
  3. 修改conf/flume-env.sh文件,如果没有就新建一个。
  4. 添加flume的bin目录到环境变量中去。
  5. 验证是否安装成功, flume-ng version

Flume案例1

使用netcat source监听客户端的请求,使用memory channel作为数据的传输通道,使用logger sink打印监听到的信息。 步骤:

  1. 在conf文件夹中建立test1.conf,里面是agent的配置。
  2. 启动flume-ng agent --conf ./conf/ --conf-file ./conf/test.conf --name a1 -Dflume.root.logger=INFO,console。
  3. 使用telenet命令连接机器,命令:telnet 192.168.100.120 4444
  4. 输入信息查看是否成功。

test1.conf

a1.sources=r1
a1.channels=c1
a1.sinks=k1

a1.sources.r1.type=netcat
a1.sources.r1.bind=0.0.0.0
a1.sources.r1.port=4444
a1.sources.r1.channels=c1

a1.channels.c1.type=memory
a1.channels.c1.capacity=1000
a1.channels.c1.transactionCapacity=100

a1.sinks.k1.type=logger
a1.sinks.k1.channel=c1

Flume案例2

Nginx作为日志服务器,通过exec source监听nginx的日志文件,使用memory channel作为数据传输通道,使用hdfs sink将数据存储到hdfs上。

项目用到的配置文件

nginx.conf
user  nginx;
worker_processes  1;
error_log  /var/log/nginx/error.log warn;
pid        /var/run/nginx.pid;

events {
    worker_connections  1024;
}

http {
    include       /etc/nginx/mime.types;
    default_type  application/octet-stream;

    log_format  main  '$remote_addr - $remote_user [$time_local] "$request" '
                      '$status $body_bytes_sent "$http_referer" '
                      '"$http_user_agent" "$http_x_forwarded_for"';


    log_format  log_format   '$remote_addr^A$msec^A$http_host^A$request_uri';

    sendfile        on;
    keepalive_timeout  65;
    #include /etc/nginx/conf.d/*.conf;

server {
    listen       80;
    server_name  hh 0.0.0.0;

    location ~ .*(BfImg).(gif)$ {
      default_type image/gif;
      access_log /home/hadoop/access.log log_format;
      root /etc/nginx/www/source;  
   }
}
}
test2.conf
agent.sources = r1
agent.sinks = k1
agent.channels = c1

## common
agent.sources.r1.channels = c1
agent.sinks.k1.channel = c1

## sources config
agent.sources.r1.type  = exec
agent.sources.r1.command = tail -F /home/hadoop/access.log

## channels config
agent.channels.c1.type = memory
agent.channels.c1.capacity = 1000
agent.channels.c1.transactionCapacity = 1000
agent.channels.c1.byteCapacityBufferPercentage = 20
agent.channels.c1.byteCapacity = 1000000
agent.channels.c1.keep-alive = 60

#sinks config
agent.sinks.k1.type = hdfs
agent.sinks.k1.channel = c1
agent.sinks.k1.hdfs.path = hdfs://hh:8020/logs/%m/%d
agent.sinks.k1.hdfs.fileType = DataStream
agent.sinks.k1.hdfs.filePrefix = BF-%H
agent.sinks.k1.hdfs.fileSuffix=.log
agent.sinks.k1.hdfs.minBlockReplicas=1
agent.sinks.k1.hdfs.rollInterval=3600
agent.sinks.k1.hdfs.rollSize=132692539
agent.sinks.k1.hdfs.idleTimeout=10
agent.sinks.k1.hdfs.batchSize = 1
agent.sinks.k1.hdfs.rollCount=0
agent.sinks.k1.hdfs.round = true
agent.sinks.k1.hdfs.roundValue = 2
agent.sinks.k1.hdfs.roundUnit = minute
agent.sinks.k1.hdfs.useLocalTimeStamp = true

复杂的配置方法

agent.sources = r1
agent.sinks = k1 k2
agent.channels = c1

## common
agent.sources.r1.channels = c1
agent.sinks.k1.channel = c1
agent.sinks.k2.channel = c1

## sources config
agent.sources.r1.type  = exec
agent.sources.r1.command = tail -F /home/hadoop/access.log
agent.sources.r1.selector.type= replicating

## channels config
agent.channels.c1.type = memory
agent.channels.c1.capacity = 100000000
agent.channels.c1.transactionCapacity = 10000000
agent.channels.c1.byteCapacityBufferPercentage = 60
agent.channels.c1.byteCapacity = 12800000000
agent.channels.c1.keep-alive = 60

#sinks config
agent.sinks.k1.type = hdfs
agent.sinks.k1.channel = c1
agent.sinks.k1.hdfs.path = hdfs://server30-244:8020/logs/%Y/%m/%d
agent.sinks.k1.hdfs.fileType = DataStream
agent.sinks.k1.hdfs.filePrefix = BF-%H
agent.sinks.k1.hdfs.fileSuffix=.log
agent.sinks.k1.hdfs.minBlockReplicas=1
agent.sinks.k1.hdfs.rollInterval=3600
agent.sinks.k1.hdfs.rollSize=132692539
agent.sinks.k1.hdfs.idleTimeout=600
agent.sinks.k1.hdfs.batchSize = 100
agent.sinks.k1.hdfs.rollCount=0
agent.sinks.k1.hdfs.round = true
agent.sinks.k1.hdfs.roundValue = 2
agent.sinks.k1.hdfs.roundUnit = minute
agent.sinks.k1.hdfs.useLocalTimeStamp = true

agent.sinks.k2.type = hdfs
agent.sinks.k2.channel = c1
agent.sinks.k2.hdfs.path = hdfs://server30-99:8020/agentLog/%Y/%m/%d
agent.sinks.k2.hdfs.fileType = DataStream
agent.sinks.k1.hdfs.filePrefix = ub-%H
agent.sinks.k1.hdfs.fileSuffix=.log
agent.sinks.k2.hdfs.minBlockReplicas=1
agent.sinks.k2.hdfs.rollInterval=3600
agent.sinks.k2.hdfs.rollSize=132692539
agent.sinks.k2.hdfs.idleTimeout=600
agent.sinks.k2.hdfs.batchSize = 100
agent.sinks.k2.hdfs.rollCount=0
agent.sinks.k2.hdfs.round = true
agent.sinks.k2.hdfs.roundValue = 2
agent.sinks.k2.hdfs.roundUnit = minute
agent.sinks.k2.hdfs.useLocalTimeStamp = true


#####sink groups
agent.sinkgroups=g1
agent.sinkgroups.g1.sinks=k1 k2
agent.sinkgroups.g1.processor.type= failover
agent.sinkgroups.g1.processor.priority.k1=10
agent.sinkgroups.g1.processor.priority.k2=5
agent.sinkgroups.g1.processor.maxpenalty=10000