From 64481d0117baba9fce06384addff168912c83546 Mon Sep 17 00:00:00 2001 From: Adam Kocoloski Date: Fri, 5 Mar 2010 16:27:00 +0000 Subject: efficient attachment replication. Patch by Filipe Manana. Closes COUCHDB-639 git-svn-id: https://svn.apache.org/repos/asf/couchdb/trunk@919469 13f79535-47bb-0310-9956-ffa450edef68 --- src/couchdb/couch_rep_writer.erl | 82 +++++++++++++++++++++++++++++++++++++--- 1 file changed, 76 insertions(+), 6 deletions(-) (limited to 'src/couchdb/couch_rep_writer.erl') diff --git a/src/couchdb/couch_rep_writer.erl b/src/couchdb/couch_rep_writer.erl index 269b9799..cf01c576 100644 --- a/src/couchdb/couch_rep_writer.erl +++ b/src/couchdb/couch_rep_writer.erl @@ -51,8 +51,27 @@ writer_loop(Parent, Reader, Target) -> writer_loop(Parent, Reader, Target) end. -write_docs(#http_db{headers = Headers} = Db, Docs) -> - JsonDocs = [couch_doc:to_json_obj(Doc, [revs,attachments]) || Doc <- Docs], +write_docs(#http_db{} = Db, Docs) -> + {DocsAtts, DocsNoAtts} = lists:partition( + fun(#doc{atts=[]}) -> false; (_) -> true end, + Docs + ), + ErrorsJson0 = write_bulk_docs(Db, DocsNoAtts), + ErrorsJson = lists:foldl( + fun(Doc, Acc) -> write_multi_part_doc(Db, Doc) ++ Acc end, + ErrorsJson0, + DocsAtts + ), + {ok, ErrorsJson}; +write_docs(Db, Docs) -> + couch_db:update_docs(Db, Docs, [delay_commit], replicated_changes). + +write_bulk_docs(_Db, []) -> + []; +write_bulk_docs(#http_db{headers = Headers} = Db, Docs) -> + JsonDocs = [ + couch_doc:to_json_obj(Doc, [revs, att_gzip_length]) || Doc <- Docs + ], Request = Db#http_db{ resource = "_bulk_docs", method = post, @@ -65,10 +84,61 @@ write_docs(#http_db{headers = Headers} = Db, Docs) -> List when is_list(List) -> List end, - ErrorsList = [write_docs_1(V) || V <- ErrorsJson], - {ok, ErrorsList}; -write_docs(Db, Docs) -> - couch_db:update_docs(Db, Docs, [delay_commit], replicated_changes). + [write_docs_1(V) || V <- ErrorsJson]. + +write_multi_part_doc(#http_db{headers=Headers} = Db, #doc{atts=Atts} = Doc) -> + JsonBytes = ?JSON_ENCODE( + couch_doc:to_json_obj( + Doc, + [follows, att_gzip_length, {atts_after_revpos, 0}] + ) + ), + Boundary = couch_uuids:random(), + Len = couch_doc:len_doc_to_multi_part_stream( + Boundary, JsonBytes, Atts, 0, true + ), + {ok, DataQueue} = couch_work_queue:new(1024*1024, 1000), + _StreamerPid = spawn_link( + fun() -> + couch_doc:doc_to_multi_part_stream( + Boundary, + JsonBytes, + Atts, + 0, + fun(Data) -> couch_work_queue:queue(DataQueue, Data) end, + true + ), + couch_work_queue:close(DataQueue) + end + ), + BodyFun = fun(Acc) -> + case couch_work_queue:dequeue(DataQueue) of + closed -> + eof; + {ok, Data} -> + {ok, iolist_to_binary(lists:reverse(Data)), Acc} + end + end, + Request = Db#http_db{ + resource = couch_util:url_encode(Doc#doc.id), + method = put, + qs = [{new_edits, false}], + body = {BodyFun, ok}, + headers = [ + {"x-couch-full-commit", "false"}, + {"Content-Type", + "multipart/related; boundary=\"" ++ ?b2l(Boundary) ++ "\""}, + {"Content-Length", Len} | Headers + ] + }, + case couch_rep_httpc:request(Request) of + {[{<<"error">>, Error}, {<<"reason">>, Reason}]} -> + {Pos, [RevId | _]} = Doc#doc.revs, + ErrId = couch_util:to_existing_atom(Error), + [{Doc#doc.id, couch_doc:rev_to_str({Pos, RevId})}, {ErrId, Reason}]; + _ -> + [] + end. write_docs_1({Props}) -> Id = proplists:get_value(<<"id">>, Props), -- cgit v1.2.3