diff options
author | Brad Anderson <brad@cloudant.com> | 2010-05-28 15:25:15 -0400 |
---|---|---|
committer | Adam Kocoloski <adam@cloudant.com> | 2010-05-28 15:31:47 -0400 |
commit | f0161d3167265b6b4e1aaf5755799417c451b415 (patch) | |
tree | 042159c0fd9630b2c697c9648384b92f146c1235 /src | |
parent | 46273631d3496586bfb1fa1713c2e9b8484bec61 (diff) |
split fabric_db into dedicated modules
Diffstat (limited to 'src')
-rw-r--r-- | src/fabric.erl | 8 | ||||
-rw-r--r-- | src/fabric_all_databases.erl | 38 | ||||
-rw-r--r-- | src/fabric_create_db.erl | 66 | ||||
-rw-r--r-- | src/fabric_db.erl | 217 | ||||
-rw-r--r-- | src/fabric_delete_db.erl | 58 | ||||
-rw-r--r-- | src/fabric_get_db_info.erl | 93 |
6 files changed, 259 insertions, 221 deletions
diff --git a/src/fabric.erl b/src/fabric.erl index 399b76d6..9fdea34c 100644 --- a/src/fabric.erl +++ b/src/fabric.erl @@ -16,16 +16,16 @@ db_path(RawUri, Customer) -> Path. all_databases(Customer) -> - fabric_db:all_databases(Customer). + fabric_all_databases:all_databases(Customer). get_db_info(DbName, Customer) -> - fabric_db:get_db_info(dbname(DbName), Customer). + fabric_get_db_info:get_db_info(dbname(DbName), Customer). create_db(DbName, Options) -> - fabric_db:create_db(dbname(DbName), Options). + fabric_create_db:create_db(dbname(DbName), Options). delete_db(DbName, Options) -> - fabric_db:delete_db(dbname(DbName), Options). + fabric_delete_db:delete_db(dbname(DbName), Options). diff --git a/src/fabric_all_databases.erl b/src/fabric_all_databases.erl new file mode 100644 index 00000000..1b1d4d00 --- /dev/null +++ b/src/fabric_all_databases.erl @@ -0,0 +1,38 @@ +-module(fabric_all_databases). +-author(brad@cloudant.com). + +-export([all_databases/1]). + +-include("../../couch/src/couch_db.hrl"). +-include("../../dynomite/include/membership.hrl"). + + +%% @doc gets all databases in the cluster. +-spec all_databases(binary() | []) -> [binary()]. +all_databases([]) -> + Dbs = ets:foldl(fun(#shard{dbname=DbName}, AccIn) -> + new_acc(DbName, AccIn) + end, [], partitions), + {ok, Dbs}; +all_databases(Customer) -> + ?debugFmt("~nCustomer: ~p~n", [Customer]), + Dbs = ets:foldl(fun(#shard{dbname=DbName}, AccIn) -> + DbNameStr = ?b2l(DbName), + case string:str(DbNameStr, Customer) of + 1 -> + new_acc(DbNameStr, AccIn); + _ -> AccIn + end + end, [], dbs_cache), + {ok, Dbs}. + + +%% ===================== +%% internal +%% ===================== + +new_acc(DbName, Acc) -> + case lists:member(DbName, Acc) of + true -> Acc; + _ ->[DbName | Acc] + end. diff --git a/src/fabric_create_db.erl b/src/fabric_create_db.erl new file mode 100644 index 00000000..b0ff39ce --- /dev/null +++ b/src/fabric_create_db.erl @@ -0,0 +1,66 @@ +-module(fabric_create_db). +-author(brad@cloudant.com). + +-export([create_db/2]). + +-include("../../couch/src/couch_db.hrl"). +-include("../../dynomite/include/membership.hrl"). + + +%% @doc Create a new database, and all its partition files across the cluster +%% Options is proplist with user_ctx, n, q +-spec create_db(binary(), list()) -> {ok, #db{}} | {error, any()}. +create_db(DbName, Options) -> + Fullmap = partitions:fullmap(DbName, Options), + {ok, FullNodes} = mem3:fullnodes(), + RefPartMap = send_create_calls(Fullmap, Options), + Acc0 = {false, length(RefPartMap), lists:usort([ {Beg, false} || + {_,#shard{range=[Beg,_]}} <- RefPartMap])}, + Result = case fabric_util:receive_loop( + RefPartMap, 1, fun handle_create_msg/3, Acc0) of + {ok, _Results} -> ok; + Error -> Error + end, + % always install partition map, even w/ errors, so delete is possible + partitions:install_fullmap(DbName, Fullmap, FullNodes, Options), + Result. + +%% +%% internal +%% + +%% @doc create the partitions on all appropriate nodes (rexi calls) +-spec send_create_calls(fullmap(), list()) -> [{reference(), part()}]. +send_create_calls(Fullmap, Options) -> + lists:map(fun(#shard{node=Node, name=ShardName} = Part) -> + Ref = rexi:async_server_call({couch_server, Node}, + {create, ShardName, Options}), + {Ref, Part} + end, Fullmap). + +%% @doc handle create messages from shards +handle_create_msg(_, file_exists, _) -> + {error, file_exists}; +handle_create_msg(_, {rexi_EXIT, _Reason}, {Complete, N, Parts}) -> + {ok, {Complete, N-1, Parts}}; +handle_create_msg(_, {rexi_DOWN, _, _, _}, {Complete, _N, _Parts}) -> + if + Complete -> {stop, ok}; + true -> {error, create_db_fubar} + end; +handle_create_msg(_, _, {true, 1, _Acc}) -> + {stop, ok}; +handle_create_msg({_, #shard{range=[Beg,_]}}, {ok, _}, {false, 1, PartResults0}) -> + PartResults = lists:keyreplace(Beg, 1, PartResults0, {Beg, true}), + case is_complete(PartResults) of + true -> {stop, ok}; + false -> {error, create_db_fubar} + end; +handle_create_msg(_RefPart, {ok, _}, {true, N, Parts}) -> + {ok, {true, N-1, Parts}}; +handle_create_msg({_Ref, #shard{range=[Beg,_]}}, {ok, _}, {false, Rem, PartResults0}) -> + PartResults = lists:keyreplace(Beg, 1, PartResults0, {Beg, true}), + {ok, {is_complete(PartResults), Rem-1, PartResults}}. + +is_complete(List) -> + lists:all(fun({_,Bool}) -> Bool end, List). diff --git a/src/fabric_db.erl b/src/fabric_db.erl deleted file mode 100644 index 088faa7b..00000000 --- a/src/fabric_db.erl +++ /dev/null @@ -1,217 +0,0 @@ --module(fabric_db). --author('Brad Anderson <brad@cloudant.com>'). --author('Adam Kocoloski <adam@cloudant.com>'). - --export([all_databases/1, get_db_info/2, create_db/2, delete_db/2]). - --include("../../couch/src/couch_db.hrl"). --include("../../dynomite/include/membership.hrl"). - -%% @doc gets all databases in the cluster. --spec all_databases(binary() | []) -> [binary()]. -all_databases([]) -> - Dbs = ets:foldl(fun(#shard{dbname=DbName}, AccIn) -> - new_acc(DbName, AccIn) - end, [], partitions), - {ok, Dbs}; -all_databases(Customer) -> - ?debugFmt("~nCustomer: ~p~n", [Customer]), - Dbs = ets:foldl(fun(#shard{dbname=DbName}, AccIn) -> - DbNameStr = ?b2l(DbName), - case string:str(DbNameStr, Customer) of - 1 -> - new_acc(DbNameStr, AccIn); - _ -> AccIn - end - end, [], dbs_cache), - {ok, Dbs}. - -%% @doc get database information tuple -get_db_info(DbName, Customer) -> - Name = cloudant_db_name(Customer, DbName), - Shards = partitions:all_parts(DbName), - Workers = fabric_util:submit_jobs(Shards, get_db_info, []), - Acc0 = {false, length(Workers), lists:usort([ {Beg, nil} || - #shard{range=[Beg,_]} <- Workers])}, - case fabric_util:recv(Workers, #shard.ref, fun handle_info_msg/3, Acc0) of - {ok, ShardInfos} -> - {ok, process_infos(ShardInfos, [{db_name, Name}])}; - Error -> Error - end. - -%% @doc Create a new database, and all its partition files across the cluster -%% Options is proplist with user_ctx, n, q --spec create_db(binary(), list()) -> {ok, #db{}} | {error, any()}. -create_db(DbName, Options) -> - Fullmap = partitions:fullmap(DbName, Options), - {ok, FullNodes} = mem3:fullnodes(), - RefPartMap = send_create_calls(Fullmap, Options), - Acc0 = {false, length(RefPartMap), lists:usort([ {Beg, false} || - {_,#shard{range=[Beg,_]}} <- RefPartMap])}, - Result = case fabric_util:receive_loop( - RefPartMap, 1, fun handle_create_msg/3, Acc0) of - {ok, _Results} -> ok; - Error -> Error - end, - % always install partition map, even w/ errors, so delete is possible - partitions:install_fullmap(DbName, Fullmap, FullNodes, Options), - Result. - -%% @doc Delete a database, and all its partition files across the cluster -%% Options is proplist with user_ctx, n, q --spec delete_db(binary(), list()) -> {ok, #db{}} | {error, any()}. -delete_db(DbName, Options) -> - Fullmap = partitions:all_parts(DbName), - RefPartMap = send_delete_calls(Fullmap, Options), - Acc0 = {true, length(RefPartMap)}, - case fabric_util:receive_loop( - RefPartMap, 1, fun handle_delete_msg/3, Acc0) of - {ok, _Results} -> - delete_fullmap(DbName), - ok; - Error -> Error - end. - -%% ===================== -%% internal -%% ===================== - -new_acc(DbName, Acc) -> - case lists:member(DbName, Acc) of - true -> Acc; - _ ->[DbName | Acc] - end. - -handle_info_msg(_, _, {true, _, Infos0}) -> - {stop, Infos0}; -handle_info_msg(_, _, {false, 1, Infos0}) -> - MissingShards = lists:reverse(lists:foldl(fun - ({S,nil}, Acc) -> [S|Acc]; - (_, Acc) -> Acc - end, [], Infos0)), - ?LOG_ERROR("get_db_info error, missing shards: ~p", [MissingShards]), - {error, get_db_info}; -handle_info_msg(#shard{range=[Beg,_]}, {ok, Info}, {false, N, Infos0}) -> - case couch_util:get_value(Beg, Infos0) of - nil -> - Infos = lists:keyreplace(Beg, 1, Infos0, {Beg, Info}), - case is_complete(info, Infos) of - true -> {ok, {true, N-1, Infos}}; - false -> {ok, {false, N-1, Infos}} - end; - _ -> - {ok, {false, N-1, Infos0}} - end; -handle_info_msg(_, _Other, {Complete, N, Infos0}) -> - {ok, {Complete, N-1, Infos0}}. - -is_complete(info, List) -> - not lists:any(fun({_,Info}) -> Info =:= nil end, List); -is_complete(create, List) -> - lists:all(fun({_,Bool}) -> Bool end, List). - -cloudant_db_name(Customer, FullName) -> - case Customer of - "" -> FullName; - Name -> re:replace(FullName, [Name,"/"], "", [{return, binary}]) - end. - -%% Loop through Tasks on the flattened Infos and get the aggregated result -process_infos(Infos, Initial) -> - Tasks = [ - {doc_count, fun sum/2, 0}, - {doc_del_count, fun sum/2, 0}, - {update_seq, fun max/2, 1}, - {purge_seq, fun sum/2, 0}, - {compact_running, fun bool/2, 0}, - {disk_size, fun sum/2, 0}, - {instance_start_time, fun(_, _) -> <<"0">> end, 0}, - {disk_format_version, fun max/2, 0}], - - Infos1 = lists:flatten(Infos), - - Result = lists:map(fun({Type, Fun, Default}) -> - {Type, process_info(Type, Fun, Default, Infos1)} - end, Tasks), - lists:flatten([Initial, Result]). - - process_info(Type, Fun, Default, List) -> - lists:foldl(fun(V, AccIn) -> Fun(V, AccIn) end, Default, - proplists:get_all_values(Type, List)). - -sum(New, Existing) -> - New + Existing. - -bool(New, Existing) -> - New andalso Existing. - -max(New, Existing) -> - case New > Existing of - true -> New; - false -> Existing - end. - - -%% @doc create the partitions on all appropriate nodes (rexi calls) --spec send_create_calls(fullmap(), list()) -> [{reference(), part()}]. -send_create_calls(Fullmap, Options) -> - lists:map(fun(#shard{node=Node, name=ShardName} = Part) -> - Ref = rexi:async_server_call({couch_server, Node}, - {create, ShardName, Options}), - {Ref, Part} - end, Fullmap). - -handle_create_msg(_, file_exists, _) -> - {error, file_exists}; -handle_create_msg(_, {rexi_EXIT, _Reason}, {Complete, N, Parts}) -> - {ok, {Complete, N-1, Parts}}; -handle_create_msg(_, {rexi_DOWN, _, _, _}, {Complete, _N, _Parts}) -> - if - Complete -> {stop, ok}; - true -> {error, create_db_fubar} - end; -handle_create_msg(_, _, {true, 1, _Acc}) -> - {stop, ok}; -handle_create_msg({_, #shard{range=[Beg,_]}}, {ok, _}, {false, 1, PartResults0}) -> - PartResults = lists:keyreplace(Beg, 1, PartResults0, {Beg, true}), - case is_complete(create, PartResults) of - true -> {stop, ok}; - false -> {error, create_db_fubar} - end; -handle_create_msg(_RefPart, {ok, _}, {true, N, Parts}) -> - {ok, {true, N-1, Parts}}; -handle_create_msg({_Ref, #shard{range=[Beg,_]}}, {ok, _}, {false, Rem, PartResults0}) -> - PartResults = lists:keyreplace(Beg, 1, PartResults0, {Beg, true}), - {ok, {is_complete(create, PartResults), Rem-1, PartResults}}. - - -%% @doc delete the partitions on all appropriate nodes (rexi calls) --spec send_delete_calls(fullmap(), list()) -> [{reference(), part()}]. -send_delete_calls(Parts, Options) -> - lists:map(fun(#shard{node=Node, name=ShardName} = Part) -> - Ref = rexi:async_server_call({couch_server, Node}, - {delete, ShardName, Options}), - {Ref, Part} - end, Parts). - -handle_delete_msg(_, not_found, {NotFound, N}) -> - {ok, {NotFound, N-1}}; -handle_delete_msg(_, {rexi_EXIT, _Reason}, {NotFound, N}) -> - {ok, {NotFound, N-1}}; -handle_delete_msg(_, {rexi_DOWN, _, _, _}, _Acc) -> - {error, delete_db_fubar}; -handle_delete_msg(_, _, {NotFound, 1}) -> - if - NotFound -> {stop, not_found}; - true -> {stop, ok} - end; -handle_delete_msg(_, ok, {_NotFound, N}) -> - {ok, {false, N-1}}. - -delete_fullmap(DbName) -> - case couch_db:open(<<"dbs">>, []) of - {ok, Db} -> - {ok, Doc} = couch_api:open_doc(Db, DbName, nil, []), - couch_api:update_doc(Db, Doc#doc{deleted=true}); - Error -> Error - end. diff --git a/src/fabric_delete_db.erl b/src/fabric_delete_db.erl new file mode 100644 index 00000000..42faf763 --- /dev/null +++ b/src/fabric_delete_db.erl @@ -0,0 +1,58 @@ +-module(fabric_delete_db). +-author(brad@cloudant.com). + +-export([delete_db/2]). + +-include("../../couch/src/couch_db.hrl"). +-include("../../dynomite/include/membership.hrl"). + + +%% @doc Delete a database, and all its partition files across the cluster +%% Options is proplist with user_ctx, n, q +-spec delete_db(binary(), list()) -> {ok, #db{}} | {error, any()}. +delete_db(DbName, Options) -> + Fullmap = partitions:all_parts(DbName), + RefPartMap = send_delete_calls(Fullmap, Options), + Acc0 = {true, length(RefPartMap)}, + case fabric_util:receive_loop( + RefPartMap, 1, fun handle_delete_msg/3, Acc0) of + {ok, _Results} -> + delete_fullmap(DbName), + ok; + Error -> Error + end. + +%% +%% internal +%% + +%% @doc delete the partitions on all appropriate nodes (rexi calls) +-spec send_delete_calls(fullmap(), list()) -> [{reference(), part()}]. +send_delete_calls(Parts, Options) -> + lists:map(fun(#shard{node=Node, name=ShardName} = Part) -> + Ref = rexi:async_server_call({couch_server, Node}, + {delete, ShardName, Options}), + {Ref, Part} + end, Parts). + +handle_delete_msg(_, not_found, {NotFound, N}) -> + {ok, {NotFound, N-1}}; +handle_delete_msg(_, {rexi_EXIT, _Reason}, {NotFound, N}) -> + {ok, {NotFound, N-1}}; +handle_delete_msg(_, {rexi_DOWN, _, _, _}, _Acc) -> + {error, delete_db_fubar}; +handle_delete_msg(_, _, {NotFound, 1}) -> + if + NotFound -> {stop, not_found}; + true -> {stop, ok} + end; +handle_delete_msg(_, ok, {_NotFound, N}) -> + {ok, {false, N-1}}. + +delete_fullmap(DbName) -> + case couch_db:open(<<"dbs">>, []) of + {ok, Db} -> + {ok, Doc} = couch_api:open_doc(Db, DbName, nil, []), + couch_api:update_doc(Db, Doc#doc{deleted=true}); + Error -> Error + end. diff --git a/src/fabric_get_db_info.erl b/src/fabric_get_db_info.erl new file mode 100644 index 00000000..cccfde6a --- /dev/null +++ b/src/fabric_get_db_info.erl @@ -0,0 +1,93 @@ +-module(fabric_get_db_info). +-author(brad@cloudant.com). + +-export([get_db_info/2]). + +-include("../../couch/src/couch_db.hrl"). +-include("../../dynomite/include/membership.hrl"). + + +%% @doc get database information tuple +get_db_info(DbName, Customer) -> + Name = cloudant_db_name(Customer, DbName), + Shards = partitions:all_parts(DbName), + Workers = fabric_util:submit_jobs(Shards, get_db_info, []), + Acc0 = {false, length(Workers), lists:usort([ {Beg, nil} || + #shard{range=[Beg,_]} <- Workers])}, + case fabric_util:recv(Workers, #shard.ref, fun handle_info_msg/3, Acc0) of + {ok, ShardInfos} -> + {ok, process_infos(ShardInfos, [{db_name, Name}])}; + Error -> Error + end. + + +%% ===================== +%% internal +%% ===================== + +handle_info_msg(_, _, {true, _, Infos0}) -> + {stop, Infos0}; +handle_info_msg(_, _, {false, 1, Infos0}) -> + MissingShards = lists:reverse(lists:foldl(fun + ({S,nil}, Acc) -> [S|Acc]; + (_, Acc) -> Acc + end, [], Infos0)), + ?LOG_ERROR("get_db_info error, missing shards: ~p", [MissingShards]), + {error, get_db_info}; +handle_info_msg(#shard{range=[Beg,_]}, {ok, Info}, {false, N, Infos0}) -> + case couch_util:get_value(Beg, Infos0) of + nil -> + Infos = lists:keyreplace(Beg, 1, Infos0, {Beg, Info}), + case is_complete(Infos) of + true -> {ok, {true, N-1, Infos}}; + false -> {ok, {false, N-1, Infos}} + end; + _ -> + {ok, {false, N-1, Infos0}} + end; +handle_info_msg(_, _Other, {Complete, N, Infos0}) -> + {ok, {Complete, N-1, Infos0}}. + +is_complete(List) -> + not lists:any(fun({_,Info}) -> Info =:= nil end, List). + +cloudant_db_name(Customer, FullName) -> + case Customer of + "" -> FullName; + Name -> re:replace(FullName, [Name,"/"], "", [{return, binary}]) + end. + +%% Loop through Tasks on the flattened Infos and get the aggregated result +process_infos(Infos, Initial) -> + Tasks = [ + {doc_count, fun sum/2, 0}, + {doc_del_count, fun sum/2, 0}, + {update_seq, fun max/2, 1}, + {purge_seq, fun sum/2, 0}, + {compact_running, fun bool/2, 0}, + {disk_size, fun sum/2, 0}, + {instance_start_time, fun(_, _) -> <<"0">> end, 0}, + {disk_format_version, fun max/2, 0}], + + Infos1 = lists:flatten(Infos), + + Result = lists:map(fun({Type, Fun, Default}) -> + {Type, process_info(Type, Fun, Default, Infos1)} + end, Tasks), + lists:flatten([Initial, Result]). + + process_info(Type, Fun, Default, List) -> + lists:foldl(fun(V, AccIn) -> Fun(V, AccIn) end, Default, + proplists:get_all_values(Type, List)). + +sum(New, Existing) -> + New + Existing. + +bool(New, Existing) -> + New andalso Existing. + +max(New, Existing) -> + case New > Existing of + true -> New; + false -> Existing + end. |