summaryrefslogtreecommitdiff
path: root/src/couchdb/couch_rep_writer.erl
diff options
context:
space:
mode:
authorAdam Kocoloski <kocolosk@apache.org>2009-08-11 21:14:53 +0000
committerAdam Kocoloski <kocolosk@apache.org>2009-08-11 21:14:53 +0000
commit641c1417dc41a11434d7e2bcd9cbe01663e4af2e (patch)
tree967ae82a0e951212f0c346af8deefa44e0cfcf7c /src/couchdb/couch_rep_writer.erl
parent4a2452d1f39da32d18e1226646614644f803d94a (diff)
roll back streaming _bulk_docs b/c of a race condition
git-svn-id: https://svn.apache.org/repos/asf/couchdb/trunk@803303 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'src/couchdb/couch_rep_writer.erl')
-rw-r--r--src/couchdb/couch_rep_writer.erl93
1 files changed, 4 insertions, 89 deletions
diff --git a/src/couchdb/couch_rep_writer.erl b/src/couchdb/couch_rep_writer.erl
index 26fad55b..adc7a9e5 100644
--- a/src/couchdb/couch_rep_writer.erl
+++ b/src/couchdb/couch_rep_writer.erl
@@ -16,90 +16,9 @@
-include("couch_db.hrl").
--define (MAX_BYTES, 10000000).
--define (MAX_CHUNK_SIZE, 65535).
-
start_link(Parent, Target, Reader, _PostProps) ->
{ok, spawn_link(fun() -> writer_loop(Parent, Reader, Target) end)}.
-make_chunk(Data) when is_list(Data) ->
- make_chunk(list_to_binary(Data));
-make_chunk(Data) when size(Data) > ?MAX_CHUNK_SIZE ->
- <<ChunkData:?MAX_CHUNK_SIZE/binary, Rest/binary>> = Data,
- Chunk = {?MAX_CHUNK_SIZE, [ibrowse_lib:dec2hex(4, ?MAX_CHUNK_SIZE), "\r\n",
- ChunkData, "\r\n"]},
- [Chunk, Rest];
-make_chunk(Data) ->
- Size = size(Data),
- [{Size, [ibrowse_lib:dec2hex(4, Size), "\r\n", Data, "\r\n"]}].
-
-upload_docs({start, Buffer}) ->
- [{Size, Chunk}] = make_chunk(<<"{\"new_edits\":false, \"docs\":[">>),
- % Buffer ! {self(), next_doc},
- {ok, Chunk, {continue, Buffer, "", Size}};
-
-upload_docs({continue, _Buf, _Pre, ByteCount}) when ByteCount > ?MAX_BYTES ->
- {ok, "2\r\n]}\r\n0\r\n\r\n", finish};
-upload_docs({continue, Buffer, Prepend, ByteCount}) ->
- Buffer ! {self(), next_doc},
- receive
- {ok, JsonDoc} ->
- [{Size, Chunk} | MoreData] = make_chunk([Prepend, JsonDoc]),
- {ok, Chunk, {multi_chunk, MoreData, Buffer, ",", ByteCount+Size}};
- eof ->
- {ok, "2\r\n]}\r\n0\r\n\r\n", finish};
- timeout ->
- ?LOG_DEBUG("sending a heartbeat to keep the connection open", []),
- {ok, "1\r\n \r\n", {continue, Buffer, Prepend, ByteCount}}
- end;
-
-upload_docs({multi_chunk, [], Buffer, Prepend, ByteCount}) ->
- % Buffer ! {self(), next_doc},
- upload_docs({continue, Buffer, Prepend, ByteCount});
-upload_docs({multi_chunk, [Data], Buffer, Prepend, ByteCount}) ->
- [{Size, Chunk} | MoreData] = make_chunk(Data),
- {ok, Chunk, {multi_chunk, MoreData, Buffer, Prepend, ByteCount+Size}};
-
-upload_docs(finish) ->
- couch_util:should_flush(),
- eof.
-
-encoding_worker([]) ->
- receive {MiddleMan, next_doc} -> MiddleMan ! eof end,
- ok;
-encoding_worker([Doc|Rest]) ->
- JsonDoc = ?l2b(?JSON_ENCODE(couch_doc:to_json_obj(Doc,[revs,attachments]))),
- receive {MiddleMan, next_doc} -> MiddleMan ! {ok, JsonDoc} end,
- encoding_worker(Rest).
-
-% needed because upload_docs is inside a gen_server so it can't have a mailbox
-middle_man(EncodingWorker) ->
- receive {Uploader, next_doc} ->
- receive
- {ok, JsonDoc} ->
- EncodingWorker ! {self(), next_doc},
- Uploader ! {ok, JsonDoc},
- middle_man(EncodingWorker);
- eof ->
- Uploader ! eof,
- ok
- after 5000 ->
- Uploader ! timeout,
- middle_man(EncodingWorker)
- end
- end.
-
-request_loop(Ref, Request, Acc) ->
- receive
- {'DOWN', Ref, _, _, normal} ->
- lists:flatten(lists:reverse(Acc));
- {'DOWN', Ref, _, _, Reason} ->
- exit(Reason)
- after 0 ->
- ErrorsJson = couch_rep_httpc:request(Request),
- request_loop(Ref, Request, [ErrorsJson|Acc])
- end.
-
writer_loop(Parent, Reader, Target) ->
case couch_rep_reader:next(Reader) of
{complete, FinalSeq} ->
@@ -121,20 +40,16 @@ writer_loop(Parent, Reader, Target) ->
end,
Parent ! {writer_checkpoint, HighSeq},
couch_rep_att:cleanup(),
- couch_util:should_flush(),
writer_loop(Parent, Reader, Target)
end.
write_docs(#http_db{} = Db, Docs) ->
- {Worker, Ref} = spawn_monitor(fun() -> encoding_worker(Docs) end),
- Pid = spawn_link(fun() -> Worker ! {self(),next_doc}, middle_man(Worker) end),
- Request = Db#http_db{
+ JsonDocs = [couch_doc:to_json_obj(Doc, [revs,attachments]) || Doc <- Docs],
+ ErrorsJson = couch_rep_httpc:request(Db#http_db{
resource = "_bulk_docs",
method = post,
- body = {fun upload_docs/1, {start, Pid}},
- headers = [{"transfer-encoding", "chunked"} | Db#http_db.headers]
- },
- ErrorsJson = request_loop(Ref, Request, []),
+ body = {[{new_edits, false}, {docs, JsonDocs}]}
+ }),
ErrorsList =
lists:map(
fun({Props}) ->