diff options
author | Brad Anderson <brad@cloudant.com> | 2010-05-27 16:02:40 -0400 |
---|---|---|
committer | Brad Anderson <brad@cloudant.com> | 2010-05-27 16:02:40 -0400 |
commit | 591eb31af0ad55e528358608f352474283b47430 (patch) | |
tree | 11d6e770883a93337e3710468e7523d37fe5882c | |
parent | 6cba6c59340b1a6b359efecb1856d20cd0343db8 (diff) |
work on get_db_info, conflict resovled for fabric_rpc
-rw-r--r-- | src/fabric_create.erl | 4 | ||||
-rw-r--r-- | src/fabric_info.erl | 67 | ||||
-rw-r--r-- | src/fabric_rpc.erl | 5 | ||||
-rw-r--r-- | src/fabric_util.erl | 2 |
4 files changed, 46 insertions, 32 deletions
diff --git a/src/fabric_create.erl b/src/fabric_create.erl index 17c85b6a..79bf1040 100644 --- a/src/fabric_create.erl +++ b/src/fabric_create.erl @@ -19,8 +19,8 @@ create_db(DbName, Options) -> Fullmap = partitions:fullmap(DbName, Options), {ok, FullNodes} = mem3:fullnodes(), RefPartMap = send_create_calls(DbName, Options, Fullmap), - Acc0 = {false, length(RefPartMap), - lists:usort([ {Beg, false} || {_,#shard{range=[Beg,_]}} <- RefPartMap])}, + Acc0 = {false, length(RefPartMap), lists:usort([ {Beg, false} || + {_,#shard{range=[Beg,_]}} <- RefPartMap])}, case fabric_util:receive_loop( RefPartMap, 1, fun handle_create_msg/3, Acc0, 5000, infinity) of {ok, _Results} -> diff --git a/src/fabric_info.erl b/src/fabric_info.erl index 662c06b6..cb32eac0 100644 --- a/src/fabric_info.erl +++ b/src/fabric_info.erl @@ -25,13 +25,19 @@ all_databases(Customer) -> {ok, Dbs}. %% @doc get database information tuple -get_db_info(DbName, _Customer) -> +get_db_info(DbName, Customer) -> + Name = cloudant_db_name(Customer, DbName), Parts = partitions:all_parts(DbName), RefPartMap = send_info_calls(DbName, Parts), - {ok, Results} = fabric_rpc:receive_loop(RefPartMap, 5000, fun info_loop/3), - InfoList = Results, - % process_infos(ShardInfos, [{db_name, Name}]); - {ok, InfoList}. + Acc0 = {false, length(RefPartMap), lists:usort([ {Beg, nil} || + {_,#shard{range=[Beg,_]}} <- RefPartMap])}, + case fabric_util:receive_loop( + RefPartMap, 1, fun handle_info_msg/3, Acc0, 5000, infinity) of + {ok, ShardInfos} -> + {ok, process_infos(ShardInfos, [{db_name, Name}])}; + Error -> Error + end. + %% ===================== %% internal @@ -46,37 +52,38 @@ new_acc(DbName, Acc) -> send_info_calls(DbName, Parts) -> lists:map(fun(#shard{node=Node, range=[Beg,_]} = Part) -> ShardName = showroom_utils:shard_name(Beg, DbName), - Ref = rexi:cast(Node, {rexi_rpc, get_db_info, ShardName}), + Ref = rexi:cast(Node, {fabric_rpc, get_db_info, ShardName}), {Ref, Part} end, Parts). -%% @doc create_db receive loop -%% Acc is either an accumulation of responses, or if we've received all -%% responses, it's {ok, Responses} --spec info_loop([ref_part_map()], tref(), beg_acc()) -> - beg_acc() | {ok, beg_acc()}. -info_loop(_,_,{ok, Acc}) -> {ok, Acc}; -info_loop(RefPartMap, TimeoutRef, AccIn) -> - receive - {Ref, {ok, Info}} when is_reference(Ref) -> - %AccOut = check_all_parts(Ref, RefPartMap, AccIn, ok), - %info_loop(RefPartMap, TimeoutRef, AccOut); - ok; - {Ref, Reply} when is_reference(Ref) -> - %AccOut = check_all_parts(Ref, RefPartMap, AccIn, Reply), - %info_loop(RefPartMap, TimeoutRef, AccOut); - ok; - {timeout, TimeoutRef} -> - {error, timeout} - end. - +handle_info_msg(_, _, {true, _, Infos0}) -> + {stop, Infos0}; +handle_info_msg(_, _, {false, 1, Infos0}) -> + MissingShards = lists:keyfind(nil, 2, Infos0), + ?LOG_ERROR("get_db_info error, missing shards: ~p", [MissingShards]), + {error, get_db_info}; +handle_info_msg({_,#shard{range=[Beg,_]}}, {ok, Info}, {false, N, Infos0}) -> + case couch_util:get_value(Beg, Infos0) of + nil -> + Infos = lists:keyreplace(Beg, 1, Infos0, {Beg, Info}), + case is_complete(Infos) of + true -> {ok, {true, N-1, Infos}}; + false -> {ok, {false, N-1, Infos}} + end; + _ -> + {ok, {false, N-1, Infos0}} + end; +handle_info_msg(_, _Other, {Complete, N, Infos0}) -> + {ok, {Complete, N-1, Infos0}}. + + +is_complete(List) -> + not lists:any(fun({_,nil}) -> true end, List). cloudant_db_name(Customer, FullName) -> case Customer of - "" -> - FullName; - Name -> - re:replace(FullName, [Name,"/"], "", [{return, binary}]) + "" -> FullName; + Name -> re:replace(FullName, [Name,"/"], "", [{return, binary}]) end. %% Loop through Tasks on the flattened Infos and get the aggregated result diff --git a/src/fabric_rpc.erl b/src/fabric_rpc.erl index 49809145..54f3b338 100644 --- a/src/fabric_rpc.erl +++ b/src/fabric_rpc.erl @@ -2,13 +2,18 @@ -export([open_doc/3, open_doc/4, get_db_info/1]). +-include_lib("eunit/include/eunit.hrl"). open_doc(DbName, DocId, Options) -> with_db(DbName, {couch_db, open_doc, [DocId, Options]}). +%% rpc endpoints +%% call to with_db will supply your M:F with a #db{} and then remaining args + open_doc(DbName, DocId, Revs, Options) -> with_db(DbName, {couch_api, open_doc, [DocId, Revs, Options]}). get_db_info(DbName) -> + ?debugHere, with_db(DbName, {couch_db, get_db_info, []}). %% diff --git a/src/fabric_util.erl b/src/fabric_util.erl index 4b2f611a..347670a4 100644 --- a/src/fabric_util.erl +++ b/src/fabric_util.erl @@ -3,6 +3,7 @@ -export([submit_jobs/3, recv/4, receive_loop/4, receive_loop/6]). -include("../../dynomite/include/membership.hrl"). +-include_lib("eunit/include/eunit.hrl"). submit_jobs(Shards, EndPoint, ExtraArgs) -> lists:map(fun(#shard{node=Node, name=ShardName} = Shard) -> @@ -55,6 +56,7 @@ process_message(RefList, Keypos, Fun, Acc0, TimeoutRef, PerMsgTO) -> {ok, Acc0}; RefPart -> % call the Fun that understands the message + ?debugFmt("~nAcc0: ~p~n", [Acc0]), Fun(RefPart, Msg, Acc0) end; {rexi_DOWN, _RexiMonPid, ServerPid, Reason} = Msg -> |