summaryrefslogtreecommitdiff
path: root/src/fabric_view.erl
diff options
context:
space:
mode:
authorAdam Kocoloski <adam@cloudant.com>2010-06-07 23:44:33 -0400
committerAdam Kocoloski <adam@cloudant.com>2010-06-07 23:45:18 -0400
commit2569631e249cc8209858f590a349f314b7253f3e (patch)
treef5159200982c26f4c8b9b364357e295f1a17c662 /src/fabric_view.erl
parentab14b5bfdb88d9f07b6885f4cc1208f80c8c72f4 (diff)
reduce views, BugzID 10220
Diffstat (limited to 'src/fabric_view.erl')
-rw-r--r--src/fabric_view.erl88
1 files changed, 81 insertions, 7 deletions
diff --git a/src/fabric_view.erl b/src/fabric_view.erl
index ae5ce361..70dedf27 100644
--- a/src/fabric_view.erl
+++ b/src/fabric_view.erl
@@ -1,7 +1,8 @@
-module(fabric_view).
-export([is_progress_possible/1, remove_overlapping_shards/2, maybe_send_row/1,
- maybe_pause_worker/3, maybe_resume_worker/2, transform_row/1, keydict/1]).
+ maybe_pause_worker/3, maybe_resume_worker/2, transform_row/1, keydict/1,
+ extract_view/4]).
-include("fabric.hrl").
@@ -100,10 +101,7 @@ maybe_send_row(State) ->
true ->
{ok, State};
false ->
- case get_next_row(State) of
- complete ->
- {_, Acc} = Callback(complete, AccIn),
- {stop, State#collector{user_acc=Acc}};
+ try get_next_row(State) of
{_, NewState} when Skip > 0 ->
maybe_send_row(NewState#collector{skip=Skip-1, limit=Limit-1});
{Row, NewState} ->
@@ -113,6 +111,9 @@ maybe_send_row(State) ->
{ok, Acc} ->
maybe_send_row(NewState#collector{user_acc=Acc, limit=Limit-1})
end
+ catch complete ->
+ {_, Acc} = Callback(complete, AccIn),
+ {stop, State#collector{user_acc=Acc}}
end
end.
@@ -125,8 +126,31 @@ keydict(Keys) ->
%% internal %%
+get_next_row(#collector{reducer = RedSrc} = St) when RedSrc =/= undefined ->
+ #collector{
+ query_args = #view_query_args{direction=Dir},
+ keys = Keys,
+ rows = RowDict,
+ os_proc = Proc,
+ counters = Counters0
+ } = St,
+ {Key, RestKeys} = find_next_key(Keys, Dir, RowDict),
+ case dict:find(Key, RowDict) of
+ {ok, Records} ->
+ NewRowDict = dict:erase(Key, RowDict),
+ Counters = lists:foldl(fun(#view_row{worker=Worker}, CountersAcc) ->
+ fabric_dict:update_counter(Worker, -1, CountersAcc)
+ end, Counters0, Records),
+ Wrapped = [[V] || #view_row{value=V} <- Records],
+ {ok, [Reduced]} = couch_query_servers:rereduce(Proc, [RedSrc], Wrapped),
+ NewSt = St#collector{keys=RestKeys, rows=NewRowDict, counters=Counters},
+ {#view_row{key=Key, id=reduced, value=Reduced}, NewSt};
+ error ->
+ NewSt = St#collector{keys=RestKeys},
+ {#view_row{key=Key, id=reduced, value={error, missing}}, NewSt}
+ end;
get_next_row(#collector{rows = []}) ->
- complete;
+ throw(complete);
get_next_row(State) ->
#collector{
rows = [Row|Rest],
@@ -138,11 +162,25 @@ get_next_row(State) ->
NewState = maybe_resume_worker(Worker, State#collector{counters=Counters1}),
case Stop(Row) of
true ->
- complete;
+ throw(complete);
false ->
{Row, NewState#collector{rows = Rest}}
end.
+find_next_key(undefined, Dir, RowDict) ->
+ case lists:sort(sort_fun(Dir), dict:fetch_keys(RowDict)) of
+ [] ->
+ throw(complete);
+ [Key|_] ->
+ {Key, undefined}
+ end;
+find_next_key([], _, _) ->
+ throw(complete);
+find_next_key([Key|Rest], _, _) ->
+ {Key, Rest}.
+
+transform_row(#view_row{key=Key, id=reduced, value=Value}) ->
+ {row, {[{key,Key}, {value,Value}]}};
transform_row(#view_row{key=Key, id=undefined}) ->
{row, {[{key,Key}, {error,not_found}]}};
transform_row(#view_row{key=Key, id=Id, value=Value, doc=undefined}) ->
@@ -151,3 +189,39 @@ transform_row(#view_row{key=Key, id=Id, value=Value, doc={error,Reason}}) ->
{row, {[{key,Key}, {id,Id}, {value,Value}, {error,Reason}]}};
transform_row(#view_row{key=Key, id=Id, value=Value, doc=Doc}) ->
{row, {[{key,Key}, {id,Id}, {value,Value}, {doc,Doc}]}}.
+
+sort_fun(fwd) ->
+ fun(A,A) -> true; (A,B) -> couch_view:less_json(A,B) end;
+sort_fun(rev) ->
+ fun(A,A) -> true; (A,B) -> couch_view:less_json(B,A) end.
+
+extract_view(Pid, ViewName, [], _ViewType) ->
+ ?LOG_ERROR("missing_named_view ~p", [ViewName]),
+ exit(Pid, kill),
+ exit(missing_named_view);
+extract_view(Pid, ViewName, [View|Rest], ViewType) ->
+ case lists:member(ViewName, view_names(View, ViewType)) of
+ true ->
+ if ViewType == reduce ->
+ {index_of(ViewName, view_names(View, reduce)), View};
+ true ->
+ View
+ end;
+ false ->
+ extract_view(Pid, ViewName, Rest, ViewType)
+ end.
+
+view_names(View, Type) when Type == red_map; Type == reduce ->
+ [Name || {Name, _} <- View#view.reduce_funs];
+view_names(View, map) ->
+ View#view.map_names.
+
+index_of(X, List) ->
+ index_of(X, List, 1).
+
+index_of(_X, [], _I) ->
+ not_found;
+index_of(X, [X|_Rest], I) ->
+ I;
+index_of(X, [_|Rest], I) ->
+ index_of(X, Rest, I+1).