summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorFilipe David Borba Manana <fdmanana@apache.org>2010-08-04 17:05:22 +0000
committerFilipe David Borba Manana <fdmanana@apache.org>2010-08-04 17:05:22 +0000
commit77962e9b1458e97aa8a534fe18f2eda1965cc8b1 (patch)
tree412882a3a596ca852f70219c920f67344925e091
parent8446f0c3a69f7925d9104fd4487175c618b5a9dc (diff)
Add replicator DB (_replicator).
Part of ticket COUCHDB-776. git-svn-id: https://svn.apache.org/repos/asf/couchdb/trunk@982330 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--etc/couchdb/default.ini.tpl.in4
-rw-r--r--share/Makefile.am1
-rw-r--r--share/www/script/couch_tests.js1
-rw-r--r--share/www/script/test/replicator_db.js750
-rw-r--r--src/couchdb/Makefile.am2
-rw-r--r--src/couchdb/couch_changes.erl6
-rw-r--r--src/couchdb/couch_db.hrl3
-rw-r--r--src/couchdb/couch_js_functions.hrl73
-rw-r--r--src/couchdb/couch_rep.erl204
-rw-r--r--src/couchdb/couch_rep_db_listener.erl232
-rw-r--r--src/couchdb/couch_server.erl1
11 files changed, 1221 insertions, 56 deletions
diff --git a/etc/couchdb/default.ini.tpl.in b/etc/couchdb/default.ini.tpl.in
index 68350fc2..a890beb7 100644
--- a/etc/couchdb/default.ini.tpl.in
+++ b/etc/couchdb/default.ini.tpl.in
@@ -62,6 +62,7 @@ stats_aggregator={couch_stats_aggregator, start, []}
stats_collector={couch_stats_collector, start, []}
uuids={couch_uuids, start, []}
auth_cache={couch_auth_cache, start_link, []}
+rep_db_changes_listener={couch_rep_db_listener, start_link, []}
[httpd_global_handlers]
/ = {couch_httpd_misc_handlers, handle_welcome_req, <<"Welcome">>}
@@ -123,5 +124,6 @@ compression_level = 8 ; from 1 (lowest, fastest) to 9 (highest, slowest), 0 to d
compressible_types = text/*, application/javascript, application/json, application/xml
[replicator]
+db = _replicator
max_http_sessions = 10
-max_http_pipeline_size = 10 \ No newline at end of file
+max_http_pipeline_size = 10
diff --git a/share/Makefile.am b/share/Makefile.am
index 77d5f8d8..52de3d2b 100644
--- a/share/Makefile.am
+++ b/share/Makefile.am
@@ -152,6 +152,7 @@ nobase_dist_localdata_DATA = \
www/script/test/reduce_false.js \
www/script/test/reduce_false_temp.js \
www/script/test/replication.js \
+ www/script/test/replicator_db.js \
www/script/test/rev_stemming.js \
www/script/test/rewrite.js \
www/script/test/security_validation.js \
diff --git a/share/www/script/couch_tests.js b/share/www/script/couch_tests.js
index 5723eece..c048521c 100644
--- a/share/www/script/couch_tests.js
+++ b/share/www/script/couch_tests.js
@@ -75,6 +75,7 @@ loadTest("reduce_builtin.js");
loadTest("reduce_false.js");
loadTest("reduce_false_temp.js");
loadTest("replication.js");
+loadTest("replicator_db.js");
loadTest("rev_stemming.js");
loadTest("rewrite.js");
loadTest("security_validation.js");
diff --git a/share/www/script/test/replicator_db.js b/share/www/script/test/replicator_db.js
new file mode 100644
index 00000000..edeb1cc5
--- /dev/null
+++ b/share/www/script/test/replicator_db.js
@@ -0,0 +1,750 @@
+// Licensed under the Apache License, Version 2.0 (the "License"); you may not
+// use this file except in compliance with the License. You may obtain a copy of
+// the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+// WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+// License for the specific language governing permissions and limitations under
+// the License.
+
+couchTests.replicator_db = function(debug) {
+
+ if (debug) debugger;
+
+ var wait_rep_doc = 500; // number of millisecs to wait after saving a Rep Doc
+ var host = CouchDB.host;
+ var dbA = new CouchDB("test_suite_rep_db_a", {"X-Couch-Full-Commit":"false"});
+ var dbB = new CouchDB("test_suite_rep_db_b", {"X-Couch-Full-Commit":"false"});
+ var repDb = new CouchDB("test_suite_rep_db", {"X-Couch-Full-Commit":"false"});
+ var usersDb = new CouchDB("test_suite_auth", {"X-Couch-Full-Commit":"false"});
+
+ var docs1 = [
+ {
+ _id: "foo1",
+ value: 11
+ },
+ {
+ _id: "foo2",
+ value: 22
+ },
+ {
+ _id: "foo3",
+ value: 33
+ }
+ ];
+
+ function waitForRep(repDb, repDoc, state) {
+ var newRep,
+ t0 = new Date(),
+ t1,
+ ms = 1000;
+
+ do {
+ newRep = repDb.open(repDoc._id);
+ t1 = new Date();
+ } while (((t1 - t0) <= ms) && newRep.state !== state);
+ }
+
+ function waitForSeq(sourceDb, targetDb) {
+ var targetSeq,
+ sourceSeq = sourceDb.info().update_seq,
+ t0 = new Date(),
+ t1,
+ ms = 1000;
+
+ do {
+ targetSeq = targetDb.info().update_seq;
+ t1 = new Date();
+ } while (((t1 - t0) <= ms) && targetSeq < sourceSeq);
+ }
+
+ function wait(ms) {
+ var t0 = new Date(), t1;
+ do {
+ CouchDB.request("GET", "/");
+ t1 = new Date();
+ } while ((t1 - t0) <= ms);
+ }
+
+
+ function populate_db(db, docs) {
+ db.deleteDb();
+ db.createDb();
+ for (var i = 0; i < docs.length; i++) {
+ var d = docs[i];
+ delete d._rev;
+ T(db.save(d).ok);
+ }
+ }
+
+ function simple_replication() {
+ populate_db(dbA, docs1);
+ populate_db(dbB, []);
+
+ var repDoc = {
+ _id: "foo_simple_rep",
+ source: dbA.name,
+ target: dbB.name
+ };
+ T(repDb.save(repDoc).ok);
+
+ waitForRep(repDb, repDoc, "completed");
+ for (var i = 0; i < docs1.length; i++) {
+ var doc = docs1[i];
+ var copy = dbB.open(doc._id);
+ T(copy !== null);
+ T(copy.value === doc.value);
+ }
+
+ var repDoc1 = repDb.open(repDoc._id);
+ T(repDoc1 !== null);
+ T(repDoc1.source === repDoc.source);
+ T(repDoc1.target === repDoc.target);
+ T(repDoc1.state === "completed", "simple");
+ T(typeof repDoc1.replication_id === "string");
+ }
+
+
+ function filtered_replication() {
+ var docs2 = docs1.concat([
+ {
+ _id: "_design/mydesign",
+ language : "javascript",
+ filters : {
+ myfilter : (function(doc, req) {
+ return (doc.value % 2) !== Number(req.query.myparam);
+ }).toString()
+ }
+ }
+ ]);
+
+ populate_db(dbA, docs2);
+ populate_db(dbB, []);
+
+ var repDoc = {
+ _id: "foo_filt_rep_doc",
+ source: "http://" + host + "/" + dbA.name,
+ target: dbB.name,
+ filter: "mydesign/myfilter",
+ query_params: {
+ myparam: 1
+ }
+ };
+ T(repDb.save(repDoc).ok);
+
+ waitForRep(repDb, repDoc, "completed");
+ for (var i = 0; i < docs2.length; i++) {
+ var doc = docs2[i];
+ var copy = dbB.open(doc._id);
+
+ if (typeof doc.value === "number") {
+ if ((doc.value % 2) !== 1) {
+ T(copy !== null);
+ T(copy.value === doc.value);
+ } else {
+ T(copy === null);
+ }
+ }
+ }
+
+ var repDoc1 = repDb.open(repDoc._id);
+ T(repDoc1 !== null);
+ T(repDoc1.source === repDoc.source);
+ T(repDoc1.target === repDoc.target);
+ T(repDoc1.state === "completed", "filtered");
+ T(typeof repDoc1.replication_id === "string");
+ }
+
+
+ function continuous_replication() {
+ populate_db(dbA, docs1);
+ populate_db(dbB, []);
+
+ var repDoc = {
+ _id: "foo_cont_rep_doc",
+ source: "http://" + host + "/" + dbA.name,
+ target: dbB.name,
+ continuous: true
+ };
+
+ T(repDb.save(repDoc).ok);
+
+ waitForSeq(dbA, dbB);
+ for (var i = 0; i < docs1.length; i++) {
+ var doc = docs1[i];
+ var copy = dbB.open(doc._id);
+ T(copy !== null);
+ T(copy.value === doc.value);
+ }
+
+ // add another doc to source, it will be replicated to target
+ var docX = {
+ _id: "foo1000",
+ value: 1001
+ };
+
+ T(dbA.save(docX).ok);
+
+ waitForSeq(dbA, dbB);
+ var copy = dbB.open("foo1000");
+ T(copy !== null);
+ T(copy.value === 1001);
+
+ var repDoc1 = repDb.open(repDoc._id);
+ T(repDoc1 !== null);
+ T(repDoc1.source === repDoc.source);
+ T(repDoc1.target === repDoc.target);
+ T(repDoc1.state === "triggered");
+ T(typeof repDoc1.replication_id === "string");
+
+ // stop replication by deleting the replication document
+ T(repDb.deleteDoc(repDoc1).ok);
+
+ // add another doc to source, it will NOT be replicated to target
+ var docY = {
+ _id: "foo666",
+ value: 999
+ };
+
+ T(dbA.save(docY).ok);
+
+ wait(200); // is there a way to avoid wait here?
+ var copy = dbB.open("foo666");
+ T(copy === null);
+ }
+
+
+ function by_doc_ids_replication() {
+ // to test that we can replicate docs with slashes in their IDs
+ var docs2 = docs1.concat([
+ {
+ _id: "_design/mydesign",
+ language : "javascript"
+ }
+ ]);
+
+ populate_db(dbA, docs2);
+ populate_db(dbB, []);
+
+ var repDoc = {
+ _id: "foo_cont_rep_doc",
+ source: "http://" + host + "/" + dbA.name,
+ target: dbB.name,
+ doc_ids: ["foo666", "foo3", "_design/mydesign", "foo999", "foo1"]
+ };
+ T(repDb.save(repDoc).ok);
+
+ waitForRep(repDb, repDoc, "completed");
+ var copy = dbB.open("foo1");
+ T(copy !== null);
+ T(copy.value === 11);
+
+ copy = dbB.open("foo2");
+ T(copy === null);
+
+ copy = dbB.open("foo3");
+ T(copy !== null);
+ T(copy.value === 33);
+
+ copy = dbB.open("foo666");
+ T(copy === null);
+
+ copy = dbB.open("foo999");
+ T(copy === null);
+
+ copy = dbB.open("_design/mydesign");
+ T(copy !== null);
+ T(copy.language === "javascript");
+ }
+
+
+ function successive_identical_replications() {
+ populate_db(dbA, docs1);
+ populate_db(dbB, []);
+
+ var repDoc1 = {
+ _id: "foo_ident_rep_1",
+ source: dbA.name,
+ target: dbB.name
+ };
+ T(repDb.save(repDoc1).ok);
+
+ waitForRep(repDb, repDoc1, "completed");
+ for (var i = 0; i < docs1.length; i++) {
+ var doc = docs1[i];
+ var copy = dbB.open(doc._id);
+ T(copy !== null);
+ T(copy.value === doc.value);
+ }
+
+ var repDoc1_copy = repDb.open(repDoc1._id);
+ T(repDoc1_copy !== null);
+ T(repDoc1_copy.source === repDoc1.source);
+ T(repDoc1_copy.target === repDoc1.target);
+ T(repDoc1_copy.state === "completed");
+ T(typeof repDoc1_copy.replication_id === "string");
+
+ var newDoc = {
+ _id: "doc666",
+ value: 666
+ };
+ T(dbA.save(newDoc).ok);
+
+ wait(200);
+ var newDoc_copy = dbB.open(newDoc._id);
+ // not replicated because first replication is complete (not continuous)
+ T(newDoc_copy === null);
+
+ var repDoc2 = {
+ _id: "foo_ident_rep_2",
+ source: dbA.name,
+ target: dbB.name
+ };
+ T(repDb.save(repDoc2).ok);
+
+ waitForRep(repDb, repDoc2, "completed");
+ var newDoc_copy = dbB.open(newDoc._id);
+ T(newDoc_copy !== null);
+ T(newDoc_copy.value === newDoc.value);
+
+ var repDoc2_copy = repDb.open(repDoc2._id);
+ T(repDoc2_copy !== null);
+ T(repDoc2_copy.source === repDoc1.source);
+ T(repDoc2_copy.target === repDoc1.target);
+ T(repDoc2_copy.state === "completed");
+ T(typeof repDoc2_copy.replication_id === "string");
+ T(repDoc2_copy.replication_id === repDoc1_copy.replication_id);
+ }
+
+
+ // test the case where multiple replication docs (different IDs)
+ // describe in fact the same replication (source, target, etc)
+ function identical_rep_docs() {
+ populate_db(dbA, docs1);
+ populate_db(dbB, []);
+
+ var repDoc1 = {
+ _id: "foo_dup_rep_doc_1",
+ source: "http://" + host + "/" + dbA.name,
+ target: dbB.name
+ };
+ var repDoc2 = {
+ _id: "foo_dup_rep_doc_2",
+ source: "http://" + host + "/" + dbA.name,
+ target: dbB.name
+ };
+
+ T(repDb.save(repDoc1).ok);
+ T(repDb.save(repDoc2).ok);
+
+ waitForRep(repDb, repDoc1, "completed");
+ for (var i = 0; i < docs1.length; i++) {
+ var doc = docs1[i];
+ var copy = dbB.open(doc._id);
+ T(copy !== null);
+ T(copy.value === doc.value);
+ }
+
+ repDoc1 = repDb.open("foo_dup_rep_doc_1");
+ T(repDoc1 !== null);
+ T(repDoc1.state === "completed", "identical");
+ T(typeof repDoc1.replication_id === "string");
+
+ repDoc2 = repDb.open("foo_dup_rep_doc_2");
+ T(repDoc2 !== null);
+ T(typeof repDoc2.state === "undefined");
+ T(repDoc2.replication_id === repDoc1.replication_id);
+ }
+
+
+ // test the case where multiple replication docs (different IDs)
+ // describe in fact the same continuous replication (source, target, etc)
+ function identical_continuous_rep_docs() {
+ populate_db(dbA, docs1);
+ populate_db(dbB, []);
+
+ var repDoc1 = {
+ _id: "foo_dup_cont_rep_doc_1",
+ source: "http://" + host + "/" + dbA.name,
+ target: dbB.name,
+ continuous: true
+ };
+ var repDoc2 = {
+ _id: "foo_dup_cont_rep_doc_2",
+ source: "http://" + host + "/" + dbA.name,
+ target: dbB.name,
+ continuous: true
+ };
+
+ T(repDb.save(repDoc1).ok);
+ T(repDb.save(repDoc2).ok);
+
+ waitForSeq(dbA, dbB);
+ for (var i = 0; i < docs1.length; i++) {
+ var doc = docs1[i];
+ var copy = dbB.open(doc._id);
+ T(copy !== null);
+ T(copy.value === doc.value);
+ }
+
+ repDoc1 = repDb.open("foo_dup_cont_rep_doc_1");
+ T(repDoc1 !== null);
+ T(repDoc1.state === "triggered");
+ T(typeof repDoc1.replication_id === "string");
+
+ repDoc2 = repDb.open("foo_dup_cont_rep_doc_2");
+ T(repDoc2 !== null);
+ T(typeof repDoc2.state === "undefined");
+ T(repDoc2.replication_id === repDoc1.replication_id);
+
+ var newDoc = {
+ _id: "foo666",
+ value: 999
+ };
+ T(dbA.save(newDoc).ok);
+
+ waitForSeq(dbA, dbB);
+ var copy = dbB.open("foo666");
+ T(copy !== null);
+ T(copy.value === 999);
+
+ // deleting second replication doc, doesn't affect the 1st one and
+ // neither it stops the replication
+ T(repDb.deleteDoc(repDoc2).ok);
+ repDoc1 = repDb.open("foo_dup_cont_rep_doc_1");
+ T(repDoc1 !== null);
+ T(repDoc1.state === "triggered");
+
+ var newDoc2 = {
+ _id: "foo5000",
+ value: 5000
+ };
+ T(dbA.save(newDoc2).ok);
+
+ waitForSeq(dbA, dbB);
+ var copy = dbB.open("foo5000");
+ T(copy !== null);
+ T(copy.value === 5000);
+
+ // deleting the 1st replication document stops the replication
+ T(repDb.deleteDoc(repDoc1).ok);
+ var newDoc3 = {
+ _id: "foo1983",
+ value: 1983
+ };
+ T(dbA.save(newDoc3).ok);
+
+ wait(wait_rep_doc); //how to remove wait?
+ var copy = dbB.open("foo1983");
+ T(copy === null);
+ }
+
+
+ function rep_db_write_authorization() {
+ populate_db(dbA, docs1);
+ populate_db(dbB, []);
+
+ var server_admins_config = [
+ {
+ section: "admins",
+ key: "fdmanana",
+ value: "qwerty"
+ }
+ ];
+
+ run_on_modified_server(server_admins_config, function() {
+ var repDoc = {
+ _id: "foo_rep_doc",
+ source: dbA.name,
+ target: dbB.name
+ };
+
+ try {
+ repDb.save(repDoc);
+ T(false && "Should have thrown an exception");
+ } catch (x) {
+ T(x["error"] === "forbidden");
+ }
+
+ T(CouchDB.login("fdmanana", "qwerty").ok);
+ T(CouchDB.session().userCtx.name === "fdmanana");
+ T(CouchDB.session().userCtx.roles.indexOf("_admin") !== -1);
+
+ T(repDb.save(repDoc).ok);
+
+ waitForRep(repDb, repDoc, "completed");
+ for (var i = 0; i < docs1.length; i++) {
+ var doc = docs1[i];
+ var copy = dbB.open(doc._id);
+ T(copy !== null);
+ T(copy.value === doc.value);
+ }
+
+ repDoc = repDb.open("foo_rep_doc");
+ T(repDoc !== null);
+
+ repDoc.target = "test_suite_foo_db";
+ repDoc.create_target = true;
+
+ // Only the replicator can update replication documents.
+ // Admins can only add and delete replication documents.
+ try {
+ repDb.save(repDoc);
+ T(false && "Should have thrown an exception");
+ } catch (x) {
+ T(x["error"] === "forbidden");
+ }
+ });
+ }
+
+
+ function test_replication_credentials_delegation() {
+ populate_db(usersDb, []);
+
+ var joeUserDoc = CouchDB.prepareUserDoc({
+ name: "joe",
+ roles: ["god", "erlanger"]
+ }, "erly");
+ T(usersDb.save(joeUserDoc).ok);
+
+ var ddoc = {
+ _id: "_design/beer",
+ language: "javascript"
+ };
+ populate_db(dbA, docs1.concat([ddoc]));
+ populate_db(dbB, []);
+
+ T(dbB.setSecObj({
+ admins: {
+ names: [],
+ roles: ["god"]
+ }
+ }).ok);
+
+ var server_admins_config = [
+ {
+ section: "admins",
+ key: "fdmanana",
+ value: "qwerty"
+ }
+ ];
+
+ run_on_modified_server(server_admins_config, function() {
+
+ T(CouchDB.login("fdmanana", "qwerty").ok);
+ T(CouchDB.session().userCtx.name === "fdmanana");
+ T(CouchDB.session().userCtx.roles.indexOf("_admin") !== -1);
+
+ var repDoc = {
+ _id: "foo_rep_del_doc_1",
+ source: dbA.name,
+ target: dbB.name,
+ user_ctx: {
+ name: "joe",
+ roles: ["erlanger"]
+ }
+ };
+
+ T(repDb.save(repDoc).ok);
+
+ waitForRep(repDb, repDoc, "completed");
+ for (var i = 0; i < docs1.length; i++) {
+ var doc = docs1[i];
+ var copy = dbB.open(doc._id);
+ T(copy !== null);
+ T(copy.value === doc.value);
+ }
+
+ // design doc was not replicated, because joe is not an admin of db B
+ var doc = dbB.open(ddoc._id);
+ T(doc === null);
+
+ // now test the same replication but putting the role "god" in the
+ // delegation user context property
+ var repDoc2 = {
+ _id: "foo_rep_del_doc_2",
+ source: dbA.name,
+ target: dbB.name,
+ user_ctx: {
+ name: "joe",
+ roles: ["erlanger", "god"]
+ }
+ };
+ T(repDb.save(repDoc2).ok);
+
+ waitForRep(repDb, repDoc2, "completed");
+ for (var i = 0; i < docs1.length; i++) {
+ var doc = docs1[i];
+ var copy = dbB.open(doc._id);
+ T(copy !== null);
+ T(copy.value === doc.value);
+ }
+
+ // because anyone with a 'god' role is an admin of db B, a replication
+ // that is delegated to a 'god' role can write design docs to db B
+ doc = dbB.open(ddoc._id);
+ T(doc !== null);
+ T(doc.language === ddoc.language);
+ });
+ }
+
+
+ function continuous_replication_survives_restart() {
+ var origRepDbName = CouchDB.request(
+ "GET", "/_config/replicator/db").responseText;
+
+ repDb.deleteDb();
+
+ var xhr = CouchDB.request("PUT", "/_config/replicator/db", {
+ body : JSON.stringify(repDb.name),
+ headers: {"X-Couch-Persist": "false"}
+ });
+ T(xhr.status === 200);
+
+ populate_db(dbA, docs1);
+ populate_db(dbB, []);
+
+ var repDoc = {
+ _id: "foo_cont_rep_survives_doc",
+ source: "http://" + host + "/" + dbA.name,
+ target: dbB.name,
+ continuous: true
+ };
+
+ T(repDb.save(repDoc).ok);
+
+ waitForSeq(dbA, dbB);
+ for (var i = 0; i < docs1.length; i++) {
+ var doc = docs1[i];
+ var copy = dbB.open(doc._id);
+ T(copy !== null);
+ T(copy.value === doc.value);
+ }
+
+ repDb.ensureFullCommit();
+ dbA.ensureFullCommit();
+
+ restartServer();
+
+ xhr = CouchDB.request("PUT", "/_config/replicator/db", {
+ body : JSON.stringify(repDb.name),
+ headers: {"X-Couch-Persist": "false"}
+ });
+
+ T(xhr.status === 200);
+
+ // add another doc to source, it will be replicated to target
+ var docX = {
+ _id: "foo1000",
+ value: 1001
+ };
+
+ T(dbA.save(docX).ok);
+
+ waitForSeq(dbA, dbB);
+ var copy = dbB.open("foo1000");
+ T(copy !== null);
+ T(copy.value === 1001);
+
+ repDoc = repDb.open("foo_cont_rep_survives_doc");
+ T(repDoc !== null);
+ T(repDoc.continuous === true);
+
+ // stop replication
+ T(repDb.deleteDoc(repDoc).ok);
+
+ xhr = CouchDB.request("PUT", "/_config/replicator/db", {
+ body : origRepDbName,
+ headers: {"X-Couch-Persist": "false"}
+ });
+ T(xhr.status === 200);
+ }
+
+
+ function error_state_replication() {
+ populate_db(dbA, docs1);
+
+ var repDoc = {
+ _id: "foo_error_rep",
+ source: dbA.name,
+ target: "nonexistent_test_db"
+ };
+ T(repDb.save(repDoc).ok);
+
+ waitForRep(repDb, repDoc, "error");
+ var repDoc1 = repDb.open(repDoc._id);
+ T(repDoc1 !== null);
+ T(repDoc1.state === "error");
+ T(typeof repDoc1.replication_id === "string");
+ }
+
+
+ // run all the tests
+ var server_config = [
+ {
+ section: "replicator",
+ key: "db",
+ value: repDb.name
+ }
+ ];
+
+ repDb.deleteDb();
+ run_on_modified_server(server_config, simple_replication);
+
+ repDb.deleteDb();
+ restartServer();
+ run_on_modified_server(server_config, filtered_replication);
+
+ repDb.deleteDb();
+ restartServer();
+ run_on_modified_server(server_config, continuous_replication);
+
+ repDb.deleteDb();
+ restartServer();
+ run_on_modified_server(server_config, by_doc_ids_replication);
+
+ repDb.deleteDb();
+ restartServer();
+ run_on_modified_server(server_config, successive_identical_replications);
+
+ repDb.deleteDb();
+ restartServer();
+ run_on_modified_server(server_config, identical_rep_docs);
+
+ repDb.deleteDb();
+ restartServer();
+ run_on_modified_server(server_config, identical_continuous_rep_docs);
+
+ repDb.deleteDb();
+ restartServer();
+ run_on_modified_server(server_config, rep_db_write_authorization);
+
+ var server_config_2 = server_config.concat([
+ {
+ section: "couch_httpd_auth",
+ key: "authentication_db",
+ value: usersDb.name
+ }
+ ]);
+ repDb.deleteDb();
+ restartServer();
+ run_on_modified_server(server_config_2, test_replication_credentials_delegation);
+
+ repDb.deleteDb();
+ restartServer();
+ continuous_replication_survives_restart();
+
+ repDb.deleteDb();
+ restartServer();
+ run_on_modified_server(server_config, error_state_replication);
+
+
+ // cleanup
+ repDb.deleteDb();
+ usersDb.deleteDb();
+ dbA.deleteDb();
+ dbB.deleteDb();
+};
diff --git a/src/couchdb/Makefile.am b/src/couchdb/Makefile.am
index 308a3837..219f7d82 100644
--- a/src/couchdb/Makefile.am
+++ b/src/couchdb/Makefile.am
@@ -66,6 +66,7 @@ source_files = \
couch_rep_reader.erl \
couch_rep_sup.erl \
couch_rep_writer.erl \
+ couch_rep_db_listener.erl \
couch_server.erl \
couch_server_sup.erl \
couch_stats_aggregator.erl \
@@ -124,6 +125,7 @@ compiled_files = \
couch_rep_reader.beam \
couch_rep_sup.beam \
couch_rep_writer.beam \
+ couch_rep_db_listener.beam \
couch_server.beam \
couch_server_sup.beam \
couch_stats_aggregator.beam \
diff --git a/src/couchdb/couch_changes.erl b/src/couchdb/couch_changes.erl
index 580148c8..098fac84 100644
--- a/src/couchdb/couch_changes.erl
+++ b/src/couchdb/couch_changes.erl
@@ -165,7 +165,8 @@ keep_sending_changes(Args, Callback, Db, StartSeq, Prepend, Timeout,
TimeoutFun) ->
#changes_args{
feed = ResponseType,
- limit = Limit
+ limit = Limit,
+ db_open_options = DbOptions
} = Args,
% ?LOG_INFO("send_changes start ~p",[StartSeq]),
{ok, {_, EndSeq, Prepend2, _, _, _, NewLimit, _}} = send_changes(
@@ -179,7 +180,8 @@ keep_sending_changes(Args, Callback, Db, StartSeq, Prepend, Timeout,
case wait_db_updated(Timeout, TimeoutFun) of
updated ->
% ?LOG_INFO("wait_db_updated updated ~p",[{Db#db.name, EndSeq}]),
- case couch_db:open(Db#db.name, [{user_ctx, Db#db.user_ctx}]) of
+ DbOptions1 = [{user_ctx, Db#db.user_ctx} | DbOptions],
+ case couch_db:open(Db#db.name, DbOptions1) of
{ok, Db2} ->
keep_sending_changes(
Args#changes_args{limit=NewLimit},
diff --git a/src/couchdb/couch_db.hrl b/src/couchdb/couch_db.hrl
index a35745ef..51fb25e2 100644
--- a/src/couchdb/couch_db.hrl
+++ b/src/couchdb/couch_db.hrl
@@ -289,6 +289,7 @@
heartbeat,
timeout,
filter = "",
- include_docs = false
+ include_docs = false,
+ db_open_options = []
}).
diff --git a/src/couchdb/couch_js_functions.hrl b/src/couchdb/couch_js_functions.hrl
index 1f314f6e..f850dd4c 100644
--- a/src/couchdb/couch_js_functions.hrl
+++ b/src/couchdb/couch_js_functions.hrl
@@ -95,3 +95,76 @@
}
}
">>).
+
+
+-define(REP_DB_DOC_VALIDATE_FUN, <<"
+ function(newDoc, oldDoc, userCtx) {
+ var isAdmin = (userCtx.roles.indexOf('_admin') >= 0);
+ var isReplicator = (userCtx.roles.indexOf('_replicator') >= 0);
+
+ if (oldDoc && !newDoc._deleted && !isReplicator) {
+ throw({forbidden:
+ 'Only the replicator can edit replication documents. ' +
+ 'Admins can only add and delete replication documents.'
+ });
+ } else if (!isAdmin) {
+ throw({forbidden:
+ 'Only admins may add/delete replication documents.'
+ });
+ }
+
+ if (!oldDoc && newDoc.state) {
+ throw({forbidden:
+ 'The state field can only be set by the replicator.'
+ });
+ }
+
+ if (!oldDoc && newDoc.replication_id) {
+ throw({forbidden:
+ 'The replication_id field can only be set by the replicator.'
+ });
+ }
+
+ if (newDoc.user_ctx) {
+ var user_ctx = newDoc.user_ctx;
+
+ if (typeof user_ctx !== 'object') {
+ throw({forbidden: 'The user_ctx property must be an object.'});
+ }
+
+ if (!(user_ctx.name === null ||
+ (typeof user_ctx.name === 'undefined') ||
+ ((typeof user_ctx.name === 'string') &&
+ user_ctx.name.length > 0))) {
+ throw({forbidden:
+ 'The name property of the user_ctx must be a ' +
+ 'non-empty string.'
+ });
+ }
+
+ if ((typeof user_ctx.roles !== 'undefined') &&
+ (typeof user_ctx.roles.length !== 'number')) {
+ throw({forbidden:
+ 'The roles property of the user_ctx must be ' +
+ 'an array of strings.'
+ });
+ }
+
+ if (user_ctx.roles) {
+ for (var i = 0; i < user_ctx.roles.length; i++) {
+ var role = user_ctx.roles[i];
+
+ if (typeof role !== 'string' || role.length === 0) {
+ throw({forbidden: 'Roles must be non-empty strings.'});
+ }
+ if (role[0] === '_') {
+ throw({forbidden:
+ 'System roles (starting with underscore) ' +
+ 'are not allowed.'
+ });
+ }
+ }
+ }
+ }
+ }
+">>).
diff --git a/src/couchdb/couch_rep.erl b/src/couchdb/couch_rep.erl
index d3db8a68..57b30088 100644
--- a/src/couchdb/couch_rep.erl
+++ b/src/couchdb/couch_rep.erl
@@ -16,8 +16,12 @@
code_change/3]).
-export([replicate/2, checkpoint/1]).
+-export([ensure_rep_db_exists/0, make_replication_id/2]).
+-export([start_replication/3, end_replication/1, get_result/4]).
+-export([update_rep_doc/2]).
-include("couch_db.hrl").
+-include("couch_js_functions.hrl").
-define(REP_ID_VERSION, 2).
@@ -48,7 +52,8 @@
committed_seq = 0,
stats = nil,
- doc_ids = nil
+ doc_ids = nil,
+ rep_doc = nil
}).
%% convenience function to do a simple replication from the shell
@@ -61,58 +66,63 @@ replicate(Source, Target) when is_binary(Source), is_binary(Target) ->
%% function handling POST to _replicate
replicate({Props}=PostBody, UserCtx) ->
- BaseId = make_replication_id({Props}, UserCtx, ?REP_ID_VERSION),
- Extension = maybe_append_options(
- [<<"continuous">>, <<"create_target">>], Props),
- Replicator = {BaseId ++ Extension,
- {gen_server, start_link, [?MODULE, [BaseId, PostBody, UserCtx], []]},
- temporary,
- 1,
- worker,
- [?MODULE]
- },
-
+ RepId = make_replication_id(PostBody, UserCtx),
case couch_util:get_value(<<"cancel">>, Props, false) of
true ->
- case supervisor:terminate_child(couch_rep_sup, BaseId ++ Extension) of
- {error, not_found} ->
- {error, not_found};
- ok ->
- ok = supervisor:delete_child(couch_rep_sup, BaseId ++ Extension),
- {ok, {cancelled, ?l2b(BaseId)}}
- end;
+ end_replication(RepId);
false ->
- Server = start_replication_server(Replicator),
+ Server = start_replication(PostBody, RepId, UserCtx),
+ get_result(Server, RepId, PostBody, UserCtx)
+ end.
- case couch_util:get_value(<<"continuous">>, Props, false) of
- true ->
- {ok, {continuous, ?l2b(BaseId)}};
- false ->
- get_result(Server, PostBody, UserCtx)
- end
+end_replication({BaseId, Extension}) ->
+ RepId = BaseId ++ Extension,
+ case supervisor:terminate_child(couch_rep_sup, RepId) of
+ {error, not_found} = R ->
+ R;
+ ok ->
+ ok = supervisor:delete_child(couch_rep_sup, RepId),
+ {ok, {cancelled, ?l2b(BaseId)}}
end.
+start_replication(RepDoc, {BaseId, Extension}, UserCtx) ->
+ Replicator = {
+ BaseId ++ Extension,
+ {gen_server, start_link,
+ [?MODULE, [BaseId, RepDoc, UserCtx], []]},
+ temporary,
+ 1,
+ worker,
+ [?MODULE]
+ },
+ start_replication_server(Replicator).
+
checkpoint(Server) ->
gen_server:cast(Server, do_checkpoint).
-get_result(Server, PostBody, UserCtx) ->
- try gen_server:call(Server, get_result, infinity) of
- retry -> replicate(PostBody, UserCtx);
- Else -> Else
- catch
- exit:{noproc, {gen_server, call, [Server, get_result , infinity]}} ->
- %% oops, this replication just finished -- restart it.
- replicate(PostBody, UserCtx);
- exit:{normal, {gen_server, call, [Server, get_result , infinity]}} ->
- %% we made the call during terminate
- replicate(PostBody, UserCtx)
+get_result(Server, {BaseId, _Extension}, {Props} = PostBody, UserCtx) ->
+ case couch_util:get_value(<<"continuous">>, Props, false) of
+ true ->
+ {ok, {continuous, ?l2b(BaseId)}};
+ false ->
+ try gen_server:call(Server, get_result, infinity) of
+ retry -> replicate(PostBody, UserCtx);
+ Else -> Else
+ catch
+ exit:{noproc, {gen_server, call, [Server, get_result, infinity]}} ->
+ %% oops, this replication just finished -- restart it.
+ replicate(PostBody, UserCtx);
+ exit:{normal, {gen_server, call, [Server, get_result, infinity]}} ->
+ %% we made the call during terminate
+ replicate(PostBody, UserCtx)
+ end
end.
init(InitArgs) ->
try do_init(InitArgs)
catch throw:{db_not_found, DbUrl} -> {stop, {db_not_found, DbUrl}} end.
-do_init([RepId, {PostProps}, UserCtx] = InitArgs) ->
+do_init([RepId, {PostProps} = RepDoc, UserCtx] = InitArgs) ->
process_flag(trap_exit, true),
SourceProps = couch_util:get_value(<<"source">>, PostProps),
@@ -130,6 +140,8 @@ do_init([RepId, {PostProps}, UserCtx] = InitArgs) ->
SourceInfo = dbinfo(Source),
TargetInfo = dbinfo(Target),
+ maybe_set_triggered(RepDoc, RepId),
+
case DocIds of
List when is_list(List) ->
% Fast replication using only a list of doc IDs to replicate.
@@ -199,7 +211,8 @@ do_init([RepId, {PostProps}, UserCtx] = InitArgs) ->
rep_starttime = httpd_util:rfc1123_date(),
src_starttime = couch_util:get_value(instance_start_time, SourceInfo),
tgt_starttime = couch_util:get_value(instance_start_time, TargetInfo),
- doc_ids = DocIds
+ doc_ids = DocIds,
+ rep_doc = RepDoc
},
{ok, State}.
@@ -256,29 +269,34 @@ handle_info({'EXIT', _Pid, Reason}, State) ->
{stop, Reason, State}.
terminate(normal, #state{checkpoint_scheduled=nil} = State) ->
- do_terminate(State);
+ do_terminate(State),
+ update_rep_doc(State#state.rep_doc, [{<<"state">>, <<"completed">>}]);
terminate(normal, State) ->
timer:cancel(State#state.checkpoint_scheduled),
- do_terminate(do_checkpoint(State));
+ do_terminate(do_checkpoint(State)),
+ update_rep_doc(State#state.rep_doc, [{<<"state">>, <<"completed">>}]);
-terminate(Reason, State) ->
- #state{
- listeners = Listeners,
- source = Source,
- target = Target,
- stats = Stats
- } = State,
+terminate(shutdown, #state{listeners = Listeners} = State) ->
+ % continuous replication stopped
+ [gen_server:reply(L, {ok, stopped}) || L <- Listeners],
+ do_forced_terminate(State);
+
+terminate(Reason, #state{listeners = Listeners} = State) ->
[gen_server:reply(L, {error, Reason}) || L <- Listeners],
- ets:delete(Stats),
- close_db(Target),
- close_db(Source).
+ do_forced_terminate(State),
+ update_rep_doc(State#state.rep_doc, [{<<"state">>, <<"error">>}]).
code_change(_OldVsn, State, _Extra) ->
{ok, State}.
% internal funs
+do_forced_terminate(#state{source = Source, target = Target, stats = Stats}) ->
+ ets:delete(Stats),
+ close_db(Target),
+ close_db(Source).
+
start_replication_server(Replicator) ->
RepId = element(1, Replicator),
case supervisor:start_child(couch_rep_sup, Replicator) of
@@ -449,7 +467,7 @@ has_session_id(SessionId, [{Props} | Rest]) ->
has_session_id(SessionId, Rest)
end.
-maybe_append_options(Options, Props) ->
+maybe_append_options(Options, {Props}) ->
lists:foldl(fun(Option, Acc) ->
Acc ++
case couch_util:get_value(Option, Props, false) of
@@ -460,6 +478,12 @@ maybe_append_options(Options, Props) ->
end
end, [], Options).
+make_replication_id(RepProps, UserCtx) ->
+ BaseId = make_replication_id(RepProps, UserCtx, ?REP_ID_VERSION),
+ Extension = maybe_append_options(
+ [<<"continuous">>, <<"create_target">>], RepProps),
+ {BaseId, Extension}.
+
% Versioned clauses for generating replication ids
% If a change is made to how replications are identified
% add a new clause and increase ?REP_ID_VERSION at the top
@@ -785,3 +809,79 @@ parse_proxy_params(ProxyUrl) ->
true ->
[{proxy_user, User}, {proxy_password, Passwd}]
end.
+
+update_rep_doc({Props} = _RepDoc, KVs) ->
+ case couch_util:get_value(<<"_id">>, Props) of
+ undefined ->
+ % replication triggered by POSTing to _replicate/
+ ok;
+ RepDocId ->
+ % replication triggered by adding a Rep Doc to the replicator DB
+ {ok, RepDb} = ensure_rep_db_exists(),
+ case couch_db:open_doc(RepDb, RepDocId, []) of
+ {ok, LatestRepDoc} ->
+ update_rep_doc(RepDb, LatestRepDoc, KVs);
+ _ ->
+ ok
+ end,
+ couch_db:close(RepDb)
+ end.
+
+update_rep_doc(RepDb, #doc{body = {RepDocBody}} = RepDoc, KVs) ->
+ NewRepDocBody = lists:foldl(
+ fun({K, _V} = KV, Body) ->
+ lists:keystore(K, 1, Body, KV)
+ end,
+ RepDocBody,
+ KVs
+ ),
+ % might not succeed - when the replication doc is deleted right
+ % before this update (not an error)
+ couch_db:update_doc(
+ RepDb,
+ RepDoc#doc{body = {NewRepDocBody}},
+ []
+ ).
+
+maybe_set_triggered({RepProps} = RepDoc, RepId) ->
+ case couch_util:get_value(<<"state">>, RepProps) of
+ <<"triggered">> ->
+ ok;
+ _ ->
+ update_rep_doc(
+ RepDoc,
+ [
+ {<<"state">>, <<"triggered">>},
+ {<<"replication_id">>, ?l2b(RepId)}
+ ]
+ )
+ end.
+
+ensure_rep_db_exists() ->
+ DbName = ?l2b(couch_config:get("replicator", "db", "_replicator")),
+ Opts = [
+ {user_ctx, #user_ctx{roles=[<<"_admin">>, <<"_replicator">>]}},
+ sys_db
+ ],
+ case couch_db:open(DbName, Opts) of
+ {ok, Db} ->
+ Db;
+ _Error ->
+ {ok, Db} = couch_db:create(DbName, Opts)
+ end,
+ ok = ensure_rep_ddoc_exists(Db, <<"_design/_replicator">>),
+ {ok, Db}.
+
+ensure_rep_ddoc_exists(RepDb, DDocID) ->
+ case couch_db:open_doc(RepDb, DDocID, []) of
+ {ok, _Doc} ->
+ ok;
+ _ ->
+ DDoc = couch_doc:from_json_obj({[
+ {<<"_id">>, DDocID},
+ {<<"language">>, <<"javascript">>},
+ {<<"validate_doc_update">>, ?REP_DB_DOC_VALIDATE_FUN}
+ ]}),
+ {ok, _Rev} = couch_db:update_doc(RepDb, DDoc, [])
+ end,
+ ok.
diff --git a/src/couchdb/couch_rep_db_listener.erl b/src/couchdb/couch_rep_db_listener.erl
new file mode 100644
index 00000000..bc407693
--- /dev/null
+++ b/src/couchdb/couch_rep_db_listener.erl
@@ -0,0 +1,232 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+% http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+-module(couch_rep_db_listener).
+-behaviour(gen_server).
+
+-export([start_link/0, init/1, handle_call/3, handle_info/2, handle_cast/2]).
+-export([code_change/3, terminate/2]).
+
+-include("couch_db.hrl").
+
+-define(DOC_TO_REP_ID_MAP, rep_doc_id_to_rep_id).
+-define(REP_ID_TO_DOC_ID_MAP, rep_id_to_rep_doc_id).
+
+-record(state, {
+ changes_feed_loop,
+ changes_queue,
+ changes_processor
+}).
+
+
+start_link() ->
+ gen_server:start_link({local, ?MODULE}, ?MODULE, [], []).
+
+init(_) ->
+ process_flag(trap_exit, true),
+ {ok, Queue} = couch_work_queue:new(1024 * 1024, 1000),
+ {ok, Processor} = changes_processor(Queue),
+ {ok, Loop} = changes_feed_loop(Queue),
+ Server = self(),
+ ok = couch_config:register(
+ fun("replicator", "db") ->
+ ok = gen_server:call(Server, rep_db_changed, infinity)
+ end
+ ),
+ {ok, #state{
+ changes_feed_loop = Loop,
+ changes_queue = Queue,
+ changes_processor = Processor}
+ }.
+
+handle_call(rep_db_changed, _From, State) ->
+ #state{
+ changes_feed_loop = Loop,
+ changes_queue = Queue
+ } = State,
+ exit(Loop, rep_db_changed),
+ {ok, NewLoop} = changes_feed_loop(Queue),
+ {reply, ok, State#state{changes_feed_loop = NewLoop}}.
+
+
+handle_cast(_Msg, State) ->
+ {noreply, State}.
+
+
+handle_info({'EXIT', _OldChangesLoop, rep_db_changed}, State) ->
+ {noreply, State};
+
+handle_info({'EXIT', From, Reason}, #state{changes_processor = From} = State) ->
+ ?LOG_ERROR("Replicator DB changes processor died. Reason: ~p", [Reason]),
+ {stop, rep_db_changes_processor_error, State}.
+
+
+terminate(_Reason, State) ->
+ #state{
+ changes_feed_loop = Loop,
+ changes_queue = Queue
+ } = State,
+ exit(Loop, stop),
+ % closing the queue will cause changes_processor to shutdown
+ couch_work_queue:close(Queue),
+ ok.
+
+
+code_change(_OldVsn, State, _Extra) ->
+ {ok, State}.
+
+
+changes_feed_loop(ChangesQueue) ->
+ {ok, RepDb} = couch_rep:ensure_rep_db_exists(),
+ Pid = spawn_link(
+ fun() ->
+ ChangesFeedFun = couch_changes:handle_changes(
+ #changes_args{
+ include_docs = true,
+ feed = "continuous",
+ timeout = infinity,
+ db_open_options = [sys_db]
+ },
+ {json_req, null},
+ RepDb
+ ),
+ ChangesFeedFun(
+ fun({change, Change, _}, _) ->
+ case has_valid_rep_id(Change) of
+ true ->
+ couch_work_queue:queue(ChangesQueue, Change);
+ false ->
+ ok
+ end;
+ (_, _) ->
+ ok
+ end
+ )
+ end
+ ),
+ couch_db:close(RepDb),
+ {ok, Pid}.
+
+
+changes_processor(ChangesQueue) ->
+ Pid = spawn_link(
+ fun() ->
+ ets:new(?DOC_TO_REP_ID_MAP, [named_table, set, private]),
+ ets:new(?REP_ID_TO_DOC_ID_MAP, [named_table, set, private]),
+ consume_changes(ChangesQueue),
+ true = ets:delete(?REP_ID_TO_DOC_ID_MAP),
+ true = ets:delete(?DOC_TO_REP_ID_MAP)
+ end
+ ),
+ {ok, Pid}.
+
+
+consume_changes(ChangesQueue) ->
+ case couch_work_queue:dequeue(ChangesQueue) of
+ closed ->
+ ok;
+ {ok, Changes} ->
+ lists:foreach(fun process_change/1, Changes),
+ consume_changes(ChangesQueue)
+ end.
+
+
+has_valid_rep_id({Change}) ->
+ has_valid_rep_id(couch_util:get_value(<<"id">>, Change));
+has_valid_rep_id(<<?DESIGN_DOC_PREFIX, _Rest/binary>>) ->
+ false;
+has_valid_rep_id(_Else) ->
+ true.
+
+
+process_change({Change}) ->
+ {RepProps} = JsonRepDoc = couch_util:get_value(doc, Change),
+ case couch_util:get_value(<<"deleted">>, Change, false) of
+ true ->
+ maybe_stop_replication(JsonRepDoc);
+ false ->
+ case couch_util:get_value(<<"state">>, RepProps) of
+ <<"completed">> ->
+ maybe_stop_replication(JsonRepDoc);
+ <<"error">> ->
+ % cleanup ets table entries
+ maybe_stop_replication(JsonRepDoc);
+ <<"triggered">> ->
+ maybe_start_replication(JsonRepDoc);
+ undefined ->
+ case couch_util:get_value(<<"replication_id">>, RepProps) of
+ undefined ->
+ maybe_start_replication(JsonRepDoc);
+ _ ->
+ ok
+ end
+ end
+ end,
+ ok.
+
+
+rep_user_ctx({RepDoc}) ->
+ case couch_util:get_value(<<"user_ctx">>, RepDoc) of
+ undefined ->
+ #user_ctx{roles = [<<"_admin">>]};
+ {UserCtx} ->
+ #user_ctx{
+ name = couch_util:get_value(<<"name">>, UserCtx, null),
+ roles = couch_util:get_value(<<"roles">>, UserCtx, [])
+ }
+ end.
+
+
+maybe_start_replication({RepProps} = JsonRepDoc) ->
+ UserCtx = rep_user_ctx(JsonRepDoc),
+ RepId = couch_rep:make_replication_id(JsonRepDoc, UserCtx),
+ DocId = couch_util:get_value(<<"_id">>, RepProps),
+ case ets:lookup(?REP_ID_TO_DOC_ID_MAP, RepId) of
+ [] ->
+ true = ets:insert(?REP_ID_TO_DOC_ID_MAP, {RepId, DocId}),
+ true = ets:insert(?DOC_TO_REP_ID_MAP, {DocId, RepId}),
+ spawn_link(fun() -> start_replication(JsonRepDoc, RepId, UserCtx) end);
+ [{RepId, DocId}] ->
+ ok;
+ [{RepId, _OtherDocId}] ->
+ couch_rep:update_rep_doc(
+ JsonRepDoc, [{<<"replication_id">>, ?l2b(element(1, RepId))}]
+ )
+ end.
+
+
+start_replication(RepDoc, RepId, UserCtx) ->
+ case (catch couch_rep:start_replication(RepDoc, RepId, UserCtx)) of
+ RepPid when is_pid(RepPid) ->
+ couch_rep:get_result(RepPid, RepId, RepDoc, UserCtx);
+ Error ->
+ couch_rep:update_rep_doc(
+ RepDoc,
+ [
+ {<<"state">>, <<"error">>},
+ {<<"replication_id">>, ?l2b(element(1, RepId))}
+ ]
+ ),
+ ?LOG_ERROR("Error starting replication ~p: ~p", [RepId, Error])
+ end.
+
+
+maybe_stop_replication({RepProps}) ->
+ DocId = couch_util:get_value(<<"_id">>, RepProps),
+ case ets:lookup(?DOC_TO_REP_ID_MAP, DocId) of
+ [{DocId, RepId}] ->
+ couch_rep:end_replication(RepId),
+ true = ets:delete(?REP_ID_TO_DOC_ID_MAP, RepId),
+ true = ets:delete(?DOC_TO_REP_ID_MAP, DocId);
+ [] ->
+ ok
+ end.
diff --git a/src/couchdb/couch_server.erl b/src/couchdb/couch_server.erl
index 0f8f66f3..2d96d5f3 100644
--- a/src/couchdb/couch_server.erl
+++ b/src/couchdb/couch_server.erl
@@ -76,6 +76,7 @@ check_dbname(#server{dbname_regexp=RegExp}, DbName) ->
nomatch ->
case DbName of
"_users" -> ok;
+ "_replicator" -> ok;
_Else ->
{error, illegal_database_name}
end;