diff options
Diffstat (limited to 'apps/couch/src/couch_work_queue.erl')
-rw-r--r-- | apps/couch/src/couch_work_queue.erl | 134 |
1 files changed, 87 insertions, 47 deletions
diff --git a/apps/couch/src/couch_work_queue.erl b/apps/couch/src/couch_work_queue.erl index decfcad8..13ec7335 100644 --- a/apps/couch/src/couch_work_queue.erl +++ b/apps/couch/src/couch_work_queue.erl @@ -13,99 +13,139 @@ -module(couch_work_queue). -behaviour(gen_server). --export([new/2,queue/2,dequeue/1,dequeue/2,close/1]). --export([init/1, terminate/2, handle_call/3, handle_cast/2, code_change/3, handle_info/2]). +% public API +-export([new/1, queue/2, dequeue/1, dequeue/2, close/1]). + +% gen_server callbacks +-export([init/1, terminate/2]). +-export([handle_call/3, handle_cast/2, code_change/3, handle_info/2]). -record(q, { - queue=queue:new(), - blocked=[], + queue = queue:new(), + blocked = [], max_size, max_items, - items=0, - size=0, - work_waiter=nil, - close_on_dequeue=false + items = 0, + size = 0, + work_waiters = [], + close_on_dequeue = false, + multi_workers = false }). -new(MaxSize, MaxItems) -> - gen_server:start_link(couch_work_queue, {MaxSize, MaxItems}, []). + +new(Options) -> + gen_server:start_link(couch_work_queue, Options, []). + queue(Wq, Item) -> gen_server:call(Wq, {queue, Item}, infinity). + dequeue(Wq) -> dequeue(Wq, all). + dequeue(Wq, MaxItems) -> - try gen_server:call(Wq, {dequeue, MaxItems}, infinity) + try + gen_server:call(Wq, {dequeue, MaxItems}, infinity) catch _:_ -> closed end. + close(Wq) -> gen_server:cast(Wq, close). -init({MaxSize,MaxItems}) -> - {ok, #q{max_size=MaxSize, max_items=MaxItems}}. +init(Options) -> + Q = #q{ + max_size = couch_util:get_value(max_size, Options), + max_items = couch_util:get_value(max_items, Options), + multi_workers = couch_util:get_value(multi_workers, Options, false) + }, + {ok, Q}. + + +terminate(_Reason, #q{work_waiters=Workers}) -> + lists:foreach(fun({W, _}) -> gen_server:reply(W, closed) end, Workers). -terminate(_Reason, #q{work_waiter=nil}) -> - ok; -terminate(_Reason, #q{work_waiter={WWFrom, _}}) -> - gen_server:reply(WWFrom, closed). -handle_call({queue, Item}, From, #q{work_waiter=nil}=Q0) -> - Q = Q0#q{size=Q0#q.size + byte_size(term_to_binary(Item)), - items=Q0#q.items + 1, - queue=queue:in(Item, Q0#q.queue)}, +handle_call({queue, Item}, From, #q{work_waiters = []} = Q0) -> + Q = Q0#q{size = Q0#q.size + byte_size(term_to_binary(Item)), + items = Q0#q.items + 1, + queue = queue:in(Item, Q0#q.queue)}, case (Q#q.size >= Q#q.max_size) orelse (Q#q.items >= Q#q.max_items) of true -> - {noreply, Q#q{blocked=[From | Q#q.blocked]}}; + {noreply, Q#q{blocked = [From | Q#q.blocked]}}; false -> {reply, ok, Q} end; -handle_call({queue, Item}, _From, #q{work_waiter={WWFrom, _Max}}=Q) -> - gen_server:reply(WWFrom, {ok, [Item]}), - {reply, ok, Q#q{work_waiter=nil}}; -handle_call({dequeue, _Max}, _From, #q{work_waiter=WW}) when WW /= nil -> - exit("Only one caller allowed to wait for work at a time"); -handle_call({dequeue, Max}, From, #q{items=0}=Q) -> - {noreply, Q#q{work_waiter={From, Max}}}; -handle_call({dequeue, Max}, _From, #q{queue=Queue, max_size=MaxSize, - max_items=MaxItems, items=Items,close_on_dequeue=Close}=Q) -> - if Max >= Items orelse Max == all -> - [gen_server:reply(From, ok) || From <- Q#q.blocked], - Q2 = #q{max_size=MaxSize, max_items=MaxItems}, - if Close -> - {stop, normal, {ok, queue:to_list(Queue)}, Q2}; - true -> - {reply, {ok, queue:to_list(Queue)}, Q2} - end; + +handle_call({queue, Item}, _From, #q{work_waiters = [{W, _Max} | Rest]} = Q) -> + gen_server:reply(W, {ok, [Item]}), + {reply, ok, Q#q{work_waiters = Rest}}; + +handle_call({dequeue, Max}, From, Q) -> + #q{work_waiters = Workers, multi_workers = Multi, items = Count} = Q, + case {Workers, Multi} of + {[_ | _], false} -> + exit("Only one caller allowed to wait for this work at a time"); + {[_ | _], true} -> + {noreply, Q#q{work_waiters=Workers ++ [{From, Max}]}}; + _ -> + case Count of + 0 -> + {noreply, Q#q{work_waiters=Workers ++ [{From, Max}]}}; + C when C > 0 -> + deliver_queue_items(Max, Q) + end + end. + + +deliver_queue_items(Max, Q) -> + #q{ + queue = Queue, + items = Count, + close_on_dequeue = Close, + blocked = Blocked + } = Q, + case (Max =:= all) orelse (Max >= Count) of + false -> + {Items, Queue2, Blocked2} = dequeue_items(Max, Queue, Blocked, []), + Q2 = Q#q{items = Count - Max, blocked = Blocked2, queue = Queue2}, + {reply, {ok, Items}, Q2}; true -> - {DequeuedItems, Queue2, Blocked2} = - dequeue_items(Max, Queue, Q#q.blocked, []), - {reply, {ok, DequeuedItems}, - Q#q{items=Items-Max,blocked=Blocked2,queue=Queue2}} + lists:foreach(fun(F) -> gen_server:reply(F, ok) end, Blocked), + Q2 = Q#q{items = 0, size = 0, blocked = [], queue = queue:new()}, + case Close of + false -> + {reply, {ok, queue:to_list(Queue)}, Q2}; + true -> + {stop, normal, {ok, queue:to_list(Queue)}, Q2} + end end. + dequeue_items(0, Queue, Blocked, DequeuedAcc) -> {lists:reverse(DequeuedAcc), Queue, Blocked}; + dequeue_items(NumItems, Queue, Blocked, DequeuedAcc) -> {{value, Item}, Queue2} = queue:out(Queue), case Blocked of [] -> Blocked2 = Blocked; - [From|Blocked2] -> + [From | Blocked2] -> gen_server:reply(From, ok) end, - dequeue_items(NumItems-1, Queue2, Blocked2, [Item | DequeuedAcc]). + dequeue_items(NumItems - 1, Queue2, Blocked2, [Item | DequeuedAcc]). -handle_cast(close, #q{items=0}=Q) -> +handle_cast(close, #q{items = 0} = Q) -> {stop, normal, Q}; + handle_cast(close, Q) -> - {noreply, Q#q{close_on_dequeue=true}}. + {noreply, Q#q{close_on_dequeue = true}}. code_change(_OldVsn, State, _Extra) -> |