diff options
author | drebs <drebs@leap.se> | 2016-11-10 23:50:30 -0200 |
---|---|---|
committer | drebs <drebs@leap.se> | 2016-11-10 23:50:30 -0200 |
commit | c1950b41e0995b0213227bd0ce2c633f312037dc (patch) | |
tree | 7c1fde54442fefd3553d33b3fe5a2ec454e0196b /testing/tests/client | |
parent | 507e284773d9c4954225635741f275c5d327e2a9 (diff) | |
parent | 6b23b3f3215f2443aa3e790559b63a41b3040072 (diff) |
Merge tag '0.8.1'
0.8.1
Diffstat (limited to 'testing/tests/client')
-rw-r--r-- | testing/tests/client/__init__.py | 0 | ||||
-rw-r--r-- | testing/tests/client/test_app.py | 44 | ||||
-rw-r--r-- | testing/tests/client/test_aux_methods.py | 147 | ||||
-rw-r--r-- | testing/tests/client/test_crypto.py | 226 | ||||
-rw-r--r-- | testing/tests/client/test_doc.py | 46 | ||||
-rw-r--r-- | testing/tests/client/test_http.py | 60 | ||||
-rw-r--r-- | testing/tests/client/test_http_client.py | 108 | ||||
-rw-r--r-- | testing/tests/client/test_https.py | 137 | ||||
-rw-r--r-- | testing/tests/client/test_shared_db.py | 50 | ||||
-rw-r--r-- | testing/tests/client/test_signals.py | 165 | ||||
-rw-r--r-- | testing/tests/client/test_soledad_doc.py | 46 |
11 files changed, 1029 insertions, 0 deletions
diff --git a/testing/tests/client/__init__.py b/testing/tests/client/__init__.py new file mode 100644 index 00000000..e69de29b --- /dev/null +++ b/testing/tests/client/__init__.py diff --git a/testing/tests/client/test_app.py b/testing/tests/client/test_app.py new file mode 100644 index 00000000..fef2f371 --- /dev/null +++ b/testing/tests/client/test_app.py @@ -0,0 +1,44 @@ +# -*- coding: utf-8 -*- +# test_soledad_app.py +# Copyright (C) 2014 LEAP +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see <http://www.gnu.org/licenses/>. +""" +Test ObjectStore and Couch backend bits. +""" +from testscenarios import TestWithScenarios + +from test_soledad.util import BaseSoledadTest +from test_soledad.util import make_soledad_document_for_test +from test_soledad.util import make_token_soledad_app +from test_soledad.util import make_token_http_database_for_test +from test_soledad.util import copy_token_http_database_for_test +from test_soledad.u1db_tests import test_backends + + +# ----------------------------------------------------------------------------- +# The following tests come from `u1db.tests.test_backends`. +# ----------------------------------------------------------------------------- + +class SoledadTests( + TestWithScenarios, test_backends.AllDatabaseTests, BaseSoledadTest): + + scenarios = [ + ('token_http', { + 'make_database_for_test': make_token_http_database_for_test, + 'copy_database_for_test': copy_token_http_database_for_test, + 'make_document_for_test': make_soledad_document_for_test, + 'make_app_with_state': make_token_soledad_app, + }) + ] diff --git a/testing/tests/client/test_aux_methods.py b/testing/tests/client/test_aux_methods.py new file mode 100644 index 00000000..c25ff8ca --- /dev/null +++ b/testing/tests/client/test_aux_methods.py @@ -0,0 +1,147 @@ +# -*- coding: utf-8 -*- +# test_soledad.py +# Copyright (C) 2013 LEAP +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see <http://www.gnu.org/licenses/>. +""" +Tests for general Soledad functionality. +""" +import os + +from twisted.internet import defer + +from leap.soledad.common.errors import DatabaseAccessError +from leap.soledad.client import Soledad +from leap.soledad.client.adbapi import U1DBConnectionPool +from leap.soledad.client.secrets import PassphraseTooShort + +from test_soledad.util import BaseSoledadTest + + +class AuxMethodsTestCase(BaseSoledadTest): + + def test__init_dirs(self): + sol = self._soledad_instance(prefix='_init_dirs') + local_db_dir = os.path.dirname(sol.local_db_path) + secrets_path = os.path.dirname(sol.secrets.secrets_path) + self.assertTrue(os.path.isdir(local_db_dir)) + self.assertTrue(os.path.isdir(secrets_path)) + + def _close_soledad(results): + sol.close() + + d = sol.create_doc({}) + d.addCallback(_close_soledad) + return d + + def test__init_u1db_sqlcipher_backend(self): + sol = self._soledad_instance(prefix='_init_db') + self.assertIsInstance(sol._dbpool, U1DBConnectionPool) + self.assertTrue(os.path.isfile(sol.local_db_path)) + sol.close() + + def test__init_config_with_defaults(self): + """ + Test if configuration defaults point to the correct place. + """ + + class SoledadMock(Soledad): + + def __init__(self): + pass + + # instantiate without initializing so we just test + # _init_config_with_defaults() + sol = SoledadMock() + sol._passphrase = u'' + sol._server_url = '' + sol._init_config_with_defaults() + # assert value of local_db_path + self.assertEquals( + os.path.join(sol.default_prefix, 'soledad.u1db'), + sol.local_db_path) + + def test__init_config_from_params(self): + """ + Test if configuration is correctly read from file. + """ + sol = self._soledad_instance( + 'leap@leap.se', + passphrase=u'123', + secrets_path='value_3', + local_db_path='value_2', + server_url='value_1', + cert_file=None) + self.assertEqual( + os.path.join(self.tempdir, 'value_3'), + sol.secrets.secrets_path) + self.assertEqual( + os.path.join(self.tempdir, 'value_2'), + sol.local_db_path) + self.assertEqual('value_1', sol._server_url) + sol.close() + + @defer.inlineCallbacks + def test_change_passphrase(self): + """ + Test if passphrase can be changed. + """ + prefix = '_change_passphrase' + sol = self._soledad_instance( + 'leap@leap.se', + passphrase=u'123', + prefix=prefix, + ) + + doc1 = yield sol.create_doc({'simple': 'doc'}) + sol.change_passphrase(u'654321') + sol.close() + + with self.assertRaises(DatabaseAccessError): + self._soledad_instance( + 'leap@leap.se', + passphrase=u'123', + prefix=prefix) + + sol2 = self._soledad_instance( + 'leap@leap.se', + passphrase=u'654321', + prefix=prefix) + doc2 = yield sol2.get_doc(doc1.doc_id) + + self.assertEqual(doc1, doc2) + + sol2.close() + + def test_change_passphrase_with_short_passphrase_raises(self): + """ + Test if attempt to change passphrase passing a short passphrase + raises. + """ + sol = self._soledad_instance( + 'leap@leap.se', + passphrase=u'123') + # check that soledad complains about new passphrase length + self.assertRaises( + PassphraseTooShort, + sol.change_passphrase, u'54321') + sol.close() + + def test_get_passphrase(self): + """ + Assert passphrase getter works fine. + """ + sol = self._soledad_instance() + self.assertEqual('123', sol._passphrase) + sol.close() diff --git a/testing/tests/client/test_crypto.py b/testing/tests/client/test_crypto.py new file mode 100644 index 00000000..77252b46 --- /dev/null +++ b/testing/tests/client/test_crypto.py @@ -0,0 +1,226 @@ +# -*- coding: utf-8 -*- +# test_crypto.py +# Copyright (C) 2013 LEAP +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see <http://www.gnu.org/licenses/>. +""" +Tests for cryptographic related stuff. +""" +import os +import hashlib +import binascii + +from leap.soledad.client import crypto +from leap.soledad.common.document import SoledadDocument +from test_soledad.util import BaseSoledadTest +from leap.soledad.common.crypto import WrongMacError +from leap.soledad.common.crypto import UnknownMacMethodError +from leap.soledad.common.crypto import ENC_JSON_KEY +from leap.soledad.common.crypto import ENC_SCHEME_KEY +from leap.soledad.common.crypto import MAC_KEY +from leap.soledad.common.crypto import MAC_METHOD_KEY + + +class EncryptedSyncTestCase(BaseSoledadTest): + + """ + Tests that guarantee that data will always be encrypted when syncing. + """ + + def test_encrypt_decrypt_json(self): + """ + Test encrypting and decrypting documents. + """ + simpledoc = {'key': 'val'} + doc1 = SoledadDocument(doc_id='id') + doc1.content = simpledoc + + # encrypt doc + doc1.set_json(self._soledad._crypto.encrypt_doc(doc1)) + # assert content is different and includes keys + self.assertNotEqual( + simpledoc, doc1.content, + 'incorrect document encryption') + self.assertTrue(ENC_JSON_KEY in doc1.content) + self.assertTrue(ENC_SCHEME_KEY in doc1.content) + # decrypt doc + doc1.set_json(self._soledad._crypto.decrypt_doc(doc1)) + self.assertEqual( + simpledoc, doc1.content, 'incorrect document encryption') + + +class RecoveryDocumentTestCase(BaseSoledadTest): + + def test_export_recovery_document_raw(self): + rd = self._soledad.secrets._export_recovery_document() + secret_id = rd[self._soledad.secrets.STORAGE_SECRETS_KEY].items()[0][0] + # assert exported secret is the same + secret = self._soledad.secrets._decrypt_storage_secret_version_1( + rd[self._soledad.secrets.STORAGE_SECRETS_KEY][secret_id]) + self.assertEqual(secret_id, self._soledad.secrets._secret_id) + self.assertEqual(secret, self._soledad.secrets._secrets[secret_id]) + # assert recovery document structure + encrypted_secret = rd[ + self._soledad.secrets.STORAGE_SECRETS_KEY][secret_id] + self.assertTrue(self._soledad.secrets.CIPHER_KEY in encrypted_secret) + self.assertTrue( + encrypted_secret[self._soledad.secrets.CIPHER_KEY] == 'aes256') + self.assertTrue(self._soledad.secrets.LENGTH_KEY in encrypted_secret) + self.assertTrue(self._soledad.secrets.SECRET_KEY in encrypted_secret) + + def test_import_recovery_document(self): + rd = self._soledad.secrets._export_recovery_document() + s = self._soledad_instance() + s.secrets._import_recovery_document(rd) + s.secrets.set_secret_id(self._soledad.secrets._secret_id) + self.assertEqual(self._soledad.storage_secret, + s.storage_secret, + 'Failed settinng secret for symmetric encryption.') + s.close() + + +class SoledadSecretsTestCase(BaseSoledadTest): + + def test_new_soledad_instance_generates_one_secret(self): + self.assertTrue( + self._soledad.storage_secret is not None, + "Expected secret to be something different than None") + number_of_secrets = len(self._soledad.secrets._secrets) + self.assertTrue( + number_of_secrets == 1, + "Expected exactly 1 secret, got %d instead." % number_of_secrets) + + def test_generated_secret_is_of_correct_type(self): + expected_type = str + self.assertIsInstance( + self._soledad.storage_secret, expected_type, + "Expected secret to be of type %s" % expected_type) + + def test_generated_secret_has_correct_lengt(self): + expected_length = self._soledad.secrets.GEN_SECRET_LENGTH + actual_length = len(self._soledad.storage_secret) + self.assertTrue( + expected_length == actual_length, + "Expected secret with length %d, got %d instead." + % (expected_length, actual_length)) + + def test_generated_secret_id_is_sha256_hash_of_secret(self): + generated = self._soledad.secrets.secret_id + expected = hashlib.sha256(self._soledad.storage_secret).hexdigest() + self.assertTrue( + generated == expected, + "Expeceted generated secret id to be sha256 hash, got something " + "else instead.") + + def test_generate_new_secret_generates_different_secret_id(self): + # generate new secret + secret_id_1 = self._soledad.secrets.secret_id + secret_id_2 = self._soledad.secrets._gen_secret() + self.assertTrue( + len(self._soledad.secrets._secrets) == 2, + "Expected exactly 2 secrets.") + self.assertTrue( + secret_id_1 != secret_id_2, + "Expected IDs of secrets to be distinct.") + self.assertTrue( + secret_id_1 in self._soledad.secrets._secrets, + "Expected to find ID of first secret in Soledad Secrets.") + self.assertTrue( + secret_id_2 in self._soledad.secrets._secrets, + "Expected to find ID of second secret in Soledad Secrets.") + + def test__has_secret(self): + self.assertTrue( + self._soledad._secrets._has_secret(), + "Should have a secret at this point") + + +class MacAuthTestCase(BaseSoledadTest): + + def test_decrypt_with_wrong_mac_raises(self): + """ + Trying to decrypt a document with wrong MAC should raise. + """ + simpledoc = {'key': 'val'} + doc = SoledadDocument(doc_id='id') + doc.content = simpledoc + # encrypt doc + doc.set_json(self._soledad._crypto.encrypt_doc(doc)) + self.assertTrue(MAC_KEY in doc.content) + self.assertTrue(MAC_METHOD_KEY in doc.content) + # mess with MAC + doc.content[MAC_KEY] = '1234567890ABCDEF' + # try to decrypt doc + self.assertRaises( + WrongMacError, + self._soledad._crypto.decrypt_doc, doc) + + def test_decrypt_with_unknown_mac_method_raises(self): + """ + Trying to decrypt a document with unknown MAC method should raise. + """ + simpledoc = {'key': 'val'} + doc = SoledadDocument(doc_id='id') + doc.content = simpledoc + # encrypt doc + doc.set_json(self._soledad._crypto.encrypt_doc(doc)) + self.assertTrue(MAC_KEY in doc.content) + self.assertTrue(MAC_METHOD_KEY in doc.content) + # mess with MAC method + doc.content[MAC_METHOD_KEY] = 'mymac' + # try to decrypt doc + self.assertRaises( + UnknownMacMethodError, + self._soledad._crypto.decrypt_doc, doc) + + +class SoledadCryptoAESTestCase(BaseSoledadTest): + + def test_encrypt_decrypt_sym(self): + # generate 256-bit key + key = os.urandom(32) + iv, cyphertext = crypto.encrypt_sym('data', key) + self.assertTrue(cyphertext is not None) + self.assertTrue(cyphertext != '') + self.assertTrue(cyphertext != 'data') + plaintext = crypto.decrypt_sym(cyphertext, key, iv) + self.assertEqual('data', plaintext) + + def test_decrypt_with_wrong_iv_fails(self): + key = os.urandom(32) + iv, cyphertext = crypto.encrypt_sym('data', key) + self.assertTrue(cyphertext is not None) + self.assertTrue(cyphertext != '') + self.assertTrue(cyphertext != 'data') + # get a different iv by changing the first byte + rawiv = binascii.a2b_base64(iv) + wrongiv = rawiv + while wrongiv == rawiv: + wrongiv = os.urandom(1) + rawiv[1:] + plaintext = crypto.decrypt_sym( + cyphertext, key, iv=binascii.b2a_base64(wrongiv)) + self.assertNotEqual('data', plaintext) + + def test_decrypt_with_wrong_key_fails(self): + key = os.urandom(32) + iv, cyphertext = crypto.encrypt_sym('data', key) + self.assertTrue(cyphertext is not None) + self.assertTrue(cyphertext != '') + self.assertTrue(cyphertext != 'data') + wrongkey = os.urandom(32) # 256-bits key + # ensure keys are different in case we are extremely lucky + while wrongkey == key: + wrongkey = os.urandom(32) + plaintext = crypto.decrypt_sym(cyphertext, wrongkey, iv) + self.assertNotEqual('data', plaintext) diff --git a/testing/tests/client/test_doc.py b/testing/tests/client/test_doc.py new file mode 100644 index 00000000..e158d768 --- /dev/null +++ b/testing/tests/client/test_doc.py @@ -0,0 +1,46 @@ +# -*- coding: utf-8 -*- +# test_soledad_doc.py +# Copyright (C) 2013, 2014 LEAP +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see <http://www.gnu.org/licenses/>. +""" +Test Leap backend bits: soledad docs +""" +from testscenarios import TestWithScenarios + +from test_soledad.u1db_tests import test_document +from test_soledad.util import BaseSoledadTest +from test_soledad.util import make_soledad_document_for_test + + +# ----------------------------------------------------------------------------- +# The following tests come from `u1db.tests.test_document`. +# ----------------------------------------------------------------------------- + +class TestSoledadDocument( + TestWithScenarios, + test_document.TestDocument, BaseSoledadTest): + + scenarios = ([( + 'leap', { + 'make_document_for_test': make_soledad_document_for_test})]) + + +class TestSoledadPyDocument( + TestWithScenarios, + test_document.TestPyDocument, BaseSoledadTest): + + scenarios = ([( + 'leap', { + 'make_document_for_test': make_soledad_document_for_test})]) diff --git a/testing/tests/client/test_http.py b/testing/tests/client/test_http.py new file mode 100644 index 00000000..47df4b4a --- /dev/null +++ b/testing/tests/client/test_http.py @@ -0,0 +1,60 @@ +# -*- coding: utf-8 -*- +# test_http.py +# Copyright (C) 2013, 2014 LEAP +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see <http://www.gnu.org/licenses/>. +""" +Test Leap backend bits: test http database +""" + +from twisted.trial import unittest + +from leap.soledad.client import auth +from leap.soledad.common.l2db.remote import http_database + + +# ----------------------------------------------------------------------------- +# The following tests come from `u1db.tests.test_http_database`. +# ----------------------------------------------------------------------------- + +class _HTTPDatabase(http_database.HTTPDatabase, auth.TokenBasedAuth): + + """ + Wraps our token auth implementation. + """ + + def set_token_credentials(self, uuid, token): + auth.TokenBasedAuth.set_token_credentials(self, uuid, token) + + def _sign_request(self, method, url_query, params): + return auth.TokenBasedAuth._sign_request( + self, method, url_query, params) + + +class TestHTTPDatabaseWithCreds(unittest.TestCase): + + def test_get_sync_target_inherits_token_credentials(self): + # this test was from TestDatabaseSimpleOperations but we put it here + # for convenience. + self.db = _HTTPDatabase('dbase') + self.db.set_token_credentials('user-uuid', 'auth-token') + st = self.db.get_sync_target() + self.assertEqual(self.db._creds, st._creds) + + def test_ctr_with_creds(self): + db1 = _HTTPDatabase('http://dbs/db', creds={'token': { + 'uuid': 'user-uuid', + 'token': 'auth-token', + }}) + self.assertIn('token', db1._creds) diff --git a/testing/tests/client/test_http_client.py b/testing/tests/client/test_http_client.py new file mode 100644 index 00000000..a107930a --- /dev/null +++ b/testing/tests/client/test_http_client.py @@ -0,0 +1,108 @@ +# -*- coding: utf-8 -*- +# test_http_client.py +# Copyright (C) 2013-2016 LEAP +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see <http://www.gnu.org/licenses/>. +""" +Test Leap backend bits: sync target +""" +import json + +from testscenarios import TestWithScenarios + +from leap.soledad.client import auth +from leap.soledad.common.l2db.remote import http_client +from test_soledad.u1db_tests import test_http_client +from leap.soledad.server.auth import SoledadTokenAuthMiddleware + + +# ----------------------------------------------------------------------------- +# The following tests come from `u1db.tests.test_http_client`. +# ----------------------------------------------------------------------------- + +class TestSoledadClientBase( + TestWithScenarios, + test_http_client.TestHTTPClientBase): + + """ + This class should be used to test Token auth. + """ + + def getClient(self, **kwds): + cli = self.getClientWithToken(**kwds) + if 'creds' not in kwds: + cli.set_token_credentials('user-uuid', 'auth-token') + return cli + + def getClientWithToken(self, **kwds): + self.startServer() + + class _HTTPClientWithToken( + http_client.HTTPClientBase, auth.TokenBasedAuth): + + def set_token_credentials(self, uuid, token): + auth.TokenBasedAuth.set_token_credentials(self, uuid, token) + + def _sign_request(self, method, url_query, params): + return auth.TokenBasedAuth._sign_request( + self, method, url_query, params) + + return _HTTPClientWithToken(self.getURL('dbase'), **kwds) + + def app(self, environ, start_response): + res = test_http_client.TestHTTPClientBase.app( + self, environ, start_response) + if res is not None: + return res + # mime solead application here. + if '/token' in environ['PATH_INFO']: + auth = environ.get(SoledadTokenAuthMiddleware.HTTP_AUTH_KEY) + if not auth: + start_response("401 Unauthorized", + [('Content-Type', 'application/json')]) + return [ + json.dumps( + {"error": "unauthorized", + "message": "no token found in environment"}) + ] + scheme, encoded = auth.split(None, 1) + if scheme.lower() != 'token': + start_response("401 Unauthorized", + [('Content-Type', 'application/json')]) + return [json.dumps({"error": "unauthorized", + "message": "unknown scheme: %s" % scheme})] + uuid, token = encoded.decode('base64').split(':', 1) + if uuid != 'user-uuid' and token != 'auth-token': + return Exception("Incorrect address or token.") + start_response("200 OK", [('Content-Type', 'application/json')]) + return [json.dumps([environ['PATH_INFO'], uuid, token])] + + def test_token(self): + """ + Test if token is sent correctly. + """ + cli = self.getClientWithToken() + cli.set_token_credentials('user-uuid', 'auth-token') + res, headers = cli._request('GET', ['doc', 'token']) + self.assertEqual( + ['/dbase/doc/token', 'user-uuid', 'auth-token'], json.loads(res)) + + def test_token_ctr_creds(self): + cli = self.getClientWithToken(creds={'token': { + 'uuid': 'user-uuid', + 'token': 'auth-token', + }}) + res, headers = cli._request('GET', ['doc', 'token']) + self.assertEqual( + ['/dbase/doc/token', 'user-uuid', 'auth-token'], json.loads(res)) diff --git a/testing/tests/client/test_https.py b/testing/tests/client/test_https.py new file mode 100644 index 00000000..caac16da --- /dev/null +++ b/testing/tests/client/test_https.py @@ -0,0 +1,137 @@ +# -*- coding: utf-8 -*- +# test_sync_target.py +# Copyright (C) 2013, 2014 LEAP +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see <http://www.gnu.org/licenses/>. +""" +Test Leap backend bits: https +""" +from unittest import skip + +from testscenarios import TestWithScenarios + +from leap.soledad import client + +from leap.soledad.common.l2db.remote import http_client +from test_soledad.u1db_tests import test_backends +from test_soledad.u1db_tests import test_https +from test_soledad.util import ( + BaseSoledadTest, + make_soledad_document_for_test, + make_soledad_app, + make_token_soledad_app, +) + + +LEAP_SCENARIOS = [ + ('http', { + 'make_database_for_test': test_backends.make_http_database_for_test, + 'copy_database_for_test': test_backends.copy_http_database_for_test, + 'make_document_for_test': make_soledad_document_for_test, + 'make_app_with_state': make_soledad_app}), +] + + +# ----------------------------------------------------------------------------- +# The following tests come from `u1db.tests.test_https`. +# ----------------------------------------------------------------------------- + +def token_leap_https_sync_target(test, host, path, cert_file=None): + _, port = test.server.server_address + # source_replica_uid = test._soledad._dbpool.replica_uid + creds = {'token': {'uuid': 'user-uuid', 'token': 'auth-token'}} + if not cert_file: + cert_file = test.cacert_pem + st = client.http_target.SoledadHTTPSyncTarget( + 'https://%s:%d/%s' % (host, port, path), + source_replica_uid='other-id', + creds=creds, + crypto=test._soledad._crypto, + cert_file=cert_file) + return st + + +@skip("Skiping tests imported from U1DB.") +class TestSoledadHTTPSyncTargetHttpsSupport( + TestWithScenarios, + # test_https.TestHttpSyncTargetHttpsSupport, + BaseSoledadTest): + + scenarios = [ + ('token_soledad_https', + { + # 'server_def': test_https.https_server_def, + 'make_app_with_state': make_token_soledad_app, + 'make_document_for_test': make_soledad_document_for_test, + 'sync_target': token_leap_https_sync_target}), + ] + + def setUp(self): + # the parent constructor undoes our SSL monkey patch to ensure tests + # run smoothly with standard u1db. + test_https.TestHttpSyncTargetHttpsSupport.setUp(self) + # so here monkey patch again to test our functionality. + api = client.api + http_client._VerifiedHTTPSConnection = api.VerifiedHTTPSConnection + client.api.SOLEDAD_CERT = http_client.CA_CERTS + + def test_cannot_verify_cert(self): + self.startServer() + # don't print expected traceback server-side + self.server.handle_error = lambda req, cli_addr: None + self.request_state._create_database('test') + remote_target = self.getSyncTarget( + 'localhost', 'test', cert_file=http_client.CA_CERTS) + d = remote_target.record_sync_info('other-id', 2, 'T-id') + + def _assert_raises(result): + from twisted.python.failure import Failure + if isinstance(result, Failure): + from OpenSSL.SSL import Error + error = result.value.message[0].value + if isinstance(error, Error): + msg = error.message[0][2] + self.assertEqual("certificate verify failed", msg) + return + self.fail("certificate verification should have failed.") + + d.addCallbacks(_assert_raises, _assert_raises) + return d + + def test_working(self): + """ + Test that SSL connections work well. + + This test was adapted to patch Soledad's HTTPS connection custom class + with the intended CA certificates. + """ + self.startServer() + db = self.request_state._create_database('test') + remote_target = self.getSyncTarget('localhost', 'test') + d = remote_target.record_sync_info('other-id', 2, 'T-id') + d.addCallback(lambda _: + self.assertEqual( + (2, 'T-id'), + db._get_replica_gen_and_trans_id('other-id') + )) + d.addCallback(lambda _: remote_target.close()) + return d + + def test_host_mismatch(self): + """ + This test is disabled because soledad's twisted-based http agent uses + pyOpenSSL, which will complain if we try to use an IP to connect to + the remote host (see the original test in u1db_tests/test_https.py). + """ + pass diff --git a/testing/tests/client/test_shared_db.py b/testing/tests/client/test_shared_db.py new file mode 100644 index 00000000..aac766c2 --- /dev/null +++ b/testing/tests/client/test_shared_db.py @@ -0,0 +1,50 @@ +from leap.soledad.common.document import SoledadDocument +from leap.soledad.client.shared_db import SoledadSharedDatabase + +from test_soledad.util import BaseSoledadTest +from test_soledad.util import ADDRESS + + +class SoledadSharedDBTestCase(BaseSoledadTest): + + """ + These tests ensure the functionalities of the shared recovery database. + """ + + def setUp(self): + BaseSoledadTest.setUp(self) + self._shared_db = SoledadSharedDatabase( + 'https://provider/', ADDRESS, document_factory=SoledadDocument, + creds=None) + + def tearDown(self): + BaseSoledadTest.tearDown(self) + + def test__get_secrets_from_shared_db(self): + """ + Ensure the shared db is queried with the correct doc_id. + """ + doc_id = self._soledad.secrets._shared_db_doc_id() + self._soledad.secrets._get_secrets_from_shared_db() + self.assertTrue( + self._soledad.shared_db.get_doc.assert_called_with( + doc_id) is None, + 'Wrong doc_id when fetching recovery document.') + + def test__put_secrets_in_shared_db(self): + """ + Ensure recovery document is put into shared recover db. + """ + doc_id = self._soledad.secrets._shared_db_doc_id() + self._soledad.secrets._put_secrets_in_shared_db() + self.assertTrue( + self._soledad.shared_db.get_doc.assert_called_with( + doc_id) is None, + 'Wrong doc_id when fetching recovery document.') + self.assertTrue( + self._soledad.shared_db.put_doc.assert_called_with( + self._doc_put) is None, + 'Wrong document when putting recovery document.') + self.assertTrue( + self._doc_put.doc_id == doc_id, + 'Wrong doc_id when putting recovery document.') diff --git a/testing/tests/client/test_signals.py b/testing/tests/client/test_signals.py new file mode 100644 index 00000000..4e9ebfd0 --- /dev/null +++ b/testing/tests/client/test_signals.py @@ -0,0 +1,165 @@ +from mock import Mock +from twisted.internet import defer + +from leap import soledad +from leap.common.events import catalog +from leap.soledad.common.document import SoledadDocument + +from test_soledad.util import ADDRESS +from test_soledad.util import BaseSoledadTest + + +class SoledadSignalingTestCase(BaseSoledadTest): + + """ + These tests ensure signals are correctly emmited by Soledad. + """ + + EVENTS_SERVER_PORT = 8090 + + def setUp(self): + # mock signaling + soledad.client.signal = Mock() + soledad.client.secrets.events.emit_async = Mock() + # run parent's setUp + BaseSoledadTest.setUp(self) + + def tearDown(self): + BaseSoledadTest.tearDown(self) + + def _pop_mock_call(self, mocked): + mocked.call_args_list.pop() + mocked.mock_calls.pop() + mocked.call_args = mocked.call_args_list[-1] + + def test_stage3_bootstrap_signals(self): + """ + Test that a fresh soledad emits all bootstrap signals. + + Signals are: + - downloading keys / done downloading keys. + - creating keys / done creating keys. + - downloading keys / done downloading keys. + - uploading keys / done uploading keys. + """ + soledad.client.secrets.events.emit_async.reset_mock() + # get a fresh instance so it emits all bootstrap signals + sol = self._soledad_instance( + secrets_path='alternative_stage3.json', + local_db_path='alternative_stage3.u1db') + # reverse call order so we can verify in the order the signals were + # expected + soledad.client.secrets.events.emit_async.mock_calls.reverse() + soledad.client.secrets.events.emit_async.call_args = \ + soledad.client.secrets.events.emit_async.call_args_list[0] + soledad.client.secrets.events.emit_async.call_args_list.reverse() + + user_data = {'userid': ADDRESS, 'uuid': ADDRESS} + + # downloading keys signals + soledad.client.secrets.events.emit_async.assert_called_with( + catalog.SOLEDAD_DOWNLOADING_KEYS, user_data + ) + self._pop_mock_call(soledad.client.secrets.events.emit_async) + soledad.client.secrets.events.emit_async.assert_called_with( + catalog.SOLEDAD_DONE_DOWNLOADING_KEYS, user_data + ) + # creating keys signals + self._pop_mock_call(soledad.client.secrets.events.emit_async) + soledad.client.secrets.events.emit_async.assert_called_with( + catalog.SOLEDAD_CREATING_KEYS, user_data + ) + self._pop_mock_call(soledad.client.secrets.events.emit_async) + soledad.client.secrets.events.emit_async.assert_called_with( + catalog.SOLEDAD_DONE_CREATING_KEYS, user_data + ) + # downloading once more (inside _put_keys_in_shared_db) + self._pop_mock_call(soledad.client.secrets.events.emit_async) + soledad.client.secrets.events.emit_async.assert_called_with( + catalog.SOLEDAD_DOWNLOADING_KEYS, user_data + ) + self._pop_mock_call(soledad.client.secrets.events.emit_async) + soledad.client.secrets.events.emit_async.assert_called_with( + catalog.SOLEDAD_DONE_DOWNLOADING_KEYS, user_data + ) + # uploading keys signals + self._pop_mock_call(soledad.client.secrets.events.emit_async) + soledad.client.secrets.events.emit_async.assert_called_with( + catalog.SOLEDAD_UPLOADING_KEYS, user_data + ) + self._pop_mock_call(soledad.client.secrets.events.emit_async) + soledad.client.secrets.events.emit_async.assert_called_with( + catalog.SOLEDAD_DONE_UPLOADING_KEYS, user_data + ) + sol.close() + + def test_stage2_bootstrap_signals(self): + """ + Test that if there are keys in server, soledad will download them and + emit corresponding signals. + """ + # get existing instance so we have access to keys + sol = self._soledad_instance() + # create a document with secrets + doc = SoledadDocument(doc_id=sol.secrets._shared_db_doc_id()) + doc.content = sol.secrets._export_recovery_document() + sol.close() + # reset mock + soledad.client.secrets.events.emit_async.reset_mock() + # get a fresh instance so it emits all bootstrap signals + shared_db = self.get_default_shared_mock(get_doc_return_value=doc) + sol = self._soledad_instance( + secrets_path='alternative_stage2.json', + local_db_path='alternative_stage2.u1db', + shared_db_class=shared_db) + # reverse call order so we can verify in the order the signals were + # expected + soledad.client.secrets.events.emit_async.mock_calls.reverse() + soledad.client.secrets.events.emit_async.call_args = \ + soledad.client.secrets.events.emit_async.call_args_list[0] + soledad.client.secrets.events.emit_async.call_args_list.reverse() + # assert download keys signals + soledad.client.secrets.events.emit_async.assert_called_with( + catalog.SOLEDAD_DOWNLOADING_KEYS, + {'userid': ADDRESS, 'uuid': ADDRESS} + ) + self._pop_mock_call(soledad.client.secrets.events.emit_async) + soledad.client.secrets.events.emit_async.assert_called_with( + catalog.SOLEDAD_DONE_DOWNLOADING_KEYS, + {'userid': ADDRESS, 'uuid': ADDRESS}, + ) + sol.close() + + def test_stage1_bootstrap_signals(self): + """ + Test that if soledad already has a local secret, it emits no signals. + """ + soledad.client.signal.reset_mock() + # get an existent instance so it emits only some of bootstrap signals + sol = self._soledad_instance() + self.assertEqual([], soledad.client.signal.mock_calls) + sol.close() + + @defer.inlineCallbacks + def test_sync_signals(self): + """ + Test Soledad emits SOLEDAD_CREATING_KEYS signal. + """ + # get a fresh instance so it emits all bootstrap signals + sol = self._soledad_instance() + soledad.client.signal.reset_mock() + + # mock the actual db sync so soledad does not try to connect to the + # server + d = defer.Deferred() + d.callback(None) + sol._dbsyncer.sync = Mock(return_value=d) + + yield sol.sync() + + # assert the signal has been emitted + soledad.client.events.emit_async.assert_called_with( + catalog.SOLEDAD_DONE_DATA_SYNC, + {'userid': ADDRESS, 'uuid': ADDRESS}, + ) + sol.close() diff --git a/testing/tests/client/test_soledad_doc.py b/testing/tests/client/test_soledad_doc.py new file mode 100644 index 00000000..e158d768 --- /dev/null +++ b/testing/tests/client/test_soledad_doc.py @@ -0,0 +1,46 @@ +# -*- coding: utf-8 -*- +# test_soledad_doc.py +# Copyright (C) 2013, 2014 LEAP +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see <http://www.gnu.org/licenses/>. +""" +Test Leap backend bits: soledad docs +""" +from testscenarios import TestWithScenarios + +from test_soledad.u1db_tests import test_document +from test_soledad.util import BaseSoledadTest +from test_soledad.util import make_soledad_document_for_test + + +# ----------------------------------------------------------------------------- +# The following tests come from `u1db.tests.test_document`. +# ----------------------------------------------------------------------------- + +class TestSoledadDocument( + TestWithScenarios, + test_document.TestDocument, BaseSoledadTest): + + scenarios = ([( + 'leap', { + 'make_document_for_test': make_soledad_document_for_test})]) + + +class TestSoledadPyDocument( + TestWithScenarios, + test_document.TestPyDocument, BaseSoledadTest): + + scenarios = ([( + 'leap', { + 'make_document_for_test': make_soledad_document_for_test})]) |