summaryrefslogtreecommitdiff
path: root/src/couchdb/couch_rep_writer.erl
diff options
context:
space:
mode:
Diffstat (limited to 'src/couchdb/couch_rep_writer.erl')
-rw-r--r--src/couchdb/couch_rep_writer.erl82
1 files changed, 76 insertions, 6 deletions
diff --git a/src/couchdb/couch_rep_writer.erl b/src/couchdb/couch_rep_writer.erl
index 269b9799..cf01c576 100644
--- a/src/couchdb/couch_rep_writer.erl
+++ b/src/couchdb/couch_rep_writer.erl
@@ -51,8 +51,27 @@ writer_loop(Parent, Reader, Target) ->
writer_loop(Parent, Reader, Target)
end.
-write_docs(#http_db{headers = Headers} = Db, Docs) ->
- JsonDocs = [couch_doc:to_json_obj(Doc, [revs,attachments]) || Doc <- Docs],
+write_docs(#http_db{} = Db, Docs) ->
+ {DocsAtts, DocsNoAtts} = lists:partition(
+ fun(#doc{atts=[]}) -> false; (_) -> true end,
+ Docs
+ ),
+ ErrorsJson0 = write_bulk_docs(Db, DocsNoAtts),
+ ErrorsJson = lists:foldl(
+ fun(Doc, Acc) -> write_multi_part_doc(Db, Doc) ++ Acc end,
+ ErrorsJson0,
+ DocsAtts
+ ),
+ {ok, ErrorsJson};
+write_docs(Db, Docs) ->
+ couch_db:update_docs(Db, Docs, [delay_commit], replicated_changes).
+
+write_bulk_docs(_Db, []) ->
+ [];
+write_bulk_docs(#http_db{headers = Headers} = Db, Docs) ->
+ JsonDocs = [
+ couch_doc:to_json_obj(Doc, [revs, att_gzip_length]) || Doc <- Docs
+ ],
Request = Db#http_db{
resource = "_bulk_docs",
method = post,
@@ -65,10 +84,61 @@ write_docs(#http_db{headers = Headers} = Db, Docs) ->
List when is_list(List) ->
List
end,
- ErrorsList = [write_docs_1(V) || V <- ErrorsJson],
- {ok, ErrorsList};
-write_docs(Db, Docs) ->
- couch_db:update_docs(Db, Docs, [delay_commit], replicated_changes).
+ [write_docs_1(V) || V <- ErrorsJson].
+
+write_multi_part_doc(#http_db{headers=Headers} = Db, #doc{atts=Atts} = Doc) ->
+ JsonBytes = ?JSON_ENCODE(
+ couch_doc:to_json_obj(
+ Doc,
+ [follows, att_gzip_length, {atts_after_revpos, 0}]
+ )
+ ),
+ Boundary = couch_uuids:random(),
+ Len = couch_doc:len_doc_to_multi_part_stream(
+ Boundary, JsonBytes, Atts, 0, true
+ ),
+ {ok, DataQueue} = couch_work_queue:new(1024*1024, 1000),
+ _StreamerPid = spawn_link(
+ fun() ->
+ couch_doc:doc_to_multi_part_stream(
+ Boundary,
+ JsonBytes,
+ Atts,
+ 0,
+ fun(Data) -> couch_work_queue:queue(DataQueue, Data) end,
+ true
+ ),
+ couch_work_queue:close(DataQueue)
+ end
+ ),
+ BodyFun = fun(Acc) ->
+ case couch_work_queue:dequeue(DataQueue) of
+ closed ->
+ eof;
+ {ok, Data} ->
+ {ok, iolist_to_binary(lists:reverse(Data)), Acc}
+ end
+ end,
+ Request = Db#http_db{
+ resource = couch_util:url_encode(Doc#doc.id),
+ method = put,
+ qs = [{new_edits, false}],
+ body = {BodyFun, ok},
+ headers = [
+ {"x-couch-full-commit", "false"},
+ {"Content-Type",
+ "multipart/related; boundary=\"" ++ ?b2l(Boundary) ++ "\""},
+ {"Content-Length", Len} | Headers
+ ]
+ },
+ case couch_rep_httpc:request(Request) of
+ {[{<<"error">>, Error}, {<<"reason">>, Reason}]} ->
+ {Pos, [RevId | _]} = Doc#doc.revs,
+ ErrId = couch_util:to_existing_atom(Error),
+ [{Doc#doc.id, couch_doc:rev_to_str({Pos, RevId})}, {ErrId, Reason}];
+ _ ->
+ []
+ end.
write_docs_1({Props}) ->
Id = proplists:get_value(<<"id">>, Props),