summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAdam Kocoloski <kocolosk@apache.org>2009-08-11 03:47:25 +0000
committerAdam Kocoloski <kocolosk@apache.org>2009-08-11 03:47:25 +0000
commit525187db0224d894493835bf8cfef956eec3aa3b (patch)
tree6785c1ec0079e3e441fa001d295f8680dfc8b36c
parent0a2bf6e5f691a7739f482e53ea329959ac81758d (diff)
split into multiple requests so target doesn't buffer too much
git-svn-id: https://svn.apache.org/repos/asf/couchdb/trunk@802986 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--src/couchdb/couch_rep_writer.erl34
1 files changed, 23 insertions, 11 deletions
diff --git a/src/couchdb/couch_rep_writer.erl b/src/couchdb/couch_rep_writer.erl
index d4f2fd31..ca702062 100644
--- a/src/couchdb/couch_rep_writer.erl
+++ b/src/couchdb/couch_rep_writer.erl
@@ -16,22 +16,29 @@
-include("couch_db.hrl").
+-define (MAX_BYTES, 10000000).
+
start_link(Parent, Target, Reader, _PostProps) ->
{ok, spawn_link(fun() -> writer_loop(Parent, Reader, Target) end)}.
make_chunk(Data) when is_list(Data) ->
make_chunk(list_to_binary(Data));
make_chunk(Data) ->
- [ibrowse_lib:dec2hex(8, size(Data)), "\r\n", Data, "\r\n"].
+ Size = size(Data),
+ {Size, [ibrowse_lib:dec2hex(8, Size), "\r\n", Data, "\r\n"]}.
upload_docs({start, Docs}) ->
- Data = make_chunk(<<"{\"new_edits\":false, \"docs\":[">>),
- {ok, Data, {continue, Docs, ""}};
-upload_docs({continue, [Doc|Rest], Prepend}) ->
+ {Size, Chunk} = make_chunk(<<"{\"new_edits\":false, \"docs\":[">>),
+ {ok, Chunk, {continue, Docs, "", Size}};
+upload_docs({continue, Docs, _, ByteCount}) when ByteCount > ?MAX_BYTES ->
+ put(docs_remaining, Docs),
+ {ok, "2\r\n]}\r\n", last_chunk};
+upload_docs({continue, [Doc|Rest], Prepend, ByteCount}) ->
JsonDoc = ?JSON_ENCODE(couch_doc:to_json_obj(Doc, [revs,attachments])),
- {ok, make_chunk([Prepend, JsonDoc]), {continue, Rest, ","}};
-upload_docs({continue, [], _}) ->
- {ok, make_chunk(<<"]}\n">>), last_chunk};
+ {Size, Chunk} = make_chunk([Prepend, JsonDoc]),
+ {ok, Chunk, {continue, Rest, ",", ByteCount+Size}};
+upload_docs({continue, [], _, _}) ->
+ {ok, "2\r\n]}\r\n", last_chunk};
upload_docs(last_chunk) ->
{ok, "0\r\n\r\n", finish};
upload_docs(finish) ->
@@ -44,7 +51,7 @@ writer_loop(Parent, Reader, Target) ->
ok;
{HighSeq, Docs} ->
DocCount = length(Docs),
- try write_docs(Target, Docs) of
+ try write_docs(Target, Docs, []) of
{ok, []} ->
Parent ! {update_stats, docs_written, DocCount};
{ok, Errors} ->
@@ -61,7 +68,7 @@ writer_loop(Parent, Reader, Target) ->
writer_loop(Parent, Reader, Target)
end.
-write_docs(#http_db{} = Db, Docs) ->
+write_docs(#http_db{} = Db, Docs, ErrorsAcc) ->
ErrorsJson = couch_rep_httpc:request(Db#http_db{
resource = "_bulk_docs",
method = post,
@@ -78,6 +85,11 @@ write_docs(#http_db{} = Db, Docs) ->
Reason = proplists:get_value(<<"reason">>, Props),
{{Id, Rev}, {ErrId, Reason}}
end, ErrorsJson),
- {ok, ErrorsList};
-write_docs(Db, Docs) ->
+ case erase(docs_remaining) of
+ undefined ->
+ {ok, lists:flatten([ErrorsList|ErrorsAcc])};
+ MoreDocs ->
+ write_docs(Db, MoreDocs, [ErrorsList|ErrorsAcc])
+ end;
+write_docs(Db, Docs, _) ->
couch_db:update_docs(Db, Docs, [], replicated_changes).