diff options
author | Adam Kocoloski <adam@cloudant.com> | 2010-06-07 16:03:51 -0400 |
---|---|---|
committer | Adam Kocoloski <adam@cloudant.com> | 2010-06-07 16:03:51 -0400 |
commit | ab14b5bfdb88d9f07b6885f4cc1208f80c8c72f4 (patch) | |
tree | 9b3676511006d715965631bf48ec54f509900f1b /src/fabric_view.erl | |
parent | 38bde53e2429f60b3209d71c562218f3c7429945 (diff) |
map views w/ keys, also forgot to git add stuff, BugzID 10220
Diffstat (limited to 'src/fabric_view.erl')
-rw-r--r-- | src/fabric_view.erl | 153 |
1 files changed, 153 insertions, 0 deletions
diff --git a/src/fabric_view.erl b/src/fabric_view.erl new file mode 100644 index 00000000..ae5ce361 --- /dev/null +++ b/src/fabric_view.erl @@ -0,0 +1,153 @@ +-module(fabric_view). + +-export([is_progress_possible/1, remove_overlapping_shards/2, maybe_send_row/1, + maybe_pause_worker/3, maybe_resume_worker/2, transform_row/1, keydict/1]). + +-include("fabric.hrl"). + +%% @doc looks for a fully covered keyrange in the list of counters +-spec is_progress_possible([{#shard{}, non_neg_integer()}]) -> boolean(). +is_progress_possible(Counters) -> + Ranges = fabric_dict:fold(fun(#shard{range=[X,Y]}, _, A) -> [{X,Y}|A] end, + [], Counters), + [First | Rest] = lists:ukeysort(1, Ranges), + {Head, Tail} = lists:foldl(fun + (_, {Head, Tail}) when Head =:= Tail -> + % this is the success condition, we can fast-forward + {Head, Tail}; + (_, {foo, bar}) -> + % we've already declared failure + {foo, bar}; + ({X,_}, {Head, Tail}) when Head < Tail, X > Tail -> + % gap in the keyrange, we're dead + {foo, bar}; + ({X,Y}, {Head, Tail}) when Head < Tail, X < Y -> + % the normal condition, adding to the tail + {Head, erlang:max(Tail, Y)}; + ({X,Y}, {Head, Tail}) when Head < Tail, X > Y, Y >= Head -> + % we've wrapped all the way around, trigger success condition + {Head, Head}; + ({X,Y}, {Head, Tail}) when Head < Tail, X > Y -> + % this wraps the keyspace, but there's still a gap. We're dead + % TODO technically, another shard could be a superset of this one, and + % we could still be alive. Pretty unlikely though, and impossible if + % we don't allow shards to wrap around the boundary + {foo, bar} + end, First, Rest), + Head =:= Tail. + +-spec remove_overlapping_shards(#shard{}, [#shard{}]) -> [#shard{}]. +remove_overlapping_shards(#shard{range=[A,B]} = Shard0, Shards) -> + fabric_dict:filter(fun(#shard{range=[X,Y]} = Shard, _Value) -> + if Shard =:= Shard0 -> + % we can't remove ourselves + true; + A < B, X >= A, X < B -> + % lower bound is inside our range + false; + A < B, Y > A, Y =< B -> + % upper bound is inside our range + false; + B < A, X >= A orelse B < A, X < B -> + % target shard wraps the key range, lower bound is inside + false; + B < A, Y > A orelse B < A, Y =< B -> + % target shard wraps the key range, upper bound is inside + false; + true -> + true + end + end, Shards). + +maybe_pause_worker(Worker, From, State) -> + #collector{buffer_size = BufferSize, counters = Counters} = State, + case fabric_dict:lookup_element(Worker, Counters) of + BufferSize -> + State#collector{blocked = [{Worker,From} | State#collector.blocked]}; + _Count -> + gen_server:reply(From, ok), + State + end. + +maybe_resume_worker(Worker, State) -> + #collector{buffer_size = Buffer, counters = C, blocked = B} = State, + case fabric_dict:lookup_element(Worker, C) of + Count when Count < Buffer/2 -> + case couch_util:get_value(Worker, B) of + undefined -> + State; + From -> + gen_server:reply(From, ok), + State#collector{blocked = lists:keydelete(Worker, 1, B)} + end; + _Other -> + State + end. + +maybe_send_row(#collector{limit=0} = State) -> + #collector{user_acc=AccIn, callback=Callback} = State, + {_, Acc} = Callback(complete, AccIn), + {stop, State#collector{user_acc=Acc}}; +maybe_send_row(State) -> + #collector{ + callback = Callback, + counters = Counters, + skip = Skip, + limit = Limit, + user_acc = AccIn + } = State, + case fabric_dict:any(0, Counters) of + true -> + {ok, State}; + false -> + case get_next_row(State) of + complete -> + {_, Acc} = Callback(complete, AccIn), + {stop, State#collector{user_acc=Acc}}; + {_, NewState} when Skip > 0 -> + maybe_send_row(NewState#collector{skip=Skip-1, limit=Limit-1}); + {Row, NewState} -> + case Callback(transform_row(Row), AccIn) of + {stop, Acc} -> + {stop, NewState#collector{user_acc=Acc, limit=Limit-1}}; + {ok, Acc} -> + maybe_send_row(NewState#collector{user_acc=Acc, limit=Limit-1}) + end + end + end. + +keydict(nil) -> + undefined; +keydict(Keys) -> + {Dict,_} = lists:foldl(fun(K, {D,I}) -> {dict:store(K,I,D), I+1} end, + {dict:new(),0}, Keys), + Dict. + +%% internal %% + +get_next_row(#collector{rows = []}) -> + complete; +get_next_row(State) -> + #collector{ + rows = [Row|Rest], + counters = Counters0, + stop_fun = Stop + } = State, + Worker = Row#view_row.worker, + Counters1 = fabric_dict:update_counter(Worker, -1, Counters0), + NewState = maybe_resume_worker(Worker, State#collector{counters=Counters1}), + case Stop(Row) of + true -> + complete; + false -> + {Row, NewState#collector{rows = Rest}} + end. + +transform_row(#view_row{key=Key, id=undefined}) -> + {row, {[{key,Key}, {error,not_found}]}}; +transform_row(#view_row{key=Key, id=Id, value=Value, doc=undefined}) -> + {row, {[{key,Key}, {id,Id}, {value,Value}]}}; +transform_row(#view_row{key=Key, id=Id, value=Value, doc={error,Reason}}) -> + {row, {[{key,Key}, {id,Id}, {value,Value}, {error,Reason}]}}; +transform_row(#view_row{key=Key, id=Id, value=Value, doc=Doc}) -> + {row, {[{key,Key}, {id,Id}, {value,Value}, {doc,Doc}]}}. |