diff options
Diffstat (limited to 'apps')
-rw-r--r-- | apps/couch/src/couch_view_updater.erl | 58 |
1 files changed, 38 insertions, 20 deletions
diff --git a/apps/couch/src/couch_view_updater.erl b/apps/couch/src/couch_view_updater.erl index b04a80ce..a07e5dd3 100644 --- a/apps/couch/src/couch_view_updater.erl +++ b/apps/couch/src/couch_view_updater.erl @@ -145,29 +145,47 @@ do_maps(Group, MapQueue, WriteQueue, ViewEmptyKVs) -> -spec do_writes(pid(), pid() | nil, #group{}, pid(), boolean()) -> any(). do_writes(Parent, Owner, Group, WriteQueue, InitialBuild) -> - case couch_work_queue:dequeue(WriteQueue) of - closed -> + case accumulate_writes(WriteQueue, couch_work_queue:dequeue(WriteQueue), nil) of + stop -> Parent ! {new_group, Group}; - {ok, Queue} -> - {NewSeq, ViewKeyValues, DocIdViewIdKeys} = lists:foldl( - fun({Seq, ViewKVs, DocIdViewIdKeys}, nil) -> - {Seq, ViewKVs, DocIdViewIdKeys}; - ({Seq, ViewKVs, DocIdViewIdKeys}, Acc) -> - {Seq2, AccViewKVs, AccDocIdViewIdKeys} = Acc, - AccViewKVs2 = lists:zipwith( - fun({View, KVsIn}, {_View, KVsAcc}) -> - {View, KVsIn ++ KVsAcc} - end, ViewKVs, AccViewKVs), - {lists:max([Seq, Seq2]), - AccViewKVs2, DocIdViewIdKeys ++ AccDocIdViewIdKeys} - end, nil, Queue), + {Go, {NewSeq, ViewKeyValues, DocIdViewIdKeys}} -> Group2 = write_changes(Group, ViewKeyValues, DocIdViewIdKeys, NewSeq, InitialBuild), - case Owner of - nil -> ok; - _ -> ok = gen_server:cast(Owner, {partial_update, Parent, Group2}) - end, - ?MODULE:do_writes(Parent, Owner, Group2, WriteQueue, InitialBuild) + if Go =:= stop -> + Parent ! {new_group, Group2}; + true -> + case Owner of + nil -> ok; + _ -> ok = gen_server:cast(Owner, {partial_update, Parent, Group2}) + end, + ?MODULE:do_writes(Parent, Owner, Group2, WriteQueue, InitialBuild) + end + end. + +accumulate_writes(_, closed, nil) -> + stop; +accumulate_writes(_, closed, Acc) -> + {stop, Acc}; +accumulate_writes(W, {ok, Queue}, Acc0) -> + {_, _, DocIdViewIdKeys} = NewAcc = lists:foldl( + fun(First, nil) -> First; ({Seq, ViewKVs, DocIdViewIdKeys}, Acc) -> + {Seq2, AccViewKVs, AccDocIdViewIdKeys} = Acc, + AccViewKVs2 = lists:zipwith( + fun({View, KVsIn}, {_View, KVsAcc}) -> + {View, KVsIn ++ KVsAcc} + end, ViewKVs, AccViewKVs), + {erlang:max(Seq, Seq2), AccViewKVs2, + DocIdViewIdKeys ++ AccDocIdViewIdKeys} + end, Acc0, Queue), + % check if we have enough items now + MinItems = couch_config:get("view_updater", "min_writer_items", "100"), + MinSize = couch_config:get("view_updater", "min_writer_size", "16777216"), + case length(DocIdViewIdKeys) >= list_to_integer(MinItems) orelse + process_info(self(), memory) >= list_to_integer(MinSize) of + true -> + {ok, NewAcc}; + false -> + accumulate_writes(W, couch_work_queue:dequeue(W), NewAcc) end. -spec view_insert_query_results([#doc{}], list(), any(), any()) -> any(). |