# -*- coding: utf-8 -*- # test_imap.py # Copyright (C) 2013 LEAP # # This program is free software: you can redistribute it and/or modify # it under the terms of the GNU General Public License as published by # the Free Software Foundation, either version 3 of the License, or # (at your option) any later version. # # This program is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU General Public License for more details. # # You should have received a copy of the GNU General Public License # along with this program. If not, see . """ Test case for leap.email.imap.server TestCases taken from twisted tests and modified to make them work against SoledadBackedAccount. @authors: Kali Kaneko, XXX add authors from the original twisted tests. @license: GPLv3, see included LICENSE file """ # XXX review license of the original tests!!! try: from cStringIO import StringIO except ImportError: from StringIO import StringIO import os import types from twisted.mail import imap4 from twisted.internet import defer from twisted.trial import unittest from twisted.python import util from twisted.python import failure from twisted import cred # import u1db from leap.mail.imap.mailbox import SoledadMailbox from leap.mail.imap.memorystore import MemoryStore from leap.mail.imap.messages import MessageCollection from leap.mail.imap.server import LeapIMAPServer from leap.mail.imap.tests.utils import IMAP4HelperMixin TEST_USER = "testuser@leap.se" TEST_PASSWD = "1234" def strip(f): return lambda result, f=f: f() def sortNest(l): l = l[:] l.sort() for i in range(len(l)): if isinstance(l[i], types.ListType): l[i] = sortNest(l[i]) elif isinstance(l[i], types.TupleType): l[i] = tuple(sortNest(list(l[i]))) return l class TestRealm: """ A minimal auth realm for testing purposes only """ theAccount = None def requestAvatar(self, avatarId, mind, *interfaces): return imap4.IAccount, self.theAccount, lambda: None # # TestCases # class MessageCollectionTestCase(IMAP4HelperMixin, unittest.TestCase): """ Tests for the MessageCollection class """ count = 0 def setUp(self): """ setUp method for each test We override mixin method since we are only testing MessageCollection interface in this particular TestCase """ super(MessageCollectionTestCase, self).setUp() memstore = MemoryStore() self.messages = MessageCollection("testmbox%s" % (self.count,), self._soledad, memstore=memstore) MessageCollectionTestCase.count += 1 def tearDown(self): """ tearDown method for each test """ del self.messages def testEmptyMessage(self): """ Test empty message and collection """ em = self.messages._get_empty_doc() self.assertEqual( em, { "chash": '', "deleted": False, "flags": [], "mbox": "inbox", "seen": False, "multi": False, "size": 0, "type": "flags", "uid": 1, }) self.assertEqual(self.messages.count(), 0) def testMultipleAdd(self): """ Add multiple messages """ mc = self.messages self.assertEqual(self.messages.count(), 0) def add_first(): d = defer.gatherResults([ mc.add_msg('Stuff 1', subject="test1"), mc.add_msg('Stuff 2', subject="test2"), mc.add_msg('Stuff 3', subject="test3"), mc.add_msg('Stuff 4', subject="test4")]) return d def add_second(result): d = defer.gatherResults([ mc.add_msg('Stuff 5', subject="test5"), mc.add_msg('Stuff 6', subject="test6"), mc.add_msg('Stuff 7', subject="test7")]) return d def check_second(result): return self.assertEqual(mc.count(), 7) d1 = add_first() d1.addCallback(add_second) d1.addCallback(check_second) def testRecentCount(self): """ Test the recent count """ mc = self.messages countrecent = mc.count_recent eq = self.assertEqual self.assertEqual(countrecent(), 0) d = mc.add_msg('Stuff', subject="test1") # For the semantics defined in the RFC, we auto-add the # recent flag by default. def add2(_): return mc.add_msg('Stuff', subject="test2", flags=('\\Deleted',)) def add3(_): return mc.add_msg('Stuff', subject="test3", flags=('\\Recent',)) def add4(_): return mc.add_msg('Stuff', subject="test4", flags=('\\Deleted', '\\Recent')) d.addCallback(lambda r: eq(countrecent(), 1)) d.addCallback(add2) d.addCallback(lambda r: eq(countrecent(), 2)) d.addCallback(add3) d.addCallback(lambda r: eq(countrecent(), 3)) d.addCallback(add4) d.addCallback(lambda r: eq(countrecent(), 4)) def testFilterByMailbox(self): """ Test that queries filter by selected mailbox """ mc = self.messages self.assertEqual(self.messages.count(), 0) def add_1(): d1 = mc.add_msg('msg 1', subject="test1") d2 = mc.add_msg('msg 2', subject="test2") d3 = mc.add_msg('msg 3', subject="test3") d = defer.gatherResults([d1, d2, d3]) return d add_1().addCallback(lambda ignored: self.assertEqual( mc.count(), 3)) # XXX this has to be redone to fit memstore ------------# #newmsg = mc._get_empty_doc() #newmsg['mailbox'] = "mailbox/foo" #mc._soledad.create_doc(newmsg) #self.assertEqual(mc.count(), 3) #self.assertEqual( #len(mc._soledad.get_from_index(mc.TYPE_IDX, "flags")), 4) class LeapIMAP4ServerTestCase(IMAP4HelperMixin, unittest.TestCase): # TODO this currently will use a memory-only store. # create a different one for testing soledad sync. """ Tests for the generic behavior of the LeapIMAP4Server which, right now, it's just implemented in this test file as LeapIMAPServer. We will move the implementation, together with authentication bits, to leap.mail.imap.server so it can be instantiated from the tac file. Right now this TestCase tries to mimmick as close as possible the organization from the twisted.mail.imap tests so we can achieve a complete implementation. The order in which they appear reflect the intended order of implementation. """ # # mailboxes operations # def testCreate(self): """ Test whether we can create mailboxes """ succeed = ('testbox', 'test/box', 'test/', 'test/box/box', 'foobox') fail = ('testbox', 'test/box') def cb(): self.result.append(1) def eb(failure): self.result.append(0) def login(): return self.client.login(TEST_USER, TEST_PASSWD) def create(): for name in succeed + fail: d = self.client.create(name) d.addCallback(strip(cb)).addErrback(eb) d.addCallbacks(self._cbStopClient, self._ebGeneral) self.result = [] d1 = self.connected.addCallback(strip(login)).addCallback( strip(create)) d2 = self.loopback() d = defer.gatherResults([d1, d2]) return d.addCallback(self._cbTestCreate, succeed, fail) def _cbTestCreate(self, ignored, succeed, fail): self.assertEqual(self.result, [1] * len(succeed) + [0] * len(fail)) mboxes = list(LeapIMAPServer.theAccount.mailboxes) answers = ([u'INBOX', u'foobox', 'test', u'test/box', u'test/box/box', 'testbox']) self.assertEqual(mboxes, [a for a in answers]) def testDelete(self): """ Test whether we can delete mailboxes """ LeapIMAPServer.theAccount.addMailbox('delete/me') def login(): return self.client.login(TEST_USER, TEST_PASSWD) def delete(): return self.client.delete('delete/me') d1 = self.connected.addCallback(strip(login)) d1.addCallbacks(strip(delete), self._ebGeneral) d1.addCallbacks(self._cbStopClient, self._ebGeneral) d2 = self.loopback() d = defer.gatherResults([d1, d2]) d.addCallback( lambda _: self.assertEqual( LeapIMAPServer.theAccount.mailboxes, ['INBOX'])) return d def testIllegalInboxDelete(self): """ Test what happens if we try to delete the user Inbox. We expect that operation to fail. """ self.stashed = None def login(): return self.client.login(TEST_USER, TEST_PASSWD) def delete(): return self.client.delete('inbox') def stash(result): self.stashed = result d1 = self.connected.addCallback(strip(login)) d1.addCallbacks(strip(delete), self._ebGeneral) d1.addBoth(stash) d1.addCallbacks(self._cbStopClient, self._ebGeneral) d2 = self.loopback() d = defer.gatherResults([d1, d2]) d.addCallback(lambda _: self.failUnless(isinstance(self.stashed, failure.Failure))) return d def testNonExistentDelete(self): """ Test what happens if we try to delete a non-existent mailbox. We expect an error raised stating 'No such mailbox' """ def login(): return self.client.login(TEST_USER, TEST_PASSWD) def delete(): return self.client.delete('delete/me') self.failure = failure def deleteFailed(failure): self.failure = failure self.failure = None d1 = self.connected.addCallback(strip(login)) d1.addCallback(strip(delete)).addErrback(deleteFailed) d1.addCallbacks(self._cbStopClient, self._ebGeneral) d2 = self.loopback() d = defer.gatherResults([d1, d2]) d.addCallback(lambda _: self.assertTrue( str(self.failure.value).startswith('No such mailbox'))) return d def testIllegalDelete(self): """ Try deleting a mailbox with sub-folders, and \NoSelect flag set. An exception is expected. """ LeapIMAPServer.theAccount.addMailbox('delete') to_delete = LeapIMAPServer.theAccount.getMailbox('delete') to_delete.setFlags((r'\Noselect',)) to_delete.getFlags() LeapIMAPServer.theAccount.addMailbox('delete/me') def login(): return self.client.login(TEST_USER, TEST_PASSWD) def delete(): return self.client.delete('delete') def deleteFailed(failure): self.failure = failure self.failure = None d1 = self.connected.addCallback(strip(login)) d1.addCallback(strip(delete)).addErrback(deleteFailed) d1.addCallbacks(self._cbStopClient, self._ebGeneral) d2 = self.loopback() d = defer.gatherResults([d1, d2]) expected = ("Hierarchically inferior mailboxes exist " "and \\Noselect is set") d.addCallback(lambda _: self.assertTrue(self.failure is not None)) d.addCallback(lambda _: self.assertEqual(str(self.failure.value), expected)) return d def testRename(self): """ Test whether we can rename a mailbox """ LeapIMAPServer.theAccount.addMailbox('oldmbox') def login(): return self.client.login(TEST_USER, TEST_PASSWD) def rename(): return self.client.rename('oldmbox', 'newname') d1 = self.connected.addCallback(strip(login)) d1.addCallbacks(strip(rename), self._ebGeneral) d1.addCallbacks(self._cbStopClient, self._ebGeneral) d2 = self.loopback() d = defer.gatherResults([d1, d2]) d.addCallback(lambda _: self.assertEqual( LeapIMAPServer.theAccount.mailboxes, ['INBOX', 'newname'])) return d def testIllegalInboxRename(self): """ Try to rename inbox. We expect it to fail. Then it would be not an inbox anymore, would it? """ self.stashed = None def login(): return self.client.login(TEST_USER, TEST_PASSWD) def rename(): return self.client.rename('inbox', 'frotz') def stash(stuff): self.stashed = stuff d1 = self.connected.addCallback(strip(login)) d1.addCallbacks(strip(rename), self._ebGeneral) d1.addBoth(stash) d1.addCallbacks(self._cbStopClient, self._ebGeneral) d2 = self.loopback() d = defer.gatherResults([d1, d2]) d.addCallback(lambda _: self.failUnless(isinstance( self.stashed, failure.Failure))) return d def testHierarchicalRename(self): """ Try to rename hierarchical mailboxes """ LeapIMAPServer.theAccount.create('oldmbox/m1') LeapIMAPServer.theAccount.create('oldmbox/m2') def login(): return self.client.login(TEST_USER, TEST_PASSWD) def rename(): return self.client.rename('oldmbox', 'newname') d1 = self.connected.addCallback(strip(login)) d1.addCallbacks(strip(rename), self._ebGeneral) d1.addCallbacks(self._cbStopClient, self._ebGeneral) d2 = self.loopback() d = defer.gatherResults([d1, d2]) return d.addCallback(self._cbTestHierarchicalRename) def _cbTestHierarchicalRename(self, ignored): mboxes = LeapIMAPServer.theAccount.mailboxes expected = ['INBOX', 'newname', 'newname/m1', 'newname/m2'] self.assertEqual(mboxes, [s for s in expected]) def testSubscribe(self): """ Test whether we can mark a mailbox as subscribed to """ def login(): return self.client.login(TEST_USER, TEST_PASSWD) def subscribe(): return self.client.subscribe('this/mbox') d1 = self.connected.addCallback(strip(login)) d1.addCallbacks(strip(subscribe), self._ebGeneral) d1.addCallbacks(self._cbStopClient, self._ebGeneral) d2 = self.loopback() d = defer.gatherResults([d1, d2]) d.addCallback(lambda _: self.assertEqual( LeapIMAPServer.theAccount.subscriptions, ['this/mbox'])) return d def testUnsubscribe(self): """ Test whether we can unsubscribe from a set of mailboxes """ LeapIMAPServer.theAccount.subscribe('this/mbox') LeapIMAPServer.theAccount.subscribe('that/mbox') def login(): return self.client.login(TEST_USER, TEST_PASSWD) def unsubscribe(): return self.client.unsubscribe('this/mbox') d1 = self.connected.addCallback(strip(login)) d1.addCallbacks(strip(unsubscribe), self._ebGeneral) d1.addCallbacks(self._cbStopClient, self._ebGeneral) d2 = self.loopback() d = defer.gatherResults([d1, d2]) d.addCallback(lambda _: self.assertEqual( LeapIMAPServer.theAccount.subscriptions, ['that/mbox'])) return d def testSelect(self): """ Try to select a mailbox """ self.server.theAccount.addMailbox('TESTMAILBOX-SELECT', creation_ts=42) self.selectedArgs = None def login(): return self.client.login(TEST_USER, TEST_PASSWD) def select(): def selected(args): self.selectedArgs = args self._cbStopClient(None) d = self.client.select('TESTMAILBOX-SELECT') d.addCallback(selected) return d d1 = self.connected.addCallback(strip(login)) d1.addCallback(strip(select)) d1.addErrback(self._ebGeneral) d2 = self.loopback() return defer.gatherResults([d1, d2]).addCallback(self._cbTestSelect) def _cbTestSelect(self, ignored): mbox = LeapIMAPServer.theAccount.getMailbox('TESTMAILBOX-SELECT') self.assertEqual(self.server.mbox.messages.mbox, mbox.messages.mbox) # XXX UIDVALIDITY should be "42" if the creation_ts is passed along # to the memory store. However, the current state of the account # implementation is incomplete and we're writing to soledad store # directly there. We should handle the UIDVALIDITY timestamping # mechanism in a separate test suite. self.assertEqual(self.selectedArgs, { 'EXISTS': 0, 'RECENT': 0, 'UIDVALIDITY': 0, # 'EXISTS': 0, 'RECENT': 0, 'UIDVALIDITY': 42, 'FLAGS': ('\\Seen', '\\Answered', '\\Flagged', '\\Deleted', '\\Draft', '\\Recent', 'List'), 'READ-WRITE': True }) # # capabilities # def testCapability(self): caps = {} def getCaps(): def gotCaps(c): caps.update(c) self.server.transport.loseConnection() return self.client.getCapabilities().addCallback(gotCaps) d1 = self.connected.addCallback( strip(getCaps)).addErrback(self._ebGeneral) d = defer.gatherResults([self.loopback(), d1]) expected = {'IMAP4rev1': None, 'NAMESPACE': None, 'LITERAL+': None, 'IDLE': None} return d.addCallback(lambda _: self.assertEqual(expected, caps)) def testCapabilityWithAuth(self): caps = {} self.server.challengers[ 'CRAM-MD5'] = cred.credentials.CramMD5Credentials def getCaps(): def gotCaps(c): caps.update(c) self.server.transport.loseConnection() return self.client.getCapabilities().addCallback(gotCaps) d1 = self.connected.addCallback( strip(getCaps)).addErrback(self._ebGeneral) d = defer.gatherResults([self.loopback(), d1]) expCap = {'IMAP4rev1': None, 'NAMESPACE': None, 'IDLE': None, 'LITERAL+': None, 'AUTH': ['CRAM-MD5']} return d.addCallback(lambda _: self.assertEqual(expCap, caps)) # # authentication # def testLogout(self): """ Test log out """ self.loggedOut = 0 def logout(): def setLoggedOut(): self.loggedOut = 1 self.client.logout().addCallback(strip(setLoggedOut)) self.connected.addCallback(strip(logout)).addErrback(self._ebGeneral) d = self.loopback() return d.addCallback(lambda _: self.assertEqual(self.loggedOut, 1)) def testNoop(self): """ Test noop command """ self.responses = None def noop(): def setResponses(responses): self.responses = responses self.server.transport.loseConnection() self.client.noop().addCallback(setResponses) self.connected.addCallback(strip(noop)).addErrback(self._ebGeneral) d = self.loopback() return d.addCallback(lambda _: self.assertEqual(self.responses, [])) def testLogin(self): """ Test login """ def login(): d = self.client.login(TEST_USER, TEST_PASSWD) d.addCallback(self._cbStopClient) d1 = self.connected.addCallback( strip(login)).addErrback(self._ebGeneral) d = defer.gatherResults([d1, self.loopback()]) return d.addCallback(self._cbTestLogin) def _cbTestLogin(self, ignored): self.assertEqual(self.server.account, LeapIMAPServer.theAccount) self.assertEqual(self.server.state, 'auth') def testFailedLogin(self): """ Test bad login """ def login(): d = self.client.login("bad_user@leap.se", TEST_PASSWD) d.addBoth(self._cbStopClient) d1 = self.connected.addCallback( strip(login)).addErrback(self._ebGeneral) d2 = self.loopback() d = defer.gatherResults([d1, d2]) return d.addCallback(self._cbTestFailedLogin) def _cbTestFailedLogin(self, ignored): self.assertEqual(self.server.state, 'unauth') self.assertEqual(self.server.account, None) def testLoginRequiringQuoting(self): """ Test login requiring quoting """ self.server._userid = '{test}user@leap.se' self.server._password = '{test}password' def login(): d = self.client.login('{test}user@leap.se', '{test}password') d.addBoth(self._cbStopClient) d1 = self.connected.addCallback( strip(login)).addErrback(self._ebGeneral) d = defer.gatherResults([self.loopback(), d1]) return d.addCallback(self._cbTestLoginRequiringQuoting) def _cbTestLoginRequiringQuoting(self, ignored): self.assertEqual(self.server.account, LeapIMAPServer.theAccount) self.assertEqual(self.server.state, 'auth') # # Inspection # def testNamespace(self): """ Test retrieving namespace """ self.namespaceArgs = None def login(): return self.client.login(TEST_USER, TEST_PASSWD) def namespace(): def gotNamespace(args): self.namespaceArgs = args self._cbStopClient(None) return self.client.namespace().addCallback(gotNamespace) d1 = self.connected.addCallback(strip(login)) d1.addCallback(strip(namespace)) d1.addErrback(self._ebGeneral) d2 = self.loopback() d = defer.gatherResults([d1, d2]) d.addCallback(lambda _: self.assertEqual(self.namespaceArgs, [[['', '/']], [], []])) return d def testExamine(self): """ L{IMAP4Client.examine} issues an I{EXAMINE} command to the server and returns a L{Deferred} which fires with a C{dict} with as many of the following keys as the server includes in its response: C{'FLAGS'}, C{'EXISTS'}, C{'RECENT'}, C{'UNSEEN'}, C{'READ-WRITE'}, C{'READ-ONLY'}, C{'UIDVALIDITY'}, and C{'PERMANENTFLAGS'}. Unfortunately the server doesn't generate all of these so it's hard to test the client's handling of them here. See L{IMAP4ClientExamineTests} below. See U{RFC 3501}, section 6.3.2, for details. """ # TODO implement the IMAP4ClientExamineTests testcase. self.server.theAccount.addMailbox('test-mailbox-e', creation_ts=42) self.examinedArgs = None def login(): return self.client.login(TEST_USER, TEST_PASSWD) def examine(): def examined(args): self.examinedArgs = args self._cbStopClient(None) d = self.client.examine('test-mailbox-e') d.addCallback(examined) return d d1 = self.connected.addCallback(strip(login)) d1.addCallback(strip(examine)) d1.addErrback(self._ebGeneral) d2 = self.loopback() d = defer.gatherResults([d1, d2]) return d.addCallback(self._cbTestExamine) def _cbTestExamine(self, ignored): mbox = self.server.theAccount.getMailbox('test-mailbox-e') self.assertEqual(self.server.mbox.messages.mbox, mbox.messages.mbox) # XXX UIDVALIDITY should be "42" if the creation_ts is passed along # to the memory store. However, the current state of the account # implementation is incomplete and we're writing to soledad store # directly there. We should handle the UIDVALIDITY timestamping # mechanism in a separate test suite. self.assertEqual(self.examinedArgs, { 'EXISTS': 0, 'RECENT': 0, 'UIDVALIDITY': 0, # 'EXISTS': 0, 'RECENT': 0, 'UIDVALIDITY': 42, 'FLAGS': ('\\Seen', '\\Answered', '\\Flagged', '\\Deleted', '\\Draft', '\\Recent', 'List'), 'READ-WRITE': False}) def _listSetup(self, f): LeapIMAPServer.theAccount.addMailbox('root/subthingl', creation_ts=42) LeapIMAPServer.theAccount.addMailbox('root/another-thing', creation_ts=42) LeapIMAPServer.theAccount.addMailbox('non-root/subthing', creation_ts=42) def login(): return self.client.login(TEST_USER, TEST_PASSWD) def listed(answers): self.listed = answers self.listed = None d1 = self.connected.addCallback(strip(login)) d1.addCallbacks(strip(f), self._ebGeneral) d1.addCallbacks(listed, self._ebGeneral) d1.addCallbacks(self._cbStopClient, self._ebGeneral) d2 = self.loopback() return defer.gatherResults([d1, d2]).addCallback(lambda _: self.listed) def testList(self): """ Test List command """ def list(): return self.client.list('root', '%') d = self._listSetup(list) d.addCallback(lambda listed: self.assertEqual( sortNest(listed), sortNest([ (SoledadMailbox.INIT_FLAGS, "/", "root/subthingl"), (SoledadMailbox.INIT_FLAGS, "/", "root/another-thing") ]) )) return d def testLSub(self): """ Test LSub command """ LeapIMAPServer.theAccount.subscribe('root/subthingl2') def lsub(): return self.client.lsub('root', '%') d = self._listSetup(lsub) d.addCallback(self.assertEqual, [(SoledadMailbox.INIT_FLAGS, "/", "root/subthingl2")]) return d def testStatus(self): """ Test Status command """ LeapIMAPServer.theAccount.addMailbox('root/subthings') # XXX FIXME ---- should populate this a little bit, # with unseen etc... def login(): return self.client.login(TEST_USER, TEST_PASSWD) def status(): return self.client.status( 'root/subthings', 'MESSAGES', 'UIDNEXT', 'UNSEEN') def statused(result): self.statused = result self.statused = None d1 = self.connected.addCallback(strip(login)) d1.addCallbacks(strip(status), self._ebGeneral) d1.addCallbacks(statused, self._ebGeneral) d1.addCallbacks(self._cbStopClient, self._ebGeneral) d2 = self.loopback() d = defer.gatherResults([d1, d2]) d.addCallback(lambda _: self.assertEqual( self.statused, {'MESSAGES': 0, 'UIDNEXT': '1', 'UNSEEN': 0} )) return d def testFailedStatus(self): """ Test failed status command with a non-existent mailbox """ def login(): return self.client.login(TEST_USER, TEST_PASSWD) def status(): return self.client.status( 'root/nonexistent', 'MESSAGES', 'UIDNEXT', 'UNSEEN') def statused(result): self.statused = result def failed(failure): self.failure = failure self.statused = self.failure = None d1 = self.connected.addCallback(strip(login)) d1.addCallbacks(strip(status), self._ebGeneral) d1.addCallbacks(statused, failed) d1.addCallbacks(self._cbStopClient, self._ebGeneral) d2 = self.loopback() return defer.gatherResults([d1, d2]).addCallback( self._cbTestFailedStatus) def _cbTestFailedStatus(self, ignored): self.assertEqual( self.statused, None ) self.assertEqual( self.failure.value.args, ('Could not open mailbox',) ) # # messages # def testFullAppend(self): """ Test appending a full message to the mailbox """ infile = util.sibpath(__file__, 'rfc822.message') message = open(infile) LeapIMAPServer.theAccount.addMailbox('root/subthing') def login(): return self.client.login(TEST_USER, TEST_PASSWD) def append(): return self.client.append( 'root/subthing', message, ('\\SEEN', '\\DELETED'), 'Tue, 17 Jun 2003 11:22:16 -0600 (MDT)', ) d1 = self.connected.addCallback(strip(login)) d1.addCallbacks(strip(append), self._ebGeneral) d1.addCallbacks(self._cbStopClient, self._ebGeneral) d2 = self.loopback() d = defer.gatherResults([d1, d2]) return d.addCallback(self._cbTestFullAppend, infile) def _cbTestFullAppend(self, ignored, infile): mb = LeapIMAPServer.theAccount.getMailbox('root/subthing') self.assertEqual(1, len(mb.messages)) msg = mb.messages.get_msg_by_uid(1) self.assertEqual( set(('\\Recent', '\\SEEN', '\\DELETED')), set(msg.getFlags())) self.assertEqual( 'Tue, 17 Jun 2003 11:22:16 -0600 (MDT)', msg.getInternalDate()) parsed = self.parser.parse(open(infile)) body = parsed.get_payload() headers = dict(parsed.items()) self.assertEqual( body, msg.getBodyFile().read()) gotheaders = msg.getHeaders(True) self.assertItemsEqual( headers, gotheaders) def testPartialAppend(self): """ Test partially appending a message to the mailbox """ infile = util.sibpath(__file__, 'rfc822.message') LeapIMAPServer.theAccount.addMailbox('PARTIAL/SUBTHING') def login(): return self.client.login(TEST_USER, TEST_PASSWD) def append(): message = file(infile) return self.client.sendCommand( imap4.Command( 'APPEND', 'PARTIAL/SUBTHING (\\SEEN) "Right now" ' '{%d}' % os.path.getsize(infile), (), self.client._IMAP4Client__cbContinueAppend, message ) ) d1 = self.connected.addCallback(strip(login)) d1.addCallbacks(strip(append), self._ebGeneral) d1.addCallbacks(self._cbStopClient, self._ebGeneral) d2 = self.loopback() d = defer.gatherResults([d1, d2]) return d.addCallback( self._cbTestPartialAppend, infile) def _cbTestPartialAppend(self, ignored, infile): mb = LeapIMAPServer.theAccount.getMailbox('PARTIAL/SUBTHING') self.assertEqual(1, len(mb.messages)) msg = mb.messages.get_msg_by_uid(1) self.assertEqual( set(('\\SEEN', '\\Recent')), set(msg.getFlags()) ) parsed = self.parser.parse(open(infile)) body = parsed.get_payload() self.assertEqual( body, msg.getBodyFile().read()) def testCheck(self): """ Test check command """ LeapIMAPServer.theAccount.addMailbox('root/subthing') def login(): return self.client.login(TEST_USER, TEST_PASSWD) def select(): return self.client.select('root/subthing') def check(): return self.client.check() d = self.connected.addCallback(strip(login)) d.addCallbacks(strip(select), self._ebGeneral) d.addCallbacks(strip(check), self._ebGeneral) d.addCallbacks(self._cbStopClient, self._ebGeneral) return self.loopback() # Okay, that was fun def testClose(self): """ Test closing the mailbox. We expect to get deleted all messages flagged as such. """ name = 'mailbox-close' self.server.theAccount.addMailbox(name) m = LeapIMAPServer.theAccount.getMailbox(name) def login(): return self.client.login(TEST_USER, TEST_PASSWD) def select(): return self.client.select(name) def add_messages(): d1 = m.messages.add_msg( 'test 1', subject="Message 1", flags=('\\Deleted', 'AnotherFlag')) d2 = m.messages.add_msg( 'test 2', subject="Message 2", flags=('AnotherFlag',)) d3 = m.messages.add_msg( 'test 3', subject="Message 3", flags=('\\Deleted',)) d = defer.gatherResults([d1, d2, d3]) return d def close(): return self.client.close() d = self.connected.addCallback(strip(login)) d.addCallbacks(strip(select), self._ebGeneral) d.addCallbacks(strip(add_messages), self._ebGeneral) d.addCallbacks(strip(close), self._ebGeneral) d.addCallbacks(self._cbStopClient, self._ebGeneral) d2 = self.loopback() return defer.gatherResults([d, d2]).addCallback(self._cbTestClose, m) def _cbTestClose(self, ignored, m): self.assertEqual(len(m.messages), 1) msg = m.messages.get_msg_by_uid(2) self.assertTrue(msg is not None) self.assertEqual( dict(msg.hdoc.content)['subject'], 'Message 2') self.failUnless(m.closed) def testExpunge(self): """ Test expunge command """ name = 'mailbox-expunge' self.server.theAccount.addMailbox(name) m = LeapIMAPServer.theAccount.getMailbox(name) def login(): return self.client.login(TEST_USER, TEST_PASSWD) def select(): return self.client.select('mailbox-expunge') def add_messages(): d1 = m.messages.add_msg( 'test 1', subject="Message 1", flags=('\\Deleted', 'AnotherFlag')) d2 = m.messages.add_msg( 'test 2', subject="Message 2", flags=('AnotherFlag',)) d3 = m.messages.add_msg( 'test 3', subject="Message 3", flags=('\\Deleted',)) d = defer.gatherResults([d1, d2, d3]) return d def expunge(): return self.client.expunge() def expunged(results): self.failIf(self.server.mbox is None) self.results = results self.results = None d1 = self.connected.addCallback(strip(login)) d1.addCallbacks(strip(select), self._ebGeneral) d1.addCallbacks(strip(add_messages), self._ebGeneral) d1.addCallbacks(strip(expunge), self._ebGeneral) d1.addCallbacks(expunged, self._ebGeneral) d1.addCallbacks(self._cbStopClient, self._ebGeneral) d2 = self.loopback() d = defer.gatherResults([d1, d2]) return d.addCallback(self._cbTestExpunge, m) def _cbTestExpunge(self, ignored, m): # we only left 1 mssage with no deleted flag self.assertEqual(len(m.messages), 1) msg = m.messages.get_msg_by_uid(2) msg = list(m.messages)[0] self.assertTrue(msg is not None) self.assertEqual( msg.hdoc.content['subject'], 'Message 2') # the uids of the deleted messages self.assertItemsEqual(self.results, [1, 3]) class AccountTestCase(IMAP4HelperMixin, unittest.TestCase): """ Test the Account. """ def _create_empty_mailbox(self): LeapIMAPServer.theAccount.addMailbox('') def _create_one_mailbox(self): LeapIMAPServer.theAccount.addMailbox('one') def test_illegalMailboxCreate(self): self.assertRaises(AssertionError, self._create_empty_mailbox) class IMAP4ServerSearchTestCase(IMAP4HelperMixin, unittest.TestCase): """ Tests for the behavior of the search_* functions in L{imap5.IMAP4Server}. """ # XXX coming soon to your screens! pass