summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--share/www/script/test/replicator_db.js91
-rw-r--r--src/couchdb/couch_js_functions.hrl16
-rw-r--r--src/couchdb/couch_rep.erl2
-rw-r--r--src/couchdb/couch_rep_db_listener.erl76
4 files changed, 150 insertions, 35 deletions
diff --git a/share/www/script/test/replicator_db.js b/share/www/script/test/replicator_db.js
index 2e558f3e..00168695 100644
--- a/share/www/script/test/replicator_db.js
+++ b/share/www/script/test/replicator_db.js
@@ -633,6 +633,89 @@ couchTests.replicator_db = function(debug) {
}
+ function rep_db_write_authorization() {
+ populate_db(dbA, docs1);
+ populate_db(dbB, []);
+
+ var server_admins_config = [
+ {
+ section: "admins",
+ key: "fdmanana",
+ value: "qwerty"
+ }
+ ];
+
+ run_on_modified_server(server_admins_config, function() {
+ var repDoc = {
+ _id: "foo_rep_doc",
+ source: dbA.name,
+ target: dbB.name
+ };
+
+ T(CouchDB.login("fdmanana", "qwerty").ok);
+ T(CouchDB.session().userCtx.name === "fdmanana");
+ T(CouchDB.session().userCtx.roles.indexOf("_admin") !== -1);
+
+ T(repDb.save(repDoc).ok);
+
+ waitForRep(repDb, repDoc, "completed");
+
+ for (var i = 0; i < docs1.length; i++) {
+ var doc = docs1[i];
+ var copy = dbB.open(doc._id);
+
+ T(copy !== null);
+ T(copy.value === doc.value);
+ }
+
+ repDoc = repDb.open("foo_rep_doc");
+ T(repDoc !== null);
+ repDoc.target = "test_suite_foo_db";
+ repDoc.create_target = true;
+
+ // Only the replicator can update replication documents.
+ // Admins can only add and delete replication documents.
+ try {
+ repDb.save(repDoc);
+ T(false && "Should have thrown an exception");
+ } catch (x) {
+ T(x["error"] === "forbidden");
+ }
+ });
+ }
+
+
+ function rep_doc_with_bad_rep_id() {
+ populate_db(dbA, docs1);
+ populate_db(dbB, []);
+
+ var repDoc = {
+ _id: "foo_rep",
+ source: dbA.name,
+ target: dbB.name,
+ replication_id: "1234abc"
+ };
+ T(repDb.save(repDoc).ok);
+
+ waitForRep(repDb, repDoc, "completed");
+ for (var i = 0; i < docs1.length; i++) {
+ var doc = docs1[i];
+ var copy = dbB.open(doc._id);
+ T(copy !== null);
+ T(copy.value === doc.value);
+ }
+
+ var repDoc1 = repDb.open(repDoc._id);
+ T(repDoc1 !== null);
+ T(repDoc1.source === repDoc.source);
+ T(repDoc1.target === repDoc.target);
+ T(repDoc1.state === "completed",
+ "replication document with bad replication id failed");
+ T(typeof repDoc1.replication_id === "string");
+ T(repDoc1.replication_id !== "1234abc");
+ }
+
+
function error_state_replication() {
populate_db(dbA, docs1);
@@ -687,6 +770,14 @@ couchTests.replicator_db = function(debug) {
restartServer();
run_on_modified_server(server_config, identical_continuous_rep_docs);
+ repDb.deleteDb();
+ restartServer();
+ run_on_modified_server(server_config, rep_db_write_authorization);
+
+ repDb.deleteDb();
+ restartServer();
+ run_on_modified_server(server_config, rep_doc_with_bad_rep_id);
+
var server_config_2 = server_config.concat([
{
section: "couch_httpd_auth",
diff --git a/src/couchdb/couch_js_functions.hrl b/src/couchdb/couch_js_functions.hrl
index a2a40b3c..67f06686 100644
--- a/src/couchdb/couch_js_functions.hrl
+++ b/src/couchdb/couch_js_functions.hrl
@@ -99,14 +99,18 @@
-define(REP_DB_DOC_VALIDATE_FUN, <<"
function(newDoc, oldDoc, userCtx) {
- if (newDoc.user_ctx) {
+ function reportError(error_msg) {
+ log('Error writing document `' + newDoc._id +
+ '` to replicator DB: ' + error_msg);
+ throw({forbidden: error_msg});
+ }
- function reportError(error_msg) {
- log('Error writing document ' + newDoc._id +
- ' to replicator DB: ' + error_msg);
- throw({forbidden: error_msg});
- }
+ var isReplicator = (userCtx.roles.indexOf('_replicator') >= 0);
+ if (oldDoc && !newDoc._deleted && !isReplicator) {
+ reportError('Only the replicator can edit replication documents.');
+ }
+ if (newDoc.user_ctx) {
var user_ctx = newDoc.user_ctx;
if (typeof user_ctx !== 'object') {
diff --git a/src/couchdb/couch_rep.erl b/src/couchdb/couch_rep.erl
index b5918abc..8eaa99ee 100644
--- a/src/couchdb/couch_rep.erl
+++ b/src/couchdb/couch_rep.erl
@@ -882,7 +882,7 @@ maybe_set_triggered({RepProps} = RepDoc, RepId) ->
ensure_rep_db_exists() ->
DbName = ?l2b(couch_config:get("replicator", "db", "_replicator")),
Opts = [
- {user_ctx, #user_ctx{roles=[<<"_admin">>]}},
+ {user_ctx, #user_ctx{roles=[<<"_admin">>, <<"_replicator">>]}},
sys_db
],
case couch_db:open(DbName, Opts) of
diff --git a/src/couchdb/couch_rep_db_listener.erl b/src/couchdb/couch_rep_db_listener.erl
index e21aafd3..5a5ab164 100644
--- a/src/couchdb/couch_rep_db_listener.erl
+++ b/src/couchdb/couch_rep_db_listener.erl
@@ -151,25 +151,23 @@ has_valid_rep_id(_Else) ->
process_change({Change}) ->
{RepProps} = JsonRepDoc = couch_util:get_value(doc, Change),
+ DocId = couch_util:get_value(<<"_id">>, RepProps),
case couch_util:get_value(<<"deleted">>, Change, false) of
true ->
- maybe_stop_replication(JsonRepDoc);
+ rep_doc_deleted(DocId);
false ->
case couch_util:get_value(<<"state">>, RepProps) of
<<"completed">> ->
- maybe_stop_replication(JsonRepDoc);
+ replication_complete(DocId);
<<"error">> ->
- % cleanup ets table entries
- maybe_stop_replication(JsonRepDoc);
+ stop_replication(DocId);
<<"triggered">> ->
- maybe_start_replication(JsonRepDoc);
+ maybe_start_replication(DocId, JsonRepDoc);
undefined ->
- case couch_util:get_value(<<"replication_id">>, RepProps) of
- undefined ->
- maybe_start_replication(JsonRepDoc);
- _ ->
- ok
- end
+ maybe_start_replication(DocId, JsonRepDoc);
+ _ ->
+ ?LOG_ERROR("Invalid value for the `state` property of the "
+ "replication document `~s`", [DocId])
end
end,
ok.
@@ -187,26 +185,33 @@ rep_user_ctx({RepDoc}) ->
end.
-maybe_start_replication({RepProps} = JsonRepDoc) ->
+maybe_start_replication(DocId, JsonRepDoc) ->
UserCtx = rep_user_ctx(JsonRepDoc),
- RepId = couch_rep:make_replication_id(JsonRepDoc, UserCtx),
- DocId = couch_util:get_value(<<"_id">>, RepProps),
- case ets:lookup(?REP_ID_TO_DOC_ID_MAP, RepId) of
+ {BaseId, _} = RepId = couch_rep:make_replication_id(JsonRepDoc, UserCtx),
+ case ets:lookup(?REP_ID_TO_DOC_ID_MAP, BaseId) of
[] ->
- true = ets:insert(?REP_ID_TO_DOC_ID_MAP, {RepId, DocId}),
+ true = ets:insert(?REP_ID_TO_DOC_ID_MAP, {BaseId, DocId}),
true = ets:insert(?DOC_TO_REP_ID_MAP, {DocId, RepId}),
spawn_link(fun() -> start_replication(JsonRepDoc, RepId, UserCtx) end);
- [{RepId, DocId}] ->
+ [{BaseId, DocId}] ->
+ ok;
+ [{BaseId, OtherDocId}] ->
+ maybe_tag_rep_doc(DocId, JsonRepDoc, ?l2b(BaseId), OtherDocId)
+ end.
+
+
+maybe_tag_rep_doc(DocId, {Props} = JsonRepDoc, RepId, OtherDocId) ->
+ case couch_util:get_value(<<"replication_id">>, Props) of
+ RepId ->
ok;
- [{RepId, OtherDocId}] ->
+ _ ->
?LOG_INFO("The replication specified by the document `~s` was already"
" triggered by the document `~s`", [DocId, OtherDocId]),
- couch_rep:update_rep_doc(
- JsonRepDoc, [{<<"replication_id">>, ?l2b(element(1, RepId))}]
- )
+ couch_rep:update_rep_doc(JsonRepDoc, [{<<"replication_id">>, RepId}])
end.
+
start_replication({RepProps} = RepDoc, {Base, Ext} = RepId, UserCtx) ->
case (catch couch_rep:start_replication(RepDoc, RepId, UserCtx)) of
RepPid when is_pid(RepPid) ->
@@ -224,16 +229,31 @@ start_replication({RepProps} = RepDoc, {Base, Ext} = RepId, UserCtx) ->
?LOG_ERROR("Error starting replication ~p: ~p", [RepId, Error])
end.
+rep_doc_deleted(DocId) ->
+ case stop_replication(DocId) of
+ {ok, {Base, Ext}} ->
+ ?LOG_INFO("Stopped replication `~s` because replication document `~s`"
+ " was deleted", [Base ++ Ext, DocId]);
+ none ->
+ ok
+ end.
-maybe_stop_replication({RepProps}) ->
- DocId = couch_util:get_value(<<"_id">>, RepProps),
+replication_complete(DocId) ->
+ case stop_replication(DocId) of
+ {ok, {Base, Ext}} ->
+ ?LOG_INFO("Replication `~s` finished (triggered by document `~s`)",
+ [Base ++ Ext, DocId]);
+ none ->
+ ok
+ end.
+
+stop_replication(DocId) ->
case ets:lookup(?DOC_TO_REP_ID_MAP, DocId) of
- [{DocId, {Base, Ext} = RepId}] ->
+ [{DocId, {BaseId, _} = RepId}] ->
couch_rep:end_replication(RepId),
- true = ets:delete(?REP_ID_TO_DOC_ID_MAP, RepId),
+ true = ets:delete(?REP_ID_TO_DOC_ID_MAP, BaseId),
true = ets:delete(?DOC_TO_REP_ID_MAP, DocId),
- ?LOG_INFO("Stopped replication `~s` because replication document `~s`"
- " was deleted", [Base ++ Ext, DocId]);
+ {ok, RepId};
[] ->
- ok
+ none
end.