summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/couchdb/couch_rep_writer.erl26
1 files changed, 15 insertions, 11 deletions
diff --git a/src/couchdb/couch_rep_writer.erl b/src/couchdb/couch_rep_writer.erl
index ca702062..90a3dcc1 100644
--- a/src/couchdb/couch_rep_writer.erl
+++ b/src/couchdb/couch_rep_writer.erl
@@ -27,21 +27,23 @@ make_chunk(Data) ->
Size = size(Data),
{Size, [ibrowse_lib:dec2hex(8, Size), "\r\n", Data, "\r\n"]}.
-upload_docs({start, Docs}) ->
+upload_docs({start, W, Docs}) ->
{Size, Chunk} = make_chunk(<<"{\"new_edits\":false, \"docs\":[">>),
- {ok, Chunk, {continue, Docs, "", Size}};
-upload_docs({continue, Docs, _, ByteCount}) when ByteCount > ?MAX_BYTES ->
- put(docs_remaining, Docs),
+ {ok, Chunk, {continue, W, Docs, "", Size}};
+upload_docs({continue, W, Docs, _, ByteCount}) when ByteCount > ?MAX_BYTES ->
+ W ! {docs_remaining, length(Docs)},
{ok, "2\r\n]}\r\n", last_chunk};
-upload_docs({continue, [Doc|Rest], Prepend, ByteCount}) ->
+upload_docs({continue, W, [Doc|Rest], Prepend, ByteCount}) ->
JsonDoc = ?JSON_ENCODE(couch_doc:to_json_obj(Doc, [revs,attachments])),
{Size, Chunk} = make_chunk([Prepend, JsonDoc]),
- {ok, Chunk, {continue, Rest, ",", ByteCount+Size}};
-upload_docs({continue, [], _, _}) ->
+ {ok, Chunk, {continue, W, Rest, ",", ByteCount+Size}};
+upload_docs({continue, W, [], _, _}) ->
+ W ! {docs_remaining, 0},
{ok, "2\r\n]}\r\n", last_chunk};
upload_docs(last_chunk) ->
{ok, "0\r\n\r\n", finish};
upload_docs(finish) ->
+ couch_util:should_flush(),
eof.
writer_loop(Parent, Reader, Target) ->
@@ -65,6 +67,7 @@ writer_loop(Parent, Reader, Target) ->
end,
Parent ! {writer_checkpoint, HighSeq},
couch_rep_att:cleanup(),
+ couch_util:should_flush(),
writer_loop(Parent, Reader, Target)
end.
@@ -72,7 +75,7 @@ write_docs(#http_db{} = Db, Docs, ErrorsAcc) ->
ErrorsJson = couch_rep_httpc:request(Db#http_db{
resource = "_bulk_docs",
method = post,
- body = {fun upload_docs/1, {start, Docs}},
+ body = {fun upload_docs/1, {start, self(), Docs}},
headers = [{"transfer-encoding", "chunked"} | Db#http_db.headers]
}),
ErrorsList =
@@ -85,10 +88,11 @@ write_docs(#http_db{} = Db, Docs, ErrorsAcc) ->
Reason = proplists:get_value(<<"reason">>, Props),
{{Id, Rev}, {ErrId, Reason}}
end, ErrorsJson),
- case erase(docs_remaining) of
- undefined ->
+ receive
+ {docs_remaining, 0} ->
{ok, lists:flatten([ErrorsList|ErrorsAcc])};
- MoreDocs ->
+ {docs_remaining, N} ->
+ MoreDocs = lists:nthtail(length(Docs)-N, Docs),
write_docs(Db, MoreDocs, [ErrorsList|ErrorsAcc])
end;
write_docs(Db, Docs, _) ->