diff options
-rw-r--r-- | src/couchdb/couch_changes.erl | 2 | ||||
-rw-r--r-- | src/couchdb/couch_rep.erl | 105 | ||||
-rw-r--r-- | src/couchdb/couch_rep_changes_feed.erl | 107 | ||||
-rw-r--r-- | src/couchdb/couch_rep_reader.erl | 70 | ||||
-rw-r--r-- | src/couchdb/couch_rep_writer.erl | 9 |
5 files changed, 112 insertions, 181 deletions
diff --git a/src/couchdb/couch_changes.erl b/src/couchdb/couch_changes.erl index ec0beb57..1e53161b 100644 --- a/src/couchdb/couch_changes.erl +++ b/src/couchdb/couch_changes.erl @@ -119,6 +119,8 @@ os_filter_fun(FilterName, Style, Req, Db) -> "filter parameter must be of the form `designname/filtername`"}) end. +builtin_filter_fun("_doc_ids", Style, {json_req, {Props}}, _Db) -> + filter_docids(couch_util:get_value(<<"doc_ids">>, Props), Style); builtin_filter_fun("_doc_ids", Style, #httpd{method='POST'}=Req, _Db) -> {Props} = couch_httpd:json_body_obj(Req), DocIds = couch_util:get_value(<<"doc_ids">>, Props, nil), diff --git a/src/couchdb/couch_rep.erl b/src/couchdb/couch_rep.erl index 88912fbf..d35471c5 100644 --- a/src/couchdb/couch_rep.erl +++ b/src/couchdb/couch_rep.erl @@ -53,7 +53,6 @@ committed_seq = 0, stats = nil, - doc_ids = nil, rep_doc = nil }). @@ -129,7 +128,6 @@ do_init([RepId, {PostProps} = RepDoc, UserCtx] = InitArgs) -> SourceProps = couch_util:get_value(<<"source">>, PostProps), TargetProps = couch_util:get_value(<<"target">>, PostProps), - DocIds = couch_util:get_value(<<"doc_ids">>, PostProps, nil), Continuous = couch_util:get_value(<<"continuous">>, PostProps, false), CreateTarget = couch_util:get_value(<<"create_target">>, PostProps, false), @@ -143,40 +141,16 @@ do_init([RepId, {PostProps} = RepDoc, UserCtx] = InitArgs) -> maybe_set_triggered(RepDoc, RepId), - case DocIds of - List when is_list(List) -> - % Fast replication using only a list of doc IDs to replicate. - % Replication sessions, checkpoints and logs are not created - % since the update sequence number of the source DB is not used - % for determining which documents are copied into the target DB. - SourceLog = nil, - TargetLog = nil, + [SourceLog, TargetLog] = find_replication_logs( + [Source, Target], RepId, {PostProps}, UserCtx), + {StartSeq, History} = compare_replication_logs(SourceLog, TargetLog), - StartSeq = nil, - History = nil, - - ChangesFeed = nil, - MissingRevs = nil, - - {ok, Reader} = - couch_rep_reader:start_link(self(), Source, DocIds, PostProps); - - _ -> - % Replication using the _changes API (DB sequence update numbers). - - [SourceLog, TargetLog] = find_replication_logs( - [Source, Target], RepId, {PostProps}, UserCtx), - - {StartSeq, History} = compare_replication_logs(SourceLog, TargetLog), - - {ok, ChangesFeed} = + {ok, ChangesFeed} = couch_rep_changes_feed:start_link(self(), Source, StartSeq, PostProps), - {ok, MissingRevs} = + {ok, MissingRevs} = couch_rep_missing_revs:start_link(self(), Target, ChangesFeed, PostProps), - {ok, Reader} = - couch_rep_reader:start_link(self(), Source, MissingRevs, PostProps) - end, - + {ok, Reader} = + couch_rep_reader:start_link(self(), Source, MissingRevs, PostProps), {ok, Writer} = couch_rep_writer:start_link(self(), Target, Reader, PostProps), @@ -213,7 +187,6 @@ do_init([RepId, {PostProps} = RepDoc, UserCtx] = InitArgs) -> rep_starttime = httpd_util:rfc1123_date(), src_starttime = couch_util:get_value(instance_start_time, SourceInfo), tgt_starttime = couch_util:get_value(instance_start_time, TargetInfo), - doc_ids = DocIds, rep_doc = RepDoc }, {ok, State}. @@ -390,25 +363,6 @@ dbinfo(Db) -> {ok, Info} = couch_db:get_db_info(Db), Info. -do_terminate(#state{doc_ids=DocIds} = State) when is_list(DocIds) -> - #state{ - listeners = Listeners, - rep_starttime = ReplicationStartTime, - stats = Stats - } = State, - - RepByDocsJson = {[ - {<<"start_time">>, ?l2b(ReplicationStartTime)}, - {<<"end_time">>, ?l2b(httpd_util:rfc1123_date())}, - {<<"docs_read">>, ets:lookup_element(Stats, docs_read, 2)}, - {<<"docs_written">>, ets:lookup_element(Stats, docs_written, 2)}, - {<<"doc_write_failures">>, - ets:lookup_element(Stats, doc_write_failures, 2)} - ]}, - - terminate_cleanup(State), - [gen_server:reply(L, {ok, RepByDocsJson}) || L <- lists:reverse(Listeners)]; - do_terminate(State) -> #state{ checkpoint_history = CheckpointHistory, @@ -659,32 +613,53 @@ do_checkpoint(State) -> rep_starttime = ReplicationStartTime, src_starttime = SrcInstanceStartTime, tgt_starttime = TgtInstanceStartTime, - stats = Stats + stats = Stats, + rep_doc = {RepDoc} } = State, case commit_to_both(Source, Target, NewSeqNum) of {SrcInstanceStartTime, TgtInstanceStartTime} -> ?LOG_INFO("recording a checkpoint for ~s -> ~s at source update_seq ~p", [dbname(Source), dbname(Target), NewSeqNum]), + EndTime = ?l2b(httpd_util:rfc1123_date()), + StartTime = ?l2b(ReplicationStartTime), + DocsRead = ets:lookup_element(Stats, docs_read, 2), + DocsWritten = ets:lookup_element(Stats, docs_written, 2), + DocWriteFailures = ets:lookup_element(Stats, doc_write_failures, 2), NewHistoryEntry = {[ {<<"session_id">>, SessionId}, - {<<"start_time">>, list_to_binary(ReplicationStartTime)}, - {<<"end_time">>, list_to_binary(httpd_util:rfc1123_date())}, + {<<"start_time">>, StartTime}, + {<<"end_time">>, EndTime}, {<<"start_last_seq">>, StartSeqNum}, {<<"end_last_seq">>, NewSeqNum}, {<<"recorded_seq">>, NewSeqNum}, {<<"missing_checked">>, ets:lookup_element(Stats, total_revs, 2)}, {<<"missing_found">>, ets:lookup_element(Stats, missing_revs, 2)}, - {<<"docs_read">>, ets:lookup_element(Stats, docs_read, 2)}, - {<<"docs_written">>, ets:lookup_element(Stats, docs_written, 2)}, - {<<"doc_write_failures">>, - ets:lookup_element(Stats, doc_write_failures, 2)} + {<<"docs_read">>, DocsRead}, + {<<"docs_written">>, DocsWritten}, + {<<"doc_write_failures">>, DocWriteFailures} ]}, - % limit history to 50 entries - NewRepHistory = {[ + BaseHistory = [ {<<"session_id">>, SessionId}, - {<<"source_last_seq">>, NewSeqNum}, - {<<"history">>, lists:sublist([NewHistoryEntry | OldHistory], 50)} - ]}, + {<<"source_last_seq">>, NewSeqNum} + ] ++ case couch_util:get_value(<<"doc_ids">>, RepDoc) of + undefined -> + []; + DocIds when is_list(DocIds) -> + % backwards compatibility with the result of a replication by + % doc IDs in versions 0.11.x and 1.0.x + [ + {<<"start_time">>, StartTime}, + {<<"end_time">>, EndTime}, + {<<"docs_read">>, DocsRead}, + {<<"docs_written">>, DocsWritten}, + {<<"doc_write_failures">>, DocWriteFailures} + ] + end, + % limit history to 50 entries + NewRepHistory = { + BaseHistory ++ + [{<<"history">>, lists:sublist([NewHistoryEntry | OldHistory], 50)}] + }, try {SrcRevPos,SrcRevId} = diff --git a/src/couchdb/couch_rep_changes_feed.erl b/src/couchdb/couch_rep_changes_feed.erl index 98e929fe..246e82c0 100644 --- a/src/couchdb/couch_rep_changes_feed.erl +++ b/src/couchdb/couch_rep_changes_feed.erl @@ -18,6 +18,7 @@ -export([start_link/4, next/1, stop/1]). -define(BUFFER_SIZE, 1000). +-define(DOC_IDS_FILTER_NAME, "_doc_ids"). -include("couch_db.hrl"). -include("../ibrowse/ibrowse.hrl"). @@ -36,6 +37,11 @@ rows = queue:new() }). +-import(couch_util, [ + get_value/2, + get_value/3 +]). + start_link(Parent, Source, StartSeq, PostProps) -> gen_server:start_link(?MODULE, [Parent, Source, StartSeq, PostProps], []). @@ -46,9 +52,9 @@ stop(Server) -> catch gen_server:call(Server, stop), ok. -init([Parent, #http_db{}=Source, Since, PostProps]) -> +init([Parent, #http_db{headers = Headers0} = Source, Since, PostProps]) -> process_flag(trap_exit, true), - Feed = case couch_util:get_value(<<"continuous">>, PostProps, false) of + Feed = case get_value(<<"continuous">>, PostProps, false) of false -> normal; true -> @@ -60,33 +66,24 @@ init([Parent, #http_db{}=Source, Since, PostProps]) -> {"since", Since}, {"feed", Feed} ], - QS = case couch_util:get_value(<<"filter">>, PostProps) of + {QS, Method, Body, Headers} = case get_value(<<"doc_ids">>, PostProps) of undefined -> - BaseQS; - FilterName -> - {Params} = couch_util:get_value(<<"query_params">>, PostProps, {[]}), - lists:foldr( - fun({K, V}, QSAcc) -> - Ks = couch_util:to_list(K), - case proplists:is_defined(Ks, QSAcc) of - true -> - QSAcc; - false -> - [{Ks, V} | QSAcc] - end - end, - [{"filter", FilterName} | BaseQS], - Params - ) + {maybe_add_filter_qs_params(PostProps, BaseQS), get, nil, Headers0}; + DocIds when is_list(DocIds) -> + Headers1 = [{"Content-Type", "application/json"} | Headers0], + QS1 = [{"filter", ?l2b(?DOC_IDS_FILTER_NAME)} | BaseQS], + {QS1, post, {[{<<"doc_ids">>, DocIds}]}, Headers1} end, Pid = couch_rep_httpc:spawn_link_worker_process(Source), Req = Source#http_db{ + method = Method, + body = Body, resource = "_changes", qs = QS, conn = Pid, options = [{stream_to, {self(), once}}] ++ lists:keydelete(inactivity_timeout, 1, Source#http_db.options), - headers = Source#http_db.headers -- [{"Accept-Encoding", "gzip"}] + headers = Headers -- [{"Accept-Encoding", "gzip"}] }, {ibrowse_req_id, ReqId} = couch_rep_httpc:request(Req), Args = [Parent, Req, Since, PostProps], @@ -123,11 +120,17 @@ init([Parent, #http_db{}=Source, Since, PostProps]) -> init([_Parent, Source, Since, PostProps] = InitArgs) -> process_flag(trap_exit, true), Server = self(), + Filter = case get_value(<<"doc_ids">>, PostProps) of + undefined -> + ?b2l(get_value(<<"filter">>, PostProps, <<>>)); + DocIds when is_list(DocIds) -> + ?DOC_IDS_FILTER_NAME + end, ChangesArgs = #changes_args{ style = all_docs, since = Since, - filter = ?b2l(couch_util:get_value(<<"filter">>, PostProps, <<>>)), - feed = case couch_util:get_value(<<"continuous">>, PostProps, false) of + filter = Filter, + feed = case get_value(<<"continuous">>, PostProps, false) of true -> "continuous"; false -> @@ -138,7 +141,7 @@ init([_Parent, Source, Since, PostProps] = InitArgs) -> ChangesPid = spawn_link(fun() -> ChangesFeedFun = couch_changes:handle_changes( ChangesArgs, - {json_req, filter_json_req(Source, PostProps)}, + {json_req, filter_json_req(Filter, Source, PostProps)}, Source ), ChangesFeedFun(fun({change, Change, _}, _) -> @@ -149,29 +152,49 @@ init([_Parent, Source, Since, PostProps] = InitArgs) -> end), {ok, #state{changes_loop=ChangesPid, init_args=InitArgs}}. -filter_json_req(Db, PostProps) -> - case couch_util:get_value(<<"filter">>, PostProps) of +maybe_add_filter_qs_params(PostProps, BaseQS) -> + case get_value(<<"filter">>, PostProps) of undefined -> - {[]}; + BaseQS; FilterName -> - {Query} = couch_util:get_value(<<"query_params">>, PostProps, {[]}), - {ok, Info} = couch_db:get_db_info(Db), - % simulate a request to db_name/_changes - {[ - {<<"info">>, {Info}}, - {<<"id">>, null}, - {<<"method">>, 'GET'}, - {<<"path">>, [couch_db:name(Db), <<"_changes">>]}, - {<<"query">>, {[{<<"filter">>, FilterName} | Query]}}, - {<<"headers">>, []}, - {<<"body">>, []}, - {<<"peer">>, <<"replicator">>}, - {<<"form">>, []}, - {<<"cookie">>, []}, - {<<"userCtx">>, couch_util:json_user_ctx(Db)} - ]} + {Params} = get_value(<<"query_params">>, PostProps, {[]}), + lists:foldr( + fun({K, V}, QSAcc) -> + Ks = couch_util:to_list(K), + case proplists:is_defined(Ks, QSAcc) of + true -> + QSAcc; + false -> + [{Ks, V} | QSAcc] + end + end, + [{"filter", FilterName} | BaseQS], + Params + ) end. +filter_json_req([], _Db, _PostProps) -> + {[]}; +filter_json_req(?DOC_IDS_FILTER_NAME, _Db, PostProps) -> + {[{<<"doc_ids">>, get_value(<<"doc_ids">>, PostProps)}]}; +filter_json_req(FilterName, Db, PostProps) -> + {Query} = get_value(<<"query_params">>, PostProps, {[]}), + {ok, Info} = couch_db:get_db_info(Db), + % simulate a request to db_name/_changes + {[ + {<<"info">>, {Info}}, + {<<"id">>, null}, + {<<"method">>, 'GET'}, + {<<"path">>, [couch_db:name(Db), <<"_changes">>]}, + {<<"query">>, {[{<<"filter">>, FilterName} | Query]}}, + {<<"headers">>, []}, + {<<"body">>, []}, + {<<"peer">>, <<"replicator">>}, + {<<"form">>, []}, + {<<"cookie">>, []}, + {<<"userCtx">>, couch_util:json_user_ctx(Db)} + ]}. + handle_call({add_change, Row}, From, State) -> handle_add_change(Row, From, State); diff --git a/src/couchdb/couch_rep_reader.erl b/src/couchdb/couch_rep_reader.erl index 4f81c8e4..bdef3dfc 100644 --- a/src/couchdb/couch_rep_reader.erl +++ b/src/couchdb/couch_rep_reader.erl @@ -43,15 +43,13 @@ opened_seqs = [] }). -start_link(Parent, Source, MissingRevs_or_DocIds, PostProps) -> - gen_server:start_link( - ?MODULE, [Parent, Source, MissingRevs_or_DocIds, PostProps], [] - ). +start_link(Parent, Source, MissingRevs, PostProps) -> + gen_server:start_link(?MODULE, [Parent, Source, MissingRevs, PostProps], []). next(Pid) -> gen_server:call(Pid, next_docs, infinity). -init([Parent, Source, MissingRevs_or_DocIds, _PostProps]) -> +init([Parent, Source, MissingRevs, _PostProps]) -> process_flag(trap_exit, true), if is_record(Source, http_db) -> #url{host=Host, port=Port} = ibrowse_lib:parse_url(Source#http_db.url), @@ -59,15 +57,7 @@ init([Parent, Source, MissingRevs_or_DocIds, _PostProps]) -> ibrowse:set_max_pipeline_size(Host, Port, ?MAX_PIPELINE_SIZE); true -> ok end, Self = self(), - ReaderLoop = spawn_link( - fun() -> reader_loop(Self, Source, MissingRevs_or_DocIds) end - ), - MissingRevs = case MissingRevs_or_DocIds of - Pid when is_pid(Pid) -> - Pid; - _ListDocIds -> - nil - end, + ReaderLoop = spawn_link(fun() -> reader_loop(Self, Source, MissingRevs) end), State = #state{ parent = Parent, source = Source, @@ -183,8 +173,6 @@ handle_reader_loop_complete(#state{monitor_count=0} = State) -> handle_reader_loop_complete(State) -> {noreply, State#state{complete = waiting_on_monitors}}. -calculate_new_high_seq(#state{missing_revs=nil}) -> - nil; calculate_new_high_seq(#state{requested_seqs=[], opened_seqs=[Open|_]}) -> Open; calculate_new_high_seq(#state{requested_seqs=[Req|_], opened_seqs=[Open|_]}) @@ -209,8 +197,6 @@ split_revlist(Rev, {[CurrentAcc|Rest], BaseLength, Length}) -> % opened seqs greater than the smallest outstanding request. I believe its the % minimal set of info needed to correctly calculate which seqs have been % replicated (because remote docs can be opened out-of-order) -- APK -update_sequence_lists(_Seq, #state{missing_revs=nil} = State) -> - State; update_sequence_lists(Seq, State) -> Requested = lists:delete(Seq, State#state.requested_seqs), AllOpened = lists:merge([Seq], State#state.opened_seqs), @@ -261,44 +247,6 @@ open_doc_revs(#http_db{url = Url} = DbS, DocId, Revs) -> end, lists:reverse(lists:foldl(Transform, [], JsonResults)). -open_doc(#http_db{url = Url} = DbS, DocId) -> - % get latest rev of the doc - Req = DbS#http_db{ - resource=encode_doc_id(DocId), - qs=[{att_encoding_info, true}] - }, - {Props} = Json = couch_rep_httpc:request(Req), - case couch_util:get_value(<<"_id">>, Props) of - Id when is_binary(Id) -> - #doc{id=Id, revs=Rev, atts=Atts} = Doc = couch_doc:from_json_obj(Json), - [Doc#doc{ - atts=[couch_rep_att:convert_stub(A, {DbS,Id,Rev}) || A <- Atts] - }]; - undefined -> - Err = couch_util:get_value(<<"error">>, Props, ?JSON_ENCODE(Json)), - ?LOG_ERROR("Replicator: error accessing doc ~s at ~s, reason: ~s", - [DocId, couch_util:url_strip_password(Url), Err]), - [] - end. - -reader_loop(ReaderServer, Source, DocIds) when is_list(DocIds) -> - case Source of - #http_db{} -> - [gen_server:call(ReaderServer, {open_remote_doc, Id, nil, nil}, - infinity) || Id <- DocIds]; - _LocalDb -> - Docs = lists:foldr(fun(Id, Acc) -> - case couch_db:open_doc(Source, Id) of - {ok, Doc} -> - [Doc | Acc]; - _ -> - Acc - end - end, [], DocIds), - gen_server:call(ReaderServer, {add_docs, nil, Docs}, infinity) - end, - exit(complete); - reader_loop(ReaderServer, Source, MissingRevsServer) -> case couch_rep_missing_revs:next(MissingRevsServer) of complete -> @@ -332,8 +280,6 @@ maybe_reopen_db(#db{update_seq=OldSeq} = Db, HighSeq) when HighSeq > OldSeq -> maybe_reopen_db(Db, _HighSeq) -> Db. -spawn_document_request(Source, Id, nil, nil) -> - spawn_document_request(Source, Id); spawn_document_request(Source, Id, Seq, Revs) -> Server = self(), SpawnFun = fun() -> @@ -341,11 +287,3 @@ spawn_document_request(Source, Id, Seq, Revs) -> gen_server:call(Server, {add_docs, Seq, Results}, infinity) end, spawn_monitor(SpawnFun). - -spawn_document_request(Source, Id) -> - Server = self(), - SpawnFun = fun() -> - Results = open_doc(Source, Id), - gen_server:call(Server, {add_docs, nil, Results}, infinity) - end, - spawn_monitor(SpawnFun). diff --git a/src/couchdb/couch_rep_writer.erl b/src/couchdb/couch_rep_writer.erl index f7bc9a72..622bfb27 100644 --- a/src/couchdb/couch_rep_writer.erl +++ b/src/couchdb/couch_rep_writer.erl @@ -21,8 +21,6 @@ start_link(Parent, Target, Reader, _PostProps) -> writer_loop(Parent, Reader, Target) -> case couch_rep_reader:next(Reader) of - {complete, nil} -> - ok; {complete, FinalSeq} -> Parent ! {writer_checkpoint, FinalSeq}, ok; @@ -40,12 +38,7 @@ writer_loop(Parent, Reader, Target) -> ?LOG_DEBUG("writer failed to write an attachment ~p", [Err]), exit({attachment_request_failed, Err, Docs}) end, - case HighSeq of - nil -> - ok; - _SeqNumber -> - Parent ! {writer_checkpoint, HighSeq} - end, + Parent ! {writer_checkpoint, HighSeq}, couch_rep_att:cleanup(), couch_util:should_flush(), writer_loop(Parent, Reader, Target) |