diff options
Diffstat (limited to 'src/leap/mail/tests/smtp/test_smtprelay.py')
-rw-r--r-- | src/leap/mail/tests/smtp/test_smtprelay.py | 212 |
1 files changed, 212 insertions, 0 deletions
diff --git a/src/leap/mail/tests/smtp/test_smtprelay.py b/src/leap/mail/tests/smtp/test_smtprelay.py new file mode 100644 index 0000000..6ef4e85 --- /dev/null +++ b/src/leap/mail/tests/smtp/test_smtprelay.py @@ -0,0 +1,212 @@ +# -*- coding: utf-8 -*- +# test_smtprelay.py +# Copyright (C) 2013 LEAP +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see <http://www.gnu.org/licenses/>. + + +""" +SMTP relay tests. +""" + + +import re + + +from datetime import datetime +from twisted.test import proto_helpers +from twisted.mail.smtp import ( + User, + SMTPBadRcpt, +) +from mock import Mock + + +from leap.mail.smtp.smtprelay import ( + SMTPFactory, + EncryptedMessage, +) +from leap.mail.tests.smtp import TestCaseWithKeyManager +from leap.common.keymanager import openpgp + + +# some regexps +IP_REGEX = "(([0-9]|[1-9][0-9]|1[0-9]{2}|2[0-4][0-9]|25[0-5])\.){3}" + \ + "([0-9]|[1-9][0-9]|1[0-9]{2}|2[0-4][0-9]|25[0-5])" +HOSTNAME_REGEX = "(([a-zA-Z0-9]|[a-zA-Z0-9][a-zA-Z0-9\-]*[a-zA-Z0-9])\.)*" + \ + "([A-Za-z0-9]|[A-Za-z0-9][A-Za-z0-9\-]*[A-Za-z0-9])" +IP_OR_HOST_REGEX = '(' + IP_REGEX + '|' + HOSTNAME_REGEX + ')' + + +class TestSmtpRelay(TestCaseWithKeyManager): + + EMAIL_DATA = ['HELO relay.leap.se', + 'MAIL FROM: <user@leap.se>', + 'RCPT TO: <leap@leap.se>', + 'DATA', + 'From: User <user@leap.se>', + 'To: Leap <leap@leap.se>', + 'Date: ' + datetime.now().strftime('%c'), + 'Subject: test message', + '', + 'This is a secret message.', + 'Yours,', + 'A.', + '', + '.', + 'QUIT'] + + def assertMatch(self, string, pattern, msg=None): + if not re.match(pattern, string): + msg = self._formatMessage(msg, '"%s" does not match pattern "%s".' + % (string, pattern)) + raise self.failureException(msg) + + def test_openpgp_encrypt_decrypt(self): + "Test if openpgp can encrypt and decrypt." + text = "simple raw text" + pubkey = self._km.get_key( + 'leap@leap.se', openpgp.OpenPGPKey, private=False) + encrypted = openpgp.encrypt_asym(text, pubkey) + self.assertNotEqual(text, encrypted, "failed encrypting text") + privkey = self._km.get_key( + 'leap@leap.se', openpgp.OpenPGPKey, private=True) + decrypted = openpgp.decrypt_asym(encrypted, privkey) + self.assertEqual(text, decrypted, "failed decrypting text") + + def test_relay_accepts_valid_email(self): + """ + Test if SMTP server responds correctly for valid interaction. + """ + + SMTP_ANSWERS = ['220 ' + IP_OR_HOST_REGEX + + ' NO UCE NO UBE NO RELAY PROBES', + '250 ' + IP_OR_HOST_REGEX + ' Hello ' + + IP_OR_HOST_REGEX + ', nice to meet you', + '250 Sender address accepted', + '250 Recipient address accepted', + '354 Continue'] + proto = SMTPFactory( + self._km, self._config).buildProtocol(('127.0.0.1', 0)) + transport = proto_helpers.StringTransport() + proto.makeConnection(transport) + for i, line in enumerate(self.EMAIL_DATA): + proto.lineReceived(line + '\r\n') + self.assertMatch(transport.value(), + '\r\n'.join(SMTP_ANSWERS[0:i + 1])) + proto.setTimeout(None) + + def test_message_encrypt(self): + """ + Test if message gets encrypted to destination email. + """ + proto = SMTPFactory( + self._km, self._config).buildProtocol(('127.0.0.1', 0)) + user = User('leap@leap.se', 'relay.leap.se', proto, 'leap@leap.se') + m = EncryptedMessage(user, self._km, self._config) + for line in self.EMAIL_DATA[4:12]: + m.lineReceived(line) + m.eomReceived() + privkey = self._km.get_key( + 'leap@leap.se', openpgp.OpenPGPKey, private=True) + decrypted = openpgp.decrypt_asym(m._message.get_payload(), privkey) + self.assertEqual( + '\r\n'.join(self.EMAIL_DATA[9:12]) + '\r\n', + decrypted) + + def test_missing_key_rejects_address(self): + """ + Test if server rejects to send unencrypted when 'encrypted_only' is + True. + """ + # remove key from key manager + pubkey = self._km.get_key('leap@leap.se', openpgp.OpenPGPKey) + pgp = openpgp.OpenPGPScheme(self._soledad) + pgp.delete_key(pubkey) + # mock the key fetching + self._km.fetch_keys_from_server = Mock(return_value=[]) + # prepare the SMTP factory + proto = SMTPFactory( + self._km, self._config).buildProtocol(('127.0.0.1', 0)) + transport = proto_helpers.StringTransport() + proto.makeConnection(transport) + proto.lineReceived(self.EMAIL_DATA[0] + '\r\n') + proto.lineReceived(self.EMAIL_DATA[1] + '\r\n') + proto.lineReceived(self.EMAIL_DATA[2] + '\r\n') + # ensure the address was rejected + lines = transport.value().rstrip().split('\n') + self.assertEqual( + '550 Cannot receive for specified address', + lines[-1]) + + def test_missing_key_accepts_address(self): + """ + Test if server accepts to send unencrypted when 'encrypted_only' is + False. + """ + # remove key from key manager + pubkey = self._km.get_key('leap@leap.se', openpgp.OpenPGPKey) + pgp = openpgp.OpenPGPScheme(self._soledad) + pgp.delete_key(pubkey) + # mock the key fetching + self._km.fetch_keys_from_server = Mock(return_value=[]) + # change the configuration + self._config['encrypted_only'] = False + # prepare the SMTP factory + proto = SMTPFactory( + self._km, self._config).buildProtocol(('127.0.0.1', 0)) + transport = proto_helpers.StringTransport() + proto.makeConnection(transport) + proto.lineReceived(self.EMAIL_DATA[0] + '\r\n') + proto.lineReceived(self.EMAIL_DATA[1] + '\r\n') + proto.lineReceived(self.EMAIL_DATA[2] + '\r\n') + # ensure the address was rejected + lines = transport.value().rstrip().split('\n') + self.assertEqual( + '250 Recipient address accepted', + lines[-1]) + + def test_malformed_address_rejects(self): + """ + Test if server rejects to send to malformed addresses. + """ + # mock the key fetching + self._km.fetch_keys_from_server = Mock(return_value=[]) + # prepare the SMTP factory + for malformed in ['leap@']: + proto = SMTPFactory( + self._km, self._config).buildProtocol(('127.0.0.1', 0)) + transport = proto_helpers.StringTransport() + proto.makeConnection(transport) + proto.lineReceived(self.EMAIL_DATA[0] + '\r\n') + proto.lineReceived(self.EMAIL_DATA[1] + '\r\n') + proto.lineReceived('RCPT TO: <%s>%s' % (malformed, '\r\n')) + # ensure the address was rejected + lines = transport.value().rstrip().split('\n') + self.assertEqual( + '550 Cannot receive for specified address', + lines[-1]) + + def test_prepare_header_adds_from(self): + """ + Test if message headers are OK. + """ + proto = SMTPFactory( + self._km, self._config).buildProtocol(('127.0.0.1', 0)) + user = User('leap@leap.se', 'relay.leap.se', proto, 'leap@leap.se') + m = EncryptedMessage(user, self._km, self._config) + for line in self.EMAIL_DATA[4:12]: + m.lineReceived(line) + m.eomReceived() + self.assertEqual('<leap@leap.se>', m._message['From']) |