diff options
author | Damien F. Katz <damien@apache.org> | 2008-09-11 19:26:09 +0000 |
---|---|---|
committer | Damien F. Katz <damien@apache.org> | 2008-09-11 19:26:09 +0000 |
commit | 634b1b193acc24b95326c74e615d723043516f16 (patch) | |
tree | faedbfddee34a23083b34998572ceb1501a8747f /src/couchdb/couch_db_updater.erl | |
parent | 37ca97c918f4b5316e4293d8f1001bb87b8dfb0c (diff) |
Check-in of document purge functionality.
git-svn-id: https://svn.apache.org/repos/asf/incubator/couchdb/trunk@694430 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'src/couchdb/couch_db_updater.erl')
-rw-r--r-- | src/couchdb/couch_db_updater.erl | 162 |
1 files changed, 95 insertions, 67 deletions
diff --git a/src/couchdb/couch_db_updater.erl b/src/couchdb/couch_db_updater.erl index cc916961..a368ccac 100644 --- a/src/couchdb/couch_db_updater.erl +++ b/src/couchdb/couch_db_updater.erl @@ -58,14 +58,77 @@ handle_call(increment_update_seq, _From, Db) -> Db2 = commit_data(Db#db{update_seq=Db#db.update_seq+1}), ok = gen_server:call(Db2#db.main_pid, {db_updated, Db2}), couch_db_update_notifier:notify({updated, Db#db.name}), - {reply, {ok, Db2#db.update_seq}, Db2}. + {reply, {ok, Db2#db.update_seq}, Db2}; +handle_call({purge_docs, _IdRevs}, _From, + #db{compactor_pid=Pid}=Db) when Pid /= nil -> + {reply, {error, purge_during_compaction}, Db}; +handle_call({purge_docs, IdRevs}, _From, Db) -> + #db{ + fd=Fd, + fulldocinfo_by_id_btree = DocInfoByIdBTree, + docinfo_by_seq_btree = DocInfoBySeqBTree, + update_seq = LastSeq, + header = Header = #db_header{purge_seq=PurgeSeq} + } = Db, + DocLookups = couch_btree:lookup(DocInfoByIdBTree, + [Id || {Id, _Revs} <- IdRevs]), + + NewDocInfos = lists:zipwith( + fun({_Id, Revs}, {ok, #full_doc_info{rev_tree=Tree}=FullDocInfo}) -> + case couch_key_tree:remove_leafs(Tree, Revs) of + {_, []=_RemovedRevs} -> % no change + nil; + {NewTree, RemovedRevs} -> + {FullDocInfo#full_doc_info{rev_tree=NewTree},RemovedRevs} + end; + (_, not_found) -> + nil + end, + IdRevs, DocLookups), + + SeqsToRemove = [Seq + || {#full_doc_info{update_seq=Seq},_} <- NewDocInfos], + + FullDocInfoToUpdate = [FullInfo + || {#full_doc_info{rev_tree=Tree}=FullInfo,_} + <- NewDocInfos, Tree /= []], + IdRevsPurged = [{Id, Revs} + || {#full_doc_info{id=Id}, Revs} <- NewDocInfos], + + {DocInfoToUpdate, NewSeq} = lists:mapfoldl( + fun(FullInfo, SeqAcc) -> + Info = couch_doc:to_doc_info(FullInfo), + {Info#doc_info{update_seq=SeqAcc + 1}, SeqAcc + 1} + end, LastSeq, FullDocInfoToUpdate), + + IdsToRemove = [Id || {#full_doc_info{id=Id,rev_tree=Tree},_} + <- NewDocInfos, Tree == []], + + {ok, DocInfoBySeqBTree2} = couch_btree:add_remove(DocInfoBySeqBTree, + DocInfoToUpdate, SeqsToRemove), + {ok, DocInfoByIdBTree2} = couch_btree:add_remove(DocInfoByIdBTree, + FullDocInfoToUpdate, IdsToRemove), + {ok, Pointer} = couch_file:append_term(Fd, IdRevsPurged), + + Db2 = commit_data( + Db#db{ + fulldocinfo_by_id_btree = DocInfoByIdBTree2, + docinfo_by_seq_btree = DocInfoBySeqBTree2, + update_seq = NewSeq, + header=Header#db_header{purge_seq=PurgeSeq+1, purged_docs=Pointer}}), + + ok = gen_server:call(Db2#db.main_pid, {db_updated, Db2}), + couch_db_update_notifier:notify({updated, Db#db.name}), + {reply, {ok, Db2#db.update_seq, IdRevsPurged}, Db2}. + + handle_cast(start_compact, Db) -> case Db#db.compactor_pid of nil -> ?LOG_INFO("Starting compaction for db \"~s\"", [Db#db.name]), - Pid = spawn_link(fun() -> start_copy_compact_int(Db, true) end), + Pid = spawn_link(fun() -> start_copy_compact_int(Db) end), Db2 = Db#db{compactor_pid=Pid}, ok = gen_server:call(Db#db.main_pid, {db_updated, Db2}), {noreply, Db2}; @@ -80,14 +143,16 @@ handle_cast({compact_done, CompactFilepath}, #db{filepath=Filepath}=Db) -> init_db(Db#db.name, CompactFilepath, NewFd, NewHeader), case Db#db.update_seq == NewSeq of true -> - NewDb2 = commit_data( - NewDb#db{ - main_pid = Db#db.main_pid, - doc_count = Db#db.doc_count, - doc_del_count = Db#db.doc_del_count, - filepath = Filepath}), + % suck up all the local docs into memory and write them to the new db + {ok, LocalDocs} = couch_btree:foldl(Db#db.local_docs_btree, + fun(Value, _Offset, Acc) -> {ok, [Value | Acc]} end, []), + {ok, NewLocalBtree} = couch_btree:add(NewDb#db.local_docs_btree, LocalDocs), + + NewDb2 = commit_data( NewDb#db{local_docs_btree=NewLocalBtree, + main_pid = Db#db.main_pid,filepath = Filepath}), - ?LOG_DEBUG("CouchDB swapping files ~s and ~s.", [Filepath, CompactFilepath]), + ?LOG_DEBUG("CouchDB swapping files ~s and ~s.", + [Filepath, CompactFilepath]), file:delete(Filepath), ok = file:rename(CompactFilepath, Filepath), @@ -102,7 +167,7 @@ handle_cast({compact_done, CompactFilepath}, #db{filepath=Filepath}=Db) -> ?LOG_INFO("Compaction file still behind main file " "(update seq=~p. compact update seq=~p). Retrying.", [Db#db.update_seq, NewSeq]), - Pid = spawn_link(fun() -> start_copy_compact_int(Db, false) end), + Pid = spawn_link(fun() -> start_copy_compact_int(Db) end), Db2 = Db#db{compactor_pid=Pid}, couch_file:close(NewFd), {noreply, Db2} @@ -143,14 +208,14 @@ btree_by_id_split(#full_doc_info{id=Id, update_seq=Seq, btree_by_id_join(Id, {Seq, Deleted, Tree}) -> #full_doc_info{id=Id, update_seq=Seq, deleted=Deleted==1, rev_tree=Tree}. - - btree_by_id_reduce(reduce, FullDocInfos) -> % count the number of not deleted documents - length([1 || #full_doc_info{deleted=false} <- FullDocInfos]); + {length([1 || #full_doc_info{deleted=false} <- FullDocInfos]), + length([1 || #full_doc_info{deleted=true} <- FullDocInfos])}; btree_by_id_reduce(rereduce, Reds) -> - lists:sum(Reds). + {lists:sum([Count || {Count,_} <- Reds]), + lists:sum([DelCount || {_, DelCount} <- Reds])}. btree_by_seq_reduce(reduce, DocInfos) -> % count the number of documents @@ -188,8 +253,6 @@ init_db(DbName, Filepath, Fd, Header) -> docinfo_by_seq_btree = SeqBtree, local_docs_btree = LocalDocsBtree, update_seq = Header#db_header.update_seq, - doc_count = Header#db_header.doc_count, - doc_del_count = Header#db_header.doc_del_count, name = DbName, filepath=Filepath }. @@ -270,15 +333,11 @@ merge_rev_trees(NoConflicts, [NewDocs|RestDocsList], [NewInfo|AccNewInfos],AccSeq+1) end. -new_index_entries([], DocCount, DelCount, AccById, AccBySeq) -> - {ok, DocCount, DelCount, AccById, AccBySeq}; -new_index_entries([FullDocInfo|RestInfos], DocCount, DelCount, AccById, AccBySeq) -> +new_index_entries([], AccById, AccBySeq) -> + {ok, AccById, AccBySeq}; +new_index_entries([FullDocInfo|RestInfos], AccById, AccBySeq) -> #doc_info{deleted=Deleted} = DocInfo = couch_doc:to_doc_info(FullDocInfo), - {DocCount2, DelCount2} = - if Deleted -> {DocCount, DelCount + 1}; - true -> {DocCount + 1, DelCount} - end, - new_index_entries(RestInfos, DocCount2, DelCount2, + new_index_entries(RestInfos, [FullDocInfo#full_doc_info{deleted=Deleted}|AccById], [DocInfo|AccBySeq]). @@ -286,9 +345,7 @@ update_docs_int(Db, DocsList, Options) -> #db{ fulldocinfo_by_id_btree = DocInfoByIdBTree, docinfo_by_seq_btree = DocInfoBySeqBTree, - update_seq = LastSeq, - doc_count = FullDocCount, - doc_del_count = FullDelCount + update_seq = LastSeq } = Db, % separate out the NonRep documents from the rest of the documents @@ -305,7 +362,7 @@ update_docs_int(Db, DocsList, Options) -> Ids = [Id || [#doc{id=Id}|_] <- DocsList2], - % lookup up the existing documents, if they exist. + % lookup up the old documents, if they exist. OldDocLookups = couch_btree:lookup(DocInfoByIdBTree, Ids), OldDocInfos = lists:zipwith( fun(_Id, {ok, FullDocInfo}) -> @@ -315,18 +372,6 @@ update_docs_int(Db, DocsList, Options) -> end, Ids, OldDocLookups), - {OldCount, OldDelCount} = lists:foldl( - fun({ok, FullDocInfo}, {OldCountAcc, OldDelCountAcc}) -> - case couch_doc:to_doc_info(FullDocInfo) of - #doc_info{deleted=false} -> - {OldCountAcc + 1, OldDelCountAcc}; - _ -> - {OldCountAcc, OldDelCountAcc + 1} - end; - (not_found, Acc) -> - Acc - end, {0, 0}, OldDocLookups), - % Merge the new docs into the revision trees. NoConflicts = lists:member(new_edits, Options), {ok, NewDocInfos, NewSeq} = merge_rev_trees(NoConflicts, DocsList2, OldDocInfos, [], LastSeq), @@ -339,11 +384,10 @@ update_docs_int(Db, DocsList, Options) -> % Try to write the local documents first, a conflict might be generated {ok, Db2} = update_local_docs(Db, NonRepDocs), - % Write out the documents summaries (they are stored in the nodes of the rev trees) + % Write out the document summaries (they are stored in the nodes of the rev trees) {ok, FlushedDocInfos} = flush_trees(Db2, NewDocInfos, []), - {ok, NewDocsCount, NewDelCount, InfoById, InfoBySeq} = - new_index_entries(FlushedDocInfos, 0, 0, [], []), + {ok, InfoById, InfoBySeq} = new_index_entries(FlushedDocInfos, [], []), % and the indexes to the documents {ok, DocInfoBySeqBTree2} = couch_btree:add_remove(DocInfoBySeqBTree, InfoBySeq, RemoveSeqs), @@ -352,9 +396,7 @@ update_docs_int(Db, DocsList, Options) -> Db3 = Db2#db{ fulldocinfo_by_id_btree = DocInfoByIdBTree2, docinfo_by_seq_btree = DocInfoBySeqBTree2, - update_seq = NewSeq, - doc_count = FullDocCount + NewDocsCount - OldCount, - doc_del_count = FullDelCount + NewDelCount - OldDelCount}, + update_seq = NewSeq}, case lists:member(delay_commit, Options) of true -> @@ -406,9 +448,7 @@ commit_data(#db{fd=Fd, header=Header} = Db) -> summary_stream_state = couch_stream:get_state(Db#db.summary_stream), docinfo_by_seq_btree_state = couch_btree:get_state(Db#db.docinfo_by_seq_btree), fulldocinfo_by_id_btree_state = couch_btree:get_state(Db#db.fulldocinfo_by_id_btree), - local_docs_btree_state = couch_btree:get_state(Db#db.local_docs_btree), - doc_count = Db#db.doc_count, - doc_del_count = Db#db.doc_del_count + local_docs_btree_state = couch_btree:get_state(Db#db.local_docs_btree) }, if Header == Header2 -> Db; % unchanged. nothing to do @@ -476,7 +516,7 @@ copy_compact_docs(Db, NewDb) -> NewDb2 end. -start_copy_compact_int(#db{name=Name,filepath=Filepath}=Db, CopyLocal) -> +start_copy_compact_int(#db{name=Name,filepath=Filepath}=Db) -> CompactFile = Filepath ++ ".compact", ?LOG_DEBUG("Compaction process spawned for db \"~s\"", [Name]), case couch_file:open(CompactFile) of @@ -484,24 +524,12 @@ start_copy_compact_int(#db{name=Name,filepath=Filepath}=Db, CopyLocal) -> ?LOG_DEBUG("Found existing compaction file for db \"~s\"", [Name]), {ok, Header} = couch_file:read_header(Fd, ?HEADER_SIG); {error, enoent} -> - receive after 1000 -> ok end, {ok, Fd} = couch_file:open(CompactFile, [create]), - Header = #db_header{}, - ok = couch_file:write_header(Fd, ?HEADER_SIG, Header) + ok = couch_file:write_header(Fd, ?HEADER_SIG, Header=#db_header{}) end, NewDb = init_db(Name, CompactFile, Fd, Header), - NewDb2 = copy_compact_docs(Db, NewDb), - NewDb3 = - case CopyLocal of - true -> - % suck up all the local docs into memory and write them to the new db - {ok, LocalDocs} = couch_btree:foldl(Db#db.local_docs_btree, - fun(Value, _Offset, Acc) -> {ok, [Value | Acc]} end, []), - {ok, NewLocalBtree} = couch_btree:add(NewDb2#db.local_docs_btree, LocalDocs), - commit_data(NewDb2#db{local_docs_btree=NewLocalBtree}); - _ -> - NewDb2 - end, - close_db(NewDb3), + NewDb2 = commit_data(copy_compact_docs(Db, NewDb)), + close_db(NewDb2), + + gen_server:cast(Db#db.update_pid, {compact_done, CompactFile}). - gen_server:cast(Db#db.update_pid, {compact_done, CompactFile}).
\ No newline at end of file |