diff options
author | John Christopher Anderson <jchris@apache.org> | 2009-03-13 22:15:34 +0000 |
---|---|---|
committer | John Christopher Anderson <jchris@apache.org> | 2009-03-13 22:15:34 +0000 |
commit | 9007e2d21dea8b0185c0096b30364a8ee40a3867 (patch) | |
tree | 7d8dacb2c8cd619f18dfab8fdb40d146ac28c85a /src/couchdb/couch_db_updater.erl | |
parent | 65608e14e8911b33c30178d717d745edc9f66c17 (diff) |
Commit Damien's rep_security branch to trunk.
Changes bulk_docs conflict checking.
Breaks file format, see mailing list for data upgrade procedure, or
http://wiki.apache.org/couchdb/Breaking_changes
git-svn-id: https://svn.apache.org/repos/asf/couchdb/trunk@753448 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'src/couchdb/couch_db_updater.erl')
-rw-r--r-- | src/couchdb/couch_db_updater.erl | 147 |
1 files changed, 75 insertions, 72 deletions
diff --git a/src/couchdb/couch_db_updater.erl b/src/couchdb/couch_db_updater.erl index 7790d7a4..7752a577 100644 --- a/src/couchdb/couch_db_updater.erl +++ b/src/couchdb/couch_db_updater.erl @@ -44,15 +44,13 @@ handle_call(get_db, _From, Db) -> {reply, {ok, Db}, Db}; handle_call({update_docs, DocActions, Options}, _From, Db) -> try update_docs_int(Db, DocActions, Options) of - {ok, Db2} -> + {ok, Conflicts, Db2} -> ok = gen_server:call(Db#db.main_pid, {db_updated, Db2}), couch_db_update_notifier:notify({updated, Db2#db.name}), - {reply, ok, Db2} + {reply, {ok, Conflicts}, Db2} catch throw: retry -> - {reply, retry, Db}; - throw: conflict -> - {reply, conflict, Db} + {reply, retry, Db} end; handle_call(full_commit, _From, #db{waiting_delayed_commit=nil}=Db) -> {reply, ok, Db}; % no data waiting, return ok immediately @@ -64,18 +62,18 @@ handle_call(increment_update_seq, _From, Db) -> couch_db_update_notifier:notify({updated, Db#db.name}), {reply, {ok, Db2#db.update_seq}, Db2}; -handle_call({set_admins, NewAdmins, #user_ctx{roles=Roles}}, _From, Db) -> - DbAdmins = [<<"_admin">> | Db#db.admins], - case length(DbAdmins -- Roles) == length(DbAdmins) of - true -> - {reply, {unauthorized, <<"You are not a db or server admin.">>}, Db}; - false -> - {ok, Ptr} = couch_file:append_term(Db#db.fd, NewAdmins), - Db2 = commit_data(Db#db{admins=NewAdmins, admins_ptr=Ptr, - update_seq=Db#db.update_seq+1}), - ok = gen_server:call(Db2#db.main_pid, {db_updated, Db2}), - {reply, ok, Db2} - end; +handle_call({set_admins, NewAdmins}, _From, Db) -> + {ok, Ptr} = couch_file:append_term(Db#db.fd, NewAdmins), + Db2 = commit_data(Db#db{admins=NewAdmins, admins_ptr=Ptr, + update_seq=Db#db.update_seq+1}), + ok = gen_server:call(Db2#db.main_pid, {db_updated, Db2}), + {reply, ok, Db2}; + +handle_call({set_revs_limit, Limit}, _From, Db) -> + Db2 = commit_data(Db#db{revs_limit=Limit, + update_seq=Db#db.update_seq+1}), + ok = gen_server:call(Db2#db.main_pid, {db_updated, Db2}), + {reply, ok, Db2}; handle_call({purge_docs, _IdRevs}, _From, #db{compactor_pid=Pid}=Db) when Pid /= nil -> @@ -298,7 +296,8 @@ init_db(DbName, Filepath, Fd, Header0) -> filepath = Filepath, admins = Admins, admins_ptr = AdminsPtr, - instance_start_time = StartTime + instance_start_time = StartTime, + revs_limit = Header#db_header.revs_limit }. @@ -358,40 +357,31 @@ flush_trees(#db{fd=Fd}=Db, [InfoUnflushed | RestUnflushed], AccFlushed) -> end, Unflushed), flush_trees(Db, RestUnflushed, [InfoUnflushed#full_doc_info{rev_tree=Flushed} | AccFlushed]). -merge_rev_trees(_NoConflicts, [], [], AccNewInfos, AccSeq) -> - {ok, lists:reverse(AccNewInfos), AccSeq}; -merge_rev_trees(NoConflicts, [NewDocs|RestDocsList], - [OldDocInfo|RestOldInfo], AccNewInfos, AccSeq) -> - #full_doc_info{id=Id,rev_tree=OldTree}=OldDocInfo, - UpdatesRevTree = lists:foldl( - fun(NewDoc, AccTree) -> - couch_key_tree:merge(AccTree, couch_db:doc_to_tree(NewDoc)) +merge_rev_trees(_MergeConflicts, [], [], AccNewInfos, AccConflicts, AccSeq) -> + {ok, lists:reverse(AccNewInfos), AccConflicts, AccSeq}; +merge_rev_trees(MergeConflicts, [NewDocs|RestDocsList], + [OldDocInfo|RestOldInfo], AccNewInfos, AccConflicts, AccSeq) -> + #full_doc_info{id=Id,rev_tree=OldTree,deleted=OldDeleted}=OldDocInfo, + {NewRevTree, NewConflicts} = lists:foldl( + fun(#doc{revs={Pos,[Rev|_]}}=NewDoc, {AccTree, AccConflicts2}) -> + case couch_key_tree:merge(AccTree, [couch_db:doc_to_tree(NewDoc)]) of + {_NewTree, conflicts} + when (not OldDeleted) and (not MergeConflicts) -> + {AccTree, [{{Id, {Pos,Rev}}, conflict} | AccConflicts2]}; + {NewTree, _} -> + {NewTree, AccConflicts2} + end end, - [], NewDocs), - NewRevTree = couch_key_tree:merge(OldTree, UpdatesRevTree), + {OldTree, AccConflicts}, NewDocs), if NewRevTree == OldTree -> % nothing changed - merge_rev_trees(NoConflicts, RestDocsList, RestOldInfo, AccNewInfos, AccSeq); + merge_rev_trees(MergeConflicts, RestDocsList, RestOldInfo, AccNewInfos, + NewConflicts, AccSeq); true -> - if NoConflicts andalso OldTree /= [] -> - OldConflicts = couch_key_tree:count_leafs(OldTree), - NewConflicts = couch_key_tree:count_leafs(NewRevTree), - if NewConflicts > OldConflicts -> - % if all the old docs are deletions, allow this new conflict - case [1 || {_Rev,{IsDel,_Sp},_Path} <- - couch_key_tree:get_all_leafs(OldTree), IsDel==false] of - [] -> - ok; - _ -> - throw(conflict) - end; - true -> ok - end; - true -> ok - end, + % we have updated the document, give it a new seq # NewInfo = #full_doc_info{id=Id,update_seq=AccSeq+1,rev_tree=NewRevTree}, - merge_rev_trees(NoConflicts, RestDocsList,RestOldInfo, - [NewInfo|AccNewInfos],AccSeq+1) + merge_rev_trees(MergeConflicts, RestDocsList,RestOldInfo, + [NewInfo|AccNewInfos], NewConflicts, AccSeq+1) end. new_index_entries([], AccById, AccBySeq) -> @@ -402,19 +392,23 @@ new_index_entries([FullDocInfo|RestInfos], AccById, AccBySeq) -> [FullDocInfo#full_doc_info{deleted=Deleted}|AccById], [DocInfo|AccBySeq]). + +stem_full_doc_infos(#db{revs_limit=Limit}, DocInfos) -> + [Info#full_doc_info{rev_tree=couch_key_tree:stem(Tree, Limit)} || + #full_doc_info{rev_tree=Tree}=Info <- DocInfos]. + + update_docs_int(Db, DocsList, Options) -> #db{ fulldocinfo_by_id_btree = DocInfoByIdBTree, docinfo_by_seq_btree = DocInfoBySeqBTree, update_seq = LastSeq } = Db, - % separate out the NonRep documents from the rest of the documents {DocsList2, NonRepDocs} = lists:foldl( - fun([#doc{id=Id}=Doc | Rest]=Docs, {DocsListAcc, NonRepDocsAcc}) -> + fun([#doc{id=Id}=Doc | _]=Docs, {DocsListAcc, NonRepDocsAcc}) -> case Id of - <<?LOCAL_DOC_PREFIX, _/binary>> when Rest==[] -> - % when saving NR (non rep) documents, you can only save a single rev + <<?LOCAL_DOC_PREFIX, _/binary>> -> {DocsListAcc, [Doc | NonRepDocsAcc]}; Id-> {[Docs | DocsListAcc], NonRepDocsAcc} @@ -434,23 +428,26 @@ update_docs_int(Db, DocsList, Options) -> Ids, OldDocLookups), % Merge the new docs into the revision trees. - NoConflicts = lists:member(new_edits, Options), - {ok, NewDocInfos, NewSeq} = merge_rev_trees(NoConflicts, DocsList2, OldDocInfos, [], LastSeq), + {ok, NewDocInfos0, Conflicts, NewSeq} = merge_rev_trees( + lists:member(merge_conflicts, Options), + DocsList2, OldDocInfos, [], [], LastSeq), + + NewDocInfos = stem_full_doc_infos(Db, NewDocInfos0), RemoveSeqs = - [ OldSeq || {ok, #full_doc_info{update_seq=OldSeq}} <- OldDocLookups], + [OldSeq || {ok, #full_doc_info{update_seq=OldSeq}} <- OldDocLookups], - % All regular documents are now ready to write. + % All documents are now ready to write. - % Try to write the local documents first, a conflict might be generated - {ok, Db2} = update_local_docs(Db, NonRepDocs), + {ok, LocalConflicts, Db2} = update_local_docs(Db, NonRepDocs), - % Write out the document summaries (they are stored in the nodes of the rev trees) + % Write out the document summaries (the bodies are stored in the nodes of + % the trees, the attachments are already written to disk) {ok, FlushedDocInfos} = flush_trees(Db2, NewDocInfos, []), {ok, InfoById, InfoBySeq} = new_index_entries(FlushedDocInfos, [], []), - % and the indexes to the documents + % and the indexes {ok, DocInfoBySeqBTree2} = couch_btree:add_remove(DocInfoBySeqBTree, InfoBySeq, RemoveSeqs), {ok, DocInfoByIdBTree2} = couch_btree:add_remove(DocInfoByIdBTree, InfoById, []), @@ -459,6 +456,8 @@ update_docs_int(Db, DocsList, Options) -> docinfo_by_seq_btree = DocInfoBySeqBTree2, update_seq = NewSeq}, + % Check if we just updated any design documents, and update the validation + % funs if we did. case [1 || <<"_design/",_/binary>> <- Ids] of [] -> Db4 = Db3; @@ -466,18 +465,15 @@ update_docs_int(Db, DocsList, Options) -> Db4 = refresh_validate_doc_funs(Db3) end, - {ok, commit_data(Db4, not lists:member(full_commit, Options))}. - + {ok, LocalConflicts ++ Conflicts, + commit_data(Db4, not lists:member(full_commit, Options))}. + update_local_docs(#db{local_docs_btree=Btree}=Db, Docs) -> Ids = [Id || #doc{id=Id} <- Docs], OldDocLookups = couch_btree:lookup(Btree, Ids), BtreeEntries = lists:zipwith( - fun(#doc{id=Id,deleted=Delete,revs=Revs,body=Body}, OldDocLookup) -> - NewRev = - case Revs of - [] -> 0; - [RevStr|_] -> list_to_integer(binary_to_list(RevStr)) - end, + fun(#doc{id=Id,deleted=Delete,revs={0,[RevStr]},body=Body}, OldDocLookup) -> + NewRev = list_to_integer(?b2l(RevStr)), OldRev = case OldDocLookup of {ok, {_, {OldRev0, _}}} -> OldRev0; @@ -490,18 +486,19 @@ update_local_docs(#db{local_docs_btree=Btree}=Db, Docs) -> true -> {remove, Id} end; false -> - throw(conflict) + {conflict, {Id, {0, RevStr}}} end end, Docs, OldDocLookups), BtreeIdsRemove = [Id || {remove, Id} <- BtreeEntries], BtreeIdsUpdate = [ByIdDocInfo || {update, ByIdDocInfo} <- BtreeEntries], - + Conflicts = [{conflict, IdRev} || {conflict, IdRev} <- BtreeEntries], + {ok, Btree2} = couch_btree:add_remove(Btree, BtreeIdsUpdate, BtreeIdsRemove), - {ok, Db#db{local_docs_btree = Btree2}}. + {ok, Conflicts, Db#db{local_docs_btree = Btree2}}. commit_data(Db) -> @@ -515,7 +512,8 @@ commit_data(#db{fd=Fd, header=Header} = Db, Delay) -> docinfo_by_seq_btree_state = couch_btree:get_state(Db#db.docinfo_by_seq_btree), fulldocinfo_by_id_btree_state = couch_btree:get_state(Db#db.fulldocinfo_by_id_btree), local_docs_btree_state = couch_btree:get_state(Db#db.local_docs_btree), - admins_ptr = Db#db.admins_ptr + admins_ptr = Db#db.admins_ptr, + revs_limit = Db#db.revs_limit }, if Header == Header2 -> Db; @@ -549,6 +547,10 @@ copy_raw_doc(SrcFd, SrcSp, DestFd, DestStream) -> copy_rev_tree(_SrcFd, _DestFd, _DestStream, []) -> []; +copy_rev_tree(SrcFd, DestFd, DestStream, [{Start, Tree} | RestTree]) -> + % root nner node, only copy info/data from leaf nodes + [Tree2] = copy_rev_tree(SrcFd, DestFd, DestStream, [Tree]), + [{Start, Tree2} | copy_rev_tree(SrcFd, DestFd, DestStream, RestTree)]; copy_rev_tree(SrcFd, DestFd, DestStream, [{RevId, {IsDel, Sp}, []} | RestTree]) -> % This is a leaf node, copy it over NewSp = copy_raw_doc(SrcFd, Sp, DestFd, DestStream), @@ -560,10 +562,11 @@ copy_rev_tree(SrcFd, DestFd, DestStream, [{RevId, _, SubTree} | RestTree]) -> copy_docs(#db{fd=SrcFd}=Db, #db{fd=DestFd,summary_stream=DestStream}=NewDb, InfoBySeq, Retry) -> Ids = [Id || #doc_info{id=Id} <- InfoBySeq], LookupResults = couch_btree:lookup(Db#db.fulldocinfo_by_id_btree, Ids), - NewFullDocInfos = lists:map( + NewFullDocInfos0 = lists:map( fun({ok, #full_doc_info{rev_tree=RevTree}=Info}) -> Info#full_doc_info{rev_tree=copy_rev_tree(SrcFd, DestFd, DestStream, RevTree)} end, LookupResults), + NewFullDocInfos = stem_full_doc_infos(Db, NewFullDocInfos0), NewDocInfos = [couch_doc:to_doc_info(Info) || Info <- NewFullDocInfos], RemoveSeqs = case Retry of |