diff options
-rw-r--r-- | src/couchdb/couch_rep_db_listener.erl | 73 |
1 files changed, 63 insertions, 10 deletions
diff --git a/src/couchdb/couch_rep_db_listener.erl b/src/couchdb/couch_rep_db_listener.erl index e4e8b246..1448c56f 100644 --- a/src/couchdb/couch_rep_db_listener.erl +++ b/src/couchdb/couch_rep_db_listener.erl @@ -22,9 +22,10 @@ -define(REP_ID_TO_DOC_ID_MAP, rep_id_to_rep_doc_id). -record(state, { - changes_feed_loop, - changes_queue, - changes_processor + changes_feed_loop = nil, + changes_queue = nil, + changes_processor = nil, + db_notifier = nil }). @@ -40,35 +41,57 @@ init(_) -> Server = self(), ok = couch_config:register( fun("replicator", "db") -> - ok = gen_server:call(Server, rep_db_changed, infinity) + ok = gen_server:cast(Server, rep_db_changed) end ), {ok, #state{ changes_feed_loop = Loop, changes_queue = Queue, - changes_processor = Processor} + changes_processor = Processor, + db_notifier = db_update_notifier()} }. -handle_call(rep_db_changed, _From, State) -> + +handle_call(Msg, From, State) -> + ?LOG_ERROR("Replicator DB listener receive unexpected call ~p from ~p", + [Msg, From]), + {stop, {error, {unexpected_call, Msg}}, State}. + + +handle_cast(rep_db_changed, State) -> #state{ changes_feed_loop = Loop, changes_queue = Queue } = State, exit(Loop, rep_db_changed), + couch_work_queue:queue(Queue, stop_all_replications), {ok, NewLoop} = changes_feed_loop(Queue), - {reply, ok, State#state{changes_feed_loop = NewLoop}}. + {noreply, State#state{changes_feed_loop = NewLoop}}; +handle_cast(rep_db_created, #state{changes_feed_loop = nil} = State) -> + {ok, NewLoop} = changes_feed_loop(State#state.changes_queue), + {noreply, State#state{changes_feed_loop = NewLoop}}; -handle_cast(_Msg, State) -> - {noreply, State}. +handle_cast(Msg, State) -> + ?LOG_ERROR("Replicator DB listener receive unexpected cast ~p", [Msg]), + {stop, {error, {unexpected_cast, Msg}}, State}. +handle_info({'EXIT', From, normal}, #state{changes_feed_loop = From} = State) -> + % replicator DB deleted + couch_work_queue:queue(State#state.changes_queue, stop_all_replications), + {noreply, State#state{changes_feed_loop = nil}}; + +handle_info({'EXIT', From, Reason}, #state{db_notifier = From} = State) -> + ?LOG_ERROR("Database update notifier died. Reason: ~p", [Reason]), + {stop, {db_update_notifier_died, Reason}, State}; + handle_info({'EXIT', _OldChangesLoop, rep_db_changed}, State) -> {noreply, State}; handle_info({'EXIT', From, Reason}, #state{changes_processor = From} = State) -> ?LOG_ERROR("Replicator DB changes processor died. Reason: ~p", [Reason]), - {stop, rep_db_changes_processor_error, State}. + {stop, {rep_db_changes_processor_died, Reason}, State}. terminate(_Reason, State) -> @@ -118,6 +141,23 @@ changes_feed_loop(ChangesQueue) -> {ok, Pid}. +db_update_notifier() -> + Server = self(), + {ok, Notifier} = couch_db_update_notifier:start_link( + fun({created, DbName}) -> + case ?l2b(couch_config:get("replicator", "db", "_replicator")) of + DbName -> + ok = gen_server:cast(Server, rep_db_created); + _ -> + ok + end; + (_) -> + ok + end + ), + Notifier. + + changes_processor(ChangesQueue) -> Pid = spawn_link( fun() -> @@ -148,6 +188,10 @@ has_valid_rep_id(<<?DESIGN_DOC_PREFIX, _Rest/binary>>) -> has_valid_rep_id(_Else) -> true. +process_change(stop_all_replications) -> + ?LOG_INFO("Stopping all ongoing replications because the replicator DB " + "was deleted or changed", []), + stop_all_replications(); process_change({Change}) -> {RepProps} = JsonRepDoc = couch_util:get_value(doc, Change), @@ -257,3 +301,12 @@ stop_replication(DocId) -> [] -> none end. + +stop_all_replications() -> + ets:foldl( + fun({_, RepId}, _) -> couch_rep:end_replication(RepId) end, + ok, + ?DOC_TO_REP_ID_MAP + ), + true = ets:delete_all_objects(?REP_ID_TO_DOC_ID_MAP), + true = ets:delete_all_objects(?DOC_TO_REP_ID_MAP). |