From 62b5a7798924188ba915a1c095917d8709e20ae7 Mon Sep 17 00:00:00 2001 From: drebs Date: Tue, 23 Apr 2013 20:50:02 -0300 Subject: Refactor, fixes, add api, tests. * Change KeyTypeWrapper to EncryptionScheme * Change OpenPGPWrapper to OpenPGPScheme * Add missing and standardized crypto API. * Add delete_key() * Fix put_key raw so it puts either public or private keys. * Fix gpg's is_encrypted() * Fix openpgp's safe callbacks so they return correctly. * Remove binascii because it generates invalid doc ids. * Add tests. --- src/leap/common/keymanager/openpgp.py | 212 +++++++++++++++++++++++++++++----- 1 file changed, 184 insertions(+), 28 deletions(-) (limited to 'src/leap/common/keymanager/openpgp.py') diff --git a/src/leap/common/keymanager/openpgp.py b/src/leap/common/keymanager/openpgp.py index cd37138..ace8c1e 100644 --- a/src/leap/common/keymanager/openpgp.py +++ b/src/leap/common/keymanager/openpgp.py @@ -32,7 +32,7 @@ from leap.common.keymanager.errors import ( ) from leap.common.keymanager.keys import ( EncryptionKey, - KeyTypeWrapper, + EncryptionScheme, ) from leap.common.keymanager.gpg import GPGWrapper from leap.common.keymanager.util import ( @@ -46,29 +46,137 @@ from leap.common.keymanager.util import ( # Utility functions # -def _encrypt_symmetric(data, password): +def encrypt_sym(data, passphrase): """ - Encrypt C{data} with C{password}. + Encrypt C{data} with C{passphrase}. - This function uses the OpenPGP wrapper to perform the encryption. + @param data: The data to be encrypted. + @type data: str + @param passphrase: The passphrase used to encrypt C{data}. + @type passphrase: str + + @return: The encrypted data. + @rtype: str + """ + + def _encrypt_cb(gpg): + return str( + gpg.encrypt( + data, None, passphrase=passphrase, symmetric=True)) + + return _safe_call(_encrypt_cb) + + +def decrypt_sym(data, passphrase): + """ + Decrypt C{data} with C{passphrase}. + + @param data: The data to be decrypted. + @type data: str + @param passphrase: The passphrase used to decrypt C{data}. + @type passphrase: str + + @return: The decrypted data. + @rtype: str + """ + + def _decrypt_cb(gpg): + return str(gpg.decrypt(data, passphrase=passphrase)) + + return _safe_call(_decrypt_cb) + + +def encrypt_asym(data, key): + """ + Encrypt C{data} using public @{key}. @param data: The data to be encrypted. @type data: str - @param password: The password used to encrypt C{data}. - @type password: str + @param key: The key used to encrypt. + @type key: OpenPGPKey + @return: The encrypted data. @rtype: str """ - cyphertext = None + leap_assert(key.private is False, 'Key is not public.') def _encrypt_cb(gpg): - cyphertext = str( + return str( gpg.encrypt( - data, None, passphrase=password, symmetric=True)) - data['keys'].append(privkey) + data, key.fingerprint, symmetric=False)) + + return _safe_call(_encrypt_cb, key.key_data) + + +def decrypt_asym(data, key): + """ + Decrypt C{data} using private @{key}. + + @param data: The data to be decrypted. + @type data: str + @param key: The key used to decrypt. + @type key: OpenPGPKey + + @return: The decrypted data. + @rtype: str + """ + leap_assert(key.private is True, 'Key is not private.') - _safe_call(_encrypt_cb) - return cyphertext + def _decrypt_cb(gpg): + return str(gpg.decrypt(data)) + + return _safe_call(_decrypt_cb, key.key_data) + + +def is_encrypted(data): + """ + Return whether C{data} was encrypted using OpenPGP. + + @param data: The data we want to know about. + @type data: str + + @return: Whether C{data} was encrypted using this wrapper. + @rtype: bool + """ + + def _is_encrypted_cb(gpg): + return gpg.is_encrypted(data) + + return _safe_call(_is_encrypted_cb) + + +def is_encrypted_sym(data): + """ + Return whether C{data} was encrypted using a public OpenPGP key. + + @param data: The data we want to know about. + @type data: str + + @return: Whether C{data} was encrypted using this wrapper. + @rtype: bool + """ + + def _is_encrypted_cb(gpg): + return gpg.is_encrypted_sym(data) + + return _safe_call(_is_encrypted_cb) + + +def is_encrypted_asym(data): + """ + Return whether C{data} was asymmetrically encrypted using OpenPGP. + + @param data: The data we want to know about. + @type data: str + + @return: Whether C{data} was encrypted using this wrapper. + @rtype: bool + """ + + def _is_encrypted_cb(gpg): + return gpg.is_encrypted_asym(data) + + return _safe_call(_is_encrypted_cb) def _build_key_from_gpg(address, key, key_data): @@ -154,10 +262,14 @@ def _safe_call(callback, key_data=None, **kwargs): @type key_data: str @param **kwargs: Other eventual parameters for the callback. @type **kwargs: **dict + + @return: The results of the callback. + @rtype: str or bool """ gpg = _build_unitary_gpgwrapper(key_data) - callback(gpg, **kwargs) + val = callback(gpg, **kwargs) _destroy_unitary_gpgwrapper(gpg) + return val # @@ -170,7 +282,7 @@ class OpenPGPKey(EncryptionKey): """ -class OpenPGPWrapper(KeyTypeWrapper): +class OpenPGPScheme(EncryptionScheme): """ A wrapper for OpenPGP keys. """ @@ -182,8 +294,7 @@ class OpenPGPWrapper(KeyTypeWrapper): @param soledad: A Soledad instance for key storage. @type soledad: leap.soledad.Soledad """ - KeyTypeWrapper.__init__(self, soledad) - self._soledad = soledad + EncryptionScheme.__init__(self, soledad) def gen_key(self, address): """ @@ -223,10 +334,13 @@ class OpenPGPWrapper(KeyTypeWrapper): leap_assert( re.match('.*<%s>$' % address, key['uids'][0]) is not None, 'Key not correctly bound to address.') - openpgp_key = _build_key_from_gpg( - address, key, - gpg.export_keys(key['fingerprint'])) - self.put_key(openpgp_key) + # insert both public and private keys in storage + for secret in [True, False]: + key = gpg.list_keys(secret=secret).pop() + openpgp_key = _build_key_from_gpg( + address, key, + gpg.export_keys(key['fingerprint'], secret=secret)) + self.put_key(openpgp_key) _safe_call(_gen_key_cb) return self.get_key(address, private=True) @@ -262,15 +376,38 @@ class OpenPGPWrapper(KeyTypeWrapper): def _put_key_raw_cb(gpg): - key = gpg.list_keys(secret=False).pop() # unitary keyring + privkey = None + pubkey = None + try: + privkey = gpg.list_keys(secret=True).pop() + except IndexError: + pass + pubkey = gpg.list_keys(secret=False).pop() # unitary keyring # extract adress from first uid on key - match = re.match('.*<([\w.-]+@[\w.-]+)>.*', key['uids'].pop()) + match = re.match('.*<([\w.-]+@[\w.-]+)>.*', pubkey['uids'].pop()) leap_assert(match is not None, 'No user address in key data.') address = match.group(1) - openpgp_key = _build_key_from_gpg( - address, key, - gpg.export_keys(key['fingerprint'])) - self.put_key(openpgp_key) + if privkey is not None: + match = re.match( + '.*<([\w.-]+@[\w.-]+)>.*', privkey['uids'].pop()) + leap_assert(match is not None, 'No user address in key data.') + privaddress = match.group(1) + leap_assert( + address == privaddress, + 'Addresses in pub and priv key differ.') + leap_assert( + pubkey['fingerprint'] == privkey['fingerprint'], + 'Fingerprints for pub and priv key differ.') + # insert private key in storage + openpgp_privkey = _build_key_from_gpg( + address, privkey, + gpg.export_keys(privkey['fingerprint'], secret=True)) + self.put_key(openpgp_privkey) + # insert public key in storage + openpgp_pubkey = _build_key_from_gpg( + address, pubkey, + gpg.export_keys(pubkey['fingerprint'], secret=False)) + self.put_key(openpgp_pubkey) _safe_call(_put_key_raw_cb, data) @@ -285,7 +422,8 @@ class OpenPGPWrapper(KeyTypeWrapper): if doc is None: self._soledad.create_doc_from_json( key.get_json(), - doc_id=_keymanager_doc_id(key.address, key.private)) + doc_id=_keymanager_doc_id( + OpenPGPKey, key.address, key.private)) else: doc.set_json(key.get_json()) self._soledad.put_doc(doc) @@ -303,4 +441,22 @@ class OpenPGPWrapper(KeyTypeWrapper): @return: The document with the key or None if it does not exist. @rtype: leap.soledad.backends.leap_backend.LeapDocument """ - return self._soledad.get_doc(_keymanager_doc_id(address, private)) + return self._soledad.get_doc( + _keymanager_doc_id(OpenPGPKey, address, private)) + + def delete_key(self, key): + """ + Remove C{key} from storage. + + @param key: The key to be removed. + @type key: EncryptionKey + """ + leap_assert(key.__class__ is OpenPGPKey, 'Wrong key type.') + stored_key = self.get_key(key.address, private=key.private) + if stored_key is None: + raise KeyDoesNotExist(key) + if stored_key.__dict__ != key.__dict__: + raise KeyAttributesDiffer(key) + doc = self._soledad.get_doc( + _keymanager_doc_id(OpenPGPKey, key.address, key.private)) + self._soledad.delete_doc(doc) -- cgit v1.2.3