summaryrefslogtreecommitdiff
path: root/testing
diff options
context:
space:
mode:
authordrebs <drebs@leap.se>2016-12-05 09:10:47 -0200
committerKali Kaneko <kali@leap.se>2017-02-09 17:41:26 +0100
commitff85c2a41fe933d9959fb84a0df2a13a6e199cec (patch)
tree6f52cae13589aa13675f662f40f5040056575ef1 /testing
parent8a463796bbaba3979234b0699d140947581421e7 (diff)
[refactor] improve secrets generation and storage code
Diffstat (limited to 'testing')
-rw-r--r--testing/tests/client/test_aux_methods.py27
-rw-r--r--testing/tests/client/test_crypto.py227
-rw-r--r--testing/tests/client/test_deprecated_crypto.py6
-rw-r--r--testing/tests/client/test_shared_db.py38
-rw-r--r--testing/tests/client/test_signals.py98
-rw-r--r--testing/tests/sync/test_sync_target.py2
6 files changed, 203 insertions, 195 deletions
diff --git a/testing/tests/client/test_aux_methods.py b/testing/tests/client/test_aux_methods.py
index 9b4a175f..a08f7d36 100644
--- a/testing/tests/client/test_aux_methods.py
+++ b/testing/tests/client/test_aux_methods.py
@@ -19,12 +19,11 @@ Tests for general Soledad functionality.
"""
import os
-from twisted.internet import defer
+from pytest import inlineCallbacks
from leap.soledad.client import Soledad
from leap.soledad.client.adbapi import U1DBConnectionPool
-from leap.soledad.client.secrets import PassphraseTooShort
-from leap.soledad.client.secrets import SecretsException
+from leap.soledad.client._secrets.util import SecretsError
from test_soledad.util import BaseSoledadTest
@@ -34,7 +33,7 @@ class AuxMethodsTestCase(BaseSoledadTest):
def test__init_dirs(self):
sol = self._soledad_instance(prefix='_init_dirs')
local_db_dir = os.path.dirname(sol.local_db_path)
- secrets_path = os.path.dirname(sol.secrets.secrets_path)
+ secrets_path = os.path.dirname(sol.secrets.storage._local_path)
self.assertTrue(os.path.isdir(local_db_dir))
self.assertTrue(os.path.isdir(secrets_path))
@@ -85,14 +84,14 @@ class AuxMethodsTestCase(BaseSoledadTest):
cert_file=None)
self.assertEqual(
os.path.join(self.tempdir, 'value_3'),
- sol.secrets.secrets_path)
+ sol.secrets.storage._local_path)
self.assertEqual(
os.path.join(self.tempdir, 'value_2'),
sol.local_db_path)
self.assertEqual('value_1', sol._server_url)
sol.close()
- @defer.inlineCallbacks
+ @inlineCallbacks
def test_change_passphrase(self):
"""
Test if passphrase can be changed.
@@ -108,7 +107,7 @@ class AuxMethodsTestCase(BaseSoledadTest):
sol.change_passphrase(u'654321')
sol.close()
- with self.assertRaises(SecretsException):
+ with self.assertRaises(SecretsError):
self._soledad_instance(
'leap@leap.se',
passphrase=u'123',
@@ -124,20 +123,6 @@ class AuxMethodsTestCase(BaseSoledadTest):
sol2.close()
- def test_change_passphrase_with_short_passphrase_raises(self):
- """
- Test if attempt to change passphrase passing a short passphrase
- raises.
- """
- sol = self._soledad_instance(
- 'leap@leap.se',
- passphrase=u'123')
- # check that soledad complains about new passphrase length
- self.assertRaises(
- PassphraseTooShort,
- sol.change_passphrase, u'54321')
- sol.close()
-
def test_get_passphrase(self):
"""
Assert passphrase getter works fine.
diff --git a/testing/tests/client/test_crypto.py b/testing/tests/client/test_crypto.py
index 49a61438..379475cd 100644
--- a/testing/tests/client/test_crypto.py
+++ b/testing/tests/client/test_crypto.py
@@ -19,9 +19,9 @@ Tests for cryptographic related stuff.
"""
import binascii
import base64
-import hashlib
import json
import os
+import scrypt
from io import BytesIO
@@ -34,6 +34,7 @@ from cryptography.exceptions import InvalidTag
from leap.soledad.common.document import SoledadDocument
from test_soledad.util import BaseSoledadTest
from leap.soledad.client import _crypto
+from leap.soledad.client._secrets import SecretsCrypto
from twisted.trial import unittest
from twisted.internet import defer
@@ -186,99 +187,145 @@ class BlobTestCase(unittest.TestCase):
yield crypto.decrypt_doc(doc2)
-class RecoveryDocumentTestCase(BaseSoledadTest):
-
- def test_export_recovery_document_raw(self):
- rd = self._soledad.secrets._export_recovery_document()
- secret_id = rd[self._soledad.secrets.STORAGE_SECRETS_KEY].items()[0][0]
- # assert exported secret is the same
- secret = self._soledad.secrets._decrypt_storage_secret_version_1(
- rd[self._soledad.secrets.STORAGE_SECRETS_KEY][secret_id])
- self.assertEqual(secret_id, self._soledad.secrets._secret_id)
- self.assertEqual(secret, self._soledad.secrets._secrets[secret_id])
- # assert recovery document structure
- encrypted_secret = rd[
- self._soledad.secrets.STORAGE_SECRETS_KEY][secret_id]
- self.assertTrue(self._soledad.secrets.CIPHER_KEY in encrypted_secret)
- self.assertEquals(
- _crypto.ENC_METHOD.aes_256_gcm,
- encrypted_secret[self._soledad.secrets.CIPHER_KEY])
- self.assertTrue(self._soledad.secrets.LENGTH_KEY in encrypted_secret)
- self.assertTrue(self._soledad.secrets.SECRET_KEY in encrypted_secret)
-
- def test_import_recovery_document(self, cipher='aes256'):
- rd = self._soledad.secrets._export_recovery_document(cipher)
- s = self._soledad_instance()
- s.secrets._import_recovery_document(rd)
- s.secrets.set_secret_id(self._soledad.secrets._secret_id)
- self.assertEqual(self._soledad.storage_secret,
- s.storage_secret,
- 'Failed settinng secret for symmetric encryption.')
- s.close()
-
- def test_import_GCM_recovery_document(self):
- cipher = self._soledad.secrets.CIPHER_AES256_GCM
- self.test_import_recovery_document(cipher)
-
- def test_import_legacy_CTR_recovery_document(self):
- cipher = self._soledad.secrets.CIPHER_AES256
- self.test_import_recovery_document(cipher)
+class SecretsCryptoTestCase(unittest.TestCase):
+
+ SECRETS = {'remote': 'a' * 512, 'salt': 'b' * 64, 'local': 'c' * 448}
+ ENCRYPTED_V2 = {
+ 'cipher': 'aes_256_gcm',
+ 'length': 1417,
+ 'kdf_salt': '3DCkfecls0GcX2RadA04FAC2cqkI+vpGwwCLwffdRI6vpO5SPxaw/eM0/'
+ 'z3GUADm3If3YCQBldKXNdqHQLsU1Q==\n',
+ 'iv': 'rRwCDw5Rbp5+J3QwjQ46Hw==',
+ 'secrets': 'lxf6yrGDcBr8XFWNDgsCoO2XPGfDJndviL9Y2GmHcSEBWnO2dm2sieuPoq'
+ 'PwSHRSJSrzM4Ezgdaan7X8+ErnuRLUVqbPAqPl8xx8FdCjnid4vFyFYNFI'
+ '/dmo8SQAf8O9vdlVEPZ5Nk2DuWIrh+oPlrSUOmR6XzI0YVdoJDmGWowygU'
+ 'MR0R9Bi9xFGlG135NVcNP8KGdnQDkI0V+U/3qm3tctbo4LRCxxJ60wdi0M'
+ 'DA6iYFI/IMshxI/ZXFHp5/YPk0k2m0i6z71kMVksgjIMMgT5Kmz7WR54na'
+ 'IkWbvNkbYRFR/Hbg9p6Bs7NjJlOLTjnwGJNYPbdyfJXKd1R/S8Mg7ZqsyQ'
+ 'VbBqXHwEN7gYlMZ66D8wu8LOK70mN7LLiSz5J8tXO3rDT1mIIf3IvNhv/j'
+ 'rEZHf1fTFPRp+ZVEt/hJKyPv71ua4p2lgdgNlCs2IsACk9ku/LQwXP6uZr'
+ 'hMJsTvniTQoCVXFYVN/jKo7Pz/+uT5wOXOXtL7smpBE/2r3uoERNM+Zw11'
+ 'SA8UzzMZQMxJQKVNwLmKtwvztN5dxVXhxCUyeLmeQc84VzV7NK0WMUOdfA'
+ '18I0HS6rHLKcdsvrPAdzvGim7tiE8TBdp8ITNQ8yMFNiGNyOVliTSTwQFf'
+ 'sCj6m5nYcjvprNQ8RkeitvicrtI1Ylc8CfFK50xPV77XVmlgvNsfm54msN'
+ 'tV0K5+XwaNgimlh/1m2bVEYj55gO0twVASwRuZj3sSY2z669iuXRk7EPyT'
+ 'jcE2NnfW+lqOQkJ73N7pv73t6OjiEnrKx7VmH94zYlY8ZReVVn4RTZhare'
+ 'D7rqCmGPhsPaCPaAfotfNBBa0w6p6L9ZlNxpIesnMObtyGob1g4Vcu8O6K'
+ '2Q1Ldj95+Q53tJDpx2NLP/5tfAUlbehD3whKwKOz/rGKEfhgE+Nx32RR0y'
+ 'YM4aJ7CYI/U3YH82xqGoa1ufIJbSBt965CVIHSVJt/mYfilhMACV/wBlvL'
+ 'ua08iKpHwc7suMc9DuFS4s/bAzc128L8wtfNvNiP6zhAV+UvfgUmyNKjgl'
+ '0be9Ke2pCNChEQmViNal3zbWNcBrXYQpFpX1lWNkx/OuQalxzSaqmZiOR5'
+ 'eRwqRDZ3R9EpkOFj2ZXS1NlJg1kYXL/ibS8uvjKgJFPrZQzwaKmPNsZyGc'
+ 'CnHupfgC2iRIu97wnvmDxWQ9Cs62NSynr0IYGkTLN5PZU6Z5gd1F7zV6uh'
+ 'oFiHOYidj2EoUj7xnb8GHi5U6PQzaC97nSCR4CFnmcpfv+XcRIWe8nrM8G'
+ 'AVdcUob8pofUlnyGV6GEGlO3mnb7ls5B6lvuZqB/x6UqZiNKwmZvxvS11X'
+ 'AGkhfBGTfFZeqRlLwXvXWnOUOO0KJ8h3gSlc1gFVY+4HCbTOqjUASWw0mV'
+ 'JP+U0anK9wu9B/icLDUZxM/NRdbTQFmcfvABjwdm2GTmwGpQek/H0wN3dO'
+ 'terlTiS7arMUft7A6hkhkmLb0iDfWPWdN50V+XOMpdZtaJSGqwNHokc75p'
+ '3zYll0/ZpxTgmWXariOkKxr6KHHjml89QNQSBE2TJW/YnQ5SrkaHLHKdcy'
+ 'PqQtcXDz/WxKquQfRF+fsvcwqaeqlAWOxUXHU77cBvDGPU5O3uvEIJnHr1'
+ 'kuabqRQbJIV5Uzo4sEW828r2IWQnUd4Om79y+9yp/aT10DusEmvOgS3oSp'
+ '3eYkhvlVULeCQEJoI41t4nGLhHiiK4xBG8yFknuV7nF4k2O+EbyCXsJeeD'
+ 'qlGok91zEhQl1MlQA8ZofRK7bDPcn97USiJMss81s5bwIv4yN8s0QL62Ha'
+ 'vrIYG7C26DV6c0GxULu02H1YOnoPf6JsGC/2+zA+b7a+4O0EP0BXU3FYCb'
+ 'iEDbDpB3dFe63ed+ml2HQjqzOLAtKVXzAQq5UNV4m2zY0/y7gV7qSrM='
+ '\n',
+ 'version': 2,
+ 'kdf': 'scrypt',
+ 'kdf_length': 32
+ }
+
+ ENCRYPTED_V1 = {
+ 'version': 1,
+ 'active_secret': 'secret_id',
+ 'storage_secrets': {
+ 'secret_id': {
+ 'kdf': 'scrypt',
+ 'secret': 'u31ObvxNU8jB0HgMj3TVwQ==:JQwlYq6sAQmHYS3x2CJzObT9h'
+ 'j1iiHthvrMh887qedNCcOfJyCA3jpRkc0vjd2Qk/2HSJ+JxM2F'
+ 'MrPzzx5O34EHlgF2scen34guZRRIf42WpnMy+PrL4cnMlZLgCh'
+ 'H1Jz6wcIMEpU9LQ8OaCShk1/yJ6qcVHOV4DDt3mTF7ttiqI5cp'
+ 'msaVtxxYCcpxFiWSeSCEgr0h4/Ih1qHuM6vk+CQjf/zg1f/7HR'
+ 'imIyNYXit9Fw3YTkxBen1wG3f5L7OAODRTuqnWpkQFOmclx050'
+ 'k0frKRcX6UWhIOWpW2mqJXnvzDtQQVGzqIdSgGTGtUDGQ7Onnc'
+ 'NkUlSnuVC7PkDNNRuwit3pCB9YWBWyPAQgs0kLqoV4YcuSctz6'
+ 'SAf76ozdcK5/SrOzutOfyPag4V3AYKMv6rCKALJ10OnFJ61FL9'
+ 'kd6JZam7WOlEUXyO7Gdgvz+eKiQMTZXbtO2kAKqel513MedPXC'
+ 'dzajUe1U2JaGg86UdiDWoPYOiWxnAPwfNJk+1QuNy5NZ7PaMtF'
+ 'IKT3/Xema2U8mufS0FbvJyK2flP1VUWcCzHKTSqX6+kU7UpoWa'
+ 'hYa7PlO40El+putTQLBmNaEeaWFngO+XB4TReICHSiCdcAb3pw'
+ 'sabjtxt+OpK4vbj3yBSfpiZTpVbEjt9U/tUpVp/T2M66lMi3ZC'
+ 'oHLlhu45Zo0aEq3UmQ/WBXu6EkO2eLYz2br9YQwRbSJ6z5CHmu'
+ 'hjKBQlpvGNfZYObx5lY4o6Ab4f/N8gyukskjmAFAf7Fr8cEog/'
+ 'oxmbagoCtUGRYJp2paooqH8L6xXp0Y8+23g7WJaAIr1i4V4aKS'
+ 'r9x7iUK6prcZTtMJZEHCswkLN/+DU6/FX3YZcOjseC+Qv3P+9v'
+ 'zIDp/92KJzqVqITGwrsc6ZsglMW37qxs6albtw3lMWSHlkcLbj'
+ 'Xf/iHPeKnb2WNLdkFNQ1J5OaTJR+E1CrXN+pm1JtB6XaUbaLGV'
+ 'CGUo13lAPVDtXcPbo64kMrQtQu4m9m8X8t8tfuJmINfwBnrKzk'
+ 'O6pl+LwimFaFEArV6wcaMxmwi0lM7mt4U1u9OIQjghQ/dEmOyV'
+ 'dZBnvyG7T/oRuLdUyZ/QGXZMlPQ3lAZ0ONn1Mk4bmKToW8ToE8'
+ 'ylld3rLlWDjjoQP8mP05Izg3mguLHXUhikUL8MD5NdYyeZJ1XZ'
+ '0OZ5S9uncurYj2ABWJoVaq/tFCdCEo9bbjWsePei26GZjaM3Fx'
+ 'RkAICXe/bt6/uLgaPZtO+sdARDuU3DRKMIdgM9NBaIn0kC7Wk4'
+ 'bnYShZ/rbhVt2/ds5XinnDBZsxSR3s553DixJ9v6w9Db++9Stw'
+ '4DgePd9lLy+6WuVBlKmcNflx9zg7US0AOarX2UNiQ==',
+ 'kdf_length': 32,
+ 'kdf_salt': 'MYH68QH48nRFMWH44piFWqBnKtU8KCz6Ajh24otrvzJlqPgB'
+ 'v6bvFJjRvjRp/0/v1j2nt40RZ6H5hfoKmore0g==\n',
+ 'length': 1024,
+ 'cipher': 'aes256',
+ }
+ }
+ }
+
+ def setUp(self):
+ def _get_pass():
+ return '123'
+ self._crypto = SecretsCrypto(_get_pass)
+
+ def test__get_pass(self):
+ self.assertEqual(self._crypto._get_pass(), '123')
+
+ def test__get_key(self):
+ salt = 'abc'
+ expected = scrypt.hash('123', salt, buflen=32)
+ key = self._crypto._get_key(salt)
+ self.assertEqual(expected, key)
+
+ def test_encrypt(self):
+ info = self._crypto.encrypt(self.SECRETS)
+ self.assertEqual(8, len(info))
+ for key, value in [
+ ('kdf', 'scrypt'),
+ ('kdf_salt', None),
+ ('kdf_length', None),
+ ('cipher', 'aes_256_gcm'),
+ ('length', None),
+ ('iv', None),
+ ('secrets', None),
+ ('version', 2)]:
+ self.assertTrue(key in info)
+ if value:
+ self.assertEqual(info[key], value)
+
+ def test__decrypt_v2(self):
+ encrypted = self.ENCRYPTED_V2
+ decrypted = self._crypto.decrypt(encrypted)
+ self.assertEqual(decrypted, self.SECRETS)
+
+ def test__decrypt_v1(self):
+ encrypted = self.ENCRYPTED_V1
+ decrypted = self._crypto.decrypt(encrypted)
+ self.assertEqual(decrypted, self.SECRETS)
class SoledadSecretsTestCase(BaseSoledadTest):
- def test_new_soledad_instance_generates_one_secret(self):
- self.assertTrue(
- self._soledad.storage_secret is not None,
- "Expected secret to be something different than None")
- number_of_secrets = len(self._soledad.secrets._secrets)
- self.assertTrue(
- number_of_secrets == 1,
- "Expected exactly 1 secret, got %d instead." % number_of_secrets)
-
- def test_generated_secret_is_of_correct_type(self):
- expected_type = str
- self.assertIsInstance(
- self._soledad.storage_secret, expected_type,
- "Expected secret to be of type %s" % expected_type)
-
- def test_generated_secret_has_correct_lengt(self):
- expected_length = self._soledad.secrets.GEN_SECRET_LENGTH
- actual_length = len(self._soledad.storage_secret)
- self.assertTrue(
- expected_length == actual_length,
- "Expected secret with length %d, got %d instead."
- % (expected_length, actual_length))
-
- def test_generated_secret_id_is_sha256_hash_of_secret(self):
- generated = self._soledad.secrets.secret_id
- expected = hashlib.sha256(self._soledad.storage_secret).hexdigest()
- self.assertTrue(
- generated == expected,
- "Expeceted generated secret id to be sha256 hash, got something "
- "else instead.")
-
- def test_generate_new_secret_generates_different_secret_id(self):
- # generate new secret
- secret_id_1 = self._soledad.secrets.secret_id
- secret_id_2 = self._soledad.secrets._gen_secret()
- self.assertTrue(
- len(self._soledad.secrets._secrets) == 2,
- "Expected exactly 2 secrets.")
- self.assertTrue(
- secret_id_1 != secret_id_2,
- "Expected IDs of secrets to be distinct.")
- self.assertTrue(
- secret_id_1 in self._soledad.secrets._secrets,
- "Expected to find ID of first secret in Soledad Secrets.")
- self.assertTrue(
- secret_id_2 in self._soledad.secrets._secrets,
- "Expected to find ID of second secret in Soledad Secrets.")
-
- def test__has_secret(self):
- self.assertTrue(
- self._soledad._secrets._has_secret(),
- "Should have a secret at this point")
+ def test_generated_secrets_have_correct_length(self):
+ expected = self._soledad.secrets.lengths
+ for name, length in expected.iteritems():
+ secret = getattr(self._soledad.secrets, name)
+ self.assertEqual(length, len(secret))
class SoledadCryptoAESTestCase(BaseSoledadTest):
diff --git a/testing/tests/client/test_deprecated_crypto.py b/testing/tests/client/test_deprecated_crypto.py
index 8ee3735c..8c711c22 100644
--- a/testing/tests/client/test_deprecated_crypto.py
+++ b/testing/tests/client/test_deprecated_crypto.py
@@ -1,5 +1,7 @@
import json
-from twisted.internet import defer
+
+from pytest import inlineCallbacks
+
from uuid import uuid4
from urlparse import urljoin
@@ -39,7 +41,7 @@ class DeprecatedCryptoTest(SoledadWithCouchServerMixin, TestCaseWithServer):
def make_app_with_state(state):
return make_token_soledad_app(state)
- @defer.inlineCallbacks
+ @inlineCallbacks
def test_touch_updates_remote_representation(self):
self.startTwistedServer()
user = 'user-' + uuid4().hex
diff --git a/testing/tests/client/test_shared_db.py b/testing/tests/client/test_shared_db.py
index aac766c2..b045e524 100644
--- a/testing/tests/client/test_shared_db.py
+++ b/testing/tests/client/test_shared_db.py
@@ -2,7 +2,6 @@ from leap.soledad.common.document import SoledadDocument
from leap.soledad.client.shared_db import SoledadSharedDatabase
from test_soledad.util import BaseSoledadTest
-from test_soledad.util import ADDRESS
class SoledadSharedDBTestCase(BaseSoledadTest):
@@ -14,37 +13,28 @@ class SoledadSharedDBTestCase(BaseSoledadTest):
def setUp(self):
BaseSoledadTest.setUp(self)
self._shared_db = SoledadSharedDatabase(
- 'https://provider/', ADDRESS, document_factory=SoledadDocument,
+ 'https://provider/', document_factory=SoledadDocument,
creds=None)
def tearDown(self):
BaseSoledadTest.tearDown(self)
- def test__get_secrets_from_shared_db(self):
+ def test__get_remote_doc(self):
"""
Ensure the shared db is queried with the correct doc_id.
"""
- doc_id = self._soledad.secrets._shared_db_doc_id()
- self._soledad.secrets._get_secrets_from_shared_db()
- self.assertTrue(
- self._soledad.shared_db.get_doc.assert_called_with(
- doc_id) is None,
- 'Wrong doc_id when fetching recovery document.')
-
- def test__put_secrets_in_shared_db(self):
+ doc_id = self._soledad.secrets.storage._remote_doc_id()
+ self._soledad.secrets.storage._get_remote_doc()
+ self._soledad.secrets.storage._shared_db.get_doc.assert_called_with(
+ doc_id)
+
+ def test_save_remote(self):
"""
Ensure recovery document is put into shared recover db.
"""
- doc_id = self._soledad.secrets._shared_db_doc_id()
- self._soledad.secrets._put_secrets_in_shared_db()
- self.assertTrue(
- self._soledad.shared_db.get_doc.assert_called_with(
- doc_id) is None,
- 'Wrong doc_id when fetching recovery document.')
- self.assertTrue(
- self._soledad.shared_db.put_doc.assert_called_with(
- self._doc_put) is None,
- 'Wrong document when putting recovery document.')
- self.assertTrue(
- self._doc_put.doc_id == doc_id,
- 'Wrong doc_id when putting recovery document.')
+ doc_id = self._soledad.secrets.storage._remote_doc_id()
+ storage = self._soledad.secrets.storage
+ storage.save_remote({'content': 'blah'})
+ storage._shared_db.get_doc.assert_called_with(doc_id)
+ storage._shared_db.put_doc.assert_called_with(self._doc_put)
+ self.assertTrue(self._doc_put.doc_id == doc_id)
diff --git a/testing/tests/client/test_signals.py b/testing/tests/client/test_signals.py
index 4e9ebfd0..c7609a74 100644
--- a/testing/tests/client/test_signals.py
+++ b/testing/tests/client/test_signals.py
@@ -20,7 +20,7 @@ class SoledadSignalingTestCase(BaseSoledadTest):
def setUp(self):
# mock signaling
soledad.client.signal = Mock()
- soledad.client.secrets.events.emit_async = Mock()
+ soledad.client._secrets.util.events.emit_async = Mock()
# run parent's setUp
BaseSoledadTest.setUp(self)
@@ -42,55 +42,36 @@ class SoledadSignalingTestCase(BaseSoledadTest):
- downloading keys / done downloading keys.
- uploading keys / done uploading keys.
"""
- soledad.client.secrets.events.emit_async.reset_mock()
+ soledad.client._secrets.util.events.emit_async.reset_mock()
# get a fresh instance so it emits all bootstrap signals
sol = self._soledad_instance(
secrets_path='alternative_stage3.json',
local_db_path='alternative_stage3.u1db')
# reverse call order so we can verify in the order the signals were
# expected
- soledad.client.secrets.events.emit_async.mock_calls.reverse()
- soledad.client.secrets.events.emit_async.call_args = \
- soledad.client.secrets.events.emit_async.call_args_list[0]
- soledad.client.secrets.events.emit_async.call_args_list.reverse()
+ soledad.client._secrets.util.events.emit_async.mock_calls.reverse()
+ soledad.client._secrets.util.events.emit_async.call_args = \
+ soledad.client._secrets.util.events.emit_async.call_args_list[0]
+ soledad.client._secrets.util.events.emit_async.call_args_list.reverse()
user_data = {'userid': ADDRESS, 'uuid': ADDRESS}
- # downloading keys signals
- soledad.client.secrets.events.emit_async.assert_called_with(
- catalog.SOLEDAD_DOWNLOADING_KEYS, user_data
- )
- self._pop_mock_call(soledad.client.secrets.events.emit_async)
- soledad.client.secrets.events.emit_async.assert_called_with(
- catalog.SOLEDAD_DONE_DOWNLOADING_KEYS, user_data
- )
- # creating keys signals
- self._pop_mock_call(soledad.client.secrets.events.emit_async)
- soledad.client.secrets.events.emit_async.assert_called_with(
- catalog.SOLEDAD_CREATING_KEYS, user_data
- )
- self._pop_mock_call(soledad.client.secrets.events.emit_async)
- soledad.client.secrets.events.emit_async.assert_called_with(
- catalog.SOLEDAD_DONE_CREATING_KEYS, user_data
- )
- # downloading once more (inside _put_keys_in_shared_db)
- self._pop_mock_call(soledad.client.secrets.events.emit_async)
- soledad.client.secrets.events.emit_async.assert_called_with(
- catalog.SOLEDAD_DOWNLOADING_KEYS, user_data
- )
- self._pop_mock_call(soledad.client.secrets.events.emit_async)
- soledad.client.secrets.events.emit_async.assert_called_with(
- catalog.SOLEDAD_DONE_DOWNLOADING_KEYS, user_data
- )
- # uploading keys signals
- self._pop_mock_call(soledad.client.secrets.events.emit_async)
- soledad.client.secrets.events.emit_async.assert_called_with(
- catalog.SOLEDAD_UPLOADING_KEYS, user_data
- )
- self._pop_mock_call(soledad.client.secrets.events.emit_async)
- soledad.client.secrets.events.emit_async.assert_called_with(
- catalog.SOLEDAD_DONE_UPLOADING_KEYS, user_data
- )
+ def _assert(*args, **kwargs):
+ mocked = soledad.client._secrets.util.events.emit_async
+ mocked.assert_called_with(*args)
+ pop = kwargs.get('pop')
+ if pop or pop is None:
+ self._pop_mock_call(mocked)
+
+ _assert(catalog.SOLEDAD_DOWNLOADING_KEYS, user_data)
+ _assert(catalog.SOLEDAD_DONE_DOWNLOADING_KEYS, user_data)
+ _assert(catalog.SOLEDAD_CREATING_KEYS, user_data)
+ _assert(catalog.SOLEDAD_DONE_CREATING_KEYS, user_data)
+ _assert(catalog.SOLEDAD_UPLOADING_KEYS, user_data)
+ _assert(catalog.SOLEDAD_DOWNLOADING_KEYS, user_data)
+ _assert(catalog.SOLEDAD_DONE_DOWNLOADING_KEYS, user_data)
+ _assert(catalog.SOLEDAD_DONE_UPLOADING_KEYS, user_data, pop=False)
+
sol.close()
def test_stage2_bootstrap_signals(self):
@@ -101,11 +82,11 @@ class SoledadSignalingTestCase(BaseSoledadTest):
# get existing instance so we have access to keys
sol = self._soledad_instance()
# create a document with secrets
- doc = SoledadDocument(doc_id=sol.secrets._shared_db_doc_id())
- doc.content = sol.secrets._export_recovery_document()
+ doc = SoledadDocument(doc_id=sol.secrets.storage._remote_doc_id())
+ doc.content = sol.secrets.crypto.encrypt(sol.secrets._secrets)
sol.close()
# reset mock
- soledad.client.secrets.events.emit_async.reset_mock()
+ soledad.client._secrets.util.events.emit_async.reset_mock()
# get a fresh instance so it emits all bootstrap signals
shared_db = self.get_default_shared_mock(get_doc_return_value=doc)
sol = self._soledad_instance(
@@ -114,20 +95,23 @@ class SoledadSignalingTestCase(BaseSoledadTest):
shared_db_class=shared_db)
# reverse call order so we can verify in the order the signals were
# expected
- soledad.client.secrets.events.emit_async.mock_calls.reverse()
- soledad.client.secrets.events.emit_async.call_args = \
- soledad.client.secrets.events.emit_async.call_args_list[0]
- soledad.client.secrets.events.emit_async.call_args_list.reverse()
+ mocked = soledad.client._secrets.util.events.emit_async
+ mocked.mock_calls.reverse()
+ mocked.call_args = mocked.call_args_list[0]
+ mocked.call_args_list.reverse()
+
+ def _assert(*args, **kwargs):
+ mocked = soledad.client._secrets.util.events.emit_async
+ mocked.assert_called_with(*args)
+ pop = kwargs.get('pop')
+ if pop or pop is None:
+ self._pop_mock_call(mocked)
+
# assert download keys signals
- soledad.client.secrets.events.emit_async.assert_called_with(
- catalog.SOLEDAD_DOWNLOADING_KEYS,
- {'userid': ADDRESS, 'uuid': ADDRESS}
- )
- self._pop_mock_call(soledad.client.secrets.events.emit_async)
- soledad.client.secrets.events.emit_async.assert_called_with(
- catalog.SOLEDAD_DONE_DOWNLOADING_KEYS,
- {'userid': ADDRESS, 'uuid': ADDRESS},
- )
+ user_data = {'userid': ADDRESS, 'uuid': ADDRESS}
+ _assert(catalog.SOLEDAD_DOWNLOADING_KEYS, user_data)
+ _assert(catalog.SOLEDAD_DONE_DOWNLOADING_KEYS, user_data, pop=False)
+
sol.close()
def test_stage1_bootstrap_signals(self):
diff --git a/testing/tests/sync/test_sync_target.py b/testing/tests/sync/test_sync_target.py
index 6ce9a5c5..302a16b8 100644
--- a/testing/tests/sync/test_sync_target.py
+++ b/testing/tests/sync/test_sync_target.py
@@ -838,7 +838,7 @@ class TestSoledadDbSync(
# already created on some setUp method.
import binascii
tohex = binascii.b2a_hex
- key = tohex(self._soledad.secrets.get_local_storage_key())
+ key = tohex(self._soledad.secrets.local)
dbpath = self._soledad._local_db_path
self.opts = SQLCipherOptions(