summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorRuben Pollan <meskio@sindominio.net>2016-05-01 15:23:15 -0300
committerRuben Pollan <meskio@sindominio.net>2016-06-01 18:49:15 +0200
commit4612ba27e83e478baac5a5f8b3037608774934d6 (patch)
treeba396dbd33acfaae5db83ca9890c729df068d41f
parente7583ac12e0d594bb0be648b62a9d58eff3bf631 (diff)
[feat] remove the keytype support
- Resolves: #8031
-rw-r--r--changes/next-changelog.txt2
-rw-r--r--src/leap/keymanager/__init__.py161
-rw-r--r--src/leap/keymanager/tests/__init__.py5
-rw-r--r--src/leap/keymanager/tests/test_keymanager.py147
-rw-r--r--src/leap/keymanager/tests/test_validation.py94
5 files changed, 154 insertions, 255 deletions
diff --git a/changes/next-changelog.txt b/changes/next-changelog.txt
index 09b2e71..7fad645 100644
--- a/changes/next-changelog.txt
+++ b/changes/next-changelog.txt
@@ -10,6 +10,8 @@ I've added a new category `Misc` so we can track doc/style/packaging stuff.
Features
~~~~~~~~
+- `#8031 <https://leap.se/code/issues/8031>`_: Remove support for multiple key types.
+
- `#1234 <https://leap.se/code/issues/1234>`_: Description of the new feature corresponding with issue #1234.
- New feature without related issue number.
diff --git a/src/leap/keymanager/__init__.py b/src/leap/keymanager/__init__.py
index aa0a9ac..f72f403 100644
--- a/src/leap/keymanager/__init__.py
+++ b/src/leap/keymanager/__init__.py
@@ -71,9 +71,7 @@ from leap.common.decorators import memoized_method
from leap.keymanager.errors import (
KeyNotFound,
- KeyAddressMismatch,
KeyNotValidUpgrade,
- UnsupportedKeyTypeError,
InvalidSignature
)
from leap.keymanager.validation import ValidationLevels, can_upgrade
@@ -138,11 +136,7 @@ class KeyManager(object):
self.api_uri = api_uri
self.api_version = api_version
self.uid = uid
- # a dict to map key types to their handlers
- self._wrapper_map = {
- OpenPGPKey: OpenPGPScheme(soledad, gpgbinary=gpgbinary),
- # other types of key will be added to this mapper.
- }
+ self._openpgp = OpenPGPScheme(soledad, gpgbinary=gpgbinary)
# the following are used to perform https requests
self._fetcher = requests
self._combined_ca_bundle = self._create_combined_bundle_file()
@@ -184,20 +178,6 @@ class KeyManager(object):
return tmp_file.name
- def _key_class_from_type(self, ktype):
- """
- Given a class type, return a class
-
- :param ktype: string representation of a class name
- :type ktype: str
-
- :return: A class with the matching name
- :rtype: classobj or type
- """
- return filter(
- lambda klass: klass.__name__ == ktype,
- self._wrapper_map).pop()
-
@defer.inlineCallbacks
def _get_key_from_nicknym(self, address):
"""
@@ -349,7 +329,6 @@ class KeyManager(object):
yield self.put_raw_key(
server_keys['openpgp'],
- OpenPGPKey,
address=address,
validation=validation_level)
@@ -357,24 +336,19 @@ class KeyManager(object):
# key management
#
- def send_key(self, ktype):
+ def send_key(self):
"""
- Send user's key of type ktype to provider.
+ Send user's key to provider.
Public key bound to user's is sent to provider, which will sign it and
replace any prior keys for the same address in its database.
- :param ktype: The type of the key.
- :type ktype: subclass of EncryptionKey
-
:return: A Deferred which fires when the key is sent, or which fails
with KeyNotFound if the key was not found in local database.
:rtype: Deferred
:raise UnsupportedKeyTypeError: if invalid key type
"""
- self._assert_supported_key_type(ktype)
-
def send(pubkey):
data = {
self.PUBKEY_KEY: pubkey.key_data
@@ -390,40 +364,33 @@ class KeyManager(object):
return d
d = self.get_key(
- self._address, ktype, private=False, fetch_remote=False)
+ self._address, private=False, fetch_remote=False)
d.addCallback(send)
return d
- def get_key(self, address, ktype, private=False, fetch_remote=True):
+ def get_key(self, address, private=False, fetch_remote=True):
"""
- Return a key of type ktype bound to address.
+ Return a key bound to address.
First, search for the key in local storage. If it is not available,
then try to fetch from nickserver.
:param address: The address bound to the key.
:type address: str
- :param ktype: The type of the key.
- :type ktype: subclass of EncryptionKey
:param private: Look for a private key instead of a public one?
:type private: bool
:param fetch_remote: If key not found in local storage try to fetch
from nickserver
:type fetch_remote: bool
- :return: A Deferred which fires with an EncryptionKey of type ktype
- bound to address, or which fails with KeyNotFound if no key
- was found neither locally or in keyserver.
+ :return: A Deferred which fires with an EncryptionKey bound to address,
+ or which fails with KeyNotFound if no key was found neither
+ locally or in keyserver.
:rtype: Deferred
:raise UnsupportedKeyTypeError: if invalid key type
"""
- self._assert_supported_key_type(ktype)
logger.debug("getting key for %s" % (address,))
- leap_assert(
- ktype in self._wrapper_map,
- 'Unkown key type: %s.' % str(ktype))
- _keys = self._wrapper_map[ktype]
emit_async(catalog.KEYMANAGER_LOOKING_FOR_KEY, address)
@@ -445,12 +412,12 @@ class KeyManager(object):
emit_async(catalog.KEYMANAGER_LOOKING_FOR_KEY, address)
d = self._fetch_keys_from_server(address)
d.addCallback(
- lambda _: _keys.get_key(address, private=False))
+ lambda _: self._openpgp.get_key(address, private=False))
d.addCallback(key_found)
return d
# return key if it exists in local database
- d = _keys.get_key(address, private=private)
+ d = self._openpgp.get_key(address, private=private)
d.addCallbacks(key_found, key_not_found)
return d
@@ -467,9 +434,7 @@ class KeyManager(object):
# TODO: should it be based on activedocs?
def build_keys(docs):
return map(
- lambda doc: build_key_from_dict(
- self._key_class_from_type(doc.content['type']),
- doc.content),
+ lambda doc: build_key_from_dict(OpenPGPKey, doc.content),
docs)
# XXX: there is no check that the soledad indexes are ready, as it
@@ -484,20 +449,15 @@ class KeyManager(object):
d.addCallback(build_keys)
return d
- def gen_key(self, ktype):
+ def gen_key(self):
"""
- Generate a key of type ktype bound to the user's address.
-
- :param ktype: The type of the key.
- :type ktype: subclass of EncryptionKey
+ Generate a key bound to the user's address.
:return: A Deferred which fires with the generated EncryptionKey.
:rtype: Deferred
:raise UnsupportedKeyTypeError: if invalid key type
"""
- self._assert_supported_key_type(ktype)
- _keys = self._wrapper_map[ktype]
def signal_finished(key):
emit_async(
@@ -506,7 +466,7 @@ class KeyManager(object):
emit_async(catalog.KEYMANAGER_STARTED_KEY_GENERATION, self._address)
- d = _keys.gen_key(self._address)
+ d = self.openpg.gen_key(self._address)
d.addCallback(signal_finished)
return d
@@ -564,7 +524,7 @@ class KeyManager(object):
# encrypt/decrypt and sign/verify API
#
- def encrypt(self, data, address, ktype, passphrase=None, sign=None,
+ def encrypt(self, data, address, passphrase=None, sign=None,
cipher_algo='AES256', fetch_remote=True):
"""
Encrypt data with the public key bound to address and sign with with
@@ -574,8 +534,6 @@ class KeyManager(object):
:type data: str
:param address: The address to encrypt it for.
:type address: str
- :param ktype: The type of the key.
- :type ktype: subclass of EncryptionKey
:param passphrase: The passphrase for the secret key used for the
signature.
:type passphrase: str
@@ -595,30 +553,28 @@ class KeyManager(object):
:raise UnsupportedKeyTypeError: if invalid key type
"""
- self._assert_supported_key_type(ktype)
- _keys = self._wrapper_map[ktype]
@defer.inlineCallbacks
def encrypt(keys):
pubkey, signkey = keys
- encrypted = yield _keys.encrypt(
+ encrypted = yield self._openpgp.encrypt(
data, pubkey, passphrase, sign=signkey,
cipher_algo=cipher_algo)
if not pubkey.encr_used:
pubkey.encr_used = True
- yield _keys.put_key(pubkey)
+ yield self._openpgp.put_key(pubkey)
defer.returnValue(encrypted)
- dpub = self.get_key(address, ktype, private=False,
+ dpub = self.get_key(address, private=False,
fetch_remote=fetch_remote)
dpriv = defer.succeed(None)
if sign is not None:
- dpriv = self.get_key(sign, ktype, private=True)
+ dpriv = self.get_key(sign, private=True)
d = defer.gatherResults([dpub, dpriv], consumeErrors=True)
d.addCallbacks(encrypt, self._extract_first_error)
return d
- def decrypt(self, data, address, ktype, passphrase=None, verify=None,
+ def decrypt(self, data, address, passphrase=None, verify=None,
fetch_remote=True):
"""
Decrypt data using private key from address and verify with public key
@@ -628,8 +584,6 @@ class KeyManager(object):
:type data: str
:param address: The address to whom data was encrypted.
:type address: str
- :param ktype: The type of the key.
- :type ktype: subclass of EncryptionKey
:param passphrase: The passphrase for the secret key used for
decryption.
:type passphrase: str
@@ -649,13 +603,11 @@ class KeyManager(object):
:raise UnsupportedKeyTypeError: if invalid key type
"""
- self._assert_supported_key_type(ktype)
- _keys = self._wrapper_map[ktype]
@defer.inlineCallbacks
def decrypt(keys):
pubkey, privkey = keys
- decrypted, signed = yield _keys.decrypt(
+ decrypted, signed = yield self._openpgp.decrypt(
data, privkey, passphrase=passphrase, verify=pubkey)
if pubkey is None:
signature = KeyNotFound(verify)
@@ -663,7 +615,7 @@ class KeyManager(object):
signature = pubkey
if not pubkey.sign_used:
pubkey.sign_used = True
- yield _keys.put_key(pubkey)
+ yield self._openpgp.put_key(pubkey)
defer.returnValue((decrypted, signature))
else:
signature = InvalidSignature(
@@ -671,10 +623,10 @@ class KeyManager(object):
(pubkey.fingerprint,))
defer.returnValue((decrypted, signature))
- dpriv = self.get_key(address, ktype, private=True)
+ dpriv = self.get_key(address, private=True)
dpub = defer.succeed(None)
if verify is not None:
- dpub = self.get_key(verify, ktype, private=False,
+ dpub = self.get_key(verify, private=False,
fetch_remote=fetch_remote)
dpub.addErrback(lambda f: None if f.check(KeyNotFound) else f)
d = defer.gatherResults([dpub, dpriv], consumeErrors=True)
@@ -684,7 +636,7 @@ class KeyManager(object):
def _extract_first_error(self, failure):
return failure.value.subFailure
- def sign(self, data, address, ktype, digest_algo='SHA512', clearsign=False,
+ def sign(self, data, address, digest_algo='SHA512', clearsign=False,
detach=True, binary=False):
"""
Sign data with private key bound to address.
@@ -693,8 +645,6 @@ class KeyManager(object):
:type data: str
:param address: The address to be used to sign.
:type address: EncryptionKey
- :param ktype: The type of the key.
- :type ktype: subclass of EncryptionKey
:param digest_algo: The hash digest to use.
:type digest_algo: str
:param clearsign: If True, create a cleartext signature.
@@ -712,19 +662,17 @@ class KeyManager(object):
:raise UnsupportedKeyTypeError: if invalid key type
"""
- self._assert_supported_key_type(ktype)
- _keys = self._wrapper_map[ktype]
def sign(privkey):
- return _keys.sign(
+ return self._openpgp.sign(
data, privkey, digest_algo=digest_algo, clearsign=clearsign,
detach=detach, binary=binary)
- d = self.get_key(address, ktype, private=True)
+ d = self.get_key(address, private=True)
d.addCallback(sign)
return d
- def verify(self, data, address, ktype, detached_sig=None,
+ def verify(self, data, address, detached_sig=None,
fetch_remote=True):
"""
Verify signed data with private key bound to address, eventually using
@@ -734,8 +682,6 @@ class KeyManager(object):
:type data: str
:param address: The address to be used to verify.
:type address: EncryptionKey
- :param ktype: The type of the key.
- :type ktype: subclass of EncryptionKey
:param detached_sig: A detached signature. If given, C{data} is
verified using this detached signature.
:type detached_sig: str
@@ -751,16 +697,14 @@ class KeyManager(object):
:raise UnsupportedKeyTypeError: if invalid key type
"""
- self._assert_supported_key_type(ktype)
- _keys = self._wrapper_map[ktype]
def verify(pubkey):
- signed = _keys.verify(
+ signed = self._openpgp.verify(
data, pubkey, detached_sig=detached_sig)
if signed:
if not pubkey.sign_used:
pubkey.sign_used = True
- d = _keys.put_key(pubkey)
+ d = self._openpgp.put_key(pubkey)
d.addCallback(lambda _: pubkey)
return d
return pubkey
@@ -769,7 +713,7 @@ class KeyManager(object):
'Failed to verify signature with key %s' %
(pubkey.fingerprint,))
- d = self.get_key(address, ktype, private=False,
+ d = self.get_key(address, private=False,
fetch_remote=fetch_remote)
d.addCallback(verify)
return d
@@ -787,9 +731,7 @@ class KeyManager(object):
:raise UnsupportedKeyTypeError: if invalid key type
"""
- self._assert_supported_key_type(type(key))
- _keys = self._wrapper_map[type(key)]
- return _keys.delete_key(key)
+ return self._openpgp.delete_key(key)
def put_key(self, key):
"""
@@ -805,9 +747,6 @@ class KeyManager(object):
:raise UnsupportedKeyTypeError: if invalid key type
"""
- ktype = type(key)
- self._assert_supported_key_type(ktype)
- _keys = self._wrapper_map[ktype]
def old_key_not_found(failure):
if failure.check(KeyNotFound):
@@ -817,26 +756,24 @@ class KeyManager(object):
def check_upgrade(old_key):
if key.private or can_upgrade(key, old_key):
- return _keys.put_key(key)
+ return self._openpgp.put_key(key)
else:
raise KeyNotValidUpgrade(
"Key %s can not be upgraded by new key %s"
% (old_key.fingerprint, key.fingerprint))
- d = _keys.get_key(key.address, private=key.private)
+ d = self._openpgp.get_key(key.address, private=key.private)
d.addErrback(old_key_not_found)
d.addCallback(check_upgrade)
return d
- def put_raw_key(self, key, ktype, address,
+ def put_raw_key(self, key, address,
validation=ValidationLevels.Weak_Chain):
"""
Put raw key bound to address in local storage.
:param key: The ascii key to be stored
:type key: str
- :param ktype: the type of the key.
- :type ktype: subclass of EncryptionKey
:param address: address for which this key will be active
:type address: str
:param validation: validation level for this key
@@ -853,10 +790,7 @@ class KeyManager(object):
:raise UnsupportedKeyTypeError: if invalid key type
"""
- self._assert_supported_key_type(ktype)
- _keys = self._wrapper_map[ktype]
-
- pubkey, privkey = _keys.parse_key(key, address)
+ pubkey, privkey = self._openpgp.parse_key(key, address)
if pubkey is None:
return defer.fail(KeyNotFound(key))
@@ -868,8 +802,7 @@ class KeyManager(object):
return d
@defer.inlineCallbacks
- def fetch_key(self, address, uri, ktype,
- validation=ValidationLevels.Weak_Chain):
+ def fetch_key(self, address, uri, validation=ValidationLevels.Weak_Chain):
"""
Fetch a public key bound to address from the network and put it in
local storage.
@@ -878,8 +811,6 @@ class KeyManager(object):
:type address: str
:param uri: The URI of the key.
:type uri: str
- :param ktype: the type of the key.
- :type ktype: subclass of EncryptionKey
:param validation: validation level for this key
(default: 'Weak_Chain')
:type validation: ValidationLevels
@@ -893,32 +824,18 @@ class KeyManager(object):
:raise UnsupportedKeyTypeError: if invalid key type
"""
- self._assert_supported_key_type(ktype)
- _keys = self._wrapper_map[ktype]
logger.info("Fetch key for %s from %s" % (address, uri))
ascii_content = yield self._get_with_combined_ca_bundle(uri)
# XXX parse binary keys
- pubkey, _ = _keys.parse_key(ascii_content, address)
+ pubkey, _ = self._openpgp.parse_key(ascii_content, address)
if pubkey is None:
raise KeyNotFound(uri)
pubkey.validation = validation
yield self.put_key(pubkey)
- def _assert_supported_key_type(self, ktype):
- """
- Check if ktype is one of the supported key types
-
- :param ktype: the type of the key.
- :type ktype: subclass of EncryptionKey
-
- :raise UnsupportedKeyTypeError: if invalid key type
- """
- if ktype not in self._wrapper_map:
- raise UnsupportedKeyTypeError(str(ktype))
-
def _split_email(address):
"""
diff --git a/src/leap/keymanager/tests/__init__.py b/src/leap/keymanager/tests/__init__.py
index 2a6a3f1..a20e1fd 100644
--- a/src/leap/keymanager/tests/__init__.py
+++ b/src/leap/keymanager/tests/__init__.py
@@ -27,7 +27,6 @@ from twisted.trial import unittest
from leap.common.testing.basetest import BaseLeapTest
from leap.soledad.client import Soledad
from leap.keymanager import KeyManager
-from leap.keymanager.openpgp import OpenPGPKey
PATH = os.path.dirname(os.path.realpath(__file__))
@@ -56,7 +55,7 @@ class KeyManagerWithSoledadTestCase(unittest.TestCase, BaseLeapTest):
km = self._key_manager()
# wait for the indexes to be ready for the tear down
- d = km._wrapper_map[OpenPGPKey].deferred_init
+ d = km._openpgp.deferred_init
d.addCallback(lambda _: self.delete_all_keys(km))
d.addCallback(lambda _: self.tearDownEnv())
d.addCallback(lambda _: self._soledad.close())
@@ -66,7 +65,7 @@ class KeyManagerWithSoledadTestCase(unittest.TestCase, BaseLeapTest):
def delete_keys(keys):
deferreds = []
for key in keys:
- d = km._wrapper_map[key.__class__].delete_key(key)
+ d = km._openpgp.delete_key(key)
deferreds.append(d)
return gatherResults(deferreds)
diff --git a/src/leap/keymanager/tests/test_keymanager.py b/src/leap/keymanager/tests/test_keymanager.py
index 14f47f6..c55a3c3 100644
--- a/src/leap/keymanager/tests/test_keymanager.py
+++ b/src/leap/keymanager/tests/test_keymanager.py
@@ -34,13 +34,9 @@ from twisted.web._responses import NOT_FOUND
from leap.keymanager import client
-from leap.keymanager import (
- KeyNotFound,
- KeyAddressMismatch,
- errors
-)
-from leap.keymanager.openpgp import OpenPGPKey
+from leap.keymanager import errors
from leap.keymanager.keys import (
+ OpenPGPKey,
is_address,
build_key_from_dict,
)
@@ -140,7 +136,7 @@ class KeyManagerKeyManagementTestCase(KeyManagerWithSoledadTestCase):
@defer.inlineCallbacks
def test_get_all_keys_in_db(self):
km = self._key_manager()
- yield km._wrapper_map[OpenPGPKey].put_raw_key(PRIVATE_KEY, ADDRESS)
+ yield km._openpgp.put_raw_key(PRIVATE_KEY, ADDRESS)
# get public keys
keys = yield km.get_all_keys(False)
self.assertEqual(len(keys), 1, 'Wrong number of keys')
@@ -155,10 +151,9 @@ class KeyManagerKeyManagementTestCase(KeyManagerWithSoledadTestCase):
@defer.inlineCallbacks
def test_get_public_key(self):
km = self._key_manager()
- yield km._wrapper_map[OpenPGPKey].put_raw_key(PRIVATE_KEY, ADDRESS)
+ yield km._openpgp.put_raw_key(PRIVATE_KEY, ADDRESS)
# get the key
- key = yield km.get_key(ADDRESS, OpenPGPKey, private=False,
- fetch_remote=False)
+ key = yield km.get_key(ADDRESS, private=False, fetch_remote=False)
self.assertTrue(key is not None)
self.assertTrue(ADDRESS in key.uids)
self.assertEqual(
@@ -168,10 +163,9 @@ class KeyManagerKeyManagementTestCase(KeyManagerWithSoledadTestCase):
@defer.inlineCallbacks
def test_get_public_key_with_binary_private_key(self):
km = self._key_manager()
- yield km._wrapper_map[OpenPGPKey].put_raw_key(self.get_private_binary_key(), ADDRESS)
+ yield km._openpgp.put_raw_key(self.get_private_binary_key(), ADDRESS)
# get the key
- key = yield km.get_key(ADDRESS, OpenPGPKey, private=False,
- fetch_remote=False)
+ key = yield km.get_key(ADDRESS, private=False, fetch_remote=False)
self.assertTrue(key is not None)
self.assertTrue(ADDRESS in key.uids)
self.assertEqual(
@@ -181,10 +175,9 @@ class KeyManagerKeyManagementTestCase(KeyManagerWithSoledadTestCase):
@defer.inlineCallbacks
def test_get_private_key(self):
km = self._key_manager()
- yield km._wrapper_map[OpenPGPKey].put_raw_key(PRIVATE_KEY, ADDRESS)
+ yield km._openpgp.put_raw_key(PRIVATE_KEY, ADDRESS)
# get the key
- key = yield km.get_key(ADDRESS, OpenPGPKey, private=True,
- fetch_remote=False)
+ key = yield km.get_key(ADDRESS, private=True, fetch_remote=False)
self.assertTrue(key is not None)
self.assertTrue(ADDRESS in key.uids)
self.assertEqual(
@@ -193,8 +186,8 @@ class KeyManagerKeyManagementTestCase(KeyManagerWithSoledadTestCase):
def test_send_key_raises_key_not_found(self):
km = self._key_manager()
- d = km.send_key(OpenPGPKey)
- return self.assertFailure(d, KeyNotFound)
+ d = km.send_key()
+ return self.assertFailure(d, errors.KeyNotFound)
@defer.inlineCallbacks
def test_send_key(self):
@@ -203,7 +196,7 @@ class KeyManagerKeyManagementTestCase(KeyManagerWithSoledadTestCase):
"""
token = "mytoken"
km = self._key_manager(token=token)
- yield km._wrapper_map[OpenPGPKey].put_raw_key(PUBLIC_KEY, ADDRESS)
+ yield km._openpgp.put_raw_key(PUBLIC_KEY, ADDRESS)
km._async_client_pinned.request = Mock(return_value=defer.succeed(''))
# the following data will be used on the send
km.ca_cert_path = 'capath'
@@ -211,9 +204,9 @@ class KeyManagerKeyManagementTestCase(KeyManagerWithSoledadTestCase):
km.uid = 'myuid'
km.api_uri = 'apiuri'
km.api_version = 'apiver'
- yield km.send_key(OpenPGPKey)
+ yield km.send_key()
# setup expected args
- pubkey = yield km.get_key(km._address, OpenPGPKey)
+ pubkey = yield km.get_key(km._address)
data = urllib.urlencode({
km.PUBKEY_KEY: pubkey.key_data,
})
@@ -312,10 +305,10 @@ class KeyManagerKeyManagementTestCase(KeyManagerWithSoledadTestCase):
km._async_client_pinned.request = Mock(return_value=defer.succeed(None))
km.ca_cert_path = 'cacertpath'
# try to key get without fetching from server
- d_fail = km.get_key(address, OpenPGPKey, fetch_remote=False)
- d = self.assertFailure(d_fail, KeyNotFound)
+ d_fail = km.get_key(address, fetch_remote=False)
+ d = self.assertFailure(d_fail, errors.KeyNotFound)
# try to get key fetching from server.
- d.addCallback(lambda _: km.get_key(address, OpenPGPKey))
+ d.addCallback(lambda _: km.get_key(address))
return d
@defer.inlineCallbacks
@@ -325,8 +318,8 @@ class KeyManagerKeyManagementTestCase(KeyManagerWithSoledadTestCase):
"""
km = self._key_manager(url=NICKSERVER_URI)
- yield km.put_raw_key(PUBLIC_KEY, OpenPGPKey, ADDRESS)
- key = yield km.get_key(ADDRESS, OpenPGPKey)
+ yield km.put_raw_key(PUBLIC_KEY, ADDRESS)
+ key = yield km.get_key(ADDRESS)
self.assertIsInstance(key, OpenPGPKey)
self.assertTrue(ADDRESS in key.uids)
@@ -337,8 +330,8 @@ class KeyManagerKeyManagementTestCase(KeyManagerWithSoledadTestCase):
"""
km = self._key_manager(url=NICKSERVER_URI)
- yield km.put_raw_key(self.get_public_binary_key(), OpenPGPKey, ADDRESS)
- key = yield km.get_key(ADDRESS, OpenPGPKey)
+ yield km.put_raw_key(self.get_public_binary_key(), ADDRESS)
+ key = yield km.get_key(ADDRESS)
self.assertIsInstance(key, OpenPGPKey)
self.assertTrue(ADDRESS in key.uids)
@@ -353,8 +346,8 @@ class KeyManagerKeyManagementTestCase(KeyManagerWithSoledadTestCase):
km._async_client.request = Mock(return_value=defer.succeed(PUBLIC_KEY))
- yield km.fetch_key(ADDRESS, "http://site.domain/key", OpenPGPKey)
- key = yield km.get_key(ADDRESS, OpenPGPKey)
+ yield km.fetch_key(ADDRESS, "http://site.domain/key")
+ key = yield km.get_key(ADDRESS)
self.assertEqual(KEY_FINGERPRINT, key.fingerprint)
@defer.inlineCallbacks
@@ -365,10 +358,11 @@ class KeyManagerKeyManagementTestCase(KeyManagerWithSoledadTestCase):
"""
km = self._key_manager()
- km._async_client.request = Mock(return_value=defer.succeed(self.get_public_binary_key()))
+ km._async_client.request = Mock(
+ return_value=defer.succeed(self.get_public_binary_key()))
- yield km.fetch_key(ADDRESS, "http://site.domain/key", OpenPGPKey)
- key = yield km.get_key(ADDRESS, OpenPGPKey)
+ yield km.fetch_key(ADDRESS, "http://site.domain/key")
+ key = yield km.get_key(ADDRESS)
self.assertEqual(KEY_FINGERPRINT, key.fingerprint)
def test_fetch_uri_empty_key(self):
@@ -382,8 +376,8 @@ class KeyManagerKeyManagementTestCase(KeyManagerWithSoledadTestCase):
content = ""
km._fetcher.get = Mock(return_value=Response())
- d = km.fetch_key(ADDRESS, "http://site.domain/key", OpenPGPKey)
- return self.assertFailure(d, KeyNotFound)
+ d = km.fetch_key(ADDRESS, "http://site.domain/key")
+ return self.assertFailure(d, errors.KeyNotFound)
def test_fetch_uri_address_differ(self):
"""
@@ -393,8 +387,8 @@ class KeyManagerKeyManagementTestCase(KeyManagerWithSoledadTestCase):
km = self._key_manager()
km._async_client.request = Mock(return_value=defer.succeed(PUBLIC_KEY))
- d = km.fetch_key(ADDRESS_2, "http://site.domain/key", OpenPGPKey)
- return self.assertFailure(d, KeyAddressMismatch)
+ d = km.fetch_key(ADDRESS_2, "http://site.domain/key")
+ return self.assertFailure(d, errors.KeyAddressMismatch)
def _mock_get_response(self, km, body):
km._async_client.request = MagicMock(return_value=defer.succeed(body))
@@ -407,7 +401,7 @@ class KeyManagerKeyManagementTestCase(KeyManagerWithSoledadTestCase):
km = self._key_manager(ca_cert_path=ca_cert_path)
get_mock = self._mock_get_response(km, PUBLIC_KEY_OTHER)
- yield km.fetch_key(ADDRESS_OTHER, REMOTE_KEY_URL, OpenPGPKey)
+ yield km.fetch_key(ADDRESS_OTHER, REMOTE_KEY_URL)
get_mock.assert_called_once_with(REMOTE_KEY_URL, 'GET')
@@ -417,7 +411,7 @@ class KeyManagerKeyManagementTestCase(KeyManagerWithSoledadTestCase):
km = self._key_manager(ca_cert_path=ca_cert_path)
get_mock = self._mock_get_response(km, PUBLIC_KEY_OTHER)
- yield km.fetch_key(ADDRESS_OTHER, REMOTE_KEY_URL, OpenPGPKey)
+ yield km.fetch_key(ADDRESS_OTHER, REMOTE_KEY_URL)
get_mock.assert_called_once_with(REMOTE_KEY_URL, 'GET')
@@ -427,7 +421,7 @@ class KeyManagerKeyManagementTestCase(KeyManagerWithSoledadTestCase):
km = self._key_manager(ca_cert_path=ca_cert_path)
get_mock = self._mock_get_response(km, PUBLIC_KEY_OTHER)
- yield km.fetch_key(ADDRESS_OTHER, REMOTE_KEY_URL, OpenPGPKey)
+ yield km.fetch_key(ADDRESS_OTHER, REMOTE_KEY_URL)
get_mock.assert_called_once_with(REMOTE_KEY_URL, 'GET')
@@ -445,7 +439,7 @@ class KeyManagerKeyManagementTestCase(KeyManagerWithSoledadTestCase):
km = self._key_manager(ca_cert_path=ca_cert_path)
get_mock = self._mock_get_response(km, PUBLIC_KEY_OTHER)
- yield km.fetch_key(ADDRESS_OTHER, REMOTE_KEY_URL, OpenPGPKey)
+ yield km.fetch_key(ADDRESS_OTHER, REMOTE_KEY_URL)
# assert that combined bundle file is passed to get call
get_mock.assert_called_once_with(REMOTE_KEY_URL, 'GET')
@@ -470,16 +464,15 @@ class KeyManagerKeyManagementTestCase(KeyManagerWithSoledadTestCase):
def test_decrypt_updates_sign_used_for_signer(self):
# given
km = self._key_manager()
- yield km._wrapper_map[OpenPGPKey].put_raw_key(PRIVATE_KEY, ADDRESS)
- yield km._wrapper_map[OpenPGPKey].put_raw_key(
- PRIVATE_KEY_2, ADDRESS_2)
- encdata = yield km.encrypt('data', ADDRESS, OpenPGPKey,
- sign=ADDRESS_2, fetch_remote=False)
+ yield km._openpgp.put_raw_key(PRIVATE_KEY, ADDRESS)
+ yield km._openpgp.put_raw_key(PRIVATE_KEY_2, ADDRESS_2)
+ encdata = yield km.encrypt('data', ADDRESS, sign=ADDRESS_2,
+ fetch_remote=False)
yield km.decrypt(
- encdata, ADDRESS, OpenPGPKey, verify=ADDRESS_2, fetch_remote=False)
+ encdata, ADDRESS, verify=ADDRESS_2, fetch_remote=False)
# when
- key = yield km.get_key(ADDRESS_2, OpenPGPKey, fetch_remote=False)
+ key = yield km.get_key(ADDRESS_2, fetch_remote=False)
# then
self.assertEqual(True, key.sign_used)
@@ -488,18 +481,16 @@ class KeyManagerKeyManagementTestCase(KeyManagerWithSoledadTestCase):
def test_decrypt_does_not_update_sign_used_for_recipient(self):
# given
km = self._key_manager()
- yield km._wrapper_map[OpenPGPKey].put_raw_key(
- PRIVATE_KEY, ADDRESS)
- yield km._wrapper_map[OpenPGPKey].put_raw_key(
- PRIVATE_KEY_2, ADDRESS_2)
- encdata = yield km.encrypt('data', ADDRESS, OpenPGPKey,
- sign=ADDRESS_2, fetch_remote=False)
+ yield km._openpgp.put_raw_key(PRIVATE_KEY, ADDRESS)
+ yield km._openpgp.put_raw_key(PRIVATE_KEY_2, ADDRESS_2)
+ encdata = yield km.encrypt('data', ADDRESS, sign=ADDRESS_2,
+ fetch_remote=False)
yield km.decrypt(
- encdata, ADDRESS, OpenPGPKey, verify=ADDRESS_2, fetch_remote=False)
+ encdata, ADDRESS, verify=ADDRESS_2, fetch_remote=False)
# when
key = yield km.get_key(
- ADDRESS, OpenPGPKey, private=False, fetch_remote=False)
+ ADDRESS, private=False, fetch_remote=False)
# then
self.assertEqual(False, key.sign_used)
@@ -513,35 +504,32 @@ class KeyManagerCryptoTestCase(KeyManagerWithSoledadTestCase):
def test_keymanager_openpgp_encrypt_decrypt(self):
km = self._key_manager()
# put raw private key
- yield km._wrapper_map[OpenPGPKey].put_raw_key(PRIVATE_KEY, ADDRESS)
- yield km._wrapper_map[OpenPGPKey].put_raw_key(
- PRIVATE_KEY_2, ADDRESS_2)
+ yield km._openpgp.put_raw_key(PRIVATE_KEY, ADDRESS)
+ yield km._openpgp.put_raw_key(PRIVATE_KEY_2, ADDRESS_2)
# encrypt
- encdata = yield km.encrypt(self.RAW_DATA, ADDRESS, OpenPGPKey,
- sign=ADDRESS_2, fetch_remote=False)
+ encdata = yield km.encrypt(self.RAW_DATA, ADDRESS, sign=ADDRESS_2,
+ fetch_remote=False)
self.assertNotEqual(self.RAW_DATA, encdata)
# decrypt
rawdata, signingkey = yield km.decrypt(
- encdata, ADDRESS, OpenPGPKey, verify=ADDRESS_2, fetch_remote=False)
+ encdata, ADDRESS, verify=ADDRESS_2, fetch_remote=False)
self.assertEqual(self.RAW_DATA, rawdata)
- key = yield km.get_key(ADDRESS_2, OpenPGPKey, private=False,
- fetch_remote=False)
+ key = yield km.get_key(ADDRESS_2, private=False, fetch_remote=False)
self.assertEqual(signingkey.fingerprint, key.fingerprint)
@defer.inlineCallbacks
def test_keymanager_openpgp_encrypt_decrypt_wrong_sign(self):
km = self._key_manager()
# put raw keys
- yield km._wrapper_map[OpenPGPKey].put_raw_key(PRIVATE_KEY, ADDRESS)
- yield km._wrapper_map[OpenPGPKey].put_raw_key(
- PRIVATE_KEY_2, ADDRESS_2)
+ yield km._openpgp.put_raw_key(PRIVATE_KEY, ADDRESS)
+ yield km._openpgp.put_raw_key(PRIVATE_KEY_2, ADDRESS_2)
# encrypt
- encdata = yield km.encrypt(self.RAW_DATA, ADDRESS, OpenPGPKey,
- sign=ADDRESS_2, fetch_remote=False)
+ encdata = yield km.encrypt(self.RAW_DATA, ADDRESS, sign=ADDRESS_2,
+ fetch_remote=False)
self.assertNotEqual(self.RAW_DATA, encdata)
# verify
rawdata, signingkey = yield km.decrypt(
- encdata, ADDRESS, OpenPGPKey, verify=ADDRESS, fetch_remote=False)
+ encdata, ADDRESS, verify=ADDRESS, fetch_remote=False)
self.assertEqual(self.RAW_DATA, rawdata)
self.assertTrue(isinstance(signingkey, errors.InvalidSignature))
@@ -549,24 +537,21 @@ class KeyManagerCryptoTestCase(KeyManagerWithSoledadTestCase):
def test_keymanager_openpgp_sign_verify(self):
km = self._key_manager()
# put raw private keys
- yield km._wrapper_map[OpenPGPKey].put_raw_key(PRIVATE_KEY, ADDRESS)
- signdata = yield km.sign(self.RAW_DATA, ADDRESS, OpenPGPKey,
- detach=False)
+ yield km._openpgp.put_raw_key(PRIVATE_KEY, ADDRESS)
+ signdata = yield km.sign(self.RAW_DATA, ADDRESS, detach=False)
self.assertNotEqual(self.RAW_DATA, signdata)
# verify
- signingkey = yield km.verify(signdata, ADDRESS, OpenPGPKey,
- fetch_remote=False)
- key = yield km.get_key(ADDRESS, OpenPGPKey, private=False,
- fetch_remote=False)
+ signingkey = yield km.verify(signdata, ADDRESS, fetch_remote=False)
+ key = yield km.get_key(ADDRESS, private=False, fetch_remote=False)
self.assertEqual(signingkey.fingerprint, key.fingerprint)
def test_keymanager_encrypt_key_not_found(self):
km = self._key_manager()
- d = km._wrapper_map[OpenPGPKey].put_raw_key(PRIVATE_KEY, ADDRESS)
+ d = km._openpgp.put_raw_key(PRIVATE_KEY, ADDRESS)
d.addCallback(
- lambda _: km.encrypt(self.RAW_DATA, ADDRESS_2, OpenPGPKey,
- sign=ADDRESS, fetch_remote=False))
- return self.assertFailure(d, KeyNotFound)
+ lambda _: km.encrypt(self.RAW_DATA, ADDRESS_2, sign=ADDRESS,
+ fetch_remote=False))
+ return self.assertFailure(d, errors.KeyNotFound)
if __name__ == "__main__":
import unittest
diff --git a/src/leap/keymanager/tests/test_validation.py b/src/leap/keymanager/tests/test_validation.py
index 44d6e39..954b1df 100644
--- a/src/leap/keymanager/tests/test_validation.py
+++ b/src/leap/keymanager/tests/test_validation.py
@@ -22,7 +22,6 @@ import unittest
from datetime import datetime
from twisted.internet.defer import inlineCallbacks
-from leap.keymanager.openpgp import OpenPGPKey
from leap.keymanager.errors import (
KeyNotValidUpgrade
)
@@ -43,64 +42,62 @@ class ValidationLevelsTestCase(KeyManagerWithSoledadTestCase):
@inlineCallbacks
def test_none_old_key(self):
km = self._key_manager()
- yield km.put_raw_key(PUBLIC_KEY, OpenPGPKey, ADDRESS)
- key = yield km.get_key(ADDRESS, OpenPGPKey, fetch_remote=False)
+ yield km.put_raw_key(PUBLIC_KEY, ADDRESS)
+ key = yield km.get_key(ADDRESS, fetch_remote=False)
self.assertEqual(key.fingerprint, KEY_FINGERPRINT)
@inlineCallbacks
def test_cant_upgrade(self):
km = self._key_manager()
- yield km.put_raw_key(PUBLIC_KEY, OpenPGPKey, ADDRESS,
+ yield km.put_raw_key(PUBLIC_KEY, ADDRESS,
validation=ValidationLevels.Provider_Trust)
- d = km.put_raw_key(UNRELATED_KEY, OpenPGPKey, ADDRESS)
+ d = km.put_raw_key(UNRELATED_KEY, ADDRESS)
yield self.assertFailure(d, KeyNotValidUpgrade)
@inlineCallbacks
def test_fingerprint_level(self):
km = self._key_manager()
- yield km.put_raw_key(PUBLIC_KEY, OpenPGPKey, ADDRESS)
- yield km.put_raw_key(UNRELATED_KEY, OpenPGPKey, ADDRESS,
+ yield km.put_raw_key(PUBLIC_KEY, ADDRESS)
+ yield km.put_raw_key(UNRELATED_KEY, ADDRESS,
validation=ValidationLevels.Fingerprint)
- key = yield km.get_key(ADDRESS, OpenPGPKey, fetch_remote=False)
+ key = yield km.get_key(ADDRESS, fetch_remote=False)
self.assertEqual(key.fingerprint, UNRELATED_FINGERPRINT)
@inlineCallbacks
def test_expired_key(self):
km = self._key_manager()
- yield km.put_raw_key(EXPIRED_KEY, OpenPGPKey, ADDRESS)
- yield km.put_raw_key(UNRELATED_KEY, OpenPGPKey, ADDRESS)
- key = yield km.get_key(ADDRESS, OpenPGPKey, fetch_remote=False)
+ yield km.put_raw_key(EXPIRED_KEY, ADDRESS)
+ yield km.put_raw_key(UNRELATED_KEY, ADDRESS)
+ key = yield km.get_key(ADDRESS, fetch_remote=False)
self.assertEqual(key.fingerprint, UNRELATED_FINGERPRINT)
@inlineCallbacks
def test_expired_fail_lower_level(self):
km = self._key_manager()
yield km.put_raw_key(
- EXPIRED_KEY, OpenPGPKey, ADDRESS,
+ EXPIRED_KEY, ADDRESS,
validation=ValidationLevels.Third_Party_Endorsement)
d = km.put_raw_key(
- UNRELATED_KEY,
- OpenPGPKey,
- ADDRESS,
+ UNRELATED_KEY, ADDRESS,
validation=ValidationLevels.Provider_Trust)
yield self.assertFailure(d, KeyNotValidUpgrade)
@inlineCallbacks
def test_roll_back(self):
km = self._key_manager()
- yield km.put_raw_key(EXPIRED_KEY_UPDATED, OpenPGPKey, ADDRESS)
- yield km.put_raw_key(EXPIRED_KEY, OpenPGPKey, ADDRESS)
- key = yield km.get_key(ADDRESS, OpenPGPKey, fetch_remote=False)
+ yield km.put_raw_key(EXPIRED_KEY_UPDATED, ADDRESS)
+ yield km.put_raw_key(EXPIRED_KEY, ADDRESS)
+ key = yield km.get_key(ADDRESS, fetch_remote=False)
self.assertEqual(key.expiry_date, EXPIRED_KEY_NEW_EXPIRY_DATE)
@inlineCallbacks
def test_not_used(self):
km = self._key_manager()
- yield km.put_raw_key(UNEXPIRED_KEY, OpenPGPKey, ADDRESS,
+ yield km.put_raw_key(UNEXPIRED_KEY, ADDRESS,
validation=ValidationLevels.Provider_Trust)
- yield km.put_raw_key(UNRELATED_KEY, OpenPGPKey, ADDRESS,
+ yield km.put_raw_key(UNRELATED_KEY, ADDRESS,
validation=ValidationLevels.Provider_Endorsement)
- key = yield km.get_key(ADDRESS, OpenPGPKey, fetch_remote=False)
+ key = yield km.get_key(ADDRESS, fetch_remote=False)
self.assertEqual(key.fingerprint, UNRELATED_FINGERPRINT)
@inlineCallbacks
@@ -108,16 +105,16 @@ class ValidationLevelsTestCase(KeyManagerWithSoledadTestCase):
TEXT = "some text"
km = self._key_manager()
- yield km.put_raw_key(UNEXPIRED_PRIVATE, OpenPGPKey, ADDRESS)
- signature = yield km.sign(TEXT, ADDRESS, OpenPGPKey)
+ yield km.put_raw_key(UNEXPIRED_PRIVATE, ADDRESS)
+ signature = yield km.sign(TEXT, ADDRESS)
yield self.delete_all_keys(km)
- yield km.put_raw_key(UNEXPIRED_KEY, OpenPGPKey, ADDRESS)
- yield km.encrypt(TEXT, ADDRESS, OpenPGPKey)
- yield km.verify(TEXT, ADDRESS, OpenPGPKey, detached_sig=signature)
+ yield km.put_raw_key(UNEXPIRED_KEY, ADDRESS)
+ yield km.encrypt(TEXT, ADDRESS)
+ yield km.verify(TEXT, ADDRESS, detached_sig=signature)
d = km.put_raw_key(
- UNRELATED_KEY, OpenPGPKey, ADDRESS,
+ UNRELATED_KEY, ADDRESS,
validation=ValidationLevels.Provider_Endorsement)
yield self.assertFailure(d, KeyNotValidUpgrade)
@@ -126,28 +123,27 @@ class ValidationLevelsTestCase(KeyManagerWithSoledadTestCase):
TEXT = "some text"
km = self._key_manager()
- yield km.put_raw_key(UNEXPIRED_PRIVATE, OpenPGPKey, ADDRESS)
- yield km.put_raw_key(PUBLIC_KEY_2, OpenPGPKey, ADDRESS_2)
- encrypted = yield km.encrypt(TEXT, ADDRESS_2, OpenPGPKey,
- sign=ADDRESS)
+ yield km.put_raw_key(UNEXPIRED_PRIVATE, ADDRESS)
+ yield km.put_raw_key(PUBLIC_KEY_2, ADDRESS_2)
+ encrypted = yield km.encrypt(TEXT, ADDRESS_2, sign=ADDRESS)
yield self.delete_all_keys(km)
- yield km.put_raw_key(UNEXPIRED_KEY, OpenPGPKey, ADDRESS)
- yield km.put_raw_key(PRIVATE_KEY_2, OpenPGPKey, ADDRESS_2)
- yield km.encrypt(TEXT, ADDRESS, OpenPGPKey)
- yield km.decrypt(encrypted, ADDRESS_2, OpenPGPKey, verify=ADDRESS)
+ yield km.put_raw_key(UNEXPIRED_KEY, ADDRESS)
+ yield km.put_raw_key(PRIVATE_KEY_2, ADDRESS_2)
+ yield km.encrypt(TEXT, ADDRESS)
+ yield km.decrypt(encrypted, ADDRESS_2, verify=ADDRESS)
d = km.put_raw_key(
- UNRELATED_KEY, OpenPGPKey, ADDRESS,
+ UNRELATED_KEY, ADDRESS,
validation=ValidationLevels.Provider_Endorsement)
yield self.assertFailure(d, KeyNotValidUpgrade)
@inlineCallbacks
def test_signed_key(self):
km = self._key_manager()
- yield km.put_raw_key(PUBLIC_KEY, OpenPGPKey, ADDRESS)
- yield km.put_raw_key(SIGNED_KEY, OpenPGPKey, ADDRESS)
- key = yield km.get_key(ADDRESS, OpenPGPKey, fetch_remote=False)
+ yield km.put_raw_key(PUBLIC_KEY, ADDRESS)
+ yield km.put_raw_key(SIGNED_KEY, ADDRESS)
+ key = yield km.get_key(ADDRESS, fetch_remote=False)
self.assertEqual(key.fingerprint, SIGNED_FINGERPRINT)
@inlineCallbacks
@@ -155,26 +151,26 @@ class ValidationLevelsTestCase(KeyManagerWithSoledadTestCase):
TEXT = "some text"
km = self._key_manager()
- yield km.put_raw_key(UUIDS_PRIVATE, OpenPGPKey, ADDRESS_2)
- signature = yield km.sign(TEXT, ADDRESS_2, OpenPGPKey)
+ yield km.put_raw_key(UUIDS_PRIVATE, ADDRESS_2)
+ signature = yield km.sign(TEXT, ADDRESS_2)
yield self.delete_all_keys(km)
- yield km.put_raw_key(UUIDS_KEY, OpenPGPKey, ADDRESS_2)
- yield km.put_raw_key(UUIDS_KEY, OpenPGPKey, ADDRESS)
- yield km.encrypt(TEXT, ADDRESS_2, OpenPGPKey)
- yield km.verify(TEXT, ADDRESS_2, OpenPGPKey, detached_sig=signature)
+ yield km.put_raw_key(UUIDS_KEY, ADDRESS_2)
+ yield km.put_raw_key(UUIDS_KEY, ADDRESS)
+ yield km.encrypt(TEXT, ADDRESS_2)
+ yield km.verify(TEXT, ADDRESS_2, detached_sig=signature)
d = km.put_raw_key(
- PUBLIC_KEY_2, OpenPGPKey, ADDRESS_2,
+ PUBLIC_KEY_2, ADDRESS_2,
validation=ValidationLevels.Provider_Endorsement)
yield self.assertFailure(d, KeyNotValidUpgrade)
- key = yield km.get_key(ADDRESS_2, OpenPGPKey, fetch_remote=False)
+ key = yield km.get_key(ADDRESS_2, fetch_remote=False)
self.assertEqual(key.fingerprint, UUIDS_FINGERPRINT)
yield km.put_raw_key(
- PUBLIC_KEY, OpenPGPKey, ADDRESS,
+ PUBLIC_KEY, ADDRESS,
validation=ValidationLevels.Provider_Endorsement)
- key = yield km.get_key(ADDRESS, OpenPGPKey, fetch_remote=False)
+ key = yield km.get_key(ADDRESS, fetch_remote=False)
self.assertEqual(key.fingerprint, KEY_FINGERPRINT)