summaryrefslogtreecommitdiff
path: root/testing/tests/server
diff options
context:
space:
mode:
Diffstat (limited to 'testing/tests/server')
-rw-r--r--testing/tests/server/test__resource.py66
-rw-r--r--testing/tests/server/test__server_info.py43
-rw-r--r--testing/tests/server/test_auth.py104
-rw-r--r--testing/tests/server/test_config.py69
-rw-r--r--testing/tests/server/test_server.py291
-rw-r--r--testing/tests/server/test_session.py186
-rw-r--r--testing/tests/server/test_url_mapper.py131
7 files changed, 600 insertions, 290 deletions
diff --git a/testing/tests/server/test__resource.py b/testing/tests/server/test__resource.py
new file mode 100644
index 00000000..c066435e
--- /dev/null
+++ b/testing/tests/server/test__resource.py
@@ -0,0 +1,66 @@
+# -*- coding: utf-8 -*-
+# test__resource.py
+# Copyright (C) 2017 LEAP
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see <http://www.gnu.org/licenses/>.
+"""
+Tests for Soledad server main resource.
+"""
+from twisted.trial import unittest
+from twisted.web.test.test_web import DummyRequest
+from twisted.web.wsgi import WSGIResource
+from twisted.web.resource import getChildForRequest
+from twisted.internet import reactor
+
+from leap.soledad.server._resource import SoledadResource
+from leap.soledad.server._server_info import ServerInfo
+from leap.soledad.server._blobs import BlobsResource
+from leap.soledad.server.gzip_middleware import GzipMiddleware
+
+
+_pool = reactor.getThreadPool()
+
+
+class SoledadResourceTestCase(unittest.TestCase):
+
+ def test_get_root(self):
+ enable_blobs = None # doesn't matter
+ resource = SoledadResource(enable_blobs=enable_blobs, sync_pool=_pool)
+ request = DummyRequest([''])
+ child = getChildForRequest(resource, request)
+ self.assertIsInstance(child, ServerInfo)
+
+ def test_get_blobs_enabled(self):
+ enable_blobs = True
+ resource = SoledadResource(enable_blobs=enable_blobs, sync_pool=_pool)
+ request = DummyRequest(['blobs'])
+ child = getChildForRequest(resource, request)
+ self.assertIsInstance(child, BlobsResource)
+
+ def test_get_blobs_disabled(self):
+ enable_blobs = False
+ resource = SoledadResource(enable_blobs=enable_blobs, sync_pool=_pool)
+ request = DummyRequest(['blobs'])
+ child = getChildForRequest(resource, request)
+ # if blobs is disabled, the request should be routed to sync
+ self.assertIsInstance(child, WSGIResource)
+ self.assertIsInstance(child._application, GzipMiddleware)
+
+ def test_get_sync(self):
+ enable_blobs = None # doesn't matter
+ resource = SoledadResource(enable_blobs=enable_blobs, sync_pool=_pool)
+ request = DummyRequest(['user-db', 'sync-from', 'source-id'])
+ child = getChildForRequest(resource, request)
+ self.assertIsInstance(child, WSGIResource)
+ self.assertIsInstance(child._application, GzipMiddleware)
diff --git a/testing/tests/server/test__server_info.py b/testing/tests/server/test__server_info.py
new file mode 100644
index 00000000..40567ef1
--- /dev/null
+++ b/testing/tests/server/test__server_info.py
@@ -0,0 +1,43 @@
+# -*- coding: utf-8 -*-
+# test__server_info.py
+# Copyright (C) 2017 LEAP
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see <http://www.gnu.org/licenses/>.
+
+"""
+Tests for Soledad server information announcement.
+"""
+import json
+
+from twisted.trial import unittest
+from twisted.web.test.test_web import DummyRequest
+
+from leap.soledad.server._server_info import ServerInfo
+
+
+class ServerInfoTestCase(unittest.TestCase):
+
+ def test_blobs_enabled(self):
+ resource = ServerInfo(True)
+ response = resource.render(DummyRequest(['']))
+ _info = json.loads(response)
+ self.assertEquals(_info['blobs'], True)
+ self.assertTrue(isinstance(_info['version'], basestring))
+
+ def test_blobs_disabled(self):
+ resource = ServerInfo(False)
+ response = resource.render(DummyRequest(['']))
+ _info = json.loads(response)
+ self.assertEquals(_info['blobs'], False)
+ self.assertTrue(isinstance(_info['version'], basestring))
diff --git a/testing/tests/server/test_auth.py b/testing/tests/server/test_auth.py
new file mode 100644
index 00000000..6eb647ee
--- /dev/null
+++ b/testing/tests/server/test_auth.py
@@ -0,0 +1,104 @@
+# -*- coding: utf-8 -*-
+# test_auth.py
+# Copyright (C) 2017 LEAP
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see <http://www.gnu.org/licenses/>.
+"""
+Tests for auth pieces.
+"""
+import collections
+
+from contextlib import contextmanager
+
+from twisted.cred.credentials import UsernamePassword
+from twisted.cred.error import UnauthorizedLogin
+from twisted.internet import reactor
+from twisted.internet.defer import inlineCallbacks
+from twisted.trial import unittest
+from twisted.web.resource import IResource
+from twisted.web.test import test_httpauth
+
+import leap.soledad.server.auth as auth_module
+from leap.soledad.server.auth import SoledadRealm
+from leap.soledad.server.auth import TokenChecker
+from leap.soledad.server.auth import TokenCredentialFactory
+from leap.soledad.server._resource import SoledadResource
+
+
+class SoledadRealmTestCase(unittest.TestCase):
+
+ def test_returned_resource(self):
+ # we have to pass a pool to the realm , otherwise tests will hang
+ conf = {'blobs': False}
+ pool = reactor.getThreadPool()
+ realm = SoledadRealm(conf=conf, sync_pool=pool)
+ iface, avatar, logout = realm.requestAvatar('any', None, IResource)
+ self.assertIsInstance(avatar, SoledadResource)
+ self.assertIsNone(logout())
+
+
+class DummyServer(object):
+ """
+ I fake the `couchdb.client.Server` GET api and always return the token
+ given on my creation.
+ """
+
+ def __init__(self, token):
+ self._token = token
+
+ def get(self, _):
+ return self._token
+
+
+@contextmanager
+def dummy_server(token):
+ yield collections.defaultdict(lambda: DummyServer(token))
+
+
+class TokenCheckerTestCase(unittest.TestCase):
+
+ @inlineCallbacks
+ def test_good_creds(self):
+ # set up a dummy server which always return a *valid* token document
+ token = {'user_id': 'user', 'type': 'Token'}
+ server = dummy_server(token)
+ # setup the checker with the custom server
+ checker = TokenChecker()
+ auth_module.couch_server = lambda url: server
+ # assert the checker *can* verify the creds
+ creds = UsernamePassword('user', 'pass')
+ avatarId = yield checker.requestAvatarId(creds)
+ self.assertEqual('user', avatarId)
+
+ @inlineCallbacks
+ def test_bad_creds(self):
+ # set up a dummy server which always return an *invalid* token document
+ token = None
+ server = dummy_server(token)
+ # setup the checker with the custom server
+ checker = TokenChecker()
+ auth_module.couch_server = lambda url: server
+ # assert the checker *cannot* verify the creds
+ creds = UsernamePassword('user', '')
+ with self.assertRaises(UnauthorizedLogin):
+ yield checker.requestAvatarId(creds)
+
+
+class TokenCredentialFactoryTestcase(
+ test_httpauth.RequestMixin, test_httpauth.BasicAuthTestsMixin,
+ unittest.TestCase):
+
+ def setUp(self):
+ test_httpauth.BasicAuthTestsMixin.setUp(self)
+ self.credentialFactory = TokenCredentialFactory()
diff --git a/testing/tests/server/test_config.py b/testing/tests/server/test_config.py
new file mode 100644
index 00000000..d2a8a9de
--- /dev/null
+++ b/testing/tests/server/test_config.py
@@ -0,0 +1,69 @@
+# -*- coding: utf-8 -*-
+# test_config.py
+# Copyright (C) 2017 LEAP
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see <http://www.gnu.org/licenses/>.
+"""
+Tests for server configuration.
+"""
+
+from twisted.trial import unittest
+from pkg_resources import resource_filename
+
+from leap.soledad.server._config import _load_config
+from leap.soledad.server._config import CONFIG_DEFAULTS
+
+
+class ConfigurationParsingTest(unittest.TestCase):
+
+ def setUp(self):
+ self.maxDiff = None
+
+ def test_use_defaults_on_failure(self):
+ config = _load_config('this file will never exist')
+ expected = CONFIG_DEFAULTS
+ self.assertEquals(expected, config)
+
+ def test_security_values_configuration(self):
+ # given
+ config_path = resource_filename('test_soledad',
+ 'fixture_soledad.conf')
+ # when
+ config = _load_config(config_path)
+
+ # then
+ expected = {'members': ['user1', 'user2'],
+ 'members_roles': ['role1', 'role2'],
+ 'admins': ['user3', 'user4'],
+ 'admins_roles': ['role3', 'role3']}
+ self.assertDictEqual(expected, config['database-security'])
+
+ def test_server_values_configuration(self):
+ # given
+ config_path = resource_filename('test_soledad',
+ 'fixture_soledad.conf')
+ # when
+ config = _load_config(config_path)
+
+ # then
+ expected = {'couch_url':
+ 'http://soledad:passwd@localhost:5984',
+ 'create_cmd':
+ 'sudo -u soledad-admin /usr/bin/create-user-db',
+ 'admin_netrc':
+ '/etc/couchdb/couchdb-soledad-admin.netrc',
+ 'batching': False,
+ 'blobs': False,
+ 'blobs_path': '/srv/leap/soledad/blobs'}
+ self.assertDictEqual(expected, config['soledad-server'])
diff --git a/testing/tests/server/test_server.py b/testing/tests/server/test_server.py
index 6710caaf..4a5ec43f 100644
--- a/testing/tests/server/test_server.py
+++ b/testing/tests/server/test_server.py
@@ -18,17 +18,13 @@
Tests for server-related functionality.
"""
import binascii
-import mock
import os
import pytest
-from hashlib import sha512
-from pkg_resources import resource_filename
from urlparse import urljoin
from uuid import uuid4
from twisted.internet import defer
-from twisted.trial import unittest
from leap.soledad.common.couch.state import CouchServerState
from leap.soledad.common.couch import CouchDatabase
@@ -38,253 +34,10 @@ from test_soledad.util import (
make_token_soledad_app,
make_soledad_document_for_test,
soledad_sync_target,
- BaseSoledadTest,
)
from leap.soledad.client import _crypto
from leap.soledad.client import Soledad
-from leap.soledad.server.config import load_configuration
-from leap.soledad.server.config import CONFIG_DEFAULTS
-from leap.soledad.server.auth import URLToAuthorization
-from leap.soledad.server.auth import SoledadTokenAuthMiddleware
-
-
-class ServerAuthenticationMiddlewareTestCase(CouchDBTestCase):
-
- def setUp(self):
- super(ServerAuthenticationMiddlewareTestCase, self).setUp()
- app = mock.Mock()
- self._state = CouchServerState(self.couch_url)
- app.state = self._state
- self.auth_middleware = SoledadTokenAuthMiddleware(app)
- self._authorize('valid-uuid', 'valid-token')
-
- def _authorize(self, uuid, token):
- token_doc = {}
- token_doc['_id'] = sha512(token).hexdigest()
- token_doc[self._state.TOKENS_USER_ID_KEY] = uuid
- token_doc[self._state.TOKENS_TYPE_KEY] = \
- self._state.TOKENS_TYPE_DEF
- dbname = self._state._tokens_dbname()
- db = self.couch_server.create(dbname)
- db.save(token_doc)
- self.addCleanup(self.delete_db, db.name)
-
- def test_authorized_user(self):
- is_authorized = self.auth_middleware._verify_authentication_data
- self.assertTrue(is_authorized('valid-uuid', 'valid-token'))
- self.assertFalse(is_authorized('valid-uuid', 'invalid-token'))
- self.assertFalse(is_authorized('invalid-uuid', 'valid-token'))
- self.assertFalse(is_authorized('eve', 'invalid-token'))
-
-
-class ServerAuthorizationTestCase(BaseSoledadTest):
-
- """
- Tests related to Soledad server authorization.
- """
-
- def setUp(self):
- pass
-
- def tearDown(self):
- pass
-
- def _make_environ(self, path_info, request_method):
- return {
- 'PATH_INFO': path_info,
- 'REQUEST_METHOD': request_method,
- }
-
- def test_verify_action_with_correct_dbnames(self):
- """
- Test encrypting and decrypting documents.
-
- The following table lists the authorized actions among all possible
- u1db remote actions:
-
- URL path | Authorized actions
- --------------------------------------------------
- / | GET
- /shared-db | GET
- /shared-db/docs | -
- /shared-db/doc/{id} | GET, PUT, DELETE
- /shared-db/sync-from/{source} | -
- /user-db | GET, PUT, DELETE
- /user-db/docs | -
- /user-db/doc/{id} | -
- /user-db/sync-from/{source} | GET, PUT, POST
- """
- uuid = uuid4().hex
- authmap = URLToAuthorization(uuid,)
- dbname = authmap._user_db_name
- # test global auth
- self.assertTrue(
- authmap.is_authorized(self._make_environ('/', 'GET')))
- # test shared-db database resource auth
- self.assertTrue(
- authmap.is_authorized(
- self._make_environ('/shared', 'GET')))
- self.assertFalse(
- authmap.is_authorized(
- self._make_environ('/shared', 'PUT')))
- self.assertFalse(
- authmap.is_authorized(
- self._make_environ('/shared', 'DELETE')))
- self.assertFalse(
- authmap.is_authorized(
- self._make_environ('/shared', 'POST')))
- # test shared-db docs resource auth
- self.assertFalse(
- authmap.is_authorized(
- self._make_environ('/shared/docs', 'GET')))
- self.assertFalse(
- authmap.is_authorized(
- self._make_environ('/shared/docs', 'PUT')))
- self.assertFalse(
- authmap.is_authorized(
- self._make_environ('/shared/docs', 'DELETE')))
- self.assertFalse(
- authmap.is_authorized(
- self._make_environ('/shared/docs', 'POST')))
- # test shared-db doc resource auth
- self.assertTrue(
- authmap.is_authorized(
- self._make_environ('/shared/doc/x', 'GET')))
- self.assertTrue(
- authmap.is_authorized(
- self._make_environ('/shared/doc/x', 'PUT')))
- self.assertTrue(
- authmap.is_authorized(
- self._make_environ('/shared/doc/x', 'DELETE')))
- self.assertFalse(
- authmap.is_authorized(
- self._make_environ('/shared/doc/x', 'POST')))
- # test shared-db sync resource auth
- self.assertFalse(
- authmap.is_authorized(
- self._make_environ('/shared/sync-from/x', 'GET')))
- self.assertFalse(
- authmap.is_authorized(
- self._make_environ('/shared/sync-from/x', 'PUT')))
- self.assertFalse(
- authmap.is_authorized(
- self._make_environ('/shared/sync-from/x', 'DELETE')))
- self.assertFalse(
- authmap.is_authorized(
- self._make_environ('/shared/sync-from/x', 'POST')))
- # test user-db database resource auth
- self.assertTrue(
- authmap.is_authorized(
- self._make_environ('/%s' % dbname, 'GET')))
- self.assertTrue(
- authmap.is_authorized(
- self._make_environ('/%s' % dbname, 'PUT')))
- self.assertTrue(
- authmap.is_authorized(
- self._make_environ('/%s' % dbname, 'DELETE')))
- self.assertFalse(
- authmap.is_authorized(
- self._make_environ('/%s' % dbname, 'POST')))
- # test user-db docs resource auth
- self.assertFalse(
- authmap.is_authorized(
- self._make_environ('/%s/docs' % dbname, 'GET')))
- self.assertFalse(
- authmap.is_authorized(
- self._make_environ('/%s/docs' % dbname, 'PUT')))
- self.assertFalse(
- authmap.is_authorized(
- self._make_environ('/%s/docs' % dbname, 'DELETE')))
- self.assertFalse(
- authmap.is_authorized(
- self._make_environ('/%s/docs' % dbname, 'POST')))
- # test user-db doc resource auth
- self.assertFalse(
- authmap.is_authorized(
- self._make_environ('/%s/doc/x' % dbname, 'GET')))
- self.assertFalse(
- authmap.is_authorized(
- self._make_environ('/%s/doc/x' % dbname, 'PUT')))
- self.assertFalse(
- authmap.is_authorized(
- self._make_environ('/%s/doc/x' % dbname, 'DELETE')))
- self.assertFalse(
- authmap.is_authorized(
- self._make_environ('/%s/doc/x' % dbname, 'POST')))
- # test user-db sync resource auth
- self.assertTrue(
- authmap.is_authorized(
- self._make_environ('/%s/sync-from/x' % dbname, 'GET')))
- self.assertTrue(
- authmap.is_authorized(
- self._make_environ('/%s/sync-from/x' % dbname, 'PUT')))
- self.assertFalse(
- authmap.is_authorized(
- self._make_environ('/%s/sync-from/x' % dbname, 'DELETE')))
- self.assertTrue(
- authmap.is_authorized(
- self._make_environ('/%s/sync-from/x' % dbname, 'POST')))
-
- def test_verify_action_with_wrong_dbnames(self):
- """
- Test if authorization fails for a wrong dbname.
- """
- uuid = uuid4().hex
- authmap = URLToAuthorization(uuid)
- dbname = 'somedb'
- # test wrong-db database resource auth
- self.assertFalse(
- authmap.is_authorized(
- self._make_environ('/%s' % dbname, 'GET')))
- self.assertFalse(
- authmap.is_authorized(
- self._make_environ('/%s' % dbname, 'PUT')))
- self.assertFalse(
- authmap.is_authorized(
- self._make_environ('/%s' % dbname, 'DELETE')))
- self.assertFalse(
- authmap.is_authorized(
- self._make_environ('/%s' % dbname, 'POST')))
- # test wrong-db docs resource auth
- self.assertFalse(
- authmap.is_authorized(
- self._make_environ('/%s/docs' % dbname, 'GET')))
- self.assertFalse(
- authmap.is_authorized(
- self._make_environ('/%s/docs' % dbname, 'PUT')))
- self.assertFalse(
- authmap.is_authorized(
- self._make_environ('/%s/docs' % dbname, 'DELETE')))
- self.assertFalse(
- authmap.is_authorized(
- self._make_environ('/%s/docs' % dbname, 'POST')))
- # test wrong-db doc resource auth
- self.assertFalse(
- authmap.is_authorized(
- self._make_environ('/%s/doc/x' % dbname, 'GET')))
- self.assertFalse(
- authmap.is_authorized(
- self._make_environ('/%s/doc/x' % dbname, 'PUT')))
- self.assertFalse(
- authmap.is_authorized(
- self._make_environ('/%s/doc/x' % dbname, 'DELETE')))
- self.assertFalse(
- authmap.is_authorized(
- self._make_environ('/%s/doc/x' % dbname, 'POST')))
- # test wrong-db sync resource auth
- self.assertFalse(
- authmap.is_authorized(
- self._make_environ('/%s/sync-from/x' % dbname, 'GET')))
- self.assertFalse(
- authmap.is_authorized(
- self._make_environ('/%s/sync-from/x' % dbname, 'PUT')))
- self.assertFalse(
- authmap.is_authorized(
- self._make_environ('/%s/sync-from/x' % dbname, 'DELETE')))
- self.assertFalse(
- authmap.is_authorized(
- self._make_environ('/%s/sync-from/x' % dbname, 'POST')))
@pytest.mark.usefixtures("method_tmpdir")
@@ -382,7 +135,7 @@ class EncryptedSyncTestCase(
user=user,
prefix='x',
auth_token='auth-token',
- secrets_path=sol1._secrets_path,
+ secrets_path=sol1.secrets_path,
passphrase=passphrase)
# ensure remote db exists before syncing
@@ -474,45 +227,3 @@ class EncryptedSyncTestCase(
Test if Soledad can sync many smallfiles.
"""
return self._test_encrypted_sym_sync(doc_size=2, number_of_docs=100)
-
-
-class ConfigurationParsingTest(unittest.TestCase):
-
- def setUp(self):
- self.maxDiff = None
-
- def test_use_defaults_on_failure(self):
- config = load_configuration('this file will never exist')
- expected = CONFIG_DEFAULTS
- self.assertEquals(expected, config)
-
- def test_security_values_configuration(self):
- # given
- config_path = resource_filename('test_soledad',
- 'fixture_soledad.conf')
- # when
- config = load_configuration(config_path)
-
- # then
- expected = {'members': ['user1', 'user2'],
- 'members_roles': ['role1', 'role2'],
- 'admins': ['user3', 'user4'],
- 'admins_roles': ['role3', 'role3']}
- self.assertDictEqual(expected, config['database-security'])
-
- def test_server_values_configuration(self):
- # given
- config_path = resource_filename('test_soledad',
- 'fixture_soledad.conf')
- # when
- config = load_configuration(config_path)
-
- # then
- expected = {'couch_url':
- 'http://soledad:passwd@localhost:5984',
- 'create_cmd':
- 'sudo -u soledad-admin /usr/bin/create-user-db',
- 'admin_netrc':
- '/etc/couchdb/couchdb-soledad-admin.netrc',
- 'batching': False}
- self.assertDictEqual(expected, config['soledad-server'])
diff --git a/testing/tests/server/test_session.py b/testing/tests/server/test_session.py
new file mode 100644
index 00000000..ebb94476
--- /dev/null
+++ b/testing/tests/server/test_session.py
@@ -0,0 +1,186 @@
+# -*- coding: utf-8 -*-
+# test_session.py
+# Copyright (C) 2017 LEAP
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see <http://www.gnu.org/licenses/>.
+"""
+Tests for server session entrypoint.
+"""
+from twisted.trial import unittest
+
+from twisted.cred import portal
+from twisted.cred.checkers import InMemoryUsernamePasswordDatabaseDontUse
+from twisted.cred.credentials import IUsernamePassword
+from twisted.web.resource import getChildForRequest
+from twisted.web.static import Data
+from twisted.web.test.requesthelper import DummyRequest
+from twisted.web.test.test_httpauth import b64encode
+from twisted.web.test.test_httpauth import Realm
+from twisted.web._auth.wrapper import UnauthorizedResource
+
+from leap.soledad.server.session import SoledadSession
+
+
+class SoledadSessionTestCase(unittest.TestCase):
+ """
+ Tests adapted from for
+ L{twisted.web.test.test_httpauth.HTTPAuthSessionWrapper}.
+ """
+
+ def makeRequest(self, *args, **kwargs):
+ request = DummyRequest(*args, **kwargs)
+ request.path = '/'
+ return request
+
+ def setUp(self):
+ self.username = b'foo bar'
+ self.password = b'bar baz'
+ self.avatarContent = b"contents of the avatar resource itself"
+ self.childName = b"foo-child"
+ self.childContent = b"contents of the foo child of the avatar"
+ self.checker = InMemoryUsernamePasswordDatabaseDontUse()
+ self.checker.addUser(self.username, self.password)
+ self.avatar = Data(self.avatarContent, 'text/plain')
+ self.avatar.putChild(
+ self.childName, Data(self.childContent, 'text/plain'))
+ self.avatars = {self.username: self.avatar}
+ self.realm = Realm(self.avatars.get)
+ self.portal = portal.Portal(self.realm, [self.checker])
+ self.wrapper = SoledadSession(self.portal)
+
+ def _authorizedTokenLogin(self, request):
+ authorization = b64encode(
+ self.username + b':' + self.password)
+ request.requestHeaders.addRawHeader(b'authorization',
+ b'Token ' + authorization)
+ return getChildForRequest(self.wrapper, request)
+
+ def test_getChildWithDefault(self):
+ request = self.makeRequest([self.childName])
+ child = getChildForRequest(self.wrapper, request)
+ d = request.notifyFinish()
+
+ def cbFinished(result):
+ self.assertEqual(request.responseCode, 401)
+
+ d.addCallback(cbFinished)
+ request.render(child)
+ return d
+
+ def _invalidAuthorizationTest(self, response):
+ request = self.makeRequest([self.childName])
+ request.requestHeaders.addRawHeader(b'authorization', response)
+ child = getChildForRequest(self.wrapper, request)
+ d = request.notifyFinish()
+
+ def cbFinished(result):
+ self.assertEqual(request.responseCode, 401)
+
+ d.addCallback(cbFinished)
+ request.render(child)
+ return d
+
+ def test_getChildWithDefaultUnauthorizedUser(self):
+ return self._invalidAuthorizationTest(
+ b'Basic ' + b64encode(b'foo:bar'))
+
+ def test_getChildWithDefaultUnauthorizedPassword(self):
+ return self._invalidAuthorizationTest(
+ b'Basic ' + b64encode(self.username + b':bar'))
+
+ def test_getChildWithDefaultUnrecognizedScheme(self):
+ return self._invalidAuthorizationTest(b'Quux foo bar baz')
+
+ def test_getChildWithDefaultAuthorized(self):
+ request = self.makeRequest([self.childName])
+ child = self._authorizedTokenLogin(request)
+ d = request.notifyFinish()
+
+ def cbFinished(ignored):
+ self.assertEqual(request.written, [self.childContent])
+
+ d.addCallback(cbFinished)
+ request.render(child)
+ return d
+
+ def test_renderAuthorized(self):
+ # Request it exactly, not any of its children.
+ request = self.makeRequest([])
+ child = self._authorizedTokenLogin(request)
+ d = request.notifyFinish()
+
+ def cbFinished(ignored):
+ self.assertEqual(request.written, [self.avatarContent])
+
+ d.addCallback(cbFinished)
+ request.render(child)
+ return d
+
+ def test_decodeRaises(self):
+ request = self.makeRequest([self.childName])
+ request.requestHeaders.addRawHeader(b'authorization',
+ b'Token decode should fail')
+ child = getChildForRequest(self.wrapper, request)
+ self.assertIsInstance(child, UnauthorizedResource)
+
+ def test_parseResponse(self):
+ basicAuthorization = b'Basic abcdef123456'
+ self.assertEqual(
+ self.wrapper._parseHeader(basicAuthorization),
+ None)
+ tokenAuthorization = b'Token abcdef123456'
+ self.assertEqual(
+ self.wrapper._parseHeader(tokenAuthorization),
+ b'abcdef123456')
+
+ def test_unexpectedDecodeError(self):
+
+ class UnexpectedException(Exception):
+ pass
+
+ class BadFactory(object):
+ scheme = b'bad'
+
+ def getChallenge(self, client):
+ return {}
+
+ def decode(self, response, request):
+ print "decode raised"
+ raise UnexpectedException()
+
+ self.wrapper._credentialFactory = BadFactory()
+ request = self.makeRequest([self.childName])
+ request.requestHeaders.addRawHeader(b'authorization', b'Bad abc')
+ child = getChildForRequest(self.wrapper, request)
+ request.render(child)
+ self.assertEqual(request.responseCode, 500)
+ errors = self.flushLoggedErrors(UnexpectedException)
+ self.assertEqual(len(errors), 1)
+
+ def test_unexpectedLoginError(self):
+ class UnexpectedException(Exception):
+ pass
+
+ class BrokenChecker(object):
+ credentialInterfaces = (IUsernamePassword,)
+
+ def requestAvatarId(self, credentials):
+ raise UnexpectedException()
+
+ self.portal.registerChecker(BrokenChecker())
+ request = self.makeRequest([self.childName])
+ child = self._authorizedTokenLogin(request)
+ request.render(child)
+ self.assertEqual(request.responseCode, 500)
+ self.assertEqual(len(self.flushLoggedErrors(UnexpectedException)), 1)
diff --git a/testing/tests/server/test_url_mapper.py b/testing/tests/server/test_url_mapper.py
new file mode 100644
index 00000000..fa99cae7
--- /dev/null
+++ b/testing/tests/server/test_url_mapper.py
@@ -0,0 +1,131 @@
+# -*- coding: utf-8 -*-
+# test_url_mapper.py
+# Copyright (C) 2017 LEAP
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see <http://www.gnu.org/licenses/>.
+"""
+Tests for server-related functionality.
+"""
+
+from twisted.trial import unittest
+from uuid import uuid4
+
+from leap.soledad.server.url_mapper import URLMapper
+
+
+class URLMapperTestCase(unittest.TestCase):
+ """
+ Test if the URLMapper behaves as expected.
+
+ The following table lists the authorized actions among all possible
+ u1db remote actions:
+
+ URL path | Authorized actions
+ --------------------------------------------------
+ / | GET
+ /shared-db | GET
+ /shared-db/docs | -
+ /shared-db/doc/{id} | -
+ /shared-db/sync-from/{source} | -
+ /user-db | -
+ /user-db/docs | -
+ /user-db/doc/{id} | -
+ /user-db/sync-from/{source} | GET, PUT, POST
+ """
+
+ def setUp(self):
+ self._uuid = uuid4().hex
+ self._urlmap = URLMapper()
+ self._dbname = 'user-%s' % self._uuid
+
+ def test_root_authorized(self):
+ match = self._urlmap.match('/', 'GET')
+ self.assertIsNotNone(match)
+
+ def test_shared_authorized(self):
+ self.assertIsNotNone(self._urlmap.match('/shared', 'GET'))
+
+ def test_shared_unauthorized(self):
+ self.assertIsNone(self._urlmap.match('/shared', 'PUT'))
+ self.assertIsNone(self._urlmap.match('/shared', 'DELETE'))
+ self.assertIsNone(self._urlmap.match('/shared', 'POST'))
+
+ def test_shared_docs_unauthorized(self):
+ self.assertIsNone(self._urlmap.match('/shared/docs', 'GET'))
+ self.assertIsNone(self._urlmap.match('/shared/docs', 'PUT'))
+ self.assertIsNone(self._urlmap.match('/shared/docs', 'DELETE'))
+ self.assertIsNone(self._urlmap.match('/shared/docs', 'POST'))
+
+ def test_shared_doc_authorized(self):
+ match = self._urlmap.match('/shared/doc/x', 'GET')
+ self.assertIsNotNone(match)
+ self.assertEqual('x', match.get('id'))
+
+ match = self._urlmap.match('/shared/doc/x', 'PUT')
+ self.assertIsNotNone(match)
+ self.assertEqual('x', match.get('id'))
+
+ match = self._urlmap.match('/shared/doc/x', 'DELETE')
+ self.assertIsNotNone(match)
+ self.assertEqual('x', match.get('id'))
+
+ def test_shared_doc_unauthorized(self):
+ self.assertIsNone(self._urlmap.match('/shared/doc/x', 'POST'))
+
+ def test_shared_sync_unauthorized(self):
+ self.assertIsNone(self._urlmap.match('/shared/sync-from/x', 'GET'))
+ self.assertIsNone(self._urlmap.match('/shared/sync-from/x', 'PUT'))
+ self.assertIsNone(self._urlmap.match('/shared/sync-from/x', 'DELETE'))
+ self.assertIsNone(self._urlmap.match('/shared/sync-from/x', 'POST'))
+
+ def test_user_db_unauthorized(self):
+ dbname = self._dbname
+ self.assertIsNone(self._urlmap.match('/%s' % dbname, 'GET'))
+ self.assertIsNone(self._urlmap.match('/%s' % dbname, 'PUT'))
+ self.assertIsNone(self._urlmap.match('/%s' % dbname, 'DELETE'))
+ self.assertIsNone(self._urlmap.match('/%s' % dbname, 'POST'))
+
+ def test_user_db_docs_unauthorized(self):
+ dbname = self._dbname
+ self.assertIsNone(self._urlmap.match('/%s/docs' % dbname, 'GET'))
+ self.assertIsNone(self._urlmap.match('/%s/docs' % dbname, 'PUT'))
+ self.assertIsNone(self._urlmap.match('/%s/docs' % dbname, 'DELETE'))
+ self.assertIsNone(self._urlmap.match('/%s/docs' % dbname, 'POST'))
+
+ def test_user_db_doc_unauthorized(self):
+ dbname = self._dbname
+ self.assertIsNone(self._urlmap.match('/%s/doc/x' % dbname, 'GET'))
+ self.assertIsNone(self._urlmap.match('/%s/doc/x' % dbname, 'PUT'))
+ self.assertIsNone(self._urlmap.match('/%s/doc/x' % dbname, 'DELETE'))
+ self.assertIsNone(self._urlmap.match('/%s/doc/x' % dbname, 'POST'))
+
+ def test_user_db_sync_authorized(self):
+ uuid = self._uuid
+ dbname = self._dbname
+ match = self._urlmap.match('/%s/sync-from/x' % dbname, 'GET')
+ self.assertEqual(uuid, match.get('uuid'))
+ self.assertEqual('x', match.get('source_replica_uid'))
+
+ match = self._urlmap.match('/%s/sync-from/x' % dbname, 'PUT')
+ self.assertEqual(uuid, match.get('uuid'))
+ self.assertEqual('x', match.get('source_replica_uid'))
+
+ match = self._urlmap.match('/%s/sync-from/x' % dbname, 'POST')
+ self.assertEqual(uuid, match.get('uuid'))
+ self.assertEqual('x', match.get('source_replica_uid'))
+
+ def test_user_db_sync_unauthorized(self):
+ dbname = self._dbname
+ self.assertIsNone(
+ self._urlmap.match('/%s/sync-from/x' % dbname, 'DELETE'))