Hadoop数据分析平台实战——150Flume介绍离线数据分析平台实战——150Flume介绍
离线数据分析平台实战——150Flume介绍
Nginx介绍
Nginx是一款轻量级的Web 服务器/反向代理服务器及电子邮件(IMAP/POP3)代理服务器。 其特点是占有内存少,并发能力强,事实上nginx的并发能力确实在同类型的网页服务器中表现较好。 一般情况下,我们会将nginx服务器作为一个静态资源的访问容器。
Nginx安装步骤
Nginx安装步骤如下:(使用yum命令安装)
- 使用root用户登录。
- 查看nginx信息,命令:yum info nginx.
- 如果查看nginx信息提示nginx找不到,那么可以通过修改rpm源来进行后续步骤,执行命令:rpm -ivh http://nginx.org/packages/centos/6/noarch/RPMS/nginx-release-centos-6-0.el6.ngx.noarch.rpm
- 在查看nginx信息。
- 安装,命令: yum install nginx。在安装过程中直接输入y。
- 启动nginx,命令:service nginx start
- 访问http://192.168.0.120查看nginx的web页面。
image.png
image.png
image.png
Flume介绍
Flume是Apache基金会组织的一个提供的高可用的,高可靠的,分布式的海量日志采集、聚合和传输的系统, Flume支持在日志系统中定制各类数据发送方,用于收集数据; 同时,Flume提供对数据进行简单处理,并写到各种数据接受方(可定制)的能力。
当前Flume有两个版本, Flume0.9x版本之前的统称为Flume-og, Flume1.X版本被统称为Flume-ng。
参考文档:http://archive.cloudera.com/cdh5/cdh/5/flume-ng-1.5.0-cdh5.3.6/FlumeUserGuide.html
Flume-og和Flume-ng的区别
主要区别如下:
- Flume-og中采用master结构,为了保证数据的一致性,引入zookeeper进行管理。Flume-ng中取消了集中master机制和zookeeper管理机制,变成了一个纯粹的传输工具。
- Flume-ng中采用不同的线程进行数据的读写操作;在Flume-og中,读数据和写数据是由同一个线程操作的,如果写出比较慢的话,可能会阻塞flume的接收数据的能力。
Flume结构
Flume中以Agent为基本单位,一个agent可以包括source、channel、sink,三种组件都可以有多个。 其中source组件主要功能是接收外部数据,并将数据传递到channel中; sink组件主要功能是发送flume接收到的数据到目的地; channel的主要作用就是数据传输和保存的一个作用。 Flume主要分为三类结构:单agent结构、多agent链式结构和多路复用agent结构。
单agent结构
image.png
多agent链式结构
image.png
多路复用agent结构
image.png
Source介绍
Source的主要作用是接收客户端发送的数据,并将数据发送到channel中,source和channel之间的关系是多对多关系,不过一般情况下使用一个source对应多个channel。 通过名称区分不同的source。 Flume常用source有:Avro Source、Thrift Source、Exec Source、Kafka Source、Netcat Source等。 设置格式如下:
<agent-name>.sources.<source_name>.type=指定类型
<agent-name>.sources.<source_name>.channels=channels
.... 其他对应source类型需要的参数
Channel介绍
Channel的主要作用是提供一个数据传输通道,提供数据传输和数据存储(可选)等功能。 source将数据放到channel中,sink从channel中拿数据。 通过不同的名称来区分channel。 Flume常用channel有:Memory Channel、JDBC Channel、Kafka Channel、File Channel等。设置格式如下:
<agent-name>.channels.<channel_name>.type=指定类型
.... 其他对应channel类型需要的参数
Sink介绍
Sink的主要作用是定义数据写出方式,一般情况下sink从channel中获取数据,然后将数据写出到file、hdfs或者网络上。 channel和sink之间的关系是一对多的关系。 通过不同的名称来区分sink。 Flume常用sink有:Hdfs Sink、Hive Sink、File Sink、HBase Sink、Avro Sink、Thrift Sink、Logger Sink等。 设置格式如下:
<agent-name>.sinks.<sink_name1>.type=指定类型
<agent-name>.sinks.<sink_name1>.channel=<channe_name>
.... 其他对应sink类型需要的参数
Flume安装
安装步骤如下:
- 下载flume:wget http://archive.cloudera.com/cdh5/cdh/5/flume-ng-1.5.0-cdh5.3.6.tar.gz
- 解压flume。
- 修改conf/flume-env.sh文件,如果没有就新建一个。
- 添加flume的bin目录到环境变量中去。
- 验证是否安装成功, flume-ng version
Flume案例1
使用netcat source监听客户端的请求,使用memory channel作为数据的传输通道,使用logger sink打印监听到的信息。 步骤:
- 在conf文件夹中建立test1.conf,里面是agent的配置。
- 启动flume-ng agent --conf ./conf/ --conf-file ./conf/test.conf --name a1 -Dflume.root.logger=INFO,console。
- 使用telenet命令连接机器,命令:telnet 192.168.100.120 4444
- 输入信息查看是否成功。
test1.conf
a1.sources=r1
a1.channels=c1
a1.sinks=k1
a1.sources.r1.type=netcat
a1.sources.r1.bind=0.0.0.0
a1.sources.r1.port=4444
a1.sources.r1.channels=c1
a1.channels.c1.type=memory
a1.channels.c1.capacity=1000
a1.channels.c1.transactionCapacity=100
a1.sinks.k1.type=logger
a1.sinks.k1.channel=c1
Flume案例2
Nginx作为日志服务器,通过exec source监听nginx的日志文件,使用memory channel作为数据传输通道,使用hdfs sink将数据存储到hdfs上。
项目用到的配置文件
nginx.conf
user nginx;
worker_processes 1;
error_log /var/log/nginx/error.log warn;
pid /var/run/nginx.pid;
events {
worker_connections 1024;
}
http {
include /etc/nginx/mime.types;
default_type application/octet-stream;
log_format main '$remote_addr - $remote_user [$time_local] "$request" '
'$status $body_bytes_sent "$http_referer" '
'"$http_user_agent" "$http_x_forwarded_for"';
log_format log_format '$remote_addr^A$msec^A$http_host^A$request_uri';
sendfile on;
keepalive_timeout 65;
#include /etc/nginx/conf.d/*.conf;
server {
listen 80;
server_name hh 0.0.0.0;
location ~ .*(BfImg).(gif)$ {
default_type image/gif;
access_log /home/hadoop/access.log log_format;
root /etc/nginx/www/source;
}
}
}
test2.conf
agent.sources = r1
agent.sinks = k1
agent.channels = c1
## common
agent.sources.r1.channels = c1
agent.sinks.k1.channel = c1
## sources config
agent.sources.r1.type = exec
agent.sources.r1.command = tail -F /home/hadoop/access.log
## channels config
agent.channels.c1.type = memory
agent.channels.c1.capacity = 1000
agent.channels.c1.transactionCapacity = 1000
agent.channels.c1.byteCapacityBufferPercentage = 20
agent.channels.c1.byteCapacity = 1000000
agent.channels.c1.keep-alive = 60
#sinks config
agent.sinks.k1.type = hdfs
agent.sinks.k1.channel = c1
agent.sinks.k1.hdfs.path = hdfs://hh:8020/logs/%m/%d
agent.sinks.k1.hdfs.fileType = DataStream
agent.sinks.k1.hdfs.filePrefix = BF-%H
agent.sinks.k1.hdfs.fileSuffix=.log
agent.sinks.k1.hdfs.minBlockReplicas=1
agent.sinks.k1.hdfs.rollInterval=3600
agent.sinks.k1.hdfs.rollSize=132692539
agent.sinks.k1.hdfs.idleTimeout=10
agent.sinks.k1.hdfs.batchSize = 1
agent.sinks.k1.hdfs.rollCount=0
agent.sinks.k1.hdfs.round = true
agent.sinks.k1.hdfs.roundValue = 2
agent.sinks.k1.hdfs.roundUnit = minute
agent.sinks.k1.hdfs.useLocalTimeStamp = true
复杂的配置方法
agent.sources = r1
agent.sinks = k1 k2
agent.channels = c1
## common
agent.sources.r1.channels = c1
agent.sinks.k1.channel = c1
agent.sinks.k2.channel = c1
## sources config
agent.sources.r1.type = exec
agent.sources.r1.command = tail -F /home/hadoop/access.log
agent.sources.r1.selector.type= replicating
## channels config
agent.channels.c1.type = memory
agent.channels.c1.capacity = 100000000
agent.channels.c1.transactionCapacity = 10000000
agent.channels.c1.byteCapacityBufferPercentage = 60
agent.channels.c1.byteCapacity = 12800000000
agent.channels.c1.keep-alive = 60
#sinks config
agent.sinks.k1.type = hdfs
agent.sinks.k1.channel = c1
agent.sinks.k1.hdfs.path = hdfs://server30-244:8020/logs/%Y/%m/%d
agent.sinks.k1.hdfs.fileType = DataStream
agent.sinks.k1.hdfs.filePrefix = BF-%H
agent.sinks.k1.hdfs.fileSuffix=.log
agent.sinks.k1.hdfs.minBlockReplicas=1
agent.sinks.k1.hdfs.rollInterval=3600
agent.sinks.k1.hdfs.rollSize=132692539
agent.sinks.k1.hdfs.idleTimeout=600
agent.sinks.k1.hdfs.batchSize = 100
agent.sinks.k1.hdfs.rollCount=0
agent.sinks.k1.hdfs.round = true
agent.sinks.k1.hdfs.roundValue = 2
agent.sinks.k1.hdfs.roundUnit = minute
agent.sinks.k1.hdfs.useLocalTimeStamp = true
agent.sinks.k2.type = hdfs
agent.sinks.k2.channel = c1
agent.sinks.k2.hdfs.path = hdfs://server30-99:8020/agentLog/%Y/%m/%d
agent.sinks.k2.hdfs.fileType = DataStream
agent.sinks.k1.hdfs.filePrefix = ub-%H
agent.sinks.k1.hdfs.fileSuffix=.log
agent.sinks.k2.hdfs.minBlockReplicas=1
agent.sinks.k2.hdfs.rollInterval=3600
agent.sinks.k2.hdfs.rollSize=132692539
agent.sinks.k2.hdfs.idleTimeout=600
agent.sinks.k2.hdfs.batchSize = 100
agent.sinks.k2.hdfs.rollCount=0
agent.sinks.k2.hdfs.round = true
agent.sinks.k2.hdfs.roundValue = 2
agent.sinks.k2.hdfs.roundUnit = minute
agent.sinks.k2.hdfs.useLocalTimeStamp = true
#####sink groups
agent.sinkgroups=g1
agent.sinkgroups.g1.sinks=k1 k2
agent.sinkgroups.g1.processor.type= failover
agent.sinkgroups.g1.processor.priority.k1=10
agent.sinkgroups.g1.processor.priority.k2=5
agent.sinkgroups.g1.processor.maxpenalty=10000
- 外部表简单总结(r3笔记第51天)
- 通过shell脚本监控sql执行频率(r3笔记第50天)
- 和Null有关的函数(r3笔记第48天)
- 关于查询转换的一些简单分析(二) (r3笔记第68天)
- 跨网络拷贝文件的简单实践(r3笔记第67天)
- 关于enq: TX - allocate ITL entry的问题分析(r3笔记第66天)
- Tensorflow学习:使用Tensorflow搭建深层网络分类器
- 关于interval partitioning(r3笔记65天)
- Spark Tips4: Kafka的Consumer Group及其在Spark Streaming中的“异动”(更新)
- 关于数据库中的一些name(r3笔记第64天)
- 码农的瑞士军刀-脚本语言
- shell基础学习总结(一) (r3笔记第63天)
- 关于sysdba,sysoper,dba的区别(r3笔记第62天)
- 使用句柄实现特定场景的无备份恢复 (r3笔记第61天)
- JavaScript 教程
- JavaScript 编辑工具
- JavaScript 与HTML
- JavaScript 与Java
- JavaScript 数据结构
- JavaScript 基本数据类型
- JavaScript 特殊数据类型
- JavaScript 运算符
- JavaScript typeof 运算符
- JavaScript 表达式
- JavaScript 类型转换
- JavaScript 基本语法
- JavaScript 注释
- Javascript 基本处理流程
- Javascript 选择结构
- Javascript if 语句
- Javascript if 语句的嵌套
- Javascript switch 语句
- Javascript 循环结构
- Javascript 循环结构实例
- Javascript 跳转语句
- Javascript 控制语句总结
- Javascript 函数介绍
- Javascript 函数的定义
- Javascript 函数调用
- Javascript 几种特殊的函数
- JavaScript 内置函数简介
- Javascript eval() 函数
- Javascript isFinite() 函数
- Javascript isNaN() 函数
- parseInt() 与 parseFloat()
- escape() 与 unescape()
- Javascript 字符串介绍
- Javascript length属性
- javascript 字符串函数
- Javascript 日期对象简介
- Javascript 日期对象用途
- Date 对象属性和方法
- Javascript 数组是什么
- Javascript 创建数组
- Javascript 数组赋值与取值
- Javascript 数组属性和方法
- SpringBoot整合Quartz作为调度中心完整实用例子
- SpringBoot整合SpringSecurity简单实现登入登出从零搭建
- SpringBoot整合SpringBatch实用简例
- 解决JPA懒加载典型的N+1问题-注解@NamedEntityGraph
- 基于Shiro,JWT实现微信小程序登录完整例子
- Spring Cloud Eureka 总结
- Spring Cloud Feign 总结
- 掘金15W沸点简单分析(一)
- SpringBoot整合Shiro实现基于角色的权限访问控制(RBAC)系统简单设计从零搭建
- 基于AOP和ThreadLocal实现日志记录
- 搭建prometheus+grafana监控SpringBoot应用入门
- 掘金15W沸点简单分析(二)
- 老生常谈SpringAop日志收集与处理做的工具包
- 线程间通信wait---notify
- Ubuntu安装Oracle Java8