summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorRobert Newson <rnewson@apache.org>2010-07-19 00:10:11 +0000
committerRobert Newson <rnewson@apache.org>2010-07-19 00:10:11 +0000
commit4c780ad5eb81e0e4c66b45fd70fbb623c3a19be6 (patch)
tree5c21b98a2cc25bf6613d59e545b3dcf2b7026c11
parent66ab269ccf9945a72f7108f07454eca14060b632 (diff)
COUCHDB-810: Adds port to replication checkpoints.
New replication checkpoints now include the port number, which allows for efficient replication between multiple couchdb instances running on the same machine. Old replication checkpoints are recognized (Full replication is not induced) and they are automatically migrated to the new checkpoint format. Thanks to Randall Leeds for the patch. git-svn-id: https://svn.apache.org/repos/asf/couchdb/trunk@965331 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--src/couchdb/couch_rep.erl77
1 files changed, 58 insertions, 19 deletions
diff --git a/src/couchdb/couch_rep.erl b/src/couchdb/couch_rep.erl
index b741f86f..4f6fb673 100644
--- a/src/couchdb/couch_rep.erl
+++ b/src/couchdb/couch_rep.erl
@@ -19,6 +19,8 @@
-include("couch_db.hrl").
+-define(REP_ID_VERSION, 2).
+
-record(state, {
changes_feed,
missing_revs,
@@ -59,7 +61,9 @@ replicate(Source, Target) when is_binary(Source), is_binary(Target) ->
%% function handling POST to _replicate
replicate({Props}=PostBody, UserCtx) ->
- {BaseId, Extension} = make_replication_id(PostBody, UserCtx),
+ BaseId = make_replication_id({Props}, UserCtx, ?REP_ID_VERSION),
+ Extension = maybe_append_options(
+ [<<"continuous">>, <<"create_target">>], Props),
Replicator = {BaseId ++ Extension,
{gen_server, start_link, [?MODULE, [BaseId, PostBody, UserCtx], []]},
temporary,
@@ -144,8 +148,9 @@ do_init([RepId, {PostProps}, UserCtx] = InitArgs) ->
_ ->
% Replication using the _changes API (DB sequence update numbers).
- SourceLog = open_replication_log(Source, RepId),
- TargetLog = open_replication_log(Target, RepId),
+
+ [SourceLog, TargetLog] = find_replication_logs(
+ [Source, Target], RepId, {PostProps}, UserCtx),
{StartSeq, History} = compare_replication_logs(SourceLog, TargetLog),
@@ -451,13 +456,23 @@ maybe_append_options(Options, Props) ->
end
end, [], Options).
-make_replication_id({Props}, UserCtx) ->
- %% funky algorithm to preserve backwards compatibility
+% Versioned clauses for generating replication ids
+% If a change is made to how replications are identified
+% add a new clause and increase ?REP_ID_VERSION at the top
+make_replication_id({Props}, UserCtx, 2) ->
+ {ok, HostName} = inet:gethostname(),
+ Port = mochiweb_socket_server:get(couch_httpd, port),
+ Src = get_rep_endpoint(UserCtx, couch_util:get_value(<<"source">>, Props)),
+ Tgt = get_rep_endpoint(UserCtx, couch_util:get_value(<<"target">>, Props)),
+ maybe_append_filters({Props}, [HostName, Port, Src, Tgt]);
+make_replication_id({Props}, UserCtx, 1) ->
{ok, HostName} = inet:gethostname(),
- % Port = mochiweb_socket_server:get(couch_httpd, port),
Src = get_rep_endpoint(UserCtx, couch_util:get_value(<<"source">>, Props)),
Tgt = get_rep_endpoint(UserCtx, couch_util:get_value(<<"target">>, Props)),
- Base = [HostName, Src, Tgt] ++
+ maybe_append_filters({Props}, [HostName, Src, Tgt]).
+
+maybe_append_filters({Props}, Base) ->
+ Base2 = Base ++
case couch_util:get_value(<<"filter">>, Props) of
undefined ->
case couch_util:get_value(<<"doc_ids">>, Props) of
@@ -469,9 +484,7 @@ make_replication_id({Props}, UserCtx) ->
Filter ->
[Filter, couch_util:get_value(<<"query_params">>, Props, {[]})]
end,
- Extension = maybe_append_options(
- [<<"continuous">>, <<"create_target">>], Props),
- {couch_util:to_hex(couch_util:md5(term_to_binary(Base))), Extension}.
+ couch_util:to_hex(couch_util:md5(term_to_binary(Base2))).
maybe_add_trailing_slash(Url) ->
re:replace(Url, "[^/]$", "&/", [{return, list}]).
@@ -493,26 +506,52 @@ get_rep_endpoint(_UserCtx, <<"https://",_/binary>>=Url) ->
get_rep_endpoint(UserCtx, <<DbName/binary>>) ->
{local, DbName, UserCtx}.
-open_replication_log(#http_db{}=Db, RepId) ->
- DocId = ?LOCAL_DOC_PREFIX ++ RepId,
- Req = Db#http_db{resource=couch_util:url_encode(DocId)},
+find_replication_logs(Logs, RepId, {Props}, UserCtx) ->
+ LogId = ?l2b(?LOCAL_DOC_PREFIX ++ RepId),
+ fold_replication_logs(Logs, ?REP_ID_VERSION,
+ LogId, LogId, {Props}, UserCtx, []).
+
+% Accumulate the replication logs
+% Falls back to older log document ids and migrates them
+fold_replication_logs([], _Vsn, _LogId, _NewId, {_Props}, _UserCtx, Acc) ->
+ lists:reverse(Acc);
+fold_replication_logs([Db|Rest]=Dbs, Vsn, LogId, NewId,
+ {Props}, UserCtx, Acc) ->
+ case open_replication_log(Db, LogId) of
+ {error, not_found} when Vsn > 1 ->
+ OldRepId = make_replication_id({Props}, UserCtx, Vsn - 1),
+ fold_replication_logs(Dbs, Vsn - 1,
+ ?l2b(?LOCAL_DOC_PREFIX ++ OldRepId), NewId, {Props}, UserCtx, Acc);
+ {error, not_found} ->
+ fold_replication_logs(Rest, ?REP_ID_VERSION, NewId, NewId,
+ {Props}, UserCtx, [#doc{id=NewId}|Acc]);
+ {ok, Doc} when LogId =:= NewId ->
+ fold_replication_logs(Rest, ?REP_ID_VERSION, NewId, NewId,
+ {Props}, UserCtx, [Doc|Acc]);
+ {ok, Doc} ->
+ MigratedLog = #doc{id=NewId,body=Doc#doc.body},
+ fold_replication_logs(Rest, ?REP_ID_VERSION, NewId, NewId,
+ {Props}, UserCtx, [MigratedLog|Acc])
+ end.
+
+open_replication_log(#http_db{}=Db, DocId) ->
+ Req = Db#http_db{resource=couch_util:url_encode(?b2l(DocId))},
case couch_rep_httpc:request(Req) of
{[{<<"error">>, _}, {<<"reason">>, _}]} ->
?LOG_DEBUG("didn't find a replication log for ~s", [Db#http_db.url]),
- #doc{id=?l2b(DocId)};
+ {error, not_found};
Doc ->
?LOG_DEBUG("found a replication log for ~s", [Db#http_db.url]),
- couch_doc:from_json_obj(Doc)
+ {ok, couch_doc:from_json_obj(Doc)}
end;
-open_replication_log(Db, RepId) ->
- DocId = ?l2b(?LOCAL_DOC_PREFIX ++ RepId),
+open_replication_log(Db, DocId) ->
case couch_db:open_doc(Db, DocId, []) of
{ok, Doc} ->
?LOG_DEBUG("found a replication log for ~s", [Db#db.name]),
- Doc;
+ {ok, Doc};
_ ->
?LOG_DEBUG("didn't find a replication log for ~s", [Db#db.name]),
- #doc{id=DocId}
+ {error, not_found}
end.
open_db(Props, UserCtx) ->