summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAdam Kocoloski <adam@cloudant.com>2011-12-02 16:19:44 -0500
committerRobert Newson <robert.newson@cloudant.com>2012-11-14 17:24:21 +0000
commit90c656fd3097272d1e574da3f5944d60b476e418 (patch)
tree42fb22027e35025d0d89a7102ff6fb27ca2fbc71
parent7bd071836a08722cb14479261507aec1d071e2d2 (diff)
Fork new OS processes outside proc_manager loop
-rw-r--r--apps/couch/src/couch_proc_manager.erl62
1 files changed, 45 insertions, 17 deletions
diff --git a/apps/couch/src/couch_proc_manager.erl b/apps/couch/src/couch_proc_manager.erl
index 509da9ba..0f35422b 100644
--- a/apps/couch/src/couch_proc_manager.erl
+++ b/apps/couch/src/couch_proc_manager.erl
@@ -3,7 +3,7 @@
-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2,
code_change/3]).
--export([start_link/0, get_proc_count/0]).
+-export([start_link/0, get_proc_count/0, new_proc/2, new_proc/4]).
-include("couch_db.hrl").
@@ -25,34 +25,37 @@ handle_call(get_table, _From, State) ->
handle_call(get_proc_count, _From, State) ->
{reply, ets:info(State#state.tab, size), State};
-handle_call({get_proc, #doc{body={Props}}=DDoc, DDocKey}, {Client, _}, State) ->
+handle_call({get_proc, #doc{body={Props}}=DDoc, DDocKey}, From, State) ->
Lang = couch_util:get_value(<<"language">>, Props, <<"javascript">>),
try get_procs(State#state.tab, Lang) of
+ [] ->
+ spawn_link(?MODULE, new_proc, [From, Lang, DDoc, DDocKey]),
+ {noreply, State};
Procs ->
case proc_with_ddoc(DDoc, DDocKey, Procs) of
{ok, Proc0} ->
+ Client = element(1, From),
Proc = Proc0#proc{client = erlang:monitor(process, Client)},
ets:insert(State#state.tab, Proc),
{reply, {ok, Proc, get_query_server_config()}, State};
{error, Reason} ->
{reply, {error, Reason}, State}
end
- catch {unknown_query_language, _} ->
- {reply, {unknown_query_language, Lang}, State};
- error:Reason ->
+ catch error:Reason ->
?LOG_ERROR("~p ~p ~p", [?MODULE, Reason, erlang:get_stacktrace()]),
{reply, {error, Reason}, State}
end;
-handle_call({get_proc, Lang}, {Client, _}, State) ->
+handle_call({get_proc, Lang}, {Client, _} = From, State) ->
try get_procs(State#state.tab, Lang) of
+ [] ->
+ spawn_link(?MODULE, new_proc, [From, Lang]),
+ {noreply, State};
[Proc0|_] ->
Proc = Proc0#proc{client = erlang:monitor(process, Client)},
ets:insert(State#state.tab, Proc),
{reply, {ok, Proc, get_query_server_config()}, State}
- catch {unknown_query_language, _} ->
- {reply, {unknown_query_language, Lang}, State};
- error:Reason ->
+ catch error:Reason ->
?LOG_ERROR("~p ~p ~p", [?MODULE, Reason, erlang:get_stacktrace()]),
{reply, {error, Reason}, State}
end;
@@ -74,6 +77,13 @@ handle_call(_Call, _From, State) ->
handle_cast(_Msg, State) ->
{noreply, State}.
+handle_info({'EXIT', _, {ok, Proc0, {Client,_} = From}}, State) ->
+ link(Proc0#proc.pid),
+ Proc = Proc0#proc{client = erlang:monitor(process, Client)},
+ gen_server:reply(From, {ok, Proc, get_query_server_config()}),
+ ets:insert(State#state.tab, Proc),
+ {noreply, State};
+
handle_info({'EXIT', Pid, Reason}, State) ->
?LOG_INFO("~p ~p died ~p", [?MODULE, Pid, Reason]),
ets:delete(State#state.tab, Pid),
@@ -115,20 +125,37 @@ maybe_reuse_proc(Tab, #proc{pid = Pid} = Proc) ->
get_procs(Tab, Lang) when is_binary(Lang) ->
get_procs(Tab, binary_to_list(Lang));
get_procs(Tab, Lang) when is_list(Lang) ->
- case ets:match_object(Tab, #proc{lang=Lang, client=nil, _='_'}) of
- [] ->
- {ok, NewProc} = new_proc(Lang), % check OS process limit
- [NewProc];
- Procs ->
- Procs
+ ets:match_object(Tab, #proc{lang=Lang, client=nil, _='_'}).
+
+new_proc(From, Lang) ->
+ case new_proc_int(From, Lang) of
+ {ok, Proc} ->
+ exit({ok, Proc, From});
+ Error ->
+ gen_server:reply(From, {error, Error})
+ end.
+
+new_proc(From, Lang, DDoc, DDocKey) ->
+ case new_proc_int(From, Lang) of
+ {ok, NewProc} ->
+ case proc_with_ddoc(DDoc, DDocKey, [NewProc]) of
+ {ok, Proc} ->
+ exit({ok, Proc, From});
+ {error, Reason} ->
+ gen_server:reply(From, {error, Reason})
+ end;
+ Error ->
+ gen_server:reply(From, {error, Error})
end.
-new_proc(Lang) when is_list(Lang) ->
+new_proc_int(From, Lang) when is_binary(Lang) ->
+ new_proc_int(From, binary_to_list(Lang));
+new_proc_int(From, Lang) when is_list(Lang) ->
case couch_config:get("query_servers", Lang) of
undefined ->
case couch_config:get("native_query_servers", Lang) of
undefined ->
- throw({unknown_query_language, Lang});
+ gen_server:reply(From, {unknown_query_language, Lang});
SpecStr ->
{ok, {M,F,A}} = couch_util:parse_term(SpecStr),
{ok, Pid} = apply(M, F, A),
@@ -147,6 +174,7 @@ make_proc(Pid, Lang, Mod) ->
set_timeout_fun = {Mod, set_timeout},
stop_fun = {Mod, stop}
},
+ unlink(Pid),
{ok, Proc}.
get_query_server_config() ->