diff options
| author | Kali Kaneko <kali@leap.se> | 2015-09-10 10:36:53 -0400 | 
|---|---|---|
| committer | Kali Kaneko <kali@leap.se> | 2015-09-24 12:36:16 -0400 | 
| commit | f3158ed7a792b7c02b2f89fd90aefa708882557f (patch) | |
| tree | 75058fc10aa336a68d9bb61217a6e97533065b4c /keymanager | |
| parent | 1b8847b244586dec237949d477c474bc77c8264c (diff) | |
[refactor] refactor key parsing
so that it can be tested without needing to instantiate the whole
OpenPGPScheme object, that receives a soledad instance.
Diffstat (limited to 'keymanager')
| -rw-r--r-- | keymanager/src/leap/keymanager/openpgp.py | 130 | 
1 files changed, 59 insertions, 71 deletions
diff --git a/keymanager/src/leap/keymanager/openpgp.py b/keymanager/src/leap/keymanager/openpgp.py index b8b47d0..069a78e 100644 --- a/keymanager/src/leap/keymanager/openpgp.py +++ b/keymanager/src/leap/keymanager/openpgp.py @@ -1,6 +1,6 @@  # -*- coding: utf-8 -*-  # openpgp.py -# Copyright (C) 2013 LEAP +# Copyright (C) 2013-2015 LEAP  #  # This program is free software: you can redistribute it and/or modify  # it under the terms of the GNU General Public License as published by @@ -251,7 +251,7 @@ class OpenPGPScheme(EncryptionScheme):          leap_assert(is_address(address), 'Not an user address: %s' % address)          def _gen_key(_): -            with self._temporary_gpgwrapper() as gpg: +            with TempGPGWrapper(gpgbinary=self._gpgbinary) as gpg:                  # TODO: inspect result, or use decorator                  params = gpg.gen_key_input(                      key_type='RSA', @@ -348,37 +348,25 @@ class OpenPGPScheme(EncryptionScheme):          # TODO: add more checks for correct key data.          leap_assert(key_data is not None, 'Data does not represent a key.') -        with self._temporary_gpgwrapper() as gpg: -            # TODO: inspect result, or use decorator -            gpg.import_keys(key_data) -            privkey = None -            pubkey = None +        priv_info, privkey = process_ascii_key( +            key_data, self._gpgbinary, secret=True) +        pub_info, pubkey = process_ascii_key( +            key_data, self._gpgbinary, secret=False) -            try: -                privkey = gpg.list_keys(secret=True).pop() -            except IndexError: -                pass -            try: -                pubkey = gpg.list_keys(secret=False).pop()  # unitary keyring -            except IndexError: -                return (None, None) - -            openpgp_privkey = None -            if privkey is not None: -                # build private key -                openpgp_privkey = self._build_key_from_gpg( -                    privkey, -                    gpg.export_keys(privkey['fingerprint'], secret=True)) -                leap_check(pubkey['fingerprint'] == privkey['fingerprint'], -                           'Fingerprints for public and private key differ.', -                           errors.KeyFingerprintMismatch) - -            # build public key -            openpgp_pubkey = self._build_key_from_gpg( -                pubkey, -                gpg.export_keys(pubkey['fingerprint'], secret=False)) - -            return (openpgp_pubkey, openpgp_privkey) +        if not pubkey: +            return (None, None) + +        openpgp_privkey = None +        if privkey: +            # build private key +            openpgp_privkey = self._build_key_from_gpg(priv_info, privkey) +            leap_check(pub_info['fingerprint'] == priv_info['fingerprint'], +                       'Fingerprints for public and private key differ.', +                       errors.KeyFingerprintMismatch) +        # build public key +        openpgp_pubkey = self._build_key_from_gpg(pub_info, pubkey) + +        return (openpgp_pubkey, openpgp_privkey)      def put_ascii_key(self, key_data, address):          """ @@ -439,7 +427,7 @@ class OpenPGPScheme(EncryptionScheme):                  oldkey = build_key_from_dict(OpenPGPKey, doc.content)                  if key.fingerprint == oldkey.fingerprint:                      # in case of an update of the key merge them with gnupg -                    with self._temporary_gpgwrapper() as gpg: +                    with TempGPGWrapper(gpgbinary=self._gpgbinary) as gpg:                          gpg.import_keys(oldkey.key_data)                          gpg.import_keys(key.key_data)                          gpgkey = gpg.list_keys(secret=key.private).pop() @@ -577,24 +565,7 @@ class OpenPGPScheme(EncryptionScheme):          :return: An instance of the key.          :rtype: OpenPGPKey          """ -        expiry_date = None -        if key['expires']: -            expiry_date = datetime.fromtimestamp(int(key['expires'])) -        address = [] -        for uid in key['uids']: -            address.append(_parse_address(uid)) - -        return OpenPGPKey( -            address, -            gpgbinary=self._gpgbinary, -            key_id=key['keyid'], -            fingerprint=key['fingerprint'], -            key_data=key_data, -            private=True if key['type'] == 'sec' else False, -            length=int(key['length']), -            expiry_date=expiry_date, -            refreshed_at=datetime.now(), -        ) +        return build_gpg_key(key, key_data, self._gpgbinary)      def delete_key(self, key):          """ @@ -654,21 +625,6 @@ class OpenPGPScheme(EncryptionScheme):      # Data encryption, decryption, signing and verifying      # -    def _temporary_gpgwrapper(self, keys=None): -        """ -        Return a gpg wrapper that implements the context manager protocol and -        contains C{keys}. - -        :param keys: keys to conform the keyring. -        :type key: list(OpenPGPKey) - -        :return: a TempGPGWrapper instance -        :rtype: TempGPGWrapper -        """ -        # TODO do here checks on key_data -        return TempGPGWrapper( -            keys=keys, gpgbinary=self._gpgbinary) -      @staticmethod      def _assert_gpg_result_ok(result):          """ @@ -713,7 +669,7 @@ class OpenPGPScheme(EncryptionScheme):              leap_assert_type(sign, OpenPGPKey)              leap_assert(sign.private is True)              keys.append(sign) -        with self._temporary_gpgwrapper(keys) as gpg: +        with TempGPGWrapper(keys, self._gpgbinary) as gpg:              result = gpg.encrypt(                  data, pubkey.fingerprint,                  default_key=sign.key_id if sign else None, @@ -755,7 +711,7 @@ class OpenPGPScheme(EncryptionScheme):              leap_assert_type(verify, OpenPGPKey)              leap_assert(verify.private is False)              keys.append(verify) -        with self._temporary_gpgwrapper(keys) as gpg: +        with TempGPGWrapper(keys, self._gpgbinary) as gpg:              try:                  result = gpg.decrypt(                      data, passphrase=passphrase, always_trust=True) @@ -783,7 +739,7 @@ class OpenPGPScheme(EncryptionScheme):          :return: Whether C{data} was encrypted using this wrapper.          :rtype: bool          """ -        with self._temporary_gpgwrapper() as gpg: +        with TempGPGWrapper(gpgbinary=self._gpgbinary) as gpg:              gpgutil = GPGUtilities(gpg)              return gpgutil.is_encrypted_asym(data) @@ -814,7 +770,7 @@ class OpenPGPScheme(EncryptionScheme):          # result.fingerprint - contains the fingerprint of the key used to          #                      sign. -        with self._temporary_gpgwrapper(privkey) as gpg: +        with TempGPGWrapper(privkey, self._gpgbinary) as gpg:              result = gpg.sign(data, default_key=privkey.key_id,                                digest_algo=digest_algo, clearsign=clearsign,                                detach=detach, binary=binary) @@ -849,7 +805,7 @@ class OpenPGPScheme(EncryptionScheme):          """          leap_assert_type(pubkey, OpenPGPKey)          leap_assert(pubkey.private is False) -        with self._temporary_gpgwrapper(pubkey) as gpg: +        with TempGPGWrapper(pubkey, self._gpgbinary) as gpg:              result = None              if detached_sig is None:                  result = gpg.verify(data) @@ -867,3 +823,35 @@ class OpenPGPScheme(EncryptionScheme):              rfprint = result.fingerprint              kfprint = gpgpubkey['fingerprint']              return valid and rfprint == kfprint + + +def process_ascii_key(key_data, gpgbinary, secret=False): +    with TempGPGWrapper(gpgbinary=gpgbinary) as gpg: +        try: +            gpg.import_keys(key_data) +            info = gpg.list_keys(secret=secret).pop() +            key = gpg.export_keys(info['fingerprint'], secret=secret) +        except IndexError: +            info = {} +            key = None +    return info, key + + +def build_gpg_key(key_info, key_data, gpgbinary=None): +    expiry_date = None +    if key_info['expires']: +        expiry_date = datetime.fromtimestamp(int(key_info['expires'])) +    address = [] +    for uid in key_info['uids']: +        address.append(_parse_address(uid)) + +    return OpenPGPKey( +        address, +        gpgbinary=gpgbinary, +        key_id=key_info['keyid'], +        fingerprint=key_info['fingerprint'], +        key_data=key_data, +        private=True if key_info['type'] == 'sec' else False, +        length=int(key_info['length']), +        expiry_date=expiry_date, +        refreshed_at=datetime.now())  | 
