summaryrefslogtreecommitdiff
path: root/src/leap/bitmask/mail/outgoing
diff options
context:
space:
mode:
authorKali Kaneko (leap communications) <kali@leap.se>2016-08-29 23:10:17 -0400
committerKali Kaneko (leap communications) <kali@leap.se>2016-08-29 23:11:41 -0400
commit5a3a2012bb8982ad0884ed659e61e969345e6fde (patch)
treefc2310d8d3244987bf5a1d2632cab99a60ba93f1 /src/leap/bitmask/mail/outgoing
parent43df4205af42fce5d097f70bb0345b69e9d16f1c (diff)
[pkg] move mail source to leap.bitmask.mail
Diffstat (limited to 'src/leap/bitmask/mail/outgoing')
-rw-r--r--src/leap/bitmask/mail/outgoing/__init__.py0
-rw-r--r--src/leap/bitmask/mail/outgoing/service.py518
-rw-r--r--src/leap/bitmask/mail/outgoing/tests/test_outgoing.py263
3 files changed, 781 insertions, 0 deletions
diff --git a/src/leap/bitmask/mail/outgoing/__init__.py b/src/leap/bitmask/mail/outgoing/__init__.py
new file mode 100644
index 00000000..e69de29b
--- /dev/null
+++ b/src/leap/bitmask/mail/outgoing/__init__.py
diff --git a/src/leap/bitmask/mail/outgoing/service.py b/src/leap/bitmask/mail/outgoing/service.py
new file mode 100644
index 00000000..8b02f2e4
--- /dev/null
+++ b/src/leap/bitmask/mail/outgoing/service.py
@@ -0,0 +1,518 @@
+# -*- coding: utf-8 -*-
+# outgoing/service.py
+# Copyright (C) 2013-2015 LEAP
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see <http://www.gnu.org/licenses/>.
+
+"""
+OutgoingMail module.
+
+The OutgoingMail class allows to send mail, and encrypts/signs it if needed.
+"""
+
+import os.path
+import re
+from StringIO import StringIO
+from copy import deepcopy
+from email.parser import Parser
+from email.mime.application import MIMEApplication
+from email.mime.multipart import MIMEMultipart
+from email.mime.text import MIMEText
+
+from OpenSSL import SSL
+
+from twisted.mail import smtp
+from twisted.internet import reactor
+from twisted.internet import defer
+from twisted.protocols.amp import ssl
+from twisted.python import log
+
+from leap.common.check import leap_assert_type, leap_assert
+from leap.common.events import emit_async, catalog
+from leap.keymanager.errors import KeyNotFound, KeyAddressMismatch
+from leap.mail import __version__
+from leap.mail import errors
+from leap.mail.utils import validate_address
+from leap.mail.rfc3156 import MultipartEncrypted
+from leap.mail.rfc3156 import MultipartSigned
+from leap.mail.rfc3156 import encode_base64_rec
+from leap.mail.rfc3156 import RFC3156CompliantGenerator
+from leap.mail.rfc3156 import PGPSignature
+from leap.mail.rfc3156 import PGPEncrypted
+
+# TODO
+# [ ] rename this module to something else, service should be the implementor
+# of IService
+
+
+class SSLContextFactory(ssl.ClientContextFactory):
+ def __init__(self, cert, key):
+ self.cert = cert
+ self.key = key
+
+ def getContext(self):
+ # FIXME -- we should use sslv23 to allow for tlsv1.2
+ # and, if possible, explicitely disable sslv3 clientside.
+ # Servers should avoid sslv3
+ self.method = SSL.TLSv1_METHOD # SSLv23_METHOD
+ ctx = ssl.ClientContextFactory.getContext(self)
+ ctx.use_certificate_file(self.cert)
+ ctx.use_privatekey_file(self.key)
+ return ctx
+
+
+def outgoingFactory(userid, keymanager, opts, check_cert=True, bouncer=None):
+
+ cert = unicode(opts.cert)
+ key = unicode(opts.key)
+ hostname = str(opts.hostname)
+ port = opts.port
+
+ if check_cert:
+ if not os.path.isfile(cert):
+ raise errors.ConfigurationError(
+ 'No valid SMTP certificate could be found for %s!' % userid)
+
+ return OutgoingMail(
+ str(userid), keymanager, cert, key, hostname, port,
+ bouncer)
+
+
+class OutgoingMail(object):
+ """
+ Sends Outgoing Mail, encrypting and signing if needed.
+ """
+
+ def __init__(self, from_address, keymanager, cert, key, host, port,
+ bouncer=None):
+ """
+ Initialize the outgoing mail service.
+
+ :param from_address: The sender address.
+ :type from_address: str
+ :param keymanager: A KeyManager for retrieving recipient's keys.
+ :type keymanager: leap.common.keymanager.KeyManager
+ :param cert: The client certificate for SSL authentication.
+ :type cert: str
+ :param key: The client private key for SSL authentication.
+ :type key: str
+ :param host: The hostname of the remote SMTP server.
+ :type host: str
+ :param port: The port of the remote SMTP server.
+ :type port: int
+ """
+
+ # assert params
+ leap_assert_type(from_address, str)
+ leap_assert('@' in from_address)
+
+ # XXX it can be a zope.proxy too
+ # leap_assert_type(keymanager, KeyManager)
+
+ leap_assert_type(host, str)
+ leap_assert(host != '')
+ leap_assert_type(port, int)
+ leap_assert(port is not 0)
+ leap_assert_type(cert, unicode)
+ leap_assert(cert != '')
+ leap_assert_type(key, unicode)
+ leap_assert(key != '')
+
+ self._port = port
+ self._host = host
+ self._key = key
+ self._cert = cert
+ self._from_address = from_address
+ self._keymanager = keymanager
+ self._bouncer = bouncer
+
+ def send_message(self, raw, recipient):
+ """
+ Sends a message to a recipient. Maybe encrypts and signs.
+
+ :param raw: The raw message
+ :type raw: str
+ :param recipient: The recipient for the message
+ :type recipient: smtp.User
+ :return: a deferred which delivers the message when fired
+ """
+ d = self._maybe_encrypt_and_sign(raw, recipient)
+ d.addCallback(self._route_msg, raw)
+ d.addErrback(self.sendError, raw)
+ return d
+
+ def sendSuccess(self, smtp_sender_result):
+ """
+ Callback for a successful send.
+
+ :param smtp_sender_result: The result from the ESMTPSender from
+ _route_msg
+ :type smtp_sender_result: tuple(int, list(tuple))
+ """
+ dest_addrstr = smtp_sender_result[1][0][0]
+ fromaddr = self._from_address
+ log.msg('Message sent from %s to %s' % (fromaddr, dest_addrstr))
+ emit_async(catalog.SMTP_SEND_MESSAGE_SUCCESS,
+ fromaddr, dest_addrstr)
+
+ def sendError(self, failure, origmsg):
+ """
+ Callback for an unsuccessfull send.
+
+ :param failure: The result from the last errback.
+ :type failure: anything
+ :param origmsg: the original, unencrypted, raw message, to be passed to
+ the bouncer.
+ :type origmsg: str
+ """
+ # XXX: need to get the address from the original message to send signal
+ # emit_async(catalog.SMTP_SEND_MESSAGE_ERROR, self._from_address,
+ # self._user.dest.addrstr)
+
+ # TODO when we implement outgoing queues/long-term-retries, we could
+ # examine the error *here* and delay the notification if it's just a
+ # temporal error. We might want to notify the permanent errors
+ # differently.
+
+ err = failure.value
+ log.err(err)
+
+ if self._bouncer:
+ self._bouncer.bounce_message(
+ err.message, to=self._from_address,
+ orig=origmsg)
+ else:
+ raise err
+
+ def _route_msg(self, encrypt_and_sign_result, raw):
+ """
+ Sends the msg using the ESMTPSenderFactory.
+
+ :param encrypt_and_sign_result: A tuple containing the 'maybe'
+ encrypted message and the recipient
+ :type encrypt_and_sign_result: tuple
+ """
+ message, recipient = encrypt_and_sign_result
+ log.msg("Connecting to SMTP server %s:%s" % (self._host, self._port))
+ msg = message.as_string(False)
+
+ # we construct a defer to pass to the ESMTPSenderFactory
+ d = defer.Deferred()
+ d.addCallback(self.sendSuccess)
+ d.addErrback(self.sendError, raw)
+ # we don't pass an ssl context factory to the ESMTPSenderFactory
+ # because ssl will be handled by reactor.connectSSL() below.
+ factory = smtp.ESMTPSenderFactory(
+ "", # username is blank, no client auth here
+ "", # password is blank, no client auth here
+ self._from_address,
+ recipient.dest.addrstr,
+ StringIO(msg),
+ d,
+ heloFallback=True,
+ requireAuthentication=False,
+ requireTransportSecurity=True)
+ factory.domain = bytes('leap.mail-' + __version__)
+ emit_async(catalog.SMTP_SEND_MESSAGE_START,
+ self._from_address, recipient.dest.addrstr)
+ reactor.connectSSL(
+ self._host, self._port, factory,
+ contextFactory=SSLContextFactory(self._cert, self._key))
+
+ def _maybe_encrypt_and_sign(self, raw, recipient, fetch_remote=True):
+ """
+ Attempt to encrypt and sign the outgoing message.
+
+ The behaviour of this method depends on:
+
+ 1. the original message's content-type, and
+ 2. the availability of the recipient's public key.
+
+ If the original message's content-type is "multipart/encrypted", then
+ the original message is not altered. For any other content-type, the
+ method attempts to fetch the recipient's public key. If the
+ recipient's public key is available, the message is encrypted and
+ signed; otherwise it is only signed.
+
+ Note that, if the C{encrypted_only} configuration is set to True and
+ the recipient's public key is not available, then the recipient
+ address would have been rejected in SMTPDelivery.validateTo().
+
+ The following table summarizes the overall behaviour of the gateway:
+
+ +---------------------------------------------------+----------------+
+ | content-type | rcpt pubkey | enforce encr. | action |
+ +---------------------+-------------+---------------+----------------+
+ | multipart/encrypted | any | any | pass |
+ | other | available | any | encrypt + sign |
+ | other | unavailable | yes | reject |
+ | other | unavailable | no | sign |
+ +---------------------+-------------+---------------+----------------+
+
+ :param raw: The raw message
+ :type raw: str
+ :param recipient: The recipient for the message
+ :type: recipient: smtp.User
+
+ :return: A Deferred that will be fired with a MIMEMultipart message
+ and the original recipient Message
+ :rtype: Deferred
+ """
+ # pass if the original message's content-type is "multipart/encrypted"
+ origmsg = Parser().parsestr(raw)
+
+ if origmsg.get_content_type() == 'multipart/encrypted':
+ return defer.succeed((origmsg, recipient))
+
+ from_address = validate_address(self._from_address)
+ username, domain = from_address.split('@')
+ to_address = validate_address(recipient.dest.addrstr)
+
+ def maybe_encrypt_and_sign(message):
+ d = self._encrypt_and_sign(
+ message, to_address, from_address,
+ fetch_remote=fetch_remote)
+ d.addCallbacks(signal_encrypt_sign,
+ if_key_not_found_send_unencrypted,
+ errbackArgs=(message,))
+ return d
+
+ def signal_encrypt_sign(newmsg):
+ emit_async(catalog.SMTP_END_ENCRYPT_AND_SIGN,
+ self._from_address,
+ "%s,%s" % (self._from_address, to_address))
+ return newmsg, recipient
+
+ def if_key_not_found_send_unencrypted(failure, message):
+ failure.trap(KeyNotFound, KeyAddressMismatch)
+
+ log.msg('Will send unencrypted message to %s.' % to_address)
+ emit_async(catalog.SMTP_START_SIGN, self._from_address, to_address)
+ d = self._sign(message, from_address)
+ d.addCallback(signal_sign)
+ return d
+
+ def signal_sign(newmsg):
+ emit_async(catalog.SMTP_END_SIGN, self._from_address)
+ return newmsg, recipient
+
+ log.msg("Will encrypt the message with %s and sign with %s."
+ % (to_address, from_address))
+ emit_async(catalog.SMTP_START_ENCRYPT_AND_SIGN,
+ self._from_address,
+ "%s,%s" % (self._from_address, to_address))
+ d = self._maybe_attach_key(origmsg, from_address, to_address)
+ d.addCallback(maybe_encrypt_and_sign)
+ return d
+
+ def _maybe_attach_key(self, origmsg, from_address, to_address):
+ filename = "%s-email-key.asc" % (from_address,)
+
+ def attach_if_address_hasnt_encrypted(to_key):
+ # if the sign_used flag is true that means that we got an encrypted
+ # email from this address, because we conly check signatures on
+ # encrypted emails. In this case we don't attach.
+ # XXX: this might not be true some time in the future
+ if to_key.sign_used:
+ return origmsg
+ return get_key_and_attach(None)
+
+ def get_key_and_attach(_):
+ d = self._keymanager.get_key(from_address, fetch_remote=False)
+ d.addCallback(attach_key)
+ return d
+
+ def attach_key(from_key):
+ msg = origmsg
+ if not origmsg.is_multipart():
+ msg = MIMEMultipart()
+ for h, v in origmsg.items():
+ msg.add_header(h, v)
+ msg.attach(MIMEText(origmsg.get_payload()))
+
+ keymsg = MIMEApplication(from_key.key_data, _subtype='pgp-keys',
+ _encoder=lambda x: x)
+ keymsg.add_header('content-disposition', 'attachment',
+ filename=filename)
+ msg.attach(keymsg)
+ return msg
+
+ d = self._keymanager.get_key(to_address, fetch_remote=False)
+ d.addCallbacks(attach_if_address_hasnt_encrypted, get_key_and_attach)
+ d.addErrback(lambda _: origmsg)
+ return d
+
+ def _encrypt_and_sign(self, origmsg, encrypt_address, sign_address,
+ fetch_remote=True):
+ """
+ Create an RFC 3156 compliang PGP encrypted and signed message using
+ C{encrypt_address} to encrypt and C{sign_address} to sign.
+
+ :param origmsg: The original message
+ :type origmsg: email.message.Message
+ :param encrypt_address: The address used to encrypt the message.
+ :type encrypt_address: str
+ :param sign_address: The address used to sign the message.
+ :type sign_address: str
+
+ :return: A Deferred with the MultipartEncrypted message
+ :rtype: Deferred
+ """
+ # create new multipart/encrypted message with 'pgp-encrypted' protocol
+
+ def encrypt(res):
+ newmsg, origmsg = res
+ d = self._keymanager.encrypt(
+ origmsg.as_string(unixfrom=False),
+ encrypt_address, sign=sign_address,
+ fetch_remote=fetch_remote)
+ d.addCallback(lambda encstr: (newmsg, encstr))
+ return d
+
+ def create_encrypted_message(res):
+ newmsg, encstr = res
+ encmsg = MIMEApplication(
+ encstr, _subtype='octet-stream', _encoder=lambda x: x)
+ encmsg.add_header('content-disposition', 'attachment',
+ filename='msg.asc')
+ # create meta message
+ metamsg = PGPEncrypted()
+ metamsg.add_header('Content-Disposition', 'attachment')
+ # attach pgp message parts to new message
+ newmsg.attach(metamsg)
+ newmsg.attach(encmsg)
+ return newmsg
+
+ d = self._fix_headers(
+ origmsg,
+ MultipartEncrypted('application/pgp-encrypted'),
+ sign_address)
+ d.addCallback(encrypt)
+ d.addCallback(create_encrypted_message)
+ return d
+
+ def _sign(self, origmsg, sign_address):
+ """
+ Create an RFC 3156 compliant PGP signed MIME message using
+ C{sign_address}.
+
+ :param origmsg: The original message
+ :type origmsg: email.message.Message
+ :param sign_address: The address used to sign the message.
+ :type sign_address: str
+
+ :return: A Deferred with the MultipartSigned message.
+ :rtype: Deferred
+ """
+ # apply base64 content-transfer-encoding
+ encode_base64_rec(origmsg)
+ # get message text with headers and replace \n for \r\n
+ fp = StringIO()
+ g = RFC3156CompliantGenerator(
+ fp, mangle_from_=False, maxheaderlen=76)
+ g.flatten(origmsg)
+ msgtext = re.sub('\r?\n', '\r\n', fp.getvalue())
+ # make sure signed message ends with \r\n as per OpenPGP stantard.
+ if origmsg.is_multipart():
+ if not msgtext.endswith("\r\n"):
+ msgtext += "\r\n"
+
+ def create_signed_message(res):
+ (msg, _), signature = res
+ sigmsg = PGPSignature(signature)
+ # attach original message and signature to new message
+ msg.attach(origmsg)
+ msg.attach(sigmsg)
+ return msg
+
+ dh = self._fix_headers(
+ origmsg,
+ MultipartSigned('application/pgp-signature', 'pgp-sha512'),
+ sign_address)
+ ds = self._keymanager.sign(
+ msgtext, sign_address, digest_algo='SHA512',
+ clearsign=False, detach=True, binary=False)
+ d = defer.gatherResults([dh, ds])
+ d.addCallback(create_signed_message)
+ return d
+
+ def _fix_headers(self, msg, newmsg, sign_address):
+ """
+ Move some headers from C{origmsg} to C{newmsg}, delete unwanted
+ headers from C{origmsg} and add new headers to C{newms}.
+
+ Outgoing messages are either encrypted and signed or just signed
+ before being sent. Because of that, they are packed inside new
+ messages and some manipulation has to be made on their headers.
+
+ Allowed headers for passing through:
+
+ - From
+ - Date
+ - To
+ - Subject
+ - Reply-To
+ - References
+ - In-Reply-To
+ - Cc
+
+ Headers to be added:
+
+ - Message-ID (i.e. should not use origmsg's Message-Id)
+ - Received (this is added automatically by twisted smtp API)
+ - OpenPGP (see #4447)
+
+ Headers to be deleted:
+
+ - User-Agent
+
+ :param msg: The original message.
+ :type msg: email.message.Message
+ :param newmsg: The new message being created.
+ :type newmsg: email.message.Message
+ :param sign_address: The address used to sign C{newmsg}
+ :type sign_address: str
+
+ :return: A Deferred with a touple:
+ (new Message with the unencrypted headers,
+ original Message with headers removed)
+ :rtype: Deferred
+ """
+ origmsg = deepcopy(msg)
+ # move headers from origmsg to newmsg
+ headers = origmsg.items()
+ passthrough = [
+ 'from', 'date', 'to', 'subject', 'reply-to', 'references',
+ 'in-reply-to', 'cc'
+ ]
+ headers = filter(lambda x: x[0].lower() in passthrough, headers)
+ for hkey, hval in headers:
+ newmsg.add_header(hkey, hval)
+ del (origmsg[hkey])
+ # add a new message-id to newmsg
+ newmsg.add_header('Message-Id', smtp.messageid())
+ # delete user-agent from origmsg
+ del (origmsg['user-agent'])
+
+ def add_openpgp_header(signkey):
+ username, domain = sign_address.split('@')
+ newmsg.add_header(
+ 'OpenPGP', 'id=%s' % signkey.fingerprint,
+ url='https://%s/key/%s' % (domain, username),
+ preference='signencrypt')
+ return newmsg, origmsg
+
+ d = self._keymanager.get_key(sign_address, private=True)
+ d.addCallback(add_openpgp_header)
+ return d
diff --git a/src/leap/bitmask/mail/outgoing/tests/test_outgoing.py b/src/leap/bitmask/mail/outgoing/tests/test_outgoing.py
new file mode 100644
index 00000000..dd053c15
--- /dev/null
+++ b/src/leap/bitmask/mail/outgoing/tests/test_outgoing.py
@@ -0,0 +1,263 @@
+# -*- coding: utf-8 -*-
+# test_gateway.py
+# Copyright (C) 2013 LEAP
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see <http://www.gnu.org/licenses/>.
+
+
+"""
+SMTP gateway tests.
+"""
+import re
+from copy import deepcopy
+from StringIO import StringIO
+from email.parser import Parser
+from datetime import datetime
+from twisted.internet.defer import fail
+from twisted.mail.smtp import User
+from twisted.python import log
+
+from mock import Mock
+
+from leap.mail.rfc3156 import RFC3156CompliantGenerator
+from leap.mail.outgoing.service import OutgoingMail
+from leap.mail.testing import ADDRESS, ADDRESS_2, PUBLIC_KEY_2
+from leap.mail.testing import KeyManagerWithSoledadTestCase
+from leap.mail.testing.smtp import getSMTPFactory
+from leap.keymanager import errors
+
+
+BEGIN_PUBLIC_KEY = "-----BEGIN PGP PUBLIC KEY BLOCK-----"
+
+TEST_USER = u'anotheruser@leap.se'
+
+
+class TestOutgoingMail(KeyManagerWithSoledadTestCase):
+ EMAIL_DATA = ['HELO gateway.leap.se',
+ 'MAIL FROM: <%s>' % ADDRESS_2,
+ 'RCPT TO: <%s>' % ADDRESS,
+ 'DATA',
+ 'From: User <%s>' % ADDRESS_2,
+ 'To: Leap <%s>' % ADDRESS,
+ 'Date: ' + datetime.now().strftime('%c'),
+ 'Subject: test message',
+ '',
+ 'This is a secret message.',
+ 'Yours,',
+ 'A.',
+ '',
+ '.',
+ 'QUIT']
+
+ def setUp(self):
+ self.lines = [line for line in self.EMAIL_DATA[4:12]]
+ self.lines.append('') # add a trailing newline
+ self.raw = '\r\n'.join(self.lines)
+ self.expected_body = '\r\n'.join(self.EMAIL_DATA[9:12]) + "\r\n"
+ self.fromAddr = ADDRESS_2
+
+ class opts:
+ cert = u'/tmp/cert'
+ key = u'/tmp/cert'
+ hostname = 'remote'
+ port = 666
+ self.opts = opts
+
+ def init_outgoing_and_proto(_):
+ self.outgoing_mail = OutgoingMail(
+ self.fromAddr, self.km, opts.cert,
+ opts.key, opts.hostname, opts.port)
+
+ user = TEST_USER
+
+ # TODO -- this shouldn't need SMTP to be tested!? or does it?
+ self.proto = getSMTPFactory(
+ {user: None}, {user: self.km}, {user: None})
+ self.dest = User(ADDRESS, 'gateway.leap.se', self.proto, ADDRESS_2)
+
+ d = KeyManagerWithSoledadTestCase.setUp(self)
+ d.addCallback(init_outgoing_and_proto)
+ return d
+
+ def test_message_encrypt(self):
+ """
+ Test if message gets encrypted to destination email.
+ """
+ def check_decryption(res):
+ decrypted, _ = res
+ self.assertEqual(
+ '\n' + self.expected_body,
+ decrypted,
+ 'Decrypted text differs from plaintext.')
+
+ d = self._set_sign_used(ADDRESS)
+ d.addCallback(
+ lambda _:
+ self.outgoing_mail._maybe_encrypt_and_sign(self.raw, self.dest))
+ d.addCallback(self._assert_encrypted)
+ d.addCallback(lambda message: self.km.decrypt(
+ message.get_payload(1).get_payload(), ADDRESS))
+ d.addCallback(check_decryption)
+ return d
+
+ def test_message_encrypt_sign(self):
+ """
+ Test if message gets encrypted to destination email and signed with
+ sender key.
+ '"""
+ def check_decryption_and_verify(res):
+ decrypted, signkey = res
+ self.assertEqual(
+ '\n' + self.expected_body,
+ decrypted,
+ 'Decrypted text differs from plaintext.')
+ self.assertTrue(ADDRESS_2 in signkey.address,
+ "Verification failed")
+
+ d = self._set_sign_used(ADDRESS)
+ d.addCallback(
+ lambda _:
+ self.outgoing_mail._maybe_encrypt_and_sign(self.raw, self.dest))
+ d.addCallback(self._assert_encrypted)
+ d.addCallback(lambda message: self.km.decrypt(
+ message.get_payload(1).get_payload(), ADDRESS, verify=ADDRESS_2))
+ d.addCallback(check_decryption_and_verify)
+ return d
+
+ def test_message_sign(self):
+ """
+ Test if message is signed with sender key.
+ """
+ # mock the key fetching
+ self.km._fetch_keys_from_server = Mock(
+ return_value=fail(errors.KeyNotFound()))
+ recipient = User('ihavenopubkey@nonleap.se',
+ 'gateway.leap.se', self.proto, ADDRESS)
+ self.outgoing_mail = OutgoingMail(
+ self.fromAddr, self.km, self.opts.cert, self.opts.key,
+ self.opts.hostname, self.opts.port)
+
+ def check_signed(res):
+ message, _ = res
+ self.assertTrue('Content-Type' in message)
+ self.assertEqual('multipart/signed', message.get_content_type())
+ self.assertEqual('application/pgp-signature',
+ message.get_param('protocol'))
+ self.assertEqual('pgp-sha512', message.get_param('micalg'))
+ # assert content of message
+ body = (message.get_payload(0)
+ .get_payload(0)
+ .get_payload(decode=True))
+ self.assertEqual(self.expected_body,
+ body)
+ # assert content of signature
+ self.assertTrue(
+ message.get_payload(1).get_payload().startswith(
+ '-----BEGIN PGP SIGNATURE-----\n'),
+ 'Message does not start with signature header.')
+ self.assertTrue(
+ message.get_payload(1).get_payload().endswith(
+ '-----END PGP SIGNATURE-----\n'),
+ 'Message does not end with signature footer.')
+ return message
+
+ def verify(message):
+ # replace EOL before verifying (according to rfc3156)
+ fp = StringIO()
+ g = RFC3156CompliantGenerator(
+ fp, mangle_from_=False, maxheaderlen=76)
+ g.flatten(message.get_payload(0))
+ signed_text = re.sub('\r?\n', '\r\n',
+ fp.getvalue())
+
+ def assert_verify(key):
+ self.assertTrue(ADDRESS_2 in key.address,
+ 'Signature could not be verified.')
+
+ d = self.km.verify(
+ signed_text, ADDRESS_2,
+ detached_sig=message.get_payload(1).get_payload())
+ d.addCallback(assert_verify)
+ return d
+
+ d = self.outgoing_mail._maybe_encrypt_and_sign(self.raw, recipient)
+ d.addCallback(check_signed)
+ d.addCallback(verify)
+ return d
+
+ def test_attach_key(self):
+ d = self.outgoing_mail._maybe_encrypt_and_sign(self.raw, self.dest)
+ d.addCallback(self._assert_encrypted)
+ d.addCallback(self._check_headers, self.lines[:4])
+ d.addCallback(lambda message: self.km.decrypt(
+ message.get_payload(1).get_payload(), ADDRESS))
+ d.addCallback(lambda (decrypted, _):
+ self._check_key_attachment(Parser().parsestr(decrypted)))
+ return d
+
+ def test_attach_key_not_known(self):
+ unknown_address = "someunknownaddress@somewhere.com"
+ lines = deepcopy(self.lines)
+ lines[1] = "To: <%s>" % (unknown_address,)
+ raw = '\r\n'.join(lines)
+ dest = User(unknown_address, 'gateway.leap.se', self.proto, ADDRESS_2)
+
+ d = self.outgoing_mail._maybe_encrypt_and_sign(
+ raw, dest, fetch_remote=False)
+ d.addCallback(lambda (message, _):
+ self._check_headers(message, lines[:4]))
+ d.addCallback(self._check_key_attachment)
+ d.addErrback(log.err)
+ return d
+
+ def _check_headers(self, message, headers):
+ msgstr = message.as_string(unixfrom=False)
+ for header in headers:
+ self.assertTrue(header in msgstr,
+ "Missing header: %s" % (header,))
+ return message
+
+ def _check_key_attachment(self, message):
+ for payload in message.get_payload():
+ if payload.is_multipart():
+ return self._check_key_attachment(payload)
+ if 'application/pgp-keys' == payload.get_content_type():
+ keylines = PUBLIC_KEY_2.split('\n')
+ key = BEGIN_PUBLIC_KEY + '\n\n' + '\n'.join(keylines[4:-1])
+ self.assertTrue(key in payload.get_payload(decode=True),
+ "Key attachment don't match")
+ return
+ self.fail("No public key attachment found")
+
+ def _set_sign_used(self, address):
+ def set_sign(key):
+ key.sign_used = True
+ return self.km.put_key(key)
+
+ d = self.km.get_key(address, fetch_remote=False)
+ d.addCallback(set_sign)
+ return d
+
+ def _assert_encrypted(self, res):
+ message, _ = res
+ self.assertTrue('Content-Type' in message)
+ self.assertEqual('multipart/encrypted', message.get_content_type())
+ self.assertEqual('application/pgp-encrypted',
+ message.get_param('protocol'))
+ self.assertEqual(2, len(message.get_payload()))
+ self.assertEqual('application/pgp-encrypted',
+ message.get_payload(0).get_content_type())
+ self.assertEqual('application/octet-stream',
+ message.get_payload(1).get_content_type())
+ return message