summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorKali Kaneko <kali@leap.se>2013-12-11 12:11:21 -0400
committerKali Kaneko <kali@leap.se>2013-12-11 13:22:56 -0400
commitddad3391ba8ad611a9bdaaf689b408d44eec9cc6 (patch)
tree897605e48a7765d08bdd3a8257f13669453d916f /src
parent44b8f5eaaaeeacbb1f9ceca1231cb53ef13f16ab (diff)
consume messages eagerly
Diffstat (limited to 'src')
-rw-r--r--src/leap/mail/imap/server.py19
-rw-r--r--src/leap/mail/imap/tests/test_imap.py24
-rw-r--r--src/leap/mail/messageflow.py25
3 files changed, 46 insertions, 22 deletions
diff --git a/src/leap/mail/imap/server.py b/src/leap/mail/imap/server.py
index 6320a51..73ec223 100644
--- a/src/leap/mail/imap/server.py
+++ b/src/leap/mail/imap/server.py
@@ -834,14 +834,19 @@ class SoledadDocWriter(object):
"""
self._soledad = soledad
- def consume(self, item):
+ def consume(self, queue):
"""
Creates a new document in soledad db.
- :param item: object to update. content of the document to be inserted.
- :type item: dict
+ :param queue: queue to get item from, with content of the document
+ to be inserted.
+ :type queue: Queue
"""
- self._soledad.create_doc(item)
+ empty = queue.empty()
+ while not empty:
+ item = queue.get()
+ self._soledad.create_doc(item)
+ empty = queue.empty()
class MessageCollection(WithMsgFields, IndexedDB):
@@ -911,7 +916,7 @@ class MessageCollection(WithMsgFields, IndexedDB):
self._soledad_writer = MessageProducer(
SoledadDocWriter(soledad),
- period=0.2)
+ period=0.1)
def _get_empty_msg(self):
"""
@@ -941,6 +946,7 @@ class MessageCollection(WithMsgFields, IndexedDB):
:param uid: the message uid for this mailbox
:type uid: int
"""
+ logger.debug('adding message')
if flags is None:
flags = tuple()
leap_assert_type(flags, tuple)
@@ -985,6 +991,7 @@ class MessageCollection(WithMsgFields, IndexedDB):
# ...should get a sanity check here.
content[self.UID_KEY] = uid
+ logger.debug('enqueuing message for write')
self._soledad_writer.put(content)
# XXX have to decide what shall we do with errors with this change...
#return self._soledad.create_doc(content)
@@ -1518,9 +1525,9 @@ class SoledadMailbox(WithMsgFields):
"""
if not self.isWriteable():
raise imap4.ReadOnlyMailbox
-
delete = []
deleted = []
+
for m in self.messages.get_all():
if self.DELETED_FLAG in m.content[self.FLAGS_KEY]:
delete.append(m)
diff --git a/src/leap/mail/imap/tests/test_imap.py b/src/leap/mail/imap/tests/test_imap.py
index d78115e..9989989 100644
--- a/src/leap/mail/imap/tests/test_imap.py
+++ b/src/leap/mail/imap/tests/test_imap.py
@@ -394,6 +394,8 @@ class MessageCollectionTestCase(IMAP4HelperMixin, unittest.TestCase):
MessageCollection interface in this particular TestCase
"""
self.messages = MessageCollection("testmbox", self._soledad._db)
+ for m in self.messages.get_all():
+ self.messages.remove(m)
def tearDown(self):
"""
@@ -423,6 +425,22 @@ class MessageCollectionTestCase(IMAP4HelperMixin, unittest.TestCase):
})
self.assertEqual(self.messages.count(), 0)
+ def testMultipleAdd(self):
+ """
+ Add multiple messages
+ """
+ # XXX watch out! we're serializing with a delay...
+ mc = self.messages
+ self.assertEqual(self.messages.count(), 0)
+ mc.add_msg('Stuff', subject="test1")
+ self.assertEqual(self.messages.count(), 1)
+ mc.add_msg('Stuff', subject="test2")
+ self.assertEqual(self.messages.count(), 2)
+ mc.add_msg('Stuff', subject="test3")
+ self.assertEqual(self.messages.count(), 3)
+ mc.add_msg('Stuff', subject="test4")
+ self.assertEqual(self.messages.count(), 4)
+
def testFilterByMailbox(self):
"""
Test that queries filter by selected mailbox
@@ -1265,8 +1283,11 @@ class LeapIMAP4ServerTestCase(IMAP4HelperMixin, unittest.TestCase):
m = SimpleLEAPServer.theAccount.getMailbox(name)
m.messages.add_msg('', subject="Message 1",
flags=('\\Deleted', 'AnotherFlag'))
+ self.failUnless(m.messages.count() == 1)
m.messages.add_msg('', subject="Message 2", flags=('AnotherFlag',))
+ self.failUnless(m.messages.count() == 2)
m.messages.add_msg('', subject="Message 3", flags=('\\Deleted',))
+ self.failUnless(m.messages.count() == 3)
def login():
return self.client.login('testuser', 'password-test')
@@ -1292,7 +1313,8 @@ class LeapIMAP4ServerTestCase(IMAP4HelperMixin, unittest.TestCase):
return d.addCallback(self._cbTestExpunge, m)
def _cbTestExpunge(self, ignored, m):
- self.assertEqual(len(m.messages), 1)
+ # we only left 1 mssage with no deleted flag
+ self.assertEqual(m.messages.count(), 1)
self.assertEqual(
m.messages[1].content['subject'],
'Message 2')
diff --git a/src/leap/mail/messageflow.py b/src/leap/mail/messageflow.py
index 21f6d62..a0a571d 100644
--- a/src/leap/mail/messageflow.py
+++ b/src/leap/mail/messageflow.py
@@ -26,11 +26,11 @@ from zope.interface import Interface, implements
class IMessageConsumer(Interface):
- def consume(self, item):
+ def consume(self, queue):
"""
Consumes the passed item.
- :param item: an object to be consumed.
+ :param item: q queue where we put the object to be consumed.
:type item: object
"""
# TODO we could add an optional type to be passed
@@ -44,11 +44,12 @@ class DummyMsgConsumer(object):
implements(IMessageConsumer)
- def consume(self, item):
+ def consume(self, queue):
"""
Just prints the passed item.
"""
- print "got item %s" % item
+ if not queue.empty():
+ print "got item %s" % queue.get()
class MessageProducer(object):
@@ -97,14 +98,9 @@ class MessageProducer(object):
If the queue is found empty, the loop is stopped. It will be started
again after the addition of new items.
"""
- # XXX right now I'm assuming that the period is good enough to allow
- # a right pace of processing. but we could also pass the queue object
- # to the consumer and let it choose whether process a new item or not.
-
+ self._consumer.consume(self._queue)
if self._queue.empty():
self.stop()
- else:
- self._consumer.consume(self._queue.get())
# public methods
@@ -114,20 +110,19 @@ class MessageProducer(object):
If the queue was empty, we will start the loop again.
"""
- was_empty = self._queue.empty()
-
# XXX this might raise if the queue does not accept any new
# items. what to do then?
self._queue.put(item)
- if was_empty:
- self.start()
+ self.start()
def start(self):
"""
Starts polling for new items.
"""
if not self._loop.running:
- self._loop.start(self._period)
+ self._loop.start(self._period, now=True)
+ else:
+ print "was running..., not starting"
def stop(self):
"""