summaryrefslogtreecommitdiff
path: root/apps/couch/src/couch_rep_reader.erl
diff options
context:
space:
mode:
Diffstat (limited to 'apps/couch/src/couch_rep_reader.erl')
-rw-r--r--apps/couch/src/couch_rep_reader.erl80
1 files changed, 9 insertions, 71 deletions
diff --git a/apps/couch/src/couch_rep_reader.erl b/apps/couch/src/couch_rep_reader.erl
index a7ae45a8..0d344e5c 100644
--- a/apps/couch/src/couch_rep_reader.erl
+++ b/apps/couch/src/couch_rep_reader.erl
@@ -17,7 +17,7 @@
-export([start_link/4, next/1]).
--import(couch_util, [url_encode/1]).
+-import(couch_util, [encode_doc_id/1]).
-define (BUFFER_SIZE, 1000).
-define (MAX_CONCURRENT_REQUESTS, 100).
@@ -40,26 +40,17 @@
opened_seqs = []
}).
-start_link(Parent, Source, MissingRevs_or_DocIds, PostProps) ->
- gen_server:start_link(
- ?MODULE, [Parent, Source, MissingRevs_or_DocIds, PostProps], []
- ).
+start_link(Parent, Source, MissingRevs, PostProps) ->
+ gen_server:start_link(?MODULE, [Parent, Source, MissingRevs, PostProps], []).
next(Pid) ->
gen_server:call(Pid, next_docs, infinity).
-init([Parent, Source, MissingRevs_or_DocIds, _PostProps]) ->
+init([Parent, Source, MissingRevs, _PostProps]) ->
process_flag(trap_exit, true),
Self = self(),
ReaderLoop = spawn_link(
- fun() -> reader_loop(Self, Parent, Source, MissingRevs_or_DocIds) end
- ),
- MissingRevs = case MissingRevs_or_DocIds of
- Pid when is_pid(Pid) ->
- Pid;
- _ListDocIds ->
- nil
- end,
+ fun() -> reader_loop(Self, Parent, Source, MissingRevs) end),
State = #state{
parent = Parent,
source = Source,
@@ -175,8 +166,6 @@ handle_reader_loop_complete(#state{monitor_count=0} = State) ->
handle_reader_loop_complete(State) ->
{noreply, State#state{complete = waiting_on_monitors}}.
-calculate_new_high_seq(#state{missing_revs=nil}) ->
- nil;
calculate_new_high_seq(#state{requested_seqs=[], opened_seqs=[Open|_]}) ->
Open;
calculate_new_high_seq(#state{requested_seqs=[Req|_], opened_seqs=[Open|_]})
@@ -188,9 +177,9 @@ calculate_new_high_seq(State) ->
hd(State#state.opened_seqs).
split_revlist(Rev, {[CurrentAcc|Rest], BaseLength, Length}) ->
- case Length+size(Rev) > 8192 of
+ case Length+size(Rev)+3 > 8192 of
false ->
- {[[Rev|CurrentAcc] | Rest], BaseLength, Length+size(Rev)};
+ {[[Rev|CurrentAcc] | Rest], BaseLength, Length+size(Rev)+3};
true ->
{[[Rev],CurrentAcc|Rest], BaseLength, BaseLength}
end.
@@ -201,8 +190,6 @@ split_revlist(Rev, {[CurrentAcc|Rest], BaseLength, Length}) ->
% opened seqs greater than the smallest outstanding request. I believe its the
% minimal set of info needed to correctly calculate which seqs have been
% replicated (because remote docs can be opened out-of-order) -- APK
-update_sequence_lists(_Seq, #state{missing_revs=nil} = State) ->
- State;
update_sequence_lists(Seq, State) ->
Requested = lists:delete(Seq, State#state.requested_seqs),
AllOpened = lists:merge([Seq], State#state.opened_seqs),
@@ -226,8 +213,8 @@ open_doc_revs(#http_db{url = Url} = DbS, DocId, Revs) ->
%% all this logic just splits up revision lists that are too long for
%% MochiWeb into multiple requests
BaseQS = [{revs,true}, {latest,true}, {att_encoding_info,true}],
- BaseReq = DbS#http_db{resource=url_encode(DocId), qs=BaseQS},
- BaseLength = length(couch_rep_httpc:full_url(BaseReq)) + 11, % &open_revs=
+ BaseReq = DbS#http_db{resource=encode_doc_id(DocId), qs=BaseQS},
+ BaseLength = length(couch_rep_httpc:full_url(BaseReq) ++ "&open_revs=[]"),
{RevLists, _, _} = lists:foldl(fun split_revlist/2,
{[[]], BaseLength, BaseLength}, couch_doc:revs_to_strs(Revs)),
@@ -253,45 +240,6 @@ open_doc_revs(#http_db{url = Url} = DbS, DocId, Revs) ->
end,
lists:reverse(lists:foldl(Transform, [], JsonResults)).
-open_doc(#http_db{url = Url} = DbS, DocId) ->
- % get latest rev of the doc
- Req = DbS#http_db{
- resource=url_encode(DocId),
- qs=[{att_encoding_info, true}]
- },
- {Props} = Json = couch_rep_httpc:request(Req),
- case couch_util:get_value(<<"_id">>, Props) of
- Id when is_binary(Id) ->
- #doc{id=Id, revs=Rev, atts=Atts} = Doc = couch_doc:from_json_obj(Json),
- [Doc#doc{
- atts=[couch_rep_att:convert_stub(A, {DbS,Id,Rev}) || A <- Atts]
- }];
- undefined ->
- Err = couch_util:get_value(<<"error">>, Props, ?JSON_ENCODE(Json)),
- ?LOG_ERROR("Replicator: error accessing doc ~s at ~s, reason: ~s",
- [DocId, couch_util:url_strip_password(Url), Err]),
- []
- end.
-
-reader_loop(ReaderServer, Parent, Source1, DocIds) when is_list(DocIds) ->
- case Source1 of
- #http_db{} ->
- [gen_server:call(ReaderServer, {open_remote_doc, Id, nil, nil},
- infinity) || Id <- DocIds];
- _LocalDb ->
- {ok, Source} = gen_server:call(Parent, get_source_db, infinity),
- Docs = lists:foldr(fun(Id, Acc) ->
- case couch_db:open_doc(Source, Id) of
- {ok, Doc} ->
- [Doc | Acc];
- _ ->
- Acc
- end
- end, [], DocIds),
- gen_server:call(ReaderServer, {add_docs, nil, Docs}, infinity)
- end,
- exit(complete);
-
reader_loop(ReaderServer, Parent, Source, MissingRevsServer) ->
case couch_rep_missing_revs:next(MissingRevsServer) of
complete ->
@@ -326,8 +274,6 @@ maybe_reopen_db(#db{update_seq=OldSeq} = Db, HighSeq) when HighSeq > OldSeq ->
maybe_reopen_db(Db, _HighSeq) ->
Db.
-spawn_document_request(Source, Id, nil, nil) ->
- spawn_document_request(Source, Id);
spawn_document_request(Source, Id, Seq, Revs) ->
Server = self(),
SpawnFun = fun() ->
@@ -335,11 +281,3 @@ spawn_document_request(Source, Id, Seq, Revs) ->
gen_server:call(Server, {add_docs, Seq, Results}, infinity)
end,
spawn_monitor(SpawnFun).
-
-spawn_document_request(Source, Id) ->
- Server = self(),
- SpawnFun = fun() ->
- Results = open_doc(Source, Id),
- gen_server:call(Server, {add_docs, nil, Results}, infinity)
- end,
- spawn_monitor(SpawnFun).