kafka增加topic的备份数量
一、困难点
建立topic的时候,可以通过指定参数 --replication-factor 设置备份数量。但是,一旦完成建立topic,则无法通过kafka-topic.sh 或者 命令修改replica数量。
二、解决办法
实际上,我们可以考虑一种 “另类” 的办法:可以利用 kafka-reassign-partitions.sh 命令对所有分区进行重新分布,在做分区重新分布的时候,通过增加每个分区的replica备份数量来达到目的。
本文将介绍如何利用 kafka-reassign-partitions.sh 命令增加topic的备份数量。
注意:以下命令使用到的topic名称、zookeeper的ip和port,需要读者替换成为实际集群的参数。
(假设kafka集群有4个broker,id分别为:1001,1002,1003,1004)
2.1、获取当前topic的所有分区分布在broker的情况
[root@tbds bin]# ./kafka-topics.sh --zookeeper 172.16.32.13:2181 --topic ranger_audits --describe
Topic:ranger_audits PartitionCount:10 ReplicationFactor:1 Configs:
Topic: ranger_audits Partition: 0 Leader: 1001 Replicas: 1001 Isr: 1001
Topic: ranger_audits Partition: 1 Leader: 1002 Replicas: 1002 Isr: 1002
Topic: ranger_audits Partition: 2 Leader: 1001 Replicas: 1001 Isr: 1001
Topic: ranger_audits Partition: 3 Leader: 1002 Replicas: 1002 Isr: 1002
Topic: ranger_audits Partition: 4 Leader: 1001 Replicas: 1001 Isr: 1001
Topic: ranger_audits Partition: 5 Leader: 1002 Replicas: 1002 Isr: 1002
Topic: ranger_audits Partition: 6 Leader: 1001 Replicas: 1001 Isr: 1001
Topic: ranger_audits Partition: 7 Leader: 1002 Replicas: 1002 Isr: 1002
Topic: ranger_audits Partition: 8 Leader: 1001 Replicas: 1001 Isr: 1001
Topic: ranger_audits Partition: 9 Leader: 1002 Replicas: 1002 Isr: 1002
可以看出,ranger_audits 这个topic有10个分区,每个分区只有一个feplica备份,分布在1001和1002两台broker上面。
下面我们需要将ranger_audits 的每个分区数据都增加到2个replica备份,且分布到4个broker上面。
2.2、创建增加replica备份数量的配置文件
(注意:尽量保持topic的原有每个分区的主备份不变化。因此,配置文件的每个分区的第一个broker保持不变。)
[root@tbds bin]# vim ../config/increase-replication-factor.json
{"version":1,
"partitions":[
{"topic":"ranger_audits","partition":0,"replicas":[1001,1003]},
{"topic":"ranger_audits","partition":1,"replicas":[1002,1004]},
{"topic":"ranger_audits","partition":2,"replicas":[1001,1003]},
{"topic":"ranger_audits","partition":3,"replicas":[1002,1004]},
{"topic":"ranger_audits","partition":4,"replicas":[1001,1003]},
{"topic":"ranger_audits","partition":5,"replicas":[1002,1004]},
{"topic":"ranger_audits","partition":6,"replicas":[1001,1003]},
{"topic":"ranger_audits","partition":7,"replicas":[1002,1004]},
{"topic":"ranger_audits","partition":8,"replicas":[1001,1003]},
{"topic":"ranger_audits","partition":9,"replicas":[1002,1004]}
]}
上面的配置文件说明,我们将topic的每个分区都增加了一个replica,且保持每个分区原有的主备份所在broker不变化,将每个分区新增的replica备份数据放到到1003和1004两个broker上面。
2.3、开始执行增加分区
[root@tbds bin]# ./kafka-reassign-partitions.sh -zookeeper 172.16.32.13:2181 --reassignment-json-file ../config/increase-replication-factor.json --execute
Current partition replica assignment
{"version":1,"partitions":[{"topic":"ranger_audits","partition":3,"replicas":[1002]},{"topic":"ranger_audits","partition":9,"replicas":[1002]},{"topic":"ranger_audits","partition":8,"replicas":[1001]},{"topic":"ranger_audits","partition":1,"replicas":[1002]},{"topic":"ranger_audits","partition":4,"replicas":[1001]},{"topic":"ranger_audits","partition":2,"replicas":[1001]},{"topic":"ranger_audits","partition":5,"replicas":[1002]},{"topic":"ranger_audits","partition":0,"replicas":[1001]},{"topic":"ranger_audits","partition":6,"replicas":[1001]},{"topic":"ranger_audits","partition":7,"replicas":[1002]}]}
Save this to use as the --reassignment-json-file option during rollback
Successfully started reassignment of partitions
{"version":1,"partitions":[{"topic":"ranger_audits","partition":0,"replicas":[1001,1003]},{"topic":"ranger_audits","partition":8,"replicas":[1001,1003]},{"topic":"ranger_audits","partition":5,"replicas":[1002,1004]},{"topic":"ranger_audits","partition":2,"replicas":[1001,1003]},{"topic":"ranger_audits","partition":9,"replicas":[1002,1004]},{"topic":"ranger_audits","partition":1,"replicas":[1002,1004]},{"topic":"ranger_audits","partition":3,"replicas":[1002,1004]},{"topic":"ranger_audits","partition":4,"replicas":[1001,1003]},{"topic":"ranger_audits","partition":7,"replicas":[1002,1004]},{"topic":"ranger_audits","partition":6,"replicas":[1001,1003]}]}
2.4、查看执行进度
[root@tbds bin]# ./kafka-reassign-partitions.sh -zookeeper 172.16.32.13:2181 --reassignment-json-file ../config/increase-replication-factor.json --verify
Status of partition reassignment:
Reassignment of partition [ranger_audits,0] completed successfully
Reassignment of partition [ranger_audits,8] completed successfully
Reassignment of partition [ranger_audits,5] completed successfully
Reassignment of partition [ranger_audits,2] completed successfully
Reassignment of partition [ranger_audits,9] completed successfully
Reassignment of partition [ranger_audits,1] completed successfully
Reassignment of partition [ranger_audits,3] completed successfully
Reassignment of partition [ranger_audits,4] completed successfully
Reassignment of partition [ranger_audits,7] completed successfully
Reassignment of partition [ranger_audits,6] completed successfully
上面显示增加分区操作成功
2.5、再次查看topic的情况
[root@tbds bin]# ./kafka-topics.sh --zookeeper 172.16.32.13:2181 --topic ranger_audits --describe
Topic:ranger_audits PartitionCount:10 ReplicationFactor:2 Configs:
Topic: ranger_audits Partition: 0 Leader: 1001 Replicas: 1001,1003 Isr: 1001,1003
Topic: ranger_audits Partition: 1 Leader: 1002 Replicas: 1002,1004 Isr: 1002,1004
Topic: ranger_audits Partition: 2 Leader: 1001 Replicas: 1001,1003 Isr: 1001,1003
Topic: ranger_audits Partition: 3 Leader: 1002 Replicas: 1002,1004 Isr: 1002,1004
Topic: ranger_audits Partition: 4 Leader: 1001 Replicas: 1001,1003 Isr: 1001,1003
Topic: ranger_audits Partition: 5 Leader: 1002 Replicas: 1002,1004 Isr: 1002,1004
Topic: ranger_audits Partition: 6 Leader: 1001 Replicas: 1001,1003 Isr: 1001,1003
Topic: ranger_audits Partition: 7 Leader: 1002 Replicas: 1002,1004 Isr: 1002,1004
Topic: ranger_audits Partition: 8 Leader: 1001 Replicas: 1001,1003 Isr: 1001,1003
Topic: ranger_audits Partition: 9 Leader: 1002 Replicas: 1002,1004 Isr: 1002,1004
从上面可以看出,备份数量增加成功
三、进一步思考
利用上述介绍的办法,除了可以用来增加topic的备份数量之外,还能够处理以下几个场景:
1、对topic的所有分区数据进行整体迁移。怎么理解呢?假如集群有N个broker,后来新扩容M个broker。由于新扩容的broker磁盘都是空的,原有的broker磁盘占用都很满。那么我们可以利用上述方法,将存储在原有N个broker的某些topic整体搬迁到新扩容的M个broker,进而实现kafka集群的整体数据均衡。
具体使用方法就是:通过编写2.2章节的配置文件,将topic的所有分区都配置到新的M个broker上面去,再执行excute,即可完成topic的所有分区数据整体迁移到新扩容的M个broker节点。
2、broker坏掉的情况。导致某些topic的某些分区的replica数量减少,可以利用kafka-reassign-partitions.sh增加replica;
3、kafka 某些broker磁盘占用很满,某些磁盘占用又很少。可以利用kafka-reassign-partitions.sh迁移某些topic的分区数据到磁盘占用少的broker,实现数据均衡;
4、kafka集群扩容。需要把原来broker的topic数据整体迁移到新的broker,合理利用新扩容的broker,实现负载均衡。
- 微信快速开发框架(六)-- 微信快速开发框架(WXPP QuickFramework)V2.0版本上线--源码已更新至github
- 数据结构之数组
- Android资源动态加载以及相关原理分析
- 微信快速开发框架(七)--发送客服信息,版本更新至V2.2 代码已更新至github
- 微信快速开发框架(八)-- V2.3--增加语音识别及网页获取用户信息,代码已更新至Github
- 微信公众平台快速开发框架 For Core 2.0 beta –JCSoft.WX.Core 5.2.0 beta发布
- Android系统层Watchdog机制源码分析
- 算法之插入排序
- Android Studio环境下搭建ReactNative
- Android实现两个ScrollView互相联动,同步滚动的效果
- 一个可以拖动的自定义Gridview代码
- android图片加载库Glide
- 密码最短长度为7,其中必须包含以下非字母数字字符1 完美解决方案
- android开发性能分析
- JavaScript 教程
- JavaScript 编辑工具
- JavaScript 与HTML
- JavaScript 与Java
- JavaScript 数据结构
- JavaScript 基本数据类型
- JavaScript 特殊数据类型
- JavaScript 运算符
- JavaScript typeof 运算符
- JavaScript 表达式
- JavaScript 类型转换
- JavaScript 基本语法
- JavaScript 注释
- Javascript 基本处理流程
- Javascript 选择结构
- Javascript if 语句
- Javascript if 语句的嵌套
- Javascript switch 语句
- Javascript 循环结构
- Javascript 循环结构实例
- Javascript 跳转语句
- Javascript 控制语句总结
- Javascript 函数介绍
- Javascript 函数的定义
- Javascript 函数调用
- Javascript 几种特殊的函数
- JavaScript 内置函数简介
- Javascript eval() 函数
- Javascript isFinite() 函数
- Javascript isNaN() 函数
- parseInt() 与 parseFloat()
- escape() 与 unescape()
- Javascript 字符串介绍
- Javascript length属性
- javascript 字符串函数
- Javascript 日期对象简介
- Javascript 日期对象用途
- Date 对象属性和方法
- Javascript 数组是什么
- Javascript 创建数组
- Javascript 数组赋值与取值
- Javascript 数组属性和方法
- 从0开始打造UI框架:动态化框架Scrollview物理学算法解析
- 基于Keras的格式化输出Loss实现方式
- PHP信号处理机制的操作代码讲解
- php防止表单重复提交实例讲解
- Python实现封装打包自己写的代码,被python import
- 创建一个 Serverless 应用,真的没有这么难!
- PHP使用mongoclient简单操作mongodb数据库示例
- 基于TensorFlow的CNN实现Mnist手写数字识别
- django rest framework 自定义返回方式
- PHP+Ajax实现的检测用户名功能简单示例
- Yii框架学习笔记之session与cookie简单操作示例
- Ajax+Jpgraph实现的动态折线图功能示例
- Python闭包及装饰器运行原理解析
- Django中Q查询及Q()对象 F查询及F()对象用法
- keras.layer.input()用法说明