author    Adam Kocoloski <adam@cloudant.com>    2010-06-07 14:46:35 -0400
committer Adam Kocoloski <adam@cloudant.com>    2010-06-07 14:46:35 -0400
commit    38bde53e2429f60b3209d71c562218f3c7429945 (patch)
tree      5fd5ee047eeb7a3b46cc022c0eab121a74f5e1ec
parent    a9a36b4184fae5e4078982d48c0536b8ba948391 (diff)
map views w/o keylist, BugzID 10220
-rw-r--r--  ebin/fabric.app                5
-rw-r--r--  include/fabric.hrl             1
-rw-r--r--  src/fabric.erl                12
-rw-r--r--  src/fabric_rpc.erl           105
-rw-r--r--  src/fabric_view_all_docs.erl 199
5 files changed, 135 insertions, 187 deletions
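For orientation, here is a minimal sketch (not part of this commit) of how the new fabric:query_view/5 entry point added in src/fabric.erl below might be driven by a caller. The callback contract ({total_and_offset, Total, Offset}, one row tuple per result, then complete) is inferred from the existing design_docs/1 and all-docs plumbing; collect_map_view/2 and the default view_type are hypothetical.

%% Hypothetical caller; #view_query_args is the record from couch_db.hrl.
%% The callback is assumed to receive {total_and_offset, Total, Offset},
%% then {row, Props} for each row, then `complete`, threading Acc through.
collect_map_view(DbName, DDocAndViewName) ->
    QueryArgs = #view_query_args{view_type = map},
    Callback = fun
        ({total_and_offset, _Total, _Offset}, Acc) -> {ok, Acc};
        ({row, Row}, Acc) -> {ok, [Row|Acc]};
        (complete, Acc) -> {ok, lists:reverse(Acc)}
    end,
    %% e.g. DDocAndViewName = <<"ddocname/viewname">>, split by fabric:view/1
    fabric:query_view(DbName, DDocAndViewName, QueryArgs, Callback, []).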
diff --git a/ebin/fabric.app b/ebin/fabric.app
index 2e0f8df3..c61ba87a 100644
--- a/ebin/fabric.app
+++ b/ebin/fabric.app
@@ -10,13 +10,16 @@
fabric_db_delete,
fabric_db_doc_count,
fabric_db_info,
+ fabric_dict,
fabric_doc_missing_revs,
fabric_doc_open,
fabric_doc_open_revs,
fabric_doc_update,
fabric_rpc,
fabric_util,
- fabric_view_all_docs
+ fabric_view,
+ fabric_view_all_docs,
+ fabric_view_map
]},
{registered, []},
{included_applications, []},
diff --git a/include/fabric.hrl b/include/fabric.hrl
index 31e4336c..fa8319b4 100644
--- a/include/fabric.hrl
+++ b/include/fabric.hrl
@@ -21,6 +21,7 @@
rows = [],
skip,
limit,
+ stop_fun,
user_acc
}).
diff --git a/src/fabric.erl b/src/fabric.erl
index 29396e9e..983cd818 100644
--- a/src/fabric.erl
+++ b/src/fabric.erl
@@ -9,7 +9,7 @@
update_docs/3]).
% Views
--export([all_docs/4]).
+-export([all_docs/4, query_view/5]).
% miscellany
-export([db_path/2, design_docs/1]).
@@ -65,6 +65,12 @@ all_docs(DbName, #view_query_args{} = QueryArgs, Callback, Acc0) when
is_function(Callback, 2) ->
fabric_view_all_docs:go(dbname(DbName), QueryArgs, Callback, Acc0).
+query_view(DbName, View, #view_query_args{view_type=reduce} = QueryArgs,
+ Callback, Acc0) ->
+ fabric_view_reduce:go(dbname(DbName), view(View), QueryArgs, Callback, Acc0);
+query_view(DbName, View, #view_query_args{} = QueryArgs, Callback, Acc0) ->
+ fabric_view_map:go(dbname(DbName), view(View), QueryArgs, Callback, Acc0).
+
design_docs(DbName) ->
QueryArgs = #view_query_args{start_key = <<"_design/">>, include_docs=true},
Callback = fun({total_and_offset, _, _}, []) ->
@@ -117,6 +123,10 @@ rev(Rev) when is_list(Rev); is_binary(Rev) ->
rev({Seq, Hash} = Rev) when is_integer(Seq), is_binary(Hash) ->
Rev.
+view(ViewName) ->
+ [Group, View] = re:split(ViewName, "/"),
+ {Group, View}.
+
opts(Options) ->
case couch_util:get_value(user_ctx, Options) of
undefined ->
diff --git a/src/fabric_rpc.erl b/src/fabric_rpc.erl
index a2d42007..0eab5e6d 100644
--- a/src/fabric_rpc.erl
+++ b/src/fabric_rpc.erl
@@ -2,7 +2,7 @@
-export([get_db_info/1, get_doc_count/1, get_update_seq/1]).
-export([open_doc/3, open_revs/4, get_missing_revs/2, update_docs/3]).
--export([all_docs/2]).
+-export([all_docs/2, map_view/4]).
-include("fabric.hrl").
@@ -11,6 +11,7 @@
limit,
include_docs,
offset = nil,
+ total_rows,
reduce_fun = fun couch_db:enum_docs_reduce_to_count/1,
stop_fun,
group_level = 0
@@ -30,14 +31,46 @@ all_docs(DbName, #view_query_args{keys=nil} = QueryArgs) ->
direction = Dir
} = QueryArgs,
StartId = if is_binary(StartKey) -> StartKey; true -> StartDocId end,
+ {ok, Total} = couch_db:get_doc_count(Db),
Acc0 = #view_acc{
db = Db,
include_docs = IncludeDocs,
limit = Limit+Skip,
+ total_rows = Total,
stop_fun = all_docs_stop_fun(QueryArgs)
},
{ok, Acc} = couch_db:enum_docs(Db, StartId, Dir, fun view_fold/3, Acc0),
- final_all_docs_response(Db, Acc#view_acc.offset).
+ final_response(Total, Acc#view_acc.offset).
+
+map_view(DbName, DDoc, ViewName, #view_query_args{keys=nil} = QueryArgs) ->
+ {ok, Db} = couch_db:open(DbName, []),
+ #view_query_args{
+ start_key = StartKey,
+ start_docid = StartDocId,
+ limit = Limit,
+ skip = Skip,
+ include_docs = IncludeDocs,
+ direction = Dir,
+ stale = Stale,
+ view_type = ViewType
+ } = QueryArgs,
+ Start = {StartKey, StartDocId},
+ MinSeq = if Stale == ok -> 0; true -> couch_db:get_update_seq(Db) end,
+ Group0 = couch_view_group:design_doc_to_view_group(Db, DDoc),
+ {ok, Pid} = gen_server:call(couch_view, {get_group_server, DbName, Group0}),
+ {ok, Group} = couch_view_group:request_group(Pid, MinSeq),
+ View = extract_view(Pid, ViewName, Group#group.views, ViewType),
+ {ok, Total} = couch_view:get_row_count(View),
+ Acc0 = #view_acc{
+ db = Db,
+ include_docs = IncludeDocs,
+ limit = Limit+Skip,
+ total_rows = Total,
+ reduce_fun = fun couch_view:reduce_to_count/1,
+ stop_fun = default_stop_fun(QueryArgs)
+ },
+ {ok, Acc} = couch_view:fold(View, Start, Dir, fun view_fold/3, Acc0),
+ final_response(Total, Acc#view_acc.offset).
get_db_info(DbName) ->
with_db(DbName, [], {couch_db, get_db_info, []}).
@@ -101,10 +134,9 @@ view_fold(#full_doc_info{} = FullDocInfo, OffsetReds, Acc) ->
#doc_info{revs=[#rev_info{deleted=true}|_]} ->
{ok, Acc}
end;
-view_fold(KV, OffsetReds, #view_acc{offset=nil} = Acc) ->
+view_fold(KV, OffsetReds, #view_acc{offset=nil, total_rows=Total} = Acc) ->
% calculates the offset for this shard
- #view_acc{db=Db, reduce_fun=Reduce} = Acc,
- {ok, Total} = couch_db:get_doc_count(Db),
+ #view_acc{reduce_fun=Reduce} = Acc,
Offset = Reduce(OffsetReds),
case rexi:sync_reply({total_and_offset, Total, Offset}) of
ok ->
@@ -150,10 +182,67 @@ all_docs_stop_fun(#view_query_args{direction=rev, end_key=EndKey}) ->
couch_db_updater:less_docid(ViewKey, EndKey)
end.
-final_all_docs_response(Db, nil) ->
- {ok, Total} = couch_db:get_doc_count(Db),
+final_response(Total, nil) ->
case rexi:sync_reply({total_and_offset, Total, Total}) of ok ->
rexi:reply(complete);
stop -> ok end;
-final_all_docs_response(_Db, _Offset) ->
+final_response(_Total, _Offset) ->
rexi:reply(complete).
+
+default_stop_fun(#view_query_args{direction=fwd, inclusive_end=true} = Args) ->
+ #view_query_args{end_key=EndKey, end_docid=EndDocId} = Args,
+ fun(ViewKey, ViewId) ->
+ couch_view:less_json([EndKey, EndDocId], [ViewKey, ViewId])
+ end;
+default_stop_fun(#view_query_args{direction=fwd} = Args) ->
+ #view_query_args{end_key=EndKey, end_docid=EndDocId} = Args,
+ fun
+ (ViewKey, _ViewId) when ViewKey == EndKey ->
+ true;
+ (ViewKey, ViewId) ->
+ couch_view:less_json([EndKey, EndDocId], [ViewKey, ViewId])
+ end;
+default_stop_fun(#view_query_args{direction=rev, inclusive_end=true} = Args) ->
+ #view_query_args{end_key=EndKey, end_docid=EndDocId} = Args,
+ fun(ViewKey, ViewId) ->
+ couch_view:less_json([ViewKey, ViewId], [EndKey, EndDocId])
+ end;
+default_stop_fun(#view_query_args{direction=rev} = Args) ->
+ #view_query_args{end_key=EndKey, end_docid=EndDocId} = Args,
+ fun
+ (ViewKey, _ViewId) when ViewKey == EndKey ->
+ true;
+ (ViewKey, ViewId) ->
+ couch_view:less_json([ViewKey, ViewId], [EndKey, EndDocId])
+ end.
+
+extract_view(Pid, ViewName, [], _ViewType) ->
+ ?LOG_ERROR("missing_named_view ~p", [ViewName]),
+ exit(Pid, kill),
+ exit(missing_named_view);
+extract_view(Pid, ViewName, [View|Rest], ViewType) ->
+ case lists:member(ViewName, view_names(View, ViewType)) of
+ true ->
+ if ViewType == reduce ->
+ {index_of(ViewName, view_names(View, reduce)), View};
+ true ->
+ View
+ end;
+ false ->
+ extract_view(Pid, ViewName, Rest, ViewType)
+ end.
+
+view_names(View, Type) when Type == red_map; Type == reduce ->
+ [Name || {Name, _} <- View#view.reduce_funs];
+view_names(View, map) ->
+ View#view.map_names.
+
+index_of(X, List) ->
+ index_of(X, List, 1).
+
+index_of(_X, [], _I) ->
+ not_found;
+index_of(X, [X|_Rest], I) ->
+ I;
+index_of(X, [_|Rest], I) ->
+ index_of(X, Rest, I+1).
diff --git a/src/fabric_view_all_docs.erl b/src/fabric_view_all_docs.erl
index 6cdc66c5..99834286 100644
--- a/src/fabric_view_all_docs.erl
+++ b/src/fabric_view_all_docs.erl
@@ -16,9 +16,10 @@ go(DbName, #view_query_args{keys=nil} = QueryArgs, Callback, Acc0) ->
query_args = QueryArgs,
callback = Callback,
buffer_size = list_to_integer(BufferSize),
- counters = init_counters(Workers),
+ counters = fabric_dict:init(Workers, 0),
skip = Skip,
limit = Limit,
+ stop_fun = stop_fun(QueryArgs),
user_acc = Acc0
},
try fabric_util:receive_loop(Workers, #shard.ref, fun handle_message/3,
@@ -58,8 +59,8 @@ handle_message({rexi_DOWN, _, _, _}, nil, State) ->
handle_message({rexi_EXIT, _}, Worker, State) ->
#collector{callback=Callback, counters=Counters0, user_acc=Acc} = State,
- Counters = remove(Worker, Counters0),
- case is_progress_possible(Counters) of
+ Counters = fabric_dict:erase(Worker, Counters0),
+ case fabric_view:is_progress_possible(Counters) of
true ->
{ok, State#collector{counters = Counters}};
false ->
@@ -75,18 +76,18 @@ handle_message({total_and_offset, Tot, Off}, {Worker, From}, State) ->
offset = Offset0,
user_acc = AccIn
} = State,
- case lookup_element(Worker, Counters0) of
+ case fabric_dict:lookup_element(Worker, Counters0) of
undefined ->
% this worker lost the race with other partition copies, terminate
gen_server:reply(From, stop),
{ok, State};
0 ->
gen_server:reply(From, ok),
- Counters1 = update_counter(Worker, 1, Counters0),
- Counters2 = remove_overlapping_shards(Worker, Counters1),
+ Counters1 = fabric_dict:update_counter(Worker, 1, Counters0),
+ Counters2 = fabric_view:remove_overlapping_shards(Worker, Counters1),
Total = Total0 + Tot,
Offset = Offset0 + Off,
- case waiting_on_shards(Counters2) of
+ case fabric_dict:any(0, Counters2) of
true ->
{ok, State#collector{
counters = Counters2,
@@ -97,7 +98,7 @@ handle_message({total_and_offset, Tot, Off}, {Worker, From}, State) ->
FinalOffset = erlang:min(Total, Offset+State#collector.skip),
{Go, Acc} = Callback({total_and_offset, Total, FinalOffset}, AccIn),
{Go, State#collector{
- counters = decrement_all_counters(Counters2),
+ counters = fabric_dict:decrement_all(Counters2),
total_rows = Total,
offset = FinalOffset,
user_acc = Acc
@@ -109,158 +110,30 @@ handle_message(#view_row{} = Row, {Worker, From}, State) ->
#collector{query_args = Args, counters = Counters0, rows = Rows0} = State,
Dir = Args#view_query_args.direction,
Rows = merge_row(Dir, Row#view_row{worker=Worker}, Rows0),
- Counters1 = update_counter(Worker, 1, Counters0),
+ Counters1 = fabric_dict:update_counter(Worker, 1, Counters0),
State1 = State#collector{rows=Rows, counters=Counters1},
- State2 = maybe_pause_worker(Worker, From, State1),
- maybe_send_row(State2);
+ State2 = fabric_view:maybe_pause_worker(Worker, From, State1),
+ fabric_view:maybe_send_row(State2);
handle_message(complete, Worker, State) ->
- Counters = update_counter(Worker, 1, State#collector.counters),
- maybe_send_row(State#collector{counters = Counters}).
+ Counters = fabric_dict:update_counter(Worker, 1, State#collector.counters),
+ fabric_view:maybe_send_row(State#collector{counters = Counters}).
-maybe_pause_worker(Worker, From, State) ->
- #collector{buffer_size = BufferSize, counters = Counters} = State,
- case lookup_element(Worker, Counters) of
- BufferSize ->
- State#collector{blocked = [{Worker,From} | State#collector.blocked]};
- _Count ->
- gen_server:reply(From, ok),
- State
- end.
-
-maybe_resume_worker(Worker, State) ->
- #collector{buffer_size = Buffer, counters = C, blocked = B} = State,
- case lookup_element(Worker, C) of
- Count when Count < Buffer/2 ->
- case couch_util:get_value(Worker, B) of
- undefined ->
- State;
- From ->
- gen_server:reply(From, ok),
- State#collector{blocked = lists:keydelete(Worker, 1, B)}
- end;
- _Other ->
- State
- end.
-
-maybe_send_row(#collector{limit=0} = State) ->
- #collector{user_acc=AccIn, callback=Callback} = State,
- {_, Acc} = Callback(complete, AccIn),
- {stop, State#collector{user_acc=Acc}};
-maybe_send_row(State) ->
- #collector{
- callback = Callback,
- counters = Counters,
- skip = Skip,
- limit = Limit,
- user_acc = AccIn
- } = State,
- case waiting_on_shards(Counters) of
- true ->
- {ok, State};
- false ->
- case get_next_row(State) of
- complete ->
- {_, Acc} = Callback(complete, AccIn),
- {stop, State#collector{user_acc=Acc}};
- {_, NewState} when Skip > 0 ->
- maybe_send_row(NewState#collector{skip=Skip-1, limit=Limit-1});
- {Row, NewState} ->
- case Callback(transform_row(Row), AccIn) of
- {stop, Acc} ->
- {stop, NewState#collector{user_acc=Acc, limit=Limit-1}};
- {ok, Acc} ->
- maybe_send_row(NewState#collector{user_acc=Acc, limit=Limit-1})
- end
- end
- end.
-
-get_next_row(#collector{rows = []}) ->
- complete;
-get_next_row(State) ->
- #collector{query_args=Args, rows=[Row|Rest], counters=Counters0} = State,
- Worker = Row#view_row.worker,
- Counters1 = update_counter(Worker, -1, Counters0),
- NewState = maybe_resume_worker(Worker, State#collector{counters=Counters1}),
- case stop(Args, Row) of
- true ->
- complete;
- false ->
- {Row, NewState#collector{rows = Rest}}
+stop_fun(#view_query_args{direction=fwd, end_key=EndKey}) ->
+ fun(#view_row{id=Id}) ->
+ couch_db_updater:less_docid(EndKey, Id)
+ end;
+stop_fun(#view_query_args{direction=rev, end_key=EndKey}) ->
+ fun(#view_row{id=Id}) ->
+ couch_db_updater:less_docid(Id, EndKey)
end.
-stop(#view_query_args{direction=fwd, end_key=EndKey}, #view_row{id=Id}) ->
- couch_db_updater:less_docid(EndKey, Id);
-stop(#view_query_args{direction=rev, end_key=EndKey}, #view_row{id=Id}) ->
- couch_db_updater:less_docid(Id, EndKey).
-
-transform_row(#view_row{key=Key, id=undefined}) ->
- {row, {[{key,Key}, {error,not_found}]}};
-transform_row(#view_row{key=Key, id=Id, value=Value, doc=undefined}) ->
- {row, {[{key,Key}, {id,Id}, {value,Value}]}};
-transform_row(#view_row{key=Key, id=Id, value=Value, doc={error,Reason}}) ->
- {row, {[{key,Key}, {id,Id}, {value,Value}, {error,Reason}]}};
-transform_row(#view_row{key=Key, id=Id, value=Value, doc=Doc}) ->
- {row, {[{key,Key}, {id,Id}, {value,Value}, {doc,Doc}]}}.
-
merge_row(fwd, Row, Rows) ->
lists:keymerge(#view_row.id, [Row], Rows);
merge_row(rev, Row, Rows) ->
lists:rkeymerge(#view_row.id, [Row], Rows).
-remove_overlapping_shards(#shard{range=[A,B]} = Shard0, Shards) ->
- filter(fun(#shard{range=[X,Y]} = Shard, _Value) ->
- if Shard =:= Shard0 ->
- % we can't remove ourselves
- true;
- A < B, X >= A, X < B ->
- % lower bound is inside our range
- false;
- A < B, Y > A, Y =< B ->
- % upper bound is inside our range
- false;
- B < A, X >= A orelse B < A, X < B ->
- % target shard wraps the key range, lower bound is inside
- false;
- B < A, Y > A orelse B < A, Y =< B ->
- % target shard wraps the key range, upper bound is inside
- false;
- true ->
- true
- end
- end, Shards).
-
-%% @doc looks for a fully covered keyrange in the list of counters
--spec is_progress_possible([{#shard{}, non_neg_integer()}]) -> boolean().
-is_progress_possible(Counters) ->
- Ranges = fold(fun(#shard{range=[X,Y]}, _, A) -> [{X,Y}|A] end, [], Counters),
- [First | Rest] = lists:ukeysort(1, Ranges),
- {Head, Tail} = lists:foldl(fun
- (_, {Head, Tail}) when Head =:= Tail ->
- % this is the success condition, we can fast-forward
- {Head, Tail};
- (_, {foo, bar}) ->
- % we've already declared failure
- {foo, bar};
- ({X,_}, {Head, Tail}) when Head < Tail, X > Tail ->
- % gap in the keyrange, we're dead
- {foo, bar};
- ({X,Y}, {Head, Tail}) when Head < Tail, X < Y ->
- % the normal condition, adding to the tail
- {Head, erlang:max(Tail, Y)};
- ({X,Y}, {Head, Tail}) when Head < Tail, X > Y, Y >= Head ->
- % we've wrapped all the way around, trigger success condition
- {Head, Head};
- ({X,Y}, {Head, Tail}) when Head < Tail, X > Y ->
- % this wraps the keyspace, but there's still a gap. We're dead
- % TODO technically, another shard could be a superset of this one, and
- % we could still be alive. Pretty unlikely though, and impossible if
- % we don't allow shards to wrap around the boundary
- {foo, bar}
- end, First, Rest),
- Head =:= Tail.
-
doc_receive_loop([], _, _, _, Acc) ->
{ok, Acc};
doc_receive_loop(_, _, 0, _, Acc) ->
@@ -273,7 +146,7 @@ doc_receive_loop([{Pid,Ref}|Rest], Skip, Limit, Callback, Acc) when Skip > 0 ->
end;
doc_receive_loop([{Pid,Ref}|Rest], 0, Limit, Callback, AccIn) ->
receive {'DOWN', Ref, process, Pid, #view_row{} = Row} ->
- case Callback(transform_row(Row), AccIn) of
+ case Callback(fabric_view:transform_row(Row), AccIn) of
{ok, Acc} ->
doc_receive_loop(Rest, 0, Limit-1, Callback, Acc);
{stop, Acc} ->
@@ -300,31 +173,3 @@ open_doc(DbName, Id, IncludeDocs) ->
#view_row{key=Id, id=Id, value=Value}
end,
exit(if IncludeDocs -> Row#view_row{doc=Doc}; true -> Row end).
-
-
-% Instead of ets, let's use an ordered keylist. We'll need to revisit if we
-% have >> 100 shards, so a private interface is a good idea. - APK June 2010
-
-init_counters(Keys) ->
- orddict:from_list([{Key,0} || Key <- Keys]).
-
-decrement_all_counters(Dict) ->
- [{K,V-1} || {K,V} <- Dict].
-
-update_counter(Key, Incr, Dict0) ->
- orddict:update_counter(Key, Incr, Dict0).
-
-lookup_element(Key, Dict) ->
- couch_util:get_value(Key, Dict).
-
-waiting_on_shards(Dict) ->
- lists:keymember(0, 2, Dict).
-
-remove(Shard, Dict) ->
- orddict:erase(Shard, Dict).
-
-filter(Fun, Dict) ->
- orddict:filter(Fun, Dict).
-
-fold(Fun, Acc0, Dict) ->
- orddict:fold(Fun, Acc0, Dict).
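The orddict helpers removed above are consolidated into the new fabric_dict module listed in ebin/fabric.app but not shown in this diff. A plausible minimal sketch follows, mirroring the deleted wrappers and the fabric_dict:* call sites introduced above; the exact export list, in particular init/2 and any/2, is inferred from those call sites rather than confirmed by this commit.

-module(fabric_dict).
-export([init/2, decrement_all/1, update_counter/3, lookup_element/2,
         any/2, erase/2, filter/2, fold/3]).

%% Ordered keylist of {Shard, Counter} pairs, as in the removed helpers.
init(Keys, InitialValue) ->
    orddict:from_list([{Key, InitialValue} || Key <- Keys]).

decrement_all(Dict) ->
    [{K, V-1} || {K, V} <- Dict].

update_counter(Key, Incr, Dict0) ->
    orddict:update_counter(Key, Incr, Dict0).

lookup_element(Key, Dict) ->
    couch_util:get_value(Key, Dict).

%% true if any counter currently equals Value (e.g. 0 = shard still pending).
any(Value, Dict) ->
    lists:keymember(Value, 2, Dict).

erase(Shard, Dict) ->
    orddict:erase(Shard, Dict).

filter(Fun, Dict) ->
    orddict:filter(Fun, Dict).

fold(Fun, Acc0, Dict) ->
    orddict:fold(Fun, Acc0, Dict).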