diff options
Diffstat (limited to 'src')
| -rw-r--r-- | src/couchdb/couch_rep_db_listener.erl | 73 | 
1 file changed, 60 insertions, 13 deletions
| diff --git a/src/couchdb/couch_rep_db_listener.erl b/src/couchdb/couch_rep_db_listener.erl index 4e0a929e..819f167d 100644 --- a/src/couchdb/couch_rep_db_listener.erl +++ b/src/couchdb/couch_rep_db_listener.erl @@ -20,6 +20,8 @@  -define(DOC_ID_TO_REP_ID, rep_doc_id_to_rep_id).  -define(REP_ID_TO_DOC_ID, rep_id_to_rep_doc_id). +-define(MAX_RETRIES, 10). +-define(INITIAL_WAIT, 5).  -record(state, {      changes_feed_loop = nil, @@ -58,6 +60,29 @@ init(_) ->  handle_call({rep_db_update, Change}, _From, State) ->      {reply, ok, process_update(State, Change)}; +handle_call({triggered, {BaseId, _}}, _From, State) -> +    case ets:lookup(?REP_ID_TO_DOC_ID, BaseId) of +    [{BaseId, {DocId, true}}] -> +        true = ets:insert(?REP_ID_TO_DOC_ID, {BaseId, {DocId, false}}); +    _ -> +        ok +    end, +    {reply, ok, State}; + +handle_call({restart_failure, {Props} = RepDoc, Error}, _From, State) -> +    DocId = get_value(<<"_id">>, Props), +    [{DocId, {BaseId, _} = RepId}] = ets:lookup(?DOC_ID_TO_REP_ID, DocId), +    ?LOG_ERROR("Failed to start replication `~s` after ~p attempts using " +        "the document `~s`. 
Last error reason was: ~p", +        [pp_rep_id(RepId), ?MAX_RETRIES, DocId, Error]), +    couch_rep:update_rep_doc( +        RepDoc, +        [{<<"_replication_state">>, <<"error">>}, +            {<<"_replication_id">>, ?l2b(BaseId)}]), +    true = ets:delete(?REP_ID_TO_DOC_ID, BaseId), +    true = ets:delete(?DOC_ID_TO_REP_ID, DocId), +    {reply, ok, State}; +  handle_call(Msg, From, State) ->      ?LOG_ERROR("Replicator DB listener received unexpected call ~p from ~p",          [Msg, From]), @@ -239,18 +264,24 @@ maybe_start_replication(State, DocId, JsonRepDoc) ->      {BaseId, _} = RepId = couch_rep:make_replication_id(JsonRepDoc, UserCtx),      case ets:lookup(?REP_ID_TO_DOC_ID, BaseId) of      [] -> -        true = ets:insert(?REP_ID_TO_DOC_ID, {BaseId, DocId}), +        true = ets:insert(?REP_ID_TO_DOC_ID, {BaseId, {DocId, true}}),          true = ets:insert(?DOC_ID_TO_REP_ID, {DocId, RepId}), +        Server = self(),          Pid = spawn_link(fun() -> -            start_replication(JsonRepDoc, RepId, UserCtx) +            start_replication(Server, JsonRepDoc, RepId, UserCtx)          end),          State#state{rep_start_pids = [Pid | State#state.rep_start_pids]}; -    [{BaseId, DocId}] -> +    [{BaseId, {DocId, _}}] ->          State; -    [{BaseId, OtherDocId}] -> +    [{BaseId, {OtherDocId, false}}] ->          ?LOG_INFO("The replication specified by the document `~s` was already"              " triggered by the document `~s`", [DocId, OtherDocId]),          maybe_tag_rep_doc(JsonRepDoc, ?l2b(BaseId)), +        State; +    [{BaseId, {OtherDocId, true}}] -> +        ?LOG_INFO("The replication specified by the document `~s` is already" +            " being triggered by the document `~s`", [DocId, OtherDocId]), +        maybe_tag_rep_doc(JsonRepDoc, ?l2b(BaseId)),          State      end. @@ -264,21 +295,37 @@ maybe_tag_rep_doc({Props} = JsonRepDoc, RepId) ->      end. 
-start_replication({RepProps} = RepDoc, {Base, _} = RepId, UserCtx) -> +start_replication(Server, {RepProps} = RepDoc, RepId, UserCtx) ->      case (catch couch_rep:start_replication(RepDoc, RepId, UserCtx)) of      Pid when is_pid(Pid) ->          ?LOG_INFO("Document `~s` triggered replication `~s`",              [get_value(<<"_id">>, RepProps), pp_rep_id(RepId)]), +        ok = gen_server:call(Server, {triggered, RepId}, infinity),          couch_rep:get_result(Pid, RepId, RepDoc, UserCtx);      Error -> -        couch_rep:update_rep_doc( -            RepDoc, -            [ -                {<<"_replication_state">>, <<"error">>}, -                {<<"_replication_id">>, ?l2b(Base)} -            ] -        ), -        ?LOG_ERROR("Error starting replication `~s`: ~p", [pp_rep_id(RepId), Error]) +        keep_retrying( +            Server, RepId, RepDoc, UserCtx, Error, ?INITIAL_WAIT, ?MAX_RETRIES) +    end. + + +keep_retrying(Server, _RepId, RepDoc, _UserCtx, Error, _Wait, 0) -> +    ok = gen_server:call(Server, {restart_failure, RepDoc, Error}, infinity); + +keep_retrying(Server, RepId, RepDoc, UserCtx, Error, Wait, RetriesLeft) -> +    ?LOG_ERROR("Error starting replication `~s`: ~p. " +        "Retrying in ~p seconds", [pp_rep_id(RepId), Error, Wait]), +    ok = timer:sleep(Wait * 1000), +    case (catch couch_rep:start_replication(RepDoc, RepId, UserCtx)) of +    Pid when is_pid(Pid) -> +        ok = gen_server:call(Server, {triggered, RepId}, infinity), +        {RepProps} = RepDoc, +        ?LOG_INFO("Document `~s` triggered replication `~s` after ~p attempts", +            [get_value(<<"_id">>, RepProps), pp_rep_id(RepId), +                ?MAX_RETRIES - RetriesLeft + 1]), +        couch_rep:get_result(Pid, RepId, RepDoc, UserCtx); +    NewError -> +        keep_retrying( +            Server, RepId, RepDoc, UserCtx, NewError, Wait * 2, RetriesLeft - 1)      end. | 
