summaryrefslogtreecommitdiff
path: root/src/leap/common/keymanager/openpgp.py
diff options
context:
space:
mode:
Diffstat (limited to 'src/leap/common/keymanager/openpgp.py')
-rw-r--r--src/leap/common/keymanager/openpgp.py212
1 files changed, 184 insertions, 28 deletions
diff --git a/src/leap/common/keymanager/openpgp.py b/src/leap/common/keymanager/openpgp.py
index cd37138..ace8c1e 100644
--- a/src/leap/common/keymanager/openpgp.py
+++ b/src/leap/common/keymanager/openpgp.py
@@ -32,7 +32,7 @@ from leap.common.keymanager.errors import (
)
from leap.common.keymanager.keys import (
EncryptionKey,
- KeyTypeWrapper,
+ EncryptionScheme,
)
from leap.common.keymanager.gpg import GPGWrapper
from leap.common.keymanager.util import (
@@ -46,29 +46,137 @@ from leap.common.keymanager.util import (
# Utility functions
#
-def _encrypt_symmetric(data, password):
+def encrypt_sym(data, passphrase):
"""
- Encrypt C{data} with C{password}.
+ Encrypt C{data} with C{passphrase}.
- This function uses the OpenPGP wrapper to perform the encryption.
+ @param data: The data to be encrypted.
+ @type data: str
+ @param passphrase: The passphrase used to encrypt C{data}.
+ @type passphrase: str
+
+ @return: The encrypted data.
+ @rtype: str
+ """
+
+ def _encrypt_cb(gpg):
+ return str(
+ gpg.encrypt(
+ data, None, passphrase=passphrase, symmetric=True))
+
+ return _safe_call(_encrypt_cb)
+
+
+def decrypt_sym(data, passphrase):
+ """
+ Decrypt C{data} with C{passphrase}.
+
+ @param data: The data to be decrypted.
+ @type data: str
+ @param passphrase: The passphrase used to decrypt C{data}.
+ @type passphrase: str
+
+ @return: The decrypted data.
+ @rtype: str
+ """
+
+ def _decrypt_cb(gpg):
+ return str(gpg.decrypt(data, passphrase=passphrase))
+
+ return _safe_call(_decrypt_cb)
+
+
+def encrypt_asym(data, key):
+ """
+ Encrypt C{data} using public @{key}.
@param data: The data to be encrypted.
@type data: str
- @param password: The password used to encrypt C{data}.
- @type password: str
+ @param key: The key used to encrypt.
+ @type key: OpenPGPKey
+
@return: The encrypted data.
@rtype: str
"""
- cyphertext = None
+ leap_assert(key.private is False, 'Key is not public.')
def _encrypt_cb(gpg):
- cyphertext = str(
+ return str(
gpg.encrypt(
- data, None, passphrase=password, symmetric=True))
- data['keys'].append(privkey)
+ data, key.fingerprint, symmetric=False))
+
+ return _safe_call(_encrypt_cb, key.key_data)
+
+
+def decrypt_asym(data, key):
+ """
+ Decrypt C{data} using private @{key}.
+
+ @param data: The data to be decrypted.
+ @type data: str
+ @param key: The key used to decrypt.
+ @type key: OpenPGPKey
+
+ @return: The decrypted data.
+ @rtype: str
+ """
+ leap_assert(key.private is True, 'Key is not private.')
- _safe_call(_encrypt_cb)
- return cyphertext
+ def _decrypt_cb(gpg):
+ return str(gpg.decrypt(data))
+
+ return _safe_call(_decrypt_cb, key.key_data)
+
+
+def is_encrypted(data):
+ """
+ Return whether C{data} was encrypted using OpenPGP.
+
+ @param data: The data we want to know about.
+ @type data: str
+
+ @return: Whether C{data} was encrypted using this wrapper.
+ @rtype: bool
+ """
+
+ def _is_encrypted_cb(gpg):
+ return gpg.is_encrypted(data)
+
+ return _safe_call(_is_encrypted_cb)
+
+
+def is_encrypted_sym(data):
+ """
+ Return whether C{data} was encrypted using a public OpenPGP key.
+
+ @param data: The data we want to know about.
+ @type data: str
+
+ @return: Whether C{data} was encrypted using this wrapper.
+ @rtype: bool
+ """
+
+ def _is_encrypted_cb(gpg):
+ return gpg.is_encrypted_sym(data)
+
+ return _safe_call(_is_encrypted_cb)
+
+
+def is_encrypted_asym(data):
+ """
+ Return whether C{data} was asymmetrically encrypted using OpenPGP.
+
+ @param data: The data we want to know about.
+ @type data: str
+
+ @return: Whether C{data} was encrypted using this wrapper.
+ @rtype: bool
+ """
+
+ def _is_encrypted_cb(gpg):
+ return gpg.is_encrypted_asym(data)
+
+ return _safe_call(_is_encrypted_cb)
def _build_key_from_gpg(address, key, key_data):
@@ -154,10 +262,14 @@ def _safe_call(callback, key_data=None, **kwargs):
@type key_data: str
@param **kwargs: Other eventual parameters for the callback.
@type **kwargs: **dict
+
+ @return: The results of the callback.
+ @rtype: str or bool
"""
gpg = _build_unitary_gpgwrapper(key_data)
- callback(gpg, **kwargs)
+ val = callback(gpg, **kwargs)
_destroy_unitary_gpgwrapper(gpg)
+ return val
#
@@ -170,7 +282,7 @@ class OpenPGPKey(EncryptionKey):
"""
-class OpenPGPWrapper(KeyTypeWrapper):
+class OpenPGPScheme(EncryptionScheme):
"""
A wrapper for OpenPGP keys.
"""
@@ -182,8 +294,7 @@ class OpenPGPWrapper(KeyTypeWrapper):
@param soledad: A Soledad instance for key storage.
@type soledad: leap.soledad.Soledad
"""
- KeyTypeWrapper.__init__(self, soledad)
- self._soledad = soledad
+ EncryptionScheme.__init__(self, soledad)
def gen_key(self, address):
"""
@@ -223,10 +334,13 @@ class OpenPGPWrapper(KeyTypeWrapper):
leap_assert(
re.match('.*<%s>$' % address, key['uids'][0]) is not None,
'Key not correctly bound to address.')
- openpgp_key = _build_key_from_gpg(
- address, key,
- gpg.export_keys(key['fingerprint']))
- self.put_key(openpgp_key)
+ # insert both public and private keys in storage
+ for secret in [True, False]:
+ key = gpg.list_keys(secret=secret).pop()
+ openpgp_key = _build_key_from_gpg(
+ address, key,
+ gpg.export_keys(key['fingerprint'], secret=secret))
+ self.put_key(openpgp_key)
_safe_call(_gen_key_cb)
return self.get_key(address, private=True)
@@ -262,15 +376,38 @@ class OpenPGPWrapper(KeyTypeWrapper):
def _put_key_raw_cb(gpg):
- key = gpg.list_keys(secret=False).pop() # unitary keyring
+ privkey = None
+ pubkey = None
+ try:
+ privkey = gpg.list_keys(secret=True).pop()
+ except IndexError:
+ pass
+ pubkey = gpg.list_keys(secret=False).pop() # unitary keyring
# extract adress from first uid on key
- match = re.match('.*<([\w.-]+@[\w.-]+)>.*', key['uids'].pop())
+ match = re.match('.*<([\w.-]+@[\w.-]+)>.*', pubkey['uids'].pop())
leap_assert(match is not None, 'No user address in key data.')
address = match.group(1)
- openpgp_key = _build_key_from_gpg(
- address, key,
- gpg.export_keys(key['fingerprint']))
- self.put_key(openpgp_key)
+ if privkey is not None:
+ match = re.match(
+ '.*<([\w.-]+@[\w.-]+)>.*', privkey['uids'].pop())
+ leap_assert(match is not None, 'No user address in key data.')
+ privaddress = match.group(1)
+ leap_assert(
+ address == privaddress,
+ 'Addresses in pub and priv key differ.')
+ leap_assert(
+ pubkey['fingerprint'] == privkey['fingerprint'],
+ 'Fingerprints for pub and priv key differ.')
+ # insert private key in storage
+ openpgp_privkey = _build_key_from_gpg(
+ address, privkey,
+ gpg.export_keys(privkey['fingerprint'], secret=True))
+ self.put_key(openpgp_privkey)
+ # insert public key in storage
+ openpgp_pubkey = _build_key_from_gpg(
+ address, pubkey,
+ gpg.export_keys(pubkey['fingerprint'], secret=False))
+ self.put_key(openpgp_pubkey)
_safe_call(_put_key_raw_cb, data)
@@ -285,7 +422,8 @@ class OpenPGPWrapper(KeyTypeWrapper):
if doc is None:
self._soledad.create_doc_from_json(
key.get_json(),
- doc_id=_keymanager_doc_id(key.address, key.private))
+ doc_id=_keymanager_doc_id(
+ OpenPGPKey, key.address, key.private))
else:
doc.set_json(key.get_json())
self._soledad.put_doc(doc)
@@ -303,4 +441,22 @@ class OpenPGPWrapper(KeyTypeWrapper):
@return: The document with the key or None if it does not exist.
@rtype: leap.soledad.backends.leap_backend.LeapDocument
"""
- return self._soledad.get_doc(_keymanager_doc_id(address, private))
+ return self._soledad.get_doc(
+ _keymanager_doc_id(OpenPGPKey, address, private))
+
+ def delete_key(self, key):
+ """
+ Remove C{key} from storage.
+
+ @param key: The key to be removed.
+ @type key: EncryptionKey
+ """
+ leap_assert(key.__class__ is OpenPGPKey, 'Wrong key type.')
+ stored_key = self.get_key(key.address, private=key.private)
+ if stored_key is None:
+ raise KeyDoesNotExist(key)
+ if stored_key.__dict__ != key.__dict__:
+ raise KeyAttributesDiffer(key)
+ doc = self._soledad.get_doc(
+ _keymanager_doc_id(OpenPGPKey, key.address, key.private))
+ self._soledad.delete_doc(doc)