diff options
author | Filipe David Borba Manana <fdmanana@apache.org> | 2010-08-04 17:05:22 +0000 |
---|---|---|
committer | Filipe David Borba Manana <fdmanana@apache.org> | 2010-08-04 17:05:22 +0000 |
commit | 77962e9b1458e97aa8a534fe18f2eda1965cc8b1 (patch) | |
tree | 412882a3a596ca852f70219c920f67344925e091 /src/couchdb/couch_rep.erl | |
parent | 8446f0c3a69f7925d9104fd4487175c618b5a9dc (diff) |
Add replicator DB (_replicator).
Part of ticket COUCHDB-776.
git-svn-id: https://svn.apache.org/repos/asf/couchdb/trunk@982330 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'src/couchdb/couch_rep.erl')
-rw-r--r-- | src/couchdb/couch_rep.erl | 204 |
1 files changed, 152 insertions, 52 deletions
diff --git a/src/couchdb/couch_rep.erl b/src/couchdb/couch_rep.erl index d3db8a68..57b30088 100644 --- a/src/couchdb/couch_rep.erl +++ b/src/couchdb/couch_rep.erl @@ -16,8 +16,12 @@ code_change/3]). -export([replicate/2, checkpoint/1]). +-export([ensure_rep_db_exists/0, make_replication_id/2]). +-export([start_replication/3, end_replication/1, get_result/4]). +-export([update_rep_doc/2]). -include("couch_db.hrl"). +-include("couch_js_functions.hrl"). -define(REP_ID_VERSION, 2). @@ -48,7 +52,8 @@ committed_seq = 0, stats = nil, - doc_ids = nil + doc_ids = nil, + rep_doc = nil }). %% convenience function to do a simple replication from the shell @@ -61,58 +66,63 @@ replicate(Source, Target) when is_binary(Source), is_binary(Target) -> %% function handling POST to _replicate replicate({Props}=PostBody, UserCtx) -> - BaseId = make_replication_id({Props}, UserCtx, ?REP_ID_VERSION), - Extension = maybe_append_options( - [<<"continuous">>, <<"create_target">>], Props), - Replicator = {BaseId ++ Extension, - {gen_server, start_link, [?MODULE, [BaseId, PostBody, UserCtx], []]}, - temporary, - 1, - worker, - [?MODULE] - }, - + RepId = make_replication_id(PostBody, UserCtx), case couch_util:get_value(<<"cancel">>, Props, false) of true -> - case supervisor:terminate_child(couch_rep_sup, BaseId ++ Extension) of - {error, not_found} -> - {error, not_found}; - ok -> - ok = supervisor:delete_child(couch_rep_sup, BaseId ++ Extension), - {ok, {cancelled, ?l2b(BaseId)}} - end; + end_replication(RepId); false -> - Server = start_replication_server(Replicator), + Server = start_replication(PostBody, RepId, UserCtx), + get_result(Server, RepId, PostBody, UserCtx) + end. - case couch_util:get_value(<<"continuous">>, Props, false) of - true -> - {ok, {continuous, ?l2b(BaseId)}}; - false -> - get_result(Server, PostBody, UserCtx) - end +end_replication({BaseId, Extension}) -> + RepId = BaseId ++ Extension, + case supervisor:terminate_child(couch_rep_sup, RepId) of + {error, not_found} = R -> + R; + ok -> + ok = supervisor:delete_child(couch_rep_sup, RepId), + {ok, {cancelled, ?l2b(BaseId)}} end. +start_replication(RepDoc, {BaseId, Extension}, UserCtx) -> + Replicator = { + BaseId ++ Extension, + {gen_server, start_link, + [?MODULE, [BaseId, RepDoc, UserCtx], []]}, + temporary, + 1, + worker, + [?MODULE] + }, + start_replication_server(Replicator). + checkpoint(Server) -> gen_server:cast(Server, do_checkpoint). -get_result(Server, PostBody, UserCtx) -> - try gen_server:call(Server, get_result, infinity) of - retry -> replicate(PostBody, UserCtx); - Else -> Else - catch - exit:{noproc, {gen_server, call, [Server, get_result , infinity]}} -> - %% oops, this replication just finished -- restart it. - replicate(PostBody, UserCtx); - exit:{normal, {gen_server, call, [Server, get_result , infinity]}} -> - %% we made the call during terminate - replicate(PostBody, UserCtx) +get_result(Server, {BaseId, _Extension}, {Props} = PostBody, UserCtx) -> + case couch_util:get_value(<<"continuous">>, Props, false) of + true -> + {ok, {continuous, ?l2b(BaseId)}}; + false -> + try gen_server:call(Server, get_result, infinity) of + retry -> replicate(PostBody, UserCtx); + Else -> Else + catch + exit:{noproc, {gen_server, call, [Server, get_result, infinity]}} -> + %% oops, this replication just finished -- restart it. + replicate(PostBody, UserCtx); + exit:{normal, {gen_server, call, [Server, get_result, infinity]}} -> + %% we made the call during terminate + replicate(PostBody, UserCtx) + end end. init(InitArgs) -> try do_init(InitArgs) catch throw:{db_not_found, DbUrl} -> {stop, {db_not_found, DbUrl}} end. -do_init([RepId, {PostProps}, UserCtx] = InitArgs) -> +do_init([RepId, {PostProps} = RepDoc, UserCtx] = InitArgs) -> process_flag(trap_exit, true), SourceProps = couch_util:get_value(<<"source">>, PostProps), @@ -130,6 +140,8 @@ do_init([RepId, {PostProps}, UserCtx] = InitArgs) -> SourceInfo = dbinfo(Source), TargetInfo = dbinfo(Target), + maybe_set_triggered(RepDoc, RepId), + case DocIds of List when is_list(List) -> % Fast replication using only a list of doc IDs to replicate. @@ -199,7 +211,8 @@ do_init([RepId, {PostProps}, UserCtx] = InitArgs) -> rep_starttime = httpd_util:rfc1123_date(), src_starttime = couch_util:get_value(instance_start_time, SourceInfo), tgt_starttime = couch_util:get_value(instance_start_time, TargetInfo), - doc_ids = DocIds + doc_ids = DocIds, + rep_doc = RepDoc }, {ok, State}. @@ -256,29 +269,34 @@ handle_info({'EXIT', _Pid, Reason}, State) -> {stop, Reason, State}. terminate(normal, #state{checkpoint_scheduled=nil} = State) -> - do_terminate(State); + do_terminate(State), + update_rep_doc(State#state.rep_doc, [{<<"state">>, <<"completed">>}]); terminate(normal, State) -> timer:cancel(State#state.checkpoint_scheduled), - do_terminate(do_checkpoint(State)); + do_terminate(do_checkpoint(State)), + update_rep_doc(State#state.rep_doc, [{<<"state">>, <<"completed">>}]); -terminate(Reason, State) -> - #state{ - listeners = Listeners, - source = Source, - target = Target, - stats = Stats - } = State, +terminate(shutdown, #state{listeners = Listeners} = State) -> + % continuous replication stopped + [gen_server:reply(L, {ok, stopped}) || L <- Listeners], + do_forced_terminate(State); + +terminate(Reason, #state{listeners = Listeners} = State) -> [gen_server:reply(L, {error, Reason}) || L <- Listeners], - ets:delete(Stats), - close_db(Target), - close_db(Source). + do_forced_terminate(State), + update_rep_doc(State#state.rep_doc, [{<<"state">>, <<"error">>}]). code_change(_OldVsn, State, _Extra) -> {ok, State}. % internal funs +do_forced_terminate(#state{source = Source, target = Target, stats = Stats}) -> + ets:delete(Stats), + close_db(Target), + close_db(Source). + start_replication_server(Replicator) -> RepId = element(1, Replicator), case supervisor:start_child(couch_rep_sup, Replicator) of @@ -449,7 +467,7 @@ has_session_id(SessionId, [{Props} | Rest]) -> has_session_id(SessionId, Rest) end. -maybe_append_options(Options, Props) -> +maybe_append_options(Options, {Props}) -> lists:foldl(fun(Option, Acc) -> Acc ++ case couch_util:get_value(Option, Props, false) of @@ -460,6 +478,12 @@ maybe_append_options(Options, Props) -> end end, [], Options). +make_replication_id(RepProps, UserCtx) -> + BaseId = make_replication_id(RepProps, UserCtx, ?REP_ID_VERSION), + Extension = maybe_append_options( + [<<"continuous">>, <<"create_target">>], RepProps), + {BaseId, Extension}. + % Versioned clauses for generating replication ids % If a change is made to how replications are identified % add a new clause and increase ?REP_ID_VERSION at the top @@ -785,3 +809,79 @@ parse_proxy_params(ProxyUrl) -> true -> [{proxy_user, User}, {proxy_password, Passwd}] end. + +update_rep_doc({Props} = _RepDoc, KVs) -> + case couch_util:get_value(<<"_id">>, Props) of + undefined -> + % replication triggered by POSTing to _replicate/ + ok; + RepDocId -> + % replication triggered by adding a Rep Doc to the replicator DB + {ok, RepDb} = ensure_rep_db_exists(), + case couch_db:open_doc(RepDb, RepDocId, []) of + {ok, LatestRepDoc} -> + update_rep_doc(RepDb, LatestRepDoc, KVs); + _ -> + ok + end, + couch_db:close(RepDb) + end. + +update_rep_doc(RepDb, #doc{body = {RepDocBody}} = RepDoc, KVs) -> + NewRepDocBody = lists:foldl( + fun({K, _V} = KV, Body) -> + lists:keystore(K, 1, Body, KV) + end, + RepDocBody, + KVs + ), + % might not succeed - when the replication doc is deleted right + % before this update (not an error) + couch_db:update_doc( + RepDb, + RepDoc#doc{body = {NewRepDocBody}}, + [] + ). + +maybe_set_triggered({RepProps} = RepDoc, RepId) -> + case couch_util:get_value(<<"state">>, RepProps) of + <<"triggered">> -> + ok; + _ -> + update_rep_doc( + RepDoc, + [ + {<<"state">>, <<"triggered">>}, + {<<"replication_id">>, ?l2b(RepId)} + ] + ) + end. + +ensure_rep_db_exists() -> + DbName = ?l2b(couch_config:get("replicator", "db", "_replicator")), + Opts = [ + {user_ctx, #user_ctx{roles=[<<"_admin">>, <<"_replicator">>]}}, + sys_db + ], + case couch_db:open(DbName, Opts) of + {ok, Db} -> + Db; + _Error -> + {ok, Db} = couch_db:create(DbName, Opts) + end, + ok = ensure_rep_ddoc_exists(Db, <<"_design/_replicator">>), + {ok, Db}. + +ensure_rep_ddoc_exists(RepDb, DDocID) -> + case couch_db:open_doc(RepDb, DDocID, []) of + {ok, _Doc} -> + ok; + _ -> + DDoc = couch_doc:from_json_obj({[ + {<<"_id">>, DDocID}, + {<<"language">>, <<"javascript">>}, + {<<"validate_doc_update">>, ?REP_DB_DOC_VALIDATE_FUN} + ]}), + {ok, _Rev} = couch_db:update_doc(RepDb, DDoc, []) + end, + ok. |