diff options
| author | John Christopher Anderson <jchris@apache.org> | 2010-08-07 19:26:50 +0000 | 
|---|---|---|
| committer | John Christopher Anderson <jchris@apache.org> | 2010-08-07 19:26:50 +0000 | 
| commit | e18c591080642b9f1cedad94cf71a7053d28669c (patch) | |
| tree | 314e1e69dcf1d013a096e6f742e388951cc220e0 | |
| parent | ccaa1037c27c39f50c5666f530a4c5e1067a867c (diff) | |
os_process_limit for query servers make them much more robust under concurrent load
git-svn-id: https://svn.apache.org/repos/asf/couchdb/trunk@983291 13f79535-47bb-0310-9956-ffa450edef68
| -rw-r--r-- | etc/couchdb/default.ini.tpl.in | 1 | 
| -rw-r--r-- | src/couchdb/couch_query_servers.erl | 224 | 
2 files changed, 159 insertions, 66 deletions
diff --git a/etc/couchdb/default.ini.tpl.in b/etc/couchdb/default.ini.tpl.in index a890beb7..872b1444 100644 --- a/etc/couchdb/default.ini.tpl.in +++ b/etc/couchdb/default.ini.tpl.in @@ -46,6 +46,7 @@ javascript = %bindir%/%couchjs_command_name% %localbuilddatadir%/server/main.js  ; please let us know on the mailing list so we can fine tune the heuristic.  [query_server_config]  reduce_limit = true +os_process_limit = 25  ; enable external as an httpd handler, then link it with commands here.  ; note, this api is still under consideration. diff --git a/src/couchdb/couch_query_servers.erl b/src/couchdb/couch_query_servers.erl index 3df7beb0..6660ee74 100644 --- a/src/couchdb/couch_query_servers.erl +++ b/src/couchdb/couch_query_servers.erl @@ -35,6 +35,15 @@      stop_fun  }). +-record(qserver, { +    langs, % Keyed by language name, value is {Mod,Func,Arg} +    pid_procs, % Keyed by PID, valus is a #proc record. +    lang_procs, % Keyed by language name, value is a #proc record +    lang_limits, % Keyed by language name, value is {Lang, Limit, Current} +    waitlist = [], +    config +}). +  start_link() ->      gen_server:start_link({local, couch_query_servers}, couch_query_servers, [], []). 
@@ -242,81 +251,97 @@ init([]) ->              supervisor:terminate_child(couch_secondary_services, query_servers),              [supervisor:restart_child(couch_secondary_services, query_servers)]          end), +    ok = couch_config:register( +        fun("query_server_config" ++ _, _) -> +            supervisor:terminate_child(couch_secondary_services, query_servers), +            supervisor:restart_child(couch_secondary_services, query_servers) +        end),      Langs = ets:new(couch_query_server_langs, [set, private]), +    LangLimits = ets:new(couch_query_server_lang_limits, [set, private]),      PidProcs = ets:new(couch_query_server_pid_langs, [set, private]),      LangProcs = ets:new(couch_query_server_procs, [set, private]), + +    ProcTimeout = list_to_integer(couch_config:get( +                        "couchdb", "os_process_timeout", "5000")), +    ReduceLimit = list_to_atom( +        couch_config:get("query_server_config","reduce_limit","true")), +    OsProcLimit = list_to_integer( +        couch_config:get("query_server_config","os_process_limit","10")), +      % 'query_servers' specifies an OS command-line to execute.      lists:foreach(fun({Lang, Command}) -> +        true = ets:insert(LangLimits, {?l2b(Lang), OsProcLimit, 0}),          true = ets:insert(Langs, {?l2b(Lang),                            couch_os_process, start_link, [Command]})      end, couch_config:get("query_servers")),      % 'native_query_servers' specifies a {Module, Func, Arg} tuple.      lists:foreach(fun({Lang, SpecStr}) ->          {ok, {Mod, Fun, SpecArg}} = couch_util:parse_term(SpecStr), +        true = ets:insert(LangLimits, {?l2b(Lang), 0, 0}), % 0 means no limit          true = ets:insert(Langs, {?l2b(Lang),                            Mod, Fun, SpecArg})      end, couch_config:get("native_query_servers")), -    process_flag(trap_exit, true), -    {ok, {Langs, % Keyed by language name, value is {Mod,Func,Arg} -          PidProcs, % Keyed by PID, valus is a #proc record. 
-          LangProcs % Keyed by language name, value is a #proc record -          }}. -terminate(_Reason, {_Langs, PidProcs, _LangProcs}) -> + +    process_flag(trap_exit, true), +    {ok, #qserver{ +        langs = Langs, % Keyed by language name, value is {Mod,Func,Arg} +        pid_procs = PidProcs, % Keyed by PID, valus is a #proc record. +        lang_procs = LangProcs, % Keyed by language name, value is a #proc record +        lang_limits = LangLimits, % Keyed by language name, value is {Lang, Limit, Current} +        config = {[{<<"reduce_limit">>, ReduceLimit},{<<"timeout">>, ProcTimeout}]} +    }}. + +terminate(_Reason, #qserver{pid_procs=PidProcs}) ->      [couch_util:shutdown_sync(P) || {P,_} <- ets:tab2list(PidProcs)],      ok. -handle_call({get_proc, #doc{body={Props}}=DDoc, DDocKey}, _From, {Langs, PidProcs, LangProcs}=Server) -> -    % Note to future self. Add max process limit. +handle_call({get_proc, #doc{body={Props}}=DDoc, DDocKey}, From, Server) ->      Lang = couch_util:get_value(<<"language">>, Props, <<"javascript">>), -    case ets:lookup(LangProcs, Lang) of -    [{Lang, [P|Rest]}] -> -        % find a proc in the set that has the DDoc -        {ok, Proc} = proc_with_ddoc(DDoc, DDocKey, [P|Rest]), -        rem_from_list(LangProcs, Lang, Proc), -        {reply, {ok, Proc, get_query_server_config()}, Server}; -    _ -> -        case (catch new_process(Langs, Lang)) of -        {ok, Proc} -> -            add_value(PidProcs, Proc#proc.pid, Proc), -            {ok, Proc2} = proc_with_ddoc(DDoc, DDocKey, [Proc]), -            {reply, {ok, Proc2, get_query_server_config()}, Server}; -        Error -> -            {reply, Error, Server} -        end +    case lang_proc(Lang, Server, fun(Procs) -> +            % find a proc in the set that has the DDoc +            proc_with_ddoc(DDoc, DDocKey, Procs) +        end) of +    {ok, Proc} -> +        {reply, {ok, Proc, Server#qserver.config}, Server}; +    wait -> +        {noreply, add_to_waitlist({DDoc, 
DDocKey}, From, Server)}; +    Error -> +        {reply, Error, Server}      end; -handle_call({get_proc, Lang}, _From, {Langs, PidProcs, LangProcs}=Server) -> -    % Note to future self. Add max process limit. -    case ets:lookup(LangProcs, Lang) of -    [{Lang, [Proc|_]}] -> -        rem_from_list(LangProcs, Lang, Proc), -        {reply, {ok, Proc, get_query_server_config()}, Server}; -    _ -> -        case (catch new_process(Langs, Lang)) of -        {ok, Proc} -> -            add_value(PidProcs, Proc#proc.pid, Proc), -            {reply, {ok, Proc, get_query_server_config()}, Server}; -        Error -> -            {reply, Error, Server} -        end +handle_call({get_proc, Lang}, From, Server) -> +    case lang_proc(Lang, Server, fun([P|_Procs]) -> +            {ok, P} +        end) of +    {ok, Proc} -> +        {reply, {ok, Proc, Server#qserver.config}, Server}; +    wait -> +        {noreply, add_to_waitlist({Lang}, From, Server)}; +    Error -> +        {reply, Error, Server}      end; -handle_call({unlink_proc, Pid}, _From, {_, PidProcs, _}=Server) -> +handle_call({unlink_proc, Pid}, _From, #qserver{pid_procs=PidProcs}=Server) ->      rem_value(PidProcs, Pid),      unlink(Pid),      {reply, ok, Server}; -handle_call({ret_proc, Proc}, _From, {_, PidProcs, LangProcs}=Server) -> +handle_call({ret_proc, Proc}, _From, #qserver{ +        pid_procs=PidProcs, +        lang_procs=LangProcs}=Server) ->      % Along with max process limit, here we should check      % if we're over the limit and discard when we are.      add_value(PidProcs, Proc#proc.pid, Proc),      add_to_list(LangProcs, Proc#proc.lang, Proc),      link(Proc#proc.pid), -    {reply, true, Server}. +    {reply, true, service_waitlist(Server)}.  handle_cast(_Whatever, Server) ->      {noreply, Server}. 
-handle_info({'EXIT', Pid, Status}, {_, PidProcs, LangProcs}=Server) -> +handle_info({'EXIT', Pid, Status}, #qserver{ +        pid_procs=PidProcs, +        lang_procs=LangProcs, +        lang_limits=LangLimits}=Server) ->      case ets:lookup(PidProcs, Pid) of      [{Pid, Proc}] ->          case Status of @@ -325,7 +350,9 @@ handle_info({'EXIT', Pid, Status}, {_, PidProcs, LangProcs}=Server) ->          end,          rem_value(PidProcs, Pid),          catch rem_from_list(LangProcs, Proc#proc.lang, Proc), -        {noreply, Server}; +        [{Lang, Lim, Current}] = ets:lookup(LangLimits, Proc#proc.lang), +        true = ets:insert(LangLimits, {Lang, Lim, Current-1}), +        {noreply, service_waitlist(Server)};      [] ->          case Status of          normal -> @@ -340,23 +367,90 @@ code_change(_OldVsn, State, _Extra) ->  % Private API -get_query_server_config() -> -    ReduceLimit = list_to_atom( -        couch_config:get("query_server_config","reduce_limit","true")), -    {[{<<"reduce_limit">>, ReduceLimit}]}. - -new_process(Langs, Lang) -> -    case ets:lookup(Langs, Lang) of -    [{Lang, Mod, Func, Arg}] -> -        {ok, Pid} = apply(Mod, Func, Arg), -        {ok, #proc{lang=Lang, -                   pid=Pid, -                   % Called via proc_prompt, proc_set_timeout, and proc_stop -                   prompt_fun={Mod, prompt}, -                   set_timeout_fun={Mod, set_timeout}, -                   stop_fun={Mod, stop}}}; +add_to_waitlist(Info, From, #qserver{waitlist=Waitlist}=Server) -> +    Server#qserver{waitlist=[{Info, From}|Waitlist]}. + +service_waitlist(#qserver{waitlist=[]}=Server) -> +    Server; +service_waitlist(#qserver{waitlist=Waitlist}=Server) -> +    [Oldest|RevWList] = lists:reverse(Waitlist), +    case service_waiting(Oldest, Server) of +    ok -> +        Server#qserver{waitlist=lists:reverse(RevWList)}; +    wait -> +        Server#qserver{waitlist=Waitlist} +    end. 
+ +% todo get rid of duplication +service_waiting({{#doc{body={Props}}=DDoc, DDocKey}, From}, Server) -> +    Lang = couch_util:get_value(<<"language">>, Props, <<"javascript">>), +    case lang_proc(Lang, Server, fun(Procs) -> +            % find a proc in the set that has the DDoc +            proc_with_ddoc(DDoc, DDocKey, Procs) +        end) of +    {ok, Proc} -> +        gen_server:reply(From, {ok, Proc, Server#qserver.config}), +        ok; +    wait -> % this should never happen +        wait; +    Error -> +        gen_server:reply(From, Error), +        ok +    end; +service_waiting({{Lang}, From}, Server) -> +    case lang_proc(Lang, Server, fun([P|Procs]) -> +            {ok, P} +        end) of +    {ok, Proc} -> +        gen_server:reply(From, {ok, Proc, Server#qserver.config}), +        ok; +    wait -> % this should never happen +        wait; +    Error -> +        gen_server:reply(From, Error), +        ok +    end. + +lang_proc(Lang, #qserver{ +        langs=Langs, +        pid_procs=PidProcs, +        lang_procs=LangProcs, +        lang_limits=LangLimits}, PickFun) -> +    % Note to future self. Add max process limit. +    case ets:lookup(LangProcs, Lang) of +    [{Lang, [P|Procs]}] -> +        {ok, Proc} = PickFun([P|Procs]), +        rem_from_list(LangProcs, Lang, Proc), +        {ok, Proc};      _ -> -        {unknown_query_language, Lang} +        case (catch new_process(Langs, LangLimits, Lang)) of +        {ok, Proc} -> +            add_value(PidProcs, Proc#proc.pid, Proc), +            {ok, Proc2} = PickFun([Proc]); +        ErrorOrWait -> +            ErrorOrWait +        end +    end. 
+ +new_process(Langs, LangLimits, Lang) -> +    [{Lang, Lim, Current}] = ets:lookup(LangLimits, Lang), +    if (Lim == 0) or (Current < Lim) -> % Lim == 0 means no limit +        % we are below the limit for our language, make a new one +        case ets:lookup(Langs, Lang) of +        [{Lang, Mod, Func, Arg}] -> +            {ok, Pid} = apply(Mod, Func, Arg), +            true = ets:insert(LangLimits, {Lang, Lim, Current+1}), +            {ok, #proc{lang=Lang, +                       pid=Pid, +                       % Called via proc_prompt, proc_set_timeout, and proc_stop +                       prompt_fun={Mod, prompt}, +                       set_timeout_fun={Mod, set_timeout}, +                       stop_fun={Mod, stop}}}; +        _ -> +            {unknown_query_language, Lang} +        end; +    true -> +        wait      end.  proc_with_ddoc(DDoc, DDocKey, LangProcs) -> @@ -402,12 +496,11 @@ teach_ddoc(DDoc, {DDocId, _Rev}=DDocKey, #proc{ddoc_keys=Keys}=Proc) ->  get_ddoc_process(#doc{} = DDoc, DDocKey) ->      % remove this case statement      case gen_server:call(couch_query_servers, {get_proc, DDoc, DDocKey}) of -    {ok, Proc, QueryConfig} -> +    {ok, Proc, {QueryConfig}} ->          % process knows the ddoc -        case (catch proc_prompt(Proc, [<<"reset">>, QueryConfig])) of +        case (catch proc_prompt(Proc, [<<"reset">>, {QueryConfig}])) of          true -> -            proc_set_timeout(Proc, list_to_integer(couch_config:get( -                                "couchdb", "os_process_timeout", "5000"))), +            proc_set_timeout(Proc, couch_util:get_value(<<"timeout">>, QueryConfig)),              link(Proc#proc.pid),              gen_server:call(couch_query_servers, {unlink_proc, Proc#proc.pid}),              Proc; @@ -421,11 +514,10 @@ get_ddoc_process(#doc{} = DDoc, DDocKey) ->  get_os_process(Lang) ->      case gen_server:call(couch_query_servers, {get_proc, Lang}) of -    {ok, Proc, QueryConfig} -> -        case (catch 
proc_prompt(Proc, [<<"reset">>, QueryConfig])) of +    {ok, Proc, {QueryConfig}} -> +        case (catch proc_prompt(Proc, [<<"reset">>, {QueryConfig}])) of          true -> -            proc_set_timeout(Proc, list_to_integer(couch_config:get( -                                "couchdb", "os_process_timeout", "5000"))), +            proc_set_timeout(Proc, couch_util:get_value(<<"timeout">>, QueryConfig)),              link(Proc#proc.pid),              gen_server:call(couch_query_servers, {unlink_proc, Proc#proc.pid}),              Proc;  | 
