summaryrefslogtreecommitdiff
path: root/deps/fabric/src/fabric_db_update_listener.erl
blob: e29f3ec7146e879a10b2fa09bf8d954c1688bbb7 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
% Copyright 2010 Cloudant
%
% Licensed under the Apache License, Version 2.0 (the "License"); you may not
% use this file except in compliance with the License. You may obtain a copy of
% the License at
%
%   http://www.apache.org/licenses/LICENSE-2.0
%
% Unless required by applicable law or agreed to in writing, software
% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
% License for the specific language governing permissions and limitations under
% the License.

-module(fabric_db_update_listener).

-export([go/4, start_update_notifier/1, stop/1, wait_db_updated/1]).

-include("fabric.hrl").
-include_lib("mem3/include/mem3.hrl").

%% Entry point of the update listener.
%%
%% Starts one update notifier per shard of DbName, starts a rexi monitor for
%% the rexi_server of every node that hosts a notifier, and then processes
%% incoming messages via receive_results/3 with the initial listener state
%% `unset'. Timeout is handed to receive_results/3 unchanged.
%%
%% Whatever way the receive loop ends (normal return or exception), the rexi
%% monitor is stopped first and the notifiers are killed afterwards.
go(Parent, ParentRef, DbName, Timeout) ->
    Notifiers = start_update_notifiers(DbName),
    RexiServers = [{rexi_server, Node} || {Node, _Ref} <- Notifiers],
    Monitor = rexi_monitor:start(lists:usort(RexiServers)),
    %% The calling controller is listed as a worker too, because it sends
    %% messages (tagged with ParentRef) that this listener must handle.
    AllWorkers = [{Parent, ParentRef} | Notifiers],
    InitialState = {AllWorkers, Parent, unset},
    try
        receive_results(AllWorkers, InitialState, Timeout)
    after
        rexi_monitor:stop(Monitor),
        stop_update_notifiers(Notifiers)
    end.

%% Casts start_update_notifier/1 to the node of every shard returned by
%% mem3:shards(DbName). Returns one {Node, Ref} tuple per shard, where Ref is
%% the value returned by rexi:cast/2 for that shard.
start_update_notifiers(DbName) ->
    [cast_update_notifier(Shard) || Shard <- mem3:shards(DbName)].

%% Casts the notifier for a single shard and pairs the shard's node with the
%% value returned by rexi:cast/2.
cast_update_notifier(#shard{node = Node, name = ShardName}) ->
    Request = {?MODULE, start_update_notifier, [ShardName]},
    {Node, rexi:cast(Node, Request)}.

% rexi endpoint
%
% Reads the {Caller, Ref} pair stored under `rexi_from' in the process
% dictionary and registers a supervised couch_db_update_notifier handler on
% the couch_db_update event manager. The handler is given a fun that sends
% {Ref, db_updated} to Caller for any two-element tuple whose second element
% equals DbName and returns ok for everything else.
%
% The function then blocks until the gen_event_EXIT message for this handler
% arrives and relays it through rexi:reply/1 as
% {gen_event_EXIT, DbName, Reason}.
start_update_notifier(DbName) ->
    {Caller, Ref} = get(rexi_from),
    Notify = fun
        ({_, Name}) when Name == DbName ->
            Caller ! {Ref, db_updated};
        (_) ->
            ok
    end,
    % A fresh reference makes the handler id unique per notifier.
    HandlerId = {couch_db_update_notifier, make_ref()},
    ok = gen_event:add_sup_handler(couch_db_update, HandlerId, Notify),
    receive
        {gen_event_EXIT, HandlerId, Reason} ->
            rexi:reply({gen_event_EXIT, DbName, Reason})
    end.

%% Calls rexi:kill/2 for every {Node, Ref} entry in Notifiers and returns the
%% list of the individual results.
stop_update_notifiers(Notifiers) ->
    [
        rexi:kill(WorkerNode, WorkerRef)
     || {WorkerNode, WorkerRef} <- Notifiers
    ].

%% Asks the listener process Pid to stop by sending it {Ref, done}.
%% Returns the message that was sent.
stop({Pid, Ref}) ->
    Pid ! {Ref, done}.

