From 64ad9d1d18e0e5234b065541058fdd71d9a90a3c Mon Sep 17 00:00:00 2001 From: Ruben Pollan Date: Sun, 1 May 2016 14:27:15 -0300 Subject: [refactor] move TempGPGWrapper to it's own file --- src/leap/keymanager/openpgp.py | 114 +---------------------------------------- 1 file changed, 1 insertion(+), 113 deletions(-) (limited to 'src/leap/keymanager/openpgp.py') diff --git a/src/leap/keymanager/openpgp.py b/src/leap/keymanager/openpgp.py index 697ee8a..301c46c 100644 --- a/src/leap/keymanager/openpgp.py +++ b/src/leap/keymanager/openpgp.py @@ -20,7 +20,6 @@ Infrastructure for using OpenPGP keys in Key Manager. import logging import os import re -import shutil import tempfile import traceback import io @@ -28,13 +27,13 @@ import io from datetime import datetime from multiprocessing import cpu_count -from gnupg import GPG from gnupg.gnupg import GPGUtilities from twisted.internet import defer from twisted.internet.threads import deferToThread from leap.common.check import leap_assert, leap_assert_type, leap_check from leap.keymanager import errors +from leap.keymanager.wrapper import TempGPGWrapper from leap.keymanager.keys import ( EncryptionKey, init_indexes, @@ -71,117 +70,6 @@ def from_thread(func, *args, **kwargs): return cpu_core_semaphore.run(call) -class TempGPGWrapper(object): - """ - A context manager that wraps a temporary GPG keyring which only contains - the keys given at object creation. - """ - - def __init__(self, keys=None, gpgbinary=None): - """ - Create an empty temporary keyring and import any given C{keys} into - it. - - :param keys: OpenPGP key, or list of. - :type keys: OpenPGPKey or list of OpenPGPKeys - :param gpgbinary: Name for GnuPG binary executable. - :type gpgbinary: C{str} - """ - self._gpg = None - self._gpgbinary = gpgbinary - if not keys: - keys = list() - if not isinstance(keys, list): - keys = [keys] - self._keys = keys - for key in keys: - leap_assert_type(key, OpenPGPKey) - - def __enter__(self): - """ - Build and return a GPG keyring containing the keys given on - object creation. - - :return: A GPG instance containing the keys given on object creation. - :rtype: gnupg.GPG - """ - self._build_keyring() - return self._gpg - - def __exit__(self, exc_type, exc_value, traceback): - """ - Ensure the gpg is properly destroyed. - """ - # TODO handle exceptions and log here - self._destroy_keyring() - - def _build_keyring(self): - """ - Create a GPG keyring containing the keys given on object creation. - - :return: A GPG instance containing the keys given on object creation. - :rtype: gnupg.GPG - """ - privkeys = [key for key in self._keys if key and key.private is True] - publkeys = [key for key in self._keys if key and key.private is False] - # here we filter out public keys that have a correspondent - # private key in the list because the private key_data by - # itself is enough to also have the public key in the keyring, - # and we want to count the keys afterwards. - - privfps = map(lambda privkey: privkey.fingerprint, privkeys) - publkeys = filter( - lambda pubkey: pubkey.fingerprint not in privfps, publkeys) - - listkeys = lambda: self._gpg.list_keys() - listsecretkeys = lambda: self._gpg.list_keys(secret=True) - - self._gpg = GPG(binary=self._gpgbinary, - homedir=tempfile.mkdtemp()) - leap_assert(len(listkeys()) is 0, 'Keyring not empty.') - - # import keys into the keyring: - # concatenating ascii-armored keys, which is correctly - # understood by GPG. - - self._gpg.import_keys("".join( - [x.key_data for x in publkeys + privkeys])) - - # assert the number of keys in the keyring - leap_assert( - len(listkeys()) == len(publkeys) + len(privkeys), - 'Wrong number of public keys in keyring: %d, should be %d)' % - (len(listkeys()), len(publkeys) + len(privkeys))) - leap_assert( - len(listsecretkeys()) == len(privkeys), - 'Wrong number of private keys in keyring: %d, should be %d)' % - (len(listsecretkeys()), len(privkeys))) - - def _destroy_keyring(self): - """ - Securely erase the keyring. - """ - # TODO: implement some kind of wiping of data or a more - # secure way that - # does not write to disk. - - try: - for secret in [True, False]: - for key in self._gpg.list_keys(secret=secret): - self._gpg.delete_keys( - key['fingerprint'], - secret=secret) - leap_assert(len(self._gpg.list_keys()) is 0, 'Keyring not empty!') - - except: - raise - - finally: - leap_assert(self._gpg.homedir != os.path.expanduser('~/.gnupg'), - "watch out! Tried to remove default gnupg home!") - shutil.rmtree(self._gpg.homedir) - - def _parse_address(address): """ Remove name, '<', '>' and the identity suffix after the '+' until the '@' -- cgit v1.2.3