diff options
| -rw-r--r-- | keymanager/src/leap/keymanager/keys.py | 88 | ||||
| -rw-r--r-- | keymanager/src/leap/keymanager/openpgp.py | 84 | ||||
| -rw-r--r-- | keymanager/src/leap/keymanager/tests/test_openpgp.py | 2 | 
3 files changed, 79 insertions, 95 deletions
| diff --git a/keymanager/src/leap/keymanager/keys.py b/keymanager/src/leap/keymanager/keys.py index f2a12a9..1d03945 100644 --- a/keymanager/src/leap/keymanager/keys.py +++ b/keymanager/src/leap/keymanager/keys.py @@ -30,11 +30,12 @@ import re  import time -from abc import ABCMeta  from datetime import datetime  from leap.common.check import leap_assert  from twisted.internet import defer +from leap.keymanager import errors +from leap.keymanager.wrapper import TempGPGWrapper  from leap.keymanager.validation import ValidationLevels  logger = logging.getLogger(__name__) @@ -177,29 +178,21 @@ def _to_unix_time(date):          return 0 -# -# Abstraction for encryption keys -# - -class EncryptionKey(object): +class OpenPGPKey(object):      """ -    Abstract class for encryption keys. - -    A key is "validated" if the nicknym agent has bound the user address to a -    public key. +    Base class for OpenPGP keys.      """ -    __metaclass__ = ABCMeta -      __slots__ = ('address', 'uids', 'fingerprint', 'key_data',                   'private', 'length', 'expiry_date', 'validation',                   'last_audited_at', 'refreshed_at', -                 'encr_used', 'sign_used', '_index') +                 'encr_used', 'sign_used', '_index', '_gpgbinary') -    def __init__(self, address=None, uids=[], fingerprint="", +    def __init__(self, address=None, gpgbinary=None, uids=[], fingerprint="",                   key_data="", private=False, length=0, expiry_date=None,                   validation=ValidationLevels.Weak_Chain, last_audited_at=None,                   refreshed_at=None, encr_used=False, sign_used=False): +        self._gpgbinary = gpgbinary          self.address = address          if not uids and address:              self.uids = [address] @@ -218,6 +211,57 @@ class EncryptionKey(object):          self.sign_used = sign_used          self._index = len(self.__slots__) +    @property +    def signatures(self): +        """ +        Get the key signatures + +        :return: the key IDs that have signed the key +        :rtype: list(str) +        """ +        with TempGPGWrapper(keys=[self], gpgbinary=self._gpgbinary) as gpg: +            res = gpg.list_sigs(self.fingerprint) +            for uid, sigs in res.sigs.iteritems(): +                if parse_address(uid) in self.uids: +                    return sigs + +        return [] + +    def merge(self, newkey): +        if newkey.fingerprint != self.fingerprint: +            logger.critical( +                "Can't put a key whith the same key_id and different " +                "fingerprint: %s, %s" +                % (newkey.fingerprint, self.fingerprint)) +            raise errors.KeyFingerprintMismatch(newkey.fingerprint) + +        with TempGPGWrapper(gpgbinary=self._gpgbinary) as gpg: +            gpg.import_keys(self.key_data) +            gpg.import_keys(newkey.key_data) +            gpgkey = gpg.list_keys(secret=newkey.private).pop() + +            if gpgkey['expires']: +                self.expiry_date = datetime.fromtimestamp( +                    int(gpgkey['expires'])) +            else: +                self.expiry_date = None + +            self.uids = [] +            for uid in gpgkey['uids']: +                self.uids.append(parse_address(uid)) + +            self.length = int(gpgkey['length']) +            self.key_data = gpg.export_keys(gpgkey['fingerprint'], +                                            secret=self.private) + +        if newkey.validation > self.validation: +            self.validation = newkey.validation +        if newkey.last_audited_at > self.last_audited_at: +            self.validation = newkey.last_audited_at +        self.encr_used = newkey.encr_used or self.encr_used +        self.sign_used = newkey.sign_used or self.sign_used +        self.refreshed_at = datetime.now() +      def get_json(self):          """          Return a JSON string describing this key. @@ -295,6 +339,22 @@ class EncryptionKey(object):              "priv" if self.private else "publ") +def parse_address(address): +    """ +    Remove name, '<', '>' and the identity suffix after the '+' until the '@' +    e.g.: test_user+something@provider.com becomes test_user@provider.com +    since the key belongs to the identity without the '+' suffix. + +    :type address: str +    :rtype: str +    """ +    mail_regex = '(.*<)?([\w.-]+)(\+.*)?(@[\w.-]+)(>.*)?' +    match = re.match(mail_regex, address) +    if match is None: +        return None +    return ''.join(match.group(2, 4)) + +  def init_indexes(soledad):      """      Initialize the database indexes. diff --git a/keymanager/src/leap/keymanager/openpgp.py b/keymanager/src/leap/keymanager/openpgp.py index 301c46c..f8cc976 100644 --- a/keymanager/src/leap/keymanager/openpgp.py +++ b/keymanager/src/leap/keymanager/openpgp.py @@ -35,9 +35,10 @@ from leap.common.check import leap_assert, leap_assert_type, leap_check  from leap.keymanager import errors  from leap.keymanager.wrapper import TempGPGWrapper  from leap.keymanager.keys import ( -    EncryptionKey, +    OpenPGPKey,      init_indexes,      is_address, +    parse_address,      build_key_from_dict,      TYPE_FINGERPRINT_PRIVATE_INDEX,      TYPE_ADDRESS_PRIVATE_INDEX, @@ -70,87 +71,10 @@ def from_thread(func, *args, **kwargs):      return cpu_core_semaphore.run(call) -def _parse_address(address): -    """ -    Remove name, '<', '>' and the identity suffix after the '+' until the '@' -    e.g.: test_user+something@provider.com becomes test_user@provider.com -    since the key belongs to the identity without the '+' suffix. - -    :type address: str -    :rtype: str -    """ -    mail_regex = '(.*<)?([\w.-]+)(\+.*)?(@[\w.-]+)(>.*)?' -    match = re.match(mail_regex, address) -    if match is None: -        return None -    return ''.join(match.group(2, 4)) - -  #  # The OpenPGP wrapper  # -class OpenPGPKey(EncryptionKey): -    """ -    Base class for OpenPGP keys. -    """ - -    def __init__(self, address=None, gpgbinary=None, **kwargs): -        self._gpgbinary = gpgbinary -        super(OpenPGPKey, self).__init__(address, **kwargs) - -    @property -    def signatures(self): -        """ -        Get the key signatures - -        :return: the key IDs that have signed the key -        :rtype: list(str) -        """ -        with TempGPGWrapper(keys=[self], gpgbinary=self._gpgbinary) as gpg: -            res = gpg.list_sigs(self.fingerprint) -            for uid, sigs in res.sigs.iteritems(): -                if _parse_address(uid) in self.uids: -                    return sigs - -        return [] - -    def merge(self, newkey): -        if newkey.fingerprint != self.fingerprint: -            logger.critical( -                "Can't put a key whith the same key_id and different " -                "fingerprint: %s, %s" -                % (newkey.fingerprint, self.fingerprint)) -            raise errors.KeyFingerprintMismatch(newkey.fingerprint) - -        with TempGPGWrapper(gpgbinary=self._gpgbinary) as gpg: -            gpg.import_keys(self.key_data) -            gpg.import_keys(newkey.key_data) -            gpgkey = gpg.list_keys(secret=newkey.private).pop() - -            if gpgkey['expires']: -                self.expiry_date = datetime.fromtimestamp( -                    int(gpgkey['expires'])) -            else: -                self.expiry_date = None - -            self.uids = [] -            for uid in gpgkey['uids']: -                self.uids.append(_parse_address(uid)) - -            self.length = int(gpgkey['length']) -            self.key_data = gpg.export_keys(gpgkey['fingerprint'], -                                            secret=self.private) - -        if newkey.validation > self.validation: -            self.validation = newkey.validation -        if newkey.last_audited_at > self.last_audited_at: -            self.validation = newkey.last_audited_at -        self.encr_used = newkey.encr_used or self.encr_used -        self.sign_used = newkey.sign_used or self.sign_used -        self.refreshed_at = datetime.now() - -  class OpenPGPScheme(object):      """      A wrapper for OpenPGP keys management and use (encryption, decyption, @@ -298,7 +222,7 @@ class OpenPGPScheme(object):                   local storage.          :rtype: Deferred          """ -        address = _parse_address(address) +        address = parse_address(address)          def build_key((keydoc, activedoc)):              if keydoc is None: @@ -868,7 +792,7 @@ def build_gpg_key(key_info, key_data, address=None, gpgbinary=None):          expiry_date = datetime.fromtimestamp(int(key_info['expires']))      uids = []      for uid in key_info['uids']: -        uids.append(_parse_address(uid)) +        uids.append(parse_address(uid))      if address and address not in uids:          raise errors.KeyAddressMismatch("UIDs %s found, but expected %s"                                          % (str(uids), address)) diff --git a/keymanager/src/leap/keymanager/tests/test_openpgp.py b/keymanager/src/leap/keymanager/tests/test_openpgp.py index acb2c1c..0e39dab 100644 --- a/keymanager/src/leap/keymanager/tests/test_openpgp.py +++ b/keymanager/src/leap/keymanager/tests/test_openpgp.py @@ -33,7 +33,7 @@ from leap.keymanager.keys import (      TYPE_FINGERPRINT_PRIVATE_INDEX,      TYPE_ADDRESS_PRIVATE_INDEX,  ) -from leap.keymanager.openpgp import OpenPGPKey +from leap.keymanager.keys import OpenPGPKey  from leap.keymanager.tests import (      KeyManagerWithSoledadTestCase,      ADDRESS, | 
