diff options
-rw-r--r-- | src/couchdb/couch_rep_writer.erl | 111 |
1 files changed, 81 insertions, 30 deletions
diff --git a/src/couchdb/couch_rep_writer.erl b/src/couchdb/couch_rep_writer.erl index 90a3dcc1..26fad55b 100644 --- a/src/couchdb/couch_rep_writer.erl +++ b/src/couchdb/couch_rep_writer.erl @@ -17,35 +17,89 @@ -include("couch_db.hrl"). -define (MAX_BYTES, 10000000). +-define (MAX_CHUNK_SIZE, 65535). start_link(Parent, Target, Reader, _PostProps) -> {ok, spawn_link(fun() -> writer_loop(Parent, Reader, Target) end)}. make_chunk(Data) when is_list(Data) -> make_chunk(list_to_binary(Data)); +make_chunk(Data) when size(Data) > ?MAX_CHUNK_SIZE -> + <<ChunkData:?MAX_CHUNK_SIZE/binary, Rest/binary>> = Data, + Chunk = {?MAX_CHUNK_SIZE, [ibrowse_lib:dec2hex(4, ?MAX_CHUNK_SIZE), "\r\n", + ChunkData, "\r\n"]}, + [Chunk, Rest]; make_chunk(Data) -> Size = size(Data), - {Size, [ibrowse_lib:dec2hex(8, Size), "\r\n", Data, "\r\n"]}. + [{Size, [ibrowse_lib:dec2hex(4, Size), "\r\n", Data, "\r\n"]}]. + +upload_docs({start, Buffer}) -> + [{Size, Chunk}] = make_chunk(<<"{\"new_edits\":false, \"docs\":[">>), + % Buffer ! {self(), next_doc}, + {ok, Chunk, {continue, Buffer, "", Size}}; + +upload_docs({continue, _Buf, _Pre, ByteCount}) when ByteCount > ?MAX_BYTES -> + {ok, "2\r\n]}\r\n0\r\n\r\n", finish}; +upload_docs({continue, Buffer, Prepend, ByteCount}) -> + Buffer ! {self(), next_doc}, + receive + {ok, JsonDoc} -> + [{Size, Chunk} | MoreData] = make_chunk([Prepend, JsonDoc]), + {ok, Chunk, {multi_chunk, MoreData, Buffer, ",", ByteCount+Size}}; + eof -> + {ok, "2\r\n]}\r\n0\r\n\r\n", finish}; + timeout -> + ?LOG_DEBUG("sending a heartbeat to keep the connection open", []), + {ok, "1\r\n \r\n", {continue, Buffer, Prepend, ByteCount}} + end; + +upload_docs({multi_chunk, [], Buffer, Prepend, ByteCount}) -> + % Buffer ! {self(), next_doc}, + upload_docs({continue, Buffer, Prepend, ByteCount}); +upload_docs({multi_chunk, [Data], Buffer, Prepend, ByteCount}) -> + [{Size, Chunk} | MoreData] = make_chunk(Data), + {ok, Chunk, {multi_chunk, MoreData, Buffer, Prepend, ByteCount+Size}}; -upload_docs({start, W, Docs}) -> - {Size, Chunk} = make_chunk(<<"{\"new_edits\":false, \"docs\":[">>), - {ok, Chunk, {continue, W, Docs, "", Size}}; -upload_docs({continue, W, Docs, _, ByteCount}) when ByteCount > ?MAX_BYTES -> - W ! {docs_remaining, length(Docs)}, - {ok, "2\r\n]}\r\n", last_chunk}; -upload_docs({continue, W, [Doc|Rest], Prepend, ByteCount}) -> - JsonDoc = ?JSON_ENCODE(couch_doc:to_json_obj(Doc, [revs,attachments])), - {Size, Chunk} = make_chunk([Prepend, JsonDoc]), - {ok, Chunk, {continue, W, Rest, ",", ByteCount+Size}}; -upload_docs({continue, W, [], _, _}) -> - W ! {docs_remaining, 0}, - {ok, "2\r\n]}\r\n", last_chunk}; -upload_docs(last_chunk) -> - {ok, "0\r\n\r\n", finish}; upload_docs(finish) -> couch_util:should_flush(), eof. - + +encoding_worker([]) -> + receive {MiddleMan, next_doc} -> MiddleMan ! eof end, + ok; +encoding_worker([Doc|Rest]) -> + JsonDoc = ?l2b(?JSON_ENCODE(couch_doc:to_json_obj(Doc,[revs,attachments]))), + receive {MiddleMan, next_doc} -> MiddleMan ! {ok, JsonDoc} end, + encoding_worker(Rest). + +% needed because upload_docs is inside a gen_server so it can't have a mailbox +middle_man(EncodingWorker) -> + receive {Uploader, next_doc} -> + receive + {ok, JsonDoc} -> + EncodingWorker ! {self(), next_doc}, + Uploader ! {ok, JsonDoc}, + middle_man(EncodingWorker); + eof -> + Uploader ! eof, + ok + after 5000 -> + Uploader ! timeout, + middle_man(EncodingWorker) + end + end. + +request_loop(Ref, Request, Acc) -> + receive + {'DOWN', Ref, _, _, normal} -> + lists:flatten(lists:reverse(Acc)); + {'DOWN', Ref, _, _, Reason} -> + exit(Reason) + after 0 -> + ErrorsJson = couch_rep_httpc:request(Request), + request_loop(Ref, Request, [ErrorsJson|Acc]) + end. + writer_loop(Parent, Reader, Target) -> case couch_rep_reader:next(Reader) of {complete, FinalSeq} -> @@ -53,7 +107,7 @@ writer_loop(Parent, Reader, Target) -> ok; {HighSeq, Docs} -> DocCount = length(Docs), - try write_docs(Target, Docs, []) of + try write_docs(Target, Docs) of {ok, []} -> Parent ! {update_stats, docs_written, DocCount}; {ok, Errors} -> @@ -71,13 +125,16 @@ writer_loop(Parent, Reader, Target) -> writer_loop(Parent, Reader, Target) end. -write_docs(#http_db{} = Db, Docs, ErrorsAcc) -> - ErrorsJson = couch_rep_httpc:request(Db#http_db{ +write_docs(#http_db{} = Db, Docs) -> + {Worker, Ref} = spawn_monitor(fun() -> encoding_worker(Docs) end), + Pid = spawn_link(fun() -> Worker ! {self(),next_doc}, middle_man(Worker) end), + Request = Db#http_db{ resource = "_bulk_docs", method = post, - body = {fun upload_docs/1, {start, self(), Docs}}, + body = {fun upload_docs/1, {start, Pid}}, headers = [{"transfer-encoding", "chunked"} | Db#http_db.headers] - }), + }, + ErrorsJson = request_loop(Ref, Request, []), ErrorsList = lists:map( fun({Props}) -> @@ -88,12 +145,6 @@ write_docs(#http_db{} = Db, Docs, ErrorsAcc) -> Reason = proplists:get_value(<<"reason">>, Props), {{Id, Rev}, {ErrId, Reason}} end, ErrorsJson), - receive - {docs_remaining, 0} -> - {ok, lists:flatten([ErrorsList|ErrorsAcc])}; - {docs_remaining, N} -> - MoreDocs = lists:nthtail(length(Docs)-N, Docs), - write_docs(Db, MoreDocs, [ErrorsList|ErrorsAcc]) - end; -write_docs(Db, Docs, _) -> + {ok, ErrorsList}; +write_docs(Db, Docs) -> couch_db:update_docs(Db, Docs, [], replicated_changes). |