summaryrefslogtreecommitdiff
path: root/apps/couch/src/couch_server.erl
diff options
context:
space:
mode:
authorAdam Kocoloski <adam@cloudant.com>2010-08-18 11:51:03 -0400
committerAdam Kocoloski <adam@cloudant.com>2010-08-18 14:24:57 -0400
commit7393d62b7b630bee50f609d0ae8125d33f7cda2b (patch)
tree754e9ab17a586319c562de488e60056feff60bb8 /apps/couch/src/couch_server.erl
parentc0cb2625f25a2b51485c164bea1d8822f449ce14 (diff)
Grab bag of Cloudant patches to couch OTP application
- Removal of couch_db and couch_ref_counter processes. Active DBs are accessible through a protected ets table owned by couch_server. - #full_doc_info{} in by_id and by_seq trees for faster compaction at the expense of more disk usage afterwards. Proposed as COUCHDB-738 but not accepted upstream. - Replication via distributed Erlang. - Better hot upgrade support (uses exported functions much more often). - Configurable btree chunk sizes allow for larger (but still bounded) reductions. - Shorter names for btree fields in #db{} and #db_header{}. - couch_view_group does not keep a reference to the #db{}. - Terms are stored compressed (again).
Diffstat (limited to 'apps/couch/src/couch_server.erl')
-rw-r--r--apps/couch/src/couch_server.erl378
1 files changed, 180 insertions, 198 deletions
diff --git a/apps/couch/src/couch_server.erl b/apps/couch/src/couch_server.erl
index 43fd9044..b54771be 100644
--- a/apps/couch/src/couch_server.erl
+++ b/apps/couch/src/couch_server.erl
@@ -13,10 +13,11 @@
-module(couch_server).
-behaviour(gen_server).
--export([open/2,create/2,delete/2,all_databases/0,get_version/0]).
--export([init/1, handle_call/3,sup_start_link/0]).
+-export([open/2,create/2,delete/2,all_databases/0,all_databases/1]).
+-export([init/1, handle_call/3,sup_start_link/0,get_version/0]).
-export([handle_cast/2,code_change/3,handle_info/2,terminate/2]).
--export([dev_start/0,is_admin/2,has_admins/0,get_stats/0]).
+-export([dev_start/0,is_admin/2,has_admins/0,get_stats/0,config_change/4]).
+-export([close_lru/0]).
-include("couch_db.hrl").
@@ -50,15 +51,26 @@ get_stats() ->
sup_start_link() ->
gen_server:start_link({local, couch_server}, couch_server, [], []).
+
open(DbName, Options) ->
- case gen_server:call(couch_server, {open, DbName, Options}, infinity) of
- {ok, Db} ->
- Ctx = couch_util:get_value(user_ctx, Options, #user_ctx{}),
- {ok, Db#db{user_ctx=Ctx}};
- Error ->
- Error
+ Ctx = couch_util:get_value(user_ctx, Options, #user_ctx{}),
+ case ets:lookup(couch_dbs, DbName) of
+ [#db{fd=Fd, fd_monitor=Lock} = Db] when Lock =/= locked ->
+ ets:insert(couch_lru, {DbName, now()}),
+ {ok, Db#db{user_ctx=Ctx, fd_monitor=erlang:monitor(process,Fd)}};
+ _ ->
+ case gen_server:call(couch_server, {open, DbName, Options}, infinity) of
+ {ok, #db{fd=Fd} = Db} ->
+ ets:insert(couch_lru, {DbName, now()}),
+ {ok, Db#db{user_ctx=Ctx, fd_monitor=erlang:monitor(process,Fd)}};
+ Error ->
+ Error
+ end
end.
+close_lru() ->
+ gen_server:call(couch_server, close_lru).
+
create(DbName, Options) ->
case gen_server:call(couch_server, {create, DbName, Options}, infinity) of
{ok, Db} ->
@@ -121,28 +133,12 @@ init([]) ->
RootDir = couch_config:get("couchdb", "database_dir", "."),
MaxDbsOpen = list_to_integer(
couch_config:get("couchdb", "max_dbs_open")),
- Self = self(),
- ok = couch_config:register(
- fun("couchdb", "database_dir") ->
- exit(Self, config_change)
- end),
- ok = couch_config:register(
- fun("couchdb", "max_dbs_open", Max) ->
- gen_server:call(couch_server,
- {set_max_dbs_open, list_to_integer(Max)})
- end),
+ ok = couch_config:register(fun ?MODULE:config_change/4),
ok = couch_file:init_delete_dir(RootDir),
hash_admin_passwords(),
- ok = couch_config:register(
- fun("admins", _Key, _Value, Persist) ->
- % spawn here so couch_config doesn't try to call itself
- spawn(fun() -> hash_admin_passwords(Persist) end)
- end, false),
- {ok, RegExp} = re:compile("^[a-z][a-z0-9\\_\\$()\\+\\-\\/]*$"),
- ets:new(couch_dbs_by_name, [set, private, named_table]),
- ets:new(couch_dbs_by_pid, [set, private, named_table]),
- ets:new(couch_dbs_by_lru, [ordered_set, private, named_table]),
- ets:new(couch_sys_dbs, [set, private, named_table]),
+ {ok, RegExp} = re:compile("^[a-z][a-z0-9\\_\\$()\\+\\-\\/\\.]*$"),
+ ets:new(couch_dbs, [set, protected, named_table, {keypos, #db.name}]),
+ ets:new(couch_lru, [set, public, named_table]),
process_flag(trap_exit, true),
{ok, #server{root_dir=RootDir,
dbname_regexp=RegExp,
@@ -150,15 +146,27 @@ init([]) ->
start_time=httpd_util:rfc1123_date()}}.
terminate(_Reason, _Srv) ->
- [couch_util:shutdown_sync(Pid) || {_, {Pid, _LruTime}} <-
- ets:tab2list(couch_dbs_by_name)],
+ ets:foldl(fun(#db{main_pid=Pid}, _) -> couch_util:shutdown_sync(Pid) end,
+ nil, couch_dbs),
ok.
+config_change("couchdb", "database_dir", _, _) ->
+ exit(whereis(couch_server), config_change);
+config_change("couchdb", "max_dbs_open", Max, _) ->
+ gen_server:call(couch_server, {set_max_dbs_open, list_to_integer(Max)});
+config_change("admins", _, _, Persist) ->
+ % spawn here so couch_config doesn't try to call itself
+ spawn(fun() -> hash_admin_passwords(Persist) end).
+
all_databases() ->
+ all_databases("").
+
+all_databases(Prefix) ->
{ok, #server{root_dir=Root}} = gen_server:call(couch_server, get_server),
NormRoot = couch_util:normpath(Root),
Filenames =
- filelib:fold_files(Root, "^[a-z0-9\\_\\$()\\+\\-]*[\\.]couch$", true,
+ filelib:fold_files(Root++Prefix, "^[a-z0-9\\_\\$()\\+\\-]*[\\.]couch$",
+ true,
fun(Filename, AccIn) ->
NormFilename = couch_util:normpath(Filename),
case NormFilename -- NormRoot of
@@ -181,172 +189,145 @@ maybe_close_lru_db(#server{dbs_open=NumOpen}=Server) ->
Error -> Error
end.
+find_oldest_db({DbName, Lru}, Acc) ->
+ erlang:min({Lru, DbName}, Acc).
+
try_close_lru(StartTime) ->
- LruTime = get_lru(),
- if LruTime > StartTime ->
- % this means we've looped through all our opened dbs and found them
- % all in use.
+ case ets:foldl(fun find_oldest_db/2, {StartTime, nil}, couch_lru) of
+ {StartTime, nil} ->
{error, all_dbs_active};
- true ->
- [{_, DbName}] = ets:lookup(couch_dbs_by_lru, LruTime),
- [{_, {opened, MainPid, LruTime}}] = ets:lookup(couch_dbs_by_name, DbName),
- case couch_db:is_idle(MainPid) of
- true ->
- couch_util:shutdown_sync(MainPid),
- true = ets:delete(couch_dbs_by_lru, LruTime),
- true = ets:delete(couch_dbs_by_name, DbName),
- true = ets:delete(couch_dbs_by_pid, MainPid),
- true = ets:delete(couch_sys_dbs, DbName),
+ {_, DbName} ->
+ % There may exist an extremely small possibility of a race
+ % condition here, if a process could lookup the DB before the lock,
+ % but fail to monitor the fd before the is_idle check.
+ true = ets:update_element(couch_dbs, DbName, {#db.fd_monitor, locked}),
+ [#db{main_pid = Pid} = Db] = ets:lookup(couch_dbs, DbName),
+ case couch_db:is_idle(Db) of true ->
+ true = ets:delete(couch_dbs, DbName),
+ true = ets:delete(couch_lru, DbName),
+ exit(Pid, kill),
ok;
false ->
- % this still has referrers. Go ahead and give it a current lru time
- % and try the next one in the table.
- NewLruTime = now(),
- true = ets:insert(couch_dbs_by_name, {DbName, {opened, MainPid, NewLruTime}}),
- true = ets:insert(couch_dbs_by_pid, {MainPid, DbName}),
- true = ets:delete(couch_dbs_by_lru, LruTime),
- true = ets:insert(couch_dbs_by_lru, {NewLruTime, DbName}),
+ true = ets:update_element(couch_dbs, DbName, {#db.fd_monitor, nil}),
+ true = ets:insert(couch_lru, {DbName, now()}),
try_close_lru(StartTime)
end
end.
-get_lru() ->
- get_lru(ets:first(couch_dbs_by_lru)).
-
-get_lru(LruTime) ->
- [{LruTime, DbName}] = ets:lookup(couch_dbs_by_lru, LruTime),
- case ets:member(couch_sys_dbs, DbName) of
- false ->
- LruTime;
- true ->
- [{_, {opened, MainPid, _}}] = ets:lookup(couch_dbs_by_name, DbName),
- case couch_db:is_idle(MainPid) of
- true ->
- LruTime;
- false ->
- get_lru(ets:next(couch_dbs_by_lru, LruTime))
- end
- end.
-
open_async(Server, From, DbName, Filepath, Options) ->
Parent = self(),
+ put({async_open, DbName}, now()),
Opener = spawn_link(fun() ->
- Res = couch_db:start_link(DbName, Filepath, Options),
- gen_server:call(
- Parent, {open_result, DbName, Res, Options}, infinity
- ),
- unlink(Parent),
- case Res of
- {ok, DbReader} ->
- unlink(DbReader);
- _ ->
- ok
- end
- end),
- true = ets:insert(couch_dbs_by_name, {DbName, {opening, Opener, [From]}}),
- true = ets:insert(couch_dbs_by_pid, {Opener, DbName}),
- DbsOpen = case lists:member(sys_db, Options) of
- true ->
- true = ets:insert(couch_sys_dbs, {DbName, true}),
- Server#server.dbs_open;
- false ->
- Server#server.dbs_open + 1
- end,
- Server#server{dbs_open = DbsOpen}.
-
+ Res = couch_db:start_link(DbName, Filepath, Options),
+ gen_server:call(Parent, {open_result, DbName, Res}, infinity),
+ unlink(Parent)
+ end),
+ % icky hack of field values - compactor_pid used to store clients
+ true = ets:insert(couch_dbs, #db{
+ name = DbName,
+ main_pid = Opener,
+ compactor_pid = [From],
+ fd_monitor = locked
+ }),
+ Server#server{dbs_open=Server#server.dbs_open + 1}.
+
+handle_call(close_lru, _From, #server{dbs_open=N} = Server) ->
+ case try_close_lru(now()) of
+ ok ->
+ {reply, ok, Server#server{dbs_open = N-1}};
+ Error ->
+ {reply, Error, Server}
+ end;
+handle_call(open_dbs_count, _From, Server) ->
+ {reply, Server#server.dbs_open, Server};
+handle_call({set_dbname_regexp, RegExp}, _From, Server) ->
+ {reply, ok, Server#server{dbname_regexp=RegExp}};
handle_call({set_max_dbs_open, Max}, _From, Server) ->
{reply, ok, Server#server{max_dbs_open=Max}};
handle_call(get_server, _From, Server) ->
{reply, {ok, Server}, Server};
-handle_call({open_result, DbName, {ok, OpenedDbPid}, Options}, _From, Server) ->
- link(OpenedDbPid),
- [{DbName, {opening,Opener,Froms}}] = ets:lookup(couch_dbs_by_name, DbName),
- lists:foreach(fun({FromPid,_}=From) ->
- gen_server:reply(From,
- catch couch_db:open_ref_counted(OpenedDbPid, FromPid))
- end, Froms),
- LruTime = now(),
- true = ets:insert(couch_dbs_by_name,
- {DbName, {opened, OpenedDbPid, LruTime}}),
- true = ets:delete(couch_dbs_by_pid, Opener),
- true = ets:insert(couch_dbs_by_pid, {OpenedDbPid, DbName}),
- true = ets:insert(couch_dbs_by_lru, {LruTime, DbName}),
- case lists:member(create, Options) of
- true ->
- couch_db_update_notifier:notify({created, DbName});
- false ->
- ok
+handle_call({open_result, DbName, {ok, Db}}, _From, Server) ->
+ link(Db#db.main_pid),
+ case erase({async_open, DbName}) of undefined -> ok; T0 ->
+ ?LOG_INFO("needed ~p ms to open new ~s", [timer:now_diff(now(),T0)/1000,
+ DbName])
end,
+ % icky hack of field values - compactor_pid used to store clients
+ [#db{compactor_pid=Froms}] = ets:lookup(couch_dbs, DbName),
+ [gen_server:reply(From, {ok, Db}) || From <- Froms],
+ true = ets:insert(couch_dbs, Db),
{reply, ok, Server};
-handle_call({open_result, DbName, Error, Options}, _From, Server) ->
- [{DbName, {opening,Opener,Froms}}] = ets:lookup(couch_dbs_by_name, DbName),
- lists:foreach(fun(From) ->
- gen_server:reply(From, Error)
- end, Froms),
- true = ets:delete(couch_dbs_by_name, DbName),
- true = ets:delete(couch_dbs_by_pid, Opener),
- DbsOpen = case lists:member(sys_db, Options) of
- true ->
- true = ets:delete(couch_sys_dbs, DbName),
- Server#server.dbs_open;
- false ->
- Server#server.dbs_open - 1
- end,
- {reply, ok, Server#server{dbs_open = DbsOpen}};
-handle_call({open, DbName, Options}, {FromPid,_}=From, Server) ->
- LruTime = now(),
- case ets:lookup(couch_dbs_by_name, DbName) of
+handle_call({open_result, DbName, Error}, _From, Server) ->
+ % icky hack of field values - compactor_pid used to store clients
+ [#db{compactor_pid=Froms}] = ets:lookup(couch_dbs, DbName),
+ [gen_server:reply(From, Error) || From <- Froms],
+ ?LOG_INFO("open_result error ~p for ~s", [Error, DbName]),
+ true = ets:delete(couch_dbs, DbName),
+ {reply, ok, Server#server{dbs_open=Server#server.dbs_open - 1}};
+handle_call({open, DbName, Options}, From, Server) ->
+ case ets:lookup(couch_dbs, DbName) of
[] ->
- open_db(DbName, Server, Options, From);
- [{_, {opening, Opener, Froms}}] ->
- true = ets:insert(couch_dbs_by_name, {DbName, {opening, Opener, [From|Froms]}}),
+ DbNameList = binary_to_list(DbName),
+ case check_dbname(Server, DbNameList) of
+ ok ->
+ case maybe_close_lru_db(Server) of
+ {ok, Server2} ->
+ Filepath = get_full_filename(Server, DbNameList),
+ {noreply, open_async(Server2, From, DbName, Filepath, Options)};
+ CloseError ->
+ {reply, CloseError, Server}
+ end;
+ Error ->
+ {reply, Error, Server}
+ end;
+ [#db{compactor_pid = Froms} = Db] when is_list(Froms) ->
+ % icky hack of field values - compactor_pid used to store clients
+ ?LOG_INFO("adding another listener to async open for ~s", [DbName]),
+ true = ets:insert(couch_dbs, Db#db{compactor_pid = [From|Froms]}),
{noreply, Server};
- [{_, {opened, MainPid, PrevLruTime}}] ->
- true = ets:insert(couch_dbs_by_name, {DbName, {opened, MainPid, LruTime}}),
- true = ets:delete(couch_dbs_by_lru, PrevLruTime),
- true = ets:insert(couch_dbs_by_lru, {LruTime, DbName}),
- {reply, couch_db:open_ref_counted(MainPid, FromPid), Server}
+ [#db{} = Db] ->
+ {reply, {ok, Db}, Server}
end;
handle_call({create, DbName, Options}, From, Server) ->
- case ets:lookup(couch_dbs_by_name, DbName) of
- [] ->
- open_db(DbName, Server, [create | Options], From);
- [_AlreadyRunningDb] ->
- {reply, file_exists, Server}
+ DbNameList = binary_to_list(DbName),
+ case check_dbname(Server, DbNameList) of
+ ok ->
+ case ets:lookup(couch_dbs, DbName) of
+ [] ->
+ case maybe_close_lru_db(Server) of
+ {ok, Server2} ->
+ Filepath = get_full_filename(Server, DbNameList),
+ {noreply, open_async(Server2, From, DbName, Filepath,
+ [create | Options])};
+ CloseError ->
+ {reply, CloseError, Server}
+ end;
+ [_AlreadyRunningDb] ->
+ {reply, file_exists, Server}
+ end;
+ Error ->
+ {reply, Error, Server}
end;
handle_call({delete, DbName, _Options}, _From, Server) ->
DbNameList = binary_to_list(DbName),
case check_dbname(Server, DbNameList) of
ok ->
FullFilepath = get_full_filename(Server, DbNameList),
- UpdateState =
- case ets:lookup(couch_dbs_by_name, DbName) of
- [] -> false;
- [{_, {opening, Pid, Froms}}] ->
- couch_util:shutdown_sync(Pid),
- true = ets:delete(couch_dbs_by_name, DbName),
- true = ets:delete(couch_dbs_by_pid, Pid),
- [gen_server:send_result(F, not_found) || F <- Froms],
- true;
- [{_, {opened, Pid, LruTime}}] ->
- couch_util:shutdown_sync(Pid),
- true = ets:delete(couch_dbs_by_name, DbName),
- true = ets:delete(couch_dbs_by_pid, Pid),
- true = ets:delete(couch_dbs_by_lru, LruTime),
- true
- end,
- Server2 = case UpdateState of
- true ->
- DbsOpen = case ets:member(couch_sys_dbs, DbName) of
- true ->
- true = ets:delete(couch_sys_dbs, DbName),
- Server#server.dbs_open;
- false ->
- Server#server.dbs_open - 1
- end,
- Server#server{dbs_open = DbsOpen};
- false ->
- Server
+ Server2 =
+ case ets:lookup(couch_dbs, DbName) of
+ [] -> Server;
+ [#db{main_pid=Pid, compactor_pid=Froms}] when is_list(Froms) ->
+ % icky hack of field values - compactor_pid used to store clients
+ true = ets:delete(couch_dbs, DbName),
+ true = ets:delete(couch_lru, DbName),
+ exit(Pid, kill),
+ [gen_server:reply(F, not_found) || F <- Froms],
+ Server#server{dbs_open=Server#server.dbs_open - 1};
+ [#db{main_pid=Pid}] ->
+ true = ets:delete(couch_dbs, DbName),
+ true = ets:delete(couch_lru, DbName),
+ exit(Pid, kill),
+ Server#server{dbs_open=Server#server.dbs_open - 1}
end,
%% Delete any leftover .compact files. If we don't do this a subsequent
@@ -364,36 +345,37 @@ handle_call({delete, DbName, _Options}, _From, Server) ->
end;
Error ->
{reply, Error, Server}
- end.
+ end;
+handle_call({db_updated, Db}, _From, Server) ->
+ ets:insert(couch_dbs, Db),
+ {reply, ok, Server}.
+
-handle_cast(Msg, _Server) ->
- exit({unknown_cast_message, Msg}).
+handle_cast(Msg, Server) ->
+ {stop, {unknown_cast_message, Msg}, Server}.
code_change(_OldVsn, State, _Extra) ->
{ok, State}.
-
-handle_info({'EXIT', _Pid, config_change}, Server) ->
- {noreply, shutdown, Server};
-handle_info(Error, _Server) ->
- ?LOG_ERROR("Unexpected message, restarting couch_server: ~p", [Error]),
- exit(kill).
-open_db(DbName, Server, Options, From) ->
- DbNameList = binary_to_list(DbName),
- case check_dbname(Server, DbNameList) of
- ok ->
- Filepath = get_full_filename(Server, DbNameList),
- case lists:member(sys_db, Options) of
+handle_info({'EXIT', _Pid, config_change}, Server) ->
+ {stop, config_change, Server};
+handle_info({'EXIT', Pid, Reason}, #server{dbs_open=DbsOpen}=Server) ->
+ Match = erlang:make_tuple(tuple_size(#db{}), '_', [{1, db},
+ {#db.main_pid, Pid}]),
+ case ets:match_object(couch_dbs, Match) of
+ [#db{name = DbName, compactor_pid=Froms}] ->
+ ?LOG_INFO("db ~s died with reason ~p", [DbName, Reason]),
+ % icky hack of field values - compactor_pid used to store clients
+ if is_list(Froms) ->
+ [gen_server:reply(From, Reason) || From <- Froms];
true ->
- {noreply, open_async(Server, From, DbName, Filepath, Options)};
- false ->
- case maybe_close_lru_db(Server) of
- {ok, Server2} ->
- {noreply, open_async(Server2, From, DbName, Filepath, Options)};
- CloseError ->
- {reply, CloseError, Server}
- end
- end;
- Error ->
- {reply, Error, Server}
- end.
+ ok
+ end,
+ true = ets:delete(couch_dbs, DbName),
+ true = ets:delete(couch_lru, DbName),
+ {noreply, Server#server{dbs_open=DbsOpen - 1}};
+ [] ->
+ {noreply, Server}
+ end;
+handle_info(Info, Server) ->
+ {stop, {unknown_message, Info}, Server}.