diff options
author | Kali Kaneko <kali@leap.se> | 2013-11-28 12:47:30 -0200 |
---|---|---|
committer | Kali Kaneko <kali@leap.se> | 2013-11-28 12:47:30 -0200 |
commit | f78bf4c8880de648ad84aa4b4946d922c8298388 (patch) | |
tree | d7c71039c97a1bfc27e625e2c00feb7b61d13253 /mail/src/leap | |
parent | 9fd987bb876171e419213bd710e4656ce86d73a6 (diff) |
add message producer and consumer
Diffstat (limited to 'mail/src/leap')
-rw-r--r-- | mail/src/leap/mail/messageflow.py | 149 |
1 files changed, 149 insertions, 0 deletions
diff --git a/mail/src/leap/mail/messageflow.py b/mail/src/leap/mail/messageflow.py new file mode 100644 index 0000000..21f6d62 --- /dev/null +++ b/mail/src/leap/mail/messageflow.py @@ -0,0 +1,149 @@ +# -*- coding: utf-8 -*- +# messageflow.py +# Copyright (C) 2013 LEAP +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see <http://www.gnu.org/licenses/>. +""" +Message Producers and Consumers for flow control. +""" +import Queue + +from twisted.internet.task import LoopingCall + +from zope.interface import Interface, implements + + +class IMessageConsumer(Interface): + + def consume(self, item): + """ + Consumes the passed item. + + :param item: an object to be consumed. + :type item: object + """ + # TODO we could add an optional type to be passed + # for doing type check. + + # TODO in case of errors, we could return the object to + # the queue, maybe wrapped in an object with a retries attribute. + + +class DummyMsgConsumer(object): + + implements(IMessageConsumer) + + def consume(self, item): + """ + Just prints the passed item. + """ + print "got item %s" % item + + +class MessageProducer(object): + """ + A Producer class that we can use to temporarily buffer the production + of messages so that different objects can consume them. + + This is useful for serializing the consumption of the messages stream + in the case of an slow resource (db), or for returning early from a + deferred chain and leave further processing detached from the calling loop, + as in the case of smtp. + """ + # TODO this can be seen as a first step towards properly implementing + # components that implement IPushProducer / IConsumer interfaces. + # However, I need to think more about how to pause the streaming. + # In any case, the differential rate between message production + # and consumption is not likely (?) to consume huge amounts of memory in + # our current settings, so the need to pause the stream is not urgent now. + + def __init__(self, consumer, queue=Queue.Queue, period=1): + """ + Initializes the MessageProducer + + :param consumer: an instance of a IMessageConsumer that will consume + the new messages. + :param queue: any queue implementation to be used as the temporary + buffer for new items. Default is a FIFO Queue. + :param period: the period to check for new items, in seconds. + """ + # XXX should assert it implements IConsumer / IMailConsumer + # it should implement a `consume` method + self._consumer = consumer + + self._queue = queue() + self._period = period + + self._loop = LoopingCall(self._check_for_new) + + # private methods + + def _check_for_new(self): + """ + Checks for new items in the internal queue, and calls the consume + method in the consumer. + + If the queue is found empty, the loop is stopped. It will be started + again after the addition of new items. + """ + # XXX right now I'm assuming that the period is good enough to allow + # a right pace of processing. but we could also pass the queue object + # to the consumer and let it choose whether process a new item or not. + + if self._queue.empty(): + self.stop() + else: + self._consumer.consume(self._queue.get()) + + # public methods + + def put(self, item): + """ + Puts a new item in the queue. + + If the queue was empty, we will start the loop again. + """ + was_empty = self._queue.empty() + + # XXX this might raise if the queue does not accept any new + # items. what to do then? + self._queue.put(item) + if was_empty: + self.start() + + def start(self): + """ + Starts polling for new items. + """ + if not self._loop.running: + self._loop.start(self._period) + + def stop(self): + """ + Stop polling for new items. + """ + if self._loop.running: + self._loop.stop() + + +if __name__ == "__main__": + from twisted.internet import reactor + producer = MessageProducer(DummyMsgConsumer()) + producer.start() + + for delay, item in ((2, 1), (3, 2), (4, 3), + (6, 4), (7, 5), (8, 6), (8.2, 7), + (15, 'a'), (16, 'b'), (17, 'c')): + reactor.callLater(delay, producer.put, item) + reactor.run() |