From e10858cd53e9b2a1cf769d7d71e4c1adcf30b4f3 Mon Sep 17 00:00:00 2001 From: John Christopher Anderson Date: Wed, 12 Aug 2009 19:58:14 +0000 Subject: Introduces native Erlang query servers. Closes COUCHDB-377 Thanks Mark Hammond and Paul Davis for doing most of the work, and Michael McDaniel for the inspiration. There is still room for improvement on the APIs exposed to the Erlang views, as well as likely a whole lot of work to be done to increase parallelism. But the important part now is that we have native Erlang views. git-svn-id: https://svn.apache.org/repos/asf/couchdb/trunk@803685 13f79535-47bb-0310-9956-ffa450edef68 --- src/couchdb/Makefile.am | 2 + src/couchdb/couch_httpd_show.erl | 1 + src/couchdb/couch_native_process.erl | 347 +++++++++++++++++++++++++++++++++++ src/couchdb/couch_query_servers.erl | 205 ++++++++++++--------- 4 files changed, 473 insertions(+), 82 deletions(-) create mode 100644 src/couchdb/couch_native_process.erl (limited to 'src') diff --git a/src/couchdb/Makefile.am b/src/couchdb/Makefile.am index 6691dbba..7d7b1720 100644 --- a/src/couchdb/Makefile.am +++ b/src/couchdb/Makefile.am @@ -70,6 +70,7 @@ source_files = \ couch_httpd_stats_handlers.erl \ couch_key_tree.erl \ couch_log.erl \ + couch_native_process.erl \ couch_os_process.erl \ couch_query_servers.erl \ couch_ref_counter.erl \ @@ -122,6 +123,7 @@ compiled_files = \ couch_httpd_stats_handlers.beam \ couch_key_tree.beam \ couch_log.beam \ + couch_native_process.beam \ couch_os_process.beam \ couch_query_servers.beam \ couch_ref_counter.beam \ diff --git a/src/couchdb/couch_httpd_show.erl b/src/couchdb/couch_httpd_show.erl index 294acbac..ed0b9ede 100644 --- a/src/couchdb/couch_httpd_show.erl +++ b/src/couchdb/couch_httpd_show.erl @@ -416,6 +416,7 @@ send_doc_update_response(Lang, UpdateSrc, DocId, Doc, Req, Db) -> end, NewDoc = couch_doc:from_json_obj({NewJsonDoc}), Code = 201, + % todo set location field {ok, _NewRev} = couch_db:update_doc(Db, NewDoc, Options); [<<"up">>, _Other, JsonResp] -> Code = 200, diff --git a/src/couchdb/couch_native_process.erl b/src/couchdb/couch_native_process.erl new file mode 100644 index 00000000..df879633 --- /dev/null +++ b/src/couchdb/couch_native_process.erl @@ -0,0 +1,347 @@ +% Licensed under the Apache License, Version 2.0 (the "License"); +% you may not use this file except in compliance with the License. +% +% You may obtain a copy of the License at +% http://www.apache.org/licenses/LICENSE-2.0 +% +% Unless required by applicable law or agreed to in writing, +% software distributed under the License is distributed on an +% "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, +% either express or implied. +% +% See the License for the specific language governing permissions +% and limitations under the License. +% +% This file drew much inspiration from erlview, which was written by and +% copyright Michael McDaniel [http://autosys.us], and is also under APL 2.0 +% +% +% This module provides the smallest possible native view-server. +% With this module in-place, you can add the following to your couch INI files: +% [native_query_servers] +% erlang={couch_native_process, start_link, []} +% +% Which will then allow following example map function to be used: +% +% fun({Doc}) -> +% % Below, we emit a single record - the _id as key, null as value +% DocId = proplists:get_value(Doc, <<"_id">>, null), +% Emit(DocId, null) +% end. +% +% which should be roughly the same as the javascript: +% emit(doc._id, null); +% +% This module exposes enough functions such that a native erlang server can +% act as a fully-fleged view server, but no 'helper' functions specifically +% for simplifying your erlang view code. It is expected other third-party +% extensions will evolve which offer useful layers on top of this view server +% to help simplify your view code. +-module(couch_native_process). + +-export([start_link/0]). +-export([set_timeout/2, prompt/2, stop/1]). + +-define(STATE, native_proc_state). +-record(evstate, {funs=[], query_config=[], list_pid=nil, timeout=5000}). + +-include("couch_db.hrl"). + +start_link() -> + {ok, self()}. + +stop(_Pid) -> + ok. + +set_timeout(_Pid, TimeOut) -> + NewState = case get(?STATE) of + undefined -> + #evstate{timeout=TimeOut}; + State -> + State#evstate{timeout=TimeOut} + end, + put(?STATE, NewState), + ok. + +prompt(Pid, Data) when is_pid(Pid), is_list(Data) -> + case get(?STATE) of + undefined -> + State = #evstate{}, + put(?STATE, State); + State -> + State + end, + case is_pid(State#evstate.list_pid) of + true -> + case hd(Data) of + <<"list_row">> -> ok; + <<"list_end">> -> ok; + _ -> throw({error, query_server_error}) + end; + _ -> + ok % Not listing + end, + {NewState, Resp} = run(State, Data), + put(?STATE, NewState), + case Resp of + {error, Reason} -> + Msg = io_lib:format("couch native server error: ~p", [Reason]), + {[{<<"error">>, list_to_binary(Msg)}]}; + _ -> + Resp + end. + +run(_, [<<"reset">>]) -> + {#evstate{}, true}; +run(_, [<<"reset">>, QueryConfig]) -> + {#evstate{query_config=QueryConfig}, true}; +run(#evstate{funs=Funs}=State, [<<"add_fun">> , BinFunc]) -> + FunInfo = makefun(State, BinFunc), + {State#evstate{funs=Funs ++ [FunInfo]}, true}; +run(State, [<<"map_doc">> , Doc]) -> + Resp = lists:map(fun({Sig, Fun}) -> + erlang:put(Sig, []), + Fun(Doc), + lists:reverse(erlang:get(Sig)) + end, State#evstate.funs), + {State, Resp}; +run(State, [<<"reduce">>, Funs, KVs]) -> + {Keys, Vals} = + lists:foldl(fun([K, V], {KAcc, VAcc}) -> + {[K | KAcc], [V | VAcc]} + end, {[], []}, KVs), + Keys2 = lists:reverse(Keys), + Vals2 = lists:reverse(Vals), + {State, catch reduce(State, Funs, Keys2, Vals2, false)}; +run(State, [<<"rereduce">>, Funs, Vals]) -> + {State, catch reduce(State, Funs, null, Vals, true)}; +run(State, [<<"validate">>, BFun, NDoc, ODoc, Ctx]) -> + {_Sig, Fun} = makefun(State, BFun), + {State, catch Fun(NDoc, ODoc, Ctx)}; +run(State, [<<"filter">>, Docs, Req]) -> + {_Sig, Fun} = hd(State#evstate.funs), + Resp = lists:map(fun(Doc) -> + case (catch Fun(Doc, Req)) of + true -> true; + _ -> false + end + end, Docs), + {State, [true, Resp]}; +run(State, [<<"show">>, BFun, Doc, Req]) -> + {_Sig, Fun} = makefun(State, BFun), + Resp = case (catch Fun(Doc, Req)) of + FunResp when is_list(FunResp) -> + FunResp; + FunResp when is_tuple(FunResp), size(FunResp) == 1 -> + [<<"resp">>, FunResp]; + FunResp -> + FunResp + end, + {State, Resp}; +run(State, [<<"update">>, BFun, Doc, Req]) -> + {_Sig, Fun} = makefun(State, BFun), + Resp = case (catch Fun(Doc, Req)) of + [JsonDoc, JsonResp] -> + [<<"up">>, JsonDoc, JsonResp] + end, + {State, Resp}; +run(State, [<<"list">>, Head, Req]) -> + {Sig, Fun} = hd(State#evstate.funs), + % This is kinda dirty + case is_function(Fun, 2) of + false -> throw({error, render_error}); + true -> ok + end, + Self = self(), + SpawnFun = fun() -> + LastChunk = (catch Fun(Head, Req)), + case start_list_resp(Self, Sig) of + started -> + receive + {Self, list_row, _Row} -> ignore; + {Self, list_end} -> ignore + after State#evstate.timeout -> + throw({timeout, list_cleanup_pid}) + end; + _ -> + ok + end, + LastChunks = + case erlang:get(Sig) of + undefined -> [LastChunk]; + OtherChunks -> [LastChunk | OtherChunks] + end, + Self ! {self(), list_end, lists:reverse(LastChunks)} + end, + erlang:put(do_trap, process_flag(trap_exit, true)), + Pid = spawn_link(SpawnFun), + Resp = + receive + {Pid, start, Chunks, JsonResp} -> + [<<"start">>, Chunks, JsonResp] + after State#evstate.timeout -> + throw({timeout, list_start}) + end, + {State#evstate{list_pid=Pid}, Resp}; +run(#evstate{list_pid=Pid}=State, [<<"list_row">>, Row]) when is_pid(Pid) -> + Pid ! {self(), list_row, Row}, + receive + {Pid, chunks, Data} -> + {State, [<<"chunks">>, Data]}; + {Pid, list_end, Data} -> + receive + {'EXIT', Pid, normal} -> ok + after State#evstate.timeout -> + throw({timeout, list_cleanup}) + end, + process_flag(trap_exit, erlang:get(do_trap)), + {State#evstate{list_pid=nil}, [<<"end">>, Data]} + after State#evstate.timeout -> + throw({timeout, list_row}) + end; +run(#evstate{list_pid=Pid}=State, [<<"list_end">>]) when is_pid(Pid) -> + Pid ! {self(), list_end}, + Resp = + receive + {Pid, list_end, Data} -> + receive + {'EXIT', Pid, normal} -> ok + after State#evstate.timeout -> + throw({timeout, list_cleanup}) + end, + [<<"end">>, Data] + after State#evstate.timeout -> + throw({timeout, list_end}) + end, + process_flag(trap_exit, erlang:get(do_trap)), + {State#evstate{list_pid=nil}, Resp}; +run(_, Unknown) -> + ?LOG_ERROR("Native Process: Unknown command: ~p~n", [Unknown]), + throw({error, query_server_error}). + +bindings(State, Sig) -> + Self = self(), + + Log = fun(Msg) -> + ?LOG_INFO(Msg, []) + end, + + Emit = fun(Id, Value) -> + Curr = erlang:get(Sig), + erlang:put(Sig, [[Id, Value] | Curr]) + end, + + Start = fun(Headers) -> + erlang:put(list_headers, Headers) + end, + + Send = fun(Chunk) -> + Curr = + case erlang:get(Sig) of + undefined -> []; + Else -> Else + end, + erlang:put(Sig, [Chunk | Curr]) + end, + + GetRow = fun() -> + case start_list_resp(Self, Sig) of + started -> + ok; + _ -> + Chunks = + case erlang:get(Sig) of + undefined -> []; + CurrChunks -> CurrChunks + end, + Self ! {self(), chunks, lists:reverse(Chunks)} + end, + erlang:put(Sig, []), + receive + {Self, list_row, Row} -> Row; + {Self, list_end} -> nil + after State#evstate.timeout -> + throw({timeout, list_pid_getrow}) + end + end, + + FoldRows = fun(Fun, Acc) -> foldrows(GetRow, Fun, Acc) end, + + [ + {'Log', Log}, + {'Emit', Emit}, + {'Start', Start}, + {'Send', Send}, + {'GetRow', GetRow}, + {'FoldRows', FoldRows} + ]. + +% thanks to erlview, via: +% http://erlang.org/pipermail/erlang-questions/2003-November/010544.html +makefun(State, Source) -> + Sig = erlang:md5(Source), + BindFuns = bindings(State, Sig), + {Sig, makefun(State, Source, BindFuns)}. + +makefun(_State, Source, BindFuns) -> + FunStr = binary_to_list(Source), + {ok, Tokens, _} = erl_scan:string(FunStr), + Form = case (catch erl_parse:parse_exprs(Tokens)) of + {ok, [ParsedForm]} -> + ParsedForm; + {error, {LineNum, _Mod, [Mesg, Params]}}=Error -> + io:format(standard_error, "Syntax error on line: ~p~n", [LineNum]), + io:format(standard_error, "~s~p~n", [Mesg, Params]), + throw(Error) + end, + Bindings = lists:foldl(fun({Name, Fun}, Acc) -> + erl_eval:add_binding(Name, Fun, Acc) + end, erl_eval:new_bindings(), BindFuns), + {value, Fun, _} = erl_eval:expr(Form, Bindings), + Fun. + +reduce(State, BinFuns, Keys, Vals, ReReduce) -> + Funs = case is_list(BinFuns) of + true -> + lists:map(fun(BF) -> makefun(State, BF) end, BinFuns); + _ -> + [makefun(State, BinFuns)] + end, + Reds = lists:map(fun({_Sig, Fun}) -> + Fun(Keys, Vals, ReReduce) + end, Funs), + [true, Reds]. + +foldrows(GetRow, ProcRow, Acc) -> + case GetRow() of + nil -> + {ok, Acc}; + Row -> + case (catch ProcRow(Row, Acc)) of + {ok, Acc2} -> + foldrows(GetRow, ProcRow, Acc2); + {stop, Acc2} -> + {ok, Acc2} + end + end. + +start_list_resp(Self, Sig) -> + case erlang:get(list_started) of + undefined -> + Headers = + case erlang:get(list_headers) of + undefined -> {[{<<"headers">>, {[]}}]}; + CurrHdrs -> CurrHdrs + end, + Chunks = + case erlang:get(Sig) of + undefined -> []; + CurrChunks -> CurrChunks + end, + Self ! {self(), start, lists:reverse(Chunks), Headers}, + erlang:put(list_started, true), + erlang:put(Sig, []), + started; + _ -> + ok + end. diff --git a/src/couchdb/couch_query_servers.erl b/src/couchdb/couch_query_servers.erl index fca7c85a..f94cc28b 100644 --- a/src/couchdb/couch_query_servers.erl +++ b/src/couchdb/couch_query_servers.erl @@ -25,6 +25,14 @@ -include("couch_db.hrl"). +-record(proc, { + pid, + lang, + prompt_fun, + set_timeout_fun, + stop_fun +}). + start_link() -> gen_server:start_link({local, couch_query_servers}, couch_query_servers, [], []). @@ -32,19 +40,19 @@ stop() -> exit(whereis(couch_query_servers), close). start_doc_map(Lang, Functions) -> - Pid = get_os_process(Lang), + Proc = get_os_process(Lang), lists:foreach(fun(FunctionSource) -> - true = couch_os_process:prompt(Pid, [<<"add_fun">>, FunctionSource]) + true = proc_prompt(Proc, [<<"add_fun">>, FunctionSource]) end, Functions), - {ok, {Lang, Pid}}. + {ok, Proc}. -map_docs({_Lang, Pid}, Docs) -> +map_docs(Proc, Docs) -> % send the documents Results = lists:map( fun(Doc) -> Json = couch_doc:to_json_obj(Doc, []), - FunsResults = couch_os_process:prompt(Pid, [<<"map_doc">>, Json]), + FunsResults = proc_prompt(Proc, [<<"map_doc">>, Json]), % the results are a json array of function map yields like this: % [FunResults1, FunResults2 ...] % where funresults is are json arrays of key value pairs: @@ -63,8 +71,8 @@ map_docs({_Lang, Pid}, Docs) -> stop_doc_map(nil) -> ok; -stop_doc_map({Lang, Pid}) -> - ok = ret_os_process(Lang, Pid). +stop_doc_map(Proc) -> + ok = ret_os_process(Proc). group_reductions_results([]) -> []; @@ -83,7 +91,7 @@ group_reductions_results(List) -> rereduce(_Lang, [], _ReducedValues) -> {ok, []}; rereduce(Lang, RedSrcs, ReducedValues) -> - Pid = get_os_process(Lang), + Proc = get_os_process(Lang), Grouped = group_reductions_results(ReducedValues), Results = try lists:zipwith( fun @@ -92,11 +100,11 @@ rereduce(Lang, RedSrcs, ReducedValues) -> Result; (FunSrc, Values) -> [true, [Result]] = - couch_os_process:prompt(Pid, [<<"rereduce">>, [FunSrc], Values]), + proc_prompt(Proc, [<<"rereduce">>, [FunSrc], Values]), Result end, RedSrcs, Grouped) after - ok = ret_os_process(Lang, Pid) + ok = ret_os_process(Proc) end, {ok, Results}. @@ -121,12 +129,11 @@ recombine_reduce_results([_OsFun|RedSrcs], [OsR|OsResults], BuiltinResults, Acc) os_reduce(_Lang, [], _KVs) -> {ok, []}; os_reduce(Lang, OsRedSrcs, KVs) -> - Pid = get_os_process(Lang), - OsResults = try couch_os_process:prompt(Pid, - [<<"reduce">>, OsRedSrcs, KVs]) of + Proc = get_os_process(Lang), + OsResults = try proc_prompt(Proc, [<<"reduce">>, OsRedSrcs, KVs]) of [true, Reductions] -> Reductions after - ok = ret_os_process(Lang, Pid) + ok = ret_os_process(Proc) end, {ok, OsResults}. @@ -151,7 +158,7 @@ builtin_sum_rows(KVs) -> end, 0, KVs). validate_doc_update(Lang, FunSrc, EditDoc, DiskDoc, Ctx) -> - Pid = get_os_process(Lang), + Proc = get_os_process(Lang), JsonEditDoc = couch_doc:to_json_obj(EditDoc, [revs]), JsonDiskDoc = if DiskDoc == nil -> @@ -159,7 +166,7 @@ validate_doc_update(Lang, FunSrc, EditDoc, DiskDoc, Ctx) -> true -> couch_doc:to_json_obj(DiskDoc, [revs]) end, - try couch_os_process:prompt(Pid, + try proc_prompt(Proc, [<<"validate">>, FunSrc, JsonEditDoc, JsonDiskDoc, Ctx]) of 1 -> ok; @@ -168,14 +175,14 @@ validate_doc_update(Lang, FunSrc, EditDoc, DiskDoc, Ctx) -> {[{<<"unauthorized">>, Message}]} -> throw({unauthorized, Message}) after - ok = ret_os_process(Lang, Pid) + ok = ret_os_process(Proc) end. % todo use json_apply_field append_docid(DocId, JsonReqIn) -> [{<<"docId">>, DocId} | JsonReqIn]. render_doc_show(Lang, ShowSrc, DocId, Doc, Req, Db) -> - Pid = get_os_process(Lang), + Proc = get_os_process(Lang), {JsonReqIn} = couch_httpd_external:json_req_obj(Req, Db), {JsonReq, JsonDoc} = case {DocId, Doc} of @@ -183,16 +190,16 @@ render_doc_show(Lang, ShowSrc, DocId, Doc, Req, Db) -> {DocId, nil} -> {{append_docid(DocId, JsonReqIn)}, null}; _ -> {{append_docid(DocId, JsonReqIn)}, couch_doc:to_json_obj(Doc, [revs])} end, - try couch_os_process:prompt(Pid, + try proc_prompt(Proc, [<<"show">>, ShowSrc, JsonDoc, JsonReq]) of FormResp -> FormResp after - ok = ret_os_process(Lang, Pid) + ok = ret_os_process(Proc) end. render_doc_update(Lang, UpdateSrc, DocId, Doc, Req, Db) -> - Pid = get_os_process(Lang), + Proc = get_os_process(Lang), {JsonReqIn} = couch_httpd_external:json_req_obj(Req, Db), {JsonReq, JsonDoc} = case {DocId, Doc} of @@ -200,51 +207,51 @@ render_doc_update(Lang, UpdateSrc, DocId, Doc, Req, Db) -> {DocId, nil} -> {{append_docid(DocId, JsonReqIn)}, null}; _ -> {{append_docid(DocId, JsonReqIn)}, couch_doc:to_json_obj(Doc, [revs])} end, - try couch_os_process:prompt(Pid, + try proc_prompt(Proc, [<<"update">>, UpdateSrc, JsonDoc, JsonReq]) of FormResp -> FormResp after - ok = ret_os_process(Lang, Pid) + ok = ret_os_process(Proc) end. start_view_list(Lang, ListSrc) -> - Pid = get_os_process(Lang), - true = couch_os_process:prompt(Pid, [<<"add_fun">>, ListSrc]), - {ok, {Lang, Pid}}. + Proc = get_os_process(Lang), + proc_prompt(Proc, [<<"add_fun">>, ListSrc]), + {ok, Proc}. -render_list_head({_Lang, Pid}, Req, Db, Head) -> +render_list_head(Proc, Req, Db, Head) -> JsonReq = couch_httpd_external:json_req_obj(Req, Db), - couch_os_process:prompt(Pid, [<<"list">>, Head, JsonReq]). + proc_prompt(Proc, [<<"list">>, Head, JsonReq]). -render_list_row({_Lang, Pid}, Db, {{Key, DocId}, Value}, IncludeDoc) -> +render_list_row(Proc, Db, {{Key, DocId}, Value}, IncludeDoc) -> JsonRow = couch_httpd_view:view_row_obj(Db, {{Key, DocId}, Value}, IncludeDoc), - couch_os_process:prompt(Pid, [<<"list_row">>, JsonRow]); + proc_prompt(Proc, [<<"list_row">>, JsonRow]); -render_list_row({_Lang, Pid}, _, {Key, Value}, _IncludeDoc) -> +render_list_row(Proc, _, {Key, Value}, _IncludeDoc) -> JsonRow = {[{key, Key}, {value, Value}]}, - couch_os_process:prompt(Pid, [<<"list_row">>, JsonRow]). + proc_prompt(Proc, [<<"list_row">>, JsonRow]). -render_list_tail({Lang, Pid}) -> - JsonResp = couch_os_process:prompt(Pid, [<<"list_end">>]), - ok = ret_os_process(Lang, Pid), +render_list_tail(Proc) -> + JsonResp = proc_prompt(Proc, [<<"list_end">>]), + ok = ret_os_process(Proc), JsonResp. start_filter(Lang, FilterSrc) -> - Pid = get_os_process(Lang), - true = couch_os_process:prompt(Pid, [<<"add_fun">>, FilterSrc]), - {ok, {Lang, Pid}}. + Proc = get_os_process(Lang), + true = proc_prompt(Proc, [<<"add_fun">>, FilterSrc]), + {ok, Proc}. -filter_doc({_Lang, Pid}, Doc, Req, Db) -> +filter_doc(Proc, Doc, Req, Db) -> JsonReq = couch_httpd_external:json_req_obj(Req, Db), JsonDoc = couch_doc:to_json_obj(Doc, [revs]), JsonCtx = couch_util:json_user_ctx(Db), - [true, [Pass]] = couch_os_process:prompt(Pid, + [true, [Pass]] = proc_prompt(Proc, [<<"filter">>, [JsonDoc], JsonReq, JsonCtx]), {ok, Pass}. -end_filter({Lang, Pid}) -> - ok = ret_os_process(Lang, Pid). +end_filter(Proc) -> + ok = ret_os_process(Proc). init([]) -> @@ -258,58 +265,74 @@ init([]) -> fun("query_servers" ++ _, _) -> ?MODULE:stop() end), + ok = couch_config:register( + fun("native_query_servers" ++ _, _) -> + ?MODULE:stop() + end), Langs = ets:new(couch_query_server_langs, [set, private]), - PidLangs = ets:new(couch_query_server_pid_langs, [set, private]), - Pids = ets:new(couch_query_server_procs, [set, private]), + PidProcs = ets:new(couch_query_server_pid_langs, [set, private]), + LangProcs = ets:new(couch_query_server_procs, [set, private]), InUse = ets:new(couch_query_server_used, [set, private]), + % 'query_servers' specifies an OS command-line to execute. lists:foreach(fun({Lang, Command}) -> - true = ets:insert(Langs, {?l2b(Lang), Command}) + true = ets:insert(Langs, {?l2b(Lang), + couch_os_process, start_link, [Command]}) end, couch_config:get("query_servers")), + % 'native_query_servers' specifies a {Module, Func, Arg} tuple. + lists:foreach(fun({Lang, SpecStr}) -> + {ok, {Mod, Fun, SpecArg}} = couch_util:parse_term(SpecStr), + true = ets:insert(Langs, {?l2b(Lang), + Mod, Fun, SpecArg}) + end, couch_config:get("native_query_servers")), process_flag(trap_exit, true), - {ok, {Langs, PidLangs, Pids, InUse}}. + {ok, {Langs, % Keyed by language name, value is {Mod,Func,Arg} + PidProcs, % Keyed by PID, valus is a #proc record. + LangProcs, % Keyed by language name, value is a #proc record + InUse % Keyed by PID, value is #proc record. + }}. terminate(_Reason, _Server) -> ok. -handle_call({get_proc, Lang}, _From, {Langs, PidLangs, Pids, InUse}=Server) -> +handle_call({get_proc, Lang}, _From, {Langs, PidProcs, LangProcs, InUse}=Server) -> % Note to future self. Add max process limit. - case ets:lookup(Pids, Lang) of - [{Lang, [Pid|_]}] -> - add_value(PidLangs, Pid, Lang), - rem_from_list(Pids, Lang, Pid), - add_to_list(InUse, Lang, Pid), - {reply, {recycled, Pid, get_query_server_config()}, Server}; + case ets:lookup(LangProcs, Lang) of + [{Lang, [Proc|_]}] -> + add_value(PidProcs, Proc#proc.pid, Proc), + rem_from_list(LangProcs, Lang, Proc), + add_to_list(InUse, Lang, Proc), + {reply, {recycled, Proc, get_query_server_config()}, Server}; _ -> case (catch new_process(Langs, Lang)) of - {ok, Pid} -> - add_to_list(InUse, Lang, Pid), - {reply, {new, Pid}, Server}; + {ok, Proc} -> + add_to_list(InUse, Lang, Proc), + {reply, {new, Proc}, Server}; Error -> {reply, Error, Server} end end; -handle_call({ret_proc, Lang, Pid}, _From, {_, _, Pids, InUse}=Server) -> +handle_call({ret_proc, Proc}, _From, {_, _, LangProcs, InUse}=Server) -> % Along with max process limit, here we should check % if we're over the limit and discard when we are. - add_to_list(Pids, Lang, Pid), - rem_from_list(InUse, Lang, Pid), + add_to_list(LangProcs, Proc#proc.lang, Proc), + rem_from_list(InUse, Proc#proc.lang, Proc), {reply, true, Server}. handle_cast(_Whatever, Server) -> {noreply, Server}. -handle_info({'EXIT', Pid, Status}, {_, PidLangs, Pids, InUse}=Server) -> - case ets:lookup(PidLangs, Pid) of - [{Pid, Lang}] -> +handle_info({'EXIT', Pid, Status}, {_, PidProcs, LangProcs, InUse}=Server) -> + case ets:lookup(PidProcs, Pid) of + [{Pid, Proc}] -> case Status of normal -> ok; _ -> ?LOG_DEBUG("Linked process died abnormally: ~p (reason: ~p)", [Pid, Status]) end, - rem_value(PidLangs, Pid), - catch rem_from_list(Pids, Lang, Pid), - catch rem_from_list(InUse, Lang, Pid), + rem_value(PidProcs, Pid), + catch rem_from_list(LangProcs, Proc#proc.lang, Proc), + catch rem_from_list(InUse, Proc#proc.lang, Proc), {noreply, Server}; [] -> ?LOG_DEBUG("Unknown linked process died: ~p (reason: ~p)", [Pid, Status]), @@ -328,37 +351,55 @@ get_query_server_config() -> new_process(Langs, Lang) -> case ets:lookup(Langs, Lang) of - [{Lang, Command}] -> - couch_os_process:start_link(Command); + [{Lang, Mod, Func, Arg}] -> + {ok, Pid} = apply(Mod, Func, Arg), + {ok, #proc{lang=Lang, + pid=Pid, + % Called via proc_prompt, proc_set_timeout, and proc_stop + prompt_fun={Mod, prompt}, + set_timeout_fun={Mod, set_timeout}, + stop_fun={Mod, stop}}}; _ -> {unknown_query_language, Lang} end. +proc_prompt(Proc, Args) -> + {Mod, Func} = Proc#proc.prompt_fun, + apply(Mod, Func, [Proc#proc.pid, Args]). + +proc_stop(Proc) -> + {Mod, Func} = Proc#proc.stop_fun, + apply(Mod, Func, [Proc#proc.pid]). + +proc_set_timeout(Proc, Timeout) -> + {Mod, Func} = Proc#proc.set_timeout_fun, + apply(Mod, Func, [Proc#proc.pid, Timeout]). + get_os_process(Lang) -> case gen_server:call(couch_query_servers, {get_proc, Lang}) of - {new, Pid} -> - couch_os_process:set_timeout(Pid, list_to_integer(couch_config:get( - "couchdb", "os_process_timeout", "5000"))), - link(Pid), - Pid; - {recycled, Pid, QueryConfig} -> - case (catch couch_os_process:prompt(Pid, [<<"reset">>, QueryConfig])) of + {new, Proc} -> + proc_set_timeout(Proc, list_to_integer(couch_config:get( + "couchdb", "os_process_timeout", "5000"))), + link(Proc#proc.pid), + Proc; + {recycled, Proc, QueryConfig} -> + case (catch proc_prompt(Proc, [<<"reset">>, QueryConfig])) of true -> - couch_os_process:set_timeout(Pid, list_to_integer(couch_config:get( - "couchdb", "os_process_timeout", "5000"))), - link(Pid), - Pid; + proc_set_timeout(Proc, list_to_integer(couch_config:get( + "couchdb", "os_process_timeout", "5000"))), + link(Proc#proc.pid), + Proc; _ -> - catch couch_os_process:stop(Pid), + catch proc_stop(Proc), get_os_process(Lang) end; Error -> throw(Error) end. -ret_os_process(Lang, Pid) -> - true = gen_server:call(couch_query_servers, {ret_proc, Lang, Pid}), - catch unlink(Pid), +ret_os_process(Proc) -> + true = gen_server:call(couch_query_servers, {ret_proc, Proc}), + catch unlink(Proc#proc.pid), ok. add_value(Tid, Key, Value) -> -- cgit v1.2.3