diff options
author | Robert Newson <rnewson@apache.org> | 2010-07-19 00:10:11 +0000 |
---|---|---|
committer | Robert Newson <rnewson@apache.org> | 2010-07-19 00:10:11 +0000 |
commit | 4c780ad5eb81e0e4c66b45fd70fbb623c3a19be6 (patch) | |
tree | 5c21b98a2cc25bf6613d59e545b3dcf2b7026c11 | |
parent | 66ab269ccf9945a72f7108f07454eca14060b632 (diff) |
COUCHDB-810: Adds port to replication checkpoints.
New replication checkpoints now include the port number, which allows for efficient replication between multiple couchdb instances running on the same machine.
Old replication checkpoints are recognized (Full replication is not induced) and they are automatically migrated to the new checkpoint format.
Thanks to Randall Leeds for the patch.
git-svn-id: https://svn.apache.org/repos/asf/couchdb/trunk@965331 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r-- | src/couchdb/couch_rep.erl | 77 |
1 files changed, 58 insertions, 19 deletions
diff --git a/src/couchdb/couch_rep.erl b/src/couchdb/couch_rep.erl index b741f86f..4f6fb673 100644 --- a/src/couchdb/couch_rep.erl +++ b/src/couchdb/couch_rep.erl @@ -19,6 +19,8 @@ -include("couch_db.hrl"). +-define(REP_ID_VERSION, 2). + -record(state, { changes_feed, missing_revs, @@ -59,7 +61,9 @@ replicate(Source, Target) when is_binary(Source), is_binary(Target) -> %% function handling POST to _replicate replicate({Props}=PostBody, UserCtx) -> - {BaseId, Extension} = make_replication_id(PostBody, UserCtx), + BaseId = make_replication_id({Props}, UserCtx, ?REP_ID_VERSION), + Extension = maybe_append_options( + [<<"continuous">>, <<"create_target">>], Props), Replicator = {BaseId ++ Extension, {gen_server, start_link, [?MODULE, [BaseId, PostBody, UserCtx], []]}, temporary, @@ -144,8 +148,9 @@ do_init([RepId, {PostProps}, UserCtx] = InitArgs) -> _ -> % Replication using the _changes API (DB sequence update numbers). - SourceLog = open_replication_log(Source, RepId), - TargetLog = open_replication_log(Target, RepId), + + [SourceLog, TargetLog] = find_replication_logs( + [Source, Target], RepId, {PostProps}, UserCtx), {StartSeq, History} = compare_replication_logs(SourceLog, TargetLog), @@ -451,13 +456,23 @@ maybe_append_options(Options, Props) -> end end, [], Options). -make_replication_id({Props}, UserCtx) -> - %% funky algorithm to preserve backwards compatibility +% Versioned clauses for generating replication ids +% If a change is made to how replications are identified +% add a new clause and increase ?REP_ID_VERSION at the top +make_replication_id({Props}, UserCtx, 2) -> + {ok, HostName} = inet:gethostname(), + Port = mochiweb_socket_server:get(couch_httpd, port), + Src = get_rep_endpoint(UserCtx, couch_util:get_value(<<"source">>, Props)), + Tgt = get_rep_endpoint(UserCtx, couch_util:get_value(<<"target">>, Props)), + maybe_append_filters({Props}, [HostName, Port, Src, Tgt]); +make_replication_id({Props}, UserCtx, 1) -> {ok, HostName} = inet:gethostname(), - % Port = mochiweb_socket_server:get(couch_httpd, port), Src = get_rep_endpoint(UserCtx, couch_util:get_value(<<"source">>, Props)), Tgt = get_rep_endpoint(UserCtx, couch_util:get_value(<<"target">>, Props)), - Base = [HostName, Src, Tgt] ++ + maybe_append_filters({Props}, [HostName, Src, Tgt]). + +maybe_append_filters({Props}, Base) -> + Base2 = Base ++ case couch_util:get_value(<<"filter">>, Props) of undefined -> case couch_util:get_value(<<"doc_ids">>, Props) of @@ -469,9 +484,7 @@ make_replication_id({Props}, UserCtx) -> Filter -> [Filter, couch_util:get_value(<<"query_params">>, Props, {[]})] end, - Extension = maybe_append_options( - [<<"continuous">>, <<"create_target">>], Props), - {couch_util:to_hex(couch_util:md5(term_to_binary(Base))), Extension}. + couch_util:to_hex(couch_util:md5(term_to_binary(Base2))). maybe_add_trailing_slash(Url) -> re:replace(Url, "[^/]$", "&/", [{return, list}]). @@ -493,26 +506,52 @@ get_rep_endpoint(_UserCtx, <<"https://",_/binary>>=Url) -> get_rep_endpoint(UserCtx, <<DbName/binary>>) -> {local, DbName, UserCtx}. -open_replication_log(#http_db{}=Db, RepId) -> - DocId = ?LOCAL_DOC_PREFIX ++ RepId, - Req = Db#http_db{resource=couch_util:url_encode(DocId)}, +find_replication_logs(Logs, RepId, {Props}, UserCtx) -> + LogId = ?l2b(?LOCAL_DOC_PREFIX ++ RepId), + fold_replication_logs(Logs, ?REP_ID_VERSION, + LogId, LogId, {Props}, UserCtx, []). + +% Accumulate the replication logs +% Falls back to older log document ids and migrates them +fold_replication_logs([], _Vsn, _LogId, _NewId, {_Props}, _UserCtx, Acc) -> + lists:reverse(Acc); +fold_replication_logs([Db|Rest]=Dbs, Vsn, LogId, NewId, + {Props}, UserCtx, Acc) -> + case open_replication_log(Db, LogId) of + {error, not_found} when Vsn > 1 -> + OldRepId = make_replication_id({Props}, UserCtx, Vsn - 1), + fold_replication_logs(Dbs, Vsn - 1, + ?l2b(?LOCAL_DOC_PREFIX ++ OldRepId), NewId, {Props}, UserCtx, Acc); + {error, not_found} -> + fold_replication_logs(Rest, ?REP_ID_VERSION, NewId, NewId, + {Props}, UserCtx, [#doc{id=NewId}|Acc]); + {ok, Doc} when LogId =:= NewId -> + fold_replication_logs(Rest, ?REP_ID_VERSION, NewId, NewId, + {Props}, UserCtx, [Doc|Acc]); + {ok, Doc} -> + MigratedLog = #doc{id=NewId,body=Doc#doc.body}, + fold_replication_logs(Rest, ?REP_ID_VERSION, NewId, NewId, + {Props}, UserCtx, [MigratedLog|Acc]) + end. + +open_replication_log(#http_db{}=Db, DocId) -> + Req = Db#http_db{resource=couch_util:url_encode(?b2l(DocId))}, case couch_rep_httpc:request(Req) of {[{<<"error">>, _}, {<<"reason">>, _}]} -> ?LOG_DEBUG("didn't find a replication log for ~s", [Db#http_db.url]), - #doc{id=?l2b(DocId)}; + {error, not_found}; Doc -> ?LOG_DEBUG("found a replication log for ~s", [Db#http_db.url]), - couch_doc:from_json_obj(Doc) + {ok, couch_doc:from_json_obj(Doc)} end; -open_replication_log(Db, RepId) -> - DocId = ?l2b(?LOCAL_DOC_PREFIX ++ RepId), +open_replication_log(Db, DocId) -> case couch_db:open_doc(Db, DocId, []) of {ok, Doc} -> ?LOG_DEBUG("found a replication log for ~s", [Db#db.name]), - Doc; + {ok, Doc}; _ -> ?LOG_DEBUG("didn't find a replication log for ~s", [Db#db.name]), - #doc{id=DocId} + {error, not_found} end. open_db(Props, UserCtx) -> |