diff options
| -rw-r--r-- | CHANGELOG | 8 | ||||
| -rw-r--r-- | src/leap/keymanager/__init__.py | 54 | ||||
| -rw-r--r-- | src/leap/keymanager/errors.py | 12 | ||||
| -rw-r--r-- | src/leap/keymanager/keys.py | 9 | ||||
| -rw-r--r-- | src/leap/keymanager/openpgp.py | 107 | ||||
| -rw-r--r-- | src/leap/keymanager/tests/test_keymanager.py | 14 | 
6 files changed, 171 insertions, 33 deletions
| @@ -1,3 +1,11 @@ +0.3.6 Nov 15: +  o Default encoding to 'utf-8' in case of system encodings not +    set. Closes #4427. +  o Add verification of detached signatures. Closes #4375. +  o New openpgp method: parse_ascii_keys. +  o Expose openpgp methods in keymanager (parse_ascii_keys, put_key, +    delete_key). +  0.3.5 Nov 1:    o Return unicode decrypted text to avoid encoding issues. Related to      #4000. diff --git a/src/leap/keymanager/__init__.py b/src/leap/keymanager/__init__.py index a5505981..dbc54891 100644 --- a/src/leap/keymanager/__init__.py +++ b/src/leap/keymanager/__init__.py @@ -473,15 +473,18 @@ class KeyManager(object):              data, privkey, digest_algo=digest_algo, clearsign=clearsign,              detach=detach, binary=binary) -    def verify(self, data, pubkey): +    def verify(self, data, pubkey, detached_sig=None):          """ -        Verify signed C{data} with C{pubkey}. +        Verify signed C{data} with C{pubkey}, eventually using +        C{detached_sig}.          :param data: The data to be verified.          :type data: str -          :param pubkey: The public key to be used on verification.          :type pubkey: EncryptionKey +        :param detached_sig: A detached signature. If given, C{data} is +                             verified using this detached signature. +        :type detached_sig: str          :return: The signed data.          :rtype: str @@ -489,7 +492,50 @@ class KeyManager(object):          leap_assert_type(pubkey, EncryptionKey)          leap_assert(pubkey.__class__ in self._wrapper_map, 'Unknown key type.')          leap_assert(pubkey.private is False, 'Key is not public.') -        return self._wrapper_map[pubkey.__class__].verify(data, pubkey) +        return self._wrapper_map[pubkey.__class__].verify( +            data, pubkey, detached_sig=detached_sig) + +    def parse_openpgp_ascii_key(self, key_data): +        """ +        Parses an ascii armored key (or key pair) data and returns +        the OpenPGPKey keys. + +        :param key_data: the key data to be parsed. +        :type key_data: str or unicode + +        :returns: the public key and private key (if applies) for that data. +        :rtype: (public, private) -> tuple(OpenPGPKey, OpenPGPKey) +                the tuple may have one or both components None +        """ +        return self._wrapper_map[OpenPGPKey].parse_ascii_key(key_data) + +    def delete_key(self, key): +        """ +        Remove C{key} from storage. + +        May raise: +            openpgp.errors.KeyNotFound +            openpgp.errors.KeyAttributesDiffer + +        :param key: The key to be removed. +        :type key: EncryptionKey +        """ +        try: +            self._wrapper_map[type(key)].delete_key(key) +        except IndexError as e: +            leap_assert(False, "Unsupported key type. Error {0!r}".format(e)) + +    def put_key(self, key): +        """ +        Put C{key} in local storage. + +        :param key: The key to be stored. +        :type key: OpenPGPKey +        """ +        try: +            self._wrapper_map[type(key)].put_key(key) +        except IndexError as e: +            leap_assert(False, "Unsupported key type. Error {0!r}".format(e))  from ._version import get_versions  __version__ = get_versions()['version'] diff --git a/src/leap/keymanager/errors.py b/src/leap/keymanager/errors.py index 89949d29..27180dbd 100644 --- a/src/leap/keymanager/errors.py +++ b/src/leap/keymanager/errors.py @@ -84,3 +84,15 @@ class SignFailed(Exception):      Raised when failed to sign.      """      pass + + +class KeyAddressMismatch(Exception): +    """ +    A mismatch between addresses. +    """ + + +class KeyFingerprintMismatch(Exception): +    """ +    A mismatch between fingerprints. +    """ diff --git a/src/leap/keymanager/keys.py b/src/leap/keymanager/keys.py index b8e88d46..ec1bfeb4 100644 --- a/src/leap/keymanager/keys.py +++ b/src/leap/keymanager/keys.py @@ -340,15 +340,18 @@ class EncryptionScheme(object):          pass      @abstractmethod -    def verify(self, data, pubkey): +    def verify(self, data, pubkey, detached_sig=None):          """ -        Verify signed C{data} with C{pubkey}. +        Verify signed C{data} with C{pubkey}, eventually using +        C{detached_sig}.          :param data: The data to be verified.          :type data: str -          :param pubkey: The public key to be used on verification.          :type pubkey: EncryptionKey +        :param detached_sig: A detached signature. If given, C{data} is +                             verified against this sdetached signature. +        :type detached_sig: str          :return: The signed data.          :rtype: str diff --git a/src/leap/keymanager/openpgp.py b/src/leap/keymanager/openpgp.py index 8ec86391..f6223d57 100644 --- a/src/leap/keymanager/openpgp.py +++ b/src/leap/keymanager/openpgp.py @@ -21,17 +21,21 @@ Infrastructure for using OpenPGP keys in Key Manager.  """ +import locale  import logging  import os  import re  import shutil +import sys  import tempfile -import locale + +from contextlib import closing  from gnupg import GPG  from gnupg.gnupg import GPGUtilities +from gnupg._util import _make_binary_stream -from leap.common.check import leap_assert, leap_assert_type +from leap.common.check import leap_assert, leap_assert_type, leap_check  from leap.keymanager import errors  from leap.keymanager.keys import (      EncryptionKey, @@ -46,6 +50,10 @@ from leap.keymanager.keys import (  logger = logging.getLogger(__name__) +# +# A temporary GPG keyring wrapped to provide OpenPGP functionality. +# +  class TempGPGWrapper(object):      """      A context manager that wraps a temporary GPG keyring which only contains @@ -243,7 +251,7 @@ class OpenPGPScheme(EncryptionScheme):                  key_length=4096,                  name_real=address,                  name_email=address, -                name_comment='Generated by LEAP Key Manager.') +                name_comment='')              logger.info("About to generate keys... This might take SOME time.")              gpg.gen_key(params)              logger.info("Keys for %s have been successfully " @@ -295,16 +303,22 @@ class OpenPGPScheme(EncryptionScheme):              raise errors.KeyNotFound(address)          return build_key_from_dict(OpenPGPKey, address, doc.content) -    def put_ascii_key(self, key_data): +    def parse_ascii_key(self, key_data):          """ -        Put key contained in ascii-armored C{key_data} in local storage. +        Parses an ascii armored key (or key pair) data and returns +        the OpenPGPKey keys. -        :param key_data: The key data to be stored. +        :param key_data: the key data to be parsed.          :type key_data: str or unicode + +        :returns: the public key and private key (if applies) for that data. +        :rtype: (public, private) -> tuple(OpenPGPKey, OpenPGPKey) +                the tuple may have one or both components None          """          leap_assert_type(key_data, (str, unicode))          # TODO: add more checks for correct key data.          leap_assert(key_data is not None, 'Data does not represent a key.') +        mail_regex = '.*<([\w.-]+@[\w.-]+)>.*'          with self._temporary_gpgwrapper() as gpg:              # TODO: inspect result, or use decorator @@ -317,31 +331,54 @@ class OpenPGPScheme(EncryptionScheme):              except IndexError:                  pass              pubkey = gpg.list_keys(secret=False).pop()  # unitary keyring +              # extract adress from first uid on key -            match = re.match('.*<([\w.-]+@[\w.-]+)>.*', pubkey['uids'].pop()) +            match = re.match(mail_regex, pubkey['uids'].pop())              leap_assert(match is not None, 'No user address in key data.')              address = match.group(1) +              if privkey is not None: -                match = re.match( -                    '.*<([\w.-]+@[\w.-]+)>.*', privkey['uids'].pop()) +                match = re.match(mail_regex, privkey['uids'].pop())                  leap_assert(match is not None, 'No user address in key data.')                  privaddress = match.group(1) -                leap_assert( -                    address == privaddress, -                    'Addresses in pub and priv key differ.') -                leap_assert( -                    pubkey['fingerprint'] == privkey['fingerprint'], -                    'Fingerprints for pub and priv key differ.') -                # insert private key in storage + +                # build private key                  openpgp_privkey = _build_key_from_gpg( -                    address, privkey, +                    privaddress, privkey,                      gpg.export_keys(privkey['fingerprint'], secret=True)) -                self.put_key(openpgp_privkey) -            # insert public key in storage + +                leap_check(address == privaddress, +                           'Addresses in public and private key differ.', +                           errors.KeyAddressMismatch) +                leap_check(pubkey['fingerprint'] == privkey['fingerprint'], +                           'Fingerprints for public and private key differ.', +                           errors.KeyFingerprintMismatch) + +            # build public key              openpgp_pubkey = _build_key_from_gpg(                  address, pubkey,                  gpg.export_keys(pubkey['fingerprint'], secret=False)) + +            return (openpgp_pubkey, openpgp_privkey) + +    def put_ascii_key(self, key_data): +        """ +        Put key contained in ascii-armored C{key_data} in local storage. + +        :param key_data: The key data to be stored. +        :type key_data: str or unicode +        """ +        leap_assert_type(key_data, (str, unicode)) + +        try: +            openpgp_pubkey, openpgp_privkey = self.parse_ascii_key(key_data) +        except (errors.KeyAddressMismatch, errors.KeyFingerprintMismatch) as e: +            leap_assert(False, repr(e)) + +        if openpgp_pubkey is not None:              self.put_key(openpgp_pubkey) +        if openpgp_privkey is not None: +            self.put_key(openpgp_privkey)      def put_key(self, key):          """ @@ -387,10 +424,14 @@ class OpenPGPScheme(EncryptionScheme):          """          Remove C{key} from storage. +        May raise: +            errors.KeyNotFound +            errors.KeyAttributesDiffer +          :param key: The key to be removed.          :type key: EncryptionKey          """ -        leap_assert(key.__class__ is OpenPGPKey, 'Wrong key type.') +        leap_assert_type(key, OpenPGPKey)          stored_key = self.get_key(key.address, private=key.private)          if stored_key is None:              raise errors.KeyNotFound(key) @@ -510,6 +551,10 @@ class OpenPGPScheme(EncryptionScheme):              # https://github.com/isislovecruft/python-gnupg/\              #   blob/master/gnupg/_meta.py#L121              encoding = locale.getpreferredencoding() +            if encoding is None: +                encoding = sys.stdin.encoding +            if encoding is None: +                encoding = 'utf-8'              return result.data.decode(encoding, 'replace')      def is_encrypted(self, data): @@ -570,15 +615,18 @@ class OpenPGPScheme(EncryptionScheme):                  '%s != %s' % (rfprint, kfprint))          return result.data -    def verify(self, data, pubkey): +    def verify(self, data, pubkey, detached_sig=None):          """ -        Verify signed C{data} with C{pubkey}. +        Verify signed C{data} with C{pubkey}, eventually using +        C{detached_sig}.          :param data: The data to be verified.          :type data: str -          :param pubkey: The public key to be used on verification.          :type pubkey: OpenPGPKey +        :param detached_sig: A detached signature. If given, C{data} is +                             verified against this detached signature. +        :type detached_sig: str          :return: The ascii-armored signed data.          :rtype: str @@ -586,7 +634,18 @@ class OpenPGPScheme(EncryptionScheme):          leap_assert_type(pubkey, OpenPGPKey)          leap_assert(pubkey.private is False)          with self._temporary_gpgwrapper(pubkey) as gpg: -            result = gpg.verify(data) +            result = None +            if detached_sig is None: +                result = gpg.verify(data) +            else: +                # to verify using a detached sig we have to use +                # gpg.verify_file(), which receives the data as a binary +                # stream and the name of a file containing the signature. +                sf, sfname = tempfile.mkstemp() +                with os.fdopen(sf, 'w') as sfd: +                    sfd.write(detached_sig) +                with closing(_make_binary_stream(data, gpg._encoding)) as df: +                    result = gpg.verify_file(df, sig_file=sfname)              gpgpubkey = gpg.list_keys().pop()              valid = result.valid              rfprint = result.fingerprint diff --git a/src/leap/keymanager/tests/test_keymanager.py b/src/leap/keymanager/tests/test_keymanager.py index 25126047..67676e9c 100644 --- a/src/leap/keymanager/tests/test_keymanager.py +++ b/src/leap/keymanager/tests/test_keymanager.py @@ -294,7 +294,7 @@ class OpenPGPCryptoTestCase(KeyManagerWithSoledadTestCase):          pgp.put_ascii_key(PRIVATE_KEY)          data = 'data'          privkey = pgp.get_key(ADDRESS, private=True) -        signed = pgp.sign(data, privkey) +        signed = pgp.sign(data, privkey, detach=False)          pubkey = pgp.get_key(ADDRESS, private=False)          self.assertTrue(pgp.verify(signed, pubkey)) @@ -314,6 +314,16 @@ class OpenPGPCryptoTestCase(KeyManagerWithSoledadTestCase):              encrypted_and_signed, privkey2, verify=pubkey)          self.assertTrue(data, res) +    def test_sign_verify_detached_sig(self): +        pgp = openpgp.OpenPGPScheme( +            self._soledad, gpgbinary=GPG_BINARY_PATH) +        pgp.put_ascii_key(PRIVATE_KEY) +        data = 'data' +        privkey = pgp.get_key(ADDRESS, private=True) +        signature = pgp.sign(data, privkey, detach=True) +        pubkey = pgp.get_key(ADDRESS, private=False) +        self.assertTrue(pgp.verify(data, pubkey, detached_sig=signature)) +  class KeyManagerKeyManagementTestCase(KeyManagerWithSoledadTestCase): @@ -490,7 +500,7 @@ class KeyManagerCryptoTestCase(KeyManagerWithSoledadTestCase):          privkey = km.get_key(              ADDRESS, OpenPGPKey, private=True, fetch_remote=False)          # encrypt -        signdata = km.sign(self.RAW_DATA, privkey) +        signdata = km.sign(self.RAW_DATA, privkey, detach=False)          self.assertNotEqual(self.RAW_DATA, signdata)          # get public key for verifying          pubkey = km.get_key( | 
