diff options
| -rw-r--r-- | changes/feature_remove-strict-dependency-on-leap.common | 1 | ||||
| -rw-r--r-- | setup.py | 8 | ||||
| -rw-r--r-- | src/leap/soledad/__init__.py | 135 | ||||
| -rw-r--r-- | src/leap/soledad/backends/leap_backend.py | 26 | ||||
| -rw-r--r-- | src/leap/soledad/crypto.py | 64 | ||||
| -rw-r--r-- | src/leap/soledad/tests/test_crypto.py | 217 | ||||
| -rw-r--r-- | src/leap/soledad/tests/test_soledad.py | 74 | 
7 files changed, 316 insertions, 209 deletions
| diff --git a/changes/feature_remove-strict-dependency-on-leap.common b/changes/feature_remove-strict-dependency-on-leap.common new file mode 100644 index 00000000..f25dcbf3 --- /dev/null +++ b/changes/feature_remove-strict-dependency-on-leap.common @@ -0,0 +1 @@ +  o Remove strict dependency on leap.common. @@ -26,7 +26,6 @@ from setuptools import (  install_requirements = [      'configparser',      'couchdb', -    'leap.common',      'pysqlcipher',      'simplejson',      'twisted>=12.0.0',  # TODO: maybe we just want twisted-web? @@ -35,9 +34,10 @@ install_requirements = [      'u1db',      'requests',      'six==1.1.0', -    'pysqlite',      'scrypt',      'routes', +    'pyxdg', +    'pycrypto',  ] @@ -45,6 +45,7 @@ tests_requirements = [      'mock',      'nose2',      'testscenarios', +    'leap.common',  ] @@ -89,4 +90,7 @@ setup(      tests_require=tests_requirements,      data_files=data_files,      classifiers=trove_classifiers, +    extras_require={ +        'signaling': ['leap.common'], +    }  ) diff --git a/src/leap/soledad/__init__.py b/src/leap/soledad/__init__.py index fba275e3..ea3f676b 100644 --- a/src/leap/soledad/__init__.py +++ b/src/leap/soledad/__init__.py @@ -36,6 +36,7 @@ import scrypt  import httplib  import socket  import ssl +import errno  from xdg import BaseDirectory @@ -47,9 +48,92 @@ from u1db.remote.ssl_match_hostname import (  # noqa  ) -from leap.common import events -from leap.common.check import leap_assert -from leap.common.files import mkdir_p +# +# Assert functions +# + +def soledad_assert(condition, message): +    """ +    Asserts the condition and displays the message if that's not +    met. + +    @param condition: condition to check +    @type condition: bool +    @param message: message to display if the condition isn't met +    @type message: str +    """ +    assert condition, message + + +# we want to use leap.common.check.leap_assert in case it is available, +# because it also logs in a way other parts of leap can access log messages. +try: +    from leap.common.check import leap_assert +    soledad_assert = leap_assert +except ImportError: +    pass + + +def soledad_assert_type(var, expectedType): +    """ +    Helper assert check for a variable's expected type + +    @param var: variable to check +    @type var: any +    @param expectedType: type to check agains +    @type expectedType: type +    """ +    soledad_assert(isinstance(var, expectedType), +                   "Expected type %r instead of %r" % +                   (expectedType, type(var))) + +try: +    from leap.common.check import leap_assert_type +    soledad_assert_type = leap_assert_type +except ImportError: +    pass + + +# +# Signaling function +# + +# we define a fake signaling function and fake signal constants that will +# allow for logging signaling attempts in case leap.common.events is not +# available. + +def signal(signal, content=""): +    logger.info("Would signal: %s - %s." % (str(signal), content)) + +SOLEDAD_CREATING_KEYS = 'Creating keys...' +SOLEDAD_DONE_CREATING_KEYS = 'Done creating keys.' +SOLEDAD_DOWNLOADING_KEYS = 'Downloading keys...' +SOLEDAD_DONE_DOWNLOADING_KEYS = 'Done downloading keys.' +SOLEDAD_UPLOADING_KEYS = 'Uploading keys...' +SOLEDAD_DONE_UPLOADING_KEYS = 'Done uploading keys.' +SOLEDAD_NEW_DATA_TO_SYNC = 'New data available.' +SOLEDAD_DONE_DATA_SYNC = 'Done data sync.' + +# we want to use leap.common.events to emits signals, if it is available. +try: +    from leap.common import events +    # replace fake signaling function with real one +    signal = events.signal +    # replace fake string signals with real signals +    SOLEDAD_CREATING_KEYS = events.events_pb2.SOLEDAD_CREATING_KEYS +    SOLEDAD_DONE_CREATING_KEYS = events.events_pb2.SOLEDAD_DONE_CREATING_KEYS +    SOLEDAD_DOWNLOADING_KEYS = events.events_pb2.SOLEDAD_DOWNLOADING_KEYS +    SOLEDAD_DONE_DOWNLOADING_KEYS = \ +        events.events_pb2.SOLEDAD_DONE_DOWNLOADING_KEYS +    SOLEDAD_UPLOADING_KEYS = events.events_pb2.SOLEDAD_UPLOADING_KEYS +    SOLEDAD_DONE_UPLOADING_KEYS = \ +        events.events_pb2.SOLEDAD_DONE_UPLOADING_KEYS +    SOLEDAD_NEW_DATA_TO_SYNC = events.events_pb2.SOLEDAD_NEW_DATA_TO_SYNC +    SOLEDAD_DONE_DATA_SYNC = events.events_pb2.SOLEDAD_DONE_DATA_SYNC +except ImportError: +    pass + +  from leap.soledad.backends import sqlcipher  from leap.soledad.backends.leap_backend import (      LeapDocument, @@ -64,6 +148,10 @@ from leap.soledad.crypto import SoledadCrypto  logger = logging.getLogger(name=__name__) +# +# Constants +# +  SOLEDAD_CERT = None  """  Path to the certificate file used to certify the SSL connection between @@ -226,7 +314,7 @@ class Soledad(object):                  self.DEFAULT_PREFIX, self.LOCAL_DATABASE_FILE_NAME)          # initialize server_url          self._server_url = server_url -        leap_assert( +        soledad_assert(              self._server_url is not None,              'Missing URL for Soledad server.') @@ -295,7 +383,13 @@ class Soledad(object):              [self._local_db_path, self._secrets_path])          for path in paths:              logger.info('Creating directory: %s.' % path) -            mkdir_p(path) +            try: +                os.makedirs(path) +            except OSError as exc: +                if exc.errno == errno.EEXIST and os.path.isdir(path): +                    pass +                else: +                    raise      def _init_db(self):          """ @@ -439,8 +533,8 @@ class Soledad(object):          This method emits the following signals: -            * leap.common.events.events_pb2.SOLEDAD_CREATING_KEYS -            * leap.common.events.events_pb2.SOLEDAD_DONE_CREATING_KEYS +            * SOLEDAD_CREATING_KEYS +            * SOLEDAD_DONE_CREATING_KEYS          A secret has the following structure: @@ -458,7 +552,7 @@ class Soledad(object):          @return: The id of the generated secret.          @rtype: str          """ -        events.signal(events.events_pb2.SOLEDAD_CREATING_KEYS, self._uuid) +        signal(SOLEDAD_CREATING_KEYS, self._uuid)          # generate random secret          secret = os.urandom(self.GENERATED_SECRET_LENGTH)          secret_id = sha256(secret).hexdigest() @@ -479,8 +573,7 @@ class Soledad(object):                  str(iv), self.IV_SEPARATOR, binascii.b2a_base64(ciphertext)),          }          self._store_secrets() -        events.signal( -            events.events_pb2.SOLEDAD_DONE_CREATING_KEYS, self._uuid) +        signal(SOLEDAD_DONE_CREATING_KEYS, self._uuid)          return secret_id      def _store_secrets(self): @@ -543,15 +636,13 @@ class Soledad(object):          @return: a document with encrypted key material in its contents          @rtype: LeapDocument          """ -        events.signal( -            events.events_pb2.SOLEDAD_DOWNLOADING_KEYS, self._uuid) +        signal(SOLEDAD_DOWNLOADING_KEYS, self._uuid)          db = self._shared_db()          if not db:              logger.warning('No shared db found')              return          doc = db.get_doc(self._uuid_hash()) -        events.signal( -            events.events_pb2.SOLEDAD_DONE_DOWNLOADING_KEYS, self._uuid) +        signal(SOLEDAD_DONE_DOWNLOADING_KEYS, self._uuid)          return doc      def _put_secrets_in_shared_db(self): @@ -563,7 +654,7 @@ class Soledad(object):          Otherwise, upload keys to shared recovery database.          """ -        leap_assert( +        soledad_assert(              self._has_secret(),              'Tried to send keys to server but they don\'t exist in local '              'storage.') @@ -574,15 +665,13 @@ class Soledad(object):          # fill doc with encrypted secrets          doc.content = self.export_recovery_document(include_uuid=False)          # upload secrets to server -        events.signal( -            events.events_pb2.SOLEDAD_UPLOADING_KEYS, self._uuid) +        signal(SOLEDAD_UPLOADING_KEYS, self._uuid)          db = self._shared_db()          if not db:              logger.warning('No shared db found')              return          db.put_doc(doc) -        events.signal( -            events.events_pb2.SOLEDAD_DONE_UPLOADING_KEYS, self._uuid) +        signal(SOLEDAD_DONE_UPLOADING_KEYS, self._uuid)      #      # Document storage, retrieval and sync. @@ -835,7 +924,7 @@ class Soledad(object):          local_gen = self._db.sync(              urlparse.urljoin(self.server_url, 'user-%s' % self._uuid),              creds=self._creds, autocreate=True) -        events.signal(events.events_pb2.SOLEDAD_DONE_DATA_SYNC, self._uuid) +        signal(SOLEDAD_DONE_DATA_SYNC, self._uuid)          return local_gen      def need_sync(self, url): @@ -852,8 +941,7 @@ class Soledad(object):          info = target.get_sync_info(self._db._get_replica_uid())          # compare source generation with target's last known source generation          if self._db._get_generation() != info[4]: -            events.signal( -                events.events_pb2.SOLEDAD_NEW_DATA_TO_SYNC, self._uuid) +            signal(SOLEDAD_NEW_DATA_TO_SYNC, self._uuid)              return True          return False @@ -1005,3 +1093,6 @@ class VerifiedHTTPSConnection(httplib.HTTPSConnection):  old__VerifiedHTTPSConnection = http_client._VerifiedHTTPSConnection  http_client._VerifiedHTTPSConnection = VerifiedHTTPSConnection + + +__all__ = ['soledad_assert', 'Soledad'] diff --git a/src/leap/soledad/backends/leap_backend.py b/src/leap/soledad/backends/leap_backend.py index d92025db..4d92db37 100644 --- a/src/leap/soledad/backends/leap_backend.py +++ b/src/leap/soledad/backends/leap_backend.py @@ -33,13 +33,11 @@ from u1db.errors import BrokenSyncStream  from u1db.remote.http_target import HTTPSyncTarget -from leap.common.crypto import ( +from leap.soledad import soledad_assert +from leap.soledad.crypto import (      EncryptionMethods,      UnknownEncryptionMethod, -    encrypt_sym, -    decrypt_sym,  ) -from leap.common.check import leap_assert  from leap.soledad.auth import TokenBasedAuth @@ -167,9 +165,9 @@ def encrypt_doc(crypto, doc):          content.      @rtype: str      """ -    leap_assert(doc.is_tombstone() is False) +    soledad_assert(doc.is_tombstone() is False)      # encrypt content using AES-256 CTR mode -    iv, ciphertext = encrypt_sym( +    iv, ciphertext = crypto.encrypt_sym(          doc.get_json(),          crypto.doc_passphrase(doc.doc_id),          method=EncryptionMethods.AES_256_CTR) @@ -220,12 +218,12 @@ def decrypt_doc(crypto, doc):      @return: The JSON serialization of the decrypted content.      @rtype: str      """ -    leap_assert(doc.is_tombstone() is False) -    leap_assert(ENC_JSON_KEY in doc.content) -    leap_assert(ENC_SCHEME_KEY in doc.content) -    leap_assert(ENC_METHOD_KEY in doc.content) -    leap_assert(MAC_KEY in doc.content) -    leap_assert(MAC_METHOD_KEY in doc.content) +    soledad_assert(doc.is_tombstone() is False) +    soledad_assert(ENC_JSON_KEY in doc.content) +    soledad_assert(ENC_SCHEME_KEY in doc.content) +    soledad_assert(ENC_METHOD_KEY in doc.content) +    soledad_assert(MAC_KEY in doc.content) +    soledad_assert(MAC_METHOD_KEY in doc.content)      # verify MAC      ciphertext = binascii.a2b_hex(  # content is stored as hex.          doc.content[ENC_JSON_KEY]) @@ -241,8 +239,8 @@ def decrypt_doc(crypto, doc):      if enc_scheme == EncryptionSchemes.SYMKEY:          enc_method = doc.content[ENC_METHOD_KEY]          if enc_method == EncryptionMethods.AES_256_CTR: -            leap_assert(ENC_IV_KEY in doc.content) -            plainjson = decrypt_sym( +            soledad_assert(ENC_IV_KEY in doc.content) +            plainjson = crypto.decrypt_sym(                  ciphertext,                  crypto.doc_passphrase(doc.doc_id),                  method=enc_method, diff --git a/src/leap/soledad/crypto.py b/src/leap/soledad/crypto.py index e020eee6..be83e4a2 100644 --- a/src/leap/soledad/crypto.py +++ b/src/leap/soledad/crypto.py @@ -21,11 +21,35 @@ Cryptographic utilities for Soledad.  """ +import os +import binascii  import hmac  import hashlib -from leap.common import crypto +from Crypto.Cipher import AES +from Crypto.Util import Counter + + +from leap.soledad import ( +    soledad_assert, +    soledad_assert_type, +) + + +class EncryptionMethods(object): +    """ +    Representation of encryption methods that can be used. +    """ + +    AES_256_CTR = 'aes-256-ctr' + + +class UnknownEncryptionMethod(Exception): +    """ +    Raised when trying to encrypt/decrypt with unknown method. +    """ +    pass  class NoSymmetricSecret(Exception): @@ -51,7 +75,7 @@ class SoledadCrypto(object):          self._soledad = soledad      def encrypt_sym(self, data, key, -                    method=crypto.EncryptionMethods.AES_256_CTR): +                    method=EncryptionMethods.AES_256_CTR):          """          Encrypt C{data} using a {password}. @@ -67,10 +91,24 @@ class SoledadCrypto(object):          @return: A tuple with the initial value and the encrypted data.          @rtype: (long, str)          """ -        return crypto.encrypt_sym(data, key, method) +        soledad_assert_type(key, str) + +        # AES-256 in CTR mode +        if method == EncryptionMethods.AES_256_CTR: +            soledad_assert( +                len(key) == 32,  # 32 x 8 = 256 bits. +                'Wrong key size: %s bits (must be 256 bits long).' % +                (len(key) * 8)) +            iv = os.urandom(8) +            ctr = Counter.new(64, prefix=iv) +            cipher = AES.new(key=key, mode=AES.MODE_CTR, counter=ctr) +            return binascii.b2a_base64(iv), cipher.encrypt(data) + +        # raise if method is unknown +        raise UnknownEncryptionMethod('Unkwnown method: %s' % method)      def decrypt_sym(self, data, key, -                    method=crypto.EncryptionMethods.AES_256_CTR, **kwargs): +                    method=EncryptionMethods.AES_256_CTR, **kwargs):          """          Decrypt data using symmetric secret. @@ -88,7 +126,23 @@ class SoledadCrypto(object):          @return: The decrypted data.          @rtype: str          """ -        return crypto.decrypt_sym(data, key, method, **kwargs) +        soledad_assert_type(key, str) + +        # AES-256 in CTR mode +        if method == EncryptionMethods.AES_256_CTR: +            # assert params +            soledad_assert( +                len(key) == 32,  # 32 x 8 = 256 bits. +                'Wrong key size: %s (must be 256 bits long).' % len(key)) +            soledad_assert( +                'iv' in kwargs, +                'AES-256-CTR needs an initial value given as.') +            ctr = Counter.new(64, prefix=binascii.a2b_base64(kwargs['iv'])) +            cipher = AES.new(key=key, mode=AES.MODE_CTR, counter=ctr) +            return cipher.decrypt(data) + +        # raise if method is unknown +        raise UnknownEncryptionMethod('Unkwnown method: %s' % method)      def doc_passphrase(self, doc_id):          """ diff --git a/src/leap/soledad/tests/test_crypto.py b/src/leap/soledad/tests/test_crypto.py index ae84dad3..59d20fa7 100644 --- a/src/leap/soledad/tests/test_crypto.py +++ b/src/leap/soledad/tests/test_crypto.py @@ -25,27 +25,20 @@ import shutil  import tempfile  import simplejson as json  import hashlib +import binascii -from leap.soledad.backends.leap_backend import ( -    LeapDocument, -    encrypt_doc, -    decrypt_doc, -    EncryptionSchemes, -    LeapSyncTarget, -    ENC_JSON_KEY, -    ENC_SCHEME_KEY, -    MAC_METHOD_KEY, -    MAC_KEY, -    UnknownMacMethod, -    WrongMac, -) -from leap.soledad.backends.couch import CouchDatabase +from leap.common.testing.basetest import BaseLeapTest +from Crypto import Random + + +from leap.common.testing.basetest import BaseLeapTest  from leap.soledad import Soledad -from leap.soledad.crypto import SoledadCrypto -from leap.soledad.tests import BaseSoledadTest -from leap.soledad.tests.test_couch import CouchDBTestCase +from leap.soledad import crypto +from leap.soledad.backends import leap_backend +from leap.soledad.backends.couch import CouchDatabase  from leap.soledad.tests import ( +    BaseSoledadTest,      KEY_FINGERPRINT,      PRIVATE_KEY,  ) @@ -54,8 +47,12 @@ from leap.soledad.tests.u1db_tests import (      nested_doc,      TestCaseWithServer,  ) -from leap.soledad.tests.test_leap_backend import make_leap_document_for_test -from leap.soledad.backends.couch import CouchServerState + + +# These will be used in the future for EncryptedCouchSyncTestCase +#from leap.soledad.tests.test_couch import CouchDBTestCase +#from leap.soledad.tests.test_leap_backend import make_leap_document_for_test +#from leap.soledad.backends.couch import CouchServerState  class EncryptedSyncTestCase(BaseSoledadTest): @@ -68,112 +65,22 @@ class EncryptedSyncTestCase(BaseSoledadTest):          Test encrypting and decrypting documents.          """          simpledoc = {'key': 'val'} -        doc1 = LeapDocument(doc_id='id') +        doc1 = leap_backend.LeapDocument(doc_id='id')          doc1.content = simpledoc          # encrypt doc -        doc1.set_json(encrypt_doc(self._soledad._crypto, doc1)) +        doc1.set_json(leap_backend.encrypt_doc(self._soledad._crypto, doc1))          # assert content is different and includes keys          self.assertNotEqual(              simpledoc, doc1.content,              'incorrect document encryption') -        self.assertTrue(ENC_JSON_KEY in doc1.content) -        self.assertTrue(ENC_SCHEME_KEY in doc1.content) +        self.assertTrue(leap_backend.ENC_JSON_KEY in doc1.content) +        self.assertTrue(leap_backend.ENC_SCHEME_KEY in doc1.content)          # decrypt doc -        doc1.set_json(decrypt_doc(self._soledad._crypto, doc1)) +        doc1.set_json(leap_backend.decrypt_doc(self._soledad._crypto, doc1))          self.assertEqual(              simpledoc, doc1.content, 'incorrect document encryption') -#from leap.soledad.server import SoledadApp, SoledadAuthMiddleware -# -# -#def make_token_leap_app(test, state): -#    app = SoledadApp(state) -#    application = SoledadAuthMiddleware(app, prefix='/soledad/') -#    return application -# -# -#def leap_sync_target(test, path): -#    return LeapSyncTarget(test.getURL(path)) -# -# -#def token_leap_sync_target(test, path): -#    st = leap_sync_target(test, 'soledad/' + path) -#    st.set_token_credentials('any_user', 'any_token') -#    return st -# -# -#class EncryptedCouchSyncTest(CouchDBTestCase, TestCaseWithServer): -# -#    make_app_with_state = make_token_leap_app -# -#    make_document_for_test = make_leap_document_for_test -# -#    sync_target = token_leap_sync_target -# -#    def make_app(self): -#        # potential hook point -#        self.request_state = CouchServerState(self._couch_url) -#        return self.make_app_with_state(self.request_state) -# -#    def _soledad_instance(self, user='leap@leap.se', prefix='', -#                          bootstrap=False, gnupg_home='/gnupg', -#                          secrets_path='/secret.gpg', -#                          local_db_path='/soledad.u1db'): -#        return Soledad( -#            user, -#            '123', -#            gnupg_home=self.tempdir+prefix+gnupg_home, -#            secrets_path=self.tempdir+prefix+secrets_path, -#            local_db_path=self.tempdir+prefix+local_db_path, -#            bootstrap=bootstrap) -# -#    def setUp(self): -#        CouchDBTestCase.setUp(self) -#        TestCaseWithServer.setUp(self) -#        self.tempdir = tempfile.mkdtemp(suffix='.couch.test') -#        # initialize soledad by hand so we can control keys -#        self._soledad = self._soledad_instance('leap@leap.se') -#        self._soledad._init_dirs() -#        self._soledad._crypto = SoledadCrypto(self._soledad) -#        if not self._soledad._has_get_storage_secret()(): -#            self._soledad._gen_get_storage_secret()() -#        self._soledad._load_get_storage_secret()() -#        self._soledad._init_db() -# -#    def tearDown(self): -#        shutil.rmtree(self.tempdir) -# -#    def test_encrypted_sym_sync(self): -#        # get direct access to couchdb -#        import ipdb; ipdb.set_trace() -#        self._couch_url = 'http://localhost:' + str(self.wrapper.port) -#        db = CouchDatabase(self._couch_url, 'testdb') -#        # create and encrypt a doc to insert directly in couchdb -#        doc = LeapDocument('doc-id') -#        doc.set_json( -#            encrypt_doc( -#                self._soledad._crypto, 'doc-id', json.dumps(simple_doc))) -#        db.put_doc(doc) -#        # setup credentials for access to soledad server -#        creds = { -#            'token': { -#                'uuid': 'leap@leap.se', -#                'token': '1234', -#            } -#        } -#        # sync local soledad db with server -#        self.assertTrue(self._soledad.get_doc('doc-id') is None) -#        self.startServer() -#        # TODO fix sync for test. -#        #self._soledad.sync(self.getURL('soledad/testdb'), creds) -#        # get and check doc -#        doc = self._soledad.get_doc('doc-id') -#        # TODO: fix below. -#        #self.assertTrue(doc is not None) -#        #self.assertTrue(doc.content == simple_doc) - -  class RecoveryDocumentTestCase(BaseSoledadTest):      def test_export_recovery_document_raw(self): @@ -199,7 +106,7 @@ class RecoveryDocumentTestCase(BaseSoledadTest):                           'Failed settinng secret for symmetric encryption.') -class CryptoMethodsTestCase(BaseSoledadTest): +class SoledadSecretsTestCase(BaseSoledadTest):      def test__gen_secret(self):          # instantiate and save secret_id @@ -254,33 +161,85 @@ class MacAuthTestCase(BaseSoledadTest):          Trying to decrypt a document with wrong MAC should raise.          """          simpledoc = {'key': 'val'} -        doc = LeapDocument(doc_id='id') +        doc = leap_backend.LeapDocument(doc_id='id')          doc.content = simpledoc          # encrypt doc -        doc.set_json(encrypt_doc(self._soledad._crypto, doc)) -        self.assertTrue(MAC_KEY in doc.content) -        self.assertTrue(MAC_METHOD_KEY in doc.content) +        doc.set_json(leap_backend.encrypt_doc(self._soledad._crypto, doc)) +        self.assertTrue(leap_backend.MAC_KEY in doc.content) +        self.assertTrue(leap_backend.MAC_METHOD_KEY in doc.content)          # mess with MAC -        doc.content[MAC_KEY] = '1234567890ABCDEF' +        doc.content[leap_backend.MAC_KEY] = '1234567890ABCDEF'          # try to decrypt doc          self.assertRaises( -            WrongMac, -            decrypt_doc, self._soledad._crypto, doc) +            leap_backend.WrongMac, +            leap_backend.decrypt_doc, self._soledad._crypto, doc)      def test_decrypt_with_unknown_mac_method_raises(self):          """          Trying to decrypt a document with unknown MAC method should raise.          """          simpledoc = {'key': 'val'} -        doc = LeapDocument(doc_id='id') +        doc = leap_backend.LeapDocument(doc_id='id')          doc.content = simpledoc          # encrypt doc -        doc.set_json(encrypt_doc(self._soledad._crypto, doc)) -        self.assertTrue(MAC_KEY in doc.content) -        self.assertTrue(MAC_METHOD_KEY in doc.content) +        doc.set_json(leap_backend.encrypt_doc(self._soledad._crypto, doc)) +        self.assertTrue(leap_backend.MAC_KEY in doc.content) +        self.assertTrue(leap_backend.MAC_METHOD_KEY in doc.content)          # mess with MAC method -        doc.content[MAC_METHOD_KEY] = 'mymac' +        doc.content[leap_backend.MAC_METHOD_KEY] = 'mymac'          # try to decrypt doc          self.assertRaises( -            UnknownMacMethod, -            decrypt_doc, self._soledad._crypto, doc) +            leap_backend.UnknownMacMethod, +            leap_backend.decrypt_doc, self._soledad._crypto, doc) + + +class SoledadCryptoTestCase(BaseSoledadTest): + +    def test_encrypt_decrypt_sym(self): +        # generate 256-bit key +        key = Random.new().read(32) +        iv, cyphertext = self._soledad._crypto.encrypt_sym( +            'data', key, +            method=crypto.EncryptionMethods.AES_256_CTR) +        self.assertTrue(cyphertext is not None) +        self.assertTrue(cyphertext != '') +        self.assertTrue(cyphertext != 'data') +        plaintext = self._soledad._crypto.decrypt_sym( +            cyphertext, key, iv=iv, +            method=crypto.EncryptionMethods.AES_256_CTR) +        self.assertEqual('data', plaintext) + +    def test_decrypt_with_wrong_iv_fails(self): +        key = Random.new().read(32) +        iv, cyphertext = self._soledad._crypto.encrypt_sym( +            'data', key, +            method=crypto.EncryptionMethods.AES_256_CTR) +        self.assertTrue(cyphertext is not None) +        self.assertTrue(cyphertext != '') +        self.assertTrue(cyphertext != 'data') +        # get a different iv by changing the first byte +        rawiv = binascii.a2b_base64(iv) +        wrongiv = rawiv +        while wrongiv == rawiv: +            wrongiv = os.urandom(1) + rawiv[1:] +        plaintext = self._soledad._crypto.decrypt_sym( +            cyphertext, key, iv=binascii.b2a_base64(wrongiv), +            method=crypto.EncryptionMethods.AES_256_CTR) +        self.assertNotEqual('data', plaintext) + +    def test_decrypt_with_wrong_key_fails(self): +        key = Random.new().read(32) +        iv, cyphertext = self._soledad._crypto.encrypt_sym( +            'data', key, +            method=crypto.EncryptionMethods.AES_256_CTR) +        self.assertTrue(cyphertext is not None) +        self.assertTrue(cyphertext != '') +        self.assertTrue(cyphertext != 'data') +        wrongkey = Random.new().read(32)  # 256-bits key +        # ensure keys are different in case we are extremely lucky +        while wrongkey == key: +            wrongkey = Random.new().read(32) +        plaintext = self._soledad._crypto.decrypt_sym( +            cyphertext, wrongkey, iv=iv, +            method=crypto.EncryptionMethods.AES_256_CTR) +        self.assertNotEqual('data', plaintext) diff --git a/src/leap/soledad/tests/test_soledad.py b/src/leap/soledad/tests/test_soledad.py index 09711f19..21d36771 100644 --- a/src/leap/soledad/tests/test_soledad.py +++ b/src/leap/soledad/tests/test_soledad.py @@ -165,7 +165,7 @@ class SoledadSignalingTestCase(BaseSoledadTest):      def setUp(self):          BaseSoledadTest.setUp(self)          # mock signaling -        soledad.events.signal = Mock() +        soledad.signal = Mock()      def tearDown(self):          pass @@ -179,54 +179,54 @@ class SoledadSignalingTestCase(BaseSoledadTest):          """          Test that a fresh soledad emits all bootstrap signals.          """ -        soledad.events.signal.reset_mock() +        soledad.signal.reset_mock()          # get a fresh instance so it emits all bootstrap signals          sol = self._soledad_instance(              secrets_path='alternative.json',              local_db_path='alternative.u1db')          # reverse call order so we can verify in the order the signals were          # expected -        soledad.events.signal.mock_calls.reverse() -        soledad.events.signal.call_args = \ -            soledad.events.signal.call_args_list[0] -        soledad.events.signal.call_args_list.reverse() +        soledad.signal.mock_calls.reverse() +        soledad.signal.call_args = \ +            soledad.signal.call_args_list[0] +        soledad.signal.call_args_list.reverse()          # assert signals -        soledad.events.signal.assert_called_with( +        soledad.signal.assert_called_with(              proto.SOLEDAD_DOWNLOADING_KEYS,              ADDRESS,          ) -        self._pop_mock_call(soledad.events.signal) -        soledad.events.signal.assert_called_with( +        self._pop_mock_call(soledad.signal) +        soledad.signal.assert_called_with(              proto.SOLEDAD_DONE_DOWNLOADING_KEYS,              ADDRESS,          ) -        self._pop_mock_call(soledad.events.signal) -        soledad.events.signal.assert_called_with( +        self._pop_mock_call(soledad.signal) +        soledad.signal.assert_called_with(              proto.SOLEDAD_CREATING_KEYS,              ADDRESS,          ) -        self._pop_mock_call(soledad.events.signal) -        soledad.events.signal.assert_called_with( +        self._pop_mock_call(soledad.signal) +        soledad.signal.assert_called_with(              proto.SOLEDAD_DONE_CREATING_KEYS,              ADDRESS,          ) -        self._pop_mock_call(soledad.events.signal) -        soledad.events.signal.assert_called_with( +        self._pop_mock_call(soledad.signal) +        soledad.signal.assert_called_with(              proto.SOLEDAD_DOWNLOADING_KEYS,              ADDRESS,          ) -        self._pop_mock_call(soledad.events.signal) -        soledad.events.signal.assert_called_with( +        self._pop_mock_call(soledad.signal) +        soledad.signal.assert_called_with(              proto.SOLEDAD_DONE_DOWNLOADING_KEYS,              ADDRESS,          ) -        self._pop_mock_call(soledad.events.signal) -        soledad.events.signal.assert_called_with( +        self._pop_mock_call(soledad.signal) +        soledad.signal.assert_called_with(              proto.SOLEDAD_UPLOADING_KEYS,              ADDRESS,          ) -        self._pop_mock_call(soledad.events.signal) -        soledad.events.signal.assert_called_with( +        self._pop_mock_call(soledad.signal) +        soledad.signal.assert_called_with(              proto.SOLEDAD_DONE_UPLOADING_KEYS,              ADDRESS,          ) @@ -235,32 +235,32 @@ class SoledadSignalingTestCase(BaseSoledadTest):          """          Test that an existent soledad emits some of the bootstrap signals.          """ -        soledad.events.signal.reset_mock() +        soledad.signal.reset_mock()          # get an existent instance so it emits only some of bootstrap signals          sol = self._soledad_instance()          # reverse call order so we can verify in the order the signals were          # expected -        soledad.events.signal.mock_calls.reverse() -        soledad.events.signal.call_args = \ -            soledad.events.signal.call_args_list[0] -        soledad.events.signal.call_args_list.reverse() +        soledad.signal.mock_calls.reverse() +        soledad.signal.call_args = \ +            soledad.signal.call_args_list[0] +        soledad.signal.call_args_list.reverse()          # assert signals -        soledad.events.signal.assert_called_with( +        soledad.signal.assert_called_with(              proto.SOLEDAD_DOWNLOADING_KEYS,              ADDRESS,          ) -        self._pop_mock_call(soledad.events.signal) -        soledad.events.signal.assert_called_with( +        self._pop_mock_call(soledad.signal) +        soledad.signal.assert_called_with(              proto.SOLEDAD_DONE_DOWNLOADING_KEYS,              ADDRESS,          ) -        self._pop_mock_call(soledad.events.signal) -        soledad.events.signal.assert_called_with( +        self._pop_mock_call(soledad.signal) +        soledad.signal.assert_called_with(              proto.SOLEDAD_UPLOADING_KEYS,              ADDRESS,          ) -        self._pop_mock_call(soledad.events.signal) -        soledad.events.signal.assert_called_with( +        self._pop_mock_call(soledad.signal) +        soledad.signal.assert_called_with(              proto.SOLEDAD_DONE_UPLOADING_KEYS,              ADDRESS,          ) @@ -269,7 +269,7 @@ class SoledadSignalingTestCase(BaseSoledadTest):          """          Test Soledad emits SOLEDAD_CREATING_KEYS signal.          """ -        soledad.events.signal.reset_mock() +        soledad.signal.reset_mock()          # get a fresh instance so it emits all bootstrap signals          sol = self._soledad_instance()          # mock the actual db sync so soledad does not try to connect to the @@ -278,7 +278,7 @@ class SoledadSignalingTestCase(BaseSoledadTest):          # do the sync          sol.sync()          # assert the signal has been emitted -        soledad.events.signal.assert_called_with( +        soledad.signal.assert_called_with(              proto.SOLEDAD_DONE_DATA_SYNC,              ADDRESS,          ) @@ -287,7 +287,7 @@ class SoledadSignalingTestCase(BaseSoledadTest):          """          Test Soledad emits SOLEDAD_CREATING_KEYS signal.          """ -        soledad.events.signal.reset_mock() +        soledad.signal.reset_mock()          sol = self._soledad_instance()          # mock the sync target          LeapSyncTarget.get_sync_info = Mock(return_value=[0, 0, 0, 0, 2]) @@ -296,7 +296,7 @@ class SoledadSignalingTestCase(BaseSoledadTest):          # check for new data to sync          sol.need_sync('http://provider/userdb')          # assert the signal has been emitted -        soledad.events.signal.assert_called_with( +        soledad.signal.assert_called_with(              proto.SOLEDAD_NEW_DATA_TO_SYNC,              ADDRESS,          ) | 
