diff options
Diffstat (limited to 'src/fabric_view_all_docs.erl')
-rw-r--r-- | src/fabric_view_all_docs.erl | 199 |
1 files changed, 22 insertions, 177 deletions
diff --git a/src/fabric_view_all_docs.erl b/src/fabric_view_all_docs.erl index 6cdc66c5..99834286 100644 --- a/src/fabric_view_all_docs.erl +++ b/src/fabric_view_all_docs.erl @@ -16,9 +16,10 @@ go(DbName, #view_query_args{keys=nil} = QueryArgs, Callback, Acc0) -> query_args = QueryArgs, callback = Callback, buffer_size = list_to_integer(BufferSize), - counters = init_counters(Workers), + counters = fabric_dict:init(Workers, 0), skip = Skip, limit = Limit, + stop_fun = stop_fun(QueryArgs), user_acc = Acc0 }, try fabric_util:receive_loop(Workers, #shard.ref, fun handle_message/3, @@ -58,8 +59,8 @@ handle_message({rexi_DOWN, _, _, _}, nil, State) -> handle_message({rexi_EXIT, _}, Worker, State) -> #collector{callback=Callback, counters=Counters0, user_acc=Acc} = State, - Counters = remove(Worker, Counters0), - case is_progress_possible(Counters) of + Counters = fabric_dict:erase(Worker, Counters0), + case fabric_view:is_progress_possible(Counters) of true -> {ok, State#collector{counters = Counters}}; false -> @@ -75,18 +76,18 @@ handle_message({total_and_offset, Tot, Off}, {Worker, From}, State) -> offset = Offset0, user_acc = AccIn } = State, - case lookup_element(Worker, Counters0) of + case fabric_dict:lookup_element(Worker, Counters0) of undefined -> % this worker lost the race with other partition copies, terminate gen_server:reply(From, stop), {ok, State}; 0 -> gen_server:reply(From, ok), - Counters1 = update_counter(Worker, 1, Counters0), - Counters2 = remove_overlapping_shards(Worker, Counters1), + Counters1 = fabric_dict:update_counter(Worker, 1, Counters0), + Counters2 = fabric_view:remove_overlapping_shards(Worker, Counters1), Total = Total0 + Tot, Offset = Offset0 + Off, - case waiting_on_shards(Counters2) of + case fabric_dict:any(0, Counters2) of true -> {ok, State#collector{ counters = Counters2, @@ -97,7 +98,7 @@ handle_message({total_and_offset, Tot, Off}, {Worker, From}, State) -> FinalOffset = erlang:min(Total, Offset+State#collector.skip), {Go, Acc} = Callback({total_and_offset, Total, FinalOffset}, AccIn), {Go, State#collector{ - counters = decrement_all_counters(Counters2), + counters = fabric_dict:decrement_all(Counters2), total_rows = Total, offset = FinalOffset, user_acc = Acc @@ -109,158 +110,30 @@ handle_message(#view_row{} = Row, {Worker, From}, State) -> #collector{query_args = Args, counters = Counters0, rows = Rows0} = State, Dir = Args#view_query_args.direction, Rows = merge_row(Dir, Row#view_row{worker=Worker}, Rows0), - Counters1 = update_counter(Worker, 1, Counters0), + Counters1 = fabric_dict:update_counter(Worker, 1, Counters0), State1 = State#collector{rows=Rows, counters=Counters1}, - State2 = maybe_pause_worker(Worker, From, State1), - maybe_send_row(State2); + State2 = fabric_view:maybe_pause_worker(Worker, From, State1), + fabric_view:maybe_send_row(State2); handle_message(complete, Worker, State) -> - Counters = update_counter(Worker, 1, State#collector.counters), - maybe_send_row(State#collector{counters = Counters}). + Counters = fabric_dict:update_counter(Worker, 1, State#collector.counters), + fabric_view:maybe_send_row(State#collector{counters = Counters}). -maybe_pause_worker(Worker, From, State) -> - #collector{buffer_size = BufferSize, counters = Counters} = State, - case lookup_element(Worker, Counters) of - BufferSize -> - State#collector{blocked = [{Worker,From} | State#collector.blocked]}; - _Count -> - gen_server:reply(From, ok), - State - end. - -maybe_resume_worker(Worker, State) -> - #collector{buffer_size = Buffer, counters = C, blocked = B} = State, - case lookup_element(Worker, C) of - Count when Count < Buffer/2 -> - case couch_util:get_value(Worker, B) of - undefined -> - State; - From -> - gen_server:reply(From, ok), - State#collector{blocked = lists:keydelete(Worker, 1, B)} - end; - _Other -> - State - end. - -maybe_send_row(#collector{limit=0} = State) -> - #collector{user_acc=AccIn, callback=Callback} = State, - {_, Acc} = Callback(complete, AccIn), - {stop, State#collector{user_acc=Acc}}; -maybe_send_row(State) -> - #collector{ - callback = Callback, - counters = Counters, - skip = Skip, - limit = Limit, - user_acc = AccIn - } = State, - case waiting_on_shards(Counters) of - true -> - {ok, State}; - false -> - case get_next_row(State) of - complete -> - {_, Acc} = Callback(complete, AccIn), - {stop, State#collector{user_acc=Acc}}; - {_, NewState} when Skip > 0 -> - maybe_send_row(NewState#collector{skip=Skip-1, limit=Limit-1}); - {Row, NewState} -> - case Callback(transform_row(Row), AccIn) of - {stop, Acc} -> - {stop, NewState#collector{user_acc=Acc, limit=Limit-1}}; - {ok, Acc} -> - maybe_send_row(NewState#collector{user_acc=Acc, limit=Limit-1}) - end - end - end. - -get_next_row(#collector{rows = []}) -> - complete; -get_next_row(State) -> - #collector{query_args=Args, rows=[Row|Rest], counters=Counters0} = State, - Worker = Row#view_row.worker, - Counters1 = update_counter(Worker, -1, Counters0), - NewState = maybe_resume_worker(Worker, State#collector{counters=Counters1}), - case stop(Args, Row) of - true -> - complete; - false -> - {Row, NewState#collector{rows = Rest}} +stop_fun(#view_query_args{direction=fwd, end_key=EndKey}) -> + fun(#view_row{id=Id}) -> + couch_db_updater:less_docid(EndKey, Id) + end; +stop_fun(#view_query_args{direction=rev, end_key=EndKey}) -> + fun(#view_row{id=Id}) -> + couch_db_updater:less_docid(Id, EndKey) end. -stop(#view_query_args{direction=fwd, end_key=EndKey}, #view_row{id=Id}) -> - couch_db_updater:less_docid(EndKey, Id); -stop(#view_query_args{direction=rev, end_key=EndKey}, #view_row{id=Id}) -> - couch_db_updater:less_docid(Id, EndKey). - -transform_row(#view_row{key=Key, id=undefined}) -> - {row, {[{key,Key}, {error,not_found}]}}; -transform_row(#view_row{key=Key, id=Id, value=Value, doc=undefined}) -> - {row, {[{key,Key}, {id,Id}, {value,Value}]}}; -transform_row(#view_row{key=Key, id=Id, value=Value, doc={error,Reason}}) -> - {row, {[{key,Key}, {id,Id}, {value,Value}, {error,Reason}]}}; -transform_row(#view_row{key=Key, id=Id, value=Value, doc=Doc}) -> - {row, {[{key,Key}, {id,Id}, {value,Value}, {doc,Doc}]}}. - merge_row(fwd, Row, Rows) -> lists:keymerge(#view_row.id, [Row], Rows); merge_row(rev, Row, Rows) -> lists:rkeymerge(#view_row.id, [Row], Rows). -remove_overlapping_shards(#shard{range=[A,B]} = Shard0, Shards) -> - filter(fun(#shard{range=[X,Y]} = Shard, _Value) -> - if Shard =:= Shard0 -> - % we can't remove ourselves - true; - A < B, X >= A, X < B -> - % lower bound is inside our range - false; - A < B, Y > A, Y =< B -> - % upper bound is inside our range - false; - B < A, X >= A orelse B < A, X < B -> - % target shard wraps the key range, lower bound is inside - false; - B < A, Y > A orelse B < A, Y =< B -> - % target shard wraps the key range, upper bound is inside - false; - true -> - true - end - end, Shards). - -%% @doc looks for a fully covered keyrange in the list of counters --spec is_progress_possible([{#shard{}, non_neg_integer()}]) -> boolean(). -is_progress_possible(Counters) -> - Ranges = fold(fun(#shard{range=[X,Y]}, _, A) -> [{X,Y}|A] end, [], Counters), - [First | Rest] = lists:ukeysort(1, Ranges), - {Head, Tail} = lists:foldl(fun - (_, {Head, Tail}) when Head =:= Tail -> - % this is the success condition, we can fast-forward - {Head, Tail}; - (_, {foo, bar}) -> - % we've already declared failure - {foo, bar}; - ({X,_}, {Head, Tail}) when Head < Tail, X > Tail -> - % gap in the keyrange, we're dead - {foo, bar}; - ({X,Y}, {Head, Tail}) when Head < Tail, X < Y -> - % the normal condition, adding to the tail - {Head, erlang:max(Tail, Y)}; - ({X,Y}, {Head, Tail}) when Head < Tail, X > Y, Y >= Head -> - % we've wrapped all the way around, trigger success condition - {Head, Head}; - ({X,Y}, {Head, Tail}) when Head < Tail, X > Y -> - % this wraps the keyspace, but there's still a gap. We're dead - % TODO technically, another shard could be a superset of this one, and - % we could still be alive. Pretty unlikely though, and impossible if - % we don't allow shards to wrap around the boundary - {foo, bar} - end, First, Rest), - Head =:= Tail. - doc_receive_loop([], _, _, _, Acc) -> {ok, Acc}; doc_receive_loop(_, _, 0, _, Acc) -> @@ -273,7 +146,7 @@ doc_receive_loop([{Pid,Ref}|Rest], Skip, Limit, Callback, Acc) when Skip > 0 -> end; doc_receive_loop([{Pid,Ref}|Rest], 0, Limit, Callback, AccIn) -> receive {'DOWN', Ref, process, Pid, #view_row{} = Row} -> - case Callback(transform_row(Row), AccIn) of + case Callback(fabric_view:transform_row(Row), AccIn) of {ok, Acc} -> doc_receive_loop(Rest, 0, Limit-1, Callback, Acc); {stop, Acc} -> @@ -300,31 +173,3 @@ open_doc(DbName, Id, IncludeDocs) -> #view_row{key=Id, id=Id, value=Value} end, exit(if IncludeDocs -> Row#view_row{doc=Doc}; true -> Row end). - - -% Instead of ets, let's use an ordered keylist. We'll need to revisit if we -% have >> 100 shards, so a private interface is a good idea. - APK June 2010 - -init_counters(Keys) -> - orddict:from_list([{Key,0} || Key <- Keys]). - -decrement_all_counters(Dict) -> - [{K,V-1} || {K,V} <- Dict]. - -update_counter(Key, Incr, Dict0) -> - orddict:update_counter(Key, Incr, Dict0). - -lookup_element(Key, Dict) -> - couch_util:get_value(Key, Dict). - -waiting_on_shards(Dict) -> - lists:keymember(0, 2, Dict). - -remove(Shard, Dict) -> - orddict:erase(Shard, Dict). - -filter(Fun, Dict) -> - orddict:filter(Fun, Dict). - -fold(Fun, Acc0, Dict) -> - orddict:fold(Fun, Acc0, Dict). |