use temporary openpgpwrapper as a context manager
authorKali Kaneko <kali@leap.se>
Mon, 13 May 2013 14:06:35 +0000 (23:06 +0900)
committerKali Kaneko <kali@leap.se>
Mon, 20 May 2013 16:57:26 +0000 (01:57 +0900)
in this way we implicitely catch any exception during
the wrapped call, and ensure that the destructor is always
called.

.gitignore
changes/feature_openpgp-context-manager [new file with mode: 0644]
pkg/requirements-testing.pip [new file with mode: 0644]
src/leap/common/keymanager/__init__.py
src/leap/common/keymanager/errors.py
src/leap/common/keymanager/openpgp.py
src/leap/common/tests/test_keymanager.py

index 331608c..e0d60cb 100644 (file)
@@ -1,5 +1,7 @@
 *.pyc
 *.egg
 *.egg-info
+*.swp
+*.swo
 dist/
 build/
diff --git a/changes/feature_openpgp-context-manager b/changes/feature_openpgp-context-manager
new file mode 100644 (file)
index 0000000..4dbf759
--- /dev/null
@@ -0,0 +1 @@
+  o Refactor opengpg utility functions implementation so it uses a context manager.
diff --git a/pkg/requirements-testing.pip b/pkg/requirements-testing.pip
new file mode 100644 (file)
index 0000000..932a895
--- /dev/null
@@ -0,0 +1 @@
+mock
index 3427f03..30a9146 100644 (file)
@@ -160,7 +160,8 @@ class KeyManager(object):
             privkey = self.get_key(
                 self._address, ktype, private=True, fetch_remote=False)
             privkey = json.loads(privkey.get_json())
