summaryrefslogtreecommitdiff
path: root/src/leap/keymanager
diff options
context:
space:
mode:
Diffstat (limited to 'src/leap/keymanager')
-rw-r--r--src/leap/keymanager/__init__.py75
-rw-r--r--src/leap/keymanager/tests/test_keymanager.py44
2 files changed, 43 insertions, 76 deletions
diff --git a/src/leap/keymanager/__init__.py b/src/leap/keymanager/__init__.py
index 41f352ec..da679acb 100644
--- a/src/leap/keymanager/__init__.py
+++ b/src/leap/keymanager/__init__.py
@@ -82,12 +82,12 @@ class KeyManager(object):
gpgbinary=None):
"""
Initialize a Key Manager for user's C{address} with provider's
- nickserver reachable in C{url}.
+ nickserver reachable in C{nickserver_uri}.
- :param address: The address of the user of this Key Manager.
+ :param address: The email address of the user of this Key Manager.
:type address: str
- :param url: The URL of the nickserver.
- :type url: str
+ :param nickserver_uri: The URI of the nickserver.
+ :type nickserver_uri: str
:param soledad: A Soledad instance for local storage of keys.
:type soledad: leap.soledad.Soledad
:param token: The token for interacting with the webapp API.
@@ -98,7 +98,7 @@ class KeyManager(object):
:type api_uri: str
:param api_version: The version of the webapp API.
:type api_version: str
- :param uid: The users' UID.
+ :param uid: The user's UID.
:type uid: str
:param gpgbinary: Name for GnuPG binary executable.
:type gpgbinary: C{str}
@@ -189,6 +189,7 @@ class KeyManager(object):
res.raise_for_status()
return res
+ @memoized_method(invalidation=300)
def _fetch_keys_from_server(self, address):
"""
Fetch keys bound to C{address} from nickserver and insert them in
@@ -228,12 +229,6 @@ class KeyManager(object):
Public key bound to user's is sent to provider, which will sign it and
replace any prior keys for the same address in its database.
- If C{send_private} is True, then the private key is encrypted with
- C{password} and sent to server in the same request, together with a
- hash string of user's address and password. The encrypted private key
- will be saved in the server in a way it is publicly retrievable
- through the hash string.
-
:param ktype: The type of the key.
:type ktype: KeyType
@@ -255,13 +250,6 @@ class KeyManager(object):
self._put(uri, data)
signal(proto.KEYMANAGER_DONE_UPLOADING_KEYS, self._address)
- @memoized_method
- def get_key_from_cache(self, *args, **kwargs):
- """
- Public interface to `get_key`, that is memoized.
- """
- return self.get_key(*args, **kwargs)
-
def get_key(self, address, ktype, private=False, fetch_remote=True):
"""
Return a key of type C{ktype} bound to C{address}.
@@ -275,6 +263,9 @@ class KeyManager(object):
:type ktype: KeyType
:param private: Look for a private key instead of a public one?
:type private: bool
+ :param fetch_remote: If key not found in local storage try to fetch
+ from nickserver
+ :type fetch_remote: bool
:return: A key of type C{ktype} bound to C{address}.
:rtype: EncryptionKey
@@ -307,10 +298,13 @@ class KeyManager(object):
return key
- def get_all_keys_in_local_db(self, private=False):
+ def get_all_keys(self, private=False):
"""
Return all keys stored in local database.
+ :param private: Include private keys
+ :type private: bool
+
:return: A list with all keys in local db.
:rtype: list
"""
@@ -324,19 +318,6 @@ class KeyManager(object):
KEYMANAGER_KEY_TAG,
'1' if private else '0'))
- def refresh_keys(self):
- """
- Fetch keys from nickserver and update them locally.
- """
- addresses = set(map(
- lambda doc: doc.address,
- self.get_all_keys_in_local_db(private=False)))
- for address in addresses:
- # do not attempt to refresh our own key
- if address == self._address:
- continue
- self._fetch_keys_from_server(address)
-
def gen_key(self, ktype):
"""
Generate a key of type C{ktype} bound to the user's address.
@@ -416,6 +397,9 @@ class KeyManager(object):
:type data: str
:param pubkey: The key used to encrypt.
:type pubkey: EncryptionKey
+ :param passphrase: The passphrase for the secret key used for the
+ signature.
+ :type passphrase: str
:param sign: The key used for signing.
:type sign: EncryptionKey
:param cipher_algo: The cipher algorithm to use.
@@ -428,7 +412,7 @@ class KeyManager(object):
leap_assert(pubkey.__class__ in self._wrapper_map, 'Unknown key type.')
leap_assert(pubkey.private is False, 'Key is not public.')
return self._wrapper_map[pubkey.__class__].encrypt(
- data, pubkey, passphrase, sign)
+ data, pubkey, passphrase, sign, cipher_algo=cipher_algo)
def decrypt(self, data, privkey, passphrase=None, verify=None):
"""
@@ -448,7 +432,7 @@ class KeyManager(object):
:rtype: str
:raise InvalidSignature: Raised if unable to verify the signature with
- C{verify} key.
+ C{verify} key.
"""
leap_assert_type(privkey, EncryptionKey)
leap_assert(
@@ -511,20 +495,6 @@ class KeyManager(object):
return self._wrapper_map[pubkey.__class__].verify(
data, pubkey, detached_sig=detached_sig)
- def parse_openpgp_ascii_key(self, key_data):
- """
- Parses an ascii armored key (or key pair) data and returns
- the OpenPGPKey keys.
-
- :param key_data: the key data to be parsed.
- :type key_data: str or unicode
-
- :returns: the public key and private key (if applies) for that data.
- :rtype: (public, private) -> tuple(OpenPGPKey, OpenPGPKey)
- the tuple may have one or both components None
- """
- return self._wrapper_map[OpenPGPKey].parse_ascii_key(key_data)
-
def delete_key(self, key):
"""
Remove C{key} from storage.
@@ -545,11 +515,14 @@ class KeyManager(object):
"""
Put C{key} in local storage.
- :param key: The key to be stored.
- :type key: OpenPGPKey
+ :param key: The key to be stored. It can be ascii key or an OpenPGPKey
+ :type key: str or OpenPGPKey
"""
try:
- self._wrapper_map[type(key)].put_key(key)
+ if isinstance(key, basestring):
+ self._wrapper_map[OpenPGPKey].put_ascii_key(key)
+ else:
+ self._wrapper_map[type(key)].put_key(key)
except IndexError as e:
leap_assert(False, "Unsupported key type. Error {0!r}".format(e))
diff --git a/src/leap/keymanager/tests/test_keymanager.py b/src/leap/keymanager/tests/test_keymanager.py
index f89dcd9b..39b729d8 100644
--- a/src/leap/keymanager/tests/test_keymanager.py
+++ b/src/leap/keymanager/tests/test_keymanager.py
@@ -142,9 +142,9 @@ class KeyManagerWithSoledadTestCase(BaseLeapTest):
def tearDown(self):
km = self._key_manager()
- for key in km.get_all_keys_in_local_db():
+ for key in km.get_all_keys():
km._wrapper_map[key.__class__].delete_key(key)
- for key in km.get_all_keys_in_local_db(private=True):
+ for key in km.get_all_keys(private=True):
km._wrapper_map[key.__class__].delete_key(key)
def _key_manager(self, user=ADDRESS, url='', token=None):
@@ -343,12 +343,12 @@ class KeyManagerKeyManagementTestCase(KeyManagerWithSoledadTestCase):
km = self._key_manager()
km._wrapper_map[OpenPGPKey].put_ascii_key(PRIVATE_KEY)
# get public keys
- keys = km.get_all_keys_in_local_db(False)
+ keys = km.get_all_keys(False)
self.assertEqual(len(keys), 1, 'Wrong number of keys')
self.assertEqual(ADDRESS, keys[0].address)
self.assertFalse(keys[0].private)
# get private keys
- keys = km.get_all_keys_in_local_db(True)
+ keys = km.get_all_keys(True)
self.assertEqual(len(keys), 1, 'Wrong number of keys')
self.assertEqual(ADDRESS, keys[0].address)
self.assertTrue(keys[0].private)
@@ -437,23 +437,6 @@ class KeyManagerKeyManagementTestCase(KeyManagerWithSoledadTestCase):
verify='cacertpath',
)
- def test_refresh_keys_does_not_refresh_own_key(self):
- """
- Test that refreshing keys will not attempt to refresh our own key.
- """
- km = self._key_manager()
- # we add 2 keys but we expect it to only refresh the second one.
- km._wrapper_map[OpenPGPKey].put_ascii_key(PUBLIC_KEY)
- km._wrapper_map[OpenPGPKey].put_ascii_key(PUBLIC_KEY_2)
- # mock the key fetching
- km._fetch_keys_from_server = Mock(return_value=[])
- km.ca_cert_path = '' # some bogus path so the km does not complain.
- # do the refreshing
- km.refresh_keys()
- km._fetch_keys_from_server.assert_called_once_with(
- ADDRESS_2
- )
-
def test_get_key_fetches_from_server(self):
"""
Test that getting a key successfuly fetches from server.
@@ -465,7 +448,7 @@ class KeyManagerKeyManagementTestCase(KeyManagerWithSoledadTestCase):
headers = {'content-type': 'application/json'}
def json(self):
- return {'address': ADDRESS_2, 'openpgp': PUBLIC_KEY_2}
+ return {'address': ADDRESS, 'openpgp': PUBLIC_KEY}
def raise_for_status(self):
pass
@@ -475,13 +458,24 @@ class KeyManagerKeyManagementTestCase(KeyManagerWithSoledadTestCase):
km.ca_cert_path = 'cacertpath'
# try to key get without fetching from server
self.assertRaises(
- KeyNotFound, km.get_key, ADDRESS_2, OpenPGPKey,
+ KeyNotFound, km.get_key, ADDRESS, OpenPGPKey,
fetch_remote=False
)
# try to get key fetching from server.
- key = km.get_key(ADDRESS_2, OpenPGPKey)
+ key = km.get_key(ADDRESS, OpenPGPKey)
self.assertIsInstance(key, OpenPGPKey)
- self.assertEqual(ADDRESS_2, key.address)
+ self.assertEqual(ADDRESS, key.address)
+
+ def test_put_key_ascii(self):
+ """
+ Test that putting ascii key works
+ """
+ km = self._key_manager(url='http://nickserver.domain')
+
+ km.put_key(PUBLIC_KEY)
+ key = km.get_key(ADDRESS, OpenPGPKey)
+ self.assertIsInstance(key, OpenPGPKey)
+ self.assertEqual(ADDRESS, key.address)
class KeyManagerCryptoTestCase(KeyManagerWithSoledadTestCase):