diff options
Diffstat (limited to 'apps/couch/src/couch_db_updater.erl')
-rw-r--r-- | apps/couch/src/couch_db_updater.erl | 85 |
1 files changed, 45 insertions, 40 deletions
diff --git a/apps/couch/src/couch_db_updater.erl b/apps/couch/src/couch_db_updater.erl index ab078caf..835d188c 100644 --- a/apps/couch/src/couch_db_updater.erl +++ b/apps/couch/src/couch_db_updater.erl @@ -204,9 +204,11 @@ handle_cast({compact_done, CompactFilepath}, #db{filepath=Filepath}=Db) -> couch_file:delete(RootDir, Filepath), ok = file:rename(CompactFilepath, Filepath), close_db(Db), - ok = gen_server:call(couch_server, {db_updated, NewDb2}, infinity), + NewDb3 = refresh_validate_doc_funs(NewDb2), + ok = gen_server:call(couch_server, {db_updated, NewDb3}, infinity), + couch_db_update_notifier:notify({compacted, NewDb3#db.name}), ?LOG_INFO("Compaction for db \"~s\" completed.", [Db#db.name]), - {noreply, NewDb2#db{compactor_pid=nil}}; + {noreply, NewDb3#db{compactor_pid=nil}}; false -> ?LOG_INFO("Compaction for ~s still behind main file " "(update seq=~p. compact update seq=~p). Retrying.", @@ -518,16 +520,17 @@ send_result(Client, Id, OriginalRevs, NewResult) -> % used to send a result to the client catch(Client ! {result, self(), {{Id, OriginalRevs}, NewResult}}). -merge_rev_trees(_MergeConflicts, [], [], AccNewInfos, AccRemoveSeqs, AccSeq) -> +merge_rev_trees(_Limit, _Merge, [], [], AccNewInfos, AccRemoveSeqs, AccSeq) -> {ok, lists:reverse(AccNewInfos), AccRemoveSeqs, AccSeq}; -merge_rev_trees(MergeConflicts, [NewDocs|RestDocsList], +merge_rev_trees(Limit, MergeConflicts, [NewDocs|RestDocsList], [OldDocInfo|RestOldInfo], AccNewInfos, AccRemoveSeqs, AccSeq) -> #full_doc_info{id=Id,rev_tree=OldTree,deleted=OldDeleted,update_seq=OldSeq} = OldDocInfo, NewRevTree = lists:foldl( fun({Client, #doc{revs={Pos,[_Rev|PrevRevs]}}=NewDoc}, AccTree) -> if not MergeConflicts -> - case couch_key_tree:merge(AccTree, [couch_db:doc_to_tree(NewDoc)]) of + case couch_key_tree:merge(AccTree, couch_db:doc_to_tree(NewDoc), + Limit) of {_NewTree, conflicts} when (not OldDeleted) -> send_result(Client, Id, {Pos-1,PrevRevs}, conflict), AccTree; @@ -558,7 +561,7 @@ merge_rev_trees(MergeConflicts, [NewDocs|RestDocsList], NewDoc#doc{revs={OldPos, [OldRev]}}), NewDoc2 = NewDoc#doc{revs={OldPos + 1, [NewRevId, OldRev]}}, {NewTree2, _} = couch_key_tree:merge(AccTree, - [couch_db:doc_to_tree(NewDoc2)]), + couch_db:doc_to_tree(NewDoc2), Limit), % we changed the rev id, this tells the caller we did send_result(Client, Id, {Pos-1,PrevRevs}, {ok, {OldPos + 1, NewRevId}}), @@ -572,15 +575,15 @@ merge_rev_trees(MergeConflicts, [NewDocs|RestDocsList], end; true -> {NewTree, _} = couch_key_tree:merge(AccTree, - [couch_db:doc_to_tree(NewDoc)]), + couch_db:doc_to_tree(NewDoc), Limit), NewTree end end, OldTree, NewDocs), if NewRevTree == OldTree -> % nothing changed - merge_rev_trees(MergeConflicts, RestDocsList, RestOldInfo, AccNewInfos, - AccRemoveSeqs, AccSeq); + merge_rev_trees(Limit, MergeConflicts, RestDocsList, RestOldInfo, + AccNewInfos, AccRemoveSeqs, AccSeq); true -> % we have updated the document, give it a new seq # NewInfo = #full_doc_info{id=Id,update_seq=AccSeq+1,rev_tree=NewRevTree}, @@ -588,8 +591,8 @@ merge_rev_trees(MergeConflicts, [NewDocs|RestDocsList], 0 -> AccRemoveSeqs; _ -> [OldSeq | AccRemoveSeqs] end, - merge_rev_trees(MergeConflicts, RestDocsList, RestOldInfo, - [NewInfo|AccNewInfos], RemoveSeqs, AccSeq+1) + merge_rev_trees(Limit, MergeConflicts, RestDocsList, RestOldInfo, + [NewInfo|AccNewInfos], RemoveSeqs, AccSeq+1) end. @@ -609,7 +612,8 @@ update_docs_int(Db, DocsList, NonRepDocs, MergeConflicts, FullCommit) -> #db{ id_tree = DocInfoByIdBTree, seq_tree = DocInfoBySeqBTree, - update_seq = LastSeq + update_seq = LastSeq, + revs_limit = RevsLimit } = Db, Ids = [Id || [{_Client, #doc{id=Id}}|_] <- DocsList], % lookup up the old documents, if they exist. @@ -622,11 +626,9 @@ update_docs_int(Db, DocsList, NonRepDocs, MergeConflicts, FullCommit) -> end, Ids, OldDocLookups), % Merge the new docs into the revision trees. - {ok, NewDocInfos0, RemoveSeqs, NewSeq} = merge_rev_trees( + {ok, NewFullDocInfos, RemoveSeqs, NewSeq} = merge_rev_trees(RevsLimit, MergeConflicts, DocsList, OldDocInfos, [], [], LastSeq), - NewFullDocInfos = stem_full_doc_infos(Db, NewDocInfos0), - % All documents are now ready to write. {ok, Db2} = update_local_docs(Db, NonRepDocs), @@ -794,15 +796,6 @@ copy_doc_attachments(#db{fd=SrcFd}=SrcDb, {Pos,_RevId}, SrcSp, DestFd) -> end, BinInfos), {BodyData, NewBinInfos}. -copy_rev_tree_attachments(SrcDb, DestFd, Tree) -> - couch_key_tree:map( - fun(Rev, {IsDel, Sp, Seq}, leaf) -> - DocBody = copy_doc_attachments(SrcDb, Rev, Sp, DestFd), - {IsDel, DocBody, Seq}; - (_, _, branch) -> - ?REV_MISSING - end, Tree). - merge_lookups(Infos, []) -> Infos; merge_lookups([], _) -> @@ -816,20 +809,19 @@ copy_docs(Db, #db{fd=DestFd}=NewDb, MixedInfos, Retry) -> % lookup any necessary full_doc_infos DocInfoIds = [Id || #doc_info{id=Id} <- MixedInfos], LookupResults = couch_btree:lookup(Db#db.id_tree, DocInfoIds), - Infos = merge_lookups(MixedInfos, LookupResults), + % COUCHDB-968, make sure we prune duplicates during compaction + Infos = lists:usort(fun(#full_doc_info{id=A}, #full_doc_info{id=B}) -> + A =< B + end, merge_lookups(MixedInfos, LookupResults)), - % write out the attachments - NewInfos0 = [Info#full_doc_info{rev_tree=copy_rev_tree_attachments(Db, - DestFd, RevTree)} || #full_doc_info{rev_tree=RevTree}=Info <- Infos], - - % write out the docs - % we do this in 2 stages so the docs are written out contiguously, making - % view indexing and replication faster. - NewInfos1 = [Info#full_doc_info{rev_tree=couch_key_tree:map_leafs( - fun(_Key, {IsDel, DocBody, Seq}) -> + NewInfos1 = [Info#full_doc_info{rev_tree=couch_key_tree:map( + fun(Rev, {IsDel, Sp, Seq}, leaf) -> + DocBody = copy_doc_attachments(Db, Rev, Sp, DestFd), {ok, Pos} = couch_file:append_term_md5(DestFd, DocBody), - {IsDel, Pos, Seq} - end, RevTree)} || #full_doc_info{rev_tree=RevTree}=Info <- NewInfos0], + {IsDel, Pos, Seq}; + (_, _, branch) -> + ?REV_MISSING + end, RevTree)} || #full_doc_info{rev_tree=RevTree}=Info <- Infos], NewInfos = stem_full_doc_infos(Db, NewInfos1), RemoveSeqs = @@ -900,14 +892,19 @@ copy_compact(Db, NewDb0, Retry) -> commit_data(NewDb4#db{update_seq=Db#db.update_seq}). -start_copy_compact(#db{name=Name,filepath=Filepath}=Db) -> +start_copy_compact(#db{name=Name,filepath=Filepath,header=#db_header{purge_seq=PurgeSeq}}=Db) -> CompactFile = Filepath ++ ".compact", ?LOG_DEBUG("Compaction process spawned for db \"~s\"", [Name]), case couch_file:open(CompactFile) of {ok, Fd} -> couch_task_status:add_task(<<"Database Compaction">>, <<Name/binary, " retry">>, <<"Starting">>), Retry = true, - {ok, Header} = couch_file:read_header(Fd); + case couch_file:read_header(Fd) of + {ok, Header} -> + ok; + no_valid_header -> + ok = couch_file:write_header(Fd, Header=#db_header{}) + end; {error, enoent} -> couch_task_status:add_task(<<"Database Compaction">>, Name, <<"Starting">>), {ok, Fd} = couch_file:open(CompactFile, [create]), @@ -915,8 +912,16 @@ start_copy_compact(#db{name=Name,filepath=Filepath}=Db) -> ok = couch_file:write_header(Fd, Header=#db_header{}) end, NewDb = init_db(Name, CompactFile, Fd, Header), + NewDb2 = if PurgeSeq > 0 -> + {ok, PurgedIdsRevs} = couch_db:get_last_purged(Db), + {ok, Pointer} = couch_file:append_term(Fd, PurgedIdsRevs), + NewDb#db{header=Header#db_header{purge_seq=PurgeSeq, purged_docs=Pointer}}; + true -> + NewDb + end, unlink(Fd), - NewDb2 = copy_compact(Db, NewDb, Retry), - close_db(NewDb2), + + NewDb3 = copy_compact(Db, NewDb2, Retry), + close_db(NewDb3), gen_server:cast(Db#db.main_pid, {compact_done, CompactFile}). |