summaryrefslogtreecommitdiff
path: root/deps/fabric/src/fabric_view.erl
diff options
context:
space:
mode:
Diffstat (limited to 'deps/fabric/src/fabric_view.erl')
-rw-r--r--deps/fabric/src/fabric_view.erl362
1 files changed, 362 insertions, 0 deletions
diff --git a/deps/fabric/src/fabric_view.erl b/deps/fabric/src/fabric_view.erl
new file mode 100644
index 00000000..fa2127e7
--- /dev/null
+++ b/deps/fabric/src/fabric_view.erl
@@ -0,0 +1,362 @@
+% Copyright 2010 Cloudant
+%
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+% http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+-module(fabric_view).
+
+-export([is_progress_possible/1, remove_overlapping_shards/2, maybe_send_row/1,
+ maybe_pause_worker/3, maybe_resume_worker/2, transform_row/1, keydict/1,
+ extract_view/4, get_shards/2, remove_down_shards/2]).
+
+-include("fabric.hrl").
+-include_lib("mem3/include/mem3.hrl").
+-include_lib("couch/include/couch_db.hrl").
+
+-spec remove_down_shards(#collector{}, node()) ->
+ {ok, #collector{}} | {error, any()}.
+remove_down_shards(Collector, BadNode) ->
+ #collector{callback=Callback, counters=Counters, user_acc=Acc} = Collector,
+ case fabric_util:remove_down_workers(Counters, BadNode) of
+ {ok, NewCounters} ->
+ {ok, Collector#collector{counters = NewCounters}};
+ error ->
+ Reason = {nodedown, <<"progress not possible">>},
+ Callback({error, Reason}, Acc)
+ end.
+
+%% @doc looks for a fully covered keyrange in the list of counters
+-spec is_progress_possible([{#shard{}, term()}]) -> boolean().
+is_progress_possible([]) ->
+ false;
+is_progress_possible(Counters) ->
+ Ranges = fabric_dict:fold(fun(#shard{range=[X,Y]}, _, A) -> [{X,Y}|A] end,
+ [], Counters),
+ [{Start, Tail0} | Rest] = lists:ukeysort(1, Ranges),
+ Result = lists:foldl(fun
+ (_, fail) ->
+ % we've already declared failure
+ fail;
+ (_, complete) ->
+ % this is the success condition, we can fast-forward
+ complete;
+ ({X,_}, Tail) when X > (Tail+1) ->
+ % gap in the keyrange, we're dead
+ fail;
+ ({_,Y}, Tail) ->
+ case erlang:max(Tail, Y) of
+ End when (End+1) =:= (2 bsl 31) ->
+ complete;
+ Else ->
+ % the normal condition, adding to the tail
+ Else
+ end
+ end, if (Tail0+1) =:= (2 bsl 31) -> complete; true -> Tail0 end, Rest),
+ (Start =:= 0) andalso (Result =:= complete).
+
+-spec remove_overlapping_shards(#shard{}, [{#shard{}, any()}]) ->
+ [{#shard{}, any()}].
+remove_overlapping_shards(#shard{range=[A,B]} = Shard0, Shards) ->
+ fabric_dict:filter(fun(#shard{range=[X,Y], node=Node, ref=Ref} = Shard, _) ->
+ if Shard =:= Shard0 ->
+ % we can't remove ourselves
+ true;
+ A < B, X >= A, X < B ->
+ % lower bound is inside our range
+ rexi:kill(Node, Ref),
+ false;
+ A < B, Y > A, Y =< B ->
+ % upper bound is inside our range
+ rexi:kill(Node, Ref),
+ false;
+ B < A, X >= A orelse B < A, X < B ->
+ % target shard wraps the key range, lower bound is inside
+ rexi:kill(Node, Ref),
+ false;
+ B < A, Y > A orelse B < A, Y =< B ->
+ % target shard wraps the key range, upper bound is inside
+ rexi:kill(Node, Ref),
+ false;
+ true ->
+ true
+ end
+ end, Shards).
+
+maybe_pause_worker(Worker, From, State) ->
+ #collector{buffer_size = BufferSize, counters = Counters} = State,
+ case fabric_dict:lookup_element(Worker, Counters) of
+ BufferSize ->
+ State#collector{blocked = [{Worker,From} | State#collector.blocked]};
+ _Count ->
+ gen_server:reply(From, ok),
+ State
+ end.
+
+maybe_resume_worker(Worker, State) ->
+ #collector{buffer_size = Buffer, counters = C, blocked = B} = State,
+ case fabric_dict:lookup_element(Worker, C) of
+ Count when Count < Buffer/2 ->
+ case couch_util:get_value(Worker, B) of
+ undefined ->
+ State;
+ From ->
+ gen_server:reply(From, ok),
+ State#collector{blocked = lists:keydelete(Worker, 1, B)}
+ end;
+ _Other ->
+ State
+ end.
+
+maybe_send_row(#collector{limit=0} = State) ->
+ #collector{counters=Counters, user_acc=AccIn, callback=Callback} = State,
+ case fabric_dict:any(0, Counters) of
+ true ->
+ % we still need to send the total/offset header
+ {ok, State};
+ false ->
+ {_, Acc} = Callback(complete, AccIn),
+ {stop, State#collector{user_acc=Acc}}
+ end;
+maybe_send_row(State) ->
+ #collector{
+ callback = Callback,
+ counters = Counters,
+ skip = Skip,
+ limit = Limit,
+ user_acc = AccIn
+ } = State,
+ case fabric_dict:any(0, Counters) of
+ true ->
+ {ok, State};
+ false ->
+ try get_next_row(State) of
+ {_, NewState} when Skip > 0 ->
+ maybe_send_row(NewState#collector{skip=Skip-1});
+ {Row, NewState} ->
+ case Callback(transform_row(possibly_embed_doc(NewState,Row)), AccIn) of
+ {stop, Acc} ->
+ {stop, NewState#collector{user_acc=Acc, limit=Limit-1}};
+ {ok, Acc} ->
+ maybe_send_row(NewState#collector{user_acc=Acc, limit=Limit-1})
+ end
+ catch complete ->
+ {_, Acc} = Callback(complete, AccIn),
+ {stop, State#collector{user_acc=Acc}}
+ end
+ end.
+
+%% if include_docs=true is used when keys and
+%% the values contain "_id" then use the "_id"s
+%% to retrieve documents and embed in result
+possibly_embed_doc(_State,
+ #view_row{id=reduced}=Row) ->
+ Row;
+possibly_embed_doc(_State,
+ #view_row{value=undefined}=Row) ->
+ Row;
+possibly_embed_doc(#collector{db_name=DbName, query_args=Args},
+ #view_row{key=_Key, id=_Id, value=Value, doc=_Doc}=Row) ->
+ #view_query_args{include_docs=IncludeDocs} = Args,
+ case IncludeDocs andalso is_tuple(Value) of
+ true ->
+ {Props} = Value,
+ Rev0 = couch_util:get_value(<<"_rev">>, Props),
+ case couch_util:get_value(<<"_id">>,Props) of
+ undefined -> Row;
+ IncId ->
+ % use separate process to call fabric:open_doc
+ % to not interfere with current call
+ {Pid, Ref} = spawn_monitor(fun() ->
+ exit(
+ case Rev0 of
+ undefined ->
+ case fabric:open_doc(DbName, IncId, []) of
+ {ok, NewDoc} ->
+ Row#view_row{doc=couch_doc:to_json_obj(NewDoc,[])};
+ {not_found, _} ->
+ Row#view_row{doc=null}
+ end;
+ Rev0 ->
+ Rev = couch_doc:parse_rev(Rev0),
+ case fabric:open_revs(DbName, IncId, [Rev], []) of
+ {ok, [{ok, NewDoc}]} ->
+ Row#view_row{doc=couch_doc:to_json_obj(NewDoc,[])};
+ {ok, [{{not_found, _}, Rev}]} ->
+ Row#view_row{doc=null}
+ end
+ end) end),
+ receive {'DOWN',Ref,process,Pid, Resp} ->
+ Resp
+ end
+ end;
+ _ -> Row
+ end.
+
+
+keydict(nil) ->
+ undefined;
+keydict(Keys) ->
+ {Dict,_} = lists:foldl(fun(K, {D,I}) -> {dict:store(K,I,D), I+1} end,
+ {dict:new(),0}, Keys),
+ Dict.
+
+%% internal %%
+
+get_next_row(#collector{rows = []}) ->
+ throw(complete);
+get_next_row(#collector{reducer = RedSrc} = St) when RedSrc =/= undefined ->
+ #collector{
+ query_args = #view_query_args{direction=Dir},
+ keys = Keys,
+ rows = RowDict,
+ os_proc = Proc,
+ counters = Counters0
+ } = St,
+ {Key, RestKeys} = find_next_key(Keys, Dir, RowDict),
+ case dict:find(Key, RowDict) of
+ {ok, Records} ->
+ NewRowDict = dict:erase(Key, RowDict),
+ Counters = lists:foldl(fun(#view_row{worker=Worker}, CountersAcc) ->
+ fabric_dict:update_counter(Worker, -1, CountersAcc)
+ end, Counters0, Records),
+ Wrapped = [[V] || #view_row{value=V} <- Records],
+ {ok, [Reduced]} = couch_query_servers:rereduce(Proc, [RedSrc], Wrapped),
+ NewSt = St#collector{keys=RestKeys, rows=NewRowDict, counters=Counters},
+ NewState = lists:foldl(fun(#view_row{worker=Worker}, StateAcc) ->
+ maybe_resume_worker(Worker, StateAcc)
+ end, NewSt, Records),
+ {#view_row{key=Key, id=reduced, value=Reduced}, NewState};
+ error ->
+ get_next_row(St#collector{keys=RestKeys})
+ end;
+get_next_row(State) ->
+ #collector{rows = [Row|Rest], counters = Counters0} = State,
+ Worker = Row#view_row.worker,
+ Counters1 = fabric_dict:update_counter(Worker, -1, Counters0),
+ NewState = maybe_resume_worker(Worker, State#collector{counters=Counters1}),
+ {Row, NewState#collector{rows = Rest}}.
+
+find_next_key(nil, Dir, RowDict) ->
+ case lists:sort(sort_fun(Dir), dict:fetch_keys(RowDict)) of
+ [] ->
+ throw(complete);
+ [Key|_] ->
+ {Key, nil}
+ end;
+find_next_key([], _, _) ->
+ throw(complete);
+find_next_key([Key|Rest], _, _) ->
+ {Key, Rest}.
+
+transform_row(#view_row{key=Key, id=reduced, value=Value}) ->
+ {row, {[{key,Key}, {value,Value}]}};
+transform_row(#view_row{key=Key, id=undefined}) ->
+ {row, {[{key,Key}, {error,not_found}]}};
+transform_row(#view_row{key=Key, id=Id, value=Value, doc=undefined}) ->
+ {row, {[{id,Id}, {key,Key}, {value,Value}]}};
+transform_row(#view_row{key=Key, id=Id, value=Value, doc={error,Reason}}) ->
+ {row, {[{id,Id}, {key,Key}, {value,Value}, {error,Reason}]}};
+transform_row(#view_row{key=Key, id=Id, value=Value, doc=Doc}) ->
+ {row, {[{id,Id}, {key,Key}, {value,Value}, {doc,Doc}]}}.
+
+
+sort_fun(fwd) ->
+ fun(A,A) -> true; (A,B) -> couch_view:less_json(A,B) end;
+sort_fun(rev) ->
+ fun(A,A) -> true; (A,B) -> couch_view:less_json(B,A) end.
+
+extract_view(Pid, ViewName, [], _ViewType) ->
+ twig:log(error, "missing_named_view ~p", [ViewName]),
+ exit(Pid, kill),
+ exit(missing_named_view);
+extract_view(Pid, ViewName, [View|Rest], ViewType) ->
+ case lists:member(ViewName, view_names(View, ViewType)) of
+ true ->
+ if ViewType == reduce ->
+ {index_of(ViewName, view_names(View, reduce)), View};
+ true ->
+ View
+ end;
+ false ->
+ extract_view(Pid, ViewName, Rest, ViewType)
+ end.
+
+view_names(View, Type) when Type == red_map; Type == reduce ->
+ [Name || {Name, _} <- View#view.reduce_funs];
+view_names(View, map) ->
+ View#view.map_names.
+
+index_of(X, List) ->
+ index_of(X, List, 1).
+
+index_of(_X, [], _I) ->
+ not_found;
+index_of(X, [X|_Rest], I) ->
+ I;
+index_of(X, [_|Rest], I) ->
+ index_of(X, Rest, I+1).
+
+get_shards(DbName, #view_query_args{stale=Stale})
+ when Stale == ok orelse Stale == update_after ->
+ mem3:ushards(DbName);
+get_shards(DbName, #view_query_args{stale=false}) ->
+ mem3:shards(DbName).
+
+% unit test
+is_progress_possible_test() ->
+ EndPoint = 2 bsl 31,
+ T1 = [[0, EndPoint-1]],
+ ?assertEqual(is_progress_possible(mk_cnts(T1)),true),
+ T2 = [[0,10],[11,20],[21,EndPoint-1]],
+ ?assertEqual(is_progress_possible(mk_cnts(T2)),true),
+ % gap
+ T3 = [[0,10],[12,EndPoint-1]],
+ ?assertEqual(is_progress_possible(mk_cnts(T3)),false),
+ % outside range
+ T4 = [[1,10],[11,20],[21,EndPoint-1]],
+ ?assertEqual(is_progress_possible(mk_cnts(T4)),false),
+ % outside range
+ T5 = [[0,10],[11,20],[21,EndPoint]],
+ ?assertEqual(is_progress_possible(mk_cnts(T5)),false).
+
+remove_overlapping_shards_test() ->
+ EndPoint = 2 bsl 31,
+ T1 = [[0,10],[11,20],[21,EndPoint-1]],
+ Shards = mk_cnts(T1,3),
+ ?assertEqual(orddict:size(
+ remove_overlapping_shards(#shard{name=list_to_atom("node-3"),
+ node=list_to_atom("node-3"),
+ range=[11,20]},
+ Shards)),7).
+
+mk_cnts(Ranges) ->
+ Shards = lists:map(fun(Range) ->
+ #shard{range=Range}
+ end,
+ Ranges),
+ orddict:from_list([{Shard,nil} || Shard <- Shards]).
+
+mk_cnts(Ranges, NoNodes) ->
+ orddict:from_list([{Shard,nil}
+ || Shard <-
+ lists:flatten(lists:map(
+ fun(Range) ->
+ mk_shards(NoNodes,Range,[])
+ end, Ranges))]
+ ).
+
+mk_shards(0,_Range,Shards) ->
+ Shards;
+mk_shards(NoNodes,Range,Shards) ->
+ NodeName = list_to_atom("node-" ++ integer_to_list(NoNodes)),
+ mk_shards(NoNodes-1,Range,
+ [#shard{name=NodeName, node=NodeName, range=Range} | Shards]).