diff options
| author | drebs <drebs@leap.se> | 2013-01-23 16:26:17 -0200 | 
|---|---|---|
| committer | drebs <drebs@leap.se> | 2013-01-23 16:26:17 -0200 | 
| commit | 727b46f4267a8b75f4f575953a293c90cd2e2661 (patch) | |
| tree | 21b4daf719d20c7627e941e0f7f2bf773a2912f0 | |
| parent | 9509a4e68af74737cf96460dc5af9e1cbb836e66 (diff) | |
Add test for basic email sending.
| -rw-r--r-- | .gitignore | 1 | ||||
| -rw-r--r-- | src/leap/email/__init__.py | 0 | ||||
| -rw-r--r-- | src/leap/email/smtp/smtprelay.tac | 44 | ||||
| -rw-r--r-- | src/leap/email/smtp/tests/__init__.py | 8 | ||||
| -rw-r--r-- | src/leap/email/smtp/tests/test_smtprelay.py | 70 | 
5 files changed, 105 insertions, 18 deletions
| @@ -20,3 +20,4 @@ src/leap/_branding.py  src/leap/certs/*.pem  src/*.egg-info  MANIFEST +_trial_temp* diff --git a/src/leap/email/__init__.py b/src/leap/email/__init__.py new file mode 100644 index 00000000..e69de29b --- /dev/null +++ b/src/leap/email/__init__.py diff --git a/src/leap/email/smtp/smtprelay.tac b/src/leap/email/smtp/smtprelay.tac index cb302389..f7074dfb 100644 --- a/src/leap/email/smtp/smtprelay.tac +++ b/src/leap/email/smtp/smtprelay.tac @@ -15,9 +15,13 @@ class SMTPFactory(ServerFactory):      Factory for an SMTP server with encrypted relaying capabilities.      """ +    def __init__(self, gpg=None): +        self._gpg = gpg +      def buildProtocol(self, addr):          "Return a protocol suitable for the job." -        smtpProtocol = smtp.SMTP(SMTPDelivery()) +        # TODO: use ESMTP here. +        smtpProtocol = smtp.SMTP(SMTPDelivery(self._gpg))          smtpProtocol.factory = self          return smtpProtocol @@ -29,8 +33,11 @@ class SMTPDelivery(object):      implements(smtp.IMessageDelivery) -    def __init__(self): -        self.gpg = GPGWrapper() +    def __init__(self, gpg=None): +        if gpg: +            self._gpg = gpg +        else: +            self._gpg = GPGWrapper()      def receivedHeader(self, helo, origin, recipients):          myHostname, clientIP = helo @@ -44,13 +51,13 @@ class SMTPDelivery(object):          # try to find recipient's public key          try:              # this will raise an exception if key is not found -            trust = self.gpg.find_key(user.dest.addrstr)['trust'] +            trust = self._gpg.find_key(user.dest.addrstr)['trust']              # if key is not ultimatelly trusted, then the message will not              # be encrypted. So, we check for this below -            if trust != 'u': -                raise smtp.SMTPBadRcpt(user) +            #if trust != 'u': +            #    raise smtp.SMTPBadRcpt(user)              print "Accepting mail for %s..." % user.dest -            return lambda: EncryptedMessage(user) +            return lambda: EncryptedMessage(user, gpg=self._gpg)          except LookupError:              raise smtp.SMTPBadRcpt(user) @@ -70,11 +77,14 @@ class EncryptedMessage():      SMTP_HOSTNAME = "mail.riseup.net"      SMTP_PORT     = 25 -    def __init__(self, user): +    def __init__(self, user, gpg=None):          self.user = user          self.getSMTPInfo()          self.lines = [] -        self.gpg = GPGWrapper() +        if gpg: +            self._gpg = gpg +        else: +            self._gpg = GPGWrapper()      def lineReceived(self, line):          """Store email DATA lines as they arrive.""" @@ -129,10 +139,12 @@ class EncryptedMessage():          d.addErrback(self.sendError)          return d -    def encrypt(self): -        fp = self.gpg.find_key(self.user.dest.addrstr)['fingerprint'] +    def encrypt(self, always_trust=True): +        # TODO: do not "always trust" here. +        fp = self._gpg.find_key(self.user.dest.addrstr)['fingerprint']          print "Encrypting to %s" % fp -        self.cyphertext = str(self.gpg.encrypt('\n'.join(self.body), [fp])) +        self.cyphertext = str(self._gpg.encrypt('\n'.join(self.body), [fp], +                                                always_trust=always_trust))      # this will be replaced by some other mechanism of obtaining credentials      # for SMTP server. @@ -167,8 +179,12 @@ class GPGWrapper():                      return key          raise LookupError("GnuPG public key for %s not found!" % email) -    def encrypt(self, data, recipient): -        return self.gpg.encrypt(data, recipient) +    def encrypt(self, data, recipient, always_trust=True): +        # TODO: do not 'always_trust'. +        return self.gpg.encrypt(data, recipient, always_trust=always_trust) + +    def decrypt(self, data): +        return self.gpg.decrypt(data)      def import_keys(self, data):          return self.gpg.import_keys(data) diff --git a/src/leap/email/smtp/tests/__init__.py b/src/leap/email/smtp/tests/__init__.py index 3d72377e..1b2d8bd1 100644 --- a/src/leap/email/smtp/tests/__init__.py +++ b/src/leap/email/smtp/tests/__init__.py @@ -1,7 +1,6 @@ -import unittest -import gnupg +from leap.email.smtp.smtprelay import GPGWrapper  import shutil -import ipdb +from twisted.trial import unittest  class OpenPGPTestCase(unittest.TestCase): @@ -10,7 +9,7 @@ class OpenPGPTestCase(unittest.TestCase):      EMAIL      = 'leap@leap.se'      def setUp(self): -        self._gpg = gnupg.GPG(gnupghome=self.GNUPG_HOME) +        self._gpg = GPGWrapper(gpghome=self.GNUPG_HOME)          self.assertEqual(self._gpg.import_keys(PUBLIC_KEY).summary(),                           '1 imported', "error importing public key") @@ -25,6 +24,7 @@ class OpenPGPTestCase(unittest.TestCase):      def test_encrypt_decrypt(self):          text = "simple raw text"          encrypted = str(self._gpg.encrypt(text, KEY_FINGERPRINT, +                                          # TODO: handle always trust issue                                            always_trust=True))          self.assertNotEqual(text, encrypted, "failed encrypting text")          decrypted = str(self._gpg.decrypt(encrypted)) diff --git a/src/leap/email/smtp/tests/test_smtprelay.py b/src/leap/email/smtp/tests/test_smtprelay.py new file mode 100644 index 00000000..5410c75e --- /dev/null +++ b/src/leap/email/smtp/tests/test_smtprelay.py @@ -0,0 +1,70 @@ +from datetime import datetime +import re +from leap.email.smtp.smtprelay import ( +    SMTPFactory,   # a ServerFactory +    #SMTPDelivery, # an object +    #EncryptedMessage, +) +from leap.email.smtp import tests +from twisted.internet.error import ConnectionDone +from twisted.test import proto_helpers + + +class TestSmtpRelay(tests.OpenPGPTestCase): +     +    IP_REGEX = "(([0-9]|[1-9][0-9]|1[0-9]{2}|2[0-4][0-9]|25[0-5])\.){3}([0-9]|[1-9][0-9]|1[0-9]{2}|2[0-4][0-9]|25[0-5])"; +    HOSTNAME_REGEX = "(([a-zA-Z0-9]|[a-zA-Z0-9][a-zA-Z0-9\-]*[a-zA-Z0-9])\.)*([A-Za-z0-9]|[A-Za-z0-9][A-Za-z0-9\-]*[A-Za-z0-9])"; +    IP_OR_HOST_REGEX = '(' + IP_REGEX + '|' + HOSTNAME_REGEX + ')' +     +    CRLF = '\r\n' +    EMAIL_DATA = [ 'HELO relay.leap.se', +                   'MAIL FROM: <user@leap.se>', +                   'RCPT TO: <leap@leap.se>', +                   'DATA', +                   'From: User <user@leap.se>', +                   'To: Leap <leap@leap.se>', +                   'Date: ' + datetime.now().strftime('%c'), +                   'Subject: test message', +                   '', +                   'This is a secret message.', +                   'Yours,', +                   'A.', +                   '', +                   '.', +                   'QUIT' ] +    SMTP_ANSWERS = [ '220 ' + IP_OR_HOST_REGEX + ' NO UCE NO UBE NO RELAY PROBES', +                     '250 ' + IP_OR_HOST_REGEX + ' Hello ' + IP_OR_HOST_REGEX + ', nice to meet you', +                     '250 Sender address accepted', +                     '250 Recipient address accepted', +                     '354 Continue' ] +     + +    def setUp(self): +        super(TestSmtpRelay, self).setUp() +        self.proto = SMTPFactory(self._gpg).buildProtocol(('127.0.0.1',0)) +        self.transport = proto_helpers.StringTransport() +        self.proto.makeConnection(self.transport) + + +    def tearDown(self): +        self.proto.setTimeout(None) +        super(TestSmtpRelay, self).tearDown() + +     +    def assertMatch(self, string, pattern, msg=None): +        if not re.match(pattern, string): +            msg = self._formatMessage(msg, '"%s" does not match pattern "%s".' +            % (string, pattern)) +            raise self.failureException(msg) + + +    def test_send_email(self): +        """ +        If L{smtp.SMTP} receives an empty line, it responds with a 500 error +        response code and a message about a syntax error. +        """ +        for i, line in enumerate(self.EMAIL_DATA): +            self.proto.lineReceived(line+self.CRLF) +            self.assertMatch(self.transport.value(), +                             self.CRLF.join(self.SMTP_ANSWERS[0:i+1])) + | 
