summaryrefslogtreecommitdiff
path: root/src/couchdb/couch_query_servers.erl
diff options
context:
space:
mode:
authorJohn Christopher Anderson <jchris@apache.org>2009-12-22 18:03:44 +0000
committerJohn Christopher Anderson <jchris@apache.org>2009-12-22 18:03:44 +0000
commitea3b1153e52ac1513da4d634eedefb05c261039c (patch)
tree858c5b3d81509bfe784b8d2d1252921cbf34aa54 /src/couchdb/couch_query_servers.erl
parent22c551bb103072826c0299265670d1483c753dde (diff)
move query server to a design-doc based protocol, closes COUCHDB-589
git-svn-id: https://svn.apache.org/repos/asf/couchdb/trunk@893249 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'src/couchdb/couch_query_servers.erl')
-rw-r--r--src/couchdb/couch_query_servers.erl257
1 files changed, 156 insertions, 101 deletions
diff --git a/src/couchdb/couch_query_servers.erl b/src/couchdb/couch_query_servers.erl
index 4ac56727..30f4c4c7 100644
--- a/src/couchdb/couch_query_servers.erl
+++ b/src/couchdb/couch_query_servers.erl
@@ -17,10 +17,11 @@
-export([init/1, terminate/2, handle_call/3, handle_cast/2, handle_info/2,code_change/3,stop/0]).
-export([start_doc_map/2, map_docs/2, stop_doc_map/1]).
--export([reduce/3, rereduce/3,validate_doc_update/5]).
--export([render_doc_show/6, render_doc_update/6, start_view_list/2,
- render_list_head/4, render_list_row/4, render_list_tail/1]).
+-export([reduce/3, rereduce/3,validate_doc_update/4]).
-export([filter_docs/5]).
+
+-export([with_ddoc_proc/2, proc_prompt/2, ddoc_prompt/3, ddoc_proc_prompt/3, json_doc/1]).
+
% -export([test/0]).
-include("couch_db.hrl").
@@ -28,6 +29,7 @@
-record(proc, {
pid,
lang,
+ ddoc_keys = [],
prompt_fun,
set_timeout_fun,
stop_fun
@@ -37,7 +39,7 @@ start_link() ->
gen_server:start_link({local, couch_query_servers}, couch_query_servers, [], []).
stop() ->
- exit(whereis(couch_query_servers), close).
+ exit(whereis(couch_query_servers), normal).
start_doc_map(Lang, Functions) ->
Proc = get_os_process(Lang),
@@ -91,21 +93,15 @@ group_reductions_results(List) ->
rereduce(_Lang, [], _ReducedValues) ->
{ok, []};
rereduce(Lang, RedSrcs, ReducedValues) ->
- Proc = get_os_process(Lang),
- Grouped = group_reductions_results(ReducedValues),
- Results = try lists:zipwith(
+ Grouped = group_reductions_results(ReducedValues),
+ Results = lists:zipwith(
fun
(<<"_", _/binary>> = FunSrc, Values) ->
{ok, [Result]} = builtin_reduce(rereduce, [FunSrc], [[[], V] || V <- Values], []),
Result;
(FunSrc, Values) ->
- [true, [Result]] =
- proc_prompt(Proc, [<<"rereduce">>, [FunSrc], Values]),
- Result
- end, RedSrcs, Grouped)
- after
- ok = ret_os_process(Proc)
- end,
+ os_rereduce(Lang, [FunSrc], Values)
+ end, RedSrcs, Grouped),
{ok, Results}.
reduce(_Lang, [], _KVs) ->
@@ -137,6 +133,17 @@ os_reduce(Lang, OsRedSrcs, KVs) ->
end,
{ok, OsResults}.
+os_rereduce(_Lang, [], _KVs) ->
+ {ok, []};
+os_rereduce(Lang, OsRedSrcs, KVs) ->
+ Proc = get_os_process(Lang),
+ try proc_prompt(Proc, [<<"rereduce">>, OsRedSrcs, KVs]) of
+ [true, [Reduction]] -> Reduction
+ after
+ ok = ret_os_process(Proc)
+ end.
+
+
builtin_reduce(_Re, [], _KVs, Acc) ->
{ok, lists:reverse(Acc)};
builtin_reduce(Re, [<<"_sum">>|BuiltinReds], KVs, Acc) ->
@@ -157,92 +164,49 @@ builtin_sum_rows(KVs) ->
throw({invalid_value, <<"builtin _sum function requires map values to be numbers">>})
end, 0, KVs).
-validate_doc_update(Lang, FunSrc, EditDoc, DiskDoc, Ctx) ->
- Proc = get_os_process(Lang),
- JsonEditDoc = couch_doc:to_json_obj(EditDoc, [revs]),
- JsonDiskDoc =
- if DiskDoc == nil ->
- null;
- true ->
- couch_doc:to_json_obj(DiskDoc, [revs])
- end,
- try proc_prompt(Proc,
- [<<"validate">>, FunSrc, JsonEditDoc, JsonDiskDoc, Ctx]) of
- 1 ->
- ok;
- {[{<<"forbidden">>, Message}]} ->
- throw({forbidden, Message});
- {[{<<"unauthorized">>, Message}]} ->
- throw({unauthorized, Message})
- after
- ok = ret_os_process(Proc)
- end.
-% todo use json_apply_field
-append_docid(DocId, JsonReqIn) ->
- [{<<"docId">>, DocId} | JsonReqIn].
-render_doc_show(Lang, ShowSrc, DocId, Doc, Req, Db) ->
- Proc = get_os_process(Lang),
- {JsonReqIn} = couch_httpd_external:json_req_obj(Req, Db),
-
- {JsonReq, JsonDoc} = case {DocId, Doc} of
- {nil, nil} -> {{JsonReqIn}, null};
- {DocId, nil} -> {{append_docid(DocId, JsonReqIn)}, null};
- _ -> {{append_docid(DocId, JsonReqIn)}, couch_doc:to_json_obj(Doc, [revs])}
- end,
- try proc_prompt(Proc, [<<"show">>, ShowSrc, JsonDoc, JsonReq])
- after
- ok = ret_os_process(Proc)
- end.
-
-render_doc_update(Lang, UpdateSrc, DocId, Doc, Req, Db) ->
- Proc = get_os_process(Lang),
- {JsonReqIn} = couch_httpd_external:json_req_obj(Req, Db),
-
- {JsonReq, JsonDoc} = case {DocId, Doc} of
- {nil, nil} -> {{JsonReqIn}, null};
- {DocId, nil} -> {{append_docid(DocId, JsonReqIn)}, null};
- _ -> {{append_docid(DocId, JsonReqIn)}, couch_doc:to_json_obj(Doc, [revs])}
- end,
- try proc_prompt(Proc, [<<"update">>, UpdateSrc, JsonDoc, JsonReq])
- after
- ok = ret_os_process(Proc)
+% use the function stored in ddoc.validate_doc_update to test an update.
+validate_doc_update(DDoc, EditDoc, DiskDoc, Ctx) ->
+ JsonEditDoc = couch_doc:to_json_obj(EditDoc, [revs]),
+ JsonDiskDoc = json_doc(DiskDoc),
+ case ddoc_prompt(DDoc, [<<"validate_doc_update">>], [JsonEditDoc, JsonDiskDoc, Ctx]) of
+ 1 ->
+ ok;
+ {[{<<"forbidden">>, Message}]} ->
+ throw({forbidden, Message});
+ {[{<<"unauthorized">>, Message}]} ->
+ throw({unauthorized, Message})
end.
-start_view_list(Lang, ListSrc) ->
- Proc = get_os_process(Lang),
- proc_prompt(Proc, [<<"add_fun">>, ListSrc]),
- {ok, Proc}.
-
-render_list_head(Proc, Req, Db, Head) ->
- JsonReq = couch_httpd_external:json_req_obj(Req, Db),
- proc_prompt(Proc, [<<"list">>, Head, JsonReq]).
-
-render_list_row(Proc, Db, {{Key, DocId}, Value}, IncludeDoc) ->
- JsonRow = couch_httpd_view:view_row_obj(Db, {{Key, DocId}, Value}, IncludeDoc),
- proc_prompt(Proc, [<<"list_row">>, JsonRow]);
-
-render_list_row(Proc, _, {Key, Value}, _IncludeDoc) ->
- JsonRow = {[{key, Key}, {value, Value}]},
- proc_prompt(Proc, [<<"list_row">>, JsonRow]).
-
-render_list_tail(Proc) ->
- JsonResp = proc_prompt(Proc, [<<"list_end">>]),
- ok = ret_os_process(Proc),
- JsonResp.
+json_doc(nil) -> null;
+json_doc(Doc) ->
+ couch_doc:to_json_obj(Doc, [revs]).
-filter_docs(Lang, Src, Docs, Req, Db) ->
+filter_docs(Req, Db, DDoc, FName, Docs) ->
JsonReq = couch_httpd_external:json_req_obj(Req, Db),
JsonDocs = [couch_doc:to_json_obj(Doc, [revs]) || Doc <- Docs],
JsonCtx = couch_util:json_user_ctx(Db),
- Proc = get_os_process(Lang),
- [true, Passes] = proc_prompt(Proc,
- [<<"filter">>, Src, JsonDocs, JsonReq, JsonCtx]),
- ret_os_process(Proc),
- {ok, Passes}.
+ [true, Passes] = ddoc_prompt(DDoc, [<<"filters">>, FName], [JsonDocs, JsonReq, JsonCtx]),
+ {ok, Passes}.
+
+ddoc_proc_prompt({Proc, DDocId}, FunPath, Args) ->
+ proc_prompt(Proc, [<<"ddoc">>, DDocId, FunPath, Args]).
+
+ddoc_prompt(DDoc, FunPath, Args) ->
+ with_ddoc_proc(DDoc, fun({Proc, DDocId}) ->
+ proc_prompt(Proc, [<<"ddoc">>, DDocId, FunPath, Args])
+ end).
+
+with_ddoc_proc(#doc{id=DDocId,revs={Start, [DiskRev|_]}}=DDoc, Fun) ->
+ Rev = couch_doc:rev_to_str({Start, DiskRev}),
+ DDocKey = {DDocId, Rev},
+ Proc = get_ddoc_process(DDoc, DDocKey),
+ try Fun({Proc, DDocId})
+ after
+ ok = ret_os_process(Proc)
+ end.
init([]) ->
-
% read config and register for configuration changes
% just stop if one of the config settings change. couch_server_sup
@@ -282,7 +246,39 @@ init([]) ->
terminate(_Reason, _Server) ->
ok.
-
+handle_call({get_proc, #doc{body={Props}}=DDoc, DDocKey}, _From, {Langs, PidProcs, LangProcs, InUse}=Server) ->
+ % Note to future self. Add max process limit.
+ Lang = proplists:get_value(<<"language">>, Props, <<"javascript">>),
+ case ets:lookup(LangProcs, Lang) of
+ [{Lang, [P|Rest]}] ->
+ % find a proc in the set that has the DDoc
+ case proc_with_ddoc(DDoc, DDocKey, [P|Rest]) of
+ {ok, Proc} ->
+ % looks like the proc isn't getting dropped from the list.
+ % we need to change this to take a fun for equality checking
+ % so we can do a comparison on portnum
+ rem_from_list(LangProcs, Lang, Proc),
+ add_to_list(InUse, Lang, Proc),
+ {reply, {ok, Proc, get_query_server_config()}, Server};
+ Error ->
+ {reply, Error, Server}
+ end;
+ _ ->
+ case (catch new_process(Langs, Lang)) of
+ {ok, Proc} ->
+ add_value(PidProcs, Proc#proc.pid, Proc),
+ case proc_with_ddoc(DDoc, DDocKey, [Proc]) of
+ {ok, Proc2} ->
+ rem_from_list(LangProcs, Lang, Proc),
+ add_to_list(InUse, Lang, Proc2),
+ {reply, {ok, Proc2, get_query_server_config()}, Server};
+ Error ->
+ {reply, Error, Server}
+ end;
+ Error ->
+ {reply, Error, Server}
+ end
+ end;
handle_call({get_proc, Lang}, _From, {Langs, PidProcs, LangProcs, InUse}=Server) ->
% Note to future self. Add max process limit.
case ets:lookup(LangProcs, Lang) of
@@ -290,12 +286,13 @@ handle_call({get_proc, Lang}, _From, {Langs, PidProcs, LangProcs, InUse}=Server)
add_value(PidProcs, Proc#proc.pid, Proc),
rem_from_list(LangProcs, Lang, Proc),
add_to_list(InUse, Lang, Proc),
- {reply, {recycled, Proc, get_query_server_config()}, Server};
+ {reply, {ok, Proc, get_query_server_config()}, Server};
_ ->
case (catch new_process(Langs, Lang)) of
{ok, Proc} ->
+ add_value(PidProcs, Proc#proc.pid, Proc),
add_to_list(InUse, Lang, Proc),
- {reply, {new, Proc}, Server};
+ {reply, {ok, Proc, get_query_server_config()}, Server};
Error ->
{reply, Error, Server}
end
@@ -350,6 +347,23 @@ new_process(Langs, Lang) ->
{unknown_query_language, Lang}
end.
+proc_with_ddoc(DDoc, DDocKey, LangProcs) ->
+ DDocProcs = lists:filter(fun(#proc{ddoc_keys=Keys}) ->
+ lists:any(fun(Key) ->
+ Key == DDocKey
+ end, Keys)
+ end, LangProcs),
+ case DDocProcs of
+ [DDocProc|_] ->
+ ?LOG_DEBUG("DDocProc found for DDocKey: ~p",[DDocKey]),
+ {ok, DDocProc};
+ [] ->
+ [TeachProc|_] = LangProcs,
+ ?LOG_DEBUG("Teach ddoc to new proc ~p with DDocKey: ~p",[TeachProc, DDocKey]),
+ {ok, SmartProc} = teach_ddoc(DDoc, DDocKey, TeachProc),
+ {ok, SmartProc}
+ end.
+
proc_prompt(Proc, Args) ->
{Mod, Func} = Proc#proc.prompt_fun,
apply(Mod, Func, [Proc#proc.pid, Args]).
@@ -362,14 +376,44 @@ proc_set_timeout(Proc, Timeout) ->
{Mod, Func} = Proc#proc.set_timeout_fun,
apply(Mod, Func, [Proc#proc.pid, Timeout]).
+teach_ddoc(DDoc, {DDocId, _Rev}=DDocKey, #proc{ddoc_keys=Keys}=Proc) ->
+ % send ddoc over the wire
+ % we only share the rev with the client we know to update code
+ % but it only keeps the latest copy, per each ddoc, around.
+ true = proc_prompt(Proc, [<<"ddoc">>, <<"new">>, DDocId, couch_doc:to_json_obj(DDoc, [])]),
+ % we should remove any other ddocs keys for this docid
+ % because the query server overwrites without the rev
+ Keys2 = [{D,R} || {D,R} <- Keys, D /= DDocId],
+ % add ddoc to the proc
+ {ok, Proc#proc{ddoc_keys=[DDocKey|Keys2]}}.
+
+get_ddoc_process(#doc{} = DDoc, DDocKey) ->
+ % remove this case statement
+ case gen_server:call(couch_query_servers, {get_proc, DDoc, DDocKey}) of
+ {ok, Proc, QueryConfig} ->
+ % process knows the ddoc
+ case (catch proc_prompt(Proc, [<<"reset">>, QueryConfig])) of
+ true ->
+ proc_set_timeout(Proc, list_to_integer(couch_config:get(
+ "couchdb", "os_process_timeout", "5000"))),
+ link(Proc#proc.pid),
+ Proc;
+ _ ->
+ catch proc_stop(Proc),
+ get_ddoc_process(DDoc, DDocKey)
+ end;
+ Error ->
+ throw(Error)
+ end.
+
+ret_ddoc_process(Proc) ->
+ true = gen_server:call(couch_query_servers, {ret_proc, Proc}),
+ catch unlink(Proc#proc.pid),
+ ok.
+
get_os_process(Lang) ->
case gen_server:call(couch_query_servers, {get_proc, Lang}) of
- {new, Proc} ->
- proc_set_timeout(Proc, list_to_integer(couch_config:get(
- "couchdb", "os_process_timeout", "5000"))),
- link(Proc#proc.pid),
- Proc;
- {recycled, Proc, QueryConfig} ->
+ {ok, Proc, QueryConfig} ->
case (catch proc_prompt(Proc, [<<"reset">>, QueryConfig])) of
true ->
proc_set_timeout(Proc, list_to_integer(couch_config:get(
@@ -403,9 +447,20 @@ add_to_list(Tid, Key, Value) ->
true = ets:insert(Tid, {Key, [Value]})
end.
+rem_from_list(Tid, Key, Value) when is_record(Value, proc)->
+ Pid = Value#proc.pid,
+ case ets:lookup(Tid, Key) of
+ [{Key, Vals}] ->
+ % make a new values list that doesn't include the Value arg
+ NewValues = [Val || #proc{pid=P}=Val <- Vals, P /= Pid],
+ ets:insert(Tid, {Key, NewValues});
+ [] -> ok
+ end;
rem_from_list(Tid, Key, Value) ->
case ets:lookup(Tid, Key) of
[{Key, Vals}] ->
- ets:insert(Tid, {Key, [Val || Val <- Vals, Val /= Value]});
+ % make a new values list that doesn't include the Value arg
+ NewValues = [Val || Val <- Vals, Val /= Value],
+ ets:insert(Tid, {Key, NewValues});
[] -> ok
end.