MongoDB内核:主从同步之源码剖析

时间:2022-07-26
本文章向大家介绍MongoDB内核:主从同步之源码剖析,主要内容包括其使用实例、应用技巧、基本知识点总结和需要注意事项,具有一定的参考价值,需要的朋友可以参考一下。

导语:前面文章提到了MongoDB的复制集协议是一种raft-like的协议。其中一点差别就是关于log的拉取和回放。本文将尝试结合代码深入探究主从同步中一些细节。(PS:本文代码和分析基于源码版本V4.0.3版本。水平有限,文章中有错误或理解不当的地方,还望指出,共同学习)

一、主从同步的大致流程

之前的文章提到过,MongoDB复制集协议采用的是pull而不是push的方式。也就是说从节点定期去主节点的oplog集合中拉取最新的操作并应用到自身中。

主从同步流程.png

大致的流程如下(与图中编号并不对应):

  1. 从节点上的 rsBackgroundSync 后台线程通过 find/getmore 命令到主节点上获取oplog,并放入到 OplogBuffer中;
  2. replBatcher 线程感知到OplogBuffer中的数据并消费,保存到OpQueue中;
  3. OplogApplier 线程感知OpQueue中的新数据,通过多个(默认16个)worker线程回放Oplog,并更新lastAppliedOpTimelastDurableOpTime
  4. 从节点上的 SyncSourceFeedback 后台线程感知到有新数据写入成功,将自身最新的 lastAppliedOpTimelastDurableOpTime等信息通过 replSetUpdatePosition 内部命令返回给主节点;
  5. 主节点接受到各个从节点 最新的 lastAppliedOpTimelastDurableOpTime,计算大多数节点(包括自己)当前的数据同步进展,并更新 lastCommittedOpTime

这里oplog的拉取和回放可以理解为是一个“单个生产者多个消费者”的生产者-消费者模型。彼此是独立的,正常情况下相互不阻塞。

二、代码细节

当节点处于SECONDARY状态时,BackgroundSync线程是一个死循环,每次循环中它都会完成从节点从其同步源上获取oplog并应用到自身的过程。

以一个从节点的视角出发,主从同步可以大致分为如下几个阶段:

2.1 获得一个同步源

SyncSourceResolver负责获取一个同步源的工作,代码路径如下:

SyncSourceResolver::startup()-->_chooseAndProbeNextSyncSource()-->_chooseNewSyncSource()-->chooseNewSyncSource()-->ReplicationCoordinator::chooseNewSyncSource()-->TopologyCoordinator::chooseNewSyncSource()

 // find a target to sync from the last optime fetched
    {
        OpTime minValidSaved;
        {
            auto opCtx = cc().makeOperationContext();
            minValidSaved = _replicationProcess->getConsistencyMarkers()->getMinValid(opCtx.get());
        }
        stdx::lock_guard<stdx::mutex> lock(_mutex);
        if (_state != ProducerState::Running) {
            return;
        }
        const auto requiredOpTime = (minValidSaved > _lastOpTimeFetched) ? minValidSaved : OpTime();
        lastOpTimeFetched = _lastOpTimeFetched;
        if (!_syncSourceHost.empty()) {
            log() << "Clearing sync source " << _syncSourceHost << " to choose a new one.";
        }
        _syncSourceHost = HostAndPort();
        _syncSourceResolver = stdx::make_unique<SyncSourceResolver>(
            _replicationCoordinatorExternalState->getTaskExecutor(),
            _replCoord,
            lastOpTimeFetched,
            requiredOpTime,
            [&syncSourceResp](const SyncSourceResolverResponse& resp) { syncSourceResp = resp; });
    }

    auto status = _syncSourceResolver->startup();
    if (ErrorCodes::CallbackCanceled == status || ErrorCodes::isShutdownError(status.code())) {
        return;
    }

