diff options
author | drebs <drebs@leap.se> | 2013-04-23 18:40:03 -0300 |
---|---|---|
committer | drebs <drebs@leap.se> | 2013-04-23 18:53:36 -0300 |
commit | 000639eeb664c476d1cea3fe7056db94caa093c0 (patch) | |
tree | 45a34ecce725d116bb25bb107c3c6480fa139578 /src/leap/soledad | |
parent | 4c74a91ad905e7d59260ccec789930c16b29a62d (diff) |
Completelly switch to Key Manager for crypto.
This removes all GPG wrapper that was left and includes Key Manager to take
care of all crypto stuff.
Diffstat (limited to 'src/leap/soledad')
-rw-r--r-- | src/leap/soledad/__init__.py | 98 | ||||
-rw-r--r-- | src/leap/soledad/backends/leap_backend.py | 15 | ||||
-rw-r--r-- | src/leap/soledad/crypto.py | 69 | ||||
-rw-r--r-- | src/leap/soledad/tests/__init__.py | 2 | ||||
-rw-r--r-- | src/leap/soledad/tests/test_crypto.py | 22 | ||||
-rw-r--r-- | src/leap/soledad/tests/test_soledad.py | 14 | ||||
-rw-r--r-- | src/leap/soledad/util.py | 393 |
7 files changed, 112 insertions, 501 deletions
diff --git a/src/leap/soledad/__init__.py b/src/leap/soledad/__init__.py index a27a586d..23d81e84 100644 --- a/src/leap/soledad/__init__.py +++ b/src/leap/soledad/__init__.py @@ -43,7 +43,6 @@ from hashlib import sha256 from leap.common import events -#from leap.common.keymanager.gpgwrapper import GPGWrapper from leap.soledad.config import SoledadConfig from leap.soledad.backends import sqlcipher from leap.soledad.backends.leap_backend import ( @@ -108,14 +107,14 @@ class Soledad(object): The length of the secret used for symmetric encryption. """ - def __init__(self, user, passphrase, config_path=None, gnupg_home=None, + def __init__(self, address, passphrase, config_path=None, gnupg_home=None, secret_path=None, local_db_path=None, shared_db_url=None, auth_token=None, bootstrap=True): """ Initialize configuration, cryptographic keys and dbs. - @param user: Email address of the user (username@provider). - @type user: str + @param address: User's address in the form C{user@provider}. + @type address: str @param passphrase: The passphrase for locking and unlocking encryption secrets for disk storage. @type passphrase: str @@ -138,7 +137,7 @@ class Soledad(object): @type bootstrap: bool """ # TODO: allow for fingerprint enforcing. - self._user = user + self._address = address self._passphrase = passphrase self._auth_token = auth_token self._init_config( @@ -179,7 +178,7 @@ class Soledad(object): # TODO: log each bootstrap step. # Stage 0 - Local environment setup self._init_dirs() - self._crypto = SoledadCrypto(self._config.get_gnupg_home()) + self._crypto = SoledadCrypto(self) if self._config.get_shared_db_url() and self._auth_token: # TODO: eliminate need to create db here. self._shared_db = SoledadSharedDatabase.open_database( @@ -192,12 +191,14 @@ class Soledad(object): if self._has_keys(): self._load_keys() else: - doc = self._get_keys_doc() + doc = self._fetch_keys_from_shared_db() if not doc: self._init_keys() else: - self._set_symkey(self.decrypt(doc.content['_symkey'], - passphrase=self._hash_user())) + self._set_symkey( + self._crypto.decrypt_sym( + doc.content['_symkey'], + passphrase=self._address_hash())) # Stage 2 - Keys synchronization self._assert_server_keys() # Stage 3 - Local database initialization @@ -243,12 +244,13 @@ class Soledad(object): """ Generate (if needed) and load secret for symmetric encryption. """ - events.signal(events.events_pb2.SOLEDAD_CREATING_KEYS, self._user) + events.signal(events.events_pb2.SOLEDAD_CREATING_KEYS, self._address) # load/generate secret if not self._has_symkey(): self._gen_symkey() self._load_symkey() - events.signal(events.events_pb2.SOLEDAD_DONE_CREATING_KEYS, self._user) + events.signal( + events.events_pb2.SOLEDAD_DONE_CREATING_KEYS, self._address) def _init_db(self): """ @@ -293,7 +295,8 @@ class Soledad(object): raise DocumentNotEncrypted( "File %s is not encrypted!" % self._config.get_secret_path()) # can we decrypt it? - cyphertext = self._crypto.decrypt(content, passphrase=self._passphrase) + cyphertext = self._crypto.decrypt_sym( + content, passphrase=self._passphrase) return bool(cyphertext) def _load_symkey(self): @@ -306,7 +309,8 @@ class Soledad(object): try: with open(self._config.get_secret_path()) as f: self._symkey = \ - self._crypto.decrypt(f.read(), passphrase=self._passphrase) + self._crypto.decrypt_sym( + f.read(), passphrase=self._passphrase) self._crypto.symkey = self._symkey except IOError: raise IOError('Failed to open secret file %s.' % @@ -317,10 +321,11 @@ class Soledad(object): Generate a secret for symmetric encryption and store in a local encrypted file. """ - self._set_symkey(''.join( + symkey = ''.join( random.choice( string.ascii_letters + - string.digits) for x in range(self.SECRET_LENGTH))) + string.digits) for x in range(self.SECRET_LENGTH)) + self._set_symkey(symkey) def _set_symkey(self, symkey): """ @@ -338,9 +343,8 @@ class Soledad(object): self._store_symkey() def _store_symkey(self): - ciphertext = self._crypto.encrypt( - self._symkey, symmetric=True, - passphrase=self._passphrase) + ciphertext = self._crypto.encrypt_sym( + self._symkey, self._passphrase) f = open(self._config.get_secret_path(), 'w') f.write(str(ciphertext)) f.close() @@ -370,7 +374,7 @@ class Soledad(object): """ self._gen_symkey() - def _hash_user(self): + def _address_hash(self): """ Calculate a hash for storing/retrieving key material on shared database, based on user's address. @@ -379,9 +383,9 @@ class Soledad(object): @rtype: str """ return b2a_base64( - sha256('user-%s' % self._user).digest())[:-1] + sha256('address-%s' % self._address).digest())[:-1] - def _get_keys_doc(self): + def _fetch_keys_from_shared_db(self): """ Retrieve the document with encrypted key material from the shared database. @@ -389,12 +393,14 @@ class Soledad(object): @return: a document with encrypted key material in its contents @rtype: LeapDocument """ - events.signal(events.events_pb2.SOLEDAD_DOWNLOADING_KEYS, self._user) + events.signal( + events.events_pb2.SOLEDAD_DOWNLOADING_KEYS, self._address) # TODO: change below to raise appropriate exceptions if not self._shared_db: return None - doc = self._shared_db.get_doc_unauth(self._hash_user()) - events.signal(events.events_pb2.SOLEDAD_DONE_DOWNLOADING_KEYS, self._user) + doc = self._shared_db.get_doc_unauth(self._address_hash()) + events.signal( + events.events_pb2.SOLEDAD_DONE_DOWNLOADING_KEYS, self._address) return doc def _assert_server_keys(self): @@ -404,20 +410,24 @@ class Soledad(object): assert self._has_keys() if not self._shared_db: return - doc = self._get_keys_doc() + doc = self._fetch_keys_from_shared_db() if doc: - remote_symkey = self.decrypt(doc.content['_symkey'], - passphrase=self._hash_user()) + remote_symkey = self.decrypt_sym( + doc.content['_symkey'], + passphrase=self._address_hash()) assert remote_symkey == self._symkey else: - events.signal(events.events_pb2.SOLEDAD_UPLOADING_KEYS, self._user) + events.signal( + events.events_pb2.SOLEDAD_UPLOADING_KEYS, self._address) content = { - '_symkey': self.encrypt(self._symkey), + '_symkey': self.encrypt_sym(self._symkey, self._passphrase), } - doc = LeapDocument(doc_id=self._hash_user(), crypto=self._crypto) + doc = LeapDocument(doc_id=self._address_hash(), + crypto=self._crypto) doc.content = content self._shared_db.put_doc(doc) - events.signal(events.events_pb2.SOLEDAD_DONE_UPLOADING_KEYS, self._user) + events.signal( + events.events_pb2.SOLEDAD_DONE_UPLOADING_KEYS, self._address) #------------------------------------------------------------------------- # Document storage, retrieval and sync @@ -497,7 +507,6 @@ class Soledad(object): """ return self._db.get_all_docs(include_deleted) - def create_doc(self, content, doc_id=None): """ Create a new document in the local encrypted database. @@ -569,7 +578,7 @@ class Soledad(object): """ # TODO: create authentication scheme for sync with server. local_gen = self._db.sync(url, creds=None, autocreate=True) - events.signal(events.events_pb2.SOLEDAD_DONE_DATA_SYNC, self._user) + events.signal(events.events_pb2.SOLEDAD_DONE_DATA_SYNC, self._address) return local_gen def need_sync(self, url): @@ -587,11 +596,11 @@ class Soledad(object): info = target.get_sync_info(self._db._get_replica_uid()) # compare source generation with target's last known source generation if self._db._get_generation() != info[4]: - events.signal(events.events_pb2.SOLEDAD_NEW_DATA_TO_SYNC, self._user) + events.signal( + events.events_pb2.SOLEDAD_NEW_DATA_TO_SYNC, self._address) return True return False - #------------------------------------------------------------------------- # Recovery document export and import #------------------------------------------------------------------------- @@ -623,13 +632,11 @@ class Soledad(object): @rtype: str """ data = json.dumps({ - 'user': self._user, + 'address': self._address, 'symkey': self._symkey, }) if passphrase: - data = self._crypto.encrypt(data, None, sign=None, - passphrase=passphrase, - symmetric=True) + data = self._crypto.encrypt_sym(data, passphrase) return data def import_recovery_document(self, data, passphrase=None): @@ -649,14 +656,23 @@ class Soledad(object): raise DocumentNotEncrypted("You provided a password but the " "recovery document is not encrypted.") if passphrase: - data = str(self._crypto.decrypt(data, passphrase=passphrase)) + data = self._crypto.decrypt_sym(data, passphrase=passphrase) data = json.loads(data) - self._user = data['user'] + self._address = data['address'] self._symkey = data['symkey'] self._crypto.symkey = self._symkey self._store_symkey() # TODO: make this work well with bootstrap. self._load_keys() + # + # Setters/getters + # + + def _get_address(self): + return self._address + + address = property(_get_address, doc='The user address.') + __all__ = ['backends', 'util', 'server', 'shared_db'] diff --git a/src/leap/soledad/backends/leap_backend.py b/src/leap/soledad/backends/leap_backend.py index dfec9e85..1c0d5a7d 100644 --- a/src/leap/soledad/backends/leap_backend.py +++ b/src/leap/soledad/backends/leap_backend.py @@ -35,6 +35,9 @@ from u1db.remote.http_database import HTTPDatabase from u1db.errors import BrokenSyncStream +from leap.common.keymanager import KeyManager + + class NoDefaultKey(Exception): """ Exception to signal that there's no default OpenPGP key configured. @@ -120,18 +123,18 @@ class LeapDocument(Document): """ if not self._crypto: raise NoSoledadCryptoInstance() - return self._crypto.encrypt_symmetric( + return self._crypto.encrypt_sym( self.get_json(), - self._crypto._hash_passphrase(self.doc_id)) + self._crypto.passphrase_hash(self.doc_id)) def set_encrypted_content(self, cyphertext): """ Decrypt C{cyphertext} and set document's content. contents. """ - plaintext = self._crypto.decrypt_symmetric( + plaintext = self._crypto.decrypt_sym( cyphertext, - self._crypto._hash_passphrase(self.doc_id)) + self._crypto.passphrase_hash(self.doc_id)) self.set_json(plaintext) self.encryption_scheme = EncryptionSchemes.NONE @@ -299,14 +302,14 @@ class LeapSyncTarget(HTTPSyncTarget): raise DocumentNotEncrypted( 'Incoming document\'s contents should be ' 'encrypted with a symmetric key.') - plain_json = self._crypto.decrypt_symmetric( + plain_json = self._crypto.decrypt_sym( enc_json, self._crypto._symkey) elif entry['encryption_scheme'] == EncryptionScheme.PUBKEY: if not self._crypto.is_encrypted_asym(enc_json): raise DocumentNotEncrypted( 'Incoming document\'s contents should be ' 'encrypted to the user\'s public key.') - plain_json = self._crypto.decrypt(enc_json) + plain_json = self._crypto.decrypt_asym(enc_json) else: raise DocumentNotEncrypted( "Incoming document from sync is not encrypted.") diff --git a/src/leap/soledad/crypto.py b/src/leap/soledad/crypto.py index 471b35e2..dcc40439 100644 --- a/src/leap/soledad/crypto.py +++ b/src/leap/soledad/crypto.py @@ -25,8 +25,7 @@ from binascii import b2a_base64 from hashlib import sha256 -from leap.common.keymanager import KeyManager -from leap.soledad.util import GPGWrapper +from leap.common.keymanager import openpgp class NoSymmetricSecret(Exception): @@ -40,42 +39,32 @@ class SoledadCrypto(object): General cryptographic functionality. """ - def __init__(self, gnupg_home, symkey=None): + def __init__(self, soledad): """ Initialize the crypto object. - @param gnupg_home: Home of the gpg instance. - @type gnupg_home: str - @param symkey: A key to use for symmetric encryption. - @type symkey: str + @param soledad: A Soledad instance for key lookup. + @type soledad: leap.soledad.Soledad """ - self._gpg = GPGWrapper(gnupghome=gnupg_home) - self._symkey = symkey + self._soledad = soledad + self._pgp = openpgp.OpenPGPScheme(self._soledad) + self._symkey = None - def encrypt(self, data, recipients=None, sign=None, passphrase=None, - symmetric=False): + def encrypt_asym(self, data, key): """ Encrypt data. @param data: the data to be encrypted @type data: str - @param recipients: to whom C{data} should be encrypted - @type recipients: list or str - @param sign: the fingerprint of key to be used for signature - @type sign: str - @param passphrase: the passphrase to be used for encryption - @type passphrase: str - @param symmetric: whether the encryption scheme should be symmetric - @type symmetric: bool + @param key: the key to be used for encryption + @type key: str @return: the encrypted data @rtype: str """ - return str(self._gpg.encrypt(data, recipients, sign=sign, - passphrase=passphrase, - symmetric=symmetric)) + return openpgp.encrypt_asym(data, key) - def encrypt_symmetric(self, data, passphrase, sign=None): + def encrypt_sym(self, data, passphrase): """ Encrypt C{data} using a {password}. @@ -83,18 +72,13 @@ class SoledadCrypto(object): @type data: str @param passphrase: the passphrase to use for encryption @type passphrase: str - @param data: the data to be encrypted - @param sign: the fingerprint of key to be used for signature - @type sign: str @return: the encrypted data @rtype: str """ - return self.encrypt(data, sign=sign, - passphrase=passphrase, - symmetric=True) + return openpgp.encrypt_sym(data, passphrase) - def decrypt(self, data, passphrase=None): + def decrypt_asym(self, data): """ Decrypt data. @@ -106,9 +90,10 @@ class SoledadCrypto(object): @return: the decrypted data @rtype: str """ - return str(self._gpg.decrypt(data, passphrase=passphrase)) + key = self._pgp.get_key(self._soledad.address, private=True) + return openpgp.decrypt_asym(data, key) - def decrypt_symmetric(self, data, passphrase): + def decrypt_sym(self, data, passphrase): """ Decrypt data using symmetric secret. @@ -120,7 +105,7 @@ class SoledadCrypto(object): @return: the decrypted data @rtype: str """ - return self.decrypt(data, passphrase=passphrase) + return openpgp.decrypt_sym(data, passphrase) def is_encrypted(self, data): """ @@ -132,7 +117,7 @@ class SoledadCrypto(object): @return: whether the data is a cyphertext @rtype: bool """ - return self._gpg.is_encrypted(data) + return openpgp.is_encrypted(data) def is_encrypted_sym(self, data): """ @@ -141,7 +126,7 @@ class SoledadCrypto(object): @return: whether data is encrypted to a symmetric key @rtype: bool """ - return self._gpg.is_encrypted_sym(data) + return openpgp.is_encrypted_sym(data) def is_encrypted_asym(self, data): """ @@ -151,14 +136,14 @@ class SoledadCrypto(object): @return: whether data is encrypted to an OpenPGP private key @rtype: bool """ - return self._gpg.is_encrypted_asym(data) + return openpgp.is_encrypted_asym(data) - def _hash_passphrase(self, suffix): + def passphrase_hash(self, suffix): """ Generate a passphrase for symmetric encryption. - The password is derived from C{suffix} and the secret for - symmetric encryption previously loaded. + The password is derived from the secret for symmetric encryption and + a C{suffix} that is appended to the secret prior to hashing. @param suffix: Will be appended to the symmetric key before hashing. @type suffix: str @@ -172,9 +157,9 @@ class SoledadCrypto(object): return b2a_base64( sha256('%s%s' % (self._symkey, suffix)).digest())[:-1] - # - # symkey setters/getters - # + # + # symkey setters/getters + # def _get_symkey(self): return self._symkey diff --git a/src/leap/soledad/tests/__init__.py b/src/leap/soledad/tests/__init__.py index 28114391..dac27a29 100644 --- a/src/leap/soledad/tests/__init__.py +++ b/src/leap/soledad/tests/__init__.py @@ -36,7 +36,7 @@ class BaseSoledadTest(BaseLeapTest): self._soledad = self._soledad_instance(user=self.email) self._soledad._init_dirs() #self._soledad._gpg.import_keys(PUBLIC_KEY) - self._soledad._crypto = SoledadCrypto(self.gnupg_home) + self._soledad._crypto = SoledadCrypto(self._soledad) if not self._soledad._has_symkey(): self._soledad._gen_symkey() self._soledad._load_symkey() diff --git a/src/leap/soledad/tests/test_crypto.py b/src/leap/soledad/tests/test_crypto.py index f762437a..039d2f3c 100644 --- a/src/leap/soledad/tests/test_crypto.py +++ b/src/leap/soledad/tests/test_crypto.py @@ -77,7 +77,7 @@ class RecoveryDocumentTestCase(BaseSoledadTest): rd = self._soledad.export_recovery_document(None) self.assertEqual( { - 'user': self._soledad._user, + 'address': self._soledad._address, 'symkey': self._soledad._symkey }, json.loads(rd), @@ -89,10 +89,10 @@ class RecoveryDocumentTestCase(BaseSoledadTest): self.assertEqual(True, self._soledad._crypto.is_encrypted_sym(rd)) data = { - 'user': self._soledad._user, + 'address': self._soledad._address, 'symkey': self._soledad._symkey, } - raw_data = json.loads(str(self._soledad._crypto.decrypt( + raw_data = json.loads(str(self._soledad._crypto.decrypt_sym( rd, passphrase='123456'))) self.assertEqual( @@ -111,10 +111,10 @@ class RecoveryDocumentTestCase(BaseSoledadTest): gnupg_home = self.gnupg_home = "%s/gnupg2" % self.tempdir s = self._soledad_instance(user='anotheruser@leap.se', prefix='/2') s._init_dirs() - s._crypto = SoledadCrypto(gnupg_home) + s._crypto = SoledadCrypto(s) s.import_recovery_document(rd, None) - self.assertEqual(self._soledad._user, - s._user, 'Failed setting user email.') + self.assertEqual(self._soledad._address, + s._address, 'Failed setting user email.') self.assertEqual(self._soledad._symkey, s._symkey, 'Failed settinng secret for symmetric encryption.') @@ -124,10 +124,10 @@ class RecoveryDocumentTestCase(BaseSoledadTest): gnupg_home = self.gnupg_home = "%s/gnupg2" % self.tempdir s = self._soledad_instance(user='anotheruser@leap.se', prefix='3') s._init_dirs() - s._crypto = SoledadCrypto(gnupg_home) + s._crypto = SoledadCrypto(s) s.import_recovery_document(rd, '123456') - self.assertEqual(self._soledad._user, - s._user, 'Failed setting user email.') + self.assertEqual(self._soledad._address, + s._address, 'Failed setting user email.') self.assertEqual(self._soledad._symkey, s._symkey, 'Failed settinng secret for symmetric encryption.') @@ -138,7 +138,7 @@ class CryptoMethodsTestCase(BaseSoledadTest): def test__gen_symkey(self): sol = self._soledad_instance(user='user@leap.se', prefix='/3') sol._init_dirs() - sol._crypto = SoledadCrypto("%s/3/gnupg" % self.tempdir) + sol._crypto = SoledadCrypto(sol) self.assertFalse(sol._has_symkey(), "Should not have a symkey at " "this point") sol._gen_symkey() @@ -147,7 +147,7 @@ class CryptoMethodsTestCase(BaseSoledadTest): def test__has_keys(self): sol = self._soledad_instance(user='leap@leap.se', prefix='/5') sol._init_dirs() - sol._crypto = SoledadCrypto("%s/5/gnupg" % self.tempdir) + sol._crypto = SoledadCrypto(sol) self.assertFalse(sol._has_keys()) sol._gen_symkey() self.assertTrue(sol._has_keys()) diff --git a/src/leap/soledad/tests/test_soledad.py b/src/leap/soledad/tests/test_soledad.py index b849c310..bbe9ad4b 100644 --- a/src/leap/soledad/tests/test_soledad.py +++ b/src/leap/soledad/tests/test_soledad.py @@ -50,7 +50,7 @@ class AuxMethodsTestCase(BaseSoledadTest): def test__init_db(self): sol = self._soledad_instance() sol._init_dirs() - sol._crypto = SoledadCrypto(self.tempdir+'/gnupg') + sol._crypto = SoledadCrypto(sol) #self._soledad._gpg.import_keys(PUBLIC_KEY) if not sol._has_symkey(): sol._gen_symkey() @@ -63,7 +63,7 @@ class AuxMethodsTestCase(BaseSoledadTest): """ Test if configuration defaults point to the correct place. """ - sol = Soledad(user='leap@leap.se', passphrase='123', bootstrap=False) + sol = Soledad('leap@leap.se', passphrase='123', bootstrap=False) self.assertTrue(bool(re.match( '.*/\.config/leap/soledad/gnupg', sol._config.get_gnupg_home()))) self.assertTrue(bool(re.match( @@ -83,7 +83,7 @@ class AuxMethodsTestCase(BaseSoledadTest): # we use regexp match here because HOME environment variable is # changed by the BaseLeapTest class but BaseConfig does not capture # that change. - sol = Soledad(user='leap@leap.se', passphrase='123', bootstrap=False) + sol = Soledad('leap@leap.se', passphrase='123', bootstrap=False) self.assertTrue(bool(re.match( '.*/\.config/leap/soledad/gnupg', sol._config.get_gnupg_home()))) self.assertTrue(bool(re.match( @@ -114,8 +114,8 @@ class AuxMethodsTestCase(BaseSoledadTest): f.write(json.dumps(config_values)) f.close() sol = Soledad( - user='leap@leap.se', - passphrase='123', + 'leap@leap.se', + passphrase='123', bootstrap=False, config_path=tmpfile) self.assertEqual('value_1', sol._config.get_gnupg_home()) @@ -131,8 +131,8 @@ class AuxMethodsTestCase(BaseSoledadTest): # changed by the BaseLeapTest class but BaseConfig does not capture # that change. sol = Soledad( - user='leap@leap.se', - passphrase='123', + 'leap@leap.se', + passphrase='123', bootstrap=False, gnupg_home='value_4', secret_path='value_3', diff --git a/src/leap/soledad/util.py b/src/leap/soledad/util.py deleted file mode 100644 index 90797501..00000000 --- a/src/leap/soledad/util.py +++ /dev/null @@ -1,393 +0,0 @@ -# -*- coding: utf-8 -*- -# util.py -# Copyright (C) 2013 LEAP -# -# This program is free software: you can redistribute it and/or modify -# it under the terms of the GNU General Public License as published by -# the Free Software Foundation, either version 3 of the License, or -# (at your option) any later version. -# -# This program is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -# GNU General Public License for more details. -# -# You should have received a copy of the GNU General Public License -# along with this program. If not, see <http://www.gnu.org/licenses/>. - - -""" -Utilities for Soledad. -""" - -import os -import gnupg -import re -from gnupg import ( - logger, - _is_sequence, - _make_binary_stream, -) - - -class ListPackets(): - """ - Handle status messages for --list-packets. - """ - - def __init__(self, gpg): - """ - Initialize the packet listing handling class. - - @param gpg: GPG object instance. - @type gpg: gnupg.GPG - """ - self.gpg = gpg - self.nodata = None - self.key = None - self.need_passphrase = None - self.need_passphrase_sym = None - self.userid_hint = None - - def handle_status(self, key, value): - """ - Handle one line of the --list-packets status message. - - @param key: The status message key. - @type key: str - @param value: The status message value. - @type value: str - """ - # TODO: write tests for handle_status - if key == 'NODATA': - self.nodata = True - if key == 'ENC_TO': - # This will only capture keys in our keyring. In the future we - # may want to include multiple unknown keys in this list. - self.key, _, _ = value.split() - if key == 'NEED_PASSPHRASE': - self.need_passphrase = True - if key == 'NEED_PASSPHRASE_SYM': - self.need_passphrase_sym = True - if key == 'USERID_HINT': - self.userid_hint = value.strip().split() - - -class GPGWrapper(gnupg.GPG): - """ - This is a temporary class for handling GPG requests, and should be - replaced by a more general class used throughout the project. - """ - - GNUPG_HOME = os.environ['HOME'] + "/.config/leap/gnupg" - GNUPG_BINARY = "/usr/bin/gpg" # this has to be changed based on OS - - def __init__(self, gpgbinary=GNUPG_BINARY, gnupghome=GNUPG_HOME, - verbose=False, use_agent=False, keyring=None, options=None): - """ - Initialize a GnuPG process wrapper. - - @param gpgbinary: Name for GnuPG binary executable. - @type gpgbinary: C{str} - @param gpghome: Full pathname to directory containing the public and - private keyrings. - @type gpghome: C{str} - @param keyring: Name of alternative keyring file to use. If specified, - the default keyring is not used. - @param verbose: Should some verbose info be output? - @type verbose: bool - @param use_agent: Should pass `--use-agent` to GPG binary? - @type use_agent: bool - @param keyring: Path for the keyring to use. - @type keyring: str - @options: A list of additional options to pass to the GPG binary. - @type options: list - - @raise: RuntimeError with explanation message if there is a problem - invoking gpg. - """ - gnupg.GPG.__init__(self, gnupghome=gnupghome, gpgbinary=gpgbinary, - verbose=verbose, use_agent=use_agent, - keyring=keyring, options=options) - self.result_map['list-packets'] = ListPackets - - def find_key_by_email(self, email, secret=False): - """ - Find user's key based on their email. - - @param email: Email address of key being searched for. - @type email: str - @param secret: Should we search for a secret key? - @type secret: bool - - @return: The fingerprint of the found key. - @rtype: str - """ - for key in self.list_keys(secret=secret): - for uid in key['uids']: - if re.search(email, uid): - return key - raise LookupError("GnuPG public key for email %s not found!" % email) - - def find_key_by_subkey(self, subkey, secret=False): - """ - Find user's key based on a subkey fingerprint. - - @param email: Subkey fingerprint of the key being searched for. - @type email: str - @param secret: Should we search for a secret key? - @type secret: bool - - @return: The fingerprint of the found key. - @rtype: str - """ - for key in self.list_keys(secret=secret): - for sub in key['subkeys']: - if sub[0] == subkey: - return key - raise LookupError( - "GnuPG public key for subkey %s not found!" % subkey) - - def find_key_by_keyid(self, keyid, secret=False): - """ - Find user's key based on the key ID. - - @param email: The key ID of the key being searched for. - @type email: str - @param secret: Should we search for a secret key? - @type secret: bool - - @return: The fingerprint of the found key. - @rtype: str - """ - for key in self.list_keys(secret=secret): - if keyid == key['keyid']: - return key - raise LookupError( - "GnuPG public key for keyid %s not found!" % keyid) - - def find_key_by_fingerprint(self, fingerprint, secret=False): - """ - Find user's key based on the key fingerprint. - - @param email: The fingerprint of the key being searched for. - @type email: str - @param secret: Should we search for a secret key? - @type secret: bool - - @return: The fingerprint of the found key. - @rtype: str - """ - for key in self.list_keys(secret=secret): - if fingerprint == key['fingerprint']: - return key - raise LookupError( - "GnuPG public key for fingerprint %s not found!" % fingerprint) - - def encrypt(self, data, recipient, sign=None, always_trust=True, - passphrase=None, symmetric=False): - """ - Encrypt data using GPG. - - @param data: The data to be encrypted. - @type data: str - @param recipient: The address of the public key to be used. - @type recipient: str - @param sign: Should the encrypted content be signed? - @type sign: bool - @param always_trust: Skip key validation and assume that used keys - are always fully trusted? - @type always_trust: bool - @param passphrase: The passphrase to be used if symmetric encryption - is desired. - @type passphrase: str - @param symmetric: Should we encrypt to a password? - @type symmetric: bool - - @return: An object with encrypted result in the `data` field. - @rtype: gnupg.Crypt - """ - # TODO: devise a way so we don't need to "always trust". - return gnupg.GPG.encrypt(self, data, recipient, sign=sign, - always_trust=always_trust, - passphrase=passphrase, - symmetric=symmetric, - cipher_algo='AES256') - - def decrypt(self, data, always_trust=True, passphrase=None): - """ - Decrypt data using GPG. - - @param data: The data to be decrypted. - @type data: str - @param always_trust: Skip key validation and assume that used keys - are always fully trusted? - @type always_trust: bool - @param passphrase: The passphrase to be used if symmetric encryption - is desired. - @type passphrase: str - - @return: An object with decrypted result in the `data` field. - @rtype: gnupg.Crypt - """ - # TODO: devise a way so we don't need to "always trust". - return gnupg.GPG.decrypt(self, data, always_trust=always_trust, - passphrase=passphrase) - - def send_keys(self, keyserver, *keyids): - """ - Send keys to a keyserver - - @param keyserver: The keyserver to send the keys to. - @type keyserver: str - @param keyids: The key ids to send. - @type keyids: list - - @return: A list of keys sent to server. - @rtype: gnupg.ListKeys - """ - # TODO: write tests for this. - # TODO: write a SendKeys class to handle status for this. - result = self.result_map['list'](self) - gnupg.logger.debug('send_keys: %r', keyids) - data = gnupg._make_binary_stream("", self.encoding) - args = ['--keyserver', keyserver, '--send-keys'] - args.extend(keyids) - self._handle_io(args, data, result, binary=True) - gnupg.logger.debug('send_keys result: %r', result.__dict__) - data.close() - return result - - def encrypt_file(self, file, recipients, sign=None, - always_trust=False, passphrase=None, - armor=True, output=None, symmetric=False, - cipher_algo=None): - """ - Encrypt the message read from the file-like object 'file'. - - @param file: The file to be encrypted. - @type data: file - @param recipient: The address of the public key to be used. - @type recipient: str - @param sign: Should the encrypted content be signed? - @type sign: bool - @param always_trust: Skip key validation and assume that used keys - are always fully trusted? - @type always_trust: bool - @param passphrase: The passphrase to be used if symmetric encryption - is desired. - @type passphrase: str - @param armor: Create ASCII armored output? - @type armor: bool - @param output: Path of file to write results in. - @type output: str - @param symmetric: Should we encrypt to a password? - @type symmetric: bool - @param cipher_algo: Algorithm to use. - @type cipher_algo: str - - @return: An object with encrypted result in the `data` field. - @rtype: gnupg.Crypt - """ - args = ['--encrypt'] - if symmetric: - args = ['--symmetric'] - if cipher_algo: - args.append('--cipher-algo %s' % cipher_algo) - else: - args = ['--encrypt'] - if not _is_sequence(recipients): - recipients = (recipients,) - for recipient in recipients: - args.append('--recipient "%s"' % recipient) - if armor: # create ascii-armored output - set to False for binary - args.append('--armor') - if output: # write the output to a file with the specified name - if os.path.exists(output): - os.remove(output) # to avoid overwrite confirmation message - args.append('--output "%s"' % output) - if sign: - args.append('--sign --default-key "%s"' % sign) - if always_trust: - args.append("--always-trust") - result = self.result_map['crypt'](self) - self._handle_io(args, file, result, passphrase=passphrase, binary=True) - logger.debug('encrypt result: %r', result.data) - return result - - def list_packets(self, data): - """ - List the sequence of packets. - - @param data: The data to extract packets from. - @type data: str - - @return: An object with packet info. - @rtype ListPackets - """ - args = ["--list-packets"] - result = self.result_map['list-packets'](self) - self._handle_io( - args, - _make_binary_stream(data, self.encoding), - result, - ) - return result - - def encrypted_to(self, data): - """ - Return the key to which data is encrypted to. - - @param data: The data to be examined. - @type data: str - - @return: The fingerprint of the key to which data is encrypted to. - @rtype: str - """ - # TODO: make this support multiple keys. - result = self.list_packets(data) - if not result.key: - raise LookupError( - "Content is not encrypted to a GnuPG key!") - try: - return self.find_key_by_keyid(result.key) - except: - return self.find_key_by_subkey(result.key) - - def is_encrypted_sym(self, data): - """ - Say whether some chunk of data is encrypted to a symmetric key. - - @param data: The data to be examined. - @type data: str - - @return: Whether data is encrypted to a symmetric key. - @rtype: bool - """ - result = self.list_packets(data) - return bool(result.need_passphrase_sym) - - def is_encrypted_asym(self, data): - """ - Say whether some chunk of data is encrypted to a private key. - - @param data: The data to be examined. - @type data: str - - @return: Whether data is encrypted to a private key. - @rtype: bool - """ - result = self.list_packets(data) - return bool(result.key) - - def is_encrypted(self, data): - """ - Say whether some chunk of data is encrypted to a key. - - @param data: The data to be examined. - @type data: str - - @return: Whether data is encrypted to a key. - @rtype: bool - """ - self.is_encrypted_asym() or self.is_encrypted_sym() |