From f5c4dcd4366ace98378f26b3c67164befdb0d1d5 Mon Sep 17 00:00:00 2001 From: Adam Kocoloski Date: Tue, 11 Aug 2009 01:51:06 +0000 Subject: stream _bulk_docs JSON body during replication git-svn-id: https://svn.apache.org/repos/asf/couchdb/trunk@802974 13f79535-47bb-0310-9956-ffa450edef68 --- src/couchdb/couch_rep_httpc.erl | 9 ++++++++- src/couchdb/couch_rep_writer.erl | 25 ++++++++++++++++++++----- 2 files changed, 28 insertions(+), 6 deletions(-) (limited to 'src/couchdb') diff --git a/src/couchdb/couch_rep_httpc.erl b/src/couchdb/couch_rep_httpc.erl index 8cc3cae6..3e3b5f05 100644 --- a/src/couchdb/couch_rep_httpc.erl +++ b/src/couchdb/couch_rep_httpc.erl @@ -39,7 +39,14 @@ do_request(Req) -> {OAuthProps} -> [oauth_header(Url, Method, OAuthProps) | Headers0] end, - Body = if B =:= nil -> []; true -> iolist_to_binary(?JSON_ENCODE(B)) end, + Body = case B of + {Fun, InitialState} when is_function(Fun) -> + {Fun, InitialState}; + nil -> + []; + _Else -> + iolist_to_binary(?JSON_ENCODE(B)) + end, Resp = case Conn of nil -> ibrowse:send_req(Url, Headers, Method, Body, Opts, infinity); diff --git a/src/couchdb/couch_rep_writer.erl b/src/couchdb/couch_rep_writer.erl index 8bea63fe..34d5f06c 100644 --- a/src/couchdb/couch_rep_writer.erl +++ b/src/couchdb/couch_rep_writer.erl @@ -19,15 +19,30 @@ start_link(Parent, Target, Reader, _PostProps) -> {ok, spawn_link(fun() -> writer_loop(Parent, Reader, Target) end)}. +make_chunk(Data) when is_list(Data) -> + make_chunk(list_to_binary(Data)); +make_chunk(Data) -> + [ibrowse_lib:dec2hex(4, size(Data)), "\r\n", Data, "\r\n"]. + +upload_docs({start, Docs}) -> + Data = make_chunk(<<"{\"new_edits\":false, \"docs\":[">>), + {ok, Data, {continue, Docs, ""}}; +upload_docs({continue, [Doc|Rest], Prepend}) -> + JsonDoc = ?JSON_ENCODE(couch_doc:to_json_obj(Doc, [revs,attachments])), + {ok, make_chunk([Prepend, JsonDoc]), {continue, Rest, ","}}; +upload_docs({continue, [], _}) -> + {ok, make_chunk(<<"]}\n">>), last_chunk}; +upload_docs(last_chunk) -> + {ok, "0\r\n\r\n", finish}; +upload_docs(finish) -> + eof. + writer_loop(Parent, Reader, Target) -> - % ?LOG_DEBUG("writer loop begin", []), case couch_rep_reader:next(Reader) of {complete, FinalSeq} -> - % ?LOG_INFO("writer terminating normally", []), Parent ! {writer_checkpoint, FinalSeq}, ok; {HighSeq, Docs} -> - % ?LOG_DEBUG("writer loop trying to write ~p", [Docs]), DocCount = length(Docs), try write_docs(Target, Docs) of {ok, []} -> @@ -47,11 +62,11 @@ writer_loop(Parent, Reader, Target) -> end. write_docs(#http_db{} = Db, Docs) -> - JsonDocs = [couch_doc:to_json_obj(Doc, [revs,attachments]) || Doc <- Docs], ErrorsJson = couch_rep_httpc:request(Db#http_db{ resource = "_bulk_docs", method = post, - body = {[{new_edits, false}, {docs, JsonDocs}]} + body = {fun upload_docs/1, {start, Docs}}, + headers = [{"transfer-encoding", "chunked"} | Db#http_db.headers] }), ErrorsList = lists:map( -- cgit v1.2.3