summaryrefslogtreecommitdiff
path: root/src/leap/mail/smtp/tests/test_gateway.py
diff options
context:
space:
mode:
Diffstat (limited to 'src/leap/mail/smtp/tests/test_gateway.py')
-rw-r--r--src/leap/mail/smtp/tests/test_gateway.py263
1 files changed, 63 insertions, 200 deletions
diff --git a/src/leap/mail/smtp/tests/test_gateway.py b/src/leap/mail/smtp/tests/test_gateway.py
index 466677f..0b9a364 100644
--- a/src/leap/mail/smtp/tests/test_gateway.py
+++ b/src/leap/mail/smtp/tests/test_gateway.py
@@ -20,24 +20,24 @@
SMTP gateway tests.
"""
-
import re
from datetime import datetime
+
+from twisted.internet import reactor
+from twisted.internet.defer import inlineCallbacks, fail, succeed, Deferred
from twisted.test import proto_helpers
-from twisted.mail.smtp import User, Address
from mock import Mock
-
from leap.mail.smtp.gateway import (
- SMTPFactory,
- EncryptedMessage,
+ SMTPFactory
)
-from leap.mail.smtp.tests import (
+from leap.mail.tests import (
TestCaseWithKeyManager,
ADDRESS,
ADDRESS_2,
)
-from leap.keymanager import openpgp
+from leap.keymanager import openpgp, errors
+
# some regexps
IP_REGEX = "(([0-9]|[1-9][0-9]|1[0-9]{2}|2[0-4][0-9]|25[0-5])\.){3}" + \
@@ -71,20 +71,7 @@ class TestSmtpGateway(TestCaseWithKeyManager):
% (string, pattern))
raise self.failureException(msg)
- def test_openpgp_encrypt_decrypt(self):
- "Test if openpgp can encrypt and decrypt."
- text = "simple raw text"
- pubkey = self._km.get_key(
- ADDRESS, openpgp.OpenPGPKey, private=False)
- encrypted = self._km.encrypt(text, pubkey)
- self.assertNotEqual(
- text, encrypted, "Ciphertext is equal to plaintext.")
- privkey = self._km.get_key(
- ADDRESS, openpgp.OpenPGPKey, private=True)
- decrypted = self._km.decrypt(encrypted, privkey)
- self.assertEqual(text, decrypted,
- "Decrypted text differs from plaintext.")
-
+ @inlineCallbacks
def test_gateway_accepts_valid_email(self):
"""
Test if SMTP server responds correctly for valid interaction.
@@ -102,223 +89,99 @@ class TestSmtpGateway(TestCaseWithKeyManager):
# method...
proto = SMTPFactory(
u'anotheruser@leap.se',
- self._km, self._config['host'],
- self._config['port'],
- self._config['cert'], self._config['key'],
- self._config['encrypted_only']).buildProtocol(('127.0.0.1', 0))
+ self._km,
+ self._config['encrypted_only'],
+ outgoing_mail=Mock()).buildProtocol(('127.0.0.1', 0))
# snip...
transport = proto_helpers.StringTransport()
proto.makeConnection(transport)
+ reply = ""
for i, line in enumerate(self.EMAIL_DATA):
- proto.lineReceived(line + '\r\n')
- self.assertMatch(transport.value(),
- '\r\n'.join(SMTP_ANSWERS[0:i + 1]),
- 'Did not get expected answer from gateway.')
+ reply += yield self.getReply(line + '\r\n', proto, transport)
+ self.assertMatch(reply, '\r\n'.join(SMTP_ANSWERS),
+ 'Did not get expected answer from gateway.')
proto.setTimeout(None)
- def test_message_encrypt(self):
- """
- Test if message gets encrypted to destination email.
- """
- proto = SMTPFactory(
- u'anotheruser@leap.se',
- self._km, self._config['host'],
- self._config['port'],
- self._config['cert'], self._config['key'],
- self._config['encrypted_only']).buildProtocol(('127.0.0.1', 0))
- fromAddr = Address(ADDRESS_2)
- dest = User(ADDRESS, 'gateway.leap.se', proto, ADDRESS)
- m = EncryptedMessage(
- fromAddr, dest, self._km, self._config['host'],
- self._config['port'], self._config['cert'], self._config['key'])
- for line in self.EMAIL_DATA[4:12]:
- m.lineReceived(line)
- # m.eomReceived() # this includes a defer, so we avoid calling it here
- m.lines.append('') # add a trailing newline
- # we need to call the following explicitelly because it was deferred
- # inside the previous method
- m._maybe_encrypt_and_sign()
- # assert structure of encrypted message
- self.assertTrue('Content-Type' in m._msg)
- self.assertEqual('multipart/encrypted', m._msg.get_content_type())
- self.assertEqual('application/pgp-encrypted',
- m._msg.get_param('protocol'))
- self.assertEqual(2, len(m._msg.get_payload()))
- self.assertEqual('application/pgp-encrypted',
- m._msg.get_payload(0).get_content_type())
- self.assertEqual('application/octet-stream',
- m._msg.get_payload(1).get_content_type())
- privkey = self._km.get_key(
- ADDRESS, openpgp.OpenPGPKey, private=True)
- decrypted = self._km.decrypt(
- m._msg.get_payload(1).get_payload(), privkey)
- self.assertEqual(
- '\n' + '\r\n'.join(self.EMAIL_DATA[9:12]) + '\r\n\r\n--\r\n' +
- 'I prefer encrypted email - https://leap.se/key/anotheruser\r\n',
- decrypted,
- 'Decrypted text differs from plaintext.')
-
- def test_message_encrypt_sign(self):
- """
- Test if message gets encrypted to destination email and signed with
- sender key.
- """
- proto = SMTPFactory(
- u'anotheruser@leap.se',
- self._km, self._config['host'],
- self._config['port'],
- self._config['cert'], self._config['key'],
- self._config['encrypted_only']).buildProtocol(('127.0.0.1', 0))
- user = User(ADDRESS, 'gateway.leap.se', proto, ADDRESS)
- fromAddr = Address(ADDRESS_2)
- m = EncryptedMessage(
- fromAddr, user, self._km, self._config['host'],
- self._config['port'], self._config['cert'], self._config['key'])
- for line in self.EMAIL_DATA[4:12]:
- m.lineReceived(line)
- # trigger encryption and signing
- # m.eomReceived() # this includes a defer, so we avoid calling it here
- m.lines.append('') # add a trailing newline
- # we need to call the following explicitelly because it was deferred
- # inside the previous method
- m._maybe_encrypt_and_sign()
- # assert structure of encrypted message
- self.assertTrue('Content-Type' in m._msg)
- self.assertEqual('multipart/encrypted', m._msg.get_content_type())
- self.assertEqual('application/pgp-encrypted',
- m._msg.get_param('protocol'))
- self.assertEqual(2, len(m._msg.get_payload()))
- self.assertEqual('application/pgp-encrypted',
- m._msg.get_payload(0).get_content_type())
- self.assertEqual('application/octet-stream',
- m._msg.get_payload(1).get_content_type())
- # decrypt and verify
- privkey = self._km.get_key(
- ADDRESS, openpgp.OpenPGPKey, private=True)
- pubkey = self._km.get_key(ADDRESS_2, openpgp.OpenPGPKey)
- decrypted = self._km.decrypt(
- m._msg.get_payload(1).get_payload(), privkey, verify=pubkey)
- self.assertEqual(
- '\n' + '\r\n'.join(self.EMAIL_DATA[9:12]) + '\r\n\r\n--\r\n' +
- 'I prefer encrypted email - https://leap.se/key/anotheruser\r\n',
- decrypted,
- 'Decrypted text differs from plaintext.')
-
- def test_message_sign(self):
- """
- Test if message is signed with sender key.
- """
- # mock the key fetching
- self._km.fetch_keys_from_server = Mock(return_value=[])
- proto = SMTPFactory(
- u'anotheruser@leap.se',
- self._km, self._config['host'],
- self._config['port'],
- self._config['cert'], self._config['key'],
- self._config['encrypted_only']).buildProtocol(('127.0.0.1', 0))
- user = User('ihavenopubkey@nonleap.se',
- 'gateway.leap.se', proto, ADDRESS)
- fromAddr = Address(ADDRESS_2)
- m = EncryptedMessage(
- fromAddr, user, self._km, self._config['host'],
- self._config['port'], self._config['cert'], self._config['key'])
- for line in self.EMAIL_DATA[4:12]:
- m.lineReceived(line)
- # trigger signing
- # m.eomReceived() # this includes a defer, so we avoid calling it here
- m.lines.append('') # add a trailing newline
- # we need to call the following explicitelly because it was deferred
- # inside the previous method
- m._maybe_encrypt_and_sign()
- # assert structure of signed message
- self.assertTrue('Content-Type' in m._msg)
- self.assertEqual('multipart/signed', m._msg.get_content_type())
- self.assertEqual('application/pgp-signature',
- m._msg.get_param('protocol'))
- self.assertEqual('pgp-sha512', m._msg.get_param('micalg'))
- # assert content of message
- self.assertEqual(
- '\r\n'.join(self.EMAIL_DATA[9:13]) + '\r\n--\r\n' +
- 'I prefer encrypted email - https://leap.se/key/anotheruser\r\n',
- m._msg.get_payload(0).get_payload(decode=True))
- # assert content of signature
- self.assertTrue(
- m._msg.get_payload(1).get_payload().startswith(
- '-----BEGIN PGP SIGNATURE-----\n'),
- 'Message does not start with signature header.')
- self.assertTrue(
- m._msg.get_payload(1).get_payload().endswith(
- '-----END PGP SIGNATURE-----\n'),
- 'Message does not end with signature footer.')
- # assert signature is valid
- pubkey = self._km.get_key(ADDRESS_2, openpgp.OpenPGPKey)
- # replace EOL before verifying (according to rfc3156)
- signed_text = re.sub('\r?\n', '\r\n',
- m._msg.get_payload(0).as_string())
- self.assertTrue(
- self._km.verify(signed_text,
- pubkey,
- detached_sig=m._msg.get_payload(1).get_payload()),
- 'Signature could not be verified.')
-
+ @inlineCallbacks
def test_missing_key_rejects_address(self):
"""
Test if server rejects to send unencrypted when 'encrypted_only' is
True.
"""
# remove key from key manager
- pubkey = self._km.get_key(ADDRESS, openpgp.OpenPGPKey)
+ pubkey = yield self._km.get_key(ADDRESS, openpgp.OpenPGPKey)
pgp = openpgp.OpenPGPScheme(
self._soledad, gpgbinary=self.GPG_BINARY_PATH)
- pgp.delete_key(pubkey)
+ yield pgp.delete_key(pubkey)
# mock the key fetching
- self._km.fetch_keys_from_server = Mock(return_value=[])
+ self._km._fetch_keys_from_server = Mock(
+ return_value=fail(errors.KeyNotFound()))
# prepare the SMTP factory
proto = SMTPFactory(
u'anotheruser@leap.se',
- self._km, self._config['host'],
- self._config['port'],
- self._config['cert'], self._config['key'],
- self._config['encrypted_only']).buildProtocol(('127.0.0.1', 0))
+ self._km,
+ self._config['encrypted_only'],
+ outgoing_mail=Mock()).buildProtocol(('127.0.0.1', 0))
transport = proto_helpers.StringTransport()
proto.makeConnection(transport)
- proto.lineReceived(self.EMAIL_DATA[0] + '\r\n')
- proto.lineReceived(self.EMAIL_DATA[1] + '\r\n')
- proto.lineReceived(self.EMAIL_DATA[2] + '\r\n')
+ yield self.getReply(self.EMAIL_DATA[0] + '\r\n', proto, transport)
+ yield self.getReply(self.EMAIL_DATA[1] + '\r\n', proto, transport)
+ reply = yield self.getReply(self.EMAIL_DATA[2] + '\r\n',
+ proto, transport)
# ensure the address was rejected
- lines = transport.value().rstrip().split('\n')
self.assertEqual(
- '550 Cannot receive for specified address',
- lines[-1],
+ '550 Cannot receive for specified address\r\n',
+ reply,
'Address should have been rejecetd with appropriate message.')
+ proto.setTimeout(None)
+ @inlineCallbacks
def test_missing_key_accepts_address(self):
"""
Test if server accepts to send unencrypted when 'encrypted_only' is
False.
"""
# remove key from key manager
- pubkey = self._km.get_key(ADDRESS, openpgp.OpenPGPKey)
+ pubkey = yield self._km.get_key(ADDRESS, openpgp.OpenPGPKey)
pgp = openpgp.OpenPGPScheme(
self._soledad, gpgbinary=self.GPG_BINARY_PATH)
- pgp.delete_key(pubkey)
+ yield pgp.delete_key(pubkey)
# mock the key fetching
- self._km.fetch_keys_from_server = Mock(return_value=[])
+ self._km._fetch_keys_from_server = Mock(
+ return_value=fail(errors.KeyNotFound()))
# prepare the SMTP factory with encrypted only equal to false
proto = SMTPFactory(
u'anotheruser@leap.se',
- self._km, self._config['host'],
- self._config['port'],
- self._config['cert'], self._config['key'],
- False).buildProtocol(('127.0.0.1', 0))
+ self._km,
+ False, outgoing_mail=Mock()).buildProtocol(('127.0.0.1', 0))
transport = proto_helpers.StringTransport()
proto.makeConnection(transport)
- proto.lineReceived(self.EMAIL_DATA[0] + '\r\n')
- proto.lineReceived(self.EMAIL_DATA[1] + '\r\n')
- proto.lineReceived(self.EMAIL_DATA[2] + '\r\n')
+ yield self.getReply(self.EMAIL_DATA[0] + '\r\n', proto, transport)
+ yield self.getReply(self.EMAIL_DATA[1] + '\r\n', proto, transport)
+ reply = yield self.getReply(self.EMAIL_DATA[2] + '\r\n',
+ proto, transport)
# ensure the address was accepted
- lines = transport.value().rstrip().split('\n')
self.assertEqual(
- '250 Recipient address accepted',
- lines[-1],
+ '250 Recipient address accepted\r\n',
+ reply,
'Address should have been accepted with appropriate message.')
+ proto.setTimeout(None)
+
+ def getReply(self, line, proto, transport):
+ proto.lineReceived(line)
+
+ if line[:4] not in ['HELO', 'MAIL', 'RCPT', 'DATA']:
+ return succeed("")
+
+ def check_transport(_):
+ reply = transport.value()
+ if reply:
+ transport.clear()
+ return succeed(reply)
+
+ d = Deferred()
+ d.addCallback(check_transport)
+ reactor.callLater(0, lambda: d.callback(None))
+ return d
+
+ return check_transport(None)