Diffstat (limited to 'src/couchdb/couch_rep.erl')
-rw-r--r--  src/couchdb/couch_rep.erl  82
1 file changed, 68 insertions, 14 deletions
diff --git a/src/couchdb/couch_rep.erl b/src/couchdb/couch_rep.erl
index 7b318332..75242ca0 100644
--- a/src/couchdb/couch_rep.erl
+++ b/src/couchdb/couch_rep.erl
@@ -17,7 +17,12 @@
-export([replicate/2]).
+-define(BUFFER_NDOCS, 1000).
+-define(BUFFER_NATTACHMENTS, 50).
+-define(BUFFER_MEMORY, 10000000). %% bytes
+
-include("couch_db.hrl").
+-include("../ibrowse/ibrowse.hrl").
%% @spec replicate(Source::binary(), Target::binary()) ->
%% {ok, Stats} | {error, Reason}
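For orientation, a minimal sketch of calling the exported API from an Erlang shell, per the spec above (the database name and URL are placeholders, and the shape of Stats is whatever the replicator returns):

1> couch_rep:replicate(<<"http://127.0.0.1:5984/source_db/">>, <<"target_db">>).
%% -> {ok, Stats} on success, {error, Reason} otherwise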
@@ -202,7 +207,8 @@ handle_call({replicate_doc, {Id, Revs}}, {Pid,_}, #state{enum_pid=Pid} = State)
ets:update_counter(Stats, docs_read, length(Docs)),
%% save them (maybe in a buffer)
- {NewBuffer, NewContext} = case couch_util:should_flush() of
+ {NewBuffer, NewContext} =
+ case should_flush(lists:flatlength([Docs|Buffer])) of
true ->
Docs2 = lists:flatten([Docs|Buffer]),
{ok, Errors} = update_docs(Target, Docs2, [], replicated_changes),
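For reference, lists:flatlength/1 counts the elements of the flattened deep list, so [Docs|Buffer] is measured in total buffered documents rather than in batches, e.g.:

%% a 2-doc batch plus a buffer holding two earlier batches counts as 6 docs
lists:flatlength([[d5,d6] | [[d3,d4],[d1,d2]]]).   %% => 6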
@@ -222,7 +228,7 @@ handle_call({fin, {LastSeq, RevsCount}}, {Pid,_}, #state{enum_pid=Pid} = State)
ets:update_counter(State#state.stats, total_revs, RevsCount),
case State#state.listeners of
[] ->
- % still waiting for the first listener to sen a request
+ % still waiting for the first listener to send a request
{noreply, State#state{current_seq=LastSeq,done=true}};
_ ->
{stop, normal, ok, State#state{current_seq=LastSeq}}
@@ -327,13 +333,13 @@ dump_update_errors([{{Id, Rev}, Error}|Rest]) ->
[Id, couch_doc:rev_to_str(Rev), Error]),
dump_update_errors(Rest).
-attachment_loop(ReqId) ->
+attachment_loop(ReqId, Conn) ->
couch_util:should_flush(),
receive
{From, {set_req_id, NewId}} ->
%% we learn the ReqId to listen for
From ! {self(), {ok, NewId}},
- attachment_loop(NewId);
+ attachment_loop(NewId, Conn);
{ibrowse_async_headers, ReqId, Status, Headers} ->
%% we got the headers, give the controlling process a chance to react
receive
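The controller side of the set_req_id handshake is not shown in this hunk; a hypothetical helper for that exchange (name and timeout are illustrative, not taken from the module) would look roughly like:

%% Sketch: hand the spawned loop the ibrowse request id to listen for and
%% wait for its acknowledgement, mirroring the clause above.
register_req_id(LoopPid, ReqId) ->
    LoopPid ! {self(), {set_req_id, ReqId}},
    receive
        {LoopPid, {ok, ReqId}} -> ok
    after 10000 ->
        {error, timeout}
    end.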
@@ -343,37 +349,42 @@ attachment_loop(ReqId) ->
receive
{From, continue} ->
%% normal case
- attachment_loop(ReqId);
+ attachment_loop(ReqId, Conn);
{From, fail} ->
%% error, failure code
?LOG_ERROR(
"streaming attachment failed with status ~p",
[Status]),
+ catch ibrowse:stop_worker_process(Conn),
exit(attachment_request_failed);
{From, stop_ok} ->
%% stop looping, controller will start a new loop
+ catch ibrowse:stop_worker_process(Conn),
stop_ok
end
end,
- attachment_loop(ReqId);
+ attachment_loop(ReqId, Conn);
{ibrowse_async_response, ReqId, {chunk_start,_}} ->
- attachment_loop(ReqId);
+ attachment_loop(ReqId, Conn);
{ibrowse_async_response, ReqId, chunk_end} ->
- attachment_loop(ReqId);
+ attachment_loop(ReqId, Conn);
{ibrowse_async_response, ReqId, {error, Err}} ->
?LOG_ERROR("streaming attachment failed with ~p", [Err]),
+ catch ibrowse:stop_worker_process(Conn),
exit(attachment_request_failed);
{ibrowse_async_response, ReqId, Data} ->
receive {From, gimme_data} -> From ! {self(), Data} end,
- attachment_loop(ReqId);
- {ibrowse_async_response_end, ReqId} -> ok
+ attachment_loop(ReqId, Conn);
+ {ibrowse_async_response_end, ReqId} ->
+ catch ibrowse:stop_worker_process(Conn),
+ exit(normal)
end.
attachment_stub_converter(DbS, Id, Rev, {Name, {stub, Type, Length}}) ->
#http_db{uri=DbUrl, headers=Headers} = DbS,
{Pos, [RevId|_]} = Rev,
Url = lists:flatten([DbUrl, url_encode(Id), "/", url_encode(?b2l(Name)),
- "?rev=", couch_doc:rev_to_str({Pos,RevId})]),
+ "?rev=", ?b2l(couch_doc:rev_to_str({Pos,RevId}))]),
?LOG_DEBUG("Attachment URL ~p", [Url]),
{ok, RcvFun} = make_attachment_stub_receiver(Url, Headers, Name,
Type, Length),
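Data is likewise pulled from the loop one chunk at a time through the gimme_data exchange shown above; a hypothetical consumer-side helper (illustrative only, not part of the module) could be:

%% Sketch: request the next streamed chunk from the attachment loop.
pull_chunk(LoopPid) ->
    LoopPid ! {self(), gimme_data},
    receive
        {LoopPid, Data} -> {ok, Data}
    after 30000 ->
        {error, timeout}
    end.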
@@ -389,11 +400,14 @@ make_attachment_stub_receiver(Url, _Headers, _Name, _Type, _Length, 0) ->
make_attachment_stub_receiver(Url, Headers, Name, Type, Length, Retries) ->
%% start the process that receives attachment data from ibrowse
- Pid = spawn_link(fun() -> attachment_loop(nil) end),
+ #url{host=Host, port=Port} = ibrowse_lib:parse_url(Url),
+ {ok, Conn} = ibrowse:spawn_link_worker_process(Host, Port),
+ Pid = spawn_link(fun() -> attachment_loop(nil, Conn) end),
%% make the async request
- Options = [{stream_to, Pid}, {response_format, binary}],
- ReqId = case ibrowse:send_req(Url, Headers, get, [], Options, infinity) of
+ Opts = [{stream_to, Pid}, {response_format, binary}],
+ ReqId =
+ case ibrowse:send_req_direct(Conn, Url, Headers, get, [], Opts, infinity) of
{ibrowse_req_id, X} -> X;
{error, _Reason} -> exit(attachment_request_failed)
end,
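As a standalone illustration of the dedicated-worker pattern introduced here (one private ibrowse connection per attachment, torn down when the transfer ends), a minimal synchronous variant might look like the sketch below; it assumes the same ../ibrowse/ibrowse.hrl include for the #url record, and fetch_direct is an illustrative name, not a function in this module:

%% Sketch: fetch a URL over a dedicated ibrowse worker, always cleaning it up.
fetch_direct(Url) ->
    #url{host=Host, port=Port} = ibrowse_lib:parse_url(Url),
    {ok, Conn} = ibrowse:spawn_link_worker_process(Host, Port),
    try
        ibrowse:send_req_direct(Conn, Url, [], get, [],
            [{response_format, binary}], infinity)
    after
        catch ibrowse:stop_worker_process(Conn)
    end.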
@@ -717,6 +731,46 @@ open_doc_revs(#http_db{uri=DbUrl, headers=Headers} = DbS, DocId, Revs0,
open_doc_revs(Db, DocId, Revs, Options) ->
couch_db:open_doc_revs(Db, DocId, Revs, Options).
+%% @spec should_flush(DocCount::integer()) -> true | false
+%% @doc Calculates whether it's time to flush the document buffer. Considers
+%% - memory utilization
+%% - number of pending document writes
+%% - approximate number of pending attachment writes
+should_flush(DocCount) when DocCount > ?BUFFER_NDOCS ->
+ true;
+should_flush(_DocCount) ->
+ MeAndMyLinks = [self()|element(2,process_info(self(),links))],
+
+ case length(MeAndMyLinks)/2 > ?BUFFER_NATTACHMENTS of
+ true -> true;
+ false ->
+ case memory_footprint(MeAndMyLinks) > 2*?BUFFER_MEMORY of
+ true ->
+ [garbage_collect(Pid) || Pid <- MeAndMyLinks],
+ memory_footprint(MeAndMyLinks) > ?BUFFER_MEMORY;
+ false -> false
+ end
+ end.
+
+%% @spec memory_footprint([pid()]) -> integer()
+%% @doc Sum of process and binary memory utilization for all processes in list
+memory_footprint(PidList) ->
+ ProcessMemory = lists:foldl(fun(Pid, Acc) ->
+ Acc + element(2,process_info(Pid, memory))
+ end, 0, PidList),
+
+ BinaryMemory = lists:foldl(fun(Pid, Acc) ->
+ Acc + binary_memory(Pid)
+ end, 0, PidList),
+
+ ?LOG_DEBUG("ProcessMem ~p BinaryMem ~p", [ProcessMemory, BinaryMemory]),
+ ProcessMemory + BinaryMemory.
+
+%% @spec binary_memory(pid()) -> integer()
+%% @doc Memory utilization of all binaries referenced by this process.
+binary_memory(Pid) ->
+ lists:foldl(fun({_Id, Size, _NRefs}, Acc) -> Size+Acc end,
+ 0, element(2,process_info(Pid, binary))).
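%% For context, these two helpers fold over plain process_info/2 results;
%% illustrative values (made up):
%%   process_info(Pid, memory) -> {memory, 13832}                  %% bytes
%%   process_info(Pid, binary) -> {binary, [{9424,65536,2}, ...]}  %% {BinaryId, Size, RefCount}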
update_doc(#http_db{uri=DbUrl, headers=Headers}, #doc{id=DocId}=Doc, Options) ->
[] = Options,