summaryrefslogtreecommitdiff
path: root/deps/fabric/src/fabric_db_update_listener.erl
diff options
context:
space:
mode:
Diffstat (limited to 'deps/fabric/src/fabric_db_update_listener.erl')
-rw-r--r--deps/fabric/src/fabric_db_update_listener.erl114
1 files changed, 114 insertions, 0 deletions
diff --git a/deps/fabric/src/fabric_db_update_listener.erl b/deps/fabric/src/fabric_db_update_listener.erl
new file mode 100644
index 00000000..e29f3ec7
--- /dev/null
+++ b/deps/fabric/src/fabric_db_update_listener.erl
@@ -0,0 +1,114 @@
+% Copyright 2010 Cloudant
+%
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+% http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+-module(fabric_db_update_listener).
+
+-export([go/4, start_update_notifier/1, stop/1, wait_db_updated/1]).
+
+-include("fabric.hrl").
+-include_lib("mem3/include/mem3.hrl").
+
+go(Parent, ParentRef, DbName, Timeout) ->
+ Notifiers = start_update_notifiers(DbName),
+ MonRefs = lists:usort([{rexi_server, Node} || {Node, _Ref} <- Notifiers]),
+ RexiMon = rexi_monitor:start(MonRefs),
+ %% Add calling controller node as rexi end point as this controller will
+ %% receive messages from it
+ Workers = [{Parent, ParentRef} | Notifiers],
+ try
+ receive_results(Workers, {Workers, Parent, unset}, Timeout)
+ after
+ rexi_monitor:stop(RexiMon),
+ stop_update_notifiers(Notifiers)
+ end.
+
+start_update_notifiers(DbName) ->
+ lists:map(fun(#shard{node=Node, name=Name}) ->
+ {Node, rexi:cast(Node, {?MODULE, start_update_notifier, [Name]})}
+ end, mem3:shards(DbName)).
+
+% rexi endpoint
+start_update_notifier(DbName) ->
+ {Caller, Ref} = get(rexi_from),
+ Fun = fun({_, X}) when X == DbName ->
+ erlang:send(Caller, {Ref, db_updated}); (_) -> ok end,
+ Id = {couch_db_update_notifier, make_ref()},
+ ok = gen_event:add_sup_handler(couch_db_update, Id, Fun),
+ receive {gen_event_EXIT, Id, Reason} ->
+ rexi:reply({gen_event_EXIT, DbName, Reason})
+ end.
+
+stop_update_notifiers(Notifiers) ->
+ [rexi:kill(Node, Ref) || {Node, Ref} <- Notifiers].
+
+stop({Pid, Ref}) ->
+ erlang:send(Pid, {Ref, done}).
+
+wait_db_updated({Pid, Ref}) ->
+ erlang:send(Pid, {Ref, get_state}),
+ receive
+ Any ->
+ Any
+ end.
+
+receive_results(Workers, State, Timeout) ->
+ case rexi_utils:recv(Workers, 2, fun handle_message/3, State,
+ infinity, Timeout) of
+ {timeout, {NewWorkers, Parent, State1}} ->
+ erlang:send(Parent, timeout),
+ State2 =
+ case State1 of
+ waiting ->
+ unset;
+ Any -> Any
+ end,
+ receive_results(NewWorkers, {NewWorkers, Parent, State2}, Timeout);
+ {_, NewState} ->
+ {ok, NewState}
+ end.
+
+
+handle_message({rexi_DOWN, _, {_,NodeRef},_}, _Worker, {Workers, Parent, State}) ->
+ NewWorkers = lists:filter(fun({_Node, Ref}) -> NodeRef =/= Ref end, Workers),
+ case NewWorkers of
+ [] ->
+ {error, {nodedown, <<"progress not possible">>}};
+ _ ->
+ {ok, {NewWorkers, Parent, State}}
+ end;
+handle_message({rexi_EXIT, Reason}, Worker, {Workers, Parent, State}) ->
+ NewWorkers = lists:delete(Worker,Workers),
+ case NewWorkers of
+ [] ->
+ {error, Reason};
+ _ ->
+ {ok, {NewWorkers, Parent, State}}
+ end;
+handle_message(db_updated, {_Worker, _From}, {Workers, Parent, waiting}) ->
+ % propagate message to calling controller
+ erlang:send(Parent, updated),
+ {ok, {Workers, Parent, unset}};
+handle_message(db_updated, _Worker, {Workers, Parent, State})
+ when State == unset orelse State == updated ->
+ {ok, {Workers, Parent, updated}};
+handle_message(get_state, {_Worker, _From}, {Workers, Parent, unset}) ->
+ {ok, {Workers, Parent, waiting}};
+handle_message(get_state, {_Worker, _From}, {Workers, Parent, State}) ->
+ erlang:send(Parent, State),
+ {ok, {Workers, Parent, unset}};
+handle_message(done, _, _) ->
+ {stop, ok}.
+
+
+