Chinaunix首页 | 论坛 | 博客
  • 博客访问: 5096454
  • 博文数量: 921
  • 博客积分: 16037
  • 博客等级: 上将
  • 技术积分: 8469
  • 用 户 组: 普通用户
  • 注册时间: 2006-04-05 02:08
文章分类

全部博文(921)

文章存档

2020年(1)

2019年(3)

2018年(3)

2017年(6)

2016年(47)

2015年(72)

2014年(25)

2013年(72)

2012年(125)

2011年(182)

2010年(42)

2009年(14)

2008年(85)

2007年(89)

2006年(155)

分类: Erlang

2013-03-24 16:36:16

套接字模式

主动模式(选项{active, true})一般让人很喜欢,非阻塞消息接收,但在系统无法应对超大流量请求时,客户端发送的数据快过服务器可以处理的速度,那么系统就可能会造成消息缓冲区被塞满,可能出现持续繁忙的流量的极端情况下,系统因请求而溢出,虚拟机造成内存不足的风险而崩溃。

使用被动模式(选项{active, false})的套接字,底层的TCP缓冲区可用于抑制请求,并拒绝客户端的消息,在接收数据的地方都会调用gen_tcp:recv,造成阻塞(单进程模式下就只能消极等待某一个具体的客户端套接字,很危险)。需要注意的是,操作系统可能还会做一些缓存允许客户端机器继续发送少量数据,然后才会将其阻塞,此时Erlang尚未调用recv函数。

混合型模式(半阻塞),使用选项{active, once}打开,主动仅针对一个消息,在控制进程发送完一个数据消息后,必须显示调用inet:setopts(Socket, [{active, once}])重新激活以便接受下一个消息(在此之前,系统处于阻塞状态)。可见,混合型模式综合了主动模式和被动模式的两者优势,可实现流量控制,防止服务器被过多消息淹没。

以下TCP Server代码,都是建立在混合型模式(半阻塞)基础上。

prim_inet相关说明

prim_inet没有官方文档,可以认为是对底层socket的直接包装。淘宝说,这是otp内部实现的细节 是针对Erlang库开发者的private module,底层模块,不推荐使用。但在示范中演示了prim_inet操作Socket异步特性。

设计模式

一般来说,需要一个单独进程进行客户端套接字监听,每一个子进程进行处理来自具体客户端的socket请求。

在示范中,子进程使用gen_fsm处理,很巧妙的结合状态机和消息事件,值得学习。

在文章中,作者也是使用此模式,但子进程不符合OTP规范,因此个人认为不是一个很好的实践模式。

simple_one_for_one

简易的一对一监督进程,用来创建一组动态子进程。对于需要并发处理多个请求的服务器较为合适。比如socket 服务端接受新的客户端连接请求以后,需要动态创建一个新的socket连接处理子进程。若遵守OTP原则,那就是子监督进程。

TCP Server实现 

基于标准API简单实现

也是基于{active, once}模式,但阻塞的等待下一个客户端连接的任务被抛给了子监督进程。

看一下入口tcp_server_app吧

  1. -module(tcp_server_app).
  2. -author('yongboy@gmail.com').
  3. -behaviour(application).
  4. -export([start/2, stop/1]).
  5. -define(DEF_PORT, 2222).

  6. start(_Type, _Args) ->
  7.     Opts = [binary, {packet, 2}, {reuseaddr, true},
  8.             {keepalive, true}, {backlog, 30}, {active, false}],
  9.     ListenPort = get_app_env(listen_port, ?DEF_PORT),
  10.     {ok, LSock} = gen_tcp:listen(ListenPort, Opts),
  11.     case tcp_server_sup:start_link(LSock) of
  12.         {ok, Pid} ->
  13.             tcp_server_sup:start_child(),
  14.             {ok, Pid};
  15.         Other ->
  16.             {error, Other}
  17.     end.

  18. stop(_S) ->
  19.     ok.

  20. get_app_env(Opt, Default) ->
  21.     case application:get_env(application:get_application(), Opt) of
  22.      {ok, Val} -> Val;
  23.      _ ->
  24.      case init:get_argument(Opt) of
  25.          [[Val | _]] -> Val;
  26.          error -> Default
  27.      end
  28.     end.


