Chinaunix首页 | 论坛 | 博客
  • 博客访问: 149856
  • 博文数量: 92
  • 博客积分: 2035
  • 博客等级: 大尉
  • 技术积分: 874
  • 用 户 组: 普通用户
  • 注册时间: 2009-09-10 09:03
文章分类

全部博文(92)

文章存档

2010年(38)

2009年(54)

我的朋友

分类:

2010-01-24 23:39:45

开始  ok = rabbit_exchange:recover(),

%% Walks the rabbit_durable_exchange and rabbit_durable_route tables and
%% re-writes their contents into the working tables: each exchange goes
%% into rabbit_exchange, and each route goes into rabbit_route together
%% with its reverse form (from route_with_reverse/1) into
%% rabbit_reverse_route. Every step is asserted to return ok.
recover() ->
    RestoreExchange =
        fun (Exchange) ->
                ok = mnesia:write(rabbit_exchange, Exchange, write)
        end,
    RestoreRoute =
        fun (Route) ->
                {_, Reversed} = route_with_reverse(Route),
                ok = mnesia:write(rabbit_route, Route, write),
                ok = mnesia:write(rabbit_reverse_route, Reversed, write)
        end,
    ok = rabbit_misc:table_foreach(RestoreExchange, rabbit_durable_exchange),
    ok = rabbit_misc:table_foreach(RestoreRoute, rabbit_durable_route).

几个recover主要是把持久数据库表里的数据恢复到临时表里


持久!
rabbit_persister.erl

       {"persister",
        fun () ->
                ok = start_child(rabbit_persister)
        end},

%% Initialises the persister process.
%%
%% Steps, in order:
%%   1. trap exits and make sure the directory for the log file exists;
%%   2. build an empty snapshot (serial 0, no transactions, two fresh ets
%%      tables) and open the disk_log, using that snapshot as log head;
%%   3. if disk_log had to repair the file, report it (warning when bad
%%      bytes were found, info otherwise);
%%   4. load the log into the snapshot and bump the serial by one;
%%   5. write a new snapshot; when loading failed, the error is logged
%%      and take_snapshot_and_save_old/2 is used instead.
%%
%% Returns {ok, #pstate{}} with no pending work and no deadline.
init(_Args) ->
    process_flag(trap_exit, true),
    LogFile = base_filename(),
    ok = filelib:ensure_dir(LogFile),
    Empty = #psnapshot{serial       = 0,
                       transactions = dict:new(),
                       messages     = ets:new(messages, []),
                       queues       = ets:new(queues, [])},
    OpenResult = disk_log:open([{name, rabbit_persister},
                                {head, current_snapshot(Empty)},
                                {file, LogFile}]),
    Log = case OpenResult of
              {ok, Opened} ->
                  Opened;
              {repaired, Repaired, {recovered, Recovered}, {badbytes, Bad}} ->
                  %% Only escalate to a warning when data was actually lost.
                  Report = case Bad > 0 of
                               true  -> fun rabbit_log:warning/2;
                               false -> fun rabbit_log:info/2
                           end,
                  Report("Repaired persister log - ~p recovered, ~p bad~n",
                         [Recovered, Bad]),
                  Repaired
          end,
    {LoadResult, Loaded} = internal_load_snapshot(Log, Empty),
    Current = Loaded#psnapshot{serial = Loaded#psnapshot.serial + 1},
    ok = case LoadResult of
             ok ->
                 take_snapshot(Log, Current);
             {error, Reason} ->
                 rabbit_log:error("Failed to load persister log: ~p~n", [Reason]),
                 take_snapshot_and_save_old(Log, Current)
         end,
    {ok, #pstate{log_handle      = Log,
                 entry_count     = 0,
                 deadline        = infinity,
                 pending_logs    = [],
                 pending_replies = [],
                 snapshot        = Current}}.




%% Builds the term that is written to the log for a snapshot.
%%
%% Before serialising, the messages table is pruned so that it only keeps
%% entries whose key appears as the PKey part of some key in the queues
%% table; this stops the messages table from growing without bound.
%%
%% Returns {persist_snapshot, {vsn, Version}, Binary}, where Binary is
%% term_to_binary/1 of the serial, transactions and both table contents.
current_snapshot(#psnapshot{serial       = Serial,
                            transactions = Txns,
                            messages     = MsgTab,
                            queues       = QueueTab}) ->
    CollectKey = fun ({{_QName, PKey}, _Delivered}, Acc) ->
                         sets:add_element(PKey, Acc)
                 end,
    LiveKeys = ets:foldl(CollectKey, sets:new(), QueueTab),
    prune_table(MsgTab, LiveKeys),
    MsgList = ets:tab2list(MsgTab),
    QueueList = ets:tab2list(QueueTab),
    Inner = {{serial, Serial},
             {txns, Txns},
             {messages, MsgList},
             {queues, QueueList}},
    ?LOGDEBUG("Inner snapshot: ~p~n", [Inner]),
    {persist_snapshot,
     {vsn, ?PERSISTER_LOG_FORMAT_VERSION},
     term_to_binary(Inner)}.

