summaryrefslogtreecommitdiff
path: root/src/couchdb/couch_native_process.erl
diff options
context:
space:
mode:
Diffstat (limited to 'src/couchdb/couch_native_process.erl')
-rw-r--r--src/couchdb/couch_native_process.erl243
1 files changed, 140 insertions, 103 deletions
diff --git a/src/couchdb/couch_native_process.erl b/src/couchdb/couch_native_process.erl
index 2b74073c..65e4e131 100644
--- a/src/couchdb/couch_native_process.erl
+++ b/src/couchdb/couch_native_process.erl
@@ -38,63 +38,102 @@
% extensions will evolve which offer useful layers on top of this view server
% to help simplify your view code.
-module(couch_native_process).
+-behaviour(gen_server).
--export([start_link/0]).
--export([set_timeout/2, prompt/2, stop/1]).
+-export([start_link/0,init/1,terminate/2,handle_call/3,handle_cast/2]).
+-export([set_timeout/2, prompt/2]).
-define(STATE, native_proc_state).
--record(evstate, {funs=[], query_config=[], list_pid=nil, timeout=5000}).
+-record(evstate, {ddocs, funs=[], query_config=[], list_pid=nil, timeout=5000}).
-include("couch_db.hrl").
start_link() ->
- {ok, self()}.
+ gen_server:start_link(?MODULE, [], []).
-stop(_Pid) ->
- ok.
+% this is a bit messy, see also couch_query_servers handle_info
+% stop(_Pid) ->
+% ok.
-set_timeout(_Pid, TimeOut) ->
- NewState = case get(?STATE) of
- undefined ->
- #evstate{timeout=TimeOut};
- State ->
- State#evstate{timeout=TimeOut}
- end,
- put(?STATE, NewState),
- ok.
+set_timeout(Pid, TimeOut) ->
+ gen_server:call(Pid, {set_timeout, TimeOut}).
-prompt(Pid, Data) when is_pid(Pid), is_list(Data) ->
- case get(?STATE) of
- undefined ->
- State = #evstate{},
- put(?STATE, State);
- State ->
- State
- end,
- case is_pid(State#evstate.list_pid) of
- true ->
- case hd(Data) of
- <<"list_row">> -> ok;
- <<"list_end">> -> ok;
- _ -> throw({error, query_server_error})
- end;
- _ ->
- ok % Not listing
- end,
- {NewState, Resp} = run(State, to_binary(Data)),
- put(?STATE, NewState),
+prompt(Pid, Data) when is_list(Data) ->
+ gen_server:call(Pid, {prompt, Data}).
+
+% gen_server callbacks
+init([]) ->
+ {ok, #evstate{ddocs=dict:new()}}.
+
+handle_call({set_timeout, TimeOut}, _From, State) ->
+ {reply, ok, State#evstate{timeout=TimeOut}};
+
+handle_call({prompt, Data}, _From, State) ->
+ ?LOG_DEBUG("Prompt native qs: ~s",[?JSON_ENCODE(Data)]),
+ {NewState, Resp} = try run(State, to_binary(Data)) of
+ {S, R} -> {S, R}
+ catch
+ throw:{error, Why} ->
+ {State, [<<"error">>, Why, Why]}
+ end,
+
case Resp of
{error, Reason} ->
Msg = io_lib:format("couch native server error: ~p", [Reason]),
- {[{<<"error">>, list_to_binary(Msg)}]};
- _ ->
- Resp
+ {reply, [<<"error">>, <<"native_query_server">>, list_to_binary(Msg)], NewState};
+ [<<"error">> | Rest] ->
+ Msg = io_lib:format("couch native server error: ~p", [Rest]),
+ {reply, [<<"error">> | Rest], NewState};
+ [<<"fatal">> | Rest] ->
+ Msg = io_lib:format("couch native server error: ~p", [Rest]),
+ {stop, fatal, [<<"error">> | Rest], NewState};
+ Resp ->
+ {reply, Resp, NewState}
end.
-run(_, [<<"reset">>]) ->
- {#evstate{}, true};
-run(_, [<<"reset">>, QueryConfig]) ->
- {#evstate{query_config=QueryConfig}, true};
+handle_cast(_Msg, State) -> {noreply, State}.
+handle_info(_Msg, State) -> {noreply, State}.
+terminate(_Reason, _State) -> ok.
+code_change(_OldVersion, State, _Extra) -> {ok, State}.
+
+run(#evstate{list_pid=Pid}=State, [<<"list_row">>, Row]) when is_pid(Pid) ->
+ Pid ! {self(), list_row, Row},
+ receive
+ {Pid, chunks, Data} ->
+ {State, [<<"chunks">>, Data]};
+ {Pid, list_end, Data} ->
+ receive
+ {'EXIT', Pid, normal} -> ok
+ after State#evstate.timeout ->
+ throw({timeout, list_cleanup})
+ end,
+ process_flag(trap_exit, erlang:get(do_trap)),
+ {State#evstate{list_pid=nil}, [<<"end">>, Data]}
+ after State#evstate.timeout ->
+ throw({timeout, list_row})
+ end;
+run(#evstate{list_pid=Pid}=State, [<<"list_end">>]) when is_pid(Pid) ->
+ Pid ! {self(), list_end},
+ Resp =
+ receive
+ {Pid, list_end, Data} ->
+ receive
+ {'EXIT', Pid, normal} -> ok
+ after State#evstate.timeout ->
+ throw({timeout, list_cleanup})
+ end,
+ [<<"end">>, Data]
+ after State#evstate.timeout ->
+ throw({timeout, list_end})
+ end,
+ process_flag(trap_exit, erlang:get(do_trap)),
+ {State#evstate{list_pid=nil}, Resp};
+run(#evstate{list_pid=Pid}=State, _Command) when is_pid(Pid) ->
+ {State, [<<"error">>, list_error, list_error]};
+run(#evstate{ddocs=DDocs}, [<<"reset">>]) ->
+ {#evstate{ddocs=DDocs}, true};
+run(#evstate{ddocs=DDocs}, [<<"reset">>, QueryConfig]) ->
+ {#evstate{ddocs=DDocs, query_config=QueryConfig}, true};
run(#evstate{funs=Funs}=State, [<<"add_fun">> , BinFunc]) ->
FunInfo = makefun(State, BinFunc),
{State#evstate{funs=Funs ++ [FunInfo]}, true};
@@ -115,41 +154,55 @@ run(State, [<<"reduce">>, Funs, KVs]) ->
{State, catch reduce(State, Funs, Keys2, Vals2, false)};
run(State, [<<"rereduce">>, Funs, Vals]) ->
{State, catch reduce(State, Funs, null, Vals, true)};
-run(State, [<<"validate">>, BFun, NDoc, ODoc, Ctx]) ->
- {_Sig, Fun} = makefun(State, BFun),
- {State, catch Fun(NDoc, ODoc, Ctx)};
-run(State, [<<"filter">>, Docs, Req]) ->
- {_Sig, Fun} = hd(State#evstate.funs),
+run(#evstate{ddocs=DDocs}=State, [<<"ddoc">>, <<"new">>, DDocId, DDoc]) ->
+ DDocs2 = store_ddoc(DDocs, DDocId, DDoc),
+ {State#evstate{ddocs=DDocs2}, true};
+run(#evstate{ddocs=DDocs}=State, [<<"ddoc">>, DDocId | Rest]) ->
+ DDoc = load_ddoc(DDocs, DDocId),
+ ddoc(State, DDoc, Rest);
+run(_, Unknown) ->
+ ?LOG_ERROR("Native Process: Unknown command: ~p~n", [Unknown]),
+ throw({error, unknown_command}).
+
+ddoc(State, {DDoc}, [FunPath, Args]) ->
+ % load fun from the FunPath
+ BFun = lists:foldl(fun
+ (Key, {Props}) when is_list(Props) ->
+ proplists:get_value(Key, Props, nil);
+ (Key, Fun) when is_binary(Fun) ->
+ Fun;
+ (Key, nil) ->
+ throw({error, not_found});
+ (Key, Fun) ->
+ throw({error, malformed_ddoc})
+ end, {DDoc}, FunPath),
+ ddoc(State, makefun(State, BFun, {DDoc}), FunPath, Args).
+
+ddoc(State, {_, Fun}, [<<"validate_doc_update">>], Args) ->
+ {State, (catch apply(Fun, Args))};
+ddoc(State, {_, Fun}, [<<"filters">>|_], [Docs, Req]) ->
Resp = lists:map(fun(Doc) -> (catch Fun(Doc, Req)) =:= true end, Docs),
{State, [true, Resp]};
-run(State, [<<"show">>, BFun, Doc, Req]) ->
- {_Sig, Fun} = makefun(State, BFun),
- Resp = case (catch Fun(Doc, Req)) of
+ddoc(State, {_, Fun}, [<<"shows">>|_], Args) ->
+ Resp = case (catch apply(Fun, Args)) of
FunResp when is_list(FunResp) ->
FunResp;
- FunResp when tuple_size(FunResp) =:= 1 ->
- [<<"resp">>, FunResp];
+ {FunResp} ->
+ [<<"resp">>, {FunResp}];
FunResp ->
FunResp
end,
{State, Resp};
-run(State, [<<"update">>, BFun, Doc, Req]) ->
- {_Sig, Fun} = makefun(State, BFun),
- Resp = case (catch Fun(Doc, Req)) of
+ddoc(State, {_, Fun}, [<<"updates">>|_], Args) ->
+ Resp = case (catch apply(Fun, Args)) of
[JsonDoc, JsonResp] ->
[<<"up">>, JsonDoc, JsonResp]
end,
{State, Resp};
-run(State, [<<"list">>, Head, Req]) ->
- {Sig, Fun} = hd(State#evstate.funs),
- % This is kinda dirty
- case is_function(Fun, 2) of
- false -> throw({error, render_error});
- true -> ok
- end,
+ddoc(State, {Sig, Fun}, [<<"lists">>|_], Args) ->
Self = self(),
SpawnFun = fun() ->
- LastChunk = (catch Fun(Head, Req)),
+ LastChunk = (catch apply(Fun, Args)),
case start_list_resp(Self, Sig) of
started ->
receive
@@ -177,44 +230,20 @@ run(State, [<<"list">>, Head, Req]) ->
after State#evstate.timeout ->
throw({timeout, list_start})
end,
- {State#evstate{list_pid=Pid}, Resp};
-run(#evstate{list_pid=Pid}=State, [<<"list_row">>, Row]) when is_pid(Pid) ->
- Pid ! {self(), list_row, Row},
- receive
- {Pid, chunks, Data} ->
- {State, [<<"chunks">>, Data]};
- {Pid, list_end, Data} ->
- receive
- {'EXIT', Pid, normal} -> ok
- after State#evstate.timeout ->
- throw({timeout, list_cleanup})
- end,
- process_flag(trap_exit, erlang:get(do_trap)),
- {State#evstate{list_pid=nil}, [<<"end">>, Data]}
- after State#evstate.timeout ->
- throw({timeout, list_row})
- end;
-run(#evstate{list_pid=Pid}=State, [<<"list_end">>]) when is_pid(Pid) ->
- Pid ! {self(), list_end},
- Resp =
- receive
- {Pid, list_end, Data} ->
- receive
- {'EXIT', Pid, normal} -> ok
- after State#evstate.timeout ->
- throw({timeout, list_cleanup})
- end,
- [<<"end">>, Data]
- after State#evstate.timeout ->
- throw({timeout, list_end})
- end,
- process_flag(trap_exit, erlang:get(do_trap)),
- {State#evstate{list_pid=nil}, Resp};
-run(_, Unknown) ->
- ?LOG_ERROR("Native Process: Unknown command: ~p~n", [Unknown]),
- throw({error, query_server_error}).
+ {State#evstate{list_pid=Pid}, Resp}.
+
+store_ddoc(DDocs, DDocId, DDoc) ->
+ dict:store(DDocId, DDoc, DDocs).
+load_ddoc(DDocs, DDocId) ->
+ try dict:fetch(DDocId, DDocs) of
+ {DDoc} -> {DDoc}
+ catch
+ _:Else -> throw({error, ?l2b(io_lib:format("Native Query Server missing DDoc with Id: ~s",[DDocId]))})
+ end.
bindings(State, Sig) ->
+ bindings(State, Sig, nil).
+bindings(State, Sig, DDoc) ->
Self = self(),
Log = fun(Msg) ->
@@ -262,14 +291,19 @@ bindings(State, Sig) ->
FoldRows = fun(Fun, Acc) -> foldrows(GetRow, Fun, Acc) end,
- [
+ Bindings = [
{'Log', Log},
{'Emit', Emit},
{'Start', Start},
{'Send', Send},
{'GetRow', GetRow},
{'FoldRows', FoldRows}
- ].
+ ],
+ case DDoc of
+ {Props} ->
+ Bindings ++ [{'DDoc', DDoc}];
+ _Else -> Bindings
+ end.
% thanks to erlview, via:
% http://erlang.org/pipermail/erlang-questions/2003-November/010544.html
@@ -277,8 +311,11 @@ makefun(State, Source) ->
Sig = erlang:md5(Source),
BindFuns = bindings(State, Sig),
{Sig, makefun(State, Source, BindFuns)}.
-
-makefun(_State, Source, BindFuns) ->
+makefun(State, Source, {DDoc}) ->
+ Sig = erlang:md5(lists:flatten([Source, term_to_binary(DDoc)])),
+ BindFuns = bindings(State, Sig, {DDoc}),
+ {Sig, makefun(State, Source, BindFuns)};
+makefun(_State, Source, BindFuns) when is_list(BindFuns) ->
FunStr = binary_to_list(Source),
{ok, Tokens, _} = erl_scan:string(FunStr),
Form = case (catch erl_parse:parse_exprs(Tokens)) of