启动时首先调用 `ok = rabbit_exchange:recover()`，其实现如下：
%% Restore durable exchanges and bindings from the persistent Mnesia
%% tables into the ram-based runtime tables after a node restart.
recover() ->
    %% Copy every durable exchange into the runtime exchange table.
    ok = rabbit_misc:table_foreach(
           fun (X) ->
                   ok = mnesia:write(rabbit_exchange, X, write)
           end, rabbit_durable_exchange),
    %% Re-create each durable binding in both the forward and the
    %% reverse routing tables.
    ok = rabbit_misc:table_foreach(
           fun (R) ->
                   {_Route, Reverse} = route_with_reverse(R),
                   ok = mnesia:write(rabbit_route, R, write),
                   ok = mnesia:write(rabbit_reverse_route, Reverse, write)
           end, rabbit_durable_route).
这几个 recover 主要是把持久数据库表里的数据恢复到临时（内存）表里。
持久化部分：
rabbit_persister.erl
%% Boot-step entry (part of a larger startup list defined outside this
%% excerpt): starts the persister as a supervised child during broker
%% boot, asserting that the start succeeds.
{"persister",
fun () ->
ok = start_child(rabbit_persister)
end},
%% gen_server init callback for the persister: opens (repairing if
%% necessary) the disk_log journal, loads the last snapshot, replays
%% the journal on top of it, and writes a fresh snapshot before
%% entering the server loop.
init(_Args) ->
%% Trap exits so terminate/2 runs and the log can be closed cleanly.
process_flag(trap_exit, true),
%% Make sure the directory for the log file exists before opening it.
FileName = base_filename(),
ok = filelib:ensure_dir(FileName),
%% Start from an empty in-memory snapshot: serial 0, no pending
%% transactions, fresh ets tables for message bodies and queue entries.
Snapshot = #psnapshot{serial = 0,
transactions = dict:new(),
messages = ets:new(messages, []),
queues = ets:new(queues, [])},
LogHandle =
%% The {head, ...} option makes disk_log write the current (empty)
%% snapshot as the first term whenever a new log file is created,
%% so a chunk read always starts with a snapshot term.
case disk_log:open([{name, rabbit_persister},
{head, current_snapshot(Snapshot)},
{file, FileName}]) of
{ok, LH} -> LH;
%% disk_log repaired a damaged log; warn only if bytes were
%% actually lost, otherwise just log at info level.
{repaired, LH, {recovered, Recovered}, {badbytes, Bad}} ->
WarningFun = if
Bad > 0 -> fun rabbit_log:warning/2;
true -> fun rabbit_log:info/2
end,
WarningFun("Repaired persister log - ~p recovered, ~p bad~n",
[Recovered, Bad]),
LH
end,
%% Load snapshot + journal from disk, then bump the serial so the
%% next snapshot we write is distinguishable from the one loaded.
{Res, LoadedSnapshot} = internal_load_snapshot(LogHandle, Snapshot),
NewSnapshot = LoadedSnapshot#psnapshot{
serial = LoadedSnapshot#psnapshot.serial + 1},
case Res of
ok ->
%% Compact the log by writing the replayed state as the new head.
ok = take_snapshot(LogHandle, NewSnapshot);
{error, Reason} ->
rabbit_log:error("Failed to load persister log: ~p~n", [Reason]),
%% NOTE(review): presumably preserves the unreadable log for
%% post-mortem before starting afresh — take_snapshot_and_save_old
%% is defined elsewhere; confirm against the full module.
ok = take_snapshot_and_save_old(LogHandle, NewSnapshot)
end,
State = #pstate{log_handle = LogHandle,
entry_count = 0,
deadline = infinity,
pending_logs = [],
pending_replies = [],
snapshot = NewSnapshot},
{ok, State}.
%% Serialise the in-memory snapshot into the term written as the
%% disk_log head: {persist_snapshot, {vsn, V}, Binary}.
current_snapshot(#psnapshot{serial       = Serial,
                            transactions = Ts,
                            messages     = Messages,
                            queues       = Queues}) ->
    %% Avoid infinite growth of the table by removing messages not
    %% bound to a queue anymore: collect every PKey still referenced
    %% by a queue entry, then drop all other messages.
    LiveKeys = ets:foldl(fun ({{_QName, PKey}, _Delivered}, Acc) ->
                                 sets:add_element(PKey, Acc)
                         end, sets:new(), Queues),
    prune_table(Messages, LiveKeys),
    Inner = {{serial, Serial},
             {txns, Ts},
             {messages, ets:tab2list(Messages)},
             {queues, ets:tab2list(Queues)}},
    ?LOGDEBUG("Inner snapshot: ~p~n", [Inner]),
    {persist_snapshot, {vsn, ?PERSISTER_LOG_FORMAT_VERSION},
     term_to_binary(Inner)}.