其中ReplicationCoordinator对象负责协调副本集与系统其余部分的交互。这里TopologyCoordinator::chooseNewSyncSource大致的逻辑如下:

  1. 根据用户参数forceSyncSoureCandidate选择同步源,并check。如果同步源无效或者不属于副本集或者处于黑名单中都会失败,否则会返回指定的同步源;
  2. 尝试之前用过的同步源;
  3. 等待拓扑结构中节点的ping结果;
  4. 然后会有关链式复制的选择,如果不允许链式复制的话,那么就将当前的主节点作为同步源并返回;
  5. 否则将尝试找到一个ping时间最短并且oplog比当前节点更新的节点;
  6. 获取主节点的oplogTime;
  7. 两轮尝试做降级筛选,遍历副本集成员节点。在第一轮尝试中忽略以下节点:没有投票权、hidden状态、与主节点的差距太大的节点;否则会忽略以下节点:buildIndex参数不同的节点、oplog落后于自身的节点、黑名单中的节点。当然了如果第一轮就找到了理想的同步源,自然也就不需要第二轮了。
  8. 无论是否找到同步源都会输出日志并返回同步源(没找到时会返回":27017");

如果没有节点满足必要条件,则BackgroundSync等待1秒钟,然后重新开始同步源选择过程。

2.2 oplog拉取

这一过程是由oplogFetcher完成的,也发生于BackgroundSync阶段中。代码路径如下:

oplogFetcher::startup()-->AbstractAsyncComponent::startup()-->AbstractOplogFetcher::_doStartup_inlock()-->AbstractOplogFetcher::_makeAndScheduleFetcherCallback()-->OplogFetcher::_makeFindCommandObject()-->AbstractOplogFetcher::_makeFetcher()-->AbstractOplogFetcher::_callback()-->OplogFetcher::_onSuccessfulBatch()

_makeFindCommandObject()中,我们可以看到其生成的oplog查询语句的细节。

  • 指定了namespace
  • 过滤条件指定了大于lastOpTimeFetched
  • 指定了{oplogReplay:true}选项。oplogReplay表明拉取oplog的目的是为了回放
  • 使用了{tailable:true}{awaitData:true}选项。tailable cursor是类似于tail -f命令的操作,作用与像log.rs这样的Capped Collection上,使得我们可以不关闭cursor而从中持续不断地读出新的数据。awaitData参数的目的在于阻塞批处理。设置为true时,当tailable cursor遍历到集合末尾时,会在一段时间内阻塞查询线程,等待新的写入到来。当新写入插入该集合中时,阻塞线程会被唤醒并将这一批数据返回给客户端。
  • 指定了超时时间(default 60s
  • 指定了batch大小(default 16M/ 12 * 10)
oplogFetcher生成oplog查询语句的代码片段.png

综合理解上面的查询条件,得到以下几个结论:

  • oplog表不包含_id,没办法走索引,查询的初始扫描是比较耗性能的
  • 查询的过滤条件为:大于或等于上一次拉取的最后一个oplog条目的时间戳。由于这里是$gte,所以应始终至少返回一个文档
  • 每次拉取要么满足一批的大小限制,要么满足一批的时间限制。都是批量拉取而非单条拉取(可以有效减少网络传输消耗)

OplogFetcher::_onSuccessfulBatch()处理成功拉回一批oplog的结果,更新自己的_lastFetched视图,并会返回下一次需要发送的getMore命令。其大致逻辑如下:

  1. checkRemoteOplogStart()检查第一批拉回来的oplog结果。如果在同步源中找不到刚刚拉取的操作的optime,则会返回OplogStartMissing的错误;
  2. validateDocuments()检验文档的合法性,在这里检查oplog乱序等问题;
  3. BackgroundSync::_enqueueDocuments()oplogFetcher拉取到的结果放入oplogBuffer中;
  4. shouldStopFetching()处理一些需要停止oplog拉取的错误场景;
  5. makeGetMoreCommandObject()根据当前的cursorId来生成新的getMore命令;

外层的BackgroundSync会根据上面提到的fetcherReturnStatus返回的状态码进行相应的处理

  • oplog乱序:输出错误日志并return
  • 同步源上找不到需要的oplog: 进入Rollbak流程
  • 非法BSON:将同步源加入黑名单60s
  • 其他错误:输出错误日志

2.3 并发回放

这一过程是由oplogBUffer+oplogApplier完成。前者主要将拉到的oplog缓存在本地,pushAllNonBlocking()中会遍历所有的oplog条目并条用存储层的接口insertDocuments()

后者的代码路径如下:

OplogApplier::startup()-->SyncTail::oplogApplication()-->SyncTail::_oplogApplication()-->SyncTail::multiApply()-->multiSyncApply()

其中,oplogApplier会启动一个新的ReplBatcher线程,它会不断尝试load可能动态更改的replBatchLimitBytesreplBatchLimitOperations参数,然后调用SyncTail::tryPopAndWaitForMore()

tryPopAndWaitForMore()中会尝试从oplogBuffer中取数据并保存到OpQueue里。有以下几种情况会等待数据长达1s:

1)oplogBufferoplogQueue均为空;

