From 0fc0c2d630abe0f4b6cc37c7f92728d1fe156ff3 Mon Sep 17 00:00:00 2001 From: John Christopher Anderson Date: Fri, 29 Jan 2010 20:08:54 +0000 Subject: Thanks Filipe Manana. Closes COUCHDB-631. Replicator option to replicate a list of docids (bypasses by_seq index). Usage: POST to /_replicate with a JSON body of: {"source": "myfoo", "target" : "http://remotedb.com/theirfoo", "doc_ids": ["foo1", "foo3", "foo666"]} This will copy the listed docs from the source to the target database. git-svn-id: https://svn.apache.org/repos/asf/couchdb/trunk@904615 13f79535-47bb-0310-9956-ffa450edef68 --- src/couchdb/couch_rep_reader.erl | 63 +++++++++++++++++++++++++++++++++++++--- 1 file changed, 59 insertions(+), 4 deletions(-) (limited to 'src/couchdb/couch_rep_reader.erl') diff --git a/src/couchdb/couch_rep_reader.erl b/src/couchdb/couch_rep_reader.erl index a66454c8..b8326fb3 100644 --- a/src/couchdb/couch_rep_reader.erl +++ b/src/couchdb/couch_rep_reader.erl @@ -43,13 +43,15 @@ opened_seqs = [] }). -start_link(Parent, Source, MissingRevs, PostProps) -> - gen_server:start_link(?MODULE, [Parent, Source, MissingRevs, PostProps], []). +start_link(Parent, Source, MissingRevs_or_DocIds, PostProps) -> + gen_server:start_link( + ?MODULE, [Parent, Source, MissingRevs_or_DocIds, PostProps], [] + ). next(Pid) -> gen_server:call(Pid, next_docs, infinity). -init([Parent, Source, MissingRevs, _PostProps]) -> +init([Parent, Source, MissingRevs_or_DocIds, PostProps]) -> process_flag(trap_exit, true), if is_record(Source, http_db) -> #url{host=Host, port=Port} = ibrowse_lib:parse_url(Source#http_db.url), @@ -57,7 +59,15 @@ init([Parent, Source, MissingRevs, _PostProps]) -> ibrowse:set_max_pipeline_size(Host, Port, ?MAX_PIPELINE_SIZE); true -> ok end, Self = self(), - ReaderLoop = spawn_link(fun() -> reader_loop(Self, Source, MissingRevs) end), + ReaderLoop = spawn_link( + fun() -> reader_loop(Self, Source, MissingRevs_or_DocIds) end + ), + MissingRevs = case MissingRevs_or_DocIds of + Pid when is_pid(Pid) -> + Pid; + _ListDocIds -> + nil + end, State = #state{ parent = Parent, source = Source, @@ -167,6 +177,8 @@ handle_reader_loop_complete(#state{monitor_count=0} = State) -> handle_reader_loop_complete(State) -> {noreply, State#state{complete = waiting_on_monitors}}. +calculate_new_high_seq(#state{missing_revs=nil}) -> + nil; calculate_new_high_seq(#state{requested_seqs=[], opened_seqs=[Open|_]}) -> Open; calculate_new_high_seq(#state{requested_seqs=[Req|_], opened_seqs=[Open|_]}) @@ -191,6 +203,8 @@ split_revlist(Rev, {[CurrentAcc|Rest], BaseLength, Length}) -> % opened seqs greater than the smallest outstanding request. I believe its the % minimal set of info needed to correctly calculate which seqs have been % replicated (because remote docs can be opened out-of-order) -- APK +update_sequence_lists(_Seq, #state{missing_revs=nil} = State) -> + State; update_sequence_lists(Seq, State) -> Requested = lists:delete(Seq, State#state.requested_seqs), AllOpened = lists:merge([Seq], State#state.opened_seqs), @@ -234,6 +248,37 @@ open_doc_revs(#http_db{} = DbS, DocId, Revs) -> end, [Transform(Result) || Result <- JsonResults]. +open_doc(#http_db{} = DbS, DocId) -> + % get latest rev of the doc + Req = DbS#http_db{resource=url_encode(DocId)}, + case couch_rep_httpc:request(Req) of + {[{<<"error">>,<<"not_found">>}, {<<"reason">>,<<"missing">>}]} -> + []; + Json -> + #doc{id=Id, revs=Rev, atts=Atts} = Doc = couch_doc:from_json_obj(Json), + [Doc#doc{ + atts=[couch_rep_att:convert_stub(A, {DbS,Id,Rev}) || A <- Atts] + }] + end. + +reader_loop(ReaderServer, Source, DocIds) when is_list(DocIds) -> + case Source of + #http_db{} -> + [gen_server:call(ReaderServer, {open_remote_doc, Id, nil, nil}, + infinity) || Id <- DocIds]; + _LocalDb -> + Docs = lists:foldr(fun(Id, Acc) -> + case couch_db:open_doc(Source, Id) of + {ok, Doc} -> + [Doc | Acc]; + _ -> + Acc + end + end, [], DocIds), + gen_server:call(ReaderServer, {add_docs, nil, Docs}, infinity) + end, + exit(complete); + reader_loop(ReaderServer, Source, MissingRevsServer) -> case couch_rep_missing_revs:next(MissingRevsServer) of complete -> @@ -267,6 +312,8 @@ maybe_reopen_db(#db{update_seq=OldSeq} = Db, HighSeq) when HighSeq > OldSeq -> maybe_reopen_db(Db, _HighSeq) -> Db. +spawn_document_request(Source, Id, nil, nil) -> + spawn_document_request(Source, Id); spawn_document_request(Source, Id, Seq, Revs) -> Server = self(), SpawnFun = fun() -> @@ -274,3 +321,11 @@ spawn_document_request(Source, Id, Seq, Revs) -> gen_server:call(Server, {add_docs, Seq, Results}, infinity) end, spawn_monitor(SpawnFun). + +spawn_document_request(Source, Id) -> + Server = self(), + SpawnFun = fun() -> + Results = open_doc(Source, Id), + gen_server:call(Server, {add_docs, nil, Results}, infinity) + end, + spawn_monitor(SpawnFun). -- cgit v1.2.3