summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/couchdb/couch_changes.erl2
-rw-r--r--src/couchdb/couch_rep.erl105
-rw-r--r--src/couchdb/couch_rep_changes_feed.erl107
-rw-r--r--src/couchdb/couch_rep_reader.erl70
-rw-r--r--src/couchdb/couch_rep_writer.erl9
5 files changed, 112 insertions, 181 deletions
diff --git a/src/couchdb/couch_changes.erl b/src/couchdb/couch_changes.erl
index ec0beb57..1e53161b 100644
--- a/src/couchdb/couch_changes.erl
+++ b/src/couchdb/couch_changes.erl
@@ -119,6 +119,8 @@ os_filter_fun(FilterName, Style, Req, Db) ->
"filter parameter must be of the form `designname/filtername`"})
end.
+builtin_filter_fun("_doc_ids", Style, {json_req, {Props}}, _Db) ->
+ filter_docids(couch_util:get_value(<<"doc_ids">>, Props), Style);
builtin_filter_fun("_doc_ids", Style, #httpd{method='POST'}=Req, _Db) ->
{Props} = couch_httpd:json_body_obj(Req),
DocIds = couch_util:get_value(<<"doc_ids">>, Props, nil),
diff --git a/src/couchdb/couch_rep.erl b/src/couchdb/couch_rep.erl
index 88912fbf..d35471c5 100644
--- a/src/couchdb/couch_rep.erl
+++ b/src/couchdb/couch_rep.erl
@@ -53,7 +53,6 @@
committed_seq = 0,
stats = nil,
- doc_ids = nil,
rep_doc = nil
}).
@@ -129,7 +128,6 @@ do_init([RepId, {PostProps} = RepDoc, UserCtx] = InitArgs) ->
SourceProps = couch_util:get_value(<<"source">>, PostProps),
TargetProps = couch_util:get_value(<<"target">>, PostProps),
- DocIds = couch_util:get_value(<<"doc_ids">>, PostProps, nil),
Continuous = couch_util:get_value(<<"continuous">>, PostProps, false),
CreateTarget = couch_util:get_value(<<"create_target">>, PostProps, false),
@@ -143,40 +141,16 @@ do_init([RepId, {PostProps} = RepDoc, UserCtx] = InitArgs) ->
maybe_set_triggered(RepDoc, RepId),
- case DocIds of
- List when is_list(List) ->
- % Fast replication using only a list of doc IDs to replicate.
- % Replication sessions, checkpoints and logs are not created
- % since the update sequence number of the source DB is not used
- % for determining which documents are copied into the target DB.
- SourceLog = nil,
- TargetLog = nil,
+ [SourceLog, TargetLog] = find_replication_logs(
+ [Source, Target], RepId, {PostProps}, UserCtx),
+ {StartSeq, History} = compare_replication_logs(SourceLog, TargetLog),
- StartSeq = nil,
- History = nil,
-
- ChangesFeed = nil,
- MissingRevs = nil,
-
- {ok, Reader} =
- couch_rep_reader:start_link(self(), Source, DocIds, PostProps);
-
- _ ->
- % Replication using the _changes API (DB sequence update numbers).
-
- [SourceLog, TargetLog] = find_replication_logs(
- [Source, Target], RepId, {PostProps}, UserCtx),
-
- {StartSeq, History} = compare_replication_logs(SourceLog, TargetLog),
-
- {ok, ChangesFeed} =
+ {ok, ChangesFeed} =
couch_rep_changes_feed:start_link(self(), Source, StartSeq, PostProps),
- {ok, MissingRevs} =
+ {ok, MissingRevs} =
couch_rep_missing_revs:start_link(self(), Target, ChangesFeed, PostProps),
- {ok, Reader} =
- couch_rep_reader:start_link(self(), Source, MissingRevs, PostProps)
- end,
-
+ {ok, Reader} =
+ couch_rep_reader:start_link(self(), Source, MissingRevs, PostProps),
{ok, Writer} =
couch_rep_writer:start_link(self(), Target, Reader, PostProps),
@@ -213,7 +187,6 @@ do_init([RepId, {PostProps} = RepDoc, UserCtx] = InitArgs) ->
rep_starttime = httpd_util:rfc1123_date(),
src_starttime = couch_util:get_value(instance_start_time, SourceInfo),
tgt_starttime = couch_util:get_value(instance_start_time, TargetInfo),
- doc_ids = DocIds,
rep_doc = RepDoc
},
{ok, State}.
@@ -390,25 +363,6 @@ dbinfo(Db) ->
{ok, Info} = couch_db:get_db_info(Db),
Info.
-do_terminate(#state{doc_ids=DocIds} = State) when is_list(DocIds) ->
- #state{
- listeners = Listeners,
- rep_starttime = ReplicationStartTime,
- stats = Stats
- } = State,
-
- RepByDocsJson = {[
- {<<"start_time">>, ?l2b(ReplicationStartTime)},
- {<<"end_time">>, ?l2b(httpd_util:rfc1123_date())},
- {<<"docs_read">>, ets:lookup_element(Stats, docs_read, 2)},
- {<<"docs_written">>, ets:lookup_element(Stats, docs_written, 2)},
- {<<"doc_write_failures">>,
- ets:lookup_element(Stats, doc_write_failures, 2)}
- ]},
-
- terminate_cleanup(State),
- [gen_server:reply(L, {ok, RepByDocsJson}) || L <- lists:reverse(Listeners)];
-
do_terminate(State) ->
#state{
checkpoint_history = CheckpointHistory,
@@ -659,32 +613,53 @@ do_checkpoint(State) ->
rep_starttime = ReplicationStartTime,
src_starttime = SrcInstanceStartTime,
tgt_starttime = TgtInstanceStartTime,
- stats = Stats
+ stats = Stats,
+ rep_doc = {RepDoc}
} = State,
case commit_to_both(Source, Target, NewSeqNum) of
{SrcInstanceStartTime, TgtInstanceStartTime} ->
?LOG_INFO("recording a checkpoint for ~s -> ~s at source update_seq ~p",
[dbname(Source), dbname(Target), NewSeqNum]),
+ EndTime = ?l2b(httpd_util:rfc1123_date()),
+ StartTime = ?l2b(ReplicationStartTime),
+ DocsRead = ets:lookup_element(Stats, docs_read, 2),
+ DocsWritten = ets:lookup_element(Stats, docs_written, 2),
+ DocWriteFailures = ets:lookup_element(Stats, doc_write_failures, 2),
NewHistoryEntry = {[
{<<"session_id">>, SessionId},
- {<<"start_time">>, list_to_binary(ReplicationStartTime)},
- {<<"end_time">>, list_to_binary(httpd_util:rfc1123_date())},
+ {<<"start_time">>, StartTime},
+ {<<"end_time">>, EndTime},
{<<"start_last_seq">>, StartSeqNum},
{<<"end_last_seq">>, NewSeqNum},
{<<"recorded_seq">>, NewSeqNum},
{<<"missing_checked">>, ets:lookup_element(Stats, total_revs, 2)},
{<<"missing_found">>, ets:lookup_element(Stats, missing_revs, 2)},
- {<<"docs_read">>, ets:lookup_element(Stats, docs_read, 2)},
- {<<"docs_written">>, ets:lookup_element(Stats, docs_written, 2)},
- {<<"doc_write_failures">>,
- ets:lookup_element(Stats, doc_write_failures, 2)}
+ {<<"docs_read">>, DocsRead},
+ {<<"docs_written">>, DocsWritten},
+ {<<"doc_write_failures">>, DocWriteFailures}
]},
- % limit history to 50 entries
- NewRepHistory = {[
+ BaseHistory = [
{<<"session_id">>, SessionId},
- {<<"source_last_seq">>, NewSeqNum},
- {<<"history">>, lists:sublist([NewHistoryEntry | OldHistory], 50)}
- ]},
+ {<<"source_last_seq">>, NewSeqNum}
+ ] ++ case couch_util:get_value(<<"doc_ids">>, RepDoc) of
+ undefined ->
+ [];
+ DocIds when is_list(DocIds) ->
+ % backwards compatibility with the result of a replication by
+ % doc IDs in versions 0.11.x and 1.0.x
+ [
+ {<<"start_time">>, StartTime},
+ {<<"end_time">>, EndTime},
+ {<<"docs_read">>, DocsRead},
+ {<<"docs_written">>, DocsWritten},
+ {<<"doc_write_failures">>, DocWriteFailures}
+ ]
+ end,
+ % limit history to 50 entries
+ NewRepHistory = {
+ BaseHistory ++
+ [{<<"history">>, lists:sublist([NewHistoryEntry | OldHistory], 50)}]
+ },
try
{SrcRevPos,SrcRevId} =
diff --git a/src/couchdb/couch_rep_changes_feed.erl b/src/couchdb/couch_rep_changes_feed.erl
index 98e929fe..246e82c0 100644
--- a/src/couchdb/couch_rep_changes_feed.erl
+++ b/src/couchdb/couch_rep_changes_feed.erl
@@ -18,6 +18,7 @@
-export([start_link/4, next/1, stop/1]).
-define(BUFFER_SIZE, 1000).
+-define(DOC_IDS_FILTER_NAME, "_doc_ids").
-include("couch_db.hrl").
-include("../ibrowse/ibrowse.hrl").
@@ -36,6 +37,11 @@
rows = queue:new()
}).
+-import(couch_util, [
+ get_value/2,
+ get_value/3
+]).
+
start_link(Parent, Source, StartSeq, PostProps) ->
gen_server:start_link(?MODULE, [Parent, Source, StartSeq, PostProps], []).
@@ -46,9 +52,9 @@ stop(Server) ->
catch gen_server:call(Server, stop),
ok.
-init([Parent, #http_db{}=Source, Since, PostProps]) ->
+init([Parent, #http_db{headers = Headers0} = Source, Since, PostProps]) ->
process_flag(trap_exit, true),
- Feed = case couch_util:get_value(<<"continuous">>, PostProps, false) of
+ Feed = case get_value(<<"continuous">>, PostProps, false) of
false ->
normal;
true ->
@@ -60,33 +66,24 @@ init([Parent, #http_db{}=Source, Since, PostProps]) ->
{"since", Since},
{"feed", Feed}
],
- QS = case couch_util:get_value(<<"filter">>, PostProps) of
+ {QS, Method, Body, Headers} = case get_value(<<"doc_ids">>, PostProps) of
undefined ->
- BaseQS;
- FilterName ->
- {Params} = couch_util:get_value(<<"query_params">>, PostProps, {[]}),
- lists:foldr(
- fun({K, V}, QSAcc) ->
- Ks = couch_util:to_list(K),
- case proplists:is_defined(Ks, QSAcc) of
- true ->
- QSAcc;
- false ->
- [{Ks, V} | QSAcc]
- end
- end,
- [{"filter", FilterName} | BaseQS],
- Params
- )
+ {maybe_add_filter_qs_params(PostProps, BaseQS), get, nil, Headers0};
+ DocIds when is_list(DocIds) ->
+ Headers1 = [{"Content-Type", "application/json"} | Headers0],
+ QS1 = [{"filter", ?l2b(?DOC_IDS_FILTER_NAME)} | BaseQS],
+ {QS1, post, {[{<<"doc_ids">>, DocIds}]}, Headers1}
end,
Pid = couch_rep_httpc:spawn_link_worker_process(Source),
Req = Source#http_db{
+ method = Method,
+ body = Body,
resource = "_changes",
qs = QS,
conn = Pid,
options = [{stream_to, {self(), once}}] ++
lists:keydelete(inactivity_timeout, 1, Source#http_db.options),
- headers = Source#http_db.headers -- [{"Accept-Encoding", "gzip"}]
+ headers = Headers -- [{"Accept-Encoding", "gzip"}]
},
{ibrowse_req_id, ReqId} = couch_rep_httpc:request(Req),
Args = [Parent, Req, Since, PostProps],
@@ -123,11 +120,17 @@ init([Parent, #http_db{}=Source, Since, PostProps]) ->
init([_Parent, Source, Since, PostProps] = InitArgs) ->
process_flag(trap_exit, true),
Server = self(),
+ Filter = case get_value(<<"doc_ids">>, PostProps) of
+ undefined ->
+ ?b2l(get_value(<<"filter">>, PostProps, <<>>));
+ DocIds when is_list(DocIds) ->
+ ?DOC_IDS_FILTER_NAME
+ end,
ChangesArgs = #changes_args{
style = all_docs,
since = Since,
- filter = ?b2l(couch_util:get_value(<<"filter">>, PostProps, <<>>)),
- feed = case couch_util:get_value(<<"continuous">>, PostProps, false) of
+ filter = Filter,
+ feed = case get_value(<<"continuous">>, PostProps, false) of
true ->
"continuous";
false ->
@@ -138,7 +141,7 @@ init([_Parent, Source, Since, PostProps] = InitArgs) ->
ChangesPid = spawn_link(fun() ->
ChangesFeedFun = couch_changes:handle_changes(
ChangesArgs,
- {json_req, filter_json_req(Source, PostProps)},
+ {json_req, filter_json_req(Filter, Source, PostProps)},
Source
),
ChangesFeedFun(fun({change, Change, _}, _) ->
@@ -149,29 +152,49 @@ init([_Parent, Source, Since, PostProps] = InitArgs) ->
end),
{ok, #state{changes_loop=ChangesPid, init_args=InitArgs}}.
-filter_json_req(Db, PostProps) ->
- case couch_util:get_value(<<"filter">>, PostProps) of
+maybe_add_filter_qs_params(PostProps, BaseQS) ->
+ case get_value(<<"filter">>, PostProps) of
undefined ->
- {[]};
+ BaseQS;
FilterName ->
- {Query} = couch_util:get_value(<<"query_params">>, PostProps, {[]}),
- {ok, Info} = couch_db:get_db_info(Db),
- % simulate a request to db_name/_changes
- {[
- {<<"info">>, {Info}},
- {<<"id">>, null},
- {<<"method">>, 'GET'},
- {<<"path">>, [couch_db:name(Db), <<"_changes">>]},
- {<<"query">>, {[{<<"filter">>, FilterName} | Query]}},
- {<<"headers">>, []},
- {<<"body">>, []},
- {<<"peer">>, <<"replicator">>},
- {<<"form">>, []},
- {<<"cookie">>, []},
- {<<"userCtx">>, couch_util:json_user_ctx(Db)}
- ]}
+ {Params} = get_value(<<"query_params">>, PostProps, {[]}),
+ lists:foldr(
+ fun({K, V}, QSAcc) ->
+ Ks = couch_util:to_list(K),
+ case proplists:is_defined(Ks, QSAcc) of
+ true ->
+ QSAcc;
+ false ->
+ [{Ks, V} | QSAcc]
+ end
+ end,
+ [{"filter", FilterName} | BaseQS],
+ Params
+ )
end.
+filter_json_req([], _Db, _PostProps) ->
+ {[]};
+filter_json_req(?DOC_IDS_FILTER_NAME, _Db, PostProps) ->
+ {[{<<"doc_ids">>, get_value(<<"doc_ids">>, PostProps)}]};
+filter_json_req(FilterName, Db, PostProps) ->
+ {Query} = get_value(<<"query_params">>, PostProps, {[]}),
+ {ok, Info} = couch_db:get_db_info(Db),
+ % simulate a request to db_name/_changes
+ {[
+ {<<"info">>, {Info}},
+ {<<"id">>, null},
+ {<<"method">>, 'GET'},
+ {<<"path">>, [couch_db:name(Db), <<"_changes">>]},
+ {<<"query">>, {[{<<"filter">>, FilterName} | Query]}},
+ {<<"headers">>, []},
+ {<<"body">>, []},
+ {<<"peer">>, <<"replicator">>},
+ {<<"form">>, []},
+ {<<"cookie">>, []},
+ {<<"userCtx">>, couch_util:json_user_ctx(Db)}
+ ]}.
+
handle_call({add_change, Row}, From, State) ->
handle_add_change(Row, From, State);
diff --git a/src/couchdb/couch_rep_reader.erl b/src/couchdb/couch_rep_reader.erl
index 4f81c8e4..bdef3dfc 100644
--- a/src/couchdb/couch_rep_reader.erl
+++ b/src/couchdb/couch_rep_reader.erl
@@ -43,15 +43,13 @@
opened_seqs = []
}).
-start_link(Parent, Source, MissingRevs_or_DocIds, PostProps) ->
- gen_server:start_link(
- ?MODULE, [Parent, Source, MissingRevs_or_DocIds, PostProps], []
- ).
+start_link(Parent, Source, MissingRevs, PostProps) ->
+ gen_server:start_link(?MODULE, [Parent, Source, MissingRevs, PostProps], []).
next(Pid) ->
gen_server:call(Pid, next_docs, infinity).
-init([Parent, Source, MissingRevs_or_DocIds, _PostProps]) ->
+init([Parent, Source, MissingRevs, _PostProps]) ->
process_flag(trap_exit, true),
if is_record(Source, http_db) ->
#url{host=Host, port=Port} = ibrowse_lib:parse_url(Source#http_db.url),
@@ -59,15 +57,7 @@ init([Parent, Source, MissingRevs_or_DocIds, _PostProps]) ->
ibrowse:set_max_pipeline_size(Host, Port, ?MAX_PIPELINE_SIZE);
true -> ok end,
Self = self(),
- ReaderLoop = spawn_link(
- fun() -> reader_loop(Self, Source, MissingRevs_or_DocIds) end
- ),
- MissingRevs = case MissingRevs_or_DocIds of
- Pid when is_pid(Pid) ->
- Pid;
- _ListDocIds ->
- nil
- end,
+ ReaderLoop = spawn_link(fun() -> reader_loop(Self, Source, MissingRevs) end),
State = #state{
parent = Parent,
source = Source,
@@ -183,8 +173,6 @@ handle_reader_loop_complete(#state{monitor_count=0} = State) ->
handle_reader_loop_complete(State) ->
{noreply, State#state{complete = waiting_on_monitors}}.
-calculate_new_high_seq(#state{missing_revs=nil}) ->
- nil;
calculate_new_high_seq(#state{requested_seqs=[], opened_seqs=[Open|_]}) ->
Open;
calculate_new_high_seq(#state{requested_seqs=[Req|_], opened_seqs=[Open|_]})
@@ -209,8 +197,6 @@ split_revlist(Rev, {[CurrentAcc|Rest], BaseLength, Length}) ->
% opened seqs greater than the smallest outstanding request. I believe its the
% minimal set of info needed to correctly calculate which seqs have been
% replicated (because remote docs can be opened out-of-order) -- APK
-update_sequence_lists(_Seq, #state{missing_revs=nil} = State) ->
- State;
update_sequence_lists(Seq, State) ->
Requested = lists:delete(Seq, State#state.requested_seqs),
AllOpened = lists:merge([Seq], State#state.opened_seqs),
@@ -261,44 +247,6 @@ open_doc_revs(#http_db{url = Url} = DbS, DocId, Revs) ->
end,
lists:reverse(lists:foldl(Transform, [], JsonResults)).
-open_doc(#http_db{url = Url} = DbS, DocId) ->
- % get latest rev of the doc
- Req = DbS#http_db{
- resource=encode_doc_id(DocId),
- qs=[{att_encoding_info, true}]
- },
- {Props} = Json = couch_rep_httpc:request(Req),
- case couch_util:get_value(<<"_id">>, Props) of
- Id when is_binary(Id) ->
- #doc{id=Id, revs=Rev, atts=Atts} = Doc = couch_doc:from_json_obj(Json),
- [Doc#doc{
- atts=[couch_rep_att:convert_stub(A, {DbS,Id,Rev}) || A <- Atts]
- }];
- undefined ->
- Err = couch_util:get_value(<<"error">>, Props, ?JSON_ENCODE(Json)),
- ?LOG_ERROR("Replicator: error accessing doc ~s at ~s, reason: ~s",
- [DocId, couch_util:url_strip_password(Url), Err]),
- []
- end.
-
-reader_loop(ReaderServer, Source, DocIds) when is_list(DocIds) ->
- case Source of
- #http_db{} ->
- [gen_server:call(ReaderServer, {open_remote_doc, Id, nil, nil},
- infinity) || Id <- DocIds];
- _LocalDb ->
- Docs = lists:foldr(fun(Id, Acc) ->
- case couch_db:open_doc(Source, Id) of
- {ok, Doc} ->
- [Doc | Acc];
- _ ->
- Acc
- end
- end, [], DocIds),
- gen_server:call(ReaderServer, {add_docs, nil, Docs}, infinity)
- end,
- exit(complete);
-
reader_loop(ReaderServer, Source, MissingRevsServer) ->
case couch_rep_missing_revs:next(MissingRevsServer) of
complete ->
@@ -332,8 +280,6 @@ maybe_reopen_db(#db{update_seq=OldSeq} = Db, HighSeq) when HighSeq > OldSeq ->
maybe_reopen_db(Db, _HighSeq) ->
Db.
-spawn_document_request(Source, Id, nil, nil) ->
- spawn_document_request(Source, Id);
spawn_document_request(Source, Id, Seq, Revs) ->
Server = self(),
SpawnFun = fun() ->
@@ -341,11 +287,3 @@ spawn_document_request(Source, Id, Seq, Revs) ->
gen_server:call(Server, {add_docs, Seq, Results}, infinity)
end,
spawn_monitor(SpawnFun).
-
-spawn_document_request(Source, Id) ->
- Server = self(),
- SpawnFun = fun() ->
- Results = open_doc(Source, Id),
- gen_server:call(Server, {add_docs, nil, Results}, infinity)
- end,
- spawn_monitor(SpawnFun).
diff --git a/src/couchdb/couch_rep_writer.erl b/src/couchdb/couch_rep_writer.erl
index f7bc9a72..622bfb27 100644
--- a/src/couchdb/couch_rep_writer.erl
+++ b/src/couchdb/couch_rep_writer.erl
@@ -21,8 +21,6 @@ start_link(Parent, Target, Reader, _PostProps) ->
writer_loop(Parent, Reader, Target) ->
case couch_rep_reader:next(Reader) of
- {complete, nil} ->
- ok;
{complete, FinalSeq} ->
Parent ! {writer_checkpoint, FinalSeq},
ok;
@@ -40,12 +38,7 @@ writer_loop(Parent, Reader, Target) ->
?LOG_DEBUG("writer failed to write an attachment ~p", [Err]),
exit({attachment_request_failed, Err, Docs})
end,
- case HighSeq of
- nil ->
- ok;
- _SeqNumber ->
- Parent ! {writer_checkpoint, HighSeq}
- end,
+ Parent ! {writer_checkpoint, HighSeq},
couch_rep_att:cleanup(),
couch_util:should_flush(),
writer_loop(Parent, Reader, Target)