在霸爷的推荐下,看了hotwheels的代码,接下来我就来分析下hotwheels的代码(主要是server端代码),hotwheels是干吗的呢,介绍在这里:
- Janus is a messaging server optimized to unicast over TCP to thousands of clients subscribed to topics of interest.
- The ultimate goal is to maintain a latency of less than 2 seconds for 20 thousand clients on Amazon EC2 (small instance).
首先来看janus.app:
- {application, janus,
- [{description, "Janus"},
- {vsn, "0.0.1"},
- {id, "janus"},
- {modules, [barrier,
- bin,
- bot,
- client_proxy,
- common,
- flashbot,
- histo,
- janus,
- janus_acceptor,
- janus_admin,
- janus_app,
- janus_flash,
- launcher,
- mapper,
- pubsub,
- topman,
- t,
- transport,
- util
- ]},
- {registered, [janus_sup,
- janus_topman_sup,
- janus_proxy_mapper_sup,
- janus_transport_sup,
- janus_listener]},
- {applications, [kernel,
- stdlib,
- mnesia,
- inets
- ]},
- {mod, {janus_app, []}},
- {env, []}
- ]
- }.
具体每个域的意思这里就不介绍了,详细可以去看erlang的文档
我们主要来看mod这个tuple,可以看到回调模块是janus_app,所以我们就从janus_app开始。
通过模块定义我们可以清楚的看到这个模块是一个application:
- -module(janus_app).
- -behaviour(application).
因此我们来看它的start函数:
- -define(LISTEN_PORT, 8081).
- start(_Type, _Args) ->
- Port = janus_admin:get_env(listen_port, ?LISTEN_PORT),
- supervisor:start_link({local, ?MODULE},
- ?MODULE,
- [Port, transport]).
这里可以看到首先会从环境变量里面取得端口(命令行参数),而默认的port是8001,然后调用supervisor start_link函数,这个函数会启动创建并启动一个supervisor,这里可以看到回调模块是当前模块,因此我们接下来就来看当前模块的init函数.
init返回的child spec的格式我就不介绍了,可以去看erlang的手册
- %% Supervisor behaviour callbacks
- init([Port, Module]) ->
- {ok,
- {_SupFlags = {one_for_one, ?MAX_RESTART, ?MAX_TIME},
- [
- %% TCP server
- {janus_sup,
- {janus_acceptor, start_link, [self(), Port, Module]},
- permanent,
- 2000,
- worker,
- [janus_acceptor]
- },
- %% Topic manager
- {janus_topman_sup,
- {topman, start, []},
- permanent,
- 2000,
- worker,
- [topman]
- },
- %% Client proxy mapper
- {janus_proxy_mapper_sup,
- {mapper, start, [client_proxy_mapper]},
- permanent,
- 2000,
- worker,
- [mapper]
- },
- %% Client instance supervisor
- {janus_transport_sup,
- {supervisor, start_link, [{local, janus_transport_sup},
- ?MODULE, [Module]]},
- permanent,
- infinity,
- supervisor,
- []
- }
- ]
- }
- };
从上面的代码可以看到这个supervisor一共会监控4个子进程,其中3个是worker,1个是supervisor。
对应的三个worker的名字分别是:
- janus_sup(janus_acceptor:start_link())
- janus_topman_sup(topman:start())
- janus_proxy_mapper_sup(mapper:start(client_proxy_mapper))
而唯一的supervisor是janus_transport_sup(supervisor:start_link(transport))。
后面的括号注明了子进程的启动模块和回调函数。
从上面代码的注释可以看到每个子进程都是干嘛的,我们一个个来分析,首先来看第一个janus_sup进程,这个进程调用janus_acceptor模块的start_link启动的,所以我们来看janus_acceptor这个模块。
- start_link(Parent, Port, Module)
- when is_pid(Parent),
- is_integer(Port),
- is_atom(Module) ->
- Args = [Parent, Port, Module],
- proc_lib:start_link(?MODULE, acceptor_init, Args).
这里可以看到代码比较简单,就是调用start_link启动一个子进程,子进程的模块就是当前模块,然后回调函数是acceptor_init,参数是一个list,包含三个参数,分别是父进程id,端口号,以及module, 父进程id所指的就是的supervisor的进程id,而module是指transport模块(可以看前面janus_app模块)。
这里要注意在调用proc_lib:start_link之前,一直是处于supervisor进程中的,当start_link之后,才是启动了子进程.这里使用了proc_lib:start_link,这个函数是同步的启动一个子进程,它会一直等待,直到子进程调用init_ack,才会返回.
因此接下来我们来看acceptor_init这个函数:
acceptor_init(Parent, Port, Module) ->
State = #state{
parent = Parent,
port = Port,
module = Module
},
error_logger:info_msg("Listening on port ~p~n", [Port]),
case (catch do_init(State)) of
{ok, ListenSocket} ->
proc_lib:init_ack(State#state.parent, {ok, self()}),
acceptor_loop(State#state{listener = ListenSocket});
Error ->
proc_lib:init_ack(Parent, Error),
error
end.
这个函数可以看到就是通过调用do_init来得到监听的listen socket,然后根据返回值来做一些操作,这里可以看到不论失败,成功都会调用init_ack来返回值给父进程,当成功之后,就会调用acceptor_loop来进入后续处理.
在看acceptor_loop之前,线来看do_init方法:
- do_init(State) ->
- Opts = [binary,
- {packet, 0},
- {reuseaddr, true},
- {backlog, 1024},
- {active, false}],
- case gen_tcp:listen(State#state.port, Opts) of
- {ok, ListenSocket} ->
- {ok, ListenSocket};
- {error, Reason} ->
- throw({error, {listen, Reason}})
- end.
这里调用gen_tcp的listen方法,我们着重来看传入listen的opts,这里可以看到active被设置为false,也就是每次必须主动地调用recv来读取数据。
然后来看acceptor_loop 函数,也就是server子进程的主循环函数,这个函数主要就是通过accept来接收客户端的连接,然后交给后续模块处理.
- acceptor_loop(State) ->
- case (catch gen_tcp:accept(State#state.listener, 50000)) of
- {ok, Socket} ->
- handle_connection(State, Socket),
- ?MODULE:acceptor_loop(State);
- {error, Reason} ->
- handle_error(Reason),
- ?MODULE:acceptor_loop(State);
- {'EXIT', Reason} ->
- handle_error({'EXIT', Reason}),
- ?MODULE:acceptor_loop(State)
- end.
这里先暂停一下,我们先来看最后一个被supervisor监控的子进程,也就是一个子supervisor,janus_transport_sup。来看它的child spec:
- %% Client instance supervisor
- {janus_transport_sup,
- {supervisor, start_link, [{local, janus_transport_sup},
- ?MODULE, [Module]]},
- permanent,
- infinity,
- supervisor,
- []
- }
可以看到他会继续创建一个新的supervisor,然后也是当前模块(janus_app),只不过参数是一个参数,因此我们来看另外的一个init函数:
- init([Module]) ->
- {ok,
- {_SupFlags = {simple_one_for_one, ?MAX_RESTART, ?MAX_TIME},
- [
- %% TCP Client
- {undefined,
- {Module, start_link, []},
- temporary,
- 2000,
- worker,
- []
- }
- ]
- }
- }.
可以看到这个child spec,重启策略是simple_one_one,也就是需要手动重启,并且它将会创建的子进程是Module(transport模块)的start_link函数来启动.
接下来就来看transport的启动函数以及init函数,这个模块是一个gen_server.
- -behavior(gen_server).
- start_link(Port)
- when is_integer(Port) ->
- gen_server:start_link(?MODULE, [Port], []).
- init([Port]) ->
- process_flag(trap_exit, true),
- {ok, #state{port = Port, transport = janus_flash }}.
这里需要注意的是process_flag(trap_exit, true),这个其实也就是设置表示父进程将会接收子进程的crash信息。还有一个就是state,这里state的transport设置为了janus_flash模块.
ok,然后我们再回到janus_acceptor模块,接下来来看假设有一个连接过来之后的情况。这里跳过错误处理,就来看看正确的处理流程。
- handle_connection(State, Socket),
- ?MODULE:acceptor_loop(State);
当正确接到新的连接之后,会进入handle_connection的处理,然后调用acceptor_loop进入递归.因此我们就来看handle_connection
- handle_connection(State, Socket) ->
- {ok, Pid} = janus_app:start_transport(State#state.port),
- ok = gen_tcp:controlling_process(Socket, Pid),
- %% Instruct the new handler to own the socket.
- (State#state.module):set_socket(Pid, Socket).
这里做了3个操作,首先调用janus_app:start_transport来启动一个新的子进程,而这个子进程是属于那个supervisor呢,来看代码:
- start_transport(Port) ->
- supervisor:start_child(janus_transport_sup, [Port]).
可以看到它启动了janus_transport_sup这个supervisor的子进程,而我们还记得前面分析的,这个supervisor的子进程的启动回调就是transport模块的start_link函数。这里要注意start_child返回的是子进程的pid.
- start_link(Port)
- when is_integer(Port) ->
- gen_server:start_link(?MODULE, [Port], []).
然后接下来的两个操作,就是将当前进程接受到的socket传递给新建的子进程,然后调用transport的set_socket方法。然后我们来看transport模块的set_socket方法.
- set_socket(Ref, Sock) ->
- gen_server:cast(Ref, {set_socket, Sock}).
可以看到就是给新建的子进程发送一个set_socket的方法.这里要注意就是会设置socket的属性,也就是设置active为once。
- handle_cast({set_socket, Socket}, State) ->
- inet:setopts(Socket, [{active, once},
- {packet, 0},
- binary]),
- {ok, Keep, Ref} = (State#state.transport):start(Socket),
- keep_alive_or_close(Keep, State#state{socket = Socket, state = Ref});
这里可以看到调用了state的transport的start方法,那么这个transport是那个模块呢,上面的分析中在当前transport的init方法中返回e设置的就是janus_flash模块,所以这里调用的就是janus_flash:start方法.
- start(Socket) ->
- Send = fun(Bin) -> gen_tcp:send(Socket, [Bin, 1]) end,
- {ok, Proxy, Token} = client_proxy:start(Send),
- State = #state{
- socket = Socket,
- proxy = Proxy,
- token = Token
- },
- JSON = {struct,
- [{<<"timestamp">>, tuple_to_list(now())},
- {<<"token">>, Token}
- ]},
- send(mochijson2:encode(JSON), State).
这里可以看到先是创建了一个send方法,然后调用client_proxy start,这里client_proxy其实是一个gen_server,因此我们来看这个模块的start方法以及 init方法.
- start(Send) ->
- Token = common:random_token(),
- {ok, Pid} = gen_server:start_link(?MODULE, [Token, self(), Send], []),
- {ok, Pid, Token}.
- init([Token, Parent, Send]) ->
- process_flag(trap_exit, true),
- ok = mapper:add(client_proxy_mapper, Token),
- State = #state{
- token = Token,
- parent = Parent,
- send = Send,
- messages = []
- },
- {ok, State}.
可以看到init方法里面调用了mapper模块的add方法,因此来看mapper:add方法
- add(Ref, Key) ->
- gen_server:call(Ref, {add, Key, self()}).
可以看到也就是给client_proxy_mapper这个进程发送了一个同步的消息,而对应的client_proxy_mapper也就是一开始在janus_app模块中注册的进程,这个进程就是mapper模块启动的。因此来看mapper的对应同步消息接收。
- handle_call({add, Key, Pid}, _, State) ->
- case ets:lookup(State#state.key_pid, Key) of
- [_] ->
- ok;
- _ ->
- Ref = erlang:monitor(process, Pid),
- ets:insert(State#state.key_pid, {Key, {Pid, Ref}}),
- ets:insert(State#state.pid_key, {Pid, Key})
- end,
- {reply, ok, State};
这里也就是将随机出来的token和进程通过ets关联。
前面这里对于数据的发送分析完了,剩下的就是连接的错误,断开处理以及数据的接收处理,线来看连接的接收处理,通过上面的分析,我们知道,accept到的socket是处于transport这个gen_server管理的,因此读取数据就在这个里面处理:
- handle_info({tcp, Socket, <<"", 0, Bin/binary>>}, State)
- when Socket == State#state.socket ->
- inet:setopts(Socket, [{active, once}]),
- dispatch(Bin, janus_flash, State);
这里主要还是调用dispatch来处理数据的读取,先是调用janus_flash的process方法,然后调用keep_alive_or_close来判断是否连接已经关闭.
- %%% Brand new transport
- %%% Existing connection
- dispatch(Data, Mod, State = #state{transport = Mod}) ->
- {ok, Keep, TS} = Mod:process(Data, State#state.state),
- keep_alive_or_close(Keep, State#state{state = TS}).
某网友:个人觉得hotwheel 中代码最经典的地方就是那个客户端连接进程transport 的启动、监控、socket绑定;以及gen_tcp的once方式下的数据接收。这个模型可以用在很多地方。
文章来自: