diff options
author | Kali Kaneko <kali@leap.se> | 2015-09-10 10:36:53 -0400 |
---|---|---|
committer | Kali Kaneko <kali@leap.se> | 2015-09-24 12:36:16 -0400 |
commit | f3158ed7a792b7c02b2f89fd90aefa708882557f (patch) | |
tree | 75058fc10aa336a68d9bb61217a6e97533065b4c /keymanager | |
parent | 1b8847b244586dec237949d477c474bc77c8264c (diff) |
[refactor] refactor key parsing
so that it can be tested without needing to instantiate the whole
OpenPGPScheme object, that receives a soledad instance.
Diffstat (limited to 'keymanager')
-rw-r--r-- | keymanager/src/leap/keymanager/openpgp.py | 130 |
1 files changed, 59 insertions, 71 deletions
diff --git a/keymanager/src/leap/keymanager/openpgp.py b/keymanager/src/leap/keymanager/openpgp.py index b8b47d02..069a78e3 100644 --- a/keymanager/src/leap/keymanager/openpgp.py +++ b/keymanager/src/leap/keymanager/openpgp.py @@ -1,6 +1,6 @@ # -*- coding: utf-8 -*- # openpgp.py -# Copyright (C) 2013 LEAP +# Copyright (C) 2013-2015 LEAP # # This program is free software: you can redistribute it and/or modify # it under the terms of the GNU General Public License as published by @@ -251,7 +251,7 @@ class OpenPGPScheme(EncryptionScheme): leap_assert(is_address(address), 'Not an user address: %s' % address) def _gen_key(_): - with self._temporary_gpgwrapper() as gpg: + with TempGPGWrapper(gpgbinary=self._gpgbinary) as gpg: # TODO: inspect result, or use decorator params = gpg.gen_key_input( key_type='RSA', @@ -348,37 +348,25 @@ class OpenPGPScheme(EncryptionScheme): # TODO: add more checks for correct key data. leap_assert(key_data is not None, 'Data does not represent a key.') - with self._temporary_gpgwrapper() as gpg: - # TODO: inspect result, or use decorator - gpg.import_keys(key_data) - privkey = None - pubkey = None + priv_info, privkey = process_ascii_key( + key_data, self._gpgbinary, secret=True) + pub_info, pubkey = process_ascii_key( + key_data, self._gpgbinary, secret=False) - try: - privkey = gpg.list_keys(secret=True).pop() - except IndexError: - pass - try: - pubkey = gpg.list_keys(secret=False).pop() # unitary keyring - except IndexError: - return (None, None) - - openpgp_privkey = None - if privkey is not None: - # build private key - openpgp_privkey = self._build_key_from_gpg( - privkey, - gpg.export_keys(privkey['fingerprint'], secret=True)) - leap_check(pubkey['fingerprint'] == privkey['fingerprint'], - 'Fingerprints for public and private key differ.', - errors.KeyFingerprintMismatch) - - # build public key - openpgp_pubkey = self._build_key_from_gpg( - pubkey, - gpg.export_keys(pubkey['fingerprint'], secret=False)) - - return (openpgp_pubkey, openpgp_privkey) + if not pubkey: + return (None, None) + + openpgp_privkey = None + if privkey: + # build private key + openpgp_privkey = self._build_key_from_gpg(priv_info, privkey) + leap_check(pub_info['fingerprint'] == priv_info['fingerprint'], + 'Fingerprints for public and private key differ.', + errors.KeyFingerprintMismatch) + # build public key + openpgp_pubkey = self._build_key_from_gpg(pub_info, pubkey) + + return (openpgp_pubkey, openpgp_privkey) def put_ascii_key(self, key_data, address): """ @@ -439,7 +427,7 @@ class OpenPGPScheme(EncryptionScheme): oldkey = build_key_from_dict(OpenPGPKey, doc.content) if key.fingerprint == oldkey.fingerprint: # in case of an update of the key merge them with gnupg - with self._temporary_gpgwrapper() as gpg: + with TempGPGWrapper(gpgbinary=self._gpgbinary) as gpg: gpg.import_keys(oldkey.key_data) gpg.import_keys(key.key_data) gpgkey = gpg.list_keys(secret=key.private).pop() @@ -577,24 +565,7 @@ class OpenPGPScheme(EncryptionScheme): :return: An instance of the key. :rtype: OpenPGPKey """ - expiry_date = None - if key['expires']: - expiry_date = datetime.fromtimestamp(int(key['expires'])) - address = [] - for uid in key['uids']: - address.append(_parse_address(uid)) - - return OpenPGPKey( - address, - gpgbinary=self._gpgbinary, - key_id=key['keyid'], - fingerprint=key['fingerprint'], - key_data=key_data, - private=True if key['type'] == 'sec' else False, - length=int(key['length']), - expiry_date=expiry_date, - refreshed_at=datetime.now(), - ) + return build_gpg_key(key, key_data, self._gpgbinary) def delete_key(self, key): """ @@ -654,21 +625,6 @@ class OpenPGPScheme(EncryptionScheme): # Data encryption, decryption, signing and verifying # - def _temporary_gpgwrapper(self, keys=None): - """ - Return a gpg wrapper that implements the context manager protocol and - contains C{keys}. - - :param keys: keys to conform the keyring. - :type key: list(OpenPGPKey) - - :return: a TempGPGWrapper instance - :rtype: TempGPGWrapper - """ - # TODO do here checks on key_data - return TempGPGWrapper( - keys=keys, gpgbinary=self._gpgbinary) - @staticmethod def _assert_gpg_result_ok(result): """ @@ -713,7 +669,7 @@ class OpenPGPScheme(EncryptionScheme): leap_assert_type(sign, OpenPGPKey) leap_assert(sign.private is True) keys.append(sign) - with self._temporary_gpgwrapper(keys) as gpg: + with TempGPGWrapper(keys, self._gpgbinary) as gpg: result = gpg.encrypt( data, pubkey.fingerprint, default_key=sign.key_id if sign else None, @@ -755,7 +711,7 @@ class OpenPGPScheme(EncryptionScheme): leap_assert_type(verify, OpenPGPKey) leap_assert(verify.private is False) keys.append(verify) - with self._temporary_gpgwrapper(keys) as gpg: + with TempGPGWrapper(keys, self._gpgbinary) as gpg: try: result = gpg.decrypt( data, passphrase=passphrase, always_trust=True) @@ -783,7 +739,7 @@ class OpenPGPScheme(EncryptionScheme): :return: Whether C{data} was encrypted using this wrapper. :rtype: bool """ - with self._temporary_gpgwrapper() as gpg: + with TempGPGWrapper(gpgbinary=self._gpgbinary) as gpg: gpgutil = GPGUtilities(gpg) return gpgutil.is_encrypted_asym(data) @@ -814,7 +770,7 @@ class OpenPGPScheme(EncryptionScheme): # result.fingerprint - contains the fingerprint of the key used to # sign. - with self._temporary_gpgwrapper(privkey) as gpg: + with TempGPGWrapper(privkey, self._gpgbinary) as gpg: result = gpg.sign(data, default_key=privkey.key_id, digest_algo=digest_algo, clearsign=clearsign, detach=detach, binary=binary) @@ -849,7 +805,7 @@ class OpenPGPScheme(EncryptionScheme): """ leap_assert_type(pubkey, OpenPGPKey) leap_assert(pubkey.private is False) - with self._temporary_gpgwrapper(pubkey) as gpg: + with TempGPGWrapper(pubkey, self._gpgbinary) as gpg: result = None if detached_sig is None: result = gpg.verify(data) @@ -867,3 +823,35 @@ class OpenPGPScheme(EncryptionScheme): rfprint = result.fingerprint kfprint = gpgpubkey['fingerprint'] return valid and rfprint == kfprint + + +def process_ascii_key(key_data, gpgbinary, secret=False): + with TempGPGWrapper(gpgbinary=gpgbinary) as gpg: + try: + gpg.import_keys(key_data) + info = gpg.list_keys(secret=secret).pop() + key = gpg.export_keys(info['fingerprint'], secret=secret) + except IndexError: + info = {} + key = None + return info, key + + +def build_gpg_key(key_info, key_data, gpgbinary=None): + expiry_date = None + if key_info['expires']: + expiry_date = datetime.fromtimestamp(int(key_info['expires'])) + address = [] + for uid in key_info['uids']: + address.append(_parse_address(uid)) + + return OpenPGPKey( + address, + gpgbinary=gpgbinary, + key_id=key_info['keyid'], + fingerprint=key_info['fingerprint'], + key_data=key_data, + private=True if key_info['type'] == 'sec' else False, + length=int(key_info['length']), + expiry_date=expiry_date, + refreshed_at=datetime.now()) |