efk
准备三台centos7的服务器 两核两G的
关闭防火墙和SELinux
1
2
|
systemctl stop firewalld setenforce 0 |
1.每一台都安装jdk
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
|
rpm -ivh jdk-8u131-linux-x64_.rpm 准备中... ################################# [100%] 正在升级/安装... 1:jdk1.8.0_131-2000:1.8.0_131-fcs ################################# [100%] Unpacking JAR files... tools.jar... plugin.jar... javaws.jar... deploy.jar... rt.jar... jsse.jar... charsets.jar... localedata.jar... java -version // 执行这条命令可以看到jdk的版本 就是jdk环境配置成功 java version "1.8.0_131" Java(TM) SE Runtime Environment (build 1.8.0_131-b11) Java HotSpot(TM) 64-Bit Server VM (build 25.131-b11, mixed mode) |
2. 因为我装的是无图形化界面的centos 每一台虚拟机都需要
1
2
3
4
5
6
|
yum -y install wget cd /usr/local/src ##这个下载的是zookeeper wget http: //mirrors .cnnic.cn /apache/zookeeper/zookeeper-3 .4.14 /zookeeper-3 .4.14. tar .gz ##这个下载的是kafka wget http: //mirrors .tuna.tsinghua.edu.cn /apache/kafka/2 .2.0 /kafka_2 .11-2.2.0.tgz |
3. 解压zookeeper 然后移动到/usr/local/zookeeper kafka做同样的操作 kafka移动到/usr/local/kafka
1
2
3
4
|
tar -zxf zookeeper-3.4.14. tar .gz mv zookeeper-3.4.14 /usr/local/zookeeper tar -zxf kafka_2.11-2.2.0.tgz mv kafka_2.11-2.2.0 /usr/local/kafka |
4.在 /usr/local/zookeeper创建两个目录 zkdatalog zkdata
1
2
|
cd /usr/local/zookeeper mkdir {zkdatalog,zkdata} |
5.进入/usr/loca/zookeeper/conf 复制一个配置文件 修改复制出来的配置文件
cd /usr/local/zookeeper/conf cp zoo_sample.cfg zoo.cfg vim zoo.cfg ******************************* tickTime=2000 initLimit=10 syncLimit=5 dataDir=/usr/local/zookeeper/zkdata datalogDir=/usr/local/zookeeper/zkdatalog clientPort=2181 server.1=192.168.18.140:2888:3888 server.2=192.168.18.141:2888:3888 server.3=192.168.18.142:2888:3888 *******************************
6. 每一台的操作都不一样
1
2
3
4
5
6
|
第一台 echo '1' > /usr/local/zookeeper/zkdata/myid 第二台 echo '2' > /usr/local/zookeeper/zkdata/myid 第三胎 echo '3' > /usr/local/zookeeper/zkdata/myid |
7.启动服务 启动的时候按顺序开启 第一台 第二台 第三台
1
2
3
4
5
|
cd /usr/local/zookeeper/bin . /zkServer .sh start ZooKeeper JMX enabled by default Using config: /usr/local/zookeeper/bin/ .. /conf/zoo .cfg Starting zookeeper ... STARTED |
8.查看是否开启 zookpeer 的状态
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
|
第一台 显示这个状态表示成功 . /zkServer .sh status ZooKeeper JMX enabled by default Using config: /usr/local/zookeeper/bin/ .. /conf/zoo .cfg Mode: follower 第二台 . /zkServer .sh status ZooKeeper JMX enabled by default Using config: /usr/local/zookeeper/bin/ .. /conf/zoo .cfg Mode: leader 第三台 . /zkServer .sh status ZooKeeper JMX enabled by default Using config: /usr/local/zookeeper/bin/ .. /conf/zoo .cfg Mode: follower |
搭建kafka
第一台 cd /usr/local/kafka/config vim server.properties ********************* broker.id=1 advertised.listeners=PLAINTEXT://kafka01:9092 zookeeper.connect=192.168.18.140:2181,192.168.18.141:2181,192.168.18.142:2181 (三台服务器的IP地址) 第2台 cd /usr/local/kafka/config vim server.properties ********************* broker.id=2 advertised.listeners=PLAINTEXT://kafka02:9092 zookeeper.connect=192.168.18.140:2181,192.168.18.141:2181,192.168.18.142:2181 (三台服务器的IP地址) 第3台 cd /usr/local/kafka/config vim server.properties ********************* broker.id=3 advertised.listeners=PLAINTEXT://kafka03:9092 zookeeper.connect=192.168.18.140:2181,192.168.18.141:2181,192.168.18.142:2181 (三台服务器的IP地址)
10.vim /etc/hosts
添加以下内容在每一台服务器上 192.168.18.140 kafka01 192.168.18.141 kafka02 192.168.18.142 kafka03
11.启动kafka的命令 每一台都启动kafka
cd /usr/local/kafka/bin
./kafka-server-start.sh -daemon ../config/server.properties
启动之后可以查看端口9092
ss -ntlp | grep 9092
12.创建主题 topics
cd /usr/local/kafka/bin
./kafka-topics.sh --create --zookeeper 192.168.18.140:2181(这个ip地址随便写三台服务器中的一个就可以)--replication-factor 2 --partitions 3 --topic wg007
查看主题 如果可以显示刚刚创建的就是成功了
cd /usr/local/kafka/bin
./kafka-topics.sh --list --zookeeper 192.168.18.140:2181
模拟生产者 执行代码后就会有一个小的 >
cd /usr/local/kafka/bin
./kafka-console-producer.sh --broker-list 192.168.18.140:9092 --topic wg007 >
在第二台服务器上模拟消费者
/usr/local/kafka/bin/kafka-console-consumer.sh --bootstrap-server 192.168.18.141:9092 --topic wg007 --from-beginning
测试模拟生产者和消费者是否成功
在模拟生产者的服务器上写一些东西 可以在模拟消费者的服务器上可以看到表示成功如下所示
写一个脚本用来创建kafka的topic
cd /usr/local/kafka/bin vim kafka-create-topics.sh ################################# #!/bin/bash read -p "请输入一个你想要创建的topic:" topic cd /usr/local/kafka/bin ./kafka-topics.sh --create --zookeeper 192.168.18.140:2181 --replication-factor 2 --partitions 3 --topic ${topic}
创建一个新的yum源
vim /etc/yum.repo.d/filebeat.repo ***加入以下内容 [filebeat-6.x] name=Elasticsearch repository for 6.x packages baseurl=https://artifacts.elastic.co/packages/6.x/yum gpgcheck=1 gpgkey=https://artifacts.elastic.co/GPG-KEY-elasticsearch enabled=1 autorefresh=1 type=rpm-md
安装
yum -y install filebeat
编辑filebeat的配置文件
cd /etc/filebeat vim filebeat.yml 修改以下内容 enabled: true paths: - /var/log/nginx/*.log //写这个路径的前提是安装nginx output.kafka: enabled: true # Array of hosts to connect to. hosts: ["192.168.18.140:9092","192.168.18.141:9092","192.168.18.142:9092"] topic: nginx5 //写这个nginx_log1 的前提是有nginx_log1的topic 上面有生产者的脚本
安装nginx
yum -y install epel* yum -y install nginx
启动nginx和filebeat
systemctl start filebeat systemctl enable filebeat systemctl start nginx
可以给ningx生产一些数据
yum -y install httpd-tools ab -n1000 -c 200 http://127.0.0.1/cccc //这条命令可以多执行几次
可以在安装filebeat的服务器上测试一下nginx的服务
curl -I 192.168.18.140:80
在模拟消费者的服务器上 如果可以显示一下内容 表示成功了
cd /usr/local/kafka/bin ./kafka-console-consumer.sh --bootstrap-server 192.168.18.141:9092 --topic nginx5(这里的topic的名字一定要和filebeat的配置文件里的一致) --from-beginning
如图所示
现在开始收集多个日志 system nginx secure 和日志 编辑filebeat的配置文件
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
|
#讲filebeat的input改成下面的样子 filebeat.inputs: #这个是收集nginx的日志 - type : log enabled: true paths: - /var/log/nginx/ *.log //nginx 的日志文件 fields: log_topics: nginx5 // 这个是收集nginx的topic #这个是收集system的日志 - type : log enabled: true paths: - /var/log/messages //system 的日志文件目录 fields: log_topics: messages // 这个是收集system的topic #收集secure的日志 - type : log enabled: true paths: - /var/log/secure //secure 的日志文件 fields: log_topics: secure // 这个是收集secure的topic output.kafka: enabled: true hosts: [ "192.168.18.140:9092" , "192.168.18.141:9092" , "192.168.18.142:9092" ] topic: '%{[fields][log_topics]}' |
注意:一点更要创建三个topic 就是上面的配置文件提到的topic 可以使用上面的脚本创建topic 重启filebeat
systemctl restart filebeat
我是用了三台服务器来做EFK集群
接下来在第二胎安装logstash 在第三胎安装ES集群(就是elasticsearch和kibana)
安装
1
2
3
4
5
|
安装logstash在第二台 rpm -ivh logstash-6.6.0.rpm 安装 kibana 和elasticsearch rpm -ivh elasticsearch-6.6.2.rpm rpm -ivh kibana-6.6.2-x86_64.rpm |
编辑elasticsearch的配置文件
1
2
3
4
|
vim /etc/elasticsearch/elasticsearch .yml ############################ network.host: 192.168.18.142 http.port: 9200 |
启动elasticsearch
1
|
systemctl restart elasticsearch |
编辑kibana的配置文件
1
2
3
4
5
6
7
8
|
vim /etc/kibana/kibana .yml ###################### server.port: 5601 server.host: "0.0.0.0" elasticsearch.hosts: [ "http://192.168.18.142:9200" ] 然后启动kibana systemctl restart kibana |
现在开始 编写logstash的三个配置文件
cd /etc/logstash/conf.d
现在是messages的 vim messages.conf
1
2
3
4
5
6
7
8
9
10
11
12
13
14
|
input { kafka { bootstrap_servers => [ "192.168.18.140:9092,192.168.18.141:9092,192.168.18.142:9092" ] group_id => "logstash" topics => "messages" consumer_threads => 5 } } output { elasticsearch { hosts => "192.168.18.142:9200" index => "messages-%{+YYYY.MM.dd}" } } |
现在是nginx的 vim nginx.conf
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
|
input { kafka { bootstrap_servers => [ "192.168.18.140:9092,192.168.18.141:9092,192.168.18.142:9092" ] group_id => "logstash" topics => "nginx5" consumer_threads => 5 } } filter { grok { match => { "message" => "%{NGINXACCESS}" } } } output { elasticsearch { hosts => "192.168.18.142:9200" index => "nginx1_log-%{+YYYY.MM.dd}" } } |
现在是secure的 vim secure.conf
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
|
input { kafka { bootstrap_servers => [ "192.168.18.140:9092,192.168.18.141:9092,192.168.18.142:9092" ] group_id => "logstash" topics => "secure" consumer_threads => 5 } } output { elasticsearch { hosts => "192.168.18.142:9200" index => "secure-%{+YYYY.MM.dd}" } } |
添加管道
1
2
3
4
5
6
7
8
|
vim /etc/logstash/pipelines .yml - pipeline. id : messages path.config: "/etc/logstash/conf.d/messages.conf" - pipeline. id : nginx path.config: "/etc/logstash/conf.d/nginx.conf" - pipeline. id : secure path.config: "/etc/logstash/conf.d/secure.conf" |
正则匹配
cd /usr/share/logstash/vendor/bundle/jruby/2.3.0/gems/logstash-patterns-core-4.1.2/patterns vim nginx_access URIPARAM1 [A-Za-z0-9$.+!*'|(){},~@#%&/=:;_?\-\[\]]* NGINXACCESS %{IPORHOST:client_ip} (%{USER:ident}|- ) (%{USER:auth}|-) \[%{HTTPDATE:timestamp}\] "(?:%{WORD:verb} (%{NOTSPACE:request}|-)(?: HTTP/%{NUMBER:http_version})?|-)" %{NUMBER:status} (?:%{NUMBER:bytes}|-) "(?:%{URI:referrer}|-)" "%{GREEDYDATA:agent}"
重启logstash systemctl restart logstash
可以在第二台机器上查看模拟消费者的状态 messages的
执行下面的命令可以显示出日志内容就是成功
1
2
|
cd /usr/local/kafak/bin . /kafka-console-consumer .sh --bootstrap-server 192.168.18.141:9092 --topic messages --from-beginning<br><br> |
可以在第二台机器上查看模拟消费者的状态 secure的
执行下面的命令可以显示出日志内容就是成功
1
2
|
cd /usr/local/kafak/bin . /kafka-console-consumer .sh --bootstrap-server 192.168.18.141:9092 --topic secure --from-beginning |
可以在第二台机器上查看模拟消费者的状态 nginx的 nginx的可以生产一些日志文件 创建一些访问记录
执行下面的命令可以显示出日志内容就是成功
1
2
|
cd /usr/local/kafak/bin . /kafka-console-consumer .sh --bootstrap-server 192.168.18.141:9092 --topic nginx5 --from-beginning |
原文地址:https://www.cnblogs.com/gaiting/p/12088310.html
- Spring Security入门(二):基于数据库验证
- flume搜集日志:如何解决实时不断追加的日志文件及不断增加的文件个数问题
- hduoj-----(2896)病毒侵袭(ac自动机)
- MySQL数据库(二):基本管理
- Golang编程实现生成n个从a到b不重复随机数的方法
- TiDB 在株式会社 FUNYOURS JAPAN 的应用
- MySQL数据库(三):数据类型
- spark开发环境详细教程2:window下sbt库的设置
- hdu----(2222)Keywords Search(ac自动机)
- MySQL数据库(四):约束条件
- hdu----(2084)数塔(dp)
- golang简单tls协议用法完整示例
- spark开发环境详细教程1:IntelliJ IDEA使用详细说明
- MySQL数据库(五):索引
- JavaScript 教程
- JavaScript 编辑工具
- JavaScript 与HTML
- JavaScript 与Java
- JavaScript 数据结构
- JavaScript 基本数据类型
- JavaScript 特殊数据类型
- JavaScript 运算符
- JavaScript typeof 运算符
- JavaScript 表达式
- JavaScript 类型转换
- JavaScript 基本语法
- JavaScript 注释
- Javascript 基本处理流程
- Javascript 选择结构
- Javascript if 语句
- Javascript if 语句的嵌套
- Javascript switch 语句
- Javascript 循环结构
- Javascript 循环结构实例
- Javascript 跳转语句
- Javascript 控制语句总结
- Javascript 函数介绍
- Javascript 函数的定义
- Javascript 函数调用
- Javascript 几种特殊的函数
- JavaScript 内置函数简介
- Javascript eval() 函数
- Javascript isFinite() 函数
- Javascript isNaN() 函数
- parseInt() 与 parseFloat()
- escape() 与 unescape()
- Javascript 字符串介绍
- Javascript length属性
- javascript 字符串函数
- Javascript 日期对象简介
- Javascript 日期对象用途
- Date 对象属性和方法
- Javascript 数组是什么
- Javascript 创建数组
- Javascript 数组赋值与取值
- Javascript 数组属性和方法
- 你的GitHub项目被封存到北极了吗?
- Logstash: 如何创建可维护和可重用的 Logstash 管道
- 手把手教你微信好友头像形成指定的文字
- Logstash: 应用实践 - 装载 CSV 文档到 Elasticsearch
- LeetCode 剑指 Offer 28. 对称的二叉树
- Mysql拼接查询结果
- 手把手教你如何重建二叉树(超精彩配图)
- 一文搞定插入排序算法
- LeetCode 107. 二叉树的层次遍历 II
- LeetCode 103. 二叉树的锯齿形层次遍历
- 我是怎么一步一步调试出来二叉树的遍历(超精彩配图),二叉树遍历再也不用愁了
- 重中之重的二分查找
- LeetCode 剑指Offer 面试题27. 二叉树的镜像
- 一文搞定选择排序算法
- 一文搞定冒泡排序算法