summaryrefslogtreecommitdiff
path: root/src/leap/keymanager/openpgp.py
diff options
context:
space:
mode:
authordrebs <drebs@leap.se>2014-11-25 12:30:40 -0200
committerdrebs <drebs@leap.se>2014-11-25 12:30:40 -0200
commit22fddf85a3fa50f7ab164b2a9311cb01a9818e62 (patch)
tree84f4552818b9efa7858a3e9f5f5c0a94bd04a7cc /src/leap/keymanager/openpgp.py
parent27776fbab6fe963082a882dfb5232c54b0195d5f (diff)
parent2f29739946db6cd360296639830a3bfe7d8c3f31 (diff)
Merge branch 'feature/6299_new_doc' into develop
Diffstat (limited to 'src/leap/keymanager/openpgp.py')
-rw-r--r--src/leap/keymanager/openpgp.py212
1 files changed, 149 insertions, 63 deletions
diff --git a/src/leap/keymanager/openpgp.py b/src/leap/keymanager/openpgp.py
index d3c305e..3f298f7 100644
--- a/src/leap/keymanager/openpgp.py
+++ b/src/leap/keymanager/openpgp.py
@@ -36,12 +36,14 @@ from leap.keymanager.keys import (
EncryptionScheme,
is_address,
build_key_from_dict,
- KEYMANAGER_KEY_TAG,
- TAGS_ADDRESS_PRIVATE_INDEX,
+ TYPE_ID_PRIVATE_INDEX,
+ TYPE_ADDRESS_PRIVATE_INDEX,
+ KEY_ADDRESS_KEY,
KEY_FINGERPRINT_KEY,
KEY_DATA_KEY,
+ KEY_ID_KEY,
+ KEYMANAGER_ACTIVE_TYPE,
)
-from leap.keymanager.validation import ValidationLevel
logger = logging.getLogger(__name__)
@@ -109,9 +111,9 @@ class TempGPGWrapper(object):
# itself is enough to also have the public key in the keyring,
# and we want to count the keys afterwards.
- privaddrs = map(lambda privkey: privkey.address, privkeys)
+ privids = map(lambda privkey: privkey.key_id, privkeys)
publkeys = filter(
- lambda pubkey: pubkey.address not in privaddrs, publkeys)
+ lambda pubkey: pubkey.key_id not in privids, publkeys)
listkeys = lambda: self._gpg.list_keys()
listsecretkeys = lambda: self._gpg.list_keys(secret=True)
@@ -162,16 +164,13 @@ class TempGPGWrapper(object):
shutil.rmtree(self._gpg.homedir)
-def _build_key_from_gpg(address, key, key_data):
+def _build_key_from_gpg(key, key_data):
"""
- Build an OpenPGPKey for C{address} based on C{key} from
- local gpg storage.
+ Build an OpenPGPKey based on C{key} from local gpg storage.
ASCII armored GPG key data has to be queried independently in this
wrapper, so we receive it in C{key_data}.
- :param address: The address bound to the key.
- :type address: str
:param key: Key obtained from GPG storage.
:type key: dict
:param key_data: Key data obtained from GPG storage.
@@ -182,6 +181,9 @@ def _build_key_from_gpg(address, key, key_data):
expiry_date = None
if key['expires']:
expiry_date = datetime.fromtimestamp(int(key['expires']))
+ address = []
+ for uid in key['uids']:
+ address.append(_parse_address(uid))
return OpenPGPKey(
address,
@@ -189,12 +191,28 @@ def _build_key_from_gpg(address, key, key_data):
fingerprint=key['fingerprint'],
key_data=key_data,
private=True if key['type'] == 'sec' else False,
- length=key['length'],
+ length=int(key['length']),
expiry_date=expiry_date,
- validation=ValidationLevel.Weak_Chain,
+ refreshed_at=datetime.now(),
)
+def _parse_address(address):
+ """
+ Remove name, '<', '>' and the identity suffix after the '+' until the '@'
+ e.g.: test_user+something@provider.com becomes test_user@provider.com
+ since the key belongs to the identity without the '+' suffix.
+
+ :type address: str
+ :rtype: str
+ """
+ mail_regex = '(.*<)?([\w.-]+)(\+.*)?(@[\w.-]+)(>.*)?'
+ match = re.match(mail_regex, address)
+ if match is None:
+ return None
+ return ''.join(match.group(2, 4))
+
+
#
# The OpenPGP wrapper
#
@@ -211,6 +229,10 @@ class OpenPGPScheme(EncryptionScheme):
signing and verification).
"""
+ # type used on the soledad documents
+ KEY_TYPE = OpenPGPKey.__name__
+ ACTIVE_TYPE = KEY_TYPE + KEYMANAGER_ACTIVE_TYPE
+
def __init__(self, soledad, gpgbinary=None):
"""
Initialize the OpenPGP wrapper.
@@ -272,16 +294,18 @@ class OpenPGPScheme(EncryptionScheme):
leap_assert(
len(key['uids']) is 1, # with just one uid!
'Wrong number of uids for key: %d.' % len(key['uids']))
- leap_assert(
- re.match('.*<%s>$' % address, key['uids'][0]) is not None,
- 'Key not correctly bound to address.')
+ uid_match = False
+ for uid in key['uids']:
+ if re.match('.*<%s>$' % address, uid) is not None:
+ uid_match = True
+ return
+ leap_assert(uid_match, 'Key not correctly bound to address.')
# insert both public and private keys in storage
for secret in [True, False]:
key = gpg.list_keys(secret=secret).pop()
openpgp_key = _build_key_from_gpg(
- address, key,
- gpg.export_keys(key['fingerprint'], secret=secret))
- self.put_key(openpgp_key)
+ key, gpg.export_keys(key['fingerprint'], secret=secret))
+ self.put_key(openpgp_key, address)
return self.get_key(address, private=True)
@@ -298,15 +322,15 @@ class OpenPGPScheme(EncryptionScheme):
:rtype: OpenPGPKey
@raise KeyNotFound: If the key was not found on local storage.
"""
- # Remove the identity suffix after the '+' until the '@'
- # e.g.: test_user+something@provider.com becomes test_user@provider.com
- # since the key belongs to the identity without the '+' suffix.
- address = re.sub(r'\+.*\@', '@', address)
+ address = _parse_address(address)
doc = self._get_key_doc(address, private)
if doc is None:
raise errors.KeyNotFound(address)
- return build_key_from_dict(OpenPGPKey, address, doc.content)
+ leap_assert(
+ address in doc.content[KEY_ADDRESS_KEY],
+ 'Wrong address in key data.')
+ return build_key_from_dict(OpenPGPKey, doc.content)
def parse_ascii_key(self, key_data):
"""
@@ -323,7 +347,6 @@ class OpenPGPScheme(EncryptionScheme):
leap_assert_type(key_data, (str, unicode))
# TODO: add more checks for correct key data.
leap_assert(key_data is not None, 'Data does not represent a key.')
- mail_regex = '.*<([\w.-]+@[\w.-]+)>.*'
with self._temporary_gpgwrapper() as gpg:
# TODO: inspect result, or use decorator
@@ -340,42 +363,31 @@ class OpenPGPScheme(EncryptionScheme):
except IndexError:
return (None, None)
- # extract adress from first uid on key
- match = re.match(mail_regex, pubkey['uids'].pop())
- leap_assert(match is not None, 'No user address in key data.')
- address = match.group(1)
-
openpgp_privkey = None
if privkey is not None:
- match = re.match(mail_regex, privkey['uids'].pop())
- leap_assert(match is not None, 'No user address in key data.')
- privaddress = match.group(1)
-
# build private key
openpgp_privkey = _build_key_from_gpg(
- privaddress, privkey,
+ privkey,
gpg.export_keys(privkey['fingerprint'], secret=True))
-
- leap_check(address == privaddress,
- 'Addresses in public and private key differ.',
- errors.KeyAddressMismatch)
leap_check(pubkey['fingerprint'] == privkey['fingerprint'],
'Fingerprints for public and private key differ.',
errors.KeyFingerprintMismatch)
# build public key
openpgp_pubkey = _build_key_from_gpg(
- address, pubkey,
+ pubkey,
gpg.export_keys(pubkey['fingerprint'], secret=False))
return (openpgp_pubkey, openpgp_privkey)
- def put_ascii_key(self, key_data):
+ def put_ascii_key(self, key_data, address):
"""
Put key contained in ascii-armored C{key_data} in local storage.
:param key_data: The key data to be stored.
:type key_data: str or unicode
+ :param address: address for which this key will be active
+ :type address: str
"""
leap_assert_type(key_data, (str, unicode))
@@ -386,21 +398,35 @@ class OpenPGPScheme(EncryptionScheme):
leap_assert(False, repr(e))
if openpgp_pubkey is not None:
- self.put_key(openpgp_pubkey)
+ self.put_key(openpgp_pubkey, address)
if openpgp_privkey is not None:
- self.put_key(openpgp_privkey)
+ self.put_key(openpgp_privkey, address)
- def put_key(self, key):
+ def put_key(self, key, address):
"""
Put C{key} in local storage.
:param key: The key to be stored.
:type key: OpenPGPKey
+ :param address: address for which this key will be active.
+ :type address: str
"""
- doc = self._get_key_doc(key.address, private=key.private)
- if doc is None:
- self._soledad.create_doc_from_json(key.get_json())
- else:
+ self._put_key_doc(key)
+ self._put_active_doc(key, address)
+
+ def _put_key_doc(self, key):
+ """
+ Put key document in soledad
+
+ :type key: OpenPGPKey
+ """
+ docs = self._soledad.get_from_index(
+ TYPE_ID_PRIVATE_INDEX,
+ self.KEY_TYPE,
+ key.key_id,
+ '1' if key.private else '0')
+ if len(docs) != 0:
+ doc = docs.pop()
if key.fingerprint == doc.content[KEY_FINGERPRINT_KEY]:
# in case of an update of the key merge them with gnupg
with self._temporary_gpgwrapper() as gpg:
@@ -408,11 +434,43 @@ class OpenPGPScheme(EncryptionScheme):
gpg.import_keys(key.key_data)
gpgkey = gpg.list_keys(secret=key.private).pop()
key = _build_key_from_gpg(
- key.address, gpgkey,
+ gpgkey,
gpg.export_keys(gpgkey['fingerprint'],
secret=key.private))
- doc.set_json(key.get_json())
+ doc.set_json(key.get_json())
+ self._soledad.put_doc(doc)
+ else:
+ logger.critical(
+ "Can't put a key whith the same key_id and different "
+ "fingerprint: %s, %s"
+ % (key.fingerprint, doc.content[KEY_FINGERPRINT_KEY]))
+ else:
+ self._soledad.create_doc_from_json(key.get_json())
+
+ def _put_active_doc(self, key, address):
+ """
+ Put active key document in soledad
+
+ :type key: OpenPGPKey
+ :type addresses: str
+ """
+ docs = self._soledad.get_from_index(
+ TYPE_ADDRESS_PRIVATE_INDEX,
+ self.ACTIVE_TYPE,
+ address,
+ '1' if key.private else '0')
+ if len(docs) == 1:
+ doc = docs.pop()
+ doc.set_json(key.get_active_json(address))
self._soledad.put_doc(doc)
+ else:
+ if len(docs) > 1:
+ logger.error("There is more than one active key document "
+ "for the address %s" % (address,))
+ for doc in docs:
+ self._soledad.delete_doc(doc)
+ self._soledad.create_doc_from_json(
+ key.get_active_json(address))
def _get_key_doc(self, address, private=False):
"""
@@ -427,17 +485,26 @@ class OpenPGPScheme(EncryptionScheme):
:return: The document with the key or None if it does not exist.
:rtype: leap.soledad.document.SoledadDocument
"""
- doclist = self._soledad.get_from_index(
- TAGS_ADDRESS_PRIVATE_INDEX,
- KEYMANAGER_KEY_TAG,
+ activedoc = self._soledad.get_from_index(
+ TYPE_ADDRESS_PRIVATE_INDEX,
+ self.ACTIVE_TYPE,
address,
'1' if private else '0')
- if len(doclist) is 0:
+ if len(activedoc) is 0:
return None
leap_assert(
+ len(activedoc) is 1,
+ 'Found more than one key for address %s!' % (address,))
+
+ key_id = activedoc[0].content[KEY_ID_KEY]
+ doclist = self._soledad.get_from_index(
+ TYPE_ID_PRIVATE_INDEX,
+ self.KEY_TYPE,
+ key_id,
+ '1' if private else '0')
+ leap_assert(
len(doclist) is 1,
- 'Found more than one %s key for address!' %
- 'private' if private else 'public')
+ 'There is %d keys for id %s!' % (len(doclist), key_id))
return doclist.pop()
def delete_key(self, key):
@@ -446,18 +513,37 @@ class OpenPGPScheme(EncryptionScheme):
May raise:
errors.KeyNotFound
- errors.KeyAttributesDiffer
:param key: The key to be removed.
:type key: EncryptionKey
"""
leap_assert_type(key, OpenPGPKey)
- stored_key = self.get_key(key.address, private=key.private)
- if stored_key is None:
+ activedocs = self._soledad.get_from_index(
+ TYPE_ID_PRIVATE_INDEX,
+ self.ACTIVE_TYPE,
+ key.key_id,
+ '1' if key.private else '0')
+ for doc in activedocs:
+ self._soledad.delete_doc(doc)
+
+ docs = self._soledad.get_from_index(
+ TYPE_ID_PRIVATE_INDEX,
+ self.KEY_TYPE,
+ key.key_id,
+ '1' if key.private else '0')
+ if len(docs) == 0:
+ raise errors.KeyNotFound(key)
+ if len(docs) > 1:
+ logger.critical("There is more than one key for key_id %s"
+ % key.key_id)
+
+ doc = None
+ for d in docs:
+ if d.content['fingerprint'] == key.fingerprint:
+ doc = d
+ break
+ if doc is None:
raise errors.KeyNotFound(key)
- if stored_key.__dict__ != key.__dict__:
- raise errors.KeyAttributesDiffer(key)
- doc = self._get_key_doc(key.address, key.private)
self._soledad.delete_doc(doc)
#
@@ -656,8 +742,8 @@ class OpenPGPScheme(EncryptionScheme):
verified against this detached signature.
:type detached_sig: str
- :return: The ascii-armored signed data.
- :rtype: str
+ :return: signature matches
+ :rtype: bool
"""
leap_assert_type(pubkey, OpenPGPKey)
leap_assert(pubkey.private is False)