diff options
Diffstat (limited to 'testing/tests/server')
-rw-r--r-- | testing/tests/server/test__resource.py | 66 | ||||
-rw-r--r-- | testing/tests/server/test__server_info.py | 43 | ||||
-rw-r--r-- | testing/tests/server/test_auth.py | 104 | ||||
-rw-r--r-- | testing/tests/server/test_config.py | 69 | ||||
-rw-r--r-- | testing/tests/server/test_server.py | 291 | ||||
-rw-r--r-- | testing/tests/server/test_session.py | 186 | ||||
-rw-r--r-- | testing/tests/server/test_url_mapper.py | 131 |
7 files changed, 600 insertions, 290 deletions
diff --git a/testing/tests/server/test__resource.py b/testing/tests/server/test__resource.py new file mode 100644 index 00000000..c066435e --- /dev/null +++ b/testing/tests/server/test__resource.py @@ -0,0 +1,66 @@ +# -*- coding: utf-8 -*- +# test__resource.py +# Copyright (C) 2017 LEAP +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see <http://www.gnu.org/licenses/>. +""" +Tests for Soledad server main resource. +""" +from twisted.trial import unittest +from twisted.web.test.test_web import DummyRequest +from twisted.web.wsgi import WSGIResource +from twisted.web.resource import getChildForRequest +from twisted.internet import reactor + +from leap.soledad.server._resource import SoledadResource +from leap.soledad.server._server_info import ServerInfo +from leap.soledad.server._blobs import BlobsResource +from leap.soledad.server.gzip_middleware import GzipMiddleware + + +_pool = reactor.getThreadPool() + + +class SoledadResourceTestCase(unittest.TestCase): + + def test_get_root(self): + enable_blobs = None # doesn't matter + resource = SoledadResource(enable_blobs=enable_blobs, sync_pool=_pool) + request = DummyRequest(['']) + child = getChildForRequest(resource, request) + self.assertIsInstance(child, ServerInfo) + + def test_get_blobs_enabled(self): + enable_blobs = True + resource = SoledadResource(enable_blobs=enable_blobs, sync_pool=_pool) + request = DummyRequest(['blobs']) + child = getChildForRequest(resource, request) + self.assertIsInstance(child, BlobsResource) + + def test_get_blobs_disabled(self): + enable_blobs = False + resource = SoledadResource(enable_blobs=enable_blobs, sync_pool=_pool) + request = DummyRequest(['blobs']) + child = getChildForRequest(resource, request) + # if blobs is disabled, the request should be routed to sync + self.assertIsInstance(child, WSGIResource) + self.assertIsInstance(child._application, GzipMiddleware) + + def test_get_sync(self): + enable_blobs = None # doesn't matter + resource = SoledadResource(enable_blobs=enable_blobs, sync_pool=_pool) + request = DummyRequest(['user-db', 'sync-from', 'source-id']) + child = getChildForRequest(resource, request) + self.assertIsInstance(child, WSGIResource) + self.assertIsInstance(child._application, GzipMiddleware) diff --git a/testing/tests/server/test__server_info.py b/testing/tests/server/test__server_info.py new file mode 100644 index 00000000..40567ef1 --- /dev/null +++ b/testing/tests/server/test__server_info.py @@ -0,0 +1,43 @@ +# -*- coding: utf-8 -*- +# test__server_info.py +# Copyright (C) 2017 LEAP +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see <http://www.gnu.org/licenses/>. + +""" +Tests for Soledad server information announcement. +""" +import json + +from twisted.trial import unittest +from twisted.web.test.test_web import DummyRequest + +from leap.soledad.server._server_info import ServerInfo + + +class ServerInfoTestCase(unittest.TestCase): + + def test_blobs_enabled(self): + resource = ServerInfo(True) + response = resource.render(DummyRequest([''])) + _info = json.loads(response) + self.assertEquals(_info['blobs'], True) + self.assertTrue(isinstance(_info['version'], basestring)) + + def test_blobs_disabled(self): + resource = ServerInfo(False) + response = resource.render(DummyRequest([''])) + _info = json.loads(response) + self.assertEquals(_info['blobs'], False) + self.assertTrue(isinstance(_info['version'], basestring)) diff --git a/testing/tests/server/test_auth.py b/testing/tests/server/test_auth.py new file mode 100644 index 00000000..6eb647ee --- /dev/null +++ b/testing/tests/server/test_auth.py @@ -0,0 +1,104 @@ +# -*- coding: utf-8 -*- +# test_auth.py +# Copyright (C) 2017 LEAP +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see <http://www.gnu.org/licenses/>. +""" +Tests for auth pieces. +""" +import collections + +from contextlib import contextmanager + +from twisted.cred.credentials import UsernamePassword +from twisted.cred.error import UnauthorizedLogin +from twisted.internet import reactor +from twisted.internet.defer import inlineCallbacks +from twisted.trial import unittest +from twisted.web.resource import IResource +from twisted.web.test import test_httpauth + +import leap.soledad.server.auth as auth_module +from leap.soledad.server.auth import SoledadRealm +from leap.soledad.server.auth import TokenChecker +from leap.soledad.server.auth import TokenCredentialFactory +from leap.soledad.server._resource import SoledadResource + + +class SoledadRealmTestCase(unittest.TestCase): + + def test_returned_resource(self): + # we have to pass a pool to the realm , otherwise tests will hang + conf = {'blobs': False} + pool = reactor.getThreadPool() + realm = SoledadRealm(conf=conf, sync_pool=pool) + iface, avatar, logout = realm.requestAvatar('any', None, IResource) + self.assertIsInstance(avatar, SoledadResource) + self.assertIsNone(logout()) + + +class DummyServer(object): + """ + I fake the `couchdb.client.Server` GET api and always return the token + given on my creation. + """ + + def __init__(self, token): + self._token = token + + def get(self, _): + return self._token + + +@contextmanager +def dummy_server(token): + yield collections.defaultdict(lambda: DummyServer(token)) + + +class TokenCheckerTestCase(unittest.TestCase): + + @inlineCallbacks + def test_good_creds(self): + # set up a dummy server which always return a *valid* token document + token = {'user_id': 'user', 'type': 'Token'} + server = dummy_server(token) + # setup the checker with the custom server + checker = TokenChecker() + auth_module.couch_server = lambda url: server + # assert the checker *can* verify the creds + creds = UsernamePassword('user', 'pass') + avatarId = yield checker.requestAvatarId(creds) + self.assertEqual('user', avatarId) + + @inlineCallbacks + def test_bad_creds(self): + # set up a dummy server which always return an *invalid* token document + token = None + server = dummy_server(token) + # setup the checker with the custom server + checker = TokenChecker() + auth_module.couch_server = lambda url: server + # assert the checker *cannot* verify the creds + creds = UsernamePassword('user', '') + with self.assertRaises(UnauthorizedLogin): + yield checker.requestAvatarId(creds) + + +class TokenCredentialFactoryTestcase( + test_httpauth.RequestMixin, test_httpauth.BasicAuthTestsMixin, + unittest.TestCase): + + def setUp(self): + test_httpauth.BasicAuthTestsMixin.setUp(self) + self.credentialFactory = TokenCredentialFactory() diff --git a/testing/tests/server/test_config.py b/testing/tests/server/test_config.py new file mode 100644 index 00000000..d2a8a9de --- /dev/null +++ b/testing/tests/server/test_config.py @@ -0,0 +1,69 @@ +# -*- coding: utf-8 -*- +# test_config.py +# Copyright (C) 2017 LEAP +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see <http://www.gnu.org/licenses/>. +""" +Tests for server configuration. +""" + +from twisted.trial import unittest +from pkg_resources import resource_filename + +from leap.soledad.server._config import _load_config +from leap.soledad.server._config import CONFIG_DEFAULTS + + +class ConfigurationParsingTest(unittest.TestCase): + + def setUp(self): + self.maxDiff = None + + def test_use_defaults_on_failure(self): + config = _load_config('this file will never exist') + expected = CONFIG_DEFAULTS + self.assertEquals(expected, config) + + def test_security_values_configuration(self): + # given + config_path = resource_filename('test_soledad', + 'fixture_soledad.conf') + # when + config = _load_config(config_path) + + # then + expected = {'members': ['user1', 'user2'], + 'members_roles': ['role1', 'role2'], + 'admins': ['user3', 'user4'], + 'admins_roles': ['role3', 'role3']} + self.assertDictEqual(expected, config['database-security']) + + def test_server_values_configuration(self): + # given + config_path = resource_filename('test_soledad', + 'fixture_soledad.conf') + # when + config = _load_config(config_path) + + # then + expected = {'couch_url': + 'http://soledad:passwd@localhost:5984', + 'create_cmd': + 'sudo -u soledad-admin /usr/bin/create-user-db', + 'admin_netrc': + '/etc/couchdb/couchdb-soledad-admin.netrc', + 'batching': False, + 'blobs': False, + 'blobs_path': '/srv/leap/soledad/blobs'} + self.assertDictEqual(expected, config['soledad-server']) diff --git a/testing/tests/server/test_server.py b/testing/tests/server/test_server.py index 6710caaf..4a5ec43f 100644 --- a/testing/tests/server/test_server.py +++ b/testing/tests/server/test_server.py @@ -18,17 +18,13 @@ Tests for server-related functionality. """ import binascii -import mock import os import pytest -from hashlib import sha512 -from pkg_resources import resource_filename from urlparse import urljoin from uuid import uuid4 from twisted.internet import defer -from twisted.trial import unittest from leap.soledad.common.couch.state import CouchServerState from leap.soledad.common.couch import CouchDatabase @@ -38,253 +34,10 @@ from test_soledad.util import ( make_token_soledad_app, make_soledad_document_for_test, soledad_sync_target, - BaseSoledadTest, ) from leap.soledad.client import _crypto from leap.soledad.client import Soledad -from leap.soledad.server.config import load_configuration -from leap.soledad.server.config import CONFIG_DEFAULTS -from leap.soledad.server.auth import URLToAuthorization -from leap.soledad.server.auth import SoledadTokenAuthMiddleware - - -class ServerAuthenticationMiddlewareTestCase(CouchDBTestCase): - - def setUp(self): - super(ServerAuthenticationMiddlewareTestCase, self).setUp() - app = mock.Mock() - self._state = CouchServerState(self.couch_url) - app.state = self._state - self.auth_middleware = SoledadTokenAuthMiddleware(app) - self._authorize('valid-uuid', 'valid-token') - - def _authorize(self, uuid, token): - token_doc = {} - token_doc['_id'] = sha512(token).hexdigest() - token_doc[self._state.TOKENS_USER_ID_KEY] = uuid - token_doc[self._state.TOKENS_TYPE_KEY] = \ - self._state.TOKENS_TYPE_DEF - dbname = self._state._tokens_dbname() - db = self.couch_server.create(dbname) - db.save(token_doc) - self.addCleanup(self.delete_db, db.name) - - def test_authorized_user(self): - is_authorized = self.auth_middleware._verify_authentication_data - self.assertTrue(is_authorized('valid-uuid', 'valid-token')) - self.assertFalse(is_authorized('valid-uuid', 'invalid-token')) - self.assertFalse(is_authorized('invalid-uuid', 'valid-token')) - self.assertFalse(is_authorized('eve', 'invalid-token')) - - -class ServerAuthorizationTestCase(BaseSoledadTest): - - """ - Tests related to Soledad server authorization. - """ - - def setUp(self): - pass - - def tearDown(self): - pass - - def _make_environ(self, path_info, request_method): - return { - 'PATH_INFO': path_info, - 'REQUEST_METHOD': request_method, - } - - def test_verify_action_with_correct_dbnames(self): - """ - Test encrypting and decrypting documents. - - The following table lists the authorized actions among all possible - u1db remote actions: - - URL path | Authorized actions - -------------------------------------------------- - / | GET - /shared-db | GET - /shared-db/docs | - - /shared-db/doc/{id} | GET, PUT, DELETE - /shared-db/sync-from/{source} | - - /user-db | GET, PUT, DELETE - /user-db/docs | - - /user-db/doc/{id} | - - /user-db/sync-from/{source} | GET, PUT, POST - """ - uuid = uuid4().hex - authmap = URLToAuthorization(uuid,) - dbname = authmap._user_db_name - # test global auth - self.assertTrue( - authmap.is_authorized(self._make_environ('/', 'GET'))) - # test shared-db database resource auth - self.assertTrue( - authmap.is_authorized( - self._make_environ('/shared', 'GET'))) - self.assertFalse( - authmap.is_authorized( - self._make_environ('/shared', 'PUT'))) - self.assertFalse( - authmap.is_authorized( - self._make_environ('/shared', 'DELETE'))) - self.assertFalse( - authmap.is_authorized( - self._make_environ('/shared', 'POST'))) - # test shared-db docs resource auth - self.assertFalse( - authmap.is_authorized( - self._make_environ('/shared/docs', 'GET'))) - self.assertFalse( - authmap.is_authorized( - self._make_environ('/shared/docs', 'PUT'))) - self.assertFalse( - authmap.is_authorized( - self._make_environ('/shared/docs', 'DELETE'))) - self.assertFalse( - authmap.is_authorized( - self._make_environ('/shared/docs', 'POST'))) - # test shared-db doc resource auth - self.assertTrue( - authmap.is_authorized( - self._make_environ('/shared/doc/x', 'GET'))) - self.assertTrue( - authmap.is_authorized( - self._make_environ('/shared/doc/x', 'PUT'))) - self.assertTrue( - authmap.is_authorized( - self._make_environ('/shared/doc/x', 'DELETE'))) - self.assertFalse( - authmap.is_authorized( - self._make_environ('/shared/doc/x', 'POST'))) - # test shared-db sync resource auth - self.assertFalse( - authmap.is_authorized( - self._make_environ('/shared/sync-from/x', 'GET'))) - self.assertFalse( - authmap.is_authorized( - self._make_environ('/shared/sync-from/x', 'PUT'))) - self.assertFalse( - authmap.is_authorized( - self._make_environ('/shared/sync-from/x', 'DELETE'))) - self.assertFalse( - authmap.is_authorized( - self._make_environ('/shared/sync-from/x', 'POST'))) - # test user-db database resource auth - self.assertTrue( - authmap.is_authorized( - self._make_environ('/%s' % dbname, 'GET'))) - self.assertTrue( - authmap.is_authorized( - self._make_environ('/%s' % dbname, 'PUT'))) - self.assertTrue( - authmap.is_authorized( - self._make_environ('/%s' % dbname, 'DELETE'))) - self.assertFalse( - authmap.is_authorized( - self._make_environ('/%s' % dbname, 'POST'))) - # test user-db docs resource auth - self.assertFalse( - authmap.is_authorized( - self._make_environ('/%s/docs' % dbname, 'GET'))) - self.assertFalse( - authmap.is_authorized( - self._make_environ('/%s/docs' % dbname, 'PUT'))) - self.assertFalse( - authmap.is_authorized( - self._make_environ('/%s/docs' % dbname, 'DELETE'))) - self.assertFalse( - authmap.is_authorized( - self._make_environ('/%s/docs' % dbname, 'POST'))) - # test user-db doc resource auth - self.assertFalse( - authmap.is_authorized( - self._make_environ('/%s/doc/x' % dbname, 'GET'))) - self.assertFalse( - authmap.is_authorized( - self._make_environ('/%s/doc/x' % dbname, 'PUT'))) - self.assertFalse( - authmap.is_authorized( - self._make_environ('/%s/doc/x' % dbname, 'DELETE'))) - self.assertFalse( - authmap.is_authorized( - self._make_environ('/%s/doc/x' % dbname, 'POST'))) - # test user-db sync resource auth - self.assertTrue( - authmap.is_authorized( - self._make_environ('/%s/sync-from/x' % dbname, 'GET'))) - self.assertTrue( - authmap.is_authorized( - self._make_environ('/%s/sync-from/x' % dbname, 'PUT'))) - self.assertFalse( - authmap.is_authorized( - self._make_environ('/%s/sync-from/x' % dbname, 'DELETE'))) - self.assertTrue( - authmap.is_authorized( - self._make_environ('/%s/sync-from/x' % dbname, 'POST'))) - - def test_verify_action_with_wrong_dbnames(self): - """ - Test if authorization fails for a wrong dbname. - """ - uuid = uuid4().hex - authmap = URLToAuthorization(uuid) - dbname = 'somedb' - # test wrong-db database resource auth - self.assertFalse( - authmap.is_authorized( - self._make_environ('/%s' % dbname, 'GET'))) - self.assertFalse( - authmap.is_authorized( - self._make_environ('/%s' % dbname, 'PUT'))) - self.assertFalse( - authmap.is_authorized( - self._make_environ('/%s' % dbname, 'DELETE'))) - self.assertFalse( - authmap.is_authorized( - self._make_environ('/%s' % dbname, 'POST'))) - # test wrong-db docs resource auth - self.assertFalse( - authmap.is_authorized( - self._make_environ('/%s/docs' % dbname, 'GET'))) - self.assertFalse( - authmap.is_authorized( - self._make_environ('/%s/docs' % dbname, 'PUT'))) - self.assertFalse( - authmap.is_authorized( - self._make_environ('/%s/docs' % dbname, 'DELETE'))) - self.assertFalse( - authmap.is_authorized( - self._make_environ('/%s/docs' % dbname, 'POST'))) - # test wrong-db doc resource auth - self.assertFalse( - authmap.is_authorized( - self._make_environ('/%s/doc/x' % dbname, 'GET'))) - self.assertFalse( - authmap.is_authorized( - self._make_environ('/%s/doc/x' % dbname, 'PUT'))) - self.assertFalse( - authmap.is_authorized( - self._make_environ('/%s/doc/x' % dbname, 'DELETE'))) - self.assertFalse( - authmap.is_authorized( - self._make_environ('/%s/doc/x' % dbname, 'POST'))) - # test wrong-db sync resource auth - self.assertFalse( - authmap.is_authorized( - self._make_environ('/%s/sync-from/x' % dbname, 'GET'))) - self.assertFalse( - authmap.is_authorized( - self._make_environ('/%s/sync-from/x' % dbname, 'PUT'))) - self.assertFalse( - authmap.is_authorized( - self._make_environ('/%s/sync-from/x' % dbname, 'DELETE'))) - self.assertFalse( - authmap.is_authorized( - self._make_environ('/%s/sync-from/x' % dbname, 'POST'))) @pytest.mark.usefixtures("method_tmpdir") @@ -382,7 +135,7 @@ class EncryptedSyncTestCase( user=user, prefix='x', auth_token='auth-token', - secrets_path=sol1._secrets_path, + secrets_path=sol1.secrets_path, passphrase=passphrase) # ensure remote db exists before syncing @@ -474,45 +227,3 @@ class EncryptedSyncTestCase( Test if Soledad can sync many smallfiles. """ return self._test_encrypted_sym_sync(doc_size=2, number_of_docs=100) - - -class ConfigurationParsingTest(unittest.TestCase): - - def setUp(self): - self.maxDiff = None - - def test_use_defaults_on_failure(self): - config = load_configuration('this file will never exist') - expected = CONFIG_DEFAULTS - self.assertEquals(expected, config) - - def test_security_values_configuration(self): - # given - config_path = resource_filename('test_soledad', - 'fixture_soledad.conf') - # when - config = load_configuration(config_path) - - # then - expected = {'members': ['user1', 'user2'], - 'members_roles': ['role1', 'role2'], - 'admins': ['user3', 'user4'], - 'admins_roles': ['role3', 'role3']} - self.assertDictEqual(expected, config['database-security']) - - def test_server_values_configuration(self): - # given - config_path = resource_filename('test_soledad', - 'fixture_soledad.conf') - # when - config = load_configuration(config_path) - - # then - expected = {'couch_url': - 'http://soledad:passwd@localhost:5984', - 'create_cmd': - 'sudo -u soledad-admin /usr/bin/create-user-db', - 'admin_netrc': - '/etc/couchdb/couchdb-soledad-admin.netrc', - 'batching': False} - self.assertDictEqual(expected, config['soledad-server']) diff --git a/testing/tests/server/test_session.py b/testing/tests/server/test_session.py new file mode 100644 index 00000000..ebb94476 --- /dev/null +++ b/testing/tests/server/test_session.py @@ -0,0 +1,186 @@ +# -*- coding: utf-8 -*- +# test_session.py +# Copyright (C) 2017 LEAP +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see <http://www.gnu.org/licenses/>. +""" +Tests for server session entrypoint. +""" +from twisted.trial import unittest + +from twisted.cred import portal +from twisted.cred.checkers import InMemoryUsernamePasswordDatabaseDontUse +from twisted.cred.credentials import IUsernamePassword +from twisted.web.resource import getChildForRequest +from twisted.web.static import Data +from twisted.web.test.requesthelper import DummyRequest +from twisted.web.test.test_httpauth import b64encode +from twisted.web.test.test_httpauth import Realm +from twisted.web._auth.wrapper import UnauthorizedResource + +from leap.soledad.server.session import SoledadSession + + +class SoledadSessionTestCase(unittest.TestCase): + """ + Tests adapted from for + L{twisted.web.test.test_httpauth.HTTPAuthSessionWrapper}. + """ + + def makeRequest(self, *args, **kwargs): + request = DummyRequest(*args, **kwargs) + request.path = '/' + return request + + def setUp(self): + self.username = b'foo bar' + self.password = b'bar baz' + self.avatarContent = b"contents of the avatar resource itself" + self.childName = b"foo-child" + self.childContent = b"contents of the foo child of the avatar" + self.checker = InMemoryUsernamePasswordDatabaseDontUse() + self.checker.addUser(self.username, self.password) + self.avatar = Data(self.avatarContent, 'text/plain') + self.avatar.putChild( + self.childName, Data(self.childContent, 'text/plain')) + self.avatars = {self.username: self.avatar} + self.realm = Realm(self.avatars.get) + self.portal = portal.Portal(self.realm, [self.checker]) + self.wrapper = SoledadSession(self.portal) + + def _authorizedTokenLogin(self, request): + authorization = b64encode( + self.username + b':' + self.password) + request.requestHeaders.addRawHeader(b'authorization', + b'Token ' + authorization) + return getChildForRequest(self.wrapper, request) + + def test_getChildWithDefault(self): + request = self.makeRequest([self.childName]) + child = getChildForRequest(self.wrapper, request) + d = request.notifyFinish() + + def cbFinished(result): + self.assertEqual(request.responseCode, 401) + + d.addCallback(cbFinished) + request.render(child) + return d + + def _invalidAuthorizationTest(self, response): + request = self.makeRequest([self.childName]) + request.requestHeaders.addRawHeader(b'authorization', response) + child = getChildForRequest(self.wrapper, request) + d = request.notifyFinish() + + def cbFinished(result): + self.assertEqual(request.responseCode, 401) + + d.addCallback(cbFinished) + request.render(child) + return d + + def test_getChildWithDefaultUnauthorizedUser(self): + return self._invalidAuthorizationTest( + b'Basic ' + b64encode(b'foo:bar')) + + def test_getChildWithDefaultUnauthorizedPassword(self): + return self._invalidAuthorizationTest( + b'Basic ' + b64encode(self.username + b':bar')) + + def test_getChildWithDefaultUnrecognizedScheme(self): + return self._invalidAuthorizationTest(b'Quux foo bar baz') + + def test_getChildWithDefaultAuthorized(self): + request = self.makeRequest([self.childName]) + child = self._authorizedTokenLogin(request) + d = request.notifyFinish() + + def cbFinished(ignored): + self.assertEqual(request.written, [self.childContent]) + + d.addCallback(cbFinished) + request.render(child) + return d + + def test_renderAuthorized(self): + # Request it exactly, not any of its children. + request = self.makeRequest([]) + child = self._authorizedTokenLogin(request) + d = request.notifyFinish() + + def cbFinished(ignored): + self.assertEqual(request.written, [self.avatarContent]) + + d.addCallback(cbFinished) + request.render(child) + return d + + def test_decodeRaises(self): + request = self.makeRequest([self.childName]) + request.requestHeaders.addRawHeader(b'authorization', + b'Token decode should fail') + child = getChildForRequest(self.wrapper, request) + self.assertIsInstance(child, UnauthorizedResource) + + def test_parseResponse(self): + basicAuthorization = b'Basic abcdef123456' + self.assertEqual( + self.wrapper._parseHeader(basicAuthorization), + None) + tokenAuthorization = b'Token abcdef123456' + self.assertEqual( + self.wrapper._parseHeader(tokenAuthorization), + b'abcdef123456') + + def test_unexpectedDecodeError(self): + + class UnexpectedException(Exception): + pass + + class BadFactory(object): + scheme = b'bad' + + def getChallenge(self, client): + return {} + + def decode(self, response, request): + print "decode raised" + raise UnexpectedException() + + self.wrapper._credentialFactory = BadFactory() + request = self.makeRequest([self.childName]) + request.requestHeaders.addRawHeader(b'authorization', b'Bad abc') + child = getChildForRequest(self.wrapper, request) + request.render(child) + self.assertEqual(request.responseCode, 500) + errors = self.flushLoggedErrors(UnexpectedException) + self.assertEqual(len(errors), 1) + + def test_unexpectedLoginError(self): + class UnexpectedException(Exception): + pass + + class BrokenChecker(object): + credentialInterfaces = (IUsernamePassword,) + + def requestAvatarId(self, credentials): + raise UnexpectedException() + + self.portal.registerChecker(BrokenChecker()) + request = self.makeRequest([self.childName]) + child = self._authorizedTokenLogin(request) + request.render(child) + self.assertEqual(request.responseCode, 500) + self.assertEqual(len(self.flushLoggedErrors(UnexpectedException)), 1) diff --git a/testing/tests/server/test_url_mapper.py b/testing/tests/server/test_url_mapper.py new file mode 100644 index 00000000..fa99cae7 --- /dev/null +++ b/testing/tests/server/test_url_mapper.py @@ -0,0 +1,131 @@ +# -*- coding: utf-8 -*- +# test_url_mapper.py +# Copyright (C) 2017 LEAP +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see <http://www.gnu.org/licenses/>. +""" +Tests for server-related functionality. +""" + +from twisted.trial import unittest +from uuid import uuid4 + +from leap.soledad.server.url_mapper import URLMapper + + +class URLMapperTestCase(unittest.TestCase): + """ + Test if the URLMapper behaves as expected. + + The following table lists the authorized actions among all possible + u1db remote actions: + + URL path | Authorized actions + -------------------------------------------------- + / | GET + /shared-db | GET + /shared-db/docs | - + /shared-db/doc/{id} | - + /shared-db/sync-from/{source} | - + /user-db | - + /user-db/docs | - + /user-db/doc/{id} | - + /user-db/sync-from/{source} | GET, PUT, POST + """ + + def setUp(self): + self._uuid = uuid4().hex + self._urlmap = URLMapper() + self._dbname = 'user-%s' % self._uuid + + def test_root_authorized(self): + match = self._urlmap.match('/', 'GET') + self.assertIsNotNone(match) + + def test_shared_authorized(self): + self.assertIsNotNone(self._urlmap.match('/shared', 'GET')) + + def test_shared_unauthorized(self): + self.assertIsNone(self._urlmap.match('/shared', 'PUT')) + self.assertIsNone(self._urlmap.match('/shared', 'DELETE')) + self.assertIsNone(self._urlmap.match('/shared', 'POST')) + + def test_shared_docs_unauthorized(self): + self.assertIsNone(self._urlmap.match('/shared/docs', 'GET')) + self.assertIsNone(self._urlmap.match('/shared/docs', 'PUT')) + self.assertIsNone(self._urlmap.match('/shared/docs', 'DELETE')) + self.assertIsNone(self._urlmap.match('/shared/docs', 'POST')) + + def test_shared_doc_authorized(self): + match = self._urlmap.match('/shared/doc/x', 'GET') + self.assertIsNotNone(match) + self.assertEqual('x', match.get('id')) + + match = self._urlmap.match('/shared/doc/x', 'PUT') + self.assertIsNotNone(match) + self.assertEqual('x', match.get('id')) + + match = self._urlmap.match('/shared/doc/x', 'DELETE') + self.assertIsNotNone(match) + self.assertEqual('x', match.get('id')) + + def test_shared_doc_unauthorized(self): + self.assertIsNone(self._urlmap.match('/shared/doc/x', 'POST')) + + def test_shared_sync_unauthorized(self): + self.assertIsNone(self._urlmap.match('/shared/sync-from/x', 'GET')) + self.assertIsNone(self._urlmap.match('/shared/sync-from/x', 'PUT')) + self.assertIsNone(self._urlmap.match('/shared/sync-from/x', 'DELETE')) + self.assertIsNone(self._urlmap.match('/shared/sync-from/x', 'POST')) + + def test_user_db_unauthorized(self): + dbname = self._dbname + self.assertIsNone(self._urlmap.match('/%s' % dbname, 'GET')) + self.assertIsNone(self._urlmap.match('/%s' % dbname, 'PUT')) + self.assertIsNone(self._urlmap.match('/%s' % dbname, 'DELETE')) + self.assertIsNone(self._urlmap.match('/%s' % dbname, 'POST')) + + def test_user_db_docs_unauthorized(self): + dbname = self._dbname + self.assertIsNone(self._urlmap.match('/%s/docs' % dbname, 'GET')) + self.assertIsNone(self._urlmap.match('/%s/docs' % dbname, 'PUT')) + self.assertIsNone(self._urlmap.match('/%s/docs' % dbname, 'DELETE')) + self.assertIsNone(self._urlmap.match('/%s/docs' % dbname, 'POST')) + + def test_user_db_doc_unauthorized(self): + dbname = self._dbname + self.assertIsNone(self._urlmap.match('/%s/doc/x' % dbname, 'GET')) + self.assertIsNone(self._urlmap.match('/%s/doc/x' % dbname, 'PUT')) + self.assertIsNone(self._urlmap.match('/%s/doc/x' % dbname, 'DELETE')) + self.assertIsNone(self._urlmap.match('/%s/doc/x' % dbname, 'POST')) + + def test_user_db_sync_authorized(self): + uuid = self._uuid + dbname = self._dbname + match = self._urlmap.match('/%s/sync-from/x' % dbname, 'GET') + self.assertEqual(uuid, match.get('uuid')) + self.assertEqual('x', match.get('source_replica_uid')) + + match = self._urlmap.match('/%s/sync-from/x' % dbname, 'PUT') + self.assertEqual(uuid, match.get('uuid')) + self.assertEqual('x', match.get('source_replica_uid')) + + match = self._urlmap.match('/%s/sync-from/x' % dbname, 'POST') + self.assertEqual(uuid, match.get('uuid')) + self.assertEqual('x', match.get('source_replica_uid')) + + def test_user_db_sync_unauthorized(self): + dbname = self._dbname + self.assertIsNone( + self._urlmap.match('/%s/sync-from/x' % dbname, 'DELETE')) |