From 91bf33fdc69c2087707795b8822b0fa7617f8709 Mon Sep 17 00:00:00 2001
From: "Damien F. Katz"
Date: Fri, 17 Jul 2009 21:33:41 +0000
Subject: Deterministic revids, MD5 checking of documents, added tracking of rev when an attachment is edited to allow attachment level replication.

git-svn-id: https://svn.apache.org/repos/asf/couchdb/trunk@795232 13f79535-47bb-0310-9956-ffa450edef68
---
 src/couchdb/couch_db_updater.erl | 182 +++++++++++++++++++++++----------------
 1 file changed, 108 insertions(+), 74 deletions(-)

diff --git a/src/couchdb/couch_db_updater.erl b/src/couchdb/couch_db_updater.erl
index dacf2515..fd1d340f 100644
--- a/src/couchdb/couch_db_updater.erl
+++ b/src/couchdb/couch_db_updater.erl
@@ -44,12 +44,15 @@ terminate(Reason, _Srv) ->
 
 handle_call(get_db, _From, Db) ->
     {reply, {ok, Db}, Db};
-handle_call({update_docs, DocActions, Options}, _From, Db) ->
-    try update_docs_int(Db, DocActions, Options) of
-    {ok, Conflicts, Db2} ->
+handle_call({update_docs, GroupedDocs, NonRepDocs, Options}, _From, Db) ->
+    try update_docs_int(Db, GroupedDocs, NonRepDocs, Options) of
+    {ok, Failures, Db2} ->
         ok = gen_server:call(Db#db.main_pid, {db_updated, Db2}),
-        couch_db_update_notifier:notify({updated, Db2#db.name}),
-        {reply, {ok, Conflicts}, Db2}
+        if Db2#db.update_seq /= Db#db.update_seq ->
+            couch_db_update_notifier:notify({updated, Db2#db.name});
+        true -> ok
+        end,
+        {reply, {ok, Failures}, Db2}
     catch
         throw: retry ->
             {reply, retry, Db}
@@ -289,6 +292,7 @@ init_db(DbName, Filepath, Fd, Header0) ->
     case element(2, Header1) of
     1 -> Header1#db_header{unused = 0}; % 0.9
     2 -> Header1#db_header{unused = 0}; % post 0.9 and pre 0.10
+    3 -> Header1; % post 0.9 and pre 0.10
     ?LATEST_DISK_VERSION -> Header1;
     _ -> throw({database_disk_version_error, "Incorrect disk header version"})
     end,
@@ -364,31 +368,39 @@ refresh_validate_doc_funs(Db) ->
 
 flush_trees(_Db, [], AccFlushedTrees) ->
     {ok, lists:reverse(AccFlushedTrees)};
-flush_trees(#db{fd=Fd}=Db, [InfoUnflushed | RestUnflushed], AccFlushed) ->
+flush_trees(#db{fd=Fd,header=Header}=Db,
+        [InfoUnflushed | RestUnflushed], AccFlushed) ->
     #full_doc_info{update_seq=UpdateSeq, rev_tree=Unflushed} = InfoUnflushed,
     Flushed = couch_key_tree:map(
         fun(_Rev, Value) ->
             case Value of
-            #doc{attachments=Atts,deleted=IsDeleted}=Doc ->
+            #doc{atts=Atts,deleted=IsDeleted}=Doc ->
                 % this node value is actually an unwritten document summary,
                 % write to disk.
-                % make sure the Fd in the written bins is the same Fd we are.
-                Bins =
+                % make sure the Fd in the written bins is the same Fd we are
+                % and convert bins, removing the FD.
+                % All bins should have been written to disk already.
+                DiskAtts =
                 case Atts of
                 [] -> [];
-                [{_BName, {_Type, {BinFd, _Sp, _Len}}} | _ ] when BinFd == Fd ->
-                    % convert bins, removing the FD.
-                    % All bins should have been flushed to disk already.
-                    [{BinName, {BinType, BinSp, BinLen}}
-                        || {BinName, {BinType, {_Fd, BinSp, BinLen}}}
+                [#att{data={BinFd, _Sp}} | _ ] when BinFd == Fd ->
+                    [{N,T,P,L,R,M}
+                        || #att{name=N,type=T,data={_,P},md5=M,revpos=R,len=L}
                         <- Atts];
                 _ ->
                     % BinFd must not equal our Fd. This can happen when a database
                     % is being switched out during a compaction
-                    ?LOG_DEBUG("File where the attachments are written has changed. Possibly retrying.", []),
+                    ?LOG_DEBUG("File where the attachments are written has"
+                        " changed. Possibly retrying.", []),
                     throw(retry)
                 end,
-            {ok, NewSummaryPointer} = couch_file:append_term(Fd, {Doc#doc.body, Bins}),
+            {ok, NewSummaryPointer} =
+                case Header#db_header.disk_version < 4 of
+                true ->
+                    couch_file:append_term(Fd, {Doc#doc.body, DiskAtts});
+                false ->
+                    couch_file:append_term_md5(Fd, {Doc#doc.body, DiskAtts})
+                end,
             {IsDeleted, NewSummaryPointer, UpdateSeq};
         _ ->
             Value
@@ -403,14 +415,40 @@ merge_rev_trees(MergeConflicts, [NewDocs|RestDocsList],
     #full_doc_info{id=Id,rev_tree=OldTree,deleted=OldDeleted,update_seq=OldSeq}
             = OldDocInfo,
     {NewRevTree, NewConflicts} = lists:foldl(
-        fun(#doc{revs={Pos,[Rev|_]}}=NewDoc, {AccTree, AccConflicts2}) ->
-            case couch_key_tree:merge(AccTree, [couch_db:doc_to_tree(NewDoc)]) of
-            {_NewTree, conflicts}
-                    when (not OldDeleted) and (not MergeConflicts) ->
-                {AccTree, [{{Id, {Pos,Rev}}, conflict} | AccConflicts2]};
-            {NewTree, _} ->
+        fun(#doc{revs={Pos,[_Rev|PrevRevs]}}=NewDoc, {AccTree, AccConflicts2}) ->
+            if not MergeConflicts ->
+                case couch_key_tree:merge(AccTree, [couch_db:doc_to_tree(NewDoc)]) of
+                {_NewTree, conflicts} when (not OldDeleted) ->
+                    {AccTree, [{{Id, {Pos-1,PrevRevs}}, conflict} | AccConflicts2]};
+                {NewTree, no_conflicts} when AccTree == NewTree ->
+                    % the tree didn't change at all
+                    % meaning we are saving a rev that's already
+                    % been edited again.
+                    if (Pos == 1) and OldDeleted ->
+                        % this means we are recreating a brand new document
+                        % into a state that already existed before.
+                        % put the rev into a subsequent edit of the deletion
+                        #doc_info{revs=[#rev_info{rev={OldPos,OldRev}}|_]} =
+                                couch_doc:to_doc_info(OldDocInfo),
+                        NewRevId = couch_db:new_revid(
+                                NewDoc#doc{revs={OldPos, [OldRev]}}),
+                        NewDoc2 = NewDoc#doc{revs={OldPos + 1, [NewRevId, OldRev]}},
+                        {NewTree2, _} = couch_key_tree:merge(AccTree,
+                                [couch_db:doc_to_tree(NewDoc2)]),
+                        % we changed the rev id, this tells the caller we did.
+                        {NewTree2, [{{Id, {Pos-1,PrevRevs}}, {ok, {OldPos + 1, NewRevId}}}
+                                | AccConflicts2]};
+                    true ->
+                        {AccTree, [{{Id, {Pos-1,PrevRevs}}, conflict} | AccConflicts2]}
+                    end;
+                {NewTree, _} ->
+                    {NewTree, AccConflicts2}
+                end;
+            true ->
+                {NewTree, _} = couch_key_tree:merge(AccTree,
+                        [couch_db:doc_to_tree(NewDoc)]),
                 {NewTree, AccConflicts2}
-            end
+            end
         end,
         {OldTree, AccConflicts}, NewDocs),
     if NewRevTree == OldTree ->
@@ -444,26 +482,13 @@ stem_full_doc_infos(#db{revs_limit=Limit}, DocInfos) ->
     [Info#full_doc_info{rev_tree=couch_key_tree:stem(Tree, Limit)} ||
             #full_doc_info{rev_tree=Tree}=Info <- DocInfos].
 
-
-update_docs_int(Db, DocsList, Options) ->
+update_docs_int(Db, DocsList, NonRepDocs, Options) ->
     #db{
         fulldocinfo_by_id_btree = DocInfoByIdBTree,
         docinfo_by_seq_btree = DocInfoBySeqBTree,
         update_seq = LastSeq
         } = Db,
-    % separate out the NonRep documents from the rest of the documents
-    {DocsList2, NonRepDocs} = lists:foldl(
-        fun([#doc{id=Id}=Doc | _]=Docs, {DocsListAcc, NonRepDocsAcc}) ->
-            case Id of
-            <<?LOCAL_DOC_PREFIX, _/binary>> ->
-                {DocsListAcc, [Doc | NonRepDocsAcc]};
-            Id->
-                {[Docs | DocsListAcc], NonRepDocsAcc}
-            end
-        end, {[], []}, DocsList),
-
-    Ids = [Id || [#doc{id=Id}|_] <- DocsList2],
-
+    Ids = [Id || [#doc{id=Id}|_] <- DocsList],
     % lookup up the old documents, if they exist.
     OldDocLookups = couch_btree:lookup(DocInfoByIdBTree, Ids),
     OldDocInfos = lists:zipwith(
@@ -477,7 +502,7 @@ update_docs_int(Db, DocsList, Options) ->
     % Merge the new docs into the revision trees.
     {ok, NewDocInfos0, RemoveSeqs, Conflicts, NewSeq} = merge_rev_trees(
             lists:member(merge_conflicts, Options),
-            DocsList2, OldDocInfos, [], [], [], LastSeq),
+            DocsList, OldDocInfos, [], [], [], LastSeq),
 
     NewFullDocInfos = stem_full_doc_infos(Db, NewDocInfos0),
 
@@ -518,33 +543,43 @@ update_local_docs(#db{local_docs_btree=Btree}=Db, Docs) ->
     Ids = [Id || #doc{id=Id} <- Docs],
     OldDocLookups = couch_btree:lookup(Btree, Ids),
     BtreeEntries = lists:zipwith(
-        fun(#doc{id=Id,deleted=Delete,revs={0,[RevStr]},body=Body}, OldDocLookup) ->
-            NewRev = list_to_integer(?b2l(RevStr)),
+        fun(#doc{id=Id,deleted=Delete,revs={0,PrevRevs},body=Body}, OldDocLookup) ->
+            case PrevRevs of
+            [RevStr|_] ->
+                PrevRev = list_to_integer(?b2l(RevStr));
+            [] ->
+                PrevRev = 0
+            end,
             OldRev =
             case OldDocLookup of
                 {ok, {_, {OldRev0, _}}} -> OldRev0;
                 not_found -> 0
             end,
-            case OldRev + 1 == NewRev of
+            case OldRev == PrevRev of
             true ->
                 case Delete of
-                    false -> {update, {Id, {NewRev, Body}}};
-                    true -> {remove, Id}
+                    false -> {update, {Id, {PrevRev + 1, PrevRevs, Body}}};
+                    true -> {remove, Id, PrevRevs}
                 end;
             false ->
-                {conflict, {Id, {0, RevStr}}}
+                {conflict, {Id, {0, PrevRevs}}}
             end
         end, Docs, OldDocLookups),
 
-    BtreeIdsRemove = [Id || {remove, Id} <- BtreeEntries],
-    BtreeIdsUpdate = [ByIdDocInfo || {update, ByIdDocInfo} <- BtreeEntries],
-    Conflicts = [{conflict, IdRev} || {conflict, IdRev} <- BtreeEntries],
+    BtreeIdsRemove = [Id || {remove, Id, _PrevRevs} <- BtreeEntries],
+    BtreeIdsUpdate = [{Id, {NewRev, Body}} || {update, {Id, {NewRev, _OldRevs, Body}}} <- BtreeEntries],
+    Results =
+        [{{Id, {0, PrevRevs}}, {ok, {0, <<"0">>}}}
+            || {remove, Id, PrevRevs} <- BtreeEntries] ++
+        [{{Id, {0, PrevRevs}}, {ok, {0, ?l2b(integer_to_list(NewRev))}}}
+            || {update, {Id, {NewRev, PrevRevs, _Body}}} <- BtreeEntries] ++
+        [{IdRevs, conflict}
+            || {conflict, IdRevs} <- BtreeEntries],
 
     {ok, Btree2} = couch_btree:add_remove(Btree, BtreeIdsUpdate, BtreeIdsRemove),
 
-    {ok, Conflicts, Db#db{local_docs_btree = Btree2}}.
+    {ok, Results, Db#db{local_docs_btree = Btree2}}.
 
 
 commit_data(Db) ->
@@ -594,43 +629,42 @@ commit_data(#db{fd=Fd,header=OldHeader,fsync_options=FsyncOptions}=Db, Delay) ->
     end.
 
 
-copy_doc_attachments(SrcFd, SrcSp, DestFd) ->
-    {ok, {BodyData, BinInfos}} = couch_db:read_doc(SrcFd, SrcSp),
+copy_doc_attachments(#db{fd=SrcFd}=SrcDb, {Pos,_RevId}, SrcSp, DestFd) ->
+    {ok, {BodyData, BinInfos}} = couch_db:read_doc(SrcDb, SrcSp),
     % copy the bin values
     NewBinInfos = lists:map(
         fun({Name, {Type, BinSp, Len}}) when is_tuple(BinSp) orelse BinSp == null ->
             % 09 UPGRADE CODE
-            {NewBinSp, Len} = couch_stream:old_copy_to_new_stream(SrcFd, BinSp, Len, DestFd),
-            {Name, {Type, NewBinSp, Len}};
+            {NewBinSp, Len, Md5} = couch_stream:old_copy_to_new_stream(SrcFd, BinSp, Len, DestFd),
+            {Name, Type, NewBinSp, Len, Pos, Md5};
         ({Name, {Type, BinSp, Len}}) ->
-            {NewBinSp, Len} = couch_stream:copy_to_new_stream(SrcFd, BinSp, DestFd),
-            {Name, {Type, NewBinSp, Len}}
+            % 09 UPGRADE CODE
+            {NewBinSp, Len, Md5} = couch_stream:copy_to_new_stream(SrcFd, BinSp, DestFd),
+            {Name, Type, NewBinSp, Len, Pos, Md5};
+        ({Name, Type, BinSp, Len, RevPos, Md5}) ->
+            {NewBinSp, Len, Md5} = couch_stream:copy_to_new_stream(SrcFd, BinSp, DestFd),
+            {Name, Type, NewBinSp, Len, RevPos, Md5}
        end, BinInfos),
     {BodyData, NewBinInfos}.
 
-copy_rev_tree_attachments(_SrcFd, _DestFd, []) ->
-    [];
-copy_rev_tree_attachments(SrcFd, DestFd, [{Start, Tree} | RestTree]) ->
-    % root nner node, only copy info/data from leaf nodes
-    [Tree2] = copy_rev_tree_attachments(SrcFd, DestFd, [Tree]),
-    [{Start, Tree2} | copy_rev_tree_attachments(SrcFd, DestFd, RestTree)];
-copy_rev_tree_attachments(SrcFd, DestFd, [{RevId, {IsDel, Sp, Seq}, []} | RestTree]) ->
-    % This is a leaf node, copy it over
-    DocBody = copy_doc_attachments(SrcFd, Sp, DestFd),
-    [{RevId, {IsDel, DocBody, Seq}, []} | copy_rev_tree_attachments(SrcFd, DestFd, RestTree)];
-copy_rev_tree_attachments(SrcFd, DestFd, [{RevId, _, SubTree} | RestTree]) ->
-    % inner node, only copy info/data from leaf nodes
-    [{RevId, ?REV_MISSING, copy_rev_tree_attachments(SrcFd, DestFd, SubTree)} | copy_rev_tree_attachments(SrcFd, DestFd, RestTree)].
-
-
-copy_docs(#db{fd=SrcFd}=Db, #db{fd=DestFd}=NewDb, InfoBySeq, Retry) ->
+copy_rev_tree_attachments(SrcDb, DestFd, Tree) ->
+    couch_key_tree:map(
+        fun(Rev, {IsDel, Sp, Seq}, leaf) ->
+            DocBody = copy_doc_attachments(SrcDb, Rev, Sp, DestFd),
+            {IsDel, DocBody, Seq};
+        (_, _, branch) ->
+            ?REV_MISSING
+        end, Tree).
+
+
+copy_docs(Db, #db{fd=DestFd}=NewDb, InfoBySeq, Retry) ->
     Ids = [Id || #doc_info{id=Id} <- InfoBySeq],
     LookupResults = couch_btree:lookup(Db#db.fulldocinfo_by_id_btree, Ids),
     % write out the attachments
     NewFullDocInfos0 = lists:map(
         fun({ok, #full_doc_info{rev_tree=RevTree}=Info}) ->
-            Info#full_doc_info{rev_tree=copy_rev_tree_attachments(SrcFd, DestFd, RevTree)}
+            Info#full_doc_info{rev_tree=copy_rev_tree_attachments(Db, DestFd, RevTree)}
         end, LookupResults),
     % write out the docs
     % we do this in 2 stages so the docs are written out contiguously, making
@@ -639,7 +673,7 @@ copy_docs(#db{fd=SrcFd}=Db, #db{fd=DestFd}=NewDb, InfoBySeq, Retry) ->
         fun(#full_doc_info{rev_tree=RevTree}=Info) ->
             Info#full_doc_info{rev_tree=couch_key_tree:map_leafs(
                 fun(_Key, {IsDel, DocBody, Seq}) ->
-                    {ok, Pos} = couch_file:append_term(DestFd, DocBody),
+                    {ok, Pos} = couch_file:append_term_md5(DestFd, DocBody),
                     {IsDel, Pos, Seq}
                 end, RevTree)}
         end, NewFullDocInfos0),
--
cgit v1.2.3
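
Note on the deterministic revids from the subject line: the merge_rev_trees hunk above calls couch_db:new_revid/1, but that function lives in couch_db.erl and is not touched by this patch. The sketch below only illustrates the idea, a revision id computed as a hash of the edit itself so that the same edit always produces the same id; the exact fields hashed here (deleted flag, previous revs, body, attachment signatures) are assumptions made for the example, not the real implementation.

    %% Illustration only; this is not the couch_db:new_revid/1 from couch_db.erl.
    -module(revid_sketch).
    -export([new_revid/4]).

    %% Deleted :: boolean(), PrevRevs :: [binary()], Body :: term(),
    %% AttSigs :: [{Name, RevPos, Md5}] built from the attachment fields above.
    new_revid(Deleted, PrevRevs, Body, AttSigs) ->
        Md5 = erlang:md5(term_to_binary({Deleted, PrevRevs, Body, AttSigs})),
        %% hex-encode the 16-byte digest so the result is a printable rev id
        iolist_to_binary([io_lib:format("~2.16.0b", [B]) || <<B>> <= Md5]).

Because the id is a pure function of the edit, two nodes that independently apply the same change end up with the same revision, which is presumably the point of making revids deterministic for replication.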
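
Note on the attachment bookkeeping: flush_trees and copy_doc_attachments above now persist each attachment as a flat {Name, Type, Sp, Len, RevPos, Md5} tuple instead of the old {Name, {Type, Sp, Len}} nesting. RevPos records the revision position at which the attachment was last changed (the "tracking of rev when an attachment is edited" from the subject line) and Md5 its checksum. The #att record fields below are taken from the pattern matches in the diff; the record itself is defined elsewhere (couch_db.hrl), so this is an approximation of the two on-disk shapes, not the shipped code.

    %% Approximation of the on-disk attachment shapes handled above.
    -module(att_format_sketch).
    -export([to_disk_term/1, from_old_disk_term/2]).

    -record(att, {name, type, data, md5, revpos, len}).

    %% New format written by flush_trees/3: a flat 6-tuple holding only the
    %% stream pointer (the Fd is dropped before the summary hits the file).
    to_disk_term(#att{name=N, type=T, data={_Fd, Sp}, md5=M, revpos=R, len=L}) ->
        {N, T, Sp, L, R, M}.

    %% Old (0.9-era) format, still accepted by the compaction upgrade path.
    %% md5 and revpos were never stored, so they have to be backfilled; the
    %% compactor above uses the md5 returned by the stream copy and the
    %% revision position of the leaf being copied.
    from_old_disk_term({Name, {Type, Sp, Len}}, RevPos) ->
        #att{name=Name, type=Type, data=Sp, md5 = <<>>, revpos=RevPos, len=Len}.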
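
Note on the MD5 checking of documents: flush_trees above switches document summaries to couch_file:append_term_md5/2, but only when the header's disk_version is 4 or later, so files still in the older format (including a compaction already in flight) keep using plain append_term/2. The couch_file side is not part of this patch; the sketch below shows the general idea of framing a term with its MD5 so the read path can detect corruption, and the function names are illustrative rather than couch_file's actual API.

    %% Sketch of md5-framing a term; couch_file implements its own version.
    -module(md5_frame_sketch).
    -export([encode/1, decode/1]).

    encode(Term) ->
        Bin = term_to_binary(Term),
        <<(erlang:md5(Bin))/binary, Bin/binary>>.

    decode(<<Md5:16/binary, Bin/binary>>) ->
        case erlang:md5(Bin) of
            Md5 -> {ok, binary_to_term(Bin)};
            _ -> {error, file_corruption}    % stored digest does not match payload
        end.

Gating on disk_version < 4 keeps databases created before this change writable without rewriting them; they presumably pick up the new framing once compaction produces a fresh file at the current disk version.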
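
Note on the _local document handling: update_local_docs above now reads the client-supplied previous revision out of revs={0, PrevRevs} and accepts the write when it equals the integer currently stored in the local-docs btree, reporting PrevRev + 1 as the new revision (and rev 0 for deletions), instead of requiring the client to send exactly OldRev + 1. A minimal sketch of just that check, assuming the stored value is the bare integer kept in the btree:

    %% Sketch of the _local doc revision check performed above.
    -module(local_rev_sketch).
    -export([check/2]).

    %% PrevRevs is the revs list from the #doc{} ({0, PrevRevs}); StoredRev is
    %% the integer currently in the local-docs btree, or 0 if the doc is absent.
    check(PrevRevs, StoredRev) ->
        PrevRev = case PrevRevs of
            [RevStr | _] -> list_to_integer(binary_to_list(RevStr));
            [] -> 0
        end,
        case PrevRev == StoredRev of
            true -> {ok, integer_to_list(PrevRev + 1)};  % rev reported back
            false -> conflict
        end.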