From 2737ab3147ddb08ae024b7fd49c69153e01609c4 Mon Sep 17 00:00:00 2001 From: Kali Kaneko Date: Thu, 9 Jul 2015 12:03:22 -0400 Subject: [style] slightly more meaningful test module name --- src/leap/mail/imap/tests/regressions | 459 ----------------------- src/leap/mail/imap/tests/regressions_mime_struct | 459 +++++++++++++++++++++++ 2 files changed, 459 insertions(+), 459 deletions(-) delete mode 100755 src/leap/mail/imap/tests/regressions create mode 100755 src/leap/mail/imap/tests/regressions_mime_struct (limited to 'src') diff --git a/src/leap/mail/imap/tests/regressions b/src/leap/mail/imap/tests/regressions deleted file mode 100755 index 475c893..0000000 --- a/src/leap/mail/imap/tests/regressions +++ /dev/null @@ -1,459 +0,0 @@ -#!/usr/bin/env python - -# -*- coding: utf-8 -*- -# regressions -# Copyright (C) 2014 LEAP -# Copyright (c) Twisted Matrix Laboratories. -# -# This program is free software: you can redistribute it and/or modify -# it under the terms of the GNU General Public License as published by -# the Free Software Foundation, either version 3 of the License, or -# (at your option) any later version. -# -# This program is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -# GNU General Public License for more details. -# -# You should have received a copy of the GNU General Public License -# along with this program. If not, see . -""" -Simple Regression Tests using IMAP4 client. - -Iterates trough all mails under a given folder and tries to APPEND them to -the server being tested. After FETCHING the pushed message, it compares -the received version with the one that was saved, and exits with an error -code if they do not match. -""" -import os -import StringIO -import sys - -from email.parser import Parser - -from twisted.internet import protocol -from twisted.internet import ssl -from twisted.internet import defer -from twisted.internet import stdio -from twisted.mail import imap4 -from twisted.protocols import basic -from twisted.python import log - - -REGRESSIONS_FOLDER = "regressions_test" - -parser = Parser() - - -def get_msg_parts(raw): - """ - Return a representation of the parts of a message suitable for - comparison. - - :param raw: string for the message - :type raw: str - """ - m = parser.parsestr(raw) - return [dict(part.items()) - if part.is_multipart() - else part.get_payload() - for part in m.walk()] - - -def compare_msg_parts(a, b): - """ - Compare two sequences of parts of messages. - - :param a: part sequence for message a - :param b: part sequence for message b - - :return: True if both message sequences are equivalent. - :rtype: bool - """ - # XXX This could be smarter and show the differences in the - # different parts when/where they differ. - #import pprint; pprint.pprint(a[0]) - #import pprint; pprint.pprint(b[0]) - - def lowerkey(d): - return dict((k.lower(), v.replace('\r', '')) - for k, v in d.iteritems()) - - def eq(x, y): - # For dicts, we compare a variation with their keys - # in lowercase, and \r removed from their values - if all(map(lambda i: isinstance(i, dict), (x, y))): - x, y = map(lowerkey, (x, y)) - return x == y - - compare_vector = map(lambda tup: eq(tup[0], tup[1]), zip(a, b)) - all_match = all(compare_vector) - - if not all_match: - print "PARTS MISMATCH!" - print "vector: ", compare_vector - index = compare_vector.index(False) - from pprint import pprint - print "Expected:" - pprint(a[index]) - print ("***") - print "Found:" - pprint(b[index]) - print - - return all_match - - -def get_fd(string): - """ - Return a file descriptor with the passed string - as content. - """ - fd = StringIO.StringIO() - fd.write(string) - fd.seek(0) - return fd - - -class TrivialPrompter(basic.LineReceiver): - promptDeferred = None - - def prompt(self, msg): - assert self.promptDeferred is None - self.display(msg) - self.promptDeferred = defer.Deferred() - return self.promptDeferred - - def display(self, msg): - self.transport.write(msg) - - def lineReceived(self, line): - if self.promptDeferred is None: - return - d, self.promptDeferred = self.promptDeferred, None - d.callback(line) - - -class SimpleIMAP4Client(imap4.IMAP4Client): - """ - A client with callbacks for greeting messages from an IMAP server. - """ - greetDeferred = None - - def serverGreeting(self, caps): - self.serverCapabilities = caps - if self.greetDeferred is not None: - d, self.greetDeferred = self.greetDeferred, None - d.callback(self) - - -class SimpleIMAP4ClientFactory(protocol.ClientFactory): - usedUp = False - protocol = SimpleIMAP4Client - - def __init__(self, username, onConn): - self.ctx = ssl.ClientContextFactory() - - self.username = username - self.onConn = onConn - - def buildProtocol(self, addr): - """ - Initiate the protocol instance. Since we are building a simple IMAP - client, we don't bother checking what capabilities the server has. We - just add all the authenticators twisted.mail has. Note: Gmail no - longer uses any of the methods below, it's been using XOAUTH since - 2010. - """ - assert not self.usedUp - self.usedUp = True - - p = self.protocol(self.ctx) - p.factory = self - p.greetDeferred = self.onConn - - p.registerAuthenticator(imap4.PLAINAuthenticator(self.username)) - p.registerAuthenticator(imap4.LOGINAuthenticator(self.username)) - p.registerAuthenticator( - imap4.CramMD5ClientAuthenticator(self.username)) - - return p - - def clientConnectionFailed(self, connector, reason): - d, self.onConn = self.onConn, None - d.errback(reason) - - -def cbServerGreeting(proto, username, password): - """ - Initial callback - invoked after the server sends us its greet message. - """ - # Hook up stdio - tp = TrivialPrompter() - stdio.StandardIO(tp) - - # And make it easily accessible - proto.prompt = tp.prompt - proto.display = tp.display - - # Try to authenticate securely - return proto.authenticate( - password).addCallback( - cbAuthentication, - proto).addErrback( - ebAuthentication, proto, username, password - ) - - -def ebConnection(reason): - """ - Fallback error-handler. If anything goes wrong, log it and quit. - """ - log.startLogging(sys.stdout) - log.err(reason) - return reason - - -def cbAuthentication(result, proto): - """ - Callback after authentication has succeeded. - - Lists a bunch of mailboxes. - """ - return proto.select( - REGRESSIONS_FOLDER - ).addCallback( - cbSelectMbox, proto - ).addErrback( - ebSelectMbox, proto, REGRESSIONS_FOLDER) - - -def ebAuthentication(failure, proto, username, password): - """ - Errback invoked when authentication fails. - - If it failed because no SASL mechanisms match, offer the user the choice - of logging in insecurely. - - If you are trying to connect to your Gmail account, you will be here! - """ - failure.trap(imap4.NoSupportedAuthentication) - return InsecureLogin(proto, username, password) - - -def InsecureLogin(proto, username, password): - """ - Raise insecure-login error. - """ - return proto.login( - username, password - ).addCallback( - cbAuthentication, proto) - - -def cbSelectMbox(result, proto): - """ - Callback invoked when select command finishes successfully. - - If any message is in the test folder, it will flag them as deleted and - expunge. - If no messages found, it will start with the APPEND tests. - """ - print "SELECT: %s EXISTS " % result.get("EXISTS", "??") - - if result["EXISTS"] != 0: - # Flag as deleted, expunge, and do an examine again. - print "There is mail here, will delete..." - return cbDeleteAndExpungeTestFolder(proto) - - else: - return cbAppendNextMessage(proto) - - -def ebSelectMbox(failure, proto, folder): - """ - Errback invoked when the examine command fails. - - Creates the folder. - """ - log.err(failure) - log.msg("Folder %r does not exist. Creating..." % (folder,)) - return proto.create(folder).addCallback(cbAuthentication, proto) - - -def ebExpunge(failure): - log.err(failure) - - -def cbDeleteAndExpungeTestFolder(proto): - """ - Callback invoked fom cbExamineMbox when the number of messages in the - mailbox is not zero. It flags all messages as deleted and expunge the - mailbox. - """ - return proto.setFlags( - "1:*", ("\\Deleted",) - ).addCallback( - lambda r: proto.expunge() - ).addCallback( - cbExpunge, proto - ).addErrback( - ebExpunge) - - -def cbExpunge(result, proto): - return proto.select( - REGRESSIONS_FOLDER - ).addCallback( - cbSelectMbox, proto - ).addErrback(ebSettingDeleted, proto) - - -def ebSettingDeleted(failure, proto): - """ - Report errors during deletion of messages in the mailbox. - """ - print failure.getTraceback() - - -def cbAppendNextMessage(proto): - """ - Appends the next message in the global queue to the test folder. - """ - # 1. Get the next test message from global tuple. - try: - next_sample = SAMPLES.pop() - except IndexError: - # we're done! - return proto.logout() - - print "\nAPPEND %s" % (next_sample,) - raw = open(next_sample).read() - msg = get_fd(raw) - return proto.append( - REGRESSIONS_FOLDER, msg - ).addCallback( - lambda r: proto.select(REGRESSIONS_FOLDER) - ).addCallback( - cbAppend, proto, raw - ).addErrback( - ebAppend, proto, raw) - - -def cbAppend(result, proto, orig_msg): - """ - Fetches the message right after an append. - """ - # XXX keep account of highest UID - uid = "1:*" - - return proto.fetchSpecific( - '%s' % uid, - headerType='', - headerArgs=['BODY.PEEK[]'], - ).addCallback( - cbCompareMessage, proto, orig_msg - ).addErrback(ebAppend, proto, orig_msg) - - -def ebAppend(failure, proto, raw): - """ - Errorback for the append operation - """ - print "ERROR WHILE APPENDING!" - print failure.getTraceback() - - -def cbPickMessage(result, proto): - """ - Pick a message. - """ - return proto.fetchSpecific( - '%s' % result, - headerType='', - headerArgs=['BODY.PEEK[]'], - ).addCallback(cbCompareMessage, proto) - - -def cbCompareMessage(result, proto, raw): - """ - Display message and compare it with the original one. - """ - parts_orig = get_msg_parts(raw) - - if result: - keys = result.keys() - keys.sort() - else: - print "[-] GOT NO RESULT" - return proto.logout() - - latest = max(keys) - - fetched_msg = result[latest][0][2] - parts_fetched = get_msg_parts(fetched_msg) - - equal = compare_msg_parts( - parts_orig, - parts_fetched) - - if equal: - print "[+] MESSAGES MATCH" - return cbAppendNextMessage(proto) - else: - print "[-] ERROR: MESSAGES DO NOT MATCH !!!" - print " ABORTING COMPARISON..." - # FIXME logout and print the subject ... - return proto.logout() - - -def cbClose(result): - """ - Close the connection when we finish everything. - """ - from twisted.internet import reactor - reactor.stop() - - -def main(): - import glob - import sys - - if len(sys.argv) != 4: - print "Usage: regressions " - sys.exit() - - hostname = "localhost" - port = "1984" - username = sys.argv[1] - password = sys.argv[2] - - samplesdir = sys.argv[3] - - if not os.path.isdir(samplesdir): - print ("Could not find samples folder! " - "Make sure of copying mail_breaker contents there.") - sys.exit() - - samples = glob.glob(samplesdir + '/*') - - global SAMPLES - SAMPLES = [] - SAMPLES += samples - - onConn = defer.Deferred( - ).addCallback( - cbServerGreeting, username, password - ).addErrback( - ebConnection - ).addBoth(cbClose) - - factory = SimpleIMAP4ClientFactory(username, onConn) - - from twisted.internet import reactor - reactor.connectTCP(hostname, int(port), factory) - reactor.run() - - -if __name__ == '__main__': - main() diff --git a/src/leap/mail/imap/tests/regressions_mime_struct b/src/leap/mail/imap/tests/regressions_mime_struct new file mode 100755 index 0000000..1e0e870 --- /dev/null +++ b/src/leap/mail/imap/tests/regressions_mime_struct @@ -0,0 +1,459 @@ +#!/usr/bin/env python + +# -*- coding: utf-8 -*- +# regression_mime_struct +# Copyright (C) 2014 LEAP +# Copyright (c) Twisted Matrix Laboratories. +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see . +""" +Simple Regression Tests for checking MIME struct handling using IMAP4 client. + +Iterates trough all mails under a given folder and tries to APPEND them to +the server being tested. After FETCHING the pushed message, it compares +the received version with the one that was saved, and exits with an error +code if they do not match. +""" +import os +import StringIO +import sys + +from email.parser import Parser + +from twisted.internet import protocol +from twisted.internet import ssl +from twisted.internet import defer +from twisted.internet import stdio +from twisted.mail import imap4 +from twisted.protocols import basic +from twisted.python import log + + +REGRESSIONS_FOLDER = "regressions_test" + +parser = Parser() + + +def get_msg_parts(raw): + """ + Return a representation of the parts of a message suitable for + comparison. + + :param raw: string for the message + :type raw: str + """ + m = parser.parsestr(raw) + return [dict(part.items()) + if part.is_multipart() + else part.get_payload() + for part in m.walk()] + + +def compare_msg_parts(a, b): + """ + Compare two sequences of parts of messages. + + :param a: part sequence for message a + :param b: part sequence for message b + + :return: True if both message sequences are equivalent. + :rtype: bool + """ + # XXX This could be smarter and show the differences in the + # different parts when/where they differ. + #import pprint; pprint.pprint(a[0]) + #import pprint; pprint.pprint(b[0]) + + def lowerkey(d): + return dict((k.lower(), v.replace('\r', '')) + for k, v in d.iteritems()) + + def eq(x, y): + # For dicts, we compare a variation with their keys + # in lowercase, and \r removed from their values + if all(map(lambda i: isinstance(i, dict), (x, y))): + x, y = map(lowerkey, (x, y)) + return x == y + + compare_vector = map(lambda tup: eq(tup[0], tup[1]), zip(a, b)) + all_match = all(compare_vector) + + if not all_match: + print "PARTS MISMATCH!" + print "vector: ", compare_vector + index = compare_vector.index(False) + from pprint import pprint + print "Expected:" + pprint(a[index]) + print ("***") + print "Found:" + pprint(b[index]) + print + + return all_match + + +def get_fd(string): + """ + Return a file descriptor with the passed string + as content. + """ + fd = StringIO.StringIO() + fd.write(string) + fd.seek(0) + return fd + + +class TrivialPrompter(basic.LineReceiver): + promptDeferred = None + + def prompt(self, msg): + assert self.promptDeferred is None + self.display(msg) + self.promptDeferred = defer.Deferred() + return self.promptDeferred + + def display(self, msg): + self.transport.write(msg) + + def lineReceived(self, line): + if self.promptDeferred is None: + return + d, self.promptDeferred = self.promptDeferred, None + d.callback(line) + + +class SimpleIMAP4Client(imap4.IMAP4Client): + """ + A client with callbacks for greeting messages from an IMAP server. + """ + greetDeferred = None + + def serverGreeting(self, caps): + self.serverCapabilities = caps + if self.greetDeferred is not None: + d, self.greetDeferred = self.greetDeferred, None + d.callback(self) + + +class SimpleIMAP4ClientFactory(protocol.ClientFactory): + usedUp = False + protocol = SimpleIMAP4Client + + def __init__(self, username, onConn): + self.ctx = ssl.ClientContextFactory() + + self.username = username + self.onConn = onConn + + def buildProtocol(self, addr): + """ + Initiate the protocol instance. Since we are building a simple IMAP + client, we don't bother checking what capabilities the server has. We + just add all the authenticators twisted.mail has. Note: Gmail no + longer uses any of the methods below, it's been using XOAUTH since + 2010. + """ + assert not self.usedUp + self.usedUp = True + + p = self.protocol(self.ctx) + p.factory = self + p.greetDeferred = self.onConn + + p.registerAuthenticator(imap4.PLAINAuthenticator(self.username)) + p.registerAuthenticator(imap4.LOGINAuthenticator(self.username)) + p.registerAuthenticator( + imap4.CramMD5ClientAuthenticator(self.username)) + + return p + + def clientConnectionFailed(self, connector, reason): + d, self.onConn = self.onConn, None + d.errback(reason) + + +def cbServerGreeting(proto, username, password): + """ + Initial callback - invoked after the server sends us its greet message. + """ + # Hook up stdio + tp = TrivialPrompter() + stdio.StandardIO(tp) + + # And make it easily accessible + proto.prompt = tp.prompt + proto.display = tp.display + + # Try to authenticate securely + return proto.authenticate( + password).addCallback( + cbAuthentication, + proto).addErrback( + ebAuthentication, proto, username, password + ) + + +def ebConnection(reason): + """ + Fallback error-handler. If anything goes wrong, log it and quit. + """ + log.startLogging(sys.stdout) + log.err(reason) + return reason + + +def cbAuthentication(result, proto): + """ + Callback after authentication has succeeded. + + Lists a bunch of mailboxes. + """ + return proto.select( + REGRESSIONS_FOLDER + ).addCallback( + cbSelectMbox, proto + ).addErrback( + ebSelectMbox, proto, REGRESSIONS_FOLDER) + + +def ebAuthentication(failure, proto, username, password): + """ + Errback invoked when authentication fails. + + If it failed because no SASL mechanisms match, offer the user the choice + of logging in insecurely. + + If you are trying to connect to your Gmail account, you will be here! + """ + failure.trap(imap4.NoSupportedAuthentication) + return InsecureLogin(proto, username, password) + + +def InsecureLogin(proto, username, password): + """ + Raise insecure-login error. + """ + return proto.login( + username, password + ).addCallback( + cbAuthentication, proto) + + +def cbSelectMbox(result, proto): + """ + Callback invoked when select command finishes successfully. + + If any message is in the test folder, it will flag them as deleted and + expunge. + If no messages found, it will start with the APPEND tests. + """ + print "SELECT: %s EXISTS " % result.get("EXISTS", "??") + + if result["EXISTS"] != 0: + # Flag as deleted, expunge, and do an examine again. + print "There is mail here, will delete..." + return cbDeleteAndExpungeTestFolder(proto) + + else: + return cbAppendNextMessage(proto) + + +def ebSelectMbox(failure, proto, folder): + """ + Errback invoked when the examine command fails. + + Creates the folder. + """ + log.err(failure) + log.msg("Folder %r does not exist. Creating..." % (folder,)) + return proto.create(folder).addCallback(cbAuthentication, proto) + + +def ebExpunge(failure): + log.err(failure) + + +def cbDeleteAndExpungeTestFolder(proto): + """ + Callback invoked fom cbExamineMbox when the number of messages in the + mailbox is not zero. It flags all messages as deleted and expunge the + mailbox. + """ + return proto.setFlags( + "1:*", ("\\Deleted",) + ).addCallback( + lambda r: proto.expunge() + ).addCallback( + cbExpunge, proto + ).addErrback( + ebExpunge) + + +def cbExpunge(result, proto): + return proto.select( + REGRESSIONS_FOLDER + ).addCallback( + cbSelectMbox, proto + ).addErrback(ebSettingDeleted, proto) + + +def ebSettingDeleted(failure, proto): + """ + Report errors during deletion of messages in the mailbox. + """ + print failure.getTraceback() + + +def cbAppendNextMessage(proto): + """ + Appends the next message in the global queue to the test folder. + """ + # 1. Get the next test message from global tuple. + try: + next_sample = SAMPLES.pop() + except IndexError: + # we're done! + return proto.logout() + + print "\nAPPEND %s" % (next_sample,) + raw = open(next_sample).read() + msg = get_fd(raw) + return proto.append( + REGRESSIONS_FOLDER, msg + ).addCallback( + lambda r: proto.select(REGRESSIONS_FOLDER) + ).addCallback( + cbAppend, proto, raw + ).addErrback( + ebAppend, proto, raw) + + +def cbAppend(result, proto, orig_msg): + """ + Fetches the message right after an append. + """ + # XXX keep account of highest UID + uid = "1:*" + + return proto.fetchSpecific( + '%s' % uid, + headerType='', + headerArgs=['BODY.PEEK[]'], + ).addCallback( + cbCompareMessage, proto, orig_msg + ).addErrback(ebAppend, proto, orig_msg) + + +def ebAppend(failure, proto, raw): + """ + Errorback for the append operation + """ + print "ERROR WHILE APPENDING!" + print failure.getTraceback() + + +def cbPickMessage(result, proto): + """ + Pick a message. + """ + return proto.fetchSpecific( + '%s' % result, + headerType='', + headerArgs=['BODY.PEEK[]'], + ).addCallback(cbCompareMessage, proto) + + +def cbCompareMessage(result, proto, raw): + """ + Display message and compare it with the original one. + """ + parts_orig = get_msg_parts(raw) + + if result: + keys = result.keys() + keys.sort() + else: + print "[-] GOT NO RESULT" + return proto.logout() + + latest = max(keys) + + fetched_msg = result[latest][0][2] + parts_fetched = get_msg_parts(fetched_msg) + + equal = compare_msg_parts( + parts_orig, + parts_fetched) + + if equal: + print "[+] MESSAGES MATCH" + return cbAppendNextMessage(proto) + else: + print "[-] ERROR: MESSAGES DO NOT MATCH !!!" + print " ABORTING COMPARISON..." + # FIXME logout and print the subject ... + return proto.logout() + + +def cbClose(result): + """ + Close the connection when we finish everything. + """ + from twisted.internet import reactor + reactor.stop() + + +def main(): + import glob + import sys + + if len(sys.argv) != 4: + print "Usage: regressions " + sys.exit() + + hostname = "localhost" + port = "1984" + username = sys.argv[1] + password = sys.argv[2] + + samplesdir = sys.argv[3] + + if not os.path.isdir(samplesdir): + print ("Could not find samples folder! " + "Make sure of copying mail_breaker contents there.") + sys.exit() + + samples = glob.glob(samplesdir + '/*') + + global SAMPLES + SAMPLES = [] + SAMPLES += samples + + onConn = defer.Deferred( + ).addCallback( + cbServerGreeting, username, password + ).addErrback( + ebConnection + ).addBoth(cbClose) + + factory = SimpleIMAP4ClientFactory(username, onConn) + + from twisted.internet import reactor + reactor.connectTCP(hostname, int(port), factory) + reactor.run() + + +if __name__ == '__main__': + main() -- cgit v1.2.3