summaryrefslogtreecommitdiff
path: root/keymanager/src/leap/keymanager/openpgp.py
diff options
context:
space:
mode:
Diffstat (limited to 'keymanager/src/leap/keymanager/openpgp.py')
-rw-r--r--keymanager/src/leap/keymanager/openpgp.py114
1 files changed, 1 insertions, 113 deletions
diff --git a/keymanager/src/leap/keymanager/openpgp.py b/keymanager/src/leap/keymanager/openpgp.py
index 697ee8a..301c46c 100644
--- a/keymanager/src/leap/keymanager/openpgp.py
+++ b/keymanager/src/leap/keymanager/openpgp.py
@@ -20,7 +20,6 @@ Infrastructure for using OpenPGP keys in Key Manager.
import logging
import os
import re
-import shutil
import tempfile
import traceback
import io
@@ -28,13 +27,13 @@ import io
from datetime import datetime
from multiprocessing import cpu_count
-from gnupg import GPG
from gnupg.gnupg import GPGUtilities
from twisted.internet import defer
from twisted.internet.threads import deferToThread
from leap.common.check import leap_assert, leap_assert_type, leap_check
from leap.keymanager import errors
+from leap.keymanager.wrapper import TempGPGWrapper
from leap.keymanager.keys import (
EncryptionKey,
init_indexes,
@@ -71,117 +70,6 @@ def from_thread(func, *args, **kwargs):
return cpu_core_semaphore.run(call)
-class TempGPGWrapper(object):
- """
- A context manager that wraps a temporary GPG keyring which only contains
- the keys given at object creation.
- """
-
- def __init__(self, keys=None, gpgbinary=None):
- """
- Create an empty temporary keyring and import any given C{keys} into
- it.
-
- :param keys: OpenPGP key, or list of.
- :type keys: OpenPGPKey or list of OpenPGPKeys
- :param gpgbinary: Name for GnuPG binary executable.
- :type gpgbinary: C{str}
- """
- self._gpg = None
- self._gpgbinary = gpgbinary
- if not keys:
- keys = list()
- if not isinstance(keys, list):
- keys = [keys]
- self._keys = keys
- for key in keys:
- leap_assert_type(key, OpenPGPKey)
-
- def __enter__(self):
- """
- Build and return a GPG keyring containing the keys given on
- object creation.
-
- :return: A GPG instance containing the keys given on object creation.
- :rtype: gnupg.GPG
- """
- self._build_keyring()
- return self._gpg
-
- def __exit__(self, exc_type, exc_value, traceback):
- """
- Ensure the gpg is properly destroyed.
- """
- # TODO handle exceptions and log here
- self._destroy_keyring()
-
- def _build_keyring(self):
- """
- Create a GPG keyring containing the keys given on object creation.
-
- :return: A GPG instance containing the keys given on object creation.
- :rtype: gnupg.GPG
- """
- privkeys = [key for key in self._keys if key and key.private is True]
- publkeys = [key for key in self._keys if key and key.private is False]
- # here we filter out public keys that have a correspondent
- # private key in the list because the private key_data by
- # itself is enough to also have the public key in the keyring,
- # and we want to count the keys afterwards.
-
- privfps = map(lambda privkey: privkey.fingerprint, privkeys)
- publkeys = filter(
- lambda pubkey: pubkey.fingerprint not in privfps, publkeys)
-
- listkeys = lambda: self._gpg.list_keys()
- listsecretkeys = lambda: self._gpg.list_keys(secret=True)
-
- self._gpg = GPG(binary=self._gpgbinary,
- homedir=tempfile.mkdtemp())
- leap_assert(len(listkeys()) is 0, 'Keyring not empty.')
-
- # import keys into the keyring:
- # concatenating ascii-armored keys, which is correctly
- # understood by GPG.
-
- self._gpg.import_keys("".join(
- [x.key_data for x in publkeys + privkeys]))
-
- # assert the number of keys in the keyring
- leap_assert(
- len(listkeys()) == len(publkeys) + len(privkeys),
- 'Wrong number of public keys in keyring: %d, should be %d)' %
- (len(listkeys()), len(publkeys) + len(privkeys)))
- leap_assert(
- len(listsecretkeys()) == len(privkeys),
- 'Wrong number of private keys in keyring: %d, should be %d)' %
- (len(listsecretkeys()), len(privkeys)))
-
- def _destroy_keyring(self):
- """
- Securely erase the keyring.
- """
- # TODO: implement some kind of wiping of data or a more
- # secure way that
- # does not write to disk.
-
- try:
- for secret in [True, False]:
- for key in self._gpg.list_keys(secret=secret):
- self._gpg.delete_keys(
- key['fingerprint'],
- secret=secret)
- leap_assert(len(self._gpg.list_keys()) is 0, 'Keyring not empty!')
-
- except:
- raise
-
- finally:
- leap_assert(self._gpg.homedir != os.path.expanduser('~/.gnupg'),
- "watch out! Tried to remove default gnupg home!")
- shutil.rmtree(self._gpg.homedir)
-
-
def _parse_address(address):
"""
Remove name, '<', '>' and the identity suffix after the '+' until the '@'