diff options
Diffstat (limited to 'src')
-rw-r--r-- | src/fabric.erl | 16 | ||||
-rw-r--r-- | src/fabric_create.erl | 72 | ||||
-rw-r--r-- | src/fabric_db.erl | 218 | ||||
-rw-r--r-- | src/fabric_delete.erl | 65 | ||||
-rw-r--r-- | src/fabric_info.erl | 122 | ||||
-rw-r--r-- | src/fabric_open.erl | 47 | ||||
-rw-r--r-- | src/fabric_rpc.erl | 1 |
7 files changed, 226 insertions, 315 deletions
diff --git a/src/fabric.erl b/src/fabric.erl index 15269bbb..e75e31eb 100644 --- a/src/fabric.erl +++ b/src/fabric.erl @@ -3,23 +3,23 @@ -export([all_databases/1, create_db/2, delete_db/2, open_doc/3, open_doc/4, get_db_info/2]). -%% maybe this goes away, and these are called directly in their own modules in -%% fabric_api ?? +% db operations all_databases(Customer) -> - fabric_info:all_databases(Customer). + fabric_db:all_databases(Customer). + +get_db_info(DbName, Customer) -> + fabric_db:get_db_info(DbName, Customer). create_db(DbName, Options) -> - fabric_create:create_db(DbName, Options). + fabric_db:create_db(DbName, Options). delete_db(DbName, Options) -> - fabric_delete:delete_db(DbName, Options). + fabric_db:delete_db(DbName, Options). +% doc operations open_doc(Db, DocId, Options) -> fabric_doc:open_doc(Db, DocId, Options). open_doc(Db, DocId, Revs, Options) -> fabric_open:open_doc(Db, DocId, Revs, Options). - -get_db_info(DbName, Customer) -> - fabric_info:get_db_info(DbName, Customer). diff --git a/src/fabric_create.erl b/src/fabric_create.erl deleted file mode 100644 index 79bf1040..00000000 --- a/src/fabric_create.erl +++ /dev/null @@ -1,72 +0,0 @@ --module(fabric_create). --author('Brad Anderson <brad@cloudant.com>'). - --include("../../couch/src/couch_db.hrl"). --include("../../dynomite/include/membership.hrl"). - -%% api --export([create_db/2]). - - -%% ===================== -%% api -%% ===================== - -%% @doc Create a new database, and all its partition files across the cluster -%% Options is proplist with user_ctx, n, q --spec create_db(binary(), list()) -> {ok, #db{}} | {error, any()}. -create_db(DbName, Options) -> - Fullmap = partitions:fullmap(DbName, Options), - {ok, FullNodes} = mem3:fullnodes(), - RefPartMap = send_create_calls(DbName, Options, Fullmap), - Acc0 = {false, length(RefPartMap), lists:usort([ {Beg, false} || - {_,#shard{range=[Beg,_]}} <- RefPartMap])}, - case fabric_util:receive_loop( - RefPartMap, 1, fun handle_create_msg/3, Acc0, 5000, infinity) of - {ok, _Results} -> - partitions:install_fullmap(DbName, Fullmap, FullNodes, Options), - ok; - Error -> Error - end. - - -%% ===================== -%% internal -%% ===================== - -%% @doc create the partitions on all appropriate nodes (rexi calls) --spec send_create_calls(binary(), list(), fullmap()) -> [{reference(), part()}]. -send_create_calls(DbName, Options, Fullmap) -> - lists:map(fun(#shard{node=Node, range=[Beg,_]} = Part) -> - ShardName = showroom_utils:shard_name(Beg, DbName), - Ref = rexi:async_server_call({couch_server, Node}, - {create, ShardName, Options}), - {Ref, Part} - end, Fullmap). - -handle_create_msg(_, file_exists, _) -> - {error, file_exists}; -handle_create_msg(_, {rexi_EXIT, _Reason}, {Complete, N, Parts}) -> - {ok, {Complete, N-1, Parts}}; -handle_create_msg(_, {rexi_DOWN, _, _, _}, {Complete, _N, _Parts}) -> - if - Complete -> {stop, ok}; - true -> {error, create_db_fubar} - end; -handle_create_msg(_, _, {true, 1, _Acc}) -> - {stop, ok}; -handle_create_msg({_, #shard{range=[Beg,_]}}, {ok, _}, {false, 1, PartResults0}) -> - PartResults = lists:keyreplace(Beg, 1, PartResults0, {Beg, true}), - case is_complete(PartResults) of - true -> {stop, ok}; - false -> {error, create_db_fubar} - end; -handle_create_msg(_RefPart, {ok, _}, {true, N, Parts}) -> - {ok, {true, N-1, Parts}}; -handle_create_msg({_Ref, #shard{range=[Beg,_]}}, {ok, _}, {false, Rem, PartResults0}) -> - PartResults = lists:keyreplace(Beg, 1, PartResults0, {Beg, true}), - {ok, {is_complete(PartResults), Rem-1, PartResults}}. - - -is_complete(List) -> - lists:all(fun({_,Bool}) -> Bool end, List). diff --git a/src/fabric_db.erl b/src/fabric_db.erl new file mode 100644 index 00000000..95648e01 --- /dev/null +++ b/src/fabric_db.erl @@ -0,0 +1,218 @@ +-module(fabric_db). +-author('Brad Anderson <brad@cloudant.com>'). +-author('Adam Kocoloski <adam@cloudant.com>'). + +-export([all_databases/1, get_db_info/2, create_db/2, delete_db/2]). + +-include("../../couch/src/couch_db.hrl"). +-include("../../dynomite/include/membership.hrl"). + +%% @doc gets all databases in the cluster. +-spec all_databases(binary() | []) -> [binary()]. +all_databases([]) -> + Dbs = ets:foldl(fun(#shard{dbname=DbName}, AccIn) -> + new_acc(DbName, AccIn) + end, [], partitions), + {ok, Dbs}; +all_databases(Customer) -> + ?debugFmt("~nCustomer: ~p~n", [Customer]), + Dbs = ets:foldl(fun(#shard{dbname=DbName}, AccIn) -> + DbNameStr = ?b2l(DbName), + case string:str(DbNameStr, Customer) of + 1 -> + new_acc(DbNameStr, AccIn); + _ -> AccIn + end + end, [], dbs_cache), + {ok, Dbs}. + +%% @doc get database information tuple +get_db_info(DbName, Customer) -> + Name = cloudant_db_name(Customer, DbName), + Shards = partitions:all_parts(DbName), + Workers = fabric_util:submit_jobs(Shards, get_db_info, []), + Acc0 = {false, length(Workers), lists:usort([ {Beg, nil} || + #shard{range=[Beg,_]} <- Workers])}, + case fabric_util:recv(Workers, #shard.ref, fun handle_info_msg/3, Acc0) of + {ok, ShardInfos} -> + {ok, process_infos(ShardInfos, [{db_name, Name}])}; + Error -> Error + end. + +%% @doc Create a new database, and all its partition files across the cluster +%% Options is proplist with user_ctx, n, q +-spec create_db(binary(), list()) -> {ok, #db{}} | {error, any()}. +create_db(DbName, Options) -> + Fullmap = partitions:fullmap(DbName, Options), + {ok, FullNodes} = mem3:fullnodes(), + RefPartMap = send_create_calls(DbName, Options, Fullmap), + Acc0 = {false, length(RefPartMap), lists:usort([ {Beg, false} || + {_,#shard{range=[Beg,_]}} <- RefPartMap])}, + case fabric_util:receive_loop( + RefPartMap, 1, fun handle_create_msg/3, Acc0, 5000, infinity) of + {ok, _Results} -> + partitions:install_fullmap(DbName, Fullmap, FullNodes, Options), + ok; + Error -> Error + end. + +%% @doc Delete a database, and all its partition files across the cluster +%% Options is proplist with user_ctx, n, q +-spec delete_db(binary(), list()) -> {ok, #db{}} | {error, any()}. +delete_db(DbName, Options) -> + Parts = partitions:all_parts(DbName), + RefPartMap = send_delete_calls(DbName, Options, Parts), + Acc0 = {true, length(RefPartMap)}, + case fabric_util:receive_loop( + RefPartMap, 1, fun handle_delete_msg/3, Acc0, 5000, infinity) of + {ok, _Results} -> + delete_fullmap(DbName), + ok; + Error -> Error + end. + +%% ===================== +%% internal +%% ===================== + +new_acc(DbName, Acc) -> + case lists:member(DbName, Acc) of + true -> Acc; + _ ->[DbName | Acc] + end. + +handle_info_msg(_, _, {true, _, Infos0}) -> + {stop, Infos0}; +handle_info_msg(_, _, {false, 1, Infos0}) -> + MissingShards = lists:reverse(lists:foldl(fun + ({S,nil}, Acc) -> [S|Acc]; + (_, Acc) -> Acc + end, [], Infos0)), + ?LOG_ERROR("get_db_info error, missing shards: ~p", [MissingShards]), + {error, get_db_info}; +handle_info_msg(#shard{range=[Beg,_]}, {ok, Info}, {false, N, Infos0}) -> + case couch_util:get_value(Beg, Infos0) of + nil -> + Infos = lists:keyreplace(Beg, 1, Infos0, {Beg, Info}), + case is_complete(info, Infos) of + true -> {ok, {true, N-1, Infos}}; + false -> {ok, {false, N-1, Infos}} + end; + _ -> + {ok, {false, N-1, Infos0}} + end; +handle_info_msg(_, _Other, {Complete, N, Infos0}) -> + {ok, {Complete, N-1, Infos0}}. + +is_complete(info, List) -> + not lists:any(fun({_,Info}) -> Info =:= nil end, List); +is_complete(create, List) -> + lists:all(fun({_,Bool}) -> Bool end, List). + +cloudant_db_name(Customer, FullName) -> + case Customer of + "" -> FullName; + Name -> re:replace(FullName, [Name,"/"], "", [{return, binary}]) + end. + +%% Loop through Tasks on the flattened Infos and get the aggregated result +process_infos(Infos, Initial) -> + Tasks = [ + {doc_count, fun sum/2, 0}, + {doc_del_count, fun sum/2, 0}, + {update_seq, fun max/2, 1}, + {purge_seq, fun sum/2, 0}, + {compact_running, fun bool/2, 0}, + {disk_size, fun sum/2, 0}, + {instance_start_time, fun(_, _) -> <<"0">> end, 0}, + {disk_format_version, fun max/2, 0}], + + Infos1 = lists:flatten(Infos), + + Result = lists:map(fun({Type, Fun, Default}) -> + {Type, process_info(Type, Fun, Default, Infos1)} + end, Tasks), + lists:flatten([Initial, Result]). + + process_info(Type, Fun, Default, List) -> + lists:foldl(fun(V, AccIn) -> Fun(V, AccIn) end, Default, + proplists:get_all_values(Type, List)). + +sum(New, Existing) -> + New + Existing. + +bool(New, Existing) -> + New andalso Existing. + +max(New, Existing) -> + case New > Existing of + true -> New; + false -> Existing + end. + + +%% @doc create the partitions on all appropriate nodes (rexi calls) +-spec send_create_calls(binary(), list(), fullmap()) -> [{reference(), part()}]. +send_create_calls(DbName, Options, Fullmap) -> + lists:map(fun(#shard{node=Node, range=[Beg,_]} = Part) -> + ShardName = showroom_utils:shard_name(Beg, DbName), + Ref = rexi:async_server_call({couch_server, Node}, + {create, ShardName, Options}), + {Ref, Part} + end, Fullmap). + +handle_create_msg(_, file_exists, _) -> + {error, file_exists}; +handle_create_msg(_, {rexi_EXIT, _Reason}, {Complete, N, Parts}) -> + {ok, {Complete, N-1, Parts}}; +handle_create_msg(_, {rexi_DOWN, _, _, _}, {Complete, _N, _Parts}) -> + if + Complete -> {stop, ok}; + true -> {error, create_db_fubar} + end; +handle_create_msg(_, _, {true, 1, _Acc}) -> + {stop, ok}; +handle_create_msg({_, #shard{range=[Beg,_]}}, {ok, _}, {false, 1, PartResults0}) -> + PartResults = lists:keyreplace(Beg, 1, PartResults0, {Beg, true}), + case is_complete(create, PartResults) of + true -> {stop, ok}; + false -> {error, create_db_fubar} + end; +handle_create_msg(_RefPart, {ok, _}, {true, N, Parts}) -> + {ok, {true, N-1, Parts}}; +handle_create_msg({_Ref, #shard{range=[Beg,_]}}, {ok, _}, {false, Rem, PartResults0}) -> + PartResults = lists:keyreplace(Beg, 1, PartResults0, {Beg, true}), + {ok, {is_complete(create, PartResults), Rem-1, PartResults}}. + + +%% @doc delete the partitions on all appropriate nodes (rexi calls) +-spec send_delete_calls(binary(), list(), fullmap()) -> [{reference(), part()}]. +send_delete_calls(DbName, Options, Parts) -> + lists:map(fun(#shard{node=Node, range=[Beg,_]} = Part) -> + ShardName = showroom_utils:shard_name(Beg, DbName), + Ref = rexi:async_server_call({couch_server, Node}, + {delete, ShardName, Options}), + {Ref, Part} + end, Parts). + +handle_delete_msg(_, not_found, {NotFound, N}) -> + {ok, {NotFound, N-1}}; +handle_delete_msg(_, {rexi_EXIT, _Reason}, {NotFound, N}) -> + {ok, {NotFound, N-1}}; +handle_delete_msg(_, {rexi_DOWN, _, _, _}, _Acc) -> + {error, delete_db_fubar}; +handle_delete_msg(_, _, {NotFound, 1}) -> + if + NotFound -> {stop, not_found}; + true -> {stop, ok} + end; +handle_delete_msg(_, ok, {_NotFound, N}) -> + {ok, {false, N-1}}. + +delete_fullmap(DbName) -> + case couch_db:open(<<"dbs">>, []) of + {ok, Db} -> + {ok, Doc} = couch_api:open_doc(Db, DbName, nil, []), + couch_api:update_doc(Db, Doc#doc{deleted=true}); + Error -> Error + end. diff --git a/src/fabric_delete.erl b/src/fabric_delete.erl deleted file mode 100644 index 9a65f72b..00000000 --- a/src/fabric_delete.erl +++ /dev/null @@ -1,65 +0,0 @@ --module(fabric_delete). --author('Brad Anderson <brad@cloudant.com>'). - --include("../../couch/src/couch_db.hrl"). --include("../../dynomite/include/membership.hrl"). - -%% api --export([delete_db/2]). - - -%% ===================== -%% api -%% ===================== - -%% @doc Delete a database, and all its partition files across the cluster -%% Options is proplist with user_ctx, n, q --spec delete_db(binary(), list()) -> {ok, #db{}} | {error, any()}. -delete_db(DbName, Options) -> - Parts = partitions:all_parts(DbName), - RefPartMap = send_calls(DbName, Options, Parts), - Acc0 = {true, length(RefPartMap)}, - case fabric_util:receive_loop( - RefPartMap, 1, fun handle_delete_msg/3, Acc0, 5000, infinity) of - {ok, _Results} -> - delete_fullmap(DbName), - ok; - Error -> Error - end. - - -%% ===================== -%% internal -%% ===================== - -%% @doc delete the partitions on all appropriate nodes (rexi calls) --spec send_calls(binary(), list(), fullmap()) -> [{reference(), part()}]. -send_calls(DbName, Options, Parts) -> - lists:map(fun(#shard{node=Node, range=[Beg,_]} = Part) -> - ShardName = showroom_utils:shard_name(Beg, DbName), - Ref = rexi:async_server_call({couch_server, Node}, - {delete, ShardName, Options}), - {Ref, Part} - end, Parts). - -handle_delete_msg(_, not_found, {NotFound, N}) -> - {ok, {NotFound, N-1}}; -handle_delete_msg(_, {rexi_EXIT, _Reason}, {NotFound, N}) -> - {ok, {NotFound, N-1}}; -handle_delete_msg(_, {rexi_DOWN, _, _, _}, _Acc) -> - {error, delete_db_fubar}; -handle_delete_msg(_, _, {NotFound, 1}) -> - if - NotFound -> {stop, not_found}; - true -> {stop, ok} - end; -handle_delete_msg(_, ok, {_NotFound, N}) -> - {ok, {false, N-1}}. - -delete_fullmap(DbName) -> - case couch_db:open(<<"dbs">>, []) of - {ok, Db} -> - {ok, Doc} = couch_api:open_doc(Db, DbName, nil, []), - couch_api:update_doc(Db, Doc#doc{deleted=true}); - Error -> Error - end. diff --git a/src/fabric_info.erl b/src/fabric_info.erl deleted file mode 100644 index cb32eac0..00000000 --- a/src/fabric_info.erl +++ /dev/null @@ -1,122 +0,0 @@ --module(fabric_info). - --export([all_databases/1, get_db_info/2]). - --include("../../couch/src/couch_db.hrl"). --include("../../dynomite/include/membership.hrl"). - -%% @doc gets all databases in the cluster. --spec all_databases(binary() | []) -> [binary()]. -all_databases([]) -> - Dbs = ets:foldl(fun(#shard{dbname=DbName}, AccIn) -> - new_acc(DbName, AccIn) - end, [], partitions), - {ok, Dbs}; -all_databases(Customer) -> - ?debugFmt("~nCustomer: ~p~n", [Customer]), - Dbs = ets:foldl(fun(#shard{dbname=DbName}, AccIn) -> - DbNameStr = ?b2l(DbName), - case string:str(DbNameStr, Customer) of - 1 -> - new_acc(DbNameStr, AccIn); - _ -> AccIn - end - end, [], dbs_cache), - {ok, Dbs}. - -%% @doc get database information tuple -get_db_info(DbName, Customer) -> - Name = cloudant_db_name(Customer, DbName), - Parts = partitions:all_parts(DbName), - RefPartMap = send_info_calls(DbName, Parts), - Acc0 = {false, length(RefPartMap), lists:usort([ {Beg, nil} || - {_,#shard{range=[Beg,_]}} <- RefPartMap])}, - case fabric_util:receive_loop( - RefPartMap, 1, fun handle_info_msg/3, Acc0, 5000, infinity) of - {ok, ShardInfos} -> - {ok, process_infos(ShardInfos, [{db_name, Name}])}; - Error -> Error - end. - - -%% ===================== -%% internal -%% ===================== - -new_acc(DbName, Acc) -> - case lists:member(DbName, Acc) of - true -> Acc; - _ ->[DbName | Acc] - end. - -send_info_calls(DbName, Parts) -> - lists:map(fun(#shard{node=Node, range=[Beg,_]} = Part) -> - ShardName = showroom_utils:shard_name(Beg, DbName), - Ref = rexi:cast(Node, {fabric_rpc, get_db_info, ShardName}), - {Ref, Part} - end, Parts). - -handle_info_msg(_, _, {true, _, Infos0}) -> - {stop, Infos0}; -handle_info_msg(_, _, {false, 1, Infos0}) -> - MissingShards = lists:keyfind(nil, 2, Infos0), - ?LOG_ERROR("get_db_info error, missing shards: ~p", [MissingShards]), - {error, get_db_info}; -handle_info_msg({_,#shard{range=[Beg,_]}}, {ok, Info}, {false, N, Infos0}) -> - case couch_util:get_value(Beg, Infos0) of - nil -> - Infos = lists:keyreplace(Beg, 1, Infos0, {Beg, Info}), - case is_complete(Infos) of - true -> {ok, {true, N-1, Infos}}; - false -> {ok, {false, N-1, Infos}} - end; - _ -> - {ok, {false, N-1, Infos0}} - end; -handle_info_msg(_, _Other, {Complete, N, Infos0}) -> - {ok, {Complete, N-1, Infos0}}. - - -is_complete(List) -> - not lists:any(fun({_,nil}) -> true end, List). - -cloudant_db_name(Customer, FullName) -> - case Customer of - "" -> FullName; - Name -> re:replace(FullName, [Name,"/"], "", [{return, binary}]) - end. - -%% Loop through Tasks on the flattened Infos and get the aggregated result -process_infos(Infos, Initial) -> - Tasks = [ - {doc_count, fun sum/2, 0}, - {doc_del_count, fun sum/2, 0}, - {update_seq, fun max/2, 1}, - {purge_seq, fun sum/2, 0}, - {compact_running, fun bool/2, 0}, - {disk_size, fun sum/2, 0}, - {instance_start_time, fun(_, _) -> <<"0">> end, 0}, - {disk_format_version, fun max/2, 0}], - - Infos1 = lists:flatten(Infos), - - Result = lists:map(fun({Type, Fun, Default}) -> - {Type, process_info(Type, Fun, Default, Infos1)} - end, Tasks), - lists:flatten([Initial, Result]). - - process_info(Type, Fun, Default, List) -> - lists:foldl(fun(V, AccIn) -> Fun(V, AccIn) end, Default, - couch_util:get_all_values(Type, List)). - -sum(New, Existing) -> - New + Existing. - -bool(New, Existing) -> - New andalso Existing. - -max(New, Existing) -> - case New > Existing of - true -> New; - false -> Existing - end. diff --git a/src/fabric_open.erl b/src/fabric_open.erl deleted file mode 100644 index cab10c5d..00000000 --- a/src/fabric_open.erl +++ /dev/null @@ -1,47 +0,0 @@ --module(fabric_open). - --export([open_doc/4]). - --include("../../couch/src/couch_db.hrl"). - - -% open_db(<<"S", ShardFileName/binary>> = Name, Options) -> -% case couch_db:open(ShardFileName, Options) of -% {ok, Db} -> -% {ok, Db#db{name = Name}}; -% {not_found, no_db_file} -> -% {not_found, no_db_file} -% end; -% -% open_db(DbName, Options) -> -% Part = case lists:keyfind(node(), 1, membership2:all_nodes_parts(false)) of -% {_, P} -> P; -% _ -> throw({node_has_no_parts, node()}) -% end, -% ShardName = partitions:shard_name(Part, DbName), -% open_db(<<"S", ShardName/binary>>, Options). - - -open_doc(DbName, DocId, _Revs, _Options) -> - NPs = partitions:key_nodes_parts(DbName, DocId), - ?debugFmt("~nNPs: ~p~n", [NPs]), - {ok, #doc{}}. - - - - - -% open_doc(Db, DocId, Revs, Options) -> -% {R,N} = get_quorum_constants(r, Options), -% case cluster_ops:key_lookup(DocId, {dynomite_couch_api, get, -% [Db, DocId, Revs, Options]}, r, R, N) of -% {ok, [Doc|_Rest]} -> -% {ok, Doc}; -% {ok, {not_found, Reason}, _Reasons} -> -% {not_found, Reason}; -% [{error, Error}, {good, Good}, {bad, Bad}] -> -% showroom_quorum_utils:handle_error(Error, Good, Bad); -% Other -> -% ?LOG_DEBUG("~nopen_doc Other: ~p~n", [Other]), -% throw(Other) -% end. diff --git a/src/fabric_rpc.erl b/src/fabric_rpc.erl index 54f3b338..a0c0a568 100644 --- a/src/fabric_rpc.erl +++ b/src/fabric_rpc.erl @@ -13,7 +13,6 @@ open_doc(DbName, DocId, Revs, Options) -> with_db(DbName, {couch_api, open_doc, [DocId, Revs, Options]}). get_db_info(DbName) -> - ?debugHere, with_db(DbName, {couch_db, get_db_info, []}). %% |