From 77962e9b1458e97aa8a534fe18f2eda1965cc8b1 Mon Sep 17 00:00:00 2001 From: Filipe David Borba Manana Date: Wed, 4 Aug 2010 17:05:22 +0000 Subject: Add replicator DB (_replicator). Part of ticket COUCHDB-776. git-svn-id: https://svn.apache.org/repos/asf/couchdb/trunk@982330 13f79535-47bb-0310-9956-ffa450edef68 --- src/couchdb/Makefile.am | 2 + src/couchdb/couch_changes.erl | 6 +- src/couchdb/couch_db.hrl | 3 +- src/couchdb/couch_js_functions.hrl | 73 +++++++++++ src/couchdb/couch_rep.erl | 204 ++++++++++++++++++++++-------- src/couchdb/couch_rep_db_listener.erl | 232 ++++++++++++++++++++++++++++++++++ src/couchdb/couch_server.erl | 1 + 7 files changed, 466 insertions(+), 55 deletions(-) create mode 100644 src/couchdb/couch_rep_db_listener.erl (limited to 'src/couchdb') diff --git a/src/couchdb/Makefile.am b/src/couchdb/Makefile.am index 308a3837..219f7d82 100644 --- a/src/couchdb/Makefile.am +++ b/src/couchdb/Makefile.am @@ -66,6 +66,7 @@ source_files = \ couch_rep_reader.erl \ couch_rep_sup.erl \ couch_rep_writer.erl \ + couch_rep_db_listener.erl \ couch_server.erl \ couch_server_sup.erl \ couch_stats_aggregator.erl \ @@ -124,6 +125,7 @@ compiled_files = \ couch_rep_reader.beam \ couch_rep_sup.beam \ couch_rep_writer.beam \ + couch_rep_db_listener.beam \ couch_server.beam \ couch_server_sup.beam \ couch_stats_aggregator.beam \ diff --git a/src/couchdb/couch_changes.erl b/src/couchdb/couch_changes.erl index 580148c8..098fac84 100644 --- a/src/couchdb/couch_changes.erl +++ b/src/couchdb/couch_changes.erl @@ -165,7 +165,8 @@ keep_sending_changes(Args, Callback, Db, StartSeq, Prepend, Timeout, TimeoutFun) -> #changes_args{ feed = ResponseType, - limit = Limit + limit = Limit, + db_open_options = DbOptions } = Args, % ?LOG_INFO("send_changes start ~p",[StartSeq]), {ok, {_, EndSeq, Prepend2, _, _, _, NewLimit, _}} = send_changes( @@ -179,7 +180,8 @@ keep_sending_changes(Args, Callback, Db, StartSeq, Prepend, Timeout, case wait_db_updated(Timeout, TimeoutFun) of updated 
-> % ?LOG_INFO("wait_db_updated updated ~p",[{Db#db.name, EndSeq}]), - case couch_db:open(Db#db.name, [{user_ctx, Db#db.user_ctx}]) of + DbOptions1 = [{user_ctx, Db#db.user_ctx} | DbOptions], + case couch_db:open(Db#db.name, DbOptions1) of {ok, Db2} -> keep_sending_changes( Args#changes_args{limit=NewLimit}, diff --git a/src/couchdb/couch_db.hrl b/src/couchdb/couch_db.hrl index a35745ef..51fb25e2 100644 --- a/src/couchdb/couch_db.hrl +++ b/src/couchdb/couch_db.hrl @@ -289,6 +289,7 @@ heartbeat, timeout, filter = "", - include_docs = false + include_docs = false, + db_open_options = [] }). diff --git a/src/couchdb/couch_js_functions.hrl b/src/couchdb/couch_js_functions.hrl index 1f314f6e..f850dd4c 100644 --- a/src/couchdb/couch_js_functions.hrl +++ b/src/couchdb/couch_js_functions.hrl @@ -95,3 +95,76 @@ } } ">>). + + +-define(REP_DB_DOC_VALIDATE_FUN, <<" + function(newDoc, oldDoc, userCtx) { + var isAdmin = (userCtx.roles.indexOf('_admin') >= 0); + var isReplicator = (userCtx.roles.indexOf('_replicator') >= 0); + + if (oldDoc && !newDoc._deleted && !isReplicator) { + throw({forbidden: + 'Only the replicator can edit replication documents. ' + + 'Admins can only add and delete replication documents.' + }); + } else if (!isAdmin) { + throw({forbidden: + 'Only admins may add/delete replication documents.' + }); + } + + if (!oldDoc && newDoc.state) { + throw({forbidden: + 'The state field can only be set by the replicator.' + }); + } + + if (!oldDoc && newDoc.replication_id) { + throw({forbidden: + 'The replication_id field can only be set by the replicator.' 
+ }); + } + + if (newDoc.user_ctx) { + var user_ctx = newDoc.user_ctx; + + if (typeof user_ctx !== 'object') { + throw({forbidden: 'The user_ctx property must be an object.'}); + } + + if (!(user_ctx.name === null || + (typeof user_ctx.name === 'undefined') || + ((typeof user_ctx.name === 'string') && + user_ctx.name.length > 0))) { + throw({forbidden: + 'The name property of the user_ctx must be a ' + + 'non-empty string.' + }); + } + + if ((typeof user_ctx.roles !== 'undefined') && + (typeof user_ctx.roles.length !== 'number')) { + throw({forbidden: + 'The roles property of the user_ctx must be ' + + 'an array of strings.' + }); + } + + if (user_ctx.roles) { + for (var i = 0; i < user_ctx.roles.length; i++) { + var role = user_ctx.roles[i]; + + if (typeof role !== 'string' || role.length === 0) { + throw({forbidden: 'Roles must be non-empty strings.'}); + } + if (role[0] === '_') { + throw({forbidden: + 'System roles (starting with underscore) ' + + 'are not allowed.' + }); + } + } + } + } + } +">>). diff --git a/src/couchdb/couch_rep.erl b/src/couchdb/couch_rep.erl index d3db8a68..57b30088 100644 --- a/src/couchdb/couch_rep.erl +++ b/src/couchdb/couch_rep.erl @@ -16,8 +16,12 @@ code_change/3]). -export([replicate/2, checkpoint/1]). +-export([ensure_rep_db_exists/0, make_replication_id/2]). +-export([start_replication/3, end_replication/1, get_result/4]). +-export([update_rep_doc/2]). -include("couch_db.hrl"). +-include("couch_js_functions.hrl"). -define(REP_ID_VERSION, 2). @@ -48,7 +52,8 @@ committed_seq = 0, stats = nil, - doc_ids = nil + doc_ids = nil, + rep_doc = nil }). 
%% convenience function to do a simple replication from the shell @@ -61,58 +66,63 @@ replicate(Source, Target) when is_binary(Source), is_binary(Target) -> %% function handling POST to _replicate replicate({Props}=PostBody, UserCtx) -> - BaseId = make_replication_id({Props}, UserCtx, ?REP_ID_VERSION), - Extension = maybe_append_options( - [<<"continuous">>, <<"create_target">>], Props), - Replicator = {BaseId ++ Extension, - {gen_server, start_link, [?MODULE, [BaseId, PostBody, UserCtx], []]}, - temporary, - 1, - worker, - [?MODULE] - }, - + RepId = make_replication_id(PostBody, UserCtx), case couch_util:get_value(<<"cancel">>, Props, false) of true -> - case supervisor:terminate_child(couch_rep_sup, BaseId ++ Extension) of - {error, not_found} -> - {error, not_found}; - ok -> - ok = supervisor:delete_child(couch_rep_sup, BaseId ++ Extension), - {ok, {cancelled, ?l2b(BaseId)}} - end; + end_replication(RepId); false -> - Server = start_replication_server(Replicator), + Server = start_replication(PostBody, RepId, UserCtx), + get_result(Server, RepId, PostBody, UserCtx) + end. - case couch_util:get_value(<<"continuous">>, Props, false) of - true -> - {ok, {continuous, ?l2b(BaseId)}}; - false -> - get_result(Server, PostBody, UserCtx) - end +end_replication({BaseId, Extension}) -> + RepId = BaseId ++ Extension, + case supervisor:terminate_child(couch_rep_sup, RepId) of + {error, not_found} = R -> + R; + ok -> + ok = supervisor:delete_child(couch_rep_sup, RepId), + {ok, {cancelled, ?l2b(BaseId)}} end. +start_replication(RepDoc, {BaseId, Extension}, UserCtx) -> + Replicator = { + BaseId ++ Extension, + {gen_server, start_link, + [?MODULE, [BaseId, RepDoc, UserCtx], []]}, + temporary, + 1, + worker, + [?MODULE] + }, + start_replication_server(Replicator). + checkpoint(Server) -> gen_server:cast(Server, do_checkpoint). 
-get_result(Server, PostBody, UserCtx) -> - try gen_server:call(Server, get_result, infinity) of - retry -> replicate(PostBody, UserCtx); - Else -> Else - catch - exit:{noproc, {gen_server, call, [Server, get_result , infinity]}} -> - %% oops, this replication just finished -- restart it. - replicate(PostBody, UserCtx); - exit:{normal, {gen_server, call, [Server, get_result , infinity]}} -> - %% we made the call during terminate - replicate(PostBody, UserCtx) +get_result(Server, {BaseId, _Extension}, {Props} = PostBody, UserCtx) -> + case couch_util:get_value(<<"continuous">>, Props, false) of + true -> + {ok, {continuous, ?l2b(BaseId)}}; + false -> + try gen_server:call(Server, get_result, infinity) of + retry -> replicate(PostBody, UserCtx); + Else -> Else + catch + exit:{noproc, {gen_server, call, [Server, get_result, infinity]}} -> + %% oops, this replication just finished -- restart it. + replicate(PostBody, UserCtx); + exit:{normal, {gen_server, call, [Server, get_result, infinity]}} -> + %% we made the call during terminate + replicate(PostBody, UserCtx) + end end. init(InitArgs) -> try do_init(InitArgs) catch throw:{db_not_found, DbUrl} -> {stop, {db_not_found, DbUrl}} end. -do_init([RepId, {PostProps}, UserCtx] = InitArgs) -> +do_init([RepId, {PostProps} = RepDoc, UserCtx] = InitArgs) -> process_flag(trap_exit, true), SourceProps = couch_util:get_value(<<"source">>, PostProps), @@ -130,6 +140,8 @@ do_init([RepId, {PostProps}, UserCtx] = InitArgs) -> SourceInfo = dbinfo(Source), TargetInfo = dbinfo(Target), + maybe_set_triggered(RepDoc, RepId), + case DocIds of List when is_list(List) -> % Fast replication using only a list of doc IDs to replicate. 
@@ -199,7 +211,8 @@ do_init([RepId, {PostProps}, UserCtx] = InitArgs) -> rep_starttime = httpd_util:rfc1123_date(), src_starttime = couch_util:get_value(instance_start_time, SourceInfo), tgt_starttime = couch_util:get_value(instance_start_time, TargetInfo), - doc_ids = DocIds + doc_ids = DocIds, + rep_doc = RepDoc }, {ok, State}. @@ -256,29 +269,34 @@ handle_info({'EXIT', _Pid, Reason}, State) -> {stop, Reason, State}. terminate(normal, #state{checkpoint_scheduled=nil} = State) -> - do_terminate(State); + do_terminate(State), + update_rep_doc(State#state.rep_doc, [{<<"state">>, <<"completed">>}]); terminate(normal, State) -> timer:cancel(State#state.checkpoint_scheduled), - do_terminate(do_checkpoint(State)); + do_terminate(do_checkpoint(State)), + update_rep_doc(State#state.rep_doc, [{<<"state">>, <<"completed">>}]); -terminate(Reason, State) -> - #state{ - listeners = Listeners, - source = Source, - target = Target, - stats = Stats - } = State, +terminate(shutdown, #state{listeners = Listeners} = State) -> + % continuous replication stopped + [gen_server:reply(L, {ok, stopped}) || L <- Listeners], + do_forced_terminate(State); + +terminate(Reason, #state{listeners = Listeners} = State) -> [gen_server:reply(L, {error, Reason}) || L <- Listeners], - ets:delete(Stats), - close_db(Target), - close_db(Source). + do_forced_terminate(State), + update_rep_doc(State#state.rep_doc, [{<<"state">>, <<"error">>}]). code_change(_OldVsn, State, _Extra) -> {ok, State}. % internal funs +do_forced_terminate(#state{source = Source, target = Target, stats = Stats}) -> + ets:delete(Stats), + close_db(Target), + close_db(Source). + start_replication_server(Replicator) -> RepId = element(1, Replicator), case supervisor:start_child(couch_rep_sup, Replicator) of @@ -449,7 +467,7 @@ has_session_id(SessionId, [{Props} | Rest]) -> has_session_id(SessionId, Rest) end. 
-maybe_append_options(Options, Props) -> +maybe_append_options(Options, {Props}) -> lists:foldl(fun(Option, Acc) -> Acc ++ case couch_util:get_value(Option, Props, false) of @@ -460,6 +478,12 @@ maybe_append_options(Options, Props) -> end end, [], Options). +make_replication_id(RepProps, UserCtx) -> + BaseId = make_replication_id(RepProps, UserCtx, ?REP_ID_VERSION), + Extension = maybe_append_options( + [<<"continuous">>, <<"create_target">>], RepProps), + {BaseId, Extension}. + % Versioned clauses for generating replication ids % If a change is made to how replications are identified % add a new clause and increase ?REP_ID_VERSION at the top @@ -785,3 +809,79 @@ parse_proxy_params(ProxyUrl) -> true -> [{proxy_user, User}, {proxy_password, Passwd}] end. + +update_rep_doc({Props} = _RepDoc, KVs) -> + case couch_util:get_value(<<"_id">>, Props) of + undefined -> + % replication triggered by POSTing to _replicate/ + ok; + RepDocId -> + % replication triggered by adding a Rep Doc to the replicator DB + {ok, RepDb} = ensure_rep_db_exists(), + case couch_db:open_doc(RepDb, RepDocId, []) of + {ok, LatestRepDoc} -> + update_rep_doc(RepDb, LatestRepDoc, KVs); + _ -> + ok + end, + couch_db:close(RepDb) + end. + +update_rep_doc(RepDb, #doc{body = {RepDocBody}} = RepDoc, KVs) -> + NewRepDocBody = lists:foldl( + fun({K, _V} = KV, Body) -> + lists:keystore(K, 1, Body, KV) + end, + RepDocBody, + KVs + ), + % might not succeed - when the replication doc is deleted right + % before this update (not an error) + couch_db:update_doc( + RepDb, + RepDoc#doc{body = {NewRepDocBody}}, + [] + ). + +maybe_set_triggered({RepProps} = RepDoc, RepId) -> + case couch_util:get_value(<<"state">>, RepProps) of + <<"triggered">> -> + ok; + _ -> + update_rep_doc( + RepDoc, + [ + {<<"state">>, <<"triggered">>}, + {<<"replication_id">>, ?l2b(RepId)} + ] + ) + end. 
+ +ensure_rep_db_exists() -> + DbName = ?l2b(couch_config:get("replicator", "db", "_replicator")), + Opts = [ + {user_ctx, #user_ctx{roles=[<<"_admin">>, <<"_replicator">>]}}, + sys_db + ], + case couch_db:open(DbName, Opts) of + {ok, Db} -> + Db; + _Error -> + {ok, Db} = couch_db:create(DbName, Opts) + end, + ok = ensure_rep_ddoc_exists(Db, <<"_design/_replicator">>), + {ok, Db}. + +ensure_rep_ddoc_exists(RepDb, DDocID) -> + case couch_db:open_doc(RepDb, DDocID, []) of + {ok, _Doc} -> + ok; + _ -> + DDoc = couch_doc:from_json_obj({[ + {<<"_id">>, DDocID}, + {<<"language">>, <<"javascript">>}, + {<<"validate_doc_update">>, ?REP_DB_DOC_VALIDATE_FUN} + ]}), + {ok, _Rev} = couch_db:update_doc(RepDb, DDoc, []) + end, + ok. diff --git a/src/couchdb/couch_rep_db_listener.erl b/src/couchdb/couch_rep_db_listener.erl new file mode 100644 index 00000000..bc407693 --- /dev/null +++ b/src/couchdb/couch_rep_db_listener.erl @@ -0,0 +1,232 @@ +% Licensed under the Apache License, Version 2.0 (the "License"); you may not +% use this file except in compliance with the License. You may obtain a copy of +% the License at +% +% http://www.apache.org/licenses/LICENSE-2.0 +% +% Unless required by applicable law or agreed to in writing, software +% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +% License for the specific language governing permissions and limitations under +% the License. + +-module(couch_rep_db_listener). +-behaviour(gen_server). + +-export([start_link/0, init/1, handle_call/3, handle_info/2, handle_cast/2]). +-export([code_change/3, terminate/2]). + +-include("couch_db.hrl"). + +-define(DOC_TO_REP_ID_MAP, rep_doc_id_to_rep_id). +-define(REP_ID_TO_DOC_ID_MAP, rep_id_to_rep_doc_id). + +-record(state, { + changes_feed_loop, + changes_queue, + changes_processor +}). + + +start_link() -> + gen_server:start_link({local, ?MODULE}, ?MODULE, [], []). 
+ +init(_) -> + process_flag(trap_exit, true), + {ok, Queue} = couch_work_queue:new(1024 * 1024, 1000), + {ok, Processor} = changes_processor(Queue), + {ok, Loop} = changes_feed_loop(Queue), + Server = self(), + ok = couch_config:register( + fun("replicator", "db") -> + ok = gen_server:call(Server, rep_db_changed, infinity) + end + ), + {ok, #state{ + changes_feed_loop = Loop, + changes_queue = Queue, + changes_processor = Processor} + }. + +handle_call(rep_db_changed, _From, State) -> + #state{ + changes_feed_loop = Loop, + changes_queue = Queue + } = State, + exit(Loop, rep_db_changed), + {ok, NewLoop} = changes_feed_loop(Queue), + {reply, ok, State#state{changes_feed_loop = NewLoop}}. + + +handle_cast(_Msg, State) -> + {noreply, State}. + + +handle_info({'EXIT', _OldChangesLoop, rep_db_changed}, State) -> + {noreply, State}; + +handle_info({'EXIT', From, Reason}, #state{changes_processor = From} = State) -> + ?LOG_ERROR("Replicator DB changes processor died. Reason: ~p", [Reason]), + {stop, rep_db_changes_processor_error, State}. + + +terminate(_Reason, State) -> + #state{ + changes_feed_loop = Loop, + changes_queue = Queue + } = State, + exit(Loop, stop), + % closing the queue will cause changes_processor to shutdown + couch_work_queue:close(Queue), + ok. + + +code_change(_OldVsn, State, _Extra) -> + {ok, State}. + + +changes_feed_loop(ChangesQueue) -> + {ok, RepDb} = couch_rep:ensure_rep_db_exists(), + Pid = spawn_link( + fun() -> + ChangesFeedFun = couch_changes:handle_changes( + #changes_args{ + include_docs = true, + feed = "continuous", + timeout = infinity, + db_open_options = [sys_db] + }, + {json_req, null}, + RepDb + ), + ChangesFeedFun( + fun({change, Change, _}, _) -> + case has_valid_rep_id(Change) of + true -> + couch_work_queue:queue(ChangesQueue, Change); + false -> + ok + end; + (_, _) -> + ok + end + ) + end + ), + couch_db:close(RepDb), + {ok, Pid}. 
+ + +changes_processor(ChangesQueue) -> + Pid = spawn_link( + fun() -> + ets:new(?DOC_TO_REP_ID_MAP, [named_table, set, private]), + ets:new(?REP_ID_TO_DOC_ID_MAP, [named_table, set, private]), + consume_changes(ChangesQueue), + true = ets:delete(?REP_ID_TO_DOC_ID_MAP), + true = ets:delete(?DOC_TO_REP_ID_MAP) + end + ), + {ok, Pid}. + + +consume_changes(ChangesQueue) -> + case couch_work_queue:dequeue(ChangesQueue) of + closed -> + ok; + {ok, Changes} -> + lists:foreach(fun process_change/1, Changes), + consume_changes(ChangesQueue) + end. + + +has_valid_rep_id({Change}) -> + has_valid_rep_id(couch_util:get_value(<<"id">>, Change)); +has_valid_rep_id(<<?DESIGN_DOC_PREFIX, _Rest/binary>>) -> + false; +has_valid_rep_id(_Else) -> + true. + + +process_change({Change}) -> + {RepProps} = JsonRepDoc = couch_util:get_value(doc, Change), + case couch_util:get_value(<<"deleted">>, Change, false) of + true -> + maybe_stop_replication(JsonRepDoc); + false -> + case couch_util:get_value(<<"state">>, RepProps) of + <<"completed">> -> + maybe_stop_replication(JsonRepDoc); + <<"error">> -> + % cleanup ets table entries + maybe_stop_replication(JsonRepDoc); + <<"triggered">> -> + maybe_start_replication(JsonRepDoc); + undefined -> + case couch_util:get_value(<<"replication_id">>, RepProps) of + undefined -> + maybe_start_replication(JsonRepDoc); + _ -> + ok + end + end + end, + ok. + + +rep_user_ctx({RepDoc}) -> + case couch_util:get_value(<<"user_ctx">>, RepDoc) of + undefined -> + #user_ctx{roles = [<<"_admin">>]}; + {UserCtx} -> + #user_ctx{ + name = couch_util:get_value(<<"name">>, UserCtx, null), + roles = couch_util:get_value(<<"roles">>, UserCtx, []) + } + end. 
+ + +maybe_start_replication({RepProps} = JsonRepDoc) -> + UserCtx = rep_user_ctx(JsonRepDoc), + RepId = couch_rep:make_replication_id(JsonRepDoc, UserCtx), + DocId = couch_util:get_value(<<"_id">>, RepProps), + case ets:lookup(?REP_ID_TO_DOC_ID_MAP, RepId) of + [] -> + true = ets:insert(?REP_ID_TO_DOC_ID_MAP, {RepId, DocId}), + true = ets:insert(?DOC_TO_REP_ID_MAP, {DocId, RepId}), + spawn_link(fun() -> start_replication(JsonRepDoc, RepId, UserCtx) end); + [{RepId, DocId}] -> + ok; + [{RepId, _OtherDocId}] -> + couch_rep:update_rep_doc( + JsonRepDoc, [{<<"replication_id">>, ?l2b(element(1, RepId))}] + ) + end. + + +start_replication(RepDoc, RepId, UserCtx) -> + case (catch couch_rep:start_replication(RepDoc, RepId, UserCtx)) of + RepPid when is_pid(RepPid) -> + couch_rep:get_result(RepPid, RepId, RepDoc, UserCtx); + Error -> + couch_rep:update_rep_doc( + RepDoc, + [ + {<<"state">>, <<"error">>}, + {<<"replication_id">>, ?l2b(element(1, RepId))} + ] + ), + ?LOG_ERROR("Error starting replication ~p: ~p", [RepId, Error]) + end. + + +maybe_stop_replication({RepProps}) -> + DocId = couch_util:get_value(<<"_id">>, RepProps), + case ets:lookup(?DOC_TO_REP_ID_MAP, DocId) of + [{DocId, RepId}] -> + couch_rep:end_replication(RepId), + true = ets:delete(?REP_ID_TO_DOC_ID_MAP, RepId), + true = ets:delete(?DOC_TO_REP_ID_MAP, DocId); + [] -> + ok + end. diff --git a/src/couchdb/couch_server.erl b/src/couchdb/couch_server.erl index 0f8f66f3..2d96d5f3 100644 --- a/src/couchdb/couch_server.erl +++ b/src/couchdb/couch_server.erl @@ -76,6 +76,7 @@ check_dbname(#server{dbname_regexp=RegExp}, DbName) -> nomatch -> case DbName of "_users" -> ok; + "_replicator" -> ok; _Else -> {error, illegal_database_name} end; -- cgit v1.2.3