From cfff46ff9becdbe5cf48816870e625ed253ecc57 Mon Sep 17 00:00:00 2001 From: drebs Date: Sun, 17 Sep 2017 12:08:25 -0300 Subject: [refactor] move tests to root of repository Tests entrypoint was in a testing/ subfolder in the root of the repository. This was made mainly because we had some common files for tests and we didn't want to ship them (files in testing/test_soledad, which is itself a python package. This sometimes causes errors when loading tests (it seems setuptools is confused with having one python package in a subdirectory of another). This commit moves the tests entrypoint to the root of the repository. Closes: #8952 --- testing/tests/client/__init__.py | 0 testing/tests/client/test_api.py | 36 -- testing/tests/client/test_app.py | 52 --- testing/tests/client/test_attachments.py | 81 ----- testing/tests/client/test_aux_methods.py | 132 ------- testing/tests/client/test_crypto.py | 384 --------------------- testing/tests/client/test_deprecated_crypto.py | 94 ----- testing/tests/client/test_doc.py | 50 --- testing/tests/client/test_http.py | 60 ---- testing/tests/client/test_https.py | 137 -------- .../tests/client/test_incoming_processing_flow.py | 197 ----------- testing/tests/client/test_recovery_code.py | 38 -- testing/tests/client/test_secrets.py | 166 --------- testing/tests/client/test_shared_db.py | 40 --- testing/tests/client/test_signals.py | 149 -------- testing/tests/client/test_soledad_doc.py | 46 --- 16 files changed, 1662 deletions(-) delete mode 100644 testing/tests/client/__init__.py delete mode 100644 testing/tests/client/test_api.py delete mode 100644 testing/tests/client/test_app.py delete mode 100644 testing/tests/client/test_attachments.py delete mode 100644 testing/tests/client/test_aux_methods.py delete mode 100644 testing/tests/client/test_crypto.py delete mode 100644 testing/tests/client/test_deprecated_crypto.py delete mode 100644 testing/tests/client/test_doc.py delete mode 100644 testing/tests/client/test_http.py delete mode 100644 testing/tests/client/test_https.py delete mode 100644 testing/tests/client/test_incoming_processing_flow.py delete mode 100644 testing/tests/client/test_recovery_code.py delete mode 100644 testing/tests/client/test_secrets.py delete mode 100644 testing/tests/client/test_shared_db.py delete mode 100644 testing/tests/client/test_signals.py delete mode 100644 testing/tests/client/test_soledad_doc.py (limited to 'testing/tests/client') diff --git a/testing/tests/client/__init__.py b/testing/tests/client/__init__.py deleted file mode 100644 index e69de29b..00000000 diff --git a/testing/tests/client/test_api.py b/testing/tests/client/test_api.py deleted file mode 100644 index 3c6a8155..00000000 --- a/testing/tests/client/test_api.py +++ /dev/null @@ -1,36 +0,0 @@ -# -*- coding: utf-8 -*- -# test_api.py -# Copyright (C) 2017 LEAP -# -# This program is free software: you can redistribute it and/or modify -# it under the terms of the GNU General Public License as published by -# the Free Software Foundation, either version 3 of the License, or -# (at your option) any later version. -# -# This program is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -# GNU General Public License for more details. -# -# You should have received a copy of the GNU General Public License -# along with this program. If not, see . -""" -Tests for soledad api. -""" - -from mock import MagicMock - -from test_soledad.util import BaseSoledadTest - - -class ApiTestCase(BaseSoledadTest): - - def test_recovery_code_creation(self): - recovery_code_mock = MagicMock() - generated_code = '4645a2f8997e5d0d' - recovery_code_mock.generate.return_value = generated_code - self._soledad._recovery_code = recovery_code_mock - - code = self._soledad.create_recovery_code() - - self.assertEqual(generated_code, code) diff --git a/testing/tests/client/test_app.py b/testing/tests/client/test_app.py deleted file mode 100644 index 6867473e..00000000 --- a/testing/tests/client/test_app.py +++ /dev/null @@ -1,52 +0,0 @@ -# -*- coding: utf-8 -*- -# test_soledad_app.py -# Copyright (C) 2014 LEAP -# -# This program is free software: you can redistribute it and/or modify -# it under the terms of the GNU General Public License as published by -# the Free Software Foundation, either version 3 of the License, or -# (at your option) any later version. -# -# This program is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -# GNU General Public License for more details. -# -# You should have received a copy of the GNU General Public License -# along with this program. If not, see . -""" -Test ObjectStore and Couch backend bits. -""" -import pytest - -from testscenarios import TestWithScenarios - -from test_soledad.util import BaseSoledadTest -from test_soledad.util import make_soledad_document_for_test -from test_soledad.util import make_token_soledad_app -from test_soledad.util import make_token_http_database_for_test -from test_soledad.util import copy_token_http_database_for_test -from test_soledad.u1db_tests import test_backends - - -# ----------------------------------------------------------------------------- -# The following tests come from `u1db.tests.test_backends`. -# ----------------------------------------------------------------------------- - -@pytest.mark.usefixtures('method_tmpdir') -class SoledadTests( - TestWithScenarios, test_backends.AllDatabaseTests, BaseSoledadTest): - - def setUp(self): - TestWithScenarios.setUp(self) - test_backends.AllDatabaseTests.setUp(self) - BaseSoledadTest.setUp(self) - - scenarios = [ - ('token_http', { - 'make_database_for_test': make_token_http_database_for_test, - 'copy_database_for_test': copy_token_http_database_for_test, - 'make_document_for_test': make_soledad_document_for_test, - 'make_app_with_state': make_token_soledad_app, - }) - ] diff --git a/testing/tests/client/test_attachments.py b/testing/tests/client/test_attachments.py deleted file mode 100644 index 2df5b90d..00000000 --- a/testing/tests/client/test_attachments.py +++ /dev/null @@ -1,81 +0,0 @@ -# -*- coding: utf-8 -*- -# test_attachments.py -# Copyright (C) 2017 LEAP -# -# This program is free software: you can redistribute it and/or modify -# it under the terms of the GNU General Public License as published by -# the Free Software Foundation, either version 3 of the License, or -# (at your option) any later version. -# -# This program is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -# GNU General Public License for more details. -# -# You should have received a copy of the GNU General Public License -# along with this program. If not, see . -""" -Tests for document attachments. -""" - -import pytest - -from io import BytesIO -from mock import Mock - -from twisted.internet import defer -from test_soledad.util import BaseSoledadTest - - -from leap.soledad.client import AttachmentStates - - -def mock_response(doc): - doc._manager._client.get = Mock( - return_value=defer.succeed(Mock(code=200, json=lambda: []))) - doc._manager._client.put = Mock( - return_value=defer.succeed(Mock(code=200))) - - -@pytest.mark.usefixture('method_tmpdir') -class AttachmentTests(BaseSoledadTest): - - @defer.inlineCallbacks - def test_create_doc_saves_store(self): - doc = yield self._soledad.create_doc({}) - self.assertEqual(self._soledad, doc.store) - - @defer.inlineCallbacks - def test_put_attachment(self): - doc = yield self._soledad.create_doc({}) - mock_response(doc) - yield doc.put_attachment(BytesIO('test')) - local_list = yield doc._manager.local_list() - self.assertIn(doc._blob_id, local_list) - - @defer.inlineCallbacks - def test_get_attachment(self): - doc = yield self._soledad.create_doc({}) - mock_response(doc) - yield doc.put_attachment(BytesIO('test')) - fd = yield doc.get_attachment() - self.assertEqual('test', fd.read()) - - @defer.inlineCallbacks - def test_get_attachment_state(self): - doc = yield self._soledad.create_doc({}) - state = yield doc.get_attachment_state() - self.assertEqual(AttachmentStates.NONE, state) - mock_response(doc) - yield doc.put_attachment(BytesIO('test')) - state = yield doc.get_attachment_state() - self.assertEqual(AttachmentStates.LOCAL, state) - - @defer.inlineCallbacks - def test_is_dirty(self): - doc = yield self._soledad.create_doc({}) - dirty = yield doc.is_dirty() - self.assertFalse(dirty) - doc.content = {'test': True} - dirty = yield doc.is_dirty() - self.assertTrue(dirty) diff --git a/testing/tests/client/test_aux_methods.py b/testing/tests/client/test_aux_methods.py deleted file mode 100644 index 1eb676c7..00000000 --- a/testing/tests/client/test_aux_methods.py +++ /dev/null @@ -1,132 +0,0 @@ -# -*- coding: utf-8 -*- -# test_soledad.py -# Copyright (C) 2013 LEAP -# -# This program is free software: you can redistribute it and/or modify -# it under the terms of the GNU General Public License as published by -# the Free Software Foundation, either version 3 of the License, or -# (at your option) any later version. -# -# This program is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -# GNU General Public License for more details. -# -# You should have received a copy of the GNU General Public License -# along with this program. If not, see . -""" -Tests for general Soledad functionality. -""" -import os - -from pytest import inlineCallbacks - -from leap.soledad.client import Soledad -from leap.soledad.client._db.adbapi import U1DBConnectionPool -from leap.soledad.client._secrets.util import SecretsError - -from test_soledad.util import BaseSoledadTest - - -class AuxMethodsTestCase(BaseSoledadTest): - - def test__init_dirs(self): - sol = self._soledad_instance(prefix='_init_dirs') - local_db_dir = os.path.dirname(sol.local_db_path) - secrets_path = os.path.dirname(sol.secrets_path) - self.assertTrue(os.path.isdir(local_db_dir)) - self.assertTrue(os.path.isdir(secrets_path)) - - def _close_soledad(results): - sol.close() - - d = sol.create_doc({}) - d.addCallback(_close_soledad) - return d - - def test__init_u1db_sqlcipher_backend(self): - sol = self._soledad_instance(prefix='_init_db') - self.assertIsInstance(sol._dbpool, U1DBConnectionPool) - self.assertTrue(os.path.isfile(sol.local_db_path)) - sol.close() - - def test__init_config_with_defaults(self): - """ - Test if configuration defaults point to the correct place. - """ - - class SoledadMock(Soledad): - - def __init__(self): - pass - - # instantiate without initializing so we just test - # _init_config_with_defaults() - sol = SoledadMock() - sol.passphrase = u'' - sol.server_url = '' - sol._init_config_with_defaults() - # assert value of local_db_path - self.assertEquals( - os.path.join(sol.default_prefix, 'soledad.u1db'), - sol.local_db_path) - - def test__init_config_from_params(self): - """ - Test if configuration is correctly read from file. - """ - sol = self._soledad_instance( - 'leap@leap.se', - passphrase=u'123', - secrets_path='value_3', - local_db_path='value_2', - server_url='value_1', - cert_file=None) - self.assertEqual( - os.path.join(self.tempdir, 'value_3'), - sol.secrets_path) - self.assertEqual( - os.path.join(self.tempdir, 'value_2'), - sol.local_db_path) - self.assertEqual('value_1', sol.server_url) - sol.close() - - @inlineCallbacks - def test_change_passphrase(self): - """ - Test if passphrase can be changed. - """ - prefix = '_change_passphrase' - sol = self._soledad_instance( - 'leap@leap.se', - passphrase=u'123', - prefix=prefix, - ) - - doc1 = yield sol.create_doc({'simple': 'doc'}) - sol.change_passphrase(u'654321') - sol.close() - - with self.assertRaises(SecretsError): - self._soledad_instance( - 'leap@leap.se', - passphrase=u'123', - prefix=prefix) - - sol2 = self._soledad_instance( - 'leap@leap.se', - passphrase=u'654321', - prefix=prefix) - doc2 = yield sol2.get_doc(doc1.doc_id) - - self.assertEqual(doc1, doc2) - - sol2.close() - - def test_get_passphrase(self): - """ - Assert passphrase getter works fine. - """ - sol = self._soledad_instance() - self.assertEqual('123', sol.passphrase) - sol.close() diff --git a/testing/tests/client/test_crypto.py b/testing/tests/client/test_crypto.py deleted file mode 100644 index 5b647b73..00000000 --- a/testing/tests/client/test_crypto.py +++ /dev/null @@ -1,384 +0,0 @@ -# -*- coding: utf-8 -*- -# test_crypto.py -# Copyright (C) 2013 LEAP -# -# This program is free software: you can redistribute it and/or modify -# it under the terms of the GNU General Public License as published by -# the Free Software Foundation, either version 3 of the License, or -# (at your option) any later version. -# -# This program is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -# GNU General Public License for more details. -# -# You should have received a copy of the GNU General Public License -# along with this program. If not, see . -""" -Tests for cryptographic related stuff. -""" -import binascii -import base64 -import json -import os - -from io import BytesIO - -import pytest - -from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes -from cryptography.hazmat.backends import default_backend -from cryptography.exceptions import InvalidTag - -from leap.soledad.common.document import SoledadDocument -from test_soledad.util import BaseSoledadTest -from leap.soledad.client import _crypto -from leap.soledad.client import _scrypt -from leap.soledad.common.blobs import preamble as _preamble - -from twisted.trial import unittest -from twisted.internet import defer - - -snowden1 = ( - "You can't come up against " - "the world's most powerful intelligence " - "agencies and not accept the risk. " - "If they want to get you, over time " - "they will.") - - -class ScryptTest(unittest.TestCase): - - def test_scrypt(self): - secret = 'supersikret' - salt = 'randomsalt' - key = _scrypt.hash(secret, salt, buflen=32) - expected = ('47996b569ea58d51ccbcc318d710' - 'a537acd28bb7a94615ab8d061d4b2a920f01') - assert binascii.b2a_hex(key) == expected - - -class AESTest(unittest.TestCase): - - def test_chunked_encryption(self): - key = 'A' * 32 - - fd = BytesIO() - aes = _crypto.AESWriter(key, _buffer=fd) - iv = aes.iv - - data = snowden1 - block = 16 - - for i in range(len(data) / block): - chunk = data[i * block:(i + 1) * block] - aes.write(chunk) - aes.end() - - ciphertext_chunked = fd.getvalue() - ciphertext, tag = _aes_encrypt(key, iv, data) - - assert ciphertext_chunked == ciphertext - - def test_decrypt(self): - key = 'A' * 32 - iv = 'A' * 16 - - data = snowden1 - block = 16 - - ciphertext, tag = _aes_encrypt(key, iv, data) - - fd = BytesIO() - aes = _crypto.AESWriter(key, iv, fd, tag=tag) - - for i in range(len(ciphertext) / block): - chunk = ciphertext[i * block:(i + 1) * block] - aes.write(chunk) - aes.end() - - cleartext_chunked = fd.getvalue() - assert cleartext_chunked == data - - -class BlobTestCase(unittest.TestCase): - - class doc_info: - doc_id = 'D-deadbeef' - rev = '397932e0c77f45fcb7c3732930e7e9b2:1' - - def setUp(self): - self.inf = BytesIO(snowden1) - self.blob = _crypto.BlobEncryptor( - self.doc_info, self.inf, - armor=True, - secret='A' * 96) - - @defer.inlineCallbacks - def test_unarmored_blob_encrypt(self): - self.blob.armor = False - encrypted = yield self.blob.encrypt() - - decryptor = _crypto.BlobDecryptor( - self.doc_info, encrypted, armor=False, - secret='A' * 96) - decrypted = yield decryptor.decrypt() - assert decrypted.getvalue() == snowden1 - - @defer.inlineCallbacks - def test_default_armored_blob_encrypt(self): - encrypted = yield self.blob.encrypt() - decode = base64.urlsafe_b64decode - assert map(decode, encrypted.getvalue().split()) - - @defer.inlineCallbacks - def test_blob_encryptor(self): - encrypted = yield self.blob.encrypt() - preamble, ciphertext = encrypted.getvalue().split() - preamble = base64.urlsafe_b64decode(preamble) - ciphertext = base64.urlsafe_b64decode(ciphertext) - ciphertext = ciphertext[:-16] - - assert len(preamble) == _preamble.PACMAN.size - unpacked_data = _preamble.PACMAN.unpack(preamble) - magic, sch, meth, ts, iv, doc_id, rev, _ = unpacked_data - assert magic == _crypto.MAGIC - assert sch == 1 - assert meth == _crypto.ENC_METHOD.aes_256_gcm - assert iv == self.blob.iv - assert doc_id == 'D-deadbeef' - assert rev == self.doc_info.rev - - aes_key = _crypto._get_sym_key_for_doc( - self.doc_info.doc_id, 'A' * 96) - assert ciphertext == _aes_encrypt(aes_key, self.blob.iv, snowden1)[0] - - decrypted = _aes_decrypt(aes_key, self.blob.iv, self.blob.tag, - ciphertext, preamble) - assert str(decrypted) == snowden1 - - @defer.inlineCallbacks - def test_init_with_preamble_alone(self): - ciphertext = yield self.blob.encrypt() - preamble = ciphertext.getvalue().split()[0] - decryptor = _crypto.BlobDecryptor( - self.doc_info, BytesIO(preamble), - start_stream=False, - secret='A' * 96) - assert decryptor._consume_preamble() - - @defer.inlineCallbacks - def test_incremental_blob_decryptor(self): - ciphertext = yield self.blob.encrypt() - preamble, ciphertext = ciphertext.getvalue().split() - ciphertext = base64.urlsafe_b64decode(ciphertext) - - decryptor = _crypto.BlobDecryptor( - self.doc_info, BytesIO(preamble), - start_stream=False, - secret='A' * 96, - tag=ciphertext[-16:]) - ciphertext = BytesIO(ciphertext[:-16]) - chunk = ciphertext.read(10) - while chunk: - decryptor.write(chunk) - chunk = ciphertext.read(10) - decrypted = decryptor._end_stream() - assert decrypted.getvalue() == snowden1 - - @defer.inlineCallbacks - def test_blob_decryptor(self): - ciphertext = yield self.blob.encrypt() - - decryptor = _crypto.BlobDecryptor( - self.doc_info, ciphertext, - secret='A' * 96) - decrypted = yield decryptor.decrypt() - assert decrypted.getvalue() == snowden1 - - @defer.inlineCallbacks - def test_unarmored_blob_decryptor(self): - self.blob.armor = False - ciphertext = yield self.blob.encrypt() - - decryptor = _crypto.BlobDecryptor( - self.doc_info, ciphertext, - armor=False, - secret='A' * 96) - decrypted = yield decryptor.decrypt() - assert decrypted.getvalue() == snowden1 - - @defer.inlineCallbacks - def test_encrypt_and_decrypt(self): - """ - Check that encrypting and decrypting gives same doc. - """ - crypto = _crypto.SoledadCrypto('A' * 96) - payload = {'key': 'someval'} - doc1 = SoledadDocument('id1', '1', json.dumps(payload)) - - encrypted = yield crypto.encrypt_doc(doc1) - assert encrypted != payload - assert 'raw' in encrypted - doc2 = SoledadDocument('id1', '1') - doc2.set_json(encrypted) - assert _crypto.is_symmetrically_encrypted(encrypted) - decrypted = (yield crypto.decrypt_doc(doc2)).getvalue() - assert len(decrypted) != 0 - assert json.loads(decrypted) == payload - - @defer.inlineCallbacks - def test_decrypt_with_wrong_tag_raises(self): - """ - Trying to decrypt a document with wrong MAC should raise. - """ - crypto = _crypto.SoledadCrypto('A' * 96) - payload = {'key': 'someval'} - doc1 = SoledadDocument('id1', '1', json.dumps(payload)) - - encrypted = yield crypto.encrypt_doc(doc1) - encdict = json.loads(encrypted) - preamble, raw = str(encdict['raw']).split() - preamble = base64.urlsafe_b64decode(preamble) - raw = base64.urlsafe_b64decode(raw) - # mess with tag - messed = raw[:-16] + '0' * 16 - - preamble = base64.urlsafe_b64encode(preamble) - newraw = preamble + ' ' + base64.urlsafe_b64encode(str(messed)) - doc2 = SoledadDocument('id1', '1') - doc2.set_json(json.dumps({"raw": str(newraw)})) - - with pytest.raises(_crypto.InvalidBlob): - yield crypto.decrypt_doc(doc2) - - -class SoledadSecretsTestCase(BaseSoledadTest): - - def test_generated_secrets_have_correct_length(self): - expected = self._soledad.secrets.lengths - for name, length in expected.iteritems(): - secret = getattr(self._soledad.secrets, name) - self.assertEqual(length, len(secret)) - - -class SoledadCryptoAESTestCase(BaseSoledadTest): - - def test_encrypt_decrypt_sym(self): - # generate 256-bit key - key = os.urandom(32) - iv, cyphertext = _crypto.encrypt_sym('data', key) - self.assertTrue(cyphertext is not None) - self.assertTrue(cyphertext != '') - self.assertTrue(cyphertext != 'data') - plaintext = _crypto.decrypt_sym(cyphertext, key, iv) - self.assertEqual('data', plaintext) - - def test_decrypt_with_wrong_iv_raises(self): - key = os.urandom(32) - iv, cyphertext = _crypto.encrypt_sym('data', key) - self.assertTrue(cyphertext is not None) - self.assertTrue(cyphertext != '') - self.assertTrue(cyphertext != 'data') - # get a different iv by changing the first byte - rawiv = binascii.a2b_base64(iv) - wrongiv = rawiv - while wrongiv == rawiv: - wrongiv = os.urandom(1) + rawiv[1:] - with pytest.raises(InvalidTag): - _crypto.decrypt_sym( - cyphertext, key, iv=binascii.b2a_base64(wrongiv)) - - def test_decrypt_with_wrong_key_raises(self): - key = os.urandom(32) - iv, cyphertext = _crypto.encrypt_sym('data', key) - self.assertTrue(cyphertext is not None) - self.assertTrue(cyphertext != '') - self.assertTrue(cyphertext != 'data') - wrongkey = os.urandom(32) # 256-bits key - # ensure keys are different in case we are extremely lucky - while wrongkey == key: - wrongkey = os.urandom(32) - with pytest.raises(InvalidTag): - _crypto.decrypt_sym(cyphertext, wrongkey, iv) - - -class PreambleTestCase(unittest.TestCase): - class doc_info: - doc_id = 'D-deadbeef' - rev = '397932e0c77f45fcb7c3732930e7e9b2:1' - - def setUp(self): - self.cleartext = BytesIO(snowden1) - self.blob = _crypto.BlobEncryptor( - self.doc_info, self.cleartext, - secret='A' * 96) - - def test_preamble_starts_with_magic_signature(self): - preamble = self.blob._encode_preamble() - assert preamble.startswith(_crypto.MAGIC) - - def test_preamble_has_cipher_metadata(self): - preamble = self.blob._encode_preamble() - unpacked = _preamble.PACMAN.unpack(preamble) - encryption_scheme, encryption_method = unpacked[1:3] - assert encryption_scheme in _crypto.ENC_SCHEME - assert encryption_method in _crypto.ENC_METHOD - assert unpacked[4] == self.blob.iv - - def test_preamble_has_document_sync_metadata(self): - preamble = self.blob._encode_preamble() - unpacked = _preamble.PACMAN.unpack(preamble) - doc_id, doc_rev = unpacked[5:7] - assert doc_id == self.doc_info.doc_id - assert doc_rev == self.doc_info.rev - - def test_preamble_has_document_size(self): - preamble = self.blob._encode_preamble() - unpacked = _preamble.PACMAN.unpack(preamble) - size = unpacked[7] - assert size == _crypto._ceiling(len(snowden1)) - - @defer.inlineCallbacks - def test_preamble_can_come_without_size(self): - # XXX: This test case is here only to test backwards compatibility! - preamble = self.blob._encode_preamble() - # repack preamble using legacy format, without doc size - unpacked = _preamble.PACMAN.unpack(preamble) - preamble_without_size = _preamble.LEGACY_PACMAN.pack(*unpacked[0:7]) - # encrypt it manually for custom tag - ciphertext, tag = _aes_encrypt(self.blob.sym_key, self.blob.iv, - self.cleartext.getvalue(), - aead=preamble_without_size) - ciphertext = ciphertext + tag - # encode it - ciphertext = base64.urlsafe_b64encode(ciphertext) - preamble_without_size = base64.urlsafe_b64encode(preamble_without_size) - # decrypt it - ciphertext = preamble_without_size + ' ' + ciphertext - cleartext = yield _crypto.BlobDecryptor( - self.doc_info, BytesIO(ciphertext), - secret='A' * 96).decrypt() - assert cleartext.getvalue() == self.cleartext.getvalue() - warnings = self.flushWarnings() - assert len(warnings) == 1 - assert 'legacy preamble without size' in warnings[0]['message'] - - -def _aes_encrypt(key, iv, data, aead=''): - backend = default_backend() - cipher = Cipher(algorithms.AES(key), modes.GCM(iv), backend=backend) - encryptor = cipher.encryptor() - if aead: - encryptor.authenticate_additional_data(aead) - return encryptor.update(data) + encryptor.finalize(), encryptor.tag - - -def _aes_decrypt(key, iv, tag, data, aead=''): - backend = default_backend() - cipher = Cipher(algorithms.AES(key), modes.GCM(iv, tag), backend=backend) - decryptor = cipher.decryptor() - if aead: - decryptor.authenticate_additional_data(aead) - return decryptor.update(data) + decryptor.finalize() diff --git a/testing/tests/client/test_deprecated_crypto.py b/testing/tests/client/test_deprecated_crypto.py deleted file mode 100644 index 939a2003..00000000 --- a/testing/tests/client/test_deprecated_crypto.py +++ /dev/null @@ -1,94 +0,0 @@ -import json -import pytest - -from pytest import inlineCallbacks -from six.moves.urllib.parse import urljoin -from uuid import uuid4 - -from leap.soledad.client import crypto as old_crypto -from leap.soledad.common.couch import CouchDatabase -from leap.soledad.common import crypto as common_crypto - -from test_soledad.u1db_tests import simple_doc -from test_soledad.util import SoledadWithCouchServerMixin -from test_soledad.util import make_token_soledad_app -from test_soledad.u1db_tests import TestCaseWithServer - - -def deprecate_client_crypto(client): - secret = client._crypto.secret - _crypto = old_crypto.SoledadCrypto(secret) - setattr(client._dbsyncer, '_crypto', _crypto) - return client - - -def couch_database(couch_url, uuid): - db = CouchDatabase(couch_url, "user-%s" % (uuid,)) - return db - - -class DeprecatedCryptoTest(SoledadWithCouchServerMixin, TestCaseWithServer): - - def setUp(self): - SoledadWithCouchServerMixin.setUp(self) - TestCaseWithServer.setUp(self) - - def tearDown(self): - SoledadWithCouchServerMixin.tearDown(self) - TestCaseWithServer.tearDown(self) - - @staticmethod - def make_app_with_state(state): - return make_token_soledad_app(state) - - @pytest.mark.needs_couch - @inlineCallbacks - def test_touch_updates_remote_representation(self): - self.startTwistedServer() - user = 'user-' + uuid4().hex - server_url = 'http://%s:%d' % (self.server_address) - client = self._soledad_instance(user=user, server_url=server_url) - deprecated_client = deprecate_client_crypto( - self._soledad_instance(user=user, server_url=server_url)) - - self.make_app() - remote = self.request_state._create_database(replica_uid=client.uuid) - remote = CouchDatabase.open_database( - urljoin(self.couch_url, 'user-' + user), - create=True) - - # ensure remote db is empty - gen, docs = remote.get_all_docs() - assert gen == 0 - assert len(docs) == 0 - - # create a doc with deprecated client and sync - yield deprecated_client.create_doc(json.loads(simple_doc)) - yield deprecated_client.sync() - - # check for doc in remote db - gen, docs = remote.get_all_docs() - assert gen == 1 - assert len(docs) == 1 - doc = docs.pop() - content = doc.content - assert common_crypto.ENC_JSON_KEY in content - assert common_crypto.ENC_SCHEME_KEY in content - assert common_crypto.ENC_METHOD_KEY in content - assert common_crypto.ENC_IV_KEY in content - assert common_crypto.MAC_KEY in content - assert common_crypto.MAC_METHOD_KEY in content - - # "touch" the document with a newer client and synx - _, docs = yield client.get_all_docs() - yield client.put_doc(doc) - yield client.sync() - - # check for newer representation of doc in remote db - gen, docs = remote.get_all_docs() - assert gen == 2 - assert len(docs) == 1 - doc = docs.pop() - content = doc.content - assert len(content) == 1 - assert 'raw' in content diff --git a/testing/tests/client/test_doc.py b/testing/tests/client/test_doc.py deleted file mode 100644 index 36479e90..00000000 --- a/testing/tests/client/test_doc.py +++ /dev/null @@ -1,50 +0,0 @@ -# -*- coding: utf-8 -*- -# test_soledad_doc.py -# Copyright (C) 2013, 2014 LEAP -# -# This program is free software: you can redistribute it and/or modify -# it under the terms of the GNU General Public License as published by -# the Free Software Foundation, either version 3 of the License, or -# (at your option) any later version. -# -# This program is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -# GNU General Public License for more details. -# -# You should have received a copy of the GNU General Public License -# along with this program. If not, see . -""" -Test Leap backend bits: soledad docs -""" -import pytest - -from testscenarios import TestWithScenarios - -from test_soledad.u1db_tests import test_document -from test_soledad.util import BaseSoledadTest -from test_soledad.util import make_soledad_document_for_test - - -# ----------------------------------------------------------------------------- -# The following tests come from `u1db.tests.test_document`. -# ----------------------------------------------------------------------------- - -@pytest.mark.usefixtures('method_tmpdir') -class TestSoledadDocument( - TestWithScenarios, - test_document.TestDocument, BaseSoledadTest): - - scenarios = ([( - 'leap', { - 'make_document_for_test': make_soledad_document_for_test})]) - - -@pytest.mark.usefixtures('method_tmpdir') -class TestSoledadPyDocument( - TestWithScenarios, - test_document.TestPyDocument, BaseSoledadTest): - - scenarios = ([( - 'leap', { - 'make_document_for_test': make_soledad_document_for_test})]) diff --git a/testing/tests/client/test_http.py b/testing/tests/client/test_http.py deleted file mode 100644 index 47df4b4a..00000000 --- a/testing/tests/client/test_http.py +++ /dev/null @@ -1,60 +0,0 @@ -# -*- coding: utf-8 -*- -# test_http.py -# Copyright (C) 2013, 2014 LEAP -# -# This program is free software: you can redistribute it and/or modify -# it under the terms of the GNU General Public License as published by -# the Free Software Foundation, either version 3 of the License, or -# (at your option) any later version. -# -# This program is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -# GNU General Public License for more details. -# -# You should have received a copy of the GNU General Public License -# along with this program. If not, see . -""" -Test Leap backend bits: test http database -""" - -from twisted.trial import unittest - -from leap.soledad.client import auth -from leap.soledad.common.l2db.remote import http_database - - -# ----------------------------------------------------------------------------- -# The following tests come from `u1db.tests.test_http_database`. -# ----------------------------------------------------------------------------- - -class _HTTPDatabase(http_database.HTTPDatabase, auth.TokenBasedAuth): - - """ - Wraps our token auth implementation. - """ - - def set_token_credentials(self, uuid, token): - auth.TokenBasedAuth.set_token_credentials(self, uuid, token) - - def _sign_request(self, method, url_query, params): - return auth.TokenBasedAuth._sign_request( - self, method, url_query, params) - - -class TestHTTPDatabaseWithCreds(unittest.TestCase): - - def test_get_sync_target_inherits_token_credentials(self): - # this test was from TestDatabaseSimpleOperations but we put it here - # for convenience. - self.db = _HTTPDatabase('dbase') - self.db.set_token_credentials('user-uuid', 'auth-token') - st = self.db.get_sync_target() - self.assertEqual(self.db._creds, st._creds) - - def test_ctr_with_creds(self): - db1 = _HTTPDatabase('http://dbs/db', creds={'token': { - 'uuid': 'user-uuid', - 'token': 'auth-token', - }}) - self.assertIn('token', db1._creds) diff --git a/testing/tests/client/test_https.py b/testing/tests/client/test_https.py deleted file mode 100644 index 1b6caed6..00000000 --- a/testing/tests/client/test_https.py +++ /dev/null @@ -1,137 +0,0 @@ -# -*- coding: utf-8 -*- -# test_sync_target.py -# Copyright (C) 2013, 2014 LEAP -# -# This program is free software: you can redistribute it and/or modify -# it under the terms of the GNU General Public License as published by -# the Free Software Foundation, either version 3 of the License, or -# (at your option) any later version. -# -# This program is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -# GNU General Public License for more details. -# -# You should have received a copy of the GNU General Public License -# along with this program. If not, see . -""" -Test Leap backend bits: https -""" -import pytest - -from testscenarios import TestWithScenarios - -from leap.soledad import client - -from leap.soledad.common.l2db.remote import http_client -from test_soledad.u1db_tests import test_backends -from test_soledad.u1db_tests import test_https -from test_soledad.util import ( - BaseSoledadTest, - make_soledad_document_for_test, - make_soledad_app, - make_token_soledad_app, -) - - -LEAP_SCENARIOS = [ - ('http', { - 'make_database_for_test': test_backends.make_http_database_for_test, - 'copy_database_for_test': test_backends.copy_http_database_for_test, - 'make_document_for_test': make_soledad_document_for_test, - 'make_app_with_state': make_soledad_app}), -] - - -# ----------------------------------------------------------------------------- -# The following tests come from `u1db.tests.test_https`. -# ----------------------------------------------------------------------------- - -def token_leap_https_sync_target(test, host, path, cert_file=None): - _, port = test.server.server_address - # source_replica_uid = test._soledad._dbpool.replica_uid - creds = {'token': {'uuid': 'user-uuid', 'token': 'auth-token'}} - if not cert_file: - cert_file = test.cacert_pem - st = client.http_target.SoledadHTTPSyncTarget( - 'https://%s:%d/%s' % (host, port, path), - source_replica_uid='other-id', - creds=creds, - crypto=test._soledad._crypto, - cert_file=cert_file) - return st - - -@pytest.mark.skip -class TestSoledadHTTPSyncTargetHttpsSupport( - TestWithScenarios, - # test_https.TestHttpSyncTargetHttpsSupport, - BaseSoledadTest): - - scenarios = [ - ('token_soledad_https', - { - # 'server_def': test_https.https_server_def, - 'make_app_with_state': make_token_soledad_app, - 'make_document_for_test': make_soledad_document_for_test, - 'sync_target': token_leap_https_sync_target}), - ] - - def setUp(self): - # the parent constructor undoes our SSL monkey patch to ensure tests - # run smoothly with standard u1db. - test_https.TestHttpSyncTargetHttpsSupport.setUp(self) - # so here monkey patch again to test our functionality. - api = client.api - http_client._VerifiedHTTPSConnection = api.VerifiedHTTPSConnection - client.api.SOLEDAD_CERT = http_client.CA_CERTS - - def test_cannot_verify_cert(self): - self.startServer() - # don't print expected traceback server-side - self.server.handle_error = lambda req, cli_addr: None - self.request_state._create_database('test') - remote_target = self.getSyncTarget( - 'localhost', 'test', cert_file=http_client.CA_CERTS) - d = remote_target.record_sync_info('other-id', 2, 'T-id') - - def _assert_raises(result): - from twisted.python.failure import Failure - if isinstance(result, Failure): - from OpenSSL.SSL import Error - error = result.value.message[0].value - if isinstance(error, Error): - msg = error.message[0][2] - self.assertEqual("certificate verify failed", msg) - return - self.fail("certificate verification should have failed.") - - d.addCallbacks(_assert_raises, _assert_raises) - return d - - def test_working(self): - """ - Test that SSL connections work well. - - This test was adapted to patch Soledad's HTTPS connection custom class - with the intended CA certificates. - """ - self.startServer() - db = self.request_state._create_database('test') - remote_target = self.getSyncTarget('localhost', 'test') - d = remote_target.record_sync_info('other-id', 2, 'T-id') - d.addCallback(lambda _: - self.assertEqual( - (2, 'T-id'), - db._get_replica_gen_and_trans_id('other-id') - )) - d.addCallback(lambda _: remote_target.close()) - return d - - def test_host_mismatch(self): - """ - This test is disabled because soledad's twisted-based http agent uses - pyOpenSSL, which will complain if we try to use an IP to connect to - the remote host (see the original test in u1db_tests/test_https.py). - """ - pass diff --git a/testing/tests/client/test_incoming_processing_flow.py b/testing/tests/client/test_incoming_processing_flow.py deleted file mode 100644 index 7bc1e3c6..00000000 --- a/testing/tests/client/test_incoming_processing_flow.py +++ /dev/null @@ -1,197 +0,0 @@ -# -*- coding: utf-8 -*- -# test_incoming_processing_flow.py -# Copyright (C) 2017 LEAP -# -# This program is free software: you can redistribute it and/or modify -# it under the terms of the GNU General Public License as published by -# the Free Software Foundation, either version 3 of the License, or -# (at your option) any later version. -# -# This program is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -# GNU General Public License for more details. -# -# You should have received a copy of the GNU General Public License -# along with this program. If not, see . -""" -Unit tests for incoming box processing flow. -""" -from mock import Mock, call -from leap.soledad.client import interfaces -from leap.soledad.client.incoming import IncomingBoxProcessingLoop -from twisted.internet import defer -from twisted.trial import unittest -from zope.interface import implementer - - -@implementer(interfaces.IIncomingBoxConsumer) -class GoodConsumer(object): - def __init__(self): - self.name = 'GoodConsumer' - self.processed, self.saved = [], [] - - def process(self, item, item_id, encrypted=True): - self.processed.append(item_id) - return defer.succeed([item_id]) - - def save(self, parts, item_id): - self.saved.append(item_id) - return defer.succeed(None) - - -class ProcessingFailedConsumer(GoodConsumer): - def __init__(self): - self.name = 'ProcessingFailedConsumer' - self.processed, self.saved = [], [] - - def process(self, item, item_id, encrypted=True): - return defer.fail() - - -class SavingFailedConsumer(GoodConsumer): - def __init__(self): - self.name = 'SavingFailedConsumer' - self.processed, self.saved = [], [] - - def save(self, parts, item_id): - return defer.fail() - - -class IncomingBoxProcessingTestCase(unittest.TestCase): - - def setUp(self): - self.box = Mock() - self.loop = IncomingBoxProcessingLoop(self.box) - - def _set_pending_items(self, pending): - self.box.list_pending.return_value = defer.succeed(pending) - pending_iter = iter([defer.succeed(item) for item in pending]) - self.box.fetch_for_processing.side_effect = pending_iter - - @defer.inlineCallbacks - def test_processing_flow_reserves_a_message(self): - self._set_pending_items(['one_item']) - self.loop.add_consumer(GoodConsumer()) - yield self.loop() - self.box.fetch_for_processing.assert_called_once_with('one_item') - - @defer.inlineCallbacks - def test_no_consumers(self): - items = ['one', 'two', 'three'] - self._set_pending_items(items) - yield self.loop() - self.box.fetch_for_processing.assert_not_called() - self.box.delete.assert_not_called() - - @defer.inlineCallbacks - def test_pending_list_with_multiple_items(self): - items = ['one', 'two', 'three'] - self._set_pending_items(items) - consumer = GoodConsumer() - self.loop.add_consumer(consumer) - yield self.loop() - calls = [call('one'), call('two'), call('three')] - self.box.fetch_for_processing.assert_has_calls(calls) - - @defer.inlineCallbacks - def test_good_consumer_process_all(self): - items = ['one', 'two', 'three'] - self._set_pending_items(items) - consumer = GoodConsumer() - self.loop.add_consumer(consumer) - yield self.loop() - self.assertEquals(items, consumer.processed) - - @defer.inlineCallbacks - def test_good_consumer_saves_all(self): - items = ['one', 'two', 'three'] - self._set_pending_items(items) - consumer = GoodConsumer() - self.loop.add_consumer(consumer) - yield self.loop() - self.assertEquals(items, consumer.saved) - - @defer.inlineCallbacks - def test_multiple_good_consumers_process_all(self): - items = ['one', 'two', 'three'] - self._set_pending_items(items) - consumer = GoodConsumer() - consumer2 = GoodConsumer() - self.loop.add_consumer(consumer) - self.loop.add_consumer(consumer2) - yield self.loop() - self.assertEquals(items, consumer.processed) - self.assertEquals(items, consumer2.processed) - - @defer.inlineCallbacks - def test_good_consumer_marks_as_processed(self): - items = ['one', 'two', 'three'] - self._set_pending_items(items) - consumer = GoodConsumer() - self.loop.add_consumer(consumer) - yield self.loop() - self.box.set_processed.assert_has_calls([call(x) for x in items]) - - @defer.inlineCallbacks - def test_good_consumer_deletes_items(self): - items = ['one', 'two', 'three'] - self._set_pending_items(items) - consumer = GoodConsumer() - self.loop.add_consumer(consumer) - yield self.loop() - self.box.delete.assert_has_calls([call(x) for x in items]) - - @defer.inlineCallbacks - def test_processing_failed_doesnt_mark_as_processed(self): - items = ['one', 'two', 'three'] - self._set_pending_items(items) - consumer = ProcessingFailedConsumer() - self.loop.add_consumer(consumer) - yield self.loop() - self.box.set_processed.assert_not_called() - - @defer.inlineCallbacks - def test_processing_failed_doesnt_delete(self): - items = ['one', 'two', 'three'] - self._set_pending_items(items) - consumer = ProcessingFailedConsumer() - self.loop.add_consumer(consumer) - yield self.loop() - self.box.delete.assert_not_called() - - @defer.inlineCallbacks - def test_processing_failed_marks_as_failed(self): - items = ['one', 'two', 'three'] - self._set_pending_items(items) - consumer = ProcessingFailedConsumer() - self.loop.add_consumer(consumer) - yield self.loop() - self.box.set_failed.assert_has_calls([call(x) for x in items]) - - @defer.inlineCallbacks - def test_saving_failed_marks_as_processed(self): - items = ['one', 'two', 'three'] - self._set_pending_items(items) - consumer = SavingFailedConsumer() - self.loop.add_consumer(consumer) - yield self.loop() - self.box.set_processed.assert_has_calls([call(x) for x in items]) - - @defer.inlineCallbacks - def test_saving_failed_doesnt_delete(self): - items = ['one', 'two', 'three'] - self._set_pending_items(items) - consumer = SavingFailedConsumer() - self.loop.add_consumer(consumer) - yield self.loop() - self.box.delete.assert_not_called() - - @defer.inlineCallbacks - def test_saving_failed_marks_as_failed(self): - items = ['one', 'two', 'three'] - self._set_pending_items(items) - consumer = SavingFailedConsumer() - self.loop.add_consumer(consumer) - yield self.loop() - self.box.set_failed.assert_has_calls([call(x) for x in items]) diff --git a/testing/tests/client/test_recovery_code.py b/testing/tests/client/test_recovery_code.py deleted file mode 100644 index 7bbccc41..00000000 --- a/testing/tests/client/test_recovery_code.py +++ /dev/null @@ -1,38 +0,0 @@ -# -*- CODing: utf-8 -*- -# test_recovery_code.py -# Copyright (C) 2017 LEAP -# -# This program is free software: you can redistribute it and/or modify -# it under the terms of the GNU General Public License as published by -# the Free Software Foundation, either version 3 of the License, or -# (at your option) any later version. -# -# This program is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -# GNU General Public License for more details. -# -# You should have received a copy of the GNU General Public License -# along with this program. If not, see . -""" -Tests for recovery code generation. -""" -import binascii - -from mock import patch -from twisted.trial import unittest -from leap.soledad.client._recovery_code import RecoveryCode - - -class RecoveryCodeTestCase(unittest.TestCase): - - @patch('leap.soledad.client._recovery_code.os.urandom') - def test_generate_recovery_code(self, mock_os_urandom): - generated_random_code = '123456' - mock_os_urandom.return_value = generated_random_code - recovery_code = RecoveryCode() - - code = recovery_code.generate() - - mock_os_urandom.assert_called_with(RecoveryCode.code_length) - self.assertEqual(binascii.hexlify(generated_random_code), code) diff --git a/testing/tests/client/test_secrets.py b/testing/tests/client/test_secrets.py deleted file mode 100644 index 7b643cb4..00000000 --- a/testing/tests/client/test_secrets.py +++ /dev/null @@ -1,166 +0,0 @@ -# -*- CODing: utf-8 -*- -# test_secrets.py -# Copyright (C) 2017 LEAP -# -# This program is free software: you can redistribute it and/or modify -# it under the terms of the GNU General Public License as published by -# the Free Software Foundation, either version 3 of the License, or -# (at your option) any later version. -# -# This program is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -# GNU General Public License for more details. -# -# You should have received a copy of the GNU General Public License -# along with this program. If not, see . -""" -Tests for secrets encryption and decryption. -""" -import scrypt - -from twisted.trial import unittest - -from leap.soledad.client._crypto import ENC_METHOD -from leap.soledad.client._secrets import SecretsCrypto - - -class SecretsCryptoTestCase(unittest.TestCase): - - SECRETS = { - 'remote_secret': 'a' * 512, - 'local_salt': 'b' * 64, - 'local_secret': 'c' * 448 - } - ENCRYPTED_V2 = { - 'cipher': 2, - 'length': 1437, - 'kdf_salt': 'TSgNLeAGFeITeSgNzmYZHh+mzmkZPOqao7CAV/tx3KZCLwsrT0HmWtVK3' - 'TyWHWNgVdeamMZYRuvZavE2sp0DGw==\n', - 'iv': 'TKZQKIlRgdnXFhJf08qswg==', - 'secrets': 'ZNZRi72VDtwZqyuU+uf3yzZt23vCtMS3Ki2bnZyeHUOSGVweJeDadF4oqE' - 'BW87NN00j9E49BzyzLr9SNgwZjPp0wlUm7kt+s8EUfJUdH8nxaQ+9iqGXM' - 'cCHmBM8L8DRN2m3BrPGx7m+QGlN9sbrRpl7fqc46RWcYuTEpm4upjdtI7O' - 'jDd0JG3C0rUzIuKJn9w4rEpX3tLEKXVdZfLvRXS5roR0cauazsDO69E13q' - 'a01vDuY+UJ+buLQ3FluPnnk8QE7ztPVUmRJJ76yAIhjVX9owiwlp9GnUJY' - 'sETRCqdRSTwUcHIkzVR0zAvtxTX7eGTitzf4gCYEC4T9v5N/jHxEfPdx28' - 'MM4KShWN2nFxNFQLQUpMN2OrM7UyUw+DQ3ydqBeBPKPHRN5s05kIK7P/Ra' - 'aLNcrJWa7DopLbgLlei0Jd7S4sjv1ufaRY7v0qJaVkhh/VaCylTSVw1rv5' - 'YzSWcHHcLuC0R8xLadz6T+EpsVYxgPYCS7w5xoE82zwNQzw/EBxLIcyLPl' - 'ipKnr2dttrmm3KXUOT1IdbSbI5elF6yQTAusdqiXuypey+MDqHYWEYWkCn' - 'e9/uGM9FjklDLE0RtPEDxhq64tw6u2Xu7RzDzyQDI8EIoTdU+4zEMTnelZ' - 'fKEwdG58EDxTXfUk6IDcRUupz3YuToSMhIOkqgXnbWl/nrK0O9v4JMhQjI' - 'r+oPICYfFr14kvJXBsfntILTJCxzbqTQcNba3jc8rGqCZ6gM0u4PndwTG2' - 'UiCqPU2HMnWvVGQOXeLdQY+EqqXQiRDi0DrDmkVwFf+27dPXxmZ43C48W3' - 'lMhTKXl0rdBFnOD5jiMh0X6q/KYXonyEtMZMsjT7dFePcCy4wQRhuut+ac' - '/TJWyrr+/IB45E+LZbhV7xCy1dYsbdb52jTRJFpaQ83sj6Iv6SYdiqqXzL' - 'F5JGMyuovTjwAoIIQzpLv36xY2wGGAH1V8c7QmDR2qumXrHD9R68WjBoSY' - '7IFM0TFAGZNun56y/zQ4r8yOMSAId+j4kuRH0fENEi0FJ+SpmSdHfpvBhE' - 'MdGh927E9enEYWmUQMmkxXIw6E+O3cmOWt2hsMbUAikDCpQOnVP2BD55HT' - '6FfbW7ITVwPINHYexmy2Xcm8H5zzGFSp+uYIyPBYDKA+VJ+QQI8bud9K+T' - 'NBybUv9u6LbB6BsLpwLoxMPJu0WsN2HpmLYgrg2ML1huMF1OtaGRuUr2PL' - 'NBaZaL6VOztYrVtQG1+tNyRxn8XQTtx0l6n+EihGVe9Sk5XF6DJA9ZN7uO' - 'svTUFJ5qG3Erf4AmbUJWoOR/NvapBtifidM7gPZZ6NqBs6v72rU1pGy+p7' - 'o84KrmB2MNf3yJ0BvKxPvFmltF3Dc7LB5TN8ycbmFM6hgrLvvhPxiHEnG/' - '8Qcrg0nUXOipFGNgZEU7t7Mz6RJ189Z2Kx1HVGrkAzEgqwZYqijAPlsgzO' - 'bg6DwzwC7stolQWGCDQUtJVlE8FZ/Up8zFYYZKn52WzjmSN4/hHhEvdkck' - 'Nez/JVev6fMcVrgdrTZ+uCwxjN/4xPdgog2HV470ea1bvIkSNOOrhm194M' - '40GmvmBhiSSMjdRQCQtM0t9bUuSQLPDzEiCA9QaLyygtlz9uRR/dXgkEg0' - 'J4YNpZvhE0wbyp4GHytbPaAmrcd7im9+buTuMwhXpZl0stmfkJxVHJSZ8Y' - 'IHakHs3W1fdYyI3wxGpel/9eYO3ISukolwrHXESP65wVNKfBwbqVJzQmts' - 'pyDBOI6DcLKZfE1EVg0+uwQ/5PKZbn0TwlXO1YE3NL3mAply3zQR9hyBrY' - '6f1jkHVD3irIlWkSiPJsP8sW+nrK8c/Ha8F+dua6DTZmg594OIaQj8mPiY' - 'GcIusiARWocR5/MmSjupGOgFx4HtmckTJtAta3XP4elOx04teH/P9Cgr1x' - 'XYf+cEX6gp92L9rTo0FCz3Hw==\n', - 'version': 2, - 'kdf': 'scrypt', - 'kdf_length': 32 - } - - ENCRYPTED_V1 = { - 'version': 1, - 'active_secret': 'secret_id', - 'storage_secrets': { - 'secret_id': { - 'kdf': 'scrypt', - 'secret': 'u31ObvxNU8jB0HgMj3TVwQ==:JQwlYq6sAQmHYS3x2CJzObT9h' - 'j1iiHthvrMh887qedNCcOfJyCA3jpRkc0vjd2Qk/2HSJ+JxM2F' - 'MrPzzx5O34EHlgF2scen34guZRRIf42WpnMy+PrL4cnMlZLgCh' - 'H1Jz6wcIMEpU9LQ8OaCShk1/yJ6qcVHOV4DDt3mTF7ttiqI5cp' - 'msaVtxxYCcpxFiWSeSCEgr0h4/Ih1qHuM6vk+CQjf/zg1f/7HR' - 'imIyNYXit9Fw3YTkxBen1wG3f5L7OAODRTuqnWpkQFOmclx050' - 'k0frKRcX6UWhIOWpW2mqJXnvzDtQQVGzqIdSgGTGtUDGQ7Onnc' - 'NkUlSnuVC7PkDNNRuwit3pCB9YWBWyPAQgs0kLqoV4YcuSctz6' - 'SAf76ozdcK5/SrOzutOfyPag4V3AYKMv6rCKALJ10OnFJ61FL9' - 'kd6JZam7WOlEUXyO7Gdgvz+eKiQMTZXbtO2kAKqel513MedPXC' - 'dzajUe1U2JaGg86UdiDWoPYOiWxnAPwfNJk+1QuNy5NZ7PaMtF' - 'IKT3/Xema2U8mufS0FbvJyK2flP1VUWcCzHKTSqX6+kU7UpoWa' - 'hYa7PlO40El+putTQLBmNaEeaWFngO+XB4TReICHSiCdcAb3pw' - 'sabjtxt+OpK4vbj3yBSfpiZTpVbEjt9U/tUpVp/T2M66lMi3ZC' - 'oHLlhu45Zo0aEq3UmQ/WBXu6EkO2eLYz2br9YQwRbSJ6z5CHmu' - 'hjKBQlpvGNfZYObx5lY4o6Ab4f/N8gyukskjmAFAf7Fr8cEog/' - 'oxmbagoCtUGRYJp2paooqH8L6xXp0Y8+23g7WJaAIr1i4V4aKS' - 'r9x7iUK6prcZTtMJZEHCswkLN/+DU6/FX3YZcOjseC+Qv3P+9v' - 'zIDp/92KJzqVqITGwrsc6ZsglMW37qxs6albtw3lMWSHlkcLbj' - 'Xf/iHPeKnb2WNLdkFNQ1J5OaTJR+E1CrXN+pm1JtB6XaUbaLGV' - 'CGUo13lAPVDtXcPbo64kMrQtQu4m9m8X8t8tfuJmINfwBnrKzk' - 'O6pl+LwimFaFEArV6wcaMxmwi0lM7mt4U1u9OIQjghQ/dEmOyV' - 'dZBnvyG7T/oRuLdUyZ/QGXZMlPQ3lAZ0ONn1Mk4bmKToW8ToE8' - 'ylld3rLlWDjjoQP8mP05Izg3mguLHXUhikUL8MD5NdYyeZJ1XZ' - '0OZ5S9uncurYj2ABWJoVaq/tFCdCEo9bbjWsePei26GZjaM3Fx' - 'RkAICXe/bt6/uLgaPZtO+sdARDuU3DRKMIdgM9NBaIn0kC7Wk4' - 'bnYShZ/rbhVt2/ds5XinnDBZsxSR3s553DixJ9v6w9Db++9Stw' - '4DgePd9lLy+6WuVBlKmcNflx9zg7US0AOarX2UNiQ==', - 'kdf_length': 32, - 'kdf_salt': 'MYH68QH48nRFMWH44piFWqBnKtU8KCz6Ajh24otrvzJlqPgB' - 'v6bvFJjRvjRp/0/v1j2nt40RZ6H5hfoKmore0g==\n', - 'length': 1024, - 'cipher': 'aes256', - } - } - } - - def setUp(self): - class Soledad(object): - passphrase = '123' - soledad = Soledad() - self._crypto = SecretsCrypto(soledad) - - def test__get_key(self): - salt = 'abc' - expected = scrypt.hash('123', salt, buflen=32) - key = self._crypto._get_key(salt) - self.assertEqual(expected, key) - - def test_encrypt(self): - info = self._crypto.encrypt(self.SECRETS) - self.assertEqual(8, len(info)) - for key, value in [ - ('kdf', 'scrypt'), - ('kdf_salt', None), - ('kdf_length', None), - ('cipher', ENC_METHOD.aes_256_gcm), - ('length', None), - ('iv', None), - ('secrets', None), - ('version', 2)]: - self.assertTrue(key in info) - if value: - self.assertEqual(info[key], value) - - def test__decrypt_v2(self): - encrypted = self.ENCRYPTED_V2 - decrypted = self._crypto.decrypt(encrypted) - self.assertEqual(decrypted, self.SECRETS) - - def test__decrypt_v1(self): - encrypted = self.ENCRYPTED_V1 - decrypted = self._crypto.decrypt(encrypted) - self.assertEqual(decrypted, self.SECRETS) - - def test__no_version_defaults_to_v1(self): - encrypted = dict(self.ENCRYPTED_V1) - del encrypted['version'] - decrypted = self._crypto.decrypt(encrypted) - self.assertEqual(decrypted, self.SECRETS) - self.assertEqual(encrypted['version'], 1) diff --git a/testing/tests/client/test_shared_db.py b/testing/tests/client/test_shared_db.py deleted file mode 100644 index b045e524..00000000 --- a/testing/tests/client/test_shared_db.py +++ /dev/null @@ -1,40 +0,0 @@ -from leap.soledad.common.document import SoledadDocument -from leap.soledad.client.shared_db import SoledadSharedDatabase - -from test_soledad.util import BaseSoledadTest - - -class SoledadSharedDBTestCase(BaseSoledadTest): - - """ - These tests ensure the functionalities of the shared recovery database. - """ - - def setUp(self): - BaseSoledadTest.setUp(self) - self._shared_db = SoledadSharedDatabase( - 'https://provider/', document_factory=SoledadDocument, - creds=None) - - def tearDown(self): - BaseSoledadTest.tearDown(self) - - def test__get_remote_doc(self): - """ - Ensure the shared db is queried with the correct doc_id. - """ - doc_id = self._soledad.secrets.storage._remote_doc_id() - self._soledad.secrets.storage._get_remote_doc() - self._soledad.secrets.storage._shared_db.get_doc.assert_called_with( - doc_id) - - def test_save_remote(self): - """ - Ensure recovery document is put into shared recover db. - """ - doc_id = self._soledad.secrets.storage._remote_doc_id() - storage = self._soledad.secrets.storage - storage.save_remote({'content': 'blah'}) - storage._shared_db.get_doc.assert_called_with(doc_id) - storage._shared_db.put_doc.assert_called_with(self._doc_put) - self.assertTrue(self._doc_put.doc_id == doc_id) diff --git a/testing/tests/client/test_signals.py b/testing/tests/client/test_signals.py deleted file mode 100644 index c7609a74..00000000 --- a/testing/tests/client/test_signals.py +++ /dev/null @@ -1,149 +0,0 @@ -from mock import Mock -from twisted.internet import defer - -from leap import soledad -from leap.common.events import catalog -from leap.soledad.common.document import SoledadDocument - -from test_soledad.util import ADDRESS -from test_soledad.util import BaseSoledadTest - - -class SoledadSignalingTestCase(BaseSoledadTest): - - """ - These tests ensure signals are correctly emmited by Soledad. - """ - - EVENTS_SERVER_PORT = 8090 - - def setUp(self): - # mock signaling - soledad.client.signal = Mock() - soledad.client._secrets.util.events.emit_async = Mock() - # run parent's setUp - BaseSoledadTest.setUp(self) - - def tearDown(self): - BaseSoledadTest.tearDown(self) - - def _pop_mock_call(self, mocked): - mocked.call_args_list.pop() - mocked.mock_calls.pop() - mocked.call_args = mocked.call_args_list[-1] - - def test_stage3_bootstrap_signals(self): - """ - Test that a fresh soledad emits all bootstrap signals. - - Signals are: - - downloading keys / done downloading keys. - - creating keys / done creating keys. - - downloading keys / done downloading keys. - - uploading keys / done uploading keys. - """ - soledad.client._secrets.util.events.emit_async.reset_mock() - # get a fresh instance so it emits all bootstrap signals - sol = self._soledad_instance( - secrets_path='alternative_stage3.json', - local_db_path='alternative_stage3.u1db') - # reverse call order so we can verify in the order the signals were - # expected - soledad.client._secrets.util.events.emit_async.mock_calls.reverse() - soledad.client._secrets.util.events.emit_async.call_args = \ - soledad.client._secrets.util.events.emit_async.call_args_list[0] - soledad.client._secrets.util.events.emit_async.call_args_list.reverse() - - user_data = {'userid': ADDRESS, 'uuid': ADDRESS} - - def _assert(*args, **kwargs): - mocked = soledad.client._secrets.util.events.emit_async - mocked.assert_called_with(*args) - pop = kwargs.get('pop') - if pop or pop is None: - self._pop_mock_call(mocked) - - _assert(catalog.SOLEDAD_DOWNLOADING_KEYS, user_data) - _assert(catalog.SOLEDAD_DONE_DOWNLOADING_KEYS, user_data) - _assert(catalog.SOLEDAD_CREATING_KEYS, user_data) - _assert(catalog.SOLEDAD_DONE_CREATING_KEYS, user_data) - _assert(catalog.SOLEDAD_UPLOADING_KEYS, user_data) - _assert(catalog.SOLEDAD_DOWNLOADING_KEYS, user_data) - _assert(catalog.SOLEDAD_DONE_DOWNLOADING_KEYS, user_data) - _assert(catalog.SOLEDAD_DONE_UPLOADING_KEYS, user_data, pop=False) - - sol.close() - - def test_stage2_bootstrap_signals(self): - """ - Test that if there are keys in server, soledad will download them and - emit corresponding signals. - """ - # get existing instance so we have access to keys - sol = self._soledad_instance() - # create a document with secrets - doc = SoledadDocument(doc_id=sol.secrets.storage._remote_doc_id()) - doc.content = sol.secrets.crypto.encrypt(sol.secrets._secrets) - sol.close() - # reset mock - soledad.client._secrets.util.events.emit_async.reset_mock() - # get a fresh instance so it emits all bootstrap signals - shared_db = self.get_default_shared_mock(get_doc_return_value=doc) - sol = self._soledad_instance( - secrets_path='alternative_stage2.json', - local_db_path='alternative_stage2.u1db', - shared_db_class=shared_db) - # reverse call order so we can verify in the order the signals were - # expected - mocked = soledad.client._secrets.util.events.emit_async - mocked.mock_calls.reverse() - mocked.call_args = mocked.call_args_list[0] - mocked.call_args_list.reverse() - - def _assert(*args, **kwargs): - mocked = soledad.client._secrets.util.events.emit_async - mocked.assert_called_with(*args) - pop = kwargs.get('pop') - if pop or pop is None: - self._pop_mock_call(mocked) - - # assert download keys signals - user_data = {'userid': ADDRESS, 'uuid': ADDRESS} - _assert(catalog.SOLEDAD_DOWNLOADING_KEYS, user_data) - _assert(catalog.SOLEDAD_DONE_DOWNLOADING_KEYS, user_data, pop=False) - - sol.close() - - def test_stage1_bootstrap_signals(self): - """ - Test that if soledad already has a local secret, it emits no signals. - """ - soledad.client.signal.reset_mock() - # get an existent instance so it emits only some of bootstrap signals - sol = self._soledad_instance() - self.assertEqual([], soledad.client.signal.mock_calls) - sol.close() - - @defer.inlineCallbacks - def test_sync_signals(self): - """ - Test Soledad emits SOLEDAD_CREATING_KEYS signal. - """ - # get a fresh instance so it emits all bootstrap signals - sol = self._soledad_instance() - soledad.client.signal.reset_mock() - - # mock the actual db sync so soledad does not try to connect to the - # server - d = defer.Deferred() - d.callback(None) - sol._dbsyncer.sync = Mock(return_value=d) - - yield sol.sync() - - # assert the signal has been emitted - soledad.client.events.emit_async.assert_called_with( - catalog.SOLEDAD_DONE_DATA_SYNC, - {'userid': ADDRESS, 'uuid': ADDRESS}, - ) - sol.close() diff --git a/testing/tests/client/test_soledad_doc.py b/testing/tests/client/test_soledad_doc.py deleted file mode 100644 index e158d768..00000000 --- a/testing/tests/client/test_soledad_doc.py +++ /dev/null @@ -1,46 +0,0 @@ -# -*- coding: utf-8 -*- -# test_soledad_doc.py -# Copyright (C) 2013, 2014 LEAP -# -# This program is free software: you can redistribute it and/or modify -# it under the terms of the GNU General Public License as published by -# the Free Software Foundation, either version 3 of the License, or -# (at your option) any later version. -# -# This program is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -# GNU General Public License for more details. -# -# You should have received a copy of the GNU General Public License -# along with this program. If not, see . -""" -Test Leap backend bits: soledad docs -""" -from testscenarios import TestWithScenarios - -from test_soledad.u1db_tests import test_document -from test_soledad.util import BaseSoledadTest -from test_soledad.util import make_soledad_document_for_test - - -# ----------------------------------------------------------------------------- -# The following tests come from `u1db.tests.test_document`. -# ----------------------------------------------------------------------------- - -class TestSoledadDocument( - TestWithScenarios, - test_document.TestDocument, BaseSoledadTest): - - scenarios = ([( - 'leap', { - 'make_document_for_test': make_soledad_document_for_test})]) - - -class TestSoledadPyDocument( - TestWithScenarios, - test_document.TestPyDocument, BaseSoledadTest): - - scenarios = ([( - 'leap', { - 'make_document_for_test': make_soledad_document_for_test})]) -- cgit v1.2.3