From 50e4e3bfccb863e7a4a61306b2121828509ed261 Mon Sep 17 00:00:00 2001 From: "Damien F. Katz" Date: Thu, 3 Jun 2010 03:56:44 +0000 Subject: More work to allow for streaming attachment replication. git-svn-id: https://svn.apache.org/repos/asf/couchdb/trunk@950865 13f79535-47bb-0310-9956-ffa450edef68 --- src/couchdb/couch_db.erl | 2 +- src/couchdb/couch_db.hrl | 1 + src/couchdb/couch_doc.erl | 63 +++++++++++++----------- src/couchdb/couch_httpd.erl | 10 ++-- src/couchdb/couch_httpd_db.erl | 104 +++++++++++++++++++++++++++------------ src/couchdb/couch_rep_writer.erl | 2 +- 6 files changed, 115 insertions(+), 67 deletions(-) (limited to 'src/couchdb') diff --git a/src/couchdb/couch_db.erl b/src/couchdb/couch_db.erl index 5222cb5e..63ada164 100644 --- a/src/couchdb/couch_db.erl +++ b/src/couchdb/couch_db.erl @@ -523,7 +523,7 @@ prep_and_validate_replicated_updates(Db, [Bucket|RestBuckets], [OldInfo|RestOldI fun(Doc, {AccPrepped2, AccErrors2}) -> case couch_doc:has_stubs(Doc) of true -> - couch_doc:merge_doc(Doc, #doc{}); % will throw exception + couch_doc:merge_stubs(Doc, #doc{}); % will throw exception false -> ok end, case validate_doc_update(Db, Doc, fun() -> nil end) of diff --git a/src/couchdb/couch_db.hrl b/src/couchdb/couch_db.hrl index 5bc6ebaa..f82406e8 100644 --- a/src/couchdb/couch_db.hrl +++ b/src/couchdb/couch_db.hrl @@ -286,3 +286,4 @@ filter = "", include_docs = false }). + diff --git a/src/couchdb/couch_doc.erl b/src/couchdb/couch_doc.erl index cb8a116c..d6eeb9fa 100644 --- a/src/couchdb/couch_doc.erl +++ b/src/couchdb/couch_doc.erl @@ -375,8 +375,7 @@ fold_streamed_data(RcvFun, LenLeft, Fun, Acc) when LenLeft > 0-> ResultAcc = Fun(Bin, Acc), fold_streamed_data(RcvFun, LenLeft - size(Bin), Fun, ResultAcc). -len_doc_to_multi_part_stream(Boundary, JsonBytes, Atts, - SendEncodedAtts) -> +len_doc_to_multi_part_stream(Boundary, JsonBytes, Atts, SendEncodedAtts) -> AttsSize = lists:foldl(fun(#att{data=Data} = Att, AccAttsSize) -> case Data of stub -> @@ -395,16 +394,18 @@ len_doc_to_multi_part_stream(Boundary, JsonBytes, Atts, end end, 0, Atts), if AttsSize == 0 -> - iolist_size(JsonBytes); + {<<"application/json">>, iolist_size(JsonBytes)}; true -> - 2 + % "--" - size(Boundary) + - 36 + % "\r\ncontent-type: application/json\r\n\r\n" - iolist_size(JsonBytes) + - 4 + % "\r\n--" - size(Boundary) + - + AttsSize + - 2 % "--" + {<<"multipart/related; boundary=\"", Boundary/binary, "\"">>, + 2 + % "--" + size(Boundary) + + 36 + % "\r\ncontent-type: application/json\r\n\r\n" + iolist_size(JsonBytes) + + 4 + % "\r\n--" + size(Boundary) + + + AttsSize + + 2 % "--" + } end. doc_to_multi_part_stream(Boundary, JsonBytes, Atts, WriteFun, @@ -446,26 +447,30 @@ doc_from_multi_part_stream(ContentType, DataFun) -> unlink(Self) end), Parser ! {get_doc_bytes, self()}, - receive {doc_bytes, DocBytes} -> ok end, - Doc = from_json_obj(?JSON_DECODE(DocBytes)), - % go through the attachments looking for 'follows' in the data, - % replace with function that reads the data from MIME stream. - ReadAttachmentDataFun = fun() -> - Parser ! {get_bytes, self()}, - receive {bytes, Bytes} -> Bytes end - end, - Atts2 = lists:map( - fun(#att{data=follows}=A) -> - A#att{data=ReadAttachmentDataFun}; - (A) -> - A - end, Doc#doc.atts), - Doc#doc{atts=Atts2}. + receive + {doc_bytes, DocBytes} -> + Doc = from_json_obj(?JSON_DECODE(DocBytes)), + % go through the attachments looking for 'follows' in the data, + % replace with function that reads the data from MIME stream. + ReadAttachmentDataFun = fun() -> + Parser ! {get_bytes, self()}, + receive {bytes, Bytes} -> Bytes end + end, + Atts2 = lists:map( + fun(#att{data=follows}=A) -> + A#att{data=ReadAttachmentDataFun}; + (A) -> + A + end, Doc#doc.atts), + {ok, Doc#doc{atts=Atts2}} + end. mp_parse_doc({headers, H}, []) -> - {"application/json", _} = couch_util:get_value("content-type", H), - fun (Next) -> - mp_parse_doc(Next, []) + case couch_util:get_value("content-type", H) of + {"application/json", _} -> + fun (Next) -> + mp_parse_doc(Next, []) + end end; mp_parse_doc({body, Bytes}, AccBytes) -> fun (Next) -> diff --git a/src/couchdb/couch_httpd.erl b/src/couchdb/couch_httpd.erl index d9e91e4b..7f6232e9 100644 --- a/src/couchdb/couch_httpd.erl +++ b/src/couchdb/couch_httpd.erl @@ -772,13 +772,15 @@ parse_multipart_request(ContentType, DataFun, Callback) -> nil_callback(_Data)-> fun(Next) -> nil_callback(Next) end. -get_boundary(ContentType) -> - {"multipart/" ++ _, Opts} = mochiweb_util:parse_header(ContentType), +get_boundary({"multipart/" ++ _, Opts}) -> case couch_util:get_value("boundary", Opts) of S when is_list(S) -> S - end. - + end; +get_boundary(ContentType) -> + {"multipart/" ++ _ , Opts} = mochiweb_util:parse_header(ContentType), + get_boundary({"multipart/", Opts}). + split_header(<<>>) -> diff --git a/src/couchdb/couch_httpd_db.erl b/src/couchdb/couch_httpd_db.erl index 1f16ffcc..87fc15d8 100644 --- a/src/couchdb/couch_httpd_db.erl +++ b/src/couchdb/couch_httpd_db.erl @@ -403,7 +403,7 @@ db_req(#httpd{method='POST',path_parts=[_,<<"_revs_diff">>]}=Req, Db) -> couch_doc:revs_to_strs(PossibleAncestors)}] end}} end, Results), - send_json(Req, {[{missing, {Results2}}]}); + send_json(Req, {Results2}); db_req(#httpd{path_parts=[_,<<"_revs_diff">>]}=Req, _Db) -> send_method_not_allowed(Req, "POST"); @@ -573,27 +573,36 @@ db_doc_req(#httpd{method='GET'}=Req, Db, DocId) -> send_doc(Req, Doc, Options2); _ -> {ok, Results} = couch_db:open_doc_revs(Db, DocId, Revs, Options), - {ok, Resp} = start_json_response(Req, 200), - send_chunk(Resp, "["), - % We loop through the docs. The first time through the separator - % is whitespace, then a comma on subsequent iterations. - lists:foldl( - fun(Result, AccSeparator) -> - case Result of - {ok, Doc} -> - JsonDoc = couch_doc:to_json_obj(Doc, Options), - Json = ?JSON_ENCODE({[{ok, JsonDoc}]}), - send_chunk(Resp, AccSeparator ++ Json); - {{not_found, missing}, RevId} -> - RevStr = couch_doc:rev_to_str(RevId), - Json = ?JSON_ENCODE({[{"missing", RevStr}]}), - send_chunk(Resp, AccSeparator ++ Json) + AcceptedTypes = case couch_httpd:header_value(Req, "Accept") of + undefined -> []; + AcceptHeader -> string:tokens(AcceptHeader, "; ") + end, + case lists:member("multipart/mixed", AcceptedTypes) of + false -> + {ok, Resp} = start_json_response(Req, 200), + send_chunk(Resp, "["), + % We loop through the docs. The first time through the separator + % is whitespace, then a comma on subsequent iterations. + lists:foldl( + fun(Result, AccSeparator) -> + case Result of + {ok, Doc} -> + JsonDoc = couch_doc:to_json_obj(Doc, Options), + Json = ?JSON_ENCODE({[{ok, JsonDoc}]}), + send_chunk(Resp, AccSeparator ++ Json); + {{not_found, missing}, RevId} -> + RevStr = couch_doc:rev_to_str(RevId), + Json = ?JSON_ENCODE({[{"missing", RevStr}]}), + send_chunk(Resp, AccSeparator ++ Json) + end, + "," % AccSeparator now has a comma end, - "," % AccSeparator now has a comma - end, - "", Results), - send_chunk(Resp, "]"), - end_json_response(Resp) + "", Results), + send_chunk(Resp, "]"), + end_json_response(Resp); + true -> + send_docs_multipart(Req, Results, Options) + end end; db_doc_req(#httpd{method='POST'}=Req, Db, DocId) -> @@ -647,13 +656,13 @@ db_doc_req(#httpd{method='PUT'}=Req, Db, DocId) -> Loc = absolute_uri(Req, "/" ++ ?b2l(Db#db.name) ++ "/" ++ ?b2l(DocId)), RespHeaders = [{"Location", Loc}], - case couch_httpd:header_value(Req, "Content-Type") of - ("multipart/related" ++ _Rest) = ContentType-> - Doc0 = couch_doc:doc_from_multi_part_stream(ContentType, + case couch_util:to_list(couch_httpd:header_value(Req, "Content-Type")) of + ("multipart/related;" ++ _) = ContentType -> + {ok, Doc0} = couch_doc:doc_from_multi_part_stream(ContentType, fun() -> receive_request_data(Req) end), Doc = couch_doc_from_req(Req, DocId, Doc0), update_doc(Req, Db, DocId, Doc, RespHeaders, UpdateType); - _ -> + _Else -> case couch_httpd:qs_value(Req, "batch") of "ok" -> % batch @@ -672,7 +681,8 @@ db_doc_req(#httpd{method='PUT'}=Req, Db, DocId) -> ]}); _Normal -> % normal - Doc = couch_doc_from_req(Req, DocId, couch_httpd:json_body(Req)), + Body = couch_httpd:json_body(Req), + Doc = couch_doc_from_req(Req, DocId, Body), update_doc(Req, Db, DocId, Doc, RespHeaders, UpdateType) end end; @@ -725,11 +735,11 @@ send_doc_efficiently(Req, #doc{atts=Atts}=Doc, Headers, Options) -> send_json(Req, 200, Headers, couch_doc:to_json_obj(Doc, Options)); true -> Boundary = couch_uuids:random(), - JsonBytes = ?JSON_ENCODE(couch_doc:to_json_obj(Doc, [follows|Options])), - Len = couch_doc:len_doc_to_multi_part_stream(Boundary,JsonBytes, - Atts,false), - CType = {<<"Content-Type">>, - <<"multipart/related; boundary=\"", Boundary/binary, "\"">>}, + JsonBytes = ?JSON_ENCODE(couch_doc:to_json_obj(Doc, + [attachments, follows|Options])), + {ContentType, Len} = couch_doc:len_doc_to_multi_part_stream( + Boundary,JsonBytes, Atts,false), + CType = {<<"Content-Type">>, ContentType}, {ok, Resp} = start_response_length(Req, 200, [CType|Headers], Len), couch_doc:doc_to_multi_part_stream(Boundary,JsonBytes,Atts, fun(Data) -> couch_httpd:send(Resp, Data) end, false) @@ -738,7 +748,35 @@ send_doc_efficiently(Req, #doc{atts=Atts}=Doc, Headers, Options) -> send_json(Req, 200, Headers, couch_doc:to_json_obj(Doc, Options)) end. - +send_docs_multipart(Req, Results, Options) -> + OuterBoundary = couch_uuids:random(), + InnerBoundary = couch_uuids:random(), + CType = {"Content-Type", + "multipart/mixed; boundary=\"" ++ ?b2l(OuterBoundary) ++ "\""}, + {ok, Resp} = start_chunked_response(Req, 200, [CType]), + couch_httpd:send_chunk(Resp, <<"--", OuterBoundary/binary>>), + lists:foreach( + fun({ok, #doc{atts=Atts}=Doc}) -> + JsonBytes = ?JSON_ENCODE(couch_doc:to_json_obj(Doc, + [attachments,follows|Options])), + {ContentType, _Len} = couch_doc:len_doc_to_multi_part_stream( + InnerBoundary, JsonBytes, Atts, false), + couch_httpd:send_chunk(Resp, <<"\r\nContent-Type: ", + ContentType/binary, "\r\n\r\n">>), + couch_doc:doc_to_multi_part_stream(InnerBoundary, JsonBytes, Atts, + fun(Data) -> couch_httpd:send_chunk(Resp, Data) + end, false), + couch_httpd:send_chunk(Resp, <<"\r\n--", OuterBoundary/binary>>); + ({{not_found, missing}, RevId}) -> + RevStr = couch_doc:rev_to_str(RevId), + Json = ?JSON_ENCODE({[{"missing", RevStr}]}), + couch_httpd:send_chunk(Resp, + [<<"\r\nContent-Type: application/json; error=\"true\"\r\n\r\n">>, + Json, + <<"\r\n--", OuterBoundary/binary>>]) + end, Results), + couch_httpd:send_chunk(Resp, <<"--">>), + couch_httpd:last_chunk(Resp). receive_request_data(Req) -> {couch_httpd:recv(Req, 0), fun() -> receive_request_data(Req) end}. @@ -791,6 +829,8 @@ couch_doc_from_req(Req, DocId, #doc{revs=Revs}=Doc) -> case extract_header_rev(Req, ExplicitDocRev) of missing_rev -> Revs2 = {0, []}; + ExplicitDocRev -> + Revs2 = Revs; {Pos, Rev} -> Revs2 = {Pos, [Rev]} end, diff --git a/src/couchdb/couch_rep_writer.erl b/src/couchdb/couch_rep_writer.erl index 3ab39797..3a337255 100644 --- a/src/couchdb/couch_rep_writer.erl +++ b/src/couchdb/couch_rep_writer.erl @@ -94,7 +94,7 @@ write_multi_part_doc(#http_db{headers=Headers} = Db, #doc{atts=Atts} = Doc) -> ) ), Boundary = couch_uuids:random(), - Len = couch_doc:len_doc_to_multi_part_stream( + {_ContentType, Len} = couch_doc:len_doc_to_multi_part_stream( Boundary, JsonBytes, Atts, true ), {ok, DataQueue} = couch_work_queue:new(1024*1024, 1000), -- cgit v1.2.3