% Licensed under the Apache License, Version 2.0 (the "License"); you may not
% use this file except in compliance with the License. You may obtain a copy of
% the License at
%
%   http://www.apache.org/licenses/LICENSE-2.0
%
% Unless required by applicable law or agreed to in writing, software
% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
% License for the specific language governing permissions and limitations under
% the License.

-module(couch_replication_manager).
-behaviour(gen_server).

-export([start_link/0, init/1, handle_call/3, handle_info/2, handle_cast/2]).
-export([code_change/3, terminate/2]).

-include("couch_db.hrl").

-define(DOC_ID_TO_REP_ID, rep_doc_id_to_rep_id).
-define(REP_ID_TO_DOC_ID, rep_id_to_rep_doc_id).
-define(INITIAL_WAIT, 5).

-record(state, {
    changes_feed_loop = nil,
    db_notifier = nil,
    rep_db_name = nil,
    rep_start_pids = [],
    max_retries
}).

-import(couch_util, [
    get_value/2,
    get_value/3,
    to_binary/1
]).


%% Starts the replication manager as a locally registered gen_server
%% (registered name: ?MODULE).
start_link() ->
    gen_server:start_link({local, ?MODULE}, ?MODULE, [], []).


%% gen_server init callback.
%% - Traps exits so that linked helpers (changes feed loop, db update
%%   notifier, replication start processes) are reported to handle_info/2.
%% - Creates the two named ETS tables used to track replications:
%%     ?DOC_ID_TO_REP_ID : DocId  -> {RepId, MaxRetries}
%%     ?REP_ID_TO_DOC_ID : BaseId -> {DocId, Triggering}
%%   where Triggering is true until the start process reports success.
%% - Subscribes to the "replicator" config section ("db" and
%%   "max_replication_retry_count" keys), forwarding changes as casts.
%% - Starts the changes feed loop and the db update notifier.
init(_) ->
    process_flag(trap_exit, true),
    % protected: the spawned start processes read this table directly
    _ = ets:new(?DOC_ID_TO_REP_ID, [named_table, set, protected]),
    _ = ets:new(?REP_ID_TO_DOC_ID, [named_table, set, private]),
    Self = self(),
    OnConfigChange = fun
        ("replicator", "db", NewName) ->
            ok = gen_server:cast(Self, {rep_db_changed, ?l2b(NewName)});
        ("replicator", "max_replication_retry_count", RawCount) ->
            Count = list_to_integer(RawCount),
            ok = gen_server:cast(Self, {set_max_retries, Count})
    end,
    ok = couch_config:register(OnConfigChange),
    {Loop, RepDbName} = changes_feed_loop(),
    Notifier = db_update_notifier(),
    MaxRetries = list_to_integer(
        couch_config:get("replicator", "max_replication_retry_count", "10")),
    {ok, #state{
        changes_feed_loop = Loop,
        rep_db_name = RepDbName,
        db_notifier = Notifier,
        max_retries = MaxRetries
    }}.
%% Synchronous requests handled by the manager:
%%   {rep_db_update, Change}
%%       Sent by the changes feed loop for every replicator-DB change that
%%       passes has_valid_rep_id/1. Any exception raised while processing it
%%       is reported on the document via rep_db_update_error/2 and the
%%       previous state is kept.
%%   {triggered, RepId}
%%       Sent by a start process once couch_rep:start_replication/3 returned
%%       a pid.
%%   {restart_failure, RepDoc, Error}
%%       Sent by a start process after it ran out of retries.
%% Any other call stops the server.
handle_call({rep_db_update, {ChangeProps} = Change}, _From, State) ->
    NewState = try
        process_update(State, Change)
    catch
    _Tag:Error ->
        JsonRepDoc = get_value(doc, ChangeProps),
        rep_db_update_error(Error, JsonRepDoc),
        State
    end,
    {reply, ok, NewState};

