diff options
author | Adam Kocoloski <adam@cloudant.com> | 2010-08-18 11:51:03 -0400 |
---|---|---|
committer | Adam Kocoloski <adam@cloudant.com> | 2010-08-18 14:24:57 -0400 |
commit | 7393d62b7b630bee50f609d0ae8125d33f7cda2b (patch) | |
tree | 754e9ab17a586319c562de488e60056feff60bb8 /apps/couch/src/couch_db.erl | |
parent | c0cb2625f25a2b51485c164bea1d8822f449ce14 (diff) |
Grab bag of Cloudant patches to couch OTP application
- Removal of couch_db and couch_ref_counter processes. Active DBs are
accessible through a protected ets table owned by couch_server.
- #full_doc_info{} in by_id and by_seq trees for faster compaction at the
expense of more disk usage afterwards. Proposed as COUCHDB-738 but not
accepted upstream.
- Replication via distributed Erlang.
- Better hot upgrade support (uses exported functions much more often).
- Configurable btree chunk sizes allow for larger (but still bounded)
reductions.
- Shorter names for btree fields in #db{} and #db_header{}.
- couch_view_group does not keep a reference to the #db{}.
- Terms are stored compressed (again).
Diffstat (limited to 'apps/couch/src/couch_db.erl')
-rw-r--r-- | apps/couch/src/couch_db.erl | 198 |
1 files changed, 94 insertions, 104 deletions
diff --git a/apps/couch/src/couch_db.erl b/apps/couch/src/couch_db.erl index 7678f6ca..a3112e24 100644 --- a/apps/couch/src/couch_db.erl +++ b/apps/couch/src/couch_db.erl @@ -11,7 +11,6 @@ % the License. -module(couch_db). --behaviour(gen_server). -export([open/2,open_int/2,close/1,create/2,start_compact/1,get_db_info/1,get_design_docs/1]). -export([open_ref_counted/2,is_idle/1,monitor/1,count_changes_since/2]). @@ -22,21 +21,20 @@ -export([enum_docs/4,enum_docs_since/5]). -export([enum_docs_since_reduce_to_count/1,enum_docs_reduce_to_count/1]). -export([increment_update_seq/1,get_purge_seq/1,purge_docs/2,get_last_purged/1]). --export([start_link/3,open_doc_int/3,ensure_full_commit/1]). +-export([start_link/3,open_doc_int/3,ensure_full_commit/1,ensure_full_commit/2]). -export([set_security/2,get_security/1]). --export([init/1,terminate/2,handle_call/3,handle_cast/2,code_change/3,handle_info/2]). -export([changes_since/5,changes_since/6,read_doc/2,new_revid/1]). --export([check_is_admin/1, check_is_reader/1]). +-export([check_is_admin/1, check_is_reader/1, get_doc_count/1, load_validation_funs/1]). -include("couch_db.hrl"). - start_link(DbName, Filepath, Options) -> case open_db_file(Filepath, Options) of {ok, Fd} -> - StartResult = gen_server:start_link(couch_db, {DbName, Filepath, Fd, Options}, []), + {ok, UpdaterPid} = gen_server:start_link(couch_db_updater, {DbName, + Filepath, Fd, Options}, []), unlink(Fd), - StartResult; + gen_server:call(UpdaterPid, get_db); Else -> Else end. @@ -52,7 +50,7 @@ open_db_file(Filepath, Options) -> {ok, Fd} -> ?LOG_INFO("Found ~s~s compaction file, using as primary storage.", [Filepath, ".compact"]), ok = file:rename(Filepath ++ ".compact", Filepath), - ok = couch_file:sync(Fd), + ok = couch_file:sync(Filepath), {ok, Fd}; {error, enoent} -> {not_found, no_db_file} @@ -86,24 +84,33 @@ open(DbName, Options) -> Else -> Else end. -ensure_full_commit(#db{update_pid=UpdatePid,instance_start_time=StartTime}) -> - ok = gen_server:call(UpdatePid, full_commit, infinity), +ensure_full_commit(#db{main_pid=Pid, instance_start_time=StartTime}) -> + ok = gen_server:call(Pid, full_commit, infinity), + {ok, StartTime}. + +ensure_full_commit(Db, RequiredSeq) -> + #db{main_pid=Pid, instance_start_time=StartTime} = Db, + ok = gen_server:call(Pid, {full_commit, RequiredSeq}, infinity), {ok, StartTime}. -close(#db{fd_ref_counter=RefCntr}) -> - couch_ref_counter:drop(RefCntr). +close(#db{fd_monitor=RefCntr}) -> + erlang:demonitor(RefCntr). open_ref_counted(MainPid, OpenedPid) -> gen_server:call(MainPid, {open_ref_count, OpenedPid}). -is_idle(MainPid) -> - gen_server:call(MainPid, is_idle). +is_idle(#db{compactor_pid=nil, waiting_delayed_commit=nil} = Db) -> + {monitored_by, Pids} = erlang:process_info(Db#db.fd, monitored_by), + (Pids -- [Db#db.main_pid, whereis(couch_stats_collector)]) =:= []; +is_idle(_Db) -> + false. monitor(#db{main_pid=MainPid}) -> erlang:monitor(process, MainPid). -start_compact(#db{update_pid=Pid}) -> - gen_server:call(Pid, start_compact). +start_compact(#db{main_pid=Pid}) -> + {ok, _} = gen_server:call(Pid, start_compact), + ok. delete_doc(Db, Id, Revisions) -> DeletedDocs = [#doc{id=Id, revs=[Rev], deleted=true} || Rev <- Revisions], @@ -210,13 +217,13 @@ get_full_doc_info(Db, Id) -> Result. get_full_doc_infos(Db, Ids) -> - couch_btree:lookup(Db#db.fulldocinfo_by_id_btree, Ids). + couch_btree:lookup(Db#db.id_tree, Ids). -increment_update_seq(#db{update_pid=UpdatePid}) -> - gen_server:call(UpdatePid, increment_update_seq). +increment_update_seq(#db{main_pid=Pid}) -> + gen_server:call(Pid, increment_update_seq). -purge_docs(#db{update_pid=UpdatePid}, IdsRevs) -> - gen_server:call(UpdatePid, {purge_docs, IdsRevs}). +purge_docs(#db{main_pid=Pid}, IdsRevs) -> + gen_server:call(Pid, {purge_docs, IdsRevs}). get_committed_update_seq(#db{committed_update_seq=Seq}) -> Seq. @@ -232,13 +239,17 @@ get_last_purged(#db{header=#db_header{purged_docs=nil}}) -> get_last_purged(#db{fd=Fd, header=#db_header{purged_docs=PurgedPointer}}) -> couch_file:pread_term(Fd, PurgedPointer). +get_doc_count(Db) -> + {ok, {Count, _DelCount}} = couch_btree:full_reduce(Db#db.id_tree), + {ok, Count}. + get_db_info(Db) -> #db{fd=Fd, header=#db_header{disk_version=DiskVersion}, compactor_pid=Compactor, update_seq=SeqNum, name=Name, - fulldocinfo_by_id_btree=FullDocBtree, + id_tree=FullDocBtree, instance_start_time=StartTime, committed_update_seq=CommittedUpdateSeq} = Db, {ok, Size} = couch_file:bytes(Fd), @@ -257,7 +268,12 @@ get_db_info(Db) -> ], {ok, InfoList}. -get_design_docs(#db{fulldocinfo_by_id_btree=Btree}=Db) -> +get_design_docs(#db{name = <<"shards/", _:18/binary, DbName/binary>>}) -> + {_, Ref} = spawn_monitor(fun() -> exit(fabric:design_docs(DbName)) end), + receive {'DOWN', Ref, _, _, Response} -> + Response + end; +get_design_docs(#db{id_tree=Btree}=Db) -> {ok,_, Docs} = couch_btree:fold(Btree, fun(#full_doc_info{id= <<"_design/",_/binary>>}=FullDocInfo, _Reds, AccDocs) -> {ok, Doc} = couch_db:open_doc_int(Db, FullDocInfo, []), @@ -319,7 +335,7 @@ get_readers(#db{security=SecProps}) -> get_security(#db{security=SecProps}) -> {SecProps}. -set_security(#db{update_pid=Pid}=Db, {NewSecProps}) when is_list(NewSecProps) -> +set_security(#db{main_pid=Pid}=Db, {NewSecProps}) when is_list(NewSecProps) -> check_is_admin(Db), ok = validate_security_object(NewSecProps), ok = gen_server:call(Pid, {set_security, NewSecProps}, infinity), @@ -354,7 +370,7 @@ validate_names_and_roles({Props}) when is_list(Props) -> get_revs_limit(#db{revs_limit=Limit}) -> Limit. -set_revs_limit(#db{update_pid=Pid}=Db, Limit) when Limit > 0 -> +set_revs_limit(#db{main_pid=Pid}=Db, Limit) when Limit > 0 -> check_is_admin(Db), gen_server:call(Pid, {set_revs_limit, Limit}, infinity); set_revs_limit(_Db, _Limit) -> @@ -406,6 +422,9 @@ group_alike_docs([Doc|Rest], [Bucket|RestBuckets]) -> validate_doc_update(#db{}=Db, #doc{id= <<"_design/",_/binary>>}, _GetDiskDocFun) -> catch check_is_admin(Db); +validate_doc_update(#db{validate_doc_funs = undefined} = Db, Doc, Fun) -> + ValidationFuns = load_validation_funs(Db), + validate_doc_update(Db#db{validate_doc_funs = ValidationFuns}, Doc, Fun); validate_doc_update(#db{validate_doc_funs=[]}, _Doc, _GetDiskDocFun) -> ok; validate_doc_update(_Db, #doc{id= <<"_local/",_/binary>>}, _GetDiskDocFun) -> @@ -424,6 +443,27 @@ validate_doc_update(Db, Doc, GetDiskDocFun) -> Error end. +% to be safe, spawn a middleman here +load_validation_funs(#db{main_pid = Pid} = Db) -> + {_, Ref} = spawn_monitor(fun() -> + {ok, DesignDocs} = get_design_docs(Db), + exit({ok, lists:flatmap(fun(DesignDoc) -> + case couch_doc:get_validate_doc_fun(DesignDoc) of + nil -> + []; + Fun -> + [Fun] + end + end, DesignDocs)}) + end), + receive + {'DOWN', Ref, _, _, {ok, Funs}} -> + gen_server:cast(Pid, {load_validation_funs, Funs}), + Funs; + {'DOWN', Ref, _, _, Reason} -> + ?LOG_ERROR("could not load validation funs ~p", [Reason]), + throw(internal_server_error) + end. prep_and_validate_update(Db, #doc{id=Id,revs={RevStart, Revs}}=Doc, OldFullDocInfo, LeafRevsDict, AllowConflict) -> @@ -512,8 +552,8 @@ prep_and_validate_updates(Db, [DocBucket|RestBuckets], [PreppedBucket | AccPrepped], AccErrors3). -update_docs(#db{update_pid=UpdatePid}=Db, Docs, Options) -> - update_docs(#db{update_pid=UpdatePid}=Db, Docs, Options, interactive_edit). +update_docs(Db, Docs, Options) -> + update_docs(Db, Docs, Options, interactive_edit). prep_and_validate_replicated_updates(_Db, [], [], AccPrepped, AccErrors) -> @@ -743,37 +783,37 @@ set_commit_option(Options) -> [full_commit|Options] end. -collect_results(UpdatePid, MRef, ResultsAcc) -> +collect_results(Pid, MRef, ResultsAcc) -> receive - {result, UpdatePid, Result} -> - collect_results(UpdatePid, MRef, [Result | ResultsAcc]); - {done, UpdatePid} -> + {result, Pid, Result} -> + collect_results(Pid, MRef, [Result | ResultsAcc]); + {done, Pid} -> {ok, ResultsAcc}; - {retry, UpdatePid} -> + {retry, Pid} -> retry; {'DOWN', MRef, _, _, Reason} -> exit(Reason) end. -write_and_commit(#db{update_pid=UpdatePid, user_ctx=Ctx}=Db, DocBuckets, +write_and_commit(#db{main_pid=Pid, user_ctx=Ctx}=Db, DocBuckets, NonRepDocs, Options0) -> Options = set_commit_option(Options0), MergeConflicts = lists:member(merge_conflicts, Options), FullCommit = lists:member(full_commit, Options), - MRef = erlang:monitor(process, UpdatePid), + MRef = erlang:monitor(process, Pid), try - UpdatePid ! {update_docs, self(), DocBuckets, NonRepDocs, MergeConflicts, FullCommit}, - case collect_results(UpdatePid, MRef, []) of + Pid ! {update_docs, self(), DocBuckets, NonRepDocs, MergeConflicts, FullCommit}, + case collect_results(Pid, MRef, []) of {ok, Results} -> {ok, Results}; retry -> % This can happen if the db file we wrote to was swapped out by % compaction. Retry by reopening the db and writing to the current file - {ok, Db2} = open_ref_counted(Db#db.main_pid, Ctx), + {ok, Db2} = open(Db#db.name, [{user_ctx, Ctx}]), DocBuckets2 = [[doc_flush_atts(Doc, Db2#db.fd) || Doc <- Bucket] || Bucket <- DocBuckets], % We only retry once close(Db2), - UpdatePid ! {update_docs, self(), DocBuckets2, NonRepDocs, MergeConflicts, FullCommit}, - case collect_results(UpdatePid, MRef, []) of + Pid ! {update_docs, self(), DocBuckets2, NonRepDocs, MergeConflicts, FullCommit}, + case collect_results(Pid, MRef, []) of {ok, Results} -> {ok, Results}; retry -> throw({update_error, compaction_retry}) end @@ -921,9 +961,15 @@ enum_docs_reduce_to_count(Reds) -> changes_since(Db, Style, StartSeq, Fun, Acc) -> changes_since(Db, Style, StartSeq, Fun, [], Acc). - + changes_since(Db, Style, StartSeq, Fun, Options, Acc) -> - Wrapper = fun(DocInfo, _Offset, Acc2) -> + Wrapper = fun(FullDocInfo, _Offset, Acc2) -> + case FullDocInfo of + #full_doc_info{} -> + DocInfo = couch_doc:to_doc_info(FullDocInfo); + #doc_info{} -> + DocInfo = FullDocInfo + end, #doc_info{revs=Revs} = DocInfo, DocInfo2 = case Style of @@ -936,83 +982,27 @@ changes_since(Db, Style, StartSeq, Fun, Options, Acc) -> end, Fun(DocInfo2, Acc2) end, - {ok, _LastReduction, AccOut} = couch_btree:fold(Db#db.docinfo_by_seq_btree, - Wrapper, Acc, [{start_key, StartSeq + 1}] ++ Options), + {ok, _LastReduction, AccOut} = couch_btree:fold(Db#db.seq_tree, Wrapper, + Acc, [{start_key, couch_util:to_integer(StartSeq) + 1} | Options]), {ok, AccOut}. count_changes_since(Db, SinceSeq) -> {ok, Changes} = - couch_btree:fold_reduce(Db#db.docinfo_by_seq_btree, + couch_btree:fold_reduce(Db#db.seq_tree, fun(_SeqStart, PartialReds, 0) -> - {ok, couch_btree:final_reduce(Db#db.docinfo_by_seq_btree, PartialReds)} + {ok, couch_btree:final_reduce(Db#db.seq_tree, PartialReds)} end, 0, [{start_key, SinceSeq + 1}]), Changes. enum_docs_since(Db, SinceSeq, InFun, Acc, Options) -> - {ok, LastReduction, AccOut} = couch_btree:fold(Db#db.docinfo_by_seq_btree, InFun, Acc, [{start_key, SinceSeq + 1} | Options]), + {ok, LastReduction, AccOut} = couch_btree:fold(Db#db.seq_tree, InFun, Acc, [{start_key, SinceSeq + 1} | Options]), {ok, enum_docs_since_reduce_to_count(LastReduction), AccOut}. enum_docs(Db, InFun, InAcc, Options) -> - {ok, LastReduce, OutAcc} = couch_btree:fold(Db#db.fulldocinfo_by_id_btree, InFun, InAcc, Options), + {ok, LastReduce, OutAcc} = couch_btree:fold(Db#db.id_tree, InFun, InAcc, Options), {ok, enum_docs_reduce_to_count(LastReduce), OutAcc}. -% server functions - -init({DbName, Filepath, Fd, Options}) -> - {ok, UpdaterPid} = gen_server:start_link(couch_db_updater, {self(), DbName, Filepath, Fd, Options}, []), - {ok, #db{fd_ref_counter=RefCntr}=Db} = gen_server:call(UpdaterPid, get_db), - couch_ref_counter:add(RefCntr), - case lists:member(sys_db, Options) of - true -> - ok; - false -> - couch_stats_collector:track_process_count({couchdb, open_databases}) - end, - process_flag(trap_exit, true), - {ok, Db}. - -terminate(_Reason, Db) -> - couch_util:shutdown_sync(Db#db.update_pid), - ok. - -handle_call({open_ref_count, OpenerPid}, _, #db{fd_ref_counter=RefCntr}=Db) -> - ok = couch_ref_counter:add(RefCntr, OpenerPid), - {reply, {ok, Db}, Db}; -handle_call(is_idle, _From, #db{fd_ref_counter=RefCntr, compactor_pid=Compact, - waiting_delayed_commit=Delay}=Db) -> - % Idle means no referrers. Unless in the middle of a compaction file switch, - % there are always at least 2 referrers, couch_db_updater and us. - {reply, (Delay == nil) andalso (Compact == nil) andalso (couch_ref_counter:count(RefCntr) == 2), Db}; -handle_call({db_updated, NewDb}, _From, #db{fd_ref_counter=OldRefCntr}) -> - #db{fd_ref_counter=NewRefCntr}=NewDb, - case NewRefCntr =:= OldRefCntr of - true -> ok; - false -> - couch_ref_counter:add(NewRefCntr), - couch_ref_counter:drop(OldRefCntr) - end, - {reply, ok, NewDb}; -handle_call(get_db, _From, Db) -> - {reply, {ok, Db}, Db}. - - -handle_cast(Msg, Db) -> - ?LOG_ERROR("Bad cast message received for db ~s: ~p", [Db#db.name, Msg]), - exit({error, Msg}). - -code_change(_OldVsn, State, _Extra) -> - {ok, State}. - -handle_info({'EXIT', _Pid, normal}, Db) -> - {noreply, Db}; -handle_info({'EXIT', _Pid, Reason}, Server) -> - {stop, Reason, Server}; -handle_info(Msg, Db) -> - ?LOG_ERROR("Bad message received for db ~s: ~p", [Db#db.name, Msg]), - exit({error, Msg}). - - %%% Internal function %%% open_doc_revs_int(Db, IdRevs, Options) -> Ids = [Id || {Id, _Revs} <- IdRevs], @@ -1054,7 +1044,7 @@ open_doc_revs_int(Db, IdRevs, Options) -> IdRevs, LookupResults). open_doc_int(Db, <<?LOCAL_DOC_PREFIX, _/binary>> = Id, _Options) -> - case couch_btree:lookup(Db#db.local_docs_btree, [Id]) of + case couch_btree:lookup(Db#db.local_tree, [Id]) of [{ok, {_, {Rev, BodyData}}}] -> {ok, #doc{id=Id, revs={0, [list_to_binary(integer_to_list(Rev))]}, body=BodyData}}; [not_found] -> |