summaryrefslogtreecommitdiff
path: root/src/leap/bitmask/mail/outgoing/tests/test_outgoing.py
diff options
context:
space:
mode:
Diffstat (limited to 'src/leap/bitmask/mail/outgoing/tests/test_outgoing.py')
-rw-r--r--src/leap/bitmask/mail/outgoing/tests/test_outgoing.py263
1 files changed, 0 insertions, 263 deletions
diff --git a/src/leap/bitmask/mail/outgoing/tests/test_outgoing.py b/src/leap/bitmask/mail/outgoing/tests/test_outgoing.py
deleted file mode 100644
index dd053c15..00000000
--- a/src/leap/bitmask/mail/outgoing/tests/test_outgoing.py
+++ /dev/null
@@ -1,263 +0,0 @@
-# -*- coding: utf-8 -*-
-# test_gateway.py
-# Copyright (C) 2013 LEAP
-#
-# This program is free software: you can redistribute it and/or modify
-# it under the terms of the GNU General Public License as published by
-# the Free Software Foundation, either version 3 of the License, or
-# (at your option) any later version.
-#
-# This program is distributed in the hope that it will be useful,
-# but WITHOUT ANY WARRANTY; without even the implied warranty of
-# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
-# GNU General Public License for more details.
-#
-# You should have received a copy of the GNU General Public License
-# along with this program. If not, see <http://www.gnu.org/licenses/>.
-
-
-"""
-SMTP gateway tests.
-"""
-import re
-from copy import deepcopy
-from StringIO import StringIO
-from email.parser import Parser
-from datetime import datetime
-from twisted.internet.defer import fail
-from twisted.mail.smtp import User
-from twisted.python import log
-
-from mock import Mock
-
-from leap.mail.rfc3156 import RFC3156CompliantGenerator
-from leap.mail.outgoing.service import OutgoingMail
-from leap.mail.testing import ADDRESS, ADDRESS_2, PUBLIC_KEY_2
-from leap.mail.testing import KeyManagerWithSoledadTestCase
-from leap.mail.testing.smtp import getSMTPFactory
-from leap.keymanager import errors
-
-
-BEGIN_PUBLIC_KEY = "-----BEGIN PGP PUBLIC KEY BLOCK-----"
-
-TEST_USER = u'anotheruser@leap.se'
-
-
-class TestOutgoingMail(KeyManagerWithSoledadTestCase):
- EMAIL_DATA = ['HELO gateway.leap.se',
- 'MAIL FROM: <%s>' % ADDRESS_2,
- 'RCPT TO: <%s>' % ADDRESS,
- 'DATA',
- 'From: User <%s>' % ADDRESS_2,
- 'To: Leap <%s>' % ADDRESS,
- 'Date: ' + datetime.now().strftime('%c'),
- 'Subject: test message',
- '',
- 'This is a secret message.',
- 'Yours,',
- 'A.',
- '',
- '.',
- 'QUIT']
-
- def setUp(self):
- self.lines = [line for line in self.EMAIL_DATA[4:12]]
- self.lines.append('') # add a trailing newline
- self.raw = '\r\n'.join(self.lines)
- self.expected_body = '\r\n'.join(self.EMAIL_DATA[9:12]) + "\r\n"
- self.fromAddr = ADDRESS_2
-
- class opts:
- cert = u'/tmp/cert'
- key = u'/tmp/cert'
- hostname = 'remote'
- port = 666
- self.opts = opts
-
- def init_outgoing_and_proto(_):
- self.outgoing_mail = OutgoingMail(
- self.fromAddr, self.km, opts.cert,
- opts.key, opts.hostname, opts.port)
-
- user = TEST_USER
-
- # TODO -- this shouldn't need SMTP to be tested!? or does it?
- self.proto = getSMTPFactory(
- {user: None}, {user: self.km}, {user: None})
- self.dest = User(ADDRESS, 'gateway.leap.se', self.proto, ADDRESS_2)
-
- d = KeyManagerWithSoledadTestCase.setUp(self)
- d.addCallback(init_outgoing_and_proto)
- return d
-
- def test_message_encrypt(self):
- """
- Test if message gets encrypted to destination email.
- """
- def check_decryption(res):
- decrypted, _ = res
- self.assertEqual(
- '\n' + self.expected_body,
- decrypted,
- 'Decrypted text differs from plaintext.')
-
- d = self._set_sign_used(ADDRESS)
- d.addCallback(
- lambda _:
- self.outgoing_mail._maybe_encrypt_and_sign(self.raw, self.dest))
- d.addCallback(self._assert_encrypted)
- d.addCallback(lambda message: self.km.decrypt(
- message.get_payload(1).get_payload(), ADDRESS))
- d.addCallback(check_decryption)
- return d
-
- def test_message_encrypt_sign(self):
- """
- Test if message gets encrypted to destination email and signed with
- sender key.
- '"""
- def check_decryption_and_verify(res):
- decrypted, signkey = res
- self.assertEqual(
- '\n' + self.expected_body,
- decrypted,
- 'Decrypted text differs from plaintext.')
- self.assertTrue(ADDRESS_2 in signkey.address,
- "Verification failed")
-
- d = self._set_sign_used(ADDRESS)
- d.addCallback(
- lambda _:
- self.outgoing_mail._maybe_encrypt_and_sign(self.raw, self.dest))
- d.addCallback(self._assert_encrypted)
- d.addCallback(lambda message: self.km.decrypt(
- message.get_payload(1).get_payload(), ADDRESS, verify=ADDRESS_2))
- d.addCallback(check_decryption_and_verify)
- return d
-
- def test_message_sign(self):
- """
- Test if message is signed with sender key.
- """
- # mock the key fetching
- self.km._fetch_keys_from_server = Mock(
- return_value=fail(errors.KeyNotFound()))
- recipient = User('ihavenopubkey@nonleap.se',
- 'gateway.leap.se', self.proto, ADDRESS)
- self.outgoing_mail = OutgoingMail(
- self.fromAddr, self.km, self.opts.cert, self.opts.key,
- self.opts.hostname, self.opts.port)
-
- def check_signed(res):
- message, _ = res
- self.assertTrue('Content-Type' in message)
- self.assertEqual('multipart/signed', message.get_content_type())
- self.assertEqual('application/pgp-signature',
- message.get_param('protocol'))
- self.assertEqual('pgp-sha512', message.get_param('micalg'))
- # assert content of message
- body = (message.get_payload(0)
- .get_payload(0)
- .get_payload(decode=True))
- self.assertEqual(self.expected_body,
- body)
- # assert content of signature
- self.assertTrue(
- message.get_payload(1).get_payload().startswith(
- '-----BEGIN PGP SIGNATURE-----\n'),
- 'Message does not start with signature header.')
- self.assertTrue(
- message.get_payload(1).get_payload().endswith(
- '-----END PGP SIGNATURE-----\n'),
- 'Message does not end with signature footer.')
- return message
-
- def verify(message):
- # replace EOL before verifying (according to rfc3156)
- fp = StringIO()
- g = RFC3156CompliantGenerator(
- fp, mangle_from_=False, maxheaderlen=76)
- g.flatten(message.get_payload(0))
- signed_text = re.sub('\r?\n', '\r\n',
- fp.getvalue())
-
- def assert_verify(key):
- self.assertTrue(ADDRESS_2 in key.address,
- 'Signature could not be verified.')
-
- d = self.km.verify(
- signed_text, ADDRESS_2,
- detached_sig=message.get_payload(1).get_payload())
- d.addCallback(assert_verify)
- return d
-
- d = self.outgoing_mail._maybe_encrypt_and_sign(self.raw, recipient)
- d.addCallback(check_signed)
- d.addCallback(verify)
- return d
-
- def test_attach_key(self):
- d = self.outgoing_mail._maybe_encrypt_and_sign(self.raw, self.dest)
- d.addCallback(self._assert_encrypted)
- d.addCallback(self._check_headers, self.lines[:4])
- d.addCallback(lambda message: self.km.decrypt(
- message.get_payload(1).get_payload(), ADDRESS))
- d.addCallback(lambda (decrypted, _):
- self._check_key_attachment(Parser().parsestr(decrypted)))
- return d
-
- def test_attach_key_not_known(self):
- unknown_address = "someunknownaddress@somewhere.com"
- lines = deepcopy(self.lines)
- lines[1] = "To: <%s>" % (unknown_address,)
- raw = '\r\n'.join(lines)
- dest = User(unknown_address, 'gateway.leap.se', self.proto, ADDRESS_2)
-
- d = self.outgoing_mail._maybe_encrypt_and_sign(
- raw, dest, fetch_remote=False)
- d.addCallback(lambda (message, _):
- self._check_headers(message, lines[:4]))
- d.addCallback(self._check_key_attachment)
- d.addErrback(log.err)
- return d
-
- def _check_headers(self, message, headers):
- msgstr = message.as_string(unixfrom=False)
- for header in headers:
- self.assertTrue(header in msgstr,
- "Missing header: %s" % (header,))
- return message
-
- def _check_key_attachment(self, message):
- for payload in message.get_payload():
- if payload.is_multipart():
- return self._check_key_attachment(payload)
- if 'application/pgp-keys' == payload.get_content_type():
- keylines = PUBLIC_KEY_2.split('\n')
- key = BEGIN_PUBLIC_KEY + '\n\n' + '\n'.join(keylines[4:-1])
- self.assertTrue(key in payload.get_payload(decode=True),
- "Key attachment don't match")
- return
- self.fail("No public key attachment found")
-
- def _set_sign_used(self, address):
- def set_sign(key):
- key.sign_used = True
- return self.km.put_key(key)
-
- d = self.km.get_key(address, fetch_remote=False)
- d.addCallback(set_sign)
- return d
-
- def _assert_encrypted(self, res):
- message, _ = res
- self.assertTrue('Content-Type' in message)
- self.assertEqual('multipart/encrypted', message.get_content_type())
- self.assertEqual('application/pgp-encrypted',
- message.get_param('protocol'))
- self.assertEqual(2, len(message.get_payload()))
- self.assertEqual('application/pgp-encrypted',
- message.get_payload(0).get_content_type())
- self.assertEqual('application/octet-stream',
- message.get_payload(1).get_content_type())
- return message