diff options
Diffstat (limited to 'src/couchdb')
-rw-r--r-- | src/couchdb/couch_rep_db_listener.erl | 55 |
1 files changed, 31 insertions, 24 deletions
diff --git a/src/couchdb/couch_rep_db_listener.erl b/src/couchdb/couch_rep_db_listener.erl index 819f167d..89a18e85 100644 --- a/src/couchdb/couch_rep_db_listener.erl +++ b/src/couchdb/couch_rep_db_listener.erl @@ -20,14 +20,14 @@ -define(DOC_ID_TO_REP_ID, rep_doc_id_to_rep_id). -define(REP_ID_TO_DOC_ID, rep_id_to_rep_doc_id). --define(MAX_RETRIES, 10). -define(INITIAL_WAIT, 5). -record(state, { changes_feed_loop = nil, db_notifier = nil, rep_db_name = nil, - rep_start_pids = [] + rep_start_pids = [], + max_retries }). -import(couch_util, [ @@ -41,40 +41,42 @@ start_link() -> init(_) -> process_flag(trap_exit, true), - ?DOC_ID_TO_REP_ID = ets:new(?DOC_ID_TO_REP_ID, [named_table, set, private]), - ?REP_ID_TO_DOC_ID = ets:new(?REP_ID_TO_DOC_ID, [named_table, set, private]), + _ = ets:new(?DOC_ID_TO_REP_ID, [named_table, set, protected]), + _ = ets:new(?REP_ID_TO_DOC_ID, [named_table, set, private]), Server = self(), ok = couch_config:register( fun("replicator", "db", NewName) -> - ok = gen_server:cast(Server, {rep_db_changed, ?l2b(NewName)}) + ok = gen_server:cast(Server, {rep_db_changed, ?l2b(NewName)}); + ("replicator", "max_replication_retry_count", NewMaxRetries1) -> + NewMaxRetries = list_to_integer(NewMaxRetries1), + ok = gen_server:cast(Server, {set_max_retries, NewMaxRetries}) end ), {Loop, RepDbName} = changes_feed_loop(), {ok, #state{ changes_feed_loop = Loop, rep_db_name = RepDbName, - db_notifier = db_update_notifier()} - }. + db_notifier = db_update_notifier(), + max_retries = list_to_integer( + couch_config:get("replicator", "max_replication_retry_count", "10")) + }}. handle_call({rep_db_update, Change}, _From, State) -> {reply, ok, process_update(State, Change)}; handle_call({triggered, {BaseId, _}}, _From, State) -> - case ets:lookup(?REP_ID_TO_DOC_ID, BaseId) of - [{BaseId, {DocId, true}}] -> - true = ets:insert(?REP_ID_TO_DOC_ID, {BaseId, {DocId, false}}); - _ -> - ok - end, + [{BaseId, {DocId, true}}] = ets:lookup(?REP_ID_TO_DOC_ID, BaseId), + true = ets:insert(?REP_ID_TO_DOC_ID, {BaseId, {DocId, false}}), {reply, ok, State}; handle_call({restart_failure, {Props} = RepDoc, Error}, _From, State) -> DocId = get_value(<<"_id">>, Props), - [{DocId, {BaseId, _} = RepId}] = ets:lookup(?DOC_ID_TO_REP_ID, DocId), + [{DocId, {{BaseId, _} = RepId, MaxRetries}}] = ets:lookup( + ?DOC_ID_TO_REP_ID, DocId), ?LOG_ERROR("Failed to start replication `~s` after ~p attempts using " "the document `~s`. Last error reason was: ~p", - [pp_rep_id(RepId), ?MAX_RETRIES, DocId, Error]), + [pp_rep_id(RepId), MaxRetries, DocId, Error]), couch_rep:update_rep_doc( RepDoc, [{<<"_replication_state">>, <<"error">>}, @@ -103,6 +105,9 @@ handle_cast({rep_db_created, NewName}, handle_cast({rep_db_created, _NewName}, State) -> {noreply, restart(State)}; +handle_cast({set_max_retries, MaxRetries}, State) -> + {noreply, State#state{max_retries = MaxRetries}}; + handle_cast(Msg, State) -> ?LOG_ERROR("Replicator DB listener received unexpected cast ~p", [Msg]), {stop, {error, {unexpected_cast, Msg}}, State}. @@ -259,16 +264,17 @@ rep_user_ctx({RepDoc}) -> end. -maybe_start_replication(State, DocId, JsonRepDoc) -> +maybe_start_replication(#state{max_retries = MaxRetries} = State, + DocId, JsonRepDoc) -> UserCtx = rep_user_ctx(JsonRepDoc), {BaseId, _} = RepId = couch_rep:make_replication_id(JsonRepDoc, UserCtx), case ets:lookup(?REP_ID_TO_DOC_ID, BaseId) of [] -> true = ets:insert(?REP_ID_TO_DOC_ID, {BaseId, {DocId, true}}), - true = ets:insert(?DOC_ID_TO_REP_ID, {DocId, RepId}), + true = ets:insert(?DOC_ID_TO_REP_ID, {DocId, {RepId, MaxRetries}}), Server = self(), Pid = spawn_link(fun() -> - start_replication(Server, JsonRepDoc, RepId, UserCtx) + start_replication(Server, JsonRepDoc, RepId, UserCtx, MaxRetries) end), State#state{rep_start_pids = [Pid | State#state.rep_start_pids]}; [{BaseId, {DocId, _}}] -> @@ -295,7 +301,7 @@ maybe_tag_rep_doc({Props} = JsonRepDoc, RepId) -> end. -start_replication(Server, {RepProps} = RepDoc, RepId, UserCtx) -> +start_replication(Server, {RepProps} = RepDoc, RepId, UserCtx, MaxRetries) -> case (catch couch_rep:start_replication(RepDoc, RepId, UserCtx)) of Pid when is_pid(Pid) -> ?LOG_INFO("Document `~s` triggered replication `~s`", @@ -304,7 +310,7 @@ start_replication(Server, {RepProps} = RepDoc, RepId, UserCtx) -> couch_rep:get_result(Pid, RepId, RepDoc, UserCtx); Error -> keep_retrying( - Server, RepId, RepDoc, UserCtx, Error, ?INITIAL_WAIT, ?MAX_RETRIES) + Server, RepId, RepDoc, UserCtx, Error, ?INITIAL_WAIT, MaxRetries) end. @@ -319,9 +325,10 @@ keep_retrying(Server, RepId, RepDoc, UserCtx, Error, Wait, RetriesLeft) -> Pid when is_pid(Pid) -> ok = gen_server:call(Server, {triggered, RepId}, infinity), {RepProps} = RepDoc, + DocId = get_value(<<"_id">>, RepProps), + [{DocId, {RepId, MaxRetries}}] = ets:lookup(?DOC_ID_TO_REP_ID, DocId), ?LOG_INFO("Document `~s` triggered replication `~s` after ~p attempts", - [get_value(<<"_id">>, RepProps), pp_rep_id(RepId), - ?MAX_RETRIES - RetriesLeft + 1]), + [DocId, pp_rep_id(RepId), MaxRetries - RetriesLeft + 1]), couch_rep:get_result(Pid, RepId, RepDoc, UserCtx); NewError -> keep_retrying( @@ -351,7 +358,7 @@ replication_complete(DocId) -> stop_replication(DocId) -> case ets:lookup(?DOC_ID_TO_REP_ID, DocId) of - [{DocId, {BaseId, _} = RepId}] -> + [{DocId, {{BaseId, _} = RepId, _MaxRetries}}] -> couch_rep:end_replication(RepId), true = ets:delete(?REP_ID_TO_DOC_ID, BaseId), true = ets:delete(?DOC_ID_TO_REP_ID, DocId), @@ -365,7 +372,7 @@ stop_all_replications() -> ?LOG_INFO("Stopping all ongoing replications because the replicator DB " "was deleted or changed", []), ets:foldl( - fun({_, RepId}, _) -> couch_rep:end_replication(RepId) end, + fun({_, {RepId, _}}, _) -> couch_rep:end_replication(RepId) end, ok, ?DOC_ID_TO_REP_ID ), |