From 7e354ef1c1b675daff9be1a842bad1489dd6df8e Mon Sep 17 00:00:00 2001
From: Filipe David Borba Manana
Date: Mon, 24 Jan 2011 14:10:21 +0000
Subject: Merge revision 1062783 from trunk

Replicator DB: on restart, make several attempts to restart the replications

On restart, the replicator database listener now makes up to 10 attempts to
restart each replication, waiting before each attempt according to an
exponential backoff strategy. This is very useful because when one server
restarts, other servers that are endpoints of its replications may not be
online yet.

git-svn-id: https://svn.apache.org/repos/asf/couchdb/branches/1.1.x@1062784 13f79535-47bb-0310-9956-ffa450edef68
---
 share/www/script/test/replicator_db.js | 13 ++++--
 src/couchdb/couch_rep_db_listener.erl  | 73 ++++++++++++++++++++++++++++------
 2 files changed, 70 insertions(+), 16 deletions(-)

diff --git a/share/www/script/test/replicator_db.js b/share/www/script/test/replicator_db.js
index 3c6a5d8e..277787b9 100644
--- a/share/www/script/test/replicator_db.js
+++ b/share/www/script/test/replicator_db.js
@@ -805,9 +805,16 @@ couchTests.replicator_db = function(debug) {
   restartServer();
   continuous_replication_survives_restart();
 
-  repDb.deleteDb();
-  restartServer();
-  run_on_modified_server(server_config, error_state_replication);
+/*
+ * Disabled, since the error state would be set on the document only after
+ * the exponential backoff retry done by the replicator database listener
+ * terminates, which takes too much time for a unit test.
+ */
+/*
+ * repDb.deleteDb();
+ * restartServer();
+ * run_on_modified_server(server_config, error_state_replication);
+ */
 
   // cleanup
diff --git a/src/couchdb/couch_rep_db_listener.erl b/src/couchdb/couch_rep_db_listener.erl
index 4e0a929e..819f167d 100644
--- a/src/couchdb/couch_rep_db_listener.erl
+++ b/src/couchdb/couch_rep_db_listener.erl
@@ -20,6 +20,8 @@
 -define(DOC_ID_TO_REP_ID, rep_doc_id_to_rep_id).
 -define(REP_ID_TO_DOC_ID, rep_id_to_rep_doc_id).
 
+-define(MAX_RETRIES, 10).
+-define(INITIAL_WAIT, 5).
 
 -record(state, {
     changes_feed_loop = nil,
@@ -58,6 +60,29 @@ init(_) ->
 handle_call({rep_db_update, Change}, _From, State) ->
     {reply, ok, process_update(State, Change)};
 
+handle_call({triggered, {BaseId, _}}, _From, State) ->
+    case ets:lookup(?REP_ID_TO_DOC_ID, BaseId) of
+    [{BaseId, {DocId, true}}] ->
+        true = ets:insert(?REP_ID_TO_DOC_ID, {BaseId, {DocId, false}});
+    _ ->
+        ok
+    end,
+    {reply, ok, State};
+
+handle_call({restart_failure, {Props} = RepDoc, Error}, _From, State) ->
+    DocId = get_value(<<"_id">>, Props),
+    [{DocId, {BaseId, _} = RepId}] = ets:lookup(?DOC_ID_TO_REP_ID, DocId),
+    ?LOG_ERROR("Failed to start replication `~s` after ~p attempts using "
+        "the document `~s`. Last error reason was: ~p",
+        [pp_rep_id(RepId), ?MAX_RETRIES, DocId, Error]),
+    couch_rep:update_rep_doc(
+        RepDoc,
+        [{<<"_replication_state">>, <<"error">>},
+            {<<"_replication_id">>, ?l2b(BaseId)}]),
+    true = ets:delete(?REP_ID_TO_DOC_ID, BaseId),
+    true = ets:delete(?DOC_ID_TO_REP_ID, DocId),
+    {reply, ok, State};
+
 handle_call(Msg, From, State) ->
     ?LOG_ERROR("Replicator DB listener received unexpected call ~p from ~p",
         [Msg, From]),
@@ -239,18 +264,24 @@ maybe_start_replication(State, DocId, JsonRepDoc) ->
     {BaseId, _} = RepId = couch_rep:make_replication_id(JsonRepDoc, UserCtx),
     case ets:lookup(?REP_ID_TO_DOC_ID, BaseId) of
     [] ->
-        true = ets:insert(?REP_ID_TO_DOC_ID, {BaseId, DocId}),
+        true = ets:insert(?REP_ID_TO_DOC_ID, {BaseId, {DocId, true}}),
         true = ets:insert(?DOC_ID_TO_REP_ID, {DocId, RepId}),
+        Server = self(),
         Pid = spawn_link(fun() ->
-            start_replication(JsonRepDoc, RepId, UserCtx)
+            start_replication(Server, JsonRepDoc, RepId, UserCtx)
         end),
         State#state{rep_start_pids = [Pid | State#state.rep_start_pids]};
-    [{BaseId, DocId}] ->
+    [{BaseId, {DocId, _}}] ->
         State;
-    [{BaseId, OtherDocId}] ->
+    [{BaseId, {OtherDocId, false}}] ->
         ?LOG_INFO("The replication specified by the document `~s` was already"
             " triggered by the document `~s`", [DocId, OtherDocId]),
         maybe_tag_rep_doc(JsonRepDoc, ?l2b(BaseId)),
+        State;
+    [{BaseId, {OtherDocId, true}}] ->
+        ?LOG_INFO("The replication specified by the document `~s` is already"
+            " being triggered by the document `~s`", [DocId, OtherDocId]),
+        maybe_tag_rep_doc(JsonRepDoc, ?l2b(BaseId)),
         State
     end.
 
@@ -264,21 +295,37 @@ maybe_tag_rep_doc({Props} = JsonRepDoc, RepId) ->
     end.
 
 
-start_replication({RepProps} = RepDoc, {Base, _} = RepId, UserCtx) ->
+start_replication(Server, {RepProps} = RepDoc, RepId, UserCtx) ->
     case (catch couch_rep:start_replication(RepDoc, RepId, UserCtx)) of
     Pid when is_pid(Pid) ->
         ?LOG_INFO("Document `~s` triggered replication `~s`",
             [get_value(<<"_id">>, RepProps), pp_rep_id(RepId)]),
+        ok = gen_server:call(Server, {triggered, RepId}, infinity),
         couch_rep:get_result(Pid, RepId, RepDoc, UserCtx);
     Error ->
-        couch_rep:update_rep_doc(
-            RepDoc,
-            [
-                {<<"_replication_state">>, <<"error">>},
-                {<<"_replication_id">>, ?l2b(Base)}
-            ]
-        ),
-        ?LOG_ERROR("Error starting replication `~s`: ~p", [pp_rep_id(RepId), Error])
+        keep_retrying(
+            Server, RepId, RepDoc, UserCtx, Error, ?INITIAL_WAIT, ?MAX_RETRIES)
+    end.
+
+
+keep_retrying(Server, _RepId, RepDoc, _UserCtx, Error, _Wait, 0) ->
+    ok = gen_server:call(Server, {restart_failure, RepDoc, Error}, infinity);
+
+keep_retrying(Server, RepId, RepDoc, UserCtx, Error, Wait, RetriesLeft) ->
+    ?LOG_ERROR("Error starting replication `~s`: ~p. "
+        "Retrying in ~p seconds", [pp_rep_id(RepId), Error, Wait]),
+    ok = timer:sleep(Wait * 1000),
+    case (catch couch_rep:start_replication(RepDoc, RepId, UserCtx)) of
+    Pid when is_pid(Pid) ->
+        ok = gen_server:call(Server, {triggered, RepId}, infinity),
+        {RepProps} = RepDoc,
+        ?LOG_INFO("Document `~s` triggered replication `~s` after ~p attempts",
+            [get_value(<<"_id">>, RepProps), pp_rep_id(RepId),
+                ?MAX_RETRIES - RetriesLeft + 1]),
+        couch_rep:get_result(Pid, RepId, RepDoc, UserCtx);
+    NewError ->
+        keep_retrying(
+            Server, RepId, RepDoc, UserCtx, NewError, Wait * 2, RetriesLeft - 1)
     end.
--
cgit v1.2.3
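
The patch's real implementation is keep_retrying/7 in the diff above; what follows is only a stripped-down sketch of the same sleep-try-double pattern, written for this note and not taken from CouchDB. The module name backoff_sketch, the retry/3 function and the {ok, _} / {error, _} return convention assumed for the retried fun are all illustrative.

-module(backoff_sketch).
-export([retry/3]).

%% Make up to the given number of attempts to run Fun, sleeping WaitSecs
%% seconds before the first attempt and doubling the wait before each
%% further attempt, like keep_retrying/7 does for couch_rep.
retry(_Fun, _WaitSecs, 0) ->
    %% No attempts left: give up and report the failure to the caller.
    {error, too_many_attempts};
retry(Fun, WaitSecs, AttemptsLeft) ->
    ok = timer:sleep(WaitSecs * 1000),
    case Fun() of
    {ok, Result} ->
        {ok, Result};
    {error, _Reason} ->
        %% Failed attempt: retry with twice the wait (exponential backoff).
        retry(Fun, WaitSecs * 2, AttemptsLeft - 1)
    end.

%% Example: retry(fun() -> {error, not_ready} end, 5, 10) gives up after ten
%% failed attempts, having slept 5 + 10 + ... + 2560 seconds along the way.

With the constants the patch introduces (?INITIAL_WAIT of 5 seconds, ?MAX_RETRIES of 10, and the wait doubled on every attempt), a replication whose endpoint never comes back accumulates 5 + 10 + ... + 2560 = 5115 seconds, roughly 85 minutes, of waiting before the restart_failure path marks the document with the error state. That is also why the error_state_replication test is disabled in the JavaScript hunk above.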