-            privkey.key_data = encrypt_sym(privkey.key_data, password)
+            privkey.key_data = encrypt_sym(
+                privkey.key_data, passphrase=password)
             data['keys'].append(privkey)
         self._fetcher.put(
             self._nickserver_url + '/key/' + self._address,
index d6802f0..89949d2 100644 (file)
@@ -72,6 +72,13 @@ class DecryptionFailed(Exception):
     pass
 
 
+class EncryptionDecryptionFailed(Exception):
+    """
+    Raised upon failures of encryption/decryption.
+    """
+    pass
+
+
 class SignFailed(Exception):
     """
     Raised when failed to sign.
index d578638..e60833b 100644 (file)
 # You should have received a copy of the GNU General Public License
 # along with this program. If not, see <http://www.gnu.org/licenses/>.
 
-
 """
 Infrastructure for using OpenPGP keys in Key Manager.
 """
-
-
+import logging
+import os
 import re
-import tempfile
 import shutil
+import tempfile
 
 from leap.common.check import leap_assert, leap_assert_type
-from leap.common.keymanager.errors import (
-    KeyNotFound,
-    KeyAlreadyExists,
-    KeyAttributesDiffer,
-    InvalidSignature,
-    EncryptionFailed,
-    DecryptionFailed,
-    SignFailed,
-)
+from leap.common.keymanager import errors
+
 from leap.common.keymanager.keys import (
     EncryptionKey,
     EncryptionScheme,
@@ -44,12 +36,242 @@ from leap.common.keymanager.keys import (
 )
 from leap.common.keymanager.gpg import GPGWrapper
 
+logger = logging.getLogger(__name__)
+
+
+#
+# gpg wrapper and decorator
+#
+
+def temporary_gpgwrapper(keys=None):
+    """
+    Returns a unitary gpg wrapper that implements context manager
+    protocol.
+
+    @param key_data: ASCII armored key data.
+    @type key_data: str
+
+    @return: a GPGWrapper instance
+    @rtype: GPGWrapper
+    """
+    # TODO do here checks on key_data
+    return TempGPGWrapper(keys=keys)
+
+
+def with_temporary_gpg(fun):
+    """
+    Decorator to add a temporary gpg wrapper as context
+    to gpg related functions.
+
+    Decorated functions are expected to return a function whose only
+    argument is a gpgwrapper instance.
+    """
+    def wrapped(*args, **kwargs):
+        """
+        We extract the arguments passed to the wrapped function,
+        run the function and do validations.
+        We expect that the positional arguments are `data`,
+        and an optional `key`.
+        All the rest of arguments should be passed as named arguments
+        to allow for a correct unpacking.
+        """
+        if len(args) == 2:
+            keys = args[1] if isinstance(args[1], OpenPGPKey) else None
+        else:
+            keys = None
+
+        # sign/verify keys passed as arguments
+        sign = kwargs.get('sign', None)
+        if sign:
+            keys = [keys, sign]
+
+        verify = kwargs.get('verify', None)
+        if verify:
+            keys = [keys, verify]
+
+        # is the wrapped function sign or verify?
+        fun_name = fun.__name__
+        is_sign_function = True if fun_name == "sign" else False
+        is_verify_function = True if fun_name == "verify" else False
+
+        result = None
+
+        with temporary_gpgwrapper(keys) as gpg:
+            result = fun(*args, **kwargs)(gpg)
+
+            # TODO: cleanup a little bit the
+            # validation. maybe delegate to other
+            # auxiliary functions for clarity.
+
+            ok = getattr(result, 'ok', None)
+
+            stderr = getattr(result, 'stderr', None)
+            if stderr:
+                logger.debug("%s" % (stderr,))
+
+            if ok is False:
+                raise errors.EncryptionDecryptionFailed(
+                    'Failed to encrypt/decrypt in %s: %s' % (
+                        fun.__name__,
+                        stderr))
+
+            if verify is not None:
+                # A verify key has been passed
+                if result.valid is False or \
+                        verify.fingerprint != result.pubkey_fingerprint:
+                    raise errors.InvalidSignature(
+                        'Failed to verify signature with key %s: %s' %
+                        (verify.key_id, stderr))
+
+            if is_sign_function:
+                # Specific validation for sign function
+                privkey = gpg.list_keys(secret=True).pop()
+                rfprint = result.fingerprint
+                kfprint = privkey['fingerprint']
+                if result.fingerprint is None:
+                    raise errors.SignFailed(
+                        'Failed to sign with key %s: %s' %
+                        (privkey['keyid'], stderr))
+                leap_assert(
+                    result.fingerprint == kfprint,
+                    'Signature and private key fingerprints mismatch: '
+                    '%s != %s' %
+                    (rfprint, kfprint))
+
+            if is_verify_function:
+                # Specific validation for verify function
+                pubkey = gpg.list_keys().pop()
+                valid = result.valid
+                rfprint = result.fingerprint
+                kfprint = pubkey['fingerprint']
+                if valid is False or rfprint != kfprint:
+                    raise errors.InvalidSignature(
+                        'Failed to verify signature '
+                        'with key %s.' % pubkey['keyid'])
+                result = result.valid
+
+            # ok, enough checks. let's return data if available
+            if hasattr(result, 'data'):
+                result = result.data
+        return result
+    return wrapped
+
+
+class TempGPGWrapper(object):
+    """
+    A context manager returning a temporary GPG wrapper keyring, which
+    contains exactly zero or one pubkeys, and zero or one privkeys.
+
+    Temporary unitary keyrings allow the to use GPG's facilities for exactly
+    one key. This function creates an empty temporary keyring and imports
+    C{keys} if it is not None.
+    """
+    def __init__(self, keys=None):
+        """
+        @param keys: OpenPGP key, or list of.
+        @type keys: OpenPGPKey or list of OpenPGPKeys
+        """
+        self._gpg = None
+        if not keys:
+            keys = list()
+        if not isinstance(keys, list):
+            keys = [keys]
+        self._keys = keys
+        for key in filter(None, keys):
+            leap_assert_type(key, OpenPGPKey)
+
+    def __enter__(self):
+        """
+        Calls the unitary gpgwrapper initializer
+
+        @return: A GPG wrapper with a unitary keyring.
+        @rtype: gnupg.GPG
+        """
+        self._build_keyring()
+        return self._gpg
+
+    def __exit__(self, exc_type, exc_value, traceback):
+        """
+        Ensures the gpgwrapper is properly destroyed.
+        """
+        # TODO handle exceptions and log here
+        self._destroy_keyring()
+
+    def _build_keyring(self):
+        """
+        Create an empty GPG keyring and import C{keys} into it.
+
+        @param keys: List of keys to add to the keyring.
+        @type keys: list of OpenPGPKey
+
+        @return: A GPG wrapper with a unitary keyring.
+        @rtype: gnupg.GPG
+        """
+        privkeys = [key for key in self._keys if key and key.private is True]
+        publkeys = [key for key in self._keys if key and key.private is False]
+        # here we filter out public keys that have a correspondent
+        # private key in the list because the private key_data by
+        # itself is enough to also have the public key in the keyring,
+        # and we want to count the keys afterwards.
+
+        privaddrs = map(lambda privkey: privkey.address, privkeys)
+        publkeys = filter(
+            lambda pubkey: pubkey.address not in privaddrs, publkeys)
+
+        listkeys = lambda: self._gpg.list_keys()
+        listsecretkeys = lambda: self._gpg.list_keys(secret=True)
+
+        self._gpg = GPGWrapper(gnupghome=tempfile.mkdtemp())
+        leap_assert(len(listkeys()) is 0, 'Keyring not empty.')
+
+        # import keys into the keyring:
+        # concatenating ascii-armored keys, which is correctly
+        # understood by the GPGWrapper.
+
+        self._gpg.import_keys("".join(
+            [x.key_data for x in publkeys + privkeys]))
+
+        # assert the number of keys in the keyring
+        leap_assert(
+            len(listkeys()) == len(publkeys) + len(privkeys),
+            'Wrong number of public keys in keyring: %d, should be %d)' %
+            (len(listkeys()), len(publkeys) + len(privkeys)))
+        leap_assert(
+            len(listsecretkeys()) == len(privkeys),
+            'Wrong number of private keys in keyring: %d, should be %d)' %
+            (len(listsecretkeys()), len(privkeys)))
+
+    def _destroy_keyring(self):
+        """
+        Securely erase a unitary keyring.
+        """
+        # TODO: implement some kind of wiping of data or a more
+        # secure way that
+        # does not write to disk.
+
+        try:
+            for secret in [True, False]:
+                for key in self._gpg.list_keys(secret=secret):
+                    self._gpg.delete_keys(
+                        key['fingerprint'],
+                        secret=secret)
+            leap_assert(len(self._gpg.list_keys()) is 0, 'Keyring not empty!')
+
+        except:
+            raise
+
+        finally:
+            leap_assert(self._gpg.gnupghome != os.path.expanduser('~/.gnupg'),
+                        "watch out! Tried to remove default gnupg home!")
+            shutil.rmtree(self._gpg.gnupghome)
+
 
 #
 # API functions
 #
 
-def encrypt_sym(data, passphrase, sign=None):
+@with_temporary_gpg
+def encrypt_sym(data, passphrase=None, sign=None):
     """
     Encrypt C{data} with C{passphrase} and sign with C{sign} private key.
 
@@ -68,23 +290,19 @@ def encrypt_sym(data, passphrase, sign=None):
         leap_assert_type(sign, OpenPGPKey)
         leap_assert(sign.private is True)
 
-    def _encrypt_cb(gpg):
-        result = gpg.encrypt(
-            data, None,
-            sign=sign.key_id if sign else None,
-            passphrase=passphrase, symmetric=True)
-        # Here we cannot assert for correctness of sig because the sig is in
-        # the ciphertext.
-        # result.ok    - (bool) indicates if the operation succeeded
-        # result.data  - (bool) contains the result of the operation
-        if result.ok is False:
-            raise EncryptionFailed('Failed to encrypt: %s' % result.stderr)
-        return result.data
+    # Here we cannot assert for correctness of sig because the sig is in
+    # the ciphertext.
+    # result.ok    - (bool) indicates if the operation succeeded
+    # result.data  - (bool) contains the result of the operation
 
-    return _safe_call(_encrypt_cb, [sign])
+    return lambda gpg: gpg.encrypt(
+        data, None,
+        sign=sign.key_id if sign else None,
+        passphrase=passphrase, symmetric=True)
 
 
-def decrypt_sym(data, passphrase, verify=None):
+@with_temporary_gpg
+def decrypt_sym(data, passphrase=None, verify=None):
     """
     Decrypt C{data} with C{passphrase} and verify with C{verify} public
     key.
@@ -107,27 +325,17 @@ def decrypt_sym(data, passphrase, verify=None):
         leap_assert_type(verify, OpenPGPKey)
         leap_assert(verify.private is False)
 
-    def _decrypt_cb(gpg):
-        result = gpg.decrypt(data, passphrase=passphrase)
-        # result.ok    - (bool) indicates if the operation succeeded
-        # result.valid - (bool) indicates if the signature was verified
-        # result.data  - (bool) contains the result of the operation
-        # result.pubkey_fingerpring  - (str) contains the fingerprint of the
-        #                              public key that signed this data.
-        if result.ok is False:
-            raise DecryptionFailed('Failed to decrypt: %s', result.stderr)
-        if verify is not None:
-            if result.valid is False or \
-                    verify.fingerprint != result.pubkey_fingerprint:
-                raise InvalidSignature(
-                    'Failed to verify signature with key %s: %s' %
-                    (verify.key_id, result.stderr))
-        return result.data
-
-    return _safe_call(_decrypt_cb, [verify])
-
-
-def encrypt_asym(data, pubkey, sign=None):
+    # result.ok    - (bool) indicates if the operation succeeded
+    # result.valid - (bool) indicates if the signature was verified
+    # result.data  - (bool) contains the result of the operation
+    # result.pubkey_fingerpring  - (str) contains the fingerprint of the
+    #                              public key that signed this data.
+    return lambda gpg: gpg.decrypt(
+        data, passphrase=passphrase)
+
+
+@with_temporary_gpg
+def encrypt_asym(data, key, passphrase=None, sign=None):
     """
     Encrypt C{data} using public @{key} and sign with C{sign} key.
 
@@ -141,31 +349,25 @@ def encrypt_asym(data, pubkey, sign=None):
     @return: The encrypted data.
     @rtype: str
     """
-    leap_assert_type(pubkey, OpenPGPKey)
-    leap_assert(pubkey.private is False, 'Key is not public.')
+    leap_assert_type(key, OpenPGPKey)
+    leap_assert(key.private is False, 'Key is not public.')
     if sign is not None:
         leap_assert_type(sign, OpenPGPKey)
         leap_assert(sign.private is True)
 
-    def _encrypt_cb(gpg):
-        result = gpg.encrypt(
-            data, pubkey.fingerprint,
-            sign=sign.key_id if sign else None,
-            symmetric=False)
-        # Here we cannot assert for correctness of sig because the sig is in
-        # the ciphertext.
-        # result.ok    - (bool) indicates if the operation succeeded
-        # result.data  - (bool) contains the result of the operation
-        if result.ok is False:
-            raise EncryptionFailed(
-                'Failed to encrypt with key %s: %s' %
-                (pubkey.key_id, result.stderr))
-        return result.data
-
-    return _safe_call(_encrypt_cb, [pubkey, sign])
-
-
-def decrypt_asym(data, privkey, verify=None):
+    # Here we cannot assert for correctness of sig because the sig is in
+    # the ciphertext.
+    # result.ok    - (bool) indicates if the operation succeeded
+    # result.data  - (bool) contains the result of the operation
+
+    return lambda gpg: gpg.encrypt(
+        data, key.fingerprint,
+        sign=sign.key_id if sign else None,
+        passphrase=passphrase, symmetric=False)
+
+
+@with_temporary_gpg
+def decrypt_asym(data, key, passphrase=None, verify=None):
     """
     Decrypt C{data} using private @{key} and verify with C{verify} key.
 
@@ -182,32 +384,16 @@ def decrypt_asym(data, privkey, verify=None):
     @raise InvalidSignature: Raised if unable to verify the signature with
         C{verify} key.
     """
-    leap_assert(privkey.private is True, 'Key is not private.')
+    leap_assert(key.private is True, 'Key is not private.')
     if verify is not None:
         leap_assert_type(verify, OpenPGPKey)
         leap_assert(verify.private is False)
 
-    def _decrypt_cb(gpg):
-        result = gpg.decrypt(data)
-        # result.ok    - (bool) indicates if the operation succeeded
-        # result.valid - (bool) indicates if the signature was verified
-        # result.data  - (bool) contains the result of the operation
-        # result.pubkey_fingerpring  - (str) contains the fingerprint of the
-        #                              public key that signed this data.
-        if result.ok is False:
-            raise DecryptionFailed('Failed to decrypt with key %s: %s' %
-                                   (privkey.key_id, result.stderr))
-        if verify is not None:
-            if result.valid is False or \
-                    verify.fingerprint != result.pubkey_fingerprint:
-                raise InvalidSignature(
-                    'Failed to verify signature with key %s: %s' %
-                    (verify.key_id, result.stderr))
-        return result.data
-
-    return _safe_call(_decrypt_cb, [privkey, verify])
+    return lambda gpg: gpg.decrypt(
+        data, passphrase=passphrase)
 
 
+@with_temporary_gpg
 def is_encrypted(data):
     """
     Return whether C{data} was encrypted using OpenPGP.
@@ -218,13 +404,10 @@ def is_encrypted(data):
     @return: Whether C{data} was encrypted using this wrapper.
     @rtype: bool
     """
-
-    def _is_encrypted_cb(gpg):
-        return gpg.is_encrypted(data)
-
-    return _safe_call(_is_encrypted_cb)
+    return lambda gpg: gpg.is_encrypted(data)
 
 
+@with_temporary_gpg
 def is_encrypted_sym(data):
     """
     Return whether C{data} was encrypted using a public OpenPGP key.
@@ -235,13 +418,10 @@ def is_encrypted_sym(data):
     @return: Whether C{data} was encrypted using this wrapper.
     @rtype: bool
     """
-
-    def _is_encrypted_cb(gpg):
-        return gpg.is_encrypted_sym(data)
-
-    return _safe_call(_is_encrypted_cb)
+    return lambda gpg: gpg.is_encrypted_sym(data)
 
 
+@with_temporary_gpg
 def is_encrypted_asym(data):
     """
     Return whether C{data} was asymmetrically encrypted using OpenPGP.
@@ -252,19 +432,17 @@ def is_encrypted_asym(data):
     @return: Whether C{data} was encrypted using this wrapper.
     @rtype: bool
     """
-
-    def _is_encrypted_cb(gpg):
-        return gpg.is_encrypted_asym(data)
-
-    return _safe_call(_is_encrypted_cb)
+    return lambda gpg: gpg.is_encrypted_asym(data)
 
 
+@with_temporary_gpg
 def sign(data, privkey):
     """
     Sign C{data} with C{privkey}.
 
     @param data: The data to be signed.
     @type data: str
+
     @param privkey: The private key to be used to sign.
     @type privkey: OpenPGPKey
 
@@ -274,53 +452,36 @@ def sign(data, privkey):
     leap_assert_type(privkey, OpenPGPKey)
     leap_assert(privkey.private is True)
 
-    def _sign_cb(gpg):
-        result = gpg.sign(data, keyid=privkey.key_id)
-        # result.fingerprint - contains the fingerprint of the key used to
-        #                      sign.
-        if result.fingerprint is None:
-            raise SignFailed(
-                'Failed to sign with key %s: %s' %
-                (privkey.key_id, result.stderr))
-        leap_assert(
-            result.fingerprint == privkey.fingerprint,
-            'Signature and private key fingerprints mismatch: %s != %s' %
-            (result.fingerprint, privkey.fingerprint))
-        return result.data
-
-    return _safe_call(_sign_cb, [privkey])
+    # result.fingerprint - contains the fingerprint of the key used to
+    #                      sign.
+    return lambda gpg: gpg.sign(data, keyid=privkey.key_id)
 
 
-def verify(data, pubkey):
+@with_temporary_gpg
+def verify(data, key):
     """
     Verify signed C{data} with C{pubkey}.
 
     @param data: The data to be verified.
     @type data: str
+
     @param pubkey: The public key to be used on verification.
     @type pubkey: OpenPGPKey
 
     @return: The ascii-armored signed data.
     @rtype: str
     """
-    leap_assert_type(pubkey, OpenPGPKey)
-    leap_assert(pubkey.private is False)
-
-    def _verify_cb(gpg):
-        result = gpg.verify(data)
-        if result.valid is False or \
-                result.fingerprint != pubkey.fingerprint:
-            raise InvalidSignature(
-                'Failed to verify signature with key %s.' % pubkey.key_id)
-        return True
+    leap_assert_type(key, OpenPGPKey)
+    leap_assert(key.private is False)
 
-    return _safe_call(_verify_cb, [pubkey])
+    return lambda gpg: gpg.verify(data)
 
 
 #
 # Helper functions
 #
 
+
 def _build_key_from_gpg(address, key, key_data):
     """
     Build an OpenPGPKey for C{address} based on C{key} from
@@ -350,81 +511,6 @@ def _build_key_from_gpg(address, key, key_data):
     )
 
 
-def _build_keyring(keys=[]):
-    """
-
-    Create an empty GPG keyring and import C{keys} into it.
-
-    @param keys: List of keys to add to the keyring.
-    @type keys: list of OpenPGPKey
-
-    @return: A GPG wrapper with a unitary keyring.
-    @rtype: gnupg.GPG
-    """
-    privkeys = filter(lambda key: key.private is True, keys)
-    pubkeys = filter(lambda key: key.private is False, keys)
-    # here we filter out public keys that have a correspondent private key in
-    # the list because the private key_data by itself is enough to also have
-    # the public key in the keyring, and we want to count the keys afterwards.
-    privaddrs = map(lambda privkey: privkey.address, privkeys)
-    pubkeys = filter(lambda pubkey: pubkey.address not in privaddrs, pubkeys)
-    # create temporary dir for temporary gpg keyring
-    tmpdir = tempfile.mkdtemp()
-    gpg = GPGWrapper(gnupghome=tmpdir)
-    leap_assert(len(gpg.list_keys()) is 0, 'Keyring not empty.')
-    # import keys into the keyring
-    gpg.import_keys(
-        reduce(
-            lambda x, y: x+y,
-            [key.key_data for key in pubkeys+privkeys], ''))
-    # assert the number of keys in the keyring
-    leap_assert(
-        len(gpg.list_keys()) == len(pubkeys)+len(privkeys),
-        'Wrong number of public keys in keyring: %d, should be %d)' %
-        (len(gpg.list_keys()), len(pubkeys)+len(privkeys)))
-    leap_assert(
-        len(gpg.list_keys(secret=True)) == len(privkeys),
-        'Wrong number of private keys in keyring: %d, should be %d)' %
-        (len(gpg.list_keys(secret=True)), len(privkeys)))
-    return gpg
-
-
-def _destroy_keyring(gpg):
-    """
-    Securely erase a keyring.
-
-    @param gpg: A GPG wrapper instance.
-    @type gpg: gnupg.GPG
-    """
-    for secret in [True, False]:
-        for key in gpg.list_keys(secret=secret):
-            gpg.delete_keys(
-                key['fingerprint'],
-                secret=secret)
-    leap_assert(len(gpg.list_keys()) is 0, 'Keyring not empty!')
-    # TODO: implement some kind of wiping of data or a more secure way that
-    # does not write to disk.
-    shutil.rmtree(gpg.gnupghome)
-
-
-def _safe_call(callback, keys=[]):
-    """
-    Run C{callback} over a keyring containing C{keys}.
-
-    @param callback: Function whose first argument is the gpg keyring.
-    @type callback: function(gnupg.GPG)
-    @param keys: List of keys to add to the keyring.
-    @type keys: list of OpenPGPKey
-
-    @return: The results of the callback.
-    @rtype: str or bool
-    """
-    gpg = _build_keyring(filter(lambda key: key is not None, keys))
-    val = callback(gpg)
-    _destroy_keyring(gpg)
-    return val
-
-
 #
 # The OpenPGP wrapper
 #
@@ -463,11 +549,11 @@ class OpenPGPScheme(EncryptionScheme):
         leap_assert(is_address(address), 'Not an user address: %s' % address)
         try:
             self.get_key(address)
-            raise KeyAlreadyExists(address)
-        except KeyNotFound:
+            raise errors.KeyAlreadyExists(address)
+        except errors.KeyNotFound:
             pass
 
-        def _gen_key_cb(gpg):
+        def _gen_key(gpg):
             params = gpg.gen_key_input(
                 key_type='RSA',
                 key_length=4096,
@@ -495,7 +581,10 @@ class OpenPGPScheme(EncryptionScheme):
                     gpg.export_keys(key['fingerprint'], secret=secret))
                 self.put_key(openpgp_key)
 
-        _safe_call(_gen_key_cb)
+        with temporary_gpgwrapper() as gpg:
+            # TODO: inspect result, or use decorator
+            _gen_key(gpg)
+
         return self.get_key(address, private=True)
 
     def get_key(self, address, private=False):
@@ -514,7 +603,7 @@ class OpenPGPScheme(EncryptionScheme):
         leap_assert(is_address(address), 'Not an user address: %s' % address)
         doc = self._get_key_doc(address, private)
         if doc is None:
-            raise KeyNotFound(address)
+            raise errors.KeyNotFound(address)
         return build_key_from_dict(OpenPGPKey, address, doc.content)
 
     def put_ascii_key(self, key_data):
@@ -525,11 +614,14 @@ class OpenPGPScheme(EncryptionScheme):
         @type key_data: str
         """
         leap_assert_type(key_data, str)
+        # TODO: add more checks for correct key data.
+        leap_assert(key_data is not None, 'Data does not represent a key.')
 
-        def _put_ascii_key_cb(gpg):
+        def _put_ascii_key(gpg):
             gpg.import_keys(key_data)
             privkey = None
             pubkey = None
+
             try:
                 privkey = gpg.list_keys(secret=True).pop()
             except IndexError:
@@ -561,7 +653,9 @@ class OpenPGPScheme(EncryptionScheme):
                 gpg.export_keys(pubkey['fingerprint'], secret=False))
             self.put_key(openpgp_pubkey)
 
