diff options
author | Filipe David Borba Manana <fdmanana@apache.org> | 2010-08-04 17:05:22 +0000 |
---|---|---|
committer | Filipe David Borba Manana <fdmanana@apache.org> | 2010-08-04 17:05:22 +0000 |
commit | 77962e9b1458e97aa8a534fe18f2eda1965cc8b1 (patch) | |
tree | 412882a3a596ca852f70219c920f67344925e091 | |
parent | 8446f0c3a69f7925d9104fd4487175c618b5a9dc (diff) |
Add replicator DB (_replicator).
Part of ticket COUCHDB-776.
git-svn-id: https://svn.apache.org/repos/asf/couchdb/trunk@982330 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r-- | etc/couchdb/default.ini.tpl.in | 4 | ||||
-rw-r--r-- | share/Makefile.am | 1 | ||||
-rw-r--r-- | share/www/script/couch_tests.js | 1 | ||||
-rw-r--r-- | share/www/script/test/replicator_db.js | 750 | ||||
-rw-r--r-- | src/couchdb/Makefile.am | 2 | ||||
-rw-r--r-- | src/couchdb/couch_changes.erl | 6 | ||||
-rw-r--r-- | src/couchdb/couch_db.hrl | 3 | ||||
-rw-r--r-- | src/couchdb/couch_js_functions.hrl | 73 | ||||
-rw-r--r-- | src/couchdb/couch_rep.erl | 204 | ||||
-rw-r--r-- | src/couchdb/couch_rep_db_listener.erl | 232 | ||||
-rw-r--r-- | src/couchdb/couch_server.erl | 1 |
11 files changed, 1221 insertions, 56 deletions
diff --git a/etc/couchdb/default.ini.tpl.in b/etc/couchdb/default.ini.tpl.in index 68350fc2..a890beb7 100644 --- a/etc/couchdb/default.ini.tpl.in +++ b/etc/couchdb/default.ini.tpl.in @@ -62,6 +62,7 @@ stats_aggregator={couch_stats_aggregator, start, []} stats_collector={couch_stats_collector, start, []} uuids={couch_uuids, start, []} auth_cache={couch_auth_cache, start_link, []} +rep_db_changes_listener={couch_rep_db_listener, start_link, []} [httpd_global_handlers] / = {couch_httpd_misc_handlers, handle_welcome_req, <<"Welcome">>} @@ -123,5 +124,6 @@ compression_level = 8 ; from 1 (lowest, fastest) to 9 (highest, slowest), 0 to d compressible_types = text/*, application/javascript, application/json, application/xml [replicator] +db = _replicator max_http_sessions = 10 -max_http_pipeline_size = 10
\ No newline at end of file +max_http_pipeline_size = 10 diff --git a/share/Makefile.am b/share/Makefile.am index 77d5f8d8..52de3d2b 100644 --- a/share/Makefile.am +++ b/share/Makefile.am @@ -152,6 +152,7 @@ nobase_dist_localdata_DATA = \ www/script/test/reduce_false.js \ www/script/test/reduce_false_temp.js \ www/script/test/replication.js \ + www/script/test/replicator_db.js \ www/script/test/rev_stemming.js \ www/script/test/rewrite.js \ www/script/test/security_validation.js \ diff --git a/share/www/script/couch_tests.js b/share/www/script/couch_tests.js index 5723eece..c048521c 100644 --- a/share/www/script/couch_tests.js +++ b/share/www/script/couch_tests.js @@ -75,6 +75,7 @@ loadTest("reduce_builtin.js"); loadTest("reduce_false.js"); loadTest("reduce_false_temp.js"); loadTest("replication.js"); +loadTest("replicator_db.js"); loadTest("rev_stemming.js"); loadTest("rewrite.js"); loadTest("security_validation.js"); diff --git a/share/www/script/test/replicator_db.js b/share/www/script/test/replicator_db.js new file mode 100644 index 00000000..edeb1cc5 --- /dev/null +++ b/share/www/script/test/replicator_db.js @@ -0,0 +1,750 @@ +// Licensed under the Apache License, Version 2.0 (the "License"); you may not +// use this file except in compliance with the License. You may obtain a copy of +// the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +// WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +// License for the specific language governing permissions and limitations under +// the License. + +couchTests.replicator_db = function(debug) { + + if (debug) debugger; + + var wait_rep_doc = 500; // number of millisecs to wait after saving a Rep Doc + var host = CouchDB.host; + var dbA = new CouchDB("test_suite_rep_db_a", {"X-Couch-Full-Commit":"false"}); + var dbB = new CouchDB("test_suite_rep_db_b", {"X-Couch-Full-Commit":"false"}); + var repDb = new CouchDB("test_suite_rep_db", {"X-Couch-Full-Commit":"false"}); + var usersDb = new CouchDB("test_suite_auth", {"X-Couch-Full-Commit":"false"}); + + var docs1 = [ + { + _id: "foo1", + value: 11 + }, + { + _id: "foo2", + value: 22 + }, + { + _id: "foo3", + value: 33 + } + ]; + + function waitForRep(repDb, repDoc, state) { + var newRep, + t0 = new Date(), + t1, + ms = 1000; + + do { + newRep = repDb.open(repDoc._id); + t1 = new Date(); + } while (((t1 - t0) <= ms) && newRep.state !== state); + } + + function waitForSeq(sourceDb, targetDb) { + var targetSeq, + sourceSeq = sourceDb.info().update_seq, + t0 = new Date(), + t1, + ms = 1000; + + do { + targetSeq = targetDb.info().update_seq; + t1 = new Date(); + } while (((t1 - t0) <= ms) && targetSeq < sourceSeq); + } + + function wait(ms) { + var t0 = new Date(), t1; + do { + CouchDB.request("GET", "/"); + t1 = new Date(); + } while ((t1 - t0) <= ms); + } + + + function populate_db(db, docs) { + db.deleteDb(); + db.createDb(); + for (var i = 0; i < docs.length; i++) { + var d = docs[i]; + delete d._rev; + T(db.save(d).ok); + } + } + + function simple_replication() { + populate_db(dbA, docs1); + populate_db(dbB, []); + + var repDoc = { + _id: "foo_simple_rep", + source: dbA.name, + target: dbB.name + }; + T(repDb.save(repDoc).ok); + + waitForRep(repDb, repDoc, "completed"); + for (var i = 0; i < docs1.length; i++) { + var doc = docs1[i]; + var copy = dbB.open(doc._id); + T(copy !== null); + T(copy.value === doc.value); + } + + var repDoc1 = repDb.open(repDoc._id); + T(repDoc1 !== null); + T(repDoc1.source === repDoc.source); + T(repDoc1.target === repDoc.target); + T(repDoc1.state === "completed", "simple"); + T(typeof repDoc1.replication_id === "string"); + } + + + function filtered_replication() { + var docs2 = docs1.concat([ + { + _id: "_design/mydesign", + language : "javascript", + filters : { + myfilter : (function(doc, req) { + return (doc.value % 2) !== Number(req.query.myparam); + }).toString() + } + } + ]); + + populate_db(dbA, docs2); + populate_db(dbB, []); + + var repDoc = { + _id: "foo_filt_rep_doc", + source: "http://" + host + "/" + dbA.name, + target: dbB.name, + filter: "mydesign/myfilter", + query_params: { + myparam: 1 + } + }; + T(repDb.save(repDoc).ok); + + waitForRep(repDb, repDoc, "completed"); + for (var i = 0; i < docs2.length; i++) { + var doc = docs2[i]; + var copy = dbB.open(doc._id); + + if (typeof doc.value === "number") { + if ((doc.value % 2) !== 1) { + T(copy !== null); + T(copy.value === doc.value); + } else { + T(copy === null); + } + } + } + + var repDoc1 = repDb.open(repDoc._id); + T(repDoc1 !== null); + T(repDoc1.source === repDoc.source); + T(repDoc1.target === repDoc.target); + T(repDoc1.state === "completed", "filtered"); + T(typeof repDoc1.replication_id === "string"); + } + + + function continuous_replication() { + populate_db(dbA, docs1); + populate_db(dbB, []); + + var repDoc = { + _id: "foo_cont_rep_doc", + source: "http://" + host + "/" + dbA.name, + target: dbB.name, + continuous: true + }; + + T(repDb.save(repDoc).ok); + + waitForSeq(dbA, dbB); + for (var i = 0; i < docs1.length; i++) { + var doc = docs1[i]; + var copy = dbB.open(doc._id); + T(copy !== null); + T(copy.value === doc.value); + } + + // add another doc to source, it will be replicated to target + var docX = { + _id: "foo1000", + value: 1001 + }; + + T(dbA.save(docX).ok); + + waitForSeq(dbA, dbB); + var copy = dbB.open("foo1000"); + T(copy !== null); + T(copy.value === 1001); + + var repDoc1 = repDb.open(repDoc._id); + T(repDoc1 !== null); + T(repDoc1.source === repDoc.source); + T(repDoc1.target === repDoc.target); + T(repDoc1.state === "triggered"); + T(typeof repDoc1.replication_id === "string"); + + // stop replication by deleting the replication document + T(repDb.deleteDoc(repDoc1).ok); + + // add another doc to source, it will NOT be replicated to target + var docY = { + _id: "foo666", + value: 999 + }; + + T(dbA.save(docY).ok); + + wait(200); // is there a way to avoid wait here? + var copy = dbB.open("foo666"); + T(copy === null); + } + + + function by_doc_ids_replication() { + // to test that we can replicate docs with slashes in their IDs + var docs2 = docs1.concat([ + { + _id: "_design/mydesign", + language : "javascript" + } + ]); + + populate_db(dbA, docs2); + populate_db(dbB, []); + + var repDoc = { + _id: "foo_cont_rep_doc", + source: "http://" + host + "/" + dbA.name, + target: dbB.name, + doc_ids: ["foo666", "foo3", "_design/mydesign", "foo999", "foo1"] + }; + T(repDb.save(repDoc).ok); + + waitForRep(repDb, repDoc, "completed"); + var copy = dbB.open("foo1"); + T(copy !== null); + T(copy.value === 11); + + copy = dbB.open("foo2"); + T(copy === null); + + copy = dbB.open("foo3"); + T(copy !== null); + T(copy.value === 33); + + copy = dbB.open("foo666"); + T(copy === null); + + copy = dbB.open("foo999"); + T(copy === null); + + copy = dbB.open("_design/mydesign"); + T(copy !== null); + T(copy.language === "javascript"); + } + + + function successive_identical_replications() { + populate_db(dbA, docs1); + populate_db(dbB, []); + + var repDoc1 = { + _id: "foo_ident_rep_1", + source: dbA.name, + target: dbB.name + }; + T(repDb.save(repDoc1).ok); + + waitForRep(repDb, repDoc1, "completed"); + for (var i = 0; i < docs1.length; i++) { + var doc = docs1[i]; + var copy = dbB.open(doc._id); + T(copy !== null); + T(copy.value === doc.value); + } + + var repDoc1_copy = repDb.open(repDoc1._id); + T(repDoc1_copy !== null); + T(repDoc1_copy.source === repDoc1.source); + T(repDoc1_copy.target === repDoc1.target); + T(repDoc1_copy.state === "completed"); + T(typeof repDoc1_copy.replication_id === "string"); + + var newDoc = { + _id: "doc666", + value: 666 + }; + T(dbA.save(newDoc).ok); + + wait(200); + var newDoc_copy = dbB.open(newDoc._id); + // not replicated because first replication is complete (not continuous) + T(newDoc_copy === null); + + var repDoc2 = { + _id: "foo_ident_rep_2", + source: dbA.name, + target: dbB.name + }; + T(repDb.save(repDoc2).ok); + + waitForRep(repDb, repDoc2, "completed"); + var newDoc_copy = dbB.open(newDoc._id); + T(newDoc_copy !== null); + T(newDoc_copy.value === newDoc.value); + + var repDoc2_copy = repDb.open(repDoc2._id); + T(repDoc2_copy !== null); + T(repDoc2_copy.source === repDoc1.source); + T(repDoc2_copy.target === repDoc1.target); + T(repDoc2_copy.state === "completed"); + T(typeof repDoc2_copy.replication_id === "string"); + T(repDoc2_copy.replication_id === repDoc1_copy.replication_id); + } + + + // test the case where multiple replication docs (different IDs) + // describe in fact the same replication (source, target, etc) + function identical_rep_docs() { + populate_db(dbA, docs1); + populate_db(dbB, []); + + var repDoc1 = { + _id: "foo_dup_rep_doc_1", + source: "http://" + host + "/" + dbA.name, + target: dbB.name + }; + var repDoc2 = { + _id: "foo_dup_rep_doc_2", + source: "http://" + host + "/" + dbA.name, + target: dbB.name + }; + + T(repDb.save(repDoc1).ok); + T(repDb.save(repDoc2).ok); + + waitForRep(repDb, repDoc1, "completed"); + for (var i = 0; i < docs1.length; i++) { + var doc = docs1[i]; + var copy = dbB.open(doc._id); + T(copy !== null); + T(copy.value === doc.value); + } + + repDoc1 = repDb.open("foo_dup_rep_doc_1"); + T(repDoc1 !== null); + T(repDoc1.state === "completed", "identical"); + T(typeof repDoc1.replication_id === "string"); + + repDoc2 = repDb.open("foo_dup_rep_doc_2"); + T(repDoc2 !== null); + T(typeof repDoc2.state === "undefined"); + T(repDoc2.replication_id === repDoc1.replication_id); + } + + + // test the case where multiple replication docs (different IDs) + // describe in fact the same continuous replication (source, target, etc) + function identical_continuous_rep_docs() { + populate_db(dbA, docs1); + populate_db(dbB, []); + + var repDoc1 = { + _id: "foo_dup_cont_rep_doc_1", + source: "http://" + host + "/" + dbA.name, + target: dbB.name, + continuous: true + }; + var repDoc2 = { + _id: "foo_dup_cont_rep_doc_2", + source: "http://" + host + "/" + dbA.name, + target: dbB.name, + continuous: true + }; + + T(repDb.save(repDoc1).ok); + T(repDb.save(repDoc2).ok); + + waitForSeq(dbA, dbB); + for (var i = 0; i < docs1.length; i++) { + var doc = docs1[i]; + var copy = dbB.open(doc._id); + T(copy !== null); + T(copy.value === doc.value); + } + + repDoc1 = repDb.open("foo_dup_cont_rep_doc_1"); + T(repDoc1 !== null); + T(repDoc1.state === "triggered"); + T(typeof repDoc1.replication_id === "string"); + + repDoc2 = repDb.open("foo_dup_cont_rep_doc_2"); + T(repDoc2 !== null); + T(typeof repDoc2.state === "undefined"); + T(repDoc2.replication_id === repDoc1.replication_id); + + var newDoc = { + _id: "foo666", + value: 999 + }; + T(dbA.save(newDoc).ok); + + waitForSeq(dbA, dbB); + var copy = dbB.open("foo666"); + T(copy !== null); + T(copy.value === 999); + + // deleting second replication doc, doesn't affect the 1st one and + // neither it stops the replication + T(repDb.deleteDoc(repDoc2).ok); + repDoc1 = repDb.open("foo_dup_cont_rep_doc_1"); + T(repDoc1 !== null); + T(repDoc1.state === "triggered"); + + var newDoc2 = { + _id: "foo5000", + value: 5000 + }; + T(dbA.save(newDoc2).ok); + + waitForSeq(dbA, dbB); + var copy = dbB.open("foo5000"); + T(copy !== null); + T(copy.value === 5000); + + // deleting the 1st replication document stops the replication + T(repDb.deleteDoc(repDoc1).ok); + var newDoc3 = { + _id: "foo1983", + value: 1983 + }; + T(dbA.save(newDoc3).ok); + + wait(wait_rep_doc); //how to remove wait? + var copy = dbB.open("foo1983"); + T(copy === null); + } + + + function rep_db_write_authorization() { + populate_db(dbA, docs1); + populate_db(dbB, []); + + var server_admins_config = [ + { + section: "admins", + key: "fdmanana", + value: "qwerty" + } + ]; + + run_on_modified_server(server_admins_config, function() { + var repDoc = { + _id: "foo_rep_doc", + source: dbA.name, + target: dbB.name + }; + + try { + repDb.save(repDoc); + T(false && "Should have thrown an exception"); + } catch (x) { + T(x["error"] === "forbidden"); + } + + T(CouchDB.login("fdmanana", "qwerty").ok); + T(CouchDB.session().userCtx.name === "fdmanana"); + T(CouchDB.session().userCtx.roles.indexOf("_admin") !== -1); + + T(repDb.save(repDoc).ok); + + waitForRep(repDb, repDoc, "completed"); + for (var i = 0; i < docs1.length; i++) { + var doc = docs1[i]; + var copy = dbB.open(doc._id); + T(copy !== null); + T(copy.value === doc.value); + } + + repDoc = repDb.open("foo_rep_doc"); + T(repDoc !== null); + + repDoc.target = "test_suite_foo_db"; + repDoc.create_target = true; + + // Only the replicator can update replication documents. + // Admins can only add and delete replication documents. + try { + repDb.save(repDoc); + T(false && "Should have thrown an exception"); + } catch (x) { + T(x["error"] === "forbidden"); + } + }); + } + + + function test_replication_credentials_delegation() { + populate_db(usersDb, []); + + var joeUserDoc = CouchDB.prepareUserDoc({ + name: "joe", + roles: ["god", "erlanger"] + }, "erly"); + T(usersDb.save(joeUserDoc).ok); + + var ddoc = { + _id: "_design/beer", + language: "javascript" + }; + populate_db(dbA, docs1.concat([ddoc])); + populate_db(dbB, []); + + T(dbB.setSecObj({ + admins: { + names: [], + roles: ["god"] + } + }).ok); + + var server_admins_config = [ + { + section: "admins", + key: "fdmanana", + value: "qwerty" + } + ]; + + run_on_modified_server(server_admins_config, function() { + + T(CouchDB.login("fdmanana", "qwerty").ok); + T(CouchDB.session().userCtx.name === "fdmanana"); + T(CouchDB.session().userCtx.roles.indexOf("_admin") !== -1); + + var repDoc = { + _id: "foo_rep_del_doc_1", + source: dbA.name, + target: dbB.name, + user_ctx: { + name: "joe", + roles: ["erlanger"] + } + }; + + T(repDb.save(repDoc).ok); + + waitForRep(repDb, repDoc, "completed"); + for (var i = 0; i < docs1.length; i++) { + var doc = docs1[i]; + var copy = dbB.open(doc._id); + T(copy !== null); + T(copy.value === doc.value); + } + + // design doc was not replicated, because joe is not an admin of db B + var doc = dbB.open(ddoc._id); + T(doc === null); + + // now test the same replication but putting the role "god" in the + // delegation user context property + var repDoc2 = { + _id: "foo_rep_del_doc_2", + source: dbA.name, + target: dbB.name, + user_ctx: { + name: "joe", + roles: ["erlanger", "god"] + } + }; + T(repDb.save(repDoc2).ok); + + waitForRep(repDb, repDoc2, "completed"); + for (var i = 0; i < docs1.length; i++) { + var doc = docs1[i]; + var copy = dbB.open(doc._id); + T(copy !== null); + T(copy.value === doc.value); + } + + // because anyone with a 'god' role is an admin of db B, a replication + // that is delegated to a 'god' role can write design docs to db B + doc = dbB.open(ddoc._id); + T(doc !== null); + T(doc.language === ddoc.language); + }); + } + + + function continuous_replication_survives_restart() { + var origRepDbName = CouchDB.request( + "GET", "/_config/replicator/db").responseText; + + repDb.deleteDb(); + + var xhr = CouchDB.request("PUT", "/_config/replicator/db", { + body : JSON.stringify(repDb.name), + headers: {"X-Couch-Persist": "false"} + }); + T(xhr.status === 200); + + populate_db(dbA, docs1); + populate_db(dbB, []); + + var repDoc = { + _id: "foo_cont_rep_survives_doc", + source: "http://" + host + "/" + dbA.name, + target: dbB.name, + continuous: true + }; + + T(repDb.save(repDoc).ok); + + waitForSeq(dbA, dbB); + for (var i = 0; i < docs1.length; i++) { + var doc = docs1[i]; + var copy = dbB.open(doc._id); + T(copy !== null); + T(copy.value === doc.value); + } + + repDb.ensureFullCommit(); + dbA.ensureFullCommit(); + + restartServer(); + + xhr = CouchDB.request("PUT", "/_config/replicator/db", { + body : JSON.stringify(repDb.name), + headers: {"X-Couch-Persist": "false"} + }); + + T(xhr.status === 200); + + // add another doc to source, it will be replicated to target + var docX = { + _id: "foo1000", + value: 1001 + }; + + T(dbA.save(docX).ok); + + waitForSeq(dbA, dbB); + var copy = dbB.open("foo1000"); + T(copy !== null); + T(copy.value === 1001); + + repDoc = repDb.open("foo_cont_rep_survives_doc"); + T(repDoc !== null); + T(repDoc.continuous === true); + + // stop replication + T(repDb.deleteDoc(repDoc).ok); + + xhr = CouchDB.request("PUT", "/_config/replicator/db", { + body : origRepDbName, + headers: {"X-Couch-Persist": "false"} + }); + T(xhr.status === 200); + } + + + function error_state_replication() { + populate_db(dbA, docs1); + + var repDoc = { + _id: "foo_error_rep", + source: dbA.name, + target: "nonexistent_test_db" + }; + T(repDb.save(repDoc).ok); + + waitForRep(repDb, repDoc, "error"); + var repDoc1 = repDb.open(repDoc._id); + T(repDoc1 !== null); + T(repDoc1.state === "error"); + T(typeof repDoc1.replication_id === "string"); + } + + + // run all the tests + var server_config = [ + { + section: "replicator", + key: "db", + value: repDb.name + } + ]; + + repDb.deleteDb(); + run_on_modified_server(server_config, simple_replication); + + repDb.deleteDb(); + restartServer(); + run_on_modified_server(server_config, filtered_replication); + + repDb.deleteDb(); + restartServer(); + run_on_modified_server(server_config, continuous_replication); + + repDb.deleteDb(); + restartServer(); + run_on_modified_server(server_config, by_doc_ids_replication); + + repDb.deleteDb(); + restartServer(); + run_on_modified_server(server_config, successive_identical_replications); + + repDb.deleteDb(); + restartServer(); + run_on_modified_server(server_config, identical_rep_docs); + + repDb.deleteDb(); + restartServer(); + run_on_modified_server(server_config, identical_continuous_rep_docs); + + repDb.deleteDb(); + restartServer(); + run_on_modified_server(server_config, rep_db_write_authorization); + + var server_config_2 = server_config.concat([ + { + section: "couch_httpd_auth", + key: "authentication_db", + value: usersDb.name + } + ]); + repDb.deleteDb(); + restartServer(); + run_on_modified_server(server_config_2, test_replication_credentials_delegation); + + repDb.deleteDb(); + restartServer(); + continuous_replication_survives_restart(); + + repDb.deleteDb(); + restartServer(); + run_on_modified_server(server_config, error_state_replication); + + + // cleanup + repDb.deleteDb(); + usersDb.deleteDb(); + dbA.deleteDb(); + dbB.deleteDb(); +}; diff --git a/src/couchdb/Makefile.am b/src/couchdb/Makefile.am index 308a3837..219f7d82 100644 --- a/src/couchdb/Makefile.am +++ b/src/couchdb/Makefile.am @@ -66,6 +66,7 @@ source_files = \ couch_rep_reader.erl \ couch_rep_sup.erl \ couch_rep_writer.erl \ + couch_rep_db_listener.erl \ couch_server.erl \ couch_server_sup.erl \ couch_stats_aggregator.erl \ @@ -124,6 +125,7 @@ compiled_files = \ couch_rep_reader.beam \ couch_rep_sup.beam \ couch_rep_writer.beam \ + couch_rep_db_listener.beam \ couch_server.beam \ couch_server_sup.beam \ couch_stats_aggregator.beam \ diff --git a/src/couchdb/couch_changes.erl b/src/couchdb/couch_changes.erl index 580148c8..098fac84 100644 --- a/src/couchdb/couch_changes.erl +++ b/src/couchdb/couch_changes.erl @@ -165,7 +165,8 @@ keep_sending_changes(Args, Callback, Db, StartSeq, Prepend, Timeout, TimeoutFun) -> #changes_args{ feed = ResponseType, - limit = Limit + limit = Limit, + db_open_options = DbOptions } = Args, % ?LOG_INFO("send_changes start ~p",[StartSeq]), {ok, {_, EndSeq, Prepend2, _, _, _, NewLimit, _}} = send_changes( @@ -179,7 +180,8 @@ keep_sending_changes(Args, Callback, Db, StartSeq, Prepend, Timeout, case wait_db_updated(Timeout, TimeoutFun) of updated -> % ?LOG_INFO("wait_db_updated updated ~p",[{Db#db.name, EndSeq}]), - case couch_db:open(Db#db.name, [{user_ctx, Db#db.user_ctx}]) of + DbOptions1 = [{user_ctx, Db#db.user_ctx} | DbOptions], + case couch_db:open(Db#db.name, DbOptions1) of {ok, Db2} -> keep_sending_changes( Args#changes_args{limit=NewLimit}, diff --git a/src/couchdb/couch_db.hrl b/src/couchdb/couch_db.hrl index a35745ef..51fb25e2 100644 --- a/src/couchdb/couch_db.hrl +++ b/src/couchdb/couch_db.hrl @@ -289,6 +289,7 @@ heartbeat, timeout, filter = "", - include_docs = false + include_docs = false, + db_open_options = [] }). diff --git a/src/couchdb/couch_js_functions.hrl b/src/couchdb/couch_js_functions.hrl index 1f314f6e..f850dd4c 100644 --- a/src/couchdb/couch_js_functions.hrl +++ b/src/couchdb/couch_js_functions.hrl @@ -95,3 +95,76 @@ } } ">>). + + +-define(REP_DB_DOC_VALIDATE_FUN, <<" + function(newDoc, oldDoc, userCtx) { + var isAdmin = (userCtx.roles.indexOf('_admin') >= 0); + var isReplicator = (userCtx.roles.indexOf('_replicator') >= 0); + + if (oldDoc && !newDoc._deleted && !isReplicator) { + throw({forbidden: + 'Only the replicator can edit replication documents. ' + + 'Admins can only add and delete replication documents.' + }); + } else if (!isAdmin) { + throw({forbidden: + 'Only admins may add/delete replication documents.' + }); + } + + if (!oldDoc && newDoc.state) { + throw({forbidden: + 'The state field can only be set by the replicator.' + }); + } + + if (!oldDoc && newDoc.replication_id) { + throw({forbidden: + 'The replication_id field can only be set by the replicator.' + }); + } + + if (newDoc.user_ctx) { + var user_ctx = newDoc.user_ctx; + + if (typeof user_ctx !== 'object') { + throw({forbidden: 'The user_ctx property must be an object.'}); + } + + if (!(user_ctx.name === null || + (typeof user_ctx.name === 'undefined') || + ((typeof user_ctx.name === 'string') && + user_ctx.name.length > 0))) { + throw({forbidden: + 'The name property of the user_ctx must be a ' + + 'non-empty string.' + }); + } + + if ((typeof user_ctx.roles !== 'undefined') && + (typeof user_ctx.roles.length !== 'number')) { + throw({forbidden: + 'The roles property of the user_ctx must be ' + + 'an array of strings.' + }); + } + + if (user_ctx.roles) { + for (var i = 0; i < user_ctx.roles.length; i++) { + var role = user_ctx.roles[i]; + + if (typeof role !== 'string' || role.length === 0) { + throw({forbidden: 'Roles must be non-empty strings.'}); + } + if (role[0] === '_') { + throw({forbidden: + 'System roles (starting with underscore) ' + + 'are not allowed.' + }); + } + } + } + } + } +">>). diff --git a/src/couchdb/couch_rep.erl b/src/couchdb/couch_rep.erl index d3db8a68..57b30088 100644 --- a/src/couchdb/couch_rep.erl +++ b/src/couchdb/couch_rep.erl @@ -16,8 +16,12 @@ code_change/3]). -export([replicate/2, checkpoint/1]). +-export([ensure_rep_db_exists/0, make_replication_id/2]). +-export([start_replication/3, end_replication/1, get_result/4]). +-export([update_rep_doc/2]). -include("couch_db.hrl"). +-include("couch_js_functions.hrl"). -define(REP_ID_VERSION, 2). @@ -48,7 +52,8 @@ committed_seq = 0, stats = nil, - doc_ids = nil + doc_ids = nil, + rep_doc = nil }). %% convenience function to do a simple replication from the shell @@ -61,58 +66,63 @@ replicate(Source, Target) when is_binary(Source), is_binary(Target) -> %% function handling POST to _replicate replicate({Props}=PostBody, UserCtx) -> - BaseId = make_replication_id({Props}, UserCtx, ?REP_ID_VERSION), - Extension = maybe_append_options( - [<<"continuous">>, <<"create_target">>], Props), - Replicator = {BaseId ++ Extension, - {gen_server, start_link, [?MODULE, [BaseId, PostBody, UserCtx], []]}, - temporary, - 1, - worker, - [?MODULE] - }, - + RepId = make_replication_id(PostBody, UserCtx), case couch_util:get_value(<<"cancel">>, Props, false) of true -> - case supervisor:terminate_child(couch_rep_sup, BaseId ++ Extension) of - {error, not_found} -> - {error, not_found}; - ok -> - ok = supervisor:delete_child(couch_rep_sup, BaseId ++ Extension), - {ok, {cancelled, ?l2b(BaseId)}} - end; + end_replication(RepId); false -> - Server = start_replication_server(Replicator), + Server = start_replication(PostBody, RepId, UserCtx), + get_result(Server, RepId, PostBody, UserCtx) + end. - case couch_util:get_value(<<"continuous">>, Props, false) of - true -> - {ok, {continuous, ?l2b(BaseId)}}; - false -> - get_result(Server, PostBody, UserCtx) - end +end_replication({BaseId, Extension}) -> + RepId = BaseId ++ Extension, + case supervisor:terminate_child(couch_rep_sup, RepId) of + {error, not_found} = R -> + R; + ok -> + ok = supervisor:delete_child(couch_rep_sup, RepId), + {ok, {cancelled, ?l2b(BaseId)}} end. +start_replication(RepDoc, {BaseId, Extension}, UserCtx) -> + Replicator = { + BaseId ++ Extension, + {gen_server, start_link, + [?MODULE, [BaseId, RepDoc, UserCtx], []]}, + temporary, + 1, + worker, + [?MODULE] + }, + start_replication_server(Replicator). + checkpoint(Server) -> gen_server:cast(Server, do_checkpoint). -get_result(Server, PostBody, UserCtx) -> - try gen_server:call(Server, get_result, infinity) of - retry -> replicate(PostBody, UserCtx); - Else -> Else - catch - exit:{noproc, {gen_server, call, [Server, get_result , infinity]}} -> - %% oops, this replication just finished -- restart it. - replicate(PostBody, UserCtx); - exit:{normal, {gen_server, call, [Server, get_result , infinity]}} -> - %% we made the call during terminate - replicate(PostBody, UserCtx) +get_result(Server, {BaseId, _Extension}, {Props} = PostBody, UserCtx) -> + case couch_util:get_value(<<"continuous">>, Props, false) of + true -> + {ok, {continuous, ?l2b(BaseId)}}; + false -> + try gen_server:call(Server, get_result, infinity) of + retry -> replicate(PostBody, UserCtx); + Else -> Else + catch + exit:{noproc, {gen_server, call, [Server, get_result, infinity]}} -> + %% oops, this replication just finished -- restart it. + replicate(PostBody, UserCtx); + exit:{normal, {gen_server, call, [Server, get_result, infinity]}} -> + %% we made the call during terminate + replicate(PostBody, UserCtx) + end end. init(InitArgs) -> try do_init(InitArgs) catch throw:{db_not_found, DbUrl} -> {stop, {db_not_found, DbUrl}} end. -do_init([RepId, {PostProps}, UserCtx] = InitArgs) -> +do_init([RepId, {PostProps} = RepDoc, UserCtx] = InitArgs) -> process_flag(trap_exit, true), SourceProps = couch_util:get_value(<<"source">>, PostProps), @@ -130,6 +140,8 @@ do_init([RepId, {PostProps}, UserCtx] = InitArgs) -> SourceInfo = dbinfo(Source), TargetInfo = dbinfo(Target), + maybe_set_triggered(RepDoc, RepId), + case DocIds of List when is_list(List) -> % Fast replication using only a list of doc IDs to replicate. @@ -199,7 +211,8 @@ do_init([RepId, {PostProps}, UserCtx] = InitArgs) -> rep_starttime = httpd_util:rfc1123_date(), src_starttime = couch_util:get_value(instance_start_time, SourceInfo), tgt_starttime = couch_util:get_value(instance_start_time, TargetInfo), - doc_ids = DocIds + doc_ids = DocIds, + rep_doc = RepDoc }, {ok, State}. @@ -256,29 +269,34 @@ handle_info({'EXIT', _Pid, Reason}, State) -> {stop, Reason, State}. terminate(normal, #state{checkpoint_scheduled=nil} = State) -> - do_terminate(State); + do_terminate(State), + update_rep_doc(State#state.rep_doc, [{<<"state">>, <<"completed">>}]); terminate(normal, State) -> timer:cancel(State#state.checkpoint_scheduled), - do_terminate(do_checkpoint(State)); + do_terminate(do_checkpoint(State)), + update_rep_doc(State#state.rep_doc, [{<<"state">>, <<"completed">>}]); -terminate(Reason, State) -> - #state{ - listeners = Listeners, - source = Source, - target = Target, - stats = Stats - } = State, +terminate(shutdown, #state{listeners = Listeners} = State) -> + % continuous replication stopped + [gen_server:reply(L, {ok, stopped}) || L <- Listeners], + do_forced_terminate(State); + +terminate(Reason, #state{listeners = Listeners} = State) -> [gen_server:reply(L, {error, Reason}) || L <- Listeners], - ets:delete(Stats), - close_db(Target), - close_db(Source). + do_forced_terminate(State), + update_rep_doc(State#state.rep_doc, [{<<"state">>, <<"error">>}]). code_change(_OldVsn, State, _Extra) -> {ok, State}. % internal funs +do_forced_terminate(#state{source = Source, target = Target, stats = Stats}) -> + ets:delete(Stats), + close_db(Target), + close_db(Source). + start_replication_server(Replicator) -> RepId = element(1, Replicator), case supervisor:start_child(couch_rep_sup, Replicator) of @@ -449,7 +467,7 @@ has_session_id(SessionId, [{Props} | Rest]) -> has_session_id(SessionId, Rest) end. -maybe_append_options(Options, Props) -> +maybe_append_options(Options, {Props}) -> lists:foldl(fun(Option, Acc) -> Acc ++ case couch_util:get_value(Option, Props, false) of @@ -460,6 +478,12 @@ maybe_append_options(Options, Props) -> end end, [], Options). +make_replication_id(RepProps, UserCtx) -> + BaseId = make_replication_id(RepProps, UserCtx, ?REP_ID_VERSION), + Extension = maybe_append_options( + [<<"continuous">>, <<"create_target">>], RepProps), + {BaseId, Extension}. + % Versioned clauses for generating replication ids % If a change is made to how replications are identified % add a new clause and increase ?REP_ID_VERSION at the top @@ -785,3 +809,79 @@ parse_proxy_params(ProxyUrl) -> true -> [{proxy_user, User}, {proxy_password, Passwd}] end. + +update_rep_doc({Props} = _RepDoc, KVs) -> + case couch_util:get_value(<<"_id">>, Props) of + undefined -> + % replication triggered by POSTing to _replicate/ + ok; + RepDocId -> + % replication triggered by adding a Rep Doc to the replicator DB + {ok, RepDb} = ensure_rep_db_exists(), + case couch_db:open_doc(RepDb, RepDocId, []) of + {ok, LatestRepDoc} -> + update_rep_doc(RepDb, LatestRepDoc, KVs); + _ -> + ok + end, + couch_db:close(RepDb) + end. + +update_rep_doc(RepDb, #doc{body = {RepDocBody}} = RepDoc, KVs) -> + NewRepDocBody = lists:foldl( + fun({K, _V} = KV, Body) -> + lists:keystore(K, 1, Body, KV) + end, + RepDocBody, + KVs + ), + % might not succeed - when the replication doc is deleted right + % before this update (not an error) + couch_db:update_doc( + RepDb, + RepDoc#doc{body = {NewRepDocBody}}, + [] + ). + +maybe_set_triggered({RepProps} = RepDoc, RepId) -> + case couch_util:get_value(<<"state">>, RepProps) of + <<"triggered">> -> + ok; + _ -> + update_rep_doc( + RepDoc, + [ + {<<"state">>, <<"triggered">>}, + {<<"replication_id">>, ?l2b(RepId)} + ] + ) + end. + +ensure_rep_db_exists() -> + DbName = ?l2b(couch_config:get("replicator", "db", "_replicator")), + Opts = [ + {user_ctx, #user_ctx{roles=[<<"_admin">>, <<"_replicator">>]}}, + sys_db + ], + case couch_db:open(DbName, Opts) of + {ok, Db} -> + Db; + _Error -> + {ok, Db} = couch_db:create(DbName, Opts) + end, + ok = ensure_rep_ddoc_exists(Db, <<"_design/_replicator">>), + {ok, Db}. + +ensure_rep_ddoc_exists(RepDb, DDocID) -> + case couch_db:open_doc(RepDb, DDocID, []) of + {ok, _Doc} -> + ok; + _ -> + DDoc = couch_doc:from_json_obj({[ + {<<"_id">>, DDocID}, + {<<"language">>, <<"javascript">>}, + {<<"validate_doc_update">>, ?REP_DB_DOC_VALIDATE_FUN} + ]}), + {ok, _Rev} = couch_db:update_doc(RepDb, DDoc, []) + end, + ok. diff --git a/src/couchdb/couch_rep_db_listener.erl b/src/couchdb/couch_rep_db_listener.erl new file mode 100644 index 00000000..bc407693 --- /dev/null +++ b/src/couchdb/couch_rep_db_listener.erl @@ -0,0 +1,232 @@ +% Licensed under the Apache License, Version 2.0 (the "License"); you may not +% use this file except in compliance with the License. You may obtain a copy of +% the License at +% +% http://www.apache.org/licenses/LICENSE-2.0 +% +% Unless required by applicable law or agreed to in writing, software +% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +% License for the specific language governing permissions and limitations under +% the License. + +-module(couch_rep_db_listener). +-behaviour(gen_server). + +-export([start_link/0, init/1, handle_call/3, handle_info/2, handle_cast/2]). +-export([code_change/3, terminate/2]). + +-include("couch_db.hrl"). + +-define(DOC_TO_REP_ID_MAP, rep_doc_id_to_rep_id). +-define(REP_ID_TO_DOC_ID_MAP, rep_id_to_rep_doc_id). + +-record(state, { + changes_feed_loop, + changes_queue, + changes_processor +}). + + +start_link() -> + gen_server:start_link({local, ?MODULE}, ?MODULE, [], []). + +init(_) -> + process_flag(trap_exit, true), + {ok, Queue} = couch_work_queue:new(1024 * 1024, 1000), + {ok, Processor} = changes_processor(Queue), + {ok, Loop} = changes_feed_loop(Queue), + Server = self(), + ok = couch_config:register( + fun("replicator", "db") -> + ok = gen_server:call(Server, rep_db_changed, infinity) + end + ), + {ok, #state{ + changes_feed_loop = Loop, + changes_queue = Queue, + changes_processor = Processor} + }. + +handle_call(rep_db_changed, _From, State) -> + #state{ + changes_feed_loop = Loop, + changes_queue = Queue + } = State, + exit(Loop, rep_db_changed), + {ok, NewLoop} = changes_feed_loop(Queue), + {reply, ok, State#state{changes_feed_loop = NewLoop}}. + + +handle_cast(_Msg, State) -> + {noreply, State}. + + +handle_info({'EXIT', _OldChangesLoop, rep_db_changed}, State) -> + {noreply, State}; + +handle_info({'EXIT', From, Reason}, #state{changes_processor = From} = State) -> + ?LOG_ERROR("Replicator DB changes processor died. Reason: ~p", [Reason]), + {stop, rep_db_changes_processor_error, State}. + + +terminate(_Reason, State) -> + #state{ + changes_feed_loop = Loop, + changes_queue = Queue + } = State, + exit(Loop, stop), + % closing the queue will cause changes_processor to shutdown + couch_work_queue:close(Queue), + ok. + + +code_change(_OldVsn, State, _Extra) -> + {ok, State}. + + +changes_feed_loop(ChangesQueue) -> + {ok, RepDb} = couch_rep:ensure_rep_db_exists(), + Pid = spawn_link( + fun() -> + ChangesFeedFun = couch_changes:handle_changes( + #changes_args{ + include_docs = true, + feed = "continuous", + timeout = infinity, + db_open_options = [sys_db] + }, + {json_req, null}, + RepDb + ), + ChangesFeedFun( + fun({change, Change, _}, _) -> + case has_valid_rep_id(Change) of + true -> + couch_work_queue:queue(ChangesQueue, Change); + false -> + ok + end; + (_, _) -> + ok + end + ) + end + ), + couch_db:close(RepDb), + {ok, Pid}. + + +changes_processor(ChangesQueue) -> + Pid = spawn_link( + fun() -> + ets:new(?DOC_TO_REP_ID_MAP, [named_table, set, private]), + ets:new(?REP_ID_TO_DOC_ID_MAP, [named_table, set, private]), + consume_changes(ChangesQueue), + true = ets:delete(?REP_ID_TO_DOC_ID_MAP), + true = ets:delete(?DOC_TO_REP_ID_MAP) + end + ), + {ok, Pid}. + + +consume_changes(ChangesQueue) -> + case couch_work_queue:dequeue(ChangesQueue) of + closed -> + ok; + {ok, Changes} -> + lists:foreach(fun process_change/1, Changes), + consume_changes(ChangesQueue) + end. + + +has_valid_rep_id({Change}) -> + has_valid_rep_id(couch_util:get_value(<<"id">>, Change)); +has_valid_rep_id(<<?DESIGN_DOC_PREFIX, _Rest/binary>>) -> + false; +has_valid_rep_id(_Else) -> + true. + + +process_change({Change}) -> + {RepProps} = JsonRepDoc = couch_util:get_value(doc, Change), + case couch_util:get_value(<<"deleted">>, Change, false) of + true -> + maybe_stop_replication(JsonRepDoc); + false -> + case couch_util:get_value(<<"state">>, RepProps) of + <<"completed">> -> + maybe_stop_replication(JsonRepDoc); + <<"error">> -> + % cleanup ets table entries + maybe_stop_replication(JsonRepDoc); + <<"triggered">> -> + maybe_start_replication(JsonRepDoc); + undefined -> + case couch_util:get_value(<<"replication_id">>, RepProps) of + undefined -> + maybe_start_replication(JsonRepDoc); + _ -> + ok + end + end + end, + ok. + + +rep_user_ctx({RepDoc}) -> + case couch_util:get_value(<<"user_ctx">>, RepDoc) of + undefined -> + #user_ctx{roles = [<<"_admin">>]}; + {UserCtx} -> + #user_ctx{ + name = couch_util:get_value(<<"name">>, UserCtx, null), + roles = couch_util:get_value(<<"roles">>, UserCtx, []) + } + end. + + +maybe_start_replication({RepProps} = JsonRepDoc) -> + UserCtx = rep_user_ctx(JsonRepDoc), + RepId = couch_rep:make_replication_id(JsonRepDoc, UserCtx), + DocId = couch_util:get_value(<<"_id">>, RepProps), + case ets:lookup(?REP_ID_TO_DOC_ID_MAP, RepId) of + [] -> + true = ets:insert(?REP_ID_TO_DOC_ID_MAP, {RepId, DocId}), + true = ets:insert(?DOC_TO_REP_ID_MAP, {DocId, RepId}), + spawn_link(fun() -> start_replication(JsonRepDoc, RepId, UserCtx) end); + [{RepId, DocId}] -> + ok; + [{RepId, _OtherDocId}] -> + couch_rep:update_rep_doc( + JsonRepDoc, [{<<"replication_id">>, ?l2b(element(1, RepId))}] + ) + end. + + +start_replication(RepDoc, RepId, UserCtx) -> + case (catch couch_rep:start_replication(RepDoc, RepId, UserCtx)) of + RepPid when is_pid(RepPid) -> + couch_rep:get_result(RepPid, RepId, RepDoc, UserCtx); + Error -> + couch_rep:update_rep_doc( + RepDoc, + [ + {<<"state">>, <<"error">>}, + {<<"replication_id">>, ?l2b(element(1, RepId))} + ] + ), + ?LOG_ERROR("Error starting replication ~p: ~p", [RepId, Error]) + end. + + +maybe_stop_replication({RepProps}) -> + DocId = couch_util:get_value(<<"_id">>, RepProps), + case ets:lookup(?DOC_TO_REP_ID_MAP, DocId) of + [{DocId, RepId}] -> + couch_rep:end_replication(RepId), + true = ets:delete(?REP_ID_TO_DOC_ID_MAP, RepId), + true = ets:delete(?DOC_TO_REP_ID_MAP, DocId); + [] -> + ok + end. diff --git a/src/couchdb/couch_server.erl b/src/couchdb/couch_server.erl index 0f8f66f3..2d96d5f3 100644 --- a/src/couchdb/couch_server.erl +++ b/src/couchdb/couch_server.erl @@ -76,6 +76,7 @@ check_dbname(#server{dbname_regexp=RegExp}, DbName) -> nomatch -> case DbName of "_users" -> ok; + "_replicator" -> ok; _Else -> {error, illegal_database_name} end; |