summaryrefslogtreecommitdiff
path: root/src/leap/common/keymanager/openpgp.py
diff options
context:
space:
mode:
authorTomas Touceda <chiiph@leap.se>2013-05-13 10:00:39 -0300
committerTomas Touceda <chiiph@leap.se>2013-05-13 10:00:39 -0300
commit2784a01a4d12dca8ad4c66bac3125c692cee44ef (patch)
tree73ec3ef926a90a7fd5a93d9572ec476a4845e5cd /src/leap/common/keymanager/openpgp.py
parent779afe2e830a48e91a9bda6dc8cd8b35ce555ed5 (diff)
parent8aa61059ecab25888d97aa26bcd59c0ffd09b9e9 (diff)
Merge remote-tracking branch 'drebs/feature/2488-openpgp-sign-3' into develop
Diffstat (limited to 'src/leap/common/keymanager/openpgp.py')
-rw-r--r--src/leap/common/keymanager/openpgp.py279
1 files changed, 194 insertions, 85 deletions
diff --git a/src/leap/common/keymanager/openpgp.py b/src/leap/common/keymanager/openpgp.py
index 0fd314a..0cb1308 100644
--- a/src/leap/common/keymanager/openpgp.py
+++ b/src/leap/common/keymanager/openpgp.py
@@ -29,7 +29,11 @@ from leap.common.check import leap_assert, leap_assert_type
from leap.common.keymanager.errors import (
KeyNotFound,
KeyAlreadyExists,
- KeyAttributesDiffer
+ KeyAttributesDiffer,
+ InvalidSignature,
+ EncryptionFailed,
+ DecryptionFailed,
+ SignFailed,
)
from leap.common.keymanager.keys import (
EncryptionKey,
@@ -45,84 +49,163 @@ from leap.common.keymanager.gpg import GPGWrapper
# API functions
#
-def encrypt_sym(data, passphrase):
+def encrypt_sym(data, passphrase, sign=None):
"""
- Encrypt C{data} with C{passphrase}.
+ Encrypt C{data} with C{passphrase} and sign with C{sign} private key.
@param data: The data to be encrypted.
@type data: str
@param passphrase: The passphrase used to encrypt C{data}.
@type passphrase: str
+ @param sign: The private key used for signing.
+ @type sign: OpenPGPKey
@return: The encrypted data.
@rtype: str
"""
+ leap_assert_type(passphrase, str)
+ if sign is not None:
+ leap_assert_type(sign, OpenPGPKey)
+ leap_assert(sign.private == True)
def _encrypt_cb(gpg):
- return gpg.encrypt(
- data, None, passphrase=passphrase, symmetric=True).data
-
- return _safe_call(_encrypt_cb)
-
-
-def decrypt_sym(data, passphrase):
+ result = gpg.encrypt(
+ data, None,
+ sign=sign.key_id if sign else None,
+ passphrase=passphrase, symmetric=True)
+ # Here we cannot assert for correctness of sig because the sig is in
+ # the ciphertext.
+ # result.ok - (bool) indicates if the operation succeeded
+ # result.data - (bool) contains the result of the operation
+ if result.ok is False:
+ raise EncryptionFailed('Failed to encrypt: %s' % result.stderr)
+ return result.data
+
+ return _safe_call(_encrypt_cb, [sign])
+
+
+def decrypt_sym(data, passphrase, verify=None):
"""
- Decrypt C{data} with C{passphrase}.
+ Decrypt C{data} with C{passphrase} and verify with C{verify} public
+ key.
@param data: The data to be decrypted.
@type data: str
@param passphrase: The passphrase used to decrypt C{data}.
@type passphrase: str
+ @param verify: The key used to verify a signature.
+ @type verify: OpenPGPKey
@return: The decrypted data.
@rtype: str
+
+ @raise InvalidSignature: Raised if unable to verify the signature with
+ C{verify} key.
"""
+ leap_assert_type(passphrase, str)
+ if verify is not None:
+ leap_assert_type(verify, OpenPGPKey)
+ leap_assert(verify.private == False)
def _decrypt_cb(gpg):
- return gpg.decrypt(data, passphrase=passphrase).data
-
- return _safe_call(_decrypt_cb)
-
-
-def encrypt_asym(data, key):
+ result = gpg.decrypt(data, passphrase=passphrase)
+ # result.ok - (bool) indicates if the operation succeeded
+ # result.valid - (bool) indicates if the signature was verified
+ # result.data - (bool) contains the result of the operation
+ # result.pubkey_fingerpring - (str) contains the fingerprint of the
+ # public key that signed this data.
+ if result.ok is False:
+ raise DecryptionFailed('Failed to decrypt: %s', result.stderr)
+ if verify is not None:
+ if result.valid is False or \
+ verify.fingerprint != result.pubkey_fingerprint:
+ raise InvalidSignature(
+ 'Failed to verify signature with key %s: %s' %
+ (verify.key_id, result.stderr))
+ return result.data
+
+ return _safe_call(_decrypt_cb, [verify])
+
+
+def encrypt_asym(data, pubkey, sign=None):
"""
- Encrypt C{data} using public @{key}.
+ Encrypt C{data} using public @{key} and sign with C{sign} key.
@param data: The data to be encrypted.
@type data: str
- @param key: The key used to encrypt.
- @type key: OpenPGPKey
+ @param pubkey: The key used to encrypt.
+ @type pubkey: OpenPGPKey
+ @param sign: The key used for signing.
+ @type sign: OpenPGPKey
@return: The encrypted data.
@rtype: str
"""
- leap_assert(key.private is False, 'Key is not public.')
+ leap_assert_type(pubkey, OpenPGPKey)
+ leap_assert(pubkey.private is False, 'Key is not public.')
+ if sign is not None:
+ leap_assert_type(sign, OpenPGPKey)
+ leap_assert(sign.private == True)
def _encrypt_cb(gpg):
- return gpg.encrypt(
- data, key.fingerprint, symmetric=False).data
-
- return _safe_call(_encrypt_cb, key.key_data)
-
-
-def decrypt_asym(data, key):
+ result = gpg.encrypt(
+ data, pubkey.fingerprint,
+ sign=sign.key_id if sign else None,
+ symmetric=False)
+ # Here we cannot assert for correctness of sig because the sig is in
+ # the ciphertext.
+ # result.ok - (bool) indicates if the operation succeeded
+ # result.data - (bool) contains the result of the operation
+ if result.ok is False:
+ raise EncryptionFailed(
+ 'Failed to encrypt with key %s: %s' %
+ (pubkey.key_id, result.stderr))
+ return result.data
+
+ return _safe_call(_encrypt_cb, [pubkey, sign])
+
+
+def decrypt_asym(data, privkey, verify=None):
"""
- Decrypt C{data} using private @{key}.
+ Decrypt C{data} using private @{key} and verify with C{verify} key.
@param data: The data to be decrypted.
@type data: str
- @param key: The key used to decrypt.
- @type key: OpenPGPKey
+ @param privkey: The key used to decrypt.
+ @type privkey: OpenPGPKey
+ @param verify: The key used to verify a signature.
+ @type verify: OpenPGPKey
@return: The decrypted data.
@rtype: str
+
+ @raise InvalidSignature: Raised if unable to verify the signature with
+ C{verify} key.
"""
- leap_assert(key.private is True, 'Key is not private.')
+ leap_assert(privkey.private is True, 'Key is not private.')
+ if verify is not None:
+ leap_assert_type(verify, OpenPGPKey)
+ leap_assert(verify.private == False)
def _decrypt_cb(gpg):
- return gpg.decrypt(data).data
-
- return _safe_call(_decrypt_cb, key.key_data)
+ result = gpg.decrypt(data)
+ # result.ok - (bool) indicates if the operation succeeded
+ # result.valid - (bool) indicates if the signature was verified
+ # result.data - (bool) contains the result of the operation
+ # result.pubkey_fingerpring - (str) contains the fingerprint of the
+ # public key that signed this data.
+ if result.ok is False:
+ raise DecryptionFailed('Failed to decrypt with key %s: %s' %
+ (privkey.key_id, result.stderr))
+ if verify is not None:
+ if result.valid is False or \
+ verify.fingerprint != result.pubkey_fingerprint:
+ raise InvalidSignature(
+ 'Failed to verify signature with key %s: %s' %
+ (verify.key_id, result.stderr))
+ return result.data
+
+ return _safe_call(_decrypt_cb, [privkey, verify])
def is_encrypted(data):
@@ -175,45 +258,61 @@ def is_encrypted_asym(data):
return _safe_call(_is_encrypted_cb)
-def sign(data, key):
+def sign(data, privkey):
"""
- Sign C{data} with C{key}.
+ Sign C{data} with C{privkey}.
@param data: The data to be signed.
@type data: str
- @param key: The key to be used to sign.
- @type key: OpenPGPKey
+ @param privkey: The private key to be used to sign.
+ @type privkey: OpenPGPKey
@return: The ascii-armored signed data.
@rtype: str
"""
- leap_assert_type(key, OpenPGPKey)
- leap_assert(key.private == True)
+ leap_assert_type(privkey, OpenPGPKey)
+ leap_assert(privkey.private == True)
def _sign_cb(gpg):
- return gpg.sign(data, keyid=key.key_id).data
+ result = gpg.sign(data, keyid=privkey.key_id)
+ # result.fingerprint - contains the fingerprint of the key used to
+ # sign.
+ if result.fingerprint is None:
+ raise SignFailed(
+ 'Failed to sign with key %s: %s' %
+ (privkey.key_id, result.stderr))
+ leap_assert(
+ result.fingerprint == privkey.fingerprint,
+ 'Signature and private key fingerprints mismatch: %s != %s' %
+ (result.fingerprint, privkey.fingerprint))
+ return result.data
- return _safe_call(_sign_cb, key.key_data)
+ return _safe_call(_sign_cb, [privkey])
-def verify(data, key):
+def verify(data, pubkey):
"""
- Verify signed C{data} with C{key}.
+ Verify signed C{data} with C{pubkey}.
@param data: The data to be verified.
@type data: str
- @param key: The key to be used on verification.
- @type key: OpenPGPKey
+ @param pubkey: The public key to be used on verification.
+ @type pubkey: OpenPGPKey
@return: The ascii-armored signed data.
@rtype: str
"""
- leap_assert_type(key, OpenPGPKey)
- leap_assert(key.private == False)
+ leap_assert_type(pubkey, OpenPGPKey)
+ leap_assert(pubkey.private == False)
def _verify_cb(gpg):
- return gpg.verify(data).valid
+ result = gpg.verify(data)
+ if result.valid is False or \
+ result.fingerprint != pubkey.fingerprint:
+ raise InvalidSignature(
+ 'Failed to verify signature with key %s.' % pubkey.key_id)
+ return True
- return _safe_call(_verify_cb, key.key_data)
+ return _safe_call(_verify_cb, [pubkey])
#
# Helper functions
@@ -248,35 +347,48 @@ def _build_key_from_gpg(address, key, key_data):
)
-def _build_unitary_gpgwrapper(key_data=None):
+def _build_keyring(keys=[]):
"""
- Return a temporary GPG wrapper keyring containing exactly zero or one
- keys.
- Temporary unitary keyrings allow the to use GPG's facilities for exactly
- one key. This function creates an empty temporary keyring and imports
- C{key_data} if it is not None.
+ Create an empty GPG keyring and import C{keys} into it.
+
+ @param keys: List of keys to add to the keyring.
+ @type keys: list of OpenPGPKey
- @param key_data: ASCII armored key data.
- @type key_data: str
@return: A GPG wrapper with a unitary keyring.
@rtype: gnupg.GPG
"""
+ privkeys = filter(lambda key: key.private == True, keys)
+ pubkeys = filter(lambda key: key.private == False, keys)
+ # here we filter out public keys that have a correspondent private key in
+ # the list because the private key_data by itself is enough to also have
+ # the public key in the keyring, and we want to count the keys afterwards.
+ privaddrs = map(lambda privkey: privkey.address, privkeys)
+ pubkeys = filter(lambda pubkey: pubkey.address not in privaddrs, pubkeys)
+ # create temporary dir for temporary gpg keyring
tmpdir = tempfile.mkdtemp()
gpg = GPGWrapper(gnupghome=tmpdir)
leap_assert(len(gpg.list_keys()) is 0, 'Keyring not empty.')
- if key_data:
- gpg.import_keys(key_data)
- leap_assert(
- len(gpg.list_keys()) is 1,
- 'Unitary keyring has wrong number of keys: %d.'
- % len(gpg.list_keys()))
+ # import keys into the keyring
+ gpg.import_keys(
+ reduce(
+ lambda x, y: x+y,
+ [key.key_data for key in pubkeys+privkeys], ''))
+ # assert the number of keys in the keyring
+ leap_assert(
+ len(gpg.list_keys()) == len(pubkeys)+len(privkeys),
+ 'Wrong number of public keys in keyring: %d, should be %d)' %
+ (len(gpg.list_keys()), len(pubkeys)+len(privkeys)))
+ leap_assert(
+ len(gpg.list_keys(secret=True)) == len(privkeys),
+ 'Wrong number of private keys in keyring: %d, should be %d)' %
+ (len(gpg.list_keys(secret=True)), len(privkeys)))
return gpg
-def _destroy_unitary_gpgwrapper(gpg):
+def _destroy_keyring(gpg):
"""
- Securely erase a unitary keyring.
+ Securely erase a keyring.
@param gpg: A GPG wrapper instance.
@type gpg: gnupg.GPG
@@ -292,23 +404,21 @@ def _destroy_unitary_gpgwrapper(gpg):
shutil.rmtree(gpg.gnupghome)
-def _safe_call(callback, key_data=None, **kwargs):
+def _safe_call(callback, keys=[]):
"""
- Run C{callback} in an unitary keyring containing C{key_data}.
+ Run C{callback} over a keyring containing C{keys}.
@param callback: Function whose first argument is the gpg keyring.
@type callback: function(gnupg.GPG)
- @param key_data: ASCII armored key data.
- @type key_data: str
- @param **kwargs: Other eventual parameters for the callback.
- @type **kwargs: **dict
+ @param keys: List of keys to add to the keyring.
+ @type keys: list of OpenPGPKey
@return: The results of the callback.
@rtype: str or bool
"""
- gpg = _build_unitary_gpgwrapper(key_data)
- val = callback(gpg, **kwargs)
- _destroy_unitary_gpgwrapper(gpg)
+ gpg = _build_keyring(filter(lambda key: key is not None, keys))
+ val = callback(gpg)
+ _destroy_keyring(gpg)
return val
@@ -404,18 +514,17 @@ class OpenPGPScheme(EncryptionScheme):
raise KeyNotFound(address)
return build_key_from_dict(OpenPGPKey, address, doc.content)
- def put_key_raw(self, data):
+ def put_ascii_key(self, key_data):
"""
- Put key contained in raw C{data} in local storage.
+ Put key contained in ascii-armored C{key_data} in local storage.
- @param data: The key data to be stored.
- @type data: str
+ @param key_data: The key data to be stored.
+ @type key_data: str
"""
- # TODO: add more checks for correct key data.
- leap_assert(data is not None, 'Data does not represent a key.')
-
- def _put_key_raw_cb(gpg):
+ leap_assert_type(key_data, str)
+ def _put_ascii_key_cb(gpg):
+ gpg.import_keys(key_data)
privkey = None
pubkey = None
try:
@@ -449,7 +558,7 @@ class OpenPGPScheme(EncryptionScheme):
gpg.export_keys(pubkey['fingerprint'], secret=False))
self.put_key(openpgp_pubkey)
- _safe_call(_put_key_raw_cb, data)
+ _safe_call(_put_ascii_key_cb)
def put_key(self, key):
"""