RabbitMQ源代码分析系列三:消息存储

时间:2022-07-24
本文章向大家介绍RabbitMQ源代码分析系列三:消息存储,主要内容包括其使用实例、应用技巧、基本知识点总结和需要注意事项,具有一定的参考价值,需要的朋友可以参考一下。

今天分析RabbitMQ消息的持久化,即客户端发送一条持久化的MQ消息后,服务端做了哪些事情。

下面是客户端的发送代码:

$client = new Client('127.0.0.1', 5672, 'guest', 'guest');
        //设置正常交换机、队列
        $type = 'topic';
        $routingKey = 'hello';
        $exchangeName = 'hello_exchange'
        $exchange = new Exchange($client, $exchangeName, $type);
        $exchange->setDurable(true);

        //队列
        $queue = new Queue(
            $client, $this->queueName, [
                new Consumer(
                    function (AMQPMessage $msg) {
                        var_dump($msg);
                    }
                ),
            ]
        );

        $binding = new Binding($exchange, $queue);
        $binding->setRoutingKey($routingKey);

        $client->register($binding);

        $message = new Message("hello" . str_repeat('123456789', 13));
        $res     = $exchange->publish($message, $routingKey);

分析下网络包,发送消息的时候,实际上是往服务端发送basic.publish命令。

调用链分析

入口在rabbit_channel文件:

handle_method(#'basic.publish'{exchange    = ExchangeNameBin,
                               routing_key = RoutingKey,
                               mandatory   = Mandatory},
              Content, State = #ch{virtual_host    = VHostPath,
                                   tx              = Tx,
                                   channel         = ChannelNum,
                                   confirm_enabled = ConfirmEnabled,
                                   trace_state     = TraceState,
                                   user            = #user{username = Username},
                                   conn_name       = ConnName,
                                   delivery_flow   = Flow}) ->

  ……
    case rabbit_basic:message(ExchangeName, RoutingKey, DecodedContent) of
        {ok, Message} ->
            Delivery = rabbit_basic:delivery(
                         Mandatory, DoConfirm, Message, MsgSeqNo),
            QNames = rabbit_exchange:route(Exchange, Delivery),
            DQ = {Delivery#delivery{flow = Flow}, QNames},
            {noreply, case Tx of
                          none         -> deliver_to_queues(DQ, State1);
                          {Msgs, Acks} -> Msgs1 = queue:in(DQ, Msgs),
                                          State1#ch{tx = {Msgs1, Acks}}
                      end};
    end;

上面删除了一些非关键代码,这里看是否有事务,如果没事务则 通过 deliver_to_queues 发送, 有事务先进队列,今天主要分析无事务的处理过程。

deliver_to_queues({Delivery = #delivery{message    = Message = #basic_message{
                                                       exchange_name = XName},
                                        mandatory  = Mandatory,
                                        confirm    = Confirm,
                                        msg_seq_no = MsgSeqNo},
                   DelQNames}, State = #ch{queue_names    = QNames,
                                           queue_monitors = QMons}) ->
    Qs = rabbit_amqqueue:lookup(DelQNames),
    DeliveredQPids = rabbit_amqqueue:deliver(Qs, Delivery),

后者调用 rabbit_amqqueue:deliver 来处理:

