summaryrefslogtreecommitdiff
path: root/src/couchdb/couch_db_updater.erl
diff options
context:
space:
mode:
authorDamien F. Katz <damien@apache.org>2008-09-11 19:26:09 +0000
committerDamien F. Katz <damien@apache.org>2008-09-11 19:26:09 +0000
commit634b1b193acc24b95326c74e615d723043516f16 (patch)
treefaedbfddee34a23083b34998572ceb1501a8747f /src/couchdb/couch_db_updater.erl
parent37ca97c918f4b5316e4293d8f1001bb87b8dfb0c (diff)
Check-in of document purge functionality.
git-svn-id: https://svn.apache.org/repos/asf/incubator/couchdb/trunk@694430 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'src/couchdb/couch_db_updater.erl')
-rw-r--r--src/couchdb/couch_db_updater.erl162
1 files changed, 95 insertions, 67 deletions
diff --git a/src/couchdb/couch_db_updater.erl b/src/couchdb/couch_db_updater.erl
index cc916961..a368ccac 100644
--- a/src/couchdb/couch_db_updater.erl
+++ b/src/couchdb/couch_db_updater.erl
@@ -58,14 +58,77 @@ handle_call(increment_update_seq, _From, Db) ->
Db2 = commit_data(Db#db{update_seq=Db#db.update_seq+1}),
ok = gen_server:call(Db2#db.main_pid, {db_updated, Db2}),
couch_db_update_notifier:notify({updated, Db#db.name}),
- {reply, {ok, Db2#db.update_seq}, Db2}.
+ {reply, {ok, Db2#db.update_seq}, Db2};
+handle_call({purge_docs, _IdRevs}, _From,
+ #db{compactor_pid=Pid}=Db) when Pid /= nil ->
+ {reply, {error, purge_during_compaction}, Db};
+handle_call({purge_docs, IdRevs}, _From, Db) ->
+ #db{
+ fd=Fd,
+ fulldocinfo_by_id_btree = DocInfoByIdBTree,
+ docinfo_by_seq_btree = DocInfoBySeqBTree,
+ update_seq = LastSeq,
+ header = Header = #db_header{purge_seq=PurgeSeq}
+ } = Db,
+ DocLookups = couch_btree:lookup(DocInfoByIdBTree,
+ [Id || {Id, _Revs} <- IdRevs]),
+
+ NewDocInfos = lists:zipwith(
+ fun({_Id, Revs}, {ok, #full_doc_info{rev_tree=Tree}=FullDocInfo}) ->
+ case couch_key_tree:remove_leafs(Tree, Revs) of
+ {_, []=_RemovedRevs} -> % no change
+ nil;
+ {NewTree, RemovedRevs} ->
+ {FullDocInfo#full_doc_info{rev_tree=NewTree},RemovedRevs}
+ end;
+ (_, not_found) ->
+ nil
+ end,
+ IdRevs, DocLookups),
+
+ SeqsToRemove = [Seq
+ || {#full_doc_info{update_seq=Seq},_} <- NewDocInfos],
+
+ FullDocInfoToUpdate = [FullInfo
+ || {#full_doc_info{rev_tree=Tree}=FullInfo,_}
+ <- NewDocInfos, Tree /= []],
+ IdRevsPurged = [{Id, Revs}
+ || {#full_doc_info{id=Id}, Revs} <- NewDocInfos],
+
+ {DocInfoToUpdate, NewSeq} = lists:mapfoldl(
+ fun(FullInfo, SeqAcc) ->
+ Info = couch_doc:to_doc_info(FullInfo),
+ {Info#doc_info{update_seq=SeqAcc + 1}, SeqAcc + 1}
+ end, LastSeq, FullDocInfoToUpdate),
+
+ IdsToRemove = [Id || {#full_doc_info{id=Id,rev_tree=Tree},_}
+ <- NewDocInfos, Tree == []],
+
+ {ok, DocInfoBySeqBTree2} = couch_btree:add_remove(DocInfoBySeqBTree,
+ DocInfoToUpdate, SeqsToRemove),
+ {ok, DocInfoByIdBTree2} = couch_btree:add_remove(DocInfoByIdBTree,
+ FullDocInfoToUpdate, IdsToRemove),
+ {ok, Pointer} = couch_file:append_term(Fd, IdRevsPurged),
+
+ Db2 = commit_data(
+ Db#db{
+ fulldocinfo_by_id_btree = DocInfoByIdBTree2,
+ docinfo_by_seq_btree = DocInfoBySeqBTree2,
+ update_seq = NewSeq,
+ header=Header#db_header{purge_seq=PurgeSeq+1, purged_docs=Pointer}}),
+
+ ok = gen_server:call(Db2#db.main_pid, {db_updated, Db2}),
+ couch_db_update_notifier:notify({updated, Db#db.name}),
+ {reply, {ok, Db2#db.update_seq, IdRevsPurged}, Db2}.
+
+
handle_cast(start_compact, Db) ->
case Db#db.compactor_pid of
nil ->
?LOG_INFO("Starting compaction for db \"~s\"", [Db#db.name]),
- Pid = spawn_link(fun() -> start_copy_compact_int(Db, true) end),
+ Pid = spawn_link(fun() -> start_copy_compact_int(Db) end),
Db2 = Db#db{compactor_pid=Pid},
ok = gen_server:call(Db#db.main_pid, {db_updated, Db2}),
{noreply, Db2};
@@ -80,14 +143,16 @@ handle_cast({compact_done, CompactFilepath}, #db{filepath=Filepath}=Db) ->
init_db(Db#db.name, CompactFilepath, NewFd, NewHeader),
case Db#db.update_seq == NewSeq of
true ->
- NewDb2 = commit_data(
- NewDb#db{
- main_pid = Db#db.main_pid,
- doc_count = Db#db.doc_count,
- doc_del_count = Db#db.doc_del_count,
- filepath = Filepath}),
+ % suck up all the local docs into memory and write them to the new db
+ {ok, LocalDocs} = couch_btree:foldl(Db#db.local_docs_btree,
+ fun(Value, _Offset, Acc) -> {ok, [Value | Acc]} end, []),
+ {ok, NewLocalBtree} = couch_btree:add(NewDb#db.local_docs_btree, LocalDocs),
+
+ NewDb2 = commit_data( NewDb#db{local_docs_btree=NewLocalBtree,
+ main_pid = Db#db.main_pid,filepath = Filepath}),
- ?LOG_DEBUG("CouchDB swapping files ~s and ~s.", [Filepath, CompactFilepath]),
+ ?LOG_DEBUG("CouchDB swapping files ~s and ~s.",
+ [Filepath, CompactFilepath]),
file:delete(Filepath),
ok = file:rename(CompactFilepath, Filepath),
@@ -102,7 +167,7 @@ handle_cast({compact_done, CompactFilepath}, #db{filepath=Filepath}=Db) ->
?LOG_INFO("Compaction file still behind main file "
"(update seq=~p. compact update seq=~p). Retrying.",
[Db#db.update_seq, NewSeq]),
- Pid = spawn_link(fun() -> start_copy_compact_int(Db, false) end),
+ Pid = spawn_link(fun() -> start_copy_compact_int(Db) end),
Db2 = Db#db{compactor_pid=Pid},
couch_file:close(NewFd),
{noreply, Db2}
@@ -143,14 +208,14 @@ btree_by_id_split(#full_doc_info{id=Id, update_seq=Seq,
btree_by_id_join(Id, {Seq, Deleted, Tree}) ->
#full_doc_info{id=Id, update_seq=Seq, deleted=Deleted==1, rev_tree=Tree}.
-
-
btree_by_id_reduce(reduce, FullDocInfos) ->
% count the number of not deleted documents
- length([1 || #full_doc_info{deleted=false} <- FullDocInfos]);
+ {length([1 || #full_doc_info{deleted=false} <- FullDocInfos]),
+ length([1 || #full_doc_info{deleted=true} <- FullDocInfos])};
btree_by_id_reduce(rereduce, Reds) ->
- lists:sum(Reds).
+ {lists:sum([Count || {Count,_} <- Reds]),
+ lists:sum([DelCount || {_, DelCount} <- Reds])}.
btree_by_seq_reduce(reduce, DocInfos) ->
% count the number of documents
@@ -188,8 +253,6 @@ init_db(DbName, Filepath, Fd, Header) ->
docinfo_by_seq_btree = SeqBtree,
local_docs_btree = LocalDocsBtree,
update_seq = Header#db_header.update_seq,
- doc_count = Header#db_header.doc_count,
- doc_del_count = Header#db_header.doc_del_count,
name = DbName,
filepath=Filepath }.
@@ -270,15 +333,11 @@ merge_rev_trees(NoConflicts, [NewDocs|RestDocsList],
[NewInfo|AccNewInfos],AccSeq+1)
end.
-new_index_entries([], DocCount, DelCount, AccById, AccBySeq) ->
- {ok, DocCount, DelCount, AccById, AccBySeq};
-new_index_entries([FullDocInfo|RestInfos], DocCount, DelCount, AccById, AccBySeq) ->
+new_index_entries([], AccById, AccBySeq) ->
+ {ok, AccById, AccBySeq};
+new_index_entries([FullDocInfo|RestInfos], AccById, AccBySeq) ->
#doc_info{deleted=Deleted} = DocInfo = couch_doc:to_doc_info(FullDocInfo),
- {DocCount2, DelCount2} =
- if Deleted -> {DocCount, DelCount + 1};
- true -> {DocCount + 1, DelCount}
- end,
- new_index_entries(RestInfos, DocCount2, DelCount2,
+ new_index_entries(RestInfos,
[FullDocInfo#full_doc_info{deleted=Deleted}|AccById],
[DocInfo|AccBySeq]).
@@ -286,9 +345,7 @@ update_docs_int(Db, DocsList, Options) ->
#db{
fulldocinfo_by_id_btree = DocInfoByIdBTree,
docinfo_by_seq_btree = DocInfoBySeqBTree,
- update_seq = LastSeq,
- doc_count = FullDocCount,
- doc_del_count = FullDelCount
+ update_seq = LastSeq
} = Db,
% separate out the NonRep documents from the rest of the documents
@@ -305,7 +362,7 @@ update_docs_int(Db, DocsList, Options) ->
Ids = [Id || [#doc{id=Id}|_] <- DocsList2],
- % lookup up the existing documents, if they exist.
+ % lookup up the old documents, if they exist.
OldDocLookups = couch_btree:lookup(DocInfoByIdBTree, Ids),
OldDocInfos = lists:zipwith(
fun(_Id, {ok, FullDocInfo}) ->
@@ -315,18 +372,6 @@ update_docs_int(Db, DocsList, Options) ->
end,
Ids, OldDocLookups),
- {OldCount, OldDelCount} = lists:foldl(
- fun({ok, FullDocInfo}, {OldCountAcc, OldDelCountAcc}) ->
- case couch_doc:to_doc_info(FullDocInfo) of
- #doc_info{deleted=false} ->
- {OldCountAcc + 1, OldDelCountAcc};
- _ ->
- {OldCountAcc, OldDelCountAcc + 1}
- end;
- (not_found, Acc) ->
- Acc
- end, {0, 0}, OldDocLookups),
-
% Merge the new docs into the revision trees.
NoConflicts = lists:member(new_edits, Options),
{ok, NewDocInfos, NewSeq} = merge_rev_trees(NoConflicts, DocsList2, OldDocInfos, [], LastSeq),
@@ -339,11 +384,10 @@ update_docs_int(Db, DocsList, Options) ->
% Try to write the local documents first, a conflict might be generated
{ok, Db2} = update_local_docs(Db, NonRepDocs),
- % Write out the documents summaries (they are stored in the nodes of the rev trees)
+ % Write out the document summaries (they are stored in the nodes of the rev trees)
{ok, FlushedDocInfos} = flush_trees(Db2, NewDocInfos, []),
- {ok, NewDocsCount, NewDelCount, InfoById, InfoBySeq} =
- new_index_entries(FlushedDocInfos, 0, 0, [], []),
+ {ok, InfoById, InfoBySeq} = new_index_entries(FlushedDocInfos, [], []),
% and the indexes to the documents
{ok, DocInfoBySeqBTree2} = couch_btree:add_remove(DocInfoBySeqBTree, InfoBySeq, RemoveSeqs),
@@ -352,9 +396,7 @@ update_docs_int(Db, DocsList, Options) ->
Db3 = Db2#db{
fulldocinfo_by_id_btree = DocInfoByIdBTree2,
docinfo_by_seq_btree = DocInfoBySeqBTree2,
- update_seq = NewSeq,
- doc_count = FullDocCount + NewDocsCount - OldCount,
- doc_del_count = FullDelCount + NewDelCount - OldDelCount},
+ update_seq = NewSeq},
case lists:member(delay_commit, Options) of
true ->
@@ -406,9 +448,7 @@ commit_data(#db{fd=Fd, header=Header} = Db) ->
summary_stream_state = couch_stream:get_state(Db#db.summary_stream),
docinfo_by_seq_btree_state = couch_btree:get_state(Db#db.docinfo_by_seq_btree),
fulldocinfo_by_id_btree_state = couch_btree:get_state(Db#db.fulldocinfo_by_id_btree),
- local_docs_btree_state = couch_btree:get_state(Db#db.local_docs_btree),
- doc_count = Db#db.doc_count,
- doc_del_count = Db#db.doc_del_count
+ local_docs_btree_state = couch_btree:get_state(Db#db.local_docs_btree)
},
if Header == Header2 ->
Db; % unchanged. nothing to do
@@ -476,7 +516,7 @@ copy_compact_docs(Db, NewDb) ->
NewDb2
end.
-start_copy_compact_int(#db{name=Name,filepath=Filepath}=Db, CopyLocal) ->
+start_copy_compact_int(#db{name=Name,filepath=Filepath}=Db) ->
CompactFile = Filepath ++ ".compact",
?LOG_DEBUG("Compaction process spawned for db \"~s\"", [Name]),
case couch_file:open(CompactFile) of
@@ -484,24 +524,12 @@ start_copy_compact_int(#db{name=Name,filepath=Filepath}=Db, CopyLocal) ->
?LOG_DEBUG("Found existing compaction file for db \"~s\"", [Name]),
{ok, Header} = couch_file:read_header(Fd, ?HEADER_SIG);
{error, enoent} ->
- receive after 1000 -> ok end,
{ok, Fd} = couch_file:open(CompactFile, [create]),
- Header = #db_header{},
- ok = couch_file:write_header(Fd, ?HEADER_SIG, Header)
+ ok = couch_file:write_header(Fd, ?HEADER_SIG, Header=#db_header{})
end,
NewDb = init_db(Name, CompactFile, Fd, Header),
- NewDb2 = copy_compact_docs(Db, NewDb),
- NewDb3 =
- case CopyLocal of
- true ->
- % suck up all the local docs into memory and write them to the new db
- {ok, LocalDocs} = couch_btree:foldl(Db#db.local_docs_btree,
- fun(Value, _Offset, Acc) -> {ok, [Value | Acc]} end, []),
- {ok, NewLocalBtree} = couch_btree:add(NewDb2#db.local_docs_btree, LocalDocs),
- commit_data(NewDb2#db{local_docs_btree=NewLocalBtree});
- _ ->
- NewDb2
- end,
- close_db(NewDb3),
+ NewDb2 = commit_data(copy_compact_docs(Db, NewDb)),
+ close_db(NewDb2),
+
+ gen_server:cast(Db#db.update_pid, {compact_done, CompactFile}).
- gen_server:cast(Db#db.update_pid, {compact_done, CompactFile}). \ No newline at end of file