diff options
Diffstat (limited to 'src/couchdb')
-rw-r--r-- | src/couchdb/couch_rep.erl | 144 | ||||
-rw-r--r-- | src/couchdb/couch_replication_manager.erl | 424 |
2 files changed, 327 insertions, 241 deletions
diff --git a/src/couchdb/couch_rep.erl b/src/couchdb/couch_rep.erl index 4fabdd99..a9c156e9 100644 --- a/src/couchdb/couch_rep.erl +++ b/src/couchdb/couch_rep.erl @@ -16,12 +16,10 @@ code_change/3]). -export([replicate/2, checkpoint/1]). --export([ensure_rep_db_exists/0, make_replication_id/2]). +-export([make_replication_id/2]). -export([start_replication/3, end_replication/1, get_result/4]). --export([update_rep_doc/2]). -include("couch_db.hrl"). --include("couch_js_functions.hrl"). -include("../ibrowse/ibrowse.hrl"). -define(REP_ID_VERSION, 2). @@ -54,7 +52,6 @@ committed_seq = 0, stats = nil, - rep_doc = nil, source_db_update_notifier = nil, target_db_update_notifier = nil }). @@ -94,11 +91,11 @@ end_replication({BaseId, Extension}) -> end end. -start_replication(RepDoc, {BaseId, Extension}, UserCtx) -> +start_replication(RepDoc, {BaseId, Extension} = RepId, UserCtx) -> Replicator = { BaseId ++ Extension, {gen_server, start_link, - [?MODULE, [BaseId, RepDoc, UserCtx], []]}, + [?MODULE, [RepId, RepDoc, UserCtx], []]}, temporary, 1, worker, @@ -135,7 +132,7 @@ init(InitArgs) -> {stop, Error} end. -do_init([RepId, {PostProps} = RepDoc, UserCtx] = InitArgs) -> +do_init([{BaseId, _Ext} = RepId, {PostProps}, UserCtx] = InitArgs) -> process_flag(trap_exit, true), SourceProps = couch_util:get_value(<<"source">>, PostProps), @@ -152,10 +149,8 @@ do_init([RepId, {PostProps} = RepDoc, UserCtx] = InitArgs) -> SourceInfo = dbinfo(Source), TargetInfo = dbinfo(Target), - maybe_set_triggered(RepDoc, RepId), - [SourceLog, TargetLog] = find_replication_logs( - [Source, Target], RepId, {PostProps}, UserCtx), + [Source, Target], BaseId, {PostProps}, UserCtx), {StartSeq, History} = compare_replication_logs(SourceLog, TargetLog), {ok, ChangesFeed} = @@ -174,10 +169,12 @@ do_init([RepId, {PostProps} = RepDoc, UserCtx] = InitArgs) -> ets:insert(Stats, {docs_written, 0}), ets:insert(Stats, {doc_write_failures, 0}), - {ShortId, _} = lists:split(6, RepId), + {ShortId, _} = lists:split(6, BaseId), couch_task_status:add_task("Replication", io_lib:format("~s: ~s -> ~s", [ShortId, dbname(Source), dbname(Target)]), "Starting"), + couch_replication_manager:replication_started(RepId), + State = #state{ changes_feed = ChangesFeed, missing_revs = MissingRevs, @@ -200,7 +197,6 @@ do_init([RepId, {PostProps} = RepDoc, UserCtx] = InitArgs) -> rep_starttime = httpd_util:rfc1123_date(), src_starttime = couch_util:get_value(instance_start_time, SourceInfo), tgt_starttime = couch_util:get_value(instance_start_time, TargetInfo), - rep_doc = RepDoc, source_db_update_notifier = source_db_update_notifier(Source), target_db_update_notifier = target_db_update_notifier(Target) }, @@ -272,27 +268,24 @@ handle_info({'EXIT', _Pid, {Err, Reason}}, State) when Err == source_error; handle_info({'EXIT', _Pid, Reason}, State) -> {stop, Reason, State}. -terminate(normal, #state{checkpoint_scheduled=nil} = State) -> +terminate(normal, #state{checkpoint_scheduled=nil, init_args=[RepId | _]} = State) -> do_terminate(State), - update_rep_doc( - State#state.rep_doc, [{<<"_replication_state">>, <<"completed">>}]); + couch_replication_manager:replication_completed(RepId); -terminate(normal, State) -> +terminate(normal, #state{init_args=[RepId | _]} = State) -> timer:cancel(State#state.checkpoint_scheduled), do_terminate(do_checkpoint(State)), - update_rep_doc( - State#state.rep_doc, [{<<"_replication_state">>, <<"completed">>}]); + couch_replication_manager:replication_completed(RepId); terminate(shutdown, #state{listeners = Listeners} = State) -> % continuous replication stopped [gen_server:reply(L, {ok, stopped}) || L <- Listeners], terminate_cleanup(State); -terminate(Reason, #state{listeners = Listeners} = State) -> +terminate(Reason, #state{listeners = Listeners, init_args=[RepId | _]} = State) -> [gen_server:reply(L, {error, Reason}) || L <- Listeners], terminate_cleanup(State), - update_rep_doc( - State#state.rep_doc, [{<<"_replication_state">>, <<"error">>}]). + couch_replication_manager:replication_error(RepId, Reason). code_change(_OldVsn, State, _Extra) -> {ok, State}. @@ -698,7 +691,7 @@ do_checkpoint(State) -> src_starttime = SrcInstanceStartTime, tgt_starttime = TgtInstanceStartTime, stats = Stats, - rep_doc = {RepDoc} + init_args = [_RepId, {RepDoc} | _] } = State, case commit_to_both(Source, Target, NewSeqNum) of {SrcInstanceStartTime, TgtInstanceStartTime} -> @@ -901,113 +894,6 @@ parse_proxy_params(ProxyUrl) -> [{proxy_user, User}, {proxy_password, Passwd}] end. -update_rep_doc({Props} = _RepDoc, KVs) -> - case couch_util:get_value(<<"_id">>, Props) of - undefined -> - % replication triggered by POSTing to _replicate/ - ok; - RepDocId -> - % replication triggered by adding a Rep Doc to the replicator DB - {ok, RepDb} = ensure_rep_db_exists(), - case couch_db:open_doc(RepDb, RepDocId, []) of - {ok, LatestRepDoc} -> - update_rep_doc(RepDb, LatestRepDoc, KVs); - _ -> - ok - end, - couch_db:close(RepDb) - end. - -update_rep_doc(RepDb, #doc{body = {RepDocBody}} = RepDoc, KVs) -> - NewRepDocBody = lists:foldl( - fun({<<"_replication_state">> = K, State} = KV, Body) -> - case couch_util:get_value(K, Body) of - State -> - Body; - _ -> - Body1 = lists:keystore(K, 1, Body, KV), - lists:keystore( - <<"_replication_state_time">>, 1, - Body1, {<<"_replication_state_time">>, timestamp()}) - end; - ({K, _V} = KV, Body) -> - lists:keystore(K, 1, Body, KV) - end, - RepDocBody, - KVs - ), - case NewRepDocBody of - RepDocBody -> - ok; - _ -> - % might not succeed - when the replication doc is deleted right - % before this update (not an error) - couch_db:update_doc(RepDb, RepDoc#doc{body = {NewRepDocBody}}, []) - end. - -% RFC3339 timestamps. -% Note: doesn't include the time seconds fraction (RFC3339 says it's optional). -timestamp() -> - {{Year, Month, Day}, {Hour, Min, Sec}} = calendar:now_to_local_time(now()), - UTime = erlang:universaltime(), - LocalTime = calendar:universal_time_to_local_time(UTime), - DiffSecs = calendar:datetime_to_gregorian_seconds(LocalTime) - - calendar:datetime_to_gregorian_seconds(UTime), - zone(DiffSecs div 3600, (DiffSecs rem 3600) div 60), - iolist_to_binary( - io_lib:format("~4..0w-~2..0w-~2..0wT~2..0w:~2..0w:~2..0w~s", - [Year, Month, Day, Hour, Min, Sec, - zone(DiffSecs div 3600, (DiffSecs rem 3600) div 60)])). - -zone(Hr, Min) when Hr >= 0, Min >= 0 -> - io_lib:format("+~2..0w:~2..0w", [Hr, Min]); -zone(Hr, Min) -> - io_lib:format("-~2..0w:~2..0w", [abs(Hr), abs(Min)]). - - -maybe_set_triggered({RepProps} = RepDoc, RepId) -> - case couch_util:get_value(<<"_replication_state">>, RepProps) of - <<"triggered">> -> - ok; - _ -> - update_rep_doc( - RepDoc, - [ - {<<"_replication_state">>, <<"triggered">>}, - {<<"_replication_id">>, ?l2b(RepId)} - ] - ) - end. - -ensure_rep_db_exists() -> - DbName = ?l2b(couch_config:get("replicator", "db", "_replicator")), - Opts = [ - {user_ctx, #user_ctx{roles=[<<"_admin">>, <<"_replicator">>]}}, - sys_db - ], - case couch_db:open(DbName, Opts) of - {ok, Db} -> - Db; - _Error -> - {ok, Db} = couch_db:create(DbName, Opts) - end, - ok = ensure_rep_ddoc_exists(Db, <<"_design/_replicator">>), - {ok, Db}. - -ensure_rep_ddoc_exists(RepDb, DDocID) -> - case couch_db:open_doc(RepDb, DDocID, []) of - {ok, _Doc} -> - ok; - _ -> - DDoc = couch_doc:from_json_obj({[ - {<<"_id">>, DDocID}, - {<<"language">>, <<"javascript">>}, - {<<"validate_doc_update">>, ?REP_DB_DOC_VALIDATE_FUN} - ]}), - {ok, _Rev} = couch_db:update_doc(RepDb, DDoc, []) - end, - ok. - source_db_update_notifier(#db{name = DbName}) -> Server = self(), {ok, Notifier} = couch_db_update_notifier:start_link( diff --git a/src/couchdb/couch_replication_manager.erl b/src/couchdb/couch_replication_manager.erl index e3d97c37..7e2c8118 100644 --- a/src/couchdb/couch_replication_manager.erl +++ b/src/couchdb/couch_replication_manager.erl @@ -13,14 +13,20 @@ -module(couch_replication_manager). -behaviour(gen_server). +% public API +-export([replication_started/1, replication_completed/1, replication_error/2]). + +% gen_server callbacks -export([start_link/0, init/1, handle_call/3, handle_info/2, handle_cast/2]). -export([code_change/3, terminate/2]). -include("couch_db.hrl"). +-include("couch_js_functions.hrl"). --define(DOC_ID_TO_REP_ID, rep_doc_id_to_rep_id). --define(REP_ID_TO_DOC_ID, rep_id_to_rep_doc_id). --define(INITIAL_WAIT, 5). +-define(DOC_TO_REP, couch_rep_doc_id_to_rep_id). +-define(REP_TO_STATE, couch_rep_id_to_rep_state). +-define(INITIAL_WAIT, 2.5). % seconds +-define(MAX_WAIT, 600). % seconds -record(state, { changes_feed_loop = nil, @@ -30,6 +36,16 @@ max_retries }). +-record(rep_state, { + doc_id, + user_ctx, + doc, + starting, + retries_left, + max_retries, + wait = ?INITIAL_WAIT +}). + -import(couch_util, [ get_value/2, get_value/3, @@ -40,17 +56,56 @@ start_link() -> gen_server:start_link({local, ?MODULE}, ?MODULE, [], []). + +replication_started({BaseId, _} = RepId) -> + case rep_state(RepId) of + nil -> + ok; + #rep_state{doc_id = DocId} -> + update_rep_doc(DocId, [ + {<<"_replication_state">>, <<"triggered">>}, + {<<"_replication_id">>, ?l2b(BaseId)}]), + ok = gen_server:call(?MODULE, {rep_started, RepId}, infinity), + ?LOG_INFO("Document `~s` triggered replication `~s`", + [DocId, pp_rep_id(RepId)]) + end. + + +replication_completed(RepId) -> + case rep_state(RepId) of + nil -> + ok; + #rep_state{doc_id = DocId} = St -> + update_rep_doc(DocId, [{<<"_replication_state">>, <<"completed">>}]), + ok = gen_server:call(?MODULE, {rep_complete, RepId}, infinity), + ?LOG_INFO("Replication `~s` finished (triggered by document `~s`)", + [pp_rep_id(RepId), DocId]) + end. + + +replication_error({BaseId, _} = RepId, Error) -> + case rep_state(RepId) of + nil -> + ok; + #rep_state{doc_id = DocId} -> + % TODO: maybe add error reason to replication document + update_rep_doc(DocId, [ + {<<"_replication_state">>, <<"error">>}, + {<<"_replication_id">>, ?l2b(BaseId)}]), + ok = gen_server:call(?MODULE, {rep_error, RepId, Error}, infinity) + end. + + init(_) -> process_flag(trap_exit, true), - _ = ets:new(?DOC_ID_TO_REP_ID, [named_table, set, protected]), - _ = ets:new(?REP_ID_TO_DOC_ID, [named_table, set, private]), + ?DOC_TO_REP = ets:new(?DOC_TO_REP, [named_table, set, protected]), + ?REP_TO_STATE = ets:new(?REP_TO_STATE, [named_table, set, protected]), Server = self(), ok = couch_config:register( fun("replicator", "db", NewName) -> ok = gen_server:cast(Server, {rep_db_changed, ?l2b(NewName)}); - ("replicator", "max_replication_retry_count", NewMaxRetries1) -> - NewMaxRetries = list_to_integer(NewMaxRetries1), - ok = gen_server:cast(Server, {set_max_retries, NewMaxRetries}) + ("replicator", "max_replication_retry_count", V) -> + ok = gen_server:cast(Server, {set_max_retries, retries_value(V)}) end ), {Loop, RepDbName} = changes_feed_loop(), @@ -58,7 +113,7 @@ init(_) -> changes_feed_loop = Loop, rep_db_name = RepDbName, db_notifier = db_update_notifier(), - max_retries = list_to_integer( + max_retries = retries_value( couch_config:get("replicator", "max_replication_retry_count", "10")) }}. @@ -68,32 +123,35 @@ handle_call({rep_db_update, {ChangeProps} = Change}, _From, State) -> process_update(State, Change) catch _Tag:Error -> - JsonRepDoc = get_value(doc, ChangeProps), - rep_db_update_error(Error, JsonRepDoc), + {RepProps} = get_value(doc, ChangeProps), + DocId = get_value(<<"_id">>, RepProps), + rep_db_update_error(Error, DocId), State end, {reply, ok, NewState}; -handle_call({triggered, {BaseId, _}}, _From, State) -> - [{BaseId, {DocId, true}}] = ets:lookup(?REP_ID_TO_DOC_ID, BaseId), - true = ets:insert(?REP_ID_TO_DOC_ID, {BaseId, {DocId, false}}), +handle_call({rep_started, RepId}, _From, State) -> + case rep_state(RepId) of + nil -> + ok; + RepState -> + NewRepState = RepState#rep_state{ + starting = false, + retries_left = State#state.max_retries, + max_retries = State#state.max_retries, + wait = ?INITIAL_WAIT + }, + true = ets:insert(?REP_TO_STATE, {RepId, NewRepState}) + end, {reply, ok, State}; -handle_call({restart_failure, {Props} = RepDoc, Error}, _From, State) -> - DocId = get_value(<<"_id">>, Props), - [{DocId, {{BaseId, _} = RepId, MaxRetries}}] = ets:lookup( - ?DOC_ID_TO_REP_ID, DocId), - ?LOG_ERROR("Failed to start replication `~s` after ~p attempts using " - "the document `~s`. Last error reason was: ~p", - [pp_rep_id(RepId), MaxRetries, DocId, Error]), - couch_rep:update_rep_doc( - RepDoc, - [{<<"_replication_state">>, <<"error">>}, - {<<"_replication_id">>, ?l2b(BaseId)}]), - true = ets:delete(?REP_ID_TO_DOC_ID, BaseId), - true = ets:delete(?DOC_ID_TO_REP_ID, DocId), +handle_call({rep_complete, RepId}, _From, State) -> + true = ets:delete(?REP_TO_STATE, RepId), {reply, ok, State}; +handle_call({rep_error, RepId, Error}, _From, State) -> + {reply, ok, replication_error(State, RepId, Error)}; + handle_call(Msg, From, State) -> ?LOG_ERROR("Replication manager received unexpected call ~p from ~p", [Msg, From]), @@ -150,8 +208,8 @@ terminate(_Reason, State) -> catch exit(Pid, stop) end, [Loop | StartPids]), - true = ets:delete(?REP_ID_TO_DOC_ID), - true = ets:delete(?DOC_ID_TO_REP_ID), + true = ets:delete(?REP_TO_STATE), + true = ets:delete(?DOC_TO_REP), couch_db_update_notifier:stop(Notifier). @@ -160,7 +218,7 @@ code_change(_OldVsn, State, _Extra) -> changes_feed_loop() -> - {ok, RepDb} = couch_rep:ensure_rep_db_exists(), + {ok, RepDb} = ensure_rep_db_exists(), Server = self(), Pid = spawn_link( fun() -> @@ -245,21 +303,20 @@ process_update(State, {Change}) -> State; false -> case get_value(<<"_replication_state">>, RepProps) of + undefined -> + maybe_start_replication(State, DocId, JsonRepDoc); + <<"triggered">> -> + maybe_start_replication(State, DocId, JsonRepDoc); <<"completed">> -> replication_complete(DocId), State; - <<"error">> -> - stop_replication(DocId), - State; - <<"triggered">> -> - maybe_start_replication(State, DocId, JsonRepDoc); - undefined -> - maybe_start_replication(State, DocId, JsonRepDoc) + _ -> + State end end. -rep_db_update_error(Error, {Props} = JsonRepDoc) -> +rep_db_update_error(Error, DocId) -> case Error of {bad_rep_doc, Reason} -> ok; @@ -267,9 +324,8 @@ rep_db_update_error(Error, {Props} = JsonRepDoc) -> Reason = to_binary(Error) end, ?LOG_ERROR("Replication manager, error processing document `~s`: ~s", - [get_value(<<"_id">>, Props), Reason]), - couch_rep:update_rep_doc( - JsonRepDoc, [{<<"_replication_state">>, <<"error">>}]). + [DocId, Reason]), + update_rep_doc(DocId, [{<<"_replication_state">>, <<"error">>}]). rep_user_ctx({RepDoc}) -> @@ -284,30 +340,39 @@ rep_user_ctx({RepDoc}) -> end. -maybe_start_replication(#state{max_retries = MaxRetries} = State, - DocId, JsonRepDoc) -> - UserCtx = rep_user_ctx(JsonRepDoc), - {BaseId, _} = RepId = make_rep_id(JsonRepDoc, UserCtx), - case ets:lookup(?REP_ID_TO_DOC_ID, BaseId) of - [] -> - true = ets:insert(?REP_ID_TO_DOC_ID, {BaseId, {DocId, true}}), - true = ets:insert(?DOC_ID_TO_REP_ID, {DocId, {RepId, MaxRetries}}), +maybe_start_replication(State, DocId, RepDoc) -> + UserCtx = rep_user_ctx(RepDoc), + {BaseId, _} = RepId = make_rep_id(RepDoc, UserCtx), + case rep_state(RepId) of + nil -> + RepState = #rep_state{ + doc_id = DocId, + user_ctx = UserCtx, + doc = RepDoc, + starting = true, + retries_left = State#state.max_retries, + max_retries = State#state.max_retries + }, + true = ets:insert(?REP_TO_STATE, {RepId, RepState}), + true = ets:insert(?DOC_TO_REP, {DocId, RepId}), + ?LOG_INFO("Attempting to start replication `~s` (document `~s`).", + [pp_rep_id(RepId), DocId]), Server = self(), Pid = spawn_link(fun() -> - start_replication(Server, JsonRepDoc, RepId, UserCtx, MaxRetries) + start_replication(Server, RepDoc, RepId, UserCtx, 0) end), State#state{rep_start_pids = [Pid | State#state.rep_start_pids]}; - [{BaseId, {DocId, _}}] -> + #rep_state{doc_id = DocId} -> State; - [{BaseId, {OtherDocId, false}}] -> + #rep_state{starting = false, doc_id = OtherDocId} -> ?LOG_INFO("The replication specified by the document `~s` was already" " triggered by the document `~s`", [DocId, OtherDocId]), - maybe_tag_rep_doc(JsonRepDoc, ?l2b(BaseId)), + maybe_tag_rep_doc(DocId, RepDoc, ?l2b(BaseId)), State; - [{BaseId, {OtherDocId, true}}] -> + #rep_state{starting = true, doc_id = OtherDocId} -> ?LOG_INFO("The replication specified by the document `~s` is already" " being triggered by the document `~s`", [DocId, OtherDocId]), - maybe_tag_rep_doc(JsonRepDoc, ?l2b(BaseId)), + maybe_tag_rep_doc(DocId, RepDoc, ?l2b(BaseId)), State end. @@ -323,98 +388,233 @@ make_rep_id(RepDoc, UserCtx) -> end. -maybe_tag_rep_doc({Props} = JsonRepDoc, RepId) -> - case get_value(<<"_replication_id">>, Props) of +maybe_tag_rep_doc(DocId, {RepProps}, RepId) -> + case get_value(<<"_replication_id">>, RepProps) of RepId -> ok; _ -> - couch_rep:update_rep_doc(JsonRepDoc, [{<<"_replication_id">>, RepId}]) + update_rep_doc(DocId, [{<<"_replication_id">>, RepId}]) end. -start_replication(Server, {RepProps} = RepDoc, RepId, UserCtx, MaxRetries) -> +start_replication(Server, RepDoc, RepId, UserCtx, Wait) -> + ok = timer:sleep(Wait * 1000), case (catch couch_rep:start_replication(RepDoc, RepId, UserCtx)) of Pid when is_pid(Pid) -> - ?LOG_INFO("Document `~s` triggered replication `~s`", - [get_value(<<"_id">>, RepProps), pp_rep_id(RepId)]), - ok = gen_server:call(Server, {triggered, RepId}, infinity), + ok = gen_server:call(Server, {rep_started, RepId}, infinity), couch_rep:get_result(Pid, RepId, RepDoc, UserCtx); Error -> - couch_rep:update_rep_doc( - RepDoc, - [{<<"_replication_state">>, <<"error">>}, - {<<"_replication_id">>, ?l2b(element(1, RepId))}]), - keep_retrying( - Server, RepId, RepDoc, UserCtx, Error, ?INITIAL_WAIT, MaxRetries) + replication_error(RepId, Error) end. -keep_retrying(Server, _RepId, RepDoc, _UserCtx, Error, _Wait, 0) -> - ok = gen_server:call(Server, {restart_failure, RepDoc, Error}, infinity); - -keep_retrying(Server, RepId, RepDoc, UserCtx, Error, Wait, RetriesLeft) -> - {RepProps} = RepDoc, - DocId = get_value(<<"_id">>, RepProps), - ?LOG_ERROR("Error starting replication `~s` (document `~s`): ~p. " - "Retrying in ~p seconds", [pp_rep_id(RepId), DocId, Error, Wait]), - ok = timer:sleep(Wait * 1000), - case (catch couch_rep:start_replication(RepDoc, RepId, UserCtx)) of - Pid when is_pid(Pid) -> - ok = gen_server:call(Server, {triggered, RepId}, infinity), - [{DocId, {RepId, MaxRetries}}] = ets:lookup(?DOC_ID_TO_REP_ID, DocId), - ?LOG_INFO("Document `~s` triggered replication `~s` after ~p attempts", - [DocId, pp_rep_id(RepId), MaxRetries - RetriesLeft + 1]), - couch_rep:get_result(Pid, RepId, RepDoc, UserCtx); - NewError -> - keep_retrying( - Server, RepId, RepDoc, UserCtx, NewError, Wait * 2, RetriesLeft - 1) +replication_complete(DocId) -> + case ets:lookup(?DOC_TO_REP, DocId) of + [{DocId, RepId}] -> + case rep_state(RepId) of + nil -> + couch_rep:end_replication(RepId); + #rep_state{} -> + ok + end, + true = ets:delete(?DOC_TO_REP, DocId); + _ -> + ok end. rep_doc_deleted(DocId) -> - case stop_replication(DocId) of - {ok, RepId} -> + case ets:lookup(?DOC_TO_REP, DocId) of + [{DocId, RepId}] -> + couch_rep:end_replication(RepId), + true = ets:delete(?REP_TO_STATE, RepId), + true = ets:delete(?DOC_TO_REP, DocId), ?LOG_INFO("Stopped replication `~s` because replication document `~s`" " was deleted", [pp_rep_id(RepId), DocId]); - none -> + [] -> ok end. -replication_complete(DocId) -> - case stop_replication(DocId) of - {ok, RepId} -> - ?LOG_INFO("Replication `~s` finished (triggered by document `~s`)", - [pp_rep_id(RepId), DocId]); - none -> - ok +replication_error(State, RepId, Error) -> + case rep_state(RepId) of + nil -> + State; + RepState -> + maybe_retry_replication(RepId, RepState, Error, State) end. - -stop_replication(DocId) -> - case ets:lookup(?DOC_ID_TO_REP_ID, DocId) of - [{DocId, {{BaseId, _} = RepId, _MaxRetries}}] -> - couch_rep:end_replication(RepId), - true = ets:delete(?REP_ID_TO_DOC_ID, BaseId), - true = ets:delete(?DOC_ID_TO_REP_ID, DocId), - {ok, RepId}; - [] -> - none - end. +maybe_retry_replication(RepId, #rep_state{retries_left = 0} = RepState, Error, State) -> + #rep_state{ + doc_id = DocId, + max_retries = MaxRetries + } = RepState, + couch_rep:end_replication(RepId), + true = ets:delete(?REP_TO_STATE, RepId), + true = ets:delete(?DOC_TO_REP, DocId), + ?LOG_ERROR("Error in replication `~s` (triggered by document `~s`): ~s" + "~nReached maximum retry attempts (~p).", + [pp_rep_id(RepId), DocId, to_binary(error_reason(Error)), MaxRetries]), + State; + +maybe_retry_replication(RepId, RepState, Error, State) -> + #rep_state{ + doc_id = DocId, + user_ctx = UserCtx, + doc = RepDoc + } = RepState, + #rep_state{wait = Wait} = NewRepState = state_after_error(RepState), + true = ets:insert(?REP_TO_STATE, {RepId, NewRepState}), + ?LOG_ERROR("Error in replication `~s` (triggered by document `~s`): ~s" + "~nRestarting replication in ~p seconds.", + [pp_rep_id(RepId), DocId, to_binary(error_reason(Error)), Wait]), + Server = self(), + Pid = spawn_link(fun() -> + start_replication(Server, RepDoc, RepId, UserCtx, Wait) + end), + State#state{rep_start_pids = [Pid | State#state.rep_start_pids]}. stop_all_replications() -> ?LOG_INFO("Stopping all ongoing replications because the replicator" " database was deleted or changed", []), ets:foldl( - fun({_, {RepId, _}}, _) -> + fun({_, RepId}, _) -> couch_rep:end_replication(RepId) end, - ok, ?DOC_ID_TO_REP_ID), - true = ets:delete_all_objects(?REP_ID_TO_DOC_ID), - true = ets:delete_all_objects(?DOC_ID_TO_REP_ID). + ok, ?DOC_TO_REP), + true = ets:delete_all_objects(?REP_TO_STATE), + true = ets:delete_all_objects(?DOC_TO_REP). + + +update_rep_doc(RepDocId, KVs) -> + {ok, RepDb} = ensure_rep_db_exists(), + try + case couch_db:open_doc(RepDb, RepDocId, []) of + {ok, LatestRepDoc} -> + update_rep_doc(RepDb, LatestRepDoc, KVs); + _ -> + ok + end + catch throw:conflict -> + % Shouldn't happen, as by default only the role _replicator can + % update replication documents. + ?LOG_ERROR("Conflict error when updating replication document `~s`." + " Retrying.", [RepDocId]), + ok = timer:sleep(5), + update_rep_doc(RepDocId, KVs) + after + couch_db:close(RepDb) + end. + +update_rep_doc(RepDb, #doc{body = {RepDocBody}} = RepDoc, KVs) -> + NewRepDocBody = lists:foldl( + fun({<<"_replication_state">> = K, State} = KV, Body) -> + case get_value(K, Body) of + State -> + Body; + _ -> + Body1 = lists:keystore(K, 1, Body, KV), + lists:keystore( + <<"_replication_state_time">>, 1, Body1, + {<<"_replication_state_time">>, timestamp()}) + end; + ({K, _V} = KV, Body) -> + lists:keystore(K, 1, Body, KV) + end, + RepDocBody, KVs), + case NewRepDocBody of + RepDocBody -> + ok; + _ -> + % Might not succeed - when the replication doc is deleted right + % before this update (not an error, ignore). + couch_db:update_doc(RepDb, RepDoc#doc{body = {NewRepDocBody}}, []) + end. + + +% RFC3339 timestamps. +% Note: doesn't include the time seconds fraction (RFC3339 says it's optional). +timestamp() -> + {{Year, Month, Day}, {Hour, Min, Sec}} = calendar:now_to_local_time(now()), + UTime = erlang:universaltime(), + LocalTime = calendar:universal_time_to_local_time(UTime), + DiffSecs = calendar:datetime_to_gregorian_seconds(LocalTime) - + calendar:datetime_to_gregorian_seconds(UTime), + zone(DiffSecs div 3600, (DiffSecs rem 3600) div 60), + iolist_to_binary( + io_lib:format("~4..0w-~2..0w-~2..0wT~2..0w:~2..0w:~2..0w~s", + [Year, Month, Day, Hour, Min, Sec, + zone(DiffSecs div 3600, (DiffSecs rem 3600) div 60)])). + +zone(Hr, Min) when Hr >= 0, Min >= 0 -> + io_lib:format("+~2..0w:~2..0w", [Hr, Min]); +zone(Hr, Min) -> + io_lib:format("-~2..0w:~2..0w", [abs(Hr), abs(Min)]). + + +ensure_rep_db_exists() -> + DbName = ?l2b(couch_config:get("replicator", "db", "_replicator")), + Opts = [ + {user_ctx, #user_ctx{roles=[<<"_admin">>, <<"_replicator">>]}}, + sys_db + ], + case couch_db:open(DbName, Opts) of + {ok, Db} -> + Db; + _Error -> + {ok, Db} = couch_db:create(DbName, Opts) + end, + ok = ensure_rep_ddoc_exists(Db, <<"_design/_replicator">>), + {ok, Db}. + + +ensure_rep_ddoc_exists(RepDb, DDocID) -> + case couch_db:open_doc(RepDb, DDocID, []) of + {ok, _Doc} -> + ok; + _ -> + DDoc = couch_doc:from_json_obj({[ + {<<"_id">>, DDocID}, + {<<"language">>, <<"javascript">>}, + {<<"validate_doc_update">>, ?REP_DB_DOC_VALIDATE_FUN} + ]}), + {ok, _Rev} = couch_db:update_doc(RepDb, DDoc, []) + end, + ok. % pretty-print replication id pp_rep_id({Base, Extension}) -> Base ++ Extension. + + +rep_state(RepId) -> + case ets:lookup(?REP_TO_STATE, RepId) of + [{RepId, RepState}] -> + RepState; + [] -> + nil + end. + + +error_reason({error, Reason}) -> + Reason; +error_reason(Reason) -> + Reason. + + +retries_value("infinity") -> + infinity; +retries_value(Value) -> + list_to_integer(Value). + + +state_after_error(#rep_state{retries_left = Left, wait = Wait} = State) -> + Wait2 = erlang:min(trunc(Wait * 2), ?MAX_WAIT), + case Left of + infinity -> + State#rep_state{wait = Wait2}; + _ -> + State#rep_state{retries_left = Left - 1, wait = Wait2} + end. |