summaryrefslogtreecommitdiff
path: root/src/fabric_delete.erl
blob: e77f813f8f54fd536a55e2877171f97992c4bbbc (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
-module(fabric_delete).
-author('Brad Anderson <brad@cloudant.com>').

-include("../../couch/src/couch_db.hrl").
-include("../../dynomite/include/membership.hrl").

%% api
-export([delete_db/2]).


%% =====================
%%   api
%% =====================

%% @doc Delete a database, and all its partition files across the cluster.
%%      Options is a proplist; it is handed to partitions:fullmap/2 and
%%      forwarded unchanged with every per-shard delete call.
%%      Returns ok when every node/partition answered ok,
%%      {error, delete_quorum_error} when at least one did not, and
%%      {error, timeout} when not all replies arrived before the overall
%%      timeout of the receive loop.
-spec delete_db(binary(), list()) -> ok | {error, any()}.
delete_db(DbName, Options) ->
    Fullmap = partitions:fullmap(DbName, Options),
    RefNodePart = send_delete_calls(DbName, Options, Fullmap),
    case delete_db_loop(RefNodePart) of
    {ok, Results} ->
        delete_results(Results, RefNodePart);
    {error, _Reason} = Error ->
        % the receive loop gave up (e.g. timeout); report it to the caller
        % instead of crashing with a badmatch
        Error
    end.


%delete_db(DbName, Options) ->
%    ResolveFun = fun(_Good) -> true end,
%    case cluster_ops:all_parts({dynomite_couch_api,delete_db,[DbName, Options]},
%                               d, true, ResolveFun) of
%    {ok, true} -> ok;
%    [{error, d_quorum_not_met}, {good, _Good}, {bad, Bad}] ->
%        showroom_utils:first_bad(Bad);
%    [{error, Error}, {good, _Good}, {bad, Bad}] ->
%        {Error, showroom_utils:first_bad(Bad)};
%    Other ->
%        ?debugFmt("~nOther: ~p~n", [Other]),
%        Other
%    end.

%% =====================
%%   internal
%% =====================

%% @doc Fire one asynchronous delete request (rexi call to couch_server) per
%%      {Node, Part} entry of Fullmap. Each request is paired with the
%%      node/partition it was sent for, so replies can be matched up by Ref.
-spec send_delete_calls(binary(), list(), [mem_node()]) -> [{reference(), np()}].
send_delete_calls(DbName, Options, Fullmap) ->
    SendDelete = fun({Node, Part} = NodePart) ->
        Shard = showroom_utils:shard_name(Part, DbName),
        Request = {delete, Shard, Options},
        CallRef = rexi:async_server_call({couch_server, Node}, Request),
        {CallRef, NodePart}
    end,
    lists:map(SendDelete, Fullmap).

%% @doc Set up the receive loop with an overall timeout of 5000 ms.
%%      Returns whatever the receive loop returns: {ok, Replies} once every
%%      outstanding call has answered, or {error, timeout} if the timeout
%%      message arrives first.
-spec delete_db_loop([ref_node_part()]) -> {ok, np_acc()} | {error, timeout}.
delete_db_loop(RefNodePart) ->
    TimeoutRef = erlang:make_ref(),
    TRef = erlang:send_after(5000, self(), {timeout, TimeoutRef}),
    Results = delete_db_loop(RefNodePart, TimeoutRef, []),
    erlang:cancel_timer(TRef),
    % the timer may have fired between the loop finishing and the cancel;
    % drop that message so it does not linger in the caller's mailbox
    receive
    {timeout, TimeoutRef} -> ok
    after 0 -> ok
    end,
    Results.

%% @doc delete_db receive loop
%%      Acc is either an accumulation of responses, or if we've received all
%%      responses, it's {ok, Responses}, which ends the loop.
%%      A {ok, deleted} reply is recorded as ok; any other reply is recorded
%%      as received. Receiving {timeout, TimeoutRef} ends the loop with
%%      {error, timeout}.
-spec delete_db_loop([ref_node_part()], reference(),
                     np_acc() | {ok, np_acc()}) ->
    {ok, np_acc()} | {error, timeout}.
delete_db_loop(_,_,{ok, Acc}) -> {ok, Acc};
delete_db_loop(RefNodePart, TimeoutRef, AccIn) ->
    receive
    {Ref, Reply} when is_reference(Ref) ->
        % normalise the success reply so later checks only look for 'ok'
        Status = case Reply of
        {ok, deleted} -> ok;
        _ -> Reply
        end,
        AccOut = check_all_parts(Ref, RefNodePart, AccIn, Status),
        delete_db_loop(RefNodePart, TimeoutRef, AccOut);
    {timeout, TimeoutRef} ->
        {error, timeout}
    end.

%% @doc Decide the overall outcome of the delete.
%%      The sorted list of node/partitions that replied ok must be exactly
%%      the sorted list of node/partitions a call was sent to; anything else
%%      is reported as a quorum error.
-spec delete_results(np_acc(), [ref_node_part()]) ->
    ok | {error, delete_quorum_error}.
delete_results(Results, RefNodePart) ->
    Succeeded = delete_result(Results, []),
    case all_nodes_parts(RefNodePart) of
    Succeeded -> ok;
    _Mismatch -> {error, delete_quorum_error}
    end.

%% @doc Collect the node/partitions whose reply was ok, sorted.
%%      A {error, file_exists} reply stops the walk and is returned as is.
%%      Any other reply is logged and its node/partition is left out of the
%%      returned list.
-spec delete_result(np_acc(), [np()]) -> [np()] | {error, file_exists}.
delete_result([], Acc) ->
    lists:sort(Acc);
delete_result([{NP, ok}|Rest], Acc) ->
    delete_result(Rest, [NP|Acc]);
delete_result([{_NP, {error, file_exists}}|_Rest], _Acc) ->
    {error, file_exists}; % if any replies were file_exists, return that
delete_result([{{_N,_P}, Result}|Rest], Acc) ->
    ?LOG_ERROR("delete_db error: ~p", [Result]),
    delete_result(Rest, Acc).

%% @doc Record Reply for the node/partition that Ref was issued for.
%%      Returns {ok, NewAcc} once NewAcc holds as many replies as there are
%%      entries in RefNodePart, otherwise the accumulator list.
%%      Acc is returned unchanged when Ref is not in RefNodePart, or when a
%%      reply for that node/partition has already been recorded.
check_all_parts(Ref, RefNodePart, Acc, Reply) ->
    case lists:keyfind(Ref, 1, RefNodePart) of
    {_, {Node, Part}} ->
        % keymember returns a boolean (keyfind would return a tuple or
        % false), so a repeated reply is really detected here
        case lists:keymember({Node, Part}, 1, Acc) of
        true -> Acc; % already present... that's odd
        false ->
            NewAcc = [{{Node, Part}, Reply} | Acc],
            case length(NewAcc) >= length(RefNodePart) of
            true -> {ok, NewAcc};
            false -> NewAcc
            end
        end;
    _ -> Acc % ignore a non-matching Ref
    end.

%% @doc check that we have a good reply from each partition.
%%      If we do, return {ok, Acc}, if we don't, return Acc of partitions
%%      Three 'case' statements and one 'if', a personal best.  fml
%% @end
% check_distinct_parts(Ref, RefNodePart, Acc, Msg) ->
%     Parts = distinct_parts(RefNodePart),
%     case couch_util:get_value(Ref, RefNodePart) of
%     {Node, Part} ->
%         case lists:member(Part, Acc) of
%         true -> Acc;
%         _ ->
%             case Msg of
%             ok ->
%                 NewAcc = lists:usort([Part|Acc]),
%                 if
%                     Parts =:= NewAcc -> {ok, NewAcc};
%                     true -> NewAcc
%                 end;
%             _ ->
%                 Hex = showroom_utils:int_to_hexstr(Part),
%                 showroom_log:message(error,
%                     "delete_db reply error: ~p from ~p ~p", [Msg, Node, Hex]),
%                 Acc
%             end
%         end;
%     _ -> Acc % ignore a non-matching Ref
%     end.

%% @doc Strip the call references from RefNodePart and return the remaining
%%      second elements (the node/partition terms) in sorted order.
all_nodes_parts(RefNodePart) ->
    NodeParts = lists:map(fun({_Ref, NodePart}) -> NodePart end, RefNodePart),
    lists:sort(NodeParts).