diff options
| author | Kali Kaneko <kali@leap.se> | 2013-05-13 23:06:35 +0900 | 
|---|---|---|
| committer | Kali Kaneko <kali@leap.se> | 2013-05-21 01:57:26 +0900 | 
| commit | 7c6a87acaead5f54e1f2155ecf0a089eff97d654 (patch) | |
| tree | 57faff0c4f476557c3e457514f5aed99abfbc3de /src/leap/common | |
| parent | 2a78ab3c730ba652153deaed55cfa8d92089a979 (diff) | |
use temporary openpgpwrapper as a context manager
in this way we implicitely catch any exception during
the wrapped call, and ensure that the destructor is always
called.
Diffstat (limited to 'src/leap/common')
| -rw-r--r-- | src/leap/common/keymanager/__init__.py | 3 | ||||
| -rw-r--r-- | src/leap/common/keymanager/errors.py | 7 | ||||
| -rw-r--r-- | src/leap/common/keymanager/openpgp.py | 526 | ||||
| -rw-r--r-- | src/leap/common/tests/test_keymanager.py | 47 | 
4 files changed, 340 insertions, 243 deletions
| diff --git a/src/leap/common/keymanager/__init__.py b/src/leap/common/keymanager/__init__.py index 3427f03..30a9146 100644 --- a/src/leap/common/keymanager/__init__.py +++ b/src/leap/common/keymanager/__init__.py @@ -160,7 +160,8 @@ class KeyManager(object):              privkey = self.get_key(                  self._address, ktype, private=True, fetch_remote=False)              privkey = json.loads(privkey.get_json()) -            privkey.key_data = encrypt_sym(privkey.key_data, password) +            privkey.key_data = encrypt_sym( +                privkey.key_data, passphrase=password)              data['keys'].append(privkey)          self._fetcher.put(              self._nickserver_url + '/key/' + self._address, diff --git a/src/leap/common/keymanager/errors.py b/src/leap/common/keymanager/errors.py index d6802f0..89949d2 100644 --- a/src/leap/common/keymanager/errors.py +++ b/src/leap/common/keymanager/errors.py @@ -72,6 +72,13 @@ class DecryptionFailed(Exception):      pass +class EncryptionDecryptionFailed(Exception): +    """ +    Raised upon failures of encryption/decryption. +    """ +    pass + +  class SignFailed(Exception):      """      Raised when failed to sign. diff --git a/src/leap/common/keymanager/openpgp.py b/src/leap/common/keymanager/openpgp.py index d578638..e60833b 100644 --- a/src/leap/common/keymanager/openpgp.py +++ b/src/leap/common/keymanager/openpgp.py @@ -15,26 +15,18 @@  # You should have received a copy of the GNU General Public License  # along with this program. If not, see <http://www.gnu.org/licenses/>. -  """  Infrastructure for using OpenPGP keys in Key Manager.  """ - - +import logging +import os  import re -import tempfile  import shutil +import tempfile  from leap.common.check import leap_assert, leap_assert_type -from leap.common.keymanager.errors import ( -    KeyNotFound, -    KeyAlreadyExists, -    KeyAttributesDiffer, -    InvalidSignature, -    EncryptionFailed, -    DecryptionFailed, -    SignFailed, -) +from leap.common.keymanager import errors +  from leap.common.keymanager.keys import (      EncryptionKey,      EncryptionScheme, @@ -44,12 +36,242 @@ from leap.common.keymanager.keys import (  )  from leap.common.keymanager.gpg import GPGWrapper +logger = logging.getLogger(__name__) + + +# +# gpg wrapper and decorator +# + +def temporary_gpgwrapper(keys=None): +    """ +    Returns a unitary gpg wrapper that implements context manager +    protocol. + +    @param key_data: ASCII armored key data. +    @type key_data: str + +    @return: a GPGWrapper instance +    @rtype: GPGWrapper +    """ +    # TODO do here checks on key_data +    return TempGPGWrapper(keys=keys) + + +def with_temporary_gpg(fun): +    """ +    Decorator to add a temporary gpg wrapper as context +    to gpg related functions. + +    Decorated functions are expected to return a function whose only +    argument is a gpgwrapper instance. +    """ +    def wrapped(*args, **kwargs): +        """ +        We extract the arguments passed to the wrapped function, +        run the function and do validations. +        We expect that the positional arguments are `data`, +        and an optional `key`. +        All the rest of arguments should be passed as named arguments +        to allow for a correct unpacking. +        """ +        if len(args) == 2: +            keys = args[1] if isinstance(args[1], OpenPGPKey) else None +        else: +            keys = None + +        # sign/verify keys passed as arguments +        sign = kwargs.get('sign', None) +        if sign: +            keys = [keys, sign] + +        verify = kwargs.get('verify', None) +        if verify: +            keys = [keys, verify] + +        # is the wrapped function sign or verify? +        fun_name = fun.__name__ +        is_sign_function = True if fun_name == "sign" else False +        is_verify_function = True if fun_name == "verify" else False + +        result = None + +        with temporary_gpgwrapper(keys) as gpg: +            result = fun(*args, **kwargs)(gpg) + +            # TODO: cleanup a little bit the +            # validation. maybe delegate to other +            # auxiliary functions for clarity. + +            ok = getattr(result, 'ok', None) + +            stderr = getattr(result, 'stderr', None) +            if stderr: +                logger.debug("%s" % (stderr,)) + +            if ok is False: +                raise errors.EncryptionDecryptionFailed( +                    'Failed to encrypt/decrypt in %s: %s' % ( +                        fun.__name__, +                        stderr)) + +            if verify is not None: +                # A verify key has been passed +                if result.valid is False or \ +                        verify.fingerprint != result.pubkey_fingerprint: +                    raise errors.InvalidSignature( +                        'Failed to verify signature with key %s: %s' % +                        (verify.key_id, stderr)) + +            if is_sign_function: +                # Specific validation for sign function +                privkey = gpg.list_keys(secret=True).pop() +                rfprint = result.fingerprint +                kfprint = privkey['fingerprint'] +                if result.fingerprint is None: +                    raise errors.SignFailed( +                        'Failed to sign with key %s: %s' % +                        (privkey['keyid'], stderr)) +                leap_assert( +                    result.fingerprint == kfprint, +                    'Signature and private key fingerprints mismatch: ' +                    '%s != %s' % +                    (rfprint, kfprint)) + +            if is_verify_function: +                # Specific validation for verify function +                pubkey = gpg.list_keys().pop() +                valid = result.valid +                rfprint = result.fingerprint +                kfprint = pubkey['fingerprint'] +                if valid is False or rfprint != kfprint: +                    raise errors.InvalidSignature( +                        'Failed to verify signature ' +                        'with key %s.' % pubkey['keyid']) +                result = result.valid + +            # ok, enough checks. let's return data if available +            if hasattr(result, 'data'): +                result = result.data +        return result +    return wrapped + + +class TempGPGWrapper(object): +    """ +    A context manager returning a temporary GPG wrapper keyring, which +    contains exactly zero or one pubkeys, and zero or one privkeys. + +    Temporary unitary keyrings allow the to use GPG's facilities for exactly +    one key. This function creates an empty temporary keyring and imports +    C{keys} if it is not None. +    """ +    def __init__(self, keys=None): +        """ +        @param keys: OpenPGP key, or list of. +        @type keys: OpenPGPKey or list of OpenPGPKeys +        """ +        self._gpg = None +        if not keys: +            keys = list() +        if not isinstance(keys, list): +            keys = [keys] +        self._keys = keys +        for key in filter(None, keys): +            leap_assert_type(key, OpenPGPKey) + +    def __enter__(self): +        """ +        Calls the unitary gpgwrapper initializer + +        @return: A GPG wrapper with a unitary keyring. +        @rtype: gnupg.GPG +        """ +        self._build_keyring() +        return self._gpg + +    def __exit__(self, exc_type, exc_value, traceback): +        """ +        Ensures the gpgwrapper is properly destroyed. +        """ +        # TODO handle exceptions and log here +        self._destroy_keyring() + +    def _build_keyring(self): +        """ +        Create an empty GPG keyring and import C{keys} into it. + +        @param keys: List of keys to add to the keyring. +        @type keys: list of OpenPGPKey + +        @return: A GPG wrapper with a unitary keyring. +        @rtype: gnupg.GPG +        """ +        privkeys = [key for key in self._keys if key and key.private is True] +        publkeys = [key for key in self._keys if key and key.private is False] +        # here we filter out public keys that have a correspondent +        # private key in the list because the private key_data by +        # itself is enough to also have the public key in the keyring, +        # and we want to count the keys afterwards. + +        privaddrs = map(lambda privkey: privkey.address, privkeys) +        publkeys = filter( +            lambda pubkey: pubkey.address not in privaddrs, publkeys) + +        listkeys = lambda: self._gpg.list_keys() +        listsecretkeys = lambda: self._gpg.list_keys(secret=True) + +        self._gpg = GPGWrapper(gnupghome=tempfile.mkdtemp()) +        leap_assert(len(listkeys()) is 0, 'Keyring not empty.') + +        # import keys into the keyring: +        # concatenating ascii-armored keys, which is correctly +        # understood by the GPGWrapper. + +        self._gpg.import_keys("".join( +            [x.key_data for x in publkeys + privkeys])) + +        # assert the number of keys in the keyring +        leap_assert( +            len(listkeys()) == len(publkeys) + len(privkeys), +            'Wrong number of public keys in keyring: %d, should be %d)' % +            (len(listkeys()), len(publkeys) + len(privkeys))) +        leap_assert( +            len(listsecretkeys()) == len(privkeys), +            'Wrong number of private keys in keyring: %d, should be %d)' % +            (len(listsecretkeys()), len(privkeys))) + +    def _destroy_keyring(self): +        """ +        Securely erase a unitary keyring. +        """ +        # TODO: implement some kind of wiping of data or a more +        # secure way that +        # does not write to disk. + +        try: +            for secret in [True, False]: +                for key in self._gpg.list_keys(secret=secret): +                    self._gpg.delete_keys( +                        key['fingerprint'], +                        secret=secret) +            leap_assert(len(self._gpg.list_keys()) is 0, 'Keyring not empty!') + +        except: +            raise + +        finally: +            leap_assert(self._gpg.gnupghome != os.path.expanduser('~/.gnupg'), +                        "watch out! Tried to remove default gnupg home!") +            shutil.rmtree(self._gpg.gnupghome) +  #  # API functions  # -def encrypt_sym(data, passphrase, sign=None): +@with_temporary_gpg +def encrypt_sym(data, passphrase=None, sign=None):      """      Encrypt C{data} with C{passphrase} and sign with C{sign} private key. @@ -68,23 +290,19 @@ def encrypt_sym(data, passphrase, sign=None):          leap_assert_type(sign, OpenPGPKey)          leap_assert(sign.private is True) -    def _encrypt_cb(gpg): -        result = gpg.encrypt( -            data, None, -            sign=sign.key_id if sign else None, -            passphrase=passphrase, symmetric=True) -        # Here we cannot assert for correctness of sig because the sig is in -        # the ciphertext. -        # result.ok    - (bool) indicates if the operation succeeded -        # result.data  - (bool) contains the result of the operation -        if result.ok is False: -            raise EncryptionFailed('Failed to encrypt: %s' % result.stderr) -        return result.data +    # Here we cannot assert for correctness of sig because the sig is in +    # the ciphertext. +    # result.ok    - (bool) indicates if the operation succeeded +    # result.data  - (bool) contains the result of the operation -    return _safe_call(_encrypt_cb, [sign]) +    return lambda gpg: gpg.encrypt( +        data, None, +        sign=sign.key_id if sign else None, +        passphrase=passphrase, symmetric=True) -def decrypt_sym(data, passphrase, verify=None): +@with_temporary_gpg +def decrypt_sym(data, passphrase=None, verify=None):      """      Decrypt C{data} with C{passphrase} and verify with C{verify} public      key. @@ -107,27 +325,17 @@ def decrypt_sym(data, passphrase, verify=None):          leap_assert_type(verify, OpenPGPKey)          leap_assert(verify.private is False) -    def _decrypt_cb(gpg): -        result = gpg.decrypt(data, passphrase=passphrase) -        # result.ok    - (bool) indicates if the operation succeeded -        # result.valid - (bool) indicates if the signature was verified -        # result.data  - (bool) contains the result of the operation -        # result.pubkey_fingerpring  - (str) contains the fingerprint of the -        #                              public key that signed this data. -        if result.ok is False: -            raise DecryptionFailed('Failed to decrypt: %s', result.stderr) -        if verify is not None: -            if result.valid is False or \ -                    verify.fingerprint != result.pubkey_fingerprint: -                raise InvalidSignature( -                    'Failed to verify signature with key %s: %s' % -                    (verify.key_id, result.stderr)) -        return result.data - -    return _safe_call(_decrypt_cb, [verify]) - - -def encrypt_asym(data, pubkey, sign=None): +    # result.ok    - (bool) indicates if the operation succeeded +    # result.valid - (bool) indicates if the signature was verified +    # result.data  - (bool) contains the result of the operation +    # result.pubkey_fingerpring  - (str) contains the fingerprint of the +    #                              public key that signed this data. +    return lambda gpg: gpg.decrypt( +        data, passphrase=passphrase) + + +@with_temporary_gpg +def encrypt_asym(data, key, passphrase=None, sign=None):      """      Encrypt C{data} using public @{key} and sign with C{sign} key. @@ -141,31 +349,25 @@ def encrypt_asym(data, pubkey, sign=None):      @return: The encrypted data.      @rtype: str      """ -    leap_assert_type(pubkey, OpenPGPKey) -    leap_assert(pubkey.private is False, 'Key is not public.') +    leap_assert_type(key, OpenPGPKey) +    leap_assert(key.private is False, 'Key is not public.')      if sign is not None:          leap_assert_type(sign, OpenPGPKey)          leap_assert(sign.private is True) -    def _encrypt_cb(gpg): -        result = gpg.encrypt( -            data, pubkey.fingerprint, -            sign=sign.key_id if sign else None, -            symmetric=False) -        # Here we cannot assert for correctness of sig because the sig is in -        # the ciphertext. -        # result.ok    - (bool) indicates if the operation succeeded -        # result.data  - (bool) contains the result of the operation -        if result.ok is False: -            raise EncryptionFailed( -                'Failed to encrypt with key %s: %s' % -                (pubkey.key_id, result.stderr)) -        return result.data - -    return _safe_call(_encrypt_cb, [pubkey, sign]) - - -def decrypt_asym(data, privkey, verify=None): +    # Here we cannot assert for correctness of sig because the sig is in +    # the ciphertext. +    # result.ok    - (bool) indicates if the operation succeeded +    # result.data  - (bool) contains the result of the operation + +    return lambda gpg: gpg.encrypt( +        data, key.fingerprint, +        sign=sign.key_id if sign else None, +        passphrase=passphrase, symmetric=False) + + +@with_temporary_gpg +def decrypt_asym(data, key, passphrase=None, verify=None):      """      Decrypt C{data} using private @{key} and verify with C{verify} key. @@ -182,32 +384,16 @@ def decrypt_asym(data, privkey, verify=None):      @raise InvalidSignature: Raised if unable to verify the signature with          C{verify} key.      """ -    leap_assert(privkey.private is True, 'Key is not private.') +    leap_assert(key.private is True, 'Key is not private.')      if verify is not None:          leap_assert_type(verify, OpenPGPKey)          leap_assert(verify.private is False) -    def _decrypt_cb(gpg): -        result = gpg.decrypt(data) -        # result.ok    - (bool) indicates if the operation succeeded -        # result.valid - (bool) indicates if the signature was verified -        # result.data  - (bool) contains the result of the operation -        # result.pubkey_fingerpring  - (str) contains the fingerprint of the -        #                              public key that signed this data. -        if result.ok is False: -            raise DecryptionFailed('Failed to decrypt with key %s: %s' % -                                   (privkey.key_id, result.stderr)) -        if verify is not None: -            if result.valid is False or \ -                    verify.fingerprint != result.pubkey_fingerprint: -                raise InvalidSignature( -                    'Failed to verify signature with key %s: %s' % -                    (verify.key_id, result.stderr)) -        return result.data - -    return _safe_call(_decrypt_cb, [privkey, verify]) +    return lambda gpg: gpg.decrypt( +        data, passphrase=passphrase) +@with_temporary_gpg  def is_encrypted(data):      """      Return whether C{data} was encrypted using OpenPGP. @@ -218,13 +404,10 @@ def is_encrypted(data):      @return: Whether C{data} was encrypted using this wrapper.      @rtype: bool      """ - -    def _is_encrypted_cb(gpg): -        return gpg.is_encrypted(data) - -    return _safe_call(_is_encrypted_cb) +    return lambda gpg: gpg.is_encrypted(data) +@with_temporary_gpg  def is_encrypted_sym(data):      """      Return whether C{data} was encrypted using a public OpenPGP key. @@ -235,13 +418,10 @@ def is_encrypted_sym(data):      @return: Whether C{data} was encrypted using this wrapper.      @rtype: bool      """ - -    def _is_encrypted_cb(gpg): -        return gpg.is_encrypted_sym(data) - -    return _safe_call(_is_encrypted_cb) +    return lambda gpg: gpg.is_encrypted_sym(data) +@with_temporary_gpg  def is_encrypted_asym(data):      """      Return whether C{data} was asymmetrically encrypted using OpenPGP. @@ -252,19 +432,17 @@ def is_encrypted_asym(data):      @return: Whether C{data} was encrypted using this wrapper.      @rtype: bool      """ - -    def _is_encrypted_cb(gpg): -        return gpg.is_encrypted_asym(data) - -    return _safe_call(_is_encrypted_cb) +    return lambda gpg: gpg.is_encrypted_asym(data) +@with_temporary_gpg  def sign(data, privkey):      """      Sign C{data} with C{privkey}.      @param data: The data to be signed.      @type data: str +      @param privkey: The private key to be used to sign.      @type privkey: OpenPGPKey @@ -274,53 +452,36 @@ def sign(data, privkey):      leap_assert_type(privkey, OpenPGPKey)      leap_assert(privkey.private is True) -    def _sign_cb(gpg): -        result = gpg.sign(data, keyid=privkey.key_id) -        # result.fingerprint - contains the fingerprint of the key used to -        #                      sign. -        if result.fingerprint is None: -            raise SignFailed( -                'Failed to sign with key %s: %s' % -                (privkey.key_id, result.stderr)) -        leap_assert( -            result.fingerprint == privkey.fingerprint, -            'Signature and private key fingerprints mismatch: %s != %s' % -            (result.fingerprint, privkey.fingerprint)) -        return result.data - -    return _safe_call(_sign_cb, [privkey]) +    # result.fingerprint - contains the fingerprint of the key used to +    #                      sign. +    return lambda gpg: gpg.sign(data, keyid=privkey.key_id) -def verify(data, pubkey): +@with_temporary_gpg +def verify(data, key):      """      Verify signed C{data} with C{pubkey}.      @param data: The data to be verified.      @type data: str +      @param pubkey: The public key to be used on verification.      @type pubkey: OpenPGPKey      @return: The ascii-armored signed data.      @rtype: str      """ -    leap_assert_type(pubkey, OpenPGPKey) -    leap_assert(pubkey.private is False) - -    def _verify_cb(gpg): -        result = gpg.verify(data) -        if result.valid is False or \ -                result.fingerprint != pubkey.fingerprint: -            raise InvalidSignature( -                'Failed to verify signature with key %s.' % pubkey.key_id) -        return True +    leap_assert_type(key, OpenPGPKey) +    leap_assert(key.private is False) -    return _safe_call(_verify_cb, [pubkey]) +    return lambda gpg: gpg.verify(data)  #  # Helper functions  # +  def _build_key_from_gpg(address, key, key_data):      """      Build an OpenPGPKey for C{address} based on C{key} from @@ -350,81 +511,6 @@ def _build_key_from_gpg(address, key, key_data):      ) -def _build_keyring(keys=[]): -    """ - -    Create an empty GPG keyring and import C{keys} into it. - -    @param keys: List of keys to add to the keyring. -    @type keys: list of OpenPGPKey - -    @return: A GPG wrapper with a unitary keyring. -    @rtype: gnupg.GPG -    """ -    privkeys = filter(lambda key: key.private is True, keys) -    pubkeys = filter(lambda key: key.private is False, keys) -    # here we filter out public keys that have a correspondent private key in -    # the list because the private key_data by itself is enough to also have -    # the public key in the keyring, and we want to count the keys afterwards. -    privaddrs = map(lambda privkey: privkey.address, privkeys) -    pubkeys = filter(lambda pubkey: pubkey.address not in privaddrs, pubkeys) -    # create temporary dir for temporary gpg keyring -    tmpdir = tempfile.mkdtemp() -    gpg = GPGWrapper(gnupghome=tmpdir) -    leap_assert(len(gpg.list_keys()) is 0, 'Keyring not empty.') -    # import keys into the keyring -    gpg.import_keys( -        reduce( -            lambda x, y: x+y, -            [key.key_data for key in pubkeys+privkeys], '')) -    # assert the number of keys in the keyring -    leap_assert( -        len(gpg.list_keys()) == len(pubkeys)+len(privkeys), -        'Wrong number of public keys in keyring: %d, should be %d)' % -        (len(gpg.list_keys()), len(pubkeys)+len(privkeys))) -    leap_assert( -        len(gpg.list_keys(secret=True)) == len(privkeys), -        'Wrong number of private keys in keyring: %d, should be %d)' % -        (len(gpg.list_keys(secret=True)), len(privkeys))) -    return gpg - - -def _destroy_keyring(gpg): -    """ -    Securely erase a keyring. - -    @param gpg: A GPG wrapper instance. -    @type gpg: gnupg.GPG -    """ -    for secret in [True, False]: -        for key in gpg.list_keys(secret=secret): -            gpg.delete_keys( -                key['fingerprint'], -                secret=secret) -    leap_assert(len(gpg.list_keys()) is 0, 'Keyring not empty!') -    # TODO: implement some kind of wiping of data or a more secure way that -    # does not write to disk. -    shutil.rmtree(gpg.gnupghome) - - -def _safe_call(callback, keys=[]): -    """ -    Run C{callback} over a keyring containing C{keys}. - -    @param callback: Function whose first argument is the gpg keyring. -    @type callback: function(gnupg.GPG) -    @param keys: List of keys to add to the keyring. -    @type keys: list of OpenPGPKey - -    @return: The results of the callback. -    @rtype: str or bool -    """ -    gpg = _build_keyring(filter(lambda key: key is not None, keys)) -    val = callback(gpg) -    _destroy_keyring(gpg) -    return val - -  #  # The OpenPGP wrapper  # @@ -463,11 +549,11 @@ class OpenPGPScheme(EncryptionScheme):          leap_assert(is_address(address), 'Not an user address: %s' % address)          try:              self.get_key(address) -            raise KeyAlreadyExists(address) -        except KeyNotFound: +            raise errors.KeyAlreadyExists(address) +        except errors.KeyNotFound:              pass -        def _gen_key_cb(gpg): +        def _gen_key(gpg):              params = gpg.gen_key_input(                  key_type='RSA',                  key_length=4096, @@ -495,7 +581,10 @@ class OpenPGPScheme(EncryptionScheme):                      gpg.export_keys(key['fingerprint'], secret=secret))                  self.put_key(openpgp_key) -        _safe_call(_gen_key_cb) +        with temporary_gpgwrapper() as gpg: +            # TODO: inspect result, or use decorator +            _gen_key(gpg) +          return self.get_key(address, private=True)      def get_key(self, address, private=False): @@ -514,7 +603,7 @@ class OpenPGPScheme(EncryptionScheme):          leap_assert(is_address(address), 'Not an user address: %s' % address)          doc = self._get_key_doc(address, private)          if doc is None: -            raise KeyNotFound(address) +            raise errors.KeyNotFound(address)          return build_key_from_dict(OpenPGPKey, address, doc.content)      def put_ascii_key(self, key_data): @@ -525,11 +614,14 @@ class OpenPGPScheme(EncryptionScheme):          @type key_data: str          """          leap_assert_type(key_data, str) +        # TODO: add more checks for correct key data. +        leap_assert(key_data is not None, 'Data does not represent a key.') -        def _put_ascii_key_cb(gpg): +        def _put_ascii_key(gpg):              gpg.import_keys(key_data)              privkey = None              pubkey = None +              try:                  privkey = gpg.list_keys(secret=True).pop()              except IndexError: @@ -561,7 +653,9 @@ class OpenPGPScheme(EncryptionScheme):                  gpg.export_keys(pubkey['fingerprint'], secret=False))              self.put_key(openpgp_pubkey) -        _safe_call(_put_ascii_key_cb) +        with temporary_gpgwrapper() as gpg: +            # TODO: inspect result, or use decorator +            _put_ascii_key(gpg)      def put_key(self, key):          """ @@ -606,9 +700,9 @@ class OpenPGPScheme(EncryptionScheme):          leap_assert(key.__class__ is OpenPGPKey, 'Wrong key type.')          stored_key = self.get_key(key.address, private=key.private)          if stored_key is None: -            raise KeyNotFound(key) +            raise errors.KeyNotFound(key)          if stored_key.__dict__ != key.__dict__: -            raise KeyAttributesDiffer(key) +            raise errors.KeyAttributesDiffer(key)          doc = self._soledad.get_doc(              keymanager_doc_id(OpenPGPKey, key.address, key.private))          self._soledad.delete_doc(doc) diff --git a/src/leap/common/tests/test_keymanager.py b/src/leap/common/tests/test_keymanager.py index 48f7273..a7aa1ca 100644 --- a/src/leap/common/tests/test_keymanager.py +++ b/src/leap/common/tests/test_keymanager.py @@ -30,16 +30,15 @@ except ImportError:  from leap.common.testing.basetest import BaseLeapTest  from leap.soledad import Soledad -from leap.soledad.crypto import SoledadCrypto - +#from leap.soledad.crypto import SoledadCrypto  from leap.common.keymanager import (      KeyManager,      openpgp,      KeyNotFound,      NoPasswordGiven, -    TAGS_INDEX, -    TAGS_AND_PRIVATE_INDEX, +    #TAGS_INDEX, +    #TAGS_AND_PRIVATE_INDEX,  )  from leap.common.keymanager.openpgp import OpenPGPKey  from leap.common.keymanager.keys import ( @@ -240,14 +239,16 @@ class OpenPGPCryptoTestCase(KeyManagerWithSoledadTestCase):              KeyNotFound, pgp.get_key, ADDRESS, private=True)      def test_openpgp_encrypt_decrypt_sym(self): -        cyphertext = openpgp.encrypt_sym('data', 'pass') +        cyphertext = openpgp.encrypt_sym( +            'data', passphrase='pass')          self.assertTrue(cyphertext is not None)          self.assertTrue(cyphertext != '')          self.assertTrue(cyphertext != 'data')          self.assertTrue(openpgp.is_encrypted_sym(cyphertext))          self.assertFalse(openpgp.is_encrypted_asym(cyphertext))          self.assertTrue(openpgp.is_encrypted(cyphertext)) -        plaintext = openpgp.decrypt_sym(cyphertext, 'pass') +        plaintext = openpgp.decrypt_sym( +            cyphertext, passphrase='pass')          self.assertEqual('data', plaintext)      def test_verify_with_private_raises(self): @@ -298,7 +299,7 @@ class OpenPGPCryptoTestCase(KeyManagerWithSoledadTestCase):          pubkey = pgp.get_key(ADDRESS, private=False)          self.assertRaises(              AssertionError, -            openpgp.encrypt_sym, data, '123', sign=pubkey) +            openpgp.encrypt_sym, data, passphrase='123', sign=pubkey)      def test_decrypt_asym_verify_with_private_raises(self):          pgp = openpgp.OpenPGPScheme(self._soledad) @@ -321,7 +322,7 @@ class OpenPGPCryptoTestCase(KeyManagerWithSoledadTestCase):          pubkey = pgp.get_key(ADDRESS, private=False)          encrypted_and_signed = openpgp.encrypt_asym(data, pubkey, sign=privkey)          pgp.put_ascii_key(PUBLIC_KEY_2) -        wrongkey = pgp.get_key('anotheruser@leap.se') +        wrongkey = pgp.get_key(ADDRESS_2)          self.assertRaises(              errors.InvalidSignature,              openpgp.verify, encrypted_and_signed, wrongkey) @@ -332,19 +333,8 @@ class OpenPGPCryptoTestCase(KeyManagerWithSoledadTestCase):          data = 'data'          privkey = pgp.get_key(ADDRESS, private=True)          encrypted_and_signed = openpgp.encrypt_sym(data, '123', sign=privkey) -        self.assertRaises( -            AssertionError, -            openpgp.decrypt_sym, -            encrypted_and_signed, 'decrypt', verify=privkey) - -    def test_decrypt_sym_verify_with_private_raises(self): -        pgp = openpgp.OpenPGPScheme(self._soledad) -        pgp.put_ascii_key(PRIVATE_KEY) -        data = 'data' -        privkey = pgp.get_key(ADDRESS, private=True) -        encrypted_and_signed = openpgp.encrypt_sym(data, '123', sign=privkey)          pgp.put_ascii_key(PUBLIC_KEY_2) -        wrongkey = pgp.get_key('anotheruser@leap.se') +        wrongkey = pgp.get_key(ADDRESS_2)          self.assertRaises(              errors.InvalidSignature,              openpgp.verify, encrypted_and_signed, wrongkey) @@ -385,8 +375,7 @@ class OpenPGPCryptoTestCase(KeyManagerWithSoledadTestCase):          self.assertEqual(data, res) -class KeyManagerKeyManagementTestCase( -        KeyManagerWithSoledadTestCase): +class KeyManagerKeyManagementTestCase(KeyManagerWithSoledadTestCase):      def test_get_all_keys_in_db(self):          km = self._key_manager() @@ -477,15 +466,15 @@ class KeyManagerKeyManagementTestCase(              headers = {'content-type': 'application/json'}              def json(self): -                return {'address': 'anotheruser@leap.se', 'keys': []} +                return {'address': ADDRESS_2, 'keys': []}          km._fetcher.get = Mock(              return_value=Response())          # do the fetch -        km.fetch_keys_from_server('anotheruser@leap.se') +        km.fetch_keys_from_server(ADDRESS_2)          # and verify the call          km._fetcher.get.assert_called_once_with( -            km._nickserver_url + '/key/' + 'anotheruser@leap.se', +            km._nickserver_url + '/key/' + ADDRESS_2,          )      def test_refresh_keys(self): @@ -495,11 +484,13 @@ class KeyManagerKeyManagementTestCase(          km.fetch_keys_from_server = Mock(return_value=[])          km.refresh_keys()          km.fetch_keys_from_server.assert_called_once_with( -            'leap@leap.se' +            ADDRESS          )  # Key material for testing + +# key 24D18DDF: public key "Leap Test Key <leap@leap.se>"  KEY_FINGERPRINT = "E36E738D69173C13D709E44F2F455E2824D18DDF"  PUBLIC_KEY = """  -----BEGIN PGP PUBLIC KEY BLOCK----- @@ -662,6 +653,7 @@ RZXoH+FTg9UAW87eqU610npOkT6cRaBxaMK/mDtGNdc=  -----END PGP PRIVATE KEY BLOCK-----  """ +# key 7FEE575A: public key "anotheruser <anotheruser@leap.se>"  PUBLIC_KEY_2 = """  -----BEGIN PGP PUBLIC KEY BLOCK-----  Version: GnuPG v1.4.10 (GNU/Linux) @@ -719,3 +711,6 @@ THx7N776fcYHGumbqUMYrxrcZSbNveE6SaK8fphRam1dewM0  =a5gs  -----END PGP PRIVATE KEY BLOCK-----  """ +import unittest +if __name__ == "__main__": +    unittest.main() | 
