Kafka 独立消费者
以前我们讨论的消费组,都是 group 的形式,group 可以自动地帮助消费者分配分区,且在发生异常时,还能自定地进行重平衡(Rebalance)。正常来说,group 帮助用户实现自动监听分区消费,但是在用户需要指定分区进行精确消费的场景下,由于 group 的重平衡机制,会打破这种消费方式,这不前段时间某项目就有个需求是这样的:
消息源端有若干个,每个消息源都会产生不同的消息,目标端也有若干个,每个目标端需要消费指定的消息源类型。
在以往,由于消费组的重平衡机制会打乱这种消费方式,只能申请多个主题对消息进行隔离,每个消息源将消息发送到指定主题,目标端监听指定的主题。这么做肯定没有指定分区消费这么优雅了,每增加一种消息源,都需要新增一个 topic,且消费粒度不能灵活组合。
针对以上问题,Kafka 的提供了独立消费者模式,可以消费者可以指定分区进行消费,如果只用一个 topic,每个消息源启动一个生产者,分别发往不同的分区,消费者指定消费相关的分区即可,用如下图所示:
但是 Kafka 独立消费者也有它的限定场景:
1、 Kafka 独立消费者模式下,Kafka 集群并不会维护消费者的消费偏移量,需要每个消费者维护监听分区的消费偏移量,因此,独立消费者模式与 group 模式切勿混合使用!
2、group 模式的重平衡机制在消费者异常时可将其监听的分区重分配给其它正常的消费者,使得这些分区不会停止被监听消费,但是独立消费者由于是手动进行监听指定分区,因此独立消费者发生异常时,并不会将其监听的分区进行重分配,这就会造成某些分区消息堆积。因此,在该模式下,独立消费者需要实现高可用,例如独立消费者使用 K8s Deployment 进行部署。
下面将演示如何使用 Kafka#assgin 方法手动订阅指定分区进行消费:
public static void main(String[] args) {
Properties kafkaProperties = new Properties();
kafkaProperties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
kafkaProperties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArrayDeserializer");
kafkaProperties.put("bootstrap.servers", "localhost:9092");
KafkaConsumer<String, byte[]> consumer = new KafkaConsumer<>(kafkaProperties);
List<TopicPartition> partitions = new ArrayList<>();
partitions.add(new TopicPartition("test_topic", 0));
partitions.add(new TopicPartition("test_topic", 1));
consumer.assign(partitions);
while (true) {
ConsumerRecords<String, byte[]> records = consumer.poll(Duration.ofMillis(3000));
for (ConsumerRecord<String, byte[]> record : records) {
System.out.printf("topic:%s, partition:%s%n", record.topic(), record.partition());
}
}
}
- JavaScript 教程
- JavaScript 编辑工具
- JavaScript 与HTML
- JavaScript 与Java
- JavaScript 数据结构
- JavaScript 基本数据类型
- JavaScript 特殊数据类型
- JavaScript 运算符
- JavaScript typeof 运算符
- JavaScript 表达式
- JavaScript 类型转换
- JavaScript 基本语法
- JavaScript 注释
- Javascript 基本处理流程
- Javascript 选择结构
- Javascript if 语句
- Javascript if 语句的嵌套
- Javascript switch 语句
- Javascript 循环结构
- Javascript 循环结构实例
- Javascript 跳转语句
- Javascript 控制语句总结
- Javascript 函数介绍
- Javascript 函数的定义
- Javascript 函数调用
- Javascript 几种特殊的函数
- JavaScript 内置函数简介
- Javascript eval() 函数
- Javascript isFinite() 函数
- Javascript isNaN() 函数
- parseInt() 与 parseFloat()
- escape() 与 unescape()
- Javascript 字符串介绍
- Javascript length属性
- javascript 字符串函数
- Javascript 日期对象简介
- Javascript 日期对象用途
- Date 对象属性和方法
- Javascript 数组是什么
- Javascript 创建数组
- Javascript 数组赋值与取值
- Javascript 数组属性和方法
- CentOS8更换yum源后出现同步仓库缓存失败的问题
- log4j配置方式
- 基于MHA搭建MySQL Replication集群高可用架构
- PyQt5 技巧篇-解决相对路径无法加载图片问题,styleSheet通过"相对"路径加载图片,python获取当前运行文件的绝对路径。
- 基于MMM搭建MySQL Replication集群高可用架构
- Python 技术篇-按任意格式灵活获取日期、时间、年月日、时分秒。日期格式化。
- 当删库时如何避免跑路
- Python 句法错误:"SyntaxError: invalid character in identifier",原因及解决方法
- Python3 多线程问题:ModuleNotFoundError: No module named 'thread',原因及解决办法。
- 文件传输和秒传
- 关于数据库的各种备份与还原姿势详解
- Python 技术篇-多线程的2种创建方法,多线程的简单用法,快速上手。
- Python 技术篇-调用浏览器访问指定网页,一行代码实现。非Selenium。
- 数据库热备份神器 - XtraBackup
- Python 技术篇-读取文件,将内容保存dict字典中。去掉字符串中的指定字符方法。dict字典的遍历。