diff options
Diffstat (limited to 'src/couchdb/couch_query_servers.erl')
-rw-r--r-- | src/couchdb/couch_query_servers.erl | 224 |
1 files changed, 158 insertions, 66 deletions
diff --git a/src/couchdb/couch_query_servers.erl b/src/couchdb/couch_query_servers.erl index 3df7beb0..6660ee74 100644 --- a/src/couchdb/couch_query_servers.erl +++ b/src/couchdb/couch_query_servers.erl @@ -35,6 +35,15 @@ stop_fun }). +-record(qserver, { + langs, % Keyed by language name, value is {Mod,Func,Arg} + pid_procs, % Keyed by PID, valus is a #proc record. + lang_procs, % Keyed by language name, value is a #proc record + lang_limits, % Keyed by language name, value is {Lang, Limit, Current} + waitlist = [], + config +}). + start_link() -> gen_server:start_link({local, couch_query_servers}, couch_query_servers, [], []). @@ -242,81 +251,97 @@ init([]) -> supervisor:terminate_child(couch_secondary_services, query_servers), [supervisor:restart_child(couch_secondary_services, query_servers)] end), + ok = couch_config:register( + fun("query_server_config" ++ _, _) -> + supervisor:terminate_child(couch_secondary_services, query_servers), + supervisor:restart_child(couch_secondary_services, query_servers) + end), Langs = ets:new(couch_query_server_langs, [set, private]), + LangLimits = ets:new(couch_query_server_lang_limits, [set, private]), PidProcs = ets:new(couch_query_server_pid_langs, [set, private]), LangProcs = ets:new(couch_query_server_procs, [set, private]), + + ProcTimeout = list_to_integer(couch_config:get( + "couchdb", "os_process_timeout", "5000")), + ReduceLimit = list_to_atom( + couch_config:get("query_server_config","reduce_limit","true")), + OsProcLimit = list_to_integer( + couch_config:get("query_server_config","os_process_limit","10")), + % 'query_servers' specifies an OS command-line to execute. lists:foreach(fun({Lang, Command}) -> + true = ets:insert(LangLimits, {?l2b(Lang), OsProcLimit, 0}), true = ets:insert(Langs, {?l2b(Lang), couch_os_process, start_link, [Command]}) end, couch_config:get("query_servers")), % 'native_query_servers' specifies a {Module, Func, Arg} tuple. lists:foreach(fun({Lang, SpecStr}) -> {ok, {Mod, Fun, SpecArg}} = couch_util:parse_term(SpecStr), + true = ets:insert(LangLimits, {?l2b(Lang), 0, 0}), % 0 means no limit true = ets:insert(Langs, {?l2b(Lang), Mod, Fun, SpecArg}) end, couch_config:get("native_query_servers")), - process_flag(trap_exit, true), - {ok, {Langs, % Keyed by language name, value is {Mod,Func,Arg} - PidProcs, % Keyed by PID, valus is a #proc record. - LangProcs % Keyed by language name, value is a #proc record - }}. -terminate(_Reason, {_Langs, PidProcs, _LangProcs}) -> + + process_flag(trap_exit, true), + {ok, #qserver{ + langs = Langs, % Keyed by language name, value is {Mod,Func,Arg} + pid_procs = PidProcs, % Keyed by PID, valus is a #proc record. + lang_procs = LangProcs, % Keyed by language name, value is a #proc record + lang_limits = LangLimits, % Keyed by language name, value is {Lang, Limit, Current} + config = {[{<<"reduce_limit">>, ReduceLimit},{<<"timeout">>, ProcTimeout}]} + }}. + +terminate(_Reason, #qserver{pid_procs=PidProcs}) -> [couch_util:shutdown_sync(P) || {P,_} <- ets:tab2list(PidProcs)], ok. -handle_call({get_proc, #doc{body={Props}}=DDoc, DDocKey}, _From, {Langs, PidProcs, LangProcs}=Server) -> - % Note to future self. Add max process limit. +handle_call({get_proc, #doc{body={Props}}=DDoc, DDocKey}, From, Server) -> Lang = couch_util:get_value(<<"language">>, Props, <<"javascript">>), - case ets:lookup(LangProcs, Lang) of - [{Lang, [P|Rest]}] -> - % find a proc in the set that has the DDoc - {ok, Proc} = proc_with_ddoc(DDoc, DDocKey, [P|Rest]), - rem_from_list(LangProcs, Lang, Proc), - {reply, {ok, Proc, get_query_server_config()}, Server}; - _ -> - case (catch new_process(Langs, Lang)) of - {ok, Proc} -> - add_value(PidProcs, Proc#proc.pid, Proc), - {ok, Proc2} = proc_with_ddoc(DDoc, DDocKey, [Proc]), - {reply, {ok, Proc2, get_query_server_config()}, Server}; - Error -> - {reply, Error, Server} - end + case lang_proc(Lang, Server, fun(Procs) -> + % find a proc in the set that has the DDoc + proc_with_ddoc(DDoc, DDocKey, Procs) + end) of + {ok, Proc} -> + {reply, {ok, Proc, Server#qserver.config}, Server}; + wait -> + {noreply, add_to_waitlist({DDoc, DDocKey}, From, Server)}; + Error -> + {reply, Error, Server} end; -handle_call({get_proc, Lang}, _From, {Langs, PidProcs, LangProcs}=Server) -> - % Note to future self. Add max process limit. - case ets:lookup(LangProcs, Lang) of - [{Lang, [Proc|_]}] -> - rem_from_list(LangProcs, Lang, Proc), - {reply, {ok, Proc, get_query_server_config()}, Server}; - _ -> - case (catch new_process(Langs, Lang)) of - {ok, Proc} -> - add_value(PidProcs, Proc#proc.pid, Proc), - {reply, {ok, Proc, get_query_server_config()}, Server}; - Error -> - {reply, Error, Server} - end +handle_call({get_proc, Lang}, From, Server) -> + case lang_proc(Lang, Server, fun([P|_Procs]) -> + {ok, P} + end) of + {ok, Proc} -> + {reply, {ok, Proc, Server#qserver.config}, Server}; + wait -> + {noreply, add_to_waitlist({Lang}, From, Server)}; + Error -> + {reply, Error, Server} end; -handle_call({unlink_proc, Pid}, _From, {_, PidProcs, _}=Server) -> +handle_call({unlink_proc, Pid}, _From, #qserver{pid_procs=PidProcs}=Server) -> rem_value(PidProcs, Pid), unlink(Pid), {reply, ok, Server}; -handle_call({ret_proc, Proc}, _From, {_, PidProcs, LangProcs}=Server) -> +handle_call({ret_proc, Proc}, _From, #qserver{ + pid_procs=PidProcs, + lang_procs=LangProcs}=Server) -> % Along with max process limit, here we should check % if we're over the limit and discard when we are. add_value(PidProcs, Proc#proc.pid, Proc), add_to_list(LangProcs, Proc#proc.lang, Proc), link(Proc#proc.pid), - {reply, true, Server}. + {reply, true, service_waitlist(Server)}. handle_cast(_Whatever, Server) -> {noreply, Server}. -handle_info({'EXIT', Pid, Status}, {_, PidProcs, LangProcs}=Server) -> +handle_info({'EXIT', Pid, Status}, #qserver{ + pid_procs=PidProcs, + lang_procs=LangProcs, + lang_limits=LangLimits}=Server) -> case ets:lookup(PidProcs, Pid) of [{Pid, Proc}] -> case Status of @@ -325,7 +350,9 @@ handle_info({'EXIT', Pid, Status}, {_, PidProcs, LangProcs}=Server) -> end, rem_value(PidProcs, Pid), catch rem_from_list(LangProcs, Proc#proc.lang, Proc), - {noreply, Server}; + [{Lang, Lim, Current}] = ets:lookup(LangLimits, Proc#proc.lang), + true = ets:insert(LangLimits, {Lang, Lim, Current-1}), + {noreply, service_waitlist(Server)}; [] -> case Status of normal -> @@ -340,23 +367,90 @@ code_change(_OldVsn, State, _Extra) -> % Private API -get_query_server_config() -> - ReduceLimit = list_to_atom( - couch_config:get("query_server_config","reduce_limit","true")), - {[{<<"reduce_limit">>, ReduceLimit}]}. - -new_process(Langs, Lang) -> - case ets:lookup(Langs, Lang) of - [{Lang, Mod, Func, Arg}] -> - {ok, Pid} = apply(Mod, Func, Arg), - {ok, #proc{lang=Lang, - pid=Pid, - % Called via proc_prompt, proc_set_timeout, and proc_stop - prompt_fun={Mod, prompt}, - set_timeout_fun={Mod, set_timeout}, - stop_fun={Mod, stop}}}; +add_to_waitlist(Info, From, #qserver{waitlist=Waitlist}=Server) -> + Server#qserver{waitlist=[{Info, From}|Waitlist]}. + +service_waitlist(#qserver{waitlist=[]}=Server) -> + Server; +service_waitlist(#qserver{waitlist=Waitlist}=Server) -> + [Oldest|RevWList] = lists:reverse(Waitlist), + case service_waiting(Oldest, Server) of + ok -> + Server#qserver{waitlist=lists:reverse(RevWList)}; + wait -> + Server#qserver{waitlist=Waitlist} + end. + +% todo get rid of duplication +service_waiting({{#doc{body={Props}}=DDoc, DDocKey}, From}, Server) -> + Lang = couch_util:get_value(<<"language">>, Props, <<"javascript">>), + case lang_proc(Lang, Server, fun(Procs) -> + % find a proc in the set that has the DDoc + proc_with_ddoc(DDoc, DDocKey, Procs) + end) of + {ok, Proc} -> + gen_server:reply(From, {ok, Proc, Server#qserver.config}), + ok; + wait -> % this should never happen + wait; + Error -> + gen_server:reply(From, Error), + ok + end; +service_waiting({{Lang}, From}, Server) -> + case lang_proc(Lang, Server, fun([P|Procs]) -> + {ok, P} + end) of + {ok, Proc} -> + gen_server:reply(From, {ok, Proc, Server#qserver.config}), + ok; + wait -> % this should never happen + wait; + Error -> + gen_server:reply(From, Error), + ok + end. + +lang_proc(Lang, #qserver{ + langs=Langs, + pid_procs=PidProcs, + lang_procs=LangProcs, + lang_limits=LangLimits}, PickFun) -> + % Note to future self. Add max process limit. + case ets:lookup(LangProcs, Lang) of + [{Lang, [P|Procs]}] -> + {ok, Proc} = PickFun([P|Procs]), + rem_from_list(LangProcs, Lang, Proc), + {ok, Proc}; _ -> - {unknown_query_language, Lang} + case (catch new_process(Langs, LangLimits, Lang)) of + {ok, Proc} -> + add_value(PidProcs, Proc#proc.pid, Proc), + {ok, Proc2} = PickFun([Proc]); + ErrorOrWait -> + ErrorOrWait + end + end. + +new_process(Langs, LangLimits, Lang) -> + [{Lang, Lim, Current}] = ets:lookup(LangLimits, Lang), + if (Lim == 0) or (Current < Lim) -> % Lim == 0 means no limit + % we are below the limit for our language, make a new one + case ets:lookup(Langs, Lang) of + [{Lang, Mod, Func, Arg}] -> + {ok, Pid} = apply(Mod, Func, Arg), + true = ets:insert(LangLimits, {Lang, Lim, Current+1}), + {ok, #proc{lang=Lang, + pid=Pid, + % Called via proc_prompt, proc_set_timeout, and proc_stop + prompt_fun={Mod, prompt}, + set_timeout_fun={Mod, set_timeout}, + stop_fun={Mod, stop}}}; + _ -> + {unknown_query_language, Lang} + end; + true -> + wait end. proc_with_ddoc(DDoc, DDocKey, LangProcs) -> @@ -402,12 +496,11 @@ teach_ddoc(DDoc, {DDocId, _Rev}=DDocKey, #proc{ddoc_keys=Keys}=Proc) -> get_ddoc_process(#doc{} = DDoc, DDocKey) -> % remove this case statement case gen_server:call(couch_query_servers, {get_proc, DDoc, DDocKey}) of - {ok, Proc, QueryConfig} -> + {ok, Proc, {QueryConfig}} -> % process knows the ddoc - case (catch proc_prompt(Proc, [<<"reset">>, QueryConfig])) of + case (catch proc_prompt(Proc, [<<"reset">>, {QueryConfig}])) of true -> - proc_set_timeout(Proc, list_to_integer(couch_config:get( - "couchdb", "os_process_timeout", "5000"))), + proc_set_timeout(Proc, couch_util:get_value(<<"timeout">>, QueryConfig)), link(Proc#proc.pid), gen_server:call(couch_query_servers, {unlink_proc, Proc#proc.pid}), Proc; @@ -421,11 +514,10 @@ get_ddoc_process(#doc{} = DDoc, DDocKey) -> get_os_process(Lang) -> case gen_server:call(couch_query_servers, {get_proc, Lang}) of - {ok, Proc, QueryConfig} -> - case (catch proc_prompt(Proc, [<<"reset">>, QueryConfig])) of + {ok, Proc, {QueryConfig}} -> + case (catch proc_prompt(Proc, [<<"reset">>, {QueryConfig}])) of true -> - proc_set_timeout(Proc, list_to_integer(couch_config:get( - "couchdb", "os_process_timeout", "5000"))), + proc_set_timeout(Proc, couch_util:get_value(<<"timeout">>, QueryConfig)), link(Proc#proc.pid), gen_server:call(couch_query_servers, {unlink_proc, Proc#proc.pid}), Proc; |