Refactor, fixes, add api, tests.
author drebs <drebs@leap.se>
Tue, 23 Apr 2013 23:50:02 +0000 (20:50 -0300)
committer drebs <drebs@leap.se>
Tue, 23 Apr 2013 23:50:02 +0000 (20:50 -0300)
* Change KeyTypeWrapper to EncryptionScheme
* Change OpenPGPWrapper to OpenPGPScheme
* Add missing and standardized crypto API.
* Add delete_key()
* Fix put_key_raw() so it puts either public or private keys.
* Fix gpg's is_encrypted()
* Fix openpgp's safe callbacks so they return correctly.
* Remove binascii because it generates invalid doc ids.
* Add tests.

src/leap/common/keymanager/__init__.py
src/leap/common/keymanager/errors.py
src/leap/common/keymanager/gpg.py
src/leap/common/keymanager/keys.py
src/leap/common/keymanager/openpgp.py
src/leap/common/keymanager/util.py
src/leap/common/tests/test_keymanager.py

index a195724..f939a4e 100644 (file)
@@ -33,8 +33,8 @@ from leap.common.keymanager.errors import (
 )
 from leap.common.keymanager.openpgp import (
     OpenPGPKey,
-    OpenPGPWrapper,
-    _encrypt_symmetric,
+    OpenPGPScheme,
+    encrypt_sym,
 )
 from leap.common.keymanager.http import HTTPClient
 
@@ -56,7 +56,7 @@ class KeyManager(object):
         self._address = address
         self._http_client = HTTPClient(url)
         self._wrapper_map = {
-            OpenPGPKey: OpenPGPWrapper(soledad),
+            OpenPGPKey: OpenPGPScheme(soledad),
             # other types of key will be added to this mapper.
         }
 
@@ -95,7 +95,7 @@ class KeyManager(object):
         if send_private:
             privkey = json.loads(
                 self.get_key(self._address, ktype, private=True).get_json())
