diff options
Diffstat (limited to 'keymanager/src/leap/keymanager/openpgp.py')
| -rw-r--r-- | keymanager/src/leap/keymanager/openpgp.py | 120 | 
1 files changed, 54 insertions, 66 deletions
| diff --git a/keymanager/src/leap/keymanager/openpgp.py b/keymanager/src/leap/keymanager/openpgp.py index 52655d0..4f96574 100644 --- a/keymanager/src/leap/keymanager/openpgp.py +++ b/keymanager/src/leap/keymanager/openpgp.py @@ -38,6 +38,7 @@ from leap.keymanager.keys import (      build_key_from_dict,      TYPE_ID_PRIVATE_INDEX,      TYPE_ADDRESS_PRIVATE_INDEX, +    KEY_ADDRESS_KEY,      KEY_FINGERPRINT_KEY,      KEY_DATA_KEY,      KEY_ID_KEY, @@ -110,9 +111,9 @@ class TempGPGWrapper(object):          # itself is enough to also have the public key in the keyring,          # and we want to count the keys afterwards. -        privaddrs = map(lambda privkey: privkey.address[0], privkeys) +        privids = map(lambda privkey: privkey.key_id, privkeys)          publkeys = filter( -            lambda pubkey: pubkey.address[0] not in privaddrs, publkeys) +            lambda pubkey: pubkey.key_id not in privids, publkeys)          listkeys = lambda: self._gpg.list_keys()          listsecretkeys = lambda: self._gpg.list_keys(secret=True) @@ -163,16 +164,13 @@ class TempGPGWrapper(object):              shutil.rmtree(self._gpg.homedir) -def _build_key_from_gpg(address, key, key_data): +def _build_key_from_gpg(key, key_data):      """ -    Build an OpenPGPKey for C{address} based on C{key} from -    local gpg storage. +    Build an OpenPGPKey based on C{key} from local gpg storage.      ASCII armored GPG key data has to be queried independently in this      wrapper, so we receive it in C{key_data}. -    :param address: The address bound to the key. -    :type address: str      :param key: Key obtained from GPG storage.      :type key: dict      :param key_data: Key data obtained from GPG storage. @@ -183,9 +181,12 @@ def _build_key_from_gpg(address, key, key_data):      expiry_date = None      if key['expires']:          expiry_date = datetime.fromtimestamp(int(key['expires'])) +    address = [] +    for uid in key['uids']: +        address.append(_parse_address(uid))      return OpenPGPKey( -        [address], +        address,          key_id=key['keyid'],          fingerprint=key['fingerprint'],          key_data=key_data, @@ -198,14 +199,18 @@ def _build_key_from_gpg(address, key, key_data):  def _parse_address(address):      """ -    Remove the identity suffix after the '+' until the '@' +    Remove name, '<', '>' and the identity suffix after the '+' until the '@'      e.g.: test_user+something@provider.com becomes test_user@provider.com      since the key belongs to the identity without the '+' suffix.      :type address: str      :rtype: str      """ -    return re.sub(r'\+.*\@', '@', address) +    mail_regex = '(.*<)?([\w.-]+)(\+.*)?(@[\w.-]+)(>.*)?' +    match = re.match(mail_regex, address) +    if match is None: +        return None +    return ''.join(match.group(2, 4))  # @@ -289,15 +294,17 @@ class OpenPGPScheme(EncryptionScheme):              leap_assert(                  len(key['uids']) is 1,  # with just one uid!                  'Wrong number of uids for key: %d.' % len(key['uids'])) -            leap_assert( -                re.match('.*<%s>$' % address, key['uids'][0]) is not None, -                'Key not correctly bound to address.') +            uid_match = False +            for uid in key['uids']: +                if re.match('.*<%s>$' % address, uid) is not None: +                    uid_match = True +                    return +            leap_assert(uid_match, 'Key not correctly bound to address.')              # insert both public and private keys in storage              for secret in [True, False]:                  key = gpg.list_keys(secret=secret).pop()                  openpgp_key = _build_key_from_gpg( -                    address, key, -                    gpg.export_keys(key['fingerprint'], secret=secret)) +                    key, gpg.export_keys(key['fingerprint'], secret=secret))                  self.put_key(openpgp_key, address)          return self.get_key(address, private=True) @@ -320,7 +327,10 @@ class OpenPGPScheme(EncryptionScheme):          doc = self._get_key_doc(address, private)          if doc is None:              raise errors.KeyNotFound(address) -        return build_key_from_dict(OpenPGPKey, address, doc.content) +        leap_assert( +            address in doc.content[KEY_ADDRESS_KEY], +            'Wrong address in key data.') +        return build_key_from_dict(OpenPGPKey, doc.content)      def parse_ascii_key(self, key_data):          """ @@ -337,7 +347,6 @@ class OpenPGPScheme(EncryptionScheme):          leap_assert_type(key_data, (str, unicode))          # TODO: add more checks for correct key data.          leap_assert(key_data is not None, 'Data does not represent a key.') -        mail_regex = '.*<([\w.-]+@[\w.-]+)>.*'          with self._temporary_gpgwrapper() as gpg:              # TODO: inspect result, or use decorator @@ -354,44 +363,30 @@ class OpenPGPScheme(EncryptionScheme):              except IndexError:                  return (None, None) -            # extract adress from first uid on key -            match = re.match(mail_regex, pubkey['uids'].pop()) -            leap_assert(match is not None, 'No user address in key data.') -            address = match.group(1) -              openpgp_privkey = None              if privkey is not None: -                match = re.match(mail_regex, privkey['uids'].pop()) -                leap_assert(match is not None, 'No user address in key data.') -                privaddress = match.group(1) -                  # build private key                  openpgp_privkey = _build_key_from_gpg( -                    privaddress, privkey, +                    privkey,                      gpg.export_keys(privkey['fingerprint'], secret=True)) - -                leap_check(address == privaddress, -                           'Addresses in public and private key differ.', -                           errors.KeyAddressMismatch)                  leap_check(pubkey['fingerprint'] == privkey['fingerprint'],                             'Fingerprints for public and private key differ.',                             errors.KeyFingerprintMismatch)              # build public key              openpgp_pubkey = _build_key_from_gpg( -                address, pubkey, +                pubkey,                  gpg.export_keys(pubkey['fingerprint'], secret=False))              return (openpgp_pubkey, openpgp_privkey) -    def put_ascii_key(self, key_data, address=None): +    def put_ascii_key(self, key_data, address):          """          Put key contained in ascii-armored C{key_data} in local storage.          :param key_data: The key data to be stored.          :type key_data: str or unicode -        :param address: address for which this key will be active. If not set -                        all the uids will be activated +        :param address: address for which this key will be active          :type address: str          """          leap_assert_type(key_data, (str, unicode)) @@ -407,23 +402,17 @@ class OpenPGPScheme(EncryptionScheme):          if openpgp_privkey is not None:              self.put_key(openpgp_privkey, address) -    def put_key(self, key, address=None): +    def put_key(self, key, address):          """          Put C{key} in local storage.          :param key: The key to be stored.          :type key: OpenPGPKey -        :param address: address for which this key will be active. If not set -                        all the uids will be activated +        :param address: address for which this key will be active.          :type address: str          """ -        if address is not None: -            active_address = [_parse_address(address)] -        else: -            active_address = key.address -          self._put_key_doc(key) -        self._put_active_doc(key, active_address) +        self._put_active_doc(key, address)      def _put_key_doc(self, key):          """ @@ -445,7 +434,7 @@ class OpenPGPScheme(EncryptionScheme):                      gpg.import_keys(key.key_data)                      gpgkey = gpg.list_keys(secret=key.private).pop()                      key = _build_key_from_gpg( -                        key.address[0], gpgkey, +                        gpgkey,                          gpg.export_keys(gpgkey['fingerprint'],                                          secret=key.private))                  doc.set_json(key.get_json()) @@ -458,31 +447,30 @@ class OpenPGPScheme(EncryptionScheme):          else:              self._soledad.create_doc_from_json(key.get_json()) -    def _put_active_doc(self, key, addresses): +    def _put_active_doc(self, key, address):          """          Put active key document in soledad          :type key: OpenPGPKey -        :type addresses: list(str) -        """ -        for address in addresses: -            docs = self._soledad.get_from_index( -                TYPE_ADDRESS_PRIVATE_INDEX, -                self.ACTIVE_TYPE, -                address, -                '1' if key.private else '0') -            if len(docs) == 1: -                doc = docs.pop() -                doc.set_json(key.get_active_json(address)) -                self._soledad.put_doc(doc) -            else: -                if len(docs) > 1: -                    logger.error("There is more than one active key document " -                                 "for the address %s" % (address,)) -                    for doc in docs: -                        self._soledad.delete_doc(doc) -                self._soledad.create_doc_from_json( -                    key.get_active_json(address)) +        :type addresses: str +        """ +        docs = self._soledad.get_from_index( +            TYPE_ADDRESS_PRIVATE_INDEX, +            self.ACTIVE_TYPE, +            address, +            '1' if key.private else '0') +        if len(docs) == 1: +            doc = docs.pop() +            doc.set_json(key.get_active_json(address)) +            self._soledad.put_doc(doc) +        else: +            if len(docs) > 1: +                logger.error("There is more than one active key document " +                             "for the address %s" % (address,)) +                for doc in docs: +                    self._soledad.delete_doc(doc) +            self._soledad.create_doc_from_json( +                key.get_active_json(address))      def _get_key_doc(self, address, private=False):          """ | 
