diff options
author | John Christopher Anderson <jchris@apache.org> | 2009-08-12 19:58:14 +0000 |
---|---|---|
committer | John Christopher Anderson <jchris@apache.org> | 2009-08-12 19:58:14 +0000 |
commit | e10858cd53e9b2a1cf769d7d71e4c1adcf30b4f3 (patch) | |
tree | 3f2ed7cdde56caf6559b6402b0861e3249ca54f4 /src/couchdb/couch_query_servers.erl | |
parent | d6cb0bc17d834675a69620940036490b909a4b0d (diff) |
Introduces native Erlang query servers. Closes COUCHDB-377
Thanks Mark Hammond and Paul Davis for doing most of the work, and Michael McDaniel for the inspiration.
There is still room for improvement on the APIs exposed to the Erlang views, as well as likely a whole lot of work to be done to increase parallelism. But the important part now is that we have native Erlang views.
git-svn-id: https://svn.apache.org/repos/asf/couchdb/trunk@803685 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'src/couchdb/couch_query_servers.erl')
-rw-r--r-- | src/couchdb/couch_query_servers.erl | 205 |
1 files changed, 123 insertions, 82 deletions
diff --git a/src/couchdb/couch_query_servers.erl b/src/couchdb/couch_query_servers.erl index fca7c85a..f94cc28b 100644 --- a/src/couchdb/couch_query_servers.erl +++ b/src/couchdb/couch_query_servers.erl @@ -25,6 +25,14 @@ -include("couch_db.hrl"). +-record(proc, { + pid, + lang, + prompt_fun, + set_timeout_fun, + stop_fun +}). + start_link() -> gen_server:start_link({local, couch_query_servers}, couch_query_servers, [], []). @@ -32,19 +40,19 @@ stop() -> exit(whereis(couch_query_servers), close). start_doc_map(Lang, Functions) -> - Pid = get_os_process(Lang), + Proc = get_os_process(Lang), lists:foreach(fun(FunctionSource) -> - true = couch_os_process:prompt(Pid, [<<"add_fun">>, FunctionSource]) + true = proc_prompt(Proc, [<<"add_fun">>, FunctionSource]) end, Functions), - {ok, {Lang, Pid}}. + {ok, Proc}. -map_docs({_Lang, Pid}, Docs) -> +map_docs(Proc, Docs) -> % send the documents Results = lists:map( fun(Doc) -> Json = couch_doc:to_json_obj(Doc, []), - FunsResults = couch_os_process:prompt(Pid, [<<"map_doc">>, Json]), + FunsResults = proc_prompt(Proc, [<<"map_doc">>, Json]), % the results are a json array of function map yields like this: % [FunResults1, FunResults2 ...] % where funresults is are json arrays of key value pairs: @@ -63,8 +71,8 @@ map_docs({_Lang, Pid}, Docs) -> stop_doc_map(nil) -> ok; -stop_doc_map({Lang, Pid}) -> - ok = ret_os_process(Lang, Pid). +stop_doc_map(Proc) -> + ok = ret_os_process(Proc). group_reductions_results([]) -> []; @@ -83,7 +91,7 @@ group_reductions_results(List) -> rereduce(_Lang, [], _ReducedValues) -> {ok, []}; rereduce(Lang, RedSrcs, ReducedValues) -> - Pid = get_os_process(Lang), + Proc = get_os_process(Lang), Grouped = group_reductions_results(ReducedValues), Results = try lists:zipwith( fun @@ -92,11 +100,11 @@ rereduce(Lang, RedSrcs, ReducedValues) -> Result; (FunSrc, Values) -> [true, [Result]] = - couch_os_process:prompt(Pid, [<<"rereduce">>, [FunSrc], Values]), + proc_prompt(Proc, [<<"rereduce">>, [FunSrc], Values]), Result end, RedSrcs, Grouped) after - ok = ret_os_process(Lang, Pid) + ok = ret_os_process(Proc) end, {ok, Results}. @@ -121,12 +129,11 @@ recombine_reduce_results([_OsFun|RedSrcs], [OsR|OsResults], BuiltinResults, Acc) os_reduce(_Lang, [], _KVs) -> {ok, []}; os_reduce(Lang, OsRedSrcs, KVs) -> - Pid = get_os_process(Lang), - OsResults = try couch_os_process:prompt(Pid, - [<<"reduce">>, OsRedSrcs, KVs]) of + Proc = get_os_process(Lang), + OsResults = try proc_prompt(Proc, [<<"reduce">>, OsRedSrcs, KVs]) of [true, Reductions] -> Reductions after - ok = ret_os_process(Lang, Pid) + ok = ret_os_process(Proc) end, {ok, OsResults}. @@ -151,7 +158,7 @@ builtin_sum_rows(KVs) -> end, 0, KVs). validate_doc_update(Lang, FunSrc, EditDoc, DiskDoc, Ctx) -> - Pid = get_os_process(Lang), + Proc = get_os_process(Lang), JsonEditDoc = couch_doc:to_json_obj(EditDoc, [revs]), JsonDiskDoc = if DiskDoc == nil -> @@ -159,7 +166,7 @@ validate_doc_update(Lang, FunSrc, EditDoc, DiskDoc, Ctx) -> true -> couch_doc:to_json_obj(DiskDoc, [revs]) end, - try couch_os_process:prompt(Pid, + try proc_prompt(Proc, [<<"validate">>, FunSrc, JsonEditDoc, JsonDiskDoc, Ctx]) of 1 -> ok; @@ -168,14 +175,14 @@ validate_doc_update(Lang, FunSrc, EditDoc, DiskDoc, Ctx) -> {[{<<"unauthorized">>, Message}]} -> throw({unauthorized, Message}) after - ok = ret_os_process(Lang, Pid) + ok = ret_os_process(Proc) end. % todo use json_apply_field append_docid(DocId, JsonReqIn) -> [{<<"docId">>, DocId} | JsonReqIn]. render_doc_show(Lang, ShowSrc, DocId, Doc, Req, Db) -> - Pid = get_os_process(Lang), + Proc = get_os_process(Lang), {JsonReqIn} = couch_httpd_external:json_req_obj(Req, Db), {JsonReq, JsonDoc} = case {DocId, Doc} of @@ -183,16 +190,16 @@ render_doc_show(Lang, ShowSrc, DocId, Doc, Req, Db) -> {DocId, nil} -> {{append_docid(DocId, JsonReqIn)}, null}; _ -> {{append_docid(DocId, JsonReqIn)}, couch_doc:to_json_obj(Doc, [revs])} end, - try couch_os_process:prompt(Pid, + try proc_prompt(Proc, [<<"show">>, ShowSrc, JsonDoc, JsonReq]) of FormResp -> FormResp after - ok = ret_os_process(Lang, Pid) + ok = ret_os_process(Proc) end. render_doc_update(Lang, UpdateSrc, DocId, Doc, Req, Db) -> - Pid = get_os_process(Lang), + Proc = get_os_process(Lang), {JsonReqIn} = couch_httpd_external:json_req_obj(Req, Db), {JsonReq, JsonDoc} = case {DocId, Doc} of @@ -200,51 +207,51 @@ render_doc_update(Lang, UpdateSrc, DocId, Doc, Req, Db) -> {DocId, nil} -> {{append_docid(DocId, JsonReqIn)}, null}; _ -> {{append_docid(DocId, JsonReqIn)}, couch_doc:to_json_obj(Doc, [revs])} end, - try couch_os_process:prompt(Pid, + try proc_prompt(Proc, [<<"update">>, UpdateSrc, JsonDoc, JsonReq]) of FormResp -> FormResp after - ok = ret_os_process(Lang, Pid) + ok = ret_os_process(Proc) end. start_view_list(Lang, ListSrc) -> - Pid = get_os_process(Lang), - true = couch_os_process:prompt(Pid, [<<"add_fun">>, ListSrc]), - {ok, {Lang, Pid}}. + Proc = get_os_process(Lang), + proc_prompt(Proc, [<<"add_fun">>, ListSrc]), + {ok, Proc}. -render_list_head({_Lang, Pid}, Req, Db, Head) -> +render_list_head(Proc, Req, Db, Head) -> JsonReq = couch_httpd_external:json_req_obj(Req, Db), - couch_os_process:prompt(Pid, [<<"list">>, Head, JsonReq]). + proc_prompt(Proc, [<<"list">>, Head, JsonReq]). -render_list_row({_Lang, Pid}, Db, {{Key, DocId}, Value}, IncludeDoc) -> +render_list_row(Proc, Db, {{Key, DocId}, Value}, IncludeDoc) -> JsonRow = couch_httpd_view:view_row_obj(Db, {{Key, DocId}, Value}, IncludeDoc), - couch_os_process:prompt(Pid, [<<"list_row">>, JsonRow]); + proc_prompt(Proc, [<<"list_row">>, JsonRow]); -render_list_row({_Lang, Pid}, _, {Key, Value}, _IncludeDoc) -> +render_list_row(Proc, _, {Key, Value}, _IncludeDoc) -> JsonRow = {[{key, Key}, {value, Value}]}, - couch_os_process:prompt(Pid, [<<"list_row">>, JsonRow]). + proc_prompt(Proc, [<<"list_row">>, JsonRow]). -render_list_tail({Lang, Pid}) -> - JsonResp = couch_os_process:prompt(Pid, [<<"list_end">>]), - ok = ret_os_process(Lang, Pid), +render_list_tail(Proc) -> + JsonResp = proc_prompt(Proc, [<<"list_end">>]), + ok = ret_os_process(Proc), JsonResp. start_filter(Lang, FilterSrc) -> - Pid = get_os_process(Lang), - true = couch_os_process:prompt(Pid, [<<"add_fun">>, FilterSrc]), - {ok, {Lang, Pid}}. + Proc = get_os_process(Lang), + true = proc_prompt(Proc, [<<"add_fun">>, FilterSrc]), + {ok, Proc}. -filter_doc({_Lang, Pid}, Doc, Req, Db) -> +filter_doc(Proc, Doc, Req, Db) -> JsonReq = couch_httpd_external:json_req_obj(Req, Db), JsonDoc = couch_doc:to_json_obj(Doc, [revs]), JsonCtx = couch_util:json_user_ctx(Db), - [true, [Pass]] = couch_os_process:prompt(Pid, + [true, [Pass]] = proc_prompt(Proc, [<<"filter">>, [JsonDoc], JsonReq, JsonCtx]), {ok, Pass}. -end_filter({Lang, Pid}) -> - ok = ret_os_process(Lang, Pid). +end_filter(Proc) -> + ok = ret_os_process(Proc). init([]) -> @@ -258,58 +265,74 @@ init([]) -> fun("query_servers" ++ _, _) -> ?MODULE:stop() end), + ok = couch_config:register( + fun("native_query_servers" ++ _, _) -> + ?MODULE:stop() + end), Langs = ets:new(couch_query_server_langs, [set, private]), - PidLangs = ets:new(couch_query_server_pid_langs, [set, private]), - Pids = ets:new(couch_query_server_procs, [set, private]), + PidProcs = ets:new(couch_query_server_pid_langs, [set, private]), + LangProcs = ets:new(couch_query_server_procs, [set, private]), InUse = ets:new(couch_query_server_used, [set, private]), + % 'query_servers' specifies an OS command-line to execute. lists:foreach(fun({Lang, Command}) -> - true = ets:insert(Langs, {?l2b(Lang), Command}) + true = ets:insert(Langs, {?l2b(Lang), + couch_os_process, start_link, [Command]}) end, couch_config:get("query_servers")), + % 'native_query_servers' specifies a {Module, Func, Arg} tuple. + lists:foreach(fun({Lang, SpecStr}) -> + {ok, {Mod, Fun, SpecArg}} = couch_util:parse_term(SpecStr), + true = ets:insert(Langs, {?l2b(Lang), + Mod, Fun, SpecArg}) + end, couch_config:get("native_query_servers")), process_flag(trap_exit, true), - {ok, {Langs, PidLangs, Pids, InUse}}. + {ok, {Langs, % Keyed by language name, value is {Mod,Func,Arg} + PidProcs, % Keyed by PID, valus is a #proc record. + LangProcs, % Keyed by language name, value is a #proc record + InUse % Keyed by PID, value is #proc record. + }}. terminate(_Reason, _Server) -> ok. -handle_call({get_proc, Lang}, _From, {Langs, PidLangs, Pids, InUse}=Server) -> +handle_call({get_proc, Lang}, _From, {Langs, PidProcs, LangProcs, InUse}=Server) -> % Note to future self. Add max process limit. - case ets:lookup(Pids, Lang) of - [{Lang, [Pid|_]}] -> - add_value(PidLangs, Pid, Lang), - rem_from_list(Pids, Lang, Pid), - add_to_list(InUse, Lang, Pid), - {reply, {recycled, Pid, get_query_server_config()}, Server}; + case ets:lookup(LangProcs, Lang) of + [{Lang, [Proc|_]}] -> + add_value(PidProcs, Proc#proc.pid, Proc), + rem_from_list(LangProcs, Lang, Proc), + add_to_list(InUse, Lang, Proc), + {reply, {recycled, Proc, get_query_server_config()}, Server}; _ -> case (catch new_process(Langs, Lang)) of - {ok, Pid} -> - add_to_list(InUse, Lang, Pid), - {reply, {new, Pid}, Server}; + {ok, Proc} -> + add_to_list(InUse, Lang, Proc), + {reply, {new, Proc}, Server}; Error -> {reply, Error, Server} end end; -handle_call({ret_proc, Lang, Pid}, _From, {_, _, Pids, InUse}=Server) -> +handle_call({ret_proc, Proc}, _From, {_, _, LangProcs, InUse}=Server) -> % Along with max process limit, here we should check % if we're over the limit and discard when we are. - add_to_list(Pids, Lang, Pid), - rem_from_list(InUse, Lang, Pid), + add_to_list(LangProcs, Proc#proc.lang, Proc), + rem_from_list(InUse, Proc#proc.lang, Proc), {reply, true, Server}. handle_cast(_Whatever, Server) -> {noreply, Server}. -handle_info({'EXIT', Pid, Status}, {_, PidLangs, Pids, InUse}=Server) -> - case ets:lookup(PidLangs, Pid) of - [{Pid, Lang}] -> +handle_info({'EXIT', Pid, Status}, {_, PidProcs, LangProcs, InUse}=Server) -> + case ets:lookup(PidProcs, Pid) of + [{Pid, Proc}] -> case Status of normal -> ok; _ -> ?LOG_DEBUG("Linked process died abnormally: ~p (reason: ~p)", [Pid, Status]) end, - rem_value(PidLangs, Pid), - catch rem_from_list(Pids, Lang, Pid), - catch rem_from_list(InUse, Lang, Pid), + rem_value(PidProcs, Pid), + catch rem_from_list(LangProcs, Proc#proc.lang, Proc), + catch rem_from_list(InUse, Proc#proc.lang, Proc), {noreply, Server}; [] -> ?LOG_DEBUG("Unknown linked process died: ~p (reason: ~p)", [Pid, Status]), @@ -328,37 +351,55 @@ get_query_server_config() -> new_process(Langs, Lang) -> case ets:lookup(Langs, Lang) of - [{Lang, Command}] -> - couch_os_process:start_link(Command); + [{Lang, Mod, Func, Arg}] -> + {ok, Pid} = apply(Mod, Func, Arg), + {ok, #proc{lang=Lang, + pid=Pid, + % Called via proc_prompt, proc_set_timeout, and proc_stop + prompt_fun={Mod, prompt}, + set_timeout_fun={Mod, set_timeout}, + stop_fun={Mod, stop}}}; _ -> {unknown_query_language, Lang} end. +proc_prompt(Proc, Args) -> + {Mod, Func} = Proc#proc.prompt_fun, + apply(Mod, Func, [Proc#proc.pid, Args]). + +proc_stop(Proc) -> + {Mod, Func} = Proc#proc.stop_fun, + apply(Mod, Func, [Proc#proc.pid]). + +proc_set_timeout(Proc, Timeout) -> + {Mod, Func} = Proc#proc.set_timeout_fun, + apply(Mod, Func, [Proc#proc.pid, Timeout]). + get_os_process(Lang) -> case gen_server:call(couch_query_servers, {get_proc, Lang}) of - {new, Pid} -> - couch_os_process:set_timeout(Pid, list_to_integer(couch_config:get( - "couchdb", "os_process_timeout", "5000"))), - link(Pid), - Pid; - {recycled, Pid, QueryConfig} -> - case (catch couch_os_process:prompt(Pid, [<<"reset">>, QueryConfig])) of + {new, Proc} -> + proc_set_timeout(Proc, list_to_integer(couch_config:get( + "couchdb", "os_process_timeout", "5000"))), + link(Proc#proc.pid), + Proc; + {recycled, Proc, QueryConfig} -> + case (catch proc_prompt(Proc, [<<"reset">>, QueryConfig])) of true -> - couch_os_process:set_timeout(Pid, list_to_integer(couch_config:get( - "couchdb", "os_process_timeout", "5000"))), - link(Pid), - Pid; + proc_set_timeout(Proc, list_to_integer(couch_config:get( + "couchdb", "os_process_timeout", "5000"))), + link(Proc#proc.pid), + Proc; _ -> - catch couch_os_process:stop(Pid), + catch proc_stop(Proc), get_os_process(Lang) end; Error -> throw(Error) end. -ret_os_process(Lang, Pid) -> - true = gen_server:call(couch_query_servers, {ret_proc, Lang, Pid}), - catch unlink(Pid), +ret_os_process(Proc) -> + true = gen_server:call(couch_query_servers, {ret_proc, Proc}), + catch unlink(Proc#proc.pid), ok. add_value(Tid, Key, Value) -> |