From cfff46ff9becdbe5cf48816870e625ed253ecc57 Mon Sep 17 00:00:00 2001 From: drebs Date: Sun, 17 Sep 2017 12:08:25 -0300 Subject: [refactor] move tests to root of repository Tests entrypoint was in a testing/ subfolder in the root of the repository. This was made mainly because we had some common files for tests and we didn't want to ship them (files in testing/test_soledad, which is itself a python package. This sometimes causes errors when loading tests (it seems setuptools is confused with having one python package in a subdirectory of another). This commit moves the tests entrypoint to the root of the repository. Closes: #8952 --- testing/tests/server/__init__.py | 0 testing/tests/server/test__resource.py | 85 ------ testing/tests/server/test__server_info.py | 43 ---- testing/tests/server/test_auth.py | 138 ---------- .../tests/server/test_blobs_resource_validation.py | 63 ----- testing/tests/server/test_blobs_server.py | 286 --------------------- testing/tests/server/test_config.py | 70 ----- .../tests/server/test_incoming_flow_integration.py | 97 ------- testing/tests/server/test_incoming_resource.py | 57 ---- testing/tests/server/test_incoming_server.py | 92 ------- testing/tests/server/test_server.py | 230 ----------------- testing/tests/server/test_session.py | 195 -------------- testing/tests/server/test_shared_db.py | 69 ----- testing/tests/server/test_tac.py | 87 ------- testing/tests/server/test_url_mapper.py | 133 ---------- 15 files changed, 1645 deletions(-) delete mode 100644 testing/tests/server/__init__.py delete mode 100644 testing/tests/server/test__resource.py delete mode 100644 testing/tests/server/test__server_info.py delete mode 100644 testing/tests/server/test_auth.py delete mode 100644 testing/tests/server/test_blobs_resource_validation.py delete mode 100644 testing/tests/server/test_blobs_server.py delete mode 100644 testing/tests/server/test_config.py delete mode 100644 testing/tests/server/test_incoming_flow_integration.py delete mode 100644 testing/tests/server/test_incoming_resource.py delete mode 100644 testing/tests/server/test_incoming_server.py delete mode 100644 testing/tests/server/test_server.py delete mode 100644 testing/tests/server/test_session.py delete mode 100644 testing/tests/server/test_shared_db.py delete mode 100644 testing/tests/server/test_tac.py delete mode 100644 testing/tests/server/test_url_mapper.py (limited to 'testing/tests/server') diff --git a/testing/tests/server/__init__.py b/testing/tests/server/__init__.py deleted file mode 100644 index e69de29b..00000000 diff --git a/testing/tests/server/test__resource.py b/testing/tests/server/test__resource.py deleted file mode 100644 index a43ac19f..00000000 --- a/testing/tests/server/test__resource.py +++ /dev/null @@ -1,85 +0,0 @@ -# -*- coding: utf-8 -*- -# test__resource.py -# Copyright (C) 2017 LEAP -# -# This program is free software: you can redistribute it and/or modify -# it under the terms of the GNU General Public License as published by -# the Free Software Foundation, either version 3 of the License, or -# (at your option) any later version. -# -# This program is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -# GNU General Public License for more details. -# -# You should have received a copy of the GNU General Public License -# along with this program. If not, see . -""" -Tests for Soledad server main resource. -""" -from twisted.trial import unittest -from twisted.web.test.test_web import DummyRequest -from twisted.web.wsgi import WSGIResource -from twisted.web.resource import getChildForRequest -from twisted.internet import reactor - -from leap.soledad.server._resource import PublicResource -from leap.soledad.server._resource import LocalResource -from leap.soledad.server._server_info import ServerInfo -from leap.soledad.server._blobs import BlobsResource -from leap.soledad.server._incoming import IncomingResource -from leap.soledad.server.gzip_middleware import GzipMiddleware - - -_pool = reactor.getThreadPool() - - -class PublicResourceTestCase(unittest.TestCase): - - def test_get_root(self): - blobs_resource = None # doesn't matter - resource = PublicResource( - blobs_resource=blobs_resource, sync_pool=_pool) - request = DummyRequest(['']) - child = getChildForRequest(resource, request) - self.assertIsInstance(child, ServerInfo) - - def test_get_blobs_enabled(self): - blobs_resource = BlobsResource("filesystem", '/tmp') - resource = PublicResource( - blobs_resource=blobs_resource, sync_pool=_pool) - request = DummyRequest(['blobs']) - child = getChildForRequest(resource, request) - self.assertIsInstance(child, BlobsResource) - - def test_get_blobs_disabled(self): - blobs_resource = None - resource = PublicResource( - blobs_resource=blobs_resource, sync_pool=_pool) - request = DummyRequest(['blobs']) - child = getChildForRequest(resource, request) - # if blobs is disabled, the request should be routed to sync - self.assertIsInstance(child, WSGIResource) - self.assertIsInstance(child._application, GzipMiddleware) - - def test_get_sync(self): - blobs_resource = None # doesn't matter - resource = PublicResource( - blobs_resource=blobs_resource, sync_pool=_pool) - request = DummyRequest(['user-db', 'sync-from', 'source-id']) - child = getChildForRequest(resource, request) - self.assertIsInstance(child, WSGIResource) - self.assertIsInstance(child._application, GzipMiddleware) - - def test_no_incoming_on_public_resource(self): - resource = PublicResource(None, sync_pool=_pool) - request = DummyRequest(['incoming']) - child = getChildForRequest(resource, request) - # WSGIResource is returned if a path is unknown - self.assertIsInstance(child, WSGIResource) - - def test_get_incoming(self): - resource = LocalResource() - request = DummyRequest(['incoming']) - child = getChildForRequest(resource, request) - self.assertIsInstance(child, IncomingResource) diff --git a/testing/tests/server/test__server_info.py b/testing/tests/server/test__server_info.py deleted file mode 100644 index 40567ef1..00000000 --- a/testing/tests/server/test__server_info.py +++ /dev/null @@ -1,43 +0,0 @@ -# -*- coding: utf-8 -*- -# test__server_info.py -# Copyright (C) 2017 LEAP -# -# This program is free software: you can redistribute it and/or modify -# it under the terms of the GNU General Public License as published by -# the Free Software Foundation, either version 3 of the License, or -# (at your option) any later version. -# -# This program is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -# GNU General Public License for more details. -# -# You should have received a copy of the GNU General Public License -# along with this program. If not, see . - -""" -Tests for Soledad server information announcement. -""" -import json - -from twisted.trial import unittest -from twisted.web.test.test_web import DummyRequest - -from leap.soledad.server._server_info import ServerInfo - - -class ServerInfoTestCase(unittest.TestCase): - - def test_blobs_enabled(self): - resource = ServerInfo(True) - response = resource.render(DummyRequest([''])) - _info = json.loads(response) - self.assertEquals(_info['blobs'], True) - self.assertTrue(isinstance(_info['version'], basestring)) - - def test_blobs_disabled(self): - resource = ServerInfo(False) - response = resource.render(DummyRequest([''])) - _info = json.loads(response) - self.assertEquals(_info['blobs'], False) - self.assertTrue(isinstance(_info['version'], basestring)) diff --git a/testing/tests/server/test_auth.py b/testing/tests/server/test_auth.py deleted file mode 100644 index 78cf20ab..00000000 --- a/testing/tests/server/test_auth.py +++ /dev/null @@ -1,138 +0,0 @@ -# -*- coding: utf-8 -*- -# test_auth.py -# Copyright (C) 2017 LEAP -# -# This program is free software: you can redistribute it and/or modify -# it under the terms of the GNU General Public License as published by -# the Free Software Foundation, either version 3 of the License, or -# (at your option) any later version. -# -# This program is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -# GNU General Public License for more details. -# -# You should have received a copy of the GNU General Public License -# along with this program. If not, see . -""" -Tests for auth pieces. -""" -import os -import collections -import pytest - -from contextlib import contextmanager - -from twisted.cred.credentials import UsernamePassword -from twisted.cred.error import UnauthorizedLogin -from twisted.internet import reactor -from twisted.internet.defer import inlineCallbacks -from twisted.trial import unittest -from twisted.web.resource import IResource -from twisted.web.test import test_httpauth - -import leap.soledad.server.auth as auth_module -from leap.soledad.server.auth import SoledadRealm -from leap.soledad.server.auth import CouchDBTokenChecker -from leap.soledad.server.auth import FileTokenChecker -from leap.soledad.server.auth import TokenCredentialFactory -from leap.soledad.server._resource import PublicResource - - -class SoledadRealmTestCase(unittest.TestCase): - - def test_returned_resource(self): - # we have to pass a pool to the realm , otherwise tests will hang - conf = {'blobs': False} - pool = reactor.getThreadPool() - realm = SoledadRealm(conf=conf, sync_pool=pool) - iface, avatar, logout = realm.requestAvatar('any', None, IResource) - self.assertIsInstance(avatar, PublicResource) - self.assertIsNone(logout()) - - -class DummyServer(object): - """ - I fake the `couchdb.client.Server` GET api and always return the token - given on my creation. - """ - - def __init__(self, token): - self._token = token - - def get(self, _): - return self._token - - -@contextmanager -def dummy_server(token): - yield collections.defaultdict(lambda: DummyServer(token)) - - -class CouchDBTokenCheckerTestCase(unittest.TestCase): - - @inlineCallbacks - def test_good_creds(self): - # set up a dummy server which always return a *valid* token document - token = {'user_id': 'user', 'type': 'Token'} - server = dummy_server(token) - # setup the checker with the custom server - checker = CouchDBTokenChecker() - auth_module.couch_server = lambda url: server - # assert the checker *can* verify the creds - creds = UsernamePassword('user', 'pass') - avatarId = yield checker.requestAvatarId(creds) - self.assertEqual('user', avatarId) - - @inlineCallbacks - def test_bad_creds(self): - # set up a dummy server which always return an *invalid* token document - token = None - server = dummy_server(token) - # setup the checker with the custom server - checker = CouchDBTokenChecker() - auth_module.couch_server = lambda url: server - # assert the checker *cannot* verify the creds - creds = UsernamePassword('user', '') - with self.assertRaises(UnauthorizedLogin): - yield checker.requestAvatarId(creds) - - -class FileTokenCheckerTestCase(unittest.TestCase): - - @inlineCallbacks - @pytest.mark.usefixtures("method_tmpdir") - def test_good_creds(self): - auth_file_path = os.path.join(self.tempdir, 'auth.file') - with open(auth_file_path, 'w') as tempfile: - tempfile.write('goodservice:goodtoken') - # setup the checker with the auth tokens file - conf = {'services_tokens_file': auth_file_path} - checker = FileTokenChecker(conf) - # assert the checker *can* verify the creds - creds = UsernamePassword('goodservice', 'goodtoken') - avatarId = yield checker.requestAvatarId(creds) - self.assertEqual('goodservice', avatarId) - - @inlineCallbacks - @pytest.mark.usefixtures("method_tmpdir") - def test_bad_creds(self): - auth_file_path = os.path.join(self.tempdir, 'auth.file') - with open(auth_file_path, 'w') as tempfile: - tempfile.write('service:token') - # setup the checker with the auth tokens file - conf = {'services_tokens_file': auth_file_path} - checker = FileTokenChecker(conf) - # assert the checker *cannot* verify the creds - creds = UsernamePassword('service', 'wrongtoken') - with self.assertRaises(UnauthorizedLogin): - yield checker.requestAvatarId(creds) - - -class TokenCredentialFactoryTestcase( - test_httpauth.RequestMixin, test_httpauth.BasicAuthTestsMixin, - unittest.TestCase): - - def setUp(self): - test_httpauth.BasicAuthTestsMixin.setUp(self) - self.credentialFactory = TokenCredentialFactory() diff --git a/testing/tests/server/test_blobs_resource_validation.py b/testing/tests/server/test_blobs_resource_validation.py deleted file mode 100644 index 9f6dfc2f..00000000 --- a/testing/tests/server/test_blobs_resource_validation.py +++ /dev/null @@ -1,63 +0,0 @@ -# -*- coding: utf-8 -*- -# test_blobs_resource_validation.py -# Copyright (C) 2017 LEAP -# -# This program is free software: you can redistribute it and/or modify -# it under the terms of the GNU General Public License as published by -# the Free Software Foundation, either version 3 of the License, or -# (at your option) any later version. -# -# This program is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -# GNU General Public License for more details. -# -# You should have received a copy of the GNU General Public License -# along with this program. If not, see . -""" -Tests for invalid user or blob_id on blobs resource -""" -import pytest -from twisted.trial import unittest -from twisted.web.test.test_web import DummyRequest -from leap.soledad.server import _blobs as server_blobs - - -class BlobServerTestCase(unittest.TestCase): - - @pytest.mark.usefixtures("method_tmpdir") - def setUp(self): - self.resource = server_blobs.BlobsResource("filesystem", self.tempdir) - - @pytest.mark.usefixtures("method_tmpdir") - def test_valid_arguments(self): - request = DummyRequest(['v4l1d-us3r', 'v4l1d-bl0b-1d']) - self.assertTrue(self.resource._validate(request)) - - @pytest.mark.usefixtures("method_tmpdir") - def test_invalid_user_get(self): - request = DummyRequest(['invalid user', 'valid-blob-id']) - request.path = '/blobs/' - with pytest.raises(Exception): - self.resource.render_GET(request) - - @pytest.mark.usefixtures("method_tmpdir") - def test_invalid_user_put(self): - request = DummyRequest(['invalid user', 'valid-blob-id']) - request.path = '/blobs/' - with pytest.raises(Exception): - self.resource.render_PUT(request) - - @pytest.mark.usefixtures("method_tmpdir") - def test_invalid_blob_id_get(self): - request = DummyRequest(['valid-user', 'invalid blob id']) - request.path = '/blobs/' - with pytest.raises(Exception): - self.resource.render_GET(request) - - @pytest.mark.usefixtures("method_tmpdir") - def test_invalid_blob_id_put(self): - request = DummyRequest(['valid-user', 'invalid blob id']) - request.path = '/blobs/' - with pytest.raises(Exception): - self.resource.render_PUT(request) diff --git a/testing/tests/server/test_blobs_server.py b/testing/tests/server/test_blobs_server.py deleted file mode 100644 index 9eddf108..00000000 --- a/testing/tests/server/test_blobs_server.py +++ /dev/null @@ -1,286 +0,0 @@ -# -*- coding: utf-8 -*- -# test_blobs_server.py -# Copyright (C) 2017 LEAP -# -# This program is free software: you can redistribute it and/or modify -# it under the terms of the GNU General Public License as published by -# the Free Software Foundation, either version 3 of the License, or -# (at your option) any later version. -# -# This program is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -# GNU General Public License for more details. -# -# You should have received a copy of the GNU General Public License -# along with this program. If not, see . -""" -Integration tests for blobs server -""" -import os -import pytest -from uuid import uuid4 -from io import BytesIO -from twisted.trial import unittest -from twisted.web.server import Site -from twisted.internet import reactor -from twisted.internet import defer -from treq._utils import set_global_pool - -from leap.soledad.common.blobs import Flags -from leap.soledad.server import _blobs as server_blobs -from leap.soledad.client._db.blobs import BlobManager -from leap.soledad.client._db.blobs import BlobAlreadyExistsError -from leap.soledad.client._db.blobs import InvalidFlagsError -from leap.soledad.client._db.blobs import SoledadError - - -class BlobServerTestCase(unittest.TestCase): - - def setUp(self): - root = server_blobs.BlobsResource("filesystem", self.tempdir) - site = Site(root) - self.port = reactor.listenTCP(0, site, interface='127.0.0.1') - self.host = self.port.getHost() - self.uri = 'http://%s:%s/' % (self.host.host, self.host.port) - self.secret = 'A' * 96 - set_global_pool(None) - - def tearDown(self): - self.port.stopListening() - - @defer.inlineCallbacks - @pytest.mark.usefixtures("method_tmpdir") - def test_upload_download(self): - manager = BlobManager('', self.uri, self.secret, - self.secret, uuid4().hex) - fd = BytesIO("save me") - yield manager._encrypt_and_upload('blob_id', fd) - blob, size = yield manager._download_and_decrypt('blob_id') - self.assertEquals(blob.getvalue(), "save me") - - @defer.inlineCallbacks - @pytest.mark.usefixtures("method_tmpdir") - def test_set_get_flags(self): - manager = BlobManager('', self.uri, self.secret, - self.secret, uuid4().hex) - fd = BytesIO("flag me") - yield manager._encrypt_and_upload('blob_id', fd) - yield manager.set_flags('blob_id', [Flags.PROCESSING]) - flags = yield manager.get_flags('blob_id') - self.assertEquals([Flags.PROCESSING], flags) - - @defer.inlineCallbacks - @pytest.mark.usefixtures("method_tmpdir") - def test_set_flags_raises_if_no_blob_found(self): - manager = BlobManager('', self.uri, self.secret, - self.secret, uuid4().hex) - with pytest.raises(SoledadError): - yield manager.set_flags('missing_id', [Flags.PENDING]) - - @defer.inlineCallbacks - @pytest.mark.usefixtures("method_tmpdir") - def test_list_filter_flag(self): - manager = BlobManager('', self.uri, self.secret, - self.secret, uuid4().hex) - fd = BytesIO("flag me") - yield manager._encrypt_and_upload('blob_id', fd) - yield manager.set_flags('blob_id', [Flags.PROCESSING]) - blobs_list = yield manager.remote_list(filter_flag=Flags.PENDING) - self.assertEquals([], blobs_list) - blobs_list = yield manager.remote_list(filter_flag=Flags.PROCESSING) - self.assertEquals(['blob_id'], blobs_list) - - @defer.inlineCallbacks - @pytest.mark.usefixtures("method_tmpdir") - def test_list_filter_flag_order_by_date(self): - manager = BlobManager('', self.uri, self.secret, - self.secret, uuid4().hex) - yield manager._encrypt_and_upload('blob_id1', BytesIO("x")) - yield manager._encrypt_and_upload('blob_id2', BytesIO("x")) - yield manager._encrypt_and_upload('blob_id3', BytesIO("x")) - yield manager.set_flags('blob_id1', [Flags.PROCESSING]) - yield manager.set_flags('blob_id2', [Flags.PROCESSING]) - yield manager.set_flags('blob_id3', [Flags.PROCESSING]) - blobs_list = yield manager.remote_list(filter_flag=Flags.PROCESSING, - order_by='+date') - expected_list = ['blob_id1', 'blob_id2', 'blob_id3'] - self.assertEquals(expected_list, blobs_list) - blobs_list = yield manager.remote_list(filter_flag=Flags.PROCESSING, - order_by='-date') - self.assertEquals(list(reversed(expected_list)), blobs_list) - - @defer.inlineCallbacks - @pytest.mark.usefixtures("method_tmpdir") - def test_cant_set_invalid_flags(self): - manager = BlobManager('', self.uri, self.secret, - self.secret, uuid4().hex) - fd = BytesIO("flag me") - yield manager._encrypt_and_upload('blob_id', fd) - with pytest.raises(InvalidFlagsError): - yield manager.set_flags('blob_id', ['invalid']) - flags = yield manager.get_flags('blob_id') - self.assertEquals([], flags) - - @defer.inlineCallbacks - @pytest.mark.usefixtures("method_tmpdir") - def test_get_empty_flags(self): - manager = BlobManager('', self.uri, self.secret, - self.secret, uuid4().hex) - fd = BytesIO("flag me") - yield manager._encrypt_and_upload('blob_id', fd) - flags = yield manager.get_flags('blob_id') - self.assertEquals([], flags) - - @defer.inlineCallbacks - @pytest.mark.usefixtures("method_tmpdir") - def test_flags_ignored_by_listing(self): - manager = BlobManager('', self.uri, self.secret, - self.secret, uuid4().hex) - fd = BytesIO("flag me") - yield manager._encrypt_and_upload('blob_id', fd) - yield manager.set_flags('blob_id', [Flags.PROCESSING]) - blobs_list = yield manager.remote_list() - self.assertEquals(['blob_id'], blobs_list) - - @defer.inlineCallbacks - @pytest.mark.usefixtures("method_tmpdir") - def test_upload_changes_remote_list(self): - manager = BlobManager('', self.uri, self.secret, - self.secret, uuid4().hex) - yield manager._encrypt_and_upload('blob_id1', BytesIO("1")) - yield manager._encrypt_and_upload('blob_id2', BytesIO("2")) - blobs_list = yield manager.remote_list() - self.assertEquals(set(['blob_id1', 'blob_id2']), set(blobs_list)) - - @defer.inlineCallbacks - @pytest.mark.usefixtures("method_tmpdir") - def test_list_orders_by_date(self): - user_uid = uuid4().hex - manager = BlobManager('', self.uri, self.secret, - self.secret, user_uid) - yield manager._encrypt_and_upload('blob_id1', BytesIO("1")) - yield manager._encrypt_and_upload('blob_id2', BytesIO("2")) - blobs_list = yield manager.remote_list(order_by='date') - self.assertEquals(['blob_id1', 'blob_id2'], blobs_list) - parts = [user_uid, 'default', 'b', 'blo', 'blob_i', 'blob_id1'] - self.__touch(self.tempdir, *parts) - blobs_list = yield manager.remote_list(order_by='+date') - self.assertEquals(['blob_id2', 'blob_id1'], blobs_list) - blobs_list = yield manager.remote_list(order_by='-date') - self.assertEquals(['blob_id1', 'blob_id2'], blobs_list) - - @defer.inlineCallbacks - @pytest.mark.usefixtures("method_tmpdir") - def test_count(self): - manager = BlobManager('', self.uri, self.secret, - self.secret, uuid4().hex) - deferreds = [] - for i in range(10): - deferreds.append(manager._encrypt_and_upload(str(i), BytesIO("1"))) - yield defer.gatherResults(deferreds) - - result = yield manager.count() - self.assertEquals({"count": len(deferreds)}, result) - - @defer.inlineCallbacks - @pytest.mark.usefixtures("method_tmpdir") - def test_list_restricted_by_namespace(self): - manager = BlobManager('', self.uri, self.secret, - self.secret, uuid4().hex) - namespace = 'incoming' - yield manager._encrypt_and_upload('blob_id1', BytesIO("1"), - namespace=namespace) - yield manager._encrypt_and_upload('blob_id2', BytesIO("2")) - blobs_list = yield manager.remote_list(namespace=namespace) - self.assertEquals(['blob_id1'], blobs_list) - - @defer.inlineCallbacks - @pytest.mark.usefixtures("method_tmpdir") - def test_list_default_doesnt_list_other_namespaces(self): - manager = BlobManager('', self.uri, self.secret, - self.secret, uuid4().hex) - namespace = 'incoming' - yield manager._encrypt_and_upload('blob_id1', BytesIO("1"), - namespace=namespace) - yield manager._encrypt_and_upload('blob_id2', BytesIO("2")) - blobs_list = yield manager.remote_list() - self.assertEquals(['blob_id2'], blobs_list) - - @defer.inlineCallbacks - @pytest.mark.usefixtures("method_tmpdir") - def test_download_from_namespace(self): - manager = BlobManager('', self.uri, self.secret, - self.secret, uuid4().hex) - namespace, blob_id, content = 'incoming', 'blob_id1', 'test' - yield manager._encrypt_and_upload(blob_id, BytesIO(content), - namespace=namespace) - got_blob = yield manager._download_and_decrypt(blob_id, namespace) - self.assertEquals(content, got_blob[0].getvalue()) - - def __touch(self, *args): - path = os.path.join(*args) - with open(path, 'a'): - os.utime(path, None) - - @defer.inlineCallbacks - @pytest.mark.usefixtures("method_tmpdir") - def test_upload_deny_duplicates(self): - manager = BlobManager('', self.uri, self.secret, - self.secret, uuid4().hex) - fd = BytesIO("save me") - yield manager._encrypt_and_upload('blob_id', fd) - fd = BytesIO("save me") - with pytest.raises(BlobAlreadyExistsError): - yield manager._encrypt_and_upload('blob_id', fd) - - @defer.inlineCallbacks - @pytest.mark.usefixtures("method_tmpdir") - def test_send_missing(self): - manager = BlobManager(self.tempdir, self.uri, self.secret, - self.secret, uuid4().hex) - self.addCleanup(manager.close) - blob_id = 'local_only_blob_id' - yield manager.local.put(blob_id, BytesIO("X"), size=1) - yield manager.send_missing() - result = yield manager._download_and_decrypt(blob_id) - self.assertIsNotNone(result) - self.assertEquals(result[0].getvalue(), "X") - - @defer.inlineCallbacks - @pytest.mark.usefixtures("method_tmpdir") - def test_fetch_missing(self): - manager = BlobManager(self.tempdir, self.uri, self.secret, - self.secret, uuid4().hex) - self.addCleanup(manager.close) - blob_id = 'remote_only_blob_id' - yield manager._encrypt_and_upload(blob_id, BytesIO("X")) - yield manager.fetch_missing() - result = yield manager.local.get(blob_id) - self.assertIsNotNone(result) - self.assertEquals(result.getvalue(), "X") - - @defer.inlineCallbacks - @pytest.mark.usefixtures("method_tmpdir") - def test_upload_then_delete_updates_list(self): - manager = BlobManager('', self.uri, self.secret, - self.secret, uuid4().hex) - yield manager._encrypt_and_upload('blob_id1', BytesIO("1")) - yield manager._encrypt_and_upload('blob_id2', BytesIO("2")) - yield manager._delete_from_remote('blob_id1') - blobs_list = yield manager.remote_list() - self.assertEquals(set(['blob_id2']), set(blobs_list)) - - @defer.inlineCallbacks - @pytest.mark.usefixtures("method_tmpdir") - def test_upload_then_delete_updates_list_using_namespace(self): - manager = BlobManager('', self.uri, self.secret, - self.secret, uuid4().hex) - namespace = 'special_archives' - yield manager._encrypt_and_upload('blob_id1', BytesIO("1"), - namespace=namespace) - yield manager._encrypt_and_upload('blob_id2', BytesIO("2"), - namespace=namespace) - yield manager._delete_from_remote('blob_id1', namespace=namespace) - blobs_list = yield manager.remote_list(namespace=namespace) - self.assertEquals(set(['blob_id2']), set(blobs_list)) diff --git a/testing/tests/server/test_config.py b/testing/tests/server/test_config.py deleted file mode 100644 index dfb09f4c..00000000 --- a/testing/tests/server/test_config.py +++ /dev/null @@ -1,70 +0,0 @@ -# -*- coding: utf-8 -*- -# test_config.py -# Copyright (C) 2017 LEAP -# -# This program is free software: you can redistribute it and/or modify -# it under the terms of the GNU General Public License as published by -# the Free Software Foundation, either version 3 of the License, or -# (at your option) any later version. -# -# This program is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -# GNU General Public License for more details. -# -# You should have received a copy of the GNU General Public License -# along with this program. If not, see . -""" -Tests for server configuration. -""" - -from twisted.trial import unittest -from pkg_resources import resource_filename - -from leap.soledad.server._config import _load_config -from leap.soledad.server._config import CONFIG_DEFAULTS - - -class ConfigurationParsingTest(unittest.TestCase): - - def setUp(self): - self.maxDiff = None - - def test_use_defaults_on_failure(self): - config = _load_config('this file will never exist') - expected = CONFIG_DEFAULTS - self.assertEquals(expected, config) - - def test_security_values_configuration(self): - # given - config_path = resource_filename('test_soledad', - 'fixture_soledad.conf') - # when - config = _load_config(config_path) - - # then - expected = {'members': ['user1', 'user2'], - 'members_roles': ['role1', 'role2'], - 'admins': ['user3', 'user4'], - 'admins_roles': ['role3', 'role3']} - self.assertDictEqual(expected, config['database-security']) - - def test_server_values_configuration(self): - # given - config_path = resource_filename('test_soledad', - 'fixture_soledad.conf') - # when - config = _load_config(config_path) - - # then - expected = {'couch_url': - 'http://soledad:passwd@localhost:5984', - 'create_cmd': - 'sudo -u soledad-admin /usr/bin/soledad-create-userdb', - 'admin_netrc': - '/etc/couchdb/couchdb-soledad-admin.netrc', - 'batching': False, - 'blobs': False, - 'services_tokens_file': '/etc/soledad/services.tokens', - 'blobs_path': '/var/lib/soledad/blobs'} - self.assertDictEqual(expected, config['soledad-server']) diff --git a/testing/tests/server/test_incoming_flow_integration.py b/testing/tests/server/test_incoming_flow_integration.py deleted file mode 100644 index b492534f..00000000 --- a/testing/tests/server/test_incoming_flow_integration.py +++ /dev/null @@ -1,97 +0,0 @@ -# -*- coding: utf-8 -*- -# test_incoming_flow_integration.py -# Copyright (C) 2017 LEAP -# -# This program is free software: you can redistribute it and/or modify -# it under the terms of the GNU General Public License as published by -# the Free Software Foundation, either version 3 of the License, or -# (at your option) any later version. -# -# This program is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -# GNU General Public License for more details. -# -# You should have received a copy of the GNU General Public License -# along with this program. If not, see . -""" -Integration tests for the complete flow of IncomingBox feature -""" -import pytest -from uuid import uuid4 -from twisted.trial import unittest -from twisted.web.server import Site -from twisted.internet import reactor -from twisted.internet import defer -from twisted.web.resource import Resource -from zope.interface import implementer - -from leap.soledad.client.incoming import IncomingBoxProcessingLoop -from leap.soledad.client.incoming import IncomingBox -from leap.soledad.server import _blobs as server_blobs -from leap.soledad.client._db.blobs import BlobManager -from leap.soledad.server._incoming import IncomingResource -from leap.soledad.server._blobs import BlobsServerState -from leap.soledad.client import interfaces - - -@implementer(interfaces.IIncomingBoxConsumer) -class GoodConsumer(object): - def __init__(self): - self.name = 'GoodConsumer' - self.processed, self.saved = [], [] - - def process(self, item, item_id, encrypted=True): - self.processed.append(item_id) - return defer.succeed([item_id]) - - def save(self, parts, item_id): - self.saved.append(item_id) - return defer.succeed(None) - - -class IncomingFlowIntegrationTestCase(unittest.TestCase): - - def setUp(self): - root = Resource() - state = BlobsServerState('filesystem', blobs_path=self.tempdir) - incoming_resource = IncomingResource(state) - blobs_resource = server_blobs.BlobsResource("filesystem", self.tempdir) - root.putChild('blobs', blobs_resource) - root.putChild('incoming', incoming_resource) - site = Site(root) - self.port = reactor.listenTCP(0, site, interface='127.0.0.1') - self.host = self.port.getHost() - self.uri = 'http://%s:%s/' % (self.host.host, self.host.port) - self.blobs_uri = self.uri + 'blobs/' - self.incoming_uri = self.uri + 'incoming' - self.user_id = 'user-' + uuid4().hex - self.secret = 'A' * 96 - self.blob_manager = BlobManager(self.tempdir, self.blobs_uri, - self.secret, self.secret, - self.user_id) - self.box = IncomingBox(self.blob_manager, 'MX') - self.loop = IncomingBoxProcessingLoop(self.box) - # FIXME: We use blob_manager client only to avoid DelayedCalls - # Somehow treq being used here keeps a connection pool open - self.client = self.blob_manager._client - - def fill(self, messages): - deferreds = [] - for message_id, message in messages: - uri = '%s/%s/%s' % (self.incoming_uri, self.user_id, message_id) - deferreds.append(self.blob_manager._client.put(uri, data=message)) - return defer.gatherResults(deferreds) - - def tearDown(self): - self.port.stopListening() - self.blob_manager.close() - - @defer.inlineCallbacks - @pytest.mark.usefixtures("method_tmpdir") - def test_consume_a_incoming_message(self): - yield self.fill([('msg1', 'blob')]) - consumer = GoodConsumer() - self.loop.add_consumer(consumer) - yield self.loop() - self.assertIn('msg1', consumer.processed) diff --git a/testing/tests/server/test_incoming_resource.py b/testing/tests/server/test_incoming_resource.py deleted file mode 100644 index 0d4918b9..00000000 --- a/testing/tests/server/test_incoming_resource.py +++ /dev/null @@ -1,57 +0,0 @@ -# -*- coding: utf-8 -*- -# test_incoming_resource.py -# Copyright (C) 2017 LEAP -# -# This program is free software: you can redistribute it and/or modify -# it under the terms of the GNU General Public License as published by -# the Free Software Foundation, either version 3 of the License, or -# (at your option) any later version. -# -# This program is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -# GNU General Public License for more details. -# -# You should have received a copy of the GNU General Public License -# along with this program. If not, see . -""" -Unit tests for incoming API resource -""" -from twisted.trial import unittest -from twisted.web.test.test_web import DummyRequest -from leap.soledad.server._incoming import IncomingResource -from leap.soledad.server._incoming import IncomingFormatter -from leap.soledad.common.crypto import EncryptionSchemes -from io import BytesIO -from uuid import uuid4 -from mock import Mock - - -class IncomingResourceTestCase(unittest.TestCase): - - def setUp(self): - self.couchdb = Mock() - self.backend_factory = Mock() - self.backend_factory.open_database.return_value = self.couchdb - self.resource = IncomingResource(self.backend_factory) - self.user_uuid = uuid4().hex - - def test_save_document(self): - formatter = IncomingFormatter() - doc_id, scheme = uuid4().hex, EncryptionSchemes.PUBKEY - content = 'Incoming content' - request = DummyRequest([self.user_uuid, doc_id]) - request.content = BytesIO(content) - self.resource.render_PUT(request) - - open_database = self.backend_factory.open_database - open_database.assert_called_once_with(self.user_uuid) - self.couchdb.put_doc.assert_called_once() - doc = self.couchdb.put_doc.call_args[0][0] - self.assertEquals(doc_id, doc.doc_id) - self.assertEquals(formatter.format(content, scheme), doc.content) - - def test_formatter(self): - formatter = IncomingFormatter() - formatted = formatter.format('content', EncryptionSchemes.PUBKEY) - self.assertEquals(formatted['_enc_scheme'], EncryptionSchemes.PUBKEY) diff --git a/testing/tests/server/test_incoming_server.py b/testing/tests/server/test_incoming_server.py deleted file mode 100644 index 241bc581..00000000 --- a/testing/tests/server/test_incoming_server.py +++ /dev/null @@ -1,92 +0,0 @@ -# -*- coding: utf-8 -*- -# test_incoming_server.py -# Copyright (C) 2017 LEAP -# -# This program is free software: you can redistribute it and/or modify -# it under the terms of the GNU General Public License as published by -# the Free Software Foundation, either version 3 of the License, or -# (at your option) any later version. -# -# This program is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -# GNU General Public License for more details. -# -# You should have received a copy of the GNU General Public License -# along with this program. If not, see . -""" -Integration tests for incoming API -""" -import pytest -import json -from io import BytesIO -from uuid import uuid4 -from twisted.web.test.test_web import DummyRequest -from twisted.web.server import Site -from twisted.internet import reactor -from twisted.internet import defer -import treq - -from leap.soledad.server._incoming import IncomingResource -from leap.soledad.server._blobs import BlobsServerState -from leap.soledad.server._incoming import IncomingFormatter -from leap.soledad.common.crypto import EncryptionSchemes -from leap.soledad.common.blobs import Flags -from test_soledad.util import CouchServerStateForTests -from test_soledad.util import CouchDBTestCase - - -class IncomingOnCouchServerTestCase(CouchDBTestCase): - - def setUp(self): - self.port = None - - def tearDown(self): - if self.port: - self.port.stopListening() - - def prepare(self, backend): - self.user_id = 'user-' + uuid4().hex - if backend == 'couch': - self.state = CouchServerStateForTests(self.couch_url) - self.state.ensure_database(self.user_id) - else: - self.state = BlobsServerState(backend) - root = IncomingResource(self.state) - site = Site(root) - self.port = reactor.listenTCP(0, site, interface='127.0.0.1') - self.host = self.port.getHost() - self.uri = 'http://%s:%s/' % (self.host.host, self.host.port) - - @defer.inlineCallbacks - @pytest.mark.usefixtures("method_tmpdir") - def test_put_incoming_creates_a_document_using_couch(self): - self.prepare('couch') - user_id, doc_id = self.user_id, uuid4().hex - content, scheme = 'Hi', EncryptionSchemes.PUBKEY - formatter = IncomingFormatter() - incoming_endpoint = self.uri + '%s/%s' % (user_id, doc_id) - yield treq.put(incoming_endpoint, BytesIO(content), persistent=False) - db = self.state.open_database(user_id) - - doc = db.get_doc(doc_id) - self.assertEquals(doc.content, formatter.format(content, scheme)) - - @defer.inlineCallbacks - @pytest.mark.usefixtures("method_tmpdir") - def test_put_incoming_creates_a_blob_using_filesystem(self): - self.prepare('filesystem') - user_id, doc_id = self.user_id, uuid4().hex - content = 'Hi' - formatter = IncomingFormatter() - incoming_endpoint = self.uri + '%s/%s' % (user_id, doc_id) - yield treq.put(incoming_endpoint, BytesIO(content), persistent=False) - - db = self.state.open_database(user_id) - request = DummyRequest([user_id, doc_id]) - yield db.read_blob(user_id, doc_id, request, 'MX') - flags = db.get_flags(user_id, doc_id, request, 'MX') - flags = json.loads(flags) - expected = formatter.preamble(content, doc_id) + ' ' + content - self.assertEquals(expected, request.written[0]) - self.assertIn(Flags.PENDING, flags) diff --git a/testing/tests/server/test_server.py b/testing/tests/server/test_server.py deleted file mode 100644 index 25f0cc2d..00000000 --- a/testing/tests/server/test_server.py +++ /dev/null @@ -1,230 +0,0 @@ -# -*- coding: utf-8 -*- -# test_server.py -# Copyright (C) 2013 LEAP -# -# This program is free software: you can redistribute it and/or modify -# it under the terms of the GNU General Public License as published by -# the Free Software Foundation, either version 3 of the License, or -# (at your option) any later version. -# -# This program is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -# GNU General Public License for more details. -# -# You should have received a copy of the GNU General Public License -# along with this program. If not, see . -""" -Tests for server-related functionality. -""" -import binascii -import os -import pytest - -from six.moves.urllib.parse import urljoin -from uuid import uuid4 - -from twisted.internet import defer - -from leap.soledad.common.couch.state import CouchServerState -from leap.soledad.common.couch import CouchDatabase -from test_soledad.u1db_tests import TestCaseWithServer -from test_soledad.util import CouchDBTestCase -from test_soledad.util import ( - make_token_soledad_app, - make_soledad_document_for_test, - soledad_sync_target, -) - -from leap.soledad.client import _crypto -from leap.soledad.client import Soledad - - -@pytest.mark.needs_couch -@pytest.mark.usefixtures("method_tmpdir") -class EncryptedSyncTestCase( - CouchDBTestCase, TestCaseWithServer): - - """ - Tests for encrypted sync using Soledad server backed by a couch database. - """ - - # increase twisted.trial's timeout because large files syncing might take - # some time to finish. - timeout = 500 - - @staticmethod - def make_app_with_state(state): - return make_token_soledad_app(state) - - make_document_for_test = make_soledad_document_for_test - - sync_target = soledad_sync_target - - def _soledad_instance(self, user=None, passphrase=u'123', - prefix='', - secrets_path='secrets.json', - local_db_path='soledad.u1db', - server_url='', - cert_file=None, auth_token=None): - """ - Instantiate Soledad. - """ - - # this callback ensures we save a document which is sent to the shared - # db. - def _put_doc_side_effect(doc): - self._doc_put = doc - - if not server_url: - # attempt to find the soledad server url - server_address = None - server = getattr(self, 'server', None) - if server: - server_address = getattr(self.server, 'server_address', None) - else: - host = self.port.getHost() - server_address = (host.host, host.port) - if server_address: - server_url = 'http://%s:%d' % (server_address) - - return Soledad( - user, - passphrase, - secrets_path=os.path.join(self.tempdir, prefix, secrets_path), - local_db_path=os.path.join( - self.tempdir, prefix, local_db_path), - server_url=server_url, - cert_file=cert_file, - auth_token=auth_token, - shared_db=self.get_default_shared_mock(_put_doc_side_effect)) - - def make_app(self): - self.request_state = CouchServerState(self.couch_url) - return self.make_app_with_state(self.request_state) - - def setUp(self): - CouchDBTestCase.setUp(self) - TestCaseWithServer.setUp(self) - - def tearDown(self): - CouchDBTestCase.tearDown(self) - TestCaseWithServer.tearDown(self) - - def _test_encrypted_sym_sync(self, passphrase=u'123', doc_size=2, - number_of_docs=1): - """ - Test the complete syncing chain between two soledad dbs using a - Soledad server backed by a couch database. - """ - self.startTwistedServer() - user = 'user-' + uuid4().hex - - # this will store all docs ids to avoid get_all_docs - created_ids = [] - - # instantiate soledad and create a document - sol1 = self._soledad_instance( - user=user, - # token is verified in test_target.make_token_soledad_app - auth_token='auth-token', - passphrase=passphrase) - - # instantiate another soledad using the same secret as the previous - # one (so we can correctly verify the mac of the synced document) - sol2 = self._soledad_instance( - user=user, - prefix='x', - auth_token='auth-token', - secrets_path=sol1.secrets_path, - passphrase=passphrase) - - # ensure remote db exists before syncing - db = CouchDatabase.open_database( - urljoin(self.couch_url, 'user-' + user), - create=True) - - def _db1AssertEmptyDocList(results): - _, doclist = results - self.assertEqual([], doclist) - - def _db1CreateDocs(results): - deferreds = [] - for i in xrange(number_of_docs): - content = binascii.hexlify(os.urandom(doc_size / 2)) - d = sol1.create_doc({'data': content}) - d.addCallback(created_ids.append) - deferreds.append(d) - return defer.DeferredList(deferreds) - - def _db1AssertDocsSyncedToServer(results): - self.assertEqual(number_of_docs, len(created_ids)) - for soldoc in created_ids: - couchdoc = db.get_doc(soldoc.doc_id) - self.assertTrue(couchdoc) - # assert document structure in couch server - self.assertEqual(soldoc.doc_id, couchdoc.doc_id) - self.assertEqual(soldoc.rev, couchdoc.rev) - couch_content = couchdoc.content.keys() - self.assertEqual(['raw'], couch_content) - content = couchdoc.get_json() - self.assertTrue(_crypto.is_symmetrically_encrypted(content)) - - d = sol1.get_all_docs() - d.addCallback(_db1AssertEmptyDocList) - d.addCallback(_db1CreateDocs) - d.addCallback(lambda _: sol1.sync()) - d.addCallback(_db1AssertDocsSyncedToServer) - - def _db2AssertEmptyDocList(results): - _, doclist = results - self.assertEqual([], doclist) - - def _getAllDocsFromBothDbs(results): - d1 = sol1.get_all_docs() - d2 = sol2.get_all_docs() - return defer.DeferredList([d1, d2]) - - d.addCallback(lambda _: sol2.get_all_docs()) - d.addCallback(_db2AssertEmptyDocList) - d.addCallback(lambda _: sol2.sync()) - d.addCallback(_getAllDocsFromBothDbs) - - def _assertDocSyncedFromDb1ToDb2(results): - r1, r2 = results - _, (gen1, doclist1) = r1 - _, (gen2, doclist2) = r2 - self.assertEqual(number_of_docs, gen1) - self.assertEqual(number_of_docs, gen2) - self.assertEqual(number_of_docs, len(doclist1)) - self.assertEqual(number_of_docs, len(doclist2)) - self.assertEqual(doclist1[0], doclist2[0]) - - d.addCallback(_assertDocSyncedFromDb1ToDb2) - - def _cleanUp(results): - db.delete_database() - db.close() - sol1.close() - sol2.close() - - d.addCallback(_cleanUp) - - return d - - def test_encrypted_sym_sync(self): - return self._test_encrypted_sym_sync() - - def test_encrypted_sym_sync_with_unicode_passphrase(self): - """ - Test the complete syncing chain between two soledad dbs using a - Soledad server backed by a couch database, using an unicode - passphrase. - """ - return self._test_encrypted_sym_sync(passphrase=u'ãáàäéàëíìïóòöõúùüñç') - - def test_sync_many_small_files(self): - """ - Test if Soledad can sync many smallfiles. - """ - return self._test_encrypted_sym_sync(doc_size=2, number_of_docs=100) diff --git a/testing/tests/server/test_session.py b/testing/tests/server/test_session.py deleted file mode 100644 index 3dbd2740..00000000 --- a/testing/tests/server/test_session.py +++ /dev/null @@ -1,195 +0,0 @@ -# -*- coding: utf-8 -*- -# test_session.py -# Copyright (C) 2017 LEAP -# -# This program is free software: you can redistribute it and/or modify -# it under the terms of the GNU General Public License as published by -# the Free Software Foundation, either version 3 of the License, or -# (at your option) any later version. -# -# This program is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -# GNU General Public License for more details. -# -# You should have received a copy of the GNU General Public License -# along with this program. If not, see . -""" -Tests for server session entrypoint. -""" -from twisted.trial import unittest - -from twisted.cred import portal -from twisted.cred.checkers import InMemoryUsernamePasswordDatabaseDontUse -from twisted.cred.credentials import IUsernamePassword -from twisted.web.resource import getChildForRequest -from twisted.web.static import Data -from twisted.web.test.requesthelper import DummyRequest -from twisted.web.test.test_httpauth import b64encode -from twisted.web.test.test_httpauth import Realm -from twisted.web._auth.wrapper import UnauthorizedResource - -from leap.soledad.server.session import SoledadSession - - -class SoledadSessionTestCase(unittest.TestCase): - """ - Tests adapted from for - L{twisted.web.test.test_httpauth.HTTPAuthSessionWrapper}. - """ - - def makeRequest(self, *args, **kwargs): - request = DummyRequest(*args, **kwargs) - request.path = '/' - return request - - def setUp(self): - self.username = b'foo bar' - self.password = b'bar baz' - self.avatarContent = b"contents of the avatar resource itself" - self.childName = b"foo-child" - self.childContent = b"contents of the foo child of the avatar" - self.checker = InMemoryUsernamePasswordDatabaseDontUse() - self.checker.addUser(self.username, self.password) - self.avatar = Data(self.avatarContent, 'text/plain') - self.avatar.putChild( - self.childName, Data(self.childContent, 'text/plain')) - self.avatars = {self.username: self.avatar} - self.realm = Realm(self.avatars.get) - self.portal = portal.Portal(self.realm, [self.checker]) - self.wrapper = SoledadSession(self.portal) - - def _authorizedTokenLogin(self, request): - authorization = b64encode( - self.username + b':' + self.password) - request.requestHeaders.addRawHeader(b'authorization', - b'Token ' + authorization) - return getChildForRequest(self.wrapper, request) - - def test_getChildWithDefault(self): - request = self.makeRequest([self.childName]) - child = getChildForRequest(self.wrapper, request) - d = request.notifyFinish() - - def cbFinished(result): - self.assertEqual(request.responseCode, 401) - - d.addCallback(cbFinished) - request.render(child) - return d - - def _invalidAuthorizationTest(self, response): - request = self.makeRequest([self.childName]) - request.requestHeaders.addRawHeader(b'authorization', response) - child = getChildForRequest(self.wrapper, request) - d = request.notifyFinish() - - def cbFinished(result): - self.assertEqual(request.responseCode, 401) - - d.addCallback(cbFinished) - request.render(child) - return d - - def test_getChildWithDefaultUnauthorizedUser(self): - return self._invalidAuthorizationTest( - b'Basic ' + b64encode(b'foo:bar')) - - def test_getChildWithDefaultUnauthorizedPassword(self): - return self._invalidAuthorizationTest( - b'Basic ' + b64encode(self.username + b':bar')) - - def test_getChildWithDefaultUnrecognizedScheme(self): - return self._invalidAuthorizationTest(b'Quux foo bar baz') - - def test_getChildWithDefaultAuthorized(self): - request = self.makeRequest([self.childName]) - child = self._authorizedTokenLogin(request) - d = request.notifyFinish() - - def cbFinished(ignored): - self.assertEqual(request.written, [self.childContent]) - - d.addCallback(cbFinished) - request.render(child) - return d - - def test_renderAuthorized(self): - # Request it exactly, not any of its children. - request = self.makeRequest([]) - child = self._authorizedTokenLogin(request) - d = request.notifyFinish() - - def cbFinished(ignored): - self.assertEqual(request.written, [self.avatarContent]) - - d.addCallback(cbFinished) - request.render(child) - return d - - def test_decodeRaises(self): - request = self.makeRequest([self.childName]) - request.requestHeaders.addRawHeader(b'authorization', - b'Token decode should fail') - child = getChildForRequest(self.wrapper, request) - self.assertIsInstance(child, UnauthorizedResource) - - def test_parseResponse(self): - basicAuthorization = b'Basic abcdef123456' - self.assertEqual( - self.wrapper._parseHeader(basicAuthorization), - None) - tokenAuthorization = b'Token abcdef123456' - self.assertEqual( - self.wrapper._parseHeader(tokenAuthorization), - b'abcdef123456') - - def test_unexpectedDecodeError(self): - - class UnexpectedException(Exception): - pass - - class BadFactory(object): - scheme = b'bad' - - def getChallenge(self, client): - return {} - - def decode(self, response, request): - print("decode raised") - raise UnexpectedException() - - self.wrapper._credentialFactory = BadFactory() - request = self.makeRequest([self.childName]) - request.requestHeaders.addRawHeader(b'authorization', b'Bad abc') - child = getChildForRequest(self.wrapper, request) - request.render(child) - self.assertEqual(request.responseCode, 500) - errors = self.flushLoggedErrors(UnexpectedException) - self.assertEqual(len(errors), 1) - - def test_unexpectedLoginError(self): - class UnexpectedException(Exception): - pass - - class BrokenChecker(object): - credentialInterfaces = (IUsernamePassword,) - - def requestAvatarId(self, credentials): - raise UnexpectedException() - - self.portal.registerChecker(BrokenChecker()) - request = self.makeRequest([self.childName]) - child = self._authorizedTokenLogin(request) - request.render(child) - self.assertEqual(request.responseCode, 500) - self.assertEqual(len(self.flushLoggedErrors(UnexpectedException)), 1) - - def test_cantAccessOtherUserPathByDefault(self): - request = self.makeRequest([]) - # valid url_mapper path, but for another user - request.path = '/blobs/another-user/' - child = self._authorizedTokenLogin(request) - - request.render(child) - self.assertEqual(request.responseCode, 500) diff --git a/testing/tests/server/test_shared_db.py b/testing/tests/server/test_shared_db.py deleted file mode 100644 index 96af6dff..00000000 --- a/testing/tests/server/test_shared_db.py +++ /dev/null @@ -1,69 +0,0 @@ -# -*- coding: utf-8 -*- -# test_shared_db.py -# Copyright (C) 2017 LEAP -# -# This program is free software: you can redistribute it and/or modify -# it under the terms of the GNU General Public License as published by -# the Free Software Foundation, either version 3 of the License, or -# (at your option) any later version. -# -# This program is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -# GNU General Public License for more details. -# -# You should have received a copy of the GNU General Public License -# along with this program. If not, see . -""" -Tests for the shared db on server side. -""" - - -import pytest - -from twisted.trial import unittest - -from leap.soledad.client.shared_db import SoledadSharedDatabase -from leap.soledad.common.document import SoledadDocument -from leap.soledad.common.l2db.errors import RevisionConflict - - -@pytest.mark.needs_couch -class SharedDbTests(unittest.TestCase): - """ - """ - - URL = 'http://127.0.0.1:2424/shared' - CREDS = {'token': {'uuid': 'an-uuid', 'token': 'an-auth-token'}} - - @pytest.fixture(autouse=True) - def soledad_client(self, soledad_server, soledad_dbs): - soledad_dbs('an-uuid') - self._db = SoledadSharedDatabase.open_database(self.URL, self.CREDS) - - @pytest.mark.thisone - def test_doc_update_succeeds(self): - doc_id = 'some-random-doc' - self.assertIsNone(self._db.get_doc(doc_id)) - # create a document in shared db - doc = SoledadDocument(doc_id=doc_id) - self._db.put_doc(doc) - # update that document - expected = {'new': 'content'} - doc.content = expected - self._db.put_doc(doc) - # ensure expected content was saved - doc = self._db.get_doc(doc_id) - self.assertEqual(expected, doc.content) - - @pytest.mark.thisone - def test_doc_update_fails_with_wrong_rev(self): - # create a document in shared db - doc_id = 'some-random-doc' - self.assertIsNone(self._db.get_doc(doc_id)) - # create a document in shared db - doc = SoledadDocument(doc_id=doc_id) - self._db.put_doc(doc) - # try to update document without including revision of old version - doc.rev = 'wrong-rev' - self.assertRaises(RevisionConflict, self._db.put_doc, doc) diff --git a/testing/tests/server/test_tac.py b/testing/tests/server/test_tac.py deleted file mode 100644 index 7bb50e35..00000000 --- a/testing/tests/server/test_tac.py +++ /dev/null @@ -1,87 +0,0 @@ -# -*- coding: utf-8 -*- -# test_tac.py -# Copyright (C) 2017 LEAP -# -# This program is free software: you can redistribute it and/or modify -# it under the terms of the GNU General Public License as published by -# the Free Software Foundation, either version 3 of the License, or -# (at your option) any later version. -# -# This program is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -# GNU General Public License for more details. -# -# You should have received a copy of the GNU General Public License -# along with this program. If not, see . -""" -Tests for the localhost/public APIs using .tac file. -See docs/auth.rst -""" - - -import os -import signal -import socket -import pytest -import treq - -from pkg_resources import resource_filename -from twisted.trial import unittest -from twisted.internet import defer, reactor -from twisted.internet.protocol import ProcessProtocol -from twisted.web.client import Agent - - -TAC_FILE_PATH = resource_filename('leap.soledad.server', 'server.tac') - - -class TacServerTestCase(unittest.TestCase): - - def test_tac_file_exists(self): - msg = "server.tac used on this test case was expected to be at %s" - self.assertTrue(os.path.isfile(TAC_FILE_PATH), msg % TAC_FILE_PATH) - - @defer.inlineCallbacks - def test_local_public_default_ports_on_server_tac(self): - yield self._spawnServer() - result = yield self._get('http://localhost:2525/incoming') - fail_msg = "Localhost endpoint must require authentication!" - self.assertEquals(401, result.code, fail_msg) - - public_endpoint_url = 'http://%s:2424/' % self._get_public_ip() - result = yield self._get(public_endpoint_url) - self.assertEquals(200, result.code, "server info not accessible") - - result = yield self._get(public_endpoint_url + 'other') - self.assertEquals(401, result.code, "public server lacks auth!") - - public_using_local_port_url = 'http://%s:2525/' % self._get_public_ip() - with pytest.raises(Exception): - yield self._get(public_using_local_port_url) - - def _spawnServer(self): - protocol = ProcessProtocol() - env = os.environ.get('VIRTUAL_ENV', '/usr') - executable = os.path.join(env, 'bin', 'twistd') - no_pid_argument = '--pidfile=' - args = [executable, no_pid_argument, '-noy', TAC_FILE_PATH] - env = {'DEBUG_SERVER': 'yes'} - t = reactor.spawnProcess(protocol, executable, args, env=env) - self.addCleanup(os.kill, t.pid, signal.SIGKILL) - self.addCleanup(t.loseConnection) - return self._sleep(1) # it takes a while to start server - - def _sleep(self, time): - d = defer.Deferred() - reactor.callLater(time, d.callback, True) - return d - - def _get(self, *args, **kwargs): - kwargs['agent'] = Agent(reactor) - return treq.get(*args, **kwargs) - - def _get_public_ip(self): - s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) - s.connect(("8.8.8.8", 80)) - return s.getsockname()[0] diff --git a/testing/tests/server/test_url_mapper.py b/testing/tests/server/test_url_mapper.py deleted file mode 100644 index a04e7593..00000000 --- a/testing/tests/server/test_url_mapper.py +++ /dev/null @@ -1,133 +0,0 @@ -# -*- coding: utf-8 -*- -# test_url_mapper.py -# Copyright (C) 2017 LEAP -# -# This program is free software: you can redistribute it and/or modify -# it under the terms of the GNU General Public License as published by -# the Free Software Foundation, either version 3 of the License, or -# (at your option) any later version. -# -# This program is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -# GNU General Public License for more details. -# -# You should have received a copy of the GNU General Public License -# along with this program. If not, see . -""" -Tests for server-related functionality. -""" -import pytest - -from twisted.trial import unittest -from uuid import uuid4 - -from leap.soledad.server.url_mapper import URLMapper - - -class URLMapperTestCase(unittest.TestCase): - """ - Test if the URLMapper behaves as expected. - - The following table lists the authorized actions among all possible - u1db remote actions: - - URL path | Authorized actions - -------------------------------------------------- - / | GET - /shared-db | GET - /shared-db/docs | - - /shared-db/doc/{id} | - - /shared-db/sync-from/{source} | - - /user-db | - - /user-db/docs | - - /user-db/doc/{id} | - - /user-db/sync-from/{source} | GET, PUT, POST - """ - - def setUp(self): - self._uuid = uuid4().hex - self._urlmap = URLMapper() - self._dbname = 'user-%s' % self._uuid - - @pytest.mark.needs_couch - def test_root_authorized(self): - match = self._urlmap.match('/', 'GET') - self.assertIsNotNone(match) - - def test_shared_authorized(self): - self.assertIsNotNone(self._urlmap.match('/shared', 'GET')) - - def test_shared_unauthorized(self): - self.assertIsNone(self._urlmap.match('/shared', 'PUT')) - self.assertIsNone(self._urlmap.match('/shared', 'DELETE')) - self.assertIsNone(self._urlmap.match('/shared', 'POST')) - - def test_shared_docs_unauthorized(self): - self.assertIsNone(self._urlmap.match('/shared/docs', 'GET')) - self.assertIsNone(self._urlmap.match('/shared/docs', 'PUT')) - self.assertIsNone(self._urlmap.match('/shared/docs', 'DELETE')) - self.assertIsNone(self._urlmap.match('/shared/docs', 'POST')) - - def test_shared_doc_authorized(self): - match = self._urlmap.match('/shared/doc/x', 'GET') - self.assertIsNotNone(match) - self.assertEqual('x', match.get('id')) - - match = self._urlmap.match('/shared/doc/x', 'PUT') - self.assertIsNotNone(match) - self.assertEqual('x', match.get('id')) - - match = self._urlmap.match('/shared/doc/x', 'DELETE') - self.assertIsNotNone(match) - self.assertEqual('x', match.get('id')) - - def test_shared_doc_unauthorized(self): - self.assertIsNone(self._urlmap.match('/shared/doc/x', 'POST')) - - def test_shared_sync_unauthorized(self): - self.assertIsNone(self._urlmap.match('/shared/sync-from/x', 'GET')) - self.assertIsNone(self._urlmap.match('/shared/sync-from/x', 'PUT')) - self.assertIsNone(self._urlmap.match('/shared/sync-from/x', 'DELETE')) - self.assertIsNone(self._urlmap.match('/shared/sync-from/x', 'POST')) - - def test_user_db_unauthorized(self): - dbname = self._dbname - self.assertIsNone(self._urlmap.match('/%s' % dbname, 'GET')) - self.assertIsNone(self._urlmap.match('/%s' % dbname, 'PUT')) - self.assertIsNone(self._urlmap.match('/%s' % dbname, 'DELETE')) - self.assertIsNone(self._urlmap.match('/%s' % dbname, 'POST')) - - def test_user_db_docs_unauthorized(self): - dbname = self._dbname - self.assertIsNone(self._urlmap.match('/%s/docs' % dbname, 'GET')) - self.assertIsNone(self._urlmap.match('/%s/docs' % dbname, 'PUT')) - self.assertIsNone(self._urlmap.match('/%s/docs' % dbname, 'DELETE')) - self.assertIsNone(self._urlmap.match('/%s/docs' % dbname, 'POST')) - - def test_user_db_doc_unauthorized(self): - dbname = self._dbname - self.assertIsNone(self._urlmap.match('/%s/doc/x' % dbname, 'GET')) - self.assertIsNone(self._urlmap.match('/%s/doc/x' % dbname, 'PUT')) - self.assertIsNone(self._urlmap.match('/%s/doc/x' % dbname, 'DELETE')) - self.assertIsNone(self._urlmap.match('/%s/doc/x' % dbname, 'POST')) - - def test_user_db_sync_authorized(self): - uuid = self._uuid - dbname = self._dbname - match = self._urlmap.match('/%s/sync-from/x' % dbname, 'GET') - self.assertEqual(uuid, match.get('uuid')) - self.assertEqual('x', match.get('source_replica_uid')) - - match = self._urlmap.match('/%s/sync-from/x' % dbname, 'PUT') - self.assertEqual(uuid, match.get('uuid')) - self.assertEqual('x', match.get('source_replica_uid')) - - match = self._urlmap.match('/%s/sync-from/x' % dbname, 'POST') - self.assertEqual(uuid, match.get('uuid')) - self.assertEqual('x', match.get('source_replica_uid')) - - def test_user_db_sync_unauthorized(self): - dbname = self._dbname - self.assertIsNone( - self._urlmap.match('/%s/sync-from/x' % dbname, 'DELETE')) -- cgit v1.2.3