summaryrefslogtreecommitdiff
path: root/src/leap/mail/messageflow.py
diff options
context:
space:
mode:
Diffstat (limited to 'src/leap/mail/messageflow.py')
-rw-r--r--src/leap/mail/messageflow.py97
1 files changed, 74 insertions, 23 deletions
diff --git a/src/leap/mail/messageflow.py b/src/leap/mail/messageflow.py
index 21f6d62..c8f224c 100644
--- a/src/leap/mail/messageflow.py
+++ b/src/leap/mail/messageflow.py
@@ -25,12 +25,15 @@ from zope.interface import Interface, implements
class IMessageConsumer(Interface):
+ """
+ I consume messages from a queue.
+ """
- def consume(self, item):
+ def consume(self, queue):
"""
Consumes the passed item.
- :param item: an object to be consumed.
+ :param item: a queue where we put the object to be consumed.
:type item: object
"""
# TODO we could add an optional type to be passed
@@ -40,15 +43,43 @@ class IMessageConsumer(Interface):
# the queue, maybe wrapped in an object with a retries attribute.
+class IMessageProducer(Interface):
+ """
+ I produce messages and put them in a store to be consumed by other
+ entities.
+ """
+
+ def push(self, item, state=None):
+ """
+ Push a new item in the queue.
+ """
+
+ def start(self):
+ """
+ Start producing items.
+ """
+
+ def stop(self):
+ """
+ Stop producing items.
+ """
+
+ def flush(self):
+ """
+ Flush queued messages to consumer.
+ """
+
+
class DummyMsgConsumer(object):
implements(IMessageConsumer)
- def consume(self, item):
+ def consume(self, queue):
"""
Just prints the passed item.
"""
- print "got item %s" % item
+ if not queue.empty():
+ print "got item %s" % queue.get()
class MessageProducer(object):
@@ -61,6 +92,8 @@ class MessageProducer(object):
deferred chain and leave further processing detached from the calling loop,
as in the case of smtp.
"""
+ implements(IMessageProducer)
+
# TODO this can be seen as a first step towards properly implementing
# components that implement IPushProducer / IConsumer interfaces.
# However, I need to think more about how to pause the streaming.
@@ -68,6 +101,10 @@ class MessageProducer(object):
# and consumption is not likely (?) to consume huge amounts of memory in
# our current settings, so the need to pause the stream is not urgent now.
+ # TODO use enum
+ STATE_NEW = 1
+ STATE_DIRTY = 2
+
def __init__(self, consumer, queue=Queue.Queue, period=1):
"""
Initializes the MessageProducer
@@ -82,7 +119,8 @@ class MessageProducer(object):
# it should implement a `consume` method
self._consumer = consumer
- self._queue = queue()
+ self._queue_new = queue()
+ self._queue_dirty = queue()
self._period = period
self._loop = LoopingCall(self._check_for_new)
@@ -91,43 +129,50 @@ class MessageProducer(object):
def _check_for_new(self):
"""
- Checks for new items in the internal queue, and calls the consume
+ Check for new items in the internal queue, and calls the consume
method in the consumer.
If the queue is found empty, the loop is stopped. It will be started
again after the addition of new items.
"""
- # XXX right now I'm assuming that the period is good enough to allow
- # a right pace of processing. but we could also pass the queue object
- # to the consumer and let it choose whether process a new item or not.
-
- if self._queue.empty():
+ self._consumer.consume((self._queue_new, self._queue_dirty))
+ if self.is_queue_empty():
self.stop()
- else:
- self._consumer.consume(self._queue.get())
- # public methods
+ def is_queue_empty(self):
+ """
+ Return True if queue is empty, False otherwise.
+ """
+ new = self._queue_new
+ dirty = self._queue_dirty
+ return new.empty() and dirty.empty()
+
+ # public methods: IMessageProducer
- def put(self, item):
+ def push(self, item, state=None):
"""
- Puts a new item in the queue.
+ Push a new item in the queue.
If the queue was empty, we will start the loop again.
"""
- was_empty = self._queue.empty()
-
# XXX this might raise if the queue does not accept any new
# items. what to do then?
- self._queue.put(item)
- if was_empty:
- self.start()
+ queue = self._queue_new
+
+ if state == self.STATE_NEW:
+ queue = self._queue_new
+ if state == self.STATE_DIRTY:
+ queue = self._queue_dirty
+
+ queue.put(item)
+ self.start()
def start(self):
"""
- Starts polling for new items.
+ Start polling for new items.
"""
if not self._loop.running:
- self._loop.start(self._period)
+ self._loop.start(self._period, now=True)
def stop(self):
"""
@@ -136,6 +181,12 @@ class MessageProducer(object):
if self._loop.running:
self._loop.stop()
+ def flush(self):
+ """
+ Flush queued messages to consumer.
+ """
+ self._check_for_new()
+
if __name__ == "__main__":
from twisted.internet import reactor