summaryrefslogtreecommitdiff
path: root/src/leap/email/smtp
diff options
context:
space:
mode:
authordrebs <drebs@leap.se>2013-01-23 17:19:26 -0200
committerdrebs <drebs@leap.se>2013-01-23 17:19:26 -0200
commitc2af0a21cb1263ec354708b4623eca154a19d102 (patch)
treeec5296c03b44b2f6edbe044463be51dc784ba28b /src/leap/email/smtp
parent727b46f4267a8b75f4f575953a293c90cd2e2661 (diff)
Add test for message encryption.
Diffstat (limited to 'src/leap/email/smtp')
-rw-r--r--src/leap/email/smtp/smtprelay.py (renamed from src/leap/email/smtp/smtprelay.tac)15
-rw-r--r--src/leap/email/smtp/tests/__init__.py2
-rw-r--r--src/leap/email/smtp/tests/test_smtprelay.py66
3 files changed, 46 insertions, 37 deletions
diff --git a/src/leap/email/smtp/smtprelay.tac b/src/leap/email/smtp/smtprelay.py
index f7074dfb..f44aeb6f 100644
--- a/src/leap/email/smtp/smtprelay.tac
+++ b/src/leap/email/smtp/smtprelay.py
@@ -7,6 +7,7 @@ from twisted.internet.protocol import ServerFactory
from twisted.internet import reactor
from twisted.internet import defer
from twisted.application import internet, service
+from twisted.python import log
from email.Header import Header
@@ -56,7 +57,7 @@ class SMTPDelivery(object):
# be encrypted. So, we check for this below
#if trust != 'u':
# raise smtp.SMTPBadRcpt(user)
- print "Accepting mail for %s..." % user.dest
+ log.msg("Accepting mail for %s..." % user.dest)
return lambda: EncryptedMessage(user, gpg=self._gpg)
except LookupError:
raise smtp.SMTPBadRcpt(user)
@@ -92,7 +93,7 @@ class EncryptedMessage():
def eomReceived(self):
"""Encrypt and send message."""
- print "Message data complete."
+ log.msg("Message data complete.")
self.lines.append('') # add a trailing newline
self.parseMessage()
try:
@@ -108,15 +109,17 @@ class EncryptedMessage():
self.body = self.lines[sep+1:]
def connectionLost(self):
- print "Connection lost unexpectedly!"
+ log.msg("Connection lost unexpectedly!")
+ log.err()
# unexpected loss of connection; don't save
self.lines = []
def sendSuccess(self, r):
- print r
+ log.msg(r)
def sendError(self, e):
- print e
+ log.msg(e)
+ log.err()
def prepareHeader(self):
self.headers.insert(1, "From: %s" % self.user.orig.addrstr)
@@ -142,7 +145,7 @@ class EncryptedMessage():
def encrypt(self, always_trust=True):
# TODO: do not "always trust" here.
fp = self._gpg.find_key(self.user.dest.addrstr)['fingerprint']
- print "Encrypting to %s" % fp
+ log.msg("Encrypting to %s" % fp)
self.cyphertext = str(self._gpg.encrypt('\n'.join(self.body), [fp],
always_trust=always_trust))
diff --git a/src/leap/email/smtp/tests/__init__.py b/src/leap/email/smtp/tests/__init__.py
index 1b2d8bd1..d00ebeb5 100644
--- a/src/leap/email/smtp/tests/__init__.py
+++ b/src/leap/email/smtp/tests/__init__.py
@@ -21,7 +21,7 @@ class OpenPGPTestCase(unittest.TestCase):
def tearDown(self):
shutil.rmtree(self.GNUPG_HOME)
- def test_encrypt_decrypt(self):
+ def test_openpgp_encrypt_decrypt(self):
text = "simple raw text"
encrypted = str(self._gpg.encrypt(text, KEY_FINGERPRINT,
# TODO: handle always trust issue
diff --git a/src/leap/email/smtp/tests/test_smtprelay.py b/src/leap/email/smtp/tests/test_smtprelay.py
index 5410c75e..dc0055c6 100644
--- a/src/leap/email/smtp/tests/test_smtprelay.py
+++ b/src/leap/email/smtp/tests/test_smtprelay.py
@@ -1,22 +1,25 @@
from datetime import datetime
import re
from leap.email.smtp.smtprelay import (
- SMTPFactory, # a ServerFactory
+ SMTPFactory,
#SMTPDelivery, # an object
- #EncryptedMessage,
+ EncryptedMessage,
)
from leap.email.smtp import tests
from twisted.internet.error import ConnectionDone
from twisted.test import proto_helpers
+from twisted.internet import defer
+from twisted.mail.smtp import User
+# some regexps
+IP_REGEX = "(([0-9]|[1-9][0-9]|1[0-9]{2}|2[0-4][0-9]|25[0-5])\.){3}([0-9]|[1-9][0-9]|1[0-9]{2}|2[0-4][0-9]|25[0-5])";
+HOSTNAME_REGEX = "(([a-zA-Z0-9]|[a-zA-Z0-9][a-zA-Z0-9\-]*[a-zA-Z0-9])\.)*([A-Za-z0-9]|[A-Za-z0-9][A-Za-z0-9\-]*[A-Za-z0-9])";
+IP_OR_HOST_REGEX = '(' + IP_REGEX + '|' + HOSTNAME_REGEX + ')'
+
class TestSmtpRelay(tests.OpenPGPTestCase):
- IP_REGEX = "(([0-9]|[1-9][0-9]|1[0-9]{2}|2[0-4][0-9]|25[0-5])\.){3}([0-9]|[1-9][0-9]|1[0-9]{2}|2[0-4][0-9]|25[0-5])";
- HOSTNAME_REGEX = "(([a-zA-Z0-9]|[a-zA-Z0-9][a-zA-Z0-9\-]*[a-zA-Z0-9])\.)*([A-Za-z0-9]|[A-Za-z0-9][A-Za-z0-9\-]*[A-Za-z0-9])";
- IP_OR_HOST_REGEX = '(' + IP_REGEX + '|' + HOSTNAME_REGEX + ')'
-
- CRLF = '\r\n'
+
EMAIL_DATA = [ 'HELO relay.leap.se',
'MAIL FROM: <user@leap.se>',
'RCPT TO: <leap@leap.se>',
@@ -32,25 +35,8 @@ class TestSmtpRelay(tests.OpenPGPTestCase):
'',
'.',
'QUIT' ]
- SMTP_ANSWERS = [ '220 ' + IP_OR_HOST_REGEX + ' NO UCE NO UBE NO RELAY PROBES',
- '250 ' + IP_OR_HOST_REGEX + ' Hello ' + IP_OR_HOST_REGEX + ', nice to meet you',
- '250 Sender address accepted',
- '250 Recipient address accepted',
- '354 Continue' ]
-
-
- def setUp(self):
- super(TestSmtpRelay, self).setUp()
- self.proto = SMTPFactory(self._gpg).buildProtocol(('127.0.0.1',0))
- self.transport = proto_helpers.StringTransport()
- self.proto.makeConnection(self.transport)
- def tearDown(self):
- self.proto.setTimeout(None)
- super(TestSmtpRelay, self).tearDown()
-
-
def assertMatch(self, string, pattern, msg=None):
if not re.match(pattern, string):
msg = self._formatMessage(msg, '"%s" does not match pattern "%s".'
@@ -58,13 +44,33 @@ class TestSmtpRelay(tests.OpenPGPTestCase):
raise self.failureException(msg)
- def test_send_email(self):
+ def test_relay_accepts_valid_email(self):
"""
- If L{smtp.SMTP} receives an empty line, it responds with a 500 error
- response code and a message about a syntax error.
+ Test if SMTP server responds correctly for valid interaction.
"""
+ SMTP_ANSWERS = [ '220 ' + IP_OR_HOST_REGEX + ' NO UCE NO UBE NO RELAY PROBES',
+ '250 ' + IP_OR_HOST_REGEX + ' Hello ' + IP_OR_HOST_REGEX + ', nice to meet you',
+ '250 Sender address accepted',
+ '250 Recipient address accepted',
+ '354 Continue' ]
+ proto = SMTPFactory(self._gpg).buildProtocol(('127.0.0.1',0))
+ transport = proto_helpers.StringTransport()
+ proto.makeConnection(transport)
for i, line in enumerate(self.EMAIL_DATA):
- self.proto.lineReceived(line+self.CRLF)
- self.assertMatch(self.transport.value(),
- self.CRLF.join(self.SMTP_ANSWERS[0:i+1]))
+ proto.lineReceived(line + '\r\n')
+ self.assertMatch(transport.value(),
+ '\r\n'.join(SMTP_ANSWERS[0:i+1]))
+ proto.setTimeout(None)
+
+
+ def test_message_encrypt(self):
+ proto = SMTPFactory(self._gpg).buildProtocol(('127.0.0.1',0))
+ user = User('leap@leap.se', 'relay.leap.se', proto, 'leap@leap.se')
+ m = EncryptedMessage(user, self._gpg)
+ for line in self.EMAIL_DATA[4:12]:
+ m.lineReceived(line)
+ m.parseMessage()
+ m.encrypt()
+ decrypted = str(self._gpg.decrypt(m.cyphertext))
+ self.assertEqual('\n'.join(self.EMAIL_DATA[9:12]), decrypted)