author     Damien F. Katz <damien@apache.org>  2009-11-13 20:38:45 +0000
committer  Damien F. Katz <damien@apache.org>  2009-11-13 20:38:45 +0000
commit     627481ee0ade53d0ceed2e29cbb4e312ecbe3340 (patch)
tree       7ebc9d3b490b670103fca359e47c6aff284922ef /src
parent     dbf062b847922c8bffa43915324d8f75646a3dce (diff)

Initial check-in of APIs for multipart/related support and incremental replication of only changed attachments. Needs far more testing, and still needs to be hooked up to the replicator.
git-svn-id: https://svn.apache.org/repos/asf/couchdb/trunk@835981 13f79535-47bb-0310-9956-ffa450edef68
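For orientation, a minimal sketch (illustrative Db handle, ids, and revs — not part of this change) of the result shape the reworked couch_db:get_missing_revs/2 returns, per the find_missing/2 clauses added in couch_db.erl below:

    % Each result is {Id, MissingRevs, PossibleAncestors}. Ids with no
    % missing revs are omitted entirely; ids not found at all report every
    % requested rev as missing, with no possible ancestors.
    {ok, Results} = couch_db:get_missing_revs(Db, [
        {<<"doc1">>, [{2, <<"b">>}]},   % doc exists, but rev 2-b is unknown
        {<<"doc2">>, [{1, <<"x">>}]}    % doc not found in this db
    ]),
    %% Results = [{<<"doc1">>, [{2, <<"b">>}], [{1, <<"a">>}]},
    %%            {<<"doc2">>, [{1, <<"x">>}], []}]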
Diffstat (limited to 'src')
-rw-r--r--  src/couchdb/couch_db.erl                117
-rw-r--r--  src/couchdb/couch_db_updater.erl          2
-rw-r--r--  src/couchdb/couch_doc.erl               224
-rw-r--r--  src/couchdb/couch_httpd.erl             140
-rw-r--r--  src/couchdb/couch_httpd_db.erl          196
-rw-r--r--  src/couchdb/couch_rep_missing_revs.erl    2
-rw-r--r--  src/couchdb/couch_rep_reader.erl          2
-rw-r--r--  src/mochiweb/mochiweb_multipart.erl       2
8 files changed, 529 insertions(+), 156 deletions(-)
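Likewise, a hedged sketch (boundary and attachment invented) of the wire framing the new couch_doc:doc_to_multi_part_stream/5 below emits when an attachment's revpos is newer than the peer's:

    % Sketch only, with Boundary = <<"abc">>. Attachments changed since
    % AttsSinceRevPos are marked "follows":true in the JSON part, and their
    % raw bytes trail after it, in document order:
    %
    %   --abc\r\ncontent-type: application/json\r\n\r\n
    %   {"_id":"d","_attachments":{"a.txt":{"follows":true, ... }}}
    %   \r\n--abc\r\n
    %   ... raw a.txt bytes ...
    %   \r\n--abc--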
diff --git a/src/couchdb/couch_db.erl b/src/couchdb/couch_db.erl
index 2dbb88a3..aa46a347 100644
--- a/src/couchdb/couch_db.erl
+++ b/src/couchdb/couch_db.erl
@@ -116,22 +116,40 @@ open_doc_revs(Db, Id, Revs, Options) ->
[Result] = open_doc_revs_int(Db, [{Id, Revs}], Options),
Result.
+% Each returned result is a list of tuples:
+% {Id, MissingRevs, PossibleAncestors}
+% If no revs are missing, the id is omitted from the results.
get_missing_revs(Db, IdRevsList) ->
- Ids = [Id1 || {Id1, _Revs} <- IdRevsList],
- FullDocInfoResults = get_full_doc_infos(Db, Ids),
- Results = lists:zipwith(
- fun({Id, Revs}, FullDocInfoResult) ->
- case FullDocInfoResult of
- {ok, #full_doc_info{rev_tree=RevisionTree}} ->
- {Id, couch_key_tree:find_missing(RevisionTree, Revs)};
- not_found ->
- {Id, Revs}
+ Results = get_full_doc_infos(Db, [Id1 || {Id1, _Revs} <- IdRevsList]),
+ {ok, find_missing(IdRevsList, Results)}.
+
+find_missing([], []) ->
+ [];
+find_missing([{Id, Revs}|RestIdRevs], [{ok, FullInfo} | RestLookupInfo]) ->
+ case couch_key_tree:find_missing(FullInfo#full_doc_info.rev_tree, Revs) of
+ [] ->
+ find_missing(RestIdRevs, RestLookupInfo);
+ MissingRevs ->
+ #doc_info{revs=RevsInfo} = couch_doc:to_doc_info(FullInfo),
+ LeafRevs = [Rev || #rev_info{rev=Rev} <- RevsInfo],
+ % Find the revs that are possible parents of this rev
+ PossibleAncestors =
+ lists:foldl(fun({LeafPos, LeafRevId}, Acc) ->
+ % This leaf is a "possible ancestor" of the missing
+ % revs if its LeafPos is less than any missing rev's position.
+ case lists:any(fun({MissingPos, _}) ->
+ LeafPos < MissingPos end, MissingRevs) of
+ true ->
+ [{LeafPos, LeafRevId} | Acc];
+ false ->
+ Acc
end
- end,
- IdRevsList, FullDocInfoResults),
- % strip out the non-missing ids
- Missing = [{Id, Revs} || {Id, Revs} <- Results, Revs /= []],
- {ok, Missing}.
+ end, [], LeafRevs),
+ [{Id, MissingRevs, PossibleAncestors} |
+ find_missing(RestIdRevs, RestLookupInfo)]
+ end;
+find_missing([{Id, Revs}|RestIdRevs], [not_found | RestLookupInfo]) ->
+ [{Id, Revs, []} | find_missing(RestIdRevs, RestLookupInfo)].
get_doc_info(Db, Id) ->
case get_full_doc_info(Db, Id) of
@@ -334,7 +352,12 @@ prep_and_validate_updates(Db, [DocBucket|RestBuckets], [not_found|RestLookups],
[#doc{id=Id}|_]=DocBucket,
% no existing revs are known,
{PreppedBucket, AccErrors3} = lists:foldl(
- fun(#doc{revs=Revs}=Doc, {AccBucket, AccErrors2}) ->
+ fun(#doc{revs=Revs}=Doc, {AccBucket, AccErrors2}) ->
+ case couch_doc:has_stubs(Doc) of
+ true ->
+ couch_doc:merge_stubs(Doc, #doc{}); % will throw exception
+ false -> ok
+ end,
case Revs of
{0, []} ->
case validate_doc_update(Db, Doc, fun() -> nil end) of
@@ -385,7 +408,12 @@ prep_and_validate_replicated_updates(Db, [Bucket|RestBuckets], [OldInfo|RestOldI
case OldInfo of
not_found ->
{ValidatedBucket, AccErrors3} = lists:foldl(
- fun(Doc, {AccPrepped2, AccErrors2}) ->
+ fun(Doc, {AccPrepped2, AccErrors2}) ->
+ case couch_doc:has_stubs(Doc) of
+ true ->
+ couch_doc:merge_stubs(Doc, #doc{}); % will throw exception
+ false -> ok
+ end,
case validate_doc_update(Db, Doc, fun() -> nil end) of
ok ->
{[Doc | AccPrepped2], AccErrors2};
@@ -411,12 +439,24 @@ prep_and_validate_replicated_updates(Db, [Bucket|RestBuckets], [OldInfo|RestOldI
{ok, {Start, Path}} ->
% our unflushed doc is a leaf node. Go back on the path
% to find the previous rev that's on disk.
+
LoadPrevRevFun = fun() ->
make_first_doc_on_disk(Db,Id,Start-1, tl(Path))
end,
- case validate_doc_update(Db, Doc, LoadPrevRevFun) of
+
+ case couch_doc:has_stubs(Doc) of
+ true ->
+ DiskDoc = LoadPrevRevFun(),
+ Doc2 = couch_doc:merge_stubs(Doc, DiskDoc),
+ GetDiskDocFun = fun() -> DiskDoc end;
+ false ->
+ Doc2 = Doc,
+ GetDiskDocFun = LoadPrevRevFun
+ end,
+
+ case validate_doc_update(Db, Doc2, GetDiskDocFun) of
ok ->
- {[Doc | AccValidated], AccErrors2};
+ {[Doc2 | AccValidated], AccErrors2};
Error ->
{AccValidated, [{Doc, Error} | AccErrors2]}
end;
@@ -455,18 +495,18 @@ new_revs([Bucket|RestBuckets], OutBuckets, IdRevsAcc) ->
end, IdRevsAcc, Bucket),
new_revs(RestBuckets, [NewBucket|OutBuckets], IdRevsAcc3).
-check_dup_atts([#att{name=N1}, #att{name=N2} | _]) when N1 == N2 ->
+check_dup_atts(#doc{atts=Atts}=Doc) ->
+ Atts2 = lists:sort(fun(#att{name=N1}, #att{name=N2}) -> N1 < N2 end, Atts),
+ check_dup_atts2(Atts2),
+ Doc.
+
+check_dup_atts2([#att{name=N1}, #att{name=N2} | _]) when N1 == N2 ->
throw({bad_request, <<"Duplicate attachments">>});
-check_dup_atts([_, _ | Rest]) ->
- check_dup_atts(Rest);
-check_dup_atts(_) ->
+check_dup_atts2([_ | Rest]) ->
+ check_dup_atts2(Rest);
+check_dup_atts2(_) ->
ok.
-sort_and_check_atts(#doc{atts=Atts}=Doc) ->
- Atts2 = lists:sort(fun(#att{name=N1}, #att{name=N2}) -> N1 < N2 end, Atts),
- check_dup_atts(Atts2),
- Doc#doc{atts=Atts2}.
-
update_docs(Db, Docs, Options, replicated_changes) ->
couch_stats_collector:increment({couchdb, database_writes}),
@@ -475,7 +515,8 @@ update_docs(Db, Docs, Options, replicated_changes) ->
case (Db#db.validate_doc_funs /= []) orelse
lists:any(
fun(#doc{id= <<?DESIGN_DOC_PREFIX, _/binary>>}) -> true;
- (_) -> false
+ (#doc{atts=Atts}) ->
+ Atts /= []
end, Docs) of
true ->
Ids = [Id || [#doc{id=Id}|_] <- DocBuckets],
@@ -488,7 +529,7 @@ update_docs(Db, Docs, Options, replicated_changes) ->
DocErrors = [],
DocBuckets3 = DocBuckets
end,
- DocBuckets4 = [[doc_flush_atts(sort_and_check_atts(Doc), Db#db.fd)
+ DocBuckets4 = [[doc_flush_atts(check_dup_atts(Doc), Db#db.fd)
|| Doc <- Bucket] || Bucket <- DocBuckets3],
{ok, []} = write_and_commit(Db, DocBuckets4, [], [merge_conflicts | Options]),
{ok, DocErrors};
@@ -544,7 +585,7 @@ update_docs(Db, Docs, Options, interactive_edit) ->
true -> [] end ++ Options,
DocBuckets3 = [[
doc_flush_atts(set_new_att_revpos(
- sort_and_check_atts(Doc)), Db#db.fd)
+ check_dup_atts(Doc)), Db#db.fd)
|| Doc <- B] || B <- DocBuckets2],
{DocBuckets4, IdRevs} = new_revs(DocBuckets3, [], []),
@@ -690,20 +731,8 @@ write_streamed_attachment(_Stream, _F, 0) ->
ok;
write_streamed_attachment(Stream, F, LenLeft) ->
Bin = F(),
- TruncatedBin = check_bin_length(LenLeft, Bin),
- ok = couch_stream:write(Stream, TruncatedBin),
- write_streamed_attachment(Stream, F, LenLeft - size(TruncatedBin)).
-
-%% There was a bug in ibrowse 1.4.1 that would cause it to append a CR to a
-%% chunked response when the CR and LF terminating the last data chunk were
-%% split across packets. The bug was fixed in version 1.5.0, but we still
-%% check for it just in case.
-check_bin_length(LenLeft, Bin) when size(Bin) > LenLeft ->
- <<_ValidData:LenLeft/binary, Crap/binary>> = Bin,
- ?LOG_ERROR("write_streamed_attachment has written too much expected: ~p" ++
- " got: ~p tail: ~p", [LenLeft, size(Bin), Crap]),
- exit(replicated_attachment_too_large);
-check_bin_length(_, Bin) -> Bin.
+ ok = couch_stream:write(Stream, Bin),
+ write_streamed_attachment(Stream, F, LenLeft - size(Bin)).
enum_docs_since_reduce_to_count(Reds) ->
couch_btree:final_reduce(
diff --git a/src/couchdb/couch_db_updater.erl b/src/couchdb/couch_db_updater.erl
index 2bdd25e3..31366a84 100644
--- a/src/couchdb/couch_db_updater.erl
+++ b/src/couchdb/couch_db_updater.erl
@@ -299,7 +299,7 @@ btree_by_id_join(Id, {HighSeq, Deleted, DiskTree}) ->
% nums, which means couchdb will sometimes reexamine unchanged
% documents with the _changes API.
% This is fixed by compacting the database.
- {IsDeleted, BodyPointer, HighSeq}
+ {IsDeleted == 1, BodyPointer, HighSeq}
end, DiskTree),
#full_doc_info{id=Id, update_seq=HighSeq, deleted=Deleted==1, rev_tree=Tree}.
diff --git a/src/couchdb/couch_doc.erl b/src/couchdb/couch_doc.erl
index 72b56d53..4cb20d6c 100644
--- a/src/couchdb/couch_doc.erl
+++ b/src/couchdb/couch_doc.erl
@@ -12,10 +12,12 @@
-module(couch_doc).
--export([to_doc_info/1,to_doc_info_path/1,parse_rev/1,parse_revs/1,rev_to_str/1,rev_to_strs/1]).
+-export([to_doc_info/1,to_doc_info_path/1,parse_rev/1,parse_revs/1,rev_to_str/1,revs_to_strs/1]).
-export([att_foldl/3,get_validate_doc_fun/1]).
-export([from_json_obj/1,to_json_obj/2,has_stubs/1, merge_stubs/2]).
-export([validate_docid/1]).
+-export([doc_from_multi_part_stream/2]).
+-export([doc_to_multi_part_stream/5, len_doc_to_multi_part_stream/4]).
-include("couch_db.hrl").
@@ -47,10 +49,10 @@ rev_to_str({Pos, RevId}) ->
?l2b([integer_to_list(Pos),"-",revid_to_str(RevId)]).
-rev_to_strs([]) ->
+revs_to_strs([]) ->
[];
-rev_to_strs([{Pos, RevId}| Rest]) ->
- [rev_to_str({Pos, RevId}) | rev_to_strs(Rest)].
+revs_to_strs([{Pos, RevId}| Rest]) ->
+ [rev_to_str({Pos, RevId}) | revs_to_strs(Rest)].
to_json_meta(Meta) ->
lists:map(
@@ -65,56 +67,44 @@ to_json_meta(Meta) ->
({local_seq, Seq}) ->
{<<"_local_seq">>, Seq};
({conflicts, Conflicts}) ->
- {<<"_conflicts">>, rev_to_strs(Conflicts)};
+ {<<"_conflicts">>, revs_to_strs(Conflicts)};
({deleted_conflicts, DConflicts}) ->
- {<<"_deleted_conflicts">>, rev_to_strs(DConflicts)}
+ {<<"_deleted_conflicts">>, revs_to_strs(DConflicts)}
end, Meta).
-to_json_attachment_stubs(Attachments) ->
- BinProps = lists:map(
- fun(#att{name=Name,type=Type,len=Length,revpos=Pos}) ->
- {Name, {[
- {<<"stub">>, true},
- {<<"content_type">>, Type},
- {<<"length">>, Length},
- {<<"revpos">>, Pos}
- ]}}
- end,
- Attachments),
- case BinProps of
- [] -> [];
- _ -> [{<<"_attachments">>, {BinProps}}]
+to_json_attachments(Attachments, Options) ->
+ case lists:member(attachments, Options) of
+ true -> % return all the binaries
+ to_json_attachments(Attachments, 0, lists:member(follows, Options));
+ false ->
+ % Note the default is [], because [] sorts higher than all numbers,
+ % and so all the binaries will be returned.
+ RevPos = proplists:get_value(atts_after_revpos, Options, []),
+ to_json_attachments(Attachments, RevPos, lists:member(follows, Options))
end.
-to_json_attachments(Atts) ->
+to_json_attachments([], _RevPosIncludeAfter, _DataToFollow) ->
+ [];
+to_json_attachments(Atts, RevPosIncludeAfter, DataToFollow) ->
AttProps = lists:map(
- fun(#att{data=Fun,len=Len}=Att) when is_function(Fun) ->
- Data = read_streamed_attachment(Fun, Len, _Acc = []),
- {Att#att.name, {[
- {<<"content_type">>, Att#att.type},
- {<<"revpos">>, Att#att.revpos},
- {<<"data">>, couch_util:encodeBase64(Data)}
- ]}};
- (Att) ->
+ fun(#att{len=Len}=Att) ->
{Att#att.name, {[
{<<"content_type">>, Att#att.type},
- {<<"revpos">>, Att#att.revpos},
- {<<"data">>, couch_util:encodeBase64(att_to_iolist(Att))}
- ]}}
- end,
- Atts),
- case AttProps of
- [] -> [];
- _ -> [{<<"_attachments">>, {AttProps}}]
- end.
-
-to_json_attachments(Attachments, Options) ->
- case lists:member(attachments, Options) of
- true -> % return the full rev list and the binaries as strings.
- to_json_attachments(Attachments);
- false ->
- to_json_attachment_stubs(Attachments)
- end.
+ {<<"revpos">>, Att#att.revpos}
+ ] ++
+ if Att#att.revpos > RevPosIncludeAfter ->
+ if DataToFollow ->
+ [{<<"length">>, Len}, {<<"follows">>, true}];
+ true ->
+ [{<<"data">>,
+ couch_util:encodeBase64(att_to_iolist(Att))}]
+ end;
+ true ->
+ [{<<"length">>, Len}, {<<"stub">>, true}]
+ end
+ }}
+ end, Atts),
+ [{<<"_attachments">>, {AttProps}}].
to_json_obj(#doc{id=Id,deleted=Del,body=Body,revs={Start, RevIds},
meta=Meta}=Doc,Options)->
@@ -199,12 +189,20 @@ transfer_fields([{<<"_attachments">>, {JsonBins}} | Rest], Doc) ->
RevPos = proplists:get_value(<<"revpos">>, BinProps, 0),
#att{name=Name, data=stub, type=Type, len=Length, revpos=RevPos};
_ ->
- Value = proplists:get_value(<<"data">>, BinProps),
Type = proplists:get_value(<<"content_type">>, BinProps,
?DEFAULT_ATTACHMENT_CONTENT_TYPE),
RevPos = proplists:get_value(<<"revpos">>, BinProps, 0),
- Bin = couch_util:decodeBase64(Value),
- #att{name=Name, data=Bin, type=Type, len=size(Bin), revpos=RevPos}
+ case proplists:get_value(<<"follows">>, BinProps) of
+ true ->
+ #att{name=Name, data=follows, type=Type,
+ len=proplists:get_value(<<"length">>, BinProps),
+ revpos=RevPos};
+ _ ->
+ Value = proplists:get_value(<<"data">>, BinProps),
+ Bin = couch_util:decodeBase64(Value),
+ #att{name=Name, data=Bin, type=Type, len=size(Bin),
+ revpos=RevPos}
+ end
end
end, JsonBins),
transfer_fields(Rest, Doc#doc{atts=Atts});
@@ -278,14 +276,21 @@ att_foldl(#att{data={Fd,Sp},len=Len}, Fun, Acc) when is_tuple(Sp) orelse Sp == n
% 09 UPGRADE CODE
couch_stream:old_foldl(Fd, Sp, Len, Fun, Acc);
att_foldl(#att{data={Fd,Sp},md5=Md5}, Fun, Acc) ->
- couch_stream:foldl(Fd, Sp, Md5, Fun, Acc).
+ couch_stream:foldl(Fd, Sp, Md5, Fun, Acc);
+att_foldl(#att{data=DataFun,len=Len}, Fun, Acc) when is_function(DataFun) ->
+ fold_streamed_data(DataFun, Len, Fun, Acc).
+
att_to_iolist(#att{data=Bin}) when is_binary(Bin) ->
Bin;
att_to_iolist(#att{data=Iolist}) when is_list(Iolist) ->
Iolist;
att_to_iolist(#att{data={Fd,Sp},md5=Md5}) ->
- lists:reverse(couch_stream:foldl(Fd, Sp, Md5, fun(Bin,Acc) -> [Bin|Acc] end, [])).
+ lists:reverse(couch_stream:foldl(Fd, Sp, Md5,
+ fun(Bin,Acc) -> [Bin|Acc] end, []));
+att_to_iolist(#att{data=DataFun, len=Len}) when is_function(DataFun)->
+ lists:reverse(fold_streamed_data(DataFun, Len,
+ fun(Data, Acc) -> [Data | Acc] end, [])).
get_validate_doc_fun(#doc{body={Props}}) ->
Lang = proplists:get_value(<<"language">>, Props, <<"javascript">>),
@@ -309,18 +314,121 @@ has_stubs([#att{data=stub}|_]) ->
has_stubs([_Att|Rest]) ->
has_stubs(Rest).
-merge_stubs(#doc{atts=MemBins}=StubsDoc, #doc{atts=DiskBins}) ->
+merge_stubs(#doc{id=Id,atts=MemBins}=StubsDoc, #doc{atts=DiskBins}) ->
BinDict = dict:from_list([{Name, Att} || #att{name=Name}=Att <- DiskBins]),
MergedBins = lists:map(
- fun(#att{name=Name, data=stub}) ->
- dict:fetch(Name, BinDict);
+ fun(#att{name=Name, data=stub, revpos=RevPos}) ->
+ case dict:find(Name, BinDict) of
+ {ok, #att{revpos=RevPos}=DiskAtt} ->
+ DiskAtt;
+ _ ->
+ throw({missing_stub_on_target,
+ <<"id:", Id/binary, ", name:", Name/binary>>})
+ end;
(Att) ->
Att
end, MemBins),
StubsDoc#doc{atts= MergedBins}.
-read_streamed_attachment(_RcvFun, 0, Acc) ->
- list_to_binary(lists:reverse(Acc));
-read_streamed_attachment(RcvFun, LenLeft, Acc) ->
+fold_streamed_data(_RcvFun, 0, _Fun, Acc) ->
+ Acc;
+fold_streamed_data(RcvFun, LenLeft, Fun, Acc) when LenLeft > 0 ->
Bin = RcvFun(),
- read_streamed_attachment(RcvFun, LenLeft - size(Bin), [Bin|Acc]).
+ ResultAcc = Fun(Bin, Acc),
+ fold_streamed_data(RcvFun, LenLeft - size(Bin), Fun, ResultAcc).
+
+len_doc_to_multi_part_stream(Boundary,JsonBytes,Atts,AttsSinceRevPos) ->
+ 2 + % "--"
+ size(Boundary) +
+ 36 + % "\r\ncontent-type: application/json\r\n\r\n"
+ iolist_size(JsonBytes) +
+ 4 + % "\r\n--"
+ size(Boundary) +
+ lists:foldl(fun(#att{revpos=RevPos,len=Len}, AccAttsSize) ->
+ if RevPos > AttsSinceRevPos ->
+ AccAttsSize +
+ 2 + % "\r\n"
+ Len +
+ 4 + % "\r\n--"
+ size(Boundary);
+ true ->
+ AccAttsSize
+ end
+ end, 0, Atts) +
+ 2. % "--"
+
+doc_to_multi_part_stream(Boundary,JsonBytes,Atts,AttsSinceRevPos,WriteFun) ->
+ WriteFun([<<"--", Boundary/binary, "\r\ncontent-type: application/json\r\n\r\n">>,
+ JsonBytes, <<"\r\n--", Boundary/binary>>]),
+ atts_to_mp(Atts, Boundary, WriteFun, AttsSinceRevPos).
+
+atts_to_mp([], _Boundary, WriteFun, _AttsSinceRevPos) ->
+ WriteFun(<<"--">>);
+atts_to_mp([#att{revpos=RevPos} = Att | RestAtts], Boundary, WriteFun,
+ AttsSinceRevPos) when RevPos > AttsSinceRevPos ->
+ WriteFun(<<"\r\n">>),
+ att_foldl(Att, fun(Data, ok) -> WriteFun(Data) end, ok),
+ WriteFun(<<"\r\n--", Boundary/binary>>),
+ atts_to_mp(RestAtts, Boundary, WriteFun, AttsSinceRevPos);
+atts_to_mp([_ | RestAtts], Boundary, WriteFun, AttsSinceRevPos) ->
+ atts_to_mp(RestAtts, Boundary, WriteFun, AttsSinceRevPos).
+
+
+doc_from_multi_part_stream(ContentType, DataFun) ->
+ Parser = spawn_link(fun() ->
+ couch_httpd:parse_multipart_request(ContentType, DataFun,
+ fun(Next)-> mp_parse_doc(Next, []) end)
+ end),
+ Parser ! {get_doc_bytes, self()},
+ receive {doc_bytes, DocBytes} -> ok end,
+ Doc = from_json_obj(?JSON_DECODE(DocBytes)),
+ % Go through the attachments looking for 'follows' in the data;
+ % replace it with a function that reads the data from the MIME stream.
+ ReadAttachmentDataFun = fun() ->
+ Parser ! {get_bytes, self()},
+ receive {bytes, Bytes} -> Bytes end
+ end,
+ Atts2 = lists:map(
+ fun(#att{data=follows}=A) ->
+ A#att{data=ReadAttachmentDataFun};
+ (A) ->
+ A
+ end, Doc#doc.atts),
+ Doc#doc{atts=Atts2}.
+
+mp_parse_doc({headers, H}, []) ->
+ {"application/json", _} = proplists:get_value("content-type", H),
+ fun (Next) ->
+ mp_parse_doc(Next, [])
+ end;
+mp_parse_doc({body, Bytes}, AccBytes) ->
+ fun (Next) ->
+ mp_parse_doc(Next, [Bytes | AccBytes])
+ end;
+mp_parse_doc(body_end, AccBytes) ->
+ receive {get_doc_bytes, From} ->
+ From ! {doc_bytes, lists:reverse(AccBytes)}
+ end,
+ fun (Next) ->
+ mp_parse_atts(Next)
+ end.
+
+mp_parse_atts(eof) ->
+ ok;
+mp_parse_atts({headers, _H}) ->
+ fun (Next) ->
+ mp_parse_atts(Next)
+ end;
+mp_parse_atts({body, Bytes}) ->
+ receive {get_bytes, From} ->
+ From ! {bytes, Bytes}
+ end,
+ fun (Next) ->
+ mp_parse_atts(Next)
+ end;
+mp_parse_atts(body_end) ->
+ fun (Next) ->
+ mp_parse_atts(Next)
+ end.
+
+
diff --git a/src/couchdb/couch_httpd.erl b/src/couchdb/couch_httpd.erl
index a49b4a4d..a0955daa 100644
--- a/src/couchdb/couch_httpd.erl
+++ b/src/couchdb/couch_httpd.erl
@@ -23,7 +23,7 @@
-export([start_response_length/4, send/2]).
-export([start_json_response/2, start_json_response/3, end_json_response/1]).
-export([send_response/4,send_method_not_allowed/2,send_error/4, send_redirect/2,send_chunked_error/2]).
--export([send_json/2,send_json/3,send_json/4,last_chunk/1]).
+-export([send_json/2,send_json/3,send_json/4,last_chunk/1,parse_multipart_request/3]).
start_link() ->
% read config and register for configuration changes
@@ -337,6 +337,7 @@ json_body_obj(Httpd) ->
end.
+
doc_etag(#doc{revs={Start, [DiskRev|_]}}) ->
"\"" ++ ?b2l(couch_doc:rev_to_str({Start, DiskRev})) ++ "\"".
@@ -628,3 +629,140 @@ server_header() ->
OTPVersion = "R" ++ integer_to_list(erlang:system_info(compat_rel)) ++ "B",
[{"Server", "CouchDB/" ++ couch_server:get_version() ++
" (Erlang OTP/" ++ OTPVersion ++ ")"}].
+
+
+-record(mp, {boundary, buffer, data_fun, callback}).
+
+
+parse_multipart_request(ContentType, DataFun, Callback) ->
+ Boundary0 = iolist_to_binary(get_boundary(ContentType)),
+ Boundary = <<"\r\n--", Boundary0/binary>>,
+ Mp = #mp{boundary= Boundary,
+ buffer= <<>>,
+ data_fun=DataFun,
+ callback=Callback},
+ {Mp2, _NilCallback} = read_until(Mp, <<"--", Boundary0/binary>>,
+ fun(Next)-> nil_callback(Next) end),
+ #mp{buffer=Buffer, data_fun=DataFun2, callback=Callback2} =
+ parse_part_header(Mp2),
+ {Buffer, DataFun2, Callback2}.
+
+nil_callback(_Data)->
+ fun(Next) -> nil_callback(Next) end.
+
+get_boundary(ContentType) ->
+ {"multipart/" ++ _, Opts} = mochiweb_util:parse_header(ContentType),
+ case proplists:get_value("boundary", Opts) of
+ S when is_list(S) ->
+ S
+ end.
+
+
+
+split_header(<<>>) ->
+ [];
+split_header(Line) ->
+ {Name, [$: | Value]} = lists:splitwith(fun (C) -> C =/= $: end,
+ binary_to_list(Line)),
+ [{string:to_lower(string:strip(Name)),
+ mochiweb_util:parse_header(Value)}].
+
+read_until(#mp{data_fun=DataFun, buffer=Buffer}=Mp, Pattern, Callback) ->
+ case find_in_binary(Pattern, Buffer) of
+ not_found ->
+ Callback2 = Callback(Buffer),
+ {Buffer2, DataFun2} = DataFun(),
+ Buffer3 = iolist_to_binary(Buffer2),
+ read_until(Mp#mp{data_fun=DataFun2,buffer=Buffer3}, Pattern, Callback2);
+ {partial, Skip} ->
+ <<DataChunk:Skip/binary, Rest/binary>> = Buffer,
+ Callback2 = Callback(DataChunk),
+ {Buffer2, DataFun2} = DataFun(),
+ Buffer3 = iolist_to_binary(Buffer2),
+ read_until(Mp#mp{data_fun=DataFun2,
+ buffer= <<Rest/binary, Buffer3/binary>>},
+ Pattern, Callback2);
+ {exact, Skip} ->
+ PatternLen = size(Pattern),
+ <<DataChunk:Skip/binary, _:PatternLen/binary, Rest/binary>> = Buffer,
+ Callback2 = Callback(DataChunk),
+ {Mp#mp{buffer= Rest}, Callback2}
+ end.
+
+
+parse_part_header(#mp{callback=UserCallBack}=Mp) ->
+ {Mp2, AccCallback} = read_until(Mp, <<"\r\n\r\n">>,
+ fun(Next) -> acc_callback(Next, []) end),
+ HeaderData = AccCallback(get_data),
+
+ Headers =
+ lists:foldl(fun(Line, Acc) ->
+ split_header(Line) ++ Acc
+ end, [], re:split(HeaderData,<<"\r\n">>, [])),
+ NextCallback = UserCallBack({headers, Headers}),
+ parse_part_body(Mp2#mp{callback=NextCallback}).
+
+parse_part_body(#mp{boundary=Prefix, callback=Callback}=Mp) ->
+ {Mp2, WrappedCallback} = read_until(Mp, Prefix,
+ fun(Data) -> body_callback_wrapper(Data, Callback) end),
+ Callback2 = WrappedCallback(get_callback),
+ Callback3 = Callback2(body_end),
+ case check_for_last(Mp2#mp{callback=Callback3}) of
+ {last, #mp{callback=Callback3}=Mp3} ->
+ Mp3#mp{callback=Callback3(eof)};
+ {more, Mp3} ->
+ parse_part_header(Mp3)
+ end.
+
+acc_callback(get_data, Acc)->
+ iolist_to_binary(lists:reverse(Acc));
+acc_callback(Data, Acc)->
+ fun(Next) -> acc_callback(Next, [Data | Acc]) end.
+
+body_callback_wrapper(get_callback, Callback) ->
+ Callback;
+body_callback_wrapper(Data, Callback) ->
+ Callback2 = Callback({body, Data}),
+ fun(Next) -> body_callback_wrapper(Next, Callback2) end.
+
+
+check_for_last(#mp{buffer=Buffer, data_fun=DataFun}=Mp) ->
+ case Buffer of
+ <<"--",_/binary>> -> {last, Mp};
+ <<_, _, _/binary>> -> {more, Mp};
+ _ -> % not long enough
+ {Data, DataFun2} = DataFun(),
+ check_for_last(Mp#mp{buffer= <<Buffer/binary, Data/binary>>,
+ data_fun = DataFun2})
+ end.
+
+find_in_binary(B, Data) when size(B) > 0 ->
+ case size(Data) - size(B) of
+ Last when Last < 0 ->
+ partial_find(B, Data, 0, size(Data));
+ Last ->
+ find_in_binary(B, size(B), Data, 0, Last)
+ end.
+
+find_in_binary(B, BS, D, N, Last) when N =< Last->
+ case D of
+ <<_:N/binary, B:BS/binary, _/binary>> ->
+ {exact, N};
+ _ ->
+ find_in_binary(B, BS, D, 1 + N, Last)
+ end;
+find_in_binary(B, BS, D, N, Last) when N =:= 1 + Last ->
+ partial_find(B, D, N, BS - 1).
+
+partial_find(_B, _D, _N, 0) ->
+ not_found;
+partial_find(B, D, N, K) ->
+ <<B1:K/binary, _/binary>> = B,
+ case D of
+ <<_Skip:N/binary, B1:K/binary>> ->
+ {partial, N};
+ _ ->
+ partial_find(B, D, 1 + N, K - 1)
+ end.
+
+
diff --git a/src/couchdb/couch_httpd_db.erl b/src/couchdb/couch_httpd_db.erl
index da62ccb4..0cad21e1 100644
--- a/src/couchdb/couch_httpd_db.erl
+++ b/src/couchdb/couch_httpd_db.erl
@@ -29,7 +29,8 @@
options = [],
rev = nil,
open_revs = [],
- show = nil
+ show = nil,
+ atts_since = nil
}).
% Database request handlers
@@ -476,7 +477,7 @@ db_req(#httpd{method='POST',path_parts=[_,<<"_purge">>]}=Req, Db) ->
case couch_db:purge_docs(Db, IdsRevs2) of
{ok, PurgeSeq, PurgedIdsRevs} ->
- PurgedIdsRevs2 = [{Id, couch_doc:rev_to_strs(Revs)} || {Id, Revs} <- PurgedIdsRevs],
+ PurgedIdsRevs2 = [{Id, couch_doc:revs_to_strs(Revs)} || {Id, Revs} <- PurgedIdsRevs],
send_json(Req, 200, {[{<<"purge_seq">>, PurgeSeq}, {<<"purged">>, {PurgedIdsRevs2}}]});
Error ->
throw(Error)
@@ -507,7 +508,8 @@ db_req(#httpd{method='POST',path_parts=[_,<<"_missing_revs">>]}=Req, Db) ->
{JsonDocIdRevs} = couch_httpd:json_body_obj(Req),
JsonDocIdRevs2 = [{Id, [couch_doc:parse_rev(RevStr) || RevStr <- RevStrs]} || {Id, RevStrs} <- JsonDocIdRevs],
{ok, Results} = couch_db:get_missing_revs(Db, JsonDocIdRevs2),
- Results2 = [{Id, [couch_doc:rev_to_str(Rev) || Rev <- Revs]} || {Id, Revs} <- Results],
+ Results2 = [{Id, couch_doc:revs_to_strs(Revs)} || {Id, Revs, _} <- Results],
send_json(Req, {[
{missing_revs, {Results2}}
]});
@@ -515,6 +517,29 @@ db_req(#httpd{method='POST',path_parts=[_,<<"_missing_revs">>]}=Req, Db) ->
db_req(#httpd{path_parts=[_,<<"_missing_revs">>]}=Req, _Db) ->
send_method_not_allowed(Req, "POST");
+
+db_req(#httpd{method='POST',path_parts=[_,<<"_revs_diff">>]}=Req, Db) ->
+ {JsonDocIdRevs} = couch_httpd:json_body_obj(Req),
+ JsonDocIdRevs2 =
+ [{Id, couch_doc:parse_revs(RevStrs)} || {Id, RevStrs} <- JsonDocIdRevs],
+ {ok, Results} = couch_db:get_missing_revs(Db, JsonDocIdRevs2),
+ Results2 =
+ lists:map(fun({Id, MissingRevs, PossibleAncestors}) ->
+ {Id,
+ {[{missing, couch_doc:revs_to_strs(MissingRevs)}] ++
+ if PossibleAncestors == [] ->
+ [];
+ true ->
+ [{possible_ancestors,
+ couch_doc:revs_to_strs(PossibleAncestors)}]
+ end}}
+ end, Results),
+ send_json(Req, {[{missing, {Results2}}]});
+
+db_req(#httpd{path_parts=[_,<<"_revs_diff">>]}=Req, _Db) ->
+ send_method_not_allowed(Req, "POST");
+
+
db_req(#httpd{method='PUT',path_parts=[_,<<"_admins">>]}=Req,
Db) ->
Admins = couch_httpd:json_body(Req),
@@ -655,9 +680,12 @@ db_doc_req(#httpd{method='DELETE'}=Req, Db, DocId) ->
couch_doc_open(Db, DocId, nil, []),
case couch_httpd:qs_value(Req, "rev") of
undefined ->
- update_doc(Req, Db, DocId, {[{<<"_deleted">>,true}]});
+ update_doc(Req, Db, DocId,
+ couch_doc_from_req(Req, DocId, {[{<<"_deleted">>,true}]}));
Rev ->
- update_doc(Req, Db, DocId, {[{<<"_rev">>, ?l2b(Rev)},{<<"_deleted">>,true}]})
+ update_doc(Req, Db, DocId,
+ couch_doc_from_req(Req, DocId,
+ {[{<<"_rev">>, ?l2b(Rev)},{<<"_deleted">>,true}]}))
end;
db_doc_req(#httpd{method='GET'}=Req, Db, DocId) ->
@@ -665,23 +693,21 @@ db_doc_req(#httpd{method='GET'}=Req, Db, DocId) ->
show = Format,
rev = Rev,
open_revs = Revs,
- options = Options
+ options = Options,
+ atts_since = AttsSince
} = parse_doc_query(Req),
case Format of
nil ->
case Revs of
[] ->
Doc = couch_doc_open(Db, DocId, Rev, Options),
- DiskEtag = couch_httpd:doc_etag(Doc),
- case Doc#doc.meta of
- [] ->
- % output etag only when we have no meta
- couch_httpd:etag_respond(Req, DiskEtag, fun() ->
- send_json(Req, 200, [{"Etag", DiskEtag}], couch_doc:to_json_obj(Doc, Options))
- end);
- _ ->
- send_json(Req, 200, [], couch_doc:to_json_obj(Doc, Options))
- end;
+ Options2 =
+ if AttsSince /= nil ->
+ RevPos = find_ancestor_rev_pos(Doc#doc.revs, AttsSince),
+ [{atts_after_revpos, RevPos} | Options];
+ true -> Options
+ end,
+ send_doc(Req, Doc, Options2);
_ ->
{ok, Results} = couch_db:open_doc_revs(Db, DocId, Revs, Options),
{ok, Resp} = start_json_response(Req, 200),
@@ -713,7 +739,7 @@ db_doc_req(#httpd{method='GET'}=Req, Db, DocId) ->
db_doc_req(#httpd{method='POST'}=Req, Db, DocId) ->
couch_doc:validate_docid(DocId),
- case couch_httpd:header_value(Req, "content-type") of
+ case couch_httpd:header_value(Req, "Content-Type") of
"multipart/form-data" ++ _Rest ->
ok;
_Else ->
@@ -756,27 +782,38 @@ db_doc_req(#httpd{method='POST'}=Req, Db, DocId) ->
db_doc_req(#httpd{method='PUT'}=Req, Db, DocId) ->
couch_doc:validate_docid(DocId),
- Json = couch_httpd:json_body(Req),
- case couch_httpd:qs_value(Req, "batch") of
- "ok" ->
- % batch
- Doc = couch_doc_from_req(Req, DocId, Json),
+
+ Loc = absolute_uri(Req, "/" ++ ?b2l(Db#db.name) ++ "/" ++ ?b2l(DocId)),
+ RespHeaders = [{"Location", Loc}],
+ case couch_httpd:header_value(Req, "content-type") of
+ ("multipart/related" ++ _Rest) = ContentType ->
+ Doc0 = couch_doc:doc_from_multi_part_stream(ContentType,
+ fun() -> receive_request_data(Req) end),
+ Doc = couch_doc_from_req(Req, DocId, Doc0),
+ update_doc(Req, Db, DocId, Doc, RespHeaders);
+ _ ->
+ case couch_httpd:qs_value(Req, "batch") of
+ "ok" ->
+ % batch
+ Doc = couch_doc_from_req(Req, DocId, couch_httpd:json_body(Req)),
- spawn(fun() ->
- case catch(couch_db:update_doc(Db, Doc, [])) of
- {ok, _} -> ok;
- Error ->
- ?LOG_INFO("Batch doc error (~s): ~p",[DocId, Error])
- end
- end),
- send_json(Req, 202, [], {[
- {ok, true},
- {id, DocId}
- ]});
- _Normal ->
- % normal
- Location = absolute_uri(Req, "/" ++ ?b2l(Db#db.name) ++ "/" ++ ?b2l(DocId)),
- update_doc(Req, Db, DocId, Json, [{"Location", Location}])
+ spawn(fun() ->
+ case catch(couch_db:update_doc(Db, Doc, [])) of
+ {ok, _} -> ok;
+ Error ->
+ ?LOG_INFO("Batch doc error (~s): ~p",[DocId, Error])
+ end
+ end),
+ send_json(Req, 202, [], {[
+ {ok, true},
+ {id, DocId}
+ ]});
+ _Normal ->
+ % normal
+ Doc = couch_doc_from_req(Req, DocId, couch_httpd:json_body(Req)),
+ update_doc(Req, Db, DocId, Doc, RespHeaders)
+ end
end;
db_doc_req(#httpd{method='COPY'}=Req, Db, SourceDocId) ->
@@ -799,7 +836,66 @@ db_doc_req(#httpd{method='COPY'}=Req, Db, SourceDocId) ->
db_doc_req(Req, _Db, _DocId) ->
send_method_not_allowed(Req, "DELETE,GET,HEAD,POST,PUT,COPY").
+find_ancestor_rev_pos({_, []}, _AttsSinceRevs) ->
+ 0;
+find_ancestor_rev_pos(_DocRevs, []) ->
+ 0;
+find_ancestor_rev_pos({RevPos, [RevId|Rest]}, AttsSinceRevs) ->
+ case lists:member({RevPos, RevId}, AttsSinceRevs) of
+ true ->
+ RevPos;
+ false ->
+ find_ancestor_rev_pos({RevPos - 1, Rest}, AttsSinceRevs)
+ end.
+send_doc(Req, Doc, Options) ->
+ case Doc#doc.meta of
+ [] ->
+ DiskEtag = couch_httpd:doc_etag(Doc),
+ % output etag only when we have no meta
+ couch_httpd:etag_respond(Req, DiskEtag, fun() ->
+ send_doc_efficiently(Req, Doc, [{"Etag", DiskEtag}], Options)
+ end);
+ _ ->
+ send_doc_efficiently(Req, Doc, [], Options)
+ end.
+
+
+send_doc_efficiently(Req, #doc{atts=[]}=Doc, Headers, Options) ->
+ send_json(Req, 200, Headers, couch_doc:to_json_obj(Doc, Options));
+send_doc_efficiently(Req, #doc{atts=Atts}=Doc, Headers, Options) ->
+ case lists:member(attachments, Options) orelse
+ proplists:is_defined(atts_after_revpos, Options) of
+ true ->
+ AcceptedTypes = case couch_httpd:header_value(Req, "Accept") of
+ undefined -> [];
+ AcceptHeader -> string:tokens(AcceptHeader, ", ")
+ end,
+ case lists:member("multipart/related", AcceptedTypes) of
+ false ->
+ send_json(Req, 200, Headers, couch_doc:to_json_obj(Doc, Options));
+ true ->
+ Boundary = couch_uuids:random(),
+ JsonBytes = ?JSON_ENCODE(couch_doc:to_json_obj(Doc, Options)),
+ AttsSinceRevPos = proplists:get_value(atts_after_revpos, Options, 0),
+ Len = couch_doc:len_doc_to_multi_part_stream(Boundary,JsonBytes,Atts,
+ AttsSinceRevPos),
+ CType = {<<"content-type">>,
+ <<"multipart/related; boundary=", Boundary/binary>>},
+ Resp = start_response_length(Req, 200, [CType | Headers], Len),
+ couch_doc:doc_to_multi_part_stream(Boundary,JsonBytes,Atts,
+ AttsSinceRevPos,
+ fun(Data) -> couch_httpd:send(Resp, Data) end)
+ end;
+ false ->
+ send_json(Req, 200, Headers, couch_doc:to_json_obj(Doc, Options))
+ end.
+
+
+
+receive_request_data(Req) ->
+ {couch_httpd:recv(Req, 0), fun() -> receive_request_data(Req) end}.
+
update_doc_result_to_json({{Id, Rev}, Error}) ->
{_Code, Err, Msg} = couch_httpd:error_info(Error),
{[{id, Id}, {rev, couch_doc:rev_to_str(Rev)},
@@ -814,12 +910,10 @@ update_doc_result_to_json(DocId, Error) ->
{[{id, DocId}, {error, ErrorStr}, {reason, Reason}]}.
-update_doc(Req, Db, DocId, Json) ->
- update_doc(Req, Db, DocId, Json, []).
-
-update_doc(Req, Db, DocId, Json, Headers) ->
- #doc{deleted=Deleted} = Doc = couch_doc_from_req(Req, DocId, Json),
+update_doc(Req, Db, DocId, Doc) ->
+ update_doc(Req, Db, DocId, Doc, []).
+update_doc(Req, Db, DocId, #doc{deleted=Deleted}=Doc, Headers) ->
case couch_httpd:header_value(Req, "X-Couch-Full-Commit") of
"true" ->
Options = [full_commit];
@@ -837,21 +931,22 @@ update_doc(Req, Db, DocId, Json, Headers) ->
{id, DocId},
{rev, NewRevStr}]}).
-couch_doc_from_req(Req, DocId, Json) ->
- Doc = couch_doc:from_json_obj(Json),
+couch_doc_from_req(Req, DocId, #doc{revs=Revs}=Doc) ->
validate_attachment_names(Doc),
ExplicitDocRev =
- case Doc#doc.revs of
+ case Revs of
{Start,[RevId|_]} -> {Start, RevId};
_ -> undefined
end,
case extract_header_rev(Req, ExplicitDocRev) of
missing_rev ->
- Revs = {0, []};
+ Revs2 = {0, []};
{Pos, Rev} ->
- Revs = {Pos, [Rev]}
+ Revs2 = {Pos, [Rev]}
end,
- Doc#doc{id=DocId, revs=Revs}.
+ Doc#doc{id=DocId, revs=Revs2};
+couch_doc_from_req(Req, DocId, Json) ->
+ couch_doc_from_req(Req, DocId, couch_doc:from_json_obj(Json)).
% Useful for debugging
@@ -1034,7 +1129,10 @@ parse_doc_query(Req) ->
Args#doc_query_args{open_revs=all};
{"open_revs", RevsJsonStr} ->
JsonArray = ?JSON_DECODE(RevsJsonStr),
- Args#doc_query_args{open_revs=[couch_doc:parse_rev(Rev) || Rev <- JsonArray]};
+ Args#doc_query_args{open_revs=couch_doc:parse_revs(JsonArray)};
+ {"atts_since", RevsJsonStr} ->
+ JsonArray = ?JSON_DECODE(RevsJsonStr),
+ Args#doc_query_args{atts_since = couch_doc:parse_revs(JsonArray)};
{"show", FormatStr} ->
Args#doc_query_args{show=parse_doc_format(FormatStr)};
_Else -> % unknown key value pair, ignore.
diff --git a/src/couchdb/couch_rep_missing_revs.erl b/src/couchdb/couch_rep_missing_revs.erl
index 847a00db..5790dd71 100644
--- a/src/couchdb/couch_rep_missing_revs.erl
+++ b/src/couchdb/couch_rep_missing_revs.erl
@@ -171,7 +171,7 @@ get_missing_revs(Target, Changes) ->
SeqDict = changes_dictionary(Changes),
{[{<<"seq">>, HighSeq}, _, _]} = lists:last(Changes),
{ok, Results} = couch_db:get_missing_revs(Target, IdRevsList),
- {HighSeq, [{Id, dict:fetch(Id, SeqDict), Revs} || {Id, Revs} <- Results]}.
+ {HighSeq, [{Id, dict:fetch(Id, SeqDict), Revs} || {Id, Revs, _} <- Results]}.
changes_dictionary(ChangeList) ->
KVs = [{proplists:get_value(<<"id">>,C), proplists:get_value(<<"seq">>,C)}
diff --git a/src/couchdb/couch_rep_reader.erl b/src/couchdb/couch_rep_reader.erl
index 7f061500..a66454c8 100644
--- a/src/couchdb/couch_rep_reader.erl
+++ b/src/couchdb/couch_rep_reader.erl
@@ -218,7 +218,7 @@ open_doc_revs(#http_db{} = DbS, DocId, Revs) ->
BaseLength = length(couch_rep_httpc:full_url(BaseReq)) + 11, % &open_revs=
{RevLists, _, _} = lists:foldl(fun split_revlist/2,
- {[[]], BaseLength, BaseLength}, couch_doc:rev_to_strs(Revs)),
+ {[[]], BaseLength, BaseLength}, couch_doc:revs_to_strs(Revs)),
Requests = [BaseReq#http_db{
qs = [{open_revs, ?JSON_ENCODE(RevList)} | BaseQS]
diff --git a/src/mochiweb/mochiweb_multipart.erl b/src/mochiweb/mochiweb_multipart.erl
index 9eb4badd..b9631613 100644
--- a/src/mochiweb/mochiweb_multipart.erl
+++ b/src/mochiweb/mochiweb_multipart.erl
@@ -158,7 +158,7 @@ feed_mp(body, State=#mp{boundary=Prefix, buffer=Buffer, callback=Callback}) ->
end.
get_boundary(ContentType) ->
- {"multipart/form-data", Opts} = mochiweb_util:parse_header(ContentType),
+ {"multipart/" ++ _, Opts} = mochiweb_util:parse_header(ContentType),
case proplists:get_value("boundary", Opts) of
S when is_list(S) ->
S