From ea3b1153e52ac1513da4d634eedefb05c261039c Mon Sep 17 00:00:00 2001 From: John Christopher Anderson Date: Tue, 22 Dec 2009 18:03:44 +0000 Subject: move query server to a design-doc based protocol, closes COUCHDB-589 git-svn-id: https://svn.apache.org/repos/asf/couchdb/trunk@893249 13f79535-47bb-0310-9956-ffa450edef68 --- src/couchdb/couch_native_process.erl | 243 ++++++++++++++++++++--------------- 1 file changed, 140 insertions(+), 103 deletions(-) (limited to 'src/couchdb/couch_native_process.erl') diff --git a/src/couchdb/couch_native_process.erl b/src/couchdb/couch_native_process.erl index 2b74073c..65e4e131 100644 --- a/src/couchdb/couch_native_process.erl +++ b/src/couchdb/couch_native_process.erl @@ -38,63 +38,102 @@ % extensions will evolve which offer useful layers on top of this view server % to help simplify your view code. -module(couch_native_process). +-behaviour(gen_server). --export([start_link/0]). --export([set_timeout/2, prompt/2, stop/1]). +-export([start_link/0,init/1,terminate/2,handle_call/3,handle_cast/2]). +-export([set_timeout/2, prompt/2]). -define(STATE, native_proc_state). --record(evstate, {funs=[], query_config=[], list_pid=nil, timeout=5000}). +-record(evstate, {ddocs, funs=[], query_config=[], list_pid=nil, timeout=5000}). -include("couch_db.hrl"). start_link() -> - {ok, self()}. + gen_server:start_link(?MODULE, [], []). -stop(_Pid) -> - ok. +% this is a bit messy, see also couch_query_servers handle_info +% stop(_Pid) -> +% ok. -set_timeout(_Pid, TimeOut) -> - NewState = case get(?STATE) of - undefined -> - #evstate{timeout=TimeOut}; - State -> - State#evstate{timeout=TimeOut} - end, - put(?STATE, NewState), - ok. +set_timeout(Pid, TimeOut) -> + gen_server:call(Pid, {set_timeout, TimeOut}). -prompt(Pid, Data) when is_pid(Pid), is_list(Data) -> - case get(?STATE) of - undefined -> - State = #evstate{}, - put(?STATE, State); - State -> - State - end, - case is_pid(State#evstate.list_pid) of - true -> - case hd(Data) of - <<"list_row">> -> ok; - <<"list_end">> -> ok; - _ -> throw({error, query_server_error}) - end; - _ -> - ok % Not listing - end, - {NewState, Resp} = run(State, to_binary(Data)), - put(?STATE, NewState), +prompt(Pid, Data) when is_list(Data) -> + gen_server:call(Pid, {prompt, Data}). + +% gen_server callbacks +init([]) -> + {ok, #evstate{ddocs=dict:new()}}. + +handle_call({set_timeout, TimeOut}, _From, State) -> + {reply, ok, State#evstate{timeout=TimeOut}}; + +handle_call({prompt, Data}, _From, State) -> + ?LOG_DEBUG("Prompt native qs: ~s",[?JSON_ENCODE(Data)]), + {NewState, Resp} = try run(State, to_binary(Data)) of + {S, R} -> {S, R} + catch + throw:{error, Why} -> + {State, [<<"error">>, Why, Why]} + end, + case Resp of {error, Reason} -> Msg = io_lib:format("couch native server error: ~p", [Reason]), - {[{<<"error">>, list_to_binary(Msg)}]}; - _ -> - Resp + {reply, [<<"error">>, <<"native_query_server">>, list_to_binary(Msg)], NewState}; + [<<"error">> | Rest] -> + Msg = io_lib:format("couch native server error: ~p", [Rest]), + {reply, [<<"error">> | Rest], NewState}; + [<<"fatal">> | Rest] -> + Msg = io_lib:format("couch native server error: ~p", [Rest]), + {stop, fatal, [<<"error">> | Rest], NewState}; + Resp -> + {reply, Resp, NewState} end. -run(_, [<<"reset">>]) -> - {#evstate{}, true}; -run(_, [<<"reset">>, QueryConfig]) -> - {#evstate{query_config=QueryConfig}, true}; +handle_cast(_Msg, State) -> {noreply, State}. +handle_info(_Msg, State) -> {noreply, State}. +terminate(_Reason, _State) -> ok. +code_change(_OldVersion, State, _Extra) -> {ok, State}. + +run(#evstate{list_pid=Pid}=State, [<<"list_row">>, Row]) when is_pid(Pid) -> + Pid ! {self(), list_row, Row}, + receive + {Pid, chunks, Data} -> + {State, [<<"chunks">>, Data]}; + {Pid, list_end, Data} -> + receive + {'EXIT', Pid, normal} -> ok + after State#evstate.timeout -> + throw({timeout, list_cleanup}) + end, + process_flag(trap_exit, erlang:get(do_trap)), + {State#evstate{list_pid=nil}, [<<"end">>, Data]} + after State#evstate.timeout -> + throw({timeout, list_row}) + end; +run(#evstate{list_pid=Pid}=State, [<<"list_end">>]) when is_pid(Pid) -> + Pid ! {self(), list_end}, + Resp = + receive + {Pid, list_end, Data} -> + receive + {'EXIT', Pid, normal} -> ok + after State#evstate.timeout -> + throw({timeout, list_cleanup}) + end, + [<<"end">>, Data] + after State#evstate.timeout -> + throw({timeout, list_end}) + end, + process_flag(trap_exit, erlang:get(do_trap)), + {State#evstate{list_pid=nil}, Resp}; +run(#evstate{list_pid=Pid}=State, _Command) when is_pid(Pid) -> + {State, [<<"error">>, list_error, list_error]}; +run(#evstate{ddocs=DDocs}, [<<"reset">>]) -> + {#evstate{ddocs=DDocs}, true}; +run(#evstate{ddocs=DDocs}, [<<"reset">>, QueryConfig]) -> + {#evstate{ddocs=DDocs, query_config=QueryConfig}, true}; run(#evstate{funs=Funs}=State, [<<"add_fun">> , BinFunc]) -> FunInfo = makefun(State, BinFunc), {State#evstate{funs=Funs ++ [FunInfo]}, true}; @@ -115,41 +154,55 @@ run(State, [<<"reduce">>, Funs, KVs]) -> {State, catch reduce(State, Funs, Keys2, Vals2, false)}; run(State, [<<"rereduce">>, Funs, Vals]) -> {State, catch reduce(State, Funs, null, Vals, true)}; -run(State, [<<"validate">>, BFun, NDoc, ODoc, Ctx]) -> - {_Sig, Fun} = makefun(State, BFun), - {State, catch Fun(NDoc, ODoc, Ctx)}; -run(State, [<<"filter">>, Docs, Req]) -> - {_Sig, Fun} = hd(State#evstate.funs), +run(#evstate{ddocs=DDocs}=State, [<<"ddoc">>, <<"new">>, DDocId, DDoc]) -> + DDocs2 = store_ddoc(DDocs, DDocId, DDoc), + {State#evstate{ddocs=DDocs2}, true}; +run(#evstate{ddocs=DDocs}=State, [<<"ddoc">>, DDocId | Rest]) -> + DDoc = load_ddoc(DDocs, DDocId), + ddoc(State, DDoc, Rest); +run(_, Unknown) -> + ?LOG_ERROR("Native Process: Unknown command: ~p~n", [Unknown]), + throw({error, unknown_command}). + +ddoc(State, {DDoc}, [FunPath, Args]) -> + % load fun from the FunPath + BFun = lists:foldl(fun + (Key, {Props}) when is_list(Props) -> + proplists:get_value(Key, Props, nil); + (Key, Fun) when is_binary(Fun) -> + Fun; + (Key, nil) -> + throw({error, not_found}); + (Key, Fun) -> + throw({error, malformed_ddoc}) + end, {DDoc}, FunPath), + ddoc(State, makefun(State, BFun, {DDoc}), FunPath, Args). + +ddoc(State, {_, Fun}, [<<"validate_doc_update">>], Args) -> + {State, (catch apply(Fun, Args))}; +ddoc(State, {_, Fun}, [<<"filters">>|_], [Docs, Req]) -> Resp = lists:map(fun(Doc) -> (catch Fun(Doc, Req)) =:= true end, Docs), {State, [true, Resp]}; -run(State, [<<"show">>, BFun, Doc, Req]) -> - {_Sig, Fun} = makefun(State, BFun), - Resp = case (catch Fun(Doc, Req)) of +ddoc(State, {_, Fun}, [<<"shows">>|_], Args) -> + Resp = case (catch apply(Fun, Args)) of FunResp when is_list(FunResp) -> FunResp; - FunResp when tuple_size(FunResp) =:= 1 -> - [<<"resp">>, FunResp]; + {FunResp} -> + [<<"resp">>, {FunResp}]; FunResp -> FunResp end, {State, Resp}; -run(State, [<<"update">>, BFun, Doc, Req]) -> - {_Sig, Fun} = makefun(State, BFun), - Resp = case (catch Fun(Doc, Req)) of +ddoc(State, {_, Fun}, [<<"updates">>|_], Args) -> + Resp = case (catch apply(Fun, Args)) of [JsonDoc, JsonResp] -> [<<"up">>, JsonDoc, JsonResp] end, {State, Resp}; -run(State, [<<"list">>, Head, Req]) -> - {Sig, Fun} = hd(State#evstate.funs), - % This is kinda dirty - case is_function(Fun, 2) of - false -> throw({error, render_error}); - true -> ok - end, +ddoc(State, {Sig, Fun}, [<<"lists">>|_], Args) -> Self = self(), SpawnFun = fun() -> - LastChunk = (catch Fun(Head, Req)), + LastChunk = (catch apply(Fun, Args)), case start_list_resp(Self, Sig) of started -> receive @@ -177,44 +230,20 @@ run(State, [<<"list">>, Head, Req]) -> after State#evstate.timeout -> throw({timeout, list_start}) end, - {State#evstate{list_pid=Pid}, Resp}; -run(#evstate{list_pid=Pid}=State, [<<"list_row">>, Row]) when is_pid(Pid) -> - Pid ! {self(), list_row, Row}, - receive - {Pid, chunks, Data} -> - {State, [<<"chunks">>, Data]}; - {Pid, list_end, Data} -> - receive - {'EXIT', Pid, normal} -> ok - after State#evstate.timeout -> - throw({timeout, list_cleanup}) - end, - process_flag(trap_exit, erlang:get(do_trap)), - {State#evstate{list_pid=nil}, [<<"end">>, Data]} - after State#evstate.timeout -> - throw({timeout, list_row}) - end; -run(#evstate{list_pid=Pid}=State, [<<"list_end">>]) when is_pid(Pid) -> - Pid ! {self(), list_end}, - Resp = - receive - {Pid, list_end, Data} -> - receive - {'EXIT', Pid, normal} -> ok - after State#evstate.timeout -> - throw({timeout, list_cleanup}) - end, - [<<"end">>, Data] - after State#evstate.timeout -> - throw({timeout, list_end}) - end, - process_flag(trap_exit, erlang:get(do_trap)), - {State#evstate{list_pid=nil}, Resp}; -run(_, Unknown) -> - ?LOG_ERROR("Native Process: Unknown command: ~p~n", [Unknown]), - throw({error, query_server_error}). + {State#evstate{list_pid=Pid}, Resp}. + +store_ddoc(DDocs, DDocId, DDoc) -> + dict:store(DDocId, DDoc, DDocs). +load_ddoc(DDocs, DDocId) -> + try dict:fetch(DDocId, DDocs) of + {DDoc} -> {DDoc} + catch + _:Else -> throw({error, ?l2b(io_lib:format("Native Query Server missing DDoc with Id: ~s",[DDocId]))}) + end. bindings(State, Sig) -> + bindings(State, Sig, nil). +bindings(State, Sig, DDoc) -> Self = self(), Log = fun(Msg) -> @@ -262,14 +291,19 @@ bindings(State, Sig) -> FoldRows = fun(Fun, Acc) -> foldrows(GetRow, Fun, Acc) end, - [ + Bindings = [ {'Log', Log}, {'Emit', Emit}, {'Start', Start}, {'Send', Send}, {'GetRow', GetRow}, {'FoldRows', FoldRows} - ]. + ], + case DDoc of + {Props} -> + Bindings ++ [{'DDoc', DDoc}]; + _Else -> Bindings + end. % thanks to erlview, via: % http://erlang.org/pipermail/erlang-questions/2003-November/010544.html @@ -277,8 +311,11 @@ makefun(State, Source) -> Sig = erlang:md5(Source), BindFuns = bindings(State, Sig), {Sig, makefun(State, Source, BindFuns)}. - -makefun(_State, Source, BindFuns) -> +makefun(State, Source, {DDoc}) -> + Sig = erlang:md5(lists:flatten([Source, term_to_binary(DDoc)])), + BindFuns = bindings(State, Sig, {DDoc}), + {Sig, makefun(State, Source, BindFuns)}; +makefun(_State, Source, BindFuns) when is_list(BindFuns) -> FunStr = binary_to_list(Source), {ok, Tokens, _} = erl_scan:string(FunStr), Form = case (catch erl_parse:parse_exprs(Tokens)) of -- cgit v1.2.3