diff options
author | Brad Anderson <brad@cloudant.com> | 2010-05-26 18:36:44 -0400 |
---|---|---|
committer | Brad Anderson <brad@cloudant.com> | 2010-05-26 18:36:44 -0400 |
commit | 47bd7a3293bb41c9d040e7fd35dcfe9faf16a992 (patch) | |
tree | 4c92f913338b59500642647f6f2f66b762c229da | |
parent | 7801b1fa879a752d11d538f35ee4905609779801 (diff) |
rework create_db with new idioms for fabric / rexi workflow
-rw-r--r-- | ebin/fabric.app | 3 | ||||
-rw-r--r-- | src/fabric_create.erl | 98 | ||||
-rw-r--r-- | src/fabric_rpc.erl | 49 | ||||
-rw-r--r-- | src/fabric_util.erl | 49 |
4 files changed, 96 insertions, 103 deletions
diff --git a/ebin/fabric.app b/ebin/fabric.app index 73943af5..ba3d3e03 100644 --- a/ebin/fabric.app +++ b/ebin/fabric.app @@ -9,7 +9,8 @@ fabric_create, fabric_delete, fabric_info, - fabric_open + fabric_open, + fabric_util ]}, {registered, []}, {included_applications, []}, diff --git a/src/fabric_create.erl b/src/fabric_create.erl index 1143d080..c2c01ccd 100644 --- a/src/fabric_create.erl +++ b/src/fabric_create.erl @@ -19,13 +19,14 @@ create_db(DbName, Options) -> Fullmap = partitions:fullmap(DbName, Options), {ok, FullNodes} = mem3:fullnodes(), RefPartMap = send_create_calls(DbName, Options, Fullmap), - {ok, Results} = fabric_rpc:receive_loop(RefPartMap, 5000, - fun create_db_loop/3), - case create_results(Results, RefPartMap) of - ok -> + Acc0 = {false, length(RefPartMap), + lists:usort([ {Beg, false} || {_,#part{b=Beg}} <- RefPartMap])}, + case fabric_util:receive_loop( + RefPartMap, 1, fun handle_create_msg/3, Acc0, 5000, infinity) of + {ok, _Results} -> partitions:install_fullmap(DbName, Fullmap, FullNodes, Options), - {ok, #db{name=DbName}}; - Other -> {error, Other} + ok; + Error -> Error end. @@ -43,68 +44,29 @@ send_create_calls(DbName, Options, Fullmap) -> {Ref, Part} end, Fullmap). -%% @doc create_db receive loop -%% Acc is either an accumulation of responses, or if we've received all -%% responses, it's {ok, Responses} --spec create_db_loop([ref_part_map()], tref(), beg_acc()) -> - beg_acc() | {ok, beg_acc()}. -create_db_loop(_,_,{ok, Acc}) -> {ok, Acc}; -create_db_loop(RefPartMap, TimeoutRef, AccIn) -> - receive - {Ref, {ok, MainPid}} when is_reference(Ref) -> - % for dev only, close the Fd TODO: remove me - gen_server:call({couch_server, node(MainPid)}, {force_close, MainPid}), - - AccOut = check_all_parts(Ref, RefPartMap, AccIn, ok), - create_db_loop(RefPartMap, TimeoutRef, AccOut); - {Ref, Reply} when is_reference(Ref) -> - AccOut = check_all_parts(Ref, RefPartMap, AccIn, Reply), - create_db_loop(RefPartMap, TimeoutRef, AccOut); - {timeout, TimeoutRef} -> - {error, timeout} - end. - -%% @doc check the results (beginning of each partition range) of the create -%% replies. If we have a good reply from each partition, return ok --spec create_results(beg_acc(), [ref_part_map()]) -> ok | create_quorum_error. -create_results(Results, RefPartMap) -> - ResultBegParts = create_result(Results, []), - DistinctBegParts = distinct_parts(RefPartMap), +handle_create_msg(_, file_exists, _) -> + {error, file_exists}; +handle_create_msg(_, {rexi_EXIT, _Reason}, {Complete, N, Parts}) -> + {ok, {Complete, N-1, Parts}}; +handle_create_msg(_, {rexi_DOWN, _, _, _}, {Complete, _N, _Parts}) -> if - ResultBegParts =:= DistinctBegParts -> ok; - true -> - ?debugFmt("~nResultBegParts: ~p~nDistinctBegParts: ~p~n", - [ResultBegParts, DistinctBegParts]), - create_quorum_error - end. - --spec create_result(beg_acc(), [part()]) -> [part()] | file_exists. -create_result([], Acc) -> - lists:usort(Acc); -create_result([{#part{b=Beg}, ok}|Rest], Acc) -> - create_result(Rest, [Beg|Acc]); -create_result([{_, {error, file_exists}}|_Rest], _Acc) -> - {error, file_exists}; % if any replies were file_exists, return that -create_result([{_, Result}|Rest], Acc) -> - showroom_log:message(error, "create_db error: ~p", [Result]), - create_result(Rest, Acc). + Complete -> {stop, ok}; + true -> {error, create_db_fubar} + end; +handle_create_msg(_, _, {true, 1, _Acc}) -> + {stop, ok}; +handle_create_msg({_, #part{b=Beg}}, {ok, _}, {false, 1, PartResults0}) -> + PartResults = lists:keyreplace(Beg, 1, PartResults0, {Beg, true}), + case is_complete(PartResults) of + true -> {stop, ok}; + false -> {error, create_db_fubar} + end; +handle_create_msg(_RefPart, {ok, _}, {true, N, Parts}) -> + {ok, {true, N-1, Parts}}; +handle_create_msg({_Ref, #part{b=Beg}}, {ok, _}, {false, Rem, PartResults0}) -> + PartResults = lists:keyreplace(Beg, 1, PartResults0, {Beg, true}), + {ok, {is_complete(PartResults), Rem-1, PartResults}}. -check_all_parts(Ref, RefPartMap, Acc, Reply) -> - case couch_util:get_value(Ref, RefPartMap) of - #part{} = Part -> - case lists:keyfind(Part, 1, Acc) of - true -> Acc; % already present... that's odd - _ -> - NewAcc = [{Part, Reply} | Acc], - case length(NewAcc) >= length(RefPartMap) of - true -> {ok, NewAcc}; - _ -> NewAcc - end - end; - _ -> Acc % ignore a non-matching Ref - end. -distinct_parts(RefPartMap) -> - {_Refs, Parts} = lists:unzip(RefPartMap), - BegParts = lists:map(fun(#part{b=Beg}) -> Beg end, Parts), - lists:usort(BegParts). +is_complete(List) -> + lists:all(fun({_,Bool}) -> Bool end, List). diff --git a/src/fabric_rpc.erl b/src/fabric_rpc.erl index fd54827c..0a034d59 100644 --- a/src/fabric_rpc.erl +++ b/src/fabric_rpc.erl @@ -1,49 +1,30 @@ -module(fabric_rpc). --export([open_doc/4, get_doc_info/1]). --export([receive_loop/3]). +-export([open_doc/4, get_db_info/1]). --include("../../dynomite/include/membership.hrl"). +%% rpc endpoints +%% call to with_db will supply your M:F with a #db{} and then remaining args open_doc(DbName, DocId, Revs, Options) -> - case couch_db:open(DbName, []) of - {ok, Db} -> - try - couch_api:open_doc(Db, DocId, Revs, Options) - after - couch_db:close(Db) - end; - {not_found, no_db_file} -> - throw({not_found, <<"The database does not exist.">>}); - Error -> - throw(Error) - end. + with_db(DbName, {couch_api, open_doc, [DocId, Revs, Options]}). + +get_db_info(DbName) -> + with_db(DbName, {couch_db, get_db_info, []}). -get_doc_info(DbName) -> +%% +%% internal +%% + +with_db(DbName, {M,F,A}) -> case couch_db:open(DbName, []) of {ok, Db} -> - try - couch_db:get_db_info(Db) - after - couch_db:close(Db) - end; - {not_found, no_db_file} -> - throw({not_found, <<"The database does not exist.">>}); + rexi:reply(apply(M, F, [Db | A])); Error -> - throw(Error) + rexi:reply(Error) end. + %% %% helper funs %% - -%% @doc set up the receive loop with an overall timeout --spec receive_loop([ref_part_map()], integer(), function()) -> {ok, beg_acc()}. -receive_loop(RefPartMap, Timeout, Loop) -> - TimeoutRef = erlang:make_ref(), - {ok, TRef} = timer:send_after(Timeout, {timeout, TimeoutRef}), - Results = Loop(RefPartMap, TimeoutRef, []), - timer:cancel(TRef), - Results. -%{Status, Info} = couch_db:get_db_info(Shard), diff --git a/src/fabric_util.erl b/src/fabric_util.erl new file mode 100644 index 00000000..8eeaee33 --- /dev/null +++ b/src/fabric_util.erl @@ -0,0 +1,49 @@ +-module(fabric_util). + +-export([receive_loop/6]). + +-include("../../dynomite/include/membership.hrl"). + + +%% @doc set up the receive loop with an overall timeout +-spec receive_loop([ref_part_map()], integer(), function(), any(), + integer(), integer()) -> + {ok, beg_acc()}. +receive_loop(RefPartMap, Keypos, Fun, Acc0, GlobalTimeout, PerMsgTO) -> + TimeoutRef = erlang:make_ref(), + {ok, TRef} = timer:send_after(GlobalTimeout, {timeout, TimeoutRef}), + try + process_mailbox(RefPartMap, Keypos, Fun, Acc0, TimeoutRef, PerMsgTO) + after + timer:cancel(TRef) + end. + +process_mailbox(RefList, Keypos, Fun, Acc0, TimeoutRef, PerMsgTO) -> + case process_message(RefList, Keypos, Fun, Acc0, TimeoutRef, PerMsgTO) of + {ok, Acc} -> + process_mailbox(RefList, Keypos, Fun, Acc, TimeoutRef, PerMsgTO); + {stop, Acc} -> + {ok, Acc}; + Error -> + Error + end. + +process_message(RefList, Keypos, Fun, Acc0, TimeoutRef, PerMsgTO) -> + receive + {timeout, TimeoutRef} -> + timeout; + {Ref, Msg} -> + case lists:keyfind(Ref, Keypos, RefList) of + false -> + % this was some non-matching message which we will ignore + {ok, Acc0}; + RefPart -> + % call the Fun that understands the message + Fun(RefPart, Msg, Acc0) + end; + {rexi_DOWN, _RexiMonPid, ServerPid, Reason} = Msg -> + showroom_log:message(alert, "rexi_DOWN ~p ~p", [ServerPid, Reason]), + Fun(nil, Msg, Acc0) + after PerMsgTO -> + timeout + end. |