path: root/src/couchdb/couch_rep_db_listener.erl
diff options
Diffstat (limited to 'src/couchdb/couch_rep_db_listener.erl')
1 files changed, 60 insertions, 13 deletions
diff --git a/src/couchdb/couch_rep_db_listener.erl b/src/couchdb/couch_rep_db_listener.erl
index 4e0a929e..819f167d 100644
--- a/src/couchdb/couch_rep_db_listener.erl
+++ b/src/couchdb/couch_rep_db_listener.erl
@@ -20,6 +20,8 @@
-define(DOC_ID_TO_REP_ID, rep_doc_id_to_rep_id).
-define(REP_ID_TO_DOC_ID, rep_id_to_rep_doc_id).
+-define(MAX_RETRIES, 10).
+-define(INITIAL_WAIT, 5).
-record(state, {
changes_feed_loop = nil,
@@ -58,6 +60,29 @@ init(_) ->
handle_call({rep_db_update, Change}, _From, State) ->
{reply, ok, process_update(State, Change)};
+handle_call({triggered, {BaseId, _}}, _From, State) ->
+ case ets:lookup(?REP_ID_TO_DOC_ID, BaseId) of
+ [{BaseId, {DocId, true}}] ->
+ true = ets:insert(?REP_ID_TO_DOC_ID, {BaseId, {DocId, false}});
+ _ ->
+ ok
+ end,
+ {reply, ok, State};
+handle_call({restart_failure, {Props} = RepDoc, Error}, _From, State) ->
+ DocId = get_value(<<"_id">>, Props),
+ [{DocId, {BaseId, _} = RepId}] = ets:lookup(?DOC_ID_TO_REP_ID, DocId),
+ ?LOG_ERROR("Failed to start replication `~s` after ~p attempts using "
+ "the document `~s`. Last error reason was: ~p",
+ [pp_rep_id(RepId), ?MAX_RETRIES, DocId, Error]),
+ couch_rep:update_rep_doc(
+ RepDoc,
+ [{<<"_replication_state">>, <<"error">>},
+ {<<"_replication_id">>, ?l2b(BaseId)}]),
+ true = ets:delete(?REP_ID_TO_DOC_ID, BaseId),
+ true = ets:delete(?DOC_ID_TO_REP_ID, DocId),
+ {reply, ok, State};
handle_call(Msg, From, State) ->
?LOG_ERROR("Replicator DB listener received unexpected call ~p from ~p",
[Msg, From]),
@@ -239,18 +264,24 @@ maybe_start_replication(State, DocId, JsonRepDoc) ->
{BaseId, _} = RepId = couch_rep:make_replication_id(JsonRepDoc, UserCtx),
case ets:lookup(?REP_ID_TO_DOC_ID, BaseId) of
[] ->
- true = ets:insert(?REP_ID_TO_DOC_ID, {BaseId, DocId}),
+ true = ets:insert(?REP_ID_TO_DOC_ID, {BaseId, {DocId, true}}),
true = ets:insert(?DOC_ID_TO_REP_ID, {DocId, RepId}),
+ Server = self(),
Pid = spawn_link(fun() ->
- start_replication(JsonRepDoc, RepId, UserCtx)
+ start_replication(Server, JsonRepDoc, RepId, UserCtx)
State#state{rep_start_pids = [Pid | State#state.rep_start_pids]};
- [{BaseId, DocId}] ->
+ [{BaseId, {DocId, _}}] ->
- [{BaseId, OtherDocId}] ->
+ [{BaseId, {OtherDocId, false}}] ->
?LOG_INFO("The replication specified by the document `~s` was already"
" triggered by the document `~s`", [DocId, OtherDocId]),
maybe_tag_rep_doc(JsonRepDoc, ?l2b(BaseId)),
+ State;
+ [{BaseId, {OtherDocId, true}}] ->
+ ?LOG_INFO("The replication specified by the document `~s` is already"
+ " being triggered by the document `~s`", [DocId, OtherDocId]),
+ maybe_tag_rep_doc(JsonRepDoc, ?l2b(BaseId)),
@@ -264,21 +295,37 @@ maybe_tag_rep_doc({Props} = JsonRepDoc, RepId) ->
-start_replication({RepProps} = RepDoc, {Base, _} = RepId, UserCtx) ->
+start_replication(Server, {RepProps} = RepDoc, RepId, UserCtx) ->
case (catch couch_rep:start_replication(RepDoc, RepId, UserCtx)) of
Pid when is_pid(Pid) ->
?LOG_INFO("Document `~s` triggered replication `~s`",
[get_value(<<"_id">>, RepProps), pp_rep_id(RepId)]),
+ ok = gen_server:call(Server, {triggered, RepId}, infinity),
couch_rep:get_result(Pid, RepId, RepDoc, UserCtx);
Error ->
- couch_rep:update_rep_doc(
- RepDoc,
- [
- {<<"_replication_state">>, <<"error">>},
- {<<"_replication_id">>, ?l2b(Base)}
- ]
- ),
- ?LOG_ERROR("Error starting replication `~s`: ~p", [pp_rep_id(RepId), Error])
+ keep_retrying(
+ Server, RepId, RepDoc, UserCtx, Error, ?INITIAL_WAIT, ?MAX_RETRIES)
+ end.
+keep_retrying(Server, _RepId, RepDoc, _UserCtx, Error, _Wait, 0) ->
+ ok = gen_server:call(Server, {restart_failure, RepDoc, Error}, infinity);
+keep_retrying(Server, RepId, RepDoc, UserCtx, Error, Wait, RetriesLeft) ->
+ ?LOG_ERROR("Error starting replication `~s`: ~p. "
+ "Retrying in ~p seconds", [pp_rep_id(RepId), Error, Wait]),
+ ok = timer:sleep(Wait * 1000),
+ case (catch couch_rep:start_replication(RepDoc, RepId, UserCtx)) of
+ Pid when is_pid(Pid) ->
+ ok = gen_server:call(Server, {triggered, RepId}, infinity),
+ {RepProps} = RepDoc,
+ ?LOG_INFO("Document `~s` triggered replication `~s` after ~p attempts",
+ [get_value(<<"_id">>, RepProps), pp_rep_id(RepId),
+ ?MAX_RETRIES - RetriesLeft + 1]),
+ couch_rep:get_result(Pid, RepId, RepDoc, UserCtx);
+ NewError ->
+ keep_retrying(
+ Server, RepId, RepDoc, UserCtx, NewError, Wait * 2, RetriesLeft - 1)