diff options
author | Adam Kocoloski <adam@cloudant.com> | 2010-06-07 23:44:33 -0400 |
---|---|---|
committer | Adam Kocoloski <adam@cloudant.com> | 2010-06-07 23:45:18 -0400 |
commit | 2569631e249cc8209858f590a349f314b7253f3e (patch) | |
tree | f5159200982c26f4c8b9b364357e295f1a17c662 | |
parent | ab14b5bfdb88d9f07b6885f4cc1208f80c8c72f4 (diff) |
reduce views, BugzID 10220
-rw-r--r-- | ebin/fabric.app | 3 | ||||
-rw-r--r-- | include/fabric.hrl | 5 | ||||
-rw-r--r-- | src/fabric_rpc.erl | 87 | ||||
-rw-r--r-- | src/fabric_view.erl | 88 | ||||
-rw-r--r-- | src/fabric_view_map.erl | 4 | ||||
-rw-r--r-- | src/fabric_view_reduce.erl | 90 |
6 files changed, 237 insertions, 40 deletions
diff --git a/ebin/fabric.app b/ebin/fabric.app index c61ba87a..ef05bb5d 100644 --- a/ebin/fabric.app +++ b/ebin/fabric.app @@ -19,7 +19,8 @@ fabric_util, fabric_view, fabric_view_all_docs, - fabric_view_map + fabric_view_map, + fabric_view_reduce ]}, {registered, []}, {included_applications, []}, diff --git a/include/fabric.hrl b/include/fabric.hrl index 460cf578..f4665ca8 100644 --- a/include/fabric.hrl +++ b/include/fabric.hrl @@ -22,7 +22,10 @@ skip, limit, stop_fun, - keydict, + keys, + os_proc, + reducer, + lang, user_acc }). diff --git a/src/fabric_rpc.erl b/src/fabric_rpc.erl index 85c01906..aa922585 100644 --- a/src/fabric_rpc.erl +++ b/src/fabric_rpc.erl @@ -2,7 +2,7 @@ -export([get_db_info/1, get_doc_count/1, get_update_seq/1]). -export([open_doc/3, open_revs/4, get_missing_revs/2, update_docs/3]). --export([all_docs/2, map_view/4]). +-export([all_docs/2, map_view/4, reduce_view/4]). -include("fabric.hrl"). @@ -60,7 +60,7 @@ map_view(DbName, DDoc, ViewName, QueryArgs) -> Group0 = couch_view_group:design_doc_to_view_group(Db, DDoc), {ok, Pid} = gen_server:call(couch_view, {get_group_server, DbName, Group0}), {ok, Group} = couch_view_group:request_group(Pid, MinSeq), - View = extract_view(Pid, ViewName, Group#group.views, ViewType), + View = fabric_view:extract_view(Pid, ViewName, Group#group.views, ViewType), {ok, Total} = couch_view:get_row_count(View), Acc0 = #view_acc{ db = Db, @@ -85,6 +85,38 @@ map_view(DbName, DDoc, ViewName, QueryArgs) -> end, final_response(Total, Acc#view_acc.offset). +reduce_view(DbName, Group0, ViewName, QueryArgs) -> + {ok, Db} = couch_db:open(DbName, []), + #view_query_args{ + start_key = StartKey, + start_docid = StartDocId, + end_key = EndKey, + end_docid = EndDocId, + group_level = GroupLevel, + limit = Limit, + skip = Skip, + keys = Keys, + direction = Dir, + stale = Stale + } = QueryArgs, + GroupFun = group_rows_fun(GroupLevel), + MinSeq = if Stale == ok -> 0; true -> couch_db:get_update_seq(Db) end, + {ok, Pid} = gen_server:call(couch_view, {get_group_server, DbName, Group0}), + {ok, #group{views=Views, def_lang=Lang}} = couch_view_group:request_group( + Pid, MinSeq), + {NthRed, View} = fabric_view:extract_view(Pid, ViewName, Views, reduce), + ReduceView = {reduce, NthRed, Lang, View}, + Acc0 = #view_acc{group_level = GroupLevel, limit = Limit+Skip}, + case Keys of + nil -> + couch_view:fold_reduce(ReduceView, Dir, {StartKey,StartDocId}, + {EndKey,EndDocId}, GroupFun, fun reduce_fold/3, Acc0); + _ -> + [couch_view:fold_reduce(ReduceView, Dir, {K,StartDocId}, {K,EndDocId}, + GroupFun, fun reduce_fold/3, Acc0) || K <- Keys] + end, + rexi:reply(complete). + get_db_info(DbName) -> with_db(DbName, [], {couch_db, get_db_info, []}). @@ -229,33 +261,30 @@ default_stop_fun(#view_query_args{direction=rev} = Args) -> couch_view:less_json([ViewKey, ViewId], [EndKey, EndDocId]) end. -extract_view(Pid, ViewName, [], _ViewType) -> - ?LOG_ERROR("missing_named_view ~p", [ViewName]), - exit(Pid, kill), - exit(missing_named_view); -extract_view(Pid, ViewName, [View|Rest], ViewType) -> - case lists:member(ViewName, view_names(View, ViewType)) of - true -> - if ViewType == reduce -> - {index_of(ViewName, view_names(View, reduce)), View}; - true -> - View - end; - false -> - extract_view(Pid, ViewName, Rest, ViewType) +group_rows_fun(exact) -> + fun({Key1,_}, {Key2,_}) -> Key1 == Key2 end; +group_rows_fun(0) -> + fun(_A, _B) -> true end; +group_rows_fun(GroupLevel) when is_integer(GroupLevel) -> + fun({[_|_] = Key1,_}, {[_|_] = Key2,_}) -> + lists:sublist(Key1, GroupLevel) == lists:sublist(Key2, GroupLevel); + ({Key1,_}, {Key2,_}) -> + Key1 == Key2 end. -view_names(View, Type) when Type == red_map; Type == reduce -> - [Name || {Name, _} <- View#view.reduce_funs]; -view_names(View, map) -> - View#view.map_names. - -index_of(X, List) -> - index_of(X, List, 1). +reduce_fold(_Key, _Red, #view_acc{limit=0} = Acc) -> + {stop, Acc}; +reduce_fold(_Key, Red, #view_acc{group_level=0} = Acc) -> + send(null, Red, Acc); +reduce_fold(Key, Red, #view_acc{group_level=exact} = Acc) -> + send(Key, Red, Acc); +reduce_fold(K, Red, #view_acc{group_level=I} = Acc) when I > 0, is_list(K) -> + send(lists:sublist(K, I), Red, Acc). -index_of(_X, [], _I) -> - not_found; -index_of(X, [X|_Rest], I) -> - I; -index_of(X, [_|Rest], I) -> - index_of(X, Rest, I+1). +send(Key, Value, #view_acc{limit=Limit} = Acc) -> + case rexi:sync_reply(#view_row{key=Key, value=Value}) of + ok -> + {ok, Acc#view_acc{limit=Limit-1}}; + stop -> + exit(normal) + end. diff --git a/src/fabric_view.erl b/src/fabric_view.erl index ae5ce361..70dedf27 100644 --- a/src/fabric_view.erl +++ b/src/fabric_view.erl @@ -1,7 +1,8 @@ -module(fabric_view). -export([is_progress_possible/1, remove_overlapping_shards/2, maybe_send_row/1, - maybe_pause_worker/3, maybe_resume_worker/2, transform_row/1, keydict/1]). + maybe_pause_worker/3, maybe_resume_worker/2, transform_row/1, keydict/1, + extract_view/4]). -include("fabric.hrl"). @@ -100,10 +101,7 @@ maybe_send_row(State) -> true -> {ok, State}; false -> - case get_next_row(State) of - complete -> - {_, Acc} = Callback(complete, AccIn), - {stop, State#collector{user_acc=Acc}}; + try get_next_row(State) of {_, NewState} when Skip > 0 -> maybe_send_row(NewState#collector{skip=Skip-1, limit=Limit-1}); {Row, NewState} -> @@ -113,6 +111,9 @@ maybe_send_row(State) -> {ok, Acc} -> maybe_send_row(NewState#collector{user_acc=Acc, limit=Limit-1}) end + catch complete -> + {_, Acc} = Callback(complete, AccIn), + {stop, State#collector{user_acc=Acc}} end end. @@ -125,8 +126,31 @@ keydict(Keys) -> %% internal %% +get_next_row(#collector{reducer = RedSrc} = St) when RedSrc =/= undefined -> + #collector{ + query_args = #view_query_args{direction=Dir}, + keys = Keys, + rows = RowDict, + os_proc = Proc, + counters = Counters0 + } = St, + {Key, RestKeys} = find_next_key(Keys, Dir, RowDict), + case dict:find(Key, RowDict) of + {ok, Records} -> + NewRowDict = dict:erase(Key, RowDict), + Counters = lists:foldl(fun(#view_row{worker=Worker}, CountersAcc) -> + fabric_dict:update_counter(Worker, -1, CountersAcc) + end, Counters0, Records), + Wrapped = [[V] || #view_row{value=V} <- Records], + {ok, [Reduced]} = couch_query_servers:rereduce(Proc, [RedSrc], Wrapped), + NewSt = St#collector{keys=RestKeys, rows=NewRowDict, counters=Counters}, + {#view_row{key=Key, id=reduced, value=Reduced}, NewSt}; + error -> + NewSt = St#collector{keys=RestKeys}, + {#view_row{key=Key, id=reduced, value={error, missing}}, NewSt} + end; get_next_row(#collector{rows = []}) -> - complete; + throw(complete); get_next_row(State) -> #collector{ rows = [Row|Rest], @@ -138,11 +162,25 @@ get_next_row(State) -> NewState = maybe_resume_worker(Worker, State#collector{counters=Counters1}), case Stop(Row) of true -> - complete; + throw(complete); false -> {Row, NewState#collector{rows = Rest}} end. +find_next_key(undefined, Dir, RowDict) -> + case lists:sort(sort_fun(Dir), dict:fetch_keys(RowDict)) of + [] -> + throw(complete); + [Key|_] -> + {Key, undefined} + end; +find_next_key([], _, _) -> + throw(complete); +find_next_key([Key|Rest], _, _) -> + {Key, Rest}. + +transform_row(#view_row{key=Key, id=reduced, value=Value}) -> + {row, {[{key,Key}, {value,Value}]}}; transform_row(#view_row{key=Key, id=undefined}) -> {row, {[{key,Key}, {error,not_found}]}}; transform_row(#view_row{key=Key, id=Id, value=Value, doc=undefined}) -> @@ -151,3 +189,39 @@ transform_row(#view_row{key=Key, id=Id, value=Value, doc={error,Reason}}) -> {row, {[{key,Key}, {id,Id}, {value,Value}, {error,Reason}]}}; transform_row(#view_row{key=Key, id=Id, value=Value, doc=Doc}) -> {row, {[{key,Key}, {id,Id}, {value,Value}, {doc,Doc}]}}. + +sort_fun(fwd) -> + fun(A,A) -> true; (A,B) -> couch_view:less_json(A,B) end; +sort_fun(rev) -> + fun(A,A) -> true; (A,B) -> couch_view:less_json(B,A) end. + +extract_view(Pid, ViewName, [], _ViewType) -> + ?LOG_ERROR("missing_named_view ~p", [ViewName]), + exit(Pid, kill), + exit(missing_named_view); +extract_view(Pid, ViewName, [View|Rest], ViewType) -> + case lists:member(ViewName, view_names(View, ViewType)) of + true -> + if ViewType == reduce -> + {index_of(ViewName, view_names(View, reduce)), View}; + true -> + View + end; + false -> + extract_view(Pid, ViewName, Rest, ViewType) + end. + +view_names(View, Type) when Type == red_map; Type == reduce -> + [Name || {Name, _} <- View#view.reduce_funs]; +view_names(View, map) -> + View#view.map_names. + +index_of(X, List) -> + index_of(X, List, 1). + +index_of(_X, [], _I) -> + not_found; +index_of(X, [X|_Rest], I) -> + I; +index_of(X, [_|Rest], I) -> + index_of(X, Rest, I+1). diff --git a/src/fabric_view_map.erl b/src/fabric_view_map.erl index 6ec7dfde..8316979f 100644 --- a/src/fabric_view_map.erl +++ b/src/fabric_view_map.erl @@ -20,7 +20,7 @@ go(DbName, {GroupId, View}, Args, Callback, Acc0) -> skip = Skip, limit = Limit, stop_fun = stop_fun(Args), - keydict = fabric_view:keydict(Keys), + keys = fabric_view:keydict(Keys), user_acc = Acc0 }, try fabric_util:receive_loop(Workers, #shard.ref, fun handle_message/3, @@ -93,7 +93,7 @@ handle_message(#view_row{} = Row, {Worker, From}, State) -> query_args = #view_query_args{direction=Dir}, counters = Counters0, rows = Rows0, - keydict = KeyDict + keys = KeyDict } = State, Rows = merge_row(Dir, KeyDict, Row#view_row{worker=Worker}, Rows0), Counters1 = fabric_dict:update_counter(Worker, 1, Counters0), diff --git a/src/fabric_view_reduce.erl b/src/fabric_view_reduce.erl new file mode 100644 index 00000000..1a619877 --- /dev/null +++ b/src/fabric_view_reduce.erl @@ -0,0 +1,90 @@ +-module(fabric_view_reduce). + +-export([go/5]). + +-include("fabric.hrl"). + +go(DbName, {GroupId, VName}, Args, Callback, Acc0) -> + {ok, DDoc} = fabric:open_doc(DbName, <<"_design/", GroupId/binary>>, []), + #group{def_lang=Lang, views=Views} = Group = + couch_view_group:design_doc_to_view_group(#db{name=DbName}, DDoc), + {NthRed, View} = fabric_view:extract_view(nil, VName, Views, reduce), + {VName, RedSrc} = lists:nth(NthRed, View#view.reduce_funs), + Workers = lists:map(fun(#shard{name=Name, node=N} = Shard) -> + Ref = rexi:cast(N, {fabric_rpc, reduce_view, [Name,Group,VName,Args]}), + Shard#shard{ref = Ref} + end, partitions:all_parts(DbName)), + BufferSize = couch_config:get("fabric", "reduce_buffer_size", "20"), + #view_query_args{limit = Limit, skip = Skip} = Args, + State = #collector{ + query_args = Args, + callback = Callback, + buffer_size = list_to_integer(BufferSize), + counters = fabric_dict:init(Workers, 0), + skip = Skip, + limit = Limit, + lang = Group#group.def_lang, + os_proc = couch_query_servers:get_os_process(Lang), + reducer = RedSrc, + rows = dict:new(), + user_acc = Acc0 + }, + try fabric_util:receive_loop(Workers, #shard.ref, fun handle_message/3, + State, infinity, 5000) of + {ok, NewState} -> + {ok, NewState#collector.user_acc}; + Error -> + Error + after + fabric_util:cleanup(Workers), + catch couch_query_servers:ret_os_process(State#collector.os_proc) + end. + +handle_message({rexi_DOWN, _, _, _}, nil, State) -> + % TODO see if progress can be made here, possibly by removing all shards + % from that node and checking is_progress_possible + {ok, State}; + +handle_message({rexi_EXIT, Reason}, Worker, State) -> + ?LOG_ERROR("~p rexi_EXIT ~p", [?MODULE, Reason]), + #collector{callback=Callback, counters=Counters0, user_acc=Acc} = State, + Counters = fabric_dict:erase(Worker, Counters0), + case fabric_view:is_progress_possible(Counters) of + true -> + {ok, State#collector{counters = Counters}}; + false -> + Callback({error, dead_shards}, Acc), + {error, dead_shards} + end; + +handle_message(#view_row{key=Key} = Row, {Worker, From}, State) -> + #collector{counters = Counters0, rows = Rows0} = State, + case fabric_dict:lookup_element(Worker, Counters0) of + undefined -> + % this worker lost the race with other partition copies, terminate it + gen_server:reply(From, stop), + {ok, State}; + % first -> + % gen_server:reply(From, ok), + % Rows = dict:append(Key, Row#view_row{worker=Worker}, Rows0), + % C1 = fabric_dict:store(Worker, 1, Counters0), + % C2 = fabric_view:remove_overlapping_shards(Worker, C1), + % NewState = State#collector{counters=C2, rows=Rows}, + % case fabric_dict:any(first, C2) of + % true -> + % {ok, NewState}; + % false -> + % fabric_view:maybe_send_row(State#collector{counters=C2, rows=Rows}) + % end; + _ -> + Rows = dict:append(Key, Row#view_row{worker=Worker}, Rows0), + C1 = fabric_dict:update_counter(Worker, 1, Counters0), + C2 = fabric_view:remove_overlapping_shards(Worker, C1), + State1 = State#collector{rows=Rows, counters=C2}, + State2 = fabric_view:maybe_pause_worker(Worker, From, State1), + fabric_view:maybe_send_row(State2) + end; + +handle_message(complete, Worker, State) -> + Counters = fabric_dict:update_counter(Worker, 1, State#collector.counters), + fabric_view:maybe_send_row(State#collector{counters = Counters}). |