%% Deletes from Tab every entry whose key is not an element of the set
%% Keys. The table is fixed with ets:safe_fixtable/2 for the whole walk
%% because prune_table/3 deletes entries while stepping through the table
%% with ets:next/2; the ets documentation for safe_fixtable/2 explains why
%% this is needed. Returns true (the result of releasing the fix).
prune_table(Tab, Keys) ->
    true = ets:safe_fixtable(Tab, true),
    FirstKey = ets:first(Tab),
    ok = prune_table(Tab, Keys, FirstKey),
    true = ets:safe_fixtable(Tab, false).
   
%% Walks Tab starting at Key, deleting each visited key that is not an
%% element of the set Keys, and returns ok once '$end_of_table' is
%% reached. The deletion happens before the next key is fetched.
prune_table(_Tab, _Keys, '$end_of_table') ->
    ok;
prune_table(Tab, Keys, Key) ->
    Stale = not sets:is_element(Key, Keys),
    Stale andalso ets:delete(Tab, Key),
    NextKey = ets:next(Tab, Key),
    prune_table(Tab, Keys, NextKey).





%% Reads the log from its start and rebuilds the snapshot from it.
%%
%% The first term of the first chunk is taken to be the stored snapshot
%% and is passed through check_version/1:
%%   - on {error, Reason} nothing is loaded and {{error, Reason}, Snapshot}
%%     is returned with the snapshot that was passed in;
%%   - on {ok, StateBin} the stored messages and queues are inserted into
%%     the snapshot's ets tables, the remaining log terms are replayed on
%%     top, messages are requeued, and {ok, NewSnapshot} is returned.
%%
%% Transactions still open after the replay are dropped: this code only
%% runs on node restart, so any uncompleted transaction has been aborted.
internal_load_snapshot(LogHandle,
                       Snapshot = #psnapshot{messages = Messages,
                                             queues = Queues}) ->
    {Cont, [Head | Rest]} = disk_log:chunk(LogHandle, start),
    case check_version(Head) of
        {error, Reason} ->
            {{error, Reason}, Snapshot};
        {ok, StateBin} ->
            {{serial, Serial}, {txns, Txns}, {messages, Msgs}, {queues, Qs}} =
                binary_to_term(StateBin),
            true = ets:insert(Messages, Msgs),
            true = ets:insert(Queues, Qs),
            Seeded = Snapshot#psnapshot{serial = Serial,
                                        transactions = Txns},
            Replayed = replay(Rest, LogHandle, Cont, Seeded),
            Requeued = requeue_messages(Replayed),
            {ok, Requeued#psnapshot{transactions = dict:new()}}
    end.



%% Applies logged work to Snapshot until the end of the log is reached.
%%
%% Arguments:
%%   Items     - log terms already read but not yet applied; each one is
%%               handed to internal_integrate_messages/2
%%   LogHandle - the disk_log to keep reading from
%%   K         - disk_log continuation for the next chunk
%%   Snapshot  - the snapshot accumulated so far
%%
%% Returns the final snapshot when disk_log:chunk/2 reports eof. Bad bytes
%% reported by disk_log are logged as a warning and reading continues.
%% Raises error({persister_log_chunk_failed, Reason}) if a chunk cannot
%% be read.
replay([], LogHandle, K, Snapshot) ->
    case disk_log:chunk(LogHandle, K) of
        eof ->
            Snapshot;
        {error, Reason} ->
            %% This clause must come before {K1, Items}: that pattern also
            %% matches the 2-tuple {error, Reason}, which would send Reason
            %% into the recursion as if it were a list of log terms.
            erlang:error({persister_log_chunk_failed, Reason});
        {K1, Items} ->
            replay(Items, LogHandle, K1, Snapshot);
        {K1, Items, Badbytes} ->
            rabbit_log:warning("~p bad bytes recovering persister log~n",
                               [Badbytes]),
            replay(Items, LogHandle, K1, Snapshot)
    end;
replay([Item | Items], LogHandle, K, Snapshot) ->
    NewSnapshot = internal_integrate_messages(Item, Snapshot),
    replay(Items, LogHandle, K, NewSnapshot).

%% Folds a list of work items into Snapshot, left to right, applying
%% internal_integrate1/2 to each item and threading the snapshot through.
internal_integrate_messages(Items, Snapshot) ->
    lists:foldl(fun internal_integrate1/2, Snapshot, Items).

%% Applies one logged work item to Snapshot and returns the new snapshot.
%%
%%   {extend_transaction, Key, MessageList}
%%       prepends MessageList to the lists stored for transaction Key,
%%       creating the entry if it does not exist yet;
%%   {rollback_transaction, Key}
%%       forgets everything stored for transaction Key;
%%   {commit_transaction, Key}
%%       performs the stored lists for Key in the order they were added
%%       (they are stored newest first, hence the reverse) and then
%%       forgets the transaction; an unknown Key leaves Snapshot as is;
%%   {dirty_work, MessageList}
%%       performs MessageList straight away, outside any transaction.
internal_integrate1({extend_transaction, Key, MessageList},
                    Snapshot = #psnapshot{transactions = Txns}) ->
    Prepend = fun (Earlier) -> [MessageList | Earlier] end,
    Snapshot#psnapshot{
      transactions = dict:update(Key, Prepend, [MessageList], Txns)};
internal_integrate1({rollback_transaction, Key},
                    Snapshot = #psnapshot{transactions = Txns}) ->
    Snapshot#psnapshot{transactions = dict:erase(Key, Txns)};
internal_integrate1({commit_transaction, Key},
                    Snapshot = #psnapshot{transactions = Txns,
                                          messages = Messages,
                                          queues = Queues}) ->
    case dict:find(Key, Txns) of
        error ->
            Snapshot;
        {ok, Pending} ->
            ?LOGDEBUG("persist committing txn ~p~n", [Key]),
            Apply = fun (Batch) -> perform_work(Batch, Messages, Queues) end,
            lists:foreach(Apply, lists:reverse(Pending)),
            Snapshot#psnapshot{transactions = dict:erase(Key, Txns)}
    end;
internal_integrate1({dirty_work, MessageList},
                    Snapshot = #psnapshot{messages = Messages,
                                          queues = Queues}) ->
    perform_work(MessageList, Messages, Queues),
    Snapshot.

