summaryrefslogtreecommitdiff
path: root/src/couchdb/couch_rep.erl
diff options
context:
space:
mode:
Diffstat (limited to 'src/couchdb/couch_rep.erl')
-rw-r--r--src/couchdb/couch_rep.erl144
1 files changed, 15 insertions, 129 deletions
diff --git a/src/couchdb/couch_rep.erl b/src/couchdb/couch_rep.erl
index 4fabdd99..a9c156e9 100644
--- a/src/couchdb/couch_rep.erl
+++ b/src/couchdb/couch_rep.erl
@@ -16,12 +16,10 @@
code_change/3]).
-export([replicate/2, checkpoint/1]).
--export([ensure_rep_db_exists/0, make_replication_id/2]).
+-export([make_replication_id/2]).
-export([start_replication/3, end_replication/1, get_result/4]).
--export([update_rep_doc/2]).
-include("couch_db.hrl").
--include("couch_js_functions.hrl").
-include("../ibrowse/ibrowse.hrl").
-define(REP_ID_VERSION, 2).
@@ -54,7 +52,6 @@
committed_seq = 0,
stats = nil,
- rep_doc = nil,
source_db_update_notifier = nil,
target_db_update_notifier = nil
}).
@@ -94,11 +91,11 @@ end_replication({BaseId, Extension}) ->
end
end.
-start_replication(RepDoc, {BaseId, Extension}, UserCtx) ->
+start_replication(RepDoc, {BaseId, Extension} = RepId, UserCtx) ->
Replicator = {
BaseId ++ Extension,
{gen_server, start_link,
- [?MODULE, [BaseId, RepDoc, UserCtx], []]},
+ [?MODULE, [RepId, RepDoc, UserCtx], []]},
temporary,
1,
worker,
@@ -135,7 +132,7 @@ init(InitArgs) ->
{stop, Error}
end.
-do_init([RepId, {PostProps} = RepDoc, UserCtx] = InitArgs) ->
+do_init([{BaseId, _Ext} = RepId, {PostProps}, UserCtx] = InitArgs) ->
process_flag(trap_exit, true),
SourceProps = couch_util:get_value(<<"source">>, PostProps),
@@ -152,10 +149,8 @@ do_init([RepId, {PostProps} = RepDoc, UserCtx] = InitArgs) ->
SourceInfo = dbinfo(Source),
TargetInfo = dbinfo(Target),
- maybe_set_triggered(RepDoc, RepId),
-
[SourceLog, TargetLog] = find_replication_logs(
- [Source, Target], RepId, {PostProps}, UserCtx),
+ [Source, Target], BaseId, {PostProps}, UserCtx),
{StartSeq, History} = compare_replication_logs(SourceLog, TargetLog),
{ok, ChangesFeed} =
@@ -174,10 +169,12 @@ do_init([RepId, {PostProps} = RepDoc, UserCtx] = InitArgs) ->
ets:insert(Stats, {docs_written, 0}),
ets:insert(Stats, {doc_write_failures, 0}),
- {ShortId, _} = lists:split(6, RepId),
+ {ShortId, _} = lists:split(6, BaseId),
couch_task_status:add_task("Replication", io_lib:format("~s: ~s -> ~s",
[ShortId, dbname(Source), dbname(Target)]), "Starting"),
+ couch_replication_manager:replication_started(RepId),
+
State = #state{
changes_feed = ChangesFeed,
missing_revs = MissingRevs,
@@ -200,7 +197,6 @@ do_init([RepId, {PostProps} = RepDoc, UserCtx] = InitArgs) ->
rep_starttime = httpd_util:rfc1123_date(),
src_starttime = couch_util:get_value(instance_start_time, SourceInfo),
tgt_starttime = couch_util:get_value(instance_start_time, TargetInfo),
- rep_doc = RepDoc,
source_db_update_notifier = source_db_update_notifier(Source),
target_db_update_notifier = target_db_update_notifier(Target)
},
@@ -272,27 +268,24 @@ handle_info({'EXIT', _Pid, {Err, Reason}}, State) when Err == source_error;
handle_info({'EXIT', _Pid, Reason}, State) ->
{stop, Reason, State}.
-terminate(normal, #state{checkpoint_scheduled=nil} = State) ->
+terminate(normal, #state{checkpoint_scheduled=nil, init_args=[RepId | _]} = State) ->
do_terminate(State),
- update_rep_doc(
- State#state.rep_doc, [{<<"_replication_state">>, <<"completed">>}]);
+ couch_replication_manager:replication_completed(RepId);
-terminate(normal, State) ->
+terminate(normal, #state{init_args=[RepId | _]} = State) ->
timer:cancel(State#state.checkpoint_scheduled),
do_terminate(do_checkpoint(State)),
- update_rep_doc(
- State#state.rep_doc, [{<<"_replication_state">>, <<"completed">>}]);
+ couch_replication_manager:replication_completed(RepId);
terminate(shutdown, #state{listeners = Listeners} = State) ->
% continuous replication stopped
[gen_server:reply(L, {ok, stopped}) || L <- Listeners],
terminate_cleanup(State);
-terminate(Reason, #state{listeners = Listeners} = State) ->
+terminate(Reason, #state{listeners = Listeners, init_args=[RepId | _]} = State) ->
[gen_server:reply(L, {error, Reason}) || L <- Listeners],
terminate_cleanup(State),
- update_rep_doc(
- State#state.rep_doc, [{<<"_replication_state">>, <<"error">>}]).
+ couch_replication_manager:replication_error(RepId, Reason).
code_change(_OldVsn, State, _Extra) ->
{ok, State}.
@@ -698,7 +691,7 @@ do_checkpoint(State) ->
src_starttime = SrcInstanceStartTime,
tgt_starttime = TgtInstanceStartTime,
stats = Stats,
- rep_doc = {RepDoc}
+ init_args = [_RepId, {RepDoc} | _]
} = State,
case commit_to_both(Source, Target, NewSeqNum) of
{SrcInstanceStartTime, TgtInstanceStartTime} ->
@@ -901,113 +894,6 @@ parse_proxy_params(ProxyUrl) ->
[{proxy_user, User}, {proxy_password, Passwd}]
end.
-update_rep_doc({Props} = _RepDoc, KVs) ->
- case couch_util:get_value(<<"_id">>, Props) of
- undefined ->
- % replication triggered by POSTing to _replicate/
- ok;
- RepDocId ->
- % replication triggered by adding a Rep Doc to the replicator DB
- {ok, RepDb} = ensure_rep_db_exists(),
- case couch_db:open_doc(RepDb, RepDocId, []) of
- {ok, LatestRepDoc} ->
- update_rep_doc(RepDb, LatestRepDoc, KVs);
- _ ->
- ok
- end,
- couch_db:close(RepDb)
- end.
-
-update_rep_doc(RepDb, #doc{body = {RepDocBody}} = RepDoc, KVs) ->
- NewRepDocBody = lists:foldl(
- fun({<<"_replication_state">> = K, State} = KV, Body) ->
- case couch_util:get_value(K, Body) of
- State ->
- Body;
- _ ->
- Body1 = lists:keystore(K, 1, Body, KV),
- lists:keystore(
- <<"_replication_state_time">>, 1,
- Body1, {<<"_replication_state_time">>, timestamp()})
- end;
- ({K, _V} = KV, Body) ->
- lists:keystore(K, 1, Body, KV)
- end,
- RepDocBody,
- KVs
- ),
- case NewRepDocBody of
- RepDocBody ->
- ok;
- _ ->
- % might not succeed - when the replication doc is deleted right
- % before this update (not an error)
- couch_db:update_doc(RepDb, RepDoc#doc{body = {NewRepDocBody}}, [])
- end.
-
-% RFC3339 timestamps.
-% Note: doesn't include the time seconds fraction (RFC3339 says it's optional).
-timestamp() ->
- {{Year, Month, Day}, {Hour, Min, Sec}} = calendar:now_to_local_time(now()),
- UTime = erlang:universaltime(),
- LocalTime = calendar:universal_time_to_local_time(UTime),
- DiffSecs = calendar:datetime_to_gregorian_seconds(LocalTime) -
- calendar:datetime_to_gregorian_seconds(UTime),
- zone(DiffSecs div 3600, (DiffSecs rem 3600) div 60),
- iolist_to_binary(
- io_lib:format("~4..0w-~2..0w-~2..0wT~2..0w:~2..0w:~2..0w~s",
- [Year, Month, Day, Hour, Min, Sec,
- zone(DiffSecs div 3600, (DiffSecs rem 3600) div 60)])).
-
-zone(Hr, Min) when Hr >= 0, Min >= 0 ->
- io_lib:format("+~2..0w:~2..0w", [Hr, Min]);
-zone(Hr, Min) ->
- io_lib:format("-~2..0w:~2..0w", [abs(Hr), abs(Min)]).
-
-
-maybe_set_triggered({RepProps} = RepDoc, RepId) ->
- case couch_util:get_value(<<"_replication_state">>, RepProps) of
- <<"triggered">> ->
- ok;
- _ ->
- update_rep_doc(
- RepDoc,
- [
- {<<"_replication_state">>, <<"triggered">>},
- {<<"_replication_id">>, ?l2b(RepId)}
- ]
- )
- end.
-
-ensure_rep_db_exists() ->
- DbName = ?l2b(couch_config:get("replicator", "db", "_replicator")),
- Opts = [
- {user_ctx, #user_ctx{roles=[<<"_admin">>, <<"_replicator">>]}},
- sys_db
- ],
- case couch_db:open(DbName, Opts) of
- {ok, Db} ->
- Db;
- _Error ->
- {ok, Db} = couch_db:create(DbName, Opts)
- end,
- ok = ensure_rep_ddoc_exists(Db, <<"_design/_replicator">>),
- {ok, Db}.
-
-ensure_rep_ddoc_exists(RepDb, DDocID) ->
- case couch_db:open_doc(RepDb, DDocID, []) of
- {ok, _Doc} ->
- ok;
- _ ->
- DDoc = couch_doc:from_json_obj({[
- {<<"_id">>, DDocID},
- {<<"language">>, <<"javascript">>},
- {<<"validate_doc_update">>, ?REP_DB_DOC_VALIDATE_FUN}
- ]}),
- {ok, _Rev} = couch_db:update_doc(RepDb, DDoc, [])
- end,
- ok.
-
source_db_update_notifier(#db{name = DbName}) ->
Server = self(),
{ok, Notifier} = couch_db_update_notifier:start_link(