summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--mail/src/leap/mail/outgoing/service.py43
-rw-r--r--mail/src/leap/mail/outgoing/tests/test_outgoing.py68
2 files changed, 72 insertions, 39 deletions
diff --git a/mail/src/leap/mail/outgoing/service.py b/mail/src/leap/mail/outgoing/service.py
index 97771870..c8818304 100644
--- a/mail/src/leap/mail/outgoing/service.py
+++ b/mail/src/leap/mail/outgoing/service.py
@@ -16,6 +16,7 @@
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import re
from StringIO import StringIO
+from copy import deepcopy
from email.parser import Parser
from email.mime.application import MIMEApplication
from email.mime.multipart import MIMEMultipart
@@ -33,7 +34,7 @@ from leap.common.check import leap_assert_type, leap_assert
from leap.common.events import proto, signal
from leap.keymanager import KeyManager
from leap.keymanager.openpgp import OpenPGPKey
-from leap.keymanager.errors import KeyNotFound
+from leap.keymanager.errors import KeyNotFound, KeyAddressMismatch
from leap.mail import __version__
from leap.mail.utils import validate_address
from leap.mail.smtp.rfc3156 import MultipartEncrypted
@@ -229,32 +230,37 @@ class OutgoingMail:
username, domain = from_address.split('@')
to_address = validate_address(recipient.dest.addrstr)
+ def maybe_encrypt_and_sign(message):
+ d = self._encrypt_and_sign(message, to_address, from_address)
+ d.addCallbacks(signal_encrypt_sign,
+ if_key_not_found_send_unencrypted,
+ errbackArgs=(message,))
+ return d
+
def signal_encrypt_sign(newmsg):
signal(proto.SMTP_END_ENCRYPT_AND_SIGN,
"%s,%s" % (self._from_address, to_address))
return newmsg, recipient
+ def if_key_not_found_send_unencrypted(failure, message):
+ failure.trap(KeyNotFound, KeyAddressMismatch)
+
+ log.msg('Will send unencrypted message to %s.' % to_address)
+ signal(proto.SMTP_START_SIGN, self._from_address)
+ d = self._sign(message, from_address)
+ d.addCallback(signal_sign)
+ return d
+
def signal_sign(newmsg):
signal(proto.SMTP_END_SIGN, self._from_address)
return newmsg, recipient
- def if_key_not_found_send_unencrypted(failure):
- if failure.check(KeyNotFound):
- log.msg('Will send unencrypted message to %s.' % to_address)
- signal(proto.SMTP_START_SIGN, self._from_address)
- d = self._sign(origmsg, from_address)
- d.addCallback(signal_sign)
- return d
- else:
- return failure
-
log.msg("Will encrypt the message with %s and sign with %s."
% (to_address, from_address))
signal(proto.SMTP_START_ENCRYPT_AND_SIGN,
"%s,%s" % (self._from_address, to_address))
d = self._maybe_attach_key(origmsg, from_address, to_address)
- d.addCallback(self._encrypt_and_sign, to_address, from_address)
- d.addCallbacks(signal_encrypt_sign, if_key_not_found_send_unencrypted)
+ d.addCallback(maybe_encrypt_and_sign)
return d
def _maybe_attach_key(self, origmsg, from_address, to_address):
@@ -267,7 +273,9 @@ class OutgoingMail:
# XXX: this might not be true some time in the future
if to_key.sign_used:
return origmsg
+ return get_key_and_attach(None)
+ def get_key_and_attach(_):
d = self._keymanager.get_key(from_address, OpenPGPKey,
fetch_remote=False)
d.addCallback(attach_key)
@@ -290,7 +298,7 @@ class OutgoingMail:
d = self._keymanager.get_key(to_address, OpenPGPKey,
fetch_remote=False)
- d.addCallback(attach_if_address_hasnt_encrypted)
+ d.addCallbacks(attach_if_address_hasnt_encrypted, get_key_and_attach)
d.addErrback(lambda _: origmsg)
return d
@@ -386,7 +394,7 @@ class OutgoingMail:
d.addCallback(create_signed_message)
return d
- def _fix_headers(self, origmsg, newmsg, sign_address):
+ def _fix_headers(self, msg, newmsg, sign_address):
"""
Move some headers from C{origmsg} to C{newmsg}, delete unwanted
headers from C{origmsg} and add new headers to C{newms}.
@@ -416,8 +424,8 @@ class OutgoingMail:
- User-Agent
- :param origmsg: The original message.
- :type origmsg: email.message.Message
+ :param msg: The original message.
+ :type msg: email.message.Message
:param newmsg: The new message being created.
:type newmsg: email.message.Message
:param sign_address: The address used to sign C{newmsg}
@@ -428,6 +436,7 @@ class OutgoingMail:
original Message with headers removed)
:rtype: Deferred
"""
+ origmsg = deepcopy(msg)
# move headers from origmsg to newmsg
headers = origmsg.items()
passthrough = [
diff --git a/mail/src/leap/mail/outgoing/tests/test_outgoing.py b/mail/src/leap/mail/outgoing/tests/test_outgoing.py
index 0eb05c87..2376da91 100644
--- a/mail/src/leap/mail/outgoing/tests/test_outgoing.py
+++ b/mail/src/leap/mail/outgoing/tests/test_outgoing.py
@@ -21,6 +21,7 @@ SMTP gateway tests.
"""
import re
+from StringIO import StringIO
from email.parser import Parser
from datetime import datetime
from twisted.internet.defer import fail
@@ -29,6 +30,7 @@ from twisted.mail.smtp import User
from mock import Mock
from leap.mail.smtp.gateway import SMTPFactory
+from leap.mail.smtp.rfc3156 import RFC3156CompliantGenerator
from leap.mail.outgoing.service import OutgoingMail
from leap.mail.tests import (
TestCaseWithKeyManager,
@@ -149,8 +151,11 @@ class TestOutgoingMail(TestCaseWithKeyManager):
message.get_param('protocol'))
self.assertEqual('pgp-sha512', message.get_param('micalg'))
# assert content of message
+ body = (message.get_payload(0)
+ .get_payload(0)
+ .get_payload(decode=True))
self.assertEqual(self.expected_body,
- message.get_payload(0).get_payload(decode=True))
+ body)
# assert content of signature
self.assertTrue(
message.get_payload(1).get_payload().startswith(
@@ -164,8 +169,12 @@ class TestOutgoingMail(TestCaseWithKeyManager):
def verify(message):
# replace EOL before verifying (according to rfc3156)
+ fp = StringIO()
+ g = RFC3156CompliantGenerator(
+ fp, mangle_from_=False, maxheaderlen=76)
+ g.flatten(message.get_payload(0))
signed_text = re.sub('\r?\n', '\r\n',
- message.get_payload(0).as_string())
+ fp.getvalue())
def assert_verify(key):
self.assertTrue(ADDRESS_2 in key.address,
@@ -183,32 +192,47 @@ class TestOutgoingMail(TestCaseWithKeyManager):
return d
def test_attach_key(self):
- def check_headers(message):
- msgstr = message.as_string(unixfrom=False)
- for header in self.EMAIL_DATA[4:8]:
- self.assertTrue(header in msgstr,
- "Missing header: %s" % (header,))
- return message
-
- def check_attachment((decrypted, _)):
- msg = Parser().parsestr(decrypted)
- for payload in msg.get_payload():
- if 'application/pgp-keys' == payload.get_content_type():
- keylines = PUBLIC_KEY_2.split('\n')
- key = BEGIN_PUBLIC_KEY + '\n\n' + '\n'.join(keylines[4:-1])
- self.assertTrue(key in payload.get_payload(),
- "Key attachment don't match")
- return
- self.fail("No public key attachment found")
-
d = self.outgoing_mail._maybe_encrypt_and_sign(self.raw, self.dest)
d.addCallback(self._assert_encrypted)
- d.addCallback(check_headers)
+ d.addCallback(self._check_headers, self.lines[:4])
d.addCallback(lambda message: self._km.decrypt(
message.get_payload(1).get_payload(), ADDRESS, openpgp.OpenPGPKey))
- d.addCallback(check_attachment)
+ d.addCallback(lambda (decrypted, _):
+ self._check_key_attachment(Parser().parsestr(decrypted)))
return d
+ def test_attach_key_not_known(self):
+ address = "someunknownaddress@somewhere.com"
+ lines = self.lines
+ lines[1] = "To: <%s>" % (address,)
+ raw = '\r\n'.join(lines)
+ dest = User(address, 'gateway.leap.se', self.proto, ADDRESS_2)
+
+ d = self.outgoing_mail._maybe_encrypt_and_sign(raw, dest)
+ d.addCallback(lambda (message, _):
+ self._check_headers(message, lines[:4]))
+ d.addCallback(self._check_key_attachment)
+ return d
+
+ def _check_headers(self, message, headers):
+ msgstr = message.as_string(unixfrom=False)
+ for header in headers:
+ self.assertTrue(header in msgstr,
+ "Missing header: %s" % (header,))
+ return message
+
+ def _check_key_attachment(self, message):
+ for payload in message.get_payload():
+ if payload.is_multipart():
+ return self._check_key_attachment(payload)
+ if 'application/pgp-keys' == payload.get_content_type():
+ keylines = PUBLIC_KEY_2.split('\n')
+ key = BEGIN_PUBLIC_KEY + '\n\n' + '\n'.join(keylines[4:-1])
+ self.assertTrue(key in payload.get_payload(decode=True),
+ "Key attachment don't match")
+ return
+ self.fail("No public key attachment found")
+
def _set_sign_used(self, address):
def set_sign(key):
key.sign_used = True