Implement the new encryption-key soledad document
author Ruben Pollan <meskio@sindominio.net>
Thu, 6 Nov 2014 06:47:32 +0000 (00:47 -0600)
committer Ruben Pollan <meskio@sindominio.net>
Mon, 10 Nov 2014 19:41:28 +0000 (13:41 -0600)
src/leap/keymanager/__init__.py
src/leap/keymanager/keys.py
src/leap/keymanager/openpgp.py
src/leap/keymanager/tests/test_keymanager.py

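diff --git a/src/leap/keymanager/__init__.py b/src/leap/keymanager/__init__.py
index 156aaf8..53dd9a7 100644
--- a/src/leap/keymanager/__init__.py
+++ b/src/leap/keymanager/__init__.py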
@@ -319,7 +319,7 @@ class KeyManager(object):
         return map(
             lambda doc: build_key_from_dict(
                 self._key_class_from_type(doc.content['type']),
-                doc.content['address'],
+                doc.content['address'][0],
                 doc.content),
             self._soledad.get_from_index(
                 TAGS_PRIVATE_INDEX,
@@ -529,7 +529,7 @@ class KeyManager(object):
                                    new one is not a valid update for it
         """
         try:
-            old_key = self._wrapper_map[type(key)].get_key(key.address,
+            old_key = self._wrapper_map[type(key)].get_key(key.address[0],
                                                            private=key.private)
         except KeyNotFound:
             old_key = None
@@ -564,7 +564,7 @@ class KeyManager(object):
                                    new one is not a valid update for it
         """
         pubkey, _ = self._wrapper_map[ktype].parse_ascii_key(key)
-        if address is not None and address != pubkey.address:
+        if address is not None and address not in pubkey.address:
             raise KeyAddressMismatch("Key UID %s, but expected %s"
                                      % (pubkey.address, address))
 
@@ -600,9 +600,9 @@ class KeyManager(object):
         pubkey, _ = self._wrapper_map[ktype].parse_ascii_key(res.content)
         if pubkey is None:
             raise KeyNotFound(uri)
-        if pubkey.address != address:
+        if address not in pubkey.address:
             raise KeyAddressMismatch("UID %s found, but expected %s"
-                                     % (pubkey.address, address))
+                                     % (str(pubkey.address), address))
 
         pubkey.validation = validation
         self.put_key(pubkey)
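diff --git a/src/leap/keymanager/keys.py b/src/leap/keymanager/keys.py
index a61a8c7..4952b9b 100644
--- a/src/leap/keymanager/keys.py
+++ b/src/leap/keymanager/keys.py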
@@ -27,6 +27,7 @@ except ImportError:
     import json  # noqa
 import logging
 import re
+import time
 
 
 from abc import ABCMeta, abstractmethod
@@ -50,9 +51,11 @@ KEY_DATA_KEY = 'key_data'
 KEY_PRIVATE_KEY = 'private'
 KEY_LENGTH_KEY = 'length'
 KEY_EXPIRY_DATE_KEY = 'expiry_date'
-KEY_FIRST_SEEN_AT_KEY = 'first_seen_at'
 KEY_LAST_AUDITED_AT_KEY = 'last_audited_at'
+KEY_REFRESHED_AT_KEY = 'refreshed_at'
 KEY_VALIDATION_KEY = 'validation'
+KEY_ENCR_USED_KEY = 'encr_used'
+KEY_SIGN_USED_KEY = 'sign_used'
 KEY_TAGS_KEY = 'tags'
 
 
@@ -110,7 +113,7 @@ def build_key_from_dict(kClass, address, kdict):
     :rtype: C{kClass}
     """
     leap_assert(
-        address == kdict[KEY_ADDRESS_KEY],
+        address in kdict[KEY_ADDRESS_KEY],
         'Wrong address in key data.')
     try:
         validation = toValidationLevel(kdict[KEY_VALIDATION_KEY])
@@ -119,24 +122,40 @@ def build_key_from_dict(kClass, address, kdict):
                      (kdict[KEY_VALIDATION_KEY], kdict[KEY_ID_KEY]))
         validation = ValidationLevel.Weak_Chain
 
-    expiry_date = None
-    if kdict[KEY_EXPIRY_DATE_KEY]:
-        expiry_date = datetime.fromtimestamp(int(kdict[KEY_EXPIRY_DATE_KEY]))
+    expiry_date = _to_datetime(kdict[KEY_EXPIRY_DATE_KEY])
+    last_audited_at = _to_datetime(kdict[KEY_LAST_AUDITED_AT_KEY])
+    refreshed_at = _to_datetime(kdict[KEY_REFRESHED_AT_KEY])
 
     return kClass(
-        address,
+        [address],
         key_id=kdict[KEY_ID_KEY],
         fingerprint=kdict[KEY_FINGERPRINT_KEY],
         key_data=kdict[KEY_DATA_KEY],
         private=kdict[KEY_PRIVATE_KEY],
         length=kdict[KEY_LENGTH_KEY],
         expiry_date=expiry_date,
-        first_seen_at=kdict[KEY_FIRST_SEEN_AT_KEY],
-        last_audited_at=kdict[KEY_LAST_AUDITED_AT_KEY],
+        last_audited_at=last_audited_at,
+        refreshed_at=refreshed_at,
         validation=validation,
+        encr_used=kdict[KEY_ENCR_USED_KEY],
+        sign_used=kdict[KEY_SIGN_USED_KEY],
     )
 
 
+def _to_datetime(unix_time):
+    if unix_time != 0:
+        return datetime.fromtimestamp(unix_time)
+    else:
+        return None
+
+
+def _to_unix_time(date):
+    if date is not None:
+        return int(time.mktime(date.timetuple()))
+    else:
+        return 0
+
+
 #
 # Abstraction for encryption keys
 #
@@ -151,9 +170,10 @@ class EncryptionKey(object):
 
     __metaclass__ = ABCMeta
 
-    def __init__(self, address, key_id=None, fingerprint=None,
-                 key_data=None, private=None, length=None, expiry_date=None,
-                 validation=None, first_seen_at=None, last_audited_at=None):
+    def __init__(self, address, key_id="", fingerprint="",
+                 key_data="", private=False, length=0, expiry_date=None,
+                 validation=ValidationLevel.Weak_Chain, last_audited_at=None,
+                 refreshed_at=None, encr_used=False, sign_used=False):
         self.address = address
         self.key_id = key_id
         self.fingerprint = fingerprint
@@ -162,8 +182,10 @@ class EncryptionKey(object):
         self.length = length
         self.expiry_date = expiry_date
         self.validation = validation
-        self.first_seen_at = first_seen_at
         self.last_audited_at = last_audited_at
+        self.refreshed_at = refreshed_at
+        self.encr_used = encr_used
+        self.sign_used = sign_used
 
     def get_json(self):
         """
@@ -172,9 +194,9 @@ class EncryptionKey(object):
         :return: The JSON string describing this key.
         :rtype: str
         """
-        expiry_str = ""
-        if self.expiry_date is not None:
-            expiry_str = self.expiry_date.strftime("%s")
+        expiry_date = _to_unix_time(self.expiry_date)
+        last_audited_at = _to_unix_time(self.last_audited_at)
+        refreshed_at = _to_unix_time(self.refreshed_at)
 
         return json.dumps({
             KEY_ADDRESS_KEY: self.address,
@@ -184,10 +206,12 @@ class EncryptionKey(object):
             KEY_DATA_KEY: self.key_data,
             KEY_PRIVATE_KEY: self.private,
             KEY_LENGTH_KEY: self.length,
-            KEY_EXPIRY_DATE_KEY: expiry_str,
+            KEY_EXPIRY_DATE_KEY: expiry_date,
+            KEY_LAST_AUDITED_AT_KEY: last_audited_at,
+            KEY_REFRESHED_AT_KEY: refreshed_at,
             KEY_VALIDATION_KEY: str(self.validation),
-            KEY_FIRST_SEEN_AT_KEY: self.first_seen_at,
-            KEY_LAST_AUDITED_AT_KEY: self.last_audited_at,
+            KEY_ENCR_USED_KEY: self.encr_used,
+            KEY_SIGN_USED_KEY: self.sign_used,
             KEY_TAGS_KEY: [KEYMANAGER_KEY_TAG],
         })
 
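diff --git a/src/leap/keymanager/openpgp.py b/src/leap/keymanager/openpgp.py
index d3c305e..1160434 100644
--- a/src/leap/keymanager/openpgp.py
+++ b/src/leap/keymanager/openpgp.py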
@@ -41,7 +41,6 @@ from leap.keymanager.keys import (
     KEY_FINGERPRINT_KEY,
     KEY_DATA_KEY,
 )
-from leap.keymanager.validation import ValidationLevel
 
 
 logger = logging.getLogger(__name__)
@@ -109,9 +108,9 @@ class TempGPGWrapper(object):
         # itself is enough to also have the public key in the keyring,
         # and we want to count the keys afterwards.
 
-        privaddrs = map(lambda privkey: privkey.address, privkeys)
+        privaddrs = map(lambda privkey: privkey.address[0], privkeys)
         publkeys = filter(
-            lambda pubkey: pubkey.address not in privaddrs, publkeys)
+            lambda pubkey: pubkey.address[0] not in privaddrs, publkeys)
 
         listkeys = lambda: self._gpg.list_keys()
         listsecretkeys = lambda: self._gpg.list_keys(secret=True)
@@ -184,14 +183,14 @@ def _build_key_from_gpg(address, key, key_data):
         expiry_date = datetime.fromtimestamp(int(key['expires']))
 
     return OpenPGPKey(
-        address,
+        [address],
         key_id=key['keyid'],
         fingerprint=key['fingerprint'],
         key_data=key_data,
         private=True if key['type'] == 'sec' else False,
-        length=key['length'],
+        length=int(key['length']),
         expiry_date=expiry_date,
-        validation=ValidationLevel.Weak_Chain,
+        refreshed_at=datetime.now(),
     )
 
 
@@ -397,7 +396,7 @@ class OpenPGPScheme(EncryptionScheme):
         :param key: The key to be stored.
         :type key: OpenPGPKey
         """
-        doc = self._get_key_doc(key.address, private=key.private)
+        doc = self._get_key_doc(key.address[0], private=key.private)
         if doc is None:
             self._soledad.create_doc_from_json(key.get_json())
         else:
@@ -408,7 +407,7 @@ class OpenPGPScheme(EncryptionScheme):
                     gpg.import_keys(key.key_data)
                     gpgkey = gpg.list_keys(secret=key.private).pop()
                     key = _build_key_from_gpg(
-                        key.address, gpgkey,
+                        key.address[0], gpgkey,
                         gpg.export_keys(gpgkey['fingerprint'],
                                         secret=key.private))
             doc.set_json(key.get_json())
@@ -452,12 +451,11 @@ class OpenPGPScheme(EncryptionScheme):
         :type key: EncryptionKey
         """
         leap_assert_type(key, OpenPGPKey)
-        stored_key = self.get_key(key.address, private=key.private)
-        if stored_key is None:
+        doc = self._get_key_doc(key.address[0], key.private)
+        if doc is None:
             raise errors.KeyNotFound(key)
-        if stored_key.__dict__ != key.__dict__:
+        if doc.content[KEY_FINGERPRINT_KEY] != key.fingerprint:
             raise errors.KeyAttributesDiffer(key)
-        doc = self._get_key_doc(key.address, key.private)
         self._soledad.delete_doc(doc)
 
     #
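diff --git a/src/leap/keymanager/tests/test_keymanager.py b/src/leap/keymanager/tests/test_keymanager.py
index 6a877bc..4daf346 100644
--- a/src/leap/keymanager/tests/test_keymanager.py
+++ b/src/leap/keymanager/tests/test_keymanager.py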
@@ -21,6 +21,7 @@ Tests for the Key Manager.
 """
 
 
+from datetime import datetime
 from mock import Mock
 from leap.common.testing.basetest import BaseLeapTest
 from leap.keymanager import (
@@ -75,16 +76,18 @@ class KeyManagerUtilTestCase(BaseLeapTest):
 
     def test_build_key_from_dict(self):
         kdict = {
-            'address': ADDRESS,
-            'key_id': 'key_id',
-            'fingerprint': 'fingerprint',
-            'key_data': 'key_data',
-            'private': 'private',
-            'length': 'length',
-            'expiry_date': '',
-            'first_seen_at': 'first_seen_at',
-            'last_audited_at': 'last_audited_at',
+            'address': [ADDRESS],
+            'key_id': KEY_FINGERPRINT[-16:],
+            'fingerprint': KEY_FINGERPRINT,
+            'key_data': PUBLIC_KEY,
+            'private': False,
+            'length': 4096,
+            'expiry_date': 0,
+            'last_audited_at': 0,
+            'refreshed_at': 1311239602,
             'validation': str(ValidationLevel.Weak_Chain),
+            'encr_used': False,
+            'sign_used': True,
         }
         key = build_key_from_dict(OpenPGPKey, ADDRESS, kdict)
         self.assertEqual(
@@ -109,14 +112,20 @@ class KeyManagerUtilTestCase(BaseLeapTest):
             None, key.expiry_date,
             'Wrong data in key.')
         self.assertEqual(
-            kdict['first_seen_at'], key.first_seen_at,
+            None, key.last_audited_at,
             'Wrong data in key.')
         self.assertEqual(
-            kdict['last_audited_at'], key.last_audited_at,
+            datetime.fromtimestamp(kdict['refreshed_at']), key.refreshed_at,
             'Wrong data in key.')
         self.assertEqual(
             toValidationLevel(kdict['validation']), key.validation,
             'Wrong data in key.')
+        self.assertEqual(
+            kdict['encr_used'], key.encr_used,
+            'Wrong data in key.')
+        self.assertEqual(
+            kdict['sign_used'], key.sign_used,
+            'Wrong data in key.')
 
 
 class OpenPGPCryptoTestCase(KeyManagerWithSoledadTestCase):
@@ -127,9 +136,9 @@ class OpenPGPCryptoTestCase(KeyManagerWithSoledadTestCase):
         key = pgp.gen_key('user@leap.se')
         self.assertIsInstance(key, openpgp.OpenPGPKey)
         self.assertEqual(
-            'user@leap.se', key.address, 'Wrong address bound to key.')
+            ['user@leap.se'], key.address, 'Wrong address bound to key.')
         self.assertEqual(
-            '4096', key.length, 'Wrong key length.')
+            4096, key.length, 'Wrong key length.')
 
     def test_openpgp_put_delete_key(self):
         pgp = openpgp.OpenPGPScheme(
@@ -147,10 +156,10 @@ class OpenPGPCryptoTestCase(KeyManagerWithSoledadTestCase):
         pgp.put_ascii_key(PUBLIC_KEY)
         key = pgp.get_key(ADDRESS, private=False)
         self.assertIsInstance(key, openpgp.OpenPGPKey)
+        self.assertTrue(
+            ADDRESS in key.address, 'Wrong address bound to key.')
         self.assertEqual(
-            ADDRESS, key.address, 'Wrong address bound to key.')
-        self.assertEqual(
-            '4096', key.length, 'Wrong key length.')
+            4096, key.length, 'Wrong key length.')
         pgp.delete_key(key)
         self.assertRaises(KeyNotFound, pgp.get_key, ADDRESS)
 
@@ -162,7 +171,7 @@ class OpenPGPCryptoTestCase(KeyManagerWithSoledadTestCase):
         self.assertRaises(
             KeyNotFound, pgp.get_key, ADDRESS, private=True)
         key = pgp.get_key(ADDRESS, private=False)
-        self.assertEqual(ADDRESS, key.address)
+        self.assertTrue(ADDRESS in key.address)
         self.assertFalse(key.private)
         self.assertEqual(KEY_FINGERPRINT, key.fingerprint)
         pgp.delete_key(key)
@@ -311,12 +320,12 @@ class KeyManagerKeyManagementTestCase(KeyManagerWithSoledadTestCase):
         # get public keys
         keys = km.get_all_keys(False)
         self.assertEqual(len(keys), 1, 'Wrong number of keys')
-        self.assertEqual(ADDRESS, keys[0].address)
+        self.assertTrue(ADDRESS in keys[0].address)
         self.assertFalse(keys[0].private)
         # get private keys
         keys = km.get_all_keys(True)
         self.assertEqual(len(keys), 1, 'Wrong number of keys')
-        self.assertEqual(ADDRESS, keys[0].address)
+        self.assertTrue(ADDRESS in keys[0].address)
         self.assertTrue(keys[0].private)
 
     def test_get_public_key(self):
@@ -326,7 +335,7 @@ class KeyManagerKeyManagementTestCase(KeyManagerWithSoledadTestCase):
         key = km.get_key(ADDRESS, OpenPGPKey, private=False,
                          fetch_remote=False)
         self.assertTrue(key is not None)
-        self.assertEqual(key.address, ADDRESS)
+        self.assertTrue(ADDRESS in key.address)
         self.assertEqual(
             key.fingerprint.lower(), KEY_FINGERPRINT.lower())
         self.assertFalse(key.private)
@@ -338,7 +347,7 @@ class KeyManagerKeyManagementTestCase(KeyManagerWithSoledadTestCase):
         key = km.get_key(ADDRESS, OpenPGPKey, private=True,
                          fetch_remote=False)
         self.assertTrue(key is not None)
-        self.assertEqual(key.address, ADDRESS)
+        self.assertTrue(ADDRESS in key.address)
         self.assertEqual(
             key.fingerprint.lower(), KEY_FINGERPRINT.lower())
         self.assertTrue(key.private)
@@ -430,7 +439,7 @@ class KeyManagerKeyManagementTestCase(KeyManagerWithSoledadTestCase):
         # try to get key fetching from server.
         key = km.get_key(ADDRESS, OpenPGPKey)
         self.assertIsInstance(key, OpenPGPKey)
-        self.assertEqual(ADDRESS, key.address)
+        self.assertTrue(ADDRESS in key.address)
 
     def test_put_key_ascii(self):
         """
@@ -441,7 +450,7 @@ class KeyManagerKeyManagementTestCase(KeyManagerWithSoledadTestCase):
         km.put_raw_key(PUBLIC_KEY, OpenPGPKey)
         key = km.get_key(ADDRESS, OpenPGPKey)
         self.assertIsInstance(key, OpenPGPKey)
-        self.assertEqual(ADDRESS, key.address)
+        self.assertTrue(ADDRESS in key.address)
 
     def test_fetch_uri_ascii_key(self):
         """