flume搜集日志:如何解决实时不断追加的日志文件及不断增加的文件个数问题
本文的背景: 在搜集日志的过程中,日志文件的个数及日志文件需要不断的追加。flume1.6中,可以使用tail -f可以解决不断追加的文件,但是由于日志文件的个数是变化的,不可能只产生一个文件。所以tail -f就已经不能解决这个搜集日志的问题。 需求: 需要能够监控不断增加的文件,并且单个文件也是不断追加的
解决办法: 这时候flume1.7就产生了,很好的通过 TAILDIRl解决了这个问题。TAILDIRl可以监控一个目录下的文件。 官网地址:http://flume.apache.org/FlumeUserGuide.html 官网文档截图:
上面加粗为常用属性。 这里我们只使用了下面两个属性 a1.sources.source1.filegroups.f1 = /data/aboutyunlog/.*log.* a1.sources.source1.type = TAILDIR flume1.7安装包 链接:http://pan.baidu.com/s/1c1Pzo9i 密码:fxa4 一、Flume安装 1. 压缩安装包
[Bash shell] 纯文本查看 复制代码
?
tar -zxvf ~/jar/apache-flume-1.7.0-bin.tar.gz -C /data
mv /data/apache-flume-1.7.0-bin/ /data/flume-1.7.0 # 重命名
2. 配置环境变量
[Bash shell] 纯文本查看 复制代码
?
echo -e "export FLUME_HOME=/data/flume-1.7.0nexport PATH=$FLUME_HOME/bin:$PATH" >> ~/.bashrc
source ~/.bashrc
3. 配置flume
[Bash shell] 纯文本查看 复制代码
?
cp flume-env.sh.template flume-env.sh修改JAVA_HOME
export JAVA_HOME= /data/jdk1.8.0_111
4. 验证安装
[Bash shell] 纯文本查看 复制代码
?
flume-ng version
二、Flume使用 一个agent由source、channel、sink组成。这儿我们使用Spooling Directory Source、File Channel、Kafka Sink。 1. 单节点的agent 1) 增加配置文件
[Bash shell] 纯文本查看 复制代码
?
cd $FLUME_HOME/conf
vim single_agent.conf
将以下内容拷贝进去
[Bash shell] 纯文本查看 复制代码
?
# agent的名称为a1
a1.sources = source1
a1.channels = channel1
a1.sinks = sink1
# set source
#a1.sources.source1.type = spooldir
a1.sources.source1.type = TAILDIR
a1.sources.source1.filegroups = f1
a1.sources.source1.filegroups.f1 = /data/aboutyunlog/.*log.*
#a1.sources.source1.spoolDir=/data/aboutyunlog
a1sources.source1.fileHeader = flase
# set sink
a1.sinks.sink1.type = org.apache.flume.sink.kafka.KafkaSink
#a1.sinks.sink1.kafka.bootstrap.servers = master:9092,slave1:9092,slave2:9092
a1.sinks.sink1.brokerList= master:9092,slave1:9092,slave2:9092
a1.sinks.sink1.topic= aboutyunlog
a1.sinks.sink1.kafka.flumeBatchSize = 20
a1.sinks.sink1.kafka.producer.acks = 1
a1.sinks.sink1.kafka.producer.linger.ms = 1
a1.sinks.sink1.kafka.producer.compression.type = snappy
# set channel
a1.channels.channel1.type = file
a1.channels.channel1.checkpointDir = /data/flume_data/checkpoint
a1.channels.channel1.dataDirs= /data/flume_data/data
# bind
a1.sources.source1.channels = channel1
a1.sinks.sink1.channel = channel1
2. 创建所需文件
[Bash shell] 纯文本查看 复制代码
?
mkdir -p /data/aboutyunlog
mkdir -p /data/flume_data/checkpoint
mkdir -p /data/flume_data/data
3. 查看kafka现有的topic
[Bash shell] 纯文本查看 复制代码
?
kafka-topics.sh --zookeeper master:2181,slave1:2181,slave2:2181 --list
4. 在kafka上创建名为aboutyunlog的topic
[Bash shell] 纯文本查看 复制代码
?
kafka-topics.sh --zookeeper master:2181,slave1:2181,slave2:2181 --create --topic aboutyunlog --replication-factor 1 --partitions 3
5. 启动flume
[Bash shell] 纯文本查看 复制代码
?
flume-ng agent --conf-file /data/flume-1.6.0/conf/single_agent.conf --name a1 -Dflume.root.logger=INFO,console
启动过程中控制台会输出很多日志。 6. 创建一个kafka的consumer
[Bash shell] 纯文本查看 复制代码
?
kafka-console-consumer.sh --zookeeper master:2181,slave1:2181,slave2:2181 --topic aboutyunlog --from-beginning
这条命令的意思是说创建aboutyunlog这个topic下的消费者,消费时从最开始的一条信息开始消费。 上图说明该消费者创建成功,由于本地/data/aboutyunlog目录下没有新文件加入,造成aboutyunlog这个topic没有信息输入,所以消费者没有得到一条信息。 7. 添加文件到flume source目录
[Bash shell] 纯文本查看 复制代码
?
echo -e "this is a test file! n[url]http://www.aboutyun.com20170820[/url]"
mv log.1 /data/aboutyunlog/
为:echo -e "this is a test file! nhttp://www.aboutyun.com20170820">log.1 再次执行
[Bash shell] 纯文本查看 复制代码
?
echo -e "this is a test file! n[url]http://www.aboutyun.com20170820[/url]">log.2
然后我们看到 master上
注意:需要通过xshell链接两个master。也就是打开两个master界面 8. 再次查看kafka consumer 切换到创建kafka consumer的shell界面,会看到我们log.1中文件的内容被打印在屏幕上。
上图说明我们已经成功使用flume监控/data/aboutyunlog目录,并将监控目录中的内容发送到kafka的aboutyunlog主题中。 注意:如果使用flume1.6会找不到类。
[Bash shell] 纯文本查看 复制代码
?
17/08/17 19:21:08 ERROR node.PollingPropertiesFileConfigurationProvider: Failed to load configuration data. Exception follows.
org.apache.flume.FlumeException: Unable to load source type: TAILDIR, class: TAILDIR
at org.apache.flume.source.DefaultSourceFactory.getClass(DefaultSourceFactory.java:69)
at org.apache.flume.source.DefaultSourceFactory.create(DefaultSourceFactory.java:42)
at org.apache.flume.node.AbstractConfigurationProvider.loadSources(AbstractConfigurationProvider.java:322)
at org.apache.flume.node.AbstractConfigurationProvider.getConfiguration(AbstractConfigurationProvider.java:97)
at org.apache.flume.node.PollingPropertiesFileConfigurationProvider$FileWatcherRunnable.run(PollingPropertiesFileConfigurationProvider.java:140)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.ClassNotFoundException: TAILDIR
at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:331)
at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
at java.lang.Class.forName0(Native Method)
at java.lang.Class.forName(Class.java:264)
at org.apache.flume.source.DefaultSourceFactory.getClass(DefaultSourceFactory.java:67)
... 11 more
所以需更换flume1.7
- MongoDB副本集其他细节
- 数据库表设计对性能的影响
- V部落博客管理平台开源啦! Vue+SpringBoot强强联合!
- SpringBoot+SpringSecurity处理Ajax登录请求
- MongoDB副本集配置
- MongoDB副本集搭建
- vhr部门管理数据库设计与编程
- SpringBoot+Vue前后端分离,使用SpringSecurity完美处理权限问题(六)
- 权限管理模块中动态加载Vue组件
- axios请求封装和异常统一处理
- SpringSecurity中密码加盐与SpringBoot中异常统一处理
- SpringBoot+Vue前后端分离,使用SpringSecurity完美处理权限问题(二)
- SpringBoot+Vue前后端分离,使用SpringSecurity完美处理权限问题(一)
- Java操作MongoDB
- JavaScript 教程
- JavaScript 编辑工具
- JavaScript 与HTML
- JavaScript 与Java
- JavaScript 数据结构
- JavaScript 基本数据类型
- JavaScript 特殊数据类型
- JavaScript 运算符
- JavaScript typeof 运算符
- JavaScript 表达式
- JavaScript 类型转换
- JavaScript 基本语法
- JavaScript 注释
- Javascript 基本处理流程
- Javascript 选择结构
- Javascript if 语句
- Javascript if 语句的嵌套
- Javascript switch 语句
- Javascript 循环结构
- Javascript 循环结构实例
- Javascript 跳转语句
- Javascript 控制语句总结
- Javascript 函数介绍
- Javascript 函数的定义
- Javascript 函数调用
- Javascript 几种特殊的函数
- JavaScript 内置函数简介
- Javascript eval() 函数
- Javascript isFinite() 函数
- Javascript isNaN() 函数
- parseInt() 与 parseFloat()
- escape() 与 unescape()
- Javascript 字符串介绍
- Javascript length属性
- javascript 字符串函数
- Javascript 日期对象简介
- Javascript 日期对象用途
- Date 对象属性和方法
- Javascript 数组是什么
- Javascript 创建数组
- Javascript 数组赋值与取值
- Javascript 数组属性和方法
- Python、PyGame游戏项目
- windows 认证机制
- 谷歌地球引擎python文档(GEE_python_API)
- react基础
- 基于ASP.NET Core 3.x的端点路由(Endpoint Routing)实现控制器(Controller)和操作(Action)分离的接口服务
- 流量转发映射
- 什么情况用ArrayList or LinkedList呢?
- 拒绝服务攻击
- 协议攻击(一)
- 百亿级图数据JanusGraph迁移之旅
- 协议攻击(二)
- 学不会设计模式,是因为你还没用过这个神奇的网站!
- windows域的创建
- 聊一下简易版的“Spring Boot”写的咋样了
- 手写“SpringBoot”近况:IoC模块已经完成