diff options
Diffstat (limited to 'src/couchdb/couch_query_servers.erl')
-rw-r--r-- | src/couchdb/couch_query_servers.erl | 205 |
1 files changed, 123 insertions, 82 deletions
diff --git a/src/couchdb/couch_query_servers.erl b/src/couchdb/couch_query_servers.erl index fca7c85a..f94cc28b 100644 --- a/src/couchdb/couch_query_servers.erl +++ b/src/couchdb/couch_query_servers.erl @@ -25,6 +25,14 @@ -include("couch_db.hrl"). +-record(proc, { + pid, + lang, + prompt_fun, + set_timeout_fun, + stop_fun +}). + start_link() -> gen_server:start_link({local, couch_query_servers}, couch_query_servers, [], []). @@ -32,19 +40,19 @@ stop() -> exit(whereis(couch_query_servers), close). start_doc_map(Lang, Functions) -> - Pid = get_os_process(Lang), + Proc = get_os_process(Lang), lists:foreach(fun(FunctionSource) -> - true = couch_os_process:prompt(Pid, [<<"add_fun">>, FunctionSource]) + true = proc_prompt(Proc, [<<"add_fun">>, FunctionSource]) end, Functions), - {ok, {Lang, Pid}}. + {ok, Proc}. -map_docs({_Lang, Pid}, Docs) -> +map_docs(Proc, Docs) -> % send the documents Results = lists:map( fun(Doc) -> Json = couch_doc:to_json_obj(Doc, []), - FunsResults = couch_os_process:prompt(Pid, [<<"map_doc">>, Json]), + FunsResults = proc_prompt(Proc, [<<"map_doc">>, Json]), % the results are a json array of function map yields like this: % [FunResults1, FunResults2 ...] % where funresults is are json arrays of key value pairs: @@ -63,8 +71,8 @@ map_docs({_Lang, Pid}, Docs) -> stop_doc_map(nil) -> ok; -stop_doc_map({Lang, Pid}) -> - ok = ret_os_process(Lang, Pid). +stop_doc_map(Proc) -> + ok = ret_os_process(Proc). group_reductions_results([]) -> []; @@ -83,7 +91,7 @@ group_reductions_results(List) -> rereduce(_Lang, [], _ReducedValues) -> {ok, []}; rereduce(Lang, RedSrcs, ReducedValues) -> - Pid = get_os_process(Lang), + Proc = get_os_process(Lang), Grouped = group_reductions_results(ReducedValues), Results = try lists:zipwith( fun @@ -92,11 +100,11 @@ rereduce(Lang, RedSrcs, ReducedValues) -> Result; (FunSrc, Values) -> [true, [Result]] = - couch_os_process:prompt(Pid, [<<"rereduce">>, [FunSrc], Values]), + proc_prompt(Proc, [<<"rereduce">>, [FunSrc], Values]), Result end, RedSrcs, Grouped) after - ok = ret_os_process(Lang, Pid) + ok = ret_os_process(Proc) end, {ok, Results}. @@ -121,12 +129,11 @@ recombine_reduce_results([_OsFun|RedSrcs], [OsR|OsResults], BuiltinResults, Acc) os_reduce(_Lang, [], _KVs) -> {ok, []}; os_reduce(Lang, OsRedSrcs, KVs) -> - Pid = get_os_process(Lang), - OsResults = try couch_os_process:prompt(Pid, - [<<"reduce">>, OsRedSrcs, KVs]) of + Proc = get_os_process(Lang), + OsResults = try proc_prompt(Proc, [<<"reduce">>, OsRedSrcs, KVs]) of [true, Reductions] -> Reductions after - ok = ret_os_process(Lang, Pid) + ok = ret_os_process(Proc) end, {ok, OsResults}. @@ -151,7 +158,7 @@ builtin_sum_rows(KVs) -> end, 0, KVs). validate_doc_update(Lang, FunSrc, EditDoc, DiskDoc, Ctx) -> - Pid = get_os_process(Lang), + Proc = get_os_process(Lang), JsonEditDoc = couch_doc:to_json_obj(EditDoc, [revs]), JsonDiskDoc = if DiskDoc == nil -> @@ -159,7 +166,7 @@ validate_doc_update(Lang, FunSrc, EditDoc, DiskDoc, Ctx) -> true -> couch_doc:to_json_obj(DiskDoc, [revs]) end, - try couch_os_process:prompt(Pid, + try proc_prompt(Proc, [<<"validate">>, FunSrc, JsonEditDoc, JsonDiskDoc, Ctx]) of 1 -> ok; @@ -168,14 +175,14 @@ validate_doc_update(Lang, FunSrc, EditDoc, DiskDoc, Ctx) -> {[{<<"unauthorized">>, Message}]} -> throw({unauthorized, Message}) after - ok = ret_os_process(Lang, Pid) + ok = ret_os_process(Proc) end. % todo use json_apply_field append_docid(DocId, JsonReqIn) -> [{<<"docId">>, DocId} | JsonReqIn]. render_doc_show(Lang, ShowSrc, DocId, Doc, Req, Db) -> - Pid = get_os_process(Lang), + Proc = get_os_process(Lang), {JsonReqIn} = couch_httpd_external:json_req_obj(Req, Db), {JsonReq, JsonDoc} = case {DocId, Doc} of @@ -183,16 +190,16 @@ render_doc_show(Lang, ShowSrc, DocId, Doc, Req, Db) -> {DocId, nil} -> {{append_docid(DocId, JsonReqIn)}, null}; _ -> {{append_docid(DocId, JsonReqIn)}, couch_doc:to_json_obj(Doc, [revs])} end, - try couch_os_process:prompt(Pid, + try proc_prompt(Proc, [<<"show">>, ShowSrc, JsonDoc, JsonReq]) of FormResp -> FormResp after - ok = ret_os_process(Lang, Pid) + ok = ret_os_process(Proc) end. render_doc_update(Lang, UpdateSrc, DocId, Doc, Req, Db) -> - Pid = get_os_process(Lang), + Proc = get_os_process(Lang), {JsonReqIn} = couch_httpd_external:json_req_obj(Req, Db), {JsonReq, JsonDoc} = case {DocId, Doc} of @@ -200,51 +207,51 @@ render_doc_update(Lang, UpdateSrc, DocId, Doc, Req, Db) -> {DocId, nil} -> {{append_docid(DocId, JsonReqIn)}, null}; _ -> {{append_docid(DocId, JsonReqIn)}, couch_doc:to_json_obj(Doc, [revs])} end, - try couch_os_process:prompt(Pid, + try proc_prompt(Proc, [<<"update">>, UpdateSrc, JsonDoc, JsonReq]) of FormResp -> FormResp after - ok = ret_os_process(Lang, Pid) + ok = ret_os_process(Proc) end. start_view_list(Lang, ListSrc) -> - Pid = get_os_process(Lang), - true = couch_os_process:prompt(Pid, [<<"add_fun">>, ListSrc]), - {ok, {Lang, Pid}}. + Proc = get_os_process(Lang), + proc_prompt(Proc, [<<"add_fun">>, ListSrc]), + {ok, Proc}. -render_list_head({_Lang, Pid}, Req, Db, Head) -> +render_list_head(Proc, Req, Db, Head) -> JsonReq = couch_httpd_external:json_req_obj(Req, Db), - couch_os_process:prompt(Pid, [<<"list">>, Head, JsonReq]). + proc_prompt(Proc, [<<"list">>, Head, JsonReq]). -render_list_row({_Lang, Pid}, Db, {{Key, DocId}, Value}, IncludeDoc) -> +render_list_row(Proc, Db, {{Key, DocId}, Value}, IncludeDoc) -> JsonRow = couch_httpd_view:view_row_obj(Db, {{Key, DocId}, Value}, IncludeDoc), - couch_os_process:prompt(Pid, [<<"list_row">>, JsonRow]); + proc_prompt(Proc, [<<"list_row">>, JsonRow]); -render_list_row({_Lang, Pid}, _, {Key, Value}, _IncludeDoc) -> +render_list_row(Proc, _, {Key, Value}, _IncludeDoc) -> JsonRow = {[{key, Key}, {value, Value}]}, - couch_os_process:prompt(Pid, [<<"list_row">>, JsonRow]). + proc_prompt(Proc, [<<"list_row">>, JsonRow]). -render_list_tail({Lang, Pid}) -> - JsonResp = couch_os_process:prompt(Pid, [<<"list_end">>]), - ok = ret_os_process(Lang, Pid), +render_list_tail(Proc) -> + JsonResp = proc_prompt(Proc, [<<"list_end">>]), + ok = ret_os_process(Proc), JsonResp. start_filter(Lang, FilterSrc) -> - Pid = get_os_process(Lang), - true = couch_os_process:prompt(Pid, [<<"add_fun">>, FilterSrc]), - {ok, {Lang, Pid}}. + Proc = get_os_process(Lang), + true = proc_prompt(Proc, [<<"add_fun">>, FilterSrc]), + {ok, Proc}. -filter_doc({_Lang, Pid}, Doc, Req, Db) -> +filter_doc(Proc, Doc, Req, Db) -> JsonReq = couch_httpd_external:json_req_obj(Req, Db), JsonDoc = couch_doc:to_json_obj(Doc, [revs]), JsonCtx = couch_util:json_user_ctx(Db), - [true, [Pass]] = couch_os_process:prompt(Pid, + [true, [Pass]] = proc_prompt(Proc, [<<"filter">>, [JsonDoc], JsonReq, JsonCtx]), {ok, Pass}. -end_filter({Lang, Pid}) -> - ok = ret_os_process(Lang, Pid). +end_filter(Proc) -> + ok = ret_os_process(Proc). init([]) -> @@ -258,58 +265,74 @@ init([]) -> fun("query_servers" ++ _, _) -> ?MODULE:stop() end), + ok = couch_config:register( + fun("native_query_servers" ++ _, _) -> + ?MODULE:stop() + end), Langs = ets:new(couch_query_server_langs, [set, private]), - PidLangs = ets:new(couch_query_server_pid_langs, [set, private]), - Pids = ets:new(couch_query_server_procs, [set, private]), + PidProcs = ets:new(couch_query_server_pid_langs, [set, private]), + LangProcs = ets:new(couch_query_server_procs, [set, private]), InUse = ets:new(couch_query_server_used, [set, private]), + % 'query_servers' specifies an OS command-line to execute. lists:foreach(fun({Lang, Command}) -> - true = ets:insert(Langs, {?l2b(Lang), Command}) + true = ets:insert(Langs, {?l2b(Lang), + couch_os_process, start_link, [Command]}) end, couch_config:get("query_servers")), + % 'native_query_servers' specifies a {Module, Func, Arg} tuple. + lists:foreach(fun({Lang, SpecStr}) -> + {ok, {Mod, Fun, SpecArg}} = couch_util:parse_term(SpecStr), + true = ets:insert(Langs, {?l2b(Lang), + Mod, Fun, SpecArg}) + end, couch_config:get("native_query_servers")), process_flag(trap_exit, true), - {ok, {Langs, PidLangs, Pids, InUse}}. + {ok, {Langs, % Keyed by language name, value is {Mod,Func,Arg} + PidProcs, % Keyed by PID, valus is a #proc record. + LangProcs, % Keyed by language name, value is a #proc record + InUse % Keyed by PID, value is #proc record. + }}. terminate(_Reason, _Server) -> ok. -handle_call({get_proc, Lang}, _From, {Langs, PidLangs, Pids, InUse}=Server) -> +handle_call({get_proc, Lang}, _From, {Langs, PidProcs, LangProcs, InUse}=Server) -> % Note to future self. Add max process limit. - case ets:lookup(Pids, Lang) of - [{Lang, [Pid|_]}] -> - add_value(PidLangs, Pid, Lang), - rem_from_list(Pids, Lang, Pid), - add_to_list(InUse, Lang, Pid), - {reply, {recycled, Pid, get_query_server_config()}, Server}; + case ets:lookup(LangProcs, Lang) of + [{Lang, [Proc|_]}] -> + add_value(PidProcs, Proc#proc.pid, Proc), + rem_from_list(LangProcs, Lang, Proc), + add_to_list(InUse, Lang, Proc), + {reply, {recycled, Proc, get_query_server_config()}, Server}; _ -> case (catch new_process(Langs, Lang)) of - {ok, Pid} -> - add_to_list(InUse, Lang, Pid), - {reply, {new, Pid}, Server}; + {ok, Proc} -> + add_to_list(InUse, Lang, Proc), + {reply, {new, Proc}, Server}; Error -> {reply, Error, Server} end end; -handle_call({ret_proc, Lang, Pid}, _From, {_, _, Pids, InUse}=Server) -> +handle_call({ret_proc, Proc}, _From, {_, _, LangProcs, InUse}=Server) -> % Along with max process limit, here we should check % if we're over the limit and discard when we are. - add_to_list(Pids, Lang, Pid), - rem_from_list(InUse, Lang, Pid), + add_to_list(LangProcs, Proc#proc.lang, Proc), + rem_from_list(InUse, Proc#proc.lang, Proc), {reply, true, Server}. handle_cast(_Whatever, Server) -> {noreply, Server}. -handle_info({'EXIT', Pid, Status}, {_, PidLangs, Pids, InUse}=Server) -> - case ets:lookup(PidLangs, Pid) of - [{Pid, Lang}] -> +handle_info({'EXIT', Pid, Status}, {_, PidProcs, LangProcs, InUse}=Server) -> + case ets:lookup(PidProcs, Pid) of + [{Pid, Proc}] -> case Status of normal -> ok; _ -> ?LOG_DEBUG("Linked process died abnormally: ~p (reason: ~p)", [Pid, Status]) end, - rem_value(PidLangs, Pid), - catch rem_from_list(Pids, Lang, Pid), - catch rem_from_list(InUse, Lang, Pid), + rem_value(PidProcs, Pid), + catch rem_from_list(LangProcs, Proc#proc.lang, Proc), + catch rem_from_list(InUse, Proc#proc.lang, Proc), {noreply, Server}; [] -> ?LOG_DEBUG("Unknown linked process died: ~p (reason: ~p)", [Pid, Status]), @@ -328,37 +351,55 @@ get_query_server_config() -> new_process(Langs, Lang) -> case ets:lookup(Langs, Lang) of - [{Lang, Command}] -> - couch_os_process:start_link(Command); + [{Lang, Mod, Func, Arg}] -> + {ok, Pid} = apply(Mod, Func, Arg), + {ok, #proc{lang=Lang, + pid=Pid, + % Called via proc_prompt, proc_set_timeout, and proc_stop + prompt_fun={Mod, prompt}, + set_timeout_fun={Mod, set_timeout}, + stop_fun={Mod, stop}}}; _ -> {unknown_query_language, Lang} end. +proc_prompt(Proc, Args) -> + {Mod, Func} = Proc#proc.prompt_fun, + apply(Mod, Func, [Proc#proc.pid, Args]). + +proc_stop(Proc) -> + {Mod, Func} = Proc#proc.stop_fun, + apply(Mod, Func, [Proc#proc.pid]). + +proc_set_timeout(Proc, Timeout) -> + {Mod, Func} = Proc#proc.set_timeout_fun, + apply(Mod, Func, [Proc#proc.pid, Timeout]). + get_os_process(Lang) -> case gen_server:call(couch_query_servers, {get_proc, Lang}) of - {new, Pid} -> - couch_os_process:set_timeout(Pid, list_to_integer(couch_config:get( - "couchdb", "os_process_timeout", "5000"))), - link(Pid), - Pid; - {recycled, Pid, QueryConfig} -> - case (catch couch_os_process:prompt(Pid, [<<"reset">>, QueryConfig])) of + {new, Proc} -> + proc_set_timeout(Proc, list_to_integer(couch_config:get( + "couchdb", "os_process_timeout", "5000"))), + link(Proc#proc.pid), + Proc; + {recycled, Proc, QueryConfig} -> + case (catch proc_prompt(Proc, [<<"reset">>, QueryConfig])) of true -> - couch_os_process:set_timeout(Pid, list_to_integer(couch_config:get( - "couchdb", "os_process_timeout", "5000"))), - link(Pid), - Pid; + proc_set_timeout(Proc, list_to_integer(couch_config:get( + "couchdb", "os_process_timeout", "5000"))), + link(Proc#proc.pid), + Proc; _ -> - catch couch_os_process:stop(Pid), + catch proc_stop(Proc), get_os_process(Lang) end; Error -> throw(Error) end. -ret_os_process(Lang, Pid) -> - true = gen_server:call(couch_query_servers, {ret_proc, Lang, Pid}), - catch unlink(Pid), +ret_os_process(Proc) -> + true = gen_server:call(couch_query_servers, {ret_proc, Proc}), + catch unlink(Proc#proc.pid), ok. add_value(Tid, Key, Value) -> |