-        _safe_call(_put_ascii_key_cb)
+        with temporary_gpgwrapper() as gpg:
+            # TODO: inspect result, or use decorator
+            _put_ascii_key(gpg)
 
     def put_key(self, key):
         """
@@ -606,9 +700,9 @@ class OpenPGPScheme(EncryptionScheme):
         leap_assert(key.__class__ is OpenPGPKey, 'Wrong key type.')
         stored_key = self.get_key(key.address, private=key.private)
         if stored_key is None:
-            raise KeyNotFound(key)
+            raise errors.KeyNotFound(key)
         if stored_key.__dict__ != key.__dict__:
-            raise KeyAttributesDiffer(key)
+            raise errors.KeyAttributesDiffer(key)
         doc = self._soledad.get_doc(
             keymanager_doc_id(OpenPGPKey, key.address, key.private))
         self._soledad.delete_doc(doc)
index 48f7273..a7aa1ca 100644 (file)
@@ -30,16 +30,15 @@ except ImportError:
 
 from leap.common.testing.basetest import BaseLeapTest
 from leap.soledad import Soledad
-from leap.soledad.crypto import SoledadCrypto
-
+#from leap.soledad.crypto import SoledadCrypto
 
 from leap.common.keymanager import (
     KeyManager,
     openpgp,
     KeyNotFound,
     NoPasswordGiven,
-    TAGS_INDEX,
-    TAGS_AND_PRIVATE_INDEX,
+    #TAGS_INDEX,
+    #TAGS_AND_PRIVATE_INDEX,
 )
 from leap.common.keymanager.openpgp import OpenPGPKey
 from leap.common.keymanager.keys import (
@@ -240,14 +239,16 @@ class OpenPGPCryptoTestCase(KeyManagerWithSoledadTestCase):
             KeyNotFound, pgp.get_key, ADDRESS, private=True)
 
     def test_openpgp_encrypt_decrypt_sym(self):
-        cyphertext = openpgp.encrypt_sym('data', 'pass')
+        cyphertext = openpgp.encrypt_sym(
+            'data', passphrase='pass')
         self.assertTrue(cyphertext is not None)
         self.assertTrue(cyphertext != '')
         self.assertTrue(cyphertext != 'data')
         self.assertTrue(openpgp.is_encrypted_sym(cyphertext))
         self.assertFalse(openpgp.is_encrypted_asym(cyphertext))
         self.assertTrue(openpgp.is_encrypted(cyphertext))
-        plaintext = openpgp.decrypt_sym(cyphertext, 'pass')
+        plaintext = openpgp.decrypt_sym(
+            cyphertext, passphrase='pass')
         self.assertEqual('data', plaintext)
 
     def test_verify_with_private_raises(self):
@@ -298,7 +299,7 @@ class OpenPGPCryptoTestCase(KeyManagerWithSoledadTestCase):
         pubkey = pgp.get_key(ADDRESS, private=False)
         self.assertRaises(
             AssertionError,
-            openpgp.encrypt_sym, data, '123', sign=pubkey)
+            openpgp.encrypt_sym, data, passphrase='123', sign=pubkey)
 
     def test_decrypt_asym_verify_with_private_raises(self):
         pgp = openpgp.OpenPGPScheme(self._soledad)
@@ -321,7 +322,7 @@ class OpenPGPCryptoTestCase(KeyManagerWithSoledadTestCase):
         pubkey = pgp.get_key(ADDRESS, private=False)
         encrypted_and_signed = openpgp.encrypt_asym(data, pubkey, sign=privkey)
         pgp.put_ascii_key(PUBLIC_KEY_2)
-        wrongkey = pgp.get_key('anotheruser@leap.se')
+        wrongkey = pgp.get_key(ADDRESS_2)
         self.assertRaises(
             errors.InvalidSignature,
             openpgp.verify, encrypted_and_signed, wrongkey)
@@ -332,19 +333,8 @@ class OpenPGPCryptoTestCase(KeyManagerWithSoledadTestCase):
         data = 'data'
         privkey = pgp.get_key(ADDRESS, private=True)
         encrypted_and_signed = openpgp.encrypt_sym(data, '123', sign=privkey)
-        self.assertRaises(
-            AssertionError,
-            openpgp.decrypt_sym,
-            encrypted_and_signed, 'decrypt', verify=privkey)
-
-    def test_decrypt_sym_verify_with_private_raises(self):
-        pgp = openpgp.OpenPGPScheme(self._soledad)
-        pgp.put_ascii_key(PRIVATE_KEY)
-        data = 'data'
-        privkey = pgp.get_key(ADDRESS, private=True)
-        encrypted_and_signed = openpgp.encrypt_sym(data, '123', sign=privkey)
         pgp.put_ascii_key(PUBLIC_KEY_2)
-        wrongkey = pgp.get_key('anotheruser@leap.se')
+        wrongkey = pgp.get_key(ADDRESS_2)
         self.assertRaises(
             errors.InvalidSignature,
             openpgp.verify, encrypted_and_signed, wrongkey)
@@ -385,8 +375,7 @@ class OpenPGPCryptoTestCase(KeyManagerWithSoledadTestCase):
         self.assertEqual(data, res)
 
 
-class KeyManagerKeyManagementTestCase(
-        KeyManagerWithSoledadTestCase):
+class KeyManagerKeyManagementTestCase(KeyManagerWithSoledadTestCase):
 
     def test_get_all_keys_in_db(self):
         km = self._key_manager()
@@ -477,15 +466,15 @@ class KeyManagerKeyManagementTestCase(
             headers = {'content-type': 'application/json'}
 
             def json(self):
-                return {'address': 'anotheruser@leap.se', 'keys': []}
+                return {'address': ADDRESS_2, 'keys': []}
 
         km._fetcher.get = Mock(
             return_value=Response())
         # do the fetch
-        km.fetch_keys_from_server('anotheruser@leap.se')
+        km.fetch_keys_from_server(ADDRESS_2)
         # and verify the call
         km._fetcher.get.assert_called_once_with(
-            km._nickserver_url + '/key/' + 'anotheruser@leap.se',
+            km._nickserver_url + '/key/' + ADDRESS_2,
         )
 
     def test_refresh_keys(self):
@@ -495,11 +484,13 @@ class KeyManagerKeyManagementTestCase(
         km.fetch_keys_from_server = Mock(return_value=[])
         km.refresh_keys()
         km.fetch_keys_from_server.assert_called_once_with(
-            'leap@leap.se'
+            ADDRESS
         )
 
 
 # Key material for testing
+
+# key 24D18DDF: public key "Leap Test Key <leap@leap.se>"
 KEY_FINGERPRINT = "E36E738D69173C13D709E44F2F455E2824D18DDF"
 PUBLIC_KEY = """
 -----BEGIN PGP PUBLIC KEY BLOCK-----
@@ -662,6 +653,7 @@ RZXoH+FTg9UAW87eqU610npOkT6cRaBxaMK/mDtGNdc=
 -----END PGP PRIVATE KEY BLOCK-----
 """
 
+# key 7FEE575A: public key "anotheruser <anotheruser@leap.se>"
 PUBLIC_KEY_2 = """
 -----BEGIN PGP PUBLIC KEY BLOCK-----
 Version: GnuPG v1.4.10 (GNU/Linux)
@@ -719,3 +711,6 @@ THx7N776fcYHGumbqUMYrxrcZSbNveE6SaK8fphRam1dewM0
 =a5gs
 -----END PGP PRIVATE KEY BLOCK-----
 """
+import unittest
+if __name__ == "__main__":
+    unittest.main()