diff options
author | Damien F. Katz <damien@apache.org> | 2009-05-30 02:43:52 +0000 |
---|---|---|
committer | Damien F. Katz <damien@apache.org> | 2009-05-30 02:43:52 +0000 |
commit | 7860f047be25f99c4d1e83f9456c7eb245ad37dc (patch) | |
tree | 8a53ccfc11cb17bd666035892c8dbf14e36d5b0e /src/couchdb/couch_query_servers.erl | |
parent | c1ca0dd42524fb6999e1f05de94ccff71bc497b6 (diff) |
Test and fix for infinite loops in view_servers, fix for crashed OS processes causing leaked erlang processes and fix for view server crashing when view group process terminates.
git-svn-id: https://svn.apache.org/repos/asf/couchdb/trunk@780165 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'src/couchdb/couch_query_servers.erl')
-rw-r--r-- | src/couchdb/couch_query_servers.erl | 63 |
1 files changed, 41 insertions, 22 deletions
diff --git a/src/couchdb/couch_query_servers.erl b/src/couchdb/couch_query_servers.erl index 7c82abe7..ef2bde3b 100644 --- a/src/couchdb/couch_query_servers.erl +++ b/src/couchdb/couch_query_servers.erl @@ -118,7 +118,7 @@ recombine_reduce_results([<<"_", _/binary>>|RedSrcs], OsResults, [BRes|BuiltinRe recombine_reduce_results([_OsFun|RedSrcs], [OsR|OsResults], BuiltinResults, Acc) -> recombine_reduce_results(RedSrcs, OsResults, BuiltinResults, [OsR|Acc]). -os_reduce(Lang, [], KVs) -> +os_reduce(_Lang, [], _KVs) -> {ok, []}; os_reduce(Lang, OsRedSrcs, KVs) -> Pid = get_os_process(Lang), @@ -130,7 +130,7 @@ os_reduce(Lang, OsRedSrcs, KVs) -> end, {ok, OsResults}. -builtin_reduce(_Re, [], KVs, Acc) -> +builtin_reduce(_Re, [], _KVs, Acc) -> {ok, lists:reverse(Acc)}; builtin_reduce(Re, [<<"_sum">>|BuiltinReds], KVs, Acc) -> Sum = builtin_sum_rows(KVs), @@ -242,6 +242,7 @@ init([]) -> lists:foreach(fun({Lang, Command}) -> true = ets:insert(Langs, {?l2b(Lang), Command}) end, couch_config:get("query_servers")), + process_flag(trap_exit, true), {ok, {Langs, PidLangs, Pids, InUse}}. terminate(_Reason, _Server) -> @@ -255,13 +256,15 @@ handle_call({get_proc, Lang}, _From, {Langs, PidLangs, Pids, InUse}=Server) -> add_value(PidLangs, Pid, Lang), rem_from_list(Pids, Lang, Pid), add_to_list(InUse, Lang, Pid), - QueryConfig = get_query_server_config(), - true = couch_os_process:prompt(Pid, [<<"reset">>, QueryConfig]), - {reply, Pid, Server}; + {reply, {recycled, Pid, get_query_server_config()}, Server}; _ -> - {ok, Pid} = new_process(Langs, Lang), - add_to_list(InUse, Lang, Pid), - {reply, Pid, Server} + case (catch new_process(Langs, Lang)) of + {ok, Pid} -> + add_to_list(InUse, Lang, Pid), + {reply, {new, Pid}, Server}; + Error -> + {reply, Error, Server} + end end; handle_call({ret_proc, Lang, Pid}, _From, {_, _, Pids, InUse}=Server) -> % Along with max process limit, here we should check @@ -273,22 +276,20 @@ handle_call({ret_proc, Lang, Pid}, _From, {_, _, Pids, InUse}=Server) -> handle_cast(_Whatever, Server) -> {noreply, Server}. -handle_info({'EXIT', Pid, Status}, {Langs, PidLangs, Pids, InUse}) -> +handle_info({'EXIT', Pid, Status}, {_, PidLangs, Pids, InUse}=Server) -> case ets:lookup(PidLangs, Pid) of [{Pid, Lang}] -> case Status of normal -> ok; _ -> ?LOG_DEBUG("Linked process died abnormally: ~p (reason: ~p)", [Pid, Status]) end, - {ok, { - Langs, - rem_value(PidLangs, Pid), - rem_from_list(Pids, Lang, Pid), - rem_from_list(InUse, Lang, Pid) - }}; + rem_value(PidLangs, Pid), + catch rem_from_list(Pids, Lang, Pid), + catch rem_from_list(InUse, Lang, Pid), + {noreply, Server}; [] -> ?LOG_DEBUG("Unknown linked process died: ~p (reason: ~p)", [Pid, Status]), - {ok, {Langs, PidLangs, Pids, InUse}} + {stop, Status, Server} end. code_change(_OldVsn, State, _Extra) -> @@ -302,27 +303,45 @@ get_query_server_config() -> {[{<<"reduce_limit">>, ReduceLimit}]}. new_process(Langs, Lang) -> - Proc = case ets:lookup(Langs, Lang) of [{Lang, Command}] -> couch_os_process:start_link(Command); _ -> - throw({unknown_query_language, Lang}) - end, - Proc. + {unknown_query_language, Lang} + end. get_os_process(Lang) -> - gen_server:call(couch_query_servers, {get_proc, Lang}). + case gen_server:call(couch_query_servers, {get_proc, Lang}) of + {new, Pid} -> + couch_os_process:set_timeout(Pid, list_to_integer(couch_config:get( + "couchdb", "os_process_timeout", "5000"))), + link(Pid), + Pid; + {recycled, Pid, QueryConfig} -> + case (catch couch_os_process:prompt(Pid, [<<"reset">>, QueryConfig])) of + true -> + couch_os_process:set_timeout(Pid, list_to_integer(couch_config:get( + "couchdb", "os_process_timeout", "5000"))), + link(Pid), + Pid; + _ -> + catch couch_os_process:stop(Pid), + get_os_process(Lang) + end; + Error -> + throw(Error) + end. ret_os_process(Lang, Pid) -> true = gen_server:call(couch_query_servers, {ret_proc, Lang, Pid}), + catch unlink(Pid), ok. add_value(Tid, Key, Value) -> true = ets:insert(Tid, {Key, Value}). rem_value(Tid, Key) -> - true = ets:insert(Tid, Key). + true = ets:delete(Tid, Key). add_to_list(Tid, Key, Value) -> case ets:lookup(Tid, Key) of |