summaryrefslogtreecommitdiff
path: root/keymanager
diff options
context:
space:
mode:
authorKali Kaneko <kali@leap.se>2015-09-10 10:36:53 -0400
committerKali Kaneko <kali@leap.se>2015-09-24 12:36:16 -0400
commitf3158ed7a792b7c02b2f89fd90aefa708882557f (patch)
tree75058fc10aa336a68d9bb61217a6e97533065b4c /keymanager
parent1b8847b244586dec237949d477c474bc77c8264c (diff)
[refactor] refactor key parsing
so that it can be tested without needing to instantiate the whole OpenPGPScheme object, that receives a soledad instance.
Diffstat (limited to 'keymanager')
-rw-r--r--keymanager/src/leap/keymanager/openpgp.py130
1 files changed, 59 insertions, 71 deletions
diff --git a/keymanager/src/leap/keymanager/openpgp.py b/keymanager/src/leap/keymanager/openpgp.py
index b8b47d02..069a78e3 100644
--- a/keymanager/src/leap/keymanager/openpgp.py
+++ b/keymanager/src/leap/keymanager/openpgp.py
@@ -1,6 +1,6 @@
# -*- coding: utf-8 -*-
# openpgp.py
-# Copyright (C) 2013 LEAP
+# Copyright (C) 2013-2015 LEAP
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
@@ -251,7 +251,7 @@ class OpenPGPScheme(EncryptionScheme):
leap_assert(is_address(address), 'Not an user address: %s' % address)
def _gen_key(_):
- with self._temporary_gpgwrapper() as gpg:
+ with TempGPGWrapper(gpgbinary=self._gpgbinary) as gpg:
# TODO: inspect result, or use decorator
params = gpg.gen_key_input(
key_type='RSA',
@@ -348,37 +348,25 @@ class OpenPGPScheme(EncryptionScheme):
# TODO: add more checks for correct key data.
leap_assert(key_data is not None, 'Data does not represent a key.')
- with self._temporary_gpgwrapper() as gpg:
- # TODO: inspect result, or use decorator
- gpg.import_keys(key_data)
- privkey = None
- pubkey = None
+ priv_info, privkey = process_ascii_key(
+ key_data, self._gpgbinary, secret=True)
+ pub_info, pubkey = process_ascii_key(
+ key_data, self._gpgbinary, secret=False)
- try:
- privkey = gpg.list_keys(secret=True).pop()
- except IndexError:
- pass
- try:
- pubkey = gpg.list_keys(secret=False).pop() # unitary keyring
- except IndexError:
- return (None, None)
-
- openpgp_privkey = None
- if privkey is not None:
- # build private key
- openpgp_privkey = self._build_key_from_gpg(
- privkey,
- gpg.export_keys(privkey['fingerprint'], secret=True))
- leap_check(pubkey['fingerprint'] == privkey['fingerprint'],
- 'Fingerprints for public and private key differ.',
- errors.KeyFingerprintMismatch)
-
- # build public key
- openpgp_pubkey = self._build_key_from_gpg(
- pubkey,
- gpg.export_keys(pubkey['fingerprint'], secret=False))
-
- return (openpgp_pubkey, openpgp_privkey)
+ if not pubkey:
+ return (None, None)
+
+ openpgp_privkey = None
+ if privkey:
+ # build private key
+ openpgp_privkey = self._build_key_from_gpg(priv_info, privkey)
+ leap_check(pub_info['fingerprint'] == priv_info['fingerprint'],
+ 'Fingerprints for public and private key differ.',
+ errors.KeyFingerprintMismatch)
+ # build public key
+ openpgp_pubkey = self._build_key_from_gpg(pub_info, pubkey)
+
+ return (openpgp_pubkey, openpgp_privkey)
def put_ascii_key(self, key_data, address):
"""
@@ -439,7 +427,7 @@ class OpenPGPScheme(EncryptionScheme):
oldkey = build_key_from_dict(OpenPGPKey, doc.content)
if key.fingerprint == oldkey.fingerprint:
# in case of an update of the key merge them with gnupg
- with self._temporary_gpgwrapper() as gpg:
+ with TempGPGWrapper(gpgbinary=self._gpgbinary) as gpg:
gpg.import_keys(oldkey.key_data)
gpg.import_keys(key.key_data)
gpgkey = gpg.list_keys(secret=key.private).pop()
@@ -577,24 +565,7 @@ class OpenPGPScheme(EncryptionScheme):
:return: An instance of the key.
:rtype: OpenPGPKey
"""
- expiry_date = None
- if key['expires']:
- expiry_date = datetime.fromtimestamp(int(key['expires']))
- address = []
- for uid in key['uids']:
- address.append(_parse_address(uid))
-
- return OpenPGPKey(
- address,
- gpgbinary=self._gpgbinary,
- key_id=key['keyid'],
- fingerprint=key['fingerprint'],
- key_data=key_data,
- private=True if key['type'] == 'sec' else False,
- length=int(key['length']),
- expiry_date=expiry_date,
- refreshed_at=datetime.now(),
- )
+ return build_gpg_key(key, key_data, self._gpgbinary)
def delete_key(self, key):
"""
@@ -654,21 +625,6 @@ class OpenPGPScheme(EncryptionScheme):
# Data encryption, decryption, signing and verifying
#
- def _temporary_gpgwrapper(self, keys=None):
- """
- Return a gpg wrapper that implements the context manager protocol and
- contains C{keys}.
-
- :param keys: keys to conform the keyring.
- :type key: list(OpenPGPKey)
-
- :return: a TempGPGWrapper instance
- :rtype: TempGPGWrapper
- """
- # TODO do here checks on key_data
- return TempGPGWrapper(
- keys=keys, gpgbinary=self._gpgbinary)
-
@staticmethod
def _assert_gpg_result_ok(result):
"""
@@ -713,7 +669,7 @@ class OpenPGPScheme(EncryptionScheme):
leap_assert_type(sign, OpenPGPKey)
leap_assert(sign.private is True)
keys.append(sign)
- with self._temporary_gpgwrapper(keys) as gpg:
+ with TempGPGWrapper(keys, self._gpgbinary) as gpg:
result = gpg.encrypt(
data, pubkey.fingerprint,
default_key=sign.key_id if sign else None,
@@ -755,7 +711,7 @@ class OpenPGPScheme(EncryptionScheme):
leap_assert_type(verify, OpenPGPKey)
leap_assert(verify.private is False)
keys.append(verify)
- with self._temporary_gpgwrapper(keys) as gpg:
+ with TempGPGWrapper(keys, self._gpgbinary) as gpg:
try:
result = gpg.decrypt(
data, passphrase=passphrase, always_trust=True)
@@ -783,7 +739,7 @@ class OpenPGPScheme(EncryptionScheme):
:return: Whether C{data} was encrypted using this wrapper.
:rtype: bool
"""
- with self._temporary_gpgwrapper() as gpg:
+ with TempGPGWrapper(gpgbinary=self._gpgbinary) as gpg:
gpgutil = GPGUtilities(gpg)
return gpgutil.is_encrypted_asym(data)
@@ -814,7 +770,7 @@ class OpenPGPScheme(EncryptionScheme):
# result.fingerprint - contains the fingerprint of the key used to
# sign.
- with self._temporary_gpgwrapper(privkey) as gpg:
+ with TempGPGWrapper(privkey, self._gpgbinary) as gpg:
result = gpg.sign(data, default_key=privkey.key_id,
digest_algo=digest_algo, clearsign=clearsign,
detach=detach, binary=binary)
@@ -849,7 +805,7 @@ class OpenPGPScheme(EncryptionScheme):
"""
leap_assert_type(pubkey, OpenPGPKey)
leap_assert(pubkey.private is False)
- with self._temporary_gpgwrapper(pubkey) as gpg:
+ with TempGPGWrapper(pubkey, self._gpgbinary) as gpg:
result = None
if detached_sig is None:
result = gpg.verify(data)
@@ -867,3 +823,35 @@ class OpenPGPScheme(EncryptionScheme):
rfprint = result.fingerprint
kfprint = gpgpubkey['fingerprint']
return valid and rfprint == kfprint
+
+
+def process_ascii_key(key_data, gpgbinary, secret=False):
+ with TempGPGWrapper(gpgbinary=gpgbinary) as gpg:
+ try:
+ gpg.import_keys(key_data)
+ info = gpg.list_keys(secret=secret).pop()
+ key = gpg.export_keys(info['fingerprint'], secret=secret)
+ except IndexError:
+ info = {}
+ key = None
+ return info, key
+
+
+def build_gpg_key(key_info, key_data, gpgbinary=None):
+ expiry_date = None
+ if key_info['expires']:
+ expiry_date = datetime.fromtimestamp(int(key_info['expires']))
+ address = []
+ for uid in key_info['uids']:
+ address.append(_parse_address(uid))
+
+ return OpenPGPKey(
+ address,
+ gpgbinary=gpgbinary,
+ key_id=key_info['keyid'],
+ fingerprint=key_info['fingerprint'],
+ key_data=key_data,
+ private=True if key_info['type'] == 'sec' else False,
+ length=int(key_info['length']),
+ expiry_date=expiry_date,
+ refreshed_at=datetime.now())