summaryrefslogtreecommitdiff
path: root/src/leap/mail/messageflow.py
diff options
context:
space:
mode:
Diffstat (limited to 'src/leap/mail/messageflow.py')
-rw-r--r--src/leap/mail/messageflow.py39
1 files changed, 33 insertions, 6 deletions
diff --git a/src/leap/mail/messageflow.py b/src/leap/mail/messageflow.py
index ac26e45..ed6abcd 100644
--- a/src/leap/mail/messageflow.py
+++ b/src/leap/mail/messageflow.py
@@ -25,12 +25,15 @@ from zope.interface import Interface, implements
class IMessageConsumer(Interface):
+ """
+ I consume messages from a queue.
+ """
def consume(self, queue):
"""
Consumes the passed item.
- :param item: q queue where we put the object to be consumed.
+ :param item: a queue where we put the object to be consumed.
:type item: object
"""
# TODO we could add an optional type to be passed
@@ -40,6 +43,28 @@ class IMessageConsumer(Interface):
# the queue, maybe wrapped in an object with a retries attribute.
+class IMessageProducer(Interface):
+ """
+ I produce messages and put them in a store to be consumed by other
+ entities.
+ """
+
+ def push(self, item):
+ """
+ Push a new item in the queue.
+ """
+
+ def start(self):
+ """
+ Start producing items.
+ """
+
+ def stop(self):
+ """
+ Stop producing items.
+ """
+
+
class DummyMsgConsumer(object):
implements(IMessageConsumer)
@@ -62,6 +87,8 @@ class MessageProducer(object):
deferred chain and leave further processing detached from the calling loop,
as in the case of smtp.
"""
+ implements(IMessageProducer)
+
# TODO this can be seen as a first step towards properly implementing
# components that implement IPushProducer / IConsumer interfaces.
# However, I need to think more about how to pause the streaming.
@@ -92,7 +119,7 @@ class MessageProducer(object):
def _check_for_new(self):
"""
- Checks for new items in the internal queue, and calls the consume
+ Check for new items in the internal queue, and calls the consume
method in the consumer.
If the queue is found empty, the loop is stopped. It will be started
@@ -102,11 +129,11 @@ class MessageProducer(object):
if self._queue.empty():
self.stop()
- # public methods
+ # public methods: IMessageProducer
- def put(self, item):
+ def push(self, item):
"""
- Puts a new item in the queue.
+ Push a new item in the queue.
If the queue was empty, we will start the loop again.
"""
@@ -117,7 +144,7 @@ class MessageProducer(object):
def start(self):
"""
- Starts polling for new items.
+ Start polling for new items.
"""
if not self._loop.running:
self._loop.start(self._period, now=True)