summaryrefslogtreecommitdiff
path: root/apps/couch/src/couch_db_updater.erl
diff options
context:
space:
mode:
Diffstat (limited to 'apps/couch/src/couch_db_updater.erl')
-rw-r--r--apps/couch/src/couch_db_updater.erl85
1 files changed, 45 insertions, 40 deletions
diff --git a/apps/couch/src/couch_db_updater.erl b/apps/couch/src/couch_db_updater.erl
index ab078caf..835d188c 100644
--- a/apps/couch/src/couch_db_updater.erl
+++ b/apps/couch/src/couch_db_updater.erl
@@ -204,9 +204,11 @@ handle_cast({compact_done, CompactFilepath}, #db{filepath=Filepath}=Db) ->
couch_file:delete(RootDir, Filepath),
ok = file:rename(CompactFilepath, Filepath),
close_db(Db),
- ok = gen_server:call(couch_server, {db_updated, NewDb2}, infinity),
+ NewDb3 = refresh_validate_doc_funs(NewDb2),
+ ok = gen_server:call(couch_server, {db_updated, NewDb3}, infinity),
+ couch_db_update_notifier:notify({compacted, NewDb3#db.name}),
?LOG_INFO("Compaction for db \"~s\" completed.", [Db#db.name]),
- {noreply, NewDb2#db{compactor_pid=nil}};
+ {noreply, NewDb3#db{compactor_pid=nil}};
false ->
?LOG_INFO("Compaction for ~s still behind main file "
"(update seq=~p. compact update seq=~p). Retrying.",
@@ -518,16 +520,17 @@ send_result(Client, Id, OriginalRevs, NewResult) ->
% used to send a result to the client
catch(Client ! {result, self(), {{Id, OriginalRevs}, NewResult}}).
-merge_rev_trees(_MergeConflicts, [], [], AccNewInfos, AccRemoveSeqs, AccSeq) ->
+merge_rev_trees(_Limit, _Merge, [], [], AccNewInfos, AccRemoveSeqs, AccSeq) ->
{ok, lists:reverse(AccNewInfos), AccRemoveSeqs, AccSeq};
-merge_rev_trees(MergeConflicts, [NewDocs|RestDocsList],
+merge_rev_trees(Limit, MergeConflicts, [NewDocs|RestDocsList],
[OldDocInfo|RestOldInfo], AccNewInfos, AccRemoveSeqs, AccSeq) ->
#full_doc_info{id=Id,rev_tree=OldTree,deleted=OldDeleted,update_seq=OldSeq}
= OldDocInfo,
NewRevTree = lists:foldl(
fun({Client, #doc{revs={Pos,[_Rev|PrevRevs]}}=NewDoc}, AccTree) ->
if not MergeConflicts ->
- case couch_key_tree:merge(AccTree, [couch_db:doc_to_tree(NewDoc)]) of
+ case couch_key_tree:merge(AccTree, couch_db:doc_to_tree(NewDoc),
+ Limit) of
{_NewTree, conflicts} when (not OldDeleted) ->
send_result(Client, Id, {Pos-1,PrevRevs}, conflict),
AccTree;
@@ -558,7 +561,7 @@ merge_rev_trees(MergeConflicts, [NewDocs|RestDocsList],
NewDoc#doc{revs={OldPos, [OldRev]}}),
NewDoc2 = NewDoc#doc{revs={OldPos + 1, [NewRevId, OldRev]}},
{NewTree2, _} = couch_key_tree:merge(AccTree,
- [couch_db:doc_to_tree(NewDoc2)]),
+ couch_db:doc_to_tree(NewDoc2), Limit),
% we changed the rev id, this tells the caller we did
send_result(Client, Id, {Pos-1,PrevRevs},
{ok, {OldPos + 1, NewRevId}}),
@@ -572,15 +575,15 @@ merge_rev_trees(MergeConflicts, [NewDocs|RestDocsList],
end;
true ->
{NewTree, _} = couch_key_tree:merge(AccTree,
- [couch_db:doc_to_tree(NewDoc)]),
+ couch_db:doc_to_tree(NewDoc), Limit),
NewTree
end
end,
OldTree, NewDocs),
if NewRevTree == OldTree ->
% nothing changed
- merge_rev_trees(MergeConflicts, RestDocsList, RestOldInfo, AccNewInfos,
- AccRemoveSeqs, AccSeq);
+ merge_rev_trees(Limit, MergeConflicts, RestDocsList, RestOldInfo,
+ AccNewInfos, AccRemoveSeqs, AccSeq);
true ->
% we have updated the document, give it a new seq #
NewInfo = #full_doc_info{id=Id,update_seq=AccSeq+1,rev_tree=NewRevTree},
@@ -588,8 +591,8 @@ merge_rev_trees(MergeConflicts, [NewDocs|RestDocsList],
0 -> AccRemoveSeqs;
_ -> [OldSeq | AccRemoveSeqs]
end,
- merge_rev_trees(MergeConflicts, RestDocsList, RestOldInfo,
- [NewInfo|AccNewInfos], RemoveSeqs, AccSeq+1)
+ merge_rev_trees(Limit, MergeConflicts, RestDocsList, RestOldInfo,
+ [NewInfo|AccNewInfos], RemoveSeqs, AccSeq+1)
end.
@@ -609,7 +612,8 @@ update_docs_int(Db, DocsList, NonRepDocs, MergeConflicts, FullCommit) ->
#db{
id_tree = DocInfoByIdBTree,
seq_tree = DocInfoBySeqBTree,
- update_seq = LastSeq
+ update_seq = LastSeq,
+ revs_limit = RevsLimit
} = Db,
Ids = [Id || [{_Client, #doc{id=Id}}|_] <- DocsList],
% lookup up the old documents, if they exist.
@@ -622,11 +626,9 @@ update_docs_int(Db, DocsList, NonRepDocs, MergeConflicts, FullCommit) ->
end,
Ids, OldDocLookups),
% Merge the new docs into the revision trees.
- {ok, NewDocInfos0, RemoveSeqs, NewSeq} = merge_rev_trees(
+ {ok, NewFullDocInfos, RemoveSeqs, NewSeq} = merge_rev_trees(RevsLimit,
MergeConflicts, DocsList, OldDocInfos, [], [], LastSeq),
- NewFullDocInfos = stem_full_doc_infos(Db, NewDocInfos0),
-
% All documents are now ready to write.
{ok, Db2} = update_local_docs(Db, NonRepDocs),
@@ -794,15 +796,6 @@ copy_doc_attachments(#db{fd=SrcFd}=SrcDb, {Pos,_RevId}, SrcSp, DestFd) ->
end, BinInfos),
{BodyData, NewBinInfos}.
-copy_rev_tree_attachments(SrcDb, DestFd, Tree) ->
- couch_key_tree:map(
- fun(Rev, {IsDel, Sp, Seq}, leaf) ->
- DocBody = copy_doc_attachments(SrcDb, Rev, Sp, DestFd),
- {IsDel, DocBody, Seq};
- (_, _, branch) ->
- ?REV_MISSING
- end, Tree).
-
merge_lookups(Infos, []) ->
Infos;
merge_lookups([], _) ->
@@ -816,20 +809,19 @@ copy_docs(Db, #db{fd=DestFd}=NewDb, MixedInfos, Retry) ->
% lookup any necessary full_doc_infos
DocInfoIds = [Id || #doc_info{id=Id} <- MixedInfos],
LookupResults = couch_btree:lookup(Db#db.id_tree, DocInfoIds),
- Infos = merge_lookups(MixedInfos, LookupResults),
+ % COUCHDB-968, make sure we prune duplicates during compaction
+ Infos = lists:usort(fun(#full_doc_info{id=A}, #full_doc_info{id=B}) ->
+ A =< B
+ end, merge_lookups(MixedInfos, LookupResults)),
- % write out the attachments
- NewInfos0 = [Info#full_doc_info{rev_tree=copy_rev_tree_attachments(Db,
- DestFd, RevTree)} || #full_doc_info{rev_tree=RevTree}=Info <- Infos],
-
- % write out the docs
- % we do this in 2 stages so the docs are written out contiguously, making
- % view indexing and replication faster.
- NewInfos1 = [Info#full_doc_info{rev_tree=couch_key_tree:map_leafs(
- fun(_Key, {IsDel, DocBody, Seq}) ->
+ NewInfos1 = [Info#full_doc_info{rev_tree=couch_key_tree:map(
+ fun(Rev, {IsDel, Sp, Seq}, leaf) ->
+ DocBody = copy_doc_attachments(Db, Rev, Sp, DestFd),
{ok, Pos} = couch_file:append_term_md5(DestFd, DocBody),
- {IsDel, Pos, Seq}
- end, RevTree)} || #full_doc_info{rev_tree=RevTree}=Info <- NewInfos0],
+ {IsDel, Pos, Seq};
+ (_, _, branch) ->
+ ?REV_MISSING
+ end, RevTree)} || #full_doc_info{rev_tree=RevTree}=Info <- Infos],
NewInfos = stem_full_doc_infos(Db, NewInfos1),
RemoveSeqs =
@@ -900,14 +892,19 @@ copy_compact(Db, NewDb0, Retry) ->
commit_data(NewDb4#db{update_seq=Db#db.update_seq}).
-start_copy_compact(#db{name=Name,filepath=Filepath}=Db) ->
+start_copy_compact(#db{name=Name,filepath=Filepath,header=#db_header{purge_seq=PurgeSeq}}=Db) ->
CompactFile = Filepath ++ ".compact",
?LOG_DEBUG("Compaction process spawned for db \"~s\"", [Name]),
case couch_file:open(CompactFile) of
{ok, Fd} ->
couch_task_status:add_task(<<"Database Compaction">>, <<Name/binary, " retry">>, <<"Starting">>),
Retry = true,
- {ok, Header} = couch_file:read_header(Fd);
+ case couch_file:read_header(Fd) of
+ {ok, Header} ->
+ ok;
+ no_valid_header ->
+ ok = couch_file:write_header(Fd, Header=#db_header{})
+ end;
{error, enoent} ->
couch_task_status:add_task(<<"Database Compaction">>, Name, <<"Starting">>),
{ok, Fd} = couch_file:open(CompactFile, [create]),
@@ -915,8 +912,16 @@ start_copy_compact(#db{name=Name,filepath=Filepath}=Db) ->
ok = couch_file:write_header(Fd, Header=#db_header{})
end,
NewDb = init_db(Name, CompactFile, Fd, Header),
+ NewDb2 = if PurgeSeq > 0 ->
+ {ok, PurgedIdsRevs} = couch_db:get_last_purged(Db),
+ {ok, Pointer} = couch_file:append_term(Fd, PurgedIdsRevs),
+ NewDb#db{header=Header#db_header{purge_seq=PurgeSeq, purged_docs=Pointer}};
+ true ->
+ NewDb
+ end,
unlink(Fd),
- NewDb2 = copy_compact(Db, NewDb, Retry),
- close_db(NewDb2),
+
+ NewDb3 = copy_compact(Db, NewDb2, Retry),
+ close_db(NewDb3),
gen_server:cast(Db#db.main_pid, {compact_done, CompactFile}).