summaryrefslogtreecommitdiff
path: root/src/couchdb/couch_doc.erl
diff options
context:
space:
mode:
Diffstat (limited to 'src/couchdb/couch_doc.erl')
-rw-r--r--src/couchdb/couch_doc.erl224
1 files changed, 166 insertions, 58 deletions
diff --git a/src/couchdb/couch_doc.erl b/src/couchdb/couch_doc.erl
index 72b56d53..4cb20d6c 100644
--- a/src/couchdb/couch_doc.erl
+++ b/src/couchdb/couch_doc.erl
@@ -12,10 +12,12 @@
-module(couch_doc).
--export([to_doc_info/1,to_doc_info_path/1,parse_rev/1,parse_revs/1,rev_to_str/1,rev_to_strs/1]).
+-export([to_doc_info/1,to_doc_info_path/1,parse_rev/1,parse_revs/1,rev_to_str/1,revs_to_strs/1]).
-export([att_foldl/3,get_validate_doc_fun/1]).
-export([from_json_obj/1,to_json_obj/2,has_stubs/1, merge_stubs/2]).
-export([validate_docid/1]).
+-export([doc_from_multi_part_stream/2]).
+-export([doc_to_multi_part_stream/5, len_doc_to_multi_part_stream/4]).
-include("couch_db.hrl").
@@ -47,10 +49,10 @@ rev_to_str({Pos, RevId}) ->
?l2b([integer_to_list(Pos),"-",revid_to_str(RevId)]).
-rev_to_strs([]) ->
+revs_to_strs([]) ->
[];
-rev_to_strs([{Pos, RevId}| Rest]) ->
- [rev_to_str({Pos, RevId}) | rev_to_strs(Rest)].
+revs_to_strs([{Pos, RevId}| Rest]) ->
+ [rev_to_str({Pos, RevId}) | revs_to_strs(Rest)].
to_json_meta(Meta) ->
lists:map(
@@ -65,56 +67,44 @@ to_json_meta(Meta) ->
({local_seq, Seq}) ->
{<<"_local_seq">>, Seq};
({conflicts, Conflicts}) ->
- {<<"_conflicts">>, rev_to_strs(Conflicts)};
+ {<<"_conflicts">>, revs_to_strs(Conflicts)};
({deleted_conflicts, DConflicts}) ->
- {<<"_deleted_conflicts">>, rev_to_strs(DConflicts)}
+ {<<"_deleted_conflicts">>, revs_to_strs(DConflicts)}
end, Meta).
-to_json_attachment_stubs(Attachments) ->
- BinProps = lists:map(
- fun(#att{name=Name,type=Type,len=Length,revpos=Pos}) ->
- {Name, {[
- {<<"stub">>, true},
- {<<"content_type">>, Type},
- {<<"length">>, Length},
- {<<"revpos">>, Pos}
- ]}}
- end,
- Attachments),
- case BinProps of
- [] -> [];
- _ -> [{<<"_attachments">>, {BinProps}}]
+to_json_attachments(Attachments, Options) ->
+ case lists:member(attachments, Options) of
+ true -> % return all the binaries
+ to_json_attachments(Attachments, 0, lists:member(follows, Options));
+ false ->
+ % note the default is [], because this sorts higher than all numbers.
+ % and will return all the binaries.
+ RevPos = proplists:get_value(atts_after_revpos, Options, []),
+ to_json_attachments(Attachments, RevPos, lists:member(follows, Options))
end.
-to_json_attachments(Atts) ->
+to_json_attachments([], _RevPosIncludeAfter, _DataToFollow) ->
+ [];
+to_json_attachments(Atts, RevPosIncludeAfter, DataToFollow) ->
AttProps = lists:map(
- fun(#att{data=Fun,len=Len}=Att) when is_function(Fun) ->
- Data = read_streamed_attachment(Fun, Len, _Acc = []),
- {Att#att.name, {[
- {<<"content_type">>, Att#att.type},
- {<<"revpos">>, Att#att.revpos},
- {<<"data">>, couch_util:encodeBase64(Data)}
- ]}};
- (Att) ->
+ fun(#att{len=Len}=Att) ->
{Att#att.name, {[
{<<"content_type">>, Att#att.type},
- {<<"revpos">>, Att#att.revpos},
- {<<"data">>, couch_util:encodeBase64(att_to_iolist(Att))}
- ]}}
- end,
- Atts),
- case AttProps of
- [] -> [];
- _ -> [{<<"_attachments">>, {AttProps}}]
- end.
-
-to_json_attachments(Attachments, Options) ->
- case lists:member(attachments, Options) of
- true -> % return the full rev list and the binaries as strings.
- to_json_attachments(Attachments);
- false ->
- to_json_attachment_stubs(Attachments)
- end.
+ {<<"revpos">>, Att#att.revpos}
+ ] ++
+ if Att#att.revpos > RevPosIncludeAfter ->
+ if DataToFollow ->
+ [{<<"length">>, Len}, {<<"follows">>, true}];
+ true ->
+ [{<<"data">>,
+ couch_util:encodeBase64(att_to_iolist(Att))}]
+ end;
+ true ->
+ [{<<"length">>, Len}, {<<"stub">>, true}]
+ end
+ }}
+ end, Atts),
+ [{<<"_attachments">>, {AttProps}}].
to_json_obj(#doc{id=Id,deleted=Del,body=Body,revs={Start, RevIds},
meta=Meta}=Doc,Options)->
@@ -199,12 +189,20 @@ transfer_fields([{<<"_attachments">>, {JsonBins}} | Rest], Doc) ->
RevPos = proplists:get_value(<<"revpos">>, BinProps, 0),
#att{name=Name, data=stub, type=Type, len=Length, revpos=RevPos};
_ ->
- Value = proplists:get_value(<<"data">>, BinProps),
Type = proplists:get_value(<<"content_type">>, BinProps,
?DEFAULT_ATTACHMENT_CONTENT_TYPE),
RevPos = proplists:get_value(<<"revpos">>, BinProps, 0),
- Bin = couch_util:decodeBase64(Value),
- #att{name=Name, data=Bin, type=Type, len=size(Bin), revpos=RevPos}
+ case proplists:get_value(<<"follows">>, BinProps) of
+ true ->
+ #att{name=Name, data=follows, type=Type,
+ len=proplists:get_value(<<"length">>, BinProps),
+ revpos=RevPos};
+ _ ->
+ Value = proplists:get_value(<<"data">>, BinProps),
+ Bin = couch_util:decodeBase64(Value),
+ #att{name=Name, data=Bin, type=Type, len=size(Bin),
+ revpos=RevPos}
+ end
end
end, JsonBins),
transfer_fields(Rest, Doc#doc{atts=Atts});
@@ -278,14 +276,21 @@ att_foldl(#att{data={Fd,Sp},len=Len}, Fun, Acc) when is_tuple(Sp) orelse Sp == n
% 09 UPGRADE CODE
couch_stream:old_foldl(Fd, Sp, Len, Fun, Acc);
att_foldl(#att{data={Fd,Sp},md5=Md5}, Fun, Acc) ->
- couch_stream:foldl(Fd, Sp, Md5, Fun, Acc).
+ couch_stream:foldl(Fd, Sp, Md5, Fun, Acc);
+att_foldl(#att{data=DataFun,len=Len}, Fun, Acc) when is_function(DataFun) ->
+ fold_streamed_data(DataFun, Len, Fun, Acc).
+
att_to_iolist(#att{data=Bin}) when is_binary(Bin) ->
Bin;
att_to_iolist(#att{data=Iolist}) when is_list(Iolist) ->
Iolist;
att_to_iolist(#att{data={Fd,Sp},md5=Md5}) ->
- lists:reverse(couch_stream:foldl(Fd, Sp, Md5, fun(Bin,Acc) -> [Bin|Acc] end, [])).
+ lists:reverse(couch_stream:foldl(Fd, Sp, Md5,
+ fun(Bin,Acc) -> [Bin|Acc] end, []));
+att_to_iolist(#att{data=DataFun, len=Len}) when is_function(DataFun)->
+ lists:reverse(fold_streamed_data(DataFun, Len,
+ fun(Data, Acc) -> [Data | Acc] end, [])).
get_validate_doc_fun(#doc{body={Props}}) ->
Lang = proplists:get_value(<<"language">>, Props, <<"javascript">>),
@@ -309,18 +314,121 @@ has_stubs([#att{data=stub}|_]) ->
has_stubs([_Att|Rest]) ->
has_stubs(Rest).
-merge_stubs(#doc{atts=MemBins}=StubsDoc, #doc{atts=DiskBins}) ->
+merge_stubs(#doc{id=Id,atts=MemBins}=StubsDoc, #doc{atts=DiskBins}) ->
BinDict = dict:from_list([{Name, Att} || #att{name=Name}=Att <- DiskBins]),
MergedBins = lists:map(
- fun(#att{name=Name, data=stub}) ->
- dict:fetch(Name, BinDict);
+ fun(#att{name=Name, data=stub, revpos=RevPos}) ->
+ case dict:find(Name, BinDict) of
+ {ok, #att{revpos=RevPos}=DiskAtt} ->
+ DiskAtt;
+ _ ->
+ throw({missing_stub_on_target,
+ <<"id:", Id/binary, ", name:", Name/binary>>})
+ end;
(Att) ->
Att
end, MemBins),
StubsDoc#doc{atts= MergedBins}.
-read_streamed_attachment(_RcvFun, 0, Acc) ->
- list_to_binary(lists:reverse(Acc));
-read_streamed_attachment(RcvFun, LenLeft, Acc) ->
+fold_streamed_data(_RcvFun, 0, _Fun, Acc) ->
+ Acc;
+fold_streamed_data(RcvFun, LenLeft, Fun, Acc) when LenLeft > 0->
Bin = RcvFun(),
- read_streamed_attachment(RcvFun, LenLeft - size(Bin), [Bin|Acc]).
+ ResultAcc = Fun(Bin, Acc),
+ fold_streamed_data(RcvFun, LenLeft - size(Bin), Fun, ResultAcc).
+
+len_doc_to_multi_part_stream(Boundary,JsonBytes,Atts,AttsSinceRevPos) ->
+ 2 + % "--"
+ size(Boundary) +
+ 34 + % "\r\ncontent-type: application/json\r\n"
+ iolist_size(JsonBytes) +
+ 4 + % "\r\n--"
+ size(Boundary) +
+ + lists:foldl(fun(#att{revpos=RevPos,len=Len}, AccAttsSize) ->
+ if RevPos > AttsSinceRevPos ->
+ AccAttsSize +
+ 2 + % "\r\n"
+ Len +
+ 4 + % "\r\n--"
+ size(Boundary);
+ true ->
+ AccAttsSize
+ end
+ end, 0, Atts) +
+ 2. % "--"
+
+doc_to_multi_part_stream(Boundary,JsonBytes,Atts,AttsSinceRevPos,WriteFun) ->
+ WriteFun([<<"--", Boundary, "\r\ncontent-type: application/json\r\n">>,
+ JsonBytes, <<"\r\n--", Boundary>>]),
+ atts_to_mp(Atts, Boundary, WriteFun, AttsSinceRevPos).
+
+atts_to_mp([], _Boundary, WriteFun, _AttsSinceRevPos) ->
+ WriteFun(<<"--">>);
+atts_to_mp([#att{revpos=RevPos} = Att | RestAtts], Boundary, WriteFun,
+ AttsSinceRevPos) when RevPos > AttsSinceRevPos ->
+ WriteFun(<<"\r\n">>),
+ att_foldl(Att, fun(Data, ok) -> WriteFun(Data) end, ok),
+ WriteFun(<<"\r\n--", Boundary>>),
+ atts_to_mp(RestAtts, Boundary, WriteFun, AttsSinceRevPos);
+atts_to_mp([_ | RestAtts], Boundary, WriteFun, AttsSinceRevPos) ->
+ atts_to_mp(RestAtts, Boundary, WriteFun, AttsSinceRevPos).
+
+
+doc_from_multi_part_stream(ContentType, DataFun) ->
+ Parser = spawn_link(fun() ->
+ couch_httpd:parse_multipart_request(ContentType, DataFun,
+ fun(Next)-> mp_parse_doc(Next, []) end)
+ end),
+ Parser ! {get_doc_bytes, self()},
+ receive {doc_bytes, DocBytes} -> ok end,
+ Doc = from_json_obj(?JSON_DECODE(DocBytes)),
+ % go through the attachments looking for 'follows' in the data,
+ % replace with function that reads the data from MIME stream.
+ ReadAttachmentDataFun = fun() ->
+ Parser ! {get_bytes, self()},
+ receive {bytes, Bytes} -> Bytes end
+ end,
+ Atts2 = lists:map(
+ fun(#att{data=follows}=A) ->
+ A#att{data=ReadAttachmentDataFun};
+ (A) ->
+ A
+ end, Doc#doc.atts),
+ Doc#doc{atts=Atts2}.
+
+mp_parse_doc({headers, H}, []) ->
+ {"application/json", _} = proplists:get_value("content-type", H),
+ fun (Next) ->
+ mp_parse_doc(Next, [])
+ end;
+mp_parse_doc({body, Bytes}, AccBytes) ->
+ fun (Next) ->
+ mp_parse_doc(Next, [Bytes | AccBytes])
+ end;
+mp_parse_doc(body_end, AccBytes) ->
+ receive {get_doc_bytes, From} ->
+ From ! {doc_bytes, lists:reverse(AccBytes)}
+ end,
+ fun (Next) ->
+ mp_parse_atts(Next)
+ end.
+
+mp_parse_atts(eof) ->
+ ok;
+mp_parse_atts({headers, _H}) ->
+ fun (Next) ->
+ mp_parse_atts(Next)
+ end;
+mp_parse_atts({body, Bytes}) ->
+ receive {get_bytes, From} ->
+ From ! {bytes, Bytes}
+ end,
+ fun (Next) ->
+ mp_parse_atts(Next)
+ end;
+mp_parse_atts(body_end) ->
+ fun (Next) ->
+ mp_parse_atts(Next)
+ end.
+
+