From e7583ac12e0d594bb0be648b62a9d58eff3bf631 Mon Sep 17 00:00:00 2001 From: Ruben Pollan Date: Sun, 1 May 2016 14:27:51 -0300 Subject: [refactor] remove EncryptionKey --- src/leap/keymanager/keys.py | 88 ++++++++++++++++++++++++++----- src/leap/keymanager/openpgp.py | 84 ++--------------------------- src/leap/keymanager/tests/test_openpgp.py | 2 +- 3 files changed, 79 insertions(+), 95 deletions(-) diff --git a/src/leap/keymanager/keys.py b/src/leap/keymanager/keys.py index f2a12a9..1d03945 100644 --- a/src/leap/keymanager/keys.py +++ b/src/leap/keymanager/keys.py @@ -30,11 +30,12 @@ import re import time -from abc import ABCMeta from datetime import datetime from leap.common.check import leap_assert from twisted.internet import defer +from leap.keymanager import errors +from leap.keymanager.wrapper import TempGPGWrapper from leap.keymanager.validation import ValidationLevels logger = logging.getLogger(__name__) @@ -177,29 +178,21 @@ def _to_unix_time(date): return 0 -# -# Abstraction for encryption keys -# - -class EncryptionKey(object): +class OpenPGPKey(object): """ - Abstract class for encryption keys. - - A key is "validated" if the nicknym agent has bound the user address to a - public key. + Base class for OpenPGP keys. """ - __metaclass__ = ABCMeta - __slots__ = ('address', 'uids', 'fingerprint', 'key_data', 'private', 'length', 'expiry_date', 'validation', 'last_audited_at', 'refreshed_at', - 'encr_used', 'sign_used', '_index') + 'encr_used', 'sign_used', '_index', '_gpgbinary') - def __init__(self, address=None, uids=[], fingerprint="", + def __init__(self, address=None, gpgbinary=None, uids=[], fingerprint="", key_data="", private=False, length=0, expiry_date=None, validation=ValidationLevels.Weak_Chain, last_audited_at=None, refreshed_at=None, encr_used=False, sign_used=False): + self._gpgbinary = gpgbinary self.address = address if not uids and address: self.uids = [address] @@ -218,6 +211,57 @@ class EncryptionKey(object): self.sign_used = sign_used self._index = len(self.__slots__) + @property + def signatures(self): + """ + Get the key signatures + + :return: the key IDs that have signed the key + :rtype: list(str) + """ + with TempGPGWrapper(keys=[self], gpgbinary=self._gpgbinary) as gpg: + res = gpg.list_sigs(self.fingerprint) + for uid, sigs in res.sigs.iteritems(): + if parse_address(uid) in self.uids: + return sigs + + return [] + + def merge(self, newkey): + if newkey.fingerprint != self.fingerprint: + logger.critical( + "Can't put a key whith the same key_id and different " + "fingerprint: %s, %s" + % (newkey.fingerprint, self.fingerprint)) + raise errors.KeyFingerprintMismatch(newkey.fingerprint) + + with TempGPGWrapper(gpgbinary=self._gpgbinary) as gpg: + gpg.import_keys(self.key_data) + gpg.import_keys(newkey.key_data) + gpgkey = gpg.list_keys(secret=newkey.private).pop() + + if gpgkey['expires']: + self.expiry_date = datetime.fromtimestamp( + int(gpgkey['expires'])) + else: + self.expiry_date = None + + self.uids = [] + for uid in gpgkey['uids']: + self.uids.append(parse_address(uid)) + + self.length = int(gpgkey['length']) + self.key_data = gpg.export_keys(gpgkey['fingerprint'], + secret=self.private) + + if newkey.validation > self.validation: + self.validation = newkey.validation + if newkey.last_audited_at > self.last_audited_at: + self.validation = newkey.last_audited_at + self.encr_used = newkey.encr_used or self.encr_used + self.sign_used = newkey.sign_used or self.sign_used + self.refreshed_at = datetime.now() + def get_json(self): """ Return a JSON string describing this key. @@ -295,6 +339,22 @@ class EncryptionKey(object): "priv" if self.private else "publ") +def parse_address(address): + """ + Remove name, '<', '>' and the identity suffix after the '+' until the '@' + e.g.: test_user+something@provider.com becomes test_user@provider.com + since the key belongs to the identity without the '+' suffix. + + :type address: str + :rtype: str + """ + mail_regex = '(.*<)?([\w.-]+)(\+.*)?(@[\w.-]+)(>.*)?' + match = re.match(mail_regex, address) + if match is None: + return None + return ''.join(match.group(2, 4)) + + def init_indexes(soledad): """ Initialize the database indexes. diff --git a/src/leap/keymanager/openpgp.py b/src/leap/keymanager/openpgp.py index 301c46c..f8cc976 100644 --- a/src/leap/keymanager/openpgp.py +++ b/src/leap/keymanager/openpgp.py @@ -35,9 +35,10 @@ from leap.common.check import leap_assert, leap_assert_type, leap_check from leap.keymanager import errors from leap.keymanager.wrapper import TempGPGWrapper from leap.keymanager.keys import ( - EncryptionKey, + OpenPGPKey, init_indexes, is_address, + parse_address, build_key_from_dict, TYPE_FINGERPRINT_PRIVATE_INDEX, TYPE_ADDRESS_PRIVATE_INDEX, @@ -70,87 +71,10 @@ def from_thread(func, *args, **kwargs): return cpu_core_semaphore.run(call) -def _parse_address(address): - """ - Remove name, '<', '>' and the identity suffix after the '+' until the '@' - e.g.: test_user+something@provider.com becomes test_user@provider.com - since the key belongs to the identity without the '+' suffix. - - :type address: str - :rtype: str - """ - mail_regex = '(.*<)?([\w.-]+)(\+.*)?(@[\w.-]+)(>.*)?' - match = re.match(mail_regex, address) - if match is None: - return None - return ''.join(match.group(2, 4)) - - # # The OpenPGP wrapper # -class OpenPGPKey(EncryptionKey): - """ - Base class for OpenPGP keys. - """ - - def __init__(self, address=None, gpgbinary=None, **kwargs): - self._gpgbinary = gpgbinary - super(OpenPGPKey, self).__init__(address, **kwargs) - - @property - def signatures(self): - """ - Get the key signatures - - :return: the key IDs that have signed the key - :rtype: list(str) - """ - with TempGPGWrapper(keys=[self], gpgbinary=self._gpgbinary) as gpg: - res = gpg.list_sigs(self.fingerprint) - for uid, sigs in res.sigs.iteritems(): - if _parse_address(uid) in self.uids: - return sigs - - return [] - - def merge(self, newkey): - if newkey.fingerprint != self.fingerprint: - logger.critical( - "Can't put a key whith the same key_id and different " - "fingerprint: %s, %s" - % (newkey.fingerprint, self.fingerprint)) - raise errors.KeyFingerprintMismatch(newkey.fingerprint) - - with TempGPGWrapper(gpgbinary=self._gpgbinary) as gpg: - gpg.import_keys(self.key_data) - gpg.import_keys(newkey.key_data) - gpgkey = gpg.list_keys(secret=newkey.private).pop() - - if gpgkey['expires']: - self.expiry_date = datetime.fromtimestamp( - int(gpgkey['expires'])) - else: - self.expiry_date = None - - self.uids = [] - for uid in gpgkey['uids']: - self.uids.append(_parse_address(uid)) - - self.length = int(gpgkey['length']) - self.key_data = gpg.export_keys(gpgkey['fingerprint'], - secret=self.private) - - if newkey.validation > self.validation: - self.validation = newkey.validation - if newkey.last_audited_at > self.last_audited_at: - self.validation = newkey.last_audited_at - self.encr_used = newkey.encr_used or self.encr_used - self.sign_used = newkey.sign_used or self.sign_used - self.refreshed_at = datetime.now() - - class OpenPGPScheme(object): """ A wrapper for OpenPGP keys management and use (encryption, decyption, @@ -298,7 +222,7 @@ class OpenPGPScheme(object): local storage. :rtype: Deferred """ - address = _parse_address(address) + address = parse_address(address) def build_key((keydoc, activedoc)): if keydoc is None: @@ -868,7 +792,7 @@ def build_gpg_key(key_info, key_data, address=None, gpgbinary=None): expiry_date = datetime.fromtimestamp(int(key_info['expires'])) uids = [] for uid in key_info['uids']: - uids.append(_parse_address(uid)) + uids.append(parse_address(uid)) if address and address not in uids: raise errors.KeyAddressMismatch("UIDs %s found, but expected %s" % (str(uids), address)) diff --git a/src/leap/keymanager/tests/test_openpgp.py b/src/leap/keymanager/tests/test_openpgp.py index acb2c1c..0e39dab 100644 --- a/src/leap/keymanager/tests/test_openpgp.py +++ b/src/leap/keymanager/tests/test_openpgp.py @@ -33,7 +33,7 @@ from leap.keymanager.keys import ( TYPE_FINGERPRINT_PRIVATE_INDEX, TYPE_ADDRESS_PRIVATE_INDEX, ) -from leap.keymanager.openpgp import OpenPGPKey +from leap.keymanager.keys import OpenPGPKey from leap.keymanager.tests import ( KeyManagerWithSoledadTestCase, ADDRESS, -- cgit v1.2.3