summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorRuben Pollan <meskio@sindominio.net>2014-12-09 12:18:40 -0600
committerKali Kaneko <kali@leap.se>2015-01-21 15:07:19 -0400
commit02a88688344070120ef09287c5d7cb654bc28e6e (patch)
tree03e50f397188e30352b9bcf646940f6619f3e2c1 /src
parent1f6687d1375ff97f1ad0746e45f91f922866f32d (diff)
New keymanager async API
Diffstat (limited to 'src')
-rw-r--r--src/leap/mail/imap/fetch.py312
-rw-r--r--src/leap/mail/imap/tests/test_incoming_mail.py38
-rw-r--r--src/leap/mail/service.py201
-rw-r--r--src/leap/mail/smtp/gateway.py38
-rw-r--r--src/leap/mail/smtp/tests/test_gateway.py25
-rw-r--r--src/leap/mail/tests/__init__.py88
-rw-r--r--src/leap/mail/tests/test_service.py216
7 files changed, 474 insertions, 444 deletions
diff --git a/src/leap/mail/imap/fetch.py b/src/leap/mail/imap/fetch.py
index 01373be..dbc726a 100644
--- a/src/leap/mail/imap/fetch.py
+++ b/src/leap/mail/imap/fetch.py
@@ -36,7 +36,6 @@ from twisted.internet import defer, reactor
from twisted.internet.task import LoopingCall
from twisted.internet.task import deferLater
from u1db import errors as u1db_errors
-from zope.proxy import sameProxiedObjects
from leap.common import events as leap_events
from leap.common.check import leap_assert, leap_assert_type
@@ -138,13 +137,6 @@ class LeapIncomingMail(object):
# initialize a mail parser only once
self._parser = Parser()
- @property
- def _pkey(self):
- if sameProxiedObjects(self._keymanager, None):
- logger.warning('tried to get key, but null keymanager found')
- return None
- return self._keymanager.get_key(self._userid, OpenPGPKey, private=True)
-
#
# Public API: fetch, start_loop, stop.
#
@@ -312,40 +304,46 @@ class LeapIncomingMail(object):
:param doc: A document containing an encrypted message.
:type doc: SoledadDocument
- :return: A tuple containing the document and the decrypted message.
- :rtype: (SoledadDocument, str)
+ :return: A Deferred that will be fired with the document and the
+ decrypted message.
+ :rtype: SoledadDocument, str
"""
log.msg('decrypting msg')
- success = False
- try:
- decrdata = self._keymanager.decrypt(
- doc.content[ENC_JSON_KEY],
- self._pkey)
- success = True
- except Exception as exc:
- # XXX move this to errback !!!
- logger.error("Error while decrypting msg: %r" % (exc,))
- decrdata = ""
- leap_events.signal(IMAP_MSG_DECRYPTED, "1" if success else "0")
+ def process_decrypted(res):
+ if isinstance(res, tuple):
+ decrdata, _ = res
+ success = True
+ else:
+ decrdata = ""
+ success = False
- data = self._process_decrypted_doc((doc, decrdata))
- return (doc, data)
+ leap_events.signal(IMAP_MSG_DECRYPTED, "1" if success else "0")
- def _process_decrypted_doc(self, msgtuple):
+ data = self._process_decrypted_doc(doc, decrdata)
+ return doc, data
+
+ d = self._keymanager.decrypt(
+ doc.content[ENC_JSON_KEY],
+ self._userid, OpenPGPKey)
+ d.addErrback(self._errback)
+ d.addCallback(process_decrypted)
+ return d
+
+ def _process_decrypted_doc(self, doc, data):
"""
Process a document containing a succesfully decrypted message.
- :param msgtuple: a tuple consisting of a SoledadDocument
- instance containing the incoming message
- and data, the json-encoded, decrypted content of the
- incoming message
- :type msgtuple: (SoledadDocument, str)
+ :param doc: the incoming message
+ :type doc: SoledadDocument
+ :param data: the json-encoded, decrypted content of the incoming
+ message
+ :type data: str
+
:return: the processed data.
:rtype: str
"""
log.msg('processing decrypted doc')
- doc, data = msgtuple
# XXX turn this into an errBack for each one of
# the deferreds that would process an individual document
@@ -421,45 +419,40 @@ class LeapIncomingMail(object):
encoding = get_email_charset(data)
msg = self._parser.parsestr(data)
- # try to obtain sender public key
- senderPubkey = None
fromHeader = msg.get('from', None)
+ senderAddress = None
if (fromHeader is not None
and (msg.get_content_type() == MULTIPART_ENCRYPTED
or msg.get_content_type() == MULTIPART_SIGNED)):
- _, senderAddress = parseaddr(fromHeader)
- try:
- senderPubkey = self._keymanager.get_key(
- senderAddress, OpenPGPKey)
- except keymanager_errors.KeyNotFound:
- pass
-
- valid_sig = False # we will add a header saying if sig is valid
- decrypt_multi = self._decrypt_multipart_encrypted_msg
- decrypt_inline = self._maybe_decrypt_inline_encrypted_msg
+ senderAddress = parseaddr(fromHeader)
+
+ def add_leap_header(decrmsg, signkey):
+ if (senderAddress is None or
+ isinstance(signkey, keymanager_errors.KeyNotFound)):
+ decrmsg.add_header(
+ self.LEAP_SIGNATURE_HEADER,
+ self.LEAP_SIGNATURE_COULD_NOT_VERIFY)
+ elif isinstance(signkey, keymanager_errors.InvalidSignature):
+ decrmsg.add_header(
+ self.LEAP_SIGNATURE_HEADER,
+ self.LEAP_SIGNATURE_INVALID)
+ else:
+ decrmsg.add_header(
+ self.LEAP_SIGNATURE_HEADER,
+ self.LEAP_SIGNATURE_VALID,
+ pubkey=signkey.key_id)
+ return decrmsg.as_string()
if msg.get_content_type() == MULTIPART_ENCRYPTED:
- decrmsg, valid_sig = decrypt_multi(
- msg, encoding, senderPubkey)
+ d = self._decrypt_multipart_encrypted_msg(
+ msg, encoding, senderAddress)
else:
- decrmsg, valid_sig = decrypt_inline(
- msg, encoding, senderPubkey)
-
- # add x-leap-signature header
- if senderPubkey is None:
- decrmsg.add_header(
- self.LEAP_SIGNATURE_HEADER,
- self.LEAP_SIGNATURE_COULD_NOT_VERIFY)
- else:
- decrmsg.add_header(
- self.LEAP_SIGNATURE_HEADER,
- self.LEAP_SIGNATURE_VALID if valid_sig else
- self.LEAP_SIGNATURE_INVALID,
- pubkey=senderPubkey.key_id)
-
- return decrmsg.as_string()
+ d = self._maybe_decrypt_inline_encrypted_msg(
+ msg, encoding, senderAddress)
+ d.addCallback(add_leap_header)
+ return d
- def _decrypt_multipart_encrypted_msg(self, msg, encoding, senderPubkey):
+ def _decrypt_multipart_encrypted_msg(self, msg, encoding, senderAddress):
"""
Decrypt a message with content-type 'multipart/encrypted'.
@@ -467,12 +460,13 @@ class LeapIncomingMail(object):
:type msg: Message
:param encoding: The encoding of the email message.
:type encoding: str
- :param senderPubkey: The key of the sender of the message.
- :type senderPubkey: OpenPGPKey
+ :param senderAddress: The email address of the sender of the message.
+ :type senderAddress: str
- :return: A tuple containing a decrypted message and
- a bool indicating whether the signature is valid.
- :rtype: (Message, bool)
+ :return: A Deferred that will be fired with a tuple containing a
+ decrypted Message and the signing OpenPGPKey if the signature
+ is valid or InvalidSignature or KeyNotFound.
+ :rtype: Deferred
"""
log.msg('decrypting multipart encrypted msg')
msg = copy.deepcopy(msg)
@@ -483,33 +477,33 @@ class LeapIncomingMail(object):
encdata = pgpencmsg.get_payload()
# decrypt or fail gracefully
- try:
- decrdata, valid_sig = self._decrypt_and_verify_data(
- encdata, senderPubkey)
- except keymanager_errors.DecryptError as e:
- logger.warning('Failed to decrypt encrypted message (%s). '
- 'Storing message without modifications.' % str(e))
- # Bailing out!
- return (msg, False)
+ def build_msg(res):
+ decrdata, signkey = res
- decrmsg = self._parser.parsestr(decrdata)
- # remove original message's multipart/encrypted content-type
- del(msg['content-type'])
+ decrmsg = self._parser.parsestr(decrdata)
+ # remove original message's multipart/encrypted content-type
+ del(msg['content-type'])
- # replace headers back in original message
- for hkey, hval in decrmsg.items():
- try:
- # this will raise KeyError if header is not present
- msg.replace_header(hkey, hval)
- except KeyError:
- msg[hkey] = hval
+ # replace headers back in original message
+ for hkey, hval in decrmsg.items():
+ try:
+ # this will raise KeyError if header is not present
+ msg.replace_header(hkey, hval)
+ except KeyError:
+ msg[hkey] = hval
+
+ # all ok, replace payload by unencrypted payload
+ msg.set_payload(decrmsg.get_payload())
+ return (msg, signkey)
- # all ok, replace payload by unencrypted payload
- msg.set_payload(decrmsg.get_payload())
- return (msg, valid_sig)
+ d = self._keymanager.decrypt(
+ encdata, self._userid, OpenPGPKey,
+ verify=senderAddress)
+ d.addCallbacks(build_msg, self._decryption_error, errbackArgs=(msg,))
+ return d
def _maybe_decrypt_inline_encrypted_msg(self, origmsg, encoding,
- senderPubkey):
+ senderAddress):
"""
Possibly decrypt an inline OpenPGP encrypted message.
@@ -517,12 +511,13 @@ class LeapIncomingMail(object):
:type origmsg: Message
:param encoding: The encoding of the email message.
:type encoding: str
- :param senderPubkey: The key of the sender of the message.
- :type senderPubkey: OpenPGPKey
+ :param senderAddress: The email address of the sender of the message.
+ :type senderAddress: str
- :return: A tuple containing a decrypted message and
- a bool indicating whether the signature is valid.
- :rtype: (Message, bool)
+ :return: A Deferred that will be fired with a tuple containing a
+ decrypted Message and the signing OpenPGPKey if the signature
+ is valid or InvalidSignature or KeyNotFound.
+ :rtype: Deferred
"""
log.msg('maybe decrypting inline encrypted msg')
# serialize the original message
@@ -530,54 +525,48 @@ class LeapIncomingMail(object):
g = Generator(buf)
g.flatten(origmsg)
data = buf.getvalue()
+
+ def decrypted_data(res):
+ decrdata, signkey = res
+ return data.replace(pgp_message, decrdata), signkey
+
+ def encode_and_return(res):
+ data, signkey = res
+ if isinstance(data, unicode):
+ data = data.encode(encoding, 'replace')
+ return (self._parser.parsestr(data), signkey)
+
# handle exactly one inline PGP message
- valid_sig = False
if PGP_BEGIN in data:
begin = data.find(PGP_BEGIN)
end = data.find(PGP_END)
pgp_message = data[begin:end + len(PGP_END)]
- try:
- decrdata, valid_sig = self._decrypt_and_verify_data(
- pgp_message, senderPubkey)
- # replace encrypted by decrypted content
- data = data.replace(pgp_message, decrdata)
- except keymanager_errors.DecryptError:
- logger.warning('Failed to decrypt potential inline encrypted '
- 'message. Storing message as is...')
-
- # if message is not encrypted, return raw data
- if isinstance(data, unicode):
- data = data.encode(encoding, 'replace')
- return (self._parser.parsestr(data), valid_sig)
+ d = self._keymanager.decrypt(
+ pgp_message, self._userid, OpenPGPKey,
+ verify=senderAddress)
+ d.addCallbacks(decrypted_data, self._decryption_error,
+ errbackArgs=(data,))
+ else:
+ d = defer.succeed((data, None))
+ d.addCallback(encode_and_return)
+ return d
- def _decrypt_and_verify_data(self, data, senderPubkey):
+ def _decryption_error(self, failure, msg):
"""
- Decrypt C{data} using our private key and attempt to verify a
- signature using C{senderPubkey}.
-
- :param data: The text to be decrypted.
- :type data: unicode
- :param senderPubkey: The public key of the sender of the message.
- :type senderPubkey: OpenPGPKey
-
- :return: The decrypted data and a boolean stating whether the
- signature could be verified.
- :rtype: (str, bool)
-
- :raise DecryptError: Raised if failed to decrypt.
+ Check for known decryption errors
"""
- log.msg('decrypting and verifying data')
- valid_sig = False
- try:
- decrdata = self._keymanager.decrypt(
- data, self._pkey,
- verify=senderPubkey)
- if senderPubkey is not None:
- valid_sig = True
- except keymanager_errors.InvalidSignature:
- decrdata = self._keymanager.decrypt(
- data, self._pkey)
- return (decrdata, valid_sig)
+ if failure.check(keymanager_errors.DecryptError):
+ logger.warning('Failed to decrypt encrypted message (%s). '
+ 'Storing message without modifications.'
+ % str(failure.value))
+ return (msg, None)
+ elif failure.check(keymanager_errors.KeyNotFound):
+ logger.error('Failed to find private key for decryption (%s). '
+ 'Storing message without modifications.'
+ % str(failure.value))
+ return (msg, None)
+ else:
+ return failure
def _extract_keys(self, msgtuple):
"""
@@ -592,6 +581,10 @@ class LeapIncomingMail(object):
and data, the json-encoded, decrypted content of the
incoming message
:type msgtuple: (SoledadDocument, str)
+
+ :return: A Deferred that will be fired with msgtuple when key
+ extraction finishes
+ :rtype: Deferred
"""
OpenPGP_HEADER = 'OpenPGP'
doc, data = msgtuple
@@ -603,13 +596,17 @@ class LeapIncomingMail(object):
_, fromAddress = parseaddr(msg['from'])
header = msg.get(OpenPGP_HEADER, None)
+ dh = defer.success()
if header is not None:
- self._extract_openpgp_header(header, fromAddress)
+ dh = self._extract_openpgp_header(header, fromAddress)
+ da = defer.success()
if msg.is_multipart():
- self._extract_attached_key(msg.get_payload(), fromAddress)
+ da = self._extract_attached_key(msg.get_payload(), fromAddress)
- return msgtuple
+ d = defer.gatherResults([dh, da])
+ d.addCallback(lambda _: msgtuple)
+ return d
def _extract_openpgp_header(self, header, address):
"""
@@ -619,7 +616,11 @@ class LeapIncomingMail(object):
:type header: str
:param address: email address in the from header
:type address: str
+
+ :return: A Deferred that will be fired when header extraction is done
+ :rtype: Deferred
"""
+ d = defer.success()
fields = dict([f.strip(' ').split('=') for f in header.split(';')])
if 'url' in fields:
url = shlex.split(fields['url'])[0] # remove quotations
@@ -627,21 +628,28 @@ class LeapIncomingMail(object):
addressHostname = address.split('@')[1]
if (urlparts.scheme == 'https'
and urlparts.hostname == addressHostname):
- try:
- self._keymanager.fetch_key(address, url, OpenPGPKey)
- logger.info("Imported key from header %s" % (url,))
- except keymanager_errors.KeyNotFound:
- logger.warning("Url from OpenPGP header %s failed"
- % (url,))
- except keymanager_errors.KeyAttributesDiffer:
- logger.warning("Key from OpenPGP header url %s didn't "
- "match the from address %s"
- % (url, address))
+ def fetch_error(failure):
+ if failure.check(keymanager_errors.KeyNotFound):
+ logger.warning("Url from OpenPGP header %s failed"
+ % (url,))
+ elif failure.check(keymanager_errors.KeyAttributesDiffer):
+ logger.warning("Key from OpenPGP header url %s didn't "
+ "match the from address %s"
+ % (url, address))
+ else:
+ return failure
+
+ d = self._keymanager.fetch_key(address, url, OpenPGPKey)
+ d.addCallback(
+ lambda _:
+ logger.info("Imported key from header %s" % (url,)))
+ d.addErrback(fetch_error)
else:
logger.debug("No valid url on OpenPGP header %s" % (url,))
else:
logger.debug("There is no url on the OpenPGP header: %s"
% (header,))
+ return d
def _extract_attached_key(self, attachments, address):
"""
@@ -651,16 +659,22 @@ class LeapIncomingMail(object):
:type attachments: list(email.Message)
:param address: email address in the from header
:type address: str
+
+ :return: A Deferred that will be fired when all the keys are stored
+ :rtype: Deferred
"""
MIME_KEY = "application/pgp-keys"
+ deferreds = []
for attachment in attachments:
if MIME_KEY == attachment.get_content_type():
logger.debug("Add key from attachment")
- self._keymanager.put_raw_key(
+ d = self._keymanager.put_raw_key(
attachment.get_payload(),
OpenPGPKey,
address=address)
+ deferreds.append(d)
+ return defer.gatherResults(deferreds)
def _add_message_locally(self, msgtuple):
"""
@@ -672,6 +686,9 @@ class LeapIncomingMail(object):
and data, the json-encoded, decrypted content of the
incoming message
:type msgtuple: (SoledadDocument, str)
+
+ :return: A Deferred that will be fired when the messages is stored
+ :rtype: Defferred
"""
doc, data = msgtuple
log.msg('adding message %s to local db' % (doc.doc_id,))
@@ -690,6 +707,7 @@ class LeapIncomingMail(object):
d = self._inbox.addMessage(data, flags=(self.RECENT_FLAG,),
notify_on_disk=True)
d.addCallbacks(msgSavedCallback, self._errback)
+ return d
#
# helpers
diff --git a/src/leap/mail/imap/tests/test_incoming_mail.py b/src/leap/mail/imap/tests/test_incoming_mail.py
index ce6d56a..03c0164 100644
--- a/src/leap/mail/imap/tests/test_incoming_mail.py
+++ b/src/leap/mail/imap/tests/test_incoming_mail.py
@@ -28,7 +28,6 @@ from email.mime.application import MIMEApplication
from email.mime.multipart import MIMEMultipart
from email.parser import Parser
from mock import Mock
-from twisted.trial import unittest
from leap.keymanager.openpgp import OpenPGPKey
from leap.mail.imap.account import SoledadBackedAccount
@@ -48,7 +47,7 @@ from leap.soledad.common.crypto import (
)
-class LeapIncomingMailTestCase(TestCaseWithKeyManager, unittest.TestCase):
+class LeapIncomingMailTestCase(TestCaseWithKeyManager):
"""
Tests for the incoming mail parser
"""
@@ -147,31 +146,42 @@ subject: independence of cyberspace
key = MIMEApplication("", "pgp-keys")
key.set_payload(KEY)
message.attach(key)
- email = self._create_incoming_email(message.as_string())
- self._mock_soledad_get_from_index(fields.JUST_MAIL_IDX, [email])
- self.fetcher._keymanager.put_raw_key = Mock()
def put_raw_key_called(ret):
self.fetcher._keymanager.put_raw_key.assert_called_once_with(
KEY, OpenPGPKey, address=self.FROM_ADDRESS)
- d = self.fetcher.fetch()
+ d = self.mock_fetch(message.as_string())
d.addCallback(put_raw_key_called)
return d
+ def _mock_fetch(self, message):
+ self.fetcher._keymanager.fetch_key = Mock()
+ d = self._create_incoming_email(message)
+ d.addCallback(
+ lambda email:
+ self._mock_soledad_get_from_index(fields.JUST_MAIL_IDX, [email]))
+ d.addCallback(lambda _: self.fetcher.fetch())
+ return d
+
def _create_incoming_email(self, email_str):
email = SoledadDocument()
- pubkey = self._km.get_key(ADDRESS, OpenPGPKey)
data = json.dumps(
{"incoming": True, "content": email_str},
ensure_ascii=False)
- email.content = {
- fields.INCOMING_KEY: True,
- fields.ERROR_DECRYPTING_KEY: False,
- ENC_SCHEME_KEY: EncryptionSchemes.PUBKEY,
- ENC_JSON_KEY: str(self._km.encrypt(data, pubkey))
- }
- return email
+
+ def set_email_content(pubkey):
+ email.content = {
+ fields.INCOMING_KEY: True,
+ fields.ERROR_DECRYPTING_KEY: False,
+ ENC_SCHEME_KEY: EncryptionSchemes.PUBKEY,
+ ENC_JSON_KEY: str(self._km.encrypt(data, pubkey))
+ }
+ return email
+
+ d = self._km.get_key(ADDRESS, OpenPGPKey)
+ d.addCallback(set_email_content)
+ return d
def _mock_soledad_get_from_index(self, index_name, value):
get_from_index = self._soledad.get_from_index
diff --git a/src/leap/mail/service.py b/src/leap/mail/service.py
index f6e4d11..a99f13a 100644
--- a/src/leap/mail/service.py
+++ b/src/leap/mail/service.py
@@ -24,7 +24,6 @@ from OpenSSL import SSL
from twisted.mail import smtp
from twisted.internet import reactor
from twisted.internet import defer
-from twisted.internet.threads import deferToThread
from twisted.protocols.amp import ssl
from twisted.python import log
@@ -111,17 +110,17 @@ class OutgoingMail:
:type recipient: smtp.User
:return: a deferred which delivers the message when fired
"""
- d = deferToThread(lambda: self._maybe_encrypt_and_sign(raw, recipient))
+ d = self._maybe_encrypt_and_sign(raw, recipient)
d.addCallback(self._route_msg)
d.addErrback(self.sendError)
-
return d
def sendSuccess(self, smtp_sender_result):
"""
Callback for a successful send.
- :param smtp_sender_result: The result from the ESMTPSender from _route_msg
+ :param smtp_sender_result: The result from the ESMTPSender from
+ _route_msg
:type smtp_sender_result: tuple(int, list(tuple))
"""
dest_addrstr = smtp_sender_result[1][0][0]
@@ -145,7 +144,8 @@ class OutgoingMail:
"""
Sends the msg using the ESMTPSenderFactory.
- :param encrypt_and_sign_result: A tuple containing the 'maybe' encrypted message and the recipient
+ :param encrypt_and_sign_result: A tuple containing the 'maybe'
+ encrypted message and the recipient
:type encrypt_and_sign_result: tuple
"""
message, recipient = encrypt_and_sign_result
@@ -173,7 +173,6 @@ class OutgoingMail:
self._host, self._port, factory,
contextFactory=SSLContextFactory(self._cert, self._key))
-
def _maybe_encrypt_and_sign(self, raw, recipient):
"""
Attempt to encrypt and sign the outgoing message.
@@ -209,16 +208,20 @@ class OutgoingMail:
:param recipient: The recipient for the message
:type: recipient: smtp.User
+ :return: A Deferred that will be fired with a MIMEMultipart message
+ and the original recipient Message
+ :rtype: Deferred
"""
# pass if the original message's content-type is "multipart/encrypted"
lines = raw.split('\r\n')
origmsg = Parser().parsestr(raw)
if origmsg.get_content_type() == 'multipart/encrypted':
- return origmsg
+ return defer.success((origmsg, recipient))
from_address = validate_address(self._from_address)
username, domain = from_address.split('@')
+ to_address = validate_address(recipient.dest.addrstr)
# add a nice footer to the outgoing message
# XXX: footer will eventually optional or be removed
@@ -230,80 +233,93 @@ class OutgoingMail:
origmsg = Parser().parsestr('\r\n'.join(lines))
- # get sender and recipient data
- signkey = self._keymanager.get_key(from_address, OpenPGPKey, private=True)
- log.msg("Will sign the message with %s." % signkey.fingerprint)
- to_address = validate_address(recipient.dest.addrstr)
- try:
- # try to get the recipient pubkey
- pubkey = self._keymanager.get_key(to_address, OpenPGPKey)
- log.msg("Will encrypt the message to %s." % pubkey.fingerprint)
- signal(proto.SMTP_START_ENCRYPT_AND_SIGN,
- "%s,%s" % (self._from_address, to_address))
- newmsg = self._encrypt_and_sign(origmsg, pubkey, signkey)
-
+ def signal_encrypt_sign(newmsg):
signal(proto.SMTP_END_ENCRYPT_AND_SIGN,
"%s,%s" % (self._from_address, to_address))
- except KeyNotFound:
- # at this point we _can_ send unencrypted mail, because if the
- # configuration said the opposite the address would have been
- # rejected in SMTPDelivery.validateTo().
- log.msg('Will send unencrypted message to %s.' % to_address)
- signal(proto.SMTP_START_SIGN, self._from_address)
- newmsg = self._sign(origmsg, signkey)
- signal(proto.SMTP_END_SIGN, self._from_address)
- return newmsg, recipient
+ return newmsg, recipient
+ def signal_sign(newmsg):
+ signal(proto.SMTP_END_SIGN, self._from_address)
+ return newmsg, recipient
+
+ def if_key_not_found_send_unencrypted(failure):
+ if failure.check(KeyNotFound):
+ log.msg('Will send unencrypted message to %s.' % to_address)
+ signal(proto.SMTP_START_SIGN, self._from_address)
+ d = self._sign(origmsg, from_address)
+ d.addCallback(signal_sign)
+ return d
+ else:
+ return failure
+
+ log.msg("Will encrypt the message with %s and sign with %s."
+ % (to_address, from_address))
+ signal(proto.SMTP_START_ENCRYPT_AND_SIGN,
+ "%s,%s" % (self._from_address, to_address))
+ d = self._encrypt_and_sign(origmsg, to_address, from_address)
+ d.addCallbacks(signal_encrypt_sign, if_key_not_found_send_unencrypted)
+ return d
- def _encrypt_and_sign(self, origmsg, pubkey, signkey):
+ def _encrypt_and_sign(self, origmsg, encrypt_address, sign_address):
"""
Create an RFC 3156 compliang PGP encrypted and signed message using
- C{pubkey} to encrypt and C{signkey} to sign.
+ C{encrypt_address} to encrypt and C{sign_address} to sign.
:param origmsg: The original message
:type origmsg: email.message.Message
- :param pubkey: The public key used to encrypt the message.
- :type pubkey: OpenPGPKey
- :param signkey: The private key used to sign the message.
- :type signkey: OpenPGPKey
- :return: The encrypted and signed message
- :rtype: MultipartEncrypted
+ :param encrypt_address: The address used to encrypt the message.
+ :type encrypt_address: str
+ :param sign_address: The address used to sign the message.
+ :type sign_address: str
+
+ :return: A Deferred with the MultipartEncrypted message
+ :rtype: Deferred
"""
# create new multipart/encrypted message with 'pgp-encrypted' protocol
- newmsg = MultipartEncrypted('application/pgp-encrypted')
- # move (almost) all headers from original message to the new message
- self._fix_headers(origmsg, newmsg, signkey)
- # create 'application/octet-stream' encrypted message
- encmsg = MIMEApplication(
- self._keymanager.encrypt(origmsg.as_string(unixfrom=False), pubkey,
- sign=signkey),
- _subtype='octet-stream', _encoder=lambda x: x)
- encmsg.add_header('content-disposition', 'attachment',
- filename='msg.asc')
- # create meta message
- metamsg = PGPEncrypted()
- metamsg.add_header('Content-Disposition', 'attachment')
- # attach pgp message parts to new message
- newmsg.attach(metamsg)
- newmsg.attach(encmsg)
- return newmsg
-
-
- def _sign(self, origmsg, signkey):
+
+ def encrypt(res):
+ newmsg, origmsg = res
+ d = self._keymanager.encrypt(
+ origmsg.as_string(unixfrom=False),
+ encrypt_address, OpenPGPKey, sign=sign_address)
+ d.addCallback(lambda encstr: (newmsg, encstr))
+ return d
+
+ def create_encrypted_message(res):
+ newmsg, encstr = res
+ encmsg = MIMEApplication(
+ encstr, _subtype='octet-stream', _encoder=lambda x: x)
+ encmsg.add_header('content-disposition', 'attachment',
+ filename='msg.asc')
+ # create meta message
+ metamsg = PGPEncrypted()
+ metamsg.add_header('Content-Disposition', 'attachment')
+ # attach pgp message parts to new message
+ newmsg.attach(metamsg)
+ newmsg.attach(encmsg)
+ return newmsg
+
+ d = self._fix_headers(
+ origmsg,
+ MultipartEncrypted('application/pgp-encrypted'),
+ sign_address)
+ d.addCallback(encrypt)
+ d.addCallback(create_encrypted_message)
+ return d
+
+ def _sign(self, origmsg, sign_address):
"""
- Create an RFC 3156 compliant PGP signed MIME message using C{signkey}.
+ Create an RFC 3156 compliant PGP signed MIME message using
+ C{sign_address}.
:param origmsg: The original message
:type origmsg: email.message.Message
- :param signkey: The private key used to sign the message.
- :type signkey: leap.common.keymanager.openpgp.OpenPGPKey
- :return: The signed message.
- :rtype: MultipartSigned
+ :param sign_address: The address used to sign the message.
+ :type sign_address: str
+
+ :return: A Deferred with the MultipartSigned message.
+ :rtype: Deferred
"""
- # create new multipart/signed message
- newmsg = MultipartSigned('application/pgp-signature', 'pgp-sha512')
- # move (almost) all headers from original message to the new message
- self._fix_headers(origmsg, newmsg, signkey)
# apply base64 content-transfer-encoding
encode_base64_rec(origmsg)
# get message text with headers and replace \n for \r\n
@@ -316,17 +332,27 @@ class OutgoingMail:
if origmsg.is_multipart():
if not msgtext.endswith("\r\n"):
msgtext += "\r\n"
- # calculate signature
- signature = self._keymanager.sign(msgtext, signkey, digest_algo='SHA512',
- clearsign=False, detach=True, binary=False)
- sigmsg = PGPSignature(signature)
- # attach original message and signature to new message
- newmsg.attach(origmsg)
- newmsg.attach(sigmsg)
- return newmsg
+ def create_signed_message(res):
+ (msg, _), signature = res
+ sigmsg = PGPSignature(signature)
+ # attach original message and signature to new message
+ msg.attach(origmsg)
+ msg.attach(sigmsg)
+ return msg
+
+ dh = self._fix_headers(
+ origmsg,
+ MultipartSigned('application/pgp-signature', 'pgp-sha512'),
+ sign_address)
+ ds = self._keymanager.sign(
+ msgtext, sign_address, OpenPGPKey, digest_algo='SHA512',
+ clearsign=False, detach=True, binary=False)
+ d = defer.gatherResults([dh, ds])
+ d.addCallback(create_signed_message)
+ return d
- def _fix_headers(self, origmsg, newmsg, signkey):
+ def _fix_headers(self, origmsg, newmsg, sign_address):
"""
Move some headers from C{origmsg} to C{newmsg}, delete unwanted
headers from C{origmsg} and add new headers to C{newms}.
@@ -360,8 +386,13 @@ class OutgoingMail:
:type origmsg: email.message.Message
:param newmsg: The new message being created.
:type newmsg: email.message.Message
- :param signkey: The key used to sign C{newmsg}
- :type signkey: OpenPGPKey
+ :param sign_address: The address used to sign C{newmsg}
+ :type sign_address: str
+
+ :return: A Deferred with a touple:
+ (new Message with the unencrypted headers,
+ original Message with headers removed)
+ :rtype: Deferred
"""
# move headers from origmsg to newmsg
headers = origmsg.items()
@@ -375,11 +406,17 @@ class OutgoingMail:
del (origmsg[hkey])
# add a new message-id to newmsg
newmsg.add_header('Message-Id', smtp.messageid())
- # add openpgp header to newmsg
- username, domain = signkey.address.split('@')
- newmsg.add_header(
- 'OpenPGP', 'id=%s' % signkey.key_id,
- url='https://%s/key/%s' % (domain, username),
- preference='signencrypt')
# delete user-agent from origmsg
del (origmsg['user-agent'])
+
+ def add_openpgp_header(signkey):
+ username, domain = sign_address.split('@')
+ newmsg.add_header(
+ 'OpenPGP', 'id=%s' % signkey.key_id,
+ url='https://%s/key/%s' % (domain, username),
+ preference='signencrypt')
+ return newmsg, origmsg
+
+ d = self._keymanager.get_key(sign_address, OpenPGPKey, private=True)
+ d.addCallback(add_openpgp_header)
+ return d
diff --git a/src/leap/mail/smtp/gateway.py b/src/leap/mail/smtp/gateway.py
index b022091..d58c581 100644
--- a/src/leap/mail/smtp/gateway.py
+++ b/src/leap/mail/smtp/gateway.py
@@ -48,7 +48,6 @@ from leap.mail.smtp.rfc3156 import (
RFC3156CompliantGenerator,
)
-from leap.mail.service import OutgoingMail
# replace email generator with a RFC 3156 compliant one.
from email import generator
@@ -197,22 +196,31 @@ class SMTPDelivery(object):
accepted.
"""
# try to find recipient's public key
- try:
- address = validate_address(user.dest.addrstr)
- # verify if recipient key is available in keyring
- self._km.get_key(address, OpenPGPKey) # might raise KeyNotFound
+ address = validate_address(user.dest.addrstr)
+
+ # verify if recipient key is available in keyring
+ def found(_):
log.msg("Accepting mail for %s..." % user.dest.addrstr)
signal(proto.SMTP_RECIPIENT_ACCEPTED_ENCRYPTED, user.dest.addrstr)
- except KeyNotFound:
- # if key was not found, check config to see if will send anyway.
- if self._encrypted_only:
- signal(proto.SMTP_RECIPIENT_REJECTED, user.dest.addrstr)
- raise smtp.SMTPBadRcpt(user.dest.addrstr)
- log.msg("Warning: will send an unencrypted message (because "
- "encrypted_only' is set to False).")
- signal(
- proto.SMTP_RECIPIENT_ACCEPTED_UNENCRYPTED, user.dest.addrstr)
- return lambda: EncryptedMessage(user, self._outgoing_mail)
+
+ def not_found(failure):
+ if failure.check(KeyNotFound):
+ # if key was not found, check config to see if will send anyway
+ if self._encrypted_only:
+ signal(proto.SMTP_RECIPIENT_REJECTED, user.dest.addrstr)
+ raise smtp.SMTPBadRcpt(user.dest.addrstr)
+ log.msg("Warning: will send an unencrypted message (because "
+ "encrypted_only' is set to False).")
+ signal(
+ proto.SMTP_RECIPIENT_ACCEPTED_UNENCRYPTED,
+ user.dest.addrstr)
+ else:
+ return failure
+
+ d = self._km.get_key(address, OpenPGPKey) # might raise KeyNotFound
+ d.addCallbacks(found, not_found)
+ d.addCallbac(lambda _: EncryptedMessage(user, self._outgoing_mail))
+ return d
def validateFrom(self, helo, origin):
"""
diff --git a/src/leap/mail/smtp/tests/test_gateway.py b/src/leap/mail/smtp/tests/test_gateway.py
index aeace4a..8cbff8f 100644
--- a/src/leap/mail/smtp/tests/test_gateway.py
+++ b/src/leap/mail/smtp/tests/test_gateway.py
@@ -23,6 +23,7 @@ SMTP gateway tests.
import re
from datetime import datetime
+from twisted.internet.defer import inlineCallbacks, fail
from twisted.test import proto_helpers
from mock import Mock
@@ -34,7 +35,7 @@ from leap.mail.tests import (
ADDRESS,
ADDRESS_2,
)
-from leap.keymanager import openpgp
+from leap.keymanager import openpgp, errors
# some regexps
@@ -87,7 +88,8 @@ class TestSmtpGateway(TestCaseWithKeyManager):
proto = SMTPFactory(
u'anotheruser@leap.se',
self._km,
- self._config['encrypted_only'], outgoing_mail=Mock()).buildProtocol(('127.0.0.1', 0))
+ self._config['encrypted_only'],
+ outgoing_mail=Mock()).buildProtocol(('127.0.0.1', 0))
# snip...
transport = proto_helpers.StringTransport()
proto.makeConnection(transport)
@@ -98,23 +100,26 @@ class TestSmtpGateway(TestCaseWithKeyManager):
'Did not get expected answer from gateway.')
proto.setTimeout(None)
+ @inlineCallbacks
def test_missing_key_rejects_address(self):
"""
Test if server rejects to send unencrypted when 'encrypted_only' is
True.
"""
# remove key from key manager
- pubkey = self._km.get_key(ADDRESS, openpgp.OpenPGPKey)
+ pubkey = yield self._km.get_key(ADDRESS, openpgp.OpenPGPKey)
pgp = openpgp.OpenPGPScheme(
self._soledad, gpgbinary=self.GPG_BINARY_PATH)
- pgp.delete_key(pubkey)
+ yield pgp.delete_key(pubkey)
# mock the key fetching
- self._km.fetch_keys_from_server = Mock(return_value=[])
+ self._km._fetch_keys_from_server = Mock(
+ return_value=fail(errors.KeyNotFound()))
# prepare the SMTP factory
proto = SMTPFactory(
u'anotheruser@leap.se',
self._km,
- self._config['encrypted_only'], outgoing_mail=Mock()).buildProtocol(('127.0.0.1', 0))
+ self._config['encrypted_only'],
+ outgoing_mail=Mock()).buildProtocol(('127.0.0.1', 0))
transport = proto_helpers.StringTransport()
proto.makeConnection(transport)
proto.lineReceived(self.EMAIL_DATA[0] + '\r\n')
@@ -127,18 +132,20 @@ class TestSmtpGateway(TestCaseWithKeyManager):
lines[-1],
'Address should have been rejecetd with appropriate message.')
+ @inlineCallbacks
def test_missing_key_accepts_address(self):
"""
Test if server accepts to send unencrypted when 'encrypted_only' is
False.
"""
# remove key from key manager
- pubkey = self._km.get_key(ADDRESS, openpgp.OpenPGPKey)
+ pubkey = yield self._km.get_key(ADDRESS, openpgp.OpenPGPKey)
pgp = openpgp.OpenPGPScheme(
self._soledad, gpgbinary=self.GPG_BINARY_PATH)
- pgp.delete_key(pubkey)
+ yield pgp.delete_key(pubkey)
# mock the key fetching
- self._km.fetch_keys_from_server = Mock(return_value=[])
+ self._km._fetch_keys_from_server = Mock(
+ return_value=fail(errors.KeyNotFound()))
# prepare the SMTP factory with encrypted only equal to false
proto = SMTPFactory(
u'anotheruser@leap.se',
diff --git a/src/leap/mail/tests/__init__.py b/src/leap/mail/tests/__init__.py
index 10bc5fe..b35107d 100644
--- a/src/leap/mail/tests/__init__.py
+++ b/src/leap/mail/tests/__init__.py
@@ -19,16 +19,14 @@ Base classes and keys for leap.mail tests.
"""
import os
import distutils.spawn
-import shutil
-import tempfile
from mock import Mock
+from twisted.internet.defer import gatherResults
+from twisted.trial import unittest
from leap.soledad.client import Soledad
-from leap.keymanager import (
- KeyManager,
- openpgp,
-)
+from leap.keymanager import KeyManager
+from leap.keymanager.openpgp import OpenPGPKey
from leap.common.testing.basetest import BaseLeapTest
@@ -39,22 +37,12 @@ def _find_gpg():
return os.path.realpath(gpg_path) if gpg_path is not None else "/usr/bin/gpg"
-class TestCaseWithKeyManager(BaseLeapTest):
+class TestCaseWithKeyManager(unittest.TestCase, BaseLeapTest):
GPG_BINARY_PATH = _find_gpg()
def setUp(self):
- # mimic BaseLeapTest.setUpClass behaviour, because this is deprecated
- # in Twisted: http://twistedmatrix.com/trac/ticket/1870
- self.old_path = os.environ['PATH']
- self.old_home = os.environ['HOME']
- self.tempdir = tempfile.mkdtemp(prefix="leap_tests-")
- self.home = self.tempdir
- bin_tdir = os.path.join(
- self.tempdir,
- 'bin')
- os.environ["PATH"] = bin_tdir
- os.environ["HOME"] = self.tempdir
+ self.setUpEnv()
# setup our own stuff
address = 'leap@leap.se' # user's address in the form user@provider
@@ -65,36 +53,7 @@ class TestCaseWithKeyManager(BaseLeapTest):
server_url = 'http://provider/'
cert_file = ''
- self._soledad = self._soledad_instance(
- uuid, passphrase, secrets_path, local_db_path, server_url,
- cert_file)
- self._km = self._keymanager_instance(address)
-
- def _soledad_instance(self, uuid, passphrase, secrets_path, local_db_path,
- server_url, cert_file):
- """
- Return a Soledad instance for tests.
- """
- # mock key fetching and storing so Soledad doesn't fail when trying to
- # reach the server.
- Soledad._fetch_keys_from_shared_db = Mock(return_value=None)
- Soledad._assert_keys_in_shared_db = Mock(return_value=None)
-
- # instantiate soledad
- def _put_doc_side_effect(doc):
- self._doc_put = doc
-
- class MockSharedDB(object):
-
- get_doc = Mock(return_value=None)
- put_doc = Mock(side_effect=_put_doc_side_effect)
- lock = Mock(return_value=('atoken', 300))
- unlock = Mock(return_value=True)
-
- def __call__(self):
- return self
-
- soledad = Soledad(
+ self._soledad = Soledad(
uuid,
passphrase,
secrets_path=secrets_path,
@@ -103,13 +62,11 @@ class TestCaseWithKeyManager(BaseLeapTest):
cert_file=cert_file,
syncable=False
)
+ return self._setup_keymanager(address)
- soledad._shared_db = MockSharedDB()
- return soledad
-
- def _keymanager_instance(self, address):
+ def _setup_keymanager(self, address):
"""
- Return a Key Manager instance for tests.
+ Set up Key Manager and return a Deferred that will be fired when done.
"""
self._config = {
'host': 'https://provider/',
@@ -132,26 +89,17 @@ class TestCaseWithKeyManager(BaseLeapTest):
pass
nickserver_url = '' # the url of the nickserver
- km = KeyManager(address, nickserver_url, self._soledad,
- ca_cert_path='', gpgbinary=self.GPG_BINARY_PATH)
- km._fetcher.put = Mock()
- km._fetcher.get = Mock(return_value=Response())
-
- # insert test keys in key manager.
- pgp = openpgp.OpenPGPScheme(
- self._soledad, gpgbinary=self.GPG_BINARY_PATH)
- pgp.put_ascii_key(PRIVATE_KEY)
- pgp.put_ascii_key(PRIVATE_KEY_2)
+ self._km = KeyManager(address, nickserver_url, self._soledad,
+ ca_cert_path='', gpgbinary=self.GPG_BINARY_PATH)
+ self._km._fetcher.put = Mock()
+ self._km._fetcher.get = Mock(return_value=Response())
- return km
+ d1 = self._km.put_raw_key(PRIVATE_KEY, OpenPGPKey, ADDRESS)
+ d2 = self._km.put_raw_key(PRIVATE_KEY_2, OpenPGPKey, ADDRESS_2)
+ return gatherResults([d1, d2])
def tearDown(self):
- # mimic LeapBaseTest.tearDownClass behaviour
- os.environ["PATH"] = self.old_path
- os.environ["HOME"] = self.old_home
- # safety check
- assert 'leap_tests-' in self.tempdir
- shutil.rmtree(self.tempdir)
+ self.tearDownEnv()
# Key material for testing
diff --git a/src/leap/mail/tests/test_service.py b/src/leap/mail/tests/test_service.py
index f0a807d..43f354d 100644
--- a/src/leap/mail/tests/test_service.py
+++ b/src/leap/mail/tests/test_service.py
@@ -22,7 +22,8 @@ SMTP gateway tests.
import re
from datetime import datetime
-from twisted.mail.smtp import User, Address
+from twisted.internet.defer import fail
+from twisted.mail.smtp import User
from mock import Mock
@@ -33,7 +34,7 @@ from leap.mail.tests import (
ADDRESS,
ADDRESS_2,
)
-from leap.keymanager import openpgp
+from leap.keymanager import openpgp, errors
class TestOutgoingMail(TestCaseWithKeyManager):
@@ -54,71 +55,126 @@ class TestOutgoingMail(TestCaseWithKeyManager):
'QUIT']
def setUp(self):
- TestCaseWithKeyManager.setUp(self)
self.lines = [line for line in self.EMAIL_DATA[4:12]]
self.lines.append('') # add a trailing newline
self.raw = '\r\n'.join(self.lines)
+ self.expected_body = ('\r\n'.join(self.EMAIL_DATA[9:12]) +
+ "\r\n\r\n--\r\nI prefer encrypted email - "
+ "https://leap.se/key/anotheruser\r\n")
self.fromAddr = ADDRESS_2
- self.outgoing_mail = OutgoingMail(self.fromAddr, self._km, self._config['cert'], self._config['key'],
- self._config['host'], self._config['port'])
- self.proto = SMTPFactory(
- u'anotheruser@leap.se',
- self._km,
- self._config['encrypted_only'],
- self.outgoing_mail).buildProtocol(('127.0.0.1', 0))
- self.dest = User(ADDRESS, 'gateway.leap.se', self.proto, ADDRESS)
-
- def test_openpgp_encrypt_decrypt(self):
- "Test if openpgp can encrypt and decrypt."
- text = "simple raw text"
- pubkey = self._km.get_key(
- ADDRESS, openpgp.OpenPGPKey, private=False)
- encrypted = self._km.encrypt(text, pubkey)
- self.assertNotEqual(
- text, encrypted, "Ciphertext is equal to plaintext.")
- privkey = self._km.get_key(
- ADDRESS, openpgp.OpenPGPKey, private=True)
- decrypted = self._km.decrypt(encrypted, privkey)
- self.assertEqual(text, decrypted,
- "Decrypted text differs from plaintext.")
+
+ def init_outgoing_and_proto(_):
+ self.outgoing_mail = OutgoingMail(
+ self.fromAddr, self._km, self._config['cert'],
+ self._config['key'], self._config['host'],
+ self._config['port'])
+ self.proto = SMTPFactory(
+ u'anotheruser@leap.se',
+ self._km,
+ self._config['encrypted_only'],
+ self.outgoing_mail).buildProtocol(('127.0.0.1', 0))
+ self.dest = User(ADDRESS, 'gateway.leap.se', self.proto, ADDRESS)
+
+ d = TestCaseWithKeyManager.setUp(self)
+ d.addCallback(init_outgoing_and_proto)
+ return d
def test_message_encrypt(self):
"""
Test if message gets encrypted to destination email.
"""
-
- message, _ = self.outgoing_mail._maybe_encrypt_and_sign(self.raw, self.dest)
-
- # assert structure of encrypted message
- self.assertTrue('Content-Type' in message)
- self.assertEqual('multipart/encrypted', message.get_content_type())
- self.assertEqual('application/pgp-encrypted',
- message.get_param('protocol'))
- self.assertEqual(2, len(message.get_payload()))
- self.assertEqual('application/pgp-encrypted',
- message.get_payload(0).get_content_type())
- self.assertEqual('application/octet-stream',
- message.get_payload(1).get_content_type())
- privkey = self._km.get_key(
- ADDRESS, openpgp.OpenPGPKey, private=True)
- decrypted = self._km.decrypt(
- message.get_payload(1).get_payload(), privkey)
-
- expected = '\n' + '\r\n'.join(
- self.EMAIL_DATA[9:12]) + '\r\n\r\n--\r\n' + 'I prefer encrypted email - https://leap.se/key/anotheruser\r\n'
- self.assertEqual(
- expected,
- decrypted,
- 'Decrypted text differs from plaintext.')
+ def check_decryption(res):
+ decrypted, _ = res
+ self.assertEqual(
+ '\n' + self.expected_body,
+ decrypted,
+ 'Decrypted text differs from plaintext.')
+
+ d = self.outgoing_mail._maybe_encrypt_and_sign(self.raw, self.dest)
+ d.addCallback(self._assert_encrypted)
+ d.addCallback(lambda message: self._km.decrypt(
+ message.get_payload(1).get_payload(), ADDRESS, openpgp.OpenPGPKey))
+ d.addCallback(check_decryption)
+ return d
def test_message_encrypt_sign(self):
"""
Test if message gets encrypted to destination email and signed with
sender key.
- """
- message, _ = self.outgoing_mail._maybe_encrypt_and_sign(self.raw, self.dest)
+ '"""
+ def check_decryption_and_verify(res):
+ decrypted, signkey = res
+ self.assertEqual(
+ '\n' + self.expected_body,
+ decrypted,
+ 'Decrypted text differs from plaintext.')
+ self.assertTrue(ADDRESS_2 in signkey.address,
+ "Verification failed")
+
+ d = self.outgoing_mail._maybe_encrypt_and_sign(self.raw, self.dest)
+ d.addCallback(self._assert_encrypted)
+ d.addCallback(lambda message: self._km.decrypt(
+ message.get_payload(1).get_payload(), ADDRESS, openpgp.OpenPGPKey,
+ verify=ADDRESS_2))
+ d.addCallback(check_decryption_and_verify)
+ return d
- # assert structure of encrypted message
+ def test_message_sign(self):
+ """
+ Test if message is signed with sender key.
+ """
+ # mock the key fetching
+ self._km._fetch_keys_from_server = Mock(
+ return_value=fail(errors.KeyNotFound()))
+ recipient = User('ihavenopubkey@nonleap.se',
+ 'gateway.leap.se', self.proto, ADDRESS)
+ self.outgoing_mail = OutgoingMail(
+ self.fromAddr, self._km, self._config['cert'], self._config['key'],
+ self._config['host'], self._config['port'])
+
+ def check_signed(res):
+ message, _ = res
+ self.assertTrue('Content-Type' in message)
+ self.assertEqual('multipart/signed', message.get_content_type())
+ self.assertEqual('application/pgp-signature',
+ message.get_param('protocol'))
+ self.assertEqual('pgp-sha512', message.get_param('micalg'))
+ # assert content of message
+ self.assertEqual(self.expected_body,
+ message.get_payload(0).get_payload(decode=True))
+ # assert content of signature
+ self.assertTrue(
+ message.get_payload(1).get_payload().startswith(
+ '-----BEGIN PGP SIGNATURE-----\n'),
+ 'Message does not start with signature header.')
+ self.assertTrue(
+ message.get_payload(1).get_payload().endswith(
+ '-----END PGP SIGNATURE-----\n'),
+ 'Message does not end with signature footer.')
+ return message
+
+ def verify(message):
+ # replace EOL before verifying (according to rfc3156)
+ signed_text = re.sub('\r?\n', '\r\n',
+ message.get_payload(0).as_string())
+
+ def assert_verify(key):
+ self.assertTrue(ADDRESS_2 in key.address,
+ 'Signature could not be verified.')
+
+ d = self._km.verify(
+ signed_text, ADDRESS_2, openpgp.OpenPGPKey,
+ detached_sig=message.get_payload(1).get_payload())
+ d.addCallback(assert_verify)
+ return d
+
+ d = self.outgoing_mail._maybe_encrypt_and_sign(self.raw, recipient)
+ d.addCallback(check_signed)
+ d.addCallback(verify)
+ return d
+
+ def _assert_encrypted(self, res):
+ message, _ = res
self.assertTrue('Content-Type' in message)
self.assertEqual('multipart/encrypted', message.get_content_type())
self.assertEqual('application/pgp-encrypted',
@@ -128,58 +184,4 @@ class TestOutgoingMail(TestCaseWithKeyManager):
message.get_payload(0).get_content_type())
self.assertEqual('application/octet-stream',
message.get_payload(1).get_content_type())
- # decrypt and verify
- privkey = self._km.get_key(
- ADDRESS, openpgp.OpenPGPKey, private=True)
- pubkey = self._km.get_key(ADDRESS_2, openpgp.OpenPGPKey)
- decrypted = self._km.decrypt(
- message.get_payload(1).get_payload(), privkey, verify=pubkey)
- self.assertEqual(
- '\n' + '\r\n'.join(self.EMAIL_DATA[9:12]) + '\r\n\r\n--\r\n' +
- 'I prefer encrypted email - https://leap.se/key/anotheruser\r\n',
- decrypted,
- 'Decrypted text differs from plaintext.')
-
- def test_message_sign(self):
- """
- Test if message is signed with sender key.
- """
- # mock the key fetching
- self._km.fetch_keys_from_server = Mock(return_value=[])
- recipient = User('ihavenopubkey@nonleap.se',
- 'gateway.leap.se', self.proto, ADDRESS)
- self.outgoing_mail = OutgoingMail(self.fromAddr, self._km, self._config['cert'], self._config['key'],
- self._config['host'], self._config['port'])
-
- message, _ = self.outgoing_mail._maybe_encrypt_and_sign(self.raw, recipient)
-
- # assert structure of signed message
- self.assertTrue('Content-Type' in message)
- self.assertEqual('multipart/signed', message.get_content_type())
- self.assertEqual('application/pgp-signature',
- message.get_param('protocol'))
- self.assertEqual('pgp-sha512', message.get_param('micalg'))
- # assert content of message
- self.assertEqual(
- '\r\n'.join(self.EMAIL_DATA[9:13]) + '\r\n--\r\n' +
- 'I prefer encrypted email - https://leap.se/key/anotheruser\r\n',
- message.get_payload(0).get_payload(decode=True))
- # assert content of signature
- self.assertTrue(
- message.get_payload(1).get_payload().startswith(
- '-----BEGIN PGP SIGNATURE-----\n'),
- 'Message does not start with signature header.')
- self.assertTrue(
- message.get_payload(1).get_payload().endswith(
- '-----END PGP SIGNATURE-----\n'),
- 'Message does not end with signature footer.')
- # assert signature is valid
- pubkey = self._km.get_key(ADDRESS_2, openpgp.OpenPGPKey)
- # replace EOL before verifying (according to rfc3156)
- signed_text = re.sub('\r?\n', '\r\n',
- message.get_payload(0).as_string())
- self.assertTrue(
- self._km.verify(signed_text,
- pubkey,
- detached_sig=message.get_payload(1).get_payload()),
- 'Signature could not be verified.')
+ return message