From 58e2e130afbe692e216a0943907b7c26b1d36e8b Mon Sep 17 00:00:00 2001 From: Filipe David Borba Manana Date: Sat, 21 Aug 2010 20:26:31 +0000 Subject: Work queue - add support for multiple consumers. git-svn-id: https://svn.apache.org/repos/asf/couchdb/trunk@987825 13f79535-47bb-0310-9956-ffa450edef68 --- src/couchdb/couch_work_queue.erl | 77 +++++++++++++++++++++++++--------------- 1 file changed, 48 insertions(+), 29 deletions(-) diff --git a/src/couchdb/couch_work_queue.erl b/src/couchdb/couch_work_queue.erl index 44f2816d..637858b0 100644 --- a/src/couchdb/couch_work_queue.erl +++ b/src/couchdb/couch_work_queue.erl @@ -23,8 +23,9 @@ max_items, items=0, size=0, - work_waiter=nil, - close_on_dequeue=false + work_waiters=[], + close_on_dequeue=false, + multi_workers=false }). new(Options) -> @@ -49,16 +50,15 @@ close(Wq) -> init(Options) -> Q = #q{ max_size = couch_util:get_value(max_size, Options), - max_items = couch_util:get_value(max_items, Options) + max_items = couch_util:get_value(max_items, Options), + multi_workers = couch_util:get_value(multi_workers, Options, false) }, {ok, Q}. -terminate(_Reason, #q{work_waiter=nil}) -> - ok; -terminate(_Reason, #q{work_waiter={WWFrom, _}}) -> - gen_server:reply(WWFrom, closed). +terminate(_Reason, #q{work_waiters=Workers}) -> + lists:foreach(fun({W, _}) -> gen_server:reply(W, closed) end, Workers). -handle_call({queue, Item}, From, #q{work_waiter=nil}=Q0) -> +handle_call({queue, Item}, From, #q{work_waiters=[]}=Q0) -> Q = Q0#q{size=Q0#q.size + byte_size(term_to_binary(Item)), items=Q0#q.items + 1, queue=queue:in(Item, Q0#q.queue)}, @@ -69,28 +69,47 @@ handle_call({queue, Item}, From, #q{work_waiter=nil}=Q0) -> false -> {reply, ok, Q} end; -handle_call({queue, Item}, _From, #q{work_waiter={WWFrom, _Max}}=Q) -> - gen_server:reply(WWFrom, {ok, [Item]}), - {reply, ok, Q#q{work_waiter=nil}}; -handle_call({dequeue, _Max}, _From, #q{work_waiter=WW}) when WW /= nil -> - exit("Only one caller allowed to wait for work at a time"); -handle_call({dequeue, Max}, From, #q{items=0}=Q) -> - {noreply, Q#q{work_waiter={From, Max}}}; -handle_call({dequeue, Max}, _From, #q{queue=Queue, max_size=MaxSize, - max_items=MaxItems, items=Items,close_on_dequeue=Close}=Q) -> - if Max >= Items orelse Max == all -> - [gen_server:reply(From, ok) || From <- Q#q.blocked], - Q2 = #q{max_size=MaxSize, max_items=MaxItems}, - if Close -> - {stop, normal, {ok, queue:to_list(Queue)}, Q2}; - true -> - {reply, {ok, queue:to_list(Queue)}, Q2} - end; +handle_call({queue, Item}, _From, #q{work_waiters = [{W, _Max} | Rest]} = Q) -> + gen_server:reply(W, {ok, [Item]}), + {reply, ok, Q#q{work_waiters=Rest}}; + +handle_call({dequeue, Max}, From, Q) -> + #q{work_waiters = Workers, multi_workers = Multi, items = Count} = Q, + case {Workers, Multi} of + {[_ | _], false} -> + exit("Only one caller allowed to wait for this work at a time"); + {[_ | _], true} -> + {noreply, Q#q{work_waiters=Workers ++ [{From, Max}]}}; + _ -> + case Count of + 0 -> + {noreply, Q#q{work_waiters=Workers ++ [{From, Max}]}}; + C when C > 0 -> + deliver_queue_items(Max, Q) + end + end. + +deliver_queue_items(Max, Q) -> + #q{ + queue = Queue, + items = Count, + close_on_dequeue = Close, + blocked = Blocked + } = Q, + case (Max =:= all) orelse (Max >= Count) of + false -> + {Items, Queue2, Blocked2} = dequeue_items(Max, Queue, Blocked, []), + Q2 = Q#q{items = Count - Max, blocked = Blocked2, queue = Queue2}, + {reply, {ok, Items}, Q2}; true -> - {DequeuedItems, Queue2, Blocked2} = - dequeue_items(Max, Queue, Q#q.blocked, []), - {reply, {ok, DequeuedItems}, - Q#q{items=Items-Max,blocked=Blocked2,queue=Queue2}} + lists:foreach(fun(F) -> gen_server:reply(F, ok) end, Blocked), + Q2 = Q#q{items = 0, size = 0, blocked = [], queue = queue:new()}, + case Close of + false -> + {reply, {ok, queue:to_list(Queue)}, Q2}; + true -> + {stop, normal, {ok, queue:to_list(Queue)}, Q2} + end end. dequeue_items(0, Queue, Blocked, DequeuedAcc) -> -- cgit v1.2.3