# -*- coding: utf-8 -*-
# test_smtprelay.py
# Copyright (C) 2013 LEAP
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see .
"""
SMTP relay tests.
"""
import re
from datetime import datetime
from twisted.test import proto_helpers
from twisted.mail.smtp import (
User,
SMTPBadRcpt,
)
from mock import Mock
from leap.mail.smtp.smtprelay import (
SMTPFactory,
EncryptedMessage,
)
from leap.mail.tests.smtp import TestCaseWithKeyManager
from leap.common.keymanager import openpgp
# some regexps
IP_REGEX = "(([0-9]|[1-9][0-9]|1[0-9]{2}|2[0-4][0-9]|25[0-5])\.){3}" + \
"([0-9]|[1-9][0-9]|1[0-9]{2}|2[0-4][0-9]|25[0-5])"
HOSTNAME_REGEX = "(([a-zA-Z0-9]|[a-zA-Z0-9][a-zA-Z0-9\-]*[a-zA-Z0-9])\.)*" + \
"([A-Za-z0-9]|[A-Za-z0-9][A-Za-z0-9\-]*[A-Za-z0-9])"
IP_OR_HOST_REGEX = '(' + IP_REGEX + '|' + HOSTNAME_REGEX + ')'
class TestSmtpRelay(TestCaseWithKeyManager):
EMAIL_DATA = ['HELO relay.leap.se',
'MAIL FROM: ',
'RCPT TO: ',
'DATA',
'From: User ',
'To: Leap ',
'Date: ' + datetime.now().strftime('%c'),
'Subject: test message',
'',
'This is a secret message.',
'Yours,',
'A.',
'',
'.',
'QUIT']
def assertMatch(self, string, pattern, msg=None):
if not re.match(pattern, string):
msg = self._formatMessage(msg, '"%s" does not match pattern "%s".'
% (string, pattern))
raise self.failureException(msg)
def test_openpgp_encrypt_decrypt(self):
"Test if openpgp can encrypt and decrypt."
text = "simple raw text"
pubkey = self._km.get_key(
'leap@leap.se', openpgp.OpenPGPKey, private=False)
encrypted = openpgp.encrypt_asym(text, pubkey)
self.assertNotEqual(text, encrypted, "failed encrypting text")
privkey = self._km.get_key(
'leap@leap.se', openpgp.OpenPGPKey, private=True)
decrypted = openpgp.decrypt_asym(encrypted, privkey)
self.assertEqual(text, decrypted, "failed decrypting text")
def test_relay_accepts_valid_email(self):
"""
Test if SMTP server responds correctly for valid interaction.
"""
SMTP_ANSWERS = ['220 ' + IP_OR_HOST_REGEX +
' NO UCE NO UBE NO RELAY PROBES',
'250 ' + IP_OR_HOST_REGEX + ' Hello ' +
IP_OR_HOST_REGEX + ', nice to meet you',
'250 Sender address accepted',
'250 Recipient address accepted',
'354 Continue']
proto = SMTPFactory(
self._km, self._config).buildProtocol(('127.0.0.1', 0))
transport = proto_helpers.StringTransport()
proto.makeConnection(transport)
for i, line in enumerate(self.EMAIL_DATA):
proto.lineReceived(line + '\r\n')
self.assertMatch(transport.value(),
'\r\n'.join(SMTP_ANSWERS[0:i + 1]))
proto.setTimeout(None)
def test_message_encrypt(self):
"""
Test if message gets encrypted to destination email.
"""
proto = SMTPFactory(
self._km, self._config).buildProtocol(('127.0.0.1', 0))
user = User('leap@leap.se', 'relay.leap.se', proto, 'leap@leap.se')
m = EncryptedMessage(user, self._km, self._config)
for line in self.EMAIL_DATA[4:12]:
m.lineReceived(line)
m.eomReceived()
privkey = self._km.get_key(
'leap@leap.se', openpgp.OpenPGPKey, private=True)
decrypted = openpgp.decrypt_asym(m._message.get_payload(), privkey)
self.assertEqual(
'\r\n'.join(self.EMAIL_DATA[9:12]) + '\r\n',
decrypted)
def test_missing_key_rejects_address(self):
"""
Test if server rejects to send unencrypted when 'encrypted_only' is
True.
"""
# remove key from key manager
pubkey = self._km.get_key('leap@leap.se', openpgp.OpenPGPKey)
pgp = openpgp.OpenPGPScheme(self._soledad)
pgp.delete_key(pubkey)
# mock the key fetching
self._km.fetch_keys_from_server = Mock(return_value=[])
# prepare the SMTP factory
proto = SMTPFactory(
self._km, self._config).buildProtocol(('127.0.0.1', 0))
transport = proto_helpers.StringTransport()
proto.makeConnection(transport)
proto.lineReceived(self.EMAIL_DATA[0] + '\r\n')
proto.lineReceived(self.EMAIL_DATA[1] + '\r\n')
proto.lineReceived(self.EMAIL_DATA[2] + '\r\n')
# ensure the address was rejected
lines = transport.value().rstrip().split('\n')
self.assertEqual(
'550 Cannot receive for specified address',
lines[-1])
def test_missing_key_accepts_address(self):
"""
Test if server accepts to send unencrypted when 'encrypted_only' is
False.
"""
# remove key from key manager
pubkey = self._km.get_key('leap@leap.se', openpgp.OpenPGPKey)
pgp = openpgp.OpenPGPScheme(self._soledad)
pgp.delete_key(pubkey)
# mock the key fetching
self._km.fetch_keys_from_server = Mock(return_value=[])
# change the configuration
self._config['encrypted_only'] = False
# prepare the SMTP factory
proto = SMTPFactory(
self._km, self._config).buildProtocol(('127.0.0.1', 0))
transport = proto_helpers.StringTransport()
proto.makeConnection(transport)
proto.lineReceived(self.EMAIL_DATA[0] + '\r\n')
proto.lineReceived(self.EMAIL_DATA[1] + '\r\n')
proto.lineReceived(self.EMAIL_DATA[2] + '\r\n')
# ensure the address was rejected
lines = transport.value().rstrip().split('\n')
self.assertEqual(
'250 Recipient address accepted',
lines[-1])
def test_malformed_address_rejects(self):
"""
Test if server rejects to send to malformed addresses.
"""
# mock the key fetching
self._km.fetch_keys_from_server = Mock(return_value=[])
# prepare the SMTP factory
for malformed in ['leap@']:
proto = SMTPFactory(
self._km, self._config).buildProtocol(('127.0.0.1', 0))
transport = proto_helpers.StringTransport()
proto.makeConnection(transport)
proto.lineReceived(self.EMAIL_DATA[0] + '\r\n')
proto.lineReceived(self.EMAIL_DATA[1] + '\r\n')
proto.lineReceived('RCPT TO: <%s>%s' % (malformed, '\r\n'))
# ensure the address was rejected
lines = transport.value().rstrip().split('\n')
self.assertEqual(
'550 Cannot receive for specified address',
lines[-1])
def test_prepare_header_adds_from(self):
"""
Test if message headers are OK.
"""
proto = SMTPFactory(
self._km, self._config).buildProtocol(('127.0.0.1', 0))
user = User('leap@leap.se', 'relay.leap.se', proto, 'leap@leap.se')
m = EncryptedMessage(user, self._km, self._config)
for line in self.EMAIL_DATA[4:12]:
m.lineReceived(line)
m.eomReceived()
self.assertEqual('', m._message['From'])