diff options
author | John Christopher Anderson <jchris@apache.org> | 2009-12-22 18:03:44 +0000 |
---|---|---|
committer | John Christopher Anderson <jchris@apache.org> | 2009-12-22 18:03:44 +0000 |
commit | ea3b1153e52ac1513da4d634eedefb05c261039c (patch) | |
tree | 858c5b3d81509bfe784b8d2d1252921cbf34aa54 /src/couchdb | |
parent | 22c551bb103072826c0299265670d1483c753dde (diff) |
move query server to a design-doc based protocol, closes COUCHDB-589
git-svn-id: https://svn.apache.org/repos/asf/couchdb/trunk@893249 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'src/couchdb')
-rw-r--r-- | src/couchdb/couch_doc.erl | 8 | ||||
-rw-r--r-- | src/couchdb/couch_httpd.erl | 10 | ||||
-rw-r--r-- | src/couchdb/couch_httpd_db.erl | 52 | ||||
-rw-r--r-- | src/couchdb/couch_httpd_external.erl | 7 | ||||
-rw-r--r-- | src/couchdb/couch_httpd_show.erl | 474 | ||||
-rw-r--r-- | src/couchdb/couch_httpd_view.erl | 87 | ||||
-rw-r--r-- | src/couchdb/couch_native_process.erl | 243 | ||||
-rw-r--r-- | src/couchdb/couch_os_process.erl | 25 | ||||
-rw-r--r-- | src/couchdb/couch_query_servers.erl | 257 |
9 files changed, 590 insertions, 573 deletions
diff --git a/src/couchdb/couch_doc.erl b/src/couchdb/couch_doc.erl index bdefb95c..ba5c7450 100644 --- a/src/couchdb/couch_doc.erl +++ b/src/couchdb/couch_doc.erl @@ -292,15 +292,13 @@ att_to_iolist(#att{data=DataFun, len=Len}) when is_function(DataFun)-> lists:reverse(fold_streamed_data(DataFun, Len, fun(Data, Acc) -> [Data | Acc] end, [])). -get_validate_doc_fun(#doc{body={Props}}) -> - Lang = proplists:get_value(<<"language">>, Props, <<"javascript">>), +get_validate_doc_fun(#doc{body={Props}}=DDoc) -> case proplists:get_value(<<"validate_doc_update">>, Props) of undefined -> nil; - FunSrc -> + _Else -> fun(EditDoc, DiskDoc, Ctx) -> - couch_query_servers:validate_doc_update( - Lang, FunSrc, EditDoc, DiskDoc, Ctx) + couch_query_servers:validate_doc_update(DDoc, EditDoc, DiskDoc, Ctx) end end. diff --git a/src/couchdb/couch_httpd.erl b/src/couchdb/couch_httpd.erl index a61b29fb..baa22d8f 100644 --- a/src/couchdb/couch_httpd.erl +++ b/src/couchdb/couch_httpd.erl @@ -51,7 +51,7 @@ start_link() -> DesignUrlHandlersList = lists:map( fun({UrlKey, SpecStr}) -> - {?l2b(UrlKey), make_arity_2_fun(SpecStr)} + {?l2b(UrlKey), make_arity_3_fun(SpecStr)} end, couch_config:get("httpd_design_handlers")), UrlHandlers = dict:from_list(UrlHandlersList), @@ -110,6 +110,14 @@ make_arity_2_fun(SpecStr) -> fun(Arg1, Arg2) -> Mod:Fun(Arg1, Arg2) end end. +make_arity_3_fun(SpecStr) -> + case couch_util:parse_term(SpecStr) of + {ok, {Mod, Fun, SpecArg}} -> + fun(Arg1, Arg2, Arg3) -> Mod:Fun(Arg1, Arg2, Arg3, SpecArg) end; + {ok, {Mod, Fun}} -> + fun(Arg1, Arg2, Arg3) -> Mod:Fun(Arg1, Arg2, Arg3) end + end. + % SpecStr is "{my_module, my_fun}, {my_module2, my_fun2}" make_arity_1_fun_list(SpecStr) -> [make_arity_1_fun(FunSpecStr) || FunSpecStr <- re:split(SpecStr, "(?<=})\\s*,\\s*(?={)", [{return, list}])]. diff --git a/src/couchdb/couch_httpd_db.erl b/src/couchdb/couch_httpd_db.erl index 8b955c88..bf24b712 100644 --- a/src/couchdb/couch_httpd_db.erl +++ b/src/couchdb/couch_httpd_db.erl @@ -16,7 +16,7 @@ -export([handle_request/1, handle_compact_req/2, handle_design_req/2, db_req/2, couch_doc_open/4,handle_changes_req/2, update_doc_result_to_json/1, update_doc_result_to_json/2, - handle_design_info_req/2, handle_view_cleanup_req/2]). + handle_design_info_req/3, handle_view_cleanup_req/2]). -import(couch_httpd, [send_json/2,send_json/3,send_json/4,send_method_not_allowed/2, @@ -232,26 +232,18 @@ make_filter_fun(Req, Db) -> end; [DName, FName] -> DesignId = <<"_design/", DName/binary>>, - case couch_db:open_doc(Db, DesignId) of - {ok, #doc{body={Props}}} -> - FilterSrc = try couch_util:get_nested_json_value({Props}, - [<<"filters">>, FName]) - catch - throw:{not_found, _} -> - throw({bad_request, "invalid filter function"}) - end, - Lang = proplists:get_value(<<"language">>, Props, <<"javascript">>), - fun(DocInfos) -> - Docs = [Doc || {ok, Doc} <- [ - {ok, Doc} = couch_db:open_doc(Db, DInfo, [deleted]) - || DInfo <- DocInfos]], - {ok, Passes} = couch_query_servers:filter_docs(Lang, FilterSrc, Docs, Req, Db), - [{[{rev, couch_doc:rev_to_str(Rev)}]} - || #doc_info{revs=[#rev_info{rev=Rev}|_]} <- DocInfos, - Pass <- Passes, Pass == true] - end; - _Error -> - throw({bad_request, "invalid design doc"}) + DDoc = couch_httpd_db:couch_doc_open(Db, DesignId, nil, []), + % validate that the ddoc has the filter fun + #doc{body={Props}} = DDoc, + couch_util:get_nested_json_value({Props}, [<<"filters">>, FName]), + fun(DocInfos) -> + Docs = [Doc || {ok, Doc} <- [ + {ok, Doc} = couch_db:open_doc(Db, DInfo, [deleted]) + || DInfo <- DocInfos]], + {ok, Passes} = couch_query_servers:filter_docs(Req, Db, DDoc, FName, Docs), + [{[{rev, couch_doc:rev_to_str(Rev)}]} + || #doc_info{revs=[#rev_info{rev=Rev}|_]} <- DocInfos, + Pass <- Passes, Pass == true] end; _Else -> throw({bad_request, @@ -279,11 +271,14 @@ handle_view_cleanup_req(Req, _Db) -> handle_design_req(#httpd{ - path_parts=[_DbName,_Design,_DesName, <<"_",_/binary>> = Action | _Rest], + path_parts=[_DbName, _Design, DesignName, <<"_",_/binary>> = Action | _Rest], design_url_handlers = DesignUrlHandlers }=Req, Db) -> + % load ddoc + DesignId = <<"_design/", DesignName/binary>>, + DDoc = couch_httpd_db:couch_doc_open(Db, DesignId, nil, []), Handler = couch_util:dict_find(Action, DesignUrlHandlers, fun db_req/2), - Handler(Req, Db); + Handler(Req, Db, DDoc); handle_design_req(Req, Db) -> db_req(Req, Db). @@ -291,7 +286,7 @@ handle_design_req(Req, Db) -> handle_design_info_req(#httpd{ method='GET', path_parts=[_DbName, _Design, DesignName, _] - }=Req, Db) -> + }=Req, Db, _DDoc) -> DesignId = <<"_design/", DesignName/binary>>, {ok, GroupInfoList} = couch_view:get_group_info(Db, DesignId), send_json(Req, 200, {[ @@ -299,7 +294,7 @@ handle_design_info_req(#httpd{ {view_index, {GroupInfoList}} ]}); -handle_design_info_req(Req, _Db) -> +handle_design_info_req(Req, _Db, _DDoc) -> send_method_not_allowed(Req, "GET"). create_db_req(#httpd{user_ctx=UserCtx}=Req, DbName) -> @@ -725,7 +720,12 @@ db_doc_req(#httpd{method='GET'}=Req, Db, DocId) -> end; _ -> {DesignName, ShowName} = Format, - couch_httpd_show:handle_doc_show(Req, DesignName, ShowName, DocId, Db) + % load ddoc + DesignId = <<"_design/", DesignName/binary>>, + DDoc = couch_httpd_db:couch_doc_open(Db, DesignId, nil, []), + % open doc + Doc = couch_doc_open(Db, DocId, Rev, Options), + couch_httpd_show:handle_doc_show(Req, Db, DDoc, ShowName, Doc) end; db_doc_req(#httpd{method='POST'}=Req, Db, DocId) -> diff --git a/src/couchdb/couch_httpd_external.erl b/src/couchdb/couch_httpd_external.erl index 86b2bfc6..13aff847 100644 --- a/src/couchdb/couch_httpd_external.erl +++ b/src/couchdb/couch_httpd_external.erl @@ -13,7 +13,7 @@ -module(couch_httpd_external). -export([handle_external_req/2, handle_external_req/3]). --export([send_external_response/2, json_req_obj/2]). +-export([send_external_response/2, json_req_obj/2, json_req_obj/3]). -export([default_or_content_type/2, parse_external_response/1]). -import(couch_httpd,[send_error/4]). @@ -53,12 +53,12 @@ process_external_req(HttpReq, Db, Name) -> _ -> send_external_response(HttpReq, Response) end. - +json_req_obj(Req, Db) -> json_req_obj(Req, Db, null). json_req_obj(#httpd{mochi_req=Req, method=Verb, path_parts=Path, req_body=ReqBody - }, Db) -> + }, Db, DocId) -> Body = case ReqBody of undefined -> Req:recv_body(); Else -> Else @@ -74,6 +74,7 @@ json_req_obj(#httpd{mochi_req=Req, {ok, Info} = couch_db:get_db_info(Db), % add headers... {[{<<"info">>, {Info}}, + {<<"id">>, DocId}, {<<"verb">>, Verb}, {<<"path">>, Path}, {<<"query">>, to_json_terms(Req:parse_qs())}, diff --git a/src/couchdb/couch_httpd_show.erl b/src/couchdb/couch_httpd_show.erl index 5c95070a..467c0a42 100644 --- a/src/couchdb/couch_httpd_show.erl +++ b/src/couchdb/couch_httpd_show.erl @@ -12,8 +12,8 @@ -module(couch_httpd_show). --export([handle_doc_show_req/2, handle_doc_update_req/2, handle_view_list_req/2, - handle_doc_show/5, handle_view_list/7]). +-export([handle_doc_show_req/3, handle_doc_update_req/3, handle_view_list_req/3, + handle_doc_show/5, handle_view_list/6, get_fun_key/3]). -include("couch_db.hrl"). @@ -22,217 +22,245 @@ start_json_response/2,send_chunk/2,last_chunk/1,send_chunked_error/2, start_chunked_response/3, send_error/4]). +% /db/_design/foo/show/bar/docid +% show converts a json doc to a response of any content-type. +% it looks up the doc an then passes it to the query server. +% then it sends the response from the query server to the http client. handle_doc_show_req(#httpd{ - method='GET', - path_parts=[_DbName, _Design, DesignName, _Show, ShowName, DocId] - }=Req, Db) -> - handle_doc_show(Req, DesignName, ShowName, DocId, Db); + path_parts=[_, _, _, _, ShowName, DocId] + }=Req, Db, DDoc) -> + % open the doc + Doc = couch_httpd_db:couch_doc_open(Db, DocId, nil, [conflicts]), + % we don't handle revs here b/c they are an internal api + % returns 404 if there is no doc with DocId + handle_doc_show(Req, Db, DDoc, ShowName, Doc); handle_doc_show_req(#httpd{ - path_parts=[_DbName, _Design, DesignName, _Show, ShowName] - }=Req, Db) -> - handle_doc_show(Req, DesignName, ShowName, nil, Db); + path_parts=[_, _, _, _, ShowName] + }=Req, Db, DDoc) -> + % with no docid the doc is nil + handle_doc_show(Req, Db, DDoc, ShowName, nil); -handle_doc_show_req(#httpd{method='GET'}=Req, _Db) -> - send_error(Req, 404, <<"show_error">>, <<"Invalid path.">>); +handle_doc_show_req(Req, _Db, _DDoc) -> + send_error(Req, 404, <<"show_error">>, <<"Invalid path.">>). + +handle_doc_show(Req, Db, DDoc, ShowName, Doc) -> + % get responder for ddoc/showname + CurrentEtag = show_etag(Req, Doc, DDoc, []), + couch_httpd:etag_respond(Req, CurrentEtag, fun() -> + JsonReq = couch_httpd_external:json_req_obj(Req, Db), + JsonDoc = couch_query_servers:json_doc(Doc), + [<<"resp">>, ExternalResp] = + couch_query_servers:ddoc_prompt(DDoc, [<<"shows">>, ShowName], [JsonDoc, JsonReq]), + JsonResp = apply_etag(ExternalResp, CurrentEtag), + couch_httpd_external:send_external_response(Req, JsonResp) + end). -handle_doc_show_req(Req, _Db) -> - send_method_not_allowed(Req, "GET,POST,HEAD"). -handle_doc_update_req(#httpd{method = 'GET'}=Req, _Db) -> - send_method_not_allowed(Req, "POST,PUT,DELETE,ETC"); +show_etag(#httpd{user_ctx=UserCtx}=Req, Doc, DDoc, More) -> + Accept = couch_httpd:header_value(Req, "Accept"), + DocPart = case Doc of + nil -> nil; + Doc -> couch_httpd:doc_etag(Doc) + end, + couch_httpd:make_etag({couch_httpd:doc_etag(DDoc), DocPart, Accept, UserCtx#user_ctx.roles, More}). + +get_fun_key(DDoc, Type, Name) -> + #doc{body={Props}} = DDoc, + Lang = proplists:get_value(<<"language">>, Props, <<"javascript">>), + Src = couch_util:get_nested_json_value({Props}, [Type, Name]), + {Lang, Src}. + +% /db/_design/foo/update/bar/docid +% updates a doc based on a request +% handle_doc_update_req(#httpd{method = 'GET'}=Req, _Db, _DDoc) -> +% % anything but GET +% send_method_not_allowed(Req, "POST,PUT,DELETE,ETC"); handle_doc_update_req(#httpd{ - path_parts=[_DbName, _Design, DesignName, _Update, UpdateName, DocId] - }=Req, Db) -> - DesignId = <<"_design/", DesignName/binary>>, - #doc{body={Props}} = couch_httpd_db:couch_doc_open(Db, DesignId, nil, []), - Lang = proplists:get_value(<<"language">>, Props, <<"javascript">>), - UpdateSrc = couch_util:get_nested_json_value({Props}, [<<"updates">>, UpdateName]), + path_parts=[_, _, _, _, UpdateName, DocId] + }=Req, Db, DDoc) -> Doc = try couch_httpd_db:couch_doc_open(Db, DocId, nil, [conflicts]) - catch - _ -> nil - end, - send_doc_update_response(Lang, UpdateSrc, DocId, Doc, Req, Db); + catch + _ -> nil + end, + send_doc_update_response(Req, Db, DDoc, UpdateName, Doc, DocId); handle_doc_update_req(#httpd{ - path_parts=[_DbName, _Design, DesignName, _Update, UpdateName] - }=Req, Db) -> - DesignId = <<"_design/", DesignName/binary>>, - #doc{body={Props}} = couch_httpd_db:couch_doc_open(Db, DesignId, nil, []), - Lang = proplists:get_value(<<"language">>, Props, <<"javascript">>), - UpdateSrc = couch_util:get_nested_json_value({Props}, [<<"updates">>, UpdateName]), - send_doc_update_response(Lang, UpdateSrc, nil, nil, Req, Db); + path_parts=[_, _, _, _, UpdateName] + }=Req, Db, DDoc) -> + send_doc_update_response(Req, Db, DDoc, UpdateName, nil, null); -handle_doc_update_req(Req, _Db) -> +handle_doc_update_req(Req, _Db, _DDoc) -> send_error(Req, 404, <<"update_error">>, <<"Invalid path.">>). - - -handle_doc_show(Req, DesignName, ShowName, DocId, Db) -> - DesignId = <<"_design/", DesignName/binary>>, - #doc{body={Props}} = couch_httpd_db:couch_doc_open(Db, DesignId, nil, []), - Lang = proplists:get_value(<<"language">>, Props, <<"javascript">>), - ShowSrc = couch_util:get_nested_json_value({Props}, [<<"shows">>, ShowName]), - Doc = case DocId of - nil -> nil; - _ -> - try couch_httpd_db:couch_doc_open(Db, DocId, nil, [conflicts]) - catch - _ -> nil - end +send_doc_update_response(Req, Db, DDoc, UpdateName, Doc, DocId) -> + JsonReq = couch_httpd_external:json_req_obj(Req, Db, DocId), + JsonDoc = couch_query_servers:json_doc(Doc), + case couch_query_servers:ddoc_prompt(DDoc, [<<"updates">>, UpdateName], [JsonDoc, JsonReq]) of + [<<"up">>, {NewJsonDoc}, JsonResp] -> + Options = case couch_httpd:header_value(Req, "X-Couch-Full-Commit", "false") of + "true" -> + [full_commit]; + _ -> + [] + end, + NewDoc = couch_doc:from_json_obj({NewJsonDoc}), + Code = 201, + {ok, _NewRev} = couch_db:update_doc(Db, NewDoc, Options); + [<<"up">>, _Other, JsonResp] -> + Code = 200, + ok end, - send_doc_show_response(Lang, ShowSrc, DocId, Doc, Req, Db). + JsonResp2 = json_apply_field({<<"code">>, Code}, JsonResp), + % todo set location field + couch_httpd_external:send_external_response(Req, JsonResp2). + % view-list request with view and list from same design doc. handle_view_list_req(#httpd{method='GET', - path_parts=[_DbName, _Design, DesignName, _List, ListName, ViewName]}=Req, Db) -> - handle_view_list(Req, DesignName, ListName, DesignName, ViewName, Db, nil); + path_parts=[_, _, DesignName, _, ListName, ViewName]}=Req, Db, DDoc) -> + handle_view_list(Req, Db, DDoc, ListName, {DesignName, ViewName}, nil); % view-list request with view and list from different design docs. handle_view_list_req(#httpd{method='GET', - path_parts=[_DbName, _Design, DesignName, _List, ListName, ViewDesignName, ViewName]}=Req, Db) -> - handle_view_list(Req, DesignName, ListName, ViewDesignName, ViewName, Db, nil); + path_parts=[_, _, _, _, ListName, ViewDesignName, ViewName]}=Req, Db, DDoc) -> + handle_view_list(Req, Db, DDoc, ListName, {ViewDesignName, ViewName}, nil); -handle_view_list_req(#httpd{method='GET'}=Req, _Db) -> +handle_view_list_req(#httpd{method='GET'}=Req, _Db, _DDoc) -> send_error(Req, 404, <<"list_error">>, <<"Invalid path.">>); handle_view_list_req(#httpd{method='POST', - path_parts=[_DbName, _Design, DesignName, _List, ListName, ViewName]}=Req, Db) -> + path_parts=[_, _, DesignName, _, ListName, ViewName]}=Req, Db, DDoc) -> + % {Props2} = couch_httpd:json_body(Req), ReqBody = couch_httpd:body(Req), {Props2} = ?JSON_DECODE(ReqBody), Keys = proplists:get_value(<<"keys">>, Props2, nil), - handle_view_list(Req#httpd{req_body=ReqBody}, DesignName, ListName, DesignName, ViewName, Db, Keys); - -handle_view_list_req(Req, _Db) -> - send_method_not_allowed(Req, "GET,POST,HEAD"). + handle_view_list(Req#httpd{req_body=ReqBody}, Db, DDoc, ListName, {DesignName, ViewName}, Keys); -handle_view_list(Req, ListDesignName, ListName, ViewDesignName, ViewName, Db, Keys) -> - ListDesignId = <<"_design/", ListDesignName/binary>>, - #doc{body={ListProps}} = couch_httpd_db:couch_doc_open(Db, ListDesignId, nil, []), - if - ViewDesignName == ListDesignName -> - ViewDesignId = ListDesignId; - true -> - ViewDesignId = <<"_design/", ViewDesignName/binary>> - end, +handle_view_list_req(#httpd{method='POST', + path_parts=[_, _, _, _, ListName, ViewDesignName, ViewName]}=Req, Db, DDoc) -> + % {Props2} = couch_httpd:json_body(Req), + ReqBody = couch_httpd:body(Req), + {Props2} = ?JSON_DECODE(ReqBody), + Keys = proplists:get_value(<<"keys">>, Props2, nil), + handle_view_list(Req#httpd{req_body=ReqBody}, Db, DDoc, ListName, {ViewDesignName, ViewName}, Keys); - ListLang = proplists:get_value(<<"language">>, ListProps, <<"javascript">>), - ListSrc = couch_util:get_nested_json_value({ListProps}, [<<"lists">>, ListName]), - send_view_list_response(ListLang, ListSrc, ViewName, ViewDesignId, Req, Db, Keys). - - -send_view_list_response(Lang, ListSrc, ViewName, DesignId, Req, Db, Keys) -> - Stale = couch_httpd_view:get_stale_type(Req), - Reduce = couch_httpd_view:get_reduce_type(Req), - case couch_view:get_map_view(Db, DesignId, ViewName, Stale) of - {ok, View, Group} -> - QueryArgs = couch_httpd_view:parse_view_params(Req, Keys, map), - output_map_list(Req, Lang, ListSrc, View, Group, Db, QueryArgs, Keys); - {not_found, _Reason} -> - case couch_view:get_reduce_view(Db, DesignId, ViewName, Stale) of - {ok, ReduceView, Group} -> - case Reduce of - false -> - QueryArgs = couch_httpd_view:parse_view_params( - Req, Keys, map_red - ), - MapView = couch_view:extract_map_view(ReduceView), - output_map_list(Req, Lang, ListSrc, MapView, Group, Db, QueryArgs, Keys); - _ -> - QueryArgs = couch_httpd_view:parse_view_params( - Req, Keys, reduce - ), - output_reduce_list(Req, Lang, ListSrc, ReduceView, Group, Db, QueryArgs, Keys) - end; - {not_found, Reason} -> - throw({not_found, Reason}) - end - end. +handle_view_list_req(#httpd{method='POST'}=Req, _Db, _DDoc) -> + send_error(Req, 404, <<"list_error">>, <<"Invalid path.">>); +handle_view_list_req(Req, _Db, _DDoc) -> + send_method_not_allowed(Req, "GET,POST,HEAD"). -output_map_list(#httpd{mochi_req=MReq, user_ctx=UserCtx}=Req, Lang, ListSrc, View, Group, Db, QueryArgs, nil) -> +handle_view_list(Req, Db, DDoc, LName, {ViewDesignName, ViewName}, Keys) -> + ViewDesignId = <<"_design/", ViewDesignName/binary>>, + {ViewType, View, Group, QueryArgs} = couch_httpd_view:load_view(Req, Db, {ViewDesignId, ViewName}, Keys), + Etag = list_etag(Req, Db, Group, {couch_httpd:doc_etag(DDoc), Keys}), + couch_httpd:etag_respond(Req, Etag, fun() -> + output_list(ViewType, Req, Db, DDoc, LName, View, QueryArgs, Etag, Keys) + end). + +list_etag(#httpd{user_ctx=UserCtx}=Req, Db, Group, More) -> + Accept = couch_httpd:header_value(Req, "Accept"), + couch_httpd_view:view_group_etag(Group, Db, {More, Accept, UserCtx#user_ctx.roles}). + +output_list(map, Req, Db, DDoc, LName, View, QueryArgs, Etag, Keys) -> + output_map_list(Req, Db, DDoc, LName, View, QueryArgs, Etag, Keys); +output_list(reduce, Req, Db, DDoc, LName, View, QueryArgs, Etag, Keys) -> + output_reduce_list(Req, Db, DDoc, LName, View, QueryArgs, Etag, Keys). + +% next step: +% use with_ddoc_proc/2 to make this simpler +output_map_list(Req, Db, DDoc, LName, View, QueryArgs, Etag, Keys) -> #view_query_args{ limit = Limit, skip = SkipCount } = QueryArgs, + + FoldAccInit = {Limit, SkipCount, undefined, []}, {ok, RowCount} = couch_view:get_row_count(View), - Headers = MReq:get(headers), - Hlist = mochiweb_headers:to_list(Headers), - Accept = proplists:get_value('Accept', Hlist), - CurrentEtag = couch_httpd_view:view_group_etag(Group, Db, {Lang, ListSrc, Accept, UserCtx}), - couch_httpd:etag_respond(Req, CurrentEtag, fun() -> - % get the os process here - % pass it into the view fold with closures - {ok, QueryServer} = couch_query_servers:start_view_list(Lang, ListSrc), + - StartListRespFun = make_map_start_resp_fun(QueryServer, Db), - SendListRowFun = make_map_send_row_fun(QueryServer), + couch_query_servers:with_ddoc_proc(DDoc, fun(QServer) -> + + ListFoldHelpers = #view_fold_helper_funs{ + reduce_count = fun couch_view:reduce_to_count/1, + start_response = StartListRespFun = make_map_start_resp_fun(QServer, Db, LName), + send_row = make_map_send_row_fun(QServer) + }, + + {ok, _, FoldResult} = case Keys of + nil -> + FoldlFun = couch_httpd_view:make_view_fold_fun(Req, QueryArgs, Etag, Db, RowCount, ListFoldHelpers), + couch_view:fold(View, FoldlFun, FoldAccInit, + couch_httpd_view:make_key_options(QueryArgs)); + Keys -> + lists:foldl( + fun(Key, {ok, _, FoldAcc}) -> + QueryArgs2 = QueryArgs#view_query_args{ + start_key = Key, + end_key = Key + }, + FoldlFun = couch_httpd_view:make_view_fold_fun(Req, QueryArgs2, Etag, Db, RowCount, ListFoldHelpers), + couch_view:fold(View, FoldlFun, FoldAcc, + couch_httpd_view:make_key_options(QueryArgs2)) + end, {ok, nil, FoldAccInit}, Keys) + end, + finish_list(Req, QServer, Etag, FoldResult, StartListRespFun, RowCount) + end). - FoldlFun = couch_httpd_view:make_view_fold_fun(Req, QueryArgs, CurrentEtag, Db, RowCount, - #view_fold_helper_funs{ - reduce_count = fun couch_view:reduce_to_count/1, - start_response = StartListRespFun, - send_row = SendListRowFun - }), - FoldAccInit = {Limit, SkipCount, undefined, []}, - {ok, _, FoldResult} = couch_view:fold(View, FoldlFun, FoldAccInit, - couch_httpd_view:make_key_options(QueryArgs)), - finish_list(Req, QueryServer, CurrentEtag, FoldResult, StartListRespFun, RowCount) - end); -output_map_list(#httpd{mochi_req=MReq, user_ctx=UserCtx}=Req, Lang, ListSrc, View, Group, Db, QueryArgs, Keys) -> +output_reduce_list(Req, Db, DDoc, LName, View, QueryArgs, Etag, Keys) -> #view_query_args{ limit = Limit, - skip = SkipCount + skip = SkipCount, + group_level = GroupLevel } = QueryArgs, - {ok, RowCount} = couch_view:get_row_count(View), - Headers = MReq:get(headers), - Hlist = mochiweb_headers:to_list(Headers), - Accept = proplists:get_value('Accept', Hlist), - CurrentEtag = couch_httpd_view:view_group_etag(Group, Db, {Lang, ListSrc, Accept, UserCtx, Keys}), - couch_httpd:etag_respond(Req, CurrentEtag, fun() -> - % get the os process here - % pass it into the view fold with closures - {ok, QueryServer} = couch_query_servers:start_view_list(Lang, ListSrc), - - StartListRespFun = make_map_start_resp_fun(QueryServer, Db), - SendListRowFun = make_map_send_row_fun(QueryServer), + couch_query_servers:with_ddoc_proc(DDoc, fun(QServer) -> + StartListRespFun = make_reduce_start_resp_fun(QServer, Db, LName), + SendListRowFun = make_reduce_send_row_fun(QServer, Db), + {ok, GroupRowsFun, RespFun} = couch_httpd_view:make_reduce_fold_funs(Req, + GroupLevel, QueryArgs, Etag, + #reduce_fold_helper_funs{ + start_response = StartListRespFun, + send_row = SendListRowFun + }), FoldAccInit = {Limit, SkipCount, undefined, []}, - {ok, _, FoldResult} = lists:foldl( - fun(Key, {ok, _, FoldAcc}) -> - QueryArgs2 = QueryArgs#view_query_args{ - start_key = Key, - end_key = Key - }, - FoldlFun = couch_httpd_view:make_view_fold_fun(Req, QueryArgs2, CurrentEtag, Db, RowCount, - #view_fold_helper_funs{ - reduce_count = fun couch_view:reduce_to_count/1, - start_response = StartListRespFun, - send_row = SendListRowFun - }), - couch_view:fold(View, FoldlFun, FoldAcc, - couch_httpd_view:make_key_options(QueryArgs2)) - end, {ok, nil, FoldAccInit}, Keys), - finish_list(Req, QueryServer, CurrentEtag, FoldResult, StartListRespFun, RowCount) + {ok, FoldResult} = case Keys of + nil -> + couch_view:fold_reduce(View, RespFun, FoldAccInit, [{key_group_fun, GroupRowsFun} | + couch_httpd_view:make_key_options(QueryArgs)]); + Keys -> + lists:foldl( + fun(Key, {ok, FoldAcc}) -> + couch_view:fold_reduce(View, RespFun, FoldAcc, + [{key_group_fun, GroupRowsFun} | + couch_httpd_view:make_key_options( + QueryArgs#view_query_args{start_key=Key, end_key=Key})] + ) + end, {ok, FoldAccInit}, Keys) + end, + finish_list(Req, QServer, Etag, FoldResult, StartListRespFun, null) end). -make_map_start_resp_fun(QueryServer, Db) -> + +make_map_start_resp_fun(QueryServer, Db, LName) -> fun(Req, Etag, TotalRows, Offset, _Acc) -> Head = {[{<<"total_rows">>, TotalRows}, {<<"offset">>, Offset}]}, - start_list_resp(QueryServer, Req, Db, Head, Etag) + start_list_resp(QueryServer, LName, Req, Db, Head, Etag) end. -make_reduce_start_resp_fun(QueryServer, _Req, Db, _CurrentEtag) -> +make_reduce_start_resp_fun(QueryServer, Db, LName) -> fun(Req2, Etag, _Acc) -> - start_list_resp(QueryServer, Req2, Db, {[]}, Etag) + start_list_resp(QueryServer, LName, Req2, Db, {[]}, Etag) end. -start_list_resp(QueryServer, Req, Db, Head, Etag) -> - [<<"start">>,Chunks,JsonResp] = couch_query_servers:render_list_head(QueryServer, - Req, Db, Head), +start_list_resp(QServer, LName, Req, Db, Head, Etag) -> + JsonReq = couch_httpd_external:json_req_obj(Req, Db), + [<<"start">>,Chunks,JsonResp] = couch_query_servers:ddoc_proc_prompt(QServer, + [<<"lists">>, LName], [Head, JsonReq]), JsonResp2 = apply_etag(JsonResp, Etag), #extern_resp_args{ code = Code, @@ -255,7 +283,7 @@ make_reduce_send_row_fun(QueryServer, Db) -> send_list_row(Resp, QueryServer, Db, Row, RowFront, IncludeDoc) -> try - [Go,Chunks] = couch_query_servers:render_list_row(QueryServer, Db, Row, IncludeDoc), + [Go,Chunks] = prompt_list_row(QueryServer, Db, Row, IncludeDoc), Chunk = RowFront ++ ?b2l(?l2b(Chunks)), send_non_empty_chunk(Resp, Chunk), case Go of @@ -270,78 +298,22 @@ send_list_row(Resp, QueryServer, Db, Row, RowFront, IncludeDoc) -> throw({already_sent, Resp, Error}) end. + +prompt_list_row({Proc, _DDocId}, Db, {{Key, DocId}, Value}, IncludeDoc) -> + JsonRow = couch_httpd_view:view_row_obj(Db, {{Key, DocId}, Value}, IncludeDoc), + couch_query_servers:proc_prompt(Proc, [<<"list_row">>, JsonRow]); + +prompt_list_row({Proc, _DDocId}, _, {Key, Value}, _IncludeDoc) -> + JsonRow = {[{key, Key}, {value, Value}]}, + couch_query_servers:proc_prompt(Proc, [<<"list_row">>, JsonRow]). + send_non_empty_chunk(Resp, Chunk) -> case Chunk of [] -> ok; _ -> send_chunk(Resp, Chunk) end. -output_reduce_list(#httpd{mochi_req=MReq, user_ctx=UserCtx}=Req, Lang, ListSrc, View, Group, Db, QueryArgs, nil) -> - #view_query_args{ - limit = Limit, - skip = SkipCount, - group_level = GroupLevel - } = QueryArgs, - Headers = MReq:get(headers), - Hlist = mochiweb_headers:to_list(Headers), - Accept = proplists:get_value('Accept', Hlist), - CurrentEtag = couch_httpd_view:view_group_etag(Group, Db, {Lang, ListSrc, Accept, UserCtx}), - couch_httpd:etag_respond(Req, CurrentEtag, fun() -> - % get the os process here - % pass it into the view fold with closures - {ok, QueryServer} = couch_query_servers:start_view_list(Lang, ListSrc), - StartListRespFun = make_reduce_start_resp_fun(QueryServer, Req, Db, CurrentEtag), - SendListRowFun = make_reduce_send_row_fun(QueryServer, Db), - - {ok, GroupRowsFun, RespFun} = couch_httpd_view:make_reduce_fold_funs(Req, - GroupLevel, QueryArgs, CurrentEtag, - #reduce_fold_helper_funs{ - start_response = StartListRespFun, - send_row = SendListRowFun - }), - FoldAccInit = {Limit, SkipCount, undefined, []}, - {ok, FoldResult} = couch_view:fold_reduce(View, RespFun, FoldAccInit, - [{key_group_fun, GroupRowsFun} | - couch_httpd_view:make_key_options(QueryArgs)]), - finish_list(Req, QueryServer, CurrentEtag, FoldResult, StartListRespFun, null) - end); - -output_reduce_list(#httpd{mochi_req=MReq, user_ctx=UserCtx}=Req, Lang, ListSrc, View, Group, Db, QueryArgs, Keys) -> - #view_query_args{ - limit = Limit, - skip = SkipCount, - group_level = GroupLevel - } = QueryArgs, - Headers = MReq:get(headers), - Hlist = mochiweb_headers:to_list(Headers), - Accept = proplists:get_value('Accept', Hlist), - CurrentEtag = couch_httpd_view:view_group_etag(Group, Db, {Lang, ListSrc, Accept, UserCtx, Keys}), - couch_httpd:etag_respond(Req, CurrentEtag, fun() -> - % get the os process here - % pass it into the view fold with closures - {ok, QueryServer} = couch_query_servers:start_view_list(Lang, ListSrc), - StartListRespFun = make_reduce_start_resp_fun(QueryServer, Req, Db, CurrentEtag), - SendListRowFun = make_reduce_send_row_fun(QueryServer, Db), - - {ok, GroupRowsFun, RespFun} = couch_httpd_view:make_reduce_fold_funs(Req, - GroupLevel, QueryArgs, CurrentEtag, - #reduce_fold_helper_funs{ - start_response = StartListRespFun, - send_row = SendListRowFun - }), - FoldAccInit = {Limit, SkipCount, undefined, []}, - {ok, FoldResult} = lists:foldl( - fun(Key, {ok, FoldAcc}) -> - couch_view:fold_reduce(View, RespFun, FoldAcc, - [{key_group_fun, GroupRowsFun} | - couch_httpd_view:make_key_options( - QueryArgs#view_query_args{start_key=Key, end_key=Key})] - ) - end, {ok, FoldAccInit}, Keys), - finish_list(Req, QueryServer, CurrentEtag, FoldResult, StartListRespFun, null) - end). - -finish_list(Req, QueryServer, Etag, FoldResult, StartFun, TotalRows) -> +finish_list(Req, {Proc, _DDocId}, Etag, FoldResult, StartFun, TotalRows) -> FoldResult2 = case FoldResult of {Limit, SkipCount, Response, RowAcc} -> {Limit, SkipCount, Response, RowAcc, nil}; @@ -352,16 +324,15 @@ finish_list(Req, QueryServer, Etag, FoldResult, StartFun, TotalRows) -> {_, _, undefined, _, _} -> {ok, Resp, BeginBody} = render_head_for_empty_list(StartFun, Req, Etag, TotalRows), - [<<"end">>, Chunks] = couch_query_servers:render_list_tail(QueryServer), + [<<"end">>, Chunks] = couch_query_servers:proc_prompt(Proc, [<<"list_end">>]), Chunk = BeginBody ++ ?b2l(?l2b(Chunks)), send_non_empty_chunk(Resp, Chunk); {_, _, Resp, stop, _} -> ok; {_, _, Resp, _, _} -> - [<<"end">>, Chunks] = couch_query_servers:render_list_tail(QueryServer), + [<<"end">>, Chunks] = couch_query_servers:proc_prompt(Proc, [<<"list_end">>]), send_non_empty_chunk(Resp, ?b2l(?l2b(Chunks))) end, - couch_query_servers:stop_doc_map(QueryServer), last_chunk(Resp). @@ -370,53 +341,6 @@ render_head_for_empty_list(StartListRespFun, Req, Etag, null) -> render_head_for_empty_list(StartListRespFun, Req, Etag, TotalRows) -> StartListRespFun(Req, Etag, TotalRows, null, []). -send_doc_show_response(Lang, ShowSrc, DocId, nil, #httpd{mochi_req=MReq, user_ctx=UserCtx}=Req, Db) -> - % compute etag with no doc - Headers = MReq:get(headers), - Hlist = mochiweb_headers:to_list(Headers), - Accept = proplists:get_value('Accept', Hlist), - CurrentEtag = couch_httpd:make_etag({Lang, ShowSrc, nil, Accept, UserCtx}), - couch_httpd:etag_respond(Req, CurrentEtag, fun() -> - [<<"resp">>, ExternalResp] = couch_query_servers:render_doc_show(Lang, ShowSrc, - DocId, nil, Req, Db), - JsonResp = apply_etag(ExternalResp, CurrentEtag), - couch_httpd_external:send_external_response(Req, JsonResp) - end); - -send_doc_show_response(Lang, ShowSrc, DocId, #doc{revs=Revs}=Doc, #httpd{mochi_req=MReq, user_ctx=UserCtx}=Req, Db) -> - % calculate the etag - Headers = MReq:get(headers), - Hlist = mochiweb_headers:to_list(Headers), - Accept = proplists:get_value('Accept', Hlist), - CurrentEtag = couch_httpd:make_etag({Lang, ShowSrc, Revs, Accept, UserCtx}), - % We know our etag now - couch_httpd:etag_respond(Req, CurrentEtag, fun() -> - [<<"resp">>, ExternalResp] = couch_query_servers:render_doc_show(Lang, ShowSrc, - DocId, Doc, Req, Db), - JsonResp = apply_etag(ExternalResp, CurrentEtag), - couch_httpd_external:send_external_response(Req, JsonResp) - end). - -send_doc_update_response(Lang, UpdateSrc, DocId, Doc, Req, Db) -> - case couch_query_servers:render_doc_update(Lang, UpdateSrc, - DocId, Doc, Req, Db) of - [<<"up">>, {NewJsonDoc}, JsonResp] -> - Options = case couch_httpd:header_value(Req, "X-Couch-Full-Commit", "false") of - "true" -> - [full_commit]; - _ -> - [] - end, - NewDoc = couch_doc:from_json_obj({NewJsonDoc}), - Code = 201, - % todo set location field - {ok, _NewRev} = couch_db:update_doc(Db, NewDoc, Options); - [<<"up">>, _Other, JsonResp] -> - Code = 200, - ok - end, - JsonResp2 = json_apply_field({<<"code">>, Code}, JsonResp), - couch_httpd_external:send_external_response(Req, JsonResp2). % Maybe this is in the proplists API % todo move to couch_util diff --git a/src/couchdb/couch_httpd_view.erl b/src/couchdb/couch_httpd_view.erl index af31ac9c..6419ca55 100644 --- a/src/couchdb/couch_httpd_view.erl +++ b/src/couchdb/couch_httpd_view.erl @@ -13,21 +13,21 @@ -module(couch_httpd_view). -include("couch_db.hrl"). --export([handle_view_req/2,handle_temp_view_req/2,handle_db_view_req/2]). +-export([handle_view_req/3,handle_temp_view_req/2]). -export([get_stale_type/1, get_reduce_type/1, parse_view_params/3]). -export([make_view_fold_fun/6, finish_view_fold/4, view_row_obj/3]). -export([view_group_etag/2, view_group_etag/3, make_reduce_fold_funs/5]). -export([design_doc_view/5, parse_bool_param/1, doc_member/2]). --export([make_key_options/1]). +-export([make_key_options/1, load_view/4]). -import(couch_httpd, [send_json/2,send_json/3,send_json/4,send_method_not_allowed/2,send_chunk/2, start_json_response/2, start_json_response/3, end_json_response/1, send_chunked_error/2]). -design_doc_view(Req, Db, Id, ViewName, Keys) -> - DesignId = <<"_design/", Id/binary>>, +design_doc_view(Req, Db, DName, ViewName, Keys) -> + DesignId = <<"_design/", DName/binary>>, Stale = get_stale_type(Req), Reduce = get_reduce_type(Req), Result = case couch_view:get_map_view(Db, DesignId, ViewName, Stale) of @@ -54,11 +54,11 @@ design_doc_view(Req, Db, Id, ViewName, Keys) -> Result. handle_view_req(#httpd{method='GET', - path_parts=[_Db, _Design, DName, _View, ViewName]}=Req, Db) -> + path_parts=[_, _, DName, _, ViewName]}=Req, Db, _DDoc) -> design_doc_view(Req, Db, DName, ViewName, nil); handle_view_req(#httpd{method='POST', - path_parts=[_Db, _Design, DName, _View, ViewName]}=Req, Db) -> + path_parts=[_, _, DName, _, ViewName]}=Req, Db, _DDoc) -> {Fields} = couch_httpd:json_body_obj(Req), case proplists:get_value(<<"keys">>, Fields, nil) of nil -> @@ -71,50 +71,7 @@ handle_view_req(#httpd{method='POST', throw({bad_request, "`keys` member must be a array."}) end; -handle_view_req(Req, _Db) -> - send_method_not_allowed(Req, "GET,POST,HEAD"). - -handle_db_view_req(#httpd{method='GET', - path_parts=[_Db, _View, DName, ViewName]}=Req, Db) -> - QueryArgs = couch_httpd_view:parse_view_params(Req, nil, nil), - #view_query_args{ - list = ListName - } = QueryArgs, - ?LOG_DEBUG("ici ~p", [ListName]), - case ListName of - nil -> couch_httpd_view:design_doc_view(Req, Db, DName, ViewName, nil); - _ -> - couch_httpd_show:handle_view_list(Req, DName, ListName, DName, ViewName, Db, nil) - end; - -handle_db_view_req(#httpd{method='POST', - path_parts=[_Db, _View, DName, ViewName]}=Req, Db) -> - QueryArgs = couch_httpd_view:parse_view_params(Req, nil, nil), - #view_query_args{ - list = ListName - } = QueryArgs, - case ListName of - nil -> - {Fields} = couch_httpd:json_body_obj(Req), - case proplists:get_value(<<"keys">>, Fields, nil) of - nil -> - Fmt = "POST to view ~p/~p in database ~p with no keys member.", - ?LOG_DEBUG(Fmt, [DName, ViewName, Db]), - couch_httpd_view:design_doc_view(Req, Db, DName, ViewName, nil); - Keys when is_list(Keys) -> - couch_httpd_view:design_doc_view(Req, Db, DName, ViewName, Keys); - _ -> - throw({bad_request, "`keys` member must be a array."}) - end; - _ -> - ReqBody = couch_httpd:body(Req), - {Props2} = ?JSON_DECODE(ReqBody), - Keys = proplists:get_value(<<"keys">>, Props2, nil), - couch_httpd_show:handle_view_list(Req#httpd{req_body=ReqBody}, - DName, ListName, DName, ViewName, Db, Keys) - end; - -handle_db_view_req(Req, _Db) -> +handle_view_req(Req, _Db, _DDoc) -> send_method_not_allowed(Req, "GET,POST,HEAD"). handle_temp_view_req(#httpd{method='POST'}=Req, Db) -> @@ -236,6 +193,35 @@ get_stale_type(Req) -> get_reduce_type(Req) -> list_to_atom(couch_httpd:qs_value(Req, "reduce", "true")). +load_view(Req, Db, {ViewDesignId, ViewName}, Keys) -> + Stale = couch_httpd_view:get_stale_type(Req), + Reduce = couch_httpd_view:get_reduce_type(Req), + case couch_view:get_map_view(Db, ViewDesignId, ViewName, Stale) of + {ok, View, Group} -> + QueryArgs = couch_httpd_view:parse_view_params(Req, Keys, map), + {map, View, Group, QueryArgs}; + {not_found, _Reason} -> + case couch_view:get_reduce_view(Db, ViewDesignId, ViewName, Stale) of + {ok, ReduceView, Group} -> + case Reduce of + false -> + QueryArgs = couch_httpd_view:parse_view_params(Req, Keys, map_red), + MapView = couch_view:extract_map_view(ReduceView), + {map, MapView, Group, QueryArgs}; + _ -> + QueryArgs = couch_httpd_view:parse_view_params(Req, Keys, reduce), + {reduce, ReduceView, Group, QueryArgs} + end; + {not_found, Reason} -> + throw({not_found, Reason}) + end + end. + +% query_parse_error could be removed +% we wouldn't need to pass the view type, it'd just parse params. +% I'm not sure what to do about the error handling, but +% it might simplify things to have a parse_view_params function +% that doesn't throw(). parse_view_params(Req, Keys, ViewType) -> QueryList = couch_httpd:qs(Req), QueryParams = @@ -258,6 +244,7 @@ parse_view_params(Req, Keys, ViewType) -> {reduce, _, false} -> QueryArgs; {reduce, _, _} -> + % we can simplify code if we just drop this error message. Msg = <<"Multi-key fetchs for reduce " "view must include `group=true`">>, throw({query_parse_error, Msg}); diff --git a/src/couchdb/couch_native_process.erl b/src/couchdb/couch_native_process.erl index 2b74073c..65e4e131 100644 --- a/src/couchdb/couch_native_process.erl +++ b/src/couchdb/couch_native_process.erl @@ -38,63 +38,102 @@ % extensions will evolve which offer useful layers on top of this view server % to help simplify your view code. -module(couch_native_process). +-behaviour(gen_server). --export([start_link/0]). --export([set_timeout/2, prompt/2, stop/1]). +-export([start_link/0,init/1,terminate/2,handle_call/3,handle_cast/2]). +-export([set_timeout/2, prompt/2]). -define(STATE, native_proc_state). --record(evstate, {funs=[], query_config=[], list_pid=nil, timeout=5000}). +-record(evstate, {ddocs, funs=[], query_config=[], list_pid=nil, timeout=5000}). -include("couch_db.hrl"). start_link() -> - {ok, self()}. + gen_server:start_link(?MODULE, [], []). -stop(_Pid) -> - ok. +% this is a bit messy, see also couch_query_servers handle_info +% stop(_Pid) -> +% ok. -set_timeout(_Pid, TimeOut) -> - NewState = case get(?STATE) of - undefined -> - #evstate{timeout=TimeOut}; - State -> - State#evstate{timeout=TimeOut} - end, - put(?STATE, NewState), - ok. +set_timeout(Pid, TimeOut) -> + gen_server:call(Pid, {set_timeout, TimeOut}). -prompt(Pid, Data) when is_pid(Pid), is_list(Data) -> - case get(?STATE) of - undefined -> - State = #evstate{}, - put(?STATE, State); - State -> - State - end, - case is_pid(State#evstate.list_pid) of - true -> - case hd(Data) of - <<"list_row">> -> ok; - <<"list_end">> -> ok; - _ -> throw({error, query_server_error}) - end; - _ -> - ok % Not listing - end, - {NewState, Resp} = run(State, to_binary(Data)), - put(?STATE, NewState), +prompt(Pid, Data) when is_list(Data) -> + gen_server:call(Pid, {prompt, Data}). + +% gen_server callbacks +init([]) -> + {ok, #evstate{ddocs=dict:new()}}. + +handle_call({set_timeout, TimeOut}, _From, State) -> + {reply, ok, State#evstate{timeout=TimeOut}}; + +handle_call({prompt, Data}, _From, State) -> + ?LOG_DEBUG("Prompt native qs: ~s",[?JSON_ENCODE(Data)]), + {NewState, Resp} = try run(State, to_binary(Data)) of + {S, R} -> {S, R} + catch + throw:{error, Why} -> + {State, [<<"error">>, Why, Why]} + end, + case Resp of {error, Reason} -> Msg = io_lib:format("couch native server error: ~p", [Reason]), - {[{<<"error">>, list_to_binary(Msg)}]}; - _ -> - Resp + {reply, [<<"error">>, <<"native_query_server">>, list_to_binary(Msg)], NewState}; + [<<"error">> | Rest] -> + Msg = io_lib:format("couch native server error: ~p", [Rest]), + {reply, [<<"error">> | Rest], NewState}; + [<<"fatal">> | Rest] -> + Msg = io_lib:format("couch native server error: ~p", [Rest]), + {stop, fatal, [<<"error">> | Rest], NewState}; + Resp -> + {reply, Resp, NewState} end. -run(_, [<<"reset">>]) -> - {#evstate{}, true}; -run(_, [<<"reset">>, QueryConfig]) -> - {#evstate{query_config=QueryConfig}, true}; +handle_cast(_Msg, State) -> {noreply, State}. +handle_info(_Msg, State) -> {noreply, State}. +terminate(_Reason, _State) -> ok. +code_change(_OldVersion, State, _Extra) -> {ok, State}. + +run(#evstate{list_pid=Pid}=State, [<<"list_row">>, Row]) when is_pid(Pid) -> + Pid ! {self(), list_row, Row}, + receive + {Pid, chunks, Data} -> + {State, [<<"chunks">>, Data]}; + {Pid, list_end, Data} -> + receive + {'EXIT', Pid, normal} -> ok + after State#evstate.timeout -> + throw({timeout, list_cleanup}) + end, + process_flag(trap_exit, erlang:get(do_trap)), + {State#evstate{list_pid=nil}, [<<"end">>, Data]} + after State#evstate.timeout -> + throw({timeout, list_row}) + end; +run(#evstate{list_pid=Pid}=State, [<<"list_end">>]) when is_pid(Pid) -> + Pid ! {self(), list_end}, + Resp = + receive + {Pid, list_end, Data} -> + receive + {'EXIT', Pid, normal} -> ok + after State#evstate.timeout -> + throw({timeout, list_cleanup}) + end, + [<<"end">>, Data] + after State#evstate.timeout -> + throw({timeout, list_end}) + end, + process_flag(trap_exit, erlang:get(do_trap)), + {State#evstate{list_pid=nil}, Resp}; +run(#evstate{list_pid=Pid}=State, _Command) when is_pid(Pid) -> + {State, [<<"error">>, list_error, list_error]}; +run(#evstate{ddocs=DDocs}, [<<"reset">>]) -> + {#evstate{ddocs=DDocs}, true}; +run(#evstate{ddocs=DDocs}, [<<"reset">>, QueryConfig]) -> + {#evstate{ddocs=DDocs, query_config=QueryConfig}, true}; run(#evstate{funs=Funs}=State, [<<"add_fun">> , BinFunc]) -> FunInfo = makefun(State, BinFunc), {State#evstate{funs=Funs ++ [FunInfo]}, true}; @@ -115,41 +154,55 @@ run(State, [<<"reduce">>, Funs, KVs]) -> {State, catch reduce(State, Funs, Keys2, Vals2, false)}; run(State, [<<"rereduce">>, Funs, Vals]) -> {State, catch reduce(State, Funs, null, Vals, true)}; -run(State, [<<"validate">>, BFun, NDoc, ODoc, Ctx]) -> - {_Sig, Fun} = makefun(State, BFun), - {State, catch Fun(NDoc, ODoc, Ctx)}; -run(State, [<<"filter">>, Docs, Req]) -> - {_Sig, Fun} = hd(State#evstate.funs), +run(#evstate{ddocs=DDocs}=State, [<<"ddoc">>, <<"new">>, DDocId, DDoc]) -> + DDocs2 = store_ddoc(DDocs, DDocId, DDoc), + {State#evstate{ddocs=DDocs2}, true}; +run(#evstate{ddocs=DDocs}=State, [<<"ddoc">>, DDocId | Rest]) -> + DDoc = load_ddoc(DDocs, DDocId), + ddoc(State, DDoc, Rest); +run(_, Unknown) -> + ?LOG_ERROR("Native Process: Unknown command: ~p~n", [Unknown]), + throw({error, unknown_command}). + +ddoc(State, {DDoc}, [FunPath, Args]) -> + % load fun from the FunPath + BFun = lists:foldl(fun + (Key, {Props}) when is_list(Props) -> + proplists:get_value(Key, Props, nil); + (Key, Fun) when is_binary(Fun) -> + Fun; + (Key, nil) -> + throw({error, not_found}); + (Key, Fun) -> + throw({error, malformed_ddoc}) + end, {DDoc}, FunPath), + ddoc(State, makefun(State, BFun, {DDoc}), FunPath, Args). + +ddoc(State, {_, Fun}, [<<"validate_doc_update">>], Args) -> + {State, (catch apply(Fun, Args))}; +ddoc(State, {_, Fun}, [<<"filters">>|_], [Docs, Req]) -> Resp = lists:map(fun(Doc) -> (catch Fun(Doc, Req)) =:= true end, Docs), {State, [true, Resp]}; -run(State, [<<"show">>, BFun, Doc, Req]) -> - {_Sig, Fun} = makefun(State, BFun), - Resp = case (catch Fun(Doc, Req)) of +ddoc(State, {_, Fun}, [<<"shows">>|_], Args) -> + Resp = case (catch apply(Fun, Args)) of FunResp when is_list(FunResp) -> FunResp; - FunResp when tuple_size(FunResp) =:= 1 -> - [<<"resp">>, FunResp]; + {FunResp} -> + [<<"resp">>, {FunResp}]; FunResp -> FunResp end, {State, Resp}; -run(State, [<<"update">>, BFun, Doc, Req]) -> - {_Sig, Fun} = makefun(State, BFun), - Resp = case (catch Fun(Doc, Req)) of +ddoc(State, {_, Fun}, [<<"updates">>|_], Args) -> + Resp = case (catch apply(Fun, Args)) of [JsonDoc, JsonResp] -> [<<"up">>, JsonDoc, JsonResp] end, {State, Resp}; -run(State, [<<"list">>, Head, Req]) -> - {Sig, Fun} = hd(State#evstate.funs), - % This is kinda dirty - case is_function(Fun, 2) of - false -> throw({error, render_error}); - true -> ok - end, +ddoc(State, {Sig, Fun}, [<<"lists">>|_], Args) -> Self = self(), SpawnFun = fun() -> - LastChunk = (catch Fun(Head, Req)), + LastChunk = (catch apply(Fun, Args)), case start_list_resp(Self, Sig) of started -> receive @@ -177,44 +230,20 @@ run(State, [<<"list">>, Head, Req]) -> after State#evstate.timeout -> throw({timeout, list_start}) end, - {State#evstate{list_pid=Pid}, Resp}; -run(#evstate{list_pid=Pid}=State, [<<"list_row">>, Row]) when is_pid(Pid) -> - Pid ! {self(), list_row, Row}, - receive - {Pid, chunks, Data} -> - {State, [<<"chunks">>, Data]}; - {Pid, list_end, Data} -> - receive - {'EXIT', Pid, normal} -> ok - after State#evstate.timeout -> - throw({timeout, list_cleanup}) - end, - process_flag(trap_exit, erlang:get(do_trap)), - {State#evstate{list_pid=nil}, [<<"end">>, Data]} - after State#evstate.timeout -> - throw({timeout, list_row}) - end; -run(#evstate{list_pid=Pid}=State, [<<"list_end">>]) when is_pid(Pid) -> - Pid ! {self(), list_end}, - Resp = - receive - {Pid, list_end, Data} -> - receive - {'EXIT', Pid, normal} -> ok - after State#evstate.timeout -> - throw({timeout, list_cleanup}) - end, - [<<"end">>, Data] - after State#evstate.timeout -> - throw({timeout, list_end}) - end, - process_flag(trap_exit, erlang:get(do_trap)), - {State#evstate{list_pid=nil}, Resp}; -run(_, Unknown) -> - ?LOG_ERROR("Native Process: Unknown command: ~p~n", [Unknown]), - throw({error, query_server_error}). + {State#evstate{list_pid=Pid}, Resp}. + +store_ddoc(DDocs, DDocId, DDoc) -> + dict:store(DDocId, DDoc, DDocs). +load_ddoc(DDocs, DDocId) -> + try dict:fetch(DDocId, DDocs) of + {DDoc} -> {DDoc} + catch + _:Else -> throw({error, ?l2b(io_lib:format("Native Query Server missing DDoc with Id: ~s",[DDocId]))}) + end. bindings(State, Sig) -> + bindings(State, Sig, nil). +bindings(State, Sig, DDoc) -> Self = self(), Log = fun(Msg) -> @@ -262,14 +291,19 @@ bindings(State, Sig) -> FoldRows = fun(Fun, Acc) -> foldrows(GetRow, Fun, Acc) end, - [ + Bindings = [ {'Log', Log}, {'Emit', Emit}, {'Start', Start}, {'Send', Send}, {'GetRow', GetRow}, {'FoldRows', FoldRows} - ]. + ], + case DDoc of + {Props} -> + Bindings ++ [{'DDoc', DDoc}]; + _Else -> Bindings + end. % thanks to erlview, via: % http://erlang.org/pipermail/erlang-questions/2003-November/010544.html @@ -277,8 +311,11 @@ makefun(State, Source) -> Sig = erlang:md5(Source), BindFuns = bindings(State, Sig), {Sig, makefun(State, Source, BindFuns)}. - -makefun(_State, Source, BindFuns) -> +makefun(State, Source, {DDoc}) -> + Sig = erlang:md5(lists:flatten([Source, term_to_binary(DDoc)])), + BindFuns = bindings(State, Sig, {DDoc}), + {Sig, makefun(State, Source, BindFuns)}; +makefun(_State, Source, BindFuns) when is_list(BindFuns) -> FunStr = binary_to_list(Source), {ok, Tokens, _} = erl_scan:string(FunStr), Form = case (catch erl_parse:parse_exprs(Tokens)) of diff --git a/src/couchdb/couch_os_process.erl b/src/couchdb/couch_os_process.erl index 72b715c3..5ac13715 100644 --- a/src/couchdb/couch_os_process.erl +++ b/src/couchdb/couch_os_process.erl @@ -53,7 +53,7 @@ prompt(Pid, Data) -> {ok, Result} -> Result; Error -> - ?LOG_ERROR("OS Process Error :: ~p",[Error]), + ?LOG_ERROR("OS Process Error ~p :: ~p",[Pid,Error]), throw(Error) end. @@ -80,22 +80,24 @@ readline(#os_proc{port = Port} = OsProc, Acc) -> % Standard JSON functions writejson(OsProc, Data) when is_record(OsProc, os_proc) -> - % ?LOG_DEBUG("OS Process Input :: ~p", [Data]), - true = writeline(OsProc, ?JSON_ENCODE(Data)). + JsonData = ?JSON_ENCODE(Data), + ?LOG_DEBUG("OS Process ~p Input :: ~s", [OsProc#os_proc.port, JsonData]), + true = writeline(OsProc, JsonData). readjson(OsProc) when is_record(OsProc, os_proc) -> Line = readline(OsProc), + ?LOG_DEBUG("OS Process ~p Output :: ~s", [OsProc#os_proc.port, Line]), case ?JSON_DECODE(Line) of [<<"log">>, Msg] when is_binary(Msg) -> % we got a message to log. Log it and continue - ?LOG_INFO("OS Process :: ~s", [Msg]), + ?LOG_INFO("OS Process ~p Log :: ~s", [OsProc#os_proc.port, Msg]), readjson(OsProc); - {[{<<"error">>, Id}, {<<"reason">>, Reason}]} -> + [<<"error">>, Id, Reason] -> throw({list_to_atom(binary_to_list(Id)),Reason}); - {[{<<"reason">>, Reason}, {<<"error">>, Id}]} -> + [<<"fatal">>, Id, Reason] -> + ?LOG_INFO("OS Process ~p Fatal Error :: ~s ~p",[OsProc#os_proc.port, Id, Reason]), throw({list_to_atom(binary_to_list(Id)),Reason}); Result -> - % ?LOG_DEBUG("OS Process Output :: ~p", [Result]), Result end. @@ -112,6 +114,7 @@ init([Command, Options, PortOptions]) -> }, KillCmd = readline(BaseProc), Pid = self(), + ?LOG_DEBUG("OS Process Start :: ~p", [BaseProc#os_proc.port]), spawn(fun() -> % this ensure the real os process is killed when this process dies. erlang:monitor(process, Pid), @@ -143,8 +146,12 @@ handle_call({prompt, Data}, _From, OsProc) -> Writer(OsProc, Data), {reply, {ok, Reader(OsProc)}, OsProc} catch - throw:OsError -> - {stop, normal, OsError, OsProc} + throw:{error, OsError} -> + {reply, OsError, OsProc}; + throw:{fatal, OsError} -> + {stop, normal, OsError, OsProc}; + throw:OtherError -> + {stop, normal, OtherError, OsProc} end. handle_cast({send, Data}, #os_proc{writer=Writer}=OsProc) -> diff --git a/src/couchdb/couch_query_servers.erl b/src/couchdb/couch_query_servers.erl index 4ac56727..30f4c4c7 100644 --- a/src/couchdb/couch_query_servers.erl +++ b/src/couchdb/couch_query_servers.erl @@ -17,10 +17,11 @@ -export([init/1, terminate/2, handle_call/3, handle_cast/2, handle_info/2,code_change/3,stop/0]). -export([start_doc_map/2, map_docs/2, stop_doc_map/1]). --export([reduce/3, rereduce/3,validate_doc_update/5]). --export([render_doc_show/6, render_doc_update/6, start_view_list/2, - render_list_head/4, render_list_row/4, render_list_tail/1]). +-export([reduce/3, rereduce/3,validate_doc_update/4]). -export([filter_docs/5]). + +-export([with_ddoc_proc/2, proc_prompt/2, ddoc_prompt/3, ddoc_proc_prompt/3, json_doc/1]). + % -export([test/0]). -include("couch_db.hrl"). @@ -28,6 +29,7 @@ -record(proc, { pid, lang, + ddoc_keys = [], prompt_fun, set_timeout_fun, stop_fun @@ -37,7 +39,7 @@ start_link() -> gen_server:start_link({local, couch_query_servers}, couch_query_servers, [], []). stop() -> - exit(whereis(couch_query_servers), close). + exit(whereis(couch_query_servers), normal). start_doc_map(Lang, Functions) -> Proc = get_os_process(Lang), @@ -91,21 +93,15 @@ group_reductions_results(List) -> rereduce(_Lang, [], _ReducedValues) -> {ok, []}; rereduce(Lang, RedSrcs, ReducedValues) -> - Proc = get_os_process(Lang), - Grouped = group_reductions_results(ReducedValues), - Results = try lists:zipwith( + Grouped = group_reductions_results(ReducedValues), + Results = lists:zipwith( fun (<<"_", _/binary>> = FunSrc, Values) -> {ok, [Result]} = builtin_reduce(rereduce, [FunSrc], [[[], V] || V <- Values], []), Result; (FunSrc, Values) -> - [true, [Result]] = - proc_prompt(Proc, [<<"rereduce">>, [FunSrc], Values]), - Result - end, RedSrcs, Grouped) - after - ok = ret_os_process(Proc) - end, + os_rereduce(Lang, [FunSrc], Values) + end, RedSrcs, Grouped), {ok, Results}. reduce(_Lang, [], _KVs) -> @@ -137,6 +133,17 @@ os_reduce(Lang, OsRedSrcs, KVs) -> end, {ok, OsResults}. +os_rereduce(_Lang, [], _KVs) -> + {ok, []}; +os_rereduce(Lang, OsRedSrcs, KVs) -> + Proc = get_os_process(Lang), + try proc_prompt(Proc, [<<"rereduce">>, OsRedSrcs, KVs]) of + [true, [Reduction]] -> Reduction + after + ok = ret_os_process(Proc) + end. + + builtin_reduce(_Re, [], _KVs, Acc) -> {ok, lists:reverse(Acc)}; builtin_reduce(Re, [<<"_sum">>|BuiltinReds], KVs, Acc) -> @@ -157,92 +164,49 @@ builtin_sum_rows(KVs) -> throw({invalid_value, <<"builtin _sum function requires map values to be numbers">>}) end, 0, KVs). -validate_doc_update(Lang, FunSrc, EditDoc, DiskDoc, Ctx) -> - Proc = get_os_process(Lang), - JsonEditDoc = couch_doc:to_json_obj(EditDoc, [revs]), - JsonDiskDoc = - if DiskDoc == nil -> - null; - true -> - couch_doc:to_json_obj(DiskDoc, [revs]) - end, - try proc_prompt(Proc, - [<<"validate">>, FunSrc, JsonEditDoc, JsonDiskDoc, Ctx]) of - 1 -> - ok; - {[{<<"forbidden">>, Message}]} -> - throw({forbidden, Message}); - {[{<<"unauthorized">>, Message}]} -> - throw({unauthorized, Message}) - after - ok = ret_os_process(Proc) - end. -% todo use json_apply_field -append_docid(DocId, JsonReqIn) -> - [{<<"docId">>, DocId} | JsonReqIn]. -render_doc_show(Lang, ShowSrc, DocId, Doc, Req, Db) -> - Proc = get_os_process(Lang), - {JsonReqIn} = couch_httpd_external:json_req_obj(Req, Db), - - {JsonReq, JsonDoc} = case {DocId, Doc} of - {nil, nil} -> {{JsonReqIn}, null}; - {DocId, nil} -> {{append_docid(DocId, JsonReqIn)}, null}; - _ -> {{append_docid(DocId, JsonReqIn)}, couch_doc:to_json_obj(Doc, [revs])} - end, - try proc_prompt(Proc, [<<"show">>, ShowSrc, JsonDoc, JsonReq]) - after - ok = ret_os_process(Proc) - end. - -render_doc_update(Lang, UpdateSrc, DocId, Doc, Req, Db) -> - Proc = get_os_process(Lang), - {JsonReqIn} = couch_httpd_external:json_req_obj(Req, Db), - - {JsonReq, JsonDoc} = case {DocId, Doc} of - {nil, nil} -> {{JsonReqIn}, null}; - {DocId, nil} -> {{append_docid(DocId, JsonReqIn)}, null}; - _ -> {{append_docid(DocId, JsonReqIn)}, couch_doc:to_json_obj(Doc, [revs])} - end, - try proc_prompt(Proc, [<<"update">>, UpdateSrc, JsonDoc, JsonReq]) - after - ok = ret_os_process(Proc) +% use the function stored in ddoc.validate_doc_update to test an update. +validate_doc_update(DDoc, EditDoc, DiskDoc, Ctx) -> + JsonEditDoc = couch_doc:to_json_obj(EditDoc, [revs]), + JsonDiskDoc = json_doc(DiskDoc), + case ddoc_prompt(DDoc, [<<"validate_doc_update">>], [JsonEditDoc, JsonDiskDoc, Ctx]) of + 1 -> + ok; + {[{<<"forbidden">>, Message}]} -> + throw({forbidden, Message}); + {[{<<"unauthorized">>, Message}]} -> + throw({unauthorized, Message}) end. -start_view_list(Lang, ListSrc) -> - Proc = get_os_process(Lang), - proc_prompt(Proc, [<<"add_fun">>, ListSrc]), - {ok, Proc}. - -render_list_head(Proc, Req, Db, Head) -> - JsonReq = couch_httpd_external:json_req_obj(Req, Db), - proc_prompt(Proc, [<<"list">>, Head, JsonReq]). - -render_list_row(Proc, Db, {{Key, DocId}, Value}, IncludeDoc) -> - JsonRow = couch_httpd_view:view_row_obj(Db, {{Key, DocId}, Value}, IncludeDoc), - proc_prompt(Proc, [<<"list_row">>, JsonRow]); - -render_list_row(Proc, _, {Key, Value}, _IncludeDoc) -> - JsonRow = {[{key, Key}, {value, Value}]}, - proc_prompt(Proc, [<<"list_row">>, JsonRow]). - -render_list_tail(Proc) -> - JsonResp = proc_prompt(Proc, [<<"list_end">>]), - ok = ret_os_process(Proc), - JsonResp. +json_doc(nil) -> null; +json_doc(Doc) -> + couch_doc:to_json_obj(Doc, [revs]). -filter_docs(Lang, Src, Docs, Req, Db) -> +filter_docs(Req, Db, DDoc, FName, Docs) -> JsonReq = couch_httpd_external:json_req_obj(Req, Db), JsonDocs = [couch_doc:to_json_obj(Doc, [revs]) || Doc <- Docs], JsonCtx = couch_util:json_user_ctx(Db), - Proc = get_os_process(Lang), - [true, Passes] = proc_prompt(Proc, - [<<"filter">>, Src, JsonDocs, JsonReq, JsonCtx]), - ret_os_process(Proc), - {ok, Passes}. + [true, Passes] = ddoc_prompt(DDoc, [<<"filters">>, FName], [JsonDocs, JsonReq, JsonCtx]), + {ok, Passes}. + +ddoc_proc_prompt({Proc, DDocId}, FunPath, Args) -> + proc_prompt(Proc, [<<"ddoc">>, DDocId, FunPath, Args]). + +ddoc_prompt(DDoc, FunPath, Args) -> + with_ddoc_proc(DDoc, fun({Proc, DDocId}) -> + proc_prompt(Proc, [<<"ddoc">>, DDocId, FunPath, Args]) + end). + +with_ddoc_proc(#doc{id=DDocId,revs={Start, [DiskRev|_]}}=DDoc, Fun) -> + Rev = couch_doc:rev_to_str({Start, DiskRev}), + DDocKey = {DDocId, Rev}, + Proc = get_ddoc_process(DDoc, DDocKey), + try Fun({Proc, DDocId}) + after + ok = ret_os_process(Proc) + end. init([]) -> - % read config and register for configuration changes % just stop if one of the config settings change. couch_server_sup @@ -282,7 +246,39 @@ init([]) -> terminate(_Reason, _Server) -> ok. - +handle_call({get_proc, #doc{body={Props}}=DDoc, DDocKey}, _From, {Langs, PidProcs, LangProcs, InUse}=Server) -> + % Note to future self. Add max process limit. + Lang = proplists:get_value(<<"language">>, Props, <<"javascript">>), + case ets:lookup(LangProcs, Lang) of + [{Lang, [P|Rest]}] -> + % find a proc in the set that has the DDoc + case proc_with_ddoc(DDoc, DDocKey, [P|Rest]) of + {ok, Proc} -> + % looks like the proc isn't getting dropped from the list. + % we need to change this to take a fun for equality checking + % so we can do a comparison on portnum + rem_from_list(LangProcs, Lang, Proc), + add_to_list(InUse, Lang, Proc), + {reply, {ok, Proc, get_query_server_config()}, Server}; + Error -> + {reply, Error, Server} + end; + _ -> + case (catch new_process(Langs, Lang)) of + {ok, Proc} -> + add_value(PidProcs, Proc#proc.pid, Proc), + case proc_with_ddoc(DDoc, DDocKey, [Proc]) of + {ok, Proc2} -> + rem_from_list(LangProcs, Lang, Proc), + add_to_list(InUse, Lang, Proc2), + {reply, {ok, Proc2, get_query_server_config()}, Server}; + Error -> + {reply, Error, Server} + end; + Error -> + {reply, Error, Server} + end + end; handle_call({get_proc, Lang}, _From, {Langs, PidProcs, LangProcs, InUse}=Server) -> % Note to future self. Add max process limit. case ets:lookup(LangProcs, Lang) of @@ -290,12 +286,13 @@ handle_call({get_proc, Lang}, _From, {Langs, PidProcs, LangProcs, InUse}=Server) add_value(PidProcs, Proc#proc.pid, Proc), rem_from_list(LangProcs, Lang, Proc), add_to_list(InUse, Lang, Proc), - {reply, {recycled, Proc, get_query_server_config()}, Server}; + {reply, {ok, Proc, get_query_server_config()}, Server}; _ -> case (catch new_process(Langs, Lang)) of {ok, Proc} -> + add_value(PidProcs, Proc#proc.pid, Proc), add_to_list(InUse, Lang, Proc), - {reply, {new, Proc}, Server}; + {reply, {ok, Proc, get_query_server_config()}, Server}; Error -> {reply, Error, Server} end @@ -350,6 +347,23 @@ new_process(Langs, Lang) -> {unknown_query_language, Lang} end. +proc_with_ddoc(DDoc, DDocKey, LangProcs) -> + DDocProcs = lists:filter(fun(#proc{ddoc_keys=Keys}) -> + lists:any(fun(Key) -> + Key == DDocKey + end, Keys) + end, LangProcs), + case DDocProcs of + [DDocProc|_] -> + ?LOG_DEBUG("DDocProc found for DDocKey: ~p",[DDocKey]), + {ok, DDocProc}; + [] -> + [TeachProc|_] = LangProcs, + ?LOG_DEBUG("Teach ddoc to new proc ~p with DDocKey: ~p",[TeachProc, DDocKey]), + {ok, SmartProc} = teach_ddoc(DDoc, DDocKey, TeachProc), + {ok, SmartProc} + end. + proc_prompt(Proc, Args) -> {Mod, Func} = Proc#proc.prompt_fun, apply(Mod, Func, [Proc#proc.pid, Args]). @@ -362,14 +376,44 @@ proc_set_timeout(Proc, Timeout) -> {Mod, Func} = Proc#proc.set_timeout_fun, apply(Mod, Func, [Proc#proc.pid, Timeout]). +teach_ddoc(DDoc, {DDocId, _Rev}=DDocKey, #proc{ddoc_keys=Keys}=Proc) -> + % send ddoc over the wire + % we only share the rev with the client we know to update code + % but it only keeps the latest copy, per each ddoc, around. + true = proc_prompt(Proc, [<<"ddoc">>, <<"new">>, DDocId, couch_doc:to_json_obj(DDoc, [])]), + % we should remove any other ddocs keys for this docid + % because the query server overwrites without the rev + Keys2 = [{D,R} || {D,R} <- Keys, D /= DDocId], + % add ddoc to the proc + {ok, Proc#proc{ddoc_keys=[DDocKey|Keys2]}}. + +get_ddoc_process(#doc{} = DDoc, DDocKey) -> + % remove this case statement + case gen_server:call(couch_query_servers, {get_proc, DDoc, DDocKey}) of + {ok, Proc, QueryConfig} -> + % process knows the ddoc + case (catch proc_prompt(Proc, [<<"reset">>, QueryConfig])) of + true -> + proc_set_timeout(Proc, list_to_integer(couch_config:get( + "couchdb", "os_process_timeout", "5000"))), + link(Proc#proc.pid), + Proc; + _ -> + catch proc_stop(Proc), + get_ddoc_process(DDoc, DDocKey) + end; + Error -> + throw(Error) + end. + +ret_ddoc_process(Proc) -> + true = gen_server:call(couch_query_servers, {ret_proc, Proc}), + catch unlink(Proc#proc.pid), + ok. + get_os_process(Lang) -> case gen_server:call(couch_query_servers, {get_proc, Lang}) of - {new, Proc} -> - proc_set_timeout(Proc, list_to_integer(couch_config:get( - "couchdb", "os_process_timeout", "5000"))), - link(Proc#proc.pid), - Proc; - {recycled, Proc, QueryConfig} -> + {ok, Proc, QueryConfig} -> case (catch proc_prompt(Proc, [<<"reset">>, QueryConfig])) of true -> proc_set_timeout(Proc, list_to_integer(couch_config:get( @@ -403,9 +447,20 @@ add_to_list(Tid, Key, Value) -> true = ets:insert(Tid, {Key, [Value]}) end. +rem_from_list(Tid, Key, Value) when is_record(Value, proc)-> + Pid = Value#proc.pid, + case ets:lookup(Tid, Key) of + [{Key, Vals}] -> + % make a new values list that doesn't include the Value arg + NewValues = [Val || #proc{pid=P}=Val <- Vals, P /= Pid], + ets:insert(Tid, {Key, NewValues}); + [] -> ok + end; rem_from_list(Tid, Key, Value) -> case ets:lookup(Tid, Key) of [{Key, Vals}] -> - ets:insert(Tid, {Key, [Val || Val <- Vals, Val /= Value]}); + % make a new values list that doesn't include the Value arg + NewValues = [Val || Val <- Vals, Val /= Value], + ets:insert(Tid, {Key, NewValues}); [] -> ok end. |