rabbit.erl: 部分数据库初始化操作:
{"recovery",
fun () ->
ok = maybe_insert_default_data(),
maybe_insert_default_data() ->
case rabbit_mnesia:is_db_empty() of
true -> insert_default_data();
false -> ok
end.
insert_default_data() ->
{ok, DefaultUser} = application:get_env(default_user),
{ok, DefaultPass} = application:get_env(default_pass),
{ok, DefaultVHost} = application:get_env(default_vhost),
{ok, [DefaultConfigurePerm, DefaultWritePerm, DefaultReadPerm]} =
application:get_env(default_permissions),
ok = rabbit_access_control:
add_vhost(DefaultVHost),
ok = rabbit_access_control:add_user(DefaultUser, DefaultPass),
ok = rabbit_access_control:
set_permissions(DefaultUser, DefaultVHost,
DefaultConfigurePerm,
DefaultWritePerm,
DefaultReadPerm),
ok.
rabbit_access_control.erl :
add_vhost(VHostPath) ->
R = rabbit_misc:execute_mnesia_transaction(
fun () ->
case mnesia:wread({rabbit_vhost, VHostPath}) of
[] ->
ok = mnesia:write(rabbit_vhost,
#vhost{virtual_host = VHostPath},
write),
[
rabbit_exchange:declare(
rabbit_misc:r(VHostPath, exchange, Name),
Type, true, false, []) ||
{Name,Type} <-
[{<<"">>, direct},
{<<"amq.direct">>, direct},
{<<"amq.topic">>, topic},
{<<"amq.match">>, headers}, %% per 0-9-1 pdf
{<<"amq.headers">>, headers}, %% per 0-9-1 xml
{<<"amq.fanout">>, fanout}]],
ok;
[_] ->
mnesia:abort({vhost_already_exists, VHostPath})
end
end),
rabbit_log:info("Added vhost ~p~n", [VHostPath]),
R.
-type(vhost() :: binary()).
-record(vhost, {virtual_host, dummy}).
rabbit_misc.erl:
r(#resource{virtual_host = VHostPath}, Kind, Name)
when is_binary(Name) ->
#resource{virtual_host = VHostPath, kind = Kind, name = Name};
r(VHostPath, Kind, Name) when is_binary(Name) andalso is_binary(VHostPath) ->
#resource{virtual_host = VHostPath, kind = Kind, name = Name}.
r(VHostPath, Kind) when is_binary(VHostPath) ->
#resource{virtual_host = VHostPath, kind = Kind, name = '_'}.
rabbit_exchage.erl
declare(ExchangeName, Type, Durable, AutoDelete, Args) ->
Exchange = #exchange{name = ExchangeName,
type = Type,
durable = Durable,
auto_delete = AutoDelete,
arguments = Args},
rabbit_misc:execute_mnesia_transaction(
fun () ->
case mnesia:wread({rabbit_exchange, ExchangeName}) of
[] -> ok = mnesia:write(rabbit_exchange, Exchange, write),
if Durable ->
ok = mnesia:write(rabbit_durable_exchange,
Exchange, write);
true -> ok
end,
Exchange;
[ExistingX] -> ExistingX
end
end).
validate_regexp(RegexpBin) ->
Regexp = binary_to_list(RegexpBin),
case regexp:parse(Regexp) of
{ok, _} -> ok;
{error, Reason} -> throw({error, {invalid_regexp, Regexp, Reason}})
end.
set_permissions(Username, VHostPath, ConfigurePerm, WritePerm, ReadPerm) ->
lists:map(fun validate_regexp/1, [ConfigurePerm, WritePerm, ReadPerm]),
rabbit_misc:execute_mnesia_transaction(
rabbit_misc:
with_user_and_vhost(
Username, VHostPath,
fun () -> ok = mnesia:write(
rabbit_user_permission,
#user_permission{user_vhost = #user_vhost{
username = Username,
virtual_host = VHostPath},
permission = #permission{
configure = ConfigurePerm,
write = WritePerm,
read = ReadPerm}},
write)
end)).
with_user(Username, Thunk) ->
fun () ->
case mnesia:read({rabbit_user, Username}) of
[] ->
mnesia:abort({no_such_user, Username});
[_U] ->
Thunk()
end
end.
with_vhost(VHostPath, Thunk) ->
fun () ->
case mnesia:read({rabbit_vhost, VHostPath}) of
[] ->
mnesia:abort({no_such_vhost, VHostPath});
[_V] ->
Thunk()
end
end.
with_user_and_vhost(Username, VHostPath, Thunk) ->
with_user(Username, with_vhost(VHostPath, Thunk)).
-type(r(Kind) ::
#resource{virtual_host :: vhost(),
kind :: Kind,
name :: resource_name()}).
-type(queue_name() :: r('queue')).
-type(exchange_name() :: r('exchange')).
-type(user() ::
#user{username :: username(),
password :: password()}).
-type(permission() ::
#permission{configure :: regexp(),
write :: regexp(),
read :: regexp()}).
-type(amqqueue() ::
#amqqueue{name :: queue_name(),
durable :: boolean(),
auto_delete :: boolean(),
arguments :: amqp_table(),
pid :: maybe(pid())}).
-type(exchange() ::
#exchange{name :: exchange_name(),
type :: exchange_type(),
durable :: boolean(),
auto_delete :: boolean(),
arguments :: amqp_table()}).
-type(binding() ::
#binding{exchange_name :: exchange_name(),
queue_name :: queue_name(),
key :: binding_key()}).