summaryrefslogtreecommitdiff
path: root/apps/couch/src/couch_proc_manager.erl
diff options
context:
space:
mode:
authorAdam Kocoloski <adam@cloudant.com>2011-01-06 19:52:58 -0500
committerAdam Kocoloski <adam@cloudant.com>2011-01-06 19:52:58 -0500
commita5db6a8aedaf3e696b463023f2c6f7acf5ac5b57 (patch)
treedab6be2a9e2b7e097e446ebad6eb41b1a0e4e6f8 /apps/couch/src/couch_proc_manager.erl
parent0ddca37246a5541b38a266809fb77cfeeeb174f7 (diff)
Refactor OS process management
Squashed commit of the following: commit a9cd9681f6c88f0f3c019e98e2edfef55cad0129 commit eb38bca08ffbf778b69fbb2d612e23733af82ff5 commit 98a03a079ab24f2c7bd9e0d6d7fac5fa62bfd4eb commit 9b8ec059165d981e4cd743008ecdf393a4f37f61 commit 3a891c1dd9a17fdd267c423b340dd09c31c89d7a commit 68351dd181c8a92b5baa9ac23f25c7c191484394 commit e4384a517e2efeac9231701898a6c67213642319 commit cd954661422d0ef146b5bd7792f835dcc4220c84 commit 3bcca92c7c0102d5722dfc6b2c332766cfe0370c commit 82d15f40f503b2609cf785ce2837e1280edaaa43 commit 70051abbd699e076452d772587c32ee5e09bdcbc commit 7f01d37781e7774015f6cb34f795b28db9ecc9f5 BugzID: 11572 See also COUCHDB-901 A new config setting is introduced. The following block controls the maximum number of OS processes that will be reused. Additional OS processes will still be spawned on-demand, but they'll be terminated when the clients are through with them. [query_server_config] os_process_soft_limit = 100
Diffstat (limited to 'apps/couch/src/couch_proc_manager.erl')
-rw-r--r--apps/couch/src/couch_proc_manager.erl186
1 files changed, 186 insertions, 0 deletions
diff --git a/apps/couch/src/couch_proc_manager.erl b/apps/couch/src/couch_proc_manager.erl
new file mode 100644
index 00000000..ca2e8ac5
--- /dev/null
+++ b/apps/couch/src/couch_proc_manager.erl
@@ -0,0 +1,186 @@
+-module(couch_proc_manager).
+-behaviour(gen_server).
+-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2,
+ code_change/3]).
+
+-export([start_link/0, get_proc_count/0]).
+
+-include("couch_db.hrl").
+
+-record(state, {tab}).
+
+start_link() ->
+ gen_server:start_link({local, ?MODULE}, ?MODULE, [], []).
+
+get_proc_count() ->
+ gen_server:call(?MODULE, get_proc_count).
+
+init([]) ->
+ process_flag(trap_exit, true),
+ {ok, #state{tab = ets:new(procs, [{keypos, #proc.pid}])}}.
+
+handle_call(get_table, _From, State) ->
+ {reply, State#state.tab, State};
+
+handle_call(get_proc_count, _From, State) ->
+ {reply, ets:info(State#state.tab, size), State};
+
+handle_call({get_proc, #doc{body={Props}}=DDoc, DDocKey}, {Client, _}, State) ->
+ Lang = couch_util:get_value(<<"language">>, Props, <<"javascript">>),
+ try get_procs(State#state.tab, Lang) of
+ Procs ->
+ case proc_with_ddoc(DDoc, DDocKey, Procs) of
+ {ok, Proc0} ->
+ Proc = Proc0#proc{client = erlang:monitor(process, Client)},
+ ets:insert(State#state.tab, Proc),
+ {reply, {ok, Proc, get_query_server_config()}, State};
+ {error, Reason} ->
+ {reply, {error, Reason}, State}
+ end
+ catch {unknown_query_language, _} ->
+ {reply, {unknown_query_language, Lang}, State};
+ error:Reason ->
+ ?LOG_ERROR("~p ~p ~p", [?MODULE, Reason, erlang:get_stacktrace()]),
+ {reply, {error, Reason}, State}
+ end;
+
+handle_call({get_proc, Lang}, {Client, _}, State) ->
+ try get_procs(State#state.tab, Lang) of
+ [Proc0|_] ->
+ Proc = Proc0#proc{client = erlang:monitor(process, Client)},
+ ets:insert(State#state.tab, Proc),
+ {reply, {ok, Proc, get_query_server_config()}, State}
+ catch {unknown_query_language, _} ->
+ {reply, {unknown_query_language, Lang}, State};
+ error:Reason ->
+ ?LOG_ERROR("~p ~p ~p", [?MODULE, Reason, erlang:get_stacktrace()]),
+ {reply, {error, Reason}, State}
+ end;
+
+handle_call({ret_proc, #proc{client=Ref, pid=Pid} = Proc}, _From, State) ->
+ erlang:demonitor(Ref, [flush]),
+ % We need to check if the process is alive here, as the client could be
+ % handing us a #proc{} with a dead one. We would have already removed the
+ % #proc{} from our own table, so the alternative is to do a lookup in the
+ % table before the insert. Don't know which approach is cheaper.
+ case is_process_alive(Pid) of true ->
+ maybe_reuse_proc(State#state.tab, Proc);
+ false -> ok end,
+ {reply, true, State};
+
+handle_call(_Call, _From, State) ->
+ {reply, ignored, State}.
+
+handle_cast(_Msg, State) ->
+ {noreply, State}.
+
+handle_info({'EXIT', Pid, Reason}, State) ->
+ ?LOG_INFO("~p ~p died ~p", [?MODULE, Pid, Reason]),
+ ets:delete(State#state.tab, Pid),
+ {noreply, State};
+
+handle_info({'DOWN', Ref, _, _, _Reason}, State) ->
+ case ets:match_object(State#state.tab, #proc{client=Ref, _='_'}) of
+ [] ->
+ ok;
+ [#proc{pid = Pid} = Proc] ->
+ case is_process_alive(Pid) of true ->
+ maybe_reuse_proc(State#state.tab, Proc);
+ false -> ok end
+ end,
+ {noreply, State};
+
+handle_info(_Msg, State) ->
+ {noreply, State}.
+
+terminate(_Reason, #state{tab=Tab}) ->
+ ets:foldl(fun(#proc{pid=P}, _) -> couch_util:shutdown_sync(P) end, 0, Tab),
+ ok.
+
+code_change(_OldVsn, State, _Extra) ->
+ {ok, State}.
+
+maybe_reuse_proc(Tab, #proc{pid = Pid} = Proc) ->
+ Limit = couch_config:get("query_server_config", "os_process_soft_limit", "100"),
+ case ets:info(Tab, size) > list_to_integer(Limit) of
+ true ->
+ ets:delete(Tab, Pid),
+ unlink(Pid),
+ exit(Pid, kill);
+ false ->
+ garbage_collect(Pid),
+ ets:insert(Tab, Proc#proc{client=nil})
+ end.
+
+get_procs(Tab, Lang) when is_binary(Lang) ->
+ get_procs(Tab, binary_to_list(Lang));
+get_procs(Tab, Lang) when is_list(Lang) ->
+ case ets:match_object(Tab, #proc{lang=Lang, client=nil, _='_'}) of
+ [] ->
+ {ok, NewProc} = new_proc(Lang), % check OS process limit
+ [NewProc];
+ Procs ->
+ Procs
+ end.
+
+new_proc(Lang) when is_binary(Lang) ->
+ new_proc(binary_to_list(Lang));
+new_proc(Lang) when is_list(Lang) ->
+ case couch_config:get("query_servers", Lang) of
+ undefined ->
+ case couch_config:get("native_query_servers", Lang) of
+ undefined ->
+ throw({unknown_query_language, Lang});
+ SpecStr ->
+ {ok, {M,F,A}} = couch_util:parse_term(SpecStr),
+ {ok, Pid} = apply(M, F, A),
+ make_proc(Pid, Lang, M)
+ end;
+ Command ->
+ {ok, Pid} = couch_os_process:start_link(Command),
+ make_proc(Pid, Lang, couch_os_process)
+ end.
+
+make_proc(Pid, Lang, Mod) ->
+ Proc = #proc{
+ lang = Lang,
+ pid = Pid,
+ prompt_fun = {Mod, prompt},
+ set_timeout_fun = {Mod, set_timeout},
+ stop_fun = {Mod, stop}
+ },
+ {ok, Proc}.
+
+get_query_server_config() ->
+ Limit = couch_config:get("query_server_config", "reduce_limit", "true"),
+ {[{<<"reduce_limit">>, list_to_atom(Limit)}]}.
+
+proc_with_ddoc(DDoc, DDocKey, Procs) ->
+ Filter = fun(#proc{ddoc_keys=Keys}) -> not lists:member(DDocKey, Keys) end,
+ case lists:dropwhile(Filter, Procs) of
+ [DDocProc|_] ->
+ {ok, DDocProc};
+ [] ->
+ teach_any_proc(DDoc, DDocKey, Procs)
+ end.
+
+teach_any_proc(DDoc, DDocKey, [Proc|Rest]) ->
+ try
+ teach_ddoc(DDoc, DDocKey, Proc)
+ catch _:_ ->
+ teach_any_proc(DDoc, DDocKey, Rest)
+ end;
+teach_any_proc(_, _, []) ->
+ {error, noproc}.
+
+teach_ddoc(DDoc, {DDocId, _Rev}=DDocKey, #proc{ddoc_keys=Keys}=Proc) ->
+ % send ddoc over the wire
+ % we only share the rev with the client we know to update code
+ % but it only keeps the latest copy, per each ddoc, around.
+ true = couch_query_servers:proc_prompt(Proc, [<<"ddoc">>, <<"new">>,
+ DDocId, couch_doc:to_json_obj(DDoc, [])]),
+ % we should remove any other ddocs keys for this docid
+ % because the query server overwrites without the rev
+ Keys2 = [{D,R} || {D,R} <- Keys, D /= DDocId],
+ % add ddoc to the proc
+ {ok, Proc#proc{ddoc_keys=[DDocKey|Keys2]}}.