kafka的生产者分区机制原理(二)
时间:2022-07-23
本文章向大家介绍kafka的生产者分区机制原理(二),主要内容包括其使用实例、应用技巧、基本知识点总结和需要注意事项,具有一定的参考价值,需要的朋友可以参考一下。
kafka分区概念
- 消费者给kafka发送消息的时候相同的topic可以有多个分区。且分区可以被放在不同的服务器,消费者的消息只会被发送到一个分区里,比如说某个topic有100个分区,消费者产生了100条消息,此时各个分区都有一条消息。且每个分区都会有多个副本,且以其中的一个分区为leader,其他的分区为fllower。
kafka为什么要分区?
- 负载均衡,实现系统的高伸缩性。为什么这么说呢?因为不同的分区可以放置在不通的机器节点上,当我们服务的吞吐量特别大的时候,可以通过增加节点来进行提高吞吐量。
- 实现业务逻辑上的功能:实现业务级别的消息顺序的问题。
分区策略
- 分区策略指的是决定生产者将消息发送到那个分区的算法。
- kafka是有默认的分区策略
- 轮询策略,也就是给生产者向分区按顺序去发送消息。
private int defaultPartition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
int numPartitions = partitions.size();
int partition = 0;
defcount.incrementAndGet();
if (keyBytes == null) {
int nextValue = counter.getAndIncrement();
List<PartitionInfo> availablePartitions = cluster.availablePartitionsForTopic(topic);
if (availablePartitions.size() > 0) {
int part = toPositive(nextValue) % availablePartitions.size();
partition = availablePartitions.get(part).partition();
} else {
partition = toPositive(nextValue) % numPartitions;
}
} else {
partition = toPositive(Utils.murmur2(keyBytes)) % numPartitions;
}
return partition;
}
2.随机策略 也称 Randomness 策略。所谓随机就是我们随意地将消息放置到任意一个分区上,如下面这张图所示。
List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
return ThreadLocalRandom.current().nextInt(partitions.size());
- 按消息键保序策略 也称 Key-ordering 策略,。Kafka 允许为每条消息定义消息键,简称为 Key。这个 Key 的作用非常大,它可以是一个有着明确业务含义的字符串,比如客户代码、部门编号或是业务 ID 等;也可以用来表征消息元数据。特别是在 Kafka 不支持时间戳的年代,在一些场景中,工程师们都是直接将消息创建时间封装进 Key 里面的。一旦消息被定义了 Key,那么你就可以保证同一个 Key 的所有消息都进入到相同的分区里面,由于每个分区下的消息处理都是有顺序的,故这个策略被称为按消息键保序策略,如下图所示。
List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
return Math.abs(key.hashCode()) % partitions.size();
- 自定义分区策略: 需要显式的去配置参数partitioner.class
int partition = 0;
if(key<100){
partition = 0;
}else if(key<200){
partition = 1;
}else{
partition = 2;
}
ProducerRecord<String,String> records = new ProducerRecord<String,String>(TOPIC,partition,key,value);
kafkaProducer.send(records);
自定义分区的完整包名:
props.put("partitioner.class", "xx.xx.KafkaCustomPartitioner");
- JavaScript 教程
- JavaScript 编辑工具
- JavaScript 与HTML
- JavaScript 与Java
- JavaScript 数据结构
- JavaScript 基本数据类型
- JavaScript 特殊数据类型
- JavaScript 运算符
- JavaScript typeof 运算符
- JavaScript 表达式
- JavaScript 类型转换
- JavaScript 基本语法
- JavaScript 注释
- Javascript 基本处理流程
- Javascript 选择结构
- Javascript if 语句
- Javascript if 语句的嵌套
- Javascript switch 语句
- Javascript 循环结构
- Javascript 循环结构实例
- Javascript 跳转语句
- Javascript 控制语句总结
- Javascript 函数介绍
- Javascript 函数的定义
- Javascript 函数调用
- Javascript 几种特殊的函数
- JavaScript 内置函数简介
- Javascript eval() 函数
- Javascript isFinite() 函数
- Javascript isNaN() 函数
- parseInt() 与 parseFloat()
- escape() 与 unescape()
- Javascript 字符串介绍
- Javascript length属性
- javascript 字符串函数
- Javascript 日期对象简介
- Javascript 日期对象用途
- Date 对象属性和方法
- Javascript 数组是什么
- Javascript 创建数组
- Javascript 数组赋值与取值
- Javascript 数组属性和方法
- 基于maven+ssm的增删改查之批量删除之全选全不选
- mybatis动态sql之初探(学习where、if、trim标签)
- 从源码角度看 PHP 字符串类型转换
- 静态链表
- 中流砥柱java的动态代理
- java的反射机制到底是做什么的?
- Java中是否直接可以使用enum进行传输
- PHP 恶意程序简单分析
- springboot之相关环境设置
- springboot之第一个springboot程序
- 「查缺补漏」巩固你的RocketMQ知识体系
- springboot之场景启动器
- ICLR2020 | 深度自适应Transformer
- springboot之自动配置
- golang--连接redis数据库并进行增删查改