summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorRuben Pollan <meskio@sindominio.net>2014-11-03 22:54:15 -0600
committerRuben Pollan <meskio@sindominio.net>2014-11-04 12:37:01 -0600
commit03283d27d972429886583b2a4902c4887b1849b5 (patch)
tree68546a06a49d16903ecd94ccfe8651b7d4e6c003
parent9fd5c69b281423a2481df3ed51c20f5370d27ab8 (diff)
Discover public key via attachment
-rw-r--r--mail/changes/feature-5937_key_attachment1
-rw-r--r--mail/src/leap/mail/imap/fetch.py45
-rw-r--r--mail/src/leap/mail/imap/tests/test_incoming_mail.py31
3 files changed, 63 insertions, 14 deletions
diff --git a/mail/changes/feature-5937_key_attachment b/mail/changes/feature-5937_key_attachment
new file mode 100644
index 00000000..08c37e02
--- /dev/null
+++ b/mail/changes/feature-5937_key_attachment
@@ -0,0 +1 @@
+- Discover public keys via attachment (Closes: #5937)
diff --git a/mail/src/leap/mail/imap/fetch.py b/mail/src/leap/mail/imap/fetch.py
index 863f5fe7..01373be1 100644
--- a/mail/src/leap/mail/imap/fetch.py
+++ b/mail/src/leap/mail/imap/fetch.py
@@ -581,8 +581,8 @@ class LeapIncomingMail(object):
def _extract_keys(self, msgtuple):
"""
- Parse message headers for an *OpenPGP* header as described on the
- `IETF draft
+ Retrieve attached keys to the mesage and parse message headers for an
+ *OpenPGP* header as described on the `IETF draft
<http://tools.ietf.org/html/draft-josefsson-openpgp-mailnews-header-06>`
only urls with https and the same hostname than the email are supported
for security reasons.
@@ -600,31 +600,35 @@ class LeapIncomingMail(object):
# we should do it in this module so we don't need to parse it again
# here
msg = self._parser.parsestr(data)
+ _, fromAddress = parseaddr(msg['from'])
+
header = msg.get(OpenPGP_HEADER, None)
if header is not None:
- self._extract_openpgp_header(msg, header)
+ self._extract_openpgp_header(header, fromAddress)
+
+ if msg.is_multipart():
+ self._extract_attached_key(msg.get_payload(), fromAddress)
return msgtuple
- def _extract_openpgp_header(self, msg, header):
+ def _extract_openpgp_header(self, header, address):
"""
Import keys from the OpenPGP header
- :param msg: parsed email
- :type msg: email.Message
:param header: OpenPGP header string
:type header: str
+ :param address: email address in the from header
+ :type address: str
"""
fields = dict([f.strip(' ').split('=') for f in header.split(';')])
if 'url' in fields:
url = shlex.split(fields['url'])[0] # remove quotations
- _, fromAddress = parseaddr(msg['from'])
urlparts = urlparse(url)
- fromHostname = fromAddress.split('@')[1]
+ addressHostname = address.split('@')[1]
if (urlparts.scheme == 'https'
- and urlparts.hostname == fromHostname):
+ and urlparts.hostname == addressHostname):
try:
- self._keymanager.fetch_key(fromAddress, url, OpenPGPKey)
+ self._keymanager.fetch_key(address, url, OpenPGPKey)
logger.info("Imported key from header %s" % (url,))
except keymanager_errors.KeyNotFound:
logger.warning("Url from OpenPGP header %s failed"
@@ -632,13 +636,32 @@ class LeapIncomingMail(object):
except keymanager_errors.KeyAttributesDiffer:
logger.warning("Key from OpenPGP header url %s didn't "
"match the from address %s"
- % (url, fromAddress))
+ % (url, address))
else:
logger.debug("No valid url on OpenPGP header %s" % (url,))
else:
logger.debug("There is no url on the OpenPGP header: %s"
% (header,))
+ def _extract_attached_key(self, attachments, address):
+ """
+ Import keys from the attachments
+
+ :param attachments: email attachment list
+ :type attachments: list(email.Message)
+ :param address: email address in the from header
+ :type address: str
+ """
+ MIME_KEY = "application/pgp-keys"
+
+ for attachment in attachments:
+ if MIME_KEY == attachment.get_content_type():
+ logger.debug("Add key from attachment")
+ self._keymanager.put_raw_key(
+ attachment.get_payload(),
+ OpenPGPKey,
+ address=address)
+
def _add_message_locally(self, msgtuple):
"""
Adds a message to local inbox and delete it from the incoming db
diff --git a/mail/src/leap/mail/imap/tests/test_incoming_mail.py b/mail/src/leap/mail/imap/tests/test_incoming_mail.py
index 5b72facb..ce6d56aa 100644
--- a/mail/src/leap/mail/imap/tests/test_incoming_mail.py
+++ b/mail/src/leap/mail/imap/tests/test_incoming_mail.py
@@ -24,6 +24,8 @@ Test case for leap.email.imap.fetch
import json
+from email.mime.application import MIMEApplication
+from email.mime.multipart import MIMEMultipart
from email.parser import Parser
from mock import Mock
from twisted.trial import unittest
@@ -105,13 +107,13 @@ subject: independence of cyberspace
email = self._create_incoming_email(message.as_string())
self._mock_soledad_get_from_index(fields.JUST_MAIL_IDX, [email])
self.fetcher._keymanager.fetch_key = Mock()
- d = self.fetcher.fetch()
def fetch_key_called(ret):
self.fetcher._keymanager.fetch_key.assert_called_once_with(
self.FROM_ADDRESS, KEYURL, OpenPGPKey)
- d.addCallback(fetch_key_called)
+ d = self.fetcher.fetch()
+ d.addCallback(fetch_key_called)
return d
def testExtractOpenPGPHeaderInvalidUrl(self):
@@ -126,12 +128,35 @@ subject: independence of cyberspace
email = self._create_incoming_email(message.as_string())
self._mock_soledad_get_from_index(fields.JUST_MAIL_IDX, [email])
self.fetcher._keymanager.fetch_key = Mock()
- d = self.fetcher.fetch()
def fetch_key_called(ret):
self.assertFalse(self.fetcher._keymanager.fetch_key.called)
+
+ d = self.fetcher.fetch()
d.addCallback(fetch_key_called)
+ return d
+ def testExtractAttachedKey(self):
+ """
+ Test the OpenPGP header key extraction
+ """
+ KEY = "-----BEGIN PGP PUBLIC KEY BLOCK-----\n..."
+
+ message = MIMEMultipart()
+ message.add_header("from", self.FROM_ADDRESS)
+ key = MIMEApplication("", "pgp-keys")
+ key.set_payload(KEY)
+ message.attach(key)
+ email = self._create_incoming_email(message.as_string())
+ self._mock_soledad_get_from_index(fields.JUST_MAIL_IDX, [email])
+ self.fetcher._keymanager.put_raw_key = Mock()
+
+ def put_raw_key_called(ret):
+ self.fetcher._keymanager.put_raw_key.assert_called_once_with(
+ KEY, OpenPGPKey, address=self.FROM_ADDRESS)
+
+ d = self.fetcher.fetch()
+ d.addCallback(put_raw_key_called)
return d
def _create_incoming_email(self, email_str):