diff options
Diffstat (limited to 'src/leap/common')
| -rw-r--r-- | src/leap/common/keymanager/__init__.py | 8 | ||||
| -rw-r--r-- | src/leap/common/keymanager/errors.py | 7 | ||||
| -rw-r--r-- | src/leap/common/keymanager/gpg.py | 2 | ||||
| -rw-r--r-- | src/leap/common/keymanager/keys.py | 25 | ||||
| -rw-r--r-- | src/leap/common/keymanager/openpgp.py | 212 | ||||
| -rw-r--r-- | src/leap/common/keymanager/util.py | 12 | ||||
| -rw-r--r-- | src/leap/common/tests/test_keymanager.py | 148 | 
7 files changed, 333 insertions, 81 deletions
| diff --git a/src/leap/common/keymanager/__init__.py b/src/leap/common/keymanager/__init__.py index a195724..f939a4e 100644 --- a/src/leap/common/keymanager/__init__.py +++ b/src/leap/common/keymanager/__init__.py @@ -33,8 +33,8 @@ from leap.common.keymanager.errors import (  )  from leap.common.keymanager.openpgp import (      OpenPGPKey, -    OpenPGPWrapper, -    _encrypt_symmetric, +    OpenPGPScheme, +    encrypt_sym,  )  from leap.common.keymanager.http import HTTPClient @@ -56,7 +56,7 @@ class KeyManager(object):          self._address = address          self._http_client = HTTPClient(url)          self._wrapper_map = { -            OpenPGPKey: OpenPGPWrapper(soledad), +            OpenPGPKey: OpenPGPScheme(soledad),              # other types of key will be added to this mapper.          } @@ -95,7 +95,7 @@ class KeyManager(object):          if send_private:              privkey = json.loads(                  self.get_key(self._address, ktype, private=True).get_json()) -            privkey.key_data = _encrypt_symmetric(data, passphrase) +            privkey.key_data = encrypt_sym(data, passphrase)              data['keys'].append(privkey)          headers = None  # TODO: replace for token-based-auth          self._http_client.request( diff --git a/src/leap/common/keymanager/errors.py b/src/leap/common/keymanager/errors.py index 886c666..add6a38 100644 --- a/src/leap/common/keymanager/errors.py +++ b/src/leap/common/keymanager/errors.py @@ -31,3 +31,10 @@ class KeyAlreadyExists(Exception):      """      Raised when attempted to create a key that already exists.      """ + + +class KeyAttributesDiffer(Exception): +    """ +    Raised when trying to delete a key but the stored key differs from the key +    passed to the delete_key() method. +    """ diff --git a/src/leap/common/keymanager/gpg.py b/src/leap/common/keymanager/gpg.py index 5571ace..f3e6453 100644 --- a/src/leap/common/keymanager/gpg.py +++ b/src/leap/common/keymanager/gpg.py @@ -394,4 +394,4 @@ class GPGWrapper(gnupg.GPG):          @return: Whether data is encrypted to a key.          @rtype: bool          """ -        self.is_encrypted_asym() or self.is_encrypted_sym() +        return self.is_encrypted_asym(data) or self.is_encrypted_sym(data) diff --git a/src/leap/common/keymanager/keys.py b/src/leap/common/keymanager/keys.py index bed407c..250c2fa 100644 --- a/src/leap/common/keymanager/keys.py +++ b/src/leap/common/keymanager/keys.py @@ -17,7 +17,7 @@  """ -Abstact key type and wrapper representations. +Abstact key type and encryption scheme representations.  """ @@ -86,22 +86,23 @@ class EncryptionKey(object):  # -# Key wrappers +# Encryption schemes  # -class KeyTypeWrapper(object): +class EncryptionScheme(object):      """ -    Abstract class for Key Type Wrappers. +    Abstract class for Encryption Schemes. -    A wrapper for a certain key type should know how to get and put keys in -    local storage using Soledad and also how to generate new keys. +    A wrapper for a certain encryption schemes should know how to get and put +    keys in local storage using Soledad, how to generate new keys and how to +    find out about possibly encrypted content.      """      __metaclass__ = ABCMeta      def __init__(self, soledad):          """ -        Initialize the Key Type Wrapper. +        Initialize this Encryption Scheme.          @param soledad: A Soledad instance for local storage of keys.          @type soledad: leap.soledad.Soledad @@ -139,6 +140,16 @@ class KeyTypeWrapper(object):          @param address: The address bound to the key.          @type address: str +          @return: The key bound to C{address}.          @rtype: EncryptionKey          """ + +    @abstractmethod +    def delete_key(self, key): +        """ +        Remove C{key} from storage. + +        @param key: The key to be removed. +        @type key: EncryptionKey +        """ diff --git a/src/leap/common/keymanager/openpgp.py b/src/leap/common/keymanager/openpgp.py index cd37138..ace8c1e 100644 --- a/src/leap/common/keymanager/openpgp.py +++ b/src/leap/common/keymanager/openpgp.py @@ -32,7 +32,7 @@ from leap.common.keymanager.errors import (  )  from leap.common.keymanager.keys import (      EncryptionKey, -    KeyTypeWrapper, +    EncryptionScheme,  )  from leap.common.keymanager.gpg import GPGWrapper  from leap.common.keymanager.util import ( @@ -46,29 +46,137 @@ from leap.common.keymanager.util import (  # Utility functions  # -def _encrypt_symmetric(data, password): +def encrypt_sym(data, passphrase):      """ -    Encrypt C{data} with C{password}. +    Encrypt C{data} with C{passphrase}. -    This function uses the OpenPGP wrapper to perform the encryption. +    @param data: The data to be encrypted. +    @type data: str +    @param passphrase: The passphrase used to encrypt C{data}. +    @type passphrase: str + +    @return: The encrypted data. +    @rtype: str +    """ + +    def _encrypt_cb(gpg): +        return str( +            gpg.encrypt( +                data, None, passphrase=passphrase, symmetric=True)) + +    return _safe_call(_encrypt_cb) + + +def decrypt_sym(data, passphrase): +    """ +    Decrypt C{data} with C{passphrase}. + +    @param data: The data to be decrypted. +    @type data: str +    @param passphrase: The passphrase used to decrypt C{data}. +    @type passphrase: str + +    @return: The decrypted data. +    @rtype: str +    """ + +    def _decrypt_cb(gpg): +        return str(gpg.decrypt(data, passphrase=passphrase)) + +    return _safe_call(_decrypt_cb) + + +def encrypt_asym(data, key): +    """ +    Encrypt C{data} using public @{key}.      @param data: The data to be encrypted.      @type data: str -    @param password: The password used to encrypt C{data}. -    @type password: str +    @param key: The key used to encrypt. +    @type key: OpenPGPKey +      @return: The encrypted data.      @rtype: str      """ -    cyphertext = None +    leap_assert(key.private is False, 'Key is not public.')      def _encrypt_cb(gpg): -        cyphertext = str( +        return str(              gpg.encrypt( -                data, None, passphrase=password, symmetric=True)) -        data['keys'].append(privkey) +                data, key.fingerprint, symmetric=False)) + +    return _safe_call(_encrypt_cb, key.key_data) + + +def decrypt_asym(data, key): +    """ +    Decrypt C{data} using private @{key}. + +    @param data: The data to be decrypted. +    @type data: str +    @param key: The key used to decrypt. +    @type key: OpenPGPKey + +    @return: The decrypted data. +    @rtype: str +    """ +    leap_assert(key.private is True, 'Key is not private.') -    _safe_call(_encrypt_cb) -    return cyphertext +    def _decrypt_cb(gpg): +        return str(gpg.decrypt(data)) + +    return _safe_call(_decrypt_cb, key.key_data) + + +def is_encrypted(data): +    """ +    Return whether C{data} was encrypted using OpenPGP. + +    @param data: The data we want to know about. +    @type data: str + +    @return: Whether C{data} was encrypted using this wrapper. +    @rtype: bool +    """ + +    def _is_encrypted_cb(gpg): +        return gpg.is_encrypted(data) + +    return _safe_call(_is_encrypted_cb) + + +def is_encrypted_sym(data): +    """ +    Return whether C{data} was encrypted using a public OpenPGP key. + +    @param data: The data we want to know about. +    @type data: str + +    @return: Whether C{data} was encrypted using this wrapper. +    @rtype: bool +    """ + +    def _is_encrypted_cb(gpg): +        return gpg.is_encrypted_sym(data) + +    return _safe_call(_is_encrypted_cb) + + +def is_encrypted_asym(data): +    """ +    Return whether C{data} was asymmetrically encrypted using OpenPGP. + +    @param data: The data we want to know about. +    @type data: str + +    @return: Whether C{data} was encrypted using this wrapper. +    @rtype: bool +    """ + +    def _is_encrypted_cb(gpg): +        return gpg.is_encrypted_asym(data) + +    return _safe_call(_is_encrypted_cb)  def _build_key_from_gpg(address, key, key_data): @@ -154,10 +262,14 @@ def _safe_call(callback, key_data=None, **kwargs):      @type key_data: str      @param **kwargs: Other eventual parameters for the callback.      @type **kwargs: **dict + +    @return: The results of the callback. +    @rtype: str or bool      """      gpg = _build_unitary_gpgwrapper(key_data) -    callback(gpg, **kwargs) +    val = callback(gpg, **kwargs)      _destroy_unitary_gpgwrapper(gpg) +    return val  # @@ -170,7 +282,7 @@ class OpenPGPKey(EncryptionKey):      """ -class OpenPGPWrapper(KeyTypeWrapper): +class OpenPGPScheme(EncryptionScheme):      """      A wrapper for OpenPGP keys.      """ @@ -182,8 +294,7 @@ class OpenPGPWrapper(KeyTypeWrapper):          @param soledad: A Soledad instance for key storage.          @type soledad: leap.soledad.Soledad          """ -        KeyTypeWrapper.__init__(self, soledad) -        self._soledad = soledad +        EncryptionScheme.__init__(self, soledad)      def gen_key(self, address):          """ @@ -223,10 +334,13 @@ class OpenPGPWrapper(KeyTypeWrapper):              leap_assert(                  re.match('.*<%s>$' % address, key['uids'][0]) is not None,                  'Key not correctly bound to address.') -            openpgp_key = _build_key_from_gpg( -                address, key, -                gpg.export_keys(key['fingerprint'])) -            self.put_key(openpgp_key) +            # insert both public and private keys in storage +            for secret in [True, False]: +                key = gpg.list_keys(secret=secret).pop() +                openpgp_key = _build_key_from_gpg( +                    address, key, +                    gpg.export_keys(key['fingerprint'], secret=secret)) +                self.put_key(openpgp_key)          _safe_call(_gen_key_cb)          return self.get_key(address, private=True) @@ -262,15 +376,38 @@ class OpenPGPWrapper(KeyTypeWrapper):          def _put_key_raw_cb(gpg): -            key = gpg.list_keys(secret=False).pop()  # unitary keyring +            privkey = None +            pubkey = None +            try: +                privkey = gpg.list_keys(secret=True).pop() +            except IndexError: +                pass +            pubkey = gpg.list_keys(secret=False).pop()  # unitary keyring              # extract adress from first uid on key -            match = re.match('.*<([\w.-]+@[\w.-]+)>.*', key['uids'].pop()) +            match = re.match('.*<([\w.-]+@[\w.-]+)>.*', pubkey['uids'].pop())              leap_assert(match is not None, 'No user address in key data.')              address = match.group(1) -            openpgp_key = _build_key_from_gpg( -                address, key, -                gpg.export_keys(key['fingerprint'])) -            self.put_key(openpgp_key) +            if privkey is not None: +                match = re.match( +                    '.*<([\w.-]+@[\w.-]+)>.*', privkey['uids'].pop()) +                leap_assert(match is not None, 'No user address in key data.') +                privaddress = match.group(1) +                leap_assert( +                    address == privaddress, +                    'Addresses in pub and priv key differ.') +                leap_assert( +                    pubkey['fingerprint'] == privkey['fingerprint'], +                    'Fingerprints for pub and priv key differ.') +                # insert private key in storage +                openpgp_privkey = _build_key_from_gpg( +                    address, privkey, +                    gpg.export_keys(privkey['fingerprint'], secret=True)) +                self.put_key(openpgp_privkey) +            # insert public key in storage +            openpgp_pubkey = _build_key_from_gpg( +                address, pubkey, +                gpg.export_keys(pubkey['fingerprint'], secret=False)) +            self.put_key(openpgp_pubkey)          _safe_call(_put_key_raw_cb, data) @@ -285,7 +422,8 @@ class OpenPGPWrapper(KeyTypeWrapper):          if doc is None:              self._soledad.create_doc_from_json(                  key.get_json(), -                doc_id=_keymanager_doc_id(key.address, key.private)) +                doc_id=_keymanager_doc_id( +                    OpenPGPKey, key.address, key.private))          else:              doc.set_json(key.get_json())              self._soledad.put_doc(doc) @@ -303,4 +441,22 @@ class OpenPGPWrapper(KeyTypeWrapper):          @return: The document with the key or None if it does not exist.          @rtype: leap.soledad.backends.leap_backend.LeapDocument          """ -        return self._soledad.get_doc(_keymanager_doc_id(address, private)) +        return self._soledad.get_doc( +            _keymanager_doc_id(OpenPGPKey, address, private)) + +    def delete_key(self, key): +        """ +        Remove C{key} from storage. + +        @param key: The key to be removed. +        @type key: EncryptionKey +        """ +        leap_assert(key.__class__ is OpenPGPKey, 'Wrong key type.') +        stored_key = self.get_key(key.address, private=key.private) +        if stored_key is None: +            raise KeyDoesNotExist(key) +        if stored_key.__dict__ != key.__dict__: +            raise KeyAttributesDiffer(key) +        doc = self._soledad.get_doc( +            _keymanager_doc_id(OpenPGPKey, key.address, key.private)) +        self._soledad.delete_doc(doc) diff --git a/src/leap/common/keymanager/util.py b/src/leap/common/keymanager/util.py index 42168c8..667d2b2 100644 --- a/src/leap/common/keymanager/util.py +++ b/src/leap/common/keymanager/util.py @@ -25,6 +25,9 @@ import re  from hashlib import sha256 +from binascii import b2a_base64 + +  from leap.common.check import leap_assert @@ -79,11 +82,13 @@ def _build_key_from_doc(kClass, address, doc):      return _build_key_from_dict(kClass, address, doc.content) -def _keymanager_doc_id(address, private=False): +def _keymanager_doc_id(ktype, address, private=False):      """      Return the document id for the document containing a key for      C{address}. +    @param address: The type of the key. +    @type address: KeyType      @param address: The address bound to the key.      @type address: str      @param private: Whether the key is private or not. @@ -93,5 +98,6 @@ def _keymanager_doc_id(address, private=False):      @rtype: str      """      leap_assert(_is_address(address), "Wrong address format: %s" % address) -    ktype = 'private' if private else 'public' -    return sha256('key-manager-'+address+'-'+ktype).hexdigest() +    ktype = str(ktype) +    visibility = 'private' if private else 'public' +    return sha256('key-manager-'+address+'-'+ktype+'-'+visibility).hexdigest() diff --git a/src/leap/common/tests/test_keymanager.py b/src/leap/common/tests/test_keymanager.py index 4a2693e..f9b478f 100644 --- a/src/leap/common/tests/test_keymanager.py +++ b/src/leap/common/tests/test_keymanager.py @@ -26,14 +26,17 @@ import unittest  from leap.common.testing.basetest import BaseLeapTest  from leap.soledad import Soledad +from leap.soledad.crypto import SoledadCrypto + +  from leap.common.keymanager import KeyManager, openpgp, KeyNotFound  from leap.common.keymanager.openpgp import OpenPGPKey -from leap.common.keymanager.gpg import GPGWrapper  from leap.common.keymanager.util import (      _is_address,      _build_key_from_dict,      _keymanager_doc_id,  ) +from leap.common.keymanager import errors  class KeyManagerUtilTestCase(BaseLeapTest): @@ -72,32 +75,46 @@ class KeyManagerUtilTestCase(BaseLeapTest):              'validation': 'validation',          }          key = _build_key_from_dict(OpenPGPKey, 'leap@leap.se', kdict) -        self.assertEqual(kdict['address'], key.address, +        self.assertEqual( +            kdict['address'], key.address,              'Wrong data in key.') -        self.assertEqual(kdict['key_id'], key.key_id, +        self.assertEqual( +            kdict['key_id'], key.key_id,              'Wrong data in key.') -        self.assertEqual(kdict['fingerprint'], key.fingerprint, +        self.assertEqual( +            kdict['fingerprint'], key.fingerprint,              'Wrong data in key.') -        self.assertEqual(kdict['key_data'], key.key_data, +        self.assertEqual( +            kdict['key_data'], key.key_data,              'Wrong data in key.') -        self.assertEqual(kdict['private'], key.private, +        self.assertEqual( +            kdict['private'], key.private,              'Wrong data in key.') -        self.assertEqual(kdict['length'], key.length, +        self.assertEqual( +            kdict['length'], key.length,              'Wrong data in key.') -        self.assertEqual(kdict['expiry_date'], key.expiry_date, +        self.assertEqual( +            kdict['expiry_date'], key.expiry_date,              'Wrong data in key.') -        self.assertEqual(kdict['first_seen_at'], key.first_seen_at, +        self.assertEqual( +            kdict['first_seen_at'], key.first_seen_at,              'Wrong data in key.') -        self.assertEqual(kdict['last_audited_at'], key.last_audited_at, +        self.assertEqual( +            kdict['last_audited_at'], key.last_audited_at,              'Wrong data in key.') -        self.assertEqual(kdict['validation'], key.validation, +        self.assertEqual( +            kdict['validation'], key.validation,              'Wrong data in key.')      def test__keymanager_doc_id(self): -        doc_id1 = _keymanager_doc_id('leap@leap.se', private=False) -        doc_id2 = _keymanager_doc_id('leap@leap.se', private=True) -        doc_id3 = _keymanager_doc_id('user@leap.se', private=False) -        doc_id4 = _keymanager_doc_id('user@leap.se', private=True) +        doc_id1 = _keymanager_doc_id( +            OpenPGPKey, 'leap@leap.se', private=False) +        doc_id2 = _keymanager_doc_id( +            OpenPGPKey, 'leap@leap.se', private=True) +        doc_id3 = _keymanager_doc_id( +            OpenPGPKey, 'user@leap.se', private=False) +        doc_id4 = _keymanager_doc_id( +            OpenPGPKey, 'user@leap.se', private=True)          self.assertFalse(doc_id1 == doc_id2, 'Doc ids are equal!')          self.assertFalse(doc_id1 == doc_id3, 'Doc ids are equal!')          self.assertFalse(doc_id1 == doc_id4, 'Doc ids are equal!') @@ -119,7 +136,7 @@ class KeyManagerCryptoTestCase(BaseLeapTest):          )          # initialize solead by hand for testing purposes          self._soledad._init_dirs() -        self._soledad._gpg = GPGWrapper(gnupghome=self.tempdir+"/gnupg") +        self._soledad._crypto = SoledadCrypto(self._soledad)          self._soledad._shared_db = None          self._soledad._init_keys()          self._soledad._init_db() @@ -130,31 +147,86 @@ class KeyManagerCryptoTestCase(BaseLeapTest):      def _key_manager(user='user@leap.se', url='https://domain.org:6425'):          return KeyManager(user, url) -    def test_openpgp_gen_key(self): -        pgp = openpgp.OpenPGPWrapper(self._soledad) -        try: -            pgp.get_key('user@leap.se') -        except KeyNotFound: -            key = pgp.gen_key('user@leap.se') -            self.assertIsInstance(key, openpgp.OpenPGPKey) -            self.assertEqual( -                'user@leap.se', key.address, 'Wrong address bound to key.') -            self.assertEqual( -                '4096', key.length, 'Wrong key length.') +    def _test_openpgp_gen_key(self): +        pgp = openpgp.OpenPGPScheme(self._soledad) +        self.assertRaises(KeyNotFound, pgp.get_key, 'user@leap.se') +        key = pgp.gen_key('user@leap.se') +        self.assertIsInstance(key, openpgp.OpenPGPKey) +        self.assertEqual( +            'user@leap.se', key.address, 'Wrong address bound to key.') +        self.assertEqual( +            '4096', key.length, 'Wrong key length.') + +    def test_openpgp_put_delete_key(self): +        pgp = openpgp.OpenPGPScheme(self._soledad) +        self.assertRaises(KeyNotFound, pgp.get_key, 'leap@leap.se') +        pgp.put_key_raw(PUBLIC_KEY) +        key = pgp.get_key('leap@leap.se', private=False) +        pgp.delete_key(key) +        self.assertRaises(KeyNotFound, pgp.get_key, 'leap@leap.se')      def test_openpgp_put_key_raw(self): -        pgp = openpgp.OpenPGPWrapper(self._soledad) -        try: -            pgp.get_key('leap@leap.se') -        except KeyNotFound: -            pgp.put_key_raw(PUBLIC_KEY) -            key = pgp.get_key('leap@leap.se') -            self.assertIsInstance(key, openpgp.OpenPGPKey) -            self.assertEqual( -                'leap@leap.se', key.address, 'Wrong address bound to key.') -            self.assertEqual( -                '4096', key.length, 'Wrong key length.') +        pgp = openpgp.OpenPGPScheme(self._soledad) +        self.assertRaises(KeyNotFound, pgp.get_key, 'leap@leap.se') +        pgp.put_key_raw(PUBLIC_KEY) +        key = pgp.get_key('leap@leap.se', private=False) +        self.assertIsInstance(key, openpgp.OpenPGPKey) +        self.assertEqual( +            'leap@leap.se', key.address, 'Wrong address bound to key.') +        self.assertEqual( +            '4096', key.length, 'Wrong key length.') +        pgp.delete_key(key) +        self.assertRaises(KeyNotFound, pgp.get_key, 'leap@leap.se') + +    def test_get_public_key(self): +        pgp = openpgp.OpenPGPScheme(self._soledad) +        self.assertRaises(KeyNotFound, pgp.get_key, 'leap@leap.se') +        pgp.put_key_raw(PUBLIC_KEY) +        self.assertRaises( +            KeyNotFound, pgp.get_key, 'leap@leap.se', private=True) +        key = pgp.get_key('leap@leap.se', private=False) +        self.assertEqual('leap@leap.se', key.address) +        self.assertFalse(key.private) +        self.assertEqual(KEY_FINGERPRINT, key.fingerprint) +        pgp.delete_key(key) +        self.assertRaises(KeyNotFound, pgp.get_key, 'leap@leap.se') + +    def test_openpgp_encrypt_decrypt_asym(self): +        # encrypt +        pgp = openpgp.OpenPGPScheme(self._soledad) +        pgp.put_key_raw(PUBLIC_KEY) +        pubkey = pgp.get_key('leap@leap.se', private=False) +        cyphertext = openpgp.encrypt_asym('data', pubkey) +        # assert +        self.assertTrue(cyphertext is not None) +        self.assertTrue(cyphertext != '') +        self.assertTrue(cyphertext != 'data') +        self.assertTrue(openpgp.is_encrypted_asym(cyphertext)) +        self.assertFalse(openpgp.is_encrypted_sym(cyphertext)) +        self.assertTrue(openpgp.is_encrypted(cyphertext)) +        # decrypt +        self.assertRaises( +            KeyNotFound, pgp.get_key, 'leap@leap.se', private=True) +        pgp.put_key_raw(PRIVATE_KEY) +        privkey = pgp.get_key('leap@leap.se', private=True) +        plaintext = openpgp.decrypt_asym(cyphertext, privkey) +        pgp.delete_key(pubkey) +        pgp.delete_key(privkey) +        self.assertRaises( +            KeyNotFound, pgp.get_key, 'leap@leap.se', private=False) +        self.assertRaises( +            KeyNotFound, pgp.get_key, 'leap@leap.se', private=True) +    def test_openpgp_encrypt_decrypt_sym(self): +        cyphertext = openpgp.encrypt_sym('data', 'pass') +        self.assertTrue(cyphertext is not None) +        self.assertTrue(cyphertext != '') +        self.assertTrue(cyphertext != 'data') +        self.assertTrue(openpgp.is_encrypted_sym(cyphertext)) +        self.assertFalse(openpgp.is_encrypted_asym(cyphertext)) +        self.assertTrue(openpgp.is_encrypted(cyphertext)) +        plaintext = openpgp.decrypt_sym(cyphertext, 'pass') +        self.assertEqual('data', plaintext)  # Key material for testing | 
