summaryrefslogtreecommitdiff
path: root/src/leap/keymanager/openpgp.py
diff options
context:
space:
mode:
Diffstat (limited to 'src/leap/keymanager/openpgp.py')
-rw-r--r--src/leap/keymanager/openpgp.py527
1 files changed, 363 insertions, 164 deletions
diff --git a/src/leap/keymanager/openpgp.py b/src/leap/keymanager/openpgp.py
index 950d022..794a0ec 100644
--- a/src/leap/keymanager/openpgp.py
+++ b/src/leap/keymanager/openpgp.py
@@ -22,12 +22,13 @@ import os
import re
import shutil
import tempfile
+import io
-from contextlib import closing
+from datetime import datetime
from gnupg import GPG
from gnupg.gnupg import GPGUtilities
-from gnupg._util import _make_binary_stream
+from twisted.internet import defer
from leap.common.check import leap_assert, leap_assert_type, leap_check
from leap.keymanager import errors
@@ -36,8 +37,11 @@ from leap.keymanager.keys import (
EncryptionScheme,
is_address,
build_key_from_dict,
- KEYMANAGER_KEY_TAG,
- TAGS_ADDRESS_PRIVATE_INDEX,
+ TYPE_ID_PRIVATE_INDEX,
+ TYPE_ADDRESS_PRIVATE_INDEX,
+ KEY_ADDRESS_KEY,
+ KEY_ID_KEY,
+ KEYMANAGER_ACTIVE_TYPE,
)
@@ -106,9 +110,9 @@ class TempGPGWrapper(object):
# itself is enough to also have the public key in the keyring,
# and we want to count the keys afterwards.
- privaddrs = map(lambda privkey: privkey.address, privkeys)
+ privids = map(lambda privkey: privkey.key_id, privkeys)
publkeys = filter(
- lambda pubkey: pubkey.address not in privaddrs, publkeys)
+ lambda pubkey: pubkey.key_id not in privids, publkeys)
listkeys = lambda: self._gpg.list_keys()
listsecretkeys = lambda: self._gpg.list_keys(secret=True)
@@ -159,33 +163,20 @@ class TempGPGWrapper(object):
shutil.rmtree(self._gpg.homedir)
-def _build_key_from_gpg(address, key, key_data):
+def _parse_address(address):
"""
- Build an OpenPGPKey for C{address} based on C{key} from
- local gpg storage.
+ Remove name, '<', '>' and the identity suffix after the '+' until the '@'
+ e.g.: test_user+something@provider.com becomes test_user@provider.com
+ since the key belongs to the identity without the '+' suffix.
- ASCII armored GPG key data has to be queried independently in this
- wrapper, so we receive it in C{key_data}.
-
- :param address: The address bound to the key.
:type address: str
- :param key: Key obtained from GPG storage.
- :type key: dict
- :param key_data: Key data obtained from GPG storage.
- :type key_data: str
- :return: An instance of the key.
- :rtype: OpenPGPKey
+ :rtype: str
"""
- return OpenPGPKey(
- address,
- key_id=key['keyid'],
- fingerprint=key['fingerprint'],
- key_data=key_data,
- private=True if key['type'] == 'sec' else False,
- length=key['length'],
- expiry_date=key['expires'],
- validation=None, # TODO: verify for validation.
- )
+ mail_regex = '(.*<)?([\w.-]+)(\+.*)?(@[\w.-]+)(>.*)?'
+ match = re.match(mail_regex, address)
+ if match is None:
+ return None
+ return ''.join(match.group(2, 4))
#
@@ -197,6 +188,26 @@ class OpenPGPKey(EncryptionKey):
Base class for OpenPGP keys.
"""
+ def __init__(self, address, gpgbinary=None, **kwargs):
+ self._gpgbinary = gpgbinary
+ super(OpenPGPKey, self).__init__(address, **kwargs)
+
+ @property
+ def signatures(self):
+ """
+ Get the key signatures
+
+ :return: the key IDs that have signed the key
+ :rtype: list(str)
+ """
+ with TempGPGWrapper(keys=[self], gpgbinary=self._gpgbinary) as gpg:
+ res = gpg.list_sigs(self.key_id)
+ for uid, sigs in res.sigs.iteritems():
+ if _parse_address(uid) in self.address:
+ return sigs
+
+ return []
+
class OpenPGPScheme(EncryptionScheme):
"""
@@ -204,6 +215,10 @@ class OpenPGPScheme(EncryptionScheme):
signing and verification).
"""
+ # type used on the soledad documents
+ KEY_TYPE = OpenPGPKey.__name__
+ ACTIVE_TYPE = KEY_TYPE + KEYMANAGER_ACTIVE_TYPE
+
def __init__(self, soledad, gpgbinary=None):
"""
Initialize the OpenPGP wrapper.
@@ -214,6 +229,7 @@ class OpenPGPScheme(EncryptionScheme):
:type gpgbinary: C{str}
"""
EncryptionScheme.__init__(self, soledad)
+ self._wait_indexes("get_key", "put_key")
self._gpgbinary = gpgbinary
#
@@ -226,57 +242,63 @@ class OpenPGPScheme(EncryptionScheme):
:param address: The address bound to the key.
:type address: str
- :return: The key bound to C{address}.
- :rtype: OpenPGPKey
- @raise KeyAlreadyExists: If key already exists in local database.
+
+ :return: A Deferred which fires with the key bound to address, or fails
+ with KeyAlreadyExists if key already exists in local database.
+ :rtype: Deferred
"""
# make sure the key does not already exist
leap_assert(is_address(address), 'Not an user address: %s' % address)
- try:
- self.get_key(address)
- raise errors.KeyAlreadyExists(address)
- except errors.KeyNotFound:
- logger.debug('Key for %s not found' % (address,))
- with self._temporary_gpgwrapper() as gpg:
- # TODO: inspect result, or use decorator
- params = gpg.gen_key_input(
- key_type='RSA',
- key_length=4096,
- name_real=address,
- name_email=address,
- name_comment='')
- logger.info("About to generate keys... This might take SOME time.")
- gpg.gen_key(params)
- logger.info("Keys for %s have been successfully "
- "generated." % (address,))
- pubkeys = gpg.list_keys()
-
- # assert for new key characteristics
-
- # XXX This exception is not properly catched by the soledad
- # bootstrapping, so if we do not finish generating the keys
- # we end with a blocked thread -- kali
-
- leap_assert(
- len(pubkeys) is 1, # a unitary keyring!
- 'Keyring has wrong number of keys: %d.' % len(pubkeys))
- key = gpg.list_keys(secret=True).pop()
- leap_assert(
- len(key['uids']) is 1, # with just one uid!
- 'Wrong number of uids for key: %d.' % len(key['uids']))
- leap_assert(
- re.match('.*<%s>$' % address, key['uids'][0]) is not None,
- 'Key not correctly bound to address.')
- # insert both public and private keys in storage
- for secret in [True, False]:
- key = gpg.list_keys(secret=secret).pop()
- openpgp_key = _build_key_from_gpg(
- address, key,
- gpg.export_keys(key['fingerprint'], secret=secret))
- self.put_key(openpgp_key)
+ def _gen_key(_):
+ with self._temporary_gpgwrapper() as gpg:
+ # TODO: inspect result, or use decorator
+ params = gpg.gen_key_input(
+ key_type='RSA',
+ key_length=4096,
+ name_real=address,
+ name_email=address,
+ name_comment='')
+ logger.info("About to generate keys... "
+ "This might take SOME time.")
+ gpg.gen_key(params)
+ logger.info("Keys for %s have been successfully "
+ "generated." % (address,))
+ pubkeys = gpg.list_keys()
+
+ # assert for new key characteristics
+ leap_assert(
+ len(pubkeys) is 1, # a unitary keyring!
+ 'Keyring has wrong number of keys: %d.' % len(pubkeys))
+ key = gpg.list_keys(secret=True).pop()
+ leap_assert(
+ len(key['uids']) is 1, # with just one uid!
+ 'Wrong number of uids for key: %d.' % len(key['uids']))
+ uid_match = False
+ for uid in key['uids']:
+ if re.match('.*<%s>$' % address, uid) is not None:
+ uid_match = True
+ break
+ leap_assert(uid_match, 'Key not correctly bound to address.')
+
+ # insert both public and private keys in storage
+ deferreds = []
+ for secret in [True, False]:
+ key = gpg.list_keys(secret=secret).pop()
+ openpgp_key = self._build_key_from_gpg(
+ key,
+ gpg.export_keys(key['fingerprint'], secret=secret))
+ d = self.put_key(openpgp_key, address)
+ deferreds.append(d)
+ return defer.gatherResults(deferreds)
+
+ def key_already_exists(_):
+ raise errors.KeyAlreadyExists(address)
- return self.get_key(address, private=True)
+ d = self.get_key(address)
+ d.addCallbacks(key_already_exists, _gen_key)
+ d.addCallback(lambda _: self.get_key(address, private=True))
+ return d
def get_key(self, address, private=False):
"""
@@ -287,19 +309,26 @@ class OpenPGPScheme(EncryptionScheme):
:param private: Look for a private key instead of a public one?
:type private: bool
- :return: The key bound to C{address}.
- :rtype: OpenPGPKey
- @raise KeyNotFound: If the key was not found on local storage.
+ :return: A Deferred which fires with the OpenPGPKey bound to address,
+ or which fails with KeyNotFound if the key was not found on
+ local storage.
+ :rtype: Deferred
"""
- # Remove the identity suffix after the '+' until the '@'
- # e.g.: test_user+something@provider.com becomes test_user@provider.com
- # since the key belongs to the identity without the '+' suffix.
- address = re.sub(r'\+.*\@', '@', address)
+ address = _parse_address(address)
- doc = self._get_key_doc(address, private)
- if doc is None:
- raise errors.KeyNotFound(address)
- return build_key_from_dict(OpenPGPKey, address, doc.content)
+ def build_key(doc):
+ if doc is None:
+ raise errors.KeyNotFound(address)
+ leap_assert(
+ address in doc.content[KEY_ADDRESS_KEY],
+ 'Wrong address in key data.')
+ key = build_key_from_dict(OpenPGPKey, doc.content)
+ key._gpgbinary = self._gpgbinary
+ return key
+
+ d = self._get_key_doc(address, private)
+ d.addCallback(build_key)
+ return d
def parse_ascii_key(self, key_data):
"""
@@ -316,7 +345,6 @@ class OpenPGPScheme(EncryptionScheme):
leap_assert_type(key_data, (str, unicode))
# TODO: add more checks for correct key data.
leap_assert(key_data is not None, 'Data does not represent a key.')
- mail_regex = '.*<([\w.-]+@[\w.-]+)>.*'
with self._temporary_gpgwrapper() as gpg:
# TODO: inspect result, or use decorator
@@ -328,44 +356,39 @@ class OpenPGPScheme(EncryptionScheme):
privkey = gpg.list_keys(secret=True).pop()
except IndexError:
pass
- pubkey = gpg.list_keys(secret=False).pop() # unitary keyring
-
- # extract adress from first uid on key
- match = re.match(mail_regex, pubkey['uids'].pop())
- leap_assert(match is not None, 'No user address in key data.')
- address = match.group(1)
+ try:
+ pubkey = gpg.list_keys(secret=False).pop() # unitary keyring
+ except IndexError:
+ return (None, None)
openpgp_privkey = None
if privkey is not None:
- match = re.match(mail_regex, privkey['uids'].pop())
- leap_assert(match is not None, 'No user address in key data.')
- privaddress = match.group(1)
-
# build private key
- openpgp_privkey = _build_key_from_gpg(
- privaddress, privkey,
+ openpgp_privkey = self._build_key_from_gpg(
+ privkey,
gpg.export_keys(privkey['fingerprint'], secret=True))
-
- leap_check(address == privaddress,
- 'Addresses in public and private key differ.',
- errors.KeyAddressMismatch)
leap_check(pubkey['fingerprint'] == privkey['fingerprint'],
'Fingerprints for public and private key differ.',
errors.KeyFingerprintMismatch)
# build public key
- openpgp_pubkey = _build_key_from_gpg(
- address, pubkey,
+ openpgp_pubkey = self._build_key_from_gpg(
+ pubkey,
gpg.export_keys(pubkey['fingerprint'], secret=False))
return (openpgp_pubkey, openpgp_privkey)
- def put_ascii_key(self, key_data):
+ def put_ascii_key(self, key_data, address):
"""
Put key contained in ascii-armored C{key_data} in local storage.
:param key_data: The key data to be stored.
:type key_data: str or unicode
+ :param address: address for which this key will be active
+ :type address: str
+
+ :return: A Deferred which fires when the OpenPGPKey is in the storage.
+ :rtype: Deferred
"""
leap_assert_type(key_data, (str, unicode))
@@ -373,26 +396,124 @@ class OpenPGPScheme(EncryptionScheme):
try:
openpgp_pubkey, openpgp_privkey = self.parse_ascii_key(key_data)
except (errors.KeyAddressMismatch, errors.KeyFingerprintMismatch) as e:
- leap_assert(False, repr(e))
+ return defer.fail(e)
+
+ def put_key(_, key):
+ return self.put_key(key, address)
+ d = defer.succeed(None)
if openpgp_pubkey is not None:
- self.put_key(openpgp_pubkey)
+ d.addCallback(put_key, openpgp_pubkey)
if openpgp_privkey is not None:
- self.put_key(openpgp_privkey)
+ d.addCallback(put_key, openpgp_privkey)
+ return d
- def put_key(self, key):
+ def put_key(self, key, address):
"""
Put C{key} in local storage.
:param key: The key to be stored.
:type key: OpenPGPKey
+ :param address: address for which this key will be active.
+ :type address: str
+
+ :return: A Deferred which fires when the key is in the storage.
+ :rtype: Deferred
+ """
+ d = self._put_key_doc(key)
+ d.addCallback(lambda _: self._put_active_doc(key, address))
+ return d
+
+ def _put_key_doc(self, key):
"""
- doc = self._get_key_doc(key.address, private=key.private)
- if doc is None:
- self._soledad.create_doc_from_json(key.get_json())
- else:
- doc.set_json(key.get_json())
- self._soledad.put_doc(doc)
+ Put key document in soledad
+
+ :type key: OpenPGPKey
+ :rtype: Deferred
+ """
+ def check_and_put(docs, key):
+ if len(docs) == 1:
+ doc = docs.pop()
+ oldkey = build_key_from_dict(OpenPGPKey, doc.content)
+ if key.fingerprint == oldkey.fingerprint:
+ # in case of an update of the key merge them with gnupg
+ with self._temporary_gpgwrapper() as gpg:
+ gpg.import_keys(oldkey.key_data)
+ gpg.import_keys(key.key_data)
+ gpgkey = gpg.list_keys(secret=key.private).pop()
+ mergedkey = self._build_key_from_gpg(
+ gpgkey,
+ gpg.export_keys(gpgkey['fingerprint'],
+ secret=key.private))
+ mergedkey.validation = max(
+ [key.validation, oldkey.validation])
+ mergedkey.last_audited_at = oldkey.last_audited_at
+ mergedkey.refreshed_at = key.refreshed_at
+ mergedkey.encr_used = key.encr_used or oldkey.encr_used
+ mergedkey.sign_used = key.sign_used or oldkey.sign_used
+ doc.set_json(mergedkey.get_json())
+ d = self._soledad.put_doc(doc)
+ else:
+ logger.critical(
+ "Can't put a key whith the same key_id and different "
+ "fingerprint: %s, %s"
+ % (key.fingerprint, oldkey.fingerprint))
+ d = defer.fail(
+ errors.KeyFingerprintMismatch(key.fingerprint))
+ elif len(docs) > 1:
+ logger.critical(
+ "There is more than one key with the same key_id %s"
+ % (key.key_id,))
+ d = defer.fail(errors.KeyAttributesDiffer(key.key_id))
+ else:
+ d = self._soledad.create_doc_from_json(key.get_json())
+ return d
+
+ d = self._soledad.get_from_index(
+ TYPE_ID_PRIVATE_INDEX,
+ self.KEY_TYPE,
+ key.key_id,
+ '1' if key.private else '0')
+ d.addCallback(check_and_put, key)
+ return d
+
+ def _put_active_doc(self, key, address):
+ """
+ Put active key document in soledad
+
+ :type key: OpenPGPKey
+ :type addresses: str
+ :rtype: Deferred
+ """
+ def check_and_put(docs):
+ if len(docs) == 1:
+ doc = docs.pop()
+ doc.set_json(key.get_active_json(address))
+ d = self._soledad.put_doc(doc)
+ else:
+ if len(docs) > 1:
+ logger.error("There is more than one active key document "
+ "for the address %s" % (address,))
+ deferreds = []
+ for doc in docs:
+ delete = self._soledad.delete_doc(doc)
+ deferreds.append(delete)
+ d = defer.gatherResults(deferreds, consumeErrors=True)
+ else:
+ d = defer.succeed(None)
+
+ d.addCallback(
+ lambda _: self._soledad.create_doc_from_json(
+ key.get_active_json(address)))
+ return d
+
+ d = self._soledad.get_from_index(
+ TYPE_ADDRESS_PRIVATE_INDEX,
+ self.ACTIVE_TYPE,
+ address,
+ '1' if key.private else '0')
+ d.addCallback(check_and_put)
+ return d
def _get_key_doc(self, address, private=False):
"""
@@ -404,41 +525,128 @@ class OpenPGPScheme(EncryptionScheme):
:type address: str
:param private: Whether to look for a private key.
:type private: bool
- :return: The document with the key or None if it does not exist.
- :rtype: leap.soledad.document.SoledadDocument
+
+ :return: A Deferred which fires with the SoledadDocument with the key
+ or None if it does not exist.
+ :rtype: Deferred
"""
- doclist = self._soledad.get_from_index(
- TAGS_ADDRESS_PRIVATE_INDEX,
- KEYMANAGER_KEY_TAG,
+ def get_key_from_active_doc(activedoc):
+ if len(activedoc) is 0:
+ return None
+ leap_assert(
+ len(activedoc) is 1,
+ 'Found more than one key for address %s!' % (address,))
+
+ key_id = activedoc[0].content[KEY_ID_KEY]
+ d = self._soledad.get_from_index(
+ TYPE_ID_PRIVATE_INDEX,
+ self.KEY_TYPE,
+ key_id,
+ '1' if private else '0')
+ d.addCallback(get_doc, key_id)
+ return d
+
+ def get_doc(doclist, key_id):
+ leap_assert(
+ len(doclist) is 1,
+ 'There is %d keys for id %s!' % (len(doclist), key_id))
+ return doclist.pop()
+
+ d = self._soledad.get_from_index(
+ TYPE_ADDRESS_PRIVATE_INDEX,
+ self.ACTIVE_TYPE,
address,
'1' if private else '0')
- if len(doclist) is 0:
- return None
- leap_assert(
- len(doclist) is 1,
- 'Found more than one %s key for address!' %
- 'private' if private else 'public')
- return doclist.pop()
+ d.addCallback(get_key_from_active_doc)
+ return d
+
+ def _build_key_from_gpg(self, key, key_data):
+ """
+ Build an OpenPGPKey for C{address} based on C{key} from
+ local gpg storage.
+
+ ASCII armored GPG key data has to be queried independently in this
+ wrapper, so we receive it in C{key_data}.
+
+ :param key: Key obtained from GPG storage.
+ :type key: dict
+ :param key_data: Key data obtained from GPG storage.
+ :type key_data: str
+ :return: An instance of the key.
+ :rtype: OpenPGPKey
+ """
+ expiry_date = None
+ if key['expires']:
+ expiry_date = datetime.fromtimestamp(int(key['expires']))
+ address = []
+ for uid in key['uids']:
+ address.append(_parse_address(uid))
+
+ return OpenPGPKey(
+ address,
+ gpgbinary=self._gpgbinary,
+ key_id=key['keyid'],
+ fingerprint=key['fingerprint'],
+ key_data=key_data,
+ private=True if key['type'] == 'sec' else False,
+ length=int(key['length']),
+ expiry_date=expiry_date,
+ refreshed_at=datetime.now(),
+ )
def delete_key(self, key):
"""
Remove C{key} from storage.
- May raise:
- errors.KeyNotFound
- errors.KeyAttributesDiffer
-
:param key: The key to be removed.
:type key: EncryptionKey
+
+ :return: A Deferred which fires when the key is deleted, or which
+ fails with KeyNotFound if the key was not found on local
+ storage.
+ :rtype: Deferred
"""
leap_assert_type(key, OpenPGPKey)
- stored_key = self.get_key(key.address, private=key.private)
- if stored_key is None:
- raise errors.KeyNotFound(key)
- if stored_key.__dict__ != key.__dict__:
- raise errors.KeyAttributesDiffer(key)
- doc = self._get_key_doc(key.address, key.private)
- self._soledad.delete_doc(doc)
+
+ def delete_docs(activedocs):
+ deferreds = []
+ for doc in activedocs:
+ d = self._soledad.delete_doc(doc)
+ deferreds.append(d)
+ return defer.gatherResults(deferreds)
+
+ def get_key_docs(_):
+ return self._soledad.get_from_index(
+ TYPE_ID_PRIVATE_INDEX,
+ self.KEY_TYPE,
+ key.key_id,
+ '1' if key.private else '0')
+
+ def delete_key(docs):
+ if len(docs) == 0:
+ raise errors.KeyNotFound(key)
+ if len(docs) > 1:
+ logger.critical("There is more than one key for key_id %s"
+ % key.key_id)
+
+ doc = None
+ for d in docs:
+ if d.content['fingerprint'] == key.fingerprint:
+ doc = d
+ break
+ if doc is None:
+ raise errors.KeyNotFound(key)
+ return self._soledad.delete_doc(doc)
+
+ d = self._soledad.get_from_index(
+ TYPE_ID_PRIVATE_INDEX,
+ self.ACTIVE_TYPE,
+ key.key_id,
+ '1' if key.private else '0')
+ d.addCallback(delete_docs)
+ d.addCallback(get_key_docs)
+ d.addCallback(delete_key)
+ return d
#
# Data encryption, decryption, signing and verifying
@@ -449,10 +657,8 @@ class OpenPGPScheme(EncryptionScheme):
Return a gpg wrapper that implements the context manager protocol and
contains C{keys}.
- :param key_data: ASCII armored key data.
- :type key_data: str
- :param gpgbinary: Name for GnuPG binary executable.
- :type gpgbinary: C{str}
+ :param keys: keys to conform the keyring.
+ :type key: list(OpenPGPKey)
:return: a TempGPGWrapper instance
:rtype: TempGPGWrapper
@@ -536,12 +742,10 @@ class OpenPGPScheme(EncryptionScheme):
:param verify: The key used to verify a signature.
:type verify: OpenPGPKey
- :return: The decrypted data.
- :rtype: unicode
+ :return: The decrypted data and if signature verifies
+ :rtype: (unicode, bool)
:raise DecryptError: Raised if failed decrypting for some reason.
- :raise InvalidSignature: Raised if unable to verify the signature with
- C{verify} key.
"""
leap_assert(privkey.private is True, 'Key is not private.')
keys = [privkey]
@@ -554,15 +758,15 @@ class OpenPGPScheme(EncryptionScheme):
result = gpg.decrypt(
data, passphrase=passphrase, always_trust=True)
self._assert_gpg_result_ok(result)
+
# verify signature
- if (verify is not None):
- if result.valid is False or \
- verify.fingerprint != result.pubkey_fingerprint:
- raise errors.InvalidSignature(
- 'Failed to verify signature with key %s: %s' %
- (verify.key_id, result.stderr))
+ sign_valid = False
+ if (verify is not None and
+ result.valid is True and
+ verify.fingerprint == result.pubkey_fingerprint):
+ sign_valid = True
- return result.data
+ return (result.data, sign_valid)
except errors.GPGError as e:
logger.error('Failed to decrypt: %s.' % str(e))
raise errors.DecryptError(str(e))
@@ -638,8 +842,8 @@ class OpenPGPScheme(EncryptionScheme):
verified against this detached signature.
:type detached_sig: str
- :return: The ascii-armored signed data.
- :rtype: str
+ :return: signature matches
+ :rtype: bool
"""
leap_assert_type(pubkey, OpenPGPKey)
leap_assert(pubkey.private is False)
@@ -654,15 +858,10 @@ class OpenPGPScheme(EncryptionScheme):
sf, sfname = tempfile.mkstemp()
with os.fdopen(sf, 'w') as sfd:
sfd.write(detached_sig)
- with closing(_make_binary_stream(data, gpg._encoding)) as df:
- result = gpg.verify_file(df, sig_file=sfname)
+ result = gpg.verify_file(io.BytesIO(data), sig_file=sfname)
+ os.unlink(sfname)
gpgpubkey = gpg.list_keys().pop()
valid = result.valid
rfprint = result.fingerprint
kfprint = gpgpubkey['fingerprint']
- # raise in case sig is invalid
- if valid is False or rfprint != kfprint:
- raise errors.InvalidSignature(
- 'Failed to verify signature '
- 'with key %s.' % gpgpubkey['keyid'])
- return True
+ return valid and rfprint == kfprint