diff options
Diffstat (limited to 'apps/couch/src/couch_rep_reader.erl')
-rw-r--r-- | apps/couch/src/couch_rep_reader.erl | 63 |
1 files changed, 34 insertions, 29 deletions
diff --git a/apps/couch/src/couch_rep_reader.erl b/apps/couch/src/couch_rep_reader.erl index 46633994..a7ae45a8 100644 --- a/apps/couch/src/couch_rep_reader.erl +++ b/apps/couch/src/couch_rep_reader.erl @@ -20,12 +20,9 @@ -import(couch_util, [url_encode/1]). -define (BUFFER_SIZE, 1000). --define (MAX_CONCURRENT_REQUESTS, 10). --define (MAX_CONNECTIONS, 20). --define (MAX_PIPELINE_SIZE, 50). +-define (MAX_CONCURRENT_REQUESTS, 100). -include("couch_db.hrl"). --include_lib("ibrowse/include/ibrowse.hrl"). -record (state, { parent, @@ -53,14 +50,9 @@ next(Pid) -> init([Parent, Source, MissingRevs_or_DocIds, _PostProps]) -> process_flag(trap_exit, true), - if is_record(Source, http_db) -> - #url{host=Host, port=Port} = ibrowse_lib:parse_url(Source#http_db.url), - ibrowse:set_max_sessions(Host, Port, ?MAX_CONNECTIONS), - ibrowse:set_max_pipeline_size(Host, Port, ?MAX_PIPELINE_SIZE); - true -> ok end, Self = self(), ReaderLoop = spawn_link( - fun() -> reader_loop(Self, Source, MissingRevs_or_DocIds) end + fun() -> reader_loop(Self, Parent, Source, MissingRevs_or_DocIds) end ), MissingRevs = case MissingRevs_or_DocIds of Pid when is_pid(Pid) -> @@ -230,7 +222,7 @@ update_sequence_lists(Seq, State) -> opened_seqs = Opened }. -open_doc_revs(#http_db{} = DbS, DocId, Revs) -> +open_doc_revs(#http_db{url = Url} = DbS, DocId, Revs) -> %% all this logic just splits up revision lists that are too long for %% MochiWeb into multiple requests BaseQS = [{revs,true}, {latest,true}, {att_encoding_info,true}], @@ -246,36 +238,48 @@ open_doc_revs(#http_db{} = DbS, DocId, Revs) -> JsonResults = lists:flatten([couch_rep_httpc:request(R) || R <- Requests]), Transform = - fun({[{<<"missing">>, Rev}]}) -> - {{not_found, missing}, couch_doc:parse_rev(Rev)}; - ({[{<<"ok">>, Json}]}) -> + fun({[{<<"ok">>, Json}]}, Acc) -> #doc{id=Id, revs=Rev, atts=Atts} = Doc = couch_doc:from_json_obj(Json), - Doc#doc{atts=[couch_rep_att:convert_stub(A, {DbS,Id,Rev}) || A <- Atts]} + Doc1 = Doc#doc{ + atts=[couch_rep_att:convert_stub(A, {DbS,Id,Rev}) || A <- Atts] + }, + [Doc1 | Acc]; + ({ErrorProps}, Acc) -> + Err = couch_util:get_value(<<"error">>, ErrorProps, + ?JSON_ENCODE({ErrorProps})), + ?LOG_ERROR("Replicator: error accessing doc ~s at ~s, reason: ~s", + [DocId, couch_util:url_strip_password(Url), Err]), + Acc end, - [Transform(Result) || Result <- JsonResults]. + lists:reverse(lists:foldl(Transform, [], JsonResults)). -open_doc(#http_db{} = DbS, DocId) -> +open_doc(#http_db{url = Url} = DbS, DocId) -> % get latest rev of the doc Req = DbS#http_db{ resource=url_encode(DocId), qs=[{att_encoding_info, true}] }, - case couch_rep_httpc:request(Req) of - {[{<<"error">>,<<"not_found">>}, {<<"reason">>,<<"missing">>}]} -> - []; - Json -> + {Props} = Json = couch_rep_httpc:request(Req), + case couch_util:get_value(<<"_id">>, Props) of + Id when is_binary(Id) -> #doc{id=Id, revs=Rev, atts=Atts} = Doc = couch_doc:from_json_obj(Json), [Doc#doc{ atts=[couch_rep_att:convert_stub(A, {DbS,Id,Rev}) || A <- Atts] - }] + }]; + undefined -> + Err = couch_util:get_value(<<"error">>, Props, ?JSON_ENCODE(Json)), + ?LOG_ERROR("Replicator: error accessing doc ~s at ~s, reason: ~s", + [DocId, couch_util:url_strip_password(Url), Err]), + [] end. -reader_loop(ReaderServer, Source, DocIds) when is_list(DocIds) -> - case Source of +reader_loop(ReaderServer, Parent, Source1, DocIds) when is_list(DocIds) -> + case Source1 of #http_db{} -> [gen_server:call(ReaderServer, {open_remote_doc, Id, nil, nil}, infinity) || Id <- DocIds]; _LocalDb -> + {ok, Source} = gen_server:call(Parent, get_source_db, infinity), Docs = lists:foldr(fun(Id, Acc) -> case couch_db:open_doc(Source, Id) of {ok, Doc} -> @@ -288,7 +292,7 @@ reader_loop(ReaderServer, Source, DocIds) when is_list(DocIds) -> end, exit(complete); -reader_loop(ReaderServer, Source, MissingRevsServer) -> +reader_loop(ReaderServer, Parent, Source, MissingRevsServer) -> case couch_rep_missing_revs:next(MissingRevsServer) of complete -> exit(complete); @@ -301,22 +305,23 @@ reader_loop(ReaderServer, Source, MissingRevsServer) -> #http_db{} -> [gen_server:call(ReaderServer, {open_remote_doc, Id, Seq, Revs}, infinity) || {Id,Seq,Revs} <- SortedIdsRevs], - reader_loop(ReaderServer, Source, MissingRevsServer); + reader_loop(ReaderServer, Parent, Source, MissingRevsServer); _Local -> - Source2 = maybe_reopen_db(Source, HighSeq), + {ok, Source1} = gen_server:call(Parent, get_source_db, infinity), + Source2 = maybe_reopen_db(Source1, HighSeq), lists:foreach(fun({Id,Seq,Revs}) -> {ok, Docs} = couch_db:open_doc_revs(Source2, Id, Revs, [latest]), JustTheDocs = [Doc || {ok, Doc} <- Docs], gen_server:call(ReaderServer, {add_docs, Seq, JustTheDocs}, infinity) end, SortedIdsRevs), - reader_loop(ReaderServer, Source2, MissingRevsServer) + couch_db:close(Source2), + reader_loop(ReaderServer, Parent, Source2, MissingRevsServer) end end. maybe_reopen_db(#db{update_seq=OldSeq} = Db, HighSeq) when HighSeq > OldSeq -> {ok, NewDb} = couch_db:open(Db#db.name, [{user_ctx, Db#db.user_ctx}]), - couch_db:close(Db), NewDb; maybe_reopen_db(Db, _HighSeq) -> Db. |