2)设置了延迟节点,拉回来的oplog还不满足延迟条件;

SyncTail::_consume()用于消费数据,但这里有关于DDL操作的额外处理逻辑。当遇到这种操作(包括:create,renameCollection, dbCheck, drop, collMod, dropDatabse, emptyCapped, convertToCapped, createIndexes, dropIndexes。注:applyOps除外)时,将会从批处理转成单条处理的方式。

bool SyncTail::tryPopAndWaitForMore(OperationContext* opCtx,
                                    OplogBuffer* oplogBuffer,
                                    SyncTail::OpQueue* ops,
                                    const BatchLimits& limits) {
    {
        BSONObj op;
        // Check to see if there are ops waiting in the bgsync queue
        bool peek_success = oplogBuffer->peek(opCtx, &op);
        if (!peek_success) {
            // If we don't have anything in the queue, wait a bit for something to appear.
            if (ops->empty()) {
                if (inShutdown()) {
                    ops->setMustShutdownFlag();
                } else {
                    // Block up to 1 second. We still return true in this case because we want this
                    // op to be the first in a new batch with a new start time.
                    oplogBuffer->waitForData(Seconds(1));
                }
            }
            return true;
        }

        if (!ops->empty() && (ops->getBytes() + size_t(op.objsize())) > limits.bytes) {
            return true;  // Return before wasting time parsing the op.
        }
        ops->emplace_back(std::move(op));  // Parses the op in-place.
    }

    auto& entry = ops->back();

    auto entryTime = Date_t::fromDurationSinceEpoch(Seconds(entry.getTimestamp().getSecs()));
    if (limits.slaveDelayLatestTimestamp && entryTime > *limits.slaveDelayLatestTimestamp) {

        ops->pop_back();  // Don't do this op yet.
        if (ops->empty()) {
            sleepsecs(1);
        }
        return true;
    }
    
    // !关于非CURD操作的处理!
     if ((entry.isCommand() && entry.getCommandType() != OplogEntry::CommandType::kApplyOps) ||
        entry.getNamespace().isSystemDotViews()) {
        if (ops->getCount() == 1) {
            // apply commands one-at-a-time
            _consume(opCtx, oplogBuffer);
        } else {
            // This op must be processed alone, but we already had ops in the queue so we can't
            // include it in this batch. Since we didn't call consume(), we'll see this again next
            // time and process it alone.
            ops->pop_back();
        }

        // Apply what we have so far.
        return true;
    }

    // We are going to apply this Op.
    _consume(opCtx, oplogBuffer);

    // Go back for more ops, unless we've hit the limit.
    return ops->getCount() >= limits.ops;

而对于_oplogApplication(),其处理逻辑大致如下:

  1. getNextBatch()opQueue中取一批,超时为1s。如果没取到则继继续下一次循环;
  2. 获取这一批oplog条目的第一条和最后一条oplogTime以及自身的lasterAppliedOpTime,如果第一条opTime比本地已经apply的opTime还要小的话,返回oplog乱序的错误——OplogOutOfOrder(当然,基本不会出现);
  3. multiApply()进行oplog并发回放;它会返回一个本次apply中最后一条oplog的OpTime,肯定会等于第二步中获取的批处理中最后一条opTime;
  4. 更新自己关于last applied optime的视图并持久化;
  5. oplogDiskLocRegister()通知存储引擎来更新这一批已applied oplog的可见性;

SyncTail::multiApply()中,multikeyVector是用于并发回放的线程池。multiApply()的大致逻辑如下:

  1. scheduleWritesToOplog()将oplog写入本地oplog集合;
  2. fillWriterVectors()将待处理的一批oplog分发到不同的回放线程;
  3. ThreadPool::waitForIdle()等待上一次multiApply完成;
  4. applyOps()进行oplog回放;
  5. replicationBatchIsComplete()通知存储引擎这一批oplog已经完成了回放,这意味着所有跟这一批oplog条目相关的写入都结束了,不会再有新的写入操作了;
  6. 返回写入成功的最后一条oplog的opTime

先来看看oplog分发的逻辑——fillWriterVectors(),它会遍历这一批待回放的oplog

如果是CURD的操作(指insert,delete,update),通过getIdElement()取出其操作的文档_id并计算hash值,当然对于update命令需要去o里面取。然后以nampespace得到的hash值作为murmur哈希的seed为_id的hash值计算出一个新的hash值来标识一条oplog。

oplog分发hash代码截图1.png

然后使用该hash值直接对回放线程池大小进行取模,来决定一条oplog应该分发到哪个线程。

oplog分发hash代码截图2.png

上面的逻辑保证了对于同一个doc操作的oplog(_id一致)会在一个回放线程中完成回放,而oplog的时间顺序性保证了这些操作的顺序回放。


再来看看oplog回放的逻辑——applyOps,代码实现比较简洁。遍历线程池,每个负责回放的线程都会调用multiSyncApply()函数。

applyOps代码片段.png

writerPool是由ReplicationCoordinatorExternalStateImpl::startThreads()中调用SyncTail::makeWriterPool()生成的,会使用replWriterThreadCount(缺省为16)作为线程池的线程数。

调用链为:ReplicationCoordinatorExternalStateImpl::startThreads()-->SyncTail::makeWriterPool()-->ThreadPool::startup()-->_startWorkerThread_inlock()-->ThreadPool::_workerThreadBody()-->ThreadPool::_consumeTasks()-->ThreadPool::_doOneTask()

multiSyncApply()中首先用stableSortByNamespace()将这一批oplog按namespace排序。然后InsertGroup::groupAndApplyInserts()尝试将一批对同一个namespace的insert组成一个批量insert操作;当然如果没办法变成批处理也只好单条处理。最后调用SyncTail::syncApply(),里面会根据不同的op类型进行不同的处理,非DDL操作会调用applyOperation_inlock(),DDL操作会调用applyCommand_inlock()

auto opType = OpType_parse(IDLParserErrorContext("syncApply"), op["op"].valuestrsafe());

    if (opType == OpTypeEnum::kNoop) {	//空操作
        Lock::DBLock dbLock(opCtx, nss.db(), MODE_X);
        OldClientContext ctx(opCtx, nss.ns());
        return applyOp(ctx.db());
    } else if (opType == OpTypeEnum::kInsert && nss.isSystemDotIndexes()) {// 对于system.indexes的'特殊'insert操作
        Lock::DBLock dbLock(opCtx, nss.db(), MODE_X);
        OldClientContext ctx(opCtx, nss.ns());
        return applyOp(ctx.db());
    } else if (OplogEntry::isCrudOpType(opType)) { //其他的CURD操作
        return writeConflictRetry(opCtx, "syncApply_CRUD", nss.ns(), [&] {
            try {
                AutoGetCollection autoColl(opCtx, getNsOrUUID(nss, op), MODE_IX);
                auto db = autoColl.getDb();
                OldClientContext ctx(opCtx, autoColl.getNss().ns(), db);
                return applyOp(ctx.db());
            } catch (ExceptionFor<ErrorCodes::NamespaceNotFound>& ex) {
                ex.addContext(str::stream() << "Failed to apply operation: " << redact(op));
                throw;
            }
        });
    } else if (opType == OpTypeEnum::kCommand) { //DDL操作
        return writeConflictRetry(opCtx, "syncApply_command", nss.ns(), [&] {
            // a command may need a global write lock. so we will conservatively go
            // ahead and grab one here. suboptimal. :-(
            Lock::GlobalWrite globalWriteLock(opCtx);  //!注意:这里加的是全局的写锁!

            // special case apply for commands to avoid implicit database creation
            Status status = applyCommand_inlock(opCtx, op, oplogApplicationMode);
            incrementOpsAppliedStats();
            return status;
        });
    }
  • 可以看到,除了对于DDL操作需要加全局写锁之外,其他的oplog apply操作都只需要db维度的锁即可。
  • 同一批次内的Oplog条目不一定按顺序apply
  • 对文档的操作必须是原子性的且有序的,因此对同一文档的操作将放在要序列化的同一线程上
  • 此外,$cmd操作将以大小为1的批处理顺序进行

然后在applyOperation_inlock()中对不同的op操作类型(n,i,u,d)进行了不同的处理,最终都是WriteUnitOfWork进行一次事务写操作并提交wuow.commit()。其中插入操作也会尝试进行批处理,以提高性能。

2.4 返回给主节点

这一过程由syncSourceFeedback完成。它会将自身最新的 lastAppliedOpTimelastDurableOpTime等信息通过replSetUpdatePosition内部命令返回给主节点。

ReplicationCoordinatorExternalState在启动时创建一个SyncSourceFeedback对象,该对象负责发送replSetUpdatePosition命令。

SyncSourceFeedback会启动一个循环。 在每次迭代中,它首先等待条件变量,每当ReplicationCoordinator发现副本集中的某个节点复制了更多操作并更新为最新状态时,该条件变量就会被通知。 在继续之前,它会检查它是否不处于PRIMARYSTARTUP状态。

然后,它获取节点的同步源,并创建一个Reporter,由该ReporterreplSetUpdatePosition命令发送到同步源。 该命令每隔keepAliveInterval毫秒(也就是(electionTimeout / 2))保持发送,以维护有关副本集中节点的活动信息。

replSetUpdatePosition命令包含以下信息:

  1. 一个opTimes数组,其中包含每个活动副本集成员的对象。 该信息由ReplicationCoordinator使用其SlaveInfo中的信息填充。 不包括被认为是挂掉的节点。 每个节点都包含以下信息:
- `last durable OpTime`
- `last applied OpTime`
- 成员ID
- `ReplicaSetConfig`版本
  1. ReplSetMetadata,副本集元数据,包括以下信息
- 上游节点的`last commited OpTime`
- 当前term
- `ReplicaSetConfig`的版本和term
- 副本集ID
- 上游节点是否为主

2.5 补充说明

可以看到,2.1~2.4中的任务分别由不同的线程进行处理,是相互独立的,他们都是由ReplicationCoordinatorExternalStateImpl::startSteadyStateReplication)()启动的(SyncSourceResolveroplogFetcher的启动在BackgroundSync::startup()内部):

	log() << "Starting replication fetcher thread";	
	_oplogBuffer = stdx::make_unique<OplogBufferBlockingQueue>();
    _oplogBuffer->startup(opCtx);

    _bgSync =
        stdx::make_unique<BackgroundSync>(replCoord, this, _replicationProcess, _oplogBuffer.get());
    _bgSync->startup(opCtx);
