summaryrefslogtreecommitdiff
path: root/src/leap/bitmask/mail/imap/tests/regressions_mime_struct
diff options
context:
space:
mode:
Diffstat (limited to 'src/leap/bitmask/mail/imap/tests/regressions_mime_struct')
-rwxr-xr-xsrc/leap/bitmask/mail/imap/tests/regressions_mime_struct461
1 files changed, 0 insertions, 461 deletions
diff --git a/src/leap/bitmask/mail/imap/tests/regressions_mime_struct b/src/leap/bitmask/mail/imap/tests/regressions_mime_struct
deleted file mode 100755
index 0332664..0000000
--- a/src/leap/bitmask/mail/imap/tests/regressions_mime_struct
+++ /dev/null
@@ -1,461 +0,0 @@
-#!/usr/bin/env python
-
-# -*- coding: utf-8 -*-
-# regression_mime_struct
-# Copyright (C) 2014 LEAP
-# Copyright (c) Twisted Matrix Laboratories.
-#
-# This program is free software: you can redistribute it and/or modify
-# it under the terms of the GNU General Public License as published by
-# the Free Software Foundation, either version 3 of the License, or
-# (at your option) any later version.
-#
-# This program is distributed in the hope that it will be useful,
-# but WITHOUT ANY WARRANTY; without even the implied warranty of
-# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
-# GNU General Public License for more details.
-#
-# You should have received a copy of the GNU General Public License
-# along with this program. If not, see <http://www.gnu.org/licenses/>.
-"""
-Simple Regression Tests for checking MIME struct handling using IMAP4 client.
-
-Iterates trough all mails under a given folder and tries to APPEND them to
-the server being tested. After FETCHING the pushed message, it compares
-the received version with the one that was saved, and exits with an error
-code if they do not match.
-"""
-import os
-import StringIO
-import sys
-
-from email.parser import Parser
-
-from twisted.internet import protocol
-from twisted.internet import ssl
-from twisted.internet import defer
-from twisted.internet import stdio
-from twisted.mail import imap4
-from twisted.protocols import basic
-from twisted.python import log
-
-
-REGRESSIONS_FOLDER = os.environ.get(
- "REGRESSIONS_FOLDER", "regressions_test")
-print "[+] Using regressions folder:", REGRESSIONS_FOLDER
-
-parser = Parser()
-
-
-def get_msg_parts(raw):
- """
- Return a representation of the parts of a message suitable for
- comparison.
-
- :param raw: string for the message
- :type raw: str
- """
- m = parser.parsestr(raw)
- return [dict(part.items())
- if part.is_multipart()
- else part.get_payload()
- for part in m.walk()]
-
-
-def compare_msg_parts(a, b):
- """
- Compare two sequences of parts of messages.
-
- :param a: part sequence for message a
- :param b: part sequence for message b
-
- :return: True if both message sequences are equivalent.
- :rtype: bool
- """
- # XXX This could be smarter and show the differences in the
- # different parts when/where they differ.
- #import pprint; pprint.pprint(a[0])
- #import pprint; pprint.pprint(b[0])
-
- def lowerkey(d):
- return dict((k.lower(), v.replace('\r', ''))
- for k, v in d.iteritems())
-
- def eq(x, y):
- # For dicts, we compare a variation with their keys
- # in lowercase, and \r removed from their values
- if all(map(lambda i: isinstance(i, dict), (x, y))):
- x, y = map(lowerkey, (x, y))
- return x == y
-
- compare_vector = map(lambda tup: eq(tup[0], tup[1]), zip(a, b))
- all_match = all(compare_vector)
-
- if not all_match:
- print "PARTS MISMATCH!"
- print "vector: ", compare_vector
- index = compare_vector.index(False)
- from pprint import pprint
- print "Expected:"
- pprint(a[index])
- print ("***")
- print "Found:"
- pprint(b[index])
- print
-
- return all_match
-
-
-def get_fd(string):
- """
- Return a file descriptor with the passed string
- as content.
- """
- fd = StringIO.StringIO()
- fd.write(string)
- fd.seek(0)
- return fd
-
-
-class TrivialPrompter(basic.LineReceiver):
- promptDeferred = None
-
- def prompt(self, msg):
- assert self.promptDeferred is None
- self.display(msg)
- self.promptDeferred = defer.Deferred()
- return self.promptDeferred
-
- def display(self, msg):
- self.transport.write(msg)
-
- def lineReceived(self, line):
- if self.promptDeferred is None:
- return
- d, self.promptDeferred = self.promptDeferred, None
- d.callback(line)
-
-
-class SimpleIMAP4Client(imap4.IMAP4Client):
- """
- A client with callbacks for greeting messages from an IMAP server.
- """
- greetDeferred = None
-
- def serverGreeting(self, caps):
- self.serverCapabilities = caps
- if self.greetDeferred is not None:
- d, self.greetDeferred = self.greetDeferred, None
- d.callback(self)
-
-
-class SimpleIMAP4ClientFactory(protocol.ClientFactory):
- usedUp = False
- protocol = SimpleIMAP4Client
-
- def __init__(self, username, onConn):
- self.ctx = ssl.ClientContextFactory()
-
- self.username = username
- self.onConn = onConn
-
- def buildProtocol(self, addr):
- """
- Initiate the protocol instance. Since we are building a simple IMAP
- client, we don't bother checking what capabilities the server has. We
- just add all the authenticators twisted.mail has. Note: Gmail no
- longer uses any of the methods below, it's been using XOAUTH since
- 2010.
- """
- assert not self.usedUp
- self.usedUp = True
-
- p = self.protocol(self.ctx)
- p.factory = self
- p.greetDeferred = self.onConn
-
- p.registerAuthenticator(imap4.PLAINAuthenticator(self.username))
- p.registerAuthenticator(imap4.LOGINAuthenticator(self.username))
- p.registerAuthenticator(
- imap4.CramMD5ClientAuthenticator(self.username))
-
- return p
-
- def clientConnectionFailed(self, connector, reason):
- d, self.onConn = self.onConn, None
- d.errback(reason)
-
-
-def cbServerGreeting(proto, username, password):
- """
- Initial callback - invoked after the server sends us its greet message.
- """
- # Hook up stdio
- tp = TrivialPrompter()
- stdio.StandardIO(tp)
-
- # And make it easily accessible
- proto.prompt = tp.prompt
- proto.display = tp.display
-
- # Try to authenticate securely
- return proto.authenticate(
- password).addCallback(
- cbAuthentication,
- proto).addErrback(
- ebAuthentication, proto, username, password
- )
-
-
-def ebConnection(reason):
- """
- Fallback error-handler. If anything goes wrong, log it and quit.
- """
- log.startLogging(sys.stdout)
- log.err(reason)
- return reason
-
-
-def cbAuthentication(result, proto):
- """
- Callback after authentication has succeeded.
-
- Lists a bunch of mailboxes.
- """
- return proto.select(
- REGRESSIONS_FOLDER
- ).addCallback(
- cbSelectMbox, proto
- ).addErrback(
- ebSelectMbox, proto, REGRESSIONS_FOLDER)
-
-
-def ebAuthentication(failure, proto, username, password):
- """
- Errback invoked when authentication fails.
-
- If it failed because no SASL mechanisms match, offer the user the choice
- of logging in insecurely.
-
- If you are trying to connect to your Gmail account, you will be here!
- """
- failure.trap(imap4.NoSupportedAuthentication)
- return InsecureLogin(proto, username, password)
-
-
-def InsecureLogin(proto, username, password):
- """
- Raise insecure-login error.
- """
- return proto.login(
- username, password
- ).addCallback(
- cbAuthentication, proto)
-
-
-def cbSelectMbox(result, proto):
- """
- Callback invoked when select command finishes successfully.
-
- If any message is in the test folder, it will flag them as deleted and
- expunge.
- If no messages found, it will start with the APPEND tests.
- """
- print "SELECT: %s EXISTS " % result.get("EXISTS", "??")
-
- if result["EXISTS"] != 0:
- # Flag as deleted, expunge, and do an examine again.
- print "There is mail here, will delete..."
- return cbDeleteAndExpungeTestFolder(proto)
-
- else:
- return cbAppendNextMessage(proto)
-
-
-def ebSelectMbox(failure, proto, folder):
- """
- Errback invoked when the examine command fails.
-
- Creates the folder.
- """
- log.err(failure)
- log.msg("Folder %r does not exist. Creating..." % (folder,))
- return proto.create(folder).addCallback(cbAuthentication, proto)
-
-
-def ebExpunge(failure):
- log.err(failure)
-
-
-def cbDeleteAndExpungeTestFolder(proto):
- """
- Callback invoked fom cbExamineMbox when the number of messages in the
- mailbox is not zero. It flags all messages as deleted and expunge the
- mailbox.
- """
- return proto.setFlags(
- "1:*", ("\\Deleted",)
- ).addCallback(
- lambda r: proto.expunge()
- ).addCallback(
- cbExpunge, proto
- ).addErrback(
- ebExpunge)
-
-
-def cbExpunge(result, proto):
- return proto.select(
- REGRESSIONS_FOLDER
- ).addCallback(
- cbSelectMbox, proto
- ).addErrback(ebSettingDeleted, proto)
-
-
-def ebSettingDeleted(failure, proto):
- """
- Report errors during deletion of messages in the mailbox.
- """
- print failure.getTraceback()
-
-
-def cbAppendNextMessage(proto):
- """
- Appends the next message in the global queue to the test folder.
- """
- # 1. Get the next test message from global tuple.
- try:
- next_sample = SAMPLES.pop()
- except IndexError:
- # we're done!
- return proto.logout()
-
- print "\nAPPEND %s" % (next_sample,)
- raw = open(next_sample).read()
- msg = get_fd(raw)
- return proto.append(
- REGRESSIONS_FOLDER, msg
- ).addCallback(
- lambda r: proto.select(REGRESSIONS_FOLDER)
- ).addCallback(
- cbAppend, proto, raw
- ).addErrback(
- ebAppend, proto, raw)
-
-
-def cbAppend(result, proto, orig_msg):
- """
- Fetches the message right after an append.
- """
- # XXX keep account of highest UID
- uid = "1:*"
-
- return proto.fetchSpecific(
- '%s' % uid,
- headerType='',
- headerArgs=['BODY.PEEK[]'],
- ).addCallback(
- cbCompareMessage, proto, orig_msg
- ).addErrback(ebAppend, proto, orig_msg)
-
-
-def ebAppend(failure, proto, raw):
- """
- Errorback for the append operation
- """
- print "ERROR WHILE APPENDING!"
- print failure.getTraceback()
-
-
-def cbPickMessage(result, proto):
- """
- Pick a message.
- """
- return proto.fetchSpecific(
- '%s' % result,
- headerType='',
- headerArgs=['BODY.PEEK[]'],
- ).addCallback(cbCompareMessage, proto)
-
-
-def cbCompareMessage(result, proto, raw):
- """
- Display message and compare it with the original one.
- """
- parts_orig = get_msg_parts(raw)
-
- if result:
- keys = result.keys()
- keys.sort()
- else:
- print "[-] GOT NO RESULT"
- return proto.logout()
-
- latest = max(keys)
-
- fetched_msg = result[latest][0][2]
- parts_fetched = get_msg_parts(fetched_msg)
-
- equal = compare_msg_parts(
- parts_orig,
- parts_fetched)
-
- if equal:
- print "[+] MESSAGES MATCH"
- return cbAppendNextMessage(proto)
- else:
- print "[-] ERROR: MESSAGES DO NOT MATCH !!!"
- print " ABORTING COMPARISON..."
- # FIXME logout and print the subject ...
- return proto.logout()
-
-
-def cbClose(result):
- """
- Close the connection when we finish everything.
- """
- from twisted.internet import reactor
- reactor.stop()
-
-
-def main():
- import glob
- import sys
-
- if len(sys.argv) != 4:
- print "Usage: regressions <user> <pass> <samples-folder>"
- sys.exit()
-
- hostname = "localhost"
- port = "1984"
- username = sys.argv[1]
- password = sys.argv[2]
-
- samplesdir = sys.argv[3]
-
- if not os.path.isdir(samplesdir):
- print ("Could not find samples folder! "
- "Make sure of copying mail_breaker contents there.")
- sys.exit()
-
- samples = glob.glob(samplesdir + '/*')
-
- global SAMPLES
- SAMPLES = []
- SAMPLES += samples
-
- onConn = defer.Deferred(
- ).addCallback(
- cbServerGreeting, username, password
- ).addErrback(
- ebConnection
- ).addBoth(cbClose)
-
- factory = SimpleIMAP4ClientFactory(username, onConn)
-
- from twisted.internet import reactor
- reactor.connectTCP(hostname, int(port), factory)
- reactor.run()
-
-
-if __name__ == '__main__':
- main()