kafka_2.11-2.0.0_常用操作
时间:2022-07-26
本文章向大家介绍kafka_2.11-2.0.0_常用操作,主要内容包括其使用实例、应用技巧、基本知识点总结和需要注意事项,具有一定的参考价值,需要的朋友可以参考一下。
参考博文:kafka 1.0 中文文档(九):操作
参考博文:kafka集群管理工具kafka-manager部署安装
以下操作可以在mini01、mini02、mini03任意一台操作即可。
1. kafka通过网页管理
参考博文:kafka集群管理工具kafka-manager部署安装
2. 创建topic
1 # 参数说明 --replication-factor 2 表示有2个副本
2 # --partitions 4 表示有4个分区
3 [yun@mini01 ~]$ kafka-topics.sh --create --zookeeper mini01:2181 --replication-factor 2 --partitions 4 --topic test
4 Created topic "test".
5 [yun@mini01 ~]$ kafka-topics.sh --create --zookeeper mini01:2181 --replication-factor 3 --partitions 4 --topic zhang
6 Created topic "zhang".
7 [yun@mini01 ~]$ kafka-topics.sh --list --zookeeper mini01:2181 # 再次查看
8 zhang
9 test
2.1. 各主机信息查看
mini01
1 [yun@mini01 logs]$ pwd
2 /app/kafka/logs
3 [yun@mini01 logs]$ ll
4 total 160
5 ………………
6 drwxrwxr-x 2 yun yun 141 Sep 15 18:53 test-1
7 drwxrwxr-x 2 yun yun 141 Sep 15 18:53 test-2
8 drwxrwxr-x 2 yun yun 141 Sep 15 18:53 test-3
mini02
1 [yun@mini02 logs]$ pwd
2 /app/kafka/logs
3 [yun@mini02 logs]$ ll
4 total 260
5 ………………
6 drwxrwxr-x 2 yun yun 141 Sep 15 18:53 test-0
7 drwxrwxr-x 2 yun yun 141 Sep 15 18:53 test-2
mini03
1 [yun@mini03 logs]$ pwd
2 /app/kafka/logs
3 [yun@mini03 logs]$ ll
4 total 132
5 ………………
6 drwxrwxr-x 2 yun yun 141 Sep 15 18:53 test-0
7 drwxrwxr-x 2 yun yun 141 Sep 15 18:53 test-1
8 drwxrwxr-x 2 yun yun 141 Sep 15 18:53 test-3
3. 修改topic
3.1. 增加分区数
注意:分区数不能减少
Kafka目前不支持减少主题的分区数量。
1 [yun@mini01 ~]$ kafka-topics.sh --list --zookeeper mini01:2181
2 __consumer_offsets
3 test
4 test01
5 test02
6 test03
7 test04
8 zhang
9 [yun@mini01 ~]$ kafka-topics.sh --describe --zookeeper mini01:2181 --topic test01
10 Topic:test01 PartitionCount:5 ReplicationFactor:1 Configs:
11 Topic: test01 Partition: 0 Leader: 0 Replicas: 0 Isr: 0
12 Topic: test01 Partition: 1 Leader: 1 Replicas: 1 Isr: 1
13 Topic: test01 Partition: 2 Leader: 2 Replicas: 2 Isr: 2
14 Topic: test01 Partition: 3 Leader: 0 Replicas: 0 Isr: 0
15 Topic: test01 Partition: 4 Leader: 1 Replicas: 1 Isr: 1
16 [yun@mini01 ~]$ kafka-topics.sh --alter --zookeeper mini01:2181 --partitions 2 --topic test01 # 失败,分区数不能减少
17 WARNING: If partitions are increased for a topic that has a key, the partition logic or ordering of the messages will be affected
18 Error while executing topic command : The number of partitions for a topic can only be increased. Topic test01 currently has 5 partitions, 2 would not be an increase.
19 [2018-09-16 09:12:40,034] ERROR org.apache.kafka.common.errors.InvalidPartitionsException: The number of partitions for a topic can only be increased. Topic test01 currently has 5 partitions, 2 would not be an increase.
20 (kafka.admin.TopicCommand$)
21 [yun@mini01 ~]$ kafka-topics.sh --alter --zookeeper mini01:2181 --partitions 7 --topic test01 # 增加分区数
22 WARNING: If partitions are increased for a topic that has a key, the partition logic or ordering of the messages will be affected
23 Adding partitions succeeded!
24 [yun@mini01 ~]$ kafka-topics.sh --describe --zookeeper mini01:2181 --topic test01
25 Topic:test01 PartitionCount:7 ReplicationFactor:1 Configs:
26 Topic: test01 Partition: 0 Leader: 0 Replicas: 0 Isr: 0
27 Topic: test01 Partition: 1 Leader: 1 Replicas: 1 Isr: 1
28 Topic: test01 Partition: 2 Leader: 2 Replicas: 2 Isr: 2
29 Topic: test01 Partition: 3 Leader: 0 Replicas: 0 Isr: 0
30 Topic: test01 Partition: 4 Leader: 1 Replicas: 1 Isr: 1
31 Topic: test01 Partition: 5 Leader: 2 Replicas: 2 Isr: 2
32 Topic: test01 Partition: 6 Leader: 0 Replicas: 0 Isr: 0
4. 删除topic
1 # server.properties中设置delete.topic.enable=true 【当前版本默认就是true】否则只是标记删除或者直接重启
2 [yun@mini01 ~]$ kafka-topics.sh --delete --zookeeper mini01:2181 --topic test
3 Topic test is marked for deletion.
4 Note: This will have no impact if delete.topic.enable is not set to true.
5 [yun@mini01 ~]$ kafka-topics.sh --list --zookeeper mini01:2181 # 再次查看 只有 zhang,则表示真的删除了
6 zhang
5. 查看所有topic
1 [yun@mini01 ~]$ kafka-topics.sh --list --zookeeper mini01:2181
2 __consumer_offsets
3 test
4 zhang
6. 查看某个Topic的详情
1 [yun@mini01 ~]$ kafka-topics.sh --describe --zookeeper mini01:2181 --topic zhang
2 Topic:zhang PartitionCount:4 ReplicationFactor:3 Configs:
3 Topic: zhang Partition: 0 Leader: 1 Replicas: 1,2,0 Isr: 1,2,0
4 Topic: zhang Partition: 1 Leader: 2 Replicas: 2,0,1 Isr: 2,0,1
5 Topic: zhang Partition: 2 Leader: 0 Replicas: 0,1,2 Isr: 0,1,2
6 Topic: zhang Partition: 3 Leader: 1 Replicas: 1,0,2 Isr: 1,0,2
7. 通过shell命令生产消息
7.1. 输入单条数据
1 [yun@mini01 ~]$ kafka-console-producer.sh --broker-list mini01:9092 --topic zhang
2 >111
3 >222
4 >333
5 >444
6 >555
7 >666
8 >777
9 >888
10 >999
7.2. 批量导入数据
1 [yun@mini01 zhangliang]$ kafka-console-producer.sh --broker-list mini01:9092 --topic liang < 001.info
8. 通过shell命令消费消息
1 # --from-beginning 从最开始读取
2 # kafka-console-consumer.sh --zookeeper mini01:2181 --from-beginning --topic zhang # 老版本
3 [yun@mini01 ~]$ kafka-console-consumer.sh --bootstrap-server mini01:9092 --from-beginning --topic zhang
4 111
5 555
6 999
7 333
8 777
9 444
10 888
11 222
12 666
9. 消费组消费
9.1. 创建topic
1 [yun@mini01 ~]$ kafka-topics.sh --create --zookeeper mini01:2181 --replication-factor 1 --partitions 4 --topic order
2 Created topic "order".
3 [yun@mini01 ~]$ kafka-topics.sh --list --zookeeper mini01:2181 # 查看所有topic列表
4 __consumer_offsets
5 order
6 test
7 zhang
8 [yun@mini01 ~]$ kafka-topics.sh --describe --zookeeper mini01:2181 --topic order # 查看topic详情
9 Topic:order PartitionCount:4 ReplicationFactor:1 Configs:
10 Topic: order Partition: 0 Leader: 0 Replicas: 0 Isr: 0
11 Topic: order Partition: 1 Leader: 1 Replicas: 1 Isr: 1
12 Topic: order Partition: 2 Leader: 2 Replicas: 2 Isr: 2
13 Topic: order Partition: 3 Leader: 0 Replicas: 0 Isr: 0
9.2. 生产消息
1 [yun@mini01 ~]$ kafka-console-producer.sh --broker-list mini01:9092 --topic order
2 >111
3 >222
4 >333
5 >444
6 >555
9.3. 消费组消费消息
mini02机器上运行
1 # --group 指定组
2 [yun@mini02 ~]$ kafka-console-consumer.sh --bootstrap-server mini01:9092 --topic order --group order-group
mini03机器上运行
1 # --group 指定组
2 [yun@mini03 ~]$ kafka-console-consumer.sh --bootstrap-server mini01:9092 --topic order --group order-group
3
4 # 新开一个窗口执行
5 [yun@mini03 ~]$ kafka-console-consumer.sh --bootstrap-server mini01:9092 --topic order --group order-group
表示order-group消费组有3个消费者,消费topic order的信息。
9.4. 消费组消费位置信息查看
1 [yun@mini01 ~]$ kafka-consumer-groups.sh --bootstrap-server mini01:9092 --describe --group order-group # 查看消费情况
2
3 TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID HOST CLIENT-ID
4 order 0 4 4 0 consumer-1-2e9805db-e021-4595-8c62-92f8691fbf20 /172.16.1.13 consumer-1
5 order 1 5 5 0 consumer-1-2e9805db-e021-4595-8c62-92f8691fbf20 /172.16.1.13 consumer-1
6 order 2 5 5 0 consumer-1-9e65dcfb-246f-4043-aaf7-3ee83532237f /172.16.1.13 consumer-1
7 order 3 4 4 0 consumer-1-ee17939d-1ffe-42c7-8261-b19be8acea43 /172.16.1.12 consumer-1
8 [yun@mini01 ~]$ kafka-consumer-groups.sh --bootstrap-server mini01:9092 --describe --group order-group --members --verbose
9
10 CONSUMER-ID HOST CLIENT-ID #PARTITIONS ASSIGNMENT
11 consumer-1-9e65dcfb-246f-4043-aaf7-3ee83532237f /172.16.1.13 consumer-1 1 order(2)
12 consumer-1-2e9805db-e021-4595-8c62-92f8691fbf20 /172.16.1.13 consumer-1 2 order(0,1)
13 consumer-1-ee17939d-1ffe-42c7-8261-b19be8acea43 /172.16.1.12 consumer-1 1 order(3)
10. 管理消费组
10.1. 查看所有消费组
1 [yun@mini01 ~]$ kafka-consumer-groups.sh --bootstrap-server mini01:9092 --list
2 console-consumer-26727
3 console-consumer-92984
4 console-consumer-60755
5 console-consumer-11661
6 console-consumer-31713
7 console-consumer-20244
8 console-consumer-65733
10.2. 查看消费组消费情况【消费位置】
1 [yun@mini01 ~]$ kafka-consumer-groups.sh --bootstrap-server mini01:9092 --describe --group console-consumer-26727
2 Consumer group 'console-consumer-26727' has no active members.
3
4 TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID HOST CLIENT-ID
5 zhang 3 11 11 0 - - -
6 zhang 0 9 9 0 - - -
7 zhang 2 8 8 0 - - -
8 zhang 1 11 11 0 - - -
9 [yun@mini01 ~]$ kafka-consumer-groups.sh --bootstrap-server mini01:9092 --describe --group console-consumer-65733
10
11 TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID HOST CLIENT-ID
12 zhang 0 11 11 0 consumer-1-17c812f0-116b-42a9-88d8-90d1a85949e1 /172.16.1.13 consumer-1
13 zhang 1 12 12 0 consumer-1-17c812f0-116b-42a9-88d8-90d1a85949e1 /172.16.1.13 consumer-1
14 zhang 2 10 10 0 consumer-1-17c812f0-116b-42a9-88d8-90d1a85949e1 /172.16.1.13 consumer-1
15 zhang 3 12 12 0 consumer-1-17c812f0-116b-42a9-88d8-90d1a85949e1 /172.16.1.13 consumer-1
--members
1 # --members 此选项提供使用者组中所有活动成员的列表。
2 [yun@mini01 ~]$ kafka-consumer-groups.sh --bootstrap-server mini01:9092 --describe --group console-consumer-65733 --members
3
4 CONSUMER-ID HOST CLIENT-ID #PARTITIONS
5 consumer-1-17c812f0-116b-42a9-88d8-90d1a85949e1 /172.16.1.13 consumer-1 4
--verbose
1 # --verbose 这个选项还提供了分配给每个成员的分区。
2 [yun@mini01 ~]$ kafka-consumer-groups.sh --bootstrap-server mini01:9092 --describe --group console-consumer-65733 --members --verbose
3
4 CONSUMER-ID HOST CLIENT-ID #PARTITIONS ASSIGNMENT
5 consumer-1-17c812f0-116b-42a9-88d8-90d1a85949e1 /172.16.1.13 consumer-1 4 zhang(0,1,2,3)
--state
1 # --state 这个选项提供了有用的组级信息。
2 [yun@mini01 ~]$ kafka-consumer-groups.sh --bootstrap-server mini01:9092 --describe --group console-consumer-65733 --state
3
4 COORDINATOR (ID) ASSIGNMENT-STRATEGY STATE #MEMBERS
5 mini01:9092 (0) range Stable 1
10.3. 删除一个或多个用户组
1 [yun@mini01 ~]$ kafka-consumer-groups.sh --bootstrap-server mini01:9092 --list
2 console-consumer-3826
3 console-consumer-92984
4 console-consumer-60755
5 console-consumer-11661
6 console-consumer-31713
7 console-consumer-20244
8 console-consumer-65733
9 # 删除一个或多个组
10 [yun@mini01 ~]$ kafka-consumer-groups.sh --bootstrap-server mini01:9092 --delete --group console-consumer-11661 --group console-consumer-31713
11 Deletion of requested consumer groups ('console-consumer-31713', 'console-consumer-11661') was successful.
12 [yun@mini01 ~]$ kafka-consumer-groups.sh --bootstrap-server mini01:9092 --list
13 console-consumer-3826
14 console-consumer-92984
15 console-consumer-60755
16 console-consumer-20244
17 console-consumer-65733
11. 增加副本因子
1 [yun@mini01 kafka_20180916]$ kafka-topics.sh --create --zookeeper mini01:2181 --replication-factor 1 --partitions 4 --topic order
2 Created topic "order".
3 [yun@mini01 kafka_20180916]$ kafka-topics.sh --describe --zookeeper mini01:2181 --topic order
4 Topic:order PartitionCount:4 ReplicationFactor:1 Configs:
5 Topic: order Partition: 0 Leader: 0 Replicas: 0 Isr: 0
6 Topic: order Partition: 1 Leader: 1 Replicas: 1 Isr: 1
7 Topic: order Partition: 2 Leader: 2 Replicas: 2 Isr: 2
8 Topic: order Partition: 3 Leader: 0 Replicas: 0 Isr: 0
要求:topic order 的副本数由1变为2, 之前每个分区在哪台机器上在上面已给出。
说明:part 0分布在群集0,1; part 1分布在集群1,2;part 2 分布在集群2,0;part 3分布在集群0,1。
11.1. 创建一个重新调整计划文件
1 [yun@mini01 kafka_20180916]$ cat increase-replication-factor.json
2 {
3 "version":1,
4 "partitions":[
5 {"topic": "order","partition": 0,"replicas": [0,1]},
6 {"topic": "order","partition": 1,"replicas": [1,2]},
7 {"topic": "order","partition": 2,"replicas": [2,0]},
8 {"topic": "order","partition": 3,"replicas": [0,1]}
9 ]
10 }
11.2. 语句执行
1 [yun@mini01 kafka_20180916]$ kafka-reassign-partitions.sh --zookeeper mini01:2181 --reassignment-json-file increase-replication-factor.json --execute
2 Current partition replica assignment
3
4 {"version":1,"partitions":[{"topic":"order","partition":2,"replicas":[2],"log_dirs":["any"]},{"topic":"order","partition":1,"replicas":[1],"log_dirs":["any"]},{"topic":"order","partition":3,"replicas":[0],"log_dirs":["any"]},{"topic":"order","partition":0,"replicas":[0],"log_dirs":["any"]}]}
5
6 Save this to use as the --reassignment-json-file option during rollback
7 Successfully started reassignment of partitions.
11.3. 查看是否执行成功
1 [yun@mini01 kafka_20180916]$ kafka-reassign-partitions.sh --zookeeper mini01:2181 --reassignment-json-file increase-replication-factor.json --verify
2 Status of partition reassignment:
3 Reassignment of partition order-0 completed successfully
4 Reassignment of partition order-1 completed successfully
5 Reassignment of partition order-2 completed successfully
6 Reassignment of partition order-3 completed successfully
11.4. 再次查看该topic详情
1 [yun@mini01 kafka_20180916]$ kafka-topics.sh --describe --zookeeper mini01:2181 --topic order # 由下可见分配成功
2 Topic:order PartitionCount:4 ReplicationFactor:2 Configs:
3 Topic: order Partition: 0 Leader: 0 Replicas: 0,1 Isr: 0,1
4 Topic: order Partition: 1 Leader: 1 Replicas: 1,2 Isr: 1,2
5 Topic: order Partition: 2 Leader: 2 Replicas: 2,0 Isr: 2,0
6 Topic: order Partition: 3 Leader: 0 Replicas: 0,1 Isr: 0,1
12. 创建partitions时在broker的分配机制
1 kafka-topics.sh --create --zookeeper mini01:2181 --replication-factor 1 --partitions 5 --topic test01
2 kafka-topics.sh --create --zookeeper mini01:2181 --replication-factor 1 --partitions 11 --topic test02
注意在各机器上partitions的分布
1 mini01
2 test01-0
3 test01-3
4 test02-2
5 test02-5
6 test02-8
7
8 mini02
9 test01-1
10 test01-4
11 test02-0
12 test02-3
13 test02-6
14 test02-9
15
16 mini03
17 test01-2
18 test02-1
19 test02-10
20 test02-4
21 test02-7
- 缓冲区溢出攻击初学者手册(更新版)
- 在Python机器学习中如何索引、切片和重塑NumPy数组
- HelloWorld,我的第一趟旅程出发点
- Android OpenGL开发实践 - GLSurfaceView对摄像头数据的再处理
- 走进科学:对七夕“超级病毒”XX神器的逆向分析
- 机器学习 - 朴素贝叶斯分类器的意见和文本挖掘
- 认知指纹:颠覆性的身份认证技术
- 跟我学姿势:极客教你如何科学的看电影
- Discuz 5.x/6.x/7.x投票SQL注入分析
- 论如何高效的挖掘漏洞
- Rxjava + retrofit + dagger2 + mvp搭建Android框架
- 走进科学:如何正确的隐藏自己的行踪
- 比特儿(Bter.com) 比特币交易平台被盗事件全解析
- BitTorrent Bleep:无法被监控的聊天软件
- JavaScript 教程
- JavaScript 编辑工具
- JavaScript 与HTML
- JavaScript 与Java
- JavaScript 数据结构
- JavaScript 基本数据类型
- JavaScript 特殊数据类型
- JavaScript 运算符
- JavaScript typeof 运算符
- JavaScript 表达式
- JavaScript 类型转换
- JavaScript 基本语法
- JavaScript 注释
- Javascript 基本处理流程
- Javascript 选择结构
- Javascript if 语句
- Javascript if 语句的嵌套
- Javascript switch 语句
- Javascript 循环结构
- Javascript 循环结构实例
- Javascript 跳转语句
- Javascript 控制语句总结
- Javascript 函数介绍
- Javascript 函数的定义
- Javascript 函数调用
- Javascript 几种特殊的函数
- JavaScript 内置函数简介
- Javascript eval() 函数
- Javascript isFinite() 函数
- Javascript isNaN() 函数
- parseInt() 与 parseFloat()
- escape() 与 unescape()
- Javascript 字符串介绍
- Javascript length属性
- javascript 字符串函数
- Javascript 日期对象简介
- Javascript 日期对象用途
- Date 对象属性和方法
- Javascript 数组是什么
- Javascript 创建数组
- Javascript 数组赋值与取值
- Javascript 数组属性和方法
- MySQL 切换数据库、用户卡死:“You can turn off this feature to get a quicker startup with -A“处理方法
- MySQL 数据库mysqlbinlog使用问题:unknown variable ‘default-character-set=utf8‘.解决方法
- Python 技术篇-pip安装提示:‘pip‘ 不是内部或外部命令,也不是可运行的程序或批处理文件,问题解决方法
- Jaskson精讲第6篇-自定义JsonSerialize与Deserialize实现数据类型转换
- let和var和const
- Jupyter 编写python代码实现代码自动补齐功能设置实例演示
- 第37期:从头学二叉搜索树(面试常考)
- Jupyter 工具的安装与使用方法,jupyter运行python代码演示,好用的python编辑器推荐!
- Nginx相关配置与操作
- Python 技术篇-全局与当前socket超时连接时间设置方法实例演示,查看socket超时连接时间
- 给 JDK 报了一个 P4 的 Bug,结果居然……
- Python 套接字-判断socket服务端有没有关闭的方法实例演示,查看socket运行状态
- docker安装logstash
- Rook Operator 源码分析(1) - osd 启动的流程
- Python 技术篇-利用pyqt5库监听剪切板变动,clipboard.dataChanged.connect()剪切板监听