From cce42536ac2c7588a9df2f6e012bb8991f8bbd75 Mon Sep 17 00:00:00 2001 From: Kali Kaneko Date: Thu, 10 Sep 2015 10:36:53 -0400 Subject: [refactor] refactor key parsing so that it can be tested without needing to instantiate the whole OpenPGPScheme object, that receives a soledad instance. --- src/leap/keymanager/openpgp.py | 130 +++++++++++++++++++---------------------- 1 file changed, 59 insertions(+), 71 deletions(-) (limited to 'src/leap') diff --git a/src/leap/keymanager/openpgp.py b/src/leap/keymanager/openpgp.py index b8b47d0..069a78e 100644 --- a/src/leap/keymanager/openpgp.py +++ b/src/leap/keymanager/openpgp.py @@ -1,6 +1,6 @@ # -*- coding: utf-8 -*- # openpgp.py -# Copyright (C) 2013 LEAP +# Copyright (C) 2013-2015 LEAP # # This program is free software: you can redistribute it and/or modify # it under the terms of the GNU General Public License as published by @@ -251,7 +251,7 @@ class OpenPGPScheme(EncryptionScheme): leap_assert(is_address(address), 'Not an user address: %s' % address) def _gen_key(_): - with self._temporary_gpgwrapper() as gpg: + with TempGPGWrapper(gpgbinary=self._gpgbinary) as gpg: # TODO: inspect result, or use decorator params = gpg.gen_key_input( key_type='RSA', @@ -348,37 +348,25 @@ class OpenPGPScheme(EncryptionScheme): # TODO: add more checks for correct key data. leap_assert(key_data is not None, 'Data does not represent a key.') - with self._temporary_gpgwrapper() as gpg: - # TODO: inspect result, or use decorator - gpg.import_keys(key_data) - privkey = None - pubkey = None + priv_info, privkey = process_ascii_key( + key_data, self._gpgbinary, secret=True) + pub_info, pubkey = process_ascii_key( + key_data, self._gpgbinary, secret=False) - try: - privkey = gpg.list_keys(secret=True).pop() - except IndexError: - pass - try: - pubkey = gpg.list_keys(secret=False).pop() # unitary keyring - except IndexError: - return (None, None) - - openpgp_privkey = None - if privkey is not None: - # build private key - openpgp_privkey = self._build_key_from_gpg( - privkey, - gpg.export_keys(privkey['fingerprint'], secret=True)) - leap_check(pubkey['fingerprint'] == privkey['fingerprint'], - 'Fingerprints for public and private key differ.', - errors.KeyFingerprintMismatch) - - # build public key - openpgp_pubkey = self._build_key_from_gpg( - pubkey, - gpg.export_keys(pubkey['fingerprint'], secret=False)) - - return (openpgp_pubkey, openpgp_privkey) + if not pubkey: + return (None, None) + + openpgp_privkey = None + if privkey: + # build private key + openpgp_privkey = self._build_key_from_gpg(priv_info, privkey) + leap_check(pub_info['fingerprint'] == priv_info['fingerprint'], + 'Fingerprints for public and private key differ.', + errors.KeyFingerprintMismatch) + # build public key + openpgp_pubkey = self._build_key_from_gpg(pub_info, pubkey) + + return (openpgp_pubkey, openpgp_privkey) def put_ascii_key(self, key_data, address): """ @@ -439,7 +427,7 @@ class OpenPGPScheme(EncryptionScheme): oldkey = build_key_from_dict(OpenPGPKey, doc.content) if key.fingerprint == oldkey.fingerprint: # in case of an update of the key merge them with gnupg - with self._temporary_gpgwrapper() as gpg: + with TempGPGWrapper(gpgbinary=self._gpgbinary) as gpg: gpg.import_keys(oldkey.key_data) gpg.import_keys(key.key_data) gpgkey = gpg.list_keys(secret=key.private).pop() @@ -577,24 +565,7 @@ class OpenPGPScheme(EncryptionScheme): :return: An instance of the key. :rtype: OpenPGPKey """ - expiry_date = None - if key['expires']: - expiry_date = datetime.fromtimestamp(int(key['expires'])) - address = [] - for uid in key['uids']: - address.append(_parse_address(uid)) - - return OpenPGPKey( - address, - gpgbinary=self._gpgbinary, - key_id=key['keyid'], - fingerprint=key['fingerprint'], - key_data=key_data, - private=True if key['type'] == 'sec' else False, - length=int(key['length']), - expiry_date=expiry_date, - refreshed_at=datetime.now(), - ) + return build_gpg_key(key, key_data, self._gpgbinary) def delete_key(self, key): """ @@ -654,21 +625,6 @@ class OpenPGPScheme(EncryptionScheme): # Data encryption, decryption, signing and verifying # - def _temporary_gpgwrapper(self, keys=None): - """ - Return a gpg wrapper that implements the context manager protocol and - contains C{keys}. - - :param keys: keys to conform the keyring. - :type key: list(OpenPGPKey) - - :return: a TempGPGWrapper instance - :rtype: TempGPGWrapper - """ - # TODO do here checks on key_data - return TempGPGWrapper( - keys=keys, gpgbinary=self._gpgbinary) - @staticmethod def _assert_gpg_result_ok(result): """ @@ -713,7 +669,7 @@ class OpenPGPScheme(EncryptionScheme): leap_assert_type(sign, OpenPGPKey) leap_assert(sign.private is True) keys.append(sign) - with self._temporary_gpgwrapper(keys) as gpg: + with TempGPGWrapper(keys, self._gpgbinary) as gpg: result = gpg.encrypt( data, pubkey.fingerprint, default_key=sign.key_id if sign else None, @@ -755,7 +711,7 @@ class OpenPGPScheme(EncryptionScheme): leap_assert_type(verify, OpenPGPKey) leap_assert(verify.private is False) keys.append(verify) - with self._temporary_gpgwrapper(keys) as gpg: + with TempGPGWrapper(keys, self._gpgbinary) as gpg: try: result = gpg.decrypt( data, passphrase=passphrase, always_trust=True) @@ -783,7 +739,7 @@ class OpenPGPScheme(EncryptionScheme): :return: Whether C{data} was encrypted using this wrapper. :rtype: bool """ - with self._temporary_gpgwrapper() as gpg: + with TempGPGWrapper(gpgbinary=self._gpgbinary) as gpg: gpgutil = GPGUtilities(gpg) return gpgutil.is_encrypted_asym(data) @@ -814,7 +770,7 @@ class OpenPGPScheme(EncryptionScheme): # result.fingerprint - contains the fingerprint of the key used to # sign. - with self._temporary_gpgwrapper(privkey) as gpg: + with TempGPGWrapper(privkey, self._gpgbinary) as gpg: result = gpg.sign(data, default_key=privkey.key_id, digest_algo=digest_algo, clearsign=clearsign, detach=detach, binary=binary) @@ -849,7 +805,7 @@ class OpenPGPScheme(EncryptionScheme): """ leap_assert_type(pubkey, OpenPGPKey) leap_assert(pubkey.private is False) - with self._temporary_gpgwrapper(pubkey) as gpg: + with TempGPGWrapper(pubkey, self._gpgbinary) as gpg: result = None if detached_sig is None: result = gpg.verify(data) @@ -867,3 +823,35 @@ class OpenPGPScheme(EncryptionScheme): rfprint = result.fingerprint kfprint = gpgpubkey['fingerprint'] return valid and rfprint == kfprint + + +def process_ascii_key(key_data, gpgbinary, secret=False): + with TempGPGWrapper(gpgbinary=gpgbinary) as gpg: + try: + gpg.import_keys(key_data) + info = gpg.list_keys(secret=secret).pop() + key = gpg.export_keys(info['fingerprint'], secret=secret) + except IndexError: + info = {} + key = None + return info, key + + +def build_gpg_key(key_info, key_data, gpgbinary=None): + expiry_date = None + if key_info['expires']: + expiry_date = datetime.fromtimestamp(int(key_info['expires'])) + address = [] + for uid in key_info['uids']: + address.append(_parse_address(uid)) + + return OpenPGPKey( + address, + gpgbinary=gpgbinary, + key_id=key_info['keyid'], + fingerprint=key_info['fingerprint'], + key_data=key_data, + private=True if key_info['type'] == 'sec' else False, + length=int(key_info['length']), + expiry_date=expiry_date, + refreshed_at=datetime.now()) -- cgit v1.2.3