From cfff46ff9becdbe5cf48816870e625ed253ecc57 Mon Sep 17 00:00:00 2001 From: drebs Date: Sun, 17 Sep 2017 12:08:25 -0300 Subject: [refactor] move tests to root of repository Tests entrypoint was in a testing/ subfolder in the root of the repository. This was made mainly because we had some common files for tests and we didn't want to ship them (files in testing/test_soledad, which is itself a python package. This sometimes causes errors when loading tests (it seems setuptools is confused with having one python package in a subdirectory of another). This commit moves the tests entrypoint to the root of the repository. Closes: #8952 --- tests/server/__init__.py | 0 tests/server/test__resource.py | 85 ++++++++ tests/server/test__server_info.py | 43 ++++ tests/server/test_auth.py | 138 ++++++++++++ tests/server/test_blobs_resource_validation.py | 63 ++++++ tests/server/test_blobs_server.py | 286 +++++++++++++++++++++++++ tests/server/test_config.py | 70 ++++++ tests/server/test_incoming_flow_integration.py | 97 +++++++++ tests/server/test_incoming_resource.py | 57 +++++ tests/server/test_incoming_server.py | 92 ++++++++ tests/server/test_server.py | 230 ++++++++++++++++++++ tests/server/test_session.py | 195 +++++++++++++++++ tests/server/test_shared_db.py | 69 ++++++ tests/server/test_soledad | 1 + tests/server/test_tac.py | 87 ++++++++ tests/server/test_url_mapper.py | 133 ++++++++++++ 16 files changed, 1646 insertions(+) create mode 100644 tests/server/__init__.py create mode 100644 tests/server/test__resource.py create mode 100644 tests/server/test__server_info.py create mode 100644 tests/server/test_auth.py create mode 100644 tests/server/test_blobs_resource_validation.py create mode 100644 tests/server/test_blobs_server.py create mode 100644 tests/server/test_config.py create mode 100644 tests/server/test_incoming_flow_integration.py create mode 100644 tests/server/test_incoming_resource.py create mode 100644 tests/server/test_incoming_server.py create mode 100644 tests/server/test_server.py create mode 100644 tests/server/test_session.py create mode 100644 tests/server/test_shared_db.py create mode 120000 tests/server/test_soledad create mode 100644 tests/server/test_tac.py create mode 100644 tests/server/test_url_mapper.py (limited to 'tests/server') diff --git a/tests/server/__init__.py b/tests/server/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/tests/server/test__resource.py b/tests/server/test__resource.py new file mode 100644 index 00000000..a43ac19f --- /dev/null +++ b/tests/server/test__resource.py @@ -0,0 +1,85 @@ +# -*- coding: utf-8 -*- +# test__resource.py +# Copyright (C) 2017 LEAP +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see . +""" +Tests for Soledad server main resource. +""" +from twisted.trial import unittest +from twisted.web.test.test_web import DummyRequest +from twisted.web.wsgi import WSGIResource +from twisted.web.resource import getChildForRequest +from twisted.internet import reactor + +from leap.soledad.server._resource import PublicResource +from leap.soledad.server._resource import LocalResource +from leap.soledad.server._server_info import ServerInfo +from leap.soledad.server._blobs import BlobsResource +from leap.soledad.server._incoming import IncomingResource +from leap.soledad.server.gzip_middleware import GzipMiddleware + + +_pool = reactor.getThreadPool() + + +class PublicResourceTestCase(unittest.TestCase): + + def test_get_root(self): + blobs_resource = None # doesn't matter + resource = PublicResource( + blobs_resource=blobs_resource, sync_pool=_pool) + request = DummyRequest(['']) + child = getChildForRequest(resource, request) + self.assertIsInstance(child, ServerInfo) + + def test_get_blobs_enabled(self): + blobs_resource = BlobsResource("filesystem", '/tmp') + resource = PublicResource( + blobs_resource=blobs_resource, sync_pool=_pool) + request = DummyRequest(['blobs']) + child = getChildForRequest(resource, request) + self.assertIsInstance(child, BlobsResource) + + def test_get_blobs_disabled(self): + blobs_resource = None + resource = PublicResource( + blobs_resource=blobs_resource, sync_pool=_pool) + request = DummyRequest(['blobs']) + child = getChildForRequest(resource, request) + # if blobs is disabled, the request should be routed to sync + self.assertIsInstance(child, WSGIResource) + self.assertIsInstance(child._application, GzipMiddleware) + + def test_get_sync(self): + blobs_resource = None # doesn't matter + resource = PublicResource( + blobs_resource=blobs_resource, sync_pool=_pool) + request = DummyRequest(['user-db', 'sync-from', 'source-id']) + child = getChildForRequest(resource, request) + self.assertIsInstance(child, WSGIResource) + self.assertIsInstance(child._application, GzipMiddleware) + + def test_no_incoming_on_public_resource(self): + resource = PublicResource(None, sync_pool=_pool) + request = DummyRequest(['incoming']) + child = getChildForRequest(resource, request) + # WSGIResource is returned if a path is unknown + self.assertIsInstance(child, WSGIResource) + + def test_get_incoming(self): + resource = LocalResource() + request = DummyRequest(['incoming']) + child = getChildForRequest(resource, request) + self.assertIsInstance(child, IncomingResource) diff --git a/tests/server/test__server_info.py b/tests/server/test__server_info.py new file mode 100644 index 00000000..40567ef1 --- /dev/null +++ b/tests/server/test__server_info.py @@ -0,0 +1,43 @@ +# -*- coding: utf-8 -*- +# test__server_info.py +# Copyright (C) 2017 LEAP +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see . + +""" +Tests for Soledad server information announcement. +""" +import json + +from twisted.trial import unittest +from twisted.web.test.test_web import DummyRequest + +from leap.soledad.server._server_info import ServerInfo + + +class ServerInfoTestCase(unittest.TestCase): + + def test_blobs_enabled(self): + resource = ServerInfo(True) + response = resource.render(DummyRequest([''])) + _info = json.loads(response) + self.assertEquals(_info['blobs'], True) + self.assertTrue(isinstance(_info['version'], basestring)) + + def test_blobs_disabled(self): + resource = ServerInfo(False) + response = resource.render(DummyRequest([''])) + _info = json.loads(response) + self.assertEquals(_info['blobs'], False) + self.assertTrue(isinstance(_info['version'], basestring)) diff --git a/tests/server/test_auth.py b/tests/server/test_auth.py new file mode 100644 index 00000000..78cf20ab --- /dev/null +++ b/tests/server/test_auth.py @@ -0,0 +1,138 @@ +# -*- coding: utf-8 -*- +# test_auth.py +# Copyright (C) 2017 LEAP +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see . +""" +Tests for auth pieces. +""" +import os +import collections +import pytest + +from contextlib import contextmanager + +from twisted.cred.credentials import UsernamePassword +from twisted.cred.error import UnauthorizedLogin +from twisted.internet import reactor +from twisted.internet.defer import inlineCallbacks +from twisted.trial import unittest +from twisted.web.resource import IResource +from twisted.web.test import test_httpauth + +import leap.soledad.server.auth as auth_module +from leap.soledad.server.auth import SoledadRealm +from leap.soledad.server.auth import CouchDBTokenChecker +from leap.soledad.server.auth import FileTokenChecker +from leap.soledad.server.auth import TokenCredentialFactory +from leap.soledad.server._resource import PublicResource + + +class SoledadRealmTestCase(unittest.TestCase): + + def test_returned_resource(self): + # we have to pass a pool to the realm , otherwise tests will hang + conf = {'blobs': False} + pool = reactor.getThreadPool() + realm = SoledadRealm(conf=conf, sync_pool=pool) + iface, avatar, logout = realm.requestAvatar('any', None, IResource) + self.assertIsInstance(avatar, PublicResource) + self.assertIsNone(logout()) + + +class DummyServer(object): + """ + I fake the `couchdb.client.Server` GET api and always return the token + given on my creation. + """ + + def __init__(self, token): + self._token = token + + def get(self, _): + return self._token + + +@contextmanager +def dummy_server(token): + yield collections.defaultdict(lambda: DummyServer(token)) + + +class CouchDBTokenCheckerTestCase(unittest.TestCase): + + @inlineCallbacks + def test_good_creds(self): + # set up a dummy server which always return a *valid* token document + token = {'user_id': 'user', 'type': 'Token'} + server = dummy_server(token) + # setup the checker with the custom server + checker = CouchDBTokenChecker() + auth_module.couch_server = lambda url: server + # assert the checker *can* verify the creds + creds = UsernamePassword('user', 'pass') + avatarId = yield checker.requestAvatarId(creds) + self.assertEqual('user', avatarId) + + @inlineCallbacks + def test_bad_creds(self): + # set up a dummy server which always return an *invalid* token document + token = None + server = dummy_server(token) + # setup the checker with the custom server + checker = CouchDBTokenChecker() + auth_module.couch_server = lambda url: server + # assert the checker *cannot* verify the creds + creds = UsernamePassword('user', '') + with self.assertRaises(UnauthorizedLogin): + yield checker.requestAvatarId(creds) + + +class FileTokenCheckerTestCase(unittest.TestCase): + + @inlineCallbacks + @pytest.mark.usefixtures("method_tmpdir") + def test_good_creds(self): + auth_file_path = os.path.join(self.tempdir, 'auth.file') + with open(auth_file_path, 'w') as tempfile: + tempfile.write('goodservice:goodtoken') + # setup the checker with the auth tokens file + conf = {'services_tokens_file': auth_file_path} + checker = FileTokenChecker(conf) + # assert the checker *can* verify the creds + creds = UsernamePassword('goodservice', 'goodtoken') + avatarId = yield checker.requestAvatarId(creds) + self.assertEqual('goodservice', avatarId) + + @inlineCallbacks + @pytest.mark.usefixtures("method_tmpdir") + def test_bad_creds(self): + auth_file_path = os.path.join(self.tempdir, 'auth.file') + with open(auth_file_path, 'w') as tempfile: + tempfile.write('service:token') + # setup the checker with the auth tokens file + conf = {'services_tokens_file': auth_file_path} + checker = FileTokenChecker(conf) + # assert the checker *cannot* verify the creds + creds = UsernamePassword('service', 'wrongtoken') + with self.assertRaises(UnauthorizedLogin): + yield checker.requestAvatarId(creds) + + +class TokenCredentialFactoryTestcase( + test_httpauth.RequestMixin, test_httpauth.BasicAuthTestsMixin, + unittest.TestCase): + + def setUp(self): + test_httpauth.BasicAuthTestsMixin.setUp(self) + self.credentialFactory = TokenCredentialFactory() diff --git a/tests/server/test_blobs_resource_validation.py b/tests/server/test_blobs_resource_validation.py new file mode 100644 index 00000000..9f6dfc2f --- /dev/null +++ b/tests/server/test_blobs_resource_validation.py @@ -0,0 +1,63 @@ +# -*- coding: utf-8 -*- +# test_blobs_resource_validation.py +# Copyright (C) 2017 LEAP +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see . +""" +Tests for invalid user or blob_id on blobs resource +""" +import pytest +from twisted.trial import unittest +from twisted.web.test.test_web import DummyRequest +from leap.soledad.server import _blobs as server_blobs + + +class BlobServerTestCase(unittest.TestCase): + + @pytest.mark.usefixtures("method_tmpdir") + def setUp(self): + self.resource = server_blobs.BlobsResource("filesystem", self.tempdir) + + @pytest.mark.usefixtures("method_tmpdir") + def test_valid_arguments(self): + request = DummyRequest(['v4l1d-us3r', 'v4l1d-bl0b-1d']) + self.assertTrue(self.resource._validate(request)) + + @pytest.mark.usefixtures("method_tmpdir") + def test_invalid_user_get(self): + request = DummyRequest(['invalid user', 'valid-blob-id']) + request.path = '/blobs/' + with pytest.raises(Exception): + self.resource.render_GET(request) + + @pytest.mark.usefixtures("method_tmpdir") + def test_invalid_user_put(self): + request = DummyRequest(['invalid user', 'valid-blob-id']) + request.path = '/blobs/' + with pytest.raises(Exception): + self.resource.render_PUT(request) + + @pytest.mark.usefixtures("method_tmpdir") + def test_invalid_blob_id_get(self): + request = DummyRequest(['valid-user', 'invalid blob id']) + request.path = '/blobs/' + with pytest.raises(Exception): + self.resource.render_GET(request) + + @pytest.mark.usefixtures("method_tmpdir") + def test_invalid_blob_id_put(self): + request = DummyRequest(['valid-user', 'invalid blob id']) + request.path = '/blobs/' + with pytest.raises(Exception): + self.resource.render_PUT(request) diff --git a/tests/server/test_blobs_server.py b/tests/server/test_blobs_server.py new file mode 100644 index 00000000..9eddf108 --- /dev/null +++ b/tests/server/test_blobs_server.py @@ -0,0 +1,286 @@ +# -*- coding: utf-8 -*- +# test_blobs_server.py +# Copyright (C) 2017 LEAP +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see . +""" +Integration tests for blobs server +""" +import os +import pytest +from uuid import uuid4 +from io import BytesIO +from twisted.trial import unittest +from twisted.web.server import Site +from twisted.internet import reactor +from twisted.internet import defer +from treq._utils import set_global_pool + +from leap.soledad.common.blobs import Flags +from leap.soledad.server import _blobs as server_blobs +from leap.soledad.client._db.blobs import BlobManager +from leap.soledad.client._db.blobs import BlobAlreadyExistsError +from leap.soledad.client._db.blobs import InvalidFlagsError +from leap.soledad.client._db.blobs import SoledadError + + +class BlobServerTestCase(unittest.TestCase): + + def setUp(self): + root = server_blobs.BlobsResource("filesystem", self.tempdir) + site = Site(root) + self.port = reactor.listenTCP(0, site, interface='127.0.0.1') + self.host = self.port.getHost() + self.uri = 'http://%s:%s/' % (self.host.host, self.host.port) + self.secret = 'A' * 96 + set_global_pool(None) + + def tearDown(self): + self.port.stopListening() + + @defer.inlineCallbacks + @pytest.mark.usefixtures("method_tmpdir") + def test_upload_download(self): + manager = BlobManager('', self.uri, self.secret, + self.secret, uuid4().hex) + fd = BytesIO("save me") + yield manager._encrypt_and_upload('blob_id', fd) + blob, size = yield manager._download_and_decrypt('blob_id') + self.assertEquals(blob.getvalue(), "save me") + + @defer.inlineCallbacks + @pytest.mark.usefixtures("method_tmpdir") + def test_set_get_flags(self): + manager = BlobManager('', self.uri, self.secret, + self.secret, uuid4().hex) + fd = BytesIO("flag me") + yield manager._encrypt_and_upload('blob_id', fd) + yield manager.set_flags('blob_id', [Flags.PROCESSING]) + flags = yield manager.get_flags('blob_id') + self.assertEquals([Flags.PROCESSING], flags) + + @defer.inlineCallbacks + @pytest.mark.usefixtures("method_tmpdir") + def test_set_flags_raises_if_no_blob_found(self): + manager = BlobManager('', self.uri, self.secret, + self.secret, uuid4().hex) + with pytest.raises(SoledadError): + yield manager.set_flags('missing_id', [Flags.PENDING]) + + @defer.inlineCallbacks + @pytest.mark.usefixtures("method_tmpdir") + def test_list_filter_flag(self): + manager = BlobManager('', self.uri, self.secret, + self.secret, uuid4().hex) + fd = BytesIO("flag me") + yield manager._encrypt_and_upload('blob_id', fd) + yield manager.set_flags('blob_id', [Flags.PROCESSING]) + blobs_list = yield manager.remote_list(filter_flag=Flags.PENDING) + self.assertEquals([], blobs_list) + blobs_list = yield manager.remote_list(filter_flag=Flags.PROCESSING) + self.assertEquals(['blob_id'], blobs_list) + + @defer.inlineCallbacks + @pytest.mark.usefixtures("method_tmpdir") + def test_list_filter_flag_order_by_date(self): + manager = BlobManager('', self.uri, self.secret, + self.secret, uuid4().hex) + yield manager._encrypt_and_upload('blob_id1', BytesIO("x")) + yield manager._encrypt_and_upload('blob_id2', BytesIO("x")) + yield manager._encrypt_and_upload('blob_id3', BytesIO("x")) + yield manager.set_flags('blob_id1', [Flags.PROCESSING]) + yield manager.set_flags('blob_id2', [Flags.PROCESSING]) + yield manager.set_flags('blob_id3', [Flags.PROCESSING]) + blobs_list = yield manager.remote_list(filter_flag=Flags.PROCESSING, + order_by='+date') + expected_list = ['blob_id1', 'blob_id2', 'blob_id3'] + self.assertEquals(expected_list, blobs_list) + blobs_list = yield manager.remote_list(filter_flag=Flags.PROCESSING, + order_by='-date') + self.assertEquals(list(reversed(expected_list)), blobs_list) + + @defer.inlineCallbacks + @pytest.mark.usefixtures("method_tmpdir") + def test_cant_set_invalid_flags(self): + manager = BlobManager('', self.uri, self.secret, + self.secret, uuid4().hex) + fd = BytesIO("flag me") + yield manager._encrypt_and_upload('blob_id', fd) + with pytest.raises(InvalidFlagsError): + yield manager.set_flags('blob_id', ['invalid']) + flags = yield manager.get_flags('blob_id') + self.assertEquals([], flags) + + @defer.inlineCallbacks + @pytest.mark.usefixtures("method_tmpdir") + def test_get_empty_flags(self): + manager = BlobManager('', self.uri, self.secret, + self.secret, uuid4().hex) + fd = BytesIO("flag me") + yield manager._encrypt_and_upload('blob_id', fd) + flags = yield manager.get_flags('blob_id') + self.assertEquals([], flags) + + @defer.inlineCallbacks + @pytest.mark.usefixtures("method_tmpdir") + def test_flags_ignored_by_listing(self): + manager = BlobManager('', self.uri, self.secret, + self.secret, uuid4().hex) + fd = BytesIO("flag me") + yield manager._encrypt_and_upload('blob_id', fd) + yield manager.set_flags('blob_id', [Flags.PROCESSING]) + blobs_list = yield manager.remote_list() + self.assertEquals(['blob_id'], blobs_list) + + @defer.inlineCallbacks + @pytest.mark.usefixtures("method_tmpdir") + def test_upload_changes_remote_list(self): + manager = BlobManager('', self.uri, self.secret, + self.secret, uuid4().hex) + yield manager._encrypt_and_upload('blob_id1', BytesIO("1")) + yield manager._encrypt_and_upload('blob_id2', BytesIO("2")) + blobs_list = yield manager.remote_list() + self.assertEquals(set(['blob_id1', 'blob_id2']), set(blobs_list)) + + @defer.inlineCallbacks + @pytest.mark.usefixtures("method_tmpdir") + def test_list_orders_by_date(self): + user_uid = uuid4().hex + manager = BlobManager('', self.uri, self.secret, + self.secret, user_uid) + yield manager._encrypt_and_upload('blob_id1', BytesIO("1")) + yield manager._encrypt_and_upload('blob_id2', BytesIO("2")) + blobs_list = yield manager.remote_list(order_by='date') + self.assertEquals(['blob_id1', 'blob_id2'], blobs_list) + parts = [user_uid, 'default', 'b', 'blo', 'blob_i', 'blob_id1'] + self.__touch(self.tempdir, *parts) + blobs_list = yield manager.remote_list(order_by='+date') + self.assertEquals(['blob_id2', 'blob_id1'], blobs_list) + blobs_list = yield manager.remote_list(order_by='-date') + self.assertEquals(['blob_id1', 'blob_id2'], blobs_list) + + @defer.inlineCallbacks + @pytest.mark.usefixtures("method_tmpdir") + def test_count(self): + manager = BlobManager('', self.uri, self.secret, + self.secret, uuid4().hex) + deferreds = [] + for i in range(10): + deferreds.append(manager._encrypt_and_upload(str(i), BytesIO("1"))) + yield defer.gatherResults(deferreds) + + result = yield manager.count() + self.assertEquals({"count": len(deferreds)}, result) + + @defer.inlineCallbacks + @pytest.mark.usefixtures("method_tmpdir") + def test_list_restricted_by_namespace(self): + manager = BlobManager('', self.uri, self.secret, + self.secret, uuid4().hex) + namespace = 'incoming' + yield manager._encrypt_and_upload('blob_id1', BytesIO("1"), + namespace=namespace) + yield manager._encrypt_and_upload('blob_id2', BytesIO("2")) + blobs_list = yield manager.remote_list(namespace=namespace) + self.assertEquals(['blob_id1'], blobs_list) + + @defer.inlineCallbacks + @pytest.mark.usefixtures("method_tmpdir") + def test_list_default_doesnt_list_other_namespaces(self): + manager = BlobManager('', self.uri, self.secret, + self.secret, uuid4().hex) + namespace = 'incoming' + yield manager._encrypt_and_upload('blob_id1', BytesIO("1"), + namespace=namespace) + yield manager._encrypt_and_upload('blob_id2', BytesIO("2")) + blobs_list = yield manager.remote_list() + self.assertEquals(['blob_id2'], blobs_list) + + @defer.inlineCallbacks + @pytest.mark.usefixtures("method_tmpdir") + def test_download_from_namespace(self): + manager = BlobManager('', self.uri, self.secret, + self.secret, uuid4().hex) + namespace, blob_id, content = 'incoming', 'blob_id1', 'test' + yield manager._encrypt_and_upload(blob_id, BytesIO(content), + namespace=namespace) + got_blob = yield manager._download_and_decrypt(blob_id, namespace) + self.assertEquals(content, got_blob[0].getvalue()) + + def __touch(self, *args): + path = os.path.join(*args) + with open(path, 'a'): + os.utime(path, None) + + @defer.inlineCallbacks + @pytest.mark.usefixtures("method_tmpdir") + def test_upload_deny_duplicates(self): + manager = BlobManager('', self.uri, self.secret, + self.secret, uuid4().hex) + fd = BytesIO("save me") + yield manager._encrypt_and_upload('blob_id', fd) + fd = BytesIO("save me") + with pytest.raises(BlobAlreadyExistsError): + yield manager._encrypt_and_upload('blob_id', fd) + + @defer.inlineCallbacks + @pytest.mark.usefixtures("method_tmpdir") + def test_send_missing(self): + manager = BlobManager(self.tempdir, self.uri, self.secret, + self.secret, uuid4().hex) + self.addCleanup(manager.close) + blob_id = 'local_only_blob_id' + yield manager.local.put(blob_id, BytesIO("X"), size=1) + yield manager.send_missing() + result = yield manager._download_and_decrypt(blob_id) + self.assertIsNotNone(result) + self.assertEquals(result[0].getvalue(), "X") + + @defer.inlineCallbacks + @pytest.mark.usefixtures("method_tmpdir") + def test_fetch_missing(self): + manager = BlobManager(self.tempdir, self.uri, self.secret, + self.secret, uuid4().hex) + self.addCleanup(manager.close) + blob_id = 'remote_only_blob_id' + yield manager._encrypt_and_upload(blob_id, BytesIO("X")) + yield manager.fetch_missing() + result = yield manager.local.get(blob_id) + self.assertIsNotNone(result) + self.assertEquals(result.getvalue(), "X") + + @defer.inlineCallbacks + @pytest.mark.usefixtures("method_tmpdir") + def test_upload_then_delete_updates_list(self): + manager = BlobManager('', self.uri, self.secret, + self.secret, uuid4().hex) + yield manager._encrypt_and_upload('blob_id1', BytesIO("1")) + yield manager._encrypt_and_upload('blob_id2', BytesIO("2")) + yield manager._delete_from_remote('blob_id1') + blobs_list = yield manager.remote_list() + self.assertEquals(set(['blob_id2']), set(blobs_list)) + + @defer.inlineCallbacks + @pytest.mark.usefixtures("method_tmpdir") + def test_upload_then_delete_updates_list_using_namespace(self): + manager = BlobManager('', self.uri, self.secret, + self.secret, uuid4().hex) + namespace = 'special_archives' + yield manager._encrypt_and_upload('blob_id1', BytesIO("1"), + namespace=namespace) + yield manager._encrypt_and_upload('blob_id2', BytesIO("2"), + namespace=namespace) + yield manager._delete_from_remote('blob_id1', namespace=namespace) + blobs_list = yield manager.remote_list(namespace=namespace) + self.assertEquals(set(['blob_id2']), set(blobs_list)) diff --git a/tests/server/test_config.py b/tests/server/test_config.py new file mode 100644 index 00000000..dfb09f4c --- /dev/null +++ b/tests/server/test_config.py @@ -0,0 +1,70 @@ +# -*- coding: utf-8 -*- +# test_config.py +# Copyright (C) 2017 LEAP +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see . +""" +Tests for server configuration. +""" + +from twisted.trial import unittest +from pkg_resources import resource_filename + +from leap.soledad.server._config import _load_config +from leap.soledad.server._config import CONFIG_DEFAULTS + + +class ConfigurationParsingTest(unittest.TestCase): + + def setUp(self): + self.maxDiff = None + + def test_use_defaults_on_failure(self): + config = _load_config('this file will never exist') + expected = CONFIG_DEFAULTS + self.assertEquals(expected, config) + + def test_security_values_configuration(self): + # given + config_path = resource_filename('test_soledad', + 'fixture_soledad.conf') + # when + config = _load_config(config_path) + + # then + expected = {'members': ['user1', 'user2'], + 'members_roles': ['role1', 'role2'], + 'admins': ['user3', 'user4'], + 'admins_roles': ['role3', 'role3']} + self.assertDictEqual(expected, config['database-security']) + + def test_server_values_configuration(self): + # given + config_path = resource_filename('test_soledad', + 'fixture_soledad.conf') + # when + config = _load_config(config_path) + + # then + expected = {'couch_url': + 'http://soledad:passwd@localhost:5984', + 'create_cmd': + 'sudo -u soledad-admin /usr/bin/soledad-create-userdb', + 'admin_netrc': + '/etc/couchdb/couchdb-soledad-admin.netrc', + 'batching': False, + 'blobs': False, + 'services_tokens_file': '/etc/soledad/services.tokens', + 'blobs_path': '/var/lib/soledad/blobs'} + self.assertDictEqual(expected, config['soledad-server']) diff --git a/tests/server/test_incoming_flow_integration.py b/tests/server/test_incoming_flow_integration.py new file mode 100644 index 00000000..b492534f --- /dev/null +++ b/tests/server/test_incoming_flow_integration.py @@ -0,0 +1,97 @@ +# -*- coding: utf-8 -*- +# test_incoming_flow_integration.py +# Copyright (C) 2017 LEAP +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see . +""" +Integration tests for the complete flow of IncomingBox feature +""" +import pytest +from uuid import uuid4 +from twisted.trial import unittest +from twisted.web.server import Site +from twisted.internet import reactor +from twisted.internet import defer +from twisted.web.resource import Resource +from zope.interface import implementer + +from leap.soledad.client.incoming import IncomingBoxProcessingLoop +from leap.soledad.client.incoming import IncomingBox +from leap.soledad.server import _blobs as server_blobs +from leap.soledad.client._db.blobs import BlobManager +from leap.soledad.server._incoming import IncomingResource +from leap.soledad.server._blobs import BlobsServerState +from leap.soledad.client import interfaces + + +@implementer(interfaces.IIncomingBoxConsumer) +class GoodConsumer(object): + def __init__(self): + self.name = 'GoodConsumer' + self.processed, self.saved = [], [] + + def process(self, item, item_id, encrypted=True): + self.processed.append(item_id) + return defer.succeed([item_id]) + + def save(self, parts, item_id): + self.saved.append(item_id) + return defer.succeed(None) + + +class IncomingFlowIntegrationTestCase(unittest.TestCase): + + def setUp(self): + root = Resource() + state = BlobsServerState('filesystem', blobs_path=self.tempdir) + incoming_resource = IncomingResource(state) + blobs_resource = server_blobs.BlobsResource("filesystem", self.tempdir) + root.putChild('blobs', blobs_resource) + root.putChild('incoming', incoming_resource) + site = Site(root) + self.port = reactor.listenTCP(0, site, interface='127.0.0.1') + self.host = self.port.getHost() + self.uri = 'http://%s:%s/' % (self.host.host, self.host.port) + self.blobs_uri = self.uri + 'blobs/' + self.incoming_uri = self.uri + 'incoming' + self.user_id = 'user-' + uuid4().hex + self.secret = 'A' * 96 + self.blob_manager = BlobManager(self.tempdir, self.blobs_uri, + self.secret, self.secret, + self.user_id) + self.box = IncomingBox(self.blob_manager, 'MX') + self.loop = IncomingBoxProcessingLoop(self.box) + # FIXME: We use blob_manager client only to avoid DelayedCalls + # Somehow treq being used here keeps a connection pool open + self.client = self.blob_manager._client + + def fill(self, messages): + deferreds = [] + for message_id, message in messages: + uri = '%s/%s/%s' % (self.incoming_uri, self.user_id, message_id) + deferreds.append(self.blob_manager._client.put(uri, data=message)) + return defer.gatherResults(deferreds) + + def tearDown(self): + self.port.stopListening() + self.blob_manager.close() + + @defer.inlineCallbacks + @pytest.mark.usefixtures("method_tmpdir") + def test_consume_a_incoming_message(self): + yield self.fill([('msg1', 'blob')]) + consumer = GoodConsumer() + self.loop.add_consumer(consumer) + yield self.loop() + self.assertIn('msg1', consumer.processed) diff --git a/tests/server/test_incoming_resource.py b/tests/server/test_incoming_resource.py new file mode 100644 index 00000000..0d4918b9 --- /dev/null +++ b/tests/server/test_incoming_resource.py @@ -0,0 +1,57 @@ +# -*- coding: utf-8 -*- +# test_incoming_resource.py +# Copyright (C) 2017 LEAP +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see . +""" +Unit tests for incoming API resource +""" +from twisted.trial import unittest +from twisted.web.test.test_web import DummyRequest +from leap.soledad.server._incoming import IncomingResource +from leap.soledad.server._incoming import IncomingFormatter +from leap.soledad.common.crypto import EncryptionSchemes +from io import BytesIO +from uuid import uuid4 +from mock import Mock + + +class IncomingResourceTestCase(unittest.TestCase): + + def setUp(self): + self.couchdb = Mock() + self.backend_factory = Mock() + self.backend_factory.open_database.return_value = self.couchdb + self.resource = IncomingResource(self.backend_factory) + self.user_uuid = uuid4().hex + + def test_save_document(self): + formatter = IncomingFormatter() + doc_id, scheme = uuid4().hex, EncryptionSchemes.PUBKEY + content = 'Incoming content' + request = DummyRequest([self.user_uuid, doc_id]) + request.content = BytesIO(content) + self.resource.render_PUT(request) + + open_database = self.backend_factory.open_database + open_database.assert_called_once_with(self.user_uuid) + self.couchdb.put_doc.assert_called_once() + doc = self.couchdb.put_doc.call_args[0][0] + self.assertEquals(doc_id, doc.doc_id) + self.assertEquals(formatter.format(content, scheme), doc.content) + + def test_formatter(self): + formatter = IncomingFormatter() + formatted = formatter.format('content', EncryptionSchemes.PUBKEY) + self.assertEquals(formatted['_enc_scheme'], EncryptionSchemes.PUBKEY) diff --git a/tests/server/test_incoming_server.py b/tests/server/test_incoming_server.py new file mode 100644 index 00000000..241bc581 --- /dev/null +++ b/tests/server/test_incoming_server.py @@ -0,0 +1,92 @@ +# -*- coding: utf-8 -*- +# test_incoming_server.py +# Copyright (C) 2017 LEAP +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see . +""" +Integration tests for incoming API +""" +import pytest +import json +from io import BytesIO +from uuid import uuid4 +from twisted.web.test.test_web import DummyRequest +from twisted.web.server import Site +from twisted.internet import reactor +from twisted.internet import defer +import treq + +from leap.soledad.server._incoming import IncomingResource +from leap.soledad.server._blobs import BlobsServerState +from leap.soledad.server._incoming import IncomingFormatter +from leap.soledad.common.crypto import EncryptionSchemes +from leap.soledad.common.blobs import Flags +from test_soledad.util import CouchServerStateForTests +from test_soledad.util import CouchDBTestCase + + +class IncomingOnCouchServerTestCase(CouchDBTestCase): + + def setUp(self): + self.port = None + + def tearDown(self): + if self.port: + self.port.stopListening() + + def prepare(self, backend): + self.user_id = 'user-' + uuid4().hex + if backend == 'couch': + self.state = CouchServerStateForTests(self.couch_url) + self.state.ensure_database(self.user_id) + else: + self.state = BlobsServerState(backend) + root = IncomingResource(self.state) + site = Site(root) + self.port = reactor.listenTCP(0, site, interface='127.0.0.1') + self.host = self.port.getHost() + self.uri = 'http://%s:%s/' % (self.host.host, self.host.port) + + @defer.inlineCallbacks + @pytest.mark.usefixtures("method_tmpdir") + def test_put_incoming_creates_a_document_using_couch(self): + self.prepare('couch') + user_id, doc_id = self.user_id, uuid4().hex + content, scheme = 'Hi', EncryptionSchemes.PUBKEY + formatter = IncomingFormatter() + incoming_endpoint = self.uri + '%s/%s' % (user_id, doc_id) + yield treq.put(incoming_endpoint, BytesIO(content), persistent=False) + db = self.state.open_database(user_id) + + doc = db.get_doc(doc_id) + self.assertEquals(doc.content, formatter.format(content, scheme)) + + @defer.inlineCallbacks + @pytest.mark.usefixtures("method_tmpdir") + def test_put_incoming_creates_a_blob_using_filesystem(self): + self.prepare('filesystem') + user_id, doc_id = self.user_id, uuid4().hex + content = 'Hi' + formatter = IncomingFormatter() + incoming_endpoint = self.uri + '%s/%s' % (user_id, doc_id) + yield treq.put(incoming_endpoint, BytesIO(content), persistent=False) + + db = self.state.open_database(user_id) + request = DummyRequest([user_id, doc_id]) + yield db.read_blob(user_id, doc_id, request, 'MX') + flags = db.get_flags(user_id, doc_id, request, 'MX') + flags = json.loads(flags) + expected = formatter.preamble(content, doc_id) + ' ' + content + self.assertEquals(expected, request.written[0]) + self.assertIn(Flags.PENDING, flags) diff --git a/tests/server/test_server.py b/tests/server/test_server.py new file mode 100644 index 00000000..25f0cc2d --- /dev/null +++ b/tests/server/test_server.py @@ -0,0 +1,230 @@ +# -*- coding: utf-8 -*- +# test_server.py +# Copyright (C) 2013 LEAP +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see . +""" +Tests for server-related functionality. +""" +import binascii +import os +import pytest + +from six.moves.urllib.parse import urljoin +from uuid import uuid4 + +from twisted.internet import defer + +from leap.soledad.common.couch.state import CouchServerState +from leap.soledad.common.couch import CouchDatabase +from test_soledad.u1db_tests import TestCaseWithServer +from test_soledad.util import CouchDBTestCase +from test_soledad.util import ( + make_token_soledad_app, + make_soledad_document_for_test, + soledad_sync_target, +) + +from leap.soledad.client import _crypto +from leap.soledad.client import Soledad + + +@pytest.mark.needs_couch +@pytest.mark.usefixtures("method_tmpdir") +class EncryptedSyncTestCase( + CouchDBTestCase, TestCaseWithServer): + + """ + Tests for encrypted sync using Soledad server backed by a couch database. + """ + + # increase twisted.trial's timeout because large files syncing might take + # some time to finish. + timeout = 500 + + @staticmethod + def make_app_with_state(state): + return make_token_soledad_app(state) + + make_document_for_test = make_soledad_document_for_test + + sync_target = soledad_sync_target + + def _soledad_instance(self, user=None, passphrase=u'123', + prefix='', + secrets_path='secrets.json', + local_db_path='soledad.u1db', + server_url='', + cert_file=None, auth_token=None): + """ + Instantiate Soledad. + """ + + # this callback ensures we save a document which is sent to the shared + # db. + def _put_doc_side_effect(doc): + self._doc_put = doc + + if not server_url: + # attempt to find the soledad server url + server_address = None + server = getattr(self, 'server', None) + if server: + server_address = getattr(self.server, 'server_address', None) + else: + host = self.port.getHost() + server_address = (host.host, host.port) + if server_address: + server_url = 'http://%s:%d' % (server_address) + + return Soledad( + user, + passphrase, + secrets_path=os.path.join(self.tempdir, prefix, secrets_path), + local_db_path=os.path.join( + self.tempdir, prefix, local_db_path), + server_url=server_url, + cert_file=cert_file, + auth_token=auth_token, + shared_db=self.get_default_shared_mock(_put_doc_side_effect)) + + def make_app(self): + self.request_state = CouchServerState(self.couch_url) + return self.make_app_with_state(self.request_state) + + def setUp(self): + CouchDBTestCase.setUp(self) + TestCaseWithServer.setUp(self) + + def tearDown(self): + CouchDBTestCase.tearDown(self) + TestCaseWithServer.tearDown(self) + + def _test_encrypted_sym_sync(self, passphrase=u'123', doc_size=2, + number_of_docs=1): + """ + Test the complete syncing chain between two soledad dbs using a + Soledad server backed by a couch database. + """ + self.startTwistedServer() + user = 'user-' + uuid4().hex + + # this will store all docs ids to avoid get_all_docs + created_ids = [] + + # instantiate soledad and create a document + sol1 = self._soledad_instance( + user=user, + # token is verified in test_target.make_token_soledad_app + auth_token='auth-token', + passphrase=passphrase) + + # instantiate another soledad using the same secret as the previous + # one (so we can correctly verify the mac of the synced document) + sol2 = self._soledad_instance( + user=user, + prefix='x', + auth_token='auth-token', + secrets_path=sol1.secrets_path, + passphrase=passphrase) + + # ensure remote db exists before syncing + db = CouchDatabase.open_database( + urljoin(self.couch_url, 'user-' + user), + create=True) + + def _db1AssertEmptyDocList(results): + _, doclist = results + self.assertEqual([], doclist) + + def _db1CreateDocs(results): + deferreds = [] + for i in xrange(number_of_docs): + content = binascii.hexlify(os.urandom(doc_size / 2)) + d = sol1.create_doc({'data': content}) + d.addCallback(created_ids.append) + deferreds.append(d) + return defer.DeferredList(deferreds) + + def _db1AssertDocsSyncedToServer(results): + self.assertEqual(number_of_docs, len(created_ids)) + for soldoc in created_ids: + couchdoc = db.get_doc(soldoc.doc_id) + self.assertTrue(couchdoc) + # assert document structure in couch server + self.assertEqual(soldoc.doc_id, couchdoc.doc_id) + self.assertEqual(soldoc.rev, couchdoc.rev) + couch_content = couchdoc.content.keys() + self.assertEqual(['raw'], couch_content) + content = couchdoc.get_json() + self.assertTrue(_crypto.is_symmetrically_encrypted(content)) + + d = sol1.get_all_docs() + d.addCallback(_db1AssertEmptyDocList) + d.addCallback(_db1CreateDocs) + d.addCallback(lambda _: sol1.sync()) + d.addCallback(_db1AssertDocsSyncedToServer) + + def _db2AssertEmptyDocList(results): + _, doclist = results + self.assertEqual([], doclist) + + def _getAllDocsFromBothDbs(results): + d1 = sol1.get_all_docs() + d2 = sol2.get_all_docs() + return defer.DeferredList([d1, d2]) + + d.addCallback(lambda _: sol2.get_all_docs()) + d.addCallback(_db2AssertEmptyDocList) + d.addCallback(lambda _: sol2.sync()) + d.addCallback(_getAllDocsFromBothDbs) + + def _assertDocSyncedFromDb1ToDb2(results): + r1, r2 = results + _, (gen1, doclist1) = r1 + _, (gen2, doclist2) = r2 + self.assertEqual(number_of_docs, gen1) + self.assertEqual(number_of_docs, gen2) + self.assertEqual(number_of_docs, len(doclist1)) + self.assertEqual(number_of_docs, len(doclist2)) + self.assertEqual(doclist1[0], doclist2[0]) + + d.addCallback(_assertDocSyncedFromDb1ToDb2) + + def _cleanUp(results): + db.delete_database() + db.close() + sol1.close() + sol2.close() + + d.addCallback(_cleanUp) + + return d + + def test_encrypted_sym_sync(self): + return self._test_encrypted_sym_sync() + + def test_encrypted_sym_sync_with_unicode_passphrase(self): + """ + Test the complete syncing chain between two soledad dbs using a + Soledad server backed by a couch database, using an unicode + passphrase. + """ + return self._test_encrypted_sym_sync(passphrase=u'ãáàäéàëíìïóòöõúùüñç') + + def test_sync_many_small_files(self): + """ + Test if Soledad can sync many smallfiles. + """ + return self._test_encrypted_sym_sync(doc_size=2, number_of_docs=100) diff --git a/tests/server/test_session.py b/tests/server/test_session.py new file mode 100644 index 00000000..3dbd2740 --- /dev/null +++ b/tests/server/test_session.py @@ -0,0 +1,195 @@ +# -*- coding: utf-8 -*- +# test_session.py +# Copyright (C) 2017 LEAP +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see . +""" +Tests for server session entrypoint. +""" +from twisted.trial import unittest + +from twisted.cred import portal +from twisted.cred.checkers import InMemoryUsernamePasswordDatabaseDontUse +from twisted.cred.credentials import IUsernamePassword +from twisted.web.resource import getChildForRequest +from twisted.web.static import Data +from twisted.web.test.requesthelper import DummyRequest +from twisted.web.test.test_httpauth import b64encode +from twisted.web.test.test_httpauth import Realm +from twisted.web._auth.wrapper import UnauthorizedResource + +from leap.soledad.server.session import SoledadSession + + +class SoledadSessionTestCase(unittest.TestCase): + """ + Tests adapted from for + L{twisted.web.test.test_httpauth.HTTPAuthSessionWrapper}. + """ + + def makeRequest(self, *args, **kwargs): + request = DummyRequest(*args, **kwargs) + request.path = '/' + return request + + def setUp(self): + self.username = b'foo bar' + self.password = b'bar baz' + self.avatarContent = b"contents of the avatar resource itself" + self.childName = b"foo-child" + self.childContent = b"contents of the foo child of the avatar" + self.checker = InMemoryUsernamePasswordDatabaseDontUse() + self.checker.addUser(self.username, self.password) + self.avatar = Data(self.avatarContent, 'text/plain') + self.avatar.putChild( + self.childName, Data(self.childContent, 'text/plain')) + self.avatars = {self.username: self.avatar} + self.realm = Realm(self.avatars.get) + self.portal = portal.Portal(self.realm, [self.checker]) + self.wrapper = SoledadSession(self.portal) + + def _authorizedTokenLogin(self, request): + authorization = b64encode( + self.username + b':' + self.password) + request.requestHeaders.addRawHeader(b'authorization', + b'Token ' + authorization) + return getChildForRequest(self.wrapper, request) + + def test_getChildWithDefault(self): + request = self.makeRequest([self.childName]) + child = getChildForRequest(self.wrapper, request) + d = request.notifyFinish() + + def cbFinished(result): + self.assertEqual(request.responseCode, 401) + + d.addCallback(cbFinished) + request.render(child) + return d + + def _invalidAuthorizationTest(self, response): + request = self.makeRequest([self.childName]) + request.requestHeaders.addRawHeader(b'authorization', response) + child = getChildForRequest(self.wrapper, request) + d = request.notifyFinish() + + def cbFinished(result): + self.assertEqual(request.responseCode, 401) + + d.addCallback(cbFinished) + request.render(child) + return d + + def test_getChildWithDefaultUnauthorizedUser(self): + return self._invalidAuthorizationTest( + b'Basic ' + b64encode(b'foo:bar')) + + def test_getChildWithDefaultUnauthorizedPassword(self): + return self._invalidAuthorizationTest( + b'Basic ' + b64encode(self.username + b':bar')) + + def test_getChildWithDefaultUnrecognizedScheme(self): + return self._invalidAuthorizationTest(b'Quux foo bar baz') + + def test_getChildWithDefaultAuthorized(self): + request = self.makeRequest([self.childName]) + child = self._authorizedTokenLogin(request) + d = request.notifyFinish() + + def cbFinished(ignored): + self.assertEqual(request.written, [self.childContent]) + + d.addCallback(cbFinished) + request.render(child) + return d + + def test_renderAuthorized(self): + # Request it exactly, not any of its children. + request = self.makeRequest([]) + child = self._authorizedTokenLogin(request) + d = request.notifyFinish() + + def cbFinished(ignored): + self.assertEqual(request.written, [self.avatarContent]) + + d.addCallback(cbFinished) + request.render(child) + return d + + def test_decodeRaises(self): + request = self.makeRequest([self.childName]) + request.requestHeaders.addRawHeader(b'authorization', + b'Token decode should fail') + child = getChildForRequest(self.wrapper, request) + self.assertIsInstance(child, UnauthorizedResource) + + def test_parseResponse(self): + basicAuthorization = b'Basic abcdef123456' + self.assertEqual( + self.wrapper._parseHeader(basicAuthorization), + None) + tokenAuthorization = b'Token abcdef123456' + self.assertEqual( + self.wrapper._parseHeader(tokenAuthorization), + b'abcdef123456') + + def test_unexpectedDecodeError(self): + + class UnexpectedException(Exception): + pass + + class BadFactory(object): + scheme = b'bad' + + def getChallenge(self, client): + return {} + + def decode(self, response, request): + print("decode raised") + raise UnexpectedException() + + self.wrapper._credentialFactory = BadFactory() + request = self.makeRequest([self.childName]) + request.requestHeaders.addRawHeader(b'authorization', b'Bad abc') + child = getChildForRequest(self.wrapper, request) + request.render(child) + self.assertEqual(request.responseCode, 500) + errors = self.flushLoggedErrors(UnexpectedException) + self.assertEqual(len(errors), 1) + + def test_unexpectedLoginError(self): + class UnexpectedException(Exception): + pass + + class BrokenChecker(object): + credentialInterfaces = (IUsernamePassword,) + + def requestAvatarId(self, credentials): + raise UnexpectedException() + + self.portal.registerChecker(BrokenChecker()) + request = self.makeRequest([self.childName]) + child = self._authorizedTokenLogin(request) + request.render(child) + self.assertEqual(request.responseCode, 500) + self.assertEqual(len(self.flushLoggedErrors(UnexpectedException)), 1) + + def test_cantAccessOtherUserPathByDefault(self): + request = self.makeRequest([]) + # valid url_mapper path, but for another user + request.path = '/blobs/another-user/' + child = self._authorizedTokenLogin(request) + + request.render(child) + self.assertEqual(request.responseCode, 500) diff --git a/tests/server/test_shared_db.py b/tests/server/test_shared_db.py new file mode 100644 index 00000000..96af6dff --- /dev/null +++ b/tests/server/test_shared_db.py @@ -0,0 +1,69 @@ +# -*- coding: utf-8 -*- +# test_shared_db.py +# Copyright (C) 2017 LEAP +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see . +""" +Tests for the shared db on server side. +""" + + +import pytest + +from twisted.trial import unittest + +from leap.soledad.client.shared_db import SoledadSharedDatabase +from leap.soledad.common.document import SoledadDocument +from leap.soledad.common.l2db.errors import RevisionConflict + + +@pytest.mark.needs_couch +class SharedDbTests(unittest.TestCase): + """ + """ + + URL = 'http://127.0.0.1:2424/shared' + CREDS = {'token': {'uuid': 'an-uuid', 'token': 'an-auth-token'}} + + @pytest.fixture(autouse=True) + def soledad_client(self, soledad_server, soledad_dbs): + soledad_dbs('an-uuid') + self._db = SoledadSharedDatabase.open_database(self.URL, self.CREDS) + + @pytest.mark.thisone + def test_doc_update_succeeds(self): + doc_id = 'some-random-doc' + self.assertIsNone(self._db.get_doc(doc_id)) + # create a document in shared db + doc = SoledadDocument(doc_id=doc_id) + self._db.put_doc(doc) + # update that document + expected = {'new': 'content'} + doc.content = expected + self._db.put_doc(doc) + # ensure expected content was saved + doc = self._db.get_doc(doc_id) + self.assertEqual(expected, doc.content) + + @pytest.mark.thisone + def test_doc_update_fails_with_wrong_rev(self): + # create a document in shared db + doc_id = 'some-random-doc' + self.assertIsNone(self._db.get_doc(doc_id)) + # create a document in shared db + doc = SoledadDocument(doc_id=doc_id) + self._db.put_doc(doc) + # try to update document without including revision of old version + doc.rev = 'wrong-rev' + self.assertRaises(RevisionConflict, self._db.put_doc, doc) diff --git a/tests/server/test_soledad b/tests/server/test_soledad new file mode 120000 index 00000000..c1a35d32 --- /dev/null +++ b/tests/server/test_soledad @@ -0,0 +1 @@ +../test_soledad \ No newline at end of file diff --git a/tests/server/test_tac.py b/tests/server/test_tac.py new file mode 100644 index 00000000..7bb50e35 --- /dev/null +++ b/tests/server/test_tac.py @@ -0,0 +1,87 @@ +# -*- coding: utf-8 -*- +# test_tac.py +# Copyright (C) 2017 LEAP +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see . +""" +Tests for the localhost/public APIs using .tac file. +See docs/auth.rst +""" + + +import os +import signal +import socket +import pytest +import treq + +from pkg_resources import resource_filename +from twisted.trial import unittest +from twisted.internet import defer, reactor +from twisted.internet.protocol import ProcessProtocol +from twisted.web.client import Agent + + +TAC_FILE_PATH = resource_filename('leap.soledad.server', 'server.tac') + + +class TacServerTestCase(unittest.TestCase): + + def test_tac_file_exists(self): + msg = "server.tac used on this test case was expected to be at %s" + self.assertTrue(os.path.isfile(TAC_FILE_PATH), msg % TAC_FILE_PATH) + + @defer.inlineCallbacks + def test_local_public_default_ports_on_server_tac(self): + yield self._spawnServer() + result = yield self._get('http://localhost:2525/incoming') + fail_msg = "Localhost endpoint must require authentication!" + self.assertEquals(401, result.code, fail_msg) + + public_endpoint_url = 'http://%s:2424/' % self._get_public_ip() + result = yield self._get(public_endpoint_url) + self.assertEquals(200, result.code, "server info not accessible") + + result = yield self._get(public_endpoint_url + 'other') + self.assertEquals(401, result.code, "public server lacks auth!") + + public_using_local_port_url = 'http://%s:2525/' % self._get_public_ip() + with pytest.raises(Exception): + yield self._get(public_using_local_port_url) + + def _spawnServer(self): + protocol = ProcessProtocol() + env = os.environ.get('VIRTUAL_ENV', '/usr') + executable = os.path.join(env, 'bin', 'twistd') + no_pid_argument = '--pidfile=' + args = [executable, no_pid_argument, '-noy', TAC_FILE_PATH] + env = {'DEBUG_SERVER': 'yes'} + t = reactor.spawnProcess(protocol, executable, args, env=env) + self.addCleanup(os.kill, t.pid, signal.SIGKILL) + self.addCleanup(t.loseConnection) + return self._sleep(1) # it takes a while to start server + + def _sleep(self, time): + d = defer.Deferred() + reactor.callLater(time, d.callback, True) + return d + + def _get(self, *args, **kwargs): + kwargs['agent'] = Agent(reactor) + return treq.get(*args, **kwargs) + + def _get_public_ip(self): + s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) + s.connect(("8.8.8.8", 80)) + return s.getsockname()[0] diff --git a/tests/server/test_url_mapper.py b/tests/server/test_url_mapper.py new file mode 100644 index 00000000..a04e7593 --- /dev/null +++ b/tests/server/test_url_mapper.py @@ -0,0 +1,133 @@ +# -*- coding: utf-8 -*- +# test_url_mapper.py +# Copyright (C) 2017 LEAP +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see . +""" +Tests for server-related functionality. +""" +import pytest + +from twisted.trial import unittest +from uuid import uuid4 + +from leap.soledad.server.url_mapper import URLMapper + + +class URLMapperTestCase(unittest.TestCase): + """ + Test if the URLMapper behaves as expected. + + The following table lists the authorized actions among all possible + u1db remote actions: + + URL path | Authorized actions + -------------------------------------------------- + / | GET + /shared-db | GET + /shared-db/docs | - + /shared-db/doc/{id} | - + /shared-db/sync-from/{source} | - + /user-db | - + /user-db/docs | - + /user-db/doc/{id} | - + /user-db/sync-from/{source} | GET, PUT, POST + """ + + def setUp(self): + self._uuid = uuid4().hex + self._urlmap = URLMapper() + self._dbname = 'user-%s' % self._uuid + + @pytest.mark.needs_couch + def test_root_authorized(self): + match = self._urlmap.match('/', 'GET') + self.assertIsNotNone(match) + + def test_shared_authorized(self): + self.assertIsNotNone(self._urlmap.match('/shared', 'GET')) + + def test_shared_unauthorized(self): + self.assertIsNone(self._urlmap.match('/shared', 'PUT')) + self.assertIsNone(self._urlmap.match('/shared', 'DELETE')) + self.assertIsNone(self._urlmap.match('/shared', 'POST')) + + def test_shared_docs_unauthorized(self): + self.assertIsNone(self._urlmap.match('/shared/docs', 'GET')) + self.assertIsNone(self._urlmap.match('/shared/docs', 'PUT')) + self.assertIsNone(self._urlmap.match('/shared/docs', 'DELETE')) + self.assertIsNone(self._urlmap.match('/shared/docs', 'POST')) + + def test_shared_doc_authorized(self): + match = self._urlmap.match('/shared/doc/x', 'GET') + self.assertIsNotNone(match) + self.assertEqual('x', match.get('id')) + + match = self._urlmap.match('/shared/doc/x', 'PUT') + self.assertIsNotNone(match) + self.assertEqual('x', match.get('id')) + + match = self._urlmap.match('/shared/doc/x', 'DELETE') + self.assertIsNotNone(match) + self.assertEqual('x', match.get('id')) + + def test_shared_doc_unauthorized(self): + self.assertIsNone(self._urlmap.match('/shared/doc/x', 'POST')) + + def test_shared_sync_unauthorized(self): + self.assertIsNone(self._urlmap.match('/shared/sync-from/x', 'GET')) + self.assertIsNone(self._urlmap.match('/shared/sync-from/x', 'PUT')) + self.assertIsNone(self._urlmap.match('/shared/sync-from/x', 'DELETE')) + self.assertIsNone(self._urlmap.match('/shared/sync-from/x', 'POST')) + + def test_user_db_unauthorized(self): + dbname = self._dbname + self.assertIsNone(self._urlmap.match('/%s' % dbname, 'GET')) + self.assertIsNone(self._urlmap.match('/%s' % dbname, 'PUT')) + self.assertIsNone(self._urlmap.match('/%s' % dbname, 'DELETE')) + self.assertIsNone(self._urlmap.match('/%s' % dbname, 'POST')) + + def test_user_db_docs_unauthorized(self): + dbname = self._dbname + self.assertIsNone(self._urlmap.match('/%s/docs' % dbname, 'GET')) + self.assertIsNone(self._urlmap.match('/%s/docs' % dbname, 'PUT')) + self.assertIsNone(self._urlmap.match('/%s/docs' % dbname, 'DELETE')) + self.assertIsNone(self._urlmap.match('/%s/docs' % dbname, 'POST')) + + def test_user_db_doc_unauthorized(self): + dbname = self._dbname + self.assertIsNone(self._urlmap.match('/%s/doc/x' % dbname, 'GET')) + self.assertIsNone(self._urlmap.match('/%s/doc/x' % dbname, 'PUT')) + self.assertIsNone(self._urlmap.match('/%s/doc/x' % dbname, 'DELETE')) + self.assertIsNone(self._urlmap.match('/%s/doc/x' % dbname, 'POST')) + + def test_user_db_sync_authorized(self): + uuid = self._uuid + dbname = self._dbname + match = self._urlmap.match('/%s/sync-from/x' % dbname, 'GET') + self.assertEqual(uuid, match.get('uuid')) + self.assertEqual('x', match.get('source_replica_uid')) + + match = self._urlmap.match('/%s/sync-from/x' % dbname, 'PUT') + self.assertEqual(uuid, match.get('uuid')) + self.assertEqual('x', match.get('source_replica_uid')) + + match = self._urlmap.match('/%s/sync-from/x' % dbname, 'POST') + self.assertEqual(uuid, match.get('uuid')) + self.assertEqual('x', match.get('source_replica_uid')) + + def test_user_db_sync_unauthorized(self): + dbname = self._dbname + self.assertIsNone( + self._urlmap.match('/%s/sync-from/x' % dbname, 'DELETE')) -- cgit v1.2.3