diff options
author | drebs <drebs@leap.se> | 2014-11-25 12:30:40 -0200 |
---|---|---|
committer | drebs <drebs@leap.se> | 2014-11-25 12:30:40 -0200 |
commit | 22fddf85a3fa50f7ab164b2a9311cb01a9818e62 (patch) | |
tree | 84f4552818b9efa7858a3e9f5f5c0a94bd04a7cc | |
parent | 27776fbab6fe963082a882dfb5232c54b0195d5f (diff) | |
parent | 2f29739946db6cd360296639830a3bfe7d8c3f31 (diff) |
Merge branch 'feature/6299_new_doc' into develop
-rw-r--r-- | changes/feature-6212_multi_uid_support | 1 | ||||
-rw-r--r-- | changes/feature-6299_new_doc | 2 | ||||
-rw-r--r-- | src/leap/keymanager/__init__.py | 35 | ||||
-rw-r--r-- | src/leap/keymanager/keys.py | 108 | ||||
-rw-r--r-- | src/leap/keymanager/openpgp.py | 212 | ||||
-rw-r--r-- | src/leap/keymanager/tests/test_keymanager.py | 103 | ||||
-rw-r--r-- | src/leap/keymanager/tests/test_validation.py | 21 | ||||
-rw-r--r-- | src/leap/keymanager/validation.py | 4 |
8 files changed, 314 insertions, 172 deletions
diff --git a/changes/feature-6212_multi_uid_support b/changes/feature-6212_multi_uid_support new file mode 100644 index 00000000..495b8c48 --- /dev/null +++ b/changes/feature-6212_multi_uid_support @@ -0,0 +1 @@ +- Multi uid support (closes #6212) diff --git a/changes/feature-6299_new_doc b/changes/feature-6299_new_doc new file mode 100644 index 00000000..c8933386 --- /dev/null +++ b/changes/feature-6299_new_doc @@ -0,0 +1,2 @@ +- new soledad doc struct for encryption-keys (closes #6299) +- keep old key after upgrade (closes #6262) diff --git a/src/leap/keymanager/__init__.py b/src/leap/keymanager/__init__.py index 156aaf86..e4be9c47 100644 --- a/src/leap/keymanager/__init__.py +++ b/src/leap/keymanager/__init__.py @@ -134,7 +134,7 @@ class KeyManager(object): Return key class from string representation of key type. """ return filter( - lambda klass: str(klass) == ktype, + lambda klass: klass.__name__ == ktype, self._wrapper_map).pop() def _get(self, uri, data=None): @@ -319,7 +319,6 @@ class KeyManager(object): return map( lambda doc: build_key_from_dict( self._key_class_from_type(doc.content['type']), - doc.content['address'], doc.content), self._soledad.get_from_index( TAGS_PRIVATE_INDEX, @@ -494,8 +493,8 @@ class KeyManager(object): verified using this detached signature. :type detached_sig: str - :return: The signed data. - :rtype: str + :return: signature matches + :rtype: bool """ leap_assert_type(pubkey, EncryptionKey) leap_assert(pubkey.__class__ in self._wrapper_map, 'Unknown key type.') @@ -519,24 +518,32 @@ class KeyManager(object): except IndexError as e: leap_assert(False, "Unsupported key type. Error {0!r}".format(e)) - def put_key(self, key): + def put_key(self, key, address): """ Put C{key} in local storage. :param key: The key to be stored :type key: EncryptionKey + :param address: address for which this key will be active + :type address: str + + :raises KeyAddressMismatch: if address doesn't match any uid on the key :raises KeyNotValidUpdate: if a key with the same uid exists and the new one is not a valid update for it """ + if address not in key.address: + raise KeyAddressMismatch("UID %s found, but expected %s" + % (str(key.address), address)) + try: - old_key = self._wrapper_map[type(key)].get_key(key.address, + old_key = self._wrapper_map[type(key)].get_key(address, private=key.private) except KeyNotFound: old_key = None if key.private or can_upgrade(key, old_key): try: - self._wrapper_map[type(key)].put_key(key) + self._wrapper_map[type(key)].put_key(key, address) except IndexError as e: leap_assert( False, "Unsupported key type. Error {0!r}".format(e)) @@ -544,7 +551,7 @@ class KeyManager(object): raise KeyNotValidUpgrade("Key %s can not be upgraded by new key %s" % (old_key.key_id, key.key_id)) - def put_raw_key(self, key, ktype, address=None, + def put_raw_key(self, key, ktype, address, validation=ValidationLevel.Weak_Chain): """ Put C{key} in local storage. @@ -553,7 +560,7 @@ class KeyManager(object): :type key: str :param ktype: the type of the key. :type ktype: subclass of EncryptionKey - :param address: if set used to check that the key is for this address + :param address: address for which this key will be active :type address: str :param validation: validation level for this key (default: 'Weak_Chain') @@ -564,12 +571,9 @@ class KeyManager(object): new one is not a valid update for it """ pubkey, _ = self._wrapper_map[ktype].parse_ascii_key(key) - if address is not None and address != pubkey.address: - raise KeyAddressMismatch("Key UID %s, but expected %s" - % (pubkey.address, address)) pubkey.validation = validation - self.put_key(pubkey) + self.put_key(pubkey, address) def fetch_key(self, address, uri, ktype, validation=ValidationLevel.Weak_Chain): @@ -600,12 +604,9 @@ class KeyManager(object): pubkey, _ = self._wrapper_map[ktype].parse_ascii_key(res.content) if pubkey is None: raise KeyNotFound(uri) - if pubkey.address != address: - raise KeyAddressMismatch("UID %s found, but expected %s" - % (pubkey.address, address)) pubkey.validation = validation - self.put_key(pubkey) + self.put_key(pubkey, address) from ._version import get_versions __version__ = get_versions()['version'] diff --git a/src/leap/keymanager/keys.py b/src/leap/keymanager/keys.py index a61a8c79..7c732e35 100644 --- a/src/leap/keymanager/keys.py +++ b/src/leap/keymanager/keys.py @@ -27,6 +27,7 @@ except ImportError: import json # noqa import logging import re +import time from abc import ABCMeta, abstractmethod @@ -50,9 +51,11 @@ KEY_DATA_KEY = 'key_data' KEY_PRIVATE_KEY = 'private' KEY_LENGTH_KEY = 'length' KEY_EXPIRY_DATE_KEY = 'expiry_date' -KEY_FIRST_SEEN_AT_KEY = 'first_seen_at' KEY_LAST_AUDITED_AT_KEY = 'last_audited_at' +KEY_REFRESHED_AT_KEY = 'refreshed_at' KEY_VALIDATION_KEY = 'validation' +KEY_ENCR_USED_KEY = 'encr_used' +KEY_SIGN_USED_KEY = 'sign_used' KEY_TAGS_KEY = 'tags' @@ -61,6 +64,8 @@ KEY_TAGS_KEY = 'tags' # KEYMANAGER_KEY_TAG = 'keymanager-key' +KEYMANAGER_ACTIVE_TAG = 'keymanager-active' +KEYMANAGER_ACTIVE_TYPE = '-active' # @@ -68,14 +73,20 @@ KEYMANAGER_KEY_TAG = 'keymanager-key' # TAGS_PRIVATE_INDEX = 'by-tags-private' -TAGS_ADDRESS_PRIVATE_INDEX = 'by-tags-address-private' +TYPE_ID_PRIVATE_INDEX = 'by-type-id-private' +TYPE_ADDRESS_PRIVATE_INDEX = 'by-type-address-private' INDEXES = { TAGS_PRIVATE_INDEX: [ KEY_TAGS_KEY, 'bool(%s)' % KEY_PRIVATE_KEY, ], - TAGS_ADDRESS_PRIVATE_INDEX: [ - KEY_TAGS_KEY, + TYPE_ID_PRIVATE_INDEX: [ + KEY_TYPE_KEY, + KEY_ID_KEY, + 'bool(%s)' % KEY_PRIVATE_KEY, + ], + TYPE_ADDRESS_PRIVATE_INDEX: [ + KEY_TYPE_KEY, KEY_ADDRESS_KEY, 'bool(%s)' % KEY_PRIVATE_KEY, ] @@ -98,20 +109,15 @@ def is_address(address): return bool(re.match('[\w.-]+@[\w.-]+', address)) -def build_key_from_dict(kClass, address, kdict): +def build_key_from_dict(kClass, kdict): """ - Build an C{kClass} key bound to C{address} based on info in C{kdict}. + Build an C{kClass} key based on info in C{kdict}. - :param address: The address bound to the key. - :type address: str :param kdict: Dictionary with key data. :type kdict: dict :return: An instance of the key. :rtype: C{kClass} """ - leap_assert( - address == kdict[KEY_ADDRESS_KEY], - 'Wrong address in key data.') try: validation = toValidationLevel(kdict[KEY_VALIDATION_KEY]) except ValueError: @@ -119,24 +125,40 @@ def build_key_from_dict(kClass, address, kdict): (kdict[KEY_VALIDATION_KEY], kdict[KEY_ID_KEY])) validation = ValidationLevel.Weak_Chain - expiry_date = None - if kdict[KEY_EXPIRY_DATE_KEY]: - expiry_date = datetime.fromtimestamp(int(kdict[KEY_EXPIRY_DATE_KEY])) + expiry_date = _to_datetime(kdict[KEY_EXPIRY_DATE_KEY]) + last_audited_at = _to_datetime(kdict[KEY_LAST_AUDITED_AT_KEY]) + refreshed_at = _to_datetime(kdict[KEY_REFRESHED_AT_KEY]) return kClass( - address, + kdict[KEY_ADDRESS_KEY], key_id=kdict[KEY_ID_KEY], fingerprint=kdict[KEY_FINGERPRINT_KEY], key_data=kdict[KEY_DATA_KEY], private=kdict[KEY_PRIVATE_KEY], length=kdict[KEY_LENGTH_KEY], expiry_date=expiry_date, - first_seen_at=kdict[KEY_FIRST_SEEN_AT_KEY], - last_audited_at=kdict[KEY_LAST_AUDITED_AT_KEY], + last_audited_at=last_audited_at, + refreshed_at=refreshed_at, validation=validation, + encr_used=kdict[KEY_ENCR_USED_KEY], + sign_used=kdict[KEY_SIGN_USED_KEY], ) +def _to_datetime(unix_time): + if unix_time != 0: + return datetime.fromtimestamp(unix_time) + else: + return None + + +def _to_unix_time(date): + if date is not None: + return int(time.mktime(date.timetuple())) + else: + return 0 + + # # Abstraction for encryption keys # @@ -151,9 +173,10 @@ class EncryptionKey(object): __metaclass__ = ABCMeta - def __init__(self, address, key_id=None, fingerprint=None, - key_data=None, private=None, length=None, expiry_date=None, - validation=None, first_seen_at=None, last_audited_at=None): + def __init__(self, address, key_id="", fingerprint="", + key_data="", private=False, length=0, expiry_date=None, + validation=ValidationLevel.Weak_Chain, last_audited_at=None, + refreshed_at=None, encr_used=False, sign_used=False): self.address = address self.key_id = key_id self.fingerprint = fingerprint @@ -162,8 +185,10 @@ class EncryptionKey(object): self.length = length self.expiry_date = expiry_date self.validation = validation - self.first_seen_at = first_seen_at self.last_audited_at = last_audited_at + self.refreshed_at = refreshed_at + self.encr_used = encr_used + self.sign_used = sign_used def get_json(self): """ @@ -172,25 +197,44 @@ class EncryptionKey(object): :return: The JSON string describing this key. :rtype: str """ - expiry_str = "" - if self.expiry_date is not None: - expiry_str = self.expiry_date.strftime("%s") + expiry_date = _to_unix_time(self.expiry_date) + last_audited_at = _to_unix_time(self.last_audited_at) + refreshed_at = _to_unix_time(self.refreshed_at) return json.dumps({ KEY_ADDRESS_KEY: self.address, - KEY_TYPE_KEY: str(self.__class__), + KEY_TYPE_KEY: self.__class__.__name__, KEY_ID_KEY: self.key_id, KEY_FINGERPRINT_KEY: self.fingerprint, KEY_DATA_KEY: self.key_data, KEY_PRIVATE_KEY: self.private, KEY_LENGTH_KEY: self.length, - KEY_EXPIRY_DATE_KEY: expiry_str, + KEY_EXPIRY_DATE_KEY: expiry_date, + KEY_LAST_AUDITED_AT_KEY: last_audited_at, + KEY_REFRESHED_AT_KEY: refreshed_at, KEY_VALIDATION_KEY: str(self.validation), - KEY_FIRST_SEEN_AT_KEY: self.first_seen_at, - KEY_LAST_AUDITED_AT_KEY: self.last_audited_at, + KEY_ENCR_USED_KEY: self.encr_used, + KEY_SIGN_USED_KEY: self.sign_used, KEY_TAGS_KEY: [KEYMANAGER_KEY_TAG], }) + def get_active_json(self, address): + """ + Return a JSON string describing this key. + + :param address: Address for wich the key is active + :type address: str + :return: The JSON string describing this key. + :rtype: str + """ + return json.dumps({ + KEY_ADDRESS_KEY: address, + KEY_TYPE_KEY: self.__class__.__name__ + KEYMANAGER_ACTIVE_TYPE, + KEY_ID_KEY: self.key_id, + KEY_PRIVATE_KEY: self.private, + KEY_TAGS_KEY: [KEYMANAGER_ACTIVE_TAG], + }) + def __repr__(self): """ Representation of this class @@ -266,12 +310,14 @@ class EncryptionScheme(object): pass @abstractmethod - def put_key(self, key): + def put_key(self, key, address): """ Put a key in local storage. :param key: The key to be stored. :type key: EncryptionKey + :param address: address for which this key will be active. + :type address: str """ pass @@ -365,7 +411,7 @@ class EncryptionScheme(object): verified against this sdetached signature. :type detached_sig: str - :return: The signed data. - :rtype: str + :return: signature matches + :rtype: bool """ pass diff --git a/src/leap/keymanager/openpgp.py b/src/leap/keymanager/openpgp.py index d3c305e2..3f298f71 100644 --- a/src/leap/keymanager/openpgp.py +++ b/src/leap/keymanager/openpgp.py @@ -36,12 +36,14 @@ from leap.keymanager.keys import ( EncryptionScheme, is_address, build_key_from_dict, - KEYMANAGER_KEY_TAG, - TAGS_ADDRESS_PRIVATE_INDEX, + TYPE_ID_PRIVATE_INDEX, + TYPE_ADDRESS_PRIVATE_INDEX, + KEY_ADDRESS_KEY, KEY_FINGERPRINT_KEY, KEY_DATA_KEY, + KEY_ID_KEY, + KEYMANAGER_ACTIVE_TYPE, ) -from leap.keymanager.validation import ValidationLevel logger = logging.getLogger(__name__) @@ -109,9 +111,9 @@ class TempGPGWrapper(object): # itself is enough to also have the public key in the keyring, # and we want to count the keys afterwards. - privaddrs = map(lambda privkey: privkey.address, privkeys) + privids = map(lambda privkey: privkey.key_id, privkeys) publkeys = filter( - lambda pubkey: pubkey.address not in privaddrs, publkeys) + lambda pubkey: pubkey.key_id not in privids, publkeys) listkeys = lambda: self._gpg.list_keys() listsecretkeys = lambda: self._gpg.list_keys(secret=True) @@ -162,16 +164,13 @@ class TempGPGWrapper(object): shutil.rmtree(self._gpg.homedir) -def _build_key_from_gpg(address, key, key_data): +def _build_key_from_gpg(key, key_data): """ - Build an OpenPGPKey for C{address} based on C{key} from - local gpg storage. + Build an OpenPGPKey based on C{key} from local gpg storage. ASCII armored GPG key data has to be queried independently in this wrapper, so we receive it in C{key_data}. - :param address: The address bound to the key. - :type address: str :param key: Key obtained from GPG storage. :type key: dict :param key_data: Key data obtained from GPG storage. @@ -182,6 +181,9 @@ def _build_key_from_gpg(address, key, key_data): expiry_date = None if key['expires']: expiry_date = datetime.fromtimestamp(int(key['expires'])) + address = [] + for uid in key['uids']: + address.append(_parse_address(uid)) return OpenPGPKey( address, @@ -189,12 +191,28 @@ def _build_key_from_gpg(address, key, key_data): fingerprint=key['fingerprint'], key_data=key_data, private=True if key['type'] == 'sec' else False, - length=key['length'], + length=int(key['length']), expiry_date=expiry_date, - validation=ValidationLevel.Weak_Chain, + refreshed_at=datetime.now(), ) +def _parse_address(address): + """ + Remove name, '<', '>' and the identity suffix after the '+' until the '@' + e.g.: test_user+something@provider.com becomes test_user@provider.com + since the key belongs to the identity without the '+' suffix. + + :type address: str + :rtype: str + """ + mail_regex = '(.*<)?([\w.-]+)(\+.*)?(@[\w.-]+)(>.*)?' + match = re.match(mail_regex, address) + if match is None: + return None + return ''.join(match.group(2, 4)) + + # # The OpenPGP wrapper # @@ -211,6 +229,10 @@ class OpenPGPScheme(EncryptionScheme): signing and verification). """ + # type used on the soledad documents + KEY_TYPE = OpenPGPKey.__name__ + ACTIVE_TYPE = KEY_TYPE + KEYMANAGER_ACTIVE_TYPE + def __init__(self, soledad, gpgbinary=None): """ Initialize the OpenPGP wrapper. @@ -272,16 +294,18 @@ class OpenPGPScheme(EncryptionScheme): leap_assert( len(key['uids']) is 1, # with just one uid! 'Wrong number of uids for key: %d.' % len(key['uids'])) - leap_assert( - re.match('.*<%s>$' % address, key['uids'][0]) is not None, - 'Key not correctly bound to address.') + uid_match = False + for uid in key['uids']: + if re.match('.*<%s>$' % address, uid) is not None: + uid_match = True + return + leap_assert(uid_match, 'Key not correctly bound to address.') # insert both public and private keys in storage for secret in [True, False]: key = gpg.list_keys(secret=secret).pop() openpgp_key = _build_key_from_gpg( - address, key, - gpg.export_keys(key['fingerprint'], secret=secret)) - self.put_key(openpgp_key) + key, gpg.export_keys(key['fingerprint'], secret=secret)) + self.put_key(openpgp_key, address) return self.get_key(address, private=True) @@ -298,15 +322,15 @@ class OpenPGPScheme(EncryptionScheme): :rtype: OpenPGPKey @raise KeyNotFound: If the key was not found on local storage. """ - # Remove the identity suffix after the '+' until the '@' - # e.g.: test_user+something@provider.com becomes test_user@provider.com - # since the key belongs to the identity without the '+' suffix. - address = re.sub(r'\+.*\@', '@', address) + address = _parse_address(address) doc = self._get_key_doc(address, private) if doc is None: raise errors.KeyNotFound(address) - return build_key_from_dict(OpenPGPKey, address, doc.content) + leap_assert( + address in doc.content[KEY_ADDRESS_KEY], + 'Wrong address in key data.') + return build_key_from_dict(OpenPGPKey, doc.content) def parse_ascii_key(self, key_data): """ @@ -323,7 +347,6 @@ class OpenPGPScheme(EncryptionScheme): leap_assert_type(key_data, (str, unicode)) # TODO: add more checks for correct key data. leap_assert(key_data is not None, 'Data does not represent a key.') - mail_regex = '.*<([\w.-]+@[\w.-]+)>.*' with self._temporary_gpgwrapper() as gpg: # TODO: inspect result, or use decorator @@ -340,42 +363,31 @@ class OpenPGPScheme(EncryptionScheme): except IndexError: return (None, None) - # extract adress from first uid on key - match = re.match(mail_regex, pubkey['uids'].pop()) - leap_assert(match is not None, 'No user address in key data.') - address = match.group(1) - openpgp_privkey = None if privkey is not None: - match = re.match(mail_regex, privkey['uids'].pop()) - leap_assert(match is not None, 'No user address in key data.') - privaddress = match.group(1) - # build private key openpgp_privkey = _build_key_from_gpg( - privaddress, privkey, + privkey, gpg.export_keys(privkey['fingerprint'], secret=True)) - - leap_check(address == privaddress, - 'Addresses in public and private key differ.', - errors.KeyAddressMismatch) leap_check(pubkey['fingerprint'] == privkey['fingerprint'], 'Fingerprints for public and private key differ.', errors.KeyFingerprintMismatch) # build public key openpgp_pubkey = _build_key_from_gpg( - address, pubkey, + pubkey, gpg.export_keys(pubkey['fingerprint'], secret=False)) return (openpgp_pubkey, openpgp_privkey) - def put_ascii_key(self, key_data): + def put_ascii_key(self, key_data, address): """ Put key contained in ascii-armored C{key_data} in local storage. :param key_data: The key data to be stored. :type key_data: str or unicode + :param address: address for which this key will be active + :type address: str """ leap_assert_type(key_data, (str, unicode)) @@ -386,21 +398,35 @@ class OpenPGPScheme(EncryptionScheme): leap_assert(False, repr(e)) if openpgp_pubkey is not None: - self.put_key(openpgp_pubkey) + self.put_key(openpgp_pubkey, address) if openpgp_privkey is not None: - self.put_key(openpgp_privkey) + self.put_key(openpgp_privkey, address) - def put_key(self, key): + def put_key(self, key, address): """ Put C{key} in local storage. :param key: The key to be stored. :type key: OpenPGPKey + :param address: address for which this key will be active. + :type address: str """ - doc = self._get_key_doc(key.address, private=key.private) - if doc is None: - self._soledad.create_doc_from_json(key.get_json()) - else: + self._put_key_doc(key) + self._put_active_doc(key, address) + + def _put_key_doc(self, key): + """ + Put key document in soledad + + :type key: OpenPGPKey + """ + docs = self._soledad.get_from_index( + TYPE_ID_PRIVATE_INDEX, + self.KEY_TYPE, + key.key_id, + '1' if key.private else '0') + if len(docs) != 0: + doc = docs.pop() if key.fingerprint == doc.content[KEY_FINGERPRINT_KEY]: # in case of an update of the key merge them with gnupg with self._temporary_gpgwrapper() as gpg: @@ -408,11 +434,43 @@ class OpenPGPScheme(EncryptionScheme): gpg.import_keys(key.key_data) gpgkey = gpg.list_keys(secret=key.private).pop() key = _build_key_from_gpg( - key.address, gpgkey, + gpgkey, gpg.export_keys(gpgkey['fingerprint'], secret=key.private)) - doc.set_json(key.get_json()) + doc.set_json(key.get_json()) + self._soledad.put_doc(doc) + else: + logger.critical( + "Can't put a key whith the same key_id and different " + "fingerprint: %s, %s" + % (key.fingerprint, doc.content[KEY_FINGERPRINT_KEY])) + else: + self._soledad.create_doc_from_json(key.get_json()) + + def _put_active_doc(self, key, address): + """ + Put active key document in soledad + + :type key: OpenPGPKey + :type addresses: str + """ + docs = self._soledad.get_from_index( + TYPE_ADDRESS_PRIVATE_INDEX, + self.ACTIVE_TYPE, + address, + '1' if key.private else '0') + if len(docs) == 1: + doc = docs.pop() + doc.set_json(key.get_active_json(address)) self._soledad.put_doc(doc) + else: + if len(docs) > 1: + logger.error("There is more than one active key document " + "for the address %s" % (address,)) + for doc in docs: + self._soledad.delete_doc(doc) + self._soledad.create_doc_from_json( + key.get_active_json(address)) def _get_key_doc(self, address, private=False): """ @@ -427,17 +485,26 @@ class OpenPGPScheme(EncryptionScheme): :return: The document with the key or None if it does not exist. :rtype: leap.soledad.document.SoledadDocument """ - doclist = self._soledad.get_from_index( - TAGS_ADDRESS_PRIVATE_INDEX, - KEYMANAGER_KEY_TAG, + activedoc = self._soledad.get_from_index( + TYPE_ADDRESS_PRIVATE_INDEX, + self.ACTIVE_TYPE, address, '1' if private else '0') - if len(doclist) is 0: + if len(activedoc) is 0: return None leap_assert( + len(activedoc) is 1, + 'Found more than one key for address %s!' % (address,)) + + key_id = activedoc[0].content[KEY_ID_KEY] + doclist = self._soledad.get_from_index( + TYPE_ID_PRIVATE_INDEX, + self.KEY_TYPE, + key_id, + '1' if private else '0') + leap_assert( len(doclist) is 1, - 'Found more than one %s key for address!' % - 'private' if private else 'public') + 'There is %d keys for id %s!' % (len(doclist), key_id)) return doclist.pop() def delete_key(self, key): @@ -446,18 +513,37 @@ class OpenPGPScheme(EncryptionScheme): May raise: errors.KeyNotFound - errors.KeyAttributesDiffer :param key: The key to be removed. :type key: EncryptionKey """ leap_assert_type(key, OpenPGPKey) - stored_key = self.get_key(key.address, private=key.private) - if stored_key is None: + activedocs = self._soledad.get_from_index( + TYPE_ID_PRIVATE_INDEX, + self.ACTIVE_TYPE, + key.key_id, + '1' if key.private else '0') + for doc in activedocs: + self._soledad.delete_doc(doc) + + docs = self._soledad.get_from_index( + TYPE_ID_PRIVATE_INDEX, + self.KEY_TYPE, + key.key_id, + '1' if key.private else '0') + if len(docs) == 0: + raise errors.KeyNotFound(key) + if len(docs) > 1: + logger.critical("There is more than one key for key_id %s" + % key.key_id) + + doc = None + for d in docs: + if d.content['fingerprint'] == key.fingerprint: + doc = d + break + if doc is None: raise errors.KeyNotFound(key) - if stored_key.__dict__ != key.__dict__: - raise errors.KeyAttributesDiffer(key) - doc = self._get_key_doc(key.address, key.private) self._soledad.delete_doc(doc) # @@ -656,8 +742,8 @@ class OpenPGPScheme(EncryptionScheme): verified against this detached signature. :type detached_sig: str - :return: The ascii-armored signed data. - :rtype: str + :return: signature matches + :rtype: bool """ leap_assert_type(pubkey, OpenPGPKey) leap_assert(pubkey.private is False) diff --git a/src/leap/keymanager/tests/test_keymanager.py b/src/leap/keymanager/tests/test_keymanager.py index 6a877bcf..6aeb67a8 100644 --- a/src/leap/keymanager/tests/test_keymanager.py +++ b/src/leap/keymanager/tests/test_keymanager.py @@ -21,6 +21,7 @@ Tests for the Key Manager. """ +from datetime import datetime from mock import Mock from leap.common.testing.basetest import BaseLeapTest from leap.keymanager import ( @@ -75,18 +76,20 @@ class KeyManagerUtilTestCase(BaseLeapTest): def test_build_key_from_dict(self): kdict = { - 'address': ADDRESS, - 'key_id': 'key_id', - 'fingerprint': 'fingerprint', - 'key_data': 'key_data', - 'private': 'private', - 'length': 'length', - 'expiry_date': '', - 'first_seen_at': 'first_seen_at', - 'last_audited_at': 'last_audited_at', + 'address': [ADDRESS], + 'key_id': KEY_FINGERPRINT[-16:], + 'fingerprint': KEY_FINGERPRINT, + 'key_data': PUBLIC_KEY, + 'private': False, + 'length': 4096, + 'expiry_date': 0, + 'last_audited_at': 0, + 'refreshed_at': 1311239602, 'validation': str(ValidationLevel.Weak_Chain), + 'encr_used': False, + 'sign_used': True, } - key = build_key_from_dict(OpenPGPKey, ADDRESS, kdict) + key = build_key_from_dict(OpenPGPKey, kdict) self.assertEqual( kdict['address'], key.address, 'Wrong data in key.') @@ -109,14 +112,20 @@ class KeyManagerUtilTestCase(BaseLeapTest): None, key.expiry_date, 'Wrong data in key.') self.assertEqual( - kdict['first_seen_at'], key.first_seen_at, + None, key.last_audited_at, 'Wrong data in key.') self.assertEqual( - kdict['last_audited_at'], key.last_audited_at, + datetime.fromtimestamp(kdict['refreshed_at']), key.refreshed_at, 'Wrong data in key.') self.assertEqual( toValidationLevel(kdict['validation']), key.validation, 'Wrong data in key.') + self.assertEqual( + kdict['encr_used'], key.encr_used, + 'Wrong data in key.') + self.assertEqual( + kdict['sign_used'], key.sign_used, + 'Wrong data in key.') class OpenPGPCryptoTestCase(KeyManagerWithSoledadTestCase): @@ -127,15 +136,15 @@ class OpenPGPCryptoTestCase(KeyManagerWithSoledadTestCase): key = pgp.gen_key('user@leap.se') self.assertIsInstance(key, openpgp.OpenPGPKey) self.assertEqual( - 'user@leap.se', key.address, 'Wrong address bound to key.') + ['user@leap.se'], key.address, 'Wrong address bound to key.') self.assertEqual( - '4096', key.length, 'Wrong key length.') + 4096, key.length, 'Wrong key length.') def test_openpgp_put_delete_key(self): pgp = openpgp.OpenPGPScheme( self._soledad, gpgbinary=GPG_BINARY_PATH) self.assertRaises(KeyNotFound, pgp.get_key, ADDRESS) - pgp.put_ascii_key(PUBLIC_KEY) + pgp.put_ascii_key(PUBLIC_KEY, ADDRESS) key = pgp.get_key(ADDRESS, private=False) pgp.delete_key(key) self.assertRaises(KeyNotFound, pgp.get_key, ADDRESS) @@ -144,13 +153,13 @@ class OpenPGPCryptoTestCase(KeyManagerWithSoledadTestCase): pgp = openpgp.OpenPGPScheme( self._soledad, gpgbinary=GPG_BINARY_PATH) self.assertRaises(KeyNotFound, pgp.get_key, ADDRESS) - pgp.put_ascii_key(PUBLIC_KEY) + pgp.put_ascii_key(PUBLIC_KEY, ADDRESS) key = pgp.get_key(ADDRESS, private=False) self.assertIsInstance(key, openpgp.OpenPGPKey) + self.assertTrue( + ADDRESS in key.address, 'Wrong address bound to key.') self.assertEqual( - ADDRESS, key.address, 'Wrong address bound to key.') - self.assertEqual( - '4096', key.length, 'Wrong key length.') + 4096, key.length, 'Wrong key length.') pgp.delete_key(key) self.assertRaises(KeyNotFound, pgp.get_key, ADDRESS) @@ -158,11 +167,11 @@ class OpenPGPCryptoTestCase(KeyManagerWithSoledadTestCase): pgp = openpgp.OpenPGPScheme( self._soledad, gpgbinary=GPG_BINARY_PATH) self.assertRaises(KeyNotFound, pgp.get_key, ADDRESS) - pgp.put_ascii_key(PUBLIC_KEY) + pgp.put_ascii_key(PUBLIC_KEY, ADDRESS) self.assertRaises( KeyNotFound, pgp.get_key, ADDRESS, private=True) key = pgp.get_key(ADDRESS, private=False) - self.assertEqual(ADDRESS, key.address) + self.assertTrue(ADDRESS in key.address) self.assertFalse(key.private) self.assertEqual(KEY_FINGERPRINT, key.fingerprint) pgp.delete_key(key) @@ -172,7 +181,7 @@ class OpenPGPCryptoTestCase(KeyManagerWithSoledadTestCase): # encrypt pgp = openpgp.OpenPGPScheme( self._soledad, gpgbinary=GPG_BINARY_PATH) - pgp.put_ascii_key(PUBLIC_KEY) + pgp.put_ascii_key(PUBLIC_KEY, ADDRESS) pubkey = pgp.get_key(ADDRESS, private=False) cyphertext = pgp.encrypt('data', pubkey) # assert @@ -184,7 +193,7 @@ class OpenPGPCryptoTestCase(KeyManagerWithSoledadTestCase): # decrypt self.assertRaises( KeyNotFound, pgp.get_key, ADDRESS, private=True) - pgp.put_ascii_key(PRIVATE_KEY) + pgp.put_ascii_key(PRIVATE_KEY, ADDRESS) privkey = pgp.get_key(ADDRESS, private=True) pgp.delete_key(pubkey) pgp.delete_key(privkey) @@ -207,7 +216,7 @@ class OpenPGPCryptoTestCase(KeyManagerWithSoledadTestCase): def test_sign_with_public_raises(self): pgp = openpgp.OpenPGPScheme( self._soledad, gpgbinary=GPG_BINARY_PATH) - pgp.put_ascii_key(PUBLIC_KEY) + pgp.put_ascii_key(PUBLIC_KEY, ADDRESS) data = 'data' pubkey = pgp.get_key(ADDRESS, private=False) self.assertRaises( @@ -217,11 +226,11 @@ class OpenPGPCryptoTestCase(KeyManagerWithSoledadTestCase): def test_verify_with_wrong_key_raises(self): pgp = openpgp.OpenPGPScheme( self._soledad, gpgbinary=GPG_BINARY_PATH) - pgp.put_ascii_key(PRIVATE_KEY) + pgp.put_ascii_key(PRIVATE_KEY, ADDRESS) data = 'data' privkey = pgp.get_key(ADDRESS, private=True) signed = pgp.sign(data, privkey) - pgp.put_ascii_key(PUBLIC_KEY_2) + pgp.put_ascii_key(PUBLIC_KEY_2, ADDRESS_2) wrongkey = pgp.get_key(ADDRESS_2) self.assertRaises( errors.InvalidSignature, @@ -230,7 +239,7 @@ class OpenPGPCryptoTestCase(KeyManagerWithSoledadTestCase): def test_encrypt_sign_with_public_raises(self): pgp = openpgp.OpenPGPScheme( self._soledad, gpgbinary=GPG_BINARY_PATH) - pgp.put_ascii_key(PRIVATE_KEY) + pgp.put_ascii_key(PRIVATE_KEY, ADDRESS) data = 'data' privkey = pgp.get_key(ADDRESS, private=True) pubkey = pgp.get_key(ADDRESS, private=False) @@ -241,7 +250,7 @@ class OpenPGPCryptoTestCase(KeyManagerWithSoledadTestCase): def test_decrypt_verify_with_private_raises(self): pgp = openpgp.OpenPGPScheme( self._soledad, gpgbinary=GPG_BINARY_PATH) - pgp.put_ascii_key(PRIVATE_KEY) + pgp.put_ascii_key(PRIVATE_KEY, ADDRESS) data = 'data' privkey = pgp.get_key(ADDRESS, private=True) pubkey = pgp.get_key(ADDRESS, private=False) @@ -255,12 +264,12 @@ class OpenPGPCryptoTestCase(KeyManagerWithSoledadTestCase): def test_decrypt_verify_with_wrong_key_raises(self): pgp = openpgp.OpenPGPScheme( self._soledad, gpgbinary=GPG_BINARY_PATH) - pgp.put_ascii_key(PRIVATE_KEY) + pgp.put_ascii_key(PRIVATE_KEY, ADDRESS) data = 'data' privkey = pgp.get_key(ADDRESS, private=True) pubkey = pgp.get_key(ADDRESS, private=False) encrypted_and_signed = pgp.encrypt(data, pubkey, sign=privkey) - pgp.put_ascii_key(PUBLIC_KEY_2) + pgp.put_ascii_key(PUBLIC_KEY_2, ADDRESS_2) wrongkey = pgp.get_key(ADDRESS_2) self.assertRaises( errors.InvalidSignature, @@ -269,7 +278,7 @@ class OpenPGPCryptoTestCase(KeyManagerWithSoledadTestCase): def test_sign_verify(self): pgp = openpgp.OpenPGPScheme( self._soledad, gpgbinary=GPG_BINARY_PATH) - pgp.put_ascii_key(PRIVATE_KEY) + pgp.put_ascii_key(PRIVATE_KEY, ADDRESS) data = 'data' privkey = pgp.get_key(ADDRESS, private=True) signed = pgp.sign(data, privkey, detach=False) @@ -279,10 +288,10 @@ class OpenPGPCryptoTestCase(KeyManagerWithSoledadTestCase): def test_encrypt_sign_decrypt_verify(self): pgp = openpgp.OpenPGPScheme( self._soledad, gpgbinary=GPG_BINARY_PATH) - pgp.put_ascii_key(PRIVATE_KEY) + pgp.put_ascii_key(PRIVATE_KEY, ADDRESS) pubkey = pgp.get_key(ADDRESS, private=False) privkey = pgp.get_key(ADDRESS, private=True) - pgp.put_ascii_key(PRIVATE_KEY_2) + pgp.put_ascii_key(PRIVATE_KEY_2, ADDRESS_2) pubkey2 = pgp.get_key(ADDRESS_2, private=False) privkey2 = pgp.get_key(ADDRESS_2, private=True) data = 'data' @@ -295,7 +304,7 @@ class OpenPGPCryptoTestCase(KeyManagerWithSoledadTestCase): def test_sign_verify_detached_sig(self): pgp = openpgp.OpenPGPScheme( self._soledad, gpgbinary=GPG_BINARY_PATH) - pgp.put_ascii_key(PRIVATE_KEY) + pgp.put_ascii_key(PRIVATE_KEY, ADDRESS) data = 'data' privkey = pgp.get_key(ADDRESS, private=True) signature = pgp.sign(data, privkey, detach=True) @@ -307,38 +316,38 @@ class KeyManagerKeyManagementTestCase(KeyManagerWithSoledadTestCase): def test_get_all_keys_in_db(self): km = self._key_manager() - km._wrapper_map[OpenPGPKey].put_ascii_key(PRIVATE_KEY) + km._wrapper_map[OpenPGPKey].put_ascii_key(PRIVATE_KEY, ADDRESS) # get public keys keys = km.get_all_keys(False) self.assertEqual(len(keys), 1, 'Wrong number of keys') - self.assertEqual(ADDRESS, keys[0].address) + self.assertTrue(ADDRESS in keys[0].address) self.assertFalse(keys[0].private) # get private keys keys = km.get_all_keys(True) self.assertEqual(len(keys), 1, 'Wrong number of keys') - self.assertEqual(ADDRESS, keys[0].address) + self.assertTrue(ADDRESS in keys[0].address) self.assertTrue(keys[0].private) def test_get_public_key(self): km = self._key_manager() - km._wrapper_map[OpenPGPKey].put_ascii_key(PRIVATE_KEY) + km._wrapper_map[OpenPGPKey].put_ascii_key(PRIVATE_KEY, ADDRESS) # get the key key = km.get_key(ADDRESS, OpenPGPKey, private=False, fetch_remote=False) self.assertTrue(key is not None) - self.assertEqual(key.address, ADDRESS) + self.assertTrue(ADDRESS in key.address) self.assertEqual( key.fingerprint.lower(), KEY_FINGERPRINT.lower()) self.assertFalse(key.private) def test_get_private_key(self): km = self._key_manager() - km._wrapper_map[OpenPGPKey].put_ascii_key(PRIVATE_KEY) + km._wrapper_map[OpenPGPKey].put_ascii_key(PRIVATE_KEY, ADDRESS) # get the key key = km.get_key(ADDRESS, OpenPGPKey, private=True, fetch_remote=False) self.assertTrue(key is not None) - self.assertEqual(key.address, ADDRESS) + self.assertTrue(ADDRESS in key.address) self.assertEqual( key.fingerprint.lower(), KEY_FINGERPRINT.lower()) self.assertTrue(key.private) @@ -355,7 +364,7 @@ class KeyManagerKeyManagementTestCase(KeyManagerWithSoledadTestCase): """ token = "mytoken" km = self._key_manager(token=token) - km._wrapper_map[OpenPGPKey].put_ascii_key(PUBLIC_KEY) + km._wrapper_map[OpenPGPKey].put_ascii_key(PUBLIC_KEY, ADDRESS) km._fetcher.put = Mock() # the following data will be used on the send km.ca_cert_path = 'capath' @@ -430,7 +439,7 @@ class KeyManagerKeyManagementTestCase(KeyManagerWithSoledadTestCase): # try to get key fetching from server. key = km.get_key(ADDRESS, OpenPGPKey) self.assertIsInstance(key, OpenPGPKey) - self.assertEqual(ADDRESS, key.address) + self.assertTrue(ADDRESS in key.address) def test_put_key_ascii(self): """ @@ -438,10 +447,10 @@ class KeyManagerKeyManagementTestCase(KeyManagerWithSoledadTestCase): """ km = self._key_manager(url='http://nickserver.domain') - km.put_raw_key(PUBLIC_KEY, OpenPGPKey) + km.put_raw_key(PUBLIC_KEY, OpenPGPKey, ADDRESS) key = km.get_key(ADDRESS, OpenPGPKey) self.assertIsInstance(key, OpenPGPKey) - self.assertEqual(ADDRESS, key.address) + self.assertTrue(ADDRESS in key.address) def test_fetch_uri_ascii_key(self): """ @@ -500,7 +509,7 @@ class KeyManagerCryptoTestCase(KeyManagerWithSoledadTestCase): def test_keymanager_openpgp_encrypt_decrypt(self): km = self._key_manager() # put raw private key - km._wrapper_map[OpenPGPKey].put_ascii_key(PRIVATE_KEY) + km._wrapper_map[OpenPGPKey].put_ascii_key(PRIVATE_KEY, ADDRESS) # get public key pubkey = km.get_key( ADDRESS, OpenPGPKey, private=False, fetch_remote=False) @@ -517,7 +526,7 @@ class KeyManagerCryptoTestCase(KeyManagerWithSoledadTestCase): def test_keymanager_openpgp_sign_verify(self): km = self._key_manager() # put raw private keys - km._wrapper_map[OpenPGPKey].put_ascii_key(PRIVATE_KEY) + km._wrapper_map[OpenPGPKey].put_ascii_key(PRIVATE_KEY, ADDRESS) # get private key for signing privkey = km.get_key( ADDRESS, OpenPGPKey, private=True, fetch_remote=False) diff --git a/src/leap/keymanager/tests/test_validation.py b/src/leap/keymanager/tests/test_validation.py index 3ae873d4..400d36e8 100644 --- a/src/leap/keymanager/tests/test_validation.py +++ b/src/leap/keymanager/tests/test_validation.py @@ -37,47 +37,48 @@ class ValidationLevelTestCase(KeyManagerWithSoledadTestCase): def test_none_old_key(self): km = self._key_manager() - km.put_raw_key(PUBLIC_KEY, OpenPGPKey) + km.put_raw_key(PUBLIC_KEY, OpenPGPKey, ADDRESS) key = km.get_key(ADDRESS, OpenPGPKey, fetch_remote=False) self.assertEqual(key.fingerprint, KEY_FINGERPRINT) def test_cant_upgrade(self): km = self._key_manager() - km.put_raw_key(PUBLIC_KEY, OpenPGPKey, + km.put_raw_key(PUBLIC_KEY, OpenPGPKey, ADDRESS, validation=ValidationLevel.Provider_Trust) self.assertRaises(KeyNotValidUpgrade, km.put_raw_key, UNRELATED_KEY, - OpenPGPKey) + OpenPGPKey, ADDRESS) def test_fingerprint_level(self): km = self._key_manager() - km.put_raw_key(PUBLIC_KEY, OpenPGPKey) - km.put_raw_key(UNRELATED_KEY, OpenPGPKey, + km.put_raw_key(PUBLIC_KEY, OpenPGPKey, ADDRESS) + km.put_raw_key(UNRELATED_KEY, OpenPGPKey, ADDRESS, validation=ValidationLevel.Fingerprint) key = km.get_key(ADDRESS, OpenPGPKey, fetch_remote=False) self.assertEqual(key.fingerprint, UNRELATED_FINGERPRINT) def test_expired_key(self): km = self._key_manager() - km.put_raw_key(EXPIRED_KEY, OpenPGPKey) - km.put_raw_key(UNRELATED_KEY, OpenPGPKey) + km.put_raw_key(EXPIRED_KEY, OpenPGPKey, ADDRESS) + km.put_raw_key(UNRELATED_KEY, OpenPGPKey, ADDRESS) key = km.get_key(ADDRESS, OpenPGPKey, fetch_remote=False) self.assertEqual(key.fingerprint, UNRELATED_FINGERPRINT) def test_expired_fail_lower_level(self): km = self._key_manager() - km.put_raw_key(EXPIRED_KEY, OpenPGPKey, + km.put_raw_key(EXPIRED_KEY, OpenPGPKey, ADDRESS, validation=ValidationLevel.Third_Party_Endorsement) self.assertRaises( KeyNotValidUpgrade, km.put_raw_key, UNRELATED_KEY, OpenPGPKey, + ADDRESS, validation=ValidationLevel.Provider_Trust) def test_roll_back(self): km = self._key_manager() - km.put_raw_key(EXPIRED_KEY_UPDATED, OpenPGPKey) - km.put_raw_key(EXPIRED_KEY, OpenPGPKey) + km.put_raw_key(EXPIRED_KEY_UPDATED, OpenPGPKey, ADDRESS) + km.put_raw_key(EXPIRED_KEY, OpenPGPKey, ADDRESS) key = km.get_key(ADDRESS, OpenPGPKey, fetch_remote=False) self.assertEqual(key.expiry_date, EXPIRED_KEY_NEW_EXPIRY_DATE) diff --git a/src/leap/keymanager/validation.py b/src/leap/keymanager/validation.py index cf5b4a83..245013e5 100644 --- a/src/leap/keymanager/validation.py +++ b/src/leap/keymanager/validation.py @@ -67,10 +67,6 @@ def can_upgrade(new_key, old_key): if old_key is None: return True - if new_key.address != old_key.address: - # XXX how do we map multiple IDs? (#6212) - return False - # An update of the same key if new_key.fingerprint == old_key.fingerprint: return True |