消息队列中间件 RocketMQ 源码分析 —— Message 存储
- 1、概述
- 2、CommitLog 结构
- 3、CommitLog 存储消息
- MappedFile#落盘
- FlushRealTimeService
- CommitRealTimeService
- GroupCommitService
- CommitLog#putMessage(...)
- MappedFileQueue#getLastMappedFile(...)
- MappedFile#appendMessage(...)
- DefaultAppendMessageCallback#doAppend(...)
- FlushCommitLogService
1、概述
本文接《RocketMQ 源码分析 —— Message 发送与接收》。
主要解析 CommitLog
存储消息部分。
2、CommitLog 结构
CommitLog
、MappedFileQueue
、MappedFile
的关系如下:
CommitLog
: MappedFileQueue
: MappedFile
= 1 : 1 : N。
反应到系统文件如下:
Yunai-MacdeMacBook-Pro-2:commitlog yunai$ pwd/Users/yunai/store/commitlog
Yunai-MacdeMacBook-Pro-2:commitlog yunai$ ls -ltotal 10485760
-rw-r--r-- 1 yunai staff 1073741824 4 21 16:27 00000000000000000000
-rw-r--r-- 1 yunai staff 1073741824 4 21 16:29 00000000001073741824
-rw-r--r-- 1 yunai staff 1073741824 4 21 16:32 00000000002147483648
-rw-r--r-- 1 yunai staff 1073741824 4 21 16:33 00000000003221225472
-rw-r--r-- 1 yunai staff 1073741824 4 21 16:32 00000000004294967296
CommitLog
、MappedFileQueue
、MappedFile
的定义如下:
-
MappedFile
:00000000000000000000、00000000001073741824、00000000002147483648等文件。 -
MappedFileQueue
:MappedFile
所在的文件夹,对MappedFile
进行封装成文件队列,对上层提供可无限使用的文件容量。- 每个
MappedFile
统一文件大小。 - 文件命名方式:fileName[n] = fileName[n - 1] + mappedFileSize。在
CommitLog
里默认为 1GB。
- 每个
-
CommitLog
:针对MappedFileQueue
的封装使用。
CommitLog
目前存储在 MappedFile
有两种内容类型:
- MESSAGE :消息。
- BLANK :文件不足以存储消息时的空白占位。
CommitLog
存储在 MappedFile
的结构:
MESSAGE[1] |
MESSAGE[2] |
... |
MESSAGE[n - 1] |
MESSAGE[n] |
BLANK |
---|
MESSAGE
在 CommitLog
存储结构:
第几位 |
字段 |
说明 |
数据类型 |
字节数 |
---|---|---|---|---|
1 |
MsgLen |
消息总长度 |
Int |
4 |
2 |
MagicCode |
MESSAGE_MAGIC_CODE |
Int |
4 |
3 |
BodyCRC |
消息内容CRC |
Int |
4 |
4 |
QueueId |
消息队列编号 |
Int |
4 |
5 |
Flag |
flag |
Int |
4 |
6 |
QueueOffset |
消息队列位置 |
Long |
8 |
7 |
PhysicalOffset |
物理位置。在 CommitLog 的顺序存储位置。 |
Long |
8 |
8 |
SysFlag |
MessageSysFlag |
Int |
4 |
9 |
BornTimestamp |
生成消息时间戳 |
Long |
8 |
10 |
BornHost |
生效消息的地址+端口 |
Long |
8 |
11 |
StoreTimestamp |
存储消息时间戳 |
Long |
8 |
12 |
StoreHost |
存储消息的地址+端口 |
Long |
8 |
13 |
ReconsumeTimes |
重新消费消息次数 |
Int |
4 |
14 |
PreparedTransationOffset |
Long |
8 |
|
15 |
BodyLength + Body |
内容长度 + 内容 |
Int + Bytes |
4 + bodyLength |
16 |
TopicLength + Topic |
Topic长度 + Topic |
Byte + Bytes |
1 + topicLength |
17 |
PropertiesLength + Properties |
拓展字段长度 + 拓展字段 |
Short + Bytes |
2 + PropertiesLength |
BLANK
在 CommitLog
存储结构:
第几位 |
字段 |
说明 |
数据类型 |
字节数 |
---|---|---|---|---|
1 |
maxBlank |
空白长度 |
Int |
4 |
2 |
MagicCode |
BLANK_MAGIC_CODE |
Int |
4 |
3、CommitLog 存储消息
CommitLog#putMessage(...)
// 省略代码
- 说明 :存储消息,并返回存储结果。
- 第 2 行 :设置存储时间等。
- 第 16 至 36 行 :事务消息相关,暂未了解。
- 第 45 & 97 行 :获取锁与释放锁。
- 第 52 行 :再次设置存储时间。目前会有多处地方设置存储时间。
- 第 55 至 62 行 :获取
MappedFile
,若不存在或已满,则进行创建。详细解析见:MappedFileQueue#getLastMappedFile(...)。 - 第 65 行 :插入消息到
MappedFile
,解析解析见:MappedFile#appendMessage(...)。 - 第 69 至 80 行 :
MappedFile
已满,创建新的,再次插入消息。 - 第 116 至 140 行 :消息刷盘,即持久化到文件。上面插入消息实际未存储到硬盘。此处,根据不同的刷盘策略,执行会有不同。详细解析见:FlushCommitLogService。
- 第 143 至 173 行 :
Broker
主从同步。后面的文章会详细解析?。
MappedFileQueue#getLastMappedFile(...)
// 省略代码
- 说明 :获取最后一个
MappedFile
,若不存在或文件已满,则进行创建。 - 第 5 至 11 行 :计算当文件不存在或已满时,新创建文件的
createOffset
。 - 第 14 行 :计算文件名。从此处我们可
以得知,
MappedFile
的文件命名规则:
> fileName[n] = fileName[n - 1] + n * mappedFileSize
> fileName[0] = startOffset - (startOffset % this.mappedFileSize)
目前 `CommitLog` 的 `startOffset` 为 0。
此处有个**疑问**,为什么需要 `(startOffset % this.mappedFileSize)`。例如:
| startOffset | mappedFileSize | createOffset |
| --- | :-- | :-- |
| 5 | 1 | 5 |
| 5 | 2 | 4 |
| 5 | 3 | 3 |
| 5 | 4 | 4 |
| 5 | > 5 | 0 |
_如果有知道的同学,麻烦提示下。?_*解答:fileName[0] = startOffset - (startOffset % this.mappedFileSize) 计算出来的是,以 `this.mappedFileSize` 为每个文件大小时,`startOffset` 所在文件的开始`offset`*
- 第 30 至 35 行 :设置
MappedFile
是否是第一个创建的文件。该标识用于ConsumeQueue
对应的MappedFile
,详见ConsumeQueue#fillPreBlank
。
MappedFile#appendMessage(...)
// 省略代码
- 说明 :插入消息到
MappedFile
,并返回插入结果。 - 第 8 行 :获取需要写入的字节缓冲区。为什么会有
writeBuffer != null
的判断后,使用不同的字节缓冲区,见:FlushCommitLogService。 - 第 9 至 11 行 :设置写入
position
,执行写入,更新wrotePosition
(当前写入位置,下次开始写入开始位置)。
DefaultAppendMessageCallback#doAppend(...)
// 省略代码
- 说明 :插入消息到字节缓冲区。
- 第 45 行 :计算物理位置。在
CommitLog
的顺序存储位置。 - 第 47 至 49 行 :计算
CommitLog
里的offsetMsgId
。这里一定要和msgId
区分开。
计算方式 |
长度 |
|||
---|---|---|---|---|
offsetMsgId |
Broker存储时生成 |
Hex(storeHostBytes, wroteOffset) |
32 |
|
msgId |
Client发送消息时生成 |
Hex(进程编号, IP, ClassLoader, startTime, currentTime, 自增序列) |
32 |
《RocketMQ 源码分析 —— Message 基础》 |
- 第 51 至 61 行 :获取队列位置(offset)。
- 第 78 至 95 行 :计算消息总长度。
- 第 98 至 112 行 :当文件剩余空间不足时,写入
BLANK
占位,返回结果。 - 第 114 至 161 行 :写入
MESSAGE
。 - 第 173 行 :更新队列位置(offset)。
FlushCommitLogService
线程服务 |
场景 |
插入消息性能 |
---|---|---|
CommitRealTimeService |
异步刷盘 && 开启内存字节缓冲区 |
第一 |
FlushRealTimeService |
异步刷盘 && 关闭内存字节缓冲区 |
第二 |
GroupCommitService |
同步刷盘 |
第三 |
MappedFile#落盘
方式 |
|||
---|---|---|---|
方式一 |
写入内存字节缓冲区(writeBuffer) |
从内存字节缓冲区(write buffer)提交(commit)到文件通道(fileChannel) |
文件通道(fileChannel)flush |
方式二 |
写入映射文件字节缓冲区(mappedByteBuffer) |
映射文件字节缓冲区(mappedByteBuffer)flush |
flush相关代码
考虑到写入性能,满足 flushLeastPages * OS_PAGE_SIZE
才进行 flush
。
// 省略代码
commit相关代码:
考虑到写入性能,满足 commitLeastPages * OS_PAGE_SIZE
才进行 commit
。
// 省略代码
FlushRealTimeService
消息插入成功时,异步刷盘时使用。
// 省略代码
- 说明:实时
flush
线程服务,调用MappedFile#flush
相关逻辑。 - 第 23 至 29 行 :每
flushPhysicQueueThoroughInterval
周期,执行一次flush
。因为不是每次循环到都能满足flushCommitLogLeastPages
大小,因此,需要一定周期进行一次强制flush
。当然,不能每次循环都去执行强制flush
,这样性能较差。 - 第 33 行 至 37 行 :根据
flushCommitLogTimed
参数,可以选择每次循环是固定周期还是等待唤醒。默认配置是后者,所以,每次插入消息完成,会去调用commitLogService.wakeup()
。 - 第 45 行 :调用
MappedFile
进行flush
。 - 第 61 至 65 行 :
Broker
关闭时,强制flush
,避免有未刷盘的数据。
CommitRealTimeService
消息插入成功时,异步刷盘时使用。
和 FlushRealTimeService
类似,性能更好。
// 省略代码
GroupCommitService
消息插入成功时,同步刷盘时使用。
// 省略代码
- 说明:批量写入线程服务。
- 第 16 至 25 行 :添加写入请求。方法设置了
sync
的原因:this.requestsWrite
会和this.requestsRead
不断交换,无法保证稳定的同步。 - 第 27 至 34 行 :读写队列交换。
- 第 38 至 60 行 :循环写入队列,进行
flush
。- 第 43 行 :考虑到有可能每次循环的消息写入的消息,可能分布在两个
MappedFile
(写第N个消息时,MappedFile
已满,创建了一个新的),所以需要有循环2次。 - 第 51 行 :唤醒等待写入请求线程,通过
CountDownLatch
实现
- 第 43 行 :考虑到有可能每次循环的消息写入的消息,可能分布在两个
- 第 61 至 66 行 :直接刷盘。此处是由于发送的消息的
isWaitStoreMsgOK
未设置成TRUE
,导致未走批量提交。 - 第 73 至 80 行 :每 10ms 执行一次批量提交。当然,如果
wakeup()
时,则会立即进行一次批量提交。当Broker
设置成同步落盘 && 消息isWaitStoreMsgOK=true
,消息需要略大于 10ms 才能发送成功。当然,性能相对异步落盘较差,可靠性更高,需要我们在实际使用时去取舍。
- CSS魔法堂:说说Float那个被埋没的志向
- Netbeans配置Xdebug
- rpc框架: thrift/avro/protobuf 之maven插件生成java类
- WebComponent魔法堂:深究Custom Element 之 从过去看现在
- 数据可视化-EChart2.0使用总结1
- JavaScript事件概览
- gradle项目与maven项目相互转化
- rpc框架之gRPC 学习 - hello world
- Angular Service入门
- spring:如何用代码动态向容器中添加或移除Bean ?
- WebComponent魔法堂:深究Custom Element 之 标准构建
- druid 数据源 使用属性文件的一个坑
- Angular企业级开发(3)-Angular MVC实现
- spring: 加载远程配置
- JavaScript 教程
- JavaScript 编辑工具
- JavaScript 与HTML
- JavaScript 与Java
- JavaScript 数据结构
- JavaScript 基本数据类型
- JavaScript 特殊数据类型
- JavaScript 运算符
- JavaScript typeof 运算符
- JavaScript 表达式
- JavaScript 类型转换
- JavaScript 基本语法
- JavaScript 注释
- Javascript 基本处理流程
- Javascript 选择结构
- Javascript if 语句
- Javascript if 语句的嵌套
- Javascript switch 语句
- Javascript 循环结构
- Javascript 循环结构实例
- Javascript 跳转语句
- Javascript 控制语句总结
- Javascript 函数介绍
- Javascript 函数的定义
- Javascript 函数调用
- Javascript 几种特殊的函数
- JavaScript 内置函数简介
- Javascript eval() 函数
- Javascript isFinite() 函数
- Javascript isNaN() 函数
- parseInt() 与 parseFloat()
- escape() 与 unescape()
- Javascript 字符串介绍
- Javascript length属性
- javascript 字符串函数
- Javascript 日期对象简介
- Javascript 日期对象用途
- Date 对象属性和方法
- Javascript 数组是什么
- Javascript 创建数组
- Javascript 数组赋值与取值
- Javascript 数组属性和方法
- Rancher2.4.3 Rest API修改镜像地址
- Python UNIX系统管理指南
- 聊聊dubbo-go的ConsistentHashLoadBalance
- Tacotron2运行笔记
- 小米底包
- 新特性解读 | GROUPING() 函数用法解析
- 给我1万字,也讲Java不清内存排查。1万不行来2万~.~
- Typescript 设计模式之工厂方法
- vue中获取外网IP的方法
- WPF图片处理相关
- WPF文件压缩
- 如何实现SAP WebClient UI附件批量上传
- Python基础教程
- 使用Source Monitor检测Java代码的环复杂度
- 使用Source Monitor检测Java代码的环复杂度