diff options
Diffstat (limited to 'testing/tests/client')
| -rw-r--r-- | testing/tests/client/test_aux_methods.py | 35 | ||||
| -rw-r--r-- | testing/tests/client/test_crypto.py | 165 | ||||
| -rw-r--r-- | testing/tests/client/test_deprecated_crypto.py | 8 | ||||
| -rw-r--r-- | testing/tests/client/test_http_client.py | 108 | ||||
| -rw-r--r-- | testing/tests/client/test_secrets.py | 159 | ||||
| -rw-r--r-- | testing/tests/client/test_shared_db.py | 38 | ||||
| -rw-r--r-- | testing/tests/client/test_signals.py | 98 | 
7 files changed, 300 insertions, 311 deletions
| diff --git a/testing/tests/client/test_aux_methods.py b/testing/tests/client/test_aux_methods.py index 9b4a175f..729aa28a 100644 --- a/testing/tests/client/test_aux_methods.py +++ b/testing/tests/client/test_aux_methods.py @@ -19,12 +19,11 @@ Tests for general Soledad functionality.  """  import os -from twisted.internet import defer +from pytest import inlineCallbacks  from leap.soledad.client import Soledad  from leap.soledad.client.adbapi import U1DBConnectionPool -from leap.soledad.client.secrets import PassphraseTooShort -from leap.soledad.client.secrets import SecretsException +from leap.soledad.client._secrets.util import SecretsError  from test_soledad.util import BaseSoledadTest @@ -34,7 +33,7 @@ class AuxMethodsTestCase(BaseSoledadTest):      def test__init_dirs(self):          sol = self._soledad_instance(prefix='_init_dirs')          local_db_dir = os.path.dirname(sol.local_db_path) -        secrets_path = os.path.dirname(sol.secrets.secrets_path) +        secrets_path = os.path.dirname(sol.secrets_path)          self.assertTrue(os.path.isdir(local_db_dir))          self.assertTrue(os.path.isdir(secrets_path)) @@ -64,8 +63,8 @@ class AuxMethodsTestCase(BaseSoledadTest):          # instantiate without initializing so we just test          # _init_config_with_defaults()          sol = SoledadMock() -        sol._passphrase = u'' -        sol._server_url = '' +        sol.passphrase = u'' +        sol.server_url = ''          sol._init_config_with_defaults()          # assert value of local_db_path          self.assertEquals( @@ -85,14 +84,14 @@ class AuxMethodsTestCase(BaseSoledadTest):              cert_file=None)          self.assertEqual(              os.path.join(self.tempdir, 'value_3'), -            sol.secrets.secrets_path) +            sol.secrets_path)          self.assertEqual(              os.path.join(self.tempdir, 'value_2'),              sol.local_db_path) -        self.assertEqual('value_1', sol._server_url) +        self.assertEqual('value_1', sol.server_url)          sol.close() -    @defer.inlineCallbacks +    @inlineCallbacks      def test_change_passphrase(self):          """          Test if passphrase can be changed. @@ -108,7 +107,7 @@ class AuxMethodsTestCase(BaseSoledadTest):          sol.change_passphrase(u'654321')          sol.close() -        with self.assertRaises(SecretsException): +        with self.assertRaises(SecretsError):              self._soledad_instance(                  'leap@leap.se',                  passphrase=u'123', @@ -124,24 +123,10 @@ class AuxMethodsTestCase(BaseSoledadTest):          sol2.close() -    def test_change_passphrase_with_short_passphrase_raises(self): -        """ -        Test if attempt to change passphrase passing a short passphrase -        raises. -        """ -        sol = self._soledad_instance( -            'leap@leap.se', -            passphrase=u'123') -        # check that soledad complains about new passphrase length -        self.assertRaises( -            PassphraseTooShort, -            sol.change_passphrase, u'54321') -        sol.close() -      def test_get_passphrase(self):          """          Assert passphrase getter works fine.          """          sol = self._soledad_instance() -        self.assertEqual('123', sol._passphrase) +        self.assertEqual('123', sol.passphrase)          sol.close() diff --git a/testing/tests/client/test_crypto.py b/testing/tests/client/test_crypto.py index 49a61438..399fdc99 100644 --- a/testing/tests/client/test_crypto.py +++ b/testing/tests/client/test_crypto.py @@ -19,7 +19,6 @@ Tests for cryptographic related stuff.  """  import binascii  import base64 -import hashlib  import json  import os @@ -111,7 +110,7 @@ class BlobTestCase(unittest.TestCase):          assert len(preamble) == _crypto.PACMAN.size          unpacked_data = _crypto.PACMAN.unpack(preamble) -        magic, sch, meth, ts, iv, doc_id, rev = unpacked_data +        magic, sch, meth, ts, iv, doc_id, rev, _ = unpacked_data          assert magic == _crypto.BLOB_SIGNATURE_MAGIC          assert sch == 1          assert meth == _crypto.ENC_METHOD.aes_256_gcm @@ -186,99 +185,13 @@ class BlobTestCase(unittest.TestCase):              yield crypto.decrypt_doc(doc2) -class RecoveryDocumentTestCase(BaseSoledadTest): - -    def test_export_recovery_document_raw(self): -        rd = self._soledad.secrets._export_recovery_document() -        secret_id = rd[self._soledad.secrets.STORAGE_SECRETS_KEY].items()[0][0] -        # assert exported secret is the same -        secret = self._soledad.secrets._decrypt_storage_secret_version_1( -            rd[self._soledad.secrets.STORAGE_SECRETS_KEY][secret_id]) -        self.assertEqual(secret_id, self._soledad.secrets._secret_id) -        self.assertEqual(secret, self._soledad.secrets._secrets[secret_id]) -        # assert recovery document structure -        encrypted_secret = rd[ -            self._soledad.secrets.STORAGE_SECRETS_KEY][secret_id] -        self.assertTrue(self._soledad.secrets.CIPHER_KEY in encrypted_secret) -        self.assertEquals( -            _crypto.ENC_METHOD.aes_256_gcm, -            encrypted_secret[self._soledad.secrets.CIPHER_KEY]) -        self.assertTrue(self._soledad.secrets.LENGTH_KEY in encrypted_secret) -        self.assertTrue(self._soledad.secrets.SECRET_KEY in encrypted_secret) - -    def test_import_recovery_document(self, cipher='aes256'): -        rd = self._soledad.secrets._export_recovery_document(cipher) -        s = self._soledad_instance() -        s.secrets._import_recovery_document(rd) -        s.secrets.set_secret_id(self._soledad.secrets._secret_id) -        self.assertEqual(self._soledad.storage_secret, -                         s.storage_secret, -                         'Failed settinng secret for symmetric encryption.') -        s.close() - -    def test_import_GCM_recovery_document(self): -        cipher = self._soledad.secrets.CIPHER_AES256_GCM -        self.test_import_recovery_document(cipher) - -    def test_import_legacy_CTR_recovery_document(self): -        cipher = self._soledad.secrets.CIPHER_AES256 -        self.test_import_recovery_document(cipher) - -  class SoledadSecretsTestCase(BaseSoledadTest): -    def test_new_soledad_instance_generates_one_secret(self): -        self.assertTrue( -            self._soledad.storage_secret is not None, -            "Expected secret to be something different than None") -        number_of_secrets = len(self._soledad.secrets._secrets) -        self.assertTrue( -            number_of_secrets == 1, -            "Expected exactly 1 secret, got %d instead." % number_of_secrets) - -    def test_generated_secret_is_of_correct_type(self): -        expected_type = str -        self.assertIsInstance( -            self._soledad.storage_secret, expected_type, -            "Expected secret to be of type %s" % expected_type) - -    def test_generated_secret_has_correct_lengt(self): -        expected_length = self._soledad.secrets.GEN_SECRET_LENGTH -        actual_length = len(self._soledad.storage_secret) -        self.assertTrue( -            expected_length == actual_length, -            "Expected secret with length %d, got %d instead." -            % (expected_length, actual_length)) - -    def test_generated_secret_id_is_sha256_hash_of_secret(self): -        generated = self._soledad.secrets.secret_id -        expected = hashlib.sha256(self._soledad.storage_secret).hexdigest() -        self.assertTrue( -            generated == expected, -            "Expeceted generated secret id to be sha256 hash, got something " -            "else instead.") - -    def test_generate_new_secret_generates_different_secret_id(self): -        # generate new secret -        secret_id_1 = self._soledad.secrets.secret_id -        secret_id_2 = self._soledad.secrets._gen_secret() -        self.assertTrue( -            len(self._soledad.secrets._secrets) == 2, -            "Expected exactly 2 secrets.") -        self.assertTrue( -            secret_id_1 != secret_id_2, -            "Expected IDs of secrets to be distinct.") -        self.assertTrue( -            secret_id_1 in self._soledad.secrets._secrets, -            "Expected to find ID of first secret in Soledad Secrets.") -        self.assertTrue( -            secret_id_2 in self._soledad.secrets._secrets, -            "Expected to find ID of second secret in Soledad Secrets.") - -    def test__has_secret(self): -        self.assertTrue( -            self._soledad._secrets._has_secret(), -            "Should have a secret at this point") +    def test_generated_secrets_have_correct_length(self): +        expected = self._soledad.secrets.lengths +        for name, length in expected.iteritems(): +            secret = getattr(self._soledad.secrets, name) +            self.assertEqual(length, len(secret))  class SoledadCryptoAESTestCase(BaseSoledadTest): @@ -322,10 +235,74 @@ class SoledadCryptoAESTestCase(BaseSoledadTest):              _crypto.decrypt_sym(cyphertext, wrongkey, iv) -def _aes_encrypt(key, iv, data): +class PreambleTestCase(unittest.TestCase): +    class doc_info: +        doc_id = 'D-deadbeef' +        rev = '397932e0c77f45fcb7c3732930e7e9b2:1' + +    def setUp(self): +        self.cleartext = BytesIO(snowden1) +        self.blob = _crypto.BlobEncryptor( +            self.doc_info, self.cleartext, +            secret='A' * 96) + +    def test_preamble_starts_with_magic_signature(self): +        preamble = self.blob._encode_preamble() +        assert preamble.startswith(_crypto.BLOB_SIGNATURE_MAGIC) + +    def test_preamble_has_cipher_metadata(self): +        preamble = self.blob._encode_preamble() +        unpacked = _crypto.PACMAN.unpack(preamble) +        encryption_scheme, encryption_method = unpacked[1:3] +        assert encryption_scheme in _crypto.ENC_SCHEME +        assert encryption_method in _crypto.ENC_METHOD +        assert unpacked[4] == self.blob.iv + +    def test_preamble_has_document_sync_metadata(self): +        preamble = self.blob._encode_preamble() +        unpacked = _crypto.PACMAN.unpack(preamble) +        doc_id, doc_rev = unpacked[5:7] +        assert doc_id == self.doc_info.doc_id +        assert doc_rev == self.doc_info.rev + +    def test_preamble_has_document_size(self): +        preamble = self.blob._encode_preamble() +        unpacked = _crypto.PACMAN.unpack(preamble) +        size = unpacked[7] +        assert size == len(snowden1) + +    @defer.inlineCallbacks +    def test_preamble_can_come_without_size(self): +        # XXX: This test case is here only to test backwards compatibility! +        preamble = self.blob._encode_preamble() +        # repack preamble using legacy format, without doc size +        unpacked = _crypto.PACMAN.unpack(preamble) +        preamble_without_size = _crypto.LEGACY_PACMAN.pack(*unpacked[0:7]) +        # encrypt it manually for custom tag +        ciphertext, tag = _aes_encrypt(self.blob.sym_key, self.blob.iv, +                                       self.cleartext.getvalue(), +                                       aead=preamble_without_size) +        ciphertext = ciphertext + tag +        # encode it +        ciphertext = base64.urlsafe_b64encode(ciphertext) +        preamble_without_size = base64.urlsafe_b64encode(preamble_without_size) +        # decrypt it +        ciphertext = preamble_without_size + ' ' + ciphertext +        cleartext = yield _crypto.BlobDecryptor( +            self.doc_info, BytesIO(ciphertext), +            secret='A' * 96).decrypt() +        assert cleartext == self.cleartext.getvalue() +        warnings = self.flushWarnings() +        assert len(warnings) == 1 +        assert 'legacy document without size' in warnings[0]['message'] + + +def _aes_encrypt(key, iv, data, aead=''):      backend = default_backend()      cipher = Cipher(algorithms.AES(key), modes.GCM(iv), backend=backend)      encryptor = cipher.encryptor() +    if aead: +        encryptor.authenticate_additional_data(aead)      return encryptor.update(data) + encryptor.finalize(), encryptor.tag diff --git a/testing/tests/client/test_deprecated_crypto.py b/testing/tests/client/test_deprecated_crypto.py index 8ee3735c..1af1a130 100644 --- a/testing/tests/client/test_deprecated_crypto.py +++ b/testing/tests/client/test_deprecated_crypto.py @@ -1,5 +1,7 @@  import json -from twisted.internet import defer + +from pytest import inlineCallbacks +  from uuid import uuid4  from urlparse import urljoin @@ -39,7 +41,7 @@ class DeprecatedCryptoTest(SoledadWithCouchServerMixin, TestCaseWithServer):      def make_app_with_state(state):          return make_token_soledad_app(state) -    @defer.inlineCallbacks +    @inlineCallbacks      def test_touch_updates_remote_representation(self):          self.startTwistedServer()          user = 'user-' + uuid4().hex @@ -49,7 +51,7 @@ class DeprecatedCryptoTest(SoledadWithCouchServerMixin, TestCaseWithServer):              self._soledad_instance(user=user, server_url=server_url))          self.make_app() -        remote = self.request_state._create_database(replica_uid=client._uuid) +        remote = self.request_state._create_database(replica_uid=client.uuid)          remote = CouchDatabase.open_database(              urljoin(self.couch_url, 'user-' + user),              create=True) diff --git a/testing/tests/client/test_http_client.py b/testing/tests/client/test_http_client.py deleted file mode 100644 index a107930a..00000000 --- a/testing/tests/client/test_http_client.py +++ /dev/null @@ -1,108 +0,0 @@ -# -*- coding: utf-8 -*- -# test_http_client.py -# Copyright (C) 2013-2016 LEAP -# -# This program is free software: you can redistribute it and/or modify -# it under the terms of the GNU General Public License as published by -# the Free Software Foundation, either version 3 of the License, or -# (at your option) any later version. -# -# This program is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -# GNU General Public License for more details. -# -# You should have received a copy of the GNU General Public License -# along with this program. If not, see <http://www.gnu.org/licenses/>. -""" -Test Leap backend bits: sync target -""" -import json - -from testscenarios import TestWithScenarios - -from leap.soledad.client import auth -from leap.soledad.common.l2db.remote import http_client -from test_soledad.u1db_tests import test_http_client -from leap.soledad.server.auth import SoledadTokenAuthMiddleware - - -# ----------------------------------------------------------------------------- -# The following tests come from `u1db.tests.test_http_client`. -# ----------------------------------------------------------------------------- - -class TestSoledadClientBase( -        TestWithScenarios, -        test_http_client.TestHTTPClientBase): - -    """ -    This class should be used to test Token auth. -    """ - -    def getClient(self, **kwds): -        cli = self.getClientWithToken(**kwds) -        if 'creds' not in kwds: -            cli.set_token_credentials('user-uuid', 'auth-token') -        return cli - -    def getClientWithToken(self, **kwds): -        self.startServer() - -        class _HTTPClientWithToken( -                http_client.HTTPClientBase, auth.TokenBasedAuth): - -            def set_token_credentials(self, uuid, token): -                auth.TokenBasedAuth.set_token_credentials(self, uuid, token) - -            def _sign_request(self, method, url_query, params): -                return auth.TokenBasedAuth._sign_request( -                    self, method, url_query, params) - -        return _HTTPClientWithToken(self.getURL('dbase'), **kwds) - -    def app(self, environ, start_response): -        res = test_http_client.TestHTTPClientBase.app( -            self, environ, start_response) -        if res is not None: -            return res -        # mime solead application here. -        if '/token' in environ['PATH_INFO']: -            auth = environ.get(SoledadTokenAuthMiddleware.HTTP_AUTH_KEY) -            if not auth: -                start_response("401 Unauthorized", -                               [('Content-Type', 'application/json')]) -                return [ -                    json.dumps( -                        {"error": "unauthorized", -                         "message": "no token found in environment"}) -                ] -            scheme, encoded = auth.split(None, 1) -            if scheme.lower() != 'token': -                start_response("401 Unauthorized", -                               [('Content-Type', 'application/json')]) -                return [json.dumps({"error": "unauthorized", -                                    "message": "unknown scheme: %s" % scheme})] -            uuid, token = encoded.decode('base64').split(':', 1) -            if uuid != 'user-uuid' and token != 'auth-token': -                return Exception("Incorrect address or token.") -            start_response("200 OK", [('Content-Type', 'application/json')]) -            return [json.dumps([environ['PATH_INFO'], uuid, token])] - -    def test_token(self): -        """ -        Test if token is sent correctly. -        """ -        cli = self.getClientWithToken() -        cli.set_token_credentials('user-uuid', 'auth-token') -        res, headers = cli._request('GET', ['doc', 'token']) -        self.assertEqual( -            ['/dbase/doc/token', 'user-uuid', 'auth-token'], json.loads(res)) - -    def test_token_ctr_creds(self): -        cli = self.getClientWithToken(creds={'token': { -            'uuid': 'user-uuid', -            'token': 'auth-token', -        }}) -        res, headers = cli._request('GET', ['doc', 'token']) -        self.assertEqual( -            ['/dbase/doc/token', 'user-uuid', 'auth-token'], json.loads(res)) diff --git a/testing/tests/client/test_secrets.py b/testing/tests/client/test_secrets.py new file mode 100644 index 00000000..18ff458b --- /dev/null +++ b/testing/tests/client/test_secrets.py @@ -0,0 +1,159 @@ +# -*- CODing: utf-8 -*- +# test_secrets.py +# Copyright (C) 2017 LEAP +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see <http://www.gnu.org/licenses/>. +""" +Tests for secrets encryption and decryption. +""" +import scrypt + +from twisted.trial import unittest + +from leap.soledad.client._crypto import ENC_METHOD +from leap.soledad.client._secrets import SecretsCrypto + + +class SecretsCryptoTestCase(unittest.TestCase): + +    SECRETS = { +        'remote_secret': 'a' * 512, +        'local_salt': 'b' * 64, +        'local_secret': 'c' * 448 +    } +    ENCRYPTED_V2 = { +        'cipher': 2, +        'length': 1437, +        'kdf_salt': 'TSgNLeAGFeITeSgNzmYZHh+mzmkZPOqao7CAV/tx3KZCLwsrT0HmWtVK3' +                    'TyWHWNgVdeamMZYRuvZavE2sp0DGw==\n', +        'iv': 'TKZQKIlRgdnXFhJf08qswg==', +        'secrets': 'ZNZRi72VDtwZqyuU+uf3yzZt23vCtMS3Ki2bnZyeHUOSGVweJeDadF4oqE' +                   'BW87NN00j9E49BzyzLr9SNgwZjPp0wlUm7kt+s8EUfJUdH8nxaQ+9iqGXM' +                   'cCHmBM8L8DRN2m3BrPGx7m+QGlN9sbrRpl7fqc46RWcYuTEpm4upjdtI7O' +                   'jDd0JG3C0rUzIuKJn9w4rEpX3tLEKXVdZfLvRXS5roR0cauazsDO69E13q' +                   'a01vDuY+UJ+buLQ3FluPnnk8QE7ztPVUmRJJ76yAIhjVX9owiwlp9GnUJY' +                   'sETRCqdRSTwUcHIkzVR0zAvtxTX7eGTitzf4gCYEC4T9v5N/jHxEfPdx28' +                   'MM4KShWN2nFxNFQLQUpMN2OrM7UyUw+DQ3ydqBeBPKPHRN5s05kIK7P/Ra' +                   'aLNcrJWa7DopLbgLlei0Jd7S4sjv1ufaRY7v0qJaVkhh/VaCylTSVw1rv5' +                   'YzSWcHHcLuC0R8xLadz6T+EpsVYxgPYCS7w5xoE82zwNQzw/EBxLIcyLPl' +                   'ipKnr2dttrmm3KXUOT1IdbSbI5elF6yQTAusdqiXuypey+MDqHYWEYWkCn' +                   'e9/uGM9FjklDLE0RtPEDxhq64tw6u2Xu7RzDzyQDI8EIoTdU+4zEMTnelZ' +                   'fKEwdG58EDxTXfUk6IDcRUupz3YuToSMhIOkqgXnbWl/nrK0O9v4JMhQjI' +                   'r+oPICYfFr14kvJXBsfntILTJCxzbqTQcNba3jc8rGqCZ6gM0u4PndwTG2' +                   'UiCqPU2HMnWvVGQOXeLdQY+EqqXQiRDi0DrDmkVwFf+27dPXxmZ43C48W3' +                   'lMhTKXl0rdBFnOD5jiMh0X6q/KYXonyEtMZMsjT7dFePcCy4wQRhuut+ac' +                   '/TJWyrr+/IB45E+LZbhV7xCy1dYsbdb52jTRJFpaQ83sj6Iv6SYdiqqXzL' +                   'F5JGMyuovTjwAoIIQzpLv36xY2wGGAH1V8c7QmDR2qumXrHD9R68WjBoSY' +                   '7IFM0TFAGZNun56y/zQ4r8yOMSAId+j4kuRH0fENEi0FJ+SpmSdHfpvBhE' +                   'MdGh927E9enEYWmUQMmkxXIw6E+O3cmOWt2hsMbUAikDCpQOnVP2BD55HT' +                   '6FfbW7ITVwPINHYexmy2Xcm8H5zzGFSp+uYIyPBYDKA+VJ+QQI8bud9K+T' +                   'NBybUv9u6LbB6BsLpwLoxMPJu0WsN2HpmLYgrg2ML1huMF1OtaGRuUr2PL' +                   'NBaZaL6VOztYrVtQG1+tNyRxn8XQTtx0l6n+EihGVe9Sk5XF6DJA9ZN7uO' +                   'svTUFJ5qG3Erf4AmbUJWoOR/NvapBtifidM7gPZZ6NqBs6v72rU1pGy+p7' +                   'o84KrmB2MNf3yJ0BvKxPvFmltF3Dc7LB5TN8ycbmFM6hgrLvvhPxiHEnG/' +                   '8Qcrg0nUXOipFGNgZEU7t7Mz6RJ189Z2Kx1HVGrkAzEgqwZYqijAPlsgzO' +                   'bg6DwzwC7stolQWGCDQUtJVlE8FZ/Up8zFYYZKn52WzjmSN4/hHhEvdkck' +                   'Nez/JVev6fMcVrgdrTZ+uCwxjN/4xPdgog2HV470ea1bvIkSNOOrhm194M' +                   '40GmvmBhiSSMjdRQCQtM0t9bUuSQLPDzEiCA9QaLyygtlz9uRR/dXgkEg0' +                   'J4YNpZvhE0wbyp4GHytbPaAmrcd7im9+buTuMwhXpZl0stmfkJxVHJSZ8Y' +                   'IHakHs3W1fdYyI3wxGpel/9eYO3ISukolwrHXESP65wVNKfBwbqVJzQmts' +                   'pyDBOI6DcLKZfE1EVg0+uwQ/5PKZbn0TwlXO1YE3NL3mAply3zQR9hyBrY' +                   '6f1jkHVD3irIlWkSiPJsP8sW+nrK8c/Ha8F+dua6DTZmg594OIaQj8mPiY' +                   'GcIusiARWocR5/MmSjupGOgFx4HtmckTJtAta3XP4elOx04teH/P9Cgr1x' +                   'XYf+cEX6gp92L9rTo0FCz3Hw==\n', +        'version': 2, +        'kdf': 'scrypt', +        'kdf_length': 32 +    } + +    ENCRYPTED_V1 = { +        'version': 1, +        'active_secret': 'secret_id', +        'storage_secrets': { +            'secret_id': { +                'kdf': 'scrypt', +                'secret': 'u31ObvxNU8jB0HgMj3TVwQ==:JQwlYq6sAQmHYS3x2CJzObT9h' +                          'j1iiHthvrMh887qedNCcOfJyCA3jpRkc0vjd2Qk/2HSJ+JxM2F' +                          'MrPzzx5O34EHlgF2scen34guZRRIf42WpnMy+PrL4cnMlZLgCh' +                          'H1Jz6wcIMEpU9LQ8OaCShk1/yJ6qcVHOV4DDt3mTF7ttiqI5cp' +                          'msaVtxxYCcpxFiWSeSCEgr0h4/Ih1qHuM6vk+CQjf/zg1f/7HR' +                          'imIyNYXit9Fw3YTkxBen1wG3f5L7OAODRTuqnWpkQFOmclx050' +                          'k0frKRcX6UWhIOWpW2mqJXnvzDtQQVGzqIdSgGTGtUDGQ7Onnc' +                          'NkUlSnuVC7PkDNNRuwit3pCB9YWBWyPAQgs0kLqoV4YcuSctz6' +                          'SAf76ozdcK5/SrOzutOfyPag4V3AYKMv6rCKALJ10OnFJ61FL9' +                          'kd6JZam7WOlEUXyO7Gdgvz+eKiQMTZXbtO2kAKqel513MedPXC' +                          'dzajUe1U2JaGg86UdiDWoPYOiWxnAPwfNJk+1QuNy5NZ7PaMtF' +                          'IKT3/Xema2U8mufS0FbvJyK2flP1VUWcCzHKTSqX6+kU7UpoWa' +                          'hYa7PlO40El+putTQLBmNaEeaWFngO+XB4TReICHSiCdcAb3pw' +                          'sabjtxt+OpK4vbj3yBSfpiZTpVbEjt9U/tUpVp/T2M66lMi3ZC' +                          'oHLlhu45Zo0aEq3UmQ/WBXu6EkO2eLYz2br9YQwRbSJ6z5CHmu' +                          'hjKBQlpvGNfZYObx5lY4o6Ab4f/N8gyukskjmAFAf7Fr8cEog/' +                          'oxmbagoCtUGRYJp2paooqH8L6xXp0Y8+23g7WJaAIr1i4V4aKS' +                          'r9x7iUK6prcZTtMJZEHCswkLN/+DU6/FX3YZcOjseC+Qv3P+9v' +                          'zIDp/92KJzqVqITGwrsc6ZsglMW37qxs6albtw3lMWSHlkcLbj' +                          'Xf/iHPeKnb2WNLdkFNQ1J5OaTJR+E1CrXN+pm1JtB6XaUbaLGV' +                          'CGUo13lAPVDtXcPbo64kMrQtQu4m9m8X8t8tfuJmINfwBnrKzk' +                          'O6pl+LwimFaFEArV6wcaMxmwi0lM7mt4U1u9OIQjghQ/dEmOyV' +                          'dZBnvyG7T/oRuLdUyZ/QGXZMlPQ3lAZ0ONn1Mk4bmKToW8ToE8' +                          'ylld3rLlWDjjoQP8mP05Izg3mguLHXUhikUL8MD5NdYyeZJ1XZ' +                          '0OZ5S9uncurYj2ABWJoVaq/tFCdCEo9bbjWsePei26GZjaM3Fx' +                          'RkAICXe/bt6/uLgaPZtO+sdARDuU3DRKMIdgM9NBaIn0kC7Wk4' +                          'bnYShZ/rbhVt2/ds5XinnDBZsxSR3s553DixJ9v6w9Db++9Stw' +                          '4DgePd9lLy+6WuVBlKmcNflx9zg7US0AOarX2UNiQ==', +                'kdf_length': 32, +                'kdf_salt': 'MYH68QH48nRFMWH44piFWqBnKtU8KCz6Ajh24otrvzJlqPgB' +                            'v6bvFJjRvjRp/0/v1j2nt40RZ6H5hfoKmore0g==\n', +                'length': 1024, +                'cipher': 'aes256', +            } +        } +    } + +    def setUp(self): +        class Soledad(object): +            passphrase = '123' +        soledad = Soledad() +        self._crypto = SecretsCrypto(soledad) + +    def test__get_key(self): +        salt = 'abc' +        expected = scrypt.hash('123', salt, buflen=32) +        key = self._crypto._get_key(salt) +        self.assertEqual(expected, key) + +    def test_encrypt(self): +        info = self._crypto.encrypt(self.SECRETS) +        self.assertEqual(8, len(info)) +        for key, value in [ +                ('kdf', 'scrypt'), +                ('kdf_salt', None), +                ('kdf_length', None), +                ('cipher', ENC_METHOD.aes_256_gcm), +                ('length', None), +                ('iv', None), +                ('secrets', None), +                ('version', 2)]: +            self.assertTrue(key in info) +            if value: +                self.assertEqual(info[key], value) + +    def test__decrypt_v2(self): +        encrypted = self.ENCRYPTED_V2 +        decrypted = self._crypto.decrypt(encrypted) +        self.assertEqual(decrypted, self.SECRETS) + +    def test__decrypt_v1(self): +        encrypted = self.ENCRYPTED_V1 +        decrypted = self._crypto.decrypt(encrypted) +        self.assertEqual(decrypted, self.SECRETS) diff --git a/testing/tests/client/test_shared_db.py b/testing/tests/client/test_shared_db.py index aac766c2..b045e524 100644 --- a/testing/tests/client/test_shared_db.py +++ b/testing/tests/client/test_shared_db.py @@ -2,7 +2,6 @@ from leap.soledad.common.document import SoledadDocument  from leap.soledad.client.shared_db import SoledadSharedDatabase  from test_soledad.util import BaseSoledadTest -from test_soledad.util import ADDRESS  class SoledadSharedDBTestCase(BaseSoledadTest): @@ -14,37 +13,28 @@ class SoledadSharedDBTestCase(BaseSoledadTest):      def setUp(self):          BaseSoledadTest.setUp(self)          self._shared_db = SoledadSharedDatabase( -            'https://provider/', ADDRESS, document_factory=SoledadDocument, +            'https://provider/', document_factory=SoledadDocument,              creds=None)      def tearDown(self):          BaseSoledadTest.tearDown(self) -    def test__get_secrets_from_shared_db(self): +    def test__get_remote_doc(self):          """          Ensure the shared db is queried with the correct doc_id.          """ -        doc_id = self._soledad.secrets._shared_db_doc_id() -        self._soledad.secrets._get_secrets_from_shared_db() -        self.assertTrue( -            self._soledad.shared_db.get_doc.assert_called_with( -                doc_id) is None, -            'Wrong doc_id when fetching recovery document.') - -    def test__put_secrets_in_shared_db(self): +        doc_id = self._soledad.secrets.storage._remote_doc_id() +        self._soledad.secrets.storage._get_remote_doc() +        self._soledad.secrets.storage._shared_db.get_doc.assert_called_with( +            doc_id) + +    def test_save_remote(self):          """          Ensure recovery document is put into shared recover db.          """ -        doc_id = self._soledad.secrets._shared_db_doc_id() -        self._soledad.secrets._put_secrets_in_shared_db() -        self.assertTrue( -            self._soledad.shared_db.get_doc.assert_called_with( -                doc_id) is None, -            'Wrong doc_id when fetching recovery document.') -        self.assertTrue( -            self._soledad.shared_db.put_doc.assert_called_with( -                self._doc_put) is None, -            'Wrong document when putting recovery document.') -        self.assertTrue( -            self._doc_put.doc_id == doc_id, -            'Wrong doc_id when putting recovery document.') +        doc_id = self._soledad.secrets.storage._remote_doc_id() +        storage = self._soledad.secrets.storage +        storage.save_remote({'content': 'blah'}) +        storage._shared_db.get_doc.assert_called_with(doc_id) +        storage._shared_db.put_doc.assert_called_with(self._doc_put) +        self.assertTrue(self._doc_put.doc_id == doc_id) diff --git a/testing/tests/client/test_signals.py b/testing/tests/client/test_signals.py index 4e9ebfd0..c7609a74 100644 --- a/testing/tests/client/test_signals.py +++ b/testing/tests/client/test_signals.py @@ -20,7 +20,7 @@ class SoledadSignalingTestCase(BaseSoledadTest):      def setUp(self):          # mock signaling          soledad.client.signal = Mock() -        soledad.client.secrets.events.emit_async = Mock() +        soledad.client._secrets.util.events.emit_async = Mock()          # run parent's setUp          BaseSoledadTest.setUp(self) @@ -42,55 +42,36 @@ class SoledadSignalingTestCase(BaseSoledadTest):            - downloading keys / done downloading keys.            - uploading keys / done uploading keys.          """ -        soledad.client.secrets.events.emit_async.reset_mock() +        soledad.client._secrets.util.events.emit_async.reset_mock()          # get a fresh instance so it emits all bootstrap signals          sol = self._soledad_instance(              secrets_path='alternative_stage3.json',              local_db_path='alternative_stage3.u1db')          # reverse call order so we can verify in the order the signals were          # expected -        soledad.client.secrets.events.emit_async.mock_calls.reverse() -        soledad.client.secrets.events.emit_async.call_args = \ -            soledad.client.secrets.events.emit_async.call_args_list[0] -        soledad.client.secrets.events.emit_async.call_args_list.reverse() +        soledad.client._secrets.util.events.emit_async.mock_calls.reverse() +        soledad.client._secrets.util.events.emit_async.call_args = \ +            soledad.client._secrets.util.events.emit_async.call_args_list[0] +        soledad.client._secrets.util.events.emit_async.call_args_list.reverse()          user_data = {'userid': ADDRESS, 'uuid': ADDRESS} -        # downloading keys signals -        soledad.client.secrets.events.emit_async.assert_called_with( -            catalog.SOLEDAD_DOWNLOADING_KEYS, user_data -        ) -        self._pop_mock_call(soledad.client.secrets.events.emit_async) -        soledad.client.secrets.events.emit_async.assert_called_with( -            catalog.SOLEDAD_DONE_DOWNLOADING_KEYS, user_data -        ) -        # creating keys signals -        self._pop_mock_call(soledad.client.secrets.events.emit_async) -        soledad.client.secrets.events.emit_async.assert_called_with( -            catalog.SOLEDAD_CREATING_KEYS, user_data -        ) -        self._pop_mock_call(soledad.client.secrets.events.emit_async) -        soledad.client.secrets.events.emit_async.assert_called_with( -            catalog.SOLEDAD_DONE_CREATING_KEYS, user_data -        ) -        # downloading once more (inside _put_keys_in_shared_db) -        self._pop_mock_call(soledad.client.secrets.events.emit_async) -        soledad.client.secrets.events.emit_async.assert_called_with( -            catalog.SOLEDAD_DOWNLOADING_KEYS, user_data -        ) -        self._pop_mock_call(soledad.client.secrets.events.emit_async) -        soledad.client.secrets.events.emit_async.assert_called_with( -            catalog.SOLEDAD_DONE_DOWNLOADING_KEYS, user_data -        ) -        # uploading keys signals -        self._pop_mock_call(soledad.client.secrets.events.emit_async) -        soledad.client.secrets.events.emit_async.assert_called_with( -            catalog.SOLEDAD_UPLOADING_KEYS, user_data -        ) -        self._pop_mock_call(soledad.client.secrets.events.emit_async) -        soledad.client.secrets.events.emit_async.assert_called_with( -            catalog.SOLEDAD_DONE_UPLOADING_KEYS, user_data -        ) +        def _assert(*args, **kwargs): +            mocked = soledad.client._secrets.util.events.emit_async +            mocked.assert_called_with(*args) +            pop = kwargs.get('pop') +            if pop or pop is None: +                self._pop_mock_call(mocked) + +        _assert(catalog.SOLEDAD_DOWNLOADING_KEYS, user_data) +        _assert(catalog.SOLEDAD_DONE_DOWNLOADING_KEYS, user_data) +        _assert(catalog.SOLEDAD_CREATING_KEYS, user_data) +        _assert(catalog.SOLEDAD_DONE_CREATING_KEYS, user_data) +        _assert(catalog.SOLEDAD_UPLOADING_KEYS, user_data) +        _assert(catalog.SOLEDAD_DOWNLOADING_KEYS, user_data) +        _assert(catalog.SOLEDAD_DONE_DOWNLOADING_KEYS, user_data) +        _assert(catalog.SOLEDAD_DONE_UPLOADING_KEYS, user_data, pop=False) +          sol.close()      def test_stage2_bootstrap_signals(self): @@ -101,11 +82,11 @@ class SoledadSignalingTestCase(BaseSoledadTest):          # get existing instance so we have access to keys          sol = self._soledad_instance()          # create a document with secrets -        doc = SoledadDocument(doc_id=sol.secrets._shared_db_doc_id()) -        doc.content = sol.secrets._export_recovery_document() +        doc = SoledadDocument(doc_id=sol.secrets.storage._remote_doc_id()) +        doc.content = sol.secrets.crypto.encrypt(sol.secrets._secrets)          sol.close()          # reset mock -        soledad.client.secrets.events.emit_async.reset_mock() +        soledad.client._secrets.util.events.emit_async.reset_mock()          # get a fresh instance so it emits all bootstrap signals          shared_db = self.get_default_shared_mock(get_doc_return_value=doc)          sol = self._soledad_instance( @@ -114,20 +95,23 @@ class SoledadSignalingTestCase(BaseSoledadTest):              shared_db_class=shared_db)          # reverse call order so we can verify in the order the signals were          # expected -        soledad.client.secrets.events.emit_async.mock_calls.reverse() -        soledad.client.secrets.events.emit_async.call_args = \ -            soledad.client.secrets.events.emit_async.call_args_list[0] -        soledad.client.secrets.events.emit_async.call_args_list.reverse() +        mocked = soledad.client._secrets.util.events.emit_async +        mocked.mock_calls.reverse() +        mocked.call_args = mocked.call_args_list[0] +        mocked.call_args_list.reverse() + +        def _assert(*args, **kwargs): +            mocked = soledad.client._secrets.util.events.emit_async +            mocked.assert_called_with(*args) +            pop = kwargs.get('pop') +            if pop or pop is None: +                self._pop_mock_call(mocked) +          # assert download keys signals -        soledad.client.secrets.events.emit_async.assert_called_with( -            catalog.SOLEDAD_DOWNLOADING_KEYS, -            {'userid': ADDRESS, 'uuid': ADDRESS} -        ) -        self._pop_mock_call(soledad.client.secrets.events.emit_async) -        soledad.client.secrets.events.emit_async.assert_called_with( -            catalog.SOLEDAD_DONE_DOWNLOADING_KEYS, -            {'userid': ADDRESS, 'uuid': ADDRESS}, -        ) +        user_data = {'userid': ADDRESS, 'uuid': ADDRESS} +        _assert(catalog.SOLEDAD_DOWNLOADING_KEYS, user_data) +        _assert(catalog.SOLEDAD_DONE_DOWNLOADING_KEYS, user_data, pop=False) +          sol.close()      def test_stage1_bootstrap_signals(self): | 
