Kafka消费过程关键源码解析
全是干货的技术号: 本文已收录在github,欢迎 star/fork: https://github.com/Wasabi1234/Java-Interview-Tutorial
1 案例引入
- 官方Consumer最简代码用例:
简短的代码,背后牵涉很多问题,Consumer如何绑定特定分区?如何实现订阅 topic 的?又如何实现拉消息?
2 订阅流程
订阅主流程主要更新如下关键属性:
- 订阅状态(SubscriptionState) - subscriptions 主要维护所订阅的topic和patition的消费位置等状态信息
- 元数据中的topic信息
metadata中维护了Kafka集群元数据的一个子集,包括集群的Broker节点、Topic和Partition在节点上分布,以及我们聚焦的第二个问题:Coordinator给Consumer分配的Partition信息。
注意acquireAndEnsureOpen()
和try-finally release()
保证该方法的线程安全。
跟进到更新元数据的方法metadata.requestUpdateForNewTopics()
Metadata.requestUpdateForNewTopics()
- 请求更新元数据。
这里,并未真正发送更新元数据的请求,只是将需要更新元数据的标志位needUpdate
置true
。Kafka必须确保在第一次拉消息前元数据可用,即必须更新一次元数据,否则Consumer不知道应该去哪个Broker拉哪个Partition的消息。
3 拉消息流程
那元数据何时才真正更新呢?
- 拉消息时序图
KafkaConsumer#poll()方法中主要调用如下方法:
updateAssignmentMetadataIfNeeded()
- 更新元数据
其内部调用coordinator.poll()
,poll()
里又调用
ConsumerNetworkClient#ensureFreshMetadata()
ConsumerNetworkClient#awaitMetadataUpdate
内部调用了client.poll()方法,实现与Cluster通信,在Coordinator注册Consumer并拉取和更新元数据。
这些都是 client 类中方法,ConsumerNetworkClient封装了Consumer和Cluster之间所有网络通信的实现,是个完全的异步实现类。没有维护任何线程
- 待发送Request都存在unsent域
- Response存放在pendingCompletion域
每次调用poll()时,在当前线程中发送所有待发送Request,处理所有收到Response。
异步设计
- 优势:
无需维护用于异步发送的和处理响应的线程,并且能充分发挥批量处理的优势,这也是Kafka的性能非常好的原因之一。
很少的线程实现高吞吐量。劣势:极大增加了代码的复杂度。
好了,下面再看另一关键方法:
pollForFetches()
主要由fetcher.sendFetches()实现,由于代码过长,简述其流程如下:
- 根据元数据的信息,构造所需Broker的拉消息的Request对象
- 然后调用
ConsumerNetworkClient#send
异步发送Request - 并且注册一个回调类处理返回的Response
所有返回的Response被暂时存放在
Fetcher#completedFetches
。注意,此时的Request并未被真正发给各Broker,而被暂存在client.unsend
等待发送。 - 然后,在调用
ConsumerNetworkClient#poll
时,会真正将之前构造的所有Request发送出去,并处理收到的Response - 最后,fetcher.fetchedRecords()方法中,将返回的Response反序列化后转换为消息列表,返回给调用者
总结
综上过程讲解,给出整个拉消息流程涉及关键类的类图
参考
- 排序算法性能比较
- 上一期前端面试题整理答案
- 自己动手用Socket写一个HttpClient发送GET请求
- Ryu:模块间通信机制分析
- 异地双活实践笔记
- OpenStack Neutron中的DVR简介与OVS流表分析
- Python爬虫,带你制作高逼格的数据聚合云图
- nodejs+ftp+linux+nginx 自动部署前端
- Highcharts使用指南
- 如何通过经纬度获取地址信息?
- ADO.NET入门教程(二)了解.NET数据提供程序
- ADO.NET入门教程(三) 连接字符串,你小觑了吗?
- ADO.NET入门教程(四) 品味Connection对象
- ADO.NET入门教程(五) 细说数据库连接池
- JavaScript 教程
- JavaScript 编辑工具
- JavaScript 与HTML
- JavaScript 与Java
- JavaScript 数据结构
- JavaScript 基本数据类型
- JavaScript 特殊数据类型
- JavaScript 运算符
- JavaScript typeof 运算符
- JavaScript 表达式
- JavaScript 类型转换
- JavaScript 基本语法
- JavaScript 注释
- Javascript 基本处理流程
- Javascript 选择结构
- Javascript if 语句
- Javascript if 语句的嵌套
- Javascript switch 语句
- Javascript 循环结构
- Javascript 循环结构实例
- Javascript 跳转语句
- Javascript 控制语句总结
- Javascript 函数介绍
- Javascript 函数的定义
- Javascript 函数调用
- Javascript 几种特殊的函数
- JavaScript 内置函数简介
- Javascript eval() 函数
- Javascript isFinite() 函数
- Javascript isNaN() 函数
- parseInt() 与 parseFloat()
- escape() 与 unescape()
- Javascript 字符串介绍
- Javascript length属性
- javascript 字符串函数
- Javascript 日期对象简介
- Javascript 日期对象用途
- Date 对象属性和方法
- Javascript 数组是什么
- Javascript 创建数组
- Javascript 数组赋值与取值
- Javascript 数组属性和方法
- 打卡群刷题总结0805——不同的二叉搜索树
- 利用STS临时密钥服务快速搭建直传页面的实践
- Clickhouse在大数据分析平台-留存分析上的应用
- Rancher 高可用部署
- 为什么Web端登录需要验证码?
- 原创分享 TiDB 的列式存储引擎是如何实现的?
- Qt音视频开发5-vlc事件订阅
- matlab中使用VMD(变分模态分解)
- TKE中挂载文件到CFS子目录
- Nginx Ingress on TKE 部署最佳实践
- 腾讯地图点聚合开发-实现地图找房功能
- 机器学习Tips:关于Scikit-Learn的 10 个小秘密
- R语言ggmap空间可视化机动车碰撞–街道地图热力图
- R语言ggmap空间可视化机动车交通事故地图
- 基于matlab的Lorenz系统仿真可视化