summaryrefslogtreecommitdiff
path: root/apps/couch/src/couch_rep.erl
diff options
context:
space:
mode:
authorAdam Kocoloski <adam@cloudant.com>2011-08-15 15:45:15 -0400
committerAdam Kocoloski <adam@cloudant.com>2011-08-15 15:45:15 -0400
commit6ffe1675dd7b004e48891956a6bdbe32899ce80c (patch)
tree57326d9a498481e65bb0db38c66daf10896801f1 /apps/couch/src/couch_rep.erl
parent52ff89ff7996e839b9e2f91fd76184d362a8aeb0 (diff)
parentfdd1a5d0bc48b49b0df5c9217beff9574011283c (diff)
Merge branch '11554-merge-couchdb-1.1'
Diffstat (limited to 'apps/couch/src/couch_rep.erl')
-rw-r--r--apps/couch/src/couch_rep.erl384
1 files changed, 242 insertions, 142 deletions
diff --git a/apps/couch/src/couch_rep.erl b/apps/couch/src/couch_rep.erl
index c804b49d..2d011aab 100644
--- a/apps/couch/src/couch_rep.erl
+++ b/apps/couch/src/couch_rep.erl
@@ -15,9 +15,14 @@
-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2,
code_change/3]).
--export([replicate/2, checkpoint/1]).
+-export([replicate/2, replicate/3, checkpoint/1]).
+-export([make_replication_id/2]).
+-export([start_replication/4, end_replication/1, get_result/4]).
-include("couch_db.hrl").
+-include_lib("ibrowse/include/ibrowse.hrl").
+
+-define(REP_ID_VERSION, 2).
-record(state, {
changes_feed,
@@ -47,7 +52,6 @@
committed_seq = 0,
stats = nil,
- doc_ids = nil,
source_db_update_notifier = nil,
target_db_update_notifier = nil
}).
@@ -61,40 +65,55 @@ replicate(Source, Target) when is_binary(Source), is_binary(Target) ->
replicate({[{<<"source">>, Source}, {<<"target">>, Target}]}, #user_ctx{});
%% function handling POST to _replicate
-replicate({Props}=PostBody, UserCtx) ->
- {BaseId, Extension} = make_replication_id(PostBody, UserCtx),
- Replicator = {BaseId ++ Extension,
- {gen_server, start_link, [?MODULE, [BaseId, PostBody, UserCtx], []]},
- temporary,
- 1,
- worker,
- [?MODULE]
- },
+replicate(PostBody, UserCtx) ->
+ replicate(PostBody, UserCtx, couch_replication_manager).
+replicate({Props}=PostBody, UserCtx, Module) ->
+ RepId = make_replication_id(PostBody, UserCtx),
case couch_util:get_value(<<"cancel">>, Props, false) of
true ->
- case supervisor:terminate_child(couch_rep_sup, BaseId ++ Extension) of
- {error, not_found} ->
- {error, not_found};
- ok ->
- ok = supervisor:delete_child(couch_rep_sup, BaseId ++ Extension),
- {ok, {cancelled, ?l2b(BaseId)}}
- end;
+ end_replication(RepId);
false ->
- Server = start_replication_server(Replicator),
+ Server = start_replication(PostBody, RepId, UserCtx, Module),
+ get_result(Server, RepId, PostBody, UserCtx)
+ end.
- case couch_util:get_value(<<"continuous">>, Props, false) of
- true ->
- {ok, {continuous, ?l2b(BaseId)}};
- false ->
- get_result(Server, PostBody, UserCtx)
+end_replication({BaseId, Extension}) ->
+ RepId = BaseId ++ Extension,
+ case supervisor:terminate_child(couch_rep_sup, RepId) of
+ {error, not_found} = R ->
+ R;
+ ok ->
+ case supervisor:delete_child(couch_rep_sup, RepId) of
+ ok ->
+ {ok, {cancelled, ?l2b(BaseId)}};
+ {error, not_found} ->
+ {ok, {cancelled, ?l2b(BaseId)}};
+ {error, _} = Error ->
+ Error
end
end.
+start_replication(RepDoc, {BaseId, Extension} = RepId, UserCtx, Module) ->
+ Replicator = {
+ BaseId ++ Extension,
+ {gen_server, start_link,
+ [?MODULE, [RepId, RepDoc, UserCtx, Module], []]},
+ temporary,
+ 1,
+ worker,
+ [?MODULE]
+ },
+ start_replication_server(Replicator).
+
checkpoint(Server) ->
gen_server:cast(Server, do_checkpoint).
-get_result(Server, PostBody, UserCtx) ->
+get_result(Server, {BaseId, _Extension}, {Props} = PostBody, UserCtx) ->
+ case couch_util:get_value(<<"continuous">>, Props, false) of
+ true ->
+ {ok, {continuous, ?l2b(BaseId)}};
+ false ->
try gen_server:call(Server, get_result, infinity) of
retry -> replicate(PostBody, UserCtx);
Else -> Else
@@ -105,6 +124,7 @@ get_result(Server, PostBody, UserCtx) ->
exit:{normal, {gen_server, call, [Server, get_result , infinity]}} ->
%% we made the call during terminate
replicate(PostBody, UserCtx)
+ end
end.
init(InitArgs) ->
@@ -115,13 +135,12 @@ init(InitArgs) ->
{stop, Error}
end.
-do_init([RepId, {PostProps}, UserCtx] = InitArgs) ->
+do_init([{BaseId, _Ext} = RepId, {PostProps}, UserCtx, Module] = InitArgs) ->
process_flag(trap_exit, true),
SourceProps = couch_util:get_value(<<"source">>, PostProps),
TargetProps = couch_util:get_value(<<"target">>, PostProps),
- DocIds = couch_util:get_value(<<"doc_ids">>, PostProps, nil),
Continuous = couch_util:get_value(<<"continuous">>, PostProps, false),
CreateTarget = couch_util:get_value(<<"create_target">>, PostProps, false),
@@ -133,29 +152,8 @@ do_init([RepId, {PostProps}, UserCtx] = InitArgs) ->
SourceInfo = dbinfo(Source),
TargetInfo = dbinfo(Target),
- case DocIds of
- List when is_list(List) ->
- % Fast replication using only a list of doc IDs to replicate.
- % Replication sessions, checkpoints and logs are not created
- % since the update sequence number of the source DB is not used
- % for determining which documents are copied into the target DB.
- SourceLog = nil,
- TargetLog = nil,
-
- StartSeq = nil,
- History = nil,
-
- ChangesFeed = nil,
- MissingRevs = nil,
-
- {ok, Reader} =
- couch_rep_reader:start_link(self(), Source, DocIds, PostProps);
-
- _ ->
- % Replication using the _changes API (DB sequence update numbers).
- SourceLog = open_replication_log(Source, RepId),
- TargetLog = open_replication_log(Target, RepId),
-
+ [SourceLog, TargetLog] = find_replication_logs(
+ [Source, Target], BaseId, {PostProps}, UserCtx),
{StartSeq, History} = compare_replication_logs(SourceLog, TargetLog),
{ok, ChangesFeed} =
@@ -163,9 +161,7 @@ do_init([RepId, {PostProps}, UserCtx] = InitArgs) ->
{ok, MissingRevs} =
couch_rep_missing_revs:start_link(self(), Target, ChangesFeed, PostProps),
{ok, Reader} =
- couch_rep_reader:start_link(self(), Source, MissingRevs, PostProps)
- end,
-
+ couch_rep_reader:start_link(self(), Source, MissingRevs, PostProps),
{ok, Writer} =
couch_rep_writer:start_link(self(), Target, Reader, PostProps),
@@ -176,10 +172,12 @@ do_init([RepId, {PostProps}, UserCtx] = InitArgs) ->
ets:insert(Stats, {docs_written, 0}),
ets:insert(Stats, {doc_write_failures, 0}),
- {ShortId, _} = lists:split(6, RepId),
+ {ShortId, _} = lists:split(6, BaseId),
couch_task_status:add_task("Replication", io_lib:format("~s: ~s -> ~s",
[ShortId, dbname(Source), dbname(Target)]), "Starting"),
+ Module:replication_started(RepId),
+
State = #state{
changes_feed = ChangesFeed,
missing_revs = MissingRevs,
@@ -202,7 +200,6 @@ do_init([RepId, {PostProps}, UserCtx] = InitArgs) ->
rep_starttime = httpd_util:rfc1123_date(),
src_starttime = couch_util:get_value(instance_start_time, SourceInfo),
tgt_starttime = couch_util:get_value(instance_start_time, TargetInfo),
- doc_ids = DocIds,
source_db_update_notifier = source_db_update_notifier(Source),
target_db_update_notifier = target_db_update_notifier(Target)
},
@@ -274,24 +271,24 @@ handle_info({'EXIT', _Pid, {Err, Reason}}, State) when Err == source_error;
handle_info({'EXIT', _Pid, Reason}, State) ->
{stop, Reason, State}.
-terminate(normal, #state{checkpoint_scheduled=nil} = State) ->
- do_terminate(State);
+terminate(normal, #state{checkpoint_scheduled=nil, init_args=[RepId, _, _, Module]} = State) ->
+ do_terminate(State),
+ Module:replication_completed(RepId);
-terminate(normal, State) ->
+terminate(normal, #state{init_args=[RepId, _, _, Module]} = State) ->
timer:cancel(State#state.checkpoint_scheduled),
- do_terminate(do_checkpoint(State));
+ do_terminate(do_checkpoint(State)),
+ Module:replication_completed(RepId);
-terminate(Reason, State) ->
- #state{
- listeners = Listeners,
- source = Source,
- target = Target,
- stats = Stats
- } = State,
+terminate(shutdown, #state{listeners = Listeners} = State) ->
+ % continuous replication stopped
+ [gen_server:reply(L, {ok, stopped}) || L <- Listeners],
+ terminate_cleanup(State);
+
+terminate(Reason, #state{listeners = Listeners, init_args=[RepId, _, _, Module]} = State) ->
[gen_server:reply(L, {error, Reason}) || L <- Listeners],
- ets:delete(Stats),
- close_db(Target),
- close_db(Source).
+ terminate_cleanup(State),
+ Module:replication_error(RepId, Reason).
code_change(_OldVsn, State, _Extra) ->
{ok, State}.
@@ -321,7 +318,14 @@ start_replication_server(Replicator) ->
throw({db_not_found, <<"could not open ", DbUrl/binary>>});
{error, {unauthorized, DbUrl}} ->
throw({unauthorized,
- <<"unauthorized to access database ", DbUrl/binary>>})
+ <<"unauthorized to access or create database ", DbUrl/binary>>});
+ {error, {'EXIT', {badarg,
+ [{erlang, apply, [gen_server, start_link, undefined]} | _]}}} ->
+ % Clause to deal with a change in the supervisor module introduced
+ % in R14B02. For more details consult the thread at:
+ % http://erlang.org/pipermail/erlang-bugs/2011-March/002273.html
+ _ = supervisor:delete_child(couch_rep_sup, RepId),
+ start_replication_server(Replicator)
end;
{error, {already_started, Pid}} ->
?LOG_DEBUG("replication ~p already running at ~p", [RepId, Pid]),
@@ -330,7 +334,7 @@ start_replication_server(Replicator) ->
throw({db_not_found, <<"could not open ", DbUrl/binary>>});
{error, {{unauthorized, DbUrl}, _}} ->
throw({unauthorized,
- <<"unauthorized to access database ", DbUrl/binary>>})
+ <<"unauthorized to access or create database ", DbUrl/binary>>})
end.
compare_replication_logs(SrcDoc, TgtDoc) ->
@@ -390,30 +394,11 @@ dbname(#db{name = Name}) ->
dbinfo(#http_db{} = Db) ->
{DbProps} = couch_rep_httpc:request(Db),
- [{list_to_existing_atom(?b2l(K)), V} || {K,V} <- DbProps];
+ [{couch_util:to_existing_atom(K), V} || {K,V} <- DbProps];
dbinfo(Db) ->
{ok, Info} = couch_db:get_db_info(Db),
Info.
-do_terminate(#state{doc_ids=DocIds} = State) when is_list(DocIds) ->
- #state{
- listeners = Listeners,
- rep_starttime = ReplicationStartTime,
- stats = Stats
- } = State,
-
- RepByDocsJson = {[
- {<<"start_time">>, ?l2b(ReplicationStartTime)},
- {<<"end_time">>, ?l2b(httpd_util:rfc1123_date())},
- {<<"docs_read">>, ets:lookup_element(Stats, docs_read, 2)},
- {<<"docs_written">>, ets:lookup_element(Stats, docs_written, 2)},
- {<<"doc_write_failures">>,
- ets:lookup_element(Stats, doc_write_failures, 2)}
- ]},
-
- terminate_cleanup(State),
- [gen_server:reply(L, {ok, RepByDocsJson}) || L <- lists:reverse(Listeners)];
-
do_terminate(State) ->
#state{
checkpoint_history = CheckpointHistory,
@@ -475,7 +460,7 @@ has_session_id(SessionId, [{Props} | Rest]) ->
has_session_id(SessionId, Rest)
end.
-maybe_append_options(Options, Props) ->
+maybe_append_options(Options, {Props}) ->
lists:foldl(fun(Option, Acc) ->
Acc ++
case couch_util:get_value(Option, Props, false) of
@@ -486,13 +471,39 @@ maybe_append_options(Options, Props) ->
end
end, [], Options).
-make_replication_id({Props}, UserCtx) ->
- %% funky algorithm to preserve backwards compatibility
+make_replication_id(RepProps, UserCtx) ->
+ BaseId = make_replication_id(RepProps, UserCtx, ?REP_ID_VERSION),
+ Extension = maybe_append_options(
+ [<<"continuous">>, <<"create_target">>], RepProps),
+ {BaseId, Extension}.
+
+% Versioned clauses for generating replication ids
+% If a change is made to how replications are identified
+% add a new clause and increase ?REP_ID_VERSION at the top
+make_replication_id({Props}, UserCtx, 2) ->
+ {ok, HostName} = inet:gethostname(),
+ Port = case (catch mochiweb_socket_server:get(couch_httpd, port)) of
+ P when is_number(P) ->
+ P;
+ _ ->
+ % On restart we might be called before the couch_httpd process is
+ % started.
+ % TODO: we might be under an SSL socket server only, or both under
+ % SSL and a non-SSL socket.
+ % ... mochiweb_socket_server:get(https, port)
+ list_to_integer(couch_config:get("httpd", "port", "5984"))
+ end,
+ Src = get_rep_endpoint(UserCtx, couch_util:get_value(<<"source">>, Props)),
+ Tgt = get_rep_endpoint(UserCtx, couch_util:get_value(<<"target">>, Props)),
+ maybe_append_filters({Props}, [HostName, Port, Src, Tgt], UserCtx);
+make_replication_id({Props}, UserCtx, 1) ->
{ok, HostName} = inet:gethostname(),
- % Port = mochiweb_socket_server:get(couch_httpd, port),
Src = get_rep_endpoint(UserCtx, couch_util:get_value(<<"source">>, Props)),
Tgt = get_rep_endpoint(UserCtx, couch_util:get_value(<<"target">>, Props)),
- Base = [HostName, Src, Tgt] ++
+ maybe_append_filters({Props}, [HostName, Src, Tgt], UserCtx).
+
+maybe_append_filters({Props}, Base, UserCtx) ->
+ Base2 = Base ++
case couch_util:get_value(<<"filter">>, Props) of
undefined ->
case couch_util:get_value(<<"doc_ids">>, Props) of
@@ -502,11 +513,47 @@ make_replication_id({Props}, UserCtx) ->
[DocIds]
end;
Filter ->
- [Filter, couch_util:get_value(<<"query_params">>, Props, {[]})]
+ [filter_code(Filter, Props, UserCtx),
+ couch_util:get_value(<<"query_params">>, Props, {[]})]
end,
- Extension = maybe_append_options(
- [<<"continuous">>, <<"create_target">>], Props),
- {couch_util:to_hex(couch_util:md5(term_to_binary(Base))), Extension}.
+ couch_util:to_hex(couch_util:md5(term_to_binary(Base2))).
+
+filter_code(Filter, Props, UserCtx) ->
+ {DDocName, FilterName} =
+ case re:run(Filter, "(.*?)/(.*)", [{capture, [1, 2], binary}]) of
+ {match, [DDocName0, FilterName0]} ->
+ {DDocName0, FilterName0};
+ _ ->
+ throw({error, <<"Invalid filter. Must match `ddocname/filtername`.">>})
+ end,
+ ProxyParams = parse_proxy_params(
+ couch_util:get_value(<<"proxy">>, Props, [])),
+ DbName = couch_util:get_value(<<"source">>, Props),
+ Source = try
+ open_db(DbName, UserCtx, ProxyParams)
+ catch
+ _Tag:DbError ->
+ DbErrorMsg = io_lib:format("Could not open source database `~s`: ~s",
+ [couch_util:url_strip_password(DbName), couch_util:to_binary(DbError)]),
+ throw({error, iolist_to_binary(DbErrorMsg)})
+ end,
+ try
+ Body = case (catch open_doc(Source, <<"_design/", DDocName/binary>>)) of
+ {ok, #doc{body = Body0}} ->
+ Body0;
+ DocError ->
+ DocErrorMsg = io_lib:format(
+ "Couldn't open document `_design/~s` from source "
+ "database `~s`: ~s",
+ [dbname(Source), DDocName, couch_util:to_binary(DocError)]),
+ throw({error, iolist_to_binary(DocErrorMsg)})
+ end,
+ Code = couch_util:get_nested_json_value(
+ Body, [<<"filters">>, FilterName]),
+ re:replace(Code, "^\s*(.*?)\s*$", "\\1", [{return, binary}])
+ after
+ close_db(Source)
+ end.
maybe_add_trailing_slash(Url) ->
re:replace(Url, "[^/]$", "&/", [{return, list}]).
@@ -528,27 +575,54 @@ get_rep_endpoint(_UserCtx, <<"https://",_/binary>>=Url) ->
get_rep_endpoint(UserCtx, <<DbName/binary>>) ->
{local, DbName, UserCtx}.
-open_replication_log(#http_db{}=Db, RepId) ->
- DocId = ?LOCAL_DOC_PREFIX ++ RepId,
- Req = Db#http_db{resource=couch_util:url_encode(DocId)},
+find_replication_logs(DbList, RepId, RepProps, UserCtx) ->
+ LogId = ?l2b(?LOCAL_DOC_PREFIX ++ RepId),
+ fold_replication_logs(DbList, ?REP_ID_VERSION,
+ LogId, LogId, RepProps, UserCtx, []).
+
+% Accumulate the replication logs
+% Falls back to older log document ids and migrates them
+fold_replication_logs([], _Vsn, _LogId, _NewId, _RepProps, _UserCtx, Acc) ->
+ lists:reverse(Acc);
+fold_replication_logs([Db|Rest]=Dbs, Vsn, LogId, NewId,
+ RepProps, UserCtx, Acc) ->
+ case open_replication_log(Db, LogId) of
+ {error, not_found} when Vsn > 1 ->
+ OldRepId = make_replication_id(RepProps, UserCtx, Vsn - 1),
+ fold_replication_logs(Dbs, Vsn - 1,
+ ?l2b(?LOCAL_DOC_PREFIX ++ OldRepId), NewId, RepProps, UserCtx, Acc);
+ {error, not_found} ->
+ fold_replication_logs(Rest, ?REP_ID_VERSION, NewId, NewId,
+ RepProps, UserCtx, [#doc{id=NewId}|Acc]);
+ {ok, Doc} when LogId =:= NewId ->
+ fold_replication_logs(Rest, ?REP_ID_VERSION, NewId, NewId,
+ RepProps, UserCtx, [Doc|Acc]);
+ {ok, Doc} ->
+ MigratedLog = #doc{id=NewId,body=Doc#doc.body},
+ fold_replication_logs(Rest, ?REP_ID_VERSION, NewId, NewId,
+ RepProps, UserCtx, [MigratedLog|Acc])
+ end.
+
+open_replication_log(Db, DocId) ->
+ case open_doc(Db, DocId) of
+ {ok, Doc} ->
+ ?LOG_DEBUG("found a replication log for ~s", [dbname(Db)]),
+ {ok, Doc};
+ _ ->
+ ?LOG_DEBUG("didn't find a replication log for ~s", [dbname(Db)]),
+ {error, not_found}
+ end.
+
+open_doc(#http_db{} = Db, DocId) ->
+ Req = Db#http_db{resource = couch_util:encode_doc_id(DocId)},
case couch_rep_httpc:request(Req) of
{[{<<"error">>, _}, {<<"reason">>, _}]} ->
- ?LOG_DEBUG("didn't find a replication log for ~s", [Db#http_db.url]),
- #doc{id=?l2b(DocId)};
+ {error, not_found};
Doc ->
- ?LOG_DEBUG("found a replication log for ~s", [Db#http_db.url]),
- couch_doc:from_json_obj(Doc)
+ {ok, couch_doc:from_json_obj(Doc)}
end;
-open_replication_log(Db, RepId) ->
- DocId = ?l2b(?LOCAL_DOC_PREFIX ++ RepId),
- case couch_db:open_doc(Db, DocId, []) of
- {ok, Doc} ->
- ?LOG_DEBUG("found a replication log for ~s", [Db#db.name]),
- Doc;
- _ ->
- ?LOG_DEBUG("didn't find a replication log for ~s", [Db#db.name]),
- #doc{id=DocId}
- end.
+open_doc(Db, DocId) ->
+ couch_db:open_doc(Db, DocId).
open_db(Props, UserCtx, ProxyParams) ->
open_db(Props, UserCtx, ProxyParams, false).
@@ -575,18 +649,18 @@ open_db(<<"https://",_/binary>>=Url, _, ProxyParams, CreateTarget) ->
open_db({[{<<"url">>,Url}]}, [], ProxyParams, CreateTarget);
open_db(<<DbName/binary>>, UserCtx, _ProxyParams, CreateTarget) ->
try
- case CreateTarget of
- true ->
- ok = couch_httpd:verify_is_server_admin(UserCtx),
- couch_server:create(DbName, [{user_ctx, UserCtx}]);
+ case CreateTarget of
+ true ->
+ ok = couch_httpd:verify_is_server_admin(UserCtx),
+ couch_server:create(DbName, [{user_ctx, UserCtx}]);
false ->
ok
- end,
+ end,
- case couch_db:open(DbName, [{user_ctx, UserCtx}]) of
- {ok, Db} ->
- couch_db:monitor(Db),
- Db;
+ case couch_db:open(DbName, [{user_ctx, UserCtx}]) of
+ {ok, Db} ->
+ couch_db:monitor(Db),
+ Db;
{not_found, no_db_file} ->
throw({db_not_found, DbName})
end
@@ -619,32 +693,54 @@ do_checkpoint(State) ->
rep_starttime = ReplicationStartTime,
src_starttime = SrcInstanceStartTime,
tgt_starttime = TgtInstanceStartTime,
- stats = Stats
+ stats = Stats,
+ init_args = [_RepId, {RepDoc} | _]
} = State,
case commit_to_both(Source, Target, NewSeqNum) of
{SrcInstanceStartTime, TgtInstanceStartTime} ->
?LOG_INFO("recording a checkpoint for ~s -> ~s at source update_seq ~p",
[dbname(Source), dbname(Target), NewSeqNum]),
+ EndTime = ?l2b(httpd_util:rfc1123_date()),
+ StartTime = ?l2b(ReplicationStartTime),
+ DocsRead = ets:lookup_element(Stats, docs_read, 2),
+ DocsWritten = ets:lookup_element(Stats, docs_written, 2),
+ DocWriteFailures = ets:lookup_element(Stats, doc_write_failures, 2),
NewHistoryEntry = {[
{<<"session_id">>, SessionId},
- {<<"start_time">>, list_to_binary(ReplicationStartTime)},
- {<<"end_time">>, list_to_binary(httpd_util:rfc1123_date())},
+ {<<"start_time">>, StartTime},
+ {<<"end_time">>, EndTime},
{<<"start_last_seq">>, StartSeqNum},
{<<"end_last_seq">>, NewSeqNum},
{<<"recorded_seq">>, NewSeqNum},
{<<"missing_checked">>, ets:lookup_element(Stats, total_revs, 2)},
{<<"missing_found">>, ets:lookup_element(Stats, missing_revs, 2)},
- {<<"docs_read">>, ets:lookup_element(Stats, docs_read, 2)},
- {<<"docs_written">>, ets:lookup_element(Stats, docs_written, 2)},
- {<<"doc_write_failures">>,
- ets:lookup_element(Stats, doc_write_failures, 2)}
+ {<<"docs_read">>, DocsRead},
+ {<<"docs_written">>, DocsWritten},
+ {<<"doc_write_failures">>, DocWriteFailures}
]},
- % limit history to 50 entries
- NewRepHistory = {[
+ BaseHistory = [
{<<"session_id">>, SessionId},
{<<"source_last_seq">>, NewSeqNum},
- {<<"history">>, lists:sublist([NewHistoryEntry | OldHistory], 50)}
- ]},
+ {<<"replication_id_version">>, ?REP_ID_VERSION}
+ ] ++ case couch_util:get_value(<<"doc_ids">>, RepDoc) of
+ undefined ->
+ [];
+ DocIds when is_list(DocIds) ->
+ % backwards compatibility with the result of a replication by
+ % doc IDs in versions 0.11.x and 1.0.x
+ [
+ {<<"start_time">>, StartTime},
+ {<<"end_time">>, EndTime},
+ {<<"docs_read">>, DocsRead},
+ {<<"docs_written">>, DocsWritten},
+ {<<"doc_write_failures">>, DocWriteFailures}
+ ]
+ end,
+ % limit history to 50 entries
+ NewRepHistory = {
+ BaseHistory ++
+ [{<<"history">>, lists:sublist([NewHistoryEntry | OldHistory], 50)}]
+ },
try
{SrcRevPos,SrcRevId} =
@@ -760,9 +856,9 @@ ensure_full_commit(Source, RequiredSeq) ->
InstanceStartTime
end.
-update_local_doc(#http_db{} = Db, #doc{id=DocId} = Doc) ->
+update_local_doc(#http_db{} = Db, Doc) ->
Req = Db#http_db{
- resource = couch_util:url_encode(DocId),
+ resource = couch_util:encode_doc_id(Doc),
method = put,
body = couch_doc:to_json_obj(Doc, [attachments]),
headers = [{"x-couch-full-commit", "false"} | Db#http_db.headers]
@@ -787,9 +883,13 @@ parse_proxy_params(ProxyUrl) when is_binary(ProxyUrl) ->
parse_proxy_params([]) ->
[];
parse_proxy_params(ProxyUrl) ->
- {url, _, Base, Port, User, Passwd, _Path, _Proto} =
- ibrowse_lib:parse_url(ProxyUrl),
- [{proxy_host, Base}, {proxy_port, Port}] ++
+ #url{
+ host = Host,
+ port = Port,
+ username = User,
+ password = Passwd
+ } = ibrowse_lib:parse_url(ProxyUrl),
+ [{proxy_host, Host}, {proxy_port, Port}] ++
case is_list(User) andalso is_list(Passwd) of
false ->
[];