kafka生产者和消费者的基本操作
文章目录
在学习kafka集群之前,先来学习下单节点kafka的一些基本操作,包括安装及一些基本命令,以便后续集群环境的学习。
1.单节点安装
kafka必须依赖于zookeeper,假定当前zookeeper集群已搭建完成(如不熟悉zookeeper集群如何搭建,请参考http://www.jianshu.com/p/0e813f6a6049)。
环境依赖:
1.已安装完毕的zookeeper集群:
192.168.162.52:2181
192.168.162.235:2181
192.168.162.239:2181
2.软件环境
centos6.8
jdk "1.8.0_111"
3.kafka软件
cd /opt
mkdir kafka
cd /opt/kafka
wget http://mirrors.tuna.tsinghua.edu.cn/apache/kafka/0.11.0.0/kafka_2.12-0.11.0.0.tgz
tar -zxvf kafka_2.12-0.11.0.0.tgz
kafka配置 server.properties:
1.log.dirs 修改到指定目录
log.dirs=/opt/kafka/kafka_2.12-0.11.0.0/kafka-logs
2.zookeeper配置
zookeeper.connect=192.168.162.52:2181,192.168.162.235:2181,192.168.162.239:2181
server.properties 参数配置详解:
属性 |
默认值 |
说明 |
---|---|---|
broker.id |
/tmp/kafka-logs |
Kafka数据存放的目录。可以指定多个目录,中间用逗号分隔,当新partition被创建的时会被存放到当前存放partition最少的目录 |
port |
9092 |
BrokerServer接受客户端连接的端口号 |
zookeeper.connect |
null |
Zookeeper的连接串,格式为:hostname1:port1,hostname2:port2,hostname3:port3 需要注意的是,消费者的参数要和此参数一致 |
message.max.bytes |
1000000 |
务器可以接收到的最大的消息大小。注意此参数要和consumer的maximum.message.size大小一致,否则会因为生产者生产的消息太大导致消费者无法消费。 |
num.io.threads |
8 |
服务器用来执行读写请求的IO线程数,此参数的数量至少要等于服务器上磁盘的数量。 |
queued.max.requests |
500 |
/O线程可以处理请求的队列大小,若实际请求数超过此大小,网络线程将停止接收新的请求。 |
socket.send.buffer.bytes |
100 * 1024 |
The SO_SNDBUFF buffer the server prefers for socket connections. |
socket.receive.buffer.bytes |
100 * 1024 |
The SO_RCVBUFF buffer the server prefers for socket connections. |
socket.request.max.bytes |
100 * 1024 * 1024 |
服务器允许请求的最大值, 用来防止内存溢出,其值应该小于 Java heap size. |
num.partitions |
1 |
默认partition数量,如果topic在创建时没有指定partition数量,默认使用此值,建议改为5 |
log.segment.bytes |
1024 * 1024 * 1024 |
Segment文件的大小,超过此值将会自动新建一个segment,此值可以被topic级别的参数覆盖。 |
log.roll.{ms,hours} |
24 * 7 hours |
新建segment文件的时间,此值可以被topic级别的参数覆盖。 |
log.retention.{ms,minutes,hours} |
7 days |
Kafka segment log的保存周期,保存周期超过此时间日志就会被删除。此参数可以被topic级别参数覆盖。数据量大时,建议减小此值。 |
log.retention.bytes |
-1 |
每个partition的最大容量,若数据量超过此值,partition数据将会被删除。注意这个参数控制的是每个partition而不是topic。此参数可以被log级别参数覆盖。 |
log.retention.check.interval.ms |
5 minutes |
删除策略的检查周期 |
auto.create.topics.enable |
true |
自动创建topic参数,建议此值设置为false,严格控制topic管理,防止生产者错写topic。 |
default.replication.factor |
1 |
默认副本数量,建议改为2。 |
replica.lag.time.max.ms |
10000 |
在此窗口时间内没有收到follower的fetch请求,leader会将其从ISR(in-sync replicas)中移除。 |
replica.lag.max.messages |
4000 |
如果replica节点落后leader节点此值大小的消息数量,leader节点就会将其从ISR中移除。 |
replica.socket.timeout.ms |
30 * 1000 |
replica向leader发送请求的超时时间。 |
zookeeper.session.timeout.ms |
6000 |
ZooKeeper session 超时时间。如果在此时间内server没有向zookeeper发送心跳,zookeeper就会认为此节点已挂掉。 此值太低导致节点容易被标记死亡;若太高,.会导致太迟发现节点死亡。 |
zookeeper.connection.timeout.ms |
6000 |
客户端连接zookeeper的超时时间。 |
zookeeper.sync.time.ms |
2000 |
H ZK follower落后 ZK leader的时间。 |
controlled.shutdown.enable |
true |
允许broker shutdown。如果启用,broker在关闭自己之前会把它上面的所有leaders转移到其它brokers上,建议启用,增加集群稳定性。 |
kafka 启动:
cd /opt/kafka/kafka_2.12-0.11.0.0
nohup ./bin/kafka-server-start.sh ./config/server.properties &
2. Topic
2.1创建topic
cd /opt/kafka/kafka_2.12-0.11.0.0
bin/kafka-topics.sh --create --topic test0 --zookeeper 192.168.162.52:2181,192.168.162.235:2181,192.168.162.239:2181 --partitions 1 --replication-factor 1
--create: 指定创建topic动作
--topic:指定新建topic的名称
--zookeeper: 指定kafka连接zk的连接url,该值和server.properties文件中的配置项{zookeeper.connect}一样
--config:指定当前topic上有效的参数值,参数列表参考文档为: [Topic-level configuration](http://kafka.apache.org/documentation/)
--partitions:指定当前创建的kafka分区数量,默认为1个
--replication-factor:指定每个分区的复制因子个数,默认1个
2.2 查看Topic
./bin/kafka-topics.sh --list --zookeeper 192.168.162.52:2181,192.168.162.235:2181,192.168.162.239:2181
2.3 查看topic描述
./bin/kafka-topics.sh --describe --zookeeper 192.168.162.52:2181,192.168.162.235:2181,192.168.162.239:2181
--describe: 指定是展示详细信息命令
--zookeeper: 指定kafka连接zk的连接url,该值和server.properties文件中的配置项{zookeeper.connect}一样
--topic:指定需要展示数据的topic名称
2.4 修改topic
bin/kafka-topics.sh --zookeeper 192.168.162.52:2181,192.168.162.235:2181,192.168.162.239:2181 --alter --topic test0 --config max.message.bytes=128000
bin/kafka-topics.sh --zookeeper 192.168.162.52:2181,192.168.162.235:2181,192.168.162.239:2181 --alter --topic test0 --delete-config max.message.bytes
bin/kafka-topics.sh --zookeeper 192.168.162.52:2181,192.168.162.235:2181,192.168.162.239:2181 --alter --topic test0 --partitions 10
bin/kafka-topics.sh --zookeeper 192.168.162.52:2181,192.168.162.235:2181,192.168.162.239:2181 --alter --topic test0 --partitions 3 ## Kafka分区数量只允许增加,不允许减少
2.5 删除topic
默认情况下Kafka的Topic不能直接删除的,需要进行相关参数配置
bin/kafka-topics.sh --delete --topic test0 --zookeeper 192.168.187.146:2181
默认情况下,删除是标记删除,没有实际删除这个Topic;如果运行删除Topic,可以通过如下两种方式:
方式一:通过delete命令删除后,手动将本地磁盘以及zk上的相关topic的信息删除即可
方式二:配置server.properties文件,给定参数delete.topic.enable=true,重启kafka服务,此时执行delete命令表示允许进行Topic的删除
3.启动生产者发送消息
./bin/kafka-console-producer.sh --broker-list 192.168.162.239:9092 --topic test0
生产者部分参数
属性 |
默认值 |
说明 |
---|---|---|
metadata.broker.list |
启动时producer查询brokers的列表,可以是集群中所有brokers的一个子集。注意,这个参数只是用来获取topic的元信息用,producer会从元信息中挑选合适的broker并与之建立socket连接。格式是:host1:port1,host2:port2。 |
|
request.timeout.ms |
10000 |
Broker等待ack的超时时间,若等待时间超过此值,会返回客户端错误信息。 |
producer.type |
sync |
同步异步模式。async表示异步,sync表示同步。如果设置成异步模式,可以允许生产者以batch的形式push数据,这样会极大的提高broker性能,推荐设置为异步。 |
serializer.class |
kafka.serializer.DefaultEncoder |
序列号类,.默认序列化成 byte[] 。 |
key.serializer.class |
Key的序列化类,默认同上。 |
|
partitioner.class |
kafka.producer.DefaultPartitioner |
Partition类,默认对key进行hash。 |
compression.codec |
none |
指定producer消息的压缩格式,可选参数为: “none”, “gzip” and “snappy”。 |
compressed.topics |
null |
启用压缩的topic名称。若上面参数选择了一个压缩格式,那么压缩仅对本参数指定的topic有效,若本参数为空,则对所有topic有效。 |
message.send.max.retries |
3 |
Producer发送失败时重试次数。若网络出现问题,可能会导致不断重试。 |
queue.buffering.max.ms |
5000 |
启用异步模式时,producer缓存消息的时间。比如我们设置成1000时,它会缓存1秒的数据再一次发送出去,这样可以极大的增加broker吞吐量,但也会造成时效性的降低。 |
queue.buffering.max.messages |
10000 |
采用异步模式时producer buffer 队列里最大缓存的消息数量,如果超过这个数值,producer就会阻塞或者丢掉消息。 |
queue.enqueue.timeout.ms |
-1 |
当达到上面参数值时producer阻塞等待的时间。如果值设置为0,buffer队列满时producer不会阻塞,消息直接被丢掉。若值设置为-1,producer会被阻塞,不会丢消息。 |
batch.num.messages |
200 |
采用异步模式时,一个batch缓存的消息数量。达到这个数量值时producer才会发送消息。 |
4.启动消费者接收消息
./bin/kafka-console-consumer.sh --zookeeper 192.168.162.52:2181,192.168.162.235:2181,192.168.162.239:2181 --topic test0 --from-beginning
消费者部分参数
属性 |
默认值 |
说明 |
---|---|---|
group.id |
Consumer的组ID,相同goup.id的consumer属于同一个组。 |
|
zookeeper.connect |
Consumer的zookeeper连接串,要和broker的配置一致。 |
|
consumer.id |
null |
如果不设置会自动生成。 |
socket.timeout.ms |
30 * 1000 |
网络请求的socket超时时间。实际超时时间由max.fetch.wait + socket.timeout.ms 确定。 |
socket.receive.buffer.bytes |
64 * 1024 |
The socket receive buffer for network requests. |
fetch.message.max.bytes |
1024 * 1024 |
查询topic-partition时允许的最大消息大小。consumer会为每个partition缓存此大小的消息到内存,因此,这个参数可以控制consumer的内存使用量。这个值应该至少比server允许的最大消息大小大,以免producer发送的消息大于consumer允许的消息。 |
num.consumer.fetchers |
1 |
The number fetcher threads used to fetch data. |
auto.commit.enable |
true |
如果此值设置为true,consumer会周期性的把当前消费的offset值保存到zookeeper。当consumer失败重启之后将会使用此值作为新开始消费的值。 |
auto.commit.interval.ms |
60 * 1000 |
Consumer提交offset值到zookeeper的周期。 |
queued.max.message.chunks |
2 |
用来被consumer消费的message chunks 数量, 每个chunk可以缓存fetch.message.max.bytes大小的数据量。 |
- JavaScript 教程
- JavaScript 编辑工具
- JavaScript 与HTML
- JavaScript 与Java
- JavaScript 数据结构
- JavaScript 基本数据类型
- JavaScript 特殊数据类型
- JavaScript 运算符
- JavaScript typeof 运算符
- JavaScript 表达式
- JavaScript 类型转换
- JavaScript 基本语法
- JavaScript 注释
- Javascript 基本处理流程
- Javascript 选择结构
- Javascript if 语句
- Javascript if 语句的嵌套
- Javascript switch 语句
- Javascript 循环结构
- Javascript 循环结构实例
- Javascript 跳转语句
- Javascript 控制语句总结
- Javascript 函数介绍
- Javascript 函数的定义
- Javascript 函数调用
- Javascript 几种特殊的函数
- JavaScript 内置函数简介
- Javascript eval() 函数
- Javascript isFinite() 函数
- Javascript isNaN() 函数
- parseInt() 与 parseFloat()
- escape() 与 unescape()
- Javascript 字符串介绍
- Javascript length属性
- javascript 字符串函数
- Javascript 日期对象简介
- Javascript 日期对象用途
- Date 对象属性和方法
- Javascript 数组是什么
- Javascript 创建数组
- Javascript 数组赋值与取值
- Javascript 数组属性和方法
- Java单元测试——Mock技术配置
- 简单的场景分析LinearLayout 源码
- 避免栽坑之掌握Jenkins工作原理
- 如何检测JavaScript中的死循环?
- vue插槽2.6.0+
- 聊聊claudb的transaction command
- Lombok 的作者,成功讨伐 IntelliJ IDEA!
- springboot2之优雅处理返回值
- 通过NVM管理Node.js多版本
- 探究 Android 签名机制和原理
- “有迹可循”的灰盒测试分析
- 想用 Gitee 做图床工具,失败了~~
- Nginx系列:配置跳转的常用方式
- Python骚操作:一行代码实现探索性数据分析
- 吊打 Tomcat ,Undertow 性能很炸!!