summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorAdam Kocoloski <adam@cloudant.com>2010-06-07 23:44:33 -0400
committerAdam Kocoloski <adam@cloudant.com>2010-06-07 23:45:18 -0400
commit2569631e249cc8209858f590a349f314b7253f3e (patch)
treef5159200982c26f4c8b9b364357e295f1a17c662 /src
parentab14b5bfdb88d9f07b6885f4cc1208f80c8c72f4 (diff)
reduce views, BugzID 10220
Diffstat (limited to 'src')
-rw-r--r--src/fabric_rpc.erl87
-rw-r--r--src/fabric_view.erl88
-rw-r--r--src/fabric_view_map.erl4
-rw-r--r--src/fabric_view_reduce.erl90
4 files changed, 231 insertions, 38 deletions
diff --git a/src/fabric_rpc.erl b/src/fabric_rpc.erl
index 85c01906..aa922585 100644
--- a/src/fabric_rpc.erl
+++ b/src/fabric_rpc.erl
@@ -2,7 +2,7 @@
-export([get_db_info/1, get_doc_count/1, get_update_seq/1]).
-export([open_doc/3, open_revs/4, get_missing_revs/2, update_docs/3]).
--export([all_docs/2, map_view/4]).
+-export([all_docs/2, map_view/4, reduce_view/4]).
-include("fabric.hrl").
@@ -60,7 +60,7 @@ map_view(DbName, DDoc, ViewName, QueryArgs) ->
Group0 = couch_view_group:design_doc_to_view_group(Db, DDoc),
{ok, Pid} = gen_server:call(couch_view, {get_group_server, DbName, Group0}),
{ok, Group} = couch_view_group:request_group(Pid, MinSeq),
- View = extract_view(Pid, ViewName, Group#group.views, ViewType),
+ View = fabric_view:extract_view(Pid, ViewName, Group#group.views, ViewType),
{ok, Total} = couch_view:get_row_count(View),
Acc0 = #view_acc{
db = Db,
@@ -85,6 +85,38 @@ map_view(DbName, DDoc, ViewName, QueryArgs) ->
end,
final_response(Total, Acc#view_acc.offset).
+reduce_view(DbName, Group0, ViewName, QueryArgs) ->
+ {ok, Db} = couch_db:open(DbName, []),
+ #view_query_args{
+ start_key = StartKey,
+ start_docid = StartDocId,
+ end_key = EndKey,
+ end_docid = EndDocId,
+ group_level = GroupLevel,
+ limit = Limit,
+ skip = Skip,
+ keys = Keys,
+ direction = Dir,
+ stale = Stale
+ } = QueryArgs,
+ GroupFun = group_rows_fun(GroupLevel),
+ MinSeq = if Stale == ok -> 0; true -> couch_db:get_update_seq(Db) end,
+ {ok, Pid} = gen_server:call(couch_view, {get_group_server, DbName, Group0}),
+ {ok, #group{views=Views, def_lang=Lang}} = couch_view_group:request_group(
+ Pid, MinSeq),
+ {NthRed, View} = fabric_view:extract_view(Pid, ViewName, Views, reduce),
+ ReduceView = {reduce, NthRed, Lang, View},
+ Acc0 = #view_acc{group_level = GroupLevel, limit = Limit+Skip},
+ case Keys of
+ nil ->
+ couch_view:fold_reduce(ReduceView, Dir, {StartKey,StartDocId},
+ {EndKey,EndDocId}, GroupFun, fun reduce_fold/3, Acc0);
+ _ ->
+ [couch_view:fold_reduce(ReduceView, Dir, {K,StartDocId}, {K,EndDocId},
+ GroupFun, fun reduce_fold/3, Acc0) || K <- Keys]
+ end,
+ rexi:reply(complete).
+
get_db_info(DbName) ->
with_db(DbName, [], {couch_db, get_db_info, []}).
@@ -229,33 +261,30 @@ default_stop_fun(#view_query_args{direction=rev} = Args) ->
couch_view:less_json([ViewKey, ViewId], [EndKey, EndDocId])
end.
-extract_view(Pid, ViewName, [], _ViewType) ->
- ?LOG_ERROR("missing_named_view ~p", [ViewName]),
- exit(Pid, kill),
- exit(missing_named_view);
-extract_view(Pid, ViewName, [View|Rest], ViewType) ->
- case lists:member(ViewName, view_names(View, ViewType)) of
- true ->
- if ViewType == reduce ->
- {index_of(ViewName, view_names(View, reduce)), View};
- true ->
- View
- end;
- false ->
- extract_view(Pid, ViewName, Rest, ViewType)
+group_rows_fun(exact) ->
+ fun({Key1,_}, {Key2,_}) -> Key1 == Key2 end;
+group_rows_fun(0) ->
+ fun(_A, _B) -> true end;
+group_rows_fun(GroupLevel) when is_integer(GroupLevel) ->
+ fun({[_|_] = Key1,_}, {[_|_] = Key2,_}) ->
+ lists:sublist(Key1, GroupLevel) == lists:sublist(Key2, GroupLevel);
+ ({Key1,_}, {Key2,_}) ->
+ Key1 == Key2
end.
-view_names(View, Type) when Type == red_map; Type == reduce ->
- [Name || {Name, _} <- View#view.reduce_funs];
-view_names(View, map) ->
- View#view.map_names.
-
-index_of(X, List) ->
- index_of(X, List, 1).
+reduce_fold(_Key, _Red, #view_acc{limit=0} = Acc) ->
+ {stop, Acc};
+reduce_fold(_Key, Red, #view_acc{group_level=0} = Acc) ->
+ send(null, Red, Acc);
+reduce_fold(Key, Red, #view_acc{group_level=exact} = Acc) ->
+ send(Key, Red, Acc);
+reduce_fold(K, Red, #view_acc{group_level=I} = Acc) when I > 0, is_list(K) ->
+ send(lists:sublist(K, I), Red, Acc).
-index_of(_X, [], _I) ->
- not_found;
-index_of(X, [X|_Rest], I) ->
- I;
-index_of(X, [_|Rest], I) ->
- index_of(X, Rest, I+1).
+send(Key, Value, #view_acc{limit=Limit} = Acc) ->
+ case rexi:sync_reply(#view_row{key=Key, value=Value}) of
+ ok ->
+ {ok, Acc#view_acc{limit=Limit-1}};
+ stop ->
+ exit(normal)
+ end.
diff --git a/src/fabric_view.erl b/src/fabric_view.erl
index ae5ce361..70dedf27 100644
--- a/src/fabric_view.erl
+++ b/src/fabric_view.erl
@@ -1,7 +1,8 @@
-module(fabric_view).
-export([is_progress_possible/1, remove_overlapping_shards/2, maybe_send_row/1,
- maybe_pause_worker/3, maybe_resume_worker/2, transform_row/1, keydict/1]).
+ maybe_pause_worker/3, maybe_resume_worker/2, transform_row/1, keydict/1,
+ extract_view/4]).
-include("fabric.hrl").
@@ -100,10 +101,7 @@ maybe_send_row(State) ->
true ->
{ok, State};
false ->
- case get_next_row(State) of
- complete ->
- {_, Acc} = Callback(complete, AccIn),
- {stop, State#collector{user_acc=Acc}};
+ try get_next_row(State) of
{_, NewState} when Skip > 0 ->
maybe_send_row(NewState#collector{skip=Skip-1, limit=Limit-1});
{Row, NewState} ->
@@ -113,6 +111,9 @@ maybe_send_row(State) ->
{ok, Acc} ->
maybe_send_row(NewState#collector{user_acc=Acc, limit=Limit-1})
end
+ catch complete ->
+ {_, Acc} = Callback(complete, AccIn),
+ {stop, State#collector{user_acc=Acc}}
end
end.
@@ -125,8 +126,31 @@ keydict(Keys) ->
%% internal %%
+get_next_row(#collector{reducer = RedSrc} = St) when RedSrc =/= undefined ->
+ #collector{
+ query_args = #view_query_args{direction=Dir},
+ keys = Keys,
+ rows = RowDict,
+ os_proc = Proc,
+ counters = Counters0
+ } = St,
+ {Key, RestKeys} = find_next_key(Keys, Dir, RowDict),
+ case dict:find(Key, RowDict) of
+ {ok, Records} ->
+ NewRowDict = dict:erase(Key, RowDict),
+ Counters = lists:foldl(fun(#view_row{worker=Worker}, CountersAcc) ->
+ fabric_dict:update_counter(Worker, -1, CountersAcc)
+ end, Counters0, Records),
+ Wrapped = [[V] || #view_row{value=V} <- Records],
+ {ok, [Reduced]} = couch_query_servers:rereduce(Proc, [RedSrc], Wrapped),
+ NewSt = St#collector{keys=RestKeys, rows=NewRowDict, counters=Counters},
+ {#view_row{key=Key, id=reduced, value=Reduced}, NewSt};
+ error ->
+ NewSt = St#collector{keys=RestKeys},
+ {#view_row{key=Key, id=reduced, value={error, missing}}, NewSt}
+ end;
get_next_row(#collector{rows = []}) ->
- complete;
+ throw(complete);
get_next_row(State) ->
#collector{
rows = [Row|Rest],
@@ -138,11 +162,25 @@ get_next_row(State) ->
NewState = maybe_resume_worker(Worker, State#collector{counters=Counters1}),
case Stop(Row) of
true ->
- complete;
+ throw(complete);
false ->
{Row, NewState#collector{rows = Rest}}
end.
+find_next_key(undefined, Dir, RowDict) ->
+ case lists:sort(sort_fun(Dir), dict:fetch_keys(RowDict)) of
+ [] ->
+ throw(complete);
+ [Key|_] ->
+ {Key, undefined}
+ end;
+find_next_key([], _, _) ->
+ throw(complete);
+find_next_key([Key|Rest], _, _) ->
+ {Key, Rest}.
+
+transform_row(#view_row{key=Key, id=reduced, value=Value}) ->
+ {row, {[{key,Key}, {value,Value}]}};
transform_row(#view_row{key=Key, id=undefined}) ->
{row, {[{key,Key}, {error,not_found}]}};
transform_row(#view_row{key=Key, id=Id, value=Value, doc=undefined}) ->
@@ -151,3 +189,39 @@ transform_row(#view_row{key=Key, id=Id, value=Value, doc={error,Reason}}) ->
{row, {[{key,Key}, {id,Id}, {value,Value}, {error,Reason}]}};
transform_row(#view_row{key=Key, id=Id, value=Value, doc=Doc}) ->
{row, {[{key,Key}, {id,Id}, {value,Value}, {doc,Doc}]}}.
+
+sort_fun(fwd) ->
+ fun(A,A) -> true; (A,B) -> couch_view:less_json(A,B) end;
+sort_fun(rev) ->
+ fun(A,A) -> true; (A,B) -> couch_view:less_json(B,A) end.
+
+extract_view(Pid, ViewName, [], _ViewType) ->
+ ?LOG_ERROR("missing_named_view ~p", [ViewName]),
+ exit(Pid, kill),
+ exit(missing_named_view);
+extract_view(Pid, ViewName, [View|Rest], ViewType) ->
+ case lists:member(ViewName, view_names(View, ViewType)) of
+ true ->
+ if ViewType == reduce ->
+ {index_of(ViewName, view_names(View, reduce)), View};
+ true ->
+ View
+ end;
+ false ->
+ extract_view(Pid, ViewName, Rest, ViewType)
+ end.
+
+view_names(View, Type) when Type == red_map; Type == reduce ->
+ [Name || {Name, _} <- View#view.reduce_funs];
+view_names(View, map) ->
+ View#view.map_names.
+
+index_of(X, List) ->
+ index_of(X, List, 1).
+
+index_of(_X, [], _I) ->
+ not_found;
+index_of(X, [X|_Rest], I) ->
+ I;
+index_of(X, [_|Rest], I) ->
+ index_of(X, Rest, I+1).
diff --git a/src/fabric_view_map.erl b/src/fabric_view_map.erl
index 6ec7dfde..8316979f 100644
--- a/src/fabric_view_map.erl
+++ b/src/fabric_view_map.erl
@@ -20,7 +20,7 @@ go(DbName, {GroupId, View}, Args, Callback, Acc0) ->
skip = Skip,
limit = Limit,
stop_fun = stop_fun(Args),
- keydict = fabric_view:keydict(Keys),
+ keys = fabric_view:keydict(Keys),
user_acc = Acc0
},
try fabric_util:receive_loop(Workers, #shard.ref, fun handle_message/3,
@@ -93,7 +93,7 @@ handle_message(#view_row{} = Row, {Worker, From}, State) ->
query_args = #view_query_args{direction=Dir},
counters = Counters0,
rows = Rows0,
- keydict = KeyDict
+ keys = KeyDict
} = State,
Rows = merge_row(Dir, KeyDict, Row#view_row{worker=Worker}, Rows0),
Counters1 = fabric_dict:update_counter(Worker, 1, Counters0),
diff --git a/src/fabric_view_reduce.erl b/src/fabric_view_reduce.erl
new file mode 100644
index 00000000..1a619877
--- /dev/null
+++ b/src/fabric_view_reduce.erl
@@ -0,0 +1,90 @@
+-module(fabric_view_reduce).
+
+-export([go/5]).
+
+-include("fabric.hrl").
+
+go(DbName, {GroupId, VName}, Args, Callback, Acc0) ->
+ {ok, DDoc} = fabric:open_doc(DbName, <<"_design/", GroupId/binary>>, []),
+ #group{def_lang=Lang, views=Views} = Group =
+ couch_view_group:design_doc_to_view_group(#db{name=DbName}, DDoc),
+ {NthRed, View} = fabric_view:extract_view(nil, VName, Views, reduce),
+ {VName, RedSrc} = lists:nth(NthRed, View#view.reduce_funs),
+ Workers = lists:map(fun(#shard{name=Name, node=N} = Shard) ->
+ Ref = rexi:cast(N, {fabric_rpc, reduce_view, [Name,Group,VName,Args]}),
+ Shard#shard{ref = Ref}
+ end, partitions:all_parts(DbName)),
+ BufferSize = couch_config:get("fabric", "reduce_buffer_size", "20"),
+ #view_query_args{limit = Limit, skip = Skip} = Args,
+ State = #collector{
+ query_args = Args,
+ callback = Callback,
+ buffer_size = list_to_integer(BufferSize),
+ counters = fabric_dict:init(Workers, 0),
+ skip = Skip,
+ limit = Limit,
+ lang = Group#group.def_lang,
+ os_proc = couch_query_servers:get_os_process(Lang),
+ reducer = RedSrc,
+ rows = dict:new(),
+ user_acc = Acc0
+ },
+ try fabric_util:receive_loop(Workers, #shard.ref, fun handle_message/3,
+ State, infinity, 5000) of
+ {ok, NewState} ->
+ {ok, NewState#collector.user_acc};
+ Error ->
+ Error
+ after
+ fabric_util:cleanup(Workers),
+ catch couch_query_servers:ret_os_process(State#collector.os_proc)
+ end.
+
+handle_message({rexi_DOWN, _, _, _}, nil, State) ->
+ % TODO see if progress can be made here, possibly by removing all shards
+ % from that node and checking is_progress_possible
+ {ok, State};
+
+handle_message({rexi_EXIT, Reason}, Worker, State) ->
+ ?LOG_ERROR("~p rexi_EXIT ~p", [?MODULE, Reason]),
+ #collector{callback=Callback, counters=Counters0, user_acc=Acc} = State,
+ Counters = fabric_dict:erase(Worker, Counters0),
+ case fabric_view:is_progress_possible(Counters) of
+ true ->
+ {ok, State#collector{counters = Counters}};
+ false ->
+ Callback({error, dead_shards}, Acc),
+ {error, dead_shards}
+ end;
+
+handle_message(#view_row{key=Key} = Row, {Worker, From}, State) ->
+ #collector{counters = Counters0, rows = Rows0} = State,
+ case fabric_dict:lookup_element(Worker, Counters0) of
+ undefined ->
+ % this worker lost the race with other partition copies, terminate it
+ gen_server:reply(From, stop),
+ {ok, State};
+ % first ->
+ % gen_server:reply(From, ok),
+ % Rows = dict:append(Key, Row#view_row{worker=Worker}, Rows0),
+ % C1 = fabric_dict:store(Worker, 1, Counters0),
+ % C2 = fabric_view:remove_overlapping_shards(Worker, C1),
+ % NewState = State#collector{counters=C2, rows=Rows},
+ % case fabric_dict:any(first, C2) of
+ % true ->
+ % {ok, NewState};
+ % false ->
+ % fabric_view:maybe_send_row(State#collector{counters=C2, rows=Rows})
+ % end;
+ _ ->
+ Rows = dict:append(Key, Row#view_row{worker=Worker}, Rows0),
+ C1 = fabric_dict:update_counter(Worker, 1, Counters0),
+ C2 = fabric_view:remove_overlapping_shards(Worker, C1),
+ State1 = State#collector{rows=Rows, counters=C2},
+ State2 = fabric_view:maybe_pause_worker(Worker, From, State1),
+ fabric_view:maybe_send_row(State2)
+ end;
+
+handle_message(complete, Worker, State) ->
+ Counters = fabric_dict:update_counter(Worker, 1, State#collector.counters),
+ fabric_view:maybe_send_row(State#collector{counters = Counters}).