summaryrefslogtreecommitdiff
path: root/src/couchdb/couch_rep_reader.erl
diff options
context:
space:
mode:
authorJohn Christopher Anderson <jchris@apache.org>2010-01-29 20:08:54 +0000
committerJohn Christopher Anderson <jchris@apache.org>2010-01-29 20:08:54 +0000
commit0fc0c2d630abe0f4b6cc37c7f92728d1fe156ff3 (patch)
tree4a7727a9db0c07676d8ba44fdab09291bf928da7 /src/couchdb/couch_rep_reader.erl
parentadb2703aa1f7b13d30e033a8b47bc625f8c492cc (diff)
Thanks Filipe Manana. Closes COUCHDB-631.
Replicator option to replicate a list of docids (bypasses by_seq index). Usage: POST to /_replicate with a JSON body of: {"source": "myfoo", "target" : "http://remotedb.com/theirfoo", "doc_ids": ["foo1", "foo3", "foo666"]} This will copy the listed docs from the source to the target database. git-svn-id: https://svn.apache.org/repos/asf/couchdb/trunk@904615 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'src/couchdb/couch_rep_reader.erl')
-rw-r--r--src/couchdb/couch_rep_reader.erl63
1 files changed, 59 insertions, 4 deletions
diff --git a/src/couchdb/couch_rep_reader.erl b/src/couchdb/couch_rep_reader.erl
index a66454c8..b8326fb3 100644
--- a/src/couchdb/couch_rep_reader.erl
+++ b/src/couchdb/couch_rep_reader.erl
@@ -43,13 +43,15 @@
opened_seqs = []
}).
-start_link(Parent, Source, MissingRevs, PostProps) ->
- gen_server:start_link(?MODULE, [Parent, Source, MissingRevs, PostProps], []).
+start_link(Parent, Source, MissingRevs_or_DocIds, PostProps) ->
+ gen_server:start_link(
+ ?MODULE, [Parent, Source, MissingRevs_or_DocIds, PostProps], []
+ ).
next(Pid) ->
gen_server:call(Pid, next_docs, infinity).
-init([Parent, Source, MissingRevs, _PostProps]) ->
+init([Parent, Source, MissingRevs_or_DocIds, PostProps]) ->
process_flag(trap_exit, true),
if is_record(Source, http_db) ->
#url{host=Host, port=Port} = ibrowse_lib:parse_url(Source#http_db.url),
@@ -57,7 +59,15 @@ init([Parent, Source, MissingRevs, _PostProps]) ->
ibrowse:set_max_pipeline_size(Host, Port, ?MAX_PIPELINE_SIZE);
true -> ok end,
Self = self(),
- ReaderLoop = spawn_link(fun() -> reader_loop(Self, Source, MissingRevs) end),
+ ReaderLoop = spawn_link(
+ fun() -> reader_loop(Self, Source, MissingRevs_or_DocIds) end
+ ),
+ MissingRevs = case MissingRevs_or_DocIds of
+ Pid when is_pid(Pid) ->
+ Pid;
+ _ListDocIds ->
+ nil
+ end,
State = #state{
parent = Parent,
source = Source,
@@ -167,6 +177,8 @@ handle_reader_loop_complete(#state{monitor_count=0} = State) ->
handle_reader_loop_complete(State) ->
{noreply, State#state{complete = waiting_on_monitors}}.
+calculate_new_high_seq(#state{missing_revs=nil}) ->
+ nil;
calculate_new_high_seq(#state{requested_seqs=[], opened_seqs=[Open|_]}) ->
Open;
calculate_new_high_seq(#state{requested_seqs=[Req|_], opened_seqs=[Open|_]})
@@ -191,6 +203,8 @@ split_revlist(Rev, {[CurrentAcc|Rest], BaseLength, Length}) ->
% opened seqs greater than the smallest outstanding request. I believe its the
% minimal set of info needed to correctly calculate which seqs have been
% replicated (because remote docs can be opened out-of-order) -- APK
+update_sequence_lists(_Seq, #state{missing_revs=nil} = State) ->
+ State;
update_sequence_lists(Seq, State) ->
Requested = lists:delete(Seq, State#state.requested_seqs),
AllOpened = lists:merge([Seq], State#state.opened_seqs),
@@ -234,6 +248,37 @@ open_doc_revs(#http_db{} = DbS, DocId, Revs) ->
end,
[Transform(Result) || Result <- JsonResults].
+open_doc(#http_db{} = DbS, DocId) ->
+ % get latest rev of the doc
+ Req = DbS#http_db{resource=url_encode(DocId)},
+ case couch_rep_httpc:request(Req) of
+ {[{<<"error">>,<<"not_found">>}, {<<"reason">>,<<"missing">>}]} ->
+ [];
+ Json ->
+ #doc{id=Id, revs=Rev, atts=Atts} = Doc = couch_doc:from_json_obj(Json),
+ [Doc#doc{
+ atts=[couch_rep_att:convert_stub(A, {DbS,Id,Rev}) || A <- Atts]
+ }]
+ end.
+
+reader_loop(ReaderServer, Source, DocIds) when is_list(DocIds) ->
+ case Source of
+ #http_db{} ->
+ [gen_server:call(ReaderServer, {open_remote_doc, Id, nil, nil},
+ infinity) || Id <- DocIds];
+ _LocalDb ->
+ Docs = lists:foldr(fun(Id, Acc) ->
+ case couch_db:open_doc(Source, Id) of
+ {ok, Doc} ->
+ [Doc | Acc];
+ _ ->
+ Acc
+ end
+ end, [], DocIds),
+ gen_server:call(ReaderServer, {add_docs, nil, Docs}, infinity)
+ end,
+ exit(complete);
+
reader_loop(ReaderServer, Source, MissingRevsServer) ->
case couch_rep_missing_revs:next(MissingRevsServer) of
complete ->
@@ -267,6 +312,8 @@ maybe_reopen_db(#db{update_seq=OldSeq} = Db, HighSeq) when HighSeq > OldSeq ->
maybe_reopen_db(Db, _HighSeq) ->
Db.
+spawn_document_request(Source, Id, nil, nil) ->
+ spawn_document_request(Source, Id);
spawn_document_request(Source, Id, Seq, Revs) ->
Server = self(),
SpawnFun = fun() ->
@@ -274,3 +321,11 @@ spawn_document_request(Source, Id, Seq, Revs) ->
gen_server:call(Server, {add_docs, Seq, Results}, infinity)
end,
spawn_monitor(SpawnFun).
+
+spawn_document_request(Source, Id) ->
+ Server = self(),
+ SpawnFun = fun() ->
+ Results = open_doc(Source, Id),
+ gen_server:call(Server, {add_docs, nil, Results}, infinity)
+ end,
+ spawn_monitor(SpawnFun).