author     Brad Anderson <brad@cloudant.com>  2010-05-26 15:21:51 -0400
committer  Brad Anderson <brad@cloudant.com>  2010-05-26 15:21:51 -0400
commit     7801b1fa879a752d11d538f35ee4905609779801 (patch)
tree       e5cd9d9aced0721e14ea098059e0746baa6be3ed
parent     0b8e2d0bb9bed5bac6302c8f22737c37a02bcc68 (diff)
Rest of recent fabric changes: switch to the #part{} record, add get_db_info work (not yet functional), and change other modules to use the new fullmap.
-rw-r--r--  ebin/fabric.app          3
-rw-r--r--  src/fabric.erl           6
-rw-r--r--  src/fabric_api.erl       2
-rw-r--r--  src/fabric_create.erl  114
-rw-r--r--  src/fabric_info.erl    104
-rw-r--r--  src/fabric_open.erl     21
6 files changed, 148 insertions(+), 102 deletions(-)
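
The commit message refers to a #part{} record and a new fullmap, but neither is defined in this diff; the record presumably comes from dynomite's membership.hrl, which fabric_info.erl now includes. The following Erlang sketch is inferred from the fields and -spec lines that appear below, not copied from the real header; the 'e' field and all typing are assumptions.

%% Sketch only: the real definition lives in dynomite's membership.hrl.
%% Only dbname, node and b are referenced in this diff; 'e' is assumed.
-record(part, {
    dbname :: binary(),    % database this partition belongs to
    node   :: node(),      % node hosting the partition
    b      :: integer(),   % beginning of the partition's key range
    e      :: integer()    % end of the range (assumed, unused here)
}).

%% Type aliases as implied by the new -spec lines (reconstructed guesses):
%% fullmap()      = [#part{}]
%% ref_part_map() = {reference(), #part{}}
%% beg_acc()      = [{#part{}, Reply :: term()}]
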
diff --git a/ebin/fabric.app b/ebin/fabric.app
index 76dff491..73943af5 100644
--- a/ebin/fabric.app
+++ b/ebin/fabric.app
@@ -8,7 +8,8 @@
fabric_api,
fabric_create,
fabric_delete,
- fabric_info
+ fabric_info,
+ fabric_open
]},
{registered, []},
{included_applications, []},
diff --git a/src/fabric.erl b/src/fabric.erl
index 5aefed5b..a54264dc 100644
--- a/src/fabric.erl
+++ b/src/fabric.erl
@@ -1,6 +1,7 @@
-module(fabric).
--export([all_databases/1, create_db/2, delete_db/2, open_db/2, open_doc/4]).
+-export([all_databases/1, create_db/2, delete_db/2, open_db/2, open_doc/4,
+ get_db_info/2]).
%% maybe this goes away, and these are called directly in their own modules in
%% fabric_api ??
@@ -19,3 +20,6 @@ open_db(DbName, Options) ->
open_doc(Db, DocId, Revs, Options) ->
fabric_open:open_doc(Db, DocId, Revs, Options).
+
+get_db_info(DbName, Customer) ->
+ fabric_info:get_db_info(DbName, Customer).
diff --git a/src/fabric_api.erl b/src/fabric_api.erl
index e399e048..53dc96e3 100644
--- a/src/fabric_api.erl
+++ b/src/fabric_api.erl
@@ -43,7 +43,7 @@ close_db(Db) ->
-spec get_db_info(#db{}, bstring()) -> {ok, [{atom(), any()}]}.
get_db_info(Db, Customer) ->
- showroom_db:get_db_info(Db, Customer).
+ fabric:get_db_info(Db, Customer).
-spec replicate_db(ejson(), #user_ctx{}) -> {ok, ejson()}.
replicate_db(PostBody, UserCtx) ->
diff --git a/src/fabric_create.erl b/src/fabric_create.erl
index 986e4c2a..1143d080 100644
--- a/src/fabric_create.erl
+++ b/src/fabric_create.erl
@@ -18,9 +18,10 @@
create_db(DbName, Options) ->
Fullmap = partitions:fullmap(DbName, Options),
{ok, FullNodes} = mem3:fullnodes(),
- RefNodePart = send_create_calls(DbName, Options, Fullmap),
- {ok, Results} = create_db_loop(RefNodePart),
- case create_results(Results, RefNodePart) of
+ RefPartMap = send_create_calls(DbName, Options, Fullmap),
+ {ok, Results} = fabric_rpc:receive_loop(RefPartMap, 5000,
+ fun create_db_loop/3),
+ case create_results(Results, RefPartMap) of
ok ->
partitions:install_fullmap(DbName, Fullmap, FullNodes, Options),
{ok, #db{name=DbName}};
@@ -33,78 +34,69 @@ create_db(DbName, Options) ->
%% =====================
%% @doc create the partitions on all appropriate nodes (rexi calls)
--spec send_create_calls(binary(), list(), [mem_node()]) -> [{reference(), np()}].
+-spec send_create_calls(binary(), list(), fullmap()) -> [{reference(), part()}].
send_create_calls(DbName, Options, Fullmap) ->
- lists:map(fun({Node, Part}) ->
- ShardName = showroom_utils:shard_name(Part, DbName),
+ lists:map(fun(#part{node=Node, b=Beg} = Part) ->
+ ShardName = showroom_utils:shard_name(Beg, DbName),
Ref = rexi:async_server_call({couch_server, Node},
{create, ShardName, Options}),
- {Ref, {Node, Part}}
+ {Ref, Part}
end, Fullmap).
-%% @doc set up the receive loop with an overall timeout
--spec create_db_loop([ref_node_part()]) -> {ok, np_acc()}.
-create_db_loop(RefNodePart) ->
- TimeoutRef = erlang:make_ref(),
- {ok, TRef} = timer:send_after(5000, {timeout, TimeoutRef}),
- Results = create_db_loop(RefNodePart, TimeoutRef, []),
- timer:cancel(TRef),
- Results.
-
%% @doc create_db receive loop
%% Acc is either an accumulation of responses, or if we've received all
%% responses, it's {ok, Responses}
--spec create_db_loop([ref_node_part()], tref(), np_acc()) ->
- np_acc() | {ok, np_acc()}.
+-spec create_db_loop([ref_part_map()], tref(), beg_acc()) ->
+ beg_acc() | {ok, beg_acc()}.
create_db_loop(_,_,{ok, Acc}) -> {ok, Acc};
-create_db_loop(RefNodePart, TimeoutRef, AccIn) ->
+create_db_loop(RefPartMap, TimeoutRef, AccIn) ->
receive
{Ref, {ok, MainPid}} when is_reference(Ref) ->
% for dev only, close the Fd TODO: remove me
gen_server:call({couch_server, node(MainPid)}, {force_close, MainPid}),
- AccOut = check_all_parts(Ref, RefNodePart, AccIn, ok),
- create_db_loop(RefNodePart, TimeoutRef, AccOut);
+ AccOut = check_all_parts(Ref, RefPartMap, AccIn, ok),
+ create_db_loop(RefPartMap, TimeoutRef, AccOut);
{Ref, Reply} when is_reference(Ref) ->
- AccOut = check_all_parts(Ref, RefNodePart, AccIn, Reply),
- create_db_loop(RefNodePart, TimeoutRef, AccOut);
+ AccOut = check_all_parts(Ref, RefPartMap, AccIn, Reply),
+ create_db_loop(RefPartMap, TimeoutRef, AccOut);
{timeout, TimeoutRef} ->
{error, timeout}
end.
-%% @doc check the results of the create replies
-%% If we have a good reply from each partition, return ok
--spec create_results(np_acc(), [ref_node_part()]) -> ok | create_quorum_error.
-create_results(Results, RefNodePart) ->
- ResultParts = create_result(Results, []),
- DistinctParts = distinct_parts(RefNodePart),
+%% @doc check the results (beginning of each partition range) of the create
+%% replies. If we have a good reply from each partition, return ok
+-spec create_results(beg_acc(), [ref_part_map()]) -> ok | create_quorum_error.
+create_results(Results, RefPartMap) ->
+ ResultBegParts = create_result(Results, []),
+ DistinctBegParts = distinct_parts(RefPartMap),
if
- ResultParts =:= DistinctParts -> ok;
+ ResultBegParts =:= DistinctBegParts -> ok;
true ->
- ?debugFmt("~nResultParts: ~p~nDistinctParts: ~p~n",
- [ResultParts, DistinctParts]),
+ ?debugFmt("~nResultBegParts: ~p~nDistinctBegParts: ~p~n",
+ [ResultBegParts, DistinctBegParts]),
create_quorum_error
end.
--spec create_result(np_acc(), [np()]) -> [np()] | file_exists.
+-spec create_result(beg_acc(), [part()]) -> [part()] | file_exists.
create_result([], Acc) ->
lists:usort(Acc);
-create_result([{{_N,P}, ok}|Rest], Acc) ->
- create_result(Rest, [P|Acc]);
-create_result([{_NP, {error, file_exists}}|_Rest], _Acc) ->
+create_result([{#part{b=Beg}, ok}|Rest], Acc) ->
+ create_result(Rest, [Beg|Acc]);
+create_result([{_, {error, file_exists}}|_Rest], _Acc) ->
{error, file_exists}; % if any replies were file_exists, return that
-create_result([{{_N,_P}, Result}|Rest], Acc) ->
+create_result([{_, Result}|Rest], Acc) ->
showroom_log:message(error, "create_db error: ~p", [Result]),
create_result(Rest, Acc).
-check_all_parts(Ref, RefNodePart, Acc, Reply) ->
- case couch_util:get_value(Ref, RefNodePart) of
- {Node, Part} ->
- case lists:keyfind({Node, Part}, 1, Acc) of
+check_all_parts(Ref, RefPartMap, Acc, Reply) ->
+ case couch_util:get_value(Ref, RefPartMap) of
+ #part{} = Part ->
+ case lists:keyfind(Part, 1, Acc) of
true -> Acc; % already present... that's odd
_ ->
- NewAcc = [{{Node, Part}, Reply} | Acc],
- case length(NewAcc) >= length(RefNodePart) of
+ NewAcc = [{Part, Reply} | Acc],
+ case length(NewAcc) >= length(RefPartMap) of
true -> {ok, NewAcc};
_ -> NewAcc
end
@@ -112,35 +104,7 @@ check_all_parts(Ref, RefNodePart, Acc, Reply) ->
_ -> Acc % ignore a non-matching Ref
end.
-%% @doc check that we have a good reply from each partition.
-%% If we do, return {ok, Acc}, if we don't, return Acc of partitions
-%% Three 'case' statements and one 'if', a personal best. fml
-%% @end
-% check_distinct_parts(Ref, RefNodePart, Acc, Msg) ->
-% Parts = distinct_parts(RefNodePart),
-% case couch_util:get_value(Ref, RefNodePart) of
-% {Node, Part} ->
-% case lists:member(Part, Acc) of
-% true -> Acc;
-% _ ->
-% case Msg of
-% ok ->
-% NewAcc = lists:usort([Part|Acc]),
-% if
-% Parts =:= NewAcc -> {ok, NewAcc};
-% true -> NewAcc
-% end;
-% _ ->
-% Hex = showroom_utils:int_to_hexstr(Part),
-% showroom_log:message(error,
-% "create_db reply error: ~p from ~p ~p", [Msg, Node, Hex]),
-% Acc
-% end
-% end;
-% _ -> Acc % ignore a non-matching Ref
-% end.
-
-distinct_parts(RefNodePart) ->
- {_Refs, NPs} = lists:unzip(RefNodePart),
- {_Nodes, Parts} = lists:unzip(NPs),
- lists:usort(Parts).
+distinct_parts(RefPartMap) ->
+ {_Refs, Parts} = lists:unzip(RefPartMap),
+ BegParts = lists:map(fun(#part{b=Beg}) -> Beg end, Parts),
+ lists:usort(BegParts).
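
Both the new create_db/2 above and get_db_info/2 below hand their receive loops to fabric_rpc:receive_loop/3, which is not part of this diff. Judging from the deleted create_db_loop/1 wrapper and the call sites, it presumably arms a timeout and drives the supplied loop fun; a minimal sketch of that assumption:

%% Assumed shape of fabric_rpc:receive_loop/3 (not shown in this commit);
%% it mirrors the deleted create_db_loop/1: arm a timeout message, run the
%% caller's loop fun with an empty accumulator, then cancel the timer.
receive_loop(RefPartMap, Timeout, LoopFun) ->
    TimeoutRef = erlang:make_ref(),
    {ok, TRef} = timer:send_after(Timeout, {timeout, TimeoutRef}),
    Results = LoopFun(RefPartMap, TimeoutRef, []),
    timer:cancel(TRef),
    Results.
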
diff --git a/src/fabric_info.erl b/src/fabric_info.erl
index 95408b6f..51529da3 100644
--- a/src/fabric_info.erl
+++ b/src/fabric_info.erl
@@ -1,23 +1,113 @@
-module(fabric_info).
--export([all_databases/1]).
+-export([all_databases/1, get_db_info/2]).
-include("../../couch/src/couch_db.hrl").
+-include("../../dynomite/include/membership.hrl").
%% @doc gets all databases in the cluster.
-spec all_databases(binary() | []) -> [binary()].
all_databases([]) ->
- Dbs = ets:foldl(fun({DbName, _}, Acc0) ->
- [DbName | Acc0]
- end, [], dbs_cache),
+ Dbs = ets:foldl(fun(#part{dbname=DbName}, AccIn) ->
+ new_acc(DbName, AccIn)
+ end, [], partitions),
{ok, Dbs};
all_databases(Customer) ->
?debugFmt("~nCustomer: ~p~n", [Customer]),
- Dbs = ets:foldl(fun({DbName, _}, Acc0) ->
+ Dbs = ets:foldl(fun(#part{dbname=DbName}, AccIn) ->
DbNameStr = ?b2l(DbName),
case string:str(DbNameStr, Customer) of
- 1 -> [DbNameStr | Acc0];
- _ -> Acc0
+ 1 ->
+ new_acc(DbNameStr, AccIn);
+ _ -> AccIn
end
end, [], dbs_cache),
{ok, Dbs}.
+
+%% @doc get database information tuple
+get_db_info(DbName, _Customer) ->
+ Parts = partitions:all_parts(DbName),
+ RefPartMap = send_info_calls(DbName, Parts),
+ {ok, Results} = fabric_rpc:receive_loop(RefPartMap, 5000, fun info_loop/3),
+ InfoList = Results,
+ % process_infos(ShardInfos, [{db_name, Name}]);
+ {ok, InfoList}.
+
+%% =====================
+%% internal
+%% =====================
+
+new_acc(DbName, Acc) ->
+ case lists:member(DbName, Acc) of
+ true -> Acc;
+ _ ->[DbName | Acc]
+ end.
+
+send_info_calls(DbName, Parts) ->
+ lists:map(fun(#part{node=Node, b=Beg} = Part) ->
+ ShardName = showroom_utils:shard_name(Beg, DbName),
+ Ref = rexi:cast(Node, {rexi_rpc, get_db_info, ShardName}),
+ {Ref, Part}
+ end, Parts).
+
+%% @doc create_db receive loop
+%% Acc is either an accumulation of responses, or if we've received all
+%% responses, it's {ok, Responses}
+-spec info_loop([ref_part_map()], tref(), beg_acc()) ->
+ beg_acc() | {ok, beg_acc()}.
+info_loop(_,_,{ok, Acc}) -> {ok, Acc};
+info_loop(RefPartMap, TimeoutRef, AccIn) ->
+ receive
+ {Ref, {ok, Info}} when is_reference(Ref) ->
+ AccOut = check_all_parts(Ref, RefPartMap, AccIn, ok),
+ info_loop(RefPartMap, TimeoutRef, AccOut);
+ {Ref, Reply} when is_reference(Ref) ->
+ AccOut = check_all_parts(Ref, RefPartMap, AccIn, Reply),
+ info_loop(RefPartMap, TimeoutRef, AccOut);
+ {timeout, TimeoutRef} ->
+ {error, timeout}
+ end.
+
+
+cloudant_db_name(Customer, FullName) ->
+ case Customer of
+ "" ->
+ FullName;
+ Name ->
+ re:replace(FullName, [Name,"/"], "", [{return, binary}])
+ end.
+
+%% Loop through Tasks on the flattened Infos and get the aggregated result
+process_infos(Infos, Initial) ->
+ Tasks = [
+ {doc_count, fun sum/2, 0},
+ {doc_del_count, fun sum/2, 0},
+ {update_seq, fun max/2, 1},
+ {purge_seq, fun sum/2, 0},
+ {compact_running, fun bool/2, 0},
+ {disk_size, fun sum/2, 0},
+ {instance_start_time, fun(_, _) -> <<"0">> end, 0},
+ {disk_format_version, fun max/2, 0}],
+
+ Infos1 = lists:flatten(Infos),
+
+ Result = lists:map(fun({Type, Fun, Default}) ->
+ {Type, process_info(Type, Fun, Default, Infos1)}
+ end, Tasks),
+ lists:flatten([Initial, Result]).
+
+ process_info(Type, Fun, Default, List) ->
+ lists:foldl(fun(V, AccIn) -> Fun(V, AccIn) end, Default,
+ couch_util:get_all_values(Type, List)).
+
+sum(New, Existing) ->
+ New + Existing.
+
+bool(New, Existing) ->
+ New andalso Existing.
+
+max(New, Existing) ->
+ case New > Existing of
+ true -> New;
+ false -> Existing
+ end.
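
process_infos/2 and its helpers are added here, but the call in get_db_info/2 is still commented out, matching the 'not functional' note in the commit message. A hypothetical usage with made-up shard data shows how the aggregation would behave once wired up:

%% Hypothetical example (invented shard info lists, not from this commit):
%% Infos = [[{doc_count, 10}, {disk_size, 4096}, {update_seq, 7}],
%%          [{doc_count, 15}, {disk_size, 8192}, {update_seq, 9}]],
%% process_infos(Infos, [{db_name, <<"mydb">>}]).
%% => [{db_name,<<"mydb">>}, {doc_count,25}, {doc_del_count,0},
%%     {update_seq,9}, {purge_seq,0}, {compact_running,0},
%%     {disk_size,12288}, {instance_start_time,0},
%%     {disk_format_version,0}]
%% Counters are combined with sum/2, update_seq and disk_format_version
%% with max/2; keys missing from the input fall back to each task's default.
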
diff --git a/src/fabric_open.erl b/src/fabric_open.erl
index a8c0204e..289a0681 100644
--- a/src/fabric_open.erl
+++ b/src/fabric_open.erl
@@ -23,25 +23,12 @@
% open_db(<<"S", ShardName/binary>>, Options).
-open_doc(DbName, DocId, Revs, Options) ->
+open_doc(DbName, DocId, _Revs, _Options) ->
NPs = partitions:key_nodes_parts(DbName, DocId),
?debugFmt("~nNPs: ~p~n", [NPs]),
- ok.
-
-
-open_doc_endpoint(DbName, DocId, Revs, Options) ->
- case couch_db:open(DbName, []) of
- {ok, Db} ->
- try
- couch_api:open_doc(Db, DocId, Revs, Options)
- after
- couch_db:close(Db)
- end;
- {not_found, no_db_file} ->
- throw({not_found, <<"The database does not exist.">>});
- Error ->
- throw(Error)
- end.
+ {ok, #doc{}}.
+
+
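
For orientation, the get_db_info call chain as wired in this commit; the shard-side handler for the {rexi_rpc, get_db_info, ShardName} cast is not shown here and is assumed to live elsewhere. Note that fabric_api:get_db_info/2 still takes a #db{} per its spec but now forwards that value where fabric_info expects a DbName, and fabric_info's info_loop/3 calls check_all_parts/4, which this diff only defines in fabric_create.erl; both fit the 'not functional' caveat.

%% Sketch of the new path (commentary, not code from the repository):
%%   fabric_api:get_db_info(Db, Customer)
%%     -> fabric:get_db_info(DbName, Customer)
%%       -> fabric_info:get_db_info(DbName, Customer)
%%            partitions:all_parts(DbName)                        %% find shards
%%            rexi:cast(Node, {rexi_rpc, get_db_info, ShardName}) %% per shard
%%            fabric_rpc:receive_loop(RefPartMap, 5000, fun info_loop/3)
%%            %% replies would then be merged by process_infos/2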