From 4c780ad5eb81e0e4c66b45fd70fbb623c3a19be6 Mon Sep 17 00:00:00 2001 From: Robert Newson Date: Mon, 19 Jul 2010 00:10:11 +0000 Subject: COUCHDB-810: Adds port to replication checkpoints. New replication checkpoints now include the port number, which allows for efficient replication between multiple couchdb instances running on the same machine. Old replication checkpoints are recognized (Full replication is not induced) and they are automatically migrated to the new checkpoint format. Thanks to Randall Leeds for the patch. git-svn-id: https://svn.apache.org/repos/asf/couchdb/trunk@965331 13f79535-47bb-0310-9956-ffa450edef68 --- src/couchdb/couch_rep.erl | 77 +++++++++++++++++++++++++++++++++++------------ 1 file changed, 58 insertions(+), 19 deletions(-) (limited to 'src') diff --git a/src/couchdb/couch_rep.erl b/src/couchdb/couch_rep.erl index b741f86f..4f6fb673 100644 --- a/src/couchdb/couch_rep.erl +++ b/src/couchdb/couch_rep.erl @@ -19,6 +19,8 @@ -include("couch_db.hrl"). +-define(REP_ID_VERSION, 2). + -record(state, { changes_feed, missing_revs, @@ -59,7 +61,9 @@ replicate(Source, Target) when is_binary(Source), is_binary(Target) -> %% function handling POST to _replicate replicate({Props}=PostBody, UserCtx) -> - {BaseId, Extension} = make_replication_id(PostBody, UserCtx), + BaseId = make_replication_id({Props}, UserCtx, ?REP_ID_VERSION), + Extension = maybe_append_options( + [<<"continuous">>, <<"create_target">>], Props), Replicator = {BaseId ++ Extension, {gen_server, start_link, [?MODULE, [BaseId, PostBody, UserCtx], []]}, temporary, @@ -144,8 +148,9 @@ do_init([RepId, {PostProps}, UserCtx] = InitArgs) -> _ -> % Replication using the _changes API (DB sequence update numbers). 
- SourceLog = open_replication_log(Source, RepId), - TargetLog = open_replication_log(Target, RepId), + + [SourceLog, TargetLog] = find_replication_logs( + [Source, Target], RepId, {PostProps}, UserCtx), {StartSeq, History} = compare_replication_logs(SourceLog, TargetLog), @@ -451,13 +456,23 @@ maybe_append_options(Options, Props) -> end end, [], Options). -make_replication_id({Props}, UserCtx) -> - %% funky algorithm to preserve backwards compatibility +% Versioned clauses for generating replication ids +% If a change is made to how replications are identified +% add a new clause and increase ?REP_ID_VERSION at the top +make_replication_id({Props}, UserCtx, 2) -> + {ok, HostName} = inet:gethostname(), + Port = mochiweb_socket_server:get(couch_httpd, port), + Src = get_rep_endpoint(UserCtx, couch_util:get_value(<<"source">>, Props)), + Tgt = get_rep_endpoint(UserCtx, couch_util:get_value(<<"target">>, Props)), + maybe_append_filters({Props}, [HostName, Port, Src, Tgt]); +make_replication_id({Props}, UserCtx, 1) -> {ok, HostName} = inet:gethostname(), - % Port = mochiweb_socket_server:get(couch_httpd, port), Src = get_rep_endpoint(UserCtx, couch_util:get_value(<<"source">>, Props)), Tgt = get_rep_endpoint(UserCtx, couch_util:get_value(<<"target">>, Props)), - Base = [HostName, Src, Tgt] ++ + maybe_append_filters({Props}, [HostName, Src, Tgt]). + +maybe_append_filters({Props}, Base) -> + Base2 = Base ++ case couch_util:get_value(<<"filter">>, Props) of undefined -> case couch_util:get_value(<<"doc_ids">>, Props) of @@ -469,9 +484,7 @@ make_replication_id({Props}, UserCtx) -> Filter -> [Filter, couch_util:get_value(<<"query_params">>, Props, {[]})] end, - Extension = maybe_append_options( - [<<"continuous">>, <<"create_target">>], Props), - {couch_util:to_hex(couch_util:md5(term_to_binary(Base))), Extension}. + couch_util:to_hex(couch_util:md5(term_to_binary(Base2))). maybe_add_trailing_slash(Url) -> re:replace(Url, "[^/]$", "&/", [{return, list}]). 
@@ -493,26 +506,52 @@ get_rep_endpoint(_UserCtx, <<"https://",_/binary>>=Url) -> get_rep_endpoint(UserCtx, <<DbName/binary>>) -> {local, DbName, UserCtx}. -open_replication_log(#http_db{}=Db, RepId) -> - DocId = ?LOCAL_DOC_PREFIX ++ RepId, - Req = Db#http_db{resource=couch_util:url_encode(DocId)}, +find_replication_logs(Logs, RepId, {Props}, UserCtx) -> + LogId = ?l2b(?LOCAL_DOC_PREFIX ++ RepId), + fold_replication_logs(Logs, ?REP_ID_VERSION, + LogId, LogId, {Props}, UserCtx, []). + +% Accumulate the replication logs +% Falls back to older log document ids and migrates them +fold_replication_logs([], _Vsn, _LogId, _NewId, {_Props}, _UserCtx, Acc) -> + lists:reverse(Acc); +fold_replication_logs([Db|Rest]=Dbs, Vsn, LogId, NewId, + {Props}, UserCtx, Acc) -> + case open_replication_log(Db, LogId) of + {error, not_found} when Vsn > 1 -> + OldRepId = make_replication_id({Props}, UserCtx, Vsn - 1), + fold_replication_logs(Dbs, Vsn - 1, + ?l2b(?LOCAL_DOC_PREFIX ++ OldRepId), NewId, {Props}, UserCtx, Acc); + {error, not_found} -> + fold_replication_logs(Rest, ?REP_ID_VERSION, NewId, NewId, + {Props}, UserCtx, [#doc{id=NewId}|Acc]); + {ok, Doc} when LogId =:= NewId -> + fold_replication_logs(Rest, ?REP_ID_VERSION, NewId, NewId, + {Props}, UserCtx, [Doc|Acc]); + {ok, Doc} -> + MigratedLog = #doc{id=NewId,body=Doc#doc.body}, + fold_replication_logs(Rest, ?REP_ID_VERSION, NewId, NewId, + {Props}, UserCtx, [MigratedLog|Acc]) + end.
+ +open_replication_log(#http_db{}=Db, DocId) -> + Req = Db#http_db{resource=couch_util:url_encode(?b2l(DocId))}, case couch_rep_httpc:request(Req) of {[{<<"error">>, _}, {<<"reason">>, _}]} -> ?LOG_DEBUG("didn't find a replication log for ~s", [Db#http_db.url]), - #doc{id=?l2b(DocId)}; + {error, not_found}; Doc -> ?LOG_DEBUG("found a replication log for ~s", [Db#http_db.url]), - couch_doc:from_json_obj(Doc) + {ok, couch_doc:from_json_obj(Doc)} end; -open_replication_log(Db, RepId) -> - DocId = ?l2b(?LOCAL_DOC_PREFIX ++ RepId), +open_replication_log(Db, DocId) -> case couch_db:open_doc(Db, DocId, []) of {ok, Doc} -> ?LOG_DEBUG("found a replication log for ~s", [Db#db.name]), - Doc; + {ok, Doc}; _ -> ?LOG_DEBUG("didn't find a replication log for ~s", [Db#db.name]), - #doc{id=DocId} + {error, not_found} end. open_db(Props, UserCtx) -> -- cgit v1.2.3