From 7393d62b7b630bee50f609d0ae8125d33f7cda2b Mon Sep 17 00:00:00 2001 From: Adam Kocoloski Date: Wed, 18 Aug 2010 11:51:03 -0400 Subject: Grab bag of Cloudant patches to couch OTP application - Removal of couch_db and couch_ref_counter processes. Active DBs are accessible through a protected ets table owned by couch_server. - #full_doc_info{} in by_id and by_seq trees for faster compaction at the expense of more disk usage afterwards. Proposed as COUCHDB-738 but not accepted upstream. - Replication via distributed Erlang. - Better hot upgrade support (uses exported functions much more often). - Configurable btree chunk sizes allow for larger (but still bounded) reductions. - Shorter names for btree fields in #db{} and #db_header{}. - couch_view_group does not keep a reference to the #db{}. - Terms are stored compressed (again). --- apps/couch/src/couch_server.erl | 378 +++++++++++++++++++--------------------- 1 file changed, 180 insertions(+), 198 deletions(-) (limited to 'apps/couch/src/couch_server.erl') diff --git a/apps/couch/src/couch_server.erl b/apps/couch/src/couch_server.erl index 43fd9044..b54771be 100644 --- a/apps/couch/src/couch_server.erl +++ b/apps/couch/src/couch_server.erl @@ -13,10 +13,11 @@ -module(couch_server). -behaviour(gen_server). --export([open/2,create/2,delete/2,all_databases/0,get_version/0]). --export([init/1, handle_call/3,sup_start_link/0]). +-export([open/2,create/2,delete/2,all_databases/0,all_databases/1]). +-export([init/1, handle_call/3,sup_start_link/0,get_version/0]). -export([handle_cast/2,code_change/3,handle_info/2,terminate/2]). --export([dev_start/0,is_admin/2,has_admins/0,get_stats/0]). +-export([dev_start/0,is_admin/2,has_admins/0,get_stats/0,config_change/4]). +-export([close_lru/0]). -include("couch_db.hrl"). @@ -50,15 +51,26 @@ get_stats() -> sup_start_link() -> gen_server:start_link({local, couch_server}, couch_server, [], []). + open(DbName, Options) -> - case gen_server:call(couch_server, {open, DbName, Options}, infinity) of - {ok, Db} -> - Ctx = couch_util:get_value(user_ctx, Options, #user_ctx{}), - {ok, Db#db{user_ctx=Ctx}}; - Error -> - Error + Ctx = couch_util:get_value(user_ctx, Options, #user_ctx{}), + case ets:lookup(couch_dbs, DbName) of + [#db{fd=Fd, fd_monitor=Lock} = Db] when Lock =/= locked -> + ets:insert(couch_lru, {DbName, now()}), + {ok, Db#db{user_ctx=Ctx, fd_monitor=erlang:monitor(process,Fd)}}; + _ -> + case gen_server:call(couch_server, {open, DbName, Options}, infinity) of + {ok, #db{fd=Fd} = Db} -> + ets:insert(couch_lru, {DbName, now()}), + {ok, Db#db{user_ctx=Ctx, fd_monitor=erlang:monitor(process,Fd)}}; + Error -> + Error + end end. +close_lru() -> + gen_server:call(couch_server, close_lru). + create(DbName, Options) -> case gen_server:call(couch_server, {create, DbName, Options}, infinity) of {ok, Db} -> @@ -121,28 +133,12 @@ init([]) -> RootDir = couch_config:get("couchdb", "database_dir", "."), MaxDbsOpen = list_to_integer( couch_config:get("couchdb", "max_dbs_open")), - Self = self(), - ok = couch_config:register( - fun("couchdb", "database_dir") -> - exit(Self, config_change) - end), - ok = couch_config:register( - fun("couchdb", "max_dbs_open", Max) -> - gen_server:call(couch_server, - {set_max_dbs_open, list_to_integer(Max)}) - end), + ok = couch_config:register(fun ?MODULE:config_change/4), ok = couch_file:init_delete_dir(RootDir), hash_admin_passwords(), - ok = couch_config:register( - fun("admins", _Key, _Value, Persist) -> - % spawn here so couch_config doesn't try to call itself - spawn(fun() -> hash_admin_passwords(Persist) end) - end, false), - {ok, RegExp} = re:compile("^[a-z][a-z0-9\\_\\$()\\+\\-\\/]*$"), - ets:new(couch_dbs_by_name, [set, private, named_table]), - ets:new(couch_dbs_by_pid, [set, private, named_table]), - ets:new(couch_dbs_by_lru, [ordered_set, private, named_table]), - ets:new(couch_sys_dbs, [set, private, named_table]), + {ok, RegExp} = re:compile("^[a-z][a-z0-9\\_\\$()\\+\\-\\/\\.]*$"), + ets:new(couch_dbs, [set, protected, named_table, {keypos, #db.name}]), + ets:new(couch_lru, [set, public, named_table]), process_flag(trap_exit, true), {ok, #server{root_dir=RootDir, dbname_regexp=RegExp, @@ -150,15 +146,27 @@ init([]) -> start_time=httpd_util:rfc1123_date()}}. terminate(_Reason, _Srv) -> - [couch_util:shutdown_sync(Pid) || {_, {Pid, _LruTime}} <- - ets:tab2list(couch_dbs_by_name)], + ets:foldl(fun(#db{main_pid=Pid}, _) -> couch_util:shutdown_sync(Pid) end, + nil, couch_dbs), ok. +config_change("couchdb", "database_dir", _, _) -> + exit(whereis(couch_server), config_change); +config_change("couchdb", "max_dbs_open", Max, _) -> + gen_server:call(couch_server, {set_max_dbs_open, list_to_integer(Max)}); +config_change("admins", _, _, Persist) -> + % spawn here so couch_config doesn't try to call itself + spawn(fun() -> hash_admin_passwords(Persist) end). + all_databases() -> + all_databases(""). + +all_databases(Prefix) -> {ok, #server{root_dir=Root}} = gen_server:call(couch_server, get_server), NormRoot = couch_util:normpath(Root), Filenames = - filelib:fold_files(Root, "^[a-z0-9\\_\\$()\\+\\-]*[\\.]couch$", true, + filelib:fold_files(Root++Prefix, "^[a-z0-9\\_\\$()\\+\\-]*[\\.]couch$", + true, fun(Filename, AccIn) -> NormFilename = couch_util:normpath(Filename), case NormFilename -- NormRoot of @@ -181,172 +189,145 @@ maybe_close_lru_db(#server{dbs_open=NumOpen}=Server) -> Error -> Error end. +find_oldest_db({DbName, Lru}, Acc) -> + erlang:min({Lru, DbName}, Acc). + try_close_lru(StartTime) -> - LruTime = get_lru(), - if LruTime > StartTime -> - % this means we've looped through all our opened dbs and found them - % all in use. + case ets:foldl(fun find_oldest_db/2, {StartTime, nil}, couch_lru) of + {StartTime, nil} -> {error, all_dbs_active}; - true -> - [{_, DbName}] = ets:lookup(couch_dbs_by_lru, LruTime), - [{_, {opened, MainPid, LruTime}}] = ets:lookup(couch_dbs_by_name, DbName), - case couch_db:is_idle(MainPid) of - true -> - couch_util:shutdown_sync(MainPid), - true = ets:delete(couch_dbs_by_lru, LruTime), - true = ets:delete(couch_dbs_by_name, DbName), - true = ets:delete(couch_dbs_by_pid, MainPid), - true = ets:delete(couch_sys_dbs, DbName), + {_, DbName} -> + % There may exist an extremely small possibility of a race + % condition here, if a process could lookup the DB before the lock, + % but fail to monitor the fd before the is_idle check. + true = ets:update_element(couch_dbs, DbName, {#db.fd_monitor, locked}), + [#db{main_pid = Pid} = Db] = ets:lookup(couch_dbs, DbName), + case couch_db:is_idle(Db) of true -> + true = ets:delete(couch_dbs, DbName), + true = ets:delete(couch_lru, DbName), + exit(Pid, kill), ok; false -> - % this still has referrers. Go ahead and give it a current lru time - % and try the next one in the table. - NewLruTime = now(), - true = ets:insert(couch_dbs_by_name, {DbName, {opened, MainPid, NewLruTime}}), - true = ets:insert(couch_dbs_by_pid, {MainPid, DbName}), - true = ets:delete(couch_dbs_by_lru, LruTime), - true = ets:insert(couch_dbs_by_lru, {NewLruTime, DbName}), + true = ets:update_element(couch_dbs, DbName, {#db.fd_monitor, nil}), + true = ets:insert(couch_lru, {DbName, now()}), try_close_lru(StartTime) end end. -get_lru() -> - get_lru(ets:first(couch_dbs_by_lru)). - -get_lru(LruTime) -> - [{LruTime, DbName}] = ets:lookup(couch_dbs_by_lru, LruTime), - case ets:member(couch_sys_dbs, DbName) of - false -> - LruTime; - true -> - [{_, {opened, MainPid, _}}] = ets:lookup(couch_dbs_by_name, DbName), - case couch_db:is_idle(MainPid) of - true -> - LruTime; - false -> - get_lru(ets:next(couch_dbs_by_lru, LruTime)) - end - end. - open_async(Server, From, DbName, Filepath, Options) -> Parent = self(), + put({async_open, DbName}, now()), Opener = spawn_link(fun() -> - Res = couch_db:start_link(DbName, Filepath, Options), - gen_server:call( - Parent, {open_result, DbName, Res, Options}, infinity - ), - unlink(Parent), - case Res of - {ok, DbReader} -> - unlink(DbReader); - _ -> - ok - end - end), - true = ets:insert(couch_dbs_by_name, {DbName, {opening, Opener, [From]}}), - true = ets:insert(couch_dbs_by_pid, {Opener, DbName}), - DbsOpen = case lists:member(sys_db, Options) of - true -> - true = ets:insert(couch_sys_dbs, {DbName, true}), - Server#server.dbs_open; - false -> - Server#server.dbs_open + 1 - end, - Server#server{dbs_open = DbsOpen}. - + Res = couch_db:start_link(DbName, Filepath, Options), + gen_server:call(Parent, {open_result, DbName, Res}, infinity), + unlink(Parent) + end), + % icky hack of field values - compactor_pid used to store clients + true = ets:insert(couch_dbs, #db{ + name = DbName, + main_pid = Opener, + compactor_pid = [From], + fd_monitor = locked + }), + Server#server{dbs_open=Server#server.dbs_open + 1}. + +handle_call(close_lru, _From, #server{dbs_open=N} = Server) -> + case try_close_lru(now()) of + ok -> + {reply, ok, Server#server{dbs_open = N-1}}; + Error -> + {reply, Error, Server} + end; +handle_call(open_dbs_count, _From, Server) -> + {reply, Server#server.dbs_open, Server}; +handle_call({set_dbname_regexp, RegExp}, _From, Server) -> + {reply, ok, Server#server{dbname_regexp=RegExp}}; handle_call({set_max_dbs_open, Max}, _From, Server) -> {reply, ok, Server#server{max_dbs_open=Max}}; handle_call(get_server, _From, Server) -> {reply, {ok, Server}, Server}; -handle_call({open_result, DbName, {ok, OpenedDbPid}, Options}, _From, Server) -> - link(OpenedDbPid), - [{DbName, {opening,Opener,Froms}}] = ets:lookup(couch_dbs_by_name, DbName), - lists:foreach(fun({FromPid,_}=From) -> - gen_server:reply(From, - catch couch_db:open_ref_counted(OpenedDbPid, FromPid)) - end, Froms), - LruTime = now(), - true = ets:insert(couch_dbs_by_name, - {DbName, {opened, OpenedDbPid, LruTime}}), - true = ets:delete(couch_dbs_by_pid, Opener), - true = ets:insert(couch_dbs_by_pid, {OpenedDbPid, DbName}), - true = ets:insert(couch_dbs_by_lru, {LruTime, DbName}), - case lists:member(create, Options) of - true -> - couch_db_update_notifier:notify({created, DbName}); - false -> - ok +handle_call({open_result, DbName, {ok, Db}}, _From, Server) -> + link(Db#db.main_pid), + case erase({async_open, DbName}) of undefined -> ok; T0 -> + ?LOG_INFO("needed ~p ms to open new ~s", [timer:now_diff(now(),T0)/1000, + DbName]) end, + % icky hack of field values - compactor_pid used to store clients + [#db{compactor_pid=Froms}] = ets:lookup(couch_dbs, DbName), + [gen_server:reply(From, {ok, Db}) || From <- Froms], + true = ets:insert(couch_dbs, Db), {reply, ok, Server}; -handle_call({open_result, DbName, Error, Options}, _From, Server) -> - [{DbName, {opening,Opener,Froms}}] = ets:lookup(couch_dbs_by_name, DbName), - lists:foreach(fun(From) -> - gen_server:reply(From, Error) - end, Froms), - true = ets:delete(couch_dbs_by_name, DbName), - true = ets:delete(couch_dbs_by_pid, Opener), - DbsOpen = case lists:member(sys_db, Options) of - true -> - true = ets:delete(couch_sys_dbs, DbName), - Server#server.dbs_open; - false -> - Server#server.dbs_open - 1 - end, - {reply, ok, Server#server{dbs_open = DbsOpen}}; -handle_call({open, DbName, Options}, {FromPid,_}=From, Server) -> - LruTime = now(), - case ets:lookup(couch_dbs_by_name, DbName) of +handle_call({open_result, DbName, Error}, _From, Server) -> + % icky hack of field values - compactor_pid used to store clients + [#db{compactor_pid=Froms}] = ets:lookup(couch_dbs, DbName), + [gen_server:reply(From, Error) || From <- Froms], + ?LOG_INFO("open_result error ~p for ~s", [Error, DbName]), + true = ets:delete(couch_dbs, DbName), + {reply, ok, Server#server{dbs_open=Server#server.dbs_open - 1}}; +handle_call({open, DbName, Options}, From, Server) -> + case ets:lookup(couch_dbs, DbName) of [] -> - open_db(DbName, Server, Options, From); - [{_, {opening, Opener, Froms}}] -> - true = ets:insert(couch_dbs_by_name, {DbName, {opening, Opener, [From|Froms]}}), + DbNameList = binary_to_list(DbName), + case check_dbname(Server, DbNameList) of + ok -> + case maybe_close_lru_db(Server) of + {ok, Server2} -> + Filepath = get_full_filename(Server, DbNameList), + {noreply, open_async(Server2, From, DbName, Filepath, Options)}; + CloseError -> + {reply, CloseError, Server} + end; + Error -> + {reply, Error, Server} + end; + [#db{compactor_pid = Froms} = Db] when is_list(Froms) -> + % icky hack of field values - compactor_pid used to store clients + ?LOG_INFO("adding another listener to async open for ~s", [DbName]), + true = ets:insert(couch_dbs, Db#db{compactor_pid = [From|Froms]}), {noreply, Server}; - [{_, {opened, MainPid, PrevLruTime}}] -> - true = ets:insert(couch_dbs_by_name, {DbName, {opened, MainPid, LruTime}}), - true = ets:delete(couch_dbs_by_lru, PrevLruTime), - true = ets:insert(couch_dbs_by_lru, {LruTime, DbName}), - {reply, couch_db:open_ref_counted(MainPid, FromPid), Server} + [#db{} = Db] -> + {reply, {ok, Db}, Server} end; handle_call({create, DbName, Options}, From, Server) -> - case ets:lookup(couch_dbs_by_name, DbName) of - [] -> - open_db(DbName, Server, [create | Options], From); - [_AlreadyRunningDb] -> - {reply, file_exists, Server} + DbNameList = binary_to_list(DbName), + case check_dbname(Server, DbNameList) of + ok -> + case ets:lookup(couch_dbs, DbName) of + [] -> + case maybe_close_lru_db(Server) of + {ok, Server2} -> + Filepath = get_full_filename(Server, DbNameList), + {noreply, open_async(Server2, From, DbName, Filepath, + [create | Options])}; + CloseError -> + {reply, CloseError, Server} + end; + [_AlreadyRunningDb] -> + {reply, file_exists, Server} + end; + Error -> + {reply, Error, Server} end; handle_call({delete, DbName, _Options}, _From, Server) -> DbNameList = binary_to_list(DbName), case check_dbname(Server, DbNameList) of ok -> FullFilepath = get_full_filename(Server, DbNameList), - UpdateState = - case ets:lookup(couch_dbs_by_name, DbName) of - [] -> false; - [{_, {opening, Pid, Froms}}] -> - couch_util:shutdown_sync(Pid), - true = ets:delete(couch_dbs_by_name, DbName), - true = ets:delete(couch_dbs_by_pid, Pid), - [gen_server:send_result(F, not_found) || F <- Froms], - true; - [{_, {opened, Pid, LruTime}}] -> - couch_util:shutdown_sync(Pid), - true = ets:delete(couch_dbs_by_name, DbName), - true = ets:delete(couch_dbs_by_pid, Pid), - true = ets:delete(couch_dbs_by_lru, LruTime), - true - end, - Server2 = case UpdateState of - true -> - DbsOpen = case ets:member(couch_sys_dbs, DbName) of - true -> - true = ets:delete(couch_sys_dbs, DbName), - Server#server.dbs_open; - false -> - Server#server.dbs_open - 1 - end, - Server#server{dbs_open = DbsOpen}; - false -> - Server + Server2 = + case ets:lookup(couch_dbs, DbName) of + [] -> Server; + [#db{main_pid=Pid, compactor_pid=Froms}] when is_list(Froms) -> + % icky hack of field values - compactor_pid used to store clients + true = ets:delete(couch_dbs, DbName), + true = ets:delete(couch_lru, DbName), + exit(Pid, kill), + [gen_server:reply(F, not_found) || F <- Froms], + Server#server{dbs_open=Server#server.dbs_open - 1}; + [#db{main_pid=Pid}] -> + true = ets:delete(couch_dbs, DbName), + true = ets:delete(couch_lru, DbName), + exit(Pid, kill), + Server#server{dbs_open=Server#server.dbs_open - 1} end, %% Delete any leftover .compact files. If we don't do this a subsequent @@ -364,36 +345,37 @@ handle_call({delete, DbName, _Options}, _From, Server) -> end; Error -> {reply, Error, Server} - end. + end; +handle_call({db_updated, Db}, _From, Server) -> + ets:insert(couch_dbs, Db), + {reply, ok, Server}. + -handle_cast(Msg, _Server) -> - exit({unknown_cast_message, Msg}). +handle_cast(Msg, Server) -> + {stop, {unknown_cast_message, Msg}, Server}. code_change(_OldVsn, State, _Extra) -> {ok, State}. - -handle_info({'EXIT', _Pid, config_change}, Server) -> - {noreply, shutdown, Server}; -handle_info(Error, _Server) -> - ?LOG_ERROR("Unexpected message, restarting couch_server: ~p", [Error]), - exit(kill). -open_db(DbName, Server, Options, From) -> - DbNameList = binary_to_list(DbName), - case check_dbname(Server, DbNameList) of - ok -> - Filepath = get_full_filename(Server, DbNameList), - case lists:member(sys_db, Options) of +handle_info({'EXIT', _Pid, config_change}, Server) -> + {stop, config_change, Server}; +handle_info({'EXIT', Pid, Reason}, #server{dbs_open=DbsOpen}=Server) -> + Match = erlang:make_tuple(tuple_size(#db{}), '_', [{1, db}, + {#db.main_pid, Pid}]), + case ets:match_object(couch_dbs, Match) of + [#db{name = DbName, compactor_pid=Froms}] -> + ?LOG_INFO("db ~s died with reason ~p", [DbName, Reason]), + % icky hack of field values - compactor_pid used to store clients + if is_list(Froms) -> + [gen_server:reply(From, Reason) || From <- Froms]; true -> - {noreply, open_async(Server, From, DbName, Filepath, Options)}; - false -> - case maybe_close_lru_db(Server) of - {ok, Server2} -> - {noreply, open_async(Server2, From, DbName, Filepath, Options)}; - CloseError -> - {reply, CloseError, Server} - end - end; - Error -> - {reply, Error, Server} - end. + ok + end, + true = ets:delete(couch_dbs, DbName), + true = ets:delete(couch_lru, DbName), + {noreply, Server#server{dbs_open=DbsOpen - 1}}; + [] -> + {noreply, Server} + end; +handle_info(Info, Server) -> + {stop, {unknown_message, Info}, Server}. -- cgit v1.2.3