diff options
Diffstat (limited to 'tests/integration/mail/outgoing/test_outgoing.py')
-rw-r--r-- | tests/integration/mail/outgoing/test_outgoing.py | 263 |
1 files changed, 263 insertions, 0 deletions
diff --git a/tests/integration/mail/outgoing/test_outgoing.py b/tests/integration/mail/outgoing/test_outgoing.py new file mode 100644 index 0000000..1684a54 --- /dev/null +++ b/tests/integration/mail/outgoing/test_outgoing.py @@ -0,0 +1,263 @@ +# -*- coding: utf-8 -*- +# test_gateway.py +# Copyright (C) 2013 LEAP +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see <http://www.gnu.org/licenses/>. + + +""" +SMTP gateway tests. +""" +import re +from copy import deepcopy +from StringIO import StringIO +from email.parser import Parser +from datetime import datetime +from twisted.internet.defer import fail +from twisted.mail.smtp import User +from twisted.python import log + +from mock import Mock + +from leap.bitmask.mail.rfc3156 import RFC3156CompliantGenerator +from leap.bitmask.mail.outgoing.service import OutgoingMail +from leap.bitmask.mail.testing import ADDRESS, ADDRESS_2, PUBLIC_KEY_2 +from leap.bitmask.mail.testing import KeyManagerWithSoledadTestCase +from leap.bitmask.mail.testing.smtp import getSMTPFactory +from leap.bitmask.keymanager import errors + + +BEGIN_PUBLIC_KEY = "-----BEGIN PGP PUBLIC KEY BLOCK-----" + +TEST_USER = u'anotheruser@leap.se' + + +class TestOutgoingMail(KeyManagerWithSoledadTestCase): + EMAIL_DATA = ['HELO gateway.leap.se', + 'MAIL FROM: <%s>' % ADDRESS_2, + 'RCPT TO: <%s>' % ADDRESS, + 'DATA', + 'From: User <%s>' % ADDRESS_2, + 'To: Leap <%s>' % ADDRESS, + 'Date: ' + datetime.now().strftime('%c'), + 'Subject: test message', + '', + 'This is a secret message.', + 'Yours,', + 'A.', + '', + '.', + 'QUIT'] + + def setUp(self): + self.lines = [line for line in self.EMAIL_DATA[4:12]] + self.lines.append('') # add a trailing newline + self.raw = '\r\n'.join(self.lines) + self.expected_body = '\r\n'.join(self.EMAIL_DATA[9:12]) + "\r\n" + self.fromAddr = ADDRESS_2 + + class opts: + cert = u'/tmp/cert' + key = u'/tmp/cert' + hostname = 'remote' + port = 666 + self.opts = opts + + def init_outgoing_and_proto(_): + self.outgoing_mail = OutgoingMail( + self.fromAddr, self.km, opts.cert, + opts.key, opts.hostname, opts.port) + + user = TEST_USER + + # TODO -- this shouldn't need SMTP to be tested!? or does it? + self.proto = getSMTPFactory( + {user: None}, {user: self.km}, {user: None}) + self.dest = User(ADDRESS, 'gateway.leap.se', self.proto, ADDRESS_2) + + d = KeyManagerWithSoledadTestCase.setUp(self) + d.addCallback(init_outgoing_and_proto) + return d + + def test_message_encrypt(self): + """ + Test if message gets encrypted to destination email. + """ + def check_decryption(res): + decrypted, _ = res + self.assertEqual( + '\n' + self.expected_body, + decrypted, + 'Decrypted text differs from plaintext.') + + d = self._set_sign_used(ADDRESS) + d.addCallback( + lambda _: + self.outgoing_mail._maybe_encrypt_and_sign(self.raw, self.dest)) + d.addCallback(self._assert_encrypted) + d.addCallback(lambda message: self.km.decrypt( + message.get_payload(1).get_payload(), ADDRESS)) + d.addCallback(check_decryption) + return d + + def test_message_encrypt_sign(self): + """ + Test if message gets encrypted to destination email and signed with + sender key. + '""" + def check_decryption_and_verify(res): + decrypted, signkey = res + self.assertEqual( + '\n' + self.expected_body, + decrypted, + 'Decrypted text differs from plaintext.') + self.assertTrue(ADDRESS_2 in signkey.address, + "Verification failed") + + d = self._set_sign_used(ADDRESS) + d.addCallback( + lambda _: + self.outgoing_mail._maybe_encrypt_and_sign(self.raw, self.dest)) + d.addCallback(self._assert_encrypted) + d.addCallback(lambda message: self.km.decrypt( + message.get_payload(1).get_payload(), ADDRESS, verify=ADDRESS_2)) + d.addCallback(check_decryption_and_verify) + return d + + def test_message_sign(self): + """ + Test if message is signed with sender key. + """ + # mock the key fetching + self.km._fetch_keys_from_server = Mock( + return_value=fail(errors.KeyNotFound())) + recipient = User('ihavenopubkey@nonleap.se', + 'gateway.leap.se', self.proto, ADDRESS) + self.outgoing_mail = OutgoingMail( + self.fromAddr, self.km, self.opts.cert, self.opts.key, + self.opts.hostname, self.opts.port) + + def check_signed(res): + message, _ = res + self.assertTrue('Content-Type' in message) + self.assertEqual('multipart/signed', message.get_content_type()) + self.assertEqual('application/pgp-signature', + message.get_param('protocol')) + self.assertEqual('pgp-sha512', message.get_param('micalg')) + # assert content of message + body = (message.get_payload(0) + .get_payload(0) + .get_payload(decode=True)) + self.assertEqual(self.expected_body, + body) + # assert content of signature + self.assertTrue( + message.get_payload(1).get_payload().startswith( + '-----BEGIN PGP SIGNATURE-----\n'), + 'Message does not start with signature header.') + self.assertTrue( + message.get_payload(1).get_payload().endswith( + '-----END PGP SIGNATURE-----\n'), + 'Message does not end with signature footer.') + return message + + def verify(message): + # replace EOL before verifying (according to rfc3156) + fp = StringIO() + g = RFC3156CompliantGenerator( + fp, mangle_from_=False, maxheaderlen=76) + g.flatten(message.get_payload(0)) + signed_text = re.sub('\r?\n', '\r\n', + fp.getvalue()) + + def assert_verify(key): + self.assertTrue(ADDRESS_2 in key.address, + 'Signature could not be verified.') + + d = self.km.verify( + signed_text, ADDRESS_2, + detached_sig=message.get_payload(1).get_payload()) + d.addCallback(assert_verify) + return d + + d = self.outgoing_mail._maybe_encrypt_and_sign(self.raw, recipient) + d.addCallback(check_signed) + d.addCallback(verify) + return d + + def test_attach_key(self): + d = self.outgoing_mail._maybe_encrypt_and_sign(self.raw, self.dest) + d.addCallback(self._assert_encrypted) + d.addCallback(self._check_headers, self.lines[:4]) + d.addCallback(lambda message: self.km.decrypt( + message.get_payload(1).get_payload(), ADDRESS)) + d.addCallback(lambda (decrypted, _): + self._check_key_attachment(Parser().parsestr(decrypted))) + return d + + def test_attach_key_not_known(self): + unknown_address = "someunknownaddress@somewhere.com" + lines = deepcopy(self.lines) + lines[1] = "To: <%s>" % (unknown_address,) + raw = '\r\n'.join(lines) + dest = User(unknown_address, 'gateway.leap.se', self.proto, ADDRESS_2) + + d = self.outgoing_mail._maybe_encrypt_and_sign( + raw, dest, fetch_remote=False) + d.addCallback(lambda (message, _): + self._check_headers(message, lines[:4])) + d.addCallback(self._check_key_attachment) + d.addErrback(log.err) + return d + + def _check_headers(self, message, headers): + msgstr = message.as_string(unixfrom=False) + for header in headers: + self.assertTrue(header in msgstr, + "Missing header: %s" % (header,)) + return message + + def _check_key_attachment(self, message): + for payload in message.get_payload(): + if payload.is_multipart(): + return self._check_key_attachment(payload) + if 'application/pgp-keys' == payload.get_content_type(): + keylines = PUBLIC_KEY_2.split('\n') + key = BEGIN_PUBLIC_KEY + '\n\n' + '\n'.join(keylines[4:-1]) + self.assertTrue(key in payload.get_payload(decode=True), + "Key attachment don't match") + return + self.fail("No public key attachment found") + + def _set_sign_used(self, address): + def set_sign(key): + key.sign_used = True + return self.km.put_key(key) + + d = self.km.get_key(address, fetch_remote=False) + d.addCallback(set_sign) + return d + + def _assert_encrypted(self, res): + message, _ = res + self.assertTrue('Content-Type' in message) + self.assertEqual('multipart/encrypted', message.get_content_type()) + self.assertEqual('application/pgp-encrypted', + message.get_param('protocol')) + self.assertEqual(2, len(message.get_payload())) + self.assertEqual('application/pgp-encrypted', + message.get_payload(0).get_content_type()) + self.assertEqual('application/octet-stream', + message.get_payload(1).get_content_type()) + return message |