Socket modes
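Active mode (option {active, true}) is generally popular because receiving is non-blocking: incoming data is pushed to the controlling process as messages. However, it has no flow control. If clients send data faster than the server can process it, the messages pile up in the process mailbox. Under sustained heavy traffic the server can be flooded with requests, and the VM risks running out of memory and crashing.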
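A minimal sketch of the problem follows. It assumes a loopback connection on an ephemeral port, and the module active_mode_demo and its helpers are hypothetical (they are not part of the servers below). The receiving process never calls recv, yet every packet the client sends still lands in its mailbox:

```erlang
%% Hypothetical demo module (not from the article): an {active, true}
%% socket pushes every packet into the owner's mailbox.
-module(active_mode_demo).
-export([demo/1]).

%% Runs the experiment in a fresh process so its mailbox starts empty;
%% returns {MessagesQueued, TcpMessagesFlushed}.
demo(N) ->
    Parent = self(),
    Pid = spawn(fun() -> Parent ! {self(), run(N)} end),
    receive
        {Pid, Result} -> Result
    after 5000 -> timeout
    end.

run(N) ->
    {ok, LSock} = gen_tcp:listen(0, [binary, {packet, 2}, {active, true}]),
    {ok, Port} = inet:port(LSock),
    {ok, Client} = gen_tcp:connect({127, 0, 0, 1}, Port, [binary, {packet, 2}]),
    %% The accepted socket inherits {active, true} from the listen socket.
    {ok, Server} = gen_tcp:accept(LSock),
    %% The client keeps sending; this process never asks for data.
    lists:foreach(fun(_) -> ok = gen_tcp:send(Client, <<"request">>) end,
                  lists:seq(1, N)),
    Queued = wait_for_queue(N, 100),
    Flushed = flush(Server, 0),
    ok = gen_tcp:close(Client),
    ok = gen_tcp:close(Server),
    ok = gen_tcp:close(LSock),
    {Queued, Flushed}.

%% Poll the mailbox length until N messages are queued or Tries run out.
wait_for_queue(N, Tries) ->
    {message_queue_len, Len} = process_info(self(), message_queue_len),
    if
        Len >= N; Tries =:= 0 -> Len;
        true -> timer:sleep(10), wait_for_queue(N, Tries - 1)
    end.

%% Drain and count the {tcp, Sock, Data} messages.
flush(Sock, Count) ->
    receive
        {tcp, Sock, _Data} -> flush(Sock, Count + 1)
    after 0 -> Count
    end.
```

With {packet, 2}, each send arrives as exactly one message. If a real client never stops sending, nothing bounds that queue except the VM's memory.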
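With a passive-mode socket (option {active, false}), the underlying TCP buffers can be used to throttle requests and to stop accepting client data until the server asks for it. Every place that receives data calls gen_tcp:recv, which blocks. In a single-process server this means passively waiting on one specific client socket at a time, which is dangerous. Note that the operating system may still do some buffering, so the client machine can keep sending a small amount of data before it is blocked, even though Erlang has not yet called recv.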
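For contrast, here is a sketch of passive mode under the same assumptions (hypothetical module passive_mode_demo, loopback, ephemeral port). The owner decides when to read, and recv blocks until data arrives or the timeout expires:

```erlang
%% Hypothetical demo module: with {active, false} the owner pulls data
%% explicitly with gen_tcp:recv/2,3.
-module(passive_mode_demo).
-export([demo/0]).

demo() ->
    {ok, LSock} = gen_tcp:listen(0, [binary, {packet, 2}, {active, false}]),
    {ok, Port} = inet:port(LSock),
    {ok, Client} = gen_tcp:connect({127, 0, 0, 1}, Port, [binary, {packet, 2}]),
    {ok, Server} = gen_tcp:accept(LSock),
    %% Nothing sent yet: recv blocks the caller until the 100 ms timeout.
    {error, timeout} = gen_tcp:recv(Server, 0, 100),
    ok = gen_tcp:send(Client, <<"one">>),
    ok = gen_tcp:send(Client, <<"two">>),
    %% With {packet, 2}, recv returns one whole packet
    %% (Length is only meaningful in raw mode).
    {ok, First} = gen_tcp:recv(Server, 0),
    %% Unread data waits in the socket buffers, not in the mailbox.
    Mailbox = receive {tcp, Server, _} -> unexpected after 100 -> empty end,
    {ok, Second} = gen_tcp:recv(Server, 0),
    ok = gen_tcp:close(Client),
    ok = gen_tcp:close(Server),
    ok = gen_tcp:close(LSock),
    {First, Mailbox, Second}.
```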
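Hybrid (semi-blocking) mode is enabled with the option {active, once}, which makes the socket active for a single message only. After the socket has delivered one data message to the controlling process, that process must explicitly call inet:setopts(Socket, [{active, once}]) to re-enable delivery of the next message. Until it does, the socket behaves as if it were passive, and further data waits in the buffers. Hybrid mode therefore combines the advantages of active and passive modes: it provides flow control and keeps the server from being flooded with messages.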
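A sketch of the {active, once} cycle follows (hypothetical module active_once_demo, loopback, ephemeral port). Once one packet has been delivered, the socket is passive again, so the second packet is held back until the socket is re-armed:

```erlang
%% Hypothetical demo module: {active, once} delivers exactly one packet
%% as a message, then the socket is passive until it is re-armed.
-module(active_once_demo).
-export([demo/0]).

demo() ->
    {ok, LSock} = gen_tcp:listen(0, [binary, {packet, 2}, {active, false}]),
    {ok, Port} = inet:port(LSock),
    {ok, Client} = gen_tcp:connect({127, 0, 0, 1}, Port, [binary, {packet, 2}]),
    {ok, Server} = gen_tcp:accept(LSock),
    ok = inet:setopts(Server, [{active, once}]),
    ok = gen_tcp:send(Client, <<"first">>),
    ok = gen_tcp:send(Client, <<"second">>),
    First = next_message(Server),
    %% The socket is passive again, so "second" is not delivered yet.
    Held = next_message(Server),
    %% Explicitly re-arm the socket for one more message.
    ok = inet:setopts(Server, [{active, once}]),
    Second = next_message(Server),
    ok = gen_tcp:close(Client),
    ok = gen_tcp:close(Server),
    ok = gen_tcp:close(LSock),
    [First, Held, Second].

%% Wait briefly for one {tcp, Sock, Data} message.
next_message(Sock) ->
    receive
        {tcp, Sock, Data} -> Data
    after 200 -> none
    end.
```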
All of the TCP server code below is based on the hybrid (semi-blocking) {active, once} mode.
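About prim_inet
prim_inet has no official documentation and can be regarded as a thin wrapper around the underlying socket driver. According to Taobao, it is an internal implementation detail of OTP: a private, low-level module intended for Erlang library developers, and its use is not recommended. Nevertheless, the example uses prim_inet to work with sockets asynchronously.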
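Design pattern
In general, one dedicated process listens for client connections, and each child process handles the socket requests of one specific client.
In the example, the child processes are implemented with gen_fsm. This neatly combines a state machine with message events and is worth studying.
The author of the article also uses this pattern, but there the child processes do not follow OTP conventions, so in my view it is not good practice.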
simple_one_for_one
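A simple one-for-one supervisor creates a group of dynamic child processes, which suits servers that must handle many requests concurrently. For example, after a socket server accepts a new client connection, it needs to create a new child process to handle that connection. Following OTP principles, that process should run under a supervisor.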
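Below is a minimal sketch of a simple_one_for_one supervisor. The module dyn_sup_demo and its start function conn_start_link/2 are hypothetical. To keep everything in one file, each child is a bare linked process rather than a gen_server; a real handler should be an OTP behaviour, as in the servers below. The extra arguments passed to supervisor:start_child/2 are appended to the arguments in the child spec:

```erlang
%% Hypothetical demo module: a simple_one_for_one supervisor that starts
%% one child on demand, the way a server starts one handler per client.
-module(dyn_sup_demo).
-behaviour(supervisor).
-export([demo/0, start_link/0, start_conn/1, conn_start_link/2]).
-export([init/1]).

start_link() ->
    supervisor:start_link({local, ?MODULE}, ?MODULE, []).

%% [Conn] is appended to the child spec's argument list [handler_config],
%% so the supervisor calls conn_start_link(handler_config, Conn).
start_conn(Conn) ->
    supervisor:start_child(?MODULE, [Conn]).

init([]) ->
    Child = {conn, {?MODULE, conn_start_link, [handler_config]},
             temporary, brutal_kill, worker, [?MODULE]},
    {ok, {{simple_one_for_one, 0, 1}, [Child]}}.

%% Stand-in for a connection handler (a real one would be a gen_server):
%% a process linked to the supervisor that reports its start arguments.
conn_start_link(Config, Conn) ->
    Pid = spawn_link(fun() -> conn_loop({Config, Conn}) end),
    {ok, Pid}.

conn_loop(Args) ->
    receive
        {From, whoami} ->
            From ! {self(), Args},
            conn_loop(Args)
    end.

ask(Pid) ->
    Pid ! {self(), whoami},
    receive
        {Pid, Args} -> Args
    after 1000 -> timeout
    end.

demo() ->
    {ok, Sup} = start_link(),
    {ok, A} = start_conn(client_a),
    {ok, B} = start_conn(client_b),
    Replies = [ask(A), ask(B)],
    Active = proplists:get_value(active, supervisor:count_children(Sup)),
    ok = gen_server:stop(Sup),
    {Replies, Active}.
```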
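TCP server implementation
A simple implementation with the standard API
This version is also based on {active, once}, but the blocking wait for the next client connection is delegated to the supervised child processes.
Let's look at the entry point, tcp_server_app: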
-module(tcp_server_app).
-author('yongboy@gmail.com').
-behaviour(application).
-export([start/2, stop/1]).

-define(DEF_PORT, 2222).

start(_Type, _Args) ->
    %% Passive listen socket; each handler switches it to {active, once}.
    Opts = [binary, {packet, 2}, {reuseaddr, true},
            {keepalive, true}, {backlog, 30}, {active, false}],
    ListenPort = get_app_env(listen_port, ?DEF_PORT),
    {ok, LSock} = gen_tcp:listen(ListenPort, Opts),
    case tcp_server_sup:start_link(LSock) of
        {ok, Pid} ->
            %% Start the first handler, which waits in gen_tcp:accept/1.
            tcp_server_sup:start_child(),
            {ok, Pid};
        Other ->
            {error, Other}
    end.

stop(_S) ->
    ok.

%% Look up Opt in the application environment, then on the command line
%% (e.g. -listen_port 2222), falling back to Default.
get_app_env(Opt, Default) ->
    case application:get_env(Opt) of
        {ok, Val} -> Val;
        undefined ->
            case init:get_argument(Opt) of
                %% Command-line values are strings; assumes an integer option.
                {ok, [[Val | _]]} -> list_to_integer(Val);
                error -> Default
            end
    end.
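The application reads the port and then starts the top-level supervisor, which does not yet accept any client connections. It then starts the first child process, which begins accepting client socket connections.
The supervisor tcp_server_sup is also simple: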
-module(tcp_server_sup).
-author('yongboy@gmail.com').
-behaviour(supervisor).
-export([start_link/1, start_child/0]).
-export([init/1]).

-define(SERVER, ?MODULE).

start_link(LSock) ->
    supervisor:start_link({local, ?SERVER}, ?MODULE, [LSock]).

start_child() ->
    supervisor:start_child(?SERVER, []).

init([LSock]) ->
    Server = {tcp_server_handler, {tcp_server_handler, start_link, [LSock]},
              temporary, brutal_kill, worker, [tcp_server_handler]},
    Children = [Server],
    RestartStrategy = {simple_one_for_one, 0, 1},
    {ok, {RestartStrategy, Children}}.
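Note that tcp_server_handler:start_link(LSock) is only actually called when start_child is called. At that point the empty argument list passed to supervisor:start_child/2 is appended to the [LSock] argument list in the child spec.
The tcp_server_handler code is not complicated either: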
-module(tcp_server_handler).
-behaviour(gen_server).
-export([start_link/1]).
-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
         terminate/2, code_change/3]).

-record(state, {lsock, socket, addr}).

start_link(LSock) ->
    gen_server:start_link(?MODULE, [LSock], []).

init([Socket]) ->
    %% Socket is the listening socket; sockets accepted from it inherit
    %% these options, so a client's first packet arrives as a message.
    inet:setopts(Socket, [{active, once}, {packet, 2}, binary]),
    %% Timeout 0: gen_server calls handle_info(timeout, ...) right away.
    {ok, #state{lsock = Socket}, 0}.

handle_call(Msg, _From, State) ->
    {reply, {ok, Msg}, State}.

handle_cast(stop, State) ->
    {stop, normal, State}.

handle_info({tcp, Socket, Data}, State) ->
    %% Re-arm the socket for exactly one more message.
    inet:setopts(Socket, [{active, once}]),
    io:format("~p got message ~p\n", [self(), Data]),
    ok = gen_tcp:send(Socket, <<"Echo back : ", Data/binary>>),
    {noreply, State};
handle_info({tcp_closed, _Socket}, #state{addr = Addr} = StateData) ->
    error_logger:info_msg("~p Client ~p disconnected.\n", [self(), Addr]),
    {stop, normal, StateData};
handle_info(timeout, #state{lsock = LSock} = State) ->
    %% Blocks only this process until a client connects.
    {ok, ClientSocket} = gen_tcp:accept(LSock),
    {ok, {IP, _Port}} = inet:peername(ClientSocket),
    %% Start the next acceptor, then keep serving this client.
    tcp_server_sup:start_child(),
    {noreply, State#state{socket = ClientSocket, addr = IP}};
handle_info(_Info, StateData) ->
    {noreply, StateData}.

terminate(_Reason, #state{socket = Socket}) ->
    (catch gen_tcp:close(Socket)),
    ok.

code_change(_OldVsn, State, _Extra) ->
    {ok, State}.
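The code is compact and contains a few tricks. When the supervisor calls start_link, init returns {ok, #state{lsock = Socket}, 0}. The 0 is a timeout value, so gen_server immediately calls handle_info(timeout, #state{lsock = LSock} = State), which waits for a client connection and blocks there. The blocking affects only this handler process: other handlers and the supervisor keep running, although this process cannot handle other messages until gen_tcp:accept returns. When a client connects, the handler asks the supervisor to start a new tcp_server_handler child to wait for the next connection, and then serves its own client.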
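The same timeout-0 trick can be reduced to a single acceptor with no supervisor (hypothetical module timeout_accept_demo, loopback, ephemeral port). One caveat applies: a zero timeout only fires if no other message reaches the server first. Since OTP 21, returning {ok, State, {continue, Term}} from init/1 and doing the work in handle_continue/2 avoids that race.

```erlang
%% Hypothetical demo module: init/1 returns timeout 0 so the blocking
%% accept runs in handle_info(timeout, ...) after start_link returns.
-module(timeout_accept_demo).
-behaviour(gen_server).
-export([demo/0]).
-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
         terminate/2, code_change/3]).

init(LSock) ->
    %% Unless another message arrives first, handle_info(timeout, ...)
    %% runs right after init/1 returns.
    {ok, {listening, LSock}, 0}.

handle_info(timeout, {listening, LSock}) ->
    %% Blocks this process (and only this process) until a client connects.
    {ok, Sock} = gen_tcp:accept(LSock),
    {noreply, {connected, Sock}};
handle_info({tcp, Sock, Data}, {connected, Sock} = State) ->
    ok = gen_tcp:send(Sock, <<"Echo back : ", Data/binary>>),
    ok = inet:setopts(Sock, [{active, once}]),
    {noreply, State};
handle_info(_Other, State) ->
    {noreply, State}.

handle_call(_Request, _From, State) ->
    {reply, ok, State}.

handle_cast(_Msg, State) ->
    {noreply, State}.

terminate(_Reason, _State) ->
    ok.

code_change(_OldVsn, State, _Extra) ->
    {ok, State}.

demo() ->
    %% Accepted sockets inherit {active, once}, binary and {packet, 2}.
    {ok, LSock} = gen_tcp:listen(0, [binary, {packet, 2}, {active, once}]),
    {ok, Port} = inet:port(LSock),
    %% start_link returns as soon as init/1 does; accept runs afterwards.
    {ok, Server} = gen_server:start_link(?MODULE, LSock, []),
    {ok, Client} = gen_tcp:connect({127, 0, 0, 1}, Port,
                                   [binary, {packet, 2}, {active, false}]),
    ok = gen_tcp:send(Client, <<"hi">>),
    {ok, Reply} = gen_tcp:recv(Client, 0, 1000),
    ok = gen_server:stop(Server),
    ok = gen_tcp:close(Client),
    ok = gen_tcp:close(LSock),
    Reply.
```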
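Implementation based on prim_inet
This implementation follows the article Non-blocking TCP server using OTP principles, but the child process is reimplemented as a gen_server.
Here is the entry point, which is very simple: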
-module(tcp_server_app).
-author('yongboy@gmail.com').
-behaviour(application).
-export([start_client/1]).
-export([start/2, stop/1]).

-define(DEF_PORT, 2222).

%% A startup function for spawning a new client connection handler.
%% To be called by the TCP listener process.
start_client(Socket) ->
    tcp_server_sup:start_child(Socket).

start(_Type, _Args) ->
    ListenPort = get_app_env(listen_port, ?DEF_PORT),
    tcp_server_sup:start_link(ListenPort, tcp_client_handler).

stop(_S) ->
    ok.

%% Look up Opt in the application environment, then on the command line
%% (e.g. -listen_port 2222), falling back to Default.
get_app_env(Opt, Default) ->
    case application:get_env(Opt) of
        {ok, Val} -> Val;
        undefined ->
            case init:get_argument(Opt) of
                %% Command-line values are strings; assumes an integer option.
                {ok, [[Val | _]]} -> list_to_integer(Val);
                error -> Default
            end
    end.
The supervisor code:
-module(tcp_server_sup).
-author('yongboy@gmail.com').
-behaviour(supervisor).
-export([start_child/1, start_link/2, init/1]).

-define(SERVER, ?MODULE).
-define(CLIENT_SUP, tcp_client_sup).
-define(MAX_RESTART, 5).
-define(MAX_TIME, 60).

%% Start one handler under tcp_client_sup: calls Module:start_link(Socket).
start_child(Socket) ->
    supervisor:start_child(?CLIENT_SUP, [Socket]).

start_link(ListenPort, HandleModule) ->
    supervisor:start_link({local, ?SERVER}, ?MODULE, [ListenPort, HandleModule]).

%% Top-level supervisor: the client supervisor plus the listener.
init([Port, Module]) ->
    TcpListener = {tcp_listener,                                % Id = internal id
                   {tcp_listener, start_link, [Port, Module]}, % StartFun = {M, F, A}
                   permanent,                                   % Restart = permanent | transient | temporary
                   2000,                                        % Shutdown = brutal_kill | int() >= 0 | infinity
                   worker,                                      % Type = worker | supervisor
                   [tcp_listener]                               % Modules = [Module] | dynamic
                  },
    TcpClientSupervisor = {?CLIENT_SUP,
                           {supervisor, start_link, [{local, ?CLIENT_SUP}, ?MODULE, [Module]]},
                           permanent,
                           infinity,
                           supervisor,
                           [?MODULE]
                          },
    %% Start tcp_client_sup first so it exists before the first accept.
    {ok, {{one_for_one, ?MAX_RESTART, ?MAX_TIME},
          [TcpClientSupervisor, TcpListener]}};
%% tcp_client_sup: this module reused as a simple_one_for_one supervisor.
init([Module]) ->
    {ok, {_SupFlags = {simple_one_for_one, ?MAX_RESTART, ?MAX_TIME},
          [
           %% TCP Client
           {undefined,                  % Id = internal id
            {Module, start_link, []},   % StartFun = {M, F, A}
            temporary,                  % Restart = permanent | transient | temporary
            2000,                       % Shutdown = brutal_kill | int() >= 0 | infinity
            worker,                     % Type = worker | supervisor
            [Module]                    % Modules = [Module] | dynamic
           }
          ]
         }}.
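The strategy is different here. The top-level one_for_one supervisor contains the listener process tcp_listener and a tcp_client_sup subtree that uses the simple_one_for_one strategy.
tcp_listener is a separate process dedicated to accepting client socket connections: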
-module(tcp_listener).
-author('saleyn@gmail.com').
-behaviour(gen_server).
-export([start_link/2]).
-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2,
         code_change/3]).

-record(state, {
          listener,  % Listening socket
          acceptor,  % Asynchronous acceptor's internal reference
          module     % Client handler module
         }).

start_link(Port, Module) when is_integer(Port), is_atom(Module) ->
    gen_server:start_link({local, ?MODULE}, ?MODULE, [Port, Module], []).

init([Port, Module]) ->
    process_flag(trap_exit, true),
    Opts = [binary, {packet, 2}, {reuseaddr, true},
            {keepalive, true}, {backlog, 30}, {active, false}],
    case gen_tcp:listen(Port, Opts) of
        {ok, Listen_socket} ->
            %% Create first accepting process
            {ok, Ref} = prim_inet:async_accept(Listen_socket, -1),
            {ok, #state{listener = Listen_socket,
                        acceptor = Ref,
                        module   = Module}};
        {error, Reason} ->
            {stop, Reason}
    end.

handle_call(Request, _From, State) ->
    {stop, {unknown_call, Request}, State}.

handle_cast(_Msg, State) ->
    {noreply, State}.

handle_info({inet_async, ListSock, Ref, {ok, CliSocket}},
            #state{listener = ListSock, acceptor = Ref} = State) ->
    try
        case set_sockopt(ListSock, CliSocket) of
            ok -> ok;
            {error, Reason} -> exit({set_sockopt, Reason})
        end,
        %% New client connected - spawn a new process using the simple_one_for_one
        %% supervisor.
        {ok, Pid} = tcp_server_app:start_client(CliSocket),
        gen_tcp:controlling_process(CliSocket, Pid),
        %% Signal the network driver that we are ready to accept another connection
        case prim_inet:async_accept(ListSock, -1) of
            {ok, NewRef} -> ok;
            {error, NewRef} -> exit({async_accept, inet:format_error(NewRef)})
        end,
        {noreply, State#state{acceptor = NewRef}}
    catch exit:Why ->
        error_logger:error_msg("Error in async accept: ~p.\n", [Why]),
        {stop, Why, State}
    end;
handle_info({inet_async, ListSock, Ref, Error},
            #state{listener = ListSock, acceptor = Ref} = State) ->
    error_logger:error_msg("Error in socket acceptor: ~p.\n", [Error]),
    {stop, Error, State};
handle_info(_Info, State) ->
    {noreply, State}.

terminate(_Reason, State) ->
    gen_tcp:close(State#state.listener),
    ok.

code_change(_OldVsn, State, _Extra) ->
    {ok, State}.

%% Taken from prim_inet. We are merely copying some socket options from the
%% listening socket to the new client socket.
set_sockopt(ListSock, CliSocket) ->
    true = inet_db:register_socket(CliSocket, inet_tcp),
    case prim_inet:getopts(ListSock, [active, nodelay, keepalive, delay_send, priority, tos]) of
        {ok, Opts} ->
            case prim_inet:setopts(CliSocket, Opts) of
                ok -> ok;
                Error -> gen_tcp:close(CliSocket), Error
            end;
        Error ->
            gen_tcp:close(CliSocket), Error
    end.
After accepting a client connection, the listener hands it over to the tcp_client_handler module:
-module(tcp_client_handler).
-behaviour(gen_server).
-export([start_link/1]).
-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]).

-record(state, {socket, addr}).

-define(TIMEOUT, 120000).

start_link(Socket) ->
    gen_server:start_link(?MODULE, [Socket], []).

init([Socket]) ->
    inet:setopts(Socket, [{active, once}, {packet, 2}, binary]),
    {ok, {IP, _Port}} = inet:peername(Socket),
    {ok, #state{socket = Socket, addr = IP}}.

%% A call must be answered: {noreply, ok, State} is not a valid return.
handle_call(_Request, _From, State) ->
    {reply, ok, State}.

handle_cast(_Msg, State) ->
    {noreply, State}.

handle_info({tcp, Socket, Data}, State) ->
    %% Re-arm the socket for exactly one more message.
    inet:setopts(Socket, [{active, once}]),
    io:format("~p got message ~p\n", [self(), Data]),
    ok = gen_tcp:send(Socket, <<"Echo back : ", Data/binary>>),
    {noreply, State};
handle_info({tcp_closed, _Socket}, #state{addr = Addr} = StateData) ->
    error_logger:info_msg("~p Client ~p disconnected.\n", [self(), Addr]),
    {stop, normal, StateData};
handle_info(_Info, StateData) ->
    {noreply, StateData}.

terminate(_Reason, #state{socket = Socket}) ->
    (catch gen_tcp:close(Socket)),
    ok.

code_change(_OldVsn, State, _Extra) ->
    {ok, State}.
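Comparing this with the standard-API version shows the benefit of asynchronous I/O. prim_inet:async_accept/2 returns immediately, and the accepted socket arrives later as an {inet_async, ListSock, Ref, {ok, CliSocket}} message. As a result, the listener never blocks in accept and can keep handling other messages.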
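Summary
By trying different patterns, this post implements a simple TCP server on Erlang/OTP. It also serves as my study notes, so I don't forget.
If you have better suggestions, please let me know. Thank you.
References
- 《Erlang程序设计》 (Programming Erlang)
- 《Erlang/OTP并发编程实战》 (Erlang and OTP in Action)
Original article: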
http://www.blogjava.net/yongboy/archive/2012/10/24/390185.html