path: root/apps/couch/src/couch_db_updater.erl
author     Adam Kocoloski <adam@cloudant.com>  2010-08-18 11:51:03 -0400
committer  Adam Kocoloski <adam@cloudant.com>  2010-08-18 14:24:57 -0400
commit     7393d62b7b630bee50f609d0ae8125d33f7cda2b
tree       754e9ab17a586319c562de488e60056feff60bb8  /apps/couch/src/couch_db_updater.erl
parent     c0cb2625f25a2b51485c164bea1d8822f449ce14
Grab bag of Cloudant patches to couch OTP application
- Removal of couch_db and couch_ref_counter processes. Active DBs are accessible through a protected ets table owned by couch_server (see the monitor sketch after this list).
- #full_doc_info{} in by_id and by_seq trees for faster compaction at the expense of more disk usage afterwards. Proposed as COUCHDB-738 but not accepted upstream.
- Replication via distributed Erlang.
- Better hot upgrade support (uses exported functions much more often).
- Configurable btree chunk sizes allow for larger (but still bounded) reductions.
- Shorter names for btree fields in #db{} and #db_header{}.
- couch_view_group does not keep a reference to the #db{}.
- Terms are stored compressed (again).
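Of the changes above, the ref-counter removal is the one most visible in this file: init_db now takes a monitor on the couch_file process (erlang:monitor/2), close_db demonitors it, and a new handle_info clause stops the updater when the fd goes down. The following is a minimal, self-contained gen_server sketch of that monitor pattern; the module name and state shape are hypothetical, not part of the patch.

%% fd_monitor_sketch: hypothetical illustration of monitoring a file
%% process instead of ref-counting it; not code from this commit.
-module(fd_monitor_sketch).
-behaviour(gen_server).
-export([start_link/1, init/1, handle_call/3, handle_cast/2,
         handle_info/2, terminate/2, code_change/3]).

start_link(Fd) ->
    gen_server:start_link(?MODULE, Fd, []).

init(Fd) ->
    %% One monitor per owner replaces the shared couch_ref_counter.
    Ref = erlang:monitor(process, Fd),
    {ok, {Fd, Ref}}.

handle_info({'DOWN', Ref, process, _Pid, Reason}, {_Fd, Ref} = State) ->
    %% The fd died underneath us; stop instead of serving a dead handle.
    error_logger:error_msg("fd closed: ~p~n", [Reason]),
    {stop, normal, State};
handle_info(_Msg, State) ->
    {noreply, State}.

terminate(_Reason, {_Fd, Ref}) ->
    %% A clean shutdown just drops the monitor.
    erlang:demonitor(Ref, [flush]),
    ok.

handle_call(_Req, _From, State) -> {reply, ok, State}.
handle_cast(_Msg, State) -> {noreply, State}.
code_change(_OldVsn, State, _Extra) -> {ok, State}.

Unlike a shared ref counter, a monitor cannot leak a count when its owner crashes; the runtime delivers exactly one 'DOWN' message per monitor.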
Diffstat (limited to 'apps/couch/src/couch_db_updater.erl')
-rw-r--r--  apps/couch/src/couch_db_updater.erl  303
1 file changed, 173 insertions, 130 deletions
diff --git a/apps/couch/src/couch_db_updater.erl b/apps/couch/src/couch_db_updater.erl
index 19a4c165..e4f8d0ca 100644
--- a/apps/couch/src/couch_db_updater.erl
+++ b/apps/couch/src/couch_db_updater.erl
@@ -13,14 +13,14 @@
-module(couch_db_updater).
-behaviour(gen_server).
--export([btree_by_id_reduce/2,btree_by_seq_reduce/2]).
+-export([btree_by_id_split/1,btree_by_id_join/2,btree_by_id_reduce/2]).
+-export([btree_by_seq_split/1,btree_by_seq_join/2,btree_by_seq_reduce/2]).
-export([init/1,terminate/2,handle_call/3,handle_cast/2,code_change/3,handle_info/2]).
-include("couch_db.hrl").
-init({MainPid, DbName, Filepath, Fd, Options}) ->
- process_flag(trap_exit, true),
+init({DbName, Filepath, Fd, Options}) ->
case lists:member(create, Options) of
true ->
% create a new header and writes it to the file
@@ -44,25 +44,40 @@ init({MainPid, DbName, Filepath, Fd, Options}) ->
end,
Db = init_db(DbName, Filepath, Fd, Header),
- Db2 = refresh_validate_doc_funs(Db),
- {ok, Db2#db{main_pid = MainPid, is_sys_db = lists:member(sys_db, Options)}}.
+ couch_stats_collector:track_process_count({couchdb, open_databases}),
+ % we don't load validation funs here because the fabric query is liable to
+ % race conditions. Instead see couch_db:validate_doc_update, which loads
+ % them lazily
+ {ok, Db#db{main_pid = self(), is_sys_db = lists:member(sys_db, Options)}}.
terminate(_Reason, Db) ->
couch_file:close(Db#db.fd),
couch_util:shutdown_sync(Db#db.compactor_pid),
- couch_util:shutdown_sync(Db#db.fd_ref_counter),
+ couch_util:shutdown_sync(Db#db.fd),
ok.
+handle_call(start_compact, _From, Db) ->
+ {noreply, NewDb} = handle_cast(start_compact, Db),
+ {reply, {ok, NewDb#db.compactor_pid}, NewDb};
+
handle_call(get_db, _From, Db) ->
{reply, {ok, Db}, Db};
handle_call(full_commit, _From, #db{waiting_delayed_commit=nil}=Db) ->
+ {reply, ok, Db};
+handle_call(full_commit, _From, Db) ->
+ {reply, ok, commit_data(Db)};
+
+handle_call({full_commit, _}, _From, #db{waiting_delayed_commit=nil}=Db) ->
{reply, ok, Db}; % no data waiting, return ok immediately
-handle_call(full_commit, _From, Db) ->
+handle_call({full_commit, RequiredSeq}, _From, Db) when RequiredSeq =<
+ Db#db.committed_update_seq ->
+ {reply, ok, Db};
+handle_call({full_commit, _}, _, Db) ->
{reply, ok, commit_data(Db)}; % commit the data and return ok
handle_call(increment_update_seq, _From, Db) ->
Db2 = commit_data(Db#db{update_seq=Db#db.update_seq+1}),
- ok = gen_server:call(Db2#db.main_pid, {db_updated, Db2}),
+ ok = gen_server:call(couch_server, {db_updated, Db2}, infinity),
couch_db_update_notifier:notify({updated, Db#db.name}),
{reply, {ok, Db2#db.update_seq}, Db2};
@@ -70,13 +85,13 @@ handle_call({set_security, NewSec}, _From, Db) ->
{ok, Ptr} = couch_file:append_term(Db#db.fd, NewSec),
Db2 = commit_data(Db#db{security=NewSec, security_ptr=Ptr,
update_seq=Db#db.update_seq+1}),
- ok = gen_server:call(Db2#db.main_pid, {db_updated, Db2}),
+ ok = gen_server:call(couch_server, {db_updated, Db2}, infinity),
{reply, ok, Db2};
handle_call({set_revs_limit, Limit}, _From, Db) ->
Db2 = commit_data(Db#db{revs_limit=Limit,
update_seq=Db#db.update_seq+1}),
- ok = gen_server:call(Db2#db.main_pid, {db_updated, Db2}),
+ ok = gen_server:call(couch_server, {db_updated, Db2}, infinity),
{reply, ok, Db2};
handle_call({purge_docs, _IdRevs}, _From,
@@ -85,8 +100,8 @@ handle_call({purge_docs, _IdRevs}, _From,
handle_call({purge_docs, IdRevs}, _From, Db) ->
#db{
fd=Fd,
- fulldocinfo_by_id_btree = DocInfoByIdBTree,
- docinfo_by_seq_btree = DocInfoBySeqBTree,
+ id_tree = DocInfoByIdBTree,
+ seq_tree = DocInfoBySeqBTree,
update_seq = LastSeq,
header = Header = #db_header{purge_seq=PurgeSeq}
} = Db,
@@ -136,29 +151,32 @@ handle_call({purge_docs, IdRevs}, _From, Db) ->
Db2 = commit_data(
Db#db{
- fulldocinfo_by_id_btree = DocInfoByIdBTree2,
- docinfo_by_seq_btree = DocInfoBySeqBTree2,
+ id_tree = DocInfoByIdBTree2,
+ seq_tree = DocInfoBySeqBTree2,
update_seq = NewSeq + 1,
header=Header#db_header{purge_seq=PurgeSeq+1, purged_docs=Pointer}}),
- ok = gen_server:call(Db2#db.main_pid, {db_updated, Db2}),
+ ok = gen_server:call(couch_server, {db_updated, Db2}, infinity),
couch_db_update_notifier:notify({updated, Db#db.name}),
- {reply, {ok, (Db2#db.header)#db_header.purge_seq, IdRevsPurged}, Db2};
-handle_call(start_compact, _From, Db) ->
+ {reply, {ok, (Db2#db.header)#db_header.purge_seq, IdRevsPurged}, Db2}.
+
+
+handle_cast({load_validation_funs, ValidationFuns}, Db) ->
+ Db2 = Db#db{validate_doc_funs = ValidationFuns},
+ ok = gen_server:call(couch_server, {db_updated, Db2}, infinity),
+ {noreply, Db2};
+handle_cast(start_compact, Db) ->
case Db#db.compactor_pid of
nil ->
?LOG_INFO("Starting compaction for db \"~s\"", [Db#db.name]),
Pid = spawn_link(fun() -> start_copy_compact(Db) end),
Db2 = Db#db{compactor_pid=Pid},
- ok = gen_server:call(Db#db.main_pid, {db_updated, Db2}),
- {reply, ok, Db2};
+ ok = gen_server:call(couch_server, {db_updated, Db2}, infinity),
+ {noreply, Db2};
_ ->
% compact currently running, this is a no-op
- {reply, ok, Db}
- end.
-
-
-
+ {noreply, Db}
+ end;
handle_cast({compact_done, CompactFilepath}, #db{filepath=Filepath}=Db) ->
{ok, NewFd} = couch_file:open(CompactFilepath),
{ok, NewHeader} = couch_file:read_header(NewFd),
@@ -168,13 +186,13 @@ handle_cast({compact_done, CompactFilepath}, #db{filepath=Filepath}=Db) ->
case Db#db.update_seq == NewSeq of
true ->
% suck up all the local docs into memory and write them to the new db
- {ok, _, LocalDocs} = couch_btree:foldl(Db#db.local_docs_btree,
+ {ok, _, LocalDocs} = couch_btree:foldl(Db#db.local_tree,
fun(Value, _Offset, Acc) -> {ok, [Value | Acc]} end, []),
- {ok, NewLocalBtree} = couch_btree:add(NewDb#db.local_docs_btree, LocalDocs),
+ {ok, NewLocalBtree} = couch_btree:add(NewDb#db.local_tree, LocalDocs),
NewDb2 = commit_data(NewDb#db{
- local_docs_btree = NewLocalBtree,
- main_pid = Db#db.main_pid,
+ local_tree = NewLocalBtree,
+ main_pid = self(),
filepath = Filepath,
instance_start_time = Db#db.instance_start_time,
revs_limit = Db#db.revs_limit
@@ -186,13 +204,13 @@ handle_cast({compact_done, CompactFilepath}, #db{filepath=Filepath}=Db) ->
couch_file:delete(RootDir, Filepath),
ok = file:rename(CompactFilepath, Filepath),
close_db(Db),
- ok = gen_server:call(Db#db.main_pid, {db_updated, NewDb2}),
+ ok = gen_server:call(couch_server, {db_updated, NewDb2}, infinity),
?LOG_INFO("Compaction for db \"~s\" completed.", [Db#db.name]),
{noreply, NewDb2#db{compactor_pid=nil}};
false ->
- ?LOG_INFO("Compaction file still behind main file "
+ ?LOG_INFO("Compaction for ~s still behind main file "
"(update seq=~p. compact update seq=~p). Retrying.",
- [Db#db.update_seq, NewSeq]),
+ [Db#db.name, Db#db.update_seq, NewSeq]),
close_db(NewDb),
Pid = spawn_link(fun() -> start_copy_compact(Db) end),
Db2 = Db#db{compactor_pid=Pid},
@@ -215,7 +233,7 @@ handle_info({update_docs, Client, GroupedDocs, NonRepDocs, MergeConflicts,
try update_docs_int(Db, GroupedDocs3, NonRepDocs2, MergeConflicts,
FullCommit2) of
{ok, Db2} ->
- ok = gen_server:call(Db#db.main_pid, {db_updated, Db2}),
+ ok = gen_server:call(couch_server, {db_updated, Db2}, infinity),
if Db2#db.update_seq /= Db#db.update_seq ->
couch_db_update_notifier:notify({updated, Db2#db.name});
true -> ok
@@ -235,13 +253,16 @@ handle_info(delayed_commit, Db) ->
Db ->
{noreply, Db};
Db2 ->
- ok = gen_server:call(Db2#db.main_pid, {db_updated, Db2}),
+ ok = gen_server:call(couch_server, {db_updated, Db2}, infinity),
{noreply, Db2}
end;
handle_info({'EXIT', _Pid, normal}, Db) ->
{noreply, Db};
handle_info({'EXIT', _Pid, Reason}, Db) ->
- {stop, Reason, Db}.
+ {stop, Reason, Db};
+handle_info({'DOWN', Ref, _, _, Reason}, #db{fd_monitor=Ref, name=Name} = Db) ->
+ ?LOG_ERROR("DB ~s shutting down - Fd ~p", [Name, Reason]),
+ {stop, normal, Db}.
code_change(_OldVsn, State, _Extra) ->
{ok, State}.
@@ -279,14 +300,27 @@ collect_updates(GroupedDocsAcc, ClientsAcc, MergeConflicts, FullCommit) ->
end.
-btree_by_seq_split(#doc_info{id=Id, high_seq=KeySeq, revs=Revs}) ->
- RevInfos = [{Rev, Seq, Bp} ||
- #rev_info{rev=Rev,seq=Seq,deleted=false,body_sp=Bp} <- Revs],
- DeletedRevInfos = [{Rev, Seq, Bp} ||
- #rev_info{rev=Rev,seq=Seq,deleted=true,body_sp=Bp} <- Revs],
- {KeySeq,{Id, RevInfos, DeletedRevInfos}}.
+rev_tree(DiskTree) ->
+ couch_key_tree:map(fun(_RevId, {IsDeleted, BodyPointer, UpdateSeq}) ->
+ {IsDeleted == 1, BodyPointer, UpdateSeq};
+ (_RevId, ?REV_MISSING) ->
+ ?REV_MISSING
+ end, DiskTree).
+
+disk_tree(RevTree) ->
+ couch_key_tree:map(fun(_RevId, {IsDeleted, BodyPointer, UpdateSeq}) ->
+ {if IsDeleted -> 1; true -> 0 end, BodyPointer, UpdateSeq};
+ (_RevId, ?REV_MISSING) ->
+ ?REV_MISSING
+ end, RevTree).
+btree_by_seq_split(#full_doc_info{id=Id, update_seq=Seq, deleted=Del, rev_tree=T}) ->
+ {Seq, {Id, if Del -> 1; true -> 0 end, disk_tree(T)}}.
+
+btree_by_seq_join(Seq, {Id, Del, T}) when is_integer(Del) ->
+ #full_doc_info{id=Id, update_seq=Seq, deleted=Del==1, rev_tree=rev_tree(T)};
btree_by_seq_join(KeySeq, {Id, RevInfos, DeletedRevInfos}) ->
+ % 1.0 stored #doc_info records in the seq tree. compact to upgrade.
#doc_info{
id = Id,
high_seq=KeySeq,
@@ -310,14 +344,7 @@ btree_by_seq_join(KeySeq,{Id, Rev, Bp, Conflicts, DelConflicts, Deleted}) ->
btree_by_id_split(#full_doc_info{id=Id, update_seq=Seq,
deleted=Deleted, rev_tree=Tree}) ->
- DiskTree =
- couch_key_tree:map(
- fun(_RevId, {IsDeleted, BodyPointer, UpdateSeq}) ->
- {if IsDeleted -> 1; true -> 0 end, BodyPointer, UpdateSeq};
- (_RevId, ?REV_MISSING) ->
- ?REV_MISSING
- end, Tree),
- {Id, {Seq, if Deleted -> 1; true -> 0 end, DiskTree}}.
+ {Id, {Seq, if Deleted -> 1; true -> 0 end, disk_tree(Tree)}}.
btree_by_id_join(Id, {HighSeq, Deleted, DiskTree}) ->
Tree =
@@ -377,19 +404,19 @@ init_db(DbName, Filepath, Fd, Header0) ->
"[before_header, after_header, on_file_open]")),
case lists:member(on_file_open, FsyncOptions) of
- true -> ok = couch_file:sync(Fd);
+ true -> ok = couch_file:sync(Filepath);
_ -> ok
end,
- {ok, IdBtree} = couch_btree:open(Header#db_header.fulldocinfo_by_id_btree_state, Fd,
- [{split, fun(X) -> btree_by_id_split(X) end},
- {join, fun(X,Y) -> btree_by_id_join(X,Y) end},
- {reduce, fun(X,Y) -> btree_by_id_reduce(X,Y) end}]),
- {ok, SeqBtree} = couch_btree:open(Header#db_header.docinfo_by_seq_btree_state, Fd,
- [{split, fun(X) -> btree_by_seq_split(X) end},
- {join, fun(X,Y) -> btree_by_seq_join(X,Y) end},
- {reduce, fun(X,Y) -> btree_by_seq_reduce(X,Y) end}]),
- {ok, LocalDocsBtree} = couch_btree:open(Header#db_header.local_docs_btree_state, Fd),
+ {ok, IdBtree} = couch_btree:open(Header#db_header.id_tree_state, Fd,
+ [{split, fun ?MODULE:btree_by_id_split/1},
+ {join, fun ?MODULE:btree_by_id_join/2},
+ {reduce, fun ?MODULE:btree_by_id_reduce/2}]),
+ {ok, SeqBtree} = couch_btree:open(Header#db_header.seq_tree_state, Fd,
+ [{split, fun ?MODULE:btree_by_seq_split/1},
+ {join, fun ?MODULE:btree_by_seq_join/2},
+ {reduce, fun ?MODULE:btree_by_seq_reduce/2}]),
+ {ok, LocalDocsBtree} = couch_btree:open(Header#db_header.local_tree_state, Fd),
case Header#db_header.security_ptr of
nil ->
Security = [],
@@ -401,15 +428,13 @@ init_db(DbName, Filepath, Fd, Header0) ->
{MegaSecs, Secs, MicroSecs} = now(),
StartTime = ?l2b(io_lib:format("~p",
[(MegaSecs*1000000*1000000) + (Secs*1000000) + MicroSecs])),
- {ok, RefCntr} = couch_ref_counter:start([Fd]),
#db{
- update_pid=self(),
fd=Fd,
- fd_ref_counter = RefCntr,
+ fd_monitor = erlang:monitor(process,Fd),
header=Header,
- fulldocinfo_by_id_btree = IdBtree,
- docinfo_by_seq_btree = SeqBtree,
- local_docs_btree = LocalDocsBtree,
+ id_tree = IdBtree,
+ seq_tree = SeqBtree,
+ local_tree = LocalDocsBtree,
committed_update_seq = Header#db_header.update_seq,
update_seq = Header#db_header.update_seq,
name = DbName,
@@ -422,8 +447,8 @@ init_db(DbName, Filepath, Fd, Header0) ->
}.
-close_db(#db{fd_ref_counter = RefCntr}) ->
- couch_ref_counter:drop(RefCntr).
+close_db(#db{fd_monitor = Ref}) ->
+ erlang:demonitor(Ref).
refresh_validate_doc_funs(Db) ->
@@ -435,7 +460,13 @@ refresh_validate_doc_funs(Db) ->
Fun -> [Fun]
end
end, DesignDocs),
- Db#db{validate_doc_funs=ProcessDocFuns}.
+ case Db#db.name of
+ <<"shards/", _:18/binary, DbName/binary>> ->
+ fabric:reset_validation_funs(DbName),
+ Db#db{validate_doc_funs=ProcessDocFuns};
+ _ ->
+ Db#db{validate_doc_funs=ProcessDocFuns}
+ end.
% rev tree functions
@@ -563,14 +594,11 @@ merge_rev_trees(MergeConflicts, [NewDocs|RestDocsList],
-new_index_entries([], AccById, AccBySeq) ->
- {AccById, AccBySeq};
-new_index_entries([FullDocInfo|RestInfos], AccById, AccBySeq) ->
- #doc_info{revs=[#rev_info{deleted=Deleted}|_]} = DocInfo =
- couch_doc:to_doc_info(FullDocInfo),
- new_index_entries(RestInfos,
- [FullDocInfo#full_doc_info{deleted=Deleted}|AccById],
- [DocInfo|AccBySeq]).
+new_index_entries([], Acc) ->
+ Acc;
+new_index_entries([Info|Rest], Acc) ->
+ #doc_info{revs=[#rev_info{deleted=Del}|_]} = couch_doc:to_doc_info(Info),
+ new_index_entries(Rest, [Info#full_doc_info{deleted=Del}|Acc]).
stem_full_doc_infos(#db{revs_limit=Limit}, DocInfos) ->
@@ -579,8 +607,8 @@ stem_full_doc_infos(#db{revs_limit=Limit}, DocInfos) ->
update_docs_int(Db, DocsList, NonRepDocs, MergeConflicts, FullCommit) ->
#db{
- fulldocinfo_by_id_btree = DocInfoByIdBTree,
- docinfo_by_seq_btree = DocInfoBySeqBTree,
+ id_tree = DocInfoByIdBTree,
+ seq_tree = DocInfoBySeqBTree,
update_seq = LastSeq
} = Db,
Ids = [Id || [{_Client, #doc{id=Id}}|_] <- DocsList],
@@ -607,16 +635,17 @@ update_docs_int(Db, DocsList, NonRepDocs, MergeConflicts, FullCommit) ->
% the trees, the attachments are already written to disk)
{ok, FlushedFullDocInfos} = flush_trees(Db2, NewFullDocInfos, []),
- {IndexFullDocInfos, IndexDocInfos} =
- new_index_entries(FlushedFullDocInfos, [], []),
+ IndexInfos = new_index_entries(FlushedFullDocInfos, []),
% and the indexes
- {ok, DocInfoByIdBTree2} = couch_btree:add_remove(DocInfoByIdBTree, IndexFullDocInfos, []),
- {ok, DocInfoBySeqBTree2} = couch_btree:add_remove(DocInfoBySeqBTree, IndexDocInfos, RemoveSeqs),
+ {ok, DocInfoByIdBTree2} = couch_btree:add_remove(DocInfoByIdBTree,
+ IndexInfos, []),
+ {ok, DocInfoBySeqBTree2} = couch_btree:add_remove(DocInfoBySeqBTree,
+ IndexInfos, RemoveSeqs),
Db3 = Db2#db{
- fulldocinfo_by_id_btree = DocInfoByIdBTree2,
- docinfo_by_seq_btree = DocInfoBySeqBTree2,
+ id_tree = DocInfoByIdBTree2,
+ seq_tree = DocInfoBySeqBTree2,
update_seq = NewSeq},
% Check if we just updated any design documents, and update the validation
@@ -631,24 +660,26 @@ update_docs_int(Db, DocsList, NonRepDocs, MergeConflicts, FullCommit) ->
{ok, commit_data(Db4, not FullCommit)}.
-update_local_docs(#db{local_docs_btree=Btree}=Db, Docs) ->
+update_local_docs(#db{local_tree=Btree}=Db, Docs) ->
Ids = [Id || {_Client, #doc{id=Id}} <- Docs],
OldDocLookups = couch_btree:lookup(Btree, Ids),
BtreeEntries = lists:zipwith(
- fun({Client, #doc{id=Id,deleted=Delete,revs={0,PrevRevs},body=Body}}, OldDocLookup) ->
+ fun({Client, #doc{id=Id,deleted=Delete,revs={0,PrevRevs},body=Body}},
+ _OldDocLookup) ->
case PrevRevs of
[RevStr|_] ->
PrevRev = list_to_integer(?b2l(RevStr));
[] ->
PrevRev = 0
end,
- OldRev =
- case OldDocLookup of
- {ok, {_, {OldRev0, _}}} -> OldRev0;
- not_found -> 0
- end,
- case OldRev == PrevRev of
- true ->
+ %% disabled conflict checking for local docs -- APK 16 June 2010
+ % OldRev =
+ % case OldDocLookup of
+ % {ok, {_, {OldRev0, _}}} -> OldRev0;
+ % not_found -> 0
+ % end,
+ % case OldRev == PrevRev of
+ % true ->
case Delete of
false ->
send_result(Client, Id, {0, PrevRevs}, {ok,
@@ -658,11 +689,11 @@ update_local_docs(#db{local_docs_btree=Btree}=Db, Docs) ->
send_result(Client, Id, {0, PrevRevs},
{ok, {0, <<"0">>}}),
{remove, Id}
- end;
- false ->
- send_result(Client, Id, {0, PrevRevs}, conflict),
- ignore
- end
+ end%;
+ % false ->
+ % send_result(Client, Id, {0, PrevRevs}, conflict),
+ % ignore
+ % end
end, Docs, OldDocLookups),
BtreeIdsRemove = [Id || {remove, Id} <- BtreeEntries],
@@ -671,7 +702,7 @@ update_local_docs(#db{local_docs_btree=Btree}=Db, Docs) ->
{ok, Btree2} =
couch_btree:add_remove(Btree, BtreeIdsUpdate, BtreeIdsRemove),
- {ok, Db#db{local_docs_btree = Btree2}}.
+ {ok, Db#db{local_tree = Btree2}}.
commit_data(Db) ->
@@ -680,9 +711,9 @@ commit_data(Db) ->
db_to_header(Db, Header) ->
Header#db_header{
update_seq = Db#db.update_seq,
- docinfo_by_seq_btree_state = couch_btree:get_state(Db#db.docinfo_by_seq_btree),
- fulldocinfo_by_id_btree_state = couch_btree:get_state(Db#db.fulldocinfo_by_id_btree),
- local_docs_btree_state = couch_btree:get_state(Db#db.local_docs_btree),
+ seq_tree_state = couch_btree:get_state(Db#db.seq_tree),
+ id_tree_state = couch_btree:get_state(Db#db.id_tree),
+ local_tree_state = couch_btree:get_state(Db#db.local_tree),
security_ptr = Db#db.security_ptr,
revs_limit = Db#db.revs_limit}.
@@ -771,31 +802,36 @@ copy_rev_tree_attachments(SrcDb, DestFd, Tree) ->
(_, _, branch) ->
?REV_MISSING
end, Tree).
-
-copy_docs(Db, #db{fd=DestFd}=NewDb, InfoBySeq, Retry) ->
- Ids = [Id || #doc_info{id=Id} <- InfoBySeq],
- LookupResults = couch_btree:lookup(Db#db.fulldocinfo_by_id_btree, Ids),
+merge_lookups(Infos, []) ->
+ Infos;
+merge_lookups([], _) ->
+ [];
+merge_lookups([#doc_info{}|RestInfos], [{ok, FullDocInfo}|RestLookups]) ->
+ [FullDocInfo|merge_lookups(RestInfos, RestLookups)];
+merge_lookups([FullDocInfo|RestInfos], Lookups) ->
+ [FullDocInfo|merge_lookups(RestInfos, Lookups)].
+
+copy_docs(Db, #db{fd=DestFd}=NewDb, MixedInfos, Retry) ->
+ % lookup any necessary full_doc_infos
+ DocInfoIds = [Id || #doc_info{id=Id} <- MixedInfos],
+ LookupResults = couch_btree:lookup(Db#db.id_tree, DocInfoIds),
+ Infos = merge_lookups(MixedInfos, LookupResults),
% write out the attachments
- NewFullDocInfos0 = lists:map(
- fun({ok, #full_doc_info{rev_tree=RevTree}=Info}) ->
- Info#full_doc_info{rev_tree=copy_rev_tree_attachments(Db, DestFd, RevTree)}
- end, LookupResults),
+ NewInfos0 = [Info#full_doc_info{rev_tree=copy_rev_tree_attachments(Db,
+ DestFd, RevTree)} || #full_doc_info{rev_tree=RevTree}=Info <- Infos],
+
% write out the docs
% we do this in 2 stages so the docs are written out contiguously, making
% view indexing and replication faster.
- NewFullDocInfos1 = lists:map(
- fun(#full_doc_info{rev_tree=RevTree}=Info) ->
- Info#full_doc_info{rev_tree=couch_key_tree:map_leafs(
- fun(_Key, {IsDel, DocBody, Seq}) ->
- {ok, Pos} = couch_file:append_term_md5(DestFd, DocBody),
- {IsDel, Pos, Seq}
- end, RevTree)}
- end, NewFullDocInfos0),
-
- NewFullDocInfos = stem_full_doc_infos(Db, NewFullDocInfos1),
- NewDocInfos = [couch_doc:to_doc_info(Info) || Info <- NewFullDocInfos],
+ NewInfos1 = [Info#full_doc_info{rev_tree=couch_key_tree:map_leafs(
+ fun(_Key, {IsDel, DocBody, Seq}) ->
+ {ok, Pos} = couch_file:append_term_md5(DestFd, DocBody),
+ {IsDel, Pos, Seq}
+ end, RevTree)} || #full_doc_info{rev_tree=RevTree}=Info <- NewInfos0],
+
+ NewInfos = stem_full_doc_infos(Db, NewInfos1),
RemoveSeqs =
case Retry of
false ->
@@ -803,16 +839,16 @@ copy_docs(Db, #db{fd=DestFd}=NewDb, InfoBySeq, Retry) ->
true ->
% We are retrying a compaction, meaning the documents we are copying may
% already exist in our file and must be removed from the by_seq index.
- Existing = couch_btree:lookup(NewDb#db.fulldocinfo_by_id_btree, Ids),
+ Ids = [Id || #full_doc_info{id=Id} <- Infos],
+ Existing = couch_btree:lookup(NewDb#db.id_tree, Ids),
[Seq || {ok, #full_doc_info{update_seq=Seq}} <- Existing]
end,
- {ok, DocInfoBTree} = couch_btree:add_remove(
- NewDb#db.docinfo_by_seq_btree, NewDocInfos, RemoveSeqs),
- {ok, FullDocInfoBTree} = couch_btree:add_remove(
- NewDb#db.fulldocinfo_by_id_btree, NewFullDocInfos, []),
- NewDb#db{ fulldocinfo_by_id_btree=FullDocInfoBTree,
- docinfo_by_seq_btree=DocInfoBTree}.
+ {ok, SeqTree} = couch_btree:add_remove(
+ NewDb#db.seq_tree, NewInfos, RemoveSeqs),
+ {ok, IdTree} = couch_btree:add_remove(
+ NewDb#db.id_tree, NewInfos, []),
+ NewDb#db{id_tree=IdTree, seq_tree=SeqTree}.
@@ -821,13 +857,20 @@ copy_compact(Db, NewDb0, Retry) ->
NewDb = NewDb0#db{fsync_options=FsyncOptions},
TotalChanges = couch_db:count_changes_since(Db, NewDb#db.update_seq),
EnumBySeqFun =
- fun(#doc_info{high_seq=Seq}=DocInfo, _Offset, {AccNewDb, AccUncopied, TotalCopied}) ->
+ fun(DocInfo, _Offset, {AccNewDb, AccUncopied, TotalCopied}) ->
+ case DocInfo of
+ #full_doc_info{update_seq=Seq} ->
+ ok;
+ #doc_info{high_seq=Seq} ->
+ ok
+ end,
couch_task_status:update("Copied ~p of ~p changes (~p%)",
[TotalCopied, TotalChanges, (TotalCopied*100) div TotalChanges]),
if TotalCopied rem 1000 =:= 0 ->
NewDb2 = copy_docs(Db, AccNewDb, lists:reverse([DocInfo | AccUncopied]), Retry),
if TotalCopied rem 10000 =:= 0 ->
- {ok, {commit_data(NewDb2#db{update_seq=Seq}), [], TotalCopied + 1}};
+ NewDb3 = commit_data(NewDb2#db{update_seq=Seq}),
+ {ok, {NewDb3, [], TotalCopied + 1}};
true ->
{ok, {NewDb2#db{update_seq=Seq}, [], TotalCopied + 1}}
end;
@@ -839,7 +882,7 @@ copy_compact(Db, NewDb0, Retry) ->
couch_task_status:set_update_frequency(500),
{ok, _, {NewDb2, Uncopied, TotalChanges}} =
- couch_btree:foldl(Db#db.docinfo_by_seq_btree, EnumBySeqFun,
+ couch_btree:foldl(Db#db.seq_tree, EnumBySeqFun,
{NewDb, [], 0},
[{start_key, NewDb#db.update_seq + 1}]),