From 88b2416234ed88bc7652c452f779c95f17bf210f Mon Sep 17 00:00:00 2001 From: Ruben Pollan Date: Thu, 6 Nov 2014 00:47:32 -0600 Subject: Implement the new encryption-key soledad document --- keymanager/src/leap/keymanager/__init__.py | 10 ++-- keymanager/src/leap/keymanager/keys.py | 60 +++++++++++++++------- keymanager/src/leap/keymanager/openpgp.py | 22 ++++---- .../src/leap/keymanager/tests/test_keymanager.py | 55 +++++++++++--------- 4 files changed, 89 insertions(+), 58 deletions(-) diff --git a/keymanager/src/leap/keymanager/__init__.py b/keymanager/src/leap/keymanager/__init__.py index 156aaf8..53dd9a7 100644 --- a/keymanager/src/leap/keymanager/__init__.py +++ b/keymanager/src/leap/keymanager/__init__.py @@ -319,7 +319,7 @@ class KeyManager(object): return map( lambda doc: build_key_from_dict( self._key_class_from_type(doc.content['type']), - doc.content['address'], + doc.content['address'][0], doc.content), self._soledad.get_from_index( TAGS_PRIVATE_INDEX, @@ -529,7 +529,7 @@ class KeyManager(object): new one is not a valid update for it """ try: - old_key = self._wrapper_map[type(key)].get_key(key.address, + old_key = self._wrapper_map[type(key)].get_key(key.address[0], private=key.private) except KeyNotFound: old_key = None @@ -564,7 +564,7 @@ class KeyManager(object): new one is not a valid update for it """ pubkey, _ = self._wrapper_map[ktype].parse_ascii_key(key) - if address is not None and address != pubkey.address: + if address is not None and address not in pubkey.address: raise KeyAddressMismatch("Key UID %s, but expected %s" % (pubkey.address, address)) @@ -600,9 +600,9 @@ class KeyManager(object): pubkey, _ = self._wrapper_map[ktype].parse_ascii_key(res.content) if pubkey is None: raise KeyNotFound(uri) - if pubkey.address != address: + if address not in pubkey.address: raise KeyAddressMismatch("UID %s found, but expected %s" - % (pubkey.address, address)) + % (str(pubkey.address), address)) pubkey.validation = validation self.put_key(pubkey) diff --git a/keymanager/src/leap/keymanager/keys.py b/keymanager/src/leap/keymanager/keys.py index a61a8c7..4952b9b 100644 --- a/keymanager/src/leap/keymanager/keys.py +++ b/keymanager/src/leap/keymanager/keys.py @@ -27,6 +27,7 @@ except ImportError: import json # noqa import logging import re +import time from abc import ABCMeta, abstractmethod @@ -50,9 +51,11 @@ KEY_DATA_KEY = 'key_data' KEY_PRIVATE_KEY = 'private' KEY_LENGTH_KEY = 'length' KEY_EXPIRY_DATE_KEY = 'expiry_date' -KEY_FIRST_SEEN_AT_KEY = 'first_seen_at' KEY_LAST_AUDITED_AT_KEY = 'last_audited_at' +KEY_REFRESHED_AT_KEY = 'refreshed_at' KEY_VALIDATION_KEY = 'validation' +KEY_ENCR_USED_KEY = 'encr_used' +KEY_SIGN_USED_KEY = 'sign_used' KEY_TAGS_KEY = 'tags' @@ -110,7 +113,7 @@ def build_key_from_dict(kClass, address, kdict): :rtype: C{kClass} """ leap_assert( - address == kdict[KEY_ADDRESS_KEY], + address in kdict[KEY_ADDRESS_KEY], 'Wrong address in key data.') try: validation = toValidationLevel(kdict[KEY_VALIDATION_KEY]) @@ -119,24 +122,40 @@ def build_key_from_dict(kClass, address, kdict): (kdict[KEY_VALIDATION_KEY], kdict[KEY_ID_KEY])) validation = ValidationLevel.Weak_Chain - expiry_date = None - if kdict[KEY_EXPIRY_DATE_KEY]: - expiry_date = datetime.fromtimestamp(int(kdict[KEY_EXPIRY_DATE_KEY])) + expiry_date = _to_datetime(kdict[KEY_EXPIRY_DATE_KEY]) + last_audited_at = _to_datetime(kdict[KEY_LAST_AUDITED_AT_KEY]) + refreshed_at = _to_datetime(kdict[KEY_REFRESHED_AT_KEY]) return kClass( - address, + [address], key_id=kdict[KEY_ID_KEY], fingerprint=kdict[KEY_FINGERPRINT_KEY], key_data=kdict[KEY_DATA_KEY], private=kdict[KEY_PRIVATE_KEY], length=kdict[KEY_LENGTH_KEY], expiry_date=expiry_date, - first_seen_at=kdict[KEY_FIRST_SEEN_AT_KEY], - last_audited_at=kdict[KEY_LAST_AUDITED_AT_KEY], + last_audited_at=last_audited_at, + refreshed_at=refreshed_at, validation=validation, + encr_used=kdict[KEY_ENCR_USED_KEY], + sign_used=kdict[KEY_SIGN_USED_KEY], ) +def _to_datetime(unix_time): + if unix_time != 0: + return datetime.fromtimestamp(unix_time) + else: + return None + + +def _to_unix_time(date): + if date is not None: + return int(time.mktime(date.timetuple())) + else: + return 0 + + # # Abstraction for encryption keys # @@ -151,9 +170,10 @@ class EncryptionKey(object): __metaclass__ = ABCMeta - def __init__(self, address, key_id=None, fingerprint=None, - key_data=None, private=None, length=None, expiry_date=None, - validation=None, first_seen_at=None, last_audited_at=None): + def __init__(self, address, key_id="", fingerprint="", + key_data="", private=False, length=0, expiry_date=None, + validation=ValidationLevel.Weak_Chain, last_audited_at=None, + refreshed_at=None, encr_used=False, sign_used=False): self.address = address self.key_id = key_id self.fingerprint = fingerprint @@ -162,8 +182,10 @@ class EncryptionKey(object): self.length = length self.expiry_date = expiry_date self.validation = validation - self.first_seen_at = first_seen_at self.last_audited_at = last_audited_at + self.refreshed_at = refreshed_at + self.encr_used = encr_used + self.sign_used = sign_used def get_json(self): """ @@ -172,9 +194,9 @@ class EncryptionKey(object): :return: The JSON string describing this key. :rtype: str """ - expiry_str = "" - if self.expiry_date is not None: - expiry_str = self.expiry_date.strftime("%s") + expiry_date = _to_unix_time(self.expiry_date) + last_audited_at = _to_unix_time(self.last_audited_at) + refreshed_at = _to_unix_time(self.refreshed_at) return json.dumps({ KEY_ADDRESS_KEY: self.address, @@ -184,10 +206,12 @@ class EncryptionKey(object): KEY_DATA_KEY: self.key_data, KEY_PRIVATE_KEY: self.private, KEY_LENGTH_KEY: self.length, - KEY_EXPIRY_DATE_KEY: expiry_str, + KEY_EXPIRY_DATE_KEY: expiry_date, + KEY_LAST_AUDITED_AT_KEY: last_audited_at, + KEY_REFRESHED_AT_KEY: refreshed_at, KEY_VALIDATION_KEY: str(self.validation), - KEY_FIRST_SEEN_AT_KEY: self.first_seen_at, - KEY_LAST_AUDITED_AT_KEY: self.last_audited_at, + KEY_ENCR_USED_KEY: self.encr_used, + KEY_SIGN_USED_KEY: self.sign_used, KEY_TAGS_KEY: [KEYMANAGER_KEY_TAG], }) diff --git a/keymanager/src/leap/keymanager/openpgp.py b/keymanager/src/leap/keymanager/openpgp.py index d3c305e..1160434 100644 --- a/keymanager/src/leap/keymanager/openpgp.py +++ b/keymanager/src/leap/keymanager/openpgp.py @@ -41,7 +41,6 @@ from leap.keymanager.keys import ( KEY_FINGERPRINT_KEY, KEY_DATA_KEY, ) -from leap.keymanager.validation import ValidationLevel logger = logging.getLogger(__name__) @@ -109,9 +108,9 @@ class TempGPGWrapper(object): # itself is enough to also have the public key in the keyring, # and we want to count the keys afterwards. - privaddrs = map(lambda privkey: privkey.address, privkeys) + privaddrs = map(lambda privkey: privkey.address[0], privkeys) publkeys = filter( - lambda pubkey: pubkey.address not in privaddrs, publkeys) + lambda pubkey: pubkey.address[0] not in privaddrs, publkeys) listkeys = lambda: self._gpg.list_keys() listsecretkeys = lambda: self._gpg.list_keys(secret=True) @@ -184,14 +183,14 @@ def _build_key_from_gpg(address, key, key_data): expiry_date = datetime.fromtimestamp(int(key['expires'])) return OpenPGPKey( - address, + [address], key_id=key['keyid'], fingerprint=key['fingerprint'], key_data=key_data, private=True if key['type'] == 'sec' else False, - length=key['length'], + length=int(key['length']), expiry_date=expiry_date, - validation=ValidationLevel.Weak_Chain, + refreshed_at=datetime.now(), ) @@ -397,7 +396,7 @@ class OpenPGPScheme(EncryptionScheme): :param key: The key to be stored. :type key: OpenPGPKey """ - doc = self._get_key_doc(key.address, private=key.private) + doc = self._get_key_doc(key.address[0], private=key.private) if doc is None: self._soledad.create_doc_from_json(key.get_json()) else: @@ -408,7 +407,7 @@ class OpenPGPScheme(EncryptionScheme): gpg.import_keys(key.key_data) gpgkey = gpg.list_keys(secret=key.private).pop() key = _build_key_from_gpg( - key.address, gpgkey, + key.address[0], gpgkey, gpg.export_keys(gpgkey['fingerprint'], secret=key.private)) doc.set_json(key.get_json()) @@ -452,12 +451,11 @@ class OpenPGPScheme(EncryptionScheme): :type key: EncryptionKey """ leap_assert_type(key, OpenPGPKey) - stored_key = self.get_key(key.address, private=key.private) - if stored_key is None: + doc = self._get_key_doc(key.address[0], key.private) + if doc is None: raise errors.KeyNotFound(key) - if stored_key.__dict__ != key.__dict__: + if doc.content[KEY_FINGERPRINT_KEY] != key.fingerprint: raise errors.KeyAttributesDiffer(key) - doc = self._get_key_doc(key.address, key.private) self._soledad.delete_doc(doc) # diff --git a/keymanager/src/leap/keymanager/tests/test_keymanager.py b/keymanager/src/leap/keymanager/tests/test_keymanager.py index 6a877bc..4daf346 100644 --- a/keymanager/src/leap/keymanager/tests/test_keymanager.py +++ b/keymanager/src/leap/keymanager/tests/test_keymanager.py @@ -21,6 +21,7 @@ Tests for the Key Manager. """ +from datetime import datetime from mock import Mock from leap.common.testing.basetest import BaseLeapTest from leap.keymanager import ( @@ -75,16 +76,18 @@ class KeyManagerUtilTestCase(BaseLeapTest): def test_build_key_from_dict(self): kdict = { - 'address': ADDRESS, - 'key_id': 'key_id', - 'fingerprint': 'fingerprint', - 'key_data': 'key_data', - 'private': 'private', - 'length': 'length', - 'expiry_date': '', - 'first_seen_at': 'first_seen_at', - 'last_audited_at': 'last_audited_at', + 'address': [ADDRESS], + 'key_id': KEY_FINGERPRINT[-16:], + 'fingerprint': KEY_FINGERPRINT, + 'key_data': PUBLIC_KEY, + 'private': False, + 'length': 4096, + 'expiry_date': 0, + 'last_audited_at': 0, + 'refreshed_at': 1311239602, 'validation': str(ValidationLevel.Weak_Chain), + 'encr_used': False, + 'sign_used': True, } key = build_key_from_dict(OpenPGPKey, ADDRESS, kdict) self.assertEqual( @@ -109,14 +112,20 @@ class KeyManagerUtilTestCase(BaseLeapTest): None, key.expiry_date, 'Wrong data in key.') self.assertEqual( - kdict['first_seen_at'], key.first_seen_at, + None, key.last_audited_at, 'Wrong data in key.') self.assertEqual( - kdict['last_audited_at'], key.last_audited_at, + datetime.fromtimestamp(kdict['refreshed_at']), key.refreshed_at, 'Wrong data in key.') self.assertEqual( toValidationLevel(kdict['validation']), key.validation, 'Wrong data in key.') + self.assertEqual( + kdict['encr_used'], key.encr_used, + 'Wrong data in key.') + self.assertEqual( + kdict['sign_used'], key.sign_used, + 'Wrong data in key.') class OpenPGPCryptoTestCase(KeyManagerWithSoledadTestCase): @@ -127,9 +136,9 @@ class OpenPGPCryptoTestCase(KeyManagerWithSoledadTestCase): key = pgp.gen_key('user@leap.se') self.assertIsInstance(key, openpgp.OpenPGPKey) self.assertEqual( - 'user@leap.se', key.address, 'Wrong address bound to key.') + ['user@leap.se'], key.address, 'Wrong address bound to key.') self.assertEqual( - '4096', key.length, 'Wrong key length.') + 4096, key.length, 'Wrong key length.') def test_openpgp_put_delete_key(self): pgp = openpgp.OpenPGPScheme( @@ -147,10 +156,10 @@ class OpenPGPCryptoTestCase(KeyManagerWithSoledadTestCase): pgp.put_ascii_key(PUBLIC_KEY) key = pgp.get_key(ADDRESS, private=False) self.assertIsInstance(key, openpgp.OpenPGPKey) + self.assertTrue( + ADDRESS in key.address, 'Wrong address bound to key.') self.assertEqual( - ADDRESS, key.address, 'Wrong address bound to key.') - self.assertEqual( - '4096', key.length, 'Wrong key length.') + 4096, key.length, 'Wrong key length.') pgp.delete_key(key) self.assertRaises(KeyNotFound, pgp.get_key, ADDRESS) @@ -162,7 +171,7 @@ class OpenPGPCryptoTestCase(KeyManagerWithSoledadTestCase): self.assertRaises( KeyNotFound, pgp.get_key, ADDRESS, private=True) key = pgp.get_key(ADDRESS, private=False) - self.assertEqual(ADDRESS, key.address) + self.assertTrue(ADDRESS in key.address) self.assertFalse(key.private) self.assertEqual(KEY_FINGERPRINT, key.fingerprint) pgp.delete_key(key) @@ -311,12 +320,12 @@ class KeyManagerKeyManagementTestCase(KeyManagerWithSoledadTestCase): # get public keys keys = km.get_all_keys(False) self.assertEqual(len(keys), 1, 'Wrong number of keys') - self.assertEqual(ADDRESS, keys[0].address) + self.assertTrue(ADDRESS in keys[0].address) self.assertFalse(keys[0].private) # get private keys keys = km.get_all_keys(True) self.assertEqual(len(keys), 1, 'Wrong number of keys') - self.assertEqual(ADDRESS, keys[0].address) + self.assertTrue(ADDRESS in keys[0].address) self.assertTrue(keys[0].private) def test_get_public_key(self): @@ -326,7 +335,7 @@ class KeyManagerKeyManagementTestCase(KeyManagerWithSoledadTestCase): key = km.get_key(ADDRESS, OpenPGPKey, private=False, fetch_remote=False) self.assertTrue(key is not None) - self.assertEqual(key.address, ADDRESS) + self.assertTrue(ADDRESS in key.address) self.assertEqual( key.fingerprint.lower(), KEY_FINGERPRINT.lower()) self.assertFalse(key.private) @@ -338,7 +347,7 @@ class KeyManagerKeyManagementTestCase(KeyManagerWithSoledadTestCase): key = km.get_key(ADDRESS, OpenPGPKey, private=True, fetch_remote=False) self.assertTrue(key is not None) - self.assertEqual(key.address, ADDRESS) + self.assertTrue(ADDRESS in key.address) self.assertEqual( key.fingerprint.lower(), KEY_FINGERPRINT.lower()) self.assertTrue(key.private) @@ -430,7 +439,7 @@ class KeyManagerKeyManagementTestCase(KeyManagerWithSoledadTestCase): # try to get key fetching from server. key = km.get_key(ADDRESS, OpenPGPKey) self.assertIsInstance(key, OpenPGPKey) - self.assertEqual(ADDRESS, key.address) + self.assertTrue(ADDRESS in key.address) def test_put_key_ascii(self): """ @@ -441,7 +450,7 @@ class KeyManagerKeyManagementTestCase(KeyManagerWithSoledadTestCase): km.put_raw_key(PUBLIC_KEY, OpenPGPKey) key = km.get_key(ADDRESS, OpenPGPKey) self.assertIsInstance(key, OpenPGPKey) - self.assertEqual(ADDRESS, key.address) + self.assertTrue(ADDRESS in key.address) def test_fetch_uri_ascii_key(self): """ -- cgit v1.2.3