summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAdam Kocoloski <adam@cloudant.com>2010-06-08 10:51:08 -0400
committerAdam Kocoloski <adam@cloudant.com>2010-06-08 10:51:08 -0400
commit42a45f2a77a125f8c0f5be99edd574fbe391f1ee (patch)
treeb9fc8cd3bbc47612af3650a6f60ea27088e82bf3
parent2569631e249cc8209858f590a349f314b7253f3e (diff)
ignore missing Keys in reduce view, like couch. BugzID 10220
-rw-r--r--src/fabric_view.erl11
-rw-r--r--src/fabric_view_reduce.erl14
2 files changed, 7 insertions, 18 deletions
diff --git a/src/fabric_view.erl b/src/fabric_view.erl
index 70dedf27..2432ab40 100644
--- a/src/fabric_view.erl
+++ b/src/fabric_view.erl
@@ -126,6 +126,8 @@ keydict(Keys) ->
%% internal %%
+get_next_row(#collector{rows = []}) ->
+ throw(complete);
get_next_row(#collector{reducer = RedSrc} = St) when RedSrc =/= undefined ->
#collector{
query_args = #view_query_args{direction=Dir},
@@ -146,11 +148,8 @@ get_next_row(#collector{reducer = RedSrc} = St) when RedSrc =/= undefined ->
NewSt = St#collector{keys=RestKeys, rows=NewRowDict, counters=Counters},
{#view_row{key=Key, id=reduced, value=Reduced}, NewSt};
error ->
- NewSt = St#collector{keys=RestKeys},
- {#view_row{key=Key, id=reduced, value={error, missing}}, NewSt}
+ get_next_row(St#collector{keys=RestKeys})
end;
-get_next_row(#collector{rows = []}) ->
- throw(complete);
get_next_row(State) ->
#collector{
rows = [Row|Rest],
@@ -167,12 +166,12 @@ get_next_row(State) ->
{Row, NewState#collector{rows = Rest}}
end.
-find_next_key(undefined, Dir, RowDict) ->
+find_next_key(nil, Dir, RowDict) ->
case lists:sort(sort_fun(Dir), dict:fetch_keys(RowDict)) of
[] ->
throw(complete);
[Key|_] ->
- {Key, undefined}
+ {Key, nil}
end;
find_next_key([], _, _) ->
throw(complete);
diff --git a/src/fabric_view_reduce.erl b/src/fabric_view_reduce.erl
index 1a619877..0e52ec84 100644
--- a/src/fabric_view_reduce.erl
+++ b/src/fabric_view_reduce.erl
@@ -21,6 +21,7 @@ go(DbName, {GroupId, VName}, Args, Callback, Acc0) ->
callback = Callback,
buffer_size = list_to_integer(BufferSize),
counters = fabric_dict:init(Workers, 0),
+ keys = Args#view_query_args.keys,
skip = Skip,
limit = Limit,
lang = Group#group.def_lang,
@@ -64,21 +65,10 @@ handle_message(#view_row{key=Key} = Row, {Worker, From}, State) ->
% this worker lost the race with other partition copies, terminate it
gen_server:reply(From, stop),
{ok, State};
- % first ->
- % gen_server:reply(From, ok),
- % Rows = dict:append(Key, Row#view_row{worker=Worker}, Rows0),
- % C1 = fabric_dict:store(Worker, 1, Counters0),
- % C2 = fabric_view:remove_overlapping_shards(Worker, C1),
- % NewState = State#collector{counters=C2, rows=Rows},
- % case fabric_dict:any(first, C2) of
- % true ->
- % {ok, NewState};
- % false ->
- % fabric_view:maybe_send_row(State#collector{counters=C2, rows=Rows})
- % end;
_ ->
Rows = dict:append(Key, Row#view_row{worker=Worker}, Rows0),
C1 = fabric_dict:update_counter(Worker, 1, Counters0),
+ % TODO time this call, if slow don't do it every time
C2 = fabric_view:remove_overlapping_shards(Worker, C1),
State1 = State#collector{rows=Rows, counters=C2},
State2 = fabric_view:maybe_pause_worker(Worker, From, State1),