summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorKali Kaneko <kali@leap.se>2016-08-24 13:17:50 -0400
committerKali Kaneko <kali@leap.se>2016-08-25 11:13:19 -0400
commiteee94a3dde0d8ecdfed81ef6e7ffa1f950250b72 (patch)
treefb5e9a55bfe6476a02c6e3adde350924ae730de5
parent96a6de7d780a2532fa15af17f0ccaa713bb7b469 (diff)
[tests] adapt and fix outgoing module tests
keymanager was hanging because it was trying to fetch a nonexistent key. therefore, fetch_remote flag has to be passed along.
-rw-r--r--mail/src/leap/mail/outgoing/service.py12
-rw-r--r--mail/src/leap/mail/outgoing/tests/test_outgoing.py58
2 files changed, 41 insertions, 29 deletions
diff --git a/mail/src/leap/mail/outgoing/service.py b/mail/src/leap/mail/outgoing/service.py
index 05c3bed2..7699ce79 100644
--- a/mail/src/leap/mail/outgoing/service.py
+++ b/mail/src/leap/mail/outgoing/service.py
@@ -230,7 +230,7 @@ class OutgoingMail(object):
self._host, self._port, factory,
contextFactory=SSLContextFactory(self._cert, self._key))
- def _maybe_encrypt_and_sign(self, raw, recipient):
+ def _maybe_encrypt_and_sign(self, raw, recipient, fetch_remote=True):
"""
Attempt to encrypt and sign the outgoing message.
@@ -280,7 +280,9 @@ class OutgoingMail(object):
to_address = validate_address(recipient.dest.addrstr)
def maybe_encrypt_and_sign(message):
- d = self._encrypt_and_sign(message, to_address, from_address)
+ d = self._encrypt_and_sign(
+ message, to_address, from_address,
+ fetch_remote=fetch_remote)
d.addCallbacks(signal_encrypt_sign,
if_key_not_found_send_unencrypted,
errbackArgs=(message,))
@@ -351,7 +353,8 @@ class OutgoingMail(object):
d.addErrback(lambda _: origmsg)
return d
- def _encrypt_and_sign(self, origmsg, encrypt_address, sign_address):
+ def _encrypt_and_sign(self, origmsg, encrypt_address, sign_address,
+ fetch_remote=True):
"""
Create an RFC 3156 compliang PGP encrypted and signed message using
C{encrypt_address} to encrypt and C{sign_address} to sign.
@@ -372,7 +375,8 @@ class OutgoingMail(object):
newmsg, origmsg = res
d = self._keymanager.encrypt(
origmsg.as_string(unixfrom=False),
- encrypt_address, sign=sign_address)
+ encrypt_address, sign=sign_address,
+ fetch_remote=fetch_remote)
d.addCallback(lambda encstr: (newmsg, encstr))
return d
diff --git a/mail/src/leap/mail/outgoing/tests/test_outgoing.py b/mail/src/leap/mail/outgoing/tests/test_outgoing.py
index 12a72a70..dd053c15 100644
--- a/mail/src/leap/mail/outgoing/tests/test_outgoing.py
+++ b/mail/src/leap/mail/outgoing/tests/test_outgoing.py
@@ -19,22 +19,22 @@
"""
SMTP gateway tests.
"""
-
import re
+from copy import deepcopy
from StringIO import StringIO
from email.parser import Parser
from datetime import datetime
from twisted.internet.defer import fail
from twisted.mail.smtp import User
+from twisted.python import log
from mock import Mock
from leap.mail.rfc3156 import RFC3156CompliantGenerator
from leap.mail.outgoing.service import OutgoingMail
-from leap.mail.tests import TestCaseWithKeyManager
-from leap.mail.tests import ADDRESS, ADDRESS_2, PUBLIC_KEY_2
-from leap.mail.smtp.tests.test_gateway import getSMTPFactory
-
+from leap.mail.testing import ADDRESS, ADDRESS_2, PUBLIC_KEY_2
+from leap.mail.testing import KeyManagerWithSoledadTestCase
+from leap.mail.testing.smtp import getSMTPFactory
from leap.keymanager import errors
@@ -43,7 +43,7 @@ BEGIN_PUBLIC_KEY = "-----BEGIN PGP PUBLIC KEY BLOCK-----"
TEST_USER = u'anotheruser@leap.se'
-class TestOutgoingMail(TestCaseWithKeyManager):
+class TestOutgoingMail(KeyManagerWithSoledadTestCase):
EMAIL_DATA = ['HELO gateway.leap.se',
'MAIL FROM: <%s>' % ADDRESS_2,
'RCPT TO: <%s>' % ADDRESS,
@@ -67,20 +67,26 @@ class TestOutgoingMail(TestCaseWithKeyManager):
self.expected_body = '\r\n'.join(self.EMAIL_DATA[9:12]) + "\r\n"
self.fromAddr = ADDRESS_2
+ class opts:
+ cert = u'/tmp/cert'
+ key = u'/tmp/cert'
+ hostname = 'remote'
+ port = 666
+ self.opts = opts
+
def init_outgoing_and_proto(_):
self.outgoing_mail = OutgoingMail(
- self.fromAddr, self._km, self._config['cert'],
- self._config['key'], self._config['host'],
- self._config['port'])
+ self.fromAddr, self.km, opts.cert,
+ opts.key, opts.hostname, opts.port)
user = TEST_USER
# TODO -- this shouldn't need SMTP to be tested!? or does it?
self.proto = getSMTPFactory(
- {user: None}, {user: self._km}, {user: None})
+ {user: None}, {user: self.km}, {user: None})
self.dest = User(ADDRESS, 'gateway.leap.se', self.proto, ADDRESS_2)
- d = TestCaseWithKeyManager.setUp(self)
+ d = KeyManagerWithSoledadTestCase.setUp(self)
d.addCallback(init_outgoing_and_proto)
return d
@@ -100,7 +106,7 @@ class TestOutgoingMail(TestCaseWithKeyManager):
lambda _:
self.outgoing_mail._maybe_encrypt_and_sign(self.raw, self.dest))
d.addCallback(self._assert_encrypted)
- d.addCallback(lambda message: self._km.decrypt(
+ d.addCallback(lambda message: self.km.decrypt(
message.get_payload(1).get_payload(), ADDRESS))
d.addCallback(check_decryption)
return d
@@ -124,7 +130,7 @@ class TestOutgoingMail(TestCaseWithKeyManager):
lambda _:
self.outgoing_mail._maybe_encrypt_and_sign(self.raw, self.dest))
d.addCallback(self._assert_encrypted)
- d.addCallback(lambda message: self._km.decrypt(
+ d.addCallback(lambda message: self.km.decrypt(
message.get_payload(1).get_payload(), ADDRESS, verify=ADDRESS_2))
d.addCallback(check_decryption_and_verify)
return d
@@ -134,13 +140,13 @@ class TestOutgoingMail(TestCaseWithKeyManager):
Test if message is signed with sender key.
"""
# mock the key fetching
- self._km._fetch_keys_from_server = Mock(
+ self.km._fetch_keys_from_server = Mock(
return_value=fail(errors.KeyNotFound()))
recipient = User('ihavenopubkey@nonleap.se',
'gateway.leap.se', self.proto, ADDRESS)
self.outgoing_mail = OutgoingMail(
- self.fromAddr, self._km, self._config['cert'], self._config['key'],
- self._config['host'], self._config['port'])
+ self.fromAddr, self.km, self.opts.cert, self.opts.key,
+ self.opts.hostname, self.opts.port)
def check_signed(res):
message, _ = res
@@ -179,7 +185,7 @@ class TestOutgoingMail(TestCaseWithKeyManager):
self.assertTrue(ADDRESS_2 in key.address,
'Signature could not be verified.')
- d = self._km.verify(
+ d = self.km.verify(
signed_text, ADDRESS_2,
detached_sig=message.get_payload(1).get_payload())
d.addCallback(assert_verify)
@@ -194,23 +200,25 @@ class TestOutgoingMail(TestCaseWithKeyManager):
d = self.outgoing_mail._maybe_encrypt_and_sign(self.raw, self.dest)
d.addCallback(self._assert_encrypted)
d.addCallback(self._check_headers, self.lines[:4])
- d.addCallback(lambda message: self._km.decrypt(
+ d.addCallback(lambda message: self.km.decrypt(
message.get_payload(1).get_payload(), ADDRESS))
d.addCallback(lambda (decrypted, _):
self._check_key_attachment(Parser().parsestr(decrypted)))
return d
def test_attach_key_not_known(self):
- address = "someunknownaddress@somewhere.com"
- lines = self.lines
- lines[1] = "To: <%s>" % (address,)
+ unknown_address = "someunknownaddress@somewhere.com"
+ lines = deepcopy(self.lines)
+ lines[1] = "To: <%s>" % (unknown_address,)
raw = '\r\n'.join(lines)
- dest = User(address, 'gateway.leap.se', self.proto, ADDRESS_2)
+ dest = User(unknown_address, 'gateway.leap.se', self.proto, ADDRESS_2)
- d = self.outgoing_mail._maybe_encrypt_and_sign(raw, dest)
+ d = self.outgoing_mail._maybe_encrypt_and_sign(
+ raw, dest, fetch_remote=False)
d.addCallback(lambda (message, _):
self._check_headers(message, lines[:4]))
d.addCallback(self._check_key_attachment)
+ d.addErrback(log.err)
return d
def _check_headers(self, message, headers):
@@ -235,9 +243,9 @@ class TestOutgoingMail(TestCaseWithKeyManager):
def _set_sign_used(self, address):
def set_sign(key):
key.sign_used = True
- return self._km.put_key(key)
+ return self.km.put_key(key)
- d = self._km.get_key(address, fetch_remote=False)
+ d = self.km.get_key(address, fetch_remote=False)
d.addCallback(set_sign)
return d