Chinaunix首页 | 论坛 | 博客
  • 博客访问: 5120289
  • 博文数量: 921
  • 博客积分: 16037
  • 博客等级: 上将
  • 技术积分: 8469
  • 用 户 组: 普通用户
  • 注册时间: 2006-04-05 02:08
文章分类

全部博文(921)

文章存档

2020年(1)

2019年(3)

2018年(3)

2017年(6)

2016年(47)

2015年(72)

2014年(25)

2013年(72)

2012年(125)

2011年(182)

2010年(42)

2009年(14)

2008年(85)

2007年(89)

2006年(155)

分类: Python/Ruby

2012-10-16 10:25:23

在霸爷的推荐下,看了hotwheels的代码,接下来我就来分析下hotwheels的代码(主要是server端代码),hotwheels是干吗的呢,介绍在这里:

  1. Janus is a messaging server optimized to unicast over TCP to thousands of clients subscribed to topics of interest.
  2. The ultimate goal is to maintain a latency of less than 2 seconds for 20 thousand clients on Amazon EC2 (small instance).

首先来看janus.app:

  1. {application, janus,
  2.  [{description, "Janus"},
  3.   {vsn, "0.0.1"},
  4.   {id, "janus"},
  5.   {modules, [barrier,
  6.              bin,
  7.              bot,
  8.              client_proxy,
  9.              common,
  10.              flashbot,
  11.              histo,
  12.              janus,
  13.              janus_acceptor,
  14.              janus_admin,
  15.              janus_app,
  16.              janus_flash,
  17.              launcher,
  18.              mapper,
  19.              pubsub,
  20.              topman,
  21.              t,
  22.              transport,
  23.              util
  24.             ]},
  25.   {registered, [janus_sup,
  26.                 janus_topman_sup,
  27.                 janus_proxy_mapper_sup,
  28.                 janus_transport_sup,
  29.                 janus_listener]},
  30.   {applications, [kernel,
  31.                   stdlib,
  32.                   mnesia,
  33.                   inets
  34.                  ]},
  35.   {mod, {janus_app, []}},
  36.   {env, []}
  37.  ]
  38. }.

具体每个域的意思这里就不介绍了,详细可以去看erlang的文档

我们主要来看mod这个tuple,可以看到回调模块是janus_app,所以我们就从janus_app开始。

通过模块定义我们可以清楚的看到这个模块是一个application:

  1. -module(janus_app).
  2. -behaviour(application).

因此我们来看它的start函数:

  1. -define(LISTEN_PORT, 8081).

  2. start(_Type, _Args) ->
  3.     Port = janus_admin:get_env(listen_port, ?LISTEN_PORT),
  4.     supervisor:start_link({local, ?MODULE},
  5.                           ?MODULE,
  6.                           [Port, transport]).

这里可以看到首先会从环境变量里面取得端口(命令行参数),而默认的port是8001,然后调用supervisor start_link函数,这个函数会启动创建并启动一个supervisor,这里可以看到回调模块是当前模块,因此我们接下来就来看当前模块的init函数.

init返回的child spec的格式我就不介绍了,可以去看erlang的手册

  1. %% Supervisor behaviour callbacks

  2. init([Port, Module]) ->
  3.     {ok,
  4.      {_SupFlags = {one_for_one, ?MAX_RESTART, ?MAX_TIME},
  5.       [
  6.        %% TCP server
  7.        {janus_sup,
  8.         {janus_acceptor, start_link, [self(), Port, Module]},
  9.         permanent,
  10.         2000,
  11.         worker,
  12.         [janus_acceptor]
  13.        },
  14.        %% Topic manager
  15.        {janus_topman_sup,
  16.         {topman, start, []},
  17.         permanent,
  18.         2000,
  19.         worker,
  20.         [topman]
  21.        },
  22.        %% Client proxy mapper
  23.        {janus_proxy_mapper_sup,
  24.         {mapper, start, [client_proxy_mapper]},
  25.         permanent,
  26.         2000,
  27.         worker,
  28.         [mapper]
  29.        },
  30.        %% Client instance supervisor
  31.        {janus_transport_sup,
  32.         {supervisor, start_link, [{local, janus_transport_sup},
  33.                                   ?MODULE, [Module]]},
  34.         permanent,
  35.         infinity,
  36.         supervisor,
  37.         []
  38.        }
  39.       ]
  40.      }
  41.     };