_oplogApplier = stdx::make_unique<OplogApplier>(_oplogApplierTaskExecutor.get(),
                                                    _oplogBuffer.get(),
                                                    _bgSync.get(),
                                                    replCoord,
                                                    _replicationProcess->getConsistencyMarkers(),
                                                    _storageInterface,
                                                    OplogApplier::Options(),
                                                    _writerPool.get());
    _oplogApplierShutdownFuture = _oplogApplier->startup();

    log() << "Starting replication reporter thread";
	auto bgSyncPtr = _bgSync.get();
    _syncSourceFeedbackThread = stdx::make_unique<stdx::thread>([this, bgSyncPtr, replCoord] {
        _syncSourceFeedback.run(_taskExecutor.get(), bgSyncPtr, replCoord);
    });

三、结论

  • 内核中关于主从同步这一部分的代码相对比较清晰,不同的模块(线程)负责不同的工作,共同保证MongoDB的主从同步。
  • oplog并发回放以保证主从同步性能,并发中对于DDL又有加全局写锁的”串行“操作。
  • 为什么在oplogBufferoplogApplier中间要加一层opQueue以及ReplBatcher呢?一方面将oplog变得尽可能平滑,减少源端写入不均带来的影响;另一方面要做“并行--串行--并行”这样的转换操作,保证DDL是串行处理的(一个batch里面只有单条DDL操作,只会发送给后端16个回放线程中的一个)。
  • oplog并发回放先按namesapce排序,然后再_idhash到不同线程进行回放。同一批次内的oplog并不是按顺序apply的。按namespace排序应该是为了更好地利用局部性原理(同一个ns内的操作在相同的cache、内存或磁盘扇区的概率更大)

参考资料

mongodb source code

replication-internals

MongoDB主从复制介绍和常见问题说明