summaryrefslogtreecommitdiff
path: root/apps/couch/src/couch_query_servers.erl
diff options
context:
space:
mode:
authorAdam Kocoloski <adam@cloudant.com>2011-01-06 19:52:58 -0500
committerAdam Kocoloski <adam@cloudant.com>2011-01-06 19:52:58 -0500
commita5db6a8aedaf3e696b463023f2c6f7acf5ac5b57 (patch)
treedab6be2a9e2b7e097e446ebad6eb41b1a0e4e6f8 /apps/couch/src/couch_query_servers.erl
parent0ddca37246a5541b38a266809fb77cfeeeb174f7 (diff)
Refactor OS process management
Squashed commit of the following: commit a9cd9681f6c88f0f3c019e98e2edfef55cad0129 commit eb38bca08ffbf778b69fbb2d612e23733af82ff5 commit 98a03a079ab24f2c7bd9e0d6d7fac5fa62bfd4eb commit 9b8ec059165d981e4cd743008ecdf393a4f37f61 commit 3a891c1dd9a17fdd267c423b340dd09c31c89d7a commit 68351dd181c8a92b5baa9ac23f25c7c191484394 commit e4384a517e2efeac9231701898a6c67213642319 commit cd954661422d0ef146b5bd7792f835dcc4220c84 commit 3bcca92c7c0102d5722dfc6b2c332766cfe0370c commit 82d15f40f503b2609cf785ce2837e1280edaaa43 commit 70051abbd699e076452d772587c32ee5e09bdcbc commit 7f01d37781e7774015f6cb34f795b28db9ecc9f5 BugzID: 11572 See also COUCHDB-901 A new config setting is introduced. The following block controls the maximum number of OS processes that will be reused. Additional OS processes will still be spawned on-demand, but they'll be terminated when the clients are through with them. [query_server_config] os_process_soft_limit = 100
Diffstat (limited to 'apps/couch/src/couch_query_servers.erl')
-rw-r--r--apps/couch/src/couch_query_servers.erl214
1 files changed, 3 insertions, 211 deletions
diff --git a/apps/couch/src/couch_query_servers.erl b/apps/couch/src/couch_query_servers.erl
index a4850e86..4e86dcf4 100644
--- a/apps/couch/src/couch_query_servers.erl
+++ b/apps/couch/src/couch_query_servers.erl
@@ -11,11 +11,7 @@
% the License.
-module(couch_query_servers).
--behaviour(gen_server).
--export([start_link/0, config_change/1]).
-
--export([init/1, terminate/2, handle_call/3, handle_cast/2, handle_info/2,code_change/3]).
-export([start_doc_map/2, map_docs/2, stop_doc_map/1]).
-export([reduce/3, rereduce/3,validate_doc_update/5]).
-export([filter_docs/5]).
@@ -27,18 +23,6 @@
-include("couch_db.hrl").
--record(proc, {
- pid,
- lang,
- ddoc_keys = [],
- prompt_fun,
- set_timeout_fun,
- stop_fun
-}).
-
-start_link() ->
- gen_server:start_link({local, couch_query_servers}, couch_query_servers, [], []).
-
start_doc_map(Lang, Functions) ->
Proc = get_os_process(Lang),
lists:foreach(fun(FunctionSource) ->
@@ -223,151 +207,6 @@ with_ddoc_proc(#doc{id=DDocId,revs={Start, [DiskRev|_]}}=DDoc, Fun) ->
ok = ret_os_process(Proc)
end.
-init([]) ->
- % register async to avoid deadlock on restart_child
- Self = self(),
- spawn(couch_config, register, [fun ?MODULE:config_change/1, Self]),
-
- Langs = ets:new(couch_query_server_langs, [set, private]),
- PidProcs = ets:new(couch_query_server_pid_langs, [set, private]),
- LangProcs = ets:new(couch_query_server_procs, [set, private]),
- % 'query_servers' specifies an OS command-line to execute.
- lists:foreach(fun({Lang, Command}) ->
- true = ets:insert(Langs, {?l2b(Lang),
- couch_os_process, start_link, [Command]})
- end, couch_config:get("query_servers")),
- % 'native_query_servers' specifies a {Module, Func, Arg} tuple.
- lists:foreach(fun({Lang, SpecStr}) ->
- {ok, {Mod, Fun, SpecArg}} = couch_util:parse_term(SpecStr),
- true = ets:insert(Langs, {?l2b(Lang),
- Mod, Fun, SpecArg})
- end, couch_config:get("native_query_servers")),
- process_flag(trap_exit, true),
- {ok, {Langs, % Keyed by language name, value is {Mod,Func,Arg}
- PidProcs, % Keyed by PID, valus is a #proc record.
- LangProcs % Keyed by language name, value is a #proc record
- }}.
-
-terminate(_Reason, {_Langs, PidProcs, _LangProcs}) ->
- [couch_util:shutdown_sync(P) || {P,_} <- ets:tab2list(PidProcs)],
- ok.
-
-handle_call({get_proc, #doc{body={Props}}=DDoc, DDocKey}, _From, {Langs, PidProcs, LangProcs}=Server) ->
- % Note to future self. Add max process limit.
- Lang = couch_util:get_value(<<"language">>, Props, <<"javascript">>),
- case ets:lookup(LangProcs, Lang) of
- [{Lang, [P|Rest]}] ->
- % find a proc in the set that has the DDoc
- {ok, Proc} = proc_with_ddoc(DDoc, DDocKey, [P|Rest]),
- rem_from_list(LangProcs, Lang, Proc),
- {reply, {ok, Proc, get_query_server_config()}, Server};
- _ ->
- case (catch new_process(Langs, Lang)) of
- {ok, Proc} ->
- add_value(PidProcs, Proc#proc.pid, Proc),
- {ok, Proc2} = proc_with_ddoc(DDoc, DDocKey, [Proc]),
- {reply, {ok, Proc2, get_query_server_config()}, Server};
- Error ->
- {reply, Error, Server}
- end
- end;
-handle_call({get_proc, Lang}, _From, {Langs, PidProcs, LangProcs}=Server) ->
- % Note to future self. Add max process limit.
- case ets:lookup(LangProcs, Lang) of
- [{Lang, [Proc|_]}] ->
- rem_from_list(LangProcs, Lang, Proc),
- {reply, {ok, Proc, get_query_server_config()}, Server};
- _ ->
- case (catch new_process(Langs, Lang)) of
- {ok, Proc} ->
- add_value(PidProcs, Proc#proc.pid, Proc),
- {reply, {ok, Proc, get_query_server_config()}, Server};
- Error ->
- {reply, Error, Server}
- end
- end;
-handle_call({unlink_proc, Pid}, _From, {_, PidProcs, _}=Server) ->
- rem_value(PidProcs, Pid),
- unlink(Pid),
- {reply, ok, Server};
-handle_call({ret_proc, Proc}, _From, {_, PidProcs, LangProcs}=Server) ->
- % Along with max process limit, here we should check
- % if we're over the limit and discard when we are.
- add_value(PidProcs, Proc#proc.pid, Proc),
- add_to_list(LangProcs, Proc#proc.lang, Proc),
- link(Proc#proc.pid),
- {reply, true, Server}.
-
-handle_cast(_Whatever, Server) ->
- {noreply, Server}.
-
-handle_info({'EXIT', Pid, Status}, {_, PidProcs, LangProcs}=Server) ->
- case ets:lookup(PidProcs, Pid) of
- [{Pid, Proc}] ->
- case Status of
- normal -> ok;
- _ -> ?LOG_DEBUG("Linked process died abnormally: ~p (reason: ~p)", [Pid, Status])
- end,
- rem_value(PidProcs, Pid),
- catch rem_from_list(LangProcs, Proc#proc.lang, Proc),
- {noreply, Server};
- [] ->
- case Status of
- normal ->
- {noreply, Server};
- _ ->
- {stop, Status, Server}
- end
- end.
-
-code_change(_OldVsn, State, _Extra) ->
- {ok, State}.
-
-config_change("query_servers") ->
- supervisor:terminate_child(couch_secondary_services, query_servers),
- supervisor:restart_child(couch_secondary_services, query_servers);
-config_change("native_query_servers") ->
- supervisor:terminate_child(couch_secondary_services, query_servers),
- supervisor:restart_child(couch_secondary_services, query_servers).
-
-% Private API
-
-get_query_server_config() ->
- ReduceLimit = list_to_atom(
- couch_config:get("query_server_config","reduce_limit","true")),
- {[{<<"reduce_limit">>, ReduceLimit}]}.
-
-new_process(Langs, Lang) ->
- case ets:lookup(Langs, Lang) of
- [{Lang, Mod, Func, Arg}] ->
- {ok, Pid} = apply(Mod, Func, Arg),
- {ok, #proc{lang=Lang,
- pid=Pid,
- % Called via proc_prompt, proc_set_timeout, and proc_stop
- prompt_fun={Mod, prompt},
- set_timeout_fun={Mod, set_timeout},
- stop_fun={Mod, stop}}};
- _ ->
- {unknown_query_language, Lang}
- end.
-
-proc_with_ddoc(DDoc, DDocKey, LangProcs) ->
- DDocProcs = lists:filter(fun(#proc{ddoc_keys=Keys}) ->
- lists:any(fun(Key) ->
- Key == DDocKey
- end, Keys)
- end, LangProcs),
- case DDocProcs of
- [DDocProc|_] ->
- ?LOG_DEBUG("DDocProc found for DDocKey: ~p",[DDocKey]),
- {ok, DDocProc};
- [] ->
- [TeachProc|_] = LangProcs,
- ?LOG_DEBUG("Teach ddoc to new proc ~p with DDocKey: ~p",[TeachProc, DDocKey]),
- {ok, SmartProc} = teach_ddoc(DDoc, DDocKey, TeachProc),
- {ok, SmartProc}
- end.
-
proc_prompt(Proc, Args) ->
{Mod, Func} = Proc#proc.prompt_fun,
apply(Mod, Func, [Proc#proc.pid, Args]).
@@ -380,28 +219,15 @@ proc_set_timeout(Proc, Timeout) ->
{Mod, Func} = Proc#proc.set_timeout_fun,
apply(Mod, Func, [Proc#proc.pid, Timeout]).
-teach_ddoc(DDoc, {DDocId, _Rev}=DDocKey, #proc{ddoc_keys=Keys}=Proc) ->
- % send ddoc over the wire
- % we only share the rev with the client we know to update code
- % but it only keeps the latest copy, per each ddoc, around.
- true = proc_prompt(Proc, [<<"ddoc">>, <<"new">>, DDocId, couch_doc:to_json_obj(DDoc, [])]),
- % we should remove any other ddocs keys for this docid
- % because the query server overwrites without the rev
- Keys2 = [{D,R} || {D,R} <- Keys, D /= DDocId],
- % add ddoc to the proc
- {ok, Proc#proc{ddoc_keys=[DDocKey|Keys2]}}.
-
get_ddoc_process(#doc{} = DDoc, DDocKey) ->
% remove this case statement
- case gen_server:call(couch_query_servers, {get_proc, DDoc, DDocKey}) of
+ case gen_server:call(couch_proc_manager, {get_proc, DDoc, DDocKey}) of
{ok, Proc, QueryConfig} ->
% process knows the ddoc
case (catch proc_prompt(Proc, [<<"reset">>, QueryConfig])) of
true ->
proc_set_timeout(Proc, list_to_integer(couch_config:get(
"couchdb", "os_process_timeout", "5000"))),
- link(Proc#proc.pid),
- gen_server:call(couch_query_servers, {unlink_proc, Proc#proc.pid}),
Proc;
_ ->
catch proc_stop(Proc),
@@ -412,14 +238,12 @@ get_ddoc_process(#doc{} = DDoc, DDocKey) ->
end.
get_os_process(Lang) ->
- case gen_server:call(couch_query_servers, {get_proc, Lang}) of
+ case gen_server:call(couch_proc_manager, {get_proc, Lang}) of
{ok, Proc, QueryConfig} ->
case (catch proc_prompt(Proc, [<<"reset">>, QueryConfig])) of
true ->
proc_set_timeout(Proc, list_to_integer(couch_config:get(
"couchdb", "os_process_timeout", "5000"))),
- link(Proc#proc.pid),
- gen_server:call(couch_query_servers, {unlink_proc, Proc#proc.pid}),
Proc;
_ ->
catch proc_stop(Proc),
@@ -430,38 +254,6 @@ get_os_process(Lang) ->
end.
ret_os_process(Proc) ->
- true = gen_server:call(couch_query_servers, {ret_proc, Proc}),
+ true = gen_server:call(couch_proc_manager, {ret_proc, Proc}),
catch unlink(Proc#proc.pid),
ok.
-
-add_value(Tid, Key, Value) ->
- true = ets:insert(Tid, {Key, Value}).
-
-rem_value(Tid, Key) ->
- true = ets:delete(Tid, Key).
-
-add_to_list(Tid, Key, Value) ->
- case ets:lookup(Tid, Key) of
- [{Key, Vals}] ->
- true = ets:insert(Tid, {Key, [Value|Vals]});
- [] ->
- true = ets:insert(Tid, {Key, [Value]})
- end.
-
-rem_from_list(Tid, Key, Value) when is_record(Value, proc)->
- Pid = Value#proc.pid,
- case ets:lookup(Tid, Key) of
- [{Key, Vals}] ->
- % make a new values list that doesn't include the Value arg
- NewValues = [Val || #proc{pid=P}=Val <- Vals, P /= Pid],
- ets:insert(Tid, {Key, NewValues});
- [] -> ok
- end;
-rem_from_list(Tid, Key, Value) ->
- case ets:lookup(Tid, Key) of
- [{Key, Vals}] ->
- % make a new values list that doesn't include the Value arg
- NewValues = [Val || Val <- Vals, Val /= Value],
- ets:insert(Tid, {Key, NewValues});
- [] -> ok
- end.