MongoDB内核:主从同步之源码剖析
导语:前面文章提到了MongoDB的复制集协议是一种raft-like的协议。其中一点差别就是关于log的拉取和回放。本文将尝试结合代码深入探究主从同步中一些细节。(PS:本文代码和分析基于源码版本V4.0.3版本。水平有限,文章中有错误或理解不当的地方,还望指出,共同学习)
一、主从同步的大致流程
之前的文章提到过,MongoDB复制集协议采用的是pull而不是push的方式。也就是说从节点定期去主节点的oplog集合中拉取最新的操作并应用到自身中。
大致的流程如下(与图中编号并不对应):
- 从节点上的
rsBackgroundSync
后台线程通过find/getmore
命令到主节点上获取oplog,并放入到OplogBuffer
中; -
replBatcher
线程感知到OplogBuffer
中的数据并消费,保存到OpQueue
中; -
OplogApplier
线程感知OpQueue
中的新数据,通过多个(默认16个)worker线程回放Oplog,并更新lastAppliedOpTime
和lastDurableOpTime
; - 从节点上的
SyncSourceFeedback
后台线程感知到有新数据写入成功,将自身最新的lastAppliedOpTime
和lastDurableOpTime
等信息通过replSetUpdatePosition
内部命令返回给主节点; - 主节点接受到各个从节点 最新的
lastAppliedOpTime
和lastDurableOpTime
,计算大多数节点(包括自己)当前的数据同步进展,并更新lastCommittedOpTime
;
这里oplog的拉取和回放可以理解为是一个“单个生产者多个消费者”的生产者-消费者模型。彼此是独立的,正常情况下相互不阻塞。
二、代码细节
当节点处于SECONDARY状态时,BackgroundSync
线程是一个死循环,每次循环中它都会完成从节点从其同步源上获取oplog并应用到自身的过程。
以一个从节点的视角出发,主从同步可以大致分为如下几个阶段:
2.1 获得一个同步源
SyncSourceResolver
负责获取一个同步源的工作,代码路径如下:
SyncSourceResolver::startup()-->_chooseAndProbeNextSyncSource()-->_chooseNewSyncSource()-->chooseNewSyncSource()-->ReplicationCoordinator::chooseNewSyncSource()-->TopologyCoordinator::chooseNewSyncSource()
// find a target to sync from the last optime fetched
{
OpTime minValidSaved;
{
auto opCtx = cc().makeOperationContext();
minValidSaved = _replicationProcess->getConsistencyMarkers()->getMinValid(opCtx.get());
}
stdx::lock_guard<stdx::mutex> lock(_mutex);
if (_state != ProducerState::Running) {
return;
}
const auto requiredOpTime = (minValidSaved > _lastOpTimeFetched) ? minValidSaved : OpTime();
lastOpTimeFetched = _lastOpTimeFetched;
if (!_syncSourceHost.empty()) {
log() << "Clearing sync source " << _syncSourceHost << " to choose a new one.";
}
_syncSourceHost = HostAndPort();
_syncSourceResolver = stdx::make_unique<SyncSourceResolver>(
_replicationCoordinatorExternalState->getTaskExecutor(),
_replCoord,
lastOpTimeFetched,
requiredOpTime,
[&syncSourceResp](const SyncSourceResolverResponse& resp) { syncSourceResp = resp; });
}
auto status = _syncSourceResolver->startup();
if (ErrorCodes::CallbackCanceled == status || ErrorCodes::isShutdownError(status.code())) {
return;
}
其中ReplicationCoordinator
对象负责协调副本集与系统其余部分的交互。这里TopologyCoordinator::chooseNewSyncSource
大致的逻辑如下:
- 根据用户参数
forceSyncSoureCandidate
选择同步源,并check。如果同步源无效或者不属于副本集或者处于黑名单中都会失败,否则会返回指定的同步源; - 尝试之前用过的同步源;
- 等待拓扑结构中节点的ping结果;
- 然后会有关链式复制的选择,如果不允许链式复制的话,那么就将当前的主节点作为同步源并返回;
- 否则将尝试找到一个ping时间最短并且oplog比当前节点更新的节点;
- 获取主节点的oplogTime;
- 两轮尝试做降级筛选,遍历副本集成员节点。在第一轮尝试中忽略以下节点:没有投票权、hidden状态、与主节点的差距太大的节点;否则会忽略以下节点:
buildIndex
参数不同的节点、oplog落后于自身的节点、黑名单中的节点。当然了如果第一轮就找到了理想的同步源,自然也就不需要第二轮了。 - 无论是否找到同步源都会输出日志并返回同步源(没找到时会返回":27017");
如果没有节点满足必要条件,则BackgroundSync
等待1秒钟,然后重新开始同步源选择过程。
2.2 oplog拉取
这一过程是由oplogFetcher
完成的,也发生于BackgroundSync
阶段中。代码路径如下:
oplogFetcher::startup()-->AbstractAsyncComponent::startup()-->AbstractOplogFetcher::_doStartup_inlock()-->AbstractOplogFetcher::_makeAndScheduleFetcherCallback()-->OplogFetcher::_makeFindCommandObject()-->AbstractOplogFetcher::_makeFetcher()-->AbstractOplogFetcher::_callback()-->OplogFetcher::_onSuccessfulBatch()
在_makeFindCommandObject()
中,我们可以看到其生成的oplog查询语句的细节。
- 指定了namespace
- 过滤条件指定了大于
lastOpTimeFetched
- 指定了
{oplogReplay:true}
选项。oplogReplay表明拉取oplog的目的是为了回放 - 使用了
{tailable:true}
和{awaitData:true}
选项。tailable cursor是类似于tail -f
命令的操作,作用与像log.rs
这样的Capped Collection上,使得我们可以不关闭cursor而从中持续不断地读出新的数据。awaitData参数的目的在于阻塞批处理。设置为true
时,当tailable cursor遍历到集合末尾时,会在一段时间内阻塞查询线程,等待新的写入到来。当新写入插入该集合中时,阻塞线程会被唤醒并将这一批数据返回给客户端。 - 指定了超时时间(default
60s
) - 指定了batch大小(default
16M/ 12 * 10
)
综合理解上面的查询条件,得到以下几个结论:
- oplog表不包含
_id
,没办法走索引,查询的初始扫描是比较耗性能的 - 查询的过滤条件为:大于或等于上一次拉取的最后一个oplog条目的时间戳。由于这里是
$gte
,所以应始终至少返回一个文档 - 每次拉取要么满足一批的大小限制,要么满足一批的时间限制。都是批量拉取而非单条拉取(可以有效减少网络传输消耗)
OplogFetcher::_onSuccessfulBatch()
处理成功拉回一批oplog的结果,更新自己的_lastFetched
视图,并会返回下一次需要发送的getMore
命令。其大致逻辑如下:
-
checkRemoteOplogStart()
检查第一批拉回来的oplog结果。如果在同步源中找不到刚刚拉取的操作的optime,则会返回OplogStartMissing
的错误; -
validateDocuments()
检验文档的合法性,在这里检查oplog乱序等问题; -
BackgroundSync::_enqueueDocuments()
将oplogFetcher
拉取到的结果放入oplogBuffer
中; -
shouldStopFetching()
处理一些需要停止oplog拉取的错误场景; -
makeGetMoreCommandObject()
根据当前的cursorId
来生成新的getMore
命令;
外层的BackgroundSync
会根据上面提到的fetcherReturnStatus
返回的状态码进行相应的处理
- oplog乱序:输出错误日志并return
- 同步源上找不到需要的oplog: 进入Rollbak流程
- 非法BSON:将同步源加入黑名单60s
- 其他错误:输出错误日志
2.3 并发回放
这一过程是由oplogBUffer
+oplogApplier
完成。前者主要将拉到的oplog缓存在本地,pushAllNonBlocking()
中会遍历所有的oplog条目并条用存储层的接口insertDocuments()
。
后者的代码路径如下:
OplogApplier::startup()-->SyncTail::oplogApplication()-->SyncTail::_oplogApplication()-->SyncTail::multiApply()-->multiSyncApply()
其中,oplogApplier
会启动一个新的ReplBatcher
线程,它会不断尝试load可能动态更改的replBatchLimitBytes
和replBatchLimitOperations
参数,然后调用SyncTail::tryPopAndWaitForMore()
。
在tryPopAndWaitForMore()
中会尝试从oplogBuffer
中取数据并保存到OpQueue
里。有以下几种情况会等待数据长达1s:
1)oplogBuffer
和oplogQueue
均为空;
2)设置了延迟节点,拉回来的oplog还不满足延迟条件;
SyncTail::_consume()
用于消费数据,但这里有关于DDL操作的额外处理逻辑。当遇到这种操作(包括:create,renameCollection, dbCheck, drop, collMod, dropDatabse, emptyCapped, convertToCapped, createIndexes, dropIndexes
。注:applyOps
除外)时,将会从批处理转成单条处理的方式。
bool SyncTail::tryPopAndWaitForMore(OperationContext* opCtx,
OplogBuffer* oplogBuffer,
SyncTail::OpQueue* ops,
const BatchLimits& limits) {
{
BSONObj op;
// Check to see if there are ops waiting in the bgsync queue
bool peek_success = oplogBuffer->peek(opCtx, &op);
if (!peek_success) {
// If we don't have anything in the queue, wait a bit for something to appear.
if (ops->empty()) {
if (inShutdown()) {
ops->setMustShutdownFlag();
} else {
// Block up to 1 second. We still return true in this case because we want this
// op to be the first in a new batch with a new start time.
oplogBuffer->waitForData(Seconds(1));
}
}
return true;
}
if (!ops->empty() && (ops->getBytes() + size_t(op.objsize())) > limits.bytes) {
return true; // Return before wasting time parsing the op.
}
ops->emplace_back(std::move(op)); // Parses the op in-place.
}
auto& entry = ops->back();
auto entryTime = Date_t::fromDurationSinceEpoch(Seconds(entry.getTimestamp().getSecs()));
if (limits.slaveDelayLatestTimestamp && entryTime > *limits.slaveDelayLatestTimestamp) {
ops->pop_back(); // Don't do this op yet.
if (ops->empty()) {
sleepsecs(1);
}
return true;
}
// !关于非CURD操作的处理!
if ((entry.isCommand() && entry.getCommandType() != OplogEntry::CommandType::kApplyOps) ||
entry.getNamespace().isSystemDotViews()) {
if (ops->getCount() == 1) {
// apply commands one-at-a-time
_consume(opCtx, oplogBuffer);
} else {
// This op must be processed alone, but we already had ops in the queue so we can't
// include it in this batch. Since we didn't call consume(), we'll see this again next
// time and process it alone.
ops->pop_back();
}
// Apply what we have so far.
return true;
}
// We are going to apply this Op.
_consume(opCtx, oplogBuffer);
// Go back for more ops, unless we've hit the limit.
return ops->getCount() >= limits.ops;
而对于_oplogApplication()
,其处理逻辑大致如下:
-
getNextBatch()
从opQueue
中取一批,超时为1s。如果没取到则继继续下一次循环; - 获取这一批oplog条目的第一条和最后一条
oplogTime
以及自身的lasterAppliedOpTime
,如果第一条opTime
比本地已经apply的opTime
还要小的话,返回oplog乱序的错误——OplogOutOfOrder
(当然,基本不会出现); -
multiApply()
进行oplog并发回放;它会返回一个本次apply中最后一条oplog的OpTime
,肯定会等于第二步中获取的批处理中最后一条opTime;
- 更新自己关于
last applied optime
的视图并持久化; -
oplogDiskLocRegister()
通知存储引擎来更新这一批已applied oplog的可见性;
在SyncTail::multiApply()
中,multikeyVector
是用于并发回放的线程池。multiApply()
的大致逻辑如下:
-
scheduleWritesToOplog()
将oplog写入本地oplog集合; -
fillWriterVectors()
将待处理的一批oplog分发到不同的回放线程; -
ThreadPool::waitForIdle()
等待上一次multiApply完成; -
applyOps()
进行oplog回放; -
replicationBatchIsComplete()
通知存储引擎这一批oplog已经完成了回放,这意味着所有跟这一批oplog条目相关的写入都结束了,不会再有新的写入操作了; - 返回写入成功的最后一条oplog的
opTime
;
先来看看oplog分发的逻辑——fillWriterVectors()
,它会遍历这一批待回放的oplog
如果是CURD的操作(指insert
,delete
,update
),通过getIdElement()
取出其操作的文档_id
并计算hash值,当然对于update命令需要去o
里面取。然后以nampespace得到的hash值作为murmur哈希的seed为_id
的hash值计算出一个新的hash值来标识一条oplog。
然后使用该hash值直接对回放线程池大小进行取模,来决定一条oplog应该分发到哪个线程。
上面的逻辑保证了对于同一个doc操作的oplog(_id
一致)会在一个回放线程中完成回放,而oplog的时间顺序性保证了这些操作的顺序回放。
再来看看oplog回放的逻辑——applyOps
,代码实现比较简洁。遍历线程池,每个负责回放的线程都会调用multiSyncApply()
函数。
writerPool
是由ReplicationCoordinatorExternalStateImpl::startThreads()
中调用SyncTail::makeWriterPool()
生成的,会使用replWriterThreadCount
(缺省为16)作为线程池的线程数。调用链为:
ReplicationCoordinatorExternalStateImpl::startThreads()-->SyncTail::makeWriterPool()-->ThreadPool::startup()-->_startWorkerThread_inlock()-->ThreadPool::_workerThreadBody()-->ThreadPool::_consumeTasks()-->ThreadPool::_doOneTask()
在multiSyncApply()
中首先用stableSortByNamespace()
将这一批oplog按namespace排序。然后InsertGroup::groupAndApplyInserts()
尝试将一批对同一个namespace的insert组成一个批量insert操作;当然如果没办法变成批处理也只好单条处理。最后调用SyncTail::syncApply()
,里面会根据不同的op
类型进行不同的处理,非DDL操作会调用applyOperation_inlock()
,DDL操作会调用applyCommand_inlock()
auto opType = OpType_parse(IDLParserErrorContext("syncApply"), op["op"].valuestrsafe());
if (opType == OpTypeEnum::kNoop) { //空操作
Lock::DBLock dbLock(opCtx, nss.db(), MODE_X);
OldClientContext ctx(opCtx, nss.ns());
return applyOp(ctx.db());
} else if (opType == OpTypeEnum::kInsert && nss.isSystemDotIndexes()) {// 对于system.indexes的'特殊'insert操作
Lock::DBLock dbLock(opCtx, nss.db(), MODE_X);
OldClientContext ctx(opCtx, nss.ns());
return applyOp(ctx.db());
} else if (OplogEntry::isCrudOpType(opType)) { //其他的CURD操作
return writeConflictRetry(opCtx, "syncApply_CRUD", nss.ns(), [&] {
try {
AutoGetCollection autoColl(opCtx, getNsOrUUID(nss, op), MODE_IX);
auto db = autoColl.getDb();
OldClientContext ctx(opCtx, autoColl.getNss().ns(), db);
return applyOp(ctx.db());
} catch (ExceptionFor<ErrorCodes::NamespaceNotFound>& ex) {
ex.addContext(str::stream() << "Failed to apply operation: " << redact(op));
throw;
}
});
} else if (opType == OpTypeEnum::kCommand) { //DDL操作
return writeConflictRetry(opCtx, "syncApply_command", nss.ns(), [&] {
// a command may need a global write lock. so we will conservatively go
// ahead and grab one here. suboptimal. :-(
Lock::GlobalWrite globalWriteLock(opCtx); //!注意:这里加的是全局的写锁!
// special case apply for commands to avoid implicit database creation
Status status = applyCommand_inlock(opCtx, op, oplogApplicationMode);
incrementOpsAppliedStats();
return status;
});
}
- 可以看到,除了对于DDL操作需要加全局写锁之外,其他的oplog apply操作都只需要db维度的锁即可。
- 同一批次内的Oplog条目不一定按顺序apply。
- 对文档的操作必须是原子性的且有序的,因此对同一文档的操作将放在要序列化的同一线程上
- 此外,
$cmd
操作将以大小为1的批处理顺序进行
然后在applyOperation_inlock()
中对不同的op操作类型(n,i,u,d
)进行了不同的处理,最终都是WriteUnitOfWork
进行一次事务写操作并提交wuow.commit()
。其中插入操作也会尝试进行批处理,以提高性能。
2.4 返回给主节点
这一过程由syncSourceFeedback
完成。它会将自身最新的 lastAppliedOpTime
和lastDurableOpTime
等信息通过replSetUpdatePosition
内部命令返回给主节点。
ReplicationCoordinatorExternalState
在启动时创建一个SyncSourceFeedback
对象,该对象负责发送replSetUpdatePosition
命令。
SyncSourceFeedback
会启动一个循环。 在每次迭代中,它首先等待条件变量,每当ReplicationCoordinator
发现副本集中的某个节点复制了更多操作并更新为最新状态时,该条件变量就会被通知。 在继续之前,它会检查它是否不处于PRIMARY
或STARTUP
状态。
然后,它获取节点的同步源,并创建一个Reporter
,由该Reporter
将replSetUpdatePosition
命令发送到同步源。 该命令每隔keepAliveInterval
毫秒(也就是(electionTimeout / 2)
)保持发送,以维护有关副本集中节点的活动信息。
replSetUpdatePosition
命令包含以下信息:
- 一个
opTimes
数组,其中包含每个活动副本集成员的对象。 该信息由ReplicationCoordinator
使用其SlaveInfo
中的信息填充。 不包括被认为是挂掉的节点。 每个节点都包含以下信息:
- `last durable OpTime`
- `last applied OpTime`
- 成员ID
- `ReplicaSetConfig`版本
-
ReplSetMetadata
,副本集元数据,包括以下信息
- 上游节点的`last commited OpTime`
- 当前term
- `ReplicaSetConfig`的版本和term
- 副本集ID
- 上游节点是否为主
2.5 补充说明
可以看到,2.1~2.4中的任务分别由不同的线程进行处理,是相互独立的,他们都是由ReplicationCoordinatorExternalStateImpl::startSteadyStateReplication)()
启动的(SyncSourceResolver
和oplogFetcher
的启动在BackgroundSync::startup()
内部):
log() << "Starting replication fetcher thread";
_oplogBuffer = stdx::make_unique<OplogBufferBlockingQueue>();
_oplogBuffer->startup(opCtx);
_bgSync =
stdx::make_unique<BackgroundSync>(replCoord, this, _replicationProcess, _oplogBuffer.get());
_bgSync->startup(opCtx);
_oplogApplier = stdx::make_unique<OplogApplier>(_oplogApplierTaskExecutor.get(),
_oplogBuffer.get(),
_bgSync.get(),
replCoord,
_replicationProcess->getConsistencyMarkers(),
_storageInterface,
OplogApplier::Options(),
_writerPool.get());
_oplogApplierShutdownFuture = _oplogApplier->startup();
log() << "Starting replication reporter thread";
auto bgSyncPtr = _bgSync.get();
_syncSourceFeedbackThread = stdx::make_unique<stdx::thread>([this, bgSyncPtr, replCoord] {
_syncSourceFeedback.run(_taskExecutor.get(), bgSyncPtr, replCoord);
});
三、结论
- 内核中关于主从同步这一部分的代码相对比较清晰,不同的模块(线程)负责不同的工作,共同保证MongoDB的主从同步。
- oplog并发回放以保证主从同步性能,并发中对于DDL又有加全局写锁的”串行“操作。
- 为什么在
oplogBuffer
和oplogApplier
中间要加一层opQueue
以及ReplBatcher
呢?一方面将oplog变得尽可能平滑,减少源端写入不均带来的影响;另一方面要做“并行--串行--并行”这样的转换操作,保证DDL是串行处理的(一个batch里面只有单条DDL操作,只会发送给后端16个回放线程中的一个)。 - oplog并发回放先按namesapce排序,然后再按
_id
hash到不同线程进行回放。同一批次内的oplog并不是按顺序apply的。按namespace排序应该是为了更好地利用局部性原理(同一个ns内的操作在相同的cache、内存或磁盘扇区的概率更大)
参考资料
- JavaScript 教程
- JavaScript 编辑工具
- JavaScript 与HTML
- JavaScript 与Java
- JavaScript 数据结构
- JavaScript 基本数据类型
- JavaScript 特殊数据类型
- JavaScript 运算符
- JavaScript typeof 运算符
- JavaScript 表达式
- JavaScript 类型转换
- JavaScript 基本语法
- JavaScript 注释
- Javascript 基本处理流程
- Javascript 选择结构
- Javascript if 语句
- Javascript if 语句的嵌套
- Javascript switch 语句
- Javascript 循环结构
- Javascript 循环结构实例
- Javascript 跳转语句
- Javascript 控制语句总结
- Javascript 函数介绍
- Javascript 函数的定义
- Javascript 函数调用
- Javascript 几种特殊的函数
- JavaScript 内置函数简介
- Javascript eval() 函数
- Javascript isFinite() 函数
- Javascript isNaN() 函数
- parseInt() 与 parseFloat()
- escape() 与 unescape()
- Javascript 字符串介绍
- Javascript length属性
- javascript 字符串函数
- Javascript 日期对象简介
- Javascript 日期对象用途
- Date 对象属性和方法
- Javascript 数组是什么
- Javascript 创建数组
- Javascript 数组赋值与取值
- Javascript 数组属性和方法
- Django封装交互接口代码
- 使用K.function()调试keras操作
- tensorflow图像裁剪进行数据增强操作
- ThinkPHP3.2.3框架Memcache缓存使用方法实例总结
- Python+PyQt5+MySQL实现天气管理系统
- 浅谈Python协程
- Python logging模块异步线程写日志实现过程解析
- php-fpm重启导致的程序执行中断问题详解
- Python Socket TCP双端聊天功能实现过程详解
- django 将自带的数据库sqlite3改成mysql实例
- 利用python对mysql表做全局模糊搜索并分页实例
- PHP chop()函数讲解
- Linux下安装Memcached服务器和客户端与PHP使用示例
- PHP xpath()函数讲解
- CentOS7编译安装php7.1的教程详解