From 1b07ac052dd87d5dd255ebc328e9b8e66fac21c5 Mon Sep 17 00:00:00 2001 From: Filipe David Borba Manana Date: Tue, 23 Nov 2010 11:35:39 +0000 Subject: Merged revision 1038067 from trunk: Replicator DB changes: - Added back the restriction that only the replicator can edit replication documents - this avoids lots of potential race conditions and confusion; - Added more tests; - More accurate log messages; - Don't ignore always replication documents already tagged with a replication_id property - this is necessary when replicating a replicator DB from one server to another server. git-svn-id: https://svn.apache.org/repos/asf/couchdb/branches/1.1.x@1038068 13f79535-47bb-0310-9956-ffa450edef68 --- src/couchdb/couch_rep_db_listener.erl | 76 ++++++++++++++++++++++------------- 1 file changed, 48 insertions(+), 28 deletions(-) (limited to 'src/couchdb/couch_rep_db_listener.erl') diff --git a/src/couchdb/couch_rep_db_listener.erl b/src/couchdb/couch_rep_db_listener.erl index e21aafd3..5a5ab164 100644 --- a/src/couchdb/couch_rep_db_listener.erl +++ b/src/couchdb/couch_rep_db_listener.erl @@ -151,25 +151,23 @@ has_valid_rep_id(_Else) -> process_change({Change}) -> {RepProps} = JsonRepDoc = couch_util:get_value(doc, Change), + DocId = couch_util:get_value(<<"_id">>, RepProps), case couch_util:get_value(<<"deleted">>, Change, false) of true -> - maybe_stop_replication(JsonRepDoc); + rep_doc_deleted(DocId); false -> case couch_util:get_value(<<"state">>, RepProps) of <<"completed">> -> - maybe_stop_replication(JsonRepDoc); + replication_complete(DocId); <<"error">> -> - % cleanup ets table entries - maybe_stop_replication(JsonRepDoc); + stop_replication(DocId); <<"triggered">> -> - maybe_start_replication(JsonRepDoc); + maybe_start_replication(DocId, JsonRepDoc); undefined -> - case couch_util:get_value(<<"replication_id">>, RepProps) of - undefined -> - maybe_start_replication(JsonRepDoc); - _ -> - ok - end + maybe_start_replication(DocId, JsonRepDoc); + _ -> + ?LOG_ERROR("Invalid value for the `state` property of the " + "replication document `~s`", [DocId]) end end, ok. @@ -187,26 +185,33 @@ rep_user_ctx({RepDoc}) -> end. -maybe_start_replication({RepProps} = JsonRepDoc) -> +maybe_start_replication(DocId, JsonRepDoc) -> UserCtx = rep_user_ctx(JsonRepDoc), - RepId = couch_rep:make_replication_id(JsonRepDoc, UserCtx), - DocId = couch_util:get_value(<<"_id">>, RepProps), - case ets:lookup(?REP_ID_TO_DOC_ID_MAP, RepId) of + {BaseId, _} = RepId = couch_rep:make_replication_id(JsonRepDoc, UserCtx), + case ets:lookup(?REP_ID_TO_DOC_ID_MAP, BaseId) of [] -> - true = ets:insert(?REP_ID_TO_DOC_ID_MAP, {RepId, DocId}), + true = ets:insert(?REP_ID_TO_DOC_ID_MAP, {BaseId, DocId}), true = ets:insert(?DOC_TO_REP_ID_MAP, {DocId, RepId}), spawn_link(fun() -> start_replication(JsonRepDoc, RepId, UserCtx) end); - [{RepId, DocId}] -> + [{BaseId, DocId}] -> + ok; + [{BaseId, OtherDocId}] -> + maybe_tag_rep_doc(DocId, JsonRepDoc, ?l2b(BaseId), OtherDocId) + end. + + +maybe_tag_rep_doc(DocId, {Props} = JsonRepDoc, RepId, OtherDocId) -> + case couch_util:get_value(<<"replication_id">>, Props) of + RepId -> ok; - [{RepId, OtherDocId}] -> + _ -> ?LOG_INFO("The replication specified by the document `~s` was already" " triggered by the document `~s`", [DocId, OtherDocId]), - couch_rep:update_rep_doc( - JsonRepDoc, [{<<"replication_id">>, ?l2b(element(1, RepId))}] - ) + couch_rep:update_rep_doc(JsonRepDoc, [{<<"replication_id">>, RepId}]) end. + start_replication({RepProps} = RepDoc, {Base, Ext} = RepId, UserCtx) -> case (catch couch_rep:start_replication(RepDoc, RepId, UserCtx)) of RepPid when is_pid(RepPid) -> @@ -224,16 +229,31 @@ start_replication({RepProps} = RepDoc, {Base, Ext} = RepId, UserCtx) -> ?LOG_ERROR("Error starting replication ~p: ~p", [RepId, Error]) end. +rep_doc_deleted(DocId) -> + case stop_replication(DocId) of + {ok, {Base, Ext}} -> + ?LOG_INFO("Stopped replication `~s` because replication document `~s`" + " was deleted", [Base ++ Ext, DocId]); + none -> + ok + end. -maybe_stop_replication({RepProps}) -> - DocId = couch_util:get_value(<<"_id">>, RepProps), +replication_complete(DocId) -> + case stop_replication(DocId) of + {ok, {Base, Ext}} -> + ?LOG_INFO("Replication `~s` finished (triggered by document `~s`)", + [Base ++ Ext, DocId]); + none -> + ok + end. + +stop_replication(DocId) -> case ets:lookup(?DOC_TO_REP_ID_MAP, DocId) of - [{DocId, {Base, Ext} = RepId}] -> + [{DocId, {BaseId, _} = RepId}] -> couch_rep:end_replication(RepId), - true = ets:delete(?REP_ID_TO_DOC_ID_MAP, RepId), + true = ets:delete(?REP_ID_TO_DOC_ID_MAP, BaseId), true = ets:delete(?DOC_TO_REP_ID_MAP, DocId), - ?LOG_INFO("Stopped replication `~s` because replication document `~s`" - " was deleted", [Base ++ Ext, DocId]); + {ok, RepId}; [] -> - ok + none end. -- cgit v1.2.3