diff options
author | Kali Kaneko <kali@leap.se> | 2013-05-13 23:06:35 +0900 |
---|---|---|
committer | Kali Kaneko <kali@leap.se> | 2013-05-21 01:57:26 +0900 |
commit | 7c6a87acaead5f54e1f2155ecf0a089eff97d654 (patch) | |
tree | 57faff0c4f476557c3e457514f5aed99abfbc3de /src/leap/common/keymanager/openpgp.py | |
parent | 2a78ab3c730ba652153deaed55cfa8d92089a979 (diff) |
use temporary openpgpwrapper as a context manager
in this way we implicitely catch any exception during
the wrapped call, and ensure that the destructor is always
called.
Diffstat (limited to 'src/leap/common/keymanager/openpgp.py')
-rw-r--r-- | src/leap/common/keymanager/openpgp.py | 526 |
1 files changed, 310 insertions, 216 deletions
diff --git a/src/leap/common/keymanager/openpgp.py b/src/leap/common/keymanager/openpgp.py index d578638..e60833b 100644 --- a/src/leap/common/keymanager/openpgp.py +++ b/src/leap/common/keymanager/openpgp.py @@ -15,26 +15,18 @@ # You should have received a copy of the GNU General Public License # along with this program. If not, see <http://www.gnu.org/licenses/>. - """ Infrastructure for using OpenPGP keys in Key Manager. """ - - +import logging +import os import re -import tempfile import shutil +import tempfile from leap.common.check import leap_assert, leap_assert_type -from leap.common.keymanager.errors import ( - KeyNotFound, - KeyAlreadyExists, - KeyAttributesDiffer, - InvalidSignature, - EncryptionFailed, - DecryptionFailed, - SignFailed, -) +from leap.common.keymanager import errors + from leap.common.keymanager.keys import ( EncryptionKey, EncryptionScheme, @@ -44,12 +36,242 @@ from leap.common.keymanager.keys import ( ) from leap.common.keymanager.gpg import GPGWrapper +logger = logging.getLogger(__name__) + + +# +# gpg wrapper and decorator +# + +def temporary_gpgwrapper(keys=None): + """ + Returns a unitary gpg wrapper that implements context manager + protocol. + + @param key_data: ASCII armored key data. + @type key_data: str + + @return: a GPGWrapper instance + @rtype: GPGWrapper + """ + # TODO do here checks on key_data + return TempGPGWrapper(keys=keys) + + +def with_temporary_gpg(fun): + """ + Decorator to add a temporary gpg wrapper as context + to gpg related functions. + + Decorated functions are expected to return a function whose only + argument is a gpgwrapper instance. + """ + def wrapped(*args, **kwargs): + """ + We extract the arguments passed to the wrapped function, + run the function and do validations. + We expect that the positional arguments are `data`, + and an optional `key`. + All the rest of arguments should be passed as named arguments + to allow for a correct unpacking. + """ + if len(args) == 2: + keys = args[1] if isinstance(args[1], OpenPGPKey) else None + else: + keys = None + + # sign/verify keys passed as arguments + sign = kwargs.get('sign', None) + if sign: + keys = [keys, sign] + + verify = kwargs.get('verify', None) + if verify: + keys = [keys, verify] + + # is the wrapped function sign or verify? + fun_name = fun.__name__ + is_sign_function = True if fun_name == "sign" else False + is_verify_function = True if fun_name == "verify" else False + + result = None + + with temporary_gpgwrapper(keys) as gpg: + result = fun(*args, **kwargs)(gpg) + + # TODO: cleanup a little bit the + # validation. maybe delegate to other + # auxiliary functions for clarity. + + ok = getattr(result, 'ok', None) + + stderr = getattr(result, 'stderr', None) + if stderr: + logger.debug("%s" % (stderr,)) + + if ok is False: + raise errors.EncryptionDecryptionFailed( + 'Failed to encrypt/decrypt in %s: %s' % ( + fun.__name__, + stderr)) + + if verify is not None: + # A verify key has been passed + if result.valid is False or \ + verify.fingerprint != result.pubkey_fingerprint: + raise errors.InvalidSignature( + 'Failed to verify signature with key %s: %s' % + (verify.key_id, stderr)) + + if is_sign_function: + # Specific validation for sign function + privkey = gpg.list_keys(secret=True).pop() + rfprint = result.fingerprint + kfprint = privkey['fingerprint'] + if result.fingerprint is None: + raise errors.SignFailed( + 'Failed to sign with key %s: %s' % + (privkey['keyid'], stderr)) + leap_assert( + result.fingerprint == kfprint, + 'Signature and private key fingerprints mismatch: ' + '%s != %s' % + (rfprint, kfprint)) + + if is_verify_function: + # Specific validation for verify function + pubkey = gpg.list_keys().pop() + valid = result.valid + rfprint = result.fingerprint + kfprint = pubkey['fingerprint'] + if valid is False or rfprint != kfprint: + raise errors.InvalidSignature( + 'Failed to verify signature ' + 'with key %s.' % pubkey['keyid']) + result = result.valid + + # ok, enough checks. let's return data if available + if hasattr(result, 'data'): + result = result.data + return result + return wrapped + + +class TempGPGWrapper(object): + """ + A context manager returning a temporary GPG wrapper keyring, which + contains exactly zero or one pubkeys, and zero or one privkeys. + + Temporary unitary keyrings allow the to use GPG's facilities for exactly + one key. This function creates an empty temporary keyring and imports + C{keys} if it is not None. + """ + def __init__(self, keys=None): + """ + @param keys: OpenPGP key, or list of. + @type keys: OpenPGPKey or list of OpenPGPKeys + """ + self._gpg = None + if not keys: + keys = list() + if not isinstance(keys, list): + keys = [keys] + self._keys = keys + for key in filter(None, keys): + leap_assert_type(key, OpenPGPKey) + + def __enter__(self): + """ + Calls the unitary gpgwrapper initializer + + @return: A GPG wrapper with a unitary keyring. + @rtype: gnupg.GPG + """ + self._build_keyring() + return self._gpg + + def __exit__(self, exc_type, exc_value, traceback): + """ + Ensures the gpgwrapper is properly destroyed. + """ + # TODO handle exceptions and log here + self._destroy_keyring() + + def _build_keyring(self): + """ + Create an empty GPG keyring and import C{keys} into it. + + @param keys: List of keys to add to the keyring. + @type keys: list of OpenPGPKey + + @return: A GPG wrapper with a unitary keyring. + @rtype: gnupg.GPG + """ + privkeys = [key for key in self._keys if key and key.private is True] + publkeys = [key for key in self._keys if key and key.private is False] + # here we filter out public keys that have a correspondent + # private key in the list because the private key_data by + # itself is enough to also have the public key in the keyring, + # and we want to count the keys afterwards. + + privaddrs = map(lambda privkey: privkey.address, privkeys) + publkeys = filter( + lambda pubkey: pubkey.address not in privaddrs, publkeys) + + listkeys = lambda: self._gpg.list_keys() + listsecretkeys = lambda: self._gpg.list_keys(secret=True) + + self._gpg = GPGWrapper(gnupghome=tempfile.mkdtemp()) + leap_assert(len(listkeys()) is 0, 'Keyring not empty.') + + # import keys into the keyring: + # concatenating ascii-armored keys, which is correctly + # understood by the GPGWrapper. + + self._gpg.import_keys("".join( + [x.key_data for x in publkeys + privkeys])) + + # assert the number of keys in the keyring + leap_assert( + len(listkeys()) == len(publkeys) + len(privkeys), + 'Wrong number of public keys in keyring: %d, should be %d)' % + (len(listkeys()), len(publkeys) + len(privkeys))) + leap_assert( + len(listsecretkeys()) == len(privkeys), + 'Wrong number of private keys in keyring: %d, should be %d)' % + (len(listsecretkeys()), len(privkeys))) + + def _destroy_keyring(self): + """ + Securely erase a unitary keyring. + """ + # TODO: implement some kind of wiping of data or a more + # secure way that + # does not write to disk. + + try: + for secret in [True, False]: + for key in self._gpg.list_keys(secret=secret): + self._gpg.delete_keys( + key['fingerprint'], + secret=secret) + leap_assert(len(self._gpg.list_keys()) is 0, 'Keyring not empty!') + + except: + raise + + finally: + leap_assert(self._gpg.gnupghome != os.path.expanduser('~/.gnupg'), + "watch out! Tried to remove default gnupg home!") + shutil.rmtree(self._gpg.gnupghome) + # # API functions # -def encrypt_sym(data, passphrase, sign=None): +@with_temporary_gpg +def encrypt_sym(data, passphrase=None, sign=None): """ Encrypt C{data} with C{passphrase} and sign with C{sign} private key. @@ -68,23 +290,19 @@ def encrypt_sym(data, passphrase, sign=None): leap_assert_type(sign, OpenPGPKey) leap_assert(sign.private is True) - def _encrypt_cb(gpg): - result = gpg.encrypt( - data, None, - sign=sign.key_id if sign else None, - passphrase=passphrase, symmetric=True) - # Here we cannot assert for correctness of sig because the sig is in - # the ciphertext. - # result.ok - (bool) indicates if the operation succeeded - # result.data - (bool) contains the result of the operation - if result.ok is False: - raise EncryptionFailed('Failed to encrypt: %s' % result.stderr) - return result.data + # Here we cannot assert for correctness of sig because the sig is in + # the ciphertext. + # result.ok - (bool) indicates if the operation succeeded + # result.data - (bool) contains the result of the operation - return _safe_call(_encrypt_cb, [sign]) + return lambda gpg: gpg.encrypt( + data, None, + sign=sign.key_id if sign else None, + passphrase=passphrase, symmetric=True) -def decrypt_sym(data, passphrase, verify=None): +@with_temporary_gpg +def decrypt_sym(data, passphrase=None, verify=None): """ Decrypt C{data} with C{passphrase} and verify with C{verify} public key. @@ -107,27 +325,17 @@ def decrypt_sym(data, passphrase, verify=None): leap_assert_type(verify, OpenPGPKey) leap_assert(verify.private is False) - def _decrypt_cb(gpg): - result = gpg.decrypt(data, passphrase=passphrase) - # result.ok - (bool) indicates if the operation succeeded - # result.valid - (bool) indicates if the signature was verified - # result.data - (bool) contains the result of the operation - # result.pubkey_fingerpring - (str) contains the fingerprint of the - # public key that signed this data. - if result.ok is False: - raise DecryptionFailed('Failed to decrypt: %s', result.stderr) - if verify is not None: - if result.valid is False or \ - verify.fingerprint != result.pubkey_fingerprint: - raise InvalidSignature( - 'Failed to verify signature with key %s: %s' % - (verify.key_id, result.stderr)) - return result.data - - return _safe_call(_decrypt_cb, [verify]) - - -def encrypt_asym(data, pubkey, sign=None): + # result.ok - (bool) indicates if the operation succeeded + # result.valid - (bool) indicates if the signature was verified + # result.data - (bool) contains the result of the operation + # result.pubkey_fingerpring - (str) contains the fingerprint of the + # public key that signed this data. + return lambda gpg: gpg.decrypt( + data, passphrase=passphrase) + + +@with_temporary_gpg +def encrypt_asym(data, key, passphrase=None, sign=None): """ Encrypt C{data} using public @{key} and sign with C{sign} key. @@ -141,31 +349,25 @@ def encrypt_asym(data, pubkey, sign=None): @return: The encrypted data. @rtype: str """ - leap_assert_type(pubkey, OpenPGPKey) - leap_assert(pubkey.private is False, 'Key is not public.') + leap_assert_type(key, OpenPGPKey) + leap_assert(key.private is False, 'Key is not public.') if sign is not None: leap_assert_type(sign, OpenPGPKey) leap_assert(sign.private is True) - def _encrypt_cb(gpg): - result = gpg.encrypt( - data, pubkey.fingerprint, - sign=sign.key_id if sign else None, - symmetric=False) - # Here we cannot assert for correctness of sig because the sig is in - # the ciphertext. - # result.ok - (bool) indicates if the operation succeeded - # result.data - (bool) contains the result of the operation - if result.ok is False: - raise EncryptionFailed( - 'Failed to encrypt with key %s: %s' % - (pubkey.key_id, result.stderr)) - return result.data - - return _safe_call(_encrypt_cb, [pubkey, sign]) - - -def decrypt_asym(data, privkey, verify=None): + # Here we cannot assert for correctness of sig because the sig is in + # the ciphertext. + # result.ok - (bool) indicates if the operation succeeded + # result.data - (bool) contains the result of the operation + + return lambda gpg: gpg.encrypt( + data, key.fingerprint, + sign=sign.key_id if sign else None, + passphrase=passphrase, symmetric=False) + + +@with_temporary_gpg +def decrypt_asym(data, key, passphrase=None, verify=None): """ Decrypt C{data} using private @{key} and verify with C{verify} key. @@ -182,32 +384,16 @@ def decrypt_asym(data, privkey, verify=None): @raise InvalidSignature: Raised if unable to verify the signature with C{verify} key. """ - leap_assert(privkey.private is True, 'Key is not private.') + leap_assert(key.private is True, 'Key is not private.') if verify is not None: leap_assert_type(verify, OpenPGPKey) leap_assert(verify.private is False) - def _decrypt_cb(gpg): - result = gpg.decrypt(data) - # result.ok - (bool) indicates if the operation succeeded - # result.valid - (bool) indicates if the signature was verified - # result.data - (bool) contains the result of the operation - # result.pubkey_fingerpring - (str) contains the fingerprint of the - # public key that signed this data. - if result.ok is False: - raise DecryptionFailed('Failed to decrypt with key %s: %s' % - (privkey.key_id, result.stderr)) - if verify is not None: - if result.valid is False or \ - verify.fingerprint != result.pubkey_fingerprint: - raise InvalidSignature( - 'Failed to verify signature with key %s: %s' % - (verify.key_id, result.stderr)) - return result.data - - return _safe_call(_decrypt_cb, [privkey, verify]) + return lambda gpg: gpg.decrypt( + data, passphrase=passphrase) +@with_temporary_gpg def is_encrypted(data): """ Return whether C{data} was encrypted using OpenPGP. @@ -218,13 +404,10 @@ def is_encrypted(data): @return: Whether C{data} was encrypted using this wrapper. @rtype: bool """ - - def _is_encrypted_cb(gpg): - return gpg.is_encrypted(data) - - return _safe_call(_is_encrypted_cb) + return lambda gpg: gpg.is_encrypted(data) +@with_temporary_gpg def is_encrypted_sym(data): """ Return whether C{data} was encrypted using a public OpenPGP key. @@ -235,13 +418,10 @@ def is_encrypted_sym(data): @return: Whether C{data} was encrypted using this wrapper. @rtype: bool """ - - def _is_encrypted_cb(gpg): - return gpg.is_encrypted_sym(data) - - return _safe_call(_is_encrypted_cb) + return lambda gpg: gpg.is_encrypted_sym(data) +@with_temporary_gpg def is_encrypted_asym(data): """ Return whether C{data} was asymmetrically encrypted using OpenPGP. @@ -252,19 +432,17 @@ def is_encrypted_asym(data): @return: Whether C{data} was encrypted using this wrapper. @rtype: bool """ - - def _is_encrypted_cb(gpg): - return gpg.is_encrypted_asym(data) - - return _safe_call(_is_encrypted_cb) + return lambda gpg: gpg.is_encrypted_asym(data) +@with_temporary_gpg def sign(data, privkey): """ Sign C{data} with C{privkey}. @param data: The data to be signed. @type data: str + @param privkey: The private key to be used to sign. @type privkey: OpenPGPKey @@ -274,53 +452,36 @@ def sign(data, privkey): leap_assert_type(privkey, OpenPGPKey) leap_assert(privkey.private is True) - def _sign_cb(gpg): - result = gpg.sign(data, keyid=privkey.key_id) - # result.fingerprint - contains the fingerprint of the key used to - # sign. - if result.fingerprint is None: - raise SignFailed( - 'Failed to sign with key %s: %s' % - (privkey.key_id, result.stderr)) - leap_assert( - result.fingerprint == privkey.fingerprint, - 'Signature and private key fingerprints mismatch: %s != %s' % - (result.fingerprint, privkey.fingerprint)) - return result.data - - return _safe_call(_sign_cb, [privkey]) + # result.fingerprint - contains the fingerprint of the key used to + # sign. + return lambda gpg: gpg.sign(data, keyid=privkey.key_id) -def verify(data, pubkey): +@with_temporary_gpg +def verify(data, key): """ Verify signed C{data} with C{pubkey}. @param data: The data to be verified. @type data: str + @param pubkey: The public key to be used on verification. @type pubkey: OpenPGPKey @return: The ascii-armored signed data. @rtype: str """ - leap_assert_type(pubkey, OpenPGPKey) - leap_assert(pubkey.private is False) - - def _verify_cb(gpg): - result = gpg.verify(data) - if result.valid is False or \ - result.fingerprint != pubkey.fingerprint: - raise InvalidSignature( - 'Failed to verify signature with key %s.' % pubkey.key_id) - return True + leap_assert_type(key, OpenPGPKey) + leap_assert(key.private is False) - return _safe_call(_verify_cb, [pubkey]) + return lambda gpg: gpg.verify(data) # # Helper functions # + def _build_key_from_gpg(address, key, key_data): """ Build an OpenPGPKey for C{address} based on C{key} from @@ -350,81 +511,6 @@ def _build_key_from_gpg(address, key, key_data): ) -def _build_keyring(keys=[]): - """ - - Create an empty GPG keyring and import C{keys} into it. - - @param keys: List of keys to add to the keyring. - @type keys: list of OpenPGPKey - - @return: A GPG wrapper with a unitary keyring. - @rtype: gnupg.GPG - """ - privkeys = filter(lambda key: key.private is True, keys) - pubkeys = filter(lambda key: key.private is False, keys) - # here we filter out public keys that have a correspondent private key in - # the list because the private key_data by itself is enough to also have - # the public key in the keyring, and we want to count the keys afterwards. - privaddrs = map(lambda privkey: privkey.address, privkeys) - pubkeys = filter(lambda pubkey: pubkey.address not in privaddrs, pubkeys) - # create temporary dir for temporary gpg keyring - tmpdir = tempfile.mkdtemp() - gpg = GPGWrapper(gnupghome=tmpdir) - leap_assert(len(gpg.list_keys()) is 0, 'Keyring not empty.') - # import keys into the keyring - gpg.import_keys( - reduce( - lambda x, y: x+y, - [key.key_data for key in pubkeys+privkeys], '')) - # assert the number of keys in the keyring - leap_assert( - len(gpg.list_keys()) == len(pubkeys)+len(privkeys), - 'Wrong number of public keys in keyring: %d, should be %d)' % - (len(gpg.list_keys()), len(pubkeys)+len(privkeys))) - leap_assert( - len(gpg.list_keys(secret=True)) == len(privkeys), - 'Wrong number of private keys in keyring: %d, should be %d)' % - (len(gpg.list_keys(secret=True)), len(privkeys))) - return gpg - - -def _destroy_keyring(gpg): - """ - Securely erase a keyring. - - @param gpg: A GPG wrapper instance. - @type gpg: gnupg.GPG - """ - for secret in [True, False]: - for key in gpg.list_keys(secret=secret): - gpg.delete_keys( - key['fingerprint'], - secret=secret) - leap_assert(len(gpg.list_keys()) is 0, 'Keyring not empty!') - # TODO: implement some kind of wiping of data or a more secure way that - # does not write to disk. - shutil.rmtree(gpg.gnupghome) - - -def _safe_call(callback, keys=[]): - """ - Run C{callback} over a keyring containing C{keys}. - - @param callback: Function whose first argument is the gpg keyring. - @type callback: function(gnupg.GPG) - @param keys: List of keys to add to the keyring. - @type keys: list of OpenPGPKey - - @return: The results of the callback. - @rtype: str or bool - """ - gpg = _build_keyring(filter(lambda key: key is not None, keys)) - val = callback(gpg) - _destroy_keyring(gpg) - return val - - # # The OpenPGP wrapper # @@ -463,11 +549,11 @@ class OpenPGPScheme(EncryptionScheme): leap_assert(is_address(address), 'Not an user address: %s' % address) try: self.get_key(address) - raise KeyAlreadyExists(address) - except KeyNotFound: + raise errors.KeyAlreadyExists(address) + except errors.KeyNotFound: pass - def _gen_key_cb(gpg): + def _gen_key(gpg): params = gpg.gen_key_input( key_type='RSA', key_length=4096, @@ -495,7 +581,10 @@ class OpenPGPScheme(EncryptionScheme): gpg.export_keys(key['fingerprint'], secret=secret)) self.put_key(openpgp_key) - _safe_call(_gen_key_cb) + with temporary_gpgwrapper() as gpg: + # TODO: inspect result, or use decorator + _gen_key(gpg) + return self.get_key(address, private=True) def get_key(self, address, private=False): @@ -514,7 +603,7 @@ class OpenPGPScheme(EncryptionScheme): leap_assert(is_address(address), 'Not an user address: %s' % address) doc = self._get_key_doc(address, private) if doc is None: - raise KeyNotFound(address) + raise errors.KeyNotFound(address) return build_key_from_dict(OpenPGPKey, address, doc.content) def put_ascii_key(self, key_data): @@ -525,11 +614,14 @@ class OpenPGPScheme(EncryptionScheme): @type key_data: str """ leap_assert_type(key_data, str) + # TODO: add more checks for correct key data. + leap_assert(key_data is not None, 'Data does not represent a key.') - def _put_ascii_key_cb(gpg): + def _put_ascii_key(gpg): gpg.import_keys(key_data) privkey = None pubkey = None + try: privkey = gpg.list_keys(secret=True).pop() except IndexError: @@ -561,7 +653,9 @@ class OpenPGPScheme(EncryptionScheme): gpg.export_keys(pubkey['fingerprint'], secret=False)) self.put_key(openpgp_pubkey) - _safe_call(_put_ascii_key_cb) + with temporary_gpgwrapper() as gpg: + # TODO: inspect result, or use decorator + _put_ascii_key(gpg) def put_key(self, key): """ @@ -606,9 +700,9 @@ class OpenPGPScheme(EncryptionScheme): leap_assert(key.__class__ is OpenPGPKey, 'Wrong key type.') stored_key = self.get_key(key.address, private=key.private) if stored_key is None: - raise KeyNotFound(key) + raise errors.KeyNotFound(key) if stored_key.__dict__ != key.__dict__: - raise KeyAttributesDiffer(key) + raise errors.KeyAttributesDiffer(key) doc = self._soledad.get_doc( keymanager_doc_id(OpenPGPKey, key.address, key.private)) self._soledad.delete_doc(doc) |