summary refs log tree commit diff
path: root/src
diff options
context:
space:
mode:
author Damien F. Katz <damien@apache.org> 2008-09-11 19:26:09 +0000
committer Damien F. Katz <damien@apache.org> 2008-09-11 19:26:09 +0000
commit 634b1b193acc24b95326c74e615d723043516f16 (patch)
tree faedbfddee34a23083b34998572ceb1501a8747f /src
parent 37ca97c918f4b5316e4293d8f1001bb87b8dfb0c (diff)
Check-in of document purge functionality.
git-svn-id: https://svn.apache.org/repos/asf/incubator/couchdb/trunk@694430 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'src')
-rw-r--r-- src/couchdb/couch_btree.erl 4
-rw-r--r-- src/couchdb/couch_db.erl 32
-rw-r--r-- src/couchdb/couch_db.hrl 6
-rw-r--r-- src/couchdb/couch_db_updater.erl 162
-rw-r--r-- src/couchdb/couch_httpd.erl 15
-rw-r--r-- src/couchdb/couch_key_tree.erl 37
-rw-r--r-- src/couchdb/couch_view.erl 109
7 files changed, 262 insertions, 103 deletions
diff --git a/src/couchdb/couch_btree.erl b/src/couchdb/couch_btree.erl
index a20e1a9f..30575090 100644
--- a/src/couchdb/couch_btree.erl
+++ b/src/couchdb/couch_btree.erl
@@ -298,8 +298,8 @@ reduce_node(#btree{reduce=nil}, _NodeType, _NodeList) ->
[];
reduce_node(#btree{reduce=R}, kp_node, NodeList) ->
R(rereduce, [Red || {_K, {_P, Red}} <- NodeList]);
-reduce_node(#btree{reduce=R}, kv_node, NodeList) ->
- R(reduce, NodeList).
+reduce_node(#btree{reduce=R}=Bt, kv_node, NodeList) ->
+ R(reduce, [assemble(Bt, K, V) || {K, V} <- NodeList]).
get_node(#btree{fd = Fd}, NodePos) ->
diff --git a/src/couchdb/couch_db.erl b/src/couchdb/couch_db.erl
index 4bcefdcd..823de72e 100644
--- a/src/couchdb/couch_db.erl
+++ b/src/couchdb/couch_db.erl
@@ -20,7 +20,7 @@
-export([get_missing_revs/2]).
-export([enum_docs/4,enum_docs/5,enum_docs_since/4,enum_docs_since/5]).
-export([enum_docs_since_reduce_to_count/1,enum_docs_reduce_to_count/1]).
--export([increment_update_seq/1]).
+-export([increment_update_seq/1,get_purge_seq/1,purge_docs/2,get_last_purged/1]).
-export([start_link/3]).
-export([init/1,terminate/2,handle_call/3,handle_cast/2,code_change/3,handle_info/2]).
@@ -148,18 +148,31 @@ get_full_doc_infos(Db, Ids) ->
increment_update_seq(#db{update_pid=UpdatePid}) ->
gen_server:call(UpdatePid, increment_update_seq).
-
+
+purge_docs(#db{update_pid=UpdatePid}, IdsRevs) ->
+ gen_server:call(UpdatePid, {purge_docs, IdsRevs}).
+
+
+get_purge_seq(#db{header=#db_header{purge_seq=PurgeSeq}})->
+ PurgeSeq.
+
+get_last_purged(#db{header=#db_header{purged_docs=nil}}) ->
+ {ok, []};
+get_last_purged(#db{fd=Fd, header=#db_header{purged_docs=PurgedPointer}}) ->
+ couch_file:pread_term(Fd, PurgedPointer).
+
get_db_info(Db) ->
#db{fd=Fd,
compactor_pid=Compactor,
- doc_count=Count,
- doc_del_count=DelCount,
- update_seq=SeqNum} = Db,
+ update_seq=SeqNum,
+ fulldocinfo_by_id_btree=FullDocBtree} = Db,
{ok, Size} = couch_file:bytes(Fd),
+ {ok, {Count, DelCount}} = couch_btree:full_reduce(FullDocBtree),
InfoList = [
{doc_count, Count},
{doc_del_count, DelCount},
{update_seq, SeqNum},
+ {purge_seq, couch_db:get_purge_seq(Db)},
{compact_running, Compactor/=nil},
{disk_size, Size}
],
@@ -263,7 +276,7 @@ update_docs(#db{update_pid=UpdatePid}=Db, Docs, Options) ->
Db2 = open_ref_counted(Db#db.main_pid, self()),
DocBuckets4 = [[doc_flush_binaries(Doc, Db2#db.fd) || Doc <- Bucket] || Bucket <- DocBuckets3],
% We only retry once
- ok = close(Db2),
+ close(Db2),
case gen_server:call(UpdatePid, {update_docs, DocBuckets4, [new_edits | Options]}, infinity) of
ok -> {ok, NewRevs};
Else -> throw(Else)
@@ -335,10 +348,13 @@ doc_flush_binaries(Doc, Fd) ->
Doc#doc{attachments = NewBins}.
enum_docs_since_reduce_to_count(Reds) ->
- couch_btree:final_reduce(fun couch_db_updater:btree_by_seq_reduce/2, Reds).
+ couch_btree:final_reduce(
+ fun couch_db_updater:btree_by_seq_reduce/2, Reds).
enum_docs_reduce_to_count(Reds) ->
- couch_btree:final_reduce(fun couch_db_updater:btree_by_id_reduce/2, Reds).
+ {Count, _DelCount} = couch_btree:final_reduce(
+ fun couch_db_updater:btree_by_id_reduce/2, Reds),
+ Count.
enum_docs_since(Db, SinceSeq, Direction, InFun, Ctx) ->
couch_btree:fold(Db#db.docinfo_by_seq_btree, SinceSeq + 1, Direction, InFun, Ctx).
diff --git a/src/couchdb/couch_db.hrl b/src/couchdb/couch_db.hrl
index fa604108..f4533146 100644
--- a/src/couchdb/couch_db.hrl
+++ b/src/couchdb/couch_db.hrl
@@ -84,8 +84,8 @@
fulldocinfo_by_id_btree_state = nil,
docinfo_by_seq_btree_state = nil,
local_docs_btree_state = nil,
- doc_count=0,
- doc_del_count=0
+ purge_seq = 0,
+ purged_docs = nil
}).
-record(db,
@@ -99,8 +99,6 @@
docinfo_by_seq_btree,
local_docs_btree,
update_seq,
- doc_count,
- doc_del_count,
name,
filepath
}).
diff --git a/src/couchdb/couch_db_updater.erl b/src/couchdb/couch_db_updater.erl
index cc916961..a368ccac 100644
--- a/src/couchdb/couch_db_updater.erl
+++ b/src/couchdb/couch_db_updater.erl
@@ -58,14 +58,77 @@ handle_call(increment_update_seq, _From, Db) ->
Db2 = commit_data(Db#db{update_seq=Db#db.update_seq+1}),
ok = gen_server:call(Db2#db.main_pid, {db_updated, Db2}),
couch_db_update_notifier:notify({updated, Db#db.name}),
- {reply, {ok, Db2#db.update_seq}, Db2}.
+ {reply, {ok, Db2#db.update_seq}, Db2};
+handle_call({purge_docs, _IdRevs}, _From,
+ #db{compactor_pid=Pid}=Db) when Pid /= nil ->
+ {reply, {error, purge_during_compaction}, Db};
+handle_call({purge_docs, IdRevs}, _From, Db) ->
+ #db{
+ fd=Fd,
+ fulldocinfo_by_id_btree = DocInfoByIdBTree,
+ docinfo_by_seq_btree = DocInfoBySeqBTree,
+ update_seq = LastSeq,
+ header = Header = #db_header{purge_seq=PurgeSeq}
+ } = Db,
+ DocLookups = couch_btree:lookup(DocInfoByIdBTree,
+ [Id || {Id, _Revs} <- IdRevs]),
+
+ NewDocInfos = lists:zipwith(
+ fun({_Id, Revs}, {ok, #full_doc_info{rev_tree=Tree}=FullDocInfo}) ->
+ case couch_key_tree:remove_leafs(Tree, Revs) of
+ {_, []=_RemovedRevs} -> % no change
+ nil;
+ {NewTree, RemovedRevs} ->
+ {FullDocInfo#full_doc_info{rev_tree=NewTree},RemovedRevs}
+ end;
+ (_, not_found) ->
+ nil
+ end,
+ IdRevs, DocLookups),
+
+ SeqsToRemove = [Seq
+ || {#full_doc_info{update_seq=Seq},_} <- NewDocInfos],
+
+ FullDocInfoToUpdate = [FullInfo
+ || {#full_doc_info{rev_tree=Tree}=FullInfo,_}
+ <- NewDocInfos, Tree /= []],
+ IdRevsPurged = [{Id, Revs}
+ || {#full_doc_info{id=Id}, Revs} <- NewDocInfos],
+
+ {DocInfoToUpdate, NewSeq} = lists:mapfoldl(
+ fun(FullInfo, SeqAcc) ->
+ Info = couch_doc:to_doc_info(FullInfo),
+ {Info#doc_info{update_seq=SeqAcc + 1}, SeqAcc + 1}
+ end, LastSeq, FullDocInfoToUpdate),
+
+ IdsToRemove = [Id || {#full_doc_info{id=Id,rev_tree=Tree},_}
+ <- NewDocInfos, Tree == []],
+
+ {ok, DocInfoBySeqBTree2} = couch_btree:add_remove(DocInfoBySeqBTree,
+ DocInfoToUpdate, SeqsToRemove),
+ {ok, DocInfoByIdBTree2} = couch_btree:add_remove(DocInfoByIdBTree,
+ FullDocInfoToUpdate, IdsToRemove),
+ {ok, Pointer} = couch_file:append_term(Fd, IdRevsPurged),
+
+ Db2 = commit_data(
+ Db#db{
+ fulldocinfo_by_id_btree = DocInfoByIdBTree2,
+ docinfo_by_seq_btree = DocInfoBySeqBTree2,
+ update_seq = NewSeq,
+ header=Header#db_header{purge_seq=PurgeSeq+1, purged_docs=Pointer}}),
+
+ ok = gen_server:call(Db2#db.main_pid, {db_updated, Db2}),
+ couch_db_update_notifier:notify({updated, Db#db.name}),
+ {reply, {ok, Db2#db.update_seq, IdRevsPurged}, Db2}.
+
+
handle_cast(start_compact, Db) ->
case Db#db.compactor_pid of
nil ->
?LOG_INFO("Starting compaction for db \"~s\"", [Db#db.name]),
- Pid = spawn_link(fun() -> start_copy_compact_int(Db, true) end),
+ Pid = spawn_link(fun() -> start_copy_compact_int(Db) end),
Db2 = Db#db{compactor_pid=Pid},
ok = gen_server:call(Db#db.main_pid, {db_updated, Db2}),
{noreply, Db2};
@@ -80,14 +143,16 @@ handle_cast({compact_done, CompactFilepath}, #db{filepath=Filepath}=Db) ->
init_db(Db#db.name, CompactFilepath, NewFd, NewHeader),
case Db#db.update_seq == NewSeq of
true ->
- NewDb2 = commit_data(
- NewDb#db{
- main_pid = Db#db.main_pid,
- doc_count = Db#db.doc_count,
- doc_del_count = Db#db.doc_del_count,
- filepath = Filepath}),
+ % suck up all the local docs into memory and write them to the new db
+ {ok, LocalDocs} = couch_btree:foldl(Db#db.local_docs_btree,
+ fun(Value, _Offset, Acc) -> {ok, [Value | Acc]} end, []),
+ {ok, NewLocalBtree} = couch_btree:add(NewDb#db.local_docs_btree, LocalDocs),
+
+ NewDb2 = commit_data( NewDb#db{local_docs_btree=NewLocalBtree,
+ main_pid = Db#db.main_pid,filepath = Filepath}),
- ?LOG_DEBUG("CouchDB swapping files ~s and ~s.", [Filepath, CompactFilepath]),
+ ?LOG_DEBUG("CouchDB swapping files ~s and ~s.",
+ [Filepath, CompactFilepath]),
file:delete(Filepath),
ok = file:rename(CompactFilepath, Filepath),
@@ -102,7 +167,7 @@ handle_cast({compact_done, CompactFilepath}, #db{filepath=Filepath}=Db) ->
?LOG_INFO("Compaction file still behind main file "
"(update seq=~p. compact update seq=~p). Retrying.",
[Db#db.update_seq, NewSeq]),
- Pid = spawn_link(fun() -> start_copy_compact_int(Db, false) end),
+ Pid = spawn_link(fun() -> start_copy_compact_int(Db) end),
Db2 = Db#db{compactor_pid=Pid},
couch_file:close(NewFd),
{noreply, Db2}
@@ -143,14 +208,14 @@ btree_by_id_split(#full_doc_info{id=Id, update_seq=Seq,
btree_by_id_join(Id, {Seq, Deleted, Tree}) ->
#full_doc_info{id=Id, update_seq=Seq, deleted=Deleted==1, rev_tree=Tree}.
-
-
btree_by_id_reduce(reduce, FullDocInfos) ->
% count the number of not deleted documents
- length([1 || #full_doc_info{deleted=false} <- FullDocInfos]);
+ {length([1 || #full_doc_info{deleted=false} <- FullDocInfos]),
+ length([1 || #full_doc_info{deleted=true} <- FullDocInfos])};
btree_by_id_reduce(rereduce, Reds) ->
- lists:sum(Reds).
+ {lists:sum([Count || {Count,_} <- Reds]),
+ lists:sum([DelCount || {_, DelCount} <- Reds])}.
btree_by_seq_reduce(reduce, DocInfos) ->
% count the number of documents
@@ -188,8 +253,6 @@ init_db(DbName, Filepath, Fd, Header) ->
docinfo_by_seq_btree = SeqBtree,
local_docs_btree = LocalDocsBtree,
update_seq = Header#db_header.update_seq,
- doc_count = Header#db_header.doc_count,
- doc_del_count = Header#db_header.doc_del_count,
name = DbName,
filepath=Filepath }.
@@ -270,15 +333,11 @@ merge_rev_trees(NoConflicts, [NewDocs|RestDocsList],
[NewInfo|AccNewInfos],AccSeq+1)
end.
-new_index_entries([], DocCount, DelCount, AccById, AccBySeq) ->
- {ok, DocCount, DelCount, AccById, AccBySeq};
-new_index_entries([FullDocInfo|RestInfos], DocCount, DelCount, AccById, AccBySeq) ->
+new_index_entries([], AccById, AccBySeq) ->
+ {ok, AccById, AccBySeq};
+new_index_entries([FullDocInfo|RestInfos], AccById, AccBySeq) ->
#doc_info{deleted=Deleted} = DocInfo = couch_doc:to_doc_info(FullDocInfo),
- {DocCount2, DelCount2} =
- if Deleted -> {DocCount, DelCount + 1};
- true -> {DocCount + 1, DelCount}
- end,
- new_index_entries(RestInfos, DocCount2, DelCount2,
+ new_index_entries(RestInfos,
[FullDocInfo#full_doc_info{deleted=Deleted}|AccById],
[DocInfo|AccBySeq]).
@@ -286,9 +345,7 @@ update_docs_int(Db, DocsList, Options) ->
#db{
fulldocinfo_by_id_btree = DocInfoByIdBTree,
docinfo_by_seq_btree = DocInfoBySeqBTree,
- update_seq = LastSeq,
- doc_count = FullDocCount,
- doc_del_count = FullDelCount
+ update_seq = LastSeq
} = Db,
% separate out the NonRep documents from the rest of the documents
@@ -305,7 +362,7 @@ update_docs_int(Db, DocsList, Options) ->
Ids = [Id || [#doc{id=Id}|_] <- DocsList2],
- % lookup up the existing documents, if they exist.
+ % lookup up the old documents, if they exist.
OldDocLookups = couch_btree:lookup(DocInfoByIdBTree, Ids),
OldDocInfos = lists:zipwith(
fun(_Id, {ok, FullDocInfo}) ->
@@ -315,18 +372,6 @@ update_docs_int(Db, DocsList, Options) ->
end,
Ids, OldDocLookups),
- {OldCount, OldDelCount} = lists:foldl(
- fun({ok, FullDocInfo}, {OldCountAcc, OldDelCountAcc}) ->
- case couch_doc:to_doc_info(FullDocInfo) of
- #doc_info{deleted=false} ->
- {OldCountAcc + 1, OldDelCountAcc};
- _ ->
- {OldCountAcc, OldDelCountAcc + 1}
- end;
- (not_found, Acc) ->
- Acc
- end, {0, 0}, OldDocLookups),
-
% Merge the new docs into the revision trees.
NoConflicts = lists:member(new_edits, Options),
{ok, NewDocInfos, NewSeq} = merge_rev_trees(NoConflicts, DocsList2, OldDocInfos, [], LastSeq),
@@ -339,11 +384,10 @@ update_docs_int(Db, DocsList, Options) ->
% Try to write the local documents first, a conflict might be generated
{ok, Db2} = update_local_docs(Db, NonRepDocs),
- % Write out the documents summaries (they are stored in the nodes of the rev trees)
+ % Write out the document summaries (they are stored in the nodes of the rev trees)
{ok, FlushedDocInfos} = flush_trees(Db2, NewDocInfos, []),
- {ok, NewDocsCount, NewDelCount, InfoById, InfoBySeq} =
- new_index_entries(FlushedDocInfos, 0, 0, [], []),
+ {ok, InfoById, InfoBySeq} = new_index_entries(FlushedDocInfos, [], []),
% and the indexes to the documents
{ok, DocInfoBySeqBTree2} = couch_btree:add_remove(DocInfoBySeqBTree, InfoBySeq, RemoveSeqs),
@@ -352,9 +396,7 @@ update_docs_int(Db, DocsList, Options) ->
Db3 = Db2#db{
fulldocinfo_by_id_btree = DocInfoByIdBTree2,
docinfo_by_seq_btree = DocInfoBySeqBTree2,
- update_seq = NewSeq,
- doc_count = FullDocCount + NewDocsCount - OldCount,
- doc_del_count = FullDelCount + NewDelCount - OldDelCount},
+ update_seq = NewSeq},
case lists:member(delay_commit, Options) of
true ->
@@ -406,9 +448,7 @@ commit_data(#db{fd=Fd, header=Header} = Db) ->
summary_stream_state = couch_stream:get_state(Db#db.summary_stream),
docinfo_by_seq_btree_state = couch_btree:get_state(Db#db.docinfo_by_seq_btree),
fulldocinfo_by_id_btree_state = couch_btree:get_state(Db#db.fulldocinfo_by_id_btree),
- local_docs_btree_state = couch_btree:get_state(Db#db.local_docs_btree),
- doc_count = Db#db.doc_count,
- doc_del_count = Db#db.doc_del_count
+ local_docs_btree_state = couch_btree:get_state(Db#db.local_docs_btree)
},
if Header == Header2 ->
Db; % unchanged. nothing to do
@@ -476,7 +516,7 @@ copy_compact_docs(Db, NewDb) ->
NewDb2
end.
-start_copy_compact_int(#db{name=Name,filepath=Filepath}=Db, CopyLocal) ->
+start_copy_compact_int(#db{name=Name,filepath=Filepath}=Db) ->
CompactFile = Filepath ++ ".compact",
?LOG_DEBUG("Compaction process spawned for db \"~s\"", [Name]),
case couch_file:open(CompactFile) of
@@ -484,24 +524,12 @@ start_copy_compact_int(#db{name=Name,filepath=Filepath}=Db, CopyLocal) ->
?LOG_DEBUG("Found existing compaction file for db \"~s\"", [Name]),
{ok, Header} = couch_file:read_header(Fd, ?HEADER_SIG);
{error, enoent} ->
- receive after 1000 -> ok end,
{ok, Fd} = couch_file:open(CompactFile, [create]),
- Header = #db_header{},
- ok = couch_file:write_header(Fd, ?HEADER_SIG, Header)
+ ok = couch_file:write_header(Fd, ?HEADER_SIG, Header=#db_header{})
end,
NewDb = init_db(Name, CompactFile, Fd, Header),
- NewDb2 = copy_compact_docs(Db, NewDb),
- NewDb3 =
- case CopyLocal of
- true ->
- % suck up all the local docs into memory and write them to the new db
- {ok, LocalDocs} = couch_btree:foldl(Db#db.local_docs_btree,
- fun(Value, _Offset, Acc) -> {ok, [Value | Acc]} end, []),
- {ok, NewLocalBtree} = couch_btree:add(NewDb2#db.local_docs_btree, LocalDocs),
- commit_data(NewDb2#db{local_docs_btree=NewLocalBtree});
- _ ->
- NewDb2
- end,
- close_db(NewDb3),
+ NewDb2 = commit_data(copy_compact_docs(Db, NewDb)),
+ close_db(NewDb2),
+
+ gen_server:cast(Db#db.update_pid, {compact_done, CompactFile}).
- gen_server:cast(Db#db.update_pid, {compact_done, CompactFile}). \ No newline at end of file
diff --git a/src/couchdb/couch_httpd.erl b/src/couchdb/couch_httpd.erl
index bdc172a3..3857f40d 100644
--- a/src/couchdb/couch_httpd.erl
+++ b/src/couchdb/couch_httpd.erl
@@ -310,6 +310,21 @@ handle_db_request(Req, 'POST', {_DbName, Db, [<<"_compact">>]}) ->
handle_db_request(_Req, _Method, {_DbName, _Db, [<<"_compact">>]}) ->
throw({method_not_allowed, "POST"});
+handle_db_request(Req, 'POST', {_DbName, Db, [<<"_purge">>]}) ->
+ {IdsRevs} = ?JSON_DECODE(Req:recv_body(?MAX_DOC_SIZE)),
+ % validate the json input
+ [{_Id, [_|_]=_Revs} = IdRevs || IdRevs <- IdsRevs],
+
+ case couch_db:purge_docs(Db, IdsRevs) of
+ {ok, PurgeSeq, PurgedIdsRevs} ->
+ send_json(Req, 200, {[{<<"purge_seq">>, PurgeSeq}, {<<"purged">>, {PurgedIdsRevs}}]});
+ Error ->
+ throw(Error)
+ end;
+
+handle_db_request(_Req, _Method, {_DbName, _Db, [<<"_purge">>]}) ->
+ throw({method_not_allowed, "POST"});
+
% View request handlers
handle_db_request(Req, 'GET', {_DbName, Db, [<<"_all_docs">>]}) ->
diff --git a/src/couchdb/couch_key_tree.erl b/src/couchdb/couch_key_tree.erl
index 5bb80be1..3a05fd4d 100644
--- a/src/couchdb/couch_key_tree.erl
+++ b/src/couchdb/couch_key_tree.erl
@@ -13,7 +13,7 @@
-module(couch_key_tree).
-export([merge/2, find_missing/2, get_key_leafs/2, get_full_key_paths/2, get/2]).
--export([map/2, get_all_leafs/1, get_leaf_keys/1, count_leafs/1]).
+-export([map/2, get_all_leafs/1, get_leaf_keys/1, count_leafs/1, remove_leafs/2]).
% a key tree looks like this:
% Tree -> [] or [{Key, Value, ChildTree} | SiblingTree]
@@ -53,7 +53,40 @@ find_missing([{Key, _, SubTree} | RestTree], Keys) ->
SrcKeys2 = Keys -- [Key],
SrcKeys3 = find_missing(SubTree, SrcKeys2),
find_missing(RestTree, SrcKeys3).
+
+
+get_all_key_paths_rev([], KeyPathAcc) ->
+ KeyPathAcc;
+get_all_key_paths_rev([{Key, Value, SubTree} | RestTree], KeyPathAcc) ->
+ get_all_key_paths_rev(SubTree, [{Key, Value} | KeyPathAcc]) ++
+ get_all_key_paths_rev(RestTree, KeyPathAcc).
+
+
+% Removes any branches from the tree whose leaf node(s) are in the Keys
+remove_leafs(Tree, Keys) ->
+ % flatten each branch in a tree into a tree path
+ Paths = get_all_key_paths_rev(Tree, []),
+ % filter out any that are in the keys list.
+ {FoundKeys, FilteredPaths} = lists:mapfoldl(
+ fun(Key, PathsAcc) ->
+ case [Path || [{LeafKey,_}|_]=Path <- PathsAcc, LeafKey /= Key] of
+ PathsAcc ->
+ {nil, PathsAcc};
+ PathsAcc2 ->
+ {Key, PathsAcc2}
+ end
+ end, Paths, Keys),
+
+ % convert paths back to trees
+ NewTree = lists:foldl(
+ fun(Path,TreeAcc) ->
+ SingleTree = lists:foldl(
+ fun({K,V},NewTreeAcc) -> [{K,V,NewTreeAcc}] end, [], Path),
+ merge(TreeAcc, SingleTree)
+ end, [], FilteredPaths),
+ {NewTree, FoundKeys}.
+
% get the leafs in the tree matching the keys. The matching key nodes can be
% leafs or an inner nodes. If an inner node, then the leafs for that node
@@ -83,7 +116,7 @@ get(Tree, KeysToGet) ->
{KeyPaths, KeysNotFound} = get_full_key_paths(Tree, KeysToGet),
FixedResults = [ {Key, Value, [Key0 || {Key0, _} <- Path]} || [{Key, Value}|_] = Path <- KeyPaths],
{FixedResults, KeysNotFound}.
-
+
get_full_key_paths(Tree, Keys) ->
get_full_key_paths(Tree, Keys, []).
diff --git a/src/couchdb/couch_view.erl b/src/couchdb/couch_view.erl
index 5b3105f1..9414e4fa 100644
--- a/src/couchdb/couch_view.erl
+++ b/src/couchdb/couch_view.erl
@@ -28,6 +28,7 @@
views,
id_btree=nil,
current_seq=0,
+ purge_seq=0,
query_server=nil
}).
@@ -43,6 +44,14 @@
{root_dir
}).
+-record(index_header,
+ {seq=0,
+ purge_seq=0,
+ id_btree_state=nil,
+ view_states=nil
+ }).
+
+
start_link() ->
gen_server:start_link({local, couch_view}, couch_view, [], []).
@@ -412,9 +421,17 @@ start_update_loop(RootDir, DbName, GroupId, NotifyPids) ->
{ok, Fd} ->
Sig = Group#group.sig,
case (catch couch_file:read_header(Fd, <<$r, $c, $k, 0>>)) of
- {ok, {Sig, HeaderInfo}} ->
+ {ok, {Sig, #index_header{purge_seq=PurgeSeq}=HeaderInfo}} ->
% sigs match!
- init_group(Db, Fd, Group, HeaderInfo);
+ DbPurgeSeq = couch_db:get_purge_seq(Db),
+ case (PurgeSeq == DbPurgeSeq) or ((PurgeSeq + 1) == DbPurgeSeq) of
+ true ->
+ % We can only use index with the same, or next purge seq as the
+ % db.
+ init_group(Db, Fd, Group, HeaderInfo);
+ false ->
+ reset_file(Db, Fd, DbName, Group)
+ end;
_ ->
reset_file(Db, Fd, DbName, Group)
end;
@@ -424,6 +441,7 @@ start_update_loop(RootDir, DbName, GroupId, NotifyPids) ->
Error -> throw(Error)
end
end,
+
couch_db:monitor(Db),
couch_db:close(Db),
update_loop(RootDir, DbName, GroupId, Group2, NotifyPids).
@@ -479,24 +497,67 @@ get_notify_pids() ->
[]
end.
-update_group(#group{db=Db,current_seq=CurrentSeq, views=Views}=Group) ->
- ViewEmptyKVs = [{View, []} || View <- Views],
+purge(#group{db=Db, views=Views, id_btree=IdBtree}=Group) ->
+ {ok, PurgedIdsRevs} = couch_db:get_last_purged(Db),
+ Ids = [Id || {Id, _Revs} <- PurgedIdsRevs],
+ {ok, Lookups, IdBtree2} = couch_btree:query_modify(IdBtree, Ids, [], Ids),
+
+ % now populate the dictionary with all the keys to delete
+ ViewKeysToRemoveDict = lists:foldl(
+ fun({ok,{DocId,ViewNumRowKeys}}, ViewDictAcc) ->
+ lists:foldl(
+ fun({ViewNum, RowKey}, ViewDictAcc2) ->
+ dict:append(ViewNum, {RowKey, DocId}, ViewDictAcc2)
+ end, ViewDictAcc, ViewNumRowKeys);
+ ({not_found, _}, ViewDictAcc) ->
+ ViewDictAcc
+ end, dict:new(), Lookups),
+
+ % Now remove the values from the btrees
+ Views2 = lists:map(
+ fun(#view{id_num=Num,btree=Btree}=View) ->
+ case dict:find(Num, ViewKeysToRemoveDict) of
+ {ok, RemoveKeys} ->
+ {ok, Btree2} = couch_btree:add_remove(Btree, [], RemoveKeys),
+ View#view{btree=Btree2};
+ error -> % no keys to remove in this view
+ View
+ end
+ end, Views),
+ Group#group{id_btree=IdBtree2,
+ views=Views2,
+ purge_seq=couch_db:get_purge_seq(Db)}.
+
+
+update_group(#group{db=Db,current_seq=CurrentSeq,
+ purge_seq=GroupPurgeSeq}=Group) ->
+ ViewEmptyKVs = [{View, []} || View <- Group#group.views],
% compute on all docs modified since we last computed.
- {ok, {UncomputedDocs, Group2, ViewKVsToAdd, DocIdViewIdKeys, NewSeq}}
+ DbPurgeSeq = couch_db:get_purge_seq(Db),
+ Group2 =
+ case DbPurgeSeq of
+ GroupPurgeSeq ->
+ Group;
+ DbPurgeSeq when GroupPurgeSeq + 1 == DbPurgeSeq ->
+ purge(Group);
+ _ ->
+ throw(restart)
+ end,
+ {ok, {UncomputedDocs, Group3, ViewKVsToAdd, DocIdViewIdKeys, NewSeq}}
= couch_db:enum_docs_since(
Db,
CurrentSeq,
fun(DocInfo, _, Acc) -> process_doc(Db, DocInfo, Acc) end,
- {[], Group, ViewEmptyKVs, [], CurrentSeq}
+ {[], Group2, ViewEmptyKVs, [], CurrentSeq}
),
- {Group3, Results} = view_compute(Group2, UncomputedDocs),
+ {Group4, Results} = view_compute(Group3, UncomputedDocs),
{ViewKVsToAdd2, DocIdViewIdKeys2} = view_insert_query_results(UncomputedDocs, Results, ViewKVsToAdd, DocIdViewIdKeys),
- couch_query_servers:stop_doc_map(Group3#group.query_server),
+ couch_query_servers:stop_doc_map(Group4#group.query_server),
if CurrentSeq /= NewSeq ->
- {ok, Group4} = write_changes(Group3, ViewKVsToAdd2, DocIdViewIdKeys2, NewSeq),
- {ok, Group4#group{query_server=nil}};
+ {ok, Group5} = write_changes(Group4, ViewKVsToAdd2, DocIdViewIdKeys2, NewSeq),
+ {ok, Group5#group{query_server=nil}};
true ->
- {ok, Group3#group{query_server=nil}}
+ {ok, Group4#group{query_server=nil}}
end.
delete_index_dir(RootDir, DbName) ->
@@ -523,10 +584,13 @@ delete_index_file(RootDir, DbName, GroupId) ->
file:delete(RootDir ++ "/." ++ binary_to_list(DbName)
++ binary_to_list(GroupId) ++ ".view").
-init_group(Db, Fd, #group{views=Views}=Group, nil = _IndexHeaderData) ->
- init_group(Db, Fd, Group, {0, nil, [nil || _ <- Views]});
-init_group(Db, Fd, #group{def_lang=Lang,views=Views}=Group,
- {Seq, IdBtreeState, ViewStates} = _IndexHeaderData) ->
+init_group(Db, Fd, #group{views=Views}=Group, nil) ->
+ init_group(Db, Fd, Group,
+ #index_header{seq=0, purge_seq=couch_db:get_purge_seq(Db),
+ id_btree_state=nil, view_states=[nil || _ <- Views]});
+init_group(Db, Fd, #group{def_lang=Lang,views=Views}=Group, IndexHeader) ->
+ #index_header{seq=Seq, purge_seq=PurgeSeq,
+ id_btree_state=IdBtreeState, view_states=ViewStates} = IndexHeader,
{ok, IdBtree} = couch_btree:open(IdBtreeState, Fd),
Views2 = lists:zipwith(
fun(BtreeState, #view{reduce_funs=RedFuns}=View) ->
@@ -548,12 +612,17 @@ init_group(Db, Fd, #group{def_lang=Lang,views=Views}=Group,
View#view{btree=Btree}
end,
ViewStates, Views),
- Group#group{db=Db, fd=Fd, current_seq=Seq, id_btree=IdBtree, views=Views2}.
+ Group#group{db=Db, fd=Fd, current_seq=Seq, purge_seq=PurgeSeq,
+ id_btree=IdBtree, views=Views2}.
-get_index_header_data(#group{current_seq=Seq,id_btree=IdBtree,views=Views}) ->
+get_index_header_data(#group{current_seq=Seq, purge_seq=PurgeSeq,
+ id_btree=IdBtree,views=Views}) ->
ViewStates = [couch_btree:get_state(Btree) || #view{btree=Btree} <- Views],
- {Seq, couch_btree:get_state(IdBtree), ViewStates}.
+ #index_header{seq=Seq,
+ purge_seq=PurgeSeq,
+ id_btree_state=couch_btree:get_state(IdBtree),
+ view_states=ViewStates}.
% keys come back in the language of btree - tuples.
less_json_keys(A, B) ->
@@ -575,7 +644,7 @@ type_sort(V) when is_integer(V) -> 1;
type_sort(V) when is_float(V) -> 1;
type_sort(V) when is_binary(V) -> 2;
type_sort(V) when is_list(V) -> 3;
-type_sort({V}) when is_list(V) -> 4; % must come before tuple test below
+type_sort({V}) when is_list(V) -> 4;
type_sort(V) when is_tuple(V) -> 5.
@@ -702,8 +771,8 @@ view_insert_doc_query_results(#doc{id=DocId}=Doc, [ResultKVs|RestResults], [{Vie
[KV]
end, [], lists:sort(ResultKVs)),
NewKVs = [{{Key, DocId}, Value} || {Key, Value} <- ResultKVs2],
- NewViewIdKeys = [{View#view.id_num, Key} || {Key, _Value} <- ResultKVs2],
NewViewKVsAcc = [{View, NewKVs ++ KVs} | ViewKVsAcc],
+ NewViewIdKeys = [{View#view.id_num, Key} || {Key, _Value} <- ResultKVs2],
NewViewIdKeysAcc = NewViewIdKeys ++ ViewIdKeysAcc,
view_insert_doc_query_results(Doc, RestResults, RestViewKVs, NewViewKVsAcc, NewViewIdKeysAcc).