diff options
| author | Ruben Pollan <meskio@sindominio.net> | 2016-05-01 15:23:15 -0300 | 
|---|---|---|
| committer | Ruben Pollan <meskio@sindominio.net> | 2016-06-01 18:49:15 +0200 | 
| commit | ea0dfc4f873c2a23800b92bc83a952160ce9decc (patch) | |
| tree | 9642a75c3d00a7344789b3ae805e1f17bbdcc2f2 /keymanager | |
| parent | e9d00c34a8b2dc7004d0e90098683fb599108b2e (diff) | |
[feat] remove the keytype support
- Resolves: #8031
Diffstat (limited to 'keymanager')
| -rw-r--r-- | keymanager/changes/next-changelog.txt | 2 | ||||
| -rw-r--r-- | keymanager/src/leap/keymanager/__init__.py | 161 | ||||
| -rw-r--r-- | keymanager/src/leap/keymanager/tests/__init__.py | 5 | ||||
| -rw-r--r-- | keymanager/src/leap/keymanager/tests/test_keymanager.py | 147 | ||||
| -rw-r--r-- | keymanager/src/leap/keymanager/tests/test_validation.py | 94 | 
5 files changed, 154 insertions, 255 deletions
| diff --git a/keymanager/changes/next-changelog.txt b/keymanager/changes/next-changelog.txt index 09b2e71..7fad645 100644 --- a/keymanager/changes/next-changelog.txt +++ b/keymanager/changes/next-changelog.txt @@ -10,6 +10,8 @@ I've added a new category `Misc` so we can track doc/style/packaging stuff.  Features  ~~~~~~~~ +- `#8031 <https://leap.se/code/issues/8031>`_: Remove support for multiple key types. +  - `#1234 <https://leap.se/code/issues/1234>`_: Description of the new feature corresponding with issue #1234.  - New feature without related issue number. diff --git a/keymanager/src/leap/keymanager/__init__.py b/keymanager/src/leap/keymanager/__init__.py index aa0a9ac..f72f403 100644 --- a/keymanager/src/leap/keymanager/__init__.py +++ b/keymanager/src/leap/keymanager/__init__.py @@ -71,9 +71,7 @@ from leap.common.decorators import memoized_method  from leap.keymanager.errors import (      KeyNotFound, -    KeyAddressMismatch,      KeyNotValidUpgrade, -    UnsupportedKeyTypeError,      InvalidSignature  )  from leap.keymanager.validation import ValidationLevels, can_upgrade @@ -138,11 +136,7 @@ class KeyManager(object):          self.api_uri = api_uri          self.api_version = api_version          self.uid = uid -        # a dict to map key types to their handlers -        self._wrapper_map = { -            OpenPGPKey: OpenPGPScheme(soledad, gpgbinary=gpgbinary), -            # other types of key will be added to this mapper. -        } +        self._openpgp = OpenPGPScheme(soledad, gpgbinary=gpgbinary)          # the following are used to perform https requests          self._fetcher = requests          self._combined_ca_bundle = self._create_combined_bundle_file() @@ -184,20 +178,6 @@ class KeyManager(object):          return tmp_file.name -    def _key_class_from_type(self, ktype): -        """ -        Given a class type, return a class - -        :param ktype: string representation of a class name -        :type ktype: str - -        :return: A class with the matching name -        :rtype: classobj or type -        """ -        return filter( -            lambda klass: klass.__name__ == ktype, -            self._wrapper_map).pop() -      @defer.inlineCallbacks      def _get_key_from_nicknym(self, address):          """ @@ -349,7 +329,6 @@ class KeyManager(object):              yield self.put_raw_key(                  server_keys['openpgp'], -                OpenPGPKey,                  address=address,                  validation=validation_level) @@ -357,24 +336,19 @@ class KeyManager(object):      # key management      # -    def send_key(self, ktype): +    def send_key(self):          """ -        Send user's key of type ktype to provider. +        Send user's key to provider.          Public key bound to user's is sent to provider, which will sign it and          replace any prior keys for the same address in its database. -        :param ktype: The type of the key. -        :type ktype: subclass of EncryptionKey -          :return: A Deferred which fires when the key is sent, or which fails                   with KeyNotFound if the key was not found in local database.          :rtype: Deferred          :raise UnsupportedKeyTypeError: if invalid key type          """ -        self._assert_supported_key_type(ktype) -          def send(pubkey):              data = {                  self.PUBKEY_KEY: pubkey.key_data @@ -390,40 +364,33 @@ class KeyManager(object):              return d          d = self.get_key( -            self._address, ktype, private=False, fetch_remote=False) +            self._address, private=False, fetch_remote=False)          d.addCallback(send)          return d -    def get_key(self, address, ktype, private=False, fetch_remote=True): +    def get_key(self, address, private=False, fetch_remote=True):          """ -        Return a key of type ktype bound to address. +        Return a key bound to address.          First, search for the key in local storage. If it is not available,          then try to fetch from nickserver.          :param address: The address bound to the key.          :type address: str -        :param ktype: The type of the key. -        :type ktype: subclass of EncryptionKey          :param private: Look for a private key instead of a public one?          :type private: bool          :param fetch_remote: If key not found in local storage try to fetch                               from nickserver          :type fetch_remote: bool -        :return: A Deferred which fires with an EncryptionKey of type ktype -                 bound to address, or which fails with KeyNotFound if no key -                 was found neither locally or in keyserver. +        :return: A Deferred which fires with an EncryptionKey bound to address, +                 or which fails with KeyNotFound if no key was found neither +                 locally or in keyserver.          :rtype: Deferred          :raise UnsupportedKeyTypeError: if invalid key type          """ -        self._assert_supported_key_type(ktype)          logger.debug("getting key for %s" % (address,)) -        leap_assert( -            ktype in self._wrapper_map, -            'Unkown key type: %s.' % str(ktype)) -        _keys = self._wrapper_map[ktype]          emit_async(catalog.KEYMANAGER_LOOKING_FOR_KEY, address) @@ -445,12 +412,12 @@ class KeyManager(object):              emit_async(catalog.KEYMANAGER_LOOKING_FOR_KEY, address)              d = self._fetch_keys_from_server(address)              d.addCallback( -                lambda _: _keys.get_key(address, private=False)) +                lambda _: self._openpgp.get_key(address, private=False))              d.addCallback(key_found)              return d          # return key if it exists in local database -        d = _keys.get_key(address, private=private) +        d = self._openpgp.get_key(address, private=private)          d.addCallbacks(key_found, key_not_found)          return d @@ -467,9 +434,7 @@ class KeyManager(object):          # TODO: should it be based on activedocs?          def build_keys(docs):              return map( -                lambda doc: build_key_from_dict( -                    self._key_class_from_type(doc.content['type']), -                    doc.content), +                lambda doc: build_key_from_dict(OpenPGPKey, doc.content),                  docs)          # XXX: there is no check that the soledad indexes are ready, as it @@ -484,20 +449,15 @@ class KeyManager(object):          d.addCallback(build_keys)          return d -    def gen_key(self, ktype): +    def gen_key(self):          """ -        Generate a key of type ktype bound to the user's address. - -        :param ktype: The type of the key. -        :type ktype: subclass of EncryptionKey +        Generate a key bound to the user's address.          :return: A Deferred which fires with the generated EncryptionKey.          :rtype: Deferred          :raise UnsupportedKeyTypeError: if invalid key type          """ -        self._assert_supported_key_type(ktype) -        _keys = self._wrapper_map[ktype]          def signal_finished(key):              emit_async( @@ -506,7 +466,7 @@ class KeyManager(object):          emit_async(catalog.KEYMANAGER_STARTED_KEY_GENERATION, self._address) -        d = _keys.gen_key(self._address) +        d = self.openpg.gen_key(self._address)          d.addCallback(signal_finished)          return d @@ -564,7 +524,7 @@ class KeyManager(object):      # encrypt/decrypt and sign/verify API      # -    def encrypt(self, data, address, ktype, passphrase=None, sign=None, +    def encrypt(self, data, address, passphrase=None, sign=None,                  cipher_algo='AES256', fetch_remote=True):          """          Encrypt data with the public key bound to address and sign with with @@ -574,8 +534,6 @@ class KeyManager(object):          :type data: str          :param address: The address to encrypt it for.          :type address: str -        :param ktype: The type of the key. -        :type ktype: subclass of EncryptionKey          :param passphrase: The passphrase for the secret key used for the                             signature.          :type passphrase: str @@ -595,30 +553,28 @@ class KeyManager(object):          :raise UnsupportedKeyTypeError: if invalid key type          """ -        self._assert_supported_key_type(ktype) -        _keys = self._wrapper_map[ktype]          @defer.inlineCallbacks          def encrypt(keys):              pubkey, signkey = keys -            encrypted = yield _keys.encrypt( +            encrypted = yield self._openpgp.encrypt(                  data, pubkey, passphrase, sign=signkey,                  cipher_algo=cipher_algo)              if not pubkey.encr_used:                  pubkey.encr_used = True -                yield _keys.put_key(pubkey) +                yield self._openpgp.put_key(pubkey)              defer.returnValue(encrypted) -        dpub = self.get_key(address, ktype, private=False, +        dpub = self.get_key(address, private=False,                              fetch_remote=fetch_remote)          dpriv = defer.succeed(None)          if sign is not None: -            dpriv = self.get_key(sign, ktype, private=True) +            dpriv = self.get_key(sign, private=True)          d = defer.gatherResults([dpub, dpriv], consumeErrors=True)          d.addCallbacks(encrypt, self._extract_first_error)          return d -    def decrypt(self, data, address, ktype, passphrase=None, verify=None, +    def decrypt(self, data, address, passphrase=None, verify=None,                  fetch_remote=True):          """          Decrypt data using private key from address and verify with public key @@ -628,8 +584,6 @@ class KeyManager(object):          :type data: str          :param address: The address to whom data was encrypted.          :type address: str -        :param ktype: The type of the key. -        :type ktype: subclass of EncryptionKey          :param passphrase: The passphrase for the secret key used for                             decryption.          :type passphrase: str @@ -649,13 +603,11 @@ class KeyManager(object):          :raise UnsupportedKeyTypeError: if invalid key type          """ -        self._assert_supported_key_type(ktype) -        _keys = self._wrapper_map[ktype]          @defer.inlineCallbacks          def decrypt(keys):              pubkey, privkey = keys -            decrypted, signed = yield _keys.decrypt( +            decrypted, signed = yield self._openpgp.decrypt(                  data, privkey, passphrase=passphrase, verify=pubkey)              if pubkey is None:                  signature = KeyNotFound(verify) @@ -663,7 +615,7 @@ class KeyManager(object):                  signature = pubkey                  if not pubkey.sign_used:                      pubkey.sign_used = True -                    yield _keys.put_key(pubkey) +                    yield self._openpgp.put_key(pubkey)                      defer.returnValue((decrypted, signature))              else:                  signature = InvalidSignature( @@ -671,10 +623,10 @@ class KeyManager(object):                      (pubkey.fingerprint,))              defer.returnValue((decrypted, signature)) -        dpriv = self.get_key(address, ktype, private=True) +        dpriv = self.get_key(address, private=True)          dpub = defer.succeed(None)          if verify is not None: -            dpub = self.get_key(verify, ktype, private=False, +            dpub = self.get_key(verify, private=False,                                  fetch_remote=fetch_remote)              dpub.addErrback(lambda f: None if f.check(KeyNotFound) else f)          d = defer.gatherResults([dpub, dpriv], consumeErrors=True) @@ -684,7 +636,7 @@ class KeyManager(object):      def _extract_first_error(self, failure):          return failure.value.subFailure -    def sign(self, data, address, ktype, digest_algo='SHA512', clearsign=False, +    def sign(self, data, address, digest_algo='SHA512', clearsign=False,               detach=True, binary=False):          """          Sign data with private key bound to address. @@ -693,8 +645,6 @@ class KeyManager(object):          :type data: str          :param address: The address to be used to sign.          :type address: EncryptionKey -        :param ktype: The type of the key. -        :type ktype: subclass of EncryptionKey          :param digest_algo: The hash digest to use.          :type digest_algo: str          :param clearsign: If True, create a cleartext signature. @@ -712,19 +662,17 @@ class KeyManager(object):          :raise UnsupportedKeyTypeError: if invalid key type          """ -        self._assert_supported_key_type(ktype) -        _keys = self._wrapper_map[ktype]          def sign(privkey): -            return _keys.sign( +            return self._openpgp.sign(                  data, privkey, digest_algo=digest_algo, clearsign=clearsign,                  detach=detach, binary=binary) -        d = self.get_key(address, ktype, private=True) +        d = self.get_key(address, private=True)          d.addCallback(sign)          return d -    def verify(self, data, address, ktype, detached_sig=None, +    def verify(self, data, address, detached_sig=None,                 fetch_remote=True):          """          Verify signed data with private key bound to address, eventually using @@ -734,8 +682,6 @@ class KeyManager(object):          :type data: str          :param address: The address to be used to verify.          :type address: EncryptionKey -        :param ktype: The type of the key. -        :type ktype: subclass of EncryptionKey          :param detached_sig: A detached signature. If given, C{data} is                               verified using this detached signature.          :type detached_sig: str @@ -751,16 +697,14 @@ class KeyManager(object):          :raise UnsupportedKeyTypeError: if invalid key type          """ -        self._assert_supported_key_type(ktype) -        _keys = self._wrapper_map[ktype]          def verify(pubkey): -            signed = _keys.verify( +            signed = self._openpgp.verify(                  data, pubkey, detached_sig=detached_sig)              if signed:                  if not pubkey.sign_used:                      pubkey.sign_used = True -                    d = _keys.put_key(pubkey) +                    d = self._openpgp.put_key(pubkey)                      d.addCallback(lambda _: pubkey)                      return d                  return pubkey @@ -769,7 +713,7 @@ class KeyManager(object):                      'Failed to verify signature with key %s' %                      (pubkey.fingerprint,)) -        d = self.get_key(address, ktype, private=False, +        d = self.get_key(address, private=False,                           fetch_remote=fetch_remote)          d.addCallback(verify)          return d @@ -787,9 +731,7 @@ class KeyManager(object):          :raise UnsupportedKeyTypeError: if invalid key type          """ -        self._assert_supported_key_type(type(key)) -        _keys = self._wrapper_map[type(key)] -        return _keys.delete_key(key) +        return self._openpgp.delete_key(key)      def put_key(self, key):          """ @@ -805,9 +747,6 @@ class KeyManager(object):          :raise UnsupportedKeyTypeError: if invalid key type          """ -        ktype = type(key) -        self._assert_supported_key_type(ktype) -        _keys = self._wrapper_map[ktype]          def old_key_not_found(failure):              if failure.check(KeyNotFound): @@ -817,26 +756,24 @@ class KeyManager(object):          def check_upgrade(old_key):              if key.private or can_upgrade(key, old_key): -                return _keys.put_key(key) +                return self._openpgp.put_key(key)              else:                  raise KeyNotValidUpgrade(                      "Key %s can not be upgraded by new key %s"                      % (old_key.fingerprint, key.fingerprint)) -        d = _keys.get_key(key.address, private=key.private) +        d = self._openpgp.get_key(key.address, private=key.private)          d.addErrback(old_key_not_found)          d.addCallback(check_upgrade)          return d -    def put_raw_key(self, key, ktype, address, +    def put_raw_key(self, key, address,                      validation=ValidationLevels.Weak_Chain):          """          Put raw key bound to address in local storage.          :param key: The ascii key to be stored          :type key: str -        :param ktype: the type of the key. -        :type ktype: subclass of EncryptionKey          :param address: address for which this key will be active          :type address: str          :param validation: validation level for this key @@ -853,10 +790,7 @@ class KeyManager(object):          :raise UnsupportedKeyTypeError: if invalid key type          """ -        self._assert_supported_key_type(ktype) -        _keys = self._wrapper_map[ktype] - -        pubkey, privkey = _keys.parse_key(key, address) +        pubkey, privkey = self._openpgp.parse_key(key, address)          if pubkey is None:              return defer.fail(KeyNotFound(key)) @@ -868,8 +802,7 @@ class KeyManager(object):          return d      @defer.inlineCallbacks -    def fetch_key(self, address, uri, ktype, -                  validation=ValidationLevels.Weak_Chain): +    def fetch_key(self, address, uri, validation=ValidationLevels.Weak_Chain):          """          Fetch a public key bound to address from the network and put it in          local storage. @@ -878,8 +811,6 @@ class KeyManager(object):          :type address: str          :param uri: The URI of the key.          :type uri: str -        :param ktype: the type of the key. -        :type ktype: subclass of EncryptionKey          :param validation: validation level for this key                             (default: 'Weak_Chain')          :type validation: ValidationLevels @@ -893,32 +824,18 @@ class KeyManager(object):          :raise UnsupportedKeyTypeError: if invalid key type          """ -        self._assert_supported_key_type(ktype) -        _keys = self._wrapper_map[ktype]          logger.info("Fetch key for %s from %s" % (address, uri))          ascii_content = yield self._get_with_combined_ca_bundle(uri)          # XXX parse binary keys -        pubkey, _ = _keys.parse_key(ascii_content, address) +        pubkey, _ = self._openpgp.parse_key(ascii_content, address)          if pubkey is None:              raise KeyNotFound(uri)          pubkey.validation = validation          yield self.put_key(pubkey) -    def _assert_supported_key_type(self, ktype): -        """ -        Check if ktype is one of the supported key types - -        :param ktype: the type of the key. -        :type ktype: subclass of EncryptionKey - -        :raise UnsupportedKeyTypeError: if invalid key type -        """ -        if ktype not in self._wrapper_map: -            raise UnsupportedKeyTypeError(str(ktype)) -  def _split_email(address):      """ diff --git a/keymanager/src/leap/keymanager/tests/__init__.py b/keymanager/src/leap/keymanager/tests/__init__.py index 2a6a3f1..a20e1fd 100644 --- a/keymanager/src/leap/keymanager/tests/__init__.py +++ b/keymanager/src/leap/keymanager/tests/__init__.py @@ -27,7 +27,6 @@ from twisted.trial import unittest  from leap.common.testing.basetest import BaseLeapTest  from leap.soledad.client import Soledad  from leap.keymanager import KeyManager -from leap.keymanager.openpgp import OpenPGPKey  PATH = os.path.dirname(os.path.realpath(__file__)) @@ -56,7 +55,7 @@ class KeyManagerWithSoledadTestCase(unittest.TestCase, BaseLeapTest):          km = self._key_manager()          # wait for the indexes to be ready for the tear down -        d = km._wrapper_map[OpenPGPKey].deferred_init +        d = km._openpgp.deferred_init          d.addCallback(lambda _: self.delete_all_keys(km))          d.addCallback(lambda _: self.tearDownEnv())          d.addCallback(lambda _: self._soledad.close()) @@ -66,7 +65,7 @@ class KeyManagerWithSoledadTestCase(unittest.TestCase, BaseLeapTest):          def delete_keys(keys):              deferreds = []              for key in keys: -                d = km._wrapper_map[key.__class__].delete_key(key) +                d = km._openpgp.delete_key(key)                  deferreds.append(d)              return gatherResults(deferreds) diff --git a/keymanager/src/leap/keymanager/tests/test_keymanager.py b/keymanager/src/leap/keymanager/tests/test_keymanager.py index 14f47f6..c55a3c3 100644 --- a/keymanager/src/leap/keymanager/tests/test_keymanager.py +++ b/keymanager/src/leap/keymanager/tests/test_keymanager.py @@ -34,13 +34,9 @@ from twisted.web._responses import NOT_FOUND  from leap.keymanager import client -from leap.keymanager import ( -    KeyNotFound, -    KeyAddressMismatch, -    errors -) -from leap.keymanager.openpgp import OpenPGPKey +from leap.keymanager import errors  from leap.keymanager.keys import ( +    OpenPGPKey,      is_address,      build_key_from_dict,  ) @@ -140,7 +136,7 @@ class KeyManagerKeyManagementTestCase(KeyManagerWithSoledadTestCase):      @defer.inlineCallbacks      def test_get_all_keys_in_db(self):          km = self._key_manager() -        yield km._wrapper_map[OpenPGPKey].put_raw_key(PRIVATE_KEY, ADDRESS) +        yield km._openpgp.put_raw_key(PRIVATE_KEY, ADDRESS)          # get public keys          keys = yield km.get_all_keys(False)          self.assertEqual(len(keys), 1, 'Wrong number of keys') @@ -155,10 +151,9 @@ class KeyManagerKeyManagementTestCase(KeyManagerWithSoledadTestCase):      @defer.inlineCallbacks      def test_get_public_key(self):          km = self._key_manager() -        yield km._wrapper_map[OpenPGPKey].put_raw_key(PRIVATE_KEY, ADDRESS) +        yield km._openpgp.put_raw_key(PRIVATE_KEY, ADDRESS)          # get the key -        key = yield km.get_key(ADDRESS, OpenPGPKey, private=False, -                               fetch_remote=False) +        key = yield km.get_key(ADDRESS, private=False, fetch_remote=False)          self.assertTrue(key is not None)          self.assertTrue(ADDRESS in key.uids)          self.assertEqual( @@ -168,10 +163,9 @@ class KeyManagerKeyManagementTestCase(KeyManagerWithSoledadTestCase):      @defer.inlineCallbacks      def test_get_public_key_with_binary_private_key(self):          km = self._key_manager() -        yield km._wrapper_map[OpenPGPKey].put_raw_key(self.get_private_binary_key(), ADDRESS) +        yield km._openpgp.put_raw_key(self.get_private_binary_key(), ADDRESS)          # get the key -        key = yield km.get_key(ADDRESS, OpenPGPKey, private=False, -                               fetch_remote=False) +        key = yield km.get_key(ADDRESS, private=False, fetch_remote=False)          self.assertTrue(key is not None)          self.assertTrue(ADDRESS in key.uids)          self.assertEqual( @@ -181,10 +175,9 @@ class KeyManagerKeyManagementTestCase(KeyManagerWithSoledadTestCase):      @defer.inlineCallbacks      def test_get_private_key(self):          km = self._key_manager() -        yield km._wrapper_map[OpenPGPKey].put_raw_key(PRIVATE_KEY, ADDRESS) +        yield km._openpgp.put_raw_key(PRIVATE_KEY, ADDRESS)          # get the key -        key = yield km.get_key(ADDRESS, OpenPGPKey, private=True, -                               fetch_remote=False) +        key = yield km.get_key(ADDRESS, private=True, fetch_remote=False)          self.assertTrue(key is not None)          self.assertTrue(ADDRESS in key.uids)          self.assertEqual( @@ -193,8 +186,8 @@ class KeyManagerKeyManagementTestCase(KeyManagerWithSoledadTestCase):      def test_send_key_raises_key_not_found(self):          km = self._key_manager() -        d = km.send_key(OpenPGPKey) -        return self.assertFailure(d, KeyNotFound) +        d = km.send_key() +        return self.assertFailure(d, errors.KeyNotFound)      @defer.inlineCallbacks      def test_send_key(self): @@ -203,7 +196,7 @@ class KeyManagerKeyManagementTestCase(KeyManagerWithSoledadTestCase):          """          token = "mytoken"          km = self._key_manager(token=token) -        yield km._wrapper_map[OpenPGPKey].put_raw_key(PUBLIC_KEY, ADDRESS) +        yield km._openpgp.put_raw_key(PUBLIC_KEY, ADDRESS)          km._async_client_pinned.request = Mock(return_value=defer.succeed(''))          # the following data will be used on the send          km.ca_cert_path = 'capath' @@ -211,9 +204,9 @@ class KeyManagerKeyManagementTestCase(KeyManagerWithSoledadTestCase):          km.uid = 'myuid'          km.api_uri = 'apiuri'          km.api_version = 'apiver' -        yield km.send_key(OpenPGPKey) +        yield km.send_key()          # setup expected args -        pubkey = yield km.get_key(km._address, OpenPGPKey) +        pubkey = yield km.get_key(km._address)          data = urllib.urlencode({              km.PUBKEY_KEY: pubkey.key_data,          }) @@ -312,10 +305,10 @@ class KeyManagerKeyManagementTestCase(KeyManagerWithSoledadTestCase):          km._async_client_pinned.request = Mock(return_value=defer.succeed(None))          km.ca_cert_path = 'cacertpath'          # try to key get without fetching from server -        d_fail = km.get_key(address, OpenPGPKey, fetch_remote=False) -        d = self.assertFailure(d_fail, KeyNotFound) +        d_fail = km.get_key(address, fetch_remote=False) +        d = self.assertFailure(d_fail, errors.KeyNotFound)          # try to get key fetching from server. -        d.addCallback(lambda _: km.get_key(address, OpenPGPKey)) +        d.addCallback(lambda _: km.get_key(address))          return d      @defer.inlineCallbacks @@ -325,8 +318,8 @@ class KeyManagerKeyManagementTestCase(KeyManagerWithSoledadTestCase):          """          km = self._key_manager(url=NICKSERVER_URI) -        yield km.put_raw_key(PUBLIC_KEY, OpenPGPKey, ADDRESS) -        key = yield km.get_key(ADDRESS, OpenPGPKey) +        yield km.put_raw_key(PUBLIC_KEY, ADDRESS) +        key = yield km.get_key(ADDRESS)          self.assertIsInstance(key, OpenPGPKey)          self.assertTrue(ADDRESS in key.uids) @@ -337,8 +330,8 @@ class KeyManagerKeyManagementTestCase(KeyManagerWithSoledadTestCase):          """          km = self._key_manager(url=NICKSERVER_URI) -        yield km.put_raw_key(self.get_public_binary_key(), OpenPGPKey, ADDRESS) -        key = yield km.get_key(ADDRESS, OpenPGPKey) +        yield km.put_raw_key(self.get_public_binary_key(), ADDRESS) +        key = yield km.get_key(ADDRESS)          self.assertIsInstance(key, OpenPGPKey)          self.assertTrue(ADDRESS in key.uids) @@ -353,8 +346,8 @@ class KeyManagerKeyManagementTestCase(KeyManagerWithSoledadTestCase):          km._async_client.request = Mock(return_value=defer.succeed(PUBLIC_KEY)) -        yield km.fetch_key(ADDRESS, "http://site.domain/key", OpenPGPKey) -        key = yield km.get_key(ADDRESS, OpenPGPKey) +        yield km.fetch_key(ADDRESS, "http://site.domain/key") +        key = yield km.get_key(ADDRESS)          self.assertEqual(KEY_FINGERPRINT, key.fingerprint)      @defer.inlineCallbacks @@ -365,10 +358,11 @@ class KeyManagerKeyManagementTestCase(KeyManagerWithSoledadTestCase):          """          km = self._key_manager() -        km._async_client.request = Mock(return_value=defer.succeed(self.get_public_binary_key())) +        km._async_client.request = Mock( +            return_value=defer.succeed(self.get_public_binary_key())) -        yield km.fetch_key(ADDRESS, "http://site.domain/key", OpenPGPKey) -        key = yield km.get_key(ADDRESS, OpenPGPKey) +        yield km.fetch_key(ADDRESS, "http://site.domain/key") +        key = yield km.get_key(ADDRESS)          self.assertEqual(KEY_FINGERPRINT, key.fingerprint)      def test_fetch_uri_empty_key(self): @@ -382,8 +376,8 @@ class KeyManagerKeyManagementTestCase(KeyManagerWithSoledadTestCase):              content = ""          km._fetcher.get = Mock(return_value=Response()) -        d = km.fetch_key(ADDRESS, "http://site.domain/key", OpenPGPKey) -        return self.assertFailure(d, KeyNotFound) +        d = km.fetch_key(ADDRESS, "http://site.domain/key") +        return self.assertFailure(d, errors.KeyNotFound)      def test_fetch_uri_address_differ(self):          """ @@ -393,8 +387,8 @@ class KeyManagerKeyManagementTestCase(KeyManagerWithSoledadTestCase):          km = self._key_manager()          km._async_client.request = Mock(return_value=defer.succeed(PUBLIC_KEY)) -        d = km.fetch_key(ADDRESS_2, "http://site.domain/key", OpenPGPKey) -        return self.assertFailure(d, KeyAddressMismatch) +        d = km.fetch_key(ADDRESS_2, "http://site.domain/key") +        return self.assertFailure(d, errors.KeyAddressMismatch)      def _mock_get_response(self, km, body):          km._async_client.request = MagicMock(return_value=defer.succeed(body)) @@ -407,7 +401,7 @@ class KeyManagerKeyManagementTestCase(KeyManagerWithSoledadTestCase):          km = self._key_manager(ca_cert_path=ca_cert_path)          get_mock = self._mock_get_response(km, PUBLIC_KEY_OTHER) -        yield km.fetch_key(ADDRESS_OTHER, REMOTE_KEY_URL, OpenPGPKey) +        yield km.fetch_key(ADDRESS_OTHER, REMOTE_KEY_URL)          get_mock.assert_called_once_with(REMOTE_KEY_URL, 'GET') @@ -417,7 +411,7 @@ class KeyManagerKeyManagementTestCase(KeyManagerWithSoledadTestCase):          km = self._key_manager(ca_cert_path=ca_cert_path)          get_mock = self._mock_get_response(km, PUBLIC_KEY_OTHER) -        yield km.fetch_key(ADDRESS_OTHER, REMOTE_KEY_URL, OpenPGPKey) +        yield km.fetch_key(ADDRESS_OTHER, REMOTE_KEY_URL)          get_mock.assert_called_once_with(REMOTE_KEY_URL, 'GET') @@ -427,7 +421,7 @@ class KeyManagerKeyManagementTestCase(KeyManagerWithSoledadTestCase):          km = self._key_manager(ca_cert_path=ca_cert_path)          get_mock = self._mock_get_response(km, PUBLIC_KEY_OTHER) -        yield km.fetch_key(ADDRESS_OTHER, REMOTE_KEY_URL, OpenPGPKey) +        yield km.fetch_key(ADDRESS_OTHER, REMOTE_KEY_URL)          get_mock.assert_called_once_with(REMOTE_KEY_URL, 'GET') @@ -445,7 +439,7 @@ class KeyManagerKeyManagementTestCase(KeyManagerWithSoledadTestCase):                  km = self._key_manager(ca_cert_path=ca_cert_path)                  get_mock = self._mock_get_response(km, PUBLIC_KEY_OTHER) -                yield km.fetch_key(ADDRESS_OTHER, REMOTE_KEY_URL, OpenPGPKey) +                yield km.fetch_key(ADDRESS_OTHER, REMOTE_KEY_URL)                  # assert that combined bundle file is passed to get call                  get_mock.assert_called_once_with(REMOTE_KEY_URL, 'GET') @@ -470,16 +464,15 @@ class KeyManagerKeyManagementTestCase(KeyManagerWithSoledadTestCase):      def test_decrypt_updates_sign_used_for_signer(self):          # given          km = self._key_manager() -        yield km._wrapper_map[OpenPGPKey].put_raw_key(PRIVATE_KEY, ADDRESS) -        yield km._wrapper_map[OpenPGPKey].put_raw_key( -            PRIVATE_KEY_2, ADDRESS_2) -        encdata = yield km.encrypt('data', ADDRESS, OpenPGPKey, -                                   sign=ADDRESS_2, fetch_remote=False) +        yield km._openpgp.put_raw_key(PRIVATE_KEY, ADDRESS) +        yield km._openpgp.put_raw_key(PRIVATE_KEY_2, ADDRESS_2) +        encdata = yield km.encrypt('data', ADDRESS, sign=ADDRESS_2, +                                   fetch_remote=False)          yield km.decrypt( -            encdata, ADDRESS, OpenPGPKey, verify=ADDRESS_2, fetch_remote=False) +            encdata, ADDRESS, verify=ADDRESS_2, fetch_remote=False)          # when -        key = yield km.get_key(ADDRESS_2, OpenPGPKey, fetch_remote=False) +        key = yield km.get_key(ADDRESS_2, fetch_remote=False)          # then          self.assertEqual(True, key.sign_used) @@ -488,18 +481,16 @@ class KeyManagerKeyManagementTestCase(KeyManagerWithSoledadTestCase):      def test_decrypt_does_not_update_sign_used_for_recipient(self):          # given          km = self._key_manager() -        yield km._wrapper_map[OpenPGPKey].put_raw_key( -            PRIVATE_KEY, ADDRESS) -        yield km._wrapper_map[OpenPGPKey].put_raw_key( -            PRIVATE_KEY_2, ADDRESS_2) -        encdata = yield km.encrypt('data', ADDRESS, OpenPGPKey, -                                   sign=ADDRESS_2, fetch_remote=False) +        yield km._openpgp.put_raw_key(PRIVATE_KEY, ADDRESS) +        yield km._openpgp.put_raw_key(PRIVATE_KEY_2, ADDRESS_2) +        encdata = yield km.encrypt('data', ADDRESS, sign=ADDRESS_2, +                                   fetch_remote=False)          yield km.decrypt( -            encdata, ADDRESS, OpenPGPKey, verify=ADDRESS_2, fetch_remote=False) +            encdata, ADDRESS, verify=ADDRESS_2, fetch_remote=False)          # when          key = yield km.get_key( -            ADDRESS, OpenPGPKey, private=False, fetch_remote=False) +            ADDRESS, private=False, fetch_remote=False)          # then          self.assertEqual(False, key.sign_used) @@ -513,35 +504,32 @@ class KeyManagerCryptoTestCase(KeyManagerWithSoledadTestCase):      def test_keymanager_openpgp_encrypt_decrypt(self):          km = self._key_manager()          # put raw private key -        yield km._wrapper_map[OpenPGPKey].put_raw_key(PRIVATE_KEY, ADDRESS) -        yield km._wrapper_map[OpenPGPKey].put_raw_key( -            PRIVATE_KEY_2, ADDRESS_2) +        yield km._openpgp.put_raw_key(PRIVATE_KEY, ADDRESS) +        yield km._openpgp.put_raw_key(PRIVATE_KEY_2, ADDRESS_2)          # encrypt -        encdata = yield km.encrypt(self.RAW_DATA, ADDRESS, OpenPGPKey, -                                   sign=ADDRESS_2, fetch_remote=False) +        encdata = yield km.encrypt(self.RAW_DATA, ADDRESS, sign=ADDRESS_2, +                                   fetch_remote=False)          self.assertNotEqual(self.RAW_DATA, encdata)          # decrypt          rawdata, signingkey = yield km.decrypt( -            encdata, ADDRESS, OpenPGPKey, verify=ADDRESS_2, fetch_remote=False) +            encdata, ADDRESS, verify=ADDRESS_2, fetch_remote=False)          self.assertEqual(self.RAW_DATA, rawdata) -        key = yield km.get_key(ADDRESS_2, OpenPGPKey, private=False, -                               fetch_remote=False) +        key = yield km.get_key(ADDRESS_2, private=False, fetch_remote=False)          self.assertEqual(signingkey.fingerprint, key.fingerprint)      @defer.inlineCallbacks      def test_keymanager_openpgp_encrypt_decrypt_wrong_sign(self):          km = self._key_manager()          # put raw keys -        yield km._wrapper_map[OpenPGPKey].put_raw_key(PRIVATE_KEY, ADDRESS) -        yield km._wrapper_map[OpenPGPKey].put_raw_key( -            PRIVATE_KEY_2, ADDRESS_2) +        yield km._openpgp.put_raw_key(PRIVATE_KEY, ADDRESS) +        yield km._openpgp.put_raw_key(PRIVATE_KEY_2, ADDRESS_2)          # encrypt -        encdata = yield km.encrypt(self.RAW_DATA, ADDRESS, OpenPGPKey, -                                   sign=ADDRESS_2, fetch_remote=False) +        encdata = yield km.encrypt(self.RAW_DATA, ADDRESS, sign=ADDRESS_2, +                                   fetch_remote=False)          self.assertNotEqual(self.RAW_DATA, encdata)          # verify          rawdata, signingkey = yield km.decrypt( -            encdata, ADDRESS, OpenPGPKey, verify=ADDRESS, fetch_remote=False) +            encdata, ADDRESS, verify=ADDRESS, fetch_remote=False)          self.assertEqual(self.RAW_DATA, rawdata)          self.assertTrue(isinstance(signingkey, errors.InvalidSignature)) @@ -549,24 +537,21 @@ class KeyManagerCryptoTestCase(KeyManagerWithSoledadTestCase):      def test_keymanager_openpgp_sign_verify(self):          km = self._key_manager()          # put raw private keys -        yield km._wrapper_map[OpenPGPKey].put_raw_key(PRIVATE_KEY, ADDRESS) -        signdata = yield km.sign(self.RAW_DATA, ADDRESS, OpenPGPKey, -                                 detach=False) +        yield km._openpgp.put_raw_key(PRIVATE_KEY, ADDRESS) +        signdata = yield km.sign(self.RAW_DATA, ADDRESS, detach=False)          self.assertNotEqual(self.RAW_DATA, signdata)          # verify -        signingkey = yield km.verify(signdata, ADDRESS, OpenPGPKey, -                                     fetch_remote=False) -        key = yield km.get_key(ADDRESS, OpenPGPKey, private=False, -                               fetch_remote=False) +        signingkey = yield km.verify(signdata, ADDRESS, fetch_remote=False) +        key = yield km.get_key(ADDRESS, private=False, fetch_remote=False)          self.assertEqual(signingkey.fingerprint, key.fingerprint)      def test_keymanager_encrypt_key_not_found(self):          km = self._key_manager() -        d = km._wrapper_map[OpenPGPKey].put_raw_key(PRIVATE_KEY, ADDRESS) +        d = km._openpgp.put_raw_key(PRIVATE_KEY, ADDRESS)          d.addCallback( -            lambda _: km.encrypt(self.RAW_DATA, ADDRESS_2, OpenPGPKey, -                                 sign=ADDRESS, fetch_remote=False)) -        return self.assertFailure(d, KeyNotFound) +            lambda _: km.encrypt(self.RAW_DATA, ADDRESS_2, sign=ADDRESS, +                                 fetch_remote=False)) +        return self.assertFailure(d, errors.KeyNotFound)  if __name__ == "__main__":      import unittest diff --git a/keymanager/src/leap/keymanager/tests/test_validation.py b/keymanager/src/leap/keymanager/tests/test_validation.py index 44d6e39..954b1df 100644 --- a/keymanager/src/leap/keymanager/tests/test_validation.py +++ b/keymanager/src/leap/keymanager/tests/test_validation.py @@ -22,7 +22,6 @@ import unittest  from datetime import datetime  from twisted.internet.defer import inlineCallbacks -from leap.keymanager.openpgp import OpenPGPKey  from leap.keymanager.errors import (      KeyNotValidUpgrade  ) @@ -43,64 +42,62 @@ class ValidationLevelsTestCase(KeyManagerWithSoledadTestCase):      @inlineCallbacks      def test_none_old_key(self):          km = self._key_manager() -        yield km.put_raw_key(PUBLIC_KEY, OpenPGPKey, ADDRESS) -        key = yield km.get_key(ADDRESS, OpenPGPKey, fetch_remote=False) +        yield km.put_raw_key(PUBLIC_KEY, ADDRESS) +        key = yield km.get_key(ADDRESS, fetch_remote=False)          self.assertEqual(key.fingerprint, KEY_FINGERPRINT)      @inlineCallbacks      def test_cant_upgrade(self):          km = self._key_manager() -        yield km.put_raw_key(PUBLIC_KEY, OpenPGPKey, ADDRESS, +        yield km.put_raw_key(PUBLIC_KEY, ADDRESS,                               validation=ValidationLevels.Provider_Trust) -        d = km.put_raw_key(UNRELATED_KEY, OpenPGPKey, ADDRESS) +        d = km.put_raw_key(UNRELATED_KEY, ADDRESS)          yield self.assertFailure(d, KeyNotValidUpgrade)      @inlineCallbacks      def test_fingerprint_level(self):          km = self._key_manager() -        yield km.put_raw_key(PUBLIC_KEY, OpenPGPKey, ADDRESS) -        yield km.put_raw_key(UNRELATED_KEY, OpenPGPKey, ADDRESS, +        yield km.put_raw_key(PUBLIC_KEY, ADDRESS) +        yield km.put_raw_key(UNRELATED_KEY, ADDRESS,                               validation=ValidationLevels.Fingerprint) -        key = yield km.get_key(ADDRESS, OpenPGPKey, fetch_remote=False) +        key = yield km.get_key(ADDRESS, fetch_remote=False)          self.assertEqual(key.fingerprint, UNRELATED_FINGERPRINT)      @inlineCallbacks      def test_expired_key(self):          km = self._key_manager() -        yield km.put_raw_key(EXPIRED_KEY, OpenPGPKey, ADDRESS) -        yield km.put_raw_key(UNRELATED_KEY, OpenPGPKey, ADDRESS) -        key = yield km.get_key(ADDRESS, OpenPGPKey, fetch_remote=False) +        yield km.put_raw_key(EXPIRED_KEY, ADDRESS) +        yield km.put_raw_key(UNRELATED_KEY, ADDRESS) +        key = yield km.get_key(ADDRESS, fetch_remote=False)          self.assertEqual(key.fingerprint, UNRELATED_FINGERPRINT)      @inlineCallbacks      def test_expired_fail_lower_level(self):          km = self._key_manager()          yield km.put_raw_key( -            EXPIRED_KEY, OpenPGPKey, ADDRESS, +            EXPIRED_KEY, ADDRESS,              validation=ValidationLevels.Third_Party_Endorsement)          d = km.put_raw_key( -            UNRELATED_KEY, -            OpenPGPKey, -            ADDRESS, +            UNRELATED_KEY, ADDRESS,              validation=ValidationLevels.Provider_Trust)          yield self.assertFailure(d, KeyNotValidUpgrade)      @inlineCallbacks      def test_roll_back(self):          km = self._key_manager() -        yield km.put_raw_key(EXPIRED_KEY_UPDATED, OpenPGPKey, ADDRESS) -        yield km.put_raw_key(EXPIRED_KEY, OpenPGPKey, ADDRESS) -        key = yield km.get_key(ADDRESS, OpenPGPKey, fetch_remote=False) +        yield km.put_raw_key(EXPIRED_KEY_UPDATED, ADDRESS) +        yield km.put_raw_key(EXPIRED_KEY, ADDRESS) +        key = yield km.get_key(ADDRESS, fetch_remote=False)          self.assertEqual(key.expiry_date, EXPIRED_KEY_NEW_EXPIRY_DATE)      @inlineCallbacks      def test_not_used(self):          km = self._key_manager() -        yield km.put_raw_key(UNEXPIRED_KEY, OpenPGPKey, ADDRESS, +        yield km.put_raw_key(UNEXPIRED_KEY, ADDRESS,                               validation=ValidationLevels.Provider_Trust) -        yield km.put_raw_key(UNRELATED_KEY, OpenPGPKey, ADDRESS, +        yield km.put_raw_key(UNRELATED_KEY, ADDRESS,                               validation=ValidationLevels.Provider_Endorsement) -        key = yield km.get_key(ADDRESS, OpenPGPKey, fetch_remote=False) +        key = yield km.get_key(ADDRESS, fetch_remote=False)          self.assertEqual(key.fingerprint, UNRELATED_FINGERPRINT)      @inlineCallbacks @@ -108,16 +105,16 @@ class ValidationLevelsTestCase(KeyManagerWithSoledadTestCase):          TEXT = "some text"          km = self._key_manager() -        yield km.put_raw_key(UNEXPIRED_PRIVATE, OpenPGPKey, ADDRESS) -        signature = yield km.sign(TEXT, ADDRESS, OpenPGPKey) +        yield km.put_raw_key(UNEXPIRED_PRIVATE, ADDRESS) +        signature = yield km.sign(TEXT, ADDRESS)          yield self.delete_all_keys(km) -        yield km.put_raw_key(UNEXPIRED_KEY, OpenPGPKey, ADDRESS) -        yield km.encrypt(TEXT, ADDRESS, OpenPGPKey) -        yield km.verify(TEXT, ADDRESS, OpenPGPKey, detached_sig=signature) +        yield km.put_raw_key(UNEXPIRED_KEY, ADDRESS) +        yield km.encrypt(TEXT, ADDRESS) +        yield km.verify(TEXT, ADDRESS, detached_sig=signature)          d = km.put_raw_key( -            UNRELATED_KEY, OpenPGPKey, ADDRESS, +            UNRELATED_KEY, ADDRESS,              validation=ValidationLevels.Provider_Endorsement)          yield self.assertFailure(d, KeyNotValidUpgrade) @@ -126,28 +123,27 @@ class ValidationLevelsTestCase(KeyManagerWithSoledadTestCase):          TEXT = "some text"          km = self._key_manager() -        yield km.put_raw_key(UNEXPIRED_PRIVATE, OpenPGPKey, ADDRESS) -        yield km.put_raw_key(PUBLIC_KEY_2, OpenPGPKey, ADDRESS_2) -        encrypted = yield km.encrypt(TEXT, ADDRESS_2, OpenPGPKey, -                                     sign=ADDRESS) +        yield km.put_raw_key(UNEXPIRED_PRIVATE, ADDRESS) +        yield km.put_raw_key(PUBLIC_KEY_2, ADDRESS_2) +        encrypted = yield km.encrypt(TEXT, ADDRESS_2, sign=ADDRESS)          yield self.delete_all_keys(km) -        yield km.put_raw_key(UNEXPIRED_KEY, OpenPGPKey, ADDRESS) -        yield km.put_raw_key(PRIVATE_KEY_2, OpenPGPKey, ADDRESS_2) -        yield km.encrypt(TEXT, ADDRESS, OpenPGPKey) -        yield km.decrypt(encrypted, ADDRESS_2, OpenPGPKey, verify=ADDRESS) +        yield km.put_raw_key(UNEXPIRED_KEY, ADDRESS) +        yield km.put_raw_key(PRIVATE_KEY_2, ADDRESS_2) +        yield km.encrypt(TEXT, ADDRESS) +        yield km.decrypt(encrypted, ADDRESS_2, verify=ADDRESS)          d = km.put_raw_key( -            UNRELATED_KEY, OpenPGPKey, ADDRESS, +            UNRELATED_KEY, ADDRESS,              validation=ValidationLevels.Provider_Endorsement)          yield self.assertFailure(d, KeyNotValidUpgrade)      @inlineCallbacks      def test_signed_key(self):          km = self._key_manager() -        yield km.put_raw_key(PUBLIC_KEY, OpenPGPKey, ADDRESS) -        yield km.put_raw_key(SIGNED_KEY, OpenPGPKey, ADDRESS) -        key = yield km.get_key(ADDRESS, OpenPGPKey, fetch_remote=False) +        yield km.put_raw_key(PUBLIC_KEY, ADDRESS) +        yield km.put_raw_key(SIGNED_KEY, ADDRESS) +        key = yield km.get_key(ADDRESS, fetch_remote=False)          self.assertEqual(key.fingerprint, SIGNED_FINGERPRINT)      @inlineCallbacks @@ -155,26 +151,26 @@ class ValidationLevelsTestCase(KeyManagerWithSoledadTestCase):          TEXT = "some text"          km = self._key_manager() -        yield km.put_raw_key(UUIDS_PRIVATE, OpenPGPKey, ADDRESS_2) -        signature = yield km.sign(TEXT, ADDRESS_2, OpenPGPKey) +        yield km.put_raw_key(UUIDS_PRIVATE, ADDRESS_2) +        signature = yield km.sign(TEXT, ADDRESS_2)          yield self.delete_all_keys(km) -        yield km.put_raw_key(UUIDS_KEY, OpenPGPKey, ADDRESS_2) -        yield km.put_raw_key(UUIDS_KEY, OpenPGPKey, ADDRESS) -        yield km.encrypt(TEXT, ADDRESS_2, OpenPGPKey) -        yield km.verify(TEXT, ADDRESS_2, OpenPGPKey, detached_sig=signature) +        yield km.put_raw_key(UUIDS_KEY, ADDRESS_2) +        yield km.put_raw_key(UUIDS_KEY, ADDRESS) +        yield km.encrypt(TEXT, ADDRESS_2) +        yield km.verify(TEXT, ADDRESS_2, detached_sig=signature)          d = km.put_raw_key( -            PUBLIC_KEY_2, OpenPGPKey, ADDRESS_2, +            PUBLIC_KEY_2, ADDRESS_2,              validation=ValidationLevels.Provider_Endorsement)          yield self.assertFailure(d, KeyNotValidUpgrade) -        key = yield km.get_key(ADDRESS_2, OpenPGPKey, fetch_remote=False) +        key = yield km.get_key(ADDRESS_2, fetch_remote=False)          self.assertEqual(key.fingerprint, UUIDS_FINGERPRINT)          yield km.put_raw_key( -            PUBLIC_KEY, OpenPGPKey, ADDRESS, +            PUBLIC_KEY, ADDRESS,              validation=ValidationLevels.Provider_Endorsement) -        key = yield km.get_key(ADDRESS, OpenPGPKey, fetch_remote=False) +        key = yield km.get_key(ADDRESS, fetch_remote=False)          self.assertEqual(key.fingerprint, KEY_FINGERPRINT) | 
