[refactor] refactor key parsing
author Kali Kaneko <kali@leap.se>
Thu, 10 Sep 2015 14:36:53 +0000 (10:36 -0400)
committer Kali Kaneko <kali@leap.se>
Thu, 24 Sep 2015 16:36:16 +0000 (12:36 -0400)
so that it can be tested without needing to instantiate the whole
OpenPGPScheme object, which receives a soledad instance.

src/leap/keymanager/openpgp.py

index b8b47d0..069a78e 100644 (file)
@@ -1,6 +1,6 @@
 # -*- coding: utf-8 -*-
 # openpgp.py
-# Copyright (C) 2013 LEAP
+# Copyright (C) 2013-2015 LEAP
 #
 # This program is free software: you can redistribute it and/or modify
 # it under the terms of the GNU General Public License as published by
@@ -251,7 +251,7 @@ class OpenPGPScheme(EncryptionScheme):
         leap_assert(is_address(address), 'Not an user address: %s' % address)
 
         def _gen_key(_):
-            with self._temporary_gpgwrapper() as gpg:
+            with TempGPGWrapper(gpgbinary=self._gpgbinary) as gpg:
                 # TODO: inspect result, or use decorator
                 params = gpg.gen_key_input(
                     key_type='RSA',
@@ -348,37 +348,25 @@ class OpenPGPScheme(EncryptionScheme):
         # TODO: add more checks for correct key data.
         leap_assert(key_data is not None, 'Data does not represent a key.')
 
-        with self._temporary_gpgwrapper() as gpg:
-            # TODO: inspect result, or use decorator
-            gpg.import_keys(key_data)
-            privkey = None
-            pubkey = None
+        priv_info, privkey = process_ascii_key(
+            key_data, self._gpgbinary, secret=True)
+        pub_info, pubkey = process_ascii_key(
+            key_data, self._gpgbinary, secret=False)
 
-            try:
-                privkey = gpg.list_keys(secret=True).pop()
-            except IndexError:
-                pass
-            try:
-                pubkey = gpg.list_keys(secret=False).pop()  # unitary keyring
-            except IndexError:
-                return (None, None)
-
-            openpgp_privkey = None
-            if privkey is not None:
-                # build private key
-                openpgp_privkey = self._build_key_from_gpg(
-                    privkey,
-                    gpg.export_keys(privkey['fingerprint'], secret=True))
-                leap_check(pubkey['fingerprint'] == privkey['fingerprint'],
-                           'Fingerprints for public and private key differ.',
-                           errors.KeyFingerprintMismatch)
-
-            # build public key
-            openpgp_pubkey = self._build_key_from_gpg(
-                pubkey,
-                gpg.export_keys(pubkey['fingerprint'], secret=False))
-
-            return (openpgp_pubkey, openpgp_privkey)
+        if not pubkey:
+            return (None, None)
+
+        openpgp_privkey = None
+        if privkey:
+            # build private key
+            openpgp_privkey = self._build_key_from_gpg(priv_info, privkey)
+            leap_check(pub_info['fingerprint'] == priv_info['fingerprint'],
+                       'Fingerprints for public and private key differ.',
+                       errors.KeyFingerprintMismatch)
+        # build public key
+        openpgp_pubkey = self._build_key_from_gpg(pub_info, pubkey)
+
+        return (openpgp_pubkey, openpgp_privkey)
 
     def put_ascii_key(self, key_data, address):
         """
@@ -439,7 +427,7 @@ class OpenPGPScheme(EncryptionScheme):
                 oldkey = build_key_from_dict(OpenPGPKey, doc.content)
                 if key.fingerprint == oldkey.fingerprint:
                     # in case of an update of the key merge them with gnupg
-                    with self._temporary_gpgwrapper() as gpg:
+                    with TempGPGWrapper(gpgbinary=self._gpgbinary) as gpg:
                         gpg.import_keys(oldkey.key_data)
                         gpg.import_keys(key.key_data)
                         gpgkey = gpg.list_keys(secret=key.private).pop()
@@ -577,24 +565,7 @@ class OpenPGPScheme(EncryptionScheme):
         :return: An instance of the key.
         :rtype: OpenPGPKey
         """
-        expiry_date = None
-        if key['expires']:
-            expiry_date = datetime.fromtimestamp(int(key['expires']))
-        address = []
-        for uid in key['uids']:
-            address.append(_parse_address(uid))
-
-        return OpenPGPKey(
-            address,
-            gpgbinary=self._gpgbinary,
-            key_id=key['keyid'],
-            fingerprint=key['fingerprint'],
-            key_data=key_data,
-            private=True if key['type'] == 'sec' else False,
-            length=int(key['length']),
-            expiry_date=expiry_date,
-            refreshed_at=datetime.now(),
-        )
+        return build_gpg_key(key, key_data, self._gpgbinary)
 
     def delete_key(self, key):
         """
@@ -654,21 +625,6 @@ class OpenPGPScheme(EncryptionScheme):
     # Data encryption, decryption, signing and verifying
     #
 
-    def _temporary_gpgwrapper(self, keys=None):
-        """
-        Return a gpg wrapper that implements the context manager protocol and
-        contains C{keys}.
-
-        :param keys: keys to conform the keyring.
-        :type key: list(OpenPGPKey)
-
-        :return: a TempGPGWrapper instance
-        :rtype: TempGPGWrapper
-        """
-        # TODO do here checks on key_data
-        return TempGPGWrapper(
-            keys=keys, gpgbinary=self._gpgbinary)
-
     @staticmethod
     def _assert_gpg_result_ok(result):
         """
@@ -713,7 +669,7 @@ class OpenPGPScheme(EncryptionScheme):
             leap_assert_type(sign, OpenPGPKey)
             leap_assert(sign.private is True)
             keys.append(sign)
-        with self._temporary_gpgwrapper(keys) as gpg:
+        with TempGPGWrapper(keys, self._gpgbinary) as gpg:
             result = gpg.encrypt(
                 data, pubkey.fingerprint,
                 default_key=sign.key_id if sign else None,
@@ -755,7 +711,7 @@ class OpenPGPScheme(EncryptionScheme):
             leap_assert_type(verify, OpenPGPKey)
             leap_assert(verify.private is False)
             keys.append(verify)
-        with self._temporary_gpgwrapper(keys) as gpg:
+        with TempGPGWrapper(keys, self._gpgbinary) as gpg:
             try:
                 result = gpg.decrypt(
                     data, passphrase=passphrase, always_trust=True)
@@ -783,7 +739,7 @@ class OpenPGPScheme(EncryptionScheme):
         :return: Whether C{data} was encrypted using this wrapper.
         :rtype: bool
         """
-        with self._temporary_gpgwrapper() as gpg:
+        with TempGPGWrapper(gpgbinary=self._gpgbinary) as gpg:
             gpgutil = GPGUtilities(gpg)
             return gpgutil.is_encrypted_asym(data)
 
@@ -814,7 +770,7 @@ class OpenPGPScheme(EncryptionScheme):
 
         # result.fingerprint - contains the fingerprint of the key used to
         #                      sign.
-        with self._temporary_gpgwrapper(privkey) as gpg:
+        with TempGPGWrapper(privkey, self._gpgbinary) as gpg:
             result = gpg.sign(data, default_key=privkey.key_id,
                               digest_algo=digest_algo, clearsign=clearsign,
                               detach=detach, binary=binary)
@@ -849,7 +805,7 @@ class OpenPGPScheme(EncryptionScheme):
         """
         leap_assert_type(pubkey, OpenPGPKey)
         leap_assert(pubkey.private is False)
-        with self._temporary_gpgwrapper(pubkey) as gpg:
+        with TempGPGWrapper(pubkey, self._gpgbinary) as gpg:
             result = None
             if detached_sig is None:
                 result = gpg.verify(data)
@@ -867,3 +823,35 @@ class OpenPGPScheme(EncryptionScheme):
             rfprint = result.fingerprint
             kfprint = gpgpubkey['fingerprint']
             return valid and rfprint == kfprint
+
+
+def process_ascii_key(key_data, gpgbinary, secret=False):
+    with TempGPGWrapper(gpgbinary=gpgbinary) as gpg:
+        try:
+            gpg.import_keys(key_data)
+            info = gpg.list_keys(secret=secret).pop()
+            key = gpg.export_keys(info['fingerprint'], secret=secret)
+        except IndexError:
+            info = {}
+            key = None
+    return info, key
+
+
+def build_gpg_key(key_info, key_data, gpgbinary=None):
+    expiry_date = None
+    if key_info['expires']:
+        expiry_date = datetime.fromtimestamp(int(key_info['expires']))
+    address = []
+    for uid in key_info['uids']:
+        address.append(_parse_address(uid))
+
+    return OpenPGPKey(
+        address,
+        gpgbinary=gpgbinary,
+        key_id=key_info['keyid'],
+        fingerprint=key_info['fingerprint'],
+        key_data=key_data,
+        private=True if key_info['type'] == 'sec' else False,
+        length=int(key_info['length']),
+        expiry_date=expiry_date,
+        refreshed_at=datetime.now())