-            privkey.key_data = _encrypt_symmetric(data, passphrase)
+            privkey.key_data = encrypt_sym(data, passphrase)
             data['keys'].append(privkey)
         headers = None  # TODO: replace for token-based-auth
         self._http_client.request(
index 886c666..add6a38 100644 (file)
@@ -31,3 +31,10 @@ class KeyAlreadyExists(Exception):
     """
     Raised when attempted to create a key that already exists.
     """
+
+
+class KeyAttributesDiffer(Exception):
+    """
+    Raised when trying to delete a key but the stored key differs from the key
+    passed to the delete_key() method.
+    """
index 5571ace..f3e6453 100644 (file)
@@ -394,4 +394,4 @@ class GPGWrapper(gnupg.GPG):
         @return: Whether data is encrypted to a key.
         @rtype: bool
         """
-        self.is_encrypted_asym() or self.is_encrypted_sym()
+        return self.is_encrypted_asym(data) or self.is_encrypted_sym(data)
index bed407c..250c2fa 100644 (file)
@@ -17,7 +17,7 @@
 
 
 """
-Abstact key type and wrapper representations.
+Abstract key type and encryption scheme representations.
 """
 
 
@@ -86,22 +86,23 @@ class EncryptionKey(object):
 
 
 #
-# Key wrappers
+# Encryption schemes
 #
 
-class KeyTypeWrapper(object):
+class EncryptionScheme(object):
     """
-    Abstract class for Key Type Wrappers.
+    Abstract class for Encryption Schemes.
 
-    A wrapper for a certain key type should know how to get and put keys in
-    local storage using Soledad and also how to generate new keys.
+    A wrapper for a certain encryption scheme should know how to get and put
+    keys in local storage using Soledad, how to generate new keys and how to
+    find out about possibly encrypted content.
     """
 
     __metaclass__ = ABCMeta
 
     def __init__(self, soledad):
         """
-        Initialize the Key Type Wrapper.
+        Initialize this Encryption Scheme.
 
         @param soledad: A Soledad instance for local storage of keys.
         @type soledad: leap.soledad.Soledad
@@ -139,6 +140,16 @@ class KeyTypeWrapper(object):
 
         @param address: The address bound to the key.
         @type address: str
+
         @return: The key bound to C{address}.
         @rtype: EncryptionKey
         """
+
+    @abstractmethod
+    def delete_key(self, key):
+        """
+        Remove C{key} from storage.
+
+        @param key: The key to be removed.
+        @type key: EncryptionKey
+        """
index cd37138..ace8c1e 100644 (file)
@@ -32,7 +32,7 @@ from leap.common.keymanager.errors import (
 )
 from leap.common.keymanager.keys import (
     EncryptionKey,
-    KeyTypeWrapper,
+    EncryptionScheme,
 )
 from leap.common.keymanager.gpg import GPGWrapper
 from leap.common.keymanager.util import (
@@ -46,29 +46,137 @@ from leap.common.keymanager.util import (
 # Utility functions
 #
 
-def _encrypt_symmetric(data, password):
+def encrypt_sym(data, passphrase):
     """
-    Encrypt C{data} with C{password}.
+    Encrypt C{data} with C{passphrase}.
 
-    This function uses the OpenPGP wrapper to perform the encryption.
+    @param data: The data to be encrypted.
+    @type data: str
+    @param passphrase: The passphrase used to encrypt C{data}.
+    @type passphrase: str
+
+    @return: The encrypted data.
+    @rtype: str
+    """
+
+    def _encrypt_cb(gpg):
+        return str(
+            gpg.encrypt(
+                data, None, passphrase=passphrase, symmetric=True))
+
+    return _safe_call(_encrypt_cb)
+
+
+def decrypt_sym(data, passphrase):
+    """
+    Decrypt C{data} with C{passphrase}.
+
+    @param data: The data to be decrypted.
+    @type data: str
+    @param passphrase: The passphrase used to decrypt C{data}.
+    @type passphrase: str
+
+    @return: The decrypted data.
+    @rtype: str
+    """
+
+    def _decrypt_cb(gpg):
+        return str(gpg.decrypt(data, passphrase=passphrase))
+
+    return _safe_call(_decrypt_cb)
+
+
+def encrypt_asym(data, key):
+    """
+    Encrypt C{data} using public C{key}.
 
     @param data: The data to be encrypted.
     @type data: str
-    @param password: The password used to encrypt C{data}.
-    @type password: str
+    @param key: The key used to encrypt.
+    @type key: OpenPGPKey
+
     @return: The encrypted data.
     @rtype: str
     """
-    cyphertext = None
+    leap_assert(key.private is False, 'Key is not public.')
 
     def _encrypt_cb(gpg):
-        cyphertext = str(
+        return str(
             gpg.encrypt(
-                data, None, passphrase=password, symmetric=True))
-        data['keys'].append(privkey)
+                data, key.fingerprint, symmetric=False))
+
+    return _safe_call(_encrypt_cb, key.key_data)
+
+
+def decrypt_asym(data, key):
+    """
+    Decrypt C{data} using private C{key}.
+
+    @param data: The data to be decrypted.
+    @type data: str
+    @param key: The key used to decrypt.
+    @type key: OpenPGPKey
+
+    @return: The decrypted data.
+    @rtype: str
+    """
+    leap_assert(key.private is True, 'Key is not private.')
 
-    _safe_call(_encrypt_cb)
-    return cyphertext
+    def _decrypt_cb(gpg):
+        return str(gpg.decrypt(data))
+
+    return _safe_call(_decrypt_cb, key.key_data)
+
+
+def is_encrypted(data):
+    """
+    Return whether C{data} was encrypted using OpenPGP.
+
+    @param data: The data we want to know about.
+    @type data: str
+
+    @return: Whether C{data} was encrypted using this wrapper.
+    @rtype: bool
+    """
+
+    def _is_encrypted_cb(gpg):
+        return gpg.is_encrypted(data)
+
+    return _safe_call(_is_encrypted_cb)
+
+
+def is_encrypted_sym(data):
+    """
+    Return whether C{data} was symmetrically encrypted using OpenPGP.
+
+    @param data: The data we want to know about.
+    @type data: str
+
+    @return: Whether C{data} was encrypted using this wrapper.
+    @rtype: bool
+    """
+
+    def _is_encrypted_cb(gpg):
+        return gpg.is_encrypted_sym(data)
+
+    return _safe_call(_is_encrypted_cb)
+
+
+def is_encrypted_asym(data):
+    """
+    Return whether C{data} was asymmetrically encrypted using OpenPGP.
+
+    @param data: The data we want to know about.
+    @type data: str
+
+    @return: Whether C{data} was encrypted using this wrapper.
+    @rtype: bool
+    """
+
+    def _is_encrypted_cb(gpg):
+        return gpg.is_encrypted_asym(data)
+
+    return _safe_call(_is_encrypted_cb)
 
 
 def _build_key_from_gpg(address, key, key_data):
@@ -154,10 +262,14 @@ def _safe_call(callback, key_data=None, **kwargs):
     @type key_data: str
     @param **kwargs: Other eventual parameters for the callback.
     @type **kwargs: **dict
+
+    @return: The results of the callback.
+    @rtype: str or bool
     """
     gpg = _build_unitary_gpgwrapper(key_data)
-    callback(gpg, **kwargs)
+    val = callback(gpg, **kwargs)
     _destroy_unitary_gpgwrapper(gpg)
+    return val
 
 
 #
@@ -170,7 +282,7 @@ class OpenPGPKey(EncryptionKey):
     """
 
 
-class OpenPGPWrapper(KeyTypeWrapper):
+class OpenPGPScheme(EncryptionScheme):
     """
     A wrapper for OpenPGP keys.
     """
@@ -182,8 +294,7 @@ class OpenPGPWrapper(KeyTypeWrapper):
         @param soledad: A Soledad instance for key storage.
         @type soledad: leap.soledad.Soledad
         """
-        KeyTypeWrapper.__init__(self, soledad)
-        self._soledad = soledad
+        EncryptionScheme.__init__(self, soledad)
 
     def gen_key(self, address):
         """
@@ -223,10 +334,13 @@ class OpenPGPWrapper(KeyTypeWrapper):
             leap_assert(
                 re.match('.*<%s>$' % address, key['uids'][0]) is not None,
                 'Key not correctly bound to address.')
-            openpgp_key = _build_key_from_gpg(
-                address, key,
-                gpg.export_keys(key['fingerprint']))
-            self.put_key(openpgp_key)
+            # insert both public and private keys in storage
+            for secret in [True, False]:
+                key = gpg.list_keys(secret=secret).pop()
+                openpgp_key = _build_key_from_gpg(
+                    address, key,
+                    gpg.export_keys(key['fingerprint'], secret=secret))
+                self.put_key(openpgp_key)
 
         _safe_call(_gen_key_cb)
         return self.get_key(address, private=True)
@@ -262,15 +376,38 @@ class OpenPGPWrapper(KeyTypeWrapper):
 
         def _put_key_raw_cb(gpg):
 
-            key = gpg.list_keys(secret=False).pop()  # unitary keyring
+            privkey = None
+            pubkey = None
+            try:
+                privkey = gpg.list_keys(secret=True).pop()
+            except IndexError:
+                pass
+            pubkey = gpg.list_keys(secret=False).pop()  # unitary keyring
             # extract adress from first uid on key
-            match = re.match('.*<([\w.-]+@[\w.-]+)>.*', key['uids'].pop())
+            match = re.match('.*<([\w.-]+@[\w.-]+)>.*', pubkey['uids'].pop())
             leap_assert(match is not None, 'No user address in key data.')
             address = match.group(1)
-            openpgp_key = _build_key_from_gpg(
-                address, key,
-                gpg.export_keys(key['fingerprint']))
-            self.put_key(openpgp_key)
+            if privkey is not None:
+                match = re.match(
+                    '.*<([\w.-]+@[\w.-]+)>.*', privkey['uids'].pop())
+                leap_assert(match is not None, 'No user address in key data.')
+                privaddress = match.group(1)
+                leap_assert(
+                    address == privaddress,
+                    'Addresses in pub and priv key differ.')
+                leap_assert(
+                    pubkey['fingerprint'] == privkey['fingerprint'],
+                    'Fingerprints for pub and priv key differ.')
+                # insert private key in storage
+                openpgp_privkey = _build_key_from_gpg(
+                    address, privkey,
+                    gpg.export_keys(privkey['fingerprint'], secret=True))
+                self.put_key(openpgp_privkey)
+            # insert public key in storage
+            openpgp_pubkey = _build_key_from_gpg(
+                address, pubkey,
+                gpg.export_keys(pubkey['fingerprint'], secret=False))
+            self.put_key(openpgp_pubkey)
 
         _safe_call(_put_key_raw_cb, data)
 
@@ -285,7 +422,8 @@ class OpenPGPWrapper(KeyTypeWrapper):
         if doc is None:
             self._soledad.create_doc_from_json(
                 key.get_json(),
-                doc_id=_keymanager_doc_id(key.address, key.private))
+                doc_id=_keymanager_doc_id(
+                    OpenPGPKey, key.address, key.private))
         else:
             doc.set_json(key.get_json())
             self._soledad.put_doc(doc)
@@ -303,4 +441,22 @@ class OpenPGPWrapper(KeyTypeWrapper):
         @return: The document with the key or None if it does not exist.
         @rtype: leap.soledad.backends.leap_backend.LeapDocument
         """
-        return self._soledad.get_doc(_keymanager_doc_id(address, private))
+        return self._soledad.get_doc(
+            _keymanager_doc_id(OpenPGPKey, address, private))
+
+    def delete_key(self, key):
+        """
+        Remove C{key} from storage.
+
+        @param key: The key to be removed.
+        @type key: EncryptionKey
+        """
+        leap_assert(key.__class__ is OpenPGPKey, 'Wrong key type.')
+        stored_key = self.get_key(key.address, private=key.private)
+        if stored_key is None:
+            raise KeyDoesNotExist(key)
+        if stored_key.__dict__ != key.__dict__:
+            raise KeyAttributesDiffer(key)
+        doc = self._soledad.get_doc(
+            _keymanager_doc_id(OpenPGPKey, key.address, key.private))
+        self._soledad.delete_doc(doc)
index 42168c8..667d2b2 100644 (file)
@@ -25,6 +25,9 @@ import re
 
 
 from hashlib import sha256
+from binascii import b2a_base64
+
+
 from leap.common.check import leap_assert
 
 
@@ -79,11 +82,13 @@ def _build_key_from_doc(kClass, address, doc):
     return _build_key_from_dict(kClass, address, doc.content)
 
 
-def _keymanager_doc_id(address, private=False):
+def _keymanager_doc_id(ktype, address, private=False):
     """
     Return the document id for the document containing a key for
     C{address}.
 
+    @param ktype: The type of the key.
+    @type ktype: KeyType
     @param address: The address bound to the key.
     @type address: str
     @param private: Whether the key is private or not.
@@ -93,5 +98,6 @@ def _keymanager_doc_id(address, private=False):
     @rtype: str
     """
     leap_assert(_is_address(address), "Wrong address format: %s" % address)
-    ktype = 'private' if private else 'public'
-    return sha256('key-manager-'+address+'-'+ktype).hexdigest()
+    ktype = str(ktype)
+    visibility = 'private' if private else 'public'
+    return sha256('key-manager-'+address+'-'+ktype+'-'+visibility).hexdigest()
index 4a2693e..f9b478f 100644 (file)
@@ -26,14 +26,17 @@ import unittest
 
 from leap.common.testing.basetest import BaseLeapTest
 from leap.soledad import Soledad
+from leap.soledad.crypto import SoledadCrypto
+
+
 from leap.common.keymanager import KeyManager, openpgp, KeyNotFound
 from leap.common.keymanager.openpgp import OpenPGPKey
-from leap.common.keymanager.gpg import GPGWrapper
 from leap.common.keymanager.util import (
     _is_address,
     _build_key_from_dict,
     _keymanager_doc_id,
 )
+from leap.common.keymanager import errors
 
 
 class KeyManagerUtilTestCase(BaseLeapTest):
@@ -72,32 +75,46 @@ class KeyManagerUtilTestCase(BaseLeapTest):
             'validation': 'validation',
         }
         key = _build_key_from_dict(OpenPGPKey, 'leap@leap.se', kdict)