%% Runs perform_work_item/3 on every element of MessageList, in order,
%% against the given Messages and Queues tables. Returns ok.
perform_work(MessageList, Messages, Queues) ->
    Apply = fun (WorkItem) ->
                    perform_work_item(WorkItem, Messages, Queues)
            end,
    lists:foreach(Apply, MessageList).

%% Applies a single work item to the two ets tables. The second element
%% of each queues-table entry is false after publish/tied and true after
%% deliver.
%%
%%   {publish, Message, {QName, PKey}}
%%       stores Message under PKey in the messages table and records the
%%       queue key with flag false in the queues table;
%%   {tied, QKey}     records QKey with flag false;
%%   {deliver, QKey}  replaces the entry for QKey with flag true;
%%   {ack, QKey}      removes the entry for QKey.
%%
%% Every clause returns the result of its last ets call.
perform_work_item({publish, Message, {_QName, PKey} = QKey}, MsgTab, QueueTab) ->
    ets:insert(MsgTab, {PKey, Message}),
    ets:insert(QueueTab, {QKey, false});

perform_work_item({tied, QKey}, _MsgTab, QueueTab) ->
    ets:insert(QueueTab, {QKey, false});

perform_work_item({deliver, QKey}, _MsgTab, QueueTab) ->
    %% Delete-then-insert is kept as is; from R12B-2 onward
    %% ets:update_element/3 could be used here instead.
    ets:delete(QueueTab, QKey),
    ets:insert(QueueTab, {QKey, true});

perform_work_item({ack, QKey}, _MsgTab, QueueTab) ->
    ets:delete(QueueTab, QKey).






%% Left fold: calls F(Elem, Acc) for each element of the list from first
%% to last, threading the accumulator through, and returns the final
%% accumulator (Accu itself for an empty list).
%%
%% F is checked to be a fun of arity 2 once, on entry, so a bad F fails
%% with function_clause whatever the list length, instead of failing
%% part-way through with badfun/badarity on a non-empty list.
-spec foldl(fun((T, _) -> _), _, [T]) -> _.

foldl(F, Accu, List) when is_function(F, 2) ->
    foldl_1(F, Accu, List).

%% Worker loop for foldl/3; F has already been validated by the caller.
foldl_1(F, Accu, [Hd | Tail]) ->
    foldl_1(F, F(Hd, Accu), Tail);
foldl_1(_F, Accu, []) ->
    Accu.

阅读(638) | 评论(0) | 转发(0) |
给主人留下些什么吧!~~