diff options
author | Adam Kocoloski <adam@cloudant.com> | 2010-06-07 16:03:51 -0400 |
---|---|---|
committer | Adam Kocoloski <adam@cloudant.com> | 2010-06-07 16:03:51 -0400 |
commit | ab14b5bfdb88d9f07b6885f4cc1208f80c8c72f4 (patch) | |
tree | 9b3676511006d715965631bf48ec54f509900f1b | |
parent | 38bde53e2429f60b3209d71c562218f3c7429945 (diff) |
map views w/ keys, also forgot to git add stuff, BugzID 10220
-rw-r--r-- | include/fabric.hrl | 1 | ||||
-rw-r--r-- | src/fabric_dict.erl | 32 | ||||
-rw-r--r-- | src/fabric_rpc.erl | 17 | ||||
-rw-r--r-- | src/fabric_view.erl | 153 | ||||
-rw-r--r-- | src/fabric_view_map.erl | 148 |
5 files changed, 349 insertions, 2 deletions
diff --git a/include/fabric.hrl b/include/fabric.hrl index fa8319b4..460cf578 100644 --- a/include/fabric.hrl +++ b/include/fabric.hrl @@ -22,6 +22,7 @@ skip, limit, stop_fun, + keydict, user_acc }). diff --git a/src/fabric_dict.erl b/src/fabric_dict.erl new file mode 100644 index 00000000..a4191682 --- /dev/null +++ b/src/fabric_dict.erl @@ -0,0 +1,32 @@ +-module(fabric_dict). +-compile(export_all). + +% Instead of ets, let's use an ordered keylist. We'll need to revisit if we +% have >> 100 shards, so a private interface is a good idea. - APK June 2010 + +init(Keys, InitialValue) -> + orddict:from_list([{Key, InitialValue} || Key <- Keys]). + + +decrement_all(Dict) -> + [{K,V-1} || {K,V} <- Dict]. + +erase(Key, Dict) -> + orddict:erase(Key, Dict). + +update_counter(Key, Incr, Dict0) -> + orddict:update_counter(Key, Incr, Dict0). + + +lookup_element(Key, Dict) -> + couch_util:get_value(Key, Dict). + + +any(Value, Dict) -> + lists:keymember(Value, 2, Dict). + +filter(Fun, Dict) -> + orddict:filter(Fun, Dict). + +fold(Fun, Acc0, Dict) -> + orddict:fold(Fun, Acc0, Dict). diff --git a/src/fabric_rpc.erl b/src/fabric_rpc.erl index 0eab5e6d..85c01906 100644 --- a/src/fabric_rpc.erl +++ b/src/fabric_rpc.erl @@ -42,13 +42,14 @@ all_docs(DbName, #view_query_args{keys=nil} = QueryArgs) -> {ok, Acc} = couch_db:enum_docs(Db, StartId, Dir, fun view_fold/3, Acc0), final_response(Total, Acc#view_acc.offset). 
-map_view(DbName, DDoc, ViewName, #view_query_args{keys=nil} = QueryArgs) -> +map_view(DbName, DDoc, ViewName, QueryArgs) -> {ok, Db} = couch_db:open(DbName, []), #view_query_args{ start_key = StartKey, start_docid = StartDocId, limit = Limit, skip = Skip, + keys = Keys, include_docs = IncludeDocs, direction = Dir, stale = Stale, @@ -69,7 +70,19 @@ map_view(DbName, DDoc, ViewName, #view_query_args{keys=nil} = QueryArgs) -> reduce_fun = fun couch_view:reduce_to_count/1, stop_fun = default_stop_fun(QueryArgs) }, - {ok, Acc} = couch_view:fold(View, Start, Dir, fun view_fold/3, Acc0), + case Keys of + nil -> + {ok, Acc} = couch_view:fold(View, Start, Dir, fun view_fold/3, Acc0); + _ -> + Acc = lists:foldl(fun(Key, AccIn) -> + KeyStart = {Key, StartDocId}, + KeyStop = default_stop_fun(QueryArgs#view_query_args{start_key=Key, + end_key=Key}), + {_Go, Out} = couch_view:fold(View, KeyStart, Dir, fun view_fold/3, + AccIn#view_acc{stop_fun = KeyStop}), + Out + end, Acc0, Keys) + end, final_response(Total, Acc#view_acc.offset). get_db_info(DbName) -> diff --git a/src/fabric_view.erl b/src/fabric_view.erl new file mode 100644 index 00000000..ae5ce361 --- /dev/null +++ b/src/fabric_view.erl @@ -0,0 +1,153 @@ +-module(fabric_view). + +-export([is_progress_possible/1, remove_overlapping_shards/2, maybe_send_row/1, + maybe_pause_worker/3, maybe_resume_worker/2, transform_row/1, keydict/1]). + +-include("fabric.hrl"). + +%% @doc looks for a fully covered keyrange in the list of counters +-spec is_progress_possible([{#shard{}, non_neg_integer()}]) -> boolean(). 
%% Folds over the shard ranges that are still present in Counters and reports
%% whether they chain together into a fully covered key range.
%%
%% Ranges are sorted by their lower bound (lists:ukeysort/2 also drops all but
%% the first range for any given lower bound) and merged left to right into a
%% {Head, Tail} pair. Head =:= Tail is the "covered everything" state; the
%% pair {foo, bar} is the failure sentinel and can never satisfy Head =:= Tail.
%%
%% Returns false when no ranges are left at all (for instance after the last
%% worker was erased), instead of crashing on the empty list.
is_progress_possible(Counters) ->
    Ranges = fabric_dict:fold(fun(#shard{range=[X,Y]}, _, A) -> [{X,Y}|A] end,
        [], Counters),
    Merge = fun
    (_, {H, T}) when H =:= T ->
        % this is the success condition, we can fast-forward
        {H, T};
    (_, {foo, bar}) ->
        % we've already declared failure
        {foo, bar};
    ({X,_}, {H, T}) when H < T, X > T ->
        % gap in the keyrange, we're dead
        {foo, bar};
    ({X,Y}, {H, T}) when H < T, X < Y ->
        % the normal condition, adding to the tail
        {H, erlang:max(T, Y)};
    ({X,Y}, {H, T}) when H < T, X > Y, Y >= H ->
        % we've wrapped all the way around, trigger success condition
        {H, H};
    ({X,Y}, {H, T}) when H < T, X > Y ->
        % this wraps the keyspace, but there's still a gap.  We're dead
        % TODO technically, another shard could be a superset of this one, and
        % we could still be alive.  Pretty unlikely though, and impossible if
        % we don't allow shards to wrap around the boundary
        {foo, bar}
    end,
    case lists:ukeysort(1, Ranges) of
    [] ->
        % nothing left to read from, so no progress can be made
        false;
    [First | Rest] ->
        {Head, Tail} = lists:foldl(Merge, First, Rest),
        Head =:= Tail
    end.

%% Drops every entry of Shards whose range overlaps the range of Shard0,
%% except Shard0 itself. Shards is a fabric_dict keyed by #shard{}; the values
%% are carried through untouched.
%%
%% An entry overlaps when its lower bound X or its upper bound Y falls inside
%% [A,B]. When B < A the reference range wraps around the end of the key
%% space, so "inside" means "at or after A, or before B".
-spec remove_overlapping_shards(#shard{}, [{#shard{}, any()}]) ->
    [{#shard{}, any()}].
remove_overlapping_shards(#shard{range=[A,B]} = Shard0, Shards) ->
    fabric_dict:filter(fun(#shard{range=[X,Y]} = Shard, _Value) ->
        if Shard =:= Shard0 ->
            % we can't remove ourselves
            true;
        A < B, X >= A, X < B ->
            % lower bound is inside our range
            false;
        A < B, Y > A, Y =< B ->
            % upper bound is inside our range
            false;
        B < A, (X >= A orelse X < B) ->
            % target shard wraps the key range, lower bound is inside.
            % The parentheses matter: a comma in a guard is a conjunction, so
            % without them the X >= A alternative would never be honoured.
            false;
        B < A, (Y > A orelse Y =< B) ->
            % target shard wraps the key range, upper bound is inside
            false;
        true ->
            true
        end
    end, Shards).
%% Decides whether Worker may continue after one of its rows was buffered.
%% When the worker's counter equals the configured buffer size, no reply is
%% sent and {Worker, From} is remembered in the blocked list; otherwise the
%% worker is acknowledged with ok right away. Returns the (possibly updated)
%% collector state.
maybe_pause_worker(Worker, From, State) ->
    #collector{buffer_size = BufferSize, counters = Counters} = State,
    case fabric_dict:lookup_element(Worker, Counters) of
    BufferSize ->
        State#collector{blocked = [{Worker,From} | State#collector.blocked]};
    _Count ->
        gen_server:reply(From, ok),
        State
    end.

%% Releases Worker if it is currently blocked and its counter has dropped
%% below half of the buffer size: the stored From is answered with ok and the
%% entry is removed from the blocked list. In every other case the state is
%% returned unchanged.
maybe_resume_worker(Worker, State) ->
    #collector{buffer_size = Buffer, counters = C, blocked = B} = State,
    case fabric_dict:lookup_element(Worker, C) of
    Count when Count < Buffer/2 ->
        case couch_util:get_value(Worker, B) of
        undefined ->
            State;
        From ->
            gen_server:reply(From, ok),
            State#collector{blocked = lists:keydelete(Worker, 1, B)}
        end;
    _Other ->
        State
    end.

%% Emits as many buffered rows to the user callback as currently possible.
%%
%% Returns {ok, State} when some worker's counter is 0 (nothing can be emitted
%% yet) and {stop, State} when the limit is exhausted, the rows are exhausted
%% or the stop fun fired, or the callback asked to stop. The callback is told
%% `complete` in the first three of those cases.
%%
%% Rows consumed while skip > 0 are discarded without being passed to the
%% callback. They only decrement skip: the limit counts rows that are
%% actually delivered, so it must not shrink while skipping.
maybe_send_row(#collector{limit=0} = State) ->
    #collector{user_acc=AccIn, callback=Callback} = State,
    {_, Acc} = Callback(complete, AccIn),
    {stop, State#collector{user_acc=Acc}};
maybe_send_row(State) ->
    #collector{
        callback = Callback,
        counters = Counters,
        skip = Skip,
        limit = Limit,
        user_acc = AccIn
    } = State,
    case fabric_dict:any(0, Counters) of
    true ->
        {ok, State};
    false ->
        case get_next_row(State) of
        complete ->
            {_, Acc} = Callback(complete, AccIn),
            {stop, State#collector{user_acc=Acc}};
        {_, NewState} when Skip > 0 ->
            maybe_send_row(NewState#collector{skip=Skip-1});
        {Row, NewState} ->
            case Callback(transform_row(Row), AccIn) of
            {stop, Acc} ->
                {stop, NewState#collector{user_acc=Acc, limit=Limit-1}};
            {ok, Acc} ->
                maybe_send_row(NewState#collector{user_acc=Acc, limit=Limit-1})
            end
        end
    end.

%% Builds a dict mapping each key to its zero-based position in Keys, or
%% returns undefined when Keys is nil. If a key occurs more than once the
%% position of its last occurrence is kept, because dict:store/3 overwrites.
keydict(nil) ->
    undefined;
keydict(Keys) ->
    {Dict,_} = lists:foldl(fun(K, {D,I}) -> {dict:store(K,I,D), I+1} end,
        {dict:new(),0}, Keys),
    Dict.
+ +%% internal %% + +get_next_row(#collector{rows = []}) -> + complete; +get_next_row(State) -> + #collector{ + rows = [Row|Rest], + counters = Counters0, + stop_fun = Stop + } = State, + Worker = Row#view_row.worker, + Counters1 = fabric_dict:update_counter(Worker, -1, Counters0), + NewState = maybe_resume_worker(Worker, State#collector{counters=Counters1}), + case Stop(Row) of + true -> + complete; + false -> + {Row, NewState#collector{rows = Rest}} + end. + +transform_row(#view_row{key=Key, id=undefined}) -> + {row, {[{key,Key}, {error,not_found}]}}; +transform_row(#view_row{key=Key, id=Id, value=Value, doc=undefined}) -> + {row, {[{key,Key}, {id,Id}, {value,Value}]}}; +transform_row(#view_row{key=Key, id=Id, value=Value, doc={error,Reason}}) -> + {row, {[{key,Key}, {id,Id}, {value,Value}, {error,Reason}]}}; +transform_row(#view_row{key=Key, id=Id, value=Value, doc=Doc}) -> + {row, {[{key,Key}, {id,Id}, {value,Value}, {doc,Doc}]}}. diff --git a/src/fabric_view_map.erl b/src/fabric_view_map.erl new file mode 100644 index 00000000..6ec7dfde --- /dev/null +++ b/src/fabric_view_map.erl @@ -0,0 +1,148 @@ +-module(fabric_view_map). + +-export([go/5]). + +-include("fabric.hrl"). 
%% Runs a map view query across the shards of DbName.
%%
%% Opens the design document, casts fabric_rpc:map_view to every shard
%% returned by partitions:all_parts/1, then collects the replies through
%% fabric_util:receive_loop with handle_message/3 as the handler. Returns
%% {ok, UserAcc} on success or whatever else the receive loop returned. The
%% workers are always cleaned up once the loop has been entered.
go(DbName, {GroupId, View}, Args, Callback, Acc0) ->
    {ok, DDoc} = fabric:open_doc(DbName, <<"_design/", GroupId/binary>>, []),
    StartWorker = fun(#shard{name=Name, node=Node} = Shard) ->
        RpcCall = {fabric_rpc, map_view, [Name, DDoc, View, Args]},
        Shard#shard{ref = rexi:cast(Node, RpcCall)}
    end,
    Workers = lists:map(StartWorker, partitions:all_parts(DbName)),
    Configured = couch_config:get("fabric", "map_buffer_size", "2"),
    #view_query_args{limit = Limit, skip = Skip, keys = Keys} = Args,
    BufferSize = list_to_integer(Configured),
    InitialState = #collector{
        query_args = Args,
        callback = Callback,
        buffer_size = BufferSize,
        counters = fabric_dict:init(Workers, 0),
        skip = Skip,
        limit = Limit,
        stop_fun = stop_fun(Args),
        keydict = fabric_view:keydict(Keys),
        user_acc = Acc0
    },
    try
        case fabric_util:receive_loop(Workers, #shard.ref,
                fun handle_message/3, InitialState, infinity, 5000) of
        {ok, FinalState} ->
            {ok, FinalState#collector.user_acc};
        Other ->
            Other
        end
    after
        fabric_util:cleanup(Workers)
    end.

%% Message handler for the receive loop started by go/5. One clause per kind
%% of message; each returns {ok | stop, State} or {error, Reason}.

%% A node went down: currently ignored.
handle_message({rexi_DOWN, _, _, _}, nil, State) ->
    % TODO see if progress can be made here, possibly by removing all shards
    % from that node and checking is_progress_possible
    {ok, State};

%% A worker died: forget it, and give up with dead_shards (after notifying
%% the callback) if the remaining workers can no longer make progress.
handle_message({rexi_EXIT, Reason}, Worker, State) ->
    ?LOG_ERROR("~p rexi_EXIT ~p", [?MODULE, Reason]),
    #collector{callback=Callback, counters=Before, user_acc=UserAcc} = State,
    Remaining = fabric_dict:erase(Worker, Before),
    case fabric_view:is_progress_possible(Remaining) of
    false ->
        Callback({error, dead_shards}, UserAcc),
        {error, dead_shards};
    true ->
        {ok, State#collector{counters = Remaining}}
    end;

%% First message of a worker. A worker that is no longer in the counters is
%% told to stop; otherwise it is acknowledged, overlapping shards are removed
%% and the totals are accumulated. Once no counter is 0 any more, the combined
%% total and offset are passed to the callback and all counters are
%% decremented again.
handle_message({total_and_offset, Tot, Off}, {Worker, From}, State) ->
    #collector{
        callback = Callback,
        counters = Counters0,
        total_rows = TotalSoFar,
        offset = OffsetSoFar,
        user_acc = UserAcc0
    } = State,
    case fabric_dict:lookup_element(Worker, Counters0) of
    undefined ->
        % this worker lost the race with other partition copies, terminate
        gen_server:reply(From, stop),
        {ok, State};
    0 ->
        gen_server:reply(From, ok),
        Bumped = fabric_dict:update_counter(Worker, 1, Counters0),
        Counters = fabric_view:remove_overlapping_shards(Worker, Bumped),
        Total = TotalSoFar + Tot,
        Offset = OffsetSoFar + Off,
        StillWaiting = fabric_dict:any(0, Counters),
        if StillWaiting ->
            {ok, State#collector{
                counters = Counters,
                total_rows = Total,
                offset = Offset
            }};
        true ->
            FinalOffset = erlang:min(Total, Offset + State#collector.skip),
            {Go, UserAcc} =
                Callback({total_and_offset, Total, FinalOffset}, UserAcc0),
            {Go, State#collector{
                counters = fabric_dict:decrement_all(Counters),
                total_rows = Total,
                offset = FinalOffset,
                user_acc = UserAcc
            }}
        end
    end;

%% A row arrived: tag it with its worker, merge it into the buffered rows,
%% count it, possibly pause the worker, then try to emit rows.
handle_message(#view_row{} = Row, {Worker, From}, State) ->
    #collector{
        query_args = #view_query_args{direction=Dir},
        counters = Counters0,
        rows = Buffered,
        keydict = KeyDict
    } = State,
    Tagged = Row#view_row{worker=Worker},
    Merged = merge_row(Dir, KeyDict, Tagged, Buffered),
    Counters = fabric_dict:update_counter(Worker, 1, Counters0),
    Counted = State#collector{rows=Merged, counters=Counters},
    Paused = fabric_view:maybe_pause_worker(Worker, From, Counted),
    fabric_view:maybe_send_row(Paused);

%% A worker finished: bump its counter once more and try to emit rows.
handle_message(complete, Worker, State) ->
    Counters = fabric_dict:update_counter(Worker, 1, State#collector.counters),
    fabric_view:maybe_send_row(State#collector{counters = Counters}).

%% Builds the stop predicate for a query from its direction, inclusive_end,
%% end_key and end_docid.
stop_fun(#view_query_args{} = QueryArgs) ->
    Dir = QueryArgs#view_query_args.direction,
    Inclusive = QueryArgs#view_query_args.inclusive_end,
    EndKey = QueryArgs#view_query_args.end_key,
    EndDocId = QueryArgs#view_query_args.end_docid,
    stop_fun(Dir, Inclusive, EndKey, EndDocId).
%% Returns a predicate over #view_row{} that tells the collector to stop.
%%
%% The predicate compares the row's [Key, Id] pair with [EndKey, EndDocId]
%% using couch_view:less_json/2; the argument order is swapped for rev. With
%% a non-inclusive end, any row whose key compares equal (==) to EndKey stops
%% the stream as well.
stop_fun(fwd, true, EndKey, EndDocId) ->
    End = [EndKey, EndDocId],
    fun(#view_row{key=Key, id=Id}) ->
        couch_view:less_json(End, [Key, Id])
    end;
stop_fun(fwd, false, EndKey, EndDocId) ->
    End = [EndKey, EndDocId],
    fun
    (#view_row{key=Key}) when Key == EndKey ->
        true;
    (#view_row{key=Key, id=Id}) ->
        couch_view:less_json(End, [Key, Id])
    end;
stop_fun(rev, true, EndKey, EndDocId) ->
    End = [EndKey, EndDocId],
    fun(#view_row{key=Key, id=Id}) ->
        couch_view:less_json([Key, Id], End)
    end;
stop_fun(rev, false, EndKey, EndDocId) ->
    End = [EndKey, EndDocId],
    fun
    (#view_row{key=Key}) when Key == EndKey ->
        true;
    (#view_row{key=Key, id=Id}) ->
        couch_view:less_json([Key, Id], End)
    end.

%% Merges a single row into the already ordered list of buffered rows with
%% lists:merge/3.
%%
%% Without a key dict (undefined) rows are ordered by [Key, Id] through
%% couch_view:less_json/2, reversed for rev. With a key dict, rows with
%% identical keys are ordered by doc id, and all others by the position
%% stored for their key in the dict; the direction is not consulted.
merge_row(fwd, undefined, Row, Rows) ->
    Ascending = fun(#view_row{key=KeyA, id=IdA}, #view_row{key=KeyB, id=IdB}) ->
        couch_view:less_json([KeyA, IdA], [KeyB, IdB])
    end,
    lists:merge(Ascending, [Row], Rows);
merge_row(rev, undefined, Row, Rows) ->
    Descending = fun(#view_row{key=KeyA, id=IdA}, #view_row{key=KeyB, id=IdB}) ->
        couch_view:less_json([KeyB, IdB], [KeyA, IdA])
    end,
    lists:merge(Descending, [Row], Rows);
merge_row(_, KeyDict, Row, Rows) ->
    ByKeyPosition = fun
    (#view_row{key=Same, id=IdA}, #view_row{key=Same, id=IdB}) ->
        % identical keys: fall back to the doc id
        IdA < IdB;
    (#view_row{key=A}, #view_row{key=B}) ->
        dict:fetch(A, KeyDict) < dict:fetch(B, KeyDict)
    end,
    lists:merge(ByKeyPosition, [Row], Rows).