summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authordrebs <drebs@leap.se>2014-11-25 12:30:40 -0200
committerdrebs <drebs@leap.se>2014-11-25 12:30:40 -0200
commit930bb9db48d31b7ad2528564ca7d23fb696f1899 (patch)
treec62b21cfba0d88e6eda3f202561bdf9404344152
parent27d2c307b31dd72705323e835d879c7654d5cac1 (diff)
parentb40e61f4d7624250325378d51a289a92773e785c (diff)
Merge branch 'feature/6299_new_doc' into develop
-rw-r--r--keymanager/changes/feature-6212_multi_uid_support1
-rw-r--r--keymanager/changes/feature-6299_new_doc2
-rw-r--r--keymanager/src/leap/keymanager/__init__.py35
-rw-r--r--keymanager/src/leap/keymanager/keys.py108
-rw-r--r--keymanager/src/leap/keymanager/openpgp.py212
-rw-r--r--keymanager/src/leap/keymanager/tests/test_keymanager.py103
-rw-r--r--keymanager/src/leap/keymanager/tests/test_validation.py21
-rw-r--r--keymanager/src/leap/keymanager/validation.py4
8 files changed, 314 insertions, 172 deletions
diff --git a/keymanager/changes/feature-6212_multi_uid_support b/keymanager/changes/feature-6212_multi_uid_support
new file mode 100644
index 00000000..495b8c48
--- /dev/null
+++ b/keymanager/changes/feature-6212_multi_uid_support
@@ -0,0 +1 @@
+- Multi uid support (closes #6212)
diff --git a/keymanager/changes/feature-6299_new_doc b/keymanager/changes/feature-6299_new_doc
new file mode 100644
index 00000000..c8933386
--- /dev/null
+++ b/keymanager/changes/feature-6299_new_doc
@@ -0,0 +1,2 @@
+- new soledad doc struct for encryption-keys (closes #6299)
+- keep old key after upgrade (closes #6262)
diff --git a/keymanager/src/leap/keymanager/__init__.py b/keymanager/src/leap/keymanager/__init__.py
index 156aaf86..e4be9c47 100644
--- a/keymanager/src/leap/keymanager/__init__.py
+++ b/keymanager/src/leap/keymanager/__init__.py
@@ -134,7 +134,7 @@ class KeyManager(object):
Return key class from string representation of key type.
"""
return filter(
- lambda klass: str(klass) == ktype,
+ lambda klass: klass.__name__ == ktype,
self._wrapper_map).pop()
def _get(self, uri, data=None):
@@ -319,7 +319,6 @@ class KeyManager(object):
return map(
lambda doc: build_key_from_dict(
self._key_class_from_type(doc.content['type']),
- doc.content['address'],
doc.content),
self._soledad.get_from_index(
TAGS_PRIVATE_INDEX,
@@ -494,8 +493,8 @@ class KeyManager(object):
verified using this detached signature.
:type detached_sig: str
- :return: The signed data.
- :rtype: str
+ :return: signature matches
+ :rtype: bool
"""
leap_assert_type(pubkey, EncryptionKey)
leap_assert(pubkey.__class__ in self._wrapper_map, 'Unknown key type.')
@@ -519,24 +518,32 @@ class KeyManager(object):
except IndexError as e:
leap_assert(False, "Unsupported key type. Error {0!r}".format(e))
- def put_key(self, key):
+ def put_key(self, key, address):
"""
Put C{key} in local storage.
:param key: The key to be stored
:type key: EncryptionKey
+ :param address: address for which this key will be active
+ :type address: str
+
+ :raises KeyAddressMismatch: if address doesn't match any uid on the key
:raises KeyNotValidUpdate: if a key with the same uid exists and the
new one is not a valid update for it
"""
+ if address not in key.address:
+ raise KeyAddressMismatch("UID %s found, but expected %s"
+ % (str(key.address), address))
+
try:
- old_key = self._wrapper_map[type(key)].get_key(key.address,
+ old_key = self._wrapper_map[type(key)].get_key(address,
private=key.private)
except KeyNotFound:
old_key = None
if key.private or can_upgrade(key, old_key):
try:
- self._wrapper_map[type(key)].put_key(key)
+ self._wrapper_map[type(key)].put_key(key, address)
except IndexError as e:
leap_assert(
False, "Unsupported key type. Error {0!r}".format(e))
@@ -544,7 +551,7 @@ class KeyManager(object):
raise KeyNotValidUpgrade("Key %s can not be upgraded by new key %s"
% (old_key.key_id, key.key_id))
- def put_raw_key(self, key, ktype, address=None,
+ def put_raw_key(self, key, ktype, address,
validation=ValidationLevel.Weak_Chain):
"""
Put C{key} in local storage.
@@ -553,7 +560,7 @@ class KeyManager(object):
:type key: str
:param ktype: the type of the key.
:type ktype: subclass of EncryptionKey
- :param address: if set used to check that the key is for this address
+ :param address: address for which this key will be active
:type address: str
:param validation: validation level for this key
(default: 'Weak_Chain')
@@ -564,12 +571,9 @@ class KeyManager(object):
new one is not a valid update for it
"""
pubkey, _ = self._wrapper_map[ktype].parse_ascii_key(key)
- if address is not None and address != pubkey.address:
- raise KeyAddressMismatch("Key UID %s, but expected %s"
- % (pubkey.address, address))
pubkey.validation = validation
- self.put_key(pubkey)
+ self.put_key(pubkey, address)
def fetch_key(self, address, uri, ktype,
validation=ValidationLevel.Weak_Chain):
@@ -600,12 +604,9 @@ class KeyManager(object):
pubkey, _ = self._wrapper_map[ktype].parse_ascii_key(res.content)
if pubkey is None:
raise KeyNotFound(uri)
- if pubkey.address != address:
- raise KeyAddressMismatch("UID %s found, but expected %s"
- % (pubkey.address, address))
pubkey.validation = validation
- self.put_key(pubkey)
+ self.put_key(pubkey, address)
from ._version import get_versions
__version__ = get_versions()['version']
diff --git a/keymanager/src/leap/keymanager/keys.py b/keymanager/src/leap/keymanager/keys.py
index a61a8c79..7c732e35 100644
--- a/keymanager/src/leap/keymanager/keys.py
+++ b/keymanager/src/leap/keymanager/keys.py
@@ -27,6 +27,7 @@ except ImportError:
import json # noqa
import logging
import re
+import time
from abc import ABCMeta, abstractmethod
@@ -50,9 +51,11 @@ KEY_DATA_KEY = 'key_data'
KEY_PRIVATE_KEY = 'private'
KEY_LENGTH_KEY = 'length'
KEY_EXPIRY_DATE_KEY = 'expiry_date'
-KEY_FIRST_SEEN_AT_KEY = 'first_seen_at'
KEY_LAST_AUDITED_AT_KEY = 'last_audited_at'
+KEY_REFRESHED_AT_KEY = 'refreshed_at'
KEY_VALIDATION_KEY = 'validation'
+KEY_ENCR_USED_KEY = 'encr_used'
+KEY_SIGN_USED_KEY = 'sign_used'
KEY_TAGS_KEY = 'tags'
@@ -61,6 +64,8 @@ KEY_TAGS_KEY = 'tags'
#
KEYMANAGER_KEY_TAG = 'keymanager-key'
+KEYMANAGER_ACTIVE_TAG = 'keymanager-active'
+KEYMANAGER_ACTIVE_TYPE = '-active'
#
@@ -68,14 +73,20 @@ KEYMANAGER_KEY_TAG = 'keymanager-key'
#
TAGS_PRIVATE_INDEX = 'by-tags-private'
-TAGS_ADDRESS_PRIVATE_INDEX = 'by-tags-address-private'
+TYPE_ID_PRIVATE_INDEX = 'by-type-id-private'
+TYPE_ADDRESS_PRIVATE_INDEX = 'by-type-address-private'
INDEXES = {
TAGS_PRIVATE_INDEX: [
KEY_TAGS_KEY,
'bool(%s)' % KEY_PRIVATE_KEY,
],
- TAGS_ADDRESS_PRIVATE_INDEX: [
- KEY_TAGS_KEY,
+ TYPE_ID_PRIVATE_INDEX: [
+ KEY_TYPE_KEY,
+ KEY_ID_KEY,
+ 'bool(%s)' % KEY_PRIVATE_KEY,
+ ],
+ TYPE_ADDRESS_PRIVATE_INDEX: [
+ KEY_TYPE_KEY,
KEY_ADDRESS_KEY,
'bool(%s)' % KEY_PRIVATE_KEY,
]
@@ -98,20 +109,15 @@ def is_address(address):
return bool(re.match('[\w.-]+@[\w.-]+', address))
-def build_key_from_dict(kClass, address, kdict):
+def build_key_from_dict(kClass, kdict):
"""
- Build an C{kClass} key bound to C{address} based on info in C{kdict}.
+ Build an C{kClass} key based on info in C{kdict}.
- :param address: The address bound to the key.
- :type address: str
:param kdict: Dictionary with key data.
:type kdict: dict
:return: An instance of the key.
:rtype: C{kClass}
"""
- leap_assert(
- address == kdict[KEY_ADDRESS_KEY],
- 'Wrong address in key data.')
try:
validation = toValidationLevel(kdict[KEY_VALIDATION_KEY])
except ValueError:
@@ -119,24 +125,40 @@ def build_key_from_dict(kClass, address, kdict):
(kdict[KEY_VALIDATION_KEY], kdict[KEY_ID_KEY]))
validation = ValidationLevel.Weak_Chain
- expiry_date = None
- if kdict[KEY_EXPIRY_DATE_KEY]:
- expiry_date = datetime.fromtimestamp(int(kdict[KEY_EXPIRY_DATE_KEY]))
+ expiry_date = _to_datetime(kdict[KEY_EXPIRY_DATE_KEY])
+ last_audited_at = _to_datetime(kdict[KEY_LAST_AUDITED_AT_KEY])
+ refreshed_at = _to_datetime(kdict[KEY_REFRESHED_AT_KEY])
return kClass(
- address,
+ kdict[KEY_ADDRESS_KEY],
key_id=kdict[KEY_ID_KEY],
fingerprint=kdict[KEY_FINGERPRINT_KEY],
key_data=kdict[KEY_DATA_KEY],
private=kdict[KEY_PRIVATE_KEY],
length=kdict[KEY_LENGTH_KEY],
expiry_date=expiry_date,
- first_seen_at=kdict[KEY_FIRST_SEEN_AT_KEY],
- last_audited_at=kdict[KEY_LAST_AUDITED_AT_KEY],
+ last_audited_at=last_audited_at,
+ refreshed_at=refreshed_at,
validation=validation,
+ encr_used=kdict[KEY_ENCR_USED_KEY],
+ sign_used=kdict[KEY_SIGN_USED_KEY],
)
+def _to_datetime(unix_time):
+ if unix_time != 0:
+ return datetime.fromtimestamp(unix_time)
+ else:
+ return None
+
+
+def _to_unix_time(date):
+ if date is not None:
+ return int(time.mktime(date.timetuple()))
+ else:
+ return 0
+
+
#
# Abstraction for encryption keys
#
@@ -151,9 +173,10 @@ class EncryptionKey(object):
__metaclass__ = ABCMeta
- def __init__(self, address, key_id=None, fingerprint=None,
- key_data=None, private=None, length=None, expiry_date=None,
- validation=None, first_seen_at=None, last_audited_at=None):
+ def __init__(self, address, key_id="", fingerprint="",
+ key_data="", private=False, length=0, expiry_date=None,
+ validation=ValidationLevel.Weak_Chain, last_audited_at=None,
+ refreshed_at=None, encr_used=False, sign_used=False):
self.address = address
self.key_id = key_id
self.fingerprint = fingerprint
@@ -162,8 +185,10 @@ class EncryptionKey(object):
self.length = length
self.expiry_date = expiry_date
self.validation = validation
- self.first_seen_at = first_seen_at
self.last_audited_at = last_audited_at
+ self.refreshed_at = refreshed_at
+ self.encr_used = encr_used
+ self.sign_used = sign_used
def get_json(self):
"""
@@ -172,25 +197,44 @@ class EncryptionKey(object):
:return: The JSON string describing this key.
:rtype: str
"""
- expiry_str = ""
- if self.expiry_date is not None:
- expiry_str = self.expiry_date.strftime("%s")
+ expiry_date = _to_unix_time(self.expiry_date)
+ last_audited_at = _to_unix_time(self.last_audited_at)
+ refreshed_at = _to_unix_time(self.refreshed_at)
return json.dumps({
KEY_ADDRESS_KEY: self.address,
- KEY_TYPE_KEY: str(self.__class__),
+ KEY_TYPE_KEY: self.__class__.__name__,
KEY_ID_KEY: self.key_id,
KEY_FINGERPRINT_KEY: self.fingerprint,
KEY_DATA_KEY: self.key_data,
KEY_PRIVATE_KEY: self.private,
KEY_LENGTH_KEY: self.length,
- KEY_EXPIRY_DATE_KEY: expiry_str,
+ KEY_EXPIRY_DATE_KEY: expiry_date,
+ KEY_LAST_AUDITED_AT_KEY: last_audited_at,
+ KEY_REFRESHED_AT_KEY: refreshed_at,
KEY_VALIDATION_KEY: str(self.validation),
- KEY_FIRST_SEEN_AT_KEY: self.first_seen_at,
- KEY_LAST_AUDITED_AT_KEY: self.last_audited_at,
+ KEY_ENCR_USED_KEY: self.encr_used,
+ KEY_SIGN_USED_KEY: self.sign_used,
KEY_TAGS_KEY: [KEYMANAGER_KEY_TAG],
})
+ def get_active_json(self, address):
+ """
+ Return a JSON string describing this key.
+
+ :param address: Address for wich the key is active
+ :type address: str
+ :return: The JSON string describing this key.
+ :rtype: str
+ """
+ return json.dumps({
+ KEY_ADDRESS_KEY: address,
+ KEY_TYPE_KEY: self.__class__.__name__ + KEYMANAGER_ACTIVE_TYPE,
+ KEY_ID_KEY: self.key_id,
+ KEY_PRIVATE_KEY: self.private,
+ KEY_TAGS_KEY: [KEYMANAGER_ACTIVE_TAG],
+ })
+
def __repr__(self):
"""
Representation of this class
@@ -266,12 +310,14 @@ class EncryptionScheme(object):
pass
@abstractmethod
- def put_key(self, key):
+ def put_key(self, key, address):
"""
Put a key in local storage.
:param key: The key to be stored.
:type key: EncryptionKey
+ :param address: address for which this key will be active.
+ :type address: str
"""
pass
@@ -365,7 +411,7 @@ class EncryptionScheme(object):
verified against this sdetached signature.
:type detached_sig: str
- :return: The signed data.
- :rtype: str
+ :return: signature matches
+ :rtype: bool
"""
pass
diff --git a/keymanager/src/leap/keymanager/openpgp.py b/keymanager/src/leap/keymanager/openpgp.py
index d3c305e2..3f298f71 100644
--- a/keymanager/src/leap/keymanager/openpgp.py
+++ b/keymanager/src/leap/keymanager/openpgp.py
@@ -36,12 +36,14 @@ from leap.keymanager.keys import (
EncryptionScheme,
is_address,
build_key_from_dict,
- KEYMANAGER_KEY_TAG,
- TAGS_ADDRESS_PRIVATE_INDEX,
+ TYPE_ID_PRIVATE_INDEX,
+ TYPE_ADDRESS_PRIVATE_INDEX,
+ KEY_ADDRESS_KEY,
KEY_FINGERPRINT_KEY,
KEY_DATA_KEY,
+ KEY_ID_KEY,
+ KEYMANAGER_ACTIVE_TYPE,
)
-from leap.keymanager.validation import ValidationLevel
logger = logging.getLogger(__name__)
@@ -109,9 +111,9 @@ class TempGPGWrapper(object):
# itself is enough to also have the public key in the keyring,
# and we want to count the keys afterwards.
- privaddrs = map(lambda privkey: privkey.address, privkeys)
+ privids = map(lambda privkey: privkey.key_id, privkeys)
publkeys = filter(
- lambda pubkey: pubkey.address not in privaddrs, publkeys)
+ lambda pubkey: pubkey.key_id not in privids, publkeys)
listkeys = lambda: self._gpg.list_keys()
listsecretkeys = lambda: self._gpg.list_keys(secret=True)
@@ -162,16 +164,13 @@ class TempGPGWrapper(object):
shutil.rmtree(self._gpg.homedir)
-def _build_key_from_gpg(address, key, key_data):
+def _build_key_from_gpg(key, key_data):
"""
- Build an OpenPGPKey for C{address} based on C{key} from
- local gpg storage.
+ Build an OpenPGPKey based on C{key} from local gpg storage.
ASCII armored GPG key data has to be queried independently in this
wrapper, so we receive it in C{key_data}.
- :param address: The address bound to the key.
- :type address: str
:param key: Key obtained from GPG storage.
:type key: dict
:param key_data: Key data obtained from GPG storage.
@@ -182,6 +181,9 @@ def _build_key_from_gpg(address, key, key_data):
expiry_date = None
if key['expires']:
expiry_date = datetime.fromtimestamp(int(key['expires']))
+ address = []
+ for uid in key['uids']:
+ address.append(_parse_address(uid))
return OpenPGPKey(
address,
@@ -189,12 +191,28 @@ def _build_key_from_gpg(address, key, key_data):
fingerprint=key['fingerprint'],
key_data=key_data,
private=True if key['type'] == 'sec' else False,
- length=key['length'],
+ length=int(key['length']),
expiry_date=expiry_date,
- validation=ValidationLevel.Weak_Chain,
+ refreshed_at=datetime.now(),
)
+def _parse_address(address):
+ """
+ Remove name, '<', '>' and the identity suffix after the '+' until the '@'
+ e.g.: test_user+something@provider.com becomes test_user@provider.com
+ since the key belongs to the identity without the '+' suffix.
+
+ :type address: str
+ :rtype: str
+ """
+ mail_regex = '(.*<)?([\w.-]+)(\+.*)?(@[\w.-]+)(>.*)?'
+ match = re.match(mail_regex, address)
+ if match is None:
+ return None
+ return ''.join(match.group(2, 4))
+
+
#
# The OpenPGP wrapper
#
@@ -211,6 +229,10 @@ class OpenPGPScheme(EncryptionScheme):
signing and verification).
"""
+ # type used on the soledad documents
+ KEY_TYPE = OpenPGPKey.__name__
+ ACTIVE_TYPE = KEY_TYPE + KEYMANAGER_ACTIVE_TYPE
+
def __init__(self, soledad, gpgbinary=None):
"""
Initialize the OpenPGP wrapper.
@@ -272,16 +294,18 @@ class OpenPGPScheme(EncryptionScheme):
leap_assert(
len(key['uids']) is 1, # with just one uid!
'Wrong number of uids for key: %d.' % len(key['uids']))
- leap_assert(
- re.match('.*<%s>$' % address, key['uids'][0]) is not None,
- 'Key not correctly bound to address.')
+ uid_match = False
+ for uid in key['uids']:
+ if re.match('.*<%s>$' % address, uid) is not None:
+ uid_match = True
+ return
+ leap_assert(uid_match, 'Key not correctly bound to address.')
# insert both public and private keys in storage
for secret in [True, False]:
key = gpg.list_keys(secret=secret).pop()
openpgp_key = _build_key_from_gpg(
- address, key,
- gpg.export_keys(key['fingerprint'], secret=secret))
- self.put_key(openpgp_key)
+ key, gpg.export_keys(key['fingerprint'], secret=secret))
+ self.put_key(openpgp_key, address)
return self.get_key(address, private=True)
@@ -298,15 +322,15 @@ class OpenPGPScheme(EncryptionScheme):
:rtype: OpenPGPKey
@raise KeyNotFound: If the key was not found on local storage.
"""
- # Remove the identity suffix after the '+' until the '@'
- # e.g.: test_user+something@provider.com becomes test_user@provider.com
- # since the key belongs to the identity without the '+' suffix.
- address = re.sub(r'\+.*\@', '@', address)
+ address = _parse_address(address)
doc = self._get_key_doc(address, private)
if doc is None:
raise errors.KeyNotFound(address)
- return build_key_from_dict(OpenPGPKey, address, doc.content)
+ leap_assert(
+ address in doc.content[KEY_ADDRESS_KEY],
+ 'Wrong address in key data.')
+ return build_key_from_dict(OpenPGPKey, doc.content)
def parse_ascii_key(self, key_data):
"""
@@ -323,7 +347,6 @@ class OpenPGPScheme(EncryptionScheme):
leap_assert_type(key_data, (str, unicode))
# TODO: add more checks for correct key data.
leap_assert(key_data is not None, 'Data does not represent a key.')
- mail_regex = '.*<([\w.-]+@[\w.-]+)>.*'
with self._temporary_gpgwrapper() as gpg:
# TODO: inspect result, or use decorator
@@ -340,42 +363,31 @@ class OpenPGPScheme(EncryptionScheme):
except IndexError:
return (None, None)
- # extract adress from first uid on key
- match = re.match(mail_regex, pubkey['uids'].pop())
- leap_assert(match is not None, 'No user address in key data.')
- address = match.group(1)
-
openpgp_privkey = None
if privkey is not None:
- match = re.match(mail_regex, privkey['uids'].pop())
- leap_assert(match is not None, 'No user address in key data.')
- privaddress = match.group(1)
-
# build private key
openpgp_privkey = _build_key_from_gpg(
- privaddress, privkey,
+ privkey,
gpg.export_keys(privkey['fingerprint'], secret=True))
-
- leap_check(address == privaddress,
- 'Addresses in public and private key differ.',
- errors.KeyAddressMismatch)
leap_check(pubkey['fingerprint'] == privkey['fingerprint'],
'Fingerprints for public and private key differ.',
errors.KeyFingerprintMismatch)
# build public key
openpgp_pubkey = _build_key_from_gpg(
- address, pubkey,
+ pubkey,
gpg.export_keys(pubkey['fingerprint'], secret=False))
return (openpgp_pubkey, openpgp_privkey)
- def put_ascii_key(self, key_data):
+ def put_ascii_key(self, key_data, address):
"""
Put key contained in ascii-armored C{key_data} in local storage.
:param key_data: The key data to be stored.
:type key_data: str or unicode
+ :param address: address for which this key will be active
+ :type address: str
"""
leap_assert_type(key_data, (str, unicode))
@@ -386,21 +398,35 @@ class OpenPGPScheme(EncryptionScheme):
leap_assert(False, repr(e))
if openpgp_pubkey is not None:
- self.put_key(openpgp_pubkey)
+ self.put_key(openpgp_pubkey, address)
if openpgp_privkey is not None:
- self.put_key(openpgp_privkey)
+ self.put_key(openpgp_privkey, address)
- def put_key(self, key):
+ def put_key(self, key, address):
"""
Put C{key} in local storage.
:param key: The key to be stored.
:type key: OpenPGPKey
+ :param address: address for which this key will be active.
+ :type address: str
"""
- doc = self._get_key_doc(key.address, private=key.private)
- if doc is None:
- self._soledad.create_doc_from_json(key.get_json())
- else:
+ self._put_key_doc(key)
+ self._put_active_doc(key, address)
+
+ def _put_key_doc(self, key):
+ """
+ Put key document in soledad
+
+ :type key: OpenPGPKey
+ """
+ docs = self._soledad.get_from_index(
+ TYPE_ID_PRIVATE_INDEX,
+ self.KEY_TYPE,
+ key.key_id,
+ '1' if key.private else '0')
+ if len(docs) != 0:
+ doc = docs.pop()
if key.fingerprint == doc.content[KEY_FINGERPRINT_KEY]:
# in case of an update of the key merge them with gnupg
with self._temporary_gpgwrapper() as gpg:
@@ -408,11 +434,43 @@ class OpenPGPScheme(EncryptionScheme):
gpg.import_keys(key.key_data)
gpgkey = gpg.list_keys(secret=key.private).pop()
key = _build_key_from_gpg(
- key.address, gpgkey,
+ gpgkey,
gpg.export_keys(gpgkey['fingerprint'],
secret=key.private))
- doc.set_json(key.get_json())
+ doc.set_json(key.get_json())
+ self._soledad.put_doc(doc)
+ else:
+ logger.critical(
+ "Can't put a key whith the same key_id and different "
+ "fingerprint: %s, %s"
+ % (key.fingerprint, doc.content[KEY_FINGERPRINT_KEY]))
+ else:
+ self._soledad.create_doc_from_json(key.get_json())
+
+ def _put_active_doc(self, key, address):
+ """
+ Put active key document in soledad
+
+ :type key: OpenPGPKey
+ :type addresses: str
+ """
+ docs = self._soledad.get_from_index(
+ TYPE_ADDRESS_PRIVATE_INDEX,
+ self.ACTIVE_TYPE,
+ address,
+ '1' if key.private else '0')
+ if len(docs) == 1:
+ doc = docs.pop()
+ doc.set_json(key.get_active_json(address))
self._soledad.put_doc(doc)
+ else:
+ if len(docs) > 1:
+ logger.error("There is more than one active key document "
+ "for the address %s" % (address,))
+ for doc in docs:
+ self._soledad.delete_doc(doc)
+ self._soledad.create_doc_from_json(
+ key.get_active_json(address))
def _get_key_doc(self, address, private=False):
"""
@@ -427,17 +485,26 @@ class OpenPGPScheme(EncryptionScheme):
:return: The document with the key or None if it does not exist.
:rtype: leap.soledad.document.SoledadDocument
"""
- doclist = self._soledad.get_from_index(
- TAGS_ADDRESS_PRIVATE_INDEX,
- KEYMANAGER_KEY_TAG,
+ activedoc = self._soledad.get_from_index(
+ TYPE_ADDRESS_PRIVATE_INDEX,
+ self.ACTIVE_TYPE,
address,
'1' if private else '0')
- if len(doclist) is 0:
+ if len(activedoc) is 0:
return None
leap_assert(
+ len(activedoc) is 1,
+ 'Found more than one key for address %s!' % (address,))
+
+ key_id = activedoc[0].content[KEY_ID_KEY]
+ doclist = self._soledad.get_from_index(
+ TYPE_ID_PRIVATE_INDEX,
+ self.KEY_TYPE,
+ key_id,
+ '1' if private else '0')
+ leap_assert(
len(doclist) is 1,
- 'Found more than one %s key for address!' %
- 'private' if private else 'public')
+ 'There is %d keys for id %s!' % (len(doclist), key_id))
return doclist.pop()
def delete_key(self, key):
@@ -446,18 +513,37 @@ class OpenPGPScheme(EncryptionScheme):
May raise:
errors.KeyNotFound
- errors.KeyAttributesDiffer
:param key: The key to be removed.
:type key: EncryptionKey
"""
leap_assert_type(key, OpenPGPKey)
- stored_key = self.get_key(key.address, private=key.private)
- if stored_key is None:
+ activedocs = self._soledad.get_from_index(
+ TYPE_ID_PRIVATE_INDEX,
+ self.ACTIVE_TYPE,
+ key.key_id,
+ '1' if key.private else '0')
+ for doc in activedocs:
+ self._soledad.delete_doc(doc)
+
+ docs = self._soledad.get_from_index(
+ TYPE_ID_PRIVATE_INDEX,
+ self.KEY_TYPE,
+ key.key_id,
+ '1' if key.private else '0')
+ if len(docs) == 0:
+ raise errors.KeyNotFound(key)
+ if len(docs) > 1:
+ logger.critical("There is more than one key for key_id %s"
+ % key.key_id)
+
+ doc = None
+ for d in docs:
+ if d.content['fingerprint'] == key.fingerprint:
+ doc = d
+ break
+ if doc is None:
raise errors.KeyNotFound(key)
- if stored_key.__dict__ != key.__dict__:
- raise errors.KeyAttributesDiffer(key)
- doc = self._get_key_doc(key.address, key.private)
self._soledad.delete_doc(doc)
#
@@ -656,8 +742,8 @@ class OpenPGPScheme(EncryptionScheme):
verified against this detached signature.
:type detached_sig: str
- :return: The ascii-armored signed data.
- :rtype: str
+ :return: signature matches
+ :rtype: bool
"""
leap_assert_type(pubkey, OpenPGPKey)
leap_assert(pubkey.private is False)
diff --git a/keymanager/src/leap/keymanager/tests/test_keymanager.py b/keymanager/src/leap/keymanager/tests/test_keymanager.py
index 6a877bcf..6aeb67a8 100644
--- a/keymanager/src/leap/keymanager/tests/test_keymanager.py
+++ b/keymanager/src/leap/keymanager/tests/test_keymanager.py
@@ -21,6 +21,7 @@ Tests for the Key Manager.
"""
+from datetime import datetime
from mock import Mock
from leap.common.testing.basetest import BaseLeapTest
from leap.keymanager import (
@@ -75,18 +76,20 @@ class KeyManagerUtilTestCase(BaseLeapTest):
def test_build_key_from_dict(self):
kdict = {
- 'address': ADDRESS,
- 'key_id': 'key_id',
- 'fingerprint': 'fingerprint',
- 'key_data': 'key_data',
- 'private': 'private',
- 'length': 'length',
- 'expiry_date': '',
- 'first_seen_at': 'first_seen_at',
- 'last_audited_at': 'last_audited_at',
+ 'address': [ADDRESS],
+ 'key_id': KEY_FINGERPRINT[-16:],
+ 'fingerprint': KEY_FINGERPRINT,
+ 'key_data': PUBLIC_KEY,
+ 'private': False,
+ 'length': 4096,
+ 'expiry_date': 0,
+ 'last_audited_at': 0,
+ 'refreshed_at': 1311239602,
'validation': str(ValidationLevel.Weak_Chain),
+ 'encr_used': False,
+ 'sign_used': True,
}
- key = build_key_from_dict(OpenPGPKey, ADDRESS, kdict)
+ key = build_key_from_dict(OpenPGPKey, kdict)
self.assertEqual(
kdict['address'], key.address,
'Wrong data in key.')
@@ -109,14 +112,20 @@ class KeyManagerUtilTestCase(BaseLeapTest):
None, key.expiry_date,
'Wrong data in key.')
self.assertEqual(
- kdict['first_seen_at'], key.first_seen_at,
+ None, key.last_audited_at,
'Wrong data in key.')
self.assertEqual(
- kdict['last_audited_at'], key.last_audited_at,
+ datetime.fromtimestamp(kdict['refreshed_at']), key.refreshed_at,
'Wrong data in key.')
self.assertEqual(
toValidationLevel(kdict['validation']), key.validation,
'Wrong data in key.')
+ self.assertEqual(
+ kdict['encr_used'], key.encr_used,
+ 'Wrong data in key.')
+ self.assertEqual(
+ kdict['sign_used'], key.sign_used,
+ 'Wrong data in key.')
class OpenPGPCryptoTestCase(KeyManagerWithSoledadTestCase):
@@ -127,15 +136,15 @@ class OpenPGPCryptoTestCase(KeyManagerWithSoledadTestCase):
key = pgp.gen_key('user@leap.se')
self.assertIsInstance(key, openpgp.OpenPGPKey)
self.assertEqual(
- 'user@leap.se', key.address, 'Wrong address bound to key.')
+ ['user@leap.se'], key.address, 'Wrong address bound to key.')
self.assertEqual(
- '4096', key.length, 'Wrong key length.')
+ 4096, key.length, 'Wrong key length.')
def test_openpgp_put_delete_key(self):
pgp = openpgp.OpenPGPScheme(
self._soledad, gpgbinary=GPG_BINARY_PATH)
self.assertRaises(KeyNotFound, pgp.get_key, ADDRESS)
- pgp.put_ascii_key(PUBLIC_KEY)
+ pgp.put_ascii_key(PUBLIC_KEY, ADDRESS)
key = pgp.get_key(ADDRESS, private=False)
pgp.delete_key(key)
self.assertRaises(KeyNotFound, pgp.get_key, ADDRESS)
@@ -144,13 +153,13 @@ class OpenPGPCryptoTestCase(KeyManagerWithSoledadTestCase):
pgp = openpgp.OpenPGPScheme(
self._soledad, gpgbinary=GPG_BINARY_PATH)
self.assertRaises(KeyNotFound, pgp.get_key, ADDRESS)
- pgp.put_ascii_key(PUBLIC_KEY)
+ pgp.put_ascii_key(PUBLIC_KEY, ADDRESS)
key = pgp.get_key(ADDRESS, private=False)
self.assertIsInstance(key, openpgp.OpenPGPKey)
+ self.assertTrue(
+ ADDRESS in key.address, 'Wrong address bound to key.')
self.assertEqual(
- ADDRESS, key.address, 'Wrong address bound to key.')
- self.assertEqual(
- '4096', key.length, 'Wrong key length.')
+ 4096, key.length, 'Wrong key length.')
pgp.delete_key(key)
self.assertRaises(KeyNotFound, pgp.get_key, ADDRESS)
@@ -158,11 +167,11 @@ class OpenPGPCryptoTestCase(KeyManagerWithSoledadTestCase):
pgp = openpgp.OpenPGPScheme(
self._soledad, gpgbinary=GPG_BINARY_PATH)
self.assertRaises(KeyNotFound, pgp.get_key, ADDRESS)
- pgp.put_ascii_key(PUBLIC_KEY)
+ pgp.put_ascii_key(PUBLIC_KEY, ADDRESS)
self.assertRaises(
KeyNotFound, pgp.get_key, ADDRESS, private=True)
key = pgp.get_key(ADDRESS, private=False)
- self.assertEqual(ADDRESS, key.address)
+ self.assertTrue(ADDRESS in key.address)
self.assertFalse(key.private)
self.assertEqual(KEY_FINGERPRINT, key.fingerprint)
pgp.delete_key(key)
@@ -172,7 +181,7 @@ class OpenPGPCryptoTestCase(KeyManagerWithSoledadTestCase):
# encrypt
pgp = openpgp.OpenPGPScheme(
self._soledad, gpgbinary=GPG_BINARY_PATH)
- pgp.put_ascii_key(PUBLIC_KEY)
+ pgp.put_ascii_key(PUBLIC_KEY, ADDRESS)
pubkey = pgp.get_key(ADDRESS, private=False)
cyphertext = pgp.encrypt('data', pubkey)
# assert
@@ -184,7 +193,7 @@ class OpenPGPCryptoTestCase(KeyManagerWithSoledadTestCase):
# decrypt
self.assertRaises(
KeyNotFound, pgp.get_key, ADDRESS, private=True)
- pgp.put_ascii_key(PRIVATE_KEY)
+ pgp.put_ascii_key(PRIVATE_KEY, ADDRESS)
privkey = pgp.get_key(ADDRESS, private=True)
pgp.delete_key(pubkey)
pgp.delete_key(privkey)
@@ -207,7 +216,7 @@ class OpenPGPCryptoTestCase(KeyManagerWithSoledadTestCase):
def test_sign_with_public_raises(self):
pgp = openpgp.OpenPGPScheme(
self._soledad, gpgbinary=GPG_BINARY_PATH)
- pgp.put_ascii_key(PUBLIC_KEY)
+ pgp.put_ascii_key(PUBLIC_KEY, ADDRESS)
data = 'data'
pubkey = pgp.get_key(ADDRESS, private=False)
self.assertRaises(
@@ -217,11 +226,11 @@ class OpenPGPCryptoTestCase(KeyManagerWithSoledadTestCase):
def test_verify_with_wrong_key_raises(self):
pgp = openpgp.OpenPGPScheme(
self._soledad, gpgbinary=GPG_BINARY_PATH)
- pgp.put_ascii_key(PRIVATE_KEY)
+ pgp.put_ascii_key(PRIVATE_KEY, ADDRESS)
data = 'data'
privkey = pgp.get_key(ADDRESS, private=True)
signed = pgp.sign(data, privkey)
- pgp.put_ascii_key(PUBLIC_KEY_2)
+ pgp.put_ascii_key(PUBLIC_KEY_2, ADDRESS_2)
wrongkey = pgp.get_key(ADDRESS_2)
self.assertRaises(
errors.InvalidSignature,
@@ -230,7 +239,7 @@ class OpenPGPCryptoTestCase(KeyManagerWithSoledadTestCase):
def test_encrypt_sign_with_public_raises(self):
pgp = openpgp.OpenPGPScheme(
self._soledad, gpgbinary=GPG_BINARY_PATH)
- pgp.put_ascii_key(PRIVATE_KEY)
+ pgp.put_ascii_key(PRIVATE_KEY, ADDRESS)
data = 'data'
privkey = pgp.get_key(ADDRESS, private=True)
pubkey = pgp.get_key(ADDRESS, private=False)
@@ -241,7 +250,7 @@ class OpenPGPCryptoTestCase(KeyManagerWithSoledadTestCase):
def test_decrypt_verify_with_private_raises(self):
pgp = openpgp.OpenPGPScheme(
self._soledad, gpgbinary=GPG_BINARY_PATH)
- pgp.put_ascii_key(PRIVATE_KEY)
+ pgp.put_ascii_key(PRIVATE_KEY, ADDRESS)
data = 'data'
privkey = pgp.get_key(ADDRESS, private=True)
pubkey = pgp.get_key(ADDRESS, private=False)
@@ -255,12 +264,12 @@ class OpenPGPCryptoTestCase(KeyManagerWithSoledadTestCase):
def test_decrypt_verify_with_wrong_key_raises(self):
pgp = openpgp.OpenPGPScheme(
self._soledad, gpgbinary=GPG_BINARY_PATH)
- pgp.put_ascii_key(PRIVATE_KEY)
+ pgp.put_ascii_key(PRIVATE_KEY, ADDRESS)
data = 'data'
privkey = pgp.get_key(ADDRESS, private=True)
pubkey = pgp.get_key(ADDRESS, private=False)
encrypted_and_signed = pgp.encrypt(data, pubkey, sign=privkey)
- pgp.put_ascii_key(PUBLIC_KEY_2)
+ pgp.put_ascii_key(PUBLIC_KEY_2, ADDRESS_2)
wrongkey = pgp.get_key(ADDRESS_2)
self.assertRaises(
errors.InvalidSignature,
@@ -269,7 +278,7 @@ class OpenPGPCryptoTestCase(KeyManagerWithSoledadTestCase):
def test_sign_verify(self):
pgp = openpgp.OpenPGPScheme(
self._soledad, gpgbinary=GPG_BINARY_PATH)
- pgp.put_ascii_key(PRIVATE_KEY)
+ pgp.put_ascii_key(PRIVATE_KEY, ADDRESS)
data = 'data'
privkey = pgp.get_key(ADDRESS, private=True)
signed = pgp.sign(data, privkey, detach=False)
@@ -279,10 +288,10 @@ class OpenPGPCryptoTestCase(KeyManagerWithSoledadTestCase):
def test_encrypt_sign_decrypt_verify(self):
pgp = openpgp.OpenPGPScheme(
self._soledad, gpgbinary=GPG_BINARY_PATH)
- pgp.put_ascii_key(PRIVATE_KEY)
+ pgp.put_ascii_key(PRIVATE_KEY, ADDRESS)
pubkey = pgp.get_key(ADDRESS, private=False)
privkey = pgp.get_key(ADDRESS, private=True)
- pgp.put_ascii_key(PRIVATE_KEY_2)
+ pgp.put_ascii_key(PRIVATE_KEY_2, ADDRESS_2)
pubkey2 = pgp.get_key(ADDRESS_2, private=False)
privkey2 = pgp.get_key(ADDRESS_2, private=True)
data = 'data'
@@ -295,7 +304,7 @@ class OpenPGPCryptoTestCase(KeyManagerWithSoledadTestCase):
def test_sign_verify_detached_sig(self):
pgp = openpgp.OpenPGPScheme(
self._soledad, gpgbinary=GPG_BINARY_PATH)
- pgp.put_ascii_key(PRIVATE_KEY)
+ pgp.put_ascii_key(PRIVATE_KEY, ADDRESS)
data = 'data'
privkey = pgp.get_key(ADDRESS, private=True)
signature = pgp.sign(data, privkey, detach=True)
@@ -307,38 +316,38 @@ class KeyManagerKeyManagementTestCase(KeyManagerWithSoledadTestCase):
def test_get_all_keys_in_db(self):
km = self._key_manager()
- km._wrapper_map[OpenPGPKey].put_ascii_key(PRIVATE_KEY)
+ km._wrapper_map[OpenPGPKey].put_ascii_key(PRIVATE_KEY, ADDRESS)
# get public keys
keys = km.get_all_keys(False)
self.assertEqual(len(keys), 1, 'Wrong number of keys')
- self.assertEqual(ADDRESS, keys[0].address)
+ self.assertTrue(ADDRESS in keys[0].address)
self.assertFalse(keys[0].private)
# get private keys
keys = km.get_all_keys(True)
self.assertEqual(len(keys), 1, 'Wrong number of keys')
- self.assertEqual(ADDRESS, keys[0].address)
+ self.assertTrue(ADDRESS in keys[0].address)
self.assertTrue(keys[0].private)
def test_get_public_key(self):
km = self._key_manager()
- km._wrapper_map[OpenPGPKey].put_ascii_key(PRIVATE_KEY)
+ km._wrapper_map[OpenPGPKey].put_ascii_key(PRIVATE_KEY, ADDRESS)
# get the key
key = km.get_key(ADDRESS, OpenPGPKey, private=False,
fetch_remote=False)
self.assertTrue(key is not None)
- self.assertEqual(key.address, ADDRESS)
+ self.assertTrue(ADDRESS in key.address)
self.assertEqual(
key.fingerprint.lower(), KEY_FINGERPRINT.lower())
self.assertFalse(key.private)
def test_get_private_key(self):
km = self._key_manager()
- km._wrapper_map[OpenPGPKey].put_ascii_key(PRIVATE_KEY)
+ km._wrapper_map[OpenPGPKey].put_ascii_key(PRIVATE_KEY, ADDRESS)
# get the key
key = km.get_key(ADDRESS, OpenPGPKey, private=True,
fetch_remote=False)
self.assertTrue(key is not None)
- self.assertEqual(key.address, ADDRESS)
+ self.assertTrue(ADDRESS in key.address)
self.assertEqual(
key.fingerprint.lower(), KEY_FINGERPRINT.lower())
self.assertTrue(key.private)
@@ -355,7 +364,7 @@ class KeyManagerKeyManagementTestCase(KeyManagerWithSoledadTestCase):
"""
token = "mytoken"
km = self._key_manager(token=token)
- km._wrapper_map[OpenPGPKey].put_ascii_key(PUBLIC_KEY)
+ km._wrapper_map[OpenPGPKey].put_ascii_key(PUBLIC_KEY, ADDRESS)
km._fetcher.put = Mock()
# the following data will be used on the send
km.ca_cert_path = 'capath'
@@ -430,7 +439,7 @@ class KeyManagerKeyManagementTestCase(KeyManagerWithSoledadTestCase):
# try to get key fetching from server.
key = km.get_key(ADDRESS, OpenPGPKey)
self.assertIsInstance(key, OpenPGPKey)
- self.assertEqual(ADDRESS, key.address)
+ self.assertTrue(ADDRESS in key.address)
def test_put_key_ascii(self):
"""
@@ -438,10 +447,10 @@ class KeyManagerKeyManagementTestCase(KeyManagerWithSoledadTestCase):
"""
km = self._key_manager(url='http://nickserver.domain')
- km.put_raw_key(PUBLIC_KEY, OpenPGPKey)
+ km.put_raw_key(PUBLIC_KEY, OpenPGPKey, ADDRESS)
key = km.get_key(ADDRESS, OpenPGPKey)
self.assertIsInstance(key, OpenPGPKey)
- self.assertEqual(ADDRESS, key.address)
+ self.assertTrue(ADDRESS in key.address)
def test_fetch_uri_ascii_key(self):
"""
@@ -500,7 +509,7 @@ class KeyManagerCryptoTestCase(KeyManagerWithSoledadTestCase):
def test_keymanager_openpgp_encrypt_decrypt(self):
km = self._key_manager()
# put raw private key
- km._wrapper_map[OpenPGPKey].put_ascii_key(PRIVATE_KEY)
+ km._wrapper_map[OpenPGPKey].put_ascii_key(PRIVATE_KEY, ADDRESS)
# get public key
pubkey = km.get_key(
ADDRESS, OpenPGPKey, private=False, fetch_remote=False)
@@ -517,7 +526,7 @@ class KeyManagerCryptoTestCase(KeyManagerWithSoledadTestCase):
def test_keymanager_openpgp_sign_verify(self):
km = self._key_manager()
# put raw private keys
- km._wrapper_map[OpenPGPKey].put_ascii_key(PRIVATE_KEY)
+ km._wrapper_map[OpenPGPKey].put_ascii_key(PRIVATE_KEY, ADDRESS)
# get private key for signing
privkey = km.get_key(
ADDRESS, OpenPGPKey, private=True, fetch_remote=False)
diff --git a/keymanager/src/leap/keymanager/tests/test_validation.py b/keymanager/src/leap/keymanager/tests/test_validation.py
index 3ae873d4..400d36e8 100644
--- a/keymanager/src/leap/keymanager/tests/test_validation.py
+++ b/keymanager/src/leap/keymanager/tests/test_validation.py
@@ -37,47 +37,48 @@ class ValidationLevelTestCase(KeyManagerWithSoledadTestCase):
def test_none_old_key(self):
km = self._key_manager()
- km.put_raw_key(PUBLIC_KEY, OpenPGPKey)
+ km.put_raw_key(PUBLIC_KEY, OpenPGPKey, ADDRESS)
key = km.get_key(ADDRESS, OpenPGPKey, fetch_remote=False)
self.assertEqual(key.fingerprint, KEY_FINGERPRINT)
def test_cant_upgrade(self):
km = self._key_manager()
- km.put_raw_key(PUBLIC_KEY, OpenPGPKey,
+ km.put_raw_key(PUBLIC_KEY, OpenPGPKey, ADDRESS,
validation=ValidationLevel.Provider_Trust)
self.assertRaises(KeyNotValidUpgrade, km.put_raw_key, UNRELATED_KEY,
- OpenPGPKey)
+ OpenPGPKey, ADDRESS)
def test_fingerprint_level(self):
km = self._key_manager()
- km.put_raw_key(PUBLIC_KEY, OpenPGPKey)
- km.put_raw_key(UNRELATED_KEY, OpenPGPKey,
+ km.put_raw_key(PUBLIC_KEY, OpenPGPKey, ADDRESS)
+ km.put_raw_key(UNRELATED_KEY, OpenPGPKey, ADDRESS,
validation=ValidationLevel.Fingerprint)
key = km.get_key(ADDRESS, OpenPGPKey, fetch_remote=False)
self.assertEqual(key.fingerprint, UNRELATED_FINGERPRINT)
def test_expired_key(self):
km = self._key_manager()
- km.put_raw_key(EXPIRED_KEY, OpenPGPKey)
- km.put_raw_key(UNRELATED_KEY, OpenPGPKey)
+ km.put_raw_key(EXPIRED_KEY, OpenPGPKey, ADDRESS)
+ km.put_raw_key(UNRELATED_KEY, OpenPGPKey, ADDRESS)
key = km.get_key(ADDRESS, OpenPGPKey, fetch_remote=False)
self.assertEqual(key.fingerprint, UNRELATED_FINGERPRINT)
def test_expired_fail_lower_level(self):
km = self._key_manager()
- km.put_raw_key(EXPIRED_KEY, OpenPGPKey,
+ km.put_raw_key(EXPIRED_KEY, OpenPGPKey, ADDRESS,
validation=ValidationLevel.Third_Party_Endorsement)
self.assertRaises(
KeyNotValidUpgrade,
km.put_raw_key,
UNRELATED_KEY,
OpenPGPKey,
+ ADDRESS,
validation=ValidationLevel.Provider_Trust)
def test_roll_back(self):
km = self._key_manager()
- km.put_raw_key(EXPIRED_KEY_UPDATED, OpenPGPKey)
- km.put_raw_key(EXPIRED_KEY, OpenPGPKey)
+ km.put_raw_key(EXPIRED_KEY_UPDATED, OpenPGPKey, ADDRESS)
+ km.put_raw_key(EXPIRED_KEY, OpenPGPKey, ADDRESS)
key = km.get_key(ADDRESS, OpenPGPKey, fetch_remote=False)
self.assertEqual(key.expiry_date, EXPIRED_KEY_NEW_EXPIRY_DATE)
diff --git a/keymanager/src/leap/keymanager/validation.py b/keymanager/src/leap/keymanager/validation.py
index cf5b4a83..245013e5 100644
--- a/keymanager/src/leap/keymanager/validation.py
+++ b/keymanager/src/leap/keymanager/validation.py
@@ -67,10 +67,6 @@ def can_upgrade(new_key, old_key):
if old_key is None:
return True
- if new_key.address != old_key.address:
- # XXX how do we map multiple IDs? (#6212)
- return False
-
# An update of the same key
if new_key.fingerprint == old_key.fingerprint:
return True