From 23e435cf12b9bdd25410b3b8a48ced4168f872d2 Mon Sep 17 00:00:00 2001 From: drebs Date: Thu, 9 May 2013 17:53:07 -0300 Subject: Encrypt/decrypt can also sign/verify. Also Implement code review comments for openpgp sign. * Add assertions and exceptions to openpgp encrypt/decrypt/sign/verify methods. * Added comments where they will help. * Make code more clear by encapsulating more the access to GPG wrapper and removing concatenation of ascii armored keys. * Add verification of fingerprint of verifying key. * Shorten check for signing key id. --- src/leap/common/keymanager/openpgp.py | 279 +++++++++++++++++++++++----------- 1 file changed, 194 insertions(+), 85 deletions(-) (limited to 'src/leap/common/keymanager/openpgp.py') diff --git a/src/leap/common/keymanager/openpgp.py b/src/leap/common/keymanager/openpgp.py index 0fd314a..0cb1308 100644 --- a/src/leap/common/keymanager/openpgp.py +++ b/src/leap/common/keymanager/openpgp.py @@ -29,7 +29,11 @@ from leap.common.check import leap_assert, leap_assert_type from leap.common.keymanager.errors import ( KeyNotFound, KeyAlreadyExists, - KeyAttributesDiffer + KeyAttributesDiffer, + InvalidSignature, + EncryptionFailed, + DecryptionFailed, + SignFailed, ) from leap.common.keymanager.keys import ( EncryptionKey, @@ -45,84 +49,163 @@ from leap.common.keymanager.gpg import GPGWrapper # API functions # -def encrypt_sym(data, passphrase): +def encrypt_sym(data, passphrase, sign=None): """ - Encrypt C{data} with C{passphrase}. + Encrypt C{data} with C{passphrase} and sign with C{sign} private key. @param data: The data to be encrypted. @type data: str @param passphrase: The passphrase used to encrypt C{data}. @type passphrase: str + @param sign: The private key used for signing. + @type sign: OpenPGPKey @return: The encrypted data. @rtype: str """ + leap_assert_type(passphrase, str) + if sign is not None: + leap_assert_type(sign, OpenPGPKey) + leap_assert(sign.private == True) def _encrypt_cb(gpg): - return gpg.encrypt( - data, None, passphrase=passphrase, symmetric=True).data - - return _safe_call(_encrypt_cb) - - -def decrypt_sym(data, passphrase): + result = gpg.encrypt( + data, None, + sign=sign.key_id if sign else None, + passphrase=passphrase, symmetric=True) + # Here we cannot assert for correctness of sig because the sig is in + # the ciphertext. + # result.ok - (bool) indicates if the operation succeeded + # result.data - (bool) contains the result of the operation + if result.ok is False: + raise EncryptionFailed('Failed to encrypt: %s' % result.stderr) + return result.data + + return _safe_call(_encrypt_cb, [sign]) + + +def decrypt_sym(data, passphrase, verify=None): """ - Decrypt C{data} with C{passphrase}. + Decrypt C{data} with C{passphrase} and verify with C{verify} public + key. @param data: The data to be decrypted. @type data: str @param passphrase: The passphrase used to decrypt C{data}. @type passphrase: str + @param verify: The key used to verify a signature. + @type verify: OpenPGPKey @return: The decrypted data. @rtype: str + + @raise InvalidSignature: Raised if unable to verify the signature with + C{verify} key. """ + leap_assert_type(passphrase, str) + if verify is not None: + leap_assert_type(verify, OpenPGPKey) + leap_assert(verify.private == False) def _decrypt_cb(gpg): - return gpg.decrypt(data, passphrase=passphrase).data - - return _safe_call(_decrypt_cb) - - -def encrypt_asym(data, key): + result = gpg.decrypt(data, passphrase=passphrase) + # result.ok - (bool) indicates if the operation succeeded + # result.valid - (bool) indicates if the signature was verified + # result.data - (bool) contains the result of the operation + # result.pubkey_fingerpring - (str) contains the fingerprint of the + # public key that signed this data. + if result.ok is False: + raise DecryptionFailed('Failed to decrypt: %s', result.stderr) + if verify is not None: + if result.valid is False or \ + verify.fingerprint != result.pubkey_fingerprint: + raise InvalidSignature( + 'Failed to verify signature with key %s: %s' % + (verify.key_id, result.stderr)) + return result.data + + return _safe_call(_decrypt_cb, [verify]) + + +def encrypt_asym(data, pubkey, sign=None): """ - Encrypt C{data} using public @{key}. + Encrypt C{data} using public @{key} and sign with C{sign} key. @param data: The data to be encrypted. @type data: str - @param key: The key used to encrypt. - @type key: OpenPGPKey + @param pubkey: The key used to encrypt. + @type pubkey: OpenPGPKey + @param sign: The key used for signing. + @type sign: OpenPGPKey @return: The encrypted data. @rtype: str """ - leap_assert(key.private is False, 'Key is not public.') + leap_assert_type(pubkey, OpenPGPKey) + leap_assert(pubkey.private is False, 'Key is not public.') + if sign is not None: + leap_assert_type(sign, OpenPGPKey) + leap_assert(sign.private == True) def _encrypt_cb(gpg): - return gpg.encrypt( - data, key.fingerprint, symmetric=False).data - - return _safe_call(_encrypt_cb, key.key_data) - - -def decrypt_asym(data, key): + result = gpg.encrypt( + data, pubkey.fingerprint, + sign=sign.key_id if sign else None, + symmetric=False) + # Here we cannot assert for correctness of sig because the sig is in + # the ciphertext. + # result.ok - (bool) indicates if the operation succeeded + # result.data - (bool) contains the result of the operation + if result.ok is False: + raise EncryptionFailed( + 'Failed to encrypt with key %s: %s' % + (pubkey.key_id, result.stderr)) + return result.data + + return _safe_call(_encrypt_cb, [pubkey, sign]) + + +def decrypt_asym(data, privkey, verify=None): """ - Decrypt C{data} using private @{key}. + Decrypt C{data} using private @{key} and verify with C{verify} key. @param data: The data to be decrypted. @type data: str - @param key: The key used to decrypt. - @type key: OpenPGPKey + @param privkey: The key used to decrypt. + @type privkey: OpenPGPKey + @param verify: The key used to verify a signature. + @type verify: OpenPGPKey @return: The decrypted data. @rtype: str + + @raise InvalidSignature: Raised if unable to verify the signature with + C{verify} key. """ - leap_assert(key.private is True, 'Key is not private.') + leap_assert(privkey.private is True, 'Key is not private.') + if verify is not None: + leap_assert_type(verify, OpenPGPKey) + leap_assert(verify.private == False) def _decrypt_cb(gpg): - return gpg.decrypt(data).data - - return _safe_call(_decrypt_cb, key.key_data) + result = gpg.decrypt(data) + # result.ok - (bool) indicates if the operation succeeded + # result.valid - (bool) indicates if the signature was verified + # result.data - (bool) contains the result of the operation + # result.pubkey_fingerpring - (str) contains the fingerprint of the + # public key that signed this data. + if result.ok is False: + raise DecryptionFailed('Failed to decrypt with key %s: %s' % + (privkey.key_id, result.stderr)) + if verify is not None: + if result.valid is False or \ + verify.fingerprint != result.pubkey_fingerprint: + raise InvalidSignature( + 'Failed to verify signature with key %s: %s' % + (verify.key_id, result.stderr)) + return result.data + + return _safe_call(_decrypt_cb, [privkey, verify]) def is_encrypted(data): @@ -175,45 +258,61 @@ def is_encrypted_asym(data): return _safe_call(_is_encrypted_cb) -def sign(data, key): +def sign(data, privkey): """ - Sign C{data} with C{key}. + Sign C{data} with C{privkey}. @param data: The data to be signed. @type data: str - @param key: The key to be used to sign. - @type key: OpenPGPKey + @param privkey: The private key to be used to sign. + @type privkey: OpenPGPKey @return: The ascii-armored signed data. @rtype: str """ - leap_assert_type(key, OpenPGPKey) - leap_assert(key.private == True) + leap_assert_type(privkey, OpenPGPKey) + leap_assert(privkey.private == True) def _sign_cb(gpg): - return gpg.sign(data, keyid=key.key_id).data + result = gpg.sign(data, keyid=privkey.key_id) + # result.fingerprint - contains the fingerprint of the key used to + # sign. + if result.fingerprint is None: + raise SignFailed( + 'Failed to sign with key %s: %s' % + (privkey.key_id, result.stderr)) + leap_assert( + result.fingerprint == privkey.fingerprint, + 'Signature and private key fingerprints mismatch: %s != %s' % + (result.fingerprint, privkey.fingerprint)) + return result.data - return _safe_call(_sign_cb, key.key_data) + return _safe_call(_sign_cb, [privkey]) -def verify(data, key): +def verify(data, pubkey): """ - Verify signed C{data} with C{key}. + Verify signed C{data} with C{pubkey}. @param data: The data to be verified. @type data: str - @param key: The key to be used on verification. - @type key: OpenPGPKey + @param pubkey: The public key to be used on verification. + @type pubkey: OpenPGPKey @return: The ascii-armored signed data. @rtype: str """ - leap_assert_type(key, OpenPGPKey) - leap_assert(key.private == False) + leap_assert_type(pubkey, OpenPGPKey) + leap_assert(pubkey.private == False) def _verify_cb(gpg): - return gpg.verify(data).valid + result = gpg.verify(data) + if result.valid is False or \ + result.fingerprint != pubkey.fingerprint: + raise InvalidSignature( + 'Failed to verify signature with key %s.' % pubkey.key_id) + return True - return _safe_call(_verify_cb, key.key_data) + return _safe_call(_verify_cb, [pubkey]) # # Helper functions @@ -248,35 +347,48 @@ def _build_key_from_gpg(address, key, key_data): ) -def _build_unitary_gpgwrapper(key_data=None): +def _build_keyring(keys=[]): """ - Return a temporary GPG wrapper keyring containing exactly zero or one - keys. - Temporary unitary keyrings allow the to use GPG's facilities for exactly - one key. This function creates an empty temporary keyring and imports - C{key_data} if it is not None. + Create an empty GPG keyring and import C{keys} into it. + + @param keys: List of keys to add to the keyring. + @type keys: list of OpenPGPKey - @param key_data: ASCII armored key data. - @type key_data: str @return: A GPG wrapper with a unitary keyring. @rtype: gnupg.GPG """ + privkeys = filter(lambda key: key.private == True, keys) + pubkeys = filter(lambda key: key.private == False, keys) + # here we filter out public keys that have a correspondent private key in + # the list because the private key_data by itself is enough to also have + # the public key in the keyring, and we want to count the keys afterwards. + privaddrs = map(lambda privkey: privkey.address, privkeys) + pubkeys = filter(lambda pubkey: pubkey.address not in privaddrs, pubkeys) + # create temporary dir for temporary gpg keyring tmpdir = tempfile.mkdtemp() gpg = GPGWrapper(gnupghome=tmpdir) leap_assert(len(gpg.list_keys()) is 0, 'Keyring not empty.') - if key_data: - gpg.import_keys(key_data) - leap_assert( - len(gpg.list_keys()) is 1, - 'Unitary keyring has wrong number of keys: %d.' - % len(gpg.list_keys())) + # import keys into the keyring + gpg.import_keys( + reduce( + lambda x, y: x+y, + [key.key_data for key in pubkeys+privkeys], '')) + # assert the number of keys in the keyring + leap_assert( + len(gpg.list_keys()) == len(pubkeys)+len(privkeys), + 'Wrong number of public keys in keyring: %d, should be %d)' % + (len(gpg.list_keys()), len(pubkeys)+len(privkeys))) + leap_assert( + len(gpg.list_keys(secret=True)) == len(privkeys), + 'Wrong number of private keys in keyring: %d, should be %d)' % + (len(gpg.list_keys(secret=True)), len(privkeys))) return gpg -def _destroy_unitary_gpgwrapper(gpg): +def _destroy_keyring(gpg): """ - Securely erase a unitary keyring. + Securely erase a keyring. @param gpg: A GPG wrapper instance. @type gpg: gnupg.GPG @@ -292,23 +404,21 @@ def _destroy_unitary_gpgwrapper(gpg): shutil.rmtree(gpg.gnupghome) -def _safe_call(callback, key_data=None, **kwargs): +def _safe_call(callback, keys=[]): """ - Run C{callback} in an unitary keyring containing C{key_data}. + Run C{callback} over a keyring containing C{keys}. @param callback: Function whose first argument is the gpg keyring. @type callback: function(gnupg.GPG) - @param key_data: ASCII armored key data. - @type key_data: str - @param **kwargs: Other eventual parameters for the callback. - @type **kwargs: **dict + @param keys: List of keys to add to the keyring. + @type keys: list of OpenPGPKey @return: The results of the callback. @rtype: str or bool """ - gpg = _build_unitary_gpgwrapper(key_data) - val = callback(gpg, **kwargs) - _destroy_unitary_gpgwrapper(gpg) + gpg = _build_keyring(filter(lambda key: key is not None, keys)) + val = callback(gpg) + _destroy_keyring(gpg) return val @@ -404,18 +514,17 @@ class OpenPGPScheme(EncryptionScheme): raise KeyNotFound(address) return build_key_from_dict(OpenPGPKey, address, doc.content) - def put_key_raw(self, data): + def put_ascii_key(self, key_data): """ - Put key contained in raw C{data} in local storage. + Put key contained in ascii-armored C{key_data} in local storage. - @param data: The key data to be stored. - @type data: str + @param key_data: The key data to be stored. + @type key_data: str """ - # TODO: add more checks for correct key data. - leap_assert(data is not None, 'Data does not represent a key.') - - def _put_key_raw_cb(gpg): + leap_assert_type(key_data, str) + def _put_ascii_key_cb(gpg): + gpg.import_keys(key_data) privkey = None pubkey = None try: @@ -449,7 +558,7 @@ class OpenPGPScheme(EncryptionScheme): gpg.export_keys(pubkey['fingerprint'], secret=False)) self.put_key(openpgp_pubkey) - _safe_call(_put_key_raw_cb, data) + _safe_call(_put_ascii_key_cb) def put_key(self, key): """ -- cgit v1.2.3