From 7bd071836a08722cb14479261507aec1d071e2d2 Mon Sep 17 00:00:00 2001 From: Adam Kocoloski Date: Mon, 6 Feb 2012 22:02:32 -0500 Subject: Add an index keyed on LRU for faster candidate ID Our current implementation for closing an LRU DB involves a full scan of a public ets table. This scan blocks all other activity in couch_server and can become a serious bottleneck when the LRU cache hit rate drops too low. In the worst-case all_dbs_active scenario we end up with O(N**2) algorithmic complexity. This patch adds a new index keyed on LRU for faster access to the least recently used databases. It also moves the ets table to a dict on the couch_server heap. The downside is an increased message rate inbound on the couch_server, as clients are no longer allowed to update the LRU data structures without sending a message. BugzID: 12879 Conflicts: apps/couch/src/couch_server.erl --- apps/couch/src/couch_lru.erl | 49 +++++++++++++++++++++++++ apps/couch/src/couch_server.erl | 81 +++++++++++------------------------------ 2 files changed, 71 insertions(+), 59 deletions(-) create mode 100644 apps/couch/src/couch_lru.erl diff --git a/apps/couch/src/couch_lru.erl b/apps/couch/src/couch_lru.erl new file mode 100644 index 00000000..ffddfdfc --- /dev/null +++ b/apps/couch/src/couch_lru.erl @@ -0,0 +1,49 @@ +-module(couch_lru). +-export([new/0, insert/2, update/2, close/1]). + +-include("couch_db.hrl"). + +new() -> + {gb_trees:empty(), dict:new()}. + +insert(DbName, {Tree0, Dict0}) -> + Lru = now(), + {gb_trees:insert(Lru, DbName, Tree0), dict:store(DbName, Lru, Dict0)}. + +update(DbName, {Tree0, Dict0}) -> + case dict:find(DbName, Dict0) of + {ok, Old} -> + New = now(), + Tree = gb_trees:insert(New, DbName, gb_trees:delete(Old, Tree0)), + Dict = dict:store(DbName, New, Dict0), + {Tree, Dict}; + error -> + % We closed this database before processing the update. Ignore + {Tree0, Dict0} + end. + +close({Tree, _} = Cache) -> + close_int(gb_trees:next(gb_trees:iterator(Tree)), Cache). + +%% internals + +close_int(none, _) -> + erlang:error(all_dbs_active); +close_int({Lru, DbName, Iter}, {Tree, Dict} = Cache) -> + case ets:update_element(couch_dbs, DbName, {#db.fd_monitor, locked}) of + true -> + [#db{main_pid = Pid} = Db] = ets:lookup(couch_dbs, DbName), + case couch_db:is_idle(Db) of true -> + true = ets:delete(couch_dbs, DbName), + exit(Pid, kill), + {gb_trees:delete(Lru, Tree), dict:erase(DbName, Dict)}; + false -> + true = ets:update_element(couch_dbs, DbName, {#db.fd_monitor, nil}), + twig:log(warn, "~p old active ~s", [?MODULE, Db#db.name]), + close_int(gb_trees:next(Iter), update(DbName, Cache)) + end; + false -> + NewTree = gb_trees:delete(Lru, Tree), + NewIter = gb_trees:iterator(NewTree), + close_int(gb_trees:next(NewIter), {NewTree, dict:erase(DbName, Dict)}) + end. diff --git a/apps/couch/src/couch_server.erl b/apps/couch/src/couch_server.erl index c7850990..618fa76a 100644 --- a/apps/couch/src/couch_server.erl +++ b/apps/couch/src/couch_server.erl @@ -26,7 +26,8 @@ dbname_regexp, max_dbs_open=100, dbs_open=0, - start_time="" + start_time="", + lru = couch_lru:new() }). dev_start() -> @@ -47,19 +48,22 @@ open(DbName, Options) -> Ctx = couch_util:get_value(user_ctx, Options, #user_ctx{}), case ets:lookup(couch_dbs, DbName) of [#db{fd=Fd, fd_monitor=Lock} = Db] when Lock =/= locked -> - ets:insert(couch_lru, {DbName, now()}), + update_lru(DbName), {ok, Db#db{user_ctx=Ctx, fd_monitor=erlang:monitor(process,Fd)}}; _ -> Timeout = couch_util:get_value(timeout, Options, infinity), case gen_server:call(couch_server, {open, DbName, Options}, Timeout) of {ok, #db{fd=Fd} = Db} -> - ets:insert(couch_lru, {DbName, now()}), + update_lru(DbName), {ok, Db#db{user_ctx=Ctx, fd_monitor=erlang:monitor(process,Fd)}}; Error -> Error end end. +update_lru(DbName) -> + gen_server:cast(couch_server, {update_lru, DbName}). + close_lru() -> gen_server:call(couch_server, close_lru). @@ -134,7 +138,6 @@ init([]) -> "(\\.[0-9]{10,})?$" % but allow an optional shard timestamp at the end ), ets:new(couch_dbs, [set, protected, named_table, {keypos, #db.name}]), - ets:new(couch_lru, [set, public, named_table]), process_flag(trap_exit, true), {ok, #server{root_dir=RootDir, dbname_regexp=RegExp, @@ -180,49 +183,11 @@ all_databases(Prefix) -> maybe_close_lru_db(#server{dbs_open=NumOpen, max_dbs_open=MaxOpen}=Server) when NumOpen < MaxOpen -> {ok, Server}; -maybe_close_lru_db(#server{dbs_open=NumOpen}=Server) -> - % must free up the lru db. - case try_close_lru(now()) of - ok -> - {ok, Server#server{dbs_open=NumOpen - 1}}; - Error -> Error - end. - -find_oldest_db({DbName, Lru}, Acc) -> - erlang:min({Lru, DbName}, Acc). - -try_close_lru(StartTime) -> - case ets:foldl(fun find_oldest_db/2, {StartTime, nil}, couch_lru) of - {StartTime, nil} -> - {error, all_dbs_active}; - {_, DbName} -> - % There may exist an extremely small possibility of a race - % condition here, if a process could lookup the DB before the lock, - % but fail to monitor the fd before the is_idle check. - % - % If we do hit this race condition the behavior is that the process - % grabbing the database will end up inserting a value into the - % couch_lru table. Its possible that we end up picking that up - % as the DbName above to close. So we here we'll just remove the - % couch_lru entry and ignore it. - case ets:update_element(couch_dbs, DbName, {#db.fd_monitor, locked}) of - true -> - [#db{main_pid = Pid} = Db] = ets:lookup(couch_dbs, DbName), - case couch_db:is_idle(Db) of true -> - true = ets:delete(couch_dbs, DbName), - true = ets:delete(couch_lru, DbName), - exit(Pid, kill), - ok; - false -> - Update = {#db.fd_monitor, nil}, - true = ets:update_element(couch_dbs, DbName, Update), - true = ets:insert(couch_lru, {DbName, now()}), - try_close_lru(StartTime) - end; - false -> - true = ets:delete(couch_lru, DbName), - try_close_lru(StartTime) - end +maybe_close_lru_db(#server{dbs_open=NumOpen, lru=Lru}=Server) -> + try + {ok, Server#server{dbs_open = NumOpen - 1, lru = couch_lru:close(Lru)}} + catch error:all_dbs_active -> + {error, all_dbs_active} end. open_async(Server, From, DbName, Filepath, Options) -> @@ -241,12 +206,11 @@ open_async(Server, From, DbName, Filepath, Options) -> }), Server#server{dbs_open=Server#server.dbs_open + 1}. -handle_call(close_lru, _From, #server{dbs_open=N} = Server) -> - case try_close_lru(now()) of - ok -> - {reply, ok, Server#server{dbs_open = N-1}}; - Error -> - {reply, Error, Server} +handle_call(close_lru, _From, #server{dbs_open=N, lru=Lru} = Server) -> + try + {reply, ok, Server#server{dbs_open = N-1, lru = couch_lru:close(Lru)}} + catch error:all_dbs_active -> + {reply, {error, all_dbs_active}, Server} end; handle_call(open_dbs_count, _From, Server) -> {reply, Server#server.dbs_open, Server}; @@ -262,14 +226,13 @@ handle_call({open_result, DbName, {ok, Db}, Options}, _From, Server) -> [#db{compactor_pid=Froms}] = ets:lookup(couch_dbs, DbName), [gen_server:reply(From, {ok, Db}) || From <- Froms], true = ets:insert(couch_dbs, Db), - true = ets:insert(couch_lru, {DbName, now()}), case lists:member(create, Options) of true -> couch_db_update_notifier:notify({created, DbName}); false -> ok end, - {reply, ok, Server}; + {reply, ok, Server#server{lru = couch_lru:insert(DbName, Server#server.lru)}}; handle_call({open_result, DbName, Error, _Options}, _From, Server) -> % icky hack of field values - compactor_pid used to store clients [#db{compactor_pid=Froms}] = ets:lookup(couch_dbs, DbName), @@ -357,12 +320,12 @@ handle_call({delete, DbName, _Options}, _From, Server) -> Error -> {reply, Error, Server} end; -handle_call({db_updated, #db{name = DbName} = Db}, _From, Server) -> +handle_call({db_updated, #db{name = DbName} = Db}, _, #server{lru=Lru}=Server) -> true = ets:insert(couch_dbs, Db), - true = ets:insert(couch_lru, {DbName, now()}), - {reply, ok, Server}. - + {reply, ok, Server#server{lru = couch_lru:update(DbName, Lru)}}. +handle_cast({update_lru, DbName}, #server{lru = Lru} = Server) -> + {noreply, Server#server{lru = couch_lru:update(DbName, Lru)}}; handle_cast(Msg, Server) -> {stop, {unknown_cast_message, Msg}, Server}. -- cgit v1.2.3