From ddad3391ba8ad611a9bdaaf689b408d44eec9cc6 Mon Sep 17 00:00:00 2001 From: Kali Kaneko Date: Wed, 11 Dec 2013 12:11:21 -0400 Subject: consume messages eagerly --- src/leap/mail/messageflow.py | 25 ++++++++++--------------- 1 file changed, 10 insertions(+), 15 deletions(-) (limited to 'src/leap/mail/messageflow.py') diff --git a/src/leap/mail/messageflow.py b/src/leap/mail/messageflow.py index 21f6d62..a0a571d 100644 --- a/src/leap/mail/messageflow.py +++ b/src/leap/mail/messageflow.py @@ -26,11 +26,11 @@ from zope.interface import Interface, implements class IMessageConsumer(Interface): - def consume(self, item): + def consume(self, queue): """ Consumes the passed item. - :param item: an object to be consumed. + :param item: q queue where we put the object to be consumed. :type item: object """ # TODO we could add an optional type to be passed @@ -44,11 +44,12 @@ class DummyMsgConsumer(object): implements(IMessageConsumer) - def consume(self, item): + def consume(self, queue): """ Just prints the passed item. """ - print "got item %s" % item + if not queue.empty(): + print "got item %s" % queue.get() class MessageProducer(object): @@ -97,14 +98,9 @@ class MessageProducer(object): If the queue is found empty, the loop is stopped. It will be started again after the addition of new items. """ - # XXX right now I'm assuming that the period is good enough to allow - # a right pace of processing. but we could also pass the queue object - # to the consumer and let it choose whether process a new item or not. - + self._consumer.consume(self._queue) if self._queue.empty(): self.stop() - else: - self._consumer.consume(self._queue.get()) # public methods @@ -114,20 +110,19 @@ class MessageProducer(object): If the queue was empty, we will start the loop again. """ - was_empty = self._queue.empty() - # XXX this might raise if the queue does not accept any new # items. what to do then? self._queue.put(item) - if was_empty: - self.start() + self.start() def start(self): """ Starts polling for new items. """ if not self._loop.running: - self._loop.start(self._period) + self._loop.start(self._period, now=True) + else: + print "was running..., not starting" def stop(self): """ -- cgit v1.2.3 From 2b53238ce5211bc23da8d1e8903335daa12ca02e Mon Sep 17 00:00:00 2001 From: Kali Kaneko Date: Tue, 14 Jan 2014 16:28:07 -0400 Subject: remove locks (moved to soledad client) --- src/leap/mail/messageflow.py | 2 -- 1 file changed, 2 deletions(-) (limited to 'src/leap/mail/messageflow.py') diff --git a/src/leap/mail/messageflow.py b/src/leap/mail/messageflow.py index a0a571d..ac26e45 100644 --- a/src/leap/mail/messageflow.py +++ b/src/leap/mail/messageflow.py @@ -121,8 +121,6 @@ class MessageProducer(object): """ if not self._loop.running: self._loop.start(self._period, now=True) - else: - print "was running..., not starting" def stop(self): """ -- cgit v1.2.3 From 4ae6ad57a0f80143e3ded867c1fdd2264804a775 Mon Sep 17 00:00:00 2001 From: Kali Kaneko Date: Tue, 21 Jan 2014 19:22:09 -0400 Subject: memory store for append/fetch/copy --- src/leap/mail/messageflow.py | 39 +++++++++++++++++++++++++++++++++------ 1 file changed, 33 insertions(+), 6 deletions(-) (limited to 'src/leap/mail/messageflow.py') diff --git a/src/leap/mail/messageflow.py b/src/leap/mail/messageflow.py index ac26e45..ed6abcd 100644 --- a/src/leap/mail/messageflow.py +++ b/src/leap/mail/messageflow.py @@ -25,12 +25,15 @@ from zope.interface import Interface, implements class IMessageConsumer(Interface): + """ + I consume messages from a queue. + """ def consume(self, queue): """ Consumes the passed item. - :param item: q queue where we put the object to be consumed. + :param item: a queue where we put the object to be consumed. :type item: object """ # TODO we could add an optional type to be passed @@ -40,6 +43,28 @@ class IMessageConsumer(Interface): # the queue, maybe wrapped in an object with a retries attribute. +class IMessageProducer(Interface): + """ + I produce messages and put them in a store to be consumed by other + entities. + """ + + def push(self, item): + """ + Push a new item in the queue. + """ + + def start(self): + """ + Start producing items. + """ + + def stop(self): + """ + Stop producing items. + """ + + class DummyMsgConsumer(object): implements(IMessageConsumer) @@ -62,6 +87,8 @@ class MessageProducer(object): deferred chain and leave further processing detached from the calling loop, as in the case of smtp. """ + implements(IMessageProducer) + # TODO this can be seen as a first step towards properly implementing # components that implement IPushProducer / IConsumer interfaces. # However, I need to think more about how to pause the streaming. @@ -92,7 +119,7 @@ class MessageProducer(object): def _check_for_new(self): """ - Checks for new items in the internal queue, and calls the consume + Check for new items in the internal queue, and calls the consume method in the consumer. If the queue is found empty, the loop is stopped. It will be started @@ -102,11 +129,11 @@ class MessageProducer(object): if self._queue.empty(): self.stop() - # public methods + # public methods: IMessageProducer - def put(self, item): + def push(self, item): """ - Puts a new item in the queue. + Push a new item in the queue. If the queue was empty, we will start the loop again. """ @@ -117,7 +144,7 @@ class MessageProducer(object): def start(self): """ - Starts polling for new items. + Start polling for new items. """ if not self._loop.running: self._loop.start(self._period, now=True) -- cgit v1.2.3 From e02db78b1b6d8fe021efd4adb250c64a1dd4bac4 Mon Sep 17 00:00:00 2001 From: Kali Kaneko Date: Fri, 24 Jan 2014 05:39:13 -0400 Subject: flags use the memstore * add new/dirty deferred dict to notify when written to disk * fix eventual duplication after copy * fix flag flickering on first retrieval. --- src/leap/mail/messageflow.py | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) (limited to 'src/leap/mail/messageflow.py') diff --git a/src/leap/mail/messageflow.py b/src/leap/mail/messageflow.py index ed6abcd..b7fc030 100644 --- a/src/leap/mail/messageflow.py +++ b/src/leap/mail/messageflow.py @@ -126,9 +126,15 @@ class MessageProducer(object): again after the addition of new items. """ self._consumer.consume(self._queue) - if self._queue.empty(): + if self.is_queue_empty(): self.stop() + def is_queue_empty(self): + """ + Return True if queue is empty, False otherwise. + """ + return self._queue.empty() + # public methods: IMessageProducer def push(self, item): -- cgit v1.2.3 From 362aaec0897261973e58b4282f5c054985d1f113 Mon Sep 17 00:00:00 2001 From: drebs Date: Thu, 6 Feb 2014 15:46:01 -0200 Subject: Flush IMAP data to disk when stopping. Closes #5095. --- src/leap/mail/messageflow.py | 11 +++++++++++ 1 file changed, 11 insertions(+) (limited to 'src/leap/mail/messageflow.py') diff --git a/src/leap/mail/messageflow.py b/src/leap/mail/messageflow.py index b7fc030..80121c8 100644 --- a/src/leap/mail/messageflow.py +++ b/src/leap/mail/messageflow.py @@ -64,6 +64,11 @@ class IMessageProducer(Interface): Stop producing items. """ + def flush(self): + """ + Flush queued messages to consumer. + """ + class DummyMsgConsumer(object): @@ -162,6 +167,12 @@ class MessageProducer(object): if self._loop.running: self._loop.stop() + def flush(self): + """ + Flush queued messages to consumer. + """ + self._check_for_new() + if __name__ == "__main__": from twisted.internet import reactor -- cgit v1.2.3 From 4338368aa2ba0efaee742e9000e21b81af34d3db Mon Sep 17 00:00:00 2001 From: Kali Kaneko Date: Tue, 11 Feb 2014 01:43:14 -0400 Subject: separate new and dirty queues --- src/leap/mail/messageflow.py | 26 ++++++++++++++++++++------ 1 file changed, 20 insertions(+), 6 deletions(-) (limited to 'src/leap/mail/messageflow.py') diff --git a/src/leap/mail/messageflow.py b/src/leap/mail/messageflow.py index 80121c8..c8f224c 100644 --- a/src/leap/mail/messageflow.py +++ b/src/leap/mail/messageflow.py @@ -49,7 +49,7 @@ class IMessageProducer(Interface): entities. """ - def push(self, item): + def push(self, item, state=None): """ Push a new item in the queue. """ @@ -101,6 +101,10 @@ class MessageProducer(object): # and consumption is not likely (?) to consume huge amounts of memory in # our current settings, so the need to pause the stream is not urgent now. + # TODO use enum + STATE_NEW = 1 + STATE_DIRTY = 2 + def __init__(self, consumer, queue=Queue.Queue, period=1): """ Initializes the MessageProducer @@ -115,7 +119,8 @@ class MessageProducer(object): # it should implement a `consume` method self._consumer = consumer - self._queue = queue() + self._queue_new = queue() + self._queue_dirty = queue() self._period = period self._loop = LoopingCall(self._check_for_new) @@ -130,7 +135,7 @@ class MessageProducer(object): If the queue is found empty, the loop is stopped. It will be started again after the addition of new items. """ - self._consumer.consume(self._queue) + self._consumer.consume((self._queue_new, self._queue_dirty)) if self.is_queue_empty(): self.stop() @@ -138,11 +143,13 @@ class MessageProducer(object): """ Return True if queue is empty, False otherwise. """ - return self._queue.empty() + new = self._queue_new + dirty = self._queue_dirty + return new.empty() and dirty.empty() # public methods: IMessageProducer - def push(self, item): + def push(self, item, state=None): """ Push a new item in the queue. @@ -150,7 +157,14 @@ class MessageProducer(object): """ # XXX this might raise if the queue does not accept any new # items. what to do then? - self._queue.put(item) + queue = self._queue_new + + if state == self.STATE_NEW: + queue = self._queue_new + if state == self.STATE_DIRTY: + queue = self._queue_dirty + + queue.put(item) self.start() def start(self): -- cgit v1.2.3