summaryrefslogtreecommitdiff
path: root/keymanager
diff options
context:
space:
mode:
Diffstat (limited to 'keymanager')
-rw-r--r--keymanager/src/leap/keymanager/__init__.py842
-rw-r--r--keymanager/src/leap/keymanager/_version.py484
-rw-r--r--keymanager/src/leap/keymanager/documents.py101
-rw-r--r--keymanager/src/leap/keymanager/errors.py119
-rw-r--r--keymanager/src/leap/keymanager/keys.py290
-rw-r--r--keymanager/src/leap/keymanager/migrator.py167
-rw-r--r--keymanager/src/leap/keymanager/openpgp.py881
-rw-r--r--keymanager/src/leap/keymanager/validation.py129
-rw-r--r--keymanager/src/leap/keymanager/wrapper.py134
9 files changed, 0 insertions, 3147 deletions
diff --git a/keymanager/src/leap/keymanager/__init__.py b/keymanager/src/leap/keymanager/__init__.py
deleted file mode 100644
index 0b8a5b3..0000000
--- a/keymanager/src/leap/keymanager/__init__.py
+++ /dev/null
@@ -1,842 +0,0 @@
-# -*- coding: utf-8 -*-
-# __init__.py
-# Copyright (C) 2013 LEAP
-#
-# This program is free software: you can redistribute it and/or modify
-# it under the terms of the GNU General Public License as published by
-# the Free Software Foundation, either version 3 of the License, or
-# (at your option) any later version.
-#
-# This program is distributed in the hope that it will be useful,
-# but WITHOUT ANY WARRANTY; without even the implied warranty of
-# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
-# GNU General Public License for more details.
-#
-# You should have received a copy of the GNU General Public License
-# along with this program. If not, see <http://www.gnu.org/licenses/>.
-"""
-Key Manager is a Nicknym agent for LEAP client.
-"""
-# let's do a little sanity check to see if we're using the wrong gnupg
-import fileinput
-import os
-import sys
-import tempfile
-import json
-import urllib
-
-from leap.common import ca_bundle
-from twisted.web import client
-from twisted.web._responses import NOT_FOUND
-
-from ._version import get_versions
-
-try:
- from gnupg.gnupg import GPGUtilities
- assert(GPGUtilities) # pyflakes happy
- from gnupg import __version__ as _gnupg_version
- if '-' in _gnupg_version:
- # avoid Parsing it as LegacyVersion, get just
- # the release numbers:
- _gnupg_version = _gnupg_version.split('-')[0]
- from pkg_resources import parse_version
- # We need to make sure that we're not colliding with the infamous
- # python-gnupg
- assert(parse_version(_gnupg_version) >= parse_version('1.4.0'))
-
-except (ImportError, AssertionError):
- print "*******"
- print "Ooops! It looks like there is a conflict in the installed version "
- print "of gnupg."
- print "GNUPG_VERSION:", _gnupg_version
- print
- print "Disclaimer: Ideally, we would need to work a patch and propose the "
- print "merge to upstream. But until then do: "
- print
- print "% pip uninstall python-gnupg"
- print "% pip install gnupg"
- print "*******"
- sys.exit(1)
-
-import logging
-
-from twisted.internet import defer
-from urlparse import urlparse
-
-from leap.common.check import leap_assert
-from leap.common.http import HTTPClient
-from leap.common.events import emit_async, catalog
-from leap.common.decorators import memoized_method
-
-from leap.keymanager.errors import (
- KeyNotFound,
- KeyNotValidUpgrade,
- InvalidSignature
-)
-from leap.keymanager.validation import ValidationLevels, can_upgrade
-from leap.keymanager.openpgp import OpenPGPScheme
-
-__version__ = get_versions()['version']
-del get_versions
-
-logger = logging.getLogger(__name__)
-
-
-#
-# The Key Manager
-#
-
-class KeyManager(object):
-
- #
- # server's key storage constants
- #
-
- OPENPGP_KEY = 'openpgp'
- PUBKEY_KEY = "user[public_key]"
-
- def __init__(self, address, nickserver_uri, soledad, token=None,
- ca_cert_path=None, api_uri=None, api_version=None, uid=None,
- gpgbinary=None):
- """
- Initialize a Key Manager for user's C{address} with provider's
- nickserver reachable in C{nickserver_uri}.
-
- :param address: The email address of the user of this Key Manager.
- :type address: str
- :param nickserver_uri: The URI of the nickserver.
- :type nickserver_uri: str
- :param soledad: A Soledad instance for local storage of keys.
- :type soledad: leap.soledad.Soledad
- :param token: The token for interacting with the webapp API.
- :type token: str
- :param ca_cert_path: The path to the CA certificate.
- :type ca_cert_path: str
- :param api_uri: The URI of the webapp API.
- :type api_uri: str
- :param api_version: The version of the webapp API.
- :type api_version: str
- :param uid: The user's UID.
- :type uid: str
- :param gpgbinary: Name for GnuPG binary executable.
- :type gpgbinary: C{str}
- """
- self._address = address
- self._nickserver_uri = nickserver_uri
- self._soledad = soledad
- self._token = token
- self.ca_cert_path = ca_cert_path
- self.api_uri = api_uri
- self.api_version = api_version
- self.uid = uid
- self._openpgp = OpenPGPScheme(soledad, gpgbinary=gpgbinary)
- self._combined_ca_bundle = self._create_combined_bundle_file()
- self._async_client = HTTPClient(self._combined_ca_bundle)
- self._async_client_pinned = HTTPClient(self._ca_cert_path)
-
- #
- # destructor
- #
-
- def __del__(self):
- try:
- created_tmp_combined_ca_bundle = self._combined_ca_bundle not in \
- [ca_bundle.where(), self._ca_cert_path]
- if created_tmp_combined_ca_bundle:
- os.remove(self._combined_ca_bundle)
- except OSError:
- pass
-
- #
- # utilities
- #
-
- def _create_combined_bundle_file(self):
- leap_ca_bundle = ca_bundle.where()
-
- if self._ca_cert_path == leap_ca_bundle:
- return self._ca_cert_path # don't merge file with itself
- elif not self._ca_cert_path:
- return leap_ca_bundle
-
- tmp_file = tempfile.NamedTemporaryFile(delete=False)
-
- with open(tmp_file.name, 'w') as fout:
- fin = fileinput.input(files=(leap_ca_bundle, self._ca_cert_path))
- for line in fin:
- fout.write(line)
- fin.close()
-
- return tmp_file.name
-
- @defer.inlineCallbacks
- def _get_key_from_nicknym(self, address):
- """
- Send a GET request to C{uri} containing C{data}.
-
- :param address: The URI of the request.
- :type address: str
-
- :return: A deferred that will be fired with GET content as json (dict)
- :rtype: Deferred
- """
- try:
- uri = self._nickserver_uri + '?address=' + address
- content = yield self._fetch_and_handle_404_from_nicknym(uri, address)
- json_content = json.loads(content)
-
- except KeyNotFound:
- raise
- except IOError as e:
- logger.warning("HTTP error retrieving key: %r" % (e,))
- logger.warning("%s" % (content,))
- raise KeyNotFound(e.message), None, sys.exc_info()[2]
- except ValueError as v:
- logger.warning("Invalid JSON data from key: %s" % (uri,))
- raise KeyNotFound(v.message + ' - ' + uri), None, sys.exc_info()[2]
-
- except Exception as e:
- logger.warning("Error retrieving key: %r" % (e,))
- raise KeyNotFound(e.message), None, sys.exc_info()[2]
- # Responses are now text/plain, although it's json anyway, but
- # this will fail when it shouldn't
- # leap_assert(
- # res.headers['content-type'].startswith('application/json'),
- # 'Content-type is not JSON.')
- defer.returnValue(json_content)
-
- def _fetch_and_handle_404_from_nicknym(self, uri, address):
- """
- Send a GET request to C{uri} containing C{data}.
-
- :param uri: The URI of the request.
- :type uri: str
- :param address: The email corresponding to the key.
- :type address: str
-
- :return: A deferred that will be fired with GET content as json (dict)
- :rtype: Deferred
- """
- def check_404(response):
- if response.code == NOT_FOUND:
- message = '%s: %s key not found.' % (response.code, address)
- logger.warning(message)
- raise KeyNotFound(message), None, sys.exc_info()[2]
- return response
-
- d = self._async_client_pinned.request(str(uri), 'GET', callback=check_404)
- d.addCallback(client.readBody)
- return d
-
- @defer.inlineCallbacks
- def _get_with_combined_ca_bundle(self, uri, data=None):
- """
- Send a GET request to C{uri} containing C{data}.
-
- Instead of using the ca_cert provided on construction time, this
- version also uses the default certificates shipped with leap.common
-
- :param uri: The URI of the request.
- :type uri: str
- :param data: The body of the request.
- :type data: dict, str or file
-
- :return: A deferred that will be fired with the GET response
- :rtype: Deferred
- """
- try:
- content = yield self._async_client.request(str(uri), 'GET')
- except Exception as e:
- logger.warning("There was a problem fetching key: %s" % (e,))
- raise KeyNotFound(uri)
- if not content:
- raise KeyNotFound(uri)
- defer.returnValue(content)
-
- @defer.inlineCallbacks
- def _put(self, uri, data=None):
- """
- Send a PUT request to C{uri} containing C{data}.
-
- The request will be sent using the configured CA certificate path to
- verify the server certificate and the configured session id for
- authentication.
-
- :param uri: The URI of the request.
- :type uri: str
- :param data: The body of the request.
- :type data: dict, str or file
-
- :return: A deferred that will be fired when PUT request finishes
- :rtype: Deferred
- """
- leap_assert(
- self._token is not None,
- 'We need a token to interact with webapp!')
- if type(data) == dict:
- data = urllib.urlencode(data)
- headers = {'Authorization': [str('Token token=%s' % self._token)]}
- headers['Content-Type'] = ['application/x-www-form-urlencoded']
- try:
- res = yield self._async_client_pinned.request(str(uri), 'PUT',
- body=str(data),
- headers=headers)
- except Exception as e:
- logger.warning("Error uploading key: %r" % (e,))
- raise e
- if 'error' in res:
- # FIXME: That's a workaround for 500,
- # we need to implement a readBody to assert response code
- logger.warning("Error uploading key: %r" % (res,))
- raise Exception(res)
-
- @memoized_method(invalidation=300)
- @defer.inlineCallbacks
- def _fetch_keys_from_server(self, address):
- """
- Fetch keys bound to address from nickserver and insert them in
- local database.
-
- :param address: The address bound to the keys.
- :type address: str
-
- :return: A Deferred which fires when the key is in the storage,
- or which fails with KeyNotFound if the key was not found on
- nickserver.
- :rtype: Deferred
-
- """
- # request keys from the nickserver
- server_keys = yield self._get_key_from_nicknym(address)
-
- # insert keys in local database
- if self.OPENPGP_KEY in server_keys:
- # nicknym server is authoritative for its own domain,
- # for other domains the key might come from key servers.
- validation_level = ValidationLevels.Weak_Chain
- _, domain = _split_email(address)
- if (domain == _get_domain(self._nickserver_uri)):
- validation_level = ValidationLevels.Provider_Trust
-
- yield self.put_raw_key(
- server_keys['openpgp'],
- address=address,
- validation=validation_level)
-
- #
- # key management
- #
-
- def send_key(self):
- """
- Send user's key to provider.
-
- Public key bound to user's is sent to provider, which will sign it and
- replace any prior keys for the same address in its database.
-
- :return: A Deferred which fires when the key is sent, or which fails
- with KeyNotFound if the key was not found in local database.
- :rtype: Deferred
-
- :raise UnsupportedKeyTypeError: if invalid key type
- """
- def send(pubkey):
- data = {
- self.PUBKEY_KEY: pubkey.key_data
- }
- uri = "%s/%s/users/%s.json" % (
- self._api_uri,
- self._api_version,
- self._uid)
- d = self._put(uri, data)
- d.addCallback(lambda _:
- emit_async(catalog.KEYMANAGER_DONE_UPLOADING_KEYS,
- self._address))
- return d
-
- d = self.get_key(
- self._address, private=False, fetch_remote=False)
- d.addCallback(send)
- return d
-
- def get_key(self, address, private=False, fetch_remote=True):
- """
- Return a key bound to address.
-
- First, search for the key in local storage. If it is not available,
- then try to fetch from nickserver.
-
- :param address: The address bound to the key.
- :type address: str
- :param private: Look for a private key instead of a public one?
- :type private: bool
- :param fetch_remote: If key not found in local storage try to fetch
- from nickserver
- :type fetch_remote: bool
-
- :return: A Deferred which fires with an EncryptionKey bound to address,
- or which fails with KeyNotFound if no key was found neither
- locally or in keyserver or fail with KeyVersionError if the
- key has a format not supported by this version of KeyManager
- :rtype: Deferred
-
- :raise UnsupportedKeyTypeError: if invalid key type
- """
- logger.debug("getting key for %s" % (address,))
-
- emit_async(catalog.KEYMANAGER_LOOKING_FOR_KEY, address)
-
- def key_found(key):
- emit_async(catalog.KEYMANAGER_KEY_FOUND, address)
- return key
-
- def key_not_found(failure):
- if not failure.check(KeyNotFound):
- return failure
-
- emit_async(catalog.KEYMANAGER_KEY_NOT_FOUND, address)
-
- # we will only try to fetch a key from nickserver if fetch_remote
- # is True and the key is not private.
- if fetch_remote is False or private is True:
- return failure
-
- emit_async(catalog.KEYMANAGER_LOOKING_FOR_KEY, address)
- d = self._fetch_keys_from_server(address)
- d.addCallback(
- lambda _: self._openpgp.get_key(address, private=False))
- d.addCallback(key_found)
- return d
-
- # return key if it exists in local database
- d = self._openpgp.get_key(address, private=private)
- d.addCallbacks(key_found, key_not_found)
- return d
-
- def get_all_keys(self, private=False):
- """
- Return all keys stored in local database.
-
- :param private: Include private keys
- :type private: bool
-
- :return: A Deferred which fires with a list of all keys in local db.
- :rtype: Deferred
- """
- return self._openpgp.get_all_keys(private)
-
- def gen_key(self):
- """
- Generate a key bound to the user's address.
-
- :return: A Deferred which fires with the generated EncryptionKey.
- :rtype: Deferred
-
- :raise UnsupportedKeyTypeError: if invalid key type
- """
-
- def signal_finished(key):
- emit_async(
- catalog.KEYMANAGER_FINISHED_KEY_GENERATION, self._address)
- return key
-
- emit_async(catalog.KEYMANAGER_STARTED_KEY_GENERATION, self._address)
-
- d = self._openpgp.gen_key(self._address)
- d.addCallback(signal_finished)
- return d
-
- #
- # Setters/getters
- #
-
- def _get_token(self):
- return self._token
-
- def _set_token(self, token):
- self._token = token
-
- token = property(
- _get_token, _set_token, doc='The session token.')
-
- def _get_ca_cert_path(self):
- return self._ca_cert_path
-
- def _set_ca_cert_path(self, ca_cert_path):
- self._ca_cert_path = ca_cert_path
-
- ca_cert_path = property(
- _get_ca_cert_path, _set_ca_cert_path,
- doc='The path to the CA certificate.')
-
- def _get_api_uri(self):
- return self._api_uri
-
- def _set_api_uri(self, api_uri):
- self._api_uri = api_uri
-
- api_uri = property(
- _get_api_uri, _set_api_uri, doc='The webapp API URI.')
-
- def _get_api_version(self):
- return self._api_version
-
- def _set_api_version(self, api_version):
- self._api_version = api_version
-
- api_version = property(
- _get_api_version, _set_api_version, doc='The webapp API version.')
-
- def _get_uid(self):
- return self._uid
-
- def _set_uid(self, uid):
- self._uid = uid
-
- uid = property(
- _get_uid, _set_uid, doc='The uid of the user.')
-
- #
- # encrypt/decrypt and sign/verify API
- #
-
- def encrypt(self, data, address, passphrase=None, sign=None,
- cipher_algo='AES256', fetch_remote=True):
- """
- Encrypt data with the public key bound to address and sign with with
- the private key bound to sign address.
-
- :param data: The data to be encrypted.
- :type data: str
- :param address: The address to encrypt it for.
- :type address: str
- :param passphrase: The passphrase for the secret key used for the
- signature.
- :type passphrase: str
- :param sign: The address to be used for signature.
- :type sign: str
- :param cipher_algo: The cipher algorithm to use.
- :type cipher_algo: str
- :param fetch_remote: If key is not found in local storage try to fetch
- from nickserver
- :type fetch_remote: bool
-
- :return: A Deferred which fires with the encrypted data as str, or
- which fails with KeyNotFound if no keys were found neither
- locally or in keyserver or fails with KeyVersionError if the
- key format is not supported or fails with EncryptError if
- failed encrypting for some reason.
- :rtype: Deferred
-
- :raise UnsupportedKeyTypeError: if invalid key type
- """
-
- @defer.inlineCallbacks
- def encrypt(keys):
- pubkey, signkey = keys
- encrypted = yield self._openpgp.encrypt(
- data, pubkey, passphrase, sign=signkey,
- cipher_algo=cipher_algo)
- if not pubkey.encr_used:
- pubkey.encr_used = True
- yield self._openpgp.put_key(pubkey)
- defer.returnValue(encrypted)
-
- dpub = self.get_key(address, private=False,
- fetch_remote=fetch_remote)
- dpriv = defer.succeed(None)
- if sign is not None:
- dpriv = self.get_key(sign, private=True)
- d = defer.gatherResults([dpub, dpriv], consumeErrors=True)
- d.addCallbacks(encrypt, self._extract_first_error)
- return d
-
- def decrypt(self, data, address, passphrase=None, verify=None,
- fetch_remote=True):
- """
- Decrypt data using private key from address and verify with public key
- bound to verify address.
-
- :param data: The data to be decrypted.
- :type data: str
- :param address: The address to whom data was encrypted.
- :type address: str
- :param passphrase: The passphrase for the secret key used for
- decryption.
- :type passphrase: str
- :param verify: The address to be used for signature.
- :type verify: str
- :param fetch_remote: If key for verify not found in local storage try
- to fetch from nickserver
- :type fetch_remote: bool
-
- :return: A Deferred which fires with:
- * (decripted str, signing key) if validation works
- * (decripted str, KeyNotFound) if signing key not found
- * (decripted str, InvalidSignature) if signature is invalid
- * KeyNotFound failure if private key not found
- * DecryptError failure if decription failed
- :rtype: Deferred
-
- :raise UnsupportedKeyTypeError: if invalid key type
- """
-
- @defer.inlineCallbacks
- def decrypt(keys):
- pubkey, privkey = keys
- decrypted, signed = yield self._openpgp.decrypt(
- data, privkey, passphrase=passphrase, verify=pubkey)
- if pubkey is None:
- signature = KeyNotFound(verify)
- elif signed:
- signature = pubkey
- if not pubkey.sign_used:
- pubkey.sign_used = True
- yield self._openpgp.put_key(pubkey)
- defer.returnValue((decrypted, signature))
- else:
- signature = InvalidSignature(
- 'Failed to verify signature with key %s' %
- (pubkey.fingerprint,))
- defer.returnValue((decrypted, signature))
-
- dpriv = self.get_key(address, private=True)
- dpub = defer.succeed(None)
- if verify is not None:
- dpub = self.get_key(verify, private=False,
- fetch_remote=fetch_remote)
- dpub.addErrback(lambda f: None if f.check(KeyNotFound) else f)
- d = defer.gatherResults([dpub, dpriv], consumeErrors=True)
- d.addCallbacks(decrypt, self._extract_first_error)
- return d
-
- def _extract_first_error(self, failure):
- return failure.value.subFailure
-
- def sign(self, data, address, digest_algo='SHA512', clearsign=False,
- detach=True, binary=False):
- """
- Sign data with private key bound to address.
-
- :param data: The data to be signed.
- :type data: str
- :param address: The address to be used to sign.
- :type address: EncryptionKey
- :param digest_algo: The hash digest to use.
- :type digest_algo: str
- :param clearsign: If True, create a cleartext signature.
- :type clearsign: bool
- :param detach: If True, create a detached signature.
- :type detach: bool
- :param binary: If True, do not ascii armour the output.
- :type binary: bool
-
- :return: A Deferred which fires with the signed data as str or fails
- with KeyNotFound if no key was found neither locally or in
- keyserver or fails with SignFailed if there was any error
- signing.
- :rtype: Deferred
-
- :raise UnsupportedKeyTypeError: if invalid key type
- """
-
- def sign(privkey):
- return self._openpgp.sign(
- data, privkey, digest_algo=digest_algo, clearsign=clearsign,
- detach=detach, binary=binary)
-
- d = self.get_key(address, private=True)
- d.addCallback(sign)
- return d
-
- def verify(self, data, address, detached_sig=None,
- fetch_remote=True):
- """
- Verify signed data with private key bound to address, eventually using
- detached_sig.
-
- :param data: The data to be verified.
- :type data: str
- :param address: The address to be used to verify.
- :type address: EncryptionKey
- :param detached_sig: A detached signature. If given, C{data} is
- verified using this detached signature.
- :type detached_sig: str
- :param fetch_remote: If key for verify not found in local storage try
- to fetch from nickserver
- :type fetch_remote: bool
-
- :return: A Deferred which fires with the signing EncryptionKey if
- signature verifies, or which fails with InvalidSignature if
- signature don't verifies or fails with KeyNotFound if no key
- was found neither locally or in keyserver.
- :rtype: Deferred
-
- :raise UnsupportedKeyTypeError: if invalid key type
- """
-
- def verify(pubkey):
- signed = self._openpgp.verify(
- data, pubkey, detached_sig=detached_sig)
- if signed:
- if not pubkey.sign_used:
- pubkey.sign_used = True
- d = self._openpgp.put_key(pubkey)
- d.addCallback(lambda _: pubkey)
- return d
- return pubkey
- else:
- raise InvalidSignature(
- 'Failed to verify signature with key %s' %
- (pubkey.fingerprint,))
-
- d = self.get_key(address, private=False,
- fetch_remote=fetch_remote)
- d.addCallback(verify)
- return d
-
- def delete_key(self, key):
- """
- Remove key from storage.
-
- :param key: The key to be removed.
- :type key: EncryptionKey
-
- :return: A Deferred which fires when the key is deleted, or which fails
- KeyNotFound if the key was not found on local storage.
- :rtype: Deferred
-
- :raise UnsupportedKeyTypeError: if invalid key type
- """
- return self._openpgp.delete_key(key)
-
- def put_key(self, key):
- """
- Put key bound to address in local storage.
-
- :param key: The key to be stored
- :type key: EncryptionKey
-
- :return: A Deferred which fires when the key is in the storage, or
- which fails with KeyNotValidUpdate if a key with the same
- uid exists and the new one is not a valid update for it.
- :rtype: Deferred
-
- :raise UnsupportedKeyTypeError: if invalid key type
- """
-
- def old_key_not_found(failure):
- if failure.check(KeyNotFound):
- return None
- else:
- return failure
-
- def check_upgrade(old_key):
- if key.private or can_upgrade(key, old_key):
- return self._openpgp.put_key(key)
- else:
- raise KeyNotValidUpgrade(
- "Key %s can not be upgraded by new key %s"
- % (old_key.fingerprint, key.fingerprint))
-
- d = self._openpgp.get_key(key.address, private=key.private)
- d.addErrback(old_key_not_found)
- d.addCallback(check_upgrade)
- return d
-
- def put_raw_key(self, key, address,
- validation=ValidationLevels.Weak_Chain):
- """
- Put raw key bound to address in local storage.
-
- :param key: The ascii key to be stored
- :type key: str
- :param address: address for which this key will be active
- :type address: str
- :param validation: validation level for this key
- (default: 'Weak_Chain')
- :type validation: ValidationLevels
-
- :return: A Deferred which fires when the key is in the storage, or
- which fails with KeyAddressMismatch if address doesn't match
- any uid on the key or fails with KeyNotFound if no OpenPGP
- material was found in key or fails with KeyNotValidUpdate if a
- key with the same uid exists and the new one is not a valid
- update for it.
- :rtype: Deferred
-
- :raise UnsupportedKeyTypeError: if invalid key type
- """
- pubkey, privkey = self._openpgp.parse_key(key, address)
-
- if pubkey is None:
- return defer.fail(KeyNotFound(key))
-
- pubkey.validation = validation
- d = self.put_key(pubkey)
- if privkey is not None:
- d.addCallback(lambda _: self.put_key(privkey))
- return d
-
- @defer.inlineCallbacks
- def fetch_key(self, address, uri, validation=ValidationLevels.Weak_Chain):
- """
- Fetch a public key bound to address from the network and put it in
- local storage.
-
- :param address: The email address of the key.
- :type address: str
- :param uri: The URI of the key.
- :type uri: str
- :param validation: validation level for this key
- (default: 'Weak_Chain')
- :type validation: ValidationLevels
-
- :return: A Deferred which fires when the key is in the storage, or
- which fails with KeyNotFound: if not valid key on uri or fails
- with KeyAddressMismatch if address doesn't match any uid on
- the key or fails with KeyNotValidUpdate if a key with the same
- uid exists and the new one is not a valid update for it.
- :rtype: Deferred
-
- :raise UnsupportedKeyTypeError: if invalid key type
- """
-
- logger.info("Fetch key for %s from %s" % (address, uri))
- ascii_content = yield self._get_with_combined_ca_bundle(uri)
-
- # XXX parse binary keys
- pubkey, _ = self._openpgp.parse_key(ascii_content, address)
- if pubkey is None:
- raise KeyNotFound(uri)
-
- pubkey.validation = validation
- yield self.put_key(pubkey)
-
-
-def _split_email(address):
- """
- Split username and domain from an email address
-
- :param address: an email address
- :type address: str
-
- :return: username and domain from the email address
- :rtype: (str, str)
- """
- if address.count("@") != 1:
- return None
- return address.split("@")
-
-
-def _get_domain(url):
- """
- Get the domain from an url
-
- :param url: an url
- :type url: str
-
- :return: the domain part of the url
- :rtype: str
- """
- return urlparse(url).hostname
diff --git a/keymanager/src/leap/keymanager/_version.py b/keymanager/src/leap/keymanager/_version.py
deleted file mode 100644
index b28c697..0000000
--- a/keymanager/src/leap/keymanager/_version.py
+++ /dev/null
@@ -1,484 +0,0 @@
-
-# This file helps to compute a version number in source trees obtained from
-# git-archive tarball (such as those provided by githubs download-from-tag
-# feature). Distribution tarballs (built by setup.py sdist) and build
-# directories (produced by setup.py build) will contain a much shorter file
-# that just contains the computed version number.
-
-# This file is released into the public domain. Generated by
-# versioneer-0.16 (https://github.com/warner/python-versioneer)
-
-"""Git implementation of _version.py."""
-
-import errno
-import os
-import re
-import subprocess
-import sys
-
-
-def get_keywords():
- """Get the keywords needed to look up the version information."""
- # these strings will be replaced by git during git-archive.
- # setup.py/versioneer.py will grep for the variable names, so they must
- # each be defined on a line of their own. _version.py will just call
- # get_keywords().
- git_refnames = "$Format:%d$"
- git_full = "$Format:%H$"
- keywords = {"refnames": git_refnames, "full": git_full}
- return keywords
-
-
-class VersioneerConfig:
- """Container for Versioneer configuration parameters."""
-
-
-def get_config():
- """Create, populate and return the VersioneerConfig() object."""
- # these strings are filled in when 'setup.py versioneer' creates
- # _version.py
- cfg = VersioneerConfig()
- cfg.VCS = "git"
- cfg.style = "pep440"
- cfg.tag_prefix = ""
- cfg.parentdir_prefix = "None"
- cfg.versionfile_source = "src/leap/keymanager/_version.py"
- cfg.verbose = False
- return cfg
-
-
-class NotThisMethod(Exception):
- """Exception raised if a method is not valid for the current scenario."""
-
-
-LONG_VERSION_PY = {}
-HANDLERS = {}
-
-
-def register_vcs_handler(vcs, method): # decorator
- """Decorator to mark a method as the handler for a particular VCS."""
- def decorate(f):
- """Store f in HANDLERS[vcs][method]."""
- if vcs not in HANDLERS:
- HANDLERS[vcs] = {}
- HANDLERS[vcs][method] = f
- return f
- return decorate
-
-
-def run_command(commands, args, cwd=None, verbose=False, hide_stderr=False):
- """Call the given command(s)."""
- assert isinstance(commands, list)
- p = None
- for c in commands:
- try:
- dispcmd = str([c] + args)
- # remember shell=False, so use git.cmd on windows, not just git
- p = subprocess.Popen([c] + args, cwd=cwd, stdout=subprocess.PIPE,
- stderr=(subprocess.PIPE if hide_stderr
- else None))
- break
- except EnvironmentError:
- e = sys.exc_info()[1]
- if e.errno == errno.ENOENT:
- continue
- if verbose:
- print("unable to run %s" % dispcmd)
- print(e)
- return None
- else:
- if verbose:
- print("unable to find command, tried %s" % (commands,))
- return None
- stdout = p.communicate()[0].strip()
- if sys.version_info[0] >= 3:
- stdout = stdout.decode()
- if p.returncode != 0:
- if verbose:
- print("unable to run %s (error)" % dispcmd)
- return None
- return stdout
-
-
-def versions_from_parentdir(parentdir_prefix, root, verbose):
- """Try to determine the version from the parent directory name.
-
- Source tarballs conventionally unpack into a directory that includes
- both the project name and a version string.
- """
- dirname = os.path.basename(root)
- if not dirname.startswith(parentdir_prefix):
- if verbose:
- print("guessing rootdir is '%s', but '%s' doesn't start with "
- "prefix '%s'" % (root, dirname, parentdir_prefix))
- raise NotThisMethod("rootdir doesn't start with parentdir_prefix")
- return {"version": dirname[len(parentdir_prefix):],
- "full-revisionid": None,
- "dirty": False, "error": None}
-
-
-@register_vcs_handler("git", "get_keywords")
-def git_get_keywords(versionfile_abs):
- """Extract version information from the given file."""
- # the code embedded in _version.py can just fetch the value of these
- # keywords. When used from setup.py, we don't want to import _version.py,
- # so we do it with a regexp instead. This function is not used from
- # _version.py.
- keywords = {}
- try:
- f = open(versionfile_abs, "r")
- for line in f.readlines():
- if line.strip().startswith("git_refnames ="):
- mo = re.search(r'=\s*"(.*)"', line)
- if mo:
- keywords["refnames"] = mo.group(1)
- if line.strip().startswith("git_full ="):
- mo = re.search(r'=\s*"(.*)"', line)
- if mo:
- keywords["full"] = mo.group(1)
- f.close()
- except EnvironmentError:
- pass
- return keywords
-
-
-@register_vcs_handler("git", "keywords")
-def git_versions_from_keywords(keywords, tag_prefix, verbose):
- """Get version information from git keywords."""
- if not keywords:
- raise NotThisMethod("no keywords at all, weird")
- refnames = keywords["refnames"].strip()
- if refnames.startswith("$Format"):
- if verbose:
- print("keywords are unexpanded, not using")
- raise NotThisMethod("unexpanded keywords, not a git-archive tarball")
- refs = set([r.strip() for r in refnames.strip("()").split(",")])
- # starting in git-1.8.3, tags are listed as "tag: foo-1.0" instead of
- # just "foo-1.0". If we see a "tag: " prefix, prefer those.
- TAG = "tag: "
- tags = set([r[len(TAG):] for r in refs if r.startswith(TAG)])
- if not tags:
- # Either we're using git < 1.8.3, or there really are no tags. We use
- # a heuristic: assume all version tags have a digit. The old git %d
- # expansion behaves like git log --decorate=short and strips out the
- # refs/heads/ and refs/tags/ prefixes that would let us distinguish
- # between branches and tags. By ignoring refnames without digits, we
- # filter out many common branch names like "release" and
- # "stabilization", as well as "HEAD" and "master".
- tags = set([r for r in refs if re.search(r'\d', r)])
- if verbose:
- print("discarding '%s', no digits" % ",".join(refs-tags))
- if verbose:
- print("likely tags: %s" % ",".join(sorted(tags)))
- for ref in sorted(tags):
- # sorting will prefer e.g. "2.0" over "2.0rc1"
- if ref.startswith(tag_prefix):
- r = ref[len(tag_prefix):]
- if verbose:
- print("picking %s" % r)
- return {"version": r,
- "full-revisionid": keywords["full"].strip(),
- "dirty": False, "error": None
- }
- # no suitable tags, so version is "0+unknown", but full hex is still there
- if verbose:
- print("no suitable tags, using unknown + full revision id")
- return {"version": "0+unknown",
- "full-revisionid": keywords["full"].strip(),
- "dirty": False, "error": "no suitable tags"}
-
-
-@register_vcs_handler("git", "pieces_from_vcs")
-def git_pieces_from_vcs(tag_prefix, root, verbose, run_command=run_command):
- """Get version from 'git describe' in the root of the source tree.
-
- This only gets called if the git-archive 'subst' keywords were *not*
- expanded, and _version.py hasn't already been rewritten with a short
- version string, meaning we're inside a checked out source tree.
- """
- if not os.path.exists(os.path.join(root, ".git")):
- if verbose:
- print("no .git in %s" % root)
- raise NotThisMethod("no .git directory")
-
- GITS = ["git"]
- if sys.platform == "win32":
- GITS = ["git.cmd", "git.exe"]
- # if there is a tag matching tag_prefix, this yields TAG-NUM-gHEX[-dirty]
- # if there isn't one, this yields HEX[-dirty] (no NUM)
- describe_out = run_command(GITS, ["describe", "--tags", "--dirty",
- "--always", "--long",
- "--match", "%s*" % tag_prefix],
- cwd=root)
- # --long was added in git-1.5.5
- if describe_out is None:
- raise NotThisMethod("'git describe' failed")
- describe_out = describe_out.strip()
- full_out = run_command(GITS, ["rev-parse", "HEAD"], cwd=root)
- if full_out is None:
- raise NotThisMethod("'git rev-parse' failed")
- full_out = full_out.strip()
-
- pieces = {}
- pieces["long"] = full_out
- pieces["short"] = full_out[:7] # maybe improved later
- pieces["error"] = None
-
- # parse describe_out. It will be like TAG-NUM-gHEX[-dirty] or HEX[-dirty]
- # TAG might have hyphens.
- git_describe = describe_out
-
- # look for -dirty suffix
- dirty = git_describe.endswith("-dirty")
- pieces["dirty"] = dirty
- if dirty:
- git_describe = git_describe[:git_describe.rindex("-dirty")]
-
- # now we have TAG-NUM-gHEX or HEX
-
- if "-" in git_describe:
- # TAG-NUM-gHEX
- mo = re.search(r'^(.+)-(\d+)-g([0-9a-f]+)$', git_describe)
- if not mo:
- # unparseable. Maybe git-describe is misbehaving?
- pieces["error"] = ("unable to parse git-describe output: '%s'"
- % describe_out)
- return pieces
-
- # tag
- full_tag = mo.group(1)
- if not full_tag.startswith(tag_prefix):
- if verbose:
- fmt = "tag '%s' doesn't start with prefix '%s'"
- print(fmt % (full_tag, tag_prefix))
- pieces["error"] = ("tag '%s' doesn't start with prefix '%s'"
- % (full_tag, tag_prefix))
- return pieces
- pieces["closest-tag"] = full_tag[len(tag_prefix):]
-
- # distance: number of commits since tag
- pieces["distance"] = int(mo.group(2))
-
- # commit: short hex revision ID
- pieces["short"] = mo.group(3)
-
- else:
- # HEX: no tags
- pieces["closest-tag"] = None
- count_out = run_command(GITS, ["rev-list", "HEAD", "--count"],
- cwd=root)
- pieces["distance"] = int(count_out) # total number of commits
-
- return pieces
-
-
-def plus_or_dot(pieces):
- """Return a + if we don't already have one, else return a ."""
- if "+" in pieces.get("closest-tag", ""):
- return "."
- return "+"
-
-
-def render_pep440(pieces):
- """Build up version string, with post-release "local version identifier".
-
- Our goal: TAG[+DISTANCE.gHEX[.dirty]] . Note that if you
- get a tagged build and then dirty it, you'll get TAG+0.gHEX.dirty
-
- Exceptions:
- 1: no tags. git_describe was just HEX. 0+untagged.DISTANCE.gHEX[.dirty]
- """
- if pieces["closest-tag"]:
- rendered = pieces["closest-tag"]
- if pieces["distance"] or pieces["dirty"]:
- rendered += plus_or_dot(pieces)
- rendered += "%d.g%s" % (pieces["distance"], pieces["short"])
- if pieces["dirty"]:
- rendered += ".dirty"
- else:
- # exception #1
- rendered = "0+untagged.%d.g%s" % (pieces["distance"],
- pieces["short"])
- if pieces["dirty"]:
- rendered += ".dirty"
- return rendered
-
-
-def render_pep440_pre(pieces):
- """TAG[.post.devDISTANCE] -- No -dirty.
-
- Exceptions:
- 1: no tags. 0.post.devDISTANCE
- """
- if pieces["closest-tag"]:
- rendered = pieces["closest-tag"]
- if pieces["distance"]:
- rendered += ".post.dev%d" % pieces["distance"]
- else:
- # exception #1
- rendered = "0.post.dev%d" % pieces["distance"]
- return rendered
-
-
-def render_pep440_post(pieces):
- """TAG[.postDISTANCE[.dev0]+gHEX] .
-
- The ".dev0" means dirty. Note that .dev0 sorts backwards
- (a dirty tree will appear "older" than the corresponding clean one),
- but you shouldn't be releasing software with -dirty anyways.
-
- Exceptions:
- 1: no tags. 0.postDISTANCE[.dev0]
- """
- if pieces["closest-tag"]:
- rendered = pieces["closest-tag"]
- if pieces["distance"] or pieces["dirty"]:
- rendered += ".post%d" % pieces["distance"]
- if pieces["dirty"]:
- rendered += ".dev0"
- rendered += plus_or_dot(pieces)
- rendered += "g%s" % pieces["short"]
- else:
- # exception #1
- rendered = "0.post%d" % pieces["distance"]
- if pieces["dirty"]:
- rendered += ".dev0"
- rendered += "+g%s" % pieces["short"]
- return rendered
-
-
-def render_pep440_old(pieces):
- """TAG[.postDISTANCE[.dev0]] .
-
- The ".dev0" means dirty.
-
- Eexceptions:
- 1: no tags. 0.postDISTANCE[.dev0]
- """
- if pieces["closest-tag"]:
- rendered = pieces["closest-tag"]
- if pieces["distance"] or pieces["dirty"]:
- rendered += ".post%d" % pieces["distance"]
- if pieces["dirty"]:
- rendered += ".dev0"
- else:
- # exception #1
- rendered = "0.post%d" % pieces["distance"]
- if pieces["dirty"]:
- rendered += ".dev0"
- return rendered
-
-
-def render_git_describe(pieces):
- """TAG[-DISTANCE-gHEX][-dirty].
-
- Like 'git describe --tags --dirty --always'.
-
- Exceptions:
- 1: no tags. HEX[-dirty] (note: no 'g' prefix)
- """
- if pieces["closest-tag"]:
- rendered = pieces["closest-tag"]
- if pieces["distance"]:
- rendered += "-%d-g%s" % (pieces["distance"], pieces["short"])
- else:
- # exception #1
- rendered = pieces["short"]
- if pieces["dirty"]:
- rendered += "-dirty"
- return rendered
-
-
-def render_git_describe_long(pieces):
- """TAG-DISTANCE-gHEX[-dirty].
-
- Like 'git describe --tags --dirty --always -long'.
- The distance/hash is unconditional.
-
- Exceptions:
- 1: no tags. HEX[-dirty] (note: no 'g' prefix)
- """
- if pieces["closest-tag"]:
- rendered = pieces["closest-tag"]
- rendered += "-%d-g%s" % (pieces["distance"], pieces["short"])
- else:
- # exception #1
- rendered = pieces["short"]
- if pieces["dirty"]:
- rendered += "-dirty"
- return rendered
-
-
-def render(pieces, style):
- """Render the given version pieces into the requested style."""
- if pieces["error"]:
- return {"version": "unknown",
- "full-revisionid": pieces.get("long"),
- "dirty": None,
- "error": pieces["error"]}
-
- if not style or style == "default":
- style = "pep440" # the default
-
- if style == "pep440":
- rendered = render_pep440(pieces)
- elif style == "pep440-pre":
- rendered = render_pep440_pre(pieces)
- elif style == "pep440-post":
- rendered = render_pep440_post(pieces)
- elif style == "pep440-old":
- rendered = render_pep440_old(pieces)
- elif style == "git-describe":
- rendered = render_git_describe(pieces)
- elif style == "git-describe-long":
- rendered = render_git_describe_long(pieces)
- else:
- raise ValueError("unknown style '%s'" % style)
-
- return {"version": rendered, "full-revisionid": pieces["long"],
- "dirty": pieces["dirty"], "error": None}
-
-
-def get_versions():
- """Get version information or return default if unable to do so."""
- # I am in _version.py, which lives at ROOT/VERSIONFILE_SOURCE. If we have
- # __file__, we can work backwards from there to the root. Some
- # py2exe/bbfreeze/non-CPython implementations don't do __file__, in which
- # case we can only use expanded keywords.
-
- cfg = get_config()
- verbose = cfg.verbose
-
- try:
- return git_versions_from_keywords(get_keywords(), cfg.tag_prefix,
- verbose)
- except NotThisMethod:
- pass
-
- try:
- root = os.path.realpath(__file__)
- # versionfile_source is the relative path from the top of the source
- # tree (where the .git directory might live) to this file. Invert
- # this to find the root from __file__.
- for i in cfg.versionfile_source.split('/'):
- root = os.path.dirname(root)
- except NameError:
- return {"version": "0+unknown", "full-revisionid": None,
- "dirty": None,
- "error": "unable to find root of source tree"}
-
- try:
- pieces = git_pieces_from_vcs(cfg.tag_prefix, root, verbose)
- return render(pieces, cfg.style)
- except NotThisMethod:
- pass
-
- try:
- if cfg.parentdir_prefix:
- return versions_from_parentdir(cfg.parentdir_prefix, root, verbose)
- except NotThisMethod:
- pass
-
- return {"version": "0+unknown", "full-revisionid": None,
- "dirty": None,
- "error": "unable to compute version"}
diff --git a/keymanager/src/leap/keymanager/documents.py b/keymanager/src/leap/keymanager/documents.py
deleted file mode 100644
index 2ed5376..0000000
--- a/keymanager/src/leap/keymanager/documents.py
+++ /dev/null
@@ -1,101 +0,0 @@
-# -*- coding: utf-8 -*-
-# documents.py
-# Copyright (C) 2013-2016 LEAP
-#
-# This program is free software: you can redistribute it and/or modify
-# it under the terms of the GNU General Public License as published by
-# the Free Software Foundation, either version 3 of the License, or
-# (at your option) any later version.
-#
-# This program is distributed in the hope that it will be useful,
-# but WITHOUT ANY WARRANTY; without even the implied warranty of
-# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
-# GNU General Public License for more details.
-#
-# You should have received a copy of the GNU General Public License
-# along with this program. If not, see <http://www.gnu.org/licenses/>.
-"""
-Soledad documents
-"""
-from twisted.internet import defer
-from leap.common.check import leap_assert
-
-#
-# Dictionary keys used for storing cryptographic keys.
-#
-
-KEY_VERSION_KEY = 'version'
-KEY_UIDS_KEY = 'uids'
-KEY_ADDRESS_KEY = 'address'
-KEY_TYPE_KEY = 'type'
-KEY_FINGERPRINT_KEY = 'fingerprint'
-KEY_DATA_KEY = 'key_data'
-KEY_PRIVATE_KEY = 'private'
-KEY_LENGTH_KEY = 'length'
-KEY_EXPIRY_DATE_KEY = 'expiry_date'
-KEY_LAST_AUDITED_AT_KEY = 'last_audited_at'
-KEY_REFRESHED_AT_KEY = 'refreshed_at'
-KEY_VALIDATION_KEY = 'validation'
-KEY_ENCR_USED_KEY = 'encr_used'
-KEY_SIGN_USED_KEY = 'sign_used'
-KEY_TAGS_KEY = 'tags'
-
-
-#
-# Key storage constants
-#
-
-KEYMANAGER_KEY_TAG = 'keymanager-key'
-KEYMANAGER_ACTIVE_TAG = 'keymanager-active'
-KEYMANAGER_ACTIVE_TYPE = '-active'
-
-# Version of the Soledad Document schema,
-# it should be bumped each time the document format changes
-KEYMANAGER_DOC_VERSION = 1
-
-
-#
-# key indexing constants.
-#
-
-TAGS_PRIVATE_INDEX = 'by-tags-private'
-TYPE_FINGERPRINT_PRIVATE_INDEX = 'by-type-fingerprint-private'
-TYPE_ADDRESS_PRIVATE_INDEX = 'by-type-address-private'
-INDEXES = {
- TAGS_PRIVATE_INDEX: [
- KEY_TAGS_KEY,
- 'bool(%s)' % KEY_PRIVATE_KEY,
- ],
- TYPE_FINGERPRINT_PRIVATE_INDEX: [
- KEY_TYPE_KEY,
- KEY_FINGERPRINT_KEY,
- 'bool(%s)' % KEY_PRIVATE_KEY,
- ],
- TYPE_ADDRESS_PRIVATE_INDEX: [
- KEY_TYPE_KEY,
- KEY_ADDRESS_KEY,
- 'bool(%s)' % KEY_PRIVATE_KEY,
- ]
-}
-
-
-@defer.inlineCallbacks
-def init_indexes(soledad):
- """
- Initialize the database indexes.
- """
- leap_assert(soledad is not None,
- "Cannot init indexes with null soledad")
-
- indexes = yield soledad.list_indexes()
- db_indexes = dict(indexes)
- # Loop through the indexes we expect to find.
- for name, expression in INDEXES.items():
- if name not in db_indexes:
- # The index does not yet exist.
- yield soledad.create_index(name, *expression)
- elif expression != db_indexes[name]:
- # The index exists but the definition is not what expected,
- # so we delete it and add the proper index expression.
- yield soledad.delete_index(name)
- yield soledad.create_index(name, *expression)
diff --git a/keymanager/src/leap/keymanager/errors.py b/keymanager/src/leap/keymanager/errors.py
deleted file mode 100644
index dfff393..0000000
--- a/keymanager/src/leap/keymanager/errors.py
+++ /dev/null
@@ -1,119 +0,0 @@
-# -*- coding: utf-8 -*-
-# errors.py
-# Copyright (C) 2013 LEAP
-#
-# This program is free software: you can redistribute it and/or modify
-# it under the terms of the GNU General Public License as published by
-# the Free Software Foundation, either version 3 of the License, or
-# (at your option) any later version.
-#
-# This program is distributed in the hope that it will be useful,
-# but WITHOUT ANY WARRANTY; without even the implied warranty of
-# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
-# GNU General Public License for more details.
-#
-# You should have received a copy of the GNU General Public License
-# along with this program. If not, see <http://www.gnu.org/licenses/>.
-
-
-"""
-Errors and exceptions used by the Key Manager.
-"""
-
-
-class KeyNotFound(Exception):
- """
- Raised when key was no found on keyserver.
- """
- pass
-
-
-class KeyVersionError(KeyNotFound):
- """
- Raised when key was found in the keyring but the version is not supported.
-
- It will usually mean that it was created by a newer version of KeyManager.
- """
- pass
-
-
-class KeyAlreadyExists(Exception):
- """
- Raised when attempted to create a key that already exists.
- """
- pass
-
-
-class KeyAttributesDiffer(Exception):
- """
- Raised when trying to delete a key but the stored key differs from the key
- passed to the delete_key() method.
- """
- pass
-
-
-class NoPasswordGiven(Exception):
- """
- Raised when trying to perform some action that needs a password without
- providing one.
- """
- pass
-
-
-class InvalidSignature(Exception):
- """
- Raised when signature could not be verified.
- """
- pass
-
-
-class EncryptError(Exception):
- """
- Raised upon failures of encryption.
- """
- pass
-
-
-class DecryptError(Exception):
- """
- Raised upon failures of decryption.
- """
- pass
-
-
-class GPGError(Exception):
- """
- Raised upon failures of encryption/decryption.
- """
- pass
-
-
-class SignFailed(Exception):
- """
- Raised when failed to sign.
- """
- pass
-
-
-class KeyAddressMismatch(Exception):
- """
- A mismatch between addresses.
- """
-
-
-class KeyFingerprintMismatch(Exception):
- """
- A mismatch between fingerprints.
- """
-
-
-class KeyNotValidUpgrade(Exception):
- """
- Already existing key can not be upgraded with the new key
- """
-
-
-class UnsupportedKeyTypeError(Exception):
- """
- Invalid key type
- """
diff --git a/keymanager/src/leap/keymanager/keys.py b/keymanager/src/leap/keymanager/keys.py
deleted file mode 100644
index 91ecf3a..0000000
--- a/keymanager/src/leap/keymanager/keys.py
+++ /dev/null
@@ -1,290 +0,0 @@
-# -*- coding: utf-8 -*-
-# keys.py
-# Copyright (C) 2013-2016 LEAP
-#
-# This program is free software: you can redistribute it and/or modify
-# it under the terms of the GNU General Public License as published by
-# the Free Software Foundation, either version 3 of the License, or
-# (at your option) any later version.
-#
-# This program is distributed in the hope that it will be useful,
-# but WITHOUT ANY WARRANTY; without even the implied warranty of
-# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
-# GNU General Public License for more details.
-#
-# You should have received a copy of the GNU General Public License
-# along with this program. If not, see <http://www.gnu.org/licenses/>.
-"""
-Abstact key type and encryption scheme representations.
-"""
-
-
-import json
-import logging
-import re
-import time
-
-from datetime import datetime
-
-from leap.keymanager import errors
-from leap.keymanager.wrapper import TempGPGWrapper
-from leap.keymanager.validation import ValidationLevels
-from leap.keymanager import documents as doc
-
-logger = logging.getLogger(__name__)
-
-
-#
-# Key handling utilities
-#
-
-def is_address(address):
- """
- Return whether the given C{address} is in the form user@provider.
-
- :param address: The address to be tested.
- :type address: str
- :return: Whether C{address} is in the form user@provider.
- :rtype: bool
- """
- return bool(re.match('[\w.-]+@[\w.-]+', address))
-
-
-def build_key_from_dict(key, active=None):
- """
- Build an OpenPGPKey key based on info in C{kdict}.
-
- :param key: Dictionary with key data.
- :type key: dict
- :param active: Dictionary with active data.
- :type active: dict
- :return: An instance of the key.
- :rtype: C{kClass}
- """
- address = None
- validation = ValidationLevels.Weak_Chain
- last_audited_at = None
- encr_used = False
- sign_used = False
-
- if active:
- address = active[doc.KEY_ADDRESS_KEY]
- try:
- validation = ValidationLevels.get(active[doc.KEY_VALIDATION_KEY])
- except ValueError:
- logger.error("Not valid validation level (%s) for key %s",
- (active[doc.KEY_VALIDATION_KEY],
- active[doc.KEY_FINGERPRINT_KEY]))
- last_audited_at = _to_datetime(active[doc.KEY_LAST_AUDITED_AT_KEY])
- encr_used = active[doc.KEY_ENCR_USED_KEY]
- sign_used = active[doc.KEY_SIGN_USED_KEY]
-
- expiry_date = _to_datetime(key[doc.KEY_EXPIRY_DATE_KEY])
- refreshed_at = _to_datetime(key[doc.KEY_REFRESHED_AT_KEY])
-
- return OpenPGPKey(
- address=address,
- uids=key[doc.KEY_UIDS_KEY],
- fingerprint=key[doc.KEY_FINGERPRINT_KEY],
- key_data=key[doc.KEY_DATA_KEY],
- private=key[doc.KEY_PRIVATE_KEY],
- length=key[doc.KEY_LENGTH_KEY],
- expiry_date=expiry_date,
- last_audited_at=last_audited_at,
- refreshed_at=refreshed_at,
- validation=validation,
- encr_used=encr_used,
- sign_used=sign_used,
- )
-
-
-def _to_datetime(unix_time):
- if unix_time != 0:
- return datetime.fromtimestamp(unix_time)
- else:
- return None
-
-
-def _to_unix_time(date):
- if date is not None:
- return int(time.mktime(date.timetuple()))
- else:
- return 0
-
-
-class OpenPGPKey(object):
- """
- Base class for OpenPGP keys.
- """
-
- __slots__ = ('address', 'uids', 'fingerprint', 'key_data',
- 'private', 'length', 'expiry_date', 'validation',
- 'last_audited_at', 'refreshed_at',
- 'encr_used', 'sign_used', '_index', '_gpgbinary')
-
- def __init__(self, address=None, gpgbinary=None, uids=[], fingerprint="",
- key_data="", private=False, length=0, expiry_date=None,
- validation=ValidationLevels.Weak_Chain, last_audited_at=None,
- refreshed_at=None, encr_used=False, sign_used=False):
- self._gpgbinary = gpgbinary
- self.address = address
- if not uids and address:
- self.uids = [address]
- else:
- self.uids = uids
- self.fingerprint = fingerprint
- self.key_data = key_data
- self.private = private
- self.length = length
- self.expiry_date = expiry_date
-
- self.validation = validation
- self.last_audited_at = last_audited_at
- self.refreshed_at = refreshed_at
- self.encr_used = encr_used
- self.sign_used = sign_used
- self._index = len(self.__slots__)
-
- @property
- def signatures(self):
- """
- Get the key signatures
-
- :return: the key IDs that have signed the key
- :rtype: list(str)
- """
- with TempGPGWrapper(keys=[self], gpgbinary=self._gpgbinary) as gpg:
- res = gpg.list_sigs(self.fingerprint)
- for uid, sigs in res.sigs.iteritems():
- if parse_address(uid) in self.uids:
- return sigs
-
- return []
-
- def merge(self, newkey):
- if newkey.fingerprint != self.fingerprint:
- logger.critical(
- "Can't put a key whith the same key_id and different "
- "fingerprint: %s, %s"
- % (newkey.fingerprint, self.fingerprint))
- raise errors.KeyFingerprintMismatch(newkey.fingerprint)
-
- with TempGPGWrapper(gpgbinary=self._gpgbinary) as gpg:
- gpg.import_keys(self.key_data)
- gpg.import_keys(newkey.key_data)
- gpgkey = gpg.list_keys(secret=newkey.private).pop()
-
- if gpgkey['expires']:
- self.expiry_date = datetime.fromtimestamp(
- int(gpgkey['expires']))
- else:
- self.expiry_date = None
-
- self.uids = []
- for uid in gpgkey['uids']:
- self.uids.append(parse_address(uid))
-
- self.length = int(gpgkey['length'])
- self.key_data = gpg.export_keys(gpgkey['fingerprint'],
- secret=self.private)
-
- if newkey.validation > self.validation:
- self.validation = newkey.validation
- if newkey.last_audited_at > self.last_audited_at:
- self.validation = newkey.last_audited_at
- self.encr_used = newkey.encr_used or self.encr_used
- self.sign_used = newkey.sign_used or self.sign_used
- self.refreshed_at = datetime.now()
-
- def get_json(self):
- """
- Return a JSON string describing this key.
-
- :return: The JSON string describing this key.
- :rtype: str
- """
- expiry_date = _to_unix_time(self.expiry_date)
- refreshed_at = _to_unix_time(self.refreshed_at)
-
- return json.dumps({
- doc.KEY_UIDS_KEY: self.uids,
- doc.KEY_TYPE_KEY: self.__class__.__name__,
- doc.KEY_FINGERPRINT_KEY: self.fingerprint,
- doc.KEY_DATA_KEY: self.key_data,
- doc.KEY_PRIVATE_KEY: self.private,
- doc.KEY_LENGTH_KEY: self.length,
- doc.KEY_EXPIRY_DATE_KEY: expiry_date,
- doc.KEY_REFRESHED_AT_KEY: refreshed_at,
- doc.KEY_VERSION_KEY: doc.KEYMANAGER_DOC_VERSION,
- doc.KEY_TAGS_KEY: [doc.KEYMANAGER_KEY_TAG],
- })
-
- def get_active_json(self):
- """
- Return a JSON string describing this key.
-
- :return: The JSON string describing this key.
- :rtype: str
- """
- last_audited_at = _to_unix_time(self.last_audited_at)
-
- return json.dumps({
- doc.KEY_ADDRESS_KEY: self.address,
- doc.KEY_TYPE_KEY: (self.__class__.__name__ +
- doc.KEYMANAGER_ACTIVE_TYPE),
- doc.KEY_FINGERPRINT_KEY: self.fingerprint,
- doc.KEY_PRIVATE_KEY: self.private,
- doc.KEY_VALIDATION_KEY: str(self.validation),
- doc.KEY_LAST_AUDITED_AT_KEY: last_audited_at,
- doc.KEY_ENCR_USED_KEY: self.encr_used,
- doc.KEY_SIGN_USED_KEY: self.sign_used,
- doc.KEY_VERSION_KEY: doc.KEYMANAGER_DOC_VERSION,
- doc.KEY_TAGS_KEY: [doc.KEYMANAGER_ACTIVE_TAG],
- })
-
- def next(self):
- if self._index == 0:
- self._index = len(self.__slots__)
- raise StopIteration
-
- self._index -= 1
- key = self.__slots__[self._index]
-
- if key.startswith('_'):
- return self.next()
-
- value = getattr(self, key)
- if key == "validation":
- value = str(value)
- elif key in ["expiry_date", "last_audited_at", "refreshed_at"]:
- value = str(value)
- return key, value
-
- def __iter__(self):
- return self
-
- def __repr__(self):
- """
- Representation of this class
- """
- return u"<%s 0x%s (%s - %s)>" % (
- self.__class__.__name__,
- self.fingerprint,
- self.address,
- "priv" if self.private else "publ")
-
-
-def parse_address(address):
- """
- Remove name, '<', '>' and the identity suffix after the '+' until the '@'
- e.g.: test_user+something@provider.com becomes test_user@provider.com
- since the key belongs to the identity without the '+' suffix.
-
- :type address: str
- :rtype: str
- """
- mail_regex = '(.*<)?([\w.-]+)(\+.*)?(@[\w.-]+)(>.*)?'
- match = re.match(mail_regex, address)
- if match is None:
- return None
- return ''.join(match.group(2, 4))
diff --git a/keymanager/src/leap/keymanager/migrator.py b/keymanager/src/leap/keymanager/migrator.py
deleted file mode 100644
index c73da2e..0000000
--- a/keymanager/src/leap/keymanager/migrator.py
+++ /dev/null
@@ -1,167 +0,0 @@
-# -*- coding: utf-8 -*-
-# migrator.py
-# Copyright (C) 2015 LEAP
-#
-# This program is free software: you can redistribute it and/or modify
-# it under the terms of the GNU General Public License as published by
-# the Free Software Foundation, either version 3 of the License, or
-# (at your option) any later version.
-#
-# This program is distributed in the hope that it will be useful,
-# but WITHOUT ANY WARRANTY; without even the implied warranty of
-# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
-# GNU General Public License for more details.
-#
-# You should have received a copy of the GNU General Public License
-# along with this program. If not, see <http://www.gnu.org/licenses/>.
-
-"""
-Document migrator
-"""
-# XXX: versioning has being added 12/2015 when keymanager was not
-# much in use in the wild. We can probably drop support for
-# keys without version at some point.
-
-
-from collections import namedtuple
-from twisted.internet.defer import gatherResults, succeed
-
-from leap.keymanager import documents as doc
-from leap.keymanager.validation import ValidationLevels
-
-
-KEY_ID_KEY = 'key_id'
-
-KeyDocs = namedtuple("KeyDocs", ['key', 'active'])
-
-
-class KeyDocumentsMigrator(object):
- """
- Migrate old KeyManager Soledad Documents to the newest schema
- """
-
- def __init__(self, soledad):
- self._soledad = soledad
-
- def migrate(self):
- deferred_public = self._get_docs(private=False)
- deferred_public.addCallback(self._migrate_docs)
-
- deferred_private = self._get_docs(private=True)
- deferred_private.addCallback(self._migrate_docs)
-
- return gatherResults([deferred_public, deferred_private])
-
- def _get_docs(self, private=False):
- private_value = '1' if private else '0'
-
- deferred_keys = self._soledad.get_from_index(
- doc.TAGS_PRIVATE_INDEX,
- doc.KEYMANAGER_KEY_TAG,
- private_value)
- deferred_active = self._soledad.get_from_index(
- doc.TAGS_PRIVATE_INDEX,
- doc.KEYMANAGER_ACTIVE_TAG,
- private_value)
- return gatherResults([deferred_keys, deferred_active])
-
- def _migrate_docs(self, (key_docs, active_docs)):
- def update_keys(keys):
- deferreds = []
- for key_id in keys:
- key = keys[key_id].key
- actives = keys[key_id].active
-
- d = self._migrate_actives(key, actives)
- deferreds.append(d)
-
- d = self._migrate_key(key)
- deferreds.append(d)
- return gatherResults(deferreds)
-
- d = self._buildKeyDict(key_docs, active_docs)
- d.addCallback(lambda keydict: self._filter_outdated(keydict))
- d.addCallback(update_keys)
-
- def _buildKeyDict(self, keys, actives):
- keydict = {
- fp2id(key.content[doc.KEY_FINGERPRINT_KEY]): KeyDocs(key, [])
- for key in keys}
-
- deferreds = []
- for active in actives:
- if KEY_ID_KEY in active.content:
- key_id = active.content[KEY_ID_KEY]
- if key_id not in keydict:
- d = self._soledad.delete_doc(active)
- deferreds.append(d)
- continue
- keydict[key_id].active.append(active)
-
- d = gatherResults(deferreds)
- d.addCallback(lambda _: keydict)
- return d
-
- def _filter_outdated(self, keydict):
- outdated = {}
- for key_id, docs in keydict.items():
- if ((docs.key and doc.KEY_VERSION_KEY not in docs.key.content) or
- docs.active):
- outdated[key_id] = docs
- return outdated
-
- def _migrate_actives(self, key, actives):
- if not key:
- deferreds = []
- for active in actives:
- d = self._soledad.delete_doc(active)
- deferreds.append(d)
- return gatherResults(deferreds)
-
- validation = str(ValidationLevels.Weak_Chain)
- last_audited = 0
- encr_used = False
- sign_used = False
- fingerprint = key.content[doc.KEY_FINGERPRINT_KEY]
- if len(actives) == 1 and doc.KEY_VERSION_KEY not in key.content:
- # we can preserve the validation of the key if there is only one
- # active address for the key
- validation = key.content[doc.KEY_VALIDATION_KEY]
- last_audited = key.content[doc.KEY_LAST_AUDITED_AT_KEY]
- encr_used = key.content[doc.KEY_ENCR_USED_KEY]
- sign_used = key.content[doc.KEY_SIGN_USED_KEY]
-
- deferreds = []
- for active in actives:
- if doc.KEY_VERSION_KEY in active.content:
- continue
-
- active.content[doc.KEY_VERSION_KEY] = doc.KEYMANAGER_DOC_VERSION
- active.content[doc.KEY_FINGERPRINT_KEY] = fingerprint
- active.content[doc.KEY_VALIDATION_KEY] = validation
- active.content[doc.KEY_LAST_AUDITED_AT_KEY] = last_audited
- active.content[doc.KEY_ENCR_USED_KEY] = encr_used
- active.content[doc.KEY_SIGN_USED_KEY] = sign_used
- del active.content[KEY_ID_KEY]
- d = self._soledad.put_doc(active)
- deferreds.append(d)
- return gatherResults(deferreds)
-
- def _migrate_key(self, key):
- if not key or doc.KEY_VERSION_KEY in key.content:
- return succeed(None)
-
- key.content[doc.KEY_VERSION_KEY] = doc.KEYMANAGER_DOC_VERSION
- key.content[doc.KEY_UIDS_KEY] = key.content[doc.KEY_ADDRESS_KEY]
- del key.content[doc.KEY_ADDRESS_KEY]
- del key.content[KEY_ID_KEY]
- del key.content[doc.KEY_VALIDATION_KEY]
- del key.content[doc.KEY_LAST_AUDITED_AT_KEY]
- del key.content[doc.KEY_ENCR_USED_KEY]
- del key.content[doc.KEY_SIGN_USED_KEY]
- return self._soledad.put_doc(key)
-
-
-def fp2id(fingerprint):
- KEY_ID_LENGTH = 16
- return fingerprint[-KEY_ID_LENGTH:]
diff --git a/keymanager/src/leap/keymanager/openpgp.py b/keymanager/src/leap/keymanager/openpgp.py
deleted file mode 100644
index 31c13df..0000000
--- a/keymanager/src/leap/keymanager/openpgp.py
+++ /dev/null
@@ -1,881 +0,0 @@
-# -*- coding: utf-8 -*-
-# openpgp.py
-# Copyright (C) 2013-2016 LEAP
-#
-# This program is free software: you can redistribute it and/or modify
-# it under the terms of the GNU General Public License as published by
-# the Free Software Foundation, either version 3 of the License, or
-# (at your option) any later version.
-#
-# This program is distributed in the hope that it will be useful,
-# but WITHOUT ANY WARRANTY; without even the implied warranty of
-# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
-# GNU General Public License for more details.
-#
-# You should have received a copy of the GNU General Public License
-# along with this program. If not, see <http://www.gnu.org/licenses/>.
-"""
-Infrastructure for using OpenPGP keys in Key Manager.
-"""
-import logging
-import os
-import re
-import tempfile
-import traceback
-import io
-
-
-from datetime import datetime
-from multiprocessing import cpu_count
-from gnupg.gnupg import GPGUtilities
-from twisted.internet import defer
-from twisted.internet.threads import deferToThread
-
-from leap.common.check import leap_assert, leap_assert_type, leap_check
-from leap.keymanager import errors
-from leap.keymanager.wrapper import TempGPGWrapper
-from leap.keymanager.keys import (
- OpenPGPKey,
- is_address,
- parse_address,
- build_key_from_dict,
-)
-from leap.keymanager.documents import (
- init_indexes,
- TAGS_PRIVATE_INDEX,
- TYPE_FINGERPRINT_PRIVATE_INDEX,
- TYPE_ADDRESS_PRIVATE_INDEX,
- KEY_UIDS_KEY,
- KEY_FINGERPRINT_KEY,
- KEY_PRIVATE_KEY,
- KEY_REFRESHED_AT_KEY,
- KEY_SIGN_USED_KEY,
- KEY_ENCR_USED_KEY,
- KEY_ADDRESS_KEY,
- KEY_TYPE_KEY,
- KEY_VERSION_KEY,
- KEYMANAGER_DOC_VERSION,
- KEYMANAGER_ACTIVE_TYPE,
- KEYMANAGER_KEY_TAG,
- KEYMANAGER_ACTIVE_TAG,
-)
-
-
-logger = logging.getLogger(__name__)
-
-
-#
-# A temporary GPG keyring wrapped to provide OpenPGP functionality.
-#
-
-# This function will be used to call blocking GPG functions outside
-# of Twisted reactor and match the concurrent calls to the amount of CPU cores
-cpu_core_semaphore = defer.DeferredSemaphore(cpu_count())
-
-
-def from_thread(func, *args, **kwargs):
- call = lambda: deferToThread(func, *args, **kwargs)
- return cpu_core_semaphore.run(call)
-
-
-#
-# The OpenPGP wrapper
-#
-
-class OpenPGPScheme(object):
- """
- A wrapper for OpenPGP keys management and use (encryption, decyption,
- signing and verification).
- """
-
- # type used on the soledad documents
- KEY_TYPE = OpenPGPKey.__name__
- ACTIVE_TYPE = KEY_TYPE + KEYMANAGER_ACTIVE_TYPE
-
- def __init__(self, soledad, gpgbinary=None):
- """
- Initialize the OpenPGP wrapper.
-
- :param soledad: A Soledad instance for key storage.
- :type soledad: leap.soledad.Soledad
- :param gpgbinary: Name for GnuPG binary executable.
- :type gpgbinary: C{str}
- """
- self._soledad = soledad
- self._gpgbinary = gpgbinary
- self.deferred_init = init_indexes(soledad)
- self.deferred_init.addCallback(self._migrate_documents_schema)
- self._wait_indexes("get_key", "put_key", "get_all_keys")
-
- def _migrate_documents_schema(self, _):
- from leap.keymanager.migrator import KeyDocumentsMigrator
- migrator = KeyDocumentsMigrator(self._soledad)
- return migrator.migrate()
-
- def _wait_indexes(self, *methods):
- """
- Methods that need to wait for the indexes to be ready.
-
- Heavily based on
- http://blogs.fluidinfo.com/terry/2009/05/11/a-mixin-class-allowing-python-__init__-methods-to-work-with-twisted-deferreds/
-
- :param methods: methods that need to wait for the indexes to be ready
- :type methods: tuple(str)
- """
- self.waiting = []
- self.stored = {}
-
- def restore(_):
- for method in self.stored:
- setattr(self, method, self.stored[method])
- for d in self.waiting:
- d.callback(None)
-
- def makeWrapper(method):
- def wrapper(*args, **kw):
- d = defer.Deferred()
- d.addCallback(lambda _: self.stored[method](*args, **kw))
- self.waiting.append(d)
- return d
- return wrapper
-
- for method in methods:
- self.stored[method] = getattr(self, method)
- setattr(self, method, makeWrapper(method))
-
- self.deferred_init.addCallback(restore)
-
- #
- # Keys management
- #
-
- def gen_key(self, address):
- """
- Generate an OpenPGP keypair bound to C{address}.
-
- :param address: The address bound to the key.
- :type address: str
-
- :return: A Deferred which fires with the key bound to address, or fails
- with KeyAlreadyExists if key already exists in local database.
- :rtype: Deferred
- """
- # make sure the key does not already exist
- leap_assert(is_address(address), 'Not an user address: %s' % address)
-
- @defer.inlineCallbacks
- def _gen_key(_):
- with TempGPGWrapper(gpgbinary=self._gpgbinary) as gpg:
- # TODO: inspect result, or use decorator
- params = gpg.gen_key_input(
- key_type='RSA',
- key_length=4096,
- name_real=address,
- name_email=address,
- name_comment='')
- logger.info("About to generate keys... "
- "This might take SOME time.")
- yield from_thread(gpg.gen_key, params)
- logger.info("Keys for %s have been successfully "
- "generated." % (address,))
- pubkeys = gpg.list_keys()
-
- # assert for new key characteristics
- leap_assert(
- len(pubkeys) is 1, # a unitary keyring!
- 'Keyring has wrong number of keys: %d.' % len(pubkeys))
- key = gpg.list_keys(secret=True).pop()
- leap_assert(
- len(key['uids']) is 1, # with just one uid!
- 'Wrong number of uids for key: %d.' % len(key['uids']))
- uid_match = False
- for uid in key['uids']:
- if re.match('.*<%s>$' % address, uid) is not None:
- uid_match = True
- break
- leap_assert(uid_match, 'Key not correctly bound to address.')
-
- # insert both public and private keys in storage
- deferreds = []
- for secret in [True, False]:
- key = gpg.list_keys(secret=secret).pop()
- openpgp_key = self._build_key_from_gpg(
- key,
- gpg.export_keys(key['fingerprint'], secret=secret),
- address)
- d = self.put_key(openpgp_key)
- deferreds.append(d)
- yield defer.gatherResults(deferreds)
-
- def key_already_exists(_):
- raise errors.KeyAlreadyExists(address)
-
- d = self.get_key(address)
- d.addCallbacks(key_already_exists, _gen_key)
- d.addCallback(lambda _: self.get_key(address, private=True))
- return d
-
- def get_key(self, address, private=False):
- """
- Get key bound to C{address} from local storage.
-
- :param address: The address bound to the key.
- :type address: str
- :param private: Look for a private key instead of a public one?
- :type private: bool
-
- :return: A Deferred which fires with the OpenPGPKey bound to address,
- or which fails with KeyNotFound if the key was not found on
- local storage.
- :rtype: Deferred
- """
- address = parse_address(address)
-
- def build_key((keydoc, activedoc)):
- if keydoc is None:
- raise errors.KeyNotFound(address)
- leap_assert(
- address in keydoc.content[KEY_UIDS_KEY],
- 'Wrong address in key %s. Expected %s, found %s.'
- % (keydoc.content[KEY_FINGERPRINT_KEY], address,
- keydoc.content[KEY_UIDS_KEY]))
- key = build_key_from_dict(keydoc.content, activedoc.content)
- key._gpgbinary = self._gpgbinary
- return key
-
- d = self._get_key_doc(address, private)
- d.addCallback(build_key)
- return d
-
- @defer.inlineCallbacks
- def get_all_keys(self, private=False):
- """
- Return all keys stored in local database.
-
- :param private: Include private keys
- :type private: bool
-
- :return: A Deferred which fires with a list of all keys in local db.
- :rtype: Deferred
- """
- HAS_ACTIVE = "has_active"
-
- active_docs = yield self._soledad.get_from_index(
- TAGS_PRIVATE_INDEX,
- KEYMANAGER_ACTIVE_TAG,
- '1' if private else '0')
- key_docs = yield self._soledad.get_from_index(
- TAGS_PRIVATE_INDEX,
- KEYMANAGER_KEY_TAG,
- '1' if private else '0')
-
- keys = []
- fp = lambda doc: doc.content[KEY_FINGERPRINT_KEY]
- for active in active_docs:
- fp_keys = filter(lambda k: fp(k) == fp(active), key_docs)
-
- if len(fp_keys) == 0:
- yield self._soledad.delete_doc(active)
- continue
- elif len(fp_keys) == 1:
- key = fp_keys[0]
- else:
- key = yield self._repair_key_docs(fp_keys)
- key.content[HAS_ACTIVE] = True
- keys.append(build_key_from_dict(key.content, active.content))
-
- unactive_keys = filter(lambda k: HAS_ACTIVE not in k.content, key_docs)
- keys += map(lambda k: build_key_from_dict(k.content), unactive_keys)
- defer.returnValue(keys)
-
- def parse_key(self, key_data, address=None):
- """
- Parses a key (or key pair) data and returns
- the OpenPGPKey keys.
-
- :param key_data: the key data to be parsed.
- :type key_data: str or unicode
- :param address: Active address for the key.
- :type address: str
-
- :returns: the public key and private key (if applies) for that data.
- :rtype: (public, private) -> tuple(OpenPGPKey, OpenPGPKey)
- the tuple may have one or both components None
- """
- leap_assert_type(key_data, (str, unicode))
- # TODO: add more checks for correct key data.
- leap_assert(key_data is not None, 'Data does not represent a key.')
-
- priv_info, privkey = process_key(
- key_data, self._gpgbinary, secret=True)
- pub_info, pubkey = process_key(
- key_data, self._gpgbinary, secret=False)
-
- if not pubkey:
- return (None, None)
-
- openpgp_privkey = None
- if privkey:
- # build private key
- openpgp_privkey = self._build_key_from_gpg(priv_info, privkey,
- address)
- leap_check(pub_info['fingerprint'] == priv_info['fingerprint'],
- 'Fingerprints for public and private key differ.',
- errors.KeyFingerprintMismatch)
- # build public key
- openpgp_pubkey = self._build_key_from_gpg(pub_info, pubkey, address)
-
- return (openpgp_pubkey, openpgp_privkey)
-
- def put_raw_key(self, key_data, address):
- """
- Put key contained in C{key_data} in local storage.
-
- :param key_data: The key data to be stored.
- :type key_data: str or unicode
- :param address: address for which this key will be active
- :type address: str
-
- :return: A Deferred which fires when the OpenPGPKey is in the storage.
- :rtype: Deferred
- """
- leap_assert_type(key_data, (str, unicode))
-
- openpgp_privkey = None
- try:
- openpgp_pubkey, openpgp_privkey = self.parse_key(
- key_data, address)
- except (errors.KeyAddressMismatch, errors.KeyFingerprintMismatch) as e:
- return defer.fail(e)
-
- def put_key(_, key):
- return self.put_key(key)
-
- d = defer.succeed(None)
- if openpgp_pubkey is not None:
- d.addCallback(put_key, openpgp_pubkey)
- if openpgp_privkey is not None:
- d.addCallback(put_key, openpgp_privkey)
- return d
-
- def put_key(self, key):
- """
- Put C{key} in local storage.
-
- :param key: The key to be stored.
- :type key: OpenPGPKey
-
- :return: A Deferred which fires when the key is in the storage.
- :rtype: Deferred
- """
- def merge_and_put((keydoc, activedoc)):
- if not keydoc:
- return put_new_key(activedoc)
-
- active_content = None
- if activedoc:
- active_content = activedoc.content
- oldkey = build_key_from_dict(keydoc.content, active_content)
-
- key.merge(oldkey)
- keydoc.set_json(key.get_json())
- d = self._soledad.put_doc(keydoc)
- d.addCallback(put_active, activedoc)
- return d
-
- def put_new_key(activedoc):
- deferreds = []
- if activedoc:
- d = self._soledad.delete_doc(activedoc)
- deferreds.append(d)
- for json in [key.get_json(), key.get_active_json()]:
- d = self._soledad.create_doc_from_json(json)
- deferreds.append(d)
- return defer.gatherResults(deferreds)
-
- def put_active(_, activedoc):
- active_json = key.get_active_json()
- if activedoc:
- activedoc.set_json(active_json)
- d = self._soledad.put_doc(activedoc)
- else:
- d = self._soledad.create_doc_from_json(active_json)
- return d
-
- def get_active_doc(keydoc):
- d = self._get_active_doc_from_address(key.address, key.private)
- d.addCallback(lambda activedoc: (keydoc, activedoc))
- return d
-
- d = self._get_key_doc_from_fingerprint(key.fingerprint, key.private)
- d.addCallback(get_active_doc)
- d.addCallback(merge_and_put)
- return d
-
- def _get_key_doc(self, address, private=False):
- """
- Get the document with a key (public, by default) bound to C{address}.
-
- If C{private} is True, looks for a private key instead of a public.
-
- :param address: The address bound to the key.
- :type address: str
- :param private: Whether to look for a private key.
- :type private: bool
-
- :return: A Deferred which fires with a touple of two SoledadDocument
- (keydoc, activedoc) or None if it does not exist.
- :rtype: Deferred
- """
- def get_key_from_active_doc(activedoc):
- if not activedoc:
- return (None, None)
- fingerprint = activedoc.content[KEY_FINGERPRINT_KEY]
- d = self._get_key_doc_from_fingerprint(fingerprint, private)
- d.addCallback(delete_active_if_no_key, activedoc)
- return d
-
- def delete_active_if_no_key(keydoc, activedoc):
- if not keydoc:
- d = self._soledad.delete_doc(activedoc)
- d.addCallback(lambda _: (None, None))
- return d
- return (keydoc, activedoc)
-
- d = self._get_active_doc_from_address(address, private)
- d.addCallback(get_key_from_active_doc)
- return d
-
- def _build_key_from_gpg(self, key, key_data, address=None):
- """
- Build an OpenPGPKey for C{address} based on C{key} from
- local gpg storage.
-
- GPG key data has to be queried independently in this
- wrapper, so we receive it in C{key_data}.
-
- :param address: Active address for the key.
- :type address: str
- :param key: Key obtained from GPG storage.
- :type key: dict
- :param key_data: Key data obtained from GPG storage.
- :type key_data: str
- :return: An instance of the key.
- :rtype: OpenPGPKey
- """
- return build_gpg_key(key, key_data, address, self._gpgbinary)
-
- def delete_key(self, key):
- """
- Remove C{key} from storage.
-
- :param key: The key to be removed.
- :type key: EncryptionKey
-
- :return: A Deferred which fires when the key is deleted, or which
- fails with KeyNotFound if the key was not found on local
- storage.
- :rtype: Deferred
- """
- leap_assert_type(key, OpenPGPKey)
-
- def delete_docs(activedocs):
- deferreds = []
- for doc in activedocs:
- d = self._soledad.delete_doc(doc)
- deferreds.append(d)
- return defer.gatherResults(deferreds)
-
- def get_key_docs(_):
- return self._soledad.get_from_index(
- TYPE_FINGERPRINT_PRIVATE_INDEX,
- self.KEY_TYPE,
- key.fingerprint,
- '1' if key.private else '0')
-
- def delete_key(docs):
- if len(docs) == 0:
- raise errors.KeyNotFound(key)
- elif len(docs) > 1:
- logger.warning("There is more than one key for fingerprint %s"
- % key.fingerprint)
-
- has_deleted = False
- deferreds = []
- for doc in docs:
- if doc.content['fingerprint'] == key.fingerprint:
- d = self._soledad.delete_doc(doc)
- deferreds.append(d)
- has_deleted = True
- if not has_deleted:
- raise errors.KeyNotFound(key)
- return defer.gatherResults(deferreds)
-
- d = self._soledad.get_from_index(
- TYPE_FINGERPRINT_PRIVATE_INDEX,
- self.ACTIVE_TYPE,
- key.fingerprint,
- '1' if key.private else '0')
- d.addCallback(delete_docs)
- d.addCallback(get_key_docs)
- d.addCallback(delete_key)
- return d
-
- #
- # Data encryption, decryption, signing and verifying
- #
-
- @staticmethod
- def _assert_gpg_result_ok(result):
- """
- Check if GPG result is 'ok' and log stderr outputs.
-
- :param result: GPG results, which have a field calld 'ok' that states
- whether the gpg operation was successful or not.
- :type result: object
-
- :raise GPGError: Raised when the gpg operation was not successful.
- """
- stderr = getattr(result, 'stderr', None)
- if stderr:
- logger.debug("%s" % (stderr,))
- if getattr(result, 'ok', None) is not True:
- raise errors.GPGError(
- 'Failed to encrypt/decrypt: %s' % stderr)
-
- @defer.inlineCallbacks
- def encrypt(self, data, pubkey, passphrase=None, sign=None,
- cipher_algo='AES256'):
- """
- Encrypt C{data} using public @{pubkey} and sign with C{sign} key.
-
- :param data: The data to be encrypted.
- :type data: str
- :param pubkey: The key used to encrypt.
- :type pubkey: OpenPGPKey
- :param sign: The key used for signing.
- :type sign: OpenPGPKey
- :param cipher_algo: The cipher algorithm to use.
- :type cipher_algo: str
-
- :return: A Deferred that will be fired with the encrypted data.
- :rtype: defer.Deferred
-
- :raise EncryptError: Raised if failed encrypting for some reason.
- """
- leap_assert_type(pubkey, OpenPGPKey)
- leap_assert(pubkey.private is False, 'Key is not public.')
- keys = [pubkey]
- if sign is not None:
- leap_assert_type(sign, OpenPGPKey)
- leap_assert(sign.private is True)
- keys.append(sign)
- with TempGPGWrapper(keys, self._gpgbinary) as gpg:
- result = yield from_thread(
- gpg.encrypt,
- data, pubkey.fingerprint,
- default_key=sign.fingerprint if sign else None,
- passphrase=passphrase, symmetric=False,
- cipher_algo=cipher_algo)
- # Here we cannot assert for correctness of sig because the sig is
- # in the ciphertext.
- # result.ok - (bool) indicates if the operation succeeded
- # result.data - (bool) contains the result of the operation
- try:
- self._assert_gpg_result_ok(result)
- defer.returnValue(result.data)
- except errors.GPGError as e:
- logger.warning('Failed to encrypt: %s.' % str(e))
- raise errors.EncryptError()
-
- @defer.inlineCallbacks
- def decrypt(self, data, privkey, passphrase=None, verify=None):
- """
- Decrypt C{data} using private @{privkey} and verify with C{verify} key.
-
- :param data: The data to be decrypted.
- :type data: str
- :param privkey: The key used to decrypt.
- :type privkey: OpenPGPKey
- :param passphrase: The passphrase for the secret key used for
- decryption.
- :type passphrase: str
- :param verify: The key used to verify a signature.
- :type verify: OpenPGPKey
-
- :return: Deferred that will fire with the decrypted data and
- if signature verifies (unicode, bool)
- :rtype: Deferred
-
- :raise DecryptError: Raised if failed decrypting for some reason.
- """
- leap_assert(privkey.private is True, 'Key is not private.')
- keys = [privkey]
- if verify is not None:
- leap_assert_type(verify, OpenPGPKey)
- leap_assert(verify.private is False)
- keys.append(verify)
- with TempGPGWrapper(keys, self._gpgbinary) as gpg:
- try:
- result = yield from_thread(gpg.decrypt,
- data, passphrase=passphrase,
- always_trust=True)
- self._assert_gpg_result_ok(result)
-
- # verify signature
- sign_valid = False
- if (verify is not None and
- result.valid is True and
- verify.fingerprint == result.pubkey_fingerprint):
- sign_valid = True
-
- defer.returnValue((result.data, sign_valid))
- except errors.GPGError as e:
- logger.warning('Failed to decrypt: %s.' % str(e))
- raise errors.DecryptError(str(e))
-
- def is_encrypted(self, data):
- """
- Return whether C{data} was asymmetrically encrypted using OpenPGP.
-
- :param data: The data we want to know about.
- :type data: str
-
- :return: Whether C{data} was encrypted using this wrapper.
- :rtype: bool
- """
- with TempGPGWrapper(gpgbinary=self._gpgbinary) as gpg:
- gpgutil = GPGUtilities(gpg)
- return gpgutil.is_encrypted_asym(data)
-
- def sign(self, data, privkey, digest_algo='SHA512', clearsign=False,
- detach=True, binary=False):
- """
- Sign C{data} with C{privkey}.
-
- :param data: The data to be signed.
- :type data: str
-
- :param privkey: The private key to be used to sign.
- :type privkey: OpenPGPKey
- :param digest_algo: The hash digest to use.
- :type digest_algo: str
- :param clearsign: If True, create a cleartext signature.
- :type clearsign: bool
- :param detach: If True, create a detached signature.
- :type detach: bool
- :param binary: If True, do not ascii armour the output.
- :type binary: bool
-
- :return: The ascii-armored signed data.
- :rtype: str
- """
- leap_assert_type(privkey, OpenPGPKey)
- leap_assert(privkey.private is True)
-
- # result.fingerprint - contains the fingerprint of the key used to
- # sign.
- with TempGPGWrapper(privkey, self._gpgbinary) as gpg:
- result = gpg.sign(data, default_key=privkey.fingerprint,
- digest_algo=digest_algo, clearsign=clearsign,
- detach=detach, binary=binary)
- rfprint = privkey.fingerprint
- privkey = gpg.list_keys(secret=True).pop()
- kfprint = privkey['fingerprint']
- if result.fingerprint is None:
- raise errors.SignFailed(
- 'Failed to sign with key %s: %s' %
- (privkey['fingerprint'], result.stderr))
- leap_assert(
- result.fingerprint == kfprint,
- 'Signature and private key fingerprints mismatch: '
- '%s != %s' % (rfprint, kfprint))
- return result.data
-
- def verify(self, data, pubkey, detached_sig=None):
- """
- Verify signed C{data} with C{pubkey}, eventually using
- C{detached_sig}.
-
- :param data: The data to be verified.
- :type data: str
- :param pubkey: The public key to be used on verification.
- :type pubkey: OpenPGPKey
- :param detached_sig: A detached signature. If given, C{data} is
- verified against this detached signature.
- :type detached_sig: str
-
- :return: signature matches
- :rtype: bool
- """
- leap_assert_type(pubkey, OpenPGPKey)
- leap_assert(pubkey.private is False)
- with TempGPGWrapper(pubkey, self._gpgbinary) as gpg:
- result = None
- if detached_sig is None:
- result = gpg.verify(data)
- else:
- # to verify using a detached sig we have to use
- # gpg.verify_file(), which receives the data as a binary
- # stream and the name of a file containing the signature.
- sf, sfname = tempfile.mkstemp()
- with os.fdopen(sf, 'w') as sfd:
- sfd.write(detached_sig)
- result = gpg.verify_file(io.BytesIO(data), sig_file=sfname)
- os.unlink(sfname)
- gpgpubkey = gpg.list_keys().pop()
- valid = result.valid
- rfprint = result.fingerprint
- kfprint = gpgpubkey['fingerprint']
- return valid and rfprint == kfprint
-
- def _get_active_doc_from_address(self, address, private):
- d = self._soledad.get_from_index(
- TYPE_ADDRESS_PRIVATE_INDEX,
- self.ACTIVE_TYPE,
- address,
- '1' if private else '0')
- d.addCallback(self._repair_and_get_doc, self._repair_active_docs)
- d.addCallback(self._check_version)
- return d
-
- def _get_key_doc_from_fingerprint(self, fingerprint, private):
- d = self._soledad.get_from_index(
- TYPE_FINGERPRINT_PRIVATE_INDEX,
- self.KEY_TYPE,
- fingerprint,
- '1' if private else '0')
- d.addCallback(self._repair_and_get_doc, self._repair_key_docs)
- d.addCallback(self._check_version)
- return d
-
- def _repair_and_get_doc(self, doclist, repair_func):
- if len(doclist) is 0:
- return None
- elif len(doclist) > 1:
- return repair_func(doclist)
- return doclist[0]
-
- def _check_version(self, doc):
- if doc is not None:
- version = doc.content[KEY_VERSION_KEY]
- if version > KEYMANAGER_DOC_VERSION:
- raise errors.KeyVersionError(str(version))
- return doc
-
- def _repair_key_docs(self, doclist):
- """
- If there is more than one key for a key id try to self-repair it
-
- :return: a Deferred that will be fired with the valid key doc once all
- the deletions are completed
- :rtype: Deferred
- """
- def log_key_doc(doc):
- logger.error("\t%s: %s" % (doc.content[KEY_UIDS_KEY],
- doc.content[KEY_FINGERPRINT_KEY]))
-
- def cmp_key(d1, d2):
- return cmp(d1.content[KEY_REFRESHED_AT_KEY],
- d2.content[KEY_REFRESHED_AT_KEY])
-
- return self._repair_docs(doclist, cmp_key, log_key_doc)
-
- @defer.inlineCallbacks
- def _repair_active_docs(self, doclist):
- """
- If there is more than one active doc for an address try to self-repair
- it
-
- :return: a Deferred that will be fired with the valid active doc once
- all the deletions are completed
- :rtype: Deferred
- """
- keys = {}
- for doc in doclist:
- fp = doc.content[KEY_FINGERPRINT_KEY]
- private = doc.content[KEY_PRIVATE_KEY]
- try:
- key = yield self._get_key_doc_from_fingerprint(fp, private)
- keys[fp] = key
- except Exception:
- pass
-
- def log_active_doc(doc):
- logger.error("\t%s: %s" % (doc.content[KEY_ADDRESS_KEY],
- doc.content[KEY_FINGERPRINT_KEY]))
-
- def cmp_active(d1, d2):
- # XXX: for private keys it will be nice to check which key is known
- # by the nicknym server and keep this one. But this needs a
- # refactor that might not be worth it.
- used1 = (d1.content[KEY_SIGN_USED_KEY] +
- d1.content[KEY_ENCR_USED_KEY])
- used2 = (d2.content[KEY_SIGN_USED_KEY] +
- d2.content[KEY_ENCR_USED_KEY])
- res = cmp(used1, used2)
- if res != 0:
- return res
-
- key1 = keys[d1.content[KEY_FINGERPRINT_KEY]]
- key2 = keys[d2.content[KEY_FINGERPRINT_KEY]]
- return cmp(key1.content[KEY_REFRESHED_AT_KEY],
- key2.content[KEY_REFRESHED_AT_KEY])
-
- doc = yield self._repair_docs(doclist, cmp_active, log_active_doc)
- defer.returnValue(doc)
-
- def _repair_docs(self, doclist, cmp_func, log_func):
- logger.error("BUG ---------------------------------------------------")
- logger.error("There is more than one doc of type %s:"
- % (doclist[0].content[KEY_TYPE_KEY],))
-
- doclist.sort(cmp=cmp_func, reverse=True)
- log_func(doclist[0])
- deferreds = []
- for doc in doclist[1:]:
- log_func(doc)
- d = self._soledad.delete_doc(doc)
- deferreds.append(d)
-
- logger.error("")
- logger.error(traceback.extract_stack())
- logger.error("BUG (please report above info) ------------------------")
- d = defer.gatherResults(deferreds, consumeErrors=True)
- d.addCallback(lambda _: doclist[0])
- return d
-
-
-def process_key(key_data, gpgbinary, secret=False):
- with TempGPGWrapper(gpgbinary=gpgbinary) as gpg:
- try:
- gpg.import_keys(key_data)
- info = gpg.list_keys(secret=secret).pop()
- key = gpg.export_keys(info['fingerprint'], secret=secret)
- except IndexError:
- info = {}
- key = None
- return info, key
-
-
-def build_gpg_key(key_info, key_data, address=None, gpgbinary=None):
- expiry_date = None
- if key_info['expires']:
- expiry_date = datetime.fromtimestamp(int(key_info['expires']))
- uids = []
- for uid in key_info['uids']:
- uids.append(parse_address(uid))
- if address and address not in uids:
- raise errors.KeyAddressMismatch("UIDs %s found, but expected %s"
- % (str(uids), address))
-
- return OpenPGPKey(
- address=address,
- uids=uids,
- gpgbinary=gpgbinary,
- fingerprint=key_info['fingerprint'],
- key_data=key_data,
- private=True if key_info['type'] == 'sec' else False,
- length=int(key_info['length']),
- expiry_date=expiry_date,
- refreshed_at=datetime.now())
diff --git a/keymanager/src/leap/keymanager/validation.py b/keymanager/src/leap/keymanager/validation.py
deleted file mode 100644
index 16a897e..0000000
--- a/keymanager/src/leap/keymanager/validation.py
+++ /dev/null
@@ -1,129 +0,0 @@
-# -*- coding: utf-8 -*-
-# __init__.py
-# Copyright (C) 2014 LEAP
-#
-# This program is free software: you can redistribute it and/or modify
-# it under the terms of the GNU General Public License as published by
-# the Free Software Foundation, either version 3 of the License, or
-# (at your option) any later version.
-#
-# This program is distributed in the hope that it will be useful,
-# but WITHOUT ANY WARRANTY; without even the implied warranty of
-# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
-# GNU General Public License for more details.
-#
-# You should have received a copy of the GNU General Public License
-# along with this program. If not, see <http://www.gnu.org/licenses/>.
-
-"""
-Validation levels implementation for key managment.
-
-See:
- https://leap.se/en/docs/design/transitional-key-validation
-"""
-
-
-from datetime import datetime
-
-
-class ValidationLevel(object):
- """
- A validation level
-
- Meant to be used to compare levels or get its string representation.
- """
- def __init__(self, name, value):
- self.name = name
- self.value = value
-
- def __cmp__(self, other):
- return cmp(self.value, other.value)
-
- def __str__(self):
- return self.name
-
- def __repr__(self):
- return "<ValidationLevel: %s (%d)>" % (self.name, self.value)
-
-
-class _ValidationLevels(object):
- """
- Handler class to manage validation levels. It should have only one global
- instance 'ValidationLevels'.
-
- The levels are attributes of the instance and can be used like:
- ValidationLevels.Weak_Chain
- ValidationLevels.get("Weak_Chain")
- """
- _level_names = ("Weak_Chain",
- "Provider_Trust",
- "Provider_Endorsement",
- "Third_Party_Endorsement",
- "Third_Party_Consensus",
- "Historically_Auditing",
- "Known_Key",
- "Fingerprint")
-
- def __init__(self):
- for name in self._level_names:
- setattr(self, name,
- ValidationLevel(name, self._level_names.index(name)))
-
- def get(self, name):
- """
- Get the ValidationLevel of a name
-
- :param name: name of the level
- :type name: str
- :rtype: ValidationLevel
- """
- return getattr(self, name)
-
- def __iter__(self):
- return iter(self._level_names)
-
-
-ValidationLevels = _ValidationLevels()
-
-
-def can_upgrade(new_key, old_key):
- """
- :type new_key: EncryptionKey
- :type old_key: EncryptionKey
- :rtype: bool
- """
- # First contact
- if old_key is None:
- return True
-
- # An update of the same key
- if new_key.fingerprint == old_key.fingerprint:
- return True
-
- # Manually verified fingerprint
- if new_key.validation == ValidationLevels.Fingerprint:
- return True
-
- # Expired key and higher validation level
- if (old_key.expiry_date is not None and
- old_key.expiry_date < datetime.now() and
- new_key.validation >= old_key.validation):
- return True
-
- # No expiration date and higher validation level
- if (old_key.expiry_date is None and
- new_key.validation > old_key.validation):
- return True
-
- # Not successfully used and strict high validation level
- if (not (old_key.sign_used and old_key.encr_used) and
- new_key.validation > old_key.validation):
- return True
-
- # New key signed by the old key
- # XXX: signatures are using key-ids instead of fingerprints
- key_id = old_key.fingerprint[-16:]
- if key_id in new_key.signatures:
- return True
-
- return False
diff --git a/keymanager/src/leap/keymanager/wrapper.py b/keymanager/src/leap/keymanager/wrapper.py
deleted file mode 100644
index 4f36cec..0000000
--- a/keymanager/src/leap/keymanager/wrapper.py
+++ /dev/null
@@ -1,134 +0,0 @@
-# -*- coding: utf-8 -*-
-# wrapper.py
-# Copyright (C) 2016 LEAP
-#
-# This program is free software: you can redistribute it and/or modify
-# it under the terms of the GNU General Public License as published by
-# the Free Software Foundation, either version 3 of the License, or
-# (at your option) any later version.
-#
-# This program is distributed in the hope that it will be useful,
-# but WITHOUT ANY WARRANTY; without even the implied warranty of
-# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
-# GNU General Public License for more details.
-#
-# You should have received a copy of the GNU General Public License
-# along with this program. If not, see <http://www.gnu.org/licenses/>.
-"""
-GPG wrapper for temporary keyrings
-"""
-import os
-import shutil
-import tempfile
-from gnupg import GPG
-
-from leap.common.check import leap_assert
-
-
-class TempGPGWrapper(object):
- """
- A context manager that wraps a temporary GPG keyring which only contains
- the keys given at object creation.
- """
-
- def __init__(self, keys=None, gpgbinary=None):
- """
- Create an empty temporary keyring and import any given C{keys} into
- it.
-
- :param keys: OpenPGP key, or list of.
- :type keys: OpenPGPKey or list of OpenPGPKeys
- :param gpgbinary: Name for GnuPG binary executable.
- :type gpgbinary: C{str}
- """
- self._gpg = None
- self._gpgbinary = gpgbinary
- if not keys:
- keys = list()
- if not isinstance(keys, list):
- keys = [keys]
- self._keys = keys
-
- def __enter__(self):
- """
- Build and return a GPG keyring containing the keys given on
- object creation.
-
- :return: A GPG instance containing the keys given on object creation.
- :rtype: gnupg.GPG
- """
- self._build_keyring()
- return self._gpg
-
- def __exit__(self, exc_type, exc_value, traceback):
- """
- Ensure the gpg is properly destroyed.
- """
- # TODO handle exceptions and log here
- self._destroy_keyring()
-
- def _build_keyring(self):
- """
- Create a GPG keyring containing the keys given on object creation.
-
- :return: A GPG instance containing the keys given on object creation.
- :rtype: gnupg.GPG
- """
- privkeys = [key for key in self._keys if key and key.private is True]
- publkeys = [key for key in self._keys if key and key.private is False]
- # here we filter out public keys that have a correspondent
- # private key in the list because the private key_data by
- # itself is enough to also have the public key in the keyring,
- # and we want to count the keys afterwards.
-
- privfps = map(lambda privkey: privkey.fingerprint, privkeys)
- publkeys = filter(
- lambda pubkey: pubkey.fingerprint not in privfps, publkeys)
-
- listkeys = lambda: self._gpg.list_keys()
- listsecretkeys = lambda: self._gpg.list_keys(secret=True)
-
- self._gpg = GPG(binary=self._gpgbinary,
- homedir=tempfile.mkdtemp())
- leap_assert(len(listkeys()) is 0, 'Keyring not empty.')
-
- # import keys into the keyring:
- # concatenating ascii-armored keys, which is correctly
- # understood by GPG.
-
- self._gpg.import_keys("".join(
- [x.key_data for x in publkeys + privkeys]))
-
- # assert the number of keys in the keyring
- leap_assert(
- len(listkeys()) == len(publkeys) + len(privkeys),
- 'Wrong number of public keys in keyring: %d, should be %d)' %
- (len(listkeys()), len(publkeys) + len(privkeys)))
- leap_assert(
- len(listsecretkeys()) == len(privkeys),
- 'Wrong number of private keys in keyring: %d, should be %d)' %
- (len(listsecretkeys()), len(privkeys)))
-
- def _destroy_keyring(self):
- """
- Securely erase the keyring.
- """
- # TODO: implement some kind of wiping of data or a more
- # secure way that
- # does not write to disk.
-
- try:
- for secret in [True, False]:
- for key in self._gpg.list_keys(secret=secret):
- self._gpg.delete_keys(
- key['fingerprint'],
- secret=secret)
- leap_assert(len(self._gpg.list_keys()) is 0, 'Keyring not empty!')
-
- except:
- raise
-
- finally:
- leap_assert(self._gpg.homedir != os.path.expanduser('~/.gnupg'),
- "watch out! Tried to remove default gnupg home!")
- shutil.rmtree(self._gpg.homedir)