From b2c69f5d01c5d31967453a0de6bb55ec50c5b5e2 Mon Sep 17 00:00:00 2001 From: Ruben Pollan Date: Mon, 2 Feb 2015 14:21:31 -0600 Subject: Attach key for addresses without known key This seems to fix the problem with some headers dissapearing (#6692) --- src/leap/mail/outgoing/tests/test_outgoing.py | 68 ++++++++++++++++++--------- 1 file changed, 46 insertions(+), 22 deletions(-) (limited to 'src/leap/mail/outgoing/tests/test_outgoing.py') diff --git a/src/leap/mail/outgoing/tests/test_outgoing.py b/src/leap/mail/outgoing/tests/test_outgoing.py index 0eb05c8..2376da9 100644 --- a/src/leap/mail/outgoing/tests/test_outgoing.py +++ b/src/leap/mail/outgoing/tests/test_outgoing.py @@ -21,6 +21,7 @@ SMTP gateway tests. """ import re +from StringIO import StringIO from email.parser import Parser from datetime import datetime from twisted.internet.defer import fail @@ -29,6 +30,7 @@ from twisted.mail.smtp import User from mock import Mock from leap.mail.smtp.gateway import SMTPFactory +from leap.mail.smtp.rfc3156 import RFC3156CompliantGenerator from leap.mail.outgoing.service import OutgoingMail from leap.mail.tests import ( TestCaseWithKeyManager, @@ -149,8 +151,11 @@ class TestOutgoingMail(TestCaseWithKeyManager): message.get_param('protocol')) self.assertEqual('pgp-sha512', message.get_param('micalg')) # assert content of message + body = (message.get_payload(0) + .get_payload(0) + .get_payload(decode=True)) self.assertEqual(self.expected_body, - message.get_payload(0).get_payload(decode=True)) + body) # assert content of signature self.assertTrue( message.get_payload(1).get_payload().startswith( @@ -164,8 +169,12 @@ class TestOutgoingMail(TestCaseWithKeyManager): def verify(message): # replace EOL before verifying (according to rfc3156) + fp = StringIO() + g = RFC3156CompliantGenerator( + fp, mangle_from_=False, maxheaderlen=76) + g.flatten(message.get_payload(0)) signed_text = re.sub('\r?\n', '\r\n', - message.get_payload(0).as_string()) + fp.getvalue()) def assert_verify(key): self.assertTrue(ADDRESS_2 in key.address, @@ -183,32 +192,47 @@ class TestOutgoingMail(TestCaseWithKeyManager): return d def test_attach_key(self): - def check_headers(message): - msgstr = message.as_string(unixfrom=False) - for header in self.EMAIL_DATA[4:8]: - self.assertTrue(header in msgstr, - "Missing header: %s" % (header,)) - return message - - def check_attachment((decrypted, _)): - msg = Parser().parsestr(decrypted) - for payload in msg.get_payload(): - if 'application/pgp-keys' == payload.get_content_type(): - keylines = PUBLIC_KEY_2.split('\n') - key = BEGIN_PUBLIC_KEY + '\n\n' + '\n'.join(keylines[4:-1]) - self.assertTrue(key in payload.get_payload(), - "Key attachment don't match") - return - self.fail("No public key attachment found") - d = self.outgoing_mail._maybe_encrypt_and_sign(self.raw, self.dest) d.addCallback(self._assert_encrypted) - d.addCallback(check_headers) + d.addCallback(self._check_headers, self.lines[:4]) d.addCallback(lambda message: self._km.decrypt( message.get_payload(1).get_payload(), ADDRESS, openpgp.OpenPGPKey)) - d.addCallback(check_attachment) + d.addCallback(lambda (decrypted, _): + self._check_key_attachment(Parser().parsestr(decrypted))) return d + def test_attach_key_not_known(self): + address = "someunknownaddress@somewhere.com" + lines = self.lines + lines[1] = "To: <%s>" % (address,) + raw = '\r\n'.join(lines) + dest = User(address, 'gateway.leap.se', self.proto, ADDRESS_2) + + d = self.outgoing_mail._maybe_encrypt_and_sign(raw, dest) + d.addCallback(lambda (message, _): + self._check_headers(message, lines[:4])) + d.addCallback(self._check_key_attachment) + return d + + def _check_headers(self, message, headers): + msgstr = message.as_string(unixfrom=False) + for header in headers: + self.assertTrue(header in msgstr, + "Missing header: %s" % (header,)) + return message + + def _check_key_attachment(self, message): + for payload in message.get_payload(): + if payload.is_multipart(): + return self._check_key_attachment(payload) + if 'application/pgp-keys' == payload.get_content_type(): + keylines = PUBLIC_KEY_2.split('\n') + key = BEGIN_PUBLIC_KEY + '\n\n' + '\n'.join(keylines[4:-1]) + self.assertTrue(key in payload.get_payload(decode=True), + "Key attachment don't match") + return + self.fail("No public key attachment found") + def _set_sign_used(self, address): def set_sign(key): key.sign_used = True -- cgit v1.2.3