diff options
| author | drebs <drebs@leap.se> | 2014-11-25 12:30:40 -0200 | 
|---|---|---|
| committer | drebs <drebs@leap.se> | 2014-11-25 12:30:40 -0200 | 
| commit | 22fddf85a3fa50f7ab164b2a9311cb01a9818e62 (patch) | |
| tree | 84f4552818b9efa7858a3e9f5f5c0a94bd04a7cc /src/leap/keymanager/openpgp.py | |
| parent | 27776fbab6fe963082a882dfb5232c54b0195d5f (diff) | |
| parent | 2f29739946db6cd360296639830a3bfe7d8c3f31 (diff) | |
Merge branch 'feature/6299_new_doc' into develop
Diffstat (limited to 'src/leap/keymanager/openpgp.py')
| -rw-r--r-- | src/leap/keymanager/openpgp.py | 212 | 
1 files changed, 149 insertions, 63 deletions
| diff --git a/src/leap/keymanager/openpgp.py b/src/leap/keymanager/openpgp.py index d3c305e2..3f298f71 100644 --- a/src/leap/keymanager/openpgp.py +++ b/src/leap/keymanager/openpgp.py @@ -36,12 +36,14 @@ from leap.keymanager.keys import (      EncryptionScheme,      is_address,      build_key_from_dict, -    KEYMANAGER_KEY_TAG, -    TAGS_ADDRESS_PRIVATE_INDEX, +    TYPE_ID_PRIVATE_INDEX, +    TYPE_ADDRESS_PRIVATE_INDEX, +    KEY_ADDRESS_KEY,      KEY_FINGERPRINT_KEY,      KEY_DATA_KEY, +    KEY_ID_KEY, +    KEYMANAGER_ACTIVE_TYPE,  ) -from leap.keymanager.validation import ValidationLevel  logger = logging.getLogger(__name__) @@ -109,9 +111,9 @@ class TempGPGWrapper(object):          # itself is enough to also have the public key in the keyring,          # and we want to count the keys afterwards. -        privaddrs = map(lambda privkey: privkey.address, privkeys) +        privids = map(lambda privkey: privkey.key_id, privkeys)          publkeys = filter( -            lambda pubkey: pubkey.address not in privaddrs, publkeys) +            lambda pubkey: pubkey.key_id not in privids, publkeys)          listkeys = lambda: self._gpg.list_keys()          listsecretkeys = lambda: self._gpg.list_keys(secret=True) @@ -162,16 +164,13 @@ class TempGPGWrapper(object):              shutil.rmtree(self._gpg.homedir) -def _build_key_from_gpg(address, key, key_data): +def _build_key_from_gpg(key, key_data):      """ -    Build an OpenPGPKey for C{address} based on C{key} from -    local gpg storage. +    Build an OpenPGPKey based on C{key} from local gpg storage.      ASCII armored GPG key data has to be queried independently in this      wrapper, so we receive it in C{key_data}. -    :param address: The address bound to the key. -    :type address: str      :param key: Key obtained from GPG storage.      :type key: dict      :param key_data: Key data obtained from GPG storage. @@ -182,6 +181,9 @@ def _build_key_from_gpg(address, key, key_data):      expiry_date = None      if key['expires']:          expiry_date = datetime.fromtimestamp(int(key['expires'])) +    address = [] +    for uid in key['uids']: +        address.append(_parse_address(uid))      return OpenPGPKey(          address, @@ -189,12 +191,28 @@ def _build_key_from_gpg(address, key, key_data):          fingerprint=key['fingerprint'],          key_data=key_data,          private=True if key['type'] == 'sec' else False, -        length=key['length'], +        length=int(key['length']),          expiry_date=expiry_date, -        validation=ValidationLevel.Weak_Chain, +        refreshed_at=datetime.now(),      ) +def _parse_address(address): +    """ +    Remove name, '<', '>' and the identity suffix after the '+' until the '@' +    e.g.: test_user+something@provider.com becomes test_user@provider.com +    since the key belongs to the identity without the '+' suffix. + +    :type address: str +    :rtype: str +    """ +    mail_regex = '(.*<)?([\w.-]+)(\+.*)?(@[\w.-]+)(>.*)?' +    match = re.match(mail_regex, address) +    if match is None: +        return None +    return ''.join(match.group(2, 4)) + +  #  # The OpenPGP wrapper  # @@ -211,6 +229,10 @@ class OpenPGPScheme(EncryptionScheme):      signing and verification).      """ +    # type used on the soledad documents +    KEY_TYPE = OpenPGPKey.__name__ +    ACTIVE_TYPE = KEY_TYPE + KEYMANAGER_ACTIVE_TYPE +      def __init__(self, soledad, gpgbinary=None):          """          Initialize the OpenPGP wrapper. @@ -272,16 +294,18 @@ class OpenPGPScheme(EncryptionScheme):              leap_assert(                  len(key['uids']) is 1,  # with just one uid!                  'Wrong number of uids for key: %d.' % len(key['uids'])) -            leap_assert( -                re.match('.*<%s>$' % address, key['uids'][0]) is not None, -                'Key not correctly bound to address.') +            uid_match = False +            for uid in key['uids']: +                if re.match('.*<%s>$' % address, uid) is not None: +                    uid_match = True +                    return +            leap_assert(uid_match, 'Key not correctly bound to address.')              # insert both public and private keys in storage              for secret in [True, False]:                  key = gpg.list_keys(secret=secret).pop()                  openpgp_key = _build_key_from_gpg( -                    address, key, -                    gpg.export_keys(key['fingerprint'], secret=secret)) -                self.put_key(openpgp_key) +                    key, gpg.export_keys(key['fingerprint'], secret=secret)) +                self.put_key(openpgp_key, address)          return self.get_key(address, private=True) @@ -298,15 +322,15 @@ class OpenPGPScheme(EncryptionScheme):          :rtype: OpenPGPKey          @raise KeyNotFound: If the key was not found on local storage.          """ -        # Remove the identity suffix after the '+' until the '@' -        # e.g.: test_user+something@provider.com becomes test_user@provider.com -        # since the key belongs to the identity without the '+' suffix. -        address = re.sub(r'\+.*\@', '@', address) +        address = _parse_address(address)          doc = self._get_key_doc(address, private)          if doc is None:              raise errors.KeyNotFound(address) -        return build_key_from_dict(OpenPGPKey, address, doc.content) +        leap_assert( +            address in doc.content[KEY_ADDRESS_KEY], +            'Wrong address in key data.') +        return build_key_from_dict(OpenPGPKey, doc.content)      def parse_ascii_key(self, key_data):          """ @@ -323,7 +347,6 @@ class OpenPGPScheme(EncryptionScheme):          leap_assert_type(key_data, (str, unicode))          # TODO: add more checks for correct key data.          leap_assert(key_data is not None, 'Data does not represent a key.') -        mail_regex = '.*<([\w.-]+@[\w.-]+)>.*'          with self._temporary_gpgwrapper() as gpg:              # TODO: inspect result, or use decorator @@ -340,42 +363,31 @@ class OpenPGPScheme(EncryptionScheme):              except IndexError:                  return (None, None) -            # extract adress from first uid on key -            match = re.match(mail_regex, pubkey['uids'].pop()) -            leap_assert(match is not None, 'No user address in key data.') -            address = match.group(1) -              openpgp_privkey = None              if privkey is not None: -                match = re.match(mail_regex, privkey['uids'].pop()) -                leap_assert(match is not None, 'No user address in key data.') -                privaddress = match.group(1) -                  # build private key                  openpgp_privkey = _build_key_from_gpg( -                    privaddress, privkey, +                    privkey,                      gpg.export_keys(privkey['fingerprint'], secret=True)) - -                leap_check(address == privaddress, -                           'Addresses in public and private key differ.', -                           errors.KeyAddressMismatch)                  leap_check(pubkey['fingerprint'] == privkey['fingerprint'],                             'Fingerprints for public and private key differ.',                             errors.KeyFingerprintMismatch)              # build public key              openpgp_pubkey = _build_key_from_gpg( -                address, pubkey, +                pubkey,                  gpg.export_keys(pubkey['fingerprint'], secret=False))              return (openpgp_pubkey, openpgp_privkey) -    def put_ascii_key(self, key_data): +    def put_ascii_key(self, key_data, address):          """          Put key contained in ascii-armored C{key_data} in local storage.          :param key_data: The key data to be stored.          :type key_data: str or unicode +        :param address: address for which this key will be active +        :type address: str          """          leap_assert_type(key_data, (str, unicode)) @@ -386,21 +398,35 @@ class OpenPGPScheme(EncryptionScheme):              leap_assert(False, repr(e))          if openpgp_pubkey is not None: -            self.put_key(openpgp_pubkey) +            self.put_key(openpgp_pubkey, address)          if openpgp_privkey is not None: -            self.put_key(openpgp_privkey) +            self.put_key(openpgp_privkey, address) -    def put_key(self, key): +    def put_key(self, key, address):          """          Put C{key} in local storage.          :param key: The key to be stored.          :type key: OpenPGPKey +        :param address: address for which this key will be active. +        :type address: str          """ -        doc = self._get_key_doc(key.address, private=key.private) -        if doc is None: -            self._soledad.create_doc_from_json(key.get_json()) -        else: +        self._put_key_doc(key) +        self._put_active_doc(key, address) + +    def _put_key_doc(self, key): +        """ +        Put key document in soledad + +        :type key: OpenPGPKey +        """ +        docs = self._soledad.get_from_index( +            TYPE_ID_PRIVATE_INDEX, +            self.KEY_TYPE, +            key.key_id, +            '1' if key.private else '0') +        if len(docs) != 0: +            doc = docs.pop()              if key.fingerprint == doc.content[KEY_FINGERPRINT_KEY]:                  # in case of an update of the key merge them with gnupg                  with self._temporary_gpgwrapper() as gpg: @@ -408,11 +434,43 @@ class OpenPGPScheme(EncryptionScheme):                      gpg.import_keys(key.key_data)                      gpgkey = gpg.list_keys(secret=key.private).pop()                      key = _build_key_from_gpg( -                        key.address, gpgkey, +                        gpgkey,                          gpg.export_keys(gpgkey['fingerprint'],                                          secret=key.private)) -            doc.set_json(key.get_json()) +                doc.set_json(key.get_json()) +                self._soledad.put_doc(doc) +            else: +                logger.critical( +                    "Can't put a key whith the same key_id and different " +                    "fingerprint: %s, %s" +                    % (key.fingerprint, doc.content[KEY_FINGERPRINT_KEY])) +        else: +            self._soledad.create_doc_from_json(key.get_json()) + +    def _put_active_doc(self, key, address): +        """ +        Put active key document in soledad + +        :type key: OpenPGPKey +        :type addresses: str +        """ +        docs = self._soledad.get_from_index( +            TYPE_ADDRESS_PRIVATE_INDEX, +            self.ACTIVE_TYPE, +            address, +            '1' if key.private else '0') +        if len(docs) == 1: +            doc = docs.pop() +            doc.set_json(key.get_active_json(address))              self._soledad.put_doc(doc) +        else: +            if len(docs) > 1: +                logger.error("There is more than one active key document " +                             "for the address %s" % (address,)) +                for doc in docs: +                    self._soledad.delete_doc(doc) +            self._soledad.create_doc_from_json( +                key.get_active_json(address))      def _get_key_doc(self, address, private=False):          """ @@ -427,17 +485,26 @@ class OpenPGPScheme(EncryptionScheme):          :return: The document with the key or None if it does not exist.          :rtype: leap.soledad.document.SoledadDocument          """ -        doclist = self._soledad.get_from_index( -            TAGS_ADDRESS_PRIVATE_INDEX, -            KEYMANAGER_KEY_TAG, +        activedoc = self._soledad.get_from_index( +            TYPE_ADDRESS_PRIVATE_INDEX, +            self.ACTIVE_TYPE,              address,              '1' if private else '0') -        if len(doclist) is 0: +        if len(activedoc) is 0:              return None          leap_assert( +            len(activedoc) is 1, +            'Found more than one key for address %s!' % (address,)) + +        key_id = activedoc[0].content[KEY_ID_KEY] +        doclist = self._soledad.get_from_index( +            TYPE_ID_PRIVATE_INDEX, +            self.KEY_TYPE, +            key_id, +            '1' if private else '0') +        leap_assert(              len(doclist) is 1, -            'Found more than one %s key for address!' % -            'private' if private else 'public') +            'There is %d keys for id %s!' % (len(doclist), key_id))          return doclist.pop()      def delete_key(self, key): @@ -446,18 +513,37 @@ class OpenPGPScheme(EncryptionScheme):          May raise:              errors.KeyNotFound -            errors.KeyAttributesDiffer          :param key: The key to be removed.          :type key: EncryptionKey          """          leap_assert_type(key, OpenPGPKey) -        stored_key = self.get_key(key.address, private=key.private) -        if stored_key is None: +        activedocs = self._soledad.get_from_index( +            TYPE_ID_PRIVATE_INDEX, +            self.ACTIVE_TYPE, +            key.key_id, +            '1' if key.private else '0') +        for doc in activedocs: +            self._soledad.delete_doc(doc) + +        docs = self._soledad.get_from_index( +            TYPE_ID_PRIVATE_INDEX, +            self.KEY_TYPE, +            key.key_id, +            '1' if key.private else '0') +        if len(docs) == 0: +            raise errors.KeyNotFound(key) +        if len(docs) > 1: +            logger.critical("There is more than one key for key_id %s" +                            % key.key_id) + +        doc = None +        for d in docs: +            if d.content['fingerprint'] == key.fingerprint: +                doc = d +                break +        if doc is None:              raise errors.KeyNotFound(key) -        if stored_key.__dict__ != key.__dict__: -            raise errors.KeyAttributesDiffer(key) -        doc = self._get_key_doc(key.address, key.private)          self._soledad.delete_doc(doc)      # @@ -656,8 +742,8 @@ class OpenPGPScheme(EncryptionScheme):                               verified against this detached signature.          :type detached_sig: str -        :return: The ascii-armored signed data. -        :rtype: str +        :return: signature matches +        :rtype: bool          """          leap_assert_type(pubkey, OpenPGPKey)          leap_assert(pubkey.private is False) | 
