summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorBrad Anderson <brad@cloudant.com>2010-05-26 18:36:44 -0400
committerBrad Anderson <brad@cloudant.com>2010-05-26 18:36:44 -0400
commit47bd7a3293bb41c9d040e7fd35dcfe9faf16a992 (patch)
tree4c92f913338b59500642647f6f2f66b762c229da
parent7801b1fa879a752d11d538f35ee4905609779801 (diff)
rework create_db with new idioms for fabric / rexi workflow
-rw-r--r--ebin/fabric.app3
-rw-r--r--src/fabric_create.erl98
-rw-r--r--src/fabric_rpc.erl49
-rw-r--r--src/fabric_util.erl49
4 files changed, 96 insertions, 103 deletions
diff --git a/ebin/fabric.app b/ebin/fabric.app
index 73943af5..ba3d3e03 100644
--- a/ebin/fabric.app
+++ b/ebin/fabric.app
@@ -9,7 +9,8 @@
fabric_create,
fabric_delete,
fabric_info,
- fabric_open
+ fabric_open,
+ fabric_util
]},
{registered, []},
{included_applications, []},
diff --git a/src/fabric_create.erl b/src/fabric_create.erl
index 1143d080..c2c01ccd 100644
--- a/src/fabric_create.erl
+++ b/src/fabric_create.erl
@@ -19,13 +19,14 @@ create_db(DbName, Options) ->
Fullmap = partitions:fullmap(DbName, Options),
{ok, FullNodes} = mem3:fullnodes(),
RefPartMap = send_create_calls(DbName, Options, Fullmap),
- {ok, Results} = fabric_rpc:receive_loop(RefPartMap, 5000,
- fun create_db_loop/3),
- case create_results(Results, RefPartMap) of
- ok ->
+ Acc0 = {false, length(RefPartMap),
+ lists:usort([ {Beg, false} || {_,#part{b=Beg}} <- RefPartMap])},
+ case fabric_util:receive_loop(
+ RefPartMap, 1, fun handle_create_msg/3, Acc0, 5000, infinity) of
+ {ok, _Results} ->
partitions:install_fullmap(DbName, Fullmap, FullNodes, Options),
- {ok, #db{name=DbName}};
- Other -> {error, Other}
+ ok;
+ Error -> Error
end.
@@ -43,68 +44,29 @@ send_create_calls(DbName, Options, Fullmap) ->
{Ref, Part}
end, Fullmap).
-%% @doc create_db receive loop
-%% Acc is either an accumulation of responses, or if we've received all
-%% responses, it's {ok, Responses}
--spec create_db_loop([ref_part_map()], tref(), beg_acc()) ->
- beg_acc() | {ok, beg_acc()}.
-create_db_loop(_,_,{ok, Acc}) -> {ok, Acc};
-create_db_loop(RefPartMap, TimeoutRef, AccIn) ->
- receive
- {Ref, {ok, MainPid}} when is_reference(Ref) ->
- % for dev only, close the Fd TODO: remove me
- gen_server:call({couch_server, node(MainPid)}, {force_close, MainPid}),
-
- AccOut = check_all_parts(Ref, RefPartMap, AccIn, ok),
- create_db_loop(RefPartMap, TimeoutRef, AccOut);
- {Ref, Reply} when is_reference(Ref) ->
- AccOut = check_all_parts(Ref, RefPartMap, AccIn, Reply),
- create_db_loop(RefPartMap, TimeoutRef, AccOut);
- {timeout, TimeoutRef} ->
- {error, timeout}
- end.
-
-%% @doc check the results (beginning of each partition range) of the create
-%% replies. If we have a good reply from each partition, return ok
--spec create_results(beg_acc(), [ref_part_map()]) -> ok | create_quorum_error.
-create_results(Results, RefPartMap) ->
- ResultBegParts = create_result(Results, []),
- DistinctBegParts = distinct_parts(RefPartMap),
+handle_create_msg(_, file_exists, _) ->
+ {error, file_exists};
+handle_create_msg(_, {rexi_EXIT, _Reason}, {Complete, N, Parts}) ->
+ {ok, {Complete, N-1, Parts}};
+handle_create_msg(_, {rexi_DOWN, _, _, _}, {Complete, _N, _Parts}) ->
if
- ResultBegParts =:= DistinctBegParts -> ok;
- true ->
- ?debugFmt("~nResultBegParts: ~p~nDistinctBegParts: ~p~n",
- [ResultBegParts, DistinctBegParts]),
- create_quorum_error
- end.
-
--spec create_result(beg_acc(), [part()]) -> [part()] | file_exists.
-create_result([], Acc) ->
- lists:usort(Acc);
-create_result([{#part{b=Beg}, ok}|Rest], Acc) ->
- create_result(Rest, [Beg|Acc]);
-create_result([{_, {error, file_exists}}|_Rest], _Acc) ->
- {error, file_exists}; % if any replies were file_exists, return that
-create_result([{_, Result}|Rest], Acc) ->
- showroom_log:message(error, "create_db error: ~p", [Result]),
- create_result(Rest, Acc).
+ Complete -> {stop, ok};
+ true -> {error, create_db_fubar}
+ end;
+handle_create_msg(_, _, {true, 1, _Acc}) ->
+ {stop, ok};
+handle_create_msg({_, #part{b=Beg}}, {ok, _}, {false, 1, PartResults0}) ->
+ PartResults = lists:keyreplace(Beg, 1, PartResults0, {Beg, true}),
+ case is_complete(PartResults) of
+ true -> {stop, ok};
+ false -> {error, create_db_fubar}
+ end;
+handle_create_msg(_RefPart, {ok, _}, {true, N, Parts}) ->
+ {ok, {true, N-1, Parts}};
+handle_create_msg({_Ref, #part{b=Beg}}, {ok, _}, {false, Rem, PartResults0}) ->
+ PartResults = lists:keyreplace(Beg, 1, PartResults0, {Beg, true}),
+ {ok, {is_complete(PartResults), Rem-1, PartResults}}.
-check_all_parts(Ref, RefPartMap, Acc, Reply) ->
- case couch_util:get_value(Ref, RefPartMap) of
- #part{} = Part ->
- case lists:keyfind(Part, 1, Acc) of
- true -> Acc; % already present... that's odd
- _ ->
- NewAcc = [{Part, Reply} | Acc],
- case length(NewAcc) >= length(RefPartMap) of
- true -> {ok, NewAcc};
- _ -> NewAcc
- end
- end;
- _ -> Acc % ignore a non-matching Ref
- end.
-distinct_parts(RefPartMap) ->
- {_Refs, Parts} = lists:unzip(RefPartMap),
- BegParts = lists:map(fun(#part{b=Beg}) -> Beg end, Parts),
- lists:usort(BegParts).
+is_complete(List) ->
+ lists:all(fun({_,Bool}) -> Bool end, List).
diff --git a/src/fabric_rpc.erl b/src/fabric_rpc.erl
index fd54827c..0a034d59 100644
--- a/src/fabric_rpc.erl
+++ b/src/fabric_rpc.erl
@@ -1,49 +1,30 @@
-module(fabric_rpc).
--export([open_doc/4, get_doc_info/1]).
--export([receive_loop/3]).
+-export([open_doc/4, get_db_info/1]).
--include("../../dynomite/include/membership.hrl").
+%% rpc endpoints
+%% call to with_db will supply your M:F with a #db{} and then remaining args
open_doc(DbName, DocId, Revs, Options) ->
- case couch_db:open(DbName, []) of
- {ok, Db} ->
- try
- couch_api:open_doc(Db, DocId, Revs, Options)
- after
- couch_db:close(Db)
- end;
- {not_found, no_db_file} ->
- throw({not_found, <<"The database does not exist.">>});
- Error ->
- throw(Error)
- end.
+ with_db(DbName, {couch_api, open_doc, [DocId, Revs, Options]}).
+
+get_db_info(DbName) ->
+ with_db(DbName, {couch_db, get_db_info, []}).
-get_doc_info(DbName) ->
+%%
+%% internal
+%%
+
+with_db(DbName, {M,F,A}) ->
case couch_db:open(DbName, []) of
{ok, Db} ->
- try
- couch_db:get_db_info(Db)
- after
- couch_db:close(Db)
- end;
- {not_found, no_db_file} ->
- throw({not_found, <<"The database does not exist.">>});
+ rexi:reply(apply(M, F, [Db | A]));
Error ->
- throw(Error)
+ rexi:reply(Error)
end.
+
%%
%% helper funs
%%
-
-%% @doc set up the receive loop with an overall timeout
--spec receive_loop([ref_part_map()], integer(), function()) -> {ok, beg_acc()}.
-receive_loop(RefPartMap, Timeout, Loop) ->
- TimeoutRef = erlang:make_ref(),
- {ok, TRef} = timer:send_after(Timeout, {timeout, TimeoutRef}),
- Results = Loop(RefPartMap, TimeoutRef, []),
- timer:cancel(TRef),
- Results.
-%{Status, Info} = couch_db:get_db_info(Shard),
diff --git a/src/fabric_util.erl b/src/fabric_util.erl
new file mode 100644
index 00000000..8eeaee33
--- /dev/null
+++ b/src/fabric_util.erl
@@ -0,0 +1,49 @@
+-module(fabric_util).
+
+-export([receive_loop/6]).
+
+-include("../../dynomite/include/membership.hrl").
+
+
+%% @doc set up the receive loop with an overall timeout
+-spec receive_loop([ref_part_map()], integer(), function(), any(),
+ integer(), integer()) ->
+ {ok, beg_acc()}.
+receive_loop(RefPartMap, Keypos, Fun, Acc0, GlobalTimeout, PerMsgTO) ->
+ TimeoutRef = erlang:make_ref(),
+ {ok, TRef} = timer:send_after(GlobalTimeout, {timeout, TimeoutRef}),
+ try
+ process_mailbox(RefPartMap, Keypos, Fun, Acc0, TimeoutRef, PerMsgTO)
+ after
+ timer:cancel(TRef)
+ end.
+
+process_mailbox(RefList, Keypos, Fun, Acc0, TimeoutRef, PerMsgTO) ->
+ case process_message(RefList, Keypos, Fun, Acc0, TimeoutRef, PerMsgTO) of
+ {ok, Acc} ->
+ process_mailbox(RefList, Keypos, Fun, Acc, TimeoutRef, PerMsgTO);
+ {stop, Acc} ->
+ {ok, Acc};
+ Error ->
+ Error
+ end.
+
+process_message(RefList, Keypos, Fun, Acc0, TimeoutRef, PerMsgTO) ->
+ receive
+ {timeout, TimeoutRef} ->
+ timeout;
+ {Ref, Msg} ->
+ case lists:keyfind(Ref, Keypos, RefList) of
+ false ->
+ % this was some non-matching message which we will ignore
+ {ok, Acc0};
+ RefPart ->
+ % call the Fun that understands the message
+ Fun(RefPart, Msg, Acc0)
+ end;
+ {rexi_DOWN, _RexiMonPid, ServerPid, Reason} = Msg ->
+ showroom_log:message(alert, "rexi_DOWN ~p ~p", [ServerPid, Reason]),
+ Fun(nil, Msg, Acc0)
+ after PerMsgTO ->
+ timeout
+ end.