summaryrefslogtreecommitdiff
path: root/apps/couch
diff options
context:
space:
mode:
Diffstat (limited to 'apps/couch')
-rw-r--r--apps/couch/include/couch_db.hrl9
-rw-r--r--apps/couch/src/couch.app.src2
-rw-r--r--apps/couch/src/couch_proc_manager.erl186
-rw-r--r--apps/couch/src/couch_query_servers.erl214
4 files changed, 199 insertions, 212 deletions
diff --git a/apps/couch/include/couch_db.hrl b/apps/couch/include/couch_db.hrl
index 2907ff90..12b0ac90 100644
--- a/apps/couch/include/couch_db.hrl
+++ b/apps/couch/include/couch_db.hrl
@@ -294,3 +294,12 @@
include_docs = false
}).
+-record(proc, {
+ pid,
+ lang,
+ client = nil,
+ ddoc_keys = [],
+ prompt_fun,
+ set_timeout_fun,
+ stop_fun
+}).
diff --git a/apps/couch/src/couch.app.src b/apps/couch/src/couch.app.src
index 61bcc1ee..26bcd406 100644
--- a/apps/couch/src/couch.app.src
+++ b/apps/couch/src/couch.app.src
@@ -9,7 +9,7 @@
couch_httpd,
couch_log,
couch_primary_services,
- couch_query_servers,
+ couch_proc_manager,
couch_rep_sup,
couch_secondary_services,
couch_server,
diff --git a/apps/couch/src/couch_proc_manager.erl b/apps/couch/src/couch_proc_manager.erl
new file mode 100644
index 00000000..ca2e8ac5
--- /dev/null
+++ b/apps/couch/src/couch_proc_manager.erl
@@ -0,0 +1,186 @@
+-module(couch_proc_manager).
+-behaviour(gen_server).
+-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2,
+ code_change/3]).
+
+-export([start_link/0, get_proc_count/0]).
+
+-include("couch_db.hrl").
+
+-record(state, {tab}).
+
+start_link() ->
+ gen_server:start_link({local, ?MODULE}, ?MODULE, [], []).
+
+get_proc_count() ->
+ gen_server:call(?MODULE, get_proc_count).
+
+init([]) ->
+ process_flag(trap_exit, true),
+ {ok, #state{tab = ets:new(procs, [{keypos, #proc.pid}])}}.
+
+handle_call(get_table, _From, State) ->
+ {reply, State#state.tab, State};
+
+handle_call(get_proc_count, _From, State) ->
+ {reply, ets:info(State#state.tab, size), State};
+
+handle_call({get_proc, #doc{body={Props}}=DDoc, DDocKey}, {Client, _}, State) ->
+ Lang = couch_util:get_value(<<"language">>, Props, <<"javascript">>),
+ try get_procs(State#state.tab, Lang) of
+ Procs ->
+ case proc_with_ddoc(DDoc, DDocKey, Procs) of
+ {ok, Proc0} ->
+ Proc = Proc0#proc{client = erlang:monitor(process, Client)},
+ ets:insert(State#state.tab, Proc),
+ {reply, {ok, Proc, get_query_server_config()}, State};
+ {error, Reason} ->
+ {reply, {error, Reason}, State}
+ end
+ catch {unknown_query_language, _} ->
+ {reply, {unknown_query_language, Lang}, State};
+ error:Reason ->
+ ?LOG_ERROR("~p ~p ~p", [?MODULE, Reason, erlang:get_stacktrace()]),
+ {reply, {error, Reason}, State}
+ end;
+
+handle_call({get_proc, Lang}, {Client, _}, State) ->
+ try get_procs(State#state.tab, Lang) of
+ [Proc0|_] ->
+ Proc = Proc0#proc{client = erlang:monitor(process, Client)},
+ ets:insert(State#state.tab, Proc),
+ {reply, {ok, Proc, get_query_server_config()}, State}
+ catch {unknown_query_language, _} ->
+ {reply, {unknown_query_language, Lang}, State};
+ error:Reason ->
+ ?LOG_ERROR("~p ~p ~p", [?MODULE, Reason, erlang:get_stacktrace()]),
+ {reply, {error, Reason}, State}
+ end;
+
+handle_call({ret_proc, #proc{client=Ref, pid=Pid} = Proc}, _From, State) ->
+ erlang:demonitor(Ref, [flush]),
+ % We need to check if the process is alive here, as the client could be
+ % handing us a #proc{} with a dead one. We would have already removed the
+ % #proc{} from our own table, so the alternative is to do a lookup in the
+ % table before the insert. Don't know which approach is cheaper.
+ case is_process_alive(Pid) of true ->
+ maybe_reuse_proc(State#state.tab, Proc);
+ false -> ok end,
+ {reply, true, State};
+
+handle_call(_Call, _From, State) ->
+ {reply, ignored, State}.
+
+handle_cast(_Msg, State) ->
+ {noreply, State}.
+
+handle_info({'EXIT', Pid, Reason}, State) ->
+ ?LOG_INFO("~p ~p died ~p", [?MODULE, Pid, Reason]),
+ ets:delete(State#state.tab, Pid),
+ {noreply, State};
+
+handle_info({'DOWN', Ref, _, _, _Reason}, State) ->
+ case ets:match_object(State#state.tab, #proc{client=Ref, _='_'}) of
+ [] ->
+ ok;
+ [#proc{pid = Pid} = Proc] ->
+ case is_process_alive(Pid) of true ->
+ maybe_reuse_proc(State#state.tab, Proc);
+ false -> ok end
+ end,
+ {noreply, State};
+
+handle_info(_Msg, State) ->
+ {noreply, State}.
+
+terminate(_Reason, #state{tab=Tab}) ->
+ ets:foldl(fun(#proc{pid=P}, _) -> couch_util:shutdown_sync(P) end, 0, Tab),
+ ok.
+
+code_change(_OldVsn, State, _Extra) ->
+ {ok, State}.
+
+maybe_reuse_proc(Tab, #proc{pid = Pid} = Proc) ->
+ Limit = couch_config:get("query_server_config", "os_process_soft_limit", "100"),
+ case ets:info(Tab, size) > list_to_integer(Limit) of
+ true ->
+ ets:delete(Tab, Pid),
+ unlink(Pid),
+ exit(Pid, kill);
+ false ->
+ garbage_collect(Pid),
+ ets:insert(Tab, Proc#proc{client=nil})
+ end.
+
+get_procs(Tab, Lang) when is_binary(Lang) ->
+ get_procs(Tab, binary_to_list(Lang));
+get_procs(Tab, Lang) when is_list(Lang) ->
+ case ets:match_object(Tab, #proc{lang=Lang, client=nil, _='_'}) of
+ [] ->
+ {ok, NewProc} = new_proc(Lang), % check OS process limit
+ [NewProc];
+ Procs ->
+ Procs
+ end.
+
+new_proc(Lang) when is_binary(Lang) ->
+ new_proc(binary_to_list(Lang));
+new_proc(Lang) when is_list(Lang) ->
+ case couch_config:get("query_servers", Lang) of
+ undefined ->
+ case couch_config:get("native_query_servers", Lang) of
+ undefined ->
+ throw({unknown_query_language, Lang});
+ SpecStr ->
+ {ok, {M,F,A}} = couch_util:parse_term(SpecStr),
+ {ok, Pid} = apply(M, F, A),
+ make_proc(Pid, Lang, M)
+ end;
+ Command ->
+ {ok, Pid} = couch_os_process:start_link(Command),
+ make_proc(Pid, Lang, couch_os_process)
+ end.
+
+make_proc(Pid, Lang, Mod) ->
+ Proc = #proc{
+ lang = Lang,
+ pid = Pid,
+ prompt_fun = {Mod, prompt},
+ set_timeout_fun = {Mod, set_timeout},
+ stop_fun = {Mod, stop}
+ },
+ {ok, Proc}.
+
+get_query_server_config() ->
+ Limit = couch_config:get("query_server_config", "reduce_limit", "true"),
+ {[{<<"reduce_limit">>, list_to_atom(Limit)}]}.
+
+proc_with_ddoc(DDoc, DDocKey, Procs) ->
+ Filter = fun(#proc{ddoc_keys=Keys}) -> not lists:member(DDocKey, Keys) end,
+ case lists:dropwhile(Filter, Procs) of
+ [DDocProc|_] ->
+ {ok, DDocProc};
+ [] ->
+ teach_any_proc(DDoc, DDocKey, Procs)
+ end.
+
+teach_any_proc(DDoc, DDocKey, [Proc|Rest]) ->
+ try
+ teach_ddoc(DDoc, DDocKey, Proc)
+ catch _:_ ->
+ teach_any_proc(DDoc, DDocKey, Rest)
+ end;
+teach_any_proc(_, _, []) ->
+ {error, noproc}.
+
+teach_ddoc(DDoc, {DDocId, _Rev}=DDocKey, #proc{ddoc_keys=Keys}=Proc) ->
+ % send ddoc over the wire
+ % we only share the rev with the client we know to update code
+ % but it only keeps the latest copy, per each ddoc, around.
+ true = couch_query_servers:proc_prompt(Proc, [<<"ddoc">>, <<"new">>,
+ DDocId, couch_doc:to_json_obj(DDoc, [])]),
+ % we should remove any other ddocs keys for this docid
+ % because the query server overwrites without the rev
+ Keys2 = [{D,R} || {D,R} <- Keys, D /= DDocId],
+ % add ddoc to the proc
+ {ok, Proc#proc{ddoc_keys=[DDocKey|Keys2]}}.
diff --git a/apps/couch/src/couch_query_servers.erl b/apps/couch/src/couch_query_servers.erl
index a4850e86..4e86dcf4 100644
--- a/apps/couch/src/couch_query_servers.erl
+++ b/apps/couch/src/couch_query_servers.erl
@@ -11,11 +11,7 @@
% the License.
-module(couch_query_servers).
--behaviour(gen_server).
--export([start_link/0, config_change/1]).
-
--export([init/1, terminate/2, handle_call/3, handle_cast/2, handle_info/2,code_change/3]).
-export([start_doc_map/2, map_docs/2, stop_doc_map/1]).
-export([reduce/3, rereduce/3,validate_doc_update/5]).
-export([filter_docs/5]).
@@ -27,18 +23,6 @@
-include("couch_db.hrl").
--record(proc, {
- pid,
- lang,
- ddoc_keys = [],
- prompt_fun,
- set_timeout_fun,
- stop_fun
-}).
-
-start_link() ->
- gen_server:start_link({local, couch_query_servers}, couch_query_servers, [], []).
-
start_doc_map(Lang, Functions) ->
Proc = get_os_process(Lang),
lists:foreach(fun(FunctionSource) ->
@@ -223,151 +207,6 @@ with_ddoc_proc(#doc{id=DDocId,revs={Start, [DiskRev|_]}}=DDoc, Fun) ->
ok = ret_os_process(Proc)
end.
-init([]) ->
- % register async to avoid deadlock on restart_child
- Self = self(),
- spawn(couch_config, register, [fun ?MODULE:config_change/1, Self]),
-
- Langs = ets:new(couch_query_server_langs, [set, private]),
- PidProcs = ets:new(couch_query_server_pid_langs, [set, private]),
- LangProcs = ets:new(couch_query_server_procs, [set, private]),
- % 'query_servers' specifies an OS command-line to execute.
- lists:foreach(fun({Lang, Command}) ->
- true = ets:insert(Langs, {?l2b(Lang),
- couch_os_process, start_link, [Command]})
- end, couch_config:get("query_servers")),
- % 'native_query_servers' specifies a {Module, Func, Arg} tuple.
- lists:foreach(fun({Lang, SpecStr}) ->
- {ok, {Mod, Fun, SpecArg}} = couch_util:parse_term(SpecStr),
- true = ets:insert(Langs, {?l2b(Lang),
- Mod, Fun, SpecArg})
- end, couch_config:get("native_query_servers")),
- process_flag(trap_exit, true),
- {ok, {Langs, % Keyed by language name, value is {Mod,Func,Arg}
- PidProcs, % Keyed by PID, valus is a #proc record.
- LangProcs % Keyed by language name, value is a #proc record
- }}.
-
-terminate(_Reason, {_Langs, PidProcs, _LangProcs}) ->
- [couch_util:shutdown_sync(P) || {P,_} <- ets:tab2list(PidProcs)],
- ok.
-
-handle_call({get_proc, #doc{body={Props}}=DDoc, DDocKey}, _From, {Langs, PidProcs, LangProcs}=Server) ->
- % Note to future self. Add max process limit.
- Lang = couch_util:get_value(<<"language">>, Props, <<"javascript">>),
- case ets:lookup(LangProcs, Lang) of
- [{Lang, [P|Rest]}] ->
- % find a proc in the set that has the DDoc
- {ok, Proc} = proc_with_ddoc(DDoc, DDocKey, [P|Rest]),
- rem_from_list(LangProcs, Lang, Proc),
- {reply, {ok, Proc, get_query_server_config()}, Server};
- _ ->
- case (catch new_process(Langs, Lang)) of
- {ok, Proc} ->
- add_value(PidProcs, Proc#proc.pid, Proc),
- {ok, Proc2} = proc_with_ddoc(DDoc, DDocKey, [Proc]),
- {reply, {ok, Proc2, get_query_server_config()}, Server};
- Error ->
- {reply, Error, Server}
- end
- end;
-handle_call({get_proc, Lang}, _From, {Langs, PidProcs, LangProcs}=Server) ->
- % Note to future self. Add max process limit.
- case ets:lookup(LangProcs, Lang) of
- [{Lang, [Proc|_]}] ->
- rem_from_list(LangProcs, Lang, Proc),
- {reply, {ok, Proc, get_query_server_config()}, Server};
- _ ->
- case (catch new_process(Langs, Lang)) of
- {ok, Proc} ->
- add_value(PidProcs, Proc#proc.pid, Proc),
- {reply, {ok, Proc, get_query_server_config()}, Server};
- Error ->
- {reply, Error, Server}
- end
- end;
-handle_call({unlink_proc, Pid}, _From, {_, PidProcs, _}=Server) ->
- rem_value(PidProcs, Pid),
- unlink(Pid),
- {reply, ok, Server};
-handle_call({ret_proc, Proc}, _From, {_, PidProcs, LangProcs}=Server) ->
- % Along with max process limit, here we should check
- % if we're over the limit and discard when we are.
- add_value(PidProcs, Proc#proc.pid, Proc),
- add_to_list(LangProcs, Proc#proc.lang, Proc),
- link(Proc#proc.pid),
- {reply, true, Server}.
-
-handle_cast(_Whatever, Server) ->
- {noreply, Server}.
-
-handle_info({'EXIT', Pid, Status}, {_, PidProcs, LangProcs}=Server) ->
- case ets:lookup(PidProcs, Pid) of
- [{Pid, Proc}] ->
- case Status of
- normal -> ok;
- _ -> ?LOG_DEBUG("Linked process died abnormally: ~p (reason: ~p)", [Pid, Status])
- end,
- rem_value(PidProcs, Pid),
- catch rem_from_list(LangProcs, Proc#proc.lang, Proc),
- {noreply, Server};
- [] ->
- case Status of
- normal ->
- {noreply, Server};
- _ ->
- {stop, Status, Server}
- end
- end.
-
-code_change(_OldVsn, State, _Extra) ->
- {ok, State}.
-
-config_change("query_servers") ->
- supervisor:terminate_child(couch_secondary_services, query_servers),
- supervisor:restart_child(couch_secondary_services, query_servers);
-config_change("native_query_servers") ->
- supervisor:terminate_child(couch_secondary_services, query_servers),
- supervisor:restart_child(couch_secondary_services, query_servers).
-
-% Private API
-
-get_query_server_config() ->
- ReduceLimit = list_to_atom(
- couch_config:get("query_server_config","reduce_limit","true")),
- {[{<<"reduce_limit">>, ReduceLimit}]}.
-
-new_process(Langs, Lang) ->
- case ets:lookup(Langs, Lang) of
- [{Lang, Mod, Func, Arg}] ->
- {ok, Pid} = apply(Mod, Func, Arg),
- {ok, #proc{lang=Lang,
- pid=Pid,
- % Called via proc_prompt, proc_set_timeout, and proc_stop
- prompt_fun={Mod, prompt},
- set_timeout_fun={Mod, set_timeout},
- stop_fun={Mod, stop}}};
- _ ->
- {unknown_query_language, Lang}
- end.
-
-proc_with_ddoc(DDoc, DDocKey, LangProcs) ->
- DDocProcs = lists:filter(fun(#proc{ddoc_keys=Keys}) ->
- lists:any(fun(Key) ->
- Key == DDocKey
- end, Keys)
- end, LangProcs),
- case DDocProcs of
- [DDocProc|_] ->
- ?LOG_DEBUG("DDocProc found for DDocKey: ~p",[DDocKey]),
- {ok, DDocProc};
- [] ->
- [TeachProc|_] = LangProcs,
- ?LOG_DEBUG("Teach ddoc to new proc ~p with DDocKey: ~p",[TeachProc, DDocKey]),
- {ok, SmartProc} = teach_ddoc(DDoc, DDocKey, TeachProc),
- {ok, SmartProc}
- end.
-
proc_prompt(Proc, Args) ->
{Mod, Func} = Proc#proc.prompt_fun,
apply(Mod, Func, [Proc#proc.pid, Args]).
@@ -380,28 +219,15 @@ proc_set_timeout(Proc, Timeout) ->
{Mod, Func} = Proc#proc.set_timeout_fun,
apply(Mod, Func, [Proc#proc.pid, Timeout]).
-teach_ddoc(DDoc, {DDocId, _Rev}=DDocKey, #proc{ddoc_keys=Keys}=Proc) ->
- % send ddoc over the wire
- % we only share the rev with the client we know to update code
- % but it only keeps the latest copy, per each ddoc, around.
- true = proc_prompt(Proc, [<<"ddoc">>, <<"new">>, DDocId, couch_doc:to_json_obj(DDoc, [])]),
- % we should remove any other ddocs keys for this docid
- % because the query server overwrites without the rev
- Keys2 = [{D,R} || {D,R} <- Keys, D /= DDocId],
- % add ddoc to the proc
- {ok, Proc#proc{ddoc_keys=[DDocKey|Keys2]}}.
-
get_ddoc_process(#doc{} = DDoc, DDocKey) ->
% remove this case statement
- case gen_server:call(couch_query_servers, {get_proc, DDoc, DDocKey}) of
+ case gen_server:call(couch_proc_manager, {get_proc, DDoc, DDocKey}) of
{ok, Proc, QueryConfig} ->
% process knows the ddoc
case (catch proc_prompt(Proc, [<<"reset">>, QueryConfig])) of
true ->
proc_set_timeout(Proc, list_to_integer(couch_config:get(
"couchdb", "os_process_timeout", "5000"))),
- link(Proc#proc.pid),
- gen_server:call(couch_query_servers, {unlink_proc, Proc#proc.pid}),
Proc;
_ ->
catch proc_stop(Proc),
@@ -412,14 +238,12 @@ get_ddoc_process(#doc{} = DDoc, DDocKey) ->
end.
get_os_process(Lang) ->
- case gen_server:call(couch_query_servers, {get_proc, Lang}) of
+ case gen_server:call(couch_proc_manager, {get_proc, Lang}) of
{ok, Proc, QueryConfig} ->
case (catch proc_prompt(Proc, [<<"reset">>, QueryConfig])) of
true ->
proc_set_timeout(Proc, list_to_integer(couch_config:get(
"couchdb", "os_process_timeout", "5000"))),
- link(Proc#proc.pid),
- gen_server:call(couch_query_servers, {unlink_proc, Proc#proc.pid}),
Proc;
_ ->
catch proc_stop(Proc),
@@ -430,38 +254,6 @@ get_os_process(Lang) ->
end.
ret_os_process(Proc) ->
- true = gen_server:call(couch_query_servers, {ret_proc, Proc}),
+ true = gen_server:call(couch_proc_manager, {ret_proc, Proc}),
catch unlink(Proc#proc.pid),
ok.
-
-add_value(Tid, Key, Value) ->
- true = ets:insert(Tid, {Key, Value}).
-
-rem_value(Tid, Key) ->
- true = ets:delete(Tid, Key).
-
-add_to_list(Tid, Key, Value) ->
- case ets:lookup(Tid, Key) of
- [{Key, Vals}] ->
- true = ets:insert(Tid, {Key, [Value|Vals]});
- [] ->
- true = ets:insert(Tid, {Key, [Value]})
- end.
-
-rem_from_list(Tid, Key, Value) when is_record(Value, proc)->
- Pid = Value#proc.pid,
- case ets:lookup(Tid, Key) of
- [{Key, Vals}] ->
- % make a new values list that doesn't include the Value arg
- NewValues = [Val || #proc{pid=P}=Val <- Vals, P /= Pid],
- ets:insert(Tid, {Key, NewValues});
- [] -> ok
- end;
-rem_from_list(Tid, Key, Value) ->
- case ets:lookup(Tid, Key) of
- [{Key, Vals}] ->
- % make a new values list that doesn't include the Value arg
- NewValues = [Val || Val <- Vals, Val /= Value],
- ets:insert(Tid, {Key, NewValues});
- [] -> ok
- end.