summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAdam Kocoloski <adam@cloudant.com>2010-09-06 18:10:16 -0700
committerAdam Kocoloski <adam@cloudant.com>2010-09-06 22:36:45 -0700
commitb8b2a9de28f4e74864f69e7681088d871e71820f (patch)
tree00f2a686de5b17f52b6004f6b3fb17073073680e
parenta55c40ce30d5279df93b5c475cc6718575a1e192 (diff)
support for filtered _changes on multi-node clusters, closes #5
-rw-r--r--apps/chttpd/src/chttpd_db.erl2
-rw-r--r--apps/couch/src/couch_changes.erl51
-rw-r--r--apps/fabric/src/fabric_rpc.erl4
-rw-r--r--apps/fabric/src/fabric_view_changes.erl6
4 files changed, 46 insertions, 17 deletions
diff --git a/apps/chttpd/src/chttpd_db.erl b/apps/chttpd/src/chttpd_db.erl
index 29571c9f..fc219621 100644
--- a/apps/chttpd/src/chttpd_db.erl
+++ b/apps/chttpd/src/chttpd_db.erl
@@ -50,7 +50,7 @@ handle_request(#httpd{path_parts=[DbName|RestParts],method=Method,
handle_changes_req(#httpd{method='GET'}=Req, Db) ->
#changes_args{filter=Raw, style=Style} = Args0 = parse_changes_query(Req),
ChangesArgs = Args0#changes_args{
- filter = couch_changes:make_filter_fun(Raw, Style, Req, Db)
+ filter = couch_changes:configure_filter(Raw, Style, Req, Db)
},
case ChangesArgs#changes_args.feed of
"normal" ->
diff --git a/apps/couch/src/couch_changes.erl b/apps/couch/src/couch_changes.erl
index b2848c88..0ca42f86 100644
--- a/apps/couch/src/couch_changes.erl
+++ b/apps/couch/src/couch_changes.erl
@@ -13,8 +13,8 @@
-module(couch_changes).
-include("couch_db.hrl").
--export([handle_changes/3, get_changes_timeout/2, main_only_filter/1,
- all_docs_filter/1, get_rest_db_updated/0, make_filter_fun/4]).
+-export([handle_changes/3, get_changes_timeout/2, get_rest_db_updated/0,
+ configure_filter/4, filter/2]).
%% @spec handle_changes(#changes_args{}, #httpd{} | {json_req, {[any()]}}, #db{}) -> any()
handle_changes(#changes_args{filter=Raw, style=Style}=Args1, Req, Db) ->
@@ -102,16 +102,47 @@ make_filter_fun(Filter, Style, Req, Db) when is_list(Filter) ->
throw({bad_request,
"filter parameter must be of the form `designname/filtername`"})
end;
-make_filter_fun(_, main_only, _, _) ->
- fun ?MODULE:main_only_filter/1;
-make_filter_fun(_, all_docs, _, _) ->
- fun ?MODULE:all_docs_filter/1.
+make_filter_fun(_, Style, _, _) ->
+ fun(DI) -> ?MODULE:filter(DI, Style) end.
-main_only_filter(#doc_info{revs=[#rev_info{rev=Rev}|_]}) ->
- [{[{<<"rev">>, couch_doc:rev_to_str(Rev)}]}].
+configure_filter(Filter, Style, Req, Db) when is_list(Filter) ->
+ case [?l2b(couch_httpd:unquote(X)) || X <- string:tokens(Filter, "/")] of
+ [] ->
+ % fall back to standard filter
+ Style;
+ [DName, FName] ->
+ JsonReq = chttpd_external:json_req_obj(Req, Db),
+ DesignId = <<"_design/", DName/binary>>,
+ DDoc = chttpd_db:couch_doc_open(Db, DesignId, nil, []),
+ % validate that the ddoc has the filter fun
+ #doc{body={Props}} = DDoc,
+ couch_util:get_nested_json_value({Props}, [<<"filters">>, FName]),
+ {custom, Style, {Db, JsonReq, DDoc, FName}};
+ _Else ->
+ throw({bad_request,
+ "filter parameter must be of the form `designname/filtername`"})
+ end;
+configure_filter(_, Style, _, _) ->
+ Style.
+
+filter(#doc_info{revs=[#rev_info{rev=Rev}|_]}, main_only) ->
+ [{[{<<"rev">>, couch_doc:rev_to_str(Rev)}]}];
+filter(#doc_info{revs=Revs}, all_docs) ->
+ [{[{<<"rev">>, couch_doc:rev_to_str(Rev)}]} || #rev_info{rev=Rev} <- Revs];
+filter(#doc_info{id=Id, revs=RevInfos}, {custom, main_only, Acc}) ->
+ custom_filter(Id, [(hd(RevInfos))#rev_info.rev], Acc);
+filter(#doc_info{id=Id, revs=RevInfos}, {custom, all_docs, Acc}) ->
+ custom_filter(Id, [R || #rev_info{rev=R} <- RevInfos], Acc).
-all_docs_filter(#doc_info{revs=Revs}) ->
- [{[{<<"rev">>, couch_doc:rev_to_str(Rev)}]} || #rev_info{rev=Rev} <- Revs].
+custom_filter(Id, Revs, {Db, JsonReq, DDoc, Filter}) ->
+ {ok, Results} = fabric:open_revs(Db, Id, Revs, [deleted, conflicts]),
+ Docs = [Doc || {ok, Doc} <- Results],
+ {ok, Passes} = couch_query_servers:filter_docs({json_req,JsonReq}, Db,
+ DDoc, Filter, Docs),
+ ?LOG_INFO("filtering ~p ~p", [Id, Passes]),
+ [{[{<<"rev">>, couch_doc:rev_to_str({RevPos,RevId})}]}
+ || {Pass, #doc{revs={RevPos,[RevId|_]}}}
+ <- lists:zip(Passes, Docs), Pass == true].
get_changes_timeout(Args, Callback) ->
#changes_args{
diff --git a/apps/fabric/src/fabric_rpc.erl b/apps/fabric/src/fabric_rpc.erl
index 301fd46b..a7d555e0 100644
--- a/apps/fabric/src/fabric_rpc.erl
+++ b/apps/fabric/src/fabric_rpc.erl
@@ -338,10 +338,10 @@ send(Key, Value, #view_acc{limit=Limit} = Acc) ->
end.
changes_enumerator(DocInfo, {Db, _Seq, Args}) ->
- #changes_args{include_docs=IncludeDocs, filter=FilterFun} = Args,
+ #changes_args{include_docs=IncludeDocs, filter=Acc} = Args,
#doc_info{id=Id, high_seq=Seq, revs=[#rev_info{deleted=Del,rev=Rev}|_]}
= DocInfo,
- case [Result || Result <- FilterFun(DocInfo), Result /= null] of
+ case [X || X <- couch_changes:filter(DocInfo, Acc), X /= null] of
[] ->
{ok, {Db, Seq, Args}};
Results ->
diff --git a/apps/fabric/src/fabric_view_changes.erl b/apps/fabric/src/fabric_view_changes.erl
index 63ffc02a..a4421a92 100644
--- a/apps/fabric/src/fabric_view_changes.erl
+++ b/apps/fabric/src/fabric_view_changes.erl
@@ -182,10 +182,8 @@ handle_message({complete, EndSeq}, Worker, State) ->
end
end.
-make_changes_args(#changes_args{style=main_only, filter=undefined}=Args) ->
- Args#changes_args{filter = fun couch_changes:main_only_filter/1};
-make_changes_args(#changes_args{style=all_docs, filter=undefined}=Args) ->
- Args#changes_args{filter = fun couch_changes:all_docs_filter/1};
+make_changes_args(#changes_args{style=Style, filter=undefined}=Args) ->
+ Args#changes_args{filter = Style};
make_changes_args(Args) ->
Args.