diff options
| author | Kali Kaneko <kali@leap.se> | 2016-08-24 13:17:50 -0400 | 
|---|---|---|
| committer | Kali Kaneko <kali@leap.se> | 2016-08-25 11:13:19 -0400 | 
| commit | eee94a3dde0d8ecdfed81ef6e7ffa1f950250b72 (patch) | |
| tree | fb5e9a55bfe6476a02c6e3adde350924ae730de5 | |
| parent | 96a6de7d780a2532fa15af17f0ccaa713bb7b469 (diff) | |
[tests] adapt and fix outgoing module tests
keymanager was hanging because it was trying to fetch a nonexistent key.
therefore, fetch_remote flag has to be passed along.
| -rw-r--r-- | mail/src/leap/mail/outgoing/service.py | 12 | ||||
| -rw-r--r-- | mail/src/leap/mail/outgoing/tests/test_outgoing.py | 58 | 
2 files changed, 41 insertions, 29 deletions
| diff --git a/mail/src/leap/mail/outgoing/service.py b/mail/src/leap/mail/outgoing/service.py index 05c3bed2..7699ce79 100644 --- a/mail/src/leap/mail/outgoing/service.py +++ b/mail/src/leap/mail/outgoing/service.py @@ -230,7 +230,7 @@ class OutgoingMail(object):              self._host, self._port, factory,              contextFactory=SSLContextFactory(self._cert, self._key)) -    def _maybe_encrypt_and_sign(self, raw, recipient): +    def _maybe_encrypt_and_sign(self, raw, recipient, fetch_remote=True):          """          Attempt to encrypt and sign the outgoing message. @@ -280,7 +280,9 @@ class OutgoingMail(object):          to_address = validate_address(recipient.dest.addrstr)          def maybe_encrypt_and_sign(message): -            d = self._encrypt_and_sign(message, to_address, from_address) +            d = self._encrypt_and_sign( +                message, to_address, from_address, +                fetch_remote=fetch_remote)              d.addCallbacks(signal_encrypt_sign,                             if_key_not_found_send_unencrypted,                             errbackArgs=(message,)) @@ -351,7 +353,8 @@ class OutgoingMail(object):          d.addErrback(lambda _: origmsg)          return d -    def _encrypt_and_sign(self, origmsg, encrypt_address, sign_address): +    def _encrypt_and_sign(self, origmsg, encrypt_address, sign_address, +            fetch_remote=True):          """          Create an RFC 3156 compliang PGP encrypted and signed message using          C{encrypt_address} to encrypt and C{sign_address} to sign. @@ -372,7 +375,8 @@ class OutgoingMail(object):              newmsg, origmsg = res              d = self._keymanager.encrypt(                  origmsg.as_string(unixfrom=False), -                encrypt_address, sign=sign_address) +                encrypt_address, sign=sign_address, +                fetch_remote=fetch_remote)              d.addCallback(lambda encstr: (newmsg, encstr))              return d diff --git a/mail/src/leap/mail/outgoing/tests/test_outgoing.py b/mail/src/leap/mail/outgoing/tests/test_outgoing.py index 12a72a70..dd053c15 100644 --- a/mail/src/leap/mail/outgoing/tests/test_outgoing.py +++ b/mail/src/leap/mail/outgoing/tests/test_outgoing.py @@ -19,22 +19,22 @@  """  SMTP gateway tests.  """ -  import re +from copy import deepcopy  from StringIO import StringIO  from email.parser import Parser  from datetime import datetime  from twisted.internet.defer import fail  from twisted.mail.smtp import User +from twisted.python import log  from mock import Mock  from leap.mail.rfc3156 import RFC3156CompliantGenerator  from leap.mail.outgoing.service import OutgoingMail -from leap.mail.tests import TestCaseWithKeyManager -from leap.mail.tests import ADDRESS, ADDRESS_2, PUBLIC_KEY_2 -from leap.mail.smtp.tests.test_gateway import getSMTPFactory - +from leap.mail.testing import ADDRESS, ADDRESS_2, PUBLIC_KEY_2 +from leap.mail.testing import KeyManagerWithSoledadTestCase +from leap.mail.testing.smtp import getSMTPFactory  from leap.keymanager import errors @@ -43,7 +43,7 @@ BEGIN_PUBLIC_KEY = "-----BEGIN PGP PUBLIC KEY BLOCK-----"  TEST_USER = u'anotheruser@leap.se' -class TestOutgoingMail(TestCaseWithKeyManager): +class TestOutgoingMail(KeyManagerWithSoledadTestCase):      EMAIL_DATA = ['HELO gateway.leap.se',                    'MAIL FROM: <%s>' % ADDRESS_2,                    'RCPT TO: <%s>' % ADDRESS, @@ -67,20 +67,26 @@ class TestOutgoingMail(TestCaseWithKeyManager):          self.expected_body = '\r\n'.join(self.EMAIL_DATA[9:12]) + "\r\n"          self.fromAddr = ADDRESS_2 +        class opts: +            cert = u'/tmp/cert' +            key = u'/tmp/cert' +            hostname = 'remote' +            port = 666 +        self.opts = opts +          def init_outgoing_and_proto(_):              self.outgoing_mail = OutgoingMail( -                self.fromAddr, self._km, self._config['cert'], -                self._config['key'], self._config['host'], -                self._config['port']) +                self.fromAddr, self.km, opts.cert, +                opts.key, opts.hostname, opts.port)              user = TEST_USER              # TODO -- this shouldn't need SMTP to be tested!? or does it?              self.proto = getSMTPFactory( -                {user: None}, {user: self._km}, {user: None}) +                {user: None}, {user: self.km}, {user: None})              self.dest = User(ADDRESS, 'gateway.leap.se', self.proto, ADDRESS_2) -        d = TestCaseWithKeyManager.setUp(self) +        d = KeyManagerWithSoledadTestCase.setUp(self)          d.addCallback(init_outgoing_and_proto)          return d @@ -100,7 +106,7 @@ class TestOutgoingMail(TestCaseWithKeyManager):              lambda _:              self.outgoing_mail._maybe_encrypt_and_sign(self.raw, self.dest))          d.addCallback(self._assert_encrypted) -        d.addCallback(lambda message: self._km.decrypt( +        d.addCallback(lambda message: self.km.decrypt(              message.get_payload(1).get_payload(), ADDRESS))          d.addCallback(check_decryption)          return d @@ -124,7 +130,7 @@ class TestOutgoingMail(TestCaseWithKeyManager):              lambda _:              self.outgoing_mail._maybe_encrypt_and_sign(self.raw, self.dest))          d.addCallback(self._assert_encrypted) -        d.addCallback(lambda message: self._km.decrypt( +        d.addCallback(lambda message: self.km.decrypt(              message.get_payload(1).get_payload(), ADDRESS, verify=ADDRESS_2))          d.addCallback(check_decryption_and_verify)          return d @@ -134,13 +140,13 @@ class TestOutgoingMail(TestCaseWithKeyManager):          Test if message is signed with sender key.          """          # mock the key fetching -        self._km._fetch_keys_from_server = Mock( +        self.km._fetch_keys_from_server = Mock(              return_value=fail(errors.KeyNotFound()))          recipient = User('ihavenopubkey@nonleap.se',                           'gateway.leap.se', self.proto, ADDRESS)          self.outgoing_mail = OutgoingMail( -            self.fromAddr, self._km, self._config['cert'], self._config['key'], -            self._config['host'], self._config['port']) +            self.fromAddr, self.km, self.opts.cert, self.opts.key, +            self.opts.hostname, self.opts.port)          def check_signed(res):              message, _ = res @@ -179,7 +185,7 @@ class TestOutgoingMail(TestCaseWithKeyManager):                  self.assertTrue(ADDRESS_2 in key.address,                                  'Signature could not be verified.') -            d = self._km.verify( +            d = self.km.verify(                  signed_text, ADDRESS_2,                  detached_sig=message.get_payload(1).get_payload())              d.addCallback(assert_verify) @@ -194,23 +200,25 @@ class TestOutgoingMail(TestCaseWithKeyManager):          d = self.outgoing_mail._maybe_encrypt_and_sign(self.raw, self.dest)          d.addCallback(self._assert_encrypted)          d.addCallback(self._check_headers, self.lines[:4]) -        d.addCallback(lambda message: self._km.decrypt( +        d.addCallback(lambda message: self.km.decrypt(              message.get_payload(1).get_payload(), ADDRESS))          d.addCallback(lambda (decrypted, _):                        self._check_key_attachment(Parser().parsestr(decrypted)))          return d      def test_attach_key_not_known(self): -        address = "someunknownaddress@somewhere.com" -        lines = self.lines -        lines[1] = "To: <%s>" % (address,) +        unknown_address = "someunknownaddress@somewhere.com" +        lines = deepcopy(self.lines) +        lines[1] = "To: <%s>" % (unknown_address,)          raw = '\r\n'.join(lines) -        dest = User(address, 'gateway.leap.se', self.proto, ADDRESS_2) +        dest = User(unknown_address, 'gateway.leap.se', self.proto, ADDRESS_2) -        d = self.outgoing_mail._maybe_encrypt_and_sign(raw, dest) +        d = self.outgoing_mail._maybe_encrypt_and_sign( +                raw, dest, fetch_remote=False)          d.addCallback(lambda (message, _):                        self._check_headers(message, lines[:4]))          d.addCallback(self._check_key_attachment) +        d.addErrback(log.err)          return d      def _check_headers(self, message, headers): @@ -235,9 +243,9 @@ class TestOutgoingMail(TestCaseWithKeyManager):      def _set_sign_used(self, address):          def set_sign(key):              key.sign_used = True -            return self._km.put_key(key) +            return self.km.put_key(key) -        d = self._km.get_key(address, fetch_remote=False) +        d = self.km.get_key(address, fetch_remote=False)          d.addCallback(set_sign)          return d | 
