diff options
Diffstat (limited to 'src/couchdb/couch_doc.erl')
-rw-r--r-- | src/couchdb/couch_doc.erl | 224 |
1 files changed, 166 insertions, 58 deletions
diff --git a/src/couchdb/couch_doc.erl b/src/couchdb/couch_doc.erl index 72b56d53..4cb20d6c 100644 --- a/src/couchdb/couch_doc.erl +++ b/src/couchdb/couch_doc.erl @@ -12,10 +12,12 @@ -module(couch_doc). --export([to_doc_info/1,to_doc_info_path/1,parse_rev/1,parse_revs/1,rev_to_str/1,rev_to_strs/1]). +-export([to_doc_info/1,to_doc_info_path/1,parse_rev/1,parse_revs/1,rev_to_str/1,revs_to_strs/1]). -export([att_foldl/3,get_validate_doc_fun/1]). -export([from_json_obj/1,to_json_obj/2,has_stubs/1, merge_stubs/2]). -export([validate_docid/1]). +-export([doc_from_multi_part_stream/2]). +-export([doc_to_multi_part_stream/5, len_doc_to_multi_part_stream/4]). -include("couch_db.hrl"). @@ -47,10 +49,10 @@ rev_to_str({Pos, RevId}) -> ?l2b([integer_to_list(Pos),"-",revid_to_str(RevId)]). -rev_to_strs([]) -> +revs_to_strs([]) -> []; -rev_to_strs([{Pos, RevId}| Rest]) -> - [rev_to_str({Pos, RevId}) | rev_to_strs(Rest)]. +revs_to_strs([{Pos, RevId}| Rest]) -> + [rev_to_str({Pos, RevId}) | revs_to_strs(Rest)]. to_json_meta(Meta) -> lists:map( @@ -65,56 +67,44 @@ to_json_meta(Meta) -> ({local_seq, Seq}) -> {<<"_local_seq">>, Seq}; ({conflicts, Conflicts}) -> - {<<"_conflicts">>, rev_to_strs(Conflicts)}; + {<<"_conflicts">>, revs_to_strs(Conflicts)}; ({deleted_conflicts, DConflicts}) -> - {<<"_deleted_conflicts">>, rev_to_strs(DConflicts)} + {<<"_deleted_conflicts">>, revs_to_strs(DConflicts)} end, Meta). -to_json_attachment_stubs(Attachments) -> - BinProps = lists:map( - fun(#att{name=Name,type=Type,len=Length,revpos=Pos}) -> - {Name, {[ - {<<"stub">>, true}, - {<<"content_type">>, Type}, - {<<"length">>, Length}, - {<<"revpos">>, Pos} - ]}} - end, - Attachments), - case BinProps of - [] -> []; - _ -> [{<<"_attachments">>, {BinProps}}] +to_json_attachments(Attachments, Options) -> + case lists:member(attachments, Options) of + true -> % return all the binaries + to_json_attachments(Attachments, 0, lists:member(follows, Options)); + false -> + % note the default is [], because this sorts higher than all numbers. + % and will return all the binaries. + RevPos = proplists:get_value(atts_after_revpos, Options, []), + to_json_attachments(Attachments, RevPos, lists:member(follows, Options)) end. -to_json_attachments(Atts) -> +to_json_attachments([], _RevPosIncludeAfter, _DataToFollow) -> + []; +to_json_attachments(Atts, RevPosIncludeAfter, DataToFollow) -> AttProps = lists:map( - fun(#att{data=Fun,len=Len}=Att) when is_function(Fun) -> - Data = read_streamed_attachment(Fun, Len, _Acc = []), - {Att#att.name, {[ - {<<"content_type">>, Att#att.type}, - {<<"revpos">>, Att#att.revpos}, - {<<"data">>, couch_util:encodeBase64(Data)} - ]}}; - (Att) -> + fun(#att{len=Len}=Att) -> {Att#att.name, {[ {<<"content_type">>, Att#att.type}, - {<<"revpos">>, Att#att.revpos}, - {<<"data">>, couch_util:encodeBase64(att_to_iolist(Att))} - ]}} - end, - Atts), - case AttProps of - [] -> []; - _ -> [{<<"_attachments">>, {AttProps}}] - end. - -to_json_attachments(Attachments, Options) -> - case lists:member(attachments, Options) of - true -> % return the full rev list and the binaries as strings. - to_json_attachments(Attachments); - false -> - to_json_attachment_stubs(Attachments) - end. + {<<"revpos">>, Att#att.revpos} + ] ++ + if Att#att.revpos > RevPosIncludeAfter -> + if DataToFollow -> + [{<<"length">>, Len}, {<<"follows">>, true}]; + true -> + [{<<"data">>, + couch_util:encodeBase64(att_to_iolist(Att))}] + end; + true -> + [{<<"length">>, Len}, {<<"stub">>, true}] + end + }} + end, Atts), + [{<<"_attachments">>, {AttProps}}]. to_json_obj(#doc{id=Id,deleted=Del,body=Body,revs={Start, RevIds}, meta=Meta}=Doc,Options)-> @@ -199,12 +189,20 @@ transfer_fields([{<<"_attachments">>, {JsonBins}} | Rest], Doc) -> RevPos = proplists:get_value(<<"revpos">>, BinProps, 0), #att{name=Name, data=stub, type=Type, len=Length, revpos=RevPos}; _ -> - Value = proplists:get_value(<<"data">>, BinProps), Type = proplists:get_value(<<"content_type">>, BinProps, ?DEFAULT_ATTACHMENT_CONTENT_TYPE), RevPos = proplists:get_value(<<"revpos">>, BinProps, 0), - Bin = couch_util:decodeBase64(Value), - #att{name=Name, data=Bin, type=Type, len=size(Bin), revpos=RevPos} + case proplists:get_value(<<"follows">>, BinProps) of + true -> + #att{name=Name, data=follows, type=Type, + len=proplists:get_value(<<"length">>, BinProps), + revpos=RevPos}; + _ -> + Value = proplists:get_value(<<"data">>, BinProps), + Bin = couch_util:decodeBase64(Value), + #att{name=Name, data=Bin, type=Type, len=size(Bin), + revpos=RevPos} + end end end, JsonBins), transfer_fields(Rest, Doc#doc{atts=Atts}); @@ -278,14 +276,21 @@ att_foldl(#att{data={Fd,Sp},len=Len}, Fun, Acc) when is_tuple(Sp) orelse Sp == n % 09 UPGRADE CODE couch_stream:old_foldl(Fd, Sp, Len, Fun, Acc); att_foldl(#att{data={Fd,Sp},md5=Md5}, Fun, Acc) -> - couch_stream:foldl(Fd, Sp, Md5, Fun, Acc). + couch_stream:foldl(Fd, Sp, Md5, Fun, Acc); +att_foldl(#att{data=DataFun,len=Len}, Fun, Acc) when is_function(DataFun) -> + fold_streamed_data(DataFun, Len, Fun, Acc). + att_to_iolist(#att{data=Bin}) when is_binary(Bin) -> Bin; att_to_iolist(#att{data=Iolist}) when is_list(Iolist) -> Iolist; att_to_iolist(#att{data={Fd,Sp},md5=Md5}) -> - lists:reverse(couch_stream:foldl(Fd, Sp, Md5, fun(Bin,Acc) -> [Bin|Acc] end, [])). + lists:reverse(couch_stream:foldl(Fd, Sp, Md5, + fun(Bin,Acc) -> [Bin|Acc] end, [])); +att_to_iolist(#att{data=DataFun, len=Len}) when is_function(DataFun)-> + lists:reverse(fold_streamed_data(DataFun, Len, + fun(Data, Acc) -> [Data | Acc] end, [])). get_validate_doc_fun(#doc{body={Props}}) -> Lang = proplists:get_value(<<"language">>, Props, <<"javascript">>), @@ -309,18 +314,121 @@ has_stubs([#att{data=stub}|_]) -> has_stubs([_Att|Rest]) -> has_stubs(Rest). -merge_stubs(#doc{atts=MemBins}=StubsDoc, #doc{atts=DiskBins}) -> +merge_stubs(#doc{id=Id,atts=MemBins}=StubsDoc, #doc{atts=DiskBins}) -> BinDict = dict:from_list([{Name, Att} || #att{name=Name}=Att <- DiskBins]), MergedBins = lists:map( - fun(#att{name=Name, data=stub}) -> - dict:fetch(Name, BinDict); + fun(#att{name=Name, data=stub, revpos=RevPos}) -> + case dict:find(Name, BinDict) of + {ok, #att{revpos=RevPos}=DiskAtt} -> + DiskAtt; + _ -> + throw({missing_stub_on_target, + <<"id:", Id/binary, ", name:", Name/binary>>}) + end; (Att) -> Att end, MemBins), StubsDoc#doc{atts= MergedBins}. -read_streamed_attachment(_RcvFun, 0, Acc) -> - list_to_binary(lists:reverse(Acc)); -read_streamed_attachment(RcvFun, LenLeft, Acc) -> +fold_streamed_data(_RcvFun, 0, _Fun, Acc) -> + Acc; +fold_streamed_data(RcvFun, LenLeft, Fun, Acc) when LenLeft > 0-> Bin = RcvFun(), - read_streamed_attachment(RcvFun, LenLeft - size(Bin), [Bin|Acc]). + ResultAcc = Fun(Bin, Acc), + fold_streamed_data(RcvFun, LenLeft - size(Bin), Fun, ResultAcc). + +len_doc_to_multi_part_stream(Boundary,JsonBytes,Atts,AttsSinceRevPos) -> + 2 + % "--" + size(Boundary) + + 34 + % "\r\ncontent-type: application/json\r\n" + iolist_size(JsonBytes) + + 4 + % "\r\n--" + size(Boundary) + + + lists:foldl(fun(#att{revpos=RevPos,len=Len}, AccAttsSize) -> + if RevPos > AttsSinceRevPos -> + AccAttsSize + + 2 + % "\r\n" + Len + + 4 + % "\r\n--" + size(Boundary); + true -> + AccAttsSize + end + end, 0, Atts) + + 2. % "--" + +doc_to_multi_part_stream(Boundary,JsonBytes,Atts,AttsSinceRevPos,WriteFun) -> + WriteFun([<<"--", Boundary, "\r\ncontent-type: application/json\r\n">>, + JsonBytes, <<"\r\n--", Boundary>>]), + atts_to_mp(Atts, Boundary, WriteFun, AttsSinceRevPos). + +atts_to_mp([], _Boundary, WriteFun, _AttsSinceRevPos) -> + WriteFun(<<"--">>); +atts_to_mp([#att{revpos=RevPos} = Att | RestAtts], Boundary, WriteFun, + AttsSinceRevPos) when RevPos > AttsSinceRevPos -> + WriteFun(<<"\r\n">>), + att_foldl(Att, fun(Data, ok) -> WriteFun(Data) end, ok), + WriteFun(<<"\r\n--", Boundary>>), + atts_to_mp(RestAtts, Boundary, WriteFun, AttsSinceRevPos); +atts_to_mp([_ | RestAtts], Boundary, WriteFun, AttsSinceRevPos) -> + atts_to_mp(RestAtts, Boundary, WriteFun, AttsSinceRevPos). + + +doc_from_multi_part_stream(ContentType, DataFun) -> + Parser = spawn_link(fun() -> + couch_httpd:parse_multipart_request(ContentType, DataFun, + fun(Next)-> mp_parse_doc(Next, []) end) + end), + Parser ! {get_doc_bytes, self()}, + receive {doc_bytes, DocBytes} -> ok end, + Doc = from_json_obj(?JSON_DECODE(DocBytes)), + % go through the attachments looking for 'follows' in the data, + % replace with function that reads the data from MIME stream. + ReadAttachmentDataFun = fun() -> + Parser ! {get_bytes, self()}, + receive {bytes, Bytes} -> Bytes end + end, + Atts2 = lists:map( + fun(#att{data=follows}=A) -> + A#att{data=ReadAttachmentDataFun}; + (A) -> + A + end, Doc#doc.atts), + Doc#doc{atts=Atts2}. + +mp_parse_doc({headers, H}, []) -> + {"application/json", _} = proplists:get_value("content-type", H), + fun (Next) -> + mp_parse_doc(Next, []) + end; +mp_parse_doc({body, Bytes}, AccBytes) -> + fun (Next) -> + mp_parse_doc(Next, [Bytes | AccBytes]) + end; +mp_parse_doc(body_end, AccBytes) -> + receive {get_doc_bytes, From} -> + From ! {doc_bytes, lists:reverse(AccBytes)} + end, + fun (Next) -> + mp_parse_atts(Next) + end. + +mp_parse_atts(eof) -> + ok; +mp_parse_atts({headers, _H}) -> + fun (Next) -> + mp_parse_atts(Next) + end; +mp_parse_atts({body, Bytes}) -> + receive {get_bytes, From} -> + From ! {bytes, Bytes} + end, + fun (Next) -> + mp_parse_atts(Next) + end; +mp_parse_atts(body_end) -> + fun (Next) -> + mp_parse_atts(Next) + end. + + |