% Licensed under the Apache License, Version 2.0 (the "License"); you may not
% use this file except in compliance with the License. You may obtain a copy of
% the License at
%
%   http://www.apache.org/licenses/LICENSE-2.0
%
% Unless required by applicable law or agreed to in writing, software
% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
% License for the specific language governing permissions and limitations under
% the License.

%% A work queue implemented as a gen_server: producers add items with
%% queue/2, consumers take them with dequeue/1,2, and close/1 asks the
%% server to shut down.

-module(couch_work_queue).
-behaviour(gen_server).

%% Public API
-export([new/1,queue/2,dequeue/1,dequeue/2,close/1]).

%% gen_server callbacks
-export([init/1, terminate/2, handle_call/3, handle_cast/2, code_change/3, handle_info/2]).

%% Server state.
-record(q, {
    queue=queue:new(),      % pending items
    blocked=[],             % producers whose queue/2 call has not been answered yet
    max_size,               % threshold compared against `size`
    max_items,              % threshold compared against `items`
    items=0,                % number of items currently queued
    size=0,                 % running byte-size total used for the max_size check
    work_waiters=[],        % {From, Max} of consumers waiting for work
    close_on_dequeue=false, % set by close/1 when items are still queued
    multi_workers=false     % whether several consumers may wait at once
}).


%% @doc Starts a work queue process linked to the caller. `Options' is
%% handed to init/1, which reads the keys `max_size', `max_items' and
%% `multi_workers' from it. Returns the result of gen_server:start_link/3.
new(Options) ->
    gen_server:start_link(?MODULE, Options, []).


%% @doc Adds `Item' to the queue `Wq'. The call has no timeout; the server
%% may withhold its reply until a consumer has dequeued.
queue(Wq, Item) ->
    gen_server:call(Wq, {queue, Item}, infinity).


%% @doc Same as dequeue(Wq, all).
dequeue(Wq) ->
    dequeue(Wq, all).


%% @doc Asks the queue `Wq' for up to `MaxItems' items (or `all'), waiting
%% without a timeout. Returns the server's reply; if the call raises for any
%% reason (for instance because the server is gone) returns `closed'.
dequeue(Wq, MaxItems) ->
    try gen_server:call(Wq, {dequeue, MaxItems}, infinity) of
        Reply ->
            Reply
    catch
        _:_ ->
            closed
    end.


%% @doc Asynchronously asks the queue `Wq' to close.
close(Wq) ->
    gen_server:cast(Wq, close).


%% gen_server callback: builds the initial state from the start options.
init(Options) ->
    MaxSize = couch_util:get_value(max_size, Options),
    MaxItems = couch_util:get_value(max_items, Options),
    MultiWorkers = couch_util:get_value(multi_workers, Options, false),
    InitialState = #q{
        max_size = MaxSize,
        max_items = MaxItems,
        multi_workers = MultiWorkers
    },
    {ok, InitialState}.


