summaryrefslogtreecommitdiff
path: root/apps
diff options
context:
space:
mode:
Diffstat (limited to 'apps')
-rw-r--r--apps/couch/src/couch_view_updater.erl58
1 files changed, 38 insertions, 20 deletions
diff --git a/apps/couch/src/couch_view_updater.erl b/apps/couch/src/couch_view_updater.erl
index b04a80ce..a07e5dd3 100644
--- a/apps/couch/src/couch_view_updater.erl
+++ b/apps/couch/src/couch_view_updater.erl
@@ -145,29 +145,47 @@ do_maps(Group, MapQueue, WriteQueue, ViewEmptyKVs) ->
-spec do_writes(pid(), pid() | nil, #group{}, pid(), boolean()) -> any().
do_writes(Parent, Owner, Group, WriteQueue, InitialBuild) ->
- case couch_work_queue:dequeue(WriteQueue) of
- closed ->
+ case accumulate_writes(WriteQueue, couch_work_queue:dequeue(WriteQueue), nil) of
+ stop ->
Parent ! {new_group, Group};
- {ok, Queue} ->
- {NewSeq, ViewKeyValues, DocIdViewIdKeys} = lists:foldl(
- fun({Seq, ViewKVs, DocIdViewIdKeys}, nil) ->
- {Seq, ViewKVs, DocIdViewIdKeys};
- ({Seq, ViewKVs, DocIdViewIdKeys}, Acc) ->
- {Seq2, AccViewKVs, AccDocIdViewIdKeys} = Acc,
- AccViewKVs2 = lists:zipwith(
- fun({View, KVsIn}, {_View, KVsAcc}) ->
- {View, KVsIn ++ KVsAcc}
- end, ViewKVs, AccViewKVs),
- {lists:max([Seq, Seq2]),
- AccViewKVs2, DocIdViewIdKeys ++ AccDocIdViewIdKeys}
- end, nil, Queue),
+ {Go, {NewSeq, ViewKeyValues, DocIdViewIdKeys}} ->
Group2 = write_changes(Group, ViewKeyValues, DocIdViewIdKeys, NewSeq,
InitialBuild),
- case Owner of
- nil -> ok;
- _ -> ok = gen_server:cast(Owner, {partial_update, Parent, Group2})
- end,
- ?MODULE:do_writes(Parent, Owner, Group2, WriteQueue, InitialBuild)
+ if Go =:= stop ->
+ Parent ! {new_group, Group2};
+ true ->
+ case Owner of
+ nil -> ok;
+ _ -> ok = gen_server:cast(Owner, {partial_update, Parent, Group2})
+ end,
+ ?MODULE:do_writes(Parent, Owner, Group2, WriteQueue, InitialBuild)
+ end
+ end.
+
+accumulate_writes(_, closed, nil) ->
+ stop;
+accumulate_writes(_, closed, Acc) ->
+ {stop, Acc};
+accumulate_writes(W, {ok, Queue}, Acc0) ->
+ {_, _, DocIdViewIdKeys} = NewAcc = lists:foldl(
+ fun(First, nil) -> First; ({Seq, ViewKVs, DocIdViewIdKeys}, Acc) ->
+ {Seq2, AccViewKVs, AccDocIdViewIdKeys} = Acc,
+ AccViewKVs2 = lists:zipwith(
+ fun({View, KVsIn}, {_View, KVsAcc}) ->
+ {View, KVsIn ++ KVsAcc}
+ end, ViewKVs, AccViewKVs),
+ {erlang:max(Seq, Seq2), AccViewKVs2,
+ DocIdViewIdKeys ++ AccDocIdViewIdKeys}
+ end, Acc0, Queue),
+ % check if we have enough items now
+ MinItems = couch_config:get("view_updater", "min_writer_items", "100"),
+ MinSize = couch_config:get("view_updater", "min_writer_size", "16777216"),
+ case length(DocIdViewIdKeys) >= list_to_integer(MinItems) orelse
+ process_info(self(), memory) >= list_to_integer(MinSize) of
+ true ->
+ {ok, NewAcc};
+ false ->
+ accumulate_writes(W, couch_work_queue:dequeue(W), NewAcc)
end.
-spec view_insert_query_results([#doc{}], list(), any(), any()) -> any().