diff options
author | drebs <drebs@leap.se> | 2013-05-06 19:31:56 -0300 |
---|---|---|
committer | drebs <drebs@leap.se> | 2013-05-11 17:01:04 -0300 |
commit | 98bb3f2cd3bf007041265583af95845264d7e189 (patch) | |
tree | ed66f71aa46a07bb94b90fdb74816e557b0773c0 | |
parent | b17d57d763e2d206e6dcff7b3e245b3dc6b480c4 (diff) |
Adapt smtp-relay to use leap.common.keymanager
* Add docstrings to smtp-relay classes.
* Setup test suite.
* Add setuptools-trial as dependency for tests.
* Move smtp-relay tests to default test directory.
* Add tests for smtp-relay.
* Send of unencrypted mail depending on 'encrypted_only' param.
* Malformed email address.
* Add a helper function for smtp-relay.
* Assert params for initializing SMTPFactory.
* Use mail.util.parseaddr to parse email address.
* Use email.message.Message to represent an email message in smtp-relay.
* Make requirements effective and fix leap.common version in setup.py.
* Add/remove dependencies in setup.py.
* Fix Soledad instantiation in tests.
* Fix sender address in smtp-relay.
* Fix some comments regarding twisted's SSL and SMTP.
* Remove authentication from smtp-relay when sending.
This closes #2446.
-rw-r--r-- | mail/pkg/requirements.pip | 2 | ||||
-rw-r--r-- | mail/setup.py | 14 | ||||
-rw-r--r-- | mail/src/leap/mail/imap/tests/test_imap.py | 4 | ||||
-rw-r--r-- | mail/src/leap/mail/smtp/__init__.py | 78 | ||||
-rw-r--r-- | mail/src/leap/mail/smtp/smtprelay.py | 498 | ||||
-rw-r--r-- | mail/src/leap/mail/smtp/tests/test_smtprelay.py | 78 | ||||
-rw-r--r-- | mail/src/leap/mail/tests/smtp/185CA770.key (renamed from mail/src/leap/mail/smtp/tests/185CA770.key) | 0 | ||||
-rw-r--r-- | mail/src/leap/mail/tests/smtp/185CA770.pub (renamed from mail/src/leap/mail/smtp/tests/185CA770.pub) | 0 | ||||
-rw-r--r-- | mail/src/leap/mail/tests/smtp/__init__.py (renamed from mail/src/leap/mail/smtp/tests/__init__.py) | 98 | ||||
-rw-r--r-- | mail/src/leap/mail/tests/smtp/mail.txt (renamed from mail/src/leap/mail/smtp/tests/mail.txt) | 0 | ||||
-rw-r--r-- | mail/src/leap/mail/tests/smtp/test_smtprelay.py | 212 |
11 files changed, 735 insertions, 249 deletions
diff --git a/mail/pkg/requirements.pip b/mail/pkg/requirements.pip index 1b5e5efe..5f4e7ef0 100644 --- a/mail/pkg/requirements.pip +++ b/mail/pkg/requirements.pip @@ -1,3 +1,3 @@ -leap.common +leap.common>=0.2.3-dev leap.soledad twisted diff --git a/mail/setup.py b/mail/setup.py index 4de72512..8d4e415a 100644 --- a/mail/setup.py +++ b/mail/setup.py @@ -21,10 +21,15 @@ from setuptools import setup, find_packages requirements = [ "leap.soledad", - "leap.common", + "leap.common>=0.2.3-dev", "twisted", ] +tests_requirements = [ + 'setuptools-trial', + 'mock', +] + # XXX add classifiers, docs setup( @@ -40,7 +45,8 @@ setup( ), namespace_packages=["leap"], package_dir={'': 'src'}, - packages=find_packages('src'), - #test_suite='leap.mail.tests', - #install_requires=requirements, + packages=find_packages('src', exclude=['leap.mail.tests']), + test_suite='leap.mail.tests', + install_requires=requirements, + tests_require=tests_requirements, ) diff --git a/mail/src/leap/mail/imap/tests/test_imap.py b/mail/src/leap/mail/imap/tests/test_imap.py index 6792e4bb..7bfa1d79 100644 --- a/mail/src/leap/mail/imap/tests/test_imap.py +++ b/mail/src/leap/mail/imap/tests/test_imap.py @@ -49,8 +49,8 @@ import u1db from leap.common.testing.basetest import BaseLeapTest from leap.mail.imap.server import SoledadMailbox -from leap.mail.imap.tests import PUBLIC_KEY -from leap.mail.imap.tests import PRIVATE_KEY +from leap.mail.tests.imap import PUBLIC_KEY +from leap.mail.tests.imap import PRIVATE_KEY from leap.soledad import Soledad from leap.soledad.util import GPGWrapper diff --git a/mail/src/leap/mail/smtp/__init__.py b/mail/src/leap/mail/smtp/__init__.py index e69de29b..13af0156 100644 --- a/mail/src/leap/mail/smtp/__init__.py +++ b/mail/src/leap/mail/smtp/__init__.py @@ -0,0 +1,78 @@ +# -*- coding: utf-8 -*- +# __init__.py +# Copyright (C) 2013 LEAP +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see <http://www.gnu.org/licenses/>. + + +""" +SMTP relay helper function. +""" + + +from twisted.application import internet, service +from twisted.internet import reactor + + +from leap import soledad +from leap.common.keymanager import KeyManager +from leap.mail.smtp.smtprelay import SMTPFactory + + +def setup_smtp_relay(port, keymanager, smtp_host, smtp_port, smtp_username, + smtp_password, encrypted_only): + """ + Setup SMTP relay to run with Twisted. + + This function sets up the SMTP relay configuration and the Twisted + reactor. + + @param port: The port in which to run the server. + @type port: int + @param keymanager: A Key Manager from where to get recipients' public + keys. + @type keymanager: leap.common.keymanager.KeyManager + @param smtp_host: The hostname of the remote SMTP server. + @type smtp_host: str + @param smtp_port: The port of the remote SMTP server. + @type smtp_port: int + @param smtp_username: The username used to connect to remote SMTP server. + @type smtp_username: str + @param smtp_password: The password used to connect to remote SMTP server. + @type smtp_password: str + @param encrypted_only: Whether the SMTP relay should send unencrypted mail + or not. + @type encrypted_only: bool + """ + # The configuration for the SMTP relay is a dict with the following + # format: + # + # { + # 'host': '<host>', + # 'port': <int>, + # 'username': '<username>', + # 'password': '<password>', + # 'encrypted_only': <True/False> + # } + config = { + 'host': smtp_host, + 'port': smtp_port, + 'username': smtp_username, + 'password': smtp_password, + 'encrypted_only': encrypted_only + } + + # configure the use of this service with twistd + factory = SMTPFactory(keymanager, config) + reactor.listenTCP(port, factory) diff --git a/mail/src/leap/mail/smtp/smtprelay.py b/mail/src/leap/mail/smtp/smtprelay.py index 6479873a..bd18fb56 100644 --- a/mail/src/leap/mail/smtp/smtprelay.py +++ b/mail/src/leap/mail/smtp/smtprelay.py @@ -1,42 +1,186 @@ +# -*- coding: utf-8 -*- +# smtprelay.py +# Copyright (C) 2013 LEAP +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see <http://www.gnu.org/licenses/>. + """ LEAP SMTP encrypted relay. """ import re +import os import gnupg +import tempfile + + from zope.interface import implements from StringIO import StringIO from twisted.mail import smtp from twisted.internet.protocol import ServerFactory from twisted.internet import reactor from twisted.internet import defer -from twisted.application import internet, service from twisted.python import log from email.Header import Header -from leap import soledad +from email.utils import parseaddr +from email.parser import Parser + + +from leap.common.check import leap_assert, leap_assert_type +from leap.common.keymanager import KeyManager +from leap.common.keymanager.openpgp import ( + encrypt_asym, + OpenPGPKey, +) +from leap.common.keymanager.errors import KeyNotFound +from leap.common.keymanager.keys import is_address + +# +# Exceptions +# -class SMTPInfoNotAvailable(Exception): +class MalformedConfig(Exception): + """ + Raised when the configuration dictionary passed as parameter is malformed. + """ pass +# +# Helper utilities +# + +HOST_KEY = 'host' +PORT_KEY = 'port' +USERNAME_KEY = 'username' +PASSWORD_KEY = 'password' +ENCRYPTED_ONLY_KEY = 'encrypted_only' + + +def assert_config_structure(config): + """ + Assert that C{config} is a dict with the following structure: + + { + HOST_KEY: '<str>', + PORT_KEY: <int>, + USERNAME_KEY: '<str>', + PASSWORD_KEY: '<str>', + ENCRYPTED_ONLY_KEY: <bool>, + } + + @param config: The dictionary to check. + @type config: dict + """ + # assert smtp config structure is valid + leap_assert_type(config, dict) + leap_assert(HOST_KEY in config) + leap_assert_type(config[HOST_KEY], str) + leap_assert(PORT_KEY in config) + leap_assert_type(config[PORT_KEY], int) + leap_assert(USERNAME_KEY in config) + leap_assert_type(config[USERNAME_KEY], str) + leap_assert(PASSWORD_KEY in config) + leap_assert_type(config[PASSWORD_KEY], str) + leap_assert(ENCRYPTED_ONLY_KEY in config) + leap_assert_type(config[ENCRYPTED_ONLY_KEY], bool) + # assert received params are not empty + leap_assert(config[HOST_KEY] != '') + leap_assert(config[PORT_KEY] is not 0) + leap_assert(config[USERNAME_KEY] != '') + leap_assert(config[PASSWORD_KEY] != '') + + +def strip_and_validate_address(address): + """ + Helper function to (eventually) strip and validate an email address. + + This function first checks whether the incomming C{address} is of the form + '<something>' and, if it is, then '<' and '>' are removed from the + address. After that, a simple validation for user@provider form is + carried. + + @param address: The address to be validated. + @type address: str + + @return: The (eventually) stripped address. + @rtype: str + + @raise smtp.SMTPBadRcpt: Raised if C{address} does not have the expected + format. + """ + leap_assert(address is not None) + leap_assert_type(address, str) + _, address = parseaddr(address) + leap_assert(address != '') + if is_address(address): + return address + raise smtp.SMTPBadRcpt(address) + + +# +# SMTPFactory +# + class SMTPFactory(ServerFactory): """ Factory for an SMTP server with encrypted relaying capabilities. """ - def __init__(self, soledad, gpg=None): - self._soledad = soledad - self._gpg = gpg + def __init__(self, keymanager, config): + """ + @param keymanager: A KeyManager for retrieving recipient's keys. + @type keymanager: leap.common.keymanager.KeyManager + @param config: A dictionary with smtp configuration. Should have + the following structure: + { + HOST_KEY: '<str>', + PORT_KEY: <int>, + USERNAME_KEY: '<str>', + PASSWORD_KEY: '<str>', + ENCRYPTED_ONLY_KEY: <bool>, + } + @type config: dict + """ + # assert params + leap_assert_type(keymanager, KeyManager) + assert_config_structure(config) + # and store them + self._km = keymanager + self._config = config def buildProtocol(self, addr): - "Return a protocol suitable for the job." - # TODO: use ESMTP here. - smtpProtocol = smtp.SMTP(SMTPDelivery(self._soledad, self._gpg)) + """ + Return a protocol suitable for the job. + + @param addr: An address, e.g. a TCP (host, port). + @type addr: twisted.internet.interfaces.IAddress + + @return: The protocol. + @rtype: SMTPDelivery + """ + # If needed, we might use ESMTPDelivery here instead. + smtpProtocol = smtp.SMTP(SMTPDelivery(self._km, self._config)) smtpProtocol.factory = self return smtpProtocol +# +# SMTPDelivery +# + class SMTPDelivery(object): """ Validate email addresses and handle message delivery. @@ -44,14 +188,44 @@ class SMTPDelivery(object): implements(smtp.IMessageDelivery) - def __init__(self, soledad, gpg=None): - self._soledad = soledad - if gpg: - self._gpg = gpg - else: - self._gpg = GPGWrapper() + def __init__(self, keymanager, config): + """ + @param keymanager: A KeyManager for retrieving recipient's keys. + @type keymanager: leap.common.keymanager.KeyManager + @param config: A dictionary with smtp configuration. Should have + the following structure: + { + HOST_KEY: '<str>', + PORT_KEY: <int>, + USERNAME_KEY: '<str>', + PASSWORD_KEY: '<str>', + ENCRYPTED_ONLY_KEY: <bool>, + } + @type config: dict + """ + # assert params + leap_assert_type(keymanager, KeyManager) + assert_config_structure(config) + # and store them + self._km = keymanager + self._config = config def receivedHeader(self, helo, origin, recipients): + """ + Generate the Received header for a message. + + @param helo: The argument to the HELO command and the client's IP + address. + @type helo: (str, str) + @param origin: The address the message is from. + @type origin: twisted.mail.smtp.Address + @param recipients: A list of the addresses for which this message is + bound. + @type: list of twisted.mail.smtp.User + + @return: The full "Received" header string. + @type: str + """ myHostname, clientIP = helo headerValue = "by %s from %s with ESMTP ; %s" % ( myHostname, clientIP, smtp.rfc822date()) @@ -59,188 +233,232 @@ class SMTPDelivery(object): return "Received: %s" % Header(headerValue) def validateTo(self, user): - """Assert existence of and trust on recipient's GPG public key.""" + """ + Validate the address for which the message is destined. + + For now, it just asserts the existence of the user's key if the + configuration option ENCRYPTED_ONLY_KEY is True. + + @param user: The address to validate. + @type: twisted.mail.smtp.User + + @return: A Deferred which becomes, or a callable which takes no + arguments and returns an object implementing IMessage. This will + be called and the returned object used to deliver the message when + it arrives. + @rtype: no-argument callable + + @raise SMTPBadRcpt: Raised if messages to the address are not to be + accepted. + """ # try to find recipient's public key try: - # this will raise an exception if key is not found - trust = self._gpg.find_key(user.dest.addrstr)['trust'] - # if key is not ultimatelly trusted, then the message will not - # be encrypted. So, we check for this below - #if trust != 'u': - # raise smtp.SMTPBadRcpt(user) + address = strip_and_validate_address(user.dest.addrstr) + pubkey = self._km.get_key(address, OpenPGPKey) log.msg("Accepting mail for %s..." % user.dest) - return lambda: EncryptedMessage(user, soledad=self._soledad, - gpg=self._gpg) - except LookupError: + except KeyNotFound: # if key was not found, check config to see if will send anyway. - if self.encrypted_only: - raise smtp.SMTPBadRcpt(user) - # TODO: send signal to cli/gui that this user's key was not found? + if self._config[ENCRYPTED_ONLY_KEY]: + raise smtp.SMTPBadRcpt(user.dest.addrstr) log.msg("Warning: will send an unencrypted message (because " "encrypted_only' is set to False).") + return lambda: EncryptedMessage(user, self._km, self._config) + + def validateFrom(self, helo, origin): + """ + Validate the address from which the message originates. - def validateFrom(self, helo, originAddress): + @param helo: The argument to the HELO command and the client's IP + address. + @type: (str, str) + @param origin: The address the message is from. + @type origin: twisted.mail.smtp.Address + + @return: origin or a Deferred whose callback will be passed origin. + @rtype: Deferred or Address + + @raise twisted.mail.smtp.SMTPBadSender: Raised if messages from this + address are not to be accepted. + """ # accept mail from anywhere. To reject an address, raise # smtp.SMTPBadSender here. - return originAddress + return origin -class EncryptedMessage(): +# +# EncryptedMessage +# + +class EncryptedMessage(object): """ Receive plaintext from client, encrypt it and send message to a recipient. """ implements(smtp.IMessage) - SMTP_HOSTNAME = "mail.leap.se" - SMTP_PORT = 25 - - def __init__(self, user, soledad, gpg=None): - self.user = user - self._soledad = soledad - self.fetchConfig() + def __init__(self, user, keymanager, config): + """ + Initialize the encrypted message. + + @param user: The address to validate. + @type: twisted.mail.smtp.User + @param keymanager: A KeyManager for retrieving recipient's keys. + @type keymanager: leap.common.keymanager.KeyManager + @param config: A dictionary with smtp configuration. Should have + the following structure: + { + HOST_KEY: '<str>', + PORT_KEY: <int>, + USERNAME_KEY: '<str>', + PASSWORD_KEY: '<str>', + ENCRYPTED_ONLY_KEY: <bool>, + } + @type config: dict + """ + # assert params + leap_assert_type(user, smtp.User) + leap_assert_type(keymanager, KeyManager) + assert_config_structure(config) + # and store them + self._user = user + self._km = keymanager + self._config = config + # initialize list for message's lines self.lines = [] - if gpg: - self._gpg = gpg - else: - self._gpg = GPGWrapper() def lineReceived(self, line): - """Store email DATA lines as they arrive.""" + """ + Handle another line. + + @param line: The received line. + @type line: str + """ self.lines.append(line) def eomReceived(self): - """Encrypt and send message.""" + """ + Handle end of message. + + This method will encrypt and send the message. + """ log.msg("Message data complete.") self.lines.append('') # add a trailing newline self.parseMessage() try: - self.encrypt() + self._encrypt() return self.sendMessage() - except LookupError: + except KeyNotFound: return None def parseMessage(self): - """Separate message headers from body.""" - sep = self.lines.index('') - self.headers = self.lines[:sep] - self.body = self.lines[sep + 1:] + """ + Separate message headers from body. + """ + parser = Parser() + self._message = parser.parsestr('\r\n'.join(self.lines)) def connectionLost(self): + """ + Log an error when the connection is lost. + """ log.msg("Connection lost unexpectedly!") log.err() # unexpected loss of connection; don't save self.lines = [] def sendSuccess(self, r): + """ + Callback for a successful send. + + @param r: The result from the last previous callback in the chain. + @type r: anything + """ log.msg(r) def sendError(self, e): + """ + Callback for an unsuccessfull send. + + @param e: The result from the last errback. + @type e: anything + """ log.msg(e) log.err() def prepareHeader(self): - self.headers.insert(1, "From: %s" % self.user.orig.addrstr) - self.headers.insert(2, "To: %s" % self.user.dest.addrstr) - self.headers.append('') + """ + Prepare the headers of the message. + """ + self._message.replace_header('From', '<%s>' % self._user.orig.addrstr) def sendMessage(self): + """ + Send the message. + + This method will prepare the message (headers and possibly encrypted + body) and send it using the ESMTPSenderFactory. + + @return: A deferred with callbacks for error and success of this + message send. + @rtype: twisted.internet.defer.Deferred + """ self.prepareHeader() - msg = '\n'.join(self.headers + [self.cyphertext]) + msg = self._message.as_string(False) d = defer.Deferred() - factory = smtp.ESMTPSenderFactory(self.smtp_username, - self.smtp_password, - self.smtp_username, - self.user.dest.addrstr, - StringIO(msg), - d) - # the next call is TSL-powered! - reactor.connectTCP(self.SMTP_HOSTNAME, self.SMTP_PORT, factory) + factory = smtp.ESMTPSenderFactory( + self._config[USERNAME_KEY], + self._config[PASSWORD_KEY], + self._fromAddress.addrstr, + self._user.dest.addrstr, + StringIO(msg), + d, + requireAuthentication=False, # for now do unauth, see issue #2474 + ) + # TODO: Change this to connectSSL when cert auth is in place in the platform + reactor.connectTCP( + self._config[HOST_KEY], + self._config[PORT_KEY], + factory + ) d.addCallback(self.sendSuccess) d.addErrback(self.sendError) return d - def encrypt(self, always_trust=True): - # TODO: do not "always trust" here. - try: - fp = self._gpg.find_key(self.user.dest.addrstr)['fingerprint'] - log.msg("Encrypting to %s" % fp) - self.cyphertext = str( - self._gpg.encrypt( - '\n'.join(self.body), [fp], always_trust=always_trust)) - except LookupError: - if self.encrypted_only: - raise - log.msg("Warning: sending unencrypted mail (because " - "'encrypted_only' is set to False).") + def _encrypt_payload_rec(self, message, pubkey): + """ + Recursivelly descend in C{message}'s payload and encrypt to C{pubkey}. - # this will be replaced by some other mechanism of obtaining credentials - # for SMTP server. - def fetchConfig(self): - # TODO: Soledad/LEAP bootstrap should store the SMTP info on local db, - # so this relay can load it when it needs. - if not self._soledad: - # TODO: uncomment below exception when integration with Soledad is - # smooth. - #raise SMTPInfoNotAvailable() - # TODO: remove dummy settings below when soledad bootstrap is - # working. - self.smtp_host = '' - self.smtp_port = '' - self.smtp_username = '' - self.smtp_password = '' - self.encrypted_only = True + @param message: The message whose payload we want to encrypt. + @type message: email.message.Message + @param pubkey: The public key used to encrypt the message. + @type pubkey: leap.common.keymanager.openpgp.OpenPGPKey + """ + if message.is_multipart() is False: + message.set_payload(encrypt_asym(message.get_payload(), pubkey)) else: - self.smtp_config = self._soledad.get_doc('smtp_relay_config') - for confname in [ - 'smtp_host', 'smtp_port', 'smtp_username', - 'smtp_password', 'encrypted_only', - ]: - setattr(self, confname, doc.content[confname]) - - -class GPGWrapper(): - """ - This is a temporary class for handling GPG requests, and should be - replaced by a more general class used throughout the project. - """ - - GNUPG_HOME = "~/.config/leap/gnupg" - GNUPG_BINARY = "/usr/bin/gpg" # TODO: change this based on OS - - def __init__(self, gpghome=GNUPG_HOME, gpgbinary=GNUPG_BINARY): - self.gpg = gnupg.GPG(gnupghome=gpghome, gpgbinary=gpgbinary) + for msg in message.get_payload(): + self._encrypt_payload_rec(msg, pubkey) - def find_key(self, email): + def _encrypt(self): """ - Find user's key based on their email. - """ - for key in self.gpg.list_keys(): - for uid in key['uids']: - if re.search(email, uid): - return key - raise LookupError("GnuPG public key for %s not found!" % email) - - def encrypt(self, data, recipient, always_trust=True): - # TODO: do not 'always_trust'. - return self.gpg.encrypt(data, recipient, always_trust=always_trust) - - def decrypt(self, data): - return self.gpg.decrypt(data) - - def import_keys(self, data): - return self.gpg.import_keys(data) - - -# service configuration -port = 25 -user_email = 'user@leap.se' # TODO: replace for real mail from gui/cli + Encrypt the message body. -# instantiate soledad for client app storage and sync -s = soledad.Soledad(user_email) -factory = SMTPFactory(s) + This method fetches the recipient key and encrypts the content to the + recipient. If a key is not found, then the behaviour depends on the + configuration parameter ENCRYPTED_ONLY_KEY. If it is False, the message + is sent unencrypted and a warning is logged. If it is True, the + encryption fails with a KeyNotFound exception. -# enable the use of this service with twistd -application = service.Application("LEAP SMTP Relay") -service = internet.TCPServer(port, factory) -service.setServiceParent(application) + @raise KeyNotFound: Raised when the recipient key was not found and + the ENCRYPTED_ONLY_KEY configuration parameter is set to True. + """ + try: + address = strip_and_validate_address(self._user.dest.addrstr) + pubkey = self._km.get_key(address, OpenPGPKey) + log.msg("Encrypting to %s" % pubkey.fingerprint) + self._encrypt_payload_rec(self._message, pubkey) + except KeyNotFound: + if self._config[ENCRYPTED_ONLY_KEY]: + raise + log.msg("Warning: sending unencrypted mail (because " + "'encrypted_only' is set to False).") diff --git a/mail/src/leap/mail/smtp/tests/test_smtprelay.py b/mail/src/leap/mail/smtp/tests/test_smtprelay.py deleted file mode 100644 index eaa4d040..00000000 --- a/mail/src/leap/mail/smtp/tests/test_smtprelay.py +++ /dev/null @@ -1,78 +0,0 @@ -from datetime import datetime -import re -from leap.email.smtp.smtprelay import ( - SMTPFactory, - #SMTPDelivery, # an object - EncryptedMessage, -) -from leap.email.smtp import tests -from twisted.test import proto_helpers -from twisted.mail.smtp import User - - -# some regexps -IP_REGEX = "(([0-9]|[1-9][0-9]|1[0-9]{2}|2[0-4][0-9]|25[0-5])\.){3}" + \ - "([0-9]|[1-9][0-9]|1[0-9]{2}|2[0-4][0-9]|25[0-5])" -HOSTNAME_REGEX = "(([a-zA-Z0-9]|[a-zA-Z0-9][a-zA-Z0-9\-]*[a-zA-Z0-9])\.)*" + \ - "([A-Za-z0-9]|[A-Za-z0-9][A-Za-z0-9\-]*[A-Za-z0-9])" -IP_OR_HOST_REGEX = '(' + IP_REGEX + '|' + HOSTNAME_REGEX + ')' - - -class TestSmtpRelay(tests.OpenPGPTestCase): - - EMAIL_DATA = ['HELO relay.leap.se', - 'MAIL FROM: <user@leap.se>', - 'RCPT TO: <leap@leap.se>', - 'DATA', - 'From: User <user@leap.se>', - 'To: Leap <leap@leap.se>', - 'Date: ' + datetime.now().strftime('%c'), - 'Subject: test message', - '', - 'This is a secret message.', - 'Yours,', - 'A.', - '', - '.', - 'QUIT'] - - def assertMatch(self, string, pattern, msg=None): - if not re.match(pattern, string): - msg = self._formatMessage(msg, '"%s" does not match pattern "%s".' - % (string, pattern)) - raise self.failureException(msg) - - def test_relay_accepts_valid_email(self): - """ - Test if SMTP server responds correctly for valid interaction. - """ - - SMTP_ANSWERS = ['220 ' + IP_OR_HOST_REGEX + - ' NO UCE NO UBE NO RELAY PROBES', - '250 ' + IP_OR_HOST_REGEX + ' Hello ' + - IP_OR_HOST_REGEX + ', nice to meet you', - '250 Sender address accepted', - '250 Recipient address accepted', - '354 Continue'] - proto = SMTPFactory(None, self._gpg).buildProtocol(('127.0.0.1', 0)) - transport = proto_helpers.StringTransport() - proto.makeConnection(transport) - for i, line in enumerate(self.EMAIL_DATA): - proto.lineReceived(line + '\r\n') - self.assertMatch(transport.value(), - '\r\n'.join(SMTP_ANSWERS[0:i + 1])) - proto.setTimeout(None) - - def test_message_encrypt(self): - """ - Test if message gets encrypted to destination email. - """ - proto = SMTPFactory(None, self._gpg).buildProtocol(('127.0.0.1', 0)) - user = User('leap@leap.se', 'relay.leap.se', proto, 'leap@leap.se') - m = EncryptedMessage(user, None, self._gpg) - for line in self.EMAIL_DATA[4:12]: - m.lineReceived(line) - m.parseMessage() - m.encrypt() - decrypted = str(self._gpg.decrypt(m.cyphertext)) - self.assertEqual('\n'.join(self.EMAIL_DATA[9:12]), decrypted) diff --git a/mail/src/leap/mail/smtp/tests/185CA770.key b/mail/src/leap/mail/tests/smtp/185CA770.key index 587b4164..587b4164 100644 --- a/mail/src/leap/mail/smtp/tests/185CA770.key +++ b/mail/src/leap/mail/tests/smtp/185CA770.key diff --git a/mail/src/leap/mail/smtp/tests/185CA770.pub b/mail/src/leap/mail/tests/smtp/185CA770.pub index 38af19f8..38af19f8 100644 --- a/mail/src/leap/mail/smtp/tests/185CA770.pub +++ b/mail/src/leap/mail/tests/smtp/185CA770.pub diff --git a/mail/src/leap/mail/smtp/tests/__init__.py b/mail/src/leap/mail/tests/smtp/__init__.py index d7b942a1..113e0473 100644 --- a/mail/src/leap/mail/smtp/tests/__init__.py +++ b/mail/src/leap/mail/tests/smtp/__init__.py @@ -1,17 +1,49 @@ +# -*- coding: utf-8 -*- +# __init__.py +# Copyright (C) 2013 LEAP +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see <http://www.gnu.org/licenses/>. + + +""" +Base classes and keys for SMTP relay tests. +""" + import os import shutil import tempfile +from mock import Mock + from twisted.trial import unittest + +from leap.soledad import Soledad +from leap.soledad.crypto import SoledadCrypto +from leap.common.keymanager import ( + KeyManager, + openpgp, +) + + from leap.common.testing.basetest import BaseLeapTest -from leap.mail.smtp.smtprelay import GPGWrapper -class OpenPGPTestCase(unittest.TestCase, BaseLeapTest): +class TestCaseWithKeyManager(BaseLeapTest): def setUp(self): - # mimic LeapBaseTest.setUpClass behaviour, because this is deprecated + # mimic BaseLeapTest.setUpClass behaviour, because this is deprecated # in Twisted: http://twistedmatrix.com/trac/ticket/1870 self.old_path = os.environ['PATH'] self.old_home = os.environ['HOME'] @@ -22,18 +54,46 @@ class OpenPGPTestCase(unittest.TestCase, BaseLeapTest): 'bin') os.environ["PATH"] = bin_tdir os.environ["HOME"] = self.tempdir + # setup our own stuff - self.gnupg_home = self.tempdir + '/gnupg' - os.mkdir(self.gnupg_home) - self.email = 'leap@leap.se' - self._gpg = GPGWrapper(gpghome=self.gnupg_home) - - self.assertEqual(self._gpg.import_keys(PUBLIC_KEY).summary(), - '1 imported', "error importing public key") - self.assertEqual(self._gpg.import_keys(PRIVATE_KEY).summary(), - # note that gnupg does not return a successful import - # for private keys. Bug? - '0 imported', "error importing private key") + address = 'leap@leap.se' # user's address in the form user@provider + uuid = 'leap@leap.se' + passphrase = '123' + secret_path = os.path.join(self.tempdir, 'secret.gpg') + local_db_path = os.path.join(self.tempdir, 'soledad.u1db') + server_url = 'http://provider/' + cert_file = '' + + # mock key fetching and storing so Soledad doesn't fail when trying to + # reach the server. + Soledad._fetch_keys_from_shared_db = Mock(return_value=None) + Soledad._assert_keys_in_shared_db = Mock(return_value=None) + + # instantiate soledad + self._soledad = Soledad( + uuid, + passphrase, + secret_path, + local_db_path, + server_url, + cert_file, + ) + + self._config = { + 'host': 'http://provider/', + 'port': 25, + 'username': address, + 'password': '<password>', + 'encrypted_only': True + } + + nickserver_url = '' # the url of the nickserver + self._km = KeyManager(address, nickserver_url, self._soledad) + + # insert test keys in key manager. + pgp = openpgp.OpenPGPScheme(self._soledad) + pgp.put_ascii_key(PRIVATE_KEY) + pgp.put_ascii_key(PRIVATE_KEY_2) def tearDown(self): # mimic LeapBaseTest.tearDownClass behaviour @@ -43,16 +103,6 @@ class OpenPGPTestCase(unittest.TestCase, BaseLeapTest): assert self.tempdir.startswith('/tmp/leap_tests-') shutil.rmtree(self.tempdir) - def test_openpgp_encrypt_decrypt(self): - "Test if openpgp can encrypt and decrypt." - text = "simple raw text" - encrypted = str(self._gpg.encrypt(text, KEY_FINGERPRINT, - # TODO: handle always trust issue - always_trust=True)) - self.assertNotEqual(text, encrypted, "failed encrypting text") - decrypted = str(self._gpg.decrypt(encrypted)) - self.assertEqual(text, decrypted, "failed decrypting text") - # Key material for testing KEY_FINGERPRINT = "E36E738D69173C13D709E44F2F455E2824D18DDF" diff --git a/mail/src/leap/mail/smtp/tests/mail.txt b/mail/src/leap/mail/tests/smtp/mail.txt index 95420470..95420470 100644 --- a/mail/src/leap/mail/smtp/tests/mail.txt +++ b/mail/src/leap/mail/tests/smtp/mail.txt diff --git a/mail/src/leap/mail/tests/smtp/test_smtprelay.py b/mail/src/leap/mail/tests/smtp/test_smtprelay.py new file mode 100644 index 00000000..6ef4e853 --- /dev/null +++ b/mail/src/leap/mail/tests/smtp/test_smtprelay.py @@ -0,0 +1,212 @@ +# -*- coding: utf-8 -*- +# test_smtprelay.py +# Copyright (C) 2013 LEAP +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see <http://www.gnu.org/licenses/>. + + +""" +SMTP relay tests. +""" + + +import re + + +from datetime import datetime +from twisted.test import proto_helpers +from twisted.mail.smtp import ( + User, + SMTPBadRcpt, +) +from mock import Mock + + +from leap.mail.smtp.smtprelay import ( + SMTPFactory, + EncryptedMessage, +) +from leap.mail.tests.smtp import TestCaseWithKeyManager +from leap.common.keymanager import openpgp + + +# some regexps +IP_REGEX = "(([0-9]|[1-9][0-9]|1[0-9]{2}|2[0-4][0-9]|25[0-5])\.){3}" + \ + "([0-9]|[1-9][0-9]|1[0-9]{2}|2[0-4][0-9]|25[0-5])" +HOSTNAME_REGEX = "(([a-zA-Z0-9]|[a-zA-Z0-9][a-zA-Z0-9\-]*[a-zA-Z0-9])\.)*" + \ + "([A-Za-z0-9]|[A-Za-z0-9][A-Za-z0-9\-]*[A-Za-z0-9])" +IP_OR_HOST_REGEX = '(' + IP_REGEX + '|' + HOSTNAME_REGEX + ')' + + +class TestSmtpRelay(TestCaseWithKeyManager): + + EMAIL_DATA = ['HELO relay.leap.se', + 'MAIL FROM: <user@leap.se>', + 'RCPT TO: <leap@leap.se>', + 'DATA', + 'From: User <user@leap.se>', + 'To: Leap <leap@leap.se>', + 'Date: ' + datetime.now().strftime('%c'), + 'Subject: test message', + '', + 'This is a secret message.', + 'Yours,', + 'A.', + '', + '.', + 'QUIT'] + + def assertMatch(self, string, pattern, msg=None): + if not re.match(pattern, string): + msg = self._formatMessage(msg, '"%s" does not match pattern "%s".' + % (string, pattern)) + raise self.failureException(msg) + + def test_openpgp_encrypt_decrypt(self): + "Test if openpgp can encrypt and decrypt." + text = "simple raw text" + pubkey = self._km.get_key( + 'leap@leap.se', openpgp.OpenPGPKey, private=False) + encrypted = openpgp.encrypt_asym(text, pubkey) + self.assertNotEqual(text, encrypted, "failed encrypting text") + privkey = self._km.get_key( + 'leap@leap.se', openpgp.OpenPGPKey, private=True) + decrypted = openpgp.decrypt_asym(encrypted, privkey) + self.assertEqual(text, decrypted, "failed decrypting text") + + def test_relay_accepts_valid_email(self): + """ + Test if SMTP server responds correctly for valid interaction. + """ + + SMTP_ANSWERS = ['220 ' + IP_OR_HOST_REGEX + + ' NO UCE NO UBE NO RELAY PROBES', + '250 ' + IP_OR_HOST_REGEX + ' Hello ' + + IP_OR_HOST_REGEX + ', nice to meet you', + '250 Sender address accepted', + '250 Recipient address accepted', + '354 Continue'] + proto = SMTPFactory( + self._km, self._config).buildProtocol(('127.0.0.1', 0)) + transport = proto_helpers.StringTransport() + proto.makeConnection(transport) + for i, line in enumerate(self.EMAIL_DATA): + proto.lineReceived(line + '\r\n') + self.assertMatch(transport.value(), + '\r\n'.join(SMTP_ANSWERS[0:i + 1])) + proto.setTimeout(None) + + def test_message_encrypt(self): + """ + Test if message gets encrypted to destination email. + """ + proto = SMTPFactory( + self._km, self._config).buildProtocol(('127.0.0.1', 0)) + user = User('leap@leap.se', 'relay.leap.se', proto, 'leap@leap.se') + m = EncryptedMessage(user, self._km, self._config) + for line in self.EMAIL_DATA[4:12]: + m.lineReceived(line) + m.eomReceived() + privkey = self._km.get_key( + 'leap@leap.se', openpgp.OpenPGPKey, private=True) + decrypted = openpgp.decrypt_asym(m._message.get_payload(), privkey) + self.assertEqual( + '\r\n'.join(self.EMAIL_DATA[9:12]) + '\r\n', + decrypted) + + def test_missing_key_rejects_address(self): + """ + Test if server rejects to send unencrypted when 'encrypted_only' is + True. + """ + # remove key from key manager + pubkey = self._km.get_key('leap@leap.se', openpgp.OpenPGPKey) + pgp = openpgp.OpenPGPScheme(self._soledad) + pgp.delete_key(pubkey) + # mock the key fetching + self._km.fetch_keys_from_server = Mock(return_value=[]) + # prepare the SMTP factory + proto = SMTPFactory( + self._km, self._config).buildProtocol(('127.0.0.1', 0)) + transport = proto_helpers.StringTransport() + proto.makeConnection(transport) + proto.lineReceived(self.EMAIL_DATA[0] + '\r\n') + proto.lineReceived(self.EMAIL_DATA[1] + '\r\n') + proto.lineReceived(self.EMAIL_DATA[2] + '\r\n') + # ensure the address was rejected + lines = transport.value().rstrip().split('\n') + self.assertEqual( + '550 Cannot receive for specified address', + lines[-1]) + + def test_missing_key_accepts_address(self): + """ + Test if server accepts to send unencrypted when 'encrypted_only' is + False. + """ + # remove key from key manager + pubkey = self._km.get_key('leap@leap.se', openpgp.OpenPGPKey) + pgp = openpgp.OpenPGPScheme(self._soledad) + pgp.delete_key(pubkey) + # mock the key fetching + self._km.fetch_keys_from_server = Mock(return_value=[]) + # change the configuration + self._config['encrypted_only'] = False + # prepare the SMTP factory + proto = SMTPFactory( + self._km, self._config).buildProtocol(('127.0.0.1', 0)) + transport = proto_helpers.StringTransport() + proto.makeConnection(transport) + proto.lineReceived(self.EMAIL_DATA[0] + '\r\n') + proto.lineReceived(self.EMAIL_DATA[1] + '\r\n') + proto.lineReceived(self.EMAIL_DATA[2] + '\r\n') + # ensure the address was rejected + lines = transport.value().rstrip().split('\n') + self.assertEqual( + '250 Recipient address accepted', + lines[-1]) + + def test_malformed_address_rejects(self): + """ + Test if server rejects to send to malformed addresses. + """ + # mock the key fetching + self._km.fetch_keys_from_server = Mock(return_value=[]) + # prepare the SMTP factory + for malformed in ['leap@']: + proto = SMTPFactory( + self._km, self._config).buildProtocol(('127.0.0.1', 0)) + transport = proto_helpers.StringTransport() + proto.makeConnection(transport) + proto.lineReceived(self.EMAIL_DATA[0] + '\r\n') + proto.lineReceived(self.EMAIL_DATA[1] + '\r\n') + proto.lineReceived('RCPT TO: <%s>%s' % (malformed, '\r\n')) + # ensure the address was rejected + lines = transport.value().rstrip().split('\n') + self.assertEqual( + '550 Cannot receive for specified address', + lines[-1]) + + def test_prepare_header_adds_from(self): + """ + Test if message headers are OK. + """ + proto = SMTPFactory( + self._km, self._config).buildProtocol(('127.0.0.1', 0)) + user = User('leap@leap.se', 'relay.leap.se', proto, 'leap@leap.se') + m = EncryptedMessage(user, self._km, self._config) + for line in self.EMAIL_DATA[4:12]: + m.lineReceived(line) + m.eomReceived() + self.assertEqual('<leap@leap.se>', m._message['From']) |