summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/couchdb/couch_rep_db_listener.erl73
1 files changed, 63 insertions, 10 deletions
diff --git a/src/couchdb/couch_rep_db_listener.erl b/src/couchdb/couch_rep_db_listener.erl
index e4e8b246..1448c56f 100644
--- a/src/couchdb/couch_rep_db_listener.erl
+++ b/src/couchdb/couch_rep_db_listener.erl
@@ -22,9 +22,10 @@
-define(REP_ID_TO_DOC_ID_MAP, rep_id_to_rep_doc_id).
-record(state, {
- changes_feed_loop,
- changes_queue,
- changes_processor
+ changes_feed_loop = nil,
+ changes_queue = nil,
+ changes_processor = nil,
+ db_notifier = nil
}).
@@ -40,35 +41,57 @@ init(_) ->
Server = self(),
ok = couch_config:register(
fun("replicator", "db") ->
- ok = gen_server:call(Server, rep_db_changed, infinity)
+ ok = gen_server:cast(Server, rep_db_changed)
end
),
{ok, #state{
changes_feed_loop = Loop,
changes_queue = Queue,
- changes_processor = Processor}
+ changes_processor = Processor,
+ db_notifier = db_update_notifier()}
}.
-handle_call(rep_db_changed, _From, State) ->
+
+handle_call(Msg, From, State) ->
+ ?LOG_ERROR("Replicator DB listener receive unexpected call ~p from ~p",
+ [Msg, From]),
+ {stop, {error, {unexpected_call, Msg}}, State}.
+
+
+handle_cast(rep_db_changed, State) ->
#state{
changes_feed_loop = Loop,
changes_queue = Queue
} = State,
exit(Loop, rep_db_changed),
+ couch_work_queue:queue(Queue, stop_all_replications),
{ok, NewLoop} = changes_feed_loop(Queue),
- {reply, ok, State#state{changes_feed_loop = NewLoop}}.
+ {noreply, State#state{changes_feed_loop = NewLoop}};
+handle_cast(rep_db_created, #state{changes_feed_loop = nil} = State) ->
+ {ok, NewLoop} = changes_feed_loop(State#state.changes_queue),
+ {noreply, State#state{changes_feed_loop = NewLoop}};
-handle_cast(_Msg, State) ->
- {noreply, State}.
+handle_cast(Msg, State) ->
+ ?LOG_ERROR("Replicator DB listener receive unexpected cast ~p", [Msg]),
+ {stop, {error, {unexpected_cast, Msg}}, State}.
+handle_info({'EXIT', From, normal}, #state{changes_feed_loop = From} = State) ->
+ % replicator DB deleted
+ couch_work_queue:queue(State#state.changes_queue, stop_all_replications),
+ {noreply, State#state{changes_feed_loop = nil}};
+
+handle_info({'EXIT', From, Reason}, #state{db_notifier = From} = State) ->
+ ?LOG_ERROR("Database update notifier died. Reason: ~p", [Reason]),
+ {stop, {db_update_notifier_died, Reason}, State};
+
handle_info({'EXIT', _OldChangesLoop, rep_db_changed}, State) ->
{noreply, State};
handle_info({'EXIT', From, Reason}, #state{changes_processor = From} = State) ->
?LOG_ERROR("Replicator DB changes processor died. Reason: ~p", [Reason]),
- {stop, rep_db_changes_processor_error, State}.
+ {stop, {rep_db_changes_processor_died, Reason}, State}.
terminate(_Reason, State) ->
@@ -118,6 +141,23 @@ changes_feed_loop(ChangesQueue) ->
{ok, Pid}.
+db_update_notifier() ->
+ Server = self(),
+ {ok, Notifier} = couch_db_update_notifier:start_link(
+ fun({created, DbName}) ->
+ case ?l2b(couch_config:get("replicator", "db", "_replicator")) of
+ DbName ->
+ ok = gen_server:cast(Server, rep_db_created);
+ _ ->
+ ok
+ end;
+ (_) ->
+ ok
+ end
+ ),
+ Notifier.
+
+
changes_processor(ChangesQueue) ->
Pid = spawn_link(
fun() ->
@@ -148,6 +188,10 @@ has_valid_rep_id(<<?DESIGN_DOC_PREFIX, _Rest/binary>>) ->
has_valid_rep_id(_Else) ->
true.
+process_change(stop_all_replications) ->
+ ?LOG_INFO("Stopping all ongoing replications because the replicator DB "
+ "was deleted or changed", []),
+ stop_all_replications();
process_change({Change}) ->
{RepProps} = JsonRepDoc = couch_util:get_value(doc, Change),
@@ -257,3 +301,12 @@ stop_replication(DocId) ->
[] ->
none
end.
+
+stop_all_replications() ->
+ ets:foldl(
+ fun({_, RepId}, _) -> couch_rep:end_replication(RepId) end,
+ ok,
+ ?DOC_TO_REP_ID_MAP
+ ),
+ true = ets:delete_all_objects(?REP_ID_TO_DOC_ID_MAP),
+ true = ets:delete_all_objects(?DOC_TO_REP_ID_MAP).