diff options
Diffstat (limited to 'apps/couch/src/couch_rep_reader.erl')
-rw-r--r-- | apps/couch/src/couch_rep_reader.erl | 80 |
1 files changed, 9 insertions, 71 deletions
diff --git a/apps/couch/src/couch_rep_reader.erl b/apps/couch/src/couch_rep_reader.erl index a7ae45a8..0d344e5c 100644 --- a/apps/couch/src/couch_rep_reader.erl +++ b/apps/couch/src/couch_rep_reader.erl @@ -17,7 +17,7 @@ -export([start_link/4, next/1]). --import(couch_util, [url_encode/1]). +-import(couch_util, [encode_doc_id/1]). -define (BUFFER_SIZE, 1000). -define (MAX_CONCURRENT_REQUESTS, 100). @@ -40,26 +40,17 @@ opened_seqs = [] }). -start_link(Parent, Source, MissingRevs_or_DocIds, PostProps) -> - gen_server:start_link( - ?MODULE, [Parent, Source, MissingRevs_or_DocIds, PostProps], [] - ). +start_link(Parent, Source, MissingRevs, PostProps) -> + gen_server:start_link(?MODULE, [Parent, Source, MissingRevs, PostProps], []). next(Pid) -> gen_server:call(Pid, next_docs, infinity). -init([Parent, Source, MissingRevs_or_DocIds, _PostProps]) -> +init([Parent, Source, MissingRevs, _PostProps]) -> process_flag(trap_exit, true), Self = self(), ReaderLoop = spawn_link( - fun() -> reader_loop(Self, Parent, Source, MissingRevs_or_DocIds) end - ), - MissingRevs = case MissingRevs_or_DocIds of - Pid when is_pid(Pid) -> - Pid; - _ListDocIds -> - nil - end, + fun() -> reader_loop(Self, Parent, Source, MissingRevs) end), State = #state{ parent = Parent, source = Source, @@ -175,8 +166,6 @@ handle_reader_loop_complete(#state{monitor_count=0} = State) -> handle_reader_loop_complete(State) -> {noreply, State#state{complete = waiting_on_monitors}}. -calculate_new_high_seq(#state{missing_revs=nil}) -> - nil; calculate_new_high_seq(#state{requested_seqs=[], opened_seqs=[Open|_]}) -> Open; calculate_new_high_seq(#state{requested_seqs=[Req|_], opened_seqs=[Open|_]}) @@ -188,9 +177,9 @@ calculate_new_high_seq(State) -> hd(State#state.opened_seqs). split_revlist(Rev, {[CurrentAcc|Rest], BaseLength, Length}) -> - case Length+size(Rev) > 8192 of + case Length+size(Rev)+3 > 8192 of false -> - {[[Rev|CurrentAcc] | Rest], BaseLength, Length+size(Rev)}; + {[[Rev|CurrentAcc] | Rest], BaseLength, Length+size(Rev)+3}; true -> {[[Rev],CurrentAcc|Rest], BaseLength, BaseLength} end. @@ -201,8 +190,6 @@ split_revlist(Rev, {[CurrentAcc|Rest], BaseLength, Length}) -> % opened seqs greater than the smallest outstanding request. I believe its the % minimal set of info needed to correctly calculate which seqs have been % replicated (because remote docs can be opened out-of-order) -- APK -update_sequence_lists(_Seq, #state{missing_revs=nil} = State) -> - State; update_sequence_lists(Seq, State) -> Requested = lists:delete(Seq, State#state.requested_seqs), AllOpened = lists:merge([Seq], State#state.opened_seqs), @@ -226,8 +213,8 @@ open_doc_revs(#http_db{url = Url} = DbS, DocId, Revs) -> %% all this logic just splits up revision lists that are too long for %% MochiWeb into multiple requests BaseQS = [{revs,true}, {latest,true}, {att_encoding_info,true}], - BaseReq = DbS#http_db{resource=url_encode(DocId), qs=BaseQS}, - BaseLength = length(couch_rep_httpc:full_url(BaseReq)) + 11, % &open_revs= + BaseReq = DbS#http_db{resource=encode_doc_id(DocId), qs=BaseQS}, + BaseLength = length(couch_rep_httpc:full_url(BaseReq) ++ "&open_revs=[]"), {RevLists, _, _} = lists:foldl(fun split_revlist/2, {[[]], BaseLength, BaseLength}, couch_doc:revs_to_strs(Revs)), @@ -253,45 +240,6 @@ open_doc_revs(#http_db{url = Url} = DbS, DocId, Revs) -> end, lists:reverse(lists:foldl(Transform, [], JsonResults)). -open_doc(#http_db{url = Url} = DbS, DocId) -> - % get latest rev of the doc - Req = DbS#http_db{ - resource=url_encode(DocId), - qs=[{att_encoding_info, true}] - }, - {Props} = Json = couch_rep_httpc:request(Req), - case couch_util:get_value(<<"_id">>, Props) of - Id when is_binary(Id) -> - #doc{id=Id, revs=Rev, atts=Atts} = Doc = couch_doc:from_json_obj(Json), - [Doc#doc{ - atts=[couch_rep_att:convert_stub(A, {DbS,Id,Rev}) || A <- Atts] - }]; - undefined -> - Err = couch_util:get_value(<<"error">>, Props, ?JSON_ENCODE(Json)), - ?LOG_ERROR("Replicator: error accessing doc ~s at ~s, reason: ~s", - [DocId, couch_util:url_strip_password(Url), Err]), - [] - end. - -reader_loop(ReaderServer, Parent, Source1, DocIds) when is_list(DocIds) -> - case Source1 of - #http_db{} -> - [gen_server:call(ReaderServer, {open_remote_doc, Id, nil, nil}, - infinity) || Id <- DocIds]; - _LocalDb -> - {ok, Source} = gen_server:call(Parent, get_source_db, infinity), - Docs = lists:foldr(fun(Id, Acc) -> - case couch_db:open_doc(Source, Id) of - {ok, Doc} -> - [Doc | Acc]; - _ -> - Acc - end - end, [], DocIds), - gen_server:call(ReaderServer, {add_docs, nil, Docs}, infinity) - end, - exit(complete); - reader_loop(ReaderServer, Parent, Source, MissingRevsServer) -> case couch_rep_missing_revs:next(MissingRevsServer) of complete -> @@ -326,8 +274,6 @@ maybe_reopen_db(#db{update_seq=OldSeq} = Db, HighSeq) when HighSeq > OldSeq -> maybe_reopen_db(Db, _HighSeq) -> Db. -spawn_document_request(Source, Id, nil, nil) -> - spawn_document_request(Source, Id); spawn_document_request(Source, Id, Seq, Revs) -> Server = self(), SpawnFun = fun() -> @@ -335,11 +281,3 @@ spawn_document_request(Source, Id, Seq, Revs) -> gen_server:call(Server, {add_docs, Seq, Results}, infinity) end, spawn_monitor(SpawnFun). - -spawn_document_request(Source, Id) -> - Server = self(), - SpawnFun = fun() -> - Results = open_doc(Source, Id), - gen_server:call(Server, {add_docs, nil, Results}, infinity) - end, - spawn_monitor(SpawnFun). |