summaryrefslogtreecommitdiff
path: root/src/leap/bitmask/mail/imap/tests
diff options
context:
space:
mode:
authorKali Kaneko (leap communications) <kali@leap.se>2016-09-01 00:06:52 -0400
committerKali Kaneko (leap communications) <kali@leap.se>2016-09-01 00:06:52 -0400
commitf826bc473a0c50fcf55f4e8609aa07622814f902 (patch)
tree32665c6608c536c3b3db5b3fa504567043171c91 /src/leap/bitmask/mail/imap/tests
parentc74c51f9fc753c6a870f7c14d5fdd10b152e0991 (diff)
[tests] move tests to root folder
Diffstat (limited to 'src/leap/bitmask/mail/imap/tests')
-rw-r--r--src/leap/bitmask/mail/imap/tests/.gitignore1
-rwxr-xr-xsrc/leap/bitmask/mail/imap/tests/getmail344
-rwxr-xr-xsrc/leap/bitmask/mail/imap/tests/imapclient.py207
-rwxr-xr-xsrc/leap/bitmask/mail/imap/tests/regressions_mime_struct461
l---------src/leap/bitmask/mail/imap/tests/rfc822.message1
l---------src/leap/bitmask/mail/imap/tests/rfc822.multi-minimal.message1
l---------src/leap/bitmask/mail/imap/tests/rfc822.multi-nested.message1
l---------src/leap/bitmask/mail/imap/tests/rfc822.multi-signed.message1
l---------src/leap/bitmask/mail/imap/tests/rfc822.multi.message1
l---------src/leap/bitmask/mail/imap/tests/rfc822.plain.message1
-rwxr-xr-xsrc/leap/bitmask/mail/imap/tests/stress_tests_imap.zsh178
-rw-r--r--src/leap/bitmask/mail/imap/tests/test_imap.py1060
-rw-r--r--src/leap/bitmask/mail/imap/tests/walktree.py127
13 files changed, 0 insertions, 2384 deletions
diff --git a/src/leap/bitmask/mail/imap/tests/.gitignore b/src/leap/bitmask/mail/imap/tests/.gitignore
deleted file mode 100644
index 60baa9cb..00000000
--- a/src/leap/bitmask/mail/imap/tests/.gitignore
+++ /dev/null
@@ -1 +0,0 @@
-data/*
diff --git a/src/leap/bitmask/mail/imap/tests/getmail b/src/leap/bitmask/mail/imap/tests/getmail
deleted file mode 100755
index dd3fa0bb..00000000
--- a/src/leap/bitmask/mail/imap/tests/getmail
+++ /dev/null
@@ -1,344 +0,0 @@
-#!/usr/bin/env python
-
-# Copyright (c) Twisted Matrix Laboratories.
-# See LICENSE in twisted for details.
-
-# Modifications by LEAP Developers 2014 to fit
-# Bitmask configuration settings.
-"""
-Simple IMAP4 client which displays the subjects of all messages in a
-particular mailbox.
-"""
-
-import os
-import sys
-
-from twisted.internet import protocol
-from twisted.internet import ssl
-from twisted.internet import defer
-from twisted.internet import stdio
-from twisted.mail import imap4
-from twisted.protocols import basic
-from twisted.python import log
-
-# Global options stored here from main
-_opts = {}
-
-
-class TrivialPrompter(basic.LineReceiver):
- from os import linesep as delimiter
-
- promptDeferred = None
-
- def prompt(self, msg):
- assert self.promptDeferred is None
- self.display(msg)
- self.promptDeferred = defer.Deferred()
- return self.promptDeferred
-
- def display(self, msg):
- self.transport.write(msg)
-
- def lineReceived(self, line):
- if self.promptDeferred is None:
- return
- d, self.promptDeferred = self.promptDeferred, None
- d.callback(line)
-
-
-class SimpleIMAP4Client(imap4.IMAP4Client):
- """
- A client with callbacks for greeting messages from an IMAP server.
- """
- greetDeferred = None
-
- def serverGreeting(self, caps):
- self.serverCapabilities = caps
- if self.greetDeferred is not None:
- d, self.greetDeferred = self.greetDeferred, None
- d.callback(self)
-
-
-class SimpleIMAP4ClientFactory(protocol.ClientFactory):
- usedUp = False
-
- protocol = SimpleIMAP4Client
-
- def __init__(self, username, onConn):
- self.ctx = ssl.ClientContextFactory()
-
- self.username = username
- self.onConn = onConn
-
- def buildProtocol(self, addr):
- """
- Initiate the protocol instance. Since we are building a simple IMAP
- client, we don't bother checking what capabilities the server has. We
- just add all the authenticators twisted.mail has.
- """
- assert not self.usedUp
- self.usedUp = True
-
- p = self.protocol(self.ctx)
- p.factory = self
- p.greetDeferred = self.onConn
-
- p.registerAuthenticator(imap4.PLAINAuthenticator(self.username))
- p.registerAuthenticator(imap4.LOGINAuthenticator(self.username))
- p.registerAuthenticator(
- imap4.CramMD5ClientAuthenticator(self.username))
-
- return p
-
- def clientConnectionFailed(self, connector, reason):
- d, self.onConn = self.onConn, None
- d.errback(reason)
-
-
-def cbServerGreeting(proto, username, password):
- """
- Initial callback - invoked after the server sends us its greet message.
- """
- # Hook up stdio
- tp = TrivialPrompter()
- stdio.StandardIO(tp)
-
- # And make it easily accessible
- proto.prompt = tp.prompt
- proto.display = tp.display
-
- # Try to authenticate securely
- return proto.authenticate(
- password).addCallback(
- cbAuthentication,
- proto).addErrback(
- ebAuthentication, proto, username, password
- )
-
-
-def ebConnection(reason):
- """
- Fallback error-handler. If anything goes wrong, log it and quit.
- """
- log.startLogging(sys.stdout)
- log.err(reason)
- return reason
-
-
-def cbAuthentication(result, proto):
- """
- Callback after authentication has succeeded.
-
- Lists a bunch of mailboxes.
- """
- return proto.list("", "*"
- ).addCallback(cbMailboxList, proto
- )
-
-
-def ebAuthentication(failure, proto, username, password):
- """
- Errback invoked when authentication fails.
-
- If it failed because no SASL mechanisms match, offer the user the choice
- of logging in insecurely.
-
- If you are trying to connect to your Gmail account, you will be here!
- """
- failure.trap(imap4.NoSupportedAuthentication)
- return InsecureLogin(proto, username, password)
-
-
-def InsecureLogin(proto, username, password):
- """
- insecure-login.
- """
- return proto.login(username, password
- ).addCallback(cbAuthentication, proto
- )
-
-
-def cbMailboxList(result, proto):
- """
- Callback invoked when a list of mailboxes has been retrieved.
- If we have a selected mailbox in the global options, we directly pick it.
- Otherwise, we offer a prompt to let user choose one.
- """
- all_mbox_list = [e[2] for e in result]
- s = '\n'.join(['%d. %s' % (n + 1, m) for (n, m) in zip(range(len(all_mbox_list)), all_mbox_list)])
- if not s:
- return defer.fail(Exception("No mailboxes exist on server!"))
-
- selected_mailbox = _opts.get('mailbox')
-
- if not selected_mailbox:
- return proto.prompt(s + "\nWhich mailbox? [1] "
- ).addCallback(cbPickMailbox, proto, all_mbox_list
- )
- else:
- mboxes_lower = map(lambda s: s.lower(), all_mbox_list)
- index = mboxes_lower.index(selected_mailbox.lower()) + 1
- return cbPickMailbox(index, proto, all_mbox_list)
-
-
-def cbPickMailbox(result, proto, mboxes):
- """
- When the user selects a mailbox, "examine" it.
- """
- mbox = mboxes[int(result or '1') - 1]
- return proto.examine(mbox
- ).addCallback(cbExamineMbox, proto
- )
-
-
-def cbExamineMbox(result, proto):
- """
- Callback invoked when examine command completes.
-
- Retrieve the subject header of every message in the mailbox.
- """
- return proto.fetchSpecific('1:*',
- headerType='HEADER.FIELDS',
- headerArgs=['SUBJECT'],
- ).addCallback(cbFetch, proto,
- )
-
-
-def cbFetch(result, proto):
- """
- Display a listing of the messages in the mailbox, based on the collected
- headers.
- """
- selected_subject = _opts.get('subject', None)
- index = None
-
- if result:
- keys = result.keys()
- keys.sort()
-
- if selected_subject:
- for k in keys:
- # remove 'Subject: ' preffix plus eol
- subject = result[k][0][2][9:].rstrip('\r\n')
- if subject.lower() == selected_subject.lower():
- index = k
- break
- else:
- for k in keys:
- proto.display('%s %s' % (k, result[k][0][2]))
- else:
- print "Hey, an empty mailbox!"
-
- if not index:
- return proto.prompt("\nWhich message? [1] (Q quits) "
- ).addCallback(cbPickMessage, proto)
- else:
- return cbPickMessage(index, proto)
-
-
-def cbPickMessage(result, proto):
- """
- Pick a message.
- """
- if result == "Q":
- print "Bye!"
- return proto.logout()
-
- return proto.fetchSpecific(
- '%s' % result,
- headerType='',
- headerArgs=['BODY.PEEK[]'],
- ).addCallback(cbShowmessage, proto)
-
-
-def cbShowmessage(result, proto):
- """
- Display message.
- """
- if result:
- keys = result.keys()
- keys.sort()
- for k in keys:
- proto.display('%s %s' % (k, result[k][0][2]))
- else:
- print "Hey, an empty message!"
-
- return proto.logout()
-
-
-def cbClose(result):
- """
- Close the connection when we finish everything.
- """
- from twisted.internet import reactor
- reactor.stop()
-
-
-def main():
- import argparse
- import ConfigParser
- import sys
- from twisted.internet import reactor
-
- description = (
- 'Get messages from a LEAP IMAP Proxy.\nThis is a '
- 'debugging tool, do not use this to retrieve any sensitive '
- 'information, or we will send ninjas to your house!')
- epilog = (
- 'In case you want to automate the usage of this utility '
- 'you can place your credentials in a file pointed by '
- 'BITMASK_CREDENTIALS. You need to have a [Credentials] '
- 'section, with username=<user@provider> and password fields')
-
- parser = argparse.ArgumentParser(description=description, epilog=epilog)
- credentials = os.environ.get('BITMASK_CREDENTIALS')
-
- if credentials:
- try:
- config = ConfigParser.ConfigParser()
- config.read(credentials)
- username = config.get('Credentials', 'username')
- password = config.get('Credentials', 'password')
- except Exception, e:
- print "Error reading credentials file: {0}".format(e)
- sys.exit()
- else:
- parser.add_argument('username', type=str)
- parser.add_argument('password', type=str)
-
- parser.add_argument('--mailbox', dest='mailbox', default=None,
- help='Which mailbox to retrieve. Empty for interactive prompt.')
- parser.add_argument('--subject', dest='subject', default=None,
- help='A subject for retrieve a mail that matches. Empty for interactive prompt.')
-
- ns = parser.parse_args()
-
- if not credentials:
- username = ns.username
- password = ns.password
-
- _opts['mailbox'] = ns.mailbox
- _opts['subject'] = ns.subject
-
- hostname = "localhost"
- port = "1984"
-
- onConn = defer.Deferred(
- ).addCallback(cbServerGreeting, username, password
- ).addErrback(ebConnection
- ).addBoth(cbClose)
-
- factory = SimpleIMAP4ClientFactory(username, onConn)
-
- if port == '993':
- reactor.connectSSL(
- hostname, int(port), factory, ssl.ClientContextFactory())
- else:
- if not port:
- port = 143
- reactor.connectTCP(hostname, int(port), factory)
- reactor.run()
-
-
-if __name__ == '__main__':
- main()
diff --git a/src/leap/bitmask/mail/imap/tests/imapclient.py b/src/leap/bitmask/mail/imap/tests/imapclient.py
deleted file mode 100755
index c353ceed..00000000
--- a/src/leap/bitmask/mail/imap/tests/imapclient.py
+++ /dev/null
@@ -1,207 +0,0 @@
-#!/usr/bin/env python
-
-# Copyright (c) Twisted Matrix Laboratories.
-# See LICENSE for details.
-
-
-"""
-Simple IMAP4 client which connects to our custome
-IMAP4 server: imapserver.py.
-"""
-
-import sys
-
-from twisted.internet import protocol
-from twisted.internet import defer
-from twisted.internet import stdio
-from twisted.mail import imap4
-from twisted.protocols import basic
-from twisted.python import util
-from twisted.python import log
-
-
-class TrivialPrompter(basic.LineReceiver):
- # from os import linesep as delimiter
-
- promptDeferred = None
-
- def prompt(self, msg):
- assert self.promptDeferred is None
- self.display(msg)
- self.promptDeferred = defer.Deferred()
- return self.promptDeferred
-
- def display(self, msg):
- self.transport.write(msg)
-
- def lineReceived(self, line):
- if self.promptDeferred is None:
- return
- d, self.promptDeferred = self.promptDeferred, None
- d.callback(line)
-
-
-class SimpleIMAP4Client(imap4.IMAP4Client):
-
- """
- Add callbacks when the client receives greeting messages from
- an IMAP server.
- """
- greetDeferred = None
-
- def serverGreeting(self, caps):
- self.serverCapabilities = caps
- if self.greetDeferred is not None:
- d, self.greetDeferred = self.greetDeferred, None
- d.callback(self)
-
-
-class SimpleIMAP4ClientFactory(protocol.ClientFactory):
- usedUp = False
- protocol = SimpleIMAP4Client
-
- def __init__(self, username, onConn):
- self.username = username
- self.onConn = onConn
-
- def buildProtocol(self, addr):
- assert not self.usedUp
- self.usedUp = True
-
- p = self.protocol()
- p.factory = self
- p.greetDeferred = self.onConn
-
- p.registerAuthenticator(imap4.PLAINAuthenticator(self.username))
- p.registerAuthenticator(imap4.LOGINAuthenticator(self.username))
- p.registerAuthenticator(
- imap4.CramMD5ClientAuthenticator(self.username))
-
- return p
-
- def clientConnectionFailed(self, connector, reason):
- d, self.onConn = self.onConn, None
- d.errback(reason)
-
-
-def cbServerGreeting(proto, username, password):
- """
- Initial callback - invoked after the server sends us its greet message.
- """
- # Hook up stdio
- tp = TrivialPrompter()
- stdio.StandardIO(tp)
-
- # And make it easily accessible
- proto.prompt = tp.prompt
- proto.display = tp.display
-
- # Try to authenticate securely
- return proto.authenticate(
- password).addCallback(
- cbAuthentication, proto).addErrback(
- ebAuthentication, proto, username, password)
-
-
-def ebConnection(reason):
- """
- Fallback error-handler. If anything goes wrong, log it and quit.
- """
- log.startLogging(sys.stdout)
- log.err(reason)
- return reason
-
-
-def cbAuthentication(result, proto):
- """
- Callback after authentication has succeeded.
- List a bunch of mailboxes.
- """
- return proto.list("", "*"
- ).addCallback(cbMailboxList, proto
- )
-
-
-def ebAuthentication(failure, proto, username, password):
- """
- Errback invoked when authentication fails.
- If it failed because no SASL mechanisms match, offer the user the choice
- of logging in insecurely.
- If you are trying to connect to your Gmail account, you will be here!
- """
- failure.trap(imap4.NoSupportedAuthentication)
- return proto.prompt(
- "No secure authentication available. Login insecurely? (y/N) "
- ).addCallback(cbInsecureLogin, proto, username, password
- )
-
-
-def cbInsecureLogin(result, proto, username, password):
- """
- Callback for "insecure-login" prompt.
- """
- if result.lower() == "y":
- # If they said yes, do it.
- return proto.login(username, password
- ).addCallback(cbAuthentication, proto
- )
- return defer.fail(Exception("Login failed for security reasons."))
-
-
-def cbMailboxList(result, proto):
- """
- Callback invoked when a list of mailboxes has been retrieved.
- """
- result = [e[2] for e in result]
- s = '\n'.join(
- ['%d. %s' % (n + 1, m) for (n, m) in zip(range(len(result)), result)])
- if not s:
- return defer.fail(Exception("No mailboxes exist on server!"))
- return proto.prompt(s + "\nWhich mailbox? [1] "
- ).addCallback(cbPickMailbox, proto, result
- )
-
-
-def cbPickMailbox(result, proto, mboxes):
- """
- When the user selects a mailbox, "examine" it.
- """
- mbox = mboxes[int(result or '1') - 1]
- return proto.status(mbox, 'MESSAGES', 'UNSEEN'
- ).addCallback(cbMboxStatus, proto)
-
-
-def cbMboxStatus(result, proto):
- print "You have %s messages (%s unseen)!" % (
- result['MESSAGES'], result['UNSEEN'])
- return proto.logout()
-
-
-def cbClose(result):
- """
- Close the connection when we finish everything.
- """
- from twisted.internet import reactor
- reactor.stop()
-
-
-def main():
- hostname = raw_input('IMAP4 Server Hostname: ')
- port = raw_input('IMAP4 Server Port (the default is 143): ')
- username = raw_input('IMAP4 Username: ')
- password = util.getPassword('IMAP4 Password: ')
-
- onConn = defer.Deferred(
- ).addCallback(cbServerGreeting, username, password
- ).addErrback(ebConnection
- ).addBoth(cbClose)
-
- factory = SimpleIMAP4ClientFactory(username, onConn)
-
- from twisted.internet import reactor
- conn = reactor.connectTCP(hostname, int(port), factory)
- reactor.run()
-
-
-if __name__ == '__main__':
- main()
diff --git a/src/leap/bitmask/mail/imap/tests/regressions_mime_struct b/src/leap/bitmask/mail/imap/tests/regressions_mime_struct
deleted file mode 100755
index 03326646..00000000
--- a/src/leap/bitmask/mail/imap/tests/regressions_mime_struct
+++ /dev/null
@@ -1,461 +0,0 @@
-#!/usr/bin/env python
-
-# -*- coding: utf-8 -*-
-# regression_mime_struct
-# Copyright (C) 2014 LEAP
-# Copyright (c) Twisted Matrix Laboratories.
-#
-# This program is free software: you can redistribute it and/or modify
-# it under the terms of the GNU General Public License as published by
-# the Free Software Foundation, either version 3 of the License, or
-# (at your option) any later version.
-#
-# This program is distributed in the hope that it will be useful,
-# but WITHOUT ANY WARRANTY; without even the implied warranty of
-# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
-# GNU General Public License for more details.
-#
-# You should have received a copy of the GNU General Public License
-# along with this program. If not, see <http://www.gnu.org/licenses/>.
-"""
-Simple Regression Tests for checking MIME struct handling using IMAP4 client.
-
-Iterates trough all mails under a given folder and tries to APPEND them to
-the server being tested. After FETCHING the pushed message, it compares
-the received version with the one that was saved, and exits with an error
-code if they do not match.
-"""
-import os
-import StringIO
-import sys
-
-from email.parser import Parser
-
-from twisted.internet import protocol
-from twisted.internet import ssl
-from twisted.internet import defer
-from twisted.internet import stdio
-from twisted.mail import imap4
-from twisted.protocols import basic
-from twisted.python import log
-
-
-REGRESSIONS_FOLDER = os.environ.get(
- "REGRESSIONS_FOLDER", "regressions_test")
-print "[+] Using regressions folder:", REGRESSIONS_FOLDER
-
-parser = Parser()
-
-
-def get_msg_parts(raw):
- """
- Return a representation of the parts of a message suitable for
- comparison.
-
- :param raw: string for the message
- :type raw: str
- """
- m = parser.parsestr(raw)
- return [dict(part.items())
- if part.is_multipart()
- else part.get_payload()
- for part in m.walk()]
-
-
-def compare_msg_parts(a, b):
- """
- Compare two sequences of parts of messages.
-
- :param a: part sequence for message a
- :param b: part sequence for message b
-
- :return: True if both message sequences are equivalent.
- :rtype: bool
- """
- # XXX This could be smarter and show the differences in the
- # different parts when/where they differ.
- #import pprint; pprint.pprint(a[0])
- #import pprint; pprint.pprint(b[0])
-
- def lowerkey(d):
- return dict((k.lower(), v.replace('\r', ''))
- for k, v in d.iteritems())
-
- def eq(x, y):
- # For dicts, we compare a variation with their keys
- # in lowercase, and \r removed from their values
- if all(map(lambda i: isinstance(i, dict), (x, y))):
- x, y = map(lowerkey, (x, y))
- return x == y
-
- compare_vector = map(lambda tup: eq(tup[0], tup[1]), zip(a, b))
- all_match = all(compare_vector)
-
- if not all_match:
- print "PARTS MISMATCH!"
- print "vector: ", compare_vector
- index = compare_vector.index(False)
- from pprint import pprint
- print "Expected:"
- pprint(a[index])
- print ("***")
- print "Found:"
- pprint(b[index])
- print
-
- return all_match
-
-
-def get_fd(string):
- """
- Return a file descriptor with the passed string
- as content.
- """
- fd = StringIO.StringIO()
- fd.write(string)
- fd.seek(0)
- return fd
-
-
-class TrivialPrompter(basic.LineReceiver):
- promptDeferred = None
-
- def prompt(self, msg):
- assert self.promptDeferred is None
- self.display(msg)
- self.promptDeferred = defer.Deferred()
- return self.promptDeferred
-
- def display(self, msg):
- self.transport.write(msg)
-
- def lineReceived(self, line):
- if self.promptDeferred is None:
- return
- d, self.promptDeferred = self.promptDeferred, None
- d.callback(line)
-
-
-class SimpleIMAP4Client(imap4.IMAP4Client):
- """
- A client with callbacks for greeting messages from an IMAP server.
- """
- greetDeferred = None
-
- def serverGreeting(self, caps):
- self.serverCapabilities = caps
- if self.greetDeferred is not None:
- d, self.greetDeferred = self.greetDeferred, None
- d.callback(self)
-
-
-class SimpleIMAP4ClientFactory(protocol.ClientFactory):
- usedUp = False
- protocol = SimpleIMAP4Client
-
- def __init__(self, username, onConn):
- self.ctx = ssl.ClientContextFactory()
-
- self.username = username
- self.onConn = onConn
-
- def buildProtocol(self, addr):
- """
- Initiate the protocol instance. Since we are building a simple IMAP
- client, we don't bother checking what capabilities the server has. We
- just add all the authenticators twisted.mail has. Note: Gmail no
- longer uses any of the methods below, it's been using XOAUTH since
- 2010.
- """
- assert not self.usedUp
- self.usedUp = True
-
- p = self.protocol(self.ctx)
- p.factory = self
- p.greetDeferred = self.onConn
-
- p.registerAuthenticator(imap4.PLAINAuthenticator(self.username))
- p.registerAuthenticator(imap4.LOGINAuthenticator(self.username))
- p.registerAuthenticator(
- imap4.CramMD5ClientAuthenticator(self.username))
-
- return p
-
- def clientConnectionFailed(self, connector, reason):
- d, self.onConn = self.onConn, None
- d.errback(reason)
-
-
-def cbServerGreeting(proto, username, password):
- """
- Initial callback - invoked after the server sends us its greet message.
- """
- # Hook up stdio
- tp = TrivialPrompter()
- stdio.StandardIO(tp)
-
- # And make it easily accessible
- proto.prompt = tp.prompt
- proto.display = tp.display
-
- # Try to authenticate securely
- return proto.authenticate(
- password).addCallback(
- cbAuthentication,
- proto).addErrback(
- ebAuthentication, proto, username, password
- )
-
-
-def ebConnection(reason):
- """
- Fallback error-handler. If anything goes wrong, log it and quit.
- """
- log.startLogging(sys.stdout)
- log.err(reason)
- return reason
-
-
-def cbAuthentication(result, proto):
- """
- Callback after authentication has succeeded.
-
- Lists a bunch of mailboxes.
- """
- return proto.select(
- REGRESSIONS_FOLDER
- ).addCallback(
- cbSelectMbox, proto
- ).addErrback(
- ebSelectMbox, proto, REGRESSIONS_FOLDER)
-
-
-def ebAuthentication(failure, proto, username, password):
- """
- Errback invoked when authentication fails.
-
- If it failed because no SASL mechanisms match, offer the user the choice
- of logging in insecurely.
-
- If you are trying to connect to your Gmail account, you will be here!
- """
- failure.trap(imap4.NoSupportedAuthentication)
- return InsecureLogin(proto, username, password)
-
-
-def InsecureLogin(proto, username, password):
- """
- Raise insecure-login error.
- """
- return proto.login(
- username, password
- ).addCallback(
- cbAuthentication, proto)
-
-
-def cbSelectMbox(result, proto):
- """
- Callback invoked when select command finishes successfully.
-
- If any message is in the test folder, it will flag them as deleted and
- expunge.
- If no messages found, it will start with the APPEND tests.
- """
- print "SELECT: %s EXISTS " % result.get("EXISTS", "??")
-
- if result["EXISTS"] != 0:
- # Flag as deleted, expunge, and do an examine again.
- print "There is mail here, will delete..."
- return cbDeleteAndExpungeTestFolder(proto)
-
- else:
- return cbAppendNextMessage(proto)
-
-
-def ebSelectMbox(failure, proto, folder):
- """
- Errback invoked when the examine command fails.
-
- Creates the folder.
- """
- log.err(failure)
- log.msg("Folder %r does not exist. Creating..." % (folder,))
- return proto.create(folder).addCallback(cbAuthentication, proto)
-
-
-def ebExpunge(failure):
- log.err(failure)
-
-
-def cbDeleteAndExpungeTestFolder(proto):
- """
- Callback invoked fom cbExamineMbox when the number of messages in the
- mailbox is not zero. It flags all messages as deleted and expunge the
- mailbox.
- """
- return proto.setFlags(
- "1:*", ("\\Deleted",)
- ).addCallback(
- lambda r: proto.expunge()
- ).addCallback(
- cbExpunge, proto
- ).addErrback(
- ebExpunge)
-
-
-def cbExpunge(result, proto):
- return proto.select(
- REGRESSIONS_FOLDER
- ).addCallback(
- cbSelectMbox, proto
- ).addErrback(ebSettingDeleted, proto)
-
-
-def ebSettingDeleted(failure, proto):
- """
- Report errors during deletion of messages in the mailbox.
- """
- print failure.getTraceback()
-
-
-def cbAppendNextMessage(proto):
- """
- Appends the next message in the global queue to the test folder.
- """
- # 1. Get the next test message from global tuple.
- try:
- next_sample = SAMPLES.pop()
- except IndexError:
- # we're done!
- return proto.logout()
-
- print "\nAPPEND %s" % (next_sample,)
- raw = open(next_sample).read()
- msg = get_fd(raw)
- return proto.append(
- REGRESSIONS_FOLDER, msg
- ).addCallback(
- lambda r: proto.select(REGRESSIONS_FOLDER)
- ).addCallback(
- cbAppend, proto, raw
- ).addErrback(
- ebAppend, proto, raw)
-
-
-def cbAppend(result, proto, orig_msg):
- """
- Fetches the message right after an append.
- """
- # XXX keep account of highest UID
- uid = "1:*"
-
- return proto.fetchSpecific(
- '%s' % uid,
- headerType='',
- headerArgs=['BODY.PEEK[]'],
- ).addCallback(
- cbCompareMessage, proto, orig_msg
- ).addErrback(ebAppend, proto, orig_msg)
-
-
-def ebAppend(failure, proto, raw):
- """
- Errorback for the append operation
- """
- print "ERROR WHILE APPENDING!"
- print failure.getTraceback()
-
-
-def cbPickMessage(result, proto):
- """
- Pick a message.
- """
- return proto.fetchSpecific(
- '%s' % result,
- headerType='',
- headerArgs=['BODY.PEEK[]'],
- ).addCallback(cbCompareMessage, proto)
-
-
-def cbCompareMessage(result, proto, raw):
- """
- Display message and compare it with the original one.
- """
- parts_orig = get_msg_parts(raw)
-
- if result:
- keys = result.keys()
- keys.sort()
- else:
- print "[-] GOT NO RESULT"
- return proto.logout()
-
- latest = max(keys)
-
- fetched_msg = result[latest][0][2]
- parts_fetched = get_msg_parts(fetched_msg)
-
- equal = compare_msg_parts(
- parts_orig,
- parts_fetched)
-
- if equal:
- print "[+] MESSAGES MATCH"
- return cbAppendNextMessage(proto)
- else:
- print "[-] ERROR: MESSAGES DO NOT MATCH !!!"
- print " ABORTING COMPARISON..."
- # FIXME logout and print the subject ...
- return proto.logout()
-
-
-def cbClose(result):
- """
- Close the connection when we finish everything.
- """
- from twisted.internet import reactor
- reactor.stop()
-
-
-def main():
- import glob
- import sys
-
- if len(sys.argv) != 4:
- print "Usage: regressions <user> <pass> <samples-folder>"
- sys.exit()
-
- hostname = "localhost"
- port = "1984"
- username = sys.argv[1]
- password = sys.argv[2]
-
- samplesdir = sys.argv[3]
-
- if not os.path.isdir(samplesdir):
- print ("Could not find samples folder! "
- "Make sure of copying mail_breaker contents there.")
- sys.exit()
-
- samples = glob.glob(samplesdir + '/*')
-
- global SAMPLES
- SAMPLES = []
- SAMPLES += samples
-
- onConn = defer.Deferred(
- ).addCallback(
- cbServerGreeting, username, password
- ).addErrback(
- ebConnection
- ).addBoth(cbClose)
-
- factory = SimpleIMAP4ClientFactory(username, onConn)
-
- from twisted.internet import reactor
- reactor.connectTCP(hostname, int(port), factory)
- reactor.run()
-
-
-if __name__ == '__main__':
- main()
diff --git a/src/leap/bitmask/mail/imap/tests/rfc822.message b/src/leap/bitmask/mail/imap/tests/rfc822.message
deleted file mode 120000
index b19cc280..00000000
--- a/src/leap/bitmask/mail/imap/tests/rfc822.message
+++ /dev/null
@@ -1 +0,0 @@
-../../tests/rfc822.message \ No newline at end of file
diff --git a/src/leap/bitmask/mail/imap/tests/rfc822.multi-minimal.message b/src/leap/bitmask/mail/imap/tests/rfc822.multi-minimal.message
deleted file mode 120000
index e0aa678b..00000000
--- a/src/leap/bitmask/mail/imap/tests/rfc822.multi-minimal.message
+++ /dev/null
@@ -1 +0,0 @@
-../../tests/rfc822.multi-minimal.message \ No newline at end of file
diff --git a/src/leap/bitmask/mail/imap/tests/rfc822.multi-nested.message b/src/leap/bitmask/mail/imap/tests/rfc822.multi-nested.message
deleted file mode 120000
index 306d0dec..00000000
--- a/src/leap/bitmask/mail/imap/tests/rfc822.multi-nested.message
+++ /dev/null
@@ -1 +0,0 @@
-../../tests/rfc822.multi-nested.message \ No newline at end of file
diff --git a/src/leap/bitmask/mail/imap/tests/rfc822.multi-signed.message b/src/leap/bitmask/mail/imap/tests/rfc822.multi-signed.message
deleted file mode 120000
index 4172244e..00000000
--- a/src/leap/bitmask/mail/imap/tests/rfc822.multi-signed.message
+++ /dev/null
@@ -1 +0,0 @@
-../../tests/rfc822.multi-signed.message \ No newline at end of file
diff --git a/src/leap/bitmask/mail/imap/tests/rfc822.multi.message b/src/leap/bitmask/mail/imap/tests/rfc822.multi.message
deleted file mode 120000
index 62057d20..00000000
--- a/src/leap/bitmask/mail/imap/tests/rfc822.multi.message
+++ /dev/null
@@ -1 +0,0 @@
-../../tests/rfc822.multi.message \ No newline at end of file
diff --git a/src/leap/bitmask/mail/imap/tests/rfc822.plain.message b/src/leap/bitmask/mail/imap/tests/rfc822.plain.message
deleted file mode 120000
index 5bab0e8d..00000000
--- a/src/leap/bitmask/mail/imap/tests/rfc822.plain.message
+++ /dev/null
@@ -1 +0,0 @@
-../../tests/rfc822.plain.message \ No newline at end of file
diff --git a/src/leap/bitmask/mail/imap/tests/stress_tests_imap.zsh b/src/leap/bitmask/mail/imap/tests/stress_tests_imap.zsh
deleted file mode 100755
index 544facaa..00000000
--- a/src/leap/bitmask/mail/imap/tests/stress_tests_imap.zsh
+++ /dev/null
@@ -1,178 +0,0 @@
-#!/bin/zsh
-# BATCH STRESS TEST FOR IMAP ----------------------
-# http://imgs.xkcd.com/comics/science.jpg
-#
-# Run imaptest against a LEAP IMAP server
-# for a fixed period of time, and collect output.
-#
-# Author: Kali Kaneko
-# Date: 2014 01 26
-#
-# To run, you need to have `imaptest` in your path.
-# See:
-# http://www.imapwiki.org/ImapTest/Installation
-#
-# For the tests, I'm using a 10MB file sample that
-# can be downloaded from:
-# http://www.dovecot.org/tmp/dovecot-crlf
-#
-# Want to contribute to benchmarking?
-#
-# 1. Create a pristine account in a bitmask provider.
-#
-# 2. Launch your bitmask client, with different flags
-# if you desire.
-#
-# For example to try the nosync flag in sqlite:
-#
-# LEAP_SQLITE_NOSYNC=1 bitmask --debug -N --offline -l /tmp/leap.log
-#
-# 3. Run at several points in time (ie: just after
-# launching the bitmask client. one minute after,
-# ten minutes after)
-#
-# mkdir data
-# cd data
-# ../leap_tests_imap.zsh | tee sqlite_nosync_run2.log
-#
-# 4. Submit your results to: kali at leap dot se
-# together with the logs of the bitmask run.
-#
-# Please provide also details about your system, and
-# the type of hard disk setup you are running against.
-#
-
-# ------------------------------------------------
-# Edit these variables if you are too lazy to pass
-# the user and mbox as parameters. Like me.
-
-USER="test_f14@dev.bitmask.net"
-MBOX="~/leap/imaptest/data/dovecot-crlf"
-
-HOST="localhost"
-PORT="1984"
-
-# in case you have it aliased
-GREP="/bin/grep"
-IMAPTEST="imaptest"
-
-# -----------------------------------------------
-#
-# These should be kept constant across benchmarking
-# runs across different machines, for comparability.
-
-DURATION=200
-NUM_MSG=200
-
-
-# TODO add another function, and a cli flag, to be able
-# to take several aggretates spaced in time, along a period
-# of several minutes.
-
-imaptest_cmd() {
- stdbuf -o0 ${IMAPTEST} user=${USER} pass=1234 host=${HOST} \
- port=${PORT} mbox=${MBOX} clients=1 msgs=${NUM_MSG} \
- no_pipelining 2>/dev/null
-}
-
-stress_imap() {
- mkfifo imap_pipe
- cat imap_pipe | tee output &
- imaptest_cmd >> imap_pipe
-}
-
-wait_and_kill() {
- while :
- do
- sleep $DURATION
- pkill -2 imaptest
- rm imap_pipe
- break
- done
-}
-
-print_results() {
- sleep 1
- echo
- echo
- echo "AGGREGATED RESULTS"
- echo "----------------------"
- echo "\tavg\tstdev"
- $GREP "avg" ./output | sed -e 's/^ *//g' -e 's/ *$//g' | \
- gawk '
-function avg(data, count) {
- sum=0;
- for( x=0; x <= count-1; x++) {
- sum += data[x];
- }
- return sum/count;
-}
-function std_dev(data, count) {
- sum=0;
- for( x=0; x <= count-1; x++) {
- sum += data[x];
- }
- average = sum/count;
-
- sumsq=0;
- for( x=0; x <= count-1; x++) {
- sumsq += (data[x] - average)^2;
- }
- return sqrt(sumsq/count);
-}
-BEGIN {
- cnt = 0
-} END {
-
-printf("LOGI:\t%04.2lf\t%04.2f\n", avg(array[1], NR), std_dev(array[1], NR));
-printf("LIST:\t%04.2lf\t%04.2f\n", avg(array[2], NR), std_dev(array[2], NR));
-printf("STAT:\t%04.2lf\t%04.2f\n", avg(array[3], NR), std_dev(array[3], NR));
-printf("SELE:\t%04.2lf\t%04.2f\n", avg(array[4], NR), std_dev(array[4], NR));
-printf("FETC:\t%04.2lf\t%04.2f\n", avg(array[5], NR), std_dev(array[5], NR));
-printf("FET2:\t%04.2lf\t%04.2f\n", avg(array[6], NR), std_dev(array[6], NR));
-printf("STOR:\t%04.2lf\t%04.2f\n", avg(array[7], NR), std_dev(array[7], NR));
-printf("DELE:\t%04.2lf\t%04.2f\n", avg(array[8], NR), std_dev(array[8], NR));
-printf("EXPU:\t%04.2lf\t%04.2f\n", avg(array[9], NR), std_dev(array[9], NR));
-printf("APPE:\t%04.2lf\t%04.2f\n", avg(array[10], NR), std_dev(array[10], NR));
-printf("LOGO:\t%04.2lf\t%04.2f\n", avg(array[11], NR), std_dev(array[11], NR));
-
-print ""
-print "TOT samples", NR;
-}
-{
- it = cnt++;
- array[1][it] = $1;
- array[2][it] = $2;
- array[3][it] = $3;
- array[4][it] = $4;
- array[5][it] = $5;
- array[6][it] = $6;
- array[7][it] = $7;
- array[8][it] = $8;
- array[9][it] = $9;
- array[10][it] = $10;
- array[11][it] = $11;
-}'
-}
-
-
-{ test $1 = "--help" } && {
- echo "Usage: $0 [user@provider] [/path/to/sample.mbox]"
- exit 0
-}
-
-# If the first parameter is passed, take it as the user
-{ test $1 } && {
- USER=$1
-}
-
-# If the second parameter is passed, take it as the mbox
-{ test $2 } && {
- MBOX=$2
-}
-
-echo "[+] LEAP IMAP TESTS"
-echo "[+] Running imaptest for $DURATION seconds with $NUM_MSG messages"
-wait_and_kill &
-stress_imap
-print_results
diff --git a/src/leap/bitmask/mail/imap/tests/test_imap.py b/src/leap/bitmask/mail/imap/tests/test_imap.py
deleted file mode 100644
index 9cca17ff..00000000
--- a/src/leap/bitmask/mail/imap/tests/test_imap.py
+++ /dev/null
@@ -1,1060 +0,0 @@
-# -*- coding: utf-8 -*-
-# test_imap.py
-# Copyright (C) 2013 LEAP
-#
-# This program is free software: you can redistribute it and/or modify
-# it under the terms of the GNU General Public License as published by
-# the Free Software Foundation, either version 3 of the License, or
-# (at your option) any later version.
-#
-# This program is distributed in the hope that it will be useful,
-# but WITHOUT ANY WARRANTY; without even the implied warranty of
-# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
-# GNU General Public License for more details.
-#
-# You should have received a copy of the GNU General Public License
-# along with this program. If not, see <http://www.gnu.org/licenses/>.
-"""
-Test case for leap.email.imap.server
-TestCases taken from twisted tests and modified to make them work
-against our implementation of the IMAPAccount.
-
-@authors: Kali Kaneko, <kali@leap.se>
-XXX add authors from the original twisted tests.
-
-@license: GPLv3, see included LICENSE file
-"""
-# XXX review license of the original tests!!!
-import os
-import string
-import types
-
-
-from twisted.mail import imap4
-from twisted.internet import defer
-from twisted.python import util
-from twisted.python import failure
-
-from twisted import cred
-
-from leap.mail.imap.mailbox import IMAPMailbox
-from leap.mail.imap.messages import CaseInsensitiveDict
-from leap.mail.testing.imap import IMAP4HelperMixin
-
-
-TEST_USER = "testuser@leap.se"
-TEST_PASSWD = "1234"
-
-
-def strip(f):
- return lambda result, f=f: f()
-
-
-def sortNest(l):
- l = l[:]
- l.sort()
- for i in range(len(l)):
- if isinstance(l[i], types.ListType):
- l[i] = sortNest(l[i])
- elif isinstance(l[i], types.TupleType):
- l[i] = tuple(sortNest(list(l[i])))
- return l
-
-
-class TestRealm:
- """
- A minimal auth realm for testing purposes only
- """
- theAccount = None
-
- def requestAvatar(self, avatarId, mind, *interfaces):
- return imap4.IAccount, self.theAccount, lambda: None
-
-#
-# TestCases
-#
-
-# DEBUG ---
-# from twisted.internet.base import DelayedCall
-# DelayedCall.debug = True
-
-
-class LEAPIMAP4ServerTestCase(IMAP4HelperMixin):
-
- """
- Tests for the generic behavior of the LEAPIMAP4Server
- which, right now, it's just implemented in this test file as
- LEAPIMAPServer. We will move the implementation, together with
- authentication bits, to leap.mail.imap.server so it can be instantiated
- from the tac file.
-
- Right now this TestCase tries to mimmick as close as possible the
- organization from the twisted.mail.imap tests so we can achieve
- a complete implementation. The order in which they appear reflect
- the intended order of implementation.
- """
-
- #
- # mailboxes operations
- #
-
- def testCreate(self):
- """
- Test whether we can create mailboxes
- """
- succeed = ('testbox', 'test/box', 'test/', 'test/box/box', 'foobox')
- fail = ('testbox', 'test/box')
- acc = self.server.theAccount
-
- def cb():
- self.result.append(1)
-
- def eb(failure):
- self.result.append(0)
-
- def login():
- return self.client.login(TEST_USER, TEST_PASSWD)
-
- def create():
- create_deferreds = []
- for name in succeed + fail:
- d = self.client.create(name)
- d.addCallback(strip(cb)).addErrback(eb)
- create_deferreds.append(d)
- dd = defer.gatherResults(create_deferreds)
- dd.addCallbacks(self._cbStopClient, self._ebGeneral)
- return dd
-
- self.result = []
- d1 = self.connected.addCallback(strip(login))
- d1.addCallback(strip(create))
- d2 = self.loopback()
- d = defer.gatherResults([d1, d2], consumeErrors=True)
- d.addCallback(lambda _: acc.account.list_all_mailbox_names())
- return d.addCallback(self._cbTestCreate, succeed, fail)
-
- def _cbTestCreate(self, mailboxes, succeed, fail):
- self.assertEqual(self.result, [1] * len(succeed) + [0] * len(fail))
-
- answers = ([u'INBOX', u'testbox', u'test/box', u'test',
- u'test/box/box', 'foobox'])
- self.assertEqual(sorted(mailboxes), sorted([a for a in answers]))
-
- def testDelete(self):
- """
- Test whether we can delete mailboxes
- """
- def add_mailbox():
- return self.server.theAccount.addMailbox('test-delete/me')
-
- def login():
- return self.client.login(TEST_USER, TEST_PASSWD)
-
- def delete():
- return self.client.delete('test-delete/me')
-
- acc = self.server.theAccount.account
-
- d1 = self.connected.addCallback(add_mailbox)
- d1.addCallback(strip(login))
- d1.addCallbacks(strip(delete), self._ebGeneral)
- d1.addCallbacks(self._cbStopClient, self._ebGeneral)
- d2 = self.loopback()
- d = defer.gatherResults([d1, d2])
- d.addCallback(lambda _: acc.list_all_mailbox_names())
- d.addCallback(lambda mboxes: self.assertEqual(
- mboxes, ['INBOX']))
- return d
-
- def testIllegalInboxDelete(self):
- """
- Test what happens if we try to delete the user Inbox.
- We expect that operation to fail.
- """
- self.stashed = None
-
- def login():
- return self.client.login(TEST_USER, TEST_PASSWD)
-
- def delete():
- return self.client.delete('inbox')
-
- def stash(result):
- self.stashed = result
-
- d1 = self.connected.addCallback(strip(login))
- d1.addCallbacks(strip(delete), self._ebGeneral)
- d1.addBoth(stash)
- d1.addCallbacks(self._cbStopClient, self._ebGeneral)
- d2 = self.loopback()
- d = defer.gatherResults([d1, d2])
- d.addCallback(lambda _: self.failUnless(isinstance(self.stashed,
- failure.Failure)))
- return d
-
- def testNonExistentDelete(self):
- """
- Test what happens if we try to delete a non-existent mailbox.
- We expect an error raised stating 'No such mailbox'
- """
- def login():
- return self.client.login(TEST_USER, TEST_PASSWD)
-
- def delete():
- return self.client.delete('delete/me')
- self.failure = failure
-
- def deleteFailed(failure):
- self.failure = failure
-
- self.failure = None
- d1 = self.connected.addCallback(strip(login))
- d1.addCallback(strip(delete)).addErrback(deleteFailed)
- d1.addCallbacks(self._cbStopClient, self._ebGeneral)
- d2 = self.loopback()
- d = defer.gatherResults([d1, d2])
- d.addCallback(lambda _: self.assertTrue(
- str(self.failure.value).startswith('No such mailbox')))
- return d
-
- def testIllegalDelete(self):
- """
- Try deleting a mailbox with sub-folders, and \NoSelect flag set.
- An exception is expected.
- """
- acc = self.server.theAccount
-
- def login():
- return self.client.login(TEST_USER, TEST_PASSWD)
-
- def create_mailboxes():
- d1 = acc.addMailbox('delete')
- d2 = acc.addMailbox('delete/me')
- d = defer.gatherResults([d1, d2])
- return d
-
- def get_noselect_mailbox(mboxes):
- mbox = mboxes[0]
- return mbox.setFlags((r'\Noselect',))
-
- def delete_mbox(ignored):
- return self.client.delete('delete')
-
- def deleteFailed(failure):
- self.failure = failure
-
- self.failure = None
-
- d1 = self.connected.addCallback(strip(login))
- d1.addCallback(strip(create_mailboxes))
- d1.addCallback(get_noselect_mailbox)
-
- d1.addCallback(delete_mbox).addErrback(deleteFailed)
- d1.addCallbacks(self._cbStopClient, self._ebGeneral)
- d2 = self.loopback()
- d = defer.gatherResults([d1, d2])
- expected = ("Hierarchically inferior mailboxes exist "
- "and \\Noselect is set")
- d.addCallback(lambda _:
- self.assertTrue(self.failure is not None))
- d.addCallback(lambda _:
- self.assertEqual(str(self.failure.value), expected))
- return d
-
- # FIXME --- this test sometimes FAILS (timing issue).
- # Some of the deferreds used in the rename op is not waiting for the
- # operations properly
- def testRename(self):
- """
- Test whether we can rename a mailbox
- """
- def create_mbox():
- return self.server.theAccount.addMailbox('oldmbox')
-
- def login():
- return self.client.login(TEST_USER, TEST_PASSWD)
-
- def rename():
- return self.client.rename('oldmbox', 'newname')
-
- d1 = self.connected.addCallback(strip(create_mbox))
- d1.addCallback(strip(login))
- d1.addCallbacks(strip(rename), self._ebGeneral)
- d1.addCallbacks(self._cbStopClient, self._ebGeneral)
- d2 = self.loopback()
- d = defer.gatherResults([d1, d2])
- d.addCallback(lambda _:
- self.server.theAccount.account.list_all_mailbox_names())
- d.addCallback(lambda mboxes:
- self.assertItemsEqual(mboxes, ['INBOX', 'newname']))
- return d
-
- def testIllegalInboxRename(self):
- """
- Try to rename inbox. We expect it to fail. Then it would be not
- an inbox anymore, would it?
- """
- self.stashed = None
-
- def login():
- return self.client.login(TEST_USER, TEST_PASSWD)
-
- def rename():
- return self.client.rename('inbox', 'frotz')
-
- def stash(stuff):
- self.stashed = stuff
-
- d1 = self.connected.addCallback(strip(login))
- d1.addCallbacks(strip(rename), self._ebGeneral)
- d1.addBoth(stash)
- d1.addCallbacks(self._cbStopClient, self._ebGeneral)
- d2 = self.loopback()
- d = defer.gatherResults([d1, d2])
- d.addCallback(lambda _:
- self.failUnless(isinstance(
- self.stashed, failure.Failure)))
- return d
-
- def testHierarchicalRename(self):
- """
- Try to rename hierarchical mailboxes
- """
- acc = self.server.theAccount
-
- def add_mailboxes():
- return defer.gatherResults([
- acc.addMailbox('oldmbox/m1'),
- acc.addMailbox('oldmbox/m2')])
-
- def login():
- return self.client.login(TEST_USER, TEST_PASSWD)
-
- def rename():
- return self.client.rename('oldmbox', 'newname')
-
- d1 = self.connected.addCallback(strip(add_mailboxes))
- d1.addCallback(strip(login))
- d1.addCallbacks(strip(rename), self._ebGeneral)
- d1.addCallbacks(self._cbStopClient, self._ebGeneral)
- d2 = self.loopback()
- d = defer.gatherResults([d1, d2])
- d.addCallback(lambda _: acc.account.list_all_mailbox_names())
- return d.addCallback(self._cbTestHierarchicalRename)
-
- def _cbTestHierarchicalRename(self, mailboxes):
- expected = ['INBOX', 'newname/m1', 'newname/m2']
- self.assertEqual(sorted(mailboxes), sorted([s for s in expected]))
-
- def testSubscribe(self):
- """
- Test whether we can mark a mailbox as subscribed to
- """
- acc = self.server.theAccount
-
- def add_mailbox():
- return acc.addMailbox('this/mbox')
-
- def login():
- return self.client.login(TEST_USER, TEST_PASSWD)
-
- def subscribe():
- return self.client.subscribe('this/mbox')
-
- def get_subscriptions(ignored):
- return self.server.theAccount.getSubscriptions()
-
- d1 = self.connected.addCallback(strip(add_mailbox))
- d1.addCallback(strip(login))
- d1.addCallbacks(strip(subscribe), self._ebGeneral)
- d1.addCallbacks(self._cbStopClient, self._ebGeneral)
- d2 = self.loopback()
- d = defer.gatherResults([d1, d2])
- d.addCallback(get_subscriptions)
- d.addCallback(lambda subscriptions:
- self.assertEqual(subscriptions,
- ['this/mbox']))
- return d
-
- def testUnsubscribe(self):
- """
- Test whether we can unsubscribe from a set of mailboxes
- """
- acc = self.server.theAccount
-
- def add_mailboxes():
- return defer.gatherResults([
- acc.addMailbox('this/mbox'),
- acc.addMailbox('that/mbox')])
-
- def dc1():
- return acc.subscribe('this/mbox')
-
- def dc2():
- return acc.subscribe('that/mbox')
-
- def login():
- return self.client.login(TEST_USER, TEST_PASSWD)
-
- def unsubscribe():
- return self.client.unsubscribe('this/mbox')
-
- def get_subscriptions(ignored):
- return acc.getSubscriptions()
-
- d1 = self.connected.addCallback(strip(add_mailboxes))
- d1.addCallback(strip(login))
- d1.addCallback(strip(dc1))
- d1.addCallback(strip(dc2))
- d1.addCallbacks(strip(unsubscribe), self._ebGeneral)
- d1.addCallbacks(self._cbStopClient, self._ebGeneral)
- d2 = self.loopback()
- d = defer.gatherResults([d1, d2])
- d.addCallback(get_subscriptions)
- d.addCallback(lambda subscriptions:
- self.assertEqual(subscriptions,
- ['that/mbox']))
- return d
-
- def testSelect(self):
- """
- Try to select a mailbox
- """
- mbox_name = "TESTMAILBOXSELECT"
- self.selectedArgs = None
-
- acc = self.server.theAccount
-
- def add_mailbox():
- return acc.addMailbox(mbox_name, creation_ts=42)
-
- def login():
- return self.client.login(TEST_USER, TEST_PASSWD)
-
- def select():
- def selected(args):
- self.selectedArgs = args
- self._cbStopClient(None)
- d = self.client.select(mbox_name)
- d.addCallback(selected)
- return d
-
- d1 = self.connected.addCallback(strip(add_mailbox))
- d1.addCallback(strip(login))
- d1.addCallback(strip(select))
- # d1.addErrback(self._ebGeneral)
-
- d2 = self.loopback()
-
- d = defer.gatherResults([d1, d2])
- d.addCallback(self._cbTestSelect)
- return d
-
- def _cbTestSelect(self, ignored):
- self.assertTrue(self.selectedArgs is not None)
-
- self.assertEqual(self.selectedArgs, {
- 'EXISTS': 0, 'RECENT': 0, 'UIDVALIDITY': 42,
- 'FLAGS': ('\\Seen', '\\Answered', '\\Flagged',
- '\\Deleted', '\\Draft', '\\Recent', 'List'),
- 'READ-WRITE': True
- })
-
- #
- # capabilities
- #
-
- def testCapability(self):
- caps = {}
-
- def getCaps():
- def gotCaps(c):
- caps.update(c)
- self.server.transport.loseConnection()
- return self.client.getCapabilities().addCallback(gotCaps)
-
- d1 = self.connected
- d1.addCallback(
- strip(getCaps)).addErrback(self._ebGeneral)
-
- d = defer.gatherResults([self.loopback(), d1])
- expected = {'IMAP4rev1': None, 'NAMESPACE': None, 'LITERAL+': None,
- 'IDLE': None}
- d.addCallback(lambda _: self.assertEqual(expected, caps))
- return d
-
- def testCapabilityWithAuth(self):
- caps = {}
- self.server.challengers[
- 'CRAM-MD5'] = cred.credentials.CramMD5Credentials
-
- def getCaps():
- def gotCaps(c):
- caps.update(c)
- self.server.transport.loseConnection()
- return self.client.getCapabilities().addCallback(gotCaps)
- d1 = self.connected.addCallback(
- strip(getCaps)).addErrback(self._ebGeneral)
-
- d = defer.gatherResults([self.loopback(), d1])
-
- expCap = {'IMAP4rev1': None, 'NAMESPACE': None,
- 'IDLE': None, 'LITERAL+': None,
- 'AUTH': ['CRAM-MD5']}
-
- d.addCallback(lambda _: self.assertEqual(expCap, caps))
- return d
-
- #
- # authentication
- #
-
- def testLogout(self):
- """
- Test log out
- """
- self.loggedOut = 0
-
- def logout():
- def setLoggedOut():
- self.loggedOut = 1
- self.client.logout().addCallback(strip(setLoggedOut))
- self.connected.addCallback(strip(logout)).addErrback(self._ebGeneral)
- d = self.loopback()
- return d.addCallback(lambda _: self.assertEqual(self.loggedOut, 1))
-
- def testNoop(self):
- """
- Test noop command
- """
- self.responses = None
-
- def noop():
- def setResponses(responses):
- self.responses = responses
- self.server.transport.loseConnection()
- self.client.noop().addCallback(setResponses)
- self.connected.addCallback(strip(noop)).addErrback(self._ebGeneral)
- d = self.loopback()
- return d.addCallback(lambda _: self.assertEqual(self.responses, []))
-
- def testLogin(self):
- """
- Test login
- """
- def login():
- d = self.client.login(TEST_USER, TEST_PASSWD)
- d.addCallback(self._cbStopClient)
- d1 = self.connected.addCallback(
- strip(login)).addErrback(self._ebGeneral)
- d = defer.gatherResults([d1, self.loopback()])
- return d.addCallback(self._cbTestLogin)
-
- def _cbTestLogin(self, ignored):
- self.assertEqual(self.server.state, 'auth')
-
- def testFailedLogin(self):
- """
- Test bad login
- """
- def login():
- d = self.client.login("bad_user@leap.se", TEST_PASSWD)
- d.addBoth(self._cbStopClient)
-
- d1 = self.connected.addCallback(
- strip(login)).addErrback(self._ebGeneral)
- d2 = self.loopback()
- d = defer.gatherResults([d1, d2])
- return d.addCallback(self._cbTestFailedLogin)
-
- def _cbTestFailedLogin(self, ignored):
- self.assertEqual(self.server.state, 'unauth')
- self.assertEqual(self.server.account, None)
-
- def testLoginRequiringQuoting(self):
- """
- Test login requiring quoting
- """
- self.server.checker.userid = '{test}user@leap.se'
- self.server.checker.password = '{test}password'
-
- def login():
- d = self.client.login('{test}user@leap.se', '{test}password')
- d.addBoth(self._cbStopClient)
-
- d1 = self.connected.addCallback(
- strip(login)).addErrback(self._ebGeneral)
- d = defer.gatherResults([self.loopback(), d1])
- return d.addCallback(self._cbTestLoginRequiringQuoting)
-
- def _cbTestLoginRequiringQuoting(self, ignored):
- self.assertEqual(self.server.state, 'auth')
-
- #
- # Inspection
- #
-
- def testNamespace(self):
- """
- Test retrieving namespace
- """
- self.namespaceArgs = None
-
- def login():
- return self.client.login(TEST_USER, TEST_PASSWD)
-
- def namespace():
- def gotNamespace(args):
- self.namespaceArgs = args
- self._cbStopClient(None)
- return self.client.namespace().addCallback(gotNamespace)
-
- d1 = self.connected.addCallback(strip(login))
- d1.addCallback(strip(namespace))
- d1.addErrback(self._ebGeneral)
- d2 = self.loopback()
- d = defer.gatherResults([d1, d2])
- d.addCallback(lambda _: self.assertEqual(self.namespaceArgs,
- [[['', '/']], [], []]))
- return d
-
- def testExamine(self):
- """
- L{IMAP4Client.examine} issues an I{EXAMINE} command to the server and
- returns a L{Deferred} which fires with a C{dict} with as many of the
- following keys as the server includes in its response: C{'FLAGS'},
- C{'EXISTS'}, C{'RECENT'}, C{'UNSEEN'}, C{'READ-WRITE'}, C{'READ-ONLY'},
- C{'UIDVALIDITY'}, and C{'PERMANENTFLAGS'}.
-
- Unfortunately the server doesn't generate all of these so it's hard to
- test the client's handling of them here. See
- L{IMAP4ClientExamineTests} below.
-
- See U{RFC 3501<http://www.faqs.org/rfcs/rfc3501.html>}, section 6.3.2,
- for details.
- """
- # TODO implement the IMAP4ClientExamineTests testcase.
- mbox_name = "test_mailbox_e"
- acc = self.server.theAccount
- self.examinedArgs = None
-
- def add_mailbox():
- return acc.addMailbox(mbox_name, creation_ts=42)
-
- def login():
- return self.client.login(TEST_USER, TEST_PASSWD)
-
- def examine():
- def examined(args):
- self.examinedArgs = args
- self._cbStopClient(None)
- d = self.client.examine(mbox_name)
- d.addCallback(examined)
- return d
-
- d1 = self.connected.addCallback(strip(add_mailbox))
- d1.addCallback(strip(login))
- d1.addCallback(strip(examine))
- d1.addErrback(self._ebGeneral)
- d2 = self.loopback()
- d = defer.gatherResults([d1, d2])
- return d.addCallback(self._cbTestExamine)
-
- def _cbTestExamine(self, ignored):
- self.assertEqual(self.examinedArgs, {
- 'EXISTS': 0, 'RECENT': 0, 'UIDVALIDITY': 42,
- 'FLAGS': ('\\Seen', '\\Answered', '\\Flagged',
- '\\Deleted', '\\Draft', '\\Recent', 'List'),
- 'READ-WRITE': False})
-
- def _listSetup(self, f, f2=None):
-
- acc = self.server.theAccount
-
- def dc1():
- return acc.addMailbox('root_subthing', creation_ts=42)
-
- def dc2():
- return acc.addMailbox('root_another_thing', creation_ts=42)
-
- def dc3():
- return acc.addMailbox('non_root_subthing', creation_ts=42)
-
- def login():
- return self.client.login(TEST_USER, TEST_PASSWD)
-
- def listed(answers):
- self.listed = answers
-
- self.listed = None
- d1 = self.connected.addCallback(strip(login))
- d1.addCallback(strip(dc1))
- d1.addCallback(strip(dc2))
- d1.addCallback(strip(dc3))
-
- if f2 is not None:
- d1.addCallback(f2)
-
- d1.addCallbacks(strip(f), self._ebGeneral)
- d1.addCallbacks(listed, self._ebGeneral)
- d1.addCallbacks(self._cbStopClient, self._ebGeneral)
- d2 = self.loopback()
- return defer.gatherResults([d1, d2]).addCallback(lambda _: self.listed)
-
- def testList(self):
- """
- Test List command
- """
- def list():
- return self.client.list('root', '%')
-
- d = self._listSetup(list)
- d.addCallback(lambda listed: self.assertEqual(
- sortNest(listed),
- sortNest([
- (IMAPMailbox.init_flags, "/", "root_subthing"),
- (IMAPMailbox.init_flags, "/", "root_another_thing")
- ])
- ))
- return d
-
- def testLSub(self):
- """
- Test LSub command
- """
- acc = self.server.theAccount
-
- def subs_mailbox():
- # why not client.subscribe instead?
- return acc.subscribe('root_subthing')
-
- def lsub():
- return self.client.lsub('root', '%')
-
- d = self._listSetup(lsub, strip(subs_mailbox))
- d.addCallback(self.assertEqual,
- [(IMAPMailbox.init_flags, "/", "root_subthing")])
- return d
-
- def testStatus(self):
- """
- Test Status command
- """
- acc = self.server.theAccount
-
- def add_mailbox():
- return acc.addMailbox('root_subthings')
-
- # XXX FIXME ---- should populate this a little bit,
- # with unseen etc...
-
- def login():
- return self.client.login(TEST_USER, TEST_PASSWD)
-
- def status():
- return self.client.status(
- 'root_subthings', 'MESSAGES', 'UIDNEXT', 'UNSEEN')
-
- def statused(result):
- self.statused = result
-
- self.statused = None
-
- d1 = self.connected.addCallback(strip(add_mailbox))
- d1.addCallback(strip(login))
- d1.addCallbacks(strip(status), self._ebGeneral)
- d1.addCallbacks(statused, self._ebGeneral)
- d1.addCallbacks(self._cbStopClient, self._ebGeneral)
- d2 = self.loopback()
- d = defer.gatherResults([d1, d2])
- d.addCallback(lambda _: self.assertEqual(
- self.statused,
- {'MESSAGES': 0, 'UIDNEXT': '1', 'UNSEEN': 0}
- ))
- return d
-
- def testFailedStatus(self):
- """
- Test failed status command with a non-existent mailbox
- """
- def login():
- return self.client.login(TEST_USER, TEST_PASSWD)
-
- def status():
- return self.client.status(
- 'root/nonexistent', 'MESSAGES', 'UIDNEXT', 'UNSEEN')
-
- def statused(result):
- self.statused = result
-
- def failed(failure):
- self.failure = failure
-
- self.statused = self.failure = None
- d1 = self.connected.addCallback(strip(login))
- d1.addCallbacks(strip(status), self._ebGeneral)
- d1.addCallbacks(statused, failed)
- d1.addCallbacks(self._cbStopClient, self._ebGeneral)
- d2 = self.loopback()
- return defer.gatherResults([d1, d2]).addCallback(
- self._cbTestFailedStatus)
-
- def _cbTestFailedStatus(self, ignored):
- self.assertEqual(
- self.statused, None
- )
- self.assertEqual(
- self.failure.value.args,
- ('Could not open mailbox',)
- )
-
- #
- # messages
- #
-
- def testFullAppend(self):
- """
- Test appending a full message to the mailbox
- """
- infile = util.sibpath(__file__, 'rfc822.message')
- message = open(infile)
- acc = self.server.theAccount
- mailbox_name = "appendmbox/subthing"
-
- def add_mailbox():
- return acc.addMailbox(mailbox_name)
-
- def login():
- return self.client.login(TEST_USER, TEST_PASSWD)
-
- def append():
- return self.client.append(
- mailbox_name, message,
- ('\\SEEN', '\\DELETED'),
- 'Tue, 17 Jun 2003 11:22:16 -0600 (MDT)',
- )
-
- d1 = self.connected.addCallback(strip(add_mailbox))
- d1.addCallback(strip(login))
- d1.addCallbacks(strip(append), self._ebGeneral)
- d1.addCallbacks(self._cbStopClient, self._ebGeneral)
- d2 = self.loopback()
- d = defer.gatherResults([d1, d2])
-
- d.addCallback(lambda _: acc.getMailbox(mailbox_name))
- d.addCallback(lambda mb: mb.fetch(imap4.MessageSet(start=1), True))
- return d.addCallback(self._cbTestFullAppend, infile)
-
- def _cbTestFullAppend(self, fetched, infile):
- fetched = list(fetched)
- self.assertTrue(len(fetched) == 1)
- self.assertTrue(len(fetched[0]) == 2)
- uid, msg = fetched[0]
- parsed = self.parser.parse(open(infile))
- expected_body = parsed.get_payload()
- expected_headers = CaseInsensitiveDict(parsed.items())
-
- def assert_flags(flags):
- self.assertEqual(
- set(('\\SEEN', '\\DELETED')),
- set(flags))
-
- def assert_date(date):
- self.assertEqual(
- 'Tue, 17 Jun 2003 11:22:16 -0600 (MDT)',
- date)
-
- def assert_body(body):
- gotbody = body.read()
- self.assertEqual(expected_body, gotbody)
-
- def assert_headers(headers):
- self.assertItemsEqual(map(string.lower, expected_headers), headers)
-
- d = defer.maybeDeferred(msg.getFlags)
- d.addCallback(assert_flags)
-
- d.addCallback(lambda _: defer.maybeDeferred(msg.getInternalDate))
- d.addCallback(assert_date)
-
- d.addCallback(
- lambda _: defer.maybeDeferred(
- msg.getBodyFile, self._soledad))
- d.addCallback(assert_body)
-
- d.addCallback(lambda _: defer.maybeDeferred(msg.getHeaders, True))
- d.addCallback(assert_headers)
-
- return d
-
- def testPartialAppend(self):
- """
- Test partially appending a message to the mailbox
- """
- # TODO this test sometimes will fail because of the notify_just_mdoc
- infile = util.sibpath(__file__, 'rfc822.message')
-
- acc = self.server.theAccount
-
- def add_mailbox():
- return acc.addMailbox('PARTIAL/SUBTHING')
-
- def login():
- return self.client.login(TEST_USER, TEST_PASSWD)
-
- def append():
- message = file(infile)
- return self.client.sendCommand(
- imap4.Command(
- 'APPEND',
- 'PARTIAL/SUBTHING (\\SEEN) "Right now" '
- '{%d}' % os.path.getsize(infile),
- (), self.client._IMAP4Client__cbContinueAppend, message
- )
- )
- d1 = self.connected.addCallback(strip(add_mailbox))
- d1.addCallback(strip(login))
- d1.addCallbacks(strip(append), self._ebGeneral)
- d1.addCallbacks(self._cbStopClient, self._ebGeneral)
- d2 = self.loopback()
- d = defer.gatherResults([d1, d2])
-
- d.addCallback(lambda _: acc.getMailbox("PARTIAL/SUBTHING"))
- d.addCallback(lambda mb: mb.fetch(imap4.MessageSet(start=1), True))
- return d.addCallback(
- self._cbTestPartialAppend, infile)
-
- def _cbTestPartialAppend(self, fetched, infile):
- fetched = list(fetched)
- self.assertTrue(len(fetched) == 1)
- self.assertTrue(len(fetched[0]) == 2)
- uid, msg = fetched[0]
- parsed = self.parser.parse(open(infile))
- expected_body = parsed.get_payload()
-
- def assert_flags(flags):
- self.assertEqual(
- set((['\\SEEN'])), set(flags))
-
- def assert_body(body):
- gotbody = body.read()
- self.assertEqual(expected_body, gotbody)
-
- d = defer.maybeDeferred(msg.getFlags)
- d.addCallback(assert_flags)
-
- d.addCallback(lambda _: defer.maybeDeferred(msg.getBodyFile))
- d.addCallback(assert_body)
- return d
-
- def testCheck(self):
- """
- Test check command
- """
- def add_mailbox():
- return self.server.theAccount.addMailbox('root/subthing')
-
- def login():
- return self.client.login(TEST_USER, TEST_PASSWD)
-
- def select():
- return self.client.select('root/subthing')
-
- def check():
- return self.client.check()
-
- d = self.connected.addCallbacks(
- strip(add_mailbox), self._ebGeneral)
- d.addCallbacks(lambda _: login(), self._ebGeneral)
- d.addCallbacks(strip(select), self._ebGeneral)
- d.addCallbacks(strip(check), self._ebGeneral)
- d.addCallbacks(self._cbStopClient, self._ebGeneral)
- d2 = self.loopback()
- return defer.gatherResults([d, d2])
-
- # Okay, that was much fun indeed
-
- def testExpunge(self):
- """
- Test expunge command
- """
- acc = self.server.theAccount
- mailbox_name = 'mailboxexpunge'
-
- def add_mailbox():
- return acc.addMailbox(mailbox_name)
-
- def login():
- return self.client.login(TEST_USER, TEST_PASSWD)
-
- def select():
- return self.client.select(mailbox_name)
-
- def save_mailbox(mailbox):
- self.mailbox = mailbox
-
- def get_mailbox():
- d = acc.getMailbox(mailbox_name)
- d.addCallback(save_mailbox)
- return d
-
- def add_messages():
- d = self.mailbox.addMessage(
- 'test 1', flags=('\\Deleted', 'AnotherFlag'),
- notify_just_mdoc=False)
- d.addCallback(lambda _: self.mailbox.addMessage(
- 'test 2', flags=('AnotherFlag',),
- notify_just_mdoc=False))
- d.addCallback(lambda _: self.mailbox.addMessage(
- 'test 3', flags=('\\Deleted',),
- notify_just_mdoc=False))
- return d
-
- def expunge():
- return self.client.expunge()
-
- def expunged(results):
- self.failIf(self.server.mbox is None)
- self.results = results
-
- self.results = None
- d1 = self.connected.addCallback(strip(add_mailbox))
- d1.addCallback(strip(login))
- d1.addCallback(strip(get_mailbox))
- d1.addCallbacks(strip(add_messages), self._ebGeneral)
- d1.addCallbacks(strip(select), self._ebGeneral)
- d1.addCallbacks(strip(expunge), self._ebGeneral)
- d1.addCallbacks(expunged, self._ebGeneral)
- d1.addCallbacks(self._cbStopClient, self._ebGeneral)
- d2 = self.loopback()
- d = defer.gatherResults([d1, d2])
- d.addCallback(lambda _: self.mailbox.getMessageCount())
- return d.addCallback(self._cbTestExpunge)
-
- def _cbTestExpunge(self, count):
- # we only left 1 mssage with no deleted flag
- self.assertEqual(count, 1)
- # the uids of the deleted messages
- self.assertItemsEqual(self.results, [1, 3])
-
-
-class AccountTestCase(IMAP4HelperMixin):
- """
- Test the Account.
- """
- def _create_empty_mailbox(self):
- return self.server.theAccount.addMailbox('')
-
- def _create_one_mailbox(self):
- return self.server.theAccount.addMailbox('one')
-
- def test_illegalMailboxCreate(self):
- self.assertRaises(AssertionError, self._create_empty_mailbox)
-
-
-class IMAP4ServerSearchTestCase(IMAP4HelperMixin):
- """
- Tests for the behavior of the search_* functions in L{imap5.IMAP4Server}.
- """
- # XXX coming soon to your screens!
- pass
diff --git a/src/leap/bitmask/mail/imap/tests/walktree.py b/src/leap/bitmask/mail/imap/tests/walktree.py
deleted file mode 100644
index f259a556..00000000
--- a/src/leap/bitmask/mail/imap/tests/walktree.py
+++ /dev/null
@@ -1,127 +0,0 @@
-# -*- coding: utf-8 -*-
-# walktree.py
-# Copyright (C) 2013 LEAP
-#
-# This program is free software: you can redistribute it and/or modify
-# it under the terms of the GNU General Public License as published by
-# the Free Software Foundation, either version 3 of the License, or
-# (at your option) any later version.
-#
-# This program is distributed in the hope that it will be useful,
-# but WITHOUT ANY WARRANTY; without even the implied warranty of
-# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
-# GNU General Public License for more details.
-#
-# You should have received a copy of the GNU General Public License
-# along with this program. If not, see <http://www.gnu.org/licenses/>.
-"""
-Tests for the walktree module.
-"""
-import os
-import sys
-import pprint
-from email import parser
-
-from leap.mail import walk as W
-
-DEBUG = os.environ.get("BITMASK_MAIL_DEBUG")
-
-
-p = parser.Parser()
-
-# TODO pass an argument of the type of message
-
-##################################################
-# Input from hell
-
-if len(sys.argv) > 1:
- FILENAME = sys.argv[1]
-else:
- FILENAME = "rfc822.multi-signed.message"
-
-"""
-FILENAME = "rfc822.plain.message"
-FILENAME = "rfc822.multi-minimal.message"
-"""
-
-msg = p.parse(open(FILENAME))
-DO_CHECK = False
-#################################################
-
-parts = W.get_parts(msg)
-
-if DEBUG:
- def trim(item):
- item = item[:10]
- [trim(part["phash"]) for part in parts if part.get('phash', None)]
-
-raw_docs = list(W.get_raw_docs(msg, parts))
-
-body_phash_fun = [W.get_body_phash_simple,
- W.get_body_phash_multi][int(msg.is_multipart())]
-body_phash = body_phash_fun(W.get_payloads(msg))
-parts_map = W.walk_msg_tree(parts, body_phash=body_phash)
-
-
-# TODO add missing headers!
-expected = {
- 'body': '1ddfa80485',
- 'multi': True,
- 'part_map': {
- 1: {
- 'headers': {'Content-Disposition': 'inline',
- 'Content-Type': 'multipart/mixed; '
- 'boundary="z0eOaCaDLjvTGF2l"'},
- 'multi': True,
- 'part_map': {1: {'ctype': 'text/plain',
- 'headers': [
- ('Content-Type',
- 'text/plain; charset=utf-8'),
- ('Content-Disposition',
- 'inline'),
- ('Content-Transfer-Encoding',
- 'quoted-printable')],
- 'multi': False,
- 'parts': 1,
- 'phash': '1ddfa80485',
- 'size': 206},
- 2: {'ctype': 'text/plain',
- 'headers': [('Content-Type',
- 'text/plain; charset=us-ascii'),
- ('Content-Disposition',
- 'attachment; '
- 'filename="attach.txt"')],
- 'multi': False,
- 'parts': 1,
- 'phash': '7a94e4d769',
- 'size': 133},
- 3: {'ctype': 'application/octet-stream',
- 'headers': [('Content-Type',
- 'application/octet-stream'),
- ('Content-Disposition',
- 'attachment; filename="hack.ico"'),
- ('Content-Transfer-Encoding',
- 'base64')],
- 'multi': False,
- 'parts': 1,
- 'phash': 'c42cccebbd',
- 'size': 12736}}},
- 2: {'ctype': 'application/pgp-signature',
- 'headers': [('Content-Type', 'application/pgp-signature')],
- 'multi': False,
- 'parts': 1,
- 'phash': '8f49fbf749',
- 'size': 877}}}
-
-if DEBUG and DO_CHECK:
- # TODO turn this into a proper unittest
- assert(parts_map == expected)
- print "Structure: OK"
-
-
-print
-print "RAW DOCS"
-pprint.pprint(raw_docs)
-print
-print "PARTS MAP"
-pprint.pprint(parts_map)