diff options
Diffstat (limited to 'src/leap/bitmask/mail/imap')
-rw-r--r-- | src/leap/bitmask/mail/imap/tests/.gitignore | 1 | ||||
-rwxr-xr-x | src/leap/bitmask/mail/imap/tests/getmail | 344 | ||||
-rwxr-xr-x | src/leap/bitmask/mail/imap/tests/imapclient.py | 207 | ||||
-rwxr-xr-x | src/leap/bitmask/mail/imap/tests/regressions_mime_struct | 461 | ||||
l--------- | src/leap/bitmask/mail/imap/tests/rfc822.message | 1 | ||||
l--------- | src/leap/bitmask/mail/imap/tests/rfc822.multi-minimal.message | 1 | ||||
l--------- | src/leap/bitmask/mail/imap/tests/rfc822.multi-nested.message | 1 | ||||
l--------- | src/leap/bitmask/mail/imap/tests/rfc822.multi-signed.message | 1 | ||||
l--------- | src/leap/bitmask/mail/imap/tests/rfc822.multi.message | 1 | ||||
l--------- | src/leap/bitmask/mail/imap/tests/rfc822.plain.message | 1 | ||||
-rwxr-xr-x | src/leap/bitmask/mail/imap/tests/stress_tests_imap.zsh | 178 | ||||
-rw-r--r-- | src/leap/bitmask/mail/imap/tests/test_imap.py | 1060 | ||||
-rw-r--r-- | src/leap/bitmask/mail/imap/tests/walktree.py | 127 |
13 files changed, 0 insertions, 2384 deletions
diff --git a/src/leap/bitmask/mail/imap/tests/.gitignore b/src/leap/bitmask/mail/imap/tests/.gitignore deleted file mode 100644 index 60baa9c..0000000 --- a/src/leap/bitmask/mail/imap/tests/.gitignore +++ /dev/null @@ -1 +0,0 @@ -data/* diff --git a/src/leap/bitmask/mail/imap/tests/getmail b/src/leap/bitmask/mail/imap/tests/getmail deleted file mode 100755 index dd3fa0b..0000000 --- a/src/leap/bitmask/mail/imap/tests/getmail +++ /dev/null @@ -1,344 +0,0 @@ -#!/usr/bin/env python - -# Copyright (c) Twisted Matrix Laboratories. -# See LICENSE in twisted for details. - -# Modifications by LEAP Developers 2014 to fit -# Bitmask configuration settings. -""" -Simple IMAP4 client which displays the subjects of all messages in a -particular mailbox. -""" - -import os -import sys - -from twisted.internet import protocol -from twisted.internet import ssl -from twisted.internet import defer -from twisted.internet import stdio -from twisted.mail import imap4 -from twisted.protocols import basic -from twisted.python import log - -# Global options stored here from main -_opts = {} - - -class TrivialPrompter(basic.LineReceiver): - from os import linesep as delimiter - - promptDeferred = None - - def prompt(self, msg): - assert self.promptDeferred is None - self.display(msg) - self.promptDeferred = defer.Deferred() - return self.promptDeferred - - def display(self, msg): - self.transport.write(msg) - - def lineReceived(self, line): - if self.promptDeferred is None: - return - d, self.promptDeferred = self.promptDeferred, None - d.callback(line) - - -class SimpleIMAP4Client(imap4.IMAP4Client): - """ - A client with callbacks for greeting messages from an IMAP server. - """ - greetDeferred = None - - def serverGreeting(self, caps): - self.serverCapabilities = caps - if self.greetDeferred is not None: - d, self.greetDeferred = self.greetDeferred, None - d.callback(self) - - -class SimpleIMAP4ClientFactory(protocol.ClientFactory): - usedUp = False - - protocol = SimpleIMAP4Client - - def __init__(self, username, onConn): - self.ctx = ssl.ClientContextFactory() - - self.username = username - self.onConn = onConn - - def buildProtocol(self, addr): - """ - Initiate the protocol instance. Since we are building a simple IMAP - client, we don't bother checking what capabilities the server has. We - just add all the authenticators twisted.mail has. - """ - assert not self.usedUp - self.usedUp = True - - p = self.protocol(self.ctx) - p.factory = self - p.greetDeferred = self.onConn - - p.registerAuthenticator(imap4.PLAINAuthenticator(self.username)) - p.registerAuthenticator(imap4.LOGINAuthenticator(self.username)) - p.registerAuthenticator( - imap4.CramMD5ClientAuthenticator(self.username)) - - return p - - def clientConnectionFailed(self, connector, reason): - d, self.onConn = self.onConn, None - d.errback(reason) - - -def cbServerGreeting(proto, username, password): - """ - Initial callback - invoked after the server sends us its greet message. - """ - # Hook up stdio - tp = TrivialPrompter() - stdio.StandardIO(tp) - - # And make it easily accessible - proto.prompt = tp.prompt - proto.display = tp.display - - # Try to authenticate securely - return proto.authenticate( - password).addCallback( - cbAuthentication, - proto).addErrback( - ebAuthentication, proto, username, password - ) - - -def ebConnection(reason): - """ - Fallback error-handler. If anything goes wrong, log it and quit. - """ - log.startLogging(sys.stdout) - log.err(reason) - return reason - - -def cbAuthentication(result, proto): - """ - Callback after authentication has succeeded. - - Lists a bunch of mailboxes. - """ - return proto.list("", "*" - ).addCallback(cbMailboxList, proto - ) - - -def ebAuthentication(failure, proto, username, password): - """ - Errback invoked when authentication fails. - - If it failed because no SASL mechanisms match, offer the user the choice - of logging in insecurely. - - If you are trying to connect to your Gmail account, you will be here! - """ - failure.trap(imap4.NoSupportedAuthentication) - return InsecureLogin(proto, username, password) - - -def InsecureLogin(proto, username, password): - """ - insecure-login. - """ - return proto.login(username, password - ).addCallback(cbAuthentication, proto - ) - - -def cbMailboxList(result, proto): - """ - Callback invoked when a list of mailboxes has been retrieved. - If we have a selected mailbox in the global options, we directly pick it. - Otherwise, we offer a prompt to let user choose one. - """ - all_mbox_list = [e[2] for e in result] - s = '\n'.join(['%d. %s' % (n + 1, m) for (n, m) in zip(range(len(all_mbox_list)), all_mbox_list)]) - if not s: - return defer.fail(Exception("No mailboxes exist on server!")) - - selected_mailbox = _opts.get('mailbox') - - if not selected_mailbox: - return proto.prompt(s + "\nWhich mailbox? [1] " - ).addCallback(cbPickMailbox, proto, all_mbox_list - ) - else: - mboxes_lower = map(lambda s: s.lower(), all_mbox_list) - index = mboxes_lower.index(selected_mailbox.lower()) + 1 - return cbPickMailbox(index, proto, all_mbox_list) - - -def cbPickMailbox(result, proto, mboxes): - """ - When the user selects a mailbox, "examine" it. - """ - mbox = mboxes[int(result or '1') - 1] - return proto.examine(mbox - ).addCallback(cbExamineMbox, proto - ) - - -def cbExamineMbox(result, proto): - """ - Callback invoked when examine command completes. - - Retrieve the subject header of every message in the mailbox. - """ - return proto.fetchSpecific('1:*', - headerType='HEADER.FIELDS', - headerArgs=['SUBJECT'], - ).addCallback(cbFetch, proto, - ) - - -def cbFetch(result, proto): - """ - Display a listing of the messages in the mailbox, based on the collected - headers. - """ - selected_subject = _opts.get('subject', None) - index = None - - if result: - keys = result.keys() - keys.sort() - - if selected_subject: - for k in keys: - # remove 'Subject: ' preffix plus eol - subject = result[k][0][2][9:].rstrip('\r\n') - if subject.lower() == selected_subject.lower(): - index = k - break - else: - for k in keys: - proto.display('%s %s' % (k, result[k][0][2])) - else: - print "Hey, an empty mailbox!" - - if not index: - return proto.prompt("\nWhich message? [1] (Q quits) " - ).addCallback(cbPickMessage, proto) - else: - return cbPickMessage(index, proto) - - -def cbPickMessage(result, proto): - """ - Pick a message. - """ - if result == "Q": - print "Bye!" - return proto.logout() - - return proto.fetchSpecific( - '%s' % result, - headerType='', - headerArgs=['BODY.PEEK[]'], - ).addCallback(cbShowmessage, proto) - - -def cbShowmessage(result, proto): - """ - Display message. - """ - if result: - keys = result.keys() - keys.sort() - for k in keys: - proto.display('%s %s' % (k, result[k][0][2])) - else: - print "Hey, an empty message!" - - return proto.logout() - - -def cbClose(result): - """ - Close the connection when we finish everything. - """ - from twisted.internet import reactor - reactor.stop() - - -def main(): - import argparse - import ConfigParser - import sys - from twisted.internet import reactor - - description = ( - 'Get messages from a LEAP IMAP Proxy.\nThis is a ' - 'debugging tool, do not use this to retrieve any sensitive ' - 'information, or we will send ninjas to your house!') - epilog = ( - 'In case you want to automate the usage of this utility ' - 'you can place your credentials in a file pointed by ' - 'BITMASK_CREDENTIALS. You need to have a [Credentials] ' - 'section, with username=<user@provider> and password fields') - - parser = argparse.ArgumentParser(description=description, epilog=epilog) - credentials = os.environ.get('BITMASK_CREDENTIALS') - - if credentials: - try: - config = ConfigParser.ConfigParser() - config.read(credentials) - username = config.get('Credentials', 'username') - password = config.get('Credentials', 'password') - except Exception, e: - print "Error reading credentials file: {0}".format(e) - sys.exit() - else: - parser.add_argument('username', type=str) - parser.add_argument('password', type=str) - - parser.add_argument('--mailbox', dest='mailbox', default=None, - help='Which mailbox to retrieve. Empty for interactive prompt.') - parser.add_argument('--subject', dest='subject', default=None, - help='A subject for retrieve a mail that matches. Empty for interactive prompt.') - - ns = parser.parse_args() - - if not credentials: - username = ns.username - password = ns.password - - _opts['mailbox'] = ns.mailbox - _opts['subject'] = ns.subject - - hostname = "localhost" - port = "1984" - - onConn = defer.Deferred( - ).addCallback(cbServerGreeting, username, password - ).addErrback(ebConnection - ).addBoth(cbClose) - - factory = SimpleIMAP4ClientFactory(username, onConn) - - if port == '993': - reactor.connectSSL( - hostname, int(port), factory, ssl.ClientContextFactory()) - else: - if not port: - port = 143 - reactor.connectTCP(hostname, int(port), factory) - reactor.run() - - -if __name__ == '__main__': - main() diff --git a/src/leap/bitmask/mail/imap/tests/imapclient.py b/src/leap/bitmask/mail/imap/tests/imapclient.py deleted file mode 100755 index c353cee..0000000 --- a/src/leap/bitmask/mail/imap/tests/imapclient.py +++ /dev/null @@ -1,207 +0,0 @@ -#!/usr/bin/env python - -# Copyright (c) Twisted Matrix Laboratories. -# See LICENSE for details. - - -""" -Simple IMAP4 client which connects to our custome -IMAP4 server: imapserver.py. -""" - -import sys - -from twisted.internet import protocol -from twisted.internet import defer -from twisted.internet import stdio -from twisted.mail import imap4 -from twisted.protocols import basic -from twisted.python import util -from twisted.python import log - - -class TrivialPrompter(basic.LineReceiver): - # from os import linesep as delimiter - - promptDeferred = None - - def prompt(self, msg): - assert self.promptDeferred is None - self.display(msg) - self.promptDeferred = defer.Deferred() - return self.promptDeferred - - def display(self, msg): - self.transport.write(msg) - - def lineReceived(self, line): - if self.promptDeferred is None: - return - d, self.promptDeferred = self.promptDeferred, None - d.callback(line) - - -class SimpleIMAP4Client(imap4.IMAP4Client): - - """ - Add callbacks when the client receives greeting messages from - an IMAP server. - """ - greetDeferred = None - - def serverGreeting(self, caps): - self.serverCapabilities = caps - if self.greetDeferred is not None: - d, self.greetDeferred = self.greetDeferred, None - d.callback(self) - - -class SimpleIMAP4ClientFactory(protocol.ClientFactory): - usedUp = False - protocol = SimpleIMAP4Client - - def __init__(self, username, onConn): - self.username = username - self.onConn = onConn - - def buildProtocol(self, addr): - assert not self.usedUp - self.usedUp = True - - p = self.protocol() - p.factory = self - p.greetDeferred = self.onConn - - p.registerAuthenticator(imap4.PLAINAuthenticator(self.username)) - p.registerAuthenticator(imap4.LOGINAuthenticator(self.username)) - p.registerAuthenticator( - imap4.CramMD5ClientAuthenticator(self.username)) - - return p - - def clientConnectionFailed(self, connector, reason): - d, self.onConn = self.onConn, None - d.errback(reason) - - -def cbServerGreeting(proto, username, password): - """ - Initial callback - invoked after the server sends us its greet message. - """ - # Hook up stdio - tp = TrivialPrompter() - stdio.StandardIO(tp) - - # And make it easily accessible - proto.prompt = tp.prompt - proto.display = tp.display - - # Try to authenticate securely - return proto.authenticate( - password).addCallback( - cbAuthentication, proto).addErrback( - ebAuthentication, proto, username, password) - - -def ebConnection(reason): - """ - Fallback error-handler. If anything goes wrong, log it and quit. - """ - log.startLogging(sys.stdout) - log.err(reason) - return reason - - -def cbAuthentication(result, proto): - """ - Callback after authentication has succeeded. - List a bunch of mailboxes. - """ - return proto.list("", "*" - ).addCallback(cbMailboxList, proto - ) - - -def ebAuthentication(failure, proto, username, password): - """ - Errback invoked when authentication fails. - If it failed because no SASL mechanisms match, offer the user the choice - of logging in insecurely. - If you are trying to connect to your Gmail account, you will be here! - """ - failure.trap(imap4.NoSupportedAuthentication) - return proto.prompt( - "No secure authentication available. Login insecurely? (y/N) " - ).addCallback(cbInsecureLogin, proto, username, password - ) - - -def cbInsecureLogin(result, proto, username, password): - """ - Callback for "insecure-login" prompt. - """ - if result.lower() == "y": - # If they said yes, do it. - return proto.login(username, password - ).addCallback(cbAuthentication, proto - ) - return defer.fail(Exception("Login failed for security reasons.")) - - -def cbMailboxList(result, proto): - """ - Callback invoked when a list of mailboxes has been retrieved. - """ - result = [e[2] for e in result] - s = '\n'.join( - ['%d. %s' % (n + 1, m) for (n, m) in zip(range(len(result)), result)]) - if not s: - return defer.fail(Exception("No mailboxes exist on server!")) - return proto.prompt(s + "\nWhich mailbox? [1] " - ).addCallback(cbPickMailbox, proto, result - ) - - -def cbPickMailbox(result, proto, mboxes): - """ - When the user selects a mailbox, "examine" it. - """ - mbox = mboxes[int(result or '1') - 1] - return proto.status(mbox, 'MESSAGES', 'UNSEEN' - ).addCallback(cbMboxStatus, proto) - - -def cbMboxStatus(result, proto): - print "You have %s messages (%s unseen)!" % ( - result['MESSAGES'], result['UNSEEN']) - return proto.logout() - - -def cbClose(result): - """ - Close the connection when we finish everything. - """ - from twisted.internet import reactor - reactor.stop() - - -def main(): - hostname = raw_input('IMAP4 Server Hostname: ') - port = raw_input('IMAP4 Server Port (the default is 143): ') - username = raw_input('IMAP4 Username: ') - password = util.getPassword('IMAP4 Password: ') - - onConn = defer.Deferred( - ).addCallback(cbServerGreeting, username, password - ).addErrback(ebConnection - ).addBoth(cbClose) - - factory = SimpleIMAP4ClientFactory(username, onConn) - - from twisted.internet import reactor - conn = reactor.connectTCP(hostname, int(port), factory) - reactor.run() - - -if __name__ == '__main__': - main() diff --git a/src/leap/bitmask/mail/imap/tests/regressions_mime_struct b/src/leap/bitmask/mail/imap/tests/regressions_mime_struct deleted file mode 100755 index 0332664..0000000 --- a/src/leap/bitmask/mail/imap/tests/regressions_mime_struct +++ /dev/null @@ -1,461 +0,0 @@ -#!/usr/bin/env python - -# -*- coding: utf-8 -*- -# regression_mime_struct -# Copyright (C) 2014 LEAP -# Copyright (c) Twisted Matrix Laboratories. -# -# This program is free software: you can redistribute it and/or modify -# it under the terms of the GNU General Public License as published by -# the Free Software Foundation, either version 3 of the License, or -# (at your option) any later version. -# -# This program is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -# GNU General Public License for more details. -# -# You should have received a copy of the GNU General Public License -# along with this program. If not, see <http://www.gnu.org/licenses/>. -""" -Simple Regression Tests for checking MIME struct handling using IMAP4 client. - -Iterates trough all mails under a given folder and tries to APPEND them to -the server being tested. After FETCHING the pushed message, it compares -the received version with the one that was saved, and exits with an error -code if they do not match. -""" -import os -import StringIO -import sys - -from email.parser import Parser - -from twisted.internet import protocol -from twisted.internet import ssl -from twisted.internet import defer -from twisted.internet import stdio -from twisted.mail import imap4 -from twisted.protocols import basic -from twisted.python import log - - -REGRESSIONS_FOLDER = os.environ.get( - "REGRESSIONS_FOLDER", "regressions_test") -print "[+] Using regressions folder:", REGRESSIONS_FOLDER - -parser = Parser() - - -def get_msg_parts(raw): - """ - Return a representation of the parts of a message suitable for - comparison. - - :param raw: string for the message - :type raw: str - """ - m = parser.parsestr(raw) - return [dict(part.items()) - if part.is_multipart() - else part.get_payload() - for part in m.walk()] - - -def compare_msg_parts(a, b): - """ - Compare two sequences of parts of messages. - - :param a: part sequence for message a - :param b: part sequence for message b - - :return: True if both message sequences are equivalent. - :rtype: bool - """ - # XXX This could be smarter and show the differences in the - # different parts when/where they differ. - #import pprint; pprint.pprint(a[0]) - #import pprint; pprint.pprint(b[0]) - - def lowerkey(d): - return dict((k.lower(), v.replace('\r', '')) - for k, v in d.iteritems()) - - def eq(x, y): - # For dicts, we compare a variation with their keys - # in lowercase, and \r removed from their values - if all(map(lambda i: isinstance(i, dict), (x, y))): - x, y = map(lowerkey, (x, y)) - return x == y - - compare_vector = map(lambda tup: eq(tup[0], tup[1]), zip(a, b)) - all_match = all(compare_vector) - - if not all_match: - print "PARTS MISMATCH!" - print "vector: ", compare_vector - index = compare_vector.index(False) - from pprint import pprint - print "Expected:" - pprint(a[index]) - print ("***") - print "Found:" - pprint(b[index]) - print - - return all_match - - -def get_fd(string): - """ - Return a file descriptor with the passed string - as content. - """ - fd = StringIO.StringIO() - fd.write(string) - fd.seek(0) - return fd - - -class TrivialPrompter(basic.LineReceiver): - promptDeferred = None - - def prompt(self, msg): - assert self.promptDeferred is None - self.display(msg) - self.promptDeferred = defer.Deferred() - return self.promptDeferred - - def display(self, msg): - self.transport.write(msg) - - def lineReceived(self, line): - if self.promptDeferred is None: - return - d, self.promptDeferred = self.promptDeferred, None - d.callback(line) - - -class SimpleIMAP4Client(imap4.IMAP4Client): - """ - A client with callbacks for greeting messages from an IMAP server. - """ - greetDeferred = None - - def serverGreeting(self, caps): - self.serverCapabilities = caps - if self.greetDeferred is not None: - d, self.greetDeferred = self.greetDeferred, None - d.callback(self) - - -class SimpleIMAP4ClientFactory(protocol.ClientFactory): - usedUp = False - protocol = SimpleIMAP4Client - - def __init__(self, username, onConn): - self.ctx = ssl.ClientContextFactory() - - self.username = username - self.onConn = onConn - - def buildProtocol(self, addr): - """ - Initiate the protocol instance. Since we are building a simple IMAP - client, we don't bother checking what capabilities the server has. We - just add all the authenticators twisted.mail has. Note: Gmail no - longer uses any of the methods below, it's been using XOAUTH since - 2010. - """ - assert not self.usedUp - self.usedUp = True - - p = self.protocol(self.ctx) - p.factory = self - p.greetDeferred = self.onConn - - p.registerAuthenticator(imap4.PLAINAuthenticator(self.username)) - p.registerAuthenticator(imap4.LOGINAuthenticator(self.username)) - p.registerAuthenticator( - imap4.CramMD5ClientAuthenticator(self.username)) - - return p - - def clientConnectionFailed(self, connector, reason): - d, self.onConn = self.onConn, None - d.errback(reason) - - -def cbServerGreeting(proto, username, password): - """ - Initial callback - invoked after the server sends us its greet message. - """ - # Hook up stdio - tp = TrivialPrompter() - stdio.StandardIO(tp) - - # And make it easily accessible - proto.prompt = tp.prompt - proto.display = tp.display - - # Try to authenticate securely - return proto.authenticate( - password).addCallback( - cbAuthentication, - proto).addErrback( - ebAuthentication, proto, username, password - ) - - -def ebConnection(reason): - """ - Fallback error-handler. If anything goes wrong, log it and quit. - """ - log.startLogging(sys.stdout) - log.err(reason) - return reason - - -def cbAuthentication(result, proto): - """ - Callback after authentication has succeeded. - - Lists a bunch of mailboxes. - """ - return proto.select( - REGRESSIONS_FOLDER - ).addCallback( - cbSelectMbox, proto - ).addErrback( - ebSelectMbox, proto, REGRESSIONS_FOLDER) - - -def ebAuthentication(failure, proto, username, password): - """ - Errback invoked when authentication fails. - - If it failed because no SASL mechanisms match, offer the user the choice - of logging in insecurely. - - If you are trying to connect to your Gmail account, you will be here! - """ - failure.trap(imap4.NoSupportedAuthentication) - return InsecureLogin(proto, username, password) - - -def InsecureLogin(proto, username, password): - """ - Raise insecure-login error. - """ - return proto.login( - username, password - ).addCallback( - cbAuthentication, proto) - - -def cbSelectMbox(result, proto): - """ - Callback invoked when select command finishes successfully. - - If any message is in the test folder, it will flag them as deleted and - expunge. - If no messages found, it will start with the APPEND tests. - """ - print "SELECT: %s EXISTS " % result.get("EXISTS", "??") - - if result["EXISTS"] != 0: - # Flag as deleted, expunge, and do an examine again. - print "There is mail here, will delete..." - return cbDeleteAndExpungeTestFolder(proto) - - else: - return cbAppendNextMessage(proto) - - -def ebSelectMbox(failure, proto, folder): - """ - Errback invoked when the examine command fails. - - Creates the folder. - """ - log.err(failure) - log.msg("Folder %r does not exist. Creating..." % (folder,)) - return proto.create(folder).addCallback(cbAuthentication, proto) - - -def ebExpunge(failure): - log.err(failure) - - -def cbDeleteAndExpungeTestFolder(proto): - """ - Callback invoked fom cbExamineMbox when the number of messages in the - mailbox is not zero. It flags all messages as deleted and expunge the - mailbox. - """ - return proto.setFlags( - "1:*", ("\\Deleted",) - ).addCallback( - lambda r: proto.expunge() - ).addCallback( - cbExpunge, proto - ).addErrback( - ebExpunge) - - -def cbExpunge(result, proto): - return proto.select( - REGRESSIONS_FOLDER - ).addCallback( - cbSelectMbox, proto - ).addErrback(ebSettingDeleted, proto) - - -def ebSettingDeleted(failure, proto): - """ - Report errors during deletion of messages in the mailbox. - """ - print failure.getTraceback() - - -def cbAppendNextMessage(proto): - """ - Appends the next message in the global queue to the test folder. - """ - # 1. Get the next test message from global tuple. - try: - next_sample = SAMPLES.pop() - except IndexError: - # we're done! - return proto.logout() - - print "\nAPPEND %s" % (next_sample,) - raw = open(next_sample).read() - msg = get_fd(raw) - return proto.append( - REGRESSIONS_FOLDER, msg - ).addCallback( - lambda r: proto.select(REGRESSIONS_FOLDER) - ).addCallback( - cbAppend, proto, raw - ).addErrback( - ebAppend, proto, raw) - - -def cbAppend(result, proto, orig_msg): - """ - Fetches the message right after an append. - """ - # XXX keep account of highest UID - uid = "1:*" - - return proto.fetchSpecific( - '%s' % uid, - headerType='', - headerArgs=['BODY.PEEK[]'], - ).addCallback( - cbCompareMessage, proto, orig_msg - ).addErrback(ebAppend, proto, orig_msg) - - -def ebAppend(failure, proto, raw): - """ - Errorback for the append operation - """ - print "ERROR WHILE APPENDING!" - print failure.getTraceback() - - -def cbPickMessage(result, proto): - """ - Pick a message. - """ - return proto.fetchSpecific( - '%s' % result, - headerType='', - headerArgs=['BODY.PEEK[]'], - ).addCallback(cbCompareMessage, proto) - - -def cbCompareMessage(result, proto, raw): - """ - Display message and compare it with the original one. - """ - parts_orig = get_msg_parts(raw) - - if result: - keys = result.keys() - keys.sort() - else: - print "[-] GOT NO RESULT" - return proto.logout() - - latest = max(keys) - - fetched_msg = result[latest][0][2] - parts_fetched = get_msg_parts(fetched_msg) - - equal = compare_msg_parts( - parts_orig, - parts_fetched) - - if equal: - print "[+] MESSAGES MATCH" - return cbAppendNextMessage(proto) - else: - print "[-] ERROR: MESSAGES DO NOT MATCH !!!" - print " ABORTING COMPARISON..." - # FIXME logout and print the subject ... - return proto.logout() - - -def cbClose(result): - """ - Close the connection when we finish everything. - """ - from twisted.internet import reactor - reactor.stop() - - -def main(): - import glob - import sys - - if len(sys.argv) != 4: - print "Usage: regressions <user> <pass> <samples-folder>" - sys.exit() - - hostname = "localhost" - port = "1984" - username = sys.argv[1] - password = sys.argv[2] - - samplesdir = sys.argv[3] - - if not os.path.isdir(samplesdir): - print ("Could not find samples folder! " - "Make sure of copying mail_breaker contents there.") - sys.exit() - - samples = glob.glob(samplesdir + '/*') - - global SAMPLES - SAMPLES = [] - SAMPLES += samples - - onConn = defer.Deferred( - ).addCallback( - cbServerGreeting, username, password - ).addErrback( - ebConnection - ).addBoth(cbClose) - - factory = SimpleIMAP4ClientFactory(username, onConn) - - from twisted.internet import reactor - reactor.connectTCP(hostname, int(port), factory) - reactor.run() - - -if __name__ == '__main__': - main() diff --git a/src/leap/bitmask/mail/imap/tests/rfc822.message b/src/leap/bitmask/mail/imap/tests/rfc822.message deleted file mode 120000 index b19cc28..0000000 --- a/src/leap/bitmask/mail/imap/tests/rfc822.message +++ /dev/null @@ -1 +0,0 @@ -../../tests/rfc822.message
\ No newline at end of file diff --git a/src/leap/bitmask/mail/imap/tests/rfc822.multi-minimal.message b/src/leap/bitmask/mail/imap/tests/rfc822.multi-minimal.message deleted file mode 120000 index e0aa678..0000000 --- a/src/leap/bitmask/mail/imap/tests/rfc822.multi-minimal.message +++ /dev/null @@ -1 +0,0 @@ -../../tests/rfc822.multi-minimal.message
\ No newline at end of file diff --git a/src/leap/bitmask/mail/imap/tests/rfc822.multi-nested.message b/src/leap/bitmask/mail/imap/tests/rfc822.multi-nested.message deleted file mode 120000 index 306d0de..0000000 --- a/src/leap/bitmask/mail/imap/tests/rfc822.multi-nested.message +++ /dev/null @@ -1 +0,0 @@ -../../tests/rfc822.multi-nested.message
\ No newline at end of file diff --git a/src/leap/bitmask/mail/imap/tests/rfc822.multi-signed.message b/src/leap/bitmask/mail/imap/tests/rfc822.multi-signed.message deleted file mode 120000 index 4172244..0000000 --- a/src/leap/bitmask/mail/imap/tests/rfc822.multi-signed.message +++ /dev/null @@ -1 +0,0 @@ -../../tests/rfc822.multi-signed.message
\ No newline at end of file diff --git a/src/leap/bitmask/mail/imap/tests/rfc822.multi.message b/src/leap/bitmask/mail/imap/tests/rfc822.multi.message deleted file mode 120000 index 62057d2..0000000 --- a/src/leap/bitmask/mail/imap/tests/rfc822.multi.message +++ /dev/null @@ -1 +0,0 @@ -../../tests/rfc822.multi.message
\ No newline at end of file diff --git a/src/leap/bitmask/mail/imap/tests/rfc822.plain.message b/src/leap/bitmask/mail/imap/tests/rfc822.plain.message deleted file mode 120000 index 5bab0e8..0000000 --- a/src/leap/bitmask/mail/imap/tests/rfc822.plain.message +++ /dev/null @@ -1 +0,0 @@ -../../tests/rfc822.plain.message
\ No newline at end of file diff --git a/src/leap/bitmask/mail/imap/tests/stress_tests_imap.zsh b/src/leap/bitmask/mail/imap/tests/stress_tests_imap.zsh deleted file mode 100755 index 544faca..0000000 --- a/src/leap/bitmask/mail/imap/tests/stress_tests_imap.zsh +++ /dev/null @@ -1,178 +0,0 @@ -#!/bin/zsh -# BATCH STRESS TEST FOR IMAP ---------------------- -# http://imgs.xkcd.com/comics/science.jpg -# -# Run imaptest against a LEAP IMAP server -# for a fixed period of time, and collect output. -# -# Author: Kali Kaneko -# Date: 2014 01 26 -# -# To run, you need to have `imaptest` in your path. -# See: -# http://www.imapwiki.org/ImapTest/Installation -# -# For the tests, I'm using a 10MB file sample that -# can be downloaded from: -# http://www.dovecot.org/tmp/dovecot-crlf -# -# Want to contribute to benchmarking? -# -# 1. Create a pristine account in a bitmask provider. -# -# 2. Launch your bitmask client, with different flags -# if you desire. -# -# For example to try the nosync flag in sqlite: -# -# LEAP_SQLITE_NOSYNC=1 bitmask --debug -N --offline -l /tmp/leap.log -# -# 3. Run at several points in time (ie: just after -# launching the bitmask client. one minute after, -# ten minutes after) -# -# mkdir data -# cd data -# ../leap_tests_imap.zsh | tee sqlite_nosync_run2.log -# -# 4. Submit your results to: kali at leap dot se -# together with the logs of the bitmask run. -# -# Please provide also details about your system, and -# the type of hard disk setup you are running against. -# - -# ------------------------------------------------ -# Edit these variables if you are too lazy to pass -# the user and mbox as parameters. Like me. - -USER="test_f14@dev.bitmask.net" -MBOX="~/leap/imaptest/data/dovecot-crlf" - -HOST="localhost" -PORT="1984" - -# in case you have it aliased -GREP="/bin/grep" -IMAPTEST="imaptest" - -# ----------------------------------------------- -# -# These should be kept constant across benchmarking -# runs across different machines, for comparability. - -DURATION=200 -NUM_MSG=200 - - -# TODO add another function, and a cli flag, to be able -# to take several aggretates spaced in time, along a period -# of several minutes. - -imaptest_cmd() { - stdbuf -o0 ${IMAPTEST} user=${USER} pass=1234 host=${HOST} \ - port=${PORT} mbox=${MBOX} clients=1 msgs=${NUM_MSG} \ - no_pipelining 2>/dev/null -} - -stress_imap() { - mkfifo imap_pipe - cat imap_pipe | tee output & - imaptest_cmd >> imap_pipe -} - -wait_and_kill() { - while : - do - sleep $DURATION - pkill -2 imaptest - rm imap_pipe - break - done -} - -print_results() { - sleep 1 - echo - echo - echo "AGGREGATED RESULTS" - echo "----------------------" - echo "\tavg\tstdev" - $GREP "avg" ./output | sed -e 's/^ *//g' -e 's/ *$//g' | \ - gawk ' -function avg(data, count) { - sum=0; - for( x=0; x <= count-1; x++) { - sum += data[x]; - } - return sum/count; -} -function std_dev(data, count) { - sum=0; - for( x=0; x <= count-1; x++) { - sum += data[x]; - } - average = sum/count; - - sumsq=0; - for( x=0; x <= count-1; x++) { - sumsq += (data[x] - average)^2; - } - return sqrt(sumsq/count); -} -BEGIN { - cnt = 0 -} END { - -printf("LOGI:\t%04.2lf\t%04.2f\n", avg(array[1], NR), std_dev(array[1], NR)); -printf("LIST:\t%04.2lf\t%04.2f\n", avg(array[2], NR), std_dev(array[2], NR)); -printf("STAT:\t%04.2lf\t%04.2f\n", avg(array[3], NR), std_dev(array[3], NR)); -printf("SELE:\t%04.2lf\t%04.2f\n", avg(array[4], NR), std_dev(array[4], NR)); -printf("FETC:\t%04.2lf\t%04.2f\n", avg(array[5], NR), std_dev(array[5], NR)); -printf("FET2:\t%04.2lf\t%04.2f\n", avg(array[6], NR), std_dev(array[6], NR)); -printf("STOR:\t%04.2lf\t%04.2f\n", avg(array[7], NR), std_dev(array[7], NR)); -printf("DELE:\t%04.2lf\t%04.2f\n", avg(array[8], NR), std_dev(array[8], NR)); -printf("EXPU:\t%04.2lf\t%04.2f\n", avg(array[9], NR), std_dev(array[9], NR)); -printf("APPE:\t%04.2lf\t%04.2f\n", avg(array[10], NR), std_dev(array[10], NR)); -printf("LOGO:\t%04.2lf\t%04.2f\n", avg(array[11], NR), std_dev(array[11], NR)); - -print "" -print "TOT samples", NR; -} -{ - it = cnt++; - array[1][it] = $1; - array[2][it] = $2; - array[3][it] = $3; - array[4][it] = $4; - array[5][it] = $5; - array[6][it] = $6; - array[7][it] = $7; - array[8][it] = $8; - array[9][it] = $9; - array[10][it] = $10; - array[11][it] = $11; -}' -} - - -{ test $1 = "--help" } && { - echo "Usage: $0 [user@provider] [/path/to/sample.mbox]" - exit 0 -} - -# If the first parameter is passed, take it as the user -{ test $1 } && { - USER=$1 -} - -# If the second parameter is passed, take it as the mbox -{ test $2 } && { - MBOX=$2 -} - -echo "[+] LEAP IMAP TESTS" -echo "[+] Running imaptest for $DURATION seconds with $NUM_MSG messages" -wait_and_kill & -stress_imap -print_results diff --git a/src/leap/bitmask/mail/imap/tests/test_imap.py b/src/leap/bitmask/mail/imap/tests/test_imap.py deleted file mode 100644 index 9cca17f..0000000 --- a/src/leap/bitmask/mail/imap/tests/test_imap.py +++ /dev/null @@ -1,1060 +0,0 @@ -# -*- coding: utf-8 -*- -# test_imap.py -# Copyright (C) 2013 LEAP -# -# This program is free software: you can redistribute it and/or modify -# it under the terms of the GNU General Public License as published by -# the Free Software Foundation, either version 3 of the License, or -# (at your option) any later version. -# -# This program is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -# GNU General Public License for more details. -# -# You should have received a copy of the GNU General Public License -# along with this program. If not, see <http://www.gnu.org/licenses/>. -""" -Test case for leap.email.imap.server -TestCases taken from twisted tests and modified to make them work -against our implementation of the IMAPAccount. - -@authors: Kali Kaneko, <kali@leap.se> -XXX add authors from the original twisted tests. - -@license: GPLv3, see included LICENSE file -""" -# XXX review license of the original tests!!! -import os -import string -import types - - -from twisted.mail import imap4 -from twisted.internet import defer -from twisted.python import util -from twisted.python import failure - -from twisted import cred - -from leap.mail.imap.mailbox import IMAPMailbox -from leap.mail.imap.messages import CaseInsensitiveDict -from leap.mail.testing.imap import IMAP4HelperMixin - - -TEST_USER = "testuser@leap.se" -TEST_PASSWD = "1234" - - -def strip(f): - return lambda result, f=f: f() - - -def sortNest(l): - l = l[:] - l.sort() - for i in range(len(l)): - if isinstance(l[i], types.ListType): - l[i] = sortNest(l[i]) - elif isinstance(l[i], types.TupleType): - l[i] = tuple(sortNest(list(l[i]))) - return l - - -class TestRealm: - """ - A minimal auth realm for testing purposes only - """ - theAccount = None - - def requestAvatar(self, avatarId, mind, *interfaces): - return imap4.IAccount, self.theAccount, lambda: None - -# -# TestCases -# - -# DEBUG --- -# from twisted.internet.base import DelayedCall -# DelayedCall.debug = True - - -class LEAPIMAP4ServerTestCase(IMAP4HelperMixin): - - """ - Tests for the generic behavior of the LEAPIMAP4Server - which, right now, it's just implemented in this test file as - LEAPIMAPServer. We will move the implementation, together with - authentication bits, to leap.mail.imap.server so it can be instantiated - from the tac file. - - Right now this TestCase tries to mimmick as close as possible the - organization from the twisted.mail.imap tests so we can achieve - a complete implementation. The order in which they appear reflect - the intended order of implementation. - """ - - # - # mailboxes operations - # - - def testCreate(self): - """ - Test whether we can create mailboxes - """ - succeed = ('testbox', 'test/box', 'test/', 'test/box/box', 'foobox') - fail = ('testbox', 'test/box') - acc = self.server.theAccount - - def cb(): - self.result.append(1) - - def eb(failure): - self.result.append(0) - - def login(): - return self.client.login(TEST_USER, TEST_PASSWD) - - def create(): - create_deferreds = [] - for name in succeed + fail: - d = self.client.create(name) - d.addCallback(strip(cb)).addErrback(eb) - create_deferreds.append(d) - dd = defer.gatherResults(create_deferreds) - dd.addCallbacks(self._cbStopClient, self._ebGeneral) - return dd - - self.result = [] - d1 = self.connected.addCallback(strip(login)) - d1.addCallback(strip(create)) - d2 = self.loopback() - d = defer.gatherResults([d1, d2], consumeErrors=True) - d.addCallback(lambda _: acc.account.list_all_mailbox_names()) - return d.addCallback(self._cbTestCreate, succeed, fail) - - def _cbTestCreate(self, mailboxes, succeed, fail): - self.assertEqual(self.result, [1] * len(succeed) + [0] * len(fail)) - - answers = ([u'INBOX', u'testbox', u'test/box', u'test', - u'test/box/box', 'foobox']) - self.assertEqual(sorted(mailboxes), sorted([a for a in answers])) - - def testDelete(self): - """ - Test whether we can delete mailboxes - """ - def add_mailbox(): - return self.server.theAccount.addMailbox('test-delete/me') - - def login(): - return self.client.login(TEST_USER, TEST_PASSWD) - - def delete(): - return self.client.delete('test-delete/me') - - acc = self.server.theAccount.account - - d1 = self.connected.addCallback(add_mailbox) - d1.addCallback(strip(login)) - d1.addCallbacks(strip(delete), self._ebGeneral) - d1.addCallbacks(self._cbStopClient, self._ebGeneral) - d2 = self.loopback() - d = defer.gatherResults([d1, d2]) - d.addCallback(lambda _: acc.list_all_mailbox_names()) - d.addCallback(lambda mboxes: self.assertEqual( - mboxes, ['INBOX'])) - return d - - def testIllegalInboxDelete(self): - """ - Test what happens if we try to delete the user Inbox. - We expect that operation to fail. - """ - self.stashed = None - - def login(): - return self.client.login(TEST_USER, TEST_PASSWD) - - def delete(): - return self.client.delete('inbox') - - def stash(result): - self.stashed = result - - d1 = self.connected.addCallback(strip(login)) - d1.addCallbacks(strip(delete), self._ebGeneral) - d1.addBoth(stash) - d1.addCallbacks(self._cbStopClient, self._ebGeneral) - d2 = self.loopback() - d = defer.gatherResults([d1, d2]) - d.addCallback(lambda _: self.failUnless(isinstance(self.stashed, - failure.Failure))) - return d - - def testNonExistentDelete(self): - """ - Test what happens if we try to delete a non-existent mailbox. - We expect an error raised stating 'No such mailbox' - """ - def login(): - return self.client.login(TEST_USER, TEST_PASSWD) - - def delete(): - return self.client.delete('delete/me') - self.failure = failure - - def deleteFailed(failure): - self.failure = failure - - self.failure = None - d1 = self.connected.addCallback(strip(login)) - d1.addCallback(strip(delete)).addErrback(deleteFailed) - d1.addCallbacks(self._cbStopClient, self._ebGeneral) - d2 = self.loopback() - d = defer.gatherResults([d1, d2]) - d.addCallback(lambda _: self.assertTrue( - str(self.failure.value).startswith('No such mailbox'))) - return d - - def testIllegalDelete(self): - """ - Try deleting a mailbox with sub-folders, and \NoSelect flag set. - An exception is expected. - """ - acc = self.server.theAccount - - def login(): - return self.client.login(TEST_USER, TEST_PASSWD) - - def create_mailboxes(): - d1 = acc.addMailbox('delete') - d2 = acc.addMailbox('delete/me') - d = defer.gatherResults([d1, d2]) - return d - - def get_noselect_mailbox(mboxes): - mbox = mboxes[0] - return mbox.setFlags((r'\Noselect',)) - - def delete_mbox(ignored): - return self.client.delete('delete') - - def deleteFailed(failure): - self.failure = failure - - self.failure = None - - d1 = self.connected.addCallback(strip(login)) - d1.addCallback(strip(create_mailboxes)) - d1.addCallback(get_noselect_mailbox) - - d1.addCallback(delete_mbox).addErrback(deleteFailed) - d1.addCallbacks(self._cbStopClient, self._ebGeneral) - d2 = self.loopback() - d = defer.gatherResults([d1, d2]) - expected = ("Hierarchically inferior mailboxes exist " - "and \\Noselect is set") - d.addCallback(lambda _: - self.assertTrue(self.failure is not None)) - d.addCallback(lambda _: - self.assertEqual(str(self.failure.value), expected)) - return d - - # FIXME --- this test sometimes FAILS (timing issue). - # Some of the deferreds used in the rename op is not waiting for the - # operations properly - def testRename(self): - """ - Test whether we can rename a mailbox - """ - def create_mbox(): - return self.server.theAccount.addMailbox('oldmbox') - - def login(): - return self.client.login(TEST_USER, TEST_PASSWD) - - def rename(): - return self.client.rename('oldmbox', 'newname') - - d1 = self.connected.addCallback(strip(create_mbox)) - d1.addCallback(strip(login)) - d1.addCallbacks(strip(rename), self._ebGeneral) - d1.addCallbacks(self._cbStopClient, self._ebGeneral) - d2 = self.loopback() - d = defer.gatherResults([d1, d2]) - d.addCallback(lambda _: - self.server.theAccount.account.list_all_mailbox_names()) - d.addCallback(lambda mboxes: - self.assertItemsEqual(mboxes, ['INBOX', 'newname'])) - return d - - def testIllegalInboxRename(self): - """ - Try to rename inbox. We expect it to fail. Then it would be not - an inbox anymore, would it? - """ - self.stashed = None - - def login(): - return self.client.login(TEST_USER, TEST_PASSWD) - - def rename(): - return self.client.rename('inbox', 'frotz') - - def stash(stuff): - self.stashed = stuff - - d1 = self.connected.addCallback(strip(login)) - d1.addCallbacks(strip(rename), self._ebGeneral) - d1.addBoth(stash) - d1.addCallbacks(self._cbStopClient, self._ebGeneral) - d2 = self.loopback() - d = defer.gatherResults([d1, d2]) - d.addCallback(lambda _: - self.failUnless(isinstance( - self.stashed, failure.Failure))) - return d - - def testHierarchicalRename(self): - """ - Try to rename hierarchical mailboxes - """ - acc = self.server.theAccount - - def add_mailboxes(): - return defer.gatherResults([ - acc.addMailbox('oldmbox/m1'), - acc.addMailbox('oldmbox/m2')]) - - def login(): - return self.client.login(TEST_USER, TEST_PASSWD) - - def rename(): - return self.client.rename('oldmbox', 'newname') - - d1 = self.connected.addCallback(strip(add_mailboxes)) - d1.addCallback(strip(login)) - d1.addCallbacks(strip(rename), self._ebGeneral) - d1.addCallbacks(self._cbStopClient, self._ebGeneral) - d2 = self.loopback() - d = defer.gatherResults([d1, d2]) - d.addCallback(lambda _: acc.account.list_all_mailbox_names()) - return d.addCallback(self._cbTestHierarchicalRename) - - def _cbTestHierarchicalRename(self, mailboxes): - expected = ['INBOX', 'newname/m1', 'newname/m2'] - self.assertEqual(sorted(mailboxes), sorted([s for s in expected])) - - def testSubscribe(self): - """ - Test whether we can mark a mailbox as subscribed to - """ - acc = self.server.theAccount - - def add_mailbox(): - return acc.addMailbox('this/mbox') - - def login(): - return self.client.login(TEST_USER, TEST_PASSWD) - - def subscribe(): - return self.client.subscribe('this/mbox') - - def get_subscriptions(ignored): - return self.server.theAccount.getSubscriptions() - - d1 = self.connected.addCallback(strip(add_mailbox)) - d1.addCallback(strip(login)) - d1.addCallbacks(strip(subscribe), self._ebGeneral) - d1.addCallbacks(self._cbStopClient, self._ebGeneral) - d2 = self.loopback() - d = defer.gatherResults([d1, d2]) - d.addCallback(get_subscriptions) - d.addCallback(lambda subscriptions: - self.assertEqual(subscriptions, - ['this/mbox'])) - return d - - def testUnsubscribe(self): - """ - Test whether we can unsubscribe from a set of mailboxes - """ - acc = self.server.theAccount - - def add_mailboxes(): - return defer.gatherResults([ - acc.addMailbox('this/mbox'), - acc.addMailbox('that/mbox')]) - - def dc1(): - return acc.subscribe('this/mbox') - - def dc2(): - return acc.subscribe('that/mbox') - - def login(): - return self.client.login(TEST_USER, TEST_PASSWD) - - def unsubscribe(): - return self.client.unsubscribe('this/mbox') - - def get_subscriptions(ignored): - return acc.getSubscriptions() - - d1 = self.connected.addCallback(strip(add_mailboxes)) - d1.addCallback(strip(login)) - d1.addCallback(strip(dc1)) - d1.addCallback(strip(dc2)) - d1.addCallbacks(strip(unsubscribe), self._ebGeneral) - d1.addCallbacks(self._cbStopClient, self._ebGeneral) - d2 = self.loopback() - d = defer.gatherResults([d1, d2]) - d.addCallback(get_subscriptions) - d.addCallback(lambda subscriptions: - self.assertEqual(subscriptions, - ['that/mbox'])) - return d - - def testSelect(self): - """ - Try to select a mailbox - """ - mbox_name = "TESTMAILBOXSELECT" - self.selectedArgs = None - - acc = self.server.theAccount - - def add_mailbox(): - return acc.addMailbox(mbox_name, creation_ts=42) - - def login(): - return self.client.login(TEST_USER, TEST_PASSWD) - - def select(): - def selected(args): - self.selectedArgs = args - self._cbStopClient(None) - d = self.client.select(mbox_name) - d.addCallback(selected) - return d - - d1 = self.connected.addCallback(strip(add_mailbox)) - d1.addCallback(strip(login)) - d1.addCallback(strip(select)) - # d1.addErrback(self._ebGeneral) - - d2 = self.loopback() - - d = defer.gatherResults([d1, d2]) - d.addCallback(self._cbTestSelect) - return d - - def _cbTestSelect(self, ignored): - self.assertTrue(self.selectedArgs is not None) - - self.assertEqual(self.selectedArgs, { - 'EXISTS': 0, 'RECENT': 0, 'UIDVALIDITY': 42, - 'FLAGS': ('\\Seen', '\\Answered', '\\Flagged', - '\\Deleted', '\\Draft', '\\Recent', 'List'), - 'READ-WRITE': True - }) - - # - # capabilities - # - - def testCapability(self): - caps = {} - - def getCaps(): - def gotCaps(c): - caps.update(c) - self.server.transport.loseConnection() - return self.client.getCapabilities().addCallback(gotCaps) - - d1 = self.connected - d1.addCallback( - strip(getCaps)).addErrback(self._ebGeneral) - - d = defer.gatherResults([self.loopback(), d1]) - expected = {'IMAP4rev1': None, 'NAMESPACE': None, 'LITERAL+': None, - 'IDLE': None} - d.addCallback(lambda _: self.assertEqual(expected, caps)) - return d - - def testCapabilityWithAuth(self): - caps = {} - self.server.challengers[ - 'CRAM-MD5'] = cred.credentials.CramMD5Credentials - - def getCaps(): - def gotCaps(c): - caps.update(c) - self.server.transport.loseConnection() - return self.client.getCapabilities().addCallback(gotCaps) - d1 = self.connected.addCallback( - strip(getCaps)).addErrback(self._ebGeneral) - - d = defer.gatherResults([self.loopback(), d1]) - - expCap = {'IMAP4rev1': None, 'NAMESPACE': None, - 'IDLE': None, 'LITERAL+': None, - 'AUTH': ['CRAM-MD5']} - - d.addCallback(lambda _: self.assertEqual(expCap, caps)) - return d - - # - # authentication - # - - def testLogout(self): - """ - Test log out - """ - self.loggedOut = 0 - - def logout(): - def setLoggedOut(): - self.loggedOut = 1 - self.client.logout().addCallback(strip(setLoggedOut)) - self.connected.addCallback(strip(logout)).addErrback(self._ebGeneral) - d = self.loopback() - return d.addCallback(lambda _: self.assertEqual(self.loggedOut, 1)) - - def testNoop(self): - """ - Test noop command - """ - self.responses = None - - def noop(): - def setResponses(responses): - self.responses = responses - self.server.transport.loseConnection() - self.client.noop().addCallback(setResponses) - self.connected.addCallback(strip(noop)).addErrback(self._ebGeneral) - d = self.loopback() - return d.addCallback(lambda _: self.assertEqual(self.responses, [])) - - def testLogin(self): - """ - Test login - """ - def login(): - d = self.client.login(TEST_USER, TEST_PASSWD) - d.addCallback(self._cbStopClient) - d1 = self.connected.addCallback( - strip(login)).addErrback(self._ebGeneral) - d = defer.gatherResults([d1, self.loopback()]) - return d.addCallback(self._cbTestLogin) - - def _cbTestLogin(self, ignored): - self.assertEqual(self.server.state, 'auth') - - def testFailedLogin(self): - """ - Test bad login - """ - def login(): - d = self.client.login("bad_user@leap.se", TEST_PASSWD) - d.addBoth(self._cbStopClient) - - d1 = self.connected.addCallback( - strip(login)).addErrback(self._ebGeneral) - d2 = self.loopback() - d = defer.gatherResults([d1, d2]) - return d.addCallback(self._cbTestFailedLogin) - - def _cbTestFailedLogin(self, ignored): - self.assertEqual(self.server.state, 'unauth') - self.assertEqual(self.server.account, None) - - def testLoginRequiringQuoting(self): - """ - Test login requiring quoting - """ - self.server.checker.userid = '{test}user@leap.se' - self.server.checker.password = '{test}password' - - def login(): - d = self.client.login('{test}user@leap.se', '{test}password') - d.addBoth(self._cbStopClient) - - d1 = self.connected.addCallback( - strip(login)).addErrback(self._ebGeneral) - d = defer.gatherResults([self.loopback(), d1]) - return d.addCallback(self._cbTestLoginRequiringQuoting) - - def _cbTestLoginRequiringQuoting(self, ignored): - self.assertEqual(self.server.state, 'auth') - - # - # Inspection - # - - def testNamespace(self): - """ - Test retrieving namespace - """ - self.namespaceArgs = None - - def login(): - return self.client.login(TEST_USER, TEST_PASSWD) - - def namespace(): - def gotNamespace(args): - self.namespaceArgs = args - self._cbStopClient(None) - return self.client.namespace().addCallback(gotNamespace) - - d1 = self.connected.addCallback(strip(login)) - d1.addCallback(strip(namespace)) - d1.addErrback(self._ebGeneral) - d2 = self.loopback() - d = defer.gatherResults([d1, d2]) - d.addCallback(lambda _: self.assertEqual(self.namespaceArgs, - [[['', '/']], [], []])) - return d - - def testExamine(self): - """ - L{IMAP4Client.examine} issues an I{EXAMINE} command to the server and - returns a L{Deferred} which fires with a C{dict} with as many of the - following keys as the server includes in its response: C{'FLAGS'}, - C{'EXISTS'}, C{'RECENT'}, C{'UNSEEN'}, C{'READ-WRITE'}, C{'READ-ONLY'}, - C{'UIDVALIDITY'}, and C{'PERMANENTFLAGS'}. - - Unfortunately the server doesn't generate all of these so it's hard to - test the client's handling of them here. See - L{IMAP4ClientExamineTests} below. - - See U{RFC 3501<http://www.faqs.org/rfcs/rfc3501.html>}, section 6.3.2, - for details. - """ - # TODO implement the IMAP4ClientExamineTests testcase. - mbox_name = "test_mailbox_e" - acc = self.server.theAccount - self.examinedArgs = None - - def add_mailbox(): - return acc.addMailbox(mbox_name, creation_ts=42) - - def login(): - return self.client.login(TEST_USER, TEST_PASSWD) - - def examine(): - def examined(args): - self.examinedArgs = args - self._cbStopClient(None) - d = self.client.examine(mbox_name) - d.addCallback(examined) - return d - - d1 = self.connected.addCallback(strip(add_mailbox)) - d1.addCallback(strip(login)) - d1.addCallback(strip(examine)) - d1.addErrback(self._ebGeneral) - d2 = self.loopback() - d = defer.gatherResults([d1, d2]) - return d.addCallback(self._cbTestExamine) - - def _cbTestExamine(self, ignored): - self.assertEqual(self.examinedArgs, { - 'EXISTS': 0, 'RECENT': 0, 'UIDVALIDITY': 42, - 'FLAGS': ('\\Seen', '\\Answered', '\\Flagged', - '\\Deleted', '\\Draft', '\\Recent', 'List'), - 'READ-WRITE': False}) - - def _listSetup(self, f, f2=None): - - acc = self.server.theAccount - - def dc1(): - return acc.addMailbox('root_subthing', creation_ts=42) - - def dc2(): - return acc.addMailbox('root_another_thing', creation_ts=42) - - def dc3(): - return acc.addMailbox('non_root_subthing', creation_ts=42) - - def login(): - return self.client.login(TEST_USER, TEST_PASSWD) - - def listed(answers): - self.listed = answers - - self.listed = None - d1 = self.connected.addCallback(strip(login)) - d1.addCallback(strip(dc1)) - d1.addCallback(strip(dc2)) - d1.addCallback(strip(dc3)) - - if f2 is not None: - d1.addCallback(f2) - - d1.addCallbacks(strip(f), self._ebGeneral) - d1.addCallbacks(listed, self._ebGeneral) - d1.addCallbacks(self._cbStopClient, self._ebGeneral) - d2 = self.loopback() - return defer.gatherResults([d1, d2]).addCallback(lambda _: self.listed) - - def testList(self): - """ - Test List command - """ - def list(): - return self.client.list('root', '%') - - d = self._listSetup(list) - d.addCallback(lambda listed: self.assertEqual( - sortNest(listed), - sortNest([ - (IMAPMailbox.init_flags, "/", "root_subthing"), - (IMAPMailbox.init_flags, "/", "root_another_thing") - ]) - )) - return d - - def testLSub(self): - """ - Test LSub command - """ - acc = self.server.theAccount - - def subs_mailbox(): - # why not client.subscribe instead? - return acc.subscribe('root_subthing') - - def lsub(): - return self.client.lsub('root', '%') - - d = self._listSetup(lsub, strip(subs_mailbox)) - d.addCallback(self.assertEqual, - [(IMAPMailbox.init_flags, "/", "root_subthing")]) - return d - - def testStatus(self): - """ - Test Status command - """ - acc = self.server.theAccount - - def add_mailbox(): - return acc.addMailbox('root_subthings') - - # XXX FIXME ---- should populate this a little bit, - # with unseen etc... - - def login(): - return self.client.login(TEST_USER, TEST_PASSWD) - - def status(): - return self.client.status( - 'root_subthings', 'MESSAGES', 'UIDNEXT', 'UNSEEN') - - def statused(result): - self.statused = result - - self.statused = None - - d1 = self.connected.addCallback(strip(add_mailbox)) - d1.addCallback(strip(login)) - d1.addCallbacks(strip(status), self._ebGeneral) - d1.addCallbacks(statused, self._ebGeneral) - d1.addCallbacks(self._cbStopClient, self._ebGeneral) - d2 = self.loopback() - d = defer.gatherResults([d1, d2]) - d.addCallback(lambda _: self.assertEqual( - self.statused, - {'MESSAGES': 0, 'UIDNEXT': '1', 'UNSEEN': 0} - )) - return d - - def testFailedStatus(self): - """ - Test failed status command with a non-existent mailbox - """ - def login(): - return self.client.login(TEST_USER, TEST_PASSWD) - - def status(): - return self.client.status( - 'root/nonexistent', 'MESSAGES', 'UIDNEXT', 'UNSEEN') - - def statused(result): - self.statused = result - - def failed(failure): - self.failure = failure - - self.statused = self.failure = None - d1 = self.connected.addCallback(strip(login)) - d1.addCallbacks(strip(status), self._ebGeneral) - d1.addCallbacks(statused, failed) - d1.addCallbacks(self._cbStopClient, self._ebGeneral) - d2 = self.loopback() - return defer.gatherResults([d1, d2]).addCallback( - self._cbTestFailedStatus) - - def _cbTestFailedStatus(self, ignored): - self.assertEqual( - self.statused, None - ) - self.assertEqual( - self.failure.value.args, - ('Could not open mailbox',) - ) - - # - # messages - # - - def testFullAppend(self): - """ - Test appending a full message to the mailbox - """ - infile = util.sibpath(__file__, 'rfc822.message') - message = open(infile) - acc = self.server.theAccount - mailbox_name = "appendmbox/subthing" - - def add_mailbox(): - return acc.addMailbox(mailbox_name) - - def login(): - return self.client.login(TEST_USER, TEST_PASSWD) - - def append(): - return self.client.append( - mailbox_name, message, - ('\\SEEN', '\\DELETED'), - 'Tue, 17 Jun 2003 11:22:16 -0600 (MDT)', - ) - - d1 = self.connected.addCallback(strip(add_mailbox)) - d1.addCallback(strip(login)) - d1.addCallbacks(strip(append), self._ebGeneral) - d1.addCallbacks(self._cbStopClient, self._ebGeneral) - d2 = self.loopback() - d = defer.gatherResults([d1, d2]) - - d.addCallback(lambda _: acc.getMailbox(mailbox_name)) - d.addCallback(lambda mb: mb.fetch(imap4.MessageSet(start=1), True)) - return d.addCallback(self._cbTestFullAppend, infile) - - def _cbTestFullAppend(self, fetched, infile): - fetched = list(fetched) - self.assertTrue(len(fetched) == 1) - self.assertTrue(len(fetched[0]) == 2) - uid, msg = fetched[0] - parsed = self.parser.parse(open(infile)) - expected_body = parsed.get_payload() - expected_headers = CaseInsensitiveDict(parsed.items()) - - def assert_flags(flags): - self.assertEqual( - set(('\\SEEN', '\\DELETED')), - set(flags)) - - def assert_date(date): - self.assertEqual( - 'Tue, 17 Jun 2003 11:22:16 -0600 (MDT)', - date) - - def assert_body(body): - gotbody = body.read() - self.assertEqual(expected_body, gotbody) - - def assert_headers(headers): - self.assertItemsEqual(map(string.lower, expected_headers), headers) - - d = defer.maybeDeferred(msg.getFlags) - d.addCallback(assert_flags) - - d.addCallback(lambda _: defer.maybeDeferred(msg.getInternalDate)) - d.addCallback(assert_date) - - d.addCallback( - lambda _: defer.maybeDeferred( - msg.getBodyFile, self._soledad)) - d.addCallback(assert_body) - - d.addCallback(lambda _: defer.maybeDeferred(msg.getHeaders, True)) - d.addCallback(assert_headers) - - return d - - def testPartialAppend(self): - """ - Test partially appending a message to the mailbox - """ - # TODO this test sometimes will fail because of the notify_just_mdoc - infile = util.sibpath(__file__, 'rfc822.message') - - acc = self.server.theAccount - - def add_mailbox(): - return acc.addMailbox('PARTIAL/SUBTHING') - - def login(): - return self.client.login(TEST_USER, TEST_PASSWD) - - def append(): - message = file(infile) - return self.client.sendCommand( - imap4.Command( - 'APPEND', - 'PARTIAL/SUBTHING (\\SEEN) "Right now" ' - '{%d}' % os.path.getsize(infile), - (), self.client._IMAP4Client__cbContinueAppend, message - ) - ) - d1 = self.connected.addCallback(strip(add_mailbox)) - d1.addCallback(strip(login)) - d1.addCallbacks(strip(append), self._ebGeneral) - d1.addCallbacks(self._cbStopClient, self._ebGeneral) - d2 = self.loopback() - d = defer.gatherResults([d1, d2]) - - d.addCallback(lambda _: acc.getMailbox("PARTIAL/SUBTHING")) - d.addCallback(lambda mb: mb.fetch(imap4.MessageSet(start=1), True)) - return d.addCallback( - self._cbTestPartialAppend, infile) - - def _cbTestPartialAppend(self, fetched, infile): - fetched = list(fetched) - self.assertTrue(len(fetched) == 1) - self.assertTrue(len(fetched[0]) == 2) - uid, msg = fetched[0] - parsed = self.parser.parse(open(infile)) - expected_body = parsed.get_payload() - - def assert_flags(flags): - self.assertEqual( - set((['\\SEEN'])), set(flags)) - - def assert_body(body): - gotbody = body.read() - self.assertEqual(expected_body, gotbody) - - d = defer.maybeDeferred(msg.getFlags) - d.addCallback(assert_flags) - - d.addCallback(lambda _: defer.maybeDeferred(msg.getBodyFile)) - d.addCallback(assert_body) - return d - - def testCheck(self): - """ - Test check command - """ - def add_mailbox(): - return self.server.theAccount.addMailbox('root/subthing') - - def login(): - return self.client.login(TEST_USER, TEST_PASSWD) - - def select(): - return self.client.select('root/subthing') - - def check(): - return self.client.check() - - d = self.connected.addCallbacks( - strip(add_mailbox), self._ebGeneral) - d.addCallbacks(lambda _: login(), self._ebGeneral) - d.addCallbacks(strip(select), self._ebGeneral) - d.addCallbacks(strip(check), self._ebGeneral) - d.addCallbacks(self._cbStopClient, self._ebGeneral) - d2 = self.loopback() - return defer.gatherResults([d, d2]) - - # Okay, that was much fun indeed - - def testExpunge(self): - """ - Test expunge command - """ - acc = self.server.theAccount - mailbox_name = 'mailboxexpunge' - - def add_mailbox(): - return acc.addMailbox(mailbox_name) - - def login(): - return self.client.login(TEST_USER, TEST_PASSWD) - - def select(): - return self.client.select(mailbox_name) - - def save_mailbox(mailbox): - self.mailbox = mailbox - - def get_mailbox(): - d = acc.getMailbox(mailbox_name) - d.addCallback(save_mailbox) - return d - - def add_messages(): - d = self.mailbox.addMessage( - 'test 1', flags=('\\Deleted', 'AnotherFlag'), - notify_just_mdoc=False) - d.addCallback(lambda _: self.mailbox.addMessage( - 'test 2', flags=('AnotherFlag',), - notify_just_mdoc=False)) - d.addCallback(lambda _: self.mailbox.addMessage( - 'test 3', flags=('\\Deleted',), - notify_just_mdoc=False)) - return d - - def expunge(): - return self.client.expunge() - - def expunged(results): - self.failIf(self.server.mbox is None) - self.results = results - - self.results = None - d1 = self.connected.addCallback(strip(add_mailbox)) - d1.addCallback(strip(login)) - d1.addCallback(strip(get_mailbox)) - d1.addCallbacks(strip(add_messages), self._ebGeneral) - d1.addCallbacks(strip(select), self._ebGeneral) - d1.addCallbacks(strip(expunge), self._ebGeneral) - d1.addCallbacks(expunged, self._ebGeneral) - d1.addCallbacks(self._cbStopClient, self._ebGeneral) - d2 = self.loopback() - d = defer.gatherResults([d1, d2]) - d.addCallback(lambda _: self.mailbox.getMessageCount()) - return d.addCallback(self._cbTestExpunge) - - def _cbTestExpunge(self, count): - # we only left 1 mssage with no deleted flag - self.assertEqual(count, 1) - # the uids of the deleted messages - self.assertItemsEqual(self.results, [1, 3]) - - -class AccountTestCase(IMAP4HelperMixin): - """ - Test the Account. - """ - def _create_empty_mailbox(self): - return self.server.theAccount.addMailbox('') - - def _create_one_mailbox(self): - return self.server.theAccount.addMailbox('one') - - def test_illegalMailboxCreate(self): - self.assertRaises(AssertionError, self._create_empty_mailbox) - - -class IMAP4ServerSearchTestCase(IMAP4HelperMixin): - """ - Tests for the behavior of the search_* functions in L{imap5.IMAP4Server}. - """ - # XXX coming soon to your screens! - pass diff --git a/src/leap/bitmask/mail/imap/tests/walktree.py b/src/leap/bitmask/mail/imap/tests/walktree.py deleted file mode 100644 index f259a55..0000000 --- a/src/leap/bitmask/mail/imap/tests/walktree.py +++ /dev/null @@ -1,127 +0,0 @@ -# -*- coding: utf-8 -*- -# walktree.py -# Copyright (C) 2013 LEAP -# -# This program is free software: you can redistribute it and/or modify -# it under the terms of the GNU General Public License as published by -# the Free Software Foundation, either version 3 of the License, or -# (at your option) any later version. -# -# This program is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -# GNU General Public License for more details. -# -# You should have received a copy of the GNU General Public License -# along with this program. If not, see <http://www.gnu.org/licenses/>. -""" -Tests for the walktree module. -""" -import os -import sys -import pprint -from email import parser - -from leap.mail import walk as W - -DEBUG = os.environ.get("BITMASK_MAIL_DEBUG") - - -p = parser.Parser() - -# TODO pass an argument of the type of message - -################################################## -# Input from hell - -if len(sys.argv) > 1: - FILENAME = sys.argv[1] -else: - FILENAME = "rfc822.multi-signed.message" - -""" -FILENAME = "rfc822.plain.message" -FILENAME = "rfc822.multi-minimal.message" -""" - -msg = p.parse(open(FILENAME)) -DO_CHECK = False -################################################# - -parts = W.get_parts(msg) - -if DEBUG: - def trim(item): - item = item[:10] - [trim(part["phash"]) for part in parts if part.get('phash', None)] - -raw_docs = list(W.get_raw_docs(msg, parts)) - -body_phash_fun = [W.get_body_phash_simple, - W.get_body_phash_multi][int(msg.is_multipart())] -body_phash = body_phash_fun(W.get_payloads(msg)) -parts_map = W.walk_msg_tree(parts, body_phash=body_phash) - - -# TODO add missing headers! -expected = { - 'body': '1ddfa80485', - 'multi': True, - 'part_map': { - 1: { - 'headers': {'Content-Disposition': 'inline', - 'Content-Type': 'multipart/mixed; ' - 'boundary="z0eOaCaDLjvTGF2l"'}, - 'multi': True, - 'part_map': {1: {'ctype': 'text/plain', - 'headers': [ - ('Content-Type', - 'text/plain; charset=utf-8'), - ('Content-Disposition', - 'inline'), - ('Content-Transfer-Encoding', - 'quoted-printable')], - 'multi': False, - 'parts': 1, - 'phash': '1ddfa80485', - 'size': 206}, - 2: {'ctype': 'text/plain', - 'headers': [('Content-Type', - 'text/plain; charset=us-ascii'), - ('Content-Disposition', - 'attachment; ' - 'filename="attach.txt"')], - 'multi': False, - 'parts': 1, - 'phash': '7a94e4d769', - 'size': 133}, - 3: {'ctype': 'application/octet-stream', - 'headers': [('Content-Type', - 'application/octet-stream'), - ('Content-Disposition', - 'attachment; filename="hack.ico"'), - ('Content-Transfer-Encoding', - 'base64')], - 'multi': False, - 'parts': 1, - 'phash': 'c42cccebbd', - 'size': 12736}}}, - 2: {'ctype': 'application/pgp-signature', - 'headers': [('Content-Type', 'application/pgp-signature')], - 'multi': False, - 'parts': 1, - 'phash': '8f49fbf749', - 'size': 877}}} - -if DEBUG and DO_CHECK: - # TODO turn this into a proper unittest - assert(parts_map == expected) - print "Structure: OK" - - -print -print "RAW DOCS" -pprint.pprint(raw_docs) -print -print "PARTS MAP" -pprint.pprint(parts_map) |