diff options
| -rw-r--r-- | keymanager/src/leap/keymanager/openpgp.py | 114 | ||||
| -rw-r--r-- | keymanager/src/leap/keymanager/wrapper.py | 134 | 
2 files changed, 135 insertions, 113 deletions
| diff --git a/keymanager/src/leap/keymanager/openpgp.py b/keymanager/src/leap/keymanager/openpgp.py index 697ee8a..301c46c 100644 --- a/keymanager/src/leap/keymanager/openpgp.py +++ b/keymanager/src/leap/keymanager/openpgp.py @@ -20,7 +20,6 @@ Infrastructure for using OpenPGP keys in Key Manager.  import logging  import os  import re -import shutil  import tempfile  import traceback  import io @@ -28,13 +27,13 @@ import io  from datetime import datetime  from multiprocessing import cpu_count -from gnupg import GPG  from gnupg.gnupg import GPGUtilities  from twisted.internet import defer  from twisted.internet.threads import deferToThread  from leap.common.check import leap_assert, leap_assert_type, leap_check  from leap.keymanager import errors +from leap.keymanager.wrapper import TempGPGWrapper  from leap.keymanager.keys import (      EncryptionKey,      init_indexes, @@ -71,117 +70,6 @@ def from_thread(func, *args, **kwargs):      return cpu_core_semaphore.run(call) -class TempGPGWrapper(object): -    """ -    A context manager that wraps a temporary GPG keyring which only contains -    the keys given at object creation. -    """ - -    def __init__(self, keys=None, gpgbinary=None): -        """ -        Create an empty temporary keyring and import any given C{keys} into -        it. - -        :param keys: OpenPGP key, or list of. -        :type keys: OpenPGPKey or list of OpenPGPKeys -        :param gpgbinary: Name for GnuPG binary executable. -        :type gpgbinary: C{str} -        """ -        self._gpg = None -        self._gpgbinary = gpgbinary -        if not keys: -            keys = list() -        if not isinstance(keys, list): -            keys = [keys] -        self._keys = keys -        for key in keys: -            leap_assert_type(key, OpenPGPKey) - -    def __enter__(self): -        """ -        Build and return a GPG keyring containing the keys given on -        object creation. - -        :return: A GPG instance containing the keys given on object creation. -        :rtype: gnupg.GPG -        """ -        self._build_keyring() -        return self._gpg - -    def __exit__(self, exc_type, exc_value, traceback): -        """ -        Ensure the gpg is properly destroyed. -        """ -        # TODO handle exceptions and log here -        self._destroy_keyring() - -    def _build_keyring(self): -        """ -        Create a GPG keyring containing the keys given on object creation. - -        :return: A GPG instance containing the keys given on object creation. -        :rtype: gnupg.GPG -        """ -        privkeys = [key for key in self._keys if key and key.private is True] -        publkeys = [key for key in self._keys if key and key.private is False] -        # here we filter out public keys that have a correspondent -        # private key in the list because the private key_data by -        # itself is enough to also have the public key in the keyring, -        # and we want to count the keys afterwards. - -        privfps = map(lambda privkey: privkey.fingerprint, privkeys) -        publkeys = filter( -            lambda pubkey: pubkey.fingerprint not in privfps, publkeys) - -        listkeys = lambda: self._gpg.list_keys() -        listsecretkeys = lambda: self._gpg.list_keys(secret=True) - -        self._gpg = GPG(binary=self._gpgbinary, -                        homedir=tempfile.mkdtemp()) -        leap_assert(len(listkeys()) is 0, 'Keyring not empty.') - -        # import keys into the keyring: -        # concatenating ascii-armored keys, which is correctly -        # understood by GPG. - -        self._gpg.import_keys("".join( -            [x.key_data for x in publkeys + privkeys])) - -        # assert the number of keys in the keyring -        leap_assert( -            len(listkeys()) == len(publkeys) + len(privkeys), -            'Wrong number of public keys in keyring: %d, should be %d)' % -            (len(listkeys()), len(publkeys) + len(privkeys))) -        leap_assert( -            len(listsecretkeys()) == len(privkeys), -            'Wrong number of private keys in keyring: %d, should be %d)' % -            (len(listsecretkeys()), len(privkeys))) - -    def _destroy_keyring(self): -        """ -        Securely erase the keyring. -        """ -        # TODO: implement some kind of wiping of data or a more -        # secure way that -        # does not write to disk. - -        try: -            for secret in [True, False]: -                for key in self._gpg.list_keys(secret=secret): -                    self._gpg.delete_keys( -                        key['fingerprint'], -                        secret=secret) -            leap_assert(len(self._gpg.list_keys()) is 0, 'Keyring not empty!') - -        except: -            raise - -        finally: -            leap_assert(self._gpg.homedir != os.path.expanduser('~/.gnupg'), -                        "watch out! Tried to remove default gnupg home!") -            shutil.rmtree(self._gpg.homedir) - -  def _parse_address(address):      """      Remove name, '<', '>' and the identity suffix after the '+' until the '@' diff --git a/keymanager/src/leap/keymanager/wrapper.py b/keymanager/src/leap/keymanager/wrapper.py new file mode 100644 index 0000000..4f36cec --- /dev/null +++ b/keymanager/src/leap/keymanager/wrapper.py @@ -0,0 +1,134 @@ +# -*- coding: utf-8 -*- +# wrapper.py +# Copyright (C) 2016 LEAP +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see <http://www.gnu.org/licenses/>. +""" +GPG wrapper for temporary keyrings +""" +import os +import shutil +import tempfile +from gnupg import GPG + +from leap.common.check import leap_assert + + +class TempGPGWrapper(object): +    """ +    A context manager that wraps a temporary GPG keyring which only contains +    the keys given at object creation. +    """ + +    def __init__(self, keys=None, gpgbinary=None): +        """ +        Create an empty temporary keyring and import any given C{keys} into +        it. + +        :param keys: OpenPGP key, or list of. +        :type keys: OpenPGPKey or list of OpenPGPKeys +        :param gpgbinary: Name for GnuPG binary executable. +        :type gpgbinary: C{str} +        """ +        self._gpg = None +        self._gpgbinary = gpgbinary +        if not keys: +            keys = list() +        if not isinstance(keys, list): +            keys = [keys] +        self._keys = keys + +    def __enter__(self): +        """ +        Build and return a GPG keyring containing the keys given on +        object creation. + +        :return: A GPG instance containing the keys given on object creation. +        :rtype: gnupg.GPG +        """ +        self._build_keyring() +        return self._gpg + +    def __exit__(self, exc_type, exc_value, traceback): +        """ +        Ensure the gpg is properly destroyed. +        """ +        # TODO handle exceptions and log here +        self._destroy_keyring() + +    def _build_keyring(self): +        """ +        Create a GPG keyring containing the keys given on object creation. + +        :return: A GPG instance containing the keys given on object creation. +        :rtype: gnupg.GPG +        """ +        privkeys = [key for key in self._keys if key and key.private is True] +        publkeys = [key for key in self._keys if key and key.private is False] +        # here we filter out public keys that have a correspondent +        # private key in the list because the private key_data by +        # itself is enough to also have the public key in the keyring, +        # and we want to count the keys afterwards. + +        privfps = map(lambda privkey: privkey.fingerprint, privkeys) +        publkeys = filter( +            lambda pubkey: pubkey.fingerprint not in privfps, publkeys) + +        listkeys = lambda: self._gpg.list_keys() +        listsecretkeys = lambda: self._gpg.list_keys(secret=True) + +        self._gpg = GPG(binary=self._gpgbinary, +                        homedir=tempfile.mkdtemp()) +        leap_assert(len(listkeys()) is 0, 'Keyring not empty.') + +        # import keys into the keyring: +        # concatenating ascii-armored keys, which is correctly +        # understood by GPG. + +        self._gpg.import_keys("".join( +            [x.key_data for x in publkeys + privkeys])) + +        # assert the number of keys in the keyring +        leap_assert( +            len(listkeys()) == len(publkeys) + len(privkeys), +            'Wrong number of public keys in keyring: %d, should be %d)' % +            (len(listkeys()), len(publkeys) + len(privkeys))) +        leap_assert( +            len(listsecretkeys()) == len(privkeys), +            'Wrong number of private keys in keyring: %d, should be %d)' % +            (len(listsecretkeys()), len(privkeys))) + +    def _destroy_keyring(self): +        """ +        Securely erase the keyring. +        """ +        # TODO: implement some kind of wiping of data or a more +        # secure way that +        # does not write to disk. + +        try: +            for secret in [True, False]: +                for key in self._gpg.list_keys(secret=secret): +                    self._gpg.delete_keys( +                        key['fingerprint'], +                        secret=secret) +            leap_assert(len(self._gpg.list_keys()) is 0, 'Keyring not empty!') + +        except: +            raise + +        finally: +            leap_assert(self._gpg.homedir != os.path.expanduser('~/.gnupg'), +                        "watch out! Tried to remove default gnupg home!") +            shutil.rmtree(self._gpg.homedir) | 
