diff options
author | Damien F. Katz <damien@apache.org> | 2008-09-11 19:26:09 +0000 |
---|---|---|
committer | Damien F. Katz <damien@apache.org> | 2008-09-11 19:26:09 +0000 |
commit | 634b1b193acc24b95326c74e615d723043516f16 (patch) | |
tree | faedbfddee34a23083b34998572ceb1501a8747f /src | |
parent | 37ca97c918f4b5316e4293d8f1001bb87b8dfb0c (diff) |
Check-in of document purge functionality.
git-svn-id: https://svn.apache.org/repos/asf/incubator/couchdb/trunk@694430 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'src')
-rw-r--r-- | src/couchdb/couch_btree.erl | 4 | ||||
-rw-r--r-- | src/couchdb/couch_db.erl | 32 | ||||
-rw-r--r-- | src/couchdb/couch_db.hrl | 6 | ||||
-rw-r--r-- | src/couchdb/couch_db_updater.erl | 162 | ||||
-rw-r--r-- | src/couchdb/couch_httpd.erl | 15 | ||||
-rw-r--r-- | src/couchdb/couch_key_tree.erl | 37 | ||||
-rw-r--r-- | src/couchdb/couch_view.erl | 109 |
7 files changed, 262 insertions, 103 deletions
diff --git a/src/couchdb/couch_btree.erl b/src/couchdb/couch_btree.erl index a20e1a9f..30575090 100644 --- a/src/couchdb/couch_btree.erl +++ b/src/couchdb/couch_btree.erl @@ -298,8 +298,8 @@ reduce_node(#btree{reduce=nil}, _NodeType, _NodeList) -> []; reduce_node(#btree{reduce=R}, kp_node, NodeList) -> R(rereduce, [Red || {_K, {_P, Red}} <- NodeList]); -reduce_node(#btree{reduce=R}, kv_node, NodeList) -> - R(reduce, NodeList). +reduce_node(#btree{reduce=R}=Bt, kv_node, NodeList) -> + R(reduce, [assemble(Bt, K, V) || {K, V} <- NodeList]). get_node(#btree{fd = Fd}, NodePos) -> diff --git a/src/couchdb/couch_db.erl b/src/couchdb/couch_db.erl index 4bcefdcd..823de72e 100644 --- a/src/couchdb/couch_db.erl +++ b/src/couchdb/couch_db.erl @@ -20,7 +20,7 @@ -export([get_missing_revs/2]). -export([enum_docs/4,enum_docs/5,enum_docs_since/4,enum_docs_since/5]). -export([enum_docs_since_reduce_to_count/1,enum_docs_reduce_to_count/1]). --export([increment_update_seq/1]). +-export([increment_update_seq/1,get_purge_seq/1,purge_docs/2,get_last_purged/1]). -export([start_link/3]). -export([init/1,terminate/2,handle_call/3,handle_cast/2,code_change/3,handle_info/2]). @@ -148,18 +148,31 @@ get_full_doc_infos(Db, Ids) -> increment_update_seq(#db{update_pid=UpdatePid}) -> gen_server:call(UpdatePid, increment_update_seq). - + +purge_docs(#db{update_pid=UpdatePid}, IdsRevs) -> + gen_server:call(UpdatePid, {purge_docs, IdsRevs}). + + +get_purge_seq(#db{header=#db_header{purge_seq=PurgeSeq}})-> + PurgeSeq. + +get_last_purged(#db{header=#db_header{purged_docs=nil}}) -> + {ok, []}; +get_last_purged(#db{fd=Fd, header=#db_header{purged_docs=PurgedPointer}}) -> + couch_file:pread_term(Fd, PurgedPointer). + get_db_info(Db) -> #db{fd=Fd, compactor_pid=Compactor, - doc_count=Count, - doc_del_count=DelCount, - update_seq=SeqNum} = Db, + update_seq=SeqNum, + fulldocinfo_by_id_btree=FullDocBtree} = Db, {ok, Size} = couch_file:bytes(Fd), + {ok, {Count, DelCount}} = couch_btree:full_reduce(FullDocBtree), InfoList = [ {doc_count, Count}, {doc_del_count, DelCount}, {update_seq, SeqNum}, + {purge_seq, couch_db:get_purge_seq(Db)}, {compact_running, Compactor/=nil}, {disk_size, Size} ], @@ -263,7 +276,7 @@ update_docs(#db{update_pid=UpdatePid}=Db, Docs, Options) -> Db2 = open_ref_counted(Db#db.main_pid, self()), DocBuckets4 = [[doc_flush_binaries(Doc, Db2#db.fd) || Doc <- Bucket] || Bucket <- DocBuckets3], % We only retry once - ok = close(Db2), + close(Db2), case gen_server:call(UpdatePid, {update_docs, DocBuckets4, [new_edits | Options]}, infinity) of ok -> {ok, NewRevs}; Else -> throw(Else) @@ -335,10 +348,13 @@ doc_flush_binaries(Doc, Fd) -> Doc#doc{attachments = NewBins}. enum_docs_since_reduce_to_count(Reds) -> - couch_btree:final_reduce(fun couch_db_updater:btree_by_seq_reduce/2, Reds). + couch_btree:final_reduce( + fun couch_db_updater:btree_by_seq_reduce/2, Reds). enum_docs_reduce_to_count(Reds) -> - couch_btree:final_reduce(fun couch_db_updater:btree_by_id_reduce/2, Reds). + {Count, _DelCount} = couch_btree:final_reduce( + fun couch_db_updater:btree_by_id_reduce/2, Reds), + Count. enum_docs_since(Db, SinceSeq, Direction, InFun, Ctx) -> couch_btree:fold(Db#db.docinfo_by_seq_btree, SinceSeq + 1, Direction, InFun, Ctx). diff --git a/src/couchdb/couch_db.hrl b/src/couchdb/couch_db.hrl index fa604108..f4533146 100644 --- a/src/couchdb/couch_db.hrl +++ b/src/couchdb/couch_db.hrl @@ -84,8 +84,8 @@ fulldocinfo_by_id_btree_state = nil, docinfo_by_seq_btree_state = nil, local_docs_btree_state = nil, - doc_count=0, - doc_del_count=0 + purge_seq = 0, + purged_docs = nil }). -record(db, @@ -99,8 +99,6 @@ docinfo_by_seq_btree, local_docs_btree, update_seq, - doc_count, - doc_del_count, name, filepath }). diff --git a/src/couchdb/couch_db_updater.erl b/src/couchdb/couch_db_updater.erl index cc916961..a368ccac 100644 --- a/src/couchdb/couch_db_updater.erl +++ b/src/couchdb/couch_db_updater.erl @@ -58,14 +58,77 @@ handle_call(increment_update_seq, _From, Db) -> Db2 = commit_data(Db#db{update_seq=Db#db.update_seq+1}), ok = gen_server:call(Db2#db.main_pid, {db_updated, Db2}), couch_db_update_notifier:notify({updated, Db#db.name}), - {reply, {ok, Db2#db.update_seq}, Db2}. + {reply, {ok, Db2#db.update_seq}, Db2}; +handle_call({purge_docs, _IdRevs}, _From, + #db{compactor_pid=Pid}=Db) when Pid /= nil -> + {reply, {error, purge_during_compaction}, Db}; +handle_call({purge_docs, IdRevs}, _From, Db) -> + #db{ + fd=Fd, + fulldocinfo_by_id_btree = DocInfoByIdBTree, + docinfo_by_seq_btree = DocInfoBySeqBTree, + update_seq = LastSeq, + header = Header = #db_header{purge_seq=PurgeSeq} + } = Db, + DocLookups = couch_btree:lookup(DocInfoByIdBTree, + [Id || {Id, _Revs} <- IdRevs]), + + NewDocInfos = lists:zipwith( + fun({_Id, Revs}, {ok, #full_doc_info{rev_tree=Tree}=FullDocInfo}) -> + case couch_key_tree:remove_leafs(Tree, Revs) of + {_, []=_RemovedRevs} -> % no change + nil; + {NewTree, RemovedRevs} -> + {FullDocInfo#full_doc_info{rev_tree=NewTree},RemovedRevs} + end; + (_, not_found) -> + nil + end, + IdRevs, DocLookups), + + SeqsToRemove = [Seq + || {#full_doc_info{update_seq=Seq},_} <- NewDocInfos], + + FullDocInfoToUpdate = [FullInfo + || {#full_doc_info{rev_tree=Tree}=FullInfo,_} + <- NewDocInfos, Tree /= []], + IdRevsPurged = [{Id, Revs} + || {#full_doc_info{id=Id}, Revs} <- NewDocInfos], + + {DocInfoToUpdate, NewSeq} = lists:mapfoldl( + fun(FullInfo, SeqAcc) -> + Info = couch_doc:to_doc_info(FullInfo), + {Info#doc_info{update_seq=SeqAcc + 1}, SeqAcc + 1} + end, LastSeq, FullDocInfoToUpdate), + + IdsToRemove = [Id || {#full_doc_info{id=Id,rev_tree=Tree},_} + <- NewDocInfos, Tree == []], + + {ok, DocInfoBySeqBTree2} = couch_btree:add_remove(DocInfoBySeqBTree, + DocInfoToUpdate, SeqsToRemove), + {ok, DocInfoByIdBTree2} = couch_btree:add_remove(DocInfoByIdBTree, + FullDocInfoToUpdate, IdsToRemove), + {ok, Pointer} = couch_file:append_term(Fd, IdRevsPurged), + + Db2 = commit_data( + Db#db{ + fulldocinfo_by_id_btree = DocInfoByIdBTree2, + docinfo_by_seq_btree = DocInfoBySeqBTree2, + update_seq = NewSeq, + header=Header#db_header{purge_seq=PurgeSeq+1, purged_docs=Pointer}}), + + ok = gen_server:call(Db2#db.main_pid, {db_updated, Db2}), + couch_db_update_notifier:notify({updated, Db#db.name}), + {reply, {ok, Db2#db.update_seq, IdRevsPurged}, Db2}. + + handle_cast(start_compact, Db) -> case Db#db.compactor_pid of nil -> ?LOG_INFO("Starting compaction for db \"~s\"", [Db#db.name]), - Pid = spawn_link(fun() -> start_copy_compact_int(Db, true) end), + Pid = spawn_link(fun() -> start_copy_compact_int(Db) end), Db2 = Db#db{compactor_pid=Pid}, ok = gen_server:call(Db#db.main_pid, {db_updated, Db2}), {noreply, Db2}; @@ -80,14 +143,16 @@ handle_cast({compact_done, CompactFilepath}, #db{filepath=Filepath}=Db) -> init_db(Db#db.name, CompactFilepath, NewFd, NewHeader), case Db#db.update_seq == NewSeq of true -> - NewDb2 = commit_data( - NewDb#db{ - main_pid = Db#db.main_pid, - doc_count = Db#db.doc_count, - doc_del_count = Db#db.doc_del_count, - filepath = Filepath}), + % suck up all the local docs into memory and write them to the new db + {ok, LocalDocs} = couch_btree:foldl(Db#db.local_docs_btree, + fun(Value, _Offset, Acc) -> {ok, [Value | Acc]} end, []), + {ok, NewLocalBtree} = couch_btree:add(NewDb#db.local_docs_btree, LocalDocs), + + NewDb2 = commit_data( NewDb#db{local_docs_btree=NewLocalBtree, + main_pid = Db#db.main_pid,filepath = Filepath}), - ?LOG_DEBUG("CouchDB swapping files ~s and ~s.", [Filepath, CompactFilepath]), + ?LOG_DEBUG("CouchDB swapping files ~s and ~s.", + [Filepath, CompactFilepath]), file:delete(Filepath), ok = file:rename(CompactFilepath, Filepath), @@ -102,7 +167,7 @@ handle_cast({compact_done, CompactFilepath}, #db{filepath=Filepath}=Db) -> ?LOG_INFO("Compaction file still behind main file " "(update seq=~p. compact update seq=~p). Retrying.", [Db#db.update_seq, NewSeq]), - Pid = spawn_link(fun() -> start_copy_compact_int(Db, false) end), + Pid = spawn_link(fun() -> start_copy_compact_int(Db) end), Db2 = Db#db{compactor_pid=Pid}, couch_file:close(NewFd), {noreply, Db2} @@ -143,14 +208,14 @@ btree_by_id_split(#full_doc_info{id=Id, update_seq=Seq, btree_by_id_join(Id, {Seq, Deleted, Tree}) -> #full_doc_info{id=Id, update_seq=Seq, deleted=Deleted==1, rev_tree=Tree}. - - btree_by_id_reduce(reduce, FullDocInfos) -> % count the number of not deleted documents - length([1 || #full_doc_info{deleted=false} <- FullDocInfos]); + {length([1 || #full_doc_info{deleted=false} <- FullDocInfos]), + length([1 || #full_doc_info{deleted=true} <- FullDocInfos])}; btree_by_id_reduce(rereduce, Reds) -> - lists:sum(Reds). + {lists:sum([Count || {Count,_} <- Reds]), + lists:sum([DelCount || {_, DelCount} <- Reds])}. btree_by_seq_reduce(reduce, DocInfos) -> % count the number of documents @@ -188,8 +253,6 @@ init_db(DbName, Filepath, Fd, Header) -> docinfo_by_seq_btree = SeqBtree, local_docs_btree = LocalDocsBtree, update_seq = Header#db_header.update_seq, - doc_count = Header#db_header.doc_count, - doc_del_count = Header#db_header.doc_del_count, name = DbName, filepath=Filepath }. @@ -270,15 +333,11 @@ merge_rev_trees(NoConflicts, [NewDocs|RestDocsList], [NewInfo|AccNewInfos],AccSeq+1) end. -new_index_entries([], DocCount, DelCount, AccById, AccBySeq) -> - {ok, DocCount, DelCount, AccById, AccBySeq}; -new_index_entries([FullDocInfo|RestInfos], DocCount, DelCount, AccById, AccBySeq) -> +new_index_entries([], AccById, AccBySeq) -> + {ok, AccById, AccBySeq}; +new_index_entries([FullDocInfo|RestInfos], AccById, AccBySeq) -> #doc_info{deleted=Deleted} = DocInfo = couch_doc:to_doc_info(FullDocInfo), - {DocCount2, DelCount2} = - if Deleted -> {DocCount, DelCount + 1}; - true -> {DocCount + 1, DelCount} - end, - new_index_entries(RestInfos, DocCount2, DelCount2, + new_index_entries(RestInfos, [FullDocInfo#full_doc_info{deleted=Deleted}|AccById], [DocInfo|AccBySeq]). @@ -286,9 +345,7 @@ update_docs_int(Db, DocsList, Options) -> #db{ fulldocinfo_by_id_btree = DocInfoByIdBTree, docinfo_by_seq_btree = DocInfoBySeqBTree, - update_seq = LastSeq, - doc_count = FullDocCount, - doc_del_count = FullDelCount + update_seq = LastSeq } = Db, % separate out the NonRep documents from the rest of the documents @@ -305,7 +362,7 @@ update_docs_int(Db, DocsList, Options) -> Ids = [Id || [#doc{id=Id}|_] <- DocsList2], - % lookup up the existing documents, if they exist. + % lookup up the old documents, if they exist. OldDocLookups = couch_btree:lookup(DocInfoByIdBTree, Ids), OldDocInfos = lists:zipwith( fun(_Id, {ok, FullDocInfo}) -> @@ -315,18 +372,6 @@ update_docs_int(Db, DocsList, Options) -> end, Ids, OldDocLookups), - {OldCount, OldDelCount} = lists:foldl( - fun({ok, FullDocInfo}, {OldCountAcc, OldDelCountAcc}) -> - case couch_doc:to_doc_info(FullDocInfo) of - #doc_info{deleted=false} -> - {OldCountAcc + 1, OldDelCountAcc}; - _ -> - {OldCountAcc, OldDelCountAcc + 1} - end; - (not_found, Acc) -> - Acc - end, {0, 0}, OldDocLookups), - % Merge the new docs into the revision trees. NoConflicts = lists:member(new_edits, Options), {ok, NewDocInfos, NewSeq} = merge_rev_trees(NoConflicts, DocsList2, OldDocInfos, [], LastSeq), @@ -339,11 +384,10 @@ update_docs_int(Db, DocsList, Options) -> % Try to write the local documents first, a conflict might be generated {ok, Db2} = update_local_docs(Db, NonRepDocs), - % Write out the documents summaries (they are stored in the nodes of the rev trees) + % Write out the document summaries (they are stored in the nodes of the rev trees) {ok, FlushedDocInfos} = flush_trees(Db2, NewDocInfos, []), - {ok, NewDocsCount, NewDelCount, InfoById, InfoBySeq} = - new_index_entries(FlushedDocInfos, 0, 0, [], []), + {ok, InfoById, InfoBySeq} = new_index_entries(FlushedDocInfos, [], []), % and the indexes to the documents {ok, DocInfoBySeqBTree2} = couch_btree:add_remove(DocInfoBySeqBTree, InfoBySeq, RemoveSeqs), @@ -352,9 +396,7 @@ update_docs_int(Db, DocsList, Options) -> Db3 = Db2#db{ fulldocinfo_by_id_btree = DocInfoByIdBTree2, docinfo_by_seq_btree = DocInfoBySeqBTree2, - update_seq = NewSeq, - doc_count = FullDocCount + NewDocsCount - OldCount, - doc_del_count = FullDelCount + NewDelCount - OldDelCount}, + update_seq = NewSeq}, case lists:member(delay_commit, Options) of true -> @@ -406,9 +448,7 @@ commit_data(#db{fd=Fd, header=Header} = Db) -> summary_stream_state = couch_stream:get_state(Db#db.summary_stream), docinfo_by_seq_btree_state = couch_btree:get_state(Db#db.docinfo_by_seq_btree), fulldocinfo_by_id_btree_state = couch_btree:get_state(Db#db.fulldocinfo_by_id_btree), - local_docs_btree_state = couch_btree:get_state(Db#db.local_docs_btree), - doc_count = Db#db.doc_count, - doc_del_count = Db#db.doc_del_count + local_docs_btree_state = couch_btree:get_state(Db#db.local_docs_btree) }, if Header == Header2 -> Db; % unchanged. nothing to do @@ -476,7 +516,7 @@ copy_compact_docs(Db, NewDb) -> NewDb2 end. -start_copy_compact_int(#db{name=Name,filepath=Filepath}=Db, CopyLocal) -> +start_copy_compact_int(#db{name=Name,filepath=Filepath}=Db) -> CompactFile = Filepath ++ ".compact", ?LOG_DEBUG("Compaction process spawned for db \"~s\"", [Name]), case couch_file:open(CompactFile) of @@ -484,24 +524,12 @@ start_copy_compact_int(#db{name=Name,filepath=Filepath}=Db, CopyLocal) -> ?LOG_DEBUG("Found existing compaction file for db \"~s\"", [Name]), {ok, Header} = couch_file:read_header(Fd, ?HEADER_SIG); {error, enoent} -> - receive after 1000 -> ok end, {ok, Fd} = couch_file:open(CompactFile, [create]), - Header = #db_header{}, - ok = couch_file:write_header(Fd, ?HEADER_SIG, Header) + ok = couch_file:write_header(Fd, ?HEADER_SIG, Header=#db_header{}) end, NewDb = init_db(Name, CompactFile, Fd, Header), - NewDb2 = copy_compact_docs(Db, NewDb), - NewDb3 = - case CopyLocal of - true -> - % suck up all the local docs into memory and write them to the new db - {ok, LocalDocs} = couch_btree:foldl(Db#db.local_docs_btree, - fun(Value, _Offset, Acc) -> {ok, [Value | Acc]} end, []), - {ok, NewLocalBtree} = couch_btree:add(NewDb2#db.local_docs_btree, LocalDocs), - commit_data(NewDb2#db{local_docs_btree=NewLocalBtree}); - _ -> - NewDb2 - end, - close_db(NewDb3), + NewDb2 = commit_data(copy_compact_docs(Db, NewDb)), + close_db(NewDb2), + + gen_server:cast(Db#db.update_pid, {compact_done, CompactFile}). - gen_server:cast(Db#db.update_pid, {compact_done, CompactFile}).
\ No newline at end of file diff --git a/src/couchdb/couch_httpd.erl b/src/couchdb/couch_httpd.erl index bdc172a3..3857f40d 100644 --- a/src/couchdb/couch_httpd.erl +++ b/src/couchdb/couch_httpd.erl @@ -310,6 +310,21 @@ handle_db_request(Req, 'POST', {_DbName, Db, [<<"_compact">>]}) -> handle_db_request(_Req, _Method, {_DbName, _Db, [<<"_compact">>]}) -> throw({method_not_allowed, "POST"}); +handle_db_request(Req, 'POST', {_DbName, Db, [<<"_purge">>]}) -> + {IdsRevs} = ?JSON_DECODE(Req:recv_body(?MAX_DOC_SIZE)), + % validate the json input + [{_Id, [_|_]=_Revs} = IdRevs || IdRevs <- IdsRevs], + + case couch_db:purge_docs(Db, IdsRevs) of + {ok, PurgeSeq, PurgedIdsRevs} -> + send_json(Req, 200, {[{<<"purge_seq">>, PurgeSeq}, {<<"purged">>, {PurgedIdsRevs}}]}); + Error -> + throw(Error) + end; + +handle_db_request(_Req, _Method, {_DbName, _Db, [<<"_purge">>]}) -> + throw({method_not_allowed, "POST"}); + % View request handlers handle_db_request(Req, 'GET', {_DbName, Db, [<<"_all_docs">>]}) -> diff --git a/src/couchdb/couch_key_tree.erl b/src/couchdb/couch_key_tree.erl index 5bb80be1..3a05fd4d 100644 --- a/src/couchdb/couch_key_tree.erl +++ b/src/couchdb/couch_key_tree.erl @@ -13,7 +13,7 @@ -module(couch_key_tree). -export([merge/2, find_missing/2, get_key_leafs/2, get_full_key_paths/2, get/2]). --export([map/2, get_all_leafs/1, get_leaf_keys/1, count_leafs/1]). +-export([map/2, get_all_leafs/1, get_leaf_keys/1, count_leafs/1, remove_leafs/2]). % a key tree looks like this: % Tree -> [] or [{Key, Value, ChildTree} | SiblingTree] @@ -53,7 +53,40 @@ find_missing([{Key, _, SubTree} | RestTree], Keys) -> SrcKeys2 = Keys -- [Key], SrcKeys3 = find_missing(SubTree, SrcKeys2), find_missing(RestTree, SrcKeys3). + + +get_all_key_paths_rev([], KeyPathAcc) -> + KeyPathAcc; +get_all_key_paths_rev([{Key, Value, SubTree} | RestTree], KeyPathAcc) -> + get_all_key_paths_rev(SubTree, [{Key, Value} | KeyPathAcc]) ++ + get_all_key_paths_rev(RestTree, KeyPathAcc). + + +% Removes any branches from the tree whose leaf node(s) are in the Keys +remove_leafs(Tree, Keys) -> + % flatten each branch in a tree into a tree path + Paths = get_all_key_paths_rev(Tree, []), + % filter out any that are in the keys list. + {FoundKeys, FilteredPaths} = lists:mapfoldl( + fun(Key, PathsAcc) -> + case [Path || [{LeafKey,_}|_]=Path <- PathsAcc, LeafKey /= Key] of + PathsAcc -> + {nil, PathsAcc}; + PathsAcc2 -> + {Key, PathsAcc2} + end + end, Paths, Keys), + + % convert paths back to trees + NewTree = lists:foldl( + fun(Path,TreeAcc) -> + SingleTree = lists:foldl( + fun({K,V},NewTreeAcc) -> [{K,V,NewTreeAcc}] end, [], Path), + merge(TreeAcc, SingleTree) + end, [], FilteredPaths), + {NewTree, FoundKeys}. + % get the leafs in the tree matching the keys. The matching key nodes can be % leafs or an inner nodes. If an inner node, then the leafs for that node @@ -83,7 +116,7 @@ get(Tree, KeysToGet) -> {KeyPaths, KeysNotFound} = get_full_key_paths(Tree, KeysToGet), FixedResults = [ {Key, Value, [Key0 || {Key0, _} <- Path]} || [{Key, Value}|_] = Path <- KeyPaths], {FixedResults, KeysNotFound}. - + get_full_key_paths(Tree, Keys) -> get_full_key_paths(Tree, Keys, []). diff --git a/src/couchdb/couch_view.erl b/src/couchdb/couch_view.erl index 5b3105f1..9414e4fa 100644 --- a/src/couchdb/couch_view.erl +++ b/src/couchdb/couch_view.erl @@ -28,6 +28,7 @@ views, id_btree=nil, current_seq=0, + purge_seq=0, query_server=nil }). @@ -43,6 +44,14 @@ {root_dir }). +-record(index_header, + {seq=0, + purge_seq=0, + id_btree_state=nil, + view_states=nil + }). + + start_link() -> gen_server:start_link({local, couch_view}, couch_view, [], []). @@ -412,9 +421,17 @@ start_update_loop(RootDir, DbName, GroupId, NotifyPids) -> {ok, Fd} -> Sig = Group#group.sig, case (catch couch_file:read_header(Fd, <<$r, $c, $k, 0>>)) of - {ok, {Sig, HeaderInfo}} -> + {ok, {Sig, #index_header{purge_seq=PurgeSeq}=HeaderInfo}} -> % sigs match! - init_group(Db, Fd, Group, HeaderInfo); + DbPurgeSeq = couch_db:get_purge_seq(Db), + case (PurgeSeq == DbPurgeSeq) or ((PurgeSeq + 1) == DbPurgeSeq) of + true -> + % We can only use index with the same, or next purge seq as the + % db. + init_group(Db, Fd, Group, HeaderInfo); + false -> + reset_file(Db, Fd, DbName, Group) + end; _ -> reset_file(Db, Fd, DbName, Group) end; @@ -424,6 +441,7 @@ start_update_loop(RootDir, DbName, GroupId, NotifyPids) -> Error -> throw(Error) end end, + couch_db:monitor(Db), couch_db:close(Db), update_loop(RootDir, DbName, GroupId, Group2, NotifyPids). @@ -479,24 +497,67 @@ get_notify_pids() -> [] end. -update_group(#group{db=Db,current_seq=CurrentSeq, views=Views}=Group) -> - ViewEmptyKVs = [{View, []} || View <- Views], +purge(#group{db=Db, views=Views, id_btree=IdBtree}=Group) -> + {ok, PurgedIdsRevs} = couch_db:get_last_purged(Db), + Ids = [Id || {Id, _Revs} <- PurgedIdsRevs], + {ok, Lookups, IdBtree2} = couch_btree:query_modify(IdBtree, Ids, [], Ids), + + % now populate the dictionary with all the keys to delete + ViewKeysToRemoveDict = lists:foldl( + fun({ok,{DocId,ViewNumRowKeys}}, ViewDictAcc) -> + lists:foldl( + fun({ViewNum, RowKey}, ViewDictAcc2) -> + dict:append(ViewNum, {RowKey, DocId}, ViewDictAcc2) + end, ViewDictAcc, ViewNumRowKeys); + ({not_found, _}, ViewDictAcc) -> + ViewDictAcc + end, dict:new(), Lookups), + + % Now remove the values from the btrees + Views2 = lists:map( + fun(#view{id_num=Num,btree=Btree}=View) -> + case dict:find(Num, ViewKeysToRemoveDict) of + {ok, RemoveKeys} -> + {ok, Btree2} = couch_btree:add_remove(Btree, [], RemoveKeys), + View#view{btree=Btree2}; + error -> % no keys to remove in this view + View + end + end, Views), + Group#group{id_btree=IdBtree2, + views=Views2, + purge_seq=couch_db:get_purge_seq(Db)}. + + +update_group(#group{db=Db,current_seq=CurrentSeq, + purge_seq=GroupPurgeSeq}=Group) -> + ViewEmptyKVs = [{View, []} || View <- Group#group.views], % compute on all docs modified since we last computed. - {ok, {UncomputedDocs, Group2, ViewKVsToAdd, DocIdViewIdKeys, NewSeq}} + DbPurgeSeq = couch_db:get_purge_seq(Db), + Group2 = + case DbPurgeSeq of + GroupPurgeSeq -> + Group; + DbPurgeSeq when GroupPurgeSeq + 1 == DbPurgeSeq -> + purge(Group); + _ -> + throw(restart) + end, + {ok, {UncomputedDocs, Group3, ViewKVsToAdd, DocIdViewIdKeys, NewSeq}} = couch_db:enum_docs_since( Db, CurrentSeq, fun(DocInfo, _, Acc) -> process_doc(Db, DocInfo, Acc) end, - {[], Group, ViewEmptyKVs, [], CurrentSeq} + {[], Group2, ViewEmptyKVs, [], CurrentSeq} ), - {Group3, Results} = view_compute(Group2, UncomputedDocs), + {Group4, Results} = view_compute(Group3, UncomputedDocs), {ViewKVsToAdd2, DocIdViewIdKeys2} = view_insert_query_results(UncomputedDocs, Results, ViewKVsToAdd, DocIdViewIdKeys), - couch_query_servers:stop_doc_map(Group3#group.query_server), + couch_query_servers:stop_doc_map(Group4#group.query_server), if CurrentSeq /= NewSeq -> - {ok, Group4} = write_changes(Group3, ViewKVsToAdd2, DocIdViewIdKeys2, NewSeq), - {ok, Group4#group{query_server=nil}}; + {ok, Group5} = write_changes(Group4, ViewKVsToAdd2, DocIdViewIdKeys2, NewSeq), + {ok, Group5#group{query_server=nil}}; true -> - {ok, Group3#group{query_server=nil}} + {ok, Group4#group{query_server=nil}} end. delete_index_dir(RootDir, DbName) -> @@ -523,10 +584,13 @@ delete_index_file(RootDir, DbName, GroupId) -> file:delete(RootDir ++ "/." ++ binary_to_list(DbName) ++ binary_to_list(GroupId) ++ ".view"). -init_group(Db, Fd, #group{views=Views}=Group, nil = _IndexHeaderData) -> - init_group(Db, Fd, Group, {0, nil, [nil || _ <- Views]}); -init_group(Db, Fd, #group{def_lang=Lang,views=Views}=Group, - {Seq, IdBtreeState, ViewStates} = _IndexHeaderData) -> +init_group(Db, Fd, #group{views=Views}=Group, nil) -> + init_group(Db, Fd, Group, + #index_header{seq=0, purge_seq=couch_db:get_purge_seq(Db), + id_btree_state=nil, view_states=[nil || _ <- Views]}); +init_group(Db, Fd, #group{def_lang=Lang,views=Views}=Group, IndexHeader) -> + #index_header{seq=Seq, purge_seq=PurgeSeq, + id_btree_state=IdBtreeState, view_states=ViewStates} = IndexHeader, {ok, IdBtree} = couch_btree:open(IdBtreeState, Fd), Views2 = lists:zipwith( fun(BtreeState, #view{reduce_funs=RedFuns}=View) -> @@ -548,12 +612,17 @@ init_group(Db, Fd, #group{def_lang=Lang,views=Views}=Group, View#view{btree=Btree} end, ViewStates, Views), - Group#group{db=Db, fd=Fd, current_seq=Seq, id_btree=IdBtree, views=Views2}. + Group#group{db=Db, fd=Fd, current_seq=Seq, purge_seq=PurgeSeq, + id_btree=IdBtree, views=Views2}. -get_index_header_data(#group{current_seq=Seq,id_btree=IdBtree,views=Views}) -> +get_index_header_data(#group{current_seq=Seq, purge_seq=PurgeSeq, + id_btree=IdBtree,views=Views}) -> ViewStates = [couch_btree:get_state(Btree) || #view{btree=Btree} <- Views], - {Seq, couch_btree:get_state(IdBtree), ViewStates}. + #index_header{seq=Seq, + purge_seq=PurgeSeq, + id_btree_state=couch_btree:get_state(IdBtree), + view_states=ViewStates}. % keys come back in the language of btree - tuples. less_json_keys(A, B) -> @@ -575,7 +644,7 @@ type_sort(V) when is_integer(V) -> 1; type_sort(V) when is_float(V) -> 1; type_sort(V) when is_binary(V) -> 2; type_sort(V) when is_list(V) -> 3; -type_sort({V}) when is_list(V) -> 4; % must come before tuple test below +type_sort({V}) when is_list(V) -> 4; type_sort(V) when is_tuple(V) -> 5. @@ -702,8 +771,8 @@ view_insert_doc_query_results(#doc{id=DocId}=Doc, [ResultKVs|RestResults], [{Vie [KV] end, [], lists:sort(ResultKVs)), NewKVs = [{{Key, DocId}, Value} || {Key, Value} <- ResultKVs2], - NewViewIdKeys = [{View#view.id_num, Key} || {Key, _Value} <- ResultKVs2], NewViewKVsAcc = [{View, NewKVs ++ KVs} | ViewKVsAcc], + NewViewIdKeys = [{View#view.id_num, Key} || {Key, _Value} <- ResultKVs2], NewViewIdKeysAcc = NewViewIdKeys ++ ViewIdKeysAcc, view_insert_doc_query_results(Doc, RestResults, RestViewKVs, NewViewKVsAcc, NewViewIdKeysAcc). |