handle_call({triggered, {BaseId, _}}, _From, State) ->
    case ets:lookup(?REP_ID_TO_DOC_ID, BaseId) of
    [{BaseId, {DocId, true}}] ->
        % the start process succeeded: clear the "still triggering" flag
        true = ets:insert(?REP_ID_TO_DOC_ID, {BaseId, {DocId, false}});
    _ ->
        % The mapping is gone (or already updated). The document was
        % deleted, completed or put in the "error" state while the start
        % process was still running; start_replication/5 itself writes the
        % "error" state after a failed first attempt, and process_update/2
        % then drops the mapping. Asserting here would stop the manager
        % (and, through terminate/2, every other replication).
        ok
    end,
    {reply, ok, State};

handle_call({restart_failure, {Props} = RepDoc, Error}, _From, State) ->
    DocId = get_value(<<"_id">>, Props),
    case ets:lookup(?DOC_ID_TO_REP_ID, DocId) of
    [{DocId, {{BaseId, _} = RepId, MaxRetries}}] ->
        % one initial attempt plus MaxRetries retries were made
        ?LOG_ERROR("Failed to start replication `~s` after ~p attempts using "
            "the document `~s`. Last error reason was: ~p",
            [pp_rep_id(RepId), MaxRetries + 1, DocId, Error]),
        couch_rep:update_rep_doc(
            RepDoc,
            [{<<"_replication_state">>, <<"error">>},
                {<<"_replication_id">>, ?l2b(BaseId)}]),
        true = ets:delete(?REP_ID_TO_DOC_ID, BaseId),
        true = ets:delete(?DOC_ID_TO_REP_ID, DocId);
    [] ->
        % Mapping already removed (same situation as in the triggered
        % clause above); only keep the final error visible in the log.
        ?LOG_ERROR("Failed to start replication for document `~s`. "
            "Last error reason was: ~p", [DocId, Error])
    end,
    {reply, ok, State};

handle_call(Msg, From, State) ->
    ?LOG_ERROR("Replication manager received unexpected call ~p from ~p",
        [Msg, From]),
    {stop, {error, {unexpected_call, Msg}}, State}.


