diff options
Diffstat (limited to 'src/couchdb/couch_work_queue.erl')
-rw-r--r-- | src/couchdb/couch_work_queue.erl | 155 |
1 files changed, 0 insertions, 155 deletions
diff --git a/src/couchdb/couch_work_queue.erl b/src/couchdb/couch_work_queue.erl deleted file mode 100644 index 13ec7335..00000000 --- a/src/couchdb/couch_work_queue.erl +++ /dev/null @@ -1,155 +0,0 @@ -% Licensed under the Apache License, Version 2.0 (the "License"); you may not -% use this file except in compliance with the License. You may obtain a copy of -% the License at -% -% http://www.apache.org/licenses/LICENSE-2.0 -% -% Unless required by applicable law or agreed to in writing, software -% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -% License for the specific language governing permissions and limitations under -% the License. - --module(couch_work_queue). --behaviour(gen_server). - -% public API --export([new/1, queue/2, dequeue/1, dequeue/2, close/1]). - -% gen_server callbacks --export([init/1, terminate/2]). --export([handle_call/3, handle_cast/2, code_change/3, handle_info/2]). - --record(q, { - queue = queue:new(), - blocked = [], - max_size, - max_items, - items = 0, - size = 0, - work_waiters = [], - close_on_dequeue = false, - multi_workers = false -}). - - -new(Options) -> - gen_server:start_link(couch_work_queue, Options, []). - - -queue(Wq, Item) -> - gen_server:call(Wq, {queue, Item}, infinity). - - -dequeue(Wq) -> - dequeue(Wq, all). - - -dequeue(Wq, MaxItems) -> - try - gen_server:call(Wq, {dequeue, MaxItems}, infinity) - catch - _:_ -> closed - end. - - -close(Wq) -> - gen_server:cast(Wq, close). - - -init(Options) -> - Q = #q{ - max_size = couch_util:get_value(max_size, Options), - max_items = couch_util:get_value(max_items, Options), - multi_workers = couch_util:get_value(multi_workers, Options, false) - }, - {ok, Q}. - - -terminate(_Reason, #q{work_waiters=Workers}) -> - lists:foreach(fun({W, _}) -> gen_server:reply(W, closed) end, Workers). - - -handle_call({queue, Item}, From, #q{work_waiters = []} = Q0) -> - Q = Q0#q{size = Q0#q.size + byte_size(term_to_binary(Item)), - items = Q0#q.items + 1, - queue = queue:in(Item, Q0#q.queue)}, - case (Q#q.size >= Q#q.max_size) orelse - (Q#q.items >= Q#q.max_items) of - true -> - {noreply, Q#q{blocked = [From | Q#q.blocked]}}; - false -> - {reply, ok, Q} - end; - -handle_call({queue, Item}, _From, #q{work_waiters = [{W, _Max} | Rest]} = Q) -> - gen_server:reply(W, {ok, [Item]}), - {reply, ok, Q#q{work_waiters = Rest}}; - -handle_call({dequeue, Max}, From, Q) -> - #q{work_waiters = Workers, multi_workers = Multi, items = Count} = Q, - case {Workers, Multi} of - {[_ | _], false} -> - exit("Only one caller allowed to wait for this work at a time"); - {[_ | _], true} -> - {noreply, Q#q{work_waiters=Workers ++ [{From, Max}]}}; - _ -> - case Count of - 0 -> - {noreply, Q#q{work_waiters=Workers ++ [{From, Max}]}}; - C when C > 0 -> - deliver_queue_items(Max, Q) - end - end. - - -deliver_queue_items(Max, Q) -> - #q{ - queue = Queue, - items = Count, - close_on_dequeue = Close, - blocked = Blocked - } = Q, - case (Max =:= all) orelse (Max >= Count) of - false -> - {Items, Queue2, Blocked2} = dequeue_items(Max, Queue, Blocked, []), - Q2 = Q#q{items = Count - Max, blocked = Blocked2, queue = Queue2}, - {reply, {ok, Items}, Q2}; - true -> - lists:foreach(fun(F) -> gen_server:reply(F, ok) end, Blocked), - Q2 = Q#q{items = 0, size = 0, blocked = [], queue = queue:new()}, - case Close of - false -> - {reply, {ok, queue:to_list(Queue)}, Q2}; - true -> - {stop, normal, {ok, queue:to_list(Queue)}, Q2} - end - end. - - -dequeue_items(0, Queue, Blocked, DequeuedAcc) -> - {lists:reverse(DequeuedAcc), Queue, Blocked}; - -dequeue_items(NumItems, Queue, Blocked, DequeuedAcc) -> - {{value, Item}, Queue2} = queue:out(Queue), - case Blocked of - [] -> - Blocked2 = Blocked; - [From | Blocked2] -> - gen_server:reply(From, ok) - end, - dequeue_items(NumItems - 1, Queue2, Blocked2, [Item | DequeuedAcc]). - - -handle_cast(close, #q{items = 0} = Q) -> - {stop, normal, Q}; - -handle_cast(close, Q) -> - {noreply, Q#q{close_on_dequeue = true}}. - - -code_change(_OldVsn, State, _Extra) -> - {ok, State}. - -handle_info(X, Q) -> - {stop, X, Q}. |