summaryrefslogtreecommitdiff
path: root/src/leap
diff options
context:
space:
mode:
Diffstat (limited to 'src/leap')
-rw-r--r--src/leap/soledad/__init__.py3
-rw-r--r--src/leap/soledad/tests/__init__.py7
-rw-r--r--src/leap/soledad/tests/test_crypto.py2
-rw-r--r--src/leap/soledad/tests/test_leap_backend.py2
-rw-r--r--src/leap/soledad/tests/test_soledad.py166
-rw-r--r--src/leap/soledad/tests/test_sqlcipher.py2
6 files changed, 172 insertions, 10 deletions
diff --git a/src/leap/soledad/__init__.py b/src/leap/soledad/__init__.py
index b847364f..c7f8cff3 100644
--- a/src/leap/soledad/__init__.py
+++ b/src/leap/soledad/__init__.py
@@ -50,7 +50,6 @@ from u1db.remote.ssl_match_hostname import ( # noqa
from leap.common import events
from leap.common.check import leap_assert
from leap.common.files import mkdir_p
-from leap.common.keymanager.errors import DecryptionFailed
from leap.soledad.backends import sqlcipher
from leap.soledad.backends.leap_backend import (
LeapDocument,
@@ -450,7 +449,7 @@ class Soledad(object):
key = scrypt.hash(self._passphrase, salt, buflen=32)
iv, ciphertext = self._crypto.encrypt_sym(secret, key)
self._secrets[secret_id] = {
- # leap.common.keymanager.openpgp uses AES256 for symmetric
+ # leap.soledad.crypto submodule uses AES256 for symmetric
# encryption.
self.KDF_KEY: 'scrypt', # TODO: remove hard coded kdf
self.KDF_SALT_KEY: binascii.b2a_base64(salt),
diff --git a/src/leap/soledad/tests/__init__.py b/src/leap/soledad/tests/__init__.py
index c00fb847..9fec5530 100644
--- a/src/leap/soledad/tests/__init__.py
+++ b/src/leap/soledad/tests/__init__.py
@@ -22,6 +22,9 @@ from leap.common.testing.basetest import BaseLeapTest
# instance in each test.
#-----------------------------------------------------------------------------
+ADDRESS = 'leap@leap.se'
+
+
class BaseSoledadTest(BaseLeapTest):
"""
Instantiates Soledad for usage in tests.
@@ -31,7 +34,7 @@ class BaseSoledadTest(BaseLeapTest):
# config info
self.db1_file = os.path.join(self.tempdir, "db1.u1db")
self.db2_file = os.path.join(self.tempdir, "db2.u1db")
- self.email = 'leap@leap.se'
+ self.email = ADDRESS
# open test dbs
self._db1 = u1db.open(self.db1_file, create=True,
document_factory=LeapDocument)
@@ -48,7 +51,7 @@ class BaseSoledadTest(BaseLeapTest):
os.unlink(f)
self._soledad.close()
- def _soledad_instance(self, user='leap@leap.se', passphrase='123',
+ def _soledad_instance(self, user=ADDRESS, passphrase='123',
prefix='',
secrets_path=Soledad.STORAGE_SECRETS_FILE_NAME,
local_db_path='soledad.u1db', server_url='',
diff --git a/src/leap/soledad/tests/test_crypto.py b/src/leap/soledad/tests/test_crypto.py
index 5432856e..ae84dad3 100644
--- a/src/leap/soledad/tests/test_crypto.py
+++ b/src/leap/soledad/tests/test_crypto.py
@@ -233,7 +233,6 @@ class CryptoMethodsTestCase(BaseSoledadTest):
self.assertTrue(
secret_id_2 == hashlib.sha256(sol.storage_secret).hexdigest())
-
def test__has_secret(self):
sol = self._soledad_instance(user='user@leap.se')
self.assertTrue(sol._has_secret(), "Should have a secret at "
@@ -248,7 +247,6 @@ class CryptoMethodsTestCase(BaseSoledadTest):
self.assertFalse(sol._has_secret())
-
class MacAuthTestCase(BaseSoledadTest):
def test_decrypt_with_wrong_mac_raises(self):
diff --git a/src/leap/soledad/tests/test_leap_backend.py b/src/leap/soledad/tests/test_leap_backend.py
index d04ee412..6914e869 100644
--- a/src/leap/soledad/tests/test_leap_backend.py
+++ b/src/leap/soledad/tests/test_leap_backend.py
@@ -688,7 +688,7 @@ class LeapDatabaseSyncTargetTests(
self.assertEqual(
[(doc.doc_id, doc.rev, 1),
(doc2.doc_id, doc2.rev, 2)],
- [c[:-3]+c[-2:-1] for c in self.other_changes])
+ [c[:-3] + c[-2:-1] for c in self.other_changes])
self.assertEqual(
json.loads(tests.simple_doc),
json.loads(self.other_changes[0][2]))
diff --git a/src/leap/soledad/tests/test_soledad.py b/src/leap/soledad/tests/test_soledad.py
index 1c0e6d4a..09711f19 100644
--- a/src/leap/soledad/tests/test_soledad.py
+++ b/src/leap/soledad/tests/test_soledad.py
@@ -25,14 +25,29 @@ import os
import re
import tempfile
import simplejson as json
+from mock import Mock
from leap.common.testing.basetest import BaseLeapTest
-from leap.soledad.tests import BaseSoledadTest
+from leap.common.events import (
+ server,
+ component,
+ events_pb2 as proto,
+ register,
+ signal,
+)
+from leap.soledad.tests import (
+ BaseSoledadTest,
+ ADDRESS,
+)
+from leap import soledad
from leap.soledad import Soledad
from leap.soledad.crypto import SoledadCrypto
from leap.soledad.shared_db import SoledadSharedDatabase
-from leap.soledad.backends.leap_backend import LeapDocument
+from leap.soledad.backends.leap_backend import (
+ LeapDocument,
+ LeapSyncTarget,
+)
class AuxMethodsTestCase(BaseSoledadTest):
@@ -138,3 +153,150 @@ class SoledadSharedDBTestCase(BaseSoledadTest):
self.assertTrue(
self._doc_put.doc_id == doc_id,
'Wrong doc_id when putting recovery document.')
+
+
+class SoledadSignalingTestCase(BaseSoledadTest):
+ """
+ These tests ensure signals are correctly emmited by Soledad.
+ """
+
+ EVENTS_SERVER_PORT = 8090
+
+ def setUp(self):
+ BaseSoledadTest.setUp(self)
+ # mock signaling
+ soledad.events.signal = Mock()
+
+ def tearDown(self):
+ pass
+
+ def _pop_mock_call(self, mocked):
+ mocked.call_args_list.pop()
+ mocked.mock_calls.pop()
+ mocked.call_args = mocked.call_args_list[-1]
+
+ def test_stage2_bootstrap_signals(self):
+ """
+ Test that a fresh soledad emits all bootstrap signals.
+ """
+ soledad.events.signal.reset_mock()
+ # get a fresh instance so it emits all bootstrap signals
+ sol = self._soledad_instance(
+ secrets_path='alternative.json',
+ local_db_path='alternative.u1db')
+ # reverse call order so we can verify in the order the signals were
+ # expected
+ soledad.events.signal.mock_calls.reverse()
+ soledad.events.signal.call_args = \
+ soledad.events.signal.call_args_list[0]
+ soledad.events.signal.call_args_list.reverse()
+ # assert signals
+ soledad.events.signal.assert_called_with(
+ proto.SOLEDAD_DOWNLOADING_KEYS,
+ ADDRESS,
+ )
+ self._pop_mock_call(soledad.events.signal)
+ soledad.events.signal.assert_called_with(
+ proto.SOLEDAD_DONE_DOWNLOADING_KEYS,
+ ADDRESS,
+ )
+ self._pop_mock_call(soledad.events.signal)
+ soledad.events.signal.assert_called_with(
+ proto.SOLEDAD_CREATING_KEYS,
+ ADDRESS,
+ )
+ self._pop_mock_call(soledad.events.signal)
+ soledad.events.signal.assert_called_with(
+ proto.SOLEDAD_DONE_CREATING_KEYS,
+ ADDRESS,
+ )
+ self._pop_mock_call(soledad.events.signal)
+ soledad.events.signal.assert_called_with(
+ proto.SOLEDAD_DOWNLOADING_KEYS,
+ ADDRESS,
+ )
+ self._pop_mock_call(soledad.events.signal)
+ soledad.events.signal.assert_called_with(
+ proto.SOLEDAD_DONE_DOWNLOADING_KEYS,
+ ADDRESS,
+ )
+ self._pop_mock_call(soledad.events.signal)
+ soledad.events.signal.assert_called_with(
+ proto.SOLEDAD_UPLOADING_KEYS,
+ ADDRESS,
+ )
+ self._pop_mock_call(soledad.events.signal)
+ soledad.events.signal.assert_called_with(
+ proto.SOLEDAD_DONE_UPLOADING_KEYS,
+ ADDRESS,
+ )
+
+ def test_stage1_bootstrap_signals(self):
+ """
+ Test that an existent soledad emits some of the bootstrap signals.
+ """
+ soledad.events.signal.reset_mock()
+ # get an existent instance so it emits only some of bootstrap signals
+ sol = self._soledad_instance()
+ # reverse call order so we can verify in the order the signals were
+ # expected
+ soledad.events.signal.mock_calls.reverse()
+ soledad.events.signal.call_args = \
+ soledad.events.signal.call_args_list[0]
+ soledad.events.signal.call_args_list.reverse()
+ # assert signals
+ soledad.events.signal.assert_called_with(
+ proto.SOLEDAD_DOWNLOADING_KEYS,
+ ADDRESS,
+ )
+ self._pop_mock_call(soledad.events.signal)
+ soledad.events.signal.assert_called_with(
+ proto.SOLEDAD_DONE_DOWNLOADING_KEYS,
+ ADDRESS,
+ )
+ self._pop_mock_call(soledad.events.signal)
+ soledad.events.signal.assert_called_with(
+ proto.SOLEDAD_UPLOADING_KEYS,
+ ADDRESS,
+ )
+ self._pop_mock_call(soledad.events.signal)
+ soledad.events.signal.assert_called_with(
+ proto.SOLEDAD_DONE_UPLOADING_KEYS,
+ ADDRESS,
+ )
+
+ def test_sync_signals(self):
+ """
+ Test Soledad emits SOLEDAD_CREATING_KEYS signal.
+ """
+ soledad.events.signal.reset_mock()
+ # get a fresh instance so it emits all bootstrap signals
+ sol = self._soledad_instance()
+ # mock the actual db sync so soledad does not try to connect to the
+ # server
+ sol._db.sync = Mock()
+ # do the sync
+ sol.sync()
+ # assert the signal has been emitted
+ soledad.events.signal.assert_called_with(
+ proto.SOLEDAD_DONE_DATA_SYNC,
+ ADDRESS,
+ )
+
+ def test_need_sync_signals(self):
+ """
+ Test Soledad emits SOLEDAD_CREATING_KEYS signal.
+ """
+ soledad.events.signal.reset_mock()
+ sol = self._soledad_instance()
+ # mock the sync target
+ LeapSyncTarget.get_sync_info = Mock(return_value=[0, 0, 0, 0, 2])
+ # mock our generation so soledad thinks there's new data to sync
+ sol._db._get_generation = Mock(return_value=1)
+ # check for new data to sync
+ sol.need_sync('http://provider/userdb')
+ # assert the signal has been emitted
+ soledad.events.signal.assert_called_with(
+ proto.SOLEDAD_NEW_DATA_TO_SYNC,
+ ADDRESS,
+ )
diff --git a/src/leap/soledad/tests/test_sqlcipher.py b/src/leap/soledad/tests/test_sqlcipher.py
index dfc5577b..9741bd4e 100644
--- a/src/leap/soledad/tests/test_sqlcipher.py
+++ b/src/leap/soledad/tests/test_sqlcipher.py
@@ -745,7 +745,7 @@ class SQLCipherSyncTargetTests(
self.assertEqual(
[(doc.doc_id, doc.rev, 1),
(doc2.doc_id, doc2.rev, 2)],
- [c[:2]+c[3:4] for c in self.other_changes])
+ [c[:2] + c[3:4] for c in self.other_changes])
self.assertEqual(
json.dumps(tests.simple_doc),
json.dumps(self.other_changes[0][2]))