%% gen_server callback: on shutdown, answer every consumer that is still
%% waiting for work with `closed'.
terminate(_Reason, #q{work_waiters=Waiters}) ->
    NotifyClosed = fun({Waiter, _}) -> gen_server:reply(Waiter, closed) end,
    lists:foreach(NotifyClosed, Waiters).
%% gen_server callback for synchronous requests.
%%
%% {queue, Item}
%%   With no consumer waiting, the item is appended to the queue together
%%   with its encoded byte size, so the same amount can be subtracted again
%%   when the item leaves the queue. If the queue has then reached max_size
%%   or max_items, the producer is parked in `blocked' and only answered
%%   once a consumer dequeues; otherwise it is answered `ok' immediately.
%%   With a consumer waiting, the item bypasses the queue and is handed
%%   directly to the first waiter as {ok, [Item]}.
%%
%% {dequeue, Max}
%%   Max is `all' or an item count. If another consumer is already waiting,
%%   the caller is queued behind it when multi_workers is set; otherwise the
%%   server exits. With no waiter, the caller either waits (empty queue) or
%%   receives items right away.
handle_call({queue, Item}, From, #q{work_waiters = []} = Q0) ->
    ItemSize = byte_size(term_to_binary(Item)),
    Q = Q0#q{
        size = Q0#q.size + ItemSize,
        items = Q0#q.items + 1,
        queue = queue:in({Item, ItemSize}, Q0#q.queue)
    },
    case (Q#q.size >= Q#q.max_size) orelse (Q#q.items >= Q#q.max_items) of
        true ->
            {noreply, Q#q{blocked = [From | Q#q.blocked]}};
        false ->
            {reply, ok, Q}
    end;
handle_call({queue, Item}, _From, #q{work_waiters = [{W, _Max} | Rest]} = Q) ->
    gen_server:reply(W, {ok, [Item]}),
    {reply, ok, Q#q{work_waiters = Rest}};
handle_call({dequeue, Max}, From, Q) ->
    #q{work_waiters = Workers, multi_workers = Multi, items = Count} = Q,
    case {Workers, Multi} of
        {[_ | _], false} ->
            exit("Only one caller allowed to wait for this work at a time");
        {[_ | _], true} ->
            {noreply, Q#q{work_waiters = Workers ++ [{From, Max}]}};
        _ ->
            case Count of
                0 ->
                    {noreply, Q#q{work_waiters = Workers ++ [{From, Max}]}};
                C when C > 0 ->
                    deliver_queue_items(Max, Q)
            end
    end.


%% Builds the gen_server return value for a dequeue request against a
%% non-empty queue.
%%
%% When Max is `all' or covers everything queued, the queue is drained, all
%% blocked producers are released and the counters are reset; if a close was
%% requested earlier the server stops after replying. Otherwise exactly Max
%% items are removed and both `items' and `size' are reduced accordingly.
deliver_queue_items(Max, Q) ->
    #q{
        queue = Queue,
        items = Count,
        size = Size,
        close_on_dequeue = Close,
        blocked = Blocked
    } = Q,
    case (Max =:= all) orelse (Max >= Count) of
        false ->
            {Items, Size2, Queue2, Blocked2} =
                dequeue_items(Max, Size, Queue, Blocked, []),
            Q2 = Q#q{
                items = Count - Max,
                % Keep the byte total in step with the item count; without
                % this the total would only ever shrink on a full drain.
                size = Size2,
                blocked = Blocked2,
                queue = Queue2
            },
            {reply, {ok, Items}, Q2};
        true ->
            lists:foreach(fun(F) -> gen_server:reply(F, ok) end, Blocked),
            Q2 = Q#q{items = 0, size = 0, blocked = [], queue = queue:new()},
            % Queue entries are {Item, Size}; callers only see the items.
            Items = [Item || {Item, _ItemSize} <- queue:to_list(Queue)],
            case Close of
                false ->
                    {reply, {ok, Items}, Q2};
                true ->
                    {stop, normal, {ok, Items}, Q2}
            end
    end.


%% Removes NumItems entries from the front of Queue, releasing one blocked
%% producer per removed entry. Returns the removed items in queue order, the
%% remaining byte total, the remaining queue and the still-blocked producers.
dequeue_items(0, Size, Queue, Blocked, DequeuedAcc) ->
    {lists:reverse(DequeuedAcc), Size, Queue, Blocked};
dequeue_items(NumItems, Size, Queue, Blocked, DequeuedAcc) ->
    {{value, {Item, ItemSize}}, Queue2} = queue:out(Queue),
    Blocked2 = unblock_one(Blocked),
    dequeue_items(
        NumItems - 1, Size - ItemSize, Queue2, Blocked2, [Item | DequeuedAcc]
    ).


%% Answers the first blocked producer, if any, and returns the rest.
unblock_one([]) ->
    [];
unblock_one([From | Rest]) ->
    gen_server:reply(From, ok),
    Rest.


%% gen_server callback for asynchronous requests. `close' stops the server
%% at once when nothing is queued; otherwise it flags the state so the
%% server stops after a dequeue has drained the queue.
handle_cast(close, #q{items = 0} = Q) ->
    {stop, normal, Q};
handle_cast(close, Q) ->
    {noreply, Q#q{close_on_dequeue = true}}.
%% gen_server callback: the state needs no conversion between code
%% versions, so it is passed through unchanged.
code_change(_FromVsn, Q, _Extra) ->
    {ok, Q}.


%% gen_server callback: this server expects no plain messages, so any
%% message that arrives stops it, with the message itself as the reason.
handle_info(Msg, State) ->
    {stop, Msg, State}.