summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--keymanager/src/leap/keymanager/keys.py88
-rw-r--r--keymanager/src/leap/keymanager/openpgp.py84
-rw-r--r--keymanager/src/leap/keymanager/tests/test_openpgp.py2
3 files changed, 79 insertions, 95 deletions
diff --git a/keymanager/src/leap/keymanager/keys.py b/keymanager/src/leap/keymanager/keys.py
index f2a12a9b..1d039451 100644
--- a/keymanager/src/leap/keymanager/keys.py
+++ b/keymanager/src/leap/keymanager/keys.py
@@ -30,11 +30,12 @@ import re
import time
-from abc import ABCMeta
from datetime import datetime
from leap.common.check import leap_assert
from twisted.internet import defer
+from leap.keymanager import errors
+from leap.keymanager.wrapper import TempGPGWrapper
from leap.keymanager.validation import ValidationLevels
logger = logging.getLogger(__name__)
@@ -177,29 +178,21 @@ def _to_unix_time(date):
return 0
-#
-# Abstraction for encryption keys
-#
-
-class EncryptionKey(object):
+class OpenPGPKey(object):
"""
- Abstract class for encryption keys.
-
- A key is "validated" if the nicknym agent has bound the user address to a
- public key.
+ Base class for OpenPGP keys.
"""
- __metaclass__ = ABCMeta
-
__slots__ = ('address', 'uids', 'fingerprint', 'key_data',
'private', 'length', 'expiry_date', 'validation',
'last_audited_at', 'refreshed_at',
- 'encr_used', 'sign_used', '_index')
+ 'encr_used', 'sign_used', '_index', '_gpgbinary')
- def __init__(self, address=None, uids=[], fingerprint="",
+ def __init__(self, address=None, gpgbinary=None, uids=[], fingerprint="",
key_data="", private=False, length=0, expiry_date=None,
validation=ValidationLevels.Weak_Chain, last_audited_at=None,
refreshed_at=None, encr_used=False, sign_used=False):
+ self._gpgbinary = gpgbinary
self.address = address
if not uids and address:
self.uids = [address]
@@ -218,6 +211,57 @@ class EncryptionKey(object):
self.sign_used = sign_used
self._index = len(self.__slots__)
+ @property
+ def signatures(self):
+ """
+ Get the key signatures
+
+ :return: the key IDs that have signed the key
+ :rtype: list(str)
+ """
+ with TempGPGWrapper(keys=[self], gpgbinary=self._gpgbinary) as gpg:
+ res = gpg.list_sigs(self.fingerprint)
+ for uid, sigs in res.sigs.iteritems():
+ if parse_address(uid) in self.uids:
+ return sigs
+
+ return []
+
+ def merge(self, newkey):
+ if newkey.fingerprint != self.fingerprint:
+ logger.critical(
+ "Can't put a key whith the same key_id and different "
+ "fingerprint: %s, %s"
+ % (newkey.fingerprint, self.fingerprint))
+ raise errors.KeyFingerprintMismatch(newkey.fingerprint)
+
+ with TempGPGWrapper(gpgbinary=self._gpgbinary) as gpg:
+ gpg.import_keys(self.key_data)
+ gpg.import_keys(newkey.key_data)
+ gpgkey = gpg.list_keys(secret=newkey.private).pop()
+
+ if gpgkey['expires']:
+ self.expiry_date = datetime.fromtimestamp(
+ int(gpgkey['expires']))
+ else:
+ self.expiry_date = None
+
+ self.uids = []
+ for uid in gpgkey['uids']:
+ self.uids.append(parse_address(uid))
+
+ self.length = int(gpgkey['length'])
+ self.key_data = gpg.export_keys(gpgkey['fingerprint'],
+ secret=self.private)
+
+ if newkey.validation > self.validation:
+ self.validation = newkey.validation
+ if newkey.last_audited_at > self.last_audited_at:
+ self.validation = newkey.last_audited_at
+ self.encr_used = newkey.encr_used or self.encr_used
+ self.sign_used = newkey.sign_used or self.sign_used
+ self.refreshed_at = datetime.now()
+
def get_json(self):
"""
Return a JSON string describing this key.
@@ -295,6 +339,22 @@ class EncryptionKey(object):
"priv" if self.private else "publ")
+def parse_address(address):
+ """
+ Remove name, '<', '>' and the identity suffix after the '+' until the '@'
+ e.g.: test_user+something@provider.com becomes test_user@provider.com
+ since the key belongs to the identity without the '+' suffix.
+
+ :type address: str
+ :rtype: str
+ """
+ mail_regex = '(.*<)?([\w.-]+)(\+.*)?(@[\w.-]+)(>.*)?'
+ match = re.match(mail_regex, address)
+ if match is None:
+ return None
+ return ''.join(match.group(2, 4))
+
+
def init_indexes(soledad):
"""
Initialize the database indexes.
diff --git a/keymanager/src/leap/keymanager/openpgp.py b/keymanager/src/leap/keymanager/openpgp.py
index 301c46c1..f8cc9765 100644
--- a/keymanager/src/leap/keymanager/openpgp.py
+++ b/keymanager/src/leap/keymanager/openpgp.py
@@ -35,9 +35,10 @@ from leap.common.check import leap_assert, leap_assert_type, leap_check
from leap.keymanager import errors
from leap.keymanager.wrapper import TempGPGWrapper
from leap.keymanager.keys import (
- EncryptionKey,
+ OpenPGPKey,
init_indexes,
is_address,
+ parse_address,
build_key_from_dict,
TYPE_FINGERPRINT_PRIVATE_INDEX,
TYPE_ADDRESS_PRIVATE_INDEX,
@@ -70,87 +71,10 @@ def from_thread(func, *args, **kwargs):
return cpu_core_semaphore.run(call)
-def _parse_address(address):
- """
- Remove name, '<', '>' and the identity suffix after the '+' until the '@'
- e.g.: test_user+something@provider.com becomes test_user@provider.com
- since the key belongs to the identity without the '+' suffix.
-
- :type address: str
- :rtype: str
- """
- mail_regex = '(.*<)?([\w.-]+)(\+.*)?(@[\w.-]+)(>.*)?'
- match = re.match(mail_regex, address)
- if match is None:
- return None
- return ''.join(match.group(2, 4))
-
-
#
# The OpenPGP wrapper
#
-class OpenPGPKey(EncryptionKey):
- """
- Base class for OpenPGP keys.
- """
-
- def __init__(self, address=None, gpgbinary=None, **kwargs):
- self._gpgbinary = gpgbinary
- super(OpenPGPKey, self).__init__(address, **kwargs)
-
- @property
- def signatures(self):
- """
- Get the key signatures
-
- :return: the key IDs that have signed the key
- :rtype: list(str)
- """
- with TempGPGWrapper(keys=[self], gpgbinary=self._gpgbinary) as gpg:
- res = gpg.list_sigs(self.fingerprint)
- for uid, sigs in res.sigs.iteritems():
- if _parse_address(uid) in self.uids:
- return sigs
-
- return []
-
- def merge(self, newkey):
- if newkey.fingerprint != self.fingerprint:
- logger.critical(
- "Can't put a key whith the same key_id and different "
- "fingerprint: %s, %s"
- % (newkey.fingerprint, self.fingerprint))
- raise errors.KeyFingerprintMismatch(newkey.fingerprint)
-
- with TempGPGWrapper(gpgbinary=self._gpgbinary) as gpg:
- gpg.import_keys(self.key_data)
- gpg.import_keys(newkey.key_data)
- gpgkey = gpg.list_keys(secret=newkey.private).pop()
-
- if gpgkey['expires']:
- self.expiry_date = datetime.fromtimestamp(
- int(gpgkey['expires']))
- else:
- self.expiry_date = None
-
- self.uids = []
- for uid in gpgkey['uids']:
- self.uids.append(_parse_address(uid))
-
- self.length = int(gpgkey['length'])
- self.key_data = gpg.export_keys(gpgkey['fingerprint'],
- secret=self.private)
-
- if newkey.validation > self.validation:
- self.validation = newkey.validation
- if newkey.last_audited_at > self.last_audited_at:
- self.validation = newkey.last_audited_at
- self.encr_used = newkey.encr_used or self.encr_used
- self.sign_used = newkey.sign_used or self.sign_used
- self.refreshed_at = datetime.now()
-
-
class OpenPGPScheme(object):
"""
A wrapper for OpenPGP keys management and use (encryption, decyption,
@@ -298,7 +222,7 @@ class OpenPGPScheme(object):
local storage.
:rtype: Deferred
"""
- address = _parse_address(address)
+ address = parse_address(address)
def build_key((keydoc, activedoc)):
if keydoc is None:
@@ -868,7 +792,7 @@ def build_gpg_key(key_info, key_data, address=None, gpgbinary=None):
expiry_date = datetime.fromtimestamp(int(key_info['expires']))
uids = []
for uid in key_info['uids']:
- uids.append(_parse_address(uid))
+ uids.append(parse_address(uid))
if address and address not in uids:
raise errors.KeyAddressMismatch("UIDs %s found, but expected %s"
% (str(uids), address))
diff --git a/keymanager/src/leap/keymanager/tests/test_openpgp.py b/keymanager/src/leap/keymanager/tests/test_openpgp.py
index acb2c1c4..0e39dab7 100644
--- a/keymanager/src/leap/keymanager/tests/test_openpgp.py
+++ b/keymanager/src/leap/keymanager/tests/test_openpgp.py
@@ -33,7 +33,7 @@ from leap.keymanager.keys import (
TYPE_FINGERPRINT_PRIVATE_INDEX,
TYPE_ADDRESS_PRIVATE_INDEX,
)
-from leap.keymanager.openpgp import OpenPGPKey
+from leap.keymanager.keys import OpenPGPKey
from leap.keymanager.tests import (
KeyManagerWithSoledadTestCase,
ADDRESS,