diff options
author | Adam Kocoloski <adam@cloudant.com> | 2010-10-25 15:46:05 -0400 |
---|---|---|
committer | Adam Kocoloski <adam@cloudant.com> | 2010-10-25 21:45:32 -0400 |
commit | ebac05f686b56791511cb9b599dfb5a742dcfc96 (patch) | |
tree | 00a789cd058f98fa014d927f094f9e6e9f91f6f2 /apps/fabric/src/fabric_view.erl | |
parent | 952a85381ff4b5b34426000b1dee73c9e74becdd (diff) |
use get-deps to pull down individual cloudant projects
Diffstat (limited to 'apps/fabric/src/fabric_view.erl')
-rw-r--r-- | apps/fabric/src/fabric_view.erl | 235 |
1 files changed, 0 insertions, 235 deletions
diff --git a/apps/fabric/src/fabric_view.erl b/apps/fabric/src/fabric_view.erl deleted file mode 100644 index e5f19b73..00000000 --- a/apps/fabric/src/fabric_view.erl +++ /dev/null @@ -1,235 +0,0 @@ -% Copyright 2010 Cloudant -% -% Licensed under the Apache License, Version 2.0 (the "License"); you may not -% use this file except in compliance with the License. You may obtain a copy of -% the License at -% -% http://www.apache.org/licenses/LICENSE-2.0 -% -% Unless required by applicable law or agreed to in writing, software -% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -% License for the specific language governing permissions and limitations under -% the License. - --module(fabric_view). - --export([is_progress_possible/1, remove_overlapping_shards/2, maybe_send_row/1, - maybe_pause_worker/3, maybe_resume_worker/2, transform_row/1, keydict/1, - extract_view/4]). - --include("fabric.hrl"). --include_lib("mem3/include/mem3.hrl"). --include_lib("couch/include/couch_db.hrl"). - -%% @doc looks for a fully covered keyrange in the list of counters --spec is_progress_possible([{#shard{}, term()}]) -> boolean(). -is_progress_possible([]) -> - false; -is_progress_possible(Counters) -> - Ranges = fabric_dict:fold(fun(#shard{range=[X,Y]}, _, A) -> [{X,Y}|A] end, - [], Counters), - [{Start, Tail0} | Rest] = lists:ukeysort(1, Ranges), - Result = lists:foldl(fun - (_, fail) -> - % we've already declared failure - fail; - (_, complete) -> - % this is the success condition, we can fast-forward - complete; - ({X,_}, Tail) when X > (Tail+1) -> - % gap in the keyrange, we're dead - fail; - ({_,Y}, Tail) -> - case erlang:max(Tail, Y) of - End when (End+1) =:= (2 bsl 31) -> - complete; - Else -> - % the normal condition, adding to the tail - Else - end - end, if (Tail0+1) =:= (2 bsl 31) -> complete; true -> Tail0 end, Rest), - (Start =:= 0) andalso (Result =:= complete). - --spec remove_overlapping_shards(#shard{}, [{#shard{}, any()}]) -> - [{#shard{}, any()}]. -remove_overlapping_shards(#shard{range=[A,B]} = Shard0, Shards) -> - fabric_dict:filter(fun(#shard{range=[X,Y]} = Shard, _Value) -> - if Shard =:= Shard0 -> - % we can't remove ourselves - true; - A < B, X >= A, X < B -> - % lower bound is inside our range - false; - A < B, Y > A, Y =< B -> - % upper bound is inside our range - false; - B < A, X >= A orelse B < A, X < B -> - % target shard wraps the key range, lower bound is inside - false; - B < A, Y > A orelse B < A, Y =< B -> - % target shard wraps the key range, upper bound is inside - false; - true -> - true - end - end, Shards). - -maybe_pause_worker(Worker, From, State) -> - #collector{buffer_size = BufferSize, counters = Counters} = State, - case fabric_dict:lookup_element(Worker, Counters) of - BufferSize -> - State#collector{blocked = [{Worker,From} | State#collector.blocked]}; - _Count -> - gen_server:reply(From, ok), - State - end. - -maybe_resume_worker(Worker, State) -> - #collector{buffer_size = Buffer, counters = C, blocked = B} = State, - case fabric_dict:lookup_element(Worker, C) of - Count when Count < Buffer/2 -> - case couch_util:get_value(Worker, B) of - undefined -> - State; - From -> - gen_server:reply(From, ok), - State#collector{blocked = lists:keydelete(Worker, 1, B)} - end; - _Other -> - State - end. - -maybe_send_row(#collector{limit=0} = State) -> - #collector{user_acc=AccIn, callback=Callback} = State, - {_, Acc} = Callback(complete, AccIn), - {stop, State#collector{user_acc=Acc}}; -maybe_send_row(State) -> - #collector{ - callback = Callback, - counters = Counters, - skip = Skip, - limit = Limit, - user_acc = AccIn - } = State, - case fabric_dict:any(0, Counters) of - true -> - {ok, State}; - false -> - try get_next_row(State) of - {_, NewState} when Skip > 0 -> - maybe_send_row(NewState#collector{skip=Skip-1, limit=Limit-1}); - {Row, NewState} -> - case Callback(transform_row(Row), AccIn) of - {stop, Acc} -> - {stop, NewState#collector{user_acc=Acc, limit=Limit-1}}; - {ok, Acc} -> - maybe_send_row(NewState#collector{user_acc=Acc, limit=Limit-1}) - end - catch complete -> - {_, Acc} = Callback(complete, AccIn), - {stop, State#collector{user_acc=Acc}} - end - end. - -keydict(nil) -> - undefined; -keydict(Keys) -> - {Dict,_} = lists:foldl(fun(K, {D,I}) -> {dict:store(K,I,D), I+1} end, - {dict:new(),0}, Keys), - Dict. - -%% internal %% - -get_next_row(#collector{rows = []}) -> - throw(complete); -get_next_row(#collector{reducer = RedSrc} = St) when RedSrc =/= undefined -> - #collector{ - query_args = #view_query_args{direction=Dir}, - keys = Keys, - rows = RowDict, - os_proc = Proc, - counters = Counters0 - } = St, - {Key, RestKeys} = find_next_key(Keys, Dir, RowDict), - case dict:find(Key, RowDict) of - {ok, Records} -> - NewRowDict = dict:erase(Key, RowDict), - Counters = lists:foldl(fun(#view_row{worker=Worker}, CountersAcc) -> - fabric_dict:update_counter(Worker, -1, CountersAcc) - end, Counters0, Records), - Wrapped = [[V] || #view_row{value=V} <- Records], - {ok, [Reduced]} = couch_query_servers:rereduce(Proc, [RedSrc], Wrapped), - NewSt = St#collector{keys=RestKeys, rows=NewRowDict, counters=Counters}, - NewState = lists:foldl(fun(#view_row{worker=Worker}, StateAcc) -> - maybe_resume_worker(Worker, StateAcc) - end, NewSt, Records), - {#view_row{key=Key, id=reduced, value=Reduced}, NewState}; - error -> - get_next_row(St#collector{keys=RestKeys}) - end; -get_next_row(State) -> - #collector{rows = [Row|Rest], counters = Counters0} = State, - Worker = Row#view_row.worker, - Counters1 = fabric_dict:update_counter(Worker, -1, Counters0), - NewState = maybe_resume_worker(Worker, State#collector{counters=Counters1}), - {Row, NewState#collector{rows = Rest}}. - -find_next_key(nil, Dir, RowDict) -> - case lists:sort(sort_fun(Dir), dict:fetch_keys(RowDict)) of - [] -> - throw(complete); - [Key|_] -> - {Key, nil} - end; -find_next_key([], _, _) -> - throw(complete); -find_next_key([Key|Rest], _, _) -> - {Key, Rest}. - -transform_row(#view_row{key=Key, id=reduced, value=Value}) -> - {row, {[{key,Key}, {value,Value}]}}; -transform_row(#view_row{key=Key, id=undefined}) -> - {row, {[{key,Key}, {error,not_found}]}}; -transform_row(#view_row{key=Key, id=Id, value=Value, doc=undefined}) -> - {row, {[{id,Id}, {key,Key}, {value,Value}]}}; -transform_row(#view_row{key=Key, id=Id, value=Value, doc={error,Reason}}) -> - {row, {[{id,Id}, {key,Key}, {value,Value}, {error,Reason}]}}; -transform_row(#view_row{key=Key, id=Id, value=Value, doc=Doc}) -> - {row, {[{id,Id}, {key,Key}, {value,Value}, {doc,Doc}]}}. - -sort_fun(fwd) -> - fun(A,A) -> true; (A,B) -> couch_view:less_json(A,B) end; -sort_fun(rev) -> - fun(A,A) -> true; (A,B) -> couch_view:less_json(B,A) end. - -extract_view(Pid, ViewName, [], _ViewType) -> - ?LOG_ERROR("missing_named_view ~p", [ViewName]), - exit(Pid, kill), - exit(missing_named_view); -extract_view(Pid, ViewName, [View|Rest], ViewType) -> - case lists:member(ViewName, view_names(View, ViewType)) of - true -> - if ViewType == reduce -> - {index_of(ViewName, view_names(View, reduce)), View}; - true -> - View - end; - false -> - extract_view(Pid, ViewName, Rest, ViewType) - end. - -view_names(View, Type) when Type == red_map; Type == reduce -> - [Name || {Name, _} <- View#view.reduce_funs]; -view_names(View, map) -> - View#view.map_names. - -index_of(X, List) -> - index_of(X, List, 1). - -index_of(_X, [], _I) -> - not_found; -index_of(X, [X|_Rest], I) -> - I; -index_of(X, [_|Rest], I) -> - index_of(X, Rest, I+1). |