diff options
author | Kali Kaneko <kali@leap.se> | 2013-12-11 12:11:21 -0400 |
---|---|---|
committer | Kali Kaneko <kali@leap.se> | 2013-12-11 13:22:56 -0400 |
commit | ddad3391ba8ad611a9bdaaf689b408d44eec9cc6 (patch) | |
tree | 897605e48a7765d08bdd3a8257f13669453d916f /src/leap/mail/messageflow.py | |
parent | 44b8f5eaaaeeacbb1f9ceca1231cb53ef13f16ab (diff) |
consume messages eagerly
Diffstat (limited to 'src/leap/mail/messageflow.py')
-rw-r--r-- | src/leap/mail/messageflow.py | 25 |
1 files changed, 10 insertions, 15 deletions
diff --git a/src/leap/mail/messageflow.py b/src/leap/mail/messageflow.py index 21f6d62..a0a571d 100644 --- a/src/leap/mail/messageflow.py +++ b/src/leap/mail/messageflow.py @@ -26,11 +26,11 @@ from zope.interface import Interface, implements class IMessageConsumer(Interface): - def consume(self, item): + def consume(self, queue): """ Consumes the passed item. - :param item: an object to be consumed. + :param item: q queue where we put the object to be consumed. :type item: object """ # TODO we could add an optional type to be passed @@ -44,11 +44,12 @@ class DummyMsgConsumer(object): implements(IMessageConsumer) - def consume(self, item): + def consume(self, queue): """ Just prints the passed item. """ - print "got item %s" % item + if not queue.empty(): + print "got item %s" % queue.get() class MessageProducer(object): @@ -97,14 +98,9 @@ class MessageProducer(object): If the queue is found empty, the loop is stopped. It will be started again after the addition of new items. """ - # XXX right now I'm assuming that the period is good enough to allow - # a right pace of processing. but we could also pass the queue object - # to the consumer and let it choose whether process a new item or not. - + self._consumer.consume(self._queue) if self._queue.empty(): self.stop() - else: - self._consumer.consume(self._queue.get()) # public methods @@ -114,20 +110,19 @@ class MessageProducer(object): If the queue was empty, we will start the loop again. """ - was_empty = self._queue.empty() - # XXX this might raise if the queue does not accept any new # items. what to do then? self._queue.put(item) - if was_empty: - self.start() + self.start() def start(self): """ Starts polling for new items. """ if not self._loop.running: - self._loop.start(self._period) + self._loop.start(self._period, now=True) + else: + print "was running..., not starting" def stop(self): """ |