summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authordrebs <drebs@leap.se>2013-05-02 22:38:31 -0300
committerdrebs <drebs@leap.se>2013-05-02 22:38:31 -0300
commit71a3f21d3b72566efa6cf024317dfc96624a10f7 (patch)
tree40a36e4817d605439aeedce826751bc154d1bfa6
parent170cd90f593a106ea7730babde310724410a585e (diff)
Add tests for key management remote methods.
-rw-r--r--src/leap/common/keymanager/__init__.py42
-rw-r--r--src/leap/common/keymanager/errors.py6
-rw-r--r--src/leap/common/tests/test_keymanager.py166
3 files changed, 176 insertions, 38 deletions
diff --git a/src/leap/common/keymanager/__init__.py b/src/leap/common/keymanager/__init__.py
index 01dc0da..d6dbb8a 100644
--- a/src/leap/common/keymanager/__init__.py
+++ b/src/leap/common/keymanager/__init__.py
@@ -30,6 +30,7 @@ except ImportError:
from leap.common.check import leap_assert
from leap.common.keymanager.errors import (
KeyNotFound,
+ NoPasswordGiven,
)
from leap.common.keymanager.keys import (
build_key_from_dict,
@@ -51,7 +52,7 @@ INDEXES = {
class KeyManager(object):
- def __init__(self, address, nickserver_url, soledad):
+ def __init__(self, address, nickserver_url, soledad, token=None):
"""
Initialize a Key Manager for user's C{address} with provider's
nickserver reachable in C{url}.
@@ -66,11 +67,13 @@ class KeyManager(object):
self._address = address
self._nickserver_url = nickserver_url
self._soledad = soledad
+ self.token = token
self._wrapper_map = {
OpenPGPKey: OpenPGPScheme(soledad),
# other types of key will be added to this mapper.
}
self._init_indexes()
+ self._fetcher = requests
#
# utilities
@@ -109,7 +112,7 @@ class KeyManager(object):
Make a GET HTTP request and return a dictionary containing the
response.
"""
- response = requests.get(self._nickserver_url+path)
+ response = self._fetcher.get(self._nickserver_url+path)
leap_assert(response.status_code == 200, 'Invalid response.')
leap_assert(
response.headers['content-type'].startswith('application/json')
@@ -142,24 +145,27 @@ class KeyManager(object):
keyserver.
"""
# prepare the public key bound to address
+ pubkey = self.get_key(
+ self._address, ktype, private=False, fetch_remote=False)
data = {
'address': self._address,
'keys': [
- json.loads(
- self.get_key(
- self._address, ktype, private=False).get_json()),
+ json.loads(pubkey.get_json()),
]
}
# prepare the private key bound to address
if send_private:
- privkey = json.loads(
- self.get_key(self._address, ktype, private=True).get_json())
- privkey.key_data = encrypt_sym(data, password)
+ if password is None or password == '':
+ raise NoPasswordGiven('Can\'t send unencrypted private keys!')
+ privkey = self.get_key(
+ self._address, ktype, private=True, fetch_remote=False)
+ privkey = json.loads(privkey.get_json())
+ privkey.key_data = encrypt_sym(privkey.key_data, password)
data['keys'].append(privkey)
- requests.put(
+ self._fetcher.put(
self._nickserver_url + '/key/' + self._address,
data=data,
- auth=(self._address, None)) # TODO: replace for token-based auth.
+ auth=(self._address, self._token))
def get_key(self, address, ktype, private=False, fetch_remote=True):
"""
@@ -248,7 +254,8 @@ class KeyManager(object):
"""
addresses = set(map(
lambda doc: doc.address,
- self.get_all_keys_in_local_db(False)))
+ self.get_all_keys_in_local_db(private=False)))
+ # TODO: maybe we should not attempt to refresh our own public key?
for address in addresses:
for key in self.fetch_keys_from_server(address):
self._wrapper_map[key.__class__].put_key(key)
@@ -264,3 +271,16 @@ class KeyManager(object):
@rtype: EncryptionKey
"""
return self._wrapper_map[ktype].gen_key(self._address)
+
+ #
+ # Token setter/getter
+ #
+
+ def _get_token(self):
+ return self._token
+
+ def _set_token(self, token):
+ self._token = token
+
+ token = property(
+ _get_token, _set_token, doc='The auth token.')
diff --git a/src/leap/common/keymanager/errors.py b/src/leap/common/keymanager/errors.py
index add6a38..1cf506e 100644
--- a/src/leap/common/keymanager/errors.py
+++ b/src/leap/common/keymanager/errors.py
@@ -38,3 +38,9 @@ class KeyAttributesDiffer(Exception):
Raised when trying to delete a key but the stored key differs from the key
passed to the delete_key() method.
"""
+
+class NoPasswordGiven(Exception):
+ """
+ Raised when trying to perform some action that needs a password without
+ providing one.
+ """
diff --git a/src/leap/common/tests/test_keymanager.py b/src/leap/common/tests/test_keymanager.py
index 32bd1fd..1d7a382 100644
--- a/src/leap/common/tests/test_keymanager.py
+++ b/src/leap/common/tests/test_keymanager.py
@@ -21,6 +21,13 @@ Tests for the Key Manager.
"""
+import mock
+try:
+ import simplejson as json
+except ImportError:
+ import json # noqa
+
+
from leap.common.testing.basetest import BaseLeapTest
from leap.soledad import Soledad
from leap.soledad.crypto import SoledadCrypto
@@ -30,6 +37,7 @@ from leap.common.keymanager import (
KeyManager,
openpgp,
KeyNotFound,
+ NoPasswordGiven,
TAGS_INDEX,
TAGS_AND_PRIVATE_INDEX,
)
@@ -42,6 +50,9 @@ from leap.common.keymanager.keys import (
from leap.common.keymanager import errors
+ADDRESS = 'leap@leap.se'
+
+
class KeyManagerUtilTestCase(BaseLeapTest):
def setUp(self):
@@ -66,7 +77,7 @@ class KeyManagerUtilTestCase(BaseLeapTest):
def test_build_key_from_dict(self):
kdict = {
- 'address': 'leap@leap.se',
+ 'address': ADDRESS,
'key_id': 'key_id',
'fingerprint': 'fingerprint',
'key_data': 'key_data',
@@ -77,7 +88,7 @@ class KeyManagerUtilTestCase(BaseLeapTest):
'last_audited_at': 'last_audited_at',
'validation': 'validation',
}
- key = build_key_from_dict(OpenPGPKey, 'leap@leap.se', kdict)
+ key = build_key_from_dict(OpenPGPKey, ADDRESS, kdict)
self.assertEqual(
kdict['address'], key.address,
'Wrong data in key.')
@@ -111,9 +122,9 @@ class KeyManagerUtilTestCase(BaseLeapTest):
def test_keymanager_doc_id(self):
doc_id1 = keymanager_doc_id(
- OpenPGPKey, 'leap@leap.se', private=False)
+ OpenPGPKey, ADDRESS, private=False)
doc_id2 = keymanager_doc_id(
- OpenPGPKey, 'leap@leap.se', private=True)
+ OpenPGPKey, ADDRESS, private=True)
doc_id3 = keymanager_doc_id(
OpenPGPKey, 'user@leap.se', private=False)
doc_id4 = keymanager_doc_id(
@@ -134,6 +145,8 @@ class KeyManagerWithSoledadTestCase(BaseLeapTest):
"123456",
secret_path=self.tempdir+"/secret.gpg",
local_db_path=self.tempdir+"/soledad.u1db",
+ server_url='',
+ cert_file=None,
bootstrap=False,
)
# initialize solead by hand for testing purposes
@@ -144,7 +157,14 @@ class KeyManagerWithSoledadTestCase(BaseLeapTest):
self._soledad._init_db()
def tearDown(self):
- pass
+ km = self._key_manager()
+ for key in km.get_all_keys_in_local_db():
+ km._wrapper_map[key.__class__].delete_key(key)
+ for key in km.get_all_keys_in_local_db(private=True):
+ km._wrapper_map[key.__class__].delete_key(key)
+
+ def _key_manager(self, user=ADDRESS, url=''):
+ return KeyManager(user, url, self._soledad)
class OpenPGPCryptoTestCase(KeyManagerWithSoledadTestCase):
@@ -161,43 +181,43 @@ class OpenPGPCryptoTestCase(KeyManagerWithSoledadTestCase):
def test_openpgp_put_delete_key(self):
pgp = openpgp.OpenPGPScheme(self._soledad)
- self.assertRaises(KeyNotFound, pgp.get_key, 'leap@leap.se')
+ self.assertRaises(KeyNotFound, pgp.get_key, ADDRESS)
pgp.put_key_raw(PUBLIC_KEY)
- key = pgp.get_key('leap@leap.se', private=False)
+ key = pgp.get_key(ADDRESS, private=False)
pgp.delete_key(key)
- self.assertRaises(KeyNotFound, pgp.get_key, 'leap@leap.se')
+ self.assertRaises(KeyNotFound, pgp.get_key, ADDRESS)
def test_openpgp_put_key_raw(self):
pgp = openpgp.OpenPGPScheme(self._soledad)
- self.assertRaises(KeyNotFound, pgp.get_key, 'leap@leap.se')
+ self.assertRaises(KeyNotFound, pgp.get_key, ADDRESS)
pgp.put_key_raw(PUBLIC_KEY)
- key = pgp.get_key('leap@leap.se', private=False)
+ key = pgp.get_key(ADDRESS, private=False)
self.assertIsInstance(key, openpgp.OpenPGPKey)
self.assertEqual(
- 'leap@leap.se', key.address, 'Wrong address bound to key.')
+ ADDRESS, key.address, 'Wrong address bound to key.')
self.assertEqual(
'4096', key.length, 'Wrong key length.')
pgp.delete_key(key)
- self.assertRaises(KeyNotFound, pgp.get_key, 'leap@leap.se')
+ self.assertRaises(KeyNotFound, pgp.get_key, ADDRESS)
def test_get_public_key(self):
pgp = openpgp.OpenPGPScheme(self._soledad)
- self.assertRaises(KeyNotFound, pgp.get_key, 'leap@leap.se')
+ self.assertRaises(KeyNotFound, pgp.get_key, ADDRESS)
pgp.put_key_raw(PUBLIC_KEY)
self.assertRaises(
- KeyNotFound, pgp.get_key, 'leap@leap.se', private=True)
- key = pgp.get_key('leap@leap.se', private=False)
- self.assertEqual('leap@leap.se', key.address)
+ KeyNotFound, pgp.get_key, ADDRESS, private=True)
+ key = pgp.get_key(ADDRESS, private=False)
+ self.assertEqual(ADDRESS, key.address)
self.assertFalse(key.private)
self.assertEqual(KEY_FINGERPRINT, key.fingerprint)
pgp.delete_key(key)
- self.assertRaises(KeyNotFound, pgp.get_key, 'leap@leap.se')
+ self.assertRaises(KeyNotFound, pgp.get_key, ADDRESS)
def test_openpgp_encrypt_decrypt_asym(self):
# encrypt
pgp = openpgp.OpenPGPScheme(self._soledad)
pgp.put_key_raw(PUBLIC_KEY)
- pubkey = pgp.get_key('leap@leap.se', private=False)
+ pubkey = pgp.get_key(ADDRESS, private=False)
cyphertext = openpgp.encrypt_asym('data', pubkey)
# assert
self.assertTrue(cyphertext is not None)
@@ -208,16 +228,16 @@ class OpenPGPCryptoTestCase(KeyManagerWithSoledadTestCase):
self.assertTrue(openpgp.is_encrypted(cyphertext))
# decrypt
self.assertRaises(
- KeyNotFound, pgp.get_key, 'leap@leap.se', private=True)
+ KeyNotFound, pgp.get_key, ADDRESS, private=True)
pgp.put_key_raw(PRIVATE_KEY)
- privkey = pgp.get_key('leap@leap.se', private=True)
+ privkey = pgp.get_key(ADDRESS, private=True)
plaintext = openpgp.decrypt_asym(cyphertext, privkey)
pgp.delete_key(pubkey)
pgp.delete_key(privkey)
self.assertRaises(
- KeyNotFound, pgp.get_key, 'leap@leap.se', private=False)
+ KeyNotFound, pgp.get_key, ADDRESS, private=False)
self.assertRaises(
- KeyNotFound, pgp.get_key, 'leap@leap.se', private=True)
+ KeyNotFound, pgp.get_key, ADDRESS, private=True)
def test_openpgp_encrypt_decrypt_sym(self):
cyphertext = openpgp.encrypt_sym('data', 'pass')
@@ -234,23 +254,115 @@ class OpenPGPCryptoTestCase(KeyManagerWithSoledadTestCase):
class KeyManagerKeyManagementTestCase(
KeyManagerWithSoledadTestCase):
- def _key_manager(self, user='leap@leap.se', url=''):
- return KeyManager(user, url, self._soledad)
-
def test_get_all_keys_in_db(self):
km = self._key_manager()
km._wrapper_map[OpenPGPKey].put_key_raw(PRIVATE_KEY)
# get public keys
keys = km.get_all_keys_in_local_db(False)
self.assertEqual(len(keys), 1, 'Wrong number of keys')
- self.assertEqual('leap@leap.se', keys[0].address)
+ self.assertEqual(ADDRESS, keys[0].address)
self.assertFalse(keys[0].private)
# get private keys
keys = km.get_all_keys_in_local_db(True)
self.assertEqual(len(keys), 1, 'Wrong number of keys')
- self.assertEqual('leap@leap.se', keys[0].address)
+ self.assertEqual(ADDRESS, keys[0].address)
self.assertTrue(keys[0].private)
+ def test_get_public_key(self):
+ km = self._key_manager()
+ km._wrapper_map[OpenPGPKey].put_key_raw(PRIVATE_KEY)
+ # get the key
+ key = km.get_key(ADDRESS, OpenPGPKey, private=False,
+ fetch_remote=False)
+ self.assertTrue(key is not None)
+ self.assertEqual(key.address, ADDRESS)
+ self.assertEqual(
+ key.fingerprint.lower(), KEY_FINGERPRINT.lower())
+ self.assertFalse(key.private)
+
+ def test_get_private_key(self):
+ km = self._key_manager()
+ km._wrapper_map[OpenPGPKey].put_key_raw(PRIVATE_KEY)
+ # get the key
+ key = km.get_key(ADDRESS, OpenPGPKey, private=True,
+ fetch_remote=False)
+ self.assertTrue(key is not None)
+ self.assertEqual(key.address, ADDRESS)
+ self.assertEqual(
+ key.fingerprint.lower(), KEY_FINGERPRINT.lower())
+ self.assertTrue(key.private)
+
+ def test_send_key_raises_key_not_found(self):
+ km = self._key_manager()
+ self.assertRaises(
+ KeyNotFound,
+ km.send_key, OpenPGPKey, send_private=False)
+
+ def test_send_private_key_raises_key_not_found(self):
+ km = self._key_manager()
+ km._wrapper_map[OpenPGPKey].put_key_raw(PUBLIC_KEY)
+ self.assertRaises(
+ KeyNotFound,
+ km.send_key, OpenPGPKey, send_private=True,
+ password='123')
+
+ def test_send_private_key_without_password_raises(self):
+ km = self._key_manager()
+ km._wrapper_map[OpenPGPKey].put_key_raw(PUBLIC_KEY)
+ self.assertRaises(
+ NoPasswordGiven,
+ km.send_key, OpenPGPKey, send_private=True)
+
+ def test_send_public_key(self):
+ km = self._key_manager()
+ km._wrapper_map[OpenPGPKey].put_key_raw(PUBLIC_KEY)
+ km._fetcher.put = mock.Mock()
+ km.token = '123'
+ km.send_key(OpenPGPKey, send_private=False)
+ # setup args
+ data = {
+ 'address': km._address,
+ 'keys': [
+ json.loads(
+ km.get_key(
+ km._address, OpenPGPKey).get_json()),
+ ]
+ }
+ url = km._nickserver_url + '/key/' + km._address
+
+ km._fetcher.put.assert_called_once_with(
+ url, data=data, auth=(km._address, '123')
+ )
+
+ def test_fetch_keys_from_server(self):
+ km = self._key_manager()
+ # setup mock
+
+ class Response(object):
+ status_code = 200
+ headers = {'content-type': 'application/json'}
+ def json(self):
+ return {'address': 'anotheruser@leap.se', 'keys': []}
+
+ km._fetcher.get = mock.Mock(
+ return_value=Response())
+ # do the fetch
+ km.fetch_keys_from_server('anotheruser@leap.se')
+ # and verify the call
+ km._fetcher.get.assert_called_once_with(
+ km._nickserver_url + '/key/' + 'anotheruser@leap.se',
+ )
+
+ def test_refresh_keys(self):
+ # TODO: maybe we should not attempt to refresh our own public key?
+ km = self._key_manager()
+ km._wrapper_map[OpenPGPKey].put_key_raw(PUBLIC_KEY)
+ km.fetch_keys_from_server = mock.Mock(return_value=[])
+ km.refresh_keys()
+ km.fetch_keys_from_server.assert_called_once_with(
+ 'leap@leap.se'
+ )
+
# Key material for testing
KEY_FINGERPRINT = "E36E738D69173C13D709E44F2F455E2824D18DDF"