diff options
Diffstat (limited to 'apps/couch')
-rw-r--r-- | apps/couch/include/couch_db.hrl | 9 | ||||
-rw-r--r-- | apps/couch/src/couch.app.src | 2 | ||||
-rw-r--r-- | apps/couch/src/couch_proc_manager.erl | 186 | ||||
-rw-r--r-- | apps/couch/src/couch_query_servers.erl | 214 |
4 files changed, 199 insertions, 212 deletions
diff --git a/apps/couch/include/couch_db.hrl b/apps/couch/include/couch_db.hrl index 2907ff90..12b0ac90 100644 --- a/apps/couch/include/couch_db.hrl +++ b/apps/couch/include/couch_db.hrl @@ -294,3 +294,12 @@ include_docs = false }). +-record(proc, { + pid, + lang, + client = nil, + ddoc_keys = [], + prompt_fun, + set_timeout_fun, + stop_fun +}). diff --git a/apps/couch/src/couch.app.src b/apps/couch/src/couch.app.src index 61bcc1ee..26bcd406 100644 --- a/apps/couch/src/couch.app.src +++ b/apps/couch/src/couch.app.src @@ -9,7 +9,7 @@ couch_httpd, couch_log, couch_primary_services, - couch_query_servers, + couch_proc_manager, couch_rep_sup, couch_secondary_services, couch_server, diff --git a/apps/couch/src/couch_proc_manager.erl b/apps/couch/src/couch_proc_manager.erl new file mode 100644 index 00000000..ca2e8ac5 --- /dev/null +++ b/apps/couch/src/couch_proc_manager.erl @@ -0,0 +1,186 @@ +-module(couch_proc_manager). +-behaviour(gen_server). +-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, + code_change/3]). + +-export([start_link/0, get_proc_count/0]). + +-include("couch_db.hrl"). + +-record(state, {tab}). + +start_link() -> + gen_server:start_link({local, ?MODULE}, ?MODULE, [], []). + +get_proc_count() -> + gen_server:call(?MODULE, get_proc_count). + +init([]) -> + process_flag(trap_exit, true), + {ok, #state{tab = ets:new(procs, [{keypos, #proc.pid}])}}. + +handle_call(get_table, _From, State) -> + {reply, State#state.tab, State}; + +handle_call(get_proc_count, _From, State) -> + {reply, ets:info(State#state.tab, size), State}; + +handle_call({get_proc, #doc{body={Props}}=DDoc, DDocKey}, {Client, _}, State) -> + Lang = couch_util:get_value(<<"language">>, Props, <<"javascript">>), + try get_procs(State#state.tab, Lang) of + Procs -> + case proc_with_ddoc(DDoc, DDocKey, Procs) of + {ok, Proc0} -> + Proc = Proc0#proc{client = erlang:monitor(process, Client)}, + ets:insert(State#state.tab, Proc), + {reply, {ok, Proc, get_query_server_config()}, State}; + {error, Reason} -> + {reply, {error, Reason}, State} + end + catch {unknown_query_language, _} -> + {reply, {unknown_query_language, Lang}, State}; + error:Reason -> + ?LOG_ERROR("~p ~p ~p", [?MODULE, Reason, erlang:get_stacktrace()]), + {reply, {error, Reason}, State} + end; + +handle_call({get_proc, Lang}, {Client, _}, State) -> + try get_procs(State#state.tab, Lang) of + [Proc0|_] -> + Proc = Proc0#proc{client = erlang:monitor(process, Client)}, + ets:insert(State#state.tab, Proc), + {reply, {ok, Proc, get_query_server_config()}, State} + catch {unknown_query_language, _} -> + {reply, {unknown_query_language, Lang}, State}; + error:Reason -> + ?LOG_ERROR("~p ~p ~p", [?MODULE, Reason, erlang:get_stacktrace()]), + {reply, {error, Reason}, State} + end; + +handle_call({ret_proc, #proc{client=Ref, pid=Pid} = Proc}, _From, State) -> + erlang:demonitor(Ref, [flush]), + % We need to check if the process is alive here, as the client could be + % handing us a #proc{} with a dead one. We would have already removed the + % #proc{} from our own table, so the alternative is to do a lookup in the + % table before the insert. Don't know which approach is cheaper. + case is_process_alive(Pid) of true -> + maybe_reuse_proc(State#state.tab, Proc); + false -> ok end, + {reply, true, State}; + +handle_call(_Call, _From, State) -> + {reply, ignored, State}. + +handle_cast(_Msg, State) -> + {noreply, State}. + +handle_info({'EXIT', Pid, Reason}, State) -> + ?LOG_INFO("~p ~p died ~p", [?MODULE, Pid, Reason]), + ets:delete(State#state.tab, Pid), + {noreply, State}; + +handle_info({'DOWN', Ref, _, _, _Reason}, State) -> + case ets:match_object(State#state.tab, #proc{client=Ref, _='_'}) of + [] -> + ok; + [#proc{pid = Pid} = Proc] -> + case is_process_alive(Pid) of true -> + maybe_reuse_proc(State#state.tab, Proc); + false -> ok end + end, + {noreply, State}; + +handle_info(_Msg, State) -> + {noreply, State}. + +terminate(_Reason, #state{tab=Tab}) -> + ets:foldl(fun(#proc{pid=P}, _) -> couch_util:shutdown_sync(P) end, 0, Tab), + ok. + +code_change(_OldVsn, State, _Extra) -> + {ok, State}. + +maybe_reuse_proc(Tab, #proc{pid = Pid} = Proc) -> + Limit = couch_config:get("query_server_config", "os_process_soft_limit", "100"), + case ets:info(Tab, size) > list_to_integer(Limit) of + true -> + ets:delete(Tab, Pid), + unlink(Pid), + exit(Pid, kill); + false -> + garbage_collect(Pid), + ets:insert(Tab, Proc#proc{client=nil}) + end. + +get_procs(Tab, Lang) when is_binary(Lang) -> + get_procs(Tab, binary_to_list(Lang)); +get_procs(Tab, Lang) when is_list(Lang) -> + case ets:match_object(Tab, #proc{lang=Lang, client=nil, _='_'}) of + [] -> + {ok, NewProc} = new_proc(Lang), % check OS process limit + [NewProc]; + Procs -> + Procs + end. + +new_proc(Lang) when is_binary(Lang) -> + new_proc(binary_to_list(Lang)); +new_proc(Lang) when is_list(Lang) -> + case couch_config:get("query_servers", Lang) of + undefined -> + case couch_config:get("native_query_servers", Lang) of + undefined -> + throw({unknown_query_language, Lang}); + SpecStr -> + {ok, {M,F,A}} = couch_util:parse_term(SpecStr), + {ok, Pid} = apply(M, F, A), + make_proc(Pid, Lang, M) + end; + Command -> + {ok, Pid} = couch_os_process:start_link(Command), + make_proc(Pid, Lang, couch_os_process) + end. + +make_proc(Pid, Lang, Mod) -> + Proc = #proc{ + lang = Lang, + pid = Pid, + prompt_fun = {Mod, prompt}, + set_timeout_fun = {Mod, set_timeout}, + stop_fun = {Mod, stop} + }, + {ok, Proc}. + +get_query_server_config() -> + Limit = couch_config:get("query_server_config", "reduce_limit", "true"), + {[{<<"reduce_limit">>, list_to_atom(Limit)}]}. + +proc_with_ddoc(DDoc, DDocKey, Procs) -> + Filter = fun(#proc{ddoc_keys=Keys}) -> not lists:member(DDocKey, Keys) end, + case lists:dropwhile(Filter, Procs) of + [DDocProc|_] -> + {ok, DDocProc}; + [] -> + teach_any_proc(DDoc, DDocKey, Procs) + end. + +teach_any_proc(DDoc, DDocKey, [Proc|Rest]) -> + try + teach_ddoc(DDoc, DDocKey, Proc) + catch _:_ -> + teach_any_proc(DDoc, DDocKey, Rest) + end; +teach_any_proc(_, _, []) -> + {error, noproc}. + +teach_ddoc(DDoc, {DDocId, _Rev}=DDocKey, #proc{ddoc_keys=Keys}=Proc) -> + % send ddoc over the wire + % we only share the rev with the client we know to update code + % but it only keeps the latest copy, per each ddoc, around. + true = couch_query_servers:proc_prompt(Proc, [<<"ddoc">>, <<"new">>, + DDocId, couch_doc:to_json_obj(DDoc, [])]), + % we should remove any other ddocs keys for this docid + % because the query server overwrites without the rev + Keys2 = [{D,R} || {D,R} <- Keys, D /= DDocId], + % add ddoc to the proc + {ok, Proc#proc{ddoc_keys=[DDocKey|Keys2]}}. diff --git a/apps/couch/src/couch_query_servers.erl b/apps/couch/src/couch_query_servers.erl index a4850e86..4e86dcf4 100644 --- a/apps/couch/src/couch_query_servers.erl +++ b/apps/couch/src/couch_query_servers.erl @@ -11,11 +11,7 @@ % the License. -module(couch_query_servers). --behaviour(gen_server). --export([start_link/0, config_change/1]). - --export([init/1, terminate/2, handle_call/3, handle_cast/2, handle_info/2,code_change/3]). -export([start_doc_map/2, map_docs/2, stop_doc_map/1]). -export([reduce/3, rereduce/3,validate_doc_update/5]). -export([filter_docs/5]). @@ -27,18 +23,6 @@ -include("couch_db.hrl"). --record(proc, { - pid, - lang, - ddoc_keys = [], - prompt_fun, - set_timeout_fun, - stop_fun -}). - -start_link() -> - gen_server:start_link({local, couch_query_servers}, couch_query_servers, [], []). - start_doc_map(Lang, Functions) -> Proc = get_os_process(Lang), lists:foreach(fun(FunctionSource) -> @@ -223,151 +207,6 @@ with_ddoc_proc(#doc{id=DDocId,revs={Start, [DiskRev|_]}}=DDoc, Fun) -> ok = ret_os_process(Proc) end. -init([]) -> - % register async to avoid deadlock on restart_child - Self = self(), - spawn(couch_config, register, [fun ?MODULE:config_change/1, Self]), - - Langs = ets:new(couch_query_server_langs, [set, private]), - PidProcs = ets:new(couch_query_server_pid_langs, [set, private]), - LangProcs = ets:new(couch_query_server_procs, [set, private]), - % 'query_servers' specifies an OS command-line to execute. - lists:foreach(fun({Lang, Command}) -> - true = ets:insert(Langs, {?l2b(Lang), - couch_os_process, start_link, [Command]}) - end, couch_config:get("query_servers")), - % 'native_query_servers' specifies a {Module, Func, Arg} tuple. - lists:foreach(fun({Lang, SpecStr}) -> - {ok, {Mod, Fun, SpecArg}} = couch_util:parse_term(SpecStr), - true = ets:insert(Langs, {?l2b(Lang), - Mod, Fun, SpecArg}) - end, couch_config:get("native_query_servers")), - process_flag(trap_exit, true), - {ok, {Langs, % Keyed by language name, value is {Mod,Func,Arg} - PidProcs, % Keyed by PID, valus is a #proc record. - LangProcs % Keyed by language name, value is a #proc record - }}. - -terminate(_Reason, {_Langs, PidProcs, _LangProcs}) -> - [couch_util:shutdown_sync(P) || {P,_} <- ets:tab2list(PidProcs)], - ok. - -handle_call({get_proc, #doc{body={Props}}=DDoc, DDocKey}, _From, {Langs, PidProcs, LangProcs}=Server) -> - % Note to future self. Add max process limit. - Lang = couch_util:get_value(<<"language">>, Props, <<"javascript">>), - case ets:lookup(LangProcs, Lang) of - [{Lang, [P|Rest]}] -> - % find a proc in the set that has the DDoc - {ok, Proc} = proc_with_ddoc(DDoc, DDocKey, [P|Rest]), - rem_from_list(LangProcs, Lang, Proc), - {reply, {ok, Proc, get_query_server_config()}, Server}; - _ -> - case (catch new_process(Langs, Lang)) of - {ok, Proc} -> - add_value(PidProcs, Proc#proc.pid, Proc), - {ok, Proc2} = proc_with_ddoc(DDoc, DDocKey, [Proc]), - {reply, {ok, Proc2, get_query_server_config()}, Server}; - Error -> - {reply, Error, Server} - end - end; -handle_call({get_proc, Lang}, _From, {Langs, PidProcs, LangProcs}=Server) -> - % Note to future self. Add max process limit. - case ets:lookup(LangProcs, Lang) of - [{Lang, [Proc|_]}] -> - rem_from_list(LangProcs, Lang, Proc), - {reply, {ok, Proc, get_query_server_config()}, Server}; - _ -> - case (catch new_process(Langs, Lang)) of - {ok, Proc} -> - add_value(PidProcs, Proc#proc.pid, Proc), - {reply, {ok, Proc, get_query_server_config()}, Server}; - Error -> - {reply, Error, Server} - end - end; -handle_call({unlink_proc, Pid}, _From, {_, PidProcs, _}=Server) -> - rem_value(PidProcs, Pid), - unlink(Pid), - {reply, ok, Server}; -handle_call({ret_proc, Proc}, _From, {_, PidProcs, LangProcs}=Server) -> - % Along with max process limit, here we should check - % if we're over the limit and discard when we are. - add_value(PidProcs, Proc#proc.pid, Proc), - add_to_list(LangProcs, Proc#proc.lang, Proc), - link(Proc#proc.pid), - {reply, true, Server}. - -handle_cast(_Whatever, Server) -> - {noreply, Server}. - -handle_info({'EXIT', Pid, Status}, {_, PidProcs, LangProcs}=Server) -> - case ets:lookup(PidProcs, Pid) of - [{Pid, Proc}] -> - case Status of - normal -> ok; - _ -> ?LOG_DEBUG("Linked process died abnormally: ~p (reason: ~p)", [Pid, Status]) - end, - rem_value(PidProcs, Pid), - catch rem_from_list(LangProcs, Proc#proc.lang, Proc), - {noreply, Server}; - [] -> - case Status of - normal -> - {noreply, Server}; - _ -> - {stop, Status, Server} - end - end. - -code_change(_OldVsn, State, _Extra) -> - {ok, State}. - -config_change("query_servers") -> - supervisor:terminate_child(couch_secondary_services, query_servers), - supervisor:restart_child(couch_secondary_services, query_servers); -config_change("native_query_servers") -> - supervisor:terminate_child(couch_secondary_services, query_servers), - supervisor:restart_child(couch_secondary_services, query_servers). - -% Private API - -get_query_server_config() -> - ReduceLimit = list_to_atom( - couch_config:get("query_server_config","reduce_limit","true")), - {[{<<"reduce_limit">>, ReduceLimit}]}. - -new_process(Langs, Lang) -> - case ets:lookup(Langs, Lang) of - [{Lang, Mod, Func, Arg}] -> - {ok, Pid} = apply(Mod, Func, Arg), - {ok, #proc{lang=Lang, - pid=Pid, - % Called via proc_prompt, proc_set_timeout, and proc_stop - prompt_fun={Mod, prompt}, - set_timeout_fun={Mod, set_timeout}, - stop_fun={Mod, stop}}}; - _ -> - {unknown_query_language, Lang} - end. - -proc_with_ddoc(DDoc, DDocKey, LangProcs) -> - DDocProcs = lists:filter(fun(#proc{ddoc_keys=Keys}) -> - lists:any(fun(Key) -> - Key == DDocKey - end, Keys) - end, LangProcs), - case DDocProcs of - [DDocProc|_] -> - ?LOG_DEBUG("DDocProc found for DDocKey: ~p",[DDocKey]), - {ok, DDocProc}; - [] -> - [TeachProc|_] = LangProcs, - ?LOG_DEBUG("Teach ddoc to new proc ~p with DDocKey: ~p",[TeachProc, DDocKey]), - {ok, SmartProc} = teach_ddoc(DDoc, DDocKey, TeachProc), - {ok, SmartProc} - end. - proc_prompt(Proc, Args) -> {Mod, Func} = Proc#proc.prompt_fun, apply(Mod, Func, [Proc#proc.pid, Args]). @@ -380,28 +219,15 @@ proc_set_timeout(Proc, Timeout) -> {Mod, Func} = Proc#proc.set_timeout_fun, apply(Mod, Func, [Proc#proc.pid, Timeout]). -teach_ddoc(DDoc, {DDocId, _Rev}=DDocKey, #proc{ddoc_keys=Keys}=Proc) -> - % send ddoc over the wire - % we only share the rev with the client we know to update code - % but it only keeps the latest copy, per each ddoc, around. - true = proc_prompt(Proc, [<<"ddoc">>, <<"new">>, DDocId, couch_doc:to_json_obj(DDoc, [])]), - % we should remove any other ddocs keys for this docid - % because the query server overwrites without the rev - Keys2 = [{D,R} || {D,R} <- Keys, D /= DDocId], - % add ddoc to the proc - {ok, Proc#proc{ddoc_keys=[DDocKey|Keys2]}}. - get_ddoc_process(#doc{} = DDoc, DDocKey) -> % remove this case statement - case gen_server:call(couch_query_servers, {get_proc, DDoc, DDocKey}) of + case gen_server:call(couch_proc_manager, {get_proc, DDoc, DDocKey}) of {ok, Proc, QueryConfig} -> % process knows the ddoc case (catch proc_prompt(Proc, [<<"reset">>, QueryConfig])) of true -> proc_set_timeout(Proc, list_to_integer(couch_config:get( "couchdb", "os_process_timeout", "5000"))), - link(Proc#proc.pid), - gen_server:call(couch_query_servers, {unlink_proc, Proc#proc.pid}), Proc; _ -> catch proc_stop(Proc), @@ -412,14 +238,12 @@ get_ddoc_process(#doc{} = DDoc, DDocKey) -> end. get_os_process(Lang) -> - case gen_server:call(couch_query_servers, {get_proc, Lang}) of + case gen_server:call(couch_proc_manager, {get_proc, Lang}) of {ok, Proc, QueryConfig} -> case (catch proc_prompt(Proc, [<<"reset">>, QueryConfig])) of true -> proc_set_timeout(Proc, list_to_integer(couch_config:get( "couchdb", "os_process_timeout", "5000"))), - link(Proc#proc.pid), - gen_server:call(couch_query_servers, {unlink_proc, Proc#proc.pid}), Proc; _ -> catch proc_stop(Proc), @@ -430,38 +254,6 @@ get_os_process(Lang) -> end. ret_os_process(Proc) -> - true = gen_server:call(couch_query_servers, {ret_proc, Proc}), + true = gen_server:call(couch_proc_manager, {ret_proc, Proc}), catch unlink(Proc#proc.pid), ok. - -add_value(Tid, Key, Value) -> - true = ets:insert(Tid, {Key, Value}). - -rem_value(Tid, Key) -> - true = ets:delete(Tid, Key). - -add_to_list(Tid, Key, Value) -> - case ets:lookup(Tid, Key) of - [{Key, Vals}] -> - true = ets:insert(Tid, {Key, [Value|Vals]}); - [] -> - true = ets:insert(Tid, {Key, [Value]}) - end. - -rem_from_list(Tid, Key, Value) when is_record(Value, proc)-> - Pid = Value#proc.pid, - case ets:lookup(Tid, Key) of - [{Key, Vals}] -> - % make a new values list that doesn't include the Value arg - NewValues = [Val || #proc{pid=P}=Val <- Vals, P /= Pid], - ets:insert(Tid, {Key, NewValues}); - [] -> ok - end; -rem_from_list(Tid, Key, Value) -> - case ets:lookup(Tid, Key) of - [{Key, Vals}] -> - % make a new values list that doesn't include the Value arg - NewValues = [Val || Val <- Vals, Val /= Value], - ets:insert(Tid, {Key, NewValues}); - [] -> ok - end. |