-        self.assertEqual(kdict['address'], key.address,
+        self.assertEqual(
+            kdict['address'], key.address,
             'Wrong data in key.')
-        self.assertEqual(kdict['key_id'], key.key_id,
+        self.assertEqual(
+            kdict['key_id'], key.key_id,
             'Wrong data in key.')
-        self.assertEqual(kdict['fingerprint'], key.fingerprint,
+        self.assertEqual(
+            kdict['fingerprint'], key.fingerprint,
             'Wrong data in key.')
-        self.assertEqual(kdict['key_data'], key.key_data,
+        self.assertEqual(
+            kdict['key_data'], key.key_data,
             'Wrong data in key.')
-        self.assertEqual(kdict['private'], key.private,
+        self.assertEqual(
+            kdict['private'], key.private,
             'Wrong data in key.')
-        self.assertEqual(kdict['length'], key.length,
+        self.assertEqual(
+            kdict['length'], key.length,
             'Wrong data in key.')
-        self.assertEqual(kdict['expiry_date'], key.expiry_date,
+        self.assertEqual(
+            kdict['expiry_date'], key.expiry_date,
             'Wrong data in key.')
-        self.assertEqual(kdict['first_seen_at'], key.first_seen_at,
+        self.assertEqual(
+            kdict['first_seen_at'], key.first_seen_at,
             'Wrong data in key.')
-        self.assertEqual(kdict['last_audited_at'], key.last_audited_at,
+        self.assertEqual(
+            kdict['last_audited_at'], key.last_audited_at,
             'Wrong data in key.')
-        self.assertEqual(kdict['validation'], key.validation,
+        self.assertEqual(
+            kdict['validation'], key.validation,
             'Wrong data in key.')
 
     def test__keymanager_doc_id(self):
-        doc_id1 = _keymanager_doc_id('leap@leap.se', private=False)
-        doc_id2 = _keymanager_doc_id('leap@leap.se', private=True)
-        doc_id3 = _keymanager_doc_id('user@leap.se', private=False)
-        doc_id4 = _keymanager_doc_id('user@leap.se', private=True)
+        doc_id1 = _keymanager_doc_id(
+            OpenPGPKey, 'leap@leap.se', private=False)
+        doc_id2 = _keymanager_doc_id(
+            OpenPGPKey, 'leap@leap.se', private=True)
+        doc_id3 = _keymanager_doc_id(
+            OpenPGPKey, 'user@leap.se', private=False)
+        doc_id4 = _keymanager_doc_id(
+            OpenPGPKey, 'user@leap.se', private=True)
         self.assertFalse(doc_id1 == doc_id2, 'Doc ids are equal!')
         self.assertFalse(doc_id1 == doc_id3, 'Doc ids are equal!')
         self.assertFalse(doc_id1 == doc_id4, 'Doc ids are equal!')
@@ -119,7 +136,7 @@ class KeyManagerCryptoTestCase(BaseLeapTest):
         )
         # initialize solead by hand for testing purposes
         self._soledad._init_dirs()
-        self._soledad._gpg = GPGWrapper(gnupghome=self.tempdir+"/gnupg")
+        self._soledad._crypto = SoledadCrypto(self._soledad)
         self._soledad._shared_db = None
         self._soledad._init_keys()
         self._soledad._init_db()
@@ -130,31 +147,86 @@ class KeyManagerCryptoTestCase(BaseLeapTest):
     def _key_manager(user='user@leap.se', url='https://domain.org:6425'):
         return KeyManager(user, url)
 
-    def test_openpgp_gen_key(self):
-        pgp = openpgp.OpenPGPWrapper(self._soledad)
-        try:
-            pgp.get_key('user@leap.se')
-        except KeyNotFound:
-            key = pgp.gen_key('user@leap.se')
-            self.assertIsInstance(key, openpgp.OpenPGPKey)
-            self.assertEqual(
-                'user@leap.se', key.address, 'Wrong address bound to key.')
-            self.assertEqual(
-                '4096', key.length, 'Wrong key length.')
+    def _test_openpgp_gen_key(self):
+        pgp = openpgp.OpenPGPScheme(self._soledad)
+        self.assertRaises(KeyNotFound, pgp.get_key, 'user@leap.se')
+        key = pgp.gen_key('user@leap.se')
+        self.assertIsInstance(key, openpgp.OpenPGPKey)
+        self.assertEqual(
+            'user@leap.se', key.address, 'Wrong address bound to key.')
+        self.assertEqual(
+            '4096', key.length, 'Wrong key length.')
+
+    def test_openpgp_put_delete_key(self):
+        pgp = openpgp.OpenPGPScheme(self._soledad)
+        self.assertRaises(KeyNotFound, pgp.get_key, 'leap@leap.se')
+        pgp.put_key_raw(PUBLIC_KEY)
+        key = pgp.get_key('leap@leap.se', private=False)
+        pgp.delete_key(key)
+        self.assertRaises(KeyNotFound, pgp.get_key, 'leap@leap.se')
 
     def test_openpgp_put_key_raw(self):
-        pgp = openpgp.OpenPGPWrapper(self._soledad)
-        try:
-            pgp.get_key('leap@leap.se')
-        except KeyNotFound:
-            pgp.put_key_raw(PUBLIC_KEY)
-            key = pgp.get_key('leap@leap.se')
-            self.assertIsInstance(key, openpgp.OpenPGPKey)
-            self.assertEqual(
-                'leap@leap.se', key.address, 'Wrong address bound to key.')
-            self.assertEqual(
-                '4096', key.length, 'Wrong key length.')
+        pgp = openpgp.OpenPGPScheme(self._soledad)
+        self.assertRaises(KeyNotFound, pgp.get_key, 'leap@leap.se')
+        pgp.put_key_raw(PUBLIC_KEY)
+        key = pgp.get_key('leap@leap.se', private=False)
+        self.assertIsInstance(key, openpgp.OpenPGPKey)
+        self.assertEqual(
+            'leap@leap.se', key.address, 'Wrong address bound to key.')
+        self.assertEqual(
+            '4096', key.length, 'Wrong key length.')
+        pgp.delete_key(key)
+        self.assertRaises(KeyNotFound, pgp.get_key, 'leap@leap.se')
+
+    def test_get_public_key(self):
+        pgp = openpgp.OpenPGPScheme(self._soledad)
+        self.assertRaises(KeyNotFound, pgp.get_key, 'leap@leap.se')
+        pgp.put_key_raw(PUBLIC_KEY)
+        self.assertRaises(
+            KeyNotFound, pgp.get_key, 'leap@leap.se', private=True)
+        key = pgp.get_key('leap@leap.se', private=False)
+        self.assertEqual('leap@leap.se', key.address)
+        self.assertFalse(key.private)
+        self.assertEqual(KEY_FINGERPRINT, key.fingerprint)
+        pgp.delete_key(key)
+        self.assertRaises(KeyNotFound, pgp.get_key, 'leap@leap.se')
+
+    def test_openpgp_encrypt_decrypt_asym(self):
+        # encrypt
+        pgp = openpgp.OpenPGPScheme(self._soledad)
+        pgp.put_key_raw(PUBLIC_KEY)
+        pubkey = pgp.get_key('leap@leap.se', private=False)
+        cyphertext = openpgp.encrypt_asym('data', pubkey)
+        # assert
+        self.assertTrue(cyphertext is not None)
+        self.assertTrue(cyphertext != '')
+        self.assertTrue(cyphertext != 'data')
+        self.assertTrue(openpgp.is_encrypted_asym(cyphertext))
+        self.assertFalse(openpgp.is_encrypted_sym(cyphertext))
+        self.assertTrue(openpgp.is_encrypted(cyphertext))
+        # decrypt
+        self.assertRaises(
+            KeyNotFound, pgp.get_key, 'leap@leap.se', private=True)
+        pgp.put_key_raw(PRIVATE_KEY)
+        privkey = pgp.get_key('leap@leap.se', private=True)
+        plaintext = openpgp.decrypt_asym(cyphertext, privkey)
+        pgp.delete_key(pubkey)
+        pgp.delete_key(privkey)
+        self.assertRaises(
+            KeyNotFound, pgp.get_key, 'leap@leap.se', private=False)
+        self.assertRaises(
+            KeyNotFound, pgp.get_key, 'leap@leap.se', private=True)
 
+    def test_openpgp_encrypt_decrypt_sym(self):
+        cyphertext = openpgp.encrypt_sym('data', 'pass')
+        self.assertTrue(cyphertext is not None)
+        self.assertTrue(cyphertext != '')
+        self.assertTrue(cyphertext != 'data')
+        self.assertTrue(openpgp.is_encrypted_sym(cyphertext))
+        self.assertFalse(openpgp.is_encrypted_asym(cyphertext))
+        self.assertTrue(openpgp.is_encrypted(cyphertext))
+        plaintext = openpgp.decrypt_sym(cyphertext, 'pass')
+        self.assertEqual('data', plaintext)
 
 
 # Key material for testing