deliver(Qs, Delivery = #delivery{flow = Flow}) ->
    {MPids, SPids} = qpids(Qs),
    QPids = MPids ++ SPids,

    MMsg = {deliver, Delivery, false},
    SMsg = {deliver, Delivery, true},
    delegate:cast(MPids, MMsg),
    delegate:cast(SPids, SMsg),
    QPids.

deliver的逻辑就 比较简单,分主、从进程ID,如果没有开启镜像队列,从进程ID是空的,今天先不分析镜像队列。

发送deliver消息到主进程,这个进程是rabbit-amqueue-process。

再来看rabbit-amqueue-process是如何处理的:

handle_cast({deliver, Delivery = #delivery{sender = Sender,
                                           flow   = Flow}, SlaveWhenPublished},
            State = #q{senders = Senders}) ->
    
  %% SlaveWhenPublished 只有在从的时候才为true
    noreply(deliver_or_enqueue(Delivery, SlaveWhenPublished, State1));

中间的代码还比较多,就不一一贴了,大概说下,deliver_or_enqueue会调用attempt_delivery,然后调用到rabbit-variable-queue:publish

publish(Msg = #basic_message { is_persistent = IsPersistent, id = MsgId },
        MsgProps = #message_properties { needs_confirming = NeedsConfirming },
        IsDelivered, _ChPid, _Flow,
        State = #vqstate { q1 = Q1, q3 = Q3, q4 = Q4,
                           qi_embed_msgs_below = IndexMaxSize,
                           next_seq_id         = SeqId,
                           in_counter          = InCount,
                           durable             = IsDurable,
                           unconfirmed         = UC }) ->
    IsPersistent1 = IsDurable andalso IsPersistent,
    MsgStatus = msg_status(IsPersistent1, IsDelivered, SeqId, Msg, MsgProps, IndexMaxSize),
     {MsgStatus1, State1} = maybe_write_to_disk(false, false, MsgStatus, State),
   

调用maybe_write_to_disk 进行消息的持久化:

maybe_write_to_disk(ForceMsg, ForceIndex, MsgStatus, State) ->
    {MsgStatus1, State1} = maybe_write_msg_to_disk(ForceMsg, MsgStatus, State),
    maybe_write_index_to_disk(ForceIndex, MsgStatus1, State1).

maybe_write_msg_to_disk用来将消息持久化,maybe_write_index_to_disk用来将索引持久化。

maybe_write_msg_to_disk(Force, MsgStatus = #msg_status {
                                 msg = Msg, msg_id = MsgId,
                                 is_persistent = IsPersistent },
                        State = #vqstate{ msg_store_clients = MSCState,
                                          disk_write_count  = Count})
  when Force orelse IsPersistent ->
    case persist_to(MsgStatus) of
        msg_store   -> ok = msg_store_write(MSCState, IsPersistent, MsgId,
                                            prepare_to_store(Msg)),
                       {MsgStatus#msg_status{msg_in_store = true},
                        State#vqstate{disk_write_count = Count + 1}};
        queue_index -> {MsgStatus, State}
    end;

如果消息大小小于配置文件中的queue_index_embed_msgs_below,

则persist_to返回queue_index,反之返回 msg_store,这个参数默认是4096,即如果消息体大小小于4096,是不会将消息写到消息持久化文件,而是写到索引文件中。

消息的持久化由文件rabbit_msg_store负责,msg_store_write会调用write_message进行消息的保存:

write_message(MsgId, Msg,
              State = #msstate { current_file_handle = CurHdl,
                                 current_file        = CurFile,
                                 sum_valid_data      = SumValid,
                                 sum_file_size       = SumFileSize,
                                 file_summary_ets    = FileSummaryEts }) ->
    {ok, CurOffset} = file_handle_cache:current_virtual_offset(CurHdl),
    {ok, TotalSize} = rabbit_msg_file:append(CurHdl, MsgId, Msg),

  %%io:format("insert index, msg_id:~p, file:~p, offset:~p, total_size:~p ~n", [MsgId, CurFile, CurOffset, TotalSize]),
    ok = index_insert(
           #msg_location { msg_id = MsgId, ref_count = 1, file = CurFile,
                           offset = CurOffset, total_size = TotalSize }, State),
    [#file_summary { right = undefined, locked = false }] =
        ets:lookup(FileSummaryEts, CurFile),
    [_,_] = ets:update_counter(FileSummaryEts, CurFile,
                               [{#file_summary.valid_total_size, TotalSize},
                                {#file_summary.file_size,        TotalSize}]),
    maybe_roll_to_new_file(CurOffset + TotalSize,
                           State #msstate {
                             sum_valid_data = SumValid    + TotalSize,
                             sum_file_size  = SumFileSize + TotalSize }).

这里的逻辑就比较简单了,将消息内容到当前文件,再判断当前文件的大小,如果需要,则创建一个新的持久化文件。

这里讲一下segment,每个segment对应一个文件(所在的目录在mnesia数据目录下的msg_store_persistent)。每个文件最多可以保存SEGMENT_ENTRY_COUNT(16384)个消息索引信息。

这些文件是以整数来命名的,某条消息对应哪个segment文件呢?用消息索引本身对SEGMENT_ENTRY_COUNT取整,相关代码可以看下

rabbit_queue_index:add_to_journal。

最后再看索引的持久化

maybe_write_index_to_disk(Force, MsgStatus = #msg_status {
                                   msg           = Msg,
                                   msg_id        = MsgId,
                                   seq_id        = SeqId,
                                   is_persistent = IsPersistent,
                                   is_delivered  = IsDelivered,
                                   msg_props     = MsgProps},
                          State = #vqstate{target_ram_count = TargetRamCount,
                                           disk_write_count = DiskWriteCount,
                                           index_state      = IndexState})
  when Force orelse IsPersistent ->
{MsgOrId, DiskWriteCount1} =
        case persist_to(MsgStatus) of
            msg_store   -> {MsgId, DiskWriteCount};
            queue_index -> {prepare_to_store(Msg), DiskWriteCount + 1}
        end,
    IndexState1 = rabbit_queue_index:publish(
                    MsgOrId, SeqId, MsgProps, IsPersistent, TargetRamCount,
                    IndexState),

索引通过rabbit_queue_index:publish 来落盘:

publish(MsgOrId, SeqId, MsgProps, IsPersistent, JournalSizeHint,
        State = #qistate{unconfirmed     = UC,
                         unconfirmed_msg = UCM}) ->
    MsgId = case MsgOrId of
                #basic_message{id = Id} -> Id;
                Id when is_binary(Id)   -> Id
            end,
    ?MSG_ID_BYTES = size(MsgId),
  %%JournalHd1对应journal.jif
    {JournalHdl, State1} =
        get_journal_handle(
          case {MsgProps#message_properties.needs_confirming, MsgOrId} of
              {true,  MsgId} -> UC1  = gb_sets:add_element(MsgId, UC),
                                State#qistate{unconfirmed     = UC1};
              {true,  _}     -> UCM1 = gb_sets:add_element(MsgId, UCM),
                                State#qistate{unconfirmed_msg = UCM1};
              {false, _}     -> State
          end),
    file_handle_cache_stats:update(queue_index_journal_write),
    {Bin, MsgBin} = create_pub_record_body(MsgOrId, MsgProps),
    ok = file_handle_cache:append(
           JournalHdl, [<<(case IsPersistent of
                               true  -> ?PUB_PERSIST_JPREFIX;
                               false -> ?PUB_TRANS_JPREFIX
                           end):?JPREFIX_BITS,
                          SeqId:?SEQ_BITS, Bin/binary,
                          (size(MsgBin)):?EMBEDDED_SIZE_BITS>>, MsgBin]),
      maybe_flush_journal(
      JournalSizeHint,
      add_to_journal(SeqId, {IsPersistent, Bin, MsgBin}, State1)).

索引文件会先写到 journal缓存中,再定期刷到磁盘中,相关参数为

queue_index_max_journal_entries,

判断当前写入次数是否达到queue_index_max_journal_entries,是则进行刷盘到索引持久化文件。

实际刷盘是在 rabbit_variable_queue:handle_pre_hibernate中异步去刷的,这里不详述。

索引持久化文件在mnesia目录的queues目录下,文件扩展名为idx。

如何保证消息的不丢呢,即如果写入journal文件成功了,但没有刷新到索引的持久化文件中如何恢复,可以看下代码 rabbit_variable_queue:init, RabbitMQ启动的时候启动每个队列之前会调用它来从journal中恢复索引和消息。

最后总结

持久化分消息体和索引的持久化,如果消息体小于queue_index_embed_msgs_below,则将消息写入到索引文件中,只进行1次磁盘操作,反之要写2次磁盘:消息体+索引,消息体写入到segment文件中,一个segment可以保存16384条消息。

为了加快写入的性能,写入消息体时是追加方式进行的;索引的持久化则是先追加到journal文件中,再异步刷新到索引文件中。