summaryrefslogtreecommitdiff
path: root/mail/src/leap
diff options
context:
space:
mode:
authorDuda Dornelles <ddornell@thoughtworks.com>2014-11-12 18:27:46 -0200
committerDuda Dornelles <ddornell@thoughtworks.com>2014-11-17 14:32:13 -0200
commita6026b347db8cb56b56ba61a635b5a56e5107841 (patch)
tree8220d9b0abb465b9607e8c8bc7bc7bbb9cb0eb08 /mail/src/leap
parent03283d27d972429886583b2a4902c4887b1849b5 (diff)
Moving encrypt, sign and send logic from gateway (SMTP) to a MailService
Diffstat (limited to 'mail/src/leap')
-rw-r--r--mail/src/leap/mail/service.py384
-rw-r--r--mail/src/leap/mail/smtp/__init__.py6
-rw-r--r--mail/src/leap/mail/smtp/gateway.py458
-rw-r--r--mail/src/leap/mail/smtp/tests/__init__.py0
-rw-r--r--mail/src/leap/mail/smtp/tests/test_gateway.py185
-rw-r--r--mail/src/leap/mail/tests/test_service.py185
-rw-r--r--mail/src/leap/mail/utils.py23
7 files changed, 638 insertions, 603 deletions
diff --git a/mail/src/leap/mail/service.py b/mail/src/leap/mail/service.py
new file mode 100644
index 0000000..d595067
--- /dev/null
+++ b/mail/src/leap/mail/service.py
@@ -0,0 +1,384 @@
+# -*- coding: utf-8 -*-
+# service.py
+# Copyright (C) 2013 LEAP
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see <http://www.gnu.org/licenses/>.
+import re
+from StringIO import StringIO
+from email.parser import Parser
+from email.mime.application import MIMEApplication
+
+from OpenSSL import SSL
+
+from twisted.mail import smtp
+from twisted.internet import reactor
+from twisted.internet import defer
+from twisted.internet.threads import deferToThread
+from twisted.protocols.amp import ssl
+from twisted.python import log
+
+from leap.common.check import leap_assert_type, leap_assert
+from leap.common.events import proto, signal
+from leap.keymanager import KeyManager
+from leap.keymanager.openpgp import OpenPGPKey
+from leap.keymanager.errors import KeyNotFound
+from leap.mail import __version__
+from leap.mail.utils import validate_address
+from leap.mail.smtp.rfc3156 import MultipartEncrypted
+from leap.mail.smtp.rfc3156 import MultipartSigned
+from leap.mail.smtp.rfc3156 import encode_base64_rec
+from leap.mail.smtp.rfc3156 import RFC3156CompliantGenerator
+from leap.mail.smtp.rfc3156 import PGPSignature
+from leap.mail.smtp.rfc3156 import PGPEncrypted
+
+
+class SSLContextFactory(ssl.ClientContextFactory):
+ def __init__(self, cert, key):
+ self.cert = cert
+ self.key = key
+
+ def getContext(self):
+ self.method = SSL.TLSv1_METHOD # SSLv23_METHOD
+ ctx = ssl.ClientContextFactory.getContext(self)
+ ctx.use_certificate_file(self.cert)
+ ctx.use_privatekey_file(self.key)
+ return ctx
+
+
+class OutgoingMail:
+ """
+ A service for handling encrypted mail.
+ """
+
+ FOOTER_STRING = "I prefer encrypted email"
+
+ def __init__(self, from_address, keymanager, cert, key, host, port):
+ """
+ Initialize the mail service.
+
+ :param from_address: The sender address.
+ :type from_address: str
+ :param keymanager: A KeyManager for retrieving recipient's keys.
+ :type keymanager: leap.common.keymanager.KeyManager
+ :param cert: The client certificate for SSL authentication.
+ :type cert: str
+ :param key: The client private key for SSL authentication.
+ :type key: str
+ :param host: The hostname of the remote SMTP server.
+ :type host: str
+ :param port: The port of the remote SMTP server.
+ :type port: int
+ """
+
+ # XXX: should we keep these checks?
+ # assert params
+ leap_assert_type(keymanager, KeyManager)
+ leap_assert_type(host, str)
+ leap_assert(host != '')
+ leap_assert_type(port, int)
+ leap_assert(port is not 0)
+ leap_assert_type(cert, unicode)
+ leap_assert(cert != '')
+ leap_assert_type(key, unicode)
+ leap_assert(key != '')
+
+ self._port = port
+ self._host = host
+ self._key = key
+ self._cert = cert
+ self._from_address = from_address
+ self._keymanager = keymanager
+
+ def send_message(self, raw, recipient):
+ """
+ Sends a message to a recipient. Maybe encrypts and signs.
+
+ :param raw: The raw message
+ :type raw: str
+ :param recipient: The recipient for the message
+ :type recipient: smtp.User
+ :return: a deferred which delivers the message when fired
+ """
+ d = deferToThread(lambda: self._maybe_encrypt_and_sign(raw, recipient))
+ d.addCallback(self._route_msg)
+ d.addErrback(self.sendError)
+
+ return d
+
+ def sendSuccess(self, smtp_sender_result):
+ """
+ Callback for a successful send.
+
+ :param smtp_sender_result: The result from the ESMTPSender from _route_msg
+ :type smtp_sender_result: tuple(int, list(tuple))
+ """
+ dest_addrstr = smtp_sender_result[1][0][0]
+ log.msg('Message sent to %s' % dest_addrstr)
+ signal(proto.SMTP_SEND_MESSAGE_SUCCESS, dest_addrstr)
+
+ def sendError(self, failure):
+ """
+ Callback for an unsuccessfull send.
+
+ :param e: The result from the last errback.
+ :type e: anything
+ """
+ # XXX: need to get the address from the exception to send signal
+ # signal(proto.SMTP_SEND_MESSAGE_ERROR, self._user.dest.addrstr)
+ err = failure.value
+ log.err(err)
+ raise err
+
+ def _route_msg(self, encrypt_and_sign_result):
+ """
+ Sends the msg using the ESMTPSenderFactory.
+
+ :param encrypt_and_sign_result: A tuple containing the 'maybe' encrypted message and the recipient
+ :type encrypt_and_sign_result: tuple
+ """
+ message, recipient = encrypt_and_sign_result
+ log.msg("Connecting to SMTP server %s:%s" % (self._host, self._port))
+ msg = message.as_string(False)
+
+ # we construct a defer to pass to the ESMTPSenderFactory
+ d = defer.Deferred()
+ d.addCallbacks(self.sendSuccess, self.sendError)
+ # we don't pass an ssl context factory to the ESMTPSenderFactory
+ # because ssl will be handled by reactor.connectSSL() below.
+ factory = smtp.ESMTPSenderFactory(
+ "", # username is blank because client auth is done on SSL protocol level
+ "", # password is blank because client auth is done on SSL protocol level
+ self._from_address,
+ recipient.dest.addrstr,
+ StringIO(msg),
+ d,
+ heloFallback=True,
+ requireAuthentication=False,
+ requireTransportSecurity=True)
+ factory.domain = __version__
+ signal(proto.SMTP_SEND_MESSAGE_START, recipient.dest.addrstr)
+ reactor.connectSSL(
+ self._host, self._port, factory,
+ contextFactory=SSLContextFactory(self._cert, self._key))
+
+
+ def _maybe_encrypt_and_sign(self, raw, recipient):
+ """
+ Attempt to encrypt and sign the outgoing message.
+
+ The behaviour of this method depends on:
+
+ 1. the original message's content-type, and
+ 2. the availability of the recipient's public key.
+
+ If the original message's content-type is "multipart/encrypted", then
+ the original message is not altered. For any other content-type, the
+ method attempts to fetch the recipient's public key. If the
+ recipient's public key is available, the message is encrypted and
+ signed; otherwise it is only signed.
+
+ Note that, if the C{encrypted_only} configuration is set to True and
+ the recipient's public key is not available, then the recipient
+ address would have been rejected in SMTPDelivery.validateTo().
+
+ The following table summarizes the overall behaviour of the gateway:
+
+ +---------------------------------------------------+----------------+
+ | content-type | rcpt pubkey | enforce encr. | action |
+ +---------------------+-------------+---------------+----------------+
+ | multipart/encrypted | any | any | pass |
+ | other | available | any | encrypt + sign |
+ | other | unavailable | yes | reject |
+ | other | unavailable | no | sign |
+ +---------------------+-------------+---------------+----------------+
+
+ :param raw: The raw message
+ :type raw: str
+ :param recipient: The recipient for the message
+ :type: recipient: smtp.User
+
+ """
+ # pass if the original message's content-type is "multipart/encrypted"
+ lines = raw.split('\r\n')
+ origmsg = Parser().parsestr(raw)
+
+ if origmsg.get_content_type() == 'multipart/encrypted':
+ return origmsg
+
+ from_address = validate_address(self._from_address)
+ username, domain = from_address.split('@')
+
+ # add a nice footer to the outgoing message
+ # XXX: footer will eventually optional or be removed
+ if origmsg.get_content_type() == 'text/plain':
+ lines.append('--')
+ lines.append('%s - https://%s/key/%s' %
+ (self.FOOTER_STRING, domain, username))
+ lines.append('')
+
+ origmsg = Parser().parsestr('\r\n'.join(lines))
+
+ # get sender and recipient data
+ signkey = self._keymanager.get_key(from_address, OpenPGPKey, private=True)
+ log.msg("Will sign the message with %s." % signkey.fingerprint)
+ to_address = validate_address(recipient.dest.addrstr)
+ try:
+ # try to get the recipient pubkey
+ pubkey = self._keymanager.get_key(to_address, OpenPGPKey)
+ log.msg("Will encrypt the message to %s." % pubkey.fingerprint)
+ signal(proto.SMTP_START_ENCRYPT_AND_SIGN,
+ "%s,%s" % (self._from_address, to_address))
+ newmsg = self._encrypt_and_sign(origmsg, pubkey, signkey)
+
+ signal(proto.SMTP_END_ENCRYPT_AND_SIGN,
+ "%s,%s" % (self._from_address, to_address))
+ except KeyNotFound:
+ # at this point we _can_ send unencrypted mail, because if the
+ # configuration said the opposite the address would have been
+ # rejected in SMTPDelivery.validateTo().
+ log.msg('Will send unencrypted message to %s.' % to_address)
+ signal(proto.SMTP_START_SIGN, self._from_address)
+ newmsg = self._sign(origmsg, signkey)
+ signal(proto.SMTP_END_SIGN, self._from_address)
+ return newmsg, recipient
+
+
+ def _encrypt_and_sign(self, origmsg, pubkey, signkey):
+ """
+ Create an RFC 3156 compliang PGP encrypted and signed message using
+ C{pubkey} to encrypt and C{signkey} to sign.
+
+ :param origmsg: The original message
+ :type origmsg: email.message.Message
+ :param pubkey: The public key used to encrypt the message.
+ :type pubkey: OpenPGPKey
+ :param signkey: The private key used to sign the message.
+ :type signkey: OpenPGPKey
+ :return: The encrypted and signed message
+ :rtype: MultipartEncrypted
+ """
+ # create new multipart/encrypted message with 'pgp-encrypted' protocol
+ newmsg = MultipartEncrypted('application/pgp-encrypted')
+ # move (almost) all headers from original message to the new message
+ self._fix_headers(origmsg, newmsg, signkey)
+ # create 'application/octet-stream' encrypted message
+ encmsg = MIMEApplication(
+ self._keymanager.encrypt(origmsg.as_string(unixfrom=False), pubkey,
+ sign=signkey),
+ _subtype='octet-stream', _encoder=lambda x: x)
+ encmsg.add_header('content-disposition', 'attachment',
+ filename='msg.asc')
+ # create meta message
+ metamsg = PGPEncrypted()
+ metamsg.add_header('Content-Disposition', 'attachment')
+ # attach pgp message parts to new message
+ newmsg.attach(metamsg)
+ newmsg.attach(encmsg)
+ return newmsg
+
+
+ def _sign(self, origmsg, signkey):
+ """
+ Create an RFC 3156 compliant PGP signed MIME message using C{signkey}.
+
+ :param origmsg: The original message
+ :type origmsg: email.message.Message
+ :param signkey: The private key used to sign the message.
+ :type signkey: leap.common.keymanager.openpgp.OpenPGPKey
+ :return: The signed message.
+ :rtype: MultipartSigned
+ """
+ # create new multipart/signed message
+ newmsg = MultipartSigned('application/pgp-signature', 'pgp-sha512')
+ # move (almost) all headers from original message to the new message
+ self._fix_headers(origmsg, newmsg, signkey)
+ # apply base64 content-transfer-encoding
+ encode_base64_rec(origmsg)
+ # get message text with headers and replace \n for \r\n
+ fp = StringIO()
+ g = RFC3156CompliantGenerator(
+ fp, mangle_from_=False, maxheaderlen=76)
+ g.flatten(origmsg)
+ msgtext = re.sub('\r?\n', '\r\n', fp.getvalue())
+ # make sure signed message ends with \r\n as per OpenPGP stantard.
+ if origmsg.is_multipart():
+ if not msgtext.endswith("\r\n"):
+ msgtext += "\r\n"
+ # calculate signature
+ signature = self._keymanager.sign(msgtext, signkey, digest_algo='SHA512',
+ clearsign=False, detach=True, binary=False)
+ sigmsg = PGPSignature(signature)
+ # attach original message and signature to new message
+ newmsg.attach(origmsg)
+ newmsg.attach(sigmsg)
+ return newmsg
+
+
+ def _fix_headers(self, origmsg, newmsg, signkey):
+ """
+ Move some headers from C{origmsg} to C{newmsg}, delete unwanted
+ headers from C{origmsg} and add new headers to C{newms}.
+
+ Outgoing messages are either encrypted and signed or just signed
+ before being sent. Because of that, they are packed inside new
+ messages and some manipulation has to be made on their headers.
+
+ Allowed headers for passing through:
+
+ - From
+ - Date
+ - To
+ - Subject
+ - Reply-To
+ - References
+ - In-Reply-To
+ - Cc
+
+ Headers to be added:
+
+ - Message-ID (i.e. should not use origmsg's Message-Id)
+ - Received (this is added automatically by twisted smtp API)
+ - OpenPGP (see #4447)
+
+ Headers to be deleted:
+
+ - User-Agent
+
+ :param origmsg: The original message.
+ :type origmsg: email.message.Message
+ :param newmsg: The new message being created.
+ :type newmsg: email.message.Message
+ :param signkey: The key used to sign C{newmsg}
+ :type signkey: OpenPGPKey
+ """
+ # move headers from origmsg to newmsg
+ headers = origmsg.items()
+ passthrough = [
+ 'from', 'date', 'to', 'subject', 'reply-to', 'references',
+ 'in-reply-to', 'cc'
+ ]
+ headers = filter(lambda x: x[0].lower() in passthrough, headers)
+ for hkey, hval in headers:
+ newmsg.add_header(hkey, hval)
+ del (origmsg[hkey])
+ # add a new message-id to newmsg
+ newmsg.add_header('Message-Id', smtp.messageid())
+ # add openpgp header to newmsg
+ username, domain = signkey.address.split('@')
+ newmsg.add_header(
+ 'OpenPGP', 'id=%s' % signkey.key_id,
+ url='https://%s/key/%s' % (domain, username),
+ preference='signencrypt')
+ # delete user-agent from origmsg
+ del (origmsg['user-agent'])
diff --git a/mail/src/leap/mail/smtp/__init__.py b/mail/src/leap/mail/smtp/__init__.py
index bbd4064..f740f5e 100644
--- a/mail/src/leap/mail/smtp/__init__.py
+++ b/mail/src/leap/mail/smtp/__init__.py
@@ -22,6 +22,8 @@ import logging
from twisted.internet import reactor
from twisted.internet.error import CannotListenError
+from twisted.mail import smtp
+from leap.mail.service import OutgoingMail
logger = logging.getLogger(__name__)
@@ -59,8 +61,8 @@ def setup_smtp_gateway(port, userid, keymanager, smtp_host, smtp_port,
:returns: tuple of SMTPFactory, twisted.internet.tcp.Port
"""
# configure the use of this service with twistd
- factory = SMTPFactory(userid, keymanager, smtp_host, smtp_port, smtp_cert,
- smtp_key, encrypted_only)
+ outgoing_mail = OutgoingMail(str(userid), keymanager, smtp_cert, smtp_key, smtp_host, smtp_port)
+ factory = SMTPFactory(userid, keymanager, encrypted_only, outgoing_mail)
try:
tport = reactor.listenTCP(port, factory, interface="localhost")
signal(proto.SMTP_SERVICE_STARTED, str(port))
diff --git a/mail/src/leap/mail/smtp/gateway.py b/mail/src/leap/mail/smtp/gateway.py
index 13d3bbf..b022091 100644
--- a/mail/src/leap/mail/smtp/gateway.py
+++ b/mail/src/leap/mail/smtp/gateway.py
@@ -31,39 +31,27 @@ The following classes comprise the SMTP gateway service:
"""
-import re
-from StringIO import StringIO
-from email.Header import Header
-from email.utils import parseaddr
-from email.parser import Parser
-from email.mime.application import MIMEApplication
from zope.interface import implements
-from OpenSSL import SSL
from twisted.mail import smtp
from twisted.internet.protocol import ServerFactory
-from twisted.internet import reactor, ssl
-from twisted.internet import defer
-from twisted.internet.threads import deferToThread
from twisted.python import log
-from leap.common.check import leap_assert, leap_assert_type
+from email.Header import Header
+from leap.common.check import leap_assert_type
from leap.common.events import proto, signal
-from leap.keymanager import KeyManager
from leap.keymanager.openpgp import OpenPGPKey
from leap.keymanager.errors import KeyNotFound
-from leap.mail import __version__
+from leap.mail.utils import validate_address
+
from leap.mail.smtp.rfc3156 import (
- MultipartSigned,
- MultipartEncrypted,
- PGPEncrypted,
- PGPSignature,
RFC3156CompliantGenerator,
- encode_base64_rec,
)
+from leap.mail.service import OutgoingMail
# replace email generator with a RFC 3156 compliant one.
from email import generator
+
generator.Generator = RFC3156CompliantGenerator
@@ -74,31 +62,6 @@ generator.Generator = RFC3156CompliantGenerator
LOCAL_FQDN = "bitmask.local"
-def validate_address(address):
- """
- Validate C{address} as defined in RFC 2822.
-
- :param address: The address to be validated.
- :type address: str
-
- @return: A valid address.
- @rtype: str
-
- @raise smtp.SMTPBadRcpt: Raised if C{address} is invalid.
- """
- leap_assert_type(address, str)
- # in the following, the address is parsed as described in RFC 2822 and
- # ('', '') is returned if the parse fails.
- _, address = parseaddr(address)
- if address == '':
- raise smtp.SMTPBadRcpt(address)
- return address
-
-
-#
-# SMTPFactory
-#
-
class SMTPHeloLocalhost(smtp.SMTP):
"""
An SMTP class that ensures a proper FQDN
@@ -119,45 +82,26 @@ class SMTPFactory(ServerFactory):
"""
domain = LOCAL_FQDN
- def __init__(self, userid, keymanager, host, port, cert, key,
- encrypted_only):
+ def __init__(self, userid, keymanager, encrypted_only, outgoing_mail):
"""
Initialize the SMTP factory.
:param userid: The user currently logged in
:type userid: unicode
- :param keymanager: A KeyManager for retrieving recipient's keys.
- :type keymanager: leap.common.keymanager.KeyManager
- :param host: The hostname of the remote SMTP server.
- :type host: str
- :param port: The port of the remote SMTP server.
- :type port: int
- :param cert: The client certificate for authentication.
- :type cert: str
- :param key: The client key for authentication.
- :type key: str
+ :param keymanager: A Key Manager from where to get recipients' public
+ keys.
:param encrypted_only: Whether the SMTP gateway should send unencrypted
mail or not.
:type encrypted_only: bool
+ :param outgoing_mail: The outgoing mail to send the message
+ :type outgoing_mail: leap.mail.service.OutgoingMail
"""
- # assert params
- leap_assert_type(keymanager, KeyManager)
- leap_assert_type(host, str)
- leap_assert(host != '')
- leap_assert_type(port, int)
- leap_assert(port is not 0)
- leap_assert_type(cert, unicode)
- leap_assert(cert != '')
- leap_assert_type(key, unicode)
- leap_assert(key != '')
+
leap_assert_type(encrypted_only, bool)
# and store them
self._userid = userid
self._km = keymanager
- self._host = host
- self._port = port
- self._cert = cert
- self._key = key
+ self._outgoing_mail = outgoing_mail
self._encrypted_only = encrypted_only
def buildProtocol(self, addr):
@@ -170,9 +114,7 @@ class SMTPFactory(ServerFactory):
@return: The protocol.
@rtype: SMTPDelivery
"""
- smtpProtocol = SMTPHeloLocalhost(SMTPDelivery(
- self._userid, self._km, self._host, self._port, self._cert,
- self._key, self._encrypted_only))
+ smtpProtocol = SMTPHeloLocalhost(SMTPDelivery(self._userid, self._km, self._encrypted_only, self._outgoing_mail))
smtpProtocol.factory = self
return smtpProtocol
@@ -188,33 +130,23 @@ class SMTPDelivery(object):
implements(smtp.IMessageDelivery)
- def __init__(self, userid, keymanager, host, port, cert, key,
- encrypted_only):
+ def __init__(self, userid, keymanager, encrypted_only, outgoing_mail):
"""
Initialize the SMTP delivery object.
:param userid: The user currently logged in
:type userid: unicode
- :param keymanager: A KeyManager for retrieving recipient's keys.
- :type keymanager: leap.common.keymanager.KeyManager
- :param host: The hostname of the remote SMTP server.
- :type host: str
- :param port: The port of the remote SMTP server.
- :type port: int
- :param cert: The client certificate for authentication.
- :type cert: str
- :param key: The client key for authentication.
- :type key: str
+ :param keymanager: A Key Manager from where to get recipients' public
+ keys.
:param encrypted_only: Whether the SMTP gateway should send unencrypted
mail or not.
:type encrypted_only: bool
+ :param outgoing_mail: The outgoing mail to send the message
+ :type outgoing_mail: leap.mail.service.OutgoingMail
"""
self._userid = userid
+ self._outgoing_mail = outgoing_mail
self._km = keymanager
- self._host = host
- self._port = port
- self._cert = cert
- self._key = key
self._encrypted_only = encrypted_only
self._origin = None
@@ -280,9 +212,7 @@ class SMTPDelivery(object):
"encrypted_only' is set to False).")
signal(
proto.SMTP_RECIPIENT_ACCEPTED_UNENCRYPTED, user.dest.addrstr)
- return lambda: EncryptedMessage(
- self._origin, user, self._km, self._host, self._port, self._cert,
- self._key)
+ return lambda: EncryptedMessage(user, self._outgoing_mail)
def validateFrom(self, helo, origin):
"""
@@ -314,19 +244,6 @@ class SMTPDelivery(object):
# EncryptedMessage
#
-class SSLContextFactory(ssl.ClientContextFactory):
- def __init__(self, cert, key):
- self.cert = cert
- self.key = key
-
- def getContext(self):
- self.method = SSL.TLSv1_METHOD # SSLv23_METHOD
- ctx = ssl.ClientContextFactory.getContext(self)
- ctx.use_certificate_file(self.cert)
- ctx.use_privatekey_file(self.key)
- return ctx
-
-
class EncryptedMessage(object):
"""
Receive plaintext from client, encrypt it and send message to a
@@ -334,44 +251,21 @@ class EncryptedMessage(object):
"""
implements(smtp.IMessage)
- FOOTER_STRING = "I prefer encrypted email"
-
- def __init__(self, fromAddress, user, keymanager, host, port, cert, key):
+ def __init__(self, user, outgoing_mail):
"""
Initialize the encrypted message.
- :param fromAddress: The address of the sender.
- :type fromAddress: twisted.mail.smtp.Address
:param user: The recipient of this message.
:type user: twisted.mail.smtp.User
- :param keymanager: A KeyManager for retrieving recipient's keys.
- :type keymanager: leap.common.keymanager.KeyManager
- :param host: The hostname of the remote SMTP server.
- :type host: str
- :param port: The port of the remote SMTP server.
- :type port: int
- :param cert: The client certificate for authentication.
- :type cert: str
- :param key: The client key for authentication.
- :type key: str
+ :param outgoing_mail: The outgoing mail to send the message
+ :type outgoing_mail: leap.mail.service.OutgoingMail
"""
# assert params
leap_assert_type(user, smtp.User)
- leap_assert_type(keymanager, KeyManager)
- # and store them
- self._fromAddress = fromAddress
- self._user = user
- self._km = keymanager
- self._host = host
- self._port = port
- self._cert = cert
- self._key = key
- # initialize list for message's lines
- self.lines = []
- #
- # methods from smtp.IMessage
- #
+ self._user = user
+ self._lines = []
+ self._outgoing_mail = outgoing_mail
def lineReceived(self, line):
"""
@@ -380,7 +274,7 @@ class EncryptedMessage(object):
:param line: The received line.
:type line: str
"""
- self.lines.append(line)
+ self._lines.append(line)
def eomReceived(self):
"""
@@ -391,10 +285,10 @@ class EncryptedMessage(object):
:returns: a deferred
"""
log.msg("Message data complete.")
- self.lines.append('') # add a trailing newline
- d = deferToThread(self._maybe_encrypt_and_sign)
- d.addCallbacks(self.sendMessage, self.skipNoKeyErrBack)
- return d
+ self._lines.append('') # add a trailing newline
+ raw_mail = '\r\n'.join(self._lines)
+
+ return self._outgoing_mail.send_message(raw_mail, self._user)
def connectionLost(self):
"""
@@ -404,290 +298,4 @@ class EncryptedMessage(object):
log.err()
signal(proto.SMTP_CONNECTION_LOST, self._user.dest.addrstr)
# unexpected loss of connection; don't save
- self.lines = []
-
- # ends IMessage implementation
-
- def skipNoKeyErrBack(self, failure):
- """
- Errback that ignores a KeyNotFound
-
- :param failure: the failure
- :type Failure: Failure
- """
- err = failure.value
- if failure.check(KeyNotFound):
- pass
- else:
- raise err
-
- def parseMessage(self):
- """
- Separate message headers from body.
- """
- parser = Parser()
- return parser.parsestr('\r\n'.join(self.lines))
-
- def sendQueued(self, r):
- """
- Callback for the queued message.
-
- :param r: The result from the last previous callback in the chain.
- :type r: anything
- """
- log.msg(r)
-
- def sendSuccess(self, r):
- """
- Callback for a successful send.
-
- :param r: The result from the last previous callback in the chain.
- :type r: anything
- """
- log.msg(r)
- signal(proto.SMTP_SEND_MESSAGE_SUCCESS, self._user.dest.addrstr)
-
- def sendError(self, failure):
- """
- Callback for an unsuccessfull send.
-
- :param e: The result from the last errback.
- :type e: anything
- """
- signal(proto.SMTP_SEND_MESSAGE_ERROR, self._user.dest.addrstr)
- err = failure.value
- log.err(err)
- raise err
-
- def sendMessage(self, *args):
- """
- Sends the message.
-
- :return: A deferred with callback and errback for
- this #message send.
- :rtype: twisted.internet.defer.Deferred
- """
- d = deferToThread(self._route_msg)
- d.addCallbacks(self.sendQueued, self.sendError)
- return d
-
- def _route_msg(self):
- """
- Sends the msg using the ESMTPSenderFactory.
- """
- log.msg("Connecting to SMTP server %s:%s" % (self._host, self._port))
- msg = self._msg.as_string(False)
-
- # we construct a defer to pass to the ESMTPSenderFactory
- d = defer.Deferred()
- d.addCallbacks(self.sendSuccess, self.sendError)
- # we don't pass an ssl context factory to the ESMTPSenderFactory
- # because ssl will be handled by reactor.connectSSL() below.
- factory = smtp.ESMTPSenderFactory(
- "", # username is blank because server does not use auth.
- "", # password is blank because server does not use auth.
- self._fromAddress.addrstr,
- self._user.dest.addrstr,
- StringIO(msg),
- d,
- heloFallback=True,
- requireAuthentication=False,
- requireTransportSecurity=True)
- factory.domain = __version__
- signal(proto.SMTP_SEND_MESSAGE_START, self._user.dest.addrstr)
- reactor.connectSSL(
- self._host, self._port, factory,
- contextFactory=SSLContextFactory(self._cert, self._key))
-
- #
- # encryption methods
- #
-
- def _encrypt_and_sign(self, pubkey, signkey):
- """
- Create an RFC 3156 compliang PGP encrypted and signed message using
- C{pubkey} to encrypt and C{signkey} to sign.
-
- :param pubkey: The public key used to encrypt the message.
- :type pubkey: OpenPGPKey
- :param signkey: The private key used to sign the message.
- :type signkey: OpenPGPKey
- """
- # create new multipart/encrypted message with 'pgp-encrypted' protocol
- newmsg = MultipartEncrypted('application/pgp-encrypted')
- # move (almost) all headers from original message to the new message
- self._fix_headers(self._origmsg, newmsg, signkey)
- # create 'application/octet-stream' encrypted message
- encmsg = MIMEApplication(
- self._km.encrypt(self._origmsg.as_string(unixfrom=False), pubkey,
- sign=signkey),
- _subtype='octet-stream', _encoder=lambda x: x)
- encmsg.add_header('content-disposition', 'attachment',
- filename='msg.asc')
- # create meta message
- metamsg = PGPEncrypted()
- metamsg.add_header('Content-Disposition', 'attachment')
- # attach pgp message parts to new message
- newmsg.attach(metamsg)
- newmsg.attach(encmsg)
- self._msg = newmsg
-
- def _sign(self, signkey):
- """
- Create an RFC 3156 compliant PGP signed MIME message using C{signkey}.
-
- :param signkey: The private key used to sign the message.
- :type signkey: leap.common.keymanager.openpgp.OpenPGPKey
- """
- # create new multipart/signed message
- newmsg = MultipartSigned('application/pgp-signature', 'pgp-sha512')
- # move (almost) all headers from original message to the new message
- self._fix_headers(self._origmsg, newmsg, signkey)
- # apply base64 content-transfer-encoding
- encode_base64_rec(self._origmsg)
- # get message text with headers and replace \n for \r\n
- fp = StringIO()
- g = RFC3156CompliantGenerator(
- fp, mangle_from_=False, maxheaderlen=76)
- g.flatten(self._origmsg)
- msgtext = re.sub('\r?\n', '\r\n', fp.getvalue())
- # make sure signed message ends with \r\n as per OpenPGP stantard.
- if self._origmsg.is_multipart():
- if not msgtext.endswith("\r\n"):
- msgtext += "\r\n"
- # calculate signature
- signature = self._km.sign(msgtext, signkey, digest_algo='SHA512',
- clearsign=False, detach=True, binary=False)
- sigmsg = PGPSignature(signature)
- # attach original message and signature to new message
- newmsg.attach(self._origmsg)
- newmsg.attach(sigmsg)
- self._msg = newmsg
-
- def _maybe_encrypt_and_sign(self):
- """
- Attempt to encrypt and sign the outgoing message.
-
- The behaviour of this method depends on:
-
- 1. the original message's content-type, and
- 2. the availability of the recipient's public key.
-
- If the original message's content-type is "multipart/encrypted", then
- the original message is not altered. For any other content-type, the
- method attempts to fetch the recipient's public key. If the
- recipient's public key is available, the message is encrypted and
- signed; otherwise it is only signed.
-
- Note that, if the C{encrypted_only} configuration is set to True and
- the recipient's public key is not available, then the recipient
- address would have been rejected in SMTPDelivery.validateTo().
-
- The following table summarizes the overall behaviour of the gateway:
-
- +---------------------------------------------------+----------------+
- | content-type | rcpt pubkey | enforce encr. | action |
- +---------------------+-------------+---------------+----------------+
- | multipart/encrypted | any | any | pass |
- | other | available | any | encrypt + sign |
- | other | unavailable | yes | reject |
- | other | unavailable | no | sign |
- +---------------------+-------------+---------------+----------------+
- """
- # pass if the original message's content-type is "multipart/encrypted"
- self._origmsg = self.parseMessage()
- if self._origmsg.get_content_type() == 'multipart/encrypted':
- self._msg = self._origmsg
- return
-
- from_address = validate_address(self._fromAddress.addrstr)
- username, domain = from_address.split('@')
-
- # add a nice footer to the outgoing message
- if self._origmsg.get_content_type() == 'text/plain':
- self.lines.append('--')
- self.lines.append('%s - https://%s/key/%s' %
- (self.FOOTER_STRING, domain, username))
- self.lines.append('')
-
- self._origmsg = self.parseMessage()
-
- # get sender and recipient data
- signkey = self._km.get_key(from_address, OpenPGPKey, private=True)
- log.msg("Will sign the message with %s." % signkey.fingerprint)
- to_address = validate_address(self._user.dest.addrstr)
- try:
- # try to get the recipient pubkey
- pubkey = self._km.get_key(to_address, OpenPGPKey)
- log.msg("Will encrypt the message to %s." % pubkey.fingerprint)
- signal(proto.SMTP_START_ENCRYPT_AND_SIGN,
- "%s,%s" % (self._fromAddress.addrstr, to_address))
- self._encrypt_and_sign(pubkey, signkey)
- signal(proto.SMTP_END_ENCRYPT_AND_SIGN,
- "%s,%s" % (self._fromAddress.addrstr, to_address))
- except KeyNotFound:
- # at this point we _can_ send unencrypted mail, because if the
- # configuration said the opposite the address would have been
- # rejected in SMTPDelivery.validateTo().
- log.msg('Will send unencrypted message to %s.' % to_address)
- signal(proto.SMTP_START_SIGN, self._fromAddress.addrstr)
- self._sign(signkey)
- signal(proto.SMTP_END_SIGN, self._fromAddress.addrstr)
-
- def _fix_headers(self, origmsg, newmsg, signkey):
- """
- Move some headers from C{origmsg} to C{newmsg}, delete unwanted
- headers from C{origmsg} and add new headers to C{newms}.
-
- Outgoing messages are either encrypted and signed or just signed
- before being sent. Because of that, they are packed inside new
- messages and some manipulation has to be made on their headers.
-
- Allowed headers for passing through:
-
- - From
- - Date
- - To
- - Subject
- - Reply-To
- - References
- - In-Reply-To
- - Cc
-
- Headers to be added:
-
- - Message-ID (i.e. should not use origmsg's Message-Id)
- - Received (this is added automatically by twisted smtp API)
- - OpenPGP (see #4447)
-
- Headers to be deleted:
-
- - User-Agent
-
- :param origmsg: The original message.
- :type origmsg: email.message.Message
- :param newmsg: The new message being created.
- :type newmsg: email.message.Message
- :param signkey: The key used to sign C{newmsg}
- :type signkey: OpenPGPKey
- """
- # move headers from origmsg to newmsg
- headers = origmsg.items()
- passthrough = [
- 'from', 'date', 'to', 'subject', 'reply-to', 'references',
- 'in-reply-to', 'cc'
- ]
- headers = filter(lambda x: x[0].lower() in passthrough, headers)
- for hkey, hval in headers:
- newmsg.add_header(hkey, hval)
- del(origmsg[hkey])
- # add a new message-id to newmsg
- newmsg.add_header('Message-Id', smtp.messageid())
- # add openpgp header to newmsg
- username, domain = signkey.address.split('@')
- newmsg.add_header(
- 'OpenPGP', 'id=%s' % signkey.key_id,
- url='https://%s/key/%s' % (domain, username),
- preference='signencrypt')
- # delete user-agent from origmsg
- del(origmsg['user-agent'])
+ self._lines = []
diff --git a/mail/src/leap/mail/smtp/tests/__init__.py b/mail/src/leap/mail/smtp/tests/__init__.py
new file mode 100644
index 0000000..e69de29
--- /dev/null
+++ b/mail/src/leap/mail/smtp/tests/__init__.py
diff --git a/mail/src/leap/mail/smtp/tests/test_gateway.py b/mail/src/leap/mail/smtp/tests/test_gateway.py
index 3635a9f..aeace4a 100644
--- a/mail/src/leap/mail/smtp/tests/test_gateway.py
+++ b/mail/src/leap/mail/smtp/tests/test_gateway.py
@@ -20,17 +20,14 @@
SMTP gateway tests.
"""
-
import re
from datetime import datetime
+
from twisted.test import proto_helpers
-from twisted.mail.smtp import User, Address
from mock import Mock
-
from leap.mail.smtp.gateway import (
- SMTPFactory,
- EncryptedMessage,
+ SMTPFactory
)
from leap.mail.tests import (
TestCaseWithKeyManager,
@@ -39,6 +36,7 @@ from leap.mail.tests import (
)
from leap.keymanager import openpgp
+
# some regexps
IP_REGEX = "(([0-9]|[1-9][0-9]|1[0-9]{2}|2[0-4][0-9]|25[0-5])\.){3}" + \
"([0-9]|[1-9][0-9]|1[0-9]{2}|2[0-4][0-9]|25[0-5])"
@@ -71,20 +69,6 @@ class TestSmtpGateway(TestCaseWithKeyManager):
% (string, pattern))
raise self.failureException(msg)
- def test_openpgp_encrypt_decrypt(self):
- "Test if openpgp can encrypt and decrypt."
- text = "simple raw text"
- pubkey = self._km.get_key(
- ADDRESS, openpgp.OpenPGPKey, private=False)
- encrypted = self._km.encrypt(text, pubkey)
- self.assertNotEqual(
- text, encrypted, "Ciphertext is equal to plaintext.")
- privkey = self._km.get_key(
- ADDRESS, openpgp.OpenPGPKey, private=True)
- decrypted = self._km.decrypt(encrypted, privkey)
- self.assertEqual(text, decrypted,
- "Decrypted text differs from plaintext.")
-
def test_gateway_accepts_valid_email(self):
"""
Test if SMTP server responds correctly for valid interaction.
@@ -102,10 +86,8 @@ class TestSmtpGateway(TestCaseWithKeyManager):
# method...
proto = SMTPFactory(
u'anotheruser@leap.se',
- self._km, self._config['host'],
- self._config['port'],
- self._config['cert'], self._config['key'],
- self._config['encrypted_only']).buildProtocol(('127.0.0.1', 0))
+ self._km,
+ self._config['encrypted_only'], outgoing_mail=Mock()).buildProtocol(('127.0.0.1', 0))
# snip...
transport = proto_helpers.StringTransport()
proto.makeConnection(transport)
@@ -116,151 +98,6 @@ class TestSmtpGateway(TestCaseWithKeyManager):
'Did not get expected answer from gateway.')
proto.setTimeout(None)
- def test_message_encrypt(self):
- """
- Test if message gets encrypted to destination email.
- """
- proto = SMTPFactory(
- u'anotheruser@leap.se',
- self._km, self._config['host'],
- self._config['port'],
- self._config['cert'], self._config['key'],
- self._config['encrypted_only']).buildProtocol(('127.0.0.1', 0))
- fromAddr = Address(ADDRESS_2)
- dest = User(ADDRESS, 'gateway.leap.se', proto, ADDRESS)
- m = EncryptedMessage(
- fromAddr, dest, self._km, self._config['host'],
- self._config['port'], self._config['cert'], self._config['key'])
- for line in self.EMAIL_DATA[4:12]:
- m.lineReceived(line)
- # m.eomReceived() # this includes a defer, so we avoid calling it here
- m.lines.append('') # add a trailing newline
- # we need to call the following explicitelly because it was deferred
- # inside the previous method
- m._maybe_encrypt_and_sign()
- # assert structure of encrypted message
- self.assertTrue('Content-Type' in m._msg)
- self.assertEqual('multipart/encrypted', m._msg.get_content_type())
- self.assertEqual('application/pgp-encrypted',
- m._msg.get_param('protocol'))
- self.assertEqual(2, len(m._msg.get_payload()))
- self.assertEqual('application/pgp-encrypted',
- m._msg.get_payload(0).get_content_type())
- self.assertEqual('application/octet-stream',
- m._msg.get_payload(1).get_content_type())
- privkey = self._km.get_key(
- ADDRESS, openpgp.OpenPGPKey, private=True)
- decrypted = self._km.decrypt(
- m._msg.get_payload(1).get_payload(), privkey)
- self.assertEqual(
- '\n' + '\r\n'.join(self.EMAIL_DATA[9:12]) + '\r\n\r\n--\r\n' +
- 'I prefer encrypted email - https://leap.se/key/anotheruser\r\n',
- decrypted,
- 'Decrypted text differs from plaintext.')
-
- def test_message_encrypt_sign(self):
- """
- Test if message gets encrypted to destination email and signed with
- sender key.
- """
- proto = SMTPFactory(
- u'anotheruser@leap.se',
- self._km, self._config['host'],
- self._config['port'],
- self._config['cert'], self._config['key'],
- self._config['encrypted_only']).buildProtocol(('127.0.0.1', 0))
- user = User(ADDRESS, 'gateway.leap.se', proto, ADDRESS)
- fromAddr = Address(ADDRESS_2)
- m = EncryptedMessage(
- fromAddr, user, self._km, self._config['host'],
- self._config['port'], self._config['cert'], self._config['key'])
- for line in self.EMAIL_DATA[4:12]:
- m.lineReceived(line)
- # trigger encryption and signing
- # m.eomReceived() # this includes a defer, so we avoid calling it here
- m.lines.append('') # add a trailing newline
- # we need to call the following explicitelly because it was deferred
- # inside the previous method
- m._maybe_encrypt_and_sign()
- # assert structure of encrypted message
- self.assertTrue('Content-Type' in m._msg)
- self.assertEqual('multipart/encrypted', m._msg.get_content_type())
- self.assertEqual('application/pgp-encrypted',
- m._msg.get_param('protocol'))
- self.assertEqual(2, len(m._msg.get_payload()))
- self.assertEqual('application/pgp-encrypted',
- m._msg.get_payload(0).get_content_type())
- self.assertEqual('application/octet-stream',
- m._msg.get_payload(1).get_content_type())
- # decrypt and verify
- privkey = self._km.get_key(
- ADDRESS, openpgp.OpenPGPKey, private=True)
- pubkey = self._km.get_key(ADDRESS_2, openpgp.OpenPGPKey)
- decrypted = self._km.decrypt(
- m._msg.get_payload(1).get_payload(), privkey, verify=pubkey)
- self.assertEqual(
- '\n' + '\r\n'.join(self.EMAIL_DATA[9:12]) + '\r\n\r\n--\r\n' +
- 'I prefer encrypted email - https://leap.se/key/anotheruser\r\n',
- decrypted,
- 'Decrypted text differs from plaintext.')
-
- def test_message_sign(self):
- """
- Test if message is signed with sender key.
- """
- # mock the key fetching
- self._km.fetch_keys_from_server = Mock(return_value=[])
- proto = SMTPFactory(
- u'anotheruser@leap.se',
- self._km, self._config['host'],
- self._config['port'],
- self._config['cert'], self._config['key'],
- self._config['encrypted_only']).buildProtocol(('127.0.0.1', 0))
- user = User('ihavenopubkey@nonleap.se',
- 'gateway.leap.se', proto, ADDRESS)
- fromAddr = Address(ADDRESS_2)
- m = EncryptedMessage(
- fromAddr, user, self._km, self._config['host'],
- self._config['port'], self._config['cert'], self._config['key'])
- for line in self.EMAIL_DATA[4:12]:
- m.lineReceived(line)
- # trigger signing
- # m.eomReceived() # this includes a defer, so we avoid calling it here
- m.lines.append('') # add a trailing newline
- # we need to call the following explicitelly because it was deferred
- # inside the previous method
- m._maybe_encrypt_and_sign()
- # assert structure of signed message
- self.assertTrue('Content-Type' in m._msg)
- self.assertEqual('multipart/signed', m._msg.get_content_type())
- self.assertEqual('application/pgp-signature',
- m._msg.get_param('protocol'))
- self.assertEqual('pgp-sha512', m._msg.get_param('micalg'))
- # assert content of message
- self.assertEqual(
- '\r\n'.join(self.EMAIL_DATA[9:13]) + '\r\n--\r\n' +
- 'I prefer encrypted email - https://leap.se/key/anotheruser\r\n',
- m._msg.get_payload(0).get_payload(decode=True))
- # assert content of signature
- self.assertTrue(
- m._msg.get_payload(1).get_payload().startswith(
- '-----BEGIN PGP SIGNATURE-----\n'),
- 'Message does not start with signature header.')
- self.assertTrue(
- m._msg.get_payload(1).get_payload().endswith(
- '-----END PGP SIGNATURE-----\n'),
- 'Message does not end with signature footer.')
- # assert signature is valid
- pubkey = self._km.get_key(ADDRESS_2, openpgp.OpenPGPKey)
- # replace EOL before verifying (according to rfc3156)
- signed_text = re.sub('\r?\n', '\r\n',
- m._msg.get_payload(0).as_string())
- self.assertTrue(
- self._km.verify(signed_text,
- pubkey,
- detached_sig=m._msg.get_payload(1).get_payload()),
- 'Signature could not be verified.')
-
def test_missing_key_rejects_address(self):
"""
Test if server rejects to send unencrypted when 'encrypted_only' is
@@ -276,10 +113,8 @@ class TestSmtpGateway(TestCaseWithKeyManager):
# prepare the SMTP factory
proto = SMTPFactory(
u'anotheruser@leap.se',
- self._km, self._config['host'],
- self._config['port'],
- self._config['cert'], self._config['key'],
- self._config['encrypted_only']).buildProtocol(('127.0.0.1', 0))
+ self._km,
+ self._config['encrypted_only'], outgoing_mail=Mock()).buildProtocol(('127.0.0.1', 0))
transport = proto_helpers.StringTransport()
proto.makeConnection(transport)
proto.lineReceived(self.EMAIL_DATA[0] + '\r\n')
@@ -307,10 +142,8 @@ class TestSmtpGateway(TestCaseWithKeyManager):
# prepare the SMTP factory with encrypted only equal to false
proto = SMTPFactory(
u'anotheruser@leap.se',
- self._km, self._config['host'],
- self._config['port'],
- self._config['cert'], self._config['key'],
- False).buildProtocol(('127.0.0.1', 0))
+ self._km,
+ False, outgoing_mail=Mock()).buildProtocol(('127.0.0.1', 0))
transport = proto_helpers.StringTransport()
proto.makeConnection(transport)
proto.lineReceived(self.EMAIL_DATA[0] + '\r\n')
diff --git a/mail/src/leap/mail/tests/test_service.py b/mail/src/leap/mail/tests/test_service.py
new file mode 100644
index 0000000..f0a807d
--- /dev/null
+++ b/mail/src/leap/mail/tests/test_service.py
@@ -0,0 +1,185 @@
+# -*- coding: utf-8 -*-
+# test_gateway.py
+# Copyright (C) 2013 LEAP
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see <http://www.gnu.org/licenses/>.
+
+
+"""
+SMTP gateway tests.
+"""
+
+import re
+from datetime import datetime
+from twisted.mail.smtp import User, Address
+
+from mock import Mock
+
+from leap.mail.smtp.gateway import SMTPFactory
+from leap.mail.service import OutgoingMail
+from leap.mail.tests import (
+ TestCaseWithKeyManager,
+ ADDRESS,
+ ADDRESS_2,
+)
+from leap.keymanager import openpgp
+
+
+class TestOutgoingMail(TestCaseWithKeyManager):
+ EMAIL_DATA = ['HELO gateway.leap.se',
+ 'MAIL FROM: <%s>' % ADDRESS_2,
+ 'RCPT TO: <%s>' % ADDRESS,
+ 'DATA',
+ 'From: User <%s>' % ADDRESS_2,
+ 'To: Leap <%s>' % ADDRESS,
+ 'Date: ' + datetime.now().strftime('%c'),
+ 'Subject: test message',
+ '',
+ 'This is a secret message.',
+ 'Yours,',
+ 'A.',
+ '',
+ '.',
+ 'QUIT']
+
+ def setUp(self):
+ TestCaseWithKeyManager.setUp(self)
+ self.lines = [line for line in self.EMAIL_DATA[4:12]]
+ self.lines.append('') # add a trailing newline
+ self.raw = '\r\n'.join(self.lines)
+ self.fromAddr = ADDRESS_2
+ self.outgoing_mail = OutgoingMail(self.fromAddr, self._km, self._config['cert'], self._config['key'],
+ self._config['host'], self._config['port'])
+ self.proto = SMTPFactory(
+ u'anotheruser@leap.se',
+ self._km,
+ self._config['encrypted_only'],
+ self.outgoing_mail).buildProtocol(('127.0.0.1', 0))
+ self.dest = User(ADDRESS, 'gateway.leap.se', self.proto, ADDRESS)
+
+ def test_openpgp_encrypt_decrypt(self):
+ "Test if openpgp can encrypt and decrypt."
+ text = "simple raw text"
+ pubkey = self._km.get_key(
+ ADDRESS, openpgp.OpenPGPKey, private=False)
+ encrypted = self._km.encrypt(text, pubkey)
+ self.assertNotEqual(
+ text, encrypted, "Ciphertext is equal to plaintext.")
+ privkey = self._km.get_key(
+ ADDRESS, openpgp.OpenPGPKey, private=True)
+ decrypted = self._km.decrypt(encrypted, privkey)
+ self.assertEqual(text, decrypted,
+ "Decrypted text differs from plaintext.")
+
+ def test_message_encrypt(self):
+ """
+ Test if message gets encrypted to destination email.
+ """
+
+ message, _ = self.outgoing_mail._maybe_encrypt_and_sign(self.raw, self.dest)
+
+ # assert structure of encrypted message
+ self.assertTrue('Content-Type' in message)
+ self.assertEqual('multipart/encrypted', message.get_content_type())
+ self.assertEqual('application/pgp-encrypted',
+ message.get_param('protocol'))
+ self.assertEqual(2, len(message.get_payload()))
+ self.assertEqual('application/pgp-encrypted',
+ message.get_payload(0).get_content_type())
+ self.assertEqual('application/octet-stream',
+ message.get_payload(1).get_content_type())
+ privkey = self._km.get_key(
+ ADDRESS, openpgp.OpenPGPKey, private=True)
+ decrypted = self._km.decrypt(
+ message.get_payload(1).get_payload(), privkey)
+
+ expected = '\n' + '\r\n'.join(
+ self.EMAIL_DATA[9:12]) + '\r\n\r\n--\r\n' + 'I prefer encrypted email - https://leap.se/key/anotheruser\r\n'
+ self.assertEqual(
+ expected,
+ decrypted,
+ 'Decrypted text differs from plaintext.')
+
+ def test_message_encrypt_sign(self):
+ """
+ Test if message gets encrypted to destination email and signed with
+ sender key.
+ """
+ message, _ = self.outgoing_mail._maybe_encrypt_and_sign(self.raw, self.dest)
+
+ # assert structure of encrypted message
+ self.assertTrue('Content-Type' in message)
+ self.assertEqual('multipart/encrypted', message.get_content_type())
+ self.assertEqual('application/pgp-encrypted',
+ message.get_param('protocol'))
+ self.assertEqual(2, len(message.get_payload()))
+ self.assertEqual('application/pgp-encrypted',
+ message.get_payload(0).get_content_type())
+ self.assertEqual('application/octet-stream',
+ message.get_payload(1).get_content_type())
+ # decrypt and verify
+ privkey = self._km.get_key(
+ ADDRESS, openpgp.OpenPGPKey, private=True)
+ pubkey = self._km.get_key(ADDRESS_2, openpgp.OpenPGPKey)
+ decrypted = self._km.decrypt(
+ message.get_payload(1).get_payload(), privkey, verify=pubkey)
+ self.assertEqual(
+ '\n' + '\r\n'.join(self.EMAIL_DATA[9:12]) + '\r\n\r\n--\r\n' +
+ 'I prefer encrypted email - https://leap.se/key/anotheruser\r\n',
+ decrypted,
+ 'Decrypted text differs from plaintext.')
+
+ def test_message_sign(self):
+ """
+ Test if message is signed with sender key.
+ """
+ # mock the key fetching
+ self._km.fetch_keys_from_server = Mock(return_value=[])
+ recipient = User('ihavenopubkey@nonleap.se',
+ 'gateway.leap.se', self.proto, ADDRESS)
+ self.outgoing_mail = OutgoingMail(self.fromAddr, self._km, self._config['cert'], self._config['key'],
+ self._config['host'], self._config['port'])
+
+ message, _ = self.outgoing_mail._maybe_encrypt_and_sign(self.raw, recipient)
+
+ # assert structure of signed message
+ self.assertTrue('Content-Type' in message)
+ self.assertEqual('multipart/signed', message.get_content_type())
+ self.assertEqual('application/pgp-signature',
+ message.get_param('protocol'))
+ self.assertEqual('pgp-sha512', message.get_param('micalg'))
+ # assert content of message
+ self.assertEqual(
+ '\r\n'.join(self.EMAIL_DATA[9:13]) + '\r\n--\r\n' +
+ 'I prefer encrypted email - https://leap.se/key/anotheruser\r\n',
+ message.get_payload(0).get_payload(decode=True))
+ # assert content of signature
+ self.assertTrue(
+ message.get_payload(1).get_payload().startswith(
+ '-----BEGIN PGP SIGNATURE-----\n'),
+ 'Message does not start with signature header.')
+ self.assertTrue(
+ message.get_payload(1).get_payload().endswith(
+ '-----END PGP SIGNATURE-----\n'),
+ 'Message does not end with signature footer.')
+ # assert signature is valid
+ pubkey = self._km.get_key(ADDRESS_2, openpgp.OpenPGPKey)
+ # replace EOL before verifying (according to rfc3156)
+ signed_text = re.sub('\r?\n', '\r\n',
+ message.get_payload(0).as_string())
+ self.assertTrue(
+ self._km.verify(signed_text,
+ pubkey,
+ detached_sig=message.get_payload(1).get_payload()),
+ 'Signature could not be verified.')
diff --git a/mail/src/leap/mail/utils.py b/mail/src/leap/mail/utils.py
index fed24b3..457097b 100644
--- a/mail/src/leap/mail/utils.py
+++ b/mail/src/leap/mail/utils.py
@@ -17,12 +17,15 @@
"""
Mail utilities.
"""
+from email.utils import parseaddr
import json
import re
import traceback
import Queue
from leap.soledad.common.document import SoledadDocument
+from leap.common.check import leap_assert_type
+from twisted.mail import smtp
CHARSET_PATTERN = r"""charset=([\w-]+)"""
@@ -224,6 +227,26 @@ def accumulator_queue(fun, lim):
return _accumulator
+def validate_address(address):
+ """
+ Validate C{address} as defined in RFC 2822.
+
+ :param address: The address to be validated.
+ :type address: str
+
+ @return: A valid address.
+ @rtype: str
+
+ @raise smtp.SMTPBadRcpt: Raised if C{address} is invalid.
+ """
+ leap_assert_type(address, str)
+ # in the following, the address is parsed as described in RFC 2822 and
+ # ('', '') is returned if the parse fails.
+ _, address = parseaddr(address)
+ if address == '':
+ raise smtp.SMTPBadRcpt(address)
+ return address
+
#
# String manipulation
#