diff options
Diffstat (limited to 'testing/tests/server')
| -rw-r--r-- | testing/tests/server/test__resource.py | 66 | ||||
| -rw-r--r-- | testing/tests/server/test__server_info.py | 43 | ||||
| -rw-r--r-- | testing/tests/server/test_auth.py | 104 | ||||
| -rw-r--r-- | testing/tests/server/test_config.py | 69 | ||||
| -rw-r--r-- | testing/tests/server/test_server.py | 291 | ||||
| -rw-r--r-- | testing/tests/server/test_session.py | 186 | ||||
| -rw-r--r-- | testing/tests/server/test_url_mapper.py | 131 | 
7 files changed, 600 insertions, 290 deletions
| diff --git a/testing/tests/server/test__resource.py b/testing/tests/server/test__resource.py new file mode 100644 index 00000000..c066435e --- /dev/null +++ b/testing/tests/server/test__resource.py @@ -0,0 +1,66 @@ +# -*- coding: utf-8 -*- +# test__resource.py +# Copyright (C) 2017 LEAP +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see <http://www.gnu.org/licenses/>. +""" +Tests for Soledad server main resource. +""" +from twisted.trial import unittest +from twisted.web.test.test_web import DummyRequest +from twisted.web.wsgi import WSGIResource +from twisted.web.resource import getChildForRequest +from twisted.internet import reactor + +from leap.soledad.server._resource import SoledadResource +from leap.soledad.server._server_info import ServerInfo +from leap.soledad.server._blobs import BlobsResource +from leap.soledad.server.gzip_middleware import GzipMiddleware + + +_pool = reactor.getThreadPool() + + +class SoledadResourceTestCase(unittest.TestCase): + +    def test_get_root(self): +        enable_blobs = None  # doesn't matter +        resource = SoledadResource(enable_blobs=enable_blobs, sync_pool=_pool) +        request = DummyRequest(['']) +        child = getChildForRequest(resource, request) +        self.assertIsInstance(child, ServerInfo) + +    def test_get_blobs_enabled(self): +        enable_blobs = True +        resource = SoledadResource(enable_blobs=enable_blobs, sync_pool=_pool) +        request = DummyRequest(['blobs']) +        child = getChildForRequest(resource, request) +        self.assertIsInstance(child, BlobsResource) + +    def test_get_blobs_disabled(self): +        enable_blobs = False +        resource = SoledadResource(enable_blobs=enable_blobs, sync_pool=_pool) +        request = DummyRequest(['blobs']) +        child = getChildForRequest(resource, request) +        # if blobs is disabled, the request should be routed to sync +        self.assertIsInstance(child, WSGIResource) +        self.assertIsInstance(child._application, GzipMiddleware) + +    def test_get_sync(self): +        enable_blobs = None  # doesn't matter +        resource = SoledadResource(enable_blobs=enable_blobs, sync_pool=_pool) +        request = DummyRequest(['user-db', 'sync-from', 'source-id']) +        child = getChildForRequest(resource, request) +        self.assertIsInstance(child, WSGIResource) +        self.assertIsInstance(child._application, GzipMiddleware) diff --git a/testing/tests/server/test__server_info.py b/testing/tests/server/test__server_info.py new file mode 100644 index 00000000..40567ef1 --- /dev/null +++ b/testing/tests/server/test__server_info.py @@ -0,0 +1,43 @@ +# -*- coding: utf-8 -*- +# test__server_info.py +# Copyright (C) 2017 LEAP +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see <http://www.gnu.org/licenses/>. + +""" +Tests for Soledad server information announcement. +""" +import json + +from twisted.trial import unittest +from twisted.web.test.test_web import DummyRequest + +from leap.soledad.server._server_info import ServerInfo + + +class ServerInfoTestCase(unittest.TestCase): + +    def test_blobs_enabled(self): +        resource = ServerInfo(True) +        response = resource.render(DummyRequest([''])) +        _info = json.loads(response) +        self.assertEquals(_info['blobs'], True) +        self.assertTrue(isinstance(_info['version'], basestring)) + +    def test_blobs_disabled(self): +        resource = ServerInfo(False) +        response = resource.render(DummyRequest([''])) +        _info = json.loads(response) +        self.assertEquals(_info['blobs'], False) +        self.assertTrue(isinstance(_info['version'], basestring)) diff --git a/testing/tests/server/test_auth.py b/testing/tests/server/test_auth.py new file mode 100644 index 00000000..6eb647ee --- /dev/null +++ b/testing/tests/server/test_auth.py @@ -0,0 +1,104 @@ +# -*- coding: utf-8 -*- +# test_auth.py +# Copyright (C) 2017 LEAP +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see <http://www.gnu.org/licenses/>. +""" +Tests for auth pieces. +""" +import collections + +from contextlib import contextmanager + +from twisted.cred.credentials import UsernamePassword +from twisted.cred.error import UnauthorizedLogin +from twisted.internet import reactor +from twisted.internet.defer import inlineCallbacks +from twisted.trial import unittest +from twisted.web.resource import IResource +from twisted.web.test import test_httpauth + +import leap.soledad.server.auth as auth_module +from leap.soledad.server.auth import SoledadRealm +from leap.soledad.server.auth import TokenChecker +from leap.soledad.server.auth import TokenCredentialFactory +from leap.soledad.server._resource import SoledadResource + + +class SoledadRealmTestCase(unittest.TestCase): + +    def test_returned_resource(self): +        # we have to pass a pool to the realm , otherwise tests will hang +        conf = {'blobs': False} +        pool = reactor.getThreadPool() +        realm = SoledadRealm(conf=conf, sync_pool=pool) +        iface, avatar, logout = realm.requestAvatar('any', None, IResource) +        self.assertIsInstance(avatar, SoledadResource) +        self.assertIsNone(logout()) + + +class DummyServer(object): +    """ +    I fake the `couchdb.client.Server` GET api and always return the token +    given on my creation. +    """ + +    def __init__(self, token): +        self._token = token + +    def get(self, _): +        return self._token + + +@contextmanager +def dummy_server(token): +    yield collections.defaultdict(lambda: DummyServer(token)) + + +class TokenCheckerTestCase(unittest.TestCase): + +    @inlineCallbacks +    def test_good_creds(self): +        # set up a dummy server which always return a *valid* token document +        token = {'user_id': 'user', 'type': 'Token'} +        server = dummy_server(token) +        # setup the checker with the custom server +        checker = TokenChecker() +        auth_module.couch_server = lambda url: server +        # assert the checker *can* verify the creds +        creds = UsernamePassword('user', 'pass') +        avatarId = yield checker.requestAvatarId(creds) +        self.assertEqual('user', avatarId) + +    @inlineCallbacks +    def test_bad_creds(self): +        # set up a dummy server which always return an *invalid* token document +        token = None +        server = dummy_server(token) +        # setup the checker with the custom server +        checker = TokenChecker() +        auth_module.couch_server = lambda url: server +        # assert the checker *cannot* verify the creds +        creds = UsernamePassword('user', '') +        with self.assertRaises(UnauthorizedLogin): +            yield checker.requestAvatarId(creds) + + +class TokenCredentialFactoryTestcase( +        test_httpauth.RequestMixin, test_httpauth.BasicAuthTestsMixin, +        unittest.TestCase): + +    def setUp(self): +        test_httpauth.BasicAuthTestsMixin.setUp(self) +        self.credentialFactory = TokenCredentialFactory() diff --git a/testing/tests/server/test_config.py b/testing/tests/server/test_config.py new file mode 100644 index 00000000..d2a8a9de --- /dev/null +++ b/testing/tests/server/test_config.py @@ -0,0 +1,69 @@ +# -*- coding: utf-8 -*- +# test_config.py +# Copyright (C) 2017 LEAP +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see <http://www.gnu.org/licenses/>. +""" +Tests for server configuration. +""" + +from twisted.trial import unittest +from pkg_resources import resource_filename + +from leap.soledad.server._config import _load_config +from leap.soledad.server._config import CONFIG_DEFAULTS + + +class ConfigurationParsingTest(unittest.TestCase): + +    def setUp(self): +        self.maxDiff = None + +    def test_use_defaults_on_failure(self): +        config = _load_config('this file will never exist') +        expected = CONFIG_DEFAULTS +        self.assertEquals(expected, config) + +    def test_security_values_configuration(self): +        # given +        config_path = resource_filename('test_soledad', +                                        'fixture_soledad.conf') +        # when +        config = _load_config(config_path) + +        # then +        expected = {'members': ['user1', 'user2'], +                    'members_roles': ['role1', 'role2'], +                    'admins': ['user3', 'user4'], +                    'admins_roles': ['role3', 'role3']} +        self.assertDictEqual(expected, config['database-security']) + +    def test_server_values_configuration(self): +        # given +        config_path = resource_filename('test_soledad', +                                        'fixture_soledad.conf') +        # when +        config = _load_config(config_path) + +        # then +        expected = {'couch_url': +                    'http://soledad:passwd@localhost:5984', +                    'create_cmd': +                    'sudo -u soledad-admin /usr/bin/create-user-db', +                    'admin_netrc': +                    '/etc/couchdb/couchdb-soledad-admin.netrc', +                    'batching': False, +                    'blobs': False, +                    'blobs_path': '/srv/leap/soledad/blobs'} +        self.assertDictEqual(expected, config['soledad-server']) diff --git a/testing/tests/server/test_server.py b/testing/tests/server/test_server.py index 6710caaf..4a5ec43f 100644 --- a/testing/tests/server/test_server.py +++ b/testing/tests/server/test_server.py @@ -18,17 +18,13 @@  Tests for server-related functionality.  """  import binascii -import mock  import os  import pytest -from hashlib import sha512 -from pkg_resources import resource_filename  from urlparse import urljoin  from uuid import uuid4  from twisted.internet import defer -from twisted.trial import unittest  from leap.soledad.common.couch.state import CouchServerState  from leap.soledad.common.couch import CouchDatabase @@ -38,253 +34,10 @@ from test_soledad.util import (      make_token_soledad_app,      make_soledad_document_for_test,      soledad_sync_target, -    BaseSoledadTest,  )  from leap.soledad.client import _crypto  from leap.soledad.client import Soledad -from leap.soledad.server.config import load_configuration -from leap.soledad.server.config import CONFIG_DEFAULTS -from leap.soledad.server.auth import URLToAuthorization -from leap.soledad.server.auth import SoledadTokenAuthMiddleware - - -class ServerAuthenticationMiddlewareTestCase(CouchDBTestCase): - -    def setUp(self): -        super(ServerAuthenticationMiddlewareTestCase, self).setUp() -        app = mock.Mock() -        self._state = CouchServerState(self.couch_url) -        app.state = self._state -        self.auth_middleware = SoledadTokenAuthMiddleware(app) -        self._authorize('valid-uuid', 'valid-token') - -    def _authorize(self, uuid, token): -        token_doc = {} -        token_doc['_id'] = sha512(token).hexdigest() -        token_doc[self._state.TOKENS_USER_ID_KEY] = uuid -        token_doc[self._state.TOKENS_TYPE_KEY] = \ -            self._state.TOKENS_TYPE_DEF -        dbname = self._state._tokens_dbname() -        db = self.couch_server.create(dbname) -        db.save(token_doc) -        self.addCleanup(self.delete_db, db.name) - -    def test_authorized_user(self): -        is_authorized = self.auth_middleware._verify_authentication_data -        self.assertTrue(is_authorized('valid-uuid', 'valid-token')) -        self.assertFalse(is_authorized('valid-uuid', 'invalid-token')) -        self.assertFalse(is_authorized('invalid-uuid', 'valid-token')) -        self.assertFalse(is_authorized('eve', 'invalid-token')) - - -class ServerAuthorizationTestCase(BaseSoledadTest): - -    """ -    Tests related to Soledad server authorization. -    """ - -    def setUp(self): -        pass - -    def tearDown(self): -        pass - -    def _make_environ(self, path_info, request_method): -        return { -            'PATH_INFO': path_info, -            'REQUEST_METHOD': request_method, -        } - -    def test_verify_action_with_correct_dbnames(self): -        """ -        Test encrypting and decrypting documents. - -        The following table lists the authorized actions among all possible -        u1db remote actions: - -            URL path                      | Authorized actions -            -------------------------------------------------- -            /                             | GET -            /shared-db                    | GET -            /shared-db/docs               | - -            /shared-db/doc/{id}           | GET, PUT, DELETE -            /shared-db/sync-from/{source} | - -            /user-db                      | GET, PUT, DELETE -            /user-db/docs                 | - -            /user-db/doc/{id}             | - -            /user-db/sync-from/{source}   | GET, PUT, POST -        """ -        uuid = uuid4().hex -        authmap = URLToAuthorization(uuid,) -        dbname = authmap._user_db_name -        # test global auth -        self.assertTrue( -            authmap.is_authorized(self._make_environ('/', 'GET'))) -        # test shared-db database resource auth -        self.assertTrue( -            authmap.is_authorized( -                self._make_environ('/shared', 'GET'))) -        self.assertFalse( -            authmap.is_authorized( -                self._make_environ('/shared', 'PUT'))) -        self.assertFalse( -            authmap.is_authorized( -                self._make_environ('/shared', 'DELETE'))) -        self.assertFalse( -            authmap.is_authorized( -                self._make_environ('/shared', 'POST'))) -        # test shared-db docs resource auth -        self.assertFalse( -            authmap.is_authorized( -                self._make_environ('/shared/docs', 'GET'))) -        self.assertFalse( -            authmap.is_authorized( -                self._make_environ('/shared/docs', 'PUT'))) -        self.assertFalse( -            authmap.is_authorized( -                self._make_environ('/shared/docs', 'DELETE'))) -        self.assertFalse( -            authmap.is_authorized( -                self._make_environ('/shared/docs', 'POST'))) -        # test shared-db doc resource auth -        self.assertTrue( -            authmap.is_authorized( -                self._make_environ('/shared/doc/x', 'GET'))) -        self.assertTrue( -            authmap.is_authorized( -                self._make_environ('/shared/doc/x', 'PUT'))) -        self.assertTrue( -            authmap.is_authorized( -                self._make_environ('/shared/doc/x', 'DELETE'))) -        self.assertFalse( -            authmap.is_authorized( -                self._make_environ('/shared/doc/x', 'POST'))) -        # test shared-db sync resource auth -        self.assertFalse( -            authmap.is_authorized( -                self._make_environ('/shared/sync-from/x', 'GET'))) -        self.assertFalse( -            authmap.is_authorized( -                self._make_environ('/shared/sync-from/x', 'PUT'))) -        self.assertFalse( -            authmap.is_authorized( -                self._make_environ('/shared/sync-from/x', 'DELETE'))) -        self.assertFalse( -            authmap.is_authorized( -                self._make_environ('/shared/sync-from/x', 'POST'))) -        # test user-db database resource auth -        self.assertTrue( -            authmap.is_authorized( -                self._make_environ('/%s' % dbname, 'GET'))) -        self.assertTrue( -            authmap.is_authorized( -                self._make_environ('/%s' % dbname, 'PUT'))) -        self.assertTrue( -            authmap.is_authorized( -                self._make_environ('/%s' % dbname, 'DELETE'))) -        self.assertFalse( -            authmap.is_authorized( -                self._make_environ('/%s' % dbname, 'POST'))) -        # test user-db docs resource auth -        self.assertFalse( -            authmap.is_authorized( -                self._make_environ('/%s/docs' % dbname, 'GET'))) -        self.assertFalse( -            authmap.is_authorized( -                self._make_environ('/%s/docs' % dbname, 'PUT'))) -        self.assertFalse( -            authmap.is_authorized( -                self._make_environ('/%s/docs' % dbname, 'DELETE'))) -        self.assertFalse( -            authmap.is_authorized( -                self._make_environ('/%s/docs' % dbname, 'POST'))) -        # test user-db doc resource auth -        self.assertFalse( -            authmap.is_authorized( -                self._make_environ('/%s/doc/x' % dbname, 'GET'))) -        self.assertFalse( -            authmap.is_authorized( -                self._make_environ('/%s/doc/x' % dbname, 'PUT'))) -        self.assertFalse( -            authmap.is_authorized( -                self._make_environ('/%s/doc/x' % dbname, 'DELETE'))) -        self.assertFalse( -            authmap.is_authorized( -                self._make_environ('/%s/doc/x' % dbname, 'POST'))) -        # test user-db sync resource auth -        self.assertTrue( -            authmap.is_authorized( -                self._make_environ('/%s/sync-from/x' % dbname, 'GET'))) -        self.assertTrue( -            authmap.is_authorized( -                self._make_environ('/%s/sync-from/x' % dbname, 'PUT'))) -        self.assertFalse( -            authmap.is_authorized( -                self._make_environ('/%s/sync-from/x' % dbname, 'DELETE'))) -        self.assertTrue( -            authmap.is_authorized( -                self._make_environ('/%s/sync-from/x' % dbname, 'POST'))) - -    def test_verify_action_with_wrong_dbnames(self): -        """ -        Test if authorization fails for a wrong dbname. -        """ -        uuid = uuid4().hex -        authmap = URLToAuthorization(uuid) -        dbname = 'somedb' -        # test wrong-db database resource auth -        self.assertFalse( -            authmap.is_authorized( -                self._make_environ('/%s' % dbname, 'GET'))) -        self.assertFalse( -            authmap.is_authorized( -                self._make_environ('/%s' % dbname, 'PUT'))) -        self.assertFalse( -            authmap.is_authorized( -                self._make_environ('/%s' % dbname, 'DELETE'))) -        self.assertFalse( -            authmap.is_authorized( -                self._make_environ('/%s' % dbname, 'POST'))) -        # test wrong-db docs resource auth -        self.assertFalse( -            authmap.is_authorized( -                self._make_environ('/%s/docs' % dbname, 'GET'))) -        self.assertFalse( -            authmap.is_authorized( -                self._make_environ('/%s/docs' % dbname, 'PUT'))) -        self.assertFalse( -            authmap.is_authorized( -                self._make_environ('/%s/docs' % dbname, 'DELETE'))) -        self.assertFalse( -            authmap.is_authorized( -                self._make_environ('/%s/docs' % dbname, 'POST'))) -        # test wrong-db doc resource auth -        self.assertFalse( -            authmap.is_authorized( -                self._make_environ('/%s/doc/x' % dbname, 'GET'))) -        self.assertFalse( -            authmap.is_authorized( -                self._make_environ('/%s/doc/x' % dbname, 'PUT'))) -        self.assertFalse( -            authmap.is_authorized( -                self._make_environ('/%s/doc/x' % dbname, 'DELETE'))) -        self.assertFalse( -            authmap.is_authorized( -                self._make_environ('/%s/doc/x' % dbname, 'POST'))) -        # test wrong-db sync resource auth -        self.assertFalse( -            authmap.is_authorized( -                self._make_environ('/%s/sync-from/x' % dbname, 'GET'))) -        self.assertFalse( -            authmap.is_authorized( -                self._make_environ('/%s/sync-from/x' % dbname, 'PUT'))) -        self.assertFalse( -            authmap.is_authorized( -                self._make_environ('/%s/sync-from/x' % dbname, 'DELETE'))) -        self.assertFalse( -            authmap.is_authorized( -                self._make_environ('/%s/sync-from/x' % dbname, 'POST')))  @pytest.mark.usefixtures("method_tmpdir") @@ -382,7 +135,7 @@ class EncryptedSyncTestCase(              user=user,              prefix='x',              auth_token='auth-token', -            secrets_path=sol1._secrets_path, +            secrets_path=sol1.secrets_path,              passphrase=passphrase)          # ensure remote db exists before syncing @@ -474,45 +227,3 @@ class EncryptedSyncTestCase(          Test if Soledad can sync many smallfiles.          """          return self._test_encrypted_sym_sync(doc_size=2, number_of_docs=100) - - -class ConfigurationParsingTest(unittest.TestCase): - -    def setUp(self): -        self.maxDiff = None - -    def test_use_defaults_on_failure(self): -        config = load_configuration('this file will never exist') -        expected = CONFIG_DEFAULTS -        self.assertEquals(expected, config) - -    def test_security_values_configuration(self): -        # given -        config_path = resource_filename('test_soledad', -                                        'fixture_soledad.conf') -        # when -        config = load_configuration(config_path) - -        # then -        expected = {'members': ['user1', 'user2'], -                    'members_roles': ['role1', 'role2'], -                    'admins': ['user3', 'user4'], -                    'admins_roles': ['role3', 'role3']} -        self.assertDictEqual(expected, config['database-security']) - -    def test_server_values_configuration(self): -        # given -        config_path = resource_filename('test_soledad', -                                        'fixture_soledad.conf') -        # when -        config = load_configuration(config_path) - -        # then -        expected = {'couch_url': -                    'http://soledad:passwd@localhost:5984', -                    'create_cmd': -                    'sudo -u soledad-admin /usr/bin/create-user-db', -                    'admin_netrc': -                    '/etc/couchdb/couchdb-soledad-admin.netrc', -                    'batching': False} -        self.assertDictEqual(expected, config['soledad-server']) diff --git a/testing/tests/server/test_session.py b/testing/tests/server/test_session.py new file mode 100644 index 00000000..ebb94476 --- /dev/null +++ b/testing/tests/server/test_session.py @@ -0,0 +1,186 @@ +# -*- coding: utf-8 -*- +# test_session.py +# Copyright (C) 2017 LEAP +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see <http://www.gnu.org/licenses/>. +""" +Tests for server session entrypoint. +""" +from twisted.trial import unittest + +from twisted.cred import portal +from twisted.cred.checkers import InMemoryUsernamePasswordDatabaseDontUse +from twisted.cred.credentials import IUsernamePassword +from twisted.web.resource import getChildForRequest +from twisted.web.static import Data +from twisted.web.test.requesthelper import DummyRequest +from twisted.web.test.test_httpauth import b64encode +from twisted.web.test.test_httpauth import Realm +from twisted.web._auth.wrapper import UnauthorizedResource + +from leap.soledad.server.session import SoledadSession + + +class SoledadSessionTestCase(unittest.TestCase): +    """ +    Tests adapted from for +    L{twisted.web.test.test_httpauth.HTTPAuthSessionWrapper}. +    """ + +    def makeRequest(self, *args, **kwargs): +        request = DummyRequest(*args, **kwargs) +        request.path = '/' +        return request + +    def setUp(self): +        self.username = b'foo bar' +        self.password = b'bar baz' +        self.avatarContent = b"contents of the avatar resource itself" +        self.childName = b"foo-child" +        self.childContent = b"contents of the foo child of the avatar" +        self.checker = InMemoryUsernamePasswordDatabaseDontUse() +        self.checker.addUser(self.username, self.password) +        self.avatar = Data(self.avatarContent, 'text/plain') +        self.avatar.putChild( +            self.childName, Data(self.childContent, 'text/plain')) +        self.avatars = {self.username: self.avatar} +        self.realm = Realm(self.avatars.get) +        self.portal = portal.Portal(self.realm, [self.checker]) +        self.wrapper = SoledadSession(self.portal) + +    def _authorizedTokenLogin(self, request): +        authorization = b64encode( +            self.username + b':' + self.password) +        request.requestHeaders.addRawHeader(b'authorization', +                                            b'Token ' + authorization) +        return getChildForRequest(self.wrapper, request) + +    def test_getChildWithDefault(self): +        request = self.makeRequest([self.childName]) +        child = getChildForRequest(self.wrapper, request) +        d = request.notifyFinish() + +        def cbFinished(result): +            self.assertEqual(request.responseCode, 401) + +        d.addCallback(cbFinished) +        request.render(child) +        return d + +    def _invalidAuthorizationTest(self, response): +        request = self.makeRequest([self.childName]) +        request.requestHeaders.addRawHeader(b'authorization', response) +        child = getChildForRequest(self.wrapper, request) +        d = request.notifyFinish() + +        def cbFinished(result): +            self.assertEqual(request.responseCode, 401) + +        d.addCallback(cbFinished) +        request.render(child) +        return d + +    def test_getChildWithDefaultUnauthorizedUser(self): +        return self._invalidAuthorizationTest( +            b'Basic ' + b64encode(b'foo:bar')) + +    def test_getChildWithDefaultUnauthorizedPassword(self): +        return self._invalidAuthorizationTest( +            b'Basic ' + b64encode(self.username + b':bar')) + +    def test_getChildWithDefaultUnrecognizedScheme(self): +        return self._invalidAuthorizationTest(b'Quux foo bar baz') + +    def test_getChildWithDefaultAuthorized(self): +        request = self.makeRequest([self.childName]) +        child = self._authorizedTokenLogin(request) +        d = request.notifyFinish() + +        def cbFinished(ignored): +            self.assertEqual(request.written, [self.childContent]) + +        d.addCallback(cbFinished) +        request.render(child) +        return d + +    def test_renderAuthorized(self): +        # Request it exactly, not any of its children. +        request = self.makeRequest([]) +        child = self._authorizedTokenLogin(request) +        d = request.notifyFinish() + +        def cbFinished(ignored): +            self.assertEqual(request.written, [self.avatarContent]) + +        d.addCallback(cbFinished) +        request.render(child) +        return d + +    def test_decodeRaises(self): +        request = self.makeRequest([self.childName]) +        request.requestHeaders.addRawHeader(b'authorization', +                                            b'Token decode should fail') +        child = getChildForRequest(self.wrapper, request) +        self.assertIsInstance(child, UnauthorizedResource) + +    def test_parseResponse(self): +        basicAuthorization = b'Basic abcdef123456' +        self.assertEqual( +            self.wrapper._parseHeader(basicAuthorization), +            None) +        tokenAuthorization = b'Token abcdef123456' +        self.assertEqual( +            self.wrapper._parseHeader(tokenAuthorization), +            b'abcdef123456') + +    def test_unexpectedDecodeError(self): + +        class UnexpectedException(Exception): +            pass + +        class BadFactory(object): +            scheme = b'bad' + +            def getChallenge(self, client): +                return {} + +            def decode(self, response, request): +                print "decode raised" +                raise UnexpectedException() + +        self.wrapper._credentialFactory = BadFactory() +        request = self.makeRequest([self.childName]) +        request.requestHeaders.addRawHeader(b'authorization', b'Bad abc') +        child = getChildForRequest(self.wrapper, request) +        request.render(child) +        self.assertEqual(request.responseCode, 500) +        errors = self.flushLoggedErrors(UnexpectedException) +        self.assertEqual(len(errors), 1) + +    def test_unexpectedLoginError(self): +        class UnexpectedException(Exception): +            pass + +        class BrokenChecker(object): +            credentialInterfaces = (IUsernamePassword,) + +            def requestAvatarId(self, credentials): +                raise UnexpectedException() + +        self.portal.registerChecker(BrokenChecker()) +        request = self.makeRequest([self.childName]) +        child = self._authorizedTokenLogin(request) +        request.render(child) +        self.assertEqual(request.responseCode, 500) +        self.assertEqual(len(self.flushLoggedErrors(UnexpectedException)), 1) diff --git a/testing/tests/server/test_url_mapper.py b/testing/tests/server/test_url_mapper.py new file mode 100644 index 00000000..fa99cae7 --- /dev/null +++ b/testing/tests/server/test_url_mapper.py @@ -0,0 +1,131 @@ +# -*- coding: utf-8 -*- +# test_url_mapper.py +# Copyright (C) 2017 LEAP +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see <http://www.gnu.org/licenses/>. +""" +Tests for server-related functionality. +""" + +from twisted.trial import unittest +from uuid import uuid4 + +from leap.soledad.server.url_mapper import URLMapper + + +class URLMapperTestCase(unittest.TestCase): +    """ +    Test if the URLMapper behaves as expected. + +    The following table lists the authorized actions among all possible +    u1db remote actions: + +        URL path                      | Authorized actions +        -------------------------------------------------- +        /                             | GET +        /shared-db                    | GET +        /shared-db/docs               | - +        /shared-db/doc/{id}           | - +        /shared-db/sync-from/{source} | - +        /user-db                      | - +        /user-db/docs                 | - +        /user-db/doc/{id}             | - +        /user-db/sync-from/{source}   | GET, PUT, POST +    """ + +    def setUp(self): +        self._uuid = uuid4().hex +        self._urlmap = URLMapper() +        self._dbname = 'user-%s' % self._uuid + +    def test_root_authorized(self): +        match = self._urlmap.match('/', 'GET') +        self.assertIsNotNone(match) + +    def test_shared_authorized(self): +        self.assertIsNotNone(self._urlmap.match('/shared', 'GET')) + +    def test_shared_unauthorized(self): +        self.assertIsNone(self._urlmap.match('/shared', 'PUT')) +        self.assertIsNone(self._urlmap.match('/shared', 'DELETE')) +        self.assertIsNone(self._urlmap.match('/shared', 'POST')) + +    def test_shared_docs_unauthorized(self): +        self.assertIsNone(self._urlmap.match('/shared/docs', 'GET')) +        self.assertIsNone(self._urlmap.match('/shared/docs', 'PUT')) +        self.assertIsNone(self._urlmap.match('/shared/docs', 'DELETE')) +        self.assertIsNone(self._urlmap.match('/shared/docs', 'POST')) + +    def test_shared_doc_authorized(self): +        match = self._urlmap.match('/shared/doc/x', 'GET') +        self.assertIsNotNone(match) +        self.assertEqual('x', match.get('id')) + +        match = self._urlmap.match('/shared/doc/x', 'PUT') +        self.assertIsNotNone(match) +        self.assertEqual('x', match.get('id')) + +        match = self._urlmap.match('/shared/doc/x', 'DELETE') +        self.assertIsNotNone(match) +        self.assertEqual('x', match.get('id')) + +    def test_shared_doc_unauthorized(self): +        self.assertIsNone(self._urlmap.match('/shared/doc/x', 'POST')) + +    def test_shared_sync_unauthorized(self): +        self.assertIsNone(self._urlmap.match('/shared/sync-from/x', 'GET')) +        self.assertIsNone(self._urlmap.match('/shared/sync-from/x', 'PUT')) +        self.assertIsNone(self._urlmap.match('/shared/sync-from/x', 'DELETE')) +        self.assertIsNone(self._urlmap.match('/shared/sync-from/x', 'POST')) + +    def test_user_db_unauthorized(self): +        dbname = self._dbname +        self.assertIsNone(self._urlmap.match('/%s' % dbname, 'GET')) +        self.assertIsNone(self._urlmap.match('/%s' % dbname, 'PUT')) +        self.assertIsNone(self._urlmap.match('/%s' % dbname, 'DELETE')) +        self.assertIsNone(self._urlmap.match('/%s' % dbname, 'POST')) + +    def test_user_db_docs_unauthorized(self): +        dbname = self._dbname +        self.assertIsNone(self._urlmap.match('/%s/docs' % dbname, 'GET')) +        self.assertIsNone(self._urlmap.match('/%s/docs' % dbname, 'PUT')) +        self.assertIsNone(self._urlmap.match('/%s/docs' % dbname, 'DELETE')) +        self.assertIsNone(self._urlmap.match('/%s/docs' % dbname, 'POST')) + +    def test_user_db_doc_unauthorized(self): +        dbname = self._dbname +        self.assertIsNone(self._urlmap.match('/%s/doc/x' % dbname, 'GET')) +        self.assertIsNone(self._urlmap.match('/%s/doc/x' % dbname, 'PUT')) +        self.assertIsNone(self._urlmap.match('/%s/doc/x' % dbname, 'DELETE')) +        self.assertIsNone(self._urlmap.match('/%s/doc/x' % dbname, 'POST')) + +    def test_user_db_sync_authorized(self): +        uuid = self._uuid +        dbname = self._dbname +        match = self._urlmap.match('/%s/sync-from/x' % dbname, 'GET') +        self.assertEqual(uuid, match.get('uuid')) +        self.assertEqual('x', match.get('source_replica_uid')) + +        match = self._urlmap.match('/%s/sync-from/x' % dbname, 'PUT') +        self.assertEqual(uuid, match.get('uuid')) +        self.assertEqual('x', match.get('source_replica_uid')) + +        match = self._urlmap.match('/%s/sync-from/x' % dbname, 'POST') +        self.assertEqual(uuid, match.get('uuid')) +        self.assertEqual('x', match.get('source_replica_uid')) + +    def test_user_db_sync_unauthorized(self): +        dbname = self._dbname +        self.assertIsNone( +            self._urlmap.match('/%s/sync-from/x' % dbname, 'DELETE')) | 
