From cfff46ff9becdbe5cf48816870e625ed253ecc57 Mon Sep 17 00:00:00 2001 From: drebs Date: Sun, 17 Sep 2017 12:08:25 -0300 Subject: [refactor] move tests to root of repository Tests entrypoint was in a testing/ subfolder in the root of the repository. This was made mainly because we had some common files for tests and we didn't want to ship them (files in testing/test_soledad, which is itself a python package. This sometimes causes errors when loading tests (it seems setuptools is confused with having one python package in a subdirectory of another). This commit moves the tests entrypoint to the root of the repository. Closes: #8952 --- tests/client/__init__.py | 0 tests/client/test_api.py | 36 +++ tests/client/test_app.py | 52 ++++ tests/client/test_attachments.py | 81 ++++++ tests/client/test_aux_methods.py | 132 +++++++++ tests/client/test_crypto.py | 384 ++++++++++++++++++++++++++ tests/client/test_deprecated_crypto.py | 94 +++++++ tests/client/test_doc.py | 50 ++++ tests/client/test_http.py | 60 ++++ tests/client/test_https.py | 137 +++++++++ tests/client/test_incoming_processing_flow.py | 197 +++++++++++++ tests/client/test_recovery_code.py | 38 +++ tests/client/test_secrets.py | 166 +++++++++++ tests/client/test_shared_db.py | 40 +++ tests/client/test_signals.py | 149 ++++++++++ tests/client/test_soledad | 1 + tests/client/test_soledad_doc.py | 46 +++ 17 files changed, 1663 insertions(+) create mode 100644 tests/client/__init__.py create mode 100644 tests/client/test_api.py create mode 100644 tests/client/test_app.py create mode 100644 tests/client/test_attachments.py create mode 100644 tests/client/test_aux_methods.py create mode 100644 tests/client/test_crypto.py create mode 100644 tests/client/test_deprecated_crypto.py create mode 100644 tests/client/test_doc.py create mode 100644 tests/client/test_http.py create mode 100644 tests/client/test_https.py create mode 100644 tests/client/test_incoming_processing_flow.py create mode 100644 tests/client/test_recovery_code.py create mode 100644 tests/client/test_secrets.py create mode 100644 tests/client/test_shared_db.py create mode 100644 tests/client/test_signals.py create mode 120000 tests/client/test_soledad create mode 100644 tests/client/test_soledad_doc.py (limited to 'tests/client') diff --git a/tests/client/__init__.py b/tests/client/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/tests/client/test_api.py b/tests/client/test_api.py new file mode 100644 index 00000000..3c6a8155 --- /dev/null +++ b/tests/client/test_api.py @@ -0,0 +1,36 @@ +# -*- coding: utf-8 -*- +# test_api.py +# Copyright (C) 2017 LEAP +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see . +""" +Tests for soledad api. +""" + +from mock import MagicMock + +from test_soledad.util import BaseSoledadTest + + +class ApiTestCase(BaseSoledadTest): + + def test_recovery_code_creation(self): + recovery_code_mock = MagicMock() + generated_code = '4645a2f8997e5d0d' + recovery_code_mock.generate.return_value = generated_code + self._soledad._recovery_code = recovery_code_mock + + code = self._soledad.create_recovery_code() + + self.assertEqual(generated_code, code) diff --git a/tests/client/test_app.py b/tests/client/test_app.py new file mode 100644 index 00000000..6867473e --- /dev/null +++ b/tests/client/test_app.py @@ -0,0 +1,52 @@ +# -*- coding: utf-8 -*- +# test_soledad_app.py +# Copyright (C) 2014 LEAP +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see . +""" +Test ObjectStore and Couch backend bits. +""" +import pytest + +from testscenarios import TestWithScenarios + +from test_soledad.util import BaseSoledadTest +from test_soledad.util import make_soledad_document_for_test +from test_soledad.util import make_token_soledad_app +from test_soledad.util import make_token_http_database_for_test +from test_soledad.util import copy_token_http_database_for_test +from test_soledad.u1db_tests import test_backends + + +# ----------------------------------------------------------------------------- +# The following tests come from `u1db.tests.test_backends`. +# ----------------------------------------------------------------------------- + +@pytest.mark.usefixtures('method_tmpdir') +class SoledadTests( + TestWithScenarios, test_backends.AllDatabaseTests, BaseSoledadTest): + + def setUp(self): + TestWithScenarios.setUp(self) + test_backends.AllDatabaseTests.setUp(self) + BaseSoledadTest.setUp(self) + + scenarios = [ + ('token_http', { + 'make_database_for_test': make_token_http_database_for_test, + 'copy_database_for_test': copy_token_http_database_for_test, + 'make_document_for_test': make_soledad_document_for_test, + 'make_app_with_state': make_token_soledad_app, + }) + ] diff --git a/tests/client/test_attachments.py b/tests/client/test_attachments.py new file mode 100644 index 00000000..2df5b90d --- /dev/null +++ b/tests/client/test_attachments.py @@ -0,0 +1,81 @@ +# -*- coding: utf-8 -*- +# test_attachments.py +# Copyright (C) 2017 LEAP +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see . +""" +Tests for document attachments. +""" + +import pytest + +from io import BytesIO +from mock import Mock + +from twisted.internet import defer +from test_soledad.util import BaseSoledadTest + + +from leap.soledad.client import AttachmentStates + + +def mock_response(doc): + doc._manager._client.get = Mock( + return_value=defer.succeed(Mock(code=200, json=lambda: []))) + doc._manager._client.put = Mock( + return_value=defer.succeed(Mock(code=200))) + + +@pytest.mark.usefixture('method_tmpdir') +class AttachmentTests(BaseSoledadTest): + + @defer.inlineCallbacks + def test_create_doc_saves_store(self): + doc = yield self._soledad.create_doc({}) + self.assertEqual(self._soledad, doc.store) + + @defer.inlineCallbacks + def test_put_attachment(self): + doc = yield self._soledad.create_doc({}) + mock_response(doc) + yield doc.put_attachment(BytesIO('test')) + local_list = yield doc._manager.local_list() + self.assertIn(doc._blob_id, local_list) + + @defer.inlineCallbacks + def test_get_attachment(self): + doc = yield self._soledad.create_doc({}) + mock_response(doc) + yield doc.put_attachment(BytesIO('test')) + fd = yield doc.get_attachment() + self.assertEqual('test', fd.read()) + + @defer.inlineCallbacks + def test_get_attachment_state(self): + doc = yield self._soledad.create_doc({}) + state = yield doc.get_attachment_state() + self.assertEqual(AttachmentStates.NONE, state) + mock_response(doc) + yield doc.put_attachment(BytesIO('test')) + state = yield doc.get_attachment_state() + self.assertEqual(AttachmentStates.LOCAL, state) + + @defer.inlineCallbacks + def test_is_dirty(self): + doc = yield self._soledad.create_doc({}) + dirty = yield doc.is_dirty() + self.assertFalse(dirty) + doc.content = {'test': True} + dirty = yield doc.is_dirty() + self.assertTrue(dirty) diff --git a/tests/client/test_aux_methods.py b/tests/client/test_aux_methods.py new file mode 100644 index 00000000..1eb676c7 --- /dev/null +++ b/tests/client/test_aux_methods.py @@ -0,0 +1,132 @@ +# -*- coding: utf-8 -*- +# test_soledad.py +# Copyright (C) 2013 LEAP +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see . +""" +Tests for general Soledad functionality. +""" +import os + +from pytest import inlineCallbacks + +from leap.soledad.client import Soledad +from leap.soledad.client._db.adbapi import U1DBConnectionPool +from leap.soledad.client._secrets.util import SecretsError + +from test_soledad.util import BaseSoledadTest + + +class AuxMethodsTestCase(BaseSoledadTest): + + def test__init_dirs(self): + sol = self._soledad_instance(prefix='_init_dirs') + local_db_dir = os.path.dirname(sol.local_db_path) + secrets_path = os.path.dirname(sol.secrets_path) + self.assertTrue(os.path.isdir(local_db_dir)) + self.assertTrue(os.path.isdir(secrets_path)) + + def _close_soledad(results): + sol.close() + + d = sol.create_doc({}) + d.addCallback(_close_soledad) + return d + + def test__init_u1db_sqlcipher_backend(self): + sol = self._soledad_instance(prefix='_init_db') + self.assertIsInstance(sol._dbpool, U1DBConnectionPool) + self.assertTrue(os.path.isfile(sol.local_db_path)) + sol.close() + + def test__init_config_with_defaults(self): + """ + Test if configuration defaults point to the correct place. + """ + + class SoledadMock(Soledad): + + def __init__(self): + pass + + # instantiate without initializing so we just test + # _init_config_with_defaults() + sol = SoledadMock() + sol.passphrase = u'' + sol.server_url = '' + sol._init_config_with_defaults() + # assert value of local_db_path + self.assertEquals( + os.path.join(sol.default_prefix, 'soledad.u1db'), + sol.local_db_path) + + def test__init_config_from_params(self): + """ + Test if configuration is correctly read from file. + """ + sol = self._soledad_instance( + 'leap@leap.se', + passphrase=u'123', + secrets_path='value_3', + local_db_path='value_2', + server_url='value_1', + cert_file=None) + self.assertEqual( + os.path.join(self.tempdir, 'value_3'), + sol.secrets_path) + self.assertEqual( + os.path.join(self.tempdir, 'value_2'), + sol.local_db_path) + self.assertEqual('value_1', sol.server_url) + sol.close() + + @inlineCallbacks + def test_change_passphrase(self): + """ + Test if passphrase can be changed. + """ + prefix = '_change_passphrase' + sol = self._soledad_instance( + 'leap@leap.se', + passphrase=u'123', + prefix=prefix, + ) + + doc1 = yield sol.create_doc({'simple': 'doc'}) + sol.change_passphrase(u'654321') + sol.close() + + with self.assertRaises(SecretsError): + self._soledad_instance( + 'leap@leap.se', + passphrase=u'123', + prefix=prefix) + + sol2 = self._soledad_instance( + 'leap@leap.se', + passphrase=u'654321', + prefix=prefix) + doc2 = yield sol2.get_doc(doc1.doc_id) + + self.assertEqual(doc1, doc2) + + sol2.close() + + def test_get_passphrase(self): + """ + Assert passphrase getter works fine. + """ + sol = self._soledad_instance() + self.assertEqual('123', sol.passphrase) + sol.close() diff --git a/tests/client/test_crypto.py b/tests/client/test_crypto.py new file mode 100644 index 00000000..5b647b73 --- /dev/null +++ b/tests/client/test_crypto.py @@ -0,0 +1,384 @@ +# -*- coding: utf-8 -*- +# test_crypto.py +# Copyright (C) 2013 LEAP +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see . +""" +Tests for cryptographic related stuff. +""" +import binascii +import base64 +import json +import os + +from io import BytesIO + +import pytest + +from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes +from cryptography.hazmat.backends import default_backend +from cryptography.exceptions import InvalidTag + +from leap.soledad.common.document import SoledadDocument +from test_soledad.util import BaseSoledadTest +from leap.soledad.client import _crypto +from leap.soledad.client import _scrypt +from leap.soledad.common.blobs import preamble as _preamble + +from twisted.trial import unittest +from twisted.internet import defer + + +snowden1 = ( + "You can't come up against " + "the world's most powerful intelligence " + "agencies and not accept the risk. " + "If they want to get you, over time " + "they will.") + + +class ScryptTest(unittest.TestCase): + + def test_scrypt(self): + secret = 'supersikret' + salt = 'randomsalt' + key = _scrypt.hash(secret, salt, buflen=32) + expected = ('47996b569ea58d51ccbcc318d710' + 'a537acd28bb7a94615ab8d061d4b2a920f01') + assert binascii.b2a_hex(key) == expected + + +class AESTest(unittest.TestCase): + + def test_chunked_encryption(self): + key = 'A' * 32 + + fd = BytesIO() + aes = _crypto.AESWriter(key, _buffer=fd) + iv = aes.iv + + data = snowden1 + block = 16 + + for i in range(len(data) / block): + chunk = data[i * block:(i + 1) * block] + aes.write(chunk) + aes.end() + + ciphertext_chunked = fd.getvalue() + ciphertext, tag = _aes_encrypt(key, iv, data) + + assert ciphertext_chunked == ciphertext + + def test_decrypt(self): + key = 'A' * 32 + iv = 'A' * 16 + + data = snowden1 + block = 16 + + ciphertext, tag = _aes_encrypt(key, iv, data) + + fd = BytesIO() + aes = _crypto.AESWriter(key, iv, fd, tag=tag) + + for i in range(len(ciphertext) / block): + chunk = ciphertext[i * block:(i + 1) * block] + aes.write(chunk) + aes.end() + + cleartext_chunked = fd.getvalue() + assert cleartext_chunked == data + + +class BlobTestCase(unittest.TestCase): + + class doc_info: + doc_id = 'D-deadbeef' + rev = '397932e0c77f45fcb7c3732930e7e9b2:1' + + def setUp(self): + self.inf = BytesIO(snowden1) + self.blob = _crypto.BlobEncryptor( + self.doc_info, self.inf, + armor=True, + secret='A' * 96) + + @defer.inlineCallbacks + def test_unarmored_blob_encrypt(self): + self.blob.armor = False + encrypted = yield self.blob.encrypt() + + decryptor = _crypto.BlobDecryptor( + self.doc_info, encrypted, armor=False, + secret='A' * 96) + decrypted = yield decryptor.decrypt() + assert decrypted.getvalue() == snowden1 + + @defer.inlineCallbacks + def test_default_armored_blob_encrypt(self): + encrypted = yield self.blob.encrypt() + decode = base64.urlsafe_b64decode + assert map(decode, encrypted.getvalue().split()) + + @defer.inlineCallbacks + def test_blob_encryptor(self): + encrypted = yield self.blob.encrypt() + preamble, ciphertext = encrypted.getvalue().split() + preamble = base64.urlsafe_b64decode(preamble) + ciphertext = base64.urlsafe_b64decode(ciphertext) + ciphertext = ciphertext[:-16] + + assert len(preamble) == _preamble.PACMAN.size + unpacked_data = _preamble.PACMAN.unpack(preamble) + magic, sch, meth, ts, iv, doc_id, rev, _ = unpacked_data + assert magic == _crypto.MAGIC + assert sch == 1 + assert meth == _crypto.ENC_METHOD.aes_256_gcm + assert iv == self.blob.iv + assert doc_id == 'D-deadbeef' + assert rev == self.doc_info.rev + + aes_key = _crypto._get_sym_key_for_doc( + self.doc_info.doc_id, 'A' * 96) + assert ciphertext == _aes_encrypt(aes_key, self.blob.iv, snowden1)[0] + + decrypted = _aes_decrypt(aes_key, self.blob.iv, self.blob.tag, + ciphertext, preamble) + assert str(decrypted) == snowden1 + + @defer.inlineCallbacks + def test_init_with_preamble_alone(self): + ciphertext = yield self.blob.encrypt() + preamble = ciphertext.getvalue().split()[0] + decryptor = _crypto.BlobDecryptor( + self.doc_info, BytesIO(preamble), + start_stream=False, + secret='A' * 96) + assert decryptor._consume_preamble() + + @defer.inlineCallbacks + def test_incremental_blob_decryptor(self): + ciphertext = yield self.blob.encrypt() + preamble, ciphertext = ciphertext.getvalue().split() + ciphertext = base64.urlsafe_b64decode(ciphertext) + + decryptor = _crypto.BlobDecryptor( + self.doc_info, BytesIO(preamble), + start_stream=False, + secret='A' * 96, + tag=ciphertext[-16:]) + ciphertext = BytesIO(ciphertext[:-16]) + chunk = ciphertext.read(10) + while chunk: + decryptor.write(chunk) + chunk = ciphertext.read(10) + decrypted = decryptor._end_stream() + assert decrypted.getvalue() == snowden1 + + @defer.inlineCallbacks + def test_blob_decryptor(self): + ciphertext = yield self.blob.encrypt() + + decryptor = _crypto.BlobDecryptor( + self.doc_info, ciphertext, + secret='A' * 96) + decrypted = yield decryptor.decrypt() + assert decrypted.getvalue() == snowden1 + + @defer.inlineCallbacks + def test_unarmored_blob_decryptor(self): + self.blob.armor = False + ciphertext = yield self.blob.encrypt() + + decryptor = _crypto.BlobDecryptor( + self.doc_info, ciphertext, + armor=False, + secret='A' * 96) + decrypted = yield decryptor.decrypt() + assert decrypted.getvalue() == snowden1 + + @defer.inlineCallbacks + def test_encrypt_and_decrypt(self): + """ + Check that encrypting and decrypting gives same doc. + """ + crypto = _crypto.SoledadCrypto('A' * 96) + payload = {'key': 'someval'} + doc1 = SoledadDocument('id1', '1', json.dumps(payload)) + + encrypted = yield crypto.encrypt_doc(doc1) + assert encrypted != payload + assert 'raw' in encrypted + doc2 = SoledadDocument('id1', '1') + doc2.set_json(encrypted) + assert _crypto.is_symmetrically_encrypted(encrypted) + decrypted = (yield crypto.decrypt_doc(doc2)).getvalue() + assert len(decrypted) != 0 + assert json.loads(decrypted) == payload + + @defer.inlineCallbacks + def test_decrypt_with_wrong_tag_raises(self): + """ + Trying to decrypt a document with wrong MAC should raise. + """ + crypto = _crypto.SoledadCrypto('A' * 96) + payload = {'key': 'someval'} + doc1 = SoledadDocument('id1', '1', json.dumps(payload)) + + encrypted = yield crypto.encrypt_doc(doc1) + encdict = json.loads(encrypted) + preamble, raw = str(encdict['raw']).split() + preamble = base64.urlsafe_b64decode(preamble) + raw = base64.urlsafe_b64decode(raw) + # mess with tag + messed = raw[:-16] + '0' * 16 + + preamble = base64.urlsafe_b64encode(preamble) + newraw = preamble + ' ' + base64.urlsafe_b64encode(str(messed)) + doc2 = SoledadDocument('id1', '1') + doc2.set_json(json.dumps({"raw": str(newraw)})) + + with pytest.raises(_crypto.InvalidBlob): + yield crypto.decrypt_doc(doc2) + + +class SoledadSecretsTestCase(BaseSoledadTest): + + def test_generated_secrets_have_correct_length(self): + expected = self._soledad.secrets.lengths + for name, length in expected.iteritems(): + secret = getattr(self._soledad.secrets, name) + self.assertEqual(length, len(secret)) + + +class SoledadCryptoAESTestCase(BaseSoledadTest): + + def test_encrypt_decrypt_sym(self): + # generate 256-bit key + key = os.urandom(32) + iv, cyphertext = _crypto.encrypt_sym('data', key) + self.assertTrue(cyphertext is not None) + self.assertTrue(cyphertext != '') + self.assertTrue(cyphertext != 'data') + plaintext = _crypto.decrypt_sym(cyphertext, key, iv) + self.assertEqual('data', plaintext) + + def test_decrypt_with_wrong_iv_raises(self): + key = os.urandom(32) + iv, cyphertext = _crypto.encrypt_sym('data', key) + self.assertTrue(cyphertext is not None) + self.assertTrue(cyphertext != '') + self.assertTrue(cyphertext != 'data') + # get a different iv by changing the first byte + rawiv = binascii.a2b_base64(iv) + wrongiv = rawiv + while wrongiv == rawiv: + wrongiv = os.urandom(1) + rawiv[1:] + with pytest.raises(InvalidTag): + _crypto.decrypt_sym( + cyphertext, key, iv=binascii.b2a_base64(wrongiv)) + + def test_decrypt_with_wrong_key_raises(self): + key = os.urandom(32) + iv, cyphertext = _crypto.encrypt_sym('data', key) + self.assertTrue(cyphertext is not None) + self.assertTrue(cyphertext != '') + self.assertTrue(cyphertext != 'data') + wrongkey = os.urandom(32) # 256-bits key + # ensure keys are different in case we are extremely lucky + while wrongkey == key: + wrongkey = os.urandom(32) + with pytest.raises(InvalidTag): + _crypto.decrypt_sym(cyphertext, wrongkey, iv) + + +class PreambleTestCase(unittest.TestCase): + class doc_info: + doc_id = 'D-deadbeef' + rev = '397932e0c77f45fcb7c3732930e7e9b2:1' + + def setUp(self): + self.cleartext = BytesIO(snowden1) + self.blob = _crypto.BlobEncryptor( + self.doc_info, self.cleartext, + secret='A' * 96) + + def test_preamble_starts_with_magic_signature(self): + preamble = self.blob._encode_preamble() + assert preamble.startswith(_crypto.MAGIC) + + def test_preamble_has_cipher_metadata(self): + preamble = self.blob._encode_preamble() + unpacked = _preamble.PACMAN.unpack(preamble) + encryption_scheme, encryption_method = unpacked[1:3] + assert encryption_scheme in _crypto.ENC_SCHEME + assert encryption_method in _crypto.ENC_METHOD + assert unpacked[4] == self.blob.iv + + def test_preamble_has_document_sync_metadata(self): + preamble = self.blob._encode_preamble() + unpacked = _preamble.PACMAN.unpack(preamble) + doc_id, doc_rev = unpacked[5:7] + assert doc_id == self.doc_info.doc_id + assert doc_rev == self.doc_info.rev + + def test_preamble_has_document_size(self): + preamble = self.blob._encode_preamble() + unpacked = _preamble.PACMAN.unpack(preamble) + size = unpacked[7] + assert size == _crypto._ceiling(len(snowden1)) + + @defer.inlineCallbacks + def test_preamble_can_come_without_size(self): + # XXX: This test case is here only to test backwards compatibility! + preamble = self.blob._encode_preamble() + # repack preamble using legacy format, without doc size + unpacked = _preamble.PACMAN.unpack(preamble) + preamble_without_size = _preamble.LEGACY_PACMAN.pack(*unpacked[0:7]) + # encrypt it manually for custom tag + ciphertext, tag = _aes_encrypt(self.blob.sym_key, self.blob.iv, + self.cleartext.getvalue(), + aead=preamble_without_size) + ciphertext = ciphertext + tag + # encode it + ciphertext = base64.urlsafe_b64encode(ciphertext) + preamble_without_size = base64.urlsafe_b64encode(preamble_without_size) + # decrypt it + ciphertext = preamble_without_size + ' ' + ciphertext + cleartext = yield _crypto.BlobDecryptor( + self.doc_info, BytesIO(ciphertext), + secret='A' * 96).decrypt() + assert cleartext.getvalue() == self.cleartext.getvalue() + warnings = self.flushWarnings() + assert len(warnings) == 1 + assert 'legacy preamble without size' in warnings[0]['message'] + + +def _aes_encrypt(key, iv, data, aead=''): + backend = default_backend() + cipher = Cipher(algorithms.AES(key), modes.GCM(iv), backend=backend) + encryptor = cipher.encryptor() + if aead: + encryptor.authenticate_additional_data(aead) + return encryptor.update(data) + encryptor.finalize(), encryptor.tag + + +def _aes_decrypt(key, iv, tag, data, aead=''): + backend = default_backend() + cipher = Cipher(algorithms.AES(key), modes.GCM(iv, tag), backend=backend) + decryptor = cipher.decryptor() + if aead: + decryptor.authenticate_additional_data(aead) + return decryptor.update(data) + decryptor.finalize() diff --git a/tests/client/test_deprecated_crypto.py b/tests/client/test_deprecated_crypto.py new file mode 100644 index 00000000..939a2003 --- /dev/null +++ b/tests/client/test_deprecated_crypto.py @@ -0,0 +1,94 @@ +import json +import pytest + +from pytest import inlineCallbacks +from six.moves.urllib.parse import urljoin +from uuid import uuid4 + +from leap.soledad.client import crypto as old_crypto +from leap.soledad.common.couch import CouchDatabase +from leap.soledad.common import crypto as common_crypto + +from test_soledad.u1db_tests import simple_doc +from test_soledad.util import SoledadWithCouchServerMixin +from test_soledad.util import make_token_soledad_app +from test_soledad.u1db_tests import TestCaseWithServer + + +def deprecate_client_crypto(client): + secret = client._crypto.secret + _crypto = old_crypto.SoledadCrypto(secret) + setattr(client._dbsyncer, '_crypto', _crypto) + return client + + +def couch_database(couch_url, uuid): + db = CouchDatabase(couch_url, "user-%s" % (uuid,)) + return db + + +class DeprecatedCryptoTest(SoledadWithCouchServerMixin, TestCaseWithServer): + + def setUp(self): + SoledadWithCouchServerMixin.setUp(self) + TestCaseWithServer.setUp(self) + + def tearDown(self): + SoledadWithCouchServerMixin.tearDown(self) + TestCaseWithServer.tearDown(self) + + @staticmethod + def make_app_with_state(state): + return make_token_soledad_app(state) + + @pytest.mark.needs_couch + @inlineCallbacks + def test_touch_updates_remote_representation(self): + self.startTwistedServer() + user = 'user-' + uuid4().hex + server_url = 'http://%s:%d' % (self.server_address) + client = self._soledad_instance(user=user, server_url=server_url) + deprecated_client = deprecate_client_crypto( + self._soledad_instance(user=user, server_url=server_url)) + + self.make_app() + remote = self.request_state._create_database(replica_uid=client.uuid) + remote = CouchDatabase.open_database( + urljoin(self.couch_url, 'user-' + user), + create=True) + + # ensure remote db is empty + gen, docs = remote.get_all_docs() + assert gen == 0 + assert len(docs) == 0 + + # create a doc with deprecated client and sync + yield deprecated_client.create_doc(json.loads(simple_doc)) + yield deprecated_client.sync() + + # check for doc in remote db + gen, docs = remote.get_all_docs() + assert gen == 1 + assert len(docs) == 1 + doc = docs.pop() + content = doc.content + assert common_crypto.ENC_JSON_KEY in content + assert common_crypto.ENC_SCHEME_KEY in content + assert common_crypto.ENC_METHOD_KEY in content + assert common_crypto.ENC_IV_KEY in content + assert common_crypto.MAC_KEY in content + assert common_crypto.MAC_METHOD_KEY in content + + # "touch" the document with a newer client and synx + _, docs = yield client.get_all_docs() + yield client.put_doc(doc) + yield client.sync() + + # check for newer representation of doc in remote db + gen, docs = remote.get_all_docs() + assert gen == 2 + assert len(docs) == 1 + doc = docs.pop() + content = doc.content + assert len(content) == 1 + assert 'raw' in content diff --git a/tests/client/test_doc.py b/tests/client/test_doc.py new file mode 100644 index 00000000..36479e90 --- /dev/null +++ b/tests/client/test_doc.py @@ -0,0 +1,50 @@ +# -*- coding: utf-8 -*- +# test_soledad_doc.py +# Copyright (C) 2013, 2014 LEAP +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see . +""" +Test Leap backend bits: soledad docs +""" +import pytest + +from testscenarios import TestWithScenarios + +from test_soledad.u1db_tests import test_document +from test_soledad.util import BaseSoledadTest +from test_soledad.util import make_soledad_document_for_test + + +# ----------------------------------------------------------------------------- +# The following tests come from `u1db.tests.test_document`. +# ----------------------------------------------------------------------------- + +@pytest.mark.usefixtures('method_tmpdir') +class TestSoledadDocument( + TestWithScenarios, + test_document.TestDocument, BaseSoledadTest): + + scenarios = ([( + 'leap', { + 'make_document_for_test': make_soledad_document_for_test})]) + + +@pytest.mark.usefixtures('method_tmpdir') +class TestSoledadPyDocument( + TestWithScenarios, + test_document.TestPyDocument, BaseSoledadTest): + + scenarios = ([( + 'leap', { + 'make_document_for_test': make_soledad_document_for_test})]) diff --git a/tests/client/test_http.py b/tests/client/test_http.py new file mode 100644 index 00000000..47df4b4a --- /dev/null +++ b/tests/client/test_http.py @@ -0,0 +1,60 @@ +# -*- coding: utf-8 -*- +# test_http.py +# Copyright (C) 2013, 2014 LEAP +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see . +""" +Test Leap backend bits: test http database +""" + +from twisted.trial import unittest + +from leap.soledad.client import auth +from leap.soledad.common.l2db.remote import http_database + + +# ----------------------------------------------------------------------------- +# The following tests come from `u1db.tests.test_http_database`. +# ----------------------------------------------------------------------------- + +class _HTTPDatabase(http_database.HTTPDatabase, auth.TokenBasedAuth): + + """ + Wraps our token auth implementation. + """ + + def set_token_credentials(self, uuid, token): + auth.TokenBasedAuth.set_token_credentials(self, uuid, token) + + def _sign_request(self, method, url_query, params): + return auth.TokenBasedAuth._sign_request( + self, method, url_query, params) + + +class TestHTTPDatabaseWithCreds(unittest.TestCase): + + def test_get_sync_target_inherits_token_credentials(self): + # this test was from TestDatabaseSimpleOperations but we put it here + # for convenience. + self.db = _HTTPDatabase('dbase') + self.db.set_token_credentials('user-uuid', 'auth-token') + st = self.db.get_sync_target() + self.assertEqual(self.db._creds, st._creds) + + def test_ctr_with_creds(self): + db1 = _HTTPDatabase('http://dbs/db', creds={'token': { + 'uuid': 'user-uuid', + 'token': 'auth-token', + }}) + self.assertIn('token', db1._creds) diff --git a/tests/client/test_https.py b/tests/client/test_https.py new file mode 100644 index 00000000..1b6caed6 --- /dev/null +++ b/tests/client/test_https.py @@ -0,0 +1,137 @@ +# -*- coding: utf-8 -*- +# test_sync_target.py +# Copyright (C) 2013, 2014 LEAP +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see . +""" +Test Leap backend bits: https +""" +import pytest + +from testscenarios import TestWithScenarios + +from leap.soledad import client + +from leap.soledad.common.l2db.remote import http_client +from test_soledad.u1db_tests import test_backends +from test_soledad.u1db_tests import test_https +from test_soledad.util import ( + BaseSoledadTest, + make_soledad_document_for_test, + make_soledad_app, + make_token_soledad_app, +) + + +LEAP_SCENARIOS = [ + ('http', { + 'make_database_for_test': test_backends.make_http_database_for_test, + 'copy_database_for_test': test_backends.copy_http_database_for_test, + 'make_document_for_test': make_soledad_document_for_test, + 'make_app_with_state': make_soledad_app}), +] + + +# ----------------------------------------------------------------------------- +# The following tests come from `u1db.tests.test_https`. +# ----------------------------------------------------------------------------- + +def token_leap_https_sync_target(test, host, path, cert_file=None): + _, port = test.server.server_address + # source_replica_uid = test._soledad._dbpool.replica_uid + creds = {'token': {'uuid': 'user-uuid', 'token': 'auth-token'}} + if not cert_file: + cert_file = test.cacert_pem + st = client.http_target.SoledadHTTPSyncTarget( + 'https://%s:%d/%s' % (host, port, path), + source_replica_uid='other-id', + creds=creds, + crypto=test._soledad._crypto, + cert_file=cert_file) + return st + + +@pytest.mark.skip +class TestSoledadHTTPSyncTargetHttpsSupport( + TestWithScenarios, + # test_https.TestHttpSyncTargetHttpsSupport, + BaseSoledadTest): + + scenarios = [ + ('token_soledad_https', + { + # 'server_def': test_https.https_server_def, + 'make_app_with_state': make_token_soledad_app, + 'make_document_for_test': make_soledad_document_for_test, + 'sync_target': token_leap_https_sync_target}), + ] + + def setUp(self): + # the parent constructor undoes our SSL monkey patch to ensure tests + # run smoothly with standard u1db. + test_https.TestHttpSyncTargetHttpsSupport.setUp(self) + # so here monkey patch again to test our functionality. + api = client.api + http_client._VerifiedHTTPSConnection = api.VerifiedHTTPSConnection + client.api.SOLEDAD_CERT = http_client.CA_CERTS + + def test_cannot_verify_cert(self): + self.startServer() + # don't print expected traceback server-side + self.server.handle_error = lambda req, cli_addr: None + self.request_state._create_database('test') + remote_target = self.getSyncTarget( + 'localhost', 'test', cert_file=http_client.CA_CERTS) + d = remote_target.record_sync_info('other-id', 2, 'T-id') + + def _assert_raises(result): + from twisted.python.failure import Failure + if isinstance(result, Failure): + from OpenSSL.SSL import Error + error = result.value.message[0].value + if isinstance(error, Error): + msg = error.message[0][2] + self.assertEqual("certificate verify failed", msg) + return + self.fail("certificate verification should have failed.") + + d.addCallbacks(_assert_raises, _assert_raises) + return d + + def test_working(self): + """ + Test that SSL connections work well. + + This test was adapted to patch Soledad's HTTPS connection custom class + with the intended CA certificates. + """ + self.startServer() + db = self.request_state._create_database('test') + remote_target = self.getSyncTarget('localhost', 'test') + d = remote_target.record_sync_info('other-id', 2, 'T-id') + d.addCallback(lambda _: + self.assertEqual( + (2, 'T-id'), + db._get_replica_gen_and_trans_id('other-id') + )) + d.addCallback(lambda _: remote_target.close()) + return d + + def test_host_mismatch(self): + """ + This test is disabled because soledad's twisted-based http agent uses + pyOpenSSL, which will complain if we try to use an IP to connect to + the remote host (see the original test in u1db_tests/test_https.py). + """ + pass diff --git a/tests/client/test_incoming_processing_flow.py b/tests/client/test_incoming_processing_flow.py new file mode 100644 index 00000000..7bc1e3c6 --- /dev/null +++ b/tests/client/test_incoming_processing_flow.py @@ -0,0 +1,197 @@ +# -*- coding: utf-8 -*- +# test_incoming_processing_flow.py +# Copyright (C) 2017 LEAP +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see . +""" +Unit tests for incoming box processing flow. +""" +from mock import Mock, call +from leap.soledad.client import interfaces +from leap.soledad.client.incoming import IncomingBoxProcessingLoop +from twisted.internet import defer +from twisted.trial import unittest +from zope.interface import implementer + + +@implementer(interfaces.IIncomingBoxConsumer) +class GoodConsumer(object): + def __init__(self): + self.name = 'GoodConsumer' + self.processed, self.saved = [], [] + + def process(self, item, item_id, encrypted=True): + self.processed.append(item_id) + return defer.succeed([item_id]) + + def save(self, parts, item_id): + self.saved.append(item_id) + return defer.succeed(None) + + +class ProcessingFailedConsumer(GoodConsumer): + def __init__(self): + self.name = 'ProcessingFailedConsumer' + self.processed, self.saved = [], [] + + def process(self, item, item_id, encrypted=True): + return defer.fail() + + +class SavingFailedConsumer(GoodConsumer): + def __init__(self): + self.name = 'SavingFailedConsumer' + self.processed, self.saved = [], [] + + def save(self, parts, item_id): + return defer.fail() + + +class IncomingBoxProcessingTestCase(unittest.TestCase): + + def setUp(self): + self.box = Mock() + self.loop = IncomingBoxProcessingLoop(self.box) + + def _set_pending_items(self, pending): + self.box.list_pending.return_value = defer.succeed(pending) + pending_iter = iter([defer.succeed(item) for item in pending]) + self.box.fetch_for_processing.side_effect = pending_iter + + @defer.inlineCallbacks + def test_processing_flow_reserves_a_message(self): + self._set_pending_items(['one_item']) + self.loop.add_consumer(GoodConsumer()) + yield self.loop() + self.box.fetch_for_processing.assert_called_once_with('one_item') + + @defer.inlineCallbacks + def test_no_consumers(self): + items = ['one', 'two', 'three'] + self._set_pending_items(items) + yield self.loop() + self.box.fetch_for_processing.assert_not_called() + self.box.delete.assert_not_called() + + @defer.inlineCallbacks + def test_pending_list_with_multiple_items(self): + items = ['one', 'two', 'three'] + self._set_pending_items(items) + consumer = GoodConsumer() + self.loop.add_consumer(consumer) + yield self.loop() + calls = [call('one'), call('two'), call('three')] + self.box.fetch_for_processing.assert_has_calls(calls) + + @defer.inlineCallbacks + def test_good_consumer_process_all(self): + items = ['one', 'two', 'three'] + self._set_pending_items(items) + consumer = GoodConsumer() + self.loop.add_consumer(consumer) + yield self.loop() + self.assertEquals(items, consumer.processed) + + @defer.inlineCallbacks + def test_good_consumer_saves_all(self): + items = ['one', 'two', 'three'] + self._set_pending_items(items) + consumer = GoodConsumer() + self.loop.add_consumer(consumer) + yield self.loop() + self.assertEquals(items, consumer.saved) + + @defer.inlineCallbacks + def test_multiple_good_consumers_process_all(self): + items = ['one', 'two', 'three'] + self._set_pending_items(items) + consumer = GoodConsumer() + consumer2 = GoodConsumer() + self.loop.add_consumer(consumer) + self.loop.add_consumer(consumer2) + yield self.loop() + self.assertEquals(items, consumer.processed) + self.assertEquals(items, consumer2.processed) + + @defer.inlineCallbacks + def test_good_consumer_marks_as_processed(self): + items = ['one', 'two', 'three'] + self._set_pending_items(items) + consumer = GoodConsumer() + self.loop.add_consumer(consumer) + yield self.loop() + self.box.set_processed.assert_has_calls([call(x) for x in items]) + + @defer.inlineCallbacks + def test_good_consumer_deletes_items(self): + items = ['one', 'two', 'three'] + self._set_pending_items(items) + consumer = GoodConsumer() + self.loop.add_consumer(consumer) + yield self.loop() + self.box.delete.assert_has_calls([call(x) for x in items]) + + @defer.inlineCallbacks + def test_processing_failed_doesnt_mark_as_processed(self): + items = ['one', 'two', 'three'] + self._set_pending_items(items) + consumer = ProcessingFailedConsumer() + self.loop.add_consumer(consumer) + yield self.loop() + self.box.set_processed.assert_not_called() + + @defer.inlineCallbacks + def test_processing_failed_doesnt_delete(self): + items = ['one', 'two', 'three'] + self._set_pending_items(items) + consumer = ProcessingFailedConsumer() + self.loop.add_consumer(consumer) + yield self.loop() + self.box.delete.assert_not_called() + + @defer.inlineCallbacks + def test_processing_failed_marks_as_failed(self): + items = ['one', 'two', 'three'] + self._set_pending_items(items) + consumer = ProcessingFailedConsumer() + self.loop.add_consumer(consumer) + yield self.loop() + self.box.set_failed.assert_has_calls([call(x) for x in items]) + + @defer.inlineCallbacks + def test_saving_failed_marks_as_processed(self): + items = ['one', 'two', 'three'] + self._set_pending_items(items) + consumer = SavingFailedConsumer() + self.loop.add_consumer(consumer) + yield self.loop() + self.box.set_processed.assert_has_calls([call(x) for x in items]) + + @defer.inlineCallbacks + def test_saving_failed_doesnt_delete(self): + items = ['one', 'two', 'three'] + self._set_pending_items(items) + consumer = SavingFailedConsumer() + self.loop.add_consumer(consumer) + yield self.loop() + self.box.delete.assert_not_called() + + @defer.inlineCallbacks + def test_saving_failed_marks_as_failed(self): + items = ['one', 'two', 'three'] + self._set_pending_items(items) + consumer = SavingFailedConsumer() + self.loop.add_consumer(consumer) + yield self.loop() + self.box.set_failed.assert_has_calls([call(x) for x in items]) diff --git a/tests/client/test_recovery_code.py b/tests/client/test_recovery_code.py new file mode 100644 index 00000000..7bbccc41 --- /dev/null +++ b/tests/client/test_recovery_code.py @@ -0,0 +1,38 @@ +# -*- CODing: utf-8 -*- +# test_recovery_code.py +# Copyright (C) 2017 LEAP +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see . +""" +Tests for recovery code generation. +""" +import binascii + +from mock import patch +from twisted.trial import unittest +from leap.soledad.client._recovery_code import RecoveryCode + + +class RecoveryCodeTestCase(unittest.TestCase): + + @patch('leap.soledad.client._recovery_code.os.urandom') + def test_generate_recovery_code(self, mock_os_urandom): + generated_random_code = '123456' + mock_os_urandom.return_value = generated_random_code + recovery_code = RecoveryCode() + + code = recovery_code.generate() + + mock_os_urandom.assert_called_with(RecoveryCode.code_length) + self.assertEqual(binascii.hexlify(generated_random_code), code) diff --git a/tests/client/test_secrets.py b/tests/client/test_secrets.py new file mode 100644 index 00000000..7b643cb4 --- /dev/null +++ b/tests/client/test_secrets.py @@ -0,0 +1,166 @@ +# -*- CODing: utf-8 -*- +# test_secrets.py +# Copyright (C) 2017 LEAP +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see . +""" +Tests for secrets encryption and decryption. +""" +import scrypt + +from twisted.trial import unittest + +from leap.soledad.client._crypto import ENC_METHOD +from leap.soledad.client._secrets import SecretsCrypto + + +class SecretsCryptoTestCase(unittest.TestCase): + + SECRETS = { + 'remote_secret': 'a' * 512, + 'local_salt': 'b' * 64, + 'local_secret': 'c' * 448 + } + ENCRYPTED_V2 = { + 'cipher': 2, + 'length': 1437, + 'kdf_salt': 'TSgNLeAGFeITeSgNzmYZHh+mzmkZPOqao7CAV/tx3KZCLwsrT0HmWtVK3' + 'TyWHWNgVdeamMZYRuvZavE2sp0DGw==\n', + 'iv': 'TKZQKIlRgdnXFhJf08qswg==', + 'secrets': 'ZNZRi72VDtwZqyuU+uf3yzZt23vCtMS3Ki2bnZyeHUOSGVweJeDadF4oqE' + 'BW87NN00j9E49BzyzLr9SNgwZjPp0wlUm7kt+s8EUfJUdH8nxaQ+9iqGXM' + 'cCHmBM8L8DRN2m3BrPGx7m+QGlN9sbrRpl7fqc46RWcYuTEpm4upjdtI7O' + 'jDd0JG3C0rUzIuKJn9w4rEpX3tLEKXVdZfLvRXS5roR0cauazsDO69E13q' + 'a01vDuY+UJ+buLQ3FluPnnk8QE7ztPVUmRJJ76yAIhjVX9owiwlp9GnUJY' + 'sETRCqdRSTwUcHIkzVR0zAvtxTX7eGTitzf4gCYEC4T9v5N/jHxEfPdx28' + 'MM4KShWN2nFxNFQLQUpMN2OrM7UyUw+DQ3ydqBeBPKPHRN5s05kIK7P/Ra' + 'aLNcrJWa7DopLbgLlei0Jd7S4sjv1ufaRY7v0qJaVkhh/VaCylTSVw1rv5' + 'YzSWcHHcLuC0R8xLadz6T+EpsVYxgPYCS7w5xoE82zwNQzw/EBxLIcyLPl' + 'ipKnr2dttrmm3KXUOT1IdbSbI5elF6yQTAusdqiXuypey+MDqHYWEYWkCn' + 'e9/uGM9FjklDLE0RtPEDxhq64tw6u2Xu7RzDzyQDI8EIoTdU+4zEMTnelZ' + 'fKEwdG58EDxTXfUk6IDcRUupz3YuToSMhIOkqgXnbWl/nrK0O9v4JMhQjI' + 'r+oPICYfFr14kvJXBsfntILTJCxzbqTQcNba3jc8rGqCZ6gM0u4PndwTG2' + 'UiCqPU2HMnWvVGQOXeLdQY+EqqXQiRDi0DrDmkVwFf+27dPXxmZ43C48W3' + 'lMhTKXl0rdBFnOD5jiMh0X6q/KYXonyEtMZMsjT7dFePcCy4wQRhuut+ac' + '/TJWyrr+/IB45E+LZbhV7xCy1dYsbdb52jTRJFpaQ83sj6Iv6SYdiqqXzL' + 'F5JGMyuovTjwAoIIQzpLv36xY2wGGAH1V8c7QmDR2qumXrHD9R68WjBoSY' + '7IFM0TFAGZNun56y/zQ4r8yOMSAId+j4kuRH0fENEi0FJ+SpmSdHfpvBhE' + 'MdGh927E9enEYWmUQMmkxXIw6E+O3cmOWt2hsMbUAikDCpQOnVP2BD55HT' + '6FfbW7ITVwPINHYexmy2Xcm8H5zzGFSp+uYIyPBYDKA+VJ+QQI8bud9K+T' + 'NBybUv9u6LbB6BsLpwLoxMPJu0WsN2HpmLYgrg2ML1huMF1OtaGRuUr2PL' + 'NBaZaL6VOztYrVtQG1+tNyRxn8XQTtx0l6n+EihGVe9Sk5XF6DJA9ZN7uO' + 'svTUFJ5qG3Erf4AmbUJWoOR/NvapBtifidM7gPZZ6NqBs6v72rU1pGy+p7' + 'o84KrmB2MNf3yJ0BvKxPvFmltF3Dc7LB5TN8ycbmFM6hgrLvvhPxiHEnG/' + '8Qcrg0nUXOipFGNgZEU7t7Mz6RJ189Z2Kx1HVGrkAzEgqwZYqijAPlsgzO' + 'bg6DwzwC7stolQWGCDQUtJVlE8FZ/Up8zFYYZKn52WzjmSN4/hHhEvdkck' + 'Nez/JVev6fMcVrgdrTZ+uCwxjN/4xPdgog2HV470ea1bvIkSNOOrhm194M' + '40GmvmBhiSSMjdRQCQtM0t9bUuSQLPDzEiCA9QaLyygtlz9uRR/dXgkEg0' + 'J4YNpZvhE0wbyp4GHytbPaAmrcd7im9+buTuMwhXpZl0stmfkJxVHJSZ8Y' + 'IHakHs3W1fdYyI3wxGpel/9eYO3ISukolwrHXESP65wVNKfBwbqVJzQmts' + 'pyDBOI6DcLKZfE1EVg0+uwQ/5PKZbn0TwlXO1YE3NL3mAply3zQR9hyBrY' + '6f1jkHVD3irIlWkSiPJsP8sW+nrK8c/Ha8F+dua6DTZmg594OIaQj8mPiY' + 'GcIusiARWocR5/MmSjupGOgFx4HtmckTJtAta3XP4elOx04teH/P9Cgr1x' + 'XYf+cEX6gp92L9rTo0FCz3Hw==\n', + 'version': 2, + 'kdf': 'scrypt', + 'kdf_length': 32 + } + + ENCRYPTED_V1 = { + 'version': 1, + 'active_secret': 'secret_id', + 'storage_secrets': { + 'secret_id': { + 'kdf': 'scrypt', + 'secret': 'u31ObvxNU8jB0HgMj3TVwQ==:JQwlYq6sAQmHYS3x2CJzObT9h' + 'j1iiHthvrMh887qedNCcOfJyCA3jpRkc0vjd2Qk/2HSJ+JxM2F' + 'MrPzzx5O34EHlgF2scen34guZRRIf42WpnMy+PrL4cnMlZLgCh' + 'H1Jz6wcIMEpU9LQ8OaCShk1/yJ6qcVHOV4DDt3mTF7ttiqI5cp' + 'msaVtxxYCcpxFiWSeSCEgr0h4/Ih1qHuM6vk+CQjf/zg1f/7HR' + 'imIyNYXit9Fw3YTkxBen1wG3f5L7OAODRTuqnWpkQFOmclx050' + 'k0frKRcX6UWhIOWpW2mqJXnvzDtQQVGzqIdSgGTGtUDGQ7Onnc' + 'NkUlSnuVC7PkDNNRuwit3pCB9YWBWyPAQgs0kLqoV4YcuSctz6' + 'SAf76ozdcK5/SrOzutOfyPag4V3AYKMv6rCKALJ10OnFJ61FL9' + 'kd6JZam7WOlEUXyO7Gdgvz+eKiQMTZXbtO2kAKqel513MedPXC' + 'dzajUe1U2JaGg86UdiDWoPYOiWxnAPwfNJk+1QuNy5NZ7PaMtF' + 'IKT3/Xema2U8mufS0FbvJyK2flP1VUWcCzHKTSqX6+kU7UpoWa' + 'hYa7PlO40El+putTQLBmNaEeaWFngO+XB4TReICHSiCdcAb3pw' + 'sabjtxt+OpK4vbj3yBSfpiZTpVbEjt9U/tUpVp/T2M66lMi3ZC' + 'oHLlhu45Zo0aEq3UmQ/WBXu6EkO2eLYz2br9YQwRbSJ6z5CHmu' + 'hjKBQlpvGNfZYObx5lY4o6Ab4f/N8gyukskjmAFAf7Fr8cEog/' + 'oxmbagoCtUGRYJp2paooqH8L6xXp0Y8+23g7WJaAIr1i4V4aKS' + 'r9x7iUK6prcZTtMJZEHCswkLN/+DU6/FX3YZcOjseC+Qv3P+9v' + 'zIDp/92KJzqVqITGwrsc6ZsglMW37qxs6albtw3lMWSHlkcLbj' + 'Xf/iHPeKnb2WNLdkFNQ1J5OaTJR+E1CrXN+pm1JtB6XaUbaLGV' + 'CGUo13lAPVDtXcPbo64kMrQtQu4m9m8X8t8tfuJmINfwBnrKzk' + 'O6pl+LwimFaFEArV6wcaMxmwi0lM7mt4U1u9OIQjghQ/dEmOyV' + 'dZBnvyG7T/oRuLdUyZ/QGXZMlPQ3lAZ0ONn1Mk4bmKToW8ToE8' + 'ylld3rLlWDjjoQP8mP05Izg3mguLHXUhikUL8MD5NdYyeZJ1XZ' + '0OZ5S9uncurYj2ABWJoVaq/tFCdCEo9bbjWsePei26GZjaM3Fx' + 'RkAICXe/bt6/uLgaPZtO+sdARDuU3DRKMIdgM9NBaIn0kC7Wk4' + 'bnYShZ/rbhVt2/ds5XinnDBZsxSR3s553DixJ9v6w9Db++9Stw' + '4DgePd9lLy+6WuVBlKmcNflx9zg7US0AOarX2UNiQ==', + 'kdf_length': 32, + 'kdf_salt': 'MYH68QH48nRFMWH44piFWqBnKtU8KCz6Ajh24otrvzJlqPgB' + 'v6bvFJjRvjRp/0/v1j2nt40RZ6H5hfoKmore0g==\n', + 'length': 1024, + 'cipher': 'aes256', + } + } + } + + def setUp(self): + class Soledad(object): + passphrase = '123' + soledad = Soledad() + self._crypto = SecretsCrypto(soledad) + + def test__get_key(self): + salt = 'abc' + expected = scrypt.hash('123', salt, buflen=32) + key = self._crypto._get_key(salt) + self.assertEqual(expected, key) + + def test_encrypt(self): + info = self._crypto.encrypt(self.SECRETS) + self.assertEqual(8, len(info)) + for key, value in [ + ('kdf', 'scrypt'), + ('kdf_salt', None), + ('kdf_length', None), + ('cipher', ENC_METHOD.aes_256_gcm), + ('length', None), + ('iv', None), + ('secrets', None), + ('version', 2)]: + self.assertTrue(key in info) + if value: + self.assertEqual(info[key], value) + + def test__decrypt_v2(self): + encrypted = self.ENCRYPTED_V2 + decrypted = self._crypto.decrypt(encrypted) + self.assertEqual(decrypted, self.SECRETS) + + def test__decrypt_v1(self): + encrypted = self.ENCRYPTED_V1 + decrypted = self._crypto.decrypt(encrypted) + self.assertEqual(decrypted, self.SECRETS) + + def test__no_version_defaults_to_v1(self): + encrypted = dict(self.ENCRYPTED_V1) + del encrypted['version'] + decrypted = self._crypto.decrypt(encrypted) + self.assertEqual(decrypted, self.SECRETS) + self.assertEqual(encrypted['version'], 1) diff --git a/tests/client/test_shared_db.py b/tests/client/test_shared_db.py new file mode 100644 index 00000000..b045e524 --- /dev/null +++ b/tests/client/test_shared_db.py @@ -0,0 +1,40 @@ +from leap.soledad.common.document import SoledadDocument +from leap.soledad.client.shared_db import SoledadSharedDatabase + +from test_soledad.util import BaseSoledadTest + + +class SoledadSharedDBTestCase(BaseSoledadTest): + + """ + These tests ensure the functionalities of the shared recovery database. + """ + + def setUp(self): + BaseSoledadTest.setUp(self) + self._shared_db = SoledadSharedDatabase( + 'https://provider/', document_factory=SoledadDocument, + creds=None) + + def tearDown(self): + BaseSoledadTest.tearDown(self) + + def test__get_remote_doc(self): + """ + Ensure the shared db is queried with the correct doc_id. + """ + doc_id = self._soledad.secrets.storage._remote_doc_id() + self._soledad.secrets.storage._get_remote_doc() + self._soledad.secrets.storage._shared_db.get_doc.assert_called_with( + doc_id) + + def test_save_remote(self): + """ + Ensure recovery document is put into shared recover db. + """ + doc_id = self._soledad.secrets.storage._remote_doc_id() + storage = self._soledad.secrets.storage + storage.save_remote({'content': 'blah'}) + storage._shared_db.get_doc.assert_called_with(doc_id) + storage._shared_db.put_doc.assert_called_with(self._doc_put) + self.assertTrue(self._doc_put.doc_id == doc_id) diff --git a/tests/client/test_signals.py b/tests/client/test_signals.py new file mode 100644 index 00000000..c7609a74 --- /dev/null +++ b/tests/client/test_signals.py @@ -0,0 +1,149 @@ +from mock import Mock +from twisted.internet import defer + +from leap import soledad +from leap.common.events import catalog +from leap.soledad.common.document import SoledadDocument + +from test_soledad.util import ADDRESS +from test_soledad.util import BaseSoledadTest + + +class SoledadSignalingTestCase(BaseSoledadTest): + + """ + These tests ensure signals are correctly emmited by Soledad. + """ + + EVENTS_SERVER_PORT = 8090 + + def setUp(self): + # mock signaling + soledad.client.signal = Mock() + soledad.client._secrets.util.events.emit_async = Mock() + # run parent's setUp + BaseSoledadTest.setUp(self) + + def tearDown(self): + BaseSoledadTest.tearDown(self) + + def _pop_mock_call(self, mocked): + mocked.call_args_list.pop() + mocked.mock_calls.pop() + mocked.call_args = mocked.call_args_list[-1] + + def test_stage3_bootstrap_signals(self): + """ + Test that a fresh soledad emits all bootstrap signals. + + Signals are: + - downloading keys / done downloading keys. + - creating keys / done creating keys. + - downloading keys / done downloading keys. + - uploading keys / done uploading keys. + """ + soledad.client._secrets.util.events.emit_async.reset_mock() + # get a fresh instance so it emits all bootstrap signals + sol = self._soledad_instance( + secrets_path='alternative_stage3.json', + local_db_path='alternative_stage3.u1db') + # reverse call order so we can verify in the order the signals were + # expected + soledad.client._secrets.util.events.emit_async.mock_calls.reverse() + soledad.client._secrets.util.events.emit_async.call_args = \ + soledad.client._secrets.util.events.emit_async.call_args_list[0] + soledad.client._secrets.util.events.emit_async.call_args_list.reverse() + + user_data = {'userid': ADDRESS, 'uuid': ADDRESS} + + def _assert(*args, **kwargs): + mocked = soledad.client._secrets.util.events.emit_async + mocked.assert_called_with(*args) + pop = kwargs.get('pop') + if pop or pop is None: + self._pop_mock_call(mocked) + + _assert(catalog.SOLEDAD_DOWNLOADING_KEYS, user_data) + _assert(catalog.SOLEDAD_DONE_DOWNLOADING_KEYS, user_data) + _assert(catalog.SOLEDAD_CREATING_KEYS, user_data) + _assert(catalog.SOLEDAD_DONE_CREATING_KEYS, user_data) + _assert(catalog.SOLEDAD_UPLOADING_KEYS, user_data) + _assert(catalog.SOLEDAD_DOWNLOADING_KEYS, user_data) + _assert(catalog.SOLEDAD_DONE_DOWNLOADING_KEYS, user_data) + _assert(catalog.SOLEDAD_DONE_UPLOADING_KEYS, user_data, pop=False) + + sol.close() + + def test_stage2_bootstrap_signals(self): + """ + Test that if there are keys in server, soledad will download them and + emit corresponding signals. + """ + # get existing instance so we have access to keys + sol = self._soledad_instance() + # create a document with secrets + doc = SoledadDocument(doc_id=sol.secrets.storage._remote_doc_id()) + doc.content = sol.secrets.crypto.encrypt(sol.secrets._secrets) + sol.close() + # reset mock + soledad.client._secrets.util.events.emit_async.reset_mock() + # get a fresh instance so it emits all bootstrap signals + shared_db = self.get_default_shared_mock(get_doc_return_value=doc) + sol = self._soledad_instance( + secrets_path='alternative_stage2.json', + local_db_path='alternative_stage2.u1db', + shared_db_class=shared_db) + # reverse call order so we can verify in the order the signals were + # expected + mocked = soledad.client._secrets.util.events.emit_async + mocked.mock_calls.reverse() + mocked.call_args = mocked.call_args_list[0] + mocked.call_args_list.reverse() + + def _assert(*args, **kwargs): + mocked = soledad.client._secrets.util.events.emit_async + mocked.assert_called_with(*args) + pop = kwargs.get('pop') + if pop or pop is None: + self._pop_mock_call(mocked) + + # assert download keys signals + user_data = {'userid': ADDRESS, 'uuid': ADDRESS} + _assert(catalog.SOLEDAD_DOWNLOADING_KEYS, user_data) + _assert(catalog.SOLEDAD_DONE_DOWNLOADING_KEYS, user_data, pop=False) + + sol.close() + + def test_stage1_bootstrap_signals(self): + """ + Test that if soledad already has a local secret, it emits no signals. + """ + soledad.client.signal.reset_mock() + # get an existent instance so it emits only some of bootstrap signals + sol = self._soledad_instance() + self.assertEqual([], soledad.client.signal.mock_calls) + sol.close() + + @defer.inlineCallbacks + def test_sync_signals(self): + """ + Test Soledad emits SOLEDAD_CREATING_KEYS signal. + """ + # get a fresh instance so it emits all bootstrap signals + sol = self._soledad_instance() + soledad.client.signal.reset_mock() + + # mock the actual db sync so soledad does not try to connect to the + # server + d = defer.Deferred() + d.callback(None) + sol._dbsyncer.sync = Mock(return_value=d) + + yield sol.sync() + + # assert the signal has been emitted + soledad.client.events.emit_async.assert_called_with( + catalog.SOLEDAD_DONE_DATA_SYNC, + {'userid': ADDRESS, 'uuid': ADDRESS}, + ) + sol.close() diff --git a/tests/client/test_soledad b/tests/client/test_soledad new file mode 120000 index 00000000..c1a35d32 --- /dev/null +++ b/tests/client/test_soledad @@ -0,0 +1 @@ +../test_soledad \ No newline at end of file diff --git a/tests/client/test_soledad_doc.py b/tests/client/test_soledad_doc.py new file mode 100644 index 00000000..e158d768 --- /dev/null +++ b/tests/client/test_soledad_doc.py @@ -0,0 +1,46 @@ +# -*- coding: utf-8 -*- +# test_soledad_doc.py +# Copyright (C) 2013, 2014 LEAP +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see . +""" +Test Leap backend bits: soledad docs +""" +from testscenarios import TestWithScenarios + +from test_soledad.u1db_tests import test_document +from test_soledad.util import BaseSoledadTest +from test_soledad.util import make_soledad_document_for_test + + +# ----------------------------------------------------------------------------- +# The following tests come from `u1db.tests.test_document`. +# ----------------------------------------------------------------------------- + +class TestSoledadDocument( + TestWithScenarios, + test_document.TestDocument, BaseSoledadTest): + + scenarios = ([( + 'leap', { + 'make_document_for_test': make_soledad_document_for_test})]) + + +class TestSoledadPyDocument( + TestWithScenarios, + test_document.TestPyDocument, BaseSoledadTest): + + scenarios = ([( + 'leap', { + 'make_document_for_test': make_soledad_document_for_test})]) -- cgit v1.2.3