summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--changes/bug_4715_fix_message_adding1
-rw-r--r--src/leap/mail/imap/server.py19
-rw-r--r--src/leap/mail/imap/tests/test_imap.py69
-rw-r--r--src/leap/mail/messageflow.py25
4 files changed, 85 insertions, 29 deletions
diff --git a/changes/bug_4715_fix_message_adding b/changes/bug_4715_fix_message_adding
new file mode 100644
index 0000000..53b875c
--- /dev/null
+++ b/changes/bug_4715_fix_message_adding
@@ -0,0 +1 @@
+ o Soledad writer consumes messages eagerly. Fixes failing tests. Closes: #4715
diff --git a/src/leap/mail/imap/server.py b/src/leap/mail/imap/server.py
index 6320a51..73ec223 100644
--- a/src/leap/mail/imap/server.py
+++ b/src/leap/mail/imap/server.py
@@ -834,14 +834,19 @@ class SoledadDocWriter(object):
"""
self._soledad = soledad
- def consume(self, item):
+ def consume(self, queue):
"""
Creates a new document in soledad db.
- :param item: object to update. content of the document to be inserted.
- :type item: dict
+ :param queue: queue to get item from, with content of the document
+ to be inserted.
+ :type queue: Queue
"""
- self._soledad.create_doc(item)
+ empty = queue.empty()
+ while not empty:
+ item = queue.get()
+ self._soledad.create_doc(item)
+ empty = queue.empty()
class MessageCollection(WithMsgFields, IndexedDB):
@@ -911,7 +916,7 @@ class MessageCollection(WithMsgFields, IndexedDB):
self._soledad_writer = MessageProducer(
SoledadDocWriter(soledad),
- period=0.2)
+ period=0.1)
def _get_empty_msg(self):
"""
@@ -941,6 +946,7 @@ class MessageCollection(WithMsgFields, IndexedDB):
:param uid: the message uid for this mailbox
:type uid: int
"""
+ logger.debug('adding message')
if flags is None:
flags = tuple()
leap_assert_type(flags, tuple)
@@ -985,6 +991,7 @@ class MessageCollection(WithMsgFields, IndexedDB):
# ...should get a sanity check here.
content[self.UID_KEY] = uid
+ logger.debug('enqueuing message for write')
self._soledad_writer.put(content)
# XXX have to decide what shall we do with errors with this change...
#return self._soledad.create_doc(content)
@@ -1518,9 +1525,9 @@ class SoledadMailbox(WithMsgFields):
"""
if not self.isWriteable():
raise imap4.ReadOnlyMailbox
-
delete = []
deleted = []
+
for m in self.messages.get_all():
if self.DELETED_FLAG in m.content[self.FLAGS_KEY]:
delete.append(m)
diff --git a/src/leap/mail/imap/tests/test_imap.py b/src/leap/mail/imap/tests/test_imap.py
index 7d26862..9989989 100644
--- a/src/leap/mail/imap/tests/test_imap.py
+++ b/src/leap/mail/imap/tests/test_imap.py
@@ -25,6 +25,7 @@ XXX add authors from the original twisted tests.
@license: GPLv3, see included LICENSE file
"""
# XXX review license of the original tests!!!
+from nose.twistedtools import deferred
try:
from cStringIO import StringIO
@@ -370,6 +371,7 @@ class IMAP4HelperMixin(BaseLeapTest):
self.client.transport.loseConnection()
self.server.transport.loseConnection()
log.err(failure, "Problem with %r" % (self.function,))
+ failure.trap(Exception)
def loopback(self):
return loopback.loopbackAsync(self.server, self.client)
@@ -392,6 +394,8 @@ class MessageCollectionTestCase(IMAP4HelperMixin, unittest.TestCase):
MessageCollection interface in this particular TestCase
"""
self.messages = MessageCollection("testmbox", self._soledad._db)
+ for m in self.messages.get_all():
+ self.messages.remove(m)
def tearDown(self):
"""
@@ -421,13 +425,32 @@ class MessageCollectionTestCase(IMAP4HelperMixin, unittest.TestCase):
})
self.assertEqual(self.messages.count(), 0)
+ def testMultipleAdd(self):
+ """
+ Add multiple messages
+ """
+ # XXX watch out! we're serializing with a delay...
+ mc = self.messages
+ self.assertEqual(self.messages.count(), 0)
+ mc.add_msg('Stuff', subject="test1")
+ self.assertEqual(self.messages.count(), 1)
+ mc.add_msg('Stuff', subject="test2")
+ self.assertEqual(self.messages.count(), 2)
+ mc.add_msg('Stuff', subject="test3")
+ self.assertEqual(self.messages.count(), 3)
+ mc.add_msg('Stuff', subject="test4")
+ self.assertEqual(self.messages.count(), 4)
+
def testFilterByMailbox(self):
"""
Test that queries filter by selected mailbox
"""
mc = self.messages
+ self.assertEqual(self.messages.count(), 0)
mc.add_msg('', subject="test1")
+ self.assertEqual(self.messages.count(), 1)
mc.add_msg('', subject="test2")
+ self.assertEqual(self.messages.count(), 2)
mc.add_msg('', subject="test3")
self.assertEqual(self.messages.count(), 3)
@@ -459,6 +482,7 @@ class LeapIMAP4ServerTestCase(IMAP4HelperMixin, unittest.TestCase):
# mailboxes operations
#
+ @deferred(timeout=None)
def testCreate(self):
"""
Test whether we can create mailboxes
@@ -497,6 +521,7 @@ class LeapIMAP4ServerTestCase(IMAP4HelperMixin, unittest.TestCase):
answers.sort()
self.assertEqual(mbox, [a.upper() for a in answers])
+ @deferred(timeout=None)
def testDelete(self):
"""
Test whether we can delete mailboxes
@@ -545,6 +570,7 @@ class LeapIMAP4ServerTestCase(IMAP4HelperMixin, unittest.TestCase):
failure.Failure)))
return d
+ @deferred(timeout=None)
def testNonExistentDelete(self):
"""
Test what happens if we try to delete a non-existent mailbox.
@@ -570,6 +596,7 @@ class LeapIMAP4ServerTestCase(IMAP4HelperMixin, unittest.TestCase):
'No such mailbox'))
return d
+ @deferred(timeout=None)
def testIllegalDelete(self):
"""
Try deleting a mailbox with sub-folders, and \NoSelect flag set.
@@ -604,6 +631,7 @@ class LeapIMAP4ServerTestCase(IMAP4HelperMixin, unittest.TestCase):
self.assertEqual(str(self.failure.value), expected))
return d
+ @deferred(timeout=None)
def testRename(self):
"""
Test whether we can rename a mailbox
@@ -627,6 +655,7 @@ class LeapIMAP4ServerTestCase(IMAP4HelperMixin, unittest.TestCase):
['NEWNAME']))
return d
+ @deferred(timeout=None)
def testIllegalInboxRename(self):
"""
Try to rename inbox. We expect it to fail. Then it would be not
@@ -654,6 +683,7 @@ class LeapIMAP4ServerTestCase(IMAP4HelperMixin, unittest.TestCase):
self.stashed, failure.Failure)))
return d
+ @deferred(timeout=None)
def testHierarchicalRename(self):
"""
Try to rename hierarchical mailboxes
@@ -680,6 +710,7 @@ class LeapIMAP4ServerTestCase(IMAP4HelperMixin, unittest.TestCase):
mboxes.sort()
self.assertEqual(mboxes, [s.upper() for s in expected])
+ @deferred(timeout=None)
def testSubscribe(self):
"""
Test whether we can mark a mailbox as subscribed to
@@ -701,6 +732,7 @@ class LeapIMAP4ServerTestCase(IMAP4HelperMixin, unittest.TestCase):
['THIS/MBOX']))
return d
+ @deferred(timeout=None)
def testUnsubscribe(self):
"""
Test whether we can unsubscribe from a set of mailboxes
@@ -725,6 +757,7 @@ class LeapIMAP4ServerTestCase(IMAP4HelperMixin, unittest.TestCase):
['THAT/MBOX']))
return d
+ @deferred(timeout=None)
def testSelect(self):
"""
Try to select a mailbox
@@ -764,6 +797,7 @@ class LeapIMAP4ServerTestCase(IMAP4HelperMixin, unittest.TestCase):
# capabilities
#
+ @deferred(timeout=None)
def testCapability(self):
caps = {}
@@ -779,6 +813,7 @@ class LeapIMAP4ServerTestCase(IMAP4HelperMixin, unittest.TestCase):
return d.addCallback(lambda _: self.assertEqual(expected, caps))
+ @deferred(timeout=None)
def testCapabilityWithAuth(self):
caps = {}
self.server.challengers[
@@ -803,6 +838,7 @@ class LeapIMAP4ServerTestCase(IMAP4HelperMixin, unittest.TestCase):
# authentication
#
+ @deferred(timeout=None)
def testLogout(self):
"""
Test log out
@@ -817,6 +853,7 @@ class LeapIMAP4ServerTestCase(IMAP4HelperMixin, unittest.TestCase):
d = self.loopback()
return d.addCallback(lambda _: self.assertEqual(self.loggedOut, 1))
+ @deferred(timeout=None)
def testNoop(self):
"""
Test noop command
@@ -832,6 +869,7 @@ class LeapIMAP4ServerTestCase(IMAP4HelperMixin, unittest.TestCase):
d = self.loopback()
return d.addCallback(lambda _: self.assertEqual(self.responses, []))
+ @deferred(timeout=None)
def testLogin(self):
"""
Test login
@@ -848,6 +886,7 @@ class LeapIMAP4ServerTestCase(IMAP4HelperMixin, unittest.TestCase):
self.assertEqual(self.server.account, SimpleLEAPServer.theAccount)
self.assertEqual(self.server.state, 'auth')
+ @deferred(timeout=None)
def testFailedLogin(self):
"""
Test bad login
@@ -866,6 +905,7 @@ class LeapIMAP4ServerTestCase(IMAP4HelperMixin, unittest.TestCase):
self.assertEqual(self.server.account, None)
self.assertEqual(self.server.state, 'unauth')
+ @deferred(timeout=None)
def testLoginRequiringQuoting(self):
"""
Test login requiring quoting
@@ -890,6 +930,7 @@ class LeapIMAP4ServerTestCase(IMAP4HelperMixin, unittest.TestCase):
# Inspection
#
+ @deferred(timeout=None)
def testNamespace(self):
"""
Test retrieving namespace
@@ -914,6 +955,7 @@ class LeapIMAP4ServerTestCase(IMAP4HelperMixin, unittest.TestCase):
[[['', '/']], [], []]))
return d
+ @deferred(timeout=None)
def testExamine(self):
"""
L{IMAP4Client.examine} issues an I{EXAMINE} command to the server and
@@ -983,6 +1025,7 @@ class LeapIMAP4ServerTestCase(IMAP4HelperMixin, unittest.TestCase):
d2 = self.loopback()
return defer.gatherResults([d1, d2]).addCallback(lambda _: self.listed)
+ @deferred(timeout=None)
def testList(self):
"""
Test List command
@@ -999,22 +1042,21 @@ class LeapIMAP4ServerTestCase(IMAP4HelperMixin, unittest.TestCase):
))
return d
- # XXX implement subscriptions
- '''
+ @deferred(timeout=None)
def testLSub(self):
"""
Test LSub command
"""
- SimpleLEAPServer.theAccount.subscribe('ROOT/SUBTHINGL')
+ SimpleLEAPServer.theAccount.subscribe('ROOT/SUBTHINGL2')
def lsub():
return self.client.lsub('root', '%')
d = self._listSetup(lsub)
d.addCallback(self.assertEqual,
- [(SoledadMailbox.INIT_FLAGS, "/", "ROOT/SUBTHINGL")])
+ [(SoledadMailbox.INIT_FLAGS, "/", "ROOT/SUBTHINGL2")])
return d
- '''
+ @deferred(timeout=None)
def testStatus(self):
"""
Test Status command
@@ -1046,6 +1088,7 @@ class LeapIMAP4ServerTestCase(IMAP4HelperMixin, unittest.TestCase):
))
return d
+ @deferred(timeout=None)
def testFailedStatus(self):
"""
Test failed status command with a non-existent mailbox
@@ -1085,6 +1128,7 @@ class LeapIMAP4ServerTestCase(IMAP4HelperMixin, unittest.TestCase):
# messages
#
+ @deferred(timeout=None)
def testFullAppend(self):
"""
Test appending a full message to the mailbox
@@ -1125,6 +1169,7 @@ class LeapIMAP4ServerTestCase(IMAP4HelperMixin, unittest.TestCase):
self.assertEqual(open(infile).read(), mb.messages[1].content['raw'])
+ @deferred(timeout=None)
def testPartialAppend(self):
"""
Test partially appending a message to the mailbox
@@ -1151,10 +1196,12 @@ class LeapIMAP4ServerTestCase(IMAP4HelperMixin, unittest.TestCase):
d1.addCallbacks(self._cbStopClient, self._ebGeneral)
d2 = self.loopback()
d = defer.gatherResults([d1, d2])
- return d.addCallback(self._cbTestPartialAppend, infile)
+ return d.addCallback(
+ self._cbTestPartialAppend, infile)
def _cbTestPartialAppend(self, ignored, infile):
mb = SimpleLEAPServer.theAccount.getMailbox('PARTIAL/SUBTHING')
+
self.assertEqual(1, len(mb.messages))
self.assertEqual(
['\\SEEN', ],
@@ -1164,6 +1211,7 @@ class LeapIMAP4ServerTestCase(IMAP4HelperMixin, unittest.TestCase):
'Right now', mb.messages[1].content['date'])
self.assertEqual(open(infile).read(), mb.messages[1].content['raw'])
+ @deferred(timeout=None)
def testCheck(self):
"""
Test check command
@@ -1187,6 +1235,7 @@ class LeapIMAP4ServerTestCase(IMAP4HelperMixin, unittest.TestCase):
# Okay, that was fun
+ @deferred(timeout=None)
def testClose(self):
"""
Test closing the mailbox. We expect to get deleted all messages flagged
@@ -1222,9 +1271,9 @@ class LeapIMAP4ServerTestCase(IMAP4HelperMixin, unittest.TestCase):
self.assertEqual(
m.messages[1].content['subject'],
'Message 2')
-
self.failUnless(m.closed)
+ @deferred(timeout=None)
def testExpunge(self):
"""
Test expunge command
@@ -1234,8 +1283,11 @@ class LeapIMAP4ServerTestCase(IMAP4HelperMixin, unittest.TestCase):
m = SimpleLEAPServer.theAccount.getMailbox(name)
m.messages.add_msg('', subject="Message 1",
flags=('\\Deleted', 'AnotherFlag'))
+ self.failUnless(m.messages.count() == 1)
m.messages.add_msg('', subject="Message 2", flags=('AnotherFlag',))
+ self.failUnless(m.messages.count() == 2)
m.messages.add_msg('', subject="Message 3", flags=('\\Deleted',))
+ self.failUnless(m.messages.count() == 3)
def login():
return self.client.login('testuser', 'password-test')
@@ -1261,7 +1313,8 @@ class LeapIMAP4ServerTestCase(IMAP4HelperMixin, unittest.TestCase):
return d.addCallback(self._cbTestExpunge, m)
def _cbTestExpunge(self, ignored, m):
- self.assertEqual(len(m.messages), 1)
+ # we only left 1 mssage with no deleted flag
+ self.assertEqual(m.messages.count(), 1)
self.assertEqual(
m.messages[1].content['subject'],
'Message 2')
diff --git a/src/leap/mail/messageflow.py b/src/leap/mail/messageflow.py
index 21f6d62..a0a571d 100644
--- a/src/leap/mail/messageflow.py
+++ b/src/leap/mail/messageflow.py
@@ -26,11 +26,11 @@ from zope.interface import Interface, implements
class IMessageConsumer(Interface):
- def consume(self, item):
+ def consume(self, queue):
"""
Consumes the passed item.
- :param item: an object to be consumed.
+ :param item: q queue where we put the object to be consumed.
:type item: object
"""
# TODO we could add an optional type to be passed
@@ -44,11 +44,12 @@ class DummyMsgConsumer(object):
implements(IMessageConsumer)
- def consume(self, item):
+ def consume(self, queue):
"""
Just prints the passed item.
"""
- print "got item %s" % item
+ if not queue.empty():
+ print "got item %s" % queue.get()
class MessageProducer(object):
@@ -97,14 +98,9 @@ class MessageProducer(object):
If the queue is found empty, the loop is stopped. It will be started
again after the addition of new items.
"""
- # XXX right now I'm assuming that the period is good enough to allow
- # a right pace of processing. but we could also pass the queue object
- # to the consumer and let it choose whether process a new item or not.
-
+ self._consumer.consume(self._queue)
if self._queue.empty():
self.stop()
- else:
- self._consumer.consume(self._queue.get())
# public methods
@@ -114,20 +110,19 @@ class MessageProducer(object):
If the queue was empty, we will start the loop again.
"""
- was_empty = self._queue.empty()
-
# XXX this might raise if the queue does not accept any new
# items. what to do then?
self._queue.put(item)
- if was_empty:
- self.start()
+ self.start()
def start(self):
"""
Starts polling for new items.
"""
if not self._loop.running:
- self._loop.start(self._period)
+ self._loop.start(self._period, now=True)
+ else:
+ print "was running..., not starting"
def stop(self):
"""