summaryrefslogtreecommitdiff
path: root/src/leap/email/smtp
diff options
context:
space:
mode:
authordrebs <drebs@leap.se>2013-01-23 16:26:17 -0200
committerdrebs <drebs@leap.se>2013-01-23 16:26:17 -0200
commit727b46f4267a8b75f4f575953a293c90cd2e2661 (patch)
tree21b4daf719d20c7627e941e0f7f2bf773a2912f0 /src/leap/email/smtp
parent9509a4e68af74737cf96460dc5af9e1cbb836e66 (diff)
Add test for basic email sending.
Diffstat (limited to 'src/leap/email/smtp')
-rw-r--r--src/leap/email/smtp/smtprelay.tac44
-rw-r--r--src/leap/email/smtp/tests/__init__.py8
-rw-r--r--src/leap/email/smtp/tests/test_smtprelay.py70
3 files changed, 104 insertions, 18 deletions
diff --git a/src/leap/email/smtp/smtprelay.tac b/src/leap/email/smtp/smtprelay.tac
index cb302389..f7074dfb 100644
--- a/src/leap/email/smtp/smtprelay.tac
+++ b/src/leap/email/smtp/smtprelay.tac
@@ -15,9 +15,13 @@ class SMTPFactory(ServerFactory):
Factory for an SMTP server with encrypted relaying capabilities.
"""
+ def __init__(self, gpg=None):
+ self._gpg = gpg
+
def buildProtocol(self, addr):
"Return a protocol suitable for the job."
- smtpProtocol = smtp.SMTP(SMTPDelivery())
+ # TODO: use ESMTP here.
+ smtpProtocol = smtp.SMTP(SMTPDelivery(self._gpg))
smtpProtocol.factory = self
return smtpProtocol
@@ -29,8 +33,11 @@ class SMTPDelivery(object):
implements(smtp.IMessageDelivery)
- def __init__(self):
- self.gpg = GPGWrapper()
+ def __init__(self, gpg=None):
+ if gpg:
+ self._gpg = gpg
+ else:
+ self._gpg = GPGWrapper()
def receivedHeader(self, helo, origin, recipients):
myHostname, clientIP = helo
@@ -44,13 +51,13 @@ class SMTPDelivery(object):
# try to find recipient's public key
try:
# this will raise an exception if key is not found
- trust = self.gpg.find_key(user.dest.addrstr)['trust']
+ trust = self._gpg.find_key(user.dest.addrstr)['trust']
# if key is not ultimatelly trusted, then the message will not
# be encrypted. So, we check for this below
- if trust != 'u':
- raise smtp.SMTPBadRcpt(user)
+ #if trust != 'u':
+ # raise smtp.SMTPBadRcpt(user)
print "Accepting mail for %s..." % user.dest
- return lambda: EncryptedMessage(user)
+ return lambda: EncryptedMessage(user, gpg=self._gpg)
except LookupError:
raise smtp.SMTPBadRcpt(user)
@@ -70,11 +77,14 @@ class EncryptedMessage():
SMTP_HOSTNAME = "mail.riseup.net"
SMTP_PORT = 25
- def __init__(self, user):
+ def __init__(self, user, gpg=None):
self.user = user
self.getSMTPInfo()
self.lines = []
- self.gpg = GPGWrapper()
+ if gpg:
+ self._gpg = gpg
+ else:
+ self._gpg = GPGWrapper()
def lineReceived(self, line):
"""Store email DATA lines as they arrive."""
@@ -129,10 +139,12 @@ class EncryptedMessage():
d.addErrback(self.sendError)
return d
- def encrypt(self):
- fp = self.gpg.find_key(self.user.dest.addrstr)['fingerprint']
+ def encrypt(self, always_trust=True):
+ # TODO: do not "always trust" here.
+ fp = self._gpg.find_key(self.user.dest.addrstr)['fingerprint']
print "Encrypting to %s" % fp
- self.cyphertext = str(self.gpg.encrypt('\n'.join(self.body), [fp]))
+ self.cyphertext = str(self._gpg.encrypt('\n'.join(self.body), [fp],
+ always_trust=always_trust))
# this will be replaced by some other mechanism of obtaining credentials
# for SMTP server.
@@ -167,8 +179,12 @@ class GPGWrapper():
return key
raise LookupError("GnuPG public key for %s not found!" % email)
- def encrypt(self, data, recipient):
- return self.gpg.encrypt(data, recipient)
+ def encrypt(self, data, recipient, always_trust=True):
+ # TODO: do not 'always_trust'.
+ return self.gpg.encrypt(data, recipient, always_trust=always_trust)
+
+ def decrypt(self, data):
+ return self.gpg.decrypt(data)
def import_keys(self, data):
return self.gpg.import_keys(data)
diff --git a/src/leap/email/smtp/tests/__init__.py b/src/leap/email/smtp/tests/__init__.py
index 3d72377e..1b2d8bd1 100644
--- a/src/leap/email/smtp/tests/__init__.py
+++ b/src/leap/email/smtp/tests/__init__.py
@@ -1,7 +1,6 @@
-import unittest
-import gnupg
+from leap.email.smtp.smtprelay import GPGWrapper
import shutil
-import ipdb
+from twisted.trial import unittest
class OpenPGPTestCase(unittest.TestCase):
@@ -10,7 +9,7 @@ class OpenPGPTestCase(unittest.TestCase):
EMAIL = 'leap@leap.se'
def setUp(self):
- self._gpg = gnupg.GPG(gnupghome=self.GNUPG_HOME)
+ self._gpg = GPGWrapper(gpghome=self.GNUPG_HOME)
self.assertEqual(self._gpg.import_keys(PUBLIC_KEY).summary(),
'1 imported', "error importing public key")
@@ -25,6 +24,7 @@ class OpenPGPTestCase(unittest.TestCase):
def test_encrypt_decrypt(self):
text = "simple raw text"
encrypted = str(self._gpg.encrypt(text, KEY_FINGERPRINT,
+ # TODO: handle always trust issue
always_trust=True))
self.assertNotEqual(text, encrypted, "failed encrypting text")
decrypted = str(self._gpg.decrypt(encrypted))
diff --git a/src/leap/email/smtp/tests/test_smtprelay.py b/src/leap/email/smtp/tests/test_smtprelay.py
new file mode 100644
index 00000000..5410c75e
--- /dev/null
+++ b/src/leap/email/smtp/tests/test_smtprelay.py
@@ -0,0 +1,70 @@
+from datetime import datetime
+import re
+from leap.email.smtp.smtprelay import (
+ SMTPFactory, # a ServerFactory
+ #SMTPDelivery, # an object
+ #EncryptedMessage,
+)
+from leap.email.smtp import tests
+from twisted.internet.error import ConnectionDone
+from twisted.test import proto_helpers
+
+
+class TestSmtpRelay(tests.OpenPGPTestCase):
+
+ IP_REGEX = "(([0-9]|[1-9][0-9]|1[0-9]{2}|2[0-4][0-9]|25[0-5])\.){3}([0-9]|[1-9][0-9]|1[0-9]{2}|2[0-4][0-9]|25[0-5])";
+ HOSTNAME_REGEX = "(([a-zA-Z0-9]|[a-zA-Z0-9][a-zA-Z0-9\-]*[a-zA-Z0-9])\.)*([A-Za-z0-9]|[A-Za-z0-9][A-Za-z0-9\-]*[A-Za-z0-9])";
+ IP_OR_HOST_REGEX = '(' + IP_REGEX + '|' + HOSTNAME_REGEX + ')'
+
+ CRLF = '\r\n'
+ EMAIL_DATA = [ 'HELO relay.leap.se',
+ 'MAIL FROM: <user@leap.se>',
+ 'RCPT TO: <leap@leap.se>',
+ 'DATA',
+ 'From: User <user@leap.se>',
+ 'To: Leap <leap@leap.se>',
+ 'Date: ' + datetime.now().strftime('%c'),
+ 'Subject: test message',
+ '',
+ 'This is a secret message.',
+ 'Yours,',
+ 'A.',
+ '',
+ '.',
+ 'QUIT' ]
+ SMTP_ANSWERS = [ '220 ' + IP_OR_HOST_REGEX + ' NO UCE NO UBE NO RELAY PROBES',
+ '250 ' + IP_OR_HOST_REGEX + ' Hello ' + IP_OR_HOST_REGEX + ', nice to meet you',
+ '250 Sender address accepted',
+ '250 Recipient address accepted',
+ '354 Continue' ]
+
+
+ def setUp(self):
+ super(TestSmtpRelay, self).setUp()
+ self.proto = SMTPFactory(self._gpg).buildProtocol(('127.0.0.1',0))
+ self.transport = proto_helpers.StringTransport()
+ self.proto.makeConnection(self.transport)
+
+
+ def tearDown(self):
+ self.proto.setTimeout(None)
+ super(TestSmtpRelay, self).tearDown()
+
+
+ def assertMatch(self, string, pattern, msg=None):
+ if not re.match(pattern, string):
+ msg = self._formatMessage(msg, '"%s" does not match pattern "%s".'
+ % (string, pattern))
+ raise self.failureException(msg)
+
+
+ def test_send_email(self):
+ """
+ If L{smtp.SMTP} receives an empty line, it responds with a 500 error
+ response code and a message about a syntax error.
+ """
+ for i, line in enumerate(self.EMAIL_DATA):
+ self.proto.lineReceived(line+self.CRLF)
+ self.assertMatch(self.transport.value(),
+ self.CRLF.join(self.SMTP_ANSWERS[0:i+1]))
+