From 90c656fd3097272d1e574da3f5944d60b476e418 Mon Sep 17 00:00:00 2001 From: Adam Kocoloski Date: Fri, 2 Dec 2011 16:19:44 -0500 Subject: Fork new OS processes outside proc_manager loop --- apps/couch/src/couch_proc_manager.erl | 62 +++++++++++++++++++++++++---------- 1 file changed, 45 insertions(+), 17 deletions(-) diff --git a/apps/couch/src/couch_proc_manager.erl b/apps/couch/src/couch_proc_manager.erl index 509da9ba..0f35422b 100644 --- a/apps/couch/src/couch_proc_manager.erl +++ b/apps/couch/src/couch_proc_manager.erl @@ -3,7 +3,7 @@ -export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]). --export([start_link/0, get_proc_count/0]). +-export([start_link/0, get_proc_count/0, new_proc/2, new_proc/4]). -include("couch_db.hrl"). @@ -25,34 +25,37 @@ handle_call(get_table, _From, State) -> handle_call(get_proc_count, _From, State) -> {reply, ets:info(State#state.tab, size), State}; -handle_call({get_proc, #doc{body={Props}}=DDoc, DDocKey}, {Client, _}, State) -> +handle_call({get_proc, #doc{body={Props}}=DDoc, DDocKey}, From, State) -> Lang = couch_util:get_value(<<"language">>, Props, <<"javascript">>), try get_procs(State#state.tab, Lang) of + [] -> + spawn_link(?MODULE, new_proc, [From, Lang, DDoc, DDocKey]), + {noreply, State}; Procs -> case proc_with_ddoc(DDoc, DDocKey, Procs) of {ok, Proc0} -> + Client = element(1, From), Proc = Proc0#proc{client = erlang:monitor(process, Client)}, ets:insert(State#state.tab, Proc), {reply, {ok, Proc, get_query_server_config()}, State}; {error, Reason} -> {reply, {error, Reason}, State} end - catch {unknown_query_language, _} -> - {reply, {unknown_query_language, Lang}, State}; - error:Reason -> + catch error:Reason -> ?LOG_ERROR("~p ~p ~p", [?MODULE, Reason, erlang:get_stacktrace()]), {reply, {error, Reason}, State} end; -handle_call({get_proc, Lang}, {Client, _}, State) -> +handle_call({get_proc, Lang}, {Client, _} = From, State) -> try get_procs(State#state.tab, Lang) of + [] -> + spawn_link(?MODULE, new_proc, [From, Lang]), + {noreply, State}; [Proc0|_] -> Proc = Proc0#proc{client = erlang:monitor(process, Client)}, ets:insert(State#state.tab, Proc), {reply, {ok, Proc, get_query_server_config()}, State} - catch {unknown_query_language, _} -> - {reply, {unknown_query_language, Lang}, State}; - error:Reason -> + catch error:Reason -> ?LOG_ERROR("~p ~p ~p", [?MODULE, Reason, erlang:get_stacktrace()]), {reply, {error, Reason}, State} end; @@ -74,6 +77,13 @@ handle_call(_Call, _From, State) -> handle_cast(_Msg, State) -> {noreply, State}. +handle_info({'EXIT', _, {ok, Proc0, {Client,_} = From}}, State) -> + link(Proc0#proc.pid), + Proc = Proc0#proc{client = erlang:monitor(process, Client)}, + gen_server:reply(From, {ok, Proc, get_query_server_config()}), + ets:insert(State#state.tab, Proc), + {noreply, State}; + handle_info({'EXIT', Pid, Reason}, State) -> ?LOG_INFO("~p ~p died ~p", [?MODULE, Pid, Reason]), ets:delete(State#state.tab, Pid), @@ -115,20 +125,37 @@ maybe_reuse_proc(Tab, #proc{pid = Pid} = Proc) -> get_procs(Tab, Lang) when is_binary(Lang) -> get_procs(Tab, binary_to_list(Lang)); get_procs(Tab, Lang) when is_list(Lang) -> - case ets:match_object(Tab, #proc{lang=Lang, client=nil, _='_'}) of - [] -> - {ok, NewProc} = new_proc(Lang), % check OS process limit - [NewProc]; - Procs -> - Procs + ets:match_object(Tab, #proc{lang=Lang, client=nil, _='_'}). + +new_proc(From, Lang) -> + case new_proc_int(From, Lang) of + {ok, Proc} -> + exit({ok, Proc, From}); + Error -> + gen_server:reply(From, {error, Error}) + end. + +new_proc(From, Lang, DDoc, DDocKey) -> + case new_proc_int(From, Lang) of + {ok, NewProc} -> + case proc_with_ddoc(DDoc, DDocKey, [NewProc]) of + {ok, Proc} -> + exit({ok, Proc, From}); + {error, Reason} -> + gen_server:reply(From, {error, Reason}) + end; + Error -> + gen_server:reply(From, {error, Error}) end. -new_proc(Lang) when is_list(Lang) -> +new_proc_int(From, Lang) when is_binary(Lang) -> + new_proc_int(From, binary_to_list(Lang)); +new_proc_int(From, Lang) when is_list(Lang) -> case couch_config:get("query_servers", Lang) of undefined -> case couch_config:get("native_query_servers", Lang) of undefined -> - throw({unknown_query_language, Lang}); + gen_server:reply(From, {unknown_query_language, Lang}); SpecStr -> {ok, {M,F,A}} = couch_util:parse_term(SpecStr), {ok, Pid} = apply(M, F, A), @@ -147,6 +174,7 @@ make_proc(Pid, Lang, Mod) -> set_timeout_fun = {Mod, set_timeout}, stop_fun = {Mod, stop} }, + unlink(Pid), {ok, Proc}. get_query_server_config() -> -- cgit v1.2.3