summaryrefslogtreecommitdiff
path: root/src/couchdb/couch_db_updater.erl
diff options
context:
space:
mode:
authorJohn Christopher Anderson <jchris@apache.org>2009-03-13 22:15:34 +0000
committerJohn Christopher Anderson <jchris@apache.org>2009-03-13 22:15:34 +0000
commit9007e2d21dea8b0185c0096b30364a8ee40a3867 (patch)
tree7d8dacb2c8cd619f18dfab8fdb40d146ac28c85a /src/couchdb/couch_db_updater.erl
parent65608e14e8911b33c30178d717d745edc9f66c17 (diff)
Commit Damien's rep_security branch to trunk.
Changes bulk_docs conflict checking. Breaks file format, see mailing list for data upgrade procedure, or http://wiki.apache.org/couchdb/Breaking_changes git-svn-id: https://svn.apache.org/repos/asf/couchdb/trunk@753448 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'src/couchdb/couch_db_updater.erl')
-rw-r--r--src/couchdb/couch_db_updater.erl147
1 files changed, 75 insertions, 72 deletions
diff --git a/src/couchdb/couch_db_updater.erl b/src/couchdb/couch_db_updater.erl
index 7790d7a4..7752a577 100644
--- a/src/couchdb/couch_db_updater.erl
+++ b/src/couchdb/couch_db_updater.erl
@@ -44,15 +44,13 @@ handle_call(get_db, _From, Db) ->
{reply, {ok, Db}, Db};
handle_call({update_docs, DocActions, Options}, _From, Db) ->
try update_docs_int(Db, DocActions, Options) of
- {ok, Db2} ->
+ {ok, Conflicts, Db2} ->
ok = gen_server:call(Db#db.main_pid, {db_updated, Db2}),
couch_db_update_notifier:notify({updated, Db2#db.name}),
- {reply, ok, Db2}
+ {reply, {ok, Conflicts}, Db2}
catch
throw: retry ->
- {reply, retry, Db};
- throw: conflict ->
- {reply, conflict, Db}
+ {reply, retry, Db}
end;
handle_call(full_commit, _From, #db{waiting_delayed_commit=nil}=Db) ->
{reply, ok, Db}; % no data waiting, return ok immediately
@@ -64,18 +62,18 @@ handle_call(increment_update_seq, _From, Db) ->
couch_db_update_notifier:notify({updated, Db#db.name}),
{reply, {ok, Db2#db.update_seq}, Db2};
-handle_call({set_admins, NewAdmins, #user_ctx{roles=Roles}}, _From, Db) ->
- DbAdmins = [<<"_admin">> | Db#db.admins],
- case length(DbAdmins -- Roles) == length(DbAdmins) of
- true ->
- {reply, {unauthorized, <<"You are not a db or server admin.">>}, Db};
- false ->
- {ok, Ptr} = couch_file:append_term(Db#db.fd, NewAdmins),
- Db2 = commit_data(Db#db{admins=NewAdmins, admins_ptr=Ptr,
- update_seq=Db#db.update_seq+1}),
- ok = gen_server:call(Db2#db.main_pid, {db_updated, Db2}),
- {reply, ok, Db2}
- end;
+handle_call({set_admins, NewAdmins}, _From, Db) ->
+ {ok, Ptr} = couch_file:append_term(Db#db.fd, NewAdmins),
+ Db2 = commit_data(Db#db{admins=NewAdmins, admins_ptr=Ptr,
+ update_seq=Db#db.update_seq+1}),
+ ok = gen_server:call(Db2#db.main_pid, {db_updated, Db2}),
+ {reply, ok, Db2};
+
+handle_call({set_revs_limit, Limit}, _From, Db) ->
+ Db2 = commit_data(Db#db{revs_limit=Limit,
+ update_seq=Db#db.update_seq+1}),
+ ok = gen_server:call(Db2#db.main_pid, {db_updated, Db2}),
+ {reply, ok, Db2};
handle_call({purge_docs, _IdRevs}, _From,
#db{compactor_pid=Pid}=Db) when Pid /= nil ->
@@ -298,7 +296,8 @@ init_db(DbName, Filepath, Fd, Header0) ->
filepath = Filepath,
admins = Admins,
admins_ptr = AdminsPtr,
- instance_start_time = StartTime
+ instance_start_time = StartTime,
+ revs_limit = Header#db_header.revs_limit
}.
@@ -358,40 +357,31 @@ flush_trees(#db{fd=Fd}=Db, [InfoUnflushed | RestUnflushed], AccFlushed) ->
end, Unflushed),
flush_trees(Db, RestUnflushed, [InfoUnflushed#full_doc_info{rev_tree=Flushed} | AccFlushed]).
-merge_rev_trees(_NoConflicts, [], [], AccNewInfos, AccSeq) ->
- {ok, lists:reverse(AccNewInfos), AccSeq};
-merge_rev_trees(NoConflicts, [NewDocs|RestDocsList],
- [OldDocInfo|RestOldInfo], AccNewInfos, AccSeq) ->
- #full_doc_info{id=Id,rev_tree=OldTree}=OldDocInfo,
- UpdatesRevTree = lists:foldl(
- fun(NewDoc, AccTree) ->
- couch_key_tree:merge(AccTree, couch_db:doc_to_tree(NewDoc))
+merge_rev_trees(_MergeConflicts, [], [], AccNewInfos, AccConflicts, AccSeq) ->
+ {ok, lists:reverse(AccNewInfos), AccConflicts, AccSeq};
+merge_rev_trees(MergeConflicts, [NewDocs|RestDocsList],
+ [OldDocInfo|RestOldInfo], AccNewInfos, AccConflicts, AccSeq) ->
+ #full_doc_info{id=Id,rev_tree=OldTree,deleted=OldDeleted}=OldDocInfo,
+ {NewRevTree, NewConflicts} = lists:foldl(
+ fun(#doc{revs={Pos,[Rev|_]}}=NewDoc, {AccTree, AccConflicts2}) ->
+ case couch_key_tree:merge(AccTree, [couch_db:doc_to_tree(NewDoc)]) of
+ {_NewTree, conflicts}
+ when (not OldDeleted) and (not MergeConflicts) ->
+ {AccTree, [{{Id, {Pos,Rev}}, conflict} | AccConflicts2]};
+ {NewTree, _} ->
+ {NewTree, AccConflicts2}
+ end
end,
- [], NewDocs),
- NewRevTree = couch_key_tree:merge(OldTree, UpdatesRevTree),
+ {OldTree, AccConflicts}, NewDocs),
if NewRevTree == OldTree ->
% nothing changed
- merge_rev_trees(NoConflicts, RestDocsList, RestOldInfo, AccNewInfos, AccSeq);
+ merge_rev_trees(MergeConflicts, RestDocsList, RestOldInfo, AccNewInfos,
+ NewConflicts, AccSeq);
true ->
- if NoConflicts andalso OldTree /= [] ->
- OldConflicts = couch_key_tree:count_leafs(OldTree),
- NewConflicts = couch_key_tree:count_leafs(NewRevTree),
- if NewConflicts > OldConflicts ->
- % if all the old docs are deletions, allow this new conflict
- case [1 || {_Rev,{IsDel,_Sp},_Path} <-
- couch_key_tree:get_all_leafs(OldTree), IsDel==false] of
- [] ->
- ok;
- _ ->
- throw(conflict)
- end;
- true -> ok
- end;
- true -> ok
- end,
+ % we have updated the document, give it a new seq #
NewInfo = #full_doc_info{id=Id,update_seq=AccSeq+1,rev_tree=NewRevTree},
- merge_rev_trees(NoConflicts, RestDocsList,RestOldInfo,
- [NewInfo|AccNewInfos],AccSeq+1)
+ merge_rev_trees(MergeConflicts, RestDocsList,RestOldInfo,
+ [NewInfo|AccNewInfos], NewConflicts, AccSeq+1)
end.
new_index_entries([], AccById, AccBySeq) ->
@@ -402,19 +392,23 @@ new_index_entries([FullDocInfo|RestInfos], AccById, AccBySeq) ->
[FullDocInfo#full_doc_info{deleted=Deleted}|AccById],
[DocInfo|AccBySeq]).
+
+stem_full_doc_infos(#db{revs_limit=Limit}, DocInfos) ->
+ [Info#full_doc_info{rev_tree=couch_key_tree:stem(Tree, Limit)} ||
+ #full_doc_info{rev_tree=Tree}=Info <- DocInfos].
+
+
update_docs_int(Db, DocsList, Options) ->
#db{
fulldocinfo_by_id_btree = DocInfoByIdBTree,
docinfo_by_seq_btree = DocInfoBySeqBTree,
update_seq = LastSeq
} = Db,
-
% separate out the NonRep documents from the rest of the documents
{DocsList2, NonRepDocs} = lists:foldl(
- fun([#doc{id=Id}=Doc | Rest]=Docs, {DocsListAcc, NonRepDocsAcc}) ->
+ fun([#doc{id=Id}=Doc | _]=Docs, {DocsListAcc, NonRepDocsAcc}) ->
case Id of
- <<?LOCAL_DOC_PREFIX, _/binary>> when Rest==[] ->
- % when saving NR (non rep) documents, you can only save a single rev
+ <<?LOCAL_DOC_PREFIX, _/binary>> ->
{DocsListAcc, [Doc | NonRepDocsAcc]};
Id->
{[Docs | DocsListAcc], NonRepDocsAcc}
@@ -434,23 +428,26 @@ update_docs_int(Db, DocsList, Options) ->
Ids, OldDocLookups),
% Merge the new docs into the revision trees.
- NoConflicts = lists:member(new_edits, Options),
- {ok, NewDocInfos, NewSeq} = merge_rev_trees(NoConflicts, DocsList2, OldDocInfos, [], LastSeq),
+ {ok, NewDocInfos0, Conflicts, NewSeq} = merge_rev_trees(
+ lists:member(merge_conflicts, Options),
+ DocsList2, OldDocInfos, [], [], LastSeq),
+
+ NewDocInfos = stem_full_doc_infos(Db, NewDocInfos0),
RemoveSeqs =
- [ OldSeq || {ok, #full_doc_info{update_seq=OldSeq}} <- OldDocLookups],
+ [OldSeq || {ok, #full_doc_info{update_seq=OldSeq}} <- OldDocLookups],
- % All regular documents are now ready to write.
+ % All documents are now ready to write.
- % Try to write the local documents first, a conflict might be generated
- {ok, Db2} = update_local_docs(Db, NonRepDocs),
+ {ok, LocalConflicts, Db2} = update_local_docs(Db, NonRepDocs),
- % Write out the document summaries (they are stored in the nodes of the rev trees)
+ % Write out the document summaries (the bodies are stored in the nodes of
+ % the trees, the attachments are already written to disk)
{ok, FlushedDocInfos} = flush_trees(Db2, NewDocInfos, []),
{ok, InfoById, InfoBySeq} = new_index_entries(FlushedDocInfos, [], []),
- % and the indexes to the documents
+ % and the indexes
{ok, DocInfoBySeqBTree2} = couch_btree:add_remove(DocInfoBySeqBTree, InfoBySeq, RemoveSeqs),
{ok, DocInfoByIdBTree2} = couch_btree:add_remove(DocInfoByIdBTree, InfoById, []),
@@ -459,6 +456,8 @@ update_docs_int(Db, DocsList, Options) ->
docinfo_by_seq_btree = DocInfoBySeqBTree2,
update_seq = NewSeq},
+ % Check if we just updated any design documents, and update the validation
+ % funs if we did.
case [1 || <<"_design/",_/binary>> <- Ids] of
[] ->
Db4 = Db3;
@@ -466,18 +465,15 @@ update_docs_int(Db, DocsList, Options) ->
Db4 = refresh_validate_doc_funs(Db3)
end,
- {ok, commit_data(Db4, not lists:member(full_commit, Options))}.
-
+ {ok, LocalConflicts ++ Conflicts,
+ commit_data(Db4, not lists:member(full_commit, Options))}.
+
update_local_docs(#db{local_docs_btree=Btree}=Db, Docs) ->
Ids = [Id || #doc{id=Id} <- Docs],
OldDocLookups = couch_btree:lookup(Btree, Ids),
BtreeEntries = lists:zipwith(
- fun(#doc{id=Id,deleted=Delete,revs=Revs,body=Body}, OldDocLookup) ->
- NewRev =
- case Revs of
- [] -> 0;
- [RevStr|_] -> list_to_integer(binary_to_list(RevStr))
- end,
+ fun(#doc{id=Id,deleted=Delete,revs={0,[RevStr]},body=Body}, OldDocLookup) ->
+ NewRev = list_to_integer(?b2l(RevStr)),
OldRev =
case OldDocLookup of
{ok, {_, {OldRev0, _}}} -> OldRev0;
@@ -490,18 +486,19 @@ update_local_docs(#db{local_docs_btree=Btree}=Db, Docs) ->
true -> {remove, Id}
end;
false ->
- throw(conflict)
+ {conflict, {Id, {0, RevStr}}}
end
end, Docs, OldDocLookups),
BtreeIdsRemove = [Id || {remove, Id} <- BtreeEntries],
BtreeIdsUpdate = [ByIdDocInfo || {update, ByIdDocInfo} <- BtreeEntries],
-
+ Conflicts = [{conflict, IdRev} || {conflict, IdRev} <- BtreeEntries],
+
{ok, Btree2} =
couch_btree:add_remove(Btree, BtreeIdsUpdate, BtreeIdsRemove),
- {ok, Db#db{local_docs_btree = Btree2}}.
+ {ok, Conflicts, Db#db{local_docs_btree = Btree2}}.
commit_data(Db) ->
@@ -515,7 +512,8 @@ commit_data(#db{fd=Fd, header=Header} = Db, Delay) ->
docinfo_by_seq_btree_state = couch_btree:get_state(Db#db.docinfo_by_seq_btree),
fulldocinfo_by_id_btree_state = couch_btree:get_state(Db#db.fulldocinfo_by_id_btree),
local_docs_btree_state = couch_btree:get_state(Db#db.local_docs_btree),
- admins_ptr = Db#db.admins_ptr
+ admins_ptr = Db#db.admins_ptr,
+ revs_limit = Db#db.revs_limit
},
if Header == Header2 ->
Db;
@@ -549,6 +547,10 @@ copy_raw_doc(SrcFd, SrcSp, DestFd, DestStream) ->
copy_rev_tree(_SrcFd, _DestFd, _DestStream, []) ->
[];
+copy_rev_tree(SrcFd, DestFd, DestStream, [{Start, Tree} | RestTree]) ->
+ % root nner node, only copy info/data from leaf nodes
+ [Tree2] = copy_rev_tree(SrcFd, DestFd, DestStream, [Tree]),
+ [{Start, Tree2} | copy_rev_tree(SrcFd, DestFd, DestStream, RestTree)];
copy_rev_tree(SrcFd, DestFd, DestStream, [{RevId, {IsDel, Sp}, []} | RestTree]) ->
% This is a leaf node, copy it over
NewSp = copy_raw_doc(SrcFd, Sp, DestFd, DestStream),
@@ -560,10 +562,11 @@ copy_rev_tree(SrcFd, DestFd, DestStream, [{RevId, _, SubTree} | RestTree]) ->
copy_docs(#db{fd=SrcFd}=Db, #db{fd=DestFd,summary_stream=DestStream}=NewDb, InfoBySeq, Retry) ->
Ids = [Id || #doc_info{id=Id} <- InfoBySeq],
LookupResults = couch_btree:lookup(Db#db.fulldocinfo_by_id_btree, Ids),
- NewFullDocInfos = lists:map(
+ NewFullDocInfos0 = lists:map(
fun({ok, #full_doc_info{rev_tree=RevTree}=Info}) ->
Info#full_doc_info{rev_tree=copy_rev_tree(SrcFd, DestFd, DestStream, RevTree)}
end, LookupResults),
+ NewFullDocInfos = stem_full_doc_infos(Db, NewFullDocInfos0),
NewDocInfos = [couch_doc:to_doc_info(Info) || Info <- NewFullDocInfos],
RemoveSeqs =
case Retry of