summaryrefslogtreecommitdiff
path: root/src/couchdb/couch_query_servers.erl
diff options
context:
space:
mode:
authorDamien F. Katz <damien@apache.org>2009-05-30 02:43:52 +0000
committerDamien F. Katz <damien@apache.org>2009-05-30 02:43:52 +0000
commit7860f047be25f99c4d1e83f9456c7eb245ad37dc (patch)
tree8a53ccfc11cb17bd666035892c8dbf14e36d5b0e /src/couchdb/couch_query_servers.erl
parentc1ca0dd42524fb6999e1f05de94ccff71bc497b6 (diff)
Test and fix for infinite loops in view_servers, fix for crashed OS processes causing leaked erlang processes and fix for view server crashing when view group process terminates.
git-svn-id: https://svn.apache.org/repos/asf/couchdb/trunk@780165 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'src/couchdb/couch_query_servers.erl')
-rw-r--r--src/couchdb/couch_query_servers.erl63
1 files changed, 41 insertions, 22 deletions
diff --git a/src/couchdb/couch_query_servers.erl b/src/couchdb/couch_query_servers.erl
index 7c82abe7..ef2bde3b 100644
--- a/src/couchdb/couch_query_servers.erl
+++ b/src/couchdb/couch_query_servers.erl
@@ -118,7 +118,7 @@ recombine_reduce_results([<<"_", _/binary>>|RedSrcs], OsResults, [BRes|BuiltinRe
recombine_reduce_results([_OsFun|RedSrcs], [OsR|OsResults], BuiltinResults, Acc) ->
recombine_reduce_results(RedSrcs, OsResults, BuiltinResults, [OsR|Acc]).
-os_reduce(Lang, [], KVs) ->
+os_reduce(_Lang, [], _KVs) ->
{ok, []};
os_reduce(Lang, OsRedSrcs, KVs) ->
Pid = get_os_process(Lang),
@@ -130,7 +130,7 @@ os_reduce(Lang, OsRedSrcs, KVs) ->
end,
{ok, OsResults}.
-builtin_reduce(_Re, [], KVs, Acc) ->
+builtin_reduce(_Re, [], _KVs, Acc) ->
{ok, lists:reverse(Acc)};
builtin_reduce(Re, [<<"_sum">>|BuiltinReds], KVs, Acc) ->
Sum = builtin_sum_rows(KVs),
@@ -242,6 +242,7 @@ init([]) ->
lists:foreach(fun({Lang, Command}) ->
true = ets:insert(Langs, {?l2b(Lang), Command})
end, couch_config:get("query_servers")),
+ process_flag(trap_exit, true),
{ok, {Langs, PidLangs, Pids, InUse}}.
terminate(_Reason, _Server) ->
@@ -255,13 +256,15 @@ handle_call({get_proc, Lang}, _From, {Langs, PidLangs, Pids, InUse}=Server) ->
add_value(PidLangs, Pid, Lang),
rem_from_list(Pids, Lang, Pid),
add_to_list(InUse, Lang, Pid),
- QueryConfig = get_query_server_config(),
- true = couch_os_process:prompt(Pid, [<<"reset">>, QueryConfig]),
- {reply, Pid, Server};
+ {reply, {recycled, Pid, get_query_server_config()}, Server};
_ ->
- {ok, Pid} = new_process(Langs, Lang),
- add_to_list(InUse, Lang, Pid),
- {reply, Pid, Server}
+ case (catch new_process(Langs, Lang)) of
+ {ok, Pid} ->
+ add_to_list(InUse, Lang, Pid),
+ {reply, {new, Pid}, Server};
+ Error ->
+ {reply, Error, Server}
+ end
end;
handle_call({ret_proc, Lang, Pid}, _From, {_, _, Pids, InUse}=Server) ->
% Along with max process limit, here we should check
@@ -273,22 +276,20 @@ handle_call({ret_proc, Lang, Pid}, _From, {_, _, Pids, InUse}=Server) ->
handle_cast(_Whatever, Server) ->
{noreply, Server}.
-handle_info({'EXIT', Pid, Status}, {Langs, PidLangs, Pids, InUse}) ->
+handle_info({'EXIT', Pid, Status}, {_, PidLangs, Pids, InUse}=Server) ->
case ets:lookup(PidLangs, Pid) of
[{Pid, Lang}] ->
case Status of
normal -> ok;
_ -> ?LOG_DEBUG("Linked process died abnormally: ~p (reason: ~p)", [Pid, Status])
end,
- {ok, {
- Langs,
- rem_value(PidLangs, Pid),
- rem_from_list(Pids, Lang, Pid),
- rem_from_list(InUse, Lang, Pid)
- }};
+ rem_value(PidLangs, Pid),
+ catch rem_from_list(Pids, Lang, Pid),
+ catch rem_from_list(InUse, Lang, Pid),
+ {noreply, Server};
[] ->
?LOG_DEBUG("Unknown linked process died: ~p (reason: ~p)", [Pid, Status]),
- {ok, {Langs, PidLangs, Pids, InUse}}
+ {stop, Status, Server}
end.
code_change(_OldVsn, State, _Extra) ->
@@ -302,27 +303,45 @@ get_query_server_config() ->
{[{<<"reduce_limit">>, ReduceLimit}]}.
new_process(Langs, Lang) ->
- Proc =
case ets:lookup(Langs, Lang) of
[{Lang, Command}] ->
couch_os_process:start_link(Command);
_ ->
- throw({unknown_query_language, Lang})
- end,
- Proc.
+ {unknown_query_language, Lang}
+ end.
get_os_process(Lang) ->
- gen_server:call(couch_query_servers, {get_proc, Lang}).
+ case gen_server:call(couch_query_servers, {get_proc, Lang}) of
+ {new, Pid} ->
+ couch_os_process:set_timeout(Pid, list_to_integer(couch_config:get(
+ "couchdb", "os_process_timeout", "5000"))),
+ link(Pid),
+ Pid;
+ {recycled, Pid, QueryConfig} ->
+ case (catch couch_os_process:prompt(Pid, [<<"reset">>, QueryConfig])) of
+ true ->
+ couch_os_process:set_timeout(Pid, list_to_integer(couch_config:get(
+ "couchdb", "os_process_timeout", "5000"))),
+ link(Pid),
+ Pid;
+ _ ->
+ catch couch_os_process:stop(Pid),
+ get_os_process(Lang)
+ end;
+ Error ->
+ throw(Error)
+ end.
ret_os_process(Lang, Pid) ->
true = gen_server:call(couch_query_servers, {ret_proc, Lang, Pid}),
+ catch unlink(Pid),
ok.
add_value(Tid, Key, Value) ->
true = ets:insert(Tid, {Key, Value}).
rem_value(Tid, Key) ->
- true = ets:insert(Tid, Key).
+ true = ets:delete(Tid, Key).
add_to_list(Tid, Key, Value) ->
case ets:lookup(Tid, Key) of