Erlang messaging server optimized to send 1 message to 40k subscribers to a topic in < 1s —
下载地址:
启动过程:
make run1
run1在MakeFile中的定义:
- run1: compile
- erl $(LOCAL_OPTS) -name $(NODE) -s janus start
janus.erl内容:
- start() ->
- start([]).
- start([]) ->
- start(8081);
- start([Port])
- when is_atom(Port) ->
- start(list_to_integer(atom_to_list(Port)));
- start(Port)
- when is_integer(Port) ->
- inets:start(),
- application:set_env(janus, listen_port, Port),
- application:start(janus).
调用了application:start/1 方法,这样会启动一个erlang 应用,应用程序的定义在同名的janus.app中:
直接看janus.app中最后的回调模块部分:
系统会调用janus_app:start/2
所以查看janus_app.erl中的代码:
- start(_Type, _Args) ->
- Port = janus_admin:get_env(listen_port, ?LISTEN_PORT),
- supervisor:start_link({local, ?MODULE},
- ?MODULE,
- [Port, transport]).
获取了服务器端监听的端口,然后启动了一个Supervisor,传入的参数为[ Port,transport ]
所以看init/2 这个初始化方法:
启动了一系列子进程,先看第一个:
- %% TCP server
- {janus_sup, %% ID名
- {janus_acceptor, start_link, [self(), Port, Module]}, %%启动时调用的方法janus_acceptor:start_link/3 传入参数[self(),监听的端口,transport]
- permanent,
- 2000,
- worker,
- [janus_acceptor]
- },
看这个启动方法:
- start_link(Parent, Port, Module)
- when is_pid(Parent),
- is_list(Port),
- is_atom(Module) ->
- start_link(Parent, list_to_integer(Port), Module);
- start_link(Parent, Port, Module)
- when is_pid(Parent),
- is_integer(Port),
- is_atom(Module) ->
- Args = [Parent, Port, Module],
- proc_lib:start_link(?MODULE, acceptor_init, Args).
调用了acceptor_init/3 函数:
- acceptor_init(Parent, Port, Module) ->
- State = #state{
- parent = Parent,
- port = Port,
- module = Module
- },
- error_logger:info_msg("Listening on port ~p~n", [Port]),
- case (catch do_init(State)) of
- {ok, ListenSocket} ->
- proc_lib:init_ack(State#state.parent, {ok, self()}),
- acceptor_loop(State#state{listener = ListenSocket});
- Error ->
- proc_lib:init_ack(Parent, Error),
- error
- end.
在do_init/1 中监听了端口,监听成功后再调用acceptor_loop/1 循环处理接入的客户端请求:
- acceptor_loop(State) ->
- case (catch gen_tcp:accept(State#state.listener, 50000)) of
- {ok, Socket} ->
- handle_connection(State, Socket),
- ?MODULE:acceptor_loop(State);
- {error, Reason} ->
- handle_error(Reason),
- ?MODULE:acceptor_loop(State);
- {'EXIT', Reason} ->
- handle_error({'EXIT', Reason}),
- ?MODULE:acceptor_loop(State)
- end.
一旦有新的请求来到,则调用handle_connection 进行处理。
- handle_connection(State, Socket) ->
- {ok, Pid} = janus_app:start_transport(State#state.port),
- ok = gen_tcp:controlling_process(Socket, Pid),
- %% Instruct the new handler to own the socket.
- (State#state.module):set_socket(Pid, Socket).
janus_app:start_transport/1的定义:
- start_transport(Port) ->
- supervisor:start_child(janus_transport_sup, [Port]).
这里的janus_transport_sup是主进程监控树下的一个子进程,而这个进程也是个监督树。start_child会在这个监督树下添加子进程。
----------------------------------------------------------------------------------------------暂停---------------------------------------------------------------------------------------------------------------
再来看janus_app.erl中定义的ID 为janus_transport_sup 这个子进程(类型为监督树)。
- %% Client instance supervisor
- {janus_transport_sup,
- {supervisor, start_link, [{local, janus_transport_sup},
- ?MODULE, [Module]]},
- permanent,
- infinity,
- supervisor,
- []
- }
supervisor:start_link/3 ,再看传入的参数: [{local,janus_transport_sup},?MODULE,[Module]] ,第二个参数值为janus_app模块本身,第三个参数的值为transport
所以就是再次以自己为模板使用transport为参数来初始化一个监控树,调用的为init/1:
- init([Module]) ->
- {ok,
- {_SupFlags = {simple_one_for_one, ?MAX_RESTART, ?MAX_TIME},
- [
- %% TCP Client
- {undefined,
- {Module, start_link, []},
- temporary,
- 2000,
- worker,
- []
- }
- ]
- }
- }.
以上为这个supervisor的子进程的定义:一个transport的gen_server子进程。这个子进程的启动方法为start_link,
- start_link(Port)
- when is_integer(Port) ->
- gen_server:start_link(?MODULE, [Port], []).
---------------------------------------------------------------------------------------继续---------------------------------------------------------------------------------------------------------------------
目前的监控树结构如下:
刚才janus_sup 监听到了客户端的链接,并调用了janus_transport_sup:start_child/1
下面建立一个子进程,这个子进程启动时调用的是transport:start_link。所以服务器每接收到一个client 的连接请求就会在janus_transport_sup下添加相应的子进程。
再回到janus_acceptor:handle_connection/2,我们一但在janus_transport_sup下启动了子进程就会返回相应的PID。
然后调用gen_tcp:controlling_process/2 将这个客户端相应的socket 指定地绑定到此PID上:
- ok = gen_tcp:controlling_process(Socket, Pid),
下一句:
- (State#state.module):set_socket(Pid, Socket).
发送了一个异步的请求给刚才生成的进程,由transport:handle_cast 处理:
- handle_cast({set_socket, Socket}, State) ->
- inet:setopts(Socket, [{active, once},
- {packet, 0},
- binary]),
- {ok, Keep, Ref} = (State#state.transport):start(Socket),
- keep_alive_or_close(Keep, State#state{socket = Socket, state = Ref});
这里需要注意(State#state.transport)这个record中的transport 值是在建立子进程时定义的:
- init([Port]) ->
- process_flag(trap_exit, true),
- {ok, #state{port = Port, transport = janus_flash }}.
所以(State#state.transport):start(Socket) 调用的是janus_flash:start/1:
- start(Socket) ->
- Send = fun(Bin) -> gen_tcp:send(Socket, [Bin, 1]) end,
- {ok, Proxy, Token} = client_proxy:start(Send),
- State = #state{
- socket = Socket,
- proxy = Proxy,
- token = Token
- },
- JSON = {struct,
- [{<<"timestamp">>, tuple_to_list(now())},
- {<<"token">>, Token}
- ]},
- send(mochijson2:encode(JSON), State).
这里调用client_proxy:start将 gen_tcp:send 函数包装成了一个gen_server 所以每一个transport 就附带了一个专门用来发送数据的gen_server。
在keep_alive_or_close里判断了当前连接是否需要关闭后整个客户端连接的初始化算是完成了。
TOPIC发布消息的处理:
每个socket 客户端连接都经过 gen_tcp:controlling_process 被绑定到一个gen_server 类型的进程:transport 。
所有发送到socket 的消息都由 transport:handle_info处理(验证)。
- handle_info(Info, State)
- when State#state.transport /= undefined ->
- Mod = State#state.transport,
- {ok, Keep, TS} = Mod:process(Info, State#state.state),
- keep_alive_or_close(Keep, State#state{state = TS});
以上代码中的Mod为janus_flash,所以具体的消息处理由janus_flash:process处理。
- process({ok, <<"PUBLISH">>, Rest}, State) ->
- JSON = {struct, [{<<"topic">>, Topic},
- {<<"event">>, _},
- {<<"message_id">>, _},
- {<<"data">>, _}
- ]} = mochijson2:decode(Rest),
- topman:publish(JSON, Topic),
- {ok, shutdown, State};
将客户端发送过来的二进制数据解析为一个JSON 记录,然后交由topman:publish/2 来处理。
- publish(Msg, Topic)
- when is_binary(Topic) ->
- gen_server:abcast(?MODULE, {publish, Msg, Topic});
gen_server:abcast/2 会向本地指定的gen_server 发送一个请求,由相应gen_server 的handle_cast/2 函数进行处理。(gen_server:cast 和 gen_server:abcast区别)
- handle_cast({publish, Msg, Topic}, State) ->
- {Srv, State1} = ensure_server(Topic, State),
- pubsub:publish(Srv, Msg),
- {noreply, State1};
这里先判断相应TOPIC的服务是否存在,不存在则新建一个。
原博客文章地址:
http://blog.csdn.net/yjl49/article/details/6934083