author     Filipe David Borba Manana <fdmanana@apache.org>   2011-01-24 14:10:21 +0000
committer  Filipe David Borba Manana <fdmanana@apache.org>   2011-01-24 14:10:21 +0000
commit     7e354ef1c1b675daff9be1a842bad1489dd6df8e (patch)
tree       5e0f42c79a7b961ef9d7bc594fc1be75c804d0d4
parent     e434d8ae3f605a5e122c8e13c29caae27ae61ce1 (diff)
Merge revision 1062783 from trunk
Replicator DB: on restart, make several attempts to restart the replications

On restart, the replicator database listener now makes up to 10 attempts to restart each replication, waiting between attempts according to an exponential backoff strategy. This is very useful because when one server restarts, other servers that are endpoints of its replications may not be online yet.

git-svn-id: https://svn.apache.org/repos/asf/couchdb/branches/1.1.x@1062784 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--  share/www/script/test/replicator_db.js    13
-rw-r--r--  src/couchdb/couch_rep_db_listener.erl      73
2 files changed, 70 insertions, 16 deletions
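
For context before reading the diff: the patch defines ?INITIAL_WAIT as 5 seconds and ?MAX_RETRIES as 10, and doubles the wait before every retry. The standalone Erlang sketch below (not part of the patch; module and function names are invented for illustration) simply computes the retry schedule those two constants imply and its worst-case total delay.

    %% backoff_sketch.erl -- illustrative only, not CouchDB code.
    %% Computes the wait schedule implied by ?INITIAL_WAIT = 5 and ?MAX_RETRIES = 10.
    -module(backoff_sketch).
    -export([schedule/0, total_wait/0]).

    %% Seconds slept before each of the 10 retry attempts:
    %% [5, 10, 20, 40, 80, 160, 320, 640, 1280, 2560]
    schedule() ->
        [5 bsl N || N <- lists:seq(0, 9)].

    %% Worst-case total sleep time: 5 * (2^10 - 1) = 5115 seconds (~85 minutes).
    total_wait() ->
        lists:sum(schedule()).

In the worst case the document only gets its _replication_state set to "error" after roughly 85 minutes, which is why the error_state_replication check in replicator_db.js is commented out in the first hunk below.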
diff --git a/share/www/script/test/replicator_db.js b/share/www/script/test/replicator_db.js
index 3c6a5d8e..277787b9 100644
--- a/share/www/script/test/replicator_db.js
+++ b/share/www/script/test/replicator_db.js
@@ -805,9 +805,16 @@ couchTests.replicator_db = function(debug) {
restartServer();
continuous_replication_survives_restart();
- repDb.deleteDb();
- restartServer();
- run_on_modified_server(server_config, error_state_replication);
+/*
+ * Disabled, since error state would be set on the document only after
+ * the exponential backoff retry done by the replicator database listener
+ * terminates, which takes too much time for a unit test.
+ */
+/*
+ * repDb.deleteDb();
+ * restartServer();
+ * run_on_modified_server(server_config, error_state_replication);
+ */
// cleanup
diff --git a/src/couchdb/couch_rep_db_listener.erl b/src/couchdb/couch_rep_db_listener.erl
index 4e0a929e..819f167d 100644
--- a/src/couchdb/couch_rep_db_listener.erl
+++ b/src/couchdb/couch_rep_db_listener.erl
@@ -20,6 +20,8 @@
-define(DOC_ID_TO_REP_ID, rep_doc_id_to_rep_id).
-define(REP_ID_TO_DOC_ID, rep_id_to_rep_doc_id).
+-define(MAX_RETRIES, 10).
+-define(INITIAL_WAIT, 5).
-record(state, {
changes_feed_loop = nil,
@@ -58,6 +60,29 @@ init(_) ->
handle_call({rep_db_update, Change}, _From, State) ->
{reply, ok, process_update(State, Change)};
+handle_call({triggered, {BaseId, _}}, _From, State) ->
+ case ets:lookup(?REP_ID_TO_DOC_ID, BaseId) of
+ [{BaseId, {DocId, true}}] ->
+ true = ets:insert(?REP_ID_TO_DOC_ID, {BaseId, {DocId, false}});
+ _ ->
+ ok
+ end,
+ {reply, ok, State};
+
+handle_call({restart_failure, {Props} = RepDoc, Error}, _From, State) ->
+ DocId = get_value(<<"_id">>, Props),
+ [{DocId, {BaseId, _} = RepId}] = ets:lookup(?DOC_ID_TO_REP_ID, DocId),
+ ?LOG_ERROR("Failed to start replication `~s` after ~p attempts using "
+ "the document `~s`. Last error reason was: ~p",
+ [pp_rep_id(RepId), ?MAX_RETRIES, DocId, Error]),
+ couch_rep:update_rep_doc(
+ RepDoc,
+ [{<<"_replication_state">>, <<"error">>},
+ {<<"_replication_id">>, ?l2b(BaseId)}]),
+ true = ets:delete(?REP_ID_TO_DOC_ID, BaseId),
+ true = ets:delete(?DOC_ID_TO_REP_ID, DocId),
+ {reply, ok, State};
+
handle_call(Msg, From, State) ->
?LOG_ERROR("Replicator DB listener received unexpected call ~p from ~p",
[Msg, From]),
@@ -239,18 +264,24 @@ maybe_start_replication(State, DocId, JsonRepDoc) ->
{BaseId, _} = RepId = couch_rep:make_replication_id(JsonRepDoc, UserCtx),
case ets:lookup(?REP_ID_TO_DOC_ID, BaseId) of
[] ->
- true = ets:insert(?REP_ID_TO_DOC_ID, {BaseId, DocId}),
+ true = ets:insert(?REP_ID_TO_DOC_ID, {BaseId, {DocId, true}}),
true = ets:insert(?DOC_ID_TO_REP_ID, {DocId, RepId}),
+ Server = self(),
Pid = spawn_link(fun() ->
- start_replication(JsonRepDoc, RepId, UserCtx)
+ start_replication(Server, JsonRepDoc, RepId, UserCtx)
end),
State#state{rep_start_pids = [Pid | State#state.rep_start_pids]};
- [{BaseId, DocId}] ->
+ [{BaseId, {DocId, _}}] ->
State;
- [{BaseId, OtherDocId}] ->
+ [{BaseId, {OtherDocId, false}}] ->
?LOG_INFO("The replication specified by the document `~s` was already"
" triggered by the document `~s`", [DocId, OtherDocId]),
maybe_tag_rep_doc(JsonRepDoc, ?l2b(BaseId)),
+ State;
+ [{BaseId, {OtherDocId, true}}] ->
+ ?LOG_INFO("The replication specified by the document `~s` is already"
+ " being triggered by the document `~s`", [DocId, OtherDocId]),
+ maybe_tag_rep_doc(JsonRepDoc, ?l2b(BaseId)),
State
end.
@@ -264,21 +295,37 @@ maybe_tag_rep_doc({Props} = JsonRepDoc, RepId) ->
end.
-start_replication({RepProps} = RepDoc, {Base, _} = RepId, UserCtx) ->
+start_replication(Server, {RepProps} = RepDoc, RepId, UserCtx) ->
case (catch couch_rep:start_replication(RepDoc, RepId, UserCtx)) of
Pid when is_pid(Pid) ->
?LOG_INFO("Document `~s` triggered replication `~s`",
[get_value(<<"_id">>, RepProps), pp_rep_id(RepId)]),
+ ok = gen_server:call(Server, {triggered, RepId}, infinity),
couch_rep:get_result(Pid, RepId, RepDoc, UserCtx);
Error ->
- couch_rep:update_rep_doc(
- RepDoc,
- [
- {<<"_replication_state">>, <<"error">>},
- {<<"_replication_id">>, ?l2b(Base)}
- ]
- ),
- ?LOG_ERROR("Error starting replication `~s`: ~p", [pp_rep_id(RepId), Error])
+ keep_retrying(
+ Server, RepId, RepDoc, UserCtx, Error, ?INITIAL_WAIT, ?MAX_RETRIES)
+ end.
+
+
+keep_retrying(Server, _RepId, RepDoc, _UserCtx, Error, _Wait, 0) ->
+ ok = gen_server:call(Server, {restart_failure, RepDoc, Error}, infinity);
+
+keep_retrying(Server, RepId, RepDoc, UserCtx, Error, Wait, RetriesLeft) ->
+ ?LOG_ERROR("Error starting replication `~s`: ~p. "
+ "Retrying in ~p seconds", [pp_rep_id(RepId), Error, Wait]),
+ ok = timer:sleep(Wait * 1000),
+ case (catch couch_rep:start_replication(RepDoc, RepId, UserCtx)) of
+ Pid when is_pid(Pid) ->
+ ok = gen_server:call(Server, {triggered, RepId}, infinity),
+ {RepProps} = RepDoc,
+ ?LOG_INFO("Document `~s` triggered replication `~s` after ~p attempts",
+ [get_value(<<"_id">>, RepProps), pp_rep_id(RepId),
+ ?MAX_RETRIES - RetriesLeft + 1]),
+ couch_rep:get_result(Pid, RepId, RepDoc, UserCtx);
+ NewError ->
+ keep_retrying(
+ Server, RepId, RepDoc, UserCtx, NewError, Wait * 2, RetriesLeft - 1)
end.
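
To summarize the control flow the patch introduces: the listener spawns a starter process per replication document, and the starter reports back over gen_server:call, either {triggered, RepId} once couch_rep:start_replication/3 succeeds, or {restart_failure, RepDoc, Error} once the retries are exhausted, at which point the listener records the error state on the document. A minimal, self-contained sketch of that shape (generic names, not the actual CouchDB modules; StartFun stands in for couch_rep:start_replication/3) might look like:

    %% retry_sketch.erl -- illustrative only, not CouchDB code.
    -module(retry_sketch).
    -export([start_with_retries/4]).

    start_with_retries(Listener, RepId, StartFun, MaxRetries) ->
        case catch StartFun() of
            Pid when is_pid(Pid) ->
                %% Success on the first attempt: tell the listener right away.
                ok = gen_server:call(Listener, {triggered, RepId}, infinity);
            Error ->
                %% 5 = initial wait in seconds (?INITIAL_WAIT in the patch).
                keep_retrying(Listener, RepId, StartFun, Error, 5, MaxRetries)
        end.

    keep_retrying(Listener, RepId, _StartFun, LastError, _Wait, 0) ->
        %% Out of attempts: let the listener record the error state.
        %% (The patch passes the replication document here; the sketch passes the id.)
        ok = gen_server:call(Listener, {restart_failure, RepId, LastError}, infinity);
    keep_retrying(Listener, RepId, StartFun, _LastError, Wait, Left) ->
        ok = timer:sleep(Wait * 1000),
        case catch StartFun() of
            Pid when is_pid(Pid) ->
                ok = gen_server:call(Listener, {triggered, RepId}, infinity);
            Error ->
                keep_retrying(Listener, RepId, StartFun, Error, Wait * 2, Left - 1)
        end.

The (DocId, true) flag kept in the ?REP_ID_TO_DOC_ID table while the starter is still retrying is what lets maybe_start_replication distinguish a replication that is "already being triggered" from one that has already been triggered.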