From 4b5e0a20aa087dd26df644c0432627aa3e5826d4 Mon Sep 17 00:00:00 2001 From: "Damien F. Katz" Date: Wed, 6 May 2009 19:11:10 +0000 Subject: First cut at _changes api. Update the by_id and by_seq indexes to contain update seq numbers and pointers to bodies on disk, for use in the _changes api. This is a new file version, but the code can continue to serve the old 0.9 version without problems, though certain features in the _changes api will not be able to work. Upgrade to new file version (from 1 to 2) by compacting the file. Also fixed bugs with how the stats api tracks open databases. git-svn-id: https://svn.apache.org/repos/asf/couchdb/trunk@772406 13f79535-47bb-0310-9956-ffa450edef68 --- src/couchdb/couch_db.erl | 93 +++++++++++++++++++++++++++++------------------- 1 file changed, 56 insertions(+), 37 deletions(-) (limited to 'src/couchdb/couch_db.erl') diff --git a/src/couchdb/couch_db.erl b/src/couchdb/couch_db.erl index 4cd5a5fd..e065ff3b 100644 --- a/src/couchdb/couch_db.erl +++ b/src/couchdb/couch_db.erl @@ -17,13 +17,14 @@ -export([open_ref_counted/2,is_idle/1,monitor/1,count_changes_since/2]). -export([update_doc/3,update_docs/4,update_docs/2,update_docs/3,delete_doc/3]). -export([get_doc_info/2,open_doc/2,open_doc/3,open_doc_revs/4]). --export([set_revs_limit/2,get_revs_limit/1]). +-export([set_revs_limit/2,get_revs_limit/1,register_update_notifier/3]). -export([get_missing_revs/2,name/1,doc_to_tree/1,get_update_seq/1,get_committed_update_seq/1]). -export([enum_docs/4,enum_docs/5,enum_docs_since/4,enum_docs_since/5]). -export([enum_docs_since_reduce_to_count/1,enum_docs_reduce_to_count/1]). -export([increment_update_seq/1,get_purge_seq/1,purge_docs/2,get_last_purged/1]). --export([start_link/3,make_doc/2,set_admins/2,get_admins/1,ensure_full_commit/1]). +-export([start_link/3,open_doc_int/3,set_admins/2,get_admins/1,ensure_full_commit/1]). -export([init/1,terminate/2,handle_call/3,handle_cast/2,code_change/3,handle_info/2]). +-export([changes_since/5]). 
-include("couch_db.hrl"). @@ -81,6 +82,9 @@ is_idle(MainPid) -> monitor(#db{main_pid=MainPid}) -> erlang:monitor(process, MainPid). +register_update_notifier(#db{main_pid=Pid}, Seq, Fun) -> + gen_server:call(Pid, {register_update_notifier, Seq, Fun}). + start_compact(#db{update_pid=Pid}) -> gen_server:cast(Pid, start_compact). @@ -166,6 +170,7 @@ get_last_purged(#db{fd=Fd, header=#db_header{purged_docs=PurgedPointer}}) -> get_db_info(Db) -> #db{fd=Fd, + header=#db_header{disk_version=DiskVersion}, compactor_pid=Compactor, update_seq=SeqNum, name=Name, @@ -181,7 +186,8 @@ get_db_info(Db) -> {purge_seq, couch_db:get_purge_seq(Db)}, {compact_running, Compactor/=nil}, {disk_size, Size}, - {instance_start_time, StartTime} + {instance_start_time, StartTime}, + {disk_format_version, DiskVersion} ], {ok, InfoList}. @@ -337,7 +343,7 @@ prep_and_validate_updates(Db, [DocBucket|RestBuckets], AccPrepped, AccErrors) -> Leafs = couch_key_tree:get_all_leafs(OldRevTree), LeafRevsDict = dict:from_list([{{Start, RevId}, {Deleted, Sp, Revs}} || - {{Deleted, Sp}, {Start, [RevId|_]}=Revs} <- Leafs]), + {{Deleted, Sp, _Seq}, {Start, [RevId|_]}=Revs} <- Leafs]), {PreppedBucket, AccErrors3} = lists:foldl( fun(Doc, {Docs2Acc, AccErrors2}) -> case prep_and_validate_update(Db, Doc, OldFullDocInfo, @@ -398,7 +404,7 @@ prep_and_validate_replicated_updates(Db, [Bucket|RestBuckets], [OldInfo|RestOldI case PrevRevFull of {_RevId, ?REV_MISSING} -> conflict; - {RevId, {IsDel, DiskSp}} -> + {RevId, {IsDel, DiskSp, _Seq}} -> DiskDoc = make_doc(Db, Id, IsDel, DiskSp, PrevPath), Doc2 = couch_doc:merge_stubs(Doc, DiskDoc), {ok, Doc2, fun() -> DiskDoc end} @@ -522,7 +528,7 @@ make_first_doc_on_disk(_Db, _Id, _Pos, []) -> nil; make_first_doc_on_disk(Db, Id, Pos, [{_Rev, ?REV_MISSING}|RestPath]) -> make_first_doc_on_disk(Db, Id, Pos - 1, RestPath); -make_first_doc_on_disk(Db, Id, Pos, [{_Rev, {IsDel, Sp}} |_]=DocPath) -> +make_first_doc_on_disk(Db, Id, Pos, [{_Rev, {IsDel, Sp, _Seq}} |_]=DocPath) -> Revs = 
[Rev || {Rev, _} <- DocPath], make_doc(Db, Id, IsDel, Sp, {Pos, Revs}). @@ -653,7 +659,22 @@ enum_docs_reduce_to_count(Reds) -> {Count, _DelCount} = couch_btree:final_reduce( fun couch_db_updater:btree_by_id_reduce/2, Reds), Count. - + +changes_since(Db, Style, StartSeq, Fun, Acc) -> + enum_docs_since(Db, StartSeq, fwd, + fun(DocInfo, _Offset, Acc2) -> + #doc_info{revs=Revs} = DocInfo, + case Style of + main_only -> + Infos = [DocInfo]; + all_docs -> + % make each rev its own doc info + Infos = [DocInfo#doc_info{revs=[RevInfo]} || + #rev_info{seq=RevSeq}=RevInfo <- Revs, StartSeq < RevSeq] + end, + Fun(Infos, Acc2) + end, Acc). + count_changes_since(Db, SinceSeq) -> {ok, Changes} = couch_btree:fold_reduce(Db#db.docinfo_by_seq_btree, @@ -665,9 +686,9 @@ count_changes_since(Db, SinceSeq) -> end, 0), Changes. - -enum_docs_since(Db, SinceSeq, Direction, InFun, Ctx) -> - couch_btree:fold(Db#db.docinfo_by_seq_btree, SinceSeq + 1, Direction, InFun, Ctx). + +enum_docs_since(Db, SinceSeq, Direction, InFun, Acc) -> + couch_btree:fold(Db#db.docinfo_by_seq_btree, SinceSeq + 1, Direction, InFun, Acc). enum_docs_since(Db, SinceSeq, InFun, Acc) -> enum_docs_since(Db, SinceSeq, fwd, InFun, Acc). @@ -684,6 +705,7 @@ init({DbName, Filepath, Fd, Options}) -> {ok, UpdaterPid} = gen_server:start_link(couch_db_updater, {self(), DbName, Filepath, Fd, Options}, []), {ok, #db{fd_ref_counter=RefCntr}=Db} = gen_server:call(UpdaterPid, get_db), couch_ref_counter:add(RefCntr), + couch_stats_collector:track_process_count({couchdb, open_databases}), {ok, Db}. terminate(Reason, _Db) -> @@ -698,8 +720,8 @@ handle_call(is_idle, _From, #db{fd_ref_counter=RefCntr, compactor_pid=Compact, % Idle means no referrers. Unless in the middle of a compaction file switch, % there are always at least 2 referrers, couch_db_updater and us. 
{reply, (Delay == nil) and (Compact == nil) and (couch_ref_counter:count(RefCntr) == 2), Db}; -handle_call({db_updated, #db{fd_ref_counter=NewRefCntr}=NewDb}, _From, - #db{fd_ref_counter=OldRefCntr}) -> +handle_call({db_updated, NewDb}, _From, #db{fd_ref_counter=OldRefCntr}) -> + #db{fd_ref_counter=NewRefCntr}=NewDb, case NewRefCntr == OldRefCntr of true -> ok; false -> @@ -747,7 +769,7 @@ open_doc_revs_int(Db, IdRevs, Options) -> ?REV_MISSING -> % we have the rev in our list but know nothing about it {{not_found, missing}, {Pos, Rev}}; - {IsDeleted, SummaryPtr} -> + {IsDeleted, SummaryPtr, _UpdateSeq} -> {ok, make_doc(Db, Id, IsDeleted, SummaryPtr, FoundRevPath)} end end, FoundRevs), @@ -768,14 +790,15 @@ open_doc_int(Db, <> = Id, _Options) -> [not_found] -> {not_found, missing} end; -open_doc_int(Db, #doc_info{id=Id,rev={Pos,RevId},deleted=IsDeleted,summary_pointer=Sp}=DocInfo, Options) -> - Doc = make_doc(Db, Id, IsDeleted, Sp, {Pos,[RevId]}), +open_doc_int(Db, #doc_info{id=Id,revs=[RevInfo|_]}=DocInfo, Options) -> + #rev_info{deleted=IsDeleted,rev={Pos,RevId},body_sp=Bp} = RevInfo, + Doc = make_doc(Db, Id, IsDeleted, Bp, {Pos,[RevId]}), {ok, Doc#doc{meta=doc_meta_info(DocInfo, [], Options)}}; open_doc_int(Db, #full_doc_info{id=Id,rev_tree=RevTree}=FullDocInfo, Options) -> - #doc_info{deleted=IsDeleted,rev=Rev,summary_pointer=Sp} = DocInfo = - couch_doc:to_doc_info(FullDocInfo), + #doc_info{revs=[#rev_info{deleted=IsDeleted,rev=Rev,body_sp=Bp}|_]} = + DocInfo = couch_doc:to_doc_info(FullDocInfo), {[{_, RevPath}], []} = couch_key_tree:get(RevTree, [Rev]), - Doc = make_doc(Db, Id, IsDeleted, Sp, RevPath), + Doc = make_doc(Db, Id, IsDeleted, Bp, RevPath), {ok, Doc#doc{meta=doc_meta_info(DocInfo, RevTree, Options)}}; open_doc_int(Db, Id, Options) -> case get_full_doc_info(Db, Id) of @@ -785,36 +808,36 @@ open_doc_int(Db, Id, Options) -> {not_found, missing} end. 
-doc_meta_info(DocInfo, RevTree, Options) -> +doc_meta_info(#doc_info{revs=[#rev_info{rev=Rev}|RestInfo]}, RevTree, Options) -> case lists:member(revs_info, Options) of false -> []; true -> {[{Pos, RevPath}],[]} = - couch_key_tree:get_full_key_paths(RevTree, [DocInfo#doc_info.rev]), + couch_key_tree:get_full_key_paths(RevTree, [Rev]), [{revs_info, Pos, lists:map( - fun({Rev, {true, _Sp}}) -> - {Rev, deleted}; - ({Rev, {false, _Sp}}) -> - {Rev, available}; - ({Rev, ?REV_MISSING}) -> - {Rev, missing} + fun({Rev1, {true, _Sp, _UpdateSeq}}) -> + {Rev1, deleted}; + ({Rev1, {false, _Sp, _UpdateSeq}}) -> + {Rev1, available}; + ({Rev1, ?REV_MISSING}) -> + {Rev1, missing} end, RevPath)}] end ++ case lists:member(conflicts, Options) of false -> []; true -> - case DocInfo#doc_info.conflict_revs of + case [Rev1 || #rev_info{rev=Rev1,deleted=false} <- RestInfo] of [] -> []; - _ -> [{conflicts, DocInfo#doc_info.conflict_revs}] + ConflictRevs -> [{conflicts, ConflictRevs}] end end ++ case lists:member(deleted_conflicts, Options) of false -> []; true -> - case DocInfo#doc_info.deleted_conflict_revs of + case [Rev1 || #rev_info{rev=Rev1,deleted=true} <- RestInfo] of [] -> []; - _ -> [{deleted_conflicts, DocInfo#doc_info.deleted_conflict_revs}] + DelConflictRevs -> [{deleted_conflicts, DelConflictRevs}] end end. @@ -829,19 +852,15 @@ doc_to_tree_simple(Doc, [RevId]) -> doc_to_tree_simple(Doc, [RevId | Rest]) -> [{RevId, ?REV_MISSING, doc_to_tree_simple(Doc, Rest)}]. -make_doc(Db, FullDocInfo) -> - {#doc_info{id=Id,deleted=Deleted,summary_pointer=Sp}, RevPath} - = couch_doc:to_doc_info_path(FullDocInfo), - make_doc(Db, Id, Deleted, Sp, RevPath). 
-make_doc(#db{fd=Fd}=Db, Id, Deleted, BodySp, RevisionPath) -> +make_doc(#db{fd=Fd}=Db, Id, Deleted, Bp, RevisionPath) -> {BodyData, BinValues} = - case BodySp of + case Bp of nil -> {[], []}; _ -> {ok, {BodyData0, BinValues0}} = - couch_stream:read_term( Db#db.summary_stream, BodySp), + couch_stream:read_term( Db#db.summary_stream, Bp), {BodyData0, [{Name,{Type,{Fd,Sp,Len}}} || {Name,{Type,Sp,Len}} <- BinValues0]} end, -- cgit v1.2.3