summaryrefslogtreecommitdiff
path: root/src/leap/keymanager/keys.py
diff options
context:
space:
mode:
Diffstat (limited to 'src/leap/keymanager/keys.py')
-rw-r--r--src/leap/keymanager/keys.py88
1 files changed, 74 insertions, 14 deletions
diff --git a/src/leap/keymanager/keys.py b/src/leap/keymanager/keys.py
index f2a12a9..1d03945 100644
--- a/src/leap/keymanager/keys.py
+++ b/src/leap/keymanager/keys.py
@@ -30,11 +30,12 @@ import re
import time
-from abc import ABCMeta
from datetime import datetime
from leap.common.check import leap_assert
from twisted.internet import defer
+from leap.keymanager import errors
+from leap.keymanager.wrapper import TempGPGWrapper
from leap.keymanager.validation import ValidationLevels
logger = logging.getLogger(__name__)
@@ -177,29 +178,21 @@ def _to_unix_time(date):
return 0
-#
-# Abstraction for encryption keys
-#
-
-class EncryptionKey(object):
+class OpenPGPKey(object):
"""
- Abstract class for encryption keys.
-
- A key is "validated" if the nicknym agent has bound the user address to a
- public key.
+ Base class for OpenPGP keys.
"""
- __metaclass__ = ABCMeta
-
__slots__ = ('address', 'uids', 'fingerprint', 'key_data',
'private', 'length', 'expiry_date', 'validation',
'last_audited_at', 'refreshed_at',
- 'encr_used', 'sign_used', '_index')
+ 'encr_used', 'sign_used', '_index', '_gpgbinary')
- def __init__(self, address=None, uids=[], fingerprint="",
+ def __init__(self, address=None, gpgbinary=None, uids=[], fingerprint="",
key_data="", private=False, length=0, expiry_date=None,
validation=ValidationLevels.Weak_Chain, last_audited_at=None,
refreshed_at=None, encr_used=False, sign_used=False):
+ self._gpgbinary = gpgbinary
self.address = address
if not uids and address:
self.uids = [address]
@@ -218,6 +211,57 @@ class EncryptionKey(object):
self.sign_used = sign_used
self._index = len(self.__slots__)
+ @property
+ def signatures(self):
+ """
+ Get the key signatures
+
+ :return: the key IDs that have signed the key
+ :rtype: list(str)
+ """
+ with TempGPGWrapper(keys=[self], gpgbinary=self._gpgbinary) as gpg:
+ res = gpg.list_sigs(self.fingerprint)
+ for uid, sigs in res.sigs.iteritems():
+ if parse_address(uid) in self.uids:
+ return sigs
+
+ return []
+
+ def merge(self, newkey):
+ if newkey.fingerprint != self.fingerprint:
+ logger.critical(
+ "Can't put a key whith the same key_id and different "
+ "fingerprint: %s, %s"
+ % (newkey.fingerprint, self.fingerprint))
+ raise errors.KeyFingerprintMismatch(newkey.fingerprint)
+
+ with TempGPGWrapper(gpgbinary=self._gpgbinary) as gpg:
+ gpg.import_keys(self.key_data)
+ gpg.import_keys(newkey.key_data)
+ gpgkey = gpg.list_keys(secret=newkey.private).pop()
+
+ if gpgkey['expires']:
+ self.expiry_date = datetime.fromtimestamp(
+ int(gpgkey['expires']))
+ else:
+ self.expiry_date = None
+
+ self.uids = []
+ for uid in gpgkey['uids']:
+ self.uids.append(parse_address(uid))
+
+ self.length = int(gpgkey['length'])
+ self.key_data = gpg.export_keys(gpgkey['fingerprint'],
+ secret=self.private)
+
+ if newkey.validation > self.validation:
+ self.validation = newkey.validation
+ if newkey.last_audited_at > self.last_audited_at:
+ self.validation = newkey.last_audited_at
+ self.encr_used = newkey.encr_used or self.encr_used
+ self.sign_used = newkey.sign_used or self.sign_used
+ self.refreshed_at = datetime.now()
+
def get_json(self):
"""
Return a JSON string describing this key.
@@ -295,6 +339,22 @@ class EncryptionKey(object):
"priv" if self.private else "publ")
+def parse_address(address):
+ """
+ Remove name, '<', '>' and the identity suffix after the '+' until the '@'
+ e.g.: test_user+something@provider.com becomes test_user@provider.com
+ since the key belongs to the identity without the '+' suffix.
+
+ :type address: str
+ :rtype: str
+ """
+ mail_regex = '(.*<)?([\w.-]+)(\+.*)?(@[\w.-]+)(>.*)?'
+ match = re.match(mail_regex, address)
+ if match is None:
+ return None
+ return ''.join(match.group(2, 4))
+
+
def init_indexes(soledad):
"""
Initialize the database indexes.