summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/couchdb/couch_rep_db_listener.erl55
1 files changed, 31 insertions, 24 deletions
diff --git a/src/couchdb/couch_rep_db_listener.erl b/src/couchdb/couch_rep_db_listener.erl
index 819f167d..89a18e85 100644
--- a/src/couchdb/couch_rep_db_listener.erl
+++ b/src/couchdb/couch_rep_db_listener.erl
@@ -20,14 +20,14 @@
-define(DOC_ID_TO_REP_ID, rep_doc_id_to_rep_id).
-define(REP_ID_TO_DOC_ID, rep_id_to_rep_doc_id).
--define(MAX_RETRIES, 10).
-define(INITIAL_WAIT, 5).
-record(state, {
changes_feed_loop = nil,
db_notifier = nil,
rep_db_name = nil,
- rep_start_pids = []
+ rep_start_pids = [],
+ max_retries
}).
-import(couch_util, [
@@ -41,40 +41,42 @@ start_link() ->
init(_) ->
process_flag(trap_exit, true),
- ?DOC_ID_TO_REP_ID = ets:new(?DOC_ID_TO_REP_ID, [named_table, set, private]),
- ?REP_ID_TO_DOC_ID = ets:new(?REP_ID_TO_DOC_ID, [named_table, set, private]),
+ _ = ets:new(?DOC_ID_TO_REP_ID, [named_table, set, protected]),
+ _ = ets:new(?REP_ID_TO_DOC_ID, [named_table, set, private]),
Server = self(),
ok = couch_config:register(
fun("replicator", "db", NewName) ->
- ok = gen_server:cast(Server, {rep_db_changed, ?l2b(NewName)})
+ ok = gen_server:cast(Server, {rep_db_changed, ?l2b(NewName)});
+ ("replicator", "max_replication_retry_count", NewMaxRetries1) ->
+ NewMaxRetries = list_to_integer(NewMaxRetries1),
+ ok = gen_server:cast(Server, {set_max_retries, NewMaxRetries})
end
),
{Loop, RepDbName} = changes_feed_loop(),
{ok, #state{
changes_feed_loop = Loop,
rep_db_name = RepDbName,
- db_notifier = db_update_notifier()}
- }.
+ db_notifier = db_update_notifier(),
+ max_retries = list_to_integer(
+ couch_config:get("replicator", "max_replication_retry_count", "10"))
+ }}.
handle_call({rep_db_update, Change}, _From, State) ->
{reply, ok, process_update(State, Change)};
handle_call({triggered, {BaseId, _}}, _From, State) ->
- case ets:lookup(?REP_ID_TO_DOC_ID, BaseId) of
- [{BaseId, {DocId, true}}] ->
- true = ets:insert(?REP_ID_TO_DOC_ID, {BaseId, {DocId, false}});
- _ ->
- ok
- end,
+ [{BaseId, {DocId, true}}] = ets:lookup(?REP_ID_TO_DOC_ID, BaseId),
+ true = ets:insert(?REP_ID_TO_DOC_ID, {BaseId, {DocId, false}}),
{reply, ok, State};
handle_call({restart_failure, {Props} = RepDoc, Error}, _From, State) ->
DocId = get_value(<<"_id">>, Props),
- [{DocId, {BaseId, _} = RepId}] = ets:lookup(?DOC_ID_TO_REP_ID, DocId),
+ [{DocId, {{BaseId, _} = RepId, MaxRetries}}] = ets:lookup(
+ ?DOC_ID_TO_REP_ID, DocId),
?LOG_ERROR("Failed to start replication `~s` after ~p attempts using "
"the document `~s`. Last error reason was: ~p",
- [pp_rep_id(RepId), ?MAX_RETRIES, DocId, Error]),
+ [pp_rep_id(RepId), MaxRetries, DocId, Error]),
couch_rep:update_rep_doc(
RepDoc,
[{<<"_replication_state">>, <<"error">>},
@@ -103,6 +105,9 @@ handle_cast({rep_db_created, NewName},
handle_cast({rep_db_created, _NewName}, State) ->
{noreply, restart(State)};
+handle_cast({set_max_retries, MaxRetries}, State) ->
+ {noreply, State#state{max_retries = MaxRetries}};
+
handle_cast(Msg, State) ->
?LOG_ERROR("Replicator DB listener received unexpected cast ~p", [Msg]),
{stop, {error, {unexpected_cast, Msg}}, State}.
@@ -259,16 +264,17 @@ rep_user_ctx({RepDoc}) ->
end.
-maybe_start_replication(State, DocId, JsonRepDoc) ->
+maybe_start_replication(#state{max_retries = MaxRetries} = State,
+ DocId, JsonRepDoc) ->
UserCtx = rep_user_ctx(JsonRepDoc),
{BaseId, _} = RepId = couch_rep:make_replication_id(JsonRepDoc, UserCtx),
case ets:lookup(?REP_ID_TO_DOC_ID, BaseId) of
[] ->
true = ets:insert(?REP_ID_TO_DOC_ID, {BaseId, {DocId, true}}),
- true = ets:insert(?DOC_ID_TO_REP_ID, {DocId, RepId}),
+ true = ets:insert(?DOC_ID_TO_REP_ID, {DocId, {RepId, MaxRetries}}),
Server = self(),
Pid = spawn_link(fun() ->
- start_replication(Server, JsonRepDoc, RepId, UserCtx)
+ start_replication(Server, JsonRepDoc, RepId, UserCtx, MaxRetries)
end),
State#state{rep_start_pids = [Pid | State#state.rep_start_pids]};
[{BaseId, {DocId, _}}] ->
@@ -295,7 +301,7 @@ maybe_tag_rep_doc({Props} = JsonRepDoc, RepId) ->
end.
-start_replication(Server, {RepProps} = RepDoc, RepId, UserCtx) ->
+start_replication(Server, {RepProps} = RepDoc, RepId, UserCtx, MaxRetries) ->
case (catch couch_rep:start_replication(RepDoc, RepId, UserCtx)) of
Pid when is_pid(Pid) ->
?LOG_INFO("Document `~s` triggered replication `~s`",
@@ -304,7 +310,7 @@ start_replication(Server, {RepProps} = RepDoc, RepId, UserCtx) ->
couch_rep:get_result(Pid, RepId, RepDoc, UserCtx);
Error ->
keep_retrying(
- Server, RepId, RepDoc, UserCtx, Error, ?INITIAL_WAIT, ?MAX_RETRIES)
+ Server, RepId, RepDoc, UserCtx, Error, ?INITIAL_WAIT, MaxRetries)
end.
@@ -319,9 +325,10 @@ keep_retrying(Server, RepId, RepDoc, UserCtx, Error, Wait, RetriesLeft) ->
Pid when is_pid(Pid) ->
ok = gen_server:call(Server, {triggered, RepId}, infinity),
{RepProps} = RepDoc,
+ DocId = get_value(<<"_id">>, RepProps),
+ [{DocId, {RepId, MaxRetries}}] = ets:lookup(?DOC_ID_TO_REP_ID, DocId),
?LOG_INFO("Document `~s` triggered replication `~s` after ~p attempts",
- [get_value(<<"_id">>, RepProps), pp_rep_id(RepId),
- ?MAX_RETRIES - RetriesLeft + 1]),
+ [DocId, pp_rep_id(RepId), MaxRetries - RetriesLeft + 1]),
couch_rep:get_result(Pid, RepId, RepDoc, UserCtx);
NewError ->
keep_retrying(
@@ -351,7 +358,7 @@ replication_complete(DocId) ->
stop_replication(DocId) ->
case ets:lookup(?DOC_ID_TO_REP_ID, DocId) of
- [{DocId, {BaseId, _} = RepId}] ->
+ [{DocId, {{BaseId, _} = RepId, _MaxRetries}}] ->
couch_rep:end_replication(RepId),
true = ets:delete(?REP_ID_TO_DOC_ID, BaseId),
true = ets:delete(?DOC_ID_TO_REP_ID, DocId),
@@ -365,7 +372,7 @@ stop_all_replications() ->
?LOG_INFO("Stopping all ongoing replications because the replicator DB "
"was deleted or changed", []),
ets:foldl(
- fun({_, RepId}, _) -> couch_rep:end_replication(RepId) end,
+ fun({_, {RepId, _}}, _) -> couch_rep:end_replication(RepId) end,
ok,
?DOC_ID_TO_REP_ID
),