summaryrefslogtreecommitdiff
path: root/src/couchdb/couch_rep_reader.erl
diff options
context:
space:
mode:
Diffstat (limited to 'src/couchdb/couch_rep_reader.erl')
-rw-r--r--src/couchdb/couch_rep_reader.erl70
1 files changed, 4 insertions, 66 deletions
diff --git a/src/couchdb/couch_rep_reader.erl b/src/couchdb/couch_rep_reader.erl
index 4f81c8e4..bdef3dfc 100644
--- a/src/couchdb/couch_rep_reader.erl
+++ b/src/couchdb/couch_rep_reader.erl
@@ -43,15 +43,13 @@
opened_seqs = []
}).
-start_link(Parent, Source, MissingRevs_or_DocIds, PostProps) ->
- gen_server:start_link(
- ?MODULE, [Parent, Source, MissingRevs_or_DocIds, PostProps], []
- ).
+start_link(Parent, Source, MissingRevs, PostProps) ->
+ gen_server:start_link(?MODULE, [Parent, Source, MissingRevs, PostProps], []).
next(Pid) ->
gen_server:call(Pid, next_docs, infinity).
-init([Parent, Source, MissingRevs_or_DocIds, _PostProps]) ->
+init([Parent, Source, MissingRevs, _PostProps]) ->
process_flag(trap_exit, true),
if is_record(Source, http_db) ->
#url{host=Host, port=Port} = ibrowse_lib:parse_url(Source#http_db.url),
@@ -59,15 +57,7 @@ init([Parent, Source, MissingRevs_or_DocIds, _PostProps]) ->
ibrowse:set_max_pipeline_size(Host, Port, ?MAX_PIPELINE_SIZE);
true -> ok end,
Self = self(),
- ReaderLoop = spawn_link(
- fun() -> reader_loop(Self, Source, MissingRevs_or_DocIds) end
- ),
- MissingRevs = case MissingRevs_or_DocIds of
- Pid when is_pid(Pid) ->
- Pid;
- _ListDocIds ->
- nil
- end,
+ ReaderLoop = spawn_link(fun() -> reader_loop(Self, Source, MissingRevs) end),
State = #state{
parent = Parent,
source = Source,
@@ -183,8 +173,6 @@ handle_reader_loop_complete(#state{monitor_count=0} = State) ->
handle_reader_loop_complete(State) ->
{noreply, State#state{complete = waiting_on_monitors}}.
-calculate_new_high_seq(#state{missing_revs=nil}) ->
- nil;
calculate_new_high_seq(#state{requested_seqs=[], opened_seqs=[Open|_]}) ->
Open;
calculate_new_high_seq(#state{requested_seqs=[Req|_], opened_seqs=[Open|_]})
@@ -209,8 +197,6 @@ split_revlist(Rev, {[CurrentAcc|Rest], BaseLength, Length}) ->
% opened seqs greater than the smallest outstanding request. I believe its the
% minimal set of info needed to correctly calculate which seqs have been
% replicated (because remote docs can be opened out-of-order) -- APK
-update_sequence_lists(_Seq, #state{missing_revs=nil} = State) ->
- State;
update_sequence_lists(Seq, State) ->
Requested = lists:delete(Seq, State#state.requested_seqs),
AllOpened = lists:merge([Seq], State#state.opened_seqs),
@@ -261,44 +247,6 @@ open_doc_revs(#http_db{url = Url} = DbS, DocId, Revs) ->
end,
lists:reverse(lists:foldl(Transform, [], JsonResults)).
-open_doc(#http_db{url = Url} = DbS, DocId) ->
- % get latest rev of the doc
- Req = DbS#http_db{
- resource=encode_doc_id(DocId),
- qs=[{att_encoding_info, true}]
- },
- {Props} = Json = couch_rep_httpc:request(Req),
- case couch_util:get_value(<<"_id">>, Props) of
- Id when is_binary(Id) ->
- #doc{id=Id, revs=Rev, atts=Atts} = Doc = couch_doc:from_json_obj(Json),
- [Doc#doc{
- atts=[couch_rep_att:convert_stub(A, {DbS,Id,Rev}) || A <- Atts]
- }];
- undefined ->
- Err = couch_util:get_value(<<"error">>, Props, ?JSON_ENCODE(Json)),
- ?LOG_ERROR("Replicator: error accessing doc ~s at ~s, reason: ~s",
- [DocId, couch_util:url_strip_password(Url), Err]),
- []
- end.
-
-reader_loop(ReaderServer, Source, DocIds) when is_list(DocIds) ->
- case Source of
- #http_db{} ->
- [gen_server:call(ReaderServer, {open_remote_doc, Id, nil, nil},
- infinity) || Id <- DocIds];
- _LocalDb ->
- Docs = lists:foldr(fun(Id, Acc) ->
- case couch_db:open_doc(Source, Id) of
- {ok, Doc} ->
- [Doc | Acc];
- _ ->
- Acc
- end
- end, [], DocIds),
- gen_server:call(ReaderServer, {add_docs, nil, Docs}, infinity)
- end,
- exit(complete);
-
reader_loop(ReaderServer, Source, MissingRevsServer) ->
case couch_rep_missing_revs:next(MissingRevsServer) of
complete ->
@@ -332,8 +280,6 @@ maybe_reopen_db(#db{update_seq=OldSeq} = Db, HighSeq) when HighSeq > OldSeq ->
maybe_reopen_db(Db, _HighSeq) ->
Db.
-spawn_document_request(Source, Id, nil, nil) ->
- spawn_document_request(Source, Id);
spawn_document_request(Source, Id, Seq, Revs) ->
Server = self(),
SpawnFun = fun() ->
@@ -341,11 +287,3 @@ spawn_document_request(Source, Id, Seq, Revs) ->
gen_server:call(Server, {add_docs, Seq, Results}, infinity)
end,
spawn_monitor(SpawnFun).
-
-spawn_document_request(Source, Id) ->
- Server = self(),
- SpawnFun = fun() ->
- Results = open_doc(Source, Id),
- gen_server:call(Server, {add_docs, nil, Results}, infinity)
- end,
- spawn_monitor(SpawnFun).