读取端口,然后启动主监督进程(此时还不会监听处理客户端socket请求),紧接着启动子监督进程,开始处理来自客户端的socket的连接。

监督进程tcp_server_sup也很简单:


  1. -module(tcp_server_sup).
  2. -author('yongboy@gmail.com').
  3. -behaviour(supervisor).
  4. -export([start_link/1, start_child/0]).
  5. -export([init/1]).
  6. -define(SERVER, ?MODULE).

  7. start_link(LSock) ->
  8.     supervisor:start_link({local, ?SERVER}, ?MODULE, [LSock]).

  9. start_child() ->
  10.     supervisor:start_child(?SERVER, []).

  11. init([LSock]) ->
  12.     Server = {tcp_server_handler, {tcp_server_handler, start_link, [LSock]},
  13.               temporary, brutal_kill, worker, [tcp_server_handler]},
  14.     Children = [Server],
  15.     RestartStrategy = {simple_one_for_one, 0, 1},
  16.     {ok, {RestartStrategy, Children}}.


需要注意的是,只有调用start_child函数时,才真正调用tcp_server_handler:start_link([LSock])函数。

tcp_server_handler的代码也不复杂:


  1. -module(tcp_server_handler).
  2. -behaviour(gen_server).
  3. -export([start_link/1]).
  4. -export([init/1, handle_call/3, handle_cast/2, handle_info/2,
  5.          terminate/2, code_change/3]).
  6. -record(state, {lsock, socket, addr}).

  7. start_link(LSock) ->
  8.     gen_server:start_link(?MODULE, [LSock], []).

  9. init([Socket]) ->
  10.     inet:setopts(Socket, [{active, once}, {packet, 2}, binary]),
  11.     {ok, #state{lsock = Socket}, 0}.

  12. handle_call(Msg, _From, State) ->
  13.     {reply, {ok, Msg}, State}.

  14. handle_cast(stop, State) ->
  15.     {stop, normal, State}.

  16. handle_info({tcp, Socket, Data}, State) ->
  17.     inet:setopts(Socket, [{active, once}]),
  18.     io:format("~p got message ~p\n", [self(), Data]),
  19.     ok = gen_tcp:send(Socket, <<"Echo back : ", Data/binary>>),
  20.     {noreply, State};

  21. handle_info({tcp_closed, Socket}, #state{addr=Addr} = StateData) ->
  22.     error_logger:info_msg("~p Client ~p disconnected.\n", [self(), Addr]),
  23.     {stop, normal, StateData};

  24. handle_info(timeout, #state{lsock = LSock} = State) ->
  25.     {ok, ClientSocket} = gen_tcp:accept(LSock),
  26.     {ok, {IP, _Port}} = inet:peername(ClientSocket),
  27.        tcp_server_sup:start_child(),
  28.     {noreply, State#state{socket=ClientSocket, addr=IP}};

  29. handle_info(_Info, StateData) ->
  30.     {noreply, StateData}.

  31. terminate(_Reason, #state{socket=Socket}) ->
  32.     (catch gen_tcp:close(Socket)),
  33.     ok.

  34. code_change(_OldVsn, State, _Extra) ->
  35.     {ok, State}.


代码很精巧,有些小技巧在里面。子监督进程调用start_link函数,init会返回{ok, #state{lsock = Socket}, 0}. 数字0代表了timeout数值,意味着gen_server马上调用handle_info(timeout, #state{lsock = LSock} = State)函数,执行客户端socket监听,阻塞于此,但不会影响在此模式下其它函数的调用。直到有客户端进来,然后启动一个新的子监督进程tcp_server_handler,当前子监督进程解除阻塞。

 

基于prim_inet实现

这个实现师从于Non-blocking TCP server using OTP principles一文,但子进程改为了gen_server实现。

看一看入口,很简单的:

  1. -module(tcp_server_app).
  2. -author('yongboy@gmail.com').
  3. -behaviour(application).
  4. -export([start_client/1]).
  5. -export([start/2, stop/1]).
  6. -define(DEF_PORT, 2222).

  7. %% A startup function for spawning new client connection handling FSM.
  8. %% To be called by the TCP listener process.
  9. start_client(Socket) ->
  10.     tcp_server_sup:start_child(Socket).

  11. start(_Type, _Args) ->
  12.     ListenPort = get_app_env(listen_port, ?DEF_PORT),
  13.     tcp_server_sup:start_link(ListenPort, tcp_client_handler).

  14. stop(_S) ->
  15.     ok.

  16. get_app_env(Opt, Default) ->
  17.     case application:get_env(application:get_application(), Opt) of
  18.      {ok, Val} -> Val;
  19.      _ ->
  20.      case init:get_argument(Opt) of
  21.          [[Val | _]] -> Val;
  22.          error -> Default
  23.      end
  24.     end.


监督进程代码:

  1. -module(tcp_server_sup).
  2. -author('yongboy@gmail.com').
  3. -behaviour(supervisor).
  4. -export([start_child/1, start_link/2, init/1]).
  5. -define(SERVER, ?MODULE).
  6. -define(CLIENT_SUP, tcp_client_sup).
  7. -define(MAX_RESTART, 5).
  8. -define(MAX_TIME, 60).

  9. start_child(Socket) ->
  10.     supervisor:start_child(?CLIENT_SUP, [Socket]).

  11. start_link(ListenPort, HandleMoudle) ->
  12.     supervisor:start_link({local, ?SERVER}, ?MODULE, [ListenPort, HandleMoudle]).

  13. init([Port, Module]) ->
  14.     TcpListener = {tcp_server_sup,         % Id = internal id
  15.                   {tcp_listener, start_link, [Port, Module]},    % StartFun = {M, F, A}
  16.                   permanent,         % Restart = permanent | transient | temporary
  17.                   2000,         % Shutdown = brutal_kill | int() >= 0 | infinity
  18.                   worker,         % Type = worker | supervisor
  19.                   [tcp_listener]         % Modules = [Module] | dynamic
  20.               },
  21.     TcpClientSupervisor = {?CLIENT_SUP,
  22.                   {supervisor, start_link, [{local, ?CLIENT_SUP}, ?MODULE, [Module]]},
  23.                   permanent,
  24.                   infinity,
  25.                   supervisor,
  26.                   []
  27.               },
  28.     {ok,
  29.         {{one_for_one, ?MAX_RESTART, ?MAX_TIME},
  30.             [TcpListener, TcpClientSupervisor]
  31.         }
  32.     };

  33. init([Module]) ->
  34.     {ok,
  35.         {_SupFlags = {simple_one_for_one, ?MAX_RESTART, ?MAX_TIME},
  36.             [
  37.               % TCP Client
  38.               { undefined, % Id = internal id
  39.                   {Module, start_link, []}, % StartFun = {M, F, A}
  40.                   temporary, % Restart = permanent | transient | temporary
  41.                   2000, % Shutdown = brutal_kill | int() >= 0 | infinity
  42.                   worker, % Type = worker | supervisor
  43.                   [] % Modules = [Module] | dynamic
  44.               }
  45.             ]
  46.         }
  47.     }.


策略不一样,one_for_one包括了一个监听进程tcp_listener,还包含了一个tcp_client_sup进程树(simple_one_for_one策略)

tcp_listener单独一个进程用于监听来自客户端socket的连接:


 

-module(tcp_listener).
-author('saleyn@gmail.com').
-behaviour(gen_server).
-export([start_link/2]).
-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2,
         code_change/3]).

-record(state, {
                listener,       % Listening socket
                acceptor,       % Asynchronous acceptor's internal reference
                module          % FSM handling module
               }).

start_link(Port, Module) when is_integer(Port), is_atom(Module) ->
    gen_server:start_link({local, ?MODULE}, ?MODULE, [Port, Module], []).

init([Port, Module]) ->
    process_flag(trap_exit, true),
    Opts = [binary, {packet, 2}, {reuseaddr, true},
            {keepalive, true}, {backlog, 30}, {active, false}],
    case gen_tcp:listen(Port, Opts) of
	    {ok, Listen_socket} ->
	        %%Create first accepting process
	        {ok, Ref} = prim_inet:async_accept(Listen_socket, -1),
	        {ok, #state{listener = Listen_socket,
	                    acceptor = Ref,
	                    module   = Module}};
	    {error, Reason} ->
	        {stop, Reason}
    end.

handle_call(Request, _From, State) ->
    {stop, {unknown_call, Request}, State}.

handle_cast(_Msg, State) ->
    {noreply, State}.

handle_info({inet_async, ListSock, Ref, {ok, CliSocket}},
            #state{listener=ListSock, acceptor=Ref, module=Module} = State) ->
    try
        case set_sockopt(ListSock, CliSocket) of
	        ok              -> ok;
	        {error, Reason} -> exit({set_sockopt, Reason})
        end,

        %% New client connected - spawn a new process using the simple_one_for_one
        %% supervisor.
        {ok, Pid} = tcp_server_app:start_client(CliSocket),
        gen_tcp:controlling_process(CliSocket, Pid),

        %% Signal the network driver that we are ready to accept another connection
        case prim_inet:async_accept(ListSock, -1) of
	        {ok,    NewRef} -> ok;
	        {error, NewRef} -> exit({async_accept, inet:format_error(NewRef)})
        end,

        {noreply, State#state{acceptor=NewRef}}
    catch exit:Why ->
        error_logger:error_msg("Error in async accept: ~p.\n", [Why]),
        {stop, Why, State}
    end;

handle_info({inet_async, ListSock, Ref, Error}, #state{listener=ListSock, acceptor=Ref} = State) ->
    error_logger:error_msg("Error in socket acceptor: ~p.\n", [Error]),
    {stop, Error, State};

handle_info(_Info, State) ->
    {noreply, State}.

terminate(_Reason, State) ->
    gen_tcp:close(State#state.listener),
    ok.

code_change(_OldVsn, State, _Extra) ->
    {ok, State}.

%% Taken from prim_inet.  We are merely copying some socket options from the
%% listening socket to the new client socket.
set_sockopt(ListSock, CliSocket) ->
    true = inet_db:register_socket(CliSocket, inet_tcp),
    case prim_inet:getopts(ListSock, [active, nodelay, keepalive, delay_send, priority, tos]) of
	    {ok, Opts} ->
	        case prim_inet:setopts(CliSocket, Opts) of
		        ok    -> ok;
		        Error -> gen_tcp:close(CliSocket), Error
	        end;
	    Error ->
	        gen_tcp:close(CliSocket), Error
    end.



 

很显然,接收客户端的连接之后,转交给tcp_client_handler模块进行处理:


  1. -module(tcp_client_handler).
  2. -behaviour(gen_server).
  3. -export([start_link/1]).
  4. -export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]).
  5. -record(state, {socket, addr}).
  6. -define(TIMEOUT, 120000).

  7. start_link(Socket) ->
  8.     gen_server:start_link(?MODULE, [Socket], []).

  9. init([Socket]) ->
  10.     inet:setopts(Socket, [{active, once}, {packet, 2}, binary]),
  11.     {ok, {IP, _Port}} = inet:peername(Socket),
  12.     {ok, #state{socket=Socket, addr=IP}}.

  13. handle_call(Request, From, State) ->
  14.     {noreply, ok, State}.

  15. handle_cast(Msg, State) ->
  16.     {noreply, State}.

  17. handle_info({tcp, Socket, Data}, State) ->
  18.     inet:setopts(Socket, [{active, once}]),
  19.     io:format("~p got message ~p\n", [self(), Data]),
  20.     ok = gen_tcp:send(Socket, <<"Echo back : ", Data/binary>>),
  21.     {noreply, State};

  22. handle_info({tcp_closed, Socket}, #state{addr=Addr} = StateData) ->
  23.     error_logger:info_msg("~p Client ~p disconnected.\n", [self(), Addr]),
  24.     {stop, normal, StateData};

  25. handle_info(_Info, StateData) ->
  26.     {noreply, StateData}.

  27. terminate(_Reason, #state{socket=Socket}) ->
  28.     (catch gen_tcp:close(Socket)),
  29.     ok.

  30. code_change(OldVsn, State, Extra) ->
  31.     {ok, State}.

和标准API对比一下,可以感受到异步IO的好处。

小结

通过不同的模式,简单实现一个基于Erlang OTP的TCP服务器,也是学习总结,不至于忘记。

您若有更好的建议,欢迎告知,谢谢。

参考资料

  1. 《Erlang程序设计》
  2. 《Erlang/OTP并发编程实战》

原文链接:http://www.blogjava.net/yongboy/archive/2012/10/24/390185.html


阅读(4326) | 评论(0) | 转发(0) |
给主人留下些什么吧!~~