diff --git a/src/couchdb/couch_rep_db_listener.erl b/src/couchdb/couch_rep_db_listener.erl
index 6f1f0443..4e0a929e 100644
--- a/src/couchdb/couch_rep_db_listener.erl
+++ b/src/couchdb/couch_rep_db_listener.erl
@@ -18,98 +18,113 @@
--define(DOC_TO_REP_ID_MAP, rep_doc_id_to_rep_id).
--define(REP_ID_TO_DOC_ID_MAP, rep_id_to_rep_doc_id).
+-define(DOC_ID_TO_REP_ID, rep_doc_id_to_rep_id).
+-define(REP_ID_TO_DOC_ID, rep_id_to_rep_doc_id).
-record(state, {
changes_feed_loop = nil,
- changes_queue = nil,
- changes_processor = nil,
- db_notifier = nil
+ db_notifier = nil,
+ rep_db_name = nil,
+ rep_start_pids = []
+-import(couch_util, [
+ get_value/2,
+ get_value/3
start_link() ->
gen_server:start_link({local, ?MODULE}, ?MODULE, [], []).
init(_) ->
process_flag(trap_exit, true),
- {ok, Queue} = couch_work_queue:new(
- [{max_size, 1024 * 1024}, {max_items, 1000}]),
- {ok, Processor} = changes_processor(Queue),
- {ok, Loop} = changes_feed_loop(Queue),
+ ?DOC_ID_TO_REP_ID = ets:new(?DOC_ID_TO_REP_ID, [named_table, set, private]),
+ ?REP_ID_TO_DOC_ID = ets:new(?REP_ID_TO_DOC_ID, [named_table, set, private]),
Server = self(),
ok = couch_config:register(
- fun("replicator", "db") ->
- ok = gen_server:cast(Server, rep_db_changed)
+ fun("replicator", "db", NewName) ->
+ ok = gen_server:cast(Server, {rep_db_changed, ?l2b(NewName)})
+ {Loop, RepDbName} = changes_feed_loop(),
{ok, #state{
changes_feed_loop = Loop,
- changes_queue = Queue,
- changes_processor = Processor,
+ rep_db_name = RepDbName,
db_notifier = db_update_notifier()}
+handle_call({rep_db_update, Change}, _From, State) ->
+ {reply, ok, process_update(State, Change)};
handle_call(Msg, From, State) ->
?LOG_ERROR("Replicator DB listener received unexpected call ~p from ~p",
[Msg, From]),
{stop, {error, {unexpected_call, Msg}}, State}.
-handle_cast(rep_db_changed, State) ->
- #state{
- changes_feed_loop = Loop,
- changes_queue = Queue
- } = State,
- catch unlink(Loop),
- catch exit(Loop, rep_db_changed),
- couch_work_queue:queue(Queue, stop_all_replications),
- {ok, NewLoop} = changes_feed_loop(Queue),
- {noreply, State#state{changes_feed_loop = NewLoop}};
-handle_cast(rep_db_created, #state{changes_feed_loop = Loop} = State) ->
- catch unlink(Loop),
- catch exit(Loop, rep_db_changed),
- {ok, NewLoop} = changes_feed_loop(State#state.changes_queue),
- {noreply, State#state{changes_feed_loop = NewLoop}};
+handle_cast({rep_db_changed, NewName},
+ #state{rep_db_name = NewName} = State) ->
+ {noreply, State};
+handle_cast({rep_db_changed, _NewName}, State) ->
+ {noreply, restart(State)};
+handle_cast({rep_db_created, NewName},
+ #state{rep_db_name = NewName} = State) ->
+ {noreply, State};
+handle_cast({rep_db_created, _NewName}, State) ->
+ {noreply, restart(State)};
handle_cast(Msg, State) ->
?LOG_ERROR("Replicator DB listener received unexpected cast ~p", [Msg]),
{stop, {error, {unexpected_cast, Msg}}, State}.
handle_info({'EXIT', From, normal}, #state{changes_feed_loop = From} = State) ->
% replicator DB deleted
- couch_work_queue:queue(State#state.changes_queue, stop_all_replications),
- {noreply, State#state{changes_feed_loop = nil}};
+ {noreply, State#state{changes_feed_loop = nil, rep_db_name = nil}};
handle_info({'EXIT', From, Reason}, #state{db_notifier = From} = State) ->
?LOG_ERROR("Database update notifier died. Reason: ~p", [Reason]),
{stop, {db_update_notifier_died, Reason}, State};
-handle_info({'EXIT', From, Reason}, #state{changes_processor = From} = State) ->
- ?LOG_ERROR("Replicator DB changes processor died. Reason: ~p", [Reason]),
- {stop, {rep_db_changes_processor_died, Reason}, State}.
+handle_info({'EXIT', From, normal}, #state{rep_start_pids = Pids} = State) ->
+ % one of the replication start processes terminated successfully
+ {noreply, State#state{rep_start_pids = Pids -- [From]}};
+handle_info(Msg, State) ->
+ ?LOG_ERROR("Replicator DB listener received unexpected message ~p", [Msg]),
+ {stop, {unexpected_msg, Msg}, State}.
terminate(_Reason, State) ->
+ rep_start_pids = StartPids,
changes_feed_loop = Loop,
- changes_queue = Queue
+ db_notifier = Notifier
} = State,
- exit(Loop, stop),
- % closing the queue will cause changes_processor to shutdown
- couch_work_queue:close(Queue),
- ok.
+ stop_all_replications(),
+ lists:foreach(
+ fun(Pid) ->
+ catch unlink(Pid),
+ catch exit(Pid, stop)
+ end,
+ [Loop | StartPids]),
+ true = ets:delete(?REP_ID_TO_DOC_ID),
+ true = ets:delete(?DOC_ID_TO_REP_ID),
+ couch_db_update_notifier:stop(Notifier).
code_change(_OldVsn, State, _Extra) ->
{ok, State}.
-changes_feed_loop(ChangesQueue) ->
+changes_feed_loop() ->
{ok, RepDb} = couch_rep:ensure_rep_db_exists(),
+ Server = self(),
Pid = spawn_link(
fun() ->
ChangesFeedFun = couch_changes:handle_changes(
@@ -126,7 +141,8 @@ changes_feed_loop(ChangesQueue) ->
fun({change, Change, _}, _) ->
case has_valid_rep_id(Change) of
true ->
- couch_work_queue:queue(ChangesQueue, Change);
+ ok = gen_server:call(
+ Server, {rep_db_update, Change}, infinity);
false ->
@@ -137,7 +153,15 @@ changes_feed_loop(ChangesQueue) ->
- {ok, Pid}.
+ {Pid, couch_db:name(RepDb)}.
+has_valid_rep_id({Change}) ->
+ has_valid_rep_id(get_value(<<"id">>, Change));
+has_valid_rep_id(<<?DESIGN_DOC_PREFIX, _Rest/binary>>) ->
+ false;
+has_valid_rep_id(_Else) ->
+ true.
db_update_notifier() ->
@@ -146,121 +170,106 @@ db_update_notifier() ->
fun({created, DbName}) ->
case ?l2b(couch_config:get("replicator", "db", "_replicator")) of
DbName ->
- ok = gen_server:cast(Server, rep_db_created);
+ ok = gen_server:cast(Server, {rep_db_created, DbName});
_ ->
(_) ->
+ % no need to handle the 'deleted' event - the changes feed loop
+ % dies when the database is deleted
-changes_processor(ChangesQueue) ->
- Pid = spawn_link(
- fun() ->
- ets:new(?DOC_TO_REP_ID_MAP, [named_table, set, private]),
- ets:new(?REP_ID_TO_DOC_ID_MAP, [named_table, set, private]),
- consume_changes(ChangesQueue),
- true = ets:delete(?REP_ID_TO_DOC_ID_MAP),
- true = ets:delete(?DOC_TO_REP_ID_MAP)
- end
- ),
- {ok, Pid}.
-consume_changes(ChangesQueue) ->
- case couch_work_queue:dequeue(ChangesQueue) of
- closed ->
- ok;
- {ok, Changes} ->
- lists:foreach(fun process_change/1, Changes),
- consume_changes(ChangesQueue)
- end.
-has_valid_rep_id({Change}) ->
- has_valid_rep_id(couch_util:get_value(<<"id">>, Change));
-has_valid_rep_id(<<?DESIGN_DOC_PREFIX, _Rest/binary>>) ->
- false;
-has_valid_rep_id(_Else) ->
- true.
+restart(#state{changes_feed_loop = Loop, rep_start_pids = StartPids} = State) ->
+ stop_all_replications(),
+ lists:foreach(
+ fun(Pid) ->
+ catch unlink(Pid),
+ catch exit(Pid, rep_db_changed)
+ end,
+ [Loop | StartPids]),
+ {NewLoop, NewRepDbName} = changes_feed_loop(),
+ State#state{
+ changes_feed_loop = NewLoop,
+ rep_db_name = NewRepDbName,
+ rep_start_pids = []
+ }.
-process_change(stop_all_replications) ->
- ?LOG_INFO("Stopping all ongoing replications because the replicator DB "
- "was deleted or changed", []),
- stop_all_replications();
-process_change({Change}) ->
- {RepProps} = JsonRepDoc = couch_util:get_value(doc, Change),
- DocId = couch_util:get_value(<<"_id">>, RepProps),
- case couch_util:get_value(<<"deleted">>, Change, false) of
+process_update(State, {Change}) ->
+ {RepProps} = JsonRepDoc = get_value(doc, Change),
+ DocId = get_value(<<"_id">>, RepProps),
+ case get_value(<<"deleted">>, Change, false) of
true ->
- rep_doc_deleted(DocId);
+ rep_doc_deleted(DocId),
+ State;
false ->
- case couch_util:get_value(<<"_replication_state">>, RepProps) of
+ case get_value(<<"_replication_state">>, RepProps) of
<<"completed">> ->
- replication_complete(DocId);
+ replication_complete(DocId),
+ State;
<<"error">> ->
- stop_replication(DocId);
+ stop_replication(DocId),
+ State;
<<"triggered">> ->
- maybe_start_replication(DocId, JsonRepDoc);
+ maybe_start_replication(State, DocId, JsonRepDoc);
undefined ->
- maybe_start_replication(DocId, JsonRepDoc);
- _ ->
- ?LOG_ERROR("Invalid value for the `_replication_state` property"
- " of the replication document `~s`", [DocId])
+ maybe_start_replication(State, DocId, JsonRepDoc)
- end,
- ok.
+ end.
rep_user_ctx({RepDoc}) ->
- case couch_util:get_value(<<"user_ctx">>, RepDoc) of
+ case get_value(<<"user_ctx">>, RepDoc) of
undefined ->
#user_ctx{roles = [<<"_admin">>]};
{UserCtx} ->
- name = couch_util:get_value(<<"name">>, UserCtx, null),
- roles = couch_util:get_value(<<"roles">>, UserCtx, [])
+ name = get_value(<<"name">>, UserCtx, null),
+ roles = get_value(<<"roles">>, UserCtx, [])
-maybe_start_replication(DocId, JsonRepDoc) ->
+maybe_start_replication(State, DocId, JsonRepDoc) ->
UserCtx = rep_user_ctx(JsonRepDoc),
{BaseId, _} = RepId = couch_rep:make_replication_id(JsonRepDoc, UserCtx),
- case ets:lookup(?REP_ID_TO_DOC_ID_MAP, BaseId) of
+ case ets:lookup(?REP_ID_TO_DOC_ID, BaseId) of
[] ->
- true = ets:insert(?REP_ID_TO_DOC_ID_MAP, {BaseId, DocId}),
- true = ets:insert(?DOC_TO_REP_ID_MAP, {DocId, RepId}),
- spawn_link(fun() -> start_replication(JsonRepDoc, RepId, UserCtx) end);
+ true = ets:insert(?REP_ID_TO_DOC_ID, {BaseId, DocId}),
+ true = ets:insert(?DOC_ID_TO_REP_ID, {DocId, RepId}),
+ Pid = spawn_link(fun() ->
+ start_replication(JsonRepDoc, RepId, UserCtx)
+ end),
+ State#state{rep_start_pids = [Pid | State#state.rep_start_pids]};
[{BaseId, DocId}] ->
- ok;
+ State;
[{BaseId, OtherDocId}] ->
- maybe_tag_rep_doc(DocId, JsonRepDoc, ?l2b(BaseId), OtherDocId)
+ ?LOG_INFO("The replication specified by the document `~s` was already"
+ " triggered by the document `~s`", [DocId, OtherDocId]),
+ maybe_tag_rep_doc(JsonRepDoc, ?l2b(BaseId)),
+ State
-maybe_tag_rep_doc(DocId, {Props} = JsonRepDoc, RepId, OtherDocId) ->
- case couch_util:get_value(<<"_replication_id">>, Props) of
+maybe_tag_rep_doc({Props} = JsonRepDoc, RepId) ->
+ case get_value(<<"_replication_id">>, Props) of
RepId ->
_ ->
- ?LOG_INFO("The replication specified by the document `~s` was already"
- " triggered by the document `~s`", [DocId, OtherDocId]),
couch_rep:update_rep_doc(JsonRepDoc, [{<<"_replication_id">>, RepId}])
-start_replication({RepProps} = RepDoc, {Base, Ext} = RepId, UserCtx) ->
+start_replication({RepProps} = RepDoc, {Base, _} = RepId, UserCtx) ->
case (catch couch_rep:start_replication(RepDoc, RepId, UserCtx)) of
- RepPid when is_pid(RepPid) ->
+ Pid when is_pid(Pid) ->
?LOG_INFO("Document `~s` triggered replication `~s`",
- [couch_util:get_value(<<"_id">>, RepProps), Base ++ Ext]),
- couch_rep:get_result(RepPid, RepId, RepDoc, UserCtx);
+ [get_value(<<"_id">>, RepProps), pp_rep_id(RepId)]),
+ couch_rep:get_result(Pid, RepId, RepDoc, UserCtx);
Error ->
@@ -269,43 +278,54 @@ start_replication({RepProps} = RepDoc, {Base, Ext} = RepId, UserCtx) ->
{<<"_replication_id">>, ?l2b(Base)}
- ?LOG_ERROR("Error starting replication `~s`: ~p", [Base ++ Ext, Error])
+ ?LOG_ERROR("Error starting replication `~s`: ~p", [pp_rep_id(RepId), Error])
rep_doc_deleted(DocId) ->
case stop_replication(DocId) of
- {ok, {Base, Ext}} ->
+ {ok, RepId} ->
?LOG_INFO("Stopped replication `~s` because replication document `~s`"
- " was deleted", [Base ++ Ext, DocId]);
+ " was deleted", [pp_rep_id(RepId), DocId]);
none ->
replication_complete(DocId) ->
case stop_replication(DocId) of
- {ok, {Base, Ext}} ->
+ {ok, RepId} ->
?LOG_INFO("Replication `~s` finished (triggered by document `~s`)",
- [Base ++ Ext, DocId]);
+ [pp_rep_id(RepId), DocId]);
none ->
stop_replication(DocId) ->
- case ets:lookup(?DOC_TO_REP_ID_MAP, DocId) of
+ case ets:lookup(?DOC_ID_TO_REP_ID, DocId) of
[{DocId, {BaseId, _} = RepId}] ->
- true = ets:delete(?REP_ID_TO_DOC_ID_MAP, BaseId),
- true = ets:delete(?DOC_TO_REP_ID_MAP, DocId),
+ true = ets:delete(?REP_ID_TO_DOC_ID, BaseId),
+ true = ets:delete(?DOC_ID_TO_REP_ID, DocId),
{ok, RepId};
[] ->
stop_all_replications() ->
+ ?LOG_INFO("Stopping all ongoing replications because the replicator DB "
+ "was deleted or changed", []),
fun({_, RepId}, _) -> couch_rep:end_replication(RepId) end,
- true = ets:delete_all_objects(?REP_ID_TO_DOC_ID_MAP),
- true = ets:delete_all_objects(?DOC_TO_REP_ID_MAP).
+ true = ets:delete_all_objects(?REP_ID_TO_DOC_ID),
+ true = ets:delete_all_objects(?DOC_ID_TO_REP_ID).
+% pretty-print replication id
+pp_rep_id({Base, Extension}) ->
+ Base ++ Extension.