summaryrefslogtreecommitdiff
path: root/src/couchdb/couch_replication_manager.erl
diff options
context:
space:
mode:
Diffstat (limited to 'src/couchdb/couch_replication_manager.erl')
-rw-r--r--src/couchdb/couch_replication_manager.erl387
1 files changed, 0 insertions, 387 deletions
diff --git a/src/couchdb/couch_replication_manager.erl b/src/couchdb/couch_replication_manager.erl
deleted file mode 100644
index 6537c8b2..00000000
--- a/src/couchdb/couch_replication_manager.erl
+++ /dev/null
@@ -1,387 +0,0 @@
-% Licensed under the Apache License, Version 2.0 (the "License"); you may not
-% use this file except in compliance with the License. You may obtain a copy of
-% the License at
-%
-% http://www.apache.org/licenses/LICENSE-2.0
-%
-% Unless required by applicable law or agreed to in writing, software
-% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-% License for the specific language governing permissions and limitations under
-% the License.
-
--module(couch_replication_manager).
--behaviour(gen_server).
-
--export([start_link/0, init/1, handle_call/3, handle_info/2, handle_cast/2]).
--export([code_change/3, terminate/2]).
-
--include("couch_db.hrl").
-
--define(DOC_ID_TO_REP_ID, rep_doc_id_to_rep_id).
--define(REP_ID_TO_DOC_ID, rep_id_to_rep_doc_id).
--define(INITIAL_WAIT, 5).
-
--record(state, {
- changes_feed_loop = nil,
- db_notifier = nil,
- rep_db_name = nil,
- rep_start_pids = [],
- max_retries
-}).
-
--import(couch_util, [
- get_value/2,
- get_value/3
-]).
-
-
-start_link() ->
- gen_server:start_link({local, ?MODULE}, ?MODULE, [], []).
-
-init(_) ->
- process_flag(trap_exit, true),
- _ = ets:new(?DOC_ID_TO_REP_ID, [named_table, set, protected]),
- _ = ets:new(?REP_ID_TO_DOC_ID, [named_table, set, private]),
- Server = self(),
- ok = couch_config:register(
- fun("replicator", "db", NewName) ->
- ok = gen_server:cast(Server, {rep_db_changed, ?l2b(NewName)});
- ("replicator", "max_replication_retry_count", NewMaxRetries1) ->
- NewMaxRetries = list_to_integer(NewMaxRetries1),
- ok = gen_server:cast(Server, {set_max_retries, NewMaxRetries})
- end
- ),
- {Loop, RepDbName} = changes_feed_loop(),
- {ok, #state{
- changes_feed_loop = Loop,
- rep_db_name = RepDbName,
- db_notifier = db_update_notifier(),
- max_retries = list_to_integer(
- couch_config:get("replicator", "max_replication_retry_count", "10"))
- }}.
-
-
-handle_call({rep_db_update, Change}, _From, State) ->
- {reply, ok, process_update(State, Change)};
-
-handle_call({triggered, {BaseId, _}}, _From, State) ->
- [{BaseId, {DocId, true}}] = ets:lookup(?REP_ID_TO_DOC_ID, BaseId),
- true = ets:insert(?REP_ID_TO_DOC_ID, {BaseId, {DocId, false}}),
- {reply, ok, State};
-
-handle_call({restart_failure, {Props} = RepDoc, Error}, _From, State) ->
- DocId = get_value(<<"_id">>, Props),
- [{DocId, {{BaseId, _} = RepId, MaxRetries}}] = ets:lookup(
- ?DOC_ID_TO_REP_ID, DocId),
- ?LOG_ERROR("Failed to start replication `~s` after ~p attempts using "
- "the document `~s`. Last error reason was: ~p",
- [pp_rep_id(RepId), MaxRetries, DocId, Error]),
- couch_rep:update_rep_doc(
- RepDoc,
- [{<<"_replication_state">>, <<"error">>},
- {<<"_replication_id">>, ?l2b(BaseId)}]),
- true = ets:delete(?REP_ID_TO_DOC_ID, BaseId),
- true = ets:delete(?DOC_ID_TO_REP_ID, DocId),
- {reply, ok, State};
-
-handle_call(Msg, From, State) ->
- ?LOG_ERROR("Replication manager received unexpected call ~p from ~p",
- [Msg, From]),
- {stop, {error, {unexpected_call, Msg}}, State}.
-
-
-handle_cast({rep_db_changed, NewName}, #state{rep_db_name = NewName} = State) ->
- {noreply, State};
-
-handle_cast({rep_db_changed, _NewName}, State) ->
- {noreply, restart(State)};
-
-handle_cast({rep_db_created, NewName}, #state{rep_db_name = NewName} = State) ->
- {noreply, State};
-
-handle_cast({rep_db_created, _NewName}, State) ->
- {noreply, restart(State)};
-
-handle_cast({set_max_retries, MaxRetries}, State) ->
- {noreply, State#state{max_retries = MaxRetries}};
-
-handle_cast(Msg, State) ->
- ?LOG_ERROR("Replication manager received unexpected cast ~p", [Msg]),
- {stop, {error, {unexpected_cast, Msg}}, State}.
-
-
-handle_info({'EXIT', From, normal}, #state{changes_feed_loop = From} = State) ->
- % replicator DB deleted
- {noreply, State#state{changes_feed_loop = nil, rep_db_name = nil}};
-
-handle_info({'EXIT', From, Reason}, #state{db_notifier = From} = State) ->
- ?LOG_ERROR("Database update notifier died. Reason: ~p", [Reason]),
- {stop, {db_update_notifier_died, Reason}, State};
-
-handle_info({'EXIT', From, normal}, #state{rep_start_pids = Pids} = State) ->
- % one of the replication start processes terminated successfully
- {noreply, State#state{rep_start_pids = Pids -- [From]}};
-
-handle_info(Msg, State) ->
- ?LOG_ERROR("Replication manager received unexpected message ~p", [Msg]),
- {stop, {unexpected_msg, Msg}, State}.
-
-
-terminate(_Reason, State) ->
- #state{
- rep_start_pids = StartPids,
- changes_feed_loop = Loop,
- db_notifier = Notifier
- } = State,
- stop_all_replications(),
- lists:foreach(
- fun(Pid) ->
- catch unlink(Pid),
- catch exit(Pid, stop)
- end,
- [Loop | StartPids]),
- true = ets:delete(?REP_ID_TO_DOC_ID),
- true = ets:delete(?DOC_ID_TO_REP_ID),
- couch_db_update_notifier:stop(Notifier).
-
-
-code_change(_OldVsn, State, _Extra) ->
- {ok, State}.
-
-
-changes_feed_loop() ->
- {ok, RepDb} = couch_rep:ensure_rep_db_exists(),
- Server = self(),
- Pid = spawn_link(
- fun() ->
- ChangesFeedFun = couch_changes:handle_changes(
- #changes_args{
- include_docs = true,
- feed = "continuous",
- timeout = infinity,
- db_open_options = [sys_db]
- },
- {json_req, null},
- RepDb
- ),
- ChangesFeedFun(
- fun({change, Change, _}, _) ->
- case has_valid_rep_id(Change) of
- true ->
- ok = gen_server:call(
- Server, {rep_db_update, Change}, infinity);
- false ->
- ok
- end;
- (_, _) ->
- ok
- end
- )
- end
- ),
- couch_db:close(RepDb),
- {Pid, couch_db:name(RepDb)}.
-
-
-has_valid_rep_id({Change}) ->
- has_valid_rep_id(get_value(<<"id">>, Change));
-has_valid_rep_id(<<?DESIGN_DOC_PREFIX, _Rest/binary>>) ->
- false;
-has_valid_rep_id(_Else) ->
- true.
-
-
-db_update_notifier() ->
- Server = self(),
- {ok, Notifier} = couch_db_update_notifier:start_link(
- fun({created, DbName}) ->
- case ?l2b(couch_config:get("replicator", "db", "_replicator")) of
- DbName ->
- ok = gen_server:cast(Server, {rep_db_created, DbName});
- _ ->
- ok
- end;
- (_) ->
- % no need to handle the 'deleted' event - the changes feed loop
- % dies when the database is deleted
- ok
- end
- ),
- Notifier.
-
-
-restart(#state{changes_feed_loop = Loop, rep_start_pids = StartPids} = State) ->
- stop_all_replications(),
- lists:foreach(
- fun(Pid) ->
- catch unlink(Pid),
- catch exit(Pid, rep_db_changed)
- end,
- [Loop | StartPids]),
- {NewLoop, NewRepDbName} = changes_feed_loop(),
- State#state{
- changes_feed_loop = NewLoop,
- rep_db_name = NewRepDbName,
- rep_start_pids = []
- }.
-
-
-process_update(State, {Change}) ->
- {RepProps} = JsonRepDoc = get_value(doc, Change),
- DocId = get_value(<<"_id">>, RepProps),
- case get_value(<<"deleted">>, Change, false) of
- true ->
- rep_doc_deleted(DocId),
- State;
- false ->
- case get_value(<<"_replication_state">>, RepProps) of
- <<"completed">> ->
- replication_complete(DocId),
- State;
- <<"error">> ->
- stop_replication(DocId),
- State;
- <<"triggered">> ->
- maybe_start_replication(State, DocId, JsonRepDoc);
- undefined ->
- maybe_start_replication(State, DocId, JsonRepDoc)
- end
- end.
-
-
-rep_user_ctx({RepDoc}) ->
- case get_value(<<"user_ctx">>, RepDoc) of
- undefined ->
- #user_ctx{};
- {UserCtx} ->
- #user_ctx{
- name = get_value(<<"name">>, UserCtx, null),
- roles = get_value(<<"roles">>, UserCtx, [])
- }
- end.
-
-
-maybe_start_replication(#state{max_retries = MaxRetries} = State,
- DocId, JsonRepDoc) ->
- UserCtx = rep_user_ctx(JsonRepDoc),
- {BaseId, _} = RepId = couch_rep:make_replication_id(JsonRepDoc, UserCtx),
- case ets:lookup(?REP_ID_TO_DOC_ID, BaseId) of
- [] ->
- true = ets:insert(?REP_ID_TO_DOC_ID, {BaseId, {DocId, true}}),
- true = ets:insert(?DOC_ID_TO_REP_ID, {DocId, {RepId, MaxRetries}}),
- Server = self(),
- Pid = spawn_link(fun() ->
- start_replication(Server, JsonRepDoc, RepId, UserCtx, MaxRetries)
- end),
- State#state{rep_start_pids = [Pid | State#state.rep_start_pids]};
- [{BaseId, {DocId, _}}] ->
- State;
- [{BaseId, {OtherDocId, false}}] ->
- ?LOG_INFO("The replication specified by the document `~s` was already"
- " triggered by the document `~s`", [DocId, OtherDocId]),
- maybe_tag_rep_doc(JsonRepDoc, ?l2b(BaseId)),
- State;
- [{BaseId, {OtherDocId, true}}] ->
- ?LOG_INFO("The replication specified by the document `~s` is already"
- " being triggered by the document `~s`", [DocId, OtherDocId]),
- maybe_tag_rep_doc(JsonRepDoc, ?l2b(BaseId)),
- State
- end.
-
-
-maybe_tag_rep_doc({Props} = JsonRepDoc, RepId) ->
- case get_value(<<"_replication_id">>, Props) of
- RepId ->
- ok;
- _ ->
- couch_rep:update_rep_doc(JsonRepDoc, [{<<"_replication_id">>, RepId}])
- end.
-
-
-start_replication(Server, {RepProps} = RepDoc, RepId, UserCtx, MaxRetries) ->
- case (catch couch_rep:start_replication(RepDoc, RepId, UserCtx)) of
- Pid when is_pid(Pid) ->
- ?LOG_INFO("Document `~s` triggered replication `~s`",
- [get_value(<<"_id">>, RepProps), pp_rep_id(RepId)]),
- ok = gen_server:call(Server, {triggered, RepId}, infinity),
- couch_rep:get_result(Pid, RepId, RepDoc, UserCtx);
- Error ->
- couch_rep:update_rep_doc(
- RepDoc,
- [{<<"_replication_state">>, <<"error">>},
- {<<"_replication_id">>, ?l2b(element(1, RepId))}]),
- keep_retrying(
- Server, RepId, RepDoc, UserCtx, Error, ?INITIAL_WAIT, MaxRetries)
- end.
-
-
-keep_retrying(Server, _RepId, RepDoc, _UserCtx, Error, _Wait, 0) ->
- ok = gen_server:call(Server, {restart_failure, RepDoc, Error}, infinity);
-
-keep_retrying(Server, RepId, RepDoc, UserCtx, Error, Wait, RetriesLeft) ->
- {RepProps} = RepDoc,
- DocId = get_value(<<"_id">>, RepProps),
- ?LOG_ERROR("Error starting replication `~s` (document `~s`): ~p. "
- "Retrying in ~p seconds", [pp_rep_id(RepId), DocId, Error, Wait]),
- ok = timer:sleep(Wait * 1000),
- case (catch couch_rep:start_replication(RepDoc, RepId, UserCtx)) of
- Pid when is_pid(Pid) ->
- ok = gen_server:call(Server, {triggered, RepId}, infinity),
- [{DocId, {RepId, MaxRetries}}] = ets:lookup(?DOC_ID_TO_REP_ID, DocId),
- ?LOG_INFO("Document `~s` triggered replication `~s` after ~p attempts",
- [DocId, pp_rep_id(RepId), MaxRetries - RetriesLeft + 1]),
- couch_rep:get_result(Pid, RepId, RepDoc, UserCtx);
- NewError ->
- keep_retrying(
- Server, RepId, RepDoc, UserCtx, NewError, Wait * 2, RetriesLeft - 1)
- end.
-
-
-rep_doc_deleted(DocId) ->
- case stop_replication(DocId) of
- {ok, RepId} ->
- ?LOG_INFO("Stopped replication `~s` because replication document `~s`"
- " was deleted", [pp_rep_id(RepId), DocId]);
- none ->
- ok
- end.
-
-
-replication_complete(DocId) ->
- case stop_replication(DocId) of
- {ok, RepId} ->
- ?LOG_INFO("Replication `~s` finished (triggered by document `~s`)",
- [pp_rep_id(RepId), DocId]);
- none ->
- ok
- end.
-
-
-stop_replication(DocId) ->
- case ets:lookup(?DOC_ID_TO_REP_ID, DocId) of
- [{DocId, {{BaseId, _} = RepId, _MaxRetries}}] ->
- couch_rep:end_replication(RepId),
- true = ets:delete(?REP_ID_TO_DOC_ID, BaseId),
- true = ets:delete(?DOC_ID_TO_REP_ID, DocId),
- {ok, RepId};
- [] ->
- none
- end.
-
-
-stop_all_replications() ->
- ?LOG_INFO("Stopping all ongoing replications because the replicator"
- " database was deleted or changed", []),
- ets:foldl(
- fun({_, {RepId, _}}, _) ->
- couch_rep:end_replication(RepId)
- end,
- ok, ?DOC_ID_TO_REP_ID),
- true = ets:delete_all_objects(?REP_ID_TO_DOC_ID),
- true = ets:delete_all_objects(?DOC_ID_TO_REP_ID).
-
-
-% pretty-print replication id
-pp_rep_id({Base, Extension}) ->
- Base ++ Extension.