summaryrefslogtreecommitdiff
path: root/tests/client
diff options
context:
space:
mode:
Diffstat (limited to 'tests/client')
-rw-r--r--tests/client/__init__.py0
-rw-r--r--tests/client/test_api.py36
-rw-r--r--tests/client/test_app.py52
-rw-r--r--tests/client/test_attachments.py81
-rw-r--r--tests/client/test_aux_methods.py132
-rw-r--r--tests/client/test_crypto.py384
-rw-r--r--tests/client/test_deprecated_crypto.py94
-rw-r--r--tests/client/test_doc.py50
-rw-r--r--tests/client/test_http.py60
-rw-r--r--tests/client/test_https.py137
-rw-r--r--tests/client/test_incoming_processing_flow.py197
-rw-r--r--tests/client/test_recovery_code.py38
-rw-r--r--tests/client/test_secrets.py166
-rw-r--r--tests/client/test_shared_db.py40
-rw-r--r--tests/client/test_signals.py149
l---------tests/client/test_soledad1
-rw-r--r--tests/client/test_soledad_doc.py46
17 files changed, 1663 insertions, 0 deletions
diff --git a/tests/client/__init__.py b/tests/client/__init__.py
new file mode 100644
index 00000000..e69de29b
--- /dev/null
+++ b/tests/client/__init__.py
diff --git a/tests/client/test_api.py b/tests/client/test_api.py
new file mode 100644
index 00000000..3c6a8155
--- /dev/null
+++ b/tests/client/test_api.py
@@ -0,0 +1,36 @@
+# -*- coding: utf-8 -*-
+# test_api.py
+# Copyright (C) 2017 LEAP
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see <http://www.gnu.org/licenses/>.
+"""
+Tests for soledad api.
+"""
+
+from mock import MagicMock
+
+from test_soledad.util import BaseSoledadTest
+
+
+class ApiTestCase(BaseSoledadTest):
+
+ def test_recovery_code_creation(self):
+ recovery_code_mock = MagicMock()
+ generated_code = '4645a2f8997e5d0d'
+ recovery_code_mock.generate.return_value = generated_code
+ self._soledad._recovery_code = recovery_code_mock
+
+ code = self._soledad.create_recovery_code()
+
+ self.assertEqual(generated_code, code)
diff --git a/tests/client/test_app.py b/tests/client/test_app.py
new file mode 100644
index 00000000..6867473e
--- /dev/null
+++ b/tests/client/test_app.py
@@ -0,0 +1,52 @@
+# -*- coding: utf-8 -*-
+# test_soledad_app.py
+# Copyright (C) 2014 LEAP
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see <http://www.gnu.org/licenses/>.
+"""
+Test ObjectStore and Couch backend bits.
+"""
+import pytest
+
+from testscenarios import TestWithScenarios
+
+from test_soledad.util import BaseSoledadTest
+from test_soledad.util import make_soledad_document_for_test
+from test_soledad.util import make_token_soledad_app
+from test_soledad.util import make_token_http_database_for_test
+from test_soledad.util import copy_token_http_database_for_test
+from test_soledad.u1db_tests import test_backends
+
+
+# -----------------------------------------------------------------------------
+# The following tests come from `u1db.tests.test_backends`.
+# -----------------------------------------------------------------------------
+
+@pytest.mark.usefixtures('method_tmpdir')
+class SoledadTests(
+ TestWithScenarios, test_backends.AllDatabaseTests, BaseSoledadTest):
+
+ def setUp(self):
+ TestWithScenarios.setUp(self)
+ test_backends.AllDatabaseTests.setUp(self)
+ BaseSoledadTest.setUp(self)
+
+ scenarios = [
+ ('token_http', {
+ 'make_database_for_test': make_token_http_database_for_test,
+ 'copy_database_for_test': copy_token_http_database_for_test,
+ 'make_document_for_test': make_soledad_document_for_test,
+ 'make_app_with_state': make_token_soledad_app,
+ })
+ ]
diff --git a/tests/client/test_attachments.py b/tests/client/test_attachments.py
new file mode 100644
index 00000000..2df5b90d
--- /dev/null
+++ b/tests/client/test_attachments.py
@@ -0,0 +1,81 @@
+# -*- coding: utf-8 -*-
+# test_attachments.py
+# Copyright (C) 2017 LEAP
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see <http://www.gnu.org/licenses/>.
+"""
+Tests for document attachments.
+"""
+
+import pytest
+
+from io import BytesIO
+from mock import Mock
+
+from twisted.internet import defer
+from test_soledad.util import BaseSoledadTest
+
+
+from leap.soledad.client import AttachmentStates
+
+
+def mock_response(doc):
+ doc._manager._client.get = Mock(
+ return_value=defer.succeed(Mock(code=200, json=lambda: [])))
+ doc._manager._client.put = Mock(
+ return_value=defer.succeed(Mock(code=200)))
+
+
+@pytest.mark.usefixture('method_tmpdir')
+class AttachmentTests(BaseSoledadTest):
+
+ @defer.inlineCallbacks
+ def test_create_doc_saves_store(self):
+ doc = yield self._soledad.create_doc({})
+ self.assertEqual(self._soledad, doc.store)
+
+ @defer.inlineCallbacks
+ def test_put_attachment(self):
+ doc = yield self._soledad.create_doc({})
+ mock_response(doc)
+ yield doc.put_attachment(BytesIO('test'))
+ local_list = yield doc._manager.local_list()
+ self.assertIn(doc._blob_id, local_list)
+
+ @defer.inlineCallbacks
+ def test_get_attachment(self):
+ doc = yield self._soledad.create_doc({})
+ mock_response(doc)
+ yield doc.put_attachment(BytesIO('test'))
+ fd = yield doc.get_attachment()
+ self.assertEqual('test', fd.read())
+
+ @defer.inlineCallbacks
+ def test_get_attachment_state(self):
+ doc = yield self._soledad.create_doc({})
+ state = yield doc.get_attachment_state()
+ self.assertEqual(AttachmentStates.NONE, state)
+ mock_response(doc)
+ yield doc.put_attachment(BytesIO('test'))
+ state = yield doc.get_attachment_state()
+ self.assertEqual(AttachmentStates.LOCAL, state)
+
+ @defer.inlineCallbacks
+ def test_is_dirty(self):
+ doc = yield self._soledad.create_doc({})
+ dirty = yield doc.is_dirty()
+ self.assertFalse(dirty)
+ doc.content = {'test': True}
+ dirty = yield doc.is_dirty()
+ self.assertTrue(dirty)
diff --git a/tests/client/test_aux_methods.py b/tests/client/test_aux_methods.py
new file mode 100644
index 00000000..1eb676c7
--- /dev/null
+++ b/tests/client/test_aux_methods.py
@@ -0,0 +1,132 @@
+# -*- coding: utf-8 -*-
+# test_soledad.py
+# Copyright (C) 2013 LEAP
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see <http://www.gnu.org/licenses/>.
+"""
+Tests for general Soledad functionality.
+"""
+import os
+
+from pytest import inlineCallbacks
+
+from leap.soledad.client import Soledad
+from leap.soledad.client._db.adbapi import U1DBConnectionPool
+from leap.soledad.client._secrets.util import SecretsError
+
+from test_soledad.util import BaseSoledadTest
+
+
+class AuxMethodsTestCase(BaseSoledadTest):
+
+ def test__init_dirs(self):
+ sol = self._soledad_instance(prefix='_init_dirs')
+ local_db_dir = os.path.dirname(sol.local_db_path)
+ secrets_path = os.path.dirname(sol.secrets_path)
+ self.assertTrue(os.path.isdir(local_db_dir))
+ self.assertTrue(os.path.isdir(secrets_path))
+
+ def _close_soledad(results):
+ sol.close()
+
+ d = sol.create_doc({})
+ d.addCallback(_close_soledad)
+ return d
+
+ def test__init_u1db_sqlcipher_backend(self):
+ sol = self._soledad_instance(prefix='_init_db')
+ self.assertIsInstance(sol._dbpool, U1DBConnectionPool)
+ self.assertTrue(os.path.isfile(sol.local_db_path))
+ sol.close()
+
+ def test__init_config_with_defaults(self):
+ """
+ Test if configuration defaults point to the correct place.
+ """
+
+ class SoledadMock(Soledad):
+
+ def __init__(self):
+ pass
+
+ # instantiate without initializing so we just test
+ # _init_config_with_defaults()
+ sol = SoledadMock()
+ sol.passphrase = u''
+ sol.server_url = ''
+ sol._init_config_with_defaults()
+ # assert value of local_db_path
+ self.assertEquals(
+ os.path.join(sol.default_prefix, 'soledad.u1db'),
+ sol.local_db_path)
+
+ def test__init_config_from_params(self):
+ """
+ Test if configuration is correctly read from file.
+ """
+ sol = self._soledad_instance(
+ 'leap@leap.se',
+ passphrase=u'123',
+ secrets_path='value_3',
+ local_db_path='value_2',
+ server_url='value_1',
+ cert_file=None)
+ self.assertEqual(
+ os.path.join(self.tempdir, 'value_3'),
+ sol.secrets_path)
+ self.assertEqual(
+ os.path.join(self.tempdir, 'value_2'),
+ sol.local_db_path)
+ self.assertEqual('value_1', sol.server_url)
+ sol.close()
+
+ @inlineCallbacks
+ def test_change_passphrase(self):
+ """
+ Test if passphrase can be changed.
+ """
+ prefix = '_change_passphrase'
+ sol = self._soledad_instance(
+ 'leap@leap.se',
+ passphrase=u'123',
+ prefix=prefix,
+ )
+
+ doc1 = yield sol.create_doc({'simple': 'doc'})
+ sol.change_passphrase(u'654321')
+ sol.close()
+
+ with self.assertRaises(SecretsError):
+ self._soledad_instance(
+ 'leap@leap.se',
+ passphrase=u'123',
+ prefix=prefix)
+
+ sol2 = self._soledad_instance(
+ 'leap@leap.se',
+ passphrase=u'654321',
+ prefix=prefix)
+ doc2 = yield sol2.get_doc(doc1.doc_id)
+
+ self.assertEqual(doc1, doc2)
+
+ sol2.close()
+
+ def test_get_passphrase(self):
+ """
+ Assert passphrase getter works fine.
+ """
+ sol = self._soledad_instance()
+ self.assertEqual('123', sol.passphrase)
+ sol.close()
diff --git a/tests/client/test_crypto.py b/tests/client/test_crypto.py
new file mode 100644
index 00000000..5b647b73
--- /dev/null
+++ b/tests/client/test_crypto.py
@@ -0,0 +1,384 @@
+# -*- coding: utf-8 -*-
+# test_crypto.py
+# Copyright (C) 2013 LEAP
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see <http://www.gnu.org/licenses/>.
+"""
+Tests for cryptographic related stuff.
+"""
+import binascii
+import base64
+import json
+import os
+
+from io import BytesIO
+
+import pytest
+
+from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes
+from cryptography.hazmat.backends import default_backend
+from cryptography.exceptions import InvalidTag
+
+from leap.soledad.common.document import SoledadDocument
+from test_soledad.util import BaseSoledadTest
+from leap.soledad.client import _crypto
+from leap.soledad.client import _scrypt
+from leap.soledad.common.blobs import preamble as _preamble
+
+from twisted.trial import unittest
+from twisted.internet import defer
+
+
+snowden1 = (
+ "You can't come up against "
+ "the world's most powerful intelligence "
+ "agencies and not accept the risk. "
+ "If they want to get you, over time "
+ "they will.")
+
+
+class ScryptTest(unittest.TestCase):
+
+ def test_scrypt(self):
+ secret = 'supersikret'
+ salt = 'randomsalt'
+ key = _scrypt.hash(secret, salt, buflen=32)
+ expected = ('47996b569ea58d51ccbcc318d710'
+ 'a537acd28bb7a94615ab8d061d4b2a920f01')
+ assert binascii.b2a_hex(key) == expected
+
+
+class AESTest(unittest.TestCase):
+
+ def test_chunked_encryption(self):
+ key = 'A' * 32
+
+ fd = BytesIO()
+ aes = _crypto.AESWriter(key, _buffer=fd)
+ iv = aes.iv
+
+ data = snowden1
+ block = 16
+
+ for i in range(len(data) / block):
+ chunk = data[i * block:(i + 1) * block]
+ aes.write(chunk)
+ aes.end()
+
+ ciphertext_chunked = fd.getvalue()
+ ciphertext, tag = _aes_encrypt(key, iv, data)
+
+ assert ciphertext_chunked == ciphertext
+
+ def test_decrypt(self):
+ key = 'A' * 32
+ iv = 'A' * 16
+
+ data = snowden1
+ block = 16
+
+ ciphertext, tag = _aes_encrypt(key, iv, data)
+
+ fd = BytesIO()
+ aes = _crypto.AESWriter(key, iv, fd, tag=tag)
+
+ for i in range(len(ciphertext) / block):
+ chunk = ciphertext[i * block:(i + 1) * block]
+ aes.write(chunk)
+ aes.end()
+
+ cleartext_chunked = fd.getvalue()
+ assert cleartext_chunked == data
+
+
+class BlobTestCase(unittest.TestCase):
+
+ class doc_info:
+ doc_id = 'D-deadbeef'
+ rev = '397932e0c77f45fcb7c3732930e7e9b2:1'
+
+ def setUp(self):
+ self.inf = BytesIO(snowden1)
+ self.blob = _crypto.BlobEncryptor(
+ self.doc_info, self.inf,
+ armor=True,
+ secret='A' * 96)
+
+ @defer.inlineCallbacks
+ def test_unarmored_blob_encrypt(self):
+ self.blob.armor = False
+ encrypted = yield self.blob.encrypt()
+
+ decryptor = _crypto.BlobDecryptor(
+ self.doc_info, encrypted, armor=False,
+ secret='A' * 96)
+ decrypted = yield decryptor.decrypt()
+ assert decrypted.getvalue() == snowden1
+
+ @defer.inlineCallbacks
+ def test_default_armored_blob_encrypt(self):
+ encrypted = yield self.blob.encrypt()
+ decode = base64.urlsafe_b64decode
+ assert map(decode, encrypted.getvalue().split())
+
+ @defer.inlineCallbacks
+ def test_blob_encryptor(self):
+ encrypted = yield self.blob.encrypt()
+ preamble, ciphertext = encrypted.getvalue().split()
+ preamble = base64.urlsafe_b64decode(preamble)
+ ciphertext = base64.urlsafe_b64decode(ciphertext)
+ ciphertext = ciphertext[:-16]
+
+ assert len(preamble) == _preamble.PACMAN.size
+ unpacked_data = _preamble.PACMAN.unpack(preamble)
+ magic, sch, meth, ts, iv, doc_id, rev, _ = unpacked_data
+ assert magic == _crypto.MAGIC
+ assert sch == 1
+ assert meth == _crypto.ENC_METHOD.aes_256_gcm
+ assert iv == self.blob.iv
+ assert doc_id == 'D-deadbeef'
+ assert rev == self.doc_info.rev
+
+ aes_key = _crypto._get_sym_key_for_doc(
+ self.doc_info.doc_id, 'A' * 96)
+ assert ciphertext == _aes_encrypt(aes_key, self.blob.iv, snowden1)[0]
+
+ decrypted = _aes_decrypt(aes_key, self.blob.iv, self.blob.tag,
+ ciphertext, preamble)
+ assert str(decrypted) == snowden1
+
+ @defer.inlineCallbacks
+ def test_init_with_preamble_alone(self):
+ ciphertext = yield self.blob.encrypt()
+ preamble = ciphertext.getvalue().split()[0]
+ decryptor = _crypto.BlobDecryptor(
+ self.doc_info, BytesIO(preamble),
+ start_stream=False,
+ secret='A' * 96)
+ assert decryptor._consume_preamble()
+
+ @defer.inlineCallbacks
+ def test_incremental_blob_decryptor(self):
+ ciphertext = yield self.blob.encrypt()
+ preamble, ciphertext = ciphertext.getvalue().split()
+ ciphertext = base64.urlsafe_b64decode(ciphertext)
+
+ decryptor = _crypto.BlobDecryptor(
+ self.doc_info, BytesIO(preamble),
+ start_stream=False,
+ secret='A' * 96,
+ tag=ciphertext[-16:])
+ ciphertext = BytesIO(ciphertext[:-16])
+ chunk = ciphertext.read(10)
+ while chunk:
+ decryptor.write(chunk)
+ chunk = ciphertext.read(10)
+ decrypted = decryptor._end_stream()
+ assert decrypted.getvalue() == snowden1
+
+ @defer.inlineCallbacks
+ def test_blob_decryptor(self):
+ ciphertext = yield self.blob.encrypt()
+
+ decryptor = _crypto.BlobDecryptor(
+ self.doc_info, ciphertext,
+ secret='A' * 96)
+ decrypted = yield decryptor.decrypt()
+ assert decrypted.getvalue() == snowden1
+
+ @defer.inlineCallbacks
+ def test_unarmored_blob_decryptor(self):
+ self.blob.armor = False
+ ciphertext = yield self.blob.encrypt()
+
+ decryptor = _crypto.BlobDecryptor(
+ self.doc_info, ciphertext,
+ armor=False,
+ secret='A' * 96)
+ decrypted = yield decryptor.decrypt()
+ assert decrypted.getvalue() == snowden1
+
+ @defer.inlineCallbacks
+ def test_encrypt_and_decrypt(self):
+ """
+ Check that encrypting and decrypting gives same doc.
+ """
+ crypto = _crypto.SoledadCrypto('A' * 96)
+ payload = {'key': 'someval'}
+ doc1 = SoledadDocument('id1', '1', json.dumps(payload))
+
+ encrypted = yield crypto.encrypt_doc(doc1)
+ assert encrypted != payload
+ assert 'raw' in encrypted
+ doc2 = SoledadDocument('id1', '1')
+ doc2.set_json(encrypted)
+ assert _crypto.is_symmetrically_encrypted(encrypted)
+ decrypted = (yield crypto.decrypt_doc(doc2)).getvalue()
+ assert len(decrypted) != 0
+ assert json.loads(decrypted) == payload
+
+ @defer.inlineCallbacks
+ def test_decrypt_with_wrong_tag_raises(self):
+ """
+ Trying to decrypt a document with wrong MAC should raise.
+ """
+ crypto = _crypto.SoledadCrypto('A' * 96)
+ payload = {'key': 'someval'}
+ doc1 = SoledadDocument('id1', '1', json.dumps(payload))
+
+ encrypted = yield crypto.encrypt_doc(doc1)
+ encdict = json.loads(encrypted)
+ preamble, raw = str(encdict['raw']).split()
+ preamble = base64.urlsafe_b64decode(preamble)
+ raw = base64.urlsafe_b64decode(raw)
+ # mess with tag
+ messed = raw[:-16] + '0' * 16
+
+ preamble = base64.urlsafe_b64encode(preamble)
+ newraw = preamble + ' ' + base64.urlsafe_b64encode(str(messed))
+ doc2 = SoledadDocument('id1', '1')
+ doc2.set_json(json.dumps({"raw": str(newraw)}))
+
+ with pytest.raises(_crypto.InvalidBlob):
+ yield crypto.decrypt_doc(doc2)
+
+
+class SoledadSecretsTestCase(BaseSoledadTest):
+
+ def test_generated_secrets_have_correct_length(self):
+ expected = self._soledad.secrets.lengths
+ for name, length in expected.iteritems():
+ secret = getattr(self._soledad.secrets, name)
+ self.assertEqual(length, len(secret))
+
+
+class SoledadCryptoAESTestCase(BaseSoledadTest):
+
+ def test_encrypt_decrypt_sym(self):
+ # generate 256-bit key
+ key = os.urandom(32)
+ iv, cyphertext = _crypto.encrypt_sym('data', key)
+ self.assertTrue(cyphertext is not None)
+ self.assertTrue(cyphertext != '')
+ self.assertTrue(cyphertext != 'data')
+ plaintext = _crypto.decrypt_sym(cyphertext, key, iv)
+ self.assertEqual('data', plaintext)
+
+ def test_decrypt_with_wrong_iv_raises(self):
+ key = os.urandom(32)
+ iv, cyphertext = _crypto.encrypt_sym('data', key)
+ self.assertTrue(cyphertext is not None)
+ self.assertTrue(cyphertext != '')
+ self.assertTrue(cyphertext != 'data')
+ # get a different iv by changing the first byte
+ rawiv = binascii.a2b_base64(iv)
+ wrongiv = rawiv
+ while wrongiv == rawiv:
+ wrongiv = os.urandom(1) + rawiv[1:]
+ with pytest.raises(InvalidTag):
+ _crypto.decrypt_sym(
+ cyphertext, key, iv=binascii.b2a_base64(wrongiv))
+
+ def test_decrypt_with_wrong_key_raises(self):
+ key = os.urandom(32)
+ iv, cyphertext = _crypto.encrypt_sym('data', key)
+ self.assertTrue(cyphertext is not None)
+ self.assertTrue(cyphertext != '')
+ self.assertTrue(cyphertext != 'data')
+ wrongkey = os.urandom(32) # 256-bits key
+ # ensure keys are different in case we are extremely lucky
+ while wrongkey == key:
+ wrongkey = os.urandom(32)
+ with pytest.raises(InvalidTag):
+ _crypto.decrypt_sym(cyphertext, wrongkey, iv)
+
+
+class PreambleTestCase(unittest.TestCase):
+ class doc_info:
+ doc_id = 'D-deadbeef'
+ rev = '397932e0c77f45fcb7c3732930e7e9b2:1'
+
+ def setUp(self):
+ self.cleartext = BytesIO(snowden1)
+ self.blob = _crypto.BlobEncryptor(
+ self.doc_info, self.cleartext,
+ secret='A' * 96)
+
+ def test_preamble_starts_with_magic_signature(self):
+ preamble = self.blob._encode_preamble()
+ assert preamble.startswith(_crypto.MAGIC)
+
+ def test_preamble_has_cipher_metadata(self):
+ preamble = self.blob._encode_preamble()
+ unpacked = _preamble.PACMAN.unpack(preamble)
+ encryption_scheme, encryption_method = unpacked[1:3]
+ assert encryption_scheme in _crypto.ENC_SCHEME
+ assert encryption_method in _crypto.ENC_METHOD
+ assert unpacked[4] == self.blob.iv
+
+ def test_preamble_has_document_sync_metadata(self):
+ preamble = self.blob._encode_preamble()
+ unpacked = _preamble.PACMAN.unpack(preamble)
+ doc_id, doc_rev = unpacked[5:7]
+ assert doc_id == self.doc_info.doc_id
+ assert doc_rev == self.doc_info.rev
+
+ def test_preamble_has_document_size(self):
+ preamble = self.blob._encode_preamble()
+ unpacked = _preamble.PACMAN.unpack(preamble)
+ size = unpacked[7]
+ assert size == _crypto._ceiling(len(snowden1))
+
+ @defer.inlineCallbacks
+ def test_preamble_can_come_without_size(self):
+ # XXX: This test case is here only to test backwards compatibility!
+ preamble = self.blob._encode_preamble()
+ # repack preamble using legacy format, without doc size
+ unpacked = _preamble.PACMAN.unpack(preamble)
+ preamble_without_size = _preamble.LEGACY_PACMAN.pack(*unpacked[0:7])
+ # encrypt it manually for custom tag
+ ciphertext, tag = _aes_encrypt(self.blob.sym_key, self.blob.iv,
+ self.cleartext.getvalue(),
+ aead=preamble_without_size)
+ ciphertext = ciphertext + tag
+ # encode it
+ ciphertext = base64.urlsafe_b64encode(ciphertext)
+ preamble_without_size = base64.urlsafe_b64encode(preamble_without_size)
+ # decrypt it
+ ciphertext = preamble_without_size + ' ' + ciphertext
+ cleartext = yield _crypto.BlobDecryptor(
+ self.doc_info, BytesIO(ciphertext),
+ secret='A' * 96).decrypt()
+ assert cleartext.getvalue() == self.cleartext.getvalue()
+ warnings = self.flushWarnings()
+ assert len(warnings) == 1
+ assert 'legacy preamble without size' in warnings[0]['message']
+
+
+def _aes_encrypt(key, iv, data, aead=''):
+ backend = default_backend()
+ cipher = Cipher(algorithms.AES(key), modes.GCM(iv), backend=backend)
+ encryptor = cipher.encryptor()
+ if aead:
+ encryptor.authenticate_additional_data(aead)
+ return encryptor.update(data) + encryptor.finalize(), encryptor.tag
+
+
+def _aes_decrypt(key, iv, tag, data, aead=''):
+ backend = default_backend()
+ cipher = Cipher(algorithms.AES(key), modes.GCM(iv, tag), backend=backend)
+ decryptor = cipher.decryptor()
+ if aead:
+ decryptor.authenticate_additional_data(aead)
+ return decryptor.update(data) + decryptor.finalize()
diff --git a/tests/client/test_deprecated_crypto.py b/tests/client/test_deprecated_crypto.py
new file mode 100644
index 00000000..939a2003
--- /dev/null
+++ b/tests/client/test_deprecated_crypto.py
@@ -0,0 +1,94 @@
+import json
+import pytest
+
+from pytest import inlineCallbacks
+from six.moves.urllib.parse import urljoin
+from uuid import uuid4
+
+from leap.soledad.client import crypto as old_crypto
+from leap.soledad.common.couch import CouchDatabase
+from leap.soledad.common import crypto as common_crypto
+
+from test_soledad.u1db_tests import simple_doc
+from test_soledad.util import SoledadWithCouchServerMixin
+from test_soledad.util import make_token_soledad_app
+from test_soledad.u1db_tests import TestCaseWithServer
+
+
+def deprecate_client_crypto(client):
+ secret = client._crypto.secret
+ _crypto = old_crypto.SoledadCrypto(secret)
+ setattr(client._dbsyncer, '_crypto', _crypto)
+ return client
+
+
+def couch_database(couch_url, uuid):
+ db = CouchDatabase(couch_url, "user-%s" % (uuid,))
+ return db
+
+
+class DeprecatedCryptoTest(SoledadWithCouchServerMixin, TestCaseWithServer):
+
+ def setUp(self):
+ SoledadWithCouchServerMixin.setUp(self)
+ TestCaseWithServer.setUp(self)
+
+ def tearDown(self):
+ SoledadWithCouchServerMixin.tearDown(self)
+ TestCaseWithServer.tearDown(self)
+
+ @staticmethod
+ def make_app_with_state(state):
+ return make_token_soledad_app(state)
+
+ @pytest.mark.needs_couch
+ @inlineCallbacks
+ def test_touch_updates_remote_representation(self):
+ self.startTwistedServer()
+ user = 'user-' + uuid4().hex
+ server_url = 'http://%s:%d' % (self.server_address)
+ client = self._soledad_instance(user=user, server_url=server_url)
+ deprecated_client = deprecate_client_crypto(
+ self._soledad_instance(user=user, server_url=server_url))
+
+ self.make_app()
+ remote = self.request_state._create_database(replica_uid=client.uuid)
+ remote = CouchDatabase.open_database(
+ urljoin(self.couch_url, 'user-' + user),
+ create=True)
+
+ # ensure remote db is empty
+ gen, docs = remote.get_all_docs()
+ assert gen == 0
+ assert len(docs) == 0
+
+ # create a doc with deprecated client and sync
+ yield deprecated_client.create_doc(json.loads(simple_doc))
+ yield deprecated_client.sync()
+
+ # check for doc in remote db
+ gen, docs = remote.get_all_docs()
+ assert gen == 1
+ assert len(docs) == 1
+ doc = docs.pop()
+ content = doc.content
+ assert common_crypto.ENC_JSON_KEY in content
+ assert common_crypto.ENC_SCHEME_KEY in content
+ assert common_crypto.ENC_METHOD_KEY in content
+ assert common_crypto.ENC_IV_KEY in content
+ assert common_crypto.MAC_KEY in content
+ assert common_crypto.MAC_METHOD_KEY in content
+
+ # "touch" the document with a newer client and synx
+ _, docs = yield client.get_all_docs()
+ yield client.put_doc(doc)
+ yield client.sync()
+
+ # check for newer representation of doc in remote db
+ gen, docs = remote.get_all_docs()
+ assert gen == 2
+ assert len(docs) == 1
+ doc = docs.pop()
+ content = doc.content
+ assert len(content) == 1
+ assert 'raw' in content
diff --git a/tests/client/test_doc.py b/tests/client/test_doc.py
new file mode 100644
index 00000000..36479e90
--- /dev/null
+++ b/tests/client/test_doc.py
@@ -0,0 +1,50 @@
+# -*- coding: utf-8 -*-
+# test_soledad_doc.py
+# Copyright (C) 2013, 2014 LEAP
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see <http://www.gnu.org/licenses/>.
+"""
+Test Leap backend bits: soledad docs
+"""
+import pytest
+
+from testscenarios import TestWithScenarios
+
+from test_soledad.u1db_tests import test_document
+from test_soledad.util import BaseSoledadTest
+from test_soledad.util import make_soledad_document_for_test
+
+
+# -----------------------------------------------------------------------------
+# The following tests come from `u1db.tests.test_document`.
+# -----------------------------------------------------------------------------
+
+@pytest.mark.usefixtures('method_tmpdir')
+class TestSoledadDocument(
+ TestWithScenarios,
+ test_document.TestDocument, BaseSoledadTest):
+
+ scenarios = ([(
+ 'leap', {
+ 'make_document_for_test': make_soledad_document_for_test})])
+
+
+@pytest.mark.usefixtures('method_tmpdir')
+class TestSoledadPyDocument(
+ TestWithScenarios,
+ test_document.TestPyDocument, BaseSoledadTest):
+
+ scenarios = ([(
+ 'leap', {
+ 'make_document_for_test': make_soledad_document_for_test})])
diff --git a/tests/client/test_http.py b/tests/client/test_http.py
new file mode 100644
index 00000000..47df4b4a
--- /dev/null
+++ b/tests/client/test_http.py
@@ -0,0 +1,60 @@
+# -*- coding: utf-8 -*-
+# test_http.py
+# Copyright (C) 2013, 2014 LEAP
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see <http://www.gnu.org/licenses/>.
+"""
+Test Leap backend bits: test http database
+"""
+
+from twisted.trial import unittest
+
+from leap.soledad.client import auth
+from leap.soledad.common.l2db.remote import http_database
+
+
+# -----------------------------------------------------------------------------
+# The following tests come from `u1db.tests.test_http_database`.
+# -----------------------------------------------------------------------------
+
+class _HTTPDatabase(http_database.HTTPDatabase, auth.TokenBasedAuth):
+
+ """
+ Wraps our token auth implementation.
+ """
+
+ def set_token_credentials(self, uuid, token):
+ auth.TokenBasedAuth.set_token_credentials(self, uuid, token)
+
+ def _sign_request(self, method, url_query, params):
+ return auth.TokenBasedAuth._sign_request(
+ self, method, url_query, params)
+
+
+class TestHTTPDatabaseWithCreds(unittest.TestCase):
+
+ def test_get_sync_target_inherits_token_credentials(self):
+ # this test was from TestDatabaseSimpleOperations but we put it here
+ # for convenience.
+ self.db = _HTTPDatabase('dbase')
+ self.db.set_token_credentials('user-uuid', 'auth-token')
+ st = self.db.get_sync_target()
+ self.assertEqual(self.db._creds, st._creds)
+
+ def test_ctr_with_creds(self):
+ db1 = _HTTPDatabase('http://dbs/db', creds={'token': {
+ 'uuid': 'user-uuid',
+ 'token': 'auth-token',
+ }})
+ self.assertIn('token', db1._creds)
diff --git a/tests/client/test_https.py b/tests/client/test_https.py
new file mode 100644
index 00000000..1b6caed6
--- /dev/null
+++ b/tests/client/test_https.py
@@ -0,0 +1,137 @@
+# -*- coding: utf-8 -*-
+# test_sync_target.py
+# Copyright (C) 2013, 2014 LEAP
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see <http://www.gnu.org/licenses/>.
+"""
+Test Leap backend bits: https
+"""
+import pytest
+
+from testscenarios import TestWithScenarios
+
+from leap.soledad import client
+
+from leap.soledad.common.l2db.remote import http_client
+from test_soledad.u1db_tests import test_backends
+from test_soledad.u1db_tests import test_https
+from test_soledad.util import (
+ BaseSoledadTest,
+ make_soledad_document_for_test,
+ make_soledad_app,
+ make_token_soledad_app,
+)
+
+
+LEAP_SCENARIOS = [
+ ('http', {
+ 'make_database_for_test': test_backends.make_http_database_for_test,
+ 'copy_database_for_test': test_backends.copy_http_database_for_test,
+ 'make_document_for_test': make_soledad_document_for_test,
+ 'make_app_with_state': make_soledad_app}),
+]
+
+
+# -----------------------------------------------------------------------------
+# The following tests come from `u1db.tests.test_https`.
+# -----------------------------------------------------------------------------
+
+def token_leap_https_sync_target(test, host, path, cert_file=None):
+ _, port = test.server.server_address
+ # source_replica_uid = test._soledad._dbpool.replica_uid
+ creds = {'token': {'uuid': 'user-uuid', 'token': 'auth-token'}}
+ if not cert_file:
+ cert_file = test.cacert_pem
+ st = client.http_target.SoledadHTTPSyncTarget(
+ 'https://%s:%d/%s' % (host, port, path),
+ source_replica_uid='other-id',
+ creds=creds,
+ crypto=test._soledad._crypto,
+ cert_file=cert_file)
+ return st
+
+
+@pytest.mark.skip
+class TestSoledadHTTPSyncTargetHttpsSupport(
+ TestWithScenarios,
+ # test_https.TestHttpSyncTargetHttpsSupport,
+ BaseSoledadTest):
+
+ scenarios = [
+ ('token_soledad_https',
+ {
+ # 'server_def': test_https.https_server_def,
+ 'make_app_with_state': make_token_soledad_app,
+ 'make_document_for_test': make_soledad_document_for_test,
+ 'sync_target': token_leap_https_sync_target}),
+ ]
+
+ def setUp(self):
+ # the parent constructor undoes our SSL monkey patch to ensure tests
+ # run smoothly with standard u1db.
+ test_https.TestHttpSyncTargetHttpsSupport.setUp(self)
+ # so here monkey patch again to test our functionality.
+ api = client.api
+ http_client._VerifiedHTTPSConnection = api.VerifiedHTTPSConnection
+ client.api.SOLEDAD_CERT = http_client.CA_CERTS
+
+ def test_cannot_verify_cert(self):
+ self.startServer()
+ # don't print expected traceback server-side
+ self.server.handle_error = lambda req, cli_addr: None
+ self.request_state._create_database('test')
+ remote_target = self.getSyncTarget(
+ 'localhost', 'test', cert_file=http_client.CA_CERTS)
+ d = remote_target.record_sync_info('other-id', 2, 'T-id')
+
+ def _assert_raises(result):
+ from twisted.python.failure import Failure
+ if isinstance(result, Failure):
+ from OpenSSL.SSL import Error
+ error = result.value.message[0].value
+ if isinstance(error, Error):
+ msg = error.message[0][2]
+ self.assertEqual("certificate verify failed", msg)
+ return
+ self.fail("certificate verification should have failed.")
+
+ d.addCallbacks(_assert_raises, _assert_raises)
+ return d
+
+ def test_working(self):
+ """
+ Test that SSL connections work well.
+
+ This test was adapted to patch Soledad's HTTPS connection custom class
+ with the intended CA certificates.
+ """
+ self.startServer()
+ db = self.request_state._create_database('test')
+ remote_target = self.getSyncTarget('localhost', 'test')
+ d = remote_target.record_sync_info('other-id', 2, 'T-id')
+ d.addCallback(lambda _:
+ self.assertEqual(
+ (2, 'T-id'),
+ db._get_replica_gen_and_trans_id('other-id')
+ ))
+ d.addCallback(lambda _: remote_target.close())
+ return d
+
+ def test_host_mismatch(self):
+ """
+ This test is disabled because soledad's twisted-based http agent uses
+ pyOpenSSL, which will complain if we try to use an IP to connect to
+ the remote host (see the original test in u1db_tests/test_https.py).
+ """
+ pass
diff --git a/tests/client/test_incoming_processing_flow.py b/tests/client/test_incoming_processing_flow.py
new file mode 100644
index 00000000..7bc1e3c6
--- /dev/null
+++ b/tests/client/test_incoming_processing_flow.py
@@ -0,0 +1,197 @@
+# -*- coding: utf-8 -*-
+# test_incoming_processing_flow.py
+# Copyright (C) 2017 LEAP
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see <http://www.gnu.org/licenses/>.
+"""
+Unit tests for incoming box processing flow.
+"""
+from mock import Mock, call
+from leap.soledad.client import interfaces
+from leap.soledad.client.incoming import IncomingBoxProcessingLoop
+from twisted.internet import defer
+from twisted.trial import unittest
+from zope.interface import implementer
+
+
+@implementer(interfaces.IIncomingBoxConsumer)
+class GoodConsumer(object):
+ def __init__(self):
+ self.name = 'GoodConsumer'
+ self.processed, self.saved = [], []
+
+ def process(self, item, item_id, encrypted=True):
+ self.processed.append(item_id)
+ return defer.succeed([item_id])
+
+ def save(self, parts, item_id):
+ self.saved.append(item_id)
+ return defer.succeed(None)
+
+
+class ProcessingFailedConsumer(GoodConsumer):
+ def __init__(self):
+ self.name = 'ProcessingFailedConsumer'
+ self.processed, self.saved = [], []
+
+ def process(self, item, item_id, encrypted=True):
+ return defer.fail()
+
+
+class SavingFailedConsumer(GoodConsumer):
+ def __init__(self):
+ self.name = 'SavingFailedConsumer'
+ self.processed, self.saved = [], []
+
+ def save(self, parts, item_id):
+ return defer.fail()
+
+
+class IncomingBoxProcessingTestCase(unittest.TestCase):
+
+ def setUp(self):
+ self.box = Mock()
+ self.loop = IncomingBoxProcessingLoop(self.box)
+
+ def _set_pending_items(self, pending):
+ self.box.list_pending.return_value = defer.succeed(pending)
+ pending_iter = iter([defer.succeed(item) for item in pending])
+ self.box.fetch_for_processing.side_effect = pending_iter
+
+ @defer.inlineCallbacks
+ def test_processing_flow_reserves_a_message(self):
+ self._set_pending_items(['one_item'])
+ self.loop.add_consumer(GoodConsumer())
+ yield self.loop()
+ self.box.fetch_for_processing.assert_called_once_with('one_item')
+
+ @defer.inlineCallbacks
+ def test_no_consumers(self):
+ items = ['one', 'two', 'three']
+ self._set_pending_items(items)
+ yield self.loop()
+ self.box.fetch_for_processing.assert_not_called()
+ self.box.delete.assert_not_called()
+
+ @defer.inlineCallbacks
+ def test_pending_list_with_multiple_items(self):
+ items = ['one', 'two', 'three']
+ self._set_pending_items(items)
+ consumer = GoodConsumer()
+ self.loop.add_consumer(consumer)
+ yield self.loop()
+ calls = [call('one'), call('two'), call('three')]
+ self.box.fetch_for_processing.assert_has_calls(calls)
+
+ @defer.inlineCallbacks
+ def test_good_consumer_process_all(self):
+ items = ['one', 'two', 'three']
+ self._set_pending_items(items)
+ consumer = GoodConsumer()
+ self.loop.add_consumer(consumer)
+ yield self.loop()
+ self.assertEquals(items, consumer.processed)
+
+ @defer.inlineCallbacks
+ def test_good_consumer_saves_all(self):
+ items = ['one', 'two', 'three']
+ self._set_pending_items(items)
+ consumer = GoodConsumer()
+ self.loop.add_consumer(consumer)
+ yield self.loop()
+ self.assertEquals(items, consumer.saved)
+
+ @defer.inlineCallbacks
+ def test_multiple_good_consumers_process_all(self):
+ items = ['one', 'two', 'three']
+ self._set_pending_items(items)
+ consumer = GoodConsumer()
+ consumer2 = GoodConsumer()
+ self.loop.add_consumer(consumer)
+ self.loop.add_consumer(consumer2)
+ yield self.loop()
+ self.assertEquals(items, consumer.processed)
+ self.assertEquals(items, consumer2.processed)
+
+ @defer.inlineCallbacks
+ def test_good_consumer_marks_as_processed(self):
+ items = ['one', 'two', 'three']
+ self._set_pending_items(items)
+ consumer = GoodConsumer()
+ self.loop.add_consumer(consumer)
+ yield self.loop()
+ self.box.set_processed.assert_has_calls([call(x) for x in items])
+
+ @defer.inlineCallbacks
+ def test_good_consumer_deletes_items(self):
+ items = ['one', 'two', 'three']
+ self._set_pending_items(items)
+ consumer = GoodConsumer()
+ self.loop.add_consumer(consumer)
+ yield self.loop()
+ self.box.delete.assert_has_calls([call(x) for x in items])
+
+ @defer.inlineCallbacks
+ def test_processing_failed_doesnt_mark_as_processed(self):
+ items = ['one', 'two', 'three']
+ self._set_pending_items(items)
+ consumer = ProcessingFailedConsumer()
+ self.loop.add_consumer(consumer)
+ yield self.loop()
+ self.box.set_processed.assert_not_called()
+
+ @defer.inlineCallbacks
+ def test_processing_failed_doesnt_delete(self):
+ items = ['one', 'two', 'three']
+ self._set_pending_items(items)
+ consumer = ProcessingFailedConsumer()
+ self.loop.add_consumer(consumer)
+ yield self.loop()
+ self.box.delete.assert_not_called()
+
+ @defer.inlineCallbacks
+ def test_processing_failed_marks_as_failed(self):
+ items = ['one', 'two', 'three']
+ self._set_pending_items(items)
+ consumer = ProcessingFailedConsumer()
+ self.loop.add_consumer(consumer)
+ yield self.loop()
+ self.box.set_failed.assert_has_calls([call(x) for x in items])
+
+ @defer.inlineCallbacks
+ def test_saving_failed_marks_as_processed(self):
+ items = ['one', 'two', 'three']
+ self._set_pending_items(items)
+ consumer = SavingFailedConsumer()
+ self.loop.add_consumer(consumer)
+ yield self.loop()
+ self.box.set_processed.assert_has_calls([call(x) for x in items])
+
+ @defer.inlineCallbacks
+ def test_saving_failed_doesnt_delete(self):
+ items = ['one', 'two', 'three']
+ self._set_pending_items(items)
+ consumer = SavingFailedConsumer()
+ self.loop.add_consumer(consumer)
+ yield self.loop()
+ self.box.delete.assert_not_called()
+
+ @defer.inlineCallbacks
+ def test_saving_failed_marks_as_failed(self):
+ items = ['one', 'two', 'three']
+ self._set_pending_items(items)
+ consumer = SavingFailedConsumer()
+ self.loop.add_consumer(consumer)
+ yield self.loop()
+ self.box.set_failed.assert_has_calls([call(x) for x in items])
diff --git a/tests/client/test_recovery_code.py b/tests/client/test_recovery_code.py
new file mode 100644
index 00000000..7bbccc41
--- /dev/null
+++ b/tests/client/test_recovery_code.py
@@ -0,0 +1,38 @@
+# -*- CODing: utf-8 -*-
+# test_recovery_code.py
+# Copyright (C) 2017 LEAP
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see <http://www.gnu.org/licenses/>.
+"""
+Tests for recovery code generation.
+"""
+import binascii
+
+from mock import patch
+from twisted.trial import unittest
+from leap.soledad.client._recovery_code import RecoveryCode
+
+
+class RecoveryCodeTestCase(unittest.TestCase):
+
+ @patch('leap.soledad.client._recovery_code.os.urandom')
+ def test_generate_recovery_code(self, mock_os_urandom):
+ generated_random_code = '123456'
+ mock_os_urandom.return_value = generated_random_code
+ recovery_code = RecoveryCode()
+
+ code = recovery_code.generate()
+
+ mock_os_urandom.assert_called_with(RecoveryCode.code_length)
+ self.assertEqual(binascii.hexlify(generated_random_code), code)
diff --git a/tests/client/test_secrets.py b/tests/client/test_secrets.py
new file mode 100644
index 00000000..7b643cb4
--- /dev/null
+++ b/tests/client/test_secrets.py
@@ -0,0 +1,166 @@
+# -*- CODing: utf-8 -*-
+# test_secrets.py
+# Copyright (C) 2017 LEAP
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see <http://www.gnu.org/licenses/>.
+"""
+Tests for secrets encryption and decryption.
+"""
+import scrypt
+
+from twisted.trial import unittest
+
+from leap.soledad.client._crypto import ENC_METHOD
+from leap.soledad.client._secrets import SecretsCrypto
+
+
+class SecretsCryptoTestCase(unittest.TestCase):
+
+ SECRETS = {
+ 'remote_secret': 'a' * 512,
+ 'local_salt': 'b' * 64,
+ 'local_secret': 'c' * 448
+ }
+ ENCRYPTED_V2 = {
+ 'cipher': 2,
+ 'length': 1437,
+ 'kdf_salt': 'TSgNLeAGFeITeSgNzmYZHh+mzmkZPOqao7CAV/tx3KZCLwsrT0HmWtVK3'
+ 'TyWHWNgVdeamMZYRuvZavE2sp0DGw==\n',
+ 'iv': 'TKZQKIlRgdnXFhJf08qswg==',
+ 'secrets': 'ZNZRi72VDtwZqyuU+uf3yzZt23vCtMS3Ki2bnZyeHUOSGVweJeDadF4oqE'
+ 'BW87NN00j9E49BzyzLr9SNgwZjPp0wlUm7kt+s8EUfJUdH8nxaQ+9iqGXM'
+ 'cCHmBM8L8DRN2m3BrPGx7m+QGlN9sbrRpl7fqc46RWcYuTEpm4upjdtI7O'
+ 'jDd0JG3C0rUzIuKJn9w4rEpX3tLEKXVdZfLvRXS5roR0cauazsDO69E13q'
+ 'a01vDuY+UJ+buLQ3FluPnnk8QE7ztPVUmRJJ76yAIhjVX9owiwlp9GnUJY'
+ 'sETRCqdRSTwUcHIkzVR0zAvtxTX7eGTitzf4gCYEC4T9v5N/jHxEfPdx28'
+ 'MM4KShWN2nFxNFQLQUpMN2OrM7UyUw+DQ3ydqBeBPKPHRN5s05kIK7P/Ra'
+ 'aLNcrJWa7DopLbgLlei0Jd7S4sjv1ufaRY7v0qJaVkhh/VaCylTSVw1rv5'
+ 'YzSWcHHcLuC0R8xLadz6T+EpsVYxgPYCS7w5xoE82zwNQzw/EBxLIcyLPl'
+ 'ipKnr2dttrmm3KXUOT1IdbSbI5elF6yQTAusdqiXuypey+MDqHYWEYWkCn'
+ 'e9/uGM9FjklDLE0RtPEDxhq64tw6u2Xu7RzDzyQDI8EIoTdU+4zEMTnelZ'
+ 'fKEwdG58EDxTXfUk6IDcRUupz3YuToSMhIOkqgXnbWl/nrK0O9v4JMhQjI'
+ 'r+oPICYfFr14kvJXBsfntILTJCxzbqTQcNba3jc8rGqCZ6gM0u4PndwTG2'
+ 'UiCqPU2HMnWvVGQOXeLdQY+EqqXQiRDi0DrDmkVwFf+27dPXxmZ43C48W3'
+ 'lMhTKXl0rdBFnOD5jiMh0X6q/KYXonyEtMZMsjT7dFePcCy4wQRhuut+ac'
+ '/TJWyrr+/IB45E+LZbhV7xCy1dYsbdb52jTRJFpaQ83sj6Iv6SYdiqqXzL'
+ 'F5JGMyuovTjwAoIIQzpLv36xY2wGGAH1V8c7QmDR2qumXrHD9R68WjBoSY'
+ '7IFM0TFAGZNun56y/zQ4r8yOMSAId+j4kuRH0fENEi0FJ+SpmSdHfpvBhE'
+ 'MdGh927E9enEYWmUQMmkxXIw6E+O3cmOWt2hsMbUAikDCpQOnVP2BD55HT'
+ '6FfbW7ITVwPINHYexmy2Xcm8H5zzGFSp+uYIyPBYDKA+VJ+QQI8bud9K+T'
+ 'NBybUv9u6LbB6BsLpwLoxMPJu0WsN2HpmLYgrg2ML1huMF1OtaGRuUr2PL'
+ 'NBaZaL6VOztYrVtQG1+tNyRxn8XQTtx0l6n+EihGVe9Sk5XF6DJA9ZN7uO'
+ 'svTUFJ5qG3Erf4AmbUJWoOR/NvapBtifidM7gPZZ6NqBs6v72rU1pGy+p7'
+ 'o84KrmB2MNf3yJ0BvKxPvFmltF3Dc7LB5TN8ycbmFM6hgrLvvhPxiHEnG/'
+ '8Qcrg0nUXOipFGNgZEU7t7Mz6RJ189Z2Kx1HVGrkAzEgqwZYqijAPlsgzO'
+ 'bg6DwzwC7stolQWGCDQUtJVlE8FZ/Up8zFYYZKn52WzjmSN4/hHhEvdkck'
+ 'Nez/JVev6fMcVrgdrTZ+uCwxjN/4xPdgog2HV470ea1bvIkSNOOrhm194M'
+ '40GmvmBhiSSMjdRQCQtM0t9bUuSQLPDzEiCA9QaLyygtlz9uRR/dXgkEg0'
+ 'J4YNpZvhE0wbyp4GHytbPaAmrcd7im9+buTuMwhXpZl0stmfkJxVHJSZ8Y'
+ 'IHakHs3W1fdYyI3wxGpel/9eYO3ISukolwrHXESP65wVNKfBwbqVJzQmts'
+ 'pyDBOI6DcLKZfE1EVg0+uwQ/5PKZbn0TwlXO1YE3NL3mAply3zQR9hyBrY'
+ '6f1jkHVD3irIlWkSiPJsP8sW+nrK8c/Ha8F+dua6DTZmg594OIaQj8mPiY'
+ 'GcIusiARWocR5/MmSjupGOgFx4HtmckTJtAta3XP4elOx04teH/P9Cgr1x'
+ 'XYf+cEX6gp92L9rTo0FCz3Hw==\n',
+ 'version': 2,
+ 'kdf': 'scrypt',
+ 'kdf_length': 32
+ }
+
+ ENCRYPTED_V1 = {
+ 'version': 1,
+ 'active_secret': 'secret_id',
+ 'storage_secrets': {
+ 'secret_id': {
+ 'kdf': 'scrypt',
+ 'secret': 'u31ObvxNU8jB0HgMj3TVwQ==:JQwlYq6sAQmHYS3x2CJzObT9h'
+ 'j1iiHthvrMh887qedNCcOfJyCA3jpRkc0vjd2Qk/2HSJ+JxM2F'
+ 'MrPzzx5O34EHlgF2scen34guZRRIf42WpnMy+PrL4cnMlZLgCh'
+ 'H1Jz6wcIMEpU9LQ8OaCShk1/yJ6qcVHOV4DDt3mTF7ttiqI5cp'
+ 'msaVtxxYCcpxFiWSeSCEgr0h4/Ih1qHuM6vk+CQjf/zg1f/7HR'
+ 'imIyNYXit9Fw3YTkxBen1wG3f5L7OAODRTuqnWpkQFOmclx050'
+ 'k0frKRcX6UWhIOWpW2mqJXnvzDtQQVGzqIdSgGTGtUDGQ7Onnc'
+ 'NkUlSnuVC7PkDNNRuwit3pCB9YWBWyPAQgs0kLqoV4YcuSctz6'
+ 'SAf76ozdcK5/SrOzutOfyPag4V3AYKMv6rCKALJ10OnFJ61FL9'
+ 'kd6JZam7WOlEUXyO7Gdgvz+eKiQMTZXbtO2kAKqel513MedPXC'
+ 'dzajUe1U2JaGg86UdiDWoPYOiWxnAPwfNJk+1QuNy5NZ7PaMtF'
+ 'IKT3/Xema2U8mufS0FbvJyK2flP1VUWcCzHKTSqX6+kU7UpoWa'
+ 'hYa7PlO40El+putTQLBmNaEeaWFngO+XB4TReICHSiCdcAb3pw'
+ 'sabjtxt+OpK4vbj3yBSfpiZTpVbEjt9U/tUpVp/T2M66lMi3ZC'
+ 'oHLlhu45Zo0aEq3UmQ/WBXu6EkO2eLYz2br9YQwRbSJ6z5CHmu'
+ 'hjKBQlpvGNfZYObx5lY4o6Ab4f/N8gyukskjmAFAf7Fr8cEog/'
+ 'oxmbagoCtUGRYJp2paooqH8L6xXp0Y8+23g7WJaAIr1i4V4aKS'
+ 'r9x7iUK6prcZTtMJZEHCswkLN/+DU6/FX3YZcOjseC+Qv3P+9v'
+ 'zIDp/92KJzqVqITGwrsc6ZsglMW37qxs6albtw3lMWSHlkcLbj'
+ 'Xf/iHPeKnb2WNLdkFNQ1J5OaTJR+E1CrXN+pm1JtB6XaUbaLGV'
+ 'CGUo13lAPVDtXcPbo64kMrQtQu4m9m8X8t8tfuJmINfwBnrKzk'
+ 'O6pl+LwimFaFEArV6wcaMxmwi0lM7mt4U1u9OIQjghQ/dEmOyV'
+ 'dZBnvyG7T/oRuLdUyZ/QGXZMlPQ3lAZ0ONn1Mk4bmKToW8ToE8'
+ 'ylld3rLlWDjjoQP8mP05Izg3mguLHXUhikUL8MD5NdYyeZJ1XZ'
+ '0OZ5S9uncurYj2ABWJoVaq/tFCdCEo9bbjWsePei26GZjaM3Fx'
+ 'RkAICXe/bt6/uLgaPZtO+sdARDuU3DRKMIdgM9NBaIn0kC7Wk4'
+ 'bnYShZ/rbhVt2/ds5XinnDBZsxSR3s553DixJ9v6w9Db++9Stw'
+ '4DgePd9lLy+6WuVBlKmcNflx9zg7US0AOarX2UNiQ==',
+ 'kdf_length': 32,
+ 'kdf_salt': 'MYH68QH48nRFMWH44piFWqBnKtU8KCz6Ajh24otrvzJlqPgB'
+ 'v6bvFJjRvjRp/0/v1j2nt40RZ6H5hfoKmore0g==\n',
+ 'length': 1024,
+ 'cipher': 'aes256',
+ }
+ }
+ }
+
+ def setUp(self):
+ class Soledad(object):
+ passphrase = '123'
+ soledad = Soledad()
+ self._crypto = SecretsCrypto(soledad)
+
+ def test__get_key(self):
+ salt = 'abc'
+ expected = scrypt.hash('123', salt, buflen=32)
+ key = self._crypto._get_key(salt)
+ self.assertEqual(expected, key)
+
+ def test_encrypt(self):
+ info = self._crypto.encrypt(self.SECRETS)
+ self.assertEqual(8, len(info))
+ for key, value in [
+ ('kdf', 'scrypt'),
+ ('kdf_salt', None),
+ ('kdf_length', None),
+ ('cipher', ENC_METHOD.aes_256_gcm),
+ ('length', None),
+ ('iv', None),
+ ('secrets', None),
+ ('version', 2)]:
+ self.assertTrue(key in info)
+ if value:
+ self.assertEqual(info[key], value)
+
+ def test__decrypt_v2(self):
+ encrypted = self.ENCRYPTED_V2
+ decrypted = self._crypto.decrypt(encrypted)
+ self.assertEqual(decrypted, self.SECRETS)
+
+ def test__decrypt_v1(self):
+ encrypted = self.ENCRYPTED_V1
+ decrypted = self._crypto.decrypt(encrypted)
+ self.assertEqual(decrypted, self.SECRETS)
+
+ def test__no_version_defaults_to_v1(self):
+ encrypted = dict(self.ENCRYPTED_V1)
+ del encrypted['version']
+ decrypted = self._crypto.decrypt(encrypted)
+ self.assertEqual(decrypted, self.SECRETS)
+ self.assertEqual(encrypted['version'], 1)
diff --git a/tests/client/test_shared_db.py b/tests/client/test_shared_db.py
new file mode 100644
index 00000000..b045e524
--- /dev/null
+++ b/tests/client/test_shared_db.py
@@ -0,0 +1,40 @@
+from leap.soledad.common.document import SoledadDocument
+from leap.soledad.client.shared_db import SoledadSharedDatabase
+
+from test_soledad.util import BaseSoledadTest
+
+
+class SoledadSharedDBTestCase(BaseSoledadTest):
+
+ """
+ These tests ensure the functionalities of the shared recovery database.
+ """
+
+ def setUp(self):
+ BaseSoledadTest.setUp(self)
+ self._shared_db = SoledadSharedDatabase(
+ 'https://provider/', document_factory=SoledadDocument,
+ creds=None)
+
+ def tearDown(self):
+ BaseSoledadTest.tearDown(self)
+
+ def test__get_remote_doc(self):
+ """
+ Ensure the shared db is queried with the correct doc_id.
+ """
+ doc_id = self._soledad.secrets.storage._remote_doc_id()
+ self._soledad.secrets.storage._get_remote_doc()
+ self._soledad.secrets.storage._shared_db.get_doc.assert_called_with(
+ doc_id)
+
+ def test_save_remote(self):
+ """
+ Ensure recovery document is put into shared recover db.
+ """
+ doc_id = self._soledad.secrets.storage._remote_doc_id()
+ storage = self._soledad.secrets.storage
+ storage.save_remote({'content': 'blah'})
+ storage._shared_db.get_doc.assert_called_with(doc_id)
+ storage._shared_db.put_doc.assert_called_with(self._doc_put)
+ self.assertTrue(self._doc_put.doc_id == doc_id)
diff --git a/tests/client/test_signals.py b/tests/client/test_signals.py
new file mode 100644
index 00000000..c7609a74
--- /dev/null
+++ b/tests/client/test_signals.py
@@ -0,0 +1,149 @@
+from mock import Mock
+from twisted.internet import defer
+
+from leap import soledad
+from leap.common.events import catalog
+from leap.soledad.common.document import SoledadDocument
+
+from test_soledad.util import ADDRESS
+from test_soledad.util import BaseSoledadTest
+
+
+class SoledadSignalingTestCase(BaseSoledadTest):
+
+ """
+ These tests ensure signals are correctly emmited by Soledad.
+ """
+
+ EVENTS_SERVER_PORT = 8090
+
+ def setUp(self):
+ # mock signaling
+ soledad.client.signal = Mock()
+ soledad.client._secrets.util.events.emit_async = Mock()
+ # run parent's setUp
+ BaseSoledadTest.setUp(self)
+
+ def tearDown(self):
+ BaseSoledadTest.tearDown(self)
+
+ def _pop_mock_call(self, mocked):
+ mocked.call_args_list.pop()
+ mocked.mock_calls.pop()
+ mocked.call_args = mocked.call_args_list[-1]
+
+ def test_stage3_bootstrap_signals(self):
+ """
+ Test that a fresh soledad emits all bootstrap signals.
+
+ Signals are:
+ - downloading keys / done downloading keys.
+ - creating keys / done creating keys.
+ - downloading keys / done downloading keys.
+ - uploading keys / done uploading keys.
+ """
+ soledad.client._secrets.util.events.emit_async.reset_mock()
+ # get a fresh instance so it emits all bootstrap signals
+ sol = self._soledad_instance(
+ secrets_path='alternative_stage3.json',
+ local_db_path='alternative_stage3.u1db')
+ # reverse call order so we can verify in the order the signals were
+ # expected
+ soledad.client._secrets.util.events.emit_async.mock_calls.reverse()
+ soledad.client._secrets.util.events.emit_async.call_args = \
+ soledad.client._secrets.util.events.emit_async.call_args_list[0]
+ soledad.client._secrets.util.events.emit_async.call_args_list.reverse()
+
+ user_data = {'userid': ADDRESS, 'uuid': ADDRESS}
+
+ def _assert(*args, **kwargs):
+ mocked = soledad.client._secrets.util.events.emit_async
+ mocked.assert_called_with(*args)
+ pop = kwargs.get('pop')
+ if pop or pop is None:
+ self._pop_mock_call(mocked)
+
+ _assert(catalog.SOLEDAD_DOWNLOADING_KEYS, user_data)
+ _assert(catalog.SOLEDAD_DONE_DOWNLOADING_KEYS, user_data)
+ _assert(catalog.SOLEDAD_CREATING_KEYS, user_data)
+ _assert(catalog.SOLEDAD_DONE_CREATING_KEYS, user_data)
+ _assert(catalog.SOLEDAD_UPLOADING_KEYS, user_data)
+ _assert(catalog.SOLEDAD_DOWNLOADING_KEYS, user_data)
+ _assert(catalog.SOLEDAD_DONE_DOWNLOADING_KEYS, user_data)
+ _assert(catalog.SOLEDAD_DONE_UPLOADING_KEYS, user_data, pop=False)
+
+ sol.close()
+
+ def test_stage2_bootstrap_signals(self):
+ """
+ Test that if there are keys in server, soledad will download them and
+ emit corresponding signals.
+ """
+ # get existing instance so we have access to keys
+ sol = self._soledad_instance()
+ # create a document with secrets
+ doc = SoledadDocument(doc_id=sol.secrets.storage._remote_doc_id())
+ doc.content = sol.secrets.crypto.encrypt(sol.secrets._secrets)
+ sol.close()
+ # reset mock
+ soledad.client._secrets.util.events.emit_async.reset_mock()
+ # get a fresh instance so it emits all bootstrap signals
+ shared_db = self.get_default_shared_mock(get_doc_return_value=doc)
+ sol = self._soledad_instance(
+ secrets_path='alternative_stage2.json',
+ local_db_path='alternative_stage2.u1db',
+ shared_db_class=shared_db)
+ # reverse call order so we can verify in the order the signals were
+ # expected
+ mocked = soledad.client._secrets.util.events.emit_async
+ mocked.mock_calls.reverse()
+ mocked.call_args = mocked.call_args_list[0]
+ mocked.call_args_list.reverse()
+
+ def _assert(*args, **kwargs):
+ mocked = soledad.client._secrets.util.events.emit_async
+ mocked.assert_called_with(*args)
+ pop = kwargs.get('pop')
+ if pop or pop is None:
+ self._pop_mock_call(mocked)
+
+ # assert download keys signals
+ user_data = {'userid': ADDRESS, 'uuid': ADDRESS}
+ _assert(catalog.SOLEDAD_DOWNLOADING_KEYS, user_data)
+ _assert(catalog.SOLEDAD_DONE_DOWNLOADING_KEYS, user_data, pop=False)
+
+ sol.close()
+
+ def test_stage1_bootstrap_signals(self):
+ """
+ Test that if soledad already has a local secret, it emits no signals.
+ """
+ soledad.client.signal.reset_mock()
+ # get an existent instance so it emits only some of bootstrap signals
+ sol = self._soledad_instance()
+ self.assertEqual([], soledad.client.signal.mock_calls)
+ sol.close()
+
+ @defer.inlineCallbacks
+ def test_sync_signals(self):
+ """
+ Test Soledad emits SOLEDAD_CREATING_KEYS signal.
+ """
+ # get a fresh instance so it emits all bootstrap signals
+ sol = self._soledad_instance()
+ soledad.client.signal.reset_mock()
+
+ # mock the actual db sync so soledad does not try to connect to the
+ # server
+ d = defer.Deferred()
+ d.callback(None)
+ sol._dbsyncer.sync = Mock(return_value=d)
+
+ yield sol.sync()
+
+ # assert the signal has been emitted
+ soledad.client.events.emit_async.assert_called_with(
+ catalog.SOLEDAD_DONE_DATA_SYNC,
+ {'userid': ADDRESS, 'uuid': ADDRESS},
+ )
+ sol.close()
diff --git a/tests/client/test_soledad b/tests/client/test_soledad
new file mode 120000
index 00000000..c1a35d32
--- /dev/null
+++ b/tests/client/test_soledad
@@ -0,0 +1 @@
+../test_soledad \ No newline at end of file
diff --git a/tests/client/test_soledad_doc.py b/tests/client/test_soledad_doc.py
new file mode 100644
index 00000000..e158d768
--- /dev/null
+++ b/tests/client/test_soledad_doc.py
@@ -0,0 +1,46 @@
+# -*- coding: utf-8 -*-
+# test_soledad_doc.py
+# Copyright (C) 2013, 2014 LEAP
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see <http://www.gnu.org/licenses/>.
+"""
+Test Leap backend bits: soledad docs
+"""
+from testscenarios import TestWithScenarios
+
+from test_soledad.u1db_tests import test_document
+from test_soledad.util import BaseSoledadTest
+from test_soledad.util import make_soledad_document_for_test
+
+
+# -----------------------------------------------------------------------------
+# The following tests come from `u1db.tests.test_document`.
+# -----------------------------------------------------------------------------
+
+class TestSoledadDocument(
+ TestWithScenarios,
+ test_document.TestDocument, BaseSoledadTest):
+
+ scenarios = ([(
+ 'leap', {
+ 'make_document_for_test': make_soledad_document_for_test})])
+
+
+class TestSoledadPyDocument(
+ TestWithScenarios,
+ test_document.TestPyDocument, BaseSoledadTest):
+
+ scenarios = ([(
+ 'leap', {
+ 'make_document_for_test': make_soledad_document_for_test})])