From 525187db0224d894493835bf8cfef956eec3aa3b Mon Sep 17 00:00:00 2001 From: Adam Kocoloski Date: Tue, 11 Aug 2009 03:47:25 +0000 Subject: split into multiple requests so target doesn't buffer too much git-svn-id: https://svn.apache.org/repos/asf/couchdb/trunk@802986 13f79535-47bb-0310-9956-ffa450edef68 --- src/couchdb/couch_rep_writer.erl | 34 +++++++++++++++++++++++----------- 1 file changed, 23 insertions(+), 11 deletions(-) (limited to 'src') diff --git a/src/couchdb/couch_rep_writer.erl b/src/couchdb/couch_rep_writer.erl index d4f2fd31..ca702062 100644 --- a/src/couchdb/couch_rep_writer.erl +++ b/src/couchdb/couch_rep_writer.erl @@ -16,22 +16,29 @@ -include("couch_db.hrl"). +-define (MAX_BYTES, 10000000). + start_link(Parent, Target, Reader, _PostProps) -> {ok, spawn_link(fun() -> writer_loop(Parent, Reader, Target) end)}. make_chunk(Data) when is_list(Data) -> make_chunk(list_to_binary(Data)); make_chunk(Data) -> - [ibrowse_lib:dec2hex(8, size(Data)), "\r\n", Data, "\r\n"]. + Size = size(Data), + {Size, [ibrowse_lib:dec2hex(8, Size), "\r\n", Data, "\r\n"]}. upload_docs({start, Docs}) -> - Data = make_chunk(<<"{\"new_edits\":false, \"docs\":[">>), - {ok, Data, {continue, Docs, ""}}; -upload_docs({continue, [Doc|Rest], Prepend}) -> + {Size, Chunk} = make_chunk(<<"{\"new_edits\":false, \"docs\":[">>), + {ok, Chunk, {continue, Docs, "", Size}}; +upload_docs({continue, Docs, _, ByteCount}) when ByteCount > ?MAX_BYTES -> + put(docs_remaining, Docs), + {ok, "2\r\n]}\r\n", last_chunk}; +upload_docs({continue, [Doc|Rest], Prepend, ByteCount}) -> JsonDoc = ?JSON_ENCODE(couch_doc:to_json_obj(Doc, [revs,attachments])), - {ok, make_chunk([Prepend, JsonDoc]), {continue, Rest, ","}}; -upload_docs({continue, [], _}) -> - {ok, make_chunk(<<"]}\n">>), last_chunk}; + {Size, Chunk} = make_chunk([Prepend, JsonDoc]), + {ok, Chunk, {continue, Rest, ",", ByteCount+Size}}; +upload_docs({continue, [], _, _}) -> + {ok, "2\r\n]}\r\n", last_chunk}; upload_docs(last_chunk) -> {ok, "0\r\n\r\n", finish}; upload_docs(finish) -> @@ -44,7 +51,7 @@ writer_loop(Parent, Reader, Target) -> ok; {HighSeq, Docs} -> DocCount = length(Docs), - try write_docs(Target, Docs) of + try write_docs(Target, Docs, []) of {ok, []} -> Parent ! {update_stats, docs_written, DocCount}; {ok, Errors} -> @@ -61,7 +68,7 @@ writer_loop(Parent, Reader, Target) -> writer_loop(Parent, Reader, Target) end. -write_docs(#http_db{} = Db, Docs) -> +write_docs(#http_db{} = Db, Docs, ErrorsAcc) -> ErrorsJson = couch_rep_httpc:request(Db#http_db{ resource = "_bulk_docs", method = post, @@ -78,6 +85,11 @@ write_docs(#http_db{} = Db, Docs) -> Reason = proplists:get_value(<<"reason">>, Props), {{Id, Rev}, {ErrId, Reason}} end, ErrorsJson), - {ok, ErrorsList}; -write_docs(Db, Docs) -> + case erase(docs_remaining) of + undefined -> + {ok, lists:flatten([ErrorsList|ErrorsAcc])}; + MoreDocs -> + write_docs(Db, MoreDocs, [ErrorsList|ErrorsAcc]) + end; +write_docs(Db, Docs, _) -> couch_db:update_docs(Db, Docs, [], replicated_changes). -- cgit v1.2.3