%% Sends {Ref, get_state} to the listener process Pid, then blocks until the
%% next message of any kind is available in the caller's mailbox and returns
%% that message as-is. There is no timeout and no filtering of the message.
wait_db_updated({Pid, Ref}) ->
    Pid ! {Ref, get_state},
    receive
        Reply -> Reply
    end.

%% Runs rexi_utils:recv/6 over Workers (keyed on tuple position 2) with
%% handle_message/3 as the callback, an `infinity' global timeout and Timeout
%% as the per-message timeout argument.
%%
%% On a timeout result the parent is sent the atom `timeout', a `waiting'
%% listener state is reset to `unset' (any other state is kept) and the loop
%% restarts with the worker list carried in the accumulator. Any other
%% two-element result, whatever its first element, is returned as
%% {ok, SecondElement}.
receive_results(Workers, Acc0, Timeout) ->
    Result = rexi_utils:recv(Workers, 2, fun handle_message/3, Acc0,
        infinity, Timeout),
    case Result of
        {timeout, {Remaining, Parent, ListenerState}} ->
            erlang:send(Parent, timeout),
            NextAcc = {Remaining, Parent, clear_waiting(ListenerState)},
            receive_results(Remaining, NextAcc, Timeout);
        {_, Final} ->
            {ok, Final}
    end.

%% A pending `waiting' state is dropped after a timeout; other states survive.
clear_waiting(waiting) -> unset;
clear_waiting(ListenerState) -> ListenerState.


%% Message callback used by receive_results/3.
%%
%% The accumulator is {Workers, Parent, State}:
%%   Workers - list of {NodeOrPid, Ref} entries still considered alive
%%   Parent  - process that receives `updated' / state notifications
%%   State   - unset   : no update recorded, parent is not waiting
%%             waiting : parent asked for the state, no update seen since
%%             updated : an update was seen before the parent asked
%%
%% Returns {ok, NewAcc} to keep going, {stop, ok} on `done', or
%% {error, Reason} when no worker is left.
%%
%% NOTE(review): start_update_notifier/1 replies with
%% {gen_event_EXIT, DbName, Reason}; no clause here matches that message, so
%% it would fail with function_clause if it is delivered to this callback -
%% confirm whether that is intended.
handle_message({rexi_DOWN, _, {_, DownNode}, _}, _Worker, {Workers, Parent, State}) ->
    % go/4 monitors {rexi_server, Node} items, so the second element of the
    % monitored item is a node name. Worker entries are {Node, Ref}; the node
    % name has to be compared with the Node element. Comparing it with the
    % Ref element (as was done before) never matches, so workers on a dead
    % node were never removed.
    NewWorkers = lists:filter(fun({Node, _Ref}) -> Node =/= DownNode end, Workers),
    case NewWorkers of
    [] ->
        {error, {nodedown, <<"progress not possible">>}};
    _ ->
        {ok, {NewWorkers, Parent, State}}
    end;
handle_message({rexi_EXIT, Reason}, Worker, {Workers, Parent, State}) ->
    % A single worker exited: forget it and fail only if it was the last one.
    NewWorkers = lists:delete(Worker, Workers),
    case NewWorkers of
    [] ->
        {error, Reason};
    _ ->
        {ok, {NewWorkers, Parent, State}}
    end;
handle_message(db_updated, {_Worker, _From}, {Workers, Parent, waiting}) ->
    % propagate message to calling controller
    erlang:send(Parent, updated),
    {ok, {Workers, Parent, unset}};
handle_message(db_updated, _Worker, {Workers, Parent, State})
  when State == unset orelse State == updated ->
    % Nobody is waiting yet: remember that an update happened.
    {ok, {Workers, Parent, updated}};
handle_message(get_state, {_Worker, _From}, {Workers, Parent, unset}) ->
    % Nothing to report yet; the next db_updated will notify the parent.
    {ok, {Workers, Parent, waiting}};
handle_message(get_state, {_Worker, _From}, {Workers, Parent, State}) ->
    % Report the current state to the parent and start over.
    erlang:send(Parent, State),
    {ok, {Workers, Parent, unset}};
handle_message(done, _, _) ->
    {stop, ok}.