diff options
Diffstat (limited to 'src/leap/mail/messageflow.py')
-rw-r--r-- | src/leap/mail/messageflow.py | 39 |
1 files changed, 33 insertions, 6 deletions
diff --git a/src/leap/mail/messageflow.py b/src/leap/mail/messageflow.py index ac26e45..ed6abcd 100644 --- a/src/leap/mail/messageflow.py +++ b/src/leap/mail/messageflow.py @@ -25,12 +25,15 @@ from zope.interface import Interface, implements class IMessageConsumer(Interface): + """ + I consume messages from a queue. + """ def consume(self, queue): """ Consumes the passed item. - :param item: q queue where we put the object to be consumed. + :param item: a queue where we put the object to be consumed. :type item: object """ # TODO we could add an optional type to be passed @@ -40,6 +43,28 @@ class IMessageConsumer(Interface): # the queue, maybe wrapped in an object with a retries attribute. +class IMessageProducer(Interface): + """ + I produce messages and put them in a store to be consumed by other + entities. + """ + + def push(self, item): + """ + Push a new item in the queue. + """ + + def start(self): + """ + Start producing items. + """ + + def stop(self): + """ + Stop producing items. + """ + + class DummyMsgConsumer(object): implements(IMessageConsumer) @@ -62,6 +87,8 @@ class MessageProducer(object): deferred chain and leave further processing detached from the calling loop, as in the case of smtp. """ + implements(IMessageProducer) + # TODO this can be seen as a first step towards properly implementing # components that implement IPushProducer / IConsumer interfaces. # However, I need to think more about how to pause the streaming. @@ -92,7 +119,7 @@ class MessageProducer(object): def _check_for_new(self): """ - Checks for new items in the internal queue, and calls the consume + Check for new items in the internal queue, and calls the consume method in the consumer. If the queue is found empty, the loop is stopped. It will be started @@ -102,11 +129,11 @@ class MessageProducer(object): if self._queue.empty(): self.stop() - # public methods + # public methods: IMessageProducer - def put(self, item): + def push(self, item): """ - Puts a new item in the queue. + Push a new item in the queue. If the queue was empty, we will start the loop again. """ @@ -117,7 +144,7 @@ class MessageProducer(object): def start(self): """ - Starts polling for new items. + Start polling for new items. """ if not self._loop.running: self._loop.start(self._period, now=True) |