diff options
author | Ivan Alejandro <ivanalejandro0@gmail.com> | 2014-02-04 16:20:54 -0300 |
---|---|---|
committer | Ivan Alejandro <ivanalejandro0@gmail.com> | 2014-02-04 16:20:54 -0300 |
commit | 74428b3176d286312f69e124d4d613c27a1ec93e (patch) | |
tree | eec0561e2184472dfedba3135a3d005efd478c34 /src/leap/mail/messageflow.py | |
parent | 781bd2f4d2a047088d1a0ecd673a38c80ea0c0c0 (diff) | |
parent | 23e28bae2c3cb74e00e29ee8add0b73adeb65c2b (diff) |
Merge remote-tracking branch 'kali/feature/in-memory-store' into develop
Diffstat (limited to 'src/leap/mail/messageflow.py')
-rw-r--r-- | src/leap/mail/messageflow.py | 47 |
1 files changed, 40 insertions, 7 deletions
diff --git a/src/leap/mail/messageflow.py b/src/leap/mail/messageflow.py index ac26e45..b7fc030 100644 --- a/src/leap/mail/messageflow.py +++ b/src/leap/mail/messageflow.py @@ -25,12 +25,15 @@ from zope.interface import Interface, implements class IMessageConsumer(Interface): + """ + I consume messages from a queue. + """ def consume(self, queue): """ Consumes the passed item. - :param item: q queue where we put the object to be consumed. + :param item: a queue where we put the object to be consumed. :type item: object """ # TODO we could add an optional type to be passed @@ -40,6 +43,28 @@ class IMessageConsumer(Interface): # the queue, maybe wrapped in an object with a retries attribute. +class IMessageProducer(Interface): + """ + I produce messages and put them in a store to be consumed by other + entities. + """ + + def push(self, item): + """ + Push a new item in the queue. + """ + + def start(self): + """ + Start producing items. + """ + + def stop(self): + """ + Stop producing items. + """ + + class DummyMsgConsumer(object): implements(IMessageConsumer) @@ -62,6 +87,8 @@ class MessageProducer(object): deferred chain and leave further processing detached from the calling loop, as in the case of smtp. """ + implements(IMessageProducer) + # TODO this can be seen as a first step towards properly implementing # components that implement IPushProducer / IConsumer interfaces. # However, I need to think more about how to pause the streaming. @@ -92,21 +119,27 @@ class MessageProducer(object): def _check_for_new(self): """ - Checks for new items in the internal queue, and calls the consume + Check for new items in the internal queue, and calls the consume method in the consumer. If the queue is found empty, the loop is stopped. It will be started again after the addition of new items. """ self._consumer.consume(self._queue) - if self._queue.empty(): + if self.is_queue_empty(): self.stop() - # public methods + def is_queue_empty(self): + """ + Return True if queue is empty, False otherwise. + """ + return self._queue.empty() + + # public methods: IMessageProducer - def put(self, item): + def push(self, item): """ - Puts a new item in the queue. + Push a new item in the queue. If the queue was empty, we will start the loop again. """ @@ -117,7 +150,7 @@ class MessageProducer(object): def start(self): """ - Starts polling for new items. + Start polling for new items. """ if not self._loop.running: self._loop.start(self._period, now=True) |