summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAdam Kocoloski <kocolosk@apache.org>2009-08-11 01:51:06 +0000
committerAdam Kocoloski <kocolosk@apache.org>2009-08-11 01:51:06 +0000
commitf5c4dcd4366ace98378f26b3c67164befdb0d1d5 (patch)
treec96fabcd64bc3b8296a4c1024c47c06f371063df
parent5dcbc2290ac780f1a625b5c9435cfb35eac4e1ef (diff)
stream _bulk_docs JSON body during replication
git-svn-id: https://svn.apache.org/repos/asf/couchdb/trunk@802974 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--src/couchdb/couch_rep_httpc.erl9
-rw-r--r--src/couchdb/couch_rep_writer.erl25
2 files changed, 28 insertions, 6 deletions
diff --git a/src/couchdb/couch_rep_httpc.erl b/src/couchdb/couch_rep_httpc.erl
index 8cc3cae6..3e3b5f05 100644
--- a/src/couchdb/couch_rep_httpc.erl
+++ b/src/couchdb/couch_rep_httpc.erl
@@ -39,7 +39,14 @@ do_request(Req) ->
{OAuthProps} ->
[oauth_header(Url, Method, OAuthProps) | Headers0]
end,
- Body = if B =:= nil -> []; true -> iolist_to_binary(?JSON_ENCODE(B)) end,
+ Body = case B of
+ {Fun, InitialState} when is_function(Fun) ->
+ {Fun, InitialState};
+ nil ->
+ [];
+ _Else ->
+ iolist_to_binary(?JSON_ENCODE(B))
+ end,
Resp = case Conn of
nil ->
ibrowse:send_req(Url, Headers, Method, Body, Opts, infinity);
diff --git a/src/couchdb/couch_rep_writer.erl b/src/couchdb/couch_rep_writer.erl
index 8bea63fe..34d5f06c 100644
--- a/src/couchdb/couch_rep_writer.erl
+++ b/src/couchdb/couch_rep_writer.erl
@@ -19,15 +19,30 @@
start_link(Parent, Target, Reader, _PostProps) ->
{ok, spawn_link(fun() -> writer_loop(Parent, Reader, Target) end)}.
+make_chunk(Data) when is_list(Data) ->
+ make_chunk(list_to_binary(Data));
+make_chunk(Data) ->
+ [ibrowse_lib:dec2hex(4, size(Data)), "\r\n", Data, "\r\n"].
+
+upload_docs({start, Docs}) ->
+ Data = make_chunk(<<"{\"new_edits\":false, \"docs\":[">>),
+ {ok, Data, {continue, Docs, ""}};
+upload_docs({continue, [Doc|Rest], Prepend}) ->
+ JsonDoc = ?JSON_ENCODE(couch_doc:to_json_obj(Doc, [revs,attachments])),
+ {ok, make_chunk([Prepend, JsonDoc]), {continue, Rest, ","}};
+upload_docs({continue, [], _}) ->
+ {ok, make_chunk(<<"]}\n">>), last_chunk};
+upload_docs(last_chunk) ->
+ {ok, "0\r\n\r\n", finish};
+upload_docs(finish) ->
+ eof.
+
writer_loop(Parent, Reader, Target) ->
- % ?LOG_DEBUG("writer loop begin", []),
case couch_rep_reader:next(Reader) of
{complete, FinalSeq} ->
- % ?LOG_INFO("writer terminating normally", []),
Parent ! {writer_checkpoint, FinalSeq},
ok;
{HighSeq, Docs} ->
- % ?LOG_DEBUG("writer loop trying to write ~p", [Docs]),
DocCount = length(Docs),
try write_docs(Target, Docs) of
{ok, []} ->
@@ -47,11 +62,11 @@ writer_loop(Parent, Reader, Target) ->
end.
write_docs(#http_db{} = Db, Docs) ->
- JsonDocs = [couch_doc:to_json_obj(Doc, [revs,attachments]) || Doc <- Docs],
ErrorsJson = couch_rep_httpc:request(Db#http_db{
resource = "_bulk_docs",
method = post,
- body = {[{new_edits, false}, {docs, JsonDocs}]}
+ body = {fun upload_docs/1, {start, Docs}},
+ headers = [{"transfer-encoding", "chunked"} | Db#http_db.headers]
}),
ErrorsList =
lists:map(