# -*- coding: utf-8 -*-
# keys.py
# Copyright (C) 2013-2016 LEAP
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.

"""
Abstract key type and encryption scheme representations.
"""

try:
    import simplejson as json
except ImportError:
    import json  # noqa
import logging
import re
import time

from datetime import datetime

from leap.keymanager import errors
from leap.keymanager.wrapper import TempGPGWrapper
from leap.keymanager.validation import ValidationLevels
from leap.keymanager import documents as doc

logger = logging.getLogger(__name__)


#
# Key handling utilities
#

def is_address(address):
    """
    Return whether the given C{address} is in the form user@provider.

    Only the beginning of the string is matched, so any text following a
    valid user@provider prefix is not rejected.

    :param address: The address to be tested.
    :type address: str
    :return: Whether C{address} is in the form user@provider.
    :rtype: bool
    """
    return bool(re.match(r'[\w.-]+@[\w.-]+', address))


def build_key_from_dict(key, active=None):
    """
    Build an OpenPGPKey based on the info in C{key} and C{active}.

    :param key: Dictionary with key data.
    :type key: dict
    :param active: Dictionary with active data. When it is missing or empty
                   the address, validation level, audit date and usage flags
                   keep their default values.
    :type active: dict
    :return: An instance of the key.
    :rtype: OpenPGPKey
    """
    address = None
    validation = ValidationLevels.Weak_Chain
    last_audited_at = None
    encr_used = False
    sign_used = False

    if active:
        address = active[doc.KEY_ADDRESS_KEY]
        try:
            validation = ValidationLevels.get(active[doc.KEY_VALIDATION_KEY])
        except ValueError:
            # Keep the default validation level. The values are passed as
            # separate logging arguments: a single tuple would not satisfy
            # the two placeholders of the message.
            logger.error("Not valid validation level (%s) for key %s",
                         active[doc.KEY_VALIDATION_KEY],
                         active[doc.KEY_FINGERPRINT_KEY])
        last_audited_at = _to_datetime(active[doc.KEY_LAST_AUDITED_AT_KEY])
        encr_used = active[doc.KEY_ENCR_USED_KEY]
        sign_used = active[doc.KEY_SIGN_USED_KEY]

    expiry_date = _to_datetime(key[doc.KEY_EXPIRY_DATE_KEY])
    refreshed_at = _to_datetime(key[doc.KEY_REFRESHED_AT_KEY])

    return OpenPGPKey(
        address=address,
        uids=key[doc.KEY_UIDS_KEY],
        fingerprint=key[doc.KEY_FINGERPRINT_KEY],
        key_data=key[doc.KEY_DATA_KEY],
        private=key[doc.KEY_PRIVATE_KEY],
        length=key[doc.KEY_LENGTH_KEY],
        expiry_date=expiry_date,
        last_audited_at=last_audited_at,
        refreshed_at=refreshed_at,
        validation=validation,
        encr_used=encr_used,
        sign_used=sign_used,
    )


def _to_datetime(unix_time):
    """
    Convert a unix timestamp into a datetime.

    :param unix_time: Seconds since the epoch; 0 (or None) means "unset".
    :type unix_time: int or None
    :return: The matching local datetime, or None for an unset value.
    :rtype: datetime or None
    """
    if not unix_time:
        return None
    return datetime.fromtimestamp(unix_time)


def _to_unix_time(date):
    """
    Convert a datetime into a unix timestamp.

    :param date: The date to convert, or None when unset.
    :type date: datetime or None
    :return: Seconds since the epoch, or 0 when C{date} is None.
    :rtype: int
    """
    if date is None:
        return 0
    return int(time.mktime(date.timetuple()))


