RabbitMQ网络框架代码分析

时间:2022-07-24
本文章向大家介绍RabbitMQ网络框架代码分析,主要内容包括其使用实例、应用技巧、基本知识点总结和需要注意事项,具有一定的参考价值,需要的朋友可以参考一下。

RabbitMQ的启动步骤是一个有向无环图,具体细节后面另外章节再聊,其中网络一块的启动集中在文件rabbit_network.erl中

boot() ->
    ok = record_distribution_listener(),
    ok = start(),
    ok = boot_tcp(),
    ok = boot_ssl().

第1行往mnesia中插入监听信息,第4行是SSL的,我们也先不看,重点看下第2、3行代码。

1、start函数

start() -> rabbit_sup:start_supervisor_child(
             rabbit_tcp_client_sup, rabbit_client_sup,
             [{local, rabbit_tcp_client_sup},
              {rabbit_connection_sup,start_link,[]}]).

启动名为 rabbit_tcp_client_sup 的监督者进程,这个监督者进程的入口为

rabbit_client_sup:start_link,后面一长串为启动参数。

rabbit_tcp_client_sup 进程启动就做1件事:启动rabbit_connection_sup监督者进程。

而rabbit_connection_sup监督者进程会启动真正干活的 rabbit_reader进程,这个后面再细讲。

2、boot_tcp函数

boot_tcp() ->
    {ok, TcpListeners} = application:get_env(tcp_listeners),
    [ok = start_tcp_listener(Listener) || Listener <- TcpListeners],
    ok.

根据tcp_listeners配置调用 start_tcp_listener 函数;

start_tcp_listener会调用start_listener,后者会调用start_listener0

start_listener0(Address, Protocol, Label, OnConnect) ->
    Spec = tcp_listener_spec(rabbit_tcp_listener_sup, Address, tcp_opts(),
                             Protocol, Label, OnConnect),
    case supervisor:start_child(rabbit_sup, Spec) of
        {ok, _}                -> ok;
        {error, {shutdown, _}} -> {IPAddress, Port, _Family} = Address,
                                  exit({could_not_start_tcp_listener,
                                        {rabbit_misc:ntoa(IPAddress), Port}})
    end.

最终也是启动子进程,测试环境参数打印如下:

{'rabbit_tcp_listener_sup_:::5672',  
                  {tcp_listener_sup,start_link,
                      [{0,0,0,0,0,0,0,0},  //IPAddress
                       5672,  //Port
                       [inet6,binary,binary,
                        {packet,raw},
                        {reuseaddr,true},
                        {backlog,128},
                        {nodelay,true},
                        {linger,{true,0}},
                        {exit_on_close,false},
                        {active,false}], //SocketOpts
                       {rabbit_networking,tcp_listener_started,[amqp]}, //OnStartup
                       {rabbit_networking,tcp_listener_stopped,[amqp]}, //OnShutdown
                       {rabbit_networking,start_client,[]}, //AcceptCallback
                       "TCP Listener"]},
                  transient,infinity,supervisor,
                  [tcp_listener_sup]}

关注AcceptCallback函数,这个在每个连接进来的时候会调用,下面会再讲。

再以这些参数启动 tcp_listener_sup 监督者进程,后者再会启动 tcp_acceptor_sup和 tcp_listener子进程,tcp_acceptor_sup还是一个监督者进程,它会启动最终干活的 tcp_acceptor 进程。

是不是感觉有点晕,我们来整理下各进程关系:

tcp_listener 进程就是用来监听套接字的,它会调用 gen_tcp:listen 来监听套接字,下面会细讲。

上面我们整理了初始化顺序,那么一个连接过到底会经过哪些进程呢?

1)首先是tcp_listener

init({IPAddress, Port, SocketOpts,
      ConcurrentAcceptorCount, AcceptorSup,
      {M,F,A} = OnStartup, OnShutdown, Label}) ->
    process_flag(trap_exit, true),
    case gen_tcp:listen(Port, SocketOpts ++ [{ip, IPAddress},
                                             {active, false}]) of
        {ok, LSock} ->
            lists:foreach(fun (_) ->
                                  {ok, _APid} = supervisor:start_child(
                                                  AcceptorSup, [LSock])
                          end,
                          lists:duplicate(ConcurrentAcceptorCount, dummy)),
            {ok, {LIPAddress, LPort}} = inet:sockname(LSock),
            apply(M, F, A ++ [IPAddress, Port]),
            {ok, #state{sock = LSock,
                        on_startup = OnStartup, on_shutdown = OnShutdown,
                        label = Label}}
    end.

这里省略了一些异常分支代码。

tcp_listener进程在初始化时会调用 AcceptorSup 也就是 tcp_acceptor_sup 来启动工作线程,真正干活的是tcp_acceptor进程,这个进程会通过 prim_inet:async_accept 来异步 accespt连接,每当新连接进来时会调用 rabbit_networking:start_client 进行初始化(前面有说明),相关代码如下:

init({Callback, LSock}) ->
    gen_server:cast(self(), accept),
    {ok, #state{callback=Callback, sock=LSock}}.
    
accept(State = #state{sock=LSock}) ->
    case prim_inet:async_accept(LSock, -1) of
        {ok, Ref} -> {noreply, State#state{ref=Ref}};
        Error     -> {stop, {cannot_accept, Error}, State}
    end.

rabbit_networking:start_client 会启动rabbit_reader,然后向其发送go 消息

start_client(Sock, SockTransform) ->
    {ok, _Child, Reader} = supervisor:start_child(rabbit_tcp_client_sup, []),
    ok = rabbit_net:controlling_process(Sock, Reader),
    Reader ! {go, Sock, SockTransform},

再看看rabbit_reader是如何处理 go 消息的:

init(Parent, HelperSup) ->
    Deb = sys:debug_options([]),
    receive
        {go, Sock, SockTransform} ->
            start_connection(Parent, HelperSup, Deb, Sock, SockTransform)
    end.

start_connection然后会调用 recv_loop来启动消息循环,一直读包,然后解包,这块代码比较细,后面再细讲。

总结下,一个连接进程会经过的进程链如下,中间省略了监督者进程:

tcp_listener ——> tcp_acceptor ——> rabbit_reader