From b8b2a9de28f4e74864f69e7681088d871e71820f Mon Sep 17 00:00:00 2001 From: Adam Kocoloski Date: Mon, 6 Sep 2010 18:10:16 -0700 Subject: support for filtered _changes on multi-node clusters, closes #5 --- apps/chttpd/src/chttpd_db.erl | 2 +- apps/couch/src/couch_changes.erl | 51 ++++++++++++++++++++++++++------- apps/fabric/src/fabric_rpc.erl | 4 +-- apps/fabric/src/fabric_view_changes.erl | 6 ++-- 4 files changed, 46 insertions(+), 17 deletions(-) (limited to 'apps') diff --git a/apps/chttpd/src/chttpd_db.erl b/apps/chttpd/src/chttpd_db.erl index 29571c9f..fc219621 100644 --- a/apps/chttpd/src/chttpd_db.erl +++ b/apps/chttpd/src/chttpd_db.erl @@ -50,7 +50,7 @@ handle_request(#httpd{path_parts=[DbName|RestParts],method=Method, handle_changes_req(#httpd{method='GET'}=Req, Db) -> #changes_args{filter=Raw, style=Style} = Args0 = parse_changes_query(Req), ChangesArgs = Args0#changes_args{ - filter = couch_changes:make_filter_fun(Raw, Style, Req, Db) + filter = couch_changes:configure_filter(Raw, Style, Req, Db) }, case ChangesArgs#changes_args.feed of "normal" -> diff --git a/apps/couch/src/couch_changes.erl b/apps/couch/src/couch_changes.erl index b2848c88..0ca42f86 100644 --- a/apps/couch/src/couch_changes.erl +++ b/apps/couch/src/couch_changes.erl @@ -13,8 +13,8 @@ -module(couch_changes). -include("couch_db.hrl"). --export([handle_changes/3, get_changes_timeout/2, main_only_filter/1, - all_docs_filter/1, get_rest_db_updated/0, make_filter_fun/4]). +-export([handle_changes/3, get_changes_timeout/2, get_rest_db_updated/0, + configure_filter/4, filter/2]). %% @spec handle_changes(#changes_args{}, #httpd{} | {json_req, {[any()]}}, #db{}) -> any() handle_changes(#changes_args{filter=Raw, style=Style}=Args1, Req, Db) -> @@ -102,16 +102,47 @@ make_filter_fun(Filter, Style, Req, Db) when is_list(Filter) -> throw({bad_request, "filter parameter must be of the form `designname/filtername`"}) end; -make_filter_fun(_, main_only, _, _) -> - fun ?MODULE:main_only_filter/1; -make_filter_fun(_, all_docs, _, _) -> - fun ?MODULE:all_docs_filter/1. +make_filter_fun(_, Style, _, _) -> + fun(DI) -> ?MODULE:filter(DI, Style) end. -main_only_filter(#doc_info{revs=[#rev_info{rev=Rev}|_]}) -> - [{[{<<"rev">>, couch_doc:rev_to_str(Rev)}]}]. +configure_filter(Filter, Style, Req, Db) when is_list(Filter) -> + case [?l2b(couch_httpd:unquote(X)) || X <- string:tokens(Filter, "/")] of + [] -> + % fall back to standard filter + Style; + [DName, FName] -> + JsonReq = chttpd_external:json_req_obj(Req, Db), + DesignId = <<"_design/", DName/binary>>, + DDoc = chttpd_db:couch_doc_open(Db, DesignId, nil, []), + % validate that the ddoc has the filter fun + #doc{body={Props}} = DDoc, + couch_util:get_nested_json_value({Props}, [<<"filters">>, FName]), + {custom, Style, {Db, JsonReq, DDoc, FName}}; + _Else -> + throw({bad_request, + "filter parameter must be of the form `designname/filtername`"}) + end; +configure_filter(_, Style, _, _) -> + Style. + +filter(#doc_info{revs=[#rev_info{rev=Rev}|_]}, main_only) -> + [{[{<<"rev">>, couch_doc:rev_to_str(Rev)}]}]; +filter(#doc_info{revs=Revs}, all_docs) -> + [{[{<<"rev">>, couch_doc:rev_to_str(Rev)}]} || #rev_info{rev=Rev} <- Revs]; +filter(#doc_info{id=Id, revs=RevInfos}, {custom, main_only, Acc}) -> + custom_filter(Id, [(hd(RevInfos))#rev_info.rev], Acc); +filter(#doc_info{id=Id, revs=RevInfos}, {custom, all_docs, Acc}) -> + custom_filter(Id, [R || #rev_info{rev=R} <- RevInfos], Acc). -all_docs_filter(#doc_info{revs=Revs}) -> - [{[{<<"rev">>, couch_doc:rev_to_str(Rev)}]} || #rev_info{rev=Rev} <- Revs]. +custom_filter(Id, Revs, {Db, JsonReq, DDoc, Filter}) -> + {ok, Results} = fabric:open_revs(Db, Id, Revs, [deleted, conflicts]), + Docs = [Doc || {ok, Doc} <- Results], + {ok, Passes} = couch_query_servers:filter_docs({json_req,JsonReq}, Db, + DDoc, Filter, Docs), + ?LOG_INFO("filtering ~p ~p", [Id, Passes]), + [{[{<<"rev">>, couch_doc:rev_to_str({RevPos,RevId})}]} + || {Pass, #doc{revs={RevPos,[RevId|_]}}} + <- lists:zip(Passes, Docs), Pass == true]. get_changes_timeout(Args, Callback) -> #changes_args{ diff --git a/apps/fabric/src/fabric_rpc.erl b/apps/fabric/src/fabric_rpc.erl index 301fd46b..a7d555e0 100644 --- a/apps/fabric/src/fabric_rpc.erl +++ b/apps/fabric/src/fabric_rpc.erl @@ -338,10 +338,10 @@ send(Key, Value, #view_acc{limit=Limit} = Acc) -> end. changes_enumerator(DocInfo, {Db, _Seq, Args}) -> - #changes_args{include_docs=IncludeDocs, filter=FilterFun} = Args, + #changes_args{include_docs=IncludeDocs, filter=Acc} = Args, #doc_info{id=Id, high_seq=Seq, revs=[#rev_info{deleted=Del,rev=Rev}|_]} = DocInfo, - case [Result || Result <- FilterFun(DocInfo), Result /= null] of + case [X || X <- couch_changes:filter(DocInfo, Acc), X /= null] of [] -> {ok, {Db, Seq, Args}}; Results -> diff --git a/apps/fabric/src/fabric_view_changes.erl b/apps/fabric/src/fabric_view_changes.erl index 63ffc02a..a4421a92 100644 --- a/apps/fabric/src/fabric_view_changes.erl +++ b/apps/fabric/src/fabric_view_changes.erl @@ -182,10 +182,8 @@ handle_message({complete, EndSeq}, Worker, State) -> end end. -make_changes_args(#changes_args{style=main_only, filter=undefined}=Args) -> - Args#changes_args{filter = fun couch_changes:main_only_filter/1}; -make_changes_args(#changes_args{style=all_docs, filter=undefined}=Args) -> - Args#changes_args{filter = fun couch_changes:all_docs_filter/1}; +make_changes_args(#changes_args{style=Style, filter=undefined}=Args) -> + Args#changes_args{filter = Style}; make_changes_args(Args) -> Args. -- cgit v1.2.3