Chinaunix首页 | 论坛 | 博客
  • 博客访问: 149851
  • 博文数量: 92
  • 博客积分: 2035
  • 博客等级: 大尉
  • 技术积分: 874
  • 用 户 组: 普通用户
  • 注册时间: 2009-09-10 09:03
文章分类

全部博文(92)

文章存档

2010年(38)

2009年(54)

我的朋友

分类: LINUX

2010-02-25 15:42:55

       %% Boot step labelled "TCP listeners": a {Label, Fun} pair whose fun
       %% starts the networking layer and then one TCP listener per
       %% configured endpoint. Every call is asserted with a match, so any
       %% non-`ok` result crashes the step with a badmatch.
       {"TCP listeners",
        fun () ->
                ok = rabbit_networking:start(),
                %% `tcp_listeners` is read from the application environment
                %% and is iterated below as a list of {Host, Port} tuples.
                {ok, TcpListeners} = application:get_env(tcp_listeners),
                lists:foreach(
                  fun ({Host, Port}) ->
                          ok = rabbit_networking:start_tcp_listener(Host, Port)
                  end,
                  TcpListeners)
        end},


%% @doc Starts a listener labelled "TCP Listener" for Host:Port.
%% The on-connect callback handed to start_listener/4 is the MFA
%% {?MODULE, start_client, []}.
start_tcp_listener(Host, Port) ->
    OnConnect = {?MODULE, start_client, []},
    start_listener(Host, Port, "TCP Listener", OnConnect).



%% @doc Adds a tcp_listener_sup child for Host:Port to rabbit_sup.
%%
%% check_tcp_listener_address/3 supplies both the IP address to bind and the
%% child id (Name). The child is started through tcp_listener_sup:start_link/7
%% with the socket options from ?RABBIT_TCP_OPTS, this module's
%% tcp_listener_started / tcp_listener_stopped callbacks, the caller's
%% OnConnect callback and Label.
%%
%% Returns `ok'; crashes with a badmatch if supervisor:start_child/2 does not
%% return {ok, _}.
start_listener(Host, Port, Label, OnConnect) ->
    {IPAddress, Name} =
        check_tcp_listener_address(rabbit_tcp_listener_sup, Host, Port),
    OnStartup  = {?MODULE, tcp_listener_started, []},
    OnShutdown = {?MODULE, tcp_listener_stopped, []},
    StartArgs  = [IPAddress, Port, ?RABBIT_TCP_OPTS,
                  OnStartup, OnShutdown, OnConnect, Label],
    ChildSpec  = {Name,
                  {tcp_listener_sup, start_link, StartArgs},
                  transient, infinity, supervisor, [tcp_listener_sup]},
    {ok, _} = supervisor:start_child(rabbit_sup, ChildSpec),
    ok.



-module(tcp_listener_sup).
-behaviour(supervisor).
%% @doc Convenience form of start_link/8 that uses a concurrent acceptor
%% count of 1.
start_link(IPAddress, Port, SocketOpts, OnStartup, OnShutdown,
           AcceptCallback, Label) ->
    DefaultAcceptorCount = 1,
    start_link(IPAddress, Port, SocketOpts, OnStartup, OnShutdown,
               AcceptCallback, DefaultAcceptorCount, Label).

%% @doc Starts this supervisor (unregistered) and passes all eight arguments,
%% packed into a single tuple, to init/1.
start_link(IPAddress, Port, SocketOpts, OnStartup, OnShutdown,
           AcceptCallback, ConcurrentAcceptorCount, Label) ->
    InitArg = {IPAddress, Port, SocketOpts, OnStartup, OnShutdown,
               AcceptCallback, ConcurrentAcceptorCount, Label},
    supervisor:start_link(?MODULE, InitArg).

%% @doc supervisor callback for tcp_listener_sup.
%%
%% Builds a one_for_all supervisor (restart intensity 10, period 10) with two
%% children, in this order:
%%   1. tcp_acceptor_sup, started with [Name, AcceptCallback];
%%   2. the tcp_listener worker (shutdown timeout 100), which receives the
%%      same Name among its start arguments.
%% Name comes from rabbit_misc:tcp_name(tcp_acceptor_sup, IPAddress, Port).
init({IPAddress, Port, SocketOpts, OnStartup, OnShutdown,
      AcceptCallback, ConcurrentAcceptorCount, Label}) ->
    Name = rabbit_misc:tcp_name(tcp_acceptor_sup, IPAddress, Port),
    AcceptorSupSpec =
        {tcp_acceptor_sup,
         {tcp_acceptor_sup, start_link, [Name, AcceptCallback]},
         transient, infinity, supervisor, [tcp_acceptor_sup]},
    ListenerArgs = [IPAddress, Port, SocketOpts, ConcurrentAcceptorCount,
                    Name, OnStartup, OnShutdown, Label],
    ListenerSpec =
        {tcp_listener,
         {tcp_listener, start_link, ListenerArgs},
         transient, 100, worker, [tcp_listener]},
    {ok, {{one_for_all, 10, 10}, [AcceptorSupSpec, ListenerSpec]}}.



-module(tcp_acceptor_sup).
%% @doc Starts the acceptor supervisor, registered locally as Name, and hands
%% Callback to init/1.
start_link(Name, Callback) ->
    SupName = {local, Name},
    supervisor:start_link(SupName, ?MODULE, Callback).

%% @doc supervisor callback for tcp_acceptor_sup.
%%
%% Returns a simple_one_for_one specification (restart intensity 10,
%% period 10) whose only child template is a tcp_acceptor worker. The
%% template's start arguments are [Callback]; children are transient and are
%% shut down with brutal_kill.
init(Callback) ->
    AcceptorSpec = {tcp_acceptor,
                    {tcp_acceptor, start_link, [Callback]},
                    transient, brutal_kill, worker, [tcp_acceptor]},
    SupFlags = {simple_one_for_one, 10, 10},
    {ok, {SupFlags, [AcceptorSpec]}}.

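参数不一致的追踪:tcp_acceptor_sup 的子进程规格中 start_link 的参数列表只有 [Callback],而 tcp_acceptor:start_link/2 需要 (Callback, LSock) 两个参数。原因是该监督者使用 simple_one_for_one 策略:调用 supervisor:start_child(AcceptorSup, [LSock]) 时,传入的参数列表会被追加到子进程规格的参数之后,所以实际执行的是 tcp_acceptor:start_link(Callback, LSock)。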

-module(tcp_acceptor).
-record(state, {callback, sock, ref}).
%% @doc Starts an unregistered tcp_acceptor gen_server whose init/1 receives
%% {Callback, LSock}.
%% Blog author's note: the LSock here should be the very variable created in
%% the `{ok, LSock}' branch of the tcp_listener init/1 shown further down.
start_link(Callback, LSock) ->
    InitArg = {Callback, LSock},
    gen_server:start_link(?MODULE, InitArg, []).
%% @doc gen_server callback: requests an asynchronous accept on LSock
%% (timeout argument -1) and keeps the returned reference in the state.
%% Any result other than {ok, Ref} stops the server with
%% {cannot_accept, Result}.
init({Callback, LSock}) ->
    accept_requested(prim_inet:async_accept(LSock, -1), Callback, LSock).

%% Turns the result of prim_inet:async_accept/2 into the init/1 return value.
accept_requested({ok, Ref}, Callback, LSock) ->
    {ok, #state{callback=Callback, sock=LSock, ref=Ref}};
accept_requested(Error, _Callback, _LSock) ->
    {stop, {cannot_accept, Error}}.

%% Handles the {inet_async, LSock, Ref, {ok, Sock}} message for a completed
%% accept. The clause only matches when the socket and reference in the
%% message equal the `sock' and `ref' fields stored in State.
%% NOTE(review): this excerpt stops inside the `try' body; the matching
%% catch/after/end and the clause's return value are not shown here.
handle_info({inet_async, LSock, Ref, {ok, Sock}},
            State = #state{callback={M,F,A}, sock=LSock, ref=Ref}) ->
    %% Look up the module registered in inet_db for the listening socket and
    %% register the newly accepted socket with that same module.
    {ok, Mod} = inet_db:lookup_socket(LSock),
    inet_db:register_socket(Sock, Mod),

    try
        %% Report: log the local address of the listening socket and the peer
        %% address of the accepted socket.
        %% inet_op/1 is defined elsewhere; presumably it unwraps {ok, Value}
        %% results -- TODO confirm.
        {Address, Port}         = inet_op(fun () -> inet:sockname(LSock) end),
        {PeerAddress, PeerPort} = inet_op(fun () -> inet:peername(Sock) end),
        error_logger:info_msg("accepted TCP connection on ~s:~p from ~s:~p~n",
                              [inet_parse:ntoa(Address), Port,
                               inet_parse:ntoa(PeerAddress), PeerPort]),
        %% Handle: invoke the accept callback M:F with the accepted socket
        %% appended to its argument list A.
        apply(M, F, A ++ [Sock])



%% @doc gen_server callback for the TCP listener.
%%
%% Traps exits, then opens a passive listening socket on Port bound to
%% IPAddress (SocketOpts with {ip, IPAddress} and {active, false} appended).
%%
%% On success it starts ConcurrentAcceptorCount children under AcceptorSup,
%% passing [LSock] to each, logs the address actually bound, invokes the
%% OnStartup callback M:F with IPAddress and Port appended to A, and returns
%% {ok, State}.
%% On failure it logs the reason and returns
%% {stop, {cannot_listen, IPAddress, Port, Reason}}.
init({IPAddress, Port, SocketOpts,
      ConcurrentAcceptorCount, AcceptorSup,
      {M,F,A} = OnStartup, OnShutdown, Label}) ->
    process_flag(trap_exit, true),
    ListenOpts = SocketOpts ++ [{ip, IPAddress}, {active, false}],
    case gen_tcp:listen(Port, ListenOpts) of
        {error, Reason} ->
            error_logger:error_msg(
              "failed to start ~s on ~s:~p - ~p~n",
              [Label, inet_parse:ntoa(IPAddress), Port, Reason]),
            {stop, {cannot_listen, IPAddress, Port, Reason}};
        {ok, LSock} ->
            %% Blog author's note: the LSock variable is created here.
            %% Each acceptor child receives LSock as its extra start argument.
            StartAcceptor =
                fun (_) ->
                        {ok, _APid} = supervisor:start_child(AcceptorSup,
                                                             [LSock])
                end,
            Slots = lists:duplicate(ConcurrentAcceptorCount, dummy),
            lists:foreach(StartAcceptor, Slots),
            {ok, {LIPAddress, LPort}} = inet:sockname(LSock),
            error_logger:info_msg("started ~s on ~s:~p~n",
                                  [Label, inet_parse:ntoa(LIPAddress), LPort]),
            apply(M, F, A ++ [IPAddress, Port]),
            {ok, #state{sock = LSock,
                        on_startup = OnStartup, on_shutdown = OnShutdown,
                        label = Label}}
    end.

阅读(795) | 评论(0) | 转发(0) |
0

上一篇:jamon简介

下一篇:erlang debug rabbitmq

给主人留下些什么吧!~~