diff options
author | Adam Kocoloski <adam@cloudant.com> | 2010-06-07 23:44:33 -0400 |
---|---|---|
committer | Adam Kocoloski <adam@cloudant.com> | 2010-06-07 23:45:18 -0400 |
commit | 2569631e249cc8209858f590a349f314b7253f3e (patch) | |
tree | f5159200982c26f4c8b9b364357e295f1a17c662 /src/fabric_view.erl | |
parent | ab14b5bfdb88d9f07b6885f4cc1208f80c8c72f4 (diff) |
reduce views, BugzID 10220
Diffstat (limited to 'src/fabric_view.erl')
-rw-r--r-- | src/fabric_view.erl | 88 |
1 files changed, 81 insertions, 7 deletions
diff --git a/src/fabric_view.erl b/src/fabric_view.erl index ae5ce361..70dedf27 100644 --- a/src/fabric_view.erl +++ b/src/fabric_view.erl @@ -1,7 +1,8 @@ -module(fabric_view). -export([is_progress_possible/1, remove_overlapping_shards/2, maybe_send_row/1, - maybe_pause_worker/3, maybe_resume_worker/2, transform_row/1, keydict/1]). + maybe_pause_worker/3, maybe_resume_worker/2, transform_row/1, keydict/1, + extract_view/4]). -include("fabric.hrl"). @@ -100,10 +101,7 @@ maybe_send_row(State) -> true -> {ok, State}; false -> - case get_next_row(State) of - complete -> - {_, Acc} = Callback(complete, AccIn), - {stop, State#collector{user_acc=Acc}}; + try get_next_row(State) of {_, NewState} when Skip > 0 -> maybe_send_row(NewState#collector{skip=Skip-1, limit=Limit-1}); {Row, NewState} -> @@ -113,6 +111,9 @@ maybe_send_row(State) -> {ok, Acc} -> maybe_send_row(NewState#collector{user_acc=Acc, limit=Limit-1}) end + catch complete -> + {_, Acc} = Callback(complete, AccIn), + {stop, State#collector{user_acc=Acc}} end end. @@ -125,8 +126,31 @@ keydict(Keys) -> %% internal %% +get_next_row(#collector{reducer = RedSrc} = St) when RedSrc =/= undefined -> + #collector{ + query_args = #view_query_args{direction=Dir}, + keys = Keys, + rows = RowDict, + os_proc = Proc, + counters = Counters0 + } = St, + {Key, RestKeys} = find_next_key(Keys, Dir, RowDict), + case dict:find(Key, RowDict) of + {ok, Records} -> + NewRowDict = dict:erase(Key, RowDict), + Counters = lists:foldl(fun(#view_row{worker=Worker}, CountersAcc) -> + fabric_dict:update_counter(Worker, -1, CountersAcc) + end, Counters0, Records), + Wrapped = [[V] || #view_row{value=V} <- Records], + {ok, [Reduced]} = couch_query_servers:rereduce(Proc, [RedSrc], Wrapped), + NewSt = St#collector{keys=RestKeys, rows=NewRowDict, counters=Counters}, + {#view_row{key=Key, id=reduced, value=Reduced}, NewSt}; + error -> + NewSt = St#collector{keys=RestKeys}, + {#view_row{key=Key, id=reduced, value={error, missing}}, NewSt} + end; get_next_row(#collector{rows = []}) -> - complete; + throw(complete); get_next_row(State) -> #collector{ rows = [Row|Rest], @@ -138,11 +162,25 @@ get_next_row(State) -> NewState = maybe_resume_worker(Worker, State#collector{counters=Counters1}), case Stop(Row) of true -> - complete; + throw(complete); false -> {Row, NewState#collector{rows = Rest}} end. +find_next_key(undefined, Dir, RowDict) -> + case lists:sort(sort_fun(Dir), dict:fetch_keys(RowDict)) of + [] -> + throw(complete); + [Key|_] -> + {Key, undefined} + end; +find_next_key([], _, _) -> + throw(complete); +find_next_key([Key|Rest], _, _) -> + {Key, Rest}. + +transform_row(#view_row{key=Key, id=reduced, value=Value}) -> + {row, {[{key,Key}, {value,Value}]}}; transform_row(#view_row{key=Key, id=undefined}) -> {row, {[{key,Key}, {error,not_found}]}}; transform_row(#view_row{key=Key, id=Id, value=Value, doc=undefined}) -> @@ -151,3 +189,39 @@ transform_row(#view_row{key=Key, id=Id, value=Value, doc={error,Reason}}) -> {row, {[{key,Key}, {id,Id}, {value,Value}, {error,Reason}]}}; transform_row(#view_row{key=Key, id=Id, value=Value, doc=Doc}) -> {row, {[{key,Key}, {id,Id}, {value,Value}, {doc,Doc}]}}. + +sort_fun(fwd) -> + fun(A,A) -> true; (A,B) -> couch_view:less_json(A,B) end; +sort_fun(rev) -> + fun(A,A) -> true; (A,B) -> couch_view:less_json(B,A) end. + +extract_view(Pid, ViewName, [], _ViewType) -> + ?LOG_ERROR("missing_named_view ~p", [ViewName]), + exit(Pid, kill), + exit(missing_named_view); +extract_view(Pid, ViewName, [View|Rest], ViewType) -> + case lists:member(ViewName, view_names(View, ViewType)) of + true -> + if ViewType == reduce -> + {index_of(ViewName, view_names(View, reduce)), View}; + true -> + View + end; + false -> + extract_view(Pid, ViewName, Rest, ViewType) + end. + +view_names(View, Type) when Type == red_map; Type == reduce -> + [Name || {Name, _} <- View#view.reduce_funs]; +view_names(View, map) -> + View#view.map_names. + +index_of(X, List) -> + index_of(X, List, 1). + +index_of(_X, [], _I) -> + not_found; +index_of(X, [X|_Rest], I) -> + I; +index_of(X, [_|Rest], I) -> + index_of(X, Rest, I+1). |