summaryrefslogtreecommitdiff
path: root/testing
diff options
context:
space:
mode:
Diffstat (limited to 'testing')
-rw-r--r--testing/test_soledad/util.py5
-rw-r--r--testing/tests/benchmarks/conftest.py18
-rw-r--r--testing/tests/benchmarks/test_sync.py16
-rw-r--r--testing/tests/client/test_aux_methods.py35
-rw-r--r--testing/tests/client/test_crypto.py165
-rw-r--r--testing/tests/client/test_deprecated_crypto.py8
-rw-r--r--testing/tests/client/test_http_client.py108
-rw-r--r--testing/tests/client/test_secrets.py159
-rw-r--r--testing/tests/client/test_shared_db.py38
-rw-r--r--testing/tests/client/test_signals.py98
-rw-r--r--testing/tests/conftest.py26
-rw-r--r--testing/tests/server/test__resource.py66
-rw-r--r--testing/tests/server/test__server_info.py43
-rw-r--r--testing/tests/server/test_auth.py104
-rw-r--r--testing/tests/server/test_config.py69
-rw-r--r--testing/tests/server/test_server.py291
-rw-r--r--testing/tests/server/test_session.py186
-rw-r--r--testing/tests/server/test_url_mapper.py131
-rw-r--r--testing/tests/sync/test_sync_target.py5
19 files changed, 955 insertions, 616 deletions
diff --git a/testing/test_soledad/util.py b/testing/test_soledad/util.py
index 57f8199b..83a27016 100644
--- a/testing/test_soledad/util.py
+++ b/testing/test_soledad/util.py
@@ -52,7 +52,6 @@ from leap.soledad.client.sqlcipher import SQLCipherOptions
from leap.soledad.client._crypto import is_symmetrically_encrypted
from leap.soledad.server import SoledadApp
-from leap.soledad.server.auth import SoledadTokenAuthMiddleware
PASSWORD = '123456'
@@ -108,7 +107,7 @@ def make_soledad_app(state):
def make_token_soledad_app(state):
- app = SoledadApp(state)
+ application = SoledadApp(state)
def _verify_authentication_data(uuid, auth_data):
if uuid.startswith('user-') and auth_data == 'auth-token':
@@ -119,7 +118,6 @@ def make_token_soledad_app(state):
def _verify_authorization(uuid, environ):
return True
- application = SoledadTokenAuthMiddleware(app)
application._verify_authentication_data = _verify_authentication_data
application._verify_authorization = _verify_authorization
return application
@@ -182,7 +180,6 @@ class MockedSharedDBTest(object):
put_doc = Mock(side_effect=put_doc_side_effect)
open = Mock(return_value=None)
close = Mock(return_value=None)
- syncable = True
def __call__(self):
return self
diff --git a/testing/tests/benchmarks/conftest.py b/testing/tests/benchmarks/conftest.py
index a9cc3464..1b99d96e 100644
--- a/testing/tests/benchmarks/conftest.py
+++ b/testing/tests/benchmarks/conftest.py
@@ -12,12 +12,30 @@ from leap.common.events import server
server.ensure_server()
+#
+# pytest customizations
+#
+
def pytest_addoption(parser):
parser.addoption(
"--num-docs", type="int", default=100,
help="the number of documents to use in performance tests")
+# mark benchmark tests using their group names (thanks ionelmc! :)
+def pytest_collection_modifyitems(items):
+ for item in items:
+ bench = item.get_marker("benchmark")
+ if bench and bench.kwargs.get('group'):
+ group = bench.kwargs['group']
+ marker = getattr(pytest.mark, 'benchmark_' + group)
+ item.add_marker(marker)
+
+
+#
+# benchmark fixtures
+#
+
@pytest.fixture()
def payload():
def generate(size):
diff --git a/testing/tests/benchmarks/test_sync.py b/testing/tests/benchmarks/test_sync.py
index 1501d74b..fcfab998 100644
--- a/testing/tests/benchmarks/test_sync.py
+++ b/testing/tests/benchmarks/test_sync.py
@@ -11,6 +11,12 @@ def load_up(client, amount, payload):
yield gatherResults(deferreds)
+# Each test created with this function will:
+#
+# - get a fresh client.
+# - iterate:
+# - setup: create N docs of a certain size
+# - benchmark: sync() -- uploads N docs.
def create_upload(uploads, size):
@pytest.inlineCallbacks
@pytest.mark.benchmark(group="test_upload")
@@ -29,6 +35,14 @@ test_upload_100_100k = create_upload(100, 100 * 1000)
test_upload_1000_10k = create_upload(1000, 10 * 1000)
+# Each test created with this function will:
+#
+# - get a fresh client.
+# - create N docs of a certain size
+# - sync (uploads those docs)
+# - iterate:
+# - setup: get a fresh client with empty local db
+# - benchmark: sync() -- downloads N docs.
def create_download(downloads, size):
@pytest.inlineCallbacks
@pytest.mark.benchmark(group="test_download")
@@ -41,7 +55,7 @@ def create_download(downloads, size):
# ensures we are dealing with properly encrypted docs
def setup():
- return soledad_client()
+ return soledad_client(force_fresh_db=True)
def sync(clean_client):
return clean_client.sync()
diff --git a/testing/tests/client/test_aux_methods.py b/testing/tests/client/test_aux_methods.py
index 9b4a175f..729aa28a 100644
--- a/testing/tests/client/test_aux_methods.py
+++ b/testing/tests/client/test_aux_methods.py
@@ -19,12 +19,11 @@ Tests for general Soledad functionality.
"""
import os
-from twisted.internet import defer
+from pytest import inlineCallbacks
from leap.soledad.client import Soledad
from leap.soledad.client.adbapi import U1DBConnectionPool
-from leap.soledad.client.secrets import PassphraseTooShort
-from leap.soledad.client.secrets import SecretsException
+from leap.soledad.client._secrets.util import SecretsError
from test_soledad.util import BaseSoledadTest
@@ -34,7 +33,7 @@ class AuxMethodsTestCase(BaseSoledadTest):
def test__init_dirs(self):
sol = self._soledad_instance(prefix='_init_dirs')
local_db_dir = os.path.dirname(sol.local_db_path)
- secrets_path = os.path.dirname(sol.secrets.secrets_path)
+ secrets_path = os.path.dirname(sol.secrets_path)
self.assertTrue(os.path.isdir(local_db_dir))
self.assertTrue(os.path.isdir(secrets_path))
@@ -64,8 +63,8 @@ class AuxMethodsTestCase(BaseSoledadTest):
# instantiate without initializing so we just test
# _init_config_with_defaults()
sol = SoledadMock()
- sol._passphrase = u''
- sol._server_url = ''
+ sol.passphrase = u''
+ sol.server_url = ''
sol._init_config_with_defaults()
# assert value of local_db_path
self.assertEquals(
@@ -85,14 +84,14 @@ class AuxMethodsTestCase(BaseSoledadTest):
cert_file=None)
self.assertEqual(
os.path.join(self.tempdir, 'value_3'),
- sol.secrets.secrets_path)
+ sol.secrets_path)
self.assertEqual(
os.path.join(self.tempdir, 'value_2'),
sol.local_db_path)
- self.assertEqual('value_1', sol._server_url)
+ self.assertEqual('value_1', sol.server_url)
sol.close()
- @defer.inlineCallbacks
+ @inlineCallbacks
def test_change_passphrase(self):
"""
Test if passphrase can be changed.
@@ -108,7 +107,7 @@ class AuxMethodsTestCase(BaseSoledadTest):
sol.change_passphrase(u'654321')
sol.close()
- with self.assertRaises(SecretsException):
+ with self.assertRaises(SecretsError):
self._soledad_instance(
'leap@leap.se',
passphrase=u'123',
@@ -124,24 +123,10 @@ class AuxMethodsTestCase(BaseSoledadTest):
sol2.close()
- def test_change_passphrase_with_short_passphrase_raises(self):
- """
- Test if attempt to change passphrase passing a short passphrase
- raises.
- """
- sol = self._soledad_instance(
- 'leap@leap.se',
- passphrase=u'123')
- # check that soledad complains about new passphrase length
- self.assertRaises(
- PassphraseTooShort,
- sol.change_passphrase, u'54321')
- sol.close()
-
def test_get_passphrase(self):
"""
Assert passphrase getter works fine.
"""
sol = self._soledad_instance()
- self.assertEqual('123', sol._passphrase)
+ self.assertEqual('123', sol.passphrase)
sol.close()
diff --git a/testing/tests/client/test_crypto.py b/testing/tests/client/test_crypto.py
index 49a61438..399fdc99 100644
--- a/testing/tests/client/test_crypto.py
+++ b/testing/tests/client/test_crypto.py
@@ -19,7 +19,6 @@ Tests for cryptographic related stuff.
"""
import binascii
import base64
-import hashlib
import json
import os
@@ -111,7 +110,7 @@ class BlobTestCase(unittest.TestCase):
assert len(preamble) == _crypto.PACMAN.size
unpacked_data = _crypto.PACMAN.unpack(preamble)
- magic, sch, meth, ts, iv, doc_id, rev = unpacked_data
+ magic, sch, meth, ts, iv, doc_id, rev, _ = unpacked_data
assert magic == _crypto.BLOB_SIGNATURE_MAGIC
assert sch == 1
assert meth == _crypto.ENC_METHOD.aes_256_gcm
@@ -186,99 +185,13 @@ class BlobTestCase(unittest.TestCase):
yield crypto.decrypt_doc(doc2)
-class RecoveryDocumentTestCase(BaseSoledadTest):
-
- def test_export_recovery_document_raw(self):
- rd = self._soledad.secrets._export_recovery_document()
- secret_id = rd[self._soledad.secrets.STORAGE_SECRETS_KEY].items()[0][0]
- # assert exported secret is the same
- secret = self._soledad.secrets._decrypt_storage_secret_version_1(
- rd[self._soledad.secrets.STORAGE_SECRETS_KEY][secret_id])
- self.assertEqual(secret_id, self._soledad.secrets._secret_id)
- self.assertEqual(secret, self._soledad.secrets._secrets[secret_id])
- # assert recovery document structure
- encrypted_secret = rd[
- self._soledad.secrets.STORAGE_SECRETS_KEY][secret_id]
- self.assertTrue(self._soledad.secrets.CIPHER_KEY in encrypted_secret)
- self.assertEquals(
- _crypto.ENC_METHOD.aes_256_gcm,
- encrypted_secret[self._soledad.secrets.CIPHER_KEY])
- self.assertTrue(self._soledad.secrets.LENGTH_KEY in encrypted_secret)
- self.assertTrue(self._soledad.secrets.SECRET_KEY in encrypted_secret)
-
- def test_import_recovery_document(self, cipher='aes256'):
- rd = self._soledad.secrets._export_recovery_document(cipher)
- s = self._soledad_instance()
- s.secrets._import_recovery_document(rd)
- s.secrets.set_secret_id(self._soledad.secrets._secret_id)
- self.assertEqual(self._soledad.storage_secret,
- s.storage_secret,
- 'Failed settinng secret for symmetric encryption.')
- s.close()
-
- def test_import_GCM_recovery_document(self):
- cipher = self._soledad.secrets.CIPHER_AES256_GCM
- self.test_import_recovery_document(cipher)
-
- def test_import_legacy_CTR_recovery_document(self):
- cipher = self._soledad.secrets.CIPHER_AES256
- self.test_import_recovery_document(cipher)
-
-
class SoledadSecretsTestCase(BaseSoledadTest):
- def test_new_soledad_instance_generates_one_secret(self):
- self.assertTrue(
- self._soledad.storage_secret is not None,
- "Expected secret to be something different than None")
- number_of_secrets = len(self._soledad.secrets._secrets)
- self.assertTrue(
- number_of_secrets == 1,
- "Expected exactly 1 secret, got %d instead." % number_of_secrets)
-
- def test_generated_secret_is_of_correct_type(self):
- expected_type = str
- self.assertIsInstance(
- self._soledad.storage_secret, expected_type,
- "Expected secret to be of type %s" % expected_type)
-
- def test_generated_secret_has_correct_lengt(self):
- expected_length = self._soledad.secrets.GEN_SECRET_LENGTH
- actual_length = len(self._soledad.storage_secret)
- self.assertTrue(
- expected_length == actual_length,
- "Expected secret with length %d, got %d instead."
- % (expected_length, actual_length))
-
- def test_generated_secret_id_is_sha256_hash_of_secret(self):
- generated = self._soledad.secrets.secret_id
- expected = hashlib.sha256(self._soledad.storage_secret).hexdigest()
- self.assertTrue(
- generated == expected,
- "Expeceted generated secret id to be sha256 hash, got something "
- "else instead.")
-
- def test_generate_new_secret_generates_different_secret_id(self):
- # generate new secret
- secret_id_1 = self._soledad.secrets.secret_id
- secret_id_2 = self._soledad.secrets._gen_secret()
- self.assertTrue(
- len(self._soledad.secrets._secrets) == 2,
- "Expected exactly 2 secrets.")
- self.assertTrue(
- secret_id_1 != secret_id_2,
- "Expected IDs of secrets to be distinct.")
- self.assertTrue(
- secret_id_1 in self._soledad.secrets._secrets,
- "Expected to find ID of first secret in Soledad Secrets.")
- self.assertTrue(
- secret_id_2 in self._soledad.secrets._secrets,
- "Expected to find ID of second secret in Soledad Secrets.")
-
- def test__has_secret(self):
- self.assertTrue(
- self._soledad._secrets._has_secret(),
- "Should have a secret at this point")
+ def test_generated_secrets_have_correct_length(self):
+ expected = self._soledad.secrets.lengths
+ for name, length in expected.iteritems():
+ secret = getattr(self._soledad.secrets, name)
+ self.assertEqual(length, len(secret))
class SoledadCryptoAESTestCase(BaseSoledadTest):
@@ -322,10 +235,74 @@ class SoledadCryptoAESTestCase(BaseSoledadTest):
_crypto.decrypt_sym(cyphertext, wrongkey, iv)
-def _aes_encrypt(key, iv, data):
+class PreambleTestCase(unittest.TestCase):
+ class doc_info:
+ doc_id = 'D-deadbeef'
+ rev = '397932e0c77f45fcb7c3732930e7e9b2:1'
+
+ def setUp(self):
+ self.cleartext = BytesIO(snowden1)
+ self.blob = _crypto.BlobEncryptor(
+ self.doc_info, self.cleartext,
+ secret='A' * 96)
+
+ def test_preamble_starts_with_magic_signature(self):
+ preamble = self.blob._encode_preamble()
+ assert preamble.startswith(_crypto.BLOB_SIGNATURE_MAGIC)
+
+ def test_preamble_has_cipher_metadata(self):
+ preamble = self.blob._encode_preamble()
+ unpacked = _crypto.PACMAN.unpack(preamble)
+ encryption_scheme, encryption_method = unpacked[1:3]
+ assert encryption_scheme in _crypto.ENC_SCHEME
+ assert encryption_method in _crypto.ENC_METHOD
+ assert unpacked[4] == self.blob.iv
+
+ def test_preamble_has_document_sync_metadata(self):
+ preamble = self.blob._encode_preamble()
+ unpacked = _crypto.PACMAN.unpack(preamble)
+ doc_id, doc_rev = unpacked[5:7]
+ assert doc_id == self.doc_info.doc_id
+ assert doc_rev == self.doc_info.rev
+
+ def test_preamble_has_document_size(self):
+ preamble = self.blob._encode_preamble()
+ unpacked = _crypto.PACMAN.unpack(preamble)
+ size = unpacked[7]
+ assert size == len(snowden1)
+
+ @defer.inlineCallbacks
+ def test_preamble_can_come_without_size(self):
+ # XXX: This test case is here only to test backwards compatibility!
+ preamble = self.blob._encode_preamble()
+ # repack preamble using legacy format, without doc size
+ unpacked = _crypto.PACMAN.unpack(preamble)
+ preamble_without_size = _crypto.LEGACY_PACMAN.pack(*unpacked[0:7])
+ # encrypt it manually for custom tag
+ ciphertext, tag = _aes_encrypt(self.blob.sym_key, self.blob.iv,
+ self.cleartext.getvalue(),
+ aead=preamble_without_size)
+ ciphertext = ciphertext + tag
+ # encode it
+ ciphertext = base64.urlsafe_b64encode(ciphertext)
+ preamble_without_size = base64.urlsafe_b64encode(preamble_without_size)
+ # decrypt it
+ ciphertext = preamble_without_size + ' ' + ciphertext
+ cleartext = yield _crypto.BlobDecryptor(
+ self.doc_info, BytesIO(ciphertext),
+ secret='A' * 96).decrypt()
+ assert cleartext == self.cleartext.getvalue()
+ warnings = self.flushWarnings()
+ assert len(warnings) == 1
+ assert 'legacy document without size' in warnings[0]['message']
+
+
+def _aes_encrypt(key, iv, data, aead=''):
backend = default_backend()
cipher = Cipher(algorithms.AES(key), modes.GCM(iv), backend=backend)
encryptor = cipher.encryptor()
+ if aead:
+ encryptor.authenticate_additional_data(aead)
return encryptor.update(data) + encryptor.finalize(), encryptor.tag
diff --git a/testing/tests/client/test_deprecated_crypto.py b/testing/tests/client/test_deprecated_crypto.py
index 8ee3735c..1af1a130 100644
--- a/testing/tests/client/test_deprecated_crypto.py
+++ b/testing/tests/client/test_deprecated_crypto.py
@@ -1,5 +1,7 @@
import json
-from twisted.internet import defer
+
+from pytest import inlineCallbacks
+
from uuid import uuid4
from urlparse import urljoin
@@ -39,7 +41,7 @@ class DeprecatedCryptoTest(SoledadWithCouchServerMixin, TestCaseWithServer):
def make_app_with_state(state):
return make_token_soledad_app(state)
- @defer.inlineCallbacks
+ @inlineCallbacks
def test_touch_updates_remote_representation(self):
self.startTwistedServer()
user = 'user-' + uuid4().hex
@@ -49,7 +51,7 @@ class DeprecatedCryptoTest(SoledadWithCouchServerMixin, TestCaseWithServer):
self._soledad_instance(user=user, server_url=server_url))
self.make_app()
- remote = self.request_state._create_database(replica_uid=client._uuid)
+ remote = self.request_state._create_database(replica_uid=client.uuid)
remote = CouchDatabase.open_database(
urljoin(self.couch_url, 'user-' + user),
create=True)
diff --git a/testing/tests/client/test_http_client.py b/testing/tests/client/test_http_client.py
deleted file mode 100644
index a107930a..00000000
--- a/testing/tests/client/test_http_client.py
+++ /dev/null
@@ -1,108 +0,0 @@
-# -*- coding: utf-8 -*-
-# test_http_client.py
-# Copyright (C) 2013-2016 LEAP
-#
-# This program is free software: you can redistribute it and/or modify
-# it under the terms of the GNU General Public License as published by
-# the Free Software Foundation, either version 3 of the License, or
-# (at your option) any later version.
-#
-# This program is distributed in the hope that it will be useful,
-# but WITHOUT ANY WARRANTY; without even the implied warranty of
-# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
-# GNU General Public License for more details.
-#
-# You should have received a copy of the GNU General Public License
-# along with this program. If not, see <http://www.gnu.org/licenses/>.
-"""
-Test Leap backend bits: sync target
-"""
-import json
-
-from testscenarios import TestWithScenarios
-
-from leap.soledad.client import auth
-from leap.soledad.common.l2db.remote import http_client
-from test_soledad.u1db_tests import test_http_client
-from leap.soledad.server.auth import SoledadTokenAuthMiddleware
-
-
-# -----------------------------------------------------------------------------
-# The following tests come from `u1db.tests.test_http_client`.
-# -----------------------------------------------------------------------------
-
-class TestSoledadClientBase(
- TestWithScenarios,
- test_http_client.TestHTTPClientBase):
-
- """
- This class should be used to test Token auth.
- """
-
- def getClient(self, **kwds):
- cli = self.getClientWithToken(**kwds)
- if 'creds' not in kwds:
- cli.set_token_credentials('user-uuid', 'auth-token')
- return cli
-
- def getClientWithToken(self, **kwds):
- self.startServer()
-
- class _HTTPClientWithToken(
- http_client.HTTPClientBase, auth.TokenBasedAuth):
-
- def set_token_credentials(self, uuid, token):
- auth.TokenBasedAuth.set_token_credentials(self, uuid, token)
-
- def _sign_request(self, method, url_query, params):
- return auth.TokenBasedAuth._sign_request(
- self, method, url_query, params)
-
- return _HTTPClientWithToken(self.getURL('dbase'), **kwds)
-
- def app(self, environ, start_response):
- res = test_http_client.TestHTTPClientBase.app(
- self, environ, start_response)
- if res is not None:
- return res
- # mime solead application here.
- if '/token' in environ['PATH_INFO']:
- auth = environ.get(SoledadTokenAuthMiddleware.HTTP_AUTH_KEY)
- if not auth:
- start_response("401 Unauthorized",
- [('Content-Type', 'application/json')])
- return [
- json.dumps(
- {"error": "unauthorized",
- "message": "no token found in environment"})
- ]
- scheme, encoded = auth.split(None, 1)
- if scheme.lower() != 'token':
- start_response("401 Unauthorized",
- [('Content-Type', 'application/json')])
- return [json.dumps({"error": "unauthorized",
- "message": "unknown scheme: %s" % scheme})]
- uuid, token = encoded.decode('base64').split(':', 1)
- if uuid != 'user-uuid' and token != 'auth-token':
- return Exception("Incorrect address or token.")
- start_response("200 OK", [('Content-Type', 'application/json')])
- return [json.dumps([environ['PATH_INFO'], uuid, token])]
-
- def test_token(self):
- """
- Test if token is sent correctly.
- """
- cli = self.getClientWithToken()
- cli.set_token_credentials('user-uuid', 'auth-token')
- res, headers = cli._request('GET', ['doc', 'token'])
- self.assertEqual(
- ['/dbase/doc/token', 'user-uuid', 'auth-token'], json.loads(res))
-
- def test_token_ctr_creds(self):
- cli = self.getClientWithToken(creds={'token': {
- 'uuid': 'user-uuid',
- 'token': 'auth-token',
- }})
- res, headers = cli._request('GET', ['doc', 'token'])
- self.assertEqual(
- ['/dbase/doc/token', 'user-uuid', 'auth-token'], json.loads(res))
diff --git a/testing/tests/client/test_secrets.py b/testing/tests/client/test_secrets.py
new file mode 100644
index 00000000..18ff458b
--- /dev/null
+++ b/testing/tests/client/test_secrets.py
@@ -0,0 +1,159 @@
+# -*- CODing: utf-8 -*-
+# test_secrets.py
+# Copyright (C) 2017 LEAP
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see <http://www.gnu.org/licenses/>.
+"""
+Tests for secrets encryption and decryption.
+"""
+import scrypt
+
+from twisted.trial import unittest
+
+from leap.soledad.client._crypto import ENC_METHOD
+from leap.soledad.client._secrets import SecretsCrypto
+
+
+class SecretsCryptoTestCase(unittest.TestCase):
+
+ SECRETS = {
+ 'remote_secret': 'a' * 512,
+ 'local_salt': 'b' * 64,
+ 'local_secret': 'c' * 448
+ }
+ ENCRYPTED_V2 = {
+ 'cipher': 2,
+ 'length': 1437,
+ 'kdf_salt': 'TSgNLeAGFeITeSgNzmYZHh+mzmkZPOqao7CAV/tx3KZCLwsrT0HmWtVK3'
+ 'TyWHWNgVdeamMZYRuvZavE2sp0DGw==\n',
+ 'iv': 'TKZQKIlRgdnXFhJf08qswg==',
+ 'secrets': 'ZNZRi72VDtwZqyuU+uf3yzZt23vCtMS3Ki2bnZyeHUOSGVweJeDadF4oqE'
+ 'BW87NN00j9E49BzyzLr9SNgwZjPp0wlUm7kt+s8EUfJUdH8nxaQ+9iqGXM'
+ 'cCHmBM8L8DRN2m3BrPGx7m+QGlN9sbrRpl7fqc46RWcYuTEpm4upjdtI7O'
+ 'jDd0JG3C0rUzIuKJn9w4rEpX3tLEKXVdZfLvRXS5roR0cauazsDO69E13q'
+ 'a01vDuY+UJ+buLQ3FluPnnk8QE7ztPVUmRJJ76yAIhjVX9owiwlp9GnUJY'
+ 'sETRCqdRSTwUcHIkzVR0zAvtxTX7eGTitzf4gCYEC4T9v5N/jHxEfPdx28'
+ 'MM4KShWN2nFxNFQLQUpMN2OrM7UyUw+DQ3ydqBeBPKPHRN5s05kIK7P/Ra'
+ 'aLNcrJWa7DopLbgLlei0Jd7S4sjv1ufaRY7v0qJaVkhh/VaCylTSVw1rv5'
+ 'YzSWcHHcLuC0R8xLadz6T+EpsVYxgPYCS7w5xoE82zwNQzw/EBxLIcyLPl'
+ 'ipKnr2dttrmm3KXUOT1IdbSbI5elF6yQTAusdqiXuypey+MDqHYWEYWkCn'
+ 'e9/uGM9FjklDLE0RtPEDxhq64tw6u2Xu7RzDzyQDI8EIoTdU+4zEMTnelZ'
+ 'fKEwdG58EDxTXfUk6IDcRUupz3YuToSMhIOkqgXnbWl/nrK0O9v4JMhQjI'
+ 'r+oPICYfFr14kvJXBsfntILTJCxzbqTQcNba3jc8rGqCZ6gM0u4PndwTG2'
+ 'UiCqPU2HMnWvVGQOXeLdQY+EqqXQiRDi0DrDmkVwFf+27dPXxmZ43C48W3'
+ 'lMhTKXl0rdBFnOD5jiMh0X6q/KYXonyEtMZMsjT7dFePcCy4wQRhuut+ac'
+ '/TJWyrr+/IB45E+LZbhV7xCy1dYsbdb52jTRJFpaQ83sj6Iv6SYdiqqXzL'
+ 'F5JGMyuovTjwAoIIQzpLv36xY2wGGAH1V8c7QmDR2qumXrHD9R68WjBoSY'
+ '7IFM0TFAGZNun56y/zQ4r8yOMSAId+j4kuRH0fENEi0FJ+SpmSdHfpvBhE'
+ 'MdGh927E9enEYWmUQMmkxXIw6E+O3cmOWt2hsMbUAikDCpQOnVP2BD55HT'
+ '6FfbW7ITVwPINHYexmy2Xcm8H5zzGFSp+uYIyPBYDKA+VJ+QQI8bud9K+T'
+ 'NBybUv9u6LbB6BsLpwLoxMPJu0WsN2HpmLYgrg2ML1huMF1OtaGRuUr2PL'
+ 'NBaZaL6VOztYrVtQG1+tNyRxn8XQTtx0l6n+EihGVe9Sk5XF6DJA9ZN7uO'
+ 'svTUFJ5qG3Erf4AmbUJWoOR/NvapBtifidM7gPZZ6NqBs6v72rU1pGy+p7'
+ 'o84KrmB2MNf3yJ0BvKxPvFmltF3Dc7LB5TN8ycbmFM6hgrLvvhPxiHEnG/'
+ '8Qcrg0nUXOipFGNgZEU7t7Mz6RJ189Z2Kx1HVGrkAzEgqwZYqijAPlsgzO'
+ 'bg6DwzwC7stolQWGCDQUtJVlE8FZ/Up8zFYYZKn52WzjmSN4/hHhEvdkck'
+ 'Nez/JVev6fMcVrgdrTZ+uCwxjN/4xPdgog2HV470ea1bvIkSNOOrhm194M'
+ '40GmvmBhiSSMjdRQCQtM0t9bUuSQLPDzEiCA9QaLyygtlz9uRR/dXgkEg0'
+ 'J4YNpZvhE0wbyp4GHytbPaAmrcd7im9+buTuMwhXpZl0stmfkJxVHJSZ8Y'
+ 'IHakHs3W1fdYyI3wxGpel/9eYO3ISukolwrHXESP65wVNKfBwbqVJzQmts'
+ 'pyDBOI6DcLKZfE1EVg0+uwQ/5PKZbn0TwlXO1YE3NL3mAply3zQR9hyBrY'
+ '6f1jkHVD3irIlWkSiPJsP8sW+nrK8c/Ha8F+dua6DTZmg594OIaQj8mPiY'
+ 'GcIusiARWocR5/MmSjupGOgFx4HtmckTJtAta3XP4elOx04teH/P9Cgr1x'
+ 'XYf+cEX6gp92L9rTo0FCz3Hw==\n',
+ 'version': 2,
+ 'kdf': 'scrypt',
+ 'kdf_length': 32
+ }
+
+ ENCRYPTED_V1 = {
+ 'version': 1,
+ 'active_secret': 'secret_id',
+ 'storage_secrets': {
+ 'secret_id': {
+ 'kdf': 'scrypt',
+ 'secret': 'u31ObvxNU8jB0HgMj3TVwQ==:JQwlYq6sAQmHYS3x2CJzObT9h'
+ 'j1iiHthvrMh887qedNCcOfJyCA3jpRkc0vjd2Qk/2HSJ+JxM2F'
+ 'MrPzzx5O34EHlgF2scen34guZRRIf42WpnMy+PrL4cnMlZLgCh'
+ 'H1Jz6wcIMEpU9LQ8OaCShk1/yJ6qcVHOV4DDt3mTF7ttiqI5cp'
+ 'msaVtxxYCcpxFiWSeSCEgr0h4/Ih1qHuM6vk+CQjf/zg1f/7HR'
+ 'imIyNYXit9Fw3YTkxBen1wG3f5L7OAODRTuqnWpkQFOmclx050'
+ 'k0frKRcX6UWhIOWpW2mqJXnvzDtQQVGzqIdSgGTGtUDGQ7Onnc'
+ 'NkUlSnuVC7PkDNNRuwit3pCB9YWBWyPAQgs0kLqoV4YcuSctz6'
+ 'SAf76ozdcK5/SrOzutOfyPag4V3AYKMv6rCKALJ10OnFJ61FL9'
+ 'kd6JZam7WOlEUXyO7Gdgvz+eKiQMTZXbtO2kAKqel513MedPXC'
+ 'dzajUe1U2JaGg86UdiDWoPYOiWxnAPwfNJk+1QuNy5NZ7PaMtF'
+ 'IKT3/Xema2U8mufS0FbvJyK2flP1VUWcCzHKTSqX6+kU7UpoWa'
+ 'hYa7PlO40El+putTQLBmNaEeaWFngO+XB4TReICHSiCdcAb3pw'
+ 'sabjtxt+OpK4vbj3yBSfpiZTpVbEjt9U/tUpVp/T2M66lMi3ZC'
+ 'oHLlhu45Zo0aEq3UmQ/WBXu6EkO2eLYz2br9YQwRbSJ6z5CHmu'
+ 'hjKBQlpvGNfZYObx5lY4o6Ab4f/N8gyukskjmAFAf7Fr8cEog/'
+ 'oxmbagoCtUGRYJp2paooqH8L6xXp0Y8+23g7WJaAIr1i4V4aKS'
+ 'r9x7iUK6prcZTtMJZEHCswkLN/+DU6/FX3YZcOjseC+Qv3P+9v'
+ 'zIDp/92KJzqVqITGwrsc6ZsglMW37qxs6albtw3lMWSHlkcLbj'
+ 'Xf/iHPeKnb2WNLdkFNQ1J5OaTJR+E1CrXN+pm1JtB6XaUbaLGV'
+ 'CGUo13lAPVDtXcPbo64kMrQtQu4m9m8X8t8tfuJmINfwBnrKzk'
+ 'O6pl+LwimFaFEArV6wcaMxmwi0lM7mt4U1u9OIQjghQ/dEmOyV'
+ 'dZBnvyG7T/oRuLdUyZ/QGXZMlPQ3lAZ0ONn1Mk4bmKToW8ToE8'
+ 'ylld3rLlWDjjoQP8mP05Izg3mguLHXUhikUL8MD5NdYyeZJ1XZ'
+ '0OZ5S9uncurYj2ABWJoVaq/tFCdCEo9bbjWsePei26GZjaM3Fx'
+ 'RkAICXe/bt6/uLgaPZtO+sdARDuU3DRKMIdgM9NBaIn0kC7Wk4'
+ 'bnYShZ/rbhVt2/ds5XinnDBZsxSR3s553DixJ9v6w9Db++9Stw'
+ '4DgePd9lLy+6WuVBlKmcNflx9zg7US0AOarX2UNiQ==',
+ 'kdf_length': 32,
+ 'kdf_salt': 'MYH68QH48nRFMWH44piFWqBnKtU8KCz6Ajh24otrvzJlqPgB'
+ 'v6bvFJjRvjRp/0/v1j2nt40RZ6H5hfoKmore0g==\n',
+ 'length': 1024,
+ 'cipher': 'aes256',
+ }
+ }
+ }
+
+ def setUp(self):
+ class Soledad(object):
+ passphrase = '123'
+ soledad = Soledad()
+ self._crypto = SecretsCrypto(soledad)
+
+ def test__get_key(self):
+ salt = 'abc'
+ expected = scrypt.hash('123', salt, buflen=32)
+ key = self._crypto._get_key(salt)
+ self.assertEqual(expected, key)
+
+ def test_encrypt(self):
+ info = self._crypto.encrypt(self.SECRETS)
+ self.assertEqual(8, len(info))
+ for key, value in [
+ ('kdf', 'scrypt'),
+ ('kdf_salt', None),
+ ('kdf_length', None),
+ ('cipher', ENC_METHOD.aes_256_gcm),
+ ('length', None),
+ ('iv', None),
+ ('secrets', None),
+ ('version', 2)]:
+ self.assertTrue(key in info)
+ if value:
+ self.assertEqual(info[key], value)
+
+ def test__decrypt_v2(self):
+ encrypted = self.ENCRYPTED_V2
+ decrypted = self._crypto.decrypt(encrypted)
+ self.assertEqual(decrypted, self.SECRETS)
+
+ def test__decrypt_v1(self):
+ encrypted = self.ENCRYPTED_V1
+ decrypted = self._crypto.decrypt(encrypted)
+ self.assertEqual(decrypted, self.SECRETS)
diff --git a/testing/tests/client/test_shared_db.py b/testing/tests/client/test_shared_db.py
index aac766c2..b045e524 100644
--- a/testing/tests/client/test_shared_db.py
+++ b/testing/tests/client/test_shared_db.py
@@ -2,7 +2,6 @@ from leap.soledad.common.document import SoledadDocument
from leap.soledad.client.shared_db import SoledadSharedDatabase
from test_soledad.util import BaseSoledadTest
-from test_soledad.util import ADDRESS
class SoledadSharedDBTestCase(BaseSoledadTest):
@@ -14,37 +13,28 @@ class SoledadSharedDBTestCase(BaseSoledadTest):
def setUp(self):
BaseSoledadTest.setUp(self)
self._shared_db = SoledadSharedDatabase(
- 'https://provider/', ADDRESS, document_factory=SoledadDocument,
+ 'https://provider/', document_factory=SoledadDocument,
creds=None)
def tearDown(self):
BaseSoledadTest.tearDown(self)
- def test__get_secrets_from_shared_db(self):
+ def test__get_remote_doc(self):
"""
Ensure the shared db is queried with the correct doc_id.
"""
- doc_id = self._soledad.secrets._shared_db_doc_id()
- self._soledad.secrets._get_secrets_from_shared_db()
- self.assertTrue(
- self._soledad.shared_db.get_doc.assert_called_with(
- doc_id) is None,
- 'Wrong doc_id when fetching recovery document.')
-
- def test__put_secrets_in_shared_db(self):
+ doc_id = self._soledad.secrets.storage._remote_doc_id()
+ self._soledad.secrets.storage._get_remote_doc()
+ self._soledad.secrets.storage._shared_db.get_doc.assert_called_with(
+ doc_id)
+
+ def test_save_remote(self):
"""
Ensure recovery document is put into shared recover db.
"""
- doc_id = self._soledad.secrets._shared_db_doc_id()
- self._soledad.secrets._put_secrets_in_shared_db()
- self.assertTrue(
- self._soledad.shared_db.get_doc.assert_called_with(
- doc_id) is None,
- 'Wrong doc_id when fetching recovery document.')
- self.assertTrue(
- self._soledad.shared_db.put_doc.assert_called_with(
- self._doc_put) is None,
- 'Wrong document when putting recovery document.')
- self.assertTrue(
- self._doc_put.doc_id == doc_id,
- 'Wrong doc_id when putting recovery document.')
+ doc_id = self._soledad.secrets.storage._remote_doc_id()
+ storage = self._soledad.secrets.storage
+ storage.save_remote({'content': 'blah'})
+ storage._shared_db.get_doc.assert_called_with(doc_id)
+ storage._shared_db.put_doc.assert_called_with(self._doc_put)
+ self.assertTrue(self._doc_put.doc_id == doc_id)
diff --git a/testing/tests/client/test_signals.py b/testing/tests/client/test_signals.py
index 4e9ebfd0..c7609a74 100644
--- a/testing/tests/client/test_signals.py
+++ b/testing/tests/client/test_signals.py
@@ -20,7 +20,7 @@ class SoledadSignalingTestCase(BaseSoledadTest):
def setUp(self):
# mock signaling
soledad.client.signal = Mock()
- soledad.client.secrets.events.emit_async = Mock()
+ soledad.client._secrets.util.events.emit_async = Mock()
# run parent's setUp
BaseSoledadTest.setUp(self)
@@ -42,55 +42,36 @@ class SoledadSignalingTestCase(BaseSoledadTest):
- downloading keys / done downloading keys.
- uploading keys / done uploading keys.
"""
- soledad.client.secrets.events.emit_async.reset_mock()
+ soledad.client._secrets.util.events.emit_async.reset_mock()
# get a fresh instance so it emits all bootstrap signals
sol = self._soledad_instance(
secrets_path='alternative_stage3.json',
local_db_path='alternative_stage3.u1db')
# reverse call order so we can verify in the order the signals were
# expected
- soledad.client.secrets.events.emit_async.mock_calls.reverse()
- soledad.client.secrets.events.emit_async.call_args = \
- soledad.client.secrets.events.emit_async.call_args_list[0]
- soledad.client.secrets.events.emit_async.call_args_list.reverse()
+ soledad.client._secrets.util.events.emit_async.mock_calls.reverse()
+ soledad.client._secrets.util.events.emit_async.call_args = \
+ soledad.client._secrets.util.events.emit_async.call_args_list[0]
+ soledad.client._secrets.util.events.emit_async.call_args_list.reverse()
user_data = {'userid': ADDRESS, 'uuid': ADDRESS}
- # downloading keys signals
- soledad.client.secrets.events.emit_async.assert_called_with(
- catalog.SOLEDAD_DOWNLOADING_KEYS, user_data
- )
- self._pop_mock_call(soledad.client.secrets.events.emit_async)
- soledad.client.secrets.events.emit_async.assert_called_with(
- catalog.SOLEDAD_DONE_DOWNLOADING_KEYS, user_data
- )
- # creating keys signals
- self._pop_mock_call(soledad.client.secrets.events.emit_async)
- soledad.client.secrets.events.emit_async.assert_called_with(
- catalog.SOLEDAD_CREATING_KEYS, user_data
- )
- self._pop_mock_call(soledad.client.secrets.events.emit_async)
- soledad.client.secrets.events.emit_async.assert_called_with(
- catalog.SOLEDAD_DONE_CREATING_KEYS, user_data
- )
- # downloading once more (inside _put_keys_in_shared_db)
- self._pop_mock_call(soledad.client.secrets.events.emit_async)
- soledad.client.secrets.events.emit_async.assert_called_with(
- catalog.SOLEDAD_DOWNLOADING_KEYS, user_data
- )
- self._pop_mock_call(soledad.client.secrets.events.emit_async)
- soledad.client.secrets.events.emit_async.assert_called_with(
- catalog.SOLEDAD_DONE_DOWNLOADING_KEYS, user_data
- )
- # uploading keys signals
- self._pop_mock_call(soledad.client.secrets.events.emit_async)
- soledad.client.secrets.events.emit_async.assert_called_with(
- catalog.SOLEDAD_UPLOADING_KEYS, user_data
- )
- self._pop_mock_call(soledad.client.secrets.events.emit_async)
- soledad.client.secrets.events.emit_async.assert_called_with(
- catalog.SOLEDAD_DONE_UPLOADING_KEYS, user_data
- )
+ def _assert(*args, **kwargs):
+ mocked = soledad.client._secrets.util.events.emit_async
+ mocked.assert_called_with(*args)
+ pop = kwargs.get('pop')
+ if pop or pop is None:
+ self._pop_mock_call(mocked)
+
+ _assert(catalog.SOLEDAD_DOWNLOADING_KEYS, user_data)
+ _assert(catalog.SOLEDAD_DONE_DOWNLOADING_KEYS, user_data)
+ _assert(catalog.SOLEDAD_CREATING_KEYS, user_data)
+ _assert(catalog.SOLEDAD_DONE_CREATING_KEYS, user_data)
+ _assert(catalog.SOLEDAD_UPLOADING_KEYS, user_data)
+ _assert(catalog.SOLEDAD_DOWNLOADING_KEYS, user_data)
+ _assert(catalog.SOLEDAD_DONE_DOWNLOADING_KEYS, user_data)
+ _assert(catalog.SOLEDAD_DONE_UPLOADING_KEYS, user_data, pop=False)
+
sol.close()
def test_stage2_bootstrap_signals(self):
@@ -101,11 +82,11 @@ class SoledadSignalingTestCase(BaseSoledadTest):
# get existing instance so we have access to keys
sol = self._soledad_instance()
# create a document with secrets
- doc = SoledadDocument(doc_id=sol.secrets._shared_db_doc_id())
- doc.content = sol.secrets._export_recovery_document()
+ doc = SoledadDocument(doc_id=sol.secrets.storage._remote_doc_id())
+ doc.content = sol.secrets.crypto.encrypt(sol.secrets._secrets)
sol.close()
# reset mock
- soledad.client.secrets.events.emit_async.reset_mock()
+ soledad.client._secrets.util.events.emit_async.reset_mock()
# get a fresh instance so it emits all bootstrap signals
shared_db = self.get_default_shared_mock(get_doc_return_value=doc)
sol = self._soledad_instance(
@@ -114,20 +95,23 @@ class SoledadSignalingTestCase(BaseSoledadTest):
shared_db_class=shared_db)
# reverse call order so we can verify in the order the signals were
# expected
- soledad.client.secrets.events.emit_async.mock_calls.reverse()
- soledad.client.secrets.events.emit_async.call_args = \
- soledad.client.secrets.events.emit_async.call_args_list[0]
- soledad.client.secrets.events.emit_async.call_args_list.reverse()
+ mocked = soledad.client._secrets.util.events.emit_async
+ mocked.mock_calls.reverse()
+ mocked.call_args = mocked.call_args_list[0]
+ mocked.call_args_list.reverse()
+
+ def _assert(*args, **kwargs):
+ mocked = soledad.client._secrets.util.events.emit_async
+ mocked.assert_called_with(*args)
+ pop = kwargs.get('pop')
+ if pop or pop is None:
+ self._pop_mock_call(mocked)
+
# assert download keys signals
- soledad.client.secrets.events.emit_async.assert_called_with(
- catalog.SOLEDAD_DOWNLOADING_KEYS,
- {'userid': ADDRESS, 'uuid': ADDRESS}
- )
- self._pop_mock_call(soledad.client.secrets.events.emit_async)
- soledad.client.secrets.events.emit_async.assert_called_with(
- catalog.SOLEDAD_DONE_DOWNLOADING_KEYS,
- {'userid': ADDRESS, 'uuid': ADDRESS},
- )
+ user_data = {'userid': ADDRESS, 'uuid': ADDRESS}
+ _assert(catalog.SOLEDAD_DOWNLOADING_KEYS, user_data)
+ _assert(catalog.SOLEDAD_DONE_DOWNLOADING_KEYS, user_data, pop=False)
+
sol.close()
def test_stage1_bootstrap_signals(self):
diff --git a/testing/tests/conftest.py b/testing/tests/conftest.py
index 1ff1cbb7..c077828f 100644
--- a/testing/tests/conftest.py
+++ b/testing/tests/conftest.py
@@ -6,7 +6,7 @@ import signal
import time
from hashlib import sha512
-from subprocess import call
+from subprocess import check_call
from urlparse import urljoin
from uuid import uuid4
@@ -98,13 +98,13 @@ class SoledadServer(object):
def start(self):
self._create_conf_file()
# start the server
- call([
+ check_call([
'twistd',
'--logfile=%s' % self._logfile,
'--pidfile=%s' % self._pidfile,
'web',
- '--wsgi=leap.soledad.server.application.wsgi_application',
- '--port=2424'
+ '--class=leap.soledad.server.entrypoint.SoledadEntrypoint',
+ '--port=tcp:2424'
])
def _create_conf_file(self):
@@ -118,7 +118,7 @@ class SoledadServer(object):
def stop(self):
pid = get_pid(self._pidfile)
- os.kill(pid, signal.SIGKILL)
+ os.kill(pid, signal.SIGTERM)
@pytest.fixture(scope='module')
@@ -191,9 +191,19 @@ def soledad_client(tmpdir, soledad_server, remote_db, soledad_dbs, request):
soledad_dbs(default_uuid)
# get a soledad instance
- def create():
- secrets_path = os.path.join(tmpdir.strpath, '%s.secret' % default_uuid)
- local_db_path = os.path.join(tmpdir.strpath, '%s.db' % default_uuid)
+ def create(force_fresh_db=False):
+ secrets_file = '%s.secret' % default_uuid
+ secrets_path = os.path.join(tmpdir.strpath, secrets_file)
+
+ # in some tests we might want to use the same user and remote database
+ # but with a clean/empty local database (i.e. download benchmarks), so
+ # here we provide a way to do that.
+ db_file = '%s.db' % default_uuid
+ if force_fresh_db:
+ prefix = uuid4().hex
+ db_file = prefix + '-' + db_file
+ local_db_path = os.path.join(tmpdir.strpath, db_file)
+
soledad_client = Soledad(
default_uuid,
unicode(passphrase),
diff --git a/testing/tests/server/test__resource.py b/testing/tests/server/test__resource.py
new file mode 100644
index 00000000..c066435e
--- /dev/null
+++ b/testing/tests/server/test__resource.py
@@ -0,0 +1,66 @@
+# -*- coding: utf-8 -*-
+# test__resource.py
+# Copyright (C) 2017 LEAP
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see <http://www.gnu.org/licenses/>.
+"""
+Tests for Soledad server main resource.
+"""
+from twisted.trial import unittest
+from twisted.web.test.test_web import DummyRequest
+from twisted.web.wsgi import WSGIResource
+from twisted.web.resource import getChildForRequest
+from twisted.internet import reactor
+
+from leap.soledad.server._resource import SoledadResource
+from leap.soledad.server._server_info import ServerInfo
+from leap.soledad.server._blobs import BlobsResource
+from leap.soledad.server.gzip_middleware import GzipMiddleware
+
+
+_pool = reactor.getThreadPool()
+
+
+class SoledadResourceTestCase(unittest.TestCase):
+
+ def test_get_root(self):
+ enable_blobs = None # doesn't matter
+ resource = SoledadResource(enable_blobs=enable_blobs, sync_pool=_pool)
+ request = DummyRequest([''])
+ child = getChildForRequest(resource, request)
+ self.assertIsInstance(child, ServerInfo)
+
+ def test_get_blobs_enabled(self):
+ enable_blobs = True
+ resource = SoledadResource(enable_blobs=enable_blobs, sync_pool=_pool)
+ request = DummyRequest(['blobs'])
+ child = getChildForRequest(resource, request)
+ self.assertIsInstance(child, BlobsResource)
+
+ def test_get_blobs_disabled(self):
+ enable_blobs = False
+ resource = SoledadResource(enable_blobs=enable_blobs, sync_pool=_pool)
+ request = DummyRequest(['blobs'])
+ child = getChildForRequest(resource, request)
+ # if blobs is disabled, the request should be routed to sync
+ self.assertIsInstance(child, WSGIResource)
+ self.assertIsInstance(child._application, GzipMiddleware)
+
+ def test_get_sync(self):
+ enable_blobs = None # doesn't matter
+ resource = SoledadResource(enable_blobs=enable_blobs, sync_pool=_pool)
+ request = DummyRequest(['user-db', 'sync-from', 'source-id'])
+ child = getChildForRequest(resource, request)
+ self.assertIsInstance(child, WSGIResource)
+ self.assertIsInstance(child._application, GzipMiddleware)
diff --git a/testing/tests/server/test__server_info.py b/testing/tests/server/test__server_info.py
new file mode 100644
index 00000000..40567ef1
--- /dev/null
+++ b/testing/tests/server/test__server_info.py
@@ -0,0 +1,43 @@
+# -*- coding: utf-8 -*-
+# test__server_info.py
+# Copyright (C) 2017 LEAP
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see <http://www.gnu.org/licenses/>.
+
+"""
+Tests for Soledad server information announcement.
+"""
+import json
+
+from twisted.trial import unittest
+from twisted.web.test.test_web import DummyRequest
+
+from leap.soledad.server._server_info import ServerInfo
+
+
+class ServerInfoTestCase(unittest.TestCase):
+
+ def test_blobs_enabled(self):
+ resource = ServerInfo(True)
+ response = resource.render(DummyRequest(['']))
+ _info = json.loads(response)
+ self.assertEquals(_info['blobs'], True)
+ self.assertTrue(isinstance(_info['version'], basestring))
+
+ def test_blobs_disabled(self):
+ resource = ServerInfo(False)
+ response = resource.render(DummyRequest(['']))
+ _info = json.loads(response)
+ self.assertEquals(_info['blobs'], False)
+ self.assertTrue(isinstance(_info['version'], basestring))
diff --git a/testing/tests/server/test_auth.py b/testing/tests/server/test_auth.py
new file mode 100644
index 00000000..6eb647ee
--- /dev/null
+++ b/testing/tests/server/test_auth.py
@@ -0,0 +1,104 @@
+# -*- coding: utf-8 -*-
+# test_auth.py
+# Copyright (C) 2017 LEAP
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see <http://www.gnu.org/licenses/>.
+"""
+Tests for auth pieces.
+"""
+import collections
+
+from contextlib import contextmanager
+
+from twisted.cred.credentials import UsernamePassword
+from twisted.cred.error import UnauthorizedLogin
+from twisted.internet import reactor
+from twisted.internet.defer import inlineCallbacks
+from twisted.trial import unittest
+from twisted.web.resource import IResource
+from twisted.web.test import test_httpauth
+
+import leap.soledad.server.auth as auth_module
+from leap.soledad.server.auth import SoledadRealm
+from leap.soledad.server.auth import TokenChecker
+from leap.soledad.server.auth import TokenCredentialFactory
+from leap.soledad.server._resource import SoledadResource
+
+
+class SoledadRealmTestCase(unittest.TestCase):
+
+ def test_returned_resource(self):
+ # we have to pass a pool to the realm , otherwise tests will hang
+ conf = {'blobs': False}
+ pool = reactor.getThreadPool()
+ realm = SoledadRealm(conf=conf, sync_pool=pool)
+ iface, avatar, logout = realm.requestAvatar('any', None, IResource)
+ self.assertIsInstance(avatar, SoledadResource)
+ self.assertIsNone(logout())
+
+
+class DummyServer(object):
+ """
+ I fake the `couchdb.client.Server` GET api and always return the token
+ given on my creation.
+ """
+
+ def __init__(self, token):
+ self._token = token
+
+ def get(self, _):
+ return self._token
+
+
+@contextmanager
+def dummy_server(token):
+ yield collections.defaultdict(lambda: DummyServer(token))
+
+
+class TokenCheckerTestCase(unittest.TestCase):
+
+ @inlineCallbacks
+ def test_good_creds(self):
+ # set up a dummy server which always return a *valid* token document
+ token = {'user_id': 'user', 'type': 'Token'}
+ server = dummy_server(token)
+ # setup the checker with the custom server
+ checker = TokenChecker()
+ auth_module.couch_server = lambda url: server
+ # assert the checker *can* verify the creds
+ creds = UsernamePassword('user', 'pass')
+ avatarId = yield checker.requestAvatarId(creds)
+ self.assertEqual('user', avatarId)
+
+ @inlineCallbacks
+ def test_bad_creds(self):
+ # set up a dummy server which always return an *invalid* token document
+ token = None
+ server = dummy_server(token)
+ # setup the checker with the custom server
+ checker = TokenChecker()
+ auth_module.couch_server = lambda url: server
+ # assert the checker *cannot* verify the creds
+ creds = UsernamePassword('user', '')
+ with self.assertRaises(UnauthorizedLogin):
+ yield checker.requestAvatarId(creds)
+
+
+class TokenCredentialFactoryTestcase(
+ test_httpauth.RequestMixin, test_httpauth.BasicAuthTestsMixin,
+ unittest.TestCase):
+
+ def setUp(self):
+ test_httpauth.BasicAuthTestsMixin.setUp(self)
+ self.credentialFactory = TokenCredentialFactory()
diff --git a/testing/tests/server/test_config.py b/testing/tests/server/test_config.py
new file mode 100644
index 00000000..d2a8a9de
--- /dev/null
+++ b/testing/tests/server/test_config.py
@@ -0,0 +1,69 @@
+# -*- coding: utf-8 -*-
+# test_config.py
+# Copyright (C) 2017 LEAP
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see <http://www.gnu.org/licenses/>.
+"""
+Tests for server configuration.
+"""
+
+from twisted.trial import unittest
+from pkg_resources import resource_filename
+
+from leap.soledad.server._config import _load_config
+from leap.soledad.server._config import CONFIG_DEFAULTS
+
+
+class ConfigurationParsingTest(unittest.TestCase):
+
+ def setUp(self):
+ self.maxDiff = None
+
+ def test_use_defaults_on_failure(self):
+ config = _load_config('this file will never exist')
+ expected = CONFIG_DEFAULTS
+ self.assertEquals(expected, config)
+
+ def test_security_values_configuration(self):
+ # given
+ config_path = resource_filename('test_soledad',
+ 'fixture_soledad.conf')
+ # when
+ config = _load_config(config_path)
+
+ # then
+ expected = {'members': ['user1', 'user2'],
+ 'members_roles': ['role1', 'role2'],
+ 'admins': ['user3', 'user4'],
+ 'admins_roles': ['role3', 'role3']}
+ self.assertDictEqual(expected, config['database-security'])
+
+ def test_server_values_configuration(self):
+ # given
+ config_path = resource_filename('test_soledad',
+ 'fixture_soledad.conf')
+ # when
+ config = _load_config(config_path)
+
+ # then
+ expected = {'couch_url':
+ 'http://soledad:passwd@localhost:5984',
+ 'create_cmd':
+ 'sudo -u soledad-admin /usr/bin/create-user-db',
+ 'admin_netrc':
+ '/etc/couchdb/couchdb-soledad-admin.netrc',
+ 'batching': False,
+ 'blobs': False,
+ 'blobs_path': '/srv/leap/soledad/blobs'}
+ self.assertDictEqual(expected, config['soledad-server'])
diff --git a/testing/tests/server/test_server.py b/testing/tests/server/test_server.py
index 6710caaf..4a5ec43f 100644
--- a/testing/tests/server/test_server.py
+++ b/testing/tests/server/test_server.py
@@ -18,17 +18,13 @@
Tests for server-related functionality.
"""
import binascii
-import mock
import os
import pytest
-from hashlib import sha512
-from pkg_resources import resource_filename
from urlparse import urljoin
from uuid import uuid4
from twisted.internet import defer
-from twisted.trial import unittest
from leap.soledad.common.couch.state import CouchServerState
from leap.soledad.common.couch import CouchDatabase
@@ -38,253 +34,10 @@ from test_soledad.util import (
make_token_soledad_app,
make_soledad_document_for_test,
soledad_sync_target,
- BaseSoledadTest,
)
from leap.soledad.client import _crypto
from leap.soledad.client import Soledad
-from leap.soledad.server.config import load_configuration
-from leap.soledad.server.config import CONFIG_DEFAULTS
-from leap.soledad.server.auth import URLToAuthorization
-from leap.soledad.server.auth import SoledadTokenAuthMiddleware
-
-
-class ServerAuthenticationMiddlewareTestCase(CouchDBTestCase):
-
- def setUp(self):
- super(ServerAuthenticationMiddlewareTestCase, self).setUp()
- app = mock.Mock()
- self._state = CouchServerState(self.couch_url)
- app.state = self._state
- self.auth_middleware = SoledadTokenAuthMiddleware(app)
- self._authorize('valid-uuid', 'valid-token')
-
- def _authorize(self, uuid, token):
- token_doc = {}
- token_doc['_id'] = sha512(token).hexdigest()
- token_doc[self._state.TOKENS_USER_ID_KEY] = uuid
- token_doc[self._state.TOKENS_TYPE_KEY] = \
- self._state.TOKENS_TYPE_DEF
- dbname = self._state._tokens_dbname()
- db = self.couch_server.create(dbname)
- db.save(token_doc)
- self.addCleanup(self.delete_db, db.name)
-
- def test_authorized_user(self):
- is_authorized = self.auth_middleware._verify_authentication_data
- self.assertTrue(is_authorized('valid-uuid', 'valid-token'))
- self.assertFalse(is_authorized('valid-uuid', 'invalid-token'))
- self.assertFalse(is_authorized('invalid-uuid', 'valid-token'))
- self.assertFalse(is_authorized('eve', 'invalid-token'))
-
-
-class ServerAuthorizationTestCase(BaseSoledadTest):
-
- """
- Tests related to Soledad server authorization.
- """
-
- def setUp(self):
- pass
-
- def tearDown(self):
- pass
-
- def _make_environ(self, path_info, request_method):
- return {
- 'PATH_INFO': path_info,
- 'REQUEST_METHOD': request_method,
- }
-
- def test_verify_action_with_correct_dbnames(self):
- """
- Test encrypting and decrypting documents.
-
- The following table lists the authorized actions among all possible
- u1db remote actions:
-
- URL path | Authorized actions
- --------------------------------------------------
- / | GET
- /shared-db | GET
- /shared-db/docs | -
- /shared-db/doc/{id} | GET, PUT, DELETE
- /shared-db/sync-from/{source} | -
- /user-db | GET, PUT, DELETE
- /user-db/docs | -
- /user-db/doc/{id} | -
- /user-db/sync-from/{source} | GET, PUT, POST
- """
- uuid = uuid4().hex
- authmap = URLToAuthorization(uuid,)
- dbname = authmap._user_db_name
- # test global auth
- self.assertTrue(
- authmap.is_authorized(self._make_environ('/', 'GET')))
- # test shared-db database resource auth
- self.assertTrue(
- authmap.is_authorized(
- self._make_environ('/shared', 'GET')))
- self.assertFalse(
- authmap.is_authorized(
- self._make_environ('/shared', 'PUT')))
- self.assertFalse(
- authmap.is_authorized(
- self._make_environ('/shared', 'DELETE')))
- self.assertFalse(
- authmap.is_authorized(
- self._make_environ('/shared', 'POST')))
- # test shared-db docs resource auth
- self.assertFalse(
- authmap.is_authorized(
- self._make_environ('/shared/docs', 'GET')))
- self.assertFalse(
- authmap.is_authorized(
- self._make_environ('/shared/docs', 'PUT')))
- self.assertFalse(
- authmap.is_authorized(
- self._make_environ('/shared/docs', 'DELETE')))
- self.assertFalse(
- authmap.is_authorized(
- self._make_environ('/shared/docs', 'POST')))
- # test shared-db doc resource auth
- self.assertTrue(
- authmap.is_authorized(
- self._make_environ('/shared/doc/x', 'GET')))
- self.assertTrue(
- authmap.is_authorized(
- self._make_environ('/shared/doc/x', 'PUT')))
- self.assertTrue(
- authmap.is_authorized(
- self._make_environ('/shared/doc/x', 'DELETE')))
- self.assertFalse(
- authmap.is_authorized(
- self._make_environ('/shared/doc/x', 'POST')))
- # test shared-db sync resource auth
- self.assertFalse(
- authmap.is_authorized(
- self._make_environ('/shared/sync-from/x', 'GET')))
- self.assertFalse(
- authmap.is_authorized(
- self._make_environ('/shared/sync-from/x', 'PUT')))
- self.assertFalse(
- authmap.is_authorized(
- self._make_environ('/shared/sync-from/x', 'DELETE')))
- self.assertFalse(
- authmap.is_authorized(
- self._make_environ('/shared/sync-from/x', 'POST')))
- # test user-db database resource auth
- self.assertTrue(
- authmap.is_authorized(
- self._make_environ('/%s' % dbname, 'GET')))
- self.assertTrue(
- authmap.is_authorized(
- self._make_environ('/%s' % dbname, 'PUT')))
- self.assertTrue(
- authmap.is_authorized(
- self._make_environ('/%s' % dbname, 'DELETE')))
- self.assertFalse(
- authmap.is_authorized(
- self._make_environ('/%s' % dbname, 'POST')))
- # test user-db docs resource auth
- self.assertFalse(
- authmap.is_authorized(
- self._make_environ('/%s/docs' % dbname, 'GET')))
- self.assertFalse(
- authmap.is_authorized(
- self._make_environ('/%s/docs' % dbname, 'PUT')))
- self.assertFalse(
- authmap.is_authorized(
- self._make_environ('/%s/docs' % dbname, 'DELETE')))
- self.assertFalse(
- authmap.is_authorized(
- self._make_environ('/%s/docs' % dbname, 'POST')))
- # test user-db doc resource auth
- self.assertFalse(
- authmap.is_authorized(
- self._make_environ('/%s/doc/x' % dbname, 'GET')))
- self.assertFalse(
- authmap.is_authorized(
- self._make_environ('/%s/doc/x' % dbname, 'PUT')))
- self.assertFalse(
- authmap.is_authorized(
- self._make_environ('/%s/doc/x' % dbname, 'DELETE')))
- self.assertFalse(
- authmap.is_authorized(
- self._make_environ('/%s/doc/x' % dbname, 'POST')))
- # test user-db sync resource auth
- self.assertTrue(
- authmap.is_authorized(
- self._make_environ('/%s/sync-from/x' % dbname, 'GET')))
- self.assertTrue(
- authmap.is_authorized(
- self._make_environ('/%s/sync-from/x' % dbname, 'PUT')))
- self.assertFalse(
- authmap.is_authorized(
- self._make_environ('/%s/sync-from/x' % dbname, 'DELETE')))
- self.assertTrue(
- authmap.is_authorized(
- self._make_environ('/%s/sync-from/x' % dbname, 'POST')))
-
- def test_verify_action_with_wrong_dbnames(self):
- """
- Test if authorization fails for a wrong dbname.
- """
- uuid = uuid4().hex
- authmap = URLToAuthorization(uuid)
- dbname = 'somedb'
- # test wrong-db database resource auth
- self.assertFalse(
- authmap.is_authorized(
- self._make_environ('/%s' % dbname, 'GET')))
- self.assertFalse(
- authmap.is_authorized(
- self._make_environ('/%s' % dbname, 'PUT')))
- self.assertFalse(
- authmap.is_authorized(
- self._make_environ('/%s' % dbname, 'DELETE')))
- self.assertFalse(
- authmap.is_authorized(
- self._make_environ('/%s' % dbname, 'POST')))
- # test wrong-db docs resource auth
- self.assertFalse(
- authmap.is_authorized(
- self._make_environ('/%s/docs' % dbname, 'GET')))
- self.assertFalse(
- authmap.is_authorized(
- self._make_environ('/%s/docs' % dbname, 'PUT')))
- self.assertFalse(
- authmap.is_authorized(
- self._make_environ('/%s/docs' % dbname, 'DELETE')))
- self.assertFalse(
- authmap.is_authorized(
- self._make_environ('/%s/docs' % dbname, 'POST')))
- # test wrong-db doc resource auth
- self.assertFalse(
- authmap.is_authorized(
- self._make_environ('/%s/doc/x' % dbname, 'GET')))
- self.assertFalse(
- authmap.is_authorized(
- self._make_environ('/%s/doc/x' % dbname, 'PUT')))
- self.assertFalse(
- authmap.is_authorized(
- self._make_environ('/%s/doc/x' % dbname, 'DELETE')))
- self.assertFalse(
- authmap.is_authorized(
- self._make_environ('/%s/doc/x' % dbname, 'POST')))
- # test wrong-db sync resource auth
- self.assertFalse(
- authmap.is_authorized(
- self._make_environ('/%s/sync-from/x' % dbname, 'GET')))
- self.assertFalse(
- authmap.is_authorized(
- self._make_environ('/%s/sync-from/x' % dbname, 'PUT')))
- self.assertFalse(
- authmap.is_authorized(
- self._make_environ('/%s/sync-from/x' % dbname, 'DELETE')))
- self.assertFalse(
- authmap.is_authorized(
- self._make_environ('/%s/sync-from/x' % dbname, 'POST')))
@pytest.mark.usefixtures("method_tmpdir")
@@ -382,7 +135,7 @@ class EncryptedSyncTestCase(
user=user,
prefix='x',
auth_token='auth-token',
- secrets_path=sol1._secrets_path,
+ secrets_path=sol1.secrets_path,
passphrase=passphrase)
# ensure remote db exists before syncing
@@ -474,45 +227,3 @@ class EncryptedSyncTestCase(
Test if Soledad can sync many smallfiles.
"""
return self._test_encrypted_sym_sync(doc_size=2, number_of_docs=100)
-
-
-class ConfigurationParsingTest(unittest.TestCase):
-
- def setUp(self):
- self.maxDiff = None
-
- def test_use_defaults_on_failure(self):
- config = load_configuration('this file will never exist')
- expected = CONFIG_DEFAULTS
- self.assertEquals(expected, config)
-
- def test_security_values_configuration(self):
- # given
- config_path = resource_filename('test_soledad',
- 'fixture_soledad.conf')
- # when
- config = load_configuration(config_path)
-
- # then
- expected = {'members': ['user1', 'user2'],
- 'members_roles': ['role1', 'role2'],
- 'admins': ['user3', 'user4'],
- 'admins_roles': ['role3', 'role3']}
- self.assertDictEqual(expected, config['database-security'])
-
- def test_server_values_configuration(self):
- # given
- config_path = resource_filename('test_soledad',
- 'fixture_soledad.conf')
- # when
- config = load_configuration(config_path)
-
- # then
- expected = {'couch_url':
- 'http://soledad:passwd@localhost:5984',
- 'create_cmd':
- 'sudo -u soledad-admin /usr/bin/create-user-db',
- 'admin_netrc':
- '/etc/couchdb/couchdb-soledad-admin.netrc',
- 'batching': False}
- self.assertDictEqual(expected, config['soledad-server'])
diff --git a/testing/tests/server/test_session.py b/testing/tests/server/test_session.py
new file mode 100644
index 00000000..ebb94476
--- /dev/null
+++ b/testing/tests/server/test_session.py
@@ -0,0 +1,186 @@
+# -*- coding: utf-8 -*-
+# test_session.py
+# Copyright (C) 2017 LEAP
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see <http://www.gnu.org/licenses/>.
+"""
+Tests for server session entrypoint.
+"""
+from twisted.trial import unittest
+
+from twisted.cred import portal
+from twisted.cred.checkers import InMemoryUsernamePasswordDatabaseDontUse
+from twisted.cred.credentials import IUsernamePassword
+from twisted.web.resource import getChildForRequest
+from twisted.web.static import Data
+from twisted.web.test.requesthelper import DummyRequest
+from twisted.web.test.test_httpauth import b64encode
+from twisted.web.test.test_httpauth import Realm
+from twisted.web._auth.wrapper import UnauthorizedResource
+
+from leap.soledad.server.session import SoledadSession
+
+
+class SoledadSessionTestCase(unittest.TestCase):
+ """
+ Tests adapted from for
+ L{twisted.web.test.test_httpauth.HTTPAuthSessionWrapper}.
+ """
+
+ def makeRequest(self, *args, **kwargs):
+ request = DummyRequest(*args, **kwargs)
+ request.path = '/'
+ return request
+
+ def setUp(self):
+ self.username = b'foo bar'
+ self.password = b'bar baz'
+ self.avatarContent = b"contents of the avatar resource itself"
+ self.childName = b"foo-child"
+ self.childContent = b"contents of the foo child of the avatar"
+ self.checker = InMemoryUsernamePasswordDatabaseDontUse()
+ self.checker.addUser(self.username, self.password)
+ self.avatar = Data(self.avatarContent, 'text/plain')
+ self.avatar.putChild(
+ self.childName, Data(self.childContent, 'text/plain'))
+ self.avatars = {self.username: self.avatar}
+ self.realm = Realm(self.avatars.get)
+ self.portal = portal.Portal(self.realm, [self.checker])
+ self.wrapper = SoledadSession(self.portal)
+
+ def _authorizedTokenLogin(self, request):
+ authorization = b64encode(
+ self.username + b':' + self.password)
+ request.requestHeaders.addRawHeader(b'authorization',
+ b'Token ' + authorization)
+ return getChildForRequest(self.wrapper, request)
+
+ def test_getChildWithDefault(self):
+ request = self.makeRequest([self.childName])
+ child = getChildForRequest(self.wrapper, request)
+ d = request.notifyFinish()
+
+ def cbFinished(result):
+ self.assertEqual(request.responseCode, 401)
+
+ d.addCallback(cbFinished)
+ request.render(child)
+ return d
+
+ def _invalidAuthorizationTest(self, response):
+ request = self.makeRequest([self.childName])
+ request.requestHeaders.addRawHeader(b'authorization', response)
+ child = getChildForRequest(self.wrapper, request)
+ d = request.notifyFinish()
+
+ def cbFinished(result):
+ self.assertEqual(request.responseCode, 401)
+
+ d.addCallback(cbFinished)
+ request.render(child)
+ return d
+
+ def test_getChildWithDefaultUnauthorizedUser(self):
+ return self._invalidAuthorizationTest(
+ b'Basic ' + b64encode(b'foo:bar'))
+
+ def test_getChildWithDefaultUnauthorizedPassword(self):
+ return self._invalidAuthorizationTest(
+ b'Basic ' + b64encode(self.username + b':bar'))
+
+ def test_getChildWithDefaultUnrecognizedScheme(self):
+ return self._invalidAuthorizationTest(b'Quux foo bar baz')
+
+ def test_getChildWithDefaultAuthorized(self):
+ request = self.makeRequest([self.childName])
+ child = self._authorizedTokenLogin(request)
+ d = request.notifyFinish()
+
+ def cbFinished(ignored):
+ self.assertEqual(request.written, [self.childContent])
+
+ d.addCallback(cbFinished)
+ request.render(child)
+ return d
+
+ def test_renderAuthorized(self):
+ # Request it exactly, not any of its children.
+ request = self.makeRequest([])
+ child = self._authorizedTokenLogin(request)
+ d = request.notifyFinish()
+
+ def cbFinished(ignored):
+ self.assertEqual(request.written, [self.avatarContent])
+
+ d.addCallback(cbFinished)
+ request.render(child)
+ return d
+
+ def test_decodeRaises(self):
+ request = self.makeRequest([self.childName])
+ request.requestHeaders.addRawHeader(b'authorization',
+ b'Token decode should fail')
+ child = getChildForRequest(self.wrapper, request)
+ self.assertIsInstance(child, UnauthorizedResource)
+
+ def test_parseResponse(self):
+ basicAuthorization = b'Basic abcdef123456'
+ self.assertEqual(
+ self.wrapper._parseHeader(basicAuthorization),
+ None)
+ tokenAuthorization = b'Token abcdef123456'
+ self.assertEqual(
+ self.wrapper._parseHeader(tokenAuthorization),
+ b'abcdef123456')
+
+ def test_unexpectedDecodeError(self):
+
+ class UnexpectedException(Exception):
+ pass
+
+ class BadFactory(object):
+ scheme = b'bad'
+
+ def getChallenge(self, client):
+ return {}
+
+ def decode(self, response, request):
+ print "decode raised"
+ raise UnexpectedException()
+
+ self.wrapper._credentialFactory = BadFactory()
+ request = self.makeRequest([self.childName])
+ request.requestHeaders.addRawHeader(b'authorization', b'Bad abc')
+ child = getChildForRequest(self.wrapper, request)
+ request.render(child)
+ self.assertEqual(request.responseCode, 500)
+ errors = self.flushLoggedErrors(UnexpectedException)
+ self.assertEqual(len(errors), 1)
+
+ def test_unexpectedLoginError(self):
+ class UnexpectedException(Exception):
+ pass
+
+ class BrokenChecker(object):
+ credentialInterfaces = (IUsernamePassword,)
+
+ def requestAvatarId(self, credentials):
+ raise UnexpectedException()
+
+ self.portal.registerChecker(BrokenChecker())
+ request = self.makeRequest([self.childName])
+ child = self._authorizedTokenLogin(request)
+ request.render(child)
+ self.assertEqual(request.responseCode, 500)
+ self.assertEqual(len(self.flushLoggedErrors(UnexpectedException)), 1)
diff --git a/testing/tests/server/test_url_mapper.py b/testing/tests/server/test_url_mapper.py
new file mode 100644
index 00000000..fa99cae7
--- /dev/null
+++ b/testing/tests/server/test_url_mapper.py
@@ -0,0 +1,131 @@
+# -*- coding: utf-8 -*-
+# test_url_mapper.py
+# Copyright (C) 2017 LEAP
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see <http://www.gnu.org/licenses/>.
+"""
+Tests for server-related functionality.
+"""
+
+from twisted.trial import unittest
+from uuid import uuid4
+
+from leap.soledad.server.url_mapper import URLMapper
+
+
+class URLMapperTestCase(unittest.TestCase):
+ """
+ Test if the URLMapper behaves as expected.
+
+ The following table lists the authorized actions among all possible
+ u1db remote actions:
+
+ URL path | Authorized actions
+ --------------------------------------------------
+ / | GET
+ /shared-db | GET
+ /shared-db/docs | -
+ /shared-db/doc/{id} | -
+ /shared-db/sync-from/{source} | -
+ /user-db | -
+ /user-db/docs | -
+ /user-db/doc/{id} | -
+ /user-db/sync-from/{source} | GET, PUT, POST
+ """
+
+ def setUp(self):
+ self._uuid = uuid4().hex
+ self._urlmap = URLMapper()
+ self._dbname = 'user-%s' % self._uuid
+
+ def test_root_authorized(self):
+ match = self._urlmap.match('/', 'GET')
+ self.assertIsNotNone(match)
+
+ def test_shared_authorized(self):
+ self.assertIsNotNone(self._urlmap.match('/shared', 'GET'))
+
+ def test_shared_unauthorized(self):
+ self.assertIsNone(self._urlmap.match('/shared', 'PUT'))
+ self.assertIsNone(self._urlmap.match('/shared', 'DELETE'))
+ self.assertIsNone(self._urlmap.match('/shared', 'POST'))
+
+ def test_shared_docs_unauthorized(self):
+ self.assertIsNone(self._urlmap.match('/shared/docs', 'GET'))
+ self.assertIsNone(self._urlmap.match('/shared/docs', 'PUT'))
+ self.assertIsNone(self._urlmap.match('/shared/docs', 'DELETE'))
+ self.assertIsNone(self._urlmap.match('/shared/docs', 'POST'))
+
+ def test_shared_doc_authorized(self):
+ match = self._urlmap.match('/shared/doc/x', 'GET')
+ self.assertIsNotNone(match)
+ self.assertEqual('x', match.get('id'))
+
+ match = self._urlmap.match('/shared/doc/x', 'PUT')
+ self.assertIsNotNone(match)
+ self.assertEqual('x', match.get('id'))
+
+ match = self._urlmap.match('/shared/doc/x', 'DELETE')
+ self.assertIsNotNone(match)
+ self.assertEqual('x', match.get('id'))
+
+ def test_shared_doc_unauthorized(self):
+ self.assertIsNone(self._urlmap.match('/shared/doc/x', 'POST'))
+
+ def test_shared_sync_unauthorized(self):
+ self.assertIsNone(self._urlmap.match('/shared/sync-from/x', 'GET'))
+ self.assertIsNone(self._urlmap.match('/shared/sync-from/x', 'PUT'))
+ self.assertIsNone(self._urlmap.match('/shared/sync-from/x', 'DELETE'))
+ self.assertIsNone(self._urlmap.match('/shared/sync-from/x', 'POST'))
+
+ def test_user_db_unauthorized(self):
+ dbname = self._dbname
+ self.assertIsNone(self._urlmap.match('/%s' % dbname, 'GET'))
+ self.assertIsNone(self._urlmap.match('/%s' % dbname, 'PUT'))
+ self.assertIsNone(self._urlmap.match('/%s' % dbname, 'DELETE'))
+ self.assertIsNone(self._urlmap.match('/%s' % dbname, 'POST'))
+
+ def test_user_db_docs_unauthorized(self):
+ dbname = self._dbname
+ self.assertIsNone(self._urlmap.match('/%s/docs' % dbname, 'GET'))
+ self.assertIsNone(self._urlmap.match('/%s/docs' % dbname, 'PUT'))
+ self.assertIsNone(self._urlmap.match('/%s/docs' % dbname, 'DELETE'))
+ self.assertIsNone(self._urlmap.match('/%s/docs' % dbname, 'POST'))
+
+ def test_user_db_doc_unauthorized(self):
+ dbname = self._dbname
+ self.assertIsNone(self._urlmap.match('/%s/doc/x' % dbname, 'GET'))
+ self.assertIsNone(self._urlmap.match('/%s/doc/x' % dbname, 'PUT'))
+ self.assertIsNone(self._urlmap.match('/%s/doc/x' % dbname, 'DELETE'))
+ self.assertIsNone(self._urlmap.match('/%s/doc/x' % dbname, 'POST'))
+
+ def test_user_db_sync_authorized(self):
+ uuid = self._uuid
+ dbname = self._dbname
+ match = self._urlmap.match('/%s/sync-from/x' % dbname, 'GET')
+ self.assertEqual(uuid, match.get('uuid'))
+ self.assertEqual('x', match.get('source_replica_uid'))
+
+ match = self._urlmap.match('/%s/sync-from/x' % dbname, 'PUT')
+ self.assertEqual(uuid, match.get('uuid'))
+ self.assertEqual('x', match.get('source_replica_uid'))
+
+ match = self._urlmap.match('/%s/sync-from/x' % dbname, 'POST')
+ self.assertEqual(uuid, match.get('uuid'))
+ self.assertEqual('x', match.get('source_replica_uid'))
+
+ def test_user_db_sync_unauthorized(self):
+ dbname = self._dbname
+ self.assertIsNone(
+ self._urlmap.match('/%s/sync-from/x' % dbname, 'DELETE'))
diff --git a/testing/tests/sync/test_sync_target.py b/testing/tests/sync/test_sync_target.py
index 6ce9a5c5..25136ba1 100644
--- a/testing/tests/sync/test_sync_target.py
+++ b/testing/tests/sync/test_sync_target.py
@@ -63,7 +63,8 @@ class TestSoledadParseReceivedDocResponse(unittest.TestCase):
"""
def parse(self, stream):
- parser = DocStreamReceiver(None, None, lambda *_: defer.succeed(42))
+ parser = DocStreamReceiver(None, defer.Deferred(),
+ lambda *_: defer.succeed(42))
parser.dataReceived(stream)
parser.finish()
@@ -838,7 +839,7 @@ class TestSoledadDbSync(
# already created on some setUp method.
import binascii
tohex = binascii.b2a_hex
- key = tohex(self._soledad.secrets.get_local_storage_key())
+ key = tohex(self._soledad.secrets.local_key)
dbpath = self._soledad._local_db_path
self.opts = SQLCipherOptions(