diff options
| -rw-r--r-- | mail/changes/feature-5937_key_attachment | 1 | ||||
| -rw-r--r-- | mail/src/leap/mail/imap/fetch.py | 45 | ||||
| -rw-r--r-- | mail/src/leap/mail/imap/tests/test_incoming_mail.py | 31 | 
3 files changed, 63 insertions, 14 deletions
| diff --git a/mail/changes/feature-5937_key_attachment b/mail/changes/feature-5937_key_attachment new file mode 100644 index 0000000..08c37e0 --- /dev/null +++ b/mail/changes/feature-5937_key_attachment @@ -0,0 +1 @@ +- Discover public keys via attachment (Closes: #5937) diff --git a/mail/src/leap/mail/imap/fetch.py b/mail/src/leap/mail/imap/fetch.py index 863f5fe..01373be 100644 --- a/mail/src/leap/mail/imap/fetch.py +++ b/mail/src/leap/mail/imap/fetch.py @@ -581,8 +581,8 @@ class LeapIncomingMail(object):      def _extract_keys(self, msgtuple):          """ -        Parse message headers for an *OpenPGP* header as described on the -        `IETF draft +        Retrieve attached keys to the mesage and parse message headers for an +        *OpenPGP* header as described on the `IETF draft          <http://tools.ietf.org/html/draft-josefsson-openpgp-mailnews-header-06>`          only urls with https and the same hostname than the email are supported          for security reasons. @@ -600,31 +600,35 @@ class LeapIncomingMail(object):          #     we should do it in this module so we don't need to parse it again          #     here          msg = self._parser.parsestr(data) +        _, fromAddress = parseaddr(msg['from']) +          header = msg.get(OpenPGP_HEADER, None)          if header is not None: -            self._extract_openpgp_header(msg, header) +            self._extract_openpgp_header(header, fromAddress) + +        if msg.is_multipart(): +            self._extract_attached_key(msg.get_payload(), fromAddress)          return msgtuple -    def _extract_openpgp_header(self, msg, header): +    def _extract_openpgp_header(self, header, address):          """          Import keys from the OpenPGP header -        :param msg: parsed email -        :type msg: email.Message          :param header: OpenPGP header string          :type header: str +        :param address: email address in the from header +        :type address: str          """          fields = dict([f.strip(' ').split('=') for f in header.split(';')])          if 'url' in fields:              url = shlex.split(fields['url'])[0]  # remove quotations -            _, fromAddress = parseaddr(msg['from'])              urlparts = urlparse(url) -            fromHostname = fromAddress.split('@')[1] +            addressHostname = address.split('@')[1]              if (urlparts.scheme == 'https' -                    and urlparts.hostname == fromHostname): +                    and urlparts.hostname == addressHostname):                  try: -                    self._keymanager.fetch_key(fromAddress, url, OpenPGPKey) +                    self._keymanager.fetch_key(address, url, OpenPGPKey)                      logger.info("Imported key from header %s" % (url,))                  except keymanager_errors.KeyNotFound:                      logger.warning("Url from OpenPGP header %s failed" @@ -632,13 +636,32 @@ class LeapIncomingMail(object):                  except keymanager_errors.KeyAttributesDiffer:                      logger.warning("Key from OpenPGP header url %s didn't "                                     "match the from address %s" -                                   % (url, fromAddress)) +                                   % (url, address))              else:                  logger.debug("No valid url on OpenPGP header %s" % (url,))          else:              logger.debug("There is no url on the OpenPGP header: %s"                           % (header,)) +    def _extract_attached_key(self, attachments, address): +        """ +        Import keys from the attachments + +        :param attachments: email attachment list +        :type attachments: list(email.Message) +        :param address: email address in the from header +        :type address: str +        """ +        MIME_KEY = "application/pgp-keys" + +        for attachment in attachments: +            if MIME_KEY == attachment.get_content_type(): +                logger.debug("Add key from attachment") +                self._keymanager.put_raw_key( +                    attachment.get_payload(), +                    OpenPGPKey, +                    address=address) +      def _add_message_locally(self, msgtuple):          """          Adds a message to local inbox and delete it from the incoming db diff --git a/mail/src/leap/mail/imap/tests/test_incoming_mail.py b/mail/src/leap/mail/imap/tests/test_incoming_mail.py index 5b72fac..ce6d56a 100644 --- a/mail/src/leap/mail/imap/tests/test_incoming_mail.py +++ b/mail/src/leap/mail/imap/tests/test_incoming_mail.py @@ -24,6 +24,8 @@ Test case for leap.email.imap.fetch  import json +from email.mime.application import MIMEApplication +from email.mime.multipart import MIMEMultipart  from email.parser import Parser  from mock import Mock  from twisted.trial import unittest @@ -105,13 +107,13 @@ subject: independence of cyberspace          email = self._create_incoming_email(message.as_string())          self._mock_soledad_get_from_index(fields.JUST_MAIL_IDX, [email])          self.fetcher._keymanager.fetch_key = Mock() -        d = self.fetcher.fetch()          def fetch_key_called(ret):              self.fetcher._keymanager.fetch_key.assert_called_once_with(                  self.FROM_ADDRESS, KEYURL, OpenPGPKey) -        d.addCallback(fetch_key_called) +        d = self.fetcher.fetch() +        d.addCallback(fetch_key_called)          return d      def testExtractOpenPGPHeaderInvalidUrl(self): @@ -126,12 +128,35 @@ subject: independence of cyberspace          email = self._create_incoming_email(message.as_string())          self._mock_soledad_get_from_index(fields.JUST_MAIL_IDX, [email])          self.fetcher._keymanager.fetch_key = Mock() -        d = self.fetcher.fetch()          def fetch_key_called(ret):              self.assertFalse(self.fetcher._keymanager.fetch_key.called) + +        d = self.fetcher.fetch()          d.addCallback(fetch_key_called) +        return d +    def testExtractAttachedKey(self): +        """ +        Test the OpenPGP header key extraction +        """ +        KEY = "-----BEGIN PGP PUBLIC KEY BLOCK-----\n..." + +        message = MIMEMultipart() +        message.add_header("from", self.FROM_ADDRESS) +        key = MIMEApplication("", "pgp-keys") +        key.set_payload(KEY) +        message.attach(key) +        email = self._create_incoming_email(message.as_string()) +        self._mock_soledad_get_from_index(fields.JUST_MAIL_IDX, [email]) +        self.fetcher._keymanager.put_raw_key = Mock() + +        def put_raw_key_called(ret): +            self.fetcher._keymanager.put_raw_key.assert_called_once_with( +                KEY, OpenPGPKey, address=self.FROM_ADDRESS) + +        d = self.fetcher.fetch() +        d.addCallback(put_raw_key_called)          return d      def _create_incoming_email(self, email_str): | 
