diff options
-rw-r--r-- | share/www/script/test/replicator_db.js | 13 | ||||
-rw-r--r-- | src/couchdb/couch_rep_db_listener.erl | 73 |
2 files changed, 70 insertions, 16 deletions
diff --git a/share/www/script/test/replicator_db.js b/share/www/script/test/replicator_db.js index 3c6a5d8e..277787b9 100644 --- a/share/www/script/test/replicator_db.js +++ b/share/www/script/test/replicator_db.js @@ -805,9 +805,16 @@ couchTests.replicator_db = function(debug) { restartServer(); continuous_replication_survives_restart(); - repDb.deleteDb(); - restartServer(); - run_on_modified_server(server_config, error_state_replication); +/* + * Disabled, since error state would be set on the document only after + * the exponential backoff retry done by the replicator database listener + * terminates, which takes too much time for a unit test. + */ +/* + * repDb.deleteDb(); + * restartServer(); + * run_on_modified_server(server_config, error_state_replication); + */ // cleanup diff --git a/src/couchdb/couch_rep_db_listener.erl b/src/couchdb/couch_rep_db_listener.erl index 4e0a929e..819f167d 100644 --- a/src/couchdb/couch_rep_db_listener.erl +++ b/src/couchdb/couch_rep_db_listener.erl @@ -20,6 +20,8 @@ -define(DOC_ID_TO_REP_ID, rep_doc_id_to_rep_id). -define(REP_ID_TO_DOC_ID, rep_id_to_rep_doc_id). +-define(MAX_RETRIES, 10). +-define(INITIAL_WAIT, 5). -record(state, { changes_feed_loop = nil, @@ -58,6 +60,29 @@ init(_) -> handle_call({rep_db_update, Change}, _From, State) -> {reply, ok, process_update(State, Change)}; +handle_call({triggered, {BaseId, _}}, _From, State) -> + case ets:lookup(?REP_ID_TO_DOC_ID, BaseId) of + [{BaseId, {DocId, true}}] -> + true = ets:insert(?REP_ID_TO_DOC_ID, {BaseId, {DocId, false}}); + _ -> + ok + end, + {reply, ok, State}; + +handle_call({restart_failure, {Props} = RepDoc, Error}, _From, State) -> + DocId = get_value(<<"_id">>, Props), + [{DocId, {BaseId, _} = RepId}] = ets:lookup(?DOC_ID_TO_REP_ID, DocId), + ?LOG_ERROR("Failed to start replication `~s` after ~p attempts using " + "the document `~s`. Last error reason was: ~p", + [pp_rep_id(RepId), ?MAX_RETRIES, DocId, Error]), + couch_rep:update_rep_doc( + RepDoc, + [{<<"_replication_state">>, <<"error">>}, + {<<"_replication_id">>, ?l2b(BaseId)}]), + true = ets:delete(?REP_ID_TO_DOC_ID, BaseId), + true = ets:delete(?DOC_ID_TO_REP_ID, DocId), + {reply, ok, State}; + handle_call(Msg, From, State) -> ?LOG_ERROR("Replicator DB listener received unexpected call ~p from ~p", [Msg, From]), @@ -239,18 +264,24 @@ maybe_start_replication(State, DocId, JsonRepDoc) -> {BaseId, _} = RepId = couch_rep:make_replication_id(JsonRepDoc, UserCtx), case ets:lookup(?REP_ID_TO_DOC_ID, BaseId) of [] -> - true = ets:insert(?REP_ID_TO_DOC_ID, {BaseId, DocId}), + true = ets:insert(?REP_ID_TO_DOC_ID, {BaseId, {DocId, true}}), true = ets:insert(?DOC_ID_TO_REP_ID, {DocId, RepId}), + Server = self(), Pid = spawn_link(fun() -> - start_replication(JsonRepDoc, RepId, UserCtx) + start_replication(Server, JsonRepDoc, RepId, UserCtx) end), State#state{rep_start_pids = [Pid | State#state.rep_start_pids]}; - [{BaseId, DocId}] -> + [{BaseId, {DocId, _}}] -> State; - [{BaseId, OtherDocId}] -> + [{BaseId, {OtherDocId, false}}] -> ?LOG_INFO("The replication specified by the document `~s` was already" " triggered by the document `~s`", [DocId, OtherDocId]), maybe_tag_rep_doc(JsonRepDoc, ?l2b(BaseId)), + State; + [{BaseId, {OtherDocId, true}}] -> + ?LOG_INFO("The replication specified by the document `~s` is already" + " being triggered by the document `~s`", [DocId, OtherDocId]), + maybe_tag_rep_doc(JsonRepDoc, ?l2b(BaseId)), State end. @@ -264,21 +295,37 @@ maybe_tag_rep_doc({Props} = JsonRepDoc, RepId) -> end. -start_replication({RepProps} = RepDoc, {Base, _} = RepId, UserCtx) -> +start_replication(Server, {RepProps} = RepDoc, RepId, UserCtx) -> case (catch couch_rep:start_replication(RepDoc, RepId, UserCtx)) of Pid when is_pid(Pid) -> ?LOG_INFO("Document `~s` triggered replication `~s`", [get_value(<<"_id">>, RepProps), pp_rep_id(RepId)]), + ok = gen_server:call(Server, {triggered, RepId}, infinity), couch_rep:get_result(Pid, RepId, RepDoc, UserCtx); Error -> - couch_rep:update_rep_doc( - RepDoc, - [ - {<<"_replication_state">>, <<"error">>}, - {<<"_replication_id">>, ?l2b(Base)} - ] - ), - ?LOG_ERROR("Error starting replication `~s`: ~p", [pp_rep_id(RepId), Error]) + keep_retrying( + Server, RepId, RepDoc, UserCtx, Error, ?INITIAL_WAIT, ?MAX_RETRIES) + end. + + +keep_retrying(Server, _RepId, RepDoc, _UserCtx, Error, _Wait, 0) -> + ok = gen_server:call(Server, {restart_failure, RepDoc, Error}, infinity); + +keep_retrying(Server, RepId, RepDoc, UserCtx, Error, Wait, RetriesLeft) -> + ?LOG_ERROR("Error starting replication `~s`: ~p. " + "Retrying in ~p seconds", [pp_rep_id(RepId), Error, Wait]), + ok = timer:sleep(Wait * 1000), + case (catch couch_rep:start_replication(RepDoc, RepId, UserCtx)) of + Pid when is_pid(Pid) -> + ok = gen_server:call(Server, {triggered, RepId}, infinity), + {RepProps} = RepDoc, + ?LOG_INFO("Document `~s` triggered replication `~s` after ~p attempts", + [get_value(<<"_id">>, RepProps), pp_rep_id(RepId), + ?MAX_RETRIES - RetriesLeft + 1]), + couch_rep:get_result(Pid, RepId, RepDoc, UserCtx); + NewError -> + keep_retrying( + Server, RepId, RepDoc, UserCtx, NewError, Wait * 2, RetriesLeft - 1) end. |