summaryrefslogtreecommitdiff
path: root/src/leap/common/keymanager/openpgp.py
diff options
context:
space:
mode:
authorKali Kaneko <kali@leap.se>2013-05-13 23:06:35 +0900
committerKali Kaneko <kali@leap.se>2013-05-21 01:57:26 +0900
commit7c6a87acaead5f54e1f2155ecf0a089eff97d654 (patch)
tree57faff0c4f476557c3e457514f5aed99abfbc3de /src/leap/common/keymanager/openpgp.py
parent2a78ab3c730ba652153deaed55cfa8d92089a979 (diff)
use temporary openpgpwrapper as a context manager
in this way we implicitely catch any exception during the wrapped call, and ensure that the destructor is always called.
Diffstat (limited to 'src/leap/common/keymanager/openpgp.py')
-rw-r--r--src/leap/common/keymanager/openpgp.py526
1 files changed, 310 insertions, 216 deletions
diff --git a/src/leap/common/keymanager/openpgp.py b/src/leap/common/keymanager/openpgp.py
index d578638..e60833b 100644
--- a/src/leap/common/keymanager/openpgp.py
+++ b/src/leap/common/keymanager/openpgp.py
@@ -15,26 +15,18 @@
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
-
"""
Infrastructure for using OpenPGP keys in Key Manager.
"""
-
-
+import logging
+import os
import re
-import tempfile
import shutil
+import tempfile
from leap.common.check import leap_assert, leap_assert_type
-from leap.common.keymanager.errors import (
- KeyNotFound,
- KeyAlreadyExists,
- KeyAttributesDiffer,
- InvalidSignature,
- EncryptionFailed,
- DecryptionFailed,
- SignFailed,
-)
+from leap.common.keymanager import errors
+
from leap.common.keymanager.keys import (
EncryptionKey,
EncryptionScheme,
@@ -44,12 +36,242 @@ from leap.common.keymanager.keys import (
)
from leap.common.keymanager.gpg import GPGWrapper
+logger = logging.getLogger(__name__)
+
+
+#
+# gpg wrapper and decorator
+#
+
+def temporary_gpgwrapper(keys=None):
+ """
+ Returns a unitary gpg wrapper that implements context manager
+ protocol.
+
+ @param key_data: ASCII armored key data.
+ @type key_data: str
+
+ @return: a GPGWrapper instance
+ @rtype: GPGWrapper
+ """
+ # TODO do here checks on key_data
+ return TempGPGWrapper(keys=keys)
+
+
+def with_temporary_gpg(fun):
+ """
+ Decorator to add a temporary gpg wrapper as context
+ to gpg related functions.
+
+ Decorated functions are expected to return a function whose only
+ argument is a gpgwrapper instance.
+ """
+ def wrapped(*args, **kwargs):
+ """
+ We extract the arguments passed to the wrapped function,
+ run the function and do validations.
+ We expect that the positional arguments are `data`,
+ and an optional `key`.
+ All the rest of arguments should be passed as named arguments
+ to allow for a correct unpacking.
+ """
+ if len(args) == 2:
+ keys = args[1] if isinstance(args[1], OpenPGPKey) else None
+ else:
+ keys = None
+
+ # sign/verify keys passed as arguments
+ sign = kwargs.get('sign', None)
+ if sign:
+ keys = [keys, sign]
+
+ verify = kwargs.get('verify', None)
+ if verify:
+ keys = [keys, verify]
+
+ # is the wrapped function sign or verify?
+ fun_name = fun.__name__
+ is_sign_function = True if fun_name == "sign" else False
+ is_verify_function = True if fun_name == "verify" else False
+
+ result = None
+
+ with temporary_gpgwrapper(keys) as gpg:
+ result = fun(*args, **kwargs)(gpg)
+
+ # TODO: cleanup a little bit the
+ # validation. maybe delegate to other
+ # auxiliary functions for clarity.
+
+ ok = getattr(result, 'ok', None)
+
+ stderr = getattr(result, 'stderr', None)
+ if stderr:
+ logger.debug("%s" % (stderr,))
+
+ if ok is False:
+ raise errors.EncryptionDecryptionFailed(
+ 'Failed to encrypt/decrypt in %s: %s' % (
+ fun.__name__,
+ stderr))
+
+ if verify is not None:
+ # A verify key has been passed
+ if result.valid is False or \
+ verify.fingerprint != result.pubkey_fingerprint:
+ raise errors.InvalidSignature(
+ 'Failed to verify signature with key %s: %s' %
+ (verify.key_id, stderr))
+
+ if is_sign_function:
+ # Specific validation for sign function
+ privkey = gpg.list_keys(secret=True).pop()
+ rfprint = result.fingerprint
+ kfprint = privkey['fingerprint']
+ if result.fingerprint is None:
+ raise errors.SignFailed(
+ 'Failed to sign with key %s: %s' %
+ (privkey['keyid'], stderr))
+ leap_assert(
+ result.fingerprint == kfprint,
+ 'Signature and private key fingerprints mismatch: '
+ '%s != %s' %
+ (rfprint, kfprint))
+
+ if is_verify_function:
+ # Specific validation for verify function
+ pubkey = gpg.list_keys().pop()
+ valid = result.valid
+ rfprint = result.fingerprint
+ kfprint = pubkey['fingerprint']
+ if valid is False or rfprint != kfprint:
+ raise errors.InvalidSignature(
+ 'Failed to verify signature '
+ 'with key %s.' % pubkey['keyid'])
+ result = result.valid
+
+ # ok, enough checks. let's return data if available
+ if hasattr(result, 'data'):
+ result = result.data
+ return result
+ return wrapped
+
+
+class TempGPGWrapper(object):
+ """
+ A context manager returning a temporary GPG wrapper keyring, which
+ contains exactly zero or one pubkeys, and zero or one privkeys.
+
+ Temporary unitary keyrings allow the to use GPG's facilities for exactly
+ one key. This function creates an empty temporary keyring and imports
+ C{keys} if it is not None.
+ """
+ def __init__(self, keys=None):
+ """
+ @param keys: OpenPGP key, or list of.
+ @type keys: OpenPGPKey or list of OpenPGPKeys
+ """
+ self._gpg = None
+ if not keys:
+ keys = list()
+ if not isinstance(keys, list):
+ keys = [keys]
+ self._keys = keys
+ for key in filter(None, keys):
+ leap_assert_type(key, OpenPGPKey)
+
+ def __enter__(self):
+ """
+ Calls the unitary gpgwrapper initializer
+
+ @return: A GPG wrapper with a unitary keyring.
+ @rtype: gnupg.GPG
+ """
+ self._build_keyring()
+ return self._gpg
+
+ def __exit__(self, exc_type, exc_value, traceback):
+ """
+ Ensures the gpgwrapper is properly destroyed.
+ """
+ # TODO handle exceptions and log here
+ self._destroy_keyring()
+
+ def _build_keyring(self):
+ """
+ Create an empty GPG keyring and import C{keys} into it.
+
+ @param keys: List of keys to add to the keyring.
+ @type keys: list of OpenPGPKey
+
+ @return: A GPG wrapper with a unitary keyring.
+ @rtype: gnupg.GPG
+ """
+ privkeys = [key for key in self._keys if key and key.private is True]
+ publkeys = [key for key in self._keys if key and key.private is False]
+ # here we filter out public keys that have a correspondent
+ # private key in the list because the private key_data by
+ # itself is enough to also have the public key in the keyring,
+ # and we want to count the keys afterwards.
+
+ privaddrs = map(lambda privkey: privkey.address, privkeys)
+ publkeys = filter(
+ lambda pubkey: pubkey.address not in privaddrs, publkeys)
+
+ listkeys = lambda: self._gpg.list_keys()
+ listsecretkeys = lambda: self._gpg.list_keys(secret=True)
+
+ self._gpg = GPGWrapper(gnupghome=tempfile.mkdtemp())
+ leap_assert(len(listkeys()) is 0, 'Keyring not empty.')
+
+ # import keys into the keyring:
+ # concatenating ascii-armored keys, which is correctly
+ # understood by the GPGWrapper.
+
+ self._gpg.import_keys("".join(
+ [x.key_data for x in publkeys + privkeys]))
+
+ # assert the number of keys in the keyring
+ leap_assert(
+ len(listkeys()) == len(publkeys) + len(privkeys),
+ 'Wrong number of public keys in keyring: %d, should be %d)' %
+ (len(listkeys()), len(publkeys) + len(privkeys)))
+ leap_assert(
+ len(listsecretkeys()) == len(privkeys),
+ 'Wrong number of private keys in keyring: %d, should be %d)' %
+ (len(listsecretkeys()), len(privkeys)))
+
+ def _destroy_keyring(self):
+ """
+ Securely erase a unitary keyring.
+ """
+ # TODO: implement some kind of wiping of data or a more
+ # secure way that
+ # does not write to disk.
+
+ try:
+ for secret in [True, False]:
+ for key in self._gpg.list_keys(secret=secret):
+ self._gpg.delete_keys(
+ key['fingerprint'],
+ secret=secret)
+ leap_assert(len(self._gpg.list_keys()) is 0, 'Keyring not empty!')
+
+ except:
+ raise
+
+ finally:
+ leap_assert(self._gpg.gnupghome != os.path.expanduser('~/.gnupg'),
+ "watch out! Tried to remove default gnupg home!")
+ shutil.rmtree(self._gpg.gnupghome)
+
#
# API functions
#
-def encrypt_sym(data, passphrase, sign=None):
+@with_temporary_gpg
+def encrypt_sym(data, passphrase=None, sign=None):
"""
Encrypt C{data} with C{passphrase} and sign with C{sign} private key.
@@ -68,23 +290,19 @@ def encrypt_sym(data, passphrase, sign=None):
leap_assert_type(sign, OpenPGPKey)
leap_assert(sign.private is True)
- def _encrypt_cb(gpg):
- result = gpg.encrypt(
- data, None,
- sign=sign.key_id if sign else None,
- passphrase=passphrase, symmetric=True)
- # Here we cannot assert for correctness of sig because the sig is in
- # the ciphertext.
- # result.ok - (bool) indicates if the operation succeeded
- # result.data - (bool) contains the result of the operation
- if result.ok is False:
- raise EncryptionFailed('Failed to encrypt: %s' % result.stderr)
- return result.data
+ # Here we cannot assert for correctness of sig because the sig is in
+ # the ciphertext.
+ # result.ok - (bool) indicates if the operation succeeded
+ # result.data - (bool) contains the result of the operation
- return _safe_call(_encrypt_cb, [sign])
+ return lambda gpg: gpg.encrypt(
+ data, None,
+ sign=sign.key_id if sign else None,
+ passphrase=passphrase, symmetric=True)
-def decrypt_sym(data, passphrase, verify=None):
+@with_temporary_gpg
+def decrypt_sym(data, passphrase=None, verify=None):
"""
Decrypt C{data} with C{passphrase} and verify with C{verify} public
key.
@@ -107,27 +325,17 @@ def decrypt_sym(data, passphrase, verify=None):
leap_assert_type(verify, OpenPGPKey)
leap_assert(verify.private is False)
- def _decrypt_cb(gpg):
- result = gpg.decrypt(data, passphrase=passphrase)
- # result.ok - (bool) indicates if the operation succeeded
- # result.valid - (bool) indicates if the signature was verified
- # result.data - (bool) contains the result of the operation
- # result.pubkey_fingerpring - (str) contains the fingerprint of the
- # public key that signed this data.
- if result.ok is False:
- raise DecryptionFailed('Failed to decrypt: %s', result.stderr)
- if verify is not None:
- if result.valid is False or \
- verify.fingerprint != result.pubkey_fingerprint:
- raise InvalidSignature(
- 'Failed to verify signature with key %s: %s' %
- (verify.key_id, result.stderr))
- return result.data
-
- return _safe_call(_decrypt_cb, [verify])
-
-
-def encrypt_asym(data, pubkey, sign=None):
+ # result.ok - (bool) indicates if the operation succeeded
+ # result.valid - (bool) indicates if the signature was verified
+ # result.data - (bool) contains the result of the operation
+ # result.pubkey_fingerpring - (str) contains the fingerprint of the
+ # public key that signed this data.
+ return lambda gpg: gpg.decrypt(
+ data, passphrase=passphrase)
+
+
+@with_temporary_gpg
+def encrypt_asym(data, key, passphrase=None, sign=None):
"""
Encrypt C{data} using public @{key} and sign with C{sign} key.
@@ -141,31 +349,25 @@ def encrypt_asym(data, pubkey, sign=None):
@return: The encrypted data.
@rtype: str
"""
- leap_assert_type(pubkey, OpenPGPKey)
- leap_assert(pubkey.private is False, 'Key is not public.')
+ leap_assert_type(key, OpenPGPKey)
+ leap_assert(key.private is False, 'Key is not public.')
if sign is not None:
leap_assert_type(sign, OpenPGPKey)
leap_assert(sign.private is True)
- def _encrypt_cb(gpg):
- result = gpg.encrypt(
- data, pubkey.fingerprint,
- sign=sign.key_id if sign else None,
- symmetric=False)
- # Here we cannot assert for correctness of sig because the sig is in
- # the ciphertext.
- # result.ok - (bool) indicates if the operation succeeded
- # result.data - (bool) contains the result of the operation
- if result.ok is False:
- raise EncryptionFailed(
- 'Failed to encrypt with key %s: %s' %
- (pubkey.key_id, result.stderr))
- return result.data
-
- return _safe_call(_encrypt_cb, [pubkey, sign])
-
-
-def decrypt_asym(data, privkey, verify=None):
+ # Here we cannot assert for correctness of sig because the sig is in
+ # the ciphertext.
+ # result.ok - (bool) indicates if the operation succeeded
+ # result.data - (bool) contains the result of the operation
+
+ return lambda gpg: gpg.encrypt(
+ data, key.fingerprint,
+ sign=sign.key_id if sign else None,
+ passphrase=passphrase, symmetric=False)
+
+
+@with_temporary_gpg
+def decrypt_asym(data, key, passphrase=None, verify=None):
"""
Decrypt C{data} using private @{key} and verify with C{verify} key.
@@ -182,32 +384,16 @@ def decrypt_asym(data, privkey, verify=None):
@raise InvalidSignature: Raised if unable to verify the signature with
C{verify} key.
"""
- leap_assert(privkey.private is True, 'Key is not private.')
+ leap_assert(key.private is True, 'Key is not private.')
if verify is not None:
leap_assert_type(verify, OpenPGPKey)
leap_assert(verify.private is False)
- def _decrypt_cb(gpg):
- result = gpg.decrypt(data)
- # result.ok - (bool) indicates if the operation succeeded
- # result.valid - (bool) indicates if the signature was verified
- # result.data - (bool) contains the result of the operation
- # result.pubkey_fingerpring - (str) contains the fingerprint of the
- # public key that signed this data.
- if result.ok is False:
- raise DecryptionFailed('Failed to decrypt with key %s: %s' %
- (privkey.key_id, result.stderr))
- if verify is not None:
- if result.valid is False or \
- verify.fingerprint != result.pubkey_fingerprint:
- raise InvalidSignature(
- 'Failed to verify signature with key %s: %s' %
- (verify.key_id, result.stderr))
- return result.data
-
- return _safe_call(_decrypt_cb, [privkey, verify])
+ return lambda gpg: gpg.decrypt(
+ data, passphrase=passphrase)
+@with_temporary_gpg
def is_encrypted(data):
"""
Return whether C{data} was encrypted using OpenPGP.
@@ -218,13 +404,10 @@ def is_encrypted(data):
@return: Whether C{data} was encrypted using this wrapper.
@rtype: bool
"""
-
- def _is_encrypted_cb(gpg):
- return gpg.is_encrypted(data)
-
- return _safe_call(_is_encrypted_cb)
+ return lambda gpg: gpg.is_encrypted(data)
+@with_temporary_gpg
def is_encrypted_sym(data):
"""
Return whether C{data} was encrypted using a public OpenPGP key.
@@ -235,13 +418,10 @@ def is_encrypted_sym(data):
@return: Whether C{data} was encrypted using this wrapper.
@rtype: bool
"""
-
- def _is_encrypted_cb(gpg):
- return gpg.is_encrypted_sym(data)
-
- return _safe_call(_is_encrypted_cb)
+ return lambda gpg: gpg.is_encrypted_sym(data)
+@with_temporary_gpg
def is_encrypted_asym(data):
"""
Return whether C{data} was asymmetrically encrypted using OpenPGP.
@@ -252,19 +432,17 @@ def is_encrypted_asym(data):
@return: Whether C{data} was encrypted using this wrapper.
@rtype: bool
"""
-
- def _is_encrypted_cb(gpg):
- return gpg.is_encrypted_asym(data)
-
- return _safe_call(_is_encrypted_cb)
+ return lambda gpg: gpg.is_encrypted_asym(data)
+@with_temporary_gpg
def sign(data, privkey):
"""
Sign C{data} with C{privkey}.
@param data: The data to be signed.
@type data: str
+
@param privkey: The private key to be used to sign.
@type privkey: OpenPGPKey
@@ -274,53 +452,36 @@ def sign(data, privkey):
leap_assert_type(privkey, OpenPGPKey)
leap_assert(privkey.private is True)
- def _sign_cb(gpg):
- result = gpg.sign(data, keyid=privkey.key_id)
- # result.fingerprint - contains the fingerprint of the key used to
- # sign.
- if result.fingerprint is None:
- raise SignFailed(
- 'Failed to sign with key %s: %s' %
- (privkey.key_id, result.stderr))
- leap_assert(
- result.fingerprint == privkey.fingerprint,
- 'Signature and private key fingerprints mismatch: %s != %s' %
- (result.fingerprint, privkey.fingerprint))
- return result.data
-
- return _safe_call(_sign_cb, [privkey])
+ # result.fingerprint - contains the fingerprint of the key used to
+ # sign.
+ return lambda gpg: gpg.sign(data, keyid=privkey.key_id)
-def verify(data, pubkey):
+@with_temporary_gpg
+def verify(data, key):
"""
Verify signed C{data} with C{pubkey}.
@param data: The data to be verified.
@type data: str
+
@param pubkey: The public key to be used on verification.
@type pubkey: OpenPGPKey
@return: The ascii-armored signed data.
@rtype: str
"""
- leap_assert_type(pubkey, OpenPGPKey)
- leap_assert(pubkey.private is False)
-
- def _verify_cb(gpg):
- result = gpg.verify(data)
- if result.valid is False or \
- result.fingerprint != pubkey.fingerprint:
- raise InvalidSignature(
- 'Failed to verify signature with key %s.' % pubkey.key_id)
- return True
+ leap_assert_type(key, OpenPGPKey)
+ leap_assert(key.private is False)
- return _safe_call(_verify_cb, [pubkey])
+ return lambda gpg: gpg.verify(data)
#
# Helper functions
#
+
def _build_key_from_gpg(address, key, key_data):
"""
Build an OpenPGPKey for C{address} based on C{key} from
@@ -350,81 +511,6 @@ def _build_key_from_gpg(address, key, key_data):
)
-def _build_keyring(keys=[]):
- """
-
- Create an empty GPG keyring and import C{keys} into it.
-
- @param keys: List of keys to add to the keyring.
- @type keys: list of OpenPGPKey
-
- @return: A GPG wrapper with a unitary keyring.
- @rtype: gnupg.GPG
- """
- privkeys = filter(lambda key: key.private is True, keys)
- pubkeys = filter(lambda key: key.private is False, keys)
- # here we filter out public keys that have a correspondent private key in
- # the list because the private key_data by itself is enough to also have
- # the public key in the keyring, and we want to count the keys afterwards.
- privaddrs = map(lambda privkey: privkey.address, privkeys)
- pubkeys = filter(lambda pubkey: pubkey.address not in privaddrs, pubkeys)
- # create temporary dir for temporary gpg keyring
- tmpdir = tempfile.mkdtemp()
- gpg = GPGWrapper(gnupghome=tmpdir)
- leap_assert(len(gpg.list_keys()) is 0, 'Keyring not empty.')
- # import keys into the keyring
- gpg.import_keys(
- reduce(
- lambda x, y: x+y,
- [key.key_data for key in pubkeys+privkeys], ''))
- # assert the number of keys in the keyring
- leap_assert(
- len(gpg.list_keys()) == len(pubkeys)+len(privkeys),
- 'Wrong number of public keys in keyring: %d, should be %d)' %
- (len(gpg.list_keys()), len(pubkeys)+len(privkeys)))
- leap_assert(
- len(gpg.list_keys(secret=True)) == len(privkeys),
- 'Wrong number of private keys in keyring: %d, should be %d)' %
- (len(gpg.list_keys(secret=True)), len(privkeys)))
- return gpg
-
-
-def _destroy_keyring(gpg):
- """
- Securely erase a keyring.
-
- @param gpg: A GPG wrapper instance.
- @type gpg: gnupg.GPG
- """
- for secret in [True, False]:
- for key in gpg.list_keys(secret=secret):
- gpg.delete_keys(
- key['fingerprint'],
- secret=secret)
- leap_assert(len(gpg.list_keys()) is 0, 'Keyring not empty!')
- # TODO: implement some kind of wiping of data or a more secure way that
- # does not write to disk.
- shutil.rmtree(gpg.gnupghome)
-
-
-def _safe_call(callback, keys=[]):
- """
- Run C{callback} over a keyring containing C{keys}.
-
- @param callback: Function whose first argument is the gpg keyring.
- @type callback: function(gnupg.GPG)
- @param keys: List of keys to add to the keyring.
- @type keys: list of OpenPGPKey
-
- @return: The results of the callback.
- @rtype: str or bool
- """
- gpg = _build_keyring(filter(lambda key: key is not None, keys))
- val = callback(gpg)
- _destroy_keyring(gpg)
- return val
-
-
#
# The OpenPGP wrapper
#
@@ -463,11 +549,11 @@ class OpenPGPScheme(EncryptionScheme):
leap_assert(is_address(address), 'Not an user address: %s' % address)
try:
self.get_key(address)
- raise KeyAlreadyExists(address)
- except KeyNotFound:
+ raise errors.KeyAlreadyExists(address)
+ except errors.KeyNotFound:
pass
- def _gen_key_cb(gpg):
+ def _gen_key(gpg):
params = gpg.gen_key_input(
key_type='RSA',
key_length=4096,
@@ -495,7 +581,10 @@ class OpenPGPScheme(EncryptionScheme):
gpg.export_keys(key['fingerprint'], secret=secret))
self.put_key(openpgp_key)
- _safe_call(_gen_key_cb)
+ with temporary_gpgwrapper() as gpg:
+ # TODO: inspect result, or use decorator
+ _gen_key(gpg)
+
return self.get_key(address, private=True)
def get_key(self, address, private=False):
@@ -514,7 +603,7 @@ class OpenPGPScheme(EncryptionScheme):
leap_assert(is_address(address), 'Not an user address: %s' % address)
doc = self._get_key_doc(address, private)
if doc is None:
- raise KeyNotFound(address)
+ raise errors.KeyNotFound(address)
return build_key_from_dict(OpenPGPKey, address, doc.content)
def put_ascii_key(self, key_data):
@@ -525,11 +614,14 @@ class OpenPGPScheme(EncryptionScheme):
@type key_data: str
"""
leap_assert_type(key_data, str)
+ # TODO: add more checks for correct key data.
+ leap_assert(key_data is not None, 'Data does not represent a key.')
- def _put_ascii_key_cb(gpg):
+ def _put_ascii_key(gpg):
gpg.import_keys(key_data)
privkey = None
pubkey = None
+
try:
privkey = gpg.list_keys(secret=True).pop()
except IndexError:
@@ -561,7 +653,9 @@ class OpenPGPScheme(EncryptionScheme):
gpg.export_keys(pubkey['fingerprint'], secret=False))
self.put_key(openpgp_pubkey)
- _safe_call(_put_ascii_key_cb)
+ with temporary_gpgwrapper() as gpg:
+ # TODO: inspect result, or use decorator
+ _put_ascii_key(gpg)
def put_key(self, key):
"""
@@ -606,9 +700,9 @@ class OpenPGPScheme(EncryptionScheme):
leap_assert(key.__class__ is OpenPGPKey, 'Wrong key type.')
stored_key = self.get_key(key.address, private=key.private)
if stored_key is None:
- raise KeyNotFound(key)
+ raise errors.KeyNotFound(key)
if stored_key.__dict__ != key.__dict__:
- raise KeyAttributesDiffer(key)
+ raise errors.KeyAttributesDiffer(key)
doc = self._soledad.get_doc(
keymanager_doc_id(OpenPGPKey, key.address, key.private))
self._soledad.delete_doc(doc)