class OpenPGPKey(object):
    """
    Base class for OpenPGP keys.

    Iterating over an instance yields C{(name, value)} pairs for every
    public slot; the validation level and the date fields are converted to
    strings.
    """

    __slots__ = ('address', 'uids', 'fingerprint', 'key_data',
                 'private', 'length', 'expiry_date', 'validation',
                 'last_audited_at', 'refreshed_at',
                 'encr_used', 'sign_used', '_index', '_gpgbinary')

    def __init__(self, address=None, gpgbinary=None, uids=None,
                 fingerprint="", key_data="", private=False, length=0,
                 expiry_date=None, validation=ValidationLevels.Weak_Chain,
                 last_audited_at=None, refreshed_at=None,
                 encr_used=False, sign_used=False):
        """
        Initialize the key.

        :param address: The address bound to this key.
        :type address: str
        :param gpgbinary: Value handed to TempGPGWrapper as C{gpgbinary}.
        :param uids: The uids of the key. When empty and C{address} is
                     given, the address becomes the only uid.
        :type uids: list(str)
        :param fingerprint: The fingerprint of the key.
        :type fingerprint: str
        :param key_data: The key material.
        :type key_data: str
        :param private: Whether this is a private key.
        :type private: bool
        :param length: The length of the key.
        :type length: int
        :param expiry_date: When the key expires, None if unset.
        :type expiry_date: datetime
        :param validation: The validation level of the key.
        :type validation: ValidationLevels
        :param last_audited_at: When the key was last audited, None if
                                unset.
        :type last_audited_at: datetime
        :param refreshed_at: When the key was last refreshed, None if unset.
        :type refreshed_at: datetime
        :param encr_used: Whether the key has been used to encrypt.
        :type encr_used: bool
        :param sign_used: Whether the key has been used to sign.
        :type sign_used: bool
        """
        # A None sentinel replaces the former mutable ``[]`` default so that
        # instances never share one list object.
        if uids is None:
            uids = []

        self._gpgbinary = gpgbinary
        self.address = address
        if not uids and address:
            self.uids = [address]
        else:
            self.uids = uids
        self.fingerprint = fingerprint
        self.key_data = key_data
        self.private = private
        self.length = length
        self.expiry_date = expiry_date

        self.validation = validation
        self.last_audited_at = last_audited_at
        self.refreshed_at = refreshed_at
        self.encr_used = encr_used
        self.sign_used = sign_used

        # Iteration cursor: slots are walked from the last one to the first.
        self._index = len(self.__slots__)

    @property
    def signatures(self):
        """
        Get the key signatures

        :return: the key IDs that have signed the key
        :rtype: list(str)
        """
        with TempGPGWrapper(keys=[self], gpgbinary=self._gpgbinary) as gpg:
            res = gpg.list_sigs(self.fingerprint)
            # Return the signatures of the first listed uid that is one of
            # ours.
            for uid, sigs in res.sigs.items():
                if parse_address(uid) in self.uids:
                    return sigs

        return []

    def merge(self, newkey):
        """
        Merge C{newkey}, another copy of this same key, into this key.

        Both key materials are imported into a temporary keyring and the
        expiry date, uids, length and key data are refreshed from the
        result. The validation level and the last audit date only move
        forward, the usage flags are OR-ed together and C{refreshed_at} is
        set to the current time.

        :param newkey: The key to merge into this one.
        :type newkey: OpenPGPKey
        :raise errors.KeyFingerprintMismatch: If the fingerprints of both
                                              keys differ.
        """
        if newkey.fingerprint != self.fingerprint:
            logger.critical(
                "Can't put a key whith the same key_id and different "
                "fingerprint: %s, %s",
                newkey.fingerprint, self.fingerprint)
            raise errors.KeyFingerprintMismatch(newkey.fingerprint)

        with TempGPGWrapper(gpgbinary=self._gpgbinary) as gpg:
            gpg.import_keys(self.key_data)
            gpg.import_keys(newkey.key_data)
            gpgkey = gpg.list_keys(secret=newkey.private).pop()

            if gpgkey['expires']:
                self.expiry_date = datetime.fromtimestamp(
                    int(gpgkey['expires']))
            else:
                self.expiry_date = None

            self.uids = [parse_address(uid) for uid in gpgkey['uids']]
            self.length = int(gpgkey['length'])
            self.key_data = gpg.export_keys(gpgkey['fingerprint'],
                                            secret=self.private)

        if newkey.validation > self.validation:
            self.validation = newkey.validation

        # Keep the most recent audit date. An unset (None) date counts as
        # older than any real date, and the result is stored in
        # ``last_audited_at`` (it used to overwrite ``validation``).
        new_audit = newkey.last_audited_at
        if new_audit is not None and (self.last_audited_at is None or
                                      new_audit > self.last_audited_at):
            self.last_audited_at = new_audit

        self.encr_used = newkey.encr_used or self.encr_used
        self.sign_used = newkey.sign_used or self.sign_used
        self.refreshed_at = datetime.now()

    def get_json(self):
        """
        Return a JSON string describing this key.

        :return: The JSON string describing this key.
        :rtype: str
        """
        expiry_date = _to_unix_time(self.expiry_date)
        refreshed_at = _to_unix_time(self.refreshed_at)

        return json.dumps({
            doc.KEY_UIDS_KEY: self.uids,
            doc.KEY_TYPE_KEY: self.__class__.__name__,
            doc.KEY_FINGERPRINT_KEY: self.fingerprint,
            doc.KEY_DATA_KEY: self.key_data,
            doc.KEY_PRIVATE_KEY: self.private,
            doc.KEY_LENGTH_KEY: self.length,
            doc.KEY_EXPIRY_DATE_KEY: expiry_date,
            doc.KEY_REFRESHED_AT_KEY: refreshed_at,
            doc.KEY_VERSION_KEY: doc.KEYMANAGER_DOC_VERSION,
            doc.KEY_TAGS_KEY: [doc.KEYMANAGER_KEY_TAG],
        })

    def get_active_json(self):
        """
        Return a JSON string describing the active data of this key: its
        address, validation level, last audit date and usage flags.

        :return: The JSON string describing the active data of this key.
        :rtype: str
        """
        last_audited_at = _to_unix_time(self.last_audited_at)

        return json.dumps({
            doc.KEY_ADDRESS_KEY: self.address,
            doc.KEY_TYPE_KEY: (self.__class__.__name__ +
                               doc.KEYMANAGER_ACTIVE_TYPE),
            doc.KEY_FINGERPRINT_KEY: self.fingerprint,
            doc.KEY_PRIVATE_KEY: self.private,
            doc.KEY_VALIDATION_KEY: str(self.validation),
            doc.KEY_LAST_AUDITED_AT_KEY: last_audited_at,
            doc.KEY_ENCR_USED_KEY: self.encr_used,
            doc.KEY_SIGN_USED_KEY: self.sign_used,
            doc.KEY_VERSION_KEY: doc.KEYMANAGER_DOC_VERSION,
            doc.KEY_TAGS_KEY: [doc.KEYMANAGER_ACTIVE_TAG],
        })

    def next(self):
        """
        Return the next C{(name, value)} pair of the iteration.

        Slots are visited from the last one to the first and the ones whose
        name starts with an underscore are skipped. The validation level
        and the date fields are returned as strings.

        :return: The name of the slot and its value.
        :rtype: tuple
        :raise StopIteration: When every slot has been visited; the cursor
                              is reset so the key can be iterated again.
        """
        slots = self.__slots__
        while self._index > 0:
            self._index -= 1
            key = slots[self._index]
            if key.startswith('_'):
                continue

            value = getattr(self, key)
            if key in ("validation", "expiry_date", "last_audited_at",
                       "refreshed_at"):
                value = str(value)
            return key, value

        self._index = len(slots)
        raise StopIteration

    # Python 3 spelling of the iterator protocol.
    __next__ = next

    def __iter__(self):
        """
        Return the key itself, which is its own iterator.
        """
        return self

    def __repr__(self):
        """
        Representation of this class
        """
        return u"<%s 0x%s (%s - %s)>" % (
            self.__class__.__name__,
            self.fingerprint,
            self.address,
            "priv" if self.private else "publ")


def parse_address(address):
    """
    Remove name, '<', '>' and the identity suffix after the '+' until the '@'
    e.g.: test_user+something@provider.com becomes test_user@provider.com
    since the key belongs to the identity without the '+' suffix.

    :param address: The address (or uid) to be parsed.
    :type address: str
    :return: The bare user@provider address, or None if C{address} does not
             contain one.
    :rtype: str
    """
    mail_regex = r'(.*<)?([\w.-]+)(\+.*)?(@[\w.-]+)(>.*)?'
    match = re.match(mail_regex, address)
    if match is None:
        return None
    return ''.join(match.group(2, 4))