From b3ad976ec8aa64a00cc824dc57aa2135ab41deb6 Mon Sep 17 00:00:00 2001 From: drebs Date: Mon, 22 Apr 2013 10:39:58 -0300 Subject: Add send_keys() and refresh_keys() to Key Manager. --- src/leap/common/keymanager/openpgp.py | 112 ++++++++++++++++------------------ 1 file changed, 51 insertions(+), 61 deletions(-) (limited to 'src/leap/common/keymanager/openpgp.py') diff --git a/src/leap/common/keymanager/openpgp.py b/src/leap/common/keymanager/openpgp.py index 1c51d94..cd37138 100644 --- a/src/leap/common/keymanager/openpgp.py +++ b/src/leap/common/keymanager/openpgp.py @@ -1,5 +1,5 @@ # -*- coding: utf-8 -*- -# openpgpwrapper.py +# openpgp.py # Copyright (C) 2013 LEAP # # This program is free software: you can redistribute it and/or modify @@ -25,7 +25,7 @@ import re import tempfile import shutil -from hashlib import sha256 +from leap.common.check import leap_assert from leap.common.keymanager.errors import ( KeyNotFound, KeyAlreadyExists, @@ -35,45 +35,40 @@ from leap.common.keymanager.keys import ( KeyTypeWrapper, ) from leap.common.keymanager.gpg import GPGWrapper +from leap.common.keymanager.util import ( + _is_address, + _build_key_from_doc, + _keymanager_doc_id, +) # # Utility functions # -def _is_address(address): - """ - Return whether the given C{address} is in the form user@provider. - - @param address: The address to be tested. - @type address: str - @return: Whether C{address} is in the form user@provider. - @rtype: bool +def _encrypt_symmetric(data, password): """ - return bool(re.match('[\w.-]+@[\w.-]+', address)) + Encrypt C{data} with C{password}. + This function uses the OpenPGP wrapper to perform the encryption. -def _build_key_from_doc(address, doc): + @param data: The data to be encrypted. + @type data: str + @param password: The password used to encrypt C{data}. + @type password: str + @return: The encrypted data. + @rtype: str """ - Build an OpenPGPKey for C{address} based on C{doc} from local storage. + cyphertext = None - @param address: The address bound to the key. - @type address: str - @param doc: Document obtained from Soledad storage. - @type doc: leap.soledad.backends.leap_backend.LeapDocument - @return: The built key. - @rtype: OpenPGPKey - """ - return OpenPGPKey( - address, - key_id=doc.content['key_id'], - fingerprint=doc.content['fingerprint'], - key_data=doc.content['key_data'], - private=doc.content['private'], - length=doc.content['length'], - expiry_date=doc.content['expiry_date'], - validation=None, # TODO: verify for validation. - ) + def _encrypt_cb(gpg): + cyphertext = str( + gpg.encrypt( + data, None, passphrase=password, symmetric=True)) + data['keys'].append(privkey) + + _safe_call(_encrypt_cb) + return cyphertext def _build_key_from_gpg(address, key, key_data): @@ -90,7 +85,7 @@ def _build_key_from_gpg(address, key, key_data): @type key: dict @param key_data: Key data obtained from GPG storage. @type key_data: str - @return: The built key. + @return: An instance of the key. @rtype: OpenPGPKey """ return OpenPGPKey( @@ -105,24 +100,6 @@ def _build_key_from_gpg(address, key, key_data): ) -def _keymanager_doc_id(address, private=False): - """ - Return the document id for the document containing a key for - C{address}. - - @param address: The address bound to the key. - @type address: str - @param private: Whether the key is private or not. - @type private: bool - @return: The document id for the document that stores a key bound to - C{address}. - @rtype: str - """ - assert _is_address(address) - ktype = 'private' if private else 'public' - return sha256('key-manager-'+address+'-'+ktype).hexdigest() - - def _build_unitary_gpgwrapper(key_data=None): """ Return a temporary GPG wrapper keyring containing exactly zero or one @@ -139,10 +116,13 @@ def _build_unitary_gpgwrapper(key_data=None): """ tmpdir = tempfile.mkdtemp() gpg = GPGWrapper(gnupghome=tmpdir) - assert len(gpg.list_keys()) is 0 + leap_assert(len(gpg.list_keys()) is 0, 'Keyring not empty.') if key_data: gpg.import_keys(key_data) - assert len(gpg.list_keys()) is 1 + leap_assert( + len(gpg.list_keys()) is 1, + 'Unitary keyring has wrong number of keys: %d.' + % len(gpg.list_keys())) return gpg @@ -158,7 +138,7 @@ def _destroy_unitary_gpgwrapper(gpg): gpg.delete_keys( key['fingerprint'], secret=secret) - assert len(gpg.list_keys()) == 0 + leap_assert(len(gpg.list_keys()) is 0, 'Keyring not empty!') # TODO: implement some kind of wiping of data or a more secure way that # does not write to disk. shutil.rmtree(gpg.gnupghome) @@ -216,7 +196,7 @@ class OpenPGPWrapper(KeyTypeWrapper): @raise KeyAlreadyExists: If key already exists in local database. """ # make sure the key does not already exist - assert _is_address(address) + leap_assert(_is_address(address), 'Not an user address: %s' % address) try: self.get_key(address) raise KeyAlreadyExists(address) @@ -231,11 +211,18 @@ class OpenPGPWrapper(KeyTypeWrapper): name_email=address, name_comment='Generated by LEAP Key Manager.') gpg.gen_key(params) - assert len(gpg.list_keys()) is 1 # a unitary keyring! + pubkeys = gpg.list_keys() + # assert for new key characteristics + leap_assert( + len(pubkeys) is 1, # a unitary keyring! + 'Keyring has wrong number of keys: %d.' % len(pubkeys)) key = gpg.list_keys(secret=True).pop() - assert len(key['uids']) is 1 # with just one uid! - # assert for correct address - assert re.match('.*<%s>$' % address, key['uids'][0]) is not None + leap_assert( + len(key['uids']) is 1, # with just one uid! + 'Wrong number of uids for key: %d.' % len(key['uids'])) + leap_assert( + re.match('.*<%s>$' % address, key['uids'][0]) is not None, + 'Key not correctly bound to address.') openpgp_key = _build_key_from_gpg( address, key, gpg.export_keys(key['fingerprint'])) @@ -250,16 +237,18 @@ class OpenPGPWrapper(KeyTypeWrapper): @param address: The address bound to the key. @type address: str + @param private: Look for a private key instead of a public one? + @type private: bool @return: The key bound to C{address}. @rtype: OpenPGPKey @raise KeyNotFound: If the key was not found on local storage. """ - assert _is_address(address) + leap_assert(_is_address(address), 'Not an user address: %s' % address) doc = self._get_key_doc(address, private) if doc is None: raise KeyNotFound(address) - return _build_key_from_doc(address, doc) + return _build_key_from_doc(OpenPGPKey, address, doc) def put_key_raw(self, data): """ @@ -268,14 +257,15 @@ class OpenPGPWrapper(KeyTypeWrapper): @param data: The key data to be stored. @type data: str """ - assert data is not None + # TODO: add more checks for correct key data. + leap_assert(data is not None, 'Data does not represent a key.') def _put_key_raw_cb(gpg): key = gpg.list_keys(secret=False).pop() # unitary keyring # extract adress from first uid on key match = re.match('.*<([\w.-]+@[\w.-]+)>.*', key['uids'].pop()) - assert match is not None + leap_assert(match is not None, 'No user address in key data.') address = match.group(1) openpgp_key = _build_key_from_gpg( address, key, -- cgit v1.2.3