Kafka与Logstash的数据采集对接 —— 看图说话,从运行机制到部署
时间:2022-04-22
本文章向大家介绍Kafka与Logstash的数据采集对接 —— 看图说话,从运行机制到部署,主要内容包括Logstash工作原理、启动kafka、创建主题、测试环境、输入测试、读取测试、基本概念、基础应用、原理机制和需要注意的事项等,并结合实例形式分析了其使用技巧,希望通过本文能帮助到大家理解应用这部分内容。
基于Logstash跑通Kafka还是需要注意很多东西,最重要的就是理解Kafka的原理。
Logstash工作原理
由于Kafka采用解耦的设计思想,并非原始的发布订阅,生产者负责产生消息,直接推送给消费者。而是在中间加入持久化层——broker,生产者把数据存放在broker中,消费者从broker中取数据。这样就带来了几个好处:
- 1 生产者的负载与消费者的负载解耦
- 2 消费者按照自己的能力fetch数据
- 3 消费者可以自定义消费的数量
另外,由于broker采用了主题topic-->分区的思想,使得某个分区内部的顺序可以保证有序性,但是分区间的数据不保证有序性。这样,消费者可以以分区为单位,自定义读取的位置——offset。
Kafka采用zookeeper作为管理,记录了producer到broker的信息,以及consumer与broker中partition的对应关系。因此,生产者可以直接把数据传递给broker,broker通过zookeeper进行leader-->followers的选举管理;消费者通过zookeeper保存读取的位置offset以及读取的topic的partition分区信息。
由于上面的架构设计,使得生产者与broker相连;消费者与zookeeper相连。有了这样的对应关系,就容易部署logstash-->kafka-->logstash的方案了。
接下来,按照下面的步骤就可以实现logstash与kafka的对接了。
启动kafka
启动zookeeper:
$zookeeper/bin/zkServer.sh start
启动kafka:
$kafka/bin/kafka-server-start.sh $kafka/config/server.properties &
创建主题
创建主题:
$kafka/bin/kafka-topics.sh --zookeeper 127.0.0.1:2181 --create --topic hello --replication-factor 1 --partitions 1
查看主题:
$kafka/bin/kafka-topics.sh --zookeeper 127.0.0.1:2181 --describe
测试环境
执行生产者脚本:
$kafka/bin/kafka-console-producer.sh --broker-list 10.0.67.101:9092 --topic hello
执行消费者脚本,查看是否写入:
$kafka/bin/kafka-console-consumer.sh --zookeeper 127.0.0.1:2181 --from-beginning --topic hello
输入测试
input{
stdin{}
}
output{
kafka{
topic_id => "hello"
bootstrap_servers => "192.168.0.4:9092" # kafka的地址
batch_size => 5
}
stdout{
codec => rubydebug
}
}
读取测试
logstash配置文件:
input{
kafka {
codec => "plain"
group_id => "logstash1"
auto_offset_reset => "smallest"
reset_beginning => true
topic_id => "hello"
#white_list => ["hello"]
#black_list => nil
zk_connect => "192.168.0.5:2181" # zookeeper的地址
}
}
output{
stdout{
codec => rubydebug
}
}
- Debian 和Ubuntu Mono 3.0 部署包
- Apache Storm内部原理分析
- 使用lrucache和diskLrucache实现照片墙
- android 减少图片出现oom错误
- android分包方案
- 系统负载能力浅析
- 微软正式发布了Microsoft.Bcl.Async
- parcel和parcelable
- Windows Phone 7 WebBrowser 中文乱码问题
- Java并发包类总览
- 作业调度框架 Quartz.NET 2.0 beta 发布
- 系统捕获异常并发送到服务器
- 当调用GetAuthorizationGroups() 的错误-“试图访问卸载的应用程序域“(Exception from HRESULT: 0x80131014)解决方案
- WCF 4.0路由服务Routing Service
- JavaScript 教程
- JavaScript 编辑工具
- JavaScript 与HTML
- JavaScript 与Java
- JavaScript 数据结构
- JavaScript 基本数据类型
- JavaScript 特殊数据类型
- JavaScript 运算符
- JavaScript typeof 运算符
- JavaScript 表达式
- JavaScript 类型转换
- JavaScript 基本语法
- JavaScript 注释
- Javascript 基本处理流程
- Javascript 选择结构
- Javascript if 语句
- Javascript if 语句的嵌套
- Javascript switch 语句
- Javascript 循环结构
- Javascript 循环结构实例
- Javascript 跳转语句
- Javascript 控制语句总结
- Javascript 函数介绍
- Javascript 函数的定义
- Javascript 函数调用
- Javascript 几种特殊的函数
- JavaScript 内置函数简介
- Javascript eval() 函数
- Javascript isFinite() 函数
- Javascript isNaN() 函数
- parseInt() 与 parseFloat()
- escape() 与 unescape()
- Javascript 字符串介绍
- Javascript length属性
- javascript 字符串函数
- Javascript 日期对象简介
- Javascript 日期对象用途
- Date 对象属性和方法
- Javascript 数组是什么
- Javascript 创建数组
- Javascript 数组赋值与取值
- Javascript 数组属性和方法
- 独家 | 手把手教你用Python的Prophet库进行时间序列预测
- 【Git】:基础的基础
- windows解决SpringBoot启动时:APPLICATION FAILED TO START
- 【Git】:基础操作篇
- 低光照图像增强算法汇总
- 用Python解决女朋友看电影没字幕的需求
- 【Git】:基础协作篇
- 最近,我用pandas处理了一把大数据……
- 【编译技术】:AST——基础的基础
- 【编译技术】:Babel——基础的基础
- 【编译技术】:解读 Babel AST Format——01
- 【编译技术】:解读 Babel AST Format——02
- PyTorch入门笔记-PyTorch初见
- ssh 连接 Linux 服务器并安装 Anaconda
- Ant Design Vue 报错:Failed to resolve directive: ant-portal的解决办法