监控树相关的启动函数:
tcp_listener_sup.erl
-module(tcp_listener_sup).
-behaviour(supervisor).
-export([start_link/7, start_link/8]).
-export([init/1]).
start_link(IPAddress, Port, SocketOpts, OnStartup, OnShutdown, AcceptCallback, Label) ->
start_link(IPAddress, Port, SocketOpts, OnStartup, OnShutdown, AcceptCallback, 1, Label).
start_link(IPAddress, Port, SocketOpts, OnStartup, OnShutdown,AcceptCallback, ConcurrentAcceptorCount, Label) ->
supervisor:start_link( ?MODULE, {IPAddress, Port, SocketOpts, OnStartup, OnShutdown, AcceptCallback, ConcurrentAcceptorCount, Label}).
init({IPAddress, Port, SocketOpts, OnStartup, OnShutdown, AcceptCallback, ConcurrentAcceptorCount, Label}) ->
Name = rabbit_misc:tcp_name(tcp_acceptor_sup, IPAddress, Port),
{ok, {{one_for_all, 10, 10},
[{tcp_acceptor_sup, {tcp_acceptor_sup, start_link,[Name, AcceptCallback]}, transient, infinity, supervisor, [tcp_acceptor_sup]},
{tcp_listener, {tcp_listener, start_link,[IPAddress, Port, SocketOpts, ConcurrentAcceptorCount, Name,OnStartup, OnShutdown, Label]},
transient, 100, worker, [tcp_listener]}]}}.
-module(tcp_acceptor_sup).
-behaviour(supervisor).
-export([start_link/2]).
-export([init/1]).
start_link(Name, Callback) ->
supervisor:start_link({local,Name}, ?MODULE, Callback).
init(Callback) ->
{ok, {{simple_one_for_one, 10, 10},
[{tcp_acceptor, {tcp_acceptor, start_link, [Callback]}, transient, brutal_kill, worker, [tcp_acceptor]
}]
}
}.
-module(tcp_client_sup).
-behaviour(supervisor).
-export([start_link/1, start_link/2]).
-export([init/1]).
start_link(Callback) ->
supervisor:start_link(?MODULE, Callback).
start_link(SupName, Callback) ->
supervisor:start_link(SupName, ?MODULE, Callback).
init({M,F,A}) ->
{ok, {{simple_one_for_one, 10, 10},
[{tcp_client, {M,F,A},
temporary, brutal_kill, worker, [M]}]}}.
-module(rabbit_sup).
-behaviour(supervisor).
-export([start_link/0]).
-export([init/1]).
-define(SERVER, ?MODULE).
start_link() ->
supervisor:start_link({local, ?SERVER}, ?MODULE, []).
init([]) ->
{ok, {{one_for_one, 10, 10}, []}}.
-module(rabbit_amqqueue_sup).
-behaviour(supervisor).
-export([start_link/0]).
-export([init/1]).
-define(SERVER, ?MODULE).
start_link() ->
supervisor:start_link({local, ?SERVER}, ?MODULE, []).
init([]) ->
{ok, {{simple_one_for_one, 10, 10},
[{rabbit_amqqueue, {rabbit_amqqueue_process, start_link, []},
temporary, brutal_kill, worker, [rabbit_amqqueue_process]}]}}.
start_child(Mod) ->
{ok,_} = supervisor:start_child(rabbit_sup,
{Mod, {Mod, start_link, []},
transient, 100, worker, [Mod]}),
ok.
%% This is based on os_mon:childspec(memsup, true)
{ok, _} = supervisor:start_child(
os_mon_sup,
{memsup, {Mod, start_link, Args},
permanent, 2000, worker, [Mod]}),
ok.
{ok,_} = supervisor:start_child(
rabbit_sup,
{rabbit_amqqueue_sup, {rabbit_amqqueue_sup, start_link, []},
transient, infinity, supervisor, [rabbit_amqqueue_sup]}),
ok.
start_queue_process(Q) ->
{ok, Pid} = supervisor:start_child(rabbit_amqqueue_sup, [Q]),
Q#amqqueue{pid = Pid}.
If the case of a simple_one_for_one supervisor,
the child specification defined in Module:init/1 will
be used and ChildSpec should instead be an arbitrary
list of terms List. The child process will then be
started by appending List to the existing start
function arguments, i.e. by calling
apply(M, F, A++List) where {M,F,A} is the start
function defined in the child specification.
start() ->
{ok,_} = supervisor:start_child(
rabbit_sup,
{rabbit_tcp_client_sup, {tcp_client_sup, start_link, [{local, rabbit_tcp_client_sup}, {rabbit_reader,start_link,[]}]},
transient, infinity, supervisor, [tcp_client_sup]}),
ok
start_listener(Host, Port, Label, OnConnect) ->
{IPAddress, Name} =
check_tcp_listener_address(rabbit_tcp_listener_sup, Host, Port),
{ok,_} = supervisor:start_child(
rabbit_sup,
{Name, {tcp_listener_sup, start_link,
[IPAddress, Port, ?RABBIT_TCP_OPTS ,{?MODULE,tcp_listener_started, []},{?MODULE, tcp_listener_stopped, []}, OnConnect, Label]},
transient, infinity, supervisor, [tcp_listener_sup]}),
ok.
start_client(Sock) ->
{ok, Child} = supervisor:start_child(rabbit_tcp_client_sup, []),
ok = rabbit_net:controlling_process(Sock, Child),
Child ! {go, Sock},
Child.
tcp_listener.erl
init({IPAddress, Port, SocketOpts,
ConcurrentAcceptorCount, AcceptorSup,
{M,F,A} = OnStartup, OnShutdown, Label}) ->
process_flag(trap_exit, true),
case gen_tcp:listen(Port, SocketOpts ++ [{ip, IPAddress},
{active, false}]) of
{ok, LSock} ->
lists:foreach(fun (_) ->
{ok, _APid} = supervisor:start_child(
AcceptorSup, [LSock])
end,
lists:duplicate(ConcurrentAcceptorCount, dummy)),
{ok, {LIPAddress, LPort}} = inet:sockname(LSock),
error_logger:info_msg("started ~s on ~s:~p~n",
[Label, inet_parse:ntoa(LIPAddress), LPort]),
apply(M, F, A ++ [IPAddress, Port]),
{ok, #state{sock = LSock,
on_startup = OnStartup, on_shutdown = OnShutdown,
label = Label}};
{error, Reason} ->
error_logger:error_msg(
"failed to start ~s on ~s:~p - ~p~n",
[Label, inet_parse:ntoa(IPAddress), Port, Reason]),
{stop, {cannot_listen, IPAddress, Port, Reason}}
end.
rabbit_reader.erl
init(Parent) ->
Deb = sys:debug_options([]),
receive
{go, Sock} -> start_connection(Parent, Deb, Sock)
end.
start_connection(Parent, Deb, ClientSock) ->
process_flag(trap_exit, true),
{PeerAddressS, PeerPort} = peername(ClientSock),
ProfilingValue = setup_profiling(),
try
rabbit_log:info("starting TCP connection ~p from ~s:~p~n",
[self(), PeerAddressS, PeerPort]),
erlang:send_after(?HANDSHAKE_TIMEOUT * 1000, self(),
handshake_timeout),
mainloop(Parent, Deb, switch_callback(
#v1{sock = ClientSock,
connection = #connection{
user = none,
timeout_sec = ?HANDSHAKE_TIMEOUT,
frame_max = ?FRAME_MIN_SIZE,
vhost = none},
callback = uninitialized_callback,
recv_ref = none,
connection_state = pre_init},
handshake, 8))
catch
Ex -> (if Ex == connection_closed_abruptly ->
fun rabbit_log:warning/2;
true ->
fun rabbit_log:error/2
end)("exception on TCP connection ~p from ~s:~p~n~p~n",
[self(), PeerAddressS, PeerPort, Ex])
after
rabbit_log:info("closing TCP connection ~p from ~s:~p~n",
[self(), PeerAddressS, PeerPort]),
%% We don't close the socket explicitly. The reader is the
%% controlling process and hence its termination will close
%% the socket. Furthermore, gen_tcp:close/1 waits for pending
%% output to be sent, which results in unnecessary delays.
%%
%% gen_tcp:close(ClientSock),
teardown_profiling(ProfilingValue)
end,
done.
阅读(926) | 评论(0) | 转发(0) |