diff options
author | Adam Kocoloski <kocolosk@apache.org> | 2009-05-16 18:58:18 +0000 |
---|---|---|
committer | Adam Kocoloski <kocolosk@apache.org> | 2009-05-16 18:58:18 +0000 |
commit | 04bd244c1249fb3dc0581d97e8bbf4cdcd808d7c (patch) | |
tree | bec7ed6cd307ccf4b4bdca59eb435da5b3a6fd0a /src/couchdb | |
parent | e69ff9662344905fbac5d045259e791227772a18 (diff) |
replicator memory management and buffer flush calculation updates
* new should_flush fun considers ndocs, nattachments, memory in making decision
* memory utilized by attachment receivers is accounted for
* download attachments using standalone connections instead of conn pool. This
prevents a document request from getting stuck behind a huge attachment, which
would prevent us from triggering a buffer flush in time. We also consider the
memory utilization of the standalone ibrowse connection in should_flush
git-svn-id: https://svn.apache.org/repos/asf/couchdb/trunk@775507 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'src/couchdb')
-rw-r--r-- | src/couchdb/couch_rep.erl | 82 |
1 files changed, 68 insertions, 14 deletions
diff --git a/src/couchdb/couch_rep.erl b/src/couchdb/couch_rep.erl index 7b318332..75242ca0 100644 --- a/src/couchdb/couch_rep.erl +++ b/src/couchdb/couch_rep.erl @@ -17,7 +17,12 @@ -export([replicate/2]). +-define(BUFFER_NDOCS, 1000). +-define(BUFFER_NATTACHMENTS, 50). +-define(BUFFER_MEMORY, 10000000). %% bytes + -include("couch_db.hrl"). +-include("../ibrowse/ibrowse.hrl"). %% @spec replicate(Source::binary(), Target::binary()) -> %% {ok, Stats} | {error, Reason} @@ -202,7 +207,8 @@ handle_call({replicate_doc, {Id, Revs}}, {Pid,_}, #state{enum_pid=Pid} = State) ets:update_counter(Stats, docs_read, length(Docs)), %% save them (maybe in a buffer) - {NewBuffer, NewContext} = case couch_util:should_flush() of + {NewBuffer, NewContext} = + case should_flush(lists:flatlength([Docs|Buffer])) of true -> Docs2 = lists:flatten([Docs|Buffer]), {ok, Errors} = update_docs(Target, Docs2, [], replicated_changes), @@ -222,7 +228,7 @@ handle_call({fin, {LastSeq, RevsCount}}, {Pid,_}, #state{enum_pid=Pid} = State) ets:update_counter(State#state.stats, total_revs, RevsCount), case State#state.listeners of [] -> - % still waiting for the first listener to sen a request + % still waiting for the first listener to send a request {noreply, State#state{current_seq=LastSeq,done=true}}; _ -> {stop, normal, ok, State#state{current_seq=LastSeq}} @@ -327,13 +333,13 @@ dump_update_errors([{{Id, Rev}, Error}|Rest]) -> [Id, couch_doc:rev_to_str(Rev), Error]), dump_update_errors(Rest). -attachment_loop(ReqId) -> +attachment_loop(ReqId, Conn) -> couch_util:should_flush(), receive {From, {set_req_id, NewId}} -> %% we learn the ReqId to listen for From ! {self(), {ok, NewId}}, - attachment_loop(NewId); + attachment_loop(NewId, Conn); {ibrowse_async_headers, ReqId, Status, Headers} -> %% we got header, give the controlling process a chance to react receive @@ -343,37 +349,42 @@ attachment_loop(ReqId) -> receive {From, continue} -> %% normal case - attachment_loop(ReqId); + attachment_loop(ReqId, Conn); {From, fail} -> %% error, failure code ?LOG_ERROR( "streaming attachment failed with status ~p", [Status]), + catch ibrowse:stop_worker_process(Conn), exit(attachment_request_failed); {From, stop_ok} -> %% stop looping, controller will start a new loop + catch ibrowse:stop_worker_process(Conn), stop_ok end end, - attachment_loop(ReqId); + attachment_loop(ReqId, Conn); {ibrowse_async_response, ReqId, {chunk_start,_}} -> - attachment_loop(ReqId); + attachment_loop(ReqId, Conn); {ibrowse_async_response, ReqId, chunk_end} -> - attachment_loop(ReqId); + attachment_loop(ReqId, Conn); {ibrowse_async_response, ReqId, {error, Err}} -> ?LOG_ERROR("streaming attachment failed with ~p", [Err]), + catch ibrowse:stop_worker_process(Conn), exit(attachment_request_failed); {ibrowse_async_response, ReqId, Data} -> receive {From, gimme_data} -> From ! {self(), Data} end, - attachment_loop(ReqId); - {ibrowse_async_response_end, ReqId} -> ok + attachment_loop(ReqId, Conn); + {ibrowse_async_response_end, ReqId} -> + catch ibrowse:stop_worker_process(Conn), + exit(normal) end. attachment_stub_converter(DbS, Id, Rev, {Name, {stub, Type, Length}}) -> #http_db{uri=DbUrl, headers=Headers} = DbS, {Pos, [RevId|_]} = Rev, Url = lists:flatten([DbUrl, url_encode(Id), "/", url_encode(?b2l(Name)), - "?rev=", couch_doc:rev_to_str({Pos,RevId})]), + "?rev=", ?b2l(couch_doc:rev_to_str({Pos,RevId}))]), ?LOG_DEBUG("Attachment URL ~p", [Url]), {ok, RcvFun} = make_attachment_stub_receiver(Url, Headers, Name, Type, Length), @@ -389,11 +400,14 @@ make_attachment_stub_receiver(Url, _Headers, _Name, _Type, _Length, 0) -> make_attachment_stub_receiver(Url, Headers, Name, Type, Length, Retries) -> %% start the process that receives attachment data from ibrowse - Pid = spawn_link(fun() -> attachment_loop(nil) end), + #url{host=Host, port=Port} = ibrowse_lib:parse_url(Url), + {ok, Conn} = ibrowse:spawn_link_worker_process(Host, Port), + Pid = spawn_link(fun() -> attachment_loop(nil, Conn) end), %% make the async request - Options = [{stream_to, Pid}, {response_format, binary}], - ReqId = case ibrowse:send_req(Url, Headers, get, [], Options, infinity) of + Opts = [{stream_to, Pid}, {response_format, binary}], + ReqId = + case ibrowse:send_req_direct(Conn, Url, Headers, get, [], Opts, infinity) of {ibrowse_req_id, X} -> X; {error, _Reason} -> exit(attachment_request_failed) end, @@ -717,6 +731,46 @@ open_doc_revs(#http_db{uri=DbUrl, headers=Headers} = DbS, DocId, Revs0, open_doc_revs(Db, DocId, Revs, Options) -> couch_db:open_doc_revs(Db, DocId, Revs, Options). +%% @spec should_flush() -> true | false +%% @doc Calculates whether it's time to flush the document buffer. Considers +%% - memory utilization +%% - number of pending document writes +%% - approximate number of pending attachment writes +should_flush(DocCount) when DocCount > ?BUFFER_NDOCS -> + true; +should_flush(_DocCount) -> + MeAndMyLinks = [self()|element(2,process_info(self(),links))], + + case length(MeAndMyLinks)/2 > ?BUFFER_NATTACHMENTS of + true -> true; + false -> + case memory_footprint(MeAndMyLinks) > 2*?BUFFER_MEMORY of + true -> + [garbage_collect(Pid) || Pid <- MeAndMyLinks], + memory_footprint(MeAndMyLinks) > ?BUFFER_MEMORY; + false -> false + end + end. + +%% @spec memory_footprint([pid()]) -> integer() +%% @doc Sum of process and binary memory utilization for all processes in list +memory_footprint(PidList) -> + ProcessMemory = lists:foldl(fun(Pid, Acc) -> + Acc + element(2,process_info(Pid, memory)) + end, 0, PidList), + + BinaryMemory = lists:foldl(fun(Pid, Acc) -> + Acc + binary_memory(Pid) + end, 0, PidList), + + ?LOG_DEBUG("ProcessMem ~p BinaryMem ~p", [ProcessMemory, BinaryMemory]), + ProcessMemory + BinaryMemory. + +%% @spec binary_memory(pid()) -> integer() +%% @doc Memory utilization of all binaries referenced by this process. +binary_memory(Pid) -> + lists:foldl(fun({_Id, Size, _NRefs}, Acc) -> Size+Acc end, + 0, element(2,process_info(Pid, binary))). update_doc(#http_db{uri=DbUrl, headers=Headers}, #doc{id=DocId}=Doc, Options) -> [] = Options, |