从上面的代码可以看到这个supervisor一共会监控4个子进程,其中3个是worker,1个是supervisor。
对应的三个worker的名字分别是:

  1. janus_sup(janus_acceptor:start_link())
  2. janus_topman_sup(topman:start())
  3. janus_proxy_mapper_sup(mapper:start(client_proxy_mapper))

而唯一的supervisor是janus_transport_sup(supervisor:start_link(transport))。
后面的括号注明了子进程的启动模块和回调函数。

从上面代码的注释可以看到每个子进程都是干嘛的,我们一个个来分析,首先来看第一个janus_sup进程,这个进程调用janus_acceptor模块的start_link启动的,所以我们来看janus_acceptor这个模块。

  1. start_link(Parent, Port, Module)
  2.   when is_pid(Parent),
  3.        is_integer(Port),
  4.        is_atom(Module) ->
  5.     Args = [Parent, Port, Module],
  6.     proc_lib:start_link(?MODULE, acceptor_init, Args).

这里可以看到代码比较简单,就是调用start_link启动一个子进程,子进程的模块就是当前模块,然后回调函数是acceptor_init,参数是一个list,包含三个参数,分别是父进程id,端口号,以及module, 父进程id所指的就是的supervisor的进程id,而module是指transport模块(可以看前面janus_app模块)。
这里要注意在调用proc_lib:start_link之前,一直是处于supervisor进程中的,当start_link之后,才是启动了子进程.这里使用了proc_lib:start_link,这个函数是同步的启动一个子进程,它会一直等待,直到子进程调用init_ack,才会返回.

因此接下来我们来看acceptor_init这个函数:

acceptor_init(Parent, Port, Module) ->
    State = #state{
      parent = Parent,
      port = Port,
      module = Module
     },
    error_logger:info_msg("Listening on port ~p~n", [Port]),
    case (catch do_init(State)) of
        {ok, ListenSocket} ->
            proc_lib:init_ack(State#state.parent, {ok, self()}),
            acceptor_loop(State#state{listener = ListenSocket});
        Error ->
            proc_lib:init_ack(Parent, Error),
            error
    end.

 

这个函数可以看到就是通过调用do_init来得到监听的listen socket,然后根据返回值来做一些操作,这里可以看到不论失败,成功都会调用init_ack来返回值给父进程,当成功之后,就会调用acceptor_loop来进入后续处理.

在看acceptor_loop之前,线来看do_init方法:

  1. do_init(State) ->
  2.     Opts = [binary,
  3.             {packet, 0},
  4.             {reuseaddr, true},
  5.             {backlog, 1024},
  6.             {active, false}],
  7.     case gen_tcp:listen(State#state.port, Opts) of
  8.         {ok, ListenSocket} ->
  9.             {ok, ListenSocket};
  10.         {error, Reason} ->
  11.             throw({error, {listen, Reason}})
  12.     end.

这里调用gen_tcp的listen方法,我们着重来看传入listen的opts,这里可以看到active被设置为false,也就是每次必须主动地调用recv来读取数据。

然后来看acceptor_loop 函数,也就是server子进程的主循环函数,这个函数主要就是通过accept来接收客户端的连接,然后交给后续模块处理.

  1. acceptor_loop(State) ->
  2.     case (catch gen_tcp:accept(State#state.listener, 50000)) of
  3.         {ok, Socket} ->
  4.             handle_connection(State, Socket),
  5.             ?MODULE:acceptor_loop(State);
  6.         {error, Reason} ->
  7.             handle_error(Reason),
  8.             ?MODULE:acceptor_loop(State);
  9.         {'EXIT', Reason} ->
  10.             handle_error({'EXIT', Reason}),
  11.             ?MODULE:acceptor_loop(State)
  12.     end.

这里先暂停一下,我们先来看最后一个被supervisor监控的子进程,也就是一个子supervisor,janus_transport_sup。来看它的child spec:

  1. %% Client instance supervisor
  2.        {janus_transport_sup,
  3.         {supervisor, start_link, [{local, janus_transport_sup},
  4.                                   ?MODULE, [Module]]},
  5.         permanent,
  6.         infinity,
  7.         supervisor,
  8.         []
  9.        }

可以看到他会继续创建一个新的supervisor,然后也是当前模块(janus_app),只不过参数是一个参数,因此我们来看另外的一个init函数:


 

  1. init([Module]) ->
  2.     {ok,
  3.      {_SupFlags = {simple_one_for_one, ?MAX_RESTART, ?MAX_TIME},
  4.       [
  5.        %% TCP Client
  6.        {undefined,
  7.         {Module, start_link, []},
  8.         temporary,
  9.         2000,
  10.         worker,
  11.         []
  12.        }
  13.       ]
  14.      }
  15.     }.

可以看到这个child spec,重启策略是simple_one_one,也就是需要手动重启,并且它将会创建的子进程是Module(transport模块)的start_link函数来启动.

接下来就来看transport的启动函数以及init函数,这个模块是一个gen_server.

  1. -behavior(gen_server).

  2. start_link(Port)
  3.   when is_integer(Port) ->
  4.     gen_server:start_link(?MODULE, [Port], []).


  5. init([Port]) ->
  6.     process_flag(trap_exit, true),
  7.     {ok, #state{port = Port, transport = janus_flash }}.

这里需要注意的是process_flag(trap_exit, true),这个其实也就是设置表示父进程将会接收子进程的crash信息。还有一个就是state,这里state的transport设置为了janus_flash模块.

ok,然后我们再回到janus_acceptor模块,接下来来看假设有一个连接过来之后的情况。这里跳过错误处理,就来看看正确的处理流程。

  1. handle_connection(State, Socket),
  2. ?MODULE:acceptor_loop(State);

当正确接到新的连接之后,会进入handle_connection的处理,然后调用acceptor_loop进入递归.因此我们就来看handle_connection

  1. handle_connection(State, Socket) ->
  2.     {ok, Pid} = janus_app:start_transport(State#state.port),
  3.     ok = gen_tcp:controlling_process(Socket, Pid),
  4.     %% Instruct the new handler to own the socket.
  5.     (State#state.module):set_socket(Pid, Socket).

这里做了3个操作,首先调用janus_app:start_transport来启动一个新的子进程,而这个子进程是属于那个supervisor呢,来看代码:

  1. start_transport(Port) ->
  2.     supervisor:start_child(janus_transport_sup, [Port]).

可以看到它启动了janus_transport_sup这个supervisor的子进程,而我们还记得前面分析的,这个supervisor的子进程的启动回调就是transport模块的start_link函数。这里要注意start_child返回的是子进程的pid.

  1. start_link(Port)
  2.   when is_integer(Port) ->
  3.     gen_server:start_link(?MODULE, [Port], []).

然后接下来的两个操作,就是将当前进程接受到的socket传递给新建的子进程,然后调用transport的set_socket方法。然后我们来看transport模块的set_socket方法.

  1. set_socket(Ref, Sock) ->
  2.     gen_server:cast(Ref, {set_socket, Sock}).

可以看到就是给新建的子进程发送一个set_socket的方法.这里要注意就是会设置socket的属性,也就是设置active为once。

  1. handle_cast({set_socket, Socket}, State) ->
  2.     inet:setopts(Socket, [{active, once},
  3.                           {packet, 0},
  4.                           binary]),
  5.     {ok, Keep, Ref} = (State#state.transport):start(Socket),
  6.     keep_alive_or_close(Keep, State#state{socket = Socket, state = Ref});

这里可以看到调用了state的transport的start方法,那么这个transport是那个模块呢,上面的分析中在当前transport的init方法中返回e设置的就是janus_flash模块,所以这里调用的就是janus_flash:start方法.

  1. start(Socket) ->
  2.     Send = fun(Bin) -> gen_tcp:send(Socket, [Bin, 1]) end,
  3.     {ok, Proxy, Token} = client_proxy:start(Send),
  4.     State = #state{
  5.       socket = Socket,
  6.       proxy = Proxy,
  7.       token = Token
  8.      },
  9.     JSON = {struct,
  10.             [{<<"timestamp">>, tuple_to_list(now())},
  11.              {<<"token">>, Token}
  12.             ]},
  13.     send(mochijson2:encode(JSON), State).

这里可以看到先是创建了一个send方法,然后调用client_proxy start,这里client_proxy其实是一个gen_server,因此我们来看这个模块的start方法以及 init方法.

  1. start(Send) ->
  2.     Token = common:random_token(),
  3.     {ok, Pid} = gen_server:start_link(?MODULE, [Token, self(), Send], []),
  4.     {ok, Pid, Token}.

  5. init([Token, Parent, Send]) ->
  6.     process_flag(trap_exit, true),
  7.     ok = mapper:add(client_proxy_mapper, Token),
  8.     State = #state{
  9.       token = Token,
  10.       parent = Parent,
  11.       send = Send,
  12.       messages = []
  13.      },
  14.    {ok, State}.

可以看到init方法里面调用了mapper模块的add方法,因此来看mapper:add方法

  1. add(Ref, Key) ->
  2.     gen_server:call(Ref, {add, Key, self()}).

可以看到也就是给client_proxy_mapper这个进程发送了一个同步的消息,而对应的client_proxy_mapper也就是一开始在janus_app模块中注册的进程,这个进程就是mapper模块启动的。因此来看mapper的对应同步消息接收。

  1. handle_call({add, Key, Pid}, _, State) ->
  2.     case ets:lookup(State#state.key_pid, Key) of
  3.         [_] ->
  4.             ok;
  5.         _ ->
  6.             Ref = erlang:monitor(process, Pid),
  7.             ets:insert(State#state.key_pid, {Key, {Pid, Ref}}),
  8.             ets:insert(State#state.pid_key, {Pid, Key})
  9.     end,
  10.     {reply, ok, State};

 

这里也就是将随机出来的token和进程通过ets关联。

前面这里对于数据的发送分析完了,剩下的就是连接的错误,断开处理以及数据的接收处理,线来看连接的接收处理,通过上面的分析,我们知道,accept到的socket是处于transport这个gen_server管理的,因此读取数据就在这个里面处理:

  1. handle_info({tcp, Socket, <<"", 0, Bin/binary>>}, State)
  2.   when Socket == State#state.socket ->
  3.     inet:setopts(Socket, [{active, once}]),
  4.     dispatch(Bin, janus_flash, State);

这里主要还是调用dispatch来处理数据的读取,先是调用janus_flash的process方法,然后调用keep_alive_or_close来判断是否连接已经关闭.

  1. %%% Brand new transport
  2. %%% Existing connection

  3. dispatch(Data, Mod, State = #state{transport = Mod}) ->
  4.     {ok, Keep, TS} = Mod:process(Data, State#state.state),
  5.     keep_alive_or_close(Keep, State#state{state = TS}).


某网友:个人觉得hotwheel 中代码最经典的地方就是那个客户端连接进程transport 的启动、监控、socket绑定;以及gen_tcp的once方式下的数据接收。这个模型可以用在很多地方。



文章来自:



 

 












 

 







 

阅读(1276) | 评论(0) | 转发(0) |
给主人留下些什么吧!~~