summaryrefslogtreecommitdiff
path: root/keymanager/src/leap/keymanager/openpgp.py
diff options
context:
space:
mode:
Diffstat (limited to 'keymanager/src/leap/keymanager/openpgp.py')
-rw-r--r--keymanager/src/leap/keymanager/openpgp.py84
1 files changed, 4 insertions, 80 deletions
diff --git a/keymanager/src/leap/keymanager/openpgp.py b/keymanager/src/leap/keymanager/openpgp.py
index 301c46c..f8cc976 100644
--- a/keymanager/src/leap/keymanager/openpgp.py
+++ b/keymanager/src/leap/keymanager/openpgp.py
@@ -35,9 +35,10 @@ from leap.common.check import leap_assert, leap_assert_type, leap_check
from leap.keymanager import errors
from leap.keymanager.wrapper import TempGPGWrapper
from leap.keymanager.keys import (
- EncryptionKey,
+ OpenPGPKey,
init_indexes,
is_address,
+ parse_address,
build_key_from_dict,
TYPE_FINGERPRINT_PRIVATE_INDEX,
TYPE_ADDRESS_PRIVATE_INDEX,
@@ -70,87 +71,10 @@ def from_thread(func, *args, **kwargs):
return cpu_core_semaphore.run(call)
-def _parse_address(address):
- """
- Remove name, '<', '>' and the identity suffix after the '+' until the '@'
- e.g.: test_user+something@provider.com becomes test_user@provider.com
- since the key belongs to the identity without the '+' suffix.
-
- :type address: str
- :rtype: str
- """
- mail_regex = '(.*<)?([\w.-]+)(\+.*)?(@[\w.-]+)(>.*)?'
- match = re.match(mail_regex, address)
- if match is None:
- return None
- return ''.join(match.group(2, 4))
-
-
#
# The OpenPGP wrapper
#
-class OpenPGPKey(EncryptionKey):
- """
- Base class for OpenPGP keys.
- """
-
- def __init__(self, address=None, gpgbinary=None, **kwargs):
- self._gpgbinary = gpgbinary
- super(OpenPGPKey, self).__init__(address, **kwargs)
-
- @property
- def signatures(self):
- """
- Get the key signatures
-
- :return: the key IDs that have signed the key
- :rtype: list(str)
- """
- with TempGPGWrapper(keys=[self], gpgbinary=self._gpgbinary) as gpg:
- res = gpg.list_sigs(self.fingerprint)
- for uid, sigs in res.sigs.iteritems():
- if _parse_address(uid) in self.uids:
- return sigs
-
- return []
-
- def merge(self, newkey):
- if newkey.fingerprint != self.fingerprint:
- logger.critical(
- "Can't put a key whith the same key_id and different "
- "fingerprint: %s, %s"
- % (newkey.fingerprint, self.fingerprint))
- raise errors.KeyFingerprintMismatch(newkey.fingerprint)
-
- with TempGPGWrapper(gpgbinary=self._gpgbinary) as gpg:
- gpg.import_keys(self.key_data)
- gpg.import_keys(newkey.key_data)
- gpgkey = gpg.list_keys(secret=newkey.private).pop()
-
- if gpgkey['expires']:
- self.expiry_date = datetime.fromtimestamp(
- int(gpgkey['expires']))
- else:
- self.expiry_date = None
-
- self.uids = []
- for uid in gpgkey['uids']:
- self.uids.append(_parse_address(uid))
-
- self.length = int(gpgkey['length'])
- self.key_data = gpg.export_keys(gpgkey['fingerprint'],
- secret=self.private)
-
- if newkey.validation > self.validation:
- self.validation = newkey.validation
- if newkey.last_audited_at > self.last_audited_at:
- self.validation = newkey.last_audited_at
- self.encr_used = newkey.encr_used or self.encr_used
- self.sign_used = newkey.sign_used or self.sign_used
- self.refreshed_at = datetime.now()
-
-
class OpenPGPScheme(object):
"""
A wrapper for OpenPGP keys management and use (encryption, decyption,
@@ -298,7 +222,7 @@ class OpenPGPScheme(object):
local storage.
:rtype: Deferred
"""
- address = _parse_address(address)
+ address = parse_address(address)
def build_key((keydoc, activedoc)):
if keydoc is None:
@@ -868,7 +792,7 @@ def build_gpg_key(key_info, key_data, address=None, gpgbinary=None):
expiry_date = datetime.fromtimestamp(int(key_info['expires']))
uids = []
for uid in key_info['uids']:
- uids.append(_parse_address(uid))
+ uids.append(parse_address(uid))
if address and address not in uids:
raise errors.KeyAddressMismatch("UIDs %s found, but expected %s"
% (str(uids), address))