summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authordrebs <drebs@leap.se>2013-05-09 18:09:07 -0300
committerdrebs <drebs@leap.se>2013-05-11 17:01:36 -0300
commitf492a7983ac4694ed8052f3ca79e5011e8711f83 (patch)
treebb3731d54f8753fe21d61f6d5899383c44fa5e3c
parenta7584ae58e7d08498199d8a556c928f7b8d6b20e (diff)
Make smtp-relay sign all outgoing message.
* Update docstrings. * Remove prepareHeader and its bug. * Make smtp-relay always sign outgoing mail.
-rw-r--r--src/leap/mail/smtp/__init__.py2
-rw-r--r--src/leap/mail/smtp/smtprelay.py129
-rw-r--r--src/leap/mail/tests/smtp/__init__.py65
-rw-r--r--src/leap/mail/tests/smtp/test_smtprelay.py142
4 files changed, 235 insertions, 103 deletions
diff --git a/src/leap/mail/smtp/__init__.py b/src/leap/mail/smtp/__init__.py
index 13af015..ace79b5 100644
--- a/src/leap/mail/smtp/__init__.py
+++ b/src/leap/mail/smtp/__init__.py
@@ -31,7 +31,7 @@ from leap.mail.smtp.smtprelay import SMTPFactory
def setup_smtp_relay(port, keymanager, smtp_host, smtp_port, smtp_username,
- smtp_password, encrypted_only):
+ smtp_password, encrypted_only):
"""
Setup SMTP relay to run with Twisted.
diff --git a/src/leap/mail/smtp/smtprelay.py b/src/leap/mail/smtp/smtprelay.py
index bd18fb5..d87dc87 100644
--- a/src/leap/mail/smtp/smtprelay.py
+++ b/src/leap/mail/smtp/smtprelay.py
@@ -40,11 +40,11 @@ from email.parser import Parser
from leap.common.check import leap_assert, leap_assert_type
from leap.common.keymanager import KeyManager
from leap.common.keymanager.openpgp import (
- encrypt_asym,
OpenPGPKey,
+ encrypt_asym,
+ sign,
)
from leap.common.keymanager.errors import KeyNotFound
-from leap.common.keymanager.keys import is_address
#
@@ -103,31 +103,25 @@ def assert_config_structure(config):
leap_assert(config[PASSWORD_KEY] != '')
-def strip_and_validate_address(address):
+def validate_address(address):
"""
- Helper function to (eventually) strip and validate an email address.
-
- This function first checks whether the incomming C{address} is of the form
- '<something>' and, if it is, then '<' and '>' are removed from the
- address. After that, a simple validation for user@provider form is
- carried.
+ Validate C{address} as defined in RFC 2822.
@param address: The address to be validated.
@type address: str
- @return: The (eventually) stripped address.
+ @return: A valid address.
@rtype: str
- @raise smtp.SMTPBadRcpt: Raised if C{address} does not have the expected
- format.
+ @raise smtp.SMTPBadRcpt: Raised if C{address} is invalid.
"""
- leap_assert(address is not None)
leap_assert_type(address, str)
+ # the following parses the address as described in RFC 2822 and
+ # returns ('', '') if the parse fails.
_, address = parseaddr(address)
- leap_assert(address != '')
- if is_address(address):
- return address
- raise smtp.SMTPBadRcpt(address)
+ if address == '':
+ raise smtp.SMTPBadRcpt(address)
+ return address
#
@@ -141,6 +135,8 @@ class SMTPFactory(ServerFactory):
def __init__(self, keymanager, config):
"""
+ Initialize the SMTP factory.
+
@param keymanager: A KeyManager for retrieving recipient's keys.
@type keymanager: leap.common.keymanager.KeyManager
@param config: A dictionary with smtp configuration. Should have
@@ -190,6 +186,8 @@ class SMTPDelivery(object):
def __init__(self, keymanager, config):
"""
+ Initialize the SMTP delivery object.
+
@param keymanager: A KeyManager for retrieving recipient's keys.
@type keymanager: leap.common.keymanager.KeyManager
@param config: A dictionary with smtp configuration. Should have
@@ -209,10 +207,11 @@ class SMTPDelivery(object):
# and store them
self._km = keymanager
self._config = config
+ self._origin = None
def receivedHeader(self, helo, origin, recipients):
"""
- Generate the Received header for a message.
+ Generate the 'Received:' header for a message.
@param helo: The argument to the HELO command and the client's IP
address.
@@ -234,12 +233,17 @@ class SMTPDelivery(object):
def validateTo(self, user):
"""
- Validate the address for which the message is destined.
+ Validate the address of C{user}, a recipient of the message.
+
+ This method is called once for each recipient and validates the
+ C{user}'s address against the RFC 2822 definition. If the
+ configuration option ENCRYPTED_ONLY_KEY is True, it also asserts the
+ existence of the user's key.
- For now, it just asserts the existence of the user's key if the
- configuration option ENCRYPTED_ONLY_KEY is True.
+ In the end, it returns an encrypted message object that is able to
+ send itself to the C{user}'s address.
- @param user: The address to validate.
+ @param user: The user whose address we wish to validate.
@type: twisted.mail.smtp.User
@return: A Deferred which becomes, or a callable which takes no
@@ -253,7 +257,7 @@ class SMTPDelivery(object):
"""
# try to find recipient's public key
try:
- address = strip_and_validate_address(user.dest.addrstr)
+ address = validate_address(user.dest.addrstr)
pubkey = self._km.get_key(address, OpenPGPKey)
log.msg("Accepting mail for %s..." % user.dest)
except KeyNotFound:
@@ -262,7 +266,8 @@ class SMTPDelivery(object):
raise smtp.SMTPBadRcpt(user.dest.addrstr)
log.msg("Warning: will send an unencrypted message (because "
"encrypted_only' is set to False).")
- return lambda: EncryptedMessage(user, self._km, self._config)
+ return lambda: EncryptedMessage(
+ self._origin, user, self._km, self._config)
def validateFrom(self, helo, origin):
"""
@@ -282,6 +287,7 @@ class SMTPDelivery(object):
"""
# accept mail from anywhere. To reject an address, raise
# smtp.SMTPBadSender here.
+ self._origin = origin
return origin
@@ -296,12 +302,14 @@ class EncryptedMessage(object):
"""
implements(smtp.IMessage)
- def __init__(self, user, keymanager, config):
+ def __init__(self, fromAddress, user, keymanager, config):
"""
Initialize the encrypted message.
- @param user: The address to validate.
- @type: twisted.mail.smtp.User
+ @param fromAddress: The address of the sender.
+ @type fromAddress: twisted.mail.smtp.Address
+ @param user: The recipient of this message.
+ @type user: twisted.mail.smtp.User
@param keymanager: A KeyManager for retrieving recipient's keys.
@type keymanager: leap.common.keymanager.KeyManager
@param config: A dictionary with smtp configuration. Should have
@@ -320,6 +328,7 @@ class EncryptedMessage(object):
leap_assert_type(keymanager, KeyManager)
assert_config_structure(config)
# and store them
+ self._fromAddress = fromAddress
self._user = user
self._km = keymanager
self._config = config
@@ -345,7 +354,7 @@ class EncryptedMessage(object):
self.lines.append('') # add a trailing newline
self.parseMessage()
try:
- self._encrypt()
+ self._encrypt_and_sign()
return self.sendMessage()
except KeyNotFound:
return None
@@ -385,12 +394,6 @@ class EncryptedMessage(object):
log.msg(e)
log.err()
- def prepareHeader(self):
- """
- Prepare the headers of the message.
- """
- self._message.replace_header('From', '<%s>' % self._user.orig.addrstr)
-
def sendMessage(self):
"""
Send the message.
@@ -402,7 +405,6 @@ class EncryptedMessage(object):
message send.
@rtype: twisted.internet.defer.Deferred
"""
- self.prepareHeader()
msg = self._message.as_string(False)
d = defer.Deferred()
factory = smtp.ESMTPSenderFactory(
@@ -424,26 +426,50 @@ class EncryptedMessage(object):
d.addErrback(self.sendError)
return d
- def _encrypt_payload_rec(self, message, pubkey):
+ def _encrypt_and_sign_payload_rec(self, message, pubkey, signkey):
"""
- Recursivelly descend in C{message}'s payload and encrypt to C{pubkey}.
+ Recursivelly descend in C{message}'s payload encrypting to C{pubkey}
+ and signing with C{signkey}.
@param message: The message whose payload we want to encrypt.
@type message: email.message.Message
@param pubkey: The public key used to encrypt the message.
@type pubkey: leap.common.keymanager.openpgp.OpenPGPKey
+ @param signkey: The private key used to sign the message.
+ @type signkey: leap.common.keymanager.openpgp.OpenPGPKey
"""
if message.is_multipart() is False:
- message.set_payload(encrypt_asym(message.get_payload(), pubkey))
+ message.set_payload(
+ encrypt_asym(
+ message.get_payload(), pubkey, sign=signkey))
else:
for msg in message.get_payload():
- self._encrypt_payload_rec(msg, pubkey)
+ self._encrypt_and_sign_payload_rec(msg, pubkey, signkey)
- def _encrypt(self):
+ def _sign_payload_rec(self, message, signkey):
+ """
+ Recursivelly descend in C{message}'s payload signing with C{signkey}.
+
+ @param message: The message whose payload we want to encrypt.
+ @type message: email.message.Message
+ @param pubkey: The public key used to encrypt the message.
+ @type pubkey: leap.common.keymanager.openpgp.OpenPGPKey
+ @param signkey: The private key used to sign the message.
+ @type signkey: leap.common.keymanager.openpgp.OpenPGPKey
+ """
+ if message.is_multipart() is False:
+ message.set_payload(
+ sign(
+ message.get_payload(), signkey))
+ else:
+ for msg in message.get_payload():
+ self._sign_payload_rec(msg, signkey)
+
+ def _encrypt_and_sign(self):
"""
Encrypt the message body.
- This method fetches the recipient key and encrypts the content to the
+ Fetch the recipient key and encrypt the content to the
recipient. If a key is not found, then the behaviour depends on the
configuration parameter ENCRYPTED_ONLY_KEY. If it is False, the message
is sent unencrypted and a warning is logged. If it is True, the
@@ -452,13 +478,18 @@ class EncryptedMessage(object):
@raise KeyNotFound: Raised when the recipient key was not found and
the ENCRYPTED_ONLY_KEY configuration parameter is set to True.
"""
+ from_address = validate_address(self._fromAddress.addrstr)
+ signkey = self._km.get_key(from_address, OpenPGPKey, private=True)
+ log.msg("Will sign the message with %s." % signkey.fingerprint)
+ to_address = validate_address(self._user.dest.addrstr)
try:
- address = strip_and_validate_address(self._user.dest.addrstr)
- pubkey = self._km.get_key(address, OpenPGPKey)
- log.msg("Encrypting to %s" % pubkey.fingerprint)
- self._encrypt_payload_rec(self._message, pubkey)
+ # try to get the recipient pubkey
+ pubkey = self._km.get_key(to_address, OpenPGPKey)
+ log.msg("Will encrypt the message to %s." % pubkey.fingerprint)
+ self._encrypt_and_sign_payload_rec(self._message, pubkey, signkey)
except KeyNotFound:
- if self._config[ENCRYPTED_ONLY_KEY]:
- raise
- log.msg("Warning: sending unencrypted mail (because "
- "'encrypted_only' is set to False).")
+ # at this point we _can_ send unencrypted mail, because if the
+ # configuration said the opposite the address would have been
+ # rejected in SMTPDelivery.validateTo().
+ self._sign_payload_rec(self._message, signkey)
+ log.msg('Will send unencrypted message to %s.' % to_address)
diff --git a/src/leap/mail/tests/smtp/__init__.py b/src/leap/mail/tests/smtp/__init__.py
index 113e047..c69c34f 100644
--- a/src/leap/mail/tests/smtp/__init__.py
+++ b/src/leap/mail/tests/smtp/__init__.py
@@ -106,6 +106,9 @@ class TestCaseWithKeyManager(BaseLeapTest):
# Key material for testing
KEY_FINGERPRINT = "E36E738D69173C13D709E44F2F455E2824D18DDF"
+
+ADDRESS = 'leap@leap.se'
+
PUBLIC_KEY = """
-----BEGIN PGP PUBLIC KEY BLOCK-----
Version: GnuPG v1.4.10 (GNU/Linux)
@@ -159,6 +162,7 @@ ZtQ/VymwFL3XdUWV6B/hU4PVAFvO3qlOtdJ6TpE+nEWgcWjCv5g7RjXX
=MuOY
-----END PGP PUBLIC KEY BLOCK-----
"""
+
PRIVATE_KEY = """
-----BEGIN PGP PRIVATE KEY BLOCK-----
Version: GnuPG v1.4.10 (GNU/Linux)
@@ -266,3 +270,64 @@ RZXoH+FTg9UAW87eqU610npOkT6cRaBxaMK/mDtGNdc=
=JTFu
-----END PGP PRIVATE KEY BLOCK-----
"""
+
+ADDRESS_2 = 'anotheruser@leap.se'
+
+PUBLIC_KEY_2 = """
+-----BEGIN PGP PUBLIC KEY BLOCK-----
+Version: GnuPG v1.4.10 (GNU/Linux)
+
+mI0EUYwJXgEEAMbTKHuPJ5/Gk34l9Z06f+0WCXTDXdte1UBoDtZ1erAbudgC4MOR
+gquKqoj3Hhw0/ILqJ88GcOJmKK/bEoIAuKaqlzDF7UAYpOsPZZYmtRfPC2pTCnXq
+Z1vdeqLwTbUspqXflkCkFtfhGKMq5rH8GV5a3tXZkRWZhdNwhVXZagC3ABEBAAG0
+IWFub3RoZXJ1c2VyIDxhbm90aGVydXNlckBsZWFwLnNlPoi4BBMBAgAiBQJRjAle
+AhsDBgsJCAcDAgYVCAIJCgsEFgIDAQIeAQIXgAAKCRB/nfpof+5XWotuA/4tLN4E
+gUr7IfLy2HkHAxzw7A4rqfMN92DIM9mZrDGaWRrOn3aVF7VU1UG7MDkHfPvp/cFw
+ezoCw4s4IoHVc/pVlOkcHSyt4/Rfh248tYEJmFCJXGHpkK83VIKYJAithNccJ6Q4
+JE/o06Mtf4uh/cA1HUL4a4ceqUhtpLJULLeKo7iNBFGMCV4BBADsyQI7GR0wSAxz
+VayLjuPzgT+bjbFeymIhjuxKIEwnIKwYkovztW+4bbOcQs785k3Lp6RzvigTpQQt
+Z/hwcLOqZbZw8t/24+D+Pq9mMP2uUvCFFqLlVvA6D3vKSQ/XNN+YB919WQ04jh63
+yuRe94WenT1RJd6xU1aaUff4rKizuQARAQABiJ8EGAECAAkFAlGMCV4CGwwACgkQ
+f536aH/uV1rPZQQAqCzRysOlu8ez7PuiBD4SebgRqWlxa1TF1ujzfLmuPivROZ2X
+Kw5aQstxgGSjoB7tac49s0huh4X8XK+BtJBfU84JS8Jc2satlfwoyZ35LH6sDZck
+I+RS/3we6zpMfHs3vvp9xgca6ZupQxivGtxlJs294TpJorx+mFFqbV17AzQ=
+=Thdu
+-----END PGP PUBLIC KEY BLOCK-----
+"""
+
+PRIVATE_KEY_2 = """
+-----BEGIN PGP PRIVATE KEY BLOCK-----
+Version: GnuPG v1.4.10 (GNU/Linux)
+
+lQHYBFGMCV4BBADG0yh7jyefxpN+JfWdOn/tFgl0w13bXtVAaA7WdXqwG7nYAuDD
+kYKriqqI9x4cNPyC6ifPBnDiZiiv2xKCALimqpcwxe1AGKTrD2WWJrUXzwtqUwp1
+6mdb3Xqi8E21LKal35ZApBbX4RijKuax/BleWt7V2ZEVmYXTcIVV2WoAtwARAQAB
+AAP7BLuSAx7tOohnimEs74ks8l/L6dOcsFQZj2bqs4AoY3jFe7bV0tHr4llypb/8
+H3/DYvpf6DWnCjyUS1tTnXSW8JXtx01BUKaAufSmMNg9blKV6GGHlT/Whe9uVyks
+7XHk/+9mebVMNJ/kNlqq2k+uWqJohzC8WWLRK+d1tBeqDsECANZmzltPaqUsGV5X
+C3zszE3tUBgptV/mKnBtopKi+VH+t7K6fudGcG+bAcZDUoH/QVde52mIIjjIdLje
+uajJuHUCAO1mqh+vPoGv4eBLV7iBo3XrunyGXiys4a39eomhxTy3YktQanjjx+ty
+GltAGCs5PbWGO6/IRjjvd46wh53kzvsCAO0J97gsWhzLuFnkxFAJSPk7RRlyl7lI
+1XS/x0Og6j9XHCyY1OYkfBm0to3UlCfkgirzCYlTYObCofzdKFIPDmSqHbQhYW5v
+dGhlcnVzZXIgPGFub3RoZXJ1c2VyQGxlYXAuc2U+iLgEEwECACIFAlGMCV4CGwMG
+CwkIBwMCBhUIAgkKCwQWAgMBAh4BAheAAAoJEH+d+mh/7ldai24D/i0s3gSBSvsh
+8vLYeQcDHPDsDiup8w33YMgz2ZmsMZpZGs6fdpUXtVTVQbswOQd8++n9wXB7OgLD
+izgigdVz+lWU6RwdLK3j9F+Hbjy1gQmYUIlcYemQrzdUgpgkCK2E1xwnpDgkT+jT
+oy1/i6H9wDUdQvhrhx6pSG2kslQst4qjnQHYBFGMCV4BBADsyQI7GR0wSAxzVayL
+juPzgT+bjbFeymIhjuxKIEwnIKwYkovztW+4bbOcQs785k3Lp6RzvigTpQQtZ/hw
+cLOqZbZw8t/24+D+Pq9mMP2uUvCFFqLlVvA6D3vKSQ/XNN+YB919WQ04jh63yuRe
+94WenT1RJd6xU1aaUff4rKizuQARAQABAAP9EyElqJ3dq3EErXwwT4mMnbd1SrVC
+rUJrNWQZL59mm5oigS00uIyR0SvusOr+UzTtd8ysRuwHy5d/LAZsbjQStaOMBILx
+77TJveOel0a1QK0YSMF2ywZMCKvquvjli4hAtWYz/EwfuzQN3t23jc5ny+GqmqD2
+3FUxLJosFUfLNmECAO9KhVmJi+L9dswIs+2Dkjd1eiRQzNOEVffvYkGYZyKxNiXF
+UA5kvyZcB4iAN9sWCybE4WHZ9jd4myGB0MPDGxkCAP1RsXJbbuD6zS7BXe5gwunO
+2q4q7ptdSl/sJYQuTe1KNP5d/uGsvlcFfsYjpsopasPjFBIncc/2QThMKlhoEaEB
+/0mVAxpT6SrEvUbJ18z7kna24SgMPr3OnPMxPGfvNLJY/Xv/A17YfoqjmByCvsKE
+JCDjopXtmbcrZyoEZbEht9mko4ifBBgBAgAJBQJRjAleAhsMAAoJEH+d+mh/7lda
+z2UEAKgs0crDpbvHs+z7ogQ+Enm4EalpcWtUxdbo83y5rj4r0TmdlysOWkLLcYBk
+o6Ae7WnOPbNIboeF/FyvgbSQX1POCUvCXNrGrZX8KMmd+Sx+rA2XJCPkUv98Hus6
+THx7N776fcYHGumbqUMYrxrcZSbNveE6SaK8fphRam1dewM0
+=a5gs
+-----END PGP PRIVATE KEY BLOCK-----
+"""
+
diff --git a/src/leap/mail/tests/smtp/test_smtprelay.py b/src/leap/mail/tests/smtp/test_smtprelay.py
index 6ef4e85..e48f129 100644
--- a/src/leap/mail/tests/smtp/test_smtprelay.py
+++ b/src/leap/mail/tests/smtp/test_smtprelay.py
@@ -28,6 +28,7 @@ from datetime import datetime
from twisted.test import proto_helpers
from twisted.mail.smtp import (
User,
+ Address,
SMTPBadRcpt,
)
from mock import Mock
@@ -37,7 +38,11 @@ from leap.mail.smtp.smtprelay import (
SMTPFactory,
EncryptedMessage,
)
-from leap.mail.tests.smtp import TestCaseWithKeyManager
+from leap.mail.tests.smtp import (
+ TestCaseWithKeyManager,
+ ADDRESS,
+ ADDRESS_2,
+)
from leap.common.keymanager import openpgp
@@ -52,11 +57,11 @@ IP_OR_HOST_REGEX = '(' + IP_REGEX + '|' + HOSTNAME_REGEX + ')'
class TestSmtpRelay(TestCaseWithKeyManager):
EMAIL_DATA = ['HELO relay.leap.se',
- 'MAIL FROM: <user@leap.se>',
- 'RCPT TO: <leap@leap.se>',
+ 'MAIL FROM: <%s>' % ADDRESS_2,
+ 'RCPT TO: <%s>' % ADDRESS,
'DATA',
- 'From: User <user@leap.se>',
- 'To: Leap <leap@leap.se>',
+ 'From: User <%s>' % ADDRESS_2,
+ 'To: Leap <%s>' % ADDRESS,
'Date: ' + datetime.now().strftime('%c'),
'Subject: test message',
'',
@@ -77,13 +82,15 @@ class TestSmtpRelay(TestCaseWithKeyManager):
"Test if openpgp can encrypt and decrypt."
text = "simple raw text"
pubkey = self._km.get_key(
- 'leap@leap.se', openpgp.OpenPGPKey, private=False)
+ ADDRESS, openpgp.OpenPGPKey, private=False)
encrypted = openpgp.encrypt_asym(text, pubkey)
- self.assertNotEqual(text, encrypted, "failed encrypting text")
+ self.assertNotEqual(
+ text, encrypted, "Ciphertext is equal to plaintext.")
privkey = self._km.get_key(
- 'leap@leap.se', openpgp.OpenPGPKey, private=True)
+ ADDRESS, openpgp.OpenPGPKey, private=True)
decrypted = openpgp.decrypt_asym(encrypted, privkey)
- self.assertEqual(text, decrypted, "failed decrypting text")
+ self.assertEqual(text, decrypted,
+ "Decrypted text differs from plaintext.")
def test_relay_accepts_valid_email(self):
"""
@@ -104,7 +111,8 @@ class TestSmtpRelay(TestCaseWithKeyManager):
for i, line in enumerate(self.EMAIL_DATA):
proto.lineReceived(line + '\r\n')
self.assertMatch(transport.value(),
- '\r\n'.join(SMTP_ANSWERS[0:i + 1]))
+ '\r\n'.join(SMTP_ANSWERS[0:i + 1]),
+ 'Did not get expected answer from relay.')
proto.setTimeout(None)
def test_message_encrypt(self):
@@ -113,17 +121,77 @@ class TestSmtpRelay(TestCaseWithKeyManager):
"""
proto = SMTPFactory(
self._km, self._config).buildProtocol(('127.0.0.1', 0))
- user = User('leap@leap.se', 'relay.leap.se', proto, 'leap@leap.se')
- m = EncryptedMessage(user, self._km, self._config)
+ fromAddr = Address(ADDRESS_2)
+ dest = User(ADDRESS, 'relay.leap.se', proto, ADDRESS)
+ m = EncryptedMessage(fromAddr, dest, self._km, self._config)
for line in self.EMAIL_DATA[4:12]:
m.lineReceived(line)
m.eomReceived()
privkey = self._km.get_key(
- 'leap@leap.se', openpgp.OpenPGPKey, private=True)
+ ADDRESS, openpgp.OpenPGPKey, private=True)
decrypted = openpgp.decrypt_asym(m._message.get_payload(), privkey)
self.assertEqual(
'\r\n'.join(self.EMAIL_DATA[9:12]) + '\r\n',
- decrypted)
+ decrypted,
+ 'Decrypted text differs from plaintext.')
+
+ def test_message_encrypt_sign(self):
+ """
+ Test if message gets encrypted to destination email and signed with
+ sender key.
+ """
+ proto = SMTPFactory(
+ self._km, self._config).buildProtocol(('127.0.0.1', 0))
+ user = User(ADDRESS, 'relay.leap.se', proto, ADDRESS)
+ fromAddr = Address(ADDRESS_2)
+ m = EncryptedMessage(fromAddr, user, self._km, self._config)
+ for line in self.EMAIL_DATA[4:12]:
+ m.lineReceived(line)
+ # trigger encryption and signing
+ m.eomReceived()
+ # decrypt and verify
+ privkey = self._km.get_key(
+ ADDRESS, openpgp.OpenPGPKey, private=True)
+ pubkey = self._km.get_key(ADDRESS_2, openpgp.OpenPGPKey)
+ decrypted = openpgp.decrypt_asym(
+ m._message.get_payload(), privkey, verify=pubkey)
+ self.assertEqual(
+ '\r\n'.join(self.EMAIL_DATA[9:12]) + '\r\n',
+ decrypted,
+ 'Decrypted text differs from plaintext.')
+
+ def test_message_sign(self):
+ """
+ Test if message is signed with sender key.
+ """
+ # mock the key fetching
+ self._km.fetch_keys_from_server = Mock(return_value=[])
+ proto = SMTPFactory(
+ self._km, self._config).buildProtocol(('127.0.0.1', 0))
+ user = User('ihavenopubkey@nonleap.se', 'relay.leap.se', proto, ADDRESS)
+ fromAddr = Address(ADDRESS_2)
+ m = EncryptedMessage(fromAddr, user, self._km, self._config)
+ for line in self.EMAIL_DATA[4:12]:
+ m.lineReceived(line)
+ # trigger signing
+ m.eomReceived()
+ # assert content of message
+ self.assertTrue(
+ m._message.get_payload().startswith(
+ '-----BEGIN PGP SIGNED MESSAGE-----\n' +
+ 'Hash: SHA1\n\n' +
+ ('\r\n'.join(self.EMAIL_DATA[9:12]) + '\r\n' +
+ '-----BEGIN PGP SIGNATURE-----\n')),
+ 'Message does not start with signature header.')
+ self.assertTrue(
+ m._message.get_payload().endswith(
+ '-----END PGP SIGNATURE-----\n'),
+ 'Message does not end with signature footer.')
+ # assert signature is valid
+ pubkey = self._km.get_key(ADDRESS_2, openpgp.OpenPGPKey)
+ self.assertTrue(
+ openpgp.verify(m._message.get_payload(), pubkey),
+ 'Signature could not be verified.')
def test_missing_key_rejects_address(self):
"""
@@ -131,7 +199,7 @@ class TestSmtpRelay(TestCaseWithKeyManager):
True.
"""
# remove key from key manager
- pubkey = self._km.get_key('leap@leap.se', openpgp.OpenPGPKey)
+ pubkey = self._km.get_key(ADDRESS, openpgp.OpenPGPKey)
pgp = openpgp.OpenPGPScheme(self._soledad)
pgp.delete_key(pubkey)
# mock the key fetching
@@ -148,7 +216,8 @@ class TestSmtpRelay(TestCaseWithKeyManager):
lines = transport.value().rstrip().split('\n')
self.assertEqual(
'550 Cannot receive for specified address',
- lines[-1])
+ lines[-1],
+ 'Address should have been rejecetd with appropriate message.')
def test_missing_key_accepts_address(self):
"""
@@ -156,7 +225,7 @@ class TestSmtpRelay(TestCaseWithKeyManager):
False.
"""
# remove key from key manager
- pubkey = self._km.get_key('leap@leap.se', openpgp.OpenPGPKey)
+ pubkey = self._km.get_key(ADDRESS, openpgp.OpenPGPKey)
pgp = openpgp.OpenPGPScheme(self._soledad)
pgp.delete_key(pubkey)
# mock the key fetching
@@ -171,42 +240,9 @@ class TestSmtpRelay(TestCaseWithKeyManager):
proto.lineReceived(self.EMAIL_DATA[0] + '\r\n')
proto.lineReceived(self.EMAIL_DATA[1] + '\r\n')
proto.lineReceived(self.EMAIL_DATA[2] + '\r\n')
- # ensure the address was rejected
+ # ensure the address was accepted
lines = transport.value().rstrip().split('\n')
self.assertEqual(
'250 Recipient address accepted',
- lines[-1])
-
- def test_malformed_address_rejects(self):
- """
- Test if server rejects to send to malformed addresses.
- """
- # mock the key fetching
- self._km.fetch_keys_from_server = Mock(return_value=[])
- # prepare the SMTP factory
- for malformed in ['leap@']:
- proto = SMTPFactory(
- self._km, self._config).buildProtocol(('127.0.0.1', 0))
- transport = proto_helpers.StringTransport()
- proto.makeConnection(transport)
- proto.lineReceived(self.EMAIL_DATA[0] + '\r\n')
- proto.lineReceived(self.EMAIL_DATA[1] + '\r\n')
- proto.lineReceived('RCPT TO: <%s>%s' % (malformed, '\r\n'))
- # ensure the address was rejected
- lines = transport.value().rstrip().split('\n')
- self.assertEqual(
- '550 Cannot receive for specified address',
- lines[-1])
-
- def test_prepare_header_adds_from(self):
- """
- Test if message headers are OK.
- """
- proto = SMTPFactory(
- self._km, self._config).buildProtocol(('127.0.0.1', 0))
- user = User('leap@leap.se', 'relay.leap.se', proto, 'leap@leap.se')
- m = EncryptedMessage(user, self._km, self._config)
- for line in self.EMAIL_DATA[4:12]:
- m.lineReceived(line)
- m.eomReceived()
- self.assertEqual('<leap@leap.se>', m._message['From'])
+ lines[-1],
+ 'Address should have been accepted with appropriate message.')