diff options
Diffstat (limited to 'keymanager/src/leap')
| -rw-r--r-- | keymanager/src/leap/keymanager/__init__.py | 66 | ||||
| -rw-r--r-- | keymanager/src/leap/keymanager/tests/test_keymanager.py | 33 | 
2 files changed, 26 insertions, 73 deletions
| diff --git a/keymanager/src/leap/keymanager/__init__.py b/keymanager/src/leap/keymanager/__init__.py index 41f352e..bd85c2d 100644 --- a/keymanager/src/leap/keymanager/__init__.py +++ b/keymanager/src/leap/keymanager/__init__.py @@ -82,12 +82,12 @@ class KeyManager(object):                   gpgbinary=None):          """          Initialize a Key Manager for user's C{address} with provider's -        nickserver reachable in C{url}. +        nickserver reachable in C{nickserver_uri}. -        :param address: The address of the user of this Key Manager. +        :param address: The email address of the user of this Key Manager.          :type address: str -        :param url: The URL of the nickserver. -        :type url: str +        :param nickserver_uri: The URI of the nickserver. +        :type nickserver_uri: str          :param soledad: A Soledad instance for local storage of keys.          :type soledad: leap.soledad.Soledad          :param token: The token for interacting with the webapp API. @@ -98,7 +98,7 @@ class KeyManager(object):          :type api_uri: str          :param api_version: The version of the webapp API.          :type api_version: str -        :param uid: The users' UID. +        :param uid: The user's UID.          :type uid: str          :param gpgbinary: Name for GnuPG binary executable.          :type gpgbinary: C{str} @@ -189,6 +189,7 @@ class KeyManager(object):          res.raise_for_status()          return res +    @memoized_method(invalidation=300)      def _fetch_keys_from_server(self, address):          """          Fetch keys bound to C{address} from nickserver and insert them in @@ -228,12 +229,6 @@ class KeyManager(object):          Public key bound to user's is sent to provider, which will sign it and          replace any prior keys for the same address in its database. -        If C{send_private} is True, then the private key is encrypted with -        C{password} and sent to server in the same request, together with a -        hash string of user's address and password. The encrypted private key -        will be saved in the server in a way it is publicly retrievable -        through the hash string. -          :param ktype: The type of the key.          :type ktype: KeyType @@ -255,13 +250,6 @@ class KeyManager(object):          self._put(uri, data)          signal(proto.KEYMANAGER_DONE_UPLOADING_KEYS, self._address) -    @memoized_method -    def get_key_from_cache(self, *args, **kwargs): -        """ -        Public interface to `get_key`, that is memoized. -        """ -        return self.get_key(*args, **kwargs) -      def get_key(self, address, ktype, private=False, fetch_remote=True):          """          Return a key of type C{ktype} bound to C{address}. @@ -275,6 +263,9 @@ class KeyManager(object):          :type ktype: KeyType          :param private: Look for a private key instead of a public one?          :type private: bool +        :param fetch_remote: If key not found in local storage try to fetch +                             from nickserver +        :type fetch_remote: bool          :return: A key of type C{ktype} bound to C{address}.          :rtype: EncryptionKey @@ -307,10 +298,13 @@ class KeyManager(object):              return key -    def get_all_keys_in_local_db(self, private=False): +    def get_all_keys(self, private=False):          """          Return all keys stored in local database. +        :param private: Include private keys +        :type private: bool +          :return: A list with all keys in local db.          :rtype: list          """ @@ -324,19 +318,6 @@ class KeyManager(object):                  KEYMANAGER_KEY_TAG,                  '1' if private else '0')) -    def refresh_keys(self): -        """ -        Fetch keys from nickserver and update them locally. -        """ -        addresses = set(map( -            lambda doc: doc.address, -            self.get_all_keys_in_local_db(private=False))) -        for address in addresses: -            # do not attempt to refresh our own key -            if address == self._address: -                continue -            self._fetch_keys_from_server(address) -      def gen_key(self, ktype):          """          Generate a key of type C{ktype} bound to the user's address. @@ -416,6 +397,9 @@ class KeyManager(object):          :type data: str          :param pubkey: The key used to encrypt.          :type pubkey: EncryptionKey +        :param passphrase: The passphrase for the secret key used for the +                           signature. +        :type passphrase: str          :param sign: The key used for signing.          :type sign: EncryptionKey          :param cipher_algo: The cipher algorithm to use. @@ -428,7 +412,7 @@ class KeyManager(object):          leap_assert(pubkey.__class__ in self._wrapper_map, 'Unknown key type.')          leap_assert(pubkey.private is False, 'Key is not public.')          return self._wrapper_map[pubkey.__class__].encrypt( -            data, pubkey, passphrase, sign) +            data, pubkey, passphrase, sign, cipher_algo=cipher_algo)      def decrypt(self, data, privkey, passphrase=None, verify=None):          """ @@ -448,7 +432,7 @@ class KeyManager(object):          :rtype: str          :raise InvalidSignature: Raised if unable to verify the signature with -            C{verify} key. +                                 C{verify} key.          """          leap_assert_type(privkey, EncryptionKey)          leap_assert( @@ -511,20 +495,6 @@ class KeyManager(object):          return self._wrapper_map[pubkey.__class__].verify(              data, pubkey, detached_sig=detached_sig) -    def parse_openpgp_ascii_key(self, key_data): -        """ -        Parses an ascii armored key (or key pair) data and returns -        the OpenPGPKey keys. - -        :param key_data: the key data to be parsed. -        :type key_data: str or unicode - -        :returns: the public key and private key (if applies) for that data. -        :rtype: (public, private) -> tuple(OpenPGPKey, OpenPGPKey) -                the tuple may have one or both components None -        """ -        return self._wrapper_map[OpenPGPKey].parse_ascii_key(key_data) -      def delete_key(self, key):          """          Remove C{key} from storage. diff --git a/keymanager/src/leap/keymanager/tests/test_keymanager.py b/keymanager/src/leap/keymanager/tests/test_keymanager.py index f89dcd9..7192bfb 100644 --- a/keymanager/src/leap/keymanager/tests/test_keymanager.py +++ b/keymanager/src/leap/keymanager/tests/test_keymanager.py @@ -142,9 +142,9 @@ class KeyManagerWithSoledadTestCase(BaseLeapTest):      def tearDown(self):          km = self._key_manager() -        for key in km.get_all_keys_in_local_db(): +        for key in km.get_all_keys():              km._wrapper_map[key.__class__].delete_key(key) -        for key in km.get_all_keys_in_local_db(private=True): +        for key in km.get_all_keys(private=True):              km._wrapper_map[key.__class__].delete_key(key)      def _key_manager(self, user=ADDRESS, url='', token=None): @@ -343,12 +343,12 @@ class KeyManagerKeyManagementTestCase(KeyManagerWithSoledadTestCase):          km = self._key_manager()          km._wrapper_map[OpenPGPKey].put_ascii_key(PRIVATE_KEY)          # get public keys -        keys = km.get_all_keys_in_local_db(False) +        keys = km.get_all_keys(False)          self.assertEqual(len(keys), 1, 'Wrong number of keys')          self.assertEqual(ADDRESS, keys[0].address)          self.assertFalse(keys[0].private)          # get private keys -        keys = km.get_all_keys_in_local_db(True) +        keys = km.get_all_keys(True)          self.assertEqual(len(keys), 1, 'Wrong number of keys')          self.assertEqual(ADDRESS, keys[0].address)          self.assertTrue(keys[0].private) @@ -437,23 +437,6 @@ class KeyManagerKeyManagementTestCase(KeyManagerWithSoledadTestCase):              verify='cacertpath',          ) -    def test_refresh_keys_does_not_refresh_own_key(self): -        """ -        Test that refreshing keys will not attempt to refresh our own key. -        """ -        km = self._key_manager() -        # we add 2 keys but we expect it to only refresh the second one. -        km._wrapper_map[OpenPGPKey].put_ascii_key(PUBLIC_KEY) -        km._wrapper_map[OpenPGPKey].put_ascii_key(PUBLIC_KEY_2) -        # mock the key fetching -        km._fetch_keys_from_server = Mock(return_value=[]) -        km.ca_cert_path = ''  # some bogus path so the km does not complain. -        # do the refreshing -        km.refresh_keys() -        km._fetch_keys_from_server.assert_called_once_with( -            ADDRESS_2 -        ) -      def test_get_key_fetches_from_server(self):          """          Test that getting a key successfuly fetches from server. @@ -465,7 +448,7 @@ class KeyManagerKeyManagementTestCase(KeyManagerWithSoledadTestCase):              headers = {'content-type': 'application/json'}              def json(self): -                return {'address': ADDRESS_2, 'openpgp': PUBLIC_KEY_2} +                return {'address': ADDRESS, 'openpgp': PUBLIC_KEY}              def raise_for_status(self):                  pass @@ -475,13 +458,13 @@ class KeyManagerKeyManagementTestCase(KeyManagerWithSoledadTestCase):          km.ca_cert_path = 'cacertpath'          # try to key get without fetching from server          self.assertRaises( -            KeyNotFound, km.get_key, ADDRESS_2, OpenPGPKey, +            KeyNotFound, km.get_key, ADDRESS, OpenPGPKey,              fetch_remote=False          )          # try to get key fetching from server. -        key = km.get_key(ADDRESS_2, OpenPGPKey) +        key = km.get_key(ADDRESS, OpenPGPKey)          self.assertIsInstance(key, OpenPGPKey) -        self.assertEqual(ADDRESS_2, key.address) +        self.assertEqual(ADDRESS, key.address)  class KeyManagerCryptoTestCase(KeyManagerWithSoledadTestCase): | 
