diff options
Diffstat (limited to 'src/couchdb/couch_query_servers.erl')
-rw-r--r-- | src/couchdb/couch_query_servers.erl | 257 |
1 files changed, 156 insertions, 101 deletions
diff --git a/src/couchdb/couch_query_servers.erl b/src/couchdb/couch_query_servers.erl index 4ac56727..30f4c4c7 100644 --- a/src/couchdb/couch_query_servers.erl +++ b/src/couchdb/couch_query_servers.erl @@ -17,10 +17,11 @@ -export([init/1, terminate/2, handle_call/3, handle_cast/2, handle_info/2,code_change/3,stop/0]). -export([start_doc_map/2, map_docs/2, stop_doc_map/1]). --export([reduce/3, rereduce/3,validate_doc_update/5]). --export([render_doc_show/6, render_doc_update/6, start_view_list/2, - render_list_head/4, render_list_row/4, render_list_tail/1]). +-export([reduce/3, rereduce/3,validate_doc_update/4]). -export([filter_docs/5]). + +-export([with_ddoc_proc/2, proc_prompt/2, ddoc_prompt/3, ddoc_proc_prompt/3, json_doc/1]). + % -export([test/0]). -include("couch_db.hrl"). @@ -28,6 +29,7 @@ -record(proc, { pid, lang, + ddoc_keys = [], prompt_fun, set_timeout_fun, stop_fun @@ -37,7 +39,7 @@ start_link() -> gen_server:start_link({local, couch_query_servers}, couch_query_servers, [], []). stop() -> - exit(whereis(couch_query_servers), close). + exit(whereis(couch_query_servers), normal). start_doc_map(Lang, Functions) -> Proc = get_os_process(Lang), @@ -91,21 +93,15 @@ group_reductions_results(List) -> rereduce(_Lang, [], _ReducedValues) -> {ok, []}; rereduce(Lang, RedSrcs, ReducedValues) -> - Proc = get_os_process(Lang), - Grouped = group_reductions_results(ReducedValues), - Results = try lists:zipwith( + Grouped = group_reductions_results(ReducedValues), + Results = lists:zipwith( fun (<<"_", _/binary>> = FunSrc, Values) -> {ok, [Result]} = builtin_reduce(rereduce, [FunSrc], [[[], V] || V <- Values], []), Result; (FunSrc, Values) -> - [true, [Result]] = - proc_prompt(Proc, [<<"rereduce">>, [FunSrc], Values]), - Result - end, RedSrcs, Grouped) - after - ok = ret_os_process(Proc) - end, + os_rereduce(Lang, [FunSrc], Values) + end, RedSrcs, Grouped), {ok, Results}. reduce(_Lang, [], _KVs) -> @@ -137,6 +133,17 @@ os_reduce(Lang, OsRedSrcs, KVs) -> end, {ok, OsResults}. +os_rereduce(_Lang, [], _KVs) -> + {ok, []}; +os_rereduce(Lang, OsRedSrcs, KVs) -> + Proc = get_os_process(Lang), + try proc_prompt(Proc, [<<"rereduce">>, OsRedSrcs, KVs]) of + [true, [Reduction]] -> Reduction + after + ok = ret_os_process(Proc) + end. + + builtin_reduce(_Re, [], _KVs, Acc) -> {ok, lists:reverse(Acc)}; builtin_reduce(Re, [<<"_sum">>|BuiltinReds], KVs, Acc) -> @@ -157,92 +164,49 @@ builtin_sum_rows(KVs) -> throw({invalid_value, <<"builtin _sum function requires map values to be numbers">>}) end, 0, KVs). -validate_doc_update(Lang, FunSrc, EditDoc, DiskDoc, Ctx) -> - Proc = get_os_process(Lang), - JsonEditDoc = couch_doc:to_json_obj(EditDoc, [revs]), - JsonDiskDoc = - if DiskDoc == nil -> - null; - true -> - couch_doc:to_json_obj(DiskDoc, [revs]) - end, - try proc_prompt(Proc, - [<<"validate">>, FunSrc, JsonEditDoc, JsonDiskDoc, Ctx]) of - 1 -> - ok; - {[{<<"forbidden">>, Message}]} -> - throw({forbidden, Message}); - {[{<<"unauthorized">>, Message}]} -> - throw({unauthorized, Message}) - after - ok = ret_os_process(Proc) - end. -% todo use json_apply_field -append_docid(DocId, JsonReqIn) -> - [{<<"docId">>, DocId} | JsonReqIn]. -render_doc_show(Lang, ShowSrc, DocId, Doc, Req, Db) -> - Proc = get_os_process(Lang), - {JsonReqIn} = couch_httpd_external:json_req_obj(Req, Db), - - {JsonReq, JsonDoc} = case {DocId, Doc} of - {nil, nil} -> {{JsonReqIn}, null}; - {DocId, nil} -> {{append_docid(DocId, JsonReqIn)}, null}; - _ -> {{append_docid(DocId, JsonReqIn)}, couch_doc:to_json_obj(Doc, [revs])} - end, - try proc_prompt(Proc, [<<"show">>, ShowSrc, JsonDoc, JsonReq]) - after - ok = ret_os_process(Proc) - end. - -render_doc_update(Lang, UpdateSrc, DocId, Doc, Req, Db) -> - Proc = get_os_process(Lang), - {JsonReqIn} = couch_httpd_external:json_req_obj(Req, Db), - - {JsonReq, JsonDoc} = case {DocId, Doc} of - {nil, nil} -> {{JsonReqIn}, null}; - {DocId, nil} -> {{append_docid(DocId, JsonReqIn)}, null}; - _ -> {{append_docid(DocId, JsonReqIn)}, couch_doc:to_json_obj(Doc, [revs])} - end, - try proc_prompt(Proc, [<<"update">>, UpdateSrc, JsonDoc, JsonReq]) - after - ok = ret_os_process(Proc) +% use the function stored in ddoc.validate_doc_update to test an update. +validate_doc_update(DDoc, EditDoc, DiskDoc, Ctx) -> + JsonEditDoc = couch_doc:to_json_obj(EditDoc, [revs]), + JsonDiskDoc = json_doc(DiskDoc), + case ddoc_prompt(DDoc, [<<"validate_doc_update">>], [JsonEditDoc, JsonDiskDoc, Ctx]) of + 1 -> + ok; + {[{<<"forbidden">>, Message}]} -> + throw({forbidden, Message}); + {[{<<"unauthorized">>, Message}]} -> + throw({unauthorized, Message}) end. -start_view_list(Lang, ListSrc) -> - Proc = get_os_process(Lang), - proc_prompt(Proc, [<<"add_fun">>, ListSrc]), - {ok, Proc}. - -render_list_head(Proc, Req, Db, Head) -> - JsonReq = couch_httpd_external:json_req_obj(Req, Db), - proc_prompt(Proc, [<<"list">>, Head, JsonReq]). - -render_list_row(Proc, Db, {{Key, DocId}, Value}, IncludeDoc) -> - JsonRow = couch_httpd_view:view_row_obj(Db, {{Key, DocId}, Value}, IncludeDoc), - proc_prompt(Proc, [<<"list_row">>, JsonRow]); - -render_list_row(Proc, _, {Key, Value}, _IncludeDoc) -> - JsonRow = {[{key, Key}, {value, Value}]}, - proc_prompt(Proc, [<<"list_row">>, JsonRow]). - -render_list_tail(Proc) -> - JsonResp = proc_prompt(Proc, [<<"list_end">>]), - ok = ret_os_process(Proc), - JsonResp. +json_doc(nil) -> null; +json_doc(Doc) -> + couch_doc:to_json_obj(Doc, [revs]). -filter_docs(Lang, Src, Docs, Req, Db) -> +filter_docs(Req, Db, DDoc, FName, Docs) -> JsonReq = couch_httpd_external:json_req_obj(Req, Db), JsonDocs = [couch_doc:to_json_obj(Doc, [revs]) || Doc <- Docs], JsonCtx = couch_util:json_user_ctx(Db), - Proc = get_os_process(Lang), - [true, Passes] = proc_prompt(Proc, - [<<"filter">>, Src, JsonDocs, JsonReq, JsonCtx]), - ret_os_process(Proc), - {ok, Passes}. + [true, Passes] = ddoc_prompt(DDoc, [<<"filters">>, FName], [JsonDocs, JsonReq, JsonCtx]), + {ok, Passes}. + +ddoc_proc_prompt({Proc, DDocId}, FunPath, Args) -> + proc_prompt(Proc, [<<"ddoc">>, DDocId, FunPath, Args]). + +ddoc_prompt(DDoc, FunPath, Args) -> + with_ddoc_proc(DDoc, fun({Proc, DDocId}) -> + proc_prompt(Proc, [<<"ddoc">>, DDocId, FunPath, Args]) + end). + +with_ddoc_proc(#doc{id=DDocId,revs={Start, [DiskRev|_]}}=DDoc, Fun) -> + Rev = couch_doc:rev_to_str({Start, DiskRev}), + DDocKey = {DDocId, Rev}, + Proc = get_ddoc_process(DDoc, DDocKey), + try Fun({Proc, DDocId}) + after + ok = ret_os_process(Proc) + end. init([]) -> - % read config and register for configuration changes % just stop if one of the config settings change. couch_server_sup @@ -282,7 +246,39 @@ init([]) -> terminate(_Reason, _Server) -> ok. - +handle_call({get_proc, #doc{body={Props}}=DDoc, DDocKey}, _From, {Langs, PidProcs, LangProcs, InUse}=Server) -> + % Note to future self. Add max process limit. + Lang = proplists:get_value(<<"language">>, Props, <<"javascript">>), + case ets:lookup(LangProcs, Lang) of + [{Lang, [P|Rest]}] -> + % find a proc in the set that has the DDoc + case proc_with_ddoc(DDoc, DDocKey, [P|Rest]) of + {ok, Proc} -> + % looks like the proc isn't getting dropped from the list. + % we need to change this to take a fun for equality checking + % so we can do a comparison on portnum + rem_from_list(LangProcs, Lang, Proc), + add_to_list(InUse, Lang, Proc), + {reply, {ok, Proc, get_query_server_config()}, Server}; + Error -> + {reply, Error, Server} + end; + _ -> + case (catch new_process(Langs, Lang)) of + {ok, Proc} -> + add_value(PidProcs, Proc#proc.pid, Proc), + case proc_with_ddoc(DDoc, DDocKey, [Proc]) of + {ok, Proc2} -> + rem_from_list(LangProcs, Lang, Proc), + add_to_list(InUse, Lang, Proc2), + {reply, {ok, Proc2, get_query_server_config()}, Server}; + Error -> + {reply, Error, Server} + end; + Error -> + {reply, Error, Server} + end + end; handle_call({get_proc, Lang}, _From, {Langs, PidProcs, LangProcs, InUse}=Server) -> % Note to future self. Add max process limit. case ets:lookup(LangProcs, Lang) of @@ -290,12 +286,13 @@ handle_call({get_proc, Lang}, _From, {Langs, PidProcs, LangProcs, InUse}=Server) add_value(PidProcs, Proc#proc.pid, Proc), rem_from_list(LangProcs, Lang, Proc), add_to_list(InUse, Lang, Proc), - {reply, {recycled, Proc, get_query_server_config()}, Server}; + {reply, {ok, Proc, get_query_server_config()}, Server}; _ -> case (catch new_process(Langs, Lang)) of {ok, Proc} -> + add_value(PidProcs, Proc#proc.pid, Proc), add_to_list(InUse, Lang, Proc), - {reply, {new, Proc}, Server}; + {reply, {ok, Proc, get_query_server_config()}, Server}; Error -> {reply, Error, Server} end @@ -350,6 +347,23 @@ new_process(Langs, Lang) -> {unknown_query_language, Lang} end. +proc_with_ddoc(DDoc, DDocKey, LangProcs) -> + DDocProcs = lists:filter(fun(#proc{ddoc_keys=Keys}) -> + lists:any(fun(Key) -> + Key == DDocKey + end, Keys) + end, LangProcs), + case DDocProcs of + [DDocProc|_] -> + ?LOG_DEBUG("DDocProc found for DDocKey: ~p",[DDocKey]), + {ok, DDocProc}; + [] -> + [TeachProc|_] = LangProcs, + ?LOG_DEBUG("Teach ddoc to new proc ~p with DDocKey: ~p",[TeachProc, DDocKey]), + {ok, SmartProc} = teach_ddoc(DDoc, DDocKey, TeachProc), + {ok, SmartProc} + end. + proc_prompt(Proc, Args) -> {Mod, Func} = Proc#proc.prompt_fun, apply(Mod, Func, [Proc#proc.pid, Args]). @@ -362,14 +376,44 @@ proc_set_timeout(Proc, Timeout) -> {Mod, Func} = Proc#proc.set_timeout_fun, apply(Mod, Func, [Proc#proc.pid, Timeout]). +teach_ddoc(DDoc, {DDocId, _Rev}=DDocKey, #proc{ddoc_keys=Keys}=Proc) -> + % send ddoc over the wire + % we only share the rev with the client we know to update code + % but it only keeps the latest copy, per each ddoc, around. + true = proc_prompt(Proc, [<<"ddoc">>, <<"new">>, DDocId, couch_doc:to_json_obj(DDoc, [])]), + % we should remove any other ddocs keys for this docid + % because the query server overwrites without the rev + Keys2 = [{D,R} || {D,R} <- Keys, D /= DDocId], + % add ddoc to the proc + {ok, Proc#proc{ddoc_keys=[DDocKey|Keys2]}}. + +get_ddoc_process(#doc{} = DDoc, DDocKey) -> + % remove this case statement + case gen_server:call(couch_query_servers, {get_proc, DDoc, DDocKey}) of + {ok, Proc, QueryConfig} -> + % process knows the ddoc + case (catch proc_prompt(Proc, [<<"reset">>, QueryConfig])) of + true -> + proc_set_timeout(Proc, list_to_integer(couch_config:get( + "couchdb", "os_process_timeout", "5000"))), + link(Proc#proc.pid), + Proc; + _ -> + catch proc_stop(Proc), + get_ddoc_process(DDoc, DDocKey) + end; + Error -> + throw(Error) + end. + +ret_ddoc_process(Proc) -> + true = gen_server:call(couch_query_servers, {ret_proc, Proc}), + catch unlink(Proc#proc.pid), + ok. + get_os_process(Lang) -> case gen_server:call(couch_query_servers, {get_proc, Lang}) of - {new, Proc} -> - proc_set_timeout(Proc, list_to_integer(couch_config:get( - "couchdb", "os_process_timeout", "5000"))), - link(Proc#proc.pid), - Proc; - {recycled, Proc, QueryConfig} -> + {ok, Proc, QueryConfig} -> case (catch proc_prompt(Proc, [<<"reset">>, QueryConfig])) of true -> proc_set_timeout(Proc, list_to_integer(couch_config:get( @@ -403,9 +447,20 @@ add_to_list(Tid, Key, Value) -> true = ets:insert(Tid, {Key, [Value]}) end. +rem_from_list(Tid, Key, Value) when is_record(Value, proc)-> + Pid = Value#proc.pid, + case ets:lookup(Tid, Key) of + [{Key, Vals}] -> + % make a new values list that doesn't include the Value arg + NewValues = [Val || #proc{pid=P}=Val <- Vals, P /= Pid], + ets:insert(Tid, {Key, NewValues}); + [] -> ok + end; rem_from_list(Tid, Key, Value) -> case ets:lookup(Tid, Key) of [{Key, Vals}] -> - ets:insert(Tid, {Key, [Val || Val <- Vals, Val /= Value]}); + % make a new values list that doesn't include the Value arg + NewValues = [Val || Val <- Vals, Val /= Value], + ets:insert(Tid, {Key, NewValues}); [] -> ok end. |