From 62b5a7798924188ba915a1c095917d8709e20ae7 Mon Sep 17 00:00:00 2001 From: drebs Date: Tue, 23 Apr 2013 20:50:02 -0300 Subject: Refactor, fixes, add api, tests. * Change KeyTypeWrapper to EncryptionScheme * Change OpenPGPWrapper to OpenPGPScheme * Add missing and standardized crypto API. * Add delete_key() * Fix put_key raw so it puts either public or private keys. * Fix gpg's is_encrypted() * Fix openpgp's safe callbacks so they return correctly. * Remove binascii because it generates invalid doc ids. * Add tests. --- src/leap/common/keymanager/__init__.py | 8 +- src/leap/common/keymanager/errors.py | 7 + src/leap/common/keymanager/gpg.py | 2 +- src/leap/common/keymanager/keys.py | 25 +++- src/leap/common/keymanager/openpgp.py | 212 +++++++++++++++++++++++++++---- src/leap/common/keymanager/util.py | 12 +- src/leap/common/tests/test_keymanager.py | 148 +++++++++++++++------ 7 files changed, 333 insertions(+), 81 deletions(-) diff --git a/src/leap/common/keymanager/__init__.py b/src/leap/common/keymanager/__init__.py index a195724..f939a4e 100644 --- a/src/leap/common/keymanager/__init__.py +++ b/src/leap/common/keymanager/__init__.py @@ -33,8 +33,8 @@ from leap.common.keymanager.errors import ( ) from leap.common.keymanager.openpgp import ( OpenPGPKey, - OpenPGPWrapper, - _encrypt_symmetric, + OpenPGPScheme, + encrypt_sym, ) from leap.common.keymanager.http import HTTPClient @@ -56,7 +56,7 @@ class KeyManager(object): self._address = address self._http_client = HTTPClient(url) self._wrapper_map = { - OpenPGPKey: OpenPGPWrapper(soledad), + OpenPGPKey: OpenPGPScheme(soledad), # other types of key will be added to this mapper. } @@ -95,7 +95,7 @@ class KeyManager(object): if send_private: privkey = json.loads( self.get_key(self._address, ktype, private=True).get_json()) - privkey.key_data = _encrypt_symmetric(data, passphrase) + privkey.key_data = encrypt_sym(data, passphrase) data['keys'].append(privkey) headers = None # TODO: replace for token-based-auth self._http_client.request( diff --git a/src/leap/common/keymanager/errors.py b/src/leap/common/keymanager/errors.py index 886c666..add6a38 100644 --- a/src/leap/common/keymanager/errors.py +++ b/src/leap/common/keymanager/errors.py @@ -31,3 +31,10 @@ class KeyAlreadyExists(Exception): """ Raised when attempted to create a key that already exists. """ + + +class KeyAttributesDiffer(Exception): + """ + Raised when trying to delete a key but the stored key differs from the key + passed to the delete_key() method. + """ diff --git a/src/leap/common/keymanager/gpg.py b/src/leap/common/keymanager/gpg.py index 5571ace..f3e6453 100644 --- a/src/leap/common/keymanager/gpg.py +++ b/src/leap/common/keymanager/gpg.py @@ -394,4 +394,4 @@ class GPGWrapper(gnupg.GPG): @return: Whether data is encrypted to a key. @rtype: bool """ - self.is_encrypted_asym() or self.is_encrypted_sym() + return self.is_encrypted_asym(data) or self.is_encrypted_sym(data) diff --git a/src/leap/common/keymanager/keys.py b/src/leap/common/keymanager/keys.py index bed407c..250c2fa 100644 --- a/src/leap/common/keymanager/keys.py +++ b/src/leap/common/keymanager/keys.py @@ -17,7 +17,7 @@ """ -Abstact key type and wrapper representations. +Abstact key type and encryption scheme representations. """ @@ -86,22 +86,23 @@ class EncryptionKey(object): # -# Key wrappers +# Encryption schemes # -class KeyTypeWrapper(object): +class EncryptionScheme(object): """ - Abstract class for Key Type Wrappers. + Abstract class for Encryption Schemes. - A wrapper for a certain key type should know how to get and put keys in - local storage using Soledad and also how to generate new keys. + A wrapper for a certain encryption schemes should know how to get and put + keys in local storage using Soledad, how to generate new keys and how to + find out about possibly encrypted content. """ __metaclass__ = ABCMeta def __init__(self, soledad): """ - Initialize the Key Type Wrapper. + Initialize this Encryption Scheme. @param soledad: A Soledad instance for local storage of keys. @type soledad: leap.soledad.Soledad @@ -139,6 +140,16 @@ class KeyTypeWrapper(object): @param address: The address bound to the key. @type address: str + @return: The key bound to C{address}. @rtype: EncryptionKey """ + + @abstractmethod + def delete_key(self, key): + """ + Remove C{key} from storage. + + @param key: The key to be removed. + @type key: EncryptionKey + """ diff --git a/src/leap/common/keymanager/openpgp.py b/src/leap/common/keymanager/openpgp.py index cd37138..ace8c1e 100644 --- a/src/leap/common/keymanager/openpgp.py +++ b/src/leap/common/keymanager/openpgp.py @@ -32,7 +32,7 @@ from leap.common.keymanager.errors import ( ) from leap.common.keymanager.keys import ( EncryptionKey, - KeyTypeWrapper, + EncryptionScheme, ) from leap.common.keymanager.gpg import GPGWrapper from leap.common.keymanager.util import ( @@ -46,29 +46,137 @@ from leap.common.keymanager.util import ( # Utility functions # -def _encrypt_symmetric(data, password): +def encrypt_sym(data, passphrase): """ - Encrypt C{data} with C{password}. + Encrypt C{data} with C{passphrase}. - This function uses the OpenPGP wrapper to perform the encryption. + @param data: The data to be encrypted. + @type data: str + @param passphrase: The passphrase used to encrypt C{data}. + @type passphrase: str + + @return: The encrypted data. + @rtype: str + """ + + def _encrypt_cb(gpg): + return str( + gpg.encrypt( + data, None, passphrase=passphrase, symmetric=True)) + + return _safe_call(_encrypt_cb) + + +def decrypt_sym(data, passphrase): + """ + Decrypt C{data} with C{passphrase}. + + @param data: The data to be decrypted. + @type data: str + @param passphrase: The passphrase used to decrypt C{data}. + @type passphrase: str + + @return: The decrypted data. + @rtype: str + """ + + def _decrypt_cb(gpg): + return str(gpg.decrypt(data, passphrase=passphrase)) + + return _safe_call(_decrypt_cb) + + +def encrypt_asym(data, key): + """ + Encrypt C{data} using public @{key}. @param data: The data to be encrypted. @type data: str - @param password: The password used to encrypt C{data}. - @type password: str + @param key: The key used to encrypt. + @type key: OpenPGPKey + @return: The encrypted data. @rtype: str """ - cyphertext = None + leap_assert(key.private is False, 'Key is not public.') def _encrypt_cb(gpg): - cyphertext = str( + return str( gpg.encrypt( - data, None, passphrase=password, symmetric=True)) - data['keys'].append(privkey) + data, key.fingerprint, symmetric=False)) + + return _safe_call(_encrypt_cb, key.key_data) + + +def decrypt_asym(data, key): + """ + Decrypt C{data} using private @{key}. + + @param data: The data to be decrypted. + @type data: str + @param key: The key used to decrypt. + @type key: OpenPGPKey + + @return: The decrypted data. + @rtype: str + """ + leap_assert(key.private is True, 'Key is not private.') - _safe_call(_encrypt_cb) - return cyphertext + def _decrypt_cb(gpg): + return str(gpg.decrypt(data)) + + return _safe_call(_decrypt_cb, key.key_data) + + +def is_encrypted(data): + """ + Return whether C{data} was encrypted using OpenPGP. + + @param data: The data we want to know about. + @type data: str + + @return: Whether C{data} was encrypted using this wrapper. + @rtype: bool + """ + + def _is_encrypted_cb(gpg): + return gpg.is_encrypted(data) + + return _safe_call(_is_encrypted_cb) + + +def is_encrypted_sym(data): + """ + Return whether C{data} was encrypted using a public OpenPGP key. + + @param data: The data we want to know about. + @type data: str + + @return: Whether C{data} was encrypted using this wrapper. + @rtype: bool + """ + + def _is_encrypted_cb(gpg): + return gpg.is_encrypted_sym(data) + + return _safe_call(_is_encrypted_cb) + + +def is_encrypted_asym(data): + """ + Return whether C{data} was asymmetrically encrypted using OpenPGP. + + @param data: The data we want to know about. + @type data: str + + @return: Whether C{data} was encrypted using this wrapper. + @rtype: bool + """ + + def _is_encrypted_cb(gpg): + return gpg.is_encrypted_asym(data) + + return _safe_call(_is_encrypted_cb) def _build_key_from_gpg(address, key, key_data): @@ -154,10 +262,14 @@ def _safe_call(callback, key_data=None, **kwargs): @type key_data: str @param **kwargs: Other eventual parameters for the callback. @type **kwargs: **dict + + @return: The results of the callback. + @rtype: str or bool """ gpg = _build_unitary_gpgwrapper(key_data) - callback(gpg, **kwargs) + val = callback(gpg, **kwargs) _destroy_unitary_gpgwrapper(gpg) + return val # @@ -170,7 +282,7 @@ class OpenPGPKey(EncryptionKey): """ -class OpenPGPWrapper(KeyTypeWrapper): +class OpenPGPScheme(EncryptionScheme): """ A wrapper for OpenPGP keys. """ @@ -182,8 +294,7 @@ class OpenPGPWrapper(KeyTypeWrapper): @param soledad: A Soledad instance for key storage. @type soledad: leap.soledad.Soledad """ - KeyTypeWrapper.__init__(self, soledad) - self._soledad = soledad + EncryptionScheme.__init__(self, soledad) def gen_key(self, address): """ @@ -223,10 +334,13 @@ class OpenPGPWrapper(KeyTypeWrapper): leap_assert( re.match('.*<%s>$' % address, key['uids'][0]) is not None, 'Key not correctly bound to address.') - openpgp_key = _build_key_from_gpg( - address, key, - gpg.export_keys(key['fingerprint'])) - self.put_key(openpgp_key) + # insert both public and private keys in storage + for secret in [True, False]: + key = gpg.list_keys(secret=secret).pop() + openpgp_key = _build_key_from_gpg( + address, key, + gpg.export_keys(key['fingerprint'], secret=secret)) + self.put_key(openpgp_key) _safe_call(_gen_key_cb) return self.get_key(address, private=True) @@ -262,15 +376,38 @@ class OpenPGPWrapper(KeyTypeWrapper): def _put_key_raw_cb(gpg): - key = gpg.list_keys(secret=False).pop() # unitary keyring + privkey = None + pubkey = None + try: + privkey = gpg.list_keys(secret=True).pop() + except IndexError: + pass + pubkey = gpg.list_keys(secret=False).pop() # unitary keyring # extract adress from first uid on key - match = re.match('.*<([\w.-]+@[\w.-]+)>.*', key['uids'].pop()) + match = re.match('.*<([\w.-]+@[\w.-]+)>.*', pubkey['uids'].pop()) leap_assert(match is not None, 'No user address in key data.') address = match.group(1) - openpgp_key = _build_key_from_gpg( - address, key, - gpg.export_keys(key['fingerprint'])) - self.put_key(openpgp_key) + if privkey is not None: + match = re.match( + '.*<([\w.-]+@[\w.-]+)>.*', privkey['uids'].pop()) + leap_assert(match is not None, 'No user address in key data.') + privaddress = match.group(1) + leap_assert( + address == privaddress, + 'Addresses in pub and priv key differ.') + leap_assert( + pubkey['fingerprint'] == privkey['fingerprint'], + 'Fingerprints for pub and priv key differ.') + # insert private key in storage + openpgp_privkey = _build_key_from_gpg( + address, privkey, + gpg.export_keys(privkey['fingerprint'], secret=True)) + self.put_key(openpgp_privkey) + # insert public key in storage + openpgp_pubkey = _build_key_from_gpg( + address, pubkey, + gpg.export_keys(pubkey['fingerprint'], secret=False)) + self.put_key(openpgp_pubkey) _safe_call(_put_key_raw_cb, data) @@ -285,7 +422,8 @@ class OpenPGPWrapper(KeyTypeWrapper): if doc is None: self._soledad.create_doc_from_json( key.get_json(), - doc_id=_keymanager_doc_id(key.address, key.private)) + doc_id=_keymanager_doc_id( + OpenPGPKey, key.address, key.private)) else: doc.set_json(key.get_json()) self._soledad.put_doc(doc) @@ -303,4 +441,22 @@ class OpenPGPWrapper(KeyTypeWrapper): @return: The document with the key or None if it does not exist. @rtype: leap.soledad.backends.leap_backend.LeapDocument """ - return self._soledad.get_doc(_keymanager_doc_id(address, private)) + return self._soledad.get_doc( + _keymanager_doc_id(OpenPGPKey, address, private)) + + def delete_key(self, key): + """ + Remove C{key} from storage. + + @param key: The key to be removed. + @type key: EncryptionKey + """ + leap_assert(key.__class__ is OpenPGPKey, 'Wrong key type.') + stored_key = self.get_key(key.address, private=key.private) + if stored_key is None: + raise KeyDoesNotExist(key) + if stored_key.__dict__ != key.__dict__: + raise KeyAttributesDiffer(key) + doc = self._soledad.get_doc( + _keymanager_doc_id(OpenPGPKey, key.address, key.private)) + self._soledad.delete_doc(doc) diff --git a/src/leap/common/keymanager/util.py b/src/leap/common/keymanager/util.py index 42168c8..667d2b2 100644 --- a/src/leap/common/keymanager/util.py +++ b/src/leap/common/keymanager/util.py @@ -25,6 +25,9 @@ import re from hashlib import sha256 +from binascii import b2a_base64 + + from leap.common.check import leap_assert @@ -79,11 +82,13 @@ def _build_key_from_doc(kClass, address, doc): return _build_key_from_dict(kClass, address, doc.content) -def _keymanager_doc_id(address, private=False): +def _keymanager_doc_id(ktype, address, private=False): """ Return the document id for the document containing a key for C{address}. + @param address: The type of the key. + @type address: KeyType @param address: The address bound to the key. @type address: str @param private: Whether the key is private or not. @@ -93,5 +98,6 @@ def _keymanager_doc_id(address, private=False): @rtype: str """ leap_assert(_is_address(address), "Wrong address format: %s" % address) - ktype = 'private' if private else 'public' - return sha256('key-manager-'+address+'-'+ktype).hexdigest() + ktype = str(ktype) + visibility = 'private' if private else 'public' + return sha256('key-manager-'+address+'-'+ktype+'-'+visibility).hexdigest() diff --git a/src/leap/common/tests/test_keymanager.py b/src/leap/common/tests/test_keymanager.py index 4a2693e..f9b478f 100644 --- a/src/leap/common/tests/test_keymanager.py +++ b/src/leap/common/tests/test_keymanager.py @@ -26,14 +26,17 @@ import unittest from leap.common.testing.basetest import BaseLeapTest from leap.soledad import Soledad +from leap.soledad.crypto import SoledadCrypto + + from leap.common.keymanager import KeyManager, openpgp, KeyNotFound from leap.common.keymanager.openpgp import OpenPGPKey -from leap.common.keymanager.gpg import GPGWrapper from leap.common.keymanager.util import ( _is_address, _build_key_from_dict, _keymanager_doc_id, ) +from leap.common.keymanager import errors class KeyManagerUtilTestCase(BaseLeapTest): @@ -72,32 +75,46 @@ class KeyManagerUtilTestCase(BaseLeapTest): 'validation': 'validation', } key = _build_key_from_dict(OpenPGPKey, 'leap@leap.se', kdict) - self.assertEqual(kdict['address'], key.address, + self.assertEqual( + kdict['address'], key.address, 'Wrong data in key.') - self.assertEqual(kdict['key_id'], key.key_id, + self.assertEqual( + kdict['key_id'], key.key_id, 'Wrong data in key.') - self.assertEqual(kdict['fingerprint'], key.fingerprint, + self.assertEqual( + kdict['fingerprint'], key.fingerprint, 'Wrong data in key.') - self.assertEqual(kdict['key_data'], key.key_data, + self.assertEqual( + kdict['key_data'], key.key_data, 'Wrong data in key.') - self.assertEqual(kdict['private'], key.private, + self.assertEqual( + kdict['private'], key.private, 'Wrong data in key.') - self.assertEqual(kdict['length'], key.length, + self.assertEqual( + kdict['length'], key.length, 'Wrong data in key.') - self.assertEqual(kdict['expiry_date'], key.expiry_date, + self.assertEqual( + kdict['expiry_date'], key.expiry_date, 'Wrong data in key.') - self.assertEqual(kdict['first_seen_at'], key.first_seen_at, + self.assertEqual( + kdict['first_seen_at'], key.first_seen_at, 'Wrong data in key.') - self.assertEqual(kdict['last_audited_at'], key.last_audited_at, + self.assertEqual( + kdict['last_audited_at'], key.last_audited_at, 'Wrong data in key.') - self.assertEqual(kdict['validation'], key.validation, + self.assertEqual( + kdict['validation'], key.validation, 'Wrong data in key.') def test__keymanager_doc_id(self): - doc_id1 = _keymanager_doc_id('leap@leap.se', private=False) - doc_id2 = _keymanager_doc_id('leap@leap.se', private=True) - doc_id3 = _keymanager_doc_id('user@leap.se', private=False) - doc_id4 = _keymanager_doc_id('user@leap.se', private=True) + doc_id1 = _keymanager_doc_id( + OpenPGPKey, 'leap@leap.se', private=False) + doc_id2 = _keymanager_doc_id( + OpenPGPKey, 'leap@leap.se', private=True) + doc_id3 = _keymanager_doc_id( + OpenPGPKey, 'user@leap.se', private=False) + doc_id4 = _keymanager_doc_id( + OpenPGPKey, 'user@leap.se', private=True) self.assertFalse(doc_id1 == doc_id2, 'Doc ids are equal!') self.assertFalse(doc_id1 == doc_id3, 'Doc ids are equal!') self.assertFalse(doc_id1 == doc_id4, 'Doc ids are equal!') @@ -119,7 +136,7 @@ class KeyManagerCryptoTestCase(BaseLeapTest): ) # initialize solead by hand for testing purposes self._soledad._init_dirs() - self._soledad._gpg = GPGWrapper(gnupghome=self.tempdir+"/gnupg") + self._soledad._crypto = SoledadCrypto(self._soledad) self._soledad._shared_db = None self._soledad._init_keys() self._soledad._init_db() @@ -130,31 +147,86 @@ class KeyManagerCryptoTestCase(BaseLeapTest): def _key_manager(user='user@leap.se', url='https://domain.org:6425'): return KeyManager(user, url) - def test_openpgp_gen_key(self): - pgp = openpgp.OpenPGPWrapper(self._soledad) - try: - pgp.get_key('user@leap.se') - except KeyNotFound: - key = pgp.gen_key('user@leap.se') - self.assertIsInstance(key, openpgp.OpenPGPKey) - self.assertEqual( - 'user@leap.se', key.address, 'Wrong address bound to key.') - self.assertEqual( - '4096', key.length, 'Wrong key length.') + def _test_openpgp_gen_key(self): + pgp = openpgp.OpenPGPScheme(self._soledad) + self.assertRaises(KeyNotFound, pgp.get_key, 'user@leap.se') + key = pgp.gen_key('user@leap.se') + self.assertIsInstance(key, openpgp.OpenPGPKey) + self.assertEqual( + 'user@leap.se', key.address, 'Wrong address bound to key.') + self.assertEqual( + '4096', key.length, 'Wrong key length.') + + def test_openpgp_put_delete_key(self): + pgp = openpgp.OpenPGPScheme(self._soledad) + self.assertRaises(KeyNotFound, pgp.get_key, 'leap@leap.se') + pgp.put_key_raw(PUBLIC_KEY) + key = pgp.get_key('leap@leap.se', private=False) + pgp.delete_key(key) + self.assertRaises(KeyNotFound, pgp.get_key, 'leap@leap.se') def test_openpgp_put_key_raw(self): - pgp = openpgp.OpenPGPWrapper(self._soledad) - try: - pgp.get_key('leap@leap.se') - except KeyNotFound: - pgp.put_key_raw(PUBLIC_KEY) - key = pgp.get_key('leap@leap.se') - self.assertIsInstance(key, openpgp.OpenPGPKey) - self.assertEqual( - 'leap@leap.se', key.address, 'Wrong address bound to key.') - self.assertEqual( - '4096', key.length, 'Wrong key length.') + pgp = openpgp.OpenPGPScheme(self._soledad) + self.assertRaises(KeyNotFound, pgp.get_key, 'leap@leap.se') + pgp.put_key_raw(PUBLIC_KEY) + key = pgp.get_key('leap@leap.se', private=False) + self.assertIsInstance(key, openpgp.OpenPGPKey) + self.assertEqual( + 'leap@leap.se', key.address, 'Wrong address bound to key.') + self.assertEqual( + '4096', key.length, 'Wrong key length.') + pgp.delete_key(key) + self.assertRaises(KeyNotFound, pgp.get_key, 'leap@leap.se') + + def test_get_public_key(self): + pgp = openpgp.OpenPGPScheme(self._soledad) + self.assertRaises(KeyNotFound, pgp.get_key, 'leap@leap.se') + pgp.put_key_raw(PUBLIC_KEY) + self.assertRaises( + KeyNotFound, pgp.get_key, 'leap@leap.se', private=True) + key = pgp.get_key('leap@leap.se', private=False) + self.assertEqual('leap@leap.se', key.address) + self.assertFalse(key.private) + self.assertEqual(KEY_FINGERPRINT, key.fingerprint) + pgp.delete_key(key) + self.assertRaises(KeyNotFound, pgp.get_key, 'leap@leap.se') + + def test_openpgp_encrypt_decrypt_asym(self): + # encrypt + pgp = openpgp.OpenPGPScheme(self._soledad) + pgp.put_key_raw(PUBLIC_KEY) + pubkey = pgp.get_key('leap@leap.se', private=False) + cyphertext = openpgp.encrypt_asym('data', pubkey) + # assert + self.assertTrue(cyphertext is not None) + self.assertTrue(cyphertext != '') + self.assertTrue(cyphertext != 'data') + self.assertTrue(openpgp.is_encrypted_asym(cyphertext)) + self.assertFalse(openpgp.is_encrypted_sym(cyphertext)) + self.assertTrue(openpgp.is_encrypted(cyphertext)) + # decrypt + self.assertRaises( + KeyNotFound, pgp.get_key, 'leap@leap.se', private=True) + pgp.put_key_raw(PRIVATE_KEY) + privkey = pgp.get_key('leap@leap.se', private=True) + plaintext = openpgp.decrypt_asym(cyphertext, privkey) + pgp.delete_key(pubkey) + pgp.delete_key(privkey) + self.assertRaises( + KeyNotFound, pgp.get_key, 'leap@leap.se', private=False) + self.assertRaises( + KeyNotFound, pgp.get_key, 'leap@leap.se', private=True) + def test_openpgp_encrypt_decrypt_sym(self): + cyphertext = openpgp.encrypt_sym('data', 'pass') + self.assertTrue(cyphertext is not None) + self.assertTrue(cyphertext != '') + self.assertTrue(cyphertext != 'data') + self.assertTrue(openpgp.is_encrypted_sym(cyphertext)) + self.assertFalse(openpgp.is_encrypted_asym(cyphertext)) + self.assertTrue(openpgp.is_encrypted(cyphertext)) + plaintext = openpgp.decrypt_sym(cyphertext, 'pass') + self.assertEqual('data', plaintext) # Key material for testing -- cgit v1.2.3