author | Damien F. Katz <damien@apache.org> | 2009-11-13 20:38:45 +0000 |
---|---|---|
committer | Damien F. Katz <damien@apache.org> | 2009-11-13 20:38:45 +0000 |
commit | 627481ee0ade53d0ceed2e29cbb4e312ecbe3340 (patch) | |
tree | 7ebc9d3b490b670103fca359e47c6aff284922ef /src | |
parent | dbf062b847922c8bffa43915324d8f75646a3dce (diff) |
Initial check-in of APIs for multipart/related support and incremental replication of only changed attachments. Needs far more testing and to be hooked up to the replicator.
git-svn-id: https://svn.apache.org/repos/asf/couchdb/trunk@835981 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'src')
-rw-r--r-- | src/couchdb/couch_db.erl | 117
-rw-r--r-- | src/couchdb/couch_db_updater.erl | 2
-rw-r--r-- | src/couchdb/couch_doc.erl | 224
-rw-r--r-- | src/couchdb/couch_httpd.erl | 140
-rw-r--r-- | src/couchdb/couch_httpd_db.erl | 196
-rw-r--r-- | src/couchdb/couch_rep_missing_revs.erl | 2
-rw-r--r-- | src/couchdb/couch_rep_reader.erl | 2
-rw-r--r-- | src/mochiweb/mochiweb_multipart.erl | 2
8 files changed, 529 insertions, 156 deletions
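The thread connecting these files: a document GET can now name revisions the client already has (`atts_since`), and only attachments changed since those revisions are sent in full. A hedged sketch of the intended exchange (doc name and revs are invented for illustration):

```erlang
%% Hypothetical request once a replicator knows it already has rev 2-aaa:
%%
%%   GET /db/doc1?atts_since=["2-aaa"]
%%   Accept: multipart/related
%%
%% Attachments whose revpos is newer than the 2-aaa ancestor come back in
%% full (base64 "data", or MIME parts marked "follows":true); attachments
%% unchanged since then come back as {"stub":true, "length":N} entries.
```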
```diff
diff --git a/src/couchdb/couch_db.erl b/src/couchdb/couch_db.erl
index 2dbb88a3..aa46a347 100644
--- a/src/couchdb/couch_db.erl
+++ b/src/couchdb/couch_db.erl
@@ -116,22 +116,40 @@ open_doc_revs(Db, Id, Revs, Options) ->
     [Result] = open_doc_revs_int(Db, [{Id, Revs}], Options),
     Result.
 
+% Each returned result is a list of tuples:
+% {Id, MissingRevs, PossibleAncestors}
+% if no revs are missing, it's omitted from the results.
 get_missing_revs(Db, IdRevsList) ->
-    Ids = [Id1 || {Id1, _Revs} <- IdRevsList],
-    FullDocInfoResults = get_full_doc_infos(Db, Ids),
-    Results = lists:zipwith(
-        fun({Id, Revs}, FullDocInfoResult) ->
-            case FullDocInfoResult of
-            {ok, #full_doc_info{rev_tree=RevisionTree}} ->
-                {Id, couch_key_tree:find_missing(RevisionTree, Revs)};
-            not_found ->
-                {Id, Revs}
+    Results = get_full_doc_infos(Db, [Id1 || {Id1, _Revs} <- IdRevsList]),
+    {ok, find_missing(IdRevsList, Results)}.
+
+find_missing([], []) ->
+    [];
+find_missing([{Id, Revs}|RestIdRevs], [{ok, FullInfo} | RestLookupInfo]) ->
+    case couch_key_tree:find_missing(FullInfo#full_doc_info.rev_tree, Revs) of
+    [] ->
+        find_missing(RestIdRevs, RestLookupInfo);
+    MissingRevs ->
+        #doc_info{revs=RevsInfo} = couch_doc:to_doc_info(FullInfo),
+        LeafRevs = [Rev || #rev_info{rev=Rev} <- RevsInfo],
+        % Find the revs that are possible parents of this rev
+        PossibleAncestors =
+        lists:foldl(fun({LeafPos, LeafRevId}, Acc) ->
+            % this leaf is a "possible ancenstor" of the missing
+            % revs if this LeafPos lessthan any of the missing revs
+            case lists:any(fun({MissingPos, _}) ->
+                    LeafPos < MissingPos end, MissingRevs) of
+            true ->
+                [{LeafPos, LeafRevId} | Acc];
+            false ->
+                Acc
             end
-        end,
-        IdRevsList, FullDocInfoResults),
-    % strip out the non-missing ids
-    Missing = [{Id, Revs} || {Id, Revs} <- Results, Revs /= []],
-    {ok, Missing}.
+        end, [], LeafRevs),
+        [{Id, MissingRevs, PossibleAncestors} |
+                find_missing(RestIdRevs, RestLookupInfo)]
+    end;
+find_missing([{Id, Revs}|RestIdRevs], [not_found | RestLookupInfo]) ->
+    [{Id, Revs, []} | find_missing(RestIdRevs, RestLookupInfo)].
 
 get_doc_info(Db, Id) ->
     case get_full_doc_info(Db, Id) of
@@ -334,7 +352,12 @@ prep_and_validate_updates(Db, [DocBucket|RestBuckets], [not_found|RestLookups],
     [#doc{id=Id}|_]=DocBucket,
     % no existing revs are known,
     {PreppedBucket, AccErrors3} = lists:foldl(
-        fun(#doc{revs=Revs}=Doc, {AccBucket, AccErrors2}) ->
+        fun(#doc{revs=Revs}=Doc, {AccBucket, AccErrors2}) ->
+            case couch_doc:has_stubs(Doc) of
+            true ->
+                couch_doc:merge_doc(Doc, #doc{}); % will throw exception
+            false -> ok
+            end,
             case Revs of
             {0, []} ->
                 case validate_doc_update(Db, Doc, fun() -> nil end) of
@@ -385,7 +408,12 @@ prep_and_validate_replicated_updates(Db, [Bucket|RestBuckets], [OldInfo|RestOldI
         case OldInfo of
         not_found ->
             {ValidatedBucket, AccErrors3} = lists:foldl(
-                fun(Doc, {AccPrepped2, AccErrors2}) ->
+                fun(Doc, {AccPrepped2, AccErrors2}) ->
+                    case couch_doc:has_stubs(Doc) of
+                    true ->
+                        couch_doc:merge_doc(Doc, #doc{}); % will throw exception
+                    false -> ok
+                    end,
                     case validate_doc_update(Db, Doc, fun() -> nil end) of
                     ok ->
                         {[Doc | AccPrepped2], AccErrors2};
@@ -411,12 +439,24 @@ prep_and_validate_replicated_updates(Db, [Bucket|RestBuckets], [OldInfo|RestOldI
                     {ok, {Start, Path}} ->
                         % our unflushed doc is a leaf node. Go back on the path
                         % to find the previous rev that's on disk.
+                        LoadPrevRevFun = fun() -> make_first_doc_on_disk(Db,Id,Start-1, tl(Path)) end,
-                        case validate_doc_update(Db, Doc, LoadPrevRevFun) of
+
+                        case couch_doc:has_stubs(Doc) of
+                        true ->
+                            DiskDoc = LoadPrevRevFun(),
+                            Doc2 = couch_doc:merge_stubs(Doc, DiskDoc),
+                            GetDiskDocFun = fun() -> DiskDoc end;
+                        false ->
+                            Doc2 = Doc,
+                            GetDiskDocFun = LoadPrevRevFun
+                        end,
+
+                        case validate_doc_update(Db, Doc2, GetDiskDocFun) of
                         ok ->
-                            {[Doc | AccValidated], AccErrors2};
+                            {[Doc2 | AccValidated], AccErrors2};
                         Error ->
                             {AccValidated, [{Doc, Error} | AccErrors2]}
                         end;
@@ -455,18 +495,18 @@ new_revs([Bucket|RestBuckets], OutBuckets, IdRevsAcc) ->
         end, IdRevsAcc, Bucket),
     new_revs(RestBuckets, [NewBucket|OutBuckets], IdRevsAcc3).
 
-check_dup_atts([#att{name=N1}, #att{name=N2} | _]) when N1 == N2 ->
+check_dup_atts(#doc{atts=Atts}=Doc) ->
+    Atts2 = lists:sort(fun(#att{name=N1}, #att{name=N2}) -> N1 < N2 end, Atts),
+    check_dup_atts2(Atts2),
+    Doc.
+
+check_dup_atts2([#att{name=N1}, #att{name=N2} | _]) when N1 == N2 ->
     throw({bad_request, <<"Duplicate attachments">>});
-check_dup_atts([_, _ | Rest]) ->
-    check_dup_atts(Rest);
-check_dup_atts(_) ->
+check_dup_atts2([_ | Rest]) ->
+    check_dup_atts2(Rest);
+check_dup_atts2(_) ->
     ok.
 
-sort_and_check_atts(#doc{atts=Atts}=Doc) ->
-    Atts2 = lists:sort(fun(#att{name=N1}, #att{name=N2}) -> N1 < N2 end, Atts),
-    check_dup_atts(Atts2),
-    Doc#doc{atts=Atts2}.
-
 update_docs(Db, Docs, Options, replicated_changes) ->
     couch_stats_collector:increment({couchdb, database_writes}),
@@ -475,7 +515,8 @@ update_docs(Db, Docs, Options, replicated_changes) ->
     case (Db#db.validate_doc_funs /= []) orelse
         lists:any(
             fun(#doc{id= <<?DESIGN_DOC_PREFIX, _/binary>>}) -> true;
-            (_) -> false
+            (#doc{atts=Atts}) ->
+                Atts /= []
             end, Docs) of
     true ->
         Ids = [Id || [#doc{id=Id}|_] <- DocBuckets],
@@ -488,7 +529,7 @@ update_docs(Db, Docs, Options, replicated_changes) ->
         DocErrors = [],
         DocBuckets3 = DocBuckets
     end,
-    DocBuckets4 = [[doc_flush_atts(sort_and_check_atts(Doc), Db#db.fd)
+    DocBuckets4 = [[doc_flush_atts(check_dup_atts(Doc), Db#db.fd)
             || Doc <- Bucket] || Bucket <- DocBuckets3],
     {ok, []} = write_and_commit(Db, DocBuckets4, [], [merge_conflicts | Options]),
     {ok, DocErrors};
@@ -544,7 +585,7 @@ update_docs(Db, Docs, Options, interactive_edit) ->
         true -> [] end ++ Options,
     DocBuckets3 = [[
             doc_flush_atts(set_new_att_revpos(
-                sort_and_check_atts(Doc)), Db#db.fd)
+                check_dup_atts(Doc)), Db#db.fd)
             || Doc <- B] || B <- DocBuckets2],
     {DocBuckets4, IdRevs} = new_revs(DocBuckets3, [], []),
 
@@ -690,20 +731,8 @@ write_streamed_attachment(_Stream, _F, 0) ->
     ok;
 write_streamed_attachment(Stream, F, LenLeft) ->
     Bin = F(),
-    TruncatedBin = check_bin_length(LenLeft, Bin),
-    ok = couch_stream:write(Stream, TruncatedBin),
-    write_streamed_attachment(Stream, F, LenLeft - size(TruncatedBin)).
-
-%% There was a bug in ibrowse 1.4.1 that would cause it to append a CR to a
-%% chunked response when the CR and LF terminating the last data chunk were
-%% split across packets. The bug was fixed in version 1.5.0, but we still
-%% check for it just in case.
-check_bin_length(LenLeft, Bin) when size(Bin) > LenLeft ->
-    <<_ValidData:LenLeft/binary, Crap/binary>> = Bin,
-    ?LOG_ERROR("write_streamed_attachment has written too much expected: ~p" ++
-        " got: ~p tail: ~p", [LenLeft, size(Bin), Crap]),
-    exit(replicated_attachment_too_large);
-check_bin_length(_, Bin) -> Bin.
+    ok = couch_stream:write(Stream, Bin),
+    write_streamed_attachment(Stream, F, LenLeft - size(Bin)).
 
 enum_docs_since_reduce_to_count(Reds) ->
     couch_btree:final_reduce(
```
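The reshaped `get_missing_revs/2` result is worth pinning down before reading the callers further down. A minimal consumer sketch (module name and output format are mine, not part of the commit):

```erlang
-module(missing_revs_example).
-export([describe/1]).

%% Results from couch_db:get_missing_revs/2 now carry three elements per id;
%% documents with nothing missing are omitted from the list entirely.
describe(Results) ->
    lists:foreach(
        fun({Id, MissingRevs, PossibleAncestors}) ->
            %% MissingRevs: revs this db has never seen.
            %% PossibleAncestors: leaf revs whose position is lower than
            %% some missing rev, so their attachments may be reusable.
            io:format("~s missing ~p (possible ancestors ~p)~n",
                      [Id, MissingRevs, PossibleAncestors])
        end, Results).
```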
```diff
diff --git a/src/couchdb/couch_db_updater.erl b/src/couchdb/couch_db_updater.erl
index 2bdd25e3..31366a84 100644
--- a/src/couchdb/couch_db_updater.erl
+++ b/src/couchdb/couch_db_updater.erl
@@ -299,7 +299,7 @@ btree_by_id_join(Id, {HighSeq, Deleted, DiskTree}) ->
             % nums, which means couchdb will sometimes reexamine unchanged
             % documents with the _changes API.
             % This is fixed by compacting the database.
-            {IsDeleted, BodyPointer, HighSeq}
+            {IsDeleted == 1, BodyPointer, HighSeq}
         end, DiskTree),
     #full_doc_info{id=Id, update_seq=HighSeq, deleted=Deleted==1, rev_tree=Tree}.
diff --git a/src/couchdb/couch_doc.erl b/src/couchdb/couch_doc.erl
index 72b56d53..4cb20d6c 100644
--- a/src/couchdb/couch_doc.erl
+++ b/src/couchdb/couch_doc.erl
@@ -12,10 +12,12 @@
 -module(couch_doc).
 
--export([to_doc_info/1,to_doc_info_path/1,parse_rev/1,parse_revs/1,rev_to_str/1,rev_to_strs/1]).
+-export([to_doc_info/1,to_doc_info_path/1,parse_rev/1,parse_revs/1,rev_to_str/1,revs_to_strs/1]).
 -export([att_foldl/3,get_validate_doc_fun/1]).
 -export([from_json_obj/1,to_json_obj/2,has_stubs/1, merge_stubs/2]).
 -export([validate_docid/1]).
+-export([doc_from_multi_part_stream/2]).
+-export([doc_to_multi_part_stream/5, len_doc_to_multi_part_stream/4]).
 
 -include("couch_db.hrl").
 
@@ -47,10 +49,10 @@ rev_to_str({Pos, RevId}) ->
     ?l2b([integer_to_list(Pos),"-",revid_to_str(RevId)]).
 
-rev_to_strs([]) ->
+revs_to_strs([]) ->
     [];
-rev_to_strs([{Pos, RevId}| Rest]) ->
-    [rev_to_str({Pos, RevId}) | rev_to_strs(Rest)].
+revs_to_strs([{Pos, RevId}| Rest]) ->
+    [rev_to_str({Pos, RevId}) | revs_to_strs(Rest)].
 
 to_json_meta(Meta) ->
     lists:map(
@@ -65,56 +67,44 @@
         ({local_seq, Seq}) ->
             {<<"_local_seq">>, Seq};
         ({conflicts, Conflicts}) ->
-            {<<"_conflicts">>, rev_to_strs(Conflicts)};
+            {<<"_conflicts">>, revs_to_strs(Conflicts)};
         ({deleted_conflicts, DConflicts}) ->
-            {<<"_deleted_conflicts">>, rev_to_strs(DConflicts)}
+            {<<"_deleted_conflicts">>, revs_to_strs(DConflicts)}
         end, Meta).
 
-to_json_attachment_stubs(Attachments) ->
-    BinProps = lists:map(
-        fun(#att{name=Name,type=Type,len=Length,revpos=Pos}) ->
-            {Name, {[
-                {<<"stub">>, true},
-                {<<"content_type">>, Type},
-                {<<"length">>, Length},
-                {<<"revpos">>, Pos}
-            ]}}
-        end,
-        Attachments),
-    case BinProps of
-    [] -> [];
-    _ -> [{<<"_attachments">>, {BinProps}}]
+to_json_attachments(Attachments, Options) ->
+    case lists:member(attachments, Options) of
+    true -> % return all the binaries
+        to_json_attachments(Attachments, 0, lists:member(follows, Options));
+    false ->
+        % note the default is [], because this sorts higher than all numbers.
+        % and will return all the binaries.
+        RevPos = proplists:get_value(atts_after_revpos, Options, []),
+        to_json_attachments(Attachments, RevPos, lists:member(follows, Options))
     end.
 
-to_json_attachments(Atts) ->
+to_json_attachments([], _RevPosIncludeAfter, _DataToFollow) ->
+    [];
+to_json_attachments(Atts, RevPosIncludeAfter, DataToFollow) ->
     AttProps = lists:map(
-        fun(#att{data=Fun,len=Len}=Att) when is_function(Fun) ->
-            Data = read_streamed_attachment(Fun, Len, _Acc = []),
-            {Att#att.name, {[
-                {<<"content_type">>, Att#att.type},
-                {<<"revpos">>, Att#att.revpos},
-                {<<"data">>, couch_util:encodeBase64(Data)}
-            ]}};
-        (Att) ->
+        fun(#att{len=Len}=Att) ->
             {Att#att.name, {[
                 {<<"content_type">>, Att#att.type},
-                {<<"revpos">>, Att#att.revpos},
-                {<<"data">>, couch_util:encodeBase64(att_to_iolist(Att))}
-            ]}}
-        end,
-        Atts),
-    case AttProps of
-    [] -> [];
-    _ -> [{<<"_attachments">>, {AttProps}}]
-    end.
-
-to_json_attachments(Attachments, Options) ->
-    case lists:member(attachments, Options) of
-    true -> % return the full rev list and the binaries as strings.
-        to_json_attachments(Attachments);
-    false ->
-        to_json_attachment_stubs(Attachments)
-    end.
+                {<<"revpos">>, Att#att.revpos}
+                ] ++
+                if Att#att.revpos > RevPosIncludeAfter ->
+                    if DataToFollow ->
+                        [{<<"length">>, Len}, {<<"follows">>, true}];
+                    true ->
+                        [{<<"data">>,
+                            couch_util:encodeBase64(att_to_iolist(Att))}]
+                    end;
+                true ->
+                    [{<<"length">>, Len}, {<<"stub">>, true}]
+                end
+            }}
+        end, Atts),
+    [{<<"_attachments">>, {AttProps}}].
 
 to_json_obj(#doc{id=Id,deleted=Del,body=Body,revs={Start, RevIds},
         meta=Meta}=Doc,Options)->
@@ -199,12 +189,20 @@ transfer_fields([{<<"_attachments">>, {JsonBins}} | Rest], Doc) ->
                 RevPos = proplists:get_value(<<"revpos">>, BinProps, 0),
                 #att{name=Name, data=stub, type=Type, len=Length, revpos=RevPos};
             _ ->
-                Value = proplists:get_value(<<"data">>, BinProps),
                 Type = proplists:get_value(<<"content_type">>, BinProps,
                         ?DEFAULT_ATTACHMENT_CONTENT_TYPE),
                 RevPos = proplists:get_value(<<"revpos">>, BinProps, 0),
-                Bin = couch_util:decodeBase64(Value),
-                #att{name=Name, data=Bin, type=Type, len=size(Bin), revpos=RevPos}
+                case proplists:get_value(<<"follows">>, BinProps) of
+                true ->
+                    #att{name=Name, data=follows, type=Type,
+                        len=proplists:get_value(<<"length">>, BinProps),
+                        revpos=RevPos};
+                _ ->
+                    Value = proplists:get_value(<<"data">>, BinProps),
+                    Bin = couch_util:decodeBase64(Value),
+                    #att{name=Name, data=Bin, type=Type, len=size(Bin),
+                        revpos=RevPos}
+                end
             end
         end, JsonBins),
     transfer_fields(Rest, Doc#doc{atts=Atts});
@@ -278,14 +276,21 @@ att_foldl(#att{data={Fd,Sp},len=Len}, Fun, Acc) when is_tuple(Sp) orelse Sp == n
     % 09 UPGRADE CODE
     couch_stream:old_foldl(Fd, Sp, Len, Fun, Acc);
 att_foldl(#att{data={Fd,Sp},md5=Md5}, Fun, Acc) ->
-    couch_stream:foldl(Fd, Sp, Md5, Fun, Acc).
+    couch_stream:foldl(Fd, Sp, Md5, Fun, Acc);
+att_foldl(#att{data=DataFun,len=Len}, Fun, Acc) when is_function(DataFun) ->
+    fold_streamed_data(DataFun, Len, Fun, Acc).
+
 
 att_to_iolist(#att{data=Bin}) when is_binary(Bin) ->
     Bin;
 att_to_iolist(#att{data=Iolist}) when is_list(Iolist) ->
     Iolist;
 att_to_iolist(#att{data={Fd,Sp},md5=Md5}) ->
-    lists:reverse(couch_stream:foldl(Fd, Sp, Md5, fun(Bin,Acc) -> [Bin|Acc] end, [])).
+    lists:reverse(couch_stream:foldl(Fd, Sp, Md5,
+        fun(Bin,Acc) -> [Bin|Acc] end, []));
+att_to_iolist(#att{data=DataFun, len=Len}) when is_function(DataFun)->
+    lists:reverse(fold_streamed_data(DataFun, Len,
+        fun(Data, Acc) -> [Data | Acc] end, [])).
 
 get_validate_doc_fun(#doc{body={Props}}) ->
     Lang = proplists:get_value(<<"language">>, Props, <<"javascript">>),
```
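After this change an `_attachments` entry can take three wire forms, keyed off its `revpos` relative to the `atts_after_revpos` option. A sketch of the three EJSON shapes `to_json_attachments/3` can emit (names and values invented for illustration):

```erlang
%% Unchanged since the client's ancestor rev: metadata only.
{<<"logo.png">>, {[{<<"content_type">>, <<"image/png">>},
                   {<<"revpos">>, 2},
                   {<<"length">>, 9},
                   {<<"stub">>, true}]}}.

%% Changed, and 'follows' requested: body arrives as a later MIME part.
{<<"logo.png">>, {[{<<"content_type">>, <<"image/png">>},
                   {<<"revpos">>, 5},
                   {<<"length">>, 9},
                   {<<"follows">>, true}]}}.

%% Changed, no multipart negotiated: body inlined as base64.
{<<"logo.png">>, {[{<<"content_type">>, <<"image/png">>},
                   {<<"revpos">>, 5},
                   {<<"data">>, <<"c29tZWJ5dGVz">>}]}}.
```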
```diff
@@ -309,18 +314,121 @@ has_stubs([#att{data=stub}|_]) ->
 has_stubs([_Att|Rest]) ->
     has_stubs(Rest).
 
-merge_stubs(#doc{atts=MemBins}=StubsDoc, #doc{atts=DiskBins}) ->
+merge_stubs(#doc{id=Id,atts=MemBins}=StubsDoc, #doc{atts=DiskBins}) ->
     BinDict = dict:from_list([{Name, Att} || #att{name=Name}=Att <- DiskBins]),
     MergedBins = lists:map(
-        fun(#att{name=Name, data=stub}) ->
-            dict:fetch(Name, BinDict);
+        fun(#att{name=Name, data=stub, revpos=RevPos}) ->
+            case dict:find(Name, BinDict) of
+            {ok, #att{revpos=RevPos}=DiskAtt} ->
+                DiskAtt;
+            _ ->
+                throw({missing_stub_on_target,
+                        <<"id:", Id/binary, ", name:", Name/binary>>})
+            end;
         (Att) ->
             Att
         end, MemBins),
     StubsDoc#doc{atts= MergedBins}.
 
-read_streamed_attachment(_RcvFun, 0, Acc) ->
-    list_to_binary(lists:reverse(Acc));
-read_streamed_attachment(RcvFun, LenLeft, Acc) ->
+fold_streamed_data(_RcvFun, 0, _Fun, Acc) ->
+    Acc;
+fold_streamed_data(RcvFun, LenLeft, Fun, Acc) when LenLeft > 0->
     Bin = RcvFun(),
-    read_streamed_attachment(RcvFun, LenLeft - size(Bin), [Bin|Acc]).
+    ResultAcc = Fun(Bin, Acc),
+    fold_streamed_data(RcvFun, LenLeft - size(Bin), Fun, ResultAcc).
+
+len_doc_to_multi_part_stream(Boundary,JsonBytes,Atts,AttsSinceRevPos) ->
+    2 + % "--"
+    size(Boundary) +
+    34 + % "\r\ncontent-type: application/json\r\n"
+    iolist_size(JsonBytes) +
+    4 + % "\r\n--"
+    size(Boundary) +
+    lists:foldl(fun(#att{revpos=RevPos,len=Len}, AccAttsSize) ->
+        if RevPos > AttsSinceRevPos ->
+            AccAttsSize +
+            2 + % "\r\n"
+            Len +
+            4 + % "\r\n--"
+            size(Boundary);
+        true ->
+            AccAttsSize
+        end
+    end, 0, Atts) +
+    2. % "--"
+
+doc_to_multi_part_stream(Boundary,JsonBytes,Atts,AttsSinceRevPos,WriteFun) ->
+    WriteFun([<<"--", Boundary, "\r\ncontent-type: application/json\r\n">>,
+            JsonBytes, <<"\r\n--", Boundary>>]),
+    atts_to_mp(Atts, Boundary, WriteFun, AttsSinceRevPos).
+
+atts_to_mp([], _Boundary, WriteFun, _AttsSinceRevPos) ->
+    WriteFun(<<"--">>);
+atts_to_mp([#att{revpos=RevPos} = Att | RestAtts], Boundary, WriteFun,
+        AttsSinceRevPos) when RevPos > AttsSinceRevPos ->
+    WriteFun(<<"\r\n">>),
+    att_foldl(Att, fun(Data, ok) -> WriteFun(Data) end, ok),
+    WriteFun(<<"\r\n--", Boundary>>),
+    atts_to_mp(RestAtts, Boundary, WriteFun, AttsSinceRevPos);
+atts_to_mp([_ | RestAtts], Boundary, WriteFun, AttsSinceRevPos) ->
+    atts_to_mp(RestAtts, Boundary, WriteFun, AttsSinceRevPos).
+
+
+doc_from_multi_part_stream(ContentType, DataFun) ->
+    Parser = spawn_link(fun() ->
+        couch_httpd:parse_multipart_request(ContentType, DataFun,
+                fun(Next)-> mp_parse_doc(Next, []) end)
+        end),
+    Parser ! {get_doc_bytes, self()},
+    receive {doc_bytes, DocBytes} -> ok end,
+    Doc = from_json_obj(?JSON_DECODE(DocBytes)),
+    % go through the attachments looking for 'follows' in the data,
+    % replace with function that reads the data from MIME stream.
+    ReadAttachmentDataFun = fun() ->
+        Parser ! {get_bytes, self()},
+        receive {bytes, Bytes} -> Bytes end
+    end,
+    Atts2 = lists:map(
+        fun(#att{data=follows}=A) ->
+            A#att{data=ReadAttachmentDataFun};
+        (A) ->
+            A
+        end, Doc#doc.atts),
+    Doc#doc{atts=Atts2}.
+
+mp_parse_doc({headers, H}, []) ->
+    {"application/json", _} = proplists:get_value("content-type", H),
+    fun (Next) ->
+        mp_parse_doc(Next, [])
+    end;
+mp_parse_doc({body, Bytes}, AccBytes) ->
+    fun (Next) ->
+        mp_parse_doc(Next, [Bytes | AccBytes])
+    end;
+mp_parse_doc(body_end, AccBytes) ->
+    receive {get_doc_bytes, From} ->
+        From ! {doc_bytes, lists:reverse(AccBytes)}
+    end,
+    fun (Next) ->
+        mp_parse_atts(Next)
+    end.
+
+mp_parse_atts(eof) ->
+    ok;
+mp_parse_atts({headers, _H}) ->
+    fun (Next) ->
+        mp_parse_atts(Next)
+    end;
+mp_parse_atts({body, Bytes}) ->
+    receive {get_bytes, From} ->
+        From ! {bytes, Bytes}
+    end,
+    fun (Next) ->
+        mp_parse_atts(Next)
+    end;
+mp_parse_atts(body_end) ->
+    fun (Next) ->
+        mp_parse_atts(Next)
+    end.
+
+
```
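`len_doc_to_multi_part_stream/4` has to predict, byte for byte, what `doc_to_multi_part_stream/5` will write, because the response is sent with a fixed Content-Length. A sketch of the framing both functions agree on (boundary `abc` and a single 9-byte attachment newer than `AttsSinceRevPos` are invented):

```erlang
%% --abc                                    2 + size(Boundary)
%% \r\ncontent-type: application/json\r\n   the fixed 34 bytes
%% {"_id":"doc1",...}                       iolist_size(JsonBytes)
%% \r\n--abc                                4 + size(Boundary)
%% \r\nsomebytes\r\n--abc                   2 + Len + 4 + size(Boundary),
%%                                          per att with RevPos > AttsSinceRevPos
%% --                                       trailing 2
```

Attachments at or below `AttsSinceRevPos` are skipped by `atts_to_mp/4` and contribute nothing to the length, which is what makes the replication incremental.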
```diff
diff --git a/src/couchdb/couch_httpd.erl b/src/couchdb/couch_httpd.erl
index a49b4a4d..a0955daa 100644
--- a/src/couchdb/couch_httpd.erl
+++ b/src/couchdb/couch_httpd.erl
@@ -23,7 +23,7 @@
 -export([start_response_length/4, send/2]).
 -export([start_json_response/2, start_json_response/3, end_json_response/1]).
 -export([send_response/4,send_method_not_allowed/2,send_error/4, send_redirect/2,send_chunked_error/2]).
--export([send_json/2,send_json/3,send_json/4,last_chunk/1]).
+-export([send_json/2,send_json/3,send_json/4,last_chunk/1,parse_multipart_request/3]).
 
 start_link() ->
     % read config and register for configuration changes
@@ -337,6 +337,7 @@ json_body_obj(Httpd) ->
     end.
 
+
 doc_etag(#doc{revs={Start, [DiskRev|_]}}) ->
     "\"" ++ ?b2l(couch_doc:rev_to_str({Start, DiskRev})) ++ "\"".
 
@@ -628,3 +629,140 @@ server_header() ->
     OTPVersion = "R" ++ integer_to_list(erlang:system_info(compat_rel)) ++ "B",
     [{"Server", "CouchDB/" ++ couch_server:get_version() ++
                 " (Erlang OTP/" ++ OTPVersion ++ ")"}].
+
+
+-record(mp, {boundary, buffer, data_fun, callback}).
+
+
+parse_multipart_request(ContentType, DataFun, Callback) ->
+    Boundary0 = iolist_to_binary(get_boundary(ContentType)),
+    Boundary = <<"\r\n--", Boundary0/binary>>,
+    Mp = #mp{boundary= Boundary,
+            buffer= <<>>,
+            data_fun=DataFun,
+            callback=Callback},
+    {Mp2, _NilCallback} = read_until(Mp, <<"--", Boundary0/binary>>,
+        fun(Next)-> nil_callback(Next) end),
+    #mp{buffer=Buffer, data_fun=DataFun2, callback=Callback2} =
+            parse_part_header(Mp2),
+    {Buffer, DataFun2, Callback2}.
+
+nil_callback(_Data)->
+    fun(Next) -> nil_callback(Next) end.
+
+get_boundary(ContentType) ->
+    {"multipart/" ++ _, Opts} = mochiweb_util:parse_header(ContentType),
+    case proplists:get_value("boundary", Opts) of
+    S when is_list(S) ->
+        S
+    end.
+
+
+
+split_header(<<>>) ->
+    [];
+split_header(Line) ->
+    {Name, [$: | Value]} = lists:splitwith(fun (C) -> C =/= $: end,
+                                           binary_to_list(Line)),
+    [{string:to_lower(string:strip(Name)),
+     mochiweb_util:parse_header(Value)}].
+
+read_until(#mp{data_fun=DataFun, buffer=Buffer}=Mp, Pattern, Callback) ->
+    case find_in_binary(Pattern, Buffer) of
+    not_found ->
+        Callback2 = Callback(Buffer),
+        {Buffer2, DataFun2} = DataFun(),
+        Buffer3 = iolist_to_binary(Buffer2),
+        read_until(Mp#mp{data_fun=DataFun2,buffer=Buffer3}, Pattern, Callback2);
+    {partial, Skip} ->
+        <<DataChunk:Skip/binary, Rest/binary>> = Buffer,
+        Callback2 = Callback(DataChunk),
+        {Buffer2, DataFun2} = DataFun(),
+        Buffer3 = iolist_to_binary(Buffer2),
+        read_until(Mp#mp{data_fun=DataFun2,
+                buffer= <<Buffer3/binary, Rest/binary>>},
+                Pattern, Callback2);
+    {exact, Skip} ->
+        PatternLen = size(Pattern),
+        <<DataChunk:Skip/binary, _:PatternLen/binary, Rest/binary>> = Buffer,
+        Callback2 = Callback(DataChunk),
+        {Mp#mp{buffer= Rest}, Callback2}
+    end.
+
+
+parse_part_header(#mp{callback=UserCallBack}=Mp) ->
+    {Mp2, AccCallback} = read_until(Mp, <<"\r\n\r\n">>,
+            fun(Next) -> acc_callback(Next, []) end),
+    HeaderData = AccCallback(get_data),
+
+    Headers =
+    lists:foldl(fun(Line, Acc) ->
+            split_header(Line) ++ Acc
+        end, [], re:split(HeaderData,<<"\r\n">>, [])),
+    NextCallback = UserCallBack({headers, Headers}),
+    parse_part_body(Mp2#mp{callback=NextCallback}).
+
+parse_part_body(#mp{boundary=Prefix, callback=Callback}=Mp) ->
+    {Mp2, WrappedCallback} = read_until(Mp, Prefix,
+            fun(Data) -> body_callback_wrapper(Data, Callback) end),
+    Callback2 = WrappedCallback(get_callback),
+    Callback3 = Callback2(body_end),
+    case check_for_last(Mp2#mp{callback=Callback3}) of
+    {last, #mp{callback=Callback3}=Mp3} ->
+        Mp3#mp{callback=Callback3(eof)};
+    {more, Mp3} ->
+        parse_part_header(Mp3)
+    end.
+
+acc_callback(get_data, Acc)->
+    iolist_to_binary(lists:reverse(Acc));
+acc_callback(Data, Acc)->
+    fun(Next) -> acc_callback(Next, [Data | Acc]) end.
+
+body_callback_wrapper(get_callback, Callback) ->
+    Callback;
+body_callback_wrapper(Data, Callback) ->
+    Callback2 = Callback({body, Data}),
+    fun(Next) -> body_callback_wrapper(Next, Callback2) end.
+
+
+check_for_last(#mp{buffer=Buffer, data_fun=DataFun}=Mp) ->
+    case Buffer of
+    <<"--",_/binary>> -> {last, Mp};
+    <<_, _, _/binary>> -> {more, Mp};
+    _ -> % not long enough
+        {Data, DataFun2} = DataFun(),
+        check_for_last(Mp#mp{buffer= <<Buffer/binary, Data/binary>>,
+                data_fun = DataFun2})
+    end.
+
+find_in_binary(B, Data) when size(B) > 0 ->
+    case size(Data) - size(B) of
+    Last when Last < 0 ->
+        partial_find(B, Data, 0, size(Data));
+    Last ->
+        find_in_binary(B, size(B), Data, 0, Last)
+    end.
+
+find_in_binary(B, BS, D, N, Last) when N =< Last->
+    case D of
+    <<_:N/binary, B:BS/binary, _/binary>> ->
+        {exact, N};
+    _ ->
+        find_in_binary(B, BS, D, 1 + N, Last)
+    end;
+find_in_binary(B, BS, D, N, Last) when N =:= 1 + Last ->
+    partial_find(B, D, N, BS - 1).
+
+partial_find(_B, _D, _N, 0) ->
+    not_found;
+partial_find(B, D, N, K) ->
+    <<B1:K/binary, _/binary>> = B,
+    case D of
+    <<_Skip:N/binary, B1:K/binary>> ->
+        {partial, N, K};
+    _ ->
+        partial_find(B, D, 1 + N, K - 1)
+    end.
+
+
```
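The parser above is callback-driven: the consumer receives `{headers, H}`, then `{body, Bytes}` chunks, then `body_end` per part, and finally `eof`, returning a fresh fun at each step. This is the protocol `couch_doc:doc_from_multi_part_stream/2` speaks from its spawned process. A minimal driver sketch, assuming an in-memory body instead of a socket (module name and body are invented):

```erlang
-module(mp_parse_example).
-export([run/0]).

%% Feed one pre-built multipart body through the parser and print each
%% event. A real caller hands in a fun that recv()s from the request socket.
run() ->
    Body = <<"--abc\r\ncontent-type: text/plain\r\n\r\nhello\r\n--abc--">>,
    DataFun = fun() -> {Body, fun() -> {<<>>, undefined} end} end,
    couch_httpd:parse_multipart_request(
        "multipart/related;boundary=abc", DataFun,
        fun(Event) -> print_events(Event) end).

print_events(Event) ->
    io:format("~p~n", [Event]),   % {headers,_}, {body,_}, body_end, eof
    fun(Next) -> print_events(Next) end.
```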
```diff
diff --git a/src/couchdb/couch_httpd_db.erl b/src/couchdb/couch_httpd_db.erl
index da62ccb4..0cad21e1 100644
--- a/src/couchdb/couch_httpd_db.erl
+++ b/src/couchdb/couch_httpd_db.erl
@@ -29,7 +29,8 @@
     options = [],
     rev = nil,
     open_revs = [],
-    show = nil
+    show = nil,
+    atts_since = nil
 }).
 
 % Database request handlers
@@ -476,7 +477,7 @@ db_req(#httpd{method='POST',path_parts=[_,<<"_purge">>]}=Req, Db) ->
     case couch_db:purge_docs(Db, IdsRevs2) of
     {ok, PurgeSeq, PurgedIdsRevs} ->
-        PurgedIdsRevs2 = [{Id, couch_doc:rev_to_strs(Revs)} || {Id, Revs} <- PurgedIdsRevs],
+        PurgedIdsRevs2 = [{Id, couch_doc:revs_to_strs(Revs)} || {Id, Revs} <- PurgedIdsRevs],
         send_json(Req, 200, {[{<<"purge_seq">>, PurgeSeq}, {<<"purged">>, {PurgedIdsRevs2}}]});
     Error ->
         throw(Error)
@@ -507,7 +508,8 @@ db_req(#httpd{method='POST',path_parts=[_,<<"_missing_revs">>]}=Req, Db) ->
     {JsonDocIdRevs} = couch_httpd:json_body_obj(Req),
     JsonDocIdRevs2 = [{Id, [couch_doc:parse_rev(RevStr) || RevStr <- RevStrs]} || {Id, RevStrs} <- JsonDocIdRevs],
     {ok, Results} = couch_db:get_missing_revs(Db, JsonDocIdRevs2),
-    Results2 = [{Id, [couch_doc:rev_to_str(Rev) || Rev <- Revs]} || {Id, Revs} <- Results],
+    io:format("Results:~p~n", [Results]),
+    Results2 = [{Id, couch_doc:revs_to_strs(Revs)} || {Id, Revs, _} <- Results],
     send_json(Req, {[
         {missing_revs, {Results2}}
     ]});
@@ -515,6 +517,29 @@ db_req(#httpd{path_parts=[_,<<"_missing_revs">>]}=Req, _Db) ->
     send_method_not_allowed(Req, "POST");
 
+
+db_req(#httpd{method='POST',path_parts=[_,<<"_revs_diff">>]}=Req, Db) ->
+    {JsonDocIdRevs} = couch_httpd:json_body_obj(Req),
+    JsonDocIdRevs2 =
+        [{Id, couch_doc:parse_revs(RevStrs)} || {Id, RevStrs} <- JsonDocIdRevs],
+    {ok, Results} = couch_db:get_missing_revs(Db, JsonDocIdRevs2),
+    Results2 =
+    lists:map(fun({Id, MissingRevs, PossibleAncestors}) ->
+        {Id,
+            {[{missing, couch_doc:revs_to_strs(MissingRevs)}] ++
+                if PossibleAncestors == [] ->
+                    [];
+                true ->
+                    [{possible_ancestors,
+                        couch_doc:revs_to_strs(PossibleAncestors)}]
+                end}}
+        end, Results),
+    send_json(Req, {[{missing, {Results2}}]});
+
+db_req(#httpd{path_parts=[_,<<"_revs_diff">>]}=Req, _Db) ->
+    send_method_not_allowed(Req, "POST");
+
+
 db_req(#httpd{method='PUT',path_parts=[_,<<"_admins">>]}=Req, Db) ->
     Admins = couch_httpd:json_body(Req),
@@ -655,9 +680,12 @@ db_doc_req(#httpd{method='DELETE'}=Req, Db, DocId) ->
     couch_doc_open(Db, DocId, nil, []),
     case couch_httpd:qs_value(Req, "rev") of
     undefined ->
-        update_doc(Req, Db, DocId, {[{<<"_deleted">>,true}]});
+        update_doc(Req, Db, DocId,
+                couch_doc_from_req(Req, DocId, {[{<<"_deleted">>,true}]}));
     Rev ->
-        update_doc(Req, Db, DocId, {[{<<"_rev">>, ?l2b(Rev)},{<<"_deleted">>,true}]})
+        update_doc(Req, Db, DocId,
+                couch_doc_from_req(Req, DocId,
+                    {[{<<"_rev">>, ?l2b(Rev)},{<<"_deleted">>,true}]}))
     end;
 
 db_doc_req(#httpd{method='GET'}=Req, Db, DocId) ->
@@ -665,23 +693,21 @@ db_doc_req(#httpd{method='GET'}=Req, Db, DocId) ->
         show = Format,
         rev = Rev,
         open_revs = Revs,
-        options = Options
+        options = Options,
+        atts_since = AttsSince
     } = parse_doc_query(Req),
     case Format of
     nil ->
         case Revs of
         [] ->
             Doc = couch_doc_open(Db, DocId, Rev, Options),
-            DiskEtag = couch_httpd:doc_etag(Doc),
-            case Doc#doc.meta of
-            [] ->
-                % output etag only when we have no meta
-                couch_httpd:etag_respond(Req, DiskEtag, fun() ->
-                    send_json(Req, 200, [{"Etag", DiskEtag}], couch_doc:to_json_obj(Doc, Options))
-                end);
-            _ ->
-                send_json(Req, 200, [], couch_doc:to_json_obj(Doc, Options))
-            end;
+            Options2 =
+            if AttsSince /= nil ->
+                RevPos = find_ancestor_rev_pos(Doc#doc.revs, AttsSince),
+                [{atts_after_revpos, RevPos} | Options];
+            true -> Options
+            end,
+            send_doc(Req, Doc, Options2);
         _ ->
             {ok, Results} = couch_db:open_doc_revs(Db, DocId, Revs, Options),
             {ok, Resp} = start_json_response(Req, 200),
@@ -713,7 +739,7 @@ db_doc_req(#httpd{method='POST'}=Req, Db, DocId) ->
     couch_doc:validate_docid(DocId),
-    case couch_httpd:header_value(Req, "content-type") of
+    case couch_httpd:header_value(Req, "Content-Type") of
     "multipart/form-data" ++  _Rest ->
         ok;
     _Else ->
@@ -756,27 +782,38 @@ db_doc_req(#httpd{method='PUT'}=Req, Db, DocId) ->
     couch_doc:validate_docid(DocId),
-    Json = couch_httpd:json_body(Req),
-    case couch_httpd:qs_value(Req, "batch") of
-    "ok" ->
-        % batch
-        Doc = couch_doc_from_req(Req, DocId, Json),
+
+    Loc = absolute_uri(Req, "/" ++ ?b2l(Db#db.name) ++ "/" ++ ?b2l(DocId)),
+    RespHeaders = [{"Location", Loc}],
+    case couch_httpd:header_value(Req, "content-type") of
+    ("multipart/related" ++ _Rest) = ContentType->
+        io:format("Huh!~n"),
+        Doc0 = couch_doc:doc_from_multi_part_stream(ContentType,
+                fun() -> receive_request_data(Req) end),
+        Doc = couch_doc_from_req(Req, DocId, Doc0),
+        update_doc(Req, Db, DocId, Doc, RespHeaders);
+    _ ->
+        case couch_httpd:qs_value(Req, "batch") of
+        "ok" ->
+            % batch
+            Doc = couch_doc_from_req(Req, DocId, couch_httpd:json_body(Req)),
 
-        spawn(fun() ->
-                case catch(couch_db:update_doc(Db, Doc, [])) of
-                {ok, _} -> ok;
-                Error ->
-                    ?LOG_INFO("Batch doc error (~s): ~p",[DocId, Error])
-                end
-            end),
-        send_json(Req, 202, [], {[
-            {ok, true},
-            {id, DocId}
-        ]});
-    _Normal ->
-        % normal
-        Location = absolute_uri(Req, "/" ++ ?b2l(Db#db.name) ++ "/" ++ ?b2l(DocId)),
-        update_doc(Req, Db, DocId, Json, [{"Location", Location}])
+            spawn(fun() ->
+                    case catch(couch_db:update_doc(Db, Doc, [])) of
+                    {ok, _} -> ok;
+                    Error ->
+                        ?LOG_INFO("Batch doc error (~s): ~p",[DocId, Error])
+                    end
+                end),
+            send_json(Req, 202, [], {[
+                {ok, true},
+                {id, DocId}
+            ]});
+        _Normal ->
+            % normal
+            Doc = couch_doc_from_req(Req, DocId, couch_httpd:json_body(Req)),
+            update_doc(Req, Db, DocId, Doc, RespHeaders)
+        end
     end;
 
 db_doc_req(#httpd{method='COPY'}=Req, Db, SourceDocId) ->
@@ -799,7 +836,66 @@ db_doc_req(Req, _Db, _DocId) ->
     send_method_not_allowed(Req, "DELETE,GET,HEAD,POST,PUT,COPY").
 
+find_ancestor_rev_pos({_, []}, _AttsSinceRevs) ->
+    0;
+find_ancestor_rev_pos(_DocRevs, []) ->
+    0;
+find_ancestor_rev_pos({RevPos, [RevId|Rest]}, AttsSinceRevs) ->
+    case lists:member({RevPos, RevId}, AttsSinceRevs) of
+    true ->
+        RevPos;
+    false ->
+        find_ancestor_rev_pos({RevPos - 1, Rest}, AttsSinceRevs)
+    end.
+
+send_doc(Req, Doc, Options) ->
+    case Doc#doc.meta of
+    [] ->
+        DiskEtag = couch_httpd:doc_etag(Doc),
+        % output etag only when we have no meta
+        couch_httpd:etag_respond(Req, DiskEtag, fun() ->
+            send_doc_efficiently(Req, Doc, [{"Etag", DiskEtag}], Options)
+        end);
+    _ ->
+        send_doc_efficiently(Req, Doc, [], Options)
+    end.
+
+
+send_doc_efficiently(Req, #doc{atts=[]}=Doc, Headers, Options) ->
+    send_json(Req, 200, Headers, couch_doc:to_json_obj(Doc, Options));
+send_doc_efficiently(Req, #doc{atts=Atts}=Doc, Headers, Options) ->
+    case lists:member(attachments, Options) orelse
+            proplists:is_defined(atts_after_revpos, Options) of
+    true ->
+        AcceptedTypes = case couch_httpd:header_value(Req, "Accept") of
+            undefined -> [];
+            AcceptHeader -> string:tokens(AcceptHeader, ", ")
+        end,
+        case lists:member(AcceptedTypes, "multipart/related") of
+        false ->
+            send_json(Req, 200, [], couch_doc:to_json_obj(Doc, Options));
+        true ->
+            Boundary = couch_uuids:random(),
+            JsonBytes = ?JSON_ENCODE(couch_doc:to_json_obj(Doc, Options)),
+            AttsSinceRevPos = proplists:get_value(atts_after_revpos, Options, 0),
+            Len = couch_doc:len_doc_to_multi_part_stream(Boundary,JsonBytes,Atts,
+                    AttsSinceRevPos),
+            CType = {<<"content-type">>,
+                    <<"multipart/related; boundary=", Boundary/binary>>},
+            Resp = start_response_length(Req, 200, [CType | Headers], Len),
+            couch_doc:doc_to_multi_part_stream(Boundary,JsonBytes,Atts,
+                    AttsSinceRevPos,
+                    fun(Data) -> couch_httpd:send(Resp, Data) end)
+        end;
+    false ->
+        send_json(Req, 200, [], couch_doc:to_json_obj(Doc, Options))
+    end.
+
+
+
+receive_request_data(Req) ->
+    {couch_httpd:recv(Req, 0), fun() -> receive_request_data(Req) end}.
+
 update_doc_result_to_json({{Id, Rev}, Error}) ->
     {_Code, Err, Msg} = couch_httpd:error_info(Error),
     {[{id, Id}, {rev, couch_doc:rev_to_str(Rev)},
@@ -814,12 +910,10 @@ update_doc_result_to_json(DocId, Error) ->
     {[{id, DocId}, {error, ErrorStr}, {reason, Reason}]}.
 
-update_doc(Req, Db, DocId, Json) ->
-    update_doc(Req, Db, DocId, Json, []).
-
-update_doc(Req, Db, DocId, Json, Headers) ->
-    #doc{deleted=Deleted} = Doc = couch_doc_from_req(Req, DocId, Json),
+update_doc(Req, Db, DocId, Doc) ->
+    update_doc(Req, Db, DocId, Doc, []).
 
+update_doc(Req, Db, DocId, #doc{deleted=Deleted}=Doc, Headers) ->
     case couch_httpd:header_value(Req, "X-Couch-Full-Commit") of
     "true" ->
         Options = [full_commit];
@@ -837,21 +931,22 @@ update_doc(Req, Db, DocId, Doc, Headers) ->
         {id, DocId},
         {rev, NewRevStr}]}).
 
-couch_doc_from_req(Req, DocId, Json) ->
-    Doc = couch_doc:from_json_obj(Json),
+couch_doc_from_req(Req, DocId, #doc{revs=Revs}=Doc) ->
     validate_attachment_names(Doc),
     ExplicitDocRev =
-    case Doc#doc.revs of
+    case Revs of
     {Start,[RevId|_]} ->
        {Start, RevId};
     _ ->
        undefined
    end,
    case extract_header_rev(Req, ExplicitDocRev) of
    missing_rev ->
-        Revs = {0, []};
+        Revs2 = {0, []};
    {Pos, Rev} ->
-        Revs = {Pos, [Rev]}
+        Revs2 = {Pos, [Rev]}
    end,
-    Doc#doc{id=DocId, revs=Revs}.
+    Doc#doc{id=DocId, revs=Revs2};
+couch_doc_from_req(Req, DocId, Json) ->
+    couch_doc_from_req(Req, DocId, couch_doc:from_json_obj(Json)).
 
 % Useful for debugging
@@ -1034,7 +1129,10 @@ parse_doc_query(Req) ->
             Args#doc_query_args{open_revs=all};
         {"open_revs", RevsJsonStr} ->
             JsonArray = ?JSON_DECODE(RevsJsonStr),
-            Args#doc_query_args{open_revs=[couch_doc:parse_rev(Rev) || Rev <- JsonArray]};
+            Args#doc_query_args{open_revs=couch_doc:parse_revs(JsonArray)};
+        {"atts_since", RevsJsonStr} ->
+            JsonArray = ?JSON_DECODE(RevsJsonStr),
+            Args#doc_query_args{atts_since = couch_doc:parse_revs(JsonArray)};
         {"show", FormatStr} ->
             Args#doc_query_args{show=parse_doc_format(FormatStr)};
         _Else -> % unknown key value pair, ignore.
```
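Put together, the new `_revs_diff` handler gives a replicator a one-shot answer for "what do you still need, and which of my revs do you already have". A hypothetical round trip (ids and revs invented; the response shape follows the `{[{missing, {Results2}}]}` term above):

```erlang
%%   POST /db/_revs_diff
%%   {"doc1": ["2-aaa", "3-bbb"]}
%%
%%   200 OK
%%   {"missing": {"doc1": {"missing": ["3-bbb"],
%%                         "possible_ancestors": ["2-aaa"]}}}
%%
%% "possible_ancestors" appears only when non-empty; a rev the target
%% already holds at a lower position qualifies, and can seed atts_since
%% on the follow-up GET.
```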
```diff
diff --git a/src/couchdb/couch_rep_missing_revs.erl b/src/couchdb/couch_rep_missing_revs.erl
index 847a00db..5790dd71 100644
--- a/src/couchdb/couch_rep_missing_revs.erl
+++ b/src/couchdb/couch_rep_missing_revs.erl
@@ -171,7 +171,7 @@ get_missing_revs(Target, Changes) ->
     SeqDict = changes_dictionary(Changes),
     {[{<<"seq">>, HighSeq}, _, _]} = lists:last(Changes),
     {ok, Results} = couch_db:get_missing_revs(Target, IdRevsList),
-    {HighSeq, [{Id, dict:fetch(Id, SeqDict), Revs} || {Id, Revs} <- Results]}.
+    {HighSeq, [{Id, dict:fetch(Id, SeqDict), Revs} || {Id, Revs, _} <- Results]}.
 
 changes_dictionary(ChangeList) ->
     KVs = [{proplists:get_value(<<"id">>,C), proplists:get_value(<<"seq">>,C)}
diff --git a/src/couchdb/couch_rep_reader.erl b/src/couchdb/couch_rep_reader.erl
index 7f061500..a66454c8 100644
--- a/src/couchdb/couch_rep_reader.erl
+++ b/src/couchdb/couch_rep_reader.erl
@@ -218,7 +218,7 @@ open_doc_revs(#http_db{} = DbS, DocId, Revs) ->
     BaseLength = length(couch_rep_httpc:full_url(BaseReq)) + 11, % &open_revs=
 
     {RevLists, _, _} = lists:foldl(fun split_revlist/2,
-        {[[]], BaseLength, BaseLength}, couch_doc:rev_to_strs(Revs)),
+        {[[]], BaseLength, BaseLength}, couch_doc:revs_to_strs(Revs)),
 
     Requests = [BaseReq#http_db{
         qs = [{open_revs, ?JSON_ENCODE(RevList)} | BaseQS]
diff --git a/src/mochiweb/mochiweb_multipart.erl b/src/mochiweb/mochiweb_multipart.erl
index 9eb4badd..b9631613 100644
--- a/src/mochiweb/mochiweb_multipart.erl
+++ b/src/mochiweb/mochiweb_multipart.erl
@@ -158,7 +158,7 @@ feed_mp(body, State=#mp{boundary=Prefix, buffer=Buffer, callback=Callback}) ->
     end.
 
 get_boundary(ContentType) ->
-    {"multipart/form-data", Opts} = mochiweb_util:parse_header(ContentType),
+    {"multipart/" ++ _, Opts} = mochiweb_util:parse_header(ContentType),
     case proplists:get_value("boundary", Opts) of
     S when is_list(S) ->
         S
```