summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/fabric_delete.erl148
-rw-r--r--src/fabric_open.erl61
2 files changed, 209 insertions, 0 deletions
diff --git a/src/fabric_delete.erl b/src/fabric_delete.erl
new file mode 100644
index 00000000..e77f813f
--- /dev/null
+++ b/src/fabric_delete.erl
@@ -0,0 +1,148 @@
+-module(fabric_delete).
+-author('Brad Anderson <brad@cloudant.com>').
+
+-include("../../couch/src/couch_db.hrl").
+-include("../../dynomite/include/membership.hrl").
+
+%% api
+-export([delete_db/2]).
+
+
+%% =====================
+%% api
+%% =====================
+
+%% @doc Delete a new database, and all its partition files across the cluster
+%% Options is proplist with user_ctx, n, q
+-spec delete_db(binary(), list()) -> {ok, #db{}} | {error, any()}.
+delete_db(DbName, Options) ->
+ Fullmap = partitions:fullmap(DbName, Options),
+ RefNodePart = send_delete_calls(DbName, Options, Fullmap),
+ {ok, Results} = delete_db_loop(RefNodePart),
+ delete_results(Results, RefNodePart).
+
+
+%delete_db(DbName, Options) ->
+% ResolveFun = fun(_Good) -> true end,
+% case cluster_ops:all_parts({dynomite_couch_api,delete_db,[DbName, Options]},
+% d, true, ResolveFun) of
+% {ok, true} -> ok;
+% [{error, d_quorum_not_met}, {good, _Good}, {bad, Bad}] ->
+% showroom_utils:first_bad(Bad);
+% [{error, Error}, {good, _Good}, {bad, Bad}] ->
+% {Error, showroom_utils:first_bad(Bad)};
+% Other ->
+% ?debugFmt("~nOther: ~p~n", [Other]),
+% Other
+% end.
+
+%% =====================
+%% internal
+%% =====================
+
+%% @doc delete the partitions on all appropriate nodes (rexi calls)
+-spec send_delete_calls(binary(), list(), [mem_node()]) -> [{reference(), np()}].
+send_delete_calls(DbName, Options, Fullmap) ->
+ lists:map(fun({Node, Part}) ->
+ ShardName = showroom_utils:shard_name(Part, DbName),
+ Ref = rexi:async_server_call({couch_server, Node},
+ {delete, ShardName, Options}),
+ {Ref, {Node, Part}}
+ end, Fullmap).
+
+%% @doc set up the receive loop with an overall timeout
+-spec delete_db_loop([ref_node_part()]) -> {ok, np_acc()}.
+delete_db_loop(RefNodePart) ->
+ TimeoutRef = erlang:make_ref(),
+ {ok, TRef} = timer:send_after(5000, {timeout, TimeoutRef}),
+ Results = delete_db_loop(RefNodePart, TimeoutRef, []),
+ timer:cancel(TRef),
+ Results.
+
+%% @doc delete_db receive loop
+%% Acc is either an accumulation of responses, or if we've received all
+%% responses, it's {ok, Responses}
+-spec delete_db_loop([ref_node_part()], tref(), np_acc()) ->
+ np_acc() | {ok, np_acc()}.
+delete_db_loop(_,_,{ok, Acc}) -> {ok, Acc};
+delete_db_loop(RefNodePart, TimeoutRef, AccIn) ->
+ receive
+ {Ref, {ok, deleted}} when is_reference(Ref) ->
+ AccOut = check_all_parts(Ref, RefNodePart, AccIn, ok),
+ delete_db_loop(RefNodePart, TimeoutRef, AccOut);
+ {Ref, Reply} when is_reference(Ref) ->
+ AccOut = check_all_parts(Ref, RefNodePart, AccIn, Reply),
+ delete_db_loop(RefNodePart, TimeoutRef, AccOut);
+ {timeout, TimeoutRef} ->
+ {error, timeout}
+ end.
+
+%% @doc check the results of the delete replies
+%% If we have a good reply from all partitions, return ok
+-spec delete_results(np_acc(), [ref_node_part()]) ->
+ ok | {error, delete_quorum_error}.
+delete_results(Results, RefNodePart) ->
+ ResultNPs = delete_result(Results, []),
+ AllNPs = all_nodes_parts(RefNodePart),
+ if
+ ResultNPs =:= AllNPs -> ok;
+ true -> {error, delete_quorum_error}
+ end.
+
+-spec delete_result(np_acc(), [np()]) -> [np()] | file_exists.
+delete_result([], Acc) ->
+ lists:sort(Acc);
+delete_result([{NP, ok}|Rest], Acc) ->
+ delete_result(Rest, [NP|Acc]);
+delete_result([{_NP, {error, file_exists}}|_Rest], _Acc) ->
+ {error, file_exists}; % if any replies were file_exists, return that
+delete_result([{{_N,_P}, Result}|Rest], Acc) ->
+ ?LOG_ERROR("delete_db error: ~p", [Result]),
+ delete_result(Rest, Acc).
+
+check_all_parts(Ref, RefNodePart, Acc, Reply) ->
+ case couch_util:get_value(Ref, RefNodePart) of
+ {Node, Part} ->
+ case lists:keyfind({Node, Part}, 1, Acc) of
+ true -> Acc; % already present... that's odd
+ _ ->
+ NewAcc = [{{Node, Part}, Reply} | Acc],
+ case length(NewAcc) >= length(RefNodePart) of
+ true -> {ok, NewAcc};
+ _ -> NewAcc
+ end
+ end;
+ _ -> Acc % ignore a non-matching Ref
+ end.
+
+%% @doc check that we have a good reply from each partition.
+%% If we do, return {ok, Acc}, if we don't, return Acc of partitions
+%% Three 'case' statements and one 'if', a personal best. fml
+%% @end
+% check_distinct_parts(Ref, RefNodePart, Acc, Msg) ->
+% Parts = distinct_parts(RefNodePart),
+% case couch_util:get_value(Ref, RefNodePart) of
+% {Node, Part} ->
+% case lists:member(Part, Acc) of
+% true -> Acc;
+% _ ->
+% case Msg of
+% ok ->
+% NewAcc = lists:usort([Part|Acc]),
+% if
+% Parts =:= NewAcc -> {ok, NewAcc};
+% true -> NewAcc
+% end;
+% _ ->
+% Hex = showroom_utils:int_to_hexstr(Part),
+% showroom_log:message(error,
+% "delete_db reply error: ~p from ~p ~p", [Msg, Node, Hex]),
+% Acc
+% end
+% end;
+% _ -> Acc % ignore a non-matching Ref
+% end.
+
+all_nodes_parts(RefNodePart) ->
+ {_Refs, NPs} = lists:unzip(RefNodePart),
+ lists:sort(NPs).
diff --git a/src/fabric_open.erl b/src/fabric_open.erl
new file mode 100644
index 00000000..a8c0204e
--- /dev/null
+++ b/src/fabric_open.erl
@@ -0,0 +1,61 @@
+-module(fabric_open).
+
+-export([open_doc/4]).
+-export([open_doc_endpoint/4]).
+
+-include("../../couch/src/couch_db.hrl").
+
+
+% open_db(<<"S", ShardFileName/binary>> = Name, Options) ->
+% case couch_db:open(ShardFileName, Options) of
+% {ok, Db} ->
+% {ok, Db#db{name = Name}};
+% {not_found, no_db_file} ->
+% {not_found, no_db_file}
+% end;
+%
+% open_db(DbName, Options) ->
+% Part = case lists:keyfind(node(), 1, membership2:all_nodes_parts(false)) of
+% {_, P} -> P;
+% _ -> throw({node_has_no_parts, node()})
+% end,
+% ShardName = partitions:shard_name(Part, DbName),
+% open_db(<<"S", ShardName/binary>>, Options).
+
+
+open_doc(DbName, DocId, Revs, Options) ->
+ NPs = partitions:key_nodes_parts(DbName, DocId),
+ ?debugFmt("~nNPs: ~p~n", [NPs]),
+ ok.
+
+
+open_doc_endpoint(DbName, DocId, Revs, Options) ->
+ case couch_db:open(DbName, []) of
+ {ok, Db} ->
+ try
+ couch_api:open_doc(Db, DocId, Revs, Options)
+ after
+ couch_db:close(Db)
+ end;
+ {not_found, no_db_file} ->
+ throw({not_found, <<"The database does not exist.">>});
+ Error ->
+ throw(Error)
+ end.
+
+
+
+% open_doc(Db, DocId, Revs, Options) ->
+% {R,N} = get_quorum_constants(r, Options),
+% case cluster_ops:key_lookup(DocId, {dynomite_couch_api, get,
+% [Db, DocId, Revs, Options]}, r, R, N) of
+% {ok, [Doc|_Rest]} ->
+% {ok, Doc};
+% {ok, {not_found, Reason}, _Reasons} ->
+% {not_found, Reason};
+% [{error, Error}, {good, Good}, {bad, Bad}] ->
+% showroom_quorum_utils:handle_error(Error, Good, Bad);
+% Other ->
+% ?LOG_DEBUG("~nopen_doc Other: ~p~n", [Other]),
+% throw(Other)
+% end.