summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/fabric.erl16
-rw-r--r--src/fabric_create.erl72
-rw-r--r--src/fabric_db.erl218
-rw-r--r--src/fabric_delete.erl65
-rw-r--r--src/fabric_info.erl122
-rw-r--r--src/fabric_open.erl47
-rw-r--r--src/fabric_rpc.erl1
7 files changed, 226 insertions, 315 deletions
diff --git a/src/fabric.erl b/src/fabric.erl
index 15269bbb..e75e31eb 100644
--- a/src/fabric.erl
+++ b/src/fabric.erl
@@ -3,23 +3,23 @@
-export([all_databases/1, create_db/2, delete_db/2, open_doc/3, open_doc/4,
get_db_info/2]).
-%% maybe this goes away, and these are called directly in their own modules in
-%% fabric_api ??
+% db operations
all_databases(Customer) ->
- fabric_info:all_databases(Customer).
+ fabric_db:all_databases(Customer).
+
+get_db_info(DbName, Customer) ->
+ fabric_db:get_db_info(DbName, Customer).
create_db(DbName, Options) ->
- fabric_create:create_db(DbName, Options).
+ fabric_db:create_db(DbName, Options).
delete_db(DbName, Options) ->
- fabric_delete:delete_db(DbName, Options).
+ fabric_db:delete_db(DbName, Options).
+% doc operations
open_doc(Db, DocId, Options) ->
fabric_doc:open_doc(Db, DocId, Options).
open_doc(Db, DocId, Revs, Options) ->
fabric_open:open_doc(Db, DocId, Revs, Options).
-
-get_db_info(DbName, Customer) ->
- fabric_info:get_db_info(DbName, Customer).
diff --git a/src/fabric_create.erl b/src/fabric_create.erl
deleted file mode 100644
index 79bf1040..00000000
--- a/src/fabric_create.erl
+++ /dev/null
@@ -1,72 +0,0 @@
--module(fabric_create).
--author('Brad Anderson <brad@cloudant.com>').
-
--include("../../couch/src/couch_db.hrl").
--include("../../dynomite/include/membership.hrl").
-
-%% api
--export([create_db/2]).
-
-
-%% =====================
-%% api
-%% =====================
-
-%% @doc Create a new database, and all its partition files across the cluster
-%% Options is proplist with user_ctx, n, q
--spec create_db(binary(), list()) -> {ok, #db{}} | {error, any()}.
-create_db(DbName, Options) ->
- Fullmap = partitions:fullmap(DbName, Options),
- {ok, FullNodes} = mem3:fullnodes(),
- RefPartMap = send_create_calls(DbName, Options, Fullmap),
- Acc0 = {false, length(RefPartMap), lists:usort([ {Beg, false} ||
- {_,#shard{range=[Beg,_]}} <- RefPartMap])},
- case fabric_util:receive_loop(
- RefPartMap, 1, fun handle_create_msg/3, Acc0, 5000, infinity) of
- {ok, _Results} ->
- partitions:install_fullmap(DbName, Fullmap, FullNodes, Options),
- ok;
- Error -> Error
- end.
-
-
-%% =====================
-%% internal
-%% =====================
-
-%% @doc create the partitions on all appropriate nodes (rexi calls)
--spec send_create_calls(binary(), list(), fullmap()) -> [{reference(), part()}].
-send_create_calls(DbName, Options, Fullmap) ->
- lists:map(fun(#shard{node=Node, range=[Beg,_]} = Part) ->
- ShardName = showroom_utils:shard_name(Beg, DbName),
- Ref = rexi:async_server_call({couch_server, Node},
- {create, ShardName, Options}),
- {Ref, Part}
- end, Fullmap).
-
-handle_create_msg(_, file_exists, _) ->
- {error, file_exists};
-handle_create_msg(_, {rexi_EXIT, _Reason}, {Complete, N, Parts}) ->
- {ok, {Complete, N-1, Parts}};
-handle_create_msg(_, {rexi_DOWN, _, _, _}, {Complete, _N, _Parts}) ->
- if
- Complete -> {stop, ok};
- true -> {error, create_db_fubar}
- end;
-handle_create_msg(_, _, {true, 1, _Acc}) ->
- {stop, ok};
-handle_create_msg({_, #shard{range=[Beg,_]}}, {ok, _}, {false, 1, PartResults0}) ->
- PartResults = lists:keyreplace(Beg, 1, PartResults0, {Beg, true}),
- case is_complete(PartResults) of
- true -> {stop, ok};
- false -> {error, create_db_fubar}
- end;
-handle_create_msg(_RefPart, {ok, _}, {true, N, Parts}) ->
- {ok, {true, N-1, Parts}};
-handle_create_msg({_Ref, #shard{range=[Beg,_]}}, {ok, _}, {false, Rem, PartResults0}) ->
- PartResults = lists:keyreplace(Beg, 1, PartResults0, {Beg, true}),
- {ok, {is_complete(PartResults), Rem-1, PartResults}}.
-
-
-is_complete(List) ->
- lists:all(fun({_,Bool}) -> Bool end, List).
diff --git a/src/fabric_db.erl b/src/fabric_db.erl
new file mode 100644
index 00000000..95648e01
--- /dev/null
+++ b/src/fabric_db.erl
@@ -0,0 +1,218 @@
+-module(fabric_db).
+-author('Brad Anderson <brad@cloudant.com>').
+-author('Adam Kocoloski <adam@cloudant.com>').
+
+-export([all_databases/1, get_db_info/2, create_db/2, delete_db/2]).
+
+-include("../../couch/src/couch_db.hrl").
+-include("../../dynomite/include/membership.hrl").
+
+%% @doc gets all databases in the cluster.
+-spec all_databases(binary() | []) -> [binary()].
+all_databases([]) ->
+ Dbs = ets:foldl(fun(#shard{dbname=DbName}, AccIn) ->
+ new_acc(DbName, AccIn)
+ end, [], partitions),
+ {ok, Dbs};
+all_databases(Customer) ->
+ ?debugFmt("~nCustomer: ~p~n", [Customer]),
+ Dbs = ets:foldl(fun(#shard{dbname=DbName}, AccIn) ->
+ DbNameStr = ?b2l(DbName),
+ case string:str(DbNameStr, Customer) of
+ 1 ->
+ new_acc(DbNameStr, AccIn);
+ _ -> AccIn
+ end
+ end, [], dbs_cache),
+ {ok, Dbs}.
+
+%% @doc get database information tuple
+get_db_info(DbName, Customer) ->
+ Name = cloudant_db_name(Customer, DbName),
+ Shards = partitions:all_parts(DbName),
+ Workers = fabric_util:submit_jobs(Shards, get_db_info, []),
+ Acc0 = {false, length(Workers), lists:usort([ {Beg, nil} ||
+ #shard{range=[Beg,_]} <- Workers])},
+ case fabric_util:recv(Workers, #shard.ref, fun handle_info_msg/3, Acc0) of
+ {ok, ShardInfos} ->
+ {ok, process_infos(ShardInfos, [{db_name, Name}])};
+ Error -> Error
+ end.
+
+%% @doc Create a new database, and all its partition files across the cluster
+%% Options is proplist with user_ctx, n, q
+-spec create_db(binary(), list()) -> {ok, #db{}} | {error, any()}.
+create_db(DbName, Options) ->
+ Fullmap = partitions:fullmap(DbName, Options),
+ {ok, FullNodes} = mem3:fullnodes(),
+ RefPartMap = send_create_calls(DbName, Options, Fullmap),
+ Acc0 = {false, length(RefPartMap), lists:usort([ {Beg, false} ||
+ {_,#shard{range=[Beg,_]}} <- RefPartMap])},
+ case fabric_util:receive_loop(
+ RefPartMap, 1, fun handle_create_msg/3, Acc0, 5000, infinity) of
+ {ok, _Results} ->
+ partitions:install_fullmap(DbName, Fullmap, FullNodes, Options),
+ ok;
+ Error -> Error
+ end.
+
+%% @doc Delete a database, and all its partition files across the cluster
+%% Options is proplist with user_ctx, n, q
+-spec delete_db(binary(), list()) -> {ok, #db{}} | {error, any()}.
+delete_db(DbName, Options) ->
+ Parts = partitions:all_parts(DbName),
+ RefPartMap = send_delete_calls(DbName, Options, Parts),
+ Acc0 = {true, length(RefPartMap)},
+ case fabric_util:receive_loop(
+ RefPartMap, 1, fun handle_delete_msg/3, Acc0, 5000, infinity) of
+ {ok, _Results} ->
+ delete_fullmap(DbName),
+ ok;
+ Error -> Error
+ end.
+
+%% =====================
+%% internal
+%% =====================
+
+new_acc(DbName, Acc) ->
+ case lists:member(DbName, Acc) of
+ true -> Acc;
+ _ ->[DbName | Acc]
+ end.
+
+handle_info_msg(_, _, {true, _, Infos0}) ->
+ {stop, Infos0};
+handle_info_msg(_, _, {false, 1, Infos0}) ->
+ MissingShards = lists:reverse(lists:foldl(fun
+ ({S,nil}, Acc) -> [S|Acc];
+ (_, Acc) -> Acc
+ end, [], Infos0)),
+ ?LOG_ERROR("get_db_info error, missing shards: ~p", [MissingShards]),
+ {error, get_db_info};
+handle_info_msg(#shard{range=[Beg,_]}, {ok, Info}, {false, N, Infos0}) ->
+ case couch_util:get_value(Beg, Infos0) of
+ nil ->
+ Infos = lists:keyreplace(Beg, 1, Infos0, {Beg, Info}),
+ case is_complete(info, Infos) of
+ true -> {ok, {true, N-1, Infos}};
+ false -> {ok, {false, N-1, Infos}}
+ end;
+ _ ->
+ {ok, {false, N-1, Infos0}}
+ end;
+handle_info_msg(_, _Other, {Complete, N, Infos0}) ->
+ {ok, {Complete, N-1, Infos0}}.
+
+is_complete(info, List) ->
+ not lists:any(fun({_,Info}) -> Info =:= nil end, List);
+is_complete(create, List) ->
+ lists:all(fun({_,Bool}) -> Bool end, List).
+
+cloudant_db_name(Customer, FullName) ->
+ case Customer of
+ "" -> FullName;
+ Name -> re:replace(FullName, [Name,"/"], "", [{return, binary}])
+ end.
+
+%% Loop through Tasks on the flattened Infos and get the aggregated result
+process_infos(Infos, Initial) ->
+ Tasks = [
+ {doc_count, fun sum/2, 0},
+ {doc_del_count, fun sum/2, 0},
+ {update_seq, fun max/2, 1},
+ {purge_seq, fun sum/2, 0},
+ {compact_running, fun bool/2, 0},
+ {disk_size, fun sum/2, 0},
+ {instance_start_time, fun(_, _) -> <<"0">> end, 0},
+ {disk_format_version, fun max/2, 0}],
+
+ Infos1 = lists:flatten(Infos),
+
+ Result = lists:map(fun({Type, Fun, Default}) ->
+ {Type, process_info(Type, Fun, Default, Infos1)}
+ end, Tasks),
+ lists:flatten([Initial, Result]).
+
+ process_info(Type, Fun, Default, List) ->
+ lists:foldl(fun(V, AccIn) -> Fun(V, AccIn) end, Default,
+ proplists:get_all_values(Type, List)).
+
+sum(New, Existing) ->
+ New + Existing.
+
+bool(New, Existing) ->
+ New andalso Existing.
+
+max(New, Existing) ->
+ case New > Existing of
+ true -> New;
+ false -> Existing
+ end.
+
+
+%% @doc create the partitions on all appropriate nodes (rexi calls)
+-spec send_create_calls(binary(), list(), fullmap()) -> [{reference(), part()}].
+send_create_calls(DbName, Options, Fullmap) ->
+ lists:map(fun(#shard{node=Node, range=[Beg,_]} = Part) ->
+ ShardName = showroom_utils:shard_name(Beg, DbName),
+ Ref = rexi:async_server_call({couch_server, Node},
+ {create, ShardName, Options}),
+ {Ref, Part}
+ end, Fullmap).
+
+handle_create_msg(_, file_exists, _) ->
+ {error, file_exists};
+handle_create_msg(_, {rexi_EXIT, _Reason}, {Complete, N, Parts}) ->
+ {ok, {Complete, N-1, Parts}};
+handle_create_msg(_, {rexi_DOWN, _, _, _}, {Complete, _N, _Parts}) ->
+ if
+ Complete -> {stop, ok};
+ true -> {error, create_db_fubar}
+ end;
+handle_create_msg(_, _, {true, 1, _Acc}) ->
+ {stop, ok};
+handle_create_msg({_, #shard{range=[Beg,_]}}, {ok, _}, {false, 1, PartResults0}) ->
+ PartResults = lists:keyreplace(Beg, 1, PartResults0, {Beg, true}),
+ case is_complete(create, PartResults) of
+ true -> {stop, ok};
+ false -> {error, create_db_fubar}
+ end;
+handle_create_msg(_RefPart, {ok, _}, {true, N, Parts}) ->
+ {ok, {true, N-1, Parts}};
+handle_create_msg({_Ref, #shard{range=[Beg,_]}}, {ok, _}, {false, Rem, PartResults0}) ->
+ PartResults = lists:keyreplace(Beg, 1, PartResults0, {Beg, true}),
+ {ok, {is_complete(create, PartResults), Rem-1, PartResults}}.
+
+
+%% @doc delete the partitions on all appropriate nodes (rexi calls)
+-spec send_delete_calls(binary(), list(), fullmap()) -> [{reference(), part()}].
+send_delete_calls(DbName, Options, Parts) ->
+ lists:map(fun(#shard{node=Node, range=[Beg,_]} = Part) ->
+ ShardName = showroom_utils:shard_name(Beg, DbName),
+ Ref = rexi:async_server_call({couch_server, Node},
+ {delete, ShardName, Options}),
+ {Ref, Part}
+ end, Parts).
+
+handle_delete_msg(_, not_found, {NotFound, N}) ->
+ {ok, {NotFound, N-1}};
+handle_delete_msg(_, {rexi_EXIT, _Reason}, {NotFound, N}) ->
+ {ok, {NotFound, N-1}};
+handle_delete_msg(_, {rexi_DOWN, _, _, _}, _Acc) ->
+ {error, delete_db_fubar};
+handle_delete_msg(_, _, {NotFound, 1}) ->
+ if
+ NotFound -> {stop, not_found};
+ true -> {stop, ok}
+ end;
+handle_delete_msg(_, ok, {_NotFound, N}) ->
+ {ok, {false, N-1}}.
+
+delete_fullmap(DbName) ->
+ case couch_db:open(<<"dbs">>, []) of
+ {ok, Db} ->
+ {ok, Doc} = couch_api:open_doc(Db, DbName, nil, []),
+ couch_api:update_doc(Db, Doc#doc{deleted=true});
+ Error -> Error
+ end.
diff --git a/src/fabric_delete.erl b/src/fabric_delete.erl
deleted file mode 100644
index 9a65f72b..00000000
--- a/src/fabric_delete.erl
+++ /dev/null
@@ -1,65 +0,0 @@
--module(fabric_delete).
--author('Brad Anderson <brad@cloudant.com>').
-
--include("../../couch/src/couch_db.hrl").
--include("../../dynomite/include/membership.hrl").
-
-%% api
--export([delete_db/2]).
-
-
-%% =====================
-%% api
-%% =====================
-
-%% @doc Delete a database, and all its partition files across the cluster
-%% Options is proplist with user_ctx, n, q
--spec delete_db(binary(), list()) -> {ok, #db{}} | {error, any()}.
-delete_db(DbName, Options) ->
- Parts = partitions:all_parts(DbName),
- RefPartMap = send_calls(DbName, Options, Parts),
- Acc0 = {true, length(RefPartMap)},
- case fabric_util:receive_loop(
- RefPartMap, 1, fun handle_delete_msg/3, Acc0, 5000, infinity) of
- {ok, _Results} ->
- delete_fullmap(DbName),
- ok;
- Error -> Error
- end.
-
-
-%% =====================
-%% internal
-%% =====================
-
-%% @doc delete the partitions on all appropriate nodes (rexi calls)
--spec send_calls(binary(), list(), fullmap()) -> [{reference(), part()}].
-send_calls(DbName, Options, Parts) ->
- lists:map(fun(#shard{node=Node, range=[Beg,_]} = Part) ->
- ShardName = showroom_utils:shard_name(Beg, DbName),
- Ref = rexi:async_server_call({couch_server, Node},
- {delete, ShardName, Options}),
- {Ref, Part}
- end, Parts).
-
-handle_delete_msg(_, not_found, {NotFound, N}) ->
- {ok, {NotFound, N-1}};
-handle_delete_msg(_, {rexi_EXIT, _Reason}, {NotFound, N}) ->
- {ok, {NotFound, N-1}};
-handle_delete_msg(_, {rexi_DOWN, _, _, _}, _Acc) ->
- {error, delete_db_fubar};
-handle_delete_msg(_, _, {NotFound, 1}) ->
- if
- NotFound -> {stop, not_found};
- true -> {stop, ok}
- end;
-handle_delete_msg(_, ok, {_NotFound, N}) ->
- {ok, {false, N-1}}.
-
-delete_fullmap(DbName) ->
- case couch_db:open(<<"dbs">>, []) of
- {ok, Db} ->
- {ok, Doc} = couch_api:open_doc(Db, DbName, nil, []),
- couch_api:update_doc(Db, Doc#doc{deleted=true});
- Error -> Error
- end.
diff --git a/src/fabric_info.erl b/src/fabric_info.erl
deleted file mode 100644
index cb32eac0..00000000
--- a/src/fabric_info.erl
+++ /dev/null
@@ -1,122 +0,0 @@
--module(fabric_info).
-
--export([all_databases/1, get_db_info/2]).
-
--include("../../couch/src/couch_db.hrl").
--include("../../dynomite/include/membership.hrl").
-
-%% @doc gets all databases in the cluster.
--spec all_databases(binary() | []) -> [binary()].
-all_databases([]) ->
- Dbs = ets:foldl(fun(#shard{dbname=DbName}, AccIn) ->
- new_acc(DbName, AccIn)
- end, [], partitions),
- {ok, Dbs};
-all_databases(Customer) ->
- ?debugFmt("~nCustomer: ~p~n", [Customer]),
- Dbs = ets:foldl(fun(#shard{dbname=DbName}, AccIn) ->
- DbNameStr = ?b2l(DbName),
- case string:str(DbNameStr, Customer) of
- 1 ->
- new_acc(DbNameStr, AccIn);
- _ -> AccIn
- end
- end, [], dbs_cache),
- {ok, Dbs}.
-
-%% @doc get database information tuple
-get_db_info(DbName, Customer) ->
- Name = cloudant_db_name(Customer, DbName),
- Parts = partitions:all_parts(DbName),
- RefPartMap = send_info_calls(DbName, Parts),
- Acc0 = {false, length(RefPartMap), lists:usort([ {Beg, nil} ||
- {_,#shard{range=[Beg,_]}} <- RefPartMap])},
- case fabric_util:receive_loop(
- RefPartMap, 1, fun handle_info_msg/3, Acc0, 5000, infinity) of
- {ok, ShardInfos} ->
- {ok, process_infos(ShardInfos, [{db_name, Name}])};
- Error -> Error
- end.
-
-
-%% =====================
-%% internal
-%% =====================
-
-new_acc(DbName, Acc) ->
- case lists:member(DbName, Acc) of
- true -> Acc;
- _ ->[DbName | Acc]
- end.
-
-send_info_calls(DbName, Parts) ->
- lists:map(fun(#shard{node=Node, range=[Beg,_]} = Part) ->
- ShardName = showroom_utils:shard_name(Beg, DbName),
- Ref = rexi:cast(Node, {fabric_rpc, get_db_info, ShardName}),
- {Ref, Part}
- end, Parts).
-
-handle_info_msg(_, _, {true, _, Infos0}) ->
- {stop, Infos0};
-handle_info_msg(_, _, {false, 1, Infos0}) ->
- MissingShards = lists:keyfind(nil, 2, Infos0),
- ?LOG_ERROR("get_db_info error, missing shards: ~p", [MissingShards]),
- {error, get_db_info};
-handle_info_msg({_,#shard{range=[Beg,_]}}, {ok, Info}, {false, N, Infos0}) ->
- case couch_util:get_value(Beg, Infos0) of
- nil ->
- Infos = lists:keyreplace(Beg, 1, Infos0, {Beg, Info}),
- case is_complete(Infos) of
- true -> {ok, {true, N-1, Infos}};
- false -> {ok, {false, N-1, Infos}}
- end;
- _ ->
- {ok, {false, N-1, Infos0}}
- end;
-handle_info_msg(_, _Other, {Complete, N, Infos0}) ->
- {ok, {Complete, N-1, Infos0}}.
-
-
-is_complete(List) ->
- not lists:any(fun({_,nil}) -> true end, List).
-
-cloudant_db_name(Customer, FullName) ->
- case Customer of
- "" -> FullName;
- Name -> re:replace(FullName, [Name,"/"], "", [{return, binary}])
- end.
-
-%% Loop through Tasks on the flattened Infos and get the aggregated result
-process_infos(Infos, Initial) ->
- Tasks = [
- {doc_count, fun sum/2, 0},
- {doc_del_count, fun sum/2, 0},
- {update_seq, fun max/2, 1},
- {purge_seq, fun sum/2, 0},
- {compact_running, fun bool/2, 0},
- {disk_size, fun sum/2, 0},
- {instance_start_time, fun(_, _) -> <<"0">> end, 0},
- {disk_format_version, fun max/2, 0}],
-
- Infos1 = lists:flatten(Infos),
-
- Result = lists:map(fun({Type, Fun, Default}) ->
- {Type, process_info(Type, Fun, Default, Infos1)}
- end, Tasks),
- lists:flatten([Initial, Result]).
-
- process_info(Type, Fun, Default, List) ->
- lists:foldl(fun(V, AccIn) -> Fun(V, AccIn) end, Default,
- couch_util:get_all_values(Type, List)).
-
-sum(New, Existing) ->
- New + Existing.
-
-bool(New, Existing) ->
- New andalso Existing.
-
-max(New, Existing) ->
- case New > Existing of
- true -> New;
- false -> Existing
- end.
diff --git a/src/fabric_open.erl b/src/fabric_open.erl
deleted file mode 100644
index cab10c5d..00000000
--- a/src/fabric_open.erl
+++ /dev/null
@@ -1,47 +0,0 @@
--module(fabric_open).
-
--export([open_doc/4]).
-
--include("../../couch/src/couch_db.hrl").
-
-
-% open_db(<<"S", ShardFileName/binary>> = Name, Options) ->
-% case couch_db:open(ShardFileName, Options) of
-% {ok, Db} ->
-% {ok, Db#db{name = Name}};
-% {not_found, no_db_file} ->
-% {not_found, no_db_file}
-% end;
-%
-% open_db(DbName, Options) ->
-% Part = case lists:keyfind(node(), 1, membership2:all_nodes_parts(false)) of
-% {_, P} -> P;
-% _ -> throw({node_has_no_parts, node()})
-% end,
-% ShardName = partitions:shard_name(Part, DbName),
-% open_db(<<"S", ShardName/binary>>, Options).
-
-
-open_doc(DbName, DocId, _Revs, _Options) ->
- NPs = partitions:key_nodes_parts(DbName, DocId),
- ?debugFmt("~nNPs: ~p~n", [NPs]),
- {ok, #doc{}}.
-
-
-
-
-
-% open_doc(Db, DocId, Revs, Options) ->
-% {R,N} = get_quorum_constants(r, Options),
-% case cluster_ops:key_lookup(DocId, {dynomite_couch_api, get,
-% [Db, DocId, Revs, Options]}, r, R, N) of
-% {ok, [Doc|_Rest]} ->
-% {ok, Doc};
-% {ok, {not_found, Reason}, _Reasons} ->
-% {not_found, Reason};
-% [{error, Error}, {good, Good}, {bad, Bad}] ->
-% showroom_quorum_utils:handle_error(Error, Good, Bad);
-% Other ->
-% ?LOG_DEBUG("~nopen_doc Other: ~p~n", [Other]),
-% throw(Other)
-% end.
diff --git a/src/fabric_rpc.erl b/src/fabric_rpc.erl
index 54f3b338..a0c0a568 100644
--- a/src/fabric_rpc.erl
+++ b/src/fabric_rpc.erl
@@ -13,7 +13,6 @@ open_doc(DbName, DocId, Revs, Options) ->
with_db(DbName, {couch_api, open_doc, [DocId, Revs, Options]}).
get_db_info(DbName) ->
- ?debugHere,
with_db(DbName, {couch_db, get_db_info, []}).
%%