Chinaunix首页 | 论坛 | 博客
  • 博客访问: 5119574
  • 博文数量: 921
  • 博客积分: 16037
  • 博客等级: 上将
  • 技术积分: 8469
  • 用 户 组: 普通用户
  • 注册时间: 2006-04-05 02:08
文章分类

全部博文(921)

文章存档

2020年(1)

2019年(3)

2018年(3)

2017年(6)

2016年(47)

2015年(72)

2014年(25)

2013年(72)

2012年(125)

2011年(182)

2010年(42)

2009年(14)

2008年(85)

2007年(89)

2006年(155)

分类: Erlang

2013-03-23 19:32:19

(注:分析代码基于RabbitMQ 2.8.2

 

网络层的启动也是作为上一篇文章中提到的一个启动步骤来启动的,入口为[$RABBIT_SRC/src/rabbit_networking.erl --> boot/0],代码如下:

  1. boot() ->
  2.     ok = start(),
  3.     ok = boot_tcp(),
  4.     ok = boot_ssl().
      [$RABBIT_SRC/src/rabbit_networking.erl--> start/0]构建rabbit_client_sup子监控树,[$RABBIT_SRC/src/rabbit_networking.erl --> boot_tcp/0],启动TCP监听,并构建成tcp_listener_sup子监控树,这两个子树都属于rabbit最顶层监控结点rabbit_supSSL的类似)。最终的监控树如下:





(注:方框代表supervisor类型,圆角代表worker;虚线灰底的文字代表相应层子进程重启策略;各结点以各自所在的模块命名,实际监控树中的结点名有部分不同。)

