summaryrefslogtreecommitdiff
path: root/src/couchdb/couch_query_servers.erl
diff options
context:
space:
mode:
authorDamien F. Katz <damien@apache.org>2010-03-04 01:09:22 +0000
committerDamien F. Katz <damien@apache.org>2010-03-04 01:09:22 +0000
commit3a3a9c1efab1c9fe4cd5ebb6c80da4005eb0806b (patch)
treef850ff24c113465531aab2bdcdb2f2e70855d00c /src/couchdb/couch_query_servers.erl
parent2ea7ab6a8525184ecdbfcee69667e200511b9f9b (diff)
Changed process tree shutdown to be synchronous, to eliminate spurious test failures caused by processes not shutdown fast enough or at the wrong time.
git-svn-id: https://svn.apache.org/repos/asf/couchdb/trunk@918805 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'src/couchdb/couch_query_servers.erl')
-rw-r--r--src/couchdb/couch_query_servers.erl75
1 files changed, 37 insertions, 38 deletions
diff --git a/src/couchdb/couch_query_servers.erl b/src/couchdb/couch_query_servers.erl
index 3095b199..344ce8b2 100644
--- a/src/couchdb/couch_query_servers.erl
+++ b/src/couchdb/couch_query_servers.erl
@@ -15,7 +15,7 @@
-export([start_link/0]).
--export([init/1, terminate/2, handle_call/3, handle_cast/2, handle_info/2,code_change/3,stop/0]).
+-export([init/1, terminate/2, handle_call/3, handle_cast/2, handle_info/2,code_change/3]).
-export([start_doc_map/2, map_docs/2, stop_doc_map/1]).
-export([reduce/3, rereduce/3,validate_doc_update/5]).
-export([filter_docs/5]).
@@ -38,9 +38,6 @@
start_link() ->
gen_server:start_link({local, couch_query_servers}, couch_query_servers, [], []).
-stop() ->
- exit(whereis(couch_query_servers), normal).
-
start_doc_map(Lang, Functions) ->
Proc = get_os_process(Lang),
lists:foreach(fun(FunctionSource) ->
@@ -219,17 +216,18 @@ init([]) ->
ok = couch_config:register(
fun("query_servers" ++ _, _) ->
- ?MODULE:stop()
+ supervisor:terminate_child(couch_secondary_services, query_servers),
+ supervisor:restart_child(couch_secondary_services, query_servers)
end),
ok = couch_config:register(
fun("native_query_servers" ++ _, _) ->
- ?MODULE:stop()
+ supervisor:terminate_child(couch_secondary_services, query_servers),
+ [supervisor:restart_child(couch_secondary_services, query_servers)]
end),
Langs = ets:new(couch_query_server_langs, [set, private]),
PidProcs = ets:new(couch_query_server_pid_langs, [set, private]),
LangProcs = ets:new(couch_query_server_procs, [set, private]),
- InUse = ets:new(couch_query_server_used, [set, private]),
% 'query_servers' specifies an OS command-line to execute.
lists:foreach(fun({Lang, Command}) ->
true = ets:insert(Langs, {?l2b(Lang),
@@ -244,75 +242,71 @@ init([]) ->
process_flag(trap_exit, true),
{ok, {Langs, % Keyed by language name, value is {Mod,Func,Arg}
PidProcs, % Keyed by PID, valus is a #proc record.
- LangProcs, % Keyed by language name, value is a #proc record
- InUse % Keyed by PID, value is #proc record.
+ LangProcs % Keyed by language name, value is a #proc record
}}.
-terminate(_Reason, _Server) ->
+terminate(_Reason, {_Langs, PidProcs, _LangProcs}) ->
+ [couch_util:shutdown_sync(P) || {P,_} <- ets:tab2list(PidProcs)],
ok.
-handle_call({get_proc, #doc{body={Props}}=DDoc, DDocKey}, _From, {Langs, PidProcs, LangProcs, InUse}=Server) ->
+handle_call({get_proc, #doc{body={Props}}=DDoc, DDocKey}, _From, {Langs, PidProcs, LangProcs}=Server) ->
% Note to future self. Add max process limit.
Lang = proplists:get_value(<<"language">>, Props, <<"javascript">>),
case ets:lookup(LangProcs, Lang) of
[{Lang, [P|Rest]}] ->
% find a proc in the set that has the DDoc
case proc_with_ddoc(DDoc, DDocKey, [P|Rest]) of
- {ok, Proc} ->
- % looks like the proc isn't getting dropped from the list.
- % we need to change this to take a fun for equality checking
- % so we can do a comparison on portnum
- rem_from_list(LangProcs, Lang, Proc),
- add_to_list(InUse, Lang, Proc),
- {reply, {ok, Proc, get_query_server_config()}, Server};
- Error ->
- {reply, Error, Server}
- end;
+ {ok, Proc} ->
+ rem_from_list(LangProcs, Lang, Proc),
+ {reply, {ok, Proc, get_query_server_config()}, Server};
+ Error ->
+ {reply, Error, Server}
+ end;
_ ->
case (catch new_process(Langs, Lang)) of
{ok, Proc} ->
add_value(PidProcs, Proc#proc.pid, Proc),
case proc_with_ddoc(DDoc, DDocKey, [Proc]) of
- {ok, Proc2} ->
- rem_from_list(LangProcs, Lang, Proc),
- add_to_list(InUse, Lang, Proc2),
- {reply, {ok, Proc2, get_query_server_config()}, Server};
- Error ->
- {reply, Error, Server}
- end;
+ {ok, Proc2} ->
+ {reply, {ok, Proc2, get_query_server_config()}, Server};
+ Error ->
+ {reply, Error, Server}
+ end;
Error ->
{reply, Error, Server}
end
end;
-handle_call({get_proc, Lang}, _From, {Langs, PidProcs, LangProcs, InUse}=Server) ->
+handle_call({get_proc, Lang}, _From, {Langs, PidProcs, LangProcs}=Server) ->
% Note to future self. Add max process limit.
case ets:lookup(LangProcs, Lang) of
[{Lang, [Proc|_]}] ->
- add_value(PidProcs, Proc#proc.pid, Proc),
rem_from_list(LangProcs, Lang, Proc),
- add_to_list(InUse, Lang, Proc),
{reply, {ok, Proc, get_query_server_config()}, Server};
_ ->
case (catch new_process(Langs, Lang)) of
{ok, Proc} ->
add_value(PidProcs, Proc#proc.pid, Proc),
- add_to_list(InUse, Lang, Proc),
{reply, {ok, Proc, get_query_server_config()}, Server};
Error ->
{reply, Error, Server}
end
end;
-handle_call({ret_proc, Proc}, _From, {_, _, LangProcs, InUse}=Server) ->
+handle_call({unlink_proc, Pid}, _From, {_, PidProcs, _}=Server) ->
+ rem_value(PidProcs, Pid),
+ unlink(Pid),
+ {reply, ok, Server};
+handle_call({ret_proc, Proc}, _From, {_, PidProcs, LangProcs}=Server) ->
% Along with max process limit, here we should check
% if we're over the limit and discard when we are.
+ add_value(PidProcs, Proc#proc.pid, Proc),
add_to_list(LangProcs, Proc#proc.lang, Proc),
- rem_from_list(InUse, Proc#proc.lang, Proc),
+ link(Proc#proc.pid),
{reply, true, Server}.
handle_cast(_Whatever, Server) ->
{noreply, Server}.
-handle_info({'EXIT', Pid, Status}, {_, PidProcs, LangProcs, InUse}=Server) ->
+handle_info({'EXIT', Pid, Status}, {_, PidProcs, LangProcs}=Server) ->
case ets:lookup(PidProcs, Pid) of
[{Pid, Proc}] ->
case Status of
@@ -321,11 +315,14 @@ handle_info({'EXIT', Pid, Status}, {_, PidProcs, LangProcs, InUse}=Server) ->
end,
rem_value(PidProcs, Pid),
catch rem_from_list(LangProcs, Proc#proc.lang, Proc),
- catch rem_from_list(InUse, Proc#proc.lang, Proc),
{noreply, Server};
[] ->
- ?LOG_DEBUG("Unknown linked process died: ~p (reason: ~p)", [Pid, Status]),
- {stop, Status, Server}
+ case Status of
+ normal ->
+ {noreply, Server};
+ _ ->
+ {stop, Status, Server}
+ end
end.
code_change(_OldVsn, State, _Extra) ->
@@ -402,6 +399,7 @@ get_ddoc_process(#doc{} = DDoc, DDocKey) ->
proc_set_timeout(Proc, list_to_integer(couch_config:get(
"couchdb", "os_process_timeout", "5000"))),
link(Proc#proc.pid),
+ gen_server:call(couch_query_servers, {unlink_proc, Proc#proc.pid}),
Proc;
_ ->
catch proc_stop(Proc),
@@ -419,6 +417,7 @@ get_os_process(Lang) ->
proc_set_timeout(Proc, list_to_integer(couch_config:get(
"couchdb", "os_process_timeout", "5000"))),
link(Proc#proc.pid),
+ gen_server:call(couch_query_servers, {unlink_proc, Proc#proc.pid}),
Proc;
_ ->
catch proc_stop(Proc),