summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorRuben Pollan <meskio@sindominio.net>2014-11-06 00:47:32 -0600
committerRuben Pollan <meskio@sindominio.net>2014-11-10 13:41:28 -0600
commit88b2416234ed88bc7652c452f779c95f17bf210f (patch)
treeed0a46c1b89c67516504539ddb7b773908a31a0d
parent27d2c307b31dd72705323e835d879c7654d5cac1 (diff)
Implement the new encryption-key soledad document
-rw-r--r--keymanager/src/leap/keymanager/__init__.py10
-rw-r--r--keymanager/src/leap/keymanager/keys.py60
-rw-r--r--keymanager/src/leap/keymanager/openpgp.py22
-rw-r--r--keymanager/src/leap/keymanager/tests/test_keymanager.py55
4 files changed, 89 insertions, 58 deletions
diff --git a/keymanager/src/leap/keymanager/__init__.py b/keymanager/src/leap/keymanager/__init__.py
index 156aaf86..53dd9a78 100644
--- a/keymanager/src/leap/keymanager/__init__.py
+++ b/keymanager/src/leap/keymanager/__init__.py
@@ -319,7 +319,7 @@ class KeyManager(object):
return map(
lambda doc: build_key_from_dict(
self._key_class_from_type(doc.content['type']),
- doc.content['address'],
+ doc.content['address'][0],
doc.content),
self._soledad.get_from_index(
TAGS_PRIVATE_INDEX,
@@ -529,7 +529,7 @@ class KeyManager(object):
new one is not a valid update for it
"""
try:
- old_key = self._wrapper_map[type(key)].get_key(key.address,
+ old_key = self._wrapper_map[type(key)].get_key(key.address[0],
private=key.private)
except KeyNotFound:
old_key = None
@@ -564,7 +564,7 @@ class KeyManager(object):
new one is not a valid update for it
"""
pubkey, _ = self._wrapper_map[ktype].parse_ascii_key(key)
- if address is not None and address != pubkey.address:
+ if address is not None and address not in pubkey.address:
raise KeyAddressMismatch("Key UID %s, but expected %s"
% (pubkey.address, address))
@@ -600,9 +600,9 @@ class KeyManager(object):
pubkey, _ = self._wrapper_map[ktype].parse_ascii_key(res.content)
if pubkey is None:
raise KeyNotFound(uri)
- if pubkey.address != address:
+ if address not in pubkey.address:
raise KeyAddressMismatch("UID %s found, but expected %s"
- % (pubkey.address, address))
+ % (str(pubkey.address), address))
pubkey.validation = validation
self.put_key(pubkey)
diff --git a/keymanager/src/leap/keymanager/keys.py b/keymanager/src/leap/keymanager/keys.py
index a61a8c79..4952b9b3 100644
--- a/keymanager/src/leap/keymanager/keys.py
+++ b/keymanager/src/leap/keymanager/keys.py
@@ -27,6 +27,7 @@ except ImportError:
import json # noqa
import logging
import re
+import time
from abc import ABCMeta, abstractmethod
@@ -50,9 +51,11 @@ KEY_DATA_KEY = 'key_data'
KEY_PRIVATE_KEY = 'private'
KEY_LENGTH_KEY = 'length'
KEY_EXPIRY_DATE_KEY = 'expiry_date'
-KEY_FIRST_SEEN_AT_KEY = 'first_seen_at'
KEY_LAST_AUDITED_AT_KEY = 'last_audited_at'
+KEY_REFRESHED_AT_KEY = 'refreshed_at'
KEY_VALIDATION_KEY = 'validation'
+KEY_ENCR_USED_KEY = 'encr_used'
+KEY_SIGN_USED_KEY = 'sign_used'
KEY_TAGS_KEY = 'tags'
@@ -110,7 +113,7 @@ def build_key_from_dict(kClass, address, kdict):
:rtype: C{kClass}
"""
leap_assert(
- address == kdict[KEY_ADDRESS_KEY],
+ address in kdict[KEY_ADDRESS_KEY],
'Wrong address in key data.')
try:
validation = toValidationLevel(kdict[KEY_VALIDATION_KEY])
@@ -119,24 +122,40 @@ def build_key_from_dict(kClass, address, kdict):
(kdict[KEY_VALIDATION_KEY], kdict[KEY_ID_KEY]))
validation = ValidationLevel.Weak_Chain
- expiry_date = None
- if kdict[KEY_EXPIRY_DATE_KEY]:
- expiry_date = datetime.fromtimestamp(int(kdict[KEY_EXPIRY_DATE_KEY]))
+ expiry_date = _to_datetime(kdict[KEY_EXPIRY_DATE_KEY])
+ last_audited_at = _to_datetime(kdict[KEY_LAST_AUDITED_AT_KEY])
+ refreshed_at = _to_datetime(kdict[KEY_REFRESHED_AT_KEY])
return kClass(
- address,
+ [address],
key_id=kdict[KEY_ID_KEY],
fingerprint=kdict[KEY_FINGERPRINT_KEY],
key_data=kdict[KEY_DATA_KEY],
private=kdict[KEY_PRIVATE_KEY],
length=kdict[KEY_LENGTH_KEY],
expiry_date=expiry_date,
- first_seen_at=kdict[KEY_FIRST_SEEN_AT_KEY],
- last_audited_at=kdict[KEY_LAST_AUDITED_AT_KEY],
+ last_audited_at=last_audited_at,
+ refreshed_at=refreshed_at,
validation=validation,
+ encr_used=kdict[KEY_ENCR_USED_KEY],
+ sign_used=kdict[KEY_SIGN_USED_KEY],
)
+def _to_datetime(unix_time):
+ if unix_time != 0:
+ return datetime.fromtimestamp(unix_time)
+ else:
+ return None
+
+
+def _to_unix_time(date):
+ if date is not None:
+ return int(time.mktime(date.timetuple()))
+ else:
+ return 0
+
+
#
# Abstraction for encryption keys
#
@@ -151,9 +170,10 @@ class EncryptionKey(object):
__metaclass__ = ABCMeta
- def __init__(self, address, key_id=None, fingerprint=None,
- key_data=None, private=None, length=None, expiry_date=None,
- validation=None, first_seen_at=None, last_audited_at=None):
+ def __init__(self, address, key_id="", fingerprint="",
+ key_data="", private=False, length=0, expiry_date=None,
+ validation=ValidationLevel.Weak_Chain, last_audited_at=None,
+ refreshed_at=None, encr_used=False, sign_used=False):
self.address = address
self.key_id = key_id
self.fingerprint = fingerprint
@@ -162,8 +182,10 @@ class EncryptionKey(object):
self.length = length
self.expiry_date = expiry_date
self.validation = validation
- self.first_seen_at = first_seen_at
self.last_audited_at = last_audited_at
+ self.refreshed_at = refreshed_at
+ self.encr_used = encr_used
+ self.sign_used = sign_used
def get_json(self):
"""
@@ -172,9 +194,9 @@ class EncryptionKey(object):
:return: The JSON string describing this key.
:rtype: str
"""
- expiry_str = ""
- if self.expiry_date is not None:
- expiry_str = self.expiry_date.strftime("%s")
+ expiry_date = _to_unix_time(self.expiry_date)
+ last_audited_at = _to_unix_time(self.last_audited_at)
+ refreshed_at = _to_unix_time(self.refreshed_at)
return json.dumps({
KEY_ADDRESS_KEY: self.address,
@@ -184,10 +206,12 @@ class EncryptionKey(object):
KEY_DATA_KEY: self.key_data,
KEY_PRIVATE_KEY: self.private,
KEY_LENGTH_KEY: self.length,
- KEY_EXPIRY_DATE_KEY: expiry_str,
+ KEY_EXPIRY_DATE_KEY: expiry_date,
+ KEY_LAST_AUDITED_AT_KEY: last_audited_at,
+ KEY_REFRESHED_AT_KEY: refreshed_at,
KEY_VALIDATION_KEY: str(self.validation),
- KEY_FIRST_SEEN_AT_KEY: self.first_seen_at,
- KEY_LAST_AUDITED_AT_KEY: self.last_audited_at,
+ KEY_ENCR_USED_KEY: self.encr_used,
+ KEY_SIGN_USED_KEY: self.sign_used,
KEY_TAGS_KEY: [KEYMANAGER_KEY_TAG],
})
diff --git a/keymanager/src/leap/keymanager/openpgp.py b/keymanager/src/leap/keymanager/openpgp.py
index d3c305e2..1160434c 100644
--- a/keymanager/src/leap/keymanager/openpgp.py
+++ b/keymanager/src/leap/keymanager/openpgp.py
@@ -41,7 +41,6 @@ from leap.keymanager.keys import (
KEY_FINGERPRINT_KEY,
KEY_DATA_KEY,
)
-from leap.keymanager.validation import ValidationLevel
logger = logging.getLogger(__name__)
@@ -109,9 +108,9 @@ class TempGPGWrapper(object):
# itself is enough to also have the public key in the keyring,
# and we want to count the keys afterwards.
- privaddrs = map(lambda privkey: privkey.address, privkeys)
+ privaddrs = map(lambda privkey: privkey.address[0], privkeys)
publkeys = filter(
- lambda pubkey: pubkey.address not in privaddrs, publkeys)
+ lambda pubkey: pubkey.address[0] not in privaddrs, publkeys)
listkeys = lambda: self._gpg.list_keys()
listsecretkeys = lambda: self._gpg.list_keys(secret=True)
@@ -184,14 +183,14 @@ def _build_key_from_gpg(address, key, key_data):
expiry_date = datetime.fromtimestamp(int(key['expires']))
return OpenPGPKey(
- address,
+ [address],
key_id=key['keyid'],
fingerprint=key['fingerprint'],
key_data=key_data,
private=True if key['type'] == 'sec' else False,
- length=key['length'],
+ length=int(key['length']),
expiry_date=expiry_date,
- validation=ValidationLevel.Weak_Chain,
+ refreshed_at=datetime.now(),
)
@@ -397,7 +396,7 @@ class OpenPGPScheme(EncryptionScheme):
:param key: The key to be stored.
:type key: OpenPGPKey
"""
- doc = self._get_key_doc(key.address, private=key.private)
+ doc = self._get_key_doc(key.address[0], private=key.private)
if doc is None:
self._soledad.create_doc_from_json(key.get_json())
else:
@@ -408,7 +407,7 @@ class OpenPGPScheme(EncryptionScheme):
gpg.import_keys(key.key_data)
gpgkey = gpg.list_keys(secret=key.private).pop()
key = _build_key_from_gpg(
- key.address, gpgkey,
+ key.address[0], gpgkey,
gpg.export_keys(gpgkey['fingerprint'],
secret=key.private))
doc.set_json(key.get_json())
@@ -452,12 +451,11 @@ class OpenPGPScheme(EncryptionScheme):
:type key: EncryptionKey
"""
leap_assert_type(key, OpenPGPKey)
- stored_key = self.get_key(key.address, private=key.private)
- if stored_key is None:
+ doc = self._get_key_doc(key.address[0], key.private)
+ if doc is None:
raise errors.KeyNotFound(key)
- if stored_key.__dict__ != key.__dict__:
+ if doc.content[KEY_FINGERPRINT_KEY] != key.fingerprint:
raise errors.KeyAttributesDiffer(key)
- doc = self._get_key_doc(key.address, key.private)
self._soledad.delete_doc(doc)
#
diff --git a/keymanager/src/leap/keymanager/tests/test_keymanager.py b/keymanager/src/leap/keymanager/tests/test_keymanager.py
index 6a877bcf..4daf3465 100644
--- a/keymanager/src/leap/keymanager/tests/test_keymanager.py
+++ b/keymanager/src/leap/keymanager/tests/test_keymanager.py
@@ -21,6 +21,7 @@ Tests for the Key Manager.
"""
+from datetime import datetime
from mock import Mock
from leap.common.testing.basetest import BaseLeapTest
from leap.keymanager import (
@@ -75,16 +76,18 @@ class KeyManagerUtilTestCase(BaseLeapTest):
def test_build_key_from_dict(self):
kdict = {
- 'address': ADDRESS,
- 'key_id': 'key_id',
- 'fingerprint': 'fingerprint',
- 'key_data': 'key_data',
- 'private': 'private',
- 'length': 'length',
- 'expiry_date': '',
- 'first_seen_at': 'first_seen_at',
- 'last_audited_at': 'last_audited_at',
+ 'address': [ADDRESS],
+ 'key_id': KEY_FINGERPRINT[-16:],
+ 'fingerprint': KEY_FINGERPRINT,
+ 'key_data': PUBLIC_KEY,
+ 'private': False,
+ 'length': 4096,
+ 'expiry_date': 0,
+ 'last_audited_at': 0,
+ 'refreshed_at': 1311239602,
'validation': str(ValidationLevel.Weak_Chain),
+ 'encr_used': False,
+ 'sign_used': True,
}
key = build_key_from_dict(OpenPGPKey, ADDRESS, kdict)
self.assertEqual(
@@ -109,14 +112,20 @@ class KeyManagerUtilTestCase(BaseLeapTest):
None, key.expiry_date,
'Wrong data in key.')
self.assertEqual(
- kdict['first_seen_at'], key.first_seen_at,
+ None, key.last_audited_at,
'Wrong data in key.')
self.assertEqual(
- kdict['last_audited_at'], key.last_audited_at,
+ datetime.fromtimestamp(kdict['refreshed_at']), key.refreshed_at,
'Wrong data in key.')
self.assertEqual(
toValidationLevel(kdict['validation']), key.validation,
'Wrong data in key.')
+ self.assertEqual(
+ kdict['encr_used'], key.encr_used,
+ 'Wrong data in key.')
+ self.assertEqual(
+ kdict['sign_used'], key.sign_used,
+ 'Wrong data in key.')
class OpenPGPCryptoTestCase(KeyManagerWithSoledadTestCase):
@@ -127,9 +136,9 @@ class OpenPGPCryptoTestCase(KeyManagerWithSoledadTestCase):
key = pgp.gen_key('user@leap.se')
self.assertIsInstance(key, openpgp.OpenPGPKey)
self.assertEqual(
- 'user@leap.se', key.address, 'Wrong address bound to key.')
+ ['user@leap.se'], key.address, 'Wrong address bound to key.')
self.assertEqual(
- '4096', key.length, 'Wrong key length.')
+ 4096, key.length, 'Wrong key length.')
def test_openpgp_put_delete_key(self):
pgp = openpgp.OpenPGPScheme(
@@ -147,10 +156,10 @@ class OpenPGPCryptoTestCase(KeyManagerWithSoledadTestCase):
pgp.put_ascii_key(PUBLIC_KEY)
key = pgp.get_key(ADDRESS, private=False)
self.assertIsInstance(key, openpgp.OpenPGPKey)
+ self.assertTrue(
+ ADDRESS in key.address, 'Wrong address bound to key.')
self.assertEqual(
- ADDRESS, key.address, 'Wrong address bound to key.')
- self.assertEqual(
- '4096', key.length, 'Wrong key length.')
+ 4096, key.length, 'Wrong key length.')
pgp.delete_key(key)
self.assertRaises(KeyNotFound, pgp.get_key, ADDRESS)
@@ -162,7 +171,7 @@ class OpenPGPCryptoTestCase(KeyManagerWithSoledadTestCase):
self.assertRaises(
KeyNotFound, pgp.get_key, ADDRESS, private=True)
key = pgp.get_key(ADDRESS, private=False)
- self.assertEqual(ADDRESS, key.address)
+ self.assertTrue(ADDRESS in key.address)
self.assertFalse(key.private)
self.assertEqual(KEY_FINGERPRINT, key.fingerprint)
pgp.delete_key(key)
@@ -311,12 +320,12 @@ class KeyManagerKeyManagementTestCase(KeyManagerWithSoledadTestCase):
# get public keys
keys = km.get_all_keys(False)
self.assertEqual(len(keys), 1, 'Wrong number of keys')
- self.assertEqual(ADDRESS, keys[0].address)
+ self.assertTrue(ADDRESS in keys[0].address)
self.assertFalse(keys[0].private)
# get private keys
keys = km.get_all_keys(True)
self.assertEqual(len(keys), 1, 'Wrong number of keys')
- self.assertEqual(ADDRESS, keys[0].address)
+ self.assertTrue(ADDRESS in keys[0].address)
self.assertTrue(keys[0].private)
def test_get_public_key(self):
@@ -326,7 +335,7 @@ class KeyManagerKeyManagementTestCase(KeyManagerWithSoledadTestCase):
key = km.get_key(ADDRESS, OpenPGPKey, private=False,
fetch_remote=False)
self.assertTrue(key is not None)
- self.assertEqual(key.address, ADDRESS)
+ self.assertTrue(ADDRESS in key.address)
self.assertEqual(
key.fingerprint.lower(), KEY_FINGERPRINT.lower())
self.assertFalse(key.private)
@@ -338,7 +347,7 @@ class KeyManagerKeyManagementTestCase(KeyManagerWithSoledadTestCase):
key = km.get_key(ADDRESS, OpenPGPKey, private=True,
fetch_remote=False)
self.assertTrue(key is not None)
- self.assertEqual(key.address, ADDRESS)
+ self.assertTrue(ADDRESS in key.address)
self.assertEqual(
key.fingerprint.lower(), KEY_FINGERPRINT.lower())
self.assertTrue(key.private)
@@ -430,7 +439,7 @@ class KeyManagerKeyManagementTestCase(KeyManagerWithSoledadTestCase):
# try to get key fetching from server.
key = km.get_key(ADDRESS, OpenPGPKey)
self.assertIsInstance(key, OpenPGPKey)
- self.assertEqual(ADDRESS, key.address)
+ self.assertTrue(ADDRESS in key.address)
def test_put_key_ascii(self):
"""
@@ -441,7 +450,7 @@ class KeyManagerKeyManagementTestCase(KeyManagerWithSoledadTestCase):
km.put_raw_key(PUBLIC_KEY, OpenPGPKey)
key = km.get_key(ADDRESS, OpenPGPKey)
self.assertIsInstance(key, OpenPGPKey)
- self.assertEqual(ADDRESS, key.address)
+ self.assertTrue(ADDRESS in key.address)
def test_fetch_uri_ascii_key(self):
"""