summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--mail/changes/feature_smtp-relay-sign-outgoing-messages1
-rw-r--r--mail/pkg/requirements.pip1
-rw-r--r--mail/setup.py14
-rw-r--r--mail/src/leap/mail/smtp/__init__.py78
-rw-r--r--mail/src/leap/mail/smtp/smtprelay.py533
-rw-r--r--mail/src/leap/mail/smtp/tests/test_smtprelay.py78
-rw-r--r--mail/src/leap/mail/tests/smtp/185CA770.key (renamed from mail/src/leap/mail/smtp/tests/185CA770.key)0
-rw-r--r--mail/src/leap/mail/tests/smtp/185CA770.pub (renamed from mail/src/leap/mail/smtp/tests/185CA770.pub)0
-rw-r--r--mail/src/leap/mail/tests/smtp/__init__.py (renamed from mail/src/leap/mail/smtp/tests/__init__.py)163
-rw-r--r--mail/src/leap/mail/tests/smtp/mail.txt (renamed from mail/src/leap/mail/smtp/tests/mail.txt)0
-rw-r--r--mail/src/leap/mail/tests/smtp/test_smtprelay.py248
11 files changed, 867 insertions, 249 deletions
diff --git a/mail/changes/feature_smtp-relay-sign-outgoing-messages b/mail/changes/feature_smtp-relay-sign-outgoing-messages
new file mode 100644
index 0000000..e3035bf
--- /dev/null
+++ b/mail/changes/feature_smtp-relay-sign-outgoing-messages
@@ -0,0 +1 @@
+ o SMTP relay signs outgoing messages.
diff --git a/mail/pkg/requirements.pip b/mail/pkg/requirements.pip
index af633f9..d8888fd 100644
--- a/mail/pkg/requirements.pip
+++ b/mail/pkg/requirements.pip
@@ -1,3 +1,2 @@
-leap.common>=0.2.3-dev
leap.soledad>=0.0.2-dev
twisted
diff --git a/mail/setup.py b/mail/setup.py
index 4de7251..8d4e415 100644
--- a/mail/setup.py
+++ b/mail/setup.py
@@ -21,10 +21,15 @@ from setuptools import setup, find_packages
requirements = [
"leap.soledad",
- "leap.common",
+ "leap.common>=0.2.3-dev",
"twisted",
]
+tests_requirements = [
+ 'setuptools-trial',
+ 'mock',
+]
+
# XXX add classifiers, docs
setup(
@@ -40,7 +45,8 @@ setup(
),
namespace_packages=["leap"],
package_dir={'': 'src'},
- packages=find_packages('src'),
- #test_suite='leap.mail.tests',
- #install_requires=requirements,
+ packages=find_packages('src', exclude=['leap.mail.tests']),
+ test_suite='leap.mail.tests',
+ install_requires=requirements,
+ tests_require=tests_requirements,
)
diff --git a/mail/src/leap/mail/smtp/__init__.py b/mail/src/leap/mail/smtp/__init__.py
index e69de29..ace79b5 100644
--- a/mail/src/leap/mail/smtp/__init__.py
+++ b/mail/src/leap/mail/smtp/__init__.py
@@ -0,0 +1,78 @@
+# -*- coding: utf-8 -*-
+# __init__.py
+# Copyright (C) 2013 LEAP
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see <http://www.gnu.org/licenses/>.
+
+
+"""
+SMTP relay helper function.
+"""
+
+
+from twisted.application import internet, service
+from twisted.internet import reactor
+
+
+from leap import soledad
+from leap.common.keymanager import KeyManager
+from leap.mail.smtp.smtprelay import SMTPFactory
+
+
+def setup_smtp_relay(port, keymanager, smtp_host, smtp_port, smtp_username,
+ smtp_password, encrypted_only):
+ """
+ Setup SMTP relay to run with Twisted.
+
+ This function sets up the SMTP relay configuration and the Twisted
+ reactor.
+
+ @param port: The port in which to run the server.
+ @type port: int
+ @param keymanager: A Key Manager from where to get recipients' public
+ keys.
+ @type keymanager: leap.common.keymanager.KeyManager
+ @param smtp_host: The hostname of the remote SMTP server.
+ @type smtp_host: str
+ @param smtp_port: The port of the remote SMTP server.
+ @type smtp_port: int
+ @param smtp_username: The username used to connect to remote SMTP server.
+ @type smtp_username: str
+ @param smtp_password: The password used to connect to remote SMTP server.
+ @type smtp_password: str
+ @param encrypted_only: Whether the SMTP relay should send unencrypted mail
+ or not.
+ @type encrypted_only: bool
+ """
+ # The configuration for the SMTP relay is a dict with the following
+ # format:
+ #
+ # {
+ # 'host': '<host>',
+ # 'port': <int>,
+ # 'username': '<username>',
+ # 'password': '<password>',
+ # 'encrypted_only': <True/False>
+ # }
+ config = {
+ 'host': smtp_host,
+ 'port': smtp_port,
+ 'username': smtp_username,
+ 'password': smtp_password,
+ 'encrypted_only': encrypted_only
+ }
+
+ # configure the use of this service with twistd
+ factory = SMTPFactory(keymanager, config)
+ reactor.listenTCP(port, factory)
diff --git a/mail/src/leap/mail/smtp/smtprelay.py b/mail/src/leap/mail/smtp/smtprelay.py
index 6479873..d87dc87 100644
--- a/mail/src/leap/mail/smtp/smtprelay.py
+++ b/mail/src/leap/mail/smtp/smtprelay.py
@@ -1,42 +1,182 @@
+# -*- coding: utf-8 -*-
+# smtprelay.py
+# Copyright (C) 2013 LEAP
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see <http://www.gnu.org/licenses/>.
+
"""
LEAP SMTP encrypted relay.
"""
import re
+import os
import gnupg
+import tempfile
+
+
from zope.interface import implements
from StringIO import StringIO
from twisted.mail import smtp
from twisted.internet.protocol import ServerFactory
from twisted.internet import reactor
from twisted.internet import defer
-from twisted.application import internet, service
from twisted.python import log
from email.Header import Header
-from leap import soledad
+from email.utils import parseaddr
+from email.parser import Parser
+
+
+from leap.common.check import leap_assert, leap_assert_type
+from leap.common.keymanager import KeyManager
+from leap.common.keymanager.openpgp import (
+ OpenPGPKey,
+ encrypt_asym,
+ sign,
+)
+from leap.common.keymanager.errors import KeyNotFound
+
+#
+# Exceptions
+#
-class SMTPInfoNotAvailable(Exception):
+class MalformedConfig(Exception):
+ """
+ Raised when the configuration dictionary passed as parameter is malformed.
+ """
pass
+#
+# Helper utilities
+#
+
+HOST_KEY = 'host'
+PORT_KEY = 'port'
+USERNAME_KEY = 'username'
+PASSWORD_KEY = 'password'
+ENCRYPTED_ONLY_KEY = 'encrypted_only'
+
+
+def assert_config_structure(config):
+ """
+ Assert that C{config} is a dict with the following structure:
+
+ {
+ HOST_KEY: '<str>',
+ PORT_KEY: <int>,
+ USERNAME_KEY: '<str>',
+ PASSWORD_KEY: '<str>',
+ ENCRYPTED_ONLY_KEY: <bool>,
+ }
+
+ @param config: The dictionary to check.
+ @type config: dict
+ """
+ # assert smtp config structure is valid
+ leap_assert_type(config, dict)
+ leap_assert(HOST_KEY in config)
+ leap_assert_type(config[HOST_KEY], str)
+ leap_assert(PORT_KEY in config)
+ leap_assert_type(config[PORT_KEY], int)
+ leap_assert(USERNAME_KEY in config)
+ leap_assert_type(config[USERNAME_KEY], str)
+ leap_assert(PASSWORD_KEY in config)
+ leap_assert_type(config[PASSWORD_KEY], str)
+ leap_assert(ENCRYPTED_ONLY_KEY in config)
+ leap_assert_type(config[ENCRYPTED_ONLY_KEY], bool)
+ # assert received params are not empty
+ leap_assert(config[HOST_KEY] != '')
+ leap_assert(config[PORT_KEY] is not 0)
+ leap_assert(config[USERNAME_KEY] != '')
+ leap_assert(config[PASSWORD_KEY] != '')
+
+
+def validate_address(address):
+ """
+ Validate C{address} as defined in RFC 2822.
+
+ @param address: The address to be validated.
+ @type address: str
+
+ @return: A valid address.
+ @rtype: str
+
+ @raise smtp.SMTPBadRcpt: Raised if C{address} is invalid.
+ """
+ leap_assert_type(address, str)
+ # the following parses the address as described in RFC 2822 and
+ # returns ('', '') if the parse fails.
+ _, address = parseaddr(address)
+ if address == '':
+ raise smtp.SMTPBadRcpt(address)
+ return address
+
+
+#
+# SMTPFactory
+#
+
class SMTPFactory(ServerFactory):
"""
Factory for an SMTP server with encrypted relaying capabilities.
"""
- def __init__(self, soledad, gpg=None):
- self._soledad = soledad
- self._gpg = gpg
+ def __init__(self, keymanager, config):
+ """
+ Initialize the SMTP factory.
+
+ @param keymanager: A KeyManager for retrieving recipient's keys.
+ @type keymanager: leap.common.keymanager.KeyManager
+ @param config: A dictionary with smtp configuration. Should have
+ the following structure:
+ {
+ HOST_KEY: '<str>',
+ PORT_KEY: <int>,
+ USERNAME_KEY: '<str>',
+ PASSWORD_KEY: '<str>',
+ ENCRYPTED_ONLY_KEY: <bool>,
+ }
+ @type config: dict
+ """
+ # assert params
+ leap_assert_type(keymanager, KeyManager)
+ assert_config_structure(config)
+ # and store them
+ self._km = keymanager
+ self._config = config
def buildProtocol(self, addr):
- "Return a protocol suitable for the job."
- # TODO: use ESMTP here.
- smtpProtocol = smtp.SMTP(SMTPDelivery(self._soledad, self._gpg))
+ """
+ Return a protocol suitable for the job.
+
+ @param addr: An address, e.g. a TCP (host, port).
+ @type addr: twisted.internet.interfaces.IAddress
+
+ @return: The protocol.
+ @rtype: SMTPDelivery
+ """
+ # If needed, we might use ESMTPDelivery here instead.
+ smtpProtocol = smtp.SMTP(SMTPDelivery(self._km, self._config))
smtpProtocol.factory = self
return smtpProtocol
+#
+# SMTPDelivery
+#
+
class SMTPDelivery(object):
"""
Validate email addresses and handle message delivery.
@@ -44,14 +184,47 @@ class SMTPDelivery(object):
implements(smtp.IMessageDelivery)
- def __init__(self, soledad, gpg=None):
- self._soledad = soledad
- if gpg:
- self._gpg = gpg
- else:
- self._gpg = GPGWrapper()
+ def __init__(self, keymanager, config):
+ """
+ Initialize the SMTP delivery object.
+
+ @param keymanager: A KeyManager for retrieving recipient's keys.
+ @type keymanager: leap.common.keymanager.KeyManager
+ @param config: A dictionary with smtp configuration. Should have
+ the following structure:
+ {
+ HOST_KEY: '<str>',
+ PORT_KEY: <int>,
+ USERNAME_KEY: '<str>',
+ PASSWORD_KEY: '<str>',
+ ENCRYPTED_ONLY_KEY: <bool>,
+ }
+ @type config: dict
+ """
+ # assert params
+ leap_assert_type(keymanager, KeyManager)
+ assert_config_structure(config)
+ # and store them
+ self._km = keymanager
+ self._config = config
+ self._origin = None
def receivedHeader(self, helo, origin, recipients):
+ """
+ Generate the 'Received:' header for a message.
+
+ @param helo: The argument to the HELO command and the client's IP
+ address.
+ @type helo: (str, str)
+ @param origin: The address the message is from.
+ @type origin: twisted.mail.smtp.Address
+ @param recipients: A list of the addresses for which this message is
+ bound.
+ @type: list of twisted.mail.smtp.User
+
+ @return: The full "Received" header string.
+ @type: str
+ """
myHostname, clientIP = helo
headerValue = "by %s from %s with ESMTP ; %s" % (
myHostname, clientIP, smtp.rfc822date())
@@ -59,188 +232,264 @@ class SMTPDelivery(object):
return "Received: %s" % Header(headerValue)
def validateTo(self, user):
- """Assert existence of and trust on recipient's GPG public key."""
+ """
+ Validate the address of C{user}, a recipient of the message.
+
+ This method is called once for each recipient and validates the
+ C{user}'s address against the RFC 2822 definition. If the
+ configuration option ENCRYPTED_ONLY_KEY is True, it also asserts the
+ existence of the user's key.
+
+ In the end, it returns an encrypted message object that is able to
+ send itself to the C{user}'s address.
+
+ @param user: The user whose address we wish to validate.
+ @type: twisted.mail.smtp.User
+
+ @return: A Deferred which becomes, or a callable which takes no
+ arguments and returns an object implementing IMessage. This will
+ be called and the returned object used to deliver the message when
+ it arrives.
+ @rtype: no-argument callable
+
+ @raise SMTPBadRcpt: Raised if messages to the address are not to be
+ accepted.
+ """
# try to find recipient's public key
try:
- # this will raise an exception if key is not found
- trust = self._gpg.find_key(user.dest.addrstr)['trust']
- # if key is not ultimatelly trusted, then the message will not
- # be encrypted. So, we check for this below
- #if trust != 'u':
- # raise smtp.SMTPBadRcpt(user)
+ address = validate_address(user.dest.addrstr)
+ pubkey = self._km.get_key(address, OpenPGPKey)
log.msg("Accepting mail for %s..." % user.dest)
- return lambda: EncryptedMessage(user, soledad=self._soledad,
- gpg=self._gpg)
- except LookupError:
+ except KeyNotFound:
# if key was not found, check config to see if will send anyway.
- if self.encrypted_only:
- raise smtp.SMTPBadRcpt(user)
- # TODO: send signal to cli/gui that this user's key was not found?
+ if self._config[ENCRYPTED_ONLY_KEY]:
+ raise smtp.SMTPBadRcpt(user.dest.addrstr)
log.msg("Warning: will send an unencrypted message (because "
"encrypted_only' is set to False).")
+ return lambda: EncryptedMessage(
+ self._origin, user, self._km, self._config)
+
+ def validateFrom(self, helo, origin):
+ """
+ Validate the address from which the message originates.
+
+ @param helo: The argument to the HELO command and the client's IP
+ address.
+ @type: (str, str)
+ @param origin: The address the message is from.
+ @type origin: twisted.mail.smtp.Address
+
+ @return: origin or a Deferred whose callback will be passed origin.
+ @rtype: Deferred or Address
- def validateFrom(self, helo, originAddress):
+ @raise twisted.mail.smtp.SMTPBadSender: Raised if messages from this
+ address are not to be accepted.
+ """
# accept mail from anywhere. To reject an address, raise
# smtp.SMTPBadSender here.
- return originAddress
+ self._origin = origin
+ return origin
+
+#
+# EncryptedMessage
+#
-class EncryptedMessage():
+class EncryptedMessage(object):
"""
Receive plaintext from client, encrypt it and send message to a
recipient.
"""
implements(smtp.IMessage)
- SMTP_HOSTNAME = "mail.leap.se"
- SMTP_PORT = 25
-
- def __init__(self, user, soledad, gpg=None):
- self.user = user
- self._soledad = soledad
- self.fetchConfig()
+ def __init__(self, fromAddress, user, keymanager, config):
+ """
+ Initialize the encrypted message.
+
+ @param fromAddress: The address of the sender.
+ @type fromAddress: twisted.mail.smtp.Address
+ @param user: The recipient of this message.
+ @type user: twisted.mail.smtp.User
+ @param keymanager: A KeyManager for retrieving recipient's keys.
+ @type keymanager: leap.common.keymanager.KeyManager
+ @param config: A dictionary with smtp configuration. Should have
+ the following structure:
+ {
+ HOST_KEY: '<str>',
+ PORT_KEY: <int>,
+ USERNAME_KEY: '<str>',
+ PASSWORD_KEY: '<str>',
+ ENCRYPTED_ONLY_KEY: <bool>,
+ }
+ @type config: dict
+ """
+ # assert params
+ leap_assert_type(user, smtp.User)
+ leap_assert_type(keymanager, KeyManager)
+ assert_config_structure(config)
+ # and store them
+ self._fromAddress = fromAddress
+ self._user = user
+ self._km = keymanager
+ self._config = config
+ # initialize list for message's lines
self.lines = []
- if gpg:
- self._gpg = gpg
- else:
- self._gpg = GPGWrapper()
def lineReceived(self, line):
- """Store email DATA lines as they arrive."""
+ """
+ Handle another line.
+
+ @param line: The received line.
+ @type line: str
+ """
self.lines.append(line)
def eomReceived(self):
- """Encrypt and send message."""
+ """
+ Handle end of message.
+
+ This method will encrypt and send the message.
+ """
log.msg("Message data complete.")
self.lines.append('') # add a trailing newline
self.parseMessage()
try:
- self.encrypt()
+ self._encrypt_and_sign()
return self.sendMessage()
- except LookupError:
+ except KeyNotFound:
return None
def parseMessage(self):
- """Separate message headers from body."""
- sep = self.lines.index('')
- self.headers = self.lines[:sep]
- self.body = self.lines[sep + 1:]
+ """
+ Separate message headers from body.
+ """
+ parser = Parser()
+ self._message = parser.parsestr('\r\n'.join(self.lines))
def connectionLost(self):
+ """
+ Log an error when the connection is lost.
+ """
log.msg("Connection lost unexpectedly!")
log.err()
# unexpected loss of connection; don't save
self.lines = []
def sendSuccess(self, r):
+ """
+ Callback for a successful send.
+
+ @param r: The result from the last previous callback in the chain.
+ @type r: anything
+ """
log.msg(r)
def sendError(self, e):
+ """
+ Callback for an unsuccessfull send.
+
+ @param e: The result from the last errback.
+ @type e: anything
+ """
log.msg(e)
log.err()
- def prepareHeader(self):
- self.headers.insert(1, "From: %s" % self.user.orig.addrstr)
- self.headers.insert(2, "To: %s" % self.user.dest.addrstr)
- self.headers.append('')
-
def sendMessage(self):
- self.prepareHeader()
- msg = '\n'.join(self.headers + [self.cyphertext])
+ """
+ Send the message.
+
+ This method will prepare the message (headers and possibly encrypted
+ body) and send it using the ESMTPSenderFactory.
+
+ @return: A deferred with callbacks for error and success of this
+ message send.
+ @rtype: twisted.internet.defer.Deferred
+ """
+ msg = self._message.as_string(False)
d = defer.Deferred()
- factory = smtp.ESMTPSenderFactory(self.smtp_username,
- self.smtp_password,
- self.smtp_username,
- self.user.dest.addrstr,
- StringIO(msg),
- d)
- # the next call is TSL-powered!
- reactor.connectTCP(self.SMTP_HOSTNAME, self.SMTP_PORT, factory)
+ factory = smtp.ESMTPSenderFactory(
+ self._config[USERNAME_KEY],
+ self._config[PASSWORD_KEY],
+ self._fromAddress.addrstr,
+ self._user.dest.addrstr,
+ StringIO(msg),
+ d,
+ requireAuthentication=False, # for now do unauth, see issue #2474
+ )
+ # TODO: Change this to connectSSL when cert auth is in place in the platform
+ reactor.connectTCP(
+ self._config[HOST_KEY],
+ self._config[PORT_KEY],
+ factory
+ )
d.addCallback(self.sendSuccess)
d.addErrback(self.sendError)
return d
- def encrypt(self, always_trust=True):
- # TODO: do not "always trust" here.
- try:
- fp = self._gpg.find_key(self.user.dest.addrstr)['fingerprint']
- log.msg("Encrypting to %s" % fp)
- self.cyphertext = str(
- self._gpg.encrypt(
- '\n'.join(self.body), [fp], always_trust=always_trust))
- except LookupError:
- if self.encrypted_only:
- raise
- log.msg("Warning: sending unencrypted mail (because "
- "'encrypted_only' is set to False).")
-
- # this will be replaced by some other mechanism of obtaining credentials
- # for SMTP server.
- def fetchConfig(self):
- # TODO: Soledad/LEAP bootstrap should store the SMTP info on local db,
- # so this relay can load it when it needs.
- if not self._soledad:
- # TODO: uncomment below exception when integration with Soledad is
- # smooth.
- #raise SMTPInfoNotAvailable()
- # TODO: remove dummy settings below when soledad bootstrap is
- # working.
- self.smtp_host = ''
- self.smtp_port = ''
- self.smtp_username = ''
- self.smtp_password = ''
- self.encrypted_only = True
+ def _encrypt_and_sign_payload_rec(self, message, pubkey, signkey):
+ """
+ Recursivelly descend in C{message}'s payload encrypting to C{pubkey}
+ and signing with C{signkey}.
+
+ @param message: The message whose payload we want to encrypt.
+ @type message: email.message.Message
+ @param pubkey: The public key used to encrypt the message.
+ @type pubkey: leap.common.keymanager.openpgp.OpenPGPKey
+ @param signkey: The private key used to sign the message.
+ @type signkey: leap.common.keymanager.openpgp.OpenPGPKey
+ """
+ if message.is_multipart() is False:
+ message.set_payload(
+ encrypt_asym(
+ message.get_payload(), pubkey, sign=signkey))
else:
- self.smtp_config = self._soledad.get_doc('smtp_relay_config')
- for confname in [
- 'smtp_host', 'smtp_port', 'smtp_username',
- 'smtp_password', 'encrypted_only',
- ]:
- setattr(self, confname, doc.content[confname])
-
-
-class GPGWrapper():
- """
- This is a temporary class for handling GPG requests, and should be
- replaced by a more general class used throughout the project.
- """
-
- GNUPG_HOME = "~/.config/leap/gnupg"
- GNUPG_BINARY = "/usr/bin/gpg" # TODO: change this based on OS
-
- def __init__(self, gpghome=GNUPG_HOME, gpgbinary=GNUPG_BINARY):
- self.gpg = gnupg.GPG(gnupghome=gpghome, gpgbinary=gpgbinary)
+ for msg in message.get_payload():
+ self._encrypt_and_sign_payload_rec(msg, pubkey, signkey)
- def find_key(self, email):
+ def _sign_payload_rec(self, message, signkey):
"""
- Find user's key based on their email.
+ Recursivelly descend in C{message}'s payload signing with C{signkey}.
+
+ @param message: The message whose payload we want to encrypt.
+ @type message: email.message.Message
+ @param pubkey: The public key used to encrypt the message.
+ @type pubkey: leap.common.keymanager.openpgp.OpenPGPKey
+ @param signkey: The private key used to sign the message.
+ @type signkey: leap.common.keymanager.openpgp.OpenPGPKey
"""
- for key in self.gpg.list_keys():
- for uid in key['uids']:
- if re.search(email, uid):
- return key
- raise LookupError("GnuPG public key for %s not found!" % email)
-
- def encrypt(self, data, recipient, always_trust=True):
- # TODO: do not 'always_trust'.
- return self.gpg.encrypt(data, recipient, always_trust=always_trust)
-
- def decrypt(self, data):
- return self.gpg.decrypt(data)
-
- def import_keys(self, data):
- return self.gpg.import_keys(data)
-
+ if message.is_multipart() is False:
+ message.set_payload(
+ sign(
+ message.get_payload(), signkey))
+ else:
+ for msg in message.get_payload():
+ self._sign_payload_rec(msg, signkey)
-# service configuration
-port = 25
-user_email = 'user@leap.se' # TODO: replace for real mail from gui/cli
+ def _encrypt_and_sign(self):
+ """
+ Encrypt the message body.
-# instantiate soledad for client app storage and sync
-s = soledad.Soledad(user_email)
-factory = SMTPFactory(s)
+ Fetch the recipient key and encrypt the content to the
+ recipient. If a key is not found, then the behaviour depends on the
+ configuration parameter ENCRYPTED_ONLY_KEY. If it is False, the message
+ is sent unencrypted and a warning is logged. If it is True, the
+ encryption fails with a KeyNotFound exception.
-# enable the use of this service with twistd
-application = service.Application("LEAP SMTP Relay")
-service = internet.TCPServer(port, factory)
-service.setServiceParent(application)
+ @raise KeyNotFound: Raised when the recipient key was not found and
+ the ENCRYPTED_ONLY_KEY configuration parameter is set to True.
+ """
+ from_address = validate_address(self._fromAddress.addrstr)
+ signkey = self._km.get_key(from_address, OpenPGPKey, private=True)
+ log.msg("Will sign the message with %s." % signkey.fingerprint)
+ to_address = validate_address(self._user.dest.addrstr)
+ try:
+ # try to get the recipient pubkey
+ pubkey = self._km.get_key(to_address, OpenPGPKey)
+ log.msg("Will encrypt the message to %s." % pubkey.fingerprint)
+ self._encrypt_and_sign_payload_rec(self._message, pubkey, signkey)
+ except KeyNotFound:
+ # at this point we _can_ send unencrypted mail, because if the
+ # configuration said the opposite the address would have been
+ # rejected in SMTPDelivery.validateTo().
+ self._sign_payload_rec(self._message, signkey)
+ log.msg('Will send unencrypted message to %s.' % to_address)
diff --git a/mail/src/leap/mail/smtp/tests/test_smtprelay.py b/mail/src/leap/mail/smtp/tests/test_smtprelay.py
deleted file mode 100644
index eaa4d04..0000000
--- a/mail/src/leap/mail/smtp/tests/test_smtprelay.py
+++ /dev/null
@@ -1,78 +0,0 @@
-from datetime import datetime
-import re
-from leap.email.smtp.smtprelay import (
- SMTPFactory,
- #SMTPDelivery, # an object
- EncryptedMessage,
-)
-from leap.email.smtp import tests
-from twisted.test import proto_helpers
-from twisted.mail.smtp import User
-
-
-# some regexps
-IP_REGEX = "(([0-9]|[1-9][0-9]|1[0-9]{2}|2[0-4][0-9]|25[0-5])\.){3}" + \
- "([0-9]|[1-9][0-9]|1[0-9]{2}|2[0-4][0-9]|25[0-5])"
-HOSTNAME_REGEX = "(([a-zA-Z0-9]|[a-zA-Z0-9][a-zA-Z0-9\-]*[a-zA-Z0-9])\.)*" + \
- "([A-Za-z0-9]|[A-Za-z0-9][A-Za-z0-9\-]*[A-Za-z0-9])"
-IP_OR_HOST_REGEX = '(' + IP_REGEX + '|' + HOSTNAME_REGEX + ')'
-
-
-class TestSmtpRelay(tests.OpenPGPTestCase):
-
- EMAIL_DATA = ['HELO relay.leap.se',
- 'MAIL FROM: <user@leap.se>',
- 'RCPT TO: <leap@leap.se>',
- 'DATA',
- 'From: User <user@leap.se>',
- 'To: Leap <leap@leap.se>',
- 'Date: ' + datetime.now().strftime('%c'),
- 'Subject: test message',
- '',
- 'This is a secret message.',
- 'Yours,',
- 'A.',
- '',
- '.',
- 'QUIT']
-
- def assertMatch(self, string, pattern, msg=None):
- if not re.match(pattern, string):
- msg = self._formatMessage(msg, '"%s" does not match pattern "%s".'
- % (string, pattern))
- raise self.failureException(msg)
-
- def test_relay_accepts_valid_email(self):
- """
- Test if SMTP server responds correctly for valid interaction.
- """
-
- SMTP_ANSWERS = ['220 ' + IP_OR_HOST_REGEX +
- ' NO UCE NO UBE NO RELAY PROBES',
- '250 ' + IP_OR_HOST_REGEX + ' Hello ' +
- IP_OR_HOST_REGEX + ', nice to meet you',
- '250 Sender address accepted',
- '250 Recipient address accepted',
- '354 Continue']
- proto = SMTPFactory(None, self._gpg).buildProtocol(('127.0.0.1', 0))
- transport = proto_helpers.StringTransport()
- proto.makeConnection(transport)
- for i, line in enumerate(self.EMAIL_DATA):
- proto.lineReceived(line + '\r\n')
- self.assertMatch(transport.value(),
- '\r\n'.join(SMTP_ANSWERS[0:i + 1]))
- proto.setTimeout(None)
-
- def test_message_encrypt(self):
- """
- Test if message gets encrypted to destination email.
- """
- proto = SMTPFactory(None, self._gpg).buildProtocol(('127.0.0.1', 0))
- user = User('leap@leap.se', 'relay.leap.se', proto, 'leap@leap.se')
- m = EncryptedMessage(user, None, self._gpg)
- for line in self.EMAIL_DATA[4:12]:
- m.lineReceived(line)
- m.parseMessage()
- m.encrypt()
- decrypted = str(self._gpg.decrypt(m.cyphertext))
- self.assertEqual('\n'.join(self.EMAIL_DATA[9:12]), decrypted)
diff --git a/mail/src/leap/mail/smtp/tests/185CA770.key b/mail/src/leap/mail/tests/smtp/185CA770.key
index 587b416..587b416 100644
--- a/mail/src/leap/mail/smtp/tests/185CA770.key
+++ b/mail/src/leap/mail/tests/smtp/185CA770.key
diff --git a/mail/src/leap/mail/smtp/tests/185CA770.pub b/mail/src/leap/mail/tests/smtp/185CA770.pub
index 38af19f..38af19f 100644
--- a/mail/src/leap/mail/smtp/tests/185CA770.pub
+++ b/mail/src/leap/mail/tests/smtp/185CA770.pub
diff --git a/mail/src/leap/mail/smtp/tests/__init__.py b/mail/src/leap/mail/tests/smtp/__init__.py
index d7b942a..c69c34f 100644
--- a/mail/src/leap/mail/smtp/tests/__init__.py
+++ b/mail/src/leap/mail/tests/smtp/__init__.py
@@ -1,17 +1,49 @@
+# -*- coding: utf-8 -*-
+# __init__.py
+# Copyright (C) 2013 LEAP
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see <http://www.gnu.org/licenses/>.
+
+
+"""
+Base classes and keys for SMTP relay tests.
+"""
+
import os
import shutil
import tempfile
+from mock import Mock
+
from twisted.trial import unittest
+
+from leap.soledad import Soledad
+from leap.soledad.crypto import SoledadCrypto
+from leap.common.keymanager import (
+ KeyManager,
+ openpgp,
+)
+
+
from leap.common.testing.basetest import BaseLeapTest
-from leap.mail.smtp.smtprelay import GPGWrapper
-class OpenPGPTestCase(unittest.TestCase, BaseLeapTest):
+class TestCaseWithKeyManager(BaseLeapTest):
def setUp(self):
- # mimic LeapBaseTest.setUpClass behaviour, because this is deprecated
+ # mimic BaseLeapTest.setUpClass behaviour, because this is deprecated
# in Twisted: http://twistedmatrix.com/trac/ticket/1870
self.old_path = os.environ['PATH']
self.old_home = os.environ['HOME']
@@ -22,18 +54,46 @@ class OpenPGPTestCase(unittest.TestCase, BaseLeapTest):
'bin')
os.environ["PATH"] = bin_tdir
os.environ["HOME"] = self.tempdir
+
# setup our own stuff
- self.gnupg_home = self.tempdir + '/gnupg'
- os.mkdir(self.gnupg_home)
- self.email = 'leap@leap.se'
- self._gpg = GPGWrapper(gpghome=self.gnupg_home)
-
- self.assertEqual(self._gpg.import_keys(PUBLIC_KEY).summary(),
- '1 imported', "error importing public key")
- self.assertEqual(self._gpg.import_keys(PRIVATE_KEY).summary(),
- # note that gnupg does not return a successful import
- # for private keys. Bug?
- '0 imported', "error importing private key")
+ address = 'leap@leap.se' # user's address in the form user@provider
+ uuid = 'leap@leap.se'
+ passphrase = '123'
+ secret_path = os.path.join(self.tempdir, 'secret.gpg')
+ local_db_path = os.path.join(self.tempdir, 'soledad.u1db')
+ server_url = 'http://provider/'
+ cert_file = ''
+
+ # mock key fetching and storing so Soledad doesn't fail when trying to
+ # reach the server.
+ Soledad._fetch_keys_from_shared_db = Mock(return_value=None)
+ Soledad._assert_keys_in_shared_db = Mock(return_value=None)
+
+ # instantiate soledad
+ self._soledad = Soledad(
+ uuid,
+ passphrase,
+ secret_path,
+ local_db_path,
+ server_url,
+ cert_file,
+ )
+
+ self._config = {
+ 'host': 'http://provider/',
+ 'port': 25,
+ 'username': address,
+ 'password': '<password>',
+ 'encrypted_only': True
+ }
+
+ nickserver_url = '' # the url of the nickserver
+ self._km = KeyManager(address, nickserver_url, self._soledad)
+
+ # insert test keys in key manager.
+ pgp = openpgp.OpenPGPScheme(self._soledad)
+ pgp.put_ascii_key(PRIVATE_KEY)
+ pgp.put_ascii_key(PRIVATE_KEY_2)
def tearDown(self):
# mimic LeapBaseTest.tearDownClass behaviour
@@ -43,19 +103,12 @@ class OpenPGPTestCase(unittest.TestCase, BaseLeapTest):
assert self.tempdir.startswith('/tmp/leap_tests-')
shutil.rmtree(self.tempdir)
- def test_openpgp_encrypt_decrypt(self):
- "Test if openpgp can encrypt and decrypt."
- text = "simple raw text"
- encrypted = str(self._gpg.encrypt(text, KEY_FINGERPRINT,
- # TODO: handle always trust issue
- always_trust=True))
- self.assertNotEqual(text, encrypted, "failed encrypting text")
- decrypted = str(self._gpg.decrypt(encrypted))
- self.assertEqual(text, decrypted, "failed decrypting text")
-
# Key material for testing
KEY_FINGERPRINT = "E36E738D69173C13D709E44F2F455E2824D18DDF"
+
+ADDRESS = 'leap@leap.se'
+
PUBLIC_KEY = """
-----BEGIN PGP PUBLIC KEY BLOCK-----
Version: GnuPG v1.4.10 (GNU/Linux)
@@ -109,6 +162,7 @@ ZtQ/VymwFL3XdUWV6B/hU4PVAFvO3qlOtdJ6TpE+nEWgcWjCv5g7RjXX
=MuOY
-----END PGP PUBLIC KEY BLOCK-----
"""
+
PRIVATE_KEY = """
-----BEGIN PGP PRIVATE KEY BLOCK-----
Version: GnuPG v1.4.10 (GNU/Linux)
@@ -216,3 +270,64 @@ RZXoH+FTg9UAW87eqU610npOkT6cRaBxaMK/mDtGNdc=
=JTFu
-----END PGP PRIVATE KEY BLOCK-----
"""
+
+ADDRESS_2 = 'anotheruser@leap.se'
+
+PUBLIC_KEY_2 = """
+-----BEGIN PGP PUBLIC KEY BLOCK-----
+Version: GnuPG v1.4.10 (GNU/Linux)
+
+mI0EUYwJXgEEAMbTKHuPJ5/Gk34l9Z06f+0WCXTDXdte1UBoDtZ1erAbudgC4MOR
+gquKqoj3Hhw0/ILqJ88GcOJmKK/bEoIAuKaqlzDF7UAYpOsPZZYmtRfPC2pTCnXq
+Z1vdeqLwTbUspqXflkCkFtfhGKMq5rH8GV5a3tXZkRWZhdNwhVXZagC3ABEBAAG0
+IWFub3RoZXJ1c2VyIDxhbm90aGVydXNlckBsZWFwLnNlPoi4BBMBAgAiBQJRjAle
+AhsDBgsJCAcDAgYVCAIJCgsEFgIDAQIeAQIXgAAKCRB/nfpof+5XWotuA/4tLN4E
+gUr7IfLy2HkHAxzw7A4rqfMN92DIM9mZrDGaWRrOn3aVF7VU1UG7MDkHfPvp/cFw
+ezoCw4s4IoHVc/pVlOkcHSyt4/Rfh248tYEJmFCJXGHpkK83VIKYJAithNccJ6Q4
+JE/o06Mtf4uh/cA1HUL4a4ceqUhtpLJULLeKo7iNBFGMCV4BBADsyQI7GR0wSAxz
+VayLjuPzgT+bjbFeymIhjuxKIEwnIKwYkovztW+4bbOcQs785k3Lp6RzvigTpQQt
+Z/hwcLOqZbZw8t/24+D+Pq9mMP2uUvCFFqLlVvA6D3vKSQ/XNN+YB919WQ04jh63
+yuRe94WenT1RJd6xU1aaUff4rKizuQARAQABiJ8EGAECAAkFAlGMCV4CGwwACgkQ
+f536aH/uV1rPZQQAqCzRysOlu8ez7PuiBD4SebgRqWlxa1TF1ujzfLmuPivROZ2X
+Kw5aQstxgGSjoB7tac49s0huh4X8XK+BtJBfU84JS8Jc2satlfwoyZ35LH6sDZck
+I+RS/3we6zpMfHs3vvp9xgca6ZupQxivGtxlJs294TpJorx+mFFqbV17AzQ=
+=Thdu
+-----END PGP PUBLIC KEY BLOCK-----
+"""
+
+PRIVATE_KEY_2 = """
+-----BEGIN PGP PRIVATE KEY BLOCK-----
+Version: GnuPG v1.4.10 (GNU/Linux)
+
+lQHYBFGMCV4BBADG0yh7jyefxpN+JfWdOn/tFgl0w13bXtVAaA7WdXqwG7nYAuDD
+kYKriqqI9x4cNPyC6ifPBnDiZiiv2xKCALimqpcwxe1AGKTrD2WWJrUXzwtqUwp1
+6mdb3Xqi8E21LKal35ZApBbX4RijKuax/BleWt7V2ZEVmYXTcIVV2WoAtwARAQAB
+AAP7BLuSAx7tOohnimEs74ks8l/L6dOcsFQZj2bqs4AoY3jFe7bV0tHr4llypb/8
+H3/DYvpf6DWnCjyUS1tTnXSW8JXtx01BUKaAufSmMNg9blKV6GGHlT/Whe9uVyks
+7XHk/+9mebVMNJ/kNlqq2k+uWqJohzC8WWLRK+d1tBeqDsECANZmzltPaqUsGV5X
+C3zszE3tUBgptV/mKnBtopKi+VH+t7K6fudGcG+bAcZDUoH/QVde52mIIjjIdLje
+uajJuHUCAO1mqh+vPoGv4eBLV7iBo3XrunyGXiys4a39eomhxTy3YktQanjjx+ty
+GltAGCs5PbWGO6/IRjjvd46wh53kzvsCAO0J97gsWhzLuFnkxFAJSPk7RRlyl7lI
+1XS/x0Og6j9XHCyY1OYkfBm0to3UlCfkgirzCYlTYObCofzdKFIPDmSqHbQhYW5v
+dGhlcnVzZXIgPGFub3RoZXJ1c2VyQGxlYXAuc2U+iLgEEwECACIFAlGMCV4CGwMG
+CwkIBwMCBhUIAgkKCwQWAgMBAh4BAheAAAoJEH+d+mh/7ldai24D/i0s3gSBSvsh
+8vLYeQcDHPDsDiup8w33YMgz2ZmsMZpZGs6fdpUXtVTVQbswOQd8++n9wXB7OgLD
+izgigdVz+lWU6RwdLK3j9F+Hbjy1gQmYUIlcYemQrzdUgpgkCK2E1xwnpDgkT+jT
+oy1/i6H9wDUdQvhrhx6pSG2kslQst4qjnQHYBFGMCV4BBADsyQI7GR0wSAxzVayL
+juPzgT+bjbFeymIhjuxKIEwnIKwYkovztW+4bbOcQs785k3Lp6RzvigTpQQtZ/hw
+cLOqZbZw8t/24+D+Pq9mMP2uUvCFFqLlVvA6D3vKSQ/XNN+YB919WQ04jh63yuRe
+94WenT1RJd6xU1aaUff4rKizuQARAQABAAP9EyElqJ3dq3EErXwwT4mMnbd1SrVC
+rUJrNWQZL59mm5oigS00uIyR0SvusOr+UzTtd8ysRuwHy5d/LAZsbjQStaOMBILx
+77TJveOel0a1QK0YSMF2ywZMCKvquvjli4hAtWYz/EwfuzQN3t23jc5ny+GqmqD2
+3FUxLJosFUfLNmECAO9KhVmJi+L9dswIs+2Dkjd1eiRQzNOEVffvYkGYZyKxNiXF
+UA5kvyZcB4iAN9sWCybE4WHZ9jd4myGB0MPDGxkCAP1RsXJbbuD6zS7BXe5gwunO
+2q4q7ptdSl/sJYQuTe1KNP5d/uGsvlcFfsYjpsopasPjFBIncc/2QThMKlhoEaEB
+/0mVAxpT6SrEvUbJ18z7kna24SgMPr3OnPMxPGfvNLJY/Xv/A17YfoqjmByCvsKE
+JCDjopXtmbcrZyoEZbEht9mko4ifBBgBAgAJBQJRjAleAhsMAAoJEH+d+mh/7lda
+z2UEAKgs0crDpbvHs+z7ogQ+Enm4EalpcWtUxdbo83y5rj4r0TmdlysOWkLLcYBk
+o6Ae7WnOPbNIboeF/FyvgbSQX1POCUvCXNrGrZX8KMmd+Sx+rA2XJCPkUv98Hus6
+THx7N776fcYHGumbqUMYrxrcZSbNveE6SaK8fphRam1dewM0
+=a5gs
+-----END PGP PRIVATE KEY BLOCK-----
+"""
+
diff --git a/mail/src/leap/mail/smtp/tests/mail.txt b/mail/src/leap/mail/tests/smtp/mail.txt
index 9542047..9542047 100644
--- a/mail/src/leap/mail/smtp/tests/mail.txt
+++ b/mail/src/leap/mail/tests/smtp/mail.txt
diff --git a/mail/src/leap/mail/tests/smtp/test_smtprelay.py b/mail/src/leap/mail/tests/smtp/test_smtprelay.py
new file mode 100644
index 0000000..e48f129
--- /dev/null
+++ b/mail/src/leap/mail/tests/smtp/test_smtprelay.py
@@ -0,0 +1,248 @@
+# -*- coding: utf-8 -*-
+# test_smtprelay.py
+# Copyright (C) 2013 LEAP
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see <http://www.gnu.org/licenses/>.
+
+
+"""
+SMTP relay tests.
+"""
+
+
+import re
+
+
+from datetime import datetime
+from twisted.test import proto_helpers
+from twisted.mail.smtp import (
+ User,
+ Address,
+ SMTPBadRcpt,
+)
+from mock import Mock
+
+
+from leap.mail.smtp.smtprelay import (
+ SMTPFactory,
+ EncryptedMessage,
+)
+from leap.mail.tests.smtp import (
+ TestCaseWithKeyManager,
+ ADDRESS,
+ ADDRESS_2,
+)
+from leap.common.keymanager import openpgp
+
+
+# some regexps
+IP_REGEX = "(([0-9]|[1-9][0-9]|1[0-9]{2}|2[0-4][0-9]|25[0-5])\.){3}" + \
+ "([0-9]|[1-9][0-9]|1[0-9]{2}|2[0-4][0-9]|25[0-5])"
+HOSTNAME_REGEX = "(([a-zA-Z0-9]|[a-zA-Z0-9][a-zA-Z0-9\-]*[a-zA-Z0-9])\.)*" + \
+ "([A-Za-z0-9]|[A-Za-z0-9][A-Za-z0-9\-]*[A-Za-z0-9])"
+IP_OR_HOST_REGEX = '(' + IP_REGEX + '|' + HOSTNAME_REGEX + ')'
+
+
+class TestSmtpRelay(TestCaseWithKeyManager):
+
+ EMAIL_DATA = ['HELO relay.leap.se',
+ 'MAIL FROM: <%s>' % ADDRESS_2,
+ 'RCPT TO: <%s>' % ADDRESS,
+ 'DATA',
+ 'From: User <%s>' % ADDRESS_2,
+ 'To: Leap <%s>' % ADDRESS,
+ 'Date: ' + datetime.now().strftime('%c'),
+ 'Subject: test message',
+ '',
+ 'This is a secret message.',
+ 'Yours,',
+ 'A.',
+ '',
+ '.',
+ 'QUIT']
+
+ def assertMatch(self, string, pattern, msg=None):
+ if not re.match(pattern, string):
+ msg = self._formatMessage(msg, '"%s" does not match pattern "%s".'
+ % (string, pattern))
+ raise self.failureException(msg)
+
+ def test_openpgp_encrypt_decrypt(self):
+ "Test if openpgp can encrypt and decrypt."
+ text = "simple raw text"
+ pubkey = self._km.get_key(
+ ADDRESS, openpgp.OpenPGPKey, private=False)
+ encrypted = openpgp.encrypt_asym(text, pubkey)
+ self.assertNotEqual(
+ text, encrypted, "Ciphertext is equal to plaintext.")
+ privkey = self._km.get_key(
+ ADDRESS, openpgp.OpenPGPKey, private=True)
+ decrypted = openpgp.decrypt_asym(encrypted, privkey)
+ self.assertEqual(text, decrypted,
+ "Decrypted text differs from plaintext.")
+
+ def test_relay_accepts_valid_email(self):
+ """
+ Test if SMTP server responds correctly for valid interaction.
+ """
+
+ SMTP_ANSWERS = ['220 ' + IP_OR_HOST_REGEX +
+ ' NO UCE NO UBE NO RELAY PROBES',
+ '250 ' + IP_OR_HOST_REGEX + ' Hello ' +
+ IP_OR_HOST_REGEX + ', nice to meet you',
+ '250 Sender address accepted',
+ '250 Recipient address accepted',
+ '354 Continue']
+ proto = SMTPFactory(
+ self._km, self._config).buildProtocol(('127.0.0.1', 0))
+ transport = proto_helpers.StringTransport()
+ proto.makeConnection(transport)
+ for i, line in enumerate(self.EMAIL_DATA):
+ proto.lineReceived(line + '\r\n')
+ self.assertMatch(transport.value(),
+ '\r\n'.join(SMTP_ANSWERS[0:i + 1]),
+ 'Did not get expected answer from relay.')
+ proto.setTimeout(None)
+
+ def test_message_encrypt(self):
+ """
+ Test if message gets encrypted to destination email.
+ """
+ proto = SMTPFactory(
+ self._km, self._config).buildProtocol(('127.0.0.1', 0))
+ fromAddr = Address(ADDRESS_2)
+ dest = User(ADDRESS, 'relay.leap.se', proto, ADDRESS)
+ m = EncryptedMessage(fromAddr, dest, self._km, self._config)
+ for line in self.EMAIL_DATA[4:12]:
+ m.lineReceived(line)
+ m.eomReceived()
+ privkey = self._km.get_key(
+ ADDRESS, openpgp.OpenPGPKey, private=True)
+ decrypted = openpgp.decrypt_asym(m._message.get_payload(), privkey)
+ self.assertEqual(
+ '\r\n'.join(self.EMAIL_DATA[9:12]) + '\r\n',
+ decrypted,
+ 'Decrypted text differs from plaintext.')
+
+ def test_message_encrypt_sign(self):
+ """
+ Test if message gets encrypted to destination email and signed with
+ sender key.
+ """
+ proto = SMTPFactory(
+ self._km, self._config).buildProtocol(('127.0.0.1', 0))
+ user = User(ADDRESS, 'relay.leap.se', proto, ADDRESS)
+ fromAddr = Address(ADDRESS_2)
+ m = EncryptedMessage(fromAddr, user, self._km, self._config)
+ for line in self.EMAIL_DATA[4:12]:
+ m.lineReceived(line)
+ # trigger encryption and signing
+ m.eomReceived()
+ # decrypt and verify
+ privkey = self._km.get_key(
+ ADDRESS, openpgp.OpenPGPKey, private=True)
+ pubkey = self._km.get_key(ADDRESS_2, openpgp.OpenPGPKey)
+ decrypted = openpgp.decrypt_asym(
+ m._message.get_payload(), privkey, verify=pubkey)
+ self.assertEqual(
+ '\r\n'.join(self.EMAIL_DATA[9:12]) + '\r\n',
+ decrypted,
+ 'Decrypted text differs from plaintext.')
+
+ def test_message_sign(self):
+ """
+ Test if message is signed with sender key.
+ """
+ # mock the key fetching
+ self._km.fetch_keys_from_server = Mock(return_value=[])
+ proto = SMTPFactory(
+ self._km, self._config).buildProtocol(('127.0.0.1', 0))
+ user = User('ihavenopubkey@nonleap.se', 'relay.leap.se', proto, ADDRESS)
+ fromAddr = Address(ADDRESS_2)
+ m = EncryptedMessage(fromAddr, user, self._km, self._config)
+ for line in self.EMAIL_DATA[4:12]:
+ m.lineReceived(line)
+ # trigger signing
+ m.eomReceived()
+ # assert content of message
+ self.assertTrue(
+ m._message.get_payload().startswith(
+ '-----BEGIN PGP SIGNED MESSAGE-----\n' +
+ 'Hash: SHA1\n\n' +
+ ('\r\n'.join(self.EMAIL_DATA[9:12]) + '\r\n' +
+ '-----BEGIN PGP SIGNATURE-----\n')),
+ 'Message does not start with signature header.')
+ self.assertTrue(
+ m._message.get_payload().endswith(
+ '-----END PGP SIGNATURE-----\n'),
+ 'Message does not end with signature footer.')
+ # assert signature is valid
+ pubkey = self._km.get_key(ADDRESS_2, openpgp.OpenPGPKey)
+ self.assertTrue(
+ openpgp.verify(m._message.get_payload(), pubkey),
+ 'Signature could not be verified.')
+
+ def test_missing_key_rejects_address(self):
+ """
+ Test if server rejects to send unencrypted when 'encrypted_only' is
+ True.
+ """
+ # remove key from key manager
+ pubkey = self._km.get_key(ADDRESS, openpgp.OpenPGPKey)
+ pgp = openpgp.OpenPGPScheme(self._soledad)
+ pgp.delete_key(pubkey)
+ # mock the key fetching
+ self._km.fetch_keys_from_server = Mock(return_value=[])
+ # prepare the SMTP factory
+ proto = SMTPFactory(
+ self._km, self._config).buildProtocol(('127.0.0.1', 0))
+ transport = proto_helpers.StringTransport()
+ proto.makeConnection(transport)
+ proto.lineReceived(self.EMAIL_DATA[0] + '\r\n')
+ proto.lineReceived(self.EMAIL_DATA[1] + '\r\n')
+ proto.lineReceived(self.EMAIL_DATA[2] + '\r\n')
+ # ensure the address was rejected
+ lines = transport.value().rstrip().split('\n')
+ self.assertEqual(
+ '550 Cannot receive for specified address',
+ lines[-1],
+ 'Address should have been rejecetd with appropriate message.')
+
+ def test_missing_key_accepts_address(self):
+ """
+ Test if server accepts to send unencrypted when 'encrypted_only' is
+ False.
+ """
+ # remove key from key manager
+ pubkey = self._km.get_key(ADDRESS, openpgp.OpenPGPKey)
+ pgp = openpgp.OpenPGPScheme(self._soledad)
+ pgp.delete_key(pubkey)
+ # mock the key fetching
+ self._km.fetch_keys_from_server = Mock(return_value=[])
+ # change the configuration
+ self._config['encrypted_only'] = False
+ # prepare the SMTP factory
+ proto = SMTPFactory(
+ self._km, self._config).buildProtocol(('127.0.0.1', 0))
+ transport = proto_helpers.StringTransport()
+ proto.makeConnection(transport)
+ proto.lineReceived(self.EMAIL_DATA[0] + '\r\n')
+ proto.lineReceived(self.EMAIL_DATA[1] + '\r\n')
+ proto.lineReceived(self.EMAIL_DATA[2] + '\r\n')
+ # ensure the address was accepted
+ lines = transport.value().rstrip().split('\n')
+ self.assertEqual(
+ '250 Recipient address accepted',
+ lines[-1],
+ 'Address should have been accepted with appropriate message.')