diff options
Diffstat (limited to 'src')
| -rw-r--r-- | src/leap/soledad/__init__.py | 98 | ||||
| -rw-r--r-- | src/leap/soledad/backends/leap_backend.py | 15 | ||||
| -rw-r--r-- | src/leap/soledad/crypto.py | 69 | ||||
| -rw-r--r-- | src/leap/soledad/tests/__init__.py | 2 | ||||
| -rw-r--r-- | src/leap/soledad/tests/test_crypto.py | 22 | ||||
| -rw-r--r-- | src/leap/soledad/tests/test_soledad.py | 14 | ||||
| -rw-r--r-- | src/leap/soledad/util.py | 393 | 
7 files changed, 112 insertions, 501 deletions
diff --git a/src/leap/soledad/__init__.py b/src/leap/soledad/__init__.py index a27a586d..23d81e84 100644 --- a/src/leap/soledad/__init__.py +++ b/src/leap/soledad/__init__.py @@ -43,7 +43,6 @@ from hashlib import sha256  from leap.common import events -#from leap.common.keymanager.gpgwrapper import GPGWrapper  from leap.soledad.config import SoledadConfig  from leap.soledad.backends import sqlcipher  from leap.soledad.backends.leap_backend import ( @@ -108,14 +107,14 @@ class Soledad(object):      The length of the secret used for symmetric encryption.      """ -    def __init__(self, user, passphrase, config_path=None, gnupg_home=None, +    def __init__(self, address, passphrase, config_path=None, gnupg_home=None,                   secret_path=None, local_db_path=None,                   shared_db_url=None, auth_token=None, bootstrap=True):          """          Initialize configuration, cryptographic keys and dbs. -        @param user: Email address of the user (username@provider). -        @type user: str +        @param address: User's address in the form C{user@provider}. +        @type address: str          @param passphrase: The passphrase for locking and unlocking encryption              secrets for disk storage.          @type passphrase: str @@ -138,7 +137,7 @@ class Soledad(object):          @type bootstrap: bool          """          # TODO: allow for fingerprint enforcing. -        self._user = user +        self._address = address          self._passphrase = passphrase          self._auth_token = auth_token          self._init_config( @@ -179,7 +178,7 @@ class Soledad(object):          # TODO: log each bootstrap step.          # Stage 0  - Local environment setup          self._init_dirs() -        self._crypto = SoledadCrypto(self._config.get_gnupg_home()) +        self._crypto = SoledadCrypto(self)          if self._config.get_shared_db_url() and self._auth_token:              # TODO: eliminate need to create db here.              self._shared_db = SoledadSharedDatabase.open_database( @@ -192,12 +191,14 @@ class Soledad(object):          if self._has_keys():              self._load_keys()          else: -            doc = self._get_keys_doc() +            doc = self._fetch_keys_from_shared_db()              if not doc:                  self._init_keys()              else: -                self._set_symkey(self.decrypt(doc.content['_symkey'], -                                              passphrase=self._hash_user())) +                self._set_symkey( +                    self._crypto.decrypt_sym( +                        doc.content['_symkey'], +                        passphrase=self._address_hash()))          # Stage 2 - Keys synchronization          self._assert_server_keys()          # Stage 3 - Local database initialization @@ -243,12 +244,13 @@ class Soledad(object):          """          Generate (if needed) and load secret for symmetric encryption.          """ -        events.signal(events.events_pb2.SOLEDAD_CREATING_KEYS, self._user) +        events.signal(events.events_pb2.SOLEDAD_CREATING_KEYS, self._address)          # load/generate secret          if not self._has_symkey():              self._gen_symkey()          self._load_symkey() -        events.signal(events.events_pb2.SOLEDAD_DONE_CREATING_KEYS, self._user) +        events.signal( +            events.events_pb2.SOLEDAD_DONE_CREATING_KEYS, self._address)      def _init_db(self):          """ @@ -293,7 +295,8 @@ class Soledad(object):              raise DocumentNotEncrypted(                  "File %s is not encrypted!" % self._config.get_secret_path())          # can we decrypt it? -        cyphertext = self._crypto.decrypt(content, passphrase=self._passphrase) +        cyphertext = self._crypto.decrypt_sym( +            content, passphrase=self._passphrase)          return bool(cyphertext)      def _load_symkey(self): @@ -306,7 +309,8 @@ class Soledad(object):          try:              with open(self._config.get_secret_path()) as f:                  self._symkey = \ -                    self._crypto.decrypt(f.read(), passphrase=self._passphrase) +                    self._crypto.decrypt_sym( +                        f.read(), passphrase=self._passphrase)                  self._crypto.symkey = self._symkey          except IOError:              raise IOError('Failed to open secret file %s.' % @@ -317,10 +321,11 @@ class Soledad(object):          Generate a secret for symmetric encryption and store in a local          encrypted file.          """ -        self._set_symkey(''.join( +        symkey = ''.join(              random.choice(                  string.ascii_letters + -                string.digits) for x in range(self.SECRET_LENGTH))) +                string.digits) for x in range(self.SECRET_LENGTH)) +        self._set_symkey(symkey)      def _set_symkey(self, symkey):          """ @@ -338,9 +343,8 @@ class Soledad(object):          self._store_symkey()      def _store_symkey(self): -        ciphertext = self._crypto.encrypt( -            self._symkey, symmetric=True, -            passphrase=self._passphrase) +        ciphertext = self._crypto.encrypt_sym( +            self._symkey, self._passphrase)          f = open(self._config.get_secret_path(), 'w')          f.write(str(ciphertext))          f.close() @@ -370,7 +374,7 @@ class Soledad(object):          """          self._gen_symkey() -    def _hash_user(self): +    def _address_hash(self):          """          Calculate a hash for storing/retrieving key material on shared          database, based on user's address. @@ -379,9 +383,9 @@ class Soledad(object):          @rtype: str          """          return b2a_base64( -            sha256('user-%s' % self._user).digest())[:-1] +            sha256('address-%s' % self._address).digest())[:-1] -    def _get_keys_doc(self): +    def _fetch_keys_from_shared_db(self):          """          Retrieve the document with encrypted key material from the shared          database. @@ -389,12 +393,14 @@ class Soledad(object):          @return: a document with encrypted key material in its contents          @rtype: LeapDocument          """ -        events.signal(events.events_pb2.SOLEDAD_DOWNLOADING_KEYS, self._user) +        events.signal( +            events.events_pb2.SOLEDAD_DOWNLOADING_KEYS, self._address)          # TODO: change below to raise appropriate exceptions          if not self._shared_db:              return None -        doc = self._shared_db.get_doc_unauth(self._hash_user()) -        events.signal(events.events_pb2.SOLEDAD_DONE_DOWNLOADING_KEYS, self._user) +        doc = self._shared_db.get_doc_unauth(self._address_hash()) +        events.signal( +            events.events_pb2.SOLEDAD_DONE_DOWNLOADING_KEYS, self._address)          return doc      def _assert_server_keys(self): @@ -404,20 +410,24 @@ class Soledad(object):          assert self._has_keys()          if not self._shared_db:              return -        doc = self._get_keys_doc() +        doc = self._fetch_keys_from_shared_db()          if doc: -            remote_symkey = self.decrypt(doc.content['_symkey'], -                                         passphrase=self._hash_user()) +            remote_symkey = self.decrypt_sym( +                doc.content['_symkey'], +                passphrase=self._address_hash())              assert remote_symkey == self._symkey          else: -            events.signal(events.events_pb2.SOLEDAD_UPLOADING_KEYS, self._user) +            events.signal( +                events.events_pb2.SOLEDAD_UPLOADING_KEYS, self._address)              content = { -                '_symkey': self.encrypt(self._symkey), +                '_symkey': self.encrypt_sym(self._symkey, self._passphrase),              } -            doc = LeapDocument(doc_id=self._hash_user(), crypto=self._crypto) +            doc = LeapDocument(doc_id=self._address_hash(), +                               crypto=self._crypto)              doc.content = content              self._shared_db.put_doc(doc) -            events.signal(events.events_pb2.SOLEDAD_DONE_UPLOADING_KEYS, self._user) +            events.signal( +                events.events_pb2.SOLEDAD_DONE_UPLOADING_KEYS, self._address)      #-------------------------------------------------------------------------      # Document storage, retrieval and sync @@ -497,7 +507,6 @@ class Soledad(object):          """          return self._db.get_all_docs(include_deleted) -      def create_doc(self, content, doc_id=None):          """          Create a new document in the local encrypted database. @@ -569,7 +578,7 @@ class Soledad(object):          """          # TODO: create authentication scheme for sync with server.          local_gen = self._db.sync(url, creds=None, autocreate=True) -        events.signal(events.events_pb2.SOLEDAD_DONE_DATA_SYNC, self._user) +        events.signal(events.events_pb2.SOLEDAD_DONE_DATA_SYNC, self._address)          return local_gen      def need_sync(self, url): @@ -587,11 +596,11 @@ class Soledad(object):          info = target.get_sync_info(self._db._get_replica_uid())          # compare source generation with target's last known source generation          if self._db._get_generation() != info[4]: -            events.signal(events.events_pb2.SOLEDAD_NEW_DATA_TO_SYNC, self._user) +            events.signal( +                events.events_pb2.SOLEDAD_NEW_DATA_TO_SYNC, self._address)              return True          return False -      #-------------------------------------------------------------------------      # Recovery document export and import      #------------------------------------------------------------------------- @@ -623,13 +632,11 @@ class Soledad(object):          @rtype: str          """          data = json.dumps({ -            'user': self._user, +            'address': self._address,              'symkey': self._symkey,          })          if passphrase: -            data = self._crypto.encrypt(data, None, sign=None, -                                        passphrase=passphrase, -                                        symmetric=True) +            data = self._crypto.encrypt_sym(data, passphrase)          return data      def import_recovery_document(self, data, passphrase=None): @@ -649,14 +656,23 @@ class Soledad(object):              raise DocumentNotEncrypted("You provided a password but the "                                         "recovery document is not encrypted.")          if passphrase: -            data = str(self._crypto.decrypt(data, passphrase=passphrase)) +            data = self._crypto.decrypt_sym(data, passphrase=passphrase)          data = json.loads(data) -        self._user = data['user'] +        self._address = data['address']          self._symkey = data['symkey']          self._crypto.symkey = self._symkey          self._store_symkey()          # TODO: make this work well with bootstrap.          self._load_keys() +    # +    # Setters/getters +    # + +    def _get_address(self): +        return self._address + +    address = property(_get_address, doc='The user address.') +  __all__ = ['backends', 'util', 'server', 'shared_db'] diff --git a/src/leap/soledad/backends/leap_backend.py b/src/leap/soledad/backends/leap_backend.py index dfec9e85..1c0d5a7d 100644 --- a/src/leap/soledad/backends/leap_backend.py +++ b/src/leap/soledad/backends/leap_backend.py @@ -35,6 +35,9 @@ from u1db.remote.http_database import HTTPDatabase  from u1db.errors import BrokenSyncStream +from leap.common.keymanager import KeyManager + +  class NoDefaultKey(Exception):      """      Exception to signal that there's no default OpenPGP key configured. @@ -120,18 +123,18 @@ class LeapDocument(Document):          """          if not self._crypto:              raise NoSoledadCryptoInstance() -        return self._crypto.encrypt_symmetric( +        return self._crypto.encrypt_sym(              self.get_json(), -            self._crypto._hash_passphrase(self.doc_id)) +            self._crypto.passphrase_hash(self.doc_id))      def set_encrypted_content(self, cyphertext):          """          Decrypt C{cyphertext} and set document's content.          contents.          """ -        plaintext = self._crypto.decrypt_symmetric( +        plaintext = self._crypto.decrypt_sym(              cyphertext, -            self._crypto._hash_passphrase(self.doc_id)) +            self._crypto.passphrase_hash(self.doc_id))          self.set_json(plaintext)          self.encryption_scheme = EncryptionSchemes.NONE @@ -299,14 +302,14 @@ class LeapSyncTarget(HTTPSyncTarget):                          raise DocumentNotEncrypted(                              'Incoming document\'s contents should be '                              'encrypted with a symmetric key.') -                    plain_json = self._crypto.decrypt_symmetric( +                    plain_json = self._crypto.decrypt_sym(                          enc_json, self._crypto._symkey)                  elif entry['encryption_scheme'] == EncryptionScheme.PUBKEY:                      if not self._crypto.is_encrypted_asym(enc_json):                          raise DocumentNotEncrypted(                              'Incoming document\'s contents should be '                              'encrypted to the user\'s public key.') -                    plain_json = self._crypto.decrypt(enc_json) +                    plain_json = self._crypto.decrypt_asym(enc_json)                  else:                      raise DocumentNotEncrypted(                          "Incoming document from sync is not encrypted.") diff --git a/src/leap/soledad/crypto.py b/src/leap/soledad/crypto.py index 471b35e2..dcc40439 100644 --- a/src/leap/soledad/crypto.py +++ b/src/leap/soledad/crypto.py @@ -25,8 +25,7 @@ from binascii import b2a_base64  from hashlib import sha256 -from leap.common.keymanager import KeyManager -from leap.soledad.util import GPGWrapper +from leap.common.keymanager import openpgp  class NoSymmetricSecret(Exception): @@ -40,42 +39,32 @@ class SoledadCrypto(object):      General cryptographic functionality.      """ -    def __init__(self, gnupg_home, symkey=None): +    def __init__(self, soledad):          """          Initialize the crypto object. -        @param gnupg_home: Home of the gpg instance. -        @type gnupg_home: str -        @param symkey: A key to use for symmetric encryption. -        @type symkey: str +        @param soledad: A Soledad instance for key lookup. +        @type soledad: leap.soledad.Soledad          """ -        self._gpg = GPGWrapper(gnupghome=gnupg_home) -        self._symkey = symkey +        self._soledad = soledad +        self._pgp = openpgp.OpenPGPScheme(self._soledad) +        self._symkey = None -    def encrypt(self, data, recipients=None, sign=None, passphrase=None, -                symmetric=False): +    def encrypt_asym(self, data, key):          """          Encrypt data.          @param data: the data to be encrypted          @type data: str -        @param recipients: to whom C{data} should be encrypted  -        @type recipients: list or str -        @param sign: the fingerprint of key to be used for signature -        @type sign: str -        @param passphrase: the passphrase to be used for encryption -        @type passphrase: str -        @param symmetric: whether the encryption scheme should be symmetric -        @type symmetric: bool +        @param key: the key to be used for encryption +        @type key: str          @return: the encrypted data          @rtype: str          """ -        return str(self._gpg.encrypt(data, recipients, sign=sign, -                                     passphrase=passphrase, -                                     symmetric=symmetric)) +        return openpgp.encrypt_asym(data, key) -    def encrypt_symmetric(self, data, passphrase, sign=None): +    def encrypt_sym(self, data, passphrase):          """          Encrypt C{data} using a {password}. @@ -83,18 +72,13 @@ class SoledadCrypto(object):          @type data: str          @param passphrase: the passphrase to use for encryption          @type passphrase: str -        @param data: the data to be encrypted -        @param sign: the fingerprint of key to be used for signature -        @type sign: str          @return: the encrypted data          @rtype: str          """ -        return self.encrypt(data, sign=sign, -                            passphrase=passphrase, -                            symmetric=True) +        return openpgp.encrypt_sym(data, passphrase) -    def decrypt(self, data, passphrase=None): +    def decrypt_asym(self, data):          """          Decrypt data. @@ -106,9 +90,10 @@ class SoledadCrypto(object):          @return: the decrypted data          @rtype: str          """ -        return str(self._gpg.decrypt(data, passphrase=passphrase)) +        key = self._pgp.get_key(self._soledad.address, private=True) +        return openpgp.decrypt_asym(data, key) -    def decrypt_symmetric(self, data, passphrase): +    def decrypt_sym(self, data, passphrase):          """          Decrypt data using symmetric secret. @@ -120,7 +105,7 @@ class SoledadCrypto(object):          @return: the decrypted data          @rtype: str          """ -        return self.decrypt(data, passphrase=passphrase) +        return openpgp.decrypt_sym(data, passphrase)      def is_encrypted(self, data):          """ @@ -132,7 +117,7 @@ class SoledadCrypto(object):          @return: whether the data is a cyphertext          @rtype: bool          """ -        return self._gpg.is_encrypted(data) +        return openpgp.is_encrypted(data)      def is_encrypted_sym(self, data):          """ @@ -141,7 +126,7 @@ class SoledadCrypto(object):          @return: whether data is encrypted to a symmetric key          @rtype: bool          """ -        return self._gpg.is_encrypted_sym(data) +        return openpgp.is_encrypted_sym(data)      def is_encrypted_asym(self, data):          """ @@ -151,14 +136,14 @@ class SoledadCrypto(object):          @return: whether data is encrypted to an OpenPGP private key          @rtype: bool          """ -        return self._gpg.is_encrypted_asym(data) +        return openpgp.is_encrypted_asym(data) -    def _hash_passphrase(self, suffix): +    def passphrase_hash(self, suffix):          """          Generate a passphrase for symmetric encryption. -        The password is derived from C{suffix} and the secret for -        symmetric encryption previously loaded. +        The password is derived from the secret for symmetric encryption and +        a C{suffix} that is appended to the secret prior to hashing.          @param suffix: Will be appended to the symmetric key before hashing.          @type suffix: str @@ -172,9 +157,9 @@ class SoledadCrypto(object):          return b2a_base64(              sha256('%s%s' % (self._symkey, suffix)).digest())[:-1] -   # -   # symkey setters/getters -   # +    # +    # symkey setters/getters +    #      def _get_symkey(self):          return self._symkey diff --git a/src/leap/soledad/tests/__init__.py b/src/leap/soledad/tests/__init__.py index 28114391..dac27a29 100644 --- a/src/leap/soledad/tests/__init__.py +++ b/src/leap/soledad/tests/__init__.py @@ -36,7 +36,7 @@ class BaseSoledadTest(BaseLeapTest):          self._soledad = self._soledad_instance(user=self.email)          self._soledad._init_dirs()          #self._soledad._gpg.import_keys(PUBLIC_KEY) -        self._soledad._crypto = SoledadCrypto(self.gnupg_home) +        self._soledad._crypto = SoledadCrypto(self._soledad)          if not self._soledad._has_symkey():              self._soledad._gen_symkey()          self._soledad._load_symkey() diff --git a/src/leap/soledad/tests/test_crypto.py b/src/leap/soledad/tests/test_crypto.py index f762437a..039d2f3c 100644 --- a/src/leap/soledad/tests/test_crypto.py +++ b/src/leap/soledad/tests/test_crypto.py @@ -77,7 +77,7 @@ class RecoveryDocumentTestCase(BaseSoledadTest):          rd = self._soledad.export_recovery_document(None)          self.assertEqual(              { -                'user': self._soledad._user, +                'address': self._soledad._address,                  'symkey': self._soledad._symkey              },              json.loads(rd), @@ -89,10 +89,10 @@ class RecoveryDocumentTestCase(BaseSoledadTest):          self.assertEqual(True,                           self._soledad._crypto.is_encrypted_sym(rd))          data = { -            'user': self._soledad._user, +            'address': self._soledad._address,              'symkey': self._soledad._symkey,          } -        raw_data = json.loads(str(self._soledad._crypto.decrypt( +        raw_data = json.loads(str(self._soledad._crypto.decrypt_sym(              rd,              passphrase='123456')))          self.assertEqual( @@ -111,10 +111,10 @@ class RecoveryDocumentTestCase(BaseSoledadTest):          gnupg_home = self.gnupg_home = "%s/gnupg2" % self.tempdir          s = self._soledad_instance(user='anotheruser@leap.se', prefix='/2')          s._init_dirs() -        s._crypto = SoledadCrypto(gnupg_home) +        s._crypto = SoledadCrypto(s)          s.import_recovery_document(rd, None) -        self.assertEqual(self._soledad._user, -                         s._user, 'Failed setting user email.') +        self.assertEqual(self._soledad._address, +                         s._address, 'Failed setting user email.')          self.assertEqual(self._soledad._symkey,                           s._symkey,                           'Failed settinng secret for symmetric encryption.') @@ -124,10 +124,10 @@ class RecoveryDocumentTestCase(BaseSoledadTest):          gnupg_home = self.gnupg_home = "%s/gnupg2" % self.tempdir          s = self._soledad_instance(user='anotheruser@leap.se', prefix='3')          s._init_dirs() -        s._crypto = SoledadCrypto(gnupg_home) +        s._crypto = SoledadCrypto(s)          s.import_recovery_document(rd, '123456') -        self.assertEqual(self._soledad._user, -                         s._user, 'Failed setting user email.') +        self.assertEqual(self._soledad._address, +                         s._address, 'Failed setting user email.')          self.assertEqual(self._soledad._symkey,                           s._symkey,                           'Failed settinng secret for symmetric encryption.') @@ -138,7 +138,7 @@ class CryptoMethodsTestCase(BaseSoledadTest):      def test__gen_symkey(self):          sol = self._soledad_instance(user='user@leap.se', prefix='/3')          sol._init_dirs() -        sol._crypto = SoledadCrypto("%s/3/gnupg" % self.tempdir) +        sol._crypto = SoledadCrypto(sol)          self.assertFalse(sol._has_symkey(), "Should not have a symkey at "                                              "this point")          sol._gen_symkey() @@ -147,7 +147,7 @@ class CryptoMethodsTestCase(BaseSoledadTest):      def test__has_keys(self):          sol = self._soledad_instance(user='leap@leap.se', prefix='/5')          sol._init_dirs() -        sol._crypto = SoledadCrypto("%s/5/gnupg" % self.tempdir) +        sol._crypto = SoledadCrypto(sol)          self.assertFalse(sol._has_keys())          sol._gen_symkey()          self.assertTrue(sol._has_keys()) diff --git a/src/leap/soledad/tests/test_soledad.py b/src/leap/soledad/tests/test_soledad.py index b849c310..bbe9ad4b 100644 --- a/src/leap/soledad/tests/test_soledad.py +++ b/src/leap/soledad/tests/test_soledad.py @@ -50,7 +50,7 @@ class AuxMethodsTestCase(BaseSoledadTest):      def test__init_db(self):          sol = self._soledad_instance()          sol._init_dirs() -        sol._crypto = SoledadCrypto(self.tempdir+'/gnupg') +        sol._crypto = SoledadCrypto(sol)          #self._soledad._gpg.import_keys(PUBLIC_KEY)          if not sol._has_symkey():              sol._gen_symkey() @@ -63,7 +63,7 @@ class AuxMethodsTestCase(BaseSoledadTest):          """          Test if configuration defaults point to the correct place.          """ -        sol = Soledad(user='leap@leap.se', passphrase='123', bootstrap=False) +        sol = Soledad('leap@leap.se', passphrase='123', bootstrap=False)          self.assertTrue(bool(re.match(              '.*/\.config/leap/soledad/gnupg', sol._config.get_gnupg_home())))          self.assertTrue(bool(re.match( @@ -83,7 +83,7 @@ class AuxMethodsTestCase(BaseSoledadTest):          # we use regexp match here because HOME environment variable is          # changed by the BaseLeapTest class but BaseConfig does not capture          # that change. -        sol = Soledad(user='leap@leap.se', passphrase='123', bootstrap=False) +        sol = Soledad('leap@leap.se', passphrase='123', bootstrap=False)          self.assertTrue(bool(re.match(              '.*/\.config/leap/soledad/gnupg', sol._config.get_gnupg_home())))          self.assertTrue(bool(re.match( @@ -114,8 +114,8 @@ class AuxMethodsTestCase(BaseSoledadTest):          f.write(json.dumps(config_values))          f.close()          sol = Soledad( -            user='leap@leap.se', -            passphrase='123',  +            'leap@leap.se', +            passphrase='123',              bootstrap=False,              config_path=tmpfile)          self.assertEqual('value_1', sol._config.get_gnupg_home()) @@ -131,8 +131,8 @@ class AuxMethodsTestCase(BaseSoledadTest):          # changed by the BaseLeapTest class but BaseConfig does not capture          # that change.          sol = Soledad( -            user='leap@leap.se', -            passphrase='123',  +            'leap@leap.se', +            passphrase='123',              bootstrap=False,              gnupg_home='value_4',              secret_path='value_3', diff --git a/src/leap/soledad/util.py b/src/leap/soledad/util.py deleted file mode 100644 index 90797501..00000000 --- a/src/leap/soledad/util.py +++ /dev/null @@ -1,393 +0,0 @@ -# -*- coding: utf-8 -*- -# util.py -# Copyright (C) 2013 LEAP -# -# This program is free software: you can redistribute it and/or modify -# it under the terms of the GNU General Public License as published by -# the Free Software Foundation, either version 3 of the License, or -# (at your option) any later version. -# -# This program is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -# GNU General Public License for more details. -# -# You should have received a copy of the GNU General Public License -# along with this program. If not, see <http://www.gnu.org/licenses/>. - - -""" -Utilities for Soledad. -""" - -import os -import gnupg -import re -from gnupg import ( -    logger, -    _is_sequence, -    _make_binary_stream, -) - - -class ListPackets(): -    """ -    Handle status messages for --list-packets. -    """ - -    def __init__(self, gpg): -        """ -        Initialize the packet listing handling class. - -        @param gpg: GPG object instance. -        @type gpg: gnupg.GPG -        """ -        self.gpg = gpg -        self.nodata = None -        self.key = None -        self.need_passphrase = None -        self.need_passphrase_sym = None -        self.userid_hint = None - -    def handle_status(self, key, value): -        """ -        Handle one line of the --list-packets status message. - -        @param key: The status message key. -        @type key: str -        @param value: The status message value. -        @type value: str -        """ -        # TODO: write tests for handle_status -        if key == 'NODATA': -            self.nodata = True -        if key == 'ENC_TO': -            # This will only capture keys in our keyring. In the future we -            # may want to include multiple unknown keys in this list. -            self.key, _, _ = value.split() -        if key == 'NEED_PASSPHRASE': -            self.need_passphrase = True -        if key == 'NEED_PASSPHRASE_SYM': -            self.need_passphrase_sym = True -        if key == 'USERID_HINT': -            self.userid_hint = value.strip().split() - - -class GPGWrapper(gnupg.GPG): -    """ -    This is a temporary class for handling GPG requests, and should be -    replaced by a more general class used throughout the project. -    """ - -    GNUPG_HOME = os.environ['HOME'] + "/.config/leap/gnupg" -    GNUPG_BINARY = "/usr/bin/gpg"  # this has to be changed based on OS - -    def __init__(self, gpgbinary=GNUPG_BINARY, gnupghome=GNUPG_HOME, -                 verbose=False, use_agent=False, keyring=None, options=None): -        """ -        Initialize a GnuPG process wrapper. - -        @param gpgbinary: Name for GnuPG binary executable. -        @type gpgbinary: C{str} -        @param gpghome: Full pathname to directory containing the public and -            private keyrings. -        @type gpghome: C{str} -        @param keyring: Name of alternative keyring file to use. If specified, -            the default keyring is not used. -        @param verbose: Should some verbose info be output? -        @type verbose: bool -        @param use_agent: Should pass `--use-agent` to GPG binary? -        @type use_agent: bool -        @param keyring: Path for the keyring to use. -        @type keyring: str -        @options: A list of additional options to pass to the GPG binary. -        @type options: list - -        @raise: RuntimeError with explanation message if there is a problem -            invoking gpg. -        """ -        gnupg.GPG.__init__(self, gnupghome=gnupghome, gpgbinary=gpgbinary, -                           verbose=verbose, use_agent=use_agent, -                           keyring=keyring, options=options) -        self.result_map['list-packets'] = ListPackets - -    def find_key_by_email(self, email, secret=False): -        """ -        Find user's key based on their email. - -        @param email: Email address of key being searched for. -        @type email: str -        @param secret: Should we search for a secret key? -        @type secret: bool - -        @return: The fingerprint of the found key. -        @rtype: str -        """ -        for key in self.list_keys(secret=secret): -            for uid in key['uids']: -                if re.search(email, uid): -                    return key -        raise LookupError("GnuPG public key for email %s not found!" % email) - -    def find_key_by_subkey(self, subkey, secret=False): -        """ -        Find user's key based on a subkey fingerprint. - -        @param email: Subkey fingerprint of the key being searched for. -        @type email: str -        @param secret: Should we search for a secret key? -        @type secret: bool - -        @return: The fingerprint of the found key. -        @rtype: str -        """ -        for key in self.list_keys(secret=secret): -            for sub in key['subkeys']: -                if sub[0] == subkey: -                    return key -        raise LookupError( -            "GnuPG public key for subkey %s not found!" % subkey) - -    def find_key_by_keyid(self, keyid, secret=False): -        """ -        Find user's key based on the key ID. - -        @param email: The key ID of the key being searched for. -        @type email: str -        @param secret: Should we search for a secret key? -        @type secret: bool - -        @return: The fingerprint of the found key. -        @rtype: str -        """ -        for key in self.list_keys(secret=secret): -            if keyid == key['keyid']: -                return key -        raise LookupError( -            "GnuPG public key for keyid %s not found!" % keyid) - -    def find_key_by_fingerprint(self, fingerprint, secret=False): -        """ -        Find user's key based on the key fingerprint. - -        @param email: The fingerprint of the key being searched for. -        @type email: str -        @param secret: Should we search for a secret key? -        @type secret: bool - -        @return: The fingerprint of the found key. -        @rtype: str -        """ -        for key in self.list_keys(secret=secret): -            if fingerprint == key['fingerprint']: -                return key -        raise LookupError( -            "GnuPG public key for fingerprint %s not found!" % fingerprint) - -    def encrypt(self, data, recipient, sign=None, always_trust=True, -                passphrase=None, symmetric=False): -        """ -        Encrypt data using GPG. - -        @param data: The data to be encrypted. -        @type data: str -        @param recipient: The address of the public key to be used. -        @type recipient: str -        @param sign: Should the encrypted content be signed? -        @type sign: bool -        @param always_trust: Skip key validation and assume that used keys -            are always fully trusted? -        @type always_trust: bool -        @param passphrase: The passphrase to be used if symmetric encryption -            is desired. -        @type passphrase: str -        @param symmetric: Should we encrypt to a password? -        @type symmetric: bool - -        @return: An object with encrypted result in the `data` field. -        @rtype: gnupg.Crypt -        """ -        # TODO: devise a way so we don't need to "always trust". -        return gnupg.GPG.encrypt(self, data, recipient, sign=sign, -                                 always_trust=always_trust, -                                 passphrase=passphrase, -                                 symmetric=symmetric, -                                 cipher_algo='AES256') - -    def decrypt(self, data, always_trust=True, passphrase=None): -        """ -        Decrypt data using GPG. - -        @param data: The data to be decrypted. -        @type data: str -        @param always_trust: Skip key validation and assume that used keys -            are always fully trusted? -        @type always_trust: bool -        @param passphrase: The passphrase to be used if symmetric encryption -            is desired. -        @type passphrase: str - -        @return: An object with decrypted result in the `data` field. -        @rtype: gnupg.Crypt -        """ -        # TODO: devise a way so we don't need to "always trust". -        return gnupg.GPG.decrypt(self, data, always_trust=always_trust, -                                 passphrase=passphrase) - -    def send_keys(self, keyserver, *keyids): -        """ -        Send keys to a keyserver - -        @param keyserver: The keyserver to send the keys to. -        @type keyserver: str -        @param keyids: The key ids to send. -        @type keyids: list - -        @return: A list of keys sent to server. -        @rtype: gnupg.ListKeys -        """ -        # TODO: write tests for this. -        # TODO: write a SendKeys class to handle status for this. -        result = self.result_map['list'](self) -        gnupg.logger.debug('send_keys: %r', keyids) -        data = gnupg._make_binary_stream("", self.encoding) -        args = ['--keyserver', keyserver, '--send-keys'] -        args.extend(keyids) -        self._handle_io(args, data, result, binary=True) -        gnupg.logger.debug('send_keys result: %r', result.__dict__) -        data.close() -        return result - -    def encrypt_file(self, file, recipients, sign=None, -                     always_trust=False, passphrase=None, -                     armor=True, output=None, symmetric=False, -                     cipher_algo=None): -        """ -        Encrypt the message read from the file-like object 'file'. - -        @param file: The file to be encrypted. -        @type data: file -        @param recipient: The address of the public key to be used. -        @type recipient: str -        @param sign: Should the encrypted content be signed? -        @type sign: bool -        @param always_trust: Skip key validation and assume that used keys -            are always fully trusted? -        @type always_trust: bool -        @param passphrase: The passphrase to be used if symmetric encryption -            is desired. -        @type passphrase: str -        @param armor: Create ASCII armored output? -        @type armor: bool -        @param output: Path of file to write results in. -        @type output: str -        @param symmetric: Should we encrypt to a password? -        @type symmetric: bool -        @param cipher_algo: Algorithm to use. -        @type cipher_algo: str - -        @return: An object with encrypted result in the `data` field. -        @rtype: gnupg.Crypt -        """ -        args = ['--encrypt'] -        if symmetric: -            args = ['--symmetric'] -            if cipher_algo: -                args.append('--cipher-algo %s' % cipher_algo) -        else: -            args = ['--encrypt'] -            if not _is_sequence(recipients): -                recipients = (recipients,) -            for recipient in recipients: -                args.append('--recipient "%s"' % recipient) -        if armor:  # create ascii-armored output - set to False for binary -            args.append('--armor') -        if output:  # write the output to a file with the specified name -            if os.path.exists(output): -                os.remove(output)  # to avoid overwrite confirmation message -            args.append('--output "%s"' % output) -        if sign: -            args.append('--sign --default-key "%s"' % sign) -        if always_trust: -            args.append("--always-trust") -        result = self.result_map['crypt'](self) -        self._handle_io(args, file, result, passphrase=passphrase, binary=True) -        logger.debug('encrypt result: %r', result.data) -        return result - -    def list_packets(self, data): -        """ -        List the sequence of packets. - -        @param data: The data to extract packets from. -        @type data: str - -        @return: An object with packet info. -        @rtype ListPackets -        """ -        args = ["--list-packets"] -        result = self.result_map['list-packets'](self) -        self._handle_io( -            args, -            _make_binary_stream(data, self.encoding), -            result, -        ) -        return result - -    def encrypted_to(self, data): -        """ -        Return the key to which data is encrypted to. - -        @param data: The data to be examined. -        @type data: str - -        @return: The fingerprint of the key to which data is encrypted to. -        @rtype: str -        """ -        # TODO: make this support multiple keys. -        result = self.list_packets(data) -        if not result.key: -            raise LookupError( -                "Content is not encrypted to a GnuPG key!") -        try: -            return self.find_key_by_keyid(result.key) -        except: -            return self.find_key_by_subkey(result.key) - -    def is_encrypted_sym(self, data): -        """ -        Say whether some chunk of data is encrypted to a symmetric key. - -        @param data: The data to be examined. -        @type data: str - -        @return: Whether data is encrypted to a symmetric key. -        @rtype: bool -        """ -        result = self.list_packets(data) -        return bool(result.need_passphrase_sym) - -    def is_encrypted_asym(self, data): -        """ -        Say whether some chunk of data is encrypted to a private key. - -        @param data: The data to be examined. -        @type data: str - -        @return: Whether data is encrypted to a private key. -        @rtype: bool -        """ -        result = self.list_packets(data) -        return bool(result.key) - -    def is_encrypted(self, data): -        """ -        Say whether some chunk of data is encrypted to a key. - -        @param data: The data to be examined. -        @type data: str - -        @return: Whether data is encrypted to a key. -        @rtype: bool -        """ -        self.is_encrypted_asym() or self.is_encrypted_sym()  | 
