summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorBrad Anderson <brad@cloudant.com>2010-05-27 16:02:40 -0400
committerBrad Anderson <brad@cloudant.com>2010-05-27 16:02:40 -0400
commit591eb31af0ad55e528358608f352474283b47430 (patch)
tree11d6e770883a93337e3710468e7523d37fe5882c
parent6cba6c59340b1a6b359efecb1856d20cd0343db8 (diff)
work on get_db_info, conflict resovled for fabric_rpc
-rw-r--r--src/fabric_create.erl4
-rw-r--r--src/fabric_info.erl67
-rw-r--r--src/fabric_rpc.erl5
-rw-r--r--src/fabric_util.erl2
4 files changed, 46 insertions, 32 deletions
diff --git a/src/fabric_create.erl b/src/fabric_create.erl
index 17c85b6a..79bf1040 100644
--- a/src/fabric_create.erl
+++ b/src/fabric_create.erl
@@ -19,8 +19,8 @@ create_db(DbName, Options) ->
Fullmap = partitions:fullmap(DbName, Options),
{ok, FullNodes} = mem3:fullnodes(),
RefPartMap = send_create_calls(DbName, Options, Fullmap),
- Acc0 = {false, length(RefPartMap),
- lists:usort([ {Beg, false} || {_,#shard{range=[Beg,_]}} <- RefPartMap])},
+ Acc0 = {false, length(RefPartMap), lists:usort([ {Beg, false} ||
+ {_,#shard{range=[Beg,_]}} <- RefPartMap])},
case fabric_util:receive_loop(
RefPartMap, 1, fun handle_create_msg/3, Acc0, 5000, infinity) of
{ok, _Results} ->
diff --git a/src/fabric_info.erl b/src/fabric_info.erl
index 662c06b6..cb32eac0 100644
--- a/src/fabric_info.erl
+++ b/src/fabric_info.erl
@@ -25,13 +25,19 @@ all_databases(Customer) ->
{ok, Dbs}.
%% @doc get database information tuple
-get_db_info(DbName, _Customer) ->
+get_db_info(DbName, Customer) ->
+ Name = cloudant_db_name(Customer, DbName),
Parts = partitions:all_parts(DbName),
RefPartMap = send_info_calls(DbName, Parts),
- {ok, Results} = fabric_rpc:receive_loop(RefPartMap, 5000, fun info_loop/3),
- InfoList = Results,
- % process_infos(ShardInfos, [{db_name, Name}]);
- {ok, InfoList}.
+ Acc0 = {false, length(RefPartMap), lists:usort([ {Beg, nil} ||
+ {_,#shard{range=[Beg,_]}} <- RefPartMap])},
+ case fabric_util:receive_loop(
+ RefPartMap, 1, fun handle_info_msg/3, Acc0, 5000, infinity) of
+ {ok, ShardInfos} ->
+ {ok, process_infos(ShardInfos, [{db_name, Name}])};
+ Error -> Error
+ end.
+
%% =====================
%% internal
@@ -46,37 +52,38 @@ new_acc(DbName, Acc) ->
send_info_calls(DbName, Parts) ->
lists:map(fun(#shard{node=Node, range=[Beg,_]} = Part) ->
ShardName = showroom_utils:shard_name(Beg, DbName),
- Ref = rexi:cast(Node, {rexi_rpc, get_db_info, ShardName}),
+ Ref = rexi:cast(Node, {fabric_rpc, get_db_info, ShardName}),
{Ref, Part}
end, Parts).
-%% @doc create_db receive loop
-%% Acc is either an accumulation of responses, or if we've received all
-%% responses, it's {ok, Responses}
--spec info_loop([ref_part_map()], tref(), beg_acc()) ->
- beg_acc() | {ok, beg_acc()}.
-info_loop(_,_,{ok, Acc}) -> {ok, Acc};
-info_loop(RefPartMap, TimeoutRef, AccIn) ->
- receive
- {Ref, {ok, Info}} when is_reference(Ref) ->
- %AccOut = check_all_parts(Ref, RefPartMap, AccIn, ok),
- %info_loop(RefPartMap, TimeoutRef, AccOut);
- ok;
- {Ref, Reply} when is_reference(Ref) ->
- %AccOut = check_all_parts(Ref, RefPartMap, AccIn, Reply),
- %info_loop(RefPartMap, TimeoutRef, AccOut);
- ok;
- {timeout, TimeoutRef} ->
- {error, timeout}
- end.
-
+handle_info_msg(_, _, {true, _, Infos0}) ->
+ {stop, Infos0};
+handle_info_msg(_, _, {false, 1, Infos0}) ->
+ MissingShards = lists:keyfind(nil, 2, Infos0),
+ ?LOG_ERROR("get_db_info error, missing shards: ~p", [MissingShards]),
+ {error, get_db_info};
+handle_info_msg({_,#shard{range=[Beg,_]}}, {ok, Info}, {false, N, Infos0}) ->
+ case couch_util:get_value(Beg, Infos0) of
+ nil ->
+ Infos = lists:keyreplace(Beg, 1, Infos0, {Beg, Info}),
+ case is_complete(Infos) of
+ true -> {ok, {true, N-1, Infos}};
+ false -> {ok, {false, N-1, Infos}}
+ end;
+ _ ->
+ {ok, {false, N-1, Infos0}}
+ end;
+handle_info_msg(_, _Other, {Complete, N, Infos0}) ->
+ {ok, {Complete, N-1, Infos0}}.
+
+
+is_complete(List) ->
+ not lists:any(fun({_,nil}) -> true end, List).
cloudant_db_name(Customer, FullName) ->
case Customer of
- "" ->
- FullName;
- Name ->
- re:replace(FullName, [Name,"/"], "", [{return, binary}])
+ "" -> FullName;
+ Name -> re:replace(FullName, [Name,"/"], "", [{return, binary}])
end.
%% Loop through Tasks on the flattened Infos and get the aggregated result
diff --git a/src/fabric_rpc.erl b/src/fabric_rpc.erl
index 49809145..54f3b338 100644
--- a/src/fabric_rpc.erl
+++ b/src/fabric_rpc.erl
@@ -2,13 +2,18 @@
-export([open_doc/3, open_doc/4, get_db_info/1]).
+-include_lib("eunit/include/eunit.hrl").
open_doc(DbName, DocId, Options) ->
with_db(DbName, {couch_db, open_doc, [DocId, Options]}).
+%% rpc endpoints
+%% call to with_db will supply your M:F with a #db{} and then remaining args
+
open_doc(DbName, DocId, Revs, Options) ->
with_db(DbName, {couch_api, open_doc, [DocId, Revs, Options]}).
get_db_info(DbName) ->
+ ?debugHere,
with_db(DbName, {couch_db, get_db_info, []}).
%%
diff --git a/src/fabric_util.erl b/src/fabric_util.erl
index 4b2f611a..347670a4 100644
--- a/src/fabric_util.erl
+++ b/src/fabric_util.erl
@@ -3,6 +3,7 @@
-export([submit_jobs/3, recv/4, receive_loop/4, receive_loop/6]).
-include("../../dynomite/include/membership.hrl").
+-include_lib("eunit/include/eunit.hrl").
submit_jobs(Shards, EndPoint, ExtraArgs) ->
lists:map(fun(#shard{node=Node, name=ShardName} = Shard) ->
@@ -55,6 +56,7 @@ process_message(RefList, Keypos, Fun, Acc0, TimeoutRef, PerMsgTO) ->
{ok, Acc0};
RefPart ->
% call the Fun that understands the message
+ ?debugFmt("~nAcc0: ~p~n", [Acc0]),
Fun(RefPart, Msg, Acc0)
end;
{rexi_DOWN, _RexiMonPid, ServerPid, Reason} = Msg ->