diff options
| -rw-r--r-- | mail/pkg/requirements.pip | 2 | ||||
| -rw-r--r-- | mail/setup.py | 14 | ||||
| -rw-r--r-- | mail/src/leap/mail/imap/tests/test_imap.py | 4 | ||||
| -rw-r--r-- | mail/src/leap/mail/smtp/__init__.py | 78 | ||||
| -rw-r--r-- | mail/src/leap/mail/smtp/smtprelay.py | 498 | ||||
| -rw-r--r-- | mail/src/leap/mail/smtp/tests/test_smtprelay.py | 78 | ||||
| -rw-r--r-- | mail/src/leap/mail/tests/smtp/185CA770.key (renamed from mail/src/leap/mail/smtp/tests/185CA770.key) | 0 | ||||
| -rw-r--r-- | mail/src/leap/mail/tests/smtp/185CA770.pub (renamed from mail/src/leap/mail/smtp/tests/185CA770.pub) | 0 | ||||
| -rw-r--r-- | mail/src/leap/mail/tests/smtp/__init__.py (renamed from mail/src/leap/mail/smtp/tests/__init__.py) | 98 | ||||
| -rw-r--r-- | mail/src/leap/mail/tests/smtp/mail.txt (renamed from mail/src/leap/mail/smtp/tests/mail.txt) | 0 | ||||
| -rw-r--r-- | mail/src/leap/mail/tests/smtp/test_smtprelay.py | 212 | 
11 files changed, 735 insertions, 249 deletions
| diff --git a/mail/pkg/requirements.pip b/mail/pkg/requirements.pip index 1b5e5ef..5f4e7ef 100644 --- a/mail/pkg/requirements.pip +++ b/mail/pkg/requirements.pip @@ -1,3 +1,3 @@ -leap.common +leap.common>=0.2.3-dev  leap.soledad  twisted diff --git a/mail/setup.py b/mail/setup.py index 4de7251..8d4e415 100644 --- a/mail/setup.py +++ b/mail/setup.py @@ -21,10 +21,15 @@ from setuptools import setup, find_packages  requirements = [      "leap.soledad", -    "leap.common", +    "leap.common>=0.2.3-dev",      "twisted",  ] +tests_requirements = [ +    'setuptools-trial', +    'mock', +] +  # XXX add classifiers, docs  setup( @@ -40,7 +45,8 @@ setup(      ),      namespace_packages=["leap"],      package_dir={'': 'src'}, -    packages=find_packages('src'), -    #test_suite='leap.mail.tests', -    #install_requires=requirements, +    packages=find_packages('src', exclude=['leap.mail.tests']), +    test_suite='leap.mail.tests', +    install_requires=requirements, +    tests_require=tests_requirements,  ) diff --git a/mail/src/leap/mail/imap/tests/test_imap.py b/mail/src/leap/mail/imap/tests/test_imap.py index 6792e4b..7bfa1d7 100644 --- a/mail/src/leap/mail/imap/tests/test_imap.py +++ b/mail/src/leap/mail/imap/tests/test_imap.py @@ -49,8 +49,8 @@ import u1db  from leap.common.testing.basetest import BaseLeapTest  from leap.mail.imap.server import SoledadMailbox -from leap.mail.imap.tests import PUBLIC_KEY -from leap.mail.imap.tests import PRIVATE_KEY +from leap.mail.tests.imap import PUBLIC_KEY +from leap.mail.tests.imap import PRIVATE_KEY  from leap.soledad import Soledad  from leap.soledad.util import GPGWrapper diff --git a/mail/src/leap/mail/smtp/__init__.py b/mail/src/leap/mail/smtp/__init__.py index e69de29..13af015 100644 --- a/mail/src/leap/mail/smtp/__init__.py +++ b/mail/src/leap/mail/smtp/__init__.py @@ -0,0 +1,78 @@ +# -*- coding: utf-8 -*- +# __init__.py +# Copyright (C) 2013 LEAP +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see <http://www.gnu.org/licenses/>. + + +""" +SMTP relay helper function. +""" + + +from twisted.application import internet, service +from twisted.internet import reactor + + +from leap import soledad +from leap.common.keymanager import KeyManager +from leap.mail.smtp.smtprelay import SMTPFactory + + +def setup_smtp_relay(port, keymanager, smtp_host, smtp_port, smtp_username, +                    smtp_password, encrypted_only): +    """ +    Setup SMTP relay to run with Twisted. + +    This function sets up the SMTP relay configuration and the Twisted +    reactor. + +    @param port: The port in which to run the server. +    @type port: int +    @param keymanager: A Key Manager from where to get recipients' public +        keys. +    @type keymanager: leap.common.keymanager.KeyManager +    @param smtp_host: The hostname of the remote SMTP server. +    @type smtp_host: str +    @param smtp_port:  The port of the remote SMTP server. +    @type smtp_port: int +    @param smtp_username: The username used to connect to remote SMTP server. +    @type smtp_username: str +    @param smtp_password: The password used to connect to remote SMTP server. +    @type smtp_password: str +    @param encrypted_only: Whether the SMTP relay should send unencrypted mail +        or not. +    @type encrypted_only: bool +    """ +    # The configuration for the SMTP relay is a dict with the following +    # format: +    # +    # { +    #     'host': '<host>', +    #     'port': <int>, +    #     'username': '<username>', +    #     'password': '<password>', +    #     'encrypted_only': <True/False> +    # } +    config = { +        'host': smtp_host, +        'port': smtp_port, +        'username': smtp_username, +        'password': smtp_password, +        'encrypted_only': encrypted_only +    } + +    # configure the use of this service with twistd +    factory = SMTPFactory(keymanager, config) +    reactor.listenTCP(port, factory) diff --git a/mail/src/leap/mail/smtp/smtprelay.py b/mail/src/leap/mail/smtp/smtprelay.py index 6479873..bd18fb5 100644 --- a/mail/src/leap/mail/smtp/smtprelay.py +++ b/mail/src/leap/mail/smtp/smtprelay.py @@ -1,42 +1,186 @@ +# -*- coding: utf-8 -*- +# smtprelay.py +# Copyright (C) 2013 LEAP +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see <http://www.gnu.org/licenses/>. +  """  LEAP SMTP encrypted relay.  """  import re +import os  import gnupg +import tempfile + +  from zope.interface import implements  from StringIO import StringIO  from twisted.mail import smtp  from twisted.internet.protocol import ServerFactory  from twisted.internet import reactor  from twisted.internet import defer -from twisted.application import internet, service  from twisted.python import log  from email.Header import Header -from leap import soledad +from email.utils import parseaddr +from email.parser import Parser + + +from leap.common.check import leap_assert, leap_assert_type +from leap.common.keymanager import KeyManager +from leap.common.keymanager.openpgp import ( +    encrypt_asym, +    OpenPGPKey, +) +from leap.common.keymanager.errors import KeyNotFound +from leap.common.keymanager.keys import is_address + +# +# Exceptions +# -class SMTPInfoNotAvailable(Exception): +class MalformedConfig(Exception): +    """ +    Raised when the configuration dictionary passed as parameter is malformed. +    """      pass +# +# Helper utilities +# + +HOST_KEY = 'host' +PORT_KEY = 'port' +USERNAME_KEY = 'username' +PASSWORD_KEY = 'password' +ENCRYPTED_ONLY_KEY = 'encrypted_only' + + +def assert_config_structure(config): +    """ +    Assert that C{config} is a dict with the following structure: + +        { +            HOST_KEY: '<str>', +            PORT_KEY: <int>, +            USERNAME_KEY: '<str>', +            PASSWORD_KEY: '<str>', +            ENCRYPTED_ONLY_KEY: <bool>, +        } + +    @param config: The dictionary to check. +    @type config: dict +    """ +    # assert smtp config structure is valid +    leap_assert_type(config, dict) +    leap_assert(HOST_KEY in config) +    leap_assert_type(config[HOST_KEY], str) +    leap_assert(PORT_KEY in config) +    leap_assert_type(config[PORT_KEY], int) +    leap_assert(USERNAME_KEY in config) +    leap_assert_type(config[USERNAME_KEY], str) +    leap_assert(PASSWORD_KEY in config) +    leap_assert_type(config[PASSWORD_KEY], str) +    leap_assert(ENCRYPTED_ONLY_KEY in config) +    leap_assert_type(config[ENCRYPTED_ONLY_KEY], bool) +    # assert received params are not empty +    leap_assert(config[HOST_KEY] != '') +    leap_assert(config[PORT_KEY] is not 0) +    leap_assert(config[USERNAME_KEY] != '') +    leap_assert(config[PASSWORD_KEY] != '') + + +def strip_and_validate_address(address): +    """ +    Helper function to (eventually) strip and validate an email address. + +    This function first checks whether the incomming C{address} is of the form +    '<something>' and, if it is, then '<' and '>' are removed from the +    address. After that, a simple validation for user@provider form is +    carried. + +    @param address: The address to be validated. +    @type address: str + +    @return: The (eventually) stripped address. +    @rtype: str + +    @raise smtp.SMTPBadRcpt: Raised if C{address} does not have the expected +        format. +    """ +    leap_assert(address is not None) +    leap_assert_type(address, str) +    _, address = parseaddr(address) +    leap_assert(address != '') +    if is_address(address): +        return address +    raise smtp.SMTPBadRcpt(address) + + +# +# SMTPFactory +# +  class SMTPFactory(ServerFactory):      """      Factory for an SMTP server with encrypted relaying capabilities.      """ -    def __init__(self, soledad, gpg=None): -        self._soledad = soledad -        self._gpg = gpg +    def __init__(self, keymanager, config): +        """ +        @param keymanager: A KeyManager for retrieving recipient's keys. +        @type keymanager: leap.common.keymanager.KeyManager +        @param config: A dictionary with smtp configuration. Should have +            the following structure: +                { +                    HOST_KEY: '<str>', +                    PORT_KEY: <int>, +                    USERNAME_KEY: '<str>', +                    PASSWORD_KEY: '<str>', +                    ENCRYPTED_ONLY_KEY: <bool>, +                } +        @type config: dict +        """ +        # assert params +        leap_assert_type(keymanager, KeyManager) +        assert_config_structure(config) +        # and store them +        self._km = keymanager +        self._config = config      def buildProtocol(self, addr): -        "Return a protocol suitable for the job." -        # TODO: use ESMTP here. -        smtpProtocol = smtp.SMTP(SMTPDelivery(self._soledad, self._gpg)) +        """ +        Return a protocol suitable for the job. + +        @param addr: An address, e.g. a TCP (host, port). +        @type addr:  twisted.internet.interfaces.IAddress + +        @return: The protocol. +        @rtype: SMTPDelivery +        """ +        # If needed, we might use ESMTPDelivery here instead. +        smtpProtocol = smtp.SMTP(SMTPDelivery(self._km, self._config))          smtpProtocol.factory = self          return smtpProtocol +# +# SMTPDelivery +# +  class SMTPDelivery(object):      """      Validate email addresses and handle message delivery. @@ -44,14 +188,44 @@ class SMTPDelivery(object):      implements(smtp.IMessageDelivery) -    def __init__(self, soledad, gpg=None): -        self._soledad = soledad -        if gpg: -            self._gpg = gpg -        else: -            self._gpg = GPGWrapper() +    def __init__(self, keymanager, config): +        """ +        @param keymanager: A KeyManager for retrieving recipient's keys. +        @type keymanager: leap.common.keymanager.KeyManager +        @param config: A dictionary with smtp configuration. Should have +            the following structure: +                { +                    HOST_KEY: '<str>', +                    PORT_KEY: <int>, +                    USERNAME_KEY: '<str>', +                    PASSWORD_KEY: '<str>', +                    ENCRYPTED_ONLY_KEY: <bool>, +                } +        @type config: dict +        """ +        # assert params +        leap_assert_type(keymanager, KeyManager) +        assert_config_structure(config) +        # and store them +        self._km = keymanager +        self._config = config      def receivedHeader(self, helo, origin, recipients): +        """ +        Generate the Received header for a message. + +        @param helo: The argument to the HELO command and the client's IP +            address. +        @type helo: (str, str) +        @param origin: The address the message is from. +        @type origin: twisted.mail.smtp.Address +        @param recipients: A list of the addresses for which this message is +            bound. +        @type: list of twisted.mail.smtp.User + +        @return: The full "Received" header string. +        @type: str +        """          myHostname, clientIP = helo          headerValue = "by %s from %s with ESMTP ; %s" % (              myHostname, clientIP, smtp.rfc822date()) @@ -59,188 +233,232 @@ class SMTPDelivery(object):          return "Received: %s" % Header(headerValue)      def validateTo(self, user): -        """Assert existence of and trust on recipient's GPG public key.""" +        """ +        Validate the address for which the message is destined. + +        For now, it just asserts the existence of the user's key if the +        configuration option ENCRYPTED_ONLY_KEY is True. + +        @param user: The address to validate. +        @type: twisted.mail.smtp.User + +        @return: A Deferred which becomes, or a callable which takes no +            arguments and returns an object implementing IMessage. This will +            be called and the returned object used to deliver the message when +            it arrives. +        @rtype: no-argument callable + +        @raise SMTPBadRcpt: Raised if messages to the address are not to be +            accepted. +        """          # try to find recipient's public key          try: -            # this will raise an exception if key is not found -            trust = self._gpg.find_key(user.dest.addrstr)['trust'] -            # if key is not ultimatelly trusted, then the message will not -            # be encrypted. So, we check for this below -            #if trust != 'u': -            #    raise smtp.SMTPBadRcpt(user) +            address = strip_and_validate_address(user.dest.addrstr) +            pubkey = self._km.get_key(address, OpenPGPKey)              log.msg("Accepting mail for %s..." % user.dest) -            return lambda: EncryptedMessage(user, soledad=self._soledad, -                                            gpg=self._gpg) -        except LookupError: +        except KeyNotFound:              # if key was not found, check config to see if will send anyway. -            if self.encrypted_only: -                raise smtp.SMTPBadRcpt(user) -            # TODO: send signal to cli/gui that this user's key was not found? +            if self._config[ENCRYPTED_ONLY_KEY]: +                raise smtp.SMTPBadRcpt(user.dest.addrstr)              log.msg("Warning: will send an unencrypted message (because "                      "encrypted_only' is set to False).") +        return lambda: EncryptedMessage(user, self._km, self._config) + +    def validateFrom(self, helo, origin): +        """ +        Validate the address from which the message originates. -    def validateFrom(self, helo, originAddress): +        @param helo: The argument to the HELO command and the client's IP +            address. +        @type: (str, str) +        @param origin: The address the message is from. +        @type origin: twisted.mail.smtp.Address + +        @return: origin or a Deferred whose callback will be passed origin. +        @rtype: Deferred or Address + +        @raise twisted.mail.smtp.SMTPBadSender: Raised if messages from this +            address are not to be accepted. +        """          # accept mail from anywhere. To reject an address, raise          # smtp.SMTPBadSender here. -        return originAddress +        return origin -class EncryptedMessage(): +# +# EncryptedMessage +# + +class EncryptedMessage(object):      """      Receive plaintext from client, encrypt it and send message to a      recipient.      """      implements(smtp.IMessage) -    SMTP_HOSTNAME = "mail.leap.se" -    SMTP_PORT = 25 - -    def __init__(self, user, soledad,  gpg=None): -        self.user = user -        self._soledad = soledad -        self.fetchConfig() +    def __init__(self, user, keymanager, config): +        """ +        Initialize the encrypted message. + +        @param user: The address to validate. +        @type: twisted.mail.smtp.User +        @param keymanager: A KeyManager for retrieving recipient's keys. +        @type keymanager: leap.common.keymanager.KeyManager +        @param config: A dictionary with smtp configuration. Should have +            the following structure: +                { +                    HOST_KEY: '<str>', +                    PORT_KEY: <int>, +                    USERNAME_KEY: '<str>', +                    PASSWORD_KEY: '<str>', +                    ENCRYPTED_ONLY_KEY: <bool>, +                } +        @type config: dict +        """ +        # assert params +        leap_assert_type(user, smtp.User) +        leap_assert_type(keymanager, KeyManager) +        assert_config_structure(config) +        # and store them +        self._user = user +        self._km = keymanager +        self._config = config +        # initialize list for message's lines          self.lines = [] -        if gpg: -            self._gpg = gpg -        else: -            self._gpg = GPGWrapper()      def lineReceived(self, line): -        """Store email DATA lines as they arrive.""" +        """ +        Handle another line. + +        @param line: The received line. +        @type line: str +        """          self.lines.append(line)      def eomReceived(self): -        """Encrypt and send message.""" +        """ +        Handle end of message. + +        This method will encrypt and send the message. +        """          log.msg("Message data complete.")          self.lines.append('')  # add a trailing newline          self.parseMessage()          try: -            self.encrypt() +            self._encrypt()              return self.sendMessage() -        except LookupError: +        except KeyNotFound:              return None      def parseMessage(self): -        """Separate message headers from body.""" -        sep = self.lines.index('') -        self.headers = self.lines[:sep] -        self.body = self.lines[sep + 1:] +        """ +        Separate message headers from body. +        """ +        parser = Parser() +        self._message = parser.parsestr('\r\n'.join(self.lines))      def connectionLost(self): +        """ +        Log an error when the connection is lost. +        """          log.msg("Connection lost unexpectedly!")          log.err()          # unexpected loss of connection; don't save          self.lines = []      def sendSuccess(self, r): +        """ +        Callback for a successful send. + +        @param r: The result from the last previous callback in the chain. +        @type r: anything +        """          log.msg(r)      def sendError(self, e): +        """ +        Callback for an unsuccessfull send. + +        @param e: The result from the last errback. +        @type e: anything +        """          log.msg(e)          log.err()      def prepareHeader(self): -        self.headers.insert(1, "From: %s" % self.user.orig.addrstr) -        self.headers.insert(2, "To: %s" % self.user.dest.addrstr) -        self.headers.append('') +        """ +        Prepare the headers of the message. +        """ +        self._message.replace_header('From', '<%s>' % self._user.orig.addrstr)      def sendMessage(self): +        """ +        Send the message. + +        This method will prepare the message (headers and possibly encrypted +        body) and send it using the ESMTPSenderFactory. + +        @return: A deferred with callbacks for error and success of this +            message send. +        @rtype: twisted.internet.defer.Deferred +        """          self.prepareHeader() -        msg = '\n'.join(self.headers + [self.cyphertext]) +        msg = self._message.as_string(False)          d = defer.Deferred() -        factory = smtp.ESMTPSenderFactory(self.smtp_username, -                                          self.smtp_password, -                                          self.smtp_username, -                                          self.user.dest.addrstr, -                                          StringIO(msg), -                                          d) -        # the next call is TSL-powered! -        reactor.connectTCP(self.SMTP_HOSTNAME, self.SMTP_PORT, factory) +        factory = smtp.ESMTPSenderFactory( +            self._config[USERNAME_KEY], +            self._config[PASSWORD_KEY], +            self._fromAddress.addrstr, +            self._user.dest.addrstr, +            StringIO(msg), +            d, +            requireAuthentication=False,  # for now do unauth, see issue #2474 +        ) +        # TODO: Change this to connectSSL when cert auth is in place in the platform +        reactor.connectTCP( +            self._config[HOST_KEY], +            self._config[PORT_KEY], +            factory +        )          d.addCallback(self.sendSuccess)          d.addErrback(self.sendError)          return d -    def encrypt(self, always_trust=True): -        # TODO: do not "always trust" here. -        try: -            fp = self._gpg.find_key(self.user.dest.addrstr)['fingerprint'] -            log.msg("Encrypting to %s" % fp) -            self.cyphertext = str( -                self._gpg.encrypt( -                    '\n'.join(self.body), [fp], always_trust=always_trust)) -        except LookupError: -            if self.encrypted_only: -                raise -            log.msg("Warning: sending unencrypted mail (because " -                    "'encrypted_only' is set to False).") +    def _encrypt_payload_rec(self, message, pubkey): +        """ +        Recursivelly descend in C{message}'s payload and encrypt to C{pubkey}. -    # this will be replaced by some other mechanism of obtaining credentials -    # for SMTP server. -    def fetchConfig(self): -        # TODO: Soledad/LEAP bootstrap should store the SMTP info on local db, -        # so this relay can load it when it needs. -        if not self._soledad: -            # TODO: uncomment below exception when integration with Soledad is -            # smooth. -            #raise SMTPInfoNotAvailable() -            # TODO: remove dummy settings below when soledad bootstrap is -            # working. -            self.smtp_host = '' -            self.smtp_port = '' -            self.smtp_username = '' -            self.smtp_password = '' -            self.encrypted_only = True +        @param message: The message whose payload we want to encrypt. +        @type message: email.message.Message +        @param pubkey: The public key used to encrypt the message. +        @type pubkey: leap.common.keymanager.openpgp.OpenPGPKey +        """ +        if message.is_multipart() is False: +            message.set_payload(encrypt_asym(message.get_payload(), pubkey))          else: -            self.smtp_config = self._soledad.get_doc('smtp_relay_config') -            for confname in [  -                'smtp_host', 'smtp_port', 'smtp_username', -                'smtp_password', 'encrypted_only', -            ]: -                setattr(self, confname, doc.content[confname]) - - -class GPGWrapper(): -    """ -    This is a temporary class for handling GPG requests, and should be -    replaced by a more general class used throughout the project. -    """ - -    GNUPG_HOME = "~/.config/leap/gnupg" -    GNUPG_BINARY = "/usr/bin/gpg"  # TODO: change this based on OS - -    def __init__(self, gpghome=GNUPG_HOME, gpgbinary=GNUPG_BINARY): -        self.gpg = gnupg.GPG(gnupghome=gpghome, gpgbinary=gpgbinary) +            for msg in message.get_payload(): +                self._encrypt_payload_rec(msg, pubkey) -    def find_key(self, email): +    def _encrypt(self):          """ -        Find user's key based on their email. -        """ -        for key in self.gpg.list_keys(): -            for uid in key['uids']: -                if re.search(email, uid): -                    return key -        raise LookupError("GnuPG public key for %s not found!" % email) - -    def encrypt(self, data, recipient, always_trust=True): -        # TODO: do not 'always_trust'. -        return self.gpg.encrypt(data, recipient, always_trust=always_trust) - -    def decrypt(self, data): -        return self.gpg.decrypt(data) - -    def import_keys(self, data): -        return self.gpg.import_keys(data) - - -# service configuration -port = 25 -user_email = 'user@leap.se' # TODO: replace for real mail from gui/cli +        Encrypt the message body. -# instantiate soledad for client app storage and sync -s = soledad.Soledad(user_email) -factory = SMTPFactory(s) +        This method fetches the recipient key and encrypts the content to the +        recipient. If a key is not found, then the behaviour depends on the +        configuration parameter ENCRYPTED_ONLY_KEY. If it is False, the message +        is sent unencrypted and a warning is logged. If it is True, the +        encryption fails with a KeyNotFound exception. -# enable the use of this service with twistd -application = service.Application("LEAP SMTP Relay") -service = internet.TCPServer(port, factory) -service.setServiceParent(application) +        @raise KeyNotFound: Raised when the recipient key was not found and +            the ENCRYPTED_ONLY_KEY configuration parameter is set to True. +        """ +        try: +            address = strip_and_validate_address(self._user.dest.addrstr) +            pubkey = self._km.get_key(address, OpenPGPKey) +            log.msg("Encrypting to %s" % pubkey.fingerprint) +            self._encrypt_payload_rec(self._message, pubkey) +        except KeyNotFound: +            if self._config[ENCRYPTED_ONLY_KEY]: +                raise +            log.msg("Warning: sending unencrypted mail (because " +                    "'encrypted_only' is set to False).") diff --git a/mail/src/leap/mail/smtp/tests/test_smtprelay.py b/mail/src/leap/mail/smtp/tests/test_smtprelay.py deleted file mode 100644 index eaa4d04..0000000 --- a/mail/src/leap/mail/smtp/tests/test_smtprelay.py +++ /dev/null @@ -1,78 +0,0 @@ -from datetime import datetime -import re -from leap.email.smtp.smtprelay import ( -    SMTPFactory, -    #SMTPDelivery, # an object -    EncryptedMessage, -) -from leap.email.smtp import tests -from twisted.test import proto_helpers -from twisted.mail.smtp import User - - -# some regexps -IP_REGEX = "(([0-9]|[1-9][0-9]|1[0-9]{2}|2[0-4][0-9]|25[0-5])\.){3}" + \ -    "([0-9]|[1-9][0-9]|1[0-9]{2}|2[0-4][0-9]|25[0-5])" -HOSTNAME_REGEX = "(([a-zA-Z0-9]|[a-zA-Z0-9][a-zA-Z0-9\-]*[a-zA-Z0-9])\.)*" + \ -    "([A-Za-z0-9]|[A-Za-z0-9][A-Za-z0-9\-]*[A-Za-z0-9])" -IP_OR_HOST_REGEX = '(' + IP_REGEX + '|' + HOSTNAME_REGEX + ')' - - -class TestSmtpRelay(tests.OpenPGPTestCase): - -    EMAIL_DATA = ['HELO relay.leap.se', -                  'MAIL FROM: <user@leap.se>', -                  'RCPT TO: <leap@leap.se>', -                  'DATA', -                  'From: User <user@leap.se>', -                  'To: Leap <leap@leap.se>', -                  'Date: ' + datetime.now().strftime('%c'), -                  'Subject: test message', -                  '', -                  'This is a secret message.', -                  'Yours,', -                  'A.', -                  '', -                  '.', -                  'QUIT'] - -    def assertMatch(self, string, pattern, msg=None): -        if not re.match(pattern, string): -            msg = self._formatMessage(msg, '"%s" does not match pattern "%s".' -                                           % (string, pattern)) -            raise self.failureException(msg) - -    def test_relay_accepts_valid_email(self): -        """ -        Test if SMTP server responds correctly for valid interaction. -        """ - -        SMTP_ANSWERS = ['220 ' + IP_OR_HOST_REGEX + -                        ' NO UCE NO UBE NO RELAY PROBES', -                        '250 ' + IP_OR_HOST_REGEX + ' Hello ' + -                        IP_OR_HOST_REGEX + ', nice to meet you', -                        '250 Sender address accepted', -                        '250 Recipient address accepted', -                        '354 Continue'] -        proto = SMTPFactory(None, self._gpg).buildProtocol(('127.0.0.1', 0)) -        transport = proto_helpers.StringTransport() -        proto.makeConnection(transport) -        for i, line in enumerate(self.EMAIL_DATA): -            proto.lineReceived(line + '\r\n') -            self.assertMatch(transport.value(), -                             '\r\n'.join(SMTP_ANSWERS[0:i + 1])) -        proto.setTimeout(None) - -    def test_message_encrypt(self): -        """ -        Test if message gets encrypted to destination email. -        """ -        proto = SMTPFactory(None, self._gpg).buildProtocol(('127.0.0.1', 0)) -        user = User('leap@leap.se', 'relay.leap.se', proto, 'leap@leap.se') -        m = EncryptedMessage(user, None, self._gpg) -        for line in self.EMAIL_DATA[4:12]: -            m.lineReceived(line) -        m.parseMessage() -        m.encrypt() -        decrypted = str(self._gpg.decrypt(m.cyphertext)) -        self.assertEqual('\n'.join(self.EMAIL_DATA[9:12]), decrypted) diff --git a/mail/src/leap/mail/smtp/tests/185CA770.key b/mail/src/leap/mail/tests/smtp/185CA770.key index 587b416..587b416 100644 --- a/mail/src/leap/mail/smtp/tests/185CA770.key +++ b/mail/src/leap/mail/tests/smtp/185CA770.key diff --git a/mail/src/leap/mail/smtp/tests/185CA770.pub b/mail/src/leap/mail/tests/smtp/185CA770.pub index 38af19f..38af19f 100644 --- a/mail/src/leap/mail/smtp/tests/185CA770.pub +++ b/mail/src/leap/mail/tests/smtp/185CA770.pub diff --git a/mail/src/leap/mail/smtp/tests/__init__.py b/mail/src/leap/mail/tests/smtp/__init__.py index d7b942a..113e047 100644 --- a/mail/src/leap/mail/smtp/tests/__init__.py +++ b/mail/src/leap/mail/tests/smtp/__init__.py @@ -1,17 +1,49 @@ +# -*- coding: utf-8 -*- +# __init__.py +# Copyright (C) 2013 LEAP +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see <http://www.gnu.org/licenses/>. + + +""" +Base classes and keys for SMTP relay tests. +""" +  import os  import shutil  import tempfile +from mock import Mock +  from twisted.trial import unittest + +from leap.soledad import Soledad +from leap.soledad.crypto import SoledadCrypto +from leap.common.keymanager import ( +    KeyManager, +    openpgp, +) + +  from leap.common.testing.basetest import BaseLeapTest -from leap.mail.smtp.smtprelay import GPGWrapper -class OpenPGPTestCase(unittest.TestCase, BaseLeapTest): +class TestCaseWithKeyManager(BaseLeapTest):      def setUp(self): -        # mimic LeapBaseTest.setUpClass behaviour, because this is deprecated +        # mimic BaseLeapTest.setUpClass behaviour, because this is deprecated          # in Twisted: http://twistedmatrix.com/trac/ticket/1870          self.old_path = os.environ['PATH']          self.old_home = os.environ['HOME'] @@ -22,18 +54,46 @@ class OpenPGPTestCase(unittest.TestCase, BaseLeapTest):              'bin')          os.environ["PATH"] = bin_tdir          os.environ["HOME"] = self.tempdir +          # setup our own stuff -        self.gnupg_home = self.tempdir + '/gnupg' -        os.mkdir(self.gnupg_home) -        self.email = 'leap@leap.se' -        self._gpg = GPGWrapper(gpghome=self.gnupg_home) - -        self.assertEqual(self._gpg.import_keys(PUBLIC_KEY).summary(), -                         '1 imported', "error importing public key") -        self.assertEqual(self._gpg.import_keys(PRIVATE_KEY).summary(), -                         # note that gnupg does not return a successful import -                         # for private keys. Bug? -                         '0 imported', "error importing private key") +        address = 'leap@leap.se'  # user's address in the form user@provider +        uuid = 'leap@leap.se' +        passphrase = '123' +        secret_path = os.path.join(self.tempdir, 'secret.gpg') +        local_db_path = os.path.join(self.tempdir, 'soledad.u1db') +        server_url = 'http://provider/' +        cert_file = '' + +        # mock key fetching and storing so Soledad doesn't fail when trying to +        # reach the server. +        Soledad._fetch_keys_from_shared_db = Mock(return_value=None) +        Soledad._assert_keys_in_shared_db = Mock(return_value=None) + +        # instantiate soledad +        self._soledad = Soledad( +            uuid, +            passphrase, +            secret_path, +            local_db_path, +            server_url, +            cert_file, +        ) + +        self._config = { +            'host': 'http://provider/', +            'port': 25, +            'username': address, +            'password': '<password>', +            'encrypted_only': True +        } + +        nickserver_url = ''  # the url of the nickserver +        self._km = KeyManager(address, nickserver_url, self._soledad) + +        # insert test keys in key manager. +        pgp = openpgp.OpenPGPScheme(self._soledad) +        pgp.put_ascii_key(PRIVATE_KEY) +        pgp.put_ascii_key(PRIVATE_KEY_2)      def tearDown(self):          # mimic LeapBaseTest.tearDownClass behaviour @@ -43,16 +103,6 @@ class OpenPGPTestCase(unittest.TestCase, BaseLeapTest):          assert self.tempdir.startswith('/tmp/leap_tests-')          shutil.rmtree(self.tempdir) -    def test_openpgp_encrypt_decrypt(self): -        "Test if openpgp can encrypt and decrypt." -        text = "simple raw text" -        encrypted = str(self._gpg.encrypt(text, KEY_FINGERPRINT, -                                          # TODO: handle always trust issue -                                          always_trust=True)) -        self.assertNotEqual(text, encrypted, "failed encrypting text") -        decrypted = str(self._gpg.decrypt(encrypted)) -        self.assertEqual(text, decrypted, "failed decrypting text") -  # Key material for testing  KEY_FINGERPRINT = "E36E738D69173C13D709E44F2F455E2824D18DDF" diff --git a/mail/src/leap/mail/smtp/tests/mail.txt b/mail/src/leap/mail/tests/smtp/mail.txt index 9542047..9542047 100644 --- a/mail/src/leap/mail/smtp/tests/mail.txt +++ b/mail/src/leap/mail/tests/smtp/mail.txt diff --git a/mail/src/leap/mail/tests/smtp/test_smtprelay.py b/mail/src/leap/mail/tests/smtp/test_smtprelay.py new file mode 100644 index 0000000..6ef4e85 --- /dev/null +++ b/mail/src/leap/mail/tests/smtp/test_smtprelay.py @@ -0,0 +1,212 @@ +# -*- coding: utf-8 -*- +# test_smtprelay.py +# Copyright (C) 2013 LEAP +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see <http://www.gnu.org/licenses/>. + + +""" +SMTP relay tests. +""" + + +import re + + +from datetime import datetime +from twisted.test import proto_helpers +from twisted.mail.smtp import ( +    User, +    SMTPBadRcpt, +) +from mock import Mock + + +from leap.mail.smtp.smtprelay import ( +    SMTPFactory, +    EncryptedMessage, +) +from leap.mail.tests.smtp import TestCaseWithKeyManager +from leap.common.keymanager import openpgp + + +# some regexps +IP_REGEX = "(([0-9]|[1-9][0-9]|1[0-9]{2}|2[0-4][0-9]|25[0-5])\.){3}" + \ +    "([0-9]|[1-9][0-9]|1[0-9]{2}|2[0-4][0-9]|25[0-5])" +HOSTNAME_REGEX = "(([a-zA-Z0-9]|[a-zA-Z0-9][a-zA-Z0-9\-]*[a-zA-Z0-9])\.)*" + \ +    "([A-Za-z0-9]|[A-Za-z0-9][A-Za-z0-9\-]*[A-Za-z0-9])" +IP_OR_HOST_REGEX = '(' + IP_REGEX + '|' + HOSTNAME_REGEX + ')' + + +class TestSmtpRelay(TestCaseWithKeyManager): + +    EMAIL_DATA = ['HELO relay.leap.se', +                  'MAIL FROM: <user@leap.se>', +                  'RCPT TO: <leap@leap.se>', +                  'DATA', +                  'From: User <user@leap.se>', +                  'To: Leap <leap@leap.se>', +                  'Date: ' + datetime.now().strftime('%c'), +                  'Subject: test message', +                  '', +                  'This is a secret message.', +                  'Yours,', +                  'A.', +                  '', +                  '.', +                  'QUIT'] + +    def assertMatch(self, string, pattern, msg=None): +        if not re.match(pattern, string): +            msg = self._formatMessage(msg, '"%s" does not match pattern "%s".' +                                           % (string, pattern)) +            raise self.failureException(msg) + +    def test_openpgp_encrypt_decrypt(self): +        "Test if openpgp can encrypt and decrypt." +        text = "simple raw text" +        pubkey = self._km.get_key( +            'leap@leap.se', openpgp.OpenPGPKey, private=False) +        encrypted = openpgp.encrypt_asym(text, pubkey) +        self.assertNotEqual(text, encrypted, "failed encrypting text") +        privkey = self._km.get_key( +            'leap@leap.se', openpgp.OpenPGPKey, private=True) +        decrypted = openpgp.decrypt_asym(encrypted, privkey) +        self.assertEqual(text, decrypted, "failed decrypting text") + +    def test_relay_accepts_valid_email(self): +        """ +        Test if SMTP server responds correctly for valid interaction. +        """ + +        SMTP_ANSWERS = ['220 ' + IP_OR_HOST_REGEX + +                        ' NO UCE NO UBE NO RELAY PROBES', +                        '250 ' + IP_OR_HOST_REGEX + ' Hello ' + +                        IP_OR_HOST_REGEX + ', nice to meet you', +                        '250 Sender address accepted', +                        '250 Recipient address accepted', +                        '354 Continue'] +        proto = SMTPFactory( +            self._km, self._config).buildProtocol(('127.0.0.1', 0)) +        transport = proto_helpers.StringTransport() +        proto.makeConnection(transport) +        for i, line in enumerate(self.EMAIL_DATA): +            proto.lineReceived(line + '\r\n') +            self.assertMatch(transport.value(), +                             '\r\n'.join(SMTP_ANSWERS[0:i + 1])) +        proto.setTimeout(None) + +    def test_message_encrypt(self): +        """ +        Test if message gets encrypted to destination email. +        """ +        proto = SMTPFactory( +            self._km, self._config).buildProtocol(('127.0.0.1', 0)) +        user = User('leap@leap.se', 'relay.leap.se', proto, 'leap@leap.se') +        m = EncryptedMessage(user, self._km, self._config) +        for line in self.EMAIL_DATA[4:12]: +            m.lineReceived(line) +        m.eomReceived() +        privkey = self._km.get_key( +            'leap@leap.se', openpgp.OpenPGPKey, private=True) +        decrypted = openpgp.decrypt_asym(m._message.get_payload(), privkey) +        self.assertEqual( +            '\r\n'.join(self.EMAIL_DATA[9:12]) + '\r\n', +            decrypted) + +    def test_missing_key_rejects_address(self): +        """ +        Test if server rejects to send unencrypted when 'encrypted_only' is +        True. +        """ +        # remove key from key manager +        pubkey = self._km.get_key('leap@leap.se', openpgp.OpenPGPKey) +        pgp = openpgp.OpenPGPScheme(self._soledad) +        pgp.delete_key(pubkey) +        # mock the key fetching +        self._km.fetch_keys_from_server = Mock(return_value=[]) +        # prepare the SMTP factory +        proto = SMTPFactory( +            self._km, self._config).buildProtocol(('127.0.0.1', 0)) +        transport = proto_helpers.StringTransport() +        proto.makeConnection(transport) +        proto.lineReceived(self.EMAIL_DATA[0] + '\r\n') +        proto.lineReceived(self.EMAIL_DATA[1] + '\r\n') +        proto.lineReceived(self.EMAIL_DATA[2] + '\r\n') +        # ensure the address was rejected +        lines = transport.value().rstrip().split('\n') +        self.assertEqual( +            '550 Cannot receive for specified address', +            lines[-1]) + +    def test_missing_key_accepts_address(self): +        """ +        Test if server accepts to send unencrypted when 'encrypted_only' is +        False. +        """ +        # remove key from key manager +        pubkey = self._km.get_key('leap@leap.se', openpgp.OpenPGPKey) +        pgp = openpgp.OpenPGPScheme(self._soledad) +        pgp.delete_key(pubkey) +        # mock the key fetching +        self._km.fetch_keys_from_server = Mock(return_value=[]) +        # change the configuration +        self._config['encrypted_only'] = False +        # prepare the SMTP factory +        proto = SMTPFactory( +            self._km, self._config).buildProtocol(('127.0.0.1', 0)) +        transport = proto_helpers.StringTransport() +        proto.makeConnection(transport) +        proto.lineReceived(self.EMAIL_DATA[0] + '\r\n') +        proto.lineReceived(self.EMAIL_DATA[1] + '\r\n') +        proto.lineReceived(self.EMAIL_DATA[2] + '\r\n') +        # ensure the address was rejected +        lines = transport.value().rstrip().split('\n') +        self.assertEqual( +            '250 Recipient address accepted', +            lines[-1]) + +    def test_malformed_address_rejects(self): +        """ +        Test if server rejects to send to malformed addresses. +        """ +        # mock the key fetching +        self._km.fetch_keys_from_server = Mock(return_value=[]) +        # prepare the SMTP factory +        for malformed in ['leap@']: +            proto = SMTPFactory( +                self._km, self._config).buildProtocol(('127.0.0.1', 0)) +            transport = proto_helpers.StringTransport() +            proto.makeConnection(transport) +            proto.lineReceived(self.EMAIL_DATA[0] + '\r\n') +            proto.lineReceived(self.EMAIL_DATA[1] + '\r\n') +            proto.lineReceived('RCPT TO: <%s>%s' % (malformed, '\r\n')) +            # ensure the address was rejected +            lines = transport.value().rstrip().split('\n') +            self.assertEqual( +                '550 Cannot receive for specified address', +                lines[-1]) + +    def test_prepare_header_adds_from(self): +        """ +        Test if message headers are OK. +        """ +        proto = SMTPFactory( +            self._km, self._config).buildProtocol(('127.0.0.1', 0)) +        user = User('leap@leap.se', 'relay.leap.se', proto, 'leap@leap.se') +        m = EncryptedMessage(user, self._km, self._config) +        for line in self.EMAIL_DATA[4:12]: +            m.lineReceived(line) +        m.eomReceived() +        self.assertEqual('<leap@leap.se>', m._message['From']) | 
