From eee94a3dde0d8ecdfed81ef6e7ffa1f950250b72 Mon Sep 17 00:00:00 2001 From: Kali Kaneko Date: Wed, 24 Aug 2016 13:17:50 -0400 Subject: [tests] adapt and fix outgoing module tests keymanager was hanging because it was trying to fetch a nonexistent key. therefore, fetch_remote flag has to be passed along. --- mail/src/leap/mail/outgoing/service.py | 12 +++-- mail/src/leap/mail/outgoing/tests/test_outgoing.py | 58 ++++++++++++---------- 2 files changed, 41 insertions(+), 29 deletions(-) (limited to 'mail') diff --git a/mail/src/leap/mail/outgoing/service.py b/mail/src/leap/mail/outgoing/service.py index 05c3bed..7699ce7 100644 --- a/mail/src/leap/mail/outgoing/service.py +++ b/mail/src/leap/mail/outgoing/service.py @@ -230,7 +230,7 @@ class OutgoingMail(object): self._host, self._port, factory, contextFactory=SSLContextFactory(self._cert, self._key)) - def _maybe_encrypt_and_sign(self, raw, recipient): + def _maybe_encrypt_and_sign(self, raw, recipient, fetch_remote=True): """ Attempt to encrypt and sign the outgoing message. @@ -280,7 +280,9 @@ class OutgoingMail(object): to_address = validate_address(recipient.dest.addrstr) def maybe_encrypt_and_sign(message): - d = self._encrypt_and_sign(message, to_address, from_address) + d = self._encrypt_and_sign( + message, to_address, from_address, + fetch_remote=fetch_remote) d.addCallbacks(signal_encrypt_sign, if_key_not_found_send_unencrypted, errbackArgs=(message,)) @@ -351,7 +353,8 @@ class OutgoingMail(object): d.addErrback(lambda _: origmsg) return d - def _encrypt_and_sign(self, origmsg, encrypt_address, sign_address): + def _encrypt_and_sign(self, origmsg, encrypt_address, sign_address, + fetch_remote=True): """ Create an RFC 3156 compliang PGP encrypted and signed message using C{encrypt_address} to encrypt and C{sign_address} to sign. @@ -372,7 +375,8 @@ class OutgoingMail(object): newmsg, origmsg = res d = self._keymanager.encrypt( origmsg.as_string(unixfrom=False), - encrypt_address, sign=sign_address) + encrypt_address, sign=sign_address, + fetch_remote=fetch_remote) d.addCallback(lambda encstr: (newmsg, encstr)) return d diff --git a/mail/src/leap/mail/outgoing/tests/test_outgoing.py b/mail/src/leap/mail/outgoing/tests/test_outgoing.py index 12a72a7..dd053c1 100644 --- a/mail/src/leap/mail/outgoing/tests/test_outgoing.py +++ b/mail/src/leap/mail/outgoing/tests/test_outgoing.py @@ -19,22 +19,22 @@ """ SMTP gateway tests. """ - import re +from copy import deepcopy from StringIO import StringIO from email.parser import Parser from datetime import datetime from twisted.internet.defer import fail from twisted.mail.smtp import User +from twisted.python import log from mock import Mock from leap.mail.rfc3156 import RFC3156CompliantGenerator from leap.mail.outgoing.service import OutgoingMail -from leap.mail.tests import TestCaseWithKeyManager -from leap.mail.tests import ADDRESS, ADDRESS_2, PUBLIC_KEY_2 -from leap.mail.smtp.tests.test_gateway import getSMTPFactory - +from leap.mail.testing import ADDRESS, ADDRESS_2, PUBLIC_KEY_2 +from leap.mail.testing import KeyManagerWithSoledadTestCase +from leap.mail.testing.smtp import getSMTPFactory from leap.keymanager import errors @@ -43,7 +43,7 @@ BEGIN_PUBLIC_KEY = "-----BEGIN PGP PUBLIC KEY BLOCK-----" TEST_USER = u'anotheruser@leap.se' -class TestOutgoingMail(TestCaseWithKeyManager): +class TestOutgoingMail(KeyManagerWithSoledadTestCase): EMAIL_DATA = ['HELO gateway.leap.se', 'MAIL FROM: <%s>' % ADDRESS_2, 'RCPT TO: <%s>' % ADDRESS, @@ -67,20 +67,26 @@ class TestOutgoingMail(TestCaseWithKeyManager): self.expected_body = '\r\n'.join(self.EMAIL_DATA[9:12]) + "\r\n" self.fromAddr = ADDRESS_2 + class opts: + cert = u'/tmp/cert' + key = u'/tmp/cert' + hostname = 'remote' + port = 666 + self.opts = opts + def init_outgoing_and_proto(_): self.outgoing_mail = OutgoingMail( - self.fromAddr, self._km, self._config['cert'], - self._config['key'], self._config['host'], - self._config['port']) + self.fromAddr, self.km, opts.cert, + opts.key, opts.hostname, opts.port) user = TEST_USER # TODO -- this shouldn't need SMTP to be tested!? or does it? self.proto = getSMTPFactory( - {user: None}, {user: self._km}, {user: None}) + {user: None}, {user: self.km}, {user: None}) self.dest = User(ADDRESS, 'gateway.leap.se', self.proto, ADDRESS_2) - d = TestCaseWithKeyManager.setUp(self) + d = KeyManagerWithSoledadTestCase.setUp(self) d.addCallback(init_outgoing_and_proto) return d @@ -100,7 +106,7 @@ class TestOutgoingMail(TestCaseWithKeyManager): lambda _: self.outgoing_mail._maybe_encrypt_and_sign(self.raw, self.dest)) d.addCallback(self._assert_encrypted) - d.addCallback(lambda message: self._km.decrypt( + d.addCallback(lambda message: self.km.decrypt( message.get_payload(1).get_payload(), ADDRESS)) d.addCallback(check_decryption) return d @@ -124,7 +130,7 @@ class TestOutgoingMail(TestCaseWithKeyManager): lambda _: self.outgoing_mail._maybe_encrypt_and_sign(self.raw, self.dest)) d.addCallback(self._assert_encrypted) - d.addCallback(lambda message: self._km.decrypt( + d.addCallback(lambda message: self.km.decrypt( message.get_payload(1).get_payload(), ADDRESS, verify=ADDRESS_2)) d.addCallback(check_decryption_and_verify) return d @@ -134,13 +140,13 @@ class TestOutgoingMail(TestCaseWithKeyManager): Test if message is signed with sender key. """ # mock the key fetching - self._km._fetch_keys_from_server = Mock( + self.km._fetch_keys_from_server = Mock( return_value=fail(errors.KeyNotFound())) recipient = User('ihavenopubkey@nonleap.se', 'gateway.leap.se', self.proto, ADDRESS) self.outgoing_mail = OutgoingMail( - self.fromAddr, self._km, self._config['cert'], self._config['key'], - self._config['host'], self._config['port']) + self.fromAddr, self.km, self.opts.cert, self.opts.key, + self.opts.hostname, self.opts.port) def check_signed(res): message, _ = res @@ -179,7 +185,7 @@ class TestOutgoingMail(TestCaseWithKeyManager): self.assertTrue(ADDRESS_2 in key.address, 'Signature could not be verified.') - d = self._km.verify( + d = self.km.verify( signed_text, ADDRESS_2, detached_sig=message.get_payload(1).get_payload()) d.addCallback(assert_verify) @@ -194,23 +200,25 @@ class TestOutgoingMail(TestCaseWithKeyManager): d = self.outgoing_mail._maybe_encrypt_and_sign(self.raw, self.dest) d.addCallback(self._assert_encrypted) d.addCallback(self._check_headers, self.lines[:4]) - d.addCallback(lambda message: self._km.decrypt( + d.addCallback(lambda message: self.km.decrypt( message.get_payload(1).get_payload(), ADDRESS)) d.addCallback(lambda (decrypted, _): self._check_key_attachment(Parser().parsestr(decrypted))) return d def test_attach_key_not_known(self): - address = "someunknownaddress@somewhere.com" - lines = self.lines - lines[1] = "To: <%s>" % (address,) + unknown_address = "someunknownaddress@somewhere.com" + lines = deepcopy(self.lines) + lines[1] = "To: <%s>" % (unknown_address,) raw = '\r\n'.join(lines) - dest = User(address, 'gateway.leap.se', self.proto, ADDRESS_2) + dest = User(unknown_address, 'gateway.leap.se', self.proto, ADDRESS_2) - d = self.outgoing_mail._maybe_encrypt_and_sign(raw, dest) + d = self.outgoing_mail._maybe_encrypt_and_sign( + raw, dest, fetch_remote=False) d.addCallback(lambda (message, _): self._check_headers(message, lines[:4])) d.addCallback(self._check_key_attachment) + d.addErrback(log.err) return d def _check_headers(self, message, headers): @@ -235,9 +243,9 @@ class TestOutgoingMail(TestCaseWithKeyManager): def _set_sign_used(self, address): def set_sign(key): key.sign_used = True - return self._km.put_key(key) + return self.km.put_key(key) - d = self._km.get_key(address, fetch_remote=False) + d = self.km.get_key(address, fetch_remote=False) d.addCallback(set_sign) return d -- cgit v1.2.3