summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/couchdb/couch_rep.erl77
1 files changed, 58 insertions, 19 deletions
diff --git a/src/couchdb/couch_rep.erl b/src/couchdb/couch_rep.erl
index b741f86f..4f6fb673 100644
--- a/src/couchdb/couch_rep.erl
+++ b/src/couchdb/couch_rep.erl
@@ -19,6 +19,8 @@
-include("couch_db.hrl").
+-define(REP_ID_VERSION, 2).
+
-record(state, {
changes_feed,
missing_revs,
@@ -59,7 +61,9 @@ replicate(Source, Target) when is_binary(Source), is_binary(Target) ->
%% function handling POST to _replicate
replicate({Props}=PostBody, UserCtx) ->
- {BaseId, Extension} = make_replication_id(PostBody, UserCtx),
+ BaseId = make_replication_id({Props}, UserCtx, ?REP_ID_VERSION),
+ Extension = maybe_append_options(
+ [<<"continuous">>, <<"create_target">>], Props),
Replicator = {BaseId ++ Extension,
{gen_server, start_link, [?MODULE, [BaseId, PostBody, UserCtx], []]},
temporary,
@@ -144,8 +148,9 @@ do_init([RepId, {PostProps}, UserCtx] = InitArgs) ->
_ ->
% Replication using the _changes API (DB sequence update numbers).
- SourceLog = open_replication_log(Source, RepId),
- TargetLog = open_replication_log(Target, RepId),
+
+ [SourceLog, TargetLog] = find_replication_logs(
+ [Source, Target], RepId, {PostProps}, UserCtx),
{StartSeq, History} = compare_replication_logs(SourceLog, TargetLog),
@@ -451,13 +456,23 @@ maybe_append_options(Options, Props) ->
end
end, [], Options).
-make_replication_id({Props}, UserCtx) ->
- %% funky algorithm to preserve backwards compatibility
+% Versioned clauses for generating replication ids
+% If a change is made to how replications are identified
+% add a new clause and increase ?REP_ID_VERSION at the top
+make_replication_id({Props}, UserCtx, 2) ->
+ {ok, HostName} = inet:gethostname(),
+ Port = mochiweb_socket_server:get(couch_httpd, port),
+ Src = get_rep_endpoint(UserCtx, couch_util:get_value(<<"source">>, Props)),
+ Tgt = get_rep_endpoint(UserCtx, couch_util:get_value(<<"target">>, Props)),
+ maybe_append_filters({Props}, [HostName, Port, Src, Tgt]);
+make_replication_id({Props}, UserCtx, 1) ->
{ok, HostName} = inet:gethostname(),
- % Port = mochiweb_socket_server:get(couch_httpd, port),
Src = get_rep_endpoint(UserCtx, couch_util:get_value(<<"source">>, Props)),
Tgt = get_rep_endpoint(UserCtx, couch_util:get_value(<<"target">>, Props)),
- Base = [HostName, Src, Tgt] ++
+ maybe_append_filters({Props}, [HostName, Src, Tgt]).
+
+maybe_append_filters({Props}, Base) ->
+ Base2 = Base ++
case couch_util:get_value(<<"filter">>, Props) of
undefined ->
case couch_util:get_value(<<"doc_ids">>, Props) of
@@ -469,9 +484,7 @@ make_replication_id({Props}, UserCtx) ->
Filter ->
[Filter, couch_util:get_value(<<"query_params">>, Props, {[]})]
end,
- Extension = maybe_append_options(
- [<<"continuous">>, <<"create_target">>], Props),
- {couch_util:to_hex(couch_util:md5(term_to_binary(Base))), Extension}.
+ couch_util:to_hex(couch_util:md5(term_to_binary(Base2))).
maybe_add_trailing_slash(Url) ->
re:replace(Url, "[^/]$", "&/", [{return, list}]).
@@ -493,26 +506,52 @@ get_rep_endpoint(_UserCtx, <<"https://",_/binary>>=Url) ->
get_rep_endpoint(UserCtx, <<DbName/binary>>) ->
{local, DbName, UserCtx}.
-open_replication_log(#http_db{}=Db, RepId) ->
- DocId = ?LOCAL_DOC_PREFIX ++ RepId,
- Req = Db#http_db{resource=couch_util:url_encode(DocId)},
+find_replication_logs(Logs, RepId, {Props}, UserCtx) ->
+ LogId = ?l2b(?LOCAL_DOC_PREFIX ++ RepId),
+ fold_replication_logs(Logs, ?REP_ID_VERSION,
+ LogId, LogId, {Props}, UserCtx, []).
+
+% Accumulate the replication logs
+% Falls back to older log document ids and migrates them
+fold_replication_logs([], _Vsn, _LogId, _NewId, {_Props}, _UserCtx, Acc) ->
+ lists:reverse(Acc);
+fold_replication_logs([Db|Rest]=Dbs, Vsn, LogId, NewId,
+ {Props}, UserCtx, Acc) ->
+ case open_replication_log(Db, LogId) of
+ {error, not_found} when Vsn > 1 ->
+ OldRepId = make_replication_id({Props}, UserCtx, Vsn - 1),
+ fold_replication_logs(Dbs, Vsn - 1,
+ ?l2b(?LOCAL_DOC_PREFIX ++ OldRepId), NewId, {Props}, UserCtx, Acc);
+ {error, not_found} ->
+ fold_replication_logs(Rest, ?REP_ID_VERSION, NewId, NewId,
+ {Props}, UserCtx, [#doc{id=NewId}|Acc]);
+ {ok, Doc} when LogId =:= NewId ->
+ fold_replication_logs(Rest, ?REP_ID_VERSION, NewId, NewId,
+ {Props}, UserCtx, [Doc|Acc]);
+ {ok, Doc} ->
+ MigratedLog = #doc{id=NewId,body=Doc#doc.body},
+ fold_replication_logs(Rest, ?REP_ID_VERSION, NewId, NewId,
+ {Props}, UserCtx, [MigratedLog|Acc])
+ end.
+
+open_replication_log(#http_db{}=Db, DocId) ->
+ Req = Db#http_db{resource=couch_util:url_encode(?b2l(DocId))},
case couch_rep_httpc:request(Req) of
{[{<<"error">>, _}, {<<"reason">>, _}]} ->
?LOG_DEBUG("didn't find a replication log for ~s", [Db#http_db.url]),
- #doc{id=?l2b(DocId)};
+ {error, not_found};
Doc ->
?LOG_DEBUG("found a replication log for ~s", [Db#http_db.url]),
- couch_doc:from_json_obj(Doc)
+ {ok, couch_doc:from_json_obj(Doc)}
end;
-open_replication_log(Db, RepId) ->
- DocId = ?l2b(?LOCAL_DOC_PREFIX ++ RepId),
+open_replication_log(Db, DocId) ->
case couch_db:open_doc(Db, DocId, []) of
{ok, Doc} ->
?LOG_DEBUG("found a replication log for ~s", [Db#db.name]),
- Doc;
+ {ok, Doc};
_ ->
?LOG_DEBUG("didn't find a replication log for ~s", [Db#db.name]),
- #doc{id=DocId}
+ {error, not_found}
end.
open_db(Props, UserCtx) ->