diff options
author | drebs <drebs@leap.se> | 2013-01-23 17:19:26 -0200 |
---|---|---|
committer | drebs <drebs@leap.se> | 2013-01-23 17:19:26 -0200 |
commit | c2af0a21cb1263ec354708b4623eca154a19d102 (patch) | |
tree | ec5296c03b44b2f6edbe044463be51dc784ba28b /src/leap/email/smtp | |
parent | 727b46f4267a8b75f4f575953a293c90cd2e2661 (diff) |
Add test for message encryption.
Diffstat (limited to 'src/leap/email/smtp')
-rw-r--r-- | src/leap/email/smtp/smtprelay.py (renamed from src/leap/email/smtp/smtprelay.tac) | 15 | ||||
-rw-r--r-- | src/leap/email/smtp/tests/__init__.py | 2 | ||||
-rw-r--r-- | src/leap/email/smtp/tests/test_smtprelay.py | 66 |
3 files changed, 46 insertions, 37 deletions
diff --git a/src/leap/email/smtp/smtprelay.tac b/src/leap/email/smtp/smtprelay.py index f7074dfb..f44aeb6f 100644 --- a/src/leap/email/smtp/smtprelay.tac +++ b/src/leap/email/smtp/smtprelay.py @@ -7,6 +7,7 @@ from twisted.internet.protocol import ServerFactory from twisted.internet import reactor from twisted.internet import defer from twisted.application import internet, service +from twisted.python import log from email.Header import Header @@ -56,7 +57,7 @@ class SMTPDelivery(object): # be encrypted. So, we check for this below #if trust != 'u': # raise smtp.SMTPBadRcpt(user) - print "Accepting mail for %s..." % user.dest + log.msg("Accepting mail for %s..." % user.dest) return lambda: EncryptedMessage(user, gpg=self._gpg) except LookupError: raise smtp.SMTPBadRcpt(user) @@ -92,7 +93,7 @@ class EncryptedMessage(): def eomReceived(self): """Encrypt and send message.""" - print "Message data complete." + log.msg("Message data complete.") self.lines.append('') # add a trailing newline self.parseMessage() try: @@ -108,15 +109,17 @@ class EncryptedMessage(): self.body = self.lines[sep+1:] def connectionLost(self): - print "Connection lost unexpectedly!" + log.msg("Connection lost unexpectedly!") + log.err() # unexpected loss of connection; don't save self.lines = [] def sendSuccess(self, r): - print r + log.msg(r) def sendError(self, e): - print e + log.msg(e) + log.err() def prepareHeader(self): self.headers.insert(1, "From: %s" % self.user.orig.addrstr) @@ -142,7 +145,7 @@ class EncryptedMessage(): def encrypt(self, always_trust=True): # TODO: do not "always trust" here. fp = self._gpg.find_key(self.user.dest.addrstr)['fingerprint'] - print "Encrypting to %s" % fp + log.msg("Encrypting to %s" % fp) self.cyphertext = str(self._gpg.encrypt('\n'.join(self.body), [fp], always_trust=always_trust)) diff --git a/src/leap/email/smtp/tests/__init__.py b/src/leap/email/smtp/tests/__init__.py index 1b2d8bd1..d00ebeb5 100644 --- a/src/leap/email/smtp/tests/__init__.py +++ b/src/leap/email/smtp/tests/__init__.py @@ -21,7 +21,7 @@ class OpenPGPTestCase(unittest.TestCase): def tearDown(self): shutil.rmtree(self.GNUPG_HOME) - def test_encrypt_decrypt(self): + def test_openpgp_encrypt_decrypt(self): text = "simple raw text" encrypted = str(self._gpg.encrypt(text, KEY_FINGERPRINT, # TODO: handle always trust issue diff --git a/src/leap/email/smtp/tests/test_smtprelay.py b/src/leap/email/smtp/tests/test_smtprelay.py index 5410c75e..dc0055c6 100644 --- a/src/leap/email/smtp/tests/test_smtprelay.py +++ b/src/leap/email/smtp/tests/test_smtprelay.py @@ -1,22 +1,25 @@ from datetime import datetime import re from leap.email.smtp.smtprelay import ( - SMTPFactory, # a ServerFactory + SMTPFactory, #SMTPDelivery, # an object - #EncryptedMessage, + EncryptedMessage, ) from leap.email.smtp import tests from twisted.internet.error import ConnectionDone from twisted.test import proto_helpers +from twisted.internet import defer +from twisted.mail.smtp import User +# some regexps +IP_REGEX = "(([0-9]|[1-9][0-9]|1[0-9]{2}|2[0-4][0-9]|25[0-5])\.){3}([0-9]|[1-9][0-9]|1[0-9]{2}|2[0-4][0-9]|25[0-5])"; +HOSTNAME_REGEX = "(([a-zA-Z0-9]|[a-zA-Z0-9][a-zA-Z0-9\-]*[a-zA-Z0-9])\.)*([A-Za-z0-9]|[A-Za-z0-9][A-Za-z0-9\-]*[A-Za-z0-9])"; +IP_OR_HOST_REGEX = '(' + IP_REGEX + '|' + HOSTNAME_REGEX + ')' + class TestSmtpRelay(tests.OpenPGPTestCase): - IP_REGEX = "(([0-9]|[1-9][0-9]|1[0-9]{2}|2[0-4][0-9]|25[0-5])\.){3}([0-9]|[1-9][0-9]|1[0-9]{2}|2[0-4][0-9]|25[0-5])"; - HOSTNAME_REGEX = "(([a-zA-Z0-9]|[a-zA-Z0-9][a-zA-Z0-9\-]*[a-zA-Z0-9])\.)*([A-Za-z0-9]|[A-Za-z0-9][A-Za-z0-9\-]*[A-Za-z0-9])"; - IP_OR_HOST_REGEX = '(' + IP_REGEX + '|' + HOSTNAME_REGEX + ')' - - CRLF = '\r\n' + EMAIL_DATA = [ 'HELO relay.leap.se', 'MAIL FROM: <user@leap.se>', 'RCPT TO: <leap@leap.se>', @@ -32,25 +35,8 @@ class TestSmtpRelay(tests.OpenPGPTestCase): '', '.', 'QUIT' ] - SMTP_ANSWERS = [ '220 ' + IP_OR_HOST_REGEX + ' NO UCE NO UBE NO RELAY PROBES', - '250 ' + IP_OR_HOST_REGEX + ' Hello ' + IP_OR_HOST_REGEX + ', nice to meet you', - '250 Sender address accepted', - '250 Recipient address accepted', - '354 Continue' ] - - - def setUp(self): - super(TestSmtpRelay, self).setUp() - self.proto = SMTPFactory(self._gpg).buildProtocol(('127.0.0.1',0)) - self.transport = proto_helpers.StringTransport() - self.proto.makeConnection(self.transport) - def tearDown(self): - self.proto.setTimeout(None) - super(TestSmtpRelay, self).tearDown() - - def assertMatch(self, string, pattern, msg=None): if not re.match(pattern, string): msg = self._formatMessage(msg, '"%s" does not match pattern "%s".' @@ -58,13 +44,33 @@ class TestSmtpRelay(tests.OpenPGPTestCase): raise self.failureException(msg) - def test_send_email(self): + def test_relay_accepts_valid_email(self): """ - If L{smtp.SMTP} receives an empty line, it responds with a 500 error - response code and a message about a syntax error. + Test if SMTP server responds correctly for valid interaction. """ + SMTP_ANSWERS = [ '220 ' + IP_OR_HOST_REGEX + ' NO UCE NO UBE NO RELAY PROBES', + '250 ' + IP_OR_HOST_REGEX + ' Hello ' + IP_OR_HOST_REGEX + ', nice to meet you', + '250 Sender address accepted', + '250 Recipient address accepted', + '354 Continue' ] + proto = SMTPFactory(self._gpg).buildProtocol(('127.0.0.1',0)) + transport = proto_helpers.StringTransport() + proto.makeConnection(transport) for i, line in enumerate(self.EMAIL_DATA): - self.proto.lineReceived(line+self.CRLF) - self.assertMatch(self.transport.value(), - self.CRLF.join(self.SMTP_ANSWERS[0:i+1])) + proto.lineReceived(line + '\r\n') + self.assertMatch(transport.value(), + '\r\n'.join(SMTP_ANSWERS[0:i+1])) + proto.setTimeout(None) + + + def test_message_encrypt(self): + proto = SMTPFactory(self._gpg).buildProtocol(('127.0.0.1',0)) + user = User('leap@leap.se', 'relay.leap.se', proto, 'leap@leap.se') + m = EncryptedMessage(user, self._gpg) + for line in self.EMAIL_DATA[4:12]: + m.lineReceived(line) + m.parseMessage() + m.encrypt() + decrypted = str(self._gpg.decrypt(m.cyphertext)) + self.assertEqual('\n'.join(self.EMAIL_DATA[9:12]), decrypted) |