summaryrefslogtreecommitdiff
path: root/src/couchdb/couch_query_servers.erl
diff options
context:
space:
mode:
Diffstat (limited to 'src/couchdb/couch_query_servers.erl')
-rw-r--r--src/couchdb/couch_query_servers.erl224
1 files changed, 158 insertions, 66 deletions
diff --git a/src/couchdb/couch_query_servers.erl b/src/couchdb/couch_query_servers.erl
index 3df7beb0..6660ee74 100644
--- a/src/couchdb/couch_query_servers.erl
+++ b/src/couchdb/couch_query_servers.erl
@@ -35,6 +35,15 @@
stop_fun
}).
+-record(qserver, {
+ langs, % Keyed by language name, value is {Mod,Func,Arg}
+ pid_procs, % Keyed by PID, valus is a #proc record.
+ lang_procs, % Keyed by language name, value is a #proc record
+ lang_limits, % Keyed by language name, value is {Lang, Limit, Current}
+ waitlist = [],
+ config
+}).
+
start_link() ->
gen_server:start_link({local, couch_query_servers}, couch_query_servers, [], []).
@@ -242,81 +251,97 @@ init([]) ->
supervisor:terminate_child(couch_secondary_services, query_servers),
[supervisor:restart_child(couch_secondary_services, query_servers)]
end),
+ ok = couch_config:register(
+ fun("query_server_config" ++ _, _) ->
+ supervisor:terminate_child(couch_secondary_services, query_servers),
+ supervisor:restart_child(couch_secondary_services, query_servers)
+ end),
Langs = ets:new(couch_query_server_langs, [set, private]),
+ LangLimits = ets:new(couch_query_server_lang_limits, [set, private]),
PidProcs = ets:new(couch_query_server_pid_langs, [set, private]),
LangProcs = ets:new(couch_query_server_procs, [set, private]),
+
+ ProcTimeout = list_to_integer(couch_config:get(
+ "couchdb", "os_process_timeout", "5000")),
+ ReduceLimit = list_to_atom(
+ couch_config:get("query_server_config","reduce_limit","true")),
+ OsProcLimit = list_to_integer(
+ couch_config:get("query_server_config","os_process_limit","10")),
+
% 'query_servers' specifies an OS command-line to execute.
lists:foreach(fun({Lang, Command}) ->
+ true = ets:insert(LangLimits, {?l2b(Lang), OsProcLimit, 0}),
true = ets:insert(Langs, {?l2b(Lang),
couch_os_process, start_link, [Command]})
end, couch_config:get("query_servers")),
% 'native_query_servers' specifies a {Module, Func, Arg} tuple.
lists:foreach(fun({Lang, SpecStr}) ->
{ok, {Mod, Fun, SpecArg}} = couch_util:parse_term(SpecStr),
+ true = ets:insert(LangLimits, {?l2b(Lang), 0, 0}), % 0 means no limit
true = ets:insert(Langs, {?l2b(Lang),
Mod, Fun, SpecArg})
end, couch_config:get("native_query_servers")),
- process_flag(trap_exit, true),
- {ok, {Langs, % Keyed by language name, value is {Mod,Func,Arg}
- PidProcs, % Keyed by PID, valus is a #proc record.
- LangProcs % Keyed by language name, value is a #proc record
- }}.
-terminate(_Reason, {_Langs, PidProcs, _LangProcs}) ->
+
+ process_flag(trap_exit, true),
+ {ok, #qserver{
+ langs = Langs, % Keyed by language name, value is {Mod,Func,Arg}
+ pid_procs = PidProcs, % Keyed by PID, valus is a #proc record.
+ lang_procs = LangProcs, % Keyed by language name, value is a #proc record
+ lang_limits = LangLimits, % Keyed by language name, value is {Lang, Limit, Current}
+ config = {[{<<"reduce_limit">>, ReduceLimit},{<<"timeout">>, ProcTimeout}]}
+ }}.
+
+terminate(_Reason, #qserver{pid_procs=PidProcs}) ->
[couch_util:shutdown_sync(P) || {P,_} <- ets:tab2list(PidProcs)],
ok.
-handle_call({get_proc, #doc{body={Props}}=DDoc, DDocKey}, _From, {Langs, PidProcs, LangProcs}=Server) ->
- % Note to future self. Add max process limit.
+handle_call({get_proc, #doc{body={Props}}=DDoc, DDocKey}, From, Server) ->
Lang = couch_util:get_value(<<"language">>, Props, <<"javascript">>),
- case ets:lookup(LangProcs, Lang) of
- [{Lang, [P|Rest]}] ->
- % find a proc in the set that has the DDoc
- {ok, Proc} = proc_with_ddoc(DDoc, DDocKey, [P|Rest]),
- rem_from_list(LangProcs, Lang, Proc),
- {reply, {ok, Proc, get_query_server_config()}, Server};
- _ ->
- case (catch new_process(Langs, Lang)) of
- {ok, Proc} ->
- add_value(PidProcs, Proc#proc.pid, Proc),
- {ok, Proc2} = proc_with_ddoc(DDoc, DDocKey, [Proc]),
- {reply, {ok, Proc2, get_query_server_config()}, Server};
- Error ->
- {reply, Error, Server}
- end
+ case lang_proc(Lang, Server, fun(Procs) ->
+ % find a proc in the set that has the DDoc
+ proc_with_ddoc(DDoc, DDocKey, Procs)
+ end) of
+ {ok, Proc} ->
+ {reply, {ok, Proc, Server#qserver.config}, Server};
+ wait ->
+ {noreply, add_to_waitlist({DDoc, DDocKey}, From, Server)};
+ Error ->
+ {reply, Error, Server}
end;
-handle_call({get_proc, Lang}, _From, {Langs, PidProcs, LangProcs}=Server) ->
- % Note to future self. Add max process limit.
- case ets:lookup(LangProcs, Lang) of
- [{Lang, [Proc|_]}] ->
- rem_from_list(LangProcs, Lang, Proc),
- {reply, {ok, Proc, get_query_server_config()}, Server};
- _ ->
- case (catch new_process(Langs, Lang)) of
- {ok, Proc} ->
- add_value(PidProcs, Proc#proc.pid, Proc),
- {reply, {ok, Proc, get_query_server_config()}, Server};
- Error ->
- {reply, Error, Server}
- end
+handle_call({get_proc, Lang}, From, Server) ->
+ case lang_proc(Lang, Server, fun([P|_Procs]) ->
+ {ok, P}
+ end) of
+ {ok, Proc} ->
+ {reply, {ok, Proc, Server#qserver.config}, Server};
+ wait ->
+ {noreply, add_to_waitlist({Lang}, From, Server)};
+ Error ->
+ {reply, Error, Server}
end;
-handle_call({unlink_proc, Pid}, _From, {_, PidProcs, _}=Server) ->
+handle_call({unlink_proc, Pid}, _From, #qserver{pid_procs=PidProcs}=Server) ->
rem_value(PidProcs, Pid),
unlink(Pid),
{reply, ok, Server};
-handle_call({ret_proc, Proc}, _From, {_, PidProcs, LangProcs}=Server) ->
+handle_call({ret_proc, Proc}, _From, #qserver{
+ pid_procs=PidProcs,
+ lang_procs=LangProcs}=Server) ->
% Along with max process limit, here we should check
% if we're over the limit and discard when we are.
add_value(PidProcs, Proc#proc.pid, Proc),
add_to_list(LangProcs, Proc#proc.lang, Proc),
link(Proc#proc.pid),
- {reply, true, Server}.
+ {reply, true, service_waitlist(Server)}.
handle_cast(_Whatever, Server) ->
{noreply, Server}.
-handle_info({'EXIT', Pid, Status}, {_, PidProcs, LangProcs}=Server) ->
+handle_info({'EXIT', Pid, Status}, #qserver{
+ pid_procs=PidProcs,
+ lang_procs=LangProcs,
+ lang_limits=LangLimits}=Server) ->
case ets:lookup(PidProcs, Pid) of
[{Pid, Proc}] ->
case Status of
@@ -325,7 +350,9 @@ handle_info({'EXIT', Pid, Status}, {_, PidProcs, LangProcs}=Server) ->
end,
rem_value(PidProcs, Pid),
catch rem_from_list(LangProcs, Proc#proc.lang, Proc),
- {noreply, Server};
+ [{Lang, Lim, Current}] = ets:lookup(LangLimits, Proc#proc.lang),
+ true = ets:insert(LangLimits, {Lang, Lim, Current-1}),
+ {noreply, service_waitlist(Server)};
[] ->
case Status of
normal ->
@@ -340,23 +367,90 @@ code_change(_OldVsn, State, _Extra) ->
% Private API
-get_query_server_config() ->
- ReduceLimit = list_to_atom(
- couch_config:get("query_server_config","reduce_limit","true")),
- {[{<<"reduce_limit">>, ReduceLimit}]}.
-
-new_process(Langs, Lang) ->
- case ets:lookup(Langs, Lang) of
- [{Lang, Mod, Func, Arg}] ->
- {ok, Pid} = apply(Mod, Func, Arg),
- {ok, #proc{lang=Lang,
- pid=Pid,
- % Called via proc_prompt, proc_set_timeout, and proc_stop
- prompt_fun={Mod, prompt},
- set_timeout_fun={Mod, set_timeout},
- stop_fun={Mod, stop}}};
+add_to_waitlist(Info, From, #qserver{waitlist=Waitlist}=Server) ->
+ Server#qserver{waitlist=[{Info, From}|Waitlist]}.
+
+service_waitlist(#qserver{waitlist=[]}=Server) ->
+ Server;
+service_waitlist(#qserver{waitlist=Waitlist}=Server) ->
+ [Oldest|RevWList] = lists:reverse(Waitlist),
+ case service_waiting(Oldest, Server) of
+ ok ->
+ Server#qserver{waitlist=lists:reverse(RevWList)};
+ wait ->
+ Server#qserver{waitlist=Waitlist}
+ end.
+
+% todo get rid of duplication
+service_waiting({{#doc{body={Props}}=DDoc, DDocKey}, From}, Server) ->
+ Lang = couch_util:get_value(<<"language">>, Props, <<"javascript">>),
+ case lang_proc(Lang, Server, fun(Procs) ->
+ % find a proc in the set that has the DDoc
+ proc_with_ddoc(DDoc, DDocKey, Procs)
+ end) of
+ {ok, Proc} ->
+ gen_server:reply(From, {ok, Proc, Server#qserver.config}),
+ ok;
+ wait -> % this should never happen
+ wait;
+ Error ->
+ gen_server:reply(From, Error),
+ ok
+ end;
+service_waiting({{Lang}, From}, Server) ->
+ case lang_proc(Lang, Server, fun([P|Procs]) ->
+ {ok, P}
+ end) of
+ {ok, Proc} ->
+ gen_server:reply(From, {ok, Proc, Server#qserver.config}),
+ ok;
+ wait -> % this should never happen
+ wait;
+ Error ->
+ gen_server:reply(From, Error),
+ ok
+ end.
+
+lang_proc(Lang, #qserver{
+ langs=Langs,
+ pid_procs=PidProcs,
+ lang_procs=LangProcs,
+ lang_limits=LangLimits}, PickFun) ->
+ % Note to future self. Add max process limit.
+ case ets:lookup(LangProcs, Lang) of
+ [{Lang, [P|Procs]}] ->
+ {ok, Proc} = PickFun([P|Procs]),
+ rem_from_list(LangProcs, Lang, Proc),
+ {ok, Proc};
_ ->
- {unknown_query_language, Lang}
+ case (catch new_process(Langs, LangLimits, Lang)) of
+ {ok, Proc} ->
+ add_value(PidProcs, Proc#proc.pid, Proc),
+ {ok, Proc2} = PickFun([Proc]);
+ ErrorOrWait ->
+ ErrorOrWait
+ end
+ end.
+
+new_process(Langs, LangLimits, Lang) ->
+ [{Lang, Lim, Current}] = ets:lookup(LangLimits, Lang),
+ if (Lim == 0) or (Current < Lim) -> % Lim == 0 means no limit
+ % we are below the limit for our language, make a new one
+ case ets:lookup(Langs, Lang) of
+ [{Lang, Mod, Func, Arg}] ->
+ {ok, Pid} = apply(Mod, Func, Arg),
+ true = ets:insert(LangLimits, {Lang, Lim, Current+1}),
+ {ok, #proc{lang=Lang,
+ pid=Pid,
+ % Called via proc_prompt, proc_set_timeout, and proc_stop
+ prompt_fun={Mod, prompt},
+ set_timeout_fun={Mod, set_timeout},
+ stop_fun={Mod, stop}}};
+ _ ->
+ {unknown_query_language, Lang}
+ end;
+ true ->
+ wait
end.
proc_with_ddoc(DDoc, DDocKey, LangProcs) ->
@@ -402,12 +496,11 @@ teach_ddoc(DDoc, {DDocId, _Rev}=DDocKey, #proc{ddoc_keys=Keys}=Proc) ->
get_ddoc_process(#doc{} = DDoc, DDocKey) ->
% remove this case statement
case gen_server:call(couch_query_servers, {get_proc, DDoc, DDocKey}) of
- {ok, Proc, QueryConfig} ->
+ {ok, Proc, {QueryConfig}} ->
% process knows the ddoc
- case (catch proc_prompt(Proc, [<<"reset">>, QueryConfig])) of
+ case (catch proc_prompt(Proc, [<<"reset">>, {QueryConfig}])) of
true ->
- proc_set_timeout(Proc, list_to_integer(couch_config:get(
- "couchdb", "os_process_timeout", "5000"))),
+ proc_set_timeout(Proc, couch_util:get_value(<<"timeout">>, QueryConfig)),
link(Proc#proc.pid),
gen_server:call(couch_query_servers, {unlink_proc, Proc#proc.pid}),
Proc;
@@ -421,11 +514,10 @@ get_ddoc_process(#doc{} = DDoc, DDocKey) ->
get_os_process(Lang) ->
case gen_server:call(couch_query_servers, {get_proc, Lang}) of
- {ok, Proc, QueryConfig} ->
- case (catch proc_prompt(Proc, [<<"reset">>, QueryConfig])) of
+ {ok, Proc, {QueryConfig}} ->
+ case (catch proc_prompt(Proc, [<<"reset">>, {QueryConfig}])) of
true ->
- proc_set_timeout(Proc, list_to_integer(couch_config:get(
- "couchdb", "os_process_timeout", "5000"))),
+ proc_set_timeout(Proc, couch_util:get_value(<<"timeout">>, QueryConfig)),
link(Proc#proc.pid),
gen_server:call(couch_query_servers, {unlink_proc, Proc#proc.pid}),
Proc;