不要被kafka的异步模式欺骗了
啥是异步模式
kafka的生产者可以选择使用异步方式发送数据,所谓异步方式,就是我们调用 send()
方法,并指定一个回调函数, 服务器在返回响应时调用该函数。
kafka在客户端里暴露了两个send
方法,我们可以自己选择同步或者异步模式。我们来看一个kafka的生产者发送示例,有个直观的感受。这个示例是一个同步的模式。
ProducerRecord<String, String> record = new ProducerRecord<>(“Kafka”, “Kafka_Products”, “测试”);//Topic Key Value
try{
Future future = producer.send(record);
future.get();//获取执行结果
} catch(Exception e) {
e.printStackTrace();
}
我们从源码层面来继续看下。
首先kafka定义了一个接口,
然后KafkaProducer
实现了这两个方法,我们看下异步方法的实现逻辑。
可以看到最终是调用doSend
方法,调用的时候传入一个回调。这个回调就是监听方法的执行结果的。
异步模式也会阻塞的
很多人会认为,既然是异步模式,不管结果是成功还是失败,肯定方法调用会马上返回的。那我只能告诉你,不好意思,不一定是这样。我自己就曾经踩过这个坑。
我们当时有个业务流程需要在执行完成后发送kakfa消息给某个业务方,为了尽量减少影响我这个主流程的执行时间,采用了异步方式发送kafka消息。在使用中,因为配错了kafka的TOPIC信息,发现流程阻塞发送消息这里长达6秒(kafka默认的发送超时时间)。
究竟为啥异步方式还会阻塞呢?我们继续看源码。
不管是同步模式还是异步模式,最终都会调用到doSend
方法,注意看上图中的waitOnMetadata
方法,我上面说的阻塞的情况就是阻塞在这个方法里。那我们继续看这个方法。
通过代码中的注释我们大概能了解这个方法的功能,不过我这里还是要解释下。(防止有人看不懂英文,哈哈)
waitOnMetadata
获取当前的集群元数据信息,如果缓存有,并且分区没有超过指定分区范围则缓存返回,否则触发更新,等待新的metadata。这个等待的操作在下面这行代码:
metadata.awaitUpdate(version, remainingWaitMs);
然后就继续跟喽,
这个方法很好理解,就是一直在等一个条件,这个条件达到了就返回,否则一直等待超时退出。而这个条件就是当前的版本号要大于上个版本号。
那么谁来更新版本号呢?就是我们前面提到的sender
线程。当我们的topic配置错误的时候导致metadata一直无法更新,然后一直等到超时。
破案了!
总结
kafka的异步模式可以让我们在业务场景中发送消息时即刻返回,不必等待发送的结果。但是当metadata取不到时,发送的过程还是需要等待一直超时的。
程序员是一个尤其需要不断学习的工种,平时养成阅读源码的习惯,不光能避免踩一些坑,还能在遇到问题是快递定位到问题的根源。
- Typescript 查缺补漏
- Git -- Stash
- Git -- Rebase
- Git -- 分支与合并 (命令行+可视化工具p4merge) Fast Forward 合并禁用 Fast Forward 合并自动合并解决合并的冲突
- 使用Angular CLI进行单元测试和E2E测试
- Git - 使用命令和P4Merge进行diff
- 使用Angular CLI进行Build (构建) 和 Serve
- 使用Angular CLI生成路由
- 使用Angular CLI从蓝本生成代码
- 使用Angular CLI生成 Angular 5项目
- Git基本命令 -- 创建Git项目
- Entity Framework Core 2.0 入门
- 用VSCode开发一个asp.net core2.0+angular5项目(5): Angular5+asp.net core 2.0 web api文件上传
- 用VSCode开发一个asp.net core 2.0+angular 5项目(4): Angular5全局错误处理
- JavaScript 教程
- JavaScript 编辑工具
- JavaScript 与HTML
- JavaScript 与Java
- JavaScript 数据结构
- JavaScript 基本数据类型
- JavaScript 特殊数据类型
- JavaScript 运算符
- JavaScript typeof 运算符
- JavaScript 表达式
- JavaScript 类型转换
- JavaScript 基本语法
- JavaScript 注释
- Javascript 基本处理流程
- Javascript 选择结构
- Javascript if 语句
- Javascript if 语句的嵌套
- Javascript switch 语句
- Javascript 循环结构
- Javascript 循环结构实例
- Javascript 跳转语句
- Javascript 控制语句总结
- Javascript 函数介绍
- Javascript 函数的定义
- Javascript 函数调用
- Javascript 几种特殊的函数
- JavaScript 内置函数简介
- Javascript eval() 函数
- Javascript isFinite() 函数
- Javascript isNaN() 函数
- parseInt() 与 parseFloat()
- escape() 与 unescape()
- Javascript 字符串介绍
- Javascript length属性
- javascript 字符串函数
- Javascript 日期对象简介
- Javascript 日期对象用途
- Date 对象属性和方法
- Javascript 数组是什么
- Javascript 创建数组
- Javascript 数组赋值与取值
- Javascript 数组属性和方法
- Pycharm打开已有项目配置python环境的方法
- python cv2.resize函数high和width注意事项说明
- pytorch SENet实现案例
- python如何安装下载后的模块
- Python爬虫如何应对Cloudflare邮箱加密
- 如何使用Python处理HDF格式数据及可视化问题
- tp5框架使用composer实现日志记录功能示例
- python 图像插值 最近邻、双线性、双三次实例
- tp5(thinkPHP5)框架实现多数据库查询的方法
- Python-openCV开运算实例
- php curl获取https页面内容,不直接输出返回结果的设置方法
- 详解php中curl返回false的解决办法
- Pytorch mask-rcnn 实现细节分享
- pytorch中的weight-initilzation用法
- python安装读取grib库总结(推荐)