diff options
Diffstat (limited to 'mail/src')
| -rw-r--r-- | mail/src/leap/mail/smtp/__init__.py | 78 | ||||
| -rw-r--r-- | mail/src/leap/mail/smtp/smtprelay.py | 533 | ||||
| -rw-r--r-- | mail/src/leap/mail/smtp/tests/test_smtprelay.py | 78 | ||||
| -rw-r--r-- | mail/src/leap/mail/tests/smtp/185CA770.key (renamed from mail/src/leap/mail/smtp/tests/185CA770.key) | 0 | ||||
| -rw-r--r-- | mail/src/leap/mail/tests/smtp/185CA770.pub (renamed from mail/src/leap/mail/smtp/tests/185CA770.pub) | 0 | ||||
| -rw-r--r-- | mail/src/leap/mail/tests/smtp/__init__.py (renamed from mail/src/leap/mail/smtp/tests/__init__.py) | 163 | ||||
| -rw-r--r-- | mail/src/leap/mail/tests/smtp/mail.txt (renamed from mail/src/leap/mail/smtp/tests/mail.txt) | 0 | ||||
| -rw-r--r-- | mail/src/leap/mail/tests/smtp/test_smtprelay.py | 248 | 
8 files changed, 856 insertions, 244 deletions
| diff --git a/mail/src/leap/mail/smtp/__init__.py b/mail/src/leap/mail/smtp/__init__.py index e69de29..ace79b5 100644 --- a/mail/src/leap/mail/smtp/__init__.py +++ b/mail/src/leap/mail/smtp/__init__.py @@ -0,0 +1,78 @@ +# -*- coding: utf-8 -*- +# __init__.py +# Copyright (C) 2013 LEAP +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see <http://www.gnu.org/licenses/>. + + +""" +SMTP relay helper function. +""" + + +from twisted.application import internet, service +from twisted.internet import reactor + + +from leap import soledad +from leap.common.keymanager import KeyManager +from leap.mail.smtp.smtprelay import SMTPFactory + + +def setup_smtp_relay(port, keymanager, smtp_host, smtp_port, smtp_username, +                     smtp_password, encrypted_only): +    """ +    Setup SMTP relay to run with Twisted. + +    This function sets up the SMTP relay configuration and the Twisted +    reactor. + +    @param port: The port in which to run the server. +    @type port: int +    @param keymanager: A Key Manager from where to get recipients' public +        keys. +    @type keymanager: leap.common.keymanager.KeyManager +    @param smtp_host: The hostname of the remote SMTP server. +    @type smtp_host: str +    @param smtp_port:  The port of the remote SMTP server. +    @type smtp_port: int +    @param smtp_username: The username used to connect to remote SMTP server. +    @type smtp_username: str +    @param smtp_password: The password used to connect to remote SMTP server. +    @type smtp_password: str +    @param encrypted_only: Whether the SMTP relay should send unencrypted mail +        or not. +    @type encrypted_only: bool +    """ +    # The configuration for the SMTP relay is a dict with the following +    # format: +    # +    # { +    #     'host': '<host>', +    #     'port': <int>, +    #     'username': '<username>', +    #     'password': '<password>', +    #     'encrypted_only': <True/False> +    # } +    config = { +        'host': smtp_host, +        'port': smtp_port, +        'username': smtp_username, +        'password': smtp_password, +        'encrypted_only': encrypted_only +    } + +    # configure the use of this service with twistd +    factory = SMTPFactory(keymanager, config) +    reactor.listenTCP(port, factory) diff --git a/mail/src/leap/mail/smtp/smtprelay.py b/mail/src/leap/mail/smtp/smtprelay.py index 6479873..d87dc87 100644 --- a/mail/src/leap/mail/smtp/smtprelay.py +++ b/mail/src/leap/mail/smtp/smtprelay.py @@ -1,42 +1,182 @@ +# -*- coding: utf-8 -*- +# smtprelay.py +# Copyright (C) 2013 LEAP +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see <http://www.gnu.org/licenses/>. +  """  LEAP SMTP encrypted relay.  """  import re +import os  import gnupg +import tempfile + +  from zope.interface import implements  from StringIO import StringIO  from twisted.mail import smtp  from twisted.internet.protocol import ServerFactory  from twisted.internet import reactor  from twisted.internet import defer -from twisted.application import internet, service  from twisted.python import log  from email.Header import Header -from leap import soledad +from email.utils import parseaddr +from email.parser import Parser + + +from leap.common.check import leap_assert, leap_assert_type +from leap.common.keymanager import KeyManager +from leap.common.keymanager.openpgp import ( +    OpenPGPKey, +    encrypt_asym, +    sign, +) +from leap.common.keymanager.errors import KeyNotFound + +# +# Exceptions +# -class SMTPInfoNotAvailable(Exception): +class MalformedConfig(Exception): +    """ +    Raised when the configuration dictionary passed as parameter is malformed. +    """      pass +# +# Helper utilities +# + +HOST_KEY = 'host' +PORT_KEY = 'port' +USERNAME_KEY = 'username' +PASSWORD_KEY = 'password' +ENCRYPTED_ONLY_KEY = 'encrypted_only' + + +def assert_config_structure(config): +    """ +    Assert that C{config} is a dict with the following structure: + +        { +            HOST_KEY: '<str>', +            PORT_KEY: <int>, +            USERNAME_KEY: '<str>', +            PASSWORD_KEY: '<str>', +            ENCRYPTED_ONLY_KEY: <bool>, +        } + +    @param config: The dictionary to check. +    @type config: dict +    """ +    # assert smtp config structure is valid +    leap_assert_type(config, dict) +    leap_assert(HOST_KEY in config) +    leap_assert_type(config[HOST_KEY], str) +    leap_assert(PORT_KEY in config) +    leap_assert_type(config[PORT_KEY], int) +    leap_assert(USERNAME_KEY in config) +    leap_assert_type(config[USERNAME_KEY], str) +    leap_assert(PASSWORD_KEY in config) +    leap_assert_type(config[PASSWORD_KEY], str) +    leap_assert(ENCRYPTED_ONLY_KEY in config) +    leap_assert_type(config[ENCRYPTED_ONLY_KEY], bool) +    # assert received params are not empty +    leap_assert(config[HOST_KEY] != '') +    leap_assert(config[PORT_KEY] is not 0) +    leap_assert(config[USERNAME_KEY] != '') +    leap_assert(config[PASSWORD_KEY] != '') + + +def validate_address(address): +    """ +    Validate C{address} as defined in RFC 2822. + +    @param address: The address to be validated. +    @type address: str + +    @return: A valid address. +    @rtype: str + +    @raise smtp.SMTPBadRcpt: Raised if C{address} is invalid. +    """ +    leap_assert_type(address, str) +    # the following parses the address as described in RFC 2822 and +    # returns ('', '') if the parse fails. +    _, address = parseaddr(address) +    if address == '': +        raise smtp.SMTPBadRcpt(address) +    return address + + +# +# SMTPFactory +# +  class SMTPFactory(ServerFactory):      """      Factory for an SMTP server with encrypted relaying capabilities.      """ -    def __init__(self, soledad, gpg=None): -        self._soledad = soledad -        self._gpg = gpg +    def __init__(self, keymanager, config): +        """ +        Initialize the SMTP factory. + +        @param keymanager: A KeyManager for retrieving recipient's keys. +        @type keymanager: leap.common.keymanager.KeyManager +        @param config: A dictionary with smtp configuration. Should have +            the following structure: +                { +                    HOST_KEY: '<str>', +                    PORT_KEY: <int>, +                    USERNAME_KEY: '<str>', +                    PASSWORD_KEY: '<str>', +                    ENCRYPTED_ONLY_KEY: <bool>, +                } +        @type config: dict +        """ +        # assert params +        leap_assert_type(keymanager, KeyManager) +        assert_config_structure(config) +        # and store them +        self._km = keymanager +        self._config = config      def buildProtocol(self, addr): -        "Return a protocol suitable for the job." -        # TODO: use ESMTP here. -        smtpProtocol = smtp.SMTP(SMTPDelivery(self._soledad, self._gpg)) +        """ +        Return a protocol suitable for the job. + +        @param addr: An address, e.g. a TCP (host, port). +        @type addr:  twisted.internet.interfaces.IAddress + +        @return: The protocol. +        @rtype: SMTPDelivery +        """ +        # If needed, we might use ESMTPDelivery here instead. +        smtpProtocol = smtp.SMTP(SMTPDelivery(self._km, self._config))          smtpProtocol.factory = self          return smtpProtocol +# +# SMTPDelivery +# +  class SMTPDelivery(object):      """      Validate email addresses and handle message delivery. @@ -44,14 +184,47 @@ class SMTPDelivery(object):      implements(smtp.IMessageDelivery) -    def __init__(self, soledad, gpg=None): -        self._soledad = soledad -        if gpg: -            self._gpg = gpg -        else: -            self._gpg = GPGWrapper() +    def __init__(self, keymanager, config): +        """ +        Initialize the SMTP delivery object. + +        @param keymanager: A KeyManager for retrieving recipient's keys. +        @type keymanager: leap.common.keymanager.KeyManager +        @param config: A dictionary with smtp configuration. Should have +            the following structure: +                { +                    HOST_KEY: '<str>', +                    PORT_KEY: <int>, +                    USERNAME_KEY: '<str>', +                    PASSWORD_KEY: '<str>', +                    ENCRYPTED_ONLY_KEY: <bool>, +                } +        @type config: dict +        """ +        # assert params +        leap_assert_type(keymanager, KeyManager) +        assert_config_structure(config) +        # and store them +        self._km = keymanager +        self._config = config +        self._origin = None      def receivedHeader(self, helo, origin, recipients): +        """ +        Generate the 'Received:' header for a message. + +        @param helo: The argument to the HELO command and the client's IP +            address. +        @type helo: (str, str) +        @param origin: The address the message is from. +        @type origin: twisted.mail.smtp.Address +        @param recipients: A list of the addresses for which this message is +            bound. +        @type: list of twisted.mail.smtp.User + +        @return: The full "Received" header string. +        @type: str +        """          myHostname, clientIP = helo          headerValue = "by %s from %s with ESMTP ; %s" % (              myHostname, clientIP, smtp.rfc822date()) @@ -59,188 +232,264 @@ class SMTPDelivery(object):          return "Received: %s" % Header(headerValue)      def validateTo(self, user): -        """Assert existence of and trust on recipient's GPG public key.""" +        """ +        Validate the address of C{user}, a recipient of the message. + +        This method is called once for each recipient and validates the +        C{user}'s address against the RFC 2822 definition. If the +        configuration option ENCRYPTED_ONLY_KEY is True, it also asserts the +        existence of the user's key. + +        In the end, it returns an encrypted message object that is able to +        send itself to the C{user}'s address. + +        @param user: The user whose address we wish to validate. +        @type: twisted.mail.smtp.User + +        @return: A Deferred which becomes, or a callable which takes no +            arguments and returns an object implementing IMessage. This will +            be called and the returned object used to deliver the message when +            it arrives. +        @rtype: no-argument callable + +        @raise SMTPBadRcpt: Raised if messages to the address are not to be +            accepted. +        """          # try to find recipient's public key          try: -            # this will raise an exception if key is not found -            trust = self._gpg.find_key(user.dest.addrstr)['trust'] -            # if key is not ultimatelly trusted, then the message will not -            # be encrypted. So, we check for this below -            #if trust != 'u': -            #    raise smtp.SMTPBadRcpt(user) +            address = validate_address(user.dest.addrstr) +            pubkey = self._km.get_key(address, OpenPGPKey)              log.msg("Accepting mail for %s..." % user.dest) -            return lambda: EncryptedMessage(user, soledad=self._soledad, -                                            gpg=self._gpg) -        except LookupError: +        except KeyNotFound:              # if key was not found, check config to see if will send anyway. -            if self.encrypted_only: -                raise smtp.SMTPBadRcpt(user) -            # TODO: send signal to cli/gui that this user's key was not found? +            if self._config[ENCRYPTED_ONLY_KEY]: +                raise smtp.SMTPBadRcpt(user.dest.addrstr)              log.msg("Warning: will send an unencrypted message (because "                      "encrypted_only' is set to False).") +        return lambda: EncryptedMessage( +            self._origin, user, self._km, self._config) + +    def validateFrom(self, helo, origin): +        """ +        Validate the address from which the message originates. + +        @param helo: The argument to the HELO command and the client's IP +            address. +        @type: (str, str) +        @param origin: The address the message is from. +        @type origin: twisted.mail.smtp.Address + +        @return: origin or a Deferred whose callback will be passed origin. +        @rtype: Deferred or Address -    def validateFrom(self, helo, originAddress): +        @raise twisted.mail.smtp.SMTPBadSender: Raised if messages from this +            address are not to be accepted. +        """          # accept mail from anywhere. To reject an address, raise          # smtp.SMTPBadSender here. -        return originAddress +        self._origin = origin +        return origin + +# +# EncryptedMessage +# -class EncryptedMessage(): +class EncryptedMessage(object):      """      Receive plaintext from client, encrypt it and send message to a      recipient.      """      implements(smtp.IMessage) -    SMTP_HOSTNAME = "mail.leap.se" -    SMTP_PORT = 25 - -    def __init__(self, user, soledad,  gpg=None): -        self.user = user -        self._soledad = soledad -        self.fetchConfig() +    def __init__(self, fromAddress, user, keymanager, config): +        """ +        Initialize the encrypted message. + +        @param fromAddress: The address of the sender. +        @type fromAddress: twisted.mail.smtp.Address +        @param user: The recipient of this message. +        @type user: twisted.mail.smtp.User +        @param keymanager: A KeyManager for retrieving recipient's keys. +        @type keymanager: leap.common.keymanager.KeyManager +        @param config: A dictionary with smtp configuration. Should have +            the following structure: +                { +                    HOST_KEY: '<str>', +                    PORT_KEY: <int>, +                    USERNAME_KEY: '<str>', +                    PASSWORD_KEY: '<str>', +                    ENCRYPTED_ONLY_KEY: <bool>, +                } +        @type config: dict +        """ +        # assert params +        leap_assert_type(user, smtp.User) +        leap_assert_type(keymanager, KeyManager) +        assert_config_structure(config) +        # and store them +        self._fromAddress = fromAddress +        self._user = user +        self._km = keymanager +        self._config = config +        # initialize list for message's lines          self.lines = [] -        if gpg: -            self._gpg = gpg -        else: -            self._gpg = GPGWrapper()      def lineReceived(self, line): -        """Store email DATA lines as they arrive.""" +        """ +        Handle another line. + +        @param line: The received line. +        @type line: str +        """          self.lines.append(line)      def eomReceived(self): -        """Encrypt and send message.""" +        """ +        Handle end of message. + +        This method will encrypt and send the message. +        """          log.msg("Message data complete.")          self.lines.append('')  # add a trailing newline          self.parseMessage()          try: -            self.encrypt() +            self._encrypt_and_sign()              return self.sendMessage() -        except LookupError: +        except KeyNotFound:              return None      def parseMessage(self): -        """Separate message headers from body.""" -        sep = self.lines.index('') -        self.headers = self.lines[:sep] -        self.body = self.lines[sep + 1:] +        """ +        Separate message headers from body. +        """ +        parser = Parser() +        self._message = parser.parsestr('\r\n'.join(self.lines))      def connectionLost(self): +        """ +        Log an error when the connection is lost. +        """          log.msg("Connection lost unexpectedly!")          log.err()          # unexpected loss of connection; don't save          self.lines = []      def sendSuccess(self, r): +        """ +        Callback for a successful send. + +        @param r: The result from the last previous callback in the chain. +        @type r: anything +        """          log.msg(r)      def sendError(self, e): +        """ +        Callback for an unsuccessfull send. + +        @param e: The result from the last errback. +        @type e: anything +        """          log.msg(e)          log.err() -    def prepareHeader(self): -        self.headers.insert(1, "From: %s" % self.user.orig.addrstr) -        self.headers.insert(2, "To: %s" % self.user.dest.addrstr) -        self.headers.append('') -      def sendMessage(self): -        self.prepareHeader() -        msg = '\n'.join(self.headers + [self.cyphertext]) +        """ +        Send the message. + +        This method will prepare the message (headers and possibly encrypted +        body) and send it using the ESMTPSenderFactory. + +        @return: A deferred with callbacks for error and success of this +            message send. +        @rtype: twisted.internet.defer.Deferred +        """ +        msg = self._message.as_string(False)          d = defer.Deferred() -        factory = smtp.ESMTPSenderFactory(self.smtp_username, -                                          self.smtp_password, -                                          self.smtp_username, -                                          self.user.dest.addrstr, -                                          StringIO(msg), -                                          d) -        # the next call is TSL-powered! -        reactor.connectTCP(self.SMTP_HOSTNAME, self.SMTP_PORT, factory) +        factory = smtp.ESMTPSenderFactory( +            self._config[USERNAME_KEY], +            self._config[PASSWORD_KEY], +            self._fromAddress.addrstr, +            self._user.dest.addrstr, +            StringIO(msg), +            d, +            requireAuthentication=False,  # for now do unauth, see issue #2474 +        ) +        # TODO: Change this to connectSSL when cert auth is in place in the platform +        reactor.connectTCP( +            self._config[HOST_KEY], +            self._config[PORT_KEY], +            factory +        )          d.addCallback(self.sendSuccess)          d.addErrback(self.sendError)          return d -    def encrypt(self, always_trust=True): -        # TODO: do not "always trust" here. -        try: -            fp = self._gpg.find_key(self.user.dest.addrstr)['fingerprint'] -            log.msg("Encrypting to %s" % fp) -            self.cyphertext = str( -                self._gpg.encrypt( -                    '\n'.join(self.body), [fp], always_trust=always_trust)) -        except LookupError: -            if self.encrypted_only: -                raise -            log.msg("Warning: sending unencrypted mail (because " -                    "'encrypted_only' is set to False).") - -    # this will be replaced by some other mechanism of obtaining credentials -    # for SMTP server. -    def fetchConfig(self): -        # TODO: Soledad/LEAP bootstrap should store the SMTP info on local db, -        # so this relay can load it when it needs. -        if not self._soledad: -            # TODO: uncomment below exception when integration with Soledad is -            # smooth. -            #raise SMTPInfoNotAvailable() -            # TODO: remove dummy settings below when soledad bootstrap is -            # working. -            self.smtp_host = '' -            self.smtp_port = '' -            self.smtp_username = '' -            self.smtp_password = '' -            self.encrypted_only = True +    def _encrypt_and_sign_payload_rec(self, message, pubkey, signkey): +        """ +        Recursivelly descend in C{message}'s payload encrypting to C{pubkey} +        and signing with C{signkey}. + +        @param message: The message whose payload we want to encrypt. +        @type message: email.message.Message +        @param pubkey: The public key used to encrypt the message. +        @type pubkey: leap.common.keymanager.openpgp.OpenPGPKey +        @param signkey: The private key used to sign the message. +        @type signkey: leap.common.keymanager.openpgp.OpenPGPKey +        """ +        if message.is_multipart() is False: +            message.set_payload( +                encrypt_asym( +                    message.get_payload(), pubkey, sign=signkey))          else: -            self.smtp_config = self._soledad.get_doc('smtp_relay_config') -            for confname in [  -                'smtp_host', 'smtp_port', 'smtp_username', -                'smtp_password', 'encrypted_only', -            ]: -                setattr(self, confname, doc.content[confname]) - - -class GPGWrapper(): -    """ -    This is a temporary class for handling GPG requests, and should be -    replaced by a more general class used throughout the project. -    """ - -    GNUPG_HOME = "~/.config/leap/gnupg" -    GNUPG_BINARY = "/usr/bin/gpg"  # TODO: change this based on OS - -    def __init__(self, gpghome=GNUPG_HOME, gpgbinary=GNUPG_BINARY): -        self.gpg = gnupg.GPG(gnupghome=gpghome, gpgbinary=gpgbinary) +            for msg in message.get_payload(): +                self._encrypt_and_sign_payload_rec(msg, pubkey, signkey) -    def find_key(self, email): +    def _sign_payload_rec(self, message, signkey):          """ -        Find user's key based on their email. +        Recursivelly descend in C{message}'s payload signing with C{signkey}. + +        @param message: The message whose payload we want to encrypt. +        @type message: email.message.Message +        @param pubkey: The public key used to encrypt the message. +        @type pubkey: leap.common.keymanager.openpgp.OpenPGPKey +        @param signkey: The private key used to sign the message. +        @type signkey: leap.common.keymanager.openpgp.OpenPGPKey          """ -        for key in self.gpg.list_keys(): -            for uid in key['uids']: -                if re.search(email, uid): -                    return key -        raise LookupError("GnuPG public key for %s not found!" % email) - -    def encrypt(self, data, recipient, always_trust=True): -        # TODO: do not 'always_trust'. -        return self.gpg.encrypt(data, recipient, always_trust=always_trust) - -    def decrypt(self, data): -        return self.gpg.decrypt(data) - -    def import_keys(self, data): -        return self.gpg.import_keys(data) - +        if message.is_multipart() is False: +            message.set_payload( +                sign( +                    message.get_payload(), signkey)) +        else: +            for msg in message.get_payload(): +                self._sign_payload_rec(msg, signkey) -# service configuration -port = 25 -user_email = 'user@leap.se' # TODO: replace for real mail from gui/cli +    def _encrypt_and_sign(self): +        """ +        Encrypt the message body. -# instantiate soledad for client app storage and sync -s = soledad.Soledad(user_email) -factory = SMTPFactory(s) +        Fetch the recipient key and encrypt the content to the +        recipient. If a key is not found, then the behaviour depends on the +        configuration parameter ENCRYPTED_ONLY_KEY. If it is False, the message +        is sent unencrypted and a warning is logged. If it is True, the +        encryption fails with a KeyNotFound exception. -# enable the use of this service with twistd -application = service.Application("LEAP SMTP Relay") -service = internet.TCPServer(port, factory) -service.setServiceParent(application) +        @raise KeyNotFound: Raised when the recipient key was not found and +            the ENCRYPTED_ONLY_KEY configuration parameter is set to True. +        """ +        from_address = validate_address(self._fromAddress.addrstr) +        signkey = self._km.get_key(from_address, OpenPGPKey, private=True) +        log.msg("Will sign the message with %s." % signkey.fingerprint) +        to_address = validate_address(self._user.dest.addrstr) +        try: +            # try to get the recipient pubkey +            pubkey = self._km.get_key(to_address, OpenPGPKey) +            log.msg("Will encrypt the message to %s." % pubkey.fingerprint) +            self._encrypt_and_sign_payload_rec(self._message, pubkey, signkey) +        except KeyNotFound: +            # at this point we _can_ send unencrypted mail, because if the +            # configuration said the opposite the address would have been +            # rejected in SMTPDelivery.validateTo(). +            self._sign_payload_rec(self._message, signkey) +            log.msg('Will send unencrypted message to %s.' % to_address) diff --git a/mail/src/leap/mail/smtp/tests/test_smtprelay.py b/mail/src/leap/mail/smtp/tests/test_smtprelay.py deleted file mode 100644 index eaa4d04..0000000 --- a/mail/src/leap/mail/smtp/tests/test_smtprelay.py +++ /dev/null @@ -1,78 +0,0 @@ -from datetime import datetime -import re -from leap.email.smtp.smtprelay import ( -    SMTPFactory, -    #SMTPDelivery, # an object -    EncryptedMessage, -) -from leap.email.smtp import tests -from twisted.test import proto_helpers -from twisted.mail.smtp import User - - -# some regexps -IP_REGEX = "(([0-9]|[1-9][0-9]|1[0-9]{2}|2[0-4][0-9]|25[0-5])\.){3}" + \ -    "([0-9]|[1-9][0-9]|1[0-9]{2}|2[0-4][0-9]|25[0-5])" -HOSTNAME_REGEX = "(([a-zA-Z0-9]|[a-zA-Z0-9][a-zA-Z0-9\-]*[a-zA-Z0-9])\.)*" + \ -    "([A-Za-z0-9]|[A-Za-z0-9][A-Za-z0-9\-]*[A-Za-z0-9])" -IP_OR_HOST_REGEX = '(' + IP_REGEX + '|' + HOSTNAME_REGEX + ')' - - -class TestSmtpRelay(tests.OpenPGPTestCase): - -    EMAIL_DATA = ['HELO relay.leap.se', -                  'MAIL FROM: <user@leap.se>', -                  'RCPT TO: <leap@leap.se>', -                  'DATA', -                  'From: User <user@leap.se>', -                  'To: Leap <leap@leap.se>', -                  'Date: ' + datetime.now().strftime('%c'), -                  'Subject: test message', -                  '', -                  'This is a secret message.', -                  'Yours,', -                  'A.', -                  '', -                  '.', -                  'QUIT'] - -    def assertMatch(self, string, pattern, msg=None): -        if not re.match(pattern, string): -            msg = self._formatMessage(msg, '"%s" does not match pattern "%s".' -                                           % (string, pattern)) -            raise self.failureException(msg) - -    def test_relay_accepts_valid_email(self): -        """ -        Test if SMTP server responds correctly for valid interaction. -        """ - -        SMTP_ANSWERS = ['220 ' + IP_OR_HOST_REGEX + -                        ' NO UCE NO UBE NO RELAY PROBES', -                        '250 ' + IP_OR_HOST_REGEX + ' Hello ' + -                        IP_OR_HOST_REGEX + ', nice to meet you', -                        '250 Sender address accepted', -                        '250 Recipient address accepted', -                        '354 Continue'] -        proto = SMTPFactory(None, self._gpg).buildProtocol(('127.0.0.1', 0)) -        transport = proto_helpers.StringTransport() -        proto.makeConnection(transport) -        for i, line in enumerate(self.EMAIL_DATA): -            proto.lineReceived(line + '\r\n') -            self.assertMatch(transport.value(), -                             '\r\n'.join(SMTP_ANSWERS[0:i + 1])) -        proto.setTimeout(None) - -    def test_message_encrypt(self): -        """ -        Test if message gets encrypted to destination email. -        """ -        proto = SMTPFactory(None, self._gpg).buildProtocol(('127.0.0.1', 0)) -        user = User('leap@leap.se', 'relay.leap.se', proto, 'leap@leap.se') -        m = EncryptedMessage(user, None, self._gpg) -        for line in self.EMAIL_DATA[4:12]: -            m.lineReceived(line) -        m.parseMessage() -        m.encrypt() -        decrypted = str(self._gpg.decrypt(m.cyphertext)) -        self.assertEqual('\n'.join(self.EMAIL_DATA[9:12]), decrypted) diff --git a/mail/src/leap/mail/smtp/tests/185CA770.key b/mail/src/leap/mail/tests/smtp/185CA770.key index 587b416..587b416 100644 --- a/mail/src/leap/mail/smtp/tests/185CA770.key +++ b/mail/src/leap/mail/tests/smtp/185CA770.key diff --git a/mail/src/leap/mail/smtp/tests/185CA770.pub b/mail/src/leap/mail/tests/smtp/185CA770.pub index 38af19f..38af19f 100644 --- a/mail/src/leap/mail/smtp/tests/185CA770.pub +++ b/mail/src/leap/mail/tests/smtp/185CA770.pub diff --git a/mail/src/leap/mail/smtp/tests/__init__.py b/mail/src/leap/mail/tests/smtp/__init__.py index d7b942a..c69c34f 100644 --- a/mail/src/leap/mail/smtp/tests/__init__.py +++ b/mail/src/leap/mail/tests/smtp/__init__.py @@ -1,17 +1,49 @@ +# -*- coding: utf-8 -*- +# __init__.py +# Copyright (C) 2013 LEAP +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see <http://www.gnu.org/licenses/>. + + +""" +Base classes and keys for SMTP relay tests. +""" +  import os  import shutil  import tempfile +from mock import Mock +  from twisted.trial import unittest + +from leap.soledad import Soledad +from leap.soledad.crypto import SoledadCrypto +from leap.common.keymanager import ( +    KeyManager, +    openpgp, +) + +  from leap.common.testing.basetest import BaseLeapTest -from leap.mail.smtp.smtprelay import GPGWrapper -class OpenPGPTestCase(unittest.TestCase, BaseLeapTest): +class TestCaseWithKeyManager(BaseLeapTest):      def setUp(self): -        # mimic LeapBaseTest.setUpClass behaviour, because this is deprecated +        # mimic BaseLeapTest.setUpClass behaviour, because this is deprecated          # in Twisted: http://twistedmatrix.com/trac/ticket/1870          self.old_path = os.environ['PATH']          self.old_home = os.environ['HOME'] @@ -22,18 +54,46 @@ class OpenPGPTestCase(unittest.TestCase, BaseLeapTest):              'bin')          os.environ["PATH"] = bin_tdir          os.environ["HOME"] = self.tempdir +          # setup our own stuff -        self.gnupg_home = self.tempdir + '/gnupg' -        os.mkdir(self.gnupg_home) -        self.email = 'leap@leap.se' -        self._gpg = GPGWrapper(gpghome=self.gnupg_home) - -        self.assertEqual(self._gpg.import_keys(PUBLIC_KEY).summary(), -                         '1 imported', "error importing public key") -        self.assertEqual(self._gpg.import_keys(PRIVATE_KEY).summary(), -                         # note that gnupg does not return a successful import -                         # for private keys. Bug? -                         '0 imported', "error importing private key") +        address = 'leap@leap.se'  # user's address in the form user@provider +        uuid = 'leap@leap.se' +        passphrase = '123' +        secret_path = os.path.join(self.tempdir, 'secret.gpg') +        local_db_path = os.path.join(self.tempdir, 'soledad.u1db') +        server_url = 'http://provider/' +        cert_file = '' + +        # mock key fetching and storing so Soledad doesn't fail when trying to +        # reach the server. +        Soledad._fetch_keys_from_shared_db = Mock(return_value=None) +        Soledad._assert_keys_in_shared_db = Mock(return_value=None) + +        # instantiate soledad +        self._soledad = Soledad( +            uuid, +            passphrase, +            secret_path, +            local_db_path, +            server_url, +            cert_file, +        ) + +        self._config = { +            'host': 'http://provider/', +            'port': 25, +            'username': address, +            'password': '<password>', +            'encrypted_only': True +        } + +        nickserver_url = ''  # the url of the nickserver +        self._km = KeyManager(address, nickserver_url, self._soledad) + +        # insert test keys in key manager. +        pgp = openpgp.OpenPGPScheme(self._soledad) +        pgp.put_ascii_key(PRIVATE_KEY) +        pgp.put_ascii_key(PRIVATE_KEY_2)      def tearDown(self):          # mimic LeapBaseTest.tearDownClass behaviour @@ -43,19 +103,12 @@ class OpenPGPTestCase(unittest.TestCase, BaseLeapTest):          assert self.tempdir.startswith('/tmp/leap_tests-')          shutil.rmtree(self.tempdir) -    def test_openpgp_encrypt_decrypt(self): -        "Test if openpgp can encrypt and decrypt." -        text = "simple raw text" -        encrypted = str(self._gpg.encrypt(text, KEY_FINGERPRINT, -                                          # TODO: handle always trust issue -                                          always_trust=True)) -        self.assertNotEqual(text, encrypted, "failed encrypting text") -        decrypted = str(self._gpg.decrypt(encrypted)) -        self.assertEqual(text, decrypted, "failed decrypting text") -  # Key material for testing  KEY_FINGERPRINT = "E36E738D69173C13D709E44F2F455E2824D18DDF" + +ADDRESS = 'leap@leap.se' +  PUBLIC_KEY = """  -----BEGIN PGP PUBLIC KEY BLOCK-----  Version: GnuPG v1.4.10 (GNU/Linux) @@ -109,6 +162,7 @@ ZtQ/VymwFL3XdUWV6B/hU4PVAFvO3qlOtdJ6TpE+nEWgcWjCv5g7RjXX  =MuOY  -----END PGP PUBLIC KEY BLOCK-----  """ +  PRIVATE_KEY = """  -----BEGIN PGP PRIVATE KEY BLOCK-----  Version: GnuPG v1.4.10 (GNU/Linux) @@ -216,3 +270,64 @@ RZXoH+FTg9UAW87eqU610npOkT6cRaBxaMK/mDtGNdc=  =JTFu  -----END PGP PRIVATE KEY BLOCK-----  """ + +ADDRESS_2 = 'anotheruser@leap.se' + +PUBLIC_KEY_2 = """ +-----BEGIN PGP PUBLIC KEY BLOCK----- +Version: GnuPG v1.4.10 (GNU/Linux) + +mI0EUYwJXgEEAMbTKHuPJ5/Gk34l9Z06f+0WCXTDXdte1UBoDtZ1erAbudgC4MOR +gquKqoj3Hhw0/ILqJ88GcOJmKK/bEoIAuKaqlzDF7UAYpOsPZZYmtRfPC2pTCnXq +Z1vdeqLwTbUspqXflkCkFtfhGKMq5rH8GV5a3tXZkRWZhdNwhVXZagC3ABEBAAG0 +IWFub3RoZXJ1c2VyIDxhbm90aGVydXNlckBsZWFwLnNlPoi4BBMBAgAiBQJRjAle +AhsDBgsJCAcDAgYVCAIJCgsEFgIDAQIeAQIXgAAKCRB/nfpof+5XWotuA/4tLN4E +gUr7IfLy2HkHAxzw7A4rqfMN92DIM9mZrDGaWRrOn3aVF7VU1UG7MDkHfPvp/cFw +ezoCw4s4IoHVc/pVlOkcHSyt4/Rfh248tYEJmFCJXGHpkK83VIKYJAithNccJ6Q4 +JE/o06Mtf4uh/cA1HUL4a4ceqUhtpLJULLeKo7iNBFGMCV4BBADsyQI7GR0wSAxz +VayLjuPzgT+bjbFeymIhjuxKIEwnIKwYkovztW+4bbOcQs785k3Lp6RzvigTpQQt +Z/hwcLOqZbZw8t/24+D+Pq9mMP2uUvCFFqLlVvA6D3vKSQ/XNN+YB919WQ04jh63 +yuRe94WenT1RJd6xU1aaUff4rKizuQARAQABiJ8EGAECAAkFAlGMCV4CGwwACgkQ +f536aH/uV1rPZQQAqCzRysOlu8ez7PuiBD4SebgRqWlxa1TF1ujzfLmuPivROZ2X +Kw5aQstxgGSjoB7tac49s0huh4X8XK+BtJBfU84JS8Jc2satlfwoyZ35LH6sDZck +I+RS/3we6zpMfHs3vvp9xgca6ZupQxivGtxlJs294TpJorx+mFFqbV17AzQ= +=Thdu +-----END PGP PUBLIC KEY BLOCK----- +""" + +PRIVATE_KEY_2 = """ +-----BEGIN PGP PRIVATE KEY BLOCK----- +Version: GnuPG v1.4.10 (GNU/Linux) + +lQHYBFGMCV4BBADG0yh7jyefxpN+JfWdOn/tFgl0w13bXtVAaA7WdXqwG7nYAuDD +kYKriqqI9x4cNPyC6ifPBnDiZiiv2xKCALimqpcwxe1AGKTrD2WWJrUXzwtqUwp1 +6mdb3Xqi8E21LKal35ZApBbX4RijKuax/BleWt7V2ZEVmYXTcIVV2WoAtwARAQAB +AAP7BLuSAx7tOohnimEs74ks8l/L6dOcsFQZj2bqs4AoY3jFe7bV0tHr4llypb/8 +H3/DYvpf6DWnCjyUS1tTnXSW8JXtx01BUKaAufSmMNg9blKV6GGHlT/Whe9uVyks +7XHk/+9mebVMNJ/kNlqq2k+uWqJohzC8WWLRK+d1tBeqDsECANZmzltPaqUsGV5X +C3zszE3tUBgptV/mKnBtopKi+VH+t7K6fudGcG+bAcZDUoH/QVde52mIIjjIdLje +uajJuHUCAO1mqh+vPoGv4eBLV7iBo3XrunyGXiys4a39eomhxTy3YktQanjjx+ty +GltAGCs5PbWGO6/IRjjvd46wh53kzvsCAO0J97gsWhzLuFnkxFAJSPk7RRlyl7lI +1XS/x0Og6j9XHCyY1OYkfBm0to3UlCfkgirzCYlTYObCofzdKFIPDmSqHbQhYW5v +dGhlcnVzZXIgPGFub3RoZXJ1c2VyQGxlYXAuc2U+iLgEEwECACIFAlGMCV4CGwMG +CwkIBwMCBhUIAgkKCwQWAgMBAh4BAheAAAoJEH+d+mh/7ldai24D/i0s3gSBSvsh +8vLYeQcDHPDsDiup8w33YMgz2ZmsMZpZGs6fdpUXtVTVQbswOQd8++n9wXB7OgLD +izgigdVz+lWU6RwdLK3j9F+Hbjy1gQmYUIlcYemQrzdUgpgkCK2E1xwnpDgkT+jT +oy1/i6H9wDUdQvhrhx6pSG2kslQst4qjnQHYBFGMCV4BBADsyQI7GR0wSAxzVayL +juPzgT+bjbFeymIhjuxKIEwnIKwYkovztW+4bbOcQs785k3Lp6RzvigTpQQtZ/hw +cLOqZbZw8t/24+D+Pq9mMP2uUvCFFqLlVvA6D3vKSQ/XNN+YB919WQ04jh63yuRe +94WenT1RJd6xU1aaUff4rKizuQARAQABAAP9EyElqJ3dq3EErXwwT4mMnbd1SrVC +rUJrNWQZL59mm5oigS00uIyR0SvusOr+UzTtd8ysRuwHy5d/LAZsbjQStaOMBILx +77TJveOel0a1QK0YSMF2ywZMCKvquvjli4hAtWYz/EwfuzQN3t23jc5ny+GqmqD2 +3FUxLJosFUfLNmECAO9KhVmJi+L9dswIs+2Dkjd1eiRQzNOEVffvYkGYZyKxNiXF +UA5kvyZcB4iAN9sWCybE4WHZ9jd4myGB0MPDGxkCAP1RsXJbbuD6zS7BXe5gwunO +2q4q7ptdSl/sJYQuTe1KNP5d/uGsvlcFfsYjpsopasPjFBIncc/2QThMKlhoEaEB +/0mVAxpT6SrEvUbJ18z7kna24SgMPr3OnPMxPGfvNLJY/Xv/A17YfoqjmByCvsKE +JCDjopXtmbcrZyoEZbEht9mko4ifBBgBAgAJBQJRjAleAhsMAAoJEH+d+mh/7lda +z2UEAKgs0crDpbvHs+z7ogQ+Enm4EalpcWtUxdbo83y5rj4r0TmdlysOWkLLcYBk +o6Ae7WnOPbNIboeF/FyvgbSQX1POCUvCXNrGrZX8KMmd+Sx+rA2XJCPkUv98Hus6 +THx7N776fcYHGumbqUMYrxrcZSbNveE6SaK8fphRam1dewM0 +=a5gs +-----END PGP PRIVATE KEY BLOCK----- +""" + diff --git a/mail/src/leap/mail/smtp/tests/mail.txt b/mail/src/leap/mail/tests/smtp/mail.txt index 9542047..9542047 100644 --- a/mail/src/leap/mail/smtp/tests/mail.txt +++ b/mail/src/leap/mail/tests/smtp/mail.txt diff --git a/mail/src/leap/mail/tests/smtp/test_smtprelay.py b/mail/src/leap/mail/tests/smtp/test_smtprelay.py new file mode 100644 index 0000000..e48f129 --- /dev/null +++ b/mail/src/leap/mail/tests/smtp/test_smtprelay.py @@ -0,0 +1,248 @@ +# -*- coding: utf-8 -*- +# test_smtprelay.py +# Copyright (C) 2013 LEAP +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see <http://www.gnu.org/licenses/>. + + +""" +SMTP relay tests. +""" + + +import re + + +from datetime import datetime +from twisted.test import proto_helpers +from twisted.mail.smtp import ( +    User, +    Address, +    SMTPBadRcpt, +) +from mock import Mock + + +from leap.mail.smtp.smtprelay import ( +    SMTPFactory, +    EncryptedMessage, +) +from leap.mail.tests.smtp import ( +    TestCaseWithKeyManager, +    ADDRESS, +    ADDRESS_2, +) +from leap.common.keymanager import openpgp + + +# some regexps +IP_REGEX = "(([0-9]|[1-9][0-9]|1[0-9]{2}|2[0-4][0-9]|25[0-5])\.){3}" + \ +    "([0-9]|[1-9][0-9]|1[0-9]{2}|2[0-4][0-9]|25[0-5])" +HOSTNAME_REGEX = "(([a-zA-Z0-9]|[a-zA-Z0-9][a-zA-Z0-9\-]*[a-zA-Z0-9])\.)*" + \ +    "([A-Za-z0-9]|[A-Za-z0-9][A-Za-z0-9\-]*[A-Za-z0-9])" +IP_OR_HOST_REGEX = '(' + IP_REGEX + '|' + HOSTNAME_REGEX + ')' + + +class TestSmtpRelay(TestCaseWithKeyManager): + +    EMAIL_DATA = ['HELO relay.leap.se', +                  'MAIL FROM: <%s>' % ADDRESS_2, +                  'RCPT TO: <%s>' % ADDRESS, +                  'DATA', +                  'From: User <%s>' % ADDRESS_2, +                  'To: Leap <%s>' % ADDRESS, +                  'Date: ' + datetime.now().strftime('%c'), +                  'Subject: test message', +                  '', +                  'This is a secret message.', +                  'Yours,', +                  'A.', +                  '', +                  '.', +                  'QUIT'] + +    def assertMatch(self, string, pattern, msg=None): +        if not re.match(pattern, string): +            msg = self._formatMessage(msg, '"%s" does not match pattern "%s".' +                                           % (string, pattern)) +            raise self.failureException(msg) + +    def test_openpgp_encrypt_decrypt(self): +        "Test if openpgp can encrypt and decrypt." +        text = "simple raw text" +        pubkey = self._km.get_key( +            ADDRESS, openpgp.OpenPGPKey, private=False) +        encrypted = openpgp.encrypt_asym(text, pubkey) +        self.assertNotEqual( +            text, encrypted, "Ciphertext is equal to plaintext.") +        privkey = self._km.get_key( +            ADDRESS, openpgp.OpenPGPKey, private=True) +        decrypted = openpgp.decrypt_asym(encrypted, privkey) +        self.assertEqual(text, decrypted, +            "Decrypted text differs from plaintext.") + +    def test_relay_accepts_valid_email(self): +        """ +        Test if SMTP server responds correctly for valid interaction. +        """ + +        SMTP_ANSWERS = ['220 ' + IP_OR_HOST_REGEX + +                        ' NO UCE NO UBE NO RELAY PROBES', +                        '250 ' + IP_OR_HOST_REGEX + ' Hello ' + +                        IP_OR_HOST_REGEX + ', nice to meet you', +                        '250 Sender address accepted', +                        '250 Recipient address accepted', +                        '354 Continue'] +        proto = SMTPFactory( +            self._km, self._config).buildProtocol(('127.0.0.1', 0)) +        transport = proto_helpers.StringTransport() +        proto.makeConnection(transport) +        for i, line in enumerate(self.EMAIL_DATA): +            proto.lineReceived(line + '\r\n') +            self.assertMatch(transport.value(), +                             '\r\n'.join(SMTP_ANSWERS[0:i + 1]), +                             'Did not get expected answer from relay.') +        proto.setTimeout(None) + +    def test_message_encrypt(self): +        """ +        Test if message gets encrypted to destination email. +        """ +        proto = SMTPFactory( +            self._km, self._config).buildProtocol(('127.0.0.1', 0)) +        fromAddr = Address(ADDRESS_2) +        dest = User(ADDRESS, 'relay.leap.se', proto, ADDRESS) +        m = EncryptedMessage(fromAddr, dest, self._km, self._config) +        for line in self.EMAIL_DATA[4:12]: +            m.lineReceived(line) +        m.eomReceived() +        privkey = self._km.get_key( +            ADDRESS, openpgp.OpenPGPKey, private=True) +        decrypted = openpgp.decrypt_asym(m._message.get_payload(), privkey) +        self.assertEqual( +            '\r\n'.join(self.EMAIL_DATA[9:12]) + '\r\n', +            decrypted, +            'Decrypted text differs from plaintext.') + +    def test_message_encrypt_sign(self): +        """ +        Test if message gets encrypted to destination email and signed with +        sender key. +        """ +        proto = SMTPFactory( +            self._km, self._config).buildProtocol(('127.0.0.1', 0)) +        user = User(ADDRESS, 'relay.leap.se', proto, ADDRESS) +        fromAddr = Address(ADDRESS_2) +        m = EncryptedMessage(fromAddr, user, self._km, self._config) +        for line in self.EMAIL_DATA[4:12]: +            m.lineReceived(line) +        # trigger encryption and signing +        m.eomReceived() +        # decrypt and verify +        privkey = self._km.get_key( +            ADDRESS, openpgp.OpenPGPKey, private=True) +        pubkey = self._km.get_key(ADDRESS_2, openpgp.OpenPGPKey) +        decrypted = openpgp.decrypt_asym( +            m._message.get_payload(), privkey, verify=pubkey) +        self.assertEqual( +            '\r\n'.join(self.EMAIL_DATA[9:12]) + '\r\n', +            decrypted, +            'Decrypted text differs from plaintext.') + +    def test_message_sign(self): +        """ +        Test if message is signed with sender key. +        """ +        # mock the key fetching +        self._km.fetch_keys_from_server = Mock(return_value=[]) +        proto = SMTPFactory( +            self._km, self._config).buildProtocol(('127.0.0.1', 0)) +        user = User('ihavenopubkey@nonleap.se', 'relay.leap.se', proto, ADDRESS) +        fromAddr = Address(ADDRESS_2) +        m = EncryptedMessage(fromAddr, user, self._km, self._config) +        for line in self.EMAIL_DATA[4:12]: +            m.lineReceived(line) +        # trigger signing +        m.eomReceived() +        # assert content of message +        self.assertTrue( +            m._message.get_payload().startswith( +                '-----BEGIN PGP SIGNED MESSAGE-----\n' + +                'Hash: SHA1\n\n' +  +                ('\r\n'.join(self.EMAIL_DATA[9:12]) + '\r\n' + +                '-----BEGIN PGP SIGNATURE-----\n')), +            'Message does not start with signature header.') +        self.assertTrue( +            m._message.get_payload().endswith( +                '-----END PGP SIGNATURE-----\n'), +            'Message does not end with signature footer.') +        # assert signature is valid +        pubkey = self._km.get_key(ADDRESS_2, openpgp.OpenPGPKey) +        self.assertTrue( +            openpgp.verify(m._message.get_payload(), pubkey), +            'Signature could not be verified.') + +    def test_missing_key_rejects_address(self): +        """ +        Test if server rejects to send unencrypted when 'encrypted_only' is +        True. +        """ +        # remove key from key manager +        pubkey = self._km.get_key(ADDRESS, openpgp.OpenPGPKey) +        pgp = openpgp.OpenPGPScheme(self._soledad) +        pgp.delete_key(pubkey) +        # mock the key fetching +        self._km.fetch_keys_from_server = Mock(return_value=[]) +        # prepare the SMTP factory +        proto = SMTPFactory( +            self._km, self._config).buildProtocol(('127.0.0.1', 0)) +        transport = proto_helpers.StringTransport() +        proto.makeConnection(transport) +        proto.lineReceived(self.EMAIL_DATA[0] + '\r\n') +        proto.lineReceived(self.EMAIL_DATA[1] + '\r\n') +        proto.lineReceived(self.EMAIL_DATA[2] + '\r\n') +        # ensure the address was rejected +        lines = transport.value().rstrip().split('\n') +        self.assertEqual( +            '550 Cannot receive for specified address', +            lines[-1], +            'Address should have been rejecetd with appropriate message.') + +    def test_missing_key_accepts_address(self): +        """ +        Test if server accepts to send unencrypted when 'encrypted_only' is +        False. +        """ +        # remove key from key manager +        pubkey = self._km.get_key(ADDRESS, openpgp.OpenPGPKey) +        pgp = openpgp.OpenPGPScheme(self._soledad) +        pgp.delete_key(pubkey) +        # mock the key fetching +        self._km.fetch_keys_from_server = Mock(return_value=[]) +        # change the configuration +        self._config['encrypted_only'] = False +        # prepare the SMTP factory +        proto = SMTPFactory( +            self._km, self._config).buildProtocol(('127.0.0.1', 0)) +        transport = proto_helpers.StringTransport() +        proto.makeConnection(transport) +        proto.lineReceived(self.EMAIL_DATA[0] + '\r\n') +        proto.lineReceived(self.EMAIL_DATA[1] + '\r\n') +        proto.lineReceived(self.EMAIL_DATA[2] + '\r\n') +        # ensure the address was accepted +        lines = transport.value().rstrip().split('\n') +        self.assertEqual( +            '250 Recipient address accepted', +            lines[-1], +            'Address should have been accepted with appropriate message.') | 
