summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/couchdb/couch_rep_writer.erl111
1 files changed, 81 insertions, 30 deletions
diff --git a/src/couchdb/couch_rep_writer.erl b/src/couchdb/couch_rep_writer.erl
index 90a3dcc1..26fad55b 100644
--- a/src/couchdb/couch_rep_writer.erl
+++ b/src/couchdb/couch_rep_writer.erl
@@ -17,35 +17,89 @@
-include("couch_db.hrl").
-define (MAX_BYTES, 10000000).
+-define (MAX_CHUNK_SIZE, 65535).
start_link(Parent, Target, Reader, _PostProps) ->
{ok, spawn_link(fun() -> writer_loop(Parent, Reader, Target) end)}.
make_chunk(Data) when is_list(Data) ->
make_chunk(list_to_binary(Data));
+make_chunk(Data) when size(Data) > ?MAX_CHUNK_SIZE ->
+ <<ChunkData:?MAX_CHUNK_SIZE/binary, Rest/binary>> = Data,
+ Chunk = {?MAX_CHUNK_SIZE, [ibrowse_lib:dec2hex(4, ?MAX_CHUNK_SIZE), "\r\n",
+ ChunkData, "\r\n"]},
+ [Chunk, Rest];
make_chunk(Data) ->
Size = size(Data),
- {Size, [ibrowse_lib:dec2hex(8, Size), "\r\n", Data, "\r\n"]}.
+ [{Size, [ibrowse_lib:dec2hex(4, Size), "\r\n", Data, "\r\n"]}].
+
+upload_docs({start, Buffer}) ->
+ [{Size, Chunk}] = make_chunk(<<"{\"new_edits\":false, \"docs\":[">>),
+ % Buffer ! {self(), next_doc},
+ {ok, Chunk, {continue, Buffer, "", Size}};
+
+upload_docs({continue, _Buf, _Pre, ByteCount}) when ByteCount > ?MAX_BYTES ->
+ {ok, "2\r\n]}\r\n0\r\n\r\n", finish};
+upload_docs({continue, Buffer, Prepend, ByteCount}) ->
+ Buffer ! {self(), next_doc},
+ receive
+ {ok, JsonDoc} ->
+ [{Size, Chunk} | MoreData] = make_chunk([Prepend, JsonDoc]),
+ {ok, Chunk, {multi_chunk, MoreData, Buffer, ",", ByteCount+Size}};
+ eof ->
+ {ok, "2\r\n]}\r\n0\r\n\r\n", finish};
+ timeout ->
+ ?LOG_DEBUG("sending a heartbeat to keep the connection open", []),
+ {ok, "1\r\n \r\n", {continue, Buffer, Prepend, ByteCount}}
+ end;
+
+upload_docs({multi_chunk, [], Buffer, Prepend, ByteCount}) ->
+ % Buffer ! {self(), next_doc},
+ upload_docs({continue, Buffer, Prepend, ByteCount});
+upload_docs({multi_chunk, [Data], Buffer, Prepend, ByteCount}) ->
+ [{Size, Chunk} | MoreData] = make_chunk(Data),
+ {ok, Chunk, {multi_chunk, MoreData, Buffer, Prepend, ByteCount+Size}};
-upload_docs({start, W, Docs}) ->
- {Size, Chunk} = make_chunk(<<"{\"new_edits\":false, \"docs\":[">>),
- {ok, Chunk, {continue, W, Docs, "", Size}};
-upload_docs({continue, W, Docs, _, ByteCount}) when ByteCount > ?MAX_BYTES ->
- W ! {docs_remaining, length(Docs)},
- {ok, "2\r\n]}\r\n", last_chunk};
-upload_docs({continue, W, [Doc|Rest], Prepend, ByteCount}) ->
- JsonDoc = ?JSON_ENCODE(couch_doc:to_json_obj(Doc, [revs,attachments])),
- {Size, Chunk} = make_chunk([Prepend, JsonDoc]),
- {ok, Chunk, {continue, W, Rest, ",", ByteCount+Size}};
-upload_docs({continue, W, [], _, _}) ->
- W ! {docs_remaining, 0},
- {ok, "2\r\n]}\r\n", last_chunk};
-upload_docs(last_chunk) ->
- {ok, "0\r\n\r\n", finish};
upload_docs(finish) ->
couch_util:should_flush(),
eof.
-
+
+encoding_worker([]) ->
+ receive {MiddleMan, next_doc} -> MiddleMan ! eof end,
+ ok;
+encoding_worker([Doc|Rest]) ->
+ JsonDoc = ?l2b(?JSON_ENCODE(couch_doc:to_json_obj(Doc,[revs,attachments]))),
+ receive {MiddleMan, next_doc} -> MiddleMan ! {ok, JsonDoc} end,
+ encoding_worker(Rest).
+
+% needed because upload_docs is inside a gen_server so it can't have a mailbox
+middle_man(EncodingWorker) ->
+ receive {Uploader, next_doc} ->
+ receive
+ {ok, JsonDoc} ->
+ EncodingWorker ! {self(), next_doc},
+ Uploader ! {ok, JsonDoc},
+ middle_man(EncodingWorker);
+ eof ->
+ Uploader ! eof,
+ ok
+ after 5000 ->
+ Uploader ! timeout,
+ middle_man(EncodingWorker)
+ end
+ end.
+
+request_loop(Ref, Request, Acc) ->
+ receive
+ {'DOWN', Ref, _, _, normal} ->
+ lists:flatten(lists:reverse(Acc));
+ {'DOWN', Ref, _, _, Reason} ->
+ exit(Reason)
+ after 0 ->
+ ErrorsJson = couch_rep_httpc:request(Request),
+ request_loop(Ref, Request, [ErrorsJson|Acc])
+ end.
+
writer_loop(Parent, Reader, Target) ->
case couch_rep_reader:next(Reader) of
{complete, FinalSeq} ->
@@ -53,7 +107,7 @@ writer_loop(Parent, Reader, Target) ->
ok;
{HighSeq, Docs} ->
DocCount = length(Docs),
- try write_docs(Target, Docs, []) of
+ try write_docs(Target, Docs) of
{ok, []} ->
Parent ! {update_stats, docs_written, DocCount};
{ok, Errors} ->
@@ -71,13 +125,16 @@ writer_loop(Parent, Reader, Target) ->
writer_loop(Parent, Reader, Target)
end.
-write_docs(#http_db{} = Db, Docs, ErrorsAcc) ->
- ErrorsJson = couch_rep_httpc:request(Db#http_db{
+write_docs(#http_db{} = Db, Docs) ->
+ {Worker, Ref} = spawn_monitor(fun() -> encoding_worker(Docs) end),
+ Pid = spawn_link(fun() -> Worker ! {self(),next_doc}, middle_man(Worker) end),
+ Request = Db#http_db{
resource = "_bulk_docs",
method = post,
- body = {fun upload_docs/1, {start, self(), Docs}},
+ body = {fun upload_docs/1, {start, Pid}},
headers = [{"transfer-encoding", "chunked"} | Db#http_db.headers]
- }),
+ },
+ ErrorsJson = request_loop(Ref, Request, []),
ErrorsList =
lists:map(
fun({Props}) ->
@@ -88,12 +145,6 @@ write_docs(#http_db{} = Db, Docs, ErrorsAcc) ->
Reason = proplists:get_value(<<"reason">>, Props),
{{Id, Rev}, {ErrId, Reason}}
end, ErrorsJson),
- receive
- {docs_remaining, 0} ->
- {ok, lists:flatten([ErrorsList|ErrorsAcc])};
- {docs_remaining, N} ->
- MoreDocs = lists:nthtail(length(Docs)-N, Docs),
- write_docs(Db, MoreDocs, [ErrorsList|ErrorsAcc])
- end;
-write_docs(Db, Docs, _) ->
+ {ok, ErrorsList};
+write_docs(Db, Docs) ->
couch_db:update_docs(Db, Docs, [], replicated_changes).