%% Delete from Tab every entry whose key is not in the set KeysToKeep.
%% safe_fixtable/2 lets us walk the table with first/next while
%% deleting from it — see the ets documentation on safe traversal.
prune_table(Tab, KeysToKeep) ->
    true = ets:safe_fixtable(Tab, true),
    ok = prune_table(Tab, KeysToKeep, ets:first(Tab)),
    true = ets:safe_fixtable(Tab, false).

%% Walk one key at a time; '$end_of_table' terminates the traversal.
prune_table(_Tab, _KeysToKeep, '$end_of_table') ->
    ok;
prune_table(Tab, KeysToKeep, Key) ->
    case sets:is_element(Key, KeysToKeep) of
        false -> ets:delete(Tab, Key);
        true  -> ok
    end,
    prune_table(Tab, KeysToKeep, ets:next(Tab, Key)).
%% Read the snapshot term and subsequent journal entries from the
%% persister log, rebuild the ets tables, and replay the journal.
%% Returns {ok, Snapshot'} or {{error, Reason}, Snapshot}.
internal_load_snapshot(LogHandle,
Snapshot = #psnapshot{messages = Messages,
queues = Queues}) ->
%% The first chunk always begins with the snapshot because the log
%% was opened with a {head, ...} option, so even a brand-new log
%% matches [Loaded_Snapshot | Items].
{K, [Loaded_Snapshot | Items]} = disk_log:chunk(LogHandle, start),
case check_version(Loaded_Snapshot) of
{ok, StateBin} ->
{{serial, Serial}, {txns, Ts}, {messages, Ms}, {queues, Qs}} =
binary_to_term(StateBin),
%% Seed the ets tables from the snapshot contents ...
true = ets:insert(Messages, Ms),
true = ets:insert(Queues, Qs),
%% ... then apply the journal entries logged after the snapshot.
Snapshot1 = replay(Items, LogHandle, K,
Snapshot#psnapshot{
serial = Serial,
transactions = Ts}),
%% requeue_messages is defined elsewhere in the module;
%% presumably it pushes recovered messages back to their queues.
Snapshot2 = requeue_messages(Snapshot1),
%% uncompleted transactions are discarded - this is TRTTD
%% since we only get into this code on node restart, so
%% any uncompleted transactions will have been aborted.
{ok, Snapshot2#psnapshot{transactions = dict:new()}};
{error, Reason} -> {{error, Reason}, Snapshot}
end.
%% Apply journal entries to the snapshot, fetching further chunks from
%% the disk_log (via continuation Cont) until eof.
replay([], LogHandle, Cont, Snapshot) ->
    case disk_log:chunk(LogHandle, Cont) of
        eof ->
            Snapshot;
        {Cont1, Terms} ->
            replay(Terms, LogHandle, Cont1, Snapshot);
        {Cont1, Terms, BadBytes} ->
            %% Corrupt bytes were skipped by disk_log; note it and
            %% keep replaying what was readable.
            rabbit_log:warning("~p bad bytes recovering persister log~n",
                               [BadBytes]),
            replay(Terms, LogHandle, Cont1, Snapshot)
    end;
replay([Entry | Rest], LogHandle, Cont, Snapshot) ->
    replay(Rest, LogHandle, Cont,
           internal_integrate_messages(Entry, Snapshot)).
%% Fold every journal entry into the snapshot, in order.
%% internal_integrate1/2 already has the (Elem, Acc) shape foldl wants.
internal_integrate_messages(Items, Snapshot) ->
    lists:foldl(fun internal_integrate1/2, Snapshot, Items).
%% Apply one journal entry to the snapshot. Transactions accumulate
%% their work in a dict until commit/rollback; dirty work is applied
%% immediately.
%%
%% extend_transaction: prepend this batch to the transaction's list of
%% pending message-lists (so they end up newest-first; commit reverses).
internal_integrate1({extend_transaction, Key, MessageList},
Snapshot = #psnapshot {transactions = Transactions}) ->
NewTransactions =
dict:update(Key,
fun (MessageLists) -> [MessageList | MessageLists] end,
[MessageList],
Transactions),
Snapshot#psnapshot{transactions = NewTransactions};
%% rollback_transaction: simply forget everything accumulated for Key.
internal_integrate1({rollback_transaction, Key},
Snapshot = #psnapshot{transactions = Transactions}) ->
Snapshot#psnapshot{transactions = dict:erase(Key, Transactions)};
%% commit_transaction: apply the accumulated work-lists in the order
%% they were logged (hence lists:reverse), then drop the transaction.
%% An unknown Key is a no-op — e.g. a commit whose extends were all in
%% a previous snapshot.
internal_integrate1({commit_transaction, Key},
Snapshot = #psnapshot{transactions = Transactions,
messages = Messages,
queues = Queues}) ->
case dict:find(Key, Transactions) of
{ok, MessageLists} ->
?LOGDEBUG("persist committing txn ~p~n", [Key]),
lists:foreach(fun (ML) -> perform_work(ML, Messages, Queues) end,
lists:reverse(MessageLists)),
Snapshot#psnapshot{transactions = dict:erase(Key, Transactions)};
error ->
Snapshot
end;
%% dirty_work: non-transactional work, applied to the tables at once.
internal_integrate1({dirty_work, MessageList},
Snapshot = #psnapshot {messages = Messages,
queues = Queues}) ->
perform_work(MessageList, Messages, Queues),
Snapshot.
%% Apply every work item in the list against the messages/queues tables.
perform_work(WorkItems, Messages, Queues) ->
    Apply = fun (WorkItem) ->
                    perform_work_item(WorkItem, Messages, Queues)
            end,
    lists:foreach(Apply, WorkItems).
%% Apply a single unit of persister work to the ets tables.
%% Queue entries are {{QName, PKey}, DeliveredFlag}.
perform_work_item({publish, Message, QueueKey = {_QName, PKey}},
                  Messages, Queues) ->
    %% Store the message body and an undelivered queue entry for it.
    ets:insert(Messages, {PKey, Message}),
    ets:insert(Queues, {QueueKey, false});
perform_work_item({tied, QueueKey}, _Messages, Queues) ->
    %% Bind an already-stored message to (another) queue, undelivered.
    ets:insert(Queues, {QueueKey, false});
perform_work_item({deliver, QueueKey}, _Messages, Queues) ->
    %% Mark the entry delivered. Done as delete+insert because
    %% ets:update_element/3 is only available from R12B-2 onward.
    ets:delete(Queues, QueueKey),
    ets:insert(Queues, {QueueKey, true});
perform_work_item({ack, QueueKey}, _Messages, Queues) ->
    %% Acknowledged: the queue entry is gone for good.
    ets:delete(Queues, QueueKey).
%% Tail-recursive left fold, equivalent to lists:foldl/3. The
%% is_function/2 guard lives on the base clause only, matching the
%% stdlib implementation: a bad Fun fails fast on the first element
%% (badfun) and still raises function_clause on an empty list.
-spec foldl(fun((T, _) -> _), _, [T]) -> _.
foldl(Fun, Acc, [Head | Rest]) ->
    foldl(Fun, Fun(Head, Acc), Rest);
foldl(Fun, Acc, []) when is_function(Fun, 2) ->
    Acc.
阅读(638) | 评论(0) | 转发(0) |