Chinaunix首页 | 论坛 | 博客
  • 博客访问: 149897
  • 博文数量: 92
  • 博客积分: 2035
  • 博客等级: 大尉
  • 技术积分: 874
  • 用 户 组: 普通用户
  • 注册时间: 2009-09-10 09:03
文章分类

全部博文(92)

文章存档

2010年(38)

2009年(54)

我的朋友

分类: LINUX

2010-01-24 10:12:52

监控树相关的启动函数:

tcp_listener_sup.erl
%% Supervisor that owns one TCP listener together with the acceptor
%% supervisor that the listener populates.
-module(tcp_listener_sup).
-behaviour(supervisor).

-export([start_link/7, start_link/8]).
-export([init/1]).

%% Convenience form of start_link/8 that uses a concurrent acceptor
%% count of 1.
start_link(IPAddress, Port, SocketOpts, OnStartup, OnShutdown, AcceptCallback, Label) ->
    start_link(IPAddress, Port, SocketOpts, OnStartup, OnShutdown,
               AcceptCallback, 1, Label).

%% Starts the (unregistered) supervisor. All arguments are packed into a
%% single tuple, which is what init/1 pattern-matches on.
start_link(IPAddress, Port, SocketOpts, OnStartup, OnShutdown, AcceptCallback, ConcurrentAcceptorCount, Label) ->
    InitArg = {IPAddress, Port, SocketOpts, OnStartup, OnShutdown,
               AcceptCallback, ConcurrentAcceptorCount, Label},
    supervisor:start_link(?MODULE, InitArg).

%% supervisor callback. Declares two children under a one_for_all
%% strategy (at most 10 restarts in 10 seconds): the acceptor supervisor
%% first, then the listener. The name derived from the address and port
%% is used both to start the acceptor supervisor and as the handle the
%% listener is given for it.
init({IPAddress, Port, SocketOpts, OnStartup, OnShutdown, AcceptCallback, ConcurrentAcceptorCount, Label}) ->
    Name = rabbit_misc:tcp_name(tcp_acceptor_sup, IPAddress, Port),
    AcceptorSupSpec =
        {tcp_acceptor_sup,
         {tcp_acceptor_sup, start_link, [Name, AcceptCallback]},
         transient, infinity, supervisor, [tcp_acceptor_sup]},
    ListenerArgs = [IPAddress, Port, SocketOpts, ConcurrentAcceptorCount,
                    Name, OnStartup, OnShutdown, Label],
    ListenerSpec =
        {tcp_listener,
         {tcp_listener, start_link, ListenerArgs},
         transient, 100, worker, [tcp_listener]},
    {ok, {{one_for_all, 10, 10}, [AcceptorSupSpec, ListenerSpec]}}.



%% Supervisor for dynamically started TCP acceptor processes.
-module(tcp_acceptor_sup).
-behaviour(supervisor).

-export([start_link/2]).
-export([init/1]).

%% Starts the supervisor registered locally under Name; Callback is
%% handed to init/1 unchanged.
start_link(Name, Callback) ->
    SupName = {local, Name},
    supervisor:start_link(SupName, ?MODULE, Callback).

%% supervisor callback. simple_one_for_one: every child is a
%% tcp_acceptor started via tcp_acceptor:start_link with Callback as the
%% first argument; any arguments given to supervisor:start_child/2 are
%% appended after it.
init(Callback) ->
    AcceptorSpec = {tcp_acceptor,
                    {tcp_acceptor, start_link, [Callback]},
                    transient, brutal_kill, worker, [tcp_acceptor]},
    {ok, {{simple_one_for_one, 10, 10}, [AcceptorSpec]}}.



%% Supervisor for per-connection client processes, each started through
%% a caller-supplied {Module, Function, Args} callback.
-module(tcp_client_sup).
-behaviour(supervisor).

-export([start_link/1, start_link/2]).
-export([init/1]).

%% Starts an unregistered supervisor.
start_link(Callback) ->
    supervisor:start_link(?MODULE, Callback).

%% Starts a supervisor registered under SupName (passed straight to
%% supervisor:start_link/3, e.g. {local, Name}).
start_link(SupName, Callback) ->
    supervisor:start_link(SupName, ?MODULE, Callback).

%% supervisor callback. The callback triple becomes the start function
%% of the single simple_one_for_one child template; children are
%% temporary and are killed outright on shutdown.
init({M, F, A}) ->
    RestartStrategy = {simple_one_for_one, 10, 10},
    ClientSpec = {tcp_client, {M, F, A},
                  temporary, brutal_kill, worker, [M]},
    {ok, {RestartStrategy, [ClientSpec]}}.



%% Locally registered supervisor that starts with no children; children
%% are attached later through supervisor:start_child/2.
-module(rabbit_sup).
-behaviour(supervisor).

-export([start_link/0]).
-export([init/1]).

-define(SERVER, ?MODULE).

%% Starts the supervisor, registered locally under the module name.
start_link() ->
    SupName = {local, ?SERVER},
    supervisor:start_link(SupName, ?MODULE, []).