[$RABBIT_SRC/src/tcp_listener.erl --> init/1]通过gen_tcp:listen/2启动监听,并根据ConcurrentAcceptorCount参数启动指定数量个tcp_acceptor(当前这个数量是1)。

     [$RABBIT_SRC/src/tcp_acceptor.erl]通过prim_inet:async_accept/2开始异步接受来自客户端的连接,主要代码如下:

  1. handle_cast(accept, State) ->
  2.     ok = file_handle_cache:obtain(),
  3.     accept(State);
  4. handle_info({inet_async, LSock, Ref, {ok, Sock}},
  5.             State = #state{callback={M,F,A}, sock=LSock, ref=Ref}) ->
  6.     %% patch up the socket so it looks like one we got from
  7.     %% gen_tcp:accept/1
  8.     {ok, Mod} = inet_db:lookup_socket(LSock),
  9.     inet_db:register_socket(Sock, Mod),

  10.     %% handle
  11.     file_handle_cache:transfer(apply(M, F, A ++ [Sock])),
  12.     ok = file_handle_cache:obtain(),
  13.     %% accept more
  14. accept(State);

  15. accept(State = #state{sock=LSock}) ->
  16.     case prim_inet:async_accept(LSock, -1) of
  17.         {ok, Ref} -> {noreply, State#state{ref=Ref}};
  18.         Error -> {stop, {cannot_accept, Error}, State}
  19. end.


[$RABBIT_SRC/src/ file_handle_cache.erl]在这里的主要作用是管理每个进程拥有的文件描述符数量,确保每个进程不会超过设定的上限。其中,obtain/0方法增加拥有的文件描述符数量,transfer/1将文件描述符的拥有权转移到另外一个进程(原进程拥有的打开描述符数量减1,转移目标进程拥有的文件描述符数量加1,这里的目标进程是通过callback返回的rabbit_reader进程)。

 

 state结构中的callback是由[$RABBIT_SRC/src/rabbit_networking.erl]在启动TCP监听端口时指定的,其值为[$RABBIT_SRC/src/rabbit_networking.erl --> start_client/1],每个客户端连接上来后,会通过apply方法调用(见上面代码中的apply(M, F, A)),实际调用[$RABBIT_SRC/src/rabbit_networking.erl --> start_client/2],主要代码如下:

 

  1. start_client(Sock, SockTransform) ->
  2.     {ok, _Child, Reader} = supervisor:start_child(rabbit_tcp_client_sup, []),
  3.     ok = rabbit_net:controlling_process(Sock, Reader),
  4.     Reader ! {go, Sock, SockTransform},
  5.     Reader.

(这里rabbit_tcp_client_sup对应上图中rabbit_client_sup结点)

 

supervisor:start_child(rabbit_tcp_client_sup, [])会按照上图中rabbit_client_sup下的监控树结构依次构造各个子结点。也就是说在每一次服务端接受一个新的连接时,都会创建一个rabbit_client_sup的子进程(simple_one_for_one),并依次构建各个子结点(上图中黄色区域)。并最终返回创建的rabbit_reader

 

rabbit_net:controlling_process/2将上一步返回的rabbit_reader进程作为接收socket消息的进程(调用gen_tcp:controlling_process/2实现,原始接受消息的是tcp_acceptor进程)。

 

Reader ! {go, Sock, SockTransform}向目标rabbit_reader进程发送消息,指示开始接受从客户端发送来的数据。

 

rabbit_reader进程在创建后,会通过如下代码等待上述的go消息:



  1. receive
  2.         {go, Sock, SockTransform} ->
  3.             start_connection(
  4.               Parent, ChannelSupSupPid, Collector, StartHeartbeatFun, Deb, Sock,
  5.               SockTransform)
  6. end



(参见[$RABBIT_SRC/src/rabbit_reader.erl --> init/4]

 

收到go消息后,rabbit_reader开始与客户端进行消息交互。(参见[$RABBIT_SRC/src/rabbit_reader.erl --> start_connection /7])主要代码如下:


  1. start_connection(Parent, ChannelSupSupPid, Collector, StartHeartbeatFun, Deb,
  2.                  Sock, SockTransform) ->
  3.     process_flag(trap_exit, true),
  4.     ConnStr = name(Sock),
  5.     log(info, "accepting AMQP connection ~p (~s)~n", [self(), ConnStr]),
  6.     ClientSock = socket_op(Sock, SockTransform),
  7.     erlang:send_after(?HANDSHAKE_TIMEOUT * 1000, self(),
  8.                       handshake_timeout),
  9.     State = #v1{parent = Parent,}, // 初始化state,部分代码省略

  10.     try
  11.         recvloop(Deb, switch_callback(rabbit_event:init_stats_timer(
  12.                                        State, #v1.stats_timer),
  13.                                       handshake, 8)),
  14.         log(info, "closing AMQP connection ~p (~s)~n", [self(), ConnStr])
  15.     catch
  16.         // 异常处理

  17.     after
  18.         rabbit_net:maybe_fast_close(ClientSock),
  19.         rabbit_event:notify(connection_closed, [{pid, self()}])
  20.     end,
  21. done.


从上面的log语句可以看出,这时候与客户端的连接已经确定,要开始AMQP的握手过程。接收消息的主循环由recvloop/2来完成,其中第二个参数State保存当前连接的各种状态。recvloop按照AMQP协议对数据进行解析。

 

recvloop并没有使用任何一个OTP的行为模式,而是自己设计的一种状态转换的逻辑,该逻辑的实现依赖于AMQP协议本身,其中关键的数据是State里的callback以及recv_lenAMQP协议要求,客户端建立一个连接时,必需向服务端发送8个字节的协议头(包含AMQP4个字符以及客户端使用的AMQP版本),所以rabbit在一开始的时候,设定的recv_len=8,也就是说rabbit希望从客户端收到8个字节的数据,然后对这8个字节的处理对应handshake处理逻辑(见上面代码的recvloop/2调用)。handshake完成后,rabbit会根据AMQP协议定义的frame格式等待客户端发送来的数据:先是7个字符的frame头(recv_len=7, callback=frame_header),然后是长度声明在头中的负载数据(recv_len=PayloadSize+1callback=frame_payload,每个frame都有个结束符,所以这里负载长度要加1),收到负载数据后,会根据负载数据做对应处理,处理完成后,继续待续下一个frame。所以整个协议解析的过程就如下图所示:






rabbit会根据客户端发送的请求进行相应的操作,比如创建channel,创建exchange,创建queue等等。下篇会分析在创建每个实体时,都会做哪些工作。



原文章连接:http://jzhihui.iteye.com/blog/1544779



阅读(3466) | 评论(0) | 转发(0) |
给主人留下些什么吧!~~