summaryrefslogtreecommitdiff
path: root/src/couchdb
diff options
context:
space:
mode:
Diffstat (limited to 'src/couchdb')
-rw-r--r--src/couchdb/Makefile.am2
-rw-r--r--src/couchdb/couch_changes.erl6
-rw-r--r--src/couchdb/couch_db.hrl3
-rw-r--r--src/couchdb/couch_js_functions.hrl73
-rw-r--r--src/couchdb/couch_rep.erl204
-rw-r--r--src/couchdb/couch_rep_db_listener.erl232
-rw-r--r--src/couchdb/couch_server.erl1
7 files changed, 466 insertions, 55 deletions
diff --git a/src/couchdb/Makefile.am b/src/couchdb/Makefile.am
index 308a3837..219f7d82 100644
--- a/src/couchdb/Makefile.am
+++ b/src/couchdb/Makefile.am
@@ -66,6 +66,7 @@ source_files = \
couch_rep_reader.erl \
couch_rep_sup.erl \
couch_rep_writer.erl \
+ couch_rep_db_listener.erl \
couch_server.erl \
couch_server_sup.erl \
couch_stats_aggregator.erl \
@@ -124,6 +125,7 @@ compiled_files = \
couch_rep_reader.beam \
couch_rep_sup.beam \
couch_rep_writer.beam \
+ couch_rep_db_listener.beam \
couch_server.beam \
couch_server_sup.beam \
couch_stats_aggregator.beam \
diff --git a/src/couchdb/couch_changes.erl b/src/couchdb/couch_changes.erl
index 580148c8..098fac84 100644
--- a/src/couchdb/couch_changes.erl
+++ b/src/couchdb/couch_changes.erl
@@ -165,7 +165,8 @@ keep_sending_changes(Args, Callback, Db, StartSeq, Prepend, Timeout,
TimeoutFun) ->
#changes_args{
feed = ResponseType,
- limit = Limit
+ limit = Limit,
+ db_open_options = DbOptions
} = Args,
% ?LOG_INFO("send_changes start ~p",[StartSeq]),
{ok, {_, EndSeq, Prepend2, _, _, _, NewLimit, _}} = send_changes(
@@ -179,7 +180,8 @@ keep_sending_changes(Args, Callback, Db, StartSeq, Prepend, Timeout,
case wait_db_updated(Timeout, TimeoutFun) of
updated ->
% ?LOG_INFO("wait_db_updated updated ~p",[{Db#db.name, EndSeq}]),
- case couch_db:open(Db#db.name, [{user_ctx, Db#db.user_ctx}]) of
+ DbOptions1 = [{user_ctx, Db#db.user_ctx} | DbOptions],
+ case couch_db:open(Db#db.name, DbOptions1) of
{ok, Db2} ->
keep_sending_changes(
Args#changes_args{limit=NewLimit},
diff --git a/src/couchdb/couch_db.hrl b/src/couchdb/couch_db.hrl
index a35745ef..51fb25e2 100644
--- a/src/couchdb/couch_db.hrl
+++ b/src/couchdb/couch_db.hrl
@@ -289,6 +289,7 @@
heartbeat,
timeout,
filter = "",
- include_docs = false
+ include_docs = false,
+ db_open_options = []
}).
diff --git a/src/couchdb/couch_js_functions.hrl b/src/couchdb/couch_js_functions.hrl
index 1f314f6e..f850dd4c 100644
--- a/src/couchdb/couch_js_functions.hrl
+++ b/src/couchdb/couch_js_functions.hrl
@@ -95,3 +95,76 @@
}
}
">>).
+
+
+-define(REP_DB_DOC_VALIDATE_FUN, <<"
+ function(newDoc, oldDoc, userCtx) {
+ var isAdmin = (userCtx.roles.indexOf('_admin') >= 0);
+ var isReplicator = (userCtx.roles.indexOf('_replicator') >= 0);
+
+ if (oldDoc && !newDoc._deleted && !isReplicator) {
+ throw({forbidden:
+ 'Only the replicator can edit replication documents. ' +
+ 'Admins can only add and delete replication documents.'
+ });
+ } else if (!isAdmin) {
+ throw({forbidden:
+ 'Only admins may add/delete replication documents.'
+ });
+ }
+
+ if (!oldDoc && newDoc.state) {
+ throw({forbidden:
+ 'The state field can only be set by the replicator.'
+ });
+ }
+
+ if (!oldDoc && newDoc.replication_id) {
+ throw({forbidden:
+ 'The replication_id field can only be set by the replicator.'
+ });
+ }
+
+ if (newDoc.user_ctx) {
+ var user_ctx = newDoc.user_ctx;
+
+ if (typeof user_ctx !== 'object') {
+ throw({forbidden: 'The user_ctx property must be an object.'});
+ }
+
+ if (!(user_ctx.name === null ||
+ (typeof user_ctx.name === 'undefined') ||
+ ((typeof user_ctx.name === 'string') &&
+ user_ctx.name.length > 0))) {
+ throw({forbidden:
+ 'The name property of the user_ctx must be a ' +
+ 'non-empty string.'
+ });
+ }
+
+ if ((typeof user_ctx.roles !== 'undefined') &&
+ (typeof user_ctx.roles.length !== 'number')) {
+ throw({forbidden:
+ 'The roles property of the user_ctx must be ' +
+ 'an array of strings.'
+ });
+ }
+
+ if (user_ctx.roles) {
+ for (var i = 0; i < user_ctx.roles.length; i++) {
+ var role = user_ctx.roles[i];
+
+ if (typeof role !== 'string' || role.length === 0) {
+ throw({forbidden: 'Roles must be non-empty strings.'});
+ }
+ if (role[0] === '_') {
+ throw({forbidden:
+ 'System roles (starting with underscore) ' +
+ 'are not allowed.'
+ });
+ }
+ }
+ }
+ }
+ }
+">>).
diff --git a/src/couchdb/couch_rep.erl b/src/couchdb/couch_rep.erl
index d3db8a68..57b30088 100644
--- a/src/couchdb/couch_rep.erl
+++ b/src/couchdb/couch_rep.erl
@@ -16,8 +16,12 @@
code_change/3]).
-export([replicate/2, checkpoint/1]).
+-export([ensure_rep_db_exists/0, make_replication_id/2]).
+-export([start_replication/3, end_replication/1, get_result/4]).
+-export([update_rep_doc/2]).
-include("couch_db.hrl").
+-include("couch_js_functions.hrl").
-define(REP_ID_VERSION, 2).
@@ -48,7 +52,8 @@
committed_seq = 0,
stats = nil,
- doc_ids = nil
+ doc_ids = nil,
+ rep_doc = nil
}).
%% convenience function to do a simple replication from the shell
@@ -61,58 +66,63 @@ replicate(Source, Target) when is_binary(Source), is_binary(Target) ->
%% function handling POST to _replicate
replicate({Props}=PostBody, UserCtx) ->
- BaseId = make_replication_id({Props}, UserCtx, ?REP_ID_VERSION),
- Extension = maybe_append_options(
- [<<"continuous">>, <<"create_target">>], Props),
- Replicator = {BaseId ++ Extension,
- {gen_server, start_link, [?MODULE, [BaseId, PostBody, UserCtx], []]},
- temporary,
- 1,
- worker,
- [?MODULE]
- },
-
+ RepId = make_replication_id(PostBody, UserCtx),
case couch_util:get_value(<<"cancel">>, Props, false) of
true ->
- case supervisor:terminate_child(couch_rep_sup, BaseId ++ Extension) of
- {error, not_found} ->
- {error, not_found};
- ok ->
- ok = supervisor:delete_child(couch_rep_sup, BaseId ++ Extension),
- {ok, {cancelled, ?l2b(BaseId)}}
- end;
+ end_replication(RepId);
false ->
- Server = start_replication_server(Replicator),
+ Server = start_replication(PostBody, RepId, UserCtx),
+ get_result(Server, RepId, PostBody, UserCtx)
+ end.
- case couch_util:get_value(<<"continuous">>, Props, false) of
- true ->
- {ok, {continuous, ?l2b(BaseId)}};
- false ->
- get_result(Server, PostBody, UserCtx)
- end
+end_replication({BaseId, Extension}) ->
+ RepId = BaseId ++ Extension,
+ case supervisor:terminate_child(couch_rep_sup, RepId) of
+ {error, not_found} = R ->
+ R;
+ ok ->
+ ok = supervisor:delete_child(couch_rep_sup, RepId),
+ {ok, {cancelled, ?l2b(BaseId)}}
end.
+start_replication(RepDoc, {BaseId, Extension}, UserCtx) ->
+ Replicator = {
+ BaseId ++ Extension,
+ {gen_server, start_link,
+ [?MODULE, [BaseId, RepDoc, UserCtx], []]},
+ temporary,
+ 1,
+ worker,
+ [?MODULE]
+ },
+ start_replication_server(Replicator).
+
checkpoint(Server) ->
gen_server:cast(Server, do_checkpoint).
-get_result(Server, PostBody, UserCtx) ->
- try gen_server:call(Server, get_result, infinity) of
- retry -> replicate(PostBody, UserCtx);
- Else -> Else
- catch
- exit:{noproc, {gen_server, call, [Server, get_result , infinity]}} ->
- %% oops, this replication just finished -- restart it.
- replicate(PostBody, UserCtx);
- exit:{normal, {gen_server, call, [Server, get_result , infinity]}} ->
- %% we made the call during terminate
- replicate(PostBody, UserCtx)
+get_result(Server, {BaseId, _Extension}, {Props} = PostBody, UserCtx) ->
+ case couch_util:get_value(<<"continuous">>, Props, false) of
+ true ->
+ {ok, {continuous, ?l2b(BaseId)}};
+ false ->
+ try gen_server:call(Server, get_result, infinity) of
+ retry -> replicate(PostBody, UserCtx);
+ Else -> Else
+ catch
+ exit:{noproc, {gen_server, call, [Server, get_result, infinity]}} ->
+ %% oops, this replication just finished -- restart it.
+ replicate(PostBody, UserCtx);
+ exit:{normal, {gen_server, call, [Server, get_result, infinity]}} ->
+ %% we made the call during terminate
+ replicate(PostBody, UserCtx)
+ end
end.
init(InitArgs) ->
try do_init(InitArgs)
catch throw:{db_not_found, DbUrl} -> {stop, {db_not_found, DbUrl}} end.
-do_init([RepId, {PostProps}, UserCtx] = InitArgs) ->
+do_init([RepId, {PostProps} = RepDoc, UserCtx] = InitArgs) ->
process_flag(trap_exit, true),
SourceProps = couch_util:get_value(<<"source">>, PostProps),
@@ -130,6 +140,8 @@ do_init([RepId, {PostProps}, UserCtx] = InitArgs) ->
SourceInfo = dbinfo(Source),
TargetInfo = dbinfo(Target),
+ maybe_set_triggered(RepDoc, RepId),
+
case DocIds of
List when is_list(List) ->
% Fast replication using only a list of doc IDs to replicate.
@@ -199,7 +211,8 @@ do_init([RepId, {PostProps}, UserCtx] = InitArgs) ->
rep_starttime = httpd_util:rfc1123_date(),
src_starttime = couch_util:get_value(instance_start_time, SourceInfo),
tgt_starttime = couch_util:get_value(instance_start_time, TargetInfo),
- doc_ids = DocIds
+ doc_ids = DocIds,
+ rep_doc = RepDoc
},
{ok, State}.
@@ -256,29 +269,34 @@ handle_info({'EXIT', _Pid, Reason}, State) ->
{stop, Reason, State}.
terminate(normal, #state{checkpoint_scheduled=nil} = State) ->
- do_terminate(State);
+ do_terminate(State),
+ update_rep_doc(State#state.rep_doc, [{<<"state">>, <<"completed">>}]);
terminate(normal, State) ->
timer:cancel(State#state.checkpoint_scheduled),
- do_terminate(do_checkpoint(State));
+ do_terminate(do_checkpoint(State)),
+ update_rep_doc(State#state.rep_doc, [{<<"state">>, <<"completed">>}]);
-terminate(Reason, State) ->
- #state{
- listeners = Listeners,
- source = Source,
- target = Target,
- stats = Stats
- } = State,
+terminate(shutdown, #state{listeners = Listeners} = State) ->
+ % continuous replication stopped
+ [gen_server:reply(L, {ok, stopped}) || L <- Listeners],
+ do_forced_terminate(State);
+
+terminate(Reason, #state{listeners = Listeners} = State) ->
[gen_server:reply(L, {error, Reason}) || L <- Listeners],
- ets:delete(Stats),
- close_db(Target),
- close_db(Source).
+ do_forced_terminate(State),
+ update_rep_doc(State#state.rep_doc, [{<<"state">>, <<"error">>}]).
code_change(_OldVsn, State, _Extra) ->
{ok, State}.
% internal funs
+do_forced_terminate(#state{source = Source, target = Target, stats = Stats}) ->
+ ets:delete(Stats),
+ close_db(Target),
+ close_db(Source).
+
start_replication_server(Replicator) ->
RepId = element(1, Replicator),
case supervisor:start_child(couch_rep_sup, Replicator) of
@@ -449,7 +467,7 @@ has_session_id(SessionId, [{Props} | Rest]) ->
has_session_id(SessionId, Rest)
end.
-maybe_append_options(Options, Props) ->
+maybe_append_options(Options, {Props}) ->
lists:foldl(fun(Option, Acc) ->
Acc ++
case couch_util:get_value(Option, Props, false) of
@@ -460,6 +478,12 @@ maybe_append_options(Options, Props) ->
end
end, [], Options).
+make_replication_id(RepProps, UserCtx) ->
+ BaseId = make_replication_id(RepProps, UserCtx, ?REP_ID_VERSION),
+ Extension = maybe_append_options(
+ [<<"continuous">>, <<"create_target">>], RepProps),
+ {BaseId, Extension}.
+
% Versioned clauses for generating replication ids
% If a change is made to how replications are identified
% add a new clause and increase ?REP_ID_VERSION at the top
@@ -785,3 +809,79 @@ parse_proxy_params(ProxyUrl) ->
true ->
[{proxy_user, User}, {proxy_password, Passwd}]
end.
+
+update_rep_doc({Props} = _RepDoc, KVs) ->
+ case couch_util:get_value(<<"_id">>, Props) of
+ undefined ->
+ % replication triggered by POSTing to _replicate/
+ ok;
+ RepDocId ->
+ % replication triggered by adding a Rep Doc to the replicator DB
+ {ok, RepDb} = ensure_rep_db_exists(),
+ case couch_db:open_doc(RepDb, RepDocId, []) of
+ {ok, LatestRepDoc} ->
+ update_rep_doc(RepDb, LatestRepDoc, KVs);
+ _ ->
+ ok
+ end,
+ couch_db:close(RepDb)
+ end.
+
+update_rep_doc(RepDb, #doc{body = {RepDocBody}} = RepDoc, KVs) ->
+ NewRepDocBody = lists:foldl(
+ fun({K, _V} = KV, Body) ->
+ lists:keystore(K, 1, Body, KV)
+ end,
+ RepDocBody,
+ KVs
+ ),
+ % might not succeed - when the replication doc is deleted right
+ % before this update (not an error)
+ couch_db:update_doc(
+ RepDb,
+ RepDoc#doc{body = {NewRepDocBody}},
+ []
+ ).
+
+maybe_set_triggered({RepProps} = RepDoc, RepId) ->
+ case couch_util:get_value(<<"state">>, RepProps) of
+ <<"triggered">> ->
+ ok;
+ _ ->
+ update_rep_doc(
+ RepDoc,
+ [
+ {<<"state">>, <<"triggered">>},
+ {<<"replication_id">>, ?l2b(RepId)}
+ ]
+ )
+ end.
+
+ensure_rep_db_exists() ->
+ DbName = ?l2b(couch_config:get("replicator", "db", "_replicator")),
+ Opts = [
+ {user_ctx, #user_ctx{roles=[<<"_admin">>, <<"_replicator">>]}},
+ sys_db
+ ],
+ case couch_db:open(DbName, Opts) of
+ {ok, Db} ->
+ Db;
+ _Error ->
+ {ok, Db} = couch_db:create(DbName, Opts)
+ end,
+ ok = ensure_rep_ddoc_exists(Db, <<"_design/_replicator">>),
+ {ok, Db}.
+
+ensure_rep_ddoc_exists(RepDb, DDocID) ->
+ case couch_db:open_doc(RepDb, DDocID, []) of
+ {ok, _Doc} ->
+ ok;
+ _ ->
+ DDoc = couch_doc:from_json_obj({[
+ {<<"_id">>, DDocID},
+ {<<"language">>, <<"javascript">>},
+ {<<"validate_doc_update">>, ?REP_DB_DOC_VALIDATE_FUN}
+ ]}),
+ {ok, _Rev} = couch_db:update_doc(RepDb, DDoc, [])
+ end,
+ ok.
diff --git a/src/couchdb/couch_rep_db_listener.erl b/src/couchdb/couch_rep_db_listener.erl
new file mode 100644
index 00000000..bc407693
--- /dev/null
+++ b/src/couchdb/couch_rep_db_listener.erl
@@ -0,0 +1,232 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+% http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+-module(couch_rep_db_listener).
+-behaviour(gen_server).
+
+-export([start_link/0, init/1, handle_call/3, handle_info/2, handle_cast/2]).
+-export([code_change/3, terminate/2]).
+
+-include("couch_db.hrl").
+
+-define(DOC_TO_REP_ID_MAP, rep_doc_id_to_rep_id).
+-define(REP_ID_TO_DOC_ID_MAP, rep_id_to_rep_doc_id).
+
+-record(state, {
+ changes_feed_loop,
+ changes_queue,
+ changes_processor
+}).
+
+
+start_link() ->
+ gen_server:start_link({local, ?MODULE}, ?MODULE, [], []).
+
+init(_) ->
+ process_flag(trap_exit, true),
+ {ok, Queue} = couch_work_queue:new(1024 * 1024, 1000),
+ {ok, Processor} = changes_processor(Queue),
+ {ok, Loop} = changes_feed_loop(Queue),
+ Server = self(),
+ ok = couch_config:register(
+ fun("replicator", "db") ->
+ ok = gen_server:call(Server, rep_db_changed, infinity)
+ end
+ ),
+ {ok, #state{
+ changes_feed_loop = Loop,
+ changes_queue = Queue,
+ changes_processor = Processor}
+ }.
+
+handle_call(rep_db_changed, _From, State) ->
+ #state{
+ changes_feed_loop = Loop,
+ changes_queue = Queue
+ } = State,
+ exit(Loop, rep_db_changed),
+ {ok, NewLoop} = changes_feed_loop(Queue),
+ {reply, ok, State#state{changes_feed_loop = NewLoop}}.
+
+
+handle_cast(_Msg, State) ->
+ {noreply, State}.
+
+
+handle_info({'EXIT', _OldChangesLoop, rep_db_changed}, State) ->
+ {noreply, State};
+
+handle_info({'EXIT', From, Reason}, #state{changes_processor = From} = State) ->
+ ?LOG_ERROR("Replicator DB changes processor died. Reason: ~p", [Reason]),
+ {stop, rep_db_changes_processor_error, State}.
+
+
+terminate(_Reason, State) ->
+ #state{
+ changes_feed_loop = Loop,
+ changes_queue = Queue
+ } = State,
+ exit(Loop, stop),
+ % closing the queue will cause changes_processor to shutdown
+ couch_work_queue:close(Queue),
+ ok.
+
+
+code_change(_OldVsn, State, _Extra) ->
+ {ok, State}.
+
+
+changes_feed_loop(ChangesQueue) ->
+ {ok, RepDb} = couch_rep:ensure_rep_db_exists(),
+ Pid = spawn_link(
+ fun() ->
+ ChangesFeedFun = couch_changes:handle_changes(
+ #changes_args{
+ include_docs = true,
+ feed = "continuous",
+ timeout = infinity,
+ db_open_options = [sys_db]
+ },
+ {json_req, null},
+ RepDb
+ ),
+ ChangesFeedFun(
+ fun({change, Change, _}, _) ->
+ case has_valid_rep_id(Change) of
+ true ->
+ couch_work_queue:queue(ChangesQueue, Change);
+ false ->
+ ok
+ end;
+ (_, _) ->
+ ok
+ end
+ )
+ end
+ ),
+ couch_db:close(RepDb),
+ {ok, Pid}.
+
+
+changes_processor(ChangesQueue) ->
+ Pid = spawn_link(
+ fun() ->
+ ets:new(?DOC_TO_REP_ID_MAP, [named_table, set, private]),
+ ets:new(?REP_ID_TO_DOC_ID_MAP, [named_table, set, private]),
+ consume_changes(ChangesQueue),
+ true = ets:delete(?REP_ID_TO_DOC_ID_MAP),
+ true = ets:delete(?DOC_TO_REP_ID_MAP)
+ end
+ ),
+ {ok, Pid}.
+
+
+consume_changes(ChangesQueue) ->
+ case couch_work_queue:dequeue(ChangesQueue) of
+ closed ->
+ ok;
+ {ok, Changes} ->
+ lists:foreach(fun process_change/1, Changes),
+ consume_changes(ChangesQueue)
+ end.
+
+
+has_valid_rep_id({Change}) ->
+ has_valid_rep_id(couch_util:get_value(<<"id">>, Change));
+has_valid_rep_id(<<?DESIGN_DOC_PREFIX, _Rest/binary>>) ->
+ false;
+has_valid_rep_id(_Else) ->
+ true.
+
+
+process_change({Change}) ->
+ {RepProps} = JsonRepDoc = couch_util:get_value(doc, Change),
+ case couch_util:get_value(<<"deleted">>, Change, false) of
+ true ->
+ maybe_stop_replication(JsonRepDoc);
+ false ->
+ case couch_util:get_value(<<"state">>, RepProps) of
+ <<"completed">> ->
+ maybe_stop_replication(JsonRepDoc);
+ <<"error">> ->
+ % cleanup ets table entries
+ maybe_stop_replication(JsonRepDoc);
+ <<"triggered">> ->
+ maybe_start_replication(JsonRepDoc);
+ undefined ->
+ case couch_util:get_value(<<"replication_id">>, RepProps) of
+ undefined ->
+ maybe_start_replication(JsonRepDoc);
+ _ ->
+ ok
+ end
+ end
+ end,
+ ok.
+
+
+rep_user_ctx({RepDoc}) ->
+ case couch_util:get_value(<<"user_ctx">>, RepDoc) of
+ undefined ->
+ #user_ctx{roles = [<<"_admin">>]};
+ {UserCtx} ->
+ #user_ctx{
+ name = couch_util:get_value(<<"name">>, UserCtx, null),
+ roles = couch_util:get_value(<<"roles">>, UserCtx, [])
+ }
+ end.
+
+
+maybe_start_replication({RepProps} = JsonRepDoc) ->
+ UserCtx = rep_user_ctx(JsonRepDoc),
+ RepId = couch_rep:make_replication_id(JsonRepDoc, UserCtx),
+ DocId = couch_util:get_value(<<"_id">>, RepProps),
+ case ets:lookup(?REP_ID_TO_DOC_ID_MAP, RepId) of
+ [] ->
+ true = ets:insert(?REP_ID_TO_DOC_ID_MAP, {RepId, DocId}),
+ true = ets:insert(?DOC_TO_REP_ID_MAP, {DocId, RepId}),
+ spawn_link(fun() -> start_replication(JsonRepDoc, RepId, UserCtx) end);
+ [{RepId, DocId}] ->
+ ok;
+ [{RepId, _OtherDocId}] ->
+ couch_rep:update_rep_doc(
+ JsonRepDoc, [{<<"replication_id">>, ?l2b(element(1, RepId))}]
+ )
+ end.
+
+
+start_replication(RepDoc, RepId, UserCtx) ->
+ case (catch couch_rep:start_replication(RepDoc, RepId, UserCtx)) of
+ RepPid when is_pid(RepPid) ->
+ couch_rep:get_result(RepPid, RepId, RepDoc, UserCtx);
+ Error ->
+ couch_rep:update_rep_doc(
+ RepDoc,
+ [
+ {<<"state">>, <<"error">>},
+ {<<"replication_id">>, ?l2b(element(1, RepId))}
+ ]
+ ),
+ ?LOG_ERROR("Error starting replication ~p: ~p", [RepId, Error])
+ end.
+
+
+maybe_stop_replication({RepProps}) ->
+ DocId = couch_util:get_value(<<"_id">>, RepProps),
+ case ets:lookup(?DOC_TO_REP_ID_MAP, DocId) of
+ [{DocId, RepId}] ->
+ couch_rep:end_replication(RepId),
+ true = ets:delete(?REP_ID_TO_DOC_ID_MAP, RepId),
+ true = ets:delete(?DOC_TO_REP_ID_MAP, DocId);
+ [] ->
+ ok
+ end.
diff --git a/src/couchdb/couch_server.erl b/src/couchdb/couch_server.erl
index 0f8f66f3..2d96d5f3 100644
--- a/src/couchdb/couch_server.erl
+++ b/src/couchdb/couch_server.erl
@@ -76,6 +76,7 @@ check_dbname(#server{dbname_regexp=RegExp}, DbName) ->
nomatch ->
case DbName of
"_users" -> ok;
+ "_replicator" -> ok;
_Else ->
{error, illegal_database_name}
end;