diff options
author | Duda Dornelles <ddornell@thoughtworks.com> | 2014-11-12 18:27:46 -0200 |
---|---|---|
committer | Duda Dornelles <ddornell@thoughtworks.com> | 2014-11-17 14:32:13 -0200 |
commit | 040faf2bf94db227246a00b38da8f92bfa0c8fa8 (patch) | |
tree | c2d41d770ba2d7d7beb86ef462b9be6b0ee8044e /src | |
parent | 6102c35586bb925de2624d3164e6b9e7d8838a0e (diff) |
Moving encrypt, sign and send logic from gateway (SMTP) to a MailService
Diffstat (limited to 'src')
-rw-r--r-- | src/leap/mail/service.py | 384 | ||||
-rw-r--r-- | src/leap/mail/smtp/__init__.py | 6 | ||||
-rw-r--r-- | src/leap/mail/smtp/gateway.py | 458 | ||||
-rw-r--r-- | src/leap/mail/smtp/tests/__init__.py | 0 | ||||
-rw-r--r-- | src/leap/mail/smtp/tests/test_gateway.py | 185 | ||||
-rw-r--r-- | src/leap/mail/tests/test_service.py | 185 | ||||
-rw-r--r-- | src/leap/mail/utils.py | 23 |
7 files changed, 638 insertions, 603 deletions
diff --git a/src/leap/mail/service.py b/src/leap/mail/service.py new file mode 100644 index 0000000..d595067 --- /dev/null +++ b/src/leap/mail/service.py @@ -0,0 +1,384 @@ +# -*- coding: utf-8 -*- +# service.py +# Copyright (C) 2013 LEAP +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see <http://www.gnu.org/licenses/>. +import re +from StringIO import StringIO +from email.parser import Parser +from email.mime.application import MIMEApplication + +from OpenSSL import SSL + +from twisted.mail import smtp +from twisted.internet import reactor +from twisted.internet import defer +from twisted.internet.threads import deferToThread +from twisted.protocols.amp import ssl +from twisted.python import log + +from leap.common.check import leap_assert_type, leap_assert +from leap.common.events import proto, signal +from leap.keymanager import KeyManager +from leap.keymanager.openpgp import OpenPGPKey +from leap.keymanager.errors import KeyNotFound +from leap.mail import __version__ +from leap.mail.utils import validate_address +from leap.mail.smtp.rfc3156 import MultipartEncrypted +from leap.mail.smtp.rfc3156 import MultipartSigned +from leap.mail.smtp.rfc3156 import encode_base64_rec +from leap.mail.smtp.rfc3156 import RFC3156CompliantGenerator +from leap.mail.smtp.rfc3156 import PGPSignature +from leap.mail.smtp.rfc3156 import PGPEncrypted + + +class SSLContextFactory(ssl.ClientContextFactory): + def __init__(self, cert, key): + self.cert = cert + self.key = key + + def getContext(self): + self.method = SSL.TLSv1_METHOD # SSLv23_METHOD + ctx = ssl.ClientContextFactory.getContext(self) + ctx.use_certificate_file(self.cert) + ctx.use_privatekey_file(self.key) + return ctx + + +class OutgoingMail: + """ + A service for handling encrypted mail. + """ + + FOOTER_STRING = "I prefer encrypted email" + + def __init__(self, from_address, keymanager, cert, key, host, port): + """ + Initialize the mail service. + + :param from_address: The sender address. + :type from_address: str + :param keymanager: A KeyManager for retrieving recipient's keys. + :type keymanager: leap.common.keymanager.KeyManager + :param cert: The client certificate for SSL authentication. + :type cert: str + :param key: The client private key for SSL authentication. + :type key: str + :param host: The hostname of the remote SMTP server. + :type host: str + :param port: The port of the remote SMTP server. + :type port: int + """ + + # XXX: should we keep these checks? + # assert params + leap_assert_type(keymanager, KeyManager) + leap_assert_type(host, str) + leap_assert(host != '') + leap_assert_type(port, int) + leap_assert(port is not 0) + leap_assert_type(cert, unicode) + leap_assert(cert != '') + leap_assert_type(key, unicode) + leap_assert(key != '') + + self._port = port + self._host = host + self._key = key + self._cert = cert + self._from_address = from_address + self._keymanager = keymanager + + def send_message(self, raw, recipient): + """ + Sends a message to a recipient. Maybe encrypts and signs. + + :param raw: The raw message + :type raw: str + :param recipient: The recipient for the message + :type recipient: smtp.User + :return: a deferred which delivers the message when fired + """ + d = deferToThread(lambda: self._maybe_encrypt_and_sign(raw, recipient)) + d.addCallback(self._route_msg) + d.addErrback(self.sendError) + + return d + + def sendSuccess(self, smtp_sender_result): + """ + Callback for a successful send. + + :param smtp_sender_result: The result from the ESMTPSender from _route_msg + :type smtp_sender_result: tuple(int, list(tuple)) + """ + dest_addrstr = smtp_sender_result[1][0][0] + log.msg('Message sent to %s' % dest_addrstr) + signal(proto.SMTP_SEND_MESSAGE_SUCCESS, dest_addrstr) + + def sendError(self, failure): + """ + Callback for an unsuccessfull send. + + :param e: The result from the last errback. + :type e: anything + """ + # XXX: need to get the address from the exception to send signal + # signal(proto.SMTP_SEND_MESSAGE_ERROR, self._user.dest.addrstr) + err = failure.value + log.err(err) + raise err + + def _route_msg(self, encrypt_and_sign_result): + """ + Sends the msg using the ESMTPSenderFactory. + + :param encrypt_and_sign_result: A tuple containing the 'maybe' encrypted message and the recipient + :type encrypt_and_sign_result: tuple + """ + message, recipient = encrypt_and_sign_result + log.msg("Connecting to SMTP server %s:%s" % (self._host, self._port)) + msg = message.as_string(False) + + # we construct a defer to pass to the ESMTPSenderFactory + d = defer.Deferred() + d.addCallbacks(self.sendSuccess, self.sendError) + # we don't pass an ssl context factory to the ESMTPSenderFactory + # because ssl will be handled by reactor.connectSSL() below. + factory = smtp.ESMTPSenderFactory( + "", # username is blank because client auth is done on SSL protocol level + "", # password is blank because client auth is done on SSL protocol level + self._from_address, + recipient.dest.addrstr, + StringIO(msg), + d, + heloFallback=True, + requireAuthentication=False, + requireTransportSecurity=True) + factory.domain = __version__ + signal(proto.SMTP_SEND_MESSAGE_START, recipient.dest.addrstr) + reactor.connectSSL( + self._host, self._port, factory, + contextFactory=SSLContextFactory(self._cert, self._key)) + + + def _maybe_encrypt_and_sign(self, raw, recipient): + """ + Attempt to encrypt and sign the outgoing message. + + The behaviour of this method depends on: + + 1. the original message's content-type, and + 2. the availability of the recipient's public key. + + If the original message's content-type is "multipart/encrypted", then + the original message is not altered. For any other content-type, the + method attempts to fetch the recipient's public key. If the + recipient's public key is available, the message is encrypted and + signed; otherwise it is only signed. + + Note that, if the C{encrypted_only} configuration is set to True and + the recipient's public key is not available, then the recipient + address would have been rejected in SMTPDelivery.validateTo(). + + The following table summarizes the overall behaviour of the gateway: + + +---------------------------------------------------+----------------+ + | content-type | rcpt pubkey | enforce encr. | action | + +---------------------+-------------+---------------+----------------+ + | multipart/encrypted | any | any | pass | + | other | available | any | encrypt + sign | + | other | unavailable | yes | reject | + | other | unavailable | no | sign | + +---------------------+-------------+---------------+----------------+ + + :param raw: The raw message + :type raw: str + :param recipient: The recipient for the message + :type: recipient: smtp.User + + """ + # pass if the original message's content-type is "multipart/encrypted" + lines = raw.split('\r\n') + origmsg = Parser().parsestr(raw) + + if origmsg.get_content_type() == 'multipart/encrypted': + return origmsg + + from_address = validate_address(self._from_address) + username, domain = from_address.split('@') + + # add a nice footer to the outgoing message + # XXX: footer will eventually optional or be removed + if origmsg.get_content_type() == 'text/plain': + lines.append('--') + lines.append('%s - https://%s/key/%s' % + (self.FOOTER_STRING, domain, username)) + lines.append('') + + origmsg = Parser().parsestr('\r\n'.join(lines)) + + # get sender and recipient data + signkey = self._keymanager.get_key(from_address, OpenPGPKey, private=True) + log.msg("Will sign the message with %s." % signkey.fingerprint) + to_address = validate_address(recipient.dest.addrstr) + try: + # try to get the recipient pubkey + pubkey = self._keymanager.get_key(to_address, OpenPGPKey) + log.msg("Will encrypt the message to %s." % pubkey.fingerprint) + signal(proto.SMTP_START_ENCRYPT_AND_SIGN, + "%s,%s" % (self._from_address, to_address)) + newmsg = self._encrypt_and_sign(origmsg, pubkey, signkey) + + signal(proto.SMTP_END_ENCRYPT_AND_SIGN, + "%s,%s" % (self._from_address, to_address)) + except KeyNotFound: + # at this point we _can_ send unencrypted mail, because if the + # configuration said the opposite the address would have been + # rejected in SMTPDelivery.validateTo(). + log.msg('Will send unencrypted message to %s.' % to_address) + signal(proto.SMTP_START_SIGN, self._from_address) + newmsg = self._sign(origmsg, signkey) + signal(proto.SMTP_END_SIGN, self._from_address) + return newmsg, recipient + + + def _encrypt_and_sign(self, origmsg, pubkey, signkey): + """ + Create an RFC 3156 compliang PGP encrypted and signed message using + C{pubkey} to encrypt and C{signkey} to sign. + + :param origmsg: The original message + :type origmsg: email.message.Message + :param pubkey: The public key used to encrypt the message. + :type pubkey: OpenPGPKey + :param signkey: The private key used to sign the message. + :type signkey: OpenPGPKey + :return: The encrypted and signed message + :rtype: MultipartEncrypted + """ + # create new multipart/encrypted message with 'pgp-encrypted' protocol + newmsg = MultipartEncrypted('application/pgp-encrypted') + # move (almost) all headers from original message to the new message + self._fix_headers(origmsg, newmsg, signkey) + # create 'application/octet-stream' encrypted message + encmsg = MIMEApplication( + self._keymanager.encrypt(origmsg.as_string(unixfrom=False), pubkey, + sign=signkey), + _subtype='octet-stream', _encoder=lambda x: x) + encmsg.add_header('content-disposition', 'attachment', + filename='msg.asc') + # create meta message + metamsg = PGPEncrypted() + metamsg.add_header('Content-Disposition', 'attachment') + # attach pgp message parts to new message + newmsg.attach(metamsg) + newmsg.attach(encmsg) + return newmsg + + + def _sign(self, origmsg, signkey): + """ + Create an RFC 3156 compliant PGP signed MIME message using C{signkey}. + + :param origmsg: The original message + :type origmsg: email.message.Message + :param signkey: The private key used to sign the message. + :type signkey: leap.common.keymanager.openpgp.OpenPGPKey + :return: The signed message. + :rtype: MultipartSigned + """ + # create new multipart/signed message + newmsg = MultipartSigned('application/pgp-signature', 'pgp-sha512') + # move (almost) all headers from original message to the new message + self._fix_headers(origmsg, newmsg, signkey) + # apply base64 content-transfer-encoding + encode_base64_rec(origmsg) + # get message text with headers and replace \n for \r\n + fp = StringIO() + g = RFC3156CompliantGenerator( + fp, mangle_from_=False, maxheaderlen=76) + g.flatten(origmsg) + msgtext = re.sub('\r?\n', '\r\n', fp.getvalue()) + # make sure signed message ends with \r\n as per OpenPGP stantard. + if origmsg.is_multipart(): + if not msgtext.endswith("\r\n"): + msgtext += "\r\n" + # calculate signature + signature = self._keymanager.sign(msgtext, signkey, digest_algo='SHA512', + clearsign=False, detach=True, binary=False) + sigmsg = PGPSignature(signature) + # attach original message and signature to new message + newmsg.attach(origmsg) + newmsg.attach(sigmsg) + return newmsg + + + def _fix_headers(self, origmsg, newmsg, signkey): + """ + Move some headers from C{origmsg} to C{newmsg}, delete unwanted + headers from C{origmsg} and add new headers to C{newms}. + + Outgoing messages are either encrypted and signed or just signed + before being sent. Because of that, they are packed inside new + messages and some manipulation has to be made on their headers. + + Allowed headers for passing through: + + - From + - Date + - To + - Subject + - Reply-To + - References + - In-Reply-To + - Cc + + Headers to be added: + + - Message-ID (i.e. should not use origmsg's Message-Id) + - Received (this is added automatically by twisted smtp API) + - OpenPGP (see #4447) + + Headers to be deleted: + + - User-Agent + + :param origmsg: The original message. + :type origmsg: email.message.Message + :param newmsg: The new message being created. + :type newmsg: email.message.Message + :param signkey: The key used to sign C{newmsg} + :type signkey: OpenPGPKey + """ + # move headers from origmsg to newmsg + headers = origmsg.items() + passthrough = [ + 'from', 'date', 'to', 'subject', 'reply-to', 'references', + 'in-reply-to', 'cc' + ] + headers = filter(lambda x: x[0].lower() in passthrough, headers) + for hkey, hval in headers: + newmsg.add_header(hkey, hval) + del (origmsg[hkey]) + # add a new message-id to newmsg + newmsg.add_header('Message-Id', smtp.messageid()) + # add openpgp header to newmsg + username, domain = signkey.address.split('@') + newmsg.add_header( + 'OpenPGP', 'id=%s' % signkey.key_id, + url='https://%s/key/%s' % (domain, username), + preference='signencrypt') + # delete user-agent from origmsg + del (origmsg['user-agent']) diff --git a/src/leap/mail/smtp/__init__.py b/src/leap/mail/smtp/__init__.py index bbd4064..f740f5e 100644 --- a/src/leap/mail/smtp/__init__.py +++ b/src/leap/mail/smtp/__init__.py @@ -22,6 +22,8 @@ import logging from twisted.internet import reactor from twisted.internet.error import CannotListenError +from twisted.mail import smtp +from leap.mail.service import OutgoingMail logger = logging.getLogger(__name__) @@ -59,8 +61,8 @@ def setup_smtp_gateway(port, userid, keymanager, smtp_host, smtp_port, :returns: tuple of SMTPFactory, twisted.internet.tcp.Port """ # configure the use of this service with twistd - factory = SMTPFactory(userid, keymanager, smtp_host, smtp_port, smtp_cert, - smtp_key, encrypted_only) + outgoing_mail = OutgoingMail(str(userid), keymanager, smtp_cert, smtp_key, smtp_host, smtp_port) + factory = SMTPFactory(userid, keymanager, encrypted_only, outgoing_mail) try: tport = reactor.listenTCP(port, factory, interface="localhost") signal(proto.SMTP_SERVICE_STARTED, str(port)) diff --git a/src/leap/mail/smtp/gateway.py b/src/leap/mail/smtp/gateway.py index 13d3bbf..b022091 100644 --- a/src/leap/mail/smtp/gateway.py +++ b/src/leap/mail/smtp/gateway.py @@ -31,39 +31,27 @@ The following classes comprise the SMTP gateway service: """ -import re -from StringIO import StringIO -from email.Header import Header -from email.utils import parseaddr -from email.parser import Parser -from email.mime.application import MIMEApplication from zope.interface import implements -from OpenSSL import SSL from twisted.mail import smtp from twisted.internet.protocol import ServerFactory -from twisted.internet import reactor, ssl -from twisted.internet import defer -from twisted.internet.threads import deferToThread from twisted.python import log -from leap.common.check import leap_assert, leap_assert_type +from email.Header import Header +from leap.common.check import leap_assert_type from leap.common.events import proto, signal -from leap.keymanager import KeyManager from leap.keymanager.openpgp import OpenPGPKey from leap.keymanager.errors import KeyNotFound -from leap.mail import __version__ +from leap.mail.utils import validate_address + from leap.mail.smtp.rfc3156 import ( - MultipartSigned, - MultipartEncrypted, - PGPEncrypted, - PGPSignature, RFC3156CompliantGenerator, - encode_base64_rec, ) +from leap.mail.service import OutgoingMail # replace email generator with a RFC 3156 compliant one. from email import generator + generator.Generator = RFC3156CompliantGenerator @@ -74,31 +62,6 @@ generator.Generator = RFC3156CompliantGenerator LOCAL_FQDN = "bitmask.local" -def validate_address(address): - """ - Validate C{address} as defined in RFC 2822. - - :param address: The address to be validated. - :type address: str - - @return: A valid address. - @rtype: str - - @raise smtp.SMTPBadRcpt: Raised if C{address} is invalid. - """ - leap_assert_type(address, str) - # in the following, the address is parsed as described in RFC 2822 and - # ('', '') is returned if the parse fails. - _, address = parseaddr(address) - if address == '': - raise smtp.SMTPBadRcpt(address) - return address - - -# -# SMTPFactory -# - class SMTPHeloLocalhost(smtp.SMTP): """ An SMTP class that ensures a proper FQDN @@ -119,45 +82,26 @@ class SMTPFactory(ServerFactory): """ domain = LOCAL_FQDN - def __init__(self, userid, keymanager, host, port, cert, key, - encrypted_only): + def __init__(self, userid, keymanager, encrypted_only, outgoing_mail): """ Initialize the SMTP factory. :param userid: The user currently logged in :type userid: unicode - :param keymanager: A KeyManager for retrieving recipient's keys. - :type keymanager: leap.common.keymanager.KeyManager - :param host: The hostname of the remote SMTP server. - :type host: str - :param port: The port of the remote SMTP server. - :type port: int - :param cert: The client certificate for authentication. - :type cert: str - :param key: The client key for authentication. - :type key: str + :param keymanager: A Key Manager from where to get recipients' public + keys. :param encrypted_only: Whether the SMTP gateway should send unencrypted mail or not. :type encrypted_only: bool + :param outgoing_mail: The outgoing mail to send the message + :type outgoing_mail: leap.mail.service.OutgoingMail """ - # assert params - leap_assert_type(keymanager, KeyManager) - leap_assert_type(host, str) - leap_assert(host != '') - leap_assert_type(port, int) - leap_assert(port is not 0) - leap_assert_type(cert, unicode) - leap_assert(cert != '') - leap_assert_type(key, unicode) - leap_assert(key != '') + leap_assert_type(encrypted_only, bool) # and store them self._userid = userid self._km = keymanager - self._host = host - self._port = port - self._cert = cert - self._key = key + self._outgoing_mail = outgoing_mail self._encrypted_only = encrypted_only def buildProtocol(self, addr): @@ -170,9 +114,7 @@ class SMTPFactory(ServerFactory): @return: The protocol. @rtype: SMTPDelivery """ - smtpProtocol = SMTPHeloLocalhost(SMTPDelivery( - self._userid, self._km, self._host, self._port, self._cert, - self._key, self._encrypted_only)) + smtpProtocol = SMTPHeloLocalhost(SMTPDelivery(self._userid, self._km, self._encrypted_only, self._outgoing_mail)) smtpProtocol.factory = self return smtpProtocol @@ -188,33 +130,23 @@ class SMTPDelivery(object): implements(smtp.IMessageDelivery) - def __init__(self, userid, keymanager, host, port, cert, key, - encrypted_only): + def __init__(self, userid, keymanager, encrypted_only, outgoing_mail): """ Initialize the SMTP delivery object. :param userid: The user currently logged in :type userid: unicode - :param keymanager: A KeyManager for retrieving recipient's keys. - :type keymanager: leap.common.keymanager.KeyManager - :param host: The hostname of the remote SMTP server. - :type host: str - :param port: The port of the remote SMTP server. - :type port: int - :param cert: The client certificate for authentication. - :type cert: str - :param key: The client key for authentication. - :type key: str + :param keymanager: A Key Manager from where to get recipients' public + keys. :param encrypted_only: Whether the SMTP gateway should send unencrypted mail or not. :type encrypted_only: bool + :param outgoing_mail: The outgoing mail to send the message + :type outgoing_mail: leap.mail.service.OutgoingMail """ self._userid = userid + self._outgoing_mail = outgoing_mail self._km = keymanager - self._host = host - self._port = port - self._cert = cert - self._key = key self._encrypted_only = encrypted_only self._origin = None @@ -280,9 +212,7 @@ class SMTPDelivery(object): "encrypted_only' is set to False).") signal( proto.SMTP_RECIPIENT_ACCEPTED_UNENCRYPTED, user.dest.addrstr) - return lambda: EncryptedMessage( - self._origin, user, self._km, self._host, self._port, self._cert, - self._key) + return lambda: EncryptedMessage(user, self._outgoing_mail) def validateFrom(self, helo, origin): """ @@ -314,19 +244,6 @@ class SMTPDelivery(object): # EncryptedMessage # -class SSLContextFactory(ssl.ClientContextFactory): - def __init__(self, cert, key): - self.cert = cert - self.key = key - - def getContext(self): - self.method = SSL.TLSv1_METHOD # SSLv23_METHOD - ctx = ssl.ClientContextFactory.getContext(self) - ctx.use_certificate_file(self.cert) - ctx.use_privatekey_file(self.key) - return ctx - - class EncryptedMessage(object): """ Receive plaintext from client, encrypt it and send message to a @@ -334,44 +251,21 @@ class EncryptedMessage(object): """ implements(smtp.IMessage) - FOOTER_STRING = "I prefer encrypted email" - - def __init__(self, fromAddress, user, keymanager, host, port, cert, key): + def __init__(self, user, outgoing_mail): """ Initialize the encrypted message. - :param fromAddress: The address of the sender. - :type fromAddress: twisted.mail.smtp.Address :param user: The recipient of this message. :type user: twisted.mail.smtp.User - :param keymanager: A KeyManager for retrieving recipient's keys. - :type keymanager: leap.common.keymanager.KeyManager - :param host: The hostname of the remote SMTP server. - :type host: str - :param port: The port of the remote SMTP server. - :type port: int - :param cert: The client certificate for authentication. - :type cert: str - :param key: The client key for authentication. - :type key: str + :param outgoing_mail: The outgoing mail to send the message + :type outgoing_mail: leap.mail.service.OutgoingMail """ # assert params leap_assert_type(user, smtp.User) - leap_assert_type(keymanager, KeyManager) - # and store them - self._fromAddress = fromAddress - self._user = user - self._km = keymanager - self._host = host - self._port = port - self._cert = cert - self._key = key - # initialize list for message's lines - self.lines = [] - # - # methods from smtp.IMessage - # + self._user = user + self._lines = [] + self._outgoing_mail = outgoing_mail def lineReceived(self, line): """ @@ -380,7 +274,7 @@ class EncryptedMessage(object): :param line: The received line. :type line: str """ - self.lines.append(line) + self._lines.append(line) def eomReceived(self): """ @@ -391,10 +285,10 @@ class EncryptedMessage(object): :returns: a deferred """ log.msg("Message data complete.") - self.lines.append('') # add a trailing newline - d = deferToThread(self._maybe_encrypt_and_sign) - d.addCallbacks(self.sendMessage, self.skipNoKeyErrBack) - return d + self._lines.append('') # add a trailing newline + raw_mail = '\r\n'.join(self._lines) + + return self._outgoing_mail.send_message(raw_mail, self._user) def connectionLost(self): """ @@ -404,290 +298,4 @@ class EncryptedMessage(object): log.err() signal(proto.SMTP_CONNECTION_LOST, self._user.dest.addrstr) # unexpected loss of connection; don't save - self.lines = [] - - # ends IMessage implementation - - def skipNoKeyErrBack(self, failure): - """ - Errback that ignores a KeyNotFound - - :param failure: the failure - :type Failure: Failure - """ - err = failure.value - if failure.check(KeyNotFound): - pass - else: - raise err - - def parseMessage(self): - """ - Separate message headers from body. - """ - parser = Parser() - return parser.parsestr('\r\n'.join(self.lines)) - - def sendQueued(self, r): - """ - Callback for the queued message. - - :param r: The result from the last previous callback in the chain. - :type r: anything - """ - log.msg(r) - - def sendSuccess(self, r): - """ - Callback for a successful send. - - :param r: The result from the last previous callback in the chain. - :type r: anything - """ - log.msg(r) - signal(proto.SMTP_SEND_MESSAGE_SUCCESS, self._user.dest.addrstr) - - def sendError(self, failure): - """ - Callback for an unsuccessfull send. - - :param e: The result from the last errback. - :type e: anything - """ - signal(proto.SMTP_SEND_MESSAGE_ERROR, self._user.dest.addrstr) - err = failure.value - log.err(err) - raise err - - def sendMessage(self, *args): - """ - Sends the message. - - :return: A deferred with callback and errback for - this #message send. - :rtype: twisted.internet.defer.Deferred - """ - d = deferToThread(self._route_msg) - d.addCallbacks(self.sendQueued, self.sendError) - return d - - def _route_msg(self): - """ - Sends the msg using the ESMTPSenderFactory. - """ - log.msg("Connecting to SMTP server %s:%s" % (self._host, self._port)) - msg = self._msg.as_string(False) - - # we construct a defer to pass to the ESMTPSenderFactory - d = defer.Deferred() - d.addCallbacks(self.sendSuccess, self.sendError) - # we don't pass an ssl context factory to the ESMTPSenderFactory - # because ssl will be handled by reactor.connectSSL() below. - factory = smtp.ESMTPSenderFactory( - "", # username is blank because server does not use auth. - "", # password is blank because server does not use auth. - self._fromAddress.addrstr, - self._user.dest.addrstr, - StringIO(msg), - d, - heloFallback=True, - requireAuthentication=False, - requireTransportSecurity=True) - factory.domain = __version__ - signal(proto.SMTP_SEND_MESSAGE_START, self._user.dest.addrstr) - reactor.connectSSL( - self._host, self._port, factory, - contextFactory=SSLContextFactory(self._cert, self._key)) - - # - # encryption methods - # - - def _encrypt_and_sign(self, pubkey, signkey): - """ - Create an RFC 3156 compliang PGP encrypted and signed message using - C{pubkey} to encrypt and C{signkey} to sign. - - :param pubkey: The public key used to encrypt the message. - :type pubkey: OpenPGPKey - :param signkey: The private key used to sign the message. - :type signkey: OpenPGPKey - """ - # create new multipart/encrypted message with 'pgp-encrypted' protocol - newmsg = MultipartEncrypted('application/pgp-encrypted') - # move (almost) all headers from original message to the new message - self._fix_headers(self._origmsg, newmsg, signkey) - # create 'application/octet-stream' encrypted message - encmsg = MIMEApplication( - self._km.encrypt(self._origmsg.as_string(unixfrom=False), pubkey, - sign=signkey), - _subtype='octet-stream', _encoder=lambda x: x) - encmsg.add_header('content-disposition', 'attachment', - filename='msg.asc') - # create meta message - metamsg = PGPEncrypted() - metamsg.add_header('Content-Disposition', 'attachment') - # attach pgp message parts to new message - newmsg.attach(metamsg) - newmsg.attach(encmsg) - self._msg = newmsg - - def _sign(self, signkey): - """ - Create an RFC 3156 compliant PGP signed MIME message using C{signkey}. - - :param signkey: The private key used to sign the message. - :type signkey: leap.common.keymanager.openpgp.OpenPGPKey - """ - # create new multipart/signed message - newmsg = MultipartSigned('application/pgp-signature', 'pgp-sha512') - # move (almost) all headers from original message to the new message - self._fix_headers(self._origmsg, newmsg, signkey) - # apply base64 content-transfer-encoding - encode_base64_rec(self._origmsg) - # get message text with headers and replace \n for \r\n - fp = StringIO() - g = RFC3156CompliantGenerator( - fp, mangle_from_=False, maxheaderlen=76) - g.flatten(self._origmsg) - msgtext = re.sub('\r?\n', '\r\n', fp.getvalue()) - # make sure signed message ends with \r\n as per OpenPGP stantard. - if self._origmsg.is_multipart(): - if not msgtext.endswith("\r\n"): - msgtext += "\r\n" - # calculate signature - signature = self._km.sign(msgtext, signkey, digest_algo='SHA512', - clearsign=False, detach=True, binary=False) - sigmsg = PGPSignature(signature) - # attach original message and signature to new message - newmsg.attach(self._origmsg) - newmsg.attach(sigmsg) - self._msg = newmsg - - def _maybe_encrypt_and_sign(self): - """ - Attempt to encrypt and sign the outgoing message. - - The behaviour of this method depends on: - - 1. the original message's content-type, and - 2. the availability of the recipient's public key. - - If the original message's content-type is "multipart/encrypted", then - the original message is not altered. For any other content-type, the - method attempts to fetch the recipient's public key. If the - recipient's public key is available, the message is encrypted and - signed; otherwise it is only signed. - - Note that, if the C{encrypted_only} configuration is set to True and - the recipient's public key is not available, then the recipient - address would have been rejected in SMTPDelivery.validateTo(). - - The following table summarizes the overall behaviour of the gateway: - - +---------------------------------------------------+----------------+ - | content-type | rcpt pubkey | enforce encr. | action | - +---------------------+-------------+---------------+----------------+ - | multipart/encrypted | any | any | pass | - | other | available | any | encrypt + sign | - | other | unavailable | yes | reject | - | other | unavailable | no | sign | - +---------------------+-------------+---------------+----------------+ - """ - # pass if the original message's content-type is "multipart/encrypted" - self._origmsg = self.parseMessage() - if self._origmsg.get_content_type() == 'multipart/encrypted': - self._msg = self._origmsg - return - - from_address = validate_address(self._fromAddress.addrstr) - username, domain = from_address.split('@') - - # add a nice footer to the outgoing message - if self._origmsg.get_content_type() == 'text/plain': - self.lines.append('--') - self.lines.append('%s - https://%s/key/%s' % - (self.FOOTER_STRING, domain, username)) - self.lines.append('') - - self._origmsg = self.parseMessage() - - # get sender and recipient data - signkey = self._km.get_key(from_address, OpenPGPKey, private=True) - log.msg("Will sign the message with %s." % signkey.fingerprint) - to_address = validate_address(self._user.dest.addrstr) - try: - # try to get the recipient pubkey - pubkey = self._km.get_key(to_address, OpenPGPKey) - log.msg("Will encrypt the message to %s." % pubkey.fingerprint) - signal(proto.SMTP_START_ENCRYPT_AND_SIGN, - "%s,%s" % (self._fromAddress.addrstr, to_address)) - self._encrypt_and_sign(pubkey, signkey) - signal(proto.SMTP_END_ENCRYPT_AND_SIGN, - "%s,%s" % (self._fromAddress.addrstr, to_address)) - except KeyNotFound: - # at this point we _can_ send unencrypted mail, because if the - # configuration said the opposite the address would have been - # rejected in SMTPDelivery.validateTo(). - log.msg('Will send unencrypted message to %s.' % to_address) - signal(proto.SMTP_START_SIGN, self._fromAddress.addrstr) - self._sign(signkey) - signal(proto.SMTP_END_SIGN, self._fromAddress.addrstr) - - def _fix_headers(self, origmsg, newmsg, signkey): - """ - Move some headers from C{origmsg} to C{newmsg}, delete unwanted - headers from C{origmsg} and add new headers to C{newms}. - - Outgoing messages are either encrypted and signed or just signed - before being sent. Because of that, they are packed inside new - messages and some manipulation has to be made on their headers. - - Allowed headers for passing through: - - - From - - Date - - To - - Subject - - Reply-To - - References - - In-Reply-To - - Cc - - Headers to be added: - - - Message-ID (i.e. should not use origmsg's Message-Id) - - Received (this is added automatically by twisted smtp API) - - OpenPGP (see #4447) - - Headers to be deleted: - - - User-Agent - - :param origmsg: The original message. - :type origmsg: email.message.Message - :param newmsg: The new message being created. - :type newmsg: email.message.Message - :param signkey: The key used to sign C{newmsg} - :type signkey: OpenPGPKey - """ - # move headers from origmsg to newmsg - headers = origmsg.items() - passthrough = [ - 'from', 'date', 'to', 'subject', 'reply-to', 'references', - 'in-reply-to', 'cc' - ] - headers = filter(lambda x: x[0].lower() in passthrough, headers) - for hkey, hval in headers: - newmsg.add_header(hkey, hval) - del(origmsg[hkey]) - # add a new message-id to newmsg - newmsg.add_header('Message-Id', smtp.messageid()) - # add openpgp header to newmsg - username, domain = signkey.address.split('@') - newmsg.add_header( - 'OpenPGP', 'id=%s' % signkey.key_id, - url='https://%s/key/%s' % (domain, username), - preference='signencrypt') - # delete user-agent from origmsg - del(origmsg['user-agent']) + self._lines = [] diff --git a/src/leap/mail/smtp/tests/__init__.py b/src/leap/mail/smtp/tests/__init__.py new file mode 100644 index 0000000..e69de29 --- /dev/null +++ b/src/leap/mail/smtp/tests/__init__.py diff --git a/src/leap/mail/smtp/tests/test_gateway.py b/src/leap/mail/smtp/tests/test_gateway.py index 3635a9f..aeace4a 100644 --- a/src/leap/mail/smtp/tests/test_gateway.py +++ b/src/leap/mail/smtp/tests/test_gateway.py @@ -20,17 +20,14 @@ SMTP gateway tests. """ - import re from datetime import datetime + from twisted.test import proto_helpers -from twisted.mail.smtp import User, Address from mock import Mock - from leap.mail.smtp.gateway import ( - SMTPFactory, - EncryptedMessage, + SMTPFactory ) from leap.mail.tests import ( TestCaseWithKeyManager, @@ -39,6 +36,7 @@ from leap.mail.tests import ( ) from leap.keymanager import openpgp + # some regexps IP_REGEX = "(([0-9]|[1-9][0-9]|1[0-9]{2}|2[0-4][0-9]|25[0-5])\.){3}" + \ "([0-9]|[1-9][0-9]|1[0-9]{2}|2[0-4][0-9]|25[0-5])" @@ -71,20 +69,6 @@ class TestSmtpGateway(TestCaseWithKeyManager): % (string, pattern)) raise self.failureException(msg) - def test_openpgp_encrypt_decrypt(self): - "Test if openpgp can encrypt and decrypt." - text = "simple raw text" - pubkey = self._km.get_key( - ADDRESS, openpgp.OpenPGPKey, private=False) - encrypted = self._km.encrypt(text, pubkey) - self.assertNotEqual( - text, encrypted, "Ciphertext is equal to plaintext.") - privkey = self._km.get_key( - ADDRESS, openpgp.OpenPGPKey, private=True) - decrypted = self._km.decrypt(encrypted, privkey) - self.assertEqual(text, decrypted, - "Decrypted text differs from plaintext.") - def test_gateway_accepts_valid_email(self): """ Test if SMTP server responds correctly for valid interaction. @@ -102,10 +86,8 @@ class TestSmtpGateway(TestCaseWithKeyManager): # method... proto = SMTPFactory( u'anotheruser@leap.se', - self._km, self._config['host'], - self._config['port'], - self._config['cert'], self._config['key'], - self._config['encrypted_only']).buildProtocol(('127.0.0.1', 0)) + self._km, + self._config['encrypted_only'], outgoing_mail=Mock()).buildProtocol(('127.0.0.1', 0)) # snip... transport = proto_helpers.StringTransport() proto.makeConnection(transport) @@ -116,151 +98,6 @@ class TestSmtpGateway(TestCaseWithKeyManager): 'Did not get expected answer from gateway.') proto.setTimeout(None) - def test_message_encrypt(self): - """ - Test if message gets encrypted to destination email. - """ - proto = SMTPFactory( - u'anotheruser@leap.se', - self._km, self._config['host'], - self._config['port'], - self._config['cert'], self._config['key'], - self._config['encrypted_only']).buildProtocol(('127.0.0.1', 0)) - fromAddr = Address(ADDRESS_2) - dest = User(ADDRESS, 'gateway.leap.se', proto, ADDRESS) - m = EncryptedMessage( - fromAddr, dest, self._km, self._config['host'], - self._config['port'], self._config['cert'], self._config['key']) - for line in self.EMAIL_DATA[4:12]: - m.lineReceived(line) - # m.eomReceived() # this includes a defer, so we avoid calling it here - m.lines.append('') # add a trailing newline - # we need to call the following explicitelly because it was deferred - # inside the previous method - m._maybe_encrypt_and_sign() - # assert structure of encrypted message - self.assertTrue('Content-Type' in m._msg) - self.assertEqual('multipart/encrypted', m._msg.get_content_type()) - self.assertEqual('application/pgp-encrypted', - m._msg.get_param('protocol')) - self.assertEqual(2, len(m._msg.get_payload())) - self.assertEqual('application/pgp-encrypted', - m._msg.get_payload(0).get_content_type()) - self.assertEqual('application/octet-stream', - m._msg.get_payload(1).get_content_type()) - privkey = self._km.get_key( - ADDRESS, openpgp.OpenPGPKey, private=True) - decrypted = self._km.decrypt( - m._msg.get_payload(1).get_payload(), privkey) - self.assertEqual( - '\n' + '\r\n'.join(self.EMAIL_DATA[9:12]) + '\r\n\r\n--\r\n' + - 'I prefer encrypted email - https://leap.se/key/anotheruser\r\n', - decrypted, - 'Decrypted text differs from plaintext.') - - def test_message_encrypt_sign(self): - """ - Test if message gets encrypted to destination email and signed with - sender key. - """ - proto = SMTPFactory( - u'anotheruser@leap.se', - self._km, self._config['host'], - self._config['port'], - self._config['cert'], self._config['key'], - self._config['encrypted_only']).buildProtocol(('127.0.0.1', 0)) - user = User(ADDRESS, 'gateway.leap.se', proto, ADDRESS) - fromAddr = Address(ADDRESS_2) - m = EncryptedMessage( - fromAddr, user, self._km, self._config['host'], - self._config['port'], self._config['cert'], self._config['key']) - for line in self.EMAIL_DATA[4:12]: - m.lineReceived(line) - # trigger encryption and signing - # m.eomReceived() # this includes a defer, so we avoid calling it here - m.lines.append('') # add a trailing newline - # we need to call the following explicitelly because it was deferred - # inside the previous method - m._maybe_encrypt_and_sign() - # assert structure of encrypted message - self.assertTrue('Content-Type' in m._msg) - self.assertEqual('multipart/encrypted', m._msg.get_content_type()) - self.assertEqual('application/pgp-encrypted', - m._msg.get_param('protocol')) - self.assertEqual(2, len(m._msg.get_payload())) - self.assertEqual('application/pgp-encrypted', - m._msg.get_payload(0).get_content_type()) - self.assertEqual('application/octet-stream', - m._msg.get_payload(1).get_content_type()) - # decrypt and verify - privkey = self._km.get_key( - ADDRESS, openpgp.OpenPGPKey, private=True) - pubkey = self._km.get_key(ADDRESS_2, openpgp.OpenPGPKey) - decrypted = self._km.decrypt( - m._msg.get_payload(1).get_payload(), privkey, verify=pubkey) - self.assertEqual( - '\n' + '\r\n'.join(self.EMAIL_DATA[9:12]) + '\r\n\r\n--\r\n' + - 'I prefer encrypted email - https://leap.se/key/anotheruser\r\n', - decrypted, - 'Decrypted text differs from plaintext.') - - def test_message_sign(self): - """ - Test if message is signed with sender key. - """ - # mock the key fetching - self._km.fetch_keys_from_server = Mock(return_value=[]) - proto = SMTPFactory( - u'anotheruser@leap.se', - self._km, self._config['host'], - self._config['port'], - self._config['cert'], self._config['key'], - self._config['encrypted_only']).buildProtocol(('127.0.0.1', 0)) - user = User('ihavenopubkey@nonleap.se', - 'gateway.leap.se', proto, ADDRESS) - fromAddr = Address(ADDRESS_2) - m = EncryptedMessage( - fromAddr, user, self._km, self._config['host'], - self._config['port'], self._config['cert'], self._config['key']) - for line in self.EMAIL_DATA[4:12]: - m.lineReceived(line) - # trigger signing - # m.eomReceived() # this includes a defer, so we avoid calling it here - m.lines.append('') # add a trailing newline - # we need to call the following explicitelly because it was deferred - # inside the previous method - m._maybe_encrypt_and_sign() - # assert structure of signed message - self.assertTrue('Content-Type' in m._msg) - self.assertEqual('multipart/signed', m._msg.get_content_type()) - self.assertEqual('application/pgp-signature', - m._msg.get_param('protocol')) - self.assertEqual('pgp-sha512', m._msg.get_param('micalg')) - # assert content of message - self.assertEqual( - '\r\n'.join(self.EMAIL_DATA[9:13]) + '\r\n--\r\n' + - 'I prefer encrypted email - https://leap.se/key/anotheruser\r\n', - m._msg.get_payload(0).get_payload(decode=True)) - # assert content of signature - self.assertTrue( - m._msg.get_payload(1).get_payload().startswith( - '-----BEGIN PGP SIGNATURE-----\n'), - 'Message does not start with signature header.') - self.assertTrue( - m._msg.get_payload(1).get_payload().endswith( - '-----END PGP SIGNATURE-----\n'), - 'Message does not end with signature footer.') - # assert signature is valid - pubkey = self._km.get_key(ADDRESS_2, openpgp.OpenPGPKey) - # replace EOL before verifying (according to rfc3156) - signed_text = re.sub('\r?\n', '\r\n', - m._msg.get_payload(0).as_string()) - self.assertTrue( - self._km.verify(signed_text, - pubkey, - detached_sig=m._msg.get_payload(1).get_payload()), - 'Signature could not be verified.') - def test_missing_key_rejects_address(self): """ Test if server rejects to send unencrypted when 'encrypted_only' is @@ -276,10 +113,8 @@ class TestSmtpGateway(TestCaseWithKeyManager): # prepare the SMTP factory proto = SMTPFactory( u'anotheruser@leap.se', - self._km, self._config['host'], - self._config['port'], - self._config['cert'], self._config['key'], - self._config['encrypted_only']).buildProtocol(('127.0.0.1', 0)) + self._km, + self._config['encrypted_only'], outgoing_mail=Mock()).buildProtocol(('127.0.0.1', 0)) transport = proto_helpers.StringTransport() proto.makeConnection(transport) proto.lineReceived(self.EMAIL_DATA[0] + '\r\n') @@ -307,10 +142,8 @@ class TestSmtpGateway(TestCaseWithKeyManager): # prepare the SMTP factory with encrypted only equal to false proto = SMTPFactory( u'anotheruser@leap.se', - self._km, self._config['host'], - self._config['port'], - self._config['cert'], self._config['key'], - False).buildProtocol(('127.0.0.1', 0)) + self._km, + False, outgoing_mail=Mock()).buildProtocol(('127.0.0.1', 0)) transport = proto_helpers.StringTransport() proto.makeConnection(transport) proto.lineReceived(self.EMAIL_DATA[0] + '\r\n') diff --git a/src/leap/mail/tests/test_service.py b/src/leap/mail/tests/test_service.py new file mode 100644 index 0000000..f0a807d --- /dev/null +++ b/src/leap/mail/tests/test_service.py @@ -0,0 +1,185 @@ +# -*- coding: utf-8 -*- +# test_gateway.py +# Copyright (C) 2013 LEAP +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see <http://www.gnu.org/licenses/>. + + +""" +SMTP gateway tests. +""" + +import re +from datetime import datetime +from twisted.mail.smtp import User, Address + +from mock import Mock + +from leap.mail.smtp.gateway import SMTPFactory +from leap.mail.service import OutgoingMail +from leap.mail.tests import ( + TestCaseWithKeyManager, + ADDRESS, + ADDRESS_2, +) +from leap.keymanager import openpgp + + +class TestOutgoingMail(TestCaseWithKeyManager): + EMAIL_DATA = ['HELO gateway.leap.se', + 'MAIL FROM: <%s>' % ADDRESS_2, + 'RCPT TO: <%s>' % ADDRESS, + 'DATA', + 'From: User <%s>' % ADDRESS_2, + 'To: Leap <%s>' % ADDRESS, + 'Date: ' + datetime.now().strftime('%c'), + 'Subject: test message', + '', + 'This is a secret message.', + 'Yours,', + 'A.', + '', + '.', + 'QUIT'] + + def setUp(self): + TestCaseWithKeyManager.setUp(self) + self.lines = [line for line in self.EMAIL_DATA[4:12]] + self.lines.append('') # add a trailing newline + self.raw = '\r\n'.join(self.lines) + self.fromAddr = ADDRESS_2 + self.outgoing_mail = OutgoingMail(self.fromAddr, self._km, self._config['cert'], self._config['key'], + self._config['host'], self._config['port']) + self.proto = SMTPFactory( + u'anotheruser@leap.se', + self._km, + self._config['encrypted_only'], + self.outgoing_mail).buildProtocol(('127.0.0.1', 0)) + self.dest = User(ADDRESS, 'gateway.leap.se', self.proto, ADDRESS) + + def test_openpgp_encrypt_decrypt(self): + "Test if openpgp can encrypt and decrypt." + text = "simple raw text" + pubkey = self._km.get_key( + ADDRESS, openpgp.OpenPGPKey, private=False) + encrypted = self._km.encrypt(text, pubkey) + self.assertNotEqual( + text, encrypted, "Ciphertext is equal to plaintext.") + privkey = self._km.get_key( + ADDRESS, openpgp.OpenPGPKey, private=True) + decrypted = self._km.decrypt(encrypted, privkey) + self.assertEqual(text, decrypted, + "Decrypted text differs from plaintext.") + + def test_message_encrypt(self): + """ + Test if message gets encrypted to destination email. + """ + + message, _ = self.outgoing_mail._maybe_encrypt_and_sign(self.raw, self.dest) + + # assert structure of encrypted message + self.assertTrue('Content-Type' in message) + self.assertEqual('multipart/encrypted', message.get_content_type()) + self.assertEqual('application/pgp-encrypted', + message.get_param('protocol')) + self.assertEqual(2, len(message.get_payload())) + self.assertEqual('application/pgp-encrypted', + message.get_payload(0).get_content_type()) + self.assertEqual('application/octet-stream', + message.get_payload(1).get_content_type()) + privkey = self._km.get_key( + ADDRESS, openpgp.OpenPGPKey, private=True) + decrypted = self._km.decrypt( + message.get_payload(1).get_payload(), privkey) + + expected = '\n' + '\r\n'.join( + self.EMAIL_DATA[9:12]) + '\r\n\r\n--\r\n' + 'I prefer encrypted email - https://leap.se/key/anotheruser\r\n' + self.assertEqual( + expected, + decrypted, + 'Decrypted text differs from plaintext.') + + def test_message_encrypt_sign(self): + """ + Test if message gets encrypted to destination email and signed with + sender key. + """ + message, _ = self.outgoing_mail._maybe_encrypt_and_sign(self.raw, self.dest) + + # assert structure of encrypted message + self.assertTrue('Content-Type' in message) + self.assertEqual('multipart/encrypted', message.get_content_type()) + self.assertEqual('application/pgp-encrypted', + message.get_param('protocol')) + self.assertEqual(2, len(message.get_payload())) + self.assertEqual('application/pgp-encrypted', + message.get_payload(0).get_content_type()) + self.assertEqual('application/octet-stream', + message.get_payload(1).get_content_type()) + # decrypt and verify + privkey = self._km.get_key( + ADDRESS, openpgp.OpenPGPKey, private=True) + pubkey = self._km.get_key(ADDRESS_2, openpgp.OpenPGPKey) + decrypted = self._km.decrypt( + message.get_payload(1).get_payload(), privkey, verify=pubkey) + self.assertEqual( + '\n' + '\r\n'.join(self.EMAIL_DATA[9:12]) + '\r\n\r\n--\r\n' + + 'I prefer encrypted email - https://leap.se/key/anotheruser\r\n', + decrypted, + 'Decrypted text differs from plaintext.') + + def test_message_sign(self): + """ + Test if message is signed with sender key. + """ + # mock the key fetching + self._km.fetch_keys_from_server = Mock(return_value=[]) + recipient = User('ihavenopubkey@nonleap.se', + 'gateway.leap.se', self.proto, ADDRESS) + self.outgoing_mail = OutgoingMail(self.fromAddr, self._km, self._config['cert'], self._config['key'], + self._config['host'], self._config['port']) + + message, _ = self.outgoing_mail._maybe_encrypt_and_sign(self.raw, recipient) + + # assert structure of signed message + self.assertTrue('Content-Type' in message) + self.assertEqual('multipart/signed', message.get_content_type()) + self.assertEqual('application/pgp-signature', + message.get_param('protocol')) + self.assertEqual('pgp-sha512', message.get_param('micalg')) + # assert content of message + self.assertEqual( + '\r\n'.join(self.EMAIL_DATA[9:13]) + '\r\n--\r\n' + + 'I prefer encrypted email - https://leap.se/key/anotheruser\r\n', + message.get_payload(0).get_payload(decode=True)) + # assert content of signature + self.assertTrue( + message.get_payload(1).get_payload().startswith( + '-----BEGIN PGP SIGNATURE-----\n'), + 'Message does not start with signature header.') + self.assertTrue( + message.get_payload(1).get_payload().endswith( + '-----END PGP SIGNATURE-----\n'), + 'Message does not end with signature footer.') + # assert signature is valid + pubkey = self._km.get_key(ADDRESS_2, openpgp.OpenPGPKey) + # replace EOL before verifying (according to rfc3156) + signed_text = re.sub('\r?\n', '\r\n', + message.get_payload(0).as_string()) + self.assertTrue( + self._km.verify(signed_text, + pubkey, + detached_sig=message.get_payload(1).get_payload()), + 'Signature could not be verified.') diff --git a/src/leap/mail/utils.py b/src/leap/mail/utils.py index fed24b3..457097b 100644 --- a/src/leap/mail/utils.py +++ b/src/leap/mail/utils.py @@ -17,12 +17,15 @@ """ Mail utilities. """ +from email.utils import parseaddr import json import re import traceback import Queue from leap.soledad.common.document import SoledadDocument +from leap.common.check import leap_assert_type +from twisted.mail import smtp CHARSET_PATTERN = r"""charset=([\w-]+)""" @@ -224,6 +227,26 @@ def accumulator_queue(fun, lim): return _accumulator +def validate_address(address): + """ + Validate C{address} as defined in RFC 2822. + + :param address: The address to be validated. + :type address: str + + @return: A valid address. + @rtype: str + + @raise smtp.SMTPBadRcpt: Raised if C{address} is invalid. + """ + leap_assert_type(address, str) + # in the following, the address is parsed as described in RFC 2822 and + # ('', '') is returned if the parse fails. + _, address = parseaddr(address) + if address == '': + raise smtp.SMTPBadRcpt(address) + return address + # # String manipulation # |