summaryrefslogtreecommitdiff
path: root/tests/integration/mail/imap
diff options
context:
space:
mode:
authorKali Kaneko (leap communications) <kali@leap.se>2016-09-01 00:06:52 -0400
committerKali Kaneko (leap communications) <kali@leap.se>2016-09-01 00:06:52 -0400
commitf826bc473a0c50fcf55f4e8609aa07622814f902 (patch)
tree32665c6608c536c3b3db5b3fa504567043171c91 /tests/integration/mail/imap
parentc74c51f9fc753c6a870f7c14d5fdd10b152e0991 (diff)
[tests] move tests to root folder
Diffstat (limited to 'tests/integration/mail/imap')
-rw-r--r--tests/integration/mail/imap/.gitignore1
-rwxr-xr-xtests/integration/mail/imap/getmail344
-rwxr-xr-xtests/integration/mail/imap/imapclient.py207
-rwxr-xr-xtests/integration/mail/imap/regressions_mime_struct461
l---------tests/integration/mail/imap/rfc822.message1
l---------tests/integration/mail/imap/rfc822.multi-minimal.message1
l---------tests/integration/mail/imap/rfc822.multi-nested.message1
l---------tests/integration/mail/imap/rfc822.multi-signed.message1
l---------tests/integration/mail/imap/rfc822.multi.message1
l---------tests/integration/mail/imap/rfc822.plain.message1
-rwxr-xr-xtests/integration/mail/imap/stress_tests_imap.zsh178
-rw-r--r--tests/integration/mail/imap/test_imap.py1062
-rw-r--r--tests/integration/mail/imap/walktree.py127
13 files changed, 2386 insertions, 0 deletions
diff --git a/tests/integration/mail/imap/.gitignore b/tests/integration/mail/imap/.gitignore
new file mode 100644
index 00000000..60baa9cb
--- /dev/null
+++ b/tests/integration/mail/imap/.gitignore
@@ -0,0 +1 @@
+data/*
diff --git a/tests/integration/mail/imap/getmail b/tests/integration/mail/imap/getmail
new file mode 100755
index 00000000..dd3fa0bb
--- /dev/null
+++ b/tests/integration/mail/imap/getmail
@@ -0,0 +1,344 @@
+#!/usr/bin/env python
+
+# Copyright (c) Twisted Matrix Laboratories.
+# See LICENSE in twisted for details.
+
+# Modifications by LEAP Developers 2014 to fit
+# Bitmask configuration settings.
+"""
+Simple IMAP4 client which displays the subjects of all messages in a
+particular mailbox.
+"""
+
+import os
+import sys
+
+from twisted.internet import protocol
+from twisted.internet import ssl
+from twisted.internet import defer
+from twisted.internet import stdio
+from twisted.mail import imap4
+from twisted.protocols import basic
+from twisted.python import log
+
+# Global options stored here from main
+_opts = {}
+
+
+class TrivialPrompter(basic.LineReceiver):
+ from os import linesep as delimiter
+
+ promptDeferred = None
+
+ def prompt(self, msg):
+ assert self.promptDeferred is None
+ self.display(msg)
+ self.promptDeferred = defer.Deferred()
+ return self.promptDeferred
+
+ def display(self, msg):
+ self.transport.write(msg)
+
+ def lineReceived(self, line):
+ if self.promptDeferred is None:
+ return
+ d, self.promptDeferred = self.promptDeferred, None
+ d.callback(line)
+
+
+class SimpleIMAP4Client(imap4.IMAP4Client):
+ """
+ A client with callbacks for greeting messages from an IMAP server.
+ """
+ greetDeferred = None
+
+ def serverGreeting(self, caps):
+ self.serverCapabilities = caps
+ if self.greetDeferred is not None:
+ d, self.greetDeferred = self.greetDeferred, None
+ d.callback(self)
+
+
+class SimpleIMAP4ClientFactory(protocol.ClientFactory):
+ usedUp = False
+
+ protocol = SimpleIMAP4Client
+
+ def __init__(self, username, onConn):
+ self.ctx = ssl.ClientContextFactory()
+
+ self.username = username
+ self.onConn = onConn
+
+ def buildProtocol(self, addr):
+ """
+ Initiate the protocol instance. Since we are building a simple IMAP
+ client, we don't bother checking what capabilities the server has. We
+ just add all the authenticators twisted.mail has.
+ """
+ assert not self.usedUp
+ self.usedUp = True
+
+ p = self.protocol(self.ctx)
+ p.factory = self
+ p.greetDeferred = self.onConn
+
+ p.registerAuthenticator(imap4.PLAINAuthenticator(self.username))
+ p.registerAuthenticator(imap4.LOGINAuthenticator(self.username))
+ p.registerAuthenticator(
+ imap4.CramMD5ClientAuthenticator(self.username))
+
+ return p
+
+ def clientConnectionFailed(self, connector, reason):
+ d, self.onConn = self.onConn, None
+ d.errback(reason)
+
+
+def cbServerGreeting(proto, username, password):
+ """
+ Initial callback - invoked after the server sends us its greet message.
+ """
+ # Hook up stdio
+ tp = TrivialPrompter()
+ stdio.StandardIO(tp)
+
+ # And make it easily accessible
+ proto.prompt = tp.prompt
+ proto.display = tp.display
+
+ # Try to authenticate securely
+ return proto.authenticate(
+ password).addCallback(
+ cbAuthentication,
+ proto).addErrback(
+ ebAuthentication, proto, username, password
+ )
+
+
+def ebConnection(reason):
+ """
+ Fallback error-handler. If anything goes wrong, log it and quit.
+ """
+ log.startLogging(sys.stdout)
+ log.err(reason)
+ return reason
+
+
+def cbAuthentication(result, proto):
+ """
+ Callback after authentication has succeeded.
+
+ Lists a bunch of mailboxes.
+ """
+ return proto.list("", "*"
+ ).addCallback(cbMailboxList, proto
+ )
+
+
+def ebAuthentication(failure, proto, username, password):
+ """
+ Errback invoked when authentication fails.
+
+ If it failed because no SASL mechanisms match, offer the user the choice
+ of logging in insecurely.
+
+ If you are trying to connect to your Gmail account, you will be here!
+ """
+ failure.trap(imap4.NoSupportedAuthentication)
+ return InsecureLogin(proto, username, password)
+
+
+def InsecureLogin(proto, username, password):
+ """
+ insecure-login.
+ """
+ return proto.login(username, password
+ ).addCallback(cbAuthentication, proto
+ )
+
+
+def cbMailboxList(result, proto):
+ """
+ Callback invoked when a list of mailboxes has been retrieved.
+ If we have a selected mailbox in the global options, we directly pick it.
+ Otherwise, we offer a prompt to let user choose one.
+ """
+ all_mbox_list = [e[2] for e in result]
+ s = '\n'.join(['%d. %s' % (n + 1, m) for (n, m) in zip(range(len(all_mbox_list)), all_mbox_list)])
+ if not s:
+ return defer.fail(Exception("No mailboxes exist on server!"))
+
+ selected_mailbox = _opts.get('mailbox')
+
+ if not selected_mailbox:
+ return proto.prompt(s + "\nWhich mailbox? [1] "
+ ).addCallback(cbPickMailbox, proto, all_mbox_list
+ )
+ else:
+ mboxes_lower = map(lambda s: s.lower(), all_mbox_list)
+ index = mboxes_lower.index(selected_mailbox.lower()) + 1
+ return cbPickMailbox(index, proto, all_mbox_list)
+
+
+def cbPickMailbox(result, proto, mboxes):
+ """
+ When the user selects a mailbox, "examine" it.
+ """
+ mbox = mboxes[int(result or '1') - 1]
+ return proto.examine(mbox
+ ).addCallback(cbExamineMbox, proto
+ )
+
+
+def cbExamineMbox(result, proto):
+ """
+ Callback invoked when examine command completes.
+
+ Retrieve the subject header of every message in the mailbox.
+ """
+ return proto.fetchSpecific('1:*',
+ headerType='HEADER.FIELDS',
+ headerArgs=['SUBJECT'],
+ ).addCallback(cbFetch, proto,
+ )
+
+
+def cbFetch(result, proto):
+ """
+ Display a listing of the messages in the mailbox, based on the collected
+ headers.
+ """
+ selected_subject = _opts.get('subject', None)
+ index = None
+
+ if result:
+ keys = result.keys()
+ keys.sort()
+
+ if selected_subject:
+ for k in keys:
+ # remove 'Subject: ' preffix plus eol
+ subject = result[k][0][2][9:].rstrip('\r\n')
+ if subject.lower() == selected_subject.lower():
+ index = k
+ break
+ else:
+ for k in keys:
+ proto.display('%s %s' % (k, result[k][0][2]))
+ else:
+ print "Hey, an empty mailbox!"
+
+ if not index:
+ return proto.prompt("\nWhich message? [1] (Q quits) "
+ ).addCallback(cbPickMessage, proto)
+ else:
+ return cbPickMessage(index, proto)
+
+
+def cbPickMessage(result, proto):
+ """
+ Pick a message.
+ """
+ if result == "Q":
+ print "Bye!"
+ return proto.logout()
+
+ return proto.fetchSpecific(
+ '%s' % result,
+ headerType='',
+ headerArgs=['BODY.PEEK[]'],
+ ).addCallback(cbShowmessage, proto)
+
+
+def cbShowmessage(result, proto):
+ """
+ Display message.
+ """
+ if result:
+ keys = result.keys()
+ keys.sort()
+ for k in keys:
+ proto.display('%s %s' % (k, result[k][0][2]))
+ else:
+ print "Hey, an empty message!"
+
+ return proto.logout()
+
+
+def cbClose(result):
+ """
+ Close the connection when we finish everything.
+ """
+ from twisted.internet import reactor
+ reactor.stop()
+
+
+def main():
+ import argparse
+ import ConfigParser
+ import sys
+ from twisted.internet import reactor
+
+ description = (
+ 'Get messages from a LEAP IMAP Proxy.\nThis is a '
+ 'debugging tool, do not use this to retrieve any sensitive '
+ 'information, or we will send ninjas to your house!')
+ epilog = (
+ 'In case you want to automate the usage of this utility '
+ 'you can place your credentials in a file pointed by '
+ 'BITMASK_CREDENTIALS. You need to have a [Credentials] '
+ 'section, with username=<user@provider> and password fields')
+
+ parser = argparse.ArgumentParser(description=description, epilog=epilog)
+ credentials = os.environ.get('BITMASK_CREDENTIALS')
+
+ if credentials:
+ try:
+ config = ConfigParser.ConfigParser()
+ config.read(credentials)
+ username = config.get('Credentials', 'username')
+ password = config.get('Credentials', 'password')
+ except Exception, e:
+ print "Error reading credentials file: {0}".format(e)
+ sys.exit()
+ else:
+ parser.add_argument('username', type=str)
+ parser.add_argument('password', type=str)
+
+ parser.add_argument('--mailbox', dest='mailbox', default=None,
+ help='Which mailbox to retrieve. Empty for interactive prompt.')
+ parser.add_argument('--subject', dest='subject', default=None,
+ help='A subject for retrieve a mail that matches. Empty for interactive prompt.')
+
+ ns = parser.parse_args()
+
+ if not credentials:
+ username = ns.username
+ password = ns.password
+
+ _opts['mailbox'] = ns.mailbox
+ _opts['subject'] = ns.subject
+
+ hostname = "localhost"
+ port = "1984"
+
+ onConn = defer.Deferred(
+ ).addCallback(cbServerGreeting, username, password
+ ).addErrback(ebConnection
+ ).addBoth(cbClose)
+
+ factory = SimpleIMAP4ClientFactory(username, onConn)
+
+ if port == '993':
+ reactor.connectSSL(
+ hostname, int(port), factory, ssl.ClientContextFactory())
+ else:
+ if not port:
+ port = 143
+ reactor.connectTCP(hostname, int(port), factory)
+ reactor.run()
+
+
+if __name__ == '__main__':
+ main()
diff --git a/tests/integration/mail/imap/imapclient.py b/tests/integration/mail/imap/imapclient.py
new file mode 100755
index 00000000..c353ceed
--- /dev/null
+++ b/tests/integration/mail/imap/imapclient.py
@@ -0,0 +1,207 @@
+#!/usr/bin/env python
+
+# Copyright (c) Twisted Matrix Laboratories.
+# See LICENSE for details.
+
+
+"""
+Simple IMAP4 client which connects to our custome
+IMAP4 server: imapserver.py.
+"""
+
+import sys
+
+from twisted.internet import protocol
+from twisted.internet import defer
+from twisted.internet import stdio
+from twisted.mail import imap4
+from twisted.protocols import basic
+from twisted.python import util
+from twisted.python import log
+
+
+class TrivialPrompter(basic.LineReceiver):
+ # from os import linesep as delimiter
+
+ promptDeferred = None
+
+ def prompt(self, msg):
+ assert self.promptDeferred is None
+ self.display(msg)
+ self.promptDeferred = defer.Deferred()
+ return self.promptDeferred
+
+ def display(self, msg):
+ self.transport.write(msg)
+
+ def lineReceived(self, line):
+ if self.promptDeferred is None:
+ return
+ d, self.promptDeferred = self.promptDeferred, None
+ d.callback(line)
+
+
+class SimpleIMAP4Client(imap4.IMAP4Client):
+
+ """
+ Add callbacks when the client receives greeting messages from
+ an IMAP server.
+ """
+ greetDeferred = None
+
+ def serverGreeting(self, caps):
+ self.serverCapabilities = caps
+ if self.greetDeferred is not None:
+ d, self.greetDeferred = self.greetDeferred, None
+ d.callback(self)
+
+
+class SimpleIMAP4ClientFactory(protocol.ClientFactory):
+ usedUp = False
+ protocol = SimpleIMAP4Client
+
+ def __init__(self, username, onConn):
+ self.username = username
+ self.onConn = onConn
+
+ def buildProtocol(self, addr):
+ assert not self.usedUp
+ self.usedUp = True
+
+ p = self.protocol()
+ p.factory = self
+ p.greetDeferred = self.onConn
+
+ p.registerAuthenticator(imap4.PLAINAuthenticator(self.username))
+ p.registerAuthenticator(imap4.LOGINAuthenticator(self.username))
+ p.registerAuthenticator(
+ imap4.CramMD5ClientAuthenticator(self.username))
+
+ return p
+
+ def clientConnectionFailed(self, connector, reason):
+ d, self.onConn = self.onConn, None
+ d.errback(reason)
+
+
+def cbServerGreeting(proto, username, password):
+ """
+ Initial callback - invoked after the server sends us its greet message.
+ """
+ # Hook up stdio
+ tp = TrivialPrompter()
+ stdio.StandardIO(tp)
+
+ # And make it easily accessible
+ proto.prompt = tp.prompt
+ proto.display = tp.display
+
+ # Try to authenticate securely
+ return proto.authenticate(
+ password).addCallback(
+ cbAuthentication, proto).addErrback(
+ ebAuthentication, proto, username, password)
+
+
+def ebConnection(reason):
+ """
+ Fallback error-handler. If anything goes wrong, log it and quit.
+ """
+ log.startLogging(sys.stdout)
+ log.err(reason)
+ return reason
+
+
+def cbAuthentication(result, proto):
+ """
+ Callback after authentication has succeeded.
+ List a bunch of mailboxes.
+ """
+ return proto.list("", "*"
+ ).addCallback(cbMailboxList, proto
+ )
+
+
+def ebAuthentication(failure, proto, username, password):
+ """
+ Errback invoked when authentication fails.
+ If it failed because no SASL mechanisms match, offer the user the choice
+ of logging in insecurely.
+ If you are trying to connect to your Gmail account, you will be here!
+ """
+ failure.trap(imap4.NoSupportedAuthentication)
+ return proto.prompt(
+ "No secure authentication available. Login insecurely? (y/N) "
+ ).addCallback(cbInsecureLogin, proto, username, password
+ )
+
+
+def cbInsecureLogin(result, proto, username, password):
+ """
+ Callback for "insecure-login" prompt.
+ """
+ if result.lower() == "y":
+ # If they said yes, do it.
+ return proto.login(username, password
+ ).addCallback(cbAuthentication, proto
+ )
+ return defer.fail(Exception("Login failed for security reasons."))
+
+
+def cbMailboxList(result, proto):
+ """
+ Callback invoked when a list of mailboxes has been retrieved.
+ """
+ result = [e[2] for e in result]
+ s = '\n'.join(
+ ['%d. %s' % (n + 1, m) for (n, m) in zip(range(len(result)), result)])
+ if not s:
+ return defer.fail(Exception("No mailboxes exist on server!"))
+ return proto.prompt(s + "\nWhich mailbox? [1] "
+ ).addCallback(cbPickMailbox, proto, result
+ )
+
+
+def cbPickMailbox(result, proto, mboxes):
+ """
+ When the user selects a mailbox, "examine" it.
+ """
+ mbox = mboxes[int(result or '1') - 1]
+ return proto.status(mbox, 'MESSAGES', 'UNSEEN'
+ ).addCallback(cbMboxStatus, proto)
+
+
+def cbMboxStatus(result, proto):
+ print "You have %s messages (%s unseen)!" % (
+ result['MESSAGES'], result['UNSEEN'])
+ return proto.logout()
+
+
+def cbClose(result):
+ """
+ Close the connection when we finish everything.
+ """
+ from twisted.internet import reactor
+ reactor.stop()
+
+
+def main():
+ hostname = raw_input('IMAP4 Server Hostname: ')
+ port = raw_input('IMAP4 Server Port (the default is 143): ')
+ username = raw_input('IMAP4 Username: ')
+ password = util.getPassword('IMAP4 Password: ')
+
+ onConn = defer.Deferred(
+ ).addCallback(cbServerGreeting, username, password
+ ).addErrback(ebConnection
+ ).addBoth(cbClose)
+
+ factory = SimpleIMAP4ClientFactory(username, onConn)
+
+ from twisted.internet import reactor
+ conn = reactor.connectTCP(hostname, int(port), factory)
+ reactor.run()
+
+
+if __name__ == '__main__':
+ main()
diff --git a/tests/integration/mail/imap/regressions_mime_struct b/tests/integration/mail/imap/regressions_mime_struct
new file mode 100755
index 00000000..03326646
--- /dev/null
+++ b/tests/integration/mail/imap/regressions_mime_struct
@@ -0,0 +1,461 @@
+#!/usr/bin/env python
+
+# -*- coding: utf-8 -*-
+# regression_mime_struct
+# Copyright (C) 2014 LEAP
+# Copyright (c) Twisted Matrix Laboratories.
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see <http://www.gnu.org/licenses/>.
+"""
+Simple Regression Tests for checking MIME struct handling using IMAP4 client.
+
+Iterates trough all mails under a given folder and tries to APPEND them to
+the server being tested. After FETCHING the pushed message, it compares
+the received version with the one that was saved, and exits with an error
+code if they do not match.
+"""
+import os
+import StringIO
+import sys
+
+from email.parser import Parser
+
+from twisted.internet import protocol
+from twisted.internet import ssl
+from twisted.internet import defer
+from twisted.internet import stdio
+from twisted.mail import imap4
+from twisted.protocols import basic
+from twisted.python import log
+
+
+REGRESSIONS_FOLDER = os.environ.get(
+ "REGRESSIONS_FOLDER", "regressions_test")
+print "[+] Using regressions folder:", REGRESSIONS_FOLDER
+
+parser = Parser()
+
+
+def get_msg_parts(raw):
+ """
+ Return a representation of the parts of a message suitable for
+ comparison.
+
+ :param raw: string for the message
+ :type raw: str
+ """
+ m = parser.parsestr(raw)
+ return [dict(part.items())
+ if part.is_multipart()
+ else part.get_payload()
+ for part in m.walk()]
+
+
+def compare_msg_parts(a, b):
+ """
+ Compare two sequences of parts of messages.
+
+ :param a: part sequence for message a
+ :param b: part sequence for message b
+
+ :return: True if both message sequences are equivalent.
+ :rtype: bool
+ """
+ # XXX This could be smarter and show the differences in the
+ # different parts when/where they differ.
+ #import pprint; pprint.pprint(a[0])
+ #import pprint; pprint.pprint(b[0])
+
+ def lowerkey(d):
+ return dict((k.lower(), v.replace('\r', ''))
+ for k, v in d.iteritems())
+
+ def eq(x, y):
+ # For dicts, we compare a variation with their keys
+ # in lowercase, and \r removed from their values
+ if all(map(lambda i: isinstance(i, dict), (x, y))):
+ x, y = map(lowerkey, (x, y))
+ return x == y
+
+ compare_vector = map(lambda tup: eq(tup[0], tup[1]), zip(a, b))
+ all_match = all(compare_vector)
+
+ if not all_match:
+ print "PARTS MISMATCH!"
+ print "vector: ", compare_vector
+ index = compare_vector.index(False)
+ from pprint import pprint
+ print "Expected:"
+ pprint(a[index])
+ print ("***")
+ print "Found:"
+ pprint(b[index])
+ print
+
+ return all_match
+
+
+def get_fd(string):
+ """
+ Return a file descriptor with the passed string
+ as content.
+ """
+ fd = StringIO.StringIO()
+ fd.write(string)
+ fd.seek(0)
+ return fd
+
+
+class TrivialPrompter(basic.LineReceiver):
+ promptDeferred = None
+
+ def prompt(self, msg):
+ assert self.promptDeferred is None
+ self.display(msg)
+ self.promptDeferred = defer.Deferred()
+ return self.promptDeferred
+
+ def display(self, msg):
+ self.transport.write(msg)
+
+ def lineReceived(self, line):
+ if self.promptDeferred is None:
+ return
+ d, self.promptDeferred = self.promptDeferred, None
+ d.callback(line)
+
+
+class SimpleIMAP4Client(imap4.IMAP4Client):
+ """
+ A client with callbacks for greeting messages from an IMAP server.
+ """
+ greetDeferred = None
+
+ def serverGreeting(self, caps):
+ self.serverCapabilities = caps
+ if self.greetDeferred is not None:
+ d, self.greetDeferred = self.greetDeferred, None
+ d.callback(self)
+
+
+class SimpleIMAP4ClientFactory(protocol.ClientFactory):
+ usedUp = False
+ protocol = SimpleIMAP4Client
+
+ def __init__(self, username, onConn):
+ self.ctx = ssl.ClientContextFactory()
+
+ self.username = username
+ self.onConn = onConn
+
+ def buildProtocol(self, addr):
+ """
+ Initiate the protocol instance. Since we are building a simple IMAP
+ client, we don't bother checking what capabilities the server has. We
+ just add all the authenticators twisted.mail has. Note: Gmail no
+ longer uses any of the methods below, it's been using XOAUTH since
+ 2010.
+ """
+ assert not self.usedUp
+ self.usedUp = True
+
+ p = self.protocol(self.ctx)
+ p.factory = self
+ p.greetDeferred = self.onConn
+
+ p.registerAuthenticator(imap4.PLAINAuthenticator(self.username))
+ p.registerAuthenticator(imap4.LOGINAuthenticator(self.username))
+ p.registerAuthenticator(
+ imap4.CramMD5ClientAuthenticator(self.username))
+
+ return p
+
+ def clientConnectionFailed(self, connector, reason):
+ d, self.onConn = self.onConn, None
+ d.errback(reason)
+
+
+def cbServerGreeting(proto, username, password):
+ """
+ Initial callback - invoked after the server sends us its greet message.
+ """
+ # Hook up stdio
+ tp = TrivialPrompter()
+ stdio.StandardIO(tp)
+
+ # And make it easily accessible
+ proto.prompt = tp.prompt
+ proto.display = tp.display
+
+ # Try to authenticate securely
+ return proto.authenticate(
+ password).addCallback(
+ cbAuthentication,
+ proto).addErrback(
+ ebAuthentication, proto, username, password
+ )
+
+
+def ebConnection(reason):
+ """
+ Fallback error-handler. If anything goes wrong, log it and quit.
+ """
+ log.startLogging(sys.stdout)
+ log.err(reason)
+ return reason
+
+
+def cbAuthentication(result, proto):
+ """
+ Callback after authentication has succeeded.
+
+ Lists a bunch of mailboxes.
+ """
+ return proto.select(
+ REGRESSIONS_FOLDER
+ ).addCallback(
+ cbSelectMbox, proto
+ ).addErrback(
+ ebSelectMbox, proto, REGRESSIONS_FOLDER)
+
+
+def ebAuthentication(failure, proto, username, password):
+ """
+ Errback invoked when authentication fails.
+
+ If it failed because no SASL mechanisms match, offer the user the choice
+ of logging in insecurely.
+
+ If you are trying to connect to your Gmail account, you will be here!
+ """
+ failure.trap(imap4.NoSupportedAuthentication)
+ return InsecureLogin(proto, username, password)
+
+
+def InsecureLogin(proto, username, password):
+ """
+ Raise insecure-login error.
+ """
+ return proto.login(
+ username, password
+ ).addCallback(
+ cbAuthentication, proto)
+
+
+def cbSelectMbox(result, proto):
+ """
+ Callback invoked when select command finishes successfully.
+
+ If any message is in the test folder, it will flag them as deleted and
+ expunge.
+ If no messages found, it will start with the APPEND tests.
+ """
+ print "SELECT: %s EXISTS " % result.get("EXISTS", "??")
+
+ if result["EXISTS"] != 0:
+ # Flag as deleted, expunge, and do an examine again.
+ print "There is mail here, will delete..."
+ return cbDeleteAndExpungeTestFolder(proto)
+
+ else:
+ return cbAppendNextMessage(proto)
+
+
+def ebSelectMbox(failure, proto, folder):
+ """
+ Errback invoked when the examine command fails.
+
+ Creates the folder.
+ """
+ log.err(failure)
+ log.msg("Folder %r does not exist. Creating..." % (folder,))
+ return proto.create(folder).addCallback(cbAuthentication, proto)
+
+
+def ebExpunge(failure):
+ log.err(failure)
+
+
+def cbDeleteAndExpungeTestFolder(proto):
+ """
+ Callback invoked fom cbExamineMbox when the number of messages in the
+ mailbox is not zero. It flags all messages as deleted and expunge the
+ mailbox.
+ """
+ return proto.setFlags(
+ "1:*", ("\\Deleted",)
+ ).addCallback(
+ lambda r: proto.expunge()
+ ).addCallback(
+ cbExpunge, proto
+ ).addErrback(
+ ebExpunge)
+
+
+def cbExpunge(result, proto):
+ return proto.select(
+ REGRESSIONS_FOLDER
+ ).addCallback(
+ cbSelectMbox, proto
+ ).addErrback(ebSettingDeleted, proto)
+
+
+def ebSettingDeleted(failure, proto):
+ """
+ Report errors during deletion of messages in the mailbox.
+ """
+ print failure.getTraceback()
+
+
+def cbAppendNextMessage(proto):
+ """
+ Appends the next message in the global queue to the test folder.
+ """
+ # 1. Get the next test message from global tuple.
+ try:
+ next_sample = SAMPLES.pop()
+ except IndexError:
+ # we're done!
+ return proto.logout()
+
+ print "\nAPPEND %s" % (next_sample,)
+ raw = open(next_sample).read()
+ msg = get_fd(raw)
+ return proto.append(
+ REGRESSIONS_FOLDER, msg
+ ).addCallback(
+ lambda r: proto.select(REGRESSIONS_FOLDER)
+ ).addCallback(
+ cbAppend, proto, raw
+ ).addErrback(
+ ebAppend, proto, raw)
+
+
+def cbAppend(result, proto, orig_msg):
+ """
+ Fetches the message right after an append.
+ """
+ # XXX keep account of highest UID
+ uid = "1:*"
+
+ return proto.fetchSpecific(
+ '%s' % uid,
+ headerType='',
+ headerArgs=['BODY.PEEK[]'],
+ ).addCallback(
+ cbCompareMessage, proto, orig_msg
+ ).addErrback(ebAppend, proto, orig_msg)
+
+
+def ebAppend(failure, proto, raw):
+ """
+ Errorback for the append operation
+ """
+ print "ERROR WHILE APPENDING!"
+ print failure.getTraceback()
+
+
+def cbPickMessage(result, proto):
+ """
+ Pick a message.
+ """
+ return proto.fetchSpecific(
+ '%s' % result,
+ headerType='',
+ headerArgs=['BODY.PEEK[]'],
+ ).addCallback(cbCompareMessage, proto)
+
+
+def cbCompareMessage(result, proto, raw):
+ """
+ Display message and compare it with the original one.
+ """
+ parts_orig = get_msg_parts(raw)
+
+ if result:
+ keys = result.keys()
+ keys.sort()
+ else:
+ print "[-] GOT NO RESULT"
+ return proto.logout()
+
+ latest = max(keys)
+
+ fetched_msg = result[latest][0][2]
+ parts_fetched = get_msg_parts(fetched_msg)
+
+ equal = compare_msg_parts(
+ parts_orig,
+ parts_fetched)
+
+ if equal:
+ print "[+] MESSAGES MATCH"
+ return cbAppendNextMessage(proto)
+ else:
+ print "[-] ERROR: MESSAGES DO NOT MATCH !!!"
+ print " ABORTING COMPARISON..."
+ # FIXME logout and print the subject ...
+ return proto.logout()
+
+
+def cbClose(result):
+ """
+ Close the connection when we finish everything.
+ """
+ from twisted.internet import reactor
+ reactor.stop()
+
+
+def main():
+ import glob
+ import sys
+
+ if len(sys.argv) != 4:
+ print "Usage: regressions <user> <pass> <samples-folder>"
+ sys.exit()
+
+ hostname = "localhost"
+ port = "1984"
+ username = sys.argv[1]
+ password = sys.argv[2]
+
+ samplesdir = sys.argv[3]
+
+ if not os.path.isdir(samplesdir):
+ print ("Could not find samples folder! "
+ "Make sure of copying mail_breaker contents there.")
+ sys.exit()
+
+ samples = glob.glob(samplesdir + '/*')
+
+ global SAMPLES
+ SAMPLES = []
+ SAMPLES += samples
+
+ onConn = defer.Deferred(
+ ).addCallback(
+ cbServerGreeting, username, password
+ ).addErrback(
+ ebConnection
+ ).addBoth(cbClose)
+
+ factory = SimpleIMAP4ClientFactory(username, onConn)
+
+ from twisted.internet import reactor
+ reactor.connectTCP(hostname, int(port), factory)
+ reactor.run()
+
+
+if __name__ == '__main__':
+ main()
diff --git a/tests/integration/mail/imap/rfc822.message b/tests/integration/mail/imap/rfc822.message
new file mode 120000
index 00000000..b19cc280
--- /dev/null
+++ b/tests/integration/mail/imap/rfc822.message
@@ -0,0 +1 @@
+../../tests/rfc822.message \ No newline at end of file
diff --git a/tests/integration/mail/imap/rfc822.multi-minimal.message b/tests/integration/mail/imap/rfc822.multi-minimal.message
new file mode 120000
index 00000000..e0aa678b
--- /dev/null
+++ b/tests/integration/mail/imap/rfc822.multi-minimal.message
@@ -0,0 +1 @@
+../../tests/rfc822.multi-minimal.message \ No newline at end of file
diff --git a/tests/integration/mail/imap/rfc822.multi-nested.message b/tests/integration/mail/imap/rfc822.multi-nested.message
new file mode 120000
index 00000000..306d0dec
--- /dev/null
+++ b/tests/integration/mail/imap/rfc822.multi-nested.message
@@ -0,0 +1 @@
+../../tests/rfc822.multi-nested.message \ No newline at end of file
diff --git a/tests/integration/mail/imap/rfc822.multi-signed.message b/tests/integration/mail/imap/rfc822.multi-signed.message
new file mode 120000
index 00000000..4172244e
--- /dev/null
+++ b/tests/integration/mail/imap/rfc822.multi-signed.message
@@ -0,0 +1 @@
+../../tests/rfc822.multi-signed.message \ No newline at end of file
diff --git a/tests/integration/mail/imap/rfc822.multi.message b/tests/integration/mail/imap/rfc822.multi.message
new file mode 120000
index 00000000..62057d20
--- /dev/null
+++ b/tests/integration/mail/imap/rfc822.multi.message
@@ -0,0 +1 @@
+../../tests/rfc822.multi.message \ No newline at end of file
diff --git a/tests/integration/mail/imap/rfc822.plain.message b/tests/integration/mail/imap/rfc822.plain.message
new file mode 120000
index 00000000..5bab0e8d
--- /dev/null
+++ b/tests/integration/mail/imap/rfc822.plain.message
@@ -0,0 +1 @@
+../../tests/rfc822.plain.message \ No newline at end of file
diff --git a/tests/integration/mail/imap/stress_tests_imap.zsh b/tests/integration/mail/imap/stress_tests_imap.zsh
new file mode 100755
index 00000000..544facaa
--- /dev/null
+++ b/tests/integration/mail/imap/stress_tests_imap.zsh
@@ -0,0 +1,178 @@
+#!/bin/zsh
+# BATCH STRESS TEST FOR IMAP ----------------------
+# http://imgs.xkcd.com/comics/science.jpg
+#
+# Run imaptest against a LEAP IMAP server
+# for a fixed period of time, and collect output.
+#
+# Author: Kali Kaneko
+# Date: 2014 01 26
+#
+# To run, you need to have `imaptest` in your path.
+# See:
+# http://www.imapwiki.org/ImapTest/Installation
+#
+# For the tests, I'm using a 10MB file sample that
+# can be downloaded from:
+# http://www.dovecot.org/tmp/dovecot-crlf
+#
+# Want to contribute to benchmarking?
+#
+# 1. Create a pristine account in a bitmask provider.
+#
+# 2. Launch your bitmask client, with different flags
+# if you desire.
+#
+# For example to try the nosync flag in sqlite:
+#
+# LEAP_SQLITE_NOSYNC=1 bitmask --debug -N --offline -l /tmp/leap.log
+#
+# 3. Run at several points in time (ie: just after
+# launching the bitmask client. one minute after,
+# ten minutes after)
+#
+# mkdir data
+# cd data
+# ../leap_tests_imap.zsh | tee sqlite_nosync_run2.log
+#
+# 4. Submit your results to: kali at leap dot se
+# together with the logs of the bitmask run.
+#
+# Please provide also details about your system, and
+# the type of hard disk setup you are running against.
+#
+
+# ------------------------------------------------
+# Edit these variables if you are too lazy to pass
+# the user and mbox as parameters. Like me.
+
+USER="test_f14@dev.bitmask.net"
+MBOX="~/leap/imaptest/data/dovecot-crlf"
+
+HOST="localhost"
+PORT="1984"
+
+# in case you have it aliased
+GREP="/bin/grep"
+IMAPTEST="imaptest"
+
+# -----------------------------------------------
+#
+# These should be kept constant across benchmarking
+# runs across different machines, for comparability.
+
+DURATION=200
+NUM_MSG=200
+
+
+# TODO add another function, and a cli flag, to be able
+# to take several aggretates spaced in time, along a period
+# of several minutes.
+
+imaptest_cmd() {
+ stdbuf -o0 ${IMAPTEST} user=${USER} pass=1234 host=${HOST} \
+ port=${PORT} mbox=${MBOX} clients=1 msgs=${NUM_MSG} \
+ no_pipelining 2>/dev/null
+}
+
+stress_imap() {
+ mkfifo imap_pipe
+ cat imap_pipe | tee output &
+ imaptest_cmd >> imap_pipe
+}
+
+wait_and_kill() {
+ while :
+ do
+ sleep $DURATION
+ pkill -2 imaptest
+ rm imap_pipe
+ break
+ done
+}
+
+print_results() {
+ sleep 1
+ echo
+ echo
+ echo "AGGREGATED RESULTS"
+ echo "----------------------"
+ echo "\tavg\tstdev"
+ $GREP "avg" ./output | sed -e 's/^ *//g' -e 's/ *$//g' | \
+ gawk '
+function avg(data, count) {
+ sum=0;
+ for( x=0; x <= count-1; x++) {
+ sum += data[x];
+ }
+ return sum/count;
+}
+function std_dev(data, count) {
+ sum=0;
+ for( x=0; x <= count-1; x++) {
+ sum += data[x];
+ }
+ average = sum/count;
+
+ sumsq=0;
+ for( x=0; x <= count-1; x++) {
+ sumsq += (data[x] - average)^2;
+ }
+ return sqrt(sumsq/count);
+}
+BEGIN {
+ cnt = 0
+} END {
+
+printf("LOGI:\t%04.2lf\t%04.2f\n", avg(array[1], NR), std_dev(array[1], NR));
+printf("LIST:\t%04.2lf\t%04.2f\n", avg(array[2], NR), std_dev(array[2], NR));
+printf("STAT:\t%04.2lf\t%04.2f\n", avg(array[3], NR), std_dev(array[3], NR));
+printf("SELE:\t%04.2lf\t%04.2f\n", avg(array[4], NR), std_dev(array[4], NR));
+printf("FETC:\t%04.2lf\t%04.2f\n", avg(array[5], NR), std_dev(array[5], NR));
+printf("FET2:\t%04.2lf\t%04.2f\n", avg(array[6], NR), std_dev(array[6], NR));
+printf("STOR:\t%04.2lf\t%04.2f\n", avg(array[7], NR), std_dev(array[7], NR));
+printf("DELE:\t%04.2lf\t%04.2f\n", avg(array[8], NR), std_dev(array[8], NR));
+printf("EXPU:\t%04.2lf\t%04.2f\n", avg(array[9], NR), std_dev(array[9], NR));
+printf("APPE:\t%04.2lf\t%04.2f\n", avg(array[10], NR), std_dev(array[10], NR));
+printf("LOGO:\t%04.2lf\t%04.2f\n", avg(array[11], NR), std_dev(array[11], NR));
+
+print ""
+print "TOT samples", NR;
+}
+{
+ it = cnt++;
+ array[1][it] = $1;
+ array[2][it] = $2;
+ array[3][it] = $3;
+ array[4][it] = $4;
+ array[5][it] = $5;
+ array[6][it] = $6;
+ array[7][it] = $7;
+ array[8][it] = $8;
+ array[9][it] = $9;
+ array[10][it] = $10;
+ array[11][it] = $11;
+}'
+}
+
+
+{ test $1 = "--help" } && {
+ echo "Usage: $0 [user@provider] [/path/to/sample.mbox]"
+ exit 0
+}
+
+# If the first parameter is passed, take it as the user
+{ test $1 } && {
+ USER=$1
+}
+
+# If the second parameter is passed, take it as the mbox
+{ test $2 } && {
+ MBOX=$2
+}
+
+echo "[+] LEAP IMAP TESTS"
+echo "[+] Running imaptest for $DURATION seconds with $NUM_MSG messages"
+wait_and_kill &
+stress_imap
+print_results
diff --git a/tests/integration/mail/imap/test_imap.py b/tests/integration/mail/imap/test_imap.py
new file mode 100644
index 00000000..8d34a499
--- /dev/null
+++ b/tests/integration/mail/imap/test_imap.py
@@ -0,0 +1,1062 @@
+# -*- coding: utf-8 -*-
+# test_imap.py
+# Copyright (C) 2013 LEAP
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see <http://www.gnu.org/licenses/>.
+"""
+Test case for leap.email.imap.server
+TestCases taken from twisted tests and modified to make them work
+against our implementation of the IMAPAccount.
+
+@authors: Kali Kaneko, <kali@leap.se>
+XXX add authors from the original twisted tests.
+
+@license: GPLv3, see included LICENSE file
+"""
+# XXX review license of the original tests!!!
+import os
+import string
+import types
+
+
+from twisted.mail import imap4
+from twisted.internet import defer
+from twisted.python import util
+from twisted.python import failure
+
+from twisted import cred
+
+from leap.bitmask.mail.imap.mailbox import IMAPMailbox
+from leap.bitmask.mail.imap.messages import CaseInsensitiveDict
+from leap.bitmask.mail.testing.imap import IMAP4HelperMixin
+
+
+TEST_USER = "testuser@leap.se"
+TEST_PASSWD = "1234"
+
+HERE = os.path.split(os.path.abspath(__file__))[0]
+
+
+def strip(f):
+ return lambda result, f=f: f()
+
+
+def sortNest(l):
+ l = l[:]
+ l.sort()
+ for i in range(len(l)):
+ if isinstance(l[i], types.ListType):
+ l[i] = sortNest(l[i])
+ elif isinstance(l[i], types.TupleType):
+ l[i] = tuple(sortNest(list(l[i])))
+ return l
+
+
+class TestRealm:
+ """
+ A minimal auth realm for testing purposes only
+ """
+ theAccount = None
+
+ def requestAvatar(self, avatarId, mind, *interfaces):
+ return imap4.IAccount, self.theAccount, lambda: None
+
+#
+# TestCases
+#
+
+# DEBUG ---
+# from twisted.internet.base import DelayedCall
+# DelayedCall.debug = True
+
+
+class LEAPIMAP4ServerTestCase(IMAP4HelperMixin):
+
+ """
+ Tests for the generic behavior of the LEAPIMAP4Server
+ which, right now, it's just implemented in this test file as
+ LEAPIMAPServer. We will move the implementation, together with
+ authentication bits, to leap.bitmask.mail.imap.server so it can be
+ instantiated from the tac file.
+
+ Right now this TestCase tries to mimmick as close as possible the
+ organization from the twisted.mail.imap tests so we can achieve
+ a complete implementation. The order in which they appear reflect
+ the intended order of implementation.
+ """
+
+ #
+ # mailboxes operations
+ #
+
+ def testCreate(self):
+ """
+ Test whether we can create mailboxes
+ """
+ succeed = ('testbox', 'test/box', 'test/', 'test/box/box', 'foobox')
+ fail = ('testbox', 'test/box')
+ acc = self.server.theAccount
+
+ def cb():
+ self.result.append(1)
+
+ def eb(failure):
+ self.result.append(0)
+
+ def login():
+ return self.client.login(TEST_USER, TEST_PASSWD)
+
+ def create():
+ create_deferreds = []
+ for name in succeed + fail:
+ d = self.client.create(name)
+ d.addCallback(strip(cb)).addErrback(eb)
+ create_deferreds.append(d)
+ dd = defer.gatherResults(create_deferreds)
+ dd.addCallbacks(self._cbStopClient, self._ebGeneral)
+ return dd
+
+ self.result = []
+ d1 = self.connected.addCallback(strip(login))
+ d1.addCallback(strip(create))
+ d2 = self.loopback()
+ d = defer.gatherResults([d1, d2], consumeErrors=True)
+ d.addCallback(lambda _: acc.account.list_all_mailbox_names())
+ return d.addCallback(self._cbTestCreate, succeed, fail)
+
+ def _cbTestCreate(self, mailboxes, succeed, fail):
+ self.assertEqual(self.result, [1] * len(succeed) + [0] * len(fail))
+
+ answers = ([u'INBOX', u'testbox', u'test/box', u'test',
+ u'test/box/box', 'foobox'])
+ self.assertEqual(sorted(mailboxes), sorted([a for a in answers]))
+
+ def testDelete(self):
+ """
+ Test whether we can delete mailboxes
+ """
+ def add_mailbox():
+ return self.server.theAccount.addMailbox('test-delete/me')
+
+ def login():
+ return self.client.login(TEST_USER, TEST_PASSWD)
+
+ def delete():
+ return self.client.delete('test-delete/me')
+
+ acc = self.server.theAccount.account
+
+ d1 = self.connected.addCallback(add_mailbox)
+ d1.addCallback(strip(login))
+ d1.addCallbacks(strip(delete), self._ebGeneral)
+ d1.addCallbacks(self._cbStopClient, self._ebGeneral)
+ d2 = self.loopback()
+ d = defer.gatherResults([d1, d2])
+ d.addCallback(lambda _: acc.list_all_mailbox_names())
+ d.addCallback(lambda mboxes: self.assertEqual(
+ mboxes, ['INBOX']))
+ return d
+
+ def testIllegalInboxDelete(self):
+ """
+ Test what happens if we try to delete the user Inbox.
+ We expect that operation to fail.
+ """
+ self.stashed = None
+
+ def login():
+ return self.client.login(TEST_USER, TEST_PASSWD)
+
+ def delete():
+ return self.client.delete('inbox')
+
+ def stash(result):
+ self.stashed = result
+
+ d1 = self.connected.addCallback(strip(login))
+ d1.addCallbacks(strip(delete), self._ebGeneral)
+ d1.addBoth(stash)
+ d1.addCallbacks(self._cbStopClient, self._ebGeneral)
+ d2 = self.loopback()
+ d = defer.gatherResults([d1, d2])
+ d.addCallback(lambda _: self.failUnless(isinstance(self.stashed,
+ failure.Failure)))
+ return d
+
+ def testNonExistentDelete(self):
+ """
+ Test what happens if we try to delete a non-existent mailbox.
+ We expect an error raised stating 'No such mailbox'
+ """
+ def login():
+ return self.client.login(TEST_USER, TEST_PASSWD)
+
+ def delete():
+ return self.client.delete('delete/me')
+ self.failure = failure
+
+ def deleteFailed(failure):
+ self.failure = failure
+
+ self.failure = None
+ d1 = self.connected.addCallback(strip(login))
+ d1.addCallback(strip(delete)).addErrback(deleteFailed)
+ d1.addCallbacks(self._cbStopClient, self._ebGeneral)
+ d2 = self.loopback()
+ d = defer.gatherResults([d1, d2])
+ d.addCallback(lambda _: self.assertTrue(
+ str(self.failure.value).startswith('No such mailbox')))
+ return d
+
+ def testIllegalDelete(self):
+ """
+ Try deleting a mailbox with sub-folders, and \NoSelect flag set.
+ An exception is expected.
+ """
+ acc = self.server.theAccount
+
+ def login():
+ return self.client.login(TEST_USER, TEST_PASSWD)
+
+ def create_mailboxes():
+ d1 = acc.addMailbox('delete')
+ d2 = acc.addMailbox('delete/me')
+ d = defer.gatherResults([d1, d2])
+ return d
+
+ def get_noselect_mailbox(mboxes):
+ mbox = mboxes[0]
+ return mbox.setFlags((r'\Noselect',))
+
+ def delete_mbox(ignored):
+ return self.client.delete('delete')
+
+ def deleteFailed(failure):
+ self.failure = failure
+
+ self.failure = None
+
+ d1 = self.connected.addCallback(strip(login))
+ d1.addCallback(strip(create_mailboxes))
+ d1.addCallback(get_noselect_mailbox)
+
+ d1.addCallback(delete_mbox).addErrback(deleteFailed)
+ d1.addCallbacks(self._cbStopClient, self._ebGeneral)
+ d2 = self.loopback()
+ d = defer.gatherResults([d1, d2])
+ expected = ("Hierarchically inferior mailboxes exist "
+ "and \\Noselect is set")
+ d.addCallback(lambda _:
+ self.assertTrue(self.failure is not None))
+ d.addCallback(lambda _:
+ self.assertEqual(str(self.failure.value), expected))
+ return d
+
+ # FIXME --- this test sometimes FAILS (timing issue).
+ # Some of the deferreds used in the rename op is not waiting for the
+ # operations properly
+ def testRename(self):
+ """
+ Test whether we can rename a mailbox
+ """
+ def create_mbox():
+ return self.server.theAccount.addMailbox('oldmbox')
+
+ def login():
+ return self.client.login(TEST_USER, TEST_PASSWD)
+
+ def rename():
+ return self.client.rename('oldmbox', 'newname')
+
+ d1 = self.connected.addCallback(strip(create_mbox))
+ d1.addCallback(strip(login))
+ d1.addCallbacks(strip(rename), self._ebGeneral)
+ d1.addCallbacks(self._cbStopClient, self._ebGeneral)
+ d2 = self.loopback()
+ d = defer.gatherResults([d1, d2])
+ d.addCallback(lambda _:
+ self.server.theAccount.account.list_all_mailbox_names())
+ d.addCallback(lambda mboxes:
+ self.assertItemsEqual(mboxes, ['INBOX', 'newname']))
+ return d
+
+ def testIllegalInboxRename(self):
+ """
+ Try to rename inbox. We expect it to fail. Then it would be not
+ an inbox anymore, would it?
+ """
+ self.stashed = None
+
+ def login():
+ return self.client.login(TEST_USER, TEST_PASSWD)
+
+ def rename():
+ return self.client.rename('inbox', 'frotz')
+
+ def stash(stuff):
+ self.stashed = stuff
+
+ d1 = self.connected.addCallback(strip(login))
+ d1.addCallbacks(strip(rename), self._ebGeneral)
+ d1.addBoth(stash)
+ d1.addCallbacks(self._cbStopClient, self._ebGeneral)
+ d2 = self.loopback()
+ d = defer.gatherResults([d1, d2])
+ d.addCallback(lambda _:
+ self.failUnless(isinstance(
+ self.stashed, failure.Failure)))
+ return d
+
+ def testHierarchicalRename(self):
+ """
+ Try to rename hierarchical mailboxes
+ """
+ acc = self.server.theAccount
+
+ def add_mailboxes():
+ return defer.gatherResults([
+ acc.addMailbox('oldmbox/m1'),
+ acc.addMailbox('oldmbox/m2')])
+
+ def login():
+ return self.client.login(TEST_USER, TEST_PASSWD)
+
+ def rename():
+ return self.client.rename('oldmbox', 'newname')
+
+ d1 = self.connected.addCallback(strip(add_mailboxes))
+ d1.addCallback(strip(login))
+ d1.addCallbacks(strip(rename), self._ebGeneral)
+ d1.addCallbacks(self._cbStopClient, self._ebGeneral)
+ d2 = self.loopback()
+ d = defer.gatherResults([d1, d2])
+ d.addCallback(lambda _: acc.account.list_all_mailbox_names())
+ return d.addCallback(self._cbTestHierarchicalRename)
+
+ def _cbTestHierarchicalRename(self, mailboxes):
+ expected = ['INBOX', 'newname/m1', 'newname/m2']
+ self.assertEqual(sorted(mailboxes), sorted([s for s in expected]))
+
+ def testSubscribe(self):
+ """
+ Test whether we can mark a mailbox as subscribed to
+ """
+ acc = self.server.theAccount
+
+ def add_mailbox():
+ return acc.addMailbox('this/mbox')
+
+ def login():
+ return self.client.login(TEST_USER, TEST_PASSWD)
+
+ def subscribe():
+ return self.client.subscribe('this/mbox')
+
+ def get_subscriptions(ignored):
+ return self.server.theAccount.getSubscriptions()
+
+ d1 = self.connected.addCallback(strip(add_mailbox))
+ d1.addCallback(strip(login))
+ d1.addCallbacks(strip(subscribe), self._ebGeneral)
+ d1.addCallbacks(self._cbStopClient, self._ebGeneral)
+ d2 = self.loopback()
+ d = defer.gatherResults([d1, d2])
+ d.addCallback(get_subscriptions)
+ d.addCallback(lambda subscriptions:
+ self.assertEqual(subscriptions,
+ ['this/mbox']))
+ return d
+
+ def testUnsubscribe(self):
+ """
+ Test whether we can unsubscribe from a set of mailboxes
+ """
+ acc = self.server.theAccount
+
+ def add_mailboxes():
+ return defer.gatherResults([
+ acc.addMailbox('this/mbox'),
+ acc.addMailbox('that/mbox')])
+
+ def dc1():
+ return acc.subscribe('this/mbox')
+
+ def dc2():
+ return acc.subscribe('that/mbox')
+
+ def login():
+ return self.client.login(TEST_USER, TEST_PASSWD)
+
+ def unsubscribe():
+ return self.client.unsubscribe('this/mbox')
+
+ def get_subscriptions(ignored):
+ return acc.getSubscriptions()
+
+ d1 = self.connected.addCallback(strip(add_mailboxes))
+ d1.addCallback(strip(login))
+ d1.addCallback(strip(dc1))
+ d1.addCallback(strip(dc2))
+ d1.addCallbacks(strip(unsubscribe), self._ebGeneral)
+ d1.addCallbacks(self._cbStopClient, self._ebGeneral)
+ d2 = self.loopback()
+ d = defer.gatherResults([d1, d2])
+ d.addCallback(get_subscriptions)
+ d.addCallback(lambda subscriptions:
+ self.assertEqual(subscriptions,
+ ['that/mbox']))
+ return d
+
+ def testSelect(self):
+ """
+ Try to select a mailbox
+ """
+ mbox_name = "TESTMAILBOXSELECT"
+ self.selectedArgs = None
+
+ acc = self.server.theAccount
+
+ def add_mailbox():
+ return acc.addMailbox(mbox_name, creation_ts=42)
+
+ def login():
+ return self.client.login(TEST_USER, TEST_PASSWD)
+
+ def select():
+ def selected(args):
+ self.selectedArgs = args
+ self._cbStopClient(None)
+ d = self.client.select(mbox_name)
+ d.addCallback(selected)
+ return d
+
+ d1 = self.connected.addCallback(strip(add_mailbox))
+ d1.addCallback(strip(login))
+ d1.addCallback(strip(select))
+ # d1.addErrback(self._ebGeneral)
+
+ d2 = self.loopback()
+
+ d = defer.gatherResults([d1, d2])
+ d.addCallback(self._cbTestSelect)
+ return d
+
+ def _cbTestSelect(self, ignored):
+ self.assertTrue(self.selectedArgs is not None)
+
+ self.assertEqual(self.selectedArgs, {
+ 'EXISTS': 0, 'RECENT': 0, 'UIDVALIDITY': 42,
+ 'FLAGS': ('\\Seen', '\\Answered', '\\Flagged',
+ '\\Deleted', '\\Draft', '\\Recent', 'List'),
+ 'READ-WRITE': True
+ })
+
+ #
+ # capabilities
+ #
+
+ def testCapability(self):
+ caps = {}
+
+ def getCaps():
+ def gotCaps(c):
+ caps.update(c)
+ self.server.transport.loseConnection()
+ return self.client.getCapabilities().addCallback(gotCaps)
+
+ d1 = self.connected
+ d1.addCallback(
+ strip(getCaps)).addErrback(self._ebGeneral)
+
+ d = defer.gatherResults([self.loopback(), d1])
+ expected = {'IMAP4rev1': None, 'NAMESPACE': None, 'LITERAL+': None,
+ 'IDLE': None}
+ d.addCallback(lambda _: self.assertEqual(expected, caps))
+ return d
+
+ def testCapabilityWithAuth(self):
+ caps = {}
+ self.server.challengers[
+ 'CRAM-MD5'] = cred.credentials.CramMD5Credentials
+
+ def getCaps():
+ def gotCaps(c):
+ caps.update(c)
+ self.server.transport.loseConnection()
+ return self.client.getCapabilities().addCallback(gotCaps)
+ d1 = self.connected.addCallback(
+ strip(getCaps)).addErrback(self._ebGeneral)
+
+ d = defer.gatherResults([self.loopback(), d1])
+
+ expCap = {'IMAP4rev1': None, 'NAMESPACE': None,
+ 'IDLE': None, 'LITERAL+': None,
+ 'AUTH': ['CRAM-MD5']}
+
+ d.addCallback(lambda _: self.assertEqual(expCap, caps))
+ return d
+
+ #
+ # authentication
+ #
+
+ def testLogout(self):
+ """
+ Test log out
+ """
+ self.loggedOut = 0
+
+ def logout():
+ def setLoggedOut():
+ self.loggedOut = 1
+ self.client.logout().addCallback(strip(setLoggedOut))
+ self.connected.addCallback(strip(logout)).addErrback(self._ebGeneral)
+ d = self.loopback()
+ return d.addCallback(lambda _: self.assertEqual(self.loggedOut, 1))
+
+ def testNoop(self):
+ """
+ Test noop command
+ """
+ self.responses = None
+
+ def noop():
+ def setResponses(responses):
+ self.responses = responses
+ self.server.transport.loseConnection()
+ self.client.noop().addCallback(setResponses)
+ self.connected.addCallback(strip(noop)).addErrback(self._ebGeneral)
+ d = self.loopback()
+ return d.addCallback(lambda _: self.assertEqual(self.responses, []))
+
+ def testLogin(self):
+ """
+ Test login
+ """
+ def login():
+ d = self.client.login(TEST_USER, TEST_PASSWD)
+ d.addCallback(self._cbStopClient)
+ d1 = self.connected.addCallback(
+ strip(login)).addErrback(self._ebGeneral)
+ d = defer.gatherResults([d1, self.loopback()])
+ return d.addCallback(self._cbTestLogin)
+
+ def _cbTestLogin(self, ignored):
+ self.assertEqual(self.server.state, 'auth')
+
+ def testFailedLogin(self):
+ """
+ Test bad login
+ """
+ def login():
+ d = self.client.login("bad_user@leap.se", TEST_PASSWD)
+ d.addBoth(self._cbStopClient)
+
+ d1 = self.connected.addCallback(
+ strip(login)).addErrback(self._ebGeneral)
+ d2 = self.loopback()
+ d = defer.gatherResults([d1, d2])
+ return d.addCallback(self._cbTestFailedLogin)
+
+ def _cbTestFailedLogin(self, ignored):
+ self.assertEqual(self.server.state, 'unauth')
+ self.assertEqual(self.server.account, None)
+
+ def testLoginRequiringQuoting(self):
+ """
+ Test login requiring quoting
+ """
+ self.server.checker.userid = '{test}user@leap.se'
+ self.server.checker.password = '{test}password'
+
+ def login():
+ d = self.client.login('{test}user@leap.se', '{test}password')
+ d.addBoth(self._cbStopClient)
+
+ d1 = self.connected.addCallback(
+ strip(login)).addErrback(self._ebGeneral)
+ d = defer.gatherResults([self.loopback(), d1])
+ return d.addCallback(self._cbTestLoginRequiringQuoting)
+
+ def _cbTestLoginRequiringQuoting(self, ignored):
+ self.assertEqual(self.server.state, 'auth')
+
+ #
+ # Inspection
+ #
+
+ def testNamespace(self):
+ """
+ Test retrieving namespace
+ """
+ self.namespaceArgs = None
+
+ def login():
+ return self.client.login(TEST_USER, TEST_PASSWD)
+
+ def namespace():
+ def gotNamespace(args):
+ self.namespaceArgs = args
+ self._cbStopClient(None)
+ return self.client.namespace().addCallback(gotNamespace)
+
+ d1 = self.connected.addCallback(strip(login))
+ d1.addCallback(strip(namespace))
+ d1.addErrback(self._ebGeneral)
+ d2 = self.loopback()
+ d = defer.gatherResults([d1, d2])
+ d.addCallback(lambda _: self.assertEqual(self.namespaceArgs,
+ [[['', '/']], [], []]))
+ return d
+
+ def testExamine(self):
+ """
+ L{IMAP4Client.examine} issues an I{EXAMINE} command to the server and
+ returns a L{Deferred} which fires with a C{dict} with as many of the
+ following keys as the server includes in its response: C{'FLAGS'},
+ C{'EXISTS'}, C{'RECENT'}, C{'UNSEEN'}, C{'READ-WRITE'}, C{'READ-ONLY'},
+ C{'UIDVALIDITY'}, and C{'PERMANENTFLAGS'}.
+
+ Unfortunately the server doesn't generate all of these so it's hard to
+ test the client's handling of them here. See
+ L{IMAP4ClientExamineTests} below.
+
+ See U{RFC 3501<http://www.faqs.org/rfcs/rfc3501.html>}, section 6.3.2,
+ for details.
+ """
+ # TODO implement the IMAP4ClientExamineTests testcase.
+ mbox_name = "test_mailbox_e"
+ acc = self.server.theAccount
+ self.examinedArgs = None
+
+ def add_mailbox():
+ return acc.addMailbox(mbox_name, creation_ts=42)
+
+ def login():
+ return self.client.login(TEST_USER, TEST_PASSWD)
+
+ def examine():
+ def examined(args):
+ self.examinedArgs = args
+ self._cbStopClient(None)
+ d = self.client.examine(mbox_name)
+ d.addCallback(examined)
+ return d
+
+ d1 = self.connected.addCallback(strip(add_mailbox))
+ d1.addCallback(strip(login))
+ d1.addCallback(strip(examine))
+ d1.addErrback(self._ebGeneral)
+ d2 = self.loopback()
+ d = defer.gatherResults([d1, d2])
+ return d.addCallback(self._cbTestExamine)
+
+ def _cbTestExamine(self, ignored):
+ self.assertEqual(self.examinedArgs, {
+ 'EXISTS': 0, 'RECENT': 0, 'UIDVALIDITY': 42,
+ 'FLAGS': ('\\Seen', '\\Answered', '\\Flagged',
+ '\\Deleted', '\\Draft', '\\Recent', 'List'),
+ 'READ-WRITE': False})
+
+ def _listSetup(self, f, f2=None):
+
+ acc = self.server.theAccount
+
+ def dc1():
+ return acc.addMailbox('root_subthing', creation_ts=42)
+
+ def dc2():
+ return acc.addMailbox('root_another_thing', creation_ts=42)
+
+ def dc3():
+ return acc.addMailbox('non_root_subthing', creation_ts=42)
+
+ def login():
+ return self.client.login(TEST_USER, TEST_PASSWD)
+
+ def listed(answers):
+ self.listed = answers
+
+ self.listed = None
+ d1 = self.connected.addCallback(strip(login))
+ d1.addCallback(strip(dc1))
+ d1.addCallback(strip(dc2))
+ d1.addCallback(strip(dc3))
+
+ if f2 is not None:
+ d1.addCallback(f2)
+
+ d1.addCallbacks(strip(f), self._ebGeneral)
+ d1.addCallbacks(listed, self._ebGeneral)
+ d1.addCallbacks(self._cbStopClient, self._ebGeneral)
+ d2 = self.loopback()
+ return defer.gatherResults([d1, d2]).addCallback(lambda _: self.listed)
+
+ def testList(self):
+ """
+ Test List command
+ """
+ def list():
+ return self.client.list('root', '%')
+
+ d = self._listSetup(list)
+ d.addCallback(lambda listed: self.assertEqual(
+ sortNest(listed),
+ sortNest([
+ (IMAPMailbox.init_flags, "/", "root_subthing"),
+ (IMAPMailbox.init_flags, "/", "root_another_thing")
+ ])
+ ))
+ return d
+
+ def testLSub(self):
+ """
+ Test LSub command
+ """
+ acc = self.server.theAccount
+
+ def subs_mailbox():
+ # why not client.subscribe instead?
+ return acc.subscribe('root_subthing')
+
+ def lsub():
+ return self.client.lsub('root', '%')
+
+ d = self._listSetup(lsub, strip(subs_mailbox))
+ d.addCallback(self.assertEqual,
+ [(IMAPMailbox.init_flags, "/", "root_subthing")])
+ return d
+
+ def testStatus(self):
+ """
+ Test Status command
+ """
+ acc = self.server.theAccount
+
+ def add_mailbox():
+ return acc.addMailbox('root_subthings')
+
+ # XXX FIXME ---- should populate this a little bit,
+ # with unseen etc...
+
+ def login():
+ return self.client.login(TEST_USER, TEST_PASSWD)
+
+ def status():
+ return self.client.status(
+ 'root_subthings', 'MESSAGES', 'UIDNEXT', 'UNSEEN')
+
+ def statused(result):
+ self.statused = result
+
+ self.statused = None
+
+ d1 = self.connected.addCallback(strip(add_mailbox))
+ d1.addCallback(strip(login))
+ d1.addCallbacks(strip(status), self._ebGeneral)
+ d1.addCallbacks(statused, self._ebGeneral)
+ d1.addCallbacks(self._cbStopClient, self._ebGeneral)
+ d2 = self.loopback()
+ d = defer.gatherResults([d1, d2])
+ d.addCallback(lambda _: self.assertEqual(
+ self.statused,
+ {'MESSAGES': 0, 'UIDNEXT': '1', 'UNSEEN': 0}
+ ))
+ return d
+
+ def testFailedStatus(self):
+ """
+ Test failed status command with a non-existent mailbox
+ """
+ def login():
+ return self.client.login(TEST_USER, TEST_PASSWD)
+
+ def status():
+ return self.client.status(
+ 'root/nonexistent', 'MESSAGES', 'UIDNEXT', 'UNSEEN')
+
+ def statused(result):
+ self.statused = result
+
+ def failed(failure):
+ self.failure = failure
+
+ self.statused = self.failure = None
+ d1 = self.connected.addCallback(strip(login))
+ d1.addCallbacks(strip(status), self._ebGeneral)
+ d1.addCallbacks(statused, failed)
+ d1.addCallbacks(self._cbStopClient, self._ebGeneral)
+ d2 = self.loopback()
+ return defer.gatherResults([d1, d2]).addCallback(
+ self._cbTestFailedStatus)
+
+ def _cbTestFailedStatus(self, ignored):
+ self.assertEqual(
+ self.statused, None
+ )
+ self.assertEqual(
+ self.failure.value.args,
+ ('Could not open mailbox',)
+ )
+
+ #
+ # messages
+ #
+
+ def testFullAppend(self):
+ """
+ Test appending a full message to the mailbox
+ """
+ infile = os.path.join(HERE, '..', 'rfc822.message')
+ message = open(infile)
+ acc = self.server.theAccount
+ mailbox_name = "appendmbox/subthing"
+
+ def add_mailbox():
+ return acc.addMailbox(mailbox_name)
+
+ def login():
+ return self.client.login(TEST_USER, TEST_PASSWD)
+
+ def append():
+ return self.client.append(
+ mailbox_name, message,
+ ('\\SEEN', '\\DELETED'),
+ 'Tue, 17 Jun 2003 11:22:16 -0600 (MDT)',
+ )
+
+ d1 = self.connected.addCallback(strip(add_mailbox))
+ d1.addCallback(strip(login))
+ d1.addCallbacks(strip(append), self._ebGeneral)
+ d1.addCallbacks(self._cbStopClient, self._ebGeneral)
+ d2 = self.loopback()
+ d = defer.gatherResults([d1, d2])
+
+ d.addCallback(lambda _: acc.getMailbox(mailbox_name))
+ d.addCallback(lambda mb: mb.fetch(imap4.MessageSet(start=1), True))
+ return d.addCallback(self._cbTestFullAppend, infile)
+
+ def _cbTestFullAppend(self, fetched, infile):
+ fetched = list(fetched)
+ self.assertTrue(len(fetched) == 1)
+ self.assertTrue(len(fetched[0]) == 2)
+ uid, msg = fetched[0]
+ parsed = self.parser.parse(open(infile))
+ expected_body = parsed.get_payload()
+ expected_headers = CaseInsensitiveDict(parsed.items())
+
+ def assert_flags(flags):
+ self.assertEqual(
+ set(('\\SEEN', '\\DELETED')),
+ set(flags))
+
+ def assert_date(date):
+ self.assertEqual(
+ 'Tue, 17 Jun 2003 11:22:16 -0600 (MDT)',
+ date)
+
+ def assert_body(body):
+ gotbody = body.read()
+ self.assertEqual(expected_body, gotbody)
+
+ def assert_headers(headers):
+ self.assertItemsEqual(map(string.lower, expected_headers), headers)
+
+ d = defer.maybeDeferred(msg.getFlags)
+ d.addCallback(assert_flags)
+
+ d.addCallback(lambda _: defer.maybeDeferred(msg.getInternalDate))
+ d.addCallback(assert_date)
+
+ d.addCallback(
+ lambda _: defer.maybeDeferred(
+ msg.getBodyFile, self._soledad))
+ d.addCallback(assert_body)
+
+ d.addCallback(lambda _: defer.maybeDeferred(msg.getHeaders, True))
+ d.addCallback(assert_headers)
+
+ return d
+
+ def testPartialAppend(self):
+ """
+ Test partially appending a message to the mailbox
+ """
+ # TODO this test sometimes will fail because of the notify_just_mdoc
+ infile = os.path.join(HERE, '..', 'rfc822.message')
+
+ acc = self.server.theAccount
+
+ def add_mailbox():
+ return acc.addMailbox('PARTIAL/SUBTHING')
+
+ def login():
+ return self.client.login(TEST_USER, TEST_PASSWD)
+
+ def append():
+ message = file(infile)
+ return self.client.sendCommand(
+ imap4.Command(
+ 'APPEND',
+ 'PARTIAL/SUBTHING (\\SEEN) "Right now" '
+ '{%d}' % os.path.getsize(infile),
+ (), self.client._IMAP4Client__cbContinueAppend, message
+ )
+ )
+ d1 = self.connected.addCallback(strip(add_mailbox))
+ d1.addCallback(strip(login))
+ d1.addCallbacks(strip(append), self._ebGeneral)
+ d1.addCallbacks(self._cbStopClient, self._ebGeneral)
+ d2 = self.loopback()
+ d = defer.gatherResults([d1, d2])
+
+ d.addCallback(lambda _: acc.getMailbox("PARTIAL/SUBTHING"))
+ d.addCallback(lambda mb: mb.fetch(imap4.MessageSet(start=1), True))
+ return d.addCallback(
+ self._cbTestPartialAppend, infile)
+
+ def _cbTestPartialAppend(self, fetched, infile):
+ fetched = list(fetched)
+ self.assertTrue(len(fetched) == 1)
+ self.assertTrue(len(fetched[0]) == 2)
+ uid, msg = fetched[0]
+ parsed = self.parser.parse(open(infile))
+ expected_body = parsed.get_payload()
+
+ def assert_flags(flags):
+ self.assertEqual(
+ set((['\\SEEN'])), set(flags))
+
+ def assert_body(body):
+ gotbody = body.read()
+ self.assertEqual(expected_body, gotbody)
+
+ d = defer.maybeDeferred(msg.getFlags)
+ d.addCallback(assert_flags)
+
+ d.addCallback(lambda _: defer.maybeDeferred(msg.getBodyFile))
+ d.addCallback(assert_body)
+ return d
+
+ def testCheck(self):
+ """
+ Test check command
+ """
+ def add_mailbox():
+ return self.server.theAccount.addMailbox('root/subthing')
+
+ def login():
+ return self.client.login(TEST_USER, TEST_PASSWD)
+
+ def select():
+ return self.client.select('root/subthing')
+
+ def check():
+ return self.client.check()
+
+ d = self.connected.addCallbacks(
+ strip(add_mailbox), self._ebGeneral)
+ d.addCallbacks(lambda _: login(), self._ebGeneral)
+ d.addCallbacks(strip(select), self._ebGeneral)
+ d.addCallbacks(strip(check), self._ebGeneral)
+ d.addCallbacks(self._cbStopClient, self._ebGeneral)
+ d2 = self.loopback()
+ return defer.gatherResults([d, d2])
+
+ # Okay, that was much fun indeed
+
+ def testExpunge(self):
+ """
+ Test expunge command
+ """
+ acc = self.server.theAccount
+ mailbox_name = 'mailboxexpunge'
+
+ def add_mailbox():
+ return acc.addMailbox(mailbox_name)
+
+ def login():
+ return self.client.login(TEST_USER, TEST_PASSWD)
+
+ def select():
+ return self.client.select(mailbox_name)
+
+ def save_mailbox(mailbox):
+ self.mailbox = mailbox
+
+ def get_mailbox():
+ d = acc.getMailbox(mailbox_name)
+ d.addCallback(save_mailbox)
+ return d
+
+ def add_messages():
+ d = self.mailbox.addMessage(
+ 'test 1', flags=('\\Deleted', 'AnotherFlag'),
+ notify_just_mdoc=False)
+ d.addCallback(lambda _: self.mailbox.addMessage(
+ 'test 2', flags=('AnotherFlag',),
+ notify_just_mdoc=False))
+ d.addCallback(lambda _: self.mailbox.addMessage(
+ 'test 3', flags=('\\Deleted',),
+ notify_just_mdoc=False))
+ return d
+
+ def expunge():
+ return self.client.expunge()
+
+ def expunged(results):
+ self.failIf(self.server.mbox is None)
+ self.results = results
+
+ self.results = None
+ d1 = self.connected.addCallback(strip(add_mailbox))
+ d1.addCallback(strip(login))
+ d1.addCallback(strip(get_mailbox))
+ d1.addCallbacks(strip(add_messages), self._ebGeneral)
+ d1.addCallbacks(strip(select), self._ebGeneral)
+ d1.addCallbacks(strip(expunge), self._ebGeneral)
+ d1.addCallbacks(expunged, self._ebGeneral)
+ d1.addCallbacks(self._cbStopClient, self._ebGeneral)
+ d2 = self.loopback()
+ d = defer.gatherResults([d1, d2])
+ d.addCallback(lambda _: self.mailbox.getMessageCount())
+ return d.addCallback(self._cbTestExpunge)
+
+ def _cbTestExpunge(self, count):
+ # we only left 1 mssage with no deleted flag
+ self.assertEqual(count, 1)
+ # the uids of the deleted messages
+ self.assertItemsEqual(self.results, [1, 3])
+
+
+class AccountTestCase(IMAP4HelperMixin):
+ """
+ Test the Account.
+ """
+ def _create_empty_mailbox(self):
+ return self.server.theAccount.addMailbox('')
+
+ def _create_one_mailbox(self):
+ return self.server.theAccount.addMailbox('one')
+
+ def test_illegalMailboxCreate(self):
+ self.assertRaises(AssertionError, self._create_empty_mailbox)
+
+
+class IMAP4ServerSearchTestCase(IMAP4HelperMixin):
+ """
+ Tests for the behavior of the search_* functions in L{imap5.IMAP4Server}.
+ """
+ # XXX coming soon to your screens!
+ pass
diff --git a/tests/integration/mail/imap/walktree.py b/tests/integration/mail/imap/walktree.py
new file mode 100644
index 00000000..5a4ed7e3
--- /dev/null
+++ b/tests/integration/mail/imap/walktree.py
@@ -0,0 +1,127 @@
+# -*- coding: utf-8 -*-
+# walktree.py
+# Copyright (C) 2013 LEAP
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see <http://www.gnu.org/licenses/>.
+"""
+Tests for the walktree module.
+"""
+import os
+import sys
+import pprint
+from email import parser
+
+from leap.bitmask.mail import walk as W
+
+DEBUG = os.environ.get("BITMASK_MAIL_DEBUG")
+
+
+p = parser.Parser()
+
+# TODO pass an argument of the type of message
+
+##################################################
+# Input from hell
+
+if len(sys.argv) > 1:
+ FILENAME = sys.argv[1]
+else:
+ FILENAME = "rfc822.multi-signed.message"
+
+"""
+FILENAME = "rfc822.plain.message"
+FILENAME = "rfc822.multi-minimal.message"
+"""
+
+msg = p.parse(open(FILENAME))
+DO_CHECK = False
+#################################################
+
+parts = W.get_parts(msg)
+
+if DEBUG:
+ def trim(item):
+ item = item[:10]
+ [trim(part["phash"]) for part in parts if part.get('phash', None)]
+
+raw_docs = list(W.get_raw_docs(msg, parts))
+
+body_phash_fun = [W.get_body_phash_simple,
+ W.get_body_phash_multi][int(msg.is_multipart())]
+body_phash = body_phash_fun(W.get_payloads(msg))
+parts_map = W.walk_msg_tree(parts, body_phash=body_phash)
+
+
+# TODO add missing headers!
+expected = {
+ 'body': '1ddfa80485',
+ 'multi': True,
+ 'part_map': {
+ 1: {
+ 'headers': {'Content-Disposition': 'inline',
+ 'Content-Type': 'multipart/mixed; '
+ 'boundary="z0eOaCaDLjvTGF2l"'},
+ 'multi': True,
+ 'part_map': {1: {'ctype': 'text/plain',
+ 'headers': [
+ ('Content-Type',
+ 'text/plain; charset=utf-8'),
+ ('Content-Disposition',
+ 'inline'),
+ ('Content-Transfer-Encoding',
+ 'quoted-printable')],
+ 'multi': False,
+ 'parts': 1,
+ 'phash': '1ddfa80485',
+ 'size': 206},
+ 2: {'ctype': 'text/plain',
+ 'headers': [('Content-Type',
+ 'text/plain; charset=us-ascii'),
+ ('Content-Disposition',
+ 'attachment; '
+ 'filename="attach.txt"')],
+ 'multi': False,
+ 'parts': 1,
+ 'phash': '7a94e4d769',
+ 'size': 133},
+ 3: {'ctype': 'application/octet-stream',
+ 'headers': [('Content-Type',
+ 'application/octet-stream'),
+ ('Content-Disposition',
+ 'attachment; filename="hack.ico"'),
+ ('Content-Transfer-Encoding',
+ 'base64')],
+ 'multi': False,
+ 'parts': 1,
+ 'phash': 'c42cccebbd',
+ 'size': 12736}}},
+ 2: {'ctype': 'application/pgp-signature',
+ 'headers': [('Content-Type', 'application/pgp-signature')],
+ 'multi': False,
+ 'parts': 1,
+ 'phash': '8f49fbf749',
+ 'size': 877}}}
+
+if DEBUG and DO_CHECK:
+ # TODO turn this into a proper unittest
+ assert(parts_map == expected)
+ print "Structure: OK"
+
+
+print
+print "RAW DOCS"
+pprint.pprint(raw_docs)
+print
+print "PARTS MAP"
+pprint.pprint(parts_map)