summaryrefslogtreecommitdiff
path: root/src/leap/keymanager/openpgp.py
diff options
context:
space:
mode:
Diffstat (limited to 'src/leap/keymanager/openpgp.py')
-rw-r--r--src/leap/keymanager/openpgp.py387
1 files changed, 186 insertions, 201 deletions
diff --git a/src/leap/keymanager/openpgp.py b/src/leap/keymanager/openpgp.py
index d648137..a843261 100644
--- a/src/leap/keymanager/openpgp.py
+++ b/src/leap/keymanager/openpgp.py
@@ -22,14 +22,15 @@ import os
import re
import shutil
import tempfile
-import traceback
import io
from datetime import datetime
+from multiprocessing import cpu_count
from gnupg import GPG
from gnupg.gnupg import GPGUtilities
from twisted.internet import defer
+from twisted.internet.threads import deferToThread
from leap.common.check import leap_assert, leap_assert_type, leap_check
from leap.keymanager import errors
@@ -38,12 +39,10 @@ from leap.keymanager.keys import (
EncryptionScheme,
is_address,
build_key_from_dict,
- TYPE_ID_PRIVATE_INDEX,
+ TYPE_FINGERPRINT_PRIVATE_INDEX,
TYPE_ADDRESS_PRIVATE_INDEX,
- KEY_ADDRESS_KEY,
- KEY_ID_KEY,
+ KEY_UIDS_KEY,
KEY_FINGERPRINT_KEY,
- KEY_REFRESHED_AT_KEY,
KEYMANAGER_ACTIVE_TYPE,
)
@@ -55,6 +54,16 @@ logger = logging.getLogger(__name__)
# A temporary GPG keyring wrapped to provide OpenPGP functionality.
#
+# This function will be used to call blocking GPG functions outside
+# of Twisted reactor and match the concurrent calls to the amount of CPU cores
+cpu_core_semaphore = defer.DeferredSemaphore(cpu_count())
+
+
+def from_thread(func, *args, **kwargs):
+ call = lambda: deferToThread(func, *args, **kwargs)
+ return cpu_core_semaphore.run(call)
+
+
class TempGPGWrapper(object):
"""
A context manager that wraps a temporary GPG keyring which only contains
@@ -113,9 +122,9 @@ class TempGPGWrapper(object):
# itself is enough to also have the public key in the keyring,
# and we want to count the keys afterwards.
- privids = map(lambda privkey: privkey.key_id, privkeys)
+ privfps = map(lambda privkey: privkey.fingerprint, privkeys)
publkeys = filter(
- lambda pubkey: pubkey.key_id not in privids, publkeys)
+ lambda pubkey: pubkey.fingerprint not in privfps, publkeys)
listkeys = lambda: self._gpg.list_keys()
listsecretkeys = lambda: self._gpg.list_keys(secret=True)
@@ -191,7 +200,7 @@ class OpenPGPKey(EncryptionKey):
Base class for OpenPGP keys.
"""
- def __init__(self, address, gpgbinary=None, **kwargs):
+ def __init__(self, address=None, gpgbinary=None, **kwargs):
self._gpgbinary = gpgbinary
super(OpenPGPKey, self).__init__(address, **kwargs)
@@ -204,13 +213,48 @@ class OpenPGPKey(EncryptionKey):
:rtype: list(str)
"""
with TempGPGWrapper(keys=[self], gpgbinary=self._gpgbinary) as gpg:
- res = gpg.list_sigs(self.key_id)
+ res = gpg.list_sigs(self.fingerprint)
for uid, sigs in res.sigs.iteritems():
- if _parse_address(uid) in self.address:
+ if _parse_address(uid) in self.uids:
return sigs
return []
+ def merge(self, newkey):
+ if newkey.fingerprint != self.fingerprint:
+ logger.critical(
+ "Can't put a key whith the same key_id and different "
+ "fingerprint: %s, %s"
+ % (newkey.fingerprint, self.fingerprint))
+ raise errors.KeyFingerprintMismatch(newkey.fingerprint)
+
+ with TempGPGWrapper(gpgbinary=self._gpgbinary) as gpg:
+ gpg.import_keys(self.key_data)
+ gpg.import_keys(newkey.key_data)
+ gpgkey = gpg.list_keys(secret=newkey.private).pop()
+
+ if gpgkey['expires']:
+ self.expiry_date = datetime.fromtimestamp(
+ int(gpgkey['expires']))
+ else:
+ self.expiry_date = None
+
+ self.uids = []
+ for uid in gpgkey['uids']:
+ self.uids.append(_parse_address(uid))
+
+ self.length = int(gpgkey['length'])
+ self.key_data = gpg.export_keys(gpgkey['fingerprint'],
+ secret=self.private)
+
+ if newkey.validation > self.validation:
+ self.validation = newkey.validation
+ if newkey.last_audited_at > self.last_audited_at:
+ self.validation = newkey.last_audited_at
+ self.encr_used = newkey.encr_used or self.encr_used
+ self.sign_used = newkey.sign_used or self.sign_used
+ self.refreshed_at = datetime.now()
+
class OpenPGPScheme(EncryptionScheme):
"""
@@ -253,6 +297,7 @@ class OpenPGPScheme(EncryptionScheme):
# make sure the key does not already exist
leap_assert(is_address(address), 'Not an user address: %s' % address)
+ @defer.inlineCallbacks
def _gen_key(_):
with TempGPGWrapper(gpgbinary=self._gpgbinary) as gpg:
# TODO: inspect result, or use decorator
@@ -264,7 +309,7 @@ class OpenPGPScheme(EncryptionScheme):
name_comment='')
logger.info("About to generate keys... "
"This might take SOME time.")
- gpg.gen_key(params)
+ yield from_thread(gpg.gen_key, params)
logger.info("Keys for %s have been successfully "
"generated." % (address,))
pubkeys = gpg.list_keys()
@@ -290,10 +335,11 @@ class OpenPGPScheme(EncryptionScheme):
key = gpg.list_keys(secret=secret).pop()
openpgp_key = self._build_key_from_gpg(
key,
- gpg.export_keys(key['fingerprint'], secret=secret))
- d = self.put_key(openpgp_key, address)
+ gpg.export_keys(key['fingerprint'], secret=secret),
+ address)
+ d = self.put_key(openpgp_key)
deferreds.append(d)
- return defer.gatherResults(deferreds)
+ yield defer.gatherResults(deferreds)
def key_already_exists(_):
raise errors.KeyAlreadyExists(address)
@@ -319,15 +365,16 @@ class OpenPGPScheme(EncryptionScheme):
"""
address = _parse_address(address)
- def build_key(doc):
- if doc is None:
+ def build_key((keydoc, activedoc)):
+ if keydoc is None:
raise errors.KeyNotFound(address)
leap_assert(
- address in doc.content[KEY_ADDRESS_KEY],
+ address in keydoc.content[KEY_UIDS_KEY],
'Wrong address in key %s. Expected %s, found %s.'
- % (doc.content[KEY_ID_KEY], address,
- doc.content[KEY_ADDRESS_KEY]))
- key = build_key_from_dict(OpenPGPKey, doc.content)
+ % (keydoc.content[KEY_FINGERPRINT_KEY], address,
+ keydoc.content[KEY_UIDS_KEY]))
+ key = build_key_from_dict(OpenPGPKey, keydoc.content,
+ activedoc.content)
key._gpgbinary = self._gpgbinary
return key
@@ -335,13 +382,15 @@ class OpenPGPScheme(EncryptionScheme):
d.addCallback(build_key)
return d
- def parse_ascii_key(self, key_data):
+ def parse_ascii_key(self, key_data, address=None):
"""
Parses an ascii armored key (or key pair) data and returns
the OpenPGPKey keys.
:param key_data: the key data to be parsed.
:type key_data: str or unicode
+ :param address: Active address for the key.
+ :type address: str
:returns: the public key and private key (if applies) for that data.
:rtype: (public, private) -> tuple(OpenPGPKey, OpenPGPKey)
@@ -362,12 +411,13 @@ class OpenPGPScheme(EncryptionScheme):
openpgp_privkey = None
if privkey:
# build private key
- openpgp_privkey = self._build_key_from_gpg(priv_info, privkey)
+ openpgp_privkey = self._build_key_from_gpg(priv_info, privkey,
+ address)
leap_check(pub_info['fingerprint'] == priv_info['fingerprint'],
'Fingerprints for public and private key differ.',
errors.KeyFingerprintMismatch)
# build public key
- openpgp_pubkey = self._build_key_from_gpg(pub_info, pubkey)
+ openpgp_pubkey = self._build_key_from_gpg(pub_info, pubkey, address)
return (openpgp_pubkey, openpgp_privkey)
@@ -387,12 +437,13 @@ class OpenPGPScheme(EncryptionScheme):
openpgp_privkey = None
try:
- openpgp_pubkey, openpgp_privkey = self.parse_ascii_key(key_data)
+ openpgp_pubkey, openpgp_privkey = self.parse_ascii_key(
+ key_data, address)
except (errors.KeyAddressMismatch, errors.KeyFingerprintMismatch) as e:
return defer.fail(e)
def put_key(_, key):
- return self.put_key(key, address)
+ return self.put_key(key)
d = defer.succeed(None)
if openpgp_pubkey is not None:
@@ -401,112 +452,59 @@ class OpenPGPScheme(EncryptionScheme):
d.addCallback(put_key, openpgp_privkey)
return d
- def put_key(self, key, address):
+ def put_key(self, key):
"""
Put C{key} in local storage.
:param key: The key to be stored.
:type key: OpenPGPKey
- :param address: address for which this key will be active.
- :type address: str
:return: A Deferred which fires when the key is in the storage.
:rtype: Deferred
"""
- d = self._put_key_doc(key)
- d.addCallback(lambda _: self._put_active_doc(key, address))
- return d
+ def merge_and_put((keydoc, activedoc)):
+ if not keydoc:
+ return put_new_key(activedoc)
- def _put_key_doc(self, key):
- """
- Put key document in soledad
+ active_content = None
+ if activedoc:
+ active_content = activedoc.content
+ oldkey = build_key_from_dict(OpenPGPKey, keydoc.content,
+ active_content)
- :type key: OpenPGPKey
- :rtype: Deferred
- """
- def check_and_put(docs, key):
- deferred_repair = defer.succeed(None)
- if len(docs) == 0:
- return self._soledad.create_doc_from_json(key.get_json())
- elif len(docs) > 1:
- deferred_repair = self._repair_key_docs(docs, key.key_id)
-
- doc = docs[0]
- oldkey = build_key_from_dict(OpenPGPKey, doc.content)
- if key.fingerprint != oldkey.fingerprint:
- logger.critical(
- "Can't put a key whith the same key_id and different "
- "fingerprint: %s, %s"
- % (key.fingerprint, oldkey.fingerprint))
- return defer.fail(
- errors.KeyFingerprintMismatch(key.fingerprint))
-
- # in case of an update of the key merge them with gnupg
- with TempGPGWrapper(gpgbinary=self._gpgbinary) as gpg:
- gpg.import_keys(oldkey.key_data)
- gpg.import_keys(key.key_data)
- gpgkey = gpg.list_keys(secret=key.private).pop()
- mergedkey = self._build_key_from_gpg(
- gpgkey,
- gpg.export_keys(gpgkey['fingerprint'],
- secret=key.private))
- mergedkey.validation = max(
- [key.validation, oldkey.validation])
- mergedkey.last_audited_at = oldkey.last_audited_at
- mergedkey.refreshed_at = key.refreshed_at
- mergedkey.encr_used = key.encr_used or oldkey.encr_used
- mergedkey.sign_used = key.sign_used or oldkey.sign_used
- doc.set_json(mergedkey.get_json())
- deferred_put = self._soledad.put_doc(doc)
-
- d = defer.gatherResults([deferred_put, deferred_repair])
- d.addCallback(lambda res: res[0])
+ key.merge(oldkey)
+ keydoc.set_json(key.get_json())
+ d = self._soledad.put_doc(keydoc)
+ d.addCallback(put_active, activedoc)
return d
- d = self._soledad.get_from_index(
- TYPE_ID_PRIVATE_INDEX,
- self.KEY_TYPE,
- key.key_id,
- '1' if key.private else '0')
- d.addCallback(check_and_put, key)
- return d
-
- def _put_active_doc(self, key, address):
- """
- Put active key document in soledad
+ def put_new_key(activedoc):
+ deferreds = []
+ if activedoc:
+ d = self._soledad.delete_doc(activedoc)
+ deferreds.append(d)
+ for json in [key.get_json(), key.get_active_json()]:
+ d = self._soledad.create_doc_from_json(json)
+ deferreds.append(d)
+ return defer.gatherResults(deferreds)
- :type key: OpenPGPKey
- :type addresses: str
- :rtype: Deferred
- """
- def check_and_put(docs):
- if len(docs) == 1:
- doc = docs.pop()
- doc.set_json(key.get_active_json(address))
- d = self._soledad.put_doc(doc)
+ def put_active(_, activedoc):
+ active_json = key.get_active_json()
+ if activedoc:
+ activedoc.set_json(active_json)
+ d = self._soledad.put_doc(activedoc)
else:
- if len(docs) > 1:
- logger.error("There is more than one active key document "
- "for the address %s" % (address,))
- deferreds = []
- for doc in docs:
- delete = self._soledad.delete_doc(doc)
- deferreds.append(delete)
- d = defer.gatherResults(deferreds, consumeErrors=True)
- else:
- d = defer.succeed(None)
-
- d.addCallback(
- lambda _: self._soledad.create_doc_from_json(
- key.get_active_json(address)))
+ d = self._soledad.create_doc_from_json(active_json)
return d
- d = self._soledad.get_from_index(
- TYPE_ADDRESS_PRIVATE_INDEX,
- self.ACTIVE_TYPE,
- address,
- '1' if key.private else '0')
- d.addCallback(check_and_put)
+ def get_active_doc(keydoc):
+ d = self._get_active_doc_from_address(key.address, key.private)
+ d.addCallback(lambda activedoc: (keydoc, activedoc))
+ return d
+
+ d = self._get_key_doc_from_fingerprint(key.fingerprint, key.private)
+ d.addCallback(get_active_doc)
+ d.addCallback(merge_and_put)
return d
def _get_key_doc(self, address, private=False):
@@ -520,48 +518,30 @@ class OpenPGPScheme(EncryptionScheme):
:param private: Whether to look for a private key.
:type private: bool
- :return: A Deferred which fires with the SoledadDocument with the key
- or None if it does not exist.
+ :return: A Deferred which fires with a touple of two SoledadDocument
+ (keydoc, activedoc) or None if it does not exist.
:rtype: Deferred
"""
def get_key_from_active_doc(activedoc):
- if len(activedoc) is 0:
- return None
- leap_assert(
- len(activedoc) is 1,
- 'Found more than one key for address %s!' % (address,))
-
- key_id = activedoc[0].content[KEY_ID_KEY]
- d = self._soledad.get_from_index(
- TYPE_ID_PRIVATE_INDEX,
- self.KEY_TYPE,
- key_id,
- '1' if private else '0')
- d.addCallback(get_doc, key_id, activedoc)
+ if not activedoc:
+ return (None, None)
+ fingerprint = activedoc.content[KEY_FINGERPRINT_KEY]
+ d = self._get_key_doc_from_fingerprint(fingerprint, private)
+ d.addCallback(delete_active_if_no_key, activedoc)
return d
- def get_doc(doclist, key_id, activedoc):
- if len(doclist) == 0:
- logger.warning('There is no key for id %s! Self-repairing it.'
- % (key_id))
+ def delete_active_if_no_key(keydoc, activedoc):
+ if not keydoc:
d = self._soledad.delete_doc(activedoc)
- d.addCallback(lambda _: None)
+ d.addCallback(lambda _: (None, None))
return d
- elif len(doclist) > 1:
- d = self._repair_key_docs(doclist, key_id)
- d.addCallback(lambda _: doclist[0])
- return d
- return doclist[0]
+ return (keydoc, activedoc)
- d = self._soledad.get_from_index(
- TYPE_ADDRESS_PRIVATE_INDEX,
- self.ACTIVE_TYPE,
- address,
- '1' if private else '0')
+ d = self._get_active_doc_from_address(address, private)
d.addCallback(get_key_from_active_doc)
return d
- def _build_key_from_gpg(self, key, key_data):
+ def _build_key_from_gpg(self, key, key_data, address=None):
"""
Build an OpenPGPKey for C{address} based on C{key} from
local gpg storage.
@@ -569,6 +549,8 @@ class OpenPGPScheme(EncryptionScheme):
ASCII armored GPG key data has to be queried independently in this
wrapper, so we receive it in C{key_data}.
+ :param address: Active address for the key.
+ :type address: str
:param key: Key obtained from GPG storage.
:type key: dict
:param key_data: Key data obtained from GPG storage.
@@ -576,7 +558,7 @@ class OpenPGPScheme(EncryptionScheme):
:return: An instance of the key.
:rtype: OpenPGPKey
"""
- return build_gpg_key(key, key_data, self._gpgbinary)
+ return build_gpg_key(key, key_data, address, self._gpgbinary)
def delete_key(self, key):
"""
@@ -601,17 +583,17 @@ class OpenPGPScheme(EncryptionScheme):
def get_key_docs(_):
return self._soledad.get_from_index(
- TYPE_ID_PRIVATE_INDEX,
+ TYPE_FINGERPRINT_PRIVATE_INDEX,
self.KEY_TYPE,
- key.key_id,
+ key.fingerprint,
'1' if key.private else '0')
def delete_key(docs):
if len(docs) == 0:
raise errors.KeyNotFound(key)
elif len(docs) > 1:
- logger.warning("There is more than one key for key_id %s"
- % key.key_id)
+ logger.warning("There is more than one key for fingerprint %s"
+ % key.fingerprint)
has_deleted = False
deferreds = []
@@ -625,45 +607,15 @@ class OpenPGPScheme(EncryptionScheme):
return defer.gatherResults(deferreds)
d = self._soledad.get_from_index(
- TYPE_ID_PRIVATE_INDEX,
+ TYPE_FINGERPRINT_PRIVATE_INDEX,
self.ACTIVE_TYPE,
- key.key_id,
+ key.fingerprint,
'1' if key.private else '0')
d.addCallback(delete_docs)
d.addCallback(get_key_docs)
d.addCallback(delete_key)
return d
- def _repair_key_docs(self, doclist, key_id):
- """
- If there is more than one key for a key id try to self-repair it
-
- :return: a Deferred that will be fired once all the deletions are
- completed
- :rtype: Deferred
- """
- logger.error("BUG ---------------------------------------------------")
- logger.error("There is more than one key with the same key_id %s:"
- % (key_id,))
-
- def log_key_doc(doc):
- logger.error("\t%s: %s" % (doc.content[KEY_ADDRESS_KEY],
- doc.content[KEY_FINGERPRINT_KEY]))
-
- doclist.sort(key=lambda doc: doc.content[KEY_REFRESHED_AT_KEY],
- reverse=True)
- log_key_doc(doclist[0])
- deferreds = []
- for doc in doclist[1:]:
- log_key_doc(doc)
- d = self._soledad.delete_doc(doc)
- deferreds.append(d)
-
- logger.error("")
- logger.error(traceback.extract_stack())
- logger.error("BUG (please report above info) ------------------------")
- return defer.gatherResults(deferreds, consumeErrors=True)
-
#
# Data encryption, decryption, signing and verifying
#
@@ -686,6 +638,7 @@ class OpenPGPScheme(EncryptionScheme):
raise errors.GPGError(
'Failed to encrypt/decrypt: %s' % stderr)
+ @defer.inlineCallbacks
def encrypt(self, data, pubkey, passphrase=None, sign=None,
cipher_algo='AES256'):
"""
@@ -700,8 +653,8 @@ class OpenPGPScheme(EncryptionScheme):
:param cipher_algo: The cipher algorithm to use.
:type cipher_algo: str
- :return: The encrypted data.
- :rtype: str
+ :return: A Deferred that will be fired with the encrypted data.
+ :rtype: defer.Deferred
:raise EncryptError: Raised if failed encrypting for some reason.
"""
@@ -713,9 +666,10 @@ class OpenPGPScheme(EncryptionScheme):
leap_assert(sign.private is True)
keys.append(sign)
with TempGPGWrapper(keys, self._gpgbinary) as gpg:
- result = gpg.encrypt(
+ result = yield from_thread(
+ gpg.encrypt,
data, pubkey.fingerprint,
- default_key=sign.key_id if sign else None,
+ default_key=sign.fingerprint if sign else None,
passphrase=passphrase, symmetric=False,
cipher_algo=cipher_algo)
# Here we cannot assert for correctness of sig because the sig is
@@ -724,11 +678,12 @@ class OpenPGPScheme(EncryptionScheme):
# result.data - (bool) contains the result of the operation
try:
self._assert_gpg_result_ok(result)
- return result.data
+ defer.returnValue(result.data)
except errors.GPGError as e:
- logger.error('Failed to decrypt: %s.' % str(e))
+ logger.warning('Failed to encrypt: %s.' % str(e))
raise errors.EncryptError()
+ @defer.inlineCallbacks
def decrypt(self, data, privkey, passphrase=None, verify=None):
"""
Decrypt C{data} using private @{privkey} and verify with C{verify} key.
@@ -743,8 +698,9 @@ class OpenPGPScheme(EncryptionScheme):
:param verify: The key used to verify a signature.
:type verify: OpenPGPKey
- :return: The decrypted data and if signature verifies
- :rtype: (unicode, bool)
+ :return: Deferred that will fire with the decrypted data and
+ if signature verifies (unicode, bool)
+ :rtype: Deferred
:raise DecryptError: Raised if failed decrypting for some reason.
"""
@@ -756,8 +712,9 @@ class OpenPGPScheme(EncryptionScheme):
keys.append(verify)
with TempGPGWrapper(keys, self._gpgbinary) as gpg:
try:
- result = gpg.decrypt(
- data, passphrase=passphrase, always_trust=True)
+ result = yield from_thread(gpg.decrypt,
+ data, passphrase=passphrase,
+ always_trust=True)
self._assert_gpg_result_ok(result)
# verify signature
@@ -767,9 +724,9 @@ class OpenPGPScheme(EncryptionScheme):
verify.fingerprint == result.pubkey_fingerprint):
sign_valid = True
- return (result.data, sign_valid)
+ defer.returnValue((result.data, sign_valid))
except errors.GPGError as e:
- logger.error('Failed to decrypt: %s.' % str(e))
+ logger.warning('Failed to decrypt: %s.' % str(e))
raise errors.DecryptError(str(e))
def is_encrypted(self, data):
@@ -814,7 +771,7 @@ class OpenPGPScheme(EncryptionScheme):
# result.fingerprint - contains the fingerprint of the key used to
# sign.
with TempGPGWrapper(privkey, self._gpgbinary) as gpg:
- result = gpg.sign(data, default_key=privkey.key_id,
+ result = gpg.sign(data, default_key=privkey.fingerprint,
digest_algo=digest_algo, clearsign=clearsign,
detach=detach, binary=binary)
rfprint = privkey.fingerprint
@@ -823,7 +780,7 @@ class OpenPGPScheme(EncryptionScheme):
if result.fingerprint is None:
raise errors.SignFailed(
'Failed to sign with key %s: %s' %
- (privkey['keyid'], result.stderr))
+ (privkey['fingerprint'], result.stderr))
leap_assert(
result.fingerprint == kfprint,
'Signature and private key fingerprints mismatch: '
@@ -867,6 +824,31 @@ class OpenPGPScheme(EncryptionScheme):
kfprint = gpgpubkey['fingerprint']
return valid and rfprint == kfprint
+ def _get_active_doc_from_address(self, address, private):
+ d = self._soledad.get_from_index(
+ TYPE_ADDRESS_PRIVATE_INDEX,
+ self.ACTIVE_TYPE,
+ address,
+ '1' if private else '0')
+ d.addCallback(self._repair_and_get_doc, self._repair_active_docs)
+ return d
+
+ def _get_key_doc_from_fingerprint(self, fingerprint, private):
+ d = self._soledad.get_from_index(
+ TYPE_FINGERPRINT_PRIVATE_INDEX,
+ self.KEY_TYPE,
+ fingerprint,
+ '1' if private else '0')
+ d.addCallback(self._repair_and_get_doc, self._repair_key_docs)
+ return d
+
+ def _repair_and_get_doc(self, doclist, repair_func):
+ if len(doclist) is 0:
+ return None
+ elif len(doclist) > 1:
+ return repair_func(doclist)
+ return doclist[0]
+
def process_ascii_key(key_data, gpgbinary, secret=False):
with TempGPGWrapper(gpgbinary=gpgbinary) as gpg:
@@ -880,18 +862,21 @@ def process_ascii_key(key_data, gpgbinary, secret=False):
return info, key
-def build_gpg_key(key_info, key_data, gpgbinary=None):
+def build_gpg_key(key_info, key_data, address=None, gpgbinary=None):
expiry_date = None
if key_info['expires']:
expiry_date = datetime.fromtimestamp(int(key_info['expires']))
- address = []
+ uids = []
for uid in key_info['uids']:
- address.append(_parse_address(uid))
+ uids.append(_parse_address(uid))
+ if address and address not in uids:
+ raise errors.KeyAddressMismatch("UIDs %s found, but expected %s"
+ % (str(uids), address))
return OpenPGPKey(
- address,
+ address=address,
+ uids=uids,
gpgbinary=gpgbinary,
- key_id=key_info['keyid'],
fingerprint=key_info['fingerprint'],
key_data=key_data,
private=True if key_info['type'] == 'sec' else False,