diff options
Diffstat (limited to 'tests/server')
| -rw-r--r-- | tests/server/__init__.py | 0 | ||||
| -rw-r--r-- | tests/server/test__resource.py | 85 | ||||
| -rw-r--r-- | tests/server/test__server_info.py | 43 | ||||
| -rw-r--r-- | tests/server/test_auth.py | 138 | ||||
| -rw-r--r-- | tests/server/test_blobs_resource_validation.py | 63 | ||||
| -rw-r--r-- | tests/server/test_blobs_server.py | 286 | ||||
| -rw-r--r-- | tests/server/test_config.py | 70 | ||||
| -rw-r--r-- | tests/server/test_incoming_flow_integration.py | 97 | ||||
| -rw-r--r-- | tests/server/test_incoming_resource.py | 57 | ||||
| -rw-r--r-- | tests/server/test_incoming_server.py | 92 | ||||
| -rw-r--r-- | tests/server/test_server.py | 230 | ||||
| -rw-r--r-- | tests/server/test_session.py | 195 | ||||
| -rw-r--r-- | tests/server/test_shared_db.py | 69 | ||||
| l--------- | tests/server/test_soledad | 1 | ||||
| -rw-r--r-- | tests/server/test_tac.py | 87 | ||||
| -rw-r--r-- | tests/server/test_url_mapper.py | 133 | 
16 files changed, 1646 insertions, 0 deletions
diff --git a/tests/server/__init__.py b/tests/server/__init__.py new file mode 100644 index 00000000..e69de29b --- /dev/null +++ b/tests/server/__init__.py diff --git a/tests/server/test__resource.py b/tests/server/test__resource.py new file mode 100644 index 00000000..a43ac19f --- /dev/null +++ b/tests/server/test__resource.py @@ -0,0 +1,85 @@ +# -*- coding: utf-8 -*- +# test__resource.py +# Copyright (C) 2017 LEAP +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see <http://www.gnu.org/licenses/>. +""" +Tests for Soledad server main resource. +""" +from twisted.trial import unittest +from twisted.web.test.test_web import DummyRequest +from twisted.web.wsgi import WSGIResource +from twisted.web.resource import getChildForRequest +from twisted.internet import reactor + +from leap.soledad.server._resource import PublicResource +from leap.soledad.server._resource import LocalResource +from leap.soledad.server._server_info import ServerInfo +from leap.soledad.server._blobs import BlobsResource +from leap.soledad.server._incoming import IncomingResource +from leap.soledad.server.gzip_middleware import GzipMiddleware + + +_pool = reactor.getThreadPool() + + +class PublicResourceTestCase(unittest.TestCase): + +    def test_get_root(self): +        blobs_resource = None  # doesn't matter +        resource = PublicResource( +            blobs_resource=blobs_resource, sync_pool=_pool) +        request = DummyRequest(['']) +        child = getChildForRequest(resource, request) +        self.assertIsInstance(child, ServerInfo) + +    def test_get_blobs_enabled(self): +        blobs_resource = BlobsResource("filesystem", '/tmp') +        resource = PublicResource( +            blobs_resource=blobs_resource, sync_pool=_pool) +        request = DummyRequest(['blobs']) +        child = getChildForRequest(resource, request) +        self.assertIsInstance(child, BlobsResource) + +    def test_get_blobs_disabled(self): +        blobs_resource = None +        resource = PublicResource( +            blobs_resource=blobs_resource, sync_pool=_pool) +        request = DummyRequest(['blobs']) +        child = getChildForRequest(resource, request) +        # if blobs is disabled, the request should be routed to sync +        self.assertIsInstance(child, WSGIResource) +        self.assertIsInstance(child._application, GzipMiddleware) + +    def test_get_sync(self): +        blobs_resource = None  # doesn't matter +        resource = PublicResource( +            blobs_resource=blobs_resource, sync_pool=_pool) +        request = DummyRequest(['user-db', 'sync-from', 'source-id']) +        child = getChildForRequest(resource, request) +        self.assertIsInstance(child, WSGIResource) +        self.assertIsInstance(child._application, GzipMiddleware) + +    def test_no_incoming_on_public_resource(self): +        resource = PublicResource(None, sync_pool=_pool) +        request = DummyRequest(['incoming']) +        child = getChildForRequest(resource, request) +        # WSGIResource is returned if a path is unknown +        self.assertIsInstance(child, WSGIResource) + +    def test_get_incoming(self): +        resource = LocalResource() +        request = DummyRequest(['incoming']) +        child = getChildForRequest(resource, request) +        self.assertIsInstance(child, IncomingResource) diff --git a/tests/server/test__server_info.py b/tests/server/test__server_info.py new file mode 100644 index 00000000..40567ef1 --- /dev/null +++ b/tests/server/test__server_info.py @@ -0,0 +1,43 @@ +# -*- coding: utf-8 -*- +# test__server_info.py +# Copyright (C) 2017 LEAP +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see <http://www.gnu.org/licenses/>. + +""" +Tests for Soledad server information announcement. +""" +import json + +from twisted.trial import unittest +from twisted.web.test.test_web import DummyRequest + +from leap.soledad.server._server_info import ServerInfo + + +class ServerInfoTestCase(unittest.TestCase): + +    def test_blobs_enabled(self): +        resource = ServerInfo(True) +        response = resource.render(DummyRequest([''])) +        _info = json.loads(response) +        self.assertEquals(_info['blobs'], True) +        self.assertTrue(isinstance(_info['version'], basestring)) + +    def test_blobs_disabled(self): +        resource = ServerInfo(False) +        response = resource.render(DummyRequest([''])) +        _info = json.loads(response) +        self.assertEquals(_info['blobs'], False) +        self.assertTrue(isinstance(_info['version'], basestring)) diff --git a/tests/server/test_auth.py b/tests/server/test_auth.py new file mode 100644 index 00000000..78cf20ab --- /dev/null +++ b/tests/server/test_auth.py @@ -0,0 +1,138 @@ +# -*- coding: utf-8 -*- +# test_auth.py +# Copyright (C) 2017 LEAP +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see <http://www.gnu.org/licenses/>. +""" +Tests for auth pieces. +""" +import os +import collections +import pytest + +from contextlib import contextmanager + +from twisted.cred.credentials import UsernamePassword +from twisted.cred.error import UnauthorizedLogin +from twisted.internet import reactor +from twisted.internet.defer import inlineCallbacks +from twisted.trial import unittest +from twisted.web.resource import IResource +from twisted.web.test import test_httpauth + +import leap.soledad.server.auth as auth_module +from leap.soledad.server.auth import SoledadRealm +from leap.soledad.server.auth import CouchDBTokenChecker +from leap.soledad.server.auth import FileTokenChecker +from leap.soledad.server.auth import TokenCredentialFactory +from leap.soledad.server._resource import PublicResource + + +class SoledadRealmTestCase(unittest.TestCase): + +    def test_returned_resource(self): +        # we have to pass a pool to the realm , otherwise tests will hang +        conf = {'blobs': False} +        pool = reactor.getThreadPool() +        realm = SoledadRealm(conf=conf, sync_pool=pool) +        iface, avatar, logout = realm.requestAvatar('any', None, IResource) +        self.assertIsInstance(avatar, PublicResource) +        self.assertIsNone(logout()) + + +class DummyServer(object): +    """ +    I fake the `couchdb.client.Server` GET api and always return the token +    given on my creation. +    """ + +    def __init__(self, token): +        self._token = token + +    def get(self, _): +        return self._token + + +@contextmanager +def dummy_server(token): +    yield collections.defaultdict(lambda: DummyServer(token)) + + +class CouchDBTokenCheckerTestCase(unittest.TestCase): + +    @inlineCallbacks +    def test_good_creds(self): +        # set up a dummy server which always return a *valid* token document +        token = {'user_id': 'user', 'type': 'Token'} +        server = dummy_server(token) +        # setup the checker with the custom server +        checker = CouchDBTokenChecker() +        auth_module.couch_server = lambda url: server +        # assert the checker *can* verify the creds +        creds = UsernamePassword('user', 'pass') +        avatarId = yield checker.requestAvatarId(creds) +        self.assertEqual('user', avatarId) + +    @inlineCallbacks +    def test_bad_creds(self): +        # set up a dummy server which always return an *invalid* token document +        token = None +        server = dummy_server(token) +        # setup the checker with the custom server +        checker = CouchDBTokenChecker() +        auth_module.couch_server = lambda url: server +        # assert the checker *cannot* verify the creds +        creds = UsernamePassword('user', '') +        with self.assertRaises(UnauthorizedLogin): +            yield checker.requestAvatarId(creds) + + +class FileTokenCheckerTestCase(unittest.TestCase): + +    @inlineCallbacks +    @pytest.mark.usefixtures("method_tmpdir") +    def test_good_creds(self): +        auth_file_path = os.path.join(self.tempdir, 'auth.file') +        with open(auth_file_path, 'w') as tempfile: +            tempfile.write('goodservice:goodtoken') +        # setup the checker with the auth tokens file +        conf = {'services_tokens_file': auth_file_path} +        checker = FileTokenChecker(conf) +        # assert the checker *can* verify the creds +        creds = UsernamePassword('goodservice', 'goodtoken') +        avatarId = yield checker.requestAvatarId(creds) +        self.assertEqual('goodservice', avatarId) + +    @inlineCallbacks +    @pytest.mark.usefixtures("method_tmpdir") +    def test_bad_creds(self): +        auth_file_path = os.path.join(self.tempdir, 'auth.file') +        with open(auth_file_path, 'w') as tempfile: +            tempfile.write('service:token') +        # setup the checker with the auth tokens file +        conf = {'services_tokens_file': auth_file_path} +        checker = FileTokenChecker(conf) +        # assert the checker *cannot* verify the creds +        creds = UsernamePassword('service', 'wrongtoken') +        with self.assertRaises(UnauthorizedLogin): +            yield checker.requestAvatarId(creds) + + +class TokenCredentialFactoryTestcase( +        test_httpauth.RequestMixin, test_httpauth.BasicAuthTestsMixin, +        unittest.TestCase): + +    def setUp(self): +        test_httpauth.BasicAuthTestsMixin.setUp(self) +        self.credentialFactory = TokenCredentialFactory() diff --git a/tests/server/test_blobs_resource_validation.py b/tests/server/test_blobs_resource_validation.py new file mode 100644 index 00000000..9f6dfc2f --- /dev/null +++ b/tests/server/test_blobs_resource_validation.py @@ -0,0 +1,63 @@ +# -*- coding: utf-8 -*- +# test_blobs_resource_validation.py +# Copyright (C) 2017 LEAP +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see <http://www.gnu.org/licenses/>. +""" +Tests for invalid user or blob_id on blobs resource +""" +import pytest +from twisted.trial import unittest +from twisted.web.test.test_web import DummyRequest +from leap.soledad.server import _blobs as server_blobs + + +class BlobServerTestCase(unittest.TestCase): + +    @pytest.mark.usefixtures("method_tmpdir") +    def setUp(self): +        self.resource = server_blobs.BlobsResource("filesystem", self.tempdir) + +    @pytest.mark.usefixtures("method_tmpdir") +    def test_valid_arguments(self): +        request = DummyRequest(['v4l1d-us3r', 'v4l1d-bl0b-1d']) +        self.assertTrue(self.resource._validate(request)) + +    @pytest.mark.usefixtures("method_tmpdir") +    def test_invalid_user_get(self): +        request = DummyRequest(['invalid user', 'valid-blob-id']) +        request.path = '/blobs/' +        with pytest.raises(Exception): +            self.resource.render_GET(request) + +    @pytest.mark.usefixtures("method_tmpdir") +    def test_invalid_user_put(self): +        request = DummyRequest(['invalid user', 'valid-blob-id']) +        request.path = '/blobs/' +        with pytest.raises(Exception): +            self.resource.render_PUT(request) + +    @pytest.mark.usefixtures("method_tmpdir") +    def test_invalid_blob_id_get(self): +        request = DummyRequest(['valid-user', 'invalid blob id']) +        request.path = '/blobs/' +        with pytest.raises(Exception): +            self.resource.render_GET(request) + +    @pytest.mark.usefixtures("method_tmpdir") +    def test_invalid_blob_id_put(self): +        request = DummyRequest(['valid-user', 'invalid blob id']) +        request.path = '/blobs/' +        with pytest.raises(Exception): +            self.resource.render_PUT(request) diff --git a/tests/server/test_blobs_server.py b/tests/server/test_blobs_server.py new file mode 100644 index 00000000..9eddf108 --- /dev/null +++ b/tests/server/test_blobs_server.py @@ -0,0 +1,286 @@ +# -*- coding: utf-8 -*- +# test_blobs_server.py +# Copyright (C) 2017 LEAP +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see <http://www.gnu.org/licenses/>. +""" +Integration tests for blobs server +""" +import os +import pytest +from uuid import uuid4 +from io import BytesIO +from twisted.trial import unittest +from twisted.web.server import Site +from twisted.internet import reactor +from twisted.internet import defer +from treq._utils import set_global_pool + +from leap.soledad.common.blobs import Flags +from leap.soledad.server import _blobs as server_blobs +from leap.soledad.client._db.blobs import BlobManager +from leap.soledad.client._db.blobs import BlobAlreadyExistsError +from leap.soledad.client._db.blobs import InvalidFlagsError +from leap.soledad.client._db.blobs import SoledadError + + +class BlobServerTestCase(unittest.TestCase): + +    def setUp(self): +        root = server_blobs.BlobsResource("filesystem", self.tempdir) +        site = Site(root) +        self.port = reactor.listenTCP(0, site, interface='127.0.0.1') +        self.host = self.port.getHost() +        self.uri = 'http://%s:%s/' % (self.host.host, self.host.port) +        self.secret = 'A' * 96 +        set_global_pool(None) + +    def tearDown(self): +        self.port.stopListening() + +    @defer.inlineCallbacks +    @pytest.mark.usefixtures("method_tmpdir") +    def test_upload_download(self): +        manager = BlobManager('', self.uri, self.secret, +                              self.secret, uuid4().hex) +        fd = BytesIO("save me") +        yield manager._encrypt_and_upload('blob_id', fd) +        blob, size = yield manager._download_and_decrypt('blob_id') +        self.assertEquals(blob.getvalue(), "save me") + +    @defer.inlineCallbacks +    @pytest.mark.usefixtures("method_tmpdir") +    def test_set_get_flags(self): +        manager = BlobManager('', self.uri, self.secret, +                              self.secret, uuid4().hex) +        fd = BytesIO("flag me") +        yield manager._encrypt_and_upload('blob_id', fd) +        yield manager.set_flags('blob_id', [Flags.PROCESSING]) +        flags = yield manager.get_flags('blob_id') +        self.assertEquals([Flags.PROCESSING], flags) + +    @defer.inlineCallbacks +    @pytest.mark.usefixtures("method_tmpdir") +    def test_set_flags_raises_if_no_blob_found(self): +        manager = BlobManager('', self.uri, self.secret, +                              self.secret, uuid4().hex) +        with pytest.raises(SoledadError): +            yield manager.set_flags('missing_id', [Flags.PENDING]) + +    @defer.inlineCallbacks +    @pytest.mark.usefixtures("method_tmpdir") +    def test_list_filter_flag(self): +        manager = BlobManager('', self.uri, self.secret, +                              self.secret, uuid4().hex) +        fd = BytesIO("flag me") +        yield manager._encrypt_and_upload('blob_id', fd) +        yield manager.set_flags('blob_id', [Flags.PROCESSING]) +        blobs_list = yield manager.remote_list(filter_flag=Flags.PENDING) +        self.assertEquals([], blobs_list) +        blobs_list = yield manager.remote_list(filter_flag=Flags.PROCESSING) +        self.assertEquals(['blob_id'], blobs_list) + +    @defer.inlineCallbacks +    @pytest.mark.usefixtures("method_tmpdir") +    def test_list_filter_flag_order_by_date(self): +        manager = BlobManager('', self.uri, self.secret, +                              self.secret, uuid4().hex) +        yield manager._encrypt_and_upload('blob_id1', BytesIO("x")) +        yield manager._encrypt_and_upload('blob_id2', BytesIO("x")) +        yield manager._encrypt_and_upload('blob_id3', BytesIO("x")) +        yield manager.set_flags('blob_id1', [Flags.PROCESSING]) +        yield manager.set_flags('blob_id2', [Flags.PROCESSING]) +        yield manager.set_flags('blob_id3', [Flags.PROCESSING]) +        blobs_list = yield manager.remote_list(filter_flag=Flags.PROCESSING, +                                               order_by='+date') +        expected_list = ['blob_id1', 'blob_id2', 'blob_id3'] +        self.assertEquals(expected_list, blobs_list) +        blobs_list = yield manager.remote_list(filter_flag=Flags.PROCESSING, +                                               order_by='-date') +        self.assertEquals(list(reversed(expected_list)), blobs_list) + +    @defer.inlineCallbacks +    @pytest.mark.usefixtures("method_tmpdir") +    def test_cant_set_invalid_flags(self): +        manager = BlobManager('', self.uri, self.secret, +                              self.secret, uuid4().hex) +        fd = BytesIO("flag me") +        yield manager._encrypt_and_upload('blob_id', fd) +        with pytest.raises(InvalidFlagsError): +            yield manager.set_flags('blob_id', ['invalid']) +        flags = yield manager.get_flags('blob_id') +        self.assertEquals([], flags) + +    @defer.inlineCallbacks +    @pytest.mark.usefixtures("method_tmpdir") +    def test_get_empty_flags(self): +        manager = BlobManager('', self.uri, self.secret, +                              self.secret, uuid4().hex) +        fd = BytesIO("flag me") +        yield manager._encrypt_and_upload('blob_id', fd) +        flags = yield manager.get_flags('blob_id') +        self.assertEquals([], flags) + +    @defer.inlineCallbacks +    @pytest.mark.usefixtures("method_tmpdir") +    def test_flags_ignored_by_listing(self): +        manager = BlobManager('', self.uri, self.secret, +                              self.secret, uuid4().hex) +        fd = BytesIO("flag me") +        yield manager._encrypt_and_upload('blob_id', fd) +        yield manager.set_flags('blob_id', [Flags.PROCESSING]) +        blobs_list = yield manager.remote_list() +        self.assertEquals(['blob_id'], blobs_list) + +    @defer.inlineCallbacks +    @pytest.mark.usefixtures("method_tmpdir") +    def test_upload_changes_remote_list(self): +        manager = BlobManager('', self.uri, self.secret, +                              self.secret, uuid4().hex) +        yield manager._encrypt_and_upload('blob_id1', BytesIO("1")) +        yield manager._encrypt_and_upload('blob_id2', BytesIO("2")) +        blobs_list = yield manager.remote_list() +        self.assertEquals(set(['blob_id1', 'blob_id2']), set(blobs_list)) + +    @defer.inlineCallbacks +    @pytest.mark.usefixtures("method_tmpdir") +    def test_list_orders_by_date(self): +        user_uid = uuid4().hex +        manager = BlobManager('', self.uri, self.secret, +                              self.secret, user_uid) +        yield manager._encrypt_and_upload('blob_id1', BytesIO("1")) +        yield manager._encrypt_and_upload('blob_id2', BytesIO("2")) +        blobs_list = yield manager.remote_list(order_by='date') +        self.assertEquals(['blob_id1', 'blob_id2'], blobs_list) +        parts = [user_uid, 'default', 'b', 'blo', 'blob_i', 'blob_id1'] +        self.__touch(self.tempdir, *parts) +        blobs_list = yield manager.remote_list(order_by='+date') +        self.assertEquals(['blob_id2', 'blob_id1'], blobs_list) +        blobs_list = yield manager.remote_list(order_by='-date') +        self.assertEquals(['blob_id1', 'blob_id2'], blobs_list) + +    @defer.inlineCallbacks +    @pytest.mark.usefixtures("method_tmpdir") +    def test_count(self): +        manager = BlobManager('', self.uri, self.secret, +                              self.secret, uuid4().hex) +        deferreds = [] +        for i in range(10): +            deferreds.append(manager._encrypt_and_upload(str(i), BytesIO("1"))) +        yield defer.gatherResults(deferreds) + +        result = yield manager.count() +        self.assertEquals({"count": len(deferreds)}, result) + +    @defer.inlineCallbacks +    @pytest.mark.usefixtures("method_tmpdir") +    def test_list_restricted_by_namespace(self): +        manager = BlobManager('', self.uri, self.secret, +                              self.secret, uuid4().hex) +        namespace = 'incoming' +        yield manager._encrypt_and_upload('blob_id1', BytesIO("1"), +                                          namespace=namespace) +        yield manager._encrypt_and_upload('blob_id2', BytesIO("2")) +        blobs_list = yield manager.remote_list(namespace=namespace) +        self.assertEquals(['blob_id1'], blobs_list) + +    @defer.inlineCallbacks +    @pytest.mark.usefixtures("method_tmpdir") +    def test_list_default_doesnt_list_other_namespaces(self): +        manager = BlobManager('', self.uri, self.secret, +                              self.secret, uuid4().hex) +        namespace = 'incoming' +        yield manager._encrypt_and_upload('blob_id1', BytesIO("1"), +                                          namespace=namespace) +        yield manager._encrypt_and_upload('blob_id2', BytesIO("2")) +        blobs_list = yield manager.remote_list() +        self.assertEquals(['blob_id2'], blobs_list) + +    @defer.inlineCallbacks +    @pytest.mark.usefixtures("method_tmpdir") +    def test_download_from_namespace(self): +        manager = BlobManager('', self.uri, self.secret, +                              self.secret, uuid4().hex) +        namespace, blob_id, content = 'incoming', 'blob_id1', 'test' +        yield manager._encrypt_and_upload(blob_id, BytesIO(content), +                                          namespace=namespace) +        got_blob = yield manager._download_and_decrypt(blob_id, namespace) +        self.assertEquals(content, got_blob[0].getvalue()) + +    def __touch(self, *args): +        path = os.path.join(*args) +        with open(path, 'a'): +            os.utime(path, None) + +    @defer.inlineCallbacks +    @pytest.mark.usefixtures("method_tmpdir") +    def test_upload_deny_duplicates(self): +        manager = BlobManager('', self.uri, self.secret, +                              self.secret, uuid4().hex) +        fd = BytesIO("save me") +        yield manager._encrypt_and_upload('blob_id', fd) +        fd = BytesIO("save me") +        with pytest.raises(BlobAlreadyExistsError): +            yield manager._encrypt_and_upload('blob_id', fd) + +    @defer.inlineCallbacks +    @pytest.mark.usefixtures("method_tmpdir") +    def test_send_missing(self): +        manager = BlobManager(self.tempdir, self.uri, self.secret, +                              self.secret, uuid4().hex) +        self.addCleanup(manager.close) +        blob_id = 'local_only_blob_id' +        yield manager.local.put(blob_id, BytesIO("X"), size=1) +        yield manager.send_missing() +        result = yield manager._download_and_decrypt(blob_id) +        self.assertIsNotNone(result) +        self.assertEquals(result[0].getvalue(), "X") + +    @defer.inlineCallbacks +    @pytest.mark.usefixtures("method_tmpdir") +    def test_fetch_missing(self): +        manager = BlobManager(self.tempdir, self.uri, self.secret, +                              self.secret, uuid4().hex) +        self.addCleanup(manager.close) +        blob_id = 'remote_only_blob_id' +        yield manager._encrypt_and_upload(blob_id, BytesIO("X")) +        yield manager.fetch_missing() +        result = yield manager.local.get(blob_id) +        self.assertIsNotNone(result) +        self.assertEquals(result.getvalue(), "X") + +    @defer.inlineCallbacks +    @pytest.mark.usefixtures("method_tmpdir") +    def test_upload_then_delete_updates_list(self): +        manager = BlobManager('', self.uri, self.secret, +                              self.secret, uuid4().hex) +        yield manager._encrypt_and_upload('blob_id1', BytesIO("1")) +        yield manager._encrypt_and_upload('blob_id2', BytesIO("2")) +        yield manager._delete_from_remote('blob_id1') +        blobs_list = yield manager.remote_list() +        self.assertEquals(set(['blob_id2']), set(blobs_list)) + +    @defer.inlineCallbacks +    @pytest.mark.usefixtures("method_tmpdir") +    def test_upload_then_delete_updates_list_using_namespace(self): +        manager = BlobManager('', self.uri, self.secret, +                              self.secret, uuid4().hex) +        namespace = 'special_archives' +        yield manager._encrypt_and_upload('blob_id1', BytesIO("1"), +                                          namespace=namespace) +        yield manager._encrypt_and_upload('blob_id2', BytesIO("2"), +                                          namespace=namespace) +        yield manager._delete_from_remote('blob_id1', namespace=namespace) +        blobs_list = yield manager.remote_list(namespace=namespace) +        self.assertEquals(set(['blob_id2']), set(blobs_list)) diff --git a/tests/server/test_config.py b/tests/server/test_config.py new file mode 100644 index 00000000..dfb09f4c --- /dev/null +++ b/tests/server/test_config.py @@ -0,0 +1,70 @@ +# -*- coding: utf-8 -*- +# test_config.py +# Copyright (C) 2017 LEAP +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see <http://www.gnu.org/licenses/>. +""" +Tests for server configuration. +""" + +from twisted.trial import unittest +from pkg_resources import resource_filename + +from leap.soledad.server._config import _load_config +from leap.soledad.server._config import CONFIG_DEFAULTS + + +class ConfigurationParsingTest(unittest.TestCase): + +    def setUp(self): +        self.maxDiff = None + +    def test_use_defaults_on_failure(self): +        config = _load_config('this file will never exist') +        expected = CONFIG_DEFAULTS +        self.assertEquals(expected, config) + +    def test_security_values_configuration(self): +        # given +        config_path = resource_filename('test_soledad', +                                        'fixture_soledad.conf') +        # when +        config = _load_config(config_path) + +        # then +        expected = {'members': ['user1', 'user2'], +                    'members_roles': ['role1', 'role2'], +                    'admins': ['user3', 'user4'], +                    'admins_roles': ['role3', 'role3']} +        self.assertDictEqual(expected, config['database-security']) + +    def test_server_values_configuration(self): +        # given +        config_path = resource_filename('test_soledad', +                                        'fixture_soledad.conf') +        # when +        config = _load_config(config_path) + +        # then +        expected = {'couch_url': +                    'http://soledad:passwd@localhost:5984', +                    'create_cmd': +                    'sudo -u soledad-admin /usr/bin/soledad-create-userdb', +                    'admin_netrc': +                    '/etc/couchdb/couchdb-soledad-admin.netrc', +                    'batching': False, +                    'blobs': False, +                    'services_tokens_file': '/etc/soledad/services.tokens', +                    'blobs_path': '/var/lib/soledad/blobs'} +        self.assertDictEqual(expected, config['soledad-server']) diff --git a/tests/server/test_incoming_flow_integration.py b/tests/server/test_incoming_flow_integration.py new file mode 100644 index 00000000..b492534f --- /dev/null +++ b/tests/server/test_incoming_flow_integration.py @@ -0,0 +1,97 @@ +# -*- coding: utf-8 -*- +# test_incoming_flow_integration.py +# Copyright (C) 2017 LEAP +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see <http://www.gnu.org/licenses/>. +""" +Integration tests for the complete flow of IncomingBox feature +""" +import pytest +from uuid import uuid4 +from twisted.trial import unittest +from twisted.web.server import Site +from twisted.internet import reactor +from twisted.internet import defer +from twisted.web.resource import Resource +from zope.interface import implementer + +from leap.soledad.client.incoming import IncomingBoxProcessingLoop +from leap.soledad.client.incoming import IncomingBox +from leap.soledad.server import _blobs as server_blobs +from leap.soledad.client._db.blobs import BlobManager +from leap.soledad.server._incoming import IncomingResource +from leap.soledad.server._blobs import BlobsServerState +from leap.soledad.client import interfaces + + +@implementer(interfaces.IIncomingBoxConsumer) +class GoodConsumer(object): +    def __init__(self): +        self.name = 'GoodConsumer' +        self.processed, self.saved = [], [] + +    def process(self, item, item_id, encrypted=True): +        self.processed.append(item_id) +        return defer.succeed([item_id]) + +    def save(self, parts, item_id): +        self.saved.append(item_id) +        return defer.succeed(None) + + +class IncomingFlowIntegrationTestCase(unittest.TestCase): + +    def setUp(self): +        root = Resource() +        state = BlobsServerState('filesystem', blobs_path=self.tempdir) +        incoming_resource = IncomingResource(state) +        blobs_resource = server_blobs.BlobsResource("filesystem", self.tempdir) +        root.putChild('blobs', blobs_resource) +        root.putChild('incoming', incoming_resource) +        site = Site(root) +        self.port = reactor.listenTCP(0, site, interface='127.0.0.1') +        self.host = self.port.getHost() +        self.uri = 'http://%s:%s/' % (self.host.host, self.host.port) +        self.blobs_uri = self.uri + 'blobs/' +        self.incoming_uri = self.uri + 'incoming' +        self.user_id = 'user-' + uuid4().hex +        self.secret = 'A' * 96 +        self.blob_manager = BlobManager(self.tempdir, self.blobs_uri, +                                        self.secret, self.secret, +                                        self.user_id) +        self.box = IncomingBox(self.blob_manager, 'MX') +        self.loop = IncomingBoxProcessingLoop(self.box) +        # FIXME: We use blob_manager client only to avoid DelayedCalls +        # Somehow treq being used here keeps a connection pool open +        self.client = self.blob_manager._client + +    def fill(self, messages): +        deferreds = [] +        for message_id, message in messages: +            uri = '%s/%s/%s' % (self.incoming_uri, self.user_id, message_id) +            deferreds.append(self.blob_manager._client.put(uri, data=message)) +        return defer.gatherResults(deferreds) + +    def tearDown(self): +        self.port.stopListening() +        self.blob_manager.close() + +    @defer.inlineCallbacks +    @pytest.mark.usefixtures("method_tmpdir") +    def test_consume_a_incoming_message(self): +        yield self.fill([('msg1', 'blob')]) +        consumer = GoodConsumer() +        self.loop.add_consumer(consumer) +        yield self.loop() +        self.assertIn('msg1', consumer.processed) diff --git a/tests/server/test_incoming_resource.py b/tests/server/test_incoming_resource.py new file mode 100644 index 00000000..0d4918b9 --- /dev/null +++ b/tests/server/test_incoming_resource.py @@ -0,0 +1,57 @@ +# -*- coding: utf-8 -*- +# test_incoming_resource.py +# Copyright (C) 2017 LEAP +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see <http://www.gnu.org/licenses/>. +""" +Unit tests for incoming API resource +""" +from twisted.trial import unittest +from twisted.web.test.test_web import DummyRequest +from leap.soledad.server._incoming import IncomingResource +from leap.soledad.server._incoming import IncomingFormatter +from leap.soledad.common.crypto import EncryptionSchemes +from io import BytesIO +from uuid import uuid4 +from mock import Mock + + +class IncomingResourceTestCase(unittest.TestCase): + +    def setUp(self): +        self.couchdb = Mock() +        self.backend_factory = Mock() +        self.backend_factory.open_database.return_value = self.couchdb +        self.resource = IncomingResource(self.backend_factory) +        self.user_uuid = uuid4().hex + +    def test_save_document(self): +        formatter = IncomingFormatter() +        doc_id, scheme = uuid4().hex, EncryptionSchemes.PUBKEY +        content = 'Incoming content' +        request = DummyRequest([self.user_uuid, doc_id]) +        request.content = BytesIO(content) +        self.resource.render_PUT(request) + +        open_database = self.backend_factory.open_database +        open_database.assert_called_once_with(self.user_uuid) +        self.couchdb.put_doc.assert_called_once() +        doc = self.couchdb.put_doc.call_args[0][0] +        self.assertEquals(doc_id, doc.doc_id) +        self.assertEquals(formatter.format(content, scheme), doc.content) + +    def test_formatter(self): +        formatter = IncomingFormatter() +        formatted = formatter.format('content', EncryptionSchemes.PUBKEY) +        self.assertEquals(formatted['_enc_scheme'], EncryptionSchemes.PUBKEY) diff --git a/tests/server/test_incoming_server.py b/tests/server/test_incoming_server.py new file mode 100644 index 00000000..241bc581 --- /dev/null +++ b/tests/server/test_incoming_server.py @@ -0,0 +1,92 @@ +# -*- coding: utf-8 -*- +# test_incoming_server.py +# Copyright (C) 2017 LEAP +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see <http://www.gnu.org/licenses/>. +""" +Integration tests for incoming API +""" +import pytest +import json +from io import BytesIO +from uuid import uuid4 +from twisted.web.test.test_web import DummyRequest +from twisted.web.server import Site +from twisted.internet import reactor +from twisted.internet import defer +import treq + +from leap.soledad.server._incoming import IncomingResource +from leap.soledad.server._blobs import BlobsServerState +from leap.soledad.server._incoming import IncomingFormatter +from leap.soledad.common.crypto import EncryptionSchemes +from leap.soledad.common.blobs import Flags +from test_soledad.util import CouchServerStateForTests +from test_soledad.util import CouchDBTestCase + + +class IncomingOnCouchServerTestCase(CouchDBTestCase): + +    def setUp(self): +        self.port = None + +    def tearDown(self): +        if self.port: +            self.port.stopListening() + +    def prepare(self, backend): +        self.user_id = 'user-' + uuid4().hex +        if backend == 'couch': +            self.state = CouchServerStateForTests(self.couch_url) +            self.state.ensure_database(self.user_id) +        else: +            self.state = BlobsServerState(backend) +        root = IncomingResource(self.state) +        site = Site(root) +        self.port = reactor.listenTCP(0, site, interface='127.0.0.1') +        self.host = self.port.getHost() +        self.uri = 'http://%s:%s/' % (self.host.host, self.host.port) + +    @defer.inlineCallbacks +    @pytest.mark.usefixtures("method_tmpdir") +    def test_put_incoming_creates_a_document_using_couch(self): +        self.prepare('couch') +        user_id, doc_id = self.user_id, uuid4().hex +        content, scheme = 'Hi', EncryptionSchemes.PUBKEY +        formatter = IncomingFormatter() +        incoming_endpoint = self.uri + '%s/%s' % (user_id, doc_id) +        yield treq.put(incoming_endpoint, BytesIO(content), persistent=False) +        db = self.state.open_database(user_id) + +        doc = db.get_doc(doc_id) +        self.assertEquals(doc.content, formatter.format(content, scheme)) + +    @defer.inlineCallbacks +    @pytest.mark.usefixtures("method_tmpdir") +    def test_put_incoming_creates_a_blob_using_filesystem(self): +        self.prepare('filesystem') +        user_id, doc_id = self.user_id, uuid4().hex +        content = 'Hi' +        formatter = IncomingFormatter() +        incoming_endpoint = self.uri + '%s/%s' % (user_id, doc_id) +        yield treq.put(incoming_endpoint, BytesIO(content), persistent=False) + +        db = self.state.open_database(user_id) +        request = DummyRequest([user_id, doc_id]) +        yield db.read_blob(user_id, doc_id, request, 'MX') +        flags = db.get_flags(user_id, doc_id, request, 'MX') +        flags = json.loads(flags) +        expected = formatter.preamble(content, doc_id) + ' ' + content +        self.assertEquals(expected, request.written[0]) +        self.assertIn(Flags.PENDING, flags) diff --git a/tests/server/test_server.py b/tests/server/test_server.py new file mode 100644 index 00000000..25f0cc2d --- /dev/null +++ b/tests/server/test_server.py @@ -0,0 +1,230 @@ +# -*- coding: utf-8 -*- +# test_server.py +# Copyright (C) 2013 LEAP +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see <http://www.gnu.org/licenses/>. +""" +Tests for server-related functionality. +""" +import binascii +import os +import pytest + +from six.moves.urllib.parse import urljoin +from uuid import uuid4 + +from twisted.internet import defer + +from leap.soledad.common.couch.state import CouchServerState +from leap.soledad.common.couch import CouchDatabase +from test_soledad.u1db_tests import TestCaseWithServer +from test_soledad.util import CouchDBTestCase +from test_soledad.util import ( +    make_token_soledad_app, +    make_soledad_document_for_test, +    soledad_sync_target, +) + +from leap.soledad.client import _crypto +from leap.soledad.client import Soledad + + +@pytest.mark.needs_couch +@pytest.mark.usefixtures("method_tmpdir") +class EncryptedSyncTestCase( +        CouchDBTestCase, TestCaseWithServer): + +    """ +    Tests for encrypted sync using Soledad server backed by a couch database. +    """ + +    # increase twisted.trial's timeout because large files syncing might take +    # some time to finish. +    timeout = 500 + +    @staticmethod +    def make_app_with_state(state): +        return make_token_soledad_app(state) + +    make_document_for_test = make_soledad_document_for_test + +    sync_target = soledad_sync_target + +    def _soledad_instance(self, user=None, passphrase=u'123', +                          prefix='', +                          secrets_path='secrets.json', +                          local_db_path='soledad.u1db', +                          server_url='', +                          cert_file=None, auth_token=None): +        """ +        Instantiate Soledad. +        """ + +        # this callback ensures we save a document which is sent to the shared +        # db. +        def _put_doc_side_effect(doc): +            self._doc_put = doc + +        if not server_url: +            # attempt to find the soledad server url +            server_address = None +            server = getattr(self, 'server', None) +            if server: +                server_address = getattr(self.server, 'server_address', None) +            else: +                host = self.port.getHost() +                server_address = (host.host, host.port) +            if server_address: +                server_url = 'http://%s:%d' % (server_address) + +        return Soledad( +            user, +            passphrase, +            secrets_path=os.path.join(self.tempdir, prefix, secrets_path), +            local_db_path=os.path.join( +                self.tempdir, prefix, local_db_path), +            server_url=server_url, +            cert_file=cert_file, +            auth_token=auth_token, +            shared_db=self.get_default_shared_mock(_put_doc_side_effect)) + +    def make_app(self): +        self.request_state = CouchServerState(self.couch_url) +        return self.make_app_with_state(self.request_state) + +    def setUp(self): +        CouchDBTestCase.setUp(self) +        TestCaseWithServer.setUp(self) + +    def tearDown(self): +        CouchDBTestCase.tearDown(self) +        TestCaseWithServer.tearDown(self) + +    def _test_encrypted_sym_sync(self, passphrase=u'123', doc_size=2, +                                 number_of_docs=1): +        """ +        Test the complete syncing chain between two soledad dbs using a +        Soledad server backed by a couch database. +        """ +        self.startTwistedServer() +        user = 'user-' + uuid4().hex + +        # this will store all docs ids to avoid get_all_docs +        created_ids = [] + +        # instantiate soledad and create a document +        sol1 = self._soledad_instance( +            user=user, +            # token is verified in test_target.make_token_soledad_app +            auth_token='auth-token', +            passphrase=passphrase) + +        # instantiate another soledad using the same secret as the previous +        # one (so we can correctly verify the mac of the synced document) +        sol2 = self._soledad_instance( +            user=user, +            prefix='x', +            auth_token='auth-token', +            secrets_path=sol1.secrets_path, +            passphrase=passphrase) + +        # ensure remote db exists before syncing +        db = CouchDatabase.open_database( +            urljoin(self.couch_url, 'user-' + user), +            create=True) + +        def _db1AssertEmptyDocList(results): +            _, doclist = results +            self.assertEqual([], doclist) + +        def _db1CreateDocs(results): +            deferreds = [] +            for i in xrange(number_of_docs): +                content = binascii.hexlify(os.urandom(doc_size / 2)) +                d = sol1.create_doc({'data': content}) +                d.addCallback(created_ids.append) +                deferreds.append(d) +            return defer.DeferredList(deferreds) + +        def _db1AssertDocsSyncedToServer(results): +            self.assertEqual(number_of_docs, len(created_ids)) +            for soldoc in created_ids: +                couchdoc = db.get_doc(soldoc.doc_id) +                self.assertTrue(couchdoc) +                # assert document structure in couch server +                self.assertEqual(soldoc.doc_id, couchdoc.doc_id) +                self.assertEqual(soldoc.rev, couchdoc.rev) +                couch_content = couchdoc.content.keys() +                self.assertEqual(['raw'], couch_content) +                content = couchdoc.get_json() +                self.assertTrue(_crypto.is_symmetrically_encrypted(content)) + +        d = sol1.get_all_docs() +        d.addCallback(_db1AssertEmptyDocList) +        d.addCallback(_db1CreateDocs) +        d.addCallback(lambda _: sol1.sync()) +        d.addCallback(_db1AssertDocsSyncedToServer) + +        def _db2AssertEmptyDocList(results): +            _, doclist = results +            self.assertEqual([], doclist) + +        def _getAllDocsFromBothDbs(results): +            d1 = sol1.get_all_docs() +            d2 = sol2.get_all_docs() +            return defer.DeferredList([d1, d2]) + +        d.addCallback(lambda _: sol2.get_all_docs()) +        d.addCallback(_db2AssertEmptyDocList) +        d.addCallback(lambda _: sol2.sync()) +        d.addCallback(_getAllDocsFromBothDbs) + +        def _assertDocSyncedFromDb1ToDb2(results): +            r1, r2 = results +            _, (gen1, doclist1) = r1 +            _, (gen2, doclist2) = r2 +            self.assertEqual(number_of_docs, gen1) +            self.assertEqual(number_of_docs, gen2) +            self.assertEqual(number_of_docs, len(doclist1)) +            self.assertEqual(number_of_docs, len(doclist2)) +            self.assertEqual(doclist1[0], doclist2[0]) + +        d.addCallback(_assertDocSyncedFromDb1ToDb2) + +        def _cleanUp(results): +            db.delete_database() +            db.close() +            sol1.close() +            sol2.close() + +        d.addCallback(_cleanUp) + +        return d + +    def test_encrypted_sym_sync(self): +        return self._test_encrypted_sym_sync() + +    def test_encrypted_sym_sync_with_unicode_passphrase(self): +        """ +        Test the complete syncing chain between two soledad dbs using a +        Soledad server backed by a couch database, using an unicode +        passphrase. +        """ +        return self._test_encrypted_sym_sync(passphrase=u'ãáàäéàëíìïóòöõúùüñç') + +    def test_sync_many_small_files(self): +        """ +        Test if Soledad can sync many smallfiles. +        """ +        return self._test_encrypted_sym_sync(doc_size=2, number_of_docs=100) diff --git a/tests/server/test_session.py b/tests/server/test_session.py new file mode 100644 index 00000000..3dbd2740 --- /dev/null +++ b/tests/server/test_session.py @@ -0,0 +1,195 @@ +# -*- coding: utf-8 -*- +# test_session.py +# Copyright (C) 2017 LEAP +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see <http://www.gnu.org/licenses/>. +""" +Tests for server session entrypoint. +""" +from twisted.trial import unittest + +from twisted.cred import portal +from twisted.cred.checkers import InMemoryUsernamePasswordDatabaseDontUse +from twisted.cred.credentials import IUsernamePassword +from twisted.web.resource import getChildForRequest +from twisted.web.static import Data +from twisted.web.test.requesthelper import DummyRequest +from twisted.web.test.test_httpauth import b64encode +from twisted.web.test.test_httpauth import Realm +from twisted.web._auth.wrapper import UnauthorizedResource + +from leap.soledad.server.session import SoledadSession + + +class SoledadSessionTestCase(unittest.TestCase): +    """ +    Tests adapted from for +    L{twisted.web.test.test_httpauth.HTTPAuthSessionWrapper}. +    """ + +    def makeRequest(self, *args, **kwargs): +        request = DummyRequest(*args, **kwargs) +        request.path = '/' +        return request + +    def setUp(self): +        self.username = b'foo bar' +        self.password = b'bar baz' +        self.avatarContent = b"contents of the avatar resource itself" +        self.childName = b"foo-child" +        self.childContent = b"contents of the foo child of the avatar" +        self.checker = InMemoryUsernamePasswordDatabaseDontUse() +        self.checker.addUser(self.username, self.password) +        self.avatar = Data(self.avatarContent, 'text/plain') +        self.avatar.putChild( +            self.childName, Data(self.childContent, 'text/plain')) +        self.avatars = {self.username: self.avatar} +        self.realm = Realm(self.avatars.get) +        self.portal = portal.Portal(self.realm, [self.checker]) +        self.wrapper = SoledadSession(self.portal) + +    def _authorizedTokenLogin(self, request): +        authorization = b64encode( +            self.username + b':' + self.password) +        request.requestHeaders.addRawHeader(b'authorization', +                                            b'Token ' + authorization) +        return getChildForRequest(self.wrapper, request) + +    def test_getChildWithDefault(self): +        request = self.makeRequest([self.childName]) +        child = getChildForRequest(self.wrapper, request) +        d = request.notifyFinish() + +        def cbFinished(result): +            self.assertEqual(request.responseCode, 401) + +        d.addCallback(cbFinished) +        request.render(child) +        return d + +    def _invalidAuthorizationTest(self, response): +        request = self.makeRequest([self.childName]) +        request.requestHeaders.addRawHeader(b'authorization', response) +        child = getChildForRequest(self.wrapper, request) +        d = request.notifyFinish() + +        def cbFinished(result): +            self.assertEqual(request.responseCode, 401) + +        d.addCallback(cbFinished) +        request.render(child) +        return d + +    def test_getChildWithDefaultUnauthorizedUser(self): +        return self._invalidAuthorizationTest( +            b'Basic ' + b64encode(b'foo:bar')) + +    def test_getChildWithDefaultUnauthorizedPassword(self): +        return self._invalidAuthorizationTest( +            b'Basic ' + b64encode(self.username + b':bar')) + +    def test_getChildWithDefaultUnrecognizedScheme(self): +        return self._invalidAuthorizationTest(b'Quux foo bar baz') + +    def test_getChildWithDefaultAuthorized(self): +        request = self.makeRequest([self.childName]) +        child = self._authorizedTokenLogin(request) +        d = request.notifyFinish() + +        def cbFinished(ignored): +            self.assertEqual(request.written, [self.childContent]) + +        d.addCallback(cbFinished) +        request.render(child) +        return d + +    def test_renderAuthorized(self): +        # Request it exactly, not any of its children. +        request = self.makeRequest([]) +        child = self._authorizedTokenLogin(request) +        d = request.notifyFinish() + +        def cbFinished(ignored): +            self.assertEqual(request.written, [self.avatarContent]) + +        d.addCallback(cbFinished) +        request.render(child) +        return d + +    def test_decodeRaises(self): +        request = self.makeRequest([self.childName]) +        request.requestHeaders.addRawHeader(b'authorization', +                                            b'Token decode should fail') +        child = getChildForRequest(self.wrapper, request) +        self.assertIsInstance(child, UnauthorizedResource) + +    def test_parseResponse(self): +        basicAuthorization = b'Basic abcdef123456' +        self.assertEqual( +            self.wrapper._parseHeader(basicAuthorization), +            None) +        tokenAuthorization = b'Token abcdef123456' +        self.assertEqual( +            self.wrapper._parseHeader(tokenAuthorization), +            b'abcdef123456') + +    def test_unexpectedDecodeError(self): + +        class UnexpectedException(Exception): +            pass + +        class BadFactory(object): +            scheme = b'bad' + +            def getChallenge(self, client): +                return {} + +            def decode(self, response, request): +                print("decode raised") +                raise UnexpectedException() + +        self.wrapper._credentialFactory = BadFactory() +        request = self.makeRequest([self.childName]) +        request.requestHeaders.addRawHeader(b'authorization', b'Bad abc') +        child = getChildForRequest(self.wrapper, request) +        request.render(child) +        self.assertEqual(request.responseCode, 500) +        errors = self.flushLoggedErrors(UnexpectedException) +        self.assertEqual(len(errors), 1) + +    def test_unexpectedLoginError(self): +        class UnexpectedException(Exception): +            pass + +        class BrokenChecker(object): +            credentialInterfaces = (IUsernamePassword,) + +            def requestAvatarId(self, credentials): +                raise UnexpectedException() + +        self.portal.registerChecker(BrokenChecker()) +        request = self.makeRequest([self.childName]) +        child = self._authorizedTokenLogin(request) +        request.render(child) +        self.assertEqual(request.responseCode, 500) +        self.assertEqual(len(self.flushLoggedErrors(UnexpectedException)), 1) + +    def test_cantAccessOtherUserPathByDefault(self): +        request = self.makeRequest([]) +        # valid url_mapper path, but for another user +        request.path = '/blobs/another-user/' +        child = self._authorizedTokenLogin(request) + +        request.render(child) +        self.assertEqual(request.responseCode, 500) diff --git a/tests/server/test_shared_db.py b/tests/server/test_shared_db.py new file mode 100644 index 00000000..96af6dff --- /dev/null +++ b/tests/server/test_shared_db.py @@ -0,0 +1,69 @@ +# -*- coding: utf-8 -*- +# test_shared_db.py +# Copyright (C) 2017 LEAP +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see <http://www.gnu.org/licenses/>. +""" +Tests for the shared db on server side. +""" + + +import pytest + +from twisted.trial import unittest + +from leap.soledad.client.shared_db import SoledadSharedDatabase +from leap.soledad.common.document import SoledadDocument +from leap.soledad.common.l2db.errors import RevisionConflict + + +@pytest.mark.needs_couch +class SharedDbTests(unittest.TestCase): +    """ +    """ + +    URL = 'http://127.0.0.1:2424/shared' +    CREDS = {'token': {'uuid': 'an-uuid', 'token': 'an-auth-token'}} + +    @pytest.fixture(autouse=True) +    def soledad_client(self, soledad_server, soledad_dbs): +        soledad_dbs('an-uuid') +        self._db = SoledadSharedDatabase.open_database(self.URL, self.CREDS) + +    @pytest.mark.thisone +    def test_doc_update_succeeds(self): +        doc_id = 'some-random-doc' +        self.assertIsNone(self._db.get_doc(doc_id)) +        # create a document in shared db +        doc = SoledadDocument(doc_id=doc_id) +        self._db.put_doc(doc) +        # update that document +        expected = {'new': 'content'} +        doc.content = expected +        self._db.put_doc(doc) +        # ensure expected content was saved +        doc = self._db.get_doc(doc_id) +        self.assertEqual(expected, doc.content) + +    @pytest.mark.thisone +    def test_doc_update_fails_with_wrong_rev(self): +        # create a document in shared db +        doc_id = 'some-random-doc' +        self.assertIsNone(self._db.get_doc(doc_id)) +        # create a document in shared db +        doc = SoledadDocument(doc_id=doc_id) +        self._db.put_doc(doc) +        # try to update document without including revision of old version +        doc.rev = 'wrong-rev' +        self.assertRaises(RevisionConflict, self._db.put_doc, doc) diff --git a/tests/server/test_soledad b/tests/server/test_soledad new file mode 120000 index 00000000..c1a35d32 --- /dev/null +++ b/tests/server/test_soledad @@ -0,0 +1 @@ +../test_soledad
\ No newline at end of file diff --git a/tests/server/test_tac.py b/tests/server/test_tac.py new file mode 100644 index 00000000..7bb50e35 --- /dev/null +++ b/tests/server/test_tac.py @@ -0,0 +1,87 @@ +# -*- coding: utf-8 -*- +# test_tac.py +# Copyright (C) 2017 LEAP +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see <http://www.gnu.org/licenses/>. +""" +Tests for the localhost/public APIs using .tac file. +See docs/auth.rst +""" + + +import os +import signal +import socket +import pytest +import treq + +from pkg_resources import resource_filename +from twisted.trial import unittest +from twisted.internet import defer, reactor +from twisted.internet.protocol import ProcessProtocol +from twisted.web.client import Agent + + +TAC_FILE_PATH = resource_filename('leap.soledad.server', 'server.tac') + + +class TacServerTestCase(unittest.TestCase): + +    def test_tac_file_exists(self): +        msg = "server.tac used on this test case was expected to be at %s" +        self.assertTrue(os.path.isfile(TAC_FILE_PATH), msg % TAC_FILE_PATH) + +    @defer.inlineCallbacks +    def test_local_public_default_ports_on_server_tac(self): +        yield self._spawnServer() +        result = yield self._get('http://localhost:2525/incoming') +        fail_msg = "Localhost endpoint must require authentication!" +        self.assertEquals(401, result.code, fail_msg) + +        public_endpoint_url = 'http://%s:2424/' % self._get_public_ip() +        result = yield self._get(public_endpoint_url) +        self.assertEquals(200, result.code, "server info not accessible") + +        result = yield self._get(public_endpoint_url + 'other') +        self.assertEquals(401, result.code, "public server lacks auth!") + +        public_using_local_port_url = 'http://%s:2525/' % self._get_public_ip() +        with pytest.raises(Exception): +            yield self._get(public_using_local_port_url) + +    def _spawnServer(self): +        protocol = ProcessProtocol() +        env = os.environ.get('VIRTUAL_ENV', '/usr') +        executable = os.path.join(env, 'bin', 'twistd') +        no_pid_argument = '--pidfile=' +        args = [executable, no_pid_argument, '-noy', TAC_FILE_PATH] +        env = {'DEBUG_SERVER': 'yes'} +        t = reactor.spawnProcess(protocol, executable, args, env=env) +        self.addCleanup(os.kill, t.pid, signal.SIGKILL) +        self.addCleanup(t.loseConnection) +        return self._sleep(1)  # it takes a while to start server + +    def _sleep(self, time): +        d = defer.Deferred() +        reactor.callLater(time, d.callback, True) +        return d + +    def _get(self, *args, **kwargs): +        kwargs['agent'] = Agent(reactor) +        return treq.get(*args, **kwargs) + +    def _get_public_ip(self): +        s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) +        s.connect(("8.8.8.8", 80)) +        return s.getsockname()[0] diff --git a/tests/server/test_url_mapper.py b/tests/server/test_url_mapper.py new file mode 100644 index 00000000..a04e7593 --- /dev/null +++ b/tests/server/test_url_mapper.py @@ -0,0 +1,133 @@ +# -*- coding: utf-8 -*- +# test_url_mapper.py +# Copyright (C) 2017 LEAP +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see <http://www.gnu.org/licenses/>. +""" +Tests for server-related functionality. +""" +import pytest + +from twisted.trial import unittest +from uuid import uuid4 + +from leap.soledad.server.url_mapper import URLMapper + + +class URLMapperTestCase(unittest.TestCase): +    """ +    Test if the URLMapper behaves as expected. + +    The following table lists the authorized actions among all possible +    u1db remote actions: + +        URL path                      | Authorized actions +        -------------------------------------------------- +        /                             | GET +        /shared-db                    | GET +        /shared-db/docs               | - +        /shared-db/doc/{id}           | - +        /shared-db/sync-from/{source} | - +        /user-db                      | - +        /user-db/docs                 | - +        /user-db/doc/{id}             | - +        /user-db/sync-from/{source}   | GET, PUT, POST +    """ + +    def setUp(self): +        self._uuid = uuid4().hex +        self._urlmap = URLMapper() +        self._dbname = 'user-%s' % self._uuid + +    @pytest.mark.needs_couch +    def test_root_authorized(self): +        match = self._urlmap.match('/', 'GET') +        self.assertIsNotNone(match) + +    def test_shared_authorized(self): +        self.assertIsNotNone(self._urlmap.match('/shared', 'GET')) + +    def test_shared_unauthorized(self): +        self.assertIsNone(self._urlmap.match('/shared', 'PUT')) +        self.assertIsNone(self._urlmap.match('/shared', 'DELETE')) +        self.assertIsNone(self._urlmap.match('/shared', 'POST')) + +    def test_shared_docs_unauthorized(self): +        self.assertIsNone(self._urlmap.match('/shared/docs', 'GET')) +        self.assertIsNone(self._urlmap.match('/shared/docs', 'PUT')) +        self.assertIsNone(self._urlmap.match('/shared/docs', 'DELETE')) +        self.assertIsNone(self._urlmap.match('/shared/docs', 'POST')) + +    def test_shared_doc_authorized(self): +        match = self._urlmap.match('/shared/doc/x', 'GET') +        self.assertIsNotNone(match) +        self.assertEqual('x', match.get('id')) + +        match = self._urlmap.match('/shared/doc/x', 'PUT') +        self.assertIsNotNone(match) +        self.assertEqual('x', match.get('id')) + +        match = self._urlmap.match('/shared/doc/x', 'DELETE') +        self.assertIsNotNone(match) +        self.assertEqual('x', match.get('id')) + +    def test_shared_doc_unauthorized(self): +        self.assertIsNone(self._urlmap.match('/shared/doc/x', 'POST')) + +    def test_shared_sync_unauthorized(self): +        self.assertIsNone(self._urlmap.match('/shared/sync-from/x', 'GET')) +        self.assertIsNone(self._urlmap.match('/shared/sync-from/x', 'PUT')) +        self.assertIsNone(self._urlmap.match('/shared/sync-from/x', 'DELETE')) +        self.assertIsNone(self._urlmap.match('/shared/sync-from/x', 'POST')) + +    def test_user_db_unauthorized(self): +        dbname = self._dbname +        self.assertIsNone(self._urlmap.match('/%s' % dbname, 'GET')) +        self.assertIsNone(self._urlmap.match('/%s' % dbname, 'PUT')) +        self.assertIsNone(self._urlmap.match('/%s' % dbname, 'DELETE')) +        self.assertIsNone(self._urlmap.match('/%s' % dbname, 'POST')) + +    def test_user_db_docs_unauthorized(self): +        dbname = self._dbname +        self.assertIsNone(self._urlmap.match('/%s/docs' % dbname, 'GET')) +        self.assertIsNone(self._urlmap.match('/%s/docs' % dbname, 'PUT')) +        self.assertIsNone(self._urlmap.match('/%s/docs' % dbname, 'DELETE')) +        self.assertIsNone(self._urlmap.match('/%s/docs' % dbname, 'POST')) + +    def test_user_db_doc_unauthorized(self): +        dbname = self._dbname +        self.assertIsNone(self._urlmap.match('/%s/doc/x' % dbname, 'GET')) +        self.assertIsNone(self._urlmap.match('/%s/doc/x' % dbname, 'PUT')) +        self.assertIsNone(self._urlmap.match('/%s/doc/x' % dbname, 'DELETE')) +        self.assertIsNone(self._urlmap.match('/%s/doc/x' % dbname, 'POST')) + +    def test_user_db_sync_authorized(self): +        uuid = self._uuid +        dbname = self._dbname +        match = self._urlmap.match('/%s/sync-from/x' % dbname, 'GET') +        self.assertEqual(uuid, match.get('uuid')) +        self.assertEqual('x', match.get('source_replica_uid')) + +        match = self._urlmap.match('/%s/sync-from/x' % dbname, 'PUT') +        self.assertEqual(uuid, match.get('uuid')) +        self.assertEqual('x', match.get('source_replica_uid')) + +        match = self._urlmap.match('/%s/sync-from/x' % dbname, 'POST') +        self.assertEqual(uuid, match.get('uuid')) +        self.assertEqual('x', match.get('source_replica_uid')) + +    def test_user_db_sync_unauthorized(self): +        dbname = self._dbname +        self.assertIsNone( +            self._urlmap.match('/%s/sync-from/x' % dbname, 'DELETE'))  | 
