From 627481ee0ade53d0ceed2e29cbb4e312ecbe3340 Mon Sep 17 00:00:00 2001
From: "Damien F. Katz"
Date: Fri, 13 Nov 2009 20:38:45 +0000
Subject: Initial check-in of APIs for multipart/related support and
 incremental replication of only changed attachments. Needs far more testing
 and to be hooked up to the replicator.

git-svn-id: https://svn.apache.org/repos/asf/couchdb/trunk@835981 13f79535-47bb-0310-9956-ffa450edef68
---
 src/couchdb/couch_db.erl | 117 +++++++++++++++++++++++++++++------------------
 1 file changed, 73 insertions(+), 44 deletions(-)

diff --git a/src/couchdb/couch_db.erl b/src/couchdb/couch_db.erl
index 2dbb88a3..aa46a347 100644
--- a/src/couchdb/couch_db.erl
+++ b/src/couchdb/couch_db.erl
@@ -116,22 +116,40 @@ open_doc_revs(Db, Id, Revs, Options) ->
     [Result] = open_doc_revs_int(Db, [{Id, Revs}], Options),
     Result.
 
+% Each returned result is a list of tuples:
+% {Id, MissingRevs, PossibleAncestors}
+% if no revs are missing, the document is omitted from the results.
 get_missing_revs(Db, IdRevsList) ->
-    Ids = [Id1 || {Id1, _Revs} <- IdRevsList],
-    FullDocInfoResults = get_full_doc_infos(Db, Ids),
-    Results = lists:zipwith(
-        fun({Id, Revs}, FullDocInfoResult) ->
-            case FullDocInfoResult of
-            {ok, #full_doc_info{rev_tree=RevisionTree}} ->
-                {Id, couch_key_tree:find_missing(RevisionTree, Revs)};
-            not_found ->
-                {Id, Revs}
+    Results = get_full_doc_infos(Db, [Id1 || {Id1, _Revs} <- IdRevsList]),
+    {ok, find_missing(IdRevsList, Results)}.
+
+find_missing([], []) ->
+    [];
+find_missing([{Id, Revs}|RestIdRevs], [{ok, FullInfo} | RestLookupInfo]) ->
+    case couch_key_tree:find_missing(FullInfo#full_doc_info.rev_tree, Revs) of
+    [] ->
+        find_missing(RestIdRevs, RestLookupInfo);
+    MissingRevs ->
+        #doc_info{revs=RevsInfo} = couch_doc:to_doc_info(FullInfo),
+        LeafRevs = [Rev || #rev_info{rev=Rev} <- RevsInfo],
+        % Find the revs that are possible parents of this rev
+        PossibleAncestors =
+        lists:foldl(fun({LeafPos, LeafRevId}, Acc) ->
+            % this leaf is a "possible ancestor" of the missing
+            % revs if this LeafPos is less than any of the missing revs
+            case lists:any(fun({MissingPos, _}) ->
+                    LeafPos < MissingPos end, MissingRevs) of
+            true ->
+                [{LeafPos, LeafRevId} | Acc];
+            false ->
+                Acc
            end
-        end,
-        IdRevsList, FullDocInfoResults),
-    % strip out the non-missing ids
-    Missing = [{Id, Revs} || {Id, Revs} <- Results, Revs /= []],
-    {ok, Missing}.
+        end, [], LeafRevs),
+        [{Id, MissingRevs, PossibleAncestors} |
+                find_missing(RestIdRevs, RestLookupInfo)]
+    end;
+find_missing([{Id, Revs}|RestIdRevs], [not_found | RestLookupInfo]) ->
+    [{Id, Revs, []} | find_missing(RestIdRevs, RestLookupInfo)].
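A quick usage sketch may help readers tracing the new result shape. This is
not part of the commit: the module name and the printed format are invented,
and it assumes an Erlang node with the patched couch_db loaded and an open
#db{} handle.

    -module(missing_revs_example).
    -export([print_missing/2]).

    % Illustrative only: Db is an open #db{} handle, and IdRevsList pairs
    % doc ids with {Pos, RevId} tuples, the shapes get_missing_revs/2 above
    % works with.
    print_missing(Db, IdRevsList) ->
        {ok, Results} = couch_db:get_missing_revs(Db, IdRevsList),
        % Docs with nothing missing are omitted, so every result
        % destructures as {Id, MissingRevs, PossibleAncestors}.
        lists:foreach(
            fun({Id, MissingRevs, PossibleAncestors}) ->
                io:format("~s: missing ~p, possible ancestors ~p~n",
                    [Id, MissingRevs, PossibleAncestors])
            end, Results).

The PossibleAncestors list is what the commit message's incremental
attachment replication hangs on: the source can send attachments relative to
a leaf revision the target already has instead of resending all of them.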
 
 get_doc_info(Db, Id) ->
     case get_full_doc_info(Db, Id) of
@@ -334,7 +352,12 @@ prep_and_validate_updates(Db, [DocBucket|RestBuckets], [not_found|RestLookups],
     [#doc{id=Id}|_]=DocBucket,
     % no existing revs are known,
     {PreppedBucket, AccErrors3} = lists:foldl(
-        fun(#doc{revs=Revs}=Doc, {AccBucket, AccErrors2}) ->
+        fun(#doc{revs=Revs}=Doc, {AccBucket, AccErrors2}) ->
+            case couch_doc:has_stubs(Doc) of
+            true ->
+                couch_doc:merge_doc(Doc, #doc{}); % will throw exception
+            false -> ok
+            end,
             case Revs of
             {0, []} ->
                 case validate_doc_update(Db, Doc, fun() -> nil end) of
@@ -385,7 +408,12 @@ prep_and_validate_replicated_updates(Db, [Bucket|RestBuckets], [OldInfo|RestOldI
         case OldInfo of
         not_found ->
             {ValidatedBucket, AccErrors3} = lists:foldl(
-                fun(Doc, {AccPrepped2, AccErrors2}) ->
+                fun(Doc, {AccPrepped2, AccErrors2}) ->
+                    case couch_doc:has_stubs(Doc) of
+                    true ->
+                        couch_doc:merge_doc(Doc, #doc{}); % will throw exception
+                    false -> ok
+                    end,
                     case validate_doc_update(Db, Doc, fun() -> nil end) of
                     ok ->
                         {[Doc | AccPrepped2], AccErrors2};
@@ -411,12 +439,24 @@ prep_and_validate_replicated_updates(Db, [Bucket|RestBuckets], [OldInfo|RestOldI
             {ok, {Start, Path}} ->
                 % our unflushed doc is a leaf node. Go back on the path
                 % to find the previous rev that's on disk.
+
                 LoadPrevRevFun = fun() ->
                             make_first_doc_on_disk(Db,Id,Start-1, tl(Path))
                         end,
-                case validate_doc_update(Db, Doc, LoadPrevRevFun) of
+
+                case couch_doc:has_stubs(Doc) of
+                true ->
+                    DiskDoc = LoadPrevRevFun(),
+                    Doc2 = couch_doc:merge_stubs(Doc, DiskDoc),
+                    GetDiskDocFun = fun() -> DiskDoc end;
+                false ->
+                    Doc2 = Doc,
+                    GetDiskDocFun = LoadPrevRevFun
+                end,
+
+                case validate_doc_update(Db, Doc2, GetDiskDocFun) of
                 ok ->
-                    {[Doc | AccValidated], AccErrors2};
+                    {[Doc2 | AccValidated], AccErrors2};
                 Error ->
                     {AccValidated, [{Doc, Error} | AccErrors2]}
                 end;
@@ -455,18 +495,18 @@ new_revs([Bucket|RestBuckets], OutBuckets, IdRevsAcc) ->
     end, IdRevsAcc, Bucket),
     new_revs(RestBuckets, [NewBucket|OutBuckets], IdRevsAcc3).
 
-check_dup_atts([#att{name=N1}, #att{name=N2} | _]) when N1 == N2 ->
+check_dup_atts(#doc{atts=Atts}=Doc) ->
+    Atts2 = lists:sort(fun(#att{name=N1}, #att{name=N2}) -> N1 < N2 end, Atts),
+    check_dup_atts2(Atts2),
+    Doc.
+
+check_dup_atts2([#att{name=N1}, #att{name=N2} | _]) when N1 == N2 ->
     throw({bad_request, <<"Duplicate attachments">>});
-check_dup_atts([_, _ | Rest]) ->
-    check_dup_atts(Rest);
-check_dup_atts(_) ->
+check_dup_atts2([_ | Rest]) ->
+    check_dup_atts2(Rest);
+check_dup_atts2(_) ->
     ok.
 
-sort_and_check_atts(#doc{atts=Atts}=Doc) ->
-    Atts2 = lists:sort(fun(#att{name=N1}, #att{name=N2}) -> N1 < N2 end, Atts),
-    check_dup_atts(Atts2),
-    Doc#doc{atts=Atts2}.
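Two details of the attachment-handling rewrite are easy to miss. First, the
old scan advanced two attachments per step, so a duplicate pair straddling
that window (say, the second and third of three names) slipped through;
check_dup_atts2 advances one at a time. A standalone sketch of the same
sort-then-scan idea on plain {Name, Data} tuples (none of this is CouchDB
code; module and shapes are invented):

    -module(dup_atts_example).
    -export([check/1]).

    % Sort by name so equal names become adjacent; one linear scan then
    % finds any duplicate.
    check(Atts) ->
        Sorted = lists:sort(fun({N1, _}, {N2, _}) -> N1 < N2 end, Atts),
        check2(Sorted).

    check2([{N1, _}, {N2, _} | _]) when N1 == N2 ->
        throw({bad_request, <<"Duplicate attachments">>});
    check2([_ | Rest]) ->
        % Advance one element, not two, so overlapping pairs get compared.
        check2(Rest);
    check2(_) ->
        ok.

For example, catch dup_atts_example:check([{<<"a">>, 1}, {<<"b">>, 2},
{<<"b">>, 3}]) yields the bad_request throw that a two-at-a-time scan would
miss. Second, unlike the removed sort_and_check_atts/1, the new
check_dup_atts/1 returns the document with its original attachment order
(Doc, not Doc#doc{atts=Atts2}).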
-
 update_docs(Db, Docs, Options, replicated_changes) ->
     couch_stats_collector:increment({couchdb, database_writes}),
 
@@ -475,7 +515,8 @@ update_docs(Db, Docs, Options, replicated_changes) ->
     case (Db#db.validate_doc_funs /= []) orelse
         lists:any(
             fun(#doc{id= <<?DESIGN_DOC_PREFIX, _/binary>>}) -> true;
-            (_) -> false
+            (#doc{atts=Atts}) ->
+                Atts /= []
             end, Docs) of
     true ->
         Ids = [Id || [#doc{id=Id}|_] <- DocBuckets],
@@ -488,7 +529,7 @@ update_docs(Db, Docs, Options, replicated_changes) ->
         DocErrors = [],
         DocBuckets3 = DocBuckets
     end,
-    DocBuckets4 = [[doc_flush_atts(sort_and_check_atts(Doc), Db#db.fd)
+    DocBuckets4 = [[doc_flush_atts(check_dup_atts(Doc), Db#db.fd)
             || Doc <- Bucket] || Bucket <- DocBuckets3],
     {ok, []} = write_and_commit(Db, DocBuckets4, [], [merge_conflicts | Options]),
     {ok, DocErrors};
@@ -544,7 +585,7 @@ update_docs(Db, Docs, Options, interactive_edit) ->
         true -> [] end ++ Options,
     DocBuckets3 = [[
             doc_flush_atts(set_new_att_revpos(
-                    sort_and_check_atts(Doc)), Db#db.fd)
+                    check_dup_atts(Doc)), Db#db.fd)
             || Doc <- B] || B <- DocBuckets2],
     {DocBuckets4, IdRevs} = new_revs(DocBuckets3, [], []),
 
@@ -690,20 +731,8 @@ write_streamed_attachment(_Stream, _F, 0) ->
     ok;
 write_streamed_attachment(Stream, F, LenLeft) ->
     Bin = F(),
-    TruncatedBin = check_bin_length(LenLeft, Bin),
-    ok = couch_stream:write(Stream, TruncatedBin),
-    write_streamed_attachment(Stream, F, LenLeft - size(TruncatedBin)).
-
-%% There was a bug in ibrowse 1.4.1 that would cause it to append a CR to a
-%% chunked response when the CR and LF terminating the last data chunk were
-%% split across packets. The bug was fixed in version 1.5.0, but we still
-%% check for it just in case.
-check_bin_length(LenLeft, Bin) when size(Bin) > LenLeft ->
-    <<_ValidData:LenLeft/binary, Crap/binary>> = Bin,
-    ?LOG_ERROR("write_streamed_attachment has written too much expected: ~p" ++
-        " got: ~p tail: ~p", [LenLeft, size(Bin), Crap]),
-    exit(replicated_attachment_too_large);
-check_bin_length(_, Bin) -> Bin.
+    ok = couch_stream:write(Stream, Bin),
+    write_streamed_attachment(Stream, F, LenLeft - size(Bin)).
 
 enum_docs_since_reduce_to_count(Reds) ->
     couch_btree:final_reduce(
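The final hunk drops the ibrowse 1.4.1 over-delivery workaround, leaving a
bare pull-and-write loop. A standalone sketch of that loop, with WriteFun and
ReadFun standing in for couch_stream:write/2 and the chunk-producing fun (all
names here are invented):

    -module(stream_copy_example).
    -export([copy/3]).

    % Pull chunks from a zero-arity fun and write them until exactly LenLeft
    % bytes have been consumed; chunk sizes are whatever ReadFun returns.
    copy(_WriteFun, _ReadFun, 0) ->
        ok;
    copy(WriteFun, ReadFun, LenLeft) when LenLeft > 0 ->
        Bin = ReadFun(),
        ok = WriteFun(Bin),
        copy(WriteFun, ReadFun, LenLeft - size(Bin)).

The LenLeft > 0 guard is an addition of this sketch, not of the patch: if a
source ever over-delivers again, the call fails fast with function_clause
instead of recursing past zero, a condition the removed check_bin_length used
to log and exit on explicitly.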