diff options
author | Brad Anderson <brad@cloudant.com> | 2010-05-26 15:21:51 -0400 |
---|---|---|
committer | Brad Anderson <brad@cloudant.com> | 2010-05-26 15:21:51 -0400 |
commit | 7801b1fa879a752d11d538f35ee4905609779801 (patch) | |
tree | e5cd9d9aced0721e14ea098059e0746baa6be3ed | |
parent | 0b8e2d0bb9bed5bac6302c8f22737c37a02bcc68 (diff) |
rest of recent fabric changes. switch to #part{} record, add get_db_info work (not functional), and change others around to use new fullmap
-rw-r--r-- | ebin/fabric.app | 3 | ||||
-rw-r--r-- | src/fabric.erl | 6 | ||||
-rw-r--r-- | src/fabric_api.erl | 2 | ||||
-rw-r--r-- | src/fabric_create.erl | 114 | ||||
-rw-r--r-- | src/fabric_info.erl | 104 | ||||
-rw-r--r-- | src/fabric_open.erl | 21 |
6 files changed, 148 insertions, 102 deletions
diff --git a/ebin/fabric.app b/ebin/fabric.app index 76dff491..73943af5 100644 --- a/ebin/fabric.app +++ b/ebin/fabric.app @@ -8,7 +8,8 @@ fabric_api, fabric_create, fabric_delete, - fabric_info + fabric_info, + fabric_open ]}, {registered, []}, {included_applications, []}, diff --git a/src/fabric.erl b/src/fabric.erl index 5aefed5b..a54264dc 100644 --- a/src/fabric.erl +++ b/src/fabric.erl @@ -1,6 +1,7 @@ -module(fabric). --export([all_databases/1, create_db/2, delete_db/2, open_db/2, open_doc/4]). +-export([all_databases/1, create_db/2, delete_db/2, open_db/2, open_doc/4, + get_db_info/2]). %% maybe this goes away, and these are called directly in their own modules in %% fabric_api ?? @@ -19,3 +20,6 @@ open_db(DbName, Options) -> open_doc(Db, DocId, Revs, Options) -> fabric_open:open_doc(Db, DocId, Revs, Options). + +get_db_info(DbName, Customer) -> + fabric_info:get_db_info(DbName, Customer). diff --git a/src/fabric_api.erl b/src/fabric_api.erl index e399e048..53dc96e3 100644 --- a/src/fabric_api.erl +++ b/src/fabric_api.erl @@ -43,7 +43,7 @@ close_db(Db) -> -spec get_db_info(#db{}, bstring()) -> {ok, [{atom(), any()}]}. get_db_info(Db, Customer) -> - showroom_db:get_db_info(Db, Customer). + fabric:get_db_info(Db, Customer). -spec replicate_db(ejson(), #user_ctx{}) -> {ok, ejson()}. replicate_db(PostBody, UserCtx) -> diff --git a/src/fabric_create.erl b/src/fabric_create.erl index 986e4c2a..1143d080 100644 --- a/src/fabric_create.erl +++ b/src/fabric_create.erl @@ -18,9 +18,10 @@ create_db(DbName, Options) -> Fullmap = partitions:fullmap(DbName, Options), {ok, FullNodes} = mem3:fullnodes(), - RefNodePart = send_create_calls(DbName, Options, Fullmap), - {ok, Results} = create_db_loop(RefNodePart), - case create_results(Results, RefNodePart) of + RefPartMap = send_create_calls(DbName, Options, Fullmap), + {ok, Results} = fabric_rpc:receive_loop(RefPartMap, 5000, + fun create_db_loop/3), + case create_results(Results, RefPartMap) of ok -> partitions:install_fullmap(DbName, Fullmap, FullNodes, Options), {ok, #db{name=DbName}}; @@ -33,78 +34,69 @@ create_db(DbName, Options) -> %% ===================== %% @doc create the partitions on all appropriate nodes (rexi calls) --spec send_create_calls(binary(), list(), [mem_node()]) -> [{reference(), np()}]. +-spec send_create_calls(binary(), list(), fullmap()) -> [{reference(), part()}]. send_create_calls(DbName, Options, Fullmap) -> - lists:map(fun({Node, Part}) -> - ShardName = showroom_utils:shard_name(Part, DbName), + lists:map(fun(#part{node=Node, b=Beg} = Part) -> + ShardName = showroom_utils:shard_name(Beg, DbName), Ref = rexi:async_server_call({couch_server, Node}, {create, ShardName, Options}), - {Ref, {Node, Part}} + {Ref, Part} end, Fullmap). -%% @doc set up the receive loop with an overall timeout --spec create_db_loop([ref_node_part()]) -> {ok, np_acc()}. -create_db_loop(RefNodePart) -> - TimeoutRef = erlang:make_ref(), - {ok, TRef} = timer:send_after(5000, {timeout, TimeoutRef}), - Results = create_db_loop(RefNodePart, TimeoutRef, []), - timer:cancel(TRef), - Results. - %% @doc create_db receive loop %% Acc is either an accumulation of responses, or if we've received all %% responses, it's {ok, Responses} --spec create_db_loop([ref_node_part()], tref(), np_acc()) -> - np_acc() | {ok, np_acc()}. +-spec create_db_loop([ref_part_map()], tref(), beg_acc()) -> + beg_acc() | {ok, beg_acc()}. create_db_loop(_,_,{ok, Acc}) -> {ok, Acc}; -create_db_loop(RefNodePart, TimeoutRef, AccIn) -> +create_db_loop(RefPartMap, TimeoutRef, AccIn) -> receive {Ref, {ok, MainPid}} when is_reference(Ref) -> % for dev only, close the Fd TODO: remove me gen_server:call({couch_server, node(MainPid)}, {force_close, MainPid}), - AccOut = check_all_parts(Ref, RefNodePart, AccIn, ok), - create_db_loop(RefNodePart, TimeoutRef, AccOut); + AccOut = check_all_parts(Ref, RefPartMap, AccIn, ok), + create_db_loop(RefPartMap, TimeoutRef, AccOut); {Ref, Reply} when is_reference(Ref) -> - AccOut = check_all_parts(Ref, RefNodePart, AccIn, Reply), - create_db_loop(RefNodePart, TimeoutRef, AccOut); + AccOut = check_all_parts(Ref, RefPartMap, AccIn, Reply), + create_db_loop(RefPartMap, TimeoutRef, AccOut); {timeout, TimeoutRef} -> {error, timeout} end. -%% @doc check the results of the create replies -%% If we have a good reply from each partition, return ok --spec create_results(np_acc(), [ref_node_part()]) -> ok | create_quorum_error. -create_results(Results, RefNodePart) -> - ResultParts = create_result(Results, []), - DistinctParts = distinct_parts(RefNodePart), +%% @doc check the results (beginning of each partition range) of the create +%% replies. If we have a good reply from each partition, return ok +-spec create_results(beg_acc(), [ref_part_map()]) -> ok | create_quorum_error. +create_results(Results, RefPartMap) -> + ResultBegParts = create_result(Results, []), + DistinctBegParts = distinct_parts(RefPartMap), if - ResultParts =:= DistinctParts -> ok; + ResultBegParts =:= DistinctBegParts -> ok; true -> - ?debugFmt("~nResultParts: ~p~nDistinctParts: ~p~n", - [ResultParts, DistinctParts]), + ?debugFmt("~nResultBegParts: ~p~nDistinctBegParts: ~p~n", + [ResultBegParts, DistinctBegParts]), create_quorum_error end. --spec create_result(np_acc(), [np()]) -> [np()] | file_exists. +-spec create_result(beg_acc(), [part()]) -> [part()] | file_exists. create_result([], Acc) -> lists:usort(Acc); -create_result([{{_N,P}, ok}|Rest], Acc) -> - create_result(Rest, [P|Acc]); -create_result([{_NP, {error, file_exists}}|_Rest], _Acc) -> +create_result([{#part{b=Beg}, ok}|Rest], Acc) -> + create_result(Rest, [Beg|Acc]); +create_result([{_, {error, file_exists}}|_Rest], _Acc) -> {error, file_exists}; % if any replies were file_exists, return that -create_result([{{_N,_P}, Result}|Rest], Acc) -> +create_result([{_, Result}|Rest], Acc) -> showroom_log:message(error, "create_db error: ~p", [Result]), create_result(Rest, Acc). -check_all_parts(Ref, RefNodePart, Acc, Reply) -> - case couch_util:get_value(Ref, RefNodePart) of - {Node, Part} -> - case lists:keyfind({Node, Part}, 1, Acc) of +check_all_parts(Ref, RefPartMap, Acc, Reply) -> + case couch_util:get_value(Ref, RefPartMap) of + #part{} = Part -> + case lists:keyfind(Part, 1, Acc) of true -> Acc; % already present... that's odd _ -> - NewAcc = [{{Node, Part}, Reply} | Acc], - case length(NewAcc) >= length(RefNodePart) of + NewAcc = [{Part, Reply} | Acc], + case length(NewAcc) >= length(RefPartMap) of true -> {ok, NewAcc}; _ -> NewAcc end @@ -112,35 +104,7 @@ check_all_parts(Ref, RefNodePart, Acc, Reply) -> _ -> Acc % ignore a non-matching Ref end. -%% @doc check that we have a good reply from each partition. -%% If we do, return {ok, Acc}, if we don't, return Acc of partitions -%% Three 'case' statements and one 'if', a personal best. fml -%% @end -% check_distinct_parts(Ref, RefNodePart, Acc, Msg) -> -% Parts = distinct_parts(RefNodePart), -% case couch_util:get_value(Ref, RefNodePart) of -% {Node, Part} -> -% case lists:member(Part, Acc) of -% true -> Acc; -% _ -> -% case Msg of -% ok -> -% NewAcc = lists:usort([Part|Acc]), -% if -% Parts =:= NewAcc -> {ok, NewAcc}; -% true -> NewAcc -% end; -% _ -> -% Hex = showroom_utils:int_to_hexstr(Part), -% showroom_log:message(error, -% "create_db reply error: ~p from ~p ~p", [Msg, Node, Hex]), -% Acc -% end -% end; -% _ -> Acc % ignore a non-matching Ref -% end. - -distinct_parts(RefNodePart) -> - {_Refs, NPs} = lists:unzip(RefNodePart), - {_Nodes, Parts} = lists:unzip(NPs), - lists:usort(Parts). +distinct_parts(RefPartMap) -> + {_Refs, Parts} = lists:unzip(RefPartMap), + BegParts = lists:map(fun(#part{b=Beg}) -> Beg end, Parts), + lists:usort(BegParts). diff --git a/src/fabric_info.erl b/src/fabric_info.erl index 95408b6f..51529da3 100644 --- a/src/fabric_info.erl +++ b/src/fabric_info.erl @@ -1,23 +1,113 @@ -module(fabric_info). --export([all_databases/1]). +-export([all_databases/1, get_db_info/2]). -include("../../couch/src/couch_db.hrl"). +-include("../../dynomite/include/membership.hrl"). %% @doc gets all databases in the cluster. -spec all_databases(binary() | []) -> [binary()]. all_databases([]) -> - Dbs = ets:foldl(fun({DbName, _}, Acc0) -> - [DbName | Acc0] - end, [], dbs_cache), + Dbs = ets:foldl(fun(#part{dbname=DbName}, AccIn) -> + new_acc(DbName, AccIn) + end, [], partitions), {ok, Dbs}; all_databases(Customer) -> ?debugFmt("~nCustomer: ~p~n", [Customer]), - Dbs = ets:foldl(fun({DbName, _}, Acc0) -> + Dbs = ets:foldl(fun(#part{dbname=DbName}, AccIn) -> DbNameStr = ?b2l(DbName), case string:str(DbNameStr, Customer) of - 1 -> [DbNameStr | Acc0]; - _ -> Acc0 + 1 -> + new_acc(DbNameStr, AccIn); + _ -> AccIn end end, [], dbs_cache), {ok, Dbs}. + +%% @doc get database information tuple +get_db_info(DbName, _Customer) -> + Parts = partitions:all_parts(DbName), + RefPartMap = send_info_calls(DbName, Parts), + {ok, Results} = fabric_rpc:receive_loop(RefPartMap, 5000, fun info_loop/3), + InfoList = Results, + % process_infos(ShardInfos, [{db_name, Name}]); + {ok, InfoList}. + +%% ===================== +%% internal +%% ===================== + +new_acc(DbName, Acc) -> + case lists:member(DbName, Acc) of + true -> Acc; + _ ->[DbName | Acc] + end. + +send_info_calls(DbName, Parts) -> + lists:map(fun(#part{node=Node, b=Beg} = Part) -> + ShardName = showroom_utils:shard_name(Beg, DbName), + Ref = rexi:cast(Node, {rexi_rpc, get_db_info, ShardName}), + {Ref, Part} + end, Parts). + +%% @doc create_db receive loop +%% Acc is either an accumulation of responses, or if we've received all +%% responses, it's {ok, Responses} +-spec info_loop([ref_part_map()], tref(), beg_acc()) -> + beg_acc() | {ok, beg_acc()}. +info_loop(_,_,{ok, Acc}) -> {ok, Acc}; +info_loop(RefPartMap, TimeoutRef, AccIn) -> + receive + {Ref, {ok, Info}} when is_reference(Ref) -> + AccOut = check_all_parts(Ref, RefPartMap, AccIn, ok), + info_loop(RefPartMap, TimeoutRef, AccOut); + {Ref, Reply} when is_reference(Ref) -> + AccOut = check_all_parts(Ref, RefPartMap, AccIn, Reply), + info_loop(RefPartMap, TimeoutRef, AccOut); + {timeout, TimeoutRef} -> + {error, timeout} + end. + + +cloudant_db_name(Customer, FullName) -> + case Customer of + "" -> + FullName; + Name -> + re:replace(FullName, [Name,"/"], "", [{return, binary}]) + end. + +%% Loop through Tasks on the flattened Infos and get the aggregated result +process_infos(Infos, Initial) -> + Tasks = [ + {doc_count, fun sum/2, 0}, + {doc_del_count, fun sum/2, 0}, + {update_seq, fun max/2, 1}, + {purge_seq, fun sum/2, 0}, + {compact_running, fun bool/2, 0}, + {disk_size, fun sum/2, 0}, + {instance_start_time, fun(_, _) -> <<"0">> end, 0}, + {disk_format_version, fun max/2, 0}], + + Infos1 = lists:flatten(Infos), + + Result = lists:map(fun({Type, Fun, Default}) -> + {Type, process_info(Type, Fun, Default, Infos1)} + end, Tasks), + lists:flatten([Initial, Result]). + + process_info(Type, Fun, Default, List) -> + lists:foldl(fun(V, AccIn) -> Fun(V, AccIn) end, Default, + couch_util:get_all_values(Type, List)). + +sum(New, Existing) -> + New + Existing. + +bool(New, Existing) -> + New andalso Existing. + +max(New, Existing) -> + case New > Existing of + true -> New; + false -> Existing + end. diff --git a/src/fabric_open.erl b/src/fabric_open.erl index a8c0204e..289a0681 100644 --- a/src/fabric_open.erl +++ b/src/fabric_open.erl @@ -23,25 +23,12 @@ % open_db(<<"S", ShardName/binary>>, Options). -open_doc(DbName, DocId, Revs, Options) -> +open_doc(DbName, DocId, _Revs, _Options) -> NPs = partitions:key_nodes_parts(DbName, DocId), ?debugFmt("~nNPs: ~p~n", [NPs]), - ok. - - -open_doc_endpoint(DbName, DocId, Revs, Options) -> - case couch_db:open(DbName, []) of - {ok, Db} -> - try - couch_api:open_doc(Db, DocId, Revs, Options) - after - couch_db:close(Db) - end; - {not_found, no_db_file} -> - throw({not_found, <<"The database does not exist.">>}); - Error -> - throw(Error) - end. + {ok, #doc{}}. + + |