diff options
Diffstat (limited to 'src/leap/keymanager/openpgp.py')
-rw-r--r-- | src/leap/keymanager/openpgp.py | 212 |
1 files changed, 149 insertions, 63 deletions
diff --git a/src/leap/keymanager/openpgp.py b/src/leap/keymanager/openpgp.py index d3c305e2..3f298f71 100644 --- a/src/leap/keymanager/openpgp.py +++ b/src/leap/keymanager/openpgp.py @@ -36,12 +36,14 @@ from leap.keymanager.keys import ( EncryptionScheme, is_address, build_key_from_dict, - KEYMANAGER_KEY_TAG, - TAGS_ADDRESS_PRIVATE_INDEX, + TYPE_ID_PRIVATE_INDEX, + TYPE_ADDRESS_PRIVATE_INDEX, + KEY_ADDRESS_KEY, KEY_FINGERPRINT_KEY, KEY_DATA_KEY, + KEY_ID_KEY, + KEYMANAGER_ACTIVE_TYPE, ) -from leap.keymanager.validation import ValidationLevel logger = logging.getLogger(__name__) @@ -109,9 +111,9 @@ class TempGPGWrapper(object): # itself is enough to also have the public key in the keyring, # and we want to count the keys afterwards. - privaddrs = map(lambda privkey: privkey.address, privkeys) + privids = map(lambda privkey: privkey.key_id, privkeys) publkeys = filter( - lambda pubkey: pubkey.address not in privaddrs, publkeys) + lambda pubkey: pubkey.key_id not in privids, publkeys) listkeys = lambda: self._gpg.list_keys() listsecretkeys = lambda: self._gpg.list_keys(secret=True) @@ -162,16 +164,13 @@ class TempGPGWrapper(object): shutil.rmtree(self._gpg.homedir) -def _build_key_from_gpg(address, key, key_data): +def _build_key_from_gpg(key, key_data): """ - Build an OpenPGPKey for C{address} based on C{key} from - local gpg storage. + Build an OpenPGPKey based on C{key} from local gpg storage. ASCII armored GPG key data has to be queried independently in this wrapper, so we receive it in C{key_data}. - :param address: The address bound to the key. - :type address: str :param key: Key obtained from GPG storage. :type key: dict :param key_data: Key data obtained from GPG storage. @@ -182,6 +181,9 @@ def _build_key_from_gpg(address, key, key_data): expiry_date = None if key['expires']: expiry_date = datetime.fromtimestamp(int(key['expires'])) + address = [] + for uid in key['uids']: + address.append(_parse_address(uid)) return OpenPGPKey( address, @@ -189,12 +191,28 @@ def _build_key_from_gpg(address, key, key_data): fingerprint=key['fingerprint'], key_data=key_data, private=True if key['type'] == 'sec' else False, - length=key['length'], + length=int(key['length']), expiry_date=expiry_date, - validation=ValidationLevel.Weak_Chain, + refreshed_at=datetime.now(), ) +def _parse_address(address): + """ + Remove name, '<', '>' and the identity suffix after the '+' until the '@' + e.g.: test_user+something@provider.com becomes test_user@provider.com + since the key belongs to the identity without the '+' suffix. + + :type address: str + :rtype: str + """ + mail_regex = '(.*<)?([\w.-]+)(\+.*)?(@[\w.-]+)(>.*)?' + match = re.match(mail_regex, address) + if match is None: + return None + return ''.join(match.group(2, 4)) + + # # The OpenPGP wrapper # @@ -211,6 +229,10 @@ class OpenPGPScheme(EncryptionScheme): signing and verification). """ + # type used on the soledad documents + KEY_TYPE = OpenPGPKey.__name__ + ACTIVE_TYPE = KEY_TYPE + KEYMANAGER_ACTIVE_TYPE + def __init__(self, soledad, gpgbinary=None): """ Initialize the OpenPGP wrapper. @@ -272,16 +294,18 @@ class OpenPGPScheme(EncryptionScheme): leap_assert( len(key['uids']) is 1, # with just one uid! 'Wrong number of uids for key: %d.' % len(key['uids'])) - leap_assert( - re.match('.*<%s>$' % address, key['uids'][0]) is not None, - 'Key not correctly bound to address.') + uid_match = False + for uid in key['uids']: + if re.match('.*<%s>$' % address, uid) is not None: + uid_match = True + return + leap_assert(uid_match, 'Key not correctly bound to address.') # insert both public and private keys in storage for secret in [True, False]: key = gpg.list_keys(secret=secret).pop() openpgp_key = _build_key_from_gpg( - address, key, - gpg.export_keys(key['fingerprint'], secret=secret)) - self.put_key(openpgp_key) + key, gpg.export_keys(key['fingerprint'], secret=secret)) + self.put_key(openpgp_key, address) return self.get_key(address, private=True) @@ -298,15 +322,15 @@ class OpenPGPScheme(EncryptionScheme): :rtype: OpenPGPKey @raise KeyNotFound: If the key was not found on local storage. """ - # Remove the identity suffix after the '+' until the '@' - # e.g.: test_user+something@provider.com becomes test_user@provider.com - # since the key belongs to the identity without the '+' suffix. - address = re.sub(r'\+.*\@', '@', address) + address = _parse_address(address) doc = self._get_key_doc(address, private) if doc is None: raise errors.KeyNotFound(address) - return build_key_from_dict(OpenPGPKey, address, doc.content) + leap_assert( + address in doc.content[KEY_ADDRESS_KEY], + 'Wrong address in key data.') + return build_key_from_dict(OpenPGPKey, doc.content) def parse_ascii_key(self, key_data): """ @@ -323,7 +347,6 @@ class OpenPGPScheme(EncryptionScheme): leap_assert_type(key_data, (str, unicode)) # TODO: add more checks for correct key data. leap_assert(key_data is not None, 'Data does not represent a key.') - mail_regex = '.*<([\w.-]+@[\w.-]+)>.*' with self._temporary_gpgwrapper() as gpg: # TODO: inspect result, or use decorator @@ -340,42 +363,31 @@ class OpenPGPScheme(EncryptionScheme): except IndexError: return (None, None) - # extract adress from first uid on key - match = re.match(mail_regex, pubkey['uids'].pop()) - leap_assert(match is not None, 'No user address in key data.') - address = match.group(1) - openpgp_privkey = None if privkey is not None: - match = re.match(mail_regex, privkey['uids'].pop()) - leap_assert(match is not None, 'No user address in key data.') - privaddress = match.group(1) - # build private key openpgp_privkey = _build_key_from_gpg( - privaddress, privkey, + privkey, gpg.export_keys(privkey['fingerprint'], secret=True)) - - leap_check(address == privaddress, - 'Addresses in public and private key differ.', - errors.KeyAddressMismatch) leap_check(pubkey['fingerprint'] == privkey['fingerprint'], 'Fingerprints for public and private key differ.', errors.KeyFingerprintMismatch) # build public key openpgp_pubkey = _build_key_from_gpg( - address, pubkey, + pubkey, gpg.export_keys(pubkey['fingerprint'], secret=False)) return (openpgp_pubkey, openpgp_privkey) - def put_ascii_key(self, key_data): + def put_ascii_key(self, key_data, address): """ Put key contained in ascii-armored C{key_data} in local storage. :param key_data: The key data to be stored. :type key_data: str or unicode + :param address: address for which this key will be active + :type address: str """ leap_assert_type(key_data, (str, unicode)) @@ -386,21 +398,35 @@ class OpenPGPScheme(EncryptionScheme): leap_assert(False, repr(e)) if openpgp_pubkey is not None: - self.put_key(openpgp_pubkey) + self.put_key(openpgp_pubkey, address) if openpgp_privkey is not None: - self.put_key(openpgp_privkey) + self.put_key(openpgp_privkey, address) - def put_key(self, key): + def put_key(self, key, address): """ Put C{key} in local storage. :param key: The key to be stored. :type key: OpenPGPKey + :param address: address for which this key will be active. + :type address: str """ - doc = self._get_key_doc(key.address, private=key.private) - if doc is None: - self._soledad.create_doc_from_json(key.get_json()) - else: + self._put_key_doc(key) + self._put_active_doc(key, address) + + def _put_key_doc(self, key): + """ + Put key document in soledad + + :type key: OpenPGPKey + """ + docs = self._soledad.get_from_index( + TYPE_ID_PRIVATE_INDEX, + self.KEY_TYPE, + key.key_id, + '1' if key.private else '0') + if len(docs) != 0: + doc = docs.pop() if key.fingerprint == doc.content[KEY_FINGERPRINT_KEY]: # in case of an update of the key merge them with gnupg with self._temporary_gpgwrapper() as gpg: @@ -408,11 +434,43 @@ class OpenPGPScheme(EncryptionScheme): gpg.import_keys(key.key_data) gpgkey = gpg.list_keys(secret=key.private).pop() key = _build_key_from_gpg( - key.address, gpgkey, + gpgkey, gpg.export_keys(gpgkey['fingerprint'], secret=key.private)) - doc.set_json(key.get_json()) + doc.set_json(key.get_json()) + self._soledad.put_doc(doc) + else: + logger.critical( + "Can't put a key whith the same key_id and different " + "fingerprint: %s, %s" + % (key.fingerprint, doc.content[KEY_FINGERPRINT_KEY])) + else: + self._soledad.create_doc_from_json(key.get_json()) + + def _put_active_doc(self, key, address): + """ + Put active key document in soledad + + :type key: OpenPGPKey + :type addresses: str + """ + docs = self._soledad.get_from_index( + TYPE_ADDRESS_PRIVATE_INDEX, + self.ACTIVE_TYPE, + address, + '1' if key.private else '0') + if len(docs) == 1: + doc = docs.pop() + doc.set_json(key.get_active_json(address)) self._soledad.put_doc(doc) + else: + if len(docs) > 1: + logger.error("There is more than one active key document " + "for the address %s" % (address,)) + for doc in docs: + self._soledad.delete_doc(doc) + self._soledad.create_doc_from_json( + key.get_active_json(address)) def _get_key_doc(self, address, private=False): """ @@ -427,17 +485,26 @@ class OpenPGPScheme(EncryptionScheme): :return: The document with the key or None if it does not exist. :rtype: leap.soledad.document.SoledadDocument """ - doclist = self._soledad.get_from_index( - TAGS_ADDRESS_PRIVATE_INDEX, - KEYMANAGER_KEY_TAG, + activedoc = self._soledad.get_from_index( + TYPE_ADDRESS_PRIVATE_INDEX, + self.ACTIVE_TYPE, address, '1' if private else '0') - if len(doclist) is 0: + if len(activedoc) is 0: return None leap_assert( + len(activedoc) is 1, + 'Found more than one key for address %s!' % (address,)) + + key_id = activedoc[0].content[KEY_ID_KEY] + doclist = self._soledad.get_from_index( + TYPE_ID_PRIVATE_INDEX, + self.KEY_TYPE, + key_id, + '1' if private else '0') + leap_assert( len(doclist) is 1, - 'Found more than one %s key for address!' % - 'private' if private else 'public') + 'There is %d keys for id %s!' % (len(doclist), key_id)) return doclist.pop() def delete_key(self, key): @@ -446,18 +513,37 @@ class OpenPGPScheme(EncryptionScheme): May raise: errors.KeyNotFound - errors.KeyAttributesDiffer :param key: The key to be removed. :type key: EncryptionKey """ leap_assert_type(key, OpenPGPKey) - stored_key = self.get_key(key.address, private=key.private) - if stored_key is None: + activedocs = self._soledad.get_from_index( + TYPE_ID_PRIVATE_INDEX, + self.ACTIVE_TYPE, + key.key_id, + '1' if key.private else '0') + for doc in activedocs: + self._soledad.delete_doc(doc) + + docs = self._soledad.get_from_index( + TYPE_ID_PRIVATE_INDEX, + self.KEY_TYPE, + key.key_id, + '1' if key.private else '0') + if len(docs) == 0: + raise errors.KeyNotFound(key) + if len(docs) > 1: + logger.critical("There is more than one key for key_id %s" + % key.key_id) + + doc = None + for d in docs: + if d.content['fingerprint'] == key.fingerprint: + doc = d + break + if doc is None: raise errors.KeyNotFound(key) - if stored_key.__dict__ != key.__dict__: - raise errors.KeyAttributesDiffer(key) - doc = self._get_key_doc(key.address, key.private) self._soledad.delete_doc(doc) # @@ -656,8 +742,8 @@ class OpenPGPScheme(EncryptionScheme): verified against this detached signature. :type detached_sig: str - :return: The ascii-armored signed data. - :rtype: str + :return: signature matches + :rtype: bool """ leap_assert_type(pubkey, OpenPGPKey) leap_assert(pubkey.private is False) |