summaryrefslogtreecommitdiff
path: root/testing/tests/client
diff options
context:
space:
mode:
authordrebs <drebs@riseup.net>2017-09-17 12:08:25 -0300
committerdrebs <drebs@riseup.net>2017-09-17 15:50:55 -0300
commitcfff46ff9becdbe5cf48816870e625ed253ecc57 (patch)
tree8d239e4499f559d86ed17ea3632008303b25d485 /testing/tests/client
parentf29abe28bd778838626d12fcabe3980a8ce4fa8c (diff)
[refactor] move tests to root of repository
Tests entrypoint was in a testing/ subfolder in the root of the repository. This was made mainly because we had some common files for tests and we didn't want to ship them (files in testing/test_soledad, which is itself a python package. This sometimes causes errors when loading tests (it seems setuptools is confused with having one python package in a subdirectory of another). This commit moves the tests entrypoint to the root of the repository. Closes: #8952
Diffstat (limited to 'testing/tests/client')
-rw-r--r--testing/tests/client/__init__.py0
-rw-r--r--testing/tests/client/test_api.py36
-rw-r--r--testing/tests/client/test_app.py52
-rw-r--r--testing/tests/client/test_attachments.py81
-rw-r--r--testing/tests/client/test_aux_methods.py132
-rw-r--r--testing/tests/client/test_crypto.py384
-rw-r--r--testing/tests/client/test_deprecated_crypto.py94
-rw-r--r--testing/tests/client/test_doc.py50
-rw-r--r--testing/tests/client/test_http.py60
-rw-r--r--testing/tests/client/test_https.py137
-rw-r--r--testing/tests/client/test_incoming_processing_flow.py197
-rw-r--r--testing/tests/client/test_recovery_code.py38
-rw-r--r--testing/tests/client/test_secrets.py166
-rw-r--r--testing/tests/client/test_shared_db.py40
-rw-r--r--testing/tests/client/test_signals.py149
-rw-r--r--testing/tests/client/test_soledad_doc.py46
16 files changed, 0 insertions, 1662 deletions
diff --git a/testing/tests/client/__init__.py b/testing/tests/client/__init__.py
deleted file mode 100644
index e69de29b..00000000
--- a/testing/tests/client/__init__.py
+++ /dev/null
diff --git a/testing/tests/client/test_api.py b/testing/tests/client/test_api.py
deleted file mode 100644
index 3c6a8155..00000000
--- a/testing/tests/client/test_api.py
+++ /dev/null
@@ -1,36 +0,0 @@
-# -*- coding: utf-8 -*-
-# test_api.py
-# Copyright (C) 2017 LEAP
-#
-# This program is free software: you can redistribute it and/or modify
-# it under the terms of the GNU General Public License as published by
-# the Free Software Foundation, either version 3 of the License, or
-# (at your option) any later version.
-#
-# This program is distributed in the hope that it will be useful,
-# but WITHOUT ANY WARRANTY; without even the implied warranty of
-# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
-# GNU General Public License for more details.
-#
-# You should have received a copy of the GNU General Public License
-# along with this program. If not, see <http://www.gnu.org/licenses/>.
-"""
-Tests for soledad api.
-"""
-
-from mock import MagicMock
-
-from test_soledad.util import BaseSoledadTest
-
-
-class ApiTestCase(BaseSoledadTest):
-
- def test_recovery_code_creation(self):
- recovery_code_mock = MagicMock()
- generated_code = '4645a2f8997e5d0d'
- recovery_code_mock.generate.return_value = generated_code
- self._soledad._recovery_code = recovery_code_mock
-
- code = self._soledad.create_recovery_code()
-
- self.assertEqual(generated_code, code)
diff --git a/testing/tests/client/test_app.py b/testing/tests/client/test_app.py
deleted file mode 100644
index 6867473e..00000000
--- a/testing/tests/client/test_app.py
+++ /dev/null
@@ -1,52 +0,0 @@
-# -*- coding: utf-8 -*-
-# test_soledad_app.py
-# Copyright (C) 2014 LEAP
-#
-# This program is free software: you can redistribute it and/or modify
-# it under the terms of the GNU General Public License as published by
-# the Free Software Foundation, either version 3 of the License, or
-# (at your option) any later version.
-#
-# This program is distributed in the hope that it will be useful,
-# but WITHOUT ANY WARRANTY; without even the implied warranty of
-# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
-# GNU General Public License for more details.
-#
-# You should have received a copy of the GNU General Public License
-# along with this program. If not, see <http://www.gnu.org/licenses/>.
-"""
-Test ObjectStore and Couch backend bits.
-"""
-import pytest
-
-from testscenarios import TestWithScenarios
-
-from test_soledad.util import BaseSoledadTest
-from test_soledad.util import make_soledad_document_for_test
-from test_soledad.util import make_token_soledad_app
-from test_soledad.util import make_token_http_database_for_test
-from test_soledad.util import copy_token_http_database_for_test
-from test_soledad.u1db_tests import test_backends
-
-
-# -----------------------------------------------------------------------------
-# The following tests come from `u1db.tests.test_backends`.
-# -----------------------------------------------------------------------------
-
-@pytest.mark.usefixtures('method_tmpdir')
-class SoledadTests(
- TestWithScenarios, test_backends.AllDatabaseTests, BaseSoledadTest):
-
- def setUp(self):
- TestWithScenarios.setUp(self)
- test_backends.AllDatabaseTests.setUp(self)
- BaseSoledadTest.setUp(self)
-
- scenarios = [
- ('token_http', {
- 'make_database_for_test': make_token_http_database_for_test,
- 'copy_database_for_test': copy_token_http_database_for_test,
- 'make_document_for_test': make_soledad_document_for_test,
- 'make_app_with_state': make_token_soledad_app,
- })
- ]
diff --git a/testing/tests/client/test_attachments.py b/testing/tests/client/test_attachments.py
deleted file mode 100644
index 2df5b90d..00000000
--- a/testing/tests/client/test_attachments.py
+++ /dev/null
@@ -1,81 +0,0 @@
-# -*- coding: utf-8 -*-
-# test_attachments.py
-# Copyright (C) 2017 LEAP
-#
-# This program is free software: you can redistribute it and/or modify
-# it under the terms of the GNU General Public License as published by
-# the Free Software Foundation, either version 3 of the License, or
-# (at your option) any later version.
-#
-# This program is distributed in the hope that it will be useful,
-# but WITHOUT ANY WARRANTY; without even the implied warranty of
-# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
-# GNU General Public License for more details.
-#
-# You should have received a copy of the GNU General Public License
-# along with this program. If not, see <http://www.gnu.org/licenses/>.
-"""
-Tests for document attachments.
-"""
-
-import pytest
-
-from io import BytesIO
-from mock import Mock
-
-from twisted.internet import defer
-from test_soledad.util import BaseSoledadTest
-
-
-from leap.soledad.client import AttachmentStates
-
-
-def mock_response(doc):
- doc._manager._client.get = Mock(
- return_value=defer.succeed(Mock(code=200, json=lambda: [])))
- doc._manager._client.put = Mock(
- return_value=defer.succeed(Mock(code=200)))
-
-
-@pytest.mark.usefixture('method_tmpdir')
-class AttachmentTests(BaseSoledadTest):
-
- @defer.inlineCallbacks
- def test_create_doc_saves_store(self):
- doc = yield self._soledad.create_doc({})
- self.assertEqual(self._soledad, doc.store)
-
- @defer.inlineCallbacks
- def test_put_attachment(self):
- doc = yield self._soledad.create_doc({})
- mock_response(doc)
- yield doc.put_attachment(BytesIO('test'))
- local_list = yield doc._manager.local_list()
- self.assertIn(doc._blob_id, local_list)
-
- @defer.inlineCallbacks
- def test_get_attachment(self):
- doc = yield self._soledad.create_doc({})
- mock_response(doc)
- yield doc.put_attachment(BytesIO('test'))
- fd = yield doc.get_attachment()
- self.assertEqual('test', fd.read())
-
- @defer.inlineCallbacks
- def test_get_attachment_state(self):
- doc = yield self._soledad.create_doc({})
- state = yield doc.get_attachment_state()
- self.assertEqual(AttachmentStates.NONE, state)
- mock_response(doc)
- yield doc.put_attachment(BytesIO('test'))
- state = yield doc.get_attachment_state()
- self.assertEqual(AttachmentStates.LOCAL, state)
-
- @defer.inlineCallbacks
- def test_is_dirty(self):
- doc = yield self._soledad.create_doc({})
- dirty = yield doc.is_dirty()
- self.assertFalse(dirty)
- doc.content = {'test': True}
- dirty = yield doc.is_dirty()
- self.assertTrue(dirty)
diff --git a/testing/tests/client/test_aux_methods.py b/testing/tests/client/test_aux_methods.py
deleted file mode 100644
index 1eb676c7..00000000
--- a/testing/tests/client/test_aux_methods.py
+++ /dev/null
@@ -1,132 +0,0 @@
-# -*- coding: utf-8 -*-
-# test_soledad.py
-# Copyright (C) 2013 LEAP
-#
-# This program is free software: you can redistribute it and/or modify
-# it under the terms of the GNU General Public License as published by
-# the Free Software Foundation, either version 3 of the License, or
-# (at your option) any later version.
-#
-# This program is distributed in the hope that it will be useful,
-# but WITHOUT ANY WARRANTY; without even the implied warranty of
-# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
-# GNU General Public License for more details.
-#
-# You should have received a copy of the GNU General Public License
-# along with this program. If not, see <http://www.gnu.org/licenses/>.
-"""
-Tests for general Soledad functionality.
-"""
-import os
-
-from pytest import inlineCallbacks
-
-from leap.soledad.client import Soledad
-from leap.soledad.client._db.adbapi import U1DBConnectionPool
-from leap.soledad.client._secrets.util import SecretsError
-
-from test_soledad.util import BaseSoledadTest
-
-
-class AuxMethodsTestCase(BaseSoledadTest):
-
- def test__init_dirs(self):
- sol = self._soledad_instance(prefix='_init_dirs')
- local_db_dir = os.path.dirname(sol.local_db_path)
- secrets_path = os.path.dirname(sol.secrets_path)
- self.assertTrue(os.path.isdir(local_db_dir))
- self.assertTrue(os.path.isdir(secrets_path))
-
- def _close_soledad(results):
- sol.close()
-
- d = sol.create_doc({})
- d.addCallback(_close_soledad)
- return d
-
- def test__init_u1db_sqlcipher_backend(self):
- sol = self._soledad_instance(prefix='_init_db')
- self.assertIsInstance(sol._dbpool, U1DBConnectionPool)
- self.assertTrue(os.path.isfile(sol.local_db_path))
- sol.close()
-
- def test__init_config_with_defaults(self):
- """
- Test if configuration defaults point to the correct place.
- """
-
- class SoledadMock(Soledad):
-
- def __init__(self):
- pass
-
- # instantiate without initializing so we just test
- # _init_config_with_defaults()
- sol = SoledadMock()
- sol.passphrase = u''
- sol.server_url = ''
- sol._init_config_with_defaults()
- # assert value of local_db_path
- self.assertEquals(
- os.path.join(sol.default_prefix, 'soledad.u1db'),
- sol.local_db_path)
-
- def test__init_config_from_params(self):
- """
- Test if configuration is correctly read from file.
- """
- sol = self._soledad_instance(
- 'leap@leap.se',
- passphrase=u'123',
- secrets_path='value_3',
- local_db_path='value_2',
- server_url='value_1',
- cert_file=None)
- self.assertEqual(
- os.path.join(self.tempdir, 'value_3'),
- sol.secrets_path)
- self.assertEqual(
- os.path.join(self.tempdir, 'value_2'),
- sol.local_db_path)
- self.assertEqual('value_1', sol.server_url)
- sol.close()
-
- @inlineCallbacks
- def test_change_passphrase(self):
- """
- Test if passphrase can be changed.
- """
- prefix = '_change_passphrase'
- sol = self._soledad_instance(
- 'leap@leap.se',
- passphrase=u'123',
- prefix=prefix,
- )
-
- doc1 = yield sol.create_doc({'simple': 'doc'})
- sol.change_passphrase(u'654321')
- sol.close()
-
- with self.assertRaises(SecretsError):
- self._soledad_instance(
- 'leap@leap.se',
- passphrase=u'123',
- prefix=prefix)
-
- sol2 = self._soledad_instance(
- 'leap@leap.se',
- passphrase=u'654321',
- prefix=prefix)
- doc2 = yield sol2.get_doc(doc1.doc_id)
-
- self.assertEqual(doc1, doc2)
-
- sol2.close()
-
- def test_get_passphrase(self):
- """
- Assert passphrase getter works fine.
- """
- sol = self._soledad_instance()
- self.assertEqual('123', sol.passphrase)
- sol.close()
diff --git a/testing/tests/client/test_crypto.py b/testing/tests/client/test_crypto.py
deleted file mode 100644
index 5b647b73..00000000
--- a/testing/tests/client/test_crypto.py
+++ /dev/null
@@ -1,384 +0,0 @@
-# -*- coding: utf-8 -*-
-# test_crypto.py
-# Copyright (C) 2013 LEAP
-#
-# This program is free software: you can redistribute it and/or modify
-# it under the terms of the GNU General Public License as published by
-# the Free Software Foundation, either version 3 of the License, or
-# (at your option) any later version.
-#
-# This program is distributed in the hope that it will be useful,
-# but WITHOUT ANY WARRANTY; without even the implied warranty of
-# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
-# GNU General Public License for more details.
-#
-# You should have received a copy of the GNU General Public License
-# along with this program. If not, see <http://www.gnu.org/licenses/>.
-"""
-Tests for cryptographic related stuff.
-"""
-import binascii
-import base64
-import json
-import os
-
-from io import BytesIO
-
-import pytest
-
-from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes
-from cryptography.hazmat.backends import default_backend
-from cryptography.exceptions import InvalidTag
-
-from leap.soledad.common.document import SoledadDocument
-from test_soledad.util import BaseSoledadTest
-from leap.soledad.client import _crypto
-from leap.soledad.client import _scrypt
-from leap.soledad.common.blobs import preamble as _preamble
-
-from twisted.trial import unittest
-from twisted.internet import defer
-
-
-snowden1 = (
- "You can't come up against "
- "the world's most powerful intelligence "
- "agencies and not accept the risk. "
- "If they want to get you, over time "
- "they will.")
-
-
-class ScryptTest(unittest.TestCase):
-
- def test_scrypt(self):
- secret = 'supersikret'
- salt = 'randomsalt'
- key = _scrypt.hash(secret, salt, buflen=32)
- expected = ('47996b569ea58d51ccbcc318d710'
- 'a537acd28bb7a94615ab8d061d4b2a920f01')
- assert binascii.b2a_hex(key) == expected
-
-
-class AESTest(unittest.TestCase):
-
- def test_chunked_encryption(self):
- key = 'A' * 32
-
- fd = BytesIO()
- aes = _crypto.AESWriter(key, _buffer=fd)
- iv = aes.iv
-
- data = snowden1
- block = 16
-
- for i in range(len(data) / block):
- chunk = data[i * block:(i + 1) * block]
- aes.write(chunk)
- aes.end()
-
- ciphertext_chunked = fd.getvalue()
- ciphertext, tag = _aes_encrypt(key, iv, data)
-
- assert ciphertext_chunked == ciphertext
-
- def test_decrypt(self):
- key = 'A' * 32
- iv = 'A' * 16
-
- data = snowden1
- block = 16
-
- ciphertext, tag = _aes_encrypt(key, iv, data)
-
- fd = BytesIO()
- aes = _crypto.AESWriter(key, iv, fd, tag=tag)
-
- for i in range(len(ciphertext) / block):
- chunk = ciphertext[i * block:(i + 1) * block]
- aes.write(chunk)
- aes.end()
-
- cleartext_chunked = fd.getvalue()
- assert cleartext_chunked == data
-
-
-class BlobTestCase(unittest.TestCase):
-
- class doc_info:
- doc_id = 'D-deadbeef'
- rev = '397932e0c77f45fcb7c3732930e7e9b2:1'
-
- def setUp(self):
- self.inf = BytesIO(snowden1)
- self.blob = _crypto.BlobEncryptor(
- self.doc_info, self.inf,
- armor=True,
- secret='A' * 96)
-
- @defer.inlineCallbacks
- def test_unarmored_blob_encrypt(self):
- self.blob.armor = False
- encrypted = yield self.blob.encrypt()
-
- decryptor = _crypto.BlobDecryptor(
- self.doc_info, encrypted, armor=False,
- secret='A' * 96)
- decrypted = yield decryptor.decrypt()
- assert decrypted.getvalue() == snowden1
-
- @defer.inlineCallbacks
- def test_default_armored_blob_encrypt(self):
- encrypted = yield self.blob.encrypt()
- decode = base64.urlsafe_b64decode
- assert map(decode, encrypted.getvalue().split())
-
- @defer.inlineCallbacks
- def test_blob_encryptor(self):
- encrypted = yield self.blob.encrypt()
- preamble, ciphertext = encrypted.getvalue().split()
- preamble = base64.urlsafe_b64decode(preamble)
- ciphertext = base64.urlsafe_b64decode(ciphertext)
- ciphertext = ciphertext[:-16]
-
- assert len(preamble) == _preamble.PACMAN.size
- unpacked_data = _preamble.PACMAN.unpack(preamble)
- magic, sch, meth, ts, iv, doc_id, rev, _ = unpacked_data
- assert magic == _crypto.MAGIC
- assert sch == 1
- assert meth == _crypto.ENC_METHOD.aes_256_gcm
- assert iv == self.blob.iv
- assert doc_id == 'D-deadbeef'
- assert rev == self.doc_info.rev
-
- aes_key = _crypto._get_sym_key_for_doc(
- self.doc_info.doc_id, 'A' * 96)
- assert ciphertext == _aes_encrypt(aes_key, self.blob.iv, snowden1)[0]
-
- decrypted = _aes_decrypt(aes_key, self.blob.iv, self.blob.tag,
- ciphertext, preamble)
- assert str(decrypted) == snowden1
-
- @defer.inlineCallbacks
- def test_init_with_preamble_alone(self):
- ciphertext = yield self.blob.encrypt()
- preamble = ciphertext.getvalue().split()[0]
- decryptor = _crypto.BlobDecryptor(
- self.doc_info, BytesIO(preamble),
- start_stream=False,
- secret='A' * 96)
- assert decryptor._consume_preamble()
-
- @defer.inlineCallbacks
- def test_incremental_blob_decryptor(self):
- ciphertext = yield self.blob.encrypt()
- preamble, ciphertext = ciphertext.getvalue().split()
- ciphertext = base64.urlsafe_b64decode(ciphertext)
-
- decryptor = _crypto.BlobDecryptor(
- self.doc_info, BytesIO(preamble),
- start_stream=False,
- secret='A' * 96,
- tag=ciphertext[-16:])
- ciphertext = BytesIO(ciphertext[:-16])
- chunk = ciphertext.read(10)
- while chunk:
- decryptor.write(chunk)
- chunk = ciphertext.read(10)
- decrypted = decryptor._end_stream()
- assert decrypted.getvalue() == snowden1
-
- @defer.inlineCallbacks
- def test_blob_decryptor(self):
- ciphertext = yield self.blob.encrypt()
-
- decryptor = _crypto.BlobDecryptor(
- self.doc_info, ciphertext,
- secret='A' * 96)
- decrypted = yield decryptor.decrypt()
- assert decrypted.getvalue() == snowden1
-
- @defer.inlineCallbacks
- def test_unarmored_blob_decryptor(self):
- self.blob.armor = False
- ciphertext = yield self.blob.encrypt()
-
- decryptor = _crypto.BlobDecryptor(
- self.doc_info, ciphertext,
- armor=False,
- secret='A' * 96)
- decrypted = yield decryptor.decrypt()
- assert decrypted.getvalue() == snowden1
-
- @defer.inlineCallbacks
- def test_encrypt_and_decrypt(self):
- """
- Check that encrypting and decrypting gives same doc.
- """
- crypto = _crypto.SoledadCrypto('A' * 96)
- payload = {'key': 'someval'}
- doc1 = SoledadDocument('id1', '1', json.dumps(payload))
-
- encrypted = yield crypto.encrypt_doc(doc1)
- assert encrypted != payload
- assert 'raw' in encrypted
- doc2 = SoledadDocument('id1', '1')
- doc2.set_json(encrypted)
- assert _crypto.is_symmetrically_encrypted(encrypted)
- decrypted = (yield crypto.decrypt_doc(doc2)).getvalue()
- assert len(decrypted) != 0
- assert json.loads(decrypted) == payload
-
- @defer.inlineCallbacks
- def test_decrypt_with_wrong_tag_raises(self):
- """
- Trying to decrypt a document with wrong MAC should raise.
- """
- crypto = _crypto.SoledadCrypto('A' * 96)
- payload = {'key': 'someval'}
- doc1 = SoledadDocument('id1', '1', json.dumps(payload))
-
- encrypted = yield crypto.encrypt_doc(doc1)
- encdict = json.loads(encrypted)
- preamble, raw = str(encdict['raw']).split()
- preamble = base64.urlsafe_b64decode(preamble)
- raw = base64.urlsafe_b64decode(raw)
- # mess with tag
- messed = raw[:-16] + '0' * 16
-
- preamble = base64.urlsafe_b64encode(preamble)
- newraw = preamble + ' ' + base64.urlsafe_b64encode(str(messed))
- doc2 = SoledadDocument('id1', '1')
- doc2.set_json(json.dumps({"raw": str(newraw)}))
-
- with pytest.raises(_crypto.InvalidBlob):
- yield crypto.decrypt_doc(doc2)
-
-
-class SoledadSecretsTestCase(BaseSoledadTest):
-
- def test_generated_secrets_have_correct_length(self):
- expected = self._soledad.secrets.lengths
- for name, length in expected.iteritems():
- secret = getattr(self._soledad.secrets, name)
- self.assertEqual(length, len(secret))
-
-
-class SoledadCryptoAESTestCase(BaseSoledadTest):
-
- def test_encrypt_decrypt_sym(self):
- # generate 256-bit key
- key = os.urandom(32)
- iv, cyphertext = _crypto.encrypt_sym('data', key)
- self.assertTrue(cyphertext is not None)
- self.assertTrue(cyphertext != '')
- self.assertTrue(cyphertext != 'data')
- plaintext = _crypto.decrypt_sym(cyphertext, key, iv)
- self.assertEqual('data', plaintext)
-
- def test_decrypt_with_wrong_iv_raises(self):
- key = os.urandom(32)
- iv, cyphertext = _crypto.encrypt_sym('data', key)
- self.assertTrue(cyphertext is not None)
- self.assertTrue(cyphertext != '')
- self.assertTrue(cyphertext != 'data')
- # get a different iv by changing the first byte
- rawiv = binascii.a2b_base64(iv)
- wrongiv = rawiv
- while wrongiv == rawiv:
- wrongiv = os.urandom(1) + rawiv[1:]
- with pytest.raises(InvalidTag):
- _crypto.decrypt_sym(
- cyphertext, key, iv=binascii.b2a_base64(wrongiv))
-
- def test_decrypt_with_wrong_key_raises(self):
- key = os.urandom(32)
- iv, cyphertext = _crypto.encrypt_sym('data', key)
- self.assertTrue(cyphertext is not None)
- self.assertTrue(cyphertext != '')
- self.assertTrue(cyphertext != 'data')
- wrongkey = os.urandom(32) # 256-bits key
- # ensure keys are different in case we are extremely lucky
- while wrongkey == key:
- wrongkey = os.urandom(32)
- with pytest.raises(InvalidTag):
- _crypto.decrypt_sym(cyphertext, wrongkey, iv)
-
-
-class PreambleTestCase(unittest.TestCase):
- class doc_info:
- doc_id = 'D-deadbeef'
- rev = '397932e0c77f45fcb7c3732930e7e9b2:1'
-
- def setUp(self):
- self.cleartext = BytesIO(snowden1)
- self.blob = _crypto.BlobEncryptor(
- self.doc_info, self.cleartext,
- secret='A' * 96)
-
- def test_preamble_starts_with_magic_signature(self):
- preamble = self.blob._encode_preamble()
- assert preamble.startswith(_crypto.MAGIC)
-
- def test_preamble_has_cipher_metadata(self):
- preamble = self.blob._encode_preamble()
- unpacked = _preamble.PACMAN.unpack(preamble)
- encryption_scheme, encryption_method = unpacked[1:3]
- assert encryption_scheme in _crypto.ENC_SCHEME
- assert encryption_method in _crypto.ENC_METHOD
- assert unpacked[4] == self.blob.iv
-
- def test_preamble_has_document_sync_metadata(self):
- preamble = self.blob._encode_preamble()
- unpacked = _preamble.PACMAN.unpack(preamble)
- doc_id, doc_rev = unpacked[5:7]
- assert doc_id == self.doc_info.doc_id
- assert doc_rev == self.doc_info.rev
-
- def test_preamble_has_document_size(self):
- preamble = self.blob._encode_preamble()
- unpacked = _preamble.PACMAN.unpack(preamble)
- size = unpacked[7]
- assert size == _crypto._ceiling(len(snowden1))
-
- @defer.inlineCallbacks
- def test_preamble_can_come_without_size(self):
- # XXX: This test case is here only to test backwards compatibility!
- preamble = self.blob._encode_preamble()
- # repack preamble using legacy format, without doc size
- unpacked = _preamble.PACMAN.unpack(preamble)
- preamble_without_size = _preamble.LEGACY_PACMAN.pack(*unpacked[0:7])
- # encrypt it manually for custom tag
- ciphertext, tag = _aes_encrypt(self.blob.sym_key, self.blob.iv,
- self.cleartext.getvalue(),
- aead=preamble_without_size)
- ciphertext = ciphertext + tag
- # encode it
- ciphertext = base64.urlsafe_b64encode(ciphertext)
- preamble_without_size = base64.urlsafe_b64encode(preamble_without_size)
- # decrypt it
- ciphertext = preamble_without_size + ' ' + ciphertext
- cleartext = yield _crypto.BlobDecryptor(
- self.doc_info, BytesIO(ciphertext),
- secret='A' * 96).decrypt()
- assert cleartext.getvalue() == self.cleartext.getvalue()
- warnings = self.flushWarnings()
- assert len(warnings) == 1
- assert 'legacy preamble without size' in warnings[0]['message']
-
-
-def _aes_encrypt(key, iv, data, aead=''):
- backend = default_backend()
- cipher = Cipher(algorithms.AES(key), modes.GCM(iv), backend=backend)
- encryptor = cipher.encryptor()
- if aead:
- encryptor.authenticate_additional_data(aead)
- return encryptor.update(data) + encryptor.finalize(), encryptor.tag
-
-
-def _aes_decrypt(key, iv, tag, data, aead=''):
- backend = default_backend()
- cipher = Cipher(algorithms.AES(key), modes.GCM(iv, tag), backend=backend)
- decryptor = cipher.decryptor()
- if aead:
- decryptor.authenticate_additional_data(aead)
- return decryptor.update(data) + decryptor.finalize()
diff --git a/testing/tests/client/test_deprecated_crypto.py b/testing/tests/client/test_deprecated_crypto.py
deleted file mode 100644
index 939a2003..00000000
--- a/testing/tests/client/test_deprecated_crypto.py
+++ /dev/null
@@ -1,94 +0,0 @@
-import json
-import pytest
-
-from pytest import inlineCallbacks
-from six.moves.urllib.parse import urljoin
-from uuid import uuid4
-
-from leap.soledad.client import crypto as old_crypto
-from leap.soledad.common.couch import CouchDatabase
-from leap.soledad.common import crypto as common_crypto
-
-from test_soledad.u1db_tests import simple_doc
-from test_soledad.util import SoledadWithCouchServerMixin
-from test_soledad.util import make_token_soledad_app
-from test_soledad.u1db_tests import TestCaseWithServer
-
-
-def deprecate_client_crypto(client):
- secret = client._crypto.secret
- _crypto = old_crypto.SoledadCrypto(secret)
- setattr(client._dbsyncer, '_crypto', _crypto)
- return client
-
-
-def couch_database(couch_url, uuid):
- db = CouchDatabase(couch_url, "user-%s" % (uuid,))
- return db
-
-
-class DeprecatedCryptoTest(SoledadWithCouchServerMixin, TestCaseWithServer):
-
- def setUp(self):
- SoledadWithCouchServerMixin.setUp(self)
- TestCaseWithServer.setUp(self)
-
- def tearDown(self):
- SoledadWithCouchServerMixin.tearDown(self)
- TestCaseWithServer.tearDown(self)
-
- @staticmethod
- def make_app_with_state(state):
- return make_token_soledad_app(state)
-
- @pytest.mark.needs_couch
- @inlineCallbacks
- def test_touch_updates_remote_representation(self):
- self.startTwistedServer()
- user = 'user-' + uuid4().hex
- server_url = 'http://%s:%d' % (self.server_address)
- client = self._soledad_instance(user=user, server_url=server_url)
- deprecated_client = deprecate_client_crypto(
- self._soledad_instance(user=user, server_url=server_url))
-
- self.make_app()
- remote = self.request_state._create_database(replica_uid=client.uuid)
- remote = CouchDatabase.open_database(
- urljoin(self.couch_url, 'user-' + user),
- create=True)
-
- # ensure remote db is empty
- gen, docs = remote.get_all_docs()
- assert gen == 0
- assert len(docs) == 0
-
- # create a doc with deprecated client and sync
- yield deprecated_client.create_doc(json.loads(simple_doc))
- yield deprecated_client.sync()
-
- # check for doc in remote db
- gen, docs = remote.get_all_docs()
- assert gen == 1
- assert len(docs) == 1
- doc = docs.pop()
- content = doc.content
- assert common_crypto.ENC_JSON_KEY in content
- assert common_crypto.ENC_SCHEME_KEY in content
- assert common_crypto.ENC_METHOD_KEY in content
- assert common_crypto.ENC_IV_KEY in content
- assert common_crypto.MAC_KEY in content
- assert common_crypto.MAC_METHOD_KEY in content
-
- # "touch" the document with a newer client and synx
- _, docs = yield client.get_all_docs()
- yield client.put_doc(doc)
- yield client.sync()
-
- # check for newer representation of doc in remote db
- gen, docs = remote.get_all_docs()
- assert gen == 2
- assert len(docs) == 1
- doc = docs.pop()
- content = doc.content
- assert len(content) == 1
- assert 'raw' in content
diff --git a/testing/tests/client/test_doc.py b/testing/tests/client/test_doc.py
deleted file mode 100644
index 36479e90..00000000
--- a/testing/tests/client/test_doc.py
+++ /dev/null
@@ -1,50 +0,0 @@
-# -*- coding: utf-8 -*-
-# test_soledad_doc.py
-# Copyright (C) 2013, 2014 LEAP
-#
-# This program is free software: you can redistribute it and/or modify
-# it under the terms of the GNU General Public License as published by
-# the Free Software Foundation, either version 3 of the License, or
-# (at your option) any later version.
-#
-# This program is distributed in the hope that it will be useful,
-# but WITHOUT ANY WARRANTY; without even the implied warranty of
-# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
-# GNU General Public License for more details.
-#
-# You should have received a copy of the GNU General Public License
-# along with this program. If not, see <http://www.gnu.org/licenses/>.
-"""
-Test Leap backend bits: soledad docs
-"""
-import pytest
-
-from testscenarios import TestWithScenarios
-
-from test_soledad.u1db_tests import test_document
-from test_soledad.util import BaseSoledadTest
-from test_soledad.util import make_soledad_document_for_test
-
-
-# -----------------------------------------------------------------------------
-# The following tests come from `u1db.tests.test_document`.
-# -----------------------------------------------------------------------------
-
-@pytest.mark.usefixtures('method_tmpdir')
-class TestSoledadDocument(
- TestWithScenarios,
- test_document.TestDocument, BaseSoledadTest):
-
- scenarios = ([(
- 'leap', {
- 'make_document_for_test': make_soledad_document_for_test})])
-
-
-@pytest.mark.usefixtures('method_tmpdir')
-class TestSoledadPyDocument(
- TestWithScenarios,
- test_document.TestPyDocument, BaseSoledadTest):
-
- scenarios = ([(
- 'leap', {
- 'make_document_for_test': make_soledad_document_for_test})])
diff --git a/testing/tests/client/test_http.py b/testing/tests/client/test_http.py
deleted file mode 100644
index 47df4b4a..00000000
--- a/testing/tests/client/test_http.py
+++ /dev/null
@@ -1,60 +0,0 @@
-# -*- coding: utf-8 -*-
-# test_http.py
-# Copyright (C) 2013, 2014 LEAP
-#
-# This program is free software: you can redistribute it and/or modify
-# it under the terms of the GNU General Public License as published by
-# the Free Software Foundation, either version 3 of the License, or
-# (at your option) any later version.
-#
-# This program is distributed in the hope that it will be useful,
-# but WITHOUT ANY WARRANTY; without even the implied warranty of
-# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
-# GNU General Public License for more details.
-#
-# You should have received a copy of the GNU General Public License
-# along with this program. If not, see <http://www.gnu.org/licenses/>.
-"""
-Test Leap backend bits: test http database
-"""
-
-from twisted.trial import unittest
-
-from leap.soledad.client import auth
-from leap.soledad.common.l2db.remote import http_database
-
-
-# -----------------------------------------------------------------------------
-# The following tests come from `u1db.tests.test_http_database`.
-# -----------------------------------------------------------------------------
-
-class _HTTPDatabase(http_database.HTTPDatabase, auth.TokenBasedAuth):
-
- """
- Wraps our token auth implementation.
- """
-
- def set_token_credentials(self, uuid, token):
- auth.TokenBasedAuth.set_token_credentials(self, uuid, token)
-
- def _sign_request(self, method, url_query, params):
- return auth.TokenBasedAuth._sign_request(
- self, method, url_query, params)
-
-
-class TestHTTPDatabaseWithCreds(unittest.TestCase):
-
- def test_get_sync_target_inherits_token_credentials(self):
- # this test was from TestDatabaseSimpleOperations but we put it here
- # for convenience.
- self.db = _HTTPDatabase('dbase')
- self.db.set_token_credentials('user-uuid', 'auth-token')
- st = self.db.get_sync_target()
- self.assertEqual(self.db._creds, st._creds)
-
- def test_ctr_with_creds(self):
- db1 = _HTTPDatabase('http://dbs/db', creds={'token': {
- 'uuid': 'user-uuid',
- 'token': 'auth-token',
- }})
- self.assertIn('token', db1._creds)
diff --git a/testing/tests/client/test_https.py b/testing/tests/client/test_https.py
deleted file mode 100644
index 1b6caed6..00000000
--- a/testing/tests/client/test_https.py
+++ /dev/null
@@ -1,137 +0,0 @@
-# -*- coding: utf-8 -*-
-# test_sync_target.py
-# Copyright (C) 2013, 2014 LEAP
-#
-# This program is free software: you can redistribute it and/or modify
-# it under the terms of the GNU General Public License as published by
-# the Free Software Foundation, either version 3 of the License, or
-# (at your option) any later version.
-#
-# This program is distributed in the hope that it will be useful,
-# but WITHOUT ANY WARRANTY; without even the implied warranty of
-# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
-# GNU General Public License for more details.
-#
-# You should have received a copy of the GNU General Public License
-# along with this program. If not, see <http://www.gnu.org/licenses/>.
-"""
-Test Leap backend bits: https
-"""
-import pytest
-
-from testscenarios import TestWithScenarios
-
-from leap.soledad import client
-
-from leap.soledad.common.l2db.remote import http_client
-from test_soledad.u1db_tests import test_backends
-from test_soledad.u1db_tests import test_https
-from test_soledad.util import (
- BaseSoledadTest,
- make_soledad_document_for_test,
- make_soledad_app,
- make_token_soledad_app,
-)
-
-
-LEAP_SCENARIOS = [
- ('http', {
- 'make_database_for_test': test_backends.make_http_database_for_test,
- 'copy_database_for_test': test_backends.copy_http_database_for_test,
- 'make_document_for_test': make_soledad_document_for_test,
- 'make_app_with_state': make_soledad_app}),
-]
-
-
-# -----------------------------------------------------------------------------
-# The following tests come from `u1db.tests.test_https`.
-# -----------------------------------------------------------------------------
-
-def token_leap_https_sync_target(test, host, path, cert_file=None):
- _, port = test.server.server_address
- # source_replica_uid = test._soledad._dbpool.replica_uid
- creds = {'token': {'uuid': 'user-uuid', 'token': 'auth-token'}}
- if not cert_file:
- cert_file = test.cacert_pem
- st = client.http_target.SoledadHTTPSyncTarget(
- 'https://%s:%d/%s' % (host, port, path),
- source_replica_uid='other-id',
- creds=creds,
- crypto=test._soledad._crypto,
- cert_file=cert_file)
- return st
-
-
-@pytest.mark.skip
-class TestSoledadHTTPSyncTargetHttpsSupport(
- TestWithScenarios,
- # test_https.TestHttpSyncTargetHttpsSupport,
- BaseSoledadTest):
-
- scenarios = [
- ('token_soledad_https',
- {
- # 'server_def': test_https.https_server_def,
- 'make_app_with_state': make_token_soledad_app,
- 'make_document_for_test': make_soledad_document_for_test,
- 'sync_target': token_leap_https_sync_target}),
- ]
-
- def setUp(self):
- # the parent constructor undoes our SSL monkey patch to ensure tests
- # run smoothly with standard u1db.
- test_https.TestHttpSyncTargetHttpsSupport.setUp(self)
- # so here monkey patch again to test our functionality.
- api = client.api
- http_client._VerifiedHTTPSConnection = api.VerifiedHTTPSConnection
- client.api.SOLEDAD_CERT = http_client.CA_CERTS
-
- def test_cannot_verify_cert(self):
- self.startServer()
- # don't print expected traceback server-side
- self.server.handle_error = lambda req, cli_addr: None
- self.request_state._create_database('test')
- remote_target = self.getSyncTarget(
- 'localhost', 'test', cert_file=http_client.CA_CERTS)
- d = remote_target.record_sync_info('other-id', 2, 'T-id')
-
- def _assert_raises(result):
- from twisted.python.failure import Failure
- if isinstance(result, Failure):
- from OpenSSL.SSL import Error
- error = result.value.message[0].value
- if isinstance(error, Error):
- msg = error.message[0][2]
- self.assertEqual("certificate verify failed", msg)
- return
- self.fail("certificate verification should have failed.")
-
- d.addCallbacks(_assert_raises, _assert_raises)
- return d
-
- def test_working(self):
- """
- Test that SSL connections work well.
-
- This test was adapted to patch Soledad's HTTPS connection custom class
- with the intended CA certificates.
- """
- self.startServer()
- db = self.request_state._create_database('test')
- remote_target = self.getSyncTarget('localhost', 'test')
- d = remote_target.record_sync_info('other-id', 2, 'T-id')
- d.addCallback(lambda _:
- self.assertEqual(
- (2, 'T-id'),
- db._get_replica_gen_and_trans_id('other-id')
- ))
- d.addCallback(lambda _: remote_target.close())
- return d
-
- def test_host_mismatch(self):
- """
- This test is disabled because soledad's twisted-based http agent uses
- pyOpenSSL, which will complain if we try to use an IP to connect to
- the remote host (see the original test in u1db_tests/test_https.py).
- """
- pass
diff --git a/testing/tests/client/test_incoming_processing_flow.py b/testing/tests/client/test_incoming_processing_flow.py
deleted file mode 100644
index 7bc1e3c6..00000000
--- a/testing/tests/client/test_incoming_processing_flow.py
+++ /dev/null
@@ -1,197 +0,0 @@
-# -*- coding: utf-8 -*-
-# test_incoming_processing_flow.py
-# Copyright (C) 2017 LEAP
-#
-# This program is free software: you can redistribute it and/or modify
-# it under the terms of the GNU General Public License as published by
-# the Free Software Foundation, either version 3 of the License, or
-# (at your option) any later version.
-#
-# This program is distributed in the hope that it will be useful,
-# but WITHOUT ANY WARRANTY; without even the implied warranty of
-# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
-# GNU General Public License for more details.
-#
-# You should have received a copy of the GNU General Public License
-# along with this program. If not, see <http://www.gnu.org/licenses/>.
-"""
-Unit tests for incoming box processing flow.
-"""
-from mock import Mock, call
-from leap.soledad.client import interfaces
-from leap.soledad.client.incoming import IncomingBoxProcessingLoop
-from twisted.internet import defer
-from twisted.trial import unittest
-from zope.interface import implementer
-
-
-@implementer(interfaces.IIncomingBoxConsumer)
-class GoodConsumer(object):
- def __init__(self):
- self.name = 'GoodConsumer'
- self.processed, self.saved = [], []
-
- def process(self, item, item_id, encrypted=True):
- self.processed.append(item_id)
- return defer.succeed([item_id])
-
- def save(self, parts, item_id):
- self.saved.append(item_id)
- return defer.succeed(None)
-
-
-class ProcessingFailedConsumer(GoodConsumer):
- def __init__(self):
- self.name = 'ProcessingFailedConsumer'
- self.processed, self.saved = [], []
-
- def process(self, item, item_id, encrypted=True):
- return defer.fail()
-
-
-class SavingFailedConsumer(GoodConsumer):
- def __init__(self):
- self.name = 'SavingFailedConsumer'
- self.processed, self.saved = [], []
-
- def save(self, parts, item_id):
- return defer.fail()
-
-
-class IncomingBoxProcessingTestCase(unittest.TestCase):
-
- def setUp(self):
- self.box = Mock()
- self.loop = IncomingBoxProcessingLoop(self.box)
-
- def _set_pending_items(self, pending):
- self.box.list_pending.return_value = defer.succeed(pending)
- pending_iter = iter([defer.succeed(item) for item in pending])
- self.box.fetch_for_processing.side_effect = pending_iter
-
- @defer.inlineCallbacks
- def test_processing_flow_reserves_a_message(self):
- self._set_pending_items(['one_item'])
- self.loop.add_consumer(GoodConsumer())
- yield self.loop()
- self.box.fetch_for_processing.assert_called_once_with('one_item')
-
- @defer.inlineCallbacks
- def test_no_consumers(self):
- items = ['one', 'two', 'three']
- self._set_pending_items(items)
- yield self.loop()
- self.box.fetch_for_processing.assert_not_called()
- self.box.delete.assert_not_called()
-
- @defer.inlineCallbacks
- def test_pending_list_with_multiple_items(self):
- items = ['one', 'two', 'three']
- self._set_pending_items(items)
- consumer = GoodConsumer()
- self.loop.add_consumer(consumer)
- yield self.loop()
- calls = [call('one'), call('two'), call('three')]
- self.box.fetch_for_processing.assert_has_calls(calls)
-
- @defer.inlineCallbacks
- def test_good_consumer_process_all(self):
- items = ['one', 'two', 'three']
- self._set_pending_items(items)
- consumer = GoodConsumer()
- self.loop.add_consumer(consumer)
- yield self.loop()
- self.assertEquals(items, consumer.processed)
-
- @defer.inlineCallbacks
- def test_good_consumer_saves_all(self):
- items = ['one', 'two', 'three']
- self._set_pending_items(items)
- consumer = GoodConsumer()
- self.loop.add_consumer(consumer)
- yield self.loop()
- self.assertEquals(items, consumer.saved)
-
- @defer.inlineCallbacks
- def test_multiple_good_consumers_process_all(self):
- items = ['one', 'two', 'three']
- self._set_pending_items(items)
- consumer = GoodConsumer()
- consumer2 = GoodConsumer()
- self.loop.add_consumer(consumer)
- self.loop.add_consumer(consumer2)
- yield self.loop()
- self.assertEquals(items, consumer.processed)
- self.assertEquals(items, consumer2.processed)
-
- @defer.inlineCallbacks
- def test_good_consumer_marks_as_processed(self):
- items = ['one', 'two', 'three']
- self._set_pending_items(items)
- consumer = GoodConsumer()
- self.loop.add_consumer(consumer)
- yield self.loop()
- self.box.set_processed.assert_has_calls([call(x) for x in items])
-
- @defer.inlineCallbacks
- def test_good_consumer_deletes_items(self):
- items = ['one', 'two', 'three']
- self._set_pending_items(items)
- consumer = GoodConsumer()
- self.loop.add_consumer(consumer)
- yield self.loop()
- self.box.delete.assert_has_calls([call(x) for x in items])
-
- @defer.inlineCallbacks
- def test_processing_failed_doesnt_mark_as_processed(self):
- items = ['one', 'two', 'three']
- self._set_pending_items(items)
- consumer = ProcessingFailedConsumer()
- self.loop.add_consumer(consumer)
- yield self.loop()
- self.box.set_processed.assert_not_called()
-
- @defer.inlineCallbacks
- def test_processing_failed_doesnt_delete(self):
- items = ['one', 'two', 'three']
- self._set_pending_items(items)
- consumer = ProcessingFailedConsumer()
- self.loop.add_consumer(consumer)
- yield self.loop()
- self.box.delete.assert_not_called()
-
- @defer.inlineCallbacks
- def test_processing_failed_marks_as_failed(self):
- items = ['one', 'two', 'three']
- self._set_pending_items(items)
- consumer = ProcessingFailedConsumer()
- self.loop.add_consumer(consumer)
- yield self.loop()
- self.box.set_failed.assert_has_calls([call(x) for x in items])
-
- @defer.inlineCallbacks
- def test_saving_failed_marks_as_processed(self):
- items = ['one', 'two', 'three']
- self._set_pending_items(items)
- consumer = SavingFailedConsumer()
- self.loop.add_consumer(consumer)
- yield self.loop()
- self.box.set_processed.assert_has_calls([call(x) for x in items])
-
- @defer.inlineCallbacks
- def test_saving_failed_doesnt_delete(self):
- items = ['one', 'two', 'three']
- self._set_pending_items(items)
- consumer = SavingFailedConsumer()
- self.loop.add_consumer(consumer)
- yield self.loop()
- self.box.delete.assert_not_called()
-
- @defer.inlineCallbacks
- def test_saving_failed_marks_as_failed(self):
- items = ['one', 'two', 'three']
- self._set_pending_items(items)
- consumer = SavingFailedConsumer()
- self.loop.add_consumer(consumer)
- yield self.loop()
- self.box.set_failed.assert_has_calls([call(x) for x in items])
diff --git a/testing/tests/client/test_recovery_code.py b/testing/tests/client/test_recovery_code.py
deleted file mode 100644
index 7bbccc41..00000000
--- a/testing/tests/client/test_recovery_code.py
+++ /dev/null
@@ -1,38 +0,0 @@
-# -*- CODing: utf-8 -*-
-# test_recovery_code.py
-# Copyright (C) 2017 LEAP
-#
-# This program is free software: you can redistribute it and/or modify
-# it under the terms of the GNU General Public License as published by
-# the Free Software Foundation, either version 3 of the License, or
-# (at your option) any later version.
-#
-# This program is distributed in the hope that it will be useful,
-# but WITHOUT ANY WARRANTY; without even the implied warranty of
-# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
-# GNU General Public License for more details.
-#
-# You should have received a copy of the GNU General Public License
-# along with this program. If not, see <http://www.gnu.org/licenses/>.
-"""
-Tests for recovery code generation.
-"""
-import binascii
-
-from mock import patch
-from twisted.trial import unittest
-from leap.soledad.client._recovery_code import RecoveryCode
-
-
-class RecoveryCodeTestCase(unittest.TestCase):
-
- @patch('leap.soledad.client._recovery_code.os.urandom')
- def test_generate_recovery_code(self, mock_os_urandom):
- generated_random_code = '123456'
- mock_os_urandom.return_value = generated_random_code
- recovery_code = RecoveryCode()
-
- code = recovery_code.generate()
-
- mock_os_urandom.assert_called_with(RecoveryCode.code_length)
- self.assertEqual(binascii.hexlify(generated_random_code), code)
diff --git a/testing/tests/client/test_secrets.py b/testing/tests/client/test_secrets.py
deleted file mode 100644
index 7b643cb4..00000000
--- a/testing/tests/client/test_secrets.py
+++ /dev/null
@@ -1,166 +0,0 @@
-# -*- CODing: utf-8 -*-
-# test_secrets.py
-# Copyright (C) 2017 LEAP
-#
-# This program is free software: you can redistribute it and/or modify
-# it under the terms of the GNU General Public License as published by
-# the Free Software Foundation, either version 3 of the License, or
-# (at your option) any later version.
-#
-# This program is distributed in the hope that it will be useful,
-# but WITHOUT ANY WARRANTY; without even the implied warranty of
-# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
-# GNU General Public License for more details.
-#
-# You should have received a copy of the GNU General Public License
-# along with this program. If not, see <http://www.gnu.org/licenses/>.
-"""
-Tests for secrets encryption and decryption.
-"""
-import scrypt
-
-from twisted.trial import unittest
-
-from leap.soledad.client._crypto import ENC_METHOD
-from leap.soledad.client._secrets import SecretsCrypto
-
-
-class SecretsCryptoTestCase(unittest.TestCase):
-
- SECRETS = {
- 'remote_secret': 'a' * 512,
- 'local_salt': 'b' * 64,
- 'local_secret': 'c' * 448
- }
- ENCRYPTED_V2 = {
- 'cipher': 2,
- 'length': 1437,
- 'kdf_salt': 'TSgNLeAGFeITeSgNzmYZHh+mzmkZPOqao7CAV/tx3KZCLwsrT0HmWtVK3'
- 'TyWHWNgVdeamMZYRuvZavE2sp0DGw==\n',
- 'iv': 'TKZQKIlRgdnXFhJf08qswg==',
- 'secrets': 'ZNZRi72VDtwZqyuU+uf3yzZt23vCtMS3Ki2bnZyeHUOSGVweJeDadF4oqE'
- 'BW87NN00j9E49BzyzLr9SNgwZjPp0wlUm7kt+s8EUfJUdH8nxaQ+9iqGXM'
- 'cCHmBM8L8DRN2m3BrPGx7m+QGlN9sbrRpl7fqc46RWcYuTEpm4upjdtI7O'
- 'jDd0JG3C0rUzIuKJn9w4rEpX3tLEKXVdZfLvRXS5roR0cauazsDO69E13q'
- 'a01vDuY+UJ+buLQ3FluPnnk8QE7ztPVUmRJJ76yAIhjVX9owiwlp9GnUJY'
- 'sETRCqdRSTwUcHIkzVR0zAvtxTX7eGTitzf4gCYEC4T9v5N/jHxEfPdx28'
- 'MM4KShWN2nFxNFQLQUpMN2OrM7UyUw+DQ3ydqBeBPKPHRN5s05kIK7P/Ra'
- 'aLNcrJWa7DopLbgLlei0Jd7S4sjv1ufaRY7v0qJaVkhh/VaCylTSVw1rv5'
- 'YzSWcHHcLuC0R8xLadz6T+EpsVYxgPYCS7w5xoE82zwNQzw/EBxLIcyLPl'
- 'ipKnr2dttrmm3KXUOT1IdbSbI5elF6yQTAusdqiXuypey+MDqHYWEYWkCn'
- 'e9/uGM9FjklDLE0RtPEDxhq64tw6u2Xu7RzDzyQDI8EIoTdU+4zEMTnelZ'
- 'fKEwdG58EDxTXfUk6IDcRUupz3YuToSMhIOkqgXnbWl/nrK0O9v4JMhQjI'
- 'r+oPICYfFr14kvJXBsfntILTJCxzbqTQcNba3jc8rGqCZ6gM0u4PndwTG2'
- 'UiCqPU2HMnWvVGQOXeLdQY+EqqXQiRDi0DrDmkVwFf+27dPXxmZ43C48W3'
- 'lMhTKXl0rdBFnOD5jiMh0X6q/KYXonyEtMZMsjT7dFePcCy4wQRhuut+ac'
- '/TJWyrr+/IB45E+LZbhV7xCy1dYsbdb52jTRJFpaQ83sj6Iv6SYdiqqXzL'
- 'F5JGMyuovTjwAoIIQzpLv36xY2wGGAH1V8c7QmDR2qumXrHD9R68WjBoSY'
- '7IFM0TFAGZNun56y/zQ4r8yOMSAId+j4kuRH0fENEi0FJ+SpmSdHfpvBhE'
- 'MdGh927E9enEYWmUQMmkxXIw6E+O3cmOWt2hsMbUAikDCpQOnVP2BD55HT'
- '6FfbW7ITVwPINHYexmy2Xcm8H5zzGFSp+uYIyPBYDKA+VJ+QQI8bud9K+T'
- 'NBybUv9u6LbB6BsLpwLoxMPJu0WsN2HpmLYgrg2ML1huMF1OtaGRuUr2PL'
- 'NBaZaL6VOztYrVtQG1+tNyRxn8XQTtx0l6n+EihGVe9Sk5XF6DJA9ZN7uO'
- 'svTUFJ5qG3Erf4AmbUJWoOR/NvapBtifidM7gPZZ6NqBs6v72rU1pGy+p7'
- 'o84KrmB2MNf3yJ0BvKxPvFmltF3Dc7LB5TN8ycbmFM6hgrLvvhPxiHEnG/'
- '8Qcrg0nUXOipFGNgZEU7t7Mz6RJ189Z2Kx1HVGrkAzEgqwZYqijAPlsgzO'
- 'bg6DwzwC7stolQWGCDQUtJVlE8FZ/Up8zFYYZKn52WzjmSN4/hHhEvdkck'
- 'Nez/JVev6fMcVrgdrTZ+uCwxjN/4xPdgog2HV470ea1bvIkSNOOrhm194M'
- '40GmvmBhiSSMjdRQCQtM0t9bUuSQLPDzEiCA9QaLyygtlz9uRR/dXgkEg0'
- 'J4YNpZvhE0wbyp4GHytbPaAmrcd7im9+buTuMwhXpZl0stmfkJxVHJSZ8Y'
- 'IHakHs3W1fdYyI3wxGpel/9eYO3ISukolwrHXESP65wVNKfBwbqVJzQmts'
- 'pyDBOI6DcLKZfE1EVg0+uwQ/5PKZbn0TwlXO1YE3NL3mAply3zQR9hyBrY'
- '6f1jkHVD3irIlWkSiPJsP8sW+nrK8c/Ha8F+dua6DTZmg594OIaQj8mPiY'
- 'GcIusiARWocR5/MmSjupGOgFx4HtmckTJtAta3XP4elOx04teH/P9Cgr1x'
- 'XYf+cEX6gp92L9rTo0FCz3Hw==\n',
- 'version': 2,
- 'kdf': 'scrypt',
- 'kdf_length': 32
- }
-
- ENCRYPTED_V1 = {
- 'version': 1,
- 'active_secret': 'secret_id',
- 'storage_secrets': {
- 'secret_id': {
- 'kdf': 'scrypt',
- 'secret': 'u31ObvxNU8jB0HgMj3TVwQ==:JQwlYq6sAQmHYS3x2CJzObT9h'
- 'j1iiHthvrMh887qedNCcOfJyCA3jpRkc0vjd2Qk/2HSJ+JxM2F'
- 'MrPzzx5O34EHlgF2scen34guZRRIf42WpnMy+PrL4cnMlZLgCh'
- 'H1Jz6wcIMEpU9LQ8OaCShk1/yJ6qcVHOV4DDt3mTF7ttiqI5cp'
- 'msaVtxxYCcpxFiWSeSCEgr0h4/Ih1qHuM6vk+CQjf/zg1f/7HR'
- 'imIyNYXit9Fw3YTkxBen1wG3f5L7OAODRTuqnWpkQFOmclx050'
- 'k0frKRcX6UWhIOWpW2mqJXnvzDtQQVGzqIdSgGTGtUDGQ7Onnc'
- 'NkUlSnuVC7PkDNNRuwit3pCB9YWBWyPAQgs0kLqoV4YcuSctz6'
- 'SAf76ozdcK5/SrOzutOfyPag4V3AYKMv6rCKALJ10OnFJ61FL9'
- 'kd6JZam7WOlEUXyO7Gdgvz+eKiQMTZXbtO2kAKqel513MedPXC'
- 'dzajUe1U2JaGg86UdiDWoPYOiWxnAPwfNJk+1QuNy5NZ7PaMtF'
- 'IKT3/Xema2U8mufS0FbvJyK2flP1VUWcCzHKTSqX6+kU7UpoWa'
- 'hYa7PlO40El+putTQLBmNaEeaWFngO+XB4TReICHSiCdcAb3pw'
- 'sabjtxt+OpK4vbj3yBSfpiZTpVbEjt9U/tUpVp/T2M66lMi3ZC'
- 'oHLlhu45Zo0aEq3UmQ/WBXu6EkO2eLYz2br9YQwRbSJ6z5CHmu'
- 'hjKBQlpvGNfZYObx5lY4o6Ab4f/N8gyukskjmAFAf7Fr8cEog/'
- 'oxmbagoCtUGRYJp2paooqH8L6xXp0Y8+23g7WJaAIr1i4V4aKS'
- 'r9x7iUK6prcZTtMJZEHCswkLN/+DU6/FX3YZcOjseC+Qv3P+9v'
- 'zIDp/92KJzqVqITGwrsc6ZsglMW37qxs6albtw3lMWSHlkcLbj'
- 'Xf/iHPeKnb2WNLdkFNQ1J5OaTJR+E1CrXN+pm1JtB6XaUbaLGV'
- 'CGUo13lAPVDtXcPbo64kMrQtQu4m9m8X8t8tfuJmINfwBnrKzk'
- 'O6pl+LwimFaFEArV6wcaMxmwi0lM7mt4U1u9OIQjghQ/dEmOyV'
- 'dZBnvyG7T/oRuLdUyZ/QGXZMlPQ3lAZ0ONn1Mk4bmKToW8ToE8'
- 'ylld3rLlWDjjoQP8mP05Izg3mguLHXUhikUL8MD5NdYyeZJ1XZ'
- '0OZ5S9uncurYj2ABWJoVaq/tFCdCEo9bbjWsePei26GZjaM3Fx'
- 'RkAICXe/bt6/uLgaPZtO+sdARDuU3DRKMIdgM9NBaIn0kC7Wk4'
- 'bnYShZ/rbhVt2/ds5XinnDBZsxSR3s553DixJ9v6w9Db++9Stw'
- '4DgePd9lLy+6WuVBlKmcNflx9zg7US0AOarX2UNiQ==',
- 'kdf_length': 32,
- 'kdf_salt': 'MYH68QH48nRFMWH44piFWqBnKtU8KCz6Ajh24otrvzJlqPgB'
- 'v6bvFJjRvjRp/0/v1j2nt40RZ6H5hfoKmore0g==\n',
- 'length': 1024,
- 'cipher': 'aes256',
- }
- }
- }
-
- def setUp(self):
- class Soledad(object):
- passphrase = '123'
- soledad = Soledad()
- self._crypto = SecretsCrypto(soledad)
-
- def test__get_key(self):
- salt = 'abc'
- expected = scrypt.hash('123', salt, buflen=32)
- key = self._crypto._get_key(salt)
- self.assertEqual(expected, key)
-
- def test_encrypt(self):
- info = self._crypto.encrypt(self.SECRETS)
- self.assertEqual(8, len(info))
- for key, value in [
- ('kdf', 'scrypt'),
- ('kdf_salt', None),
- ('kdf_length', None),
- ('cipher', ENC_METHOD.aes_256_gcm),
- ('length', None),
- ('iv', None),
- ('secrets', None),
- ('version', 2)]:
- self.assertTrue(key in info)
- if value:
- self.assertEqual(info[key], value)
-
- def test__decrypt_v2(self):
- encrypted = self.ENCRYPTED_V2
- decrypted = self._crypto.decrypt(encrypted)
- self.assertEqual(decrypted, self.SECRETS)
-
- def test__decrypt_v1(self):
- encrypted = self.ENCRYPTED_V1
- decrypted = self._crypto.decrypt(encrypted)
- self.assertEqual(decrypted, self.SECRETS)
-
- def test__no_version_defaults_to_v1(self):
- encrypted = dict(self.ENCRYPTED_V1)
- del encrypted['version']
- decrypted = self._crypto.decrypt(encrypted)
- self.assertEqual(decrypted, self.SECRETS)
- self.assertEqual(encrypted['version'], 1)
diff --git a/testing/tests/client/test_shared_db.py b/testing/tests/client/test_shared_db.py
deleted file mode 100644
index b045e524..00000000
--- a/testing/tests/client/test_shared_db.py
+++ /dev/null
@@ -1,40 +0,0 @@
-from leap.soledad.common.document import SoledadDocument
-from leap.soledad.client.shared_db import SoledadSharedDatabase
-
-from test_soledad.util import BaseSoledadTest
-
-
-class SoledadSharedDBTestCase(BaseSoledadTest):
-
- """
- These tests ensure the functionalities of the shared recovery database.
- """
-
- def setUp(self):
- BaseSoledadTest.setUp(self)
- self._shared_db = SoledadSharedDatabase(
- 'https://provider/', document_factory=SoledadDocument,
- creds=None)
-
- def tearDown(self):
- BaseSoledadTest.tearDown(self)
-
- def test__get_remote_doc(self):
- """
- Ensure the shared db is queried with the correct doc_id.
- """
- doc_id = self._soledad.secrets.storage._remote_doc_id()
- self._soledad.secrets.storage._get_remote_doc()
- self._soledad.secrets.storage._shared_db.get_doc.assert_called_with(
- doc_id)
-
- def test_save_remote(self):
- """
- Ensure recovery document is put into shared recover db.
- """
- doc_id = self._soledad.secrets.storage._remote_doc_id()
- storage = self._soledad.secrets.storage
- storage.save_remote({'content': 'blah'})
- storage._shared_db.get_doc.assert_called_with(doc_id)
- storage._shared_db.put_doc.assert_called_with(self._doc_put)
- self.assertTrue(self._doc_put.doc_id == doc_id)
diff --git a/testing/tests/client/test_signals.py b/testing/tests/client/test_signals.py
deleted file mode 100644
index c7609a74..00000000
--- a/testing/tests/client/test_signals.py
+++ /dev/null
@@ -1,149 +0,0 @@
-from mock import Mock
-from twisted.internet import defer
-
-from leap import soledad
-from leap.common.events import catalog
-from leap.soledad.common.document import SoledadDocument
-
-from test_soledad.util import ADDRESS
-from test_soledad.util import BaseSoledadTest
-
-
-class SoledadSignalingTestCase(BaseSoledadTest):
-
- """
- These tests ensure signals are correctly emmited by Soledad.
- """
-
- EVENTS_SERVER_PORT = 8090
-
- def setUp(self):
- # mock signaling
- soledad.client.signal = Mock()
- soledad.client._secrets.util.events.emit_async = Mock()
- # run parent's setUp
- BaseSoledadTest.setUp(self)
-
- def tearDown(self):
- BaseSoledadTest.tearDown(self)
-
- def _pop_mock_call(self, mocked):
- mocked.call_args_list.pop()
- mocked.mock_calls.pop()
- mocked.call_args = mocked.call_args_list[-1]
-
- def test_stage3_bootstrap_signals(self):
- """
- Test that a fresh soledad emits all bootstrap signals.
-
- Signals are:
- - downloading keys / done downloading keys.
- - creating keys / done creating keys.
- - downloading keys / done downloading keys.
- - uploading keys / done uploading keys.
- """
- soledad.client._secrets.util.events.emit_async.reset_mock()
- # get a fresh instance so it emits all bootstrap signals
- sol = self._soledad_instance(
- secrets_path='alternative_stage3.json',
- local_db_path='alternative_stage3.u1db')
- # reverse call order so we can verify in the order the signals were
- # expected
- soledad.client._secrets.util.events.emit_async.mock_calls.reverse()
- soledad.client._secrets.util.events.emit_async.call_args = \
- soledad.client._secrets.util.events.emit_async.call_args_list[0]
- soledad.client._secrets.util.events.emit_async.call_args_list.reverse()
-
- user_data = {'userid': ADDRESS, 'uuid': ADDRESS}
-
- def _assert(*args, **kwargs):
- mocked = soledad.client._secrets.util.events.emit_async
- mocked.assert_called_with(*args)
- pop = kwargs.get('pop')
- if pop or pop is None:
- self._pop_mock_call(mocked)
-
- _assert(catalog.SOLEDAD_DOWNLOADING_KEYS, user_data)
- _assert(catalog.SOLEDAD_DONE_DOWNLOADING_KEYS, user_data)
- _assert(catalog.SOLEDAD_CREATING_KEYS, user_data)
- _assert(catalog.SOLEDAD_DONE_CREATING_KEYS, user_data)
- _assert(catalog.SOLEDAD_UPLOADING_KEYS, user_data)
- _assert(catalog.SOLEDAD_DOWNLOADING_KEYS, user_data)
- _assert(catalog.SOLEDAD_DONE_DOWNLOADING_KEYS, user_data)
- _assert(catalog.SOLEDAD_DONE_UPLOADING_KEYS, user_data, pop=False)
-
- sol.close()
-
- def test_stage2_bootstrap_signals(self):
- """
- Test that if there are keys in server, soledad will download them and
- emit corresponding signals.
- """
- # get existing instance so we have access to keys
- sol = self._soledad_instance()
- # create a document with secrets
- doc = SoledadDocument(doc_id=sol.secrets.storage._remote_doc_id())
- doc.content = sol.secrets.crypto.encrypt(sol.secrets._secrets)
- sol.close()
- # reset mock
- soledad.client._secrets.util.events.emit_async.reset_mock()
- # get a fresh instance so it emits all bootstrap signals
- shared_db = self.get_default_shared_mock(get_doc_return_value=doc)
- sol = self._soledad_instance(
- secrets_path='alternative_stage2.json',
- local_db_path='alternative_stage2.u1db',
- shared_db_class=shared_db)
- # reverse call order so we can verify in the order the signals were
- # expected
- mocked = soledad.client._secrets.util.events.emit_async
- mocked.mock_calls.reverse()
- mocked.call_args = mocked.call_args_list[0]
- mocked.call_args_list.reverse()
-
- def _assert(*args, **kwargs):
- mocked = soledad.client._secrets.util.events.emit_async
- mocked.assert_called_with(*args)
- pop = kwargs.get('pop')
- if pop or pop is None:
- self._pop_mock_call(mocked)
-
- # assert download keys signals
- user_data = {'userid': ADDRESS, 'uuid': ADDRESS}
- _assert(catalog.SOLEDAD_DOWNLOADING_KEYS, user_data)
- _assert(catalog.SOLEDAD_DONE_DOWNLOADING_KEYS, user_data, pop=False)
-
- sol.close()
-
- def test_stage1_bootstrap_signals(self):
- """
- Test that if soledad already has a local secret, it emits no signals.
- """
- soledad.client.signal.reset_mock()
- # get an existent instance so it emits only some of bootstrap signals
- sol = self._soledad_instance()
- self.assertEqual([], soledad.client.signal.mock_calls)
- sol.close()
-
- @defer.inlineCallbacks
- def test_sync_signals(self):
- """
- Test Soledad emits SOLEDAD_CREATING_KEYS signal.
- """
- # get a fresh instance so it emits all bootstrap signals
- sol = self._soledad_instance()
- soledad.client.signal.reset_mock()
-
- # mock the actual db sync so soledad does not try to connect to the
- # server
- d = defer.Deferred()
- d.callback(None)
- sol._dbsyncer.sync = Mock(return_value=d)
-
- yield sol.sync()
-
- # assert the signal has been emitted
- soledad.client.events.emit_async.assert_called_with(
- catalog.SOLEDAD_DONE_DATA_SYNC,
- {'userid': ADDRESS, 'uuid': ADDRESS},
- )
- sol.close()
diff --git a/testing/tests/client/test_soledad_doc.py b/testing/tests/client/test_soledad_doc.py
deleted file mode 100644
index e158d768..00000000
--- a/testing/tests/client/test_soledad_doc.py
+++ /dev/null
@@ -1,46 +0,0 @@
-# -*- coding: utf-8 -*-
-# test_soledad_doc.py
-# Copyright (C) 2013, 2014 LEAP
-#
-# This program is free software: you can redistribute it and/or modify
-# it under the terms of the GNU General Public License as published by
-# the Free Software Foundation, either version 3 of the License, or
-# (at your option) any later version.
-#
-# This program is distributed in the hope that it will be useful,
-# but WITHOUT ANY WARRANTY; without even the implied warranty of
-# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
-# GNU General Public License for more details.
-#
-# You should have received a copy of the GNU General Public License
-# along with this program. If not, see <http://www.gnu.org/licenses/>.
-"""
-Test Leap backend bits: soledad docs
-"""
-from testscenarios import TestWithScenarios
-
-from test_soledad.u1db_tests import test_document
-from test_soledad.util import BaseSoledadTest
-from test_soledad.util import make_soledad_document_for_test
-
-
-# -----------------------------------------------------------------------------
-# The following tests come from `u1db.tests.test_document`.
-# -----------------------------------------------------------------------------
-
-class TestSoledadDocument(
- TestWithScenarios,
- test_document.TestDocument, BaseSoledadTest):
-
- scenarios = ([(
- 'leap', {
- 'make_document_for_test': make_soledad_document_for_test})])
-
-
-class TestSoledadPyDocument(
- TestWithScenarios,
- test_document.TestPyDocument, BaseSoledadTest):
-
- scenarios = ([(
- 'leap', {
- 'make_document_for_test': make_soledad_document_for_test})])