From ddad3391ba8ad611a9bdaaf689b408d44eec9cc6 Mon Sep 17 00:00:00 2001 From: Kali Kaneko Date: Wed, 11 Dec 2013 12:11:21 -0400 Subject: consume messages eagerly --- src/leap/mail/imap/server.py | 19 +++++++++++++------ src/leap/mail/imap/tests/test_imap.py | 24 +++++++++++++++++++++++- src/leap/mail/messageflow.py | 25 ++++++++++--------------- 3 files changed, 46 insertions(+), 22 deletions(-) diff --git a/src/leap/mail/imap/server.py b/src/leap/mail/imap/server.py index 6320a51..73ec223 100644 --- a/src/leap/mail/imap/server.py +++ b/src/leap/mail/imap/server.py @@ -834,14 +834,19 @@ class SoledadDocWriter(object): """ self._soledad = soledad - def consume(self, item): + def consume(self, queue): """ Creates a new document in soledad db. - :param item: object to update. content of the document to be inserted. - :type item: dict + :param queue: queue to get item from, with content of the document + to be inserted. + :type queue: Queue """ - self._soledad.create_doc(item) + empty = queue.empty() + while not empty: + item = queue.get() + self._soledad.create_doc(item) + empty = queue.empty() class MessageCollection(WithMsgFields, IndexedDB): @@ -911,7 +916,7 @@ class MessageCollection(WithMsgFields, IndexedDB): self._soledad_writer = MessageProducer( SoledadDocWriter(soledad), - period=0.2) + period=0.1) def _get_empty_msg(self): """ @@ -941,6 +946,7 @@ class MessageCollection(WithMsgFields, IndexedDB): :param uid: the message uid for this mailbox :type uid: int """ + logger.debug('adding message') if flags is None: flags = tuple() leap_assert_type(flags, tuple) @@ -985,6 +991,7 @@ class MessageCollection(WithMsgFields, IndexedDB): # ...should get a sanity check here. content[self.UID_KEY] = uid + logger.debug('enqueuing message for write') self._soledad_writer.put(content) # XXX have to decide what shall we do with errors with this change... #return self._soledad.create_doc(content) @@ -1518,9 +1525,9 @@ class SoledadMailbox(WithMsgFields): """ if not self.isWriteable(): raise imap4.ReadOnlyMailbox - delete = [] deleted = [] + for m in self.messages.get_all(): if self.DELETED_FLAG in m.content[self.FLAGS_KEY]: delete.append(m) diff --git a/src/leap/mail/imap/tests/test_imap.py b/src/leap/mail/imap/tests/test_imap.py index d78115e..9989989 100644 --- a/src/leap/mail/imap/tests/test_imap.py +++ b/src/leap/mail/imap/tests/test_imap.py @@ -394,6 +394,8 @@ class MessageCollectionTestCase(IMAP4HelperMixin, unittest.TestCase): MessageCollection interface in this particular TestCase """ self.messages = MessageCollection("testmbox", self._soledad._db) + for m in self.messages.get_all(): + self.messages.remove(m) def tearDown(self): """ @@ -423,6 +425,22 @@ class MessageCollectionTestCase(IMAP4HelperMixin, unittest.TestCase): }) self.assertEqual(self.messages.count(), 0) + def testMultipleAdd(self): + """ + Add multiple messages + """ + # XXX watch out! we're serializing with a delay... + mc = self.messages + self.assertEqual(self.messages.count(), 0) + mc.add_msg('Stuff', subject="test1") + self.assertEqual(self.messages.count(), 1) + mc.add_msg('Stuff', subject="test2") + self.assertEqual(self.messages.count(), 2) + mc.add_msg('Stuff', subject="test3") + self.assertEqual(self.messages.count(), 3) + mc.add_msg('Stuff', subject="test4") + self.assertEqual(self.messages.count(), 4) + def testFilterByMailbox(self): """ Test that queries filter by selected mailbox @@ -1265,8 +1283,11 @@ class LeapIMAP4ServerTestCase(IMAP4HelperMixin, unittest.TestCase): m = SimpleLEAPServer.theAccount.getMailbox(name) m.messages.add_msg('', subject="Message 1", flags=('\\Deleted', 'AnotherFlag')) + self.failUnless(m.messages.count() == 1) m.messages.add_msg('', subject="Message 2", flags=('AnotherFlag',)) + self.failUnless(m.messages.count() == 2) m.messages.add_msg('', subject="Message 3", flags=('\\Deleted',)) + self.failUnless(m.messages.count() == 3) def login(): return self.client.login('testuser', 'password-test') @@ -1292,7 +1313,8 @@ class LeapIMAP4ServerTestCase(IMAP4HelperMixin, unittest.TestCase): return d.addCallback(self._cbTestExpunge, m) def _cbTestExpunge(self, ignored, m): - self.assertEqual(len(m.messages), 1) + # we only left 1 mssage with no deleted flag + self.assertEqual(m.messages.count(), 1) self.assertEqual( m.messages[1].content['subject'], 'Message 2') diff --git a/src/leap/mail/messageflow.py b/src/leap/mail/messageflow.py index 21f6d62..a0a571d 100644 --- a/src/leap/mail/messageflow.py +++ b/src/leap/mail/messageflow.py @@ -26,11 +26,11 @@ from zope.interface import Interface, implements class IMessageConsumer(Interface): - def consume(self, item): + def consume(self, queue): """ Consumes the passed item. - :param item: an object to be consumed. + :param item: q queue where we put the object to be consumed. :type item: object """ # TODO we could add an optional type to be passed @@ -44,11 +44,12 @@ class DummyMsgConsumer(object): implements(IMessageConsumer) - def consume(self, item): + def consume(self, queue): """ Just prints the passed item. """ - print "got item %s" % item + if not queue.empty(): + print "got item %s" % queue.get() class MessageProducer(object): @@ -97,14 +98,9 @@ class MessageProducer(object): If the queue is found empty, the loop is stopped. It will be started again after the addition of new items. """ - # XXX right now I'm assuming that the period is good enough to allow - # a right pace of processing. but we could also pass the queue object - # to the consumer and let it choose whether process a new item or not. - + self._consumer.consume(self._queue) if self._queue.empty(): self.stop() - else: - self._consumer.consume(self._queue.get()) # public methods @@ -114,20 +110,19 @@ class MessageProducer(object): If the queue was empty, we will start the loop again. """ - was_empty = self._queue.empty() - # XXX this might raise if the queue does not accept any new # items. what to do then? self._queue.put(item) - if was_empty: - self.start() + self.start() def start(self): """ Starts polling for new items. """ if not self._loop.running: - self._loop.start(self._period) + self._loop.start(self._period, now=True) + else: + print "was running..., not starting" def stop(self): """ -- cgit v1.2.3