diff options
author | Damien F. Katz <damien@apache.org> | 2009-05-06 19:11:10 +0000 |
---|---|---|
committer | Damien F. Katz <damien@apache.org> | 2009-05-06 19:11:10 +0000 |
commit | 4b5e0a20aa087dd26df644c0432627aa3e5826d4 (patch) | |
tree | 1494b164fdef4004bff44aa39edbc2f1bf60d8f3 /src | |
parent | 887c9b1a8b551272c3ca06906cfdc4fb901830a8 (diff) |
First cut at _changes api. Update the by_id and by_seq indexes to contain update seq numbers and pointers to bodies on disk, for use in the _changes api. This is a new file version, but the code can continue to serve the old 0.9 version without problems, though certain features in the _changes api will not be able to work. Upgrade to new file version (from 1 to 2) by compacting the file. Also fixed bugs with how the stats api tracks open databases.
git-svn-id: https://svn.apache.org/repos/asf/couchdb/trunk@772406 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'src')
-rw-r--r-- | src/couchdb/couch_db.erl | 93 | ||||
-rw-r--r-- | src/couchdb/couch_db.hrl | 22 | ||||
-rw-r--r-- | src/couchdb/couch_db_updater.erl | 122 | ||||
-rw-r--r-- | src/couchdb/couch_doc.erl | 50 | ||||
-rw-r--r-- | src/couchdb/couch_file.erl | 20 | ||||
-rw-r--r-- | src/couchdb/couch_httpd_db.erl | 105 | ||||
-rw-r--r-- | src/couchdb/couch_key_tree.erl | 17 | ||||
-rw-r--r-- | src/couchdb/couch_rep.erl | 28 | ||||
-rw-r--r-- | src/couchdb/couch_server.erl | 7 | ||||
-rw-r--r-- | src/couchdb/couch_stats_collector.erl | 15 | ||||
-rw-r--r-- | src/couchdb/couch_view_updater.erl | 11 |
11 files changed, 313 insertions, 177 deletions
diff --git a/src/couchdb/couch_db.erl b/src/couchdb/couch_db.erl index 4cd5a5fd..e065ff3b 100644 --- a/src/couchdb/couch_db.erl +++ b/src/couchdb/couch_db.erl @@ -17,13 +17,14 @@ -export([open_ref_counted/2,is_idle/1,monitor/1,count_changes_since/2]). -export([update_doc/3,update_docs/4,update_docs/2,update_docs/3,delete_doc/3]). -export([get_doc_info/2,open_doc/2,open_doc/3,open_doc_revs/4]). --export([set_revs_limit/2,get_revs_limit/1]). +-export([set_revs_limit/2,get_revs_limit/1,register_update_notifier/3]). -export([get_missing_revs/2,name/1,doc_to_tree/1,get_update_seq/1,get_committed_update_seq/1]). -export([enum_docs/4,enum_docs/5,enum_docs_since/4,enum_docs_since/5]). -export([enum_docs_since_reduce_to_count/1,enum_docs_reduce_to_count/1]). -export([increment_update_seq/1,get_purge_seq/1,purge_docs/2,get_last_purged/1]). --export([start_link/3,make_doc/2,set_admins/2,get_admins/1,ensure_full_commit/1]). +-export([start_link/3,open_doc_int/3,set_admins/2,get_admins/1,ensure_full_commit/1]). -export([init/1,terminate/2,handle_call/3,handle_cast/2,code_change/3,handle_info/2]). +-export([changes_since/5]). -include("couch_db.hrl"). @@ -81,6 +82,9 @@ is_idle(MainPid) -> monitor(#db{main_pid=MainPid}) -> erlang:monitor(process, MainPid). +register_update_notifier(#db{main_pid=Pid}, Seq, Fun) -> + gen_server:call(Pid, {register_update_notifier, Seq, Fun}). + start_compact(#db{update_pid=Pid}) -> gen_server:cast(Pid, start_compact). @@ -166,6 +170,7 @@ get_last_purged(#db{fd=Fd, header=#db_header{purged_docs=PurgedPointer}}) -> get_db_info(Db) -> #db{fd=Fd, + header=#db_header{disk_version=DiskVersion}, compactor_pid=Compactor, update_seq=SeqNum, name=Name, @@ -181,7 +186,8 @@ get_db_info(Db) -> {purge_seq, couch_db:get_purge_seq(Db)}, {compact_running, Compactor/=nil}, {disk_size, Size}, - {instance_start_time, StartTime} + {instance_start_time, StartTime}, + {disk_format_version, DiskVersion} ], {ok, InfoList}. @@ -337,7 +343,7 @@ prep_and_validate_updates(Db, [DocBucket|RestBuckets], AccPrepped, AccErrors) -> Leafs = couch_key_tree:get_all_leafs(OldRevTree), LeafRevsDict = dict:from_list([{{Start, RevId}, {Deleted, Sp, Revs}} || - {{Deleted, Sp}, {Start, [RevId|_]}=Revs} <- Leafs]), + {{Deleted, Sp, _Seq}, {Start, [RevId|_]}=Revs} <- Leafs]), {PreppedBucket, AccErrors3} = lists:foldl( fun(Doc, {Docs2Acc, AccErrors2}) -> case prep_and_validate_update(Db, Doc, OldFullDocInfo, @@ -398,7 +404,7 @@ prep_and_validate_replicated_updates(Db, [Bucket|RestBuckets], [OldInfo|RestOldI case PrevRevFull of {_RevId, ?REV_MISSING} -> conflict; - {RevId, {IsDel, DiskSp}} -> + {RevId, {IsDel, DiskSp, _Seq}} -> DiskDoc = make_doc(Db, Id, IsDel, DiskSp, PrevPath), Doc2 = couch_doc:merge_stubs(Doc, DiskDoc), {ok, Doc2, fun() -> DiskDoc end} @@ -522,7 +528,7 @@ make_first_doc_on_disk(_Db, _Id, _Pos, []) -> nil; make_first_doc_on_disk(Db, Id, Pos, [{_Rev, ?REV_MISSING}|RestPath]) -> make_first_doc_on_disk(Db, Id, Pos - 1, RestPath); -make_first_doc_on_disk(Db, Id, Pos, [{_Rev, {IsDel, Sp}} |_]=DocPath) -> +make_first_doc_on_disk(Db, Id, Pos, [{_Rev, {IsDel, Sp, _Seq}} |_]=DocPath) -> Revs = [Rev || {Rev, _} <- DocPath], make_doc(Db, Id, IsDel, Sp, {Pos, Revs}). @@ -653,7 +659,22 @@ enum_docs_reduce_to_count(Reds) -> {Count, _DelCount} = couch_btree:final_reduce( fun couch_db_updater:btree_by_id_reduce/2, Reds), Count. - + +changes_since(Db, Style, StartSeq, Fun, Acc) -> + enum_docs_since(Db, StartSeq, fwd, + fun(DocInfo, _Offset, Acc2) -> + #doc_info{revs=Revs} = DocInfo, + case Style of + main_only -> + Infos = [DocInfo]; + all_docs -> + % make each rev it's own doc info + Infos = [DocInfo#doc_info{revs=[RevInfo]} || + #rev_info{seq=RevSeq}=RevInfo <- Revs, StartSeq < RevSeq] + end, + Fun(Infos, Acc2) + end, Acc). + count_changes_since(Db, SinceSeq) -> {ok, Changes} = couch_btree:fold_reduce(Db#db.docinfo_by_seq_btree, @@ -665,9 +686,9 @@ count_changes_since(Db, SinceSeq) -> end, 0), Changes. - -enum_docs_since(Db, SinceSeq, Direction, InFun, Ctx) -> - couch_btree:fold(Db#db.docinfo_by_seq_btree, SinceSeq + 1, Direction, InFun, Ctx). + +enum_docs_since(Db, SinceSeq, Direction, InFun, Acc) -> + couch_btree:fold(Db#db.docinfo_by_seq_btree, SinceSeq + 1, Direction, InFun, Acc). enum_docs_since(Db, SinceSeq, InFun, Acc) -> enum_docs_since(Db, SinceSeq, fwd, InFun, Acc). @@ -684,6 +705,7 @@ init({DbName, Filepath, Fd, Options}) -> {ok, UpdaterPid} = gen_server:start_link(couch_db_updater, {self(), DbName, Filepath, Fd, Options}, []), {ok, #db{fd_ref_counter=RefCntr}=Db} = gen_server:call(UpdaterPid, get_db), couch_ref_counter:add(RefCntr), + couch_stats_collector:track_process_count({couchdb, open_databases}), {ok, Db}. terminate(Reason, _Db) -> @@ -698,8 +720,8 @@ handle_call(is_idle, _From, #db{fd_ref_counter=RefCntr, compactor_pid=Compact, % Idle means no referrers. Unless in the middle of a compaction file switch, % there are always at least 2 referrers, couch_db_updater and us. {reply, (Delay == nil) and (Compact == nil) and (couch_ref_counter:count(RefCntr) == 2), Db}; -handle_call({db_updated, #db{fd_ref_counter=NewRefCntr}=NewDb}, _From, - #db{fd_ref_counter=OldRefCntr}) -> +handle_call({db_updated, NewDb}, _From, #db{fd_ref_counter=OldRefCntr}) -> + #db{fd_ref_counter=NewRefCntr}=NewDb, case NewRefCntr == OldRefCntr of true -> ok; false -> @@ -747,7 +769,7 @@ open_doc_revs_int(Db, IdRevs, Options) -> ?REV_MISSING -> % we have the rev in our list but know nothing about it {{not_found, missing}, {Pos, Rev}}; - {IsDeleted, SummaryPtr} -> + {IsDeleted, SummaryPtr, _UpdateSeq} -> {ok, make_doc(Db, Id, IsDeleted, SummaryPtr, FoundRevPath)} end end, FoundRevs), @@ -768,14 +790,15 @@ open_doc_int(Db, <<?LOCAL_DOC_PREFIX, _/binary>> = Id, _Options) -> [not_found] -> {not_found, missing} end; -open_doc_int(Db, #doc_info{id=Id,rev={Pos,RevId},deleted=IsDeleted,summary_pointer=Sp}=DocInfo, Options) -> - Doc = make_doc(Db, Id, IsDeleted, Sp, {Pos,[RevId]}), +open_doc_int(Db, #doc_info{id=Id,revs=[RevInfo|_]}=DocInfo, Options) -> + #rev_info{deleted=IsDeleted,rev={Pos,RevId},body_sp=Bp} = RevInfo, + Doc = make_doc(Db, Id, IsDeleted, Bp, {Pos,[RevId]}), {ok, Doc#doc{meta=doc_meta_info(DocInfo, [], Options)}}; open_doc_int(Db, #full_doc_info{id=Id,rev_tree=RevTree}=FullDocInfo, Options) -> - #doc_info{deleted=IsDeleted,rev=Rev,summary_pointer=Sp} = DocInfo = - couch_doc:to_doc_info(FullDocInfo), + #doc_info{revs=[#rev_info{deleted=IsDeleted,rev=Rev,body_sp=Bp}|_]} = + DocInfo = couch_doc:to_doc_info(FullDocInfo), {[{_, RevPath}], []} = couch_key_tree:get(RevTree, [Rev]), - Doc = make_doc(Db, Id, IsDeleted, Sp, RevPath), + Doc = make_doc(Db, Id, IsDeleted, Bp, RevPath), {ok, Doc#doc{meta=doc_meta_info(DocInfo, RevTree, Options)}}; open_doc_int(Db, Id, Options) -> case get_full_doc_info(Db, Id) of @@ -785,36 +808,36 @@ open_doc_int(Db, Id, Options) -> {not_found, missing} end. -doc_meta_info(DocInfo, RevTree, Options) -> +doc_meta_info(#doc_info{revs=[#rev_info{rev=Rev}|RestInfo]}, RevTree, Options) -> case lists:member(revs_info, Options) of false -> []; true -> {[{Pos, RevPath}],[]} = - couch_key_tree:get_full_key_paths(RevTree, [DocInfo#doc_info.rev]), + couch_key_tree:get_full_key_paths(RevTree, [Rev]), [{revs_info, Pos, lists:map( - fun({Rev, {true, _Sp}}) -> - {Rev, deleted}; - ({Rev, {false, _Sp}}) -> - {Rev, available}; - ({Rev, ?REV_MISSING}) -> - {Rev, missing} + fun({Rev1, {true, _Sp, _UpdateSeq}}) -> + {Rev1, deleted}; + ({Rev1, {false, _Sp, _UpdateSeq}}) -> + {Rev1, available}; + ({Rev1, ?REV_MISSING}) -> + {Rev1, missing} end, RevPath)}] end ++ case lists:member(conflicts, Options) of false -> []; true -> - case DocInfo#doc_info.conflict_revs of + case [Rev1 || #rev_info{rev=Rev1,deleted=false} <- RestInfo] of [] -> []; - _ -> [{conflicts, DocInfo#doc_info.conflict_revs}] + ConflictRevs -> [{conflicts, ConflictRevs}] end end ++ case lists:member(deleted_conflicts, Options) of false -> []; true -> - case DocInfo#doc_info.deleted_conflict_revs of + case [Rev1 || #rev_info{rev=Rev1,deleted=false} <- RestInfo] of [] -> []; - _ -> [{deleted_conflicts, DocInfo#doc_info.deleted_conflict_revs}] + DelConflictRevs -> [{deleted_conflicts, DelConflictRevs}] end end. @@ -829,19 +852,15 @@ doc_to_tree_simple(Doc, [RevId]) -> doc_to_tree_simple(Doc, [RevId | Rest]) -> [{RevId, ?REV_MISSING, doc_to_tree_simple(Doc, Rest)}]. -make_doc(Db, FullDocInfo) -> - {#doc_info{id=Id,deleted=Deleted,summary_pointer=Sp}, RevPath} - = couch_doc:to_doc_info_path(FullDocInfo), - make_doc(Db, Id, Deleted, Sp, RevPath). -make_doc(#db{fd=Fd}=Db, Id, Deleted, BodySp, RevisionPath) -> +make_doc(#db{fd=Fd}=Db, Id, Deleted, Bp, RevisionPath) -> {BodyData, BinValues} = - case BodySp of + case Bp of nil -> {[], []}; _ -> {ok, {BodyData0, BinValues0}} = - couch_stream:read_term( Db#db.summary_stream, BodySp), + couch_stream:read_term( Db#db.summary_stream, Bp), {BodyData0, [{Name,{Type,{Fd,Sp,Len}}} || {Name,{Type,Sp,Len}} <- BinValues0]} end, diff --git a/src/couchdb/couch_db.hrl b/src/couchdb/couch_db.hrl index b23fd18d..c00792a8 100644 --- a/src/couchdb/couch_db.hrl +++ b/src/couchdb/couch_db.hrl @@ -37,15 +37,20 @@ -define(LOG_ERROR(Format, Args), error_logger:error_report(couch_error, {Format, Args})). + +-record(rev_info, + { + rev, + seq = 0, + deleted = false, + body_sp = nil % stream pointer + }). + -record(doc_info, { id = <<"">>, - rev = <<"">>, - update_seq = 0, - summary_pointer = nil, - conflict_revs = [], - deleted_conflict_revs = [], - deleted = false + high_seq = 0, + revs = [] % rev_info }). -record(full_doc_info, @@ -98,13 +103,14 @@ % than filling in new defaults. % % As long the changes are limited to new header fields (with inline -% defaults) added to the end of the file, then there is no need to increment +% defaults) added to the end of the record, then there is no need to increment % the disk revision number. % % if the disk revision is incremented, then new upgrade logic will need to be % added to couch_db_updater:init_db. --define(LATEST_DISK_VERSION, 1). +-define(DISK_VERSION_0_9, 1). +-define(LATEST_DISK_VERSION, 2). -record(db_header, {disk_version = ?LATEST_DISK_VERSION, diff --git a/src/couchdb/couch_db_updater.erl b/src/couchdb/couch_db_updater.erl index 1267ffa4..b1cb9037 100644 --- a/src/couchdb/couch_db_updater.erl +++ b/src/couchdb/couch_db_updater.erl @@ -115,13 +115,16 @@ handle_call({purge_docs, IdRevs}, _From, Db) -> || {#full_doc_info{id=Id}, Revs} <- NewDocInfos], {DocInfoToUpdate, NewSeq} = lists:mapfoldl( - fun(FullInfo, SeqAcc) -> - Info = couch_doc:to_doc_info(FullInfo), - {Info#doc_info{update_seq=SeqAcc + 1}, SeqAcc + 1} + fun(#full_doc_info{rev_tree=Tree}=FullInfo, SeqAcc) -> + Tree2 = couch_key_tree:map_leafs( fun(RevInfo) -> + RevInfo#rev_info{seq=SeqAcc + 1} + end, Tree), + {couch_doc:to_doc_info(FullInfo#full_doc_info{rev_tree=Tree2}), + SeqAcc + 1} end, LastSeq, FullDocInfoToUpdate), - IdsToRemove = [Id || {#full_doc_info{id=Id,rev_tree=Tree},_} - <- NewDocInfos, Tree == []], + IdsToRemove = [Id || {#full_doc_info{id=Id,rev_tree=[]},_} + <- NewDocInfos], {ok, DocInfoBySeqBTree2} = couch_btree:add_remove(DocInfoBySeqBTree, DocInfoToUpdate, SeqsToRemove), @@ -194,33 +197,61 @@ code_change(_OldVsn, State, _Extra) -> {ok, State}. -btree_by_seq_split(DocInfo) -> - #doc_info{ - id = Id, - rev = Rev, - update_seq = Seq, - summary_pointer = Sp, - conflict_revs = Conflicts, - deleted_conflict_revs = DelConflicts, - deleted = Deleted} = DocInfo, - {Seq,{Id, Rev, Sp, Conflicts, DelConflicts, Deleted}}. +btree_by_seq_split(#doc_info{id=Id, high_seq=KeySeq, revs=Revs}) -> + RevInfos = [{Rev, Seq, Bp} || + #rev_info{rev=Rev,seq=Seq,deleted=false,body_sp=Bp} <- Revs], + DeletedRevInfos = [{Rev, Seq, Bp} || + #rev_info{rev=Rev,seq=Seq,deleted=true,body_sp=Bp} <- Revs], + {KeySeq,{Id, RevInfos, DeletedRevInfos}}. -btree_by_seq_join(Seq,{Id, Rev, Sp, Conflicts, DelConflicts, Deleted}) -> +btree_by_seq_join(KeySeq, {Id, RevInfos, DeletedRevInfos}) -> #doc_info{ id = Id, - rev = Rev, - update_seq = Seq, - summary_pointer = Sp, - conflict_revs = Conflicts, - deleted_conflict_revs = DelConflicts, - deleted = Deleted}. + high_seq=KeySeq, + revs = + [#rev_info{rev=Rev,seq=Seq,deleted=false,body_sp = Bp} || + {Rev, Seq, Bp} <- RevInfos] ++ + [#rev_info{rev=Rev,seq=Seq,deleted=true,body_sp = Bp} || + {Rev, Seq, Bp} <- DeletedRevInfos]}; +btree_by_seq_join(KeySeq,{Id, Rev, Bp, Conflicts, DelConflicts, Deleted}) -> + % this is the 0.9.0 and earlier by_seq record. It's missing the body pointers + % and individual seq nums for conflicts that are currently in the index, + % meaning the filtered _changes api will not work except for on main docs. + % Simply compact a 0.9.0 database to upgrade the index. + #doc_info{ + id=Id, + high_seq=KeySeq, + revs = [#rev_info{rev=Rev,seq=KeySeq,deleted=Deleted,body_sp=Bp}] ++ + [#rev_info{rev=Rev1,seq=KeySeq,deleted=false} || Rev1 <- Conflicts] ++ + [#rev_info{rev=Rev2,seq=KeySeq,deleted=true} || Rev2 <- DelConflicts]}. btree_by_id_split(#full_doc_info{id=Id, update_seq=Seq, deleted=Deleted, rev_tree=Tree}) -> - {Id, {Seq, case Deleted of true -> 1; false-> 0 end, Tree}}. - -btree_by_id_join(Id, {Seq, Deleted, Tree}) -> - #full_doc_info{id=Id, update_seq=Seq, deleted=Deleted==1, rev_tree=Tree}. + DiskTree = + couch_key_tree:map( + fun(_RevId, {IsDeleted, BodyPointer, UpdateSeq}) -> + {if IsDeleted -> 1; true -> 0 end, BodyPointer, UpdateSeq}; + (_RevId, ?REV_MISSING) -> + ?REV_MISSING + end, Tree), + {Id, {Seq, if Deleted -> 1; true -> 0 end, DiskTree}}. + +btree_by_id_join(Id, {HighSeq, Deleted, DiskTree}) -> + Tree = + couch_key_tree:map( + fun(_RevId, {IsDeleted, BodyPointer, UpdateSeq}) -> + {IsDeleted == 1, BodyPointer, UpdateSeq}; + (_RevId, ?REV_MISSING) -> + ?REV_MISSING; + (_RevId, {IsDeleted, BodyPointer}) -> + % this is the 0.9.0 and earlier rev info record. It's missing the seq + % nums, which means couchdb will sometimes reexamine unchanged + % documents with the _changes API. + % This is fixed by compacting the database. + {IsDeleted, BodyPointer, HighSeq} + end, DiskTree), + + #full_doc_info{id=Id, update_seq=HighSeq, deleted=Deleted==1, rev_tree=Tree}. btree_by_id_reduce(reduce, FullDocInfos) -> % count the number of not deleted documents @@ -251,6 +282,7 @@ less_docid(A, B) -> A < B. init_db(DbName, Filepath, Fd, Header0) -> case element(2, Header0) of + ?DISK_VERSION_0_9 -> ok; % no problem, all records upgrade on the fly ?LATEST_DISK_VERSION -> ok; _ -> throw({database_disk_version_error, "Incorrect disk header version"}) end, @@ -320,7 +352,8 @@ refresh_validate_doc_funs(Db) -> get_design_docs(#db{fulldocinfo_by_id_btree=Btree}=Db) -> couch_btree:foldl(Btree, <<"_design/">>, fun(#full_doc_info{id= <<"_design/",_/binary>>}=FullDocInfo, _Reds, AccDocs) -> - {ok, [couch_db:make_doc(Db, FullDocInfo) | AccDocs]}; + {ok, Doc} = couch_db:open_doc_int(Db, FullDocInfo, []), + {ok, [Doc | AccDocs]}; (_, _Reds, AccDocs) -> {stop, AccDocs} end, @@ -331,8 +364,8 @@ get_design_docs(#db{fulldocinfo_by_id_btree=Btree}=Db) -> flush_trees(_Db, [], AccFlushedTrees) -> {ok, lists:reverse(AccFlushedTrees)}; flush_trees(#db{fd=Fd}=Db, [InfoUnflushed | RestUnflushed], AccFlushed) -> - #full_doc_info{rev_tree=Unflushed} = InfoUnflushed, - Flushed = couch_key_tree:map( + #full_doc_info{update_seq=UpdateSeq, rev_tree=Unflushed} = InfoUnflushed, + Flushed = couch_key_tree:map( fun(_Rev, Value) -> case Value of #doc{attachments=Atts,deleted=IsDeleted}=Doc -> @@ -355,7 +388,7 @@ flush_trees(#db{fd=Fd}=Db, [InfoUnflushed | RestUnflushed], AccFlushed) -> throw(retry) end, {ok, NewSummaryPointer} = couch_stream:write_term(Db#db.summary_stream, {Doc#doc.body, Bins}), - {IsDeleted, NewSummaryPointer}; + {IsDeleted, NewSummaryPointer, UpdateSeq}; _ -> Value end @@ -394,10 +427,13 @@ merge_rev_trees(MergeConflicts, [NewDocs|RestDocsList], [NewInfo|AccNewInfos], RemoveSeqs, NewConflicts, AccSeq+1) end. + + new_index_entries([], AccById, AccBySeq) -> - {ok, AccById, AccBySeq}; + {AccById, AccBySeq}; new_index_entries([FullDocInfo|RestInfos], AccById, AccBySeq) -> - #doc_info{deleted=Deleted} = DocInfo = couch_doc:to_doc_info(FullDocInfo), + #doc_info{revs=[#rev_info{deleted=Deleted}|_]} = DocInfo = + couch_doc:to_doc_info(FullDocInfo), new_index_entries(RestInfos, [FullDocInfo#full_doc_info{deleted=Deleted}|AccById], [DocInfo|AccBySeq]). @@ -436,13 +472,13 @@ update_docs_int(Db, DocsList, Options) -> #full_doc_info{id=Id} end, Ids, OldDocLookups), - + % Merge the new docs into the revision trees. {ok, NewDocInfos0, RemoveSeqs, Conflicts, NewSeq} = merge_rev_trees( lists:member(merge_conflicts, Options), DocsList2, OldDocInfos, [], [], [], LastSeq), - NewDocInfos = stem_full_doc_infos(Db, NewDocInfos0), + NewFullDocInfos = stem_full_doc_infos(Db, NewDocInfos0), % All documents are now ready to write. @@ -450,13 +486,14 @@ update_docs_int(Db, DocsList, Options) -> % Write out the document summaries (the bodies are stored in the nodes of % the trees, the attachments are already written to disk) - {ok, FlushedDocInfos} = flush_trees(Db2, NewDocInfos, []), + {ok, FlushedFullDocInfos} = flush_trees(Db2, NewFullDocInfos, []), - {ok, InfoById, InfoBySeq} = new_index_entries(FlushedDocInfos, [], []), + {IndexFullDocInfos, IndexDocInfos} = + new_index_entries(FlushedFullDocInfos, [], []), % and the indexes - {ok, DocInfoBySeqBTree2} = couch_btree:add_remove(DocInfoBySeqBTree, InfoBySeq, RemoveSeqs), - {ok, DocInfoByIdBTree2} = couch_btree:add_remove(DocInfoByIdBTree, InfoById, []), + {ok, DocInfoByIdBTree2} = couch_btree:add_remove(DocInfoByIdBTree, IndexFullDocInfos, []), + {ok, DocInfoBySeqBTree2} = couch_btree:add_remove(DocInfoBySeqBTree, IndexDocInfos, RemoveSeqs), Db3 = Db2#db{ fulldocinfo_by_id_btree = DocInfoByIdBTree2, @@ -474,7 +511,8 @@ update_docs_int(Db, DocsList, Options) -> {ok, LocalConflicts ++ Conflicts, commit_data(Db4, not lists:member(full_commit, Options))}. - + + update_local_docs(#db{local_docs_btree=Btree}=Db, Docs) -> Ids = [Id || #doc{id=Id} <- Docs], OldDocLookups = couch_btree:lookup(Btree, Ids), @@ -558,10 +596,10 @@ copy_rev_tree(SrcFd, DestFd, DestStream, [{Start, Tree} | RestTree]) -> % root nner node, only copy info/data from leaf nodes [Tree2] = copy_rev_tree(SrcFd, DestFd, DestStream, [Tree]), [{Start, Tree2} | copy_rev_tree(SrcFd, DestFd, DestStream, RestTree)]; -copy_rev_tree(SrcFd, DestFd, DestStream, [{RevId, {IsDel, Sp}, []} | RestTree]) -> +copy_rev_tree(SrcFd, DestFd, DestStream, [{RevId, {IsDel, Sp, Seq}, []} | RestTree]) -> % This is a leaf node, copy it over NewSp = copy_raw_doc(SrcFd, Sp, DestFd, DestStream), - [{RevId, {IsDel, NewSp}, []} | copy_rev_tree(SrcFd, DestFd, DestStream, RestTree)]; + [{RevId, {IsDel, NewSp, Seq}, []} | copy_rev_tree(SrcFd, DestFd, DestStream, RestTree)]; copy_rev_tree(SrcFd, DestFd, DestStream, [{RevId, _, SubTree} | RestTree]) -> % inner node, only copy info/data from leaf nodes [{RevId, ?REV_MISSING, copy_rev_tree(SrcFd, DestFd, DestStream, SubTree)} | copy_rev_tree(SrcFd, DestFd, DestStream, RestTree)]. @@ -598,7 +636,7 @@ copy_docs(#db{fd=SrcFd}=Db, #db{fd=DestFd,summary_stream=DestStream}=NewDb, Info copy_compact(Db, NewDb, Retry) -> TotalChanges = couch_db:count_changes_since(Db, NewDb#db.update_seq), EnumBySeqFun = - fun(#doc_info{update_seq=Seq}=DocInfo, _Offset, {AccNewDb, AccUncopied, TotalCopied}) -> + fun(#doc_info{high_seq=Seq}=DocInfo, _Offset, {AccNewDb, AccUncopied, TotalCopied}) -> couch_task_status:update("Copied ~p of ~p changes (~p%)", [TotalCopied, TotalChanges, (TotalCopied*100) div TotalChanges]), if TotalCopied rem 1000 == 0 -> diff --git a/src/couchdb/couch_doc.erl b/src/couchdb/couch_doc.erl index 307f7372..4d2affa4 100644 --- a/src/couchdb/couch_doc.erl +++ b/src/couchdb/couch_doc.erl @@ -226,34 +226,28 @@ to_doc_info(FullDocInfo) -> {DocInfo, _Path} = to_doc_info_path(FullDocInfo), DocInfo. -to_doc_info_path(#full_doc_info{id=Id,update_seq=Seq,rev_tree=Tree}) -> - LeafRevs = couch_key_tree:get_all_leafs(Tree), - SortedLeafRevs = - lists:sort(fun({{IsDeletedA, _}, {StartA, [RevIdA|_]}}, {{IsDeletedB, _}, {StartB, [RevIdB|_]}}) -> - % sort descending by {not deleted, then Depth, then RevisionId} - A = {not IsDeletedA, StartA, RevIdA}, - B = {not IsDeletedB, StartB, RevIdB}, - A > B - end, - LeafRevs), - - [{{IsDeleted, SummaryPointer}, {Start, [RevId|_]}=Path} | Rest] = SortedLeafRevs, - {ConflictRevTuples, DeletedConflictRevTuples} = - lists:splitwith(fun({{IsDeleted1, _Sp}, _}) -> - not IsDeleted1 - end, Rest), - - ConflictRevs = [{Start1, RevId1} || {_, {Start1, [RevId1|_]}} <- ConflictRevTuples], - DeletedConflictRevs = [{Start1, RevId1} || {_, {Start1, [RevId1|_]}} <- DeletedConflictRevTuples], - DocInfo = #doc_info{ - id=Id, - update_seq=Seq, - rev = {Start, RevId}, - summary_pointer = SummaryPointer, - conflict_revs = ConflictRevs, - deleted_conflict_revs = DeletedConflictRevs, - deleted = IsDeleted}, - {DocInfo, Path}. +max_seq([], Max) -> + Max; +max_seq([#rev_info{seq=Seq}|Rest], Max) -> + max_seq(Rest, if Max > Seq -> Max; true -> Seq end). + +to_doc_info_path(#full_doc_info{id=Id,rev_tree=Tree}) -> + RevInfosAndPath = + [{#rev_info{deleted=Del,body_sp=Bp,seq=Seq,rev={Pos,RevId}}, Path} || + {{Del, Bp, Seq},{Pos, [RevId|_]}=Path} <- + couch_key_tree:get_all_leafs(Tree)], + SortedRevInfosAndPath = lists:sort( + fun({#rev_info{deleted=DeletedA,rev=RevA}, _PathA}, + {#rev_info{deleted=DeletedB,rev=RevB}, _PathB}) -> + % sort descending by {not deleted, rev} + {not DeletedA, RevA} > {not DeletedB, RevB} + end, RevInfosAndPath), + [{_RevInfo, WinPath}|_] = SortedRevInfosAndPath, + RevInfos = [RevInfo || {RevInfo, _Path} <- SortedRevInfosAndPath], + {#doc_info{id=Id, high_seq=max_seq(RevInfos, 0), revs=RevInfos}, WinPath}. + + + bin_foldl(Bin, Fun, Acc) when is_binary(Bin) -> case Fun(Bin, Acc) of diff --git a/src/couchdb/couch_file.erl b/src/couchdb/couch_file.erl index b9da5488..430aa6b7 100644 --- a/src/couchdb/couch_file.erl +++ b/src/couchdb/couch_file.erl @@ -284,14 +284,16 @@ init({Filepath, Options, ReturnPid, Ref}) -> true -> {ok, 0} = file:position(Fd, 0), ok = file:truncate(Fd), - track_stats(), + couch_stats_collector:track_process_count( + {couchdb, open_os_files}), {ok, Fd}; false -> ok = file:close(Fd), init_status_error(ReturnPid, Ref, file_exists) end; false -> - track_stats(), + couch_stats_collector:track_process_count( + {couchdb, open_os_files}), {ok, Fd} end; Error -> @@ -303,7 +305,7 @@ init({Filepath, Options, ReturnPid, Ref}) -> {ok, Fd_Read} -> {ok, Fd} = file:open(Filepath, [read, write, raw, binary]), ok = file:close(Fd_Read), - track_stats(), + couch_stats_collector:track_process_count({couchdb, open_os_files}), {ok, Fd}; Error -> init_status_error(ReturnPid, Ref, Error) @@ -314,18 +316,6 @@ init({Filepath, Options, ReturnPid, Ref}) -> terminate(_Reason, _Fd) -> ok. -track_stats() -> - case (catch couch_stats_collector:increment({couchdb, open_os_files})) of - ok -> - Self = self(), - spawn( - fun() -> - erlang:monitor(process, Self), - receive {'DOWN', _, _, _, _} -> ok end, - couch_stats_collector:decrement({couchdb, open_os_files}) - end); - _ -> ok - end. handle_call({pread, Pos, Bytes}, _From, Fd) -> {reply, file:pread(Fd, Pos, Bytes), Fd}; diff --git a/src/couchdb/couch_httpd_db.erl b/src/couchdb/couch_httpd_db.erl index 4295ed77..b129d37e 100644 --- a/src/couchdb/couch_httpd_db.erl +++ b/src/couchdb/couch_httpd_db.erl @@ -14,7 +14,7 @@ -include("couch_db.hrl"). -export([handle_request/1, handle_compact_req/2, handle_design_req/2, - db_req/2, couch_doc_open/4]). + db_req/2, couch_doc_open/4,handle_changes_req/2]). -import(couch_httpd, [send_json/2,send_json/3,send_json/4,send_method_not_allowed/2, @@ -42,6 +42,72 @@ handle_request(#httpd{path_parts=[DbName|RestParts],method=Method, do_db_req(Req, Handler) end. +handle_changes_req(#httpd{method='GET',path_parts=[DbName|_]}=Req, Db) -> + StartSeq = list_to_integer(couch_httpd:qs_value(Req, "since", "0")), + + {ok, Resp} = start_json_response(Req, 200), + send_chunk(Resp, "{\"results\":[\n"), + case couch_httpd:qs_value(Req, "continuous", "false") of + "true" -> + Self = self(), + Notify = couch_db_update_notifier:start_link( + fun({_, DbName0}) when DbName0 == DbName -> + Self ! db_updated; + (_) -> + ok + end), + try + keep_sending_changes(Req, Resp, Db, StartSeq, <<"">>) + after + catch couch_db_update_notifier:stop(Notify), + wait_db_updated(0) % clean out any remaining update messages + end; + "false" -> + {ok, {LastSeq, _Prepend}} = + send_changes(Req, Resp, Db, StartSeq, <<"">>), + send_chunk(Resp, io_lib:format("\n],\n\"last_seq\":~w}\n", [LastSeq])), + send_chunk(Resp, "") + end; + +handle_changes_req(#httpd{path_parts=[_,<<"_changes">>]}=Req, _Db) -> + send_method_not_allowed(Req, "GET,HEAD"). + +% waits for a db_updated msg, if there are multiple msgs, collects them. +wait_db_updated(Timeout) -> + receive db_updated -> + wait_db_updated(0) + after Timeout -> ok + end. + +keep_sending_changes(#httpd{user_ctx=UserCtx,path_parts=[DbName|_]}=Req, Resp, Db, StartSeq, Prepend) -> + {ok, {EndSeq, Prepend2}} = send_changes(Req, Resp, Db, StartSeq, Prepend), + couch_db:close(Db), + wait_db_updated(infinity), + {ok, Db2} = couch_db:open(DbName, [{user_ctx, UserCtx}]), + keep_sending_changes(Req, Resp, Db2, EndSeq, Prepend2). + +send_changes(Req, Resp, Db, StartSeq, Prepend0) -> + Style = list_to_existing_atom( + couch_httpd:qs_value(Req, "style", "main_only")), + couch_db:changes_since(Db, Style, StartSeq, + fun([#doc_info{id=Id, high_seq=Seq}|_]=DocInfos, {_, Prepend}) -> + FilterFun = + fun(#doc_info{revs=[#rev_info{rev=Rev}|_]}) -> + {[{rev, couch_doc:rev_to_str(Rev)}]} + end, + Results0 = [FilterFun(DocInfo) || DocInfo <- DocInfos], + Results = [Result || Result <- Results0, Result /= null], + case Results of + [] -> + {ok, {Seq, Prepend}}; + _ -> + send_chunk(Resp, + [Prepend, ?JSON_ENCODE({[{seq,Seq}, {id, Id}, + {changes,Results}]})]), + {ok, {Seq, <<",\n">>}} + end + end, {StartSeq, Prepend0}). + handle_compact_req(#httpd{method='POST',path_parts=[DbName,_,Id|_]}=Req, _Db) -> ok = couch_view_compactor:start_compact(DbName, Id), send_json(Req, 202, {[{ok, true}]}); @@ -89,7 +155,7 @@ do_db_req(#httpd{user_ctx=UserCtx,path_parts=[DbName|_]}=Req, Fun) -> try Fun(Req, Db) after - couch_db:close(Db) + catch couch_db:close(Db) end; Error -> throw(Error) @@ -258,7 +324,7 @@ db_req(#httpd{method='POST',path_parts=[_,<<"_all_docs">>]}=Req, Db) -> db_req(#httpd{path_parts=[_,<<"_all_docs">>]}=Req, _Db) -> send_method_not_allowed(Req, "GET,HEAD,POST"); - + db_req(#httpd{method='GET',path_parts=[_,<<"_all_docs_by_seq">>]}=Req, Db) -> #view_query_args{ start_key = StartKey, @@ -285,28 +351,29 @@ db_req(#httpd{method='GET',path_parts=[_,<<"_all_docs_by_seq">>]}=Req, Db) -> fun(DocInfo, Offset, Acc) -> #doc_info{ id=Id, - rev=Rev, - update_seq=UpdateSeq, - deleted=Deleted, - conflict_revs=ConflictRevs, - deleted_conflict_revs=DelConflictRevs + high_seq=Seq, + revs=[#rev_info{rev=Rev,deleted=Deleted} | RestInfo] } = DocInfo, + ConflictRevs = couch_doc:rev_to_strs( + [Rev1 || #rev_info{deleted=false, rev=Rev1} <- RestInfo]), + DelConflictRevs = couch_doc:rev_to_strs( + [Rev1 || #rev_info{deleted=true, rev=Rev1} <- RestInfo]), Json = { [{<<"rev">>, couch_doc:rev_to_str(Rev)}] ++ case ConflictRevs of - [] -> []; - _ -> [{<<"conflicts">>, couch_doc:rev_to_strs(ConflictRevs)}] + [] -> []; + _ -> [{<<"conflicts">>, ConflictRevs}] end ++ case DelConflictRevs of - [] -> []; - _ -> [{<<"deleted_conflicts">>, couch_doc:rev_to_strs(DelConflictRevs)}] + [] -> []; + _ -> [{<<"deleted_conflicts">>, DelConflictRevs}] end ++ case Deleted of - true -> [{<<"deleted">>, true}]; - false -> [] + true -> [{<<"deleted">>, true}]; + false -> [] end }, - FoldlFun({{UpdateSeq, Id}, Json}, Offset, Acc) + FoldlFun({{Seq, Id}, Json}, Offset, Acc) end, {Limit, SkipCount, undefined, []}), couch_httpd_view:finish_view_fold(Req, TotalRowCount, {ok, FoldResult}) end); @@ -412,9 +479,9 @@ all_docs_view(Req, Db, Keys) -> }), AdapterFun = fun(#full_doc_info{id=Id}=FullDocInfo, Offset, Acc) -> case couch_doc:to_doc_info(FullDocInfo) of - #doc_info{deleted=false, rev=Rev} -> + #doc_info{revs=[#rev_info{deleted=false, rev=Rev}|_]} -> FoldlFun({{Id, Id}, {[{rev, couch_doc:rev_to_str(Rev)}]}}, Offset, Acc); - #doc_info{deleted=true} -> + #doc_info{revs=[#rev_info{deleted=true}|_]} -> {ok, Acc} end end, @@ -436,9 +503,9 @@ all_docs_view(Req, Db, Keys) -> fun(Key, {ok, FoldAcc}) -> DocInfo = (catch couch_db:get_doc_info(Db, Key)), Doc = case DocInfo of - {ok, #doc_info{id=Id, rev=Rev, deleted=false}} = DocInfo -> + {ok, #doc_info{id=Id, revs=[#rev_info{deleted=false, rev=Rev}|_]}} -> {{Id, Id}, {[{rev, couch_doc:rev_to_str(Rev)}]}}; - {ok, #doc_info{id=Id, rev=Rev, deleted=true}} = DocInfo -> + {ok, #doc_info{id=Id, revs=[#rev_info{deleted=true, rev=Rev}|_]}} -> {{Id, Id}, {[{rev, couch_doc:rev_to_str(Rev)}, {deleted, true}]}}; not_found -> {{Key, error}, not_found}; diff --git a/src/couchdb/couch_key_tree.erl b/src/couchdb/couch_key_tree.erl index baea9e3f..7c6a2dc1 100644 --- a/src/couchdb/couch_key_tree.erl +++ b/src/couchdb/couch_key_tree.erl @@ -14,7 +14,7 @@ -export([merge/2, find_missing/2, get_key_leafs/2, get_full_key_paths/2, get/2]). -export([map/2, get_all_leafs/1, count_leafs/1, remove_leafs/2, - get_all_leafs_full/1,stem/2,test/0]). + get_all_leafs_full/1,stem/2,map_leafs/2,test/0]). % a key tree looks like this: % Tree -> [] or [{Key, Value, ChildTree} | SiblingTree] @@ -291,6 +291,21 @@ map_simple(Fun, Pos, [{Key, Value, SubTree} | RestTree]) -> Value2 = Fun({Pos, Key}, Value), [{Key, Value2, map_simple(Fun, Pos + 1, SubTree)} | map_simple(Fun, Pos, RestTree)]. + +map_leafs(_Fun, []) -> + []; +map_leafs(Fun, [{Pos, Tree}|Rest]) -> + [NewTree] = map_leafs_simple(Fun, Pos, [Tree]), + [{Pos, NewTree} | map_leafs(Fun, Rest)]. + +map_leafs_simple(_Fun, _Pos, []) -> + []; +map_leafs_simple(Fun, Pos, [{Key, Value, []} | RestTree]) -> + Value2 = Fun({Pos, Key}, Value), + [{Key, Value2, []} | map_leafs_simple(Fun, Pos, RestTree)]; +map_leafs_simple(Fun, Pos, [{Key, Value, SubTree} | RestTree]) -> + [{Key, Value, map_leafs_simple(Fun, Pos + 1, SubTree)} | map_leafs_simple(Fun, Pos, RestTree)]. + stem(Trees, Limit) -> % flatten each branch in a tree into a tree path diff --git a/src/couchdb/couch_rep.erl b/src/couchdb/couch_rep.erl index f7b1e9ae..721e3071 100644 --- a/src/couchdb/couch_rep.erl +++ b/src/couchdb/couch_rep.erl @@ -605,22 +605,17 @@ enum_docs_since(Pid, DbSource, DbTarget, {StartSeq, RevsCount}) -> [] -> gen_server:call(Pid, {fin, {StartSeq, RevsCount}}, infinity); DocInfoList -> - SrcRevsList = lists:map(fun(SrcDocInfo) -> - #doc_info{id=Id, - rev=Rev, - conflict_revs=Conflicts, - deleted_conflict_revs=DelConflicts - } = SrcDocInfo, - SrcRevs = [Rev | Conflicts] ++ DelConflicts, + SrcRevsList = lists:map(fun(#doc_info{id=Id,revs=RevInfos}) -> + SrcRevs = [Rev || #rev_info{rev=Rev} <- RevInfos], {Id, SrcRevs} end, DocInfoList), {ok, MissingRevs} = get_missing_revs(DbTarget, SrcRevsList), %% do we need to check for success here? - [ gen_server:call(Pid, {replicate_doc, Info}, infinity) + [gen_server:call(Pid, {replicate_doc, Info}, infinity) || Info <- MissingRevs ], - #doc_info{update_seq=LastSeq} = lists:last(DocInfoList), + #doc_info{high_seq=LastSeq} = lists:last(DocInfoList), RevsCount2 = RevsCount + length(SrcRevsList), gen_server:cast(Pid, {increment_update_seq, LastSeq}), @@ -641,15 +636,15 @@ get_doc_info_list(#http_db{uri=DbUrl, headers=Headers}, StartSeq) -> {Results} = do_http_request(Url, get, Headers), lists:map(fun({RowInfoList}) -> {RowValueProps} = proplists:get_value(<<"value">>, RowInfoList), + Seq = proplists:get_value(<<"key">>, RowInfoList), + Revs = + [#rev_info{rev=couch_doc:parse_rev(proplists:get_value(<<"rev">>, RowValueProps)), deleted = proplists:get_value(<<"deleted">>, RowValueProps, false)} | + [#rev_info{rev=Rev,deleted=false} || Rev <- couch_doc:parse_revs(proplists:get_value(<<"conflicts">>, RowValueProps, []))] ++ + [#rev_info{rev=Rev,deleted=true} || Rev <- couch_doc:parse_revs(proplists:get_value(<<"deleted_conflicts">>, RowValueProps, []))]], #doc_info{ id=proplists:get_value(<<"id">>, RowInfoList), - rev=couch_doc:parse_rev(proplists:get_value(<<"rev">>, RowValueProps)), - update_seq = proplists:get_value(<<"key">>, RowInfoList), - conflict_revs = - couch_doc:parse_revs(proplists:get_value(<<"conflicts">>, RowValueProps, [])), - deleted_conflict_revs = - couch_doc:parse_revs(proplists:get_value(<<"deleted_conflicts">>, RowValueProps, [])), - deleted = proplists:get_value(<<"deleted">>, RowValueProps, false) + high_seq = Seq, + revs = Revs } end, proplists:get_value(<<"rows">>, Results)); get_doc_info_list(DbSource, StartSeq) -> @@ -685,6 +680,7 @@ open_doc(Db, DocId, Options) -> open_doc_revs(#http_db{uri=DbUrl, headers=Headers} = DbS, DocId, Revs0, [latest]) -> + io:format("Revs0:~p~n", [Revs0]), Revs = couch_doc:rev_to_strs(Revs0), BaseUrl = DbUrl ++ url_encode(DocId) ++ "?revs=true&latest=true", diff --git a/src/couchdb/couch_server.erl b/src/couchdb/couch_server.erl index 870d0d76..c1908629 100644 --- a/src/couchdb/couch_server.erl +++ b/src/couchdb/couch_server.erl @@ -183,8 +183,7 @@ maybe_close_lru_db(#server{dbs_open=NumOpen, max_dbs_open=MaxOpen}=Server) maybe_close_lru_db(#server{dbs_open=NumOpen}=Server) -> % must free up the lru db. case try_close_lru(now()) of - ok -> - couch_stats_collector:decrement({couchdb, open_databases}), + ok -> {ok, Server#server{dbs_open=NumOpen - 1}}; Error -> Error end. @@ -238,7 +237,6 @@ handle_call({open, DbName, Options}, _From, Server) -> true = ets:insert(couch_dbs_by_pid, {MainPid, DbName}), true = ets:insert(couch_dbs_by_lru, {LruTime, DbName}), DbsOpen = Server2#server.dbs_open + 1, - couch_stats_collector:increment({couchdb, open_databases}), {reply, {ok, MainPid}, Server2#server{dbs_open=DbsOpen}}; Error -> @@ -274,7 +272,6 @@ handle_call({create, DbName, Options}, _From, Server) -> true = ets:insert(couch_dbs_by_pid, {MainPid, DbName}), true = ets:insert(couch_dbs_by_lru, {LruTime, DbName}), DbsOpen = Server2#server.dbs_open + 1, - couch_stats_collector:increment({couchdb, open_databases}), couch_db_update_notifier:notify({created, DbName}), {reply, {ok, MainPid}, Server2#server{dbs_open=DbsOpen}}; @@ -304,7 +301,6 @@ handle_call({delete, DbName, _Options}, _From, Server) -> true = ets:delete(couch_dbs_by_name, DbName), true = ets:delete(couch_dbs_by_pid, Pid), true = ets:delete(couch_dbs_by_lru, LruTime), - couch_stats_collector:decrement({couchdb, open_databases}), Server#server{dbs_open=Server#server.dbs_open - 1} end, case file:delete(FullFilepath) of @@ -334,7 +330,6 @@ handle_info({'EXIT', Pid, _Reason}, #server{dbs_open=DbsOpen}=Server) -> true = ets:delete(couch_dbs_by_pid, Pid), true = ets:delete(couch_dbs_by_name, DbName), true = ets:delete(couch_dbs_by_lru, LruTime), - couch_stats_collector:decrement({couchdb, open_databases}), {noreply, Server#server{dbs_open=DbsOpen - 1}}; handle_info(Info, _Server) -> exit({unknown_message, Info}). diff --git a/src/couchdb/couch_stats_collector.erl b/src/couchdb/couch_stats_collector.erl index 854fffb0..ad5e9e9a 100644 --- a/src/couchdb/couch_stats_collector.erl +++ b/src/couchdb/couch_stats_collector.erl @@ -24,6 +24,7 @@ -export([start/0, stop/0, get/1, increment/1, decrement/1, + track_process_count/1, record/2, clear/1, all/0, all/1]). @@ -87,6 +88,20 @@ all(Type) -> end. +track_process_count(Stat) -> + case (catch couch_stats_collector:increment(Stat)) of + ok -> + Self = self(), + spawn( + fun() -> + erlang:monitor(process, Self), + receive {'DOWN', _, _, _, _} -> ok end, + couch_stats_collector:decrement(Stat) + end); + _ -> ok + end. + + % GEN_SERVER diff --git a/src/couchdb/couch_view_updater.erl b/src/couchdb/couch_view_updater.erl index 956ac3f1..c06e733e 100644 --- a/src/couchdb/couch_view_updater.erl +++ b/src/couchdb/couch_view_updater.erl @@ -92,15 +92,16 @@ purge_index(#group{db=Db, views=Views, id_btree=IdBtree}=Group) -> process_doc(Db, DocInfo, {Docs, #group{sig=Sig,name=GroupId,design_options=DesignOptions}=Group, ViewKVs, DocIdViewIdKeys}) -> - % This fun computes once for each document - #doc_info{id=DocId, deleted=Deleted} = DocInfo, + % This fun computes once for each document + + #doc_info{id=DocId, revs=[#rev_info{deleted=Deleted}|_]} = DocInfo, IncludeDesign = proplists:get_value(<<"include_design">>, DesignOptions, false), case {IncludeDesign, DocId} of {_, GroupId} -> % uh oh. this is the design doc with our definitions. See if % anything in the definition changed. - case couch_db:open_doc(Db, DocInfo, [conflicts, deleted_conflicts]) of + case couch_db:open_doc_int(Db, DocInfo, [conflicts, deleted_conflicts]) of {ok, Doc} -> case couch_view_group:design_doc_to_view_group(Doc) of #group{sig=Sig} -> @@ -124,7 +125,7 @@ process_doc(Db, DocInfo, {Docs, #group{sig=Sig,name=GroupId,design_options=Desig if Deleted -> {Docs, [{DocId, []} | DocIdViewIdKeys]}; true -> - {ok, Doc} = couch_db:open_doc(Db, DocInfo, + {ok, Doc} = couch_db:open_doc_int(Db, DocInfo, [conflicts, deleted_conflicts]), {[Doc | Docs], DocIdViewIdKeys} end, @@ -135,7 +136,7 @@ process_doc(Db, DocInfo, {Docs, #group{sig=Sig,name=GroupId,design_options=Desig {ViewKVs3, DocIdViewIdKeys3} = view_insert_query_results(Docs2, Results, ViewKVs, DocIdViewIdKeys2), {ok, Group2} = write_changes(Group1, ViewKVs3, DocIdViewIdKeys3, - DocInfo#doc_info.update_seq), + DocInfo#doc_info.high_seq), garbage_collect(), ViewEmptyKeyValues = [{View, []} || View <- Group2#group.views], {[], Group2, ViewEmptyKeyValues, []}; |