summaryrefslogtreecommitdiff
path: root/src/leap/mail/messageflow.py
diff options
context:
space:
mode:
Diffstat (limited to 'src/leap/mail/messageflow.py')
-rw-r--r--src/leap/mail/messageflow.py25
1 files changed, 10 insertions, 15 deletions
diff --git a/src/leap/mail/messageflow.py b/src/leap/mail/messageflow.py
index 21f6d62..a0a571d 100644
--- a/src/leap/mail/messageflow.py
+++ b/src/leap/mail/messageflow.py
@@ -26,11 +26,11 @@ from zope.interface import Interface, implements
class IMessageConsumer(Interface):
- def consume(self, item):
+ def consume(self, queue):
"""
Consumes the passed item.
- :param item: an object to be consumed.
+ :param item: q queue where we put the object to be consumed.
:type item: object
"""
# TODO we could add an optional type to be passed
@@ -44,11 +44,12 @@ class DummyMsgConsumer(object):
implements(IMessageConsumer)
- def consume(self, item):
+ def consume(self, queue):
"""
Just prints the passed item.
"""
- print "got item %s" % item
+ if not queue.empty():
+ print "got item %s" % queue.get()
class MessageProducer(object):
@@ -97,14 +98,9 @@ class MessageProducer(object):
If the queue is found empty, the loop is stopped. It will be started
again after the addition of new items.
"""
- # XXX right now I'm assuming that the period is good enough to allow
- # a right pace of processing. but we could also pass the queue object
- # to the consumer and let it choose whether process a new item or not.
-
+ self._consumer.consume(self._queue)
if self._queue.empty():
self.stop()
- else:
- self._consumer.consume(self._queue.get())
# public methods
@@ -114,20 +110,19 @@ class MessageProducer(object):
If the queue was empty, we will start the loop again.
"""
- was_empty = self._queue.empty()
-
# XXX this might raise if the queue does not accept any new
# items. what to do then?
self._queue.put(item)
- if was_empty:
- self.start()
+ self.start()
def start(self):
"""
Starts polling for new items.
"""
if not self._loop.running:
- self._loop.start(self._period)
+ self._loop.start(self._period, now=True)
+ else:
+ print "was running..., not starting"
def stop(self):
"""