RabbitMQ网络框架代码分析二:命令分发
前面讲了基本的网络框架,说了一个连接进来会经过哪些文件,RabbitMQ网络框架代码分析,今天会讲消息的具体的处理过程,即一条消息过来,最终是如何分发到具体的处理函数的。
从rabbit_reader:start_connection说起吧,一个连接进来会调用这个函数,它会调用recvloop进行不断地读数据,并进行相应的处理:
start_connection(Parent, HelperSup, Deb, Sock, SockTransform) ->
State = #v1{parent = Parent,
sock = ClientSock,
connection = #connection{
name = list_to_binary(Name),
host = Host,
peer_host = PeerHost,
port = Port,
peer_port = PeerPort,
protocol = none,
user = none,
timeout_sec = (HandshakeTimeout / 1000),
frame_max = ?FRAME_MIN_SIZE,
vhost = none,
client_properties = none,
capabilities = [],
auth_mechanism = none,
auth_state = none,
connected_at = rabbit_misc:now_to_ms(os:timestamp())},
callback = uninitialized_callback,
recv_len = 0,
pending_recv = false,
connection_state = pre_init,
queue_collector = undefined, %% started on tune-ok
helper_sup = HelperSup,
heartbeater = none,
channel_sup_sup_pid = none,
channel_count = 0,
throttle = #throttle{
alarmed_by = [],
last_blocked_by = none,
last_blocked_at = never}},
run({?MODULE, recvloop,
[Deb, [], 0, switch_callback(rabbit_event:init_stats_timer(
State, #v1.stats_timer),
handshake, 8)]})
注:这里删掉了一些非核心代码。
重点关注switch_callback 函数,其中第2个参数,表示当前由哪个函数处理,初始化时由 handshake 来处理;第3个参数将设置接下来要接收数据包的长度,初始化时8。
这里简单说下一次消息发送会发生哪些交互:
这是在测试环境截取的包,其中第一个包就是发送handshake包,具体格式如下:
这里不深入讨论具体命令的含义,有兴趣的同学可以查下相关资料。
再来看recvloop 函数:
Deb, Buf, BufLen, State = #v1{pending_recv = true}) ->
mainloop(Deb, Buf, BufLen, State);
recvloop(Deb, Buf, BufLen, State = #v1{connection_state = blocked}) ->
mainloop(Deb, Buf, BufLen, State);
recvloop(Deb, Buf, BufLen, State = #v1{connection_state = {become, F}}) ->
throw({become, F(Deb, Buf, BufLen, State)});
recvloop(Deb, Buf, BufLen, State = #v1{sock = Sock, recv_len = RecvLen})
when BufLen < RecvLen ->
case rabbit_net:setopts(Sock, [{active, once}]) of
ok -> mainloop(Deb, Buf, BufLen,
State#v1{pending_recv = true});
{error, Reason} -> stop(Reason, State)
end;
recvloop(Deb, [B], _BufLen, State) ->
{Rest, State1} = handle_input(State#v1.callback, B, State),
recvloop(Deb, [Rest], size(Rest), State1);
recvloop(Deb, Buf, BufLen, State = #v1{recv_len = RecvLen}) ->
{DataLRev, RestLRev} = binlist_split(BufLen - RecvLen, Buf, []),
Data = list_to_binary(lists:reverse(DataLRev)),
{<<>>, State1} = handle_input(State#v1.callback, Data, State),
recvloop(Deb, lists:reverse(RestLRev), BufLen - RecvLen, State1).
这里有6个分支,我们从上到下为 每一个分支编号,实际上调用的最多的是4,5,6这几个分支。
刚开始设置要读取8个字节,并且callback为 handshake ,所以走到分支4,
分支4会调用mainloop读取数据:
Recv = rabbit_net:recv(Sock),
case Recv of
{data, Data} ->
recvloop(Deb, [Data | Buf], BufLen + size(Data),
State#v1{pending_recv = false});
读到数据之后,来到分支5,
recvloop(Deb, [B], _BufLen, State) ->
{Rest, State1} = handle_input(State#v1.callback, B, State),
recvloop(Deb, [Rest], size(Rest), State1);
调用 handle_input 来处理数据,这里的callback是 handshake 。
handle_input也有不同的分支,我们看下 handshake 分支:
handle_input(handshake, <<"AMQP", A, B, C, D, Rest/binary>>, State) ->
{Rest, handshake({A, B, C, D}, State)};
handshake({0, 0, 9, 1}, State) ->
start_connection({0, 9, 1}, rabbit_framing_amqp_0_9_1, State);
start_connection({ProtocolMajor, ProtocolMinor, _ProtocolRevision},
Protocol,
State = #v1{sock = Sock, connection = Connection}) ->
Start = #'connection.start'{
version_major = ProtocolMajor,
version_minor = ProtocolMinor,
server_properties = server_properties(Protocol),
mechanisms = auth_mechanisms_binary(Sock),
locales = <<"en_US">> },
ok = send_on_channel0(Sock, Start, Protocol),
switch_callback(State#v1{connection = Connection#connection{
timeout_sec = ?NORMAL_TIMEOUT,
protocol = Protocol},
connection_state = starting},
frame_header, 7).
handshake会调用start_connection, 后者会向客户端回一个connection.start的包,并且将callback设置为frame_header。
frame_header就是读取消息头,大部分消息头都是一个固定的格式:
Type:1字节
Channel:1字节
Length:4字节
Body:长度为上面的Length
看下frame_header的处理:
handle_input(frame_header, <<Type:8,Channel:16,PayloadSize:32, Rest/binary>>,
State) ->
{Rest, ensure_stats_timer(
switch_callback(State,
{frame_payload, Type, Channel, PayloadSize},
PayloadSize + 1))};
它会将 callback设置为 frame_payload,
handle_input({frame_payload, Type, Channel, PayloadSize}, Data, State) ->
<<Payload:PayloadSize/binary, EndMarker, Rest/binary>> = Data,
case EndMarker of
?FRAME_END -> State1 = handle_frame(Type, Channel, Payload, State),
{Rest, switch_callback(State1, frame_header, 7)};
_ -> fatal_frame_error({invalid_frame_end_marker, EndMarker},
Type, Channel, Payload, State)
end;
它会调用handle_frame 来分发命令,
handle_frame(Type, 0, Payload,
State = #v1{connection = #connection{protocol = Protocol}})
when ?IS_STOPPING(State) ->
case rabbit_command_assembler:analyze_frame(Type, Payload, Protocol) of
{method, MethodName, FieldsBin} ->
handle_method0(MethodName, FieldsBin, State);
_Other -> State
end;
handle_frame 会调用rabbit_command_assembler:analyze_frame 来拆包,拆出来的就是具体的命令了,这里的Protocol就是0_9_0_1,对应src/rabbit_framing_amqp_0_9_1.erl。
lookup_method_name({10, 10}) -> 'connection.start';
lookup_method_name({10, 11}) -> 'connection.start_ok';
lookup_method_name({10, 20}) -> 'connection.secure';
lookup_method_name({10, 21}) -> 'connection.secure_ok';
lookup_method_name({10, 30}) -> 'connection.tune';
lookup_method_name({10, 31}) -> 'connection.tune_ok';
这里根据不同的字节流得到相应的命令,PS:erlang做这些确实很简单~
得到命令之后就是执行命令了,由 handle_method0 来执行,这里就不详细讲了,可以自己看下代码
handle_method0(#'connection.start_ok'{mechanism = Mechanism,
response = Response,
client_properties = ClientProperties},
State0 = #v1{connection_state = starting,
connection = Connection,
sock = Sock}) ->
AuthMechanism = auth_mechanism_to_module(Mechanism, Sock),
Capabilities =
case rabbit_misc:table_lookup(ClientProperties, <<"capabilities">>) of
{table, Capabilities1} -> Capabilities1;
_ -> []
end,
State = State0#v1{connection_state = securing,
connection =
Connection#connection{
client_properties = ClientProperties,
capabilities = Capabilities,
auth_mechanism = {Mechanism, AuthMechanism},
auth_state = AuthMechanism:init(Sock)}},
auth_phase(Response, State);
一部分消息处理命令在 rabbit_reader.erl中,还是些是关于channel的,相应命令的 rabbit_channel.erl文件中。
最后我们总结下,消息分发流程:
1、start_connection
2、recvloop,它会调用mainloop读取数据
3、handle_input 处理数据
正常来说消息分这几步:
约定协议:handshake
读取消息头:header
读取消息body:payload
payload的格式是:
Type:1字节
Channel:1字节
Length:4字节
Body:Length字节
4、消息处理
主要处理函数是handle_frame,拆包之后调用不同的handle_method0函数。
- JavaScript 教程
- JavaScript 编辑工具
- JavaScript 与HTML
- JavaScript 与Java
- JavaScript 数据结构
- JavaScript 基本数据类型
- JavaScript 特殊数据类型
- JavaScript 运算符
- JavaScript typeof 运算符
- JavaScript 表达式
- JavaScript 类型转换
- JavaScript 基本语法
- JavaScript 注释
- Javascript 基本处理流程
- Javascript 选择结构
- Javascript if 语句
- Javascript if 语句的嵌套
- Javascript switch 语句
- Javascript 循环结构
- Javascript 循环结构实例
- Javascript 跳转语句
- Javascript 控制语句总结
- Javascript 函数介绍
- Javascript 函数的定义
- Javascript 函数调用
- Javascript 几种特殊的函数
- JavaScript 内置函数简介
- Javascript eval() 函数
- Javascript isFinite() 函数
- Javascript isNaN() 函数
- parseInt() 与 parseFloat()
- escape() 与 unescape()
- Javascript 字符串介绍
- Javascript length属性
- javascript 字符串函数
- Javascript 日期对象简介
- Javascript 日期对象用途
- Date 对象属性和方法
- Javascript 数组是什么
- Javascript 创建数组
- Javascript 数组赋值与取值
- Javascript 数组属性和方法
- android自定义圆形倒计时显示控件
- android实现上下左右滑动界面布局
- Android使用MediaCodec将摄像头采集的视频编码为h264
- Android开发人脸识别登录功能
- Android利用碎片fragment实现底部标题栏(Github模板开源)
- Android MediaPlayer 播放音频的方式
- Android切圆角的几种常见方式总结
- Android DSelectorBryant 单选滚动选择器的实例代码
- Android 拍照选择图片并上传功能的实现思路(包含权限动态获取)
- Android Canvas的drawText()与文字居中方案详解
- JeecgCloud版,部署项目。
- docker(镜像常用命令)
- [- Flutter基础篇 -] 聊聊那些弹框
- 聊一聊Android中的StateListAnimator
- Linux KeyLogger