diff options
author | John Christopher Anderson <jchris@apache.org> | 2010-03-04 03:10:19 +0000 |
---|---|---|
committer | John Christopher Anderson <jchris@apache.org> | 2010-03-04 03:10:19 +0000 |
commit | d318717866ffa267781ab482e99a05415e2ac0e4 (patch) | |
tree | 207409d78eacca8e206deb05b99689ec449722a4 /src/couchdb/couch_query_servers.erl | |
parent | 3a3a9c1efab1c9fe4cd5ebb6c80da4005eb0806b (diff) |
reverting damien's latest commit until we can figure out why it's causing failures
git-svn-id: https://svn.apache.org/repos/asf/couchdb/trunk@918834 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'src/couchdb/couch_query_servers.erl')
-rw-r--r-- | src/couchdb/couch_query_servers.erl | 75 |
1 files changed, 38 insertions, 37 deletions
diff --git a/src/couchdb/couch_query_servers.erl b/src/couchdb/couch_query_servers.erl index 344ce8b2..3095b199 100644 --- a/src/couchdb/couch_query_servers.erl +++ b/src/couchdb/couch_query_servers.erl @@ -15,7 +15,7 @@ -export([start_link/0]). --export([init/1, terminate/2, handle_call/3, handle_cast/2, handle_info/2,code_change/3]). +-export([init/1, terminate/2, handle_call/3, handle_cast/2, handle_info/2,code_change/3,stop/0]). -export([start_doc_map/2, map_docs/2, stop_doc_map/1]). -export([reduce/3, rereduce/3,validate_doc_update/5]). -export([filter_docs/5]). @@ -38,6 +38,9 @@ start_link() -> gen_server:start_link({local, couch_query_servers}, couch_query_servers, [], []). +stop() -> + exit(whereis(couch_query_servers), normal). + start_doc_map(Lang, Functions) -> Proc = get_os_process(Lang), lists:foreach(fun(FunctionSource) -> @@ -216,18 +219,17 @@ init([]) -> ok = couch_config:register( fun("query_servers" ++ _, _) -> - supervisor:terminate_child(couch_secondary_services, query_servers), - supervisor:restart_child(couch_secondary_services, query_servers) + ?MODULE:stop() end), ok = couch_config:register( fun("native_query_servers" ++ _, _) -> - supervisor:terminate_child(couch_secondary_services, query_servers), - [supervisor:restart_child(couch_secondary_services, query_servers)] + ?MODULE:stop() end), Langs = ets:new(couch_query_server_langs, [set, private]), PidProcs = ets:new(couch_query_server_pid_langs, [set, private]), LangProcs = ets:new(couch_query_server_procs, [set, private]), + InUse = ets:new(couch_query_server_used, [set, private]), % 'query_servers' specifies an OS command-line to execute. lists:foreach(fun({Lang, Command}) -> true = ets:insert(Langs, {?l2b(Lang), @@ -242,71 +244,75 @@ init([]) -> process_flag(trap_exit, true), {ok, {Langs, % Keyed by language name, value is {Mod,Func,Arg} PidProcs, % Keyed by PID, valus is a #proc record. - LangProcs % Keyed by language name, value is a #proc record + LangProcs, % Keyed by language name, value is a #proc record + InUse % Keyed by PID, value is #proc record. }}. -terminate(_Reason, {_Langs, PidProcs, _LangProcs}) -> - [couch_util:shutdown_sync(P) || {P,_} <- ets:tab2list(PidProcs)], +terminate(_Reason, _Server) -> ok. -handle_call({get_proc, #doc{body={Props}}=DDoc, DDocKey}, _From, {Langs, PidProcs, LangProcs}=Server) -> +handle_call({get_proc, #doc{body={Props}}=DDoc, DDocKey}, _From, {Langs, PidProcs, LangProcs, InUse}=Server) -> % Note to future self. Add max process limit. Lang = proplists:get_value(<<"language">>, Props, <<"javascript">>), case ets:lookup(LangProcs, Lang) of [{Lang, [P|Rest]}] -> % find a proc in the set that has the DDoc case proc_with_ddoc(DDoc, DDocKey, [P|Rest]) of - {ok, Proc} -> - rem_from_list(LangProcs, Lang, Proc), - {reply, {ok, Proc, get_query_server_config()}, Server}; - Error -> - {reply, Error, Server} - end; + {ok, Proc} -> + % looks like the proc isn't getting dropped from the list. + % we need to change this to take a fun for equality checking + % so we can do a comparison on portnum + rem_from_list(LangProcs, Lang, Proc), + add_to_list(InUse, Lang, Proc), + {reply, {ok, Proc, get_query_server_config()}, Server}; + Error -> + {reply, Error, Server} + end; _ -> case (catch new_process(Langs, Lang)) of {ok, Proc} -> add_value(PidProcs, Proc#proc.pid, Proc), case proc_with_ddoc(DDoc, DDocKey, [Proc]) of - {ok, Proc2} -> - {reply, {ok, Proc2, get_query_server_config()}, Server}; - Error -> - {reply, Error, Server} - end; + {ok, Proc2} -> + rem_from_list(LangProcs, Lang, Proc), + add_to_list(InUse, Lang, Proc2), + {reply, {ok, Proc2, get_query_server_config()}, Server}; + Error -> + {reply, Error, Server} + end; Error -> {reply, Error, Server} end end; -handle_call({get_proc, Lang}, _From, {Langs, PidProcs, LangProcs}=Server) -> +handle_call({get_proc, Lang}, _From, {Langs, PidProcs, LangProcs, InUse}=Server) -> % Note to future self. Add max process limit. case ets:lookup(LangProcs, Lang) of [{Lang, [Proc|_]}] -> + add_value(PidProcs, Proc#proc.pid, Proc), rem_from_list(LangProcs, Lang, Proc), + add_to_list(InUse, Lang, Proc), {reply, {ok, Proc, get_query_server_config()}, Server}; _ -> case (catch new_process(Langs, Lang)) of {ok, Proc} -> add_value(PidProcs, Proc#proc.pid, Proc), + add_to_list(InUse, Lang, Proc), {reply, {ok, Proc, get_query_server_config()}, Server}; Error -> {reply, Error, Server} end end; -handle_call({unlink_proc, Pid}, _From, {_, PidProcs, _}=Server) -> - rem_value(PidProcs, Pid), - unlink(Pid), - {reply, ok, Server}; -handle_call({ret_proc, Proc}, _From, {_, PidProcs, LangProcs}=Server) -> +handle_call({ret_proc, Proc}, _From, {_, _, LangProcs, InUse}=Server) -> % Along with max process limit, here we should check % if we're over the limit and discard when we are. - add_value(PidProcs, Proc#proc.pid, Proc), add_to_list(LangProcs, Proc#proc.lang, Proc), - link(Proc#proc.pid), + rem_from_list(InUse, Proc#proc.lang, Proc), {reply, true, Server}. handle_cast(_Whatever, Server) -> {noreply, Server}. -handle_info({'EXIT', Pid, Status}, {_, PidProcs, LangProcs}=Server) -> +handle_info({'EXIT', Pid, Status}, {_, PidProcs, LangProcs, InUse}=Server) -> case ets:lookup(PidProcs, Pid) of [{Pid, Proc}] -> case Status of @@ -315,14 +321,11 @@ handle_info({'EXIT', Pid, Status}, {_, PidProcs, LangProcs}=Server) -> end, rem_value(PidProcs, Pid), catch rem_from_list(LangProcs, Proc#proc.lang, Proc), + catch rem_from_list(InUse, Proc#proc.lang, Proc), {noreply, Server}; [] -> - case Status of - normal -> - {noreply, Server}; - _ -> - {stop, Status, Server} - end + ?LOG_DEBUG("Unknown linked process died: ~p (reason: ~p)", [Pid, Status]), + {stop, Status, Server} end. code_change(_OldVsn, State, _Extra) -> @@ -399,7 +402,6 @@ get_ddoc_process(#doc{} = DDoc, DDocKey) -> proc_set_timeout(Proc, list_to_integer(couch_config:get( "couchdb", "os_process_timeout", "5000"))), link(Proc#proc.pid), - gen_server:call(couch_query_servers, {unlink_proc, Proc#proc.pid}), Proc; _ -> catch proc_stop(Proc), @@ -417,7 +419,6 @@ get_os_process(Lang) -> proc_set_timeout(Proc, list_to_integer(couch_config:get( "couchdb", "os_process_timeout", "5000"))), link(Proc#proc.pid), - gen_server:call(couch_query_servers, {unlink_proc, Proc#proc.pid}), Proc; _ -> catch proc_stop(Proc), |