RabbitMQ网络框架代码分析
RabbitMQ的启动步骤是一个有向无环图,具体细节后面另外章节再聊,其中网络一块的启动集中在文件rabbit_network.erl中
boot() ->
ok = record_distribution_listener(),
ok = start(),
ok = boot_tcp(),
ok = boot_ssl().
第1行往mnesia中插入监听信息,第4行是SSL的,我们也先不看,重点看下第2、3行代码。
1、start函数
start() -> rabbit_sup:start_supervisor_child(
rabbit_tcp_client_sup, rabbit_client_sup,
[{local, rabbit_tcp_client_sup},
{rabbit_connection_sup,start_link,[]}]).
启动名为 rabbit_tcp_client_sup 的监督者进程,这个监督者进程的入口为
rabbit_client_sup:start_link,后面一长串为启动参数。
rabbit_tcp_client_sup 进程启动就做1件事:启动rabbit_connection_sup监督者进程。
而rabbit_connection_sup监督者进程会启动真正干活的 rabbit_reader进程,这个后面再细讲。
2、boot_tcp函数
boot_tcp() ->
{ok, TcpListeners} = application:get_env(tcp_listeners),
[ok = start_tcp_listener(Listener) || Listener <- TcpListeners],
ok.
根据tcp_listeners配置调用 start_tcp_listener 函数;
start_tcp_listener会调用start_listener,后者会调用start_listener0
start_listener0(Address, Protocol, Label, OnConnect) ->
Spec = tcp_listener_spec(rabbit_tcp_listener_sup, Address, tcp_opts(),
Protocol, Label, OnConnect),
case supervisor:start_child(rabbit_sup, Spec) of
{ok, _} -> ok;
{error, {shutdown, _}} -> {IPAddress, Port, _Family} = Address,
exit({could_not_start_tcp_listener,
{rabbit_misc:ntoa(IPAddress), Port}})
end.
最终也是启动子进程,测试环境参数打印如下:
{'rabbit_tcp_listener_sup_:::5672',
{tcp_listener_sup,start_link,
[{0,0,0,0,0,0,0,0}, //IPAddress
5672, //Port
[inet6,binary,binary,
{packet,raw},
{reuseaddr,true},
{backlog,128},
{nodelay,true},
{linger,{true,0}},
{exit_on_close,false},
{active,false}], //SocketOpts
{rabbit_networking,tcp_listener_started,[amqp]}, //OnStartup
{rabbit_networking,tcp_listener_stopped,[amqp]}, //OnShutdown
{rabbit_networking,start_client,[]}, //AcceptCallback
"TCP Listener"]},
transient,infinity,supervisor,
[tcp_listener_sup]}
关注AcceptCallback函数,这个在每个连接进来的时候会调用,下面会再讲。
再以这些参数启动 tcp_listener_sup 监督者进程,后者再会启动 tcp_acceptor_sup和 tcp_listener子进程,tcp_acceptor_sup还是一个监督者进程,它会启动最终干活的 tcp_acceptor 进程。
是不是感觉有点晕,我们来整理下各进程关系:
tcp_listener 进程就是用来监听套接字的,它会调用 gen_tcp:listen 来监听套接字,下面会细讲。
上面我们整理了初始化顺序,那么一个连接过到底会经过哪些进程呢?
1)首先是tcp_listener
init({IPAddress, Port, SocketOpts,
ConcurrentAcceptorCount, AcceptorSup,
{M,F,A} = OnStartup, OnShutdown, Label}) ->
process_flag(trap_exit, true),
case gen_tcp:listen(Port, SocketOpts ++ [{ip, IPAddress},
{active, false}]) of
{ok, LSock} ->
lists:foreach(fun (_) ->
{ok, _APid} = supervisor:start_child(
AcceptorSup, [LSock])
end,
lists:duplicate(ConcurrentAcceptorCount, dummy)),
{ok, {LIPAddress, LPort}} = inet:sockname(LSock),
apply(M, F, A ++ [IPAddress, Port]),
{ok, #state{sock = LSock,
on_startup = OnStartup, on_shutdown = OnShutdown,
label = Label}}
end.
这里省略了一些异常分支代码。
tcp_listener进程在初始化时会调用 AcceptorSup 也就是 tcp_acceptor_sup 来启动工作线程,真正干活的是tcp_acceptor进程,这个进程会通过 prim_inet:async_accept 来异步 accespt连接,每当新连接进来时会调用 rabbit_networking:start_client 进行初始化(前面有说明),相关代码如下:
init({Callback, LSock}) ->
gen_server:cast(self(), accept),
{ok, #state{callback=Callback, sock=LSock}}.
accept(State = #state{sock=LSock}) ->
case prim_inet:async_accept(LSock, -1) of
{ok, Ref} -> {noreply, State#state{ref=Ref}};
Error -> {stop, {cannot_accept, Error}, State}
end.
rabbit_networking:start_client 会启动rabbit_reader,然后向其发送go 消息
start_client(Sock, SockTransform) ->
{ok, _Child, Reader} = supervisor:start_child(rabbit_tcp_client_sup, []),
ok = rabbit_net:controlling_process(Sock, Reader),
Reader ! {go, Sock, SockTransform},
再看看rabbit_reader是如何处理 go 消息的:
init(Parent, HelperSup) ->
Deb = sys:debug_options([]),
receive
{go, Sock, SockTransform} ->
start_connection(Parent, HelperSup, Deb, Sock, SockTransform)
end.
start_connection然后会调用 recv_loop来启动消息循环,一直读包,然后解包,这块代码比较细,后面再细讲。
总结下,一个连接进程会经过的进程链如下,中间省略了监督者进程:
tcp_listener ——> tcp_acceptor ——> rabbit_reader
- 设计模式(5)-己所不欲,施之于人(代理模式)
- Python标准库笔记(4) — collections模块
- 使用captcha模块生成图形验证码
- 设计模式(6)-装饰器(认识程序中的装饰器)
- Selenium Webdriver常用方法
- 设计模式(7)-模板(从事务处理应用的模板)
- Python NLP入门教程
- 设计模式(8)-状态模式(关注状态之间的变化)
- Python标准库笔记(6) — struct模块
- Golang中image/jpeg包和image/png包用法
- Python Webdriver 重新使用已经打开的浏览器实例
- Golang-实现图片缩放
- jbpm5.1介绍(2)
- pytesser模块WindowsError错误解决方法
- JavaScript 教程
- JavaScript 编辑工具
- JavaScript 与HTML
- JavaScript 与Java
- JavaScript 数据结构
- JavaScript 基本数据类型
- JavaScript 特殊数据类型
- JavaScript 运算符
- JavaScript typeof 运算符
- JavaScript 表达式
- JavaScript 类型转换
- JavaScript 基本语法
- JavaScript 注释
- Javascript 基本处理流程
- Javascript 选择结构
- Javascript if 语句
- Javascript if 语句的嵌套
- Javascript switch 语句
- Javascript 循环结构
- Javascript 循环结构实例
- Javascript 跳转语句
- Javascript 控制语句总结
- Javascript 函数介绍
- Javascript 函数的定义
- Javascript 函数调用
- Javascript 几种特殊的函数
- JavaScript 内置函数简介
- Javascript eval() 函数
- Javascript isFinite() 函数
- Javascript isNaN() 函数
- parseInt() 与 parseFloat()
- escape() 与 unescape()
- Javascript 字符串介绍
- Javascript length属性
- javascript 字符串函数
- Javascript 日期对象简介
- Javascript 日期对象用途
- Date 对象属性和方法
- Javascript 数组是什么
- Javascript 创建数组
- Javascript 数组赋值与取值
- Javascript 数组属性和方法
- 【动手学深度学习笔记】之构造MLP模型的几种方法
- 【动手学深度学习笔记】之通过丢弃法缓解过拟合问题
- 【动手学深度学习笔记】之通过权重衰减法解决过拟合问题
- 【动手学深度学习笔记】之多层感知机实现
- Linux程序员效率工具:比man更好用的命令提示工具
- 我对torch中的gather函数的一点理解
- 冒泡排序的实现思路和优化方案
- Java Map转对象
- 59.Vue 使用webpack构建vue项目
- Android初学设置文字跑马灯效果
- 使用Zolom内存解析运行python脚本(不落地)
- 要点3:输入函数对比与自定义输入方式
- 性能测试必备命令(3)- lscpu
- 性能测试必备命令(2)- uptime
- Lua/luajit 点与冒号的区别