summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorRuben Pollan <meskio@sindominio.net>2016-05-01 14:27:15 -0300
committerRuben Pollan <meskio@sindominio.net>2016-06-01 18:49:15 +0200
commit64ad9d1d18e0e5234b065541058fdd71d9a90a3c (patch)
tree71b0271aca733571a7f1e0453c93d335d1989cc5
parent2ea532d876584982b4c0939f72e40913d19c2b00 (diff)
[refactor] move TempGPGWrapper to it's own file
-rw-r--r--src/leap/keymanager/openpgp.py114
-rw-r--r--src/leap/keymanager/wrapper.py134
2 files changed, 135 insertions, 113 deletions
diff --git a/src/leap/keymanager/openpgp.py b/src/leap/keymanager/openpgp.py
index 697ee8a..301c46c 100644
--- a/src/leap/keymanager/openpgp.py
+++ b/src/leap/keymanager/openpgp.py
@@ -20,7 +20,6 @@ Infrastructure for using OpenPGP keys in Key Manager.
import logging
import os
import re
-import shutil
import tempfile
import traceback
import io
@@ -28,13 +27,13 @@ import io
from datetime import datetime
from multiprocessing import cpu_count
-from gnupg import GPG
from gnupg.gnupg import GPGUtilities
from twisted.internet import defer
from twisted.internet.threads import deferToThread
from leap.common.check import leap_assert, leap_assert_type, leap_check
from leap.keymanager import errors
+from leap.keymanager.wrapper import TempGPGWrapper
from leap.keymanager.keys import (
EncryptionKey,
init_indexes,
@@ -71,117 +70,6 @@ def from_thread(func, *args, **kwargs):
return cpu_core_semaphore.run(call)
-class TempGPGWrapper(object):
- """
- A context manager that wraps a temporary GPG keyring which only contains
- the keys given at object creation.
- """
-
- def __init__(self, keys=None, gpgbinary=None):
- """
- Create an empty temporary keyring and import any given C{keys} into
- it.
-
- :param keys: OpenPGP key, or list of.
- :type keys: OpenPGPKey or list of OpenPGPKeys
- :param gpgbinary: Name for GnuPG binary executable.
- :type gpgbinary: C{str}
- """
- self._gpg = None
- self._gpgbinary = gpgbinary
- if not keys:
- keys = list()
- if not isinstance(keys, list):
- keys = [keys]
- self._keys = keys
- for key in keys:
- leap_assert_type(key, OpenPGPKey)
-
- def __enter__(self):
- """
- Build and return a GPG keyring containing the keys given on
- object creation.
-
- :return: A GPG instance containing the keys given on object creation.
- :rtype: gnupg.GPG
- """
- self._build_keyring()
- return self._gpg
-
- def __exit__(self, exc_type, exc_value, traceback):
- """
- Ensure the gpg is properly destroyed.
- """
- # TODO handle exceptions and log here
- self._destroy_keyring()
-
- def _build_keyring(self):
- """
- Create a GPG keyring containing the keys given on object creation.
-
- :return: A GPG instance containing the keys given on object creation.
- :rtype: gnupg.GPG
- """
- privkeys = [key for key in self._keys if key and key.private is True]
- publkeys = [key for key in self._keys if key and key.private is False]
- # here we filter out public keys that have a correspondent
- # private key in the list because the private key_data by
- # itself is enough to also have the public key in the keyring,
- # and we want to count the keys afterwards.
-
- privfps = map(lambda privkey: privkey.fingerprint, privkeys)
- publkeys = filter(
- lambda pubkey: pubkey.fingerprint not in privfps, publkeys)
-
- listkeys = lambda: self._gpg.list_keys()
- listsecretkeys = lambda: self._gpg.list_keys(secret=True)
-
- self._gpg = GPG(binary=self._gpgbinary,
- homedir=tempfile.mkdtemp())
- leap_assert(len(listkeys()) is 0, 'Keyring not empty.')
-
- # import keys into the keyring:
- # concatenating ascii-armored keys, which is correctly
- # understood by GPG.
-
- self._gpg.import_keys("".join(
- [x.key_data for x in publkeys + privkeys]))
-
- # assert the number of keys in the keyring
- leap_assert(
- len(listkeys()) == len(publkeys) + len(privkeys),
- 'Wrong number of public keys in keyring: %d, should be %d)' %
- (len(listkeys()), len(publkeys) + len(privkeys)))
- leap_assert(
- len(listsecretkeys()) == len(privkeys),
- 'Wrong number of private keys in keyring: %d, should be %d)' %
- (len(listsecretkeys()), len(privkeys)))
-
- def _destroy_keyring(self):
- """
- Securely erase the keyring.
- """
- # TODO: implement some kind of wiping of data or a more
- # secure way that
- # does not write to disk.
-
- try:
- for secret in [True, False]:
- for key in self._gpg.list_keys(secret=secret):
- self._gpg.delete_keys(
- key['fingerprint'],
- secret=secret)
- leap_assert(len(self._gpg.list_keys()) is 0, 'Keyring not empty!')
-
- except:
- raise
-
- finally:
- leap_assert(self._gpg.homedir != os.path.expanduser('~/.gnupg'),
- "watch out! Tried to remove default gnupg home!")
- shutil.rmtree(self._gpg.homedir)
-
-
def _parse_address(address):
"""
Remove name, '<', '>' and the identity suffix after the '+' until the '@'
diff --git a/src/leap/keymanager/wrapper.py b/src/leap/keymanager/wrapper.py
new file mode 100644
index 0000000..4f36cec
--- /dev/null
+++ b/src/leap/keymanager/wrapper.py
@@ -0,0 +1,134 @@
+# -*- coding: utf-8 -*-
+# wrapper.py
+# Copyright (C) 2016 LEAP
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see <http://www.gnu.org/licenses/>.
+"""
+GPG wrapper for temporary keyrings
+"""
+import os
+import shutil
+import tempfile
+from gnupg import GPG
+
+from leap.common.check import leap_assert
+
+
+class TempGPGWrapper(object):
+ """
+ A context manager that wraps a temporary GPG keyring which only contains
+ the keys given at object creation.
+ """
+
+ def __init__(self, keys=None, gpgbinary=None):
+ """
+ Create an empty temporary keyring and import any given C{keys} into
+ it.
+
+ :param keys: OpenPGP key, or list of.
+ :type keys: OpenPGPKey or list of OpenPGPKeys
+ :param gpgbinary: Name for GnuPG binary executable.
+ :type gpgbinary: C{str}
+ """
+ self._gpg = None
+ self._gpgbinary = gpgbinary
+ if not keys:
+ keys = list()
+ if not isinstance(keys, list):
+ keys = [keys]
+ self._keys = keys
+
+ def __enter__(self):
+ """
+ Build and return a GPG keyring containing the keys given on
+ object creation.
+
+ :return: A GPG instance containing the keys given on object creation.
+ :rtype: gnupg.GPG
+ """
+ self._build_keyring()
+ return self._gpg
+
+ def __exit__(self, exc_type, exc_value, traceback):
+ """
+ Ensure the gpg is properly destroyed.
+ """
+ # TODO handle exceptions and log here
+ self._destroy_keyring()
+
+ def _build_keyring(self):
+ """
+ Create a GPG keyring containing the keys given on object creation.
+
+ :return: A GPG instance containing the keys given on object creation.
+ :rtype: gnupg.GPG
+ """
+ privkeys = [key for key in self._keys if key and key.private is True]
+ publkeys = [key for key in self._keys if key and key.private is False]
+ # here we filter out public keys that have a correspondent
+ # private key in the list because the private key_data by
+ # itself is enough to also have the public key in the keyring,
+ # and we want to count the keys afterwards.
+
+ privfps = map(lambda privkey: privkey.fingerprint, privkeys)
+ publkeys = filter(
+ lambda pubkey: pubkey.fingerprint not in privfps, publkeys)
+
+ listkeys = lambda: self._gpg.list_keys()
+ listsecretkeys = lambda: self._gpg.list_keys(secret=True)
+
+ self._gpg = GPG(binary=self._gpgbinary,
+ homedir=tempfile.mkdtemp())
+ leap_assert(len(listkeys()) is 0, 'Keyring not empty.')
+
+ # import keys into the keyring:
+ # concatenating ascii-armored keys, which is correctly
+ # understood by GPG.
+
+ self._gpg.import_keys("".join(
+ [x.key_data for x in publkeys + privkeys]))
+
+ # assert the number of keys in the keyring
+ leap_assert(
+ len(listkeys()) == len(publkeys) + len(privkeys),
+ 'Wrong number of public keys in keyring: %d, should be %d)' %
+ (len(listkeys()), len(publkeys) + len(privkeys)))
+ leap_assert(
+ len(listsecretkeys()) == len(privkeys),
+ 'Wrong number of private keys in keyring: %d, should be %d)' %
+ (len(listsecretkeys()), len(privkeys)))
+
+ def _destroy_keyring(self):
+ """
+ Securely erase the keyring.
+ """
+ # TODO: implement some kind of wiping of data or a more
+ # secure way that
+ # does not write to disk.
+
+ try:
+ for secret in [True, False]:
+ for key in self._gpg.list_keys(secret=secret):
+ self._gpg.delete_keys(
+ key['fingerprint'],
+ secret=secret)
+ leap_assert(len(self._gpg.list_keys()) is 0, 'Keyring not empty!')
+
+ except:
+ raise
+
+ finally:
+ leap_assert(self._gpg.homedir != os.path.expanduser('~/.gnupg'),
+ "watch out! Tried to remove default gnupg home!")
+ shutil.rmtree(self._gpg.homedir)