diff options
| -rwxr-xr-x | src/leap/mail/imap/tests/getmail | 2 | ||||
| -rwxr-xr-x | src/leap/mail/imap/tests/regressions | 451 | 
2 files changed, 451 insertions, 2 deletions
| diff --git a/src/leap/mail/imap/tests/getmail b/src/leap/mail/imap/tests/getmail index 17e195c..0fb00d2 100755 --- a/src/leap/mail/imap/tests/getmail +++ b/src/leap/mail/imap/tests/getmail @@ -5,8 +5,6 @@  # Modifications by LEAP Developers 2014 to fit  # Bitmask configuration settings. - -  """  Simple IMAP4 client which displays the subjects of all messages in a  particular mailbox. diff --git a/src/leap/mail/imap/tests/regressions b/src/leap/mail/imap/tests/regressions new file mode 100755 index 0000000..0a43398 --- /dev/null +++ b/src/leap/mail/imap/tests/regressions @@ -0,0 +1,451 @@ +#!/usr/bin/env python + +# -*- coding: utf-8 -*- +# regressions +# Copyright (C) 2014 LEAP +# Copyright (c) Twisted Matrix Laboratories. +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program.  If not, see <http://www.gnu.org/licenses/>. +""" +Simple Regression Tests using IMAP4 client. + +Iterates trough all mails under a given folder and tries to APPEND them to +the server being tested. After FETCHING the pushed message, it compares +the received version with the one that was saved, and exits with an error +code if they do not match. +""" +import os +import StringIO +import sys + +from email.parser import Parser + +from twisted.internet import protocol +from twisted.internet import ssl +from twisted.internet import defer +from twisted.internet import stdio +from twisted.mail import imap4 +from twisted.protocols import basic +from twisted.python import log + + +REGRESSIONS_FOLDER = "regressions_test" + +parser = Parser() + + +def get_msg_parts(raw): +    """ +    Return a representation of the parts of a message suitable for +    comparison. + +    :param raw: string for the message +    :type raw: str +    """ +    m = parser.parsestr(raw) +    return [dict(part.items()) +            if part.is_multipart() +            else part.get_payload() +            for part in m.walk()] + + +def compare_msg_parts(a, b): +    """ +    Compare two sequences of parts of messages. + +    :param a: part sequence for message a +    :param b: part sequence for message b + +    :return: True if both message sequences are equivalent. +    :rtype: bool +    """ +    # XXX This could be smarter and show the differences in the +    # different parts when/where they differ. +    #import pprint; pprint.pprint(a[0]) +    #import pprint; pprint.pprint(b[0]) + +    def lowerkey(d): +        return dict((k.lower(), v.replace('\r', '')) +                    for k, v in d.iteritems()) + +    def eq(x, y): +        # For dicts, we compare a variation with their keys +        # in lowercase, and \r removed from their values +        if all(map(lambda i: isinstance(i, dict), (x, y))): +            x, y = map(lowerkey, (x, y)) +        return x == y + +    compare_vector = map(lambda tup: eq(tup[0], tup[1]), zip(a, b)) +    all_match = all(compare_vector) + +    if not all_match: +        print "PARTS MISMATCH!" +        print "vector: ", compare_vector +        index = compare_vector.index(False) +        from pprint import pprint +        print "Expected:" +        pprint(a[index]) +        print ("***") +        print "Found:" +        pprint(b[index]) +        print + + +    return all_match + + +def get_fd(string): +    """ +    Return a file descriptor with the passed string +    as content. +    """ +    fd = StringIO.StringIO() +    fd.write(string) +    fd.seek(0) +    return fd + + +class TrivialPrompter(basic.LineReceiver): +    promptDeferred = None + +    def prompt(self, msg): +        assert self.promptDeferred is None +        self.display(msg) +        self.promptDeferred = defer.Deferred() +        return self.promptDeferred + +    def display(self, msg): +        self.transport.write(msg) + +    def lineReceived(self, line): +        if self.promptDeferred is None: +            return +        d, self.promptDeferred = self.promptDeferred, None +        d.callback(line) + + +class SimpleIMAP4Client(imap4.IMAP4Client): +    """ +    A client with callbacks for greeting messages from an IMAP server. +    """ +    greetDeferred = None + +    def serverGreeting(self, caps): +        self.serverCapabilities = caps +        if self.greetDeferred is not None: +            d, self.greetDeferred = self.greetDeferred, None +            d.callback(self) + + +class SimpleIMAP4ClientFactory(protocol.ClientFactory): +    usedUp = False +    protocol = SimpleIMAP4Client + +    def __init__(self, username, onConn): +        self.ctx = ssl.ClientContextFactory() + +        self.username = username +        self.onConn = onConn + +    def buildProtocol(self, addr): +        """ +        Initiate the protocol instance. Since we are building a simple IMAP +        client, we don't bother checking what capabilities the server has. We +        just add all the authenticators twisted.mail has.  Note: Gmail no +        longer uses any of the methods below, it's been using XOAUTH since +        2010. +        """ +        assert not self.usedUp +        self.usedUp = True + +        p = self.protocol(self.ctx) +        p.factory = self +        p.greetDeferred = self.onConn + +        p.registerAuthenticator(imap4.PLAINAuthenticator(self.username)) +        p.registerAuthenticator(imap4.LOGINAuthenticator(self.username)) +        p.registerAuthenticator( +            imap4.CramMD5ClientAuthenticator(self.username)) + +        return p + +    def clientConnectionFailed(self, connector, reason): +        d, self.onConn = self.onConn, None +        d.errback(reason) + + +def cbServerGreeting(proto, username, password): +    """ +    Initial callback - invoked after the server sends us its greet message. +    """ +    # Hook up stdio +    tp = TrivialPrompter() +    stdio.StandardIO(tp) + +    # And make it easily accessible +    proto.prompt = tp.prompt +    proto.display = tp.display + +    # Try to authenticate securely +    return proto.authenticate( +        password).addCallback( +        cbAuthentication, +        proto).addErrback( +        ebAuthentication, proto, username, password +    ) + + +def ebConnection(reason): +    """ +    Fallback error-handler. If anything goes wrong, log it and quit. +    """ +    log.startLogging(sys.stdout) +    log.err(reason) +    return reason + + +def cbAuthentication(result, proto): +    """ +    Callback after authentication has succeeded. + +    Lists a bunch of mailboxes. +    """ +    return proto.select( +        REGRESSIONS_FOLDER +    ).addCallback( +        cbSelectMbox, proto +    ).addErrback( +        ebSelectMbox, proto, REGRESSIONS_FOLDER) + + +def ebAuthentication(failure, proto, username, password): +    """ +    Errback invoked when authentication fails. + +    If it failed because no SASL mechanisms match, offer the user the choice +    of logging in insecurely. + +    If you are trying to connect to your Gmail account, you will be here! +    """ +    failure.trap(imap4.NoSupportedAuthentication) +    return InsecureLogin(proto, username, password) + + +def InsecureLogin(proto, username, password): +    """ +    Raise insecure-login error. +    """ +    return proto.login( +        username, password +    ).addCallback( +        cbAuthentication, proto) + + +def cbSelectMbox(result, proto): +    """ +    Callback invoked when select command finishes successfully. + +    If any message is in the test folder, it will flag them as deleted and +    expunge. +    If no messages found, it will start with the APPEND tests. +    """ +    print "SELECT: %s EXISTS " % result.get("EXISTS", "??") + +    if result["EXISTS"] != 0: +        # Flag as deleted, expunge, and do an examine again. +        #print "There is mail here, will delete..." +        return cbDeleteAndExpungeTestFolder(proto) + +    else: +        return cbAppendNextMessage(proto) + + +def ebSelectMbox(failure, proto, folder): +    """ +    Errback invoked when the examine command fails. + +    Creates the folder. +    """ +    print failure.getTraceback() +    log.msg("Folder %r does not exist. Creating..." % (folder,)) +    return proto.create(folder).addCallback(cbAuthentication, proto) + + +def cbDeleteAndExpungeTestFolder(proto): +    """ +    Callback invoked fom cbExamineMbox when the number of messages in the +    mailbox is not zero. It flags all messages as deleted and expunge the +    mailbox. +    """ +    return proto.setFlags( +        "1:*", ("\\Deleted",) +    ).addCallback( +        lambda r: proto.expunge() +    ).addCallback( +        cbExpunge, proto) + + +def cbExpunge(result, proto): +    return proto.select( +        REGRESSIONS_FOLDER +    ).addCallback( +        cbSelectMbox, proto +    ).addErrback(ebSettingDeleted, proto) + + +def ebSettingDeleted(failure, proto): +    """ +    Report errors during deletion of messages in the mailbox. +    """ +    print failure.getTraceback() + + +def cbAppendNextMessage(proto): +    """ +    Appends the next message in the global queue to the test folder. +    """ +    # 1. Get the next test message from global tuple. +    try: +        next_sample = SAMPLES.pop() +    except IndexError: +        # we're done! +        return proto.logout() + +    print "\nAPPEND %s" % (next_sample,) +    raw = open(next_sample).read() +    msg = get_fd(raw) +    return proto.append( +        REGRESSIONS_FOLDER, msg +    ).addCallback( +        lambda r: proto.examine(REGRESSIONS_FOLDER) +    ).addCallback( +        cbAppend, proto, raw +    ).addErrback( +        ebAppend, proto, raw) + + +def cbAppend(result, proto, orig_msg): +    """ +    Fetches the message right after an append. +    """ +    # XXX keep account of highest UID +    uid = "1:*" + +    return proto.fetchSpecific( +        '%s' % uid, +        headerType='', +        headerArgs=['BODY.PEEK[]'], +    ).addCallback( +        cbCompareMessage, proto, orig_msg +    ).addErrback(ebAppend, proto, orig_msg) + + +def ebAppend(failure, proto, raw): +    """ +    Errorback for the append operation +    """ +    print "ERROR WHILE APPENDING!" +    print failure.getTraceback() + + +def cbPickMessage(result, proto): +    """ +    Pick a message. +    """ +    return proto.fetchSpecific( +        '%s' % result, +        headerType='', +        headerArgs=['BODY.PEEK[]'], +        ).addCallback(cbCompareMessage, proto) + + +def cbCompareMessage(result, proto, raw): +    """ +    Display message and compare it with the original one. +    """ +    parts_orig = get_msg_parts(raw) + +    if result: +        keys = result.keys() +        keys.sort() + +    latest = max(keys) + +    fetched_msg = result[latest][0][2] +    parts_fetched = get_msg_parts(fetched_msg) + +    equal = compare_msg_parts( +        parts_orig, +        parts_fetched) + +    if equal: +        print "[+] MESSAGES MATCH" +        return cbAppendNextMessage(proto) +    else: +        print "[-] ERROR: MESSAGES DO NOT MATCH !!!" +        print "    ABORTING COMPARISON..." +        # FIXME logout and print the subject ... +        return proto.logout() + + +def cbClose(result): +    """ +    Close the connection when we finish everything. +    """ +    from twisted.internet import reactor +    reactor.stop() + + +def main(): +    import glob +    import sys + +    if len(sys.argv) != 4: +        print "Usage: regressions <user> <pass> <samples-folder>" +        sys.exit() + +    hostname = "localhost" +    port = "1984" +    username = sys.argv[1] +    password = sys.argv[2] + +    samplesdir = sys.argv[3] + +    if not os.path.isdir(samplesdir): +        print ("Could not find samples folder! " +               "Make sure of copying mail_breaker contents there.") +        sys.exit() + +    samples = glob.glob(samplesdir + '/*') + +    global SAMPLES +    SAMPLES = [] +    SAMPLES += samples + +    onConn = defer.Deferred( +    ).addCallback( +        cbServerGreeting, username, password +    ).addErrback( +        ebConnection +    ).addBoth(cbClose) + +    factory = SimpleIMAP4ClientFactory(username, onConn) + +    from twisted.internet import reactor +    reactor.connectTCP(hostname, int(port), factory) +    reactor.run() + + +if __name__ == '__main__': +    main() | 