%% Asynchronous notifications:
%%   {rep_db_changed, Name} - from the config listener registered in init/1;
%%   {rep_db_created, Name} - from the db update notifier;
%%   {set_max_retries, N}   - from the config listener.
%% A rep DB name equal to the current one is ignored; any other name
%% restarts the manager's feed via restart/1. Unknown casts stop the server.
handle_cast({rep_db_changed, NewName}, #state{rep_db_name = NewName} = State) ->
    {noreply, State};

handle_cast({rep_db_changed, _NewName}, State) ->
    {noreply, restart(State)};

handle_cast({rep_db_created, NewName}, #state{rep_db_name = NewName} = State) ->
    {noreply, State};

handle_cast({rep_db_created, _NewName}, State) ->
    {noreply, restart(State)};

handle_cast({set_max_retries, MaxRetries}, State) ->
    {noreply, State#state{max_retries = MaxRetries}};

handle_cast(Msg, State) ->
    ?LOG_ERROR("Replication manager received unexpected cast ~p", [Msg]),
    {stop, {error, {unexpected_cast, Msg}}, State}.
%% Exit signals from linked processes (exits are trapped, see init/1):
%%   - normal exit of the changes feed loop: the replicator DB was deleted,
%%     so forget the loop and the DB name;
%%   - any exit of the db update notifier: stop the manager;
%%   - normal exit of any other process: treated as a finished replication
%%     start process and removed from rep_start_pids.
%% Every other message (including abnormal exits of start processes or of
%% the changes feed loop) stops the server.
handle_info({'EXIT', From, normal}, #state{changes_feed_loop = From} = State) ->
    % replicator DB deleted
    {noreply, State#state{changes_feed_loop = nil, rep_db_name = nil}};

handle_info({'EXIT', From, Reason}, #state{db_notifier = From} = State) ->
    ?LOG_ERROR("Database update notifier died. Reason: ~p", [Reason]),
    {stop, {db_update_notifier_died, Reason}, State};

handle_info({'EXIT', From, normal}, #state{rep_start_pids = Pids} = State) ->
    % one of the replication start processes terminated successfully
    {noreply, State#state{rep_start_pids = Pids -- [From]}};

handle_info(Msg, State) ->
    ?LOG_ERROR("Replication manager received unexpected message ~p", [Msg]),
    {stop, {unexpected_msg, Msg}, State}.


%% Stops every tracked replication, kills the changes feed loop and any
%% start process still alive, deletes both ETS tables and stops the db
%% update notifier.
terminate(_Reason, State) ->
    #state{
        rep_start_pids = StartPids,
        changes_feed_loop = Loop,
        db_notifier = Notifier
    } = State,
    stop_all_replications(),
    % changes_feed_loop is nil after the replicator DB was deleted; filter it
    % out rather than relying on `catch` to swallow the resulting badarg.
    lists:foreach(
        fun(Pid) ->
            unlink(Pid),
            exit(Pid, stop)
        end,
        [P || P <- [Loop | StartPids], is_pid(P)]),
    true = ets:delete(?REP_ID_TO_DOC_ID),
    true = ets:delete(?DOC_ID_TO_REP_ID),
    couch_db_update_notifier:stop(Notifier).


code_change(_OldVsn, State, _Extra) ->
    {ok, State}.


%% Obtains the replicator database through couch_rep:ensure_rep_db_exists/0
%% and spawns a linked process that follows its continuous changes feed
%% (documents included, no timeout). Every change accepted by
%% has_valid_rep_id/1 is forwarded to the manager as a synchronous
%% {rep_db_update, Change} call. Returns {LoopPid, RepDbName}.
changes_feed_loop() ->
    {ok, RepDb} = couch_rep:ensure_rep_db_exists(),
    Server = self(),
    Pid = spawn_link(
        fun() ->
            ChangesFeedFun = couch_changes:handle_changes(
                #changes_args{
                    include_docs = true,
                    feed = "continuous",
                    timeout = infinity,
                    db_open_options = [sys_db]
                },
                {json_req, null},
                RepDb
            ),
            ChangesFeedFun(
                fun({change, Change, _}, _) ->
                    case has_valid_rep_id(Change) of
                    true ->
                        ok = gen_server:call(
                            Server, {rep_db_update, Change}, infinity);
                    false ->
                        ok
                    end;
                (_, _) ->
                    ok
                end
            )
        end
    ),
    couch_db:close(RepDb),
    {Pid, couch_db:name(RepDb)}.


%% Decides whether a change (or a bare document id) should be processed by
%% the manager. Design documents (ids starting with "_design/") are skipped.
%% NOTE(review): this clause was a garbled `<>` pattern (a syntax error);
%% reconstructed as the design-doc prefix match — confirm against upstream,
%% which uses ?DESIGN_DOC_PREFIX from couch_db.hrl.
has_valid_rep_id({Change}) ->
    has_valid_rep_id(get_value(<<"id">>, Change));
has_valid_rep_id(<<"_design/", _Rest/binary>>) ->
    false;
has_valid_rep_id(_Else) ->
    true.
%% Starts a couch_db_update_notifier that casts {rep_db_created, DbName} to
%% the manager whenever a database whose name equals the configured
%% replicator DB ("replicator"/"db", default "_replicator") is created.
%% The configured name is re-read on every event. Returns the notifier pid.
db_update_notifier() ->
    Server = self(),
    {ok, Notifier} = couch_db_update_notifier:start_link(
        fun({created, DbName}) ->
            case ?l2b(couch_config:get("replicator", "db", "_replicator")) of
            DbName ->
                ok = gen_server:cast(Server, {rep_db_created, DbName});
            _ ->
                ok
            end;
        (_) ->
            % no need to handle the 'deleted' event - the changes feed loop
            % dies when the database is deleted
            ok
        end
    ),
    Notifier.


%% Switches the manager to the (possibly renamed or re-created) replicator
%% database: stops all tracked replications, kills the current changes feed
%% loop and any start process still alive, then starts a new feed loop.
restart(#state{changes_feed_loop = Loop, rep_start_pids = StartPids} = State) ->
    stop_all_replications(),
    % Loop is nil once the replicator DB was deleted; skip it instead of
    % relying on `catch` to swallow the badarg from unlink/exit.
    lists:foreach(
        fun(Pid) ->
            unlink(Pid),
            exit(Pid, rep_db_changed)
        end,
        [P || P <- [Loop | StartPids], is_pid(P)]),
    {NewLoop, NewRepDbName} = changes_feed_loop(),
    State#state{
        changes_feed_loop = NewLoop,
        rep_db_name = NewRepDbName,
        rep_start_pids = []
    }.


%% Applies one replicator-DB change (with its document) to the manager:
%%   deleted doc                      -> rep_doc_deleted/1
%%   _replication_state "completed"   -> replication_complete/1
%%   _replication_state "error"       -> stop_replication/1
%%   "triggered" or no state          -> maybe_start_replication/3
%% Any other state value raises case_clause, which the rep_db_update
%% handle_call clause catches and reports on the document.
%% Returns the new state.
process_update(State, {Change}) ->
    {RepProps} = JsonRepDoc = get_value(doc, Change),
    DocId = get_value(<<"_id">>, RepProps),
    case get_value(<<"deleted">>, Change, false) of
    true ->
        rep_doc_deleted(DocId),
        State;
    false ->
        case get_value(<<"_replication_state">>, RepProps) of
        <<"completed">> ->
            replication_complete(DocId),
            State;
        <<"error">> ->
            stop_replication(DocId),
            State;
        <<"triggered">> ->
            maybe_start_replication(State, DocId, JsonRepDoc);
        undefined ->
            maybe_start_replication(State, DocId, JsonRepDoc)
        end
    end.


%% Logs a failure to process a replication document and marks the document
%% with _replication_state = "error".
rep_db_update_error(Error, {Props} = JsonRepDoc) ->
    % Always hand a binary to the `~s` directive below: the reason inside
    % {bad_rep_doc, Reason} comes from make_rep_id/2 and may be an arbitrary
    % term (whatever couch_rep threw as {error, Reason}), which `~s` would
    % reject. couch_util:to_binary/1 is assumed to pass binaries through.
    Reason = case Error of
    {bad_rep_doc, BadDocReason} ->
        to_binary(BadDocReason);
    _ ->
        to_binary(Error)
    end,
    ?LOG_ERROR("Replication manager, error processing document `~s`: ~s",
        [get_value(<<"_id">>, Props), Reason]),
    couch_rep:update_rep_doc(
        JsonRepDoc, [{<<"_replication_state">>, <<"error">>}]).


%% Builds the #user_ctx{} a replication runs as, from the document's
%% optional "user_ctx" object ("name" defaults to null, "roles" to []).
%% Without a "user_ctx" field an empty #user_ctx{} is returned.
rep_user_ctx({RepDoc}) ->
    case get_value(<<"user_ctx">>, RepDoc) of
    undefined ->
        #user_ctx{};
    {UserCtx} ->
        #user_ctx{
            name = get_value(<<"name">>, UserCtx, null),
            roles = get_value(<<"roles">>, UserCtx, [])
        }
    end.
%% Handles a replication document that should be running. Derives its
%% replication id, then, depending on who already claims that base id:
%%   - nobody: records the claim in both ETS tables and spawns a linked
%%     process running start_replication/5;
%%   - this same document: nothing to do;
%%   - another document: logs it and tags this document with the base id.
%% Returns the (possibly updated) state.
maybe_start_replication(#state{max_retries = MaxRetries} = State, DocId, JsonRepDoc) ->
    UserCtx = rep_user_ctx(JsonRepDoc),
    RepId = make_rep_id(JsonRepDoc, UserCtx),
    {BaseId, _Extension} = RepId,
    case ets:lookup(?REP_ID_TO_DOC_ID, BaseId) of
    [{BaseId, {DocId, _}}] ->
        % this very document already owns the replication
        State;
    [{BaseId, {OtherDocId, false}}] ->
        ?LOG_INFO("The replication specified by the document `~s` was already"
            " triggered by the document `~s`", [DocId, OtherDocId]),
        maybe_tag_rep_doc(JsonRepDoc, ?l2b(BaseId)),
        State;
    [{BaseId, {OtherDocId, true}}] ->
        ?LOG_INFO("The replication specified by the document `~s` is already"
            " being triggered by the document `~s`", [DocId, OtherDocId]),
        maybe_tag_rep_doc(JsonRepDoc, ?l2b(BaseId)),
        State;
    [] ->
        Manager = self(),
        true = ets:insert(?REP_ID_TO_DOC_ID, {BaseId, {DocId, true}}),
        true = ets:insert(?DOC_ID_TO_REP_ID, {DocId, {RepId, MaxRetries}}),
        StartPid = spawn_link(
            fun() ->
                start_replication(
                    Manager, JsonRepDoc, RepId, UserCtx, MaxRetries)
            end),
        #state{rep_start_pids = StartPids} = State,
        State#state{rep_start_pids = [StartPid | StartPids]}
    end.


%% Computes the replication id via couch_rep:make_replication_id/2.
%% Every failure is rethrown as {bad_rep_doc, Reason}: a thrown
%% {error, Reason} keeps its Reason, anything else is rendered with
%% to_binary({Class, Term}).
make_rep_id(RepDoc, UserCtx) ->
    try
        couch_rep:make_replication_id(RepDoc, UserCtx)
    catch
    throw:{error, Why} ->
        throw({bad_rep_doc, Why});
    Class:Term ->
        throw({bad_rep_doc, to_binary({Class, Term})})
    end.


%% Writes RepId into the document's "_replication_id" field unless it is
%% already exactly equal to RepId.
maybe_tag_rep_doc({Props} = JsonRepDoc, RepId) ->
    AlreadyTagged = get_value(<<"_replication_id">>, Props) =:= RepId,
    case AlreadyTagged of
    true ->
        ok;
    false ->
        couch_rep:update_rep_doc(JsonRepDoc, [{<<"_replication_id">>, RepId}])
    end.
%% Body of the linked process spawned by maybe_start_replication/3.
%% Makes a first attempt to start the replication. On success it logs,
%% reports {triggered, RepId} to the manager and runs
%% couch_rep:get_result/4. On failure it marks the document with
%% _replication_state = "error" plus its base replication id, then hands
%% over to keep_retrying/8 with up to MaxRetries further attempts.
start_replication(Server, {RepProps} = RepDoc, RepId, UserCtx, MaxRetries) ->
    case (catch couch_rep:start_replication(RepDoc, RepId, UserCtx)) of
    Pid when is_pid(Pid) ->
        ?LOG_INFO("Document `~s` triggered replication `~s`",
            [get_value(<<"_id">>, RepProps), pp_rep_id(RepId)]),
        ok = gen_server:call(Server, {triggered, RepId}, infinity),
        couch_rep:get_result(Pid, RepId, RepDoc, UserCtx);
    Error ->
        couch_rep:update_rep_doc(
            RepDoc,
            [{<<"_replication_state">>, <<"error">>},
                {<<"_replication_id">>, ?l2b(element(1, RepId))}]),
        % one attempt has already been made at this point
        keep_retrying(
            Server, RepId, RepDoc, UserCtx, Error, ?INITIAL_WAIT, MaxRetries, 1)
    end.


%% Retries couch_rep:start_replication/3 with exponential backoff: sleeps
%% Wait seconds before each retry and doubles Wait after every failure.
%% Attempts is the number of (failed) start attempts made so far; it is
%% threaded through explicitly instead of being read back from the
%% manager's ETS table, whose entry may already have been removed (which
%% used to crash this process with a badmatch). When RetriesLeft reaches 0
%% the last error is reported to the manager as
%% {restart_failure, RepDoc, Error}.
%% NOTE(review): retries are not cancelled if the document is deleted or
%% stopped meanwhile — confirm this is intended.
keep_retrying(Server, _RepId, RepDoc, _UserCtx, Error, _Wait, 0, _Attempts) ->
    ok = gen_server:call(Server, {restart_failure, RepDoc, Error}, infinity);
keep_retrying(Server, RepId, RepDoc, UserCtx, Error, Wait, RetriesLeft, Attempts) ->
    {RepProps} = RepDoc,
    DocId = get_value(<<"_id">>, RepProps),
    ?LOG_ERROR("Error starting replication `~s` (document `~s`): ~p. "
        "Retrying in ~p seconds", [pp_rep_id(RepId), DocId, Error, Wait]),
    ok = timer:sleep(Wait * 1000),
    case (catch couch_rep:start_replication(RepDoc, RepId, UserCtx)) of
    Pid when is_pid(Pid) ->
        ok = gen_server:call(Server, {triggered, RepId}, infinity),
        ?LOG_INFO("Document `~s` triggered replication `~s` after ~p attempts",
            [DocId, pp_rep_id(RepId), Attempts + 1]),
        couch_rep:get_result(Pid, RepId, RepDoc, UserCtx);
    NewError ->
        keep_retrying(Server, RepId, RepDoc, UserCtx, NewError, Wait * 2,
            RetriesLeft - 1, Attempts + 1)
    end.


%% Stops the replication (if any) tracked for a deleted document and logs it.
rep_doc_deleted(DocId) ->
    case stop_replication(DocId) of
    {ok, RepId} ->
        ?LOG_INFO("Stopped replication `~s` because replication document `~s`"
            " was deleted", [pp_rep_id(RepId), DocId]);
    none ->
        ok
    end.


%% Stops tracking the replication of a document whose state became
%% "completed" and logs that it finished.
replication_complete(DocId) ->
    case stop_replication(DocId) of
    {ok, RepId} ->
        ?LOG_INFO("Replication `~s` finished (triggered by document `~s`)",
            [pp_rep_id(RepId), DocId]);
    none ->
        ok
    end.
%% Ends the replication tracked for DocId, if there is one: calls
%% couch_rep:end_replication/1 and removes the entries from both ETS
%% tables. Returns {ok, RepId} when a replication was tracked, none
%% otherwise.
stop_replication(DocId) ->
    case ets:lookup(?DOC_ID_TO_REP_ID, DocId) of
    [] ->
        none;
    [{DocId, {{BaseId, _} = RepId, _MaxRetries}}] ->
        couch_rep:end_replication(RepId),
        true = ets:delete(?REP_ID_TO_DOC_ID, BaseId),
        true = ets:delete(?DOC_ID_TO_REP_ID, DocId),
        {ok, RepId}
    end.


%% Ends every tracked replication and empties both ETS tables.
stop_all_replications() ->
    ?LOG_INFO("Stopping all ongoing replications because the replicator"
        " database was deleted or changed", []),
    EndReplication = fun({_DocId, {RepId, _MaxRetries}}, Acc) ->
        couch_rep:end_replication(RepId),
        Acc
    end,
    ets:foldl(EndReplication, ok, ?DOC_ID_TO_REP_ID),
    true = ets:delete_all_objects(?REP_ID_TO_DOC_ID),
    true = ets:delete_all_objects(?DOC_ID_TO_REP_ID).


%% Pretty-prints a replication id {Base, Extension} by concatenating the
%% two parts (Base must be a list).
pp_rep_id({Base, Extension}) ->
    lists:append(Base, Extension).