diff options
author | Filipe David Borba Manana <fdmanana@apache.org> | 2011-01-24 14:10:21 +0000 |
---|---|---|
committer | Filipe David Borba Manana <fdmanana@apache.org> | 2011-01-24 14:10:21 +0000 |
commit | 7e354ef1c1b675daff9be1a842bad1489dd6df8e (patch) | |
tree | 5e0f42c79a7b961ef9d7bc594fc1be75c804d0d4 | |
parent | e434d8ae3f605a5e122c8e13c29caae27ae61ce1 (diff) |
Merge revision 1062783 from trunk
Replicator DB: on restart, make several attempts to restart the replications
Now on restart, the replicator database listener will make up to 10 attempts
to restart each replication. Before each attempt, it waits, using an exponential
backoff strategy, before doing the next attempt.
This is very useful because when one server restarts, other servers that are
endpoints of its replications, may not be online yet.
git-svn-id: https://svn.apache.org/repos/asf/couchdb/branches/1.1.x@1062784 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r-- | share/www/script/test/replicator_db.js | 13 | ||||
-rw-r--r-- | src/couchdb/couch_rep_db_listener.erl | 73 |
2 files changed, 70 insertions, 16 deletions
diff --git a/share/www/script/test/replicator_db.js b/share/www/script/test/replicator_db.js index 3c6a5d8e..277787b9 100644 --- a/share/www/script/test/replicator_db.js +++ b/share/www/script/test/replicator_db.js @@ -805,9 +805,16 @@ couchTests.replicator_db = function(debug) { restartServer(); continuous_replication_survives_restart(); - repDb.deleteDb(); - restartServer(); - run_on_modified_server(server_config, error_state_replication); +/* + * Disabled, since error state would be set on the document only after + * the exponential backoff retry done by the replicator database listener + * terminates, which takes too much time for a unit test. + */ +/* + * repDb.deleteDb(); + * restartServer(); + * run_on_modified_server(server_config, error_state_replication); + */ // cleanup diff --git a/src/couchdb/couch_rep_db_listener.erl b/src/couchdb/couch_rep_db_listener.erl index 4e0a929e..819f167d 100644 --- a/src/couchdb/couch_rep_db_listener.erl +++ b/src/couchdb/couch_rep_db_listener.erl @@ -20,6 +20,8 @@ -define(DOC_ID_TO_REP_ID, rep_doc_id_to_rep_id). -define(REP_ID_TO_DOC_ID, rep_id_to_rep_doc_id). +-define(MAX_RETRIES, 10). +-define(INITIAL_WAIT, 5). -record(state, { changes_feed_loop = nil, @@ -58,6 +60,29 @@ init(_) -> handle_call({rep_db_update, Change}, _From, State) -> {reply, ok, process_update(State, Change)}; +handle_call({triggered, {BaseId, _}}, _From, State) -> + case ets:lookup(?REP_ID_TO_DOC_ID, BaseId) of + [{BaseId, {DocId, true}}] -> + true = ets:insert(?REP_ID_TO_DOC_ID, {BaseId, {DocId, false}}); + _ -> + ok + end, + {reply, ok, State}; + +handle_call({restart_failure, {Props} = RepDoc, Error}, _From, State) -> + DocId = get_value(<<"_id">>, Props), + [{DocId, {BaseId, _} = RepId}] = ets:lookup(?DOC_ID_TO_REP_ID, DocId), + ?LOG_ERROR("Failed to start replication `~s` after ~p attempts using " + "the document `~s`. Last error reason was: ~p", + [pp_rep_id(RepId), ?MAX_RETRIES, DocId, Error]), + couch_rep:update_rep_doc( + RepDoc, + [{<<"_replication_state">>, <<"error">>}, + {<<"_replication_id">>, ?l2b(BaseId)}]), + true = ets:delete(?REP_ID_TO_DOC_ID, BaseId), + true = ets:delete(?DOC_ID_TO_REP_ID, DocId), + {reply, ok, State}; + handle_call(Msg, From, State) -> ?LOG_ERROR("Replicator DB listener received unexpected call ~p from ~p", [Msg, From]), @@ -239,18 +264,24 @@ maybe_start_replication(State, DocId, JsonRepDoc) -> {BaseId, _} = RepId = couch_rep:make_replication_id(JsonRepDoc, UserCtx), case ets:lookup(?REP_ID_TO_DOC_ID, BaseId) of [] -> - true = ets:insert(?REP_ID_TO_DOC_ID, {BaseId, DocId}), + true = ets:insert(?REP_ID_TO_DOC_ID, {BaseId, {DocId, true}}), true = ets:insert(?DOC_ID_TO_REP_ID, {DocId, RepId}), + Server = self(), Pid = spawn_link(fun() -> - start_replication(JsonRepDoc, RepId, UserCtx) + start_replication(Server, JsonRepDoc, RepId, UserCtx) end), State#state{rep_start_pids = [Pid | State#state.rep_start_pids]}; - [{BaseId, DocId}] -> + [{BaseId, {DocId, _}}] -> State; - [{BaseId, OtherDocId}] -> + [{BaseId, {OtherDocId, false}}] -> ?LOG_INFO("The replication specified by the document `~s` was already" " triggered by the document `~s`", [DocId, OtherDocId]), maybe_tag_rep_doc(JsonRepDoc, ?l2b(BaseId)), + State; + [{BaseId, {OtherDocId, true}}] -> + ?LOG_INFO("The replication specified by the document `~s` is already" + " being triggered by the document `~s`", [DocId, OtherDocId]), + maybe_tag_rep_doc(JsonRepDoc, ?l2b(BaseId)), State end. @@ -264,21 +295,37 @@ maybe_tag_rep_doc({Props} = JsonRepDoc, RepId) -> end. -start_replication({RepProps} = RepDoc, {Base, _} = RepId, UserCtx) -> +start_replication(Server, {RepProps} = RepDoc, RepId, UserCtx) -> case (catch couch_rep:start_replication(RepDoc, RepId, UserCtx)) of Pid when is_pid(Pid) -> ?LOG_INFO("Document `~s` triggered replication `~s`", [get_value(<<"_id">>, RepProps), pp_rep_id(RepId)]), + ok = gen_server:call(Server, {triggered, RepId}, infinity), couch_rep:get_result(Pid, RepId, RepDoc, UserCtx); Error -> - couch_rep:update_rep_doc( - RepDoc, - [ - {<<"_replication_state">>, <<"error">>}, - {<<"_replication_id">>, ?l2b(Base)} - ] - ), - ?LOG_ERROR("Error starting replication `~s`: ~p", [pp_rep_id(RepId), Error]) + keep_retrying( + Server, RepId, RepDoc, UserCtx, Error, ?INITIAL_WAIT, ?MAX_RETRIES) + end. + + +keep_retrying(Server, _RepId, RepDoc, _UserCtx, Error, _Wait, 0) -> + ok = gen_server:call(Server, {restart_failure, RepDoc, Error}, infinity); + +keep_retrying(Server, RepId, RepDoc, UserCtx, Error, Wait, RetriesLeft) -> + ?LOG_ERROR("Error starting replication `~s`: ~p. " + "Retrying in ~p seconds", [pp_rep_id(RepId), Error, Wait]), + ok = timer:sleep(Wait * 1000), + case (catch couch_rep:start_replication(RepDoc, RepId, UserCtx)) of + Pid when is_pid(Pid) -> + ok = gen_server:call(Server, {triggered, RepId}, infinity), + {RepProps} = RepDoc, + ?LOG_INFO("Document `~s` triggered replication `~s` after ~p attempts", + [get_value(<<"_id">>, RepProps), pp_rep_id(RepId), + ?MAX_RETRIES - RetriesLeft + 1]), + couch_rep:get_result(Pid, RepId, RepDoc, UserCtx); + NewError -> + keep_retrying( + Server, RepId, RepDoc, UserCtx, NewError, Wait * 2, RetriesLeft - 1) end. |