summaryrefslogtreecommitdiff
path: root/src/couchdb
diff options
context:
space:
mode:
Diffstat (limited to 'src/couchdb')
-rw-r--r--src/couchdb/couch_rep.erl144
-rw-r--r--src/couchdb/couch_replication_manager.erl424
2 files changed, 327 insertions, 241 deletions
diff --git a/src/couchdb/couch_rep.erl b/src/couchdb/couch_rep.erl
index 4fabdd99..a9c156e9 100644
--- a/src/couchdb/couch_rep.erl
+++ b/src/couchdb/couch_rep.erl
@@ -16,12 +16,10 @@
code_change/3]).
-export([replicate/2, checkpoint/1]).
--export([ensure_rep_db_exists/0, make_replication_id/2]).
+-export([make_replication_id/2]).
-export([start_replication/3, end_replication/1, get_result/4]).
--export([update_rep_doc/2]).
-include("couch_db.hrl").
--include("couch_js_functions.hrl").
-include("../ibrowse/ibrowse.hrl").
-define(REP_ID_VERSION, 2).
@@ -54,7 +52,6 @@
committed_seq = 0,
stats = nil,
- rep_doc = nil,
source_db_update_notifier = nil,
target_db_update_notifier = nil
}).
@@ -94,11 +91,11 @@ end_replication({BaseId, Extension}) ->
end
end.
-start_replication(RepDoc, {BaseId, Extension}, UserCtx) ->
+start_replication(RepDoc, {BaseId, Extension} = RepId, UserCtx) ->
Replicator = {
BaseId ++ Extension,
{gen_server, start_link,
- [?MODULE, [BaseId, RepDoc, UserCtx], []]},
+ [?MODULE, [RepId, RepDoc, UserCtx], []]},
temporary,
1,
worker,
@@ -135,7 +132,7 @@ init(InitArgs) ->
{stop, Error}
end.
-do_init([RepId, {PostProps} = RepDoc, UserCtx] = InitArgs) ->
+do_init([{BaseId, _Ext} = RepId, {PostProps}, UserCtx] = InitArgs) ->
process_flag(trap_exit, true),
SourceProps = couch_util:get_value(<<"source">>, PostProps),
@@ -152,10 +149,8 @@ do_init([RepId, {PostProps} = RepDoc, UserCtx] = InitArgs) ->
SourceInfo = dbinfo(Source),
TargetInfo = dbinfo(Target),
- maybe_set_triggered(RepDoc, RepId),
-
[SourceLog, TargetLog] = find_replication_logs(
- [Source, Target], RepId, {PostProps}, UserCtx),
+ [Source, Target], BaseId, {PostProps}, UserCtx),
{StartSeq, History} = compare_replication_logs(SourceLog, TargetLog),
{ok, ChangesFeed} =
@@ -174,10 +169,12 @@ do_init([RepId, {PostProps} = RepDoc, UserCtx] = InitArgs) ->
ets:insert(Stats, {docs_written, 0}),
ets:insert(Stats, {doc_write_failures, 0}),
- {ShortId, _} = lists:split(6, RepId),
+ {ShortId, _} = lists:split(6, BaseId),
couch_task_status:add_task("Replication", io_lib:format("~s: ~s -> ~s",
[ShortId, dbname(Source), dbname(Target)]), "Starting"),
+ couch_replication_manager:replication_started(RepId),
+
State = #state{
changes_feed = ChangesFeed,
missing_revs = MissingRevs,
@@ -200,7 +197,6 @@ do_init([RepId, {PostProps} = RepDoc, UserCtx] = InitArgs) ->
rep_starttime = httpd_util:rfc1123_date(),
src_starttime = couch_util:get_value(instance_start_time, SourceInfo),
tgt_starttime = couch_util:get_value(instance_start_time, TargetInfo),
- rep_doc = RepDoc,
source_db_update_notifier = source_db_update_notifier(Source),
target_db_update_notifier = target_db_update_notifier(Target)
},
@@ -272,27 +268,24 @@ handle_info({'EXIT', _Pid, {Err, Reason}}, State) when Err == source_error;
handle_info({'EXIT', _Pid, Reason}, State) ->
{stop, Reason, State}.
-terminate(normal, #state{checkpoint_scheduled=nil} = State) ->
+terminate(normal, #state{checkpoint_scheduled=nil, init_args=[RepId | _]} = State) ->
do_terminate(State),
- update_rep_doc(
- State#state.rep_doc, [{<<"_replication_state">>, <<"completed">>}]);
+ couch_replication_manager:replication_completed(RepId);
-terminate(normal, State) ->
+terminate(normal, #state{init_args=[RepId | _]} = State) ->
timer:cancel(State#state.checkpoint_scheduled),
do_terminate(do_checkpoint(State)),
- update_rep_doc(
- State#state.rep_doc, [{<<"_replication_state">>, <<"completed">>}]);
+ couch_replication_manager:replication_completed(RepId);
terminate(shutdown, #state{listeners = Listeners} = State) ->
% continuous replication stopped
[gen_server:reply(L, {ok, stopped}) || L <- Listeners],
terminate_cleanup(State);
-terminate(Reason, #state{listeners = Listeners} = State) ->
+terminate(Reason, #state{listeners = Listeners, init_args=[RepId | _]} = State) ->
[gen_server:reply(L, {error, Reason}) || L <- Listeners],
terminate_cleanup(State),
- update_rep_doc(
- State#state.rep_doc, [{<<"_replication_state">>, <<"error">>}]).
+ couch_replication_manager:replication_error(RepId, Reason).
code_change(_OldVsn, State, _Extra) ->
{ok, State}.
@@ -698,7 +691,7 @@ do_checkpoint(State) ->
src_starttime = SrcInstanceStartTime,
tgt_starttime = TgtInstanceStartTime,
stats = Stats,
- rep_doc = {RepDoc}
+ init_args = [_RepId, {RepDoc} | _]
} = State,
case commit_to_both(Source, Target, NewSeqNum) of
{SrcInstanceStartTime, TgtInstanceStartTime} ->
@@ -901,113 +894,6 @@ parse_proxy_params(ProxyUrl) ->
[{proxy_user, User}, {proxy_password, Passwd}]
end.
-update_rep_doc({Props} = _RepDoc, KVs) ->
- case couch_util:get_value(<<"_id">>, Props) of
- undefined ->
- % replication triggered by POSTing to _replicate/
- ok;
- RepDocId ->
- % replication triggered by adding a Rep Doc to the replicator DB
- {ok, RepDb} = ensure_rep_db_exists(),
- case couch_db:open_doc(RepDb, RepDocId, []) of
- {ok, LatestRepDoc} ->
- update_rep_doc(RepDb, LatestRepDoc, KVs);
- _ ->
- ok
- end,
- couch_db:close(RepDb)
- end.
-
-update_rep_doc(RepDb, #doc{body = {RepDocBody}} = RepDoc, KVs) ->
- NewRepDocBody = lists:foldl(
- fun({<<"_replication_state">> = K, State} = KV, Body) ->
- case couch_util:get_value(K, Body) of
- State ->
- Body;
- _ ->
- Body1 = lists:keystore(K, 1, Body, KV),
- lists:keystore(
- <<"_replication_state_time">>, 1,
- Body1, {<<"_replication_state_time">>, timestamp()})
- end;
- ({K, _V} = KV, Body) ->
- lists:keystore(K, 1, Body, KV)
- end,
- RepDocBody,
- KVs
- ),
- case NewRepDocBody of
- RepDocBody ->
- ok;
- _ ->
- % might not succeed - when the replication doc is deleted right
- % before this update (not an error)
- couch_db:update_doc(RepDb, RepDoc#doc{body = {NewRepDocBody}}, [])
- end.
-
-% RFC3339 timestamps.
-% Note: doesn't include the time seconds fraction (RFC3339 says it's optional).
-timestamp() ->
- {{Year, Month, Day}, {Hour, Min, Sec}} = calendar:now_to_local_time(now()),
- UTime = erlang:universaltime(),
- LocalTime = calendar:universal_time_to_local_time(UTime),
- DiffSecs = calendar:datetime_to_gregorian_seconds(LocalTime) -
- calendar:datetime_to_gregorian_seconds(UTime),
- zone(DiffSecs div 3600, (DiffSecs rem 3600) div 60),
- iolist_to_binary(
- io_lib:format("~4..0w-~2..0w-~2..0wT~2..0w:~2..0w:~2..0w~s",
- [Year, Month, Day, Hour, Min, Sec,
- zone(DiffSecs div 3600, (DiffSecs rem 3600) div 60)])).
-
-zone(Hr, Min) when Hr >= 0, Min >= 0 ->
- io_lib:format("+~2..0w:~2..0w", [Hr, Min]);
-zone(Hr, Min) ->
- io_lib:format("-~2..0w:~2..0w", [abs(Hr), abs(Min)]).
-
-
-maybe_set_triggered({RepProps} = RepDoc, RepId) ->
- case couch_util:get_value(<<"_replication_state">>, RepProps) of
- <<"triggered">> ->
- ok;
- _ ->
- update_rep_doc(
- RepDoc,
- [
- {<<"_replication_state">>, <<"triggered">>},
- {<<"_replication_id">>, ?l2b(RepId)}
- ]
- )
- end.
-
-ensure_rep_db_exists() ->
- DbName = ?l2b(couch_config:get("replicator", "db", "_replicator")),
- Opts = [
- {user_ctx, #user_ctx{roles=[<<"_admin">>, <<"_replicator">>]}},
- sys_db
- ],
- case couch_db:open(DbName, Opts) of
- {ok, Db} ->
- Db;
- _Error ->
- {ok, Db} = couch_db:create(DbName, Opts)
- end,
- ok = ensure_rep_ddoc_exists(Db, <<"_design/_replicator">>),
- {ok, Db}.
-
-ensure_rep_ddoc_exists(RepDb, DDocID) ->
- case couch_db:open_doc(RepDb, DDocID, []) of
- {ok, _Doc} ->
- ok;
- _ ->
- DDoc = couch_doc:from_json_obj({[
- {<<"_id">>, DDocID},
- {<<"language">>, <<"javascript">>},
- {<<"validate_doc_update">>, ?REP_DB_DOC_VALIDATE_FUN}
- ]}),
- {ok, _Rev} = couch_db:update_doc(RepDb, DDoc, [])
- end,
- ok.
-
source_db_update_notifier(#db{name = DbName}) ->
Server = self(),
{ok, Notifier} = couch_db_update_notifier:start_link(
diff --git a/src/couchdb/couch_replication_manager.erl b/src/couchdb/couch_replication_manager.erl
index e3d97c37..7e2c8118 100644
--- a/src/couchdb/couch_replication_manager.erl
+++ b/src/couchdb/couch_replication_manager.erl
@@ -13,14 +13,20 @@
-module(couch_replication_manager).
-behaviour(gen_server).
+% public API
+-export([replication_started/1, replication_completed/1, replication_error/2]).
+
+% gen_server callbacks
-export([start_link/0, init/1, handle_call/3, handle_info/2, handle_cast/2]).
-export([code_change/3, terminate/2]).
-include("couch_db.hrl").
+-include("couch_js_functions.hrl").
--define(DOC_ID_TO_REP_ID, rep_doc_id_to_rep_id).
--define(REP_ID_TO_DOC_ID, rep_id_to_rep_doc_id).
--define(INITIAL_WAIT, 5).
+-define(DOC_TO_REP, couch_rep_doc_id_to_rep_id).
+-define(REP_TO_STATE, couch_rep_id_to_rep_state).
+-define(INITIAL_WAIT, 2.5). % seconds
+-define(MAX_WAIT, 600). % seconds
-record(state, {
changes_feed_loop = nil,
@@ -30,6 +36,16 @@
max_retries
}).
+-record(rep_state, {
+ doc_id,
+ user_ctx,
+ doc,
+ starting,
+ retries_left,
+ max_retries,
+ wait = ?INITIAL_WAIT
+}).
+
-import(couch_util, [
get_value/2,
get_value/3,
@@ -40,17 +56,56 @@
start_link() ->
gen_server:start_link({local, ?MODULE}, ?MODULE, [], []).
+
+replication_started({BaseId, _} = RepId) ->
+ case rep_state(RepId) of
+ nil ->
+ ok;
+ #rep_state{doc_id = DocId} ->
+ update_rep_doc(DocId, [
+ {<<"_replication_state">>, <<"triggered">>},
+ {<<"_replication_id">>, ?l2b(BaseId)}]),
+ ok = gen_server:call(?MODULE, {rep_started, RepId}, infinity),
+ ?LOG_INFO("Document `~s` triggered replication `~s`",
+ [DocId, pp_rep_id(RepId)])
+ end.
+
+
+replication_completed(RepId) ->
+ case rep_state(RepId) of
+ nil ->
+ ok;
+ #rep_state{doc_id = DocId} = St ->
+ update_rep_doc(DocId, [{<<"_replication_state">>, <<"completed">>}]),
+ ok = gen_server:call(?MODULE, {rep_complete, RepId}, infinity),
+ ?LOG_INFO("Replication `~s` finished (triggered by document `~s`)",
+ [pp_rep_id(RepId), DocId])
+ end.
+
+
+replication_error({BaseId, _} = RepId, Error) ->
+ case rep_state(RepId) of
+ nil ->
+ ok;
+ #rep_state{doc_id = DocId} ->
+ % TODO: maybe add error reason to replication document
+ update_rep_doc(DocId, [
+ {<<"_replication_state">>, <<"error">>},
+ {<<"_replication_id">>, ?l2b(BaseId)}]),
+ ok = gen_server:call(?MODULE, {rep_error, RepId, Error}, infinity)
+ end.
+
+
init(_) ->
process_flag(trap_exit, true),
- _ = ets:new(?DOC_ID_TO_REP_ID, [named_table, set, protected]),
- _ = ets:new(?REP_ID_TO_DOC_ID, [named_table, set, private]),
+ ?DOC_TO_REP = ets:new(?DOC_TO_REP, [named_table, set, protected]),
+ ?REP_TO_STATE = ets:new(?REP_TO_STATE, [named_table, set, protected]),
Server = self(),
ok = couch_config:register(
fun("replicator", "db", NewName) ->
ok = gen_server:cast(Server, {rep_db_changed, ?l2b(NewName)});
- ("replicator", "max_replication_retry_count", NewMaxRetries1) ->
- NewMaxRetries = list_to_integer(NewMaxRetries1),
- ok = gen_server:cast(Server, {set_max_retries, NewMaxRetries})
+ ("replicator", "max_replication_retry_count", V) ->
+ ok = gen_server:cast(Server, {set_max_retries, retries_value(V)})
end
),
{Loop, RepDbName} = changes_feed_loop(),
@@ -58,7 +113,7 @@ init(_) ->
changes_feed_loop = Loop,
rep_db_name = RepDbName,
db_notifier = db_update_notifier(),
- max_retries = list_to_integer(
+ max_retries = retries_value(
couch_config:get("replicator", "max_replication_retry_count", "10"))
}}.
@@ -68,32 +123,35 @@ handle_call({rep_db_update, {ChangeProps} = Change}, _From, State) ->
process_update(State, Change)
catch
_Tag:Error ->
- JsonRepDoc = get_value(doc, ChangeProps),
- rep_db_update_error(Error, JsonRepDoc),
+ {RepProps} = get_value(doc, ChangeProps),
+ DocId = get_value(<<"_id">>, RepProps),
+ rep_db_update_error(Error, DocId),
State
end,
{reply, ok, NewState};
-handle_call({triggered, {BaseId, _}}, _From, State) ->
- [{BaseId, {DocId, true}}] = ets:lookup(?REP_ID_TO_DOC_ID, BaseId),
- true = ets:insert(?REP_ID_TO_DOC_ID, {BaseId, {DocId, false}}),
+handle_call({rep_started, RepId}, _From, State) ->
+ case rep_state(RepId) of
+ nil ->
+ ok;
+ RepState ->
+ NewRepState = RepState#rep_state{
+ starting = false,
+ retries_left = State#state.max_retries,
+ max_retries = State#state.max_retries,
+ wait = ?INITIAL_WAIT
+ },
+ true = ets:insert(?REP_TO_STATE, {RepId, NewRepState})
+ end,
{reply, ok, State};
-handle_call({restart_failure, {Props} = RepDoc, Error}, _From, State) ->
- DocId = get_value(<<"_id">>, Props),
- [{DocId, {{BaseId, _} = RepId, MaxRetries}}] = ets:lookup(
- ?DOC_ID_TO_REP_ID, DocId),
- ?LOG_ERROR("Failed to start replication `~s` after ~p attempts using "
- "the document `~s`. Last error reason was: ~p",
- [pp_rep_id(RepId), MaxRetries, DocId, Error]),
- couch_rep:update_rep_doc(
- RepDoc,
- [{<<"_replication_state">>, <<"error">>},
- {<<"_replication_id">>, ?l2b(BaseId)}]),
- true = ets:delete(?REP_ID_TO_DOC_ID, BaseId),
- true = ets:delete(?DOC_ID_TO_REP_ID, DocId),
+handle_call({rep_complete, RepId}, _From, State) ->
+ true = ets:delete(?REP_TO_STATE, RepId),
{reply, ok, State};
+handle_call({rep_error, RepId, Error}, _From, State) ->
+ {reply, ok, replication_error(State, RepId, Error)};
+
handle_call(Msg, From, State) ->
?LOG_ERROR("Replication manager received unexpected call ~p from ~p",
[Msg, From]),
@@ -150,8 +208,8 @@ terminate(_Reason, State) ->
catch exit(Pid, stop)
end,
[Loop | StartPids]),
- true = ets:delete(?REP_ID_TO_DOC_ID),
- true = ets:delete(?DOC_ID_TO_REP_ID),
+ true = ets:delete(?REP_TO_STATE),
+ true = ets:delete(?DOC_TO_REP),
couch_db_update_notifier:stop(Notifier).
@@ -160,7 +218,7 @@ code_change(_OldVsn, State, _Extra) ->
changes_feed_loop() ->
- {ok, RepDb} = couch_rep:ensure_rep_db_exists(),
+ {ok, RepDb} = ensure_rep_db_exists(),
Server = self(),
Pid = spawn_link(
fun() ->
@@ -245,21 +303,20 @@ process_update(State, {Change}) ->
State;
false ->
case get_value(<<"_replication_state">>, RepProps) of
+ undefined ->
+ maybe_start_replication(State, DocId, JsonRepDoc);
+ <<"triggered">> ->
+ maybe_start_replication(State, DocId, JsonRepDoc);
<<"completed">> ->
replication_complete(DocId),
State;
- <<"error">> ->
- stop_replication(DocId),
- State;
- <<"triggered">> ->
- maybe_start_replication(State, DocId, JsonRepDoc);
- undefined ->
- maybe_start_replication(State, DocId, JsonRepDoc)
+ _ ->
+ State
end
end.
-rep_db_update_error(Error, {Props} = JsonRepDoc) ->
+rep_db_update_error(Error, DocId) ->
case Error of
{bad_rep_doc, Reason} ->
ok;
@@ -267,9 +324,8 @@ rep_db_update_error(Error, {Props} = JsonRepDoc) ->
Reason = to_binary(Error)
end,
?LOG_ERROR("Replication manager, error processing document `~s`: ~s",
- [get_value(<<"_id">>, Props), Reason]),
- couch_rep:update_rep_doc(
- JsonRepDoc, [{<<"_replication_state">>, <<"error">>}]).
+ [DocId, Reason]),
+ update_rep_doc(DocId, [{<<"_replication_state">>, <<"error">>}]).
rep_user_ctx({RepDoc}) ->
@@ -284,30 +340,39 @@ rep_user_ctx({RepDoc}) ->
end.
-maybe_start_replication(#state{max_retries = MaxRetries} = State,
- DocId, JsonRepDoc) ->
- UserCtx = rep_user_ctx(JsonRepDoc),
- {BaseId, _} = RepId = make_rep_id(JsonRepDoc, UserCtx),
- case ets:lookup(?REP_ID_TO_DOC_ID, BaseId) of
- [] ->
- true = ets:insert(?REP_ID_TO_DOC_ID, {BaseId, {DocId, true}}),
- true = ets:insert(?DOC_ID_TO_REP_ID, {DocId, {RepId, MaxRetries}}),
+maybe_start_replication(State, DocId, RepDoc) ->
+ UserCtx = rep_user_ctx(RepDoc),
+ {BaseId, _} = RepId = make_rep_id(RepDoc, UserCtx),
+ case rep_state(RepId) of
+ nil ->
+ RepState = #rep_state{
+ doc_id = DocId,
+ user_ctx = UserCtx,
+ doc = RepDoc,
+ starting = true,
+ retries_left = State#state.max_retries,
+ max_retries = State#state.max_retries
+ },
+ true = ets:insert(?REP_TO_STATE, {RepId, RepState}),
+ true = ets:insert(?DOC_TO_REP, {DocId, RepId}),
+ ?LOG_INFO("Attempting to start replication `~s` (document `~s`).",
+ [pp_rep_id(RepId), DocId]),
Server = self(),
Pid = spawn_link(fun() ->
- start_replication(Server, JsonRepDoc, RepId, UserCtx, MaxRetries)
+ start_replication(Server, RepDoc, RepId, UserCtx, 0)
end),
State#state{rep_start_pids = [Pid | State#state.rep_start_pids]};
- [{BaseId, {DocId, _}}] ->
+ #rep_state{doc_id = DocId} ->
State;
- [{BaseId, {OtherDocId, false}}] ->
+ #rep_state{starting = false, doc_id = OtherDocId} ->
?LOG_INFO("The replication specified by the document `~s` was already"
" triggered by the document `~s`", [DocId, OtherDocId]),
- maybe_tag_rep_doc(JsonRepDoc, ?l2b(BaseId)),
+ maybe_tag_rep_doc(DocId, RepDoc, ?l2b(BaseId)),
State;
- [{BaseId, {OtherDocId, true}}] ->
+ #rep_state{starting = true, doc_id = OtherDocId} ->
?LOG_INFO("The replication specified by the document `~s` is already"
" being triggered by the document `~s`", [DocId, OtherDocId]),
- maybe_tag_rep_doc(JsonRepDoc, ?l2b(BaseId)),
+ maybe_tag_rep_doc(DocId, RepDoc, ?l2b(BaseId)),
State
end.
@@ -323,98 +388,233 @@ make_rep_id(RepDoc, UserCtx) ->
end.
-maybe_tag_rep_doc({Props} = JsonRepDoc, RepId) ->
- case get_value(<<"_replication_id">>, Props) of
+maybe_tag_rep_doc(DocId, {RepProps}, RepId) ->
+ case get_value(<<"_replication_id">>, RepProps) of
RepId ->
ok;
_ ->
- couch_rep:update_rep_doc(JsonRepDoc, [{<<"_replication_id">>, RepId}])
+ update_rep_doc(DocId, [{<<"_replication_id">>, RepId}])
end.
-start_replication(Server, {RepProps} = RepDoc, RepId, UserCtx, MaxRetries) ->
+start_replication(Server, RepDoc, RepId, UserCtx, Wait) ->
+ ok = timer:sleep(Wait * 1000),
case (catch couch_rep:start_replication(RepDoc, RepId, UserCtx)) of
Pid when is_pid(Pid) ->
- ?LOG_INFO("Document `~s` triggered replication `~s`",
- [get_value(<<"_id">>, RepProps), pp_rep_id(RepId)]),
- ok = gen_server:call(Server, {triggered, RepId}, infinity),
+ ok = gen_server:call(Server, {rep_started, RepId}, infinity),
couch_rep:get_result(Pid, RepId, RepDoc, UserCtx);
Error ->
- couch_rep:update_rep_doc(
- RepDoc,
- [{<<"_replication_state">>, <<"error">>},
- {<<"_replication_id">>, ?l2b(element(1, RepId))}]),
- keep_retrying(
- Server, RepId, RepDoc, UserCtx, Error, ?INITIAL_WAIT, MaxRetries)
+ replication_error(RepId, Error)
end.
-keep_retrying(Server, _RepId, RepDoc, _UserCtx, Error, _Wait, 0) ->
- ok = gen_server:call(Server, {restart_failure, RepDoc, Error}, infinity);
-
-keep_retrying(Server, RepId, RepDoc, UserCtx, Error, Wait, RetriesLeft) ->
- {RepProps} = RepDoc,
- DocId = get_value(<<"_id">>, RepProps),
- ?LOG_ERROR("Error starting replication `~s` (document `~s`): ~p. "
- "Retrying in ~p seconds", [pp_rep_id(RepId), DocId, Error, Wait]),
- ok = timer:sleep(Wait * 1000),
- case (catch couch_rep:start_replication(RepDoc, RepId, UserCtx)) of
- Pid when is_pid(Pid) ->
- ok = gen_server:call(Server, {triggered, RepId}, infinity),
- [{DocId, {RepId, MaxRetries}}] = ets:lookup(?DOC_ID_TO_REP_ID, DocId),
- ?LOG_INFO("Document `~s` triggered replication `~s` after ~p attempts",
- [DocId, pp_rep_id(RepId), MaxRetries - RetriesLeft + 1]),
- couch_rep:get_result(Pid, RepId, RepDoc, UserCtx);
- NewError ->
- keep_retrying(
- Server, RepId, RepDoc, UserCtx, NewError, Wait * 2, RetriesLeft - 1)
+replication_complete(DocId) ->
+ case ets:lookup(?DOC_TO_REP, DocId) of
+ [{DocId, RepId}] ->
+ case rep_state(RepId) of
+ nil ->
+ couch_rep:end_replication(RepId);
+ #rep_state{} ->
+ ok
+ end,
+ true = ets:delete(?DOC_TO_REP, DocId);
+ _ ->
+ ok
end.
rep_doc_deleted(DocId) ->
- case stop_replication(DocId) of
- {ok, RepId} ->
+ case ets:lookup(?DOC_TO_REP, DocId) of
+ [{DocId, RepId}] ->
+ couch_rep:end_replication(RepId),
+ true = ets:delete(?REP_TO_STATE, RepId),
+ true = ets:delete(?DOC_TO_REP, DocId),
?LOG_INFO("Stopped replication `~s` because replication document `~s`"
" was deleted", [pp_rep_id(RepId), DocId]);
- none ->
+ [] ->
ok
end.
-replication_complete(DocId) ->
- case stop_replication(DocId) of
- {ok, RepId} ->
- ?LOG_INFO("Replication `~s` finished (triggered by document `~s`)",
- [pp_rep_id(RepId), DocId]);
- none ->
- ok
+replication_error(State, RepId, Error) ->
+ case rep_state(RepId) of
+ nil ->
+ State;
+ RepState ->
+ maybe_retry_replication(RepId, RepState, Error, State)
end.
-
-stop_replication(DocId) ->
- case ets:lookup(?DOC_ID_TO_REP_ID, DocId) of
- [{DocId, {{BaseId, _} = RepId, _MaxRetries}}] ->
- couch_rep:end_replication(RepId),
- true = ets:delete(?REP_ID_TO_DOC_ID, BaseId),
- true = ets:delete(?DOC_ID_TO_REP_ID, DocId),
- {ok, RepId};
- [] ->
- none
- end.
+maybe_retry_replication(RepId, #rep_state{retries_left = 0} = RepState, Error, State) ->
+ #rep_state{
+ doc_id = DocId,
+ max_retries = MaxRetries
+ } = RepState,
+ couch_rep:end_replication(RepId),
+ true = ets:delete(?REP_TO_STATE, RepId),
+ true = ets:delete(?DOC_TO_REP, DocId),
+ ?LOG_ERROR("Error in replication `~s` (triggered by document `~s`): ~s"
+ "~nReached maximum retry attempts (~p).",
+ [pp_rep_id(RepId), DocId, to_binary(error_reason(Error)), MaxRetries]),
+ State;
+
+maybe_retry_replication(RepId, RepState, Error, State) ->
+ #rep_state{
+ doc_id = DocId,
+ user_ctx = UserCtx,
+ doc = RepDoc
+ } = RepState,
+ #rep_state{wait = Wait} = NewRepState = state_after_error(RepState),
+ true = ets:insert(?REP_TO_STATE, {RepId, NewRepState}),
+ ?LOG_ERROR("Error in replication `~s` (triggered by document `~s`): ~s"
+ "~nRestarting replication in ~p seconds.",
+ [pp_rep_id(RepId), DocId, to_binary(error_reason(Error)), Wait]),
+ Server = self(),
+ Pid = spawn_link(fun() ->
+ start_replication(Server, RepDoc, RepId, UserCtx, Wait)
+ end),
+ State#state{rep_start_pids = [Pid | State#state.rep_start_pids]}.
stop_all_replications() ->
?LOG_INFO("Stopping all ongoing replications because the replicator"
" database was deleted or changed", []),
ets:foldl(
- fun({_, {RepId, _}}, _) ->
+ fun({_, RepId}, _) ->
couch_rep:end_replication(RepId)
end,
- ok, ?DOC_ID_TO_REP_ID),
- true = ets:delete_all_objects(?REP_ID_TO_DOC_ID),
- true = ets:delete_all_objects(?DOC_ID_TO_REP_ID).
+ ok, ?DOC_TO_REP),
+ true = ets:delete_all_objects(?REP_TO_STATE),
+ true = ets:delete_all_objects(?DOC_TO_REP).
+
+
+update_rep_doc(RepDocId, KVs) ->
+ {ok, RepDb} = ensure_rep_db_exists(),
+ try
+ case couch_db:open_doc(RepDb, RepDocId, []) of
+ {ok, LatestRepDoc} ->
+ update_rep_doc(RepDb, LatestRepDoc, KVs);
+ _ ->
+ ok
+ end
+ catch throw:conflict ->
+ % Shouldn't happen, as by default only the role _replicator can
+ % update replication documents.
+ ?LOG_ERROR("Conflict error when updating replication document `~s`."
+ " Retrying.", [RepDocId]),
+ ok = timer:sleep(5),
+ update_rep_doc(RepDocId, KVs)
+ after
+ couch_db:close(RepDb)
+ end.
+
+update_rep_doc(RepDb, #doc{body = {RepDocBody}} = RepDoc, KVs) ->
+ NewRepDocBody = lists:foldl(
+ fun({<<"_replication_state">> = K, State} = KV, Body) ->
+ case get_value(K, Body) of
+ State ->
+ Body;
+ _ ->
+ Body1 = lists:keystore(K, 1, Body, KV),
+ lists:keystore(
+ <<"_replication_state_time">>, 1, Body1,
+ {<<"_replication_state_time">>, timestamp()})
+ end;
+ ({K, _V} = KV, Body) ->
+ lists:keystore(K, 1, Body, KV)
+ end,
+ RepDocBody, KVs),
+ case NewRepDocBody of
+ RepDocBody ->
+ ok;
+ _ ->
+ % Might not succeed - when the replication doc is deleted right
+ % before this update (not an error, ignore).
+ couch_db:update_doc(RepDb, RepDoc#doc{body = {NewRepDocBody}}, [])
+ end.
+
+
+% RFC3339 timestamps.
+% Note: doesn't include the time seconds fraction (RFC3339 says it's optional).
+timestamp() ->
+ {{Year, Month, Day}, {Hour, Min, Sec}} = calendar:now_to_local_time(now()),
+ UTime = erlang:universaltime(),
+ LocalTime = calendar:universal_time_to_local_time(UTime),
+ DiffSecs = calendar:datetime_to_gregorian_seconds(LocalTime) -
+ calendar:datetime_to_gregorian_seconds(UTime),
+ zone(DiffSecs div 3600, (DiffSecs rem 3600) div 60),
+ iolist_to_binary(
+ io_lib:format("~4..0w-~2..0w-~2..0wT~2..0w:~2..0w:~2..0w~s",
+ [Year, Month, Day, Hour, Min, Sec,
+ zone(DiffSecs div 3600, (DiffSecs rem 3600) div 60)])).
+
+zone(Hr, Min) when Hr >= 0, Min >= 0 ->
+ io_lib:format("+~2..0w:~2..0w", [Hr, Min]);
+zone(Hr, Min) ->
+ io_lib:format("-~2..0w:~2..0w", [abs(Hr), abs(Min)]).
+
+
+ensure_rep_db_exists() ->
+ DbName = ?l2b(couch_config:get("replicator", "db", "_replicator")),
+ Opts = [
+ {user_ctx, #user_ctx{roles=[<<"_admin">>, <<"_replicator">>]}},
+ sys_db
+ ],
+ case couch_db:open(DbName, Opts) of
+ {ok, Db} ->
+ Db;
+ _Error ->
+ {ok, Db} = couch_db:create(DbName, Opts)
+ end,
+ ok = ensure_rep_ddoc_exists(Db, <<"_design/_replicator">>),
+ {ok, Db}.
+
+
+ensure_rep_ddoc_exists(RepDb, DDocID) ->
+ case couch_db:open_doc(RepDb, DDocID, []) of
+ {ok, _Doc} ->
+ ok;
+ _ ->
+ DDoc = couch_doc:from_json_obj({[
+ {<<"_id">>, DDocID},
+ {<<"language">>, <<"javascript">>},
+ {<<"validate_doc_update">>, ?REP_DB_DOC_VALIDATE_FUN}
+ ]}),
+ {ok, _Rev} = couch_db:update_doc(RepDb, DDoc, [])
+ end,
+ ok.
% pretty-print replication id
pp_rep_id({Base, Extension}) ->
Base ++ Extension.
+
+
+rep_state(RepId) ->
+ case ets:lookup(?REP_TO_STATE, RepId) of
+ [{RepId, RepState}] ->
+ RepState;
+ [] ->
+ nil
+ end.
+
+
+error_reason({error, Reason}) ->
+ Reason;
+error_reason(Reason) ->
+ Reason.
+
+
+retries_value("infinity") ->
+ infinity;
+retries_value(Value) ->
+ list_to_integer(Value).
+
+
+state_after_error(#rep_state{retries_left = Left, wait = Wait} = State) ->
+ Wait2 = erlang:min(trunc(Wait * 2), ?MAX_WAIT),
+ case Left of
+ infinity ->
+ State#rep_state{wait = Wait2};
+ _ ->
+ State#rep_state{retries_left = Left - 1, wait = Wait2}
+ end.