diff options
author | Adam Kocoloski <kocolosk@apache.org> | 2009-08-11 01:51:06 +0000 |
---|---|---|
committer | Adam Kocoloski <kocolosk@apache.org> | 2009-08-11 01:51:06 +0000 |
commit | f5c4dcd4366ace98378f26b3c67164befdb0d1d5 (patch) | |
tree | c96fabcd64bc3b8296a4c1024c47c06f371063df /src/couchdb | |
parent | 5dcbc2290ac780f1a625b5c9435cfb35eac4e1ef (diff) |
stream _bulk_docs JSON body during replication
git-svn-id: https://svn.apache.org/repos/asf/couchdb/trunk@802974 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'src/couchdb')
-rw-r--r-- | src/couchdb/couch_rep_httpc.erl | 9 | ||||
-rw-r--r-- | src/couchdb/couch_rep_writer.erl | 25 |
2 files changed, 28 insertions, 6 deletions
diff --git a/src/couchdb/couch_rep_httpc.erl b/src/couchdb/couch_rep_httpc.erl index 8cc3cae6..3e3b5f05 100644 --- a/src/couchdb/couch_rep_httpc.erl +++ b/src/couchdb/couch_rep_httpc.erl @@ -39,7 +39,14 @@ do_request(Req) -> {OAuthProps} -> [oauth_header(Url, Method, OAuthProps) | Headers0] end, - Body = if B =:= nil -> []; true -> iolist_to_binary(?JSON_ENCODE(B)) end, + Body = case B of + {Fun, InitialState} when is_function(Fun) -> + {Fun, InitialState}; + nil -> + []; + _Else -> + iolist_to_binary(?JSON_ENCODE(B)) + end, Resp = case Conn of nil -> ibrowse:send_req(Url, Headers, Method, Body, Opts, infinity); diff --git a/src/couchdb/couch_rep_writer.erl b/src/couchdb/couch_rep_writer.erl index 8bea63fe..34d5f06c 100644 --- a/src/couchdb/couch_rep_writer.erl +++ b/src/couchdb/couch_rep_writer.erl @@ -19,15 +19,30 @@ start_link(Parent, Target, Reader, _PostProps) -> {ok, spawn_link(fun() -> writer_loop(Parent, Reader, Target) end)}. +make_chunk(Data) when is_list(Data) -> + make_chunk(list_to_binary(Data)); +make_chunk(Data) -> + [ibrowse_lib:dec2hex(4, size(Data)), "\r\n", Data, "\r\n"]. + +upload_docs({start, Docs}) -> + Data = make_chunk(<<"{\"new_edits\":false, \"docs\":[">>), + {ok, Data, {continue, Docs, ""}}; +upload_docs({continue, [Doc|Rest], Prepend}) -> + JsonDoc = ?JSON_ENCODE(couch_doc:to_json_obj(Doc, [revs,attachments])), + {ok, make_chunk([Prepend, JsonDoc]), {continue, Rest, ","}}; +upload_docs({continue, [], _}) -> + {ok, make_chunk(<<"]}\n">>), last_chunk}; +upload_docs(last_chunk) -> + {ok, "0\r\n\r\n", finish}; +upload_docs(finish) -> + eof. + writer_loop(Parent, Reader, Target) -> - % ?LOG_DEBUG("writer loop begin", []), case couch_rep_reader:next(Reader) of {complete, FinalSeq} -> - % ?LOG_INFO("writer terminating normally", []), Parent ! {writer_checkpoint, FinalSeq}, ok; {HighSeq, Docs} -> - % ?LOG_DEBUG("writer loop trying to write ~p", [Docs]), DocCount = length(Docs), try write_docs(Target, Docs) of {ok, []} -> @@ -47,11 +62,11 @@ writer_loop(Parent, Reader, Target) -> end. write_docs(#http_db{} = Db, Docs) -> - JsonDocs = [couch_doc:to_json_obj(Doc, [revs,attachments]) || Doc <- Docs], ErrorsJson = couch_rep_httpc:request(Db#http_db{ resource = "_bulk_docs", method = post, - body = {[{new_edits, false}, {docs, JsonDocs}]} + body = {fun upload_docs/1, {start, Docs}}, + headers = [{"transfer-encoding", "chunked"} | Db#http_db.headers] }), ErrorsList = lists:map( |