RabbitMQ网络框架代码分析二:命令分发

时间:2022-07-24
本文章向大家介绍RabbitMQ网络框架代码分析二:命令分发,主要内容包括其使用实例、应用技巧、基本知识点总结和需要注意事项,具有一定的参考价值,需要的朋友可以参考一下。

前面讲了基本的网络框架,说了一个连接进来会经过哪些文件,RabbitMQ网络框架代码分析,今天会讲消息的具体的处理过程,即一条消息过来,最终是如何分发到具体的处理函数的。

从rabbit_reader:start_connection说起吧,一个连接进来会调用这个函数,它会调用recvloop进行不断地读数据,并进行相应的处理:

start_connection(Parent, HelperSup, Deb, Sock, SockTransform) ->
  
    State = #v1{parent              = Parent,
                sock                = ClientSock,
                connection          = #connection{
                  name               = list_to_binary(Name),
                  host               = Host,
                  peer_host          = PeerHost,
                  port               = Port,
                  peer_port          = PeerPort,
                  protocol           = none,
                  user               = none,
                  timeout_sec        = (HandshakeTimeout / 1000),
                  frame_max          = ?FRAME_MIN_SIZE,
                  vhost              = none,
                  client_properties  = none,
                  capabilities       = [],
                  auth_mechanism     = none,
                  auth_state         = none,
                  connected_at       = rabbit_misc:now_to_ms(os:timestamp())},
                callback            = uninitialized_callback,
                recv_len            = 0,
                pending_recv        = false,
                connection_state    = pre_init,
                queue_collector     = undefined,  %% started on tune-ok
                helper_sup          = HelperSup,
                heartbeater         = none,
                channel_sup_sup_pid = none,
                channel_count       = 0,
                throttle            = #throttle{
                                         alarmed_by      = [],
                                         last_blocked_by = none,
                                         last_blocked_at = never}},

        run({?MODULE, recvloop,
             [Deb, [], 0, switch_callback(rabbit_event:init_stats_timer(
                                            State, #v1.stats_timer),
                                          handshake, 8)]})

注:这里删掉了一些非核心代码。

重点关注switch_callback 函数,其中第2个参数,表示当前由哪个函数处理,初始化时由 handshake 来处理;第3个参数将设置接下来要接收数据包的长度,初始化时8。

这里简单说下一次消息发送会发生哪些交互:

这是在测试环境截取的包,其中第一个包就是发送handshake包,具体格式如下:

这里不深入讨论具体命令的含义,有兴趣的同学可以查下相关资料。

再来看recvloop 函数:

Deb, Buf, BufLen, State = #v1{pending_recv = true}) ->
    mainloop(Deb, Buf, BufLen, State);
recvloop(Deb, Buf, BufLen, State = #v1{connection_state = blocked}) ->
    mainloop(Deb, Buf, BufLen, State);
recvloop(Deb, Buf, BufLen, State = #v1{connection_state = {become, F}}) ->
    throw({become, F(Deb, Buf, BufLen, State)});
recvloop(Deb, Buf, BufLen, State = #v1{sock = Sock, recv_len = RecvLen})
  when BufLen < RecvLen ->
    case rabbit_net:setopts(Sock, [{active, once}]) of
        ok              -> mainloop(Deb, Buf, BufLen,
                                    State#v1{pending_recv = true});
        {error, Reason} -> stop(Reason, State)
    end;
recvloop(Deb, [B], _BufLen, State) ->
    {Rest, State1} = handle_input(State#v1.callback, B, State),
    recvloop(Deb, [Rest], size(Rest), State1);
recvloop(Deb, Buf, BufLen, State = #v1{recv_len = RecvLen}) ->
    {DataLRev, RestLRev} = binlist_split(BufLen - RecvLen, Buf, []),
    Data = list_to_binary(lists:reverse(DataLRev)),
    {<<>>, State1} = handle_input(State#v1.callback, Data, State),
    recvloop(Deb, lists:reverse(RestLRev), BufLen - RecvLen, State1).

这里有6个分支,我们从上到下为 每一个分支编号,实际上调用的最多的是4,5,6这几个分支。

刚开始设置要读取8个字节,并且callback为 handshake ,所以走到分支4,

分支4会调用mainloop读取数据:

 Recv = rabbit_net:recv(Sock),
   
    case Recv of
        {data, Data} ->
            recvloop(Deb, [Data | Buf], BufLen + size(Data),
                     State#v1{pending_recv = false});

读到数据之后,来到分支5,

recvloop(Deb, [B], _BufLen, State) ->
    {Rest, State1} = handle_input(State#v1.callback, B, State),
    recvloop(Deb, [Rest], size(Rest), State1);

调用 handle_input 来处理数据,这里的callback是 handshake 。

handle_input也有不同的分支,我们看下 handshake 分支:

handle_input(handshake, <<"AMQP", A, B, C, D, Rest/binary>>, State) ->
    {Rest, handshake({A, B, C, D}, State)};
    
handshake({0, 0, 9, 1}, State) ->
    start_connection({0, 9, 1}, rabbit_framing_amqp_0_9_1, State);

start_connection({ProtocolMajor, ProtocolMinor, _ProtocolRevision},
                 Protocol,
                 State = #v1{sock = Sock, connection = Connection}) ->
    Start = #'connection.start'{
      version_major = ProtocolMajor,
      version_minor = ProtocolMinor,
      server_properties = server_properties(Protocol),
      mechanisms = auth_mechanisms_binary(Sock),
      locales = <<"en_US">> },
    ok = send_on_channel0(Sock, Start, Protocol),
    switch_callback(State#v1{connection = Connection#connection{
                                            timeout_sec = ?NORMAL_TIMEOUT,
                                            protocol = Protocol},
                             connection_state = starting},
                    frame_header, 7).

handshake会调用start_connection, 后者会向客户端回一个connection.start的包,并且将callback设置为frame_header。

frame_header就是读取消息头,大部分消息头都是一个固定的格式:

Type:1字节

Channel:1字节

Length:4字节

Body:长度为上面的Length

看下frame_header的处理:

handle_input(frame_header, <<Type:8,Channel:16,PayloadSize:32, Rest/binary>>,
             State) ->
    {Rest, ensure_stats_timer(
             switch_callback(State,
                             {frame_payload, Type, Channel, PayloadSize},
                             PayloadSize + 1))};

它会将 callback设置为 frame_payload,

handle_input({frame_payload, Type, Channel, PayloadSize}, Data, State) ->
    <<Payload:PayloadSize/binary, EndMarker, Rest/binary>> = Data,
    case EndMarker of
        ?FRAME_END -> State1 = handle_frame(Type, Channel, Payload, State),
                      {Rest, switch_callback(State1, frame_header, 7)};
        _          -> fatal_frame_error({invalid_frame_end_marker, EndMarker},
                                        Type, Channel, Payload, State)
    end;

它会调用handle_frame 来分发命令,

handle_frame(Type, 0, Payload,
             State = #v1{connection = #connection{protocol = Protocol}})
  when ?IS_STOPPING(State) ->
    case rabbit_command_assembler:analyze_frame(Type, Payload, Protocol) of
        {method, MethodName, FieldsBin} ->
            handle_method0(MethodName, FieldsBin, State);
        _Other -> State
    end;

handle_frame 会调用rabbit_command_assembler:analyze_frame 来拆包,拆出来的就是具体的命令了,这里的Protocol就是0_9_0_1,对应src/rabbit_framing_amqp_0_9_1.erl。

lookup_method_name({10, 10}) -> 'connection.start';
lookup_method_name({10, 11}) -> 'connection.start_ok';
lookup_method_name({10, 20}) -> 'connection.secure';
lookup_method_name({10, 21}) -> 'connection.secure_ok';
lookup_method_name({10, 30}) -> 'connection.tune';
lookup_method_name({10, 31}) -> 'connection.tune_ok';

这里根据不同的字节流得到相应的命令,PS:erlang做这些确实很简单~

得到命令之后就是执行命令了,由 handle_method0 来执行,这里就不详细讲了,可以自己看下代码

handle_method0(#'connection.start_ok'{mechanism = Mechanism,
                                      response = Response,
                                      client_properties = ClientProperties},
               State0 = #v1{connection_state = starting,
                            connection       = Connection,
                            sock             = Sock}) ->
    AuthMechanism = auth_mechanism_to_module(Mechanism, Sock),
    Capabilities =
        case rabbit_misc:table_lookup(ClientProperties, <<"capabilities">>) of
            {table, Capabilities1} -> Capabilities1;
            _                      -> []
        end,
    State = State0#v1{connection_state = securing,
                      connection       =
                          Connection#connection{
                            client_properties = ClientProperties,
                            capabilities      = Capabilities,
                            auth_mechanism    = {Mechanism, AuthMechanism},
                            auth_state        = AuthMechanism:init(Sock)}},
    auth_phase(Response, State);

一部分消息处理命令在 rabbit_reader.erl中,还是些是关于channel的,相应命令的 rabbit_channel.erl文件中。

最后我们总结下,消息分发流程:

1、start_connection

2、recvloop,它会调用mainloop读取数据

3、handle_input 处理数据

正常来说消息分这几步:

约定协议:handshake

读取消息头:header

读取消息body:payload

payload的格式是:

Type:1字节

Channel:1字节

Length:4字节

Body:Length字节

4、消息处理

主要处理函数是handle_frame,拆包之后调用不同的handle_method0函数。