From 91bf33fdc69c2087707795b8822b0fa7617f8709 Mon Sep 17 00:00:00 2001
From: "Damien F. Katz"
Date: Fri, 17 Jul 2009 21:33:41 +0000
Subject: Deterministic revids, MD5 checking of documents, added tracking of rev when an attachment is edited to allow attachment level replication.

git-svn-id: https://svn.apache.org/repos/asf/couchdb/trunk@795232 13f79535-47bb-0310-9956-ffa450edef68
---
 src/couchdb/couch_db.erl | 304 ++++++++++++++++++++++++++++-------------------
 1 file changed, 179 insertions(+), 125 deletions(-)

diff --git a/src/couchdb/couch_db.erl b/src/couchdb/couch_db.erl
index 9cb887ea..9dc1fce8 100644
--- a/src/couchdb/couch_db.erl
+++ b/src/couchdb/couch_db.erl
@@ -24,7 +24,7 @@
 -export([increment_update_seq/1,get_purge_seq/1,purge_docs/2,get_last_purged/1]).
 -export([start_link/3,open_doc_int/3,set_admins/2,get_admins/1,ensure_full_commit/1]).
 -export([init/1,terminate/2,handle_call/3,handle_cast/2,code_change/3,handle_info/2]).
--export([changes_since/5,read_doc/2]).
+-export([changes_since/5,read_doc/2,new_revid/1]).

 -include("couch_db.hrl").

@@ -295,11 +295,11 @@ validate_doc_update(#db{name=DbName,user_ctx=Ctx}=Db, Doc, GetDiskDocFun) ->
     end.


-prep_and_validate_update(Db, #doc{id=Id,revs={RevStart, [_NewRev|PrevRevs]}}=Doc,
-        OldFullDocInfo, LeafRevsDict) ->
-    case PrevRevs of
+prep_and_validate_update(Db, #doc{id=Id,revs={RevStart, Revs}}=Doc,
+        OldFullDocInfo, LeafRevsDict, AllowConflict) ->
+    case Revs of
     [PrevRev|_] ->
-        case dict:find({RevStart-1, PrevRev}, LeafRevsDict) of
+        case dict:find({RevStart, PrevRev}, LeafRevsDict) of
         {ok, {Deleted, DiskSp, DiskRevs}} ->
             case couch_doc:has_stubs(Doc) of
             true ->
@@ -310,13 +310,21 @@ prep_and_validate_update(Db, #doc{id=Id,revs={RevStart, [_NewRev|PrevRevs]}}=Doc
                 LoadDiskDoc = fun() -> make_doc(Db,Id,Deleted,DiskSp,DiskRevs) end,
                 {validate_doc_update(Db, Doc, LoadDiskDoc), Doc}
             end;
+        error when AllowConflict ->
+            {validate_doc_update(Db, Doc, fun() -> nil end), Doc};
         error ->
             {conflict, Doc}
         end;
     [] ->
         % new doc, and we have existing revs.
+        % reuse existing deleted doc
         if OldFullDocInfo#full_doc_info.deleted ->
             % existing docs are deletions
+            #doc_info{revs=[#rev_info{rev={Pos, DelRevId}}|_]} =
+                couch_doc:to_doc_info(OldFullDocInfo),
+            Doc2 = Doc#doc{revs={Pos, [DelRevId]}},
+            {validate_doc_update(Db, Doc2, fun() -> nil end), Doc2};
+        AllowConflict ->
             {validate_doc_update(Db, Doc, fun() -> nil end), Doc};
         true ->
             {conflict, Doc}
@@ -325,49 +333,51 @@ prep_and_validate_update(Db, #doc{id=Id,revs={RevStart, [_NewRev|PrevRevs]}}=Doc



-prep_and_validate_updates(_Db, [], [], AccPrepped, AccFatalErrors) ->
+prep_and_validate_updates(_Db, [], [], _AllowConflict, AccPrepped,
+        AccFatalErrors) ->
     {AccPrepped, AccFatalErrors};
-prep_and_validate_updates(Db, [DocBucket|RestBuckets], [not_found|RestLookups], AccPrepped, AccErrors) ->
+prep_and_validate_updates(Db, [DocBucket|RestBuckets], [not_found|RestLookups],
+        AllowConflict, AccPrepped, AccErrors) ->
     [#doc{id=Id}|_]=DocBucket,
     % no existing revs are known,
     {PreppedBucket, AccErrors3} = lists:foldl(
         fun(#doc{revs=Revs}=Doc, {AccBucket, AccErrors2}) ->
             case Revs of
-            {Pos, [NewRev,_OldRev|_]} ->
-                % old revs specified but none exist, a conflict
-                {AccBucket, [{{Id, {Pos, NewRev}}, conflict} | AccErrors2]};
-            {Pos, [NewRev]} ->
+            {0, []} ->
                 case validate_doc_update(Db, Doc, fun() -> nil end) of
                 ok ->
                     {[Doc | AccBucket], AccErrors2};
                 Error ->
-                    {AccBucket, [{{Id, {Pos, NewRev}}, Error} | AccErrors2]}
-                end
+                    {AccBucket, [{{Id, {0, []}}, Error} | AccErrors2]}
+                end;
+            _ ->
+                % old revs specified but none exist, a conflict
+                {AccBucket, [{{Id, Revs}, conflict} | AccErrors2]}
             end
         end,
         {[], AccErrors}, DocBucket),

-    prep_and_validate_updates(Db, RestBuckets, RestLookups,
+    prep_and_validate_updates(Db, RestBuckets, RestLookups, AllowConflict,
         [PreppedBucket | AccPrepped], AccErrors3);
 prep_and_validate_updates(Db, [DocBucket|RestBuckets],
         [{ok, #full_doc_info{rev_tree=OldRevTree}=OldFullDocInfo}|RestLookups],
-        AccPrepped, AccErrors) ->
+        AllowConflict, AccPrepped, AccErrors) ->
     Leafs = couch_key_tree:get_all_leafs(OldRevTree),
     LeafRevsDict = dict:from_list([{{Start, RevId}, {Deleted, Sp, Revs}} ||
             {{Deleted, Sp, _Seq}, {Start, [RevId|_]}=Revs} <- Leafs]),
     {PreppedBucket, AccErrors3} = lists:foldl(
         fun(Doc, {Docs2Acc, AccErrors2}) ->
             case prep_and_validate_update(Db, Doc, OldFullDocInfo,
-                    LeafRevsDict) of
+                    LeafRevsDict, AllowConflict) of
             {ok, Doc2} ->
                 {[Doc2 | Docs2Acc], AccErrors2};
-            {Error, #doc{id=Id,revs={Pos, [NewRev|_]}}} ->
+            {Error, #doc{id=Id,revs=Revs}} ->
                 % Record the error
-                {Docs2Acc, [{{Id, {Pos, NewRev}}, Error} |AccErrors2]}
+                {Docs2Acc, [{{Id, Revs}, Error} |AccErrors2]}
             end
         end,
         {[], AccErrors}, DocBucket),
-    prep_and_validate_updates(Db, RestBuckets, RestLookups, [PreppedBucket | AccPrepped], AccErrors3).
+    prep_and_validate_updates(Db, RestBuckets, RestLookups, AllowConflict, [PreppedBucket | AccPrepped], AccErrors3).


 update_docs(#db{update_pid=UpdatePid}=Db, Docs, Options) ->
@@ -408,32 +418,12 @@ prep_and_validate_replicated_updates(Db, [Bucket|RestBuckets], [OldInfo|RestOldI
             {ok, {Start, Path}} ->
                 % our unflushed doc is a leaf node. Go back on the path
                 % to find the previous rev that's on disk.
-                PrevRevResult =
-                case couch_doc:has_stubs(Doc) of
-                true ->
-                    [_PrevRevFull | [PrevRevFull | _]=PrevPath] = Path,
-                    case PrevRevFull of
-                    {_RevId, ?REV_MISSING} ->
-                        conflict;
-                    {RevId, {IsDel, DiskSp, _Seq}} ->
-                        DiskDoc = make_doc(Db, Id, IsDel, DiskSp, PrevPath),
-                        Doc2 = couch_doc:merge_stubs(Doc, DiskDoc),
-                        {ok, Doc2, fun() -> DiskDoc end}
-                    end;
-                false ->
-                    {ok, Doc,
-                        fun() ->
+                LoadPrevRevFun = fun() ->
                             make_first_doc_on_disk(Db,Id,Start-1, tl(Path))
-                        end}
-                end,
-                case PrevRevResult of
-                {ok, NewDoc, LoadPrevRevFun} ->
-                    case validate_doc_update(Db, NewDoc, LoadPrevRevFun) of
-                    ok ->
-                        {[NewDoc | AccValidated], AccErrors2};
-                    Error ->
-                        {AccValidated, [{NewDoc, Error} | AccErrors2]}
-                    end;
+                        end,
+                case validate_doc_update(Db, Doc, LoadPrevRevFun) of
+                ok ->
+                    {[Doc | AccValidated], AccErrors2};
                 Error ->
                     {AccValidated, [{Doc, Error} | AccErrors2]}
                 end;
@@ -444,9 +434,47 @@ prep_and_validate_replicated_updates(Db, [Bucket|RestBuckets], [OldInfo|RestOldI
             end
         end,
         {[], AccErrors}, Bucket),
-    prep_and_validate_replicated_updates(Db, RestBuckets, RestOldInfo, [ValidatedBucket | AccPrepped], AccErrors3)
+    prep_and_validate_replicated_updates(Db, RestBuckets, RestOldInfo,
+            [ValidatedBucket | AccPrepped], AccErrors3)
     end.

+
+
+new_revid(#doc{body=Body,revs={OldStart,OldRevs},
+        atts=Atts,deleted=Deleted}) ->
+    case [{N, T, M} || #att{name=N,type=T,md5=M} <- Atts, M /= <<>>] of
+    Atts2 when length(Atts) /= length(Atts2) ->
+        % We must have old style non-md5 attachments
+        ?l2b(integer_to_list(couch_util:rand32()));
+    Atts2 ->
+        OldRev = case OldRevs of [] -> 0; [OldRev0|_] -> OldRev0 end,
+        erlang:md5(term_to_binary([Deleted, OldStart, OldRev, Body, Atts2]))
+    end.
+
+new_revs([], OutBuckets, IdRevsAcc) ->
+    {lists:reverse(OutBuckets), IdRevsAcc};
+new_revs([Bucket|RestBuckets], OutBuckets, IdRevsAcc) ->
+    {NewBucket, IdRevsAcc3} = lists:mapfoldl(
+        fun(#doc{id=Id,revs={Start, RevIds}}=Doc, IdRevsAcc2)->
+            NewRevId = new_revid(Doc),
+            {Doc#doc{revs={Start+1, [NewRevId | RevIds]}},
+                [{{Id, {Start, RevIds}}, {ok, {Start+1, NewRevId}}} | IdRevsAcc2]}
+        end, IdRevsAcc, Bucket),
+    new_revs(RestBuckets, [NewBucket|OutBuckets], IdRevsAcc3).
+
+check_dup_atts([#att{name=N1}, #att{name=N2} | _]) when N1 == N2 ->
+    throw({bad_request, <<"Duplicate attachments">>});
+check_dup_atts([_, _ | Rest]) ->
+    check_dup_atts(Rest);
+check_dup_atts(_) ->
+    ok.
+
+sort_and_check_atts(#doc{atts=Atts}=Doc) ->
+    Atts2 = lists:sort(fun(#att{name=N1}, #att{name=N2}) -> N1 < N2 end, Atts),
+    check_dup_atts(Atts2),
+    Doc#doc{atts=Atts2}.
+
+
 update_docs(Db, Docs, Options, replicated_changes) ->
     couch_stats_collector:increment({couchdb, database_writes}),
     DocBuckets = group_alike_docs(Docs),
@@ -467,70 +495,74 @@ update_docs(Db, Docs, Options, replicated_changes) ->
         DocErrors = [],
         DocBuckets3 = DocBuckets
     end,
-    {ok, []} = write_and_commit(Db, DocBuckets3, [merge_conflicts | Options]),
+    DocBuckets4 = [[doc_flush_atts(sort_and_check_atts(Doc), Db#db.fd)
+            || Doc <- Bucket] || Bucket <- DocBuckets3],
+    {ok, []} = write_and_commit(Db, DocBuckets4, [], [merge_conflicts | Options]),
     {ok, DocErrors};


 update_docs(Db, Docs, Options, interactive_edit) ->
     couch_stats_collector:increment({couchdb, database_writes}),
     AllOrNothing = lists:member(all_or_nothing, Options),
     % go ahead and generate the new revision ids for the documents.
-    Docs2 = lists:map(
-        fun(#doc{id=Id,revs={Start, RevIds}}=Doc) ->
+    % separate out the NonRep documents from the rest of the documents
+    {Docs2, NonRepDocs} = lists:foldl(
+        fun(#doc{id=Id}=Doc, {DocsAcc, NonRepDocsAcc}) ->
             case Id of
             <<?LOCAL_DOC_PREFIX, _/binary>> ->
-                Rev = case RevIds of [] -> 0; [Rev0|_] -> list_to_integer(?b2l(Rev0)) end,
-                Doc#doc{revs={Start, [?l2b(integer_to_list(Rev + 1))]}};
-            _ ->
-                Doc#doc{revs={Start+1, [?l2b(integer_to_list(couch_util:rand32())) | RevIds]}}
+                {DocsAcc, [Doc | NonRepDocsAcc]};
+            Id->
+                {[Doc | DocsAcc], NonRepDocsAcc}
             end
-        end, Docs),
+        end, {[], []}, Docs),
+
     DocBuckets = group_alike_docs(Docs2),

     case (Db#db.validate_doc_funs /= []) orelse
         lists:any(
             fun(#doc{id= <<?DESIGN_DOC_PREFIX, _/binary>>}) -> true;
-            (#doc{attachments=Atts}) ->
+            (#doc{atts=Atts}) ->
                 Atts /= []
-            end, Docs) of
+            end, Docs2) of
     true ->
         % lookup the doc by id and get the most recent
         Ids = [Id || [#doc{id=Id}|_] <- DocBuckets],
         ExistingDocInfos = get_full_doc_infos(Db, Ids),

-        {DocBucketsPrepped, Failures} =
-        case AllOrNothing of
-        true ->
-            prep_and_validate_replicated_updates(Db, DocBuckets,
-                ExistingDocInfos, [], []);
-        false ->
-            prep_and_validate_updates(Db, DocBuckets, ExistingDocInfos, [], [])
-        end,
+        {DocBucketsPrepped, PreCommitFailures} = prep_and_validate_updates(Db,
+            DocBuckets, ExistingDocInfos, AllOrNothing, [], []),

         % strip out any empty buckets
         DocBuckets2 = [Bucket || [_|_] = Bucket <- DocBucketsPrepped];
     false ->
-        Failures = [],
+        PreCommitFailures = [],
         DocBuckets2 = DocBuckets
     end,

-    if (AllOrNothing) and (Failures /= []) ->
-        {aborted, Failures};
+    if (AllOrNothing) and (PreCommitFailures /= []) ->
+        {aborted, lists:map(
+            fun({{Id,{Pos, [RevId|_]}}, Error}) ->
+                {{Id, {Pos, RevId}}, Error};
+            ({{Id,{0, []}}, Error}) ->
+                {{Id, {0, <<>>}}, Error}
+            end, PreCommitFailures)};
     true ->
         Options2 = if AllOrNothing -> [merge_conflicts];
             true -> [] end ++ Options,
-        {ok, CommitFailures} = write_and_commit(Db, DocBuckets2, Options2),
-        FailDict = dict:from_list(CommitFailures ++ Failures),
-        % the output for each is either {ok, NewRev} or Error
+        DocBuckets3 = [[
+                doc_flush_atts(set_new_att_revpos(
+                        sort_and_check_atts(Doc)), Db#db.fd)
+                || Doc <- B] || B <- DocBuckets2],
+        {DocBuckets4, IdRevs} = new_revs(DocBuckets3, [], []),
+
+        {ok, CommitResults} = write_and_commit(Db, DocBuckets4, NonRepDocs, Options2),
+
+        ResultsDict = dict:from_list(IdRevs ++ CommitResults ++ PreCommitFailures),
         {ok, lists:map(
-            fun(#doc{id=Id,revs={Pos, [NewRevId|_]}}) ->
-                case dict:find({Id, {Pos, NewRevId}}, FailDict) of
-                {ok, Error} ->
-                    Error;
-                error ->
-                    {ok, {Pos, NewRevId}}
-                end
-            end, Docs2)}
+            fun(#doc{id=Id,revs={Pos, RevIds}}) ->
+                {ok, Result} = dict:find({Id, {Pos, RevIds}}, ResultsDict),
+                Result
+            end, Docs)}
     end.

 % Returns the first available document on disk. Input list is a full rev path
@@ -545,78 +577,81 @@ make_first_doc_on_disk(Db, Id, Pos, [{_Rev, {IsDel, Sp, _Seq}} |_]=DocPath) ->


 write_and_commit(#db{update_pid=UpdatePid, user_ctx=Ctx}=Db, DocBuckets,
-        Options) ->
-    % flush unwritten binaries to disk.
-    DocBuckets2 = [[doc_flush_binaries(Doc, Db#db.fd) || Doc <- Bucket] || Bucket <- DocBuckets],
-    case gen_server:call(UpdatePid, {update_docs, DocBuckets2, Options}, infinity) of
-    {ok, Conflicts} -> {ok, Conflicts};
+        NonRepDocs, Options) ->
+    case gen_server:call(UpdatePid,
+            {update_docs, DocBuckets, NonRepDocs, Options}, infinity) of
+    {ok, Results} -> {ok, Results};
     retry ->
         % This can happen if the db file we wrote to was swapped out by
         % compaction. Retry by reopening the db and writing to the current file
         {ok, Db2} = open_ref_counted(Db#db.main_pid, Ctx),
-        DocBuckets3 = [[doc_flush_binaries(Doc, Db2#db.fd) || Doc <- Bucket] || Bucket <- DocBuckets],
+        DocBuckets2 = [[doc_flush_atts(Doc, Db2#db.fd) || Doc <- Bucket] || Bucket <- DocBuckets],
         % We only retry once
         close(Db2),
-        case gen_server:call(UpdatePid, {update_docs, DocBuckets3, Options}, infinity) of
-        {ok, Conflicts} -> {ok, Conflicts};
+        case gen_server:call(UpdatePid, {update_docs, DocBuckets2, NonRepDocs, Options}, infinity) of
+        {ok, Results} -> {ok, Results};
         retry -> throw({update_error, compaction_retry})
         end
     end.


+set_new_att_revpos(#doc{revs={RevPos,_Revs},atts=Atts}=Doc) ->
+    Doc#doc{atts= lists:map(fun(#att{data={_Fd,_Sp}}=Att) ->
+            % already commited to disk, do not set new rev
+            Att;
+        (Att) ->
+            Att#att{revpos=RevPos+1}
+        end, Atts)}.
+

-doc_flush_binaries(Doc, Fd) ->
-    NewAttachments = lists:map(
-        fun({Key, {Type, BinValue}}) ->
-            NewBinValue = flush_binary(Fd, BinValue),
-            {Key, {Type, NewBinValue}}
-        end, Doc#doc.attachments),
-    Doc#doc{attachments = NewAttachments}.
+doc_flush_atts(Doc, Fd) ->
+    Doc#doc{atts=[flush_att(Fd, Att) || Att <- Doc#doc.atts]}.

-flush_binary(Fd, {Fd0, StreamPointer, Len}) when Fd0 == Fd ->
-    % already written to our file, nothing to write
-    {Fd, StreamPointer, Len};
+check_md5(_NewSig, <<>>) -> ok;
+check_md5(Sig1, Sig2) when Sig1 == Sig2 -> ok;
+check_md5(_, _) -> throw(data_corruption).

-flush_binary(Fd, {OtherFd, StreamPointer, Len}) when is_tuple(StreamPointer) ->
-    {NewStreamData, Len} =
-        couch_stream:old_copy_to_new_stream(OtherFd, StreamPointer, Len, Fd),
-    {Fd, NewStreamData, Len};
+flush_att(Fd, #att{data={Fd0, _}}=Att) when Fd0 == Fd ->
+    % already written to our file, nothing to write
+    Att;

-flush_binary(Fd, {OtherFd, StreamPointer, Len}) ->
-    {NewStreamData, Len} =
+flush_att(Fd, #att{data={OtherFd,StreamPointer}, md5=InMd5}=Att) ->
+    {NewStreamData, Len, Md5} =
         couch_stream:copy_to_new_stream(OtherFd, StreamPointer, Fd),
-    {Fd, NewStreamData, Len};
+    check_md5(Md5, InMd5),
+    Att#att{data={Fd, NewStreamData}, md5=Md5, len=Len};

-flush_binary(Fd, Bin) when is_binary(Bin) ->
-    with_stream(Fd, fun(OutputStream) ->
-        couch_stream:write(OutputStream, Bin)
+flush_att(Fd, #att{data=Data}=Att) when is_binary(Data) ->
+    with_stream(Fd, Att, fun(OutputStream) ->
+        couch_stream:write(OutputStream, Data)
     end);

-flush_binary(Fd, {StreamFun, undefined}) when is_function(StreamFun) ->
-    with_stream(Fd, fun(OutputStream) ->
-        % StreamFun(MaxChunkSize, WriterFun) must call WriterFun
+flush_att(Fd, #att{data=Fun,len=undefined}=Att) when is_function(Fun) ->
+    with_stream(Fd, Att, fun(OutputStream) ->
+        % Fun(MaxChunkSize, WriterFun) must call WriterFun
         % once for each chunk of the attachment,
-        StreamFun(4096,
+        Fun(4096,
             % WriterFun({Length, Binary}, State)
             % WriterFun({0, _Footers}, State)
             % Called with Length == 0 on the last time.
             % WriterFun returns NewState.
             fun({0, _Footers}, _) ->
                 ok;
-            ({_Length, Bin}, _) ->
-                couch_stream:write(OutputStream, Bin)
+            ({_Length, Chunk}, _) ->
+                couch_stream:write(OutputStream, Chunk)
             end, ok)
     end);

-flush_binary(Fd, {Fun, Len}) when is_function(Fun) ->
-    with_stream(Fd, fun(OutputStream) ->
+flush_att(Fd, #att{data=Fun,len=Len}=Att) when is_function(Fun) ->
+    with_stream(Fd, Att, fun(OutputStream) ->
         write_streamed_attachment(OutputStream, Fun, Len)
     end).

-with_stream(Fd, Fun) ->
+with_stream(Fd, #att{md5=InMd5}=Att, Fun) ->
     {ok, OutputStream} = couch_stream:open(Fd),
     Fun(OutputStream),
-    {StreamInfo, Len} = couch_stream:close(OutputStream),
-    {Fd, StreamInfo, Len}.
+    {StreamInfo, Len, Md5} = couch_stream:close(OutputStream),
+    check_md5(Md5, InMd5),
+    Att#att{data={Fd,StreamInfo},len=Len,md5=Md5}.


 write_streamed_attachment(_Stream, _F, 0) ->
@@ -832,11 +867,15 @@ doc_meta_info(#doc_info{high_seq=Seq,revs=[#rev_info{rev=Rev}|RestInfo]}, RevTre
     true -> [{local_seq, Seq}]
     end.

-read_doc(Fd, Pos) when is_integer(Pos) ->
-    couch_file:pread_term(Fd, Pos);
-read_doc(Fd, OldStyleStreamPointer) ->
+read_doc(#db{fd=Fd}, OldStreamPointer) when is_tuple(OldStreamPointer) ->
+    % 09 UPGRADE CODE
+    couch_stream:old_read_term(Fd, OldStreamPointer);
+read_doc(#db{header=#db_header{disk_version=Version},fd=Fd}, Pos)
+        when Version == 3 ->
     % 09 UPGRADE CODE
-    couch_stream:old_read_term(Fd, OldStyleStreamPointer).
+    couch_file:pread_term(Fd, Pos);
+read_doc(#db{fd=Fd}, Pos) ->
+    couch_file:pread_term_md5(Fd, Pos).


 doc_to_tree(#doc{revs={Start, RevIds}}=Doc) ->
@@ -850,21 +889,36 @@ doc_to_tree_simple(Doc, [RevId | Rest]) ->
     [{RevId, ?REV_MISSING, doc_to_tree_simple(Doc, Rest)}].


-make_doc(#db{fd=Fd}, Id, Deleted, Bp, RevisionPath) ->
-    {BodyData, BinValues} =
+make_doc(#db{fd=Fd}=Db, Id, Deleted, Bp, RevisionPath) ->
+    {BodyData, Atts} =
     case Bp of
     nil ->
         {[], []};
     _ ->
-        {ok, {BodyData0, BinValues0}} = read_doc(Fd, Bp),
+        {ok, {BodyData0, Atts0}} = read_doc(Db, Bp),
         {BodyData0,
-            [{Name,{Type,{Fd,Sp,Len}}} || {Name,{Type,Sp,Len}} <- BinValues0]}
+            lists:map(
+                fun({Name,Type,Sp,Len,RevPos,Md5}) ->
+                    #att{name=Name,
+                        type=Type,
+                        len=Len,
+                        md5=Md5,
+                        revpos=RevPos,
+                        data={Fd,Sp}};
+                ({Name,{Type,Sp,Len}}) ->
+                    #att{name=Name,
+                        type=Type,
+                        len=Len,
+                        md5= <<>>,
+                        revpos=0,
+                        data={Fd,Sp}}
+                end, Atts0)}
     end,
     #doc{
         id = Id,
         revs = RevisionPath,
         body = BodyData,
-        attachments = BinValues,
+        atts = Atts,
         deleted = Deleted
         }.

--
cgit v1.2.3
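Note (not part of the patch): the heart of this commit is `new_revid/1`, which derives the next revision id from the content of the edit — the deleted flag, the previous revision, the body, and the attachments' MD5s — via `erlang:md5(term_to_binary(...))`, replacing the old random `couch_util:rand32()` value. The sketch below is illustrative only; it is a hypothetical standalone module that does not use CouchDB's `#doc{}` record, but it shows why content-derived ids are deterministic: the same edit hashes to the same revision id on every node, which is what makes attachment-level and offline replication line up.

```erlang
%% deterministic_revid_sketch.erl -- illustrative only, not from the patch.
%% Mirrors the idea of new_revid/1: hash the fields that define an edit so
%% identical edits always produce identical revision ids.
-module(deterministic_revid_sketch).
-export([new_revid/4, demo/0]).

%% Deleted  :: boolean()           document deletion flag
%% OldStart :: non_neg_integer()   previous revision position
%% OldRev   :: binary() | 0        previous revision id (0 for a new document)
%% Body     :: term()              document body as an Erlang term
new_revid(Deleted, OldStart, OldRev, Body) ->
    erlang:md5(term_to_binary([Deleted, OldStart, OldRev, Body])).

demo() ->
    Body = {[{<<"title">>, <<"hello">>}]},
    RevA = new_revid(false, 0, 0, Body),
    %% Same inputs, same revision id -- no coordination between nodes needed.
    RevA = new_revid(false, 0, 0, Body),
    %% A different body (or a different parent revision) yields a different id.
    RevB = new_revid(false, 1, RevA, {[{<<"title">>, <<"hello world">>}]}),
    true = RevA =/= RevB,
    {RevA, RevB}.
```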
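The MD5 checking side of the patch (`check_md5/2` plus the `md5` field threaded through `flush_att/2` and `with_stream/3`) reduces to one rule: an empty expected digest means there is nothing to verify, otherwise the digest computed while writing must match the one the client supplied or the write is rejected with `data_corruption`. A minimal standalone sketch of that rule follows — hypothetical names, no CouchDB streams, illustrative only.

```erlang
%% md5_check_sketch.erl -- illustrative only, not from the patch.
-module(md5_check_sketch).
-export([check_md5/2, store_attachment/2]).

%% <<>> means the client supplied no digest, so there is nothing to verify.
check_md5(_Computed, <<>>) -> ok;
check_md5(Computed, Expected) when Computed =:= Expected -> ok;
check_md5(_, _) -> throw(data_corruption).

%% Compute the digest of the bytes actually received and verify it against
%% the caller's claim before accepting the attachment.
store_attachment(Data, ExpectedMd5) when is_binary(Data) ->
    Computed = erlang:md5(Data),
    ok = check_md5(Computed, ExpectedMd5),
    {ok, byte_size(Data), Computed}.
```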