From 7393d62b7b630bee50f609d0ae8125d33f7cda2b Mon Sep 17 00:00:00 2001
From: Adam Kocoloski
Date: Wed, 18 Aug 2010 11:51:03 -0400
Subject: Grab bag of Cloudant patches to couch OTP application

- Removal of couch_db and couch_ref_counter processes. Active DBs are
  accessible through a protected ets table owned by couch_server.
- #full_doc_info{} in by_id and by_seq trees for faster compaction at the
  expense of more disk usage afterwards. Proposed as COUCHDB-738 but not
  accepted upstream.
- Replication via distributed Erlang.
- Better hot upgrade support (uses exported functions much more often).
- Configurable btree chunk sizes allow for larger (but still bounded)
  reductions.
- Shorter names for btree fields in #db{} and #db_header{}.
- couch_view_group does not keep a reference to the #db{}.
- Terms are stored compressed (again).
---
 apps/couch/src/couch_db_updater.erl | 303 ++++++++++++++++++++----------------
 1 file changed, 173 insertions(+), 130 deletions(-)

diff --git a/apps/couch/src/couch_db_updater.erl b/apps/couch/src/couch_db_updater.erl
index 19a4c165..e4f8d0ca 100644
--- a/apps/couch/src/couch_db_updater.erl
+++ b/apps/couch/src/couch_db_updater.erl
@@ -13,14 +13,14 @@
 -module(couch_db_updater).
 -behaviour(gen_server).
 
--export([btree_by_id_reduce/2,btree_by_seq_reduce/2]).
+-export([btree_by_id_split/1,btree_by_id_join/2,btree_by_id_reduce/2]).
+-export([btree_by_seq_split/1,btree_by_seq_join/2,btree_by_seq_reduce/2]).
 -export([init/1,terminate/2,handle_call/3,handle_cast/2,code_change/3,handle_info/2]).
 
 -include("couch_db.hrl").
 
-init({MainPid, DbName, Filepath, Fd, Options}) ->
-    process_flag(trap_exit, true),
+init({DbName, Filepath, Fd, Options}) ->
     case lists:member(create, Options) of
     true ->
         % create a new header and writes it to the file
@@ -44,25 +44,40 @@ init({MainPid, DbName, Filepath, Fd, Options}) ->
     end,
 
     Db = init_db(DbName, Filepath, Fd, Header),
-    Db2 = refresh_validate_doc_funs(Db),
-    {ok, Db2#db{main_pid = MainPid, is_sys_db = lists:member(sys_db, Options)}}.
+    couch_stats_collector:track_process_count({couchdb, open_databases}),
+    % we don't load validation funs here because the fabric query is liable to
+    % race conditions. Instead see couch_db:validate_doc_update, which loads
+    % them lazily
+    {ok, Db#db{main_pid = self(), is_sys_db = lists:member(sys_db, Options)}}.
 
 terminate(_Reason, Db) ->
     couch_file:close(Db#db.fd),
     couch_util:shutdown_sync(Db#db.compactor_pid),
-    couch_util:shutdown_sync(Db#db.fd_ref_counter),
+    couch_util:shutdown_sync(Db#db.fd),
     ok.
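For the first bullet in the commit message: with the couch_db and
couch_ref_counter processes gone, a reader obtains the #db{} record straight
out of an ets table owned by couch_server. A minimal sketch of that lookup
pattern follows; the table name couch_dbs and its options are illustrative
assumptions, not part of this patch:

    %% couch_server is assumed to own a table created roughly as
    %%   ets:new(couch_dbs, [set, protected, named_table, {keypos, #db.name}])
    open_db(DbName) ->
        case ets:lookup(couch_dbs, DbName) of
        [#db{} = Db] ->
            % the record is copied out of ets; no per-db process or
            % ref counter is involved on the read path
            {ok, Db};
        [] ->
            {not_found, no_db_file}
        end.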

+handle_call(start_compact, _From, Db) ->
+    {noreply, NewDb} = handle_cast(start_compact, Db),
+    {reply, {ok, NewDb#db.compactor_pid}, NewDb};
+
 handle_call(get_db, _From, Db) ->
     {reply, {ok, Db}, Db};
 handle_call(full_commit, _From, #db{waiting_delayed_commit=nil}=Db) ->
+    {reply, ok, Db};
+handle_call(full_commit, _From, Db) ->
+    {reply, ok, commit_data(Db)};
+
+handle_call({full_commit, _}, _From, #db{waiting_delayed_commit=nil}=Db) ->
     {reply, ok, Db}; % no data waiting, return ok immediately
-handle_call(full_commit, _From, Db) ->
+handle_call({full_commit, RequiredSeq}, _From, Db) when RequiredSeq =<
+        Db#db.committed_update_seq ->
+    {reply, ok, Db};
+handle_call({full_commit, _}, _, Db) ->
     {reply, ok, commit_data(Db)}; % commit the data and return ok
 handle_call(increment_update_seq, _From, Db) ->
     Db2 = commit_data(Db#db{update_seq=Db#db.update_seq+1}),
-    ok = gen_server:call(Db2#db.main_pid, {db_updated, Db2}),
+    ok = gen_server:call(couch_server, {db_updated, Db2}, infinity),
     couch_db_update_notifier:notify({updated, Db#db.name}),
     {reply, {ok, Db2#db.update_seq}, Db2};
 
@@ -70,13 +85,13 @@ handle_call({set_security, NewSec}, _From, Db) ->
     {ok, Ptr} = couch_file:append_term(Db#db.fd, NewSec),
     Db2 = commit_data(Db#db{security=NewSec, security_ptr=Ptr,
             update_seq=Db#db.update_seq+1}),
-    ok = gen_server:call(Db2#db.main_pid, {db_updated, Db2}),
+    ok = gen_server:call(couch_server, {db_updated, Db2}, infinity),
     {reply, ok, Db2};
 
 handle_call({set_revs_limit, Limit}, _From, Db) ->
     Db2 = commit_data(Db#db{revs_limit=Limit,
             update_seq=Db#db.update_seq+1}),
-    ok = gen_server:call(Db2#db.main_pid, {db_updated, Db2}),
+    ok = gen_server:call(couch_server, {db_updated, Db2}, infinity),
     {reply, ok, Db2};
 
 handle_call({purge_docs, _IdRevs}, _From,
@@ -85,8 +100,8 @@ handle_call({purge_docs, _IdRevs}, _From,
 handle_call({purge_docs, IdRevs}, _From, Db) ->
     #db{
         fd=Fd,
-        fulldocinfo_by_id_btree = DocInfoByIdBTree,
-        docinfo_by_seq_btree = DocInfoBySeqBTree,
+        id_tree = DocInfoByIdBTree,
+        seq_tree = DocInfoBySeqBTree,
         update_seq = LastSeq,
         header = Header = #db_header{purge_seq=PurgeSeq}
     } = Db,
@@ -136,29 +151,32 @@ handle_call({purge_docs, IdRevs}, _From, Db) ->
 
     Db2 = commit_data(
         Db#db{
-            fulldocinfo_by_id_btree = DocInfoByIdBTree2,
-            docinfo_by_seq_btree = DocInfoBySeqBTree2,
+            id_tree = DocInfoByIdBTree2,
+            seq_tree = DocInfoBySeqBTree2,
             update_seq = NewSeq + 1,
             header=Header#db_header{purge_seq=PurgeSeq+1, purged_docs=Pointer}}),
 
-    ok = gen_server:call(Db2#db.main_pid, {db_updated, Db2}),
+    ok = gen_server:call(couch_server, {db_updated, Db2}, infinity),
     couch_db_update_notifier:notify({updated, Db#db.name}),
-    {reply, {ok, (Db2#db.header)#db_header.purge_seq, IdRevsPurged}, Db2};
-
-handle_call(start_compact, _From, Db) ->
+    {reply, {ok, (Db2#db.header)#db_header.purge_seq, IdRevsPurged}, Db2}.
+
+
+handle_cast({load_validation_funs, ValidationFuns}, Db) ->
+    Db2 = Db#db{validate_doc_funs = ValidationFuns},
+    ok = gen_server:call(couch_server, {db_updated, Db2}, infinity),
+    {noreply, Db2};
+handle_cast(start_compact, Db) ->
     case Db#db.compactor_pid of
     nil ->
         ?LOG_INFO("Starting compaction for db \"~s\"", [Db#db.name]),
         Pid = spawn_link(fun() -> start_copy_compact(Db) end),
         Db2 = Db#db{compactor_pid=Pid},
-        ok = gen_server:call(Db#db.main_pid, {db_updated, Db2}),
-        {reply, ok, Db2};
+        ok = gen_server:call(couch_server, {db_updated, Db2}, infinity),
+        {noreply, Db2};
     _ ->
         % compact currently running, this is a no-op
-        {reply, ok, Db}
-    end.
-
-
-
+        {noreply, Db}
+    end;
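The new synchronous start_compact clause simply reuses the cast clause and
hands the compactor pid back to the caller, and the {full_commit, RequiredSeq}
clause skips the fsync entirely when the requested sequence is already
durable. A hypothetical caller-side sketch (not in this patch) that starts
compaction and blocks until the compactor process exits:

    wait_for_compaction(DbPid, Timeout) ->
        {ok, CompactorPid} = gen_server:call(DbPid, start_compact),
        Ref = erlang:monitor(process, CompactorPid),
        receive
            {'DOWN', Ref, process, CompactorPid, Reason} ->
                {done, Reason}
        after Timeout ->
            erlang:demonitor(Ref, [flush]),
            timeout
        end.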
 handle_cast({compact_done, CompactFilepath}, #db{filepath=Filepath}=Db) ->
     {ok, NewFd} = couch_file:open(CompactFilepath),
     {ok, NewHeader} = couch_file:read_header(NewFd),
@@ -168,13 +186,13 @@ handle_cast({compact_done, CompactFilepath}, #db{filepath=Filepath}=Db) ->
     case Db#db.update_seq == NewSeq of
     true ->
         % suck up all the local docs into memory and write them to the new db
-        {ok, _, LocalDocs} = couch_btree:foldl(Db#db.local_docs_btree,
+        {ok, _, LocalDocs} = couch_btree:foldl(Db#db.local_tree,
                 fun(Value, _Offset, Acc) -> {ok, [Value | Acc]} end, []),
-        {ok, NewLocalBtree} = couch_btree:add(NewDb#db.local_docs_btree, LocalDocs),
+        {ok, NewLocalBtree} = couch_btree:add(NewDb#db.local_tree, LocalDocs),
 
         NewDb2 = commit_data(NewDb#db{
-            local_docs_btree = NewLocalBtree,
-            main_pid = Db#db.main_pid,
+            local_tree = NewLocalBtree,
+            main_pid = self(),
             filepath = Filepath,
             instance_start_time = Db#db.instance_start_time,
             revs_limit = Db#db.revs_limit
@@ -186,13 +204,13 @@ handle_cast({compact_done, CompactFilepath}, #db{filepath=Filepath}=Db) ->
         couch_file:delete(RootDir, Filepath),
         ok = file:rename(CompactFilepath, Filepath),
         close_db(Db),
-        ok = gen_server:call(Db#db.main_pid, {db_updated, NewDb2}),
+        ok = gen_server:call(couch_server, {db_updated, NewDb2}, infinity),
         ?LOG_INFO("Compaction for db \"~s\" completed.", [Db#db.name]),
         {noreply, NewDb2#db{compactor_pid=nil}};
     false ->
-        ?LOG_INFO("Compaction file still behind main file "
+        ?LOG_INFO("Compaction for ~s still behind main file "
             "(update seq=~p. compact update seq=~p). Retrying.",
-            [Db#db.update_seq, NewSeq]),
+            [Db#db.name, Db#db.update_seq, NewSeq]),
         close_db(NewDb),
         Pid = spawn_link(fun() -> start_copy_compact(Db) end),
         Db2 = Db#db{compactor_pid=Pid},
@@ -215,7 +233,7 @@ handle_info({update_docs, Client, GroupedDocs, NonRepDocs, MergeConflicts,
     try update_docs_int(Db, GroupedDocs3, NonRepDocs2, MergeConflicts,
             FullCommit2) of
     {ok, Db2} ->
-        ok = gen_server:call(Db#db.main_pid, {db_updated, Db2}),
+        ok = gen_server:call(couch_server, {db_updated, Db2}, infinity),
         if Db2#db.update_seq /= Db#db.update_seq ->
             couch_db_update_notifier:notify({updated, Db2#db.name});
         true -> ok
@@ -235,13 +253,16 @@ handle_info(delayed_commit, Db) ->
     Db ->
         {noreply, Db};
     Db2 ->
-        ok = gen_server:call(Db2#db.main_pid, {db_updated, Db2}),
+        ok = gen_server:call(couch_server, {db_updated, Db2}, infinity),
         {noreply, Db2}
     end;
 handle_info({'EXIT', _Pid, normal}, Db) ->
     {noreply, Db};
 handle_info({'EXIT', _Pid, Reason}, Db) ->
-    {stop, Reason, Db}.
+    {stop, Reason, Db};
+handle_info({'DOWN', Ref, _, _, Reason}, #db{fd_monitor=Ref, name=Name} = Db) ->
+    ?LOG_ERROR("DB ~s shutting down - Fd ~p", [Name, Reason]),
+    {stop, normal, Db}.
 
 code_change(_OldVsn, State, _Extra) ->
     {ok, State}.
@@ -279,14 +300,27 @@ collect_updates(GroupedDocsAcc, ClientsAcc, MergeConflicts, FullCommit) ->
     end.
 
 
-btree_by_seq_split(#doc_info{id=Id, high_seq=KeySeq, revs=Revs}) ->
-    RevInfos = [{Rev, Seq, Bp} ||
-        #rev_info{rev=Rev,seq=Seq,deleted=false,body_sp=Bp} <- Revs],
-    DeletedRevInfos = [{Rev, Seq, Bp} ||
-        #rev_info{rev=Rev,seq=Seq,deleted=true,body_sp=Bp} <- Revs],
-    {KeySeq,{Id, RevInfos, DeletedRevInfos}}.
+rev_tree(DiskTree) ->
+    couch_key_tree:map(fun(_RevId, {IsDeleted, BodyPointer, UpdateSeq}) ->
+        {IsDeleted == 1, BodyPointer, UpdateSeq};
+    (_RevId, ?REV_MISSING) ->
+        ?REV_MISSING
+    end, DiskTree).
+
+disk_tree(RevTree) ->
+    couch_key_tree:map(fun(_RevId, {IsDeleted, BodyPointer, UpdateSeq}) ->
+        {if IsDeleted -> 1; true -> 0 end, BodyPointer, UpdateSeq};
+    (_RevId, ?REV_MISSING) ->
+        ?REV_MISSING
+    end, RevTree).
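rev_tree/1 and disk_tree/1 are inverses: on disk the deleted flag travels as
0/1 rather than a boolean atom, which keeps the stored term slightly smaller.
A worked round trip, assuming the [{Pos, {Key, Value, Children}}] node shape
that couch_key_tree operates on:

    round_trip_example() ->
        %% a single-revision tree whose leaf was written as {0, Bp, Seq}
        DiskTree = [{1, {<<"deadbeef">>, {0, 1234, 42}, []}}],
        %% decodes with the deleted flag as a proper boolean ...
        [{1, {<<"deadbeef">>, {false, 1234, 42}, []}}] = rev_tree(DiskTree),
        %% ... and re-encodes to exactly the original term
        DiskTree = disk_tree(rev_tree(DiskTree)),
        ok.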
+
+btree_by_seq_split(#full_doc_info{id=Id, update_seq=Seq, deleted=Del, rev_tree=T}) ->
+    {Seq, {Id, if Del -> 1; true -> 0 end, disk_tree(T)}}.
+
+btree_by_seq_join(Seq, {Id, Del, T}) when is_integer(Del) ->
+    #full_doc_info{id=Id, update_seq=Seq, deleted=Del==1, rev_tree=rev_tree(T)};
 btree_by_seq_join(KeySeq, {Id, RevInfos, DeletedRevInfos}) ->
+    % 1.0 stored #doc_info records in the seq tree. compact to upgrade.
     #doc_info{
         id = Id,
         high_seq=KeySeq,
@@ -310,14 +344,7 @@ btree_by_seq_join(KeySeq,{Id, Rev, Bp, Conflicts, DelConflicts, Deleted}) ->
 
 btree_by_id_split(#full_doc_info{id=Id, update_seq=Seq,
         deleted=Deleted, rev_tree=Tree}) ->
-    DiskTree =
-    couch_key_tree:map(
-        fun(_RevId, {IsDeleted, BodyPointer, UpdateSeq}) ->
-            {if IsDeleted -> 1; true -> 0 end, BodyPointer, UpdateSeq};
-        (_RevId, ?REV_MISSING) ->
-            ?REV_MISSING
-        end, Tree),
-    {Id, {Seq, if Deleted -> 1; true -> 0 end, DiskTree}}.
+    {Id, {Seq, if Deleted -> 1; true -> 0 end, disk_tree(Tree)}}.
 
 btree_by_id_join(Id, {HighSeq, Deleted, DiskTree}) ->
     Tree =
@@ -377,19 +404,19 @@ init_db(DbName, Filepath, Fd, Header0) ->
             "[before_header, after_header, on_file_open]")),
 
     case lists:member(on_file_open, FsyncOptions) of
-    true -> ok = couch_file:sync(Fd);
+    true -> ok = couch_file:sync(Filepath);
     _ -> ok
     end,
 
-    {ok, IdBtree} = couch_btree:open(Header#db_header.fulldocinfo_by_id_btree_state, Fd,
-        [{split, fun(X) -> btree_by_id_split(X) end},
-        {join, fun(X,Y) -> btree_by_id_join(X,Y) end},
-        {reduce, fun(X,Y) -> btree_by_id_reduce(X,Y) end}]),
-    {ok, SeqBtree} = couch_btree:open(Header#db_header.docinfo_by_seq_btree_state, Fd,
-        [{split, fun(X) -> btree_by_seq_split(X) end},
-        {join, fun(X,Y) -> btree_by_seq_join(X,Y) end},
-        {reduce, fun(X,Y) -> btree_by_seq_reduce(X,Y) end}]),
-    {ok, LocalDocsBtree} = couch_btree:open(Header#db_header.local_docs_btree_state, Fd),
+    {ok, IdBtree} = couch_btree:open(Header#db_header.id_tree_state, Fd,
+        [{split, fun ?MODULE:btree_by_id_split/1},
+        {join, fun ?MODULE:btree_by_id_join/2},
+        {reduce, fun ?MODULE:btree_by_id_reduce/2}]),
+    {ok, SeqBtree} = couch_btree:open(Header#db_header.seq_tree_state, Fd,
+        [{split, fun ?MODULE:btree_by_seq_split/1},
+        {join, fun ?MODULE:btree_by_seq_join/2},
+        {reduce, fun ?MODULE:btree_by_seq_reduce/2}]),
+    {ok, LocalDocsBtree} = couch_btree:open(Header#db_header.local_tree_state, Fd),
     case Header#db_header.security_ptr of
     nil ->
         Security = [],
@@ -401,15 +428,13 @@ init_db(DbName, Filepath, Fd, Header0) ->
     {MegaSecs, Secs, MicroSecs} = now(),
     StartTime = ?l2b(io_lib:format("~p",
             [(MegaSecs*1000000*1000000) + (Secs*1000000) + MicroSecs])),
-    {ok, RefCntr} = couch_ref_counter:start([Fd]),
     #db{
-        update_pid=self(),
         fd=Fd,
-        fd_ref_counter = RefCntr,
+        fd_monitor = erlang:monitor(process,Fd),
         header=Header,
-        fulldocinfo_by_id_btree = IdBtree,
-        docinfo_by_seq_btree = SeqBtree,
-        local_docs_btree = LocalDocsBtree,
+        id_tree = IdBtree,
+        seq_tree = SeqBtree,
+        local_tree = LocalDocsBtree,
         committed_update_seq = Header#db_header.update_seq,
         update_seq = Header#db_header.update_seq,
         name = DbName,
@@ -422,8 +447,8 @@ init_db(DbName, Filepath, Fd, Header0) ->
     }.
 
 
-close_db(#db{fd_ref_counter = RefCntr}) ->
-    couch_ref_counter:drop(RefCntr).
+close_db(#db{fd_monitor = Ref}) ->
+    erlang:demonitor(Ref).
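The switch in init_db from closures like fun(X) -> btree_by_id_split(X) end
to fun ?MODULE:btree_by_id_split/1 is what the "better hot upgrade support"
bullet refers to. A local fun is bound to the module version that created it,
so a btree handle holding one survives at most one subsequent code load before
calls raise badfun; an external fun is re-resolved by name on every call. The
difference in miniature:

    make_funs() ->
        Local    = fun(X) -> btree_by_id_split(X) end,  % pinned to this module version
        External = fun ?MODULE:btree_by_id_split/1,     % looked up at call time
        {Local, External}.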

 
 refresh_validate_doc_funs(Db) ->
@@ -435,7 +460,13 @@ refresh_validate_doc_funs(Db) ->
             Fun -> [Fun]
             end
         end, DesignDocs),
-    Db#db{validate_doc_funs=ProcessDocFuns}.
+    case Db#db.name of
+    <<"shards/", _:18/binary, DbName/binary>> ->
+        fabric:reset_validation_funs(DbName),
+        Db#db{validate_doc_funs=ProcessDocFuns};
+    _ ->
+        Db#db{validate_doc_funs=ProcessDocFuns}
+    end.
 
 % rev tree functions
 
@@ -563,14 +594,11 @@ merge_rev_trees(MergeConflicts, [NewDocs|RestDocsList],
 
 
-new_index_entries([], AccById, AccBySeq) ->
-    {AccById, AccBySeq};
-new_index_entries([FullDocInfo|RestInfos], AccById, AccBySeq) ->
-    #doc_info{revs=[#rev_info{deleted=Deleted}|_]} = DocInfo =
-            couch_doc:to_doc_info(FullDocInfo),
-    new_index_entries(RestInfos,
-        [FullDocInfo#full_doc_info{deleted=Deleted}|AccById],
-        [DocInfo|AccBySeq]).
+new_index_entries([], Acc) ->
+    Acc;
+new_index_entries([Info|Rest], Acc) ->
+    #doc_info{revs=[#rev_info{deleted=Del}|_]} = couch_doc:to_doc_info(Info),
+    new_index_entries(Rest, [Info#full_doc_info{deleted=Del}|Acc]).
 
 
 stem_full_doc_infos(#db{revs_limit=Limit}, DocInfos) ->
@@ -579,8 +607,8 @@ stem_full_doc_infos(#db{revs_limit=Limit}, DocInfos) ->
 
 update_docs_int(Db, DocsList, NonRepDocs, MergeConflicts, FullCommit) ->
     #db{
-        fulldocinfo_by_id_btree = DocInfoByIdBTree,
-        docinfo_by_seq_btree = DocInfoBySeqBTree,
+        id_tree = DocInfoByIdBTree,
+        seq_tree = DocInfoBySeqBTree,
         update_seq = LastSeq
     } = Db,
     Ids = [Id || [{_Client, #doc{id=Id}}|_] <- DocsList],
@@ -607,16 +635,17 @@ update_docs_int(Db, DocsList, NonRepDocs, MergeConflicts, FullCommit) ->
     % the trees, the attachments are already written to disk)
     {ok, FlushedFullDocInfos} = flush_trees(Db2, NewFullDocInfos, []),
 
-    {IndexFullDocInfos, IndexDocInfos} =
-        new_index_entries(FlushedFullDocInfos, [], []),
+    IndexInfos = new_index_entries(FlushedFullDocInfos, []),
 
     % and the indexes
-    {ok, DocInfoByIdBTree2} = couch_btree:add_remove(DocInfoByIdBTree, IndexFullDocInfos, []),
-    {ok, DocInfoBySeqBTree2} = couch_btree:add_remove(DocInfoBySeqBTree, IndexDocInfos, RemoveSeqs),
+    {ok, DocInfoByIdBTree2} = couch_btree:add_remove(DocInfoByIdBTree,
+        IndexInfos, []),
+    {ok, DocInfoBySeqBTree2} = couch_btree:add_remove(DocInfoBySeqBTree,
+        IndexInfos, RemoveSeqs),
 
     Db3 = Db2#db{
-        fulldocinfo_by_id_btree = DocInfoByIdBTree2,
-        docinfo_by_seq_btree = DocInfoBySeqBTree2,
+        id_tree = DocInfoByIdBTree2,
+        seq_tree = DocInfoBySeqBTree2,
         update_seq = NewSeq},
 
     % Check if we just updated any design documents, and update the validation
@@ -631,24 +660,26 @@ update_docs_int(Db, DocsList, NonRepDocs, MergeConflicts, FullCommit) ->
     {ok, commit_data(Db4, not FullCommit)}.
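With new_index_entries collapsed to a single accumulator, the same
#full_doc_info{} record is written to both the by_id and by_seq trees (the
COUCHDB-738 change from the commit message), so readers of the seq tree never
need a second lookup to reach the rev tree. Code that still wants the smaller
1.0-style #doc_info{} can derive it on read; a hypothetical helper, using
couch_btree:lookup/2 the same way this patch does elsewhere:

    doc_info_by_seq(Db, Seq) ->
        case couch_btree:lookup(Db#db.seq_tree, [Seq]) of
        [{ok, #full_doc_info{} = FDI}] ->
            {ok, couch_doc:to_doc_info(FDI)};
        [not_found] ->
            not_found
        end.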

 
-update_local_docs(#db{local_docs_btree=Btree}=Db, Docs) ->
+update_local_docs(#db{local_tree=Btree}=Db, Docs) ->
     Ids = [Id || {_Client, #doc{id=Id}} <- Docs],
     OldDocLookups = couch_btree:lookup(Btree, Ids),
     BtreeEntries = lists:zipwith(
-        fun({Client, #doc{id=Id,deleted=Delete,revs={0,PrevRevs},body=Body}}, OldDocLookup) ->
+        fun({Client, #doc{id=Id,deleted=Delete,revs={0,PrevRevs},body=Body}},
+                _OldDocLookup) ->
             case PrevRevs of
             [RevStr|_] ->
                 PrevRev = list_to_integer(?b2l(RevStr));
             [] ->
                 PrevRev = 0
             end,
-            OldRev =
-            case OldDocLookup of
-                {ok, {_, {OldRev0, _}}} -> OldRev0;
-                not_found -> 0
-            end,
-            case OldRev == PrevRev of
-            true ->
+            %% disabled conflict checking for local docs -- APK 16 June 2010
+            % OldRev =
+            % case OldDocLookup of
+            %     {ok, {_, {OldRev0, _}}} -> OldRev0;
+            %     not_found -> 0
+            % end,
+            % case OldRev == PrevRev of
+            % true ->
                 case Delete of
                     false ->
                         send_result(Client, Id, {0, PrevRevs}, {ok,
@@ -658,11 +689,11 @@ update_local_docs(#db{local_docs_btree=Btree}=Db, Docs) ->
                         send_result(Client, Id, {0, PrevRevs},
                             {ok, {0, <<"0">>}}),
                         {remove, Id}
-                end;
-            false ->
-                send_result(Client, Id, {0, PrevRevs}, conflict),
-                ignore
-            end
+                end%;
+            % false ->
+            %     send_result(Client, Id, {0, PrevRevs}, conflict),
+            %     ignore
+            % end
         end, Docs, OldDocLookups),
 
     BtreeIdsRemove = [Id || {remove, Id} <- BtreeEntries],
@@ -671,7 +702,7 @@ update_local_docs(#db{local_docs_btree=Btree}=Db, Docs) ->
 
     {ok, Btree2} = couch_btree:add_remove(Btree, BtreeIdsUpdate, BtreeIdsRemove),
 
-    {ok, Db#db{local_docs_btree = Btree2}}.
+    {ok, Db#db{local_tree = Btree2}}.
 
 
 commit_data(Db) ->
@@ -680,9 +711,9 @@ commit_data(Db) ->
 db_to_header(Db, Header) ->
     Header#db_header{
         update_seq = Db#db.update_seq,
-        docinfo_by_seq_btree_state = couch_btree:get_state(Db#db.docinfo_by_seq_btree),
-        fulldocinfo_by_id_btree_state = couch_btree:get_state(Db#db.fulldocinfo_by_id_btree),
-        local_docs_btree_state = couch_btree:get_state(Db#db.local_docs_btree),
+        seq_tree_state = couch_btree:get_state(Db#db.seq_tree),
+        id_tree_state = couch_btree:get_state(Db#db.id_tree),
+        local_tree_state = couch_btree:get_state(Db#db.local_tree),
         security_ptr = Db#db.security_ptr,
         revs_limit = Db#db.revs_limit}.
 
@@ -771,31 +802,36 @@ copy_rev_tree_attachments(SrcDb, DestFd, Tree) ->
         (_, _, branch) ->
             ?REV_MISSING
         end, Tree).
 
-
-copy_docs(Db, #db{fd=DestFd}=NewDb, InfoBySeq, Retry) ->
-    Ids = [Id || #doc_info{id=Id} <- InfoBySeq],
-    LookupResults = couch_btree:lookup(Db#db.fulldocinfo_by_id_btree, Ids),
+merge_lookups(Infos, []) ->
+    Infos;
+merge_lookups([], _) ->
+    [];
+merge_lookups([#doc_info{}|RestInfos], [{ok, FullDocInfo}|RestLookups]) ->
+    [FullDocInfo|merge_lookups(RestInfos, RestLookups)];
+merge_lookups([FullDocInfo|RestInfos], Lookups) ->
+    [FullDocInfo|merge_lookups(RestInfos, Lookups)].
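merge_lookups/2 stitches the fold results back together: new-format
#full_doc_info{} entries pass straight through, while each old-format
#doc_info{} consumes, in order, one of the id-tree lookups made on its
behalf. A worked example (record fields beyond the id left at their
defaults):

    merge_lookups_example() ->
        A = #full_doc_info{id = <<"a">>},
        B = #doc_info{id = <<"b">>},      % old-format seq-tree entry
        C = #full_doc_info{id = <<"c">>},
        Fetched = #full_doc_info{id = <<"b">>},
        %% only the #doc_info{} consumes a lookup result; order is preserved
        [A, Fetched, C] = merge_lookups([A, B, C], [{ok, Fetched}]),
        ok.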
+
+copy_docs(Db, #db{fd=DestFd}=NewDb, MixedInfos, Retry) ->
+    % lookup any necessary full_doc_infos
+    DocInfoIds = [Id || #doc_info{id=Id} <- MixedInfos],
+    LookupResults = couch_btree:lookup(Db#db.id_tree, DocInfoIds),
+    Infos = merge_lookups(MixedInfos, LookupResults),
 
     % write out the attachments
-    NewFullDocInfos0 = lists:map(
-        fun({ok, #full_doc_info{rev_tree=RevTree}=Info}) ->
-            Info#full_doc_info{rev_tree=copy_rev_tree_attachments(Db, DestFd, RevTree)}
-        end, LookupResults),
+    NewInfos0 = [Info#full_doc_info{rev_tree=copy_rev_tree_attachments(Db,
+        DestFd, RevTree)} || #full_doc_info{rev_tree=RevTree}=Info <- Infos],
+
     % write out the docs
     % we do this in 2 stages so the docs are written out contiguously, making
     % view indexing and replication faster.
-    NewFullDocInfos1 = lists:map(
-        fun(#full_doc_info{rev_tree=RevTree}=Info) ->
-            Info#full_doc_info{rev_tree=couch_key_tree:map_leafs(
-                fun(_Key, {IsDel, DocBody, Seq}) ->
-                    {ok, Pos} = couch_file:append_term_md5(DestFd, DocBody),
-                    {IsDel, Pos, Seq}
-                end, RevTree)}
-        end, NewFullDocInfos0),
-
-    NewFullDocInfos = stem_full_doc_infos(Db, NewFullDocInfos1),
-    NewDocInfos = [couch_doc:to_doc_info(Info) || Info <- NewFullDocInfos],
+    NewInfos1 = [Info#full_doc_info{rev_tree=couch_key_tree:map_leafs(
+        fun(_Key, {IsDel, DocBody, Seq}) ->
+            {ok, Pos} = couch_file:append_term_md5(DestFd, DocBody),
+            {IsDel, Pos, Seq}
+        end, RevTree)} || #full_doc_info{rev_tree=RevTree}=Info <- NewInfos0],
+
+    NewInfos = stem_full_doc_infos(Db, NewInfos1),
     RemoveSeqs =
     case Retry of
     false ->
@@ -803,16 +839,16 @@ copy_docs(Db, #db{fd=DestFd}=NewDb, InfoBySeq, Retry) ->
     true ->
         % We are retrying a compaction, meaning the documents we are copying may
        % already exist in our file and must be removed from the by_seq index.
-        Existing = couch_btree:lookup(NewDb#db.fulldocinfo_by_id_btree, Ids),
+        Ids = [Id || #full_doc_info{id=Id} <- Infos],
+        Existing = couch_btree:lookup(NewDb#db.id_tree, Ids),
         [Seq || {ok, #full_doc_info{update_seq=Seq}} <- Existing]
     end,
 
-    {ok, DocInfoBTree} = couch_btree:add_remove(
-        NewDb#db.docinfo_by_seq_btree, NewDocInfos, RemoveSeqs),
-    {ok, FullDocInfoBTree} = couch_btree:add_remove(
-        NewDb#db.fulldocinfo_by_id_btree, NewFullDocInfos, []),
-    NewDb#db{ fulldocinfo_by_id_btree=FullDocInfoBTree,
-            docinfo_by_seq_btree=DocInfoBTree}.
+    {ok, SeqTree} = couch_btree:add_remove(
+        NewDb#db.seq_tree, NewInfos, RemoveSeqs),
+    {ok, IdTree} = couch_btree:add_remove(
+        NewDb#db.id_tree, NewInfos, []),
+    NewDb#db{id_tree=IdTree, seq_tree=SeqTree}.
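The Retry branch matters because a restarted compaction may copy a document
the aborted pass already wrote: if a doc landed in the target at seq 7 and has
since moved to seq 9 in the source, the stale seq-7 entry must be dropped or
the doc would appear twice in the by_seq index. The same logic as a
standalone sketch (hypothetical helper, mirroring the branch above):

    stale_seqs(TargetDb, Infos) ->
        Ids = [Id || #full_doc_info{id=Id} <- Infos],
        Existing = couch_btree:lookup(TargetDb#db.id_tree, Ids),
        %% seqs under which these docs were previously written to the target
        [Seq || {ok, #full_doc_info{update_seq=Seq}} <- Existing].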
@@ -821,13 +857,20 @@ copy_compact(Db, NewDb0, Retry) ->
     NewDb = NewDb0#db{fsync_options=FsyncOptions},
     TotalChanges = couch_db:count_changes_since(Db, NewDb#db.update_seq),
     EnumBySeqFun =
-    fun(#doc_info{high_seq=Seq}=DocInfo, _Offset, {AccNewDb, AccUncopied, TotalCopied}) ->
+    fun(DocInfo, _Offset, {AccNewDb, AccUncopied, TotalCopied}) ->
+        case DocInfo of
+        #full_doc_info{update_seq=Seq} ->
+            ok;
+        #doc_info{high_seq=Seq} ->
+            ok
+        end,
         couch_task_status:update("Copied ~p of ~p changes (~p%)",
             [TotalCopied, TotalChanges, (TotalCopied*100) div TotalChanges]),
         if TotalCopied rem 1000 =:= 0 ->
             NewDb2 = copy_docs(Db, AccNewDb, lists:reverse([DocInfo | AccUncopied]), Retry),
             if TotalCopied rem 10000 =:= 0 ->
-                {ok, {commit_data(NewDb2#db{update_seq=Seq}), [], TotalCopied + 1}};
+                NewDb3 = commit_data(NewDb2#db{update_seq=Seq}),
+                {ok, {NewDb3, [], TotalCopied + 1}};
             true ->
                 {ok, {NewDb2#db{update_seq=Seq}, [], TotalCopied + 1}}
             end;
@@ -839,7 +882,7 @@ copy_compact(Db, NewDb0, Retry) ->
     couch_task_status:set_update_frequency(500),
 
     {ok, _, {NewDb2, Uncopied, TotalChanges}} =
-        couch_btree:foldl(Db#db.docinfo_by_seq_btree, EnumBySeqFun,
+        couch_btree:foldl(Db#db.seq_tree, EnumBySeqFun,
             {NewDb, [], 0},
             [{start_key, NewDb#db.update_seq + 1}]),
--
cgit v1.2.3
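A closing note on copy_compact: the rewritten EnumBySeqFun has to accept both
record shapes, because a compacted seq tree yields #full_doc_info{} while an
uncompacted 1.0 tree still yields #doc_info{}. The case expression binds Seq
in whichever branch matches, and the binding remains visible after the case
ends, a standard Erlang scoping property. The idiom in isolation:

    update_seq_of(Info) ->
        case Info of
        #full_doc_info{update_seq=Seq} ->
            ok;
        #doc_info{high_seq=Seq} ->
            ok
        end,
        Seq.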