From 88b2416234ed88bc7652c452f779c95f17bf210f Mon Sep 17 00:00:00 2001 From: Ruben Pollan Date: Thu, 6 Nov 2014 00:47:32 -0600 Subject: Implement the new encryption-key soledad document --- keymanager/src/leap/keymanager/__init__.py | 10 ++-- keymanager/src/leap/keymanager/keys.py | 60 +++++++++++++++------- keymanager/src/leap/keymanager/openpgp.py | 22 ++++---- .../src/leap/keymanager/tests/test_keymanager.py | 55 +++++++++++--------- 4 files changed, 89 insertions(+), 58 deletions(-) diff --git a/keymanager/src/leap/keymanager/__init__.py b/keymanager/src/leap/keymanager/__init__.py index 156aaf86..53dd9a78 100644 --- a/keymanager/src/leap/keymanager/__init__.py +++ b/keymanager/src/leap/keymanager/__init__.py @@ -319,7 +319,7 @@ class KeyManager(object): return map( lambda doc: build_key_from_dict( self._key_class_from_type(doc.content['type']), - doc.content['address'], + doc.content['address'][0], doc.content), self._soledad.get_from_index( TAGS_PRIVATE_INDEX, @@ -529,7 +529,7 @@ class KeyManager(object): new one is not a valid update for it """ try: - old_key = self._wrapper_map[type(key)].get_key(key.address, + old_key = self._wrapper_map[type(key)].get_key(key.address[0], private=key.private) except KeyNotFound: old_key = None @@ -564,7 +564,7 @@ class KeyManager(object): new one is not a valid update for it """ pubkey, _ = self._wrapper_map[ktype].parse_ascii_key(key) - if address is not None and address != pubkey.address: + if address is not None and address not in pubkey.address: raise KeyAddressMismatch("Key UID %s, but expected %s" % (pubkey.address, address)) @@ -600,9 +600,9 @@ class KeyManager(object): pubkey, _ = self._wrapper_map[ktype].parse_ascii_key(res.content) if pubkey is None: raise KeyNotFound(uri) - if pubkey.address != address: + if address not in pubkey.address: raise KeyAddressMismatch("UID %s found, but expected %s" - % (pubkey.address, address)) + % (str(pubkey.address), address)) pubkey.validation = validation self.put_key(pubkey) diff --git a/keymanager/src/leap/keymanager/keys.py b/keymanager/src/leap/keymanager/keys.py index a61a8c79..4952b9b3 100644 --- a/keymanager/src/leap/keymanager/keys.py +++ b/keymanager/src/leap/keymanager/keys.py @@ -27,6 +27,7 @@ except ImportError: import json # noqa import logging import re +import time from abc import ABCMeta, abstractmethod @@ -50,9 +51,11 @@ KEY_DATA_KEY = 'key_data' KEY_PRIVATE_KEY = 'private' KEY_LENGTH_KEY = 'length' KEY_EXPIRY_DATE_KEY = 'expiry_date' -KEY_FIRST_SEEN_AT_KEY = 'first_seen_at' KEY_LAST_AUDITED_AT_KEY = 'last_audited_at' +KEY_REFRESHED_AT_KEY = 'refreshed_at' KEY_VALIDATION_KEY = 'validation' +KEY_ENCR_USED_KEY = 'encr_used' +KEY_SIGN_USED_KEY = 'sign_used' KEY_TAGS_KEY = 'tags' @@ -110,7 +113,7 @@ def build_key_from_dict(kClass, address, kdict): :rtype: C{kClass} """ leap_assert( - address == kdict[KEY_ADDRESS_KEY], + address in kdict[KEY_ADDRESS_KEY], 'Wrong address in key data.') try: validation = toValidationLevel(kdict[KEY_VALIDATION_KEY]) @@ -119,24 +122,40 @@ def build_key_from_dict(kClass, address, kdict): (kdict[KEY_VALIDATION_KEY], kdict[KEY_ID_KEY])) validation = ValidationLevel.Weak_Chain - expiry_date = None - if kdict[KEY_EXPIRY_DATE_KEY]: - expiry_date = datetime.fromtimestamp(int(kdict[KEY_EXPIRY_DATE_KEY])) + expiry_date = _to_datetime(kdict[KEY_EXPIRY_DATE_KEY]) + last_audited_at = _to_datetime(kdict[KEY_LAST_AUDITED_AT_KEY]) + refreshed_at = _to_datetime(kdict[KEY_REFRESHED_AT_KEY]) return kClass( - address, + [address], key_id=kdict[KEY_ID_KEY], fingerprint=kdict[KEY_FINGERPRINT_KEY], key_data=kdict[KEY_DATA_KEY], private=kdict[KEY_PRIVATE_KEY], length=kdict[KEY_LENGTH_KEY], expiry_date=expiry_date, - first_seen_at=kdict[KEY_FIRST_SEEN_AT_KEY], - last_audited_at=kdict[KEY_LAST_AUDITED_AT_KEY], + last_audited_at=last_audited_at, + refreshed_at=refreshed_at, validation=validation, + encr_used=kdict[KEY_ENCR_USED_KEY], + sign_used=kdict[KEY_SIGN_USED_KEY], ) +def _to_datetime(unix_time): + if unix_time != 0: + return datetime.fromtimestamp(unix_time) + else: + return None + + +def _to_unix_time(date): + if date is not None: + return int(time.mktime(date.timetuple())) + else: + return 0 + + # # Abstraction for encryption keys # @@ -151,9 +170,10 @@ class EncryptionKey(object): __metaclass__ = ABCMeta - def __init__(self, address, key_id=None, fingerprint=None, - key_data=None, private=None, length=None, expiry_date=None, - validation=None, first_seen_at=None, last_audited_at=None): + def __init__(self, address, key_id="", fingerprint="", + key_data="", private=False, length=0, expiry_date=None, + validation=ValidationLevel.Weak_Chain, last_audited_at=None, + refreshed_at=None, encr_used=False, sign_used=False): self.address = address self.key_id = key_id self.fingerprint = fingerprint @@ -162,8 +182,10 @@ class EncryptionKey(object): self.length = length self.expiry_date = expiry_date self.validation = validation - self.first_seen_at = first_seen_at self.last_audited_at = last_audited_at + self.refreshed_at = refreshed_at + self.encr_used = encr_used + self.sign_used = sign_used def get_json(self): """ @@ -172,9 +194,9 @@ class EncryptionKey(object): :return: The JSON string describing this key. :rtype: str """ - expiry_str = "" - if self.expiry_date is not None: - expiry_str = self.expiry_date.strftime("%s") + expiry_date = _to_unix_time(self.expiry_date) + last_audited_at = _to_unix_time(self.last_audited_at) + refreshed_at = _to_unix_time(self.refreshed_at) return json.dumps({ KEY_ADDRESS_KEY: self.address, @@ -184,10 +206,12 @@ class EncryptionKey(object): KEY_DATA_KEY: self.key_data, KEY_PRIVATE_KEY: self.private, KEY_LENGTH_KEY: self.length, - KEY_EXPIRY_DATE_KEY: expiry_str, + KEY_EXPIRY_DATE_KEY: expiry_date, + KEY_LAST_AUDITED_AT_KEY: last_audited_at, + KEY_REFRESHED_AT_KEY: refreshed_at, KEY_VALIDATION_KEY: str(self.validation), - KEY_FIRST_SEEN_AT_KEY: self.first_seen_at, - KEY_LAST_AUDITED_AT_KEY: self.last_audited_at, + KEY_ENCR_USED_KEY: self.encr_used, + KEY_SIGN_USED_KEY: self.sign_used, KEY_TAGS_KEY: [KEYMANAGER_KEY_TAG], }) diff --git a/keymanager/src/leap/keymanager/openpgp.py b/keymanager/src/leap/keymanager/openpgp.py index d3c305e2..1160434c 100644 --- a/keymanager/src/leap/keymanager/openpgp.py +++ b/keymanager/src/leap/keymanager/openpgp.py @@ -41,7 +41,6 @@ from leap.keymanager.keys import ( KEY_FINGERPRINT_KEY, KEY_DATA_KEY, ) -from leap.keymanager.validation import ValidationLevel logger = logging.getLogger(__name__) @@ -109,9 +108,9 @@ class TempGPGWrapper(object): # itself is enough to also have the public key in the keyring, # and we want to count the keys afterwards. - privaddrs = map(lambda privkey: privkey.address, privkeys) + privaddrs = map(lambda privkey: privkey.address[0], privkeys) publkeys = filter( - lambda pubkey: pubkey.address not in privaddrs, publkeys) + lambda pubkey: pubkey.address[0] not in privaddrs, publkeys) listkeys = lambda: self._gpg.list_keys() listsecretkeys = lambda: self._gpg.list_keys(secret=True) @@ -184,14 +183,14 @@ def _build_key_from_gpg(address, key, key_data): expiry_date = datetime.fromtimestamp(int(key['expires'])) return OpenPGPKey( - address, + [address], key_id=key['keyid'], fingerprint=key['fingerprint'], key_data=key_data, private=True if key['type'] == 'sec' else False, - length=key['length'], + length=int(key['length']), expiry_date=expiry_date, - validation=ValidationLevel.Weak_Chain, + refreshed_at=datetime.now(), ) @@ -397,7 +396,7 @@ class OpenPGPScheme(EncryptionScheme): :param key: The key to be stored. :type key: OpenPGPKey """ - doc = self._get_key_doc(key.address, private=key.private) + doc = self._get_key_doc(key.address[0], private=key.private) if doc is None: self._soledad.create_doc_from_json(key.get_json()) else: @@ -408,7 +407,7 @@ class OpenPGPScheme(EncryptionScheme): gpg.import_keys(key.key_data) gpgkey = gpg.list_keys(secret=key.private).pop() key = _build_key_from_gpg( - key.address, gpgkey, + key.address[0], gpgkey, gpg.export_keys(gpgkey['fingerprint'], secret=key.private)) doc.set_json(key.get_json()) @@ -452,12 +451,11 @@ class OpenPGPScheme(EncryptionScheme): :type key: EncryptionKey """ leap_assert_type(key, OpenPGPKey) - stored_key = self.get_key(key.address, private=key.private) - if stored_key is None: + doc = self._get_key_doc(key.address[0], key.private) + if doc is None: raise errors.KeyNotFound(key) - if stored_key.__dict__ != key.__dict__: + if doc.content[KEY_FINGERPRINT_KEY] != key.fingerprint: raise errors.KeyAttributesDiffer(key) - doc = self._get_key_doc(key.address, key.private) self._soledad.delete_doc(doc) # diff --git a/keymanager/src/leap/keymanager/tests/test_keymanager.py b/keymanager/src/leap/keymanager/tests/test_keymanager.py index 6a877bcf..4daf3465 100644 --- a/keymanager/src/leap/keymanager/tests/test_keymanager.py +++ b/keymanager/src/leap/keymanager/tests/test_keymanager.py @@ -21,6 +21,7 @@ Tests for the Key Manager. """ +from datetime import datetime from mock import Mock from leap.common.testing.basetest import BaseLeapTest from leap.keymanager import ( @@ -75,16 +76,18 @@ class KeyManagerUtilTestCase(BaseLeapTest): def test_build_key_from_dict(self): kdict = { - 'address': ADDRESS, - 'key_id': 'key_id', - 'fingerprint': 'fingerprint', - 'key_data': 'key_data', - 'private': 'private', - 'length': 'length', - 'expiry_date': '', - 'first_seen_at': 'first_seen_at', - 'last_audited_at': 'last_audited_at', + 'address': [ADDRESS], + 'key_id': KEY_FINGERPRINT[-16:], + 'fingerprint': KEY_FINGERPRINT, + 'key_data': PUBLIC_KEY, + 'private': False, + 'length': 4096, + 'expiry_date': 0, + 'last_audited_at': 0, + 'refreshed_at': 1311239602, 'validation': str(ValidationLevel.Weak_Chain), + 'encr_used': False, + 'sign_used': True, } key = build_key_from_dict(OpenPGPKey, ADDRESS, kdict) self.assertEqual( @@ -109,14 +112,20 @@ class KeyManagerUtilTestCase(BaseLeapTest): None, key.expiry_date, 'Wrong data in key.') self.assertEqual( - kdict['first_seen_at'], key.first_seen_at, + None, key.last_audited_at, 'Wrong data in key.') self.assertEqual( - kdict['last_audited_at'], key.last_audited_at, + datetime.fromtimestamp(kdict['refreshed_at']), key.refreshed_at, 'Wrong data in key.') self.assertEqual( toValidationLevel(kdict['validation']), key.validation, 'Wrong data in key.') + self.assertEqual( + kdict['encr_used'], key.encr_used, + 'Wrong data in key.') + self.assertEqual( + kdict['sign_used'], key.sign_used, + 'Wrong data in key.') class OpenPGPCryptoTestCase(KeyManagerWithSoledadTestCase): @@ -127,9 +136,9 @@ class OpenPGPCryptoTestCase(KeyManagerWithSoledadTestCase): key = pgp.gen_key('user@leap.se') self.assertIsInstance(key, openpgp.OpenPGPKey) self.assertEqual( - 'user@leap.se', key.address, 'Wrong address bound to key.') + ['user@leap.se'], key.address, 'Wrong address bound to key.') self.assertEqual( - '4096', key.length, 'Wrong key length.') + 4096, key.length, 'Wrong key length.') def test_openpgp_put_delete_key(self): pgp = openpgp.OpenPGPScheme( @@ -147,10 +156,10 @@ class OpenPGPCryptoTestCase(KeyManagerWithSoledadTestCase): pgp.put_ascii_key(PUBLIC_KEY) key = pgp.get_key(ADDRESS, private=False) self.assertIsInstance(key, openpgp.OpenPGPKey) + self.assertTrue( + ADDRESS in key.address, 'Wrong address bound to key.') self.assertEqual( - ADDRESS, key.address, 'Wrong address bound to key.') - self.assertEqual( - '4096', key.length, 'Wrong key length.') + 4096, key.length, 'Wrong key length.') pgp.delete_key(key) self.assertRaises(KeyNotFound, pgp.get_key, ADDRESS) @@ -162,7 +171,7 @@ class OpenPGPCryptoTestCase(KeyManagerWithSoledadTestCase): self.assertRaises( KeyNotFound, pgp.get_key, ADDRESS, private=True) key = pgp.get_key(ADDRESS, private=False) - self.assertEqual(ADDRESS, key.address) + self.assertTrue(ADDRESS in key.address) self.assertFalse(key.private) self.assertEqual(KEY_FINGERPRINT, key.fingerprint) pgp.delete_key(key) @@ -311,12 +320,12 @@ class KeyManagerKeyManagementTestCase(KeyManagerWithSoledadTestCase): # get public keys keys = km.get_all_keys(False) self.assertEqual(len(keys), 1, 'Wrong number of keys') - self.assertEqual(ADDRESS, keys[0].address) + self.assertTrue(ADDRESS in keys[0].address) self.assertFalse(keys[0].private) # get private keys keys = km.get_all_keys(True) self.assertEqual(len(keys), 1, 'Wrong number of keys') - self.assertEqual(ADDRESS, keys[0].address) + self.assertTrue(ADDRESS in keys[0].address) self.assertTrue(keys[0].private) def test_get_public_key(self): @@ -326,7 +335,7 @@ class KeyManagerKeyManagementTestCase(KeyManagerWithSoledadTestCase): key = km.get_key(ADDRESS, OpenPGPKey, private=False, fetch_remote=False) self.assertTrue(key is not None) - self.assertEqual(key.address, ADDRESS) + self.assertTrue(ADDRESS in key.address) self.assertEqual( key.fingerprint.lower(), KEY_FINGERPRINT.lower()) self.assertFalse(key.private) @@ -338,7 +347,7 @@ class KeyManagerKeyManagementTestCase(KeyManagerWithSoledadTestCase): key = km.get_key(ADDRESS, OpenPGPKey, private=True, fetch_remote=False) self.assertTrue(key is not None) - self.assertEqual(key.address, ADDRESS) + self.assertTrue(ADDRESS in key.address) self.assertEqual( key.fingerprint.lower(), KEY_FINGERPRINT.lower()) self.assertTrue(key.private) @@ -430,7 +439,7 @@ class KeyManagerKeyManagementTestCase(KeyManagerWithSoledadTestCase): # try to get key fetching from server. key = km.get_key(ADDRESS, OpenPGPKey) self.assertIsInstance(key, OpenPGPKey) - self.assertEqual(ADDRESS, key.address) + self.assertTrue(ADDRESS in key.address) def test_put_key_ascii(self): """ @@ -441,7 +450,7 @@ class KeyManagerKeyManagementTestCase(KeyManagerWithSoledadTestCase): km.put_raw_key(PUBLIC_KEY, OpenPGPKey) key = km.get_key(ADDRESS, OpenPGPKey) self.assertIsInstance(key, OpenPGPKey) - self.assertEqual(ADDRESS, key.address) + self.assertTrue(ADDRESS in key.address) def test_fetch_uri_ascii_key(self): """ -- cgit v1.2.3 From 5d5f60afb1878aa65156fa294cc97a12bfe497d3 Mon Sep 17 00:00:00 2001 From: Ruben Pollan Date: Mon, 10 Nov 2014 09:50:56 -0600 Subject: Use type instead of tags to get docs in openpgp For that that now the type is the class.__name__ instead of str(class) --- keymanager/src/leap/keymanager/__init__.py | 2 +- keymanager/src/leap/keymanager/keys.py | 8 ++++---- keymanager/src/leap/keymanager/openpgp.py | 10 ++++++---- 3 files changed, 11 insertions(+), 9 deletions(-) diff --git a/keymanager/src/leap/keymanager/__init__.py b/keymanager/src/leap/keymanager/__init__.py index 53dd9a78..0ffb6fc7 100644 --- a/keymanager/src/leap/keymanager/__init__.py +++ b/keymanager/src/leap/keymanager/__init__.py @@ -134,7 +134,7 @@ class KeyManager(object): Return key class from string representation of key type. """ return filter( - lambda klass: str(klass) == ktype, + lambda klass: klass.__name__ == ktype, self._wrapper_map).pop() def _get(self, uri, data=None): diff --git a/keymanager/src/leap/keymanager/keys.py b/keymanager/src/leap/keymanager/keys.py index 4952b9b3..5aeb7945 100644 --- a/keymanager/src/leap/keymanager/keys.py +++ b/keymanager/src/leap/keymanager/keys.py @@ -71,14 +71,14 @@ KEYMANAGER_KEY_TAG = 'keymanager-key' # TAGS_PRIVATE_INDEX = 'by-tags-private' -TAGS_ADDRESS_PRIVATE_INDEX = 'by-tags-address-private' +TYPE_ADDRESS_PRIVATE_INDEX = 'by-type-address-private' INDEXES = { TAGS_PRIVATE_INDEX: [ KEY_TAGS_KEY, 'bool(%s)' % KEY_PRIVATE_KEY, ], - TAGS_ADDRESS_PRIVATE_INDEX: [ - KEY_TAGS_KEY, + TYPE_ADDRESS_PRIVATE_INDEX: [ + KEY_TYPE_KEY, KEY_ADDRESS_KEY, 'bool(%s)' % KEY_PRIVATE_KEY, ] @@ -200,7 +200,7 @@ class EncryptionKey(object): return json.dumps({ KEY_ADDRESS_KEY: self.address, - KEY_TYPE_KEY: str(self.__class__), + KEY_TYPE_KEY: self.__class__.__name__, KEY_ID_KEY: self.key_id, KEY_FINGERPRINT_KEY: self.fingerprint, KEY_DATA_KEY: self.key_data, diff --git a/keymanager/src/leap/keymanager/openpgp.py b/keymanager/src/leap/keymanager/openpgp.py index 1160434c..38db178c 100644 --- a/keymanager/src/leap/keymanager/openpgp.py +++ b/keymanager/src/leap/keymanager/openpgp.py @@ -36,8 +36,7 @@ from leap.keymanager.keys import ( EncryptionScheme, is_address, build_key_from_dict, - KEYMANAGER_KEY_TAG, - TAGS_ADDRESS_PRIVATE_INDEX, + TYPE_ADDRESS_PRIVATE_INDEX, KEY_FINGERPRINT_KEY, KEY_DATA_KEY, ) @@ -210,6 +209,9 @@ class OpenPGPScheme(EncryptionScheme): signing and verification). """ + # type used on the soledad documents + OPENPGP_KEY_TYPE = OpenPGPKey.__name__ + def __init__(self, soledad, gpgbinary=None): """ Initialize the OpenPGP wrapper. @@ -427,8 +429,8 @@ class OpenPGPScheme(EncryptionScheme): :rtype: leap.soledad.document.SoledadDocument """ doclist = self._soledad.get_from_index( - TAGS_ADDRESS_PRIVATE_INDEX, - KEYMANAGER_KEY_TAG, + TYPE_ADDRESS_PRIVATE_INDEX, + self.OPENPGP_KEY_TYPE, address, '1' if private else '0') if len(doclist) is 0: -- cgit v1.2.3 From 4ec8d345a32b9169300974941c5bdd87d85f3915 Mon Sep 17 00:00:00 2001 From: Ruben Pollan Date: Mon, 10 Nov 2014 13:36:35 -0600 Subject: Implement active key document --- keymanager/changes/feature-6299_new_doc | 2 + keymanager/src/leap/keymanager/__init__.py | 25 ++--- keymanager/src/leap/keymanager/keys.py | 25 +++++ keymanager/src/leap/keymanager/openpgp.py | 146 ++++++++++++++++++++++++----- 4 files changed, 163 insertions(+), 35 deletions(-) create mode 100644 keymanager/changes/feature-6299_new_doc diff --git a/keymanager/changes/feature-6299_new_doc b/keymanager/changes/feature-6299_new_doc new file mode 100644 index 00000000..c8933386 --- /dev/null +++ b/keymanager/changes/feature-6299_new_doc @@ -0,0 +1,2 @@ +- new soledad doc struct for encryption-keys (closes #6299) +- keep old key after upgrade (closes #6262) diff --git a/keymanager/src/leap/keymanager/__init__.py b/keymanager/src/leap/keymanager/__init__.py index 0ffb6fc7..562bfbfa 100644 --- a/keymanager/src/leap/keymanager/__init__.py +++ b/keymanager/src/leap/keymanager/__init__.py @@ -519,15 +519,24 @@ class KeyManager(object): except IndexError as e: leap_assert(False, "Unsupported key type. Error {0!r}".format(e)) - def put_key(self, key): + def put_key(self, key, address=None): """ Put C{key} in local storage. :param key: The key to be stored :type key: EncryptionKey + :param address: address for which this key will be active. If not set + all the uids will be activated + :type address: str + + :raises KeyAddressMismatch: if address doesn't match any uid on the key :raises KeyNotValidUpdate: if a key with the same uid exists and the new one is not a valid update for it """ + if address is not None and address not in key.address: + raise KeyAddressMismatch("UID %s found, but expected %s" + % (str(key.address), address)) + try: old_key = self._wrapper_map[type(key)].get_key(key.address[0], private=key.private) @@ -536,7 +545,7 @@ class KeyManager(object): if key.private or can_upgrade(key, old_key): try: - self._wrapper_map[type(key)].put_key(key) + self._wrapper_map[type(key)].put_key(key, address) except IndexError as e: leap_assert( False, "Unsupported key type. Error {0!r}".format(e)) @@ -553,7 +562,7 @@ class KeyManager(object): :type key: str :param ktype: the type of the key. :type ktype: subclass of EncryptionKey - :param address: if set used to check that the key is for this address + :param address: address for which this key will be active :type address: str :param validation: validation level for this key (default: 'Weak_Chain') @@ -564,12 +573,9 @@ class KeyManager(object): new one is not a valid update for it """ pubkey, _ = self._wrapper_map[ktype].parse_ascii_key(key) - if address is not None and address not in pubkey.address: - raise KeyAddressMismatch("Key UID %s, but expected %s" - % (pubkey.address, address)) pubkey.validation = validation - self.put_key(pubkey) + self.put_key(pubkey, address) def fetch_key(self, address, uri, ktype, validation=ValidationLevel.Weak_Chain): @@ -600,12 +606,9 @@ class KeyManager(object): pubkey, _ = self._wrapper_map[ktype].parse_ascii_key(res.content) if pubkey is None: raise KeyNotFound(uri) - if address not in pubkey.address: - raise KeyAddressMismatch("UID %s found, but expected %s" - % (str(pubkey.address), address)) pubkey.validation = validation - self.put_key(pubkey) + self.put_key(pubkey, address) from ._version import get_versions __version__ = get_versions()['version'] diff --git a/keymanager/src/leap/keymanager/keys.py b/keymanager/src/leap/keymanager/keys.py index 5aeb7945..d1b5d933 100644 --- a/keymanager/src/leap/keymanager/keys.py +++ b/keymanager/src/leap/keymanager/keys.py @@ -64,6 +64,8 @@ KEY_TAGS_KEY = 'tags' # KEYMANAGER_KEY_TAG = 'keymanager-key' +KEYMANAGER_ACTIVE_TAG = 'keymanager-active' +KEYMANAGER_ACTIVE_TYPE = '-active' # @@ -71,12 +73,18 @@ KEYMANAGER_KEY_TAG = 'keymanager-key' # TAGS_PRIVATE_INDEX = 'by-tags-private' +TYPE_ID_PRIVATE_INDEX = 'by-type-id-private' TYPE_ADDRESS_PRIVATE_INDEX = 'by-type-address-private' INDEXES = { TAGS_PRIVATE_INDEX: [ KEY_TAGS_KEY, 'bool(%s)' % KEY_PRIVATE_KEY, ], + TYPE_ID_PRIVATE_INDEX: [ + KEY_TYPE_KEY, + KEY_ID_KEY, + 'bool(%s)' % KEY_PRIVATE_KEY, + ], TYPE_ADDRESS_PRIVATE_INDEX: [ KEY_TYPE_KEY, KEY_ADDRESS_KEY, @@ -215,6 +223,23 @@ class EncryptionKey(object): KEY_TAGS_KEY: [KEYMANAGER_KEY_TAG], }) + def get_active_json(self, address): + """ + Return a JSON string describing this key. + + :param address: Address for wich the key is active + :type address: str + :return: The JSON string describing this key. + :rtype: str + """ + return json.dumps({ + KEY_ADDRESS_KEY: address, + KEY_TYPE_KEY: self.__class__.__name__ + KEYMANAGER_ACTIVE_TYPE, + KEY_ID_KEY: self.key_id, + KEY_PRIVATE_KEY: self.private, + KEY_TAGS_KEY: [KEYMANAGER_ACTIVE_TAG], + }) + def __repr__(self): """ Representation of this class diff --git a/keymanager/src/leap/keymanager/openpgp.py b/keymanager/src/leap/keymanager/openpgp.py index 38db178c..52655d0e 100644 --- a/keymanager/src/leap/keymanager/openpgp.py +++ b/keymanager/src/leap/keymanager/openpgp.py @@ -36,9 +36,12 @@ from leap.keymanager.keys import ( EncryptionScheme, is_address, build_key_from_dict, + TYPE_ID_PRIVATE_INDEX, TYPE_ADDRESS_PRIVATE_INDEX, KEY_FINGERPRINT_KEY, KEY_DATA_KEY, + KEY_ID_KEY, + KEYMANAGER_ACTIVE_TYPE, ) @@ -193,6 +196,18 @@ def _build_key_from_gpg(address, key, key_data): ) +def _parse_address(address): + """ + Remove the identity suffix after the '+' until the '@' + e.g.: test_user+something@provider.com becomes test_user@provider.com + since the key belongs to the identity without the '+' suffix. + + :type address: str + :rtype: str + """ + return re.sub(r'\+.*\@', '@', address) + + # # The OpenPGP wrapper # @@ -210,7 +225,8 @@ class OpenPGPScheme(EncryptionScheme): """ # type used on the soledad documents - OPENPGP_KEY_TYPE = OpenPGPKey.__name__ + KEY_TYPE = OpenPGPKey.__name__ + ACTIVE_TYPE = KEY_TYPE + KEYMANAGER_ACTIVE_TYPE def __init__(self, soledad, gpgbinary=None): """ @@ -282,7 +298,7 @@ class OpenPGPScheme(EncryptionScheme): openpgp_key = _build_key_from_gpg( address, key, gpg.export_keys(key['fingerprint'], secret=secret)) - self.put_key(openpgp_key) + self.put_key(openpgp_key, address) return self.get_key(address, private=True) @@ -299,10 +315,7 @@ class OpenPGPScheme(EncryptionScheme): :rtype: OpenPGPKey @raise KeyNotFound: If the key was not found on local storage. """ - # Remove the identity suffix after the '+' until the '@' - # e.g.: test_user+something@provider.com becomes test_user@provider.com - # since the key belongs to the identity without the '+' suffix. - address = re.sub(r'\+.*\@', '@', address) + address = _parse_address(address) doc = self._get_key_doc(address, private) if doc is None: @@ -371,12 +384,15 @@ class OpenPGPScheme(EncryptionScheme): return (openpgp_pubkey, openpgp_privkey) - def put_ascii_key(self, key_data): + def put_ascii_key(self, key_data, address=None): """ Put key contained in ascii-armored C{key_data} in local storage. :param key_data: The key data to be stored. :type key_data: str or unicode + :param address: address for which this key will be active. If not set + all the uids will be activated + :type address: str """ leap_assert_type(key_data, (str, unicode)) @@ -387,21 +403,41 @@ class OpenPGPScheme(EncryptionScheme): leap_assert(False, repr(e)) if openpgp_pubkey is not None: - self.put_key(openpgp_pubkey) + self.put_key(openpgp_pubkey, address) if openpgp_privkey is not None: - self.put_key(openpgp_privkey) + self.put_key(openpgp_privkey, address) - def put_key(self, key): + def put_key(self, key, address=None): """ Put C{key} in local storage. :param key: The key to be stored. :type key: OpenPGPKey + :param address: address for which this key will be active. If not set + all the uids will be activated + :type address: str """ - doc = self._get_key_doc(key.address[0], private=key.private) - if doc is None: - self._soledad.create_doc_from_json(key.get_json()) + if address is not None: + active_address = [_parse_address(address)] else: + active_address = key.address + + self._put_key_doc(key) + self._put_active_doc(key, active_address) + + def _put_key_doc(self, key): + """ + Put key document in soledad + + :type key: OpenPGPKey + """ + docs = self._soledad.get_from_index( + TYPE_ID_PRIVATE_INDEX, + self.KEY_TYPE, + key.key_id, + '1' if key.private else '0') + if len(docs) != 0: + doc = docs.pop() if key.fingerprint == doc.content[KEY_FINGERPRINT_KEY]: # in case of an update of the key merge them with gnupg with self._temporary_gpgwrapper() as gpg: @@ -412,8 +448,41 @@ class OpenPGPScheme(EncryptionScheme): key.address[0], gpgkey, gpg.export_keys(gpgkey['fingerprint'], secret=key.private)) - doc.set_json(key.get_json()) - self._soledad.put_doc(doc) + doc.set_json(key.get_json()) + self._soledad.put_doc(doc) + else: + logger.critical( + "Can't put a key whith the same key_id and different " + "fingerprint: %s, %s" + % (key.fingerprint, doc.content[KEY_FINGERPRINT_KEY])) + else: + self._soledad.create_doc_from_json(key.get_json()) + + def _put_active_doc(self, key, addresses): + """ + Put active key document in soledad + + :type key: OpenPGPKey + :type addresses: list(str) + """ + for address in addresses: + docs = self._soledad.get_from_index( + TYPE_ADDRESS_PRIVATE_INDEX, + self.ACTIVE_TYPE, + address, + '1' if key.private else '0') + if len(docs) == 1: + doc = docs.pop() + doc.set_json(key.get_active_json(address)) + self._soledad.put_doc(doc) + else: + if len(docs) > 1: + logger.error("There is more than one active key document " + "for the address %s" % (address,)) + for doc in docs: + self._soledad.delete_doc(doc) + self._soledad.create_doc_from_json( + key.get_active_json(address)) def _get_key_doc(self, address, private=False): """ @@ -428,17 +497,26 @@ class OpenPGPScheme(EncryptionScheme): :return: The document with the key or None if it does not exist. :rtype: leap.soledad.document.SoledadDocument """ - doclist = self._soledad.get_from_index( + activedoc = self._soledad.get_from_index( TYPE_ADDRESS_PRIVATE_INDEX, - self.OPENPGP_KEY_TYPE, + self.ACTIVE_TYPE, address, '1' if private else '0') - if len(doclist) is 0: + if len(activedoc) is 0: return None + leap_assert( + len(activedoc) is 1, + 'Found more than one key for address %s!' % (address,)) + + key_id = activedoc[0].content[KEY_ID_KEY] + doclist = self._soledad.get_from_index( + TYPE_ID_PRIVATE_INDEX, + self.KEY_TYPE, + key_id, + '1' if private else '0') leap_assert( len(doclist) is 1, - 'Found more than one %s key for address!' % - 'private' if private else 'public') + 'There is %d keys for id %s!' % (len(doclist), key_id)) return doclist.pop() def delete_key(self, key): @@ -447,17 +525,37 @@ class OpenPGPScheme(EncryptionScheme): May raise: errors.KeyNotFound - errors.KeyAttributesDiffer :param key: The key to be removed. :type key: EncryptionKey """ leap_assert_type(key, OpenPGPKey) - doc = self._get_key_doc(key.address[0], key.private) + activedocs = self._soledad.get_from_index( + TYPE_ID_PRIVATE_INDEX, + self.ACTIVE_TYPE, + key.key_id, + '1' if key.private else '0') + for doc in activedocs: + self._soledad.delete_doc(doc) + + docs = self._soledad.get_from_index( + TYPE_ID_PRIVATE_INDEX, + self.KEY_TYPE, + key.key_id, + '1' if key.private else '0') + if len(docs) == 0: + raise errors.KeyNotFound(key) + if len(docs) > 1: + logger.critical("There is more than one key for key_id %s" + % key.key_id) + + doc = None + for d in docs: + if d.content['fingerprint'] == key.fingerprint: + doc = d + break if doc is None: raise errors.KeyNotFound(key) - if doc.content[KEY_FINGERPRINT_KEY] != key.fingerprint: - raise errors.KeyAttributesDiffer(key) self._soledad.delete_doc(doc) # -- cgit v1.2.3 From 1d1274e57f5e466fb195c829a628f519ef51186a Mon Sep 17 00:00:00 2001 From: Ruben Pollan Date: Mon, 10 Nov 2014 18:36:59 -0600 Subject: Implement multi uid support --- keymanager/changes/feature-6212_multi_uid_support | 1 + keymanager/src/leap/keymanager/__init__.py | 12 +-- keymanager/src/leap/keymanager/keys.py | 15 ++- keymanager/src/leap/keymanager/openpgp.py | 120 ++++++++++----------- .../src/leap/keymanager/tests/test_keymanager.py | 48 ++++----- .../src/leap/keymanager/tests/test_validation.py | 21 ++-- keymanager/src/leap/keymanager/validation.py | 4 - 7 files changed, 101 insertions(+), 120 deletions(-) create mode 100644 keymanager/changes/feature-6212_multi_uid_support diff --git a/keymanager/changes/feature-6212_multi_uid_support b/keymanager/changes/feature-6212_multi_uid_support new file mode 100644 index 00000000..495b8c48 --- /dev/null +++ b/keymanager/changes/feature-6212_multi_uid_support @@ -0,0 +1 @@ +- Multi uid support (closes #6212) diff --git a/keymanager/src/leap/keymanager/__init__.py b/keymanager/src/leap/keymanager/__init__.py index 562bfbfa..c64cdeab 100644 --- a/keymanager/src/leap/keymanager/__init__.py +++ b/keymanager/src/leap/keymanager/__init__.py @@ -319,7 +319,6 @@ class KeyManager(object): return map( lambda doc: build_key_from_dict( self._key_class_from_type(doc.content['type']), - doc.content['address'][0], doc.content), self._soledad.get_from_index( TAGS_PRIVATE_INDEX, @@ -519,26 +518,25 @@ class KeyManager(object): except IndexError as e: leap_assert(False, "Unsupported key type. Error {0!r}".format(e)) - def put_key(self, key, address=None): + def put_key(self, key, address): """ Put C{key} in local storage. :param key: The key to be stored :type key: EncryptionKey - :param address: address for which this key will be active. If not set - all the uids will be activated + :param address: address for which this key will be active :type address: str :raises KeyAddressMismatch: if address doesn't match any uid on the key :raises KeyNotValidUpdate: if a key with the same uid exists and the new one is not a valid update for it """ - if address is not None and address not in key.address: + if address not in key.address: raise KeyAddressMismatch("UID %s found, but expected %s" % (str(key.address), address)) try: - old_key = self._wrapper_map[type(key)].get_key(key.address[0], + old_key = self._wrapper_map[type(key)].get_key(address, private=key.private) except KeyNotFound: old_key = None @@ -553,7 +551,7 @@ class KeyManager(object): raise KeyNotValidUpgrade("Key %s can not be upgraded by new key %s" % (old_key.key_id, key.key_id)) - def put_raw_key(self, key, ktype, address=None, + def put_raw_key(self, key, ktype, address, validation=ValidationLevel.Weak_Chain): """ Put C{key} in local storage. diff --git a/keymanager/src/leap/keymanager/keys.py b/keymanager/src/leap/keymanager/keys.py index d1b5d933..2108aa26 100644 --- a/keymanager/src/leap/keymanager/keys.py +++ b/keymanager/src/leap/keymanager/keys.py @@ -109,20 +109,15 @@ def is_address(address): return bool(re.match('[\w.-]+@[\w.-]+', address)) -def build_key_from_dict(kClass, address, kdict): +def build_key_from_dict(kClass, kdict): """ - Build an C{kClass} key bound to C{address} based on info in C{kdict}. + Build an C{kClass} key based on info in C{kdict}. - :param address: The address bound to the key. - :type address: str :param kdict: Dictionary with key data. :type kdict: dict :return: An instance of the key. :rtype: C{kClass} """ - leap_assert( - address in kdict[KEY_ADDRESS_KEY], - 'Wrong address in key data.') try: validation = toValidationLevel(kdict[KEY_VALIDATION_KEY]) except ValueError: @@ -135,7 +130,7 @@ def build_key_from_dict(kClass, address, kdict): refreshed_at = _to_datetime(kdict[KEY_REFRESHED_AT_KEY]) return kClass( - [address], + kdict[KEY_ADDRESS_KEY], key_id=kdict[KEY_ID_KEY], fingerprint=kdict[KEY_FINGERPRINT_KEY], key_data=kdict[KEY_DATA_KEY], @@ -315,12 +310,14 @@ class EncryptionScheme(object): pass @abstractmethod - def put_key(self, key): + def put_key(self, key, address): """ Put a key in local storage. :param key: The key to be stored. :type key: EncryptionKey + :param address: address for which this key will be active. + :type address: str """ pass diff --git a/keymanager/src/leap/keymanager/openpgp.py b/keymanager/src/leap/keymanager/openpgp.py index 52655d0e..4f965745 100644 --- a/keymanager/src/leap/keymanager/openpgp.py +++ b/keymanager/src/leap/keymanager/openpgp.py @@ -38,6 +38,7 @@ from leap.keymanager.keys import ( build_key_from_dict, TYPE_ID_PRIVATE_INDEX, TYPE_ADDRESS_PRIVATE_INDEX, + KEY_ADDRESS_KEY, KEY_FINGERPRINT_KEY, KEY_DATA_KEY, KEY_ID_KEY, @@ -110,9 +111,9 @@ class TempGPGWrapper(object): # itself is enough to also have the public key in the keyring, # and we want to count the keys afterwards. - privaddrs = map(lambda privkey: privkey.address[0], privkeys) + privids = map(lambda privkey: privkey.key_id, privkeys) publkeys = filter( - lambda pubkey: pubkey.address[0] not in privaddrs, publkeys) + lambda pubkey: pubkey.key_id not in privids, publkeys) listkeys = lambda: self._gpg.list_keys() listsecretkeys = lambda: self._gpg.list_keys(secret=True) @@ -163,16 +164,13 @@ class TempGPGWrapper(object): shutil.rmtree(self._gpg.homedir) -def _build_key_from_gpg(address, key, key_data): +def _build_key_from_gpg(key, key_data): """ - Build an OpenPGPKey for C{address} based on C{key} from - local gpg storage. + Build an OpenPGPKey based on C{key} from local gpg storage. ASCII armored GPG key data has to be queried independently in this wrapper, so we receive it in C{key_data}. - :param address: The address bound to the key. - :type address: str :param key: Key obtained from GPG storage. :type key: dict :param key_data: Key data obtained from GPG storage. @@ -183,9 +181,12 @@ def _build_key_from_gpg(address, key, key_data): expiry_date = None if key['expires']: expiry_date = datetime.fromtimestamp(int(key['expires'])) + address = [] + for uid in key['uids']: + address.append(_parse_address(uid)) return OpenPGPKey( - [address], + address, key_id=key['keyid'], fingerprint=key['fingerprint'], key_data=key_data, @@ -198,14 +199,18 @@ def _build_key_from_gpg(address, key, key_data): def _parse_address(address): """ - Remove the identity suffix after the '+' until the '@' + Remove name, '<', '>' and the identity suffix after the '+' until the '@' e.g.: test_user+something@provider.com becomes test_user@provider.com since the key belongs to the identity without the '+' suffix. :type address: str :rtype: str """ - return re.sub(r'\+.*\@', '@', address) + mail_regex = '(.*<)?([\w.-]+)(\+.*)?(@[\w.-]+)(>.*)?' + match = re.match(mail_regex, address) + if match is None: + return None + return ''.join(match.group(2, 4)) # @@ -289,15 +294,17 @@ class OpenPGPScheme(EncryptionScheme): leap_assert( len(key['uids']) is 1, # with just one uid! 'Wrong number of uids for key: %d.' % len(key['uids'])) - leap_assert( - re.match('.*<%s>$' % address, key['uids'][0]) is not None, - 'Key not correctly bound to address.') + uid_match = False + for uid in key['uids']: + if re.match('.*<%s>$' % address, uid) is not None: + uid_match = True + return + leap_assert(uid_match, 'Key not correctly bound to address.') # insert both public and private keys in storage for secret in [True, False]: key = gpg.list_keys(secret=secret).pop() openpgp_key = _build_key_from_gpg( - address, key, - gpg.export_keys(key['fingerprint'], secret=secret)) + key, gpg.export_keys(key['fingerprint'], secret=secret)) self.put_key(openpgp_key, address) return self.get_key(address, private=True) @@ -320,7 +327,10 @@ class OpenPGPScheme(EncryptionScheme): doc = self._get_key_doc(address, private) if doc is None: raise errors.KeyNotFound(address) - return build_key_from_dict(OpenPGPKey, address, doc.content) + leap_assert( + address in doc.content[KEY_ADDRESS_KEY], + 'Wrong address in key data.') + return build_key_from_dict(OpenPGPKey, doc.content) def parse_ascii_key(self, key_data): """ @@ -337,7 +347,6 @@ class OpenPGPScheme(EncryptionScheme): leap_assert_type(key_data, (str, unicode)) # TODO: add more checks for correct key data. leap_assert(key_data is not None, 'Data does not represent a key.') - mail_regex = '.*<([\w.-]+@[\w.-]+)>.*' with self._temporary_gpgwrapper() as gpg: # TODO: inspect result, or use decorator @@ -354,44 +363,30 @@ class OpenPGPScheme(EncryptionScheme): except IndexError: return (None, None) - # extract adress from first uid on key - match = re.match(mail_regex, pubkey['uids'].pop()) - leap_assert(match is not None, 'No user address in key data.') - address = match.group(1) - openpgp_privkey = None if privkey is not None: - match = re.match(mail_regex, privkey['uids'].pop()) - leap_assert(match is not None, 'No user address in key data.') - privaddress = match.group(1) - # build private key openpgp_privkey = _build_key_from_gpg( - privaddress, privkey, + privkey, gpg.export_keys(privkey['fingerprint'], secret=True)) - - leap_check(address == privaddress, - 'Addresses in public and private key differ.', - errors.KeyAddressMismatch) leap_check(pubkey['fingerprint'] == privkey['fingerprint'], 'Fingerprints for public and private key differ.', errors.KeyFingerprintMismatch) # build public key openpgp_pubkey = _build_key_from_gpg( - address, pubkey, + pubkey, gpg.export_keys(pubkey['fingerprint'], secret=False)) return (openpgp_pubkey, openpgp_privkey) - def put_ascii_key(self, key_data, address=None): + def put_ascii_key(self, key_data, address): """ Put key contained in ascii-armored C{key_data} in local storage. :param key_data: The key data to be stored. :type key_data: str or unicode - :param address: address for which this key will be active. If not set - all the uids will be activated + :param address: address for which this key will be active :type address: str """ leap_assert_type(key_data, (str, unicode)) @@ -407,23 +402,17 @@ class OpenPGPScheme(EncryptionScheme): if openpgp_privkey is not None: self.put_key(openpgp_privkey, address) - def put_key(self, key, address=None): + def put_key(self, key, address): """ Put C{key} in local storage. :param key: The key to be stored. :type key: OpenPGPKey - :param address: address for which this key will be active. If not set - all the uids will be activated + :param address: address for which this key will be active. :type address: str """ - if address is not None: - active_address = [_parse_address(address)] - else: - active_address = key.address - self._put_key_doc(key) - self._put_active_doc(key, active_address) + self._put_active_doc(key, address) def _put_key_doc(self, key): """ @@ -445,7 +434,7 @@ class OpenPGPScheme(EncryptionScheme): gpg.import_keys(key.key_data) gpgkey = gpg.list_keys(secret=key.private).pop() key = _build_key_from_gpg( - key.address[0], gpgkey, + gpgkey, gpg.export_keys(gpgkey['fingerprint'], secret=key.private)) doc.set_json(key.get_json()) @@ -458,31 +447,30 @@ class OpenPGPScheme(EncryptionScheme): else: self._soledad.create_doc_from_json(key.get_json()) - def _put_active_doc(self, key, addresses): + def _put_active_doc(self, key, address): """ Put active key document in soledad :type key: OpenPGPKey - :type addresses: list(str) - """ - for address in addresses: - docs = self._soledad.get_from_index( - TYPE_ADDRESS_PRIVATE_INDEX, - self.ACTIVE_TYPE, - address, - '1' if key.private else '0') - if len(docs) == 1: - doc = docs.pop() - doc.set_json(key.get_active_json(address)) - self._soledad.put_doc(doc) - else: - if len(docs) > 1: - logger.error("There is more than one active key document " - "for the address %s" % (address,)) - for doc in docs: - self._soledad.delete_doc(doc) - self._soledad.create_doc_from_json( - key.get_active_json(address)) + :type addresses: str + """ + docs = self._soledad.get_from_index( + TYPE_ADDRESS_PRIVATE_INDEX, + self.ACTIVE_TYPE, + address, + '1' if key.private else '0') + if len(docs) == 1: + doc = docs.pop() + doc.set_json(key.get_active_json(address)) + self._soledad.put_doc(doc) + else: + if len(docs) > 1: + logger.error("There is more than one active key document " + "for the address %s" % (address,)) + for doc in docs: + self._soledad.delete_doc(doc) + self._soledad.create_doc_from_json( + key.get_active_json(address)) def _get_key_doc(self, address, private=False): """ diff --git a/keymanager/src/leap/keymanager/tests/test_keymanager.py b/keymanager/src/leap/keymanager/tests/test_keymanager.py index 4daf3465..6aeb67a8 100644 --- a/keymanager/src/leap/keymanager/tests/test_keymanager.py +++ b/keymanager/src/leap/keymanager/tests/test_keymanager.py @@ -89,7 +89,7 @@ class KeyManagerUtilTestCase(BaseLeapTest): 'encr_used': False, 'sign_used': True, } - key = build_key_from_dict(OpenPGPKey, ADDRESS, kdict) + key = build_key_from_dict(OpenPGPKey, kdict) self.assertEqual( kdict['address'], key.address, 'Wrong data in key.') @@ -144,7 +144,7 @@ class OpenPGPCryptoTestCase(KeyManagerWithSoledadTestCase): pgp = openpgp.OpenPGPScheme( self._soledad, gpgbinary=GPG_BINARY_PATH) self.assertRaises(KeyNotFound, pgp.get_key, ADDRESS) - pgp.put_ascii_key(PUBLIC_KEY) + pgp.put_ascii_key(PUBLIC_KEY, ADDRESS) key = pgp.get_key(ADDRESS, private=False) pgp.delete_key(key) self.assertRaises(KeyNotFound, pgp.get_key, ADDRESS) @@ -153,7 +153,7 @@ class OpenPGPCryptoTestCase(KeyManagerWithSoledadTestCase): pgp = openpgp.OpenPGPScheme( self._soledad, gpgbinary=GPG_BINARY_PATH) self.assertRaises(KeyNotFound, pgp.get_key, ADDRESS) - pgp.put_ascii_key(PUBLIC_KEY) + pgp.put_ascii_key(PUBLIC_KEY, ADDRESS) key = pgp.get_key(ADDRESS, private=False) self.assertIsInstance(key, openpgp.OpenPGPKey) self.assertTrue( @@ -167,7 +167,7 @@ class OpenPGPCryptoTestCase(KeyManagerWithSoledadTestCase): pgp = openpgp.OpenPGPScheme( self._soledad, gpgbinary=GPG_BINARY_PATH) self.assertRaises(KeyNotFound, pgp.get_key, ADDRESS) - pgp.put_ascii_key(PUBLIC_KEY) + pgp.put_ascii_key(PUBLIC_KEY, ADDRESS) self.assertRaises( KeyNotFound, pgp.get_key, ADDRESS, private=True) key = pgp.get_key(ADDRESS, private=False) @@ -181,7 +181,7 @@ class OpenPGPCryptoTestCase(KeyManagerWithSoledadTestCase): # encrypt pgp = openpgp.OpenPGPScheme( self._soledad, gpgbinary=GPG_BINARY_PATH) - pgp.put_ascii_key(PUBLIC_KEY) + pgp.put_ascii_key(PUBLIC_KEY, ADDRESS) pubkey = pgp.get_key(ADDRESS, private=False) cyphertext = pgp.encrypt('data', pubkey) # assert @@ -193,7 +193,7 @@ class OpenPGPCryptoTestCase(KeyManagerWithSoledadTestCase): # decrypt self.assertRaises( KeyNotFound, pgp.get_key, ADDRESS, private=True) - pgp.put_ascii_key(PRIVATE_KEY) + pgp.put_ascii_key(PRIVATE_KEY, ADDRESS) privkey = pgp.get_key(ADDRESS, private=True) pgp.delete_key(pubkey) pgp.delete_key(privkey) @@ -216,7 +216,7 @@ class OpenPGPCryptoTestCase(KeyManagerWithSoledadTestCase): def test_sign_with_public_raises(self): pgp = openpgp.OpenPGPScheme( self._soledad, gpgbinary=GPG_BINARY_PATH) - pgp.put_ascii_key(PUBLIC_KEY) + pgp.put_ascii_key(PUBLIC_KEY, ADDRESS) data = 'data' pubkey = pgp.get_key(ADDRESS, private=False) self.assertRaises( @@ -226,11 +226,11 @@ class OpenPGPCryptoTestCase(KeyManagerWithSoledadTestCase): def test_verify_with_wrong_key_raises(self): pgp = openpgp.OpenPGPScheme( self._soledad, gpgbinary=GPG_BINARY_PATH) - pgp.put_ascii_key(PRIVATE_KEY) + pgp.put_ascii_key(PRIVATE_KEY, ADDRESS) data = 'data' privkey = pgp.get_key(ADDRESS, private=True) signed = pgp.sign(data, privkey) - pgp.put_ascii_key(PUBLIC_KEY_2) + pgp.put_ascii_key(PUBLIC_KEY_2, ADDRESS_2) wrongkey = pgp.get_key(ADDRESS_2) self.assertRaises( errors.InvalidSignature, @@ -239,7 +239,7 @@ class OpenPGPCryptoTestCase(KeyManagerWithSoledadTestCase): def test_encrypt_sign_with_public_raises(self): pgp = openpgp.OpenPGPScheme( self._soledad, gpgbinary=GPG_BINARY_PATH) - pgp.put_ascii_key(PRIVATE_KEY) + pgp.put_ascii_key(PRIVATE_KEY, ADDRESS) data = 'data' privkey = pgp.get_key(ADDRESS, private=True) pubkey = pgp.get_key(ADDRESS, private=False) @@ -250,7 +250,7 @@ class OpenPGPCryptoTestCase(KeyManagerWithSoledadTestCase): def test_decrypt_verify_with_private_raises(self): pgp = openpgp.OpenPGPScheme( self._soledad, gpgbinary=GPG_BINARY_PATH) - pgp.put_ascii_key(PRIVATE_KEY) + pgp.put_ascii_key(PRIVATE_KEY, ADDRESS) data = 'data' privkey = pgp.get_key(ADDRESS, private=True) pubkey = pgp.get_key(ADDRESS, private=False) @@ -264,12 +264,12 @@ class OpenPGPCryptoTestCase(KeyManagerWithSoledadTestCase): def test_decrypt_verify_with_wrong_key_raises(self): pgp = openpgp.OpenPGPScheme( self._soledad, gpgbinary=GPG_BINARY_PATH) - pgp.put_ascii_key(PRIVATE_KEY) + pgp.put_ascii_key(PRIVATE_KEY, ADDRESS) data = 'data' privkey = pgp.get_key(ADDRESS, private=True) pubkey = pgp.get_key(ADDRESS, private=False) encrypted_and_signed = pgp.encrypt(data, pubkey, sign=privkey) - pgp.put_ascii_key(PUBLIC_KEY_2) + pgp.put_ascii_key(PUBLIC_KEY_2, ADDRESS_2) wrongkey = pgp.get_key(ADDRESS_2) self.assertRaises( errors.InvalidSignature, @@ -278,7 +278,7 @@ class OpenPGPCryptoTestCase(KeyManagerWithSoledadTestCase): def test_sign_verify(self): pgp = openpgp.OpenPGPScheme( self._soledad, gpgbinary=GPG_BINARY_PATH) - pgp.put_ascii_key(PRIVATE_KEY) + pgp.put_ascii_key(PRIVATE_KEY, ADDRESS) data = 'data' privkey = pgp.get_key(ADDRESS, private=True) signed = pgp.sign(data, privkey, detach=False) @@ -288,10 +288,10 @@ class OpenPGPCryptoTestCase(KeyManagerWithSoledadTestCase): def test_encrypt_sign_decrypt_verify(self): pgp = openpgp.OpenPGPScheme( self._soledad, gpgbinary=GPG_BINARY_PATH) - pgp.put_ascii_key(PRIVATE_KEY) + pgp.put_ascii_key(PRIVATE_KEY, ADDRESS) pubkey = pgp.get_key(ADDRESS, private=False) privkey = pgp.get_key(ADDRESS, private=True) - pgp.put_ascii_key(PRIVATE_KEY_2) + pgp.put_ascii_key(PRIVATE_KEY_2, ADDRESS_2) pubkey2 = pgp.get_key(ADDRESS_2, private=False) privkey2 = pgp.get_key(ADDRESS_2, private=True) data = 'data' @@ -304,7 +304,7 @@ class OpenPGPCryptoTestCase(KeyManagerWithSoledadTestCase): def test_sign_verify_detached_sig(self): pgp = openpgp.OpenPGPScheme( self._soledad, gpgbinary=GPG_BINARY_PATH) - pgp.put_ascii_key(PRIVATE_KEY) + pgp.put_ascii_key(PRIVATE_KEY, ADDRESS) data = 'data' privkey = pgp.get_key(ADDRESS, private=True) signature = pgp.sign(data, privkey, detach=True) @@ -316,7 +316,7 @@ class KeyManagerKeyManagementTestCase(KeyManagerWithSoledadTestCase): def test_get_all_keys_in_db(self): km = self._key_manager() - km._wrapper_map[OpenPGPKey].put_ascii_key(PRIVATE_KEY) + km._wrapper_map[OpenPGPKey].put_ascii_key(PRIVATE_KEY, ADDRESS) # get public keys keys = km.get_all_keys(False) self.assertEqual(len(keys), 1, 'Wrong number of keys') @@ -330,7 +330,7 @@ class KeyManagerKeyManagementTestCase(KeyManagerWithSoledadTestCase): def test_get_public_key(self): km = self._key_manager() - km._wrapper_map[OpenPGPKey].put_ascii_key(PRIVATE_KEY) + km._wrapper_map[OpenPGPKey].put_ascii_key(PRIVATE_KEY, ADDRESS) # get the key key = km.get_key(ADDRESS, OpenPGPKey, private=False, fetch_remote=False) @@ -342,7 +342,7 @@ class KeyManagerKeyManagementTestCase(KeyManagerWithSoledadTestCase): def test_get_private_key(self): km = self._key_manager() - km._wrapper_map[OpenPGPKey].put_ascii_key(PRIVATE_KEY) + km._wrapper_map[OpenPGPKey].put_ascii_key(PRIVATE_KEY, ADDRESS) # get the key key = km.get_key(ADDRESS, OpenPGPKey, private=True, fetch_remote=False) @@ -364,7 +364,7 @@ class KeyManagerKeyManagementTestCase(KeyManagerWithSoledadTestCase): """ token = "mytoken" km = self._key_manager(token=token) - km._wrapper_map[OpenPGPKey].put_ascii_key(PUBLIC_KEY) + km._wrapper_map[OpenPGPKey].put_ascii_key(PUBLIC_KEY, ADDRESS) km._fetcher.put = Mock() # the following data will be used on the send km.ca_cert_path = 'capath' @@ -447,7 +447,7 @@ class KeyManagerKeyManagementTestCase(KeyManagerWithSoledadTestCase): """ km = self._key_manager(url='http://nickserver.domain') - km.put_raw_key(PUBLIC_KEY, OpenPGPKey) + km.put_raw_key(PUBLIC_KEY, OpenPGPKey, ADDRESS) key = km.get_key(ADDRESS, OpenPGPKey) self.assertIsInstance(key, OpenPGPKey) self.assertTrue(ADDRESS in key.address) @@ -509,7 +509,7 @@ class KeyManagerCryptoTestCase(KeyManagerWithSoledadTestCase): def test_keymanager_openpgp_encrypt_decrypt(self): km = self._key_manager() # put raw private key - km._wrapper_map[OpenPGPKey].put_ascii_key(PRIVATE_KEY) + km._wrapper_map[OpenPGPKey].put_ascii_key(PRIVATE_KEY, ADDRESS) # get public key pubkey = km.get_key( ADDRESS, OpenPGPKey, private=False, fetch_remote=False) @@ -526,7 +526,7 @@ class KeyManagerCryptoTestCase(KeyManagerWithSoledadTestCase): def test_keymanager_openpgp_sign_verify(self): km = self._key_manager() # put raw private keys - km._wrapper_map[OpenPGPKey].put_ascii_key(PRIVATE_KEY) + km._wrapper_map[OpenPGPKey].put_ascii_key(PRIVATE_KEY, ADDRESS) # get private key for signing privkey = km.get_key( ADDRESS, OpenPGPKey, private=True, fetch_remote=False) diff --git a/keymanager/src/leap/keymanager/tests/test_validation.py b/keymanager/src/leap/keymanager/tests/test_validation.py index 3ae873d4..400d36e8 100644 --- a/keymanager/src/leap/keymanager/tests/test_validation.py +++ b/keymanager/src/leap/keymanager/tests/test_validation.py @@ -37,47 +37,48 @@ class ValidationLevelTestCase(KeyManagerWithSoledadTestCase): def test_none_old_key(self): km = self._key_manager() - km.put_raw_key(PUBLIC_KEY, OpenPGPKey) + km.put_raw_key(PUBLIC_KEY, OpenPGPKey, ADDRESS) key = km.get_key(ADDRESS, OpenPGPKey, fetch_remote=False) self.assertEqual(key.fingerprint, KEY_FINGERPRINT) def test_cant_upgrade(self): km = self._key_manager() - km.put_raw_key(PUBLIC_KEY, OpenPGPKey, + km.put_raw_key(PUBLIC_KEY, OpenPGPKey, ADDRESS, validation=ValidationLevel.Provider_Trust) self.assertRaises(KeyNotValidUpgrade, km.put_raw_key, UNRELATED_KEY, - OpenPGPKey) + OpenPGPKey, ADDRESS) def test_fingerprint_level(self): km = self._key_manager() - km.put_raw_key(PUBLIC_KEY, OpenPGPKey) - km.put_raw_key(UNRELATED_KEY, OpenPGPKey, + km.put_raw_key(PUBLIC_KEY, OpenPGPKey, ADDRESS) + km.put_raw_key(UNRELATED_KEY, OpenPGPKey, ADDRESS, validation=ValidationLevel.Fingerprint) key = km.get_key(ADDRESS, OpenPGPKey, fetch_remote=False) self.assertEqual(key.fingerprint, UNRELATED_FINGERPRINT) def test_expired_key(self): km = self._key_manager() - km.put_raw_key(EXPIRED_KEY, OpenPGPKey) - km.put_raw_key(UNRELATED_KEY, OpenPGPKey) + km.put_raw_key(EXPIRED_KEY, OpenPGPKey, ADDRESS) + km.put_raw_key(UNRELATED_KEY, OpenPGPKey, ADDRESS) key = km.get_key(ADDRESS, OpenPGPKey, fetch_remote=False) self.assertEqual(key.fingerprint, UNRELATED_FINGERPRINT) def test_expired_fail_lower_level(self): km = self._key_manager() - km.put_raw_key(EXPIRED_KEY, OpenPGPKey, + km.put_raw_key(EXPIRED_KEY, OpenPGPKey, ADDRESS, validation=ValidationLevel.Third_Party_Endorsement) self.assertRaises( KeyNotValidUpgrade, km.put_raw_key, UNRELATED_KEY, OpenPGPKey, + ADDRESS, validation=ValidationLevel.Provider_Trust) def test_roll_back(self): km = self._key_manager() - km.put_raw_key(EXPIRED_KEY_UPDATED, OpenPGPKey) - km.put_raw_key(EXPIRED_KEY, OpenPGPKey) + km.put_raw_key(EXPIRED_KEY_UPDATED, OpenPGPKey, ADDRESS) + km.put_raw_key(EXPIRED_KEY, OpenPGPKey, ADDRESS) key = km.get_key(ADDRESS, OpenPGPKey, fetch_remote=False) self.assertEqual(key.expiry_date, EXPIRED_KEY_NEW_EXPIRY_DATE) diff --git a/keymanager/src/leap/keymanager/validation.py b/keymanager/src/leap/keymanager/validation.py index cf5b4a83..245013e5 100644 --- a/keymanager/src/leap/keymanager/validation.py +++ b/keymanager/src/leap/keymanager/validation.py @@ -67,10 +67,6 @@ def can_upgrade(new_key, old_key): if old_key is None: return True - if new_key.address != old_key.address: - # XXX how do we map multiple IDs? (#6212) - return False - # An update of the same key if new_key.fingerprint == old_key.fingerprint: return True -- cgit v1.2.3 From b40e61f4d7624250325378d51a289a92773e785c Mon Sep 17 00:00:00 2001 From: Ruben Pollan Date: Mon, 10 Nov 2014 19:00:04 -0600 Subject: Fix comments --- keymanager/src/leap/keymanager/__init__.py | 4 ++-- keymanager/src/leap/keymanager/keys.py | 4 ++-- keymanager/src/leap/keymanager/openpgp.py | 4 ++-- 3 files changed, 6 insertions(+), 6 deletions(-) diff --git a/keymanager/src/leap/keymanager/__init__.py b/keymanager/src/leap/keymanager/__init__.py index c64cdeab..e4be9c47 100644 --- a/keymanager/src/leap/keymanager/__init__.py +++ b/keymanager/src/leap/keymanager/__init__.py @@ -493,8 +493,8 @@ class KeyManager(object): verified using this detached signature. :type detached_sig: str - :return: The signed data. - :rtype: str + :return: signature matches + :rtype: bool """ leap_assert_type(pubkey, EncryptionKey) leap_assert(pubkey.__class__ in self._wrapper_map, 'Unknown key type.') diff --git a/keymanager/src/leap/keymanager/keys.py b/keymanager/src/leap/keymanager/keys.py index 2108aa26..7c732e35 100644 --- a/keymanager/src/leap/keymanager/keys.py +++ b/keymanager/src/leap/keymanager/keys.py @@ -411,7 +411,7 @@ class EncryptionScheme(object): verified against this sdetached signature. :type detached_sig: str - :return: The signed data. - :rtype: str + :return: signature matches + :rtype: bool """ pass diff --git a/keymanager/src/leap/keymanager/openpgp.py b/keymanager/src/leap/keymanager/openpgp.py index 4f965745..3f298f71 100644 --- a/keymanager/src/leap/keymanager/openpgp.py +++ b/keymanager/src/leap/keymanager/openpgp.py @@ -742,8 +742,8 @@ class OpenPGPScheme(EncryptionScheme): verified against this detached signature. :type detached_sig: str - :return: The ascii-armored signed data. - :rtype: str + :return: signature matches + :rtype: bool """ leap_assert_type(pubkey, OpenPGPKey) leap_assert(pubkey.private is False) -- cgit v1.2.3