% Licensed under the Apache License, Version 2.0 (the "License"); you may not
% use this file except in compliance with the License. You may obtain a copy of
% the License at
%
%   http://www.apache.org/licenses/LICENSE-2.0
%
% Unless required by applicable law or agreed to in writing, software
% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
% License for the specific language governing permissions and limitations under
% the License.

-module(couch_db_updater).
-behaviour(gen_server).

-export([btree_by_id_reduce/2,btree_by_seq_reduce/2]).
-export([less_docid/2]).
-export([init/1,terminate/2,handle_call/3,handle_cast/2,code_change/3,handle_info/2]).

-include("couch_db.hrl").


% gen_server init callback. The argument is {MainPid, DbName, Filepath, Fd,
% Options}. When Options contains `create`, a default #db_header{} is written
% to Fd and any leftover Filepath ++ ".compact" file is deleted; otherwise the
% existing header is upgraded in place (if needed) and read back. The
% resulting #db{} state has its validation funs loaded and main_pid set.
init({MainPid, DbName, Filepath, Fd, Options}) ->
    Header =
        case lists:member(create, Options) of
        true ->
            % brand new database: start from a default header
            FreshHeader = #db_header{},
            ok = couch_file:write_header(Fd, FreshHeader),
            % remove any stale compaction file; the result is deliberately
            % ignored since the file usually does not exist
            file:delete(Filepath ++ ".compact"),
            FreshHeader;
        false ->
            ok = couch_file:upgrade_old_header(Fd, <<$g, $m, $k, 0>>), % 09 UPGRADE CODE
            {ok, StoredHeader} = couch_file:read_header(Fd),
            StoredHeader
        end,
    Db = refresh_validate_doc_funs(init_db(DbName, Filepath, Fd, Header)),
    {ok, Db#db{main_pid=MainPid}}.


% gen_server terminate callback: hands Reason to
% couch_util:terminate_linked/1 and returns ok.
terminate(Reason, _Srv) ->
    couch_util:terminate_linked(Reason),
    ok.
% gen_server call handler. The server state is the #db{} record. Every clause
% that changes the state pushes the new #db{} to the main pid with a
% {db_updated, Db} call before replying.
%
% Requests:
%   get_db                            -> {ok, Db}
%   {update_docs, DocActions, Opts}   -> {ok, Conflicts} | retry
%   full_commit                       -> ok
%   increment_update_seq              -> {ok, NewUpdateSeq}
%   {set_admins, NewAdmins}           -> ok
%   {set_revs_limit, Limit}           -> ok
%   {purge_docs, IdRevs}              -> {ok, NewUpdateSeq, IdRevsPurged}
%                                      | {error, purge_during_compaction}
handle_call(get_db, _From, Db) ->
    {reply, {ok, Db}, Db};
handle_call({update_docs, DocActions, Options}, _From, Db) ->
    % only the update itself is protected by the try; a thrown `retry`
    % (raised by flush_trees) leaves the state untouched
    try update_docs_int(Db, DocActions, Options) of
    {ok, Conflicts, Db2} ->
        ok = gen_server:call(Db#db.main_pid, {db_updated, Db2}),
        couch_db_update_notifier:notify({updated, Db2#db.name}),
        {reply, {ok, Conflicts}, Db2}
    catch
        throw: retry ->
            {reply, retry, Db}
    end;
handle_call(full_commit, _From, #db{waiting_delayed_commit=nil}=Db) ->
    {reply, ok, Db}; % no data waiting, return ok immediately
handle_call(full_commit, _From, Db) ->
    {reply, ok, commit_data(Db)}; % commit the data and return ok
handle_call(increment_update_seq, _From, Db) ->
    Db2 = commit_data(Db#db{update_seq=Db#db.update_seq+1}),
    ok = gen_server:call(Db2#db.main_pid, {db_updated, Db2}),
    couch_db_update_notifier:notify({updated, Db#db.name}),
    {reply, {ok, Db2#db.update_seq}, Db2};

handle_call({set_admins, NewAdmins}, _From, Db) ->
    % the admins term is appended to the file; only its pointer goes into
    % the header on commit
    {ok, Ptr} = couch_file:append_term(Db#db.fd, NewAdmins),
    Db2 = commit_data(Db#db{admins=NewAdmins, admins_ptr=Ptr,
            update_seq=Db#db.update_seq+1}),
    ok = gen_server:call(Db2#db.main_pid, {db_updated, Db2}),
    {reply, ok, Db2};

handle_call({set_revs_limit, Limit}, _From, Db) ->
    Db2 = commit_data(Db#db{revs_limit=Limit,
            update_seq=Db#db.update_seq+1}),
    ok = gen_server:call(Db2#db.main_pid, {db_updated, Db2}),
    {reply, ok, Db2};

handle_call({purge_docs, _IdRevs}, _From,
        #db{compactor_pid=Pid}=Db) when Pid /= nil ->
    % purging is refused while a compactor process is registered
    {reply, {error, purge_during_compaction}, Db};
handle_call({purge_docs, IdRevs}, _From, Db) ->
    #db{
        fd=Fd,
        fulldocinfo_by_id_btree = DocInfoByIdBTree,
        docinfo_by_seq_btree = DocInfoBySeqBTree,
        update_seq = LastSeq,
        header = Header = #db_header{purge_seq=PurgeSeq}
        } = Db,
    DocLookups = couch_btree:lookup(DocInfoByIdBTree,
            [Id || {Id, _Revs} <- IdRevs]),

    % For each requested {Id, Revs}: nil when the doc is missing or none of
    % the revs were leaves, otherwise {UpdatedFullDocInfo, RemovedRevs}.
    NewDocInfos = lists:zipwith(
        fun({_Id, Revs}, {ok, #full_doc_info{rev_tree=Tree}=FullDocInfo}) ->
            case couch_key_tree:remove_leafs(Tree, Revs) of
            {_, []=_RemovedRevs} -> % no change
                nil;
            {NewTree, RemovedRevs} ->
                {FullDocInfo#full_doc_info{rev_tree=NewTree},RemovedRevs}
            end;
        (_, not_found) ->
            nil
        end,
        IdRevs, DocLookups),

    % The comprehensions below skip the nil entries via their patterns.
    SeqsToRemove = [Seq
            || {#full_doc_info{update_seq=Seq},_} <- NewDocInfos],

    % docs that still have revisions left after the purge
    FullDocInfoToUpdate = [FullInfo
            || {#full_doc_info{rev_tree=Tree}=FullInfo,_}
            <- NewDocInfos, Tree /= []],

    IdRevsPurged = [{Id, Revs}
            || {#full_doc_info{id=Id}, Revs} <- NewDocInfos],

    % Give every surviving doc a fresh seq so it reappears in the by-seq
    % index. map_leafs calls back with (RevId, LeafValue) and the leaf values
    % of the by-id tree are {IsDeleted, BodyPointer, UpdateSeq} tuples (see
    % btree_by_id_join/2 and copy_docs/4), so the callback must be 2-arity
    % and rebuild that tuple rather than update a #rev_info{} record.
    {DocInfoToUpdate, NewSeq} = lists:mapfoldl(
        fun(#full_doc_info{rev_tree=Tree}=FullInfo, SeqAcc) ->
            Tree2 = couch_key_tree:map_leafs(
                fun(_RevId, {IsDeleted, BodyPointer, _OldSeq}) ->
                    {IsDeleted, BodyPointer, SeqAcc + 1}
                end, Tree),
            {couch_doc:to_doc_info(FullInfo#full_doc_info{rev_tree=Tree2}),
                SeqAcc + 1}
        end, LastSeq, FullDocInfoToUpdate),

    % docs whose every revision was purged are dropped from the by-id index
    IdsToRemove = [Id || {#full_doc_info{id=Id,rev_tree=[]},_}
            <- NewDocInfos],

    {ok, DocInfoBySeqBTree2} = couch_btree:add_remove(DocInfoBySeqBTree,
            DocInfoToUpdate, SeqsToRemove),
    {ok, DocInfoByIdBTree2} = couch_btree:add_remove(DocInfoByIdBTree,
            FullDocInfoToUpdate, IdsToRemove),
    % record what was purged; the header keeps a pointer to this term
    {ok, Pointer} = couch_file:append_term(Fd, IdRevsPurged),

    Db2 = commit_data(
        Db#db{
            fulldocinfo_by_id_btree = DocInfoByIdBTree2,
            docinfo_by_seq_btree = DocInfoBySeqBTree2,
            update_seq = NewSeq + 1,
            header=Header#db_header{purge_seq=PurgeSeq+1, purged_docs=Pointer}}),

    ok = gen_server:call(Db2#db.main_pid, {db_updated, Db2}),
    couch_db_update_notifier:notify({updated, Db#db.name}),
    {reply, {ok, Db2#db.update_seq, IdRevsPurged}, Db2}.
% gen_server cast handler.
%
% start_compact: when no compactor is registered, spawns a linked process
% running start_copy_compact/1 on the current state, records its pid and
% tells the main pid about the new state. A no-op while a compaction is
% already running.
%
% {compact_done, CompactFilepath}: sent by the compactor when it has finished
% copying. If the compacted file has caught up with the live update_seq the
% files are swapped, otherwise a new compaction pass is started.
handle_cast(start_compact, #db{compactor_pid=nil}=Db) ->
    ?LOG_INFO("Starting compaction for db \"~s\"", [Db#db.name]),
    Compactor = spawn_link(fun() -> start_copy_compact(Db) end),
    Compacting = Db#db{compactor_pid=Compactor},
    ok = gen_server:call(Db#db.main_pid, {db_updated, Compacting}),
    {noreply, Compacting};
handle_cast(start_compact, #db{}=Db) ->
    % compact currently running, this is a no-op
    {noreply, Db};
handle_cast({compact_done, CompactFilepath}, #db{filepath=Filepath}=Db) ->
    {ok, CompactFd} = couch_file:open(CompactFilepath),
    {ok, CompactHeader} = couch_file:read_header(CompactFd),
    #db{update_seq=CompactSeq} = CompactDb =
            init_db(Db#db.name, Filepath, CompactFd, CompactHeader),
    unlink(CompactFd),
    if
    Db#db.update_seq == CompactSeq ->
        {noreply, swap_in_compacted_db(Db, CompactDb, CompactFilepath)};
    true ->
        ?LOG_INFO("Compaction file still behind main file "
            "(update seq=~p. compact update seq=~p). Retrying.",
            [Db#db.update_seq, CompactSeq]),
        close_db(CompactDb),
        Retrier = spawn_link(fun() -> start_copy_compact(Db) end),
        {noreply, Db#db{compactor_pid=Retrier}}
    end.

% Replaces the live database file with the caught-up compaction file and
% returns the new state (compactor_pid cleared). The local docs are not
% copied by the compactor, so they are transferred here before the commit.
swap_in_compacted_db(Db, CompactDb, CompactFilepath) ->
    #db{filepath=Filepath, main_pid=MainPid, name=Name} = Db,
    % suck up all the local docs into memory and write them to the new db
    Collect = fun(Value, _Offset, Acc) -> {ok, [Value | Acc]} end,
    {ok, LocalDocs} = couch_btree:foldl(Db#db.local_docs_btree, Collect, []),
    {ok, LocalBtree} = couch_btree:add(CompactDb#db.local_docs_btree, LocalDocs),
    Committed = commit_data(
        CompactDb#db{
            local_docs_btree=LocalBtree,
            main_pid=MainPid,
            filepath=Filepath}),

    ?LOG_DEBUG("CouchDB swapping files ~s and ~s.",
            [Filepath, CompactFilepath]),
    % result of the delete is ignored; the rename below is what must succeed
    file:delete(Filepath),
    ok = file:rename(CompactFilepath, Filepath),
    close_db(Db),
    ok = gen_server:call(MainPid, {db_updated, Committed}),
    ?LOG_INFO("Compaction for db \"~s\" completed.", [Name]),
    Committed#db{compactor_pid=nil}.


% Timer message scheduled by commit_data/2: perform the pending commit.
handle_info(delayed_commit, Db) ->
    {noreply, commit_data(Db)}.

% No state conversion is performed on code upgrade.
code_change(_OldVsn, State, _Extra) ->
    {ok, State}.
% Split fun for the by-seq btree: turns a #doc_info{} into the {Key, Value}
% pair that is stored. The key is the doc's high_seq; live and deleted revs
% are kept in two separate lists of {Rev, Seq, BodyPointer}.
btree_by_seq_split(#doc_info{id=Id, high_seq=KeySeq, revs=Revs}) ->
    RevInfos = [{Rev, Seq, Bp} ||
        #rev_info{rev=Rev,seq=Seq,deleted=false,body_sp=Bp} <- Revs],
    DeletedRevInfos = [{Rev, Seq, Bp} ||
        #rev_info{rev=Rev,seq=Seq,deleted=true,body_sp=Bp} <- Revs],
    {KeySeq,{Id, RevInfos, DeletedRevInfos}}.

% Join fun for the by-seq btree: rebuilds a #doc_info{} from a stored
% key/value pair. Live revs come first, followed by deleted revs.
btree_by_seq_join(KeySeq, {Id, RevInfos, DeletedRevInfos}) ->
    #doc_info{
        id = Id,
        high_seq=KeySeq,
        revs =
            [#rev_info{rev=Rev,seq=Seq,deleted=false,body_sp = Bp} ||
                {Rev, Seq, Bp} <- RevInfos] ++
            [#rev_info{rev=Rev,seq=Seq,deleted=true,body_sp = Bp} ||
                {Rev, Seq, Bp} <- DeletedRevInfos]};
btree_by_seq_join(KeySeq,{Id, Rev, Bp, Conflicts, DelConflicts, Deleted}) ->
    % 09 UPGRADE CODE
    % this is the 0.9.0 and earlier by_seq record. It's missing the body pointers
    % and individual seq nums for conflicts that are currently in the index,
    % meaning the filtered _changes api will not work except for on main docs.
    % Simply compact a 0.9.0 database to upgrade the index.
    #doc_info{
        id=Id,
        high_seq=KeySeq,
        revs =
            [#rev_info{rev=Rev,seq=KeySeq,deleted=Deleted,body_sp=Bp}] ++
            [#rev_info{rev=Rev1,seq=KeySeq,deleted=false} || Rev1 <- Conflicts] ++
            [#rev_info{rev=Rev2,seq=KeySeq,deleted=true} || Rev2 <- DelConflicts]}.

% Split fun for the by-id btree: turns a #full_doc_info{} into
% {Id, {UpdateSeq, DeletedFlag, DiskTree}}. Boolean deleted flags are stored
% as 1/0, both for the doc and for every rev tree node value.
btree_by_id_split(#full_doc_info{id=Id, update_seq=Seq,
        deleted=Deleted, rev_tree=Tree}) ->
    DiskTree =
    couch_key_tree:map(
        fun(_RevId, {IsDeleted, BodyPointer, UpdateSeq}) ->
            {if IsDeleted -> 1; true -> 0 end, BodyPointer, UpdateSeq};
        (_RevId, ?REV_MISSING) ->
            ?REV_MISSING
        end, Tree),
    {Id, {Seq, if Deleted -> 1; true -> 0 end, DiskTree}}.

% Join fun for the by-id btree: the inverse of btree_by_id_split/1, turning
% the stored 1/0 flags back into booleans.
btree_by_id_join(Id, {HighSeq, Deleted, DiskTree}) ->
    Tree =
    couch_key_tree:map(
        fun(_RevId, {IsDeleted, BodyPointer, UpdateSeq}) ->
            {IsDeleted == 1, BodyPointer, UpdateSeq};
        (_RevId, ?REV_MISSING) ->
            ?REV_MISSING;
        (_RevId, {IsDeleted, BodyPointer}) ->
            % 09 UPGRADE CODE
            % this is the 0.9.0 and earlier rev info record. It's missing the seq
            % nums, which means couchdb will sometimes reexamine unchanged
            % documents with the _changes API.
            % This is fixed by compacting the database.
            {IsDeleted, BodyPointer, HighSeq}
        end, DiskTree),

    #full_doc_info{id=Id, update_seq=HighSeq, deleted=Deleted==1, rev_tree=Tree}.

% Reduce fun for the by-id btree. The reduction is
% {NotDeletedCount, DeletedCount}; rereduce sums both components.
btree_by_id_reduce(reduce, FullDocInfos) ->
    % count the number of not deleted documents
    {length([1 || #full_doc_info{deleted=false} <- FullDocInfos]),
        length([1 || #full_doc_info{deleted=true} <- FullDocInfos])};
btree_by_id_reduce(rereduce, Reds) ->
    {lists:sum([Count || {Count,_} <- Reds]),
        lists:sum([DelCount || {_, DelCount} <- Reds])}.

% Reduce fun for the by-seq btree: the reduction is the number of entries.
btree_by_seq_reduce(reduce, DocInfos) ->
    % count the number of documents
    length(DocInfos);
btree_by_seq_reduce(rereduce, Reds) ->
    lists:sum(Reds).

% Pads the tuple Old with the trailing elements of the tuple New so that a
% record written by an older version (fewer fields) gains the defaults of
% the fields added since. A tuple that is already as large as New - or
% larger, e.g. written by a newer version - is returned unchanged so the
% caller can inspect it, instead of crashing in lists:sublist/3 with a
% negative length.
simple_upgrade_record(Old, New) when tuple_size(Old) < tuple_size(New) ->
    OldSize = tuple_size(Old),
    NewValuesTail =
        lists:sublist(tuple_to_list(New), OldSize + 1, tuple_size(New) - OldSize),
    list_to_tuple(tuple_to_list(Old) ++ NewValuesTail);
simple_upgrade_record(Old, New) when tuple_size(Old) >= tuple_size(New) ->
    Old.

% Ordering fun for doc ids: true when A sorts strictly before B.
% used for doc insertion, also for the PassedEndFun on all_docs view
less_docid(A, B) when A==B -> false;
less_docid(nil, _) -> true; % nil - special key sorts before all
less_docid({}, _) -> false; % {} -> special key sorts after all
less_docid(A, B) -> A < B.
% Builds the in-memory #db{} state for the database file open on Fd.
% Header0 is the header term read from (or just written to) the file; it is
% first padded to the current #db_header{} shape and checked against the
% known disk versions. Throws {database_disk_version_error, Msg} for an
% unknown version. Also syncs the file when `on_file_open` is configured,
% opens the three btrees, loads the admins term and starts a ref counter
% for Fd.
init_db(DbName, Filepath, Fd, Header0) ->
    Upgraded = simple_upgrade_record(Header0, #db_header{}),
    Header =
        case element(2, Upgraded) of
        Legacy when Legacy =:= 1; Legacy =:= 2 ->
            % 1: 0.9, 2: post 0.9 and pre 0.10
            Upgraded#db_header{unused = 0};
        ?LATEST_DISK_VERSION ->
            Upgraded;
        _ ->
            throw({database_disk_version_error, "Incorrect disk header version"})
        end,

    {ok, FsyncOptions} = couch_util:parse_term(
            couch_config:get("couchdb", "fsync_options",
                    "[before_header, after_header, on_file_open]")),

    case lists:member(on_file_open, FsyncOptions) of
    true -> ok = couch_file:sync(Fd);
    _ -> ok
    end,

    #db_header{
        fulldocinfo_by_id_btree_state = IdBtreeState,
        docinfo_by_seq_btree_state = SeqBtreeState,
        local_docs_btree_state = LocalBtreeState,
        admins_ptr = AdminsPtr,
        update_seq = UpdateSeq,
        revs_limit = RevsLimit
        } = Header,

    {ok, IdBtree} = couch_btree:open(IdBtreeState, Fd,
        [{split, fun btree_by_id_split/1},
        {join, fun btree_by_id_join/2},
        {reduce, fun btree_by_id_reduce/2},
        {less, fun less_docid/2}]),
    {ok, SeqBtree} = couch_btree:open(SeqBtreeState, Fd,
        [{split, fun btree_by_seq_split/1},
        {join, fun btree_by_seq_join/2},
        {reduce, fun btree_by_seq_reduce/2}]),
    {ok, LocalDocsBtree} = couch_btree:open(LocalBtreeState, Fd),

    % a nil pointer means no admins term has ever been stored
    Admins =
        case AdminsPtr of
        nil ->
            [];
        _ ->
            {ok, StoredAdmins} = couch_file:pread_term(Fd, AdminsPtr),
            StoredAdmins
        end,

    % convert start time tuple to microsecs and store as a binary string
    {MegaSecs, Secs, MicroSecs} = now(),
    StartMicros = (MegaSecs*1000000*1000000) + (Secs*1000000) + MicroSecs,
    StartTime = ?l2b(io_lib:format("~p", [StartMicros])),
    {ok, RefCntr} = couch_ref_counter:start([Fd]),
    #db{
        update_pid=self(),
        fd=Fd,
        fd_ref_counter = RefCntr,
        header=Header,
        fulldocinfo_by_id_btree = IdBtree,
        docinfo_by_seq_btree = SeqBtree,
        local_docs_btree = LocalDocsBtree,
        committed_update_seq = UpdateSeq,
        update_seq = UpdateSeq,
        name = DbName,
        filepath = Filepath,
        admins = Admins,
        admins_ptr = AdminsPtr,
        instance_start_time = StartTime,
        revs_limit = RevsLimit,
        fsync_options = FsyncOptions
        }.


% Drops this process's reference on the db's file ref counter.
close_db(#db{}=Db) ->
    couch_ref_counter:drop(Db#db.fd_ref_counter).


% Reloads validate_doc_funs from the design docs currently in the database.
% Design docs for which couch_doc:get_validate_doc_fun/1 returns nil
% contribute nothing.
refresh_validate_doc_funs(Db) ->
    {ok, DesignDocs} = get_design_docs(Db),
    ValidateFuns =
        [Fun || DesignDoc <- DesignDocs,
            Fun <- [couch_doc:get_validate_doc_fun(DesignDoc)],
            Fun =/= nil],
    Db#db{validate_doc_funs=ValidateFuns}.

% Folds over the by-id btree starting at <<"_design/">> and opens every doc
% whose id has that prefix, stopping at the first id without it. Docs are
% consed onto the accumulator, so they come back in reverse fold order.
get_design_docs(#db{fulldocinfo_by_id_btree=ByIdBtree}=Db) ->
    CollectDesignDoc =
        fun(#full_doc_info{id= <<"_design/",_/binary>>}=Info, _Reds, Docs) ->
            {ok, Doc} = couch_db:open_doc_int(Db, Info, []),
            {ok, [Doc | Docs]};
        (_NotDesign, _Reds, Docs) ->
            {stop, Docs}
        end,
    couch_btree:foldl(ByIdBtree, <<"_design/">>, CollectDesignDoc, []).

% rev tree functions

% Writes every unwritten doc summary found in the rev trees of the given
% #full_doc_info{} list to the db file and replaces it in the tree with
% {IsDeleted, SummaryPointer, UpdateSeq}. Returns {ok, FlushedInfos} in input
% order, preceded by the reversed contents of the accumulator argument.
% Throws `retry` when an attachment was written through a different fd.
flush_trees(Db, UnflushedInfos, AccFlushedTrees) ->
    Flushed = [flush_tree(Db, Info) || Info <- UnflushedInfos],
    {ok, lists:reverse(AccFlushedTrees, Flushed)}.

% Flushes the rev tree of a single #full_doc_info{}.
flush_tree(#db{fd=Fd}, #full_doc_info{update_seq=UpdateSeq, rev_tree=Unflushed}=Info) ->
    Flushed = couch_key_tree:map(
        fun(_Rev, #doc{}=Doc) ->
            % this node value is actually an unwritten document summary,
            % write to disk.
            write_doc_summary(Fd, Doc, UpdateSeq);
        (_Rev, AlreadyFlushed) ->
            AlreadyFlushed
        end, Unflushed),
    Info#full_doc_info{rev_tree=Flushed}.

% Appends {Body, Bins} for one doc to Fd and returns the value that takes
% the doc's place in the rev tree.
write_doc_summary(Fd, #doc{attachments=Atts, deleted=IsDeleted, body=Body}, UpdateSeq) ->
    Bins = attachment_bins(Fd, Atts),
    {ok, SummaryPointer} = couch_file:append_term(Fd, {Body, Bins}),
    {IsDeleted, SummaryPointer, UpdateSeq}.

% Strips the fd from the attachment entries, after making sure the fd of the
% first attachment is the same Fd we are writing to.
attachment_bins(_Fd, []) ->
    [];
attachment_bins(Fd, [{_Name, {_Type, {BinFd, _Sp, _Len}}} | _]=Atts) when BinFd == Fd ->
    % convert bins, removing the FD.
    % All bins should have been flushed to disk already.
    [{BinName, {BinType, BinSp, BinLen}}
        || {BinName, {BinType, {_Fd, BinSp, BinLen}}} <- Atts];
attachment_bins(_Fd, _Atts) ->
    % BinFd must not equal our Fd. This can happen when a database
    % is being switched out during a compaction
    ?LOG_DEBUG("File where the attachments are written has changed. Possibly retrying.", []),
    throw(retry).
% Merges each group of new docs into the rev tree of the matching old
% #full_doc_info{} (the two lists are walked in lockstep).
%
% Returns {ok, NewInfos, RemoveSeqs, Conflicts, LastSeq} where NewInfos holds
% one fresh #full_doc_info{} per doc whose tree changed (in input order),
% RemoveSeqs the previous non-zero update seqs of those docs, Conflicts the
% {{Id, {Pos, Rev}}, conflict} entries for rejected edits, and LastSeq the
% last sequence number handed out.
%
% An edit that merges as a conflict is rejected only when the old doc is not
% deleted and MergeConflicts is false; otherwise the merged tree is kept.
merge_rev_trees(_MergeConflicts, [], [], AccNewInfos, AccRemoveSeqs, AccConflicts, AccSeq) ->
    {ok, lists:reverse(AccNewInfos), AccRemoveSeqs, AccConflicts, AccSeq};
merge_rev_trees(MergeConflicts, [NewDocs|RestDocsList],
        [#full_doc_info{id=Id,rev_tree=OldTree,deleted=OldDeleted,update_seq=OldSeq}|RestOldInfo],
        AccNewInfos, AccRemoveSeqs, AccConflicts, AccSeq) ->
    MergeDoc =
        fun(#doc{revs={Pos,[Rev|_]}}=NewDoc, {TreeAcc, ConflictAcc}) ->
            case couch_key_tree:merge(TreeAcc, [couch_db:doc_to_tree(NewDoc)]) of
            {_Merged, conflicts} when not (OldDeleted or MergeConflicts) ->
                % rejected: keep the tree as it was and report the edit
                {TreeAcc, [{{Id, {Pos,Rev}}, conflict} | ConflictAcc]};
            {Merged, _} ->
                {Merged, ConflictAcc}
            end
        end,
    {NewRevTree, NewConflicts} =
            lists:foldl(MergeDoc, {OldTree, AccConflicts}, NewDocs),
    {NewInfos, RemoveSeqs, NextSeq} =
        case NewRevTree == OldTree of
        true ->
            % nothing changed
            {AccNewInfos, AccRemoveSeqs, AccSeq};
        false ->
            % we have updated the document, give it a new seq #
            Updated = #full_doc_info{id=Id,update_seq=AccSeq+1,rev_tree=NewRevTree},
            Stale =
                case OldSeq of
                0 -> AccRemoveSeqs; % no previous by-seq entry to remove
                _ -> [OldSeq | AccRemoveSeqs]
                end,
            {[Updated|AccNewInfos], Stale, AccSeq+1}
        end,
    merge_rev_trees(MergeConflicts, RestDocsList, RestOldInfo,
            NewInfos, RemoveSeqs, NewConflicts, NextSeq).


% Derives the index entries for a list of flushed #full_doc_info{}s. Each
% by-id entry gets its deleted flag from the first rev of the corresponding
% #doc_info{}, which itself becomes the by-seq entry. Both results are
% consed onto the given accumulators, i.e. in reverse input order.
new_index_entries(FullDocInfos, AccById, AccBySeq) ->
    lists:foldl(
        fun(FullDocInfo, {ById, BySeq}) ->
            #doc_info{revs=[#rev_info{deleted=Deleted}|_]} = DocInfo =
                    couch_doc:to_doc_info(FullDocInfo),
            {[FullDocInfo#full_doc_info{deleted=Deleted}|ById], [DocInfo|BySeq]}
        end,
        {AccById, AccBySeq}, FullDocInfos).


% Stems the rev tree of every #full_doc_info{} to the db's revs_limit.
stem_full_doc_infos(#db{revs_limit=Limit}, DocInfos) ->
    [Info#full_doc_info{
            rev_tree=couch_key_tree:stem(Info#full_doc_info.rev_tree, Limit)}
        || #full_doc_info{}=Info <- DocInfos].
% Applies a batch of document updates to the database state.
%
% DocsList is a list of doc groups (each a non-empty list of #doc{}); only
% the first doc of a group decides whether the group is a local doc.
% Options may contain `merge_conflicts` (passed on to merge_rev_trees/7) and
% `full_commit` (commit immediately instead of delaying).
%
% Returns {ok, Conflicts, NewDb}, with the local-doc conflicts listed before
% the rev-tree conflicts. A `retry` thrown by flush_trees/3 propagates.
update_docs_int(Db, DocsList, Options) ->
    #db{
        fulldocinfo_by_id_btree = DocInfoByIdBTree,
        docinfo_by_seq_btree = DocInfoBySeqBTree,
        update_seq = LastSeq
        } = Db,
    % separate out the NonRep documents from the rest of the documents
    % (for a local-doc group only its first doc is kept; both result lists
    % are in reverse input order)
    {DocsList2, NonRepDocs} = lists:foldl(
        fun([#doc{id=Id}=Doc | _]=Docs, {DocsListAcc, NonRepDocsAcc}) ->
            case Id of
            <<?LOCAL_DOC_PREFIX, _/binary>> ->
                {DocsListAcc, [Doc | NonRepDocsAcc]};
            Id->
                {[Docs | DocsListAcc], NonRepDocsAcc}
            end
        end, {[], []}, DocsList),

    Ids = [Id || [#doc{id=Id}|_] <- DocsList2],

    % look up the old documents, if they exist.
    OldDocLookups = couch_btree:lookup(DocInfoByIdBTree, Ids),
    % a doc that is not in the index yet starts from an empty #full_doc_info{}
    OldDocInfos = lists:zipwith(
        fun(_Id, {ok, FullDocInfo}) ->
            FullDocInfo;
        (Id, not_found) ->
            #full_doc_info{id=Id}
        end,
        Ids, OldDocLookups),

    % Merge the new docs into the revision trees.
    {ok, NewDocInfos0, RemoveSeqs, Conflicts, NewSeq} = merge_rev_trees(
            lists:member(merge_conflicts, Options),
            DocsList2, OldDocInfos, [], [], [], LastSeq),

    NewFullDocInfos = stem_full_doc_infos(Db, NewDocInfos0),

    % All documents are now ready to write.

    {ok, LocalConflicts, Db2} = update_local_docs(Db, NonRepDocs),

    % Write out the document summaries (the bodies are stored in the nodes of
    % the trees, the attachments are already written to disk)
    {ok, FlushedFullDocInfos} = flush_trees(Db2, NewFullDocInfos, []),

    {IndexFullDocInfos, IndexDocInfos} =
            new_index_entries(FlushedFullDocInfos, [], []),

    % and the indexes
    {ok, DocInfoByIdBTree2} = couch_btree:add_remove(DocInfoByIdBTree, IndexFullDocInfos, []),
    {ok, DocInfoBySeqBTree2} = couch_btree:add_remove(DocInfoBySeqBTree, IndexDocInfos, RemoveSeqs),

    Db3 = Db2#db{
        fulldocinfo_by_id_btree = DocInfoByIdBTree2,
        docinfo_by_seq_btree = DocInfoBySeqBTree2,
        update_seq = NewSeq},

    % Check if we just updated any design documents, and update the validation
    % funs if we did.
    case [1 || <<"_design/",_/binary>> <- Ids] of
    [] ->
        Db4 = Db3;
    _ ->
        Db4 = refresh_validate_doc_funs(Db3)
    end,

    % the commit is delayed unless the caller asked for a full commit
    {ok, LocalConflicts ++ Conflicts, commit_data(Db4, not lists:member(full_commit, Options))}.


% Applies updates to local (non-replicated) docs in the local docs btree.
%
% Each doc must carry revs = {0, [RevStr]} with RevStr a binary holding an
% integer. An update is accepted only when that integer is exactly one more
% than the stored rev (0 when the doc does not exist yet); an accepted update
% with deleted=true removes the doc. Returns {ok, Conflicts, NewDb}.
%
% NOTE(review): conflicts are reported here as {conflict, {Id, {0, RevStr}}},
% whereas merge_rev_trees/7 produces {{Id, {Pos, Rev}}, conflict}, and
% update_docs_int/3 concatenates both lists - confirm that callers expect
% both shapes.
update_local_docs(#db{local_docs_btree=Btree}=Db, Docs) ->
    Ids = [Id || #doc{id=Id} <- Docs],
    OldDocLookups = couch_btree:lookup(Btree, Ids),
    BtreeEntries = lists:zipwith(
        fun(#doc{id=Id,deleted=Delete,revs={0,[RevStr]},body=Body}, OldDocLookup) ->
            NewRev = list_to_integer(?b2l(RevStr)),
            OldRev =
            case OldDocLookup of
                {ok, {_, {OldRev0, _}}} -> OldRev0;
                not_found -> 0
            end,
            case OldRev + 1 == NewRev of
            true ->
                case Delete of
                    false -> {update, {Id, {NewRev, Body}}};
                    true  -> {remove, Id}
                end;
            false ->
                {conflict, {Id, {0, RevStr}}}
            end
        end, Docs, OldDocLookups),

    BtreeIdsRemove = [Id || {remove, Id} <- BtreeEntries],
    BtreeIdsUpdate = [ByIdDocInfo || {update, ByIdDocInfo} <- BtreeEntries],
    Conflicts = [{conflict, IdRev} || {conflict, IdRev} <- BtreeEntries],

    {ok, Btree2} =
        couch_btree:add_remove(Btree, BtreeIdsUpdate, BtreeIdsRemove),

    {ok, Conflicts, Db#db{local_docs_btree = Btree2}}.


% Commits immediately (no delay); see commit_data/2.
commit_data(Db) ->
    commit_data(Db, false).

% Returns Header updated with the values currently held in the #db{} state:
% the update seq, the root states of the three btrees, the admins pointer
% and the revs limit. Other header fields are left as they are.
db_to_header(Db, Header) ->
    Header#db_header{
        update_seq = Db#db.update_seq,
        docinfo_by_seq_btree_state = couch_btree:get_state(Db#db.docinfo_by_seq_btree),
        fulldocinfo_by_id_btree_state = couch_btree:get_state(Db#db.fulldocinfo_by_id_btree),
        local_docs_btree_state = couch_btree:get_state(Db#db.local_docs_btree),
        admins_ptr = Db#db.admins_ptr,
        revs_limit = Db#db.revs_limit}.
% Makes the current state durable by writing a new header, or schedules that.
%
%  - Nothing is done when the header derived from Db equals the stored one.
%  - With Delay = true a `delayed_commit` message is scheduled to self() in
%    1000 ms, unless one is already pending.
%  - Otherwise any pending delayed commit is cancelled and the header is
%    written right away, fsyncing before and/or after the write according
%    to the db's fsync_options.
%
% Returns the updated #db{}.
commit_data(#db{header=OldHeader}=Db, Delay) ->
    Header = db_to_header(Db, OldHeader),
    if
    OldHeader == Header ->
        Db;
    Delay and (Db#db.waiting_delayed_commit == nil) ->
        Timer = erlang:send_after(1000, self(), delayed_commit),
        Db#db{waiting_delayed_commit=Timer};
    Delay ->
        % a delayed commit is already scheduled
        Db;
    true ->
        write_header_now(Db, Header)
    end.

% Cancels a pending delayed commit, writes Header to the db file (with the
% configured fsyncs around it) and returns the state marked as committed.
write_header_now(#db{fd=Fd,fsync_options=FsyncOptions}=Db, Header) ->
    cancel_delayed_commit(Db#db.waiting_delayed_commit),
    sync_if_configured(before_header, FsyncOptions, Fd),
    ok = couch_file:write_header(Fd, Header),
    sync_if_configured(after_header, FsyncOptions, Fd),
    Db#db{
        waiting_delayed_commit=nil,
        header=Header,
        committed_update_seq=Db#db.update_seq}.

% Cancels the delayed-commit timer, if any. When the timer could not be
% cancelled a `delayed_commit` message possibly sitting in the mailbox is
% removed without blocking.
cancel_delayed_commit(nil) ->
    ok;
cancel_delayed_commit(Timer) ->
    case erlang:cancel_timer(Timer) of
    false ->
        receive delayed_commit -> ok after 0 -> ok end;
    _ ->
        ok
    end.

% Syncs Fd when Option is listed in FsyncOptions.
sync_if_configured(Option, FsyncOptions, Fd) ->
    case lists:member(Option, FsyncOptions) of
    true -> ok = couch_file:sync(Fd);
    _    -> ok
    end.


% Reads the doc summary stored at SrcSp in SrcFd and copies each of its
% attachments into DestFd. Returns {BodyData, NewBinInfos} where the
% attachment entries point at the copies; nothing is written for the
% summary itself.
copy_doc_attachments(SrcFd, SrcSp, DestFd) ->
    {ok, {BodyData, BinInfos}} = couch_db:read_doc(SrcFd, SrcSp),
    % copy the bin values
    NewBinInfos = [copy_attachment(SrcFd, DestFd, BinInfo) || BinInfo <- BinInfos],
    {BodyData, NewBinInfos}.

% Copies one attachment stream to DestFd. The match on Len asserts that the
% copied length equals the recorded one.
copy_attachment(SrcFd, DestFd, {Name, {Type, BinSp, Len}})
        when is_tuple(BinSp) orelse BinSp == null ->
    % 09 UPGRADE CODE
    {NewBinSp, Len} = couch_stream:old_copy_to_new_stream(SrcFd, BinSp, Len, DestFd),
    {Name, {Type, NewBinSp, Len}};
copy_attachment(SrcFd, DestFd, {Name, {Type, BinSp, Len}}) ->
    {NewBinSp, Len} = couch_stream:copy_to_new_stream(SrcFd, BinSp, DestFd),
    {Name, {Type, NewBinSp, Len}}.
% Walks a rev tree and, for every leaf, copies the doc's attachments from
% SrcFd to DestFd and replaces the leaf's body pointer with the
% {BodyData, NewBinInfos} term returned by copy_doc_attachments/3. The
% values of inner nodes are replaced by ?REV_MISSING.
copy_rev_tree_attachments(_SrcFd, _DestFd, []) ->
    [];
copy_rev_tree_attachments(SrcFd, DestFd, [{Start, Tree} | RestTree]) ->
    % root inner node, only copy info/data from leaf nodes
    [Tree2] = copy_rev_tree_attachments(SrcFd, DestFd, [Tree]),
    [{Start, Tree2} | copy_rev_tree_attachments(SrcFd, DestFd, RestTree)];
copy_rev_tree_attachments(SrcFd, DestFd, [{RevId, {IsDel, Sp, Seq}, []} | RestTree]) ->
    % This is a leaf node, copy it over
    DocBody = copy_doc_attachments(SrcFd, Sp, DestFd),
    [{RevId, {IsDel, DocBody, Seq}, []} | copy_rev_tree_attachments(SrcFd, DestFd, RestTree)];
copy_rev_tree_attachments(SrcFd, DestFd, [{RevId, _, SubTree} | RestTree]) ->
    % inner node, only copy info/data from leaf nodes
    [{RevId, ?REV_MISSING, copy_rev_tree_attachments(SrcFd, DestFd, SubTree)} | copy_rev_tree_attachments(SrcFd, DestFd, RestTree)].


% Copies the docs listed in InfoBySeq (a list of #doc_info{}) from Db into
% NewDb and returns NewDb with both indexes updated. Every id must exist in
% Db's by-id index; a failed lookup crashes the map fun. Retry = true means
% the compaction file already had content, so by-seq entries of docs copied
% by an earlier pass are removed.
copy_docs(#db{fd=SrcFd}=Db, #db{fd=DestFd}=NewDb, InfoBySeq, Retry) ->
    Ids = [Id || #doc_info{id=Id} <- InfoBySeq],
    LookupResults = couch_btree:lookup(Db#db.fulldocinfo_by_id_btree, Ids),

    % write out the attachments
    NewFullDocInfos0 = lists:map(
        fun({ok, #full_doc_info{rev_tree=RevTree}=Info}) ->
            Info#full_doc_info{rev_tree=copy_rev_tree_attachments(SrcFd, DestFd, RevTree)}
        end, LookupResults),
    % write out the docs
    % we do this in 2 stages so the docs are written out contiguously, making
    % view indexing and replication faster.
    NewFullDocInfos1 = lists:map(
        fun(#full_doc_info{rev_tree=RevTree}=Info) ->
            Info#full_doc_info{rev_tree=couch_key_tree:map_leafs(
                fun(_Key, {IsDel, DocBody, Seq}) ->
                    % DocBody is the {BodyData, BinInfos} term from stage one;
                    % it is replaced by the position it was appended at
                    {ok, Pos} = couch_file:append_term(DestFd, DocBody),
                    {IsDel, Pos, Seq}
                end, RevTree)}
        end, NewFullDocInfos0),

    % stemming uses the revs_limit of the source db
    NewFullDocInfos = stem_full_doc_infos(Db, NewFullDocInfos1),
    NewDocInfos = [couch_doc:to_doc_info(Info) || Info <- NewFullDocInfos],
    RemoveSeqs =
    case Retry of
    false ->
        [];
    true ->
        % We are retrying a compaction, meaning the documents we are copying may
        % already exist in our file and must be removed from the by_seq index.
        Existing = couch_btree:lookup(NewDb#db.fulldocinfo_by_id_btree, Ids),
        [Seq || {ok, #full_doc_info{update_seq=Seq}} <- Existing]
    end,

    {ok, DocInfoBTree} = couch_btree:add_remove(
            NewDb#db.docinfo_by_seq_btree, NewDocInfos, RemoveSeqs),
    {ok, FullDocInfoBTree} = couch_btree:add_remove(
            NewDb#db.fulldocinfo_by_id_btree, NewFullDocInfos, []),
    NewDb#db{ fulldocinfo_by_id_btree=FullDocInfoBTree,
              docinfo_by_seq_btree=DocInfoBTree}.


% Copies every change made to Db after NewDb0's update_seq into NewDb0 and
% returns the committed result, whose update_seq equals that of Db.
%
% The by-seq index of Db is folded from NewDb's update_seq + 1. Docs are
% buffered and copied with copy_docs/4 whenever the running count is a
% multiple of 1000 (which includes the very first entry), and the new db is
% additionally committed when the count is a multiple of 10000. Progress is
% reported through couch_task_status.
copy_compact(Db, NewDb0, Retry) ->
    % of the configured fsync options only before_header is kept while copying
    FsyncOptions = [Op || Op <- NewDb0#db.fsync_options, Op == before_header],
    NewDb = NewDb0#db{fsync_options=FsyncOptions},
    TotalChanges = couch_db:count_changes_since(Db, NewDb#db.update_seq),
    EnumBySeqFun =
    fun(#doc_info{high_seq=Seq}=DocInfo, _Offset, {AccNewDb, AccUncopied, TotalCopied}) ->
        couch_task_status:update("Copied ~p of ~p changes (~p%)",
                [TotalCopied, TotalChanges, (TotalCopied*100) div TotalChanges]),
        if TotalCopied rem 1000 == 0 ->
            NewDb2 = copy_docs(Db, AccNewDb, lists:reverse([DocInfo | AccUncopied]), Retry),
            if TotalCopied rem 10000 == 0 ->
                {ok, {commit_data(NewDb2#db{update_seq=Seq}), [], TotalCopied + 1}};
            true ->
                {ok, {NewDb2#db{update_seq=Seq}, [], TotalCopied + 1}}
            end;
        true ->
            {ok, {AccNewDb, [DocInfo | AccUncopied], TotalCopied + 1}}
        end
    end,

    couch_task_status:set_update_frequency(500),

    % TotalChanges is already bound, so this match also asserts that the
    % number of entries folded over equals the count taken above
    {ok, {NewDb2, Uncopied, TotalChanges}} =
        couch_btree:foldl(Db#db.docinfo_by_seq_btree, NewDb#db.update_seq + 1, EnumBySeqFun, {NewDb, [], 0}),

    couch_task_status:update("Flushing"),

    % copy whatever is still buffered
    NewDb3 = copy_docs(Db, NewDb2, lists:reverse(Uncopied), Retry),

    % copy misc header values
    if NewDb3#db.admins /= Db#db.admins ->
        {ok, Ptr} = couch_file:append_term(NewDb3#db.fd, Db#db.admins),
        NewDb4 = NewDb3#db{admins=Db#db.admins, admins_ptr=Ptr};
    true ->
        NewDb4 = NewDb3
    end,

    commit_data(NewDb4#db{update_seq=Db#db.update_seq}).
% Body of the compactor process. Opens Filepath ++ ".compact" - continuing
% with its content when the file already exists (a retry), creating it with
% a default header otherwise - copies the database into it and then casts
% {compact_done, CompactFile} to the updater process. Any other error from
% opening the file crashes the process.
start_copy_compact(#db{name=Name,filepath=Filepath}=Db) ->
    CompactFile = Filepath ++ ".compact",
    ?LOG_DEBUG("Compaction process spawned for db \"~s\"", [Name]),
    {Fd, Header, Retry} =
        case couch_file:open(CompactFile) of
        {ok, ExistingFd} ->
            % a compaction file is already there: pick up where it left off
            couch_task_status:add_task(<<"Database Compaction">>,
                    <<Name/binary, " retry">>, <<"Starting">>),
            {ok, ExistingHeader} = couch_file:read_header(ExistingFd),
            {ExistingFd, ExistingHeader, true};
        {error, enoent} ->
            couch_task_status:add_task(<<"Database Compaction">>, Name,
                    <<"Starting">>),
            {ok, CreatedFd} = couch_file:open(CompactFile, [create]),
            EmptyHeader = #db_header{},
            ok = couch_file:write_header(CreatedFd, EmptyHeader),
            {CreatedFd, EmptyHeader, false}
        end,
    Compacted = copy_compact(Db, init_db(Name, CompactFile, Fd, Header), Retry),
    gen_server:cast(Db#db.update_pid, {compact_done, CompactFile}),
    close_db(Compacted).