summary | refs | log | tree | commit | diff
path: root/src
diff options
context:
space:
mode:
author	Brad Anderson <brad@cloudant.com>	2010-05-28 15:25:15 -0400
committer	Adam Kocoloski <adam@cloudant.com>	2010-05-28 15:31:47 -0400
commit	f0161d3167265b6b4e1aaf5755799417c451b415 (patch)
tree	042159c0fd9630b2c697c9648384b92f146c1235 /src
parent	46273631d3496586bfb1fa1713c2e9b8484bec61 (diff)
split fabric_db into dedicated modules
Diffstat (limited to 'src')
-rw-r--r--	src/fabric.erl	8
-rw-r--r--	src/fabric_all_databases.erl	38
-rw-r--r--	src/fabric_create_db.erl	66
-rw-r--r--	src/fabric_db.erl	217
-rw-r--r--	src/fabric_delete_db.erl	58
-rw-r--r--	src/fabric_get_db_info.erl	93
6 files changed, 259 insertions(+), 221 deletions(-)
diff --git a/src/fabric.erl b/src/fabric.erl
index 399b76d6..9fdea34c 100644
--- a/src/fabric.erl
+++ b/src/fabric.erl
@@ -16,16 +16,16 @@ db_path(RawUri, Customer) ->
Path.
all_databases(Customer) ->
- fabric_db:all_databases(Customer).
+ fabric_all_databases:all_databases(Customer).
get_db_info(DbName, Customer) ->
- fabric_db:get_db_info(dbname(DbName), Customer).
+ fabric_get_db_info:get_db_info(dbname(DbName), Customer).
create_db(DbName, Options) ->
- fabric_db:create_db(dbname(DbName), Options).
+ fabric_create_db:create_db(dbname(DbName), Options).
delete_db(DbName, Options) ->
- fabric_db:delete_db(dbname(DbName), Options).
+ fabric_delete_db:delete_db(dbname(DbName), Options).
diff --git a/src/fabric_all_databases.erl b/src/fabric_all_databases.erl
new file mode 100644
index 00000000..1b1d4d00
--- /dev/null
+++ b/src/fabric_all_databases.erl
@@ -0,0 +1,38 @@
+-module(fabric_all_databases).
+-author(brad@cloudant.com).
+
+-export([all_databases/1]).
+
+-include("../../couch/src/couch_db.hrl").
+-include("../../dynomite/include/membership.hrl").
+
+
+%% @doc gets all databases in the cluster.
+-spec all_databases(binary() | []) -> [binary()].
+all_databases([]) ->
+ Dbs = ets:foldl(fun(#shard{dbname=DbName}, AccIn) ->
+ new_acc(DbName, AccIn)
+ end, [], partitions),
+ {ok, Dbs};
+all_databases(Customer) ->
+ ?debugFmt("~nCustomer: ~p~n", [Customer]),
+ Dbs = ets:foldl(fun(#shard{dbname=DbName}, AccIn) ->
+ DbNameStr = ?b2l(DbName),
+ case string:str(DbNameStr, Customer) of
+ 1 ->
+ new_acc(DbNameStr, AccIn);
+ _ -> AccIn
+ end
+ end, [], dbs_cache),
+ {ok, Dbs}.
+
+
+%% =====================
+%% internal
+%% =====================
+
+new_acc(DbName, Acc) ->
+ case lists:member(DbName, Acc) of
+ true -> Acc;
+ _ ->[DbName | Acc]
+ end.
diff --git a/src/fabric_create_db.erl b/src/fabric_create_db.erl
new file mode 100644
index 00000000..b0ff39ce
--- /dev/null
+++ b/src/fabric_create_db.erl
@@ -0,0 +1,66 @@
+-module(fabric_create_db).
+-author(brad@cloudant.com).
+
+-export([create_db/2]).
+
+-include("../../couch/src/couch_db.hrl").
+-include("../../dynomite/include/membership.hrl").
+
+
+%% @doc Create a new database, and all its partition files across the cluster
+%% Options is proplist with user_ctx, n, q
+-spec create_db(binary(), list()) -> {ok, #db{}} | {error, any()}.
+create_db(DbName, Options) ->
+ Fullmap = partitions:fullmap(DbName, Options),
+ {ok, FullNodes} = mem3:fullnodes(),
+ RefPartMap = send_create_calls(Fullmap, Options),
+ Acc0 = {false, length(RefPartMap), lists:usort([ {Beg, false} ||
+ {_,#shard{range=[Beg,_]}} <- RefPartMap])},
+ Result = case fabric_util:receive_loop(
+ RefPartMap, 1, fun handle_create_msg/3, Acc0) of
+ {ok, _Results} -> ok;
+ Error -> Error
+ end,
+ % always install partition map, even w/ errors, so delete is possible
+ partitions:install_fullmap(DbName, Fullmap, FullNodes, Options),
+ Result.
+
+%%
+%% internal
+%%
+
+%% @doc create the partitions on all appropriate nodes (rexi calls)
+-spec send_create_calls(fullmap(), list()) -> [{reference(), part()}].
+send_create_calls(Fullmap, Options) ->
+ lists:map(fun(#shard{node=Node, name=ShardName} = Part) ->
+ Ref = rexi:async_server_call({couch_server, Node},
+ {create, ShardName, Options}),
+ {Ref, Part}
+ end, Fullmap).
+
+%% @doc handle create messages from shards
+handle_create_msg(_, file_exists, _) ->
+ {error, file_exists};
+handle_create_msg(_, {rexi_EXIT, _Reason}, {Complete, N, Parts}) ->
+ {ok, {Complete, N-1, Parts}};
+handle_create_msg(_, {rexi_DOWN, _, _, _}, {Complete, _N, _Parts}) ->
+ if
+ Complete -> {stop, ok};
+ true -> {error, create_db_fubar}
+ end;
+handle_create_msg(_, _, {true, 1, _Acc}) ->
+ {stop, ok};
+handle_create_msg({_, #shard{range=[Beg,_]}}, {ok, _}, {false, 1, PartResults0}) ->
+ PartResults = lists:keyreplace(Beg, 1, PartResults0, {Beg, true}),
+ case is_complete(PartResults) of
+ true -> {stop, ok};
+ false -> {error, create_db_fubar}
+ end;
+handle_create_msg(_RefPart, {ok, _}, {true, N, Parts}) ->
+ {ok, {true, N-1, Parts}};
+handle_create_msg({_Ref, #shard{range=[Beg,_]}}, {ok, _}, {false, Rem, PartResults0}) ->
+ PartResults = lists:keyreplace(Beg, 1, PartResults0, {Beg, true}),
+ {ok, {is_complete(PartResults), Rem-1, PartResults}}.
+
+is_complete(List) ->
+ lists:all(fun({_,Bool}) -> Bool end, List).
diff --git a/src/fabric_db.erl b/src/fabric_db.erl
deleted file mode 100644
index 088faa7b..00000000
--- a/src/fabric_db.erl
+++ /dev/null
@@ -1,217 +0,0 @@
--module(fabric_db).
--author('Brad Anderson <brad@cloudant.com>').
--author('Adam Kocoloski <adam@cloudant.com>').
-
--export([all_databases/1, get_db_info/2, create_db/2, delete_db/2]).
-
--include("../../couch/src/couch_db.hrl").
--include("../../dynomite/include/membership.hrl").
-
-%% @doc gets all databases in the cluster.
--spec all_databases(binary() | []) -> [binary()].
-all_databases([]) ->
- Dbs = ets:foldl(fun(#shard{dbname=DbName}, AccIn) ->
- new_acc(DbName, AccIn)
- end, [], partitions),
- {ok, Dbs};
-all_databases(Customer) ->
- ?debugFmt("~nCustomer: ~p~n", [Customer]),
- Dbs = ets:foldl(fun(#shard{dbname=DbName}, AccIn) ->
- DbNameStr = ?b2l(DbName),
- case string:str(DbNameStr, Customer) of
- 1 ->
- new_acc(DbNameStr, AccIn);
- _ -> AccIn
- end
- end, [], dbs_cache),
- {ok, Dbs}.
-
-%% @doc get database information tuple
-get_db_info(DbName, Customer) ->
- Name = cloudant_db_name(Customer, DbName),
- Shards = partitions:all_parts(DbName),
- Workers = fabric_util:submit_jobs(Shards, get_db_info, []),
- Acc0 = {false, length(Workers), lists:usort([ {Beg, nil} ||
- #shard{range=[Beg,_]} <- Workers])},
- case fabric_util:recv(Workers, #shard.ref, fun handle_info_msg/3, Acc0) of
- {ok, ShardInfos} ->
- {ok, process_infos(ShardInfos, [{db_name, Name}])};
- Error -> Error
- end.
-
-%% @doc Create a new database, and all its partition files across the cluster
-%% Options is proplist with user_ctx, n, q
--spec create_db(binary(), list()) -> {ok, #db{}} | {error, any()}.
-create_db(DbName, Options) ->
- Fullmap = partitions:fullmap(DbName, Options),
- {ok, FullNodes} = mem3:fullnodes(),
- RefPartMap = send_create_calls(Fullmap, Options),
- Acc0 = {false, length(RefPartMap), lists:usort([ {Beg, false} ||
- {_,#shard{range=[Beg,_]}} <- RefPartMap])},
- Result = case fabric_util:receive_loop(
- RefPartMap, 1, fun handle_create_msg/3, Acc0) of
- {ok, _Results} -> ok;
- Error -> Error
- end,
- % always install partition map, even w/ errors, so delete is possible
- partitions:install_fullmap(DbName, Fullmap, FullNodes, Options),
- Result.
-
-%% @doc Delete a database, and all its partition files across the cluster
-%% Options is proplist with user_ctx, n, q
--spec delete_db(binary(), list()) -> {ok, #db{}} | {error, any()}.
-delete_db(DbName, Options) ->
- Fullmap = partitions:all_parts(DbName),
- RefPartMap = send_delete_calls(Fullmap, Options),
- Acc0 = {true, length(RefPartMap)},
- case fabric_util:receive_loop(
- RefPartMap, 1, fun handle_delete_msg/3, Acc0) of
- {ok, _Results} ->
- delete_fullmap(DbName),
- ok;
- Error -> Error
- end.
-
-%% =====================
-%% internal
-%% =====================
-
-new_acc(DbName, Acc) ->
- case lists:member(DbName, Acc) of
- true -> Acc;
- _ ->[DbName | Acc]
- end.
-
-handle_info_msg(_, _, {true, _, Infos0}) ->
- {stop, Infos0};
-handle_info_msg(_, _, {false, 1, Infos0}) ->
- MissingShards = lists:reverse(lists:foldl(fun
- ({S,nil}, Acc) -> [S|Acc];
- (_, Acc) -> Acc
- end, [], Infos0)),
- ?LOG_ERROR("get_db_info error, missing shards: ~p", [MissingShards]),
- {error, get_db_info};
-handle_info_msg(#shard{range=[Beg,_]}, {ok, Info}, {false, N, Infos0}) ->
- case couch_util:get_value(Beg, Infos0) of
- nil ->
- Infos = lists:keyreplace(Beg, 1, Infos0, {Beg, Info}),
- case is_complete(info, Infos) of
- true -> {ok, {true, N-1, Infos}};
- false -> {ok, {false, N-1, Infos}}
- end;
- _ ->
- {ok, {false, N-1, Infos0}}
- end;
-handle_info_msg(_, _Other, {Complete, N, Infos0}) ->
- {ok, {Complete, N-1, Infos0}}.
-
-is_complete(info, List) ->
- not lists:any(fun({_,Info}) -> Info =:= nil end, List);
-is_complete(create, List) ->
- lists:all(fun({_,Bool}) -> Bool end, List).
-
-cloudant_db_name(Customer, FullName) ->
- case Customer of
- "" -> FullName;
- Name -> re:replace(FullName, [Name,"/"], "", [{return, binary}])
- end.
-
-%% Loop through Tasks on the flattened Infos and get the aggregated result
-process_infos(Infos, Initial) ->
- Tasks = [
- {doc_count, fun sum/2, 0},
- {doc_del_count, fun sum/2, 0},
- {update_seq, fun max/2, 1},
- {purge_seq, fun sum/2, 0},
- {compact_running, fun bool/2, 0},
- {disk_size, fun sum/2, 0},
- {instance_start_time, fun(_, _) -> <<"0">> end, 0},
- {disk_format_version, fun max/2, 0}],
-
- Infos1 = lists:flatten(Infos),
-
- Result = lists:map(fun({Type, Fun, Default}) ->
- {Type, process_info(Type, Fun, Default, Infos1)}
- end, Tasks),
- lists:flatten([Initial, Result]).
-
- process_info(Type, Fun, Default, List) ->
- lists:foldl(fun(V, AccIn) -> Fun(V, AccIn) end, Default,
- proplists:get_all_values(Type, List)).
-
-sum(New, Existing) ->
- New + Existing.
-
-bool(New, Existing) ->
- New andalso Existing.
-
-max(New, Existing) ->
- case New > Existing of
- true -> New;
- false -> Existing
- end.
-
-
-%% @doc create the partitions on all appropriate nodes (rexi calls)
--spec send_create_calls(fullmap(), list()) -> [{reference(), part()}].
-send_create_calls(Fullmap, Options) ->
- lists:map(fun(#shard{node=Node, name=ShardName} = Part) ->
- Ref = rexi:async_server_call({couch_server, Node},
- {create, ShardName, Options}),
- {Ref, Part}
- end, Fullmap).
-
-handle_create_msg(_, file_exists, _) ->
- {error, file_exists};
-handle_create_msg(_, {rexi_EXIT, _Reason}, {Complete, N, Parts}) ->
- {ok, {Complete, N-1, Parts}};
-handle_create_msg(_, {rexi_DOWN, _, _, _}, {Complete, _N, _Parts}) ->
- if
- Complete -> {stop, ok};
- true -> {error, create_db_fubar}
- end;
-handle_create_msg(_, _, {true, 1, _Acc}) ->
- {stop, ok};
-handle_create_msg({_, #shard{range=[Beg,_]}}, {ok, _}, {false, 1, PartResults0}) ->
- PartResults = lists:keyreplace(Beg, 1, PartResults0, {Beg, true}),
- case is_complete(create, PartResults) of
- true -> {stop, ok};
- false -> {error, create_db_fubar}
- end;
-handle_create_msg(_RefPart, {ok, _}, {true, N, Parts}) ->
- {ok, {true, N-1, Parts}};
-handle_create_msg({_Ref, #shard{range=[Beg,_]}}, {ok, _}, {false, Rem, PartResults0}) ->
- PartResults = lists:keyreplace(Beg, 1, PartResults0, {Beg, true}),
- {ok, {is_complete(create, PartResults), Rem-1, PartResults}}.
-
-
-%% @doc delete the partitions on all appropriate nodes (rexi calls)
--spec send_delete_calls(fullmap(), list()) -> [{reference(), part()}].
-send_delete_calls(Parts, Options) ->
- lists:map(fun(#shard{node=Node, name=ShardName} = Part) ->
- Ref = rexi:async_server_call({couch_server, Node},
- {delete, ShardName, Options}),
- {Ref, Part}
- end, Parts).
-
-handle_delete_msg(_, not_found, {NotFound, N}) ->
- {ok, {NotFound, N-1}};
-handle_delete_msg(_, {rexi_EXIT, _Reason}, {NotFound, N}) ->
- {ok, {NotFound, N-1}};
-handle_delete_msg(_, {rexi_DOWN, _, _, _}, _Acc) ->
- {error, delete_db_fubar};
-handle_delete_msg(_, _, {NotFound, 1}) ->
- if
- NotFound -> {stop, not_found};
- true -> {stop, ok}
- end;
-handle_delete_msg(_, ok, {_NotFound, N}) ->
- {ok, {false, N-1}}.
-
-delete_fullmap(DbName) ->
- case couch_db:open(<<"dbs">>, []) of
- {ok, Db} ->
- {ok, Doc} = couch_api:open_doc(Db, DbName, nil, []),
- couch_api:update_doc(Db, Doc#doc{deleted=true});
- Error -> Error
- end.
diff --git a/src/fabric_delete_db.erl b/src/fabric_delete_db.erl
new file mode 100644
index 00000000..42faf763
--- /dev/null
+++ b/src/fabric_delete_db.erl
@@ -0,0 +1,58 @@
+-module(fabric_delete_db).
+-author(brad@cloudant.com).
+
+-export([delete_db/2]).
+
+-include("../../couch/src/couch_db.hrl").
+-include("../../dynomite/include/membership.hrl").
+
+
+%% @doc Delete a database, and all its partition files across the cluster
+%% Options is proplist with user_ctx, n, q
+-spec delete_db(binary(), list()) -> {ok, #db{}} | {error, any()}.
+delete_db(DbName, Options) ->
+ Fullmap = partitions:all_parts(DbName),
+ RefPartMap = send_delete_calls(Fullmap, Options),
+ Acc0 = {true, length(RefPartMap)},
+ case fabric_util:receive_loop(
+ RefPartMap, 1, fun handle_delete_msg/3, Acc0) of
+ {ok, _Results} ->
+ delete_fullmap(DbName),
+ ok;
+ Error -> Error
+ end.
+
+%%
+%% internal
+%%
+
+%% @doc delete the partitions on all appropriate nodes (rexi calls)
+-spec send_delete_calls(fullmap(), list()) -> [{reference(), part()}].
+send_delete_calls(Parts, Options) ->
+ lists:map(fun(#shard{node=Node, name=ShardName} = Part) ->
+ Ref = rexi:async_server_call({couch_server, Node},
+ {delete, ShardName, Options}),
+ {Ref, Part}
+ end, Parts).
+
+handle_delete_msg(_, not_found, {NotFound, N}) ->
+ {ok, {NotFound, N-1}};
+handle_delete_msg(_, {rexi_EXIT, _Reason}, {NotFound, N}) ->
+ {ok, {NotFound, N-1}};
+handle_delete_msg(_, {rexi_DOWN, _, _, _}, _Acc) ->
+ {error, delete_db_fubar};
+handle_delete_msg(_, _, {NotFound, 1}) ->
+ if
+ NotFound -> {stop, not_found};
+ true -> {stop, ok}
+ end;
+handle_delete_msg(_, ok, {_NotFound, N}) ->
+ {ok, {false, N-1}}.
+
+delete_fullmap(DbName) ->
+ case couch_db:open(<<"dbs">>, []) of
+ {ok, Db} ->
+ {ok, Doc} = couch_api:open_doc(Db, DbName, nil, []),
+ couch_api:update_doc(Db, Doc#doc{deleted=true});
+ Error -> Error
+ end.
diff --git a/src/fabric_get_db_info.erl b/src/fabric_get_db_info.erl
new file mode 100644
index 00000000..cccfde6a
--- /dev/null
+++ b/src/fabric_get_db_info.erl
@@ -0,0 +1,93 @@
+-module(fabric_get_db_info).
+-author(brad@cloudant.com).
+
+-export([get_db_info/2]).
+
+-include("../../couch/src/couch_db.hrl").
+-include("../../dynomite/include/membership.hrl").
+
+
+%% @doc get database information tuple
+get_db_info(DbName, Customer) ->
+ Name = cloudant_db_name(Customer, DbName),
+ Shards = partitions:all_parts(DbName),
+ Workers = fabric_util:submit_jobs(Shards, get_db_info, []),
+ Acc0 = {false, length(Workers), lists:usort([ {Beg, nil} ||
+ #shard{range=[Beg,_]} <- Workers])},
+ case fabric_util:recv(Workers, #shard.ref, fun handle_info_msg/3, Acc0) of
+ {ok, ShardInfos} ->
+ {ok, process_infos(ShardInfos, [{db_name, Name}])};
+ Error -> Error
+ end.
+
+
+%% =====================
+%% internal
+%% =====================
+
+handle_info_msg(_, _, {true, _, Infos0}) ->
+ {stop, Infos0};
+handle_info_msg(_, _, {false, 1, Infos0}) ->
+ MissingShards = lists:reverse(lists:foldl(fun
+ ({S,nil}, Acc) -> [S|Acc];
+ (_, Acc) -> Acc
+ end, [], Infos0)),
+ ?LOG_ERROR("get_db_info error, missing shards: ~p", [MissingShards]),
+ {error, get_db_info};
+handle_info_msg(#shard{range=[Beg,_]}, {ok, Info}, {false, N, Infos0}) ->
+ case couch_util:get_value(Beg, Infos0) of
+ nil ->
+ Infos = lists:keyreplace(Beg, 1, Infos0, {Beg, Info}),
+ case is_complete(Infos) of
+ true -> {ok, {true, N-1, Infos}};
+ false -> {ok, {false, N-1, Infos}}
+ end;
+ _ ->
+ {ok, {false, N-1, Infos0}}
+ end;
+handle_info_msg(_, _Other, {Complete, N, Infos0}) ->
+ {ok, {Complete, N-1, Infos0}}.
+
+is_complete(List) ->
+ not lists:any(fun({_,Info}) -> Info =:= nil end, List).
+
+cloudant_db_name(Customer, FullName) ->
+ case Customer of
+ "" -> FullName;
+ Name -> re:replace(FullName, [Name,"/"], "", [{return, binary}])
+ end.
+
+%% Loop through Tasks on the flattened Infos and get the aggregated result
+process_infos(Infos, Initial) ->
+ Tasks = [
+ {doc_count, fun sum/2, 0},
+ {doc_del_count, fun sum/2, 0},
+ {update_seq, fun max/2, 1},
+ {purge_seq, fun sum/2, 0},
+ {compact_running, fun bool/2, 0},
+ {disk_size, fun sum/2, 0},
+ {instance_start_time, fun(_, _) -> <<"0">> end, 0},
+ {disk_format_version, fun max/2, 0}],
+
+ Infos1 = lists:flatten(Infos),
+
+ Result = lists:map(fun({Type, Fun, Default}) ->
+ {Type, process_info(Type, Fun, Default, Infos1)}
+ end, Tasks),
+ lists:flatten([Initial, Result]).
+
+ process_info(Type, Fun, Default, List) ->
+ lists:foldl(fun(V, AccIn) -> Fun(V, AccIn) end, Default,
+ proplists:get_all_values(Type, List)).
+
+sum(New, Existing) ->
+ New + Existing.
+
+bool(New, Existing) ->
+ New andalso Existing.
+
+max(New, Existing) ->
+ case New > Existing of
+ true -> New;
+ false -> Existing
+ end.