%% supervisor callback: one_for_one, at most 10 restarts in 10 seconds,
%% and an empty static child list.
init([]) ->
    RestartStrategy = {one_for_one, 10, 10},
    {ok, {RestartStrategy, []}}.



%% Locally registered supervisor for dynamically started queue
%% processes.
-module(rabbit_amqqueue_sup).
-behaviour(supervisor).

-export([start_link/0]).
-export([init/1]).

-define(SERVER, ?MODULE).

%% Starts the supervisor, registered locally under the module name.
start_link() ->
    SupName = {local, ?SERVER},
    supervisor:start_link(SupName, ?MODULE, []).

%% supervisor callback. simple_one_for_one template whose children are
%% started via rabbit_amqqueue_process:start_link; the template carries
%% no fixed arguments, so each child receives only what is passed to
%% supervisor:start_child/2.
init([]) ->
    QueueSpec = {rabbit_amqqueue,
                 {rabbit_amqqueue_process, start_link, []},
                 temporary, brutal_kill, worker, [rabbit_amqqueue_process]},
    {ok, {{simple_one_for_one, 10, 10}, [QueueSpec]}}.



%% Adds Mod as a transient worker under rabbit_sup. The child id is the
%% module name and it is started with Mod:start_link(). Returns ok, and
%% crashes with badmatch if the supervisor does not answer {ok, _}.
start_child(Mod) ->
    WorkerSpec = {Mod,
                  {Mod, start_link, []},
                  transient, 100, worker, [Mod]},
    {ok, _} = supervisor:start_child(rabbit_sup, WorkerSpec),
    ok.


    %% This is based on os_mon:childspec(memsup, true)
    {ok, _} = supervisor:start_child(
                os_mon_sup,
                {memsup, {Mod, start_link, Args},
                 permanent, 2000, worker, [Mod]}),
    ok.


    {ok,_} = supervisor:start_child(
               rabbit_sup,
               {rabbit_amqqueue_sup, {rabbit_amqqueue_sup, start_link, []},
                transient, infinity, supervisor, [rabbit_amqqueue_sup]}),
    ok.


%% Starts a new child of rabbit_amqqueue_sup with Q as its extra start
%% argument and returns Q with its pid field set to the new process.
start_queue_process(Q) ->
    ExtraArgs = [Q],
    {ok, QueuePid} = supervisor:start_child(rabbit_amqqueue_sup, ExtraArgs),
    Q#amqqueue{pid = QueuePid}.

In the case of a simple_one_for_one supervisor, the child specification defined in Module:init/1 will be used, and ChildSpec should instead be an arbitrary list of terms List. The child process will then be started by appending List to the existing start function arguments, i.e. by calling apply(M, F, A++List), where {M,F,A} is the start function defined in the child specification.



%% Attaches the TCP client supervisor to rabbit_sup.
%%
%% The child is a tcp_client_sup instance registered locally as
%% rabbit_tcp_client_sup; its callback {rabbit_reader, start_link, []}
%% is what tcp_client_sup uses as the start function for each client
%% process. Returns ok, and crashes with badmatch if the supervisor does
%% not answer {ok, _}.
start() ->
    ClientSupArgs = [{local, rabbit_tcp_client_sup},
                     {rabbit_reader, start_link, []}],
    {ok, _} = supervisor:start_child(
                rabbit_sup,
                {rabbit_tcp_client_sup,
                 {tcp_client_sup, start_link, ClientSupArgs},
                 transient, infinity, supervisor, [tcp_client_sup]}),
    %% The function must end with a full stop; it was missing here.
    ok.



%% Starts a TCP listener supervision subtree under rabbit_sup.
%%
%% check_tcp_listener_address/3 yields the IP address to bind and the
%% child id to register under. The child is started through
%% tcp_listener_sup:start_link with seven arguments; the startup and
%% shutdown hooks are this module's tcp_listener_started and
%% tcp_listener_stopped, and OnConnect is passed as the accept callback.
%% Returns ok, and crashes with badmatch if the child cannot be started.
start_listener(Host, Port, Label, OnConnect) ->
    {IPAddress, Name} =
        check_tcp_listener_address(rabbit_tcp_listener_sup, Host, Port),
    OnStartup = {?MODULE, tcp_listener_started, []},
    OnShutdown = {?MODULE, tcp_listener_stopped, []},
    StartArgs = [IPAddress, Port, ?RABBIT_TCP_OPTS,
                 OnStartup, OnShutdown, OnConnect, Label],
    ListenerSupSpec = {Name,
                       {tcp_listener_sup, start_link, StartArgs},
                       transient, infinity, supervisor, [tcp_listener_sup]},
    {ok, _} = supervisor:start_child(rabbit_sup, ListenerSupSpec),
    ok.



%% Hands an accepted socket over to a freshly started client process.
%%
%% A new child of rabbit_tcp_client_sup is started, made the controlling
%% process of Sock, and only then told to proceed with {go, Sock}.
%% Returns the pid of the new child.
start_client(Sock) ->
    {ok, ReaderPid} = supervisor:start_child(rabbit_tcp_client_sup, []),
    ok = rabbit_net:controlling_process(Sock, ReaderPid),
    ReaderPid ! {go, Sock},
    ReaderPid.



tcp_listener.erl
%% Listener initialisation: opens the listening socket, starts the
%% requested number of acceptors and runs the startup hook.
%%
%% On success returns {ok, #state{}} holding the socket, both hooks and
%% the label. If the socket cannot be opened, logs the failure and
%% returns {stop, {cannot_listen, IPAddress, Port, Reason}}.
init({IPAddress, Port, SocketOpts,
      ConcurrentAcceptorCount, AcceptorSup,
      {M,F,A} = OnStartup, OnShutdown, Label}) ->
    process_flag(trap_exit, true),
    %% The caller's options come first; the bind address and passive
    %% mode are appended to them.
    ListenOpts = SocketOpts ++ [{ip, IPAddress}, {active, false}],
    case gen_tcp:listen(Port, ListenOpts) of
        {error, Reason} ->
            error_logger:error_msg(
              "failed to start ~s on ~s:~p - ~p~n",
              [Label, inet_parse:ntoa(IPAddress), Port, Reason]),
            {stop, {cannot_listen, IPAddress, Port, Reason}};
        {ok, LSock} ->
            %% One acceptor child per requested slot, each given the
            %% listening socket as its extra start argument.
            StartAcceptor =
                fun (_) ->
                        {ok, _APid} =
                            supervisor:start_child(AcceptorSup, [LSock])
                end,
            lists:foreach(StartAcceptor,
                          lists:duplicate(ConcurrentAcceptorCount, dummy)),
            %% Log the address and port reported by the socket itself.
            {ok, {LIPAddress, LPort}} = inet:sockname(LSock),
            error_logger:info_msg("started ~s on ~s:~p~n",
                                  [Label, inet_parse:ntoa(LIPAddress), LPort]),
            %% The startup hook receives the requested address and port
            %% appended to its own argument list.
            HookArgs = A ++ [IPAddress, Port],
            apply(M, F, HookArgs),
            {ok, #state{sock = LSock,
                        on_startup = OnStartup, on_shutdown = OnShutdown,
                        label = Label}}
    end.




rabbit_reader.erl
%% Reader entry point. Prepares empty sys debug options, then blocks
%% until a {go, Socket} message arrives and continues with
%% start_connection/3 on that socket.
init(Parent) ->
    DebugOpts = sys:debug_options([]),
    receive
        {go, ClientSock} ->
            start_connection(Parent, DebugOpts, ClientSock)
    end.

%% Runs one client connection from handshake to termination.
%%
%% Logs the start of the connection, arms a handshake timer, and enters
%% mainloop/3 with a fresh #v1{} state that is waiting for the 8-byte
%% handshake. Thrown exceptions are logged (as a warning for
%% connection_closed_abruptly, as an error otherwise). Closing is always
%% logged and profiling torn down. Returns done.
start_connection(Parent, Deb, ClientSock) ->
    process_flag(trap_exit, true),
    {PeerAddressS, PeerPort} = peername(ClientSock),
    ProfilingValue = setup_profiling(),
    try
        rabbit_log:info("starting TCP connection ~p from ~s:~p~n",
                        [self(), PeerAddressS, PeerPort]),
        %% ?HANDSHAKE_TIMEOUT is multiplied by 1000 for the timer.
        erlang:send_after(?HANDSHAKE_TIMEOUT * 1000, self(),
                          handshake_timeout),
        InitialConnection = #connection{user = none,
                                        timeout_sec = ?HANDSHAKE_TIMEOUT,
                                        frame_max = ?FRAME_MIN_SIZE,
                                        vhost = none},
        InitialState = #v1{sock = ClientSock,
                           connection = InitialConnection,
                           callback = uninitialized_callback,
                           recv_ref = none,
                           connection_state = pre_init},
        mainloop(Parent, Deb, switch_callback(InitialState, handshake, 8))
    catch
        Ex ->
            %% An abrupt close by the peer is only worth a warning;
            %% everything else thrown is logged as an error.
            LogFun = case Ex of
                         connection_closed_abruptly ->
                             fun rabbit_log:warning/2;
                         _ ->
                             fun rabbit_log:error/2
                     end,
            LogFun("exception on TCP connection ~p from ~s:~p~n~p~n",
                   [self(), PeerAddressS, PeerPort, Ex])
    after
        rabbit_log:info("closing TCP connection ~p from ~s:~p~n",
                        [self(), PeerAddressS, PeerPort]),
        %% The socket is deliberately not closed here. The reader is
        %% its controlling process, so the socket goes away when the
        %% reader terminates. An explicit gen_tcp:close/1 would also
        %% wait for pending output to be sent, which causes
        %% unnecessary delays.
        teardown_profiling(ProfilingValue)
    end,
    done.


阅读(926) | 评论(0) | 转发(0) |
给主人留下些什么吧!~~