From 1b07ac052dd87d5dd255ebc328e9b8e66fac21c5 Mon Sep 17 00:00:00 2001 From: Filipe David Borba Manana Date: Tue, 23 Nov 2010 11:35:39 +0000 Subject: Merged revision 1038067 from trunk: Replicator DB changes: - Added back the restriction that only the replicator can edit replication documents - this avoids lots of potential race conditions and confusion; - Added more tests; - More accurate log messages; - Don't ignore always replication documents already tagged with a replication_id property - this is necessary when replicating a replicator DB from one server to another server. git-svn-id: https://svn.apache.org/repos/asf/couchdb/branches/1.1.x@1038068 13f79535-47bb-0310-9956-ffa450edef68 --- share/www/script/test/replicator_db.js | 91 ++++++++++++++++++++++++++++++++++ src/couchdb/couch_js_functions.hrl | 16 +++--- src/couchdb/couch_rep.erl | 2 +- src/couchdb/couch_rep_db_listener.erl | 76 +++++++++++++++++----------- 4 files changed, 150 insertions(+), 35 deletions(-) diff --git a/share/www/script/test/replicator_db.js b/share/www/script/test/replicator_db.js index 2e558f3e..00168695 100644 --- a/share/www/script/test/replicator_db.js +++ b/share/www/script/test/replicator_db.js @@ -633,6 +633,89 @@ couchTests.replicator_db = function(debug) { } + function rep_db_write_authorization() { + populate_db(dbA, docs1); + populate_db(dbB, []); + + var server_admins_config = [ + { + section: "admins", + key: "fdmanana", + value: "qwerty" + } + ]; + + run_on_modified_server(server_admins_config, function() { + var repDoc = { + _id: "foo_rep_doc", + source: dbA.name, + target: dbB.name + }; + + T(CouchDB.login("fdmanana", "qwerty").ok); + T(CouchDB.session().userCtx.name === "fdmanana"); + T(CouchDB.session().userCtx.roles.indexOf("_admin") !== -1); + + T(repDb.save(repDoc).ok); + + waitForRep(repDb, repDoc, "completed"); + + for (var i = 0; i < docs1.length; i++) { + var doc = docs1[i]; + var copy = dbB.open(doc._id); + + T(copy !== null); + T(copy.value === doc.value); + } + + repDoc = repDb.open("foo_rep_doc"); + T(repDoc !== null); + repDoc.target = "test_suite_foo_db"; + repDoc.create_target = true; + + // Only the replicator can update replication documents. + // Admins can only add and delete replication documents. + try { + repDb.save(repDoc); + T(false && "Should have thrown an exception"); + } catch (x) { + T(x["error"] === "forbidden"); + } + }); + } + + + function rep_doc_with_bad_rep_id() { + populate_db(dbA, docs1); + populate_db(dbB, []); + + var repDoc = { + _id: "foo_rep", + source: dbA.name, + target: dbB.name, + replication_id: "1234abc" + }; + T(repDb.save(repDoc).ok); + + waitForRep(repDb, repDoc, "completed"); + for (var i = 0; i < docs1.length; i++) { + var doc = docs1[i]; + var copy = dbB.open(doc._id); + T(copy !== null); + T(copy.value === doc.value); + } + + var repDoc1 = repDb.open(repDoc._id); + T(repDoc1 !== null); + T(repDoc1.source === repDoc.source); + T(repDoc1.target === repDoc.target); + T(repDoc1.state === "completed", + "replication document with bad replication id failed"); + T(typeof repDoc1.replication_id === "string"); + T(repDoc1.replication_id !== "1234abc"); + } + + function error_state_replication() { populate_db(dbA, docs1); @@ -687,6 +770,14 @@ couchTests.replicator_db = function(debug) { restartServer(); run_on_modified_server(server_config, identical_continuous_rep_docs); + repDb.deleteDb(); + restartServer(); + run_on_modified_server(server_config, rep_db_write_authorization); + + repDb.deleteDb(); + restartServer(); + run_on_modified_server(server_config, rep_doc_with_bad_rep_id); + var server_config_2 = server_config.concat([ { section: "couch_httpd_auth", diff --git a/src/couchdb/couch_js_functions.hrl b/src/couchdb/couch_js_functions.hrl index a2a40b3c..67f06686 100644 --- a/src/couchdb/couch_js_functions.hrl +++ b/src/couchdb/couch_js_functions.hrl @@ -99,14 +99,18 @@ -define(REP_DB_DOC_VALIDATE_FUN, <<" function(newDoc, oldDoc, userCtx) { - if (newDoc.user_ctx) { + function reportError(error_msg) { + log('Error writing document `' + newDoc._id + + '` to replicator DB: ' + error_msg); + throw({forbidden: error_msg}); + } - function reportError(error_msg) { - log('Error writing document ' + newDoc._id + - ' to replicator DB: ' + error_msg); - throw({forbidden: error_msg}); - } + var isReplicator = (userCtx.roles.indexOf('_replicator') >= 0); + if (oldDoc && !newDoc._deleted && !isReplicator) { + reportError('Only the replicator can edit replication documents.'); + } + if (newDoc.user_ctx) { var user_ctx = newDoc.user_ctx; if (typeof user_ctx !== 'object') { diff --git a/src/couchdb/couch_rep.erl b/src/couchdb/couch_rep.erl index b5918abc..8eaa99ee 100644 --- a/src/couchdb/couch_rep.erl +++ b/src/couchdb/couch_rep.erl @@ -882,7 +882,7 @@ maybe_set_triggered({RepProps} = RepDoc, RepId) -> ensure_rep_db_exists() -> DbName = ?l2b(couch_config:get("replicator", "db", "_replicator")), Opts = [ - {user_ctx, #user_ctx{roles=[<<"_admin">>]}}, + {user_ctx, #user_ctx{roles=[<<"_admin">>, <<"_replicator">>]}}, sys_db ], case couch_db:open(DbName, Opts) of diff --git a/src/couchdb/couch_rep_db_listener.erl b/src/couchdb/couch_rep_db_listener.erl index e21aafd3..5a5ab164 100644 --- a/src/couchdb/couch_rep_db_listener.erl +++ b/src/couchdb/couch_rep_db_listener.erl @@ -151,25 +151,23 @@ has_valid_rep_id(_Else) -> process_change({Change}) -> {RepProps} = JsonRepDoc = couch_util:get_value(doc, Change), + DocId = couch_util:get_value(<<"_id">>, RepProps), case couch_util:get_value(<<"deleted">>, Change, false) of true -> - maybe_stop_replication(JsonRepDoc); + rep_doc_deleted(DocId); false -> case couch_util:get_value(<<"state">>, RepProps) of <<"completed">> -> - maybe_stop_replication(JsonRepDoc); + replication_complete(DocId); <<"error">> -> - % cleanup ets table entries - maybe_stop_replication(JsonRepDoc); + stop_replication(DocId); <<"triggered">> -> - maybe_start_replication(JsonRepDoc); + maybe_start_replication(DocId, JsonRepDoc); undefined -> - case couch_util:get_value(<<"replication_id">>, RepProps) of - undefined -> - maybe_start_replication(JsonRepDoc); - _ -> - ok - end + maybe_start_replication(DocId, JsonRepDoc); + _ -> + ?LOG_ERROR("Invalid value for the `state` property of the " + "replication document `~s`", [DocId]) end end, ok. @@ -187,26 +185,33 @@ rep_user_ctx({RepDoc}) -> end. -maybe_start_replication({RepProps} = JsonRepDoc) -> +maybe_start_replication(DocId, JsonRepDoc) -> UserCtx = rep_user_ctx(JsonRepDoc), - RepId = couch_rep:make_replication_id(JsonRepDoc, UserCtx), - DocId = couch_util:get_value(<<"_id">>, RepProps), - case ets:lookup(?REP_ID_TO_DOC_ID_MAP, RepId) of + {BaseId, _} = RepId = couch_rep:make_replication_id(JsonRepDoc, UserCtx), + case ets:lookup(?REP_ID_TO_DOC_ID_MAP, BaseId) of [] -> - true = ets:insert(?REP_ID_TO_DOC_ID_MAP, {RepId, DocId}), + true = ets:insert(?REP_ID_TO_DOC_ID_MAP, {BaseId, DocId}), true = ets:insert(?DOC_TO_REP_ID_MAP, {DocId, RepId}), spawn_link(fun() -> start_replication(JsonRepDoc, RepId, UserCtx) end); - [{RepId, DocId}] -> + [{BaseId, DocId}] -> + ok; + [{BaseId, OtherDocId}] -> + maybe_tag_rep_doc(DocId, JsonRepDoc, ?l2b(BaseId), OtherDocId) + end. + + +maybe_tag_rep_doc(DocId, {Props} = JsonRepDoc, RepId, OtherDocId) -> + case couch_util:get_value(<<"replication_id">>, Props) of + RepId -> ok; - [{RepId, OtherDocId}] -> + _ -> ?LOG_INFO("The replication specified by the document `~s` was already" " triggered by the document `~s`", [DocId, OtherDocId]), - couch_rep:update_rep_doc( - JsonRepDoc, [{<<"replication_id">>, ?l2b(element(1, RepId))}] - ) + couch_rep:update_rep_doc(JsonRepDoc, [{<<"replication_id">>, RepId}]) end. + start_replication({RepProps} = RepDoc, {Base, Ext} = RepId, UserCtx) -> case (catch couch_rep:start_replication(RepDoc, RepId, UserCtx)) of RepPid when is_pid(RepPid) -> @@ -224,16 +229,31 @@ start_replication({RepProps} = RepDoc, {Base, Ext} = RepId, UserCtx) -> ?LOG_ERROR("Error starting replication ~p: ~p", [RepId, Error]) end. +rep_doc_deleted(DocId) -> + case stop_replication(DocId) of + {ok, {Base, Ext}} -> + ?LOG_INFO("Stopped replication `~s` because replication document `~s`" + " was deleted", [Base ++ Ext, DocId]); + none -> + ok + end. -maybe_stop_replication({RepProps}) -> - DocId = couch_util:get_value(<<"_id">>, RepProps), +replication_complete(DocId) -> + case stop_replication(DocId) of + {ok, {Base, Ext}} -> + ?LOG_INFO("Replication `~s` finished (triggered by document `~s`)", + [Base ++ Ext, DocId]); + none -> + ok + end. + +stop_replication(DocId) -> case ets:lookup(?DOC_TO_REP_ID_MAP, DocId) of - [{DocId, {Base, Ext} = RepId}] -> + [{DocId, {BaseId, _} = RepId}] -> couch_rep:end_replication(RepId), - true = ets:delete(?REP_ID_TO_DOC_ID_MAP, RepId), + true = ets:delete(?REP_ID_TO_DOC_ID_MAP, BaseId), true = ets:delete(?DOC_TO_REP_ID_MAP, DocId), - ?LOG_INFO("Stopped replication `~s` because replication document `~s`" - " was deleted", [Base ++ Ext, DocId]); + {ok, RepId}; [] -> - ok + none end. -- cgit v1.2.3