实操 | kafka如何手动异步提交offset
时间:2022-07-25
本文章向大家介绍实操 | kafka如何手动异步提交offset,主要内容包括其使用实例、应用技巧、基本知识点总结和需要注意事项,具有一定的参考价值,需要的朋友可以参考一下。
放弃不难,但坚持很酷~
kafka_2.11-1.1.0
Kafka 手动异步提交 offset 的步骤大概分为以下几步,如下图所示:
1、配置手动提交
enable.auto.commit 修改为 false 。
2、订阅 topic
consumer.subscribe(Arrays.asList("topic name"));
3、获取 topic 各分区当前读取到的最后一条记录的offset
首先定义一个全局变量:
//用来记录当前消费的偏移
private static Map<TopicPartition, Long> offsets = new HashMap<>();
for (TopicPartition partition : records.partitions()) {
List<ConsumerRecord<String, String>> partitionRecords = records.records(partition);
// 获取当前读取到的最后一条记录的offset
long lastOffset = partitionRecords.get(partitionRecords.size() - 1).offset();
// 提交offset
offsets.put(partition, lastOffset + 1);
}
至于为什么消费者提交 offsets 时要 +1,在《Kafka消费者 之 如何提交消息的偏移量》中的概述章节里面也给出了答案。
4、手动异步提交 offset
首先定义一个全局变量:
//用来记录当需要提交的偏移
private static Map<TopicPartition, OffsetAndMetadata> commitOffset = new HashMap<>();
//
for (Map.Entry<TopicPartition, Long> entry : offsets.entrySet()) {
commitOffset.put(entry.getKey(), new OffsetAndMetadata(offsets.get(entry.getKey())));
logger.info("partition[{}], 当前待提交kafka偏移:[{}]", entry.getKey().partition(), offsets.get(entry.getKey()));
}
// 异步提交offset
consumer.commitAsync(commitOffset, (offsets, exception) -> {
if (exception != null) {
logger.error("fail to commit offsets {}, {}", offsets, exception);
// 同步提交,来做offset提交最后的保证。
consumer.commitSync();
}
});
清空:
commitOffset.clear();
offsets.clear();
- JavaScript 教程
- JavaScript 编辑工具
- JavaScript 与HTML
- JavaScript 与Java
- JavaScript 数据结构
- JavaScript 基本数据类型
- JavaScript 特殊数据类型
- JavaScript 运算符
- JavaScript typeof 运算符
- JavaScript 表达式
- JavaScript 类型转换
- JavaScript 基本语法
- JavaScript 注释
- Javascript 基本处理流程
- Javascript 选择结构
- Javascript if 语句
- Javascript if 语句的嵌套
- Javascript switch 语句
- Javascript 循环结构
- Javascript 循环结构实例
- Javascript 跳转语句
- Javascript 控制语句总结
- Javascript 函数介绍
- Javascript 函数的定义
- Javascript 函数调用
- Javascript 几种特殊的函数
- JavaScript 内置函数简介
- Javascript eval() 函数
- Javascript isFinite() 函数
- Javascript isNaN() 函数
- parseInt() 与 parseFloat()
- escape() 与 unescape()
- Javascript 字符串介绍
- Javascript length属性
- javascript 字符串函数
- Javascript 日期对象简介
- Javascript 日期对象用途
- Date 对象属性和方法
- Javascript 数组是什么
- Javascript 创建数组
- Javascript 数组赋值与取值
- Javascript 数组属性和方法
- 函数指针的实例讲解(下)
- C语言"陷阱" 之运算顺序
- 内核中的宏
- IIC设备驱动实例调试
- 并发与竞态(原子操作)
- Docker 入门到实战教程(一)介绍Docker
- Docker 入门到实战教程(二)安装Docker
- Docker 入门到实战教程(三)镜像和容器
- Docker 入门到实战教程(四)容器链接
- Docker 入门到实战教程(五)构建Docker镜像
- Docker 入门到实战教程(六)Docker数据卷
- Docker 入门到实战教程(七)安装Redis
- Docker 入门到实战教程(八)安装Mysql
- Docker 入门到实战教程(九)安装Nginx
- Docker教程(九)部署Spring Boot项目