summaryrefslogtreecommitdiff
path: root/src/couchdb/couch_db_updater.erl
diff options
context:
space:
mode:
authorDamien F. Katz <damien@apache.org>2009-07-17 21:33:41 +0000
committerDamien F. Katz <damien@apache.org>2009-07-17 21:33:41 +0000
commit91bf33fdc69c2087707795b8822b0fa7617f8709 (patch)
treef01ba98e03b77923d8256e9dd7cdca997ba06cf2 /src/couchdb/couch_db_updater.erl
parent627562eef0290bcf07659f40d7ba85c333978ce6 (diff)
Deterministic revids, MD5 checking of documents, added tracking of rev when an attachment is edited to allow attachment level replication.
git-svn-id: https://svn.apache.org/repos/asf/couchdb/trunk@795232 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'src/couchdb/couch_db_updater.erl')
-rw-r--r--src/couchdb/couch_db_updater.erl182
1 files changed, 108 insertions, 74 deletions
diff --git a/src/couchdb/couch_db_updater.erl b/src/couchdb/couch_db_updater.erl
index dacf2515..fd1d340f 100644
--- a/src/couchdb/couch_db_updater.erl
+++ b/src/couchdb/couch_db_updater.erl
@@ -44,12 +44,15 @@ terminate(Reason, _Srv) ->
handle_call(get_db, _From, Db) ->
{reply, {ok, Db}, Db};
-handle_call({update_docs, DocActions, Options}, _From, Db) ->
- try update_docs_int(Db, DocActions, Options) of
- {ok, Conflicts, Db2} ->
+handle_call({update_docs, GroupedDocs, NonRepDocs, Options}, _From, Db) ->
+ try update_docs_int(Db, GroupedDocs, NonRepDocs, Options) of
+ {ok, Failures, Db2} ->
ok = gen_server:call(Db#db.main_pid, {db_updated, Db2}),
- couch_db_update_notifier:notify({updated, Db2#db.name}),
- {reply, {ok, Conflicts}, Db2}
+ if Db2#db.update_seq /= Db#db.update_seq ->
+ couch_db_update_notifier:notify({updated, Db2#db.name});
+ true -> ok
+ end,
+ {reply, {ok, Failures}, Db2}
catch
throw: retry ->
{reply, retry, Db}
@@ -289,6 +292,7 @@ init_db(DbName, Filepath, Fd, Header0) ->
case element(2, Header1) of
1 -> Header1#db_header{unused = 0}; % 0.9
2 -> Header1#db_header{unused = 0}; % post 0.9 and pre 0.10
+ 3 -> Header1; % post 0.9 and pre 0.10
?LATEST_DISK_VERSION -> Header1;
_ -> throw({database_disk_version_error, "Incorrect disk header version"})
end,
@@ -364,31 +368,39 @@ refresh_validate_doc_funs(Db) ->
flush_trees(_Db, [], AccFlushedTrees) ->
{ok, lists:reverse(AccFlushedTrees)};
-flush_trees(#db{fd=Fd}=Db, [InfoUnflushed | RestUnflushed], AccFlushed) ->
+flush_trees(#db{fd=Fd,header=Header}=Db,
+ [InfoUnflushed | RestUnflushed], AccFlushed) ->
#full_doc_info{update_seq=UpdateSeq, rev_tree=Unflushed} = InfoUnflushed,
Flushed = couch_key_tree:map(
fun(_Rev, Value) ->
case Value of
- #doc{attachments=Atts,deleted=IsDeleted}=Doc ->
+ #doc{atts=Atts,deleted=IsDeleted}=Doc ->
% this node value is actually an unwritten document summary,
% write to disk.
- % make sure the Fd in the written bins is the same Fd we are.
- Bins =
+ % make sure the Fd in the written bins is the same Fd we are
+ % and convert bins, removing the FD.
+ % All bins should have been written to disk already.
+ DiskAtts =
case Atts of
[] -> [];
- [{_BName, {_Type, {BinFd, _Sp, _Len}}} | _ ] when BinFd == Fd ->
- % convert bins, removing the FD.
- % All bins should have been flushed to disk already.
- [{BinName, {BinType, BinSp, BinLen}}
- || {BinName, {BinType, {_Fd, BinSp, BinLen}}}
+ [#att{data={BinFd, _Sp}} | _ ] when BinFd == Fd ->
+ [{N,T,P,L,R,M}
+ || #att{name=N,type=T,data={_,P},md5=M,revpos=R,len=L}
<- Atts];
_ ->
% BinFd must not equal our Fd. This can happen when a database
% is being switched out during a compaction
- ?LOG_DEBUG("File where the attachments are written has changed. Possibly retrying.", []),
+ ?LOG_DEBUG("File where the attachments are written has"
+ " changed. Possibly retrying.", []),
throw(retry)
end,
- {ok, NewSummaryPointer} = couch_file:append_term(Fd, {Doc#doc.body, Bins}),
+ {ok, NewSummaryPointer} =
+ case Header#db_header.disk_version < 4 of
+ true ->
+ couch_file:append_term(Fd, {Doc#doc.body, DiskAtts});
+ false ->
+ couch_file:append_term_md5(Fd, {Doc#doc.body, DiskAtts})
+ end,
{IsDeleted, NewSummaryPointer, UpdateSeq};
_ ->
Value
@@ -403,14 +415,40 @@ merge_rev_trees(MergeConflicts, [NewDocs|RestDocsList],
#full_doc_info{id=Id,rev_tree=OldTree,deleted=OldDeleted,update_seq=OldSeq}
= OldDocInfo,
{NewRevTree, NewConflicts} = lists:foldl(
- fun(#doc{revs={Pos,[Rev|_]}}=NewDoc, {AccTree, AccConflicts2}) ->
- case couch_key_tree:merge(AccTree, [couch_db:doc_to_tree(NewDoc)]) of
- {_NewTree, conflicts}
- when (not OldDeleted) and (not MergeConflicts) ->
- {AccTree, [{{Id, {Pos,Rev}}, conflict} | AccConflicts2]};
- {NewTree, _} ->
+ fun(#doc{revs={Pos,[_Rev|PrevRevs]}}=NewDoc, {AccTree, AccConflicts2}) ->
+ if not MergeConflicts ->
+ case couch_key_tree:merge(AccTree, [couch_db:doc_to_tree(NewDoc)]) of
+ {_NewTree, conflicts} when (not OldDeleted) ->
+ {AccTree, [{{Id, {Pos-1,PrevRevs}}, conflict} | AccConflicts2]};
+ {NewTree, no_conflicts} when AccTree == NewTree ->
+ % the tree didn't change at all
+ % meaning we are saving a rev that's already
+ % been editted again.
+ if (Pos == 1) and OldDeleted ->
+ % this means we are recreating a brand new document
+ % into a state that already existed before.
+ % put the rev into a subsequent edit of the deletion
+ #doc_info{revs=[#rev_info{rev={OldPos,OldRev}}|_]} =
+ couch_doc:to_doc_info(OldDocInfo),
+ NewRevId = couch_db:new_revid(
+ NewDoc#doc{revs={OldPos, [OldRev]}}),
+ NewDoc2 = NewDoc#doc{revs={OldPos + 1, [NewRevId, OldRev]}},
+ {NewTree2, _} = couch_key_tree:merge(AccTree,
+ [couch_db:doc_to_tree(NewDoc2)]),
+ % we changed the rev id, this tells the caller we did.
+ {NewTree2, [{{Id, {Pos-1,PrevRevs}}, {ok, {OldPos + 1, NewRevId}}}
+ | AccConflicts2]};
+ true ->
+ {AccTree, [{{Id, {Pos-1,PrevRevs}}, conflict} | AccConflicts2]}
+ end;
+ {NewTree, _} ->
+ {NewTree, AccConflicts2}
+ end;
+ true ->
+ {NewTree, _} = couch_key_tree:merge(AccTree,
+ [couch_db:doc_to_tree(NewDoc)]),
{NewTree, AccConflicts2}
- end
+ end
end,
{OldTree, AccConflicts}, NewDocs),
if NewRevTree == OldTree ->
@@ -444,26 +482,13 @@ stem_full_doc_infos(#db{revs_limit=Limit}, DocInfos) ->
[Info#full_doc_info{rev_tree=couch_key_tree:stem(Tree, Limit)} ||
#full_doc_info{rev_tree=Tree}=Info <- DocInfos].
-
-update_docs_int(Db, DocsList, Options) ->
+update_docs_int(Db, DocsList, NonRepDocs, Options) ->
#db{
fulldocinfo_by_id_btree = DocInfoByIdBTree,
docinfo_by_seq_btree = DocInfoBySeqBTree,
update_seq = LastSeq
} = Db,
- % separate out the NonRep documents from the rest of the documents
- {DocsList2, NonRepDocs} = lists:foldl(
- fun([#doc{id=Id}=Doc | _]=Docs, {DocsListAcc, NonRepDocsAcc}) ->
- case Id of
- <<?LOCAL_DOC_PREFIX, _/binary>> ->
- {DocsListAcc, [Doc | NonRepDocsAcc]};
- Id->
- {[Docs | DocsListAcc], NonRepDocsAcc}
- end
- end, {[], []}, DocsList),
-
- Ids = [Id || [#doc{id=Id}|_] <- DocsList2],
-
+ Ids = [Id || [#doc{id=Id}|_] <- DocsList],
% lookup up the old documents, if they exist.
OldDocLookups = couch_btree:lookup(DocInfoByIdBTree, Ids),
OldDocInfos = lists:zipwith(
@@ -477,7 +502,7 @@ update_docs_int(Db, DocsList, Options) ->
% Merge the new docs into the revision trees.
{ok, NewDocInfos0, RemoveSeqs, Conflicts, NewSeq} = merge_rev_trees(
lists:member(merge_conflicts, Options),
- DocsList2, OldDocInfos, [], [], [], LastSeq),
+ DocsList, OldDocInfos, [], [], [], LastSeq),
NewFullDocInfos = stem_full_doc_infos(Db, NewDocInfos0),
@@ -518,33 +543,43 @@ update_local_docs(#db{local_docs_btree=Btree}=Db, Docs) ->
Ids = [Id || #doc{id=Id} <- Docs],
OldDocLookups = couch_btree:lookup(Btree, Ids),
BtreeEntries = lists:zipwith(
- fun(#doc{id=Id,deleted=Delete,revs={0,[RevStr]},body=Body}, OldDocLookup) ->
- NewRev = list_to_integer(?b2l(RevStr)),
+ fun(#doc{id=Id,deleted=Delete,revs={0,PrevRevs},body=Body}, OldDocLookup) ->
+ case PrevRevs of
+ [RevStr|_] ->
+ PrevRev = list_to_integer(?b2l(RevStr));
+ [] ->
+ PrevRev = 0
+ end,
OldRev =
case OldDocLookup of
{ok, {_, {OldRev0, _}}} -> OldRev0;
not_found -> 0
end,
- case OldRev + 1 == NewRev of
+ case OldRev == PrevRev of
true ->
case Delete of
- false -> {update, {Id, {NewRev, Body}}};
- true -> {remove, Id}
+ false -> {update, {Id, {PrevRev + 1, PrevRevs, Body}}};
+ true -> {remove, Id, PrevRevs}
end;
false ->
- {conflict, {Id, {0, RevStr}}}
+ {conflict, {Id, {0, PrevRevs}}}
end
-
end, Docs, OldDocLookups),
- BtreeIdsRemove = [Id || {remove, Id} <- BtreeEntries],
- BtreeIdsUpdate = [ByIdDocInfo || {update, ByIdDocInfo} <- BtreeEntries],
- Conflicts = [{conflict, IdRev} || {conflict, IdRev} <- BtreeEntries],
+ BtreeIdsRemove = [Id || {remove, Id, _PrevRevs} <- BtreeEntries],
+ BtreeIdsUpdate = [{Id, {NewRev, Body}} || {update, {Id, {NewRev, _OldRevs, Body}}} <- BtreeEntries],
+ Results =
+ [{{Id, {0, PrevRevs}}, {ok, {0, <<"0">>}}}
+ || {remove, Id, PrevRevs} <- BtreeEntries] ++
+ [{{Id, {0, PrevRevs}}, {ok, {0, ?l2b(integer_to_list(NewRev))}}}
+ || {update, {Id, {NewRev, PrevRevs, _Body}}} <- BtreeEntries] ++
+ [{IdRevs, conflict}
+ || {conflict, IdRevs} <- BtreeEntries],
{ok, Btree2} =
couch_btree:add_remove(Btree, BtreeIdsUpdate, BtreeIdsRemove),
- {ok, Conflicts, Db#db{local_docs_btree = Btree2}}.
+ {ok, Results, Db#db{local_docs_btree = Btree2}}.
commit_data(Db) ->
@@ -594,43 +629,42 @@ commit_data(#db{fd=Fd,header=OldHeader,fsync_options=FsyncOptions}=Db, Delay) ->
end.
-copy_doc_attachments(SrcFd, SrcSp, DestFd) ->
- {ok, {BodyData, BinInfos}} = couch_db:read_doc(SrcFd, SrcSp),
+copy_doc_attachments(#db{fd=SrcFd}=SrcDb, {Pos,_RevId}, SrcSp, DestFd) ->
+ {ok, {BodyData, BinInfos}} = couch_db:read_doc(SrcDb, SrcSp),
% copy the bin values
NewBinInfos = lists:map(
fun({Name, {Type, BinSp, Len}}) when is_tuple(BinSp) orelse BinSp == null ->
% 09 UPGRADE CODE
- {NewBinSp, Len} = couch_stream:old_copy_to_new_stream(SrcFd, BinSp, Len, DestFd),
- {Name, {Type, NewBinSp, Len}};
+ {NewBinSp, Len, Md5} = couch_stream:old_copy_to_new_stream(SrcFd, BinSp, Len, DestFd),
+ {Name, Type, NewBinSp, Len, Pos, Md5};
({Name, {Type, BinSp, Len}}) ->
- {NewBinSp, Len} = couch_stream:copy_to_new_stream(SrcFd, BinSp, DestFd),
- {Name, {Type, NewBinSp, Len}}
+ % 09 UPGRADE CODE
+ {NewBinSp, Len, Md5} = couch_stream:copy_to_new_stream(SrcFd, BinSp, DestFd),
+ {Name, Type, NewBinSp, Len, Pos, Md5};
+ ({Name, Type, BinSp, Len, RevPos, Md5}) ->
+ {NewBinSp, Len, Md5} = couch_stream:copy_to_new_stream(SrcFd, BinSp, DestFd),
+ {Name, Type, NewBinSp, Len, RevPos, Md5}
end, BinInfos),
{BodyData, NewBinInfos}.
-copy_rev_tree_attachments(_SrcFd, _DestFd, []) ->
- [];
-copy_rev_tree_attachments(SrcFd, DestFd, [{Start, Tree} | RestTree]) ->
- % root nner node, only copy info/data from leaf nodes
- [Tree2] = copy_rev_tree_attachments(SrcFd, DestFd, [Tree]),
- [{Start, Tree2} | copy_rev_tree_attachments(SrcFd, DestFd, RestTree)];
-copy_rev_tree_attachments(SrcFd, DestFd, [{RevId, {IsDel, Sp, Seq}, []} | RestTree]) ->
- % This is a leaf node, copy it over
- DocBody = copy_doc_attachments(SrcFd, Sp, DestFd),
- [{RevId, {IsDel, DocBody, Seq}, []} | copy_rev_tree_attachments(SrcFd, DestFd, RestTree)];
-copy_rev_tree_attachments(SrcFd, DestFd, [{RevId, _, SubTree} | RestTree]) ->
- % inner node, only copy info/data from leaf nodes
- [{RevId, ?REV_MISSING, copy_rev_tree_attachments(SrcFd, DestFd, SubTree)} | copy_rev_tree_attachments(SrcFd, DestFd, RestTree)].
-
-
-copy_docs(#db{fd=SrcFd}=Db, #db{fd=DestFd}=NewDb, InfoBySeq, Retry) ->
+copy_rev_tree_attachments(SrcDb, DestFd, Tree) ->
+ couch_key_tree:map(
+ fun(Rev, {IsDel, Sp, Seq}, leaf) ->
+ DocBody = copy_doc_attachments(SrcDb, Rev, Sp, DestFd),
+ {IsDel, DocBody, Seq};
+ (_, _, branch) ->
+ ?REV_MISSING
+ end, Tree).
+
+
+copy_docs(Db, #db{fd=DestFd}=NewDb, InfoBySeq, Retry) ->
Ids = [Id || #doc_info{id=Id} <- InfoBySeq],
LookupResults = couch_btree:lookup(Db#db.fulldocinfo_by_id_btree, Ids),
% write out the attachments
NewFullDocInfos0 = lists:map(
fun({ok, #full_doc_info{rev_tree=RevTree}=Info}) ->
- Info#full_doc_info{rev_tree=copy_rev_tree_attachments(SrcFd, DestFd, RevTree)}
+ Info#full_doc_info{rev_tree=copy_rev_tree_attachments(Db, DestFd, RevTree)}
end, LookupResults),
% write out the docs
% we do this in 2 stages so the docs are written out contiguously, making
@@ -639,7 +673,7 @@ copy_docs(#db{fd=SrcFd}=Db, #db{fd=DestFd}=NewDb, InfoBySeq, Retry) ->
fun(#full_doc_info{rev_tree=RevTree}=Info) ->
Info#full_doc_info{rev_tree=couch_key_tree:map_leafs(
fun(_Key, {IsDel, DocBody, Seq}) ->
- {ok, Pos} = couch_file:append_term(DestFd, DocBody),
+ {ok, Pos} = couch_file:append_term_md5(DestFd, DocBody),
{IsDel, Pos, Seq}
end, RevTree)}
end, NewFullDocInfos0),