summaryrefslogtreecommitdiff
path: root/src/couchdb/couch_query_servers.erl
diff options
context:
space:
mode:
Diffstat (limited to 'src/couchdb/couch_query_servers.erl')
-rw-r--r--src/couchdb/couch_query_servers.erl205
1 files changed, 123 insertions, 82 deletions
diff --git a/src/couchdb/couch_query_servers.erl b/src/couchdb/couch_query_servers.erl
index fca7c85a..f94cc28b 100644
--- a/src/couchdb/couch_query_servers.erl
+++ b/src/couchdb/couch_query_servers.erl
@@ -25,6 +25,14 @@
-include("couch_db.hrl").
+-record(proc, {
+ pid,
+ lang,
+ prompt_fun,
+ set_timeout_fun,
+ stop_fun
+}).
+
start_link() ->
gen_server:start_link({local, couch_query_servers}, couch_query_servers, [], []).
@@ -32,19 +40,19 @@ stop() ->
exit(whereis(couch_query_servers), close).
start_doc_map(Lang, Functions) ->
- Pid = get_os_process(Lang),
+ Proc = get_os_process(Lang),
lists:foreach(fun(FunctionSource) ->
- true = couch_os_process:prompt(Pid, [<<"add_fun">>, FunctionSource])
+ true = proc_prompt(Proc, [<<"add_fun">>, FunctionSource])
end, Functions),
- {ok, {Lang, Pid}}.
+ {ok, Proc}.
-map_docs({_Lang, Pid}, Docs) ->
+map_docs(Proc, Docs) ->
% send the documents
Results = lists:map(
fun(Doc) ->
Json = couch_doc:to_json_obj(Doc, []),
- FunsResults = couch_os_process:prompt(Pid, [<<"map_doc">>, Json]),
+ FunsResults = proc_prompt(Proc, [<<"map_doc">>, Json]),
% the results are a json array of function map yields like this:
% [FunResults1, FunResults2 ...]
% where funresults is are json arrays of key value pairs:
@@ -63,8 +71,8 @@ map_docs({_Lang, Pid}, Docs) ->
stop_doc_map(nil) ->
ok;
-stop_doc_map({Lang, Pid}) ->
- ok = ret_os_process(Lang, Pid).
+stop_doc_map(Proc) ->
+ ok = ret_os_process(Proc).
group_reductions_results([]) ->
[];
@@ -83,7 +91,7 @@ group_reductions_results(List) ->
rereduce(_Lang, [], _ReducedValues) ->
{ok, []};
rereduce(Lang, RedSrcs, ReducedValues) ->
- Pid = get_os_process(Lang),
+ Proc = get_os_process(Lang),
Grouped = group_reductions_results(ReducedValues),
Results = try lists:zipwith(
fun
@@ -92,11 +100,11 @@ rereduce(Lang, RedSrcs, ReducedValues) ->
Result;
(FunSrc, Values) ->
[true, [Result]] =
- couch_os_process:prompt(Pid, [<<"rereduce">>, [FunSrc], Values]),
+ proc_prompt(Proc, [<<"rereduce">>, [FunSrc], Values]),
Result
end, RedSrcs, Grouped)
after
- ok = ret_os_process(Lang, Pid)
+ ok = ret_os_process(Proc)
end,
{ok, Results}.
@@ -121,12 +129,11 @@ recombine_reduce_results([_OsFun|RedSrcs], [OsR|OsResults], BuiltinResults, Acc)
os_reduce(_Lang, [], _KVs) ->
{ok, []};
os_reduce(Lang, OsRedSrcs, KVs) ->
- Pid = get_os_process(Lang),
- OsResults = try couch_os_process:prompt(Pid,
- [<<"reduce">>, OsRedSrcs, KVs]) of
+ Proc = get_os_process(Lang),
+ OsResults = try proc_prompt(Proc, [<<"reduce">>, OsRedSrcs, KVs]) of
[true, Reductions] -> Reductions
after
- ok = ret_os_process(Lang, Pid)
+ ok = ret_os_process(Proc)
end,
{ok, OsResults}.
@@ -151,7 +158,7 @@ builtin_sum_rows(KVs) ->
end, 0, KVs).
validate_doc_update(Lang, FunSrc, EditDoc, DiskDoc, Ctx) ->
- Pid = get_os_process(Lang),
+ Proc = get_os_process(Lang),
JsonEditDoc = couch_doc:to_json_obj(EditDoc, [revs]),
JsonDiskDoc =
if DiskDoc == nil ->
@@ -159,7 +166,7 @@ validate_doc_update(Lang, FunSrc, EditDoc, DiskDoc, Ctx) ->
true ->
couch_doc:to_json_obj(DiskDoc, [revs])
end,
- try couch_os_process:prompt(Pid,
+ try proc_prompt(Proc,
[<<"validate">>, FunSrc, JsonEditDoc, JsonDiskDoc, Ctx]) of
1 ->
ok;
@@ -168,14 +175,14 @@ validate_doc_update(Lang, FunSrc, EditDoc, DiskDoc, Ctx) ->
{[{<<"unauthorized">>, Message}]} ->
throw({unauthorized, Message})
after
- ok = ret_os_process(Lang, Pid)
+ ok = ret_os_process(Proc)
end.
% todo use json_apply_field
append_docid(DocId, JsonReqIn) ->
[{<<"docId">>, DocId} | JsonReqIn].
render_doc_show(Lang, ShowSrc, DocId, Doc, Req, Db) ->
- Pid = get_os_process(Lang),
+ Proc = get_os_process(Lang),
{JsonReqIn} = couch_httpd_external:json_req_obj(Req, Db),
{JsonReq, JsonDoc} = case {DocId, Doc} of
@@ -183,16 +190,16 @@ render_doc_show(Lang, ShowSrc, DocId, Doc, Req, Db) ->
{DocId, nil} -> {{append_docid(DocId, JsonReqIn)}, null};
_ -> {{append_docid(DocId, JsonReqIn)}, couch_doc:to_json_obj(Doc, [revs])}
end,
- try couch_os_process:prompt(Pid,
+ try proc_prompt(Proc,
[<<"show">>, ShowSrc, JsonDoc, JsonReq]) of
FormResp ->
FormResp
after
- ok = ret_os_process(Lang, Pid)
+ ok = ret_os_process(Proc)
end.
render_doc_update(Lang, UpdateSrc, DocId, Doc, Req, Db) ->
- Pid = get_os_process(Lang),
+ Proc = get_os_process(Lang),
{JsonReqIn} = couch_httpd_external:json_req_obj(Req, Db),
{JsonReq, JsonDoc} = case {DocId, Doc} of
@@ -200,51 +207,51 @@ render_doc_update(Lang, UpdateSrc, DocId, Doc, Req, Db) ->
{DocId, nil} -> {{append_docid(DocId, JsonReqIn)}, null};
_ -> {{append_docid(DocId, JsonReqIn)}, couch_doc:to_json_obj(Doc, [revs])}
end,
- try couch_os_process:prompt(Pid,
+ try proc_prompt(Proc,
[<<"update">>, UpdateSrc, JsonDoc, JsonReq]) of
FormResp ->
FormResp
after
- ok = ret_os_process(Lang, Pid)
+ ok = ret_os_process(Proc)
end.
start_view_list(Lang, ListSrc) ->
- Pid = get_os_process(Lang),
- true = couch_os_process:prompt(Pid, [<<"add_fun">>, ListSrc]),
- {ok, {Lang, Pid}}.
+ Proc = get_os_process(Lang),
+ proc_prompt(Proc, [<<"add_fun">>, ListSrc]),
+ {ok, Proc}.
-render_list_head({_Lang, Pid}, Req, Db, Head) ->
+render_list_head(Proc, Req, Db, Head) ->
JsonReq = couch_httpd_external:json_req_obj(Req, Db),
- couch_os_process:prompt(Pid, [<<"list">>, Head, JsonReq]).
+ proc_prompt(Proc, [<<"list">>, Head, JsonReq]).
-render_list_row({_Lang, Pid}, Db, {{Key, DocId}, Value}, IncludeDoc) ->
+render_list_row(Proc, Db, {{Key, DocId}, Value}, IncludeDoc) ->
JsonRow = couch_httpd_view:view_row_obj(Db, {{Key, DocId}, Value}, IncludeDoc),
- couch_os_process:prompt(Pid, [<<"list_row">>, JsonRow]);
+ proc_prompt(Proc, [<<"list_row">>, JsonRow]);
-render_list_row({_Lang, Pid}, _, {Key, Value}, _IncludeDoc) ->
+render_list_row(Proc, _, {Key, Value}, _IncludeDoc) ->
JsonRow = {[{key, Key}, {value, Value}]},
- couch_os_process:prompt(Pid, [<<"list_row">>, JsonRow]).
+ proc_prompt(Proc, [<<"list_row">>, JsonRow]).
-render_list_tail({Lang, Pid}) ->
- JsonResp = couch_os_process:prompt(Pid, [<<"list_end">>]),
- ok = ret_os_process(Lang, Pid),
+render_list_tail(Proc) ->
+ JsonResp = proc_prompt(Proc, [<<"list_end">>]),
+ ok = ret_os_process(Proc),
JsonResp.
start_filter(Lang, FilterSrc) ->
- Pid = get_os_process(Lang),
- true = couch_os_process:prompt(Pid, [<<"add_fun">>, FilterSrc]),
- {ok, {Lang, Pid}}.
+ Proc = get_os_process(Lang),
+ true = proc_prompt(Proc, [<<"add_fun">>, FilterSrc]),
+ {ok, Proc}.
-filter_doc({_Lang, Pid}, Doc, Req, Db) ->
+filter_doc(Proc, Doc, Req, Db) ->
JsonReq = couch_httpd_external:json_req_obj(Req, Db),
JsonDoc = couch_doc:to_json_obj(Doc, [revs]),
JsonCtx = couch_util:json_user_ctx(Db),
- [true, [Pass]] = couch_os_process:prompt(Pid,
+ [true, [Pass]] = proc_prompt(Proc,
[<<"filter">>, [JsonDoc], JsonReq, JsonCtx]),
{ok, Pass}.
-end_filter({Lang, Pid}) ->
- ok = ret_os_process(Lang, Pid).
+end_filter(Proc) ->
+ ok = ret_os_process(Proc).
init([]) ->
@@ -258,58 +265,74 @@ init([]) ->
fun("query_servers" ++ _, _) ->
?MODULE:stop()
end),
+ ok = couch_config:register(
+ fun("native_query_servers" ++ _, _) ->
+ ?MODULE:stop()
+ end),
Langs = ets:new(couch_query_server_langs, [set, private]),
- PidLangs = ets:new(couch_query_server_pid_langs, [set, private]),
- Pids = ets:new(couch_query_server_procs, [set, private]),
+ PidProcs = ets:new(couch_query_server_pid_langs, [set, private]),
+ LangProcs = ets:new(couch_query_server_procs, [set, private]),
InUse = ets:new(couch_query_server_used, [set, private]),
+ % 'query_servers' specifies an OS command-line to execute.
lists:foreach(fun({Lang, Command}) ->
- true = ets:insert(Langs, {?l2b(Lang), Command})
+ true = ets:insert(Langs, {?l2b(Lang),
+ couch_os_process, start_link, [Command]})
end, couch_config:get("query_servers")),
+ % 'native_query_servers' specifies a {Module, Func, Arg} tuple.
+ lists:foreach(fun({Lang, SpecStr}) ->
+ {ok, {Mod, Fun, SpecArg}} = couch_util:parse_term(SpecStr),
+ true = ets:insert(Langs, {?l2b(Lang),
+ Mod, Fun, SpecArg})
+ end, couch_config:get("native_query_servers")),
process_flag(trap_exit, true),
- {ok, {Langs, PidLangs, Pids, InUse}}.
+ {ok, {Langs, % Keyed by language name, value is {Mod,Func,Arg}
+ PidProcs, % Keyed by PID, valus is a #proc record.
+ LangProcs, % Keyed by language name, value is a #proc record
+ InUse % Keyed by PID, value is #proc record.
+ }}.
terminate(_Reason, _Server) ->
ok.
-handle_call({get_proc, Lang}, _From, {Langs, PidLangs, Pids, InUse}=Server) ->
+handle_call({get_proc, Lang}, _From, {Langs, PidProcs, LangProcs, InUse}=Server) ->
% Note to future self. Add max process limit.
- case ets:lookup(Pids, Lang) of
- [{Lang, [Pid|_]}] ->
- add_value(PidLangs, Pid, Lang),
- rem_from_list(Pids, Lang, Pid),
- add_to_list(InUse, Lang, Pid),
- {reply, {recycled, Pid, get_query_server_config()}, Server};
+ case ets:lookup(LangProcs, Lang) of
+ [{Lang, [Proc|_]}] ->
+ add_value(PidProcs, Proc#proc.pid, Proc),
+ rem_from_list(LangProcs, Lang, Proc),
+ add_to_list(InUse, Lang, Proc),
+ {reply, {recycled, Proc, get_query_server_config()}, Server};
_ ->
case (catch new_process(Langs, Lang)) of
- {ok, Pid} ->
- add_to_list(InUse, Lang, Pid),
- {reply, {new, Pid}, Server};
+ {ok, Proc} ->
+ add_to_list(InUse, Lang, Proc),
+ {reply, {new, Proc}, Server};
Error ->
{reply, Error, Server}
end
end;
-handle_call({ret_proc, Lang, Pid}, _From, {_, _, Pids, InUse}=Server) ->
+handle_call({ret_proc, Proc}, _From, {_, _, LangProcs, InUse}=Server) ->
% Along with max process limit, here we should check
% if we're over the limit and discard when we are.
- add_to_list(Pids, Lang, Pid),
- rem_from_list(InUse, Lang, Pid),
+ add_to_list(LangProcs, Proc#proc.lang, Proc),
+ rem_from_list(InUse, Proc#proc.lang, Proc),
{reply, true, Server}.
handle_cast(_Whatever, Server) ->
{noreply, Server}.
-handle_info({'EXIT', Pid, Status}, {_, PidLangs, Pids, InUse}=Server) ->
- case ets:lookup(PidLangs, Pid) of
- [{Pid, Lang}] ->
+handle_info({'EXIT', Pid, Status}, {_, PidProcs, LangProcs, InUse}=Server) ->
+ case ets:lookup(PidProcs, Pid) of
+ [{Pid, Proc}] ->
case Status of
normal -> ok;
_ -> ?LOG_DEBUG("Linked process died abnormally: ~p (reason: ~p)", [Pid, Status])
end,
- rem_value(PidLangs, Pid),
- catch rem_from_list(Pids, Lang, Pid),
- catch rem_from_list(InUse, Lang, Pid),
+ rem_value(PidProcs, Pid),
+ catch rem_from_list(LangProcs, Proc#proc.lang, Proc),
+ catch rem_from_list(InUse, Proc#proc.lang, Proc),
{noreply, Server};
[] ->
?LOG_DEBUG("Unknown linked process died: ~p (reason: ~p)", [Pid, Status]),
@@ -328,37 +351,55 @@ get_query_server_config() ->
new_process(Langs, Lang) ->
case ets:lookup(Langs, Lang) of
- [{Lang, Command}] ->
- couch_os_process:start_link(Command);
+ [{Lang, Mod, Func, Arg}] ->
+ {ok, Pid} = apply(Mod, Func, Arg),
+ {ok, #proc{lang=Lang,
+ pid=Pid,
+ % Called via proc_prompt, proc_set_timeout, and proc_stop
+ prompt_fun={Mod, prompt},
+ set_timeout_fun={Mod, set_timeout},
+ stop_fun={Mod, stop}}};
_ ->
{unknown_query_language, Lang}
end.
+proc_prompt(Proc, Args) ->
+ {Mod, Func} = Proc#proc.prompt_fun,
+ apply(Mod, Func, [Proc#proc.pid, Args]).
+
+proc_stop(Proc) ->
+ {Mod, Func} = Proc#proc.stop_fun,
+ apply(Mod, Func, [Proc#proc.pid]).
+
+proc_set_timeout(Proc, Timeout) ->
+ {Mod, Func} = Proc#proc.set_timeout_fun,
+ apply(Mod, Func, [Proc#proc.pid, Timeout]).
+
get_os_process(Lang) ->
case gen_server:call(couch_query_servers, {get_proc, Lang}) of
- {new, Pid} ->
- couch_os_process:set_timeout(Pid, list_to_integer(couch_config:get(
- "couchdb", "os_process_timeout", "5000"))),
- link(Pid),
- Pid;
- {recycled, Pid, QueryConfig} ->
- case (catch couch_os_process:prompt(Pid, [<<"reset">>, QueryConfig])) of
+ {new, Proc} ->
+ proc_set_timeout(Proc, list_to_integer(couch_config:get(
+ "couchdb", "os_process_timeout", "5000"))),
+ link(Proc#proc.pid),
+ Proc;
+ {recycled, Proc, QueryConfig} ->
+ case (catch proc_prompt(Proc, [<<"reset">>, QueryConfig])) of
true ->
- couch_os_process:set_timeout(Pid, list_to_integer(couch_config:get(
- "couchdb", "os_process_timeout", "5000"))),
- link(Pid),
- Pid;
+ proc_set_timeout(Proc, list_to_integer(couch_config:get(
+ "couchdb", "os_process_timeout", "5000"))),
+ link(Proc#proc.pid),
+ Proc;
_ ->
- catch couch_os_process:stop(Pid),
+ catch proc_stop(Proc),
get_os_process(Lang)
end;
Error ->
throw(Error)
end.
-ret_os_process(Lang, Pid) ->
- true = gen_server:call(couch_query_servers, {ret_proc, Lang, Pid}),
- catch unlink(Pid),
+ret_os_process(Proc) ->
+ true = gen_server:call(couch_query_servers, {ret_proc, Proc}),
+ catch unlink(Proc#proc.pid),
ok.
add_value(Tid, Key, Value) ->