summaryrefslogtreecommitdiff
path: root/keymanager/src/leap/keymanager/keys.py
diff options
context:
space:
mode:
Diffstat (limited to 'keymanager/src/leap/keymanager/keys.py')
-rw-r--r--keymanager/src/leap/keymanager/keys.py290
1 files changed, 290 insertions, 0 deletions
diff --git a/keymanager/src/leap/keymanager/keys.py b/keymanager/src/leap/keymanager/keys.py
new file mode 100644
index 0000000..91ecf3a
--- /dev/null
+++ b/keymanager/src/leap/keymanager/keys.py
@@ -0,0 +1,290 @@
+# -*- coding: utf-8 -*-
+# keys.py
+# Copyright (C) 2013-2016 LEAP
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see <http://www.gnu.org/licenses/>.
+"""
+Abstact key type and encryption scheme representations.
+"""
+
+
+import json
+import logging
+import re
+import time
+
+from datetime import datetime
+
+from leap.keymanager import errors
+from leap.keymanager.wrapper import TempGPGWrapper
+from leap.keymanager.validation import ValidationLevels
+from leap.keymanager import documents as doc
+
+logger = logging.getLogger(__name__)
+
+
+#
+# Key handling utilities
+#
+
+def is_address(address):
+ """
+ Return whether the given C{address} is in the form user@provider.
+
+ :param address: The address to be tested.
+ :type address: str
+ :return: Whether C{address} is in the form user@provider.
+ :rtype: bool
+ """
+ return bool(re.match('[\w.-]+@[\w.-]+', address))
+
+
+def build_key_from_dict(key, active=None):
+ """
+ Build an OpenPGPKey key based on info in C{kdict}.
+
+ :param key: Dictionary with key data.
+ :type key: dict
+ :param active: Dictionary with active data.
+ :type active: dict
+ :return: An instance of the key.
+ :rtype: C{kClass}
+ """
+ address = None
+ validation = ValidationLevels.Weak_Chain
+ last_audited_at = None
+ encr_used = False
+ sign_used = False
+
+ if active:
+ address = active[doc.KEY_ADDRESS_KEY]
+ try:
+ validation = ValidationLevels.get(active[doc.KEY_VALIDATION_KEY])
+ except ValueError:
+ logger.error("Not valid validation level (%s) for key %s",
+ (active[doc.KEY_VALIDATION_KEY],
+ active[doc.KEY_FINGERPRINT_KEY]))
+ last_audited_at = _to_datetime(active[doc.KEY_LAST_AUDITED_AT_KEY])
+ encr_used = active[doc.KEY_ENCR_USED_KEY]
+ sign_used = active[doc.KEY_SIGN_USED_KEY]
+
+ expiry_date = _to_datetime(key[doc.KEY_EXPIRY_DATE_KEY])
+ refreshed_at = _to_datetime(key[doc.KEY_REFRESHED_AT_KEY])
+
+ return OpenPGPKey(
+ address=address,
+ uids=key[doc.KEY_UIDS_KEY],
+ fingerprint=key[doc.KEY_FINGERPRINT_KEY],
+ key_data=key[doc.KEY_DATA_KEY],
+ private=key[doc.KEY_PRIVATE_KEY],
+ length=key[doc.KEY_LENGTH_KEY],
+ expiry_date=expiry_date,
+ last_audited_at=last_audited_at,
+ refreshed_at=refreshed_at,
+ validation=validation,
+ encr_used=encr_used,
+ sign_used=sign_used,
+ )
+
+
+def _to_datetime(unix_time):
+ if unix_time != 0:
+ return datetime.fromtimestamp(unix_time)
+ else:
+ return None
+
+
+def _to_unix_time(date):
+ if date is not None:
+ return int(time.mktime(date.timetuple()))
+ else:
+ return 0
+
+
+class OpenPGPKey(object):
+ """
+ Base class for OpenPGP keys.
+ """
+
+ __slots__ = ('address', 'uids', 'fingerprint', 'key_data',
+ 'private', 'length', 'expiry_date', 'validation',
+ 'last_audited_at', 'refreshed_at',
+ 'encr_used', 'sign_used', '_index', '_gpgbinary')
+
+ def __init__(self, address=None, gpgbinary=None, uids=[], fingerprint="",
+ key_data="", private=False, length=0, expiry_date=None,
+ validation=ValidationLevels.Weak_Chain, last_audited_at=None,
+ refreshed_at=None, encr_used=False, sign_used=False):
+ self._gpgbinary = gpgbinary
+ self.address = address
+ if not uids and address:
+ self.uids = [address]
+ else:
+ self.uids = uids
+ self.fingerprint = fingerprint
+ self.key_data = key_data
+ self.private = private
+ self.length = length
+ self.expiry_date = expiry_date
+
+ self.validation = validation
+ self.last_audited_at = last_audited_at
+ self.refreshed_at = refreshed_at
+ self.encr_used = encr_used
+ self.sign_used = sign_used
+ self._index = len(self.__slots__)
+
+ @property
+ def signatures(self):
+ """
+ Get the key signatures
+
+ :return: the key IDs that have signed the key
+ :rtype: list(str)
+ """
+ with TempGPGWrapper(keys=[self], gpgbinary=self._gpgbinary) as gpg:
+ res = gpg.list_sigs(self.fingerprint)
+ for uid, sigs in res.sigs.iteritems():
+ if parse_address(uid) in self.uids:
+ return sigs
+
+ return []
+
+ def merge(self, newkey):
+ if newkey.fingerprint != self.fingerprint:
+ logger.critical(
+ "Can't put a key whith the same key_id and different "
+ "fingerprint: %s, %s"
+ % (newkey.fingerprint, self.fingerprint))
+ raise errors.KeyFingerprintMismatch(newkey.fingerprint)
+
+ with TempGPGWrapper(gpgbinary=self._gpgbinary) as gpg:
+ gpg.import_keys(self.key_data)
+ gpg.import_keys(newkey.key_data)
+ gpgkey = gpg.list_keys(secret=newkey.private).pop()
+
+ if gpgkey['expires']:
+ self.expiry_date = datetime.fromtimestamp(
+ int(gpgkey['expires']))
+ else:
+ self.expiry_date = None
+
+ self.uids = []
+ for uid in gpgkey['uids']:
+ self.uids.append(parse_address(uid))
+
+ self.length = int(gpgkey['length'])
+ self.key_data = gpg.export_keys(gpgkey['fingerprint'],
+ secret=self.private)
+
+ if newkey.validation > self.validation:
+ self.validation = newkey.validation
+ if newkey.last_audited_at > self.last_audited_at:
+ self.validation = newkey.last_audited_at
+ self.encr_used = newkey.encr_used or self.encr_used
+ self.sign_used = newkey.sign_used or self.sign_used
+ self.refreshed_at = datetime.now()
+
+ def get_json(self):
+ """
+ Return a JSON string describing this key.
+
+ :return: The JSON string describing this key.
+ :rtype: str
+ """
+ expiry_date = _to_unix_time(self.expiry_date)
+ refreshed_at = _to_unix_time(self.refreshed_at)
+
+ return json.dumps({
+ doc.KEY_UIDS_KEY: self.uids,
+ doc.KEY_TYPE_KEY: self.__class__.__name__,
+ doc.KEY_FINGERPRINT_KEY: self.fingerprint,
+ doc.KEY_DATA_KEY: self.key_data,
+ doc.KEY_PRIVATE_KEY: self.private,
+ doc.KEY_LENGTH_KEY: self.length,
+ doc.KEY_EXPIRY_DATE_KEY: expiry_date,
+ doc.KEY_REFRESHED_AT_KEY: refreshed_at,
+ doc.KEY_VERSION_KEY: doc.KEYMANAGER_DOC_VERSION,
+ doc.KEY_TAGS_KEY: [doc.KEYMANAGER_KEY_TAG],
+ })
+
+ def get_active_json(self):
+ """
+ Return a JSON string describing this key.
+
+ :return: The JSON string describing this key.
+ :rtype: str
+ """
+ last_audited_at = _to_unix_time(self.last_audited_at)
+
+ return json.dumps({
+ doc.KEY_ADDRESS_KEY: self.address,
+ doc.KEY_TYPE_KEY: (self.__class__.__name__ +
+ doc.KEYMANAGER_ACTIVE_TYPE),
+ doc.KEY_FINGERPRINT_KEY: self.fingerprint,
+ doc.KEY_PRIVATE_KEY: self.private,
+ doc.KEY_VALIDATION_KEY: str(self.validation),
+ doc.KEY_LAST_AUDITED_AT_KEY: last_audited_at,
+ doc.KEY_ENCR_USED_KEY: self.encr_used,
+ doc.KEY_SIGN_USED_KEY: self.sign_used,
+ doc.KEY_VERSION_KEY: doc.KEYMANAGER_DOC_VERSION,
+ doc.KEY_TAGS_KEY: [doc.KEYMANAGER_ACTIVE_TAG],
+ })
+
+ def next(self):
+ if self._index == 0:
+ self._index = len(self.__slots__)
+ raise StopIteration
+
+ self._index -= 1
+ key = self.__slots__[self._index]
+
+ if key.startswith('_'):
+ return self.next()
+
+ value = getattr(self, key)
+ if key == "validation":
+ value = str(value)
+ elif key in ["expiry_date", "last_audited_at", "refreshed_at"]:
+ value = str(value)
+ return key, value
+
+ def __iter__(self):
+ return self
+
+ def __repr__(self):
+ """
+ Representation of this class
+ """
+ return u"<%s 0x%s (%s - %s)>" % (
+ self.__class__.__name__,
+ self.fingerprint,
+ self.address,
+ "priv" if self.private else "publ")
+
+
+def parse_address(address):
+ """
+ Remove name, '<', '>' and the identity suffix after the '+' until the '@'
+ e.g.: test_user+something@provider.com becomes test_user@provider.com
+ since the key belongs to the identity without the '+' suffix.
+
+ :type address: str
+ :rtype: str
+ """
+ mail_regex = '(.*<)?([\w.-]+)(\+.*)?(@[\w.-]+)(>.*)?'
+ match = re.match(mail_regex, address)
+ if match is None:
+ return None
+ return ''.join(match.group(2, 4))