diff options
-rw-r--r-- | src/couchdb/couch_rep.erl | 77 |
1 files changed, 58 insertions, 19 deletions
diff --git a/src/couchdb/couch_rep.erl b/src/couchdb/couch_rep.erl index b741f86f..4f6fb673 100644 --- a/src/couchdb/couch_rep.erl +++ b/src/couchdb/couch_rep.erl @@ -19,6 +19,8 @@ -include("couch_db.hrl"). +-define(REP_ID_VERSION, 2). + -record(state, { changes_feed, missing_revs, @@ -59,7 +61,9 @@ replicate(Source, Target) when is_binary(Source), is_binary(Target) -> %% function handling POST to _replicate replicate({Props}=PostBody, UserCtx) -> - {BaseId, Extension} = make_replication_id(PostBody, UserCtx), + BaseId = make_replication_id({Props}, UserCtx, ?REP_ID_VERSION), + Extension = maybe_append_options( + [<<"continuous">>, <<"create_target">>], Props), Replicator = {BaseId ++ Extension, {gen_server, start_link, [?MODULE, [BaseId, PostBody, UserCtx], []]}, temporary, @@ -144,8 +148,9 @@ do_init([RepId, {PostProps}, UserCtx] = InitArgs) -> _ -> % Replication using the _changes API (DB sequence update numbers). - SourceLog = open_replication_log(Source, RepId), - TargetLog = open_replication_log(Target, RepId), + + [SourceLog, TargetLog] = find_replication_logs( + [Source, Target], RepId, {PostProps}, UserCtx), {StartSeq, History} = compare_replication_logs(SourceLog, TargetLog), @@ -451,13 +456,23 @@ maybe_append_options(Options, Props) -> end end, [], Options). -make_replication_id({Props}, UserCtx) -> - %% funky algorithm to preserve backwards compatibility +% Versioned clauses for generating replication ids +% If a change is made to how replications are identified +% add a new clause and increase ?REP_ID_VERSION at the top +make_replication_id({Props}, UserCtx, 2) -> + {ok, HostName} = inet:gethostname(), + Port = mochiweb_socket_server:get(couch_httpd, port), + Src = get_rep_endpoint(UserCtx, couch_util:get_value(<<"source">>, Props)), + Tgt = get_rep_endpoint(UserCtx, couch_util:get_value(<<"target">>, Props)), + maybe_append_filters({Props}, [HostName, Port, Src, Tgt]); +make_replication_id({Props}, UserCtx, 1) -> {ok, HostName} = inet:gethostname(), - % Port = mochiweb_socket_server:get(couch_httpd, port), Src = get_rep_endpoint(UserCtx, couch_util:get_value(<<"source">>, Props)), Tgt = get_rep_endpoint(UserCtx, couch_util:get_value(<<"target">>, Props)), - Base = [HostName, Src, Tgt] ++ + maybe_append_filters({Props}, [HostName, Src, Tgt]). + +maybe_append_filters({Props}, Base) -> + Base2 = Base ++ case couch_util:get_value(<<"filter">>, Props) of undefined -> case couch_util:get_value(<<"doc_ids">>, Props) of @@ -469,9 +484,7 @@ make_replication_id({Props}, UserCtx) -> Filter -> [Filter, couch_util:get_value(<<"query_params">>, Props, {[]})] end, - Extension = maybe_append_options( - [<<"continuous">>, <<"create_target">>], Props), - {couch_util:to_hex(couch_util:md5(term_to_binary(Base))), Extension}. + couch_util:to_hex(couch_util:md5(term_to_binary(Base2))). maybe_add_trailing_slash(Url) -> re:replace(Url, "[^/]$", "&/", [{return, list}]). @@ -493,26 +506,52 @@ get_rep_endpoint(_UserCtx, <<"https://",_/binary>>=Url) -> get_rep_endpoint(UserCtx, <<DbName/binary>>) -> {local, DbName, UserCtx}. -open_replication_log(#http_db{}=Db, RepId) -> - DocId = ?LOCAL_DOC_PREFIX ++ RepId, - Req = Db#http_db{resource=couch_util:url_encode(DocId)}, +find_replication_logs(Logs, RepId, {Props}, UserCtx) -> + LogId = ?l2b(?LOCAL_DOC_PREFIX ++ RepId), + fold_replication_logs(Logs, ?REP_ID_VERSION, + LogId, LogId, {Props}, UserCtx, []). + +% Accumulate the replication logs +% Falls back to older log document ids and migrates them +fold_replication_logs([], _Vsn, _LogId, _NewId, {_Props}, _UserCtx, Acc) -> + lists:reverse(Acc); +fold_replication_logs([Db|Rest]=Dbs, Vsn, LogId, NewId, + {Props}, UserCtx, Acc) -> + case open_replication_log(Db, LogId) of + {error, not_found} when Vsn > 1 -> + OldRepId = make_replication_id({Props}, UserCtx, Vsn - 1), + fold_replication_logs(Dbs, Vsn - 1, + ?l2b(?LOCAL_DOC_PREFIX ++ OldRepId), NewId, {Props}, UserCtx, Acc); + {error, not_found} -> + fold_replication_logs(Rest, ?REP_ID_VERSION, NewId, NewId, + {Props}, UserCtx, [#doc{id=NewId}|Acc]); + {ok, Doc} when LogId =:= NewId -> + fold_replication_logs(Rest, ?REP_ID_VERSION, NewId, NewId, + {Props}, UserCtx, [Doc|Acc]); + {ok, Doc} -> + MigratedLog = #doc{id=NewId,body=Doc#doc.body}, + fold_replication_logs(Rest, ?REP_ID_VERSION, NewId, NewId, + {Props}, UserCtx, [MigratedLog|Acc]) + end. + +open_replication_log(#http_db{}=Db, DocId) -> + Req = Db#http_db{resource=couch_util:url_encode(?b2l(DocId))}, case couch_rep_httpc:request(Req) of {[{<<"error">>, _}, {<<"reason">>, _}]} -> ?LOG_DEBUG("didn't find a replication log for ~s", [Db#http_db.url]), - #doc{id=?l2b(DocId)}; + {error, not_found}; Doc -> ?LOG_DEBUG("found a replication log for ~s", [Db#http_db.url]), - couch_doc:from_json_obj(Doc) + {ok, couch_doc:from_json_obj(Doc)} end; -open_replication_log(Db, RepId) -> - DocId = ?l2b(?LOCAL_DOC_PREFIX ++ RepId), +open_replication_log(Db, DocId) -> case couch_db:open_doc(Db, DocId, []) of {ok, Doc} -> ?LOG_DEBUG("found a replication log for ~s", [Db#db.name]), - Doc; + {ok, Doc}; _ -> ?LOG_DEBUG("didn't find a replication log for ~s", [Db#db.name]), - #doc{id=DocId} + {error, not_found} end. open_db(Props, UserCtx) -> |