summaryrefslogtreecommitdiff
path: root/src/couchdb/couch_rep.erl
diff options
context:
space:
mode:
Diffstat (limited to 'src/couchdb/couch_rep.erl')
-rw-r--r--src/couchdb/couch_rep.erl204
1 files changed, 152 insertions, 52 deletions
diff --git a/src/couchdb/couch_rep.erl b/src/couchdb/couch_rep.erl
index d3db8a68..57b30088 100644
--- a/src/couchdb/couch_rep.erl
+++ b/src/couchdb/couch_rep.erl
@@ -16,8 +16,12 @@
code_change/3]).
-export([replicate/2, checkpoint/1]).
+-export([ensure_rep_db_exists/0, make_replication_id/2]).
+-export([start_replication/3, end_replication/1, get_result/4]).
+-export([update_rep_doc/2]).
-include("couch_db.hrl").
+-include("couch_js_functions.hrl").
-define(REP_ID_VERSION, 2).
@@ -48,7 +52,8 @@
committed_seq = 0,
stats = nil,
- doc_ids = nil
+ doc_ids = nil,
+ rep_doc = nil
}).
%% convenience function to do a simple replication from the shell
@@ -61,58 +66,63 @@ replicate(Source, Target) when is_binary(Source), is_binary(Target) ->
%% function handling POST to _replicate
replicate({Props}=PostBody, UserCtx) ->
- BaseId = make_replication_id({Props}, UserCtx, ?REP_ID_VERSION),
- Extension = maybe_append_options(
- [<<"continuous">>, <<"create_target">>], Props),
- Replicator = {BaseId ++ Extension,
- {gen_server, start_link, [?MODULE, [BaseId, PostBody, UserCtx], []]},
- temporary,
- 1,
- worker,
- [?MODULE]
- },
-
+ RepId = make_replication_id(PostBody, UserCtx),
case couch_util:get_value(<<"cancel">>, Props, false) of
true ->
- case supervisor:terminate_child(couch_rep_sup, BaseId ++ Extension) of
- {error, not_found} ->
- {error, not_found};
- ok ->
- ok = supervisor:delete_child(couch_rep_sup, BaseId ++ Extension),
- {ok, {cancelled, ?l2b(BaseId)}}
- end;
+ end_replication(RepId);
false ->
- Server = start_replication_server(Replicator),
+ Server = start_replication(PostBody, RepId, UserCtx),
+ get_result(Server, RepId, PostBody, UserCtx)
+ end.
- case couch_util:get_value(<<"continuous">>, Props, false) of
- true ->
- {ok, {continuous, ?l2b(BaseId)}};
- false ->
- get_result(Server, PostBody, UserCtx)
- end
+end_replication({BaseId, Extension}) ->
+ RepId = BaseId ++ Extension,
+ case supervisor:terminate_child(couch_rep_sup, RepId) of
+ {error, not_found} = R ->
+ R;
+ ok ->
+ ok = supervisor:delete_child(couch_rep_sup, RepId),
+ {ok, {cancelled, ?l2b(BaseId)}}
end.
+start_replication(RepDoc, {BaseId, Extension}, UserCtx) ->
+ Replicator = {
+ BaseId ++ Extension,
+ {gen_server, start_link,
+ [?MODULE, [BaseId, RepDoc, UserCtx], []]},
+ temporary,
+ 1,
+ worker,
+ [?MODULE]
+ },
+ start_replication_server(Replicator).
+
checkpoint(Server) ->
gen_server:cast(Server, do_checkpoint).
-get_result(Server, PostBody, UserCtx) ->
- try gen_server:call(Server, get_result, infinity) of
- retry -> replicate(PostBody, UserCtx);
- Else -> Else
- catch
- exit:{noproc, {gen_server, call, [Server, get_result , infinity]}} ->
- %% oops, this replication just finished -- restart it.
- replicate(PostBody, UserCtx);
- exit:{normal, {gen_server, call, [Server, get_result , infinity]}} ->
- %% we made the call during terminate
- replicate(PostBody, UserCtx)
+get_result(Server, {BaseId, _Extension}, {Props} = PostBody, UserCtx) ->
+ case couch_util:get_value(<<"continuous">>, Props, false) of
+ true ->
+ {ok, {continuous, ?l2b(BaseId)}};
+ false ->
+ try gen_server:call(Server, get_result, infinity) of
+ retry -> replicate(PostBody, UserCtx);
+ Else -> Else
+ catch
+ exit:{noproc, {gen_server, call, [Server, get_result, infinity]}} ->
+ %% oops, this replication just finished -- restart it.
+ replicate(PostBody, UserCtx);
+ exit:{normal, {gen_server, call, [Server, get_result, infinity]}} ->
+ %% we made the call during terminate
+ replicate(PostBody, UserCtx)
+ end
end.
init(InitArgs) ->
try do_init(InitArgs)
catch throw:{db_not_found, DbUrl} -> {stop, {db_not_found, DbUrl}} end.
-do_init([RepId, {PostProps}, UserCtx] = InitArgs) ->
+do_init([RepId, {PostProps} = RepDoc, UserCtx] = InitArgs) ->
process_flag(trap_exit, true),
SourceProps = couch_util:get_value(<<"source">>, PostProps),
@@ -130,6 +140,8 @@ do_init([RepId, {PostProps}, UserCtx] = InitArgs) ->
SourceInfo = dbinfo(Source),
TargetInfo = dbinfo(Target),
+ maybe_set_triggered(RepDoc, RepId),
+
case DocIds of
List when is_list(List) ->
% Fast replication using only a list of doc IDs to replicate.
@@ -199,7 +211,8 @@ do_init([RepId, {PostProps}, UserCtx] = InitArgs) ->
rep_starttime = httpd_util:rfc1123_date(),
src_starttime = couch_util:get_value(instance_start_time, SourceInfo),
tgt_starttime = couch_util:get_value(instance_start_time, TargetInfo),
- doc_ids = DocIds
+ doc_ids = DocIds,
+ rep_doc = RepDoc
},
{ok, State}.
@@ -256,29 +269,34 @@ handle_info({'EXIT', _Pid, Reason}, State) ->
{stop, Reason, State}.
terminate(normal, #state{checkpoint_scheduled=nil} = State) ->
- do_terminate(State);
+ do_terminate(State),
+ update_rep_doc(State#state.rep_doc, [{<<"state">>, <<"completed">>}]);
terminate(normal, State) ->
timer:cancel(State#state.checkpoint_scheduled),
- do_terminate(do_checkpoint(State));
+ do_terminate(do_checkpoint(State)),
+ update_rep_doc(State#state.rep_doc, [{<<"state">>, <<"completed">>}]);
-terminate(Reason, State) ->
- #state{
- listeners = Listeners,
- source = Source,
- target = Target,
- stats = Stats
- } = State,
+terminate(shutdown, #state{listeners = Listeners} = State) ->
+ % continuous replication stopped
+ [gen_server:reply(L, {ok, stopped}) || L <- Listeners],
+ do_forced_terminate(State);
+
+terminate(Reason, #state{listeners = Listeners} = State) ->
[gen_server:reply(L, {error, Reason}) || L <- Listeners],
- ets:delete(Stats),
- close_db(Target),
- close_db(Source).
+ do_forced_terminate(State),
+ update_rep_doc(State#state.rep_doc, [{<<"state">>, <<"error">>}]).
code_change(_OldVsn, State, _Extra) ->
{ok, State}.
% internal funs
+do_forced_terminate(#state{source = Source, target = Target, stats = Stats}) ->
+ ets:delete(Stats),
+ close_db(Target),
+ close_db(Source).
+
start_replication_server(Replicator) ->
RepId = element(1, Replicator),
case supervisor:start_child(couch_rep_sup, Replicator) of
@@ -449,7 +467,7 @@ has_session_id(SessionId, [{Props} | Rest]) ->
has_session_id(SessionId, Rest)
end.
-maybe_append_options(Options, Props) ->
+maybe_append_options(Options, {Props}) ->
lists:foldl(fun(Option, Acc) ->
Acc ++
case couch_util:get_value(Option, Props, false) of
@@ -460,6 +478,12 @@ maybe_append_options(Options, Props) ->
end
end, [], Options).
+make_replication_id(RepProps, UserCtx) ->
+ BaseId = make_replication_id(RepProps, UserCtx, ?REP_ID_VERSION),
+ Extension = maybe_append_options(
+ [<<"continuous">>, <<"create_target">>], RepProps),
+ {BaseId, Extension}.
+
% Versioned clauses for generating replication ids
% If a change is made to how replications are identified
% add a new clause and increase ?REP_ID_VERSION at the top
@@ -785,3 +809,79 @@ parse_proxy_params(ProxyUrl) ->
true ->
[{proxy_user, User}, {proxy_password, Passwd}]
end.
+
+update_rep_doc({Props} = _RepDoc, KVs) ->
+ case couch_util:get_value(<<"_id">>, Props) of
+ undefined ->
+ % replication triggered by POSTing to _replicate/
+ ok;
+ RepDocId ->
+ % replication triggered by adding a Rep Doc to the replicator DB
+ {ok, RepDb} = ensure_rep_db_exists(),
+ case couch_db:open_doc(RepDb, RepDocId, []) of
+ {ok, LatestRepDoc} ->
+ update_rep_doc(RepDb, LatestRepDoc, KVs);
+ _ ->
+ ok
+ end,
+ couch_db:close(RepDb)
+ end.
+
+update_rep_doc(RepDb, #doc{body = {RepDocBody}} = RepDoc, KVs) ->
+ NewRepDocBody = lists:foldl(
+ fun({K, _V} = KV, Body) ->
+ lists:keystore(K, 1, Body, KV)
+ end,
+ RepDocBody,
+ KVs
+ ),
+ % might not succeed - when the replication doc is deleted right
+ % before this update (not an error)
+ couch_db:update_doc(
+ RepDb,
+ RepDoc#doc{body = {NewRepDocBody}},
+ []
+ ).
+
+maybe_set_triggered({RepProps} = RepDoc, RepId) ->
+ case couch_util:get_value(<<"state">>, RepProps) of
+ <<"triggered">> ->
+ ok;
+ _ ->
+ update_rep_doc(
+ RepDoc,
+ [
+ {<<"state">>, <<"triggered">>},
+ {<<"replication_id">>, ?l2b(RepId)}
+ ]
+ )
+ end.
+
+ensure_rep_db_exists() ->
+ DbName = ?l2b(couch_config:get("replicator", "db", "_replicator")),
+ Opts = [
+ {user_ctx, #user_ctx{roles=[<<"_admin">>, <<"_replicator">>]}},
+ sys_db
+ ],
+ case couch_db:open(DbName, Opts) of
+ {ok, Db} ->
+ Db;
+ _Error ->
+ {ok, Db} = couch_db:create(DbName, Opts)
+ end,
+ ok = ensure_rep_ddoc_exists(Db, <<"_design/_replicator">>),
+ {ok, Db}.
+
+ensure_rep_ddoc_exists(RepDb, DDocID) ->
+ case couch_db:open_doc(RepDb, DDocID, []) of
+ {ok, _Doc} ->
+ ok;
+ _ ->
+ DDoc = couch_doc:from_json_obj({[
+ {<<"_id">>, DDocID},
+ {<<"language">>, <<"javascript">>},
+ {<<"validate_doc_update">>, ?REP_DB_DOC_VALIDATE_FUN}
+ ]}),
+ {ok, _Rev} = couch_db:update_doc(RepDb, DDoc, [])
+ end,
+ ok.