summaryrefslogtreecommitdiff
path: root/testing/tests/server
diff options
context:
space:
mode:
authordrebs <drebs@riseup.net>2017-09-17 12:08:25 -0300
committerdrebs <drebs@riseup.net>2017-09-17 15:50:55 -0300
commitcfff46ff9becdbe5cf48816870e625ed253ecc57 (patch)
tree8d239e4499f559d86ed17ea3632008303b25d485 /testing/tests/server
parentf29abe28bd778838626d12fcabe3980a8ce4fa8c (diff)
[refactor] move tests to root of repository
Tests entrypoint was in a testing/ subfolder in the root of the repository. This was made mainly because we had some common files for tests and we didn't want to ship them (files in testing/test_soledad, which is itself a python package. This sometimes causes errors when loading tests (it seems setuptools is confused with having one python package in a subdirectory of another). This commit moves the tests entrypoint to the root of the repository. Closes: #8952
Diffstat (limited to 'testing/tests/server')
-rw-r--r--testing/tests/server/__init__.py0
-rw-r--r--testing/tests/server/test__resource.py85
-rw-r--r--testing/tests/server/test__server_info.py43
-rw-r--r--testing/tests/server/test_auth.py138
-rw-r--r--testing/tests/server/test_blobs_resource_validation.py63
-rw-r--r--testing/tests/server/test_blobs_server.py286
-rw-r--r--testing/tests/server/test_config.py70
-rw-r--r--testing/tests/server/test_incoming_flow_integration.py97
-rw-r--r--testing/tests/server/test_incoming_resource.py57
-rw-r--r--testing/tests/server/test_incoming_server.py92
-rw-r--r--testing/tests/server/test_server.py230
-rw-r--r--testing/tests/server/test_session.py195
-rw-r--r--testing/tests/server/test_shared_db.py69
-rw-r--r--testing/tests/server/test_tac.py87
-rw-r--r--testing/tests/server/test_url_mapper.py133
15 files changed, 0 insertions, 1645 deletions
diff --git a/testing/tests/server/__init__.py b/testing/tests/server/__init__.py
deleted file mode 100644
index e69de29b..00000000
--- a/testing/tests/server/__init__.py
+++ /dev/null
diff --git a/testing/tests/server/test__resource.py b/testing/tests/server/test__resource.py
deleted file mode 100644
index a43ac19f..00000000
--- a/testing/tests/server/test__resource.py
+++ /dev/null
@@ -1,85 +0,0 @@
-# -*- coding: utf-8 -*-
-# test__resource.py
-# Copyright (C) 2017 LEAP
-#
-# This program is free software: you can redistribute it and/or modify
-# it under the terms of the GNU General Public License as published by
-# the Free Software Foundation, either version 3 of the License, or
-# (at your option) any later version.
-#
-# This program is distributed in the hope that it will be useful,
-# but WITHOUT ANY WARRANTY; without even the implied warranty of
-# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
-# GNU General Public License for more details.
-#
-# You should have received a copy of the GNU General Public License
-# along with this program. If not, see <http://www.gnu.org/licenses/>.
-"""
-Tests for Soledad server main resource.
-"""
-from twisted.trial import unittest
-from twisted.web.test.test_web import DummyRequest
-from twisted.web.wsgi import WSGIResource
-from twisted.web.resource import getChildForRequest
-from twisted.internet import reactor
-
-from leap.soledad.server._resource import PublicResource
-from leap.soledad.server._resource import LocalResource
-from leap.soledad.server._server_info import ServerInfo
-from leap.soledad.server._blobs import BlobsResource
-from leap.soledad.server._incoming import IncomingResource
-from leap.soledad.server.gzip_middleware import GzipMiddleware
-
-
-_pool = reactor.getThreadPool()
-
-
-class PublicResourceTestCase(unittest.TestCase):
-
- def test_get_root(self):
- blobs_resource = None # doesn't matter
- resource = PublicResource(
- blobs_resource=blobs_resource, sync_pool=_pool)
- request = DummyRequest([''])
- child = getChildForRequest(resource, request)
- self.assertIsInstance(child, ServerInfo)
-
- def test_get_blobs_enabled(self):
- blobs_resource = BlobsResource("filesystem", '/tmp')
- resource = PublicResource(
- blobs_resource=blobs_resource, sync_pool=_pool)
- request = DummyRequest(['blobs'])
- child = getChildForRequest(resource, request)
- self.assertIsInstance(child, BlobsResource)
-
- def test_get_blobs_disabled(self):
- blobs_resource = None
- resource = PublicResource(
- blobs_resource=blobs_resource, sync_pool=_pool)
- request = DummyRequest(['blobs'])
- child = getChildForRequest(resource, request)
- # if blobs is disabled, the request should be routed to sync
- self.assertIsInstance(child, WSGIResource)
- self.assertIsInstance(child._application, GzipMiddleware)
-
- def test_get_sync(self):
- blobs_resource = None # doesn't matter
- resource = PublicResource(
- blobs_resource=blobs_resource, sync_pool=_pool)
- request = DummyRequest(['user-db', 'sync-from', 'source-id'])
- child = getChildForRequest(resource, request)
- self.assertIsInstance(child, WSGIResource)
- self.assertIsInstance(child._application, GzipMiddleware)
-
- def test_no_incoming_on_public_resource(self):
- resource = PublicResource(None, sync_pool=_pool)
- request = DummyRequest(['incoming'])
- child = getChildForRequest(resource, request)
- # WSGIResource is returned if a path is unknown
- self.assertIsInstance(child, WSGIResource)
-
- def test_get_incoming(self):
- resource = LocalResource()
- request = DummyRequest(['incoming'])
- child = getChildForRequest(resource, request)
- self.assertIsInstance(child, IncomingResource)
diff --git a/testing/tests/server/test__server_info.py b/testing/tests/server/test__server_info.py
deleted file mode 100644
index 40567ef1..00000000
--- a/testing/tests/server/test__server_info.py
+++ /dev/null
@@ -1,43 +0,0 @@
-# -*- coding: utf-8 -*-
-# test__server_info.py
-# Copyright (C) 2017 LEAP
-#
-# This program is free software: you can redistribute it and/or modify
-# it under the terms of the GNU General Public License as published by
-# the Free Software Foundation, either version 3 of the License, or
-# (at your option) any later version.
-#
-# This program is distributed in the hope that it will be useful,
-# but WITHOUT ANY WARRANTY; without even the implied warranty of
-# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
-# GNU General Public License for more details.
-#
-# You should have received a copy of the GNU General Public License
-# along with this program. If not, see <http://www.gnu.org/licenses/>.
-
-"""
-Tests for Soledad server information announcement.
-"""
-import json
-
-from twisted.trial import unittest
-from twisted.web.test.test_web import DummyRequest
-
-from leap.soledad.server._server_info import ServerInfo
-
-
-class ServerInfoTestCase(unittest.TestCase):
-
- def test_blobs_enabled(self):
- resource = ServerInfo(True)
- response = resource.render(DummyRequest(['']))
- _info = json.loads(response)
- self.assertEquals(_info['blobs'], True)
- self.assertTrue(isinstance(_info['version'], basestring))
-
- def test_blobs_disabled(self):
- resource = ServerInfo(False)
- response = resource.render(DummyRequest(['']))
- _info = json.loads(response)
- self.assertEquals(_info['blobs'], False)
- self.assertTrue(isinstance(_info['version'], basestring))
diff --git a/testing/tests/server/test_auth.py b/testing/tests/server/test_auth.py
deleted file mode 100644
index 78cf20ab..00000000
--- a/testing/tests/server/test_auth.py
+++ /dev/null
@@ -1,138 +0,0 @@
-# -*- coding: utf-8 -*-
-# test_auth.py
-# Copyright (C) 2017 LEAP
-#
-# This program is free software: you can redistribute it and/or modify
-# it under the terms of the GNU General Public License as published by
-# the Free Software Foundation, either version 3 of the License, or
-# (at your option) any later version.
-#
-# This program is distributed in the hope that it will be useful,
-# but WITHOUT ANY WARRANTY; without even the implied warranty of
-# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
-# GNU General Public License for more details.
-#
-# You should have received a copy of the GNU General Public License
-# along with this program. If not, see <http://www.gnu.org/licenses/>.
-"""
-Tests for auth pieces.
-"""
-import os
-import collections
-import pytest
-
-from contextlib import contextmanager
-
-from twisted.cred.credentials import UsernamePassword
-from twisted.cred.error import UnauthorizedLogin
-from twisted.internet import reactor
-from twisted.internet.defer import inlineCallbacks
-from twisted.trial import unittest
-from twisted.web.resource import IResource
-from twisted.web.test import test_httpauth
-
-import leap.soledad.server.auth as auth_module
-from leap.soledad.server.auth import SoledadRealm
-from leap.soledad.server.auth import CouchDBTokenChecker
-from leap.soledad.server.auth import FileTokenChecker
-from leap.soledad.server.auth import TokenCredentialFactory
-from leap.soledad.server._resource import PublicResource
-
-
-class SoledadRealmTestCase(unittest.TestCase):
-
- def test_returned_resource(self):
- # we have to pass a pool to the realm , otherwise tests will hang
- conf = {'blobs': False}
- pool = reactor.getThreadPool()
- realm = SoledadRealm(conf=conf, sync_pool=pool)
- iface, avatar, logout = realm.requestAvatar('any', None, IResource)
- self.assertIsInstance(avatar, PublicResource)
- self.assertIsNone(logout())
-
-
-class DummyServer(object):
- """
- I fake the `couchdb.client.Server` GET api and always return the token
- given on my creation.
- """
-
- def __init__(self, token):
- self._token = token
-
- def get(self, _):
- return self._token
-
-
-@contextmanager
-def dummy_server(token):
- yield collections.defaultdict(lambda: DummyServer(token))
-
-
-class CouchDBTokenCheckerTestCase(unittest.TestCase):
-
- @inlineCallbacks
- def test_good_creds(self):
- # set up a dummy server which always return a *valid* token document
- token = {'user_id': 'user', 'type': 'Token'}
- server = dummy_server(token)
- # setup the checker with the custom server
- checker = CouchDBTokenChecker()
- auth_module.couch_server = lambda url: server
- # assert the checker *can* verify the creds
- creds = UsernamePassword('user', 'pass')
- avatarId = yield checker.requestAvatarId(creds)
- self.assertEqual('user', avatarId)
-
- @inlineCallbacks
- def test_bad_creds(self):
- # set up a dummy server which always return an *invalid* token document
- token = None
- server = dummy_server(token)
- # setup the checker with the custom server
- checker = CouchDBTokenChecker()
- auth_module.couch_server = lambda url: server
- # assert the checker *cannot* verify the creds
- creds = UsernamePassword('user', '')
- with self.assertRaises(UnauthorizedLogin):
- yield checker.requestAvatarId(creds)
-
-
-class FileTokenCheckerTestCase(unittest.TestCase):
-
- @inlineCallbacks
- @pytest.mark.usefixtures("method_tmpdir")
- def test_good_creds(self):
- auth_file_path = os.path.join(self.tempdir, 'auth.file')
- with open(auth_file_path, 'w') as tempfile:
- tempfile.write('goodservice:goodtoken')
- # setup the checker with the auth tokens file
- conf = {'services_tokens_file': auth_file_path}
- checker = FileTokenChecker(conf)
- # assert the checker *can* verify the creds
- creds = UsernamePassword('goodservice', 'goodtoken')
- avatarId = yield checker.requestAvatarId(creds)
- self.assertEqual('goodservice', avatarId)
-
- @inlineCallbacks
- @pytest.mark.usefixtures("method_tmpdir")
- def test_bad_creds(self):
- auth_file_path = os.path.join(self.tempdir, 'auth.file')
- with open(auth_file_path, 'w') as tempfile:
- tempfile.write('service:token')
- # setup the checker with the auth tokens file
- conf = {'services_tokens_file': auth_file_path}
- checker = FileTokenChecker(conf)
- # assert the checker *cannot* verify the creds
- creds = UsernamePassword('service', 'wrongtoken')
- with self.assertRaises(UnauthorizedLogin):
- yield checker.requestAvatarId(creds)
-
-
-class TokenCredentialFactoryTestcase(
- test_httpauth.RequestMixin, test_httpauth.BasicAuthTestsMixin,
- unittest.TestCase):
-
- def setUp(self):
- test_httpauth.BasicAuthTestsMixin.setUp(self)
- self.credentialFactory = TokenCredentialFactory()
diff --git a/testing/tests/server/test_blobs_resource_validation.py b/testing/tests/server/test_blobs_resource_validation.py
deleted file mode 100644
index 9f6dfc2f..00000000
--- a/testing/tests/server/test_blobs_resource_validation.py
+++ /dev/null
@@ -1,63 +0,0 @@
-# -*- coding: utf-8 -*-
-# test_blobs_resource_validation.py
-# Copyright (C) 2017 LEAP
-#
-# This program is free software: you can redistribute it and/or modify
-# it under the terms of the GNU General Public License as published by
-# the Free Software Foundation, either version 3 of the License, or
-# (at your option) any later version.
-#
-# This program is distributed in the hope that it will be useful,
-# but WITHOUT ANY WARRANTY; without even the implied warranty of
-# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
-# GNU General Public License for more details.
-#
-# You should have received a copy of the GNU General Public License
-# along with this program. If not, see <http://www.gnu.org/licenses/>.
-"""
-Tests for invalid user or blob_id on blobs resource
-"""
-import pytest
-from twisted.trial import unittest
-from twisted.web.test.test_web import DummyRequest
-from leap.soledad.server import _blobs as server_blobs
-
-
-class BlobServerTestCase(unittest.TestCase):
-
- @pytest.mark.usefixtures("method_tmpdir")
- def setUp(self):
- self.resource = server_blobs.BlobsResource("filesystem", self.tempdir)
-
- @pytest.mark.usefixtures("method_tmpdir")
- def test_valid_arguments(self):
- request = DummyRequest(['v4l1d-us3r', 'v4l1d-bl0b-1d'])
- self.assertTrue(self.resource._validate(request))
-
- @pytest.mark.usefixtures("method_tmpdir")
- def test_invalid_user_get(self):
- request = DummyRequest(['invalid user', 'valid-blob-id'])
- request.path = '/blobs/'
- with pytest.raises(Exception):
- self.resource.render_GET(request)
-
- @pytest.mark.usefixtures("method_tmpdir")
- def test_invalid_user_put(self):
- request = DummyRequest(['invalid user', 'valid-blob-id'])
- request.path = '/blobs/'
- with pytest.raises(Exception):
- self.resource.render_PUT(request)
-
- @pytest.mark.usefixtures("method_tmpdir")
- def test_invalid_blob_id_get(self):
- request = DummyRequest(['valid-user', 'invalid blob id'])
- request.path = '/blobs/'
- with pytest.raises(Exception):
- self.resource.render_GET(request)
-
- @pytest.mark.usefixtures("method_tmpdir")
- def test_invalid_blob_id_put(self):
- request = DummyRequest(['valid-user', 'invalid blob id'])
- request.path = '/blobs/'
- with pytest.raises(Exception):
- self.resource.render_PUT(request)
diff --git a/testing/tests/server/test_blobs_server.py b/testing/tests/server/test_blobs_server.py
deleted file mode 100644
index 9eddf108..00000000
--- a/testing/tests/server/test_blobs_server.py
+++ /dev/null
@@ -1,286 +0,0 @@
-# -*- coding: utf-8 -*-
-# test_blobs_server.py
-# Copyright (C) 2017 LEAP
-#
-# This program is free software: you can redistribute it and/or modify
-# it under the terms of the GNU General Public License as published by
-# the Free Software Foundation, either version 3 of the License, or
-# (at your option) any later version.
-#
-# This program is distributed in the hope that it will be useful,
-# but WITHOUT ANY WARRANTY; without even the implied warranty of
-# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
-# GNU General Public License for more details.
-#
-# You should have received a copy of the GNU General Public License
-# along with this program. If not, see <http://www.gnu.org/licenses/>.
-"""
-Integration tests for blobs server
-"""
-import os
-import pytest
-from uuid import uuid4
-from io import BytesIO
-from twisted.trial import unittest
-from twisted.web.server import Site
-from twisted.internet import reactor
-from twisted.internet import defer
-from treq._utils import set_global_pool
-
-from leap.soledad.common.blobs import Flags
-from leap.soledad.server import _blobs as server_blobs
-from leap.soledad.client._db.blobs import BlobManager
-from leap.soledad.client._db.blobs import BlobAlreadyExistsError
-from leap.soledad.client._db.blobs import InvalidFlagsError
-from leap.soledad.client._db.blobs import SoledadError
-
-
-class BlobServerTestCase(unittest.TestCase):
-
- def setUp(self):
- root = server_blobs.BlobsResource("filesystem", self.tempdir)
- site = Site(root)
- self.port = reactor.listenTCP(0, site, interface='127.0.0.1')
- self.host = self.port.getHost()
- self.uri = 'http://%s:%s/' % (self.host.host, self.host.port)
- self.secret = 'A' * 96
- set_global_pool(None)
-
- def tearDown(self):
- self.port.stopListening()
-
- @defer.inlineCallbacks
- @pytest.mark.usefixtures("method_tmpdir")
- def test_upload_download(self):
- manager = BlobManager('', self.uri, self.secret,
- self.secret, uuid4().hex)
- fd = BytesIO("save me")
- yield manager._encrypt_and_upload('blob_id', fd)
- blob, size = yield manager._download_and_decrypt('blob_id')
- self.assertEquals(blob.getvalue(), "save me")
-
- @defer.inlineCallbacks
- @pytest.mark.usefixtures("method_tmpdir")
- def test_set_get_flags(self):
- manager = BlobManager('', self.uri, self.secret,
- self.secret, uuid4().hex)
- fd = BytesIO("flag me")
- yield manager._encrypt_and_upload('blob_id', fd)
- yield manager.set_flags('blob_id', [Flags.PROCESSING])
- flags = yield manager.get_flags('blob_id')
- self.assertEquals([Flags.PROCESSING], flags)
-
- @defer.inlineCallbacks
- @pytest.mark.usefixtures("method_tmpdir")
- def test_set_flags_raises_if_no_blob_found(self):
- manager = BlobManager('', self.uri, self.secret,
- self.secret, uuid4().hex)
- with pytest.raises(SoledadError):
- yield manager.set_flags('missing_id', [Flags.PENDING])
-
- @defer.inlineCallbacks
- @pytest.mark.usefixtures("method_tmpdir")
- def test_list_filter_flag(self):
- manager = BlobManager('', self.uri, self.secret,
- self.secret, uuid4().hex)
- fd = BytesIO("flag me")
- yield manager._encrypt_and_upload('blob_id', fd)
- yield manager.set_flags('blob_id', [Flags.PROCESSING])
- blobs_list = yield manager.remote_list(filter_flag=Flags.PENDING)
- self.assertEquals([], blobs_list)
- blobs_list = yield manager.remote_list(filter_flag=Flags.PROCESSING)
- self.assertEquals(['blob_id'], blobs_list)
-
- @defer.inlineCallbacks
- @pytest.mark.usefixtures("method_tmpdir")
- def test_list_filter_flag_order_by_date(self):
- manager = BlobManager('', self.uri, self.secret,
- self.secret, uuid4().hex)
- yield manager._encrypt_and_upload('blob_id1', BytesIO("x"))
- yield manager._encrypt_and_upload('blob_id2', BytesIO("x"))
- yield manager._encrypt_and_upload('blob_id3', BytesIO("x"))
- yield manager.set_flags('blob_id1', [Flags.PROCESSING])
- yield manager.set_flags('blob_id2', [Flags.PROCESSING])
- yield manager.set_flags('blob_id3', [Flags.PROCESSING])
- blobs_list = yield manager.remote_list(filter_flag=Flags.PROCESSING,
- order_by='+date')
- expected_list = ['blob_id1', 'blob_id2', 'blob_id3']
- self.assertEquals(expected_list, blobs_list)
- blobs_list = yield manager.remote_list(filter_flag=Flags.PROCESSING,
- order_by='-date')
- self.assertEquals(list(reversed(expected_list)), blobs_list)
-
- @defer.inlineCallbacks
- @pytest.mark.usefixtures("method_tmpdir")
- def test_cant_set_invalid_flags(self):
- manager = BlobManager('', self.uri, self.secret,
- self.secret, uuid4().hex)
- fd = BytesIO("flag me")
- yield manager._encrypt_and_upload('blob_id', fd)
- with pytest.raises(InvalidFlagsError):
- yield manager.set_flags('blob_id', ['invalid'])
- flags = yield manager.get_flags('blob_id')
- self.assertEquals([], flags)
-
- @defer.inlineCallbacks
- @pytest.mark.usefixtures("method_tmpdir")
- def test_get_empty_flags(self):
- manager = BlobManager('', self.uri, self.secret,
- self.secret, uuid4().hex)
- fd = BytesIO("flag me")
- yield manager._encrypt_and_upload('blob_id', fd)
- flags = yield manager.get_flags('blob_id')
- self.assertEquals([], flags)
-
- @defer.inlineCallbacks
- @pytest.mark.usefixtures("method_tmpdir")
- def test_flags_ignored_by_listing(self):
- manager = BlobManager('', self.uri, self.secret,
- self.secret, uuid4().hex)
- fd = BytesIO("flag me")
- yield manager._encrypt_and_upload('blob_id', fd)
- yield manager.set_flags('blob_id', [Flags.PROCESSING])
- blobs_list = yield manager.remote_list()
- self.assertEquals(['blob_id'], blobs_list)
-
- @defer.inlineCallbacks
- @pytest.mark.usefixtures("method_tmpdir")
- def test_upload_changes_remote_list(self):
- manager = BlobManager('', self.uri, self.secret,
- self.secret, uuid4().hex)
- yield manager._encrypt_and_upload('blob_id1', BytesIO("1"))
- yield manager._encrypt_and_upload('blob_id2', BytesIO("2"))
- blobs_list = yield manager.remote_list()
- self.assertEquals(set(['blob_id1', 'blob_id2']), set(blobs_list))
-
- @defer.inlineCallbacks
- @pytest.mark.usefixtures("method_tmpdir")
- def test_list_orders_by_date(self):
- user_uid = uuid4().hex
- manager = BlobManager('', self.uri, self.secret,
- self.secret, user_uid)
- yield manager._encrypt_and_upload('blob_id1', BytesIO("1"))
- yield manager._encrypt_and_upload('blob_id2', BytesIO("2"))
- blobs_list = yield manager.remote_list(order_by='date')
- self.assertEquals(['blob_id1', 'blob_id2'], blobs_list)
- parts = [user_uid, 'default', 'b', 'blo', 'blob_i', 'blob_id1']
- self.__touch(self.tempdir, *parts)
- blobs_list = yield manager.remote_list(order_by='+date')
- self.assertEquals(['blob_id2', 'blob_id1'], blobs_list)
- blobs_list = yield manager.remote_list(order_by='-date')
- self.assertEquals(['blob_id1', 'blob_id2'], blobs_list)
-
- @defer.inlineCallbacks
- @pytest.mark.usefixtures("method_tmpdir")
- def test_count(self):
- manager = BlobManager('', self.uri, self.secret,
- self.secret, uuid4().hex)
- deferreds = []
- for i in range(10):
- deferreds.append(manager._encrypt_and_upload(str(i), BytesIO("1")))
- yield defer.gatherResults(deferreds)
-
- result = yield manager.count()
- self.assertEquals({"count": len(deferreds)}, result)
-
- @defer.inlineCallbacks
- @pytest.mark.usefixtures("method_tmpdir")
- def test_list_restricted_by_namespace(self):
- manager = BlobManager('', self.uri, self.secret,
- self.secret, uuid4().hex)
- namespace = 'incoming'
- yield manager._encrypt_and_upload('blob_id1', BytesIO("1"),
- namespace=namespace)
- yield manager._encrypt_and_upload('blob_id2', BytesIO("2"))
- blobs_list = yield manager.remote_list(namespace=namespace)
- self.assertEquals(['blob_id1'], blobs_list)
-
- @defer.inlineCallbacks
- @pytest.mark.usefixtures("method_tmpdir")
- def test_list_default_doesnt_list_other_namespaces(self):
- manager = BlobManager('', self.uri, self.secret,
- self.secret, uuid4().hex)
- namespace = 'incoming'
- yield manager._encrypt_and_upload('blob_id1', BytesIO("1"),
- namespace=namespace)
- yield manager._encrypt_and_upload('blob_id2', BytesIO("2"))
- blobs_list = yield manager.remote_list()
- self.assertEquals(['blob_id2'], blobs_list)
-
- @defer.inlineCallbacks
- @pytest.mark.usefixtures("method_tmpdir")
- def test_download_from_namespace(self):
- manager = BlobManager('', self.uri, self.secret,
- self.secret, uuid4().hex)
- namespace, blob_id, content = 'incoming', 'blob_id1', 'test'
- yield manager._encrypt_and_upload(blob_id, BytesIO(content),
- namespace=namespace)
- got_blob = yield manager._download_and_decrypt(blob_id, namespace)
- self.assertEquals(content, got_blob[0].getvalue())
-
- def __touch(self, *args):
- path = os.path.join(*args)
- with open(path, 'a'):
- os.utime(path, None)
-
- @defer.inlineCallbacks
- @pytest.mark.usefixtures("method_tmpdir")
- def test_upload_deny_duplicates(self):
- manager = BlobManager('', self.uri, self.secret,
- self.secret, uuid4().hex)
- fd = BytesIO("save me")
- yield manager._encrypt_and_upload('blob_id', fd)
- fd = BytesIO("save me")
- with pytest.raises(BlobAlreadyExistsError):
- yield manager._encrypt_and_upload('blob_id', fd)
-
- @defer.inlineCallbacks
- @pytest.mark.usefixtures("method_tmpdir")
- def test_send_missing(self):
- manager = BlobManager(self.tempdir, self.uri, self.secret,
- self.secret, uuid4().hex)
- self.addCleanup(manager.close)
- blob_id = 'local_only_blob_id'
- yield manager.local.put(blob_id, BytesIO("X"), size=1)
- yield manager.send_missing()
- result = yield manager._download_and_decrypt(blob_id)
- self.assertIsNotNone(result)
- self.assertEquals(result[0].getvalue(), "X")
-
- @defer.inlineCallbacks
- @pytest.mark.usefixtures("method_tmpdir")
- def test_fetch_missing(self):
- manager = BlobManager(self.tempdir, self.uri, self.secret,
- self.secret, uuid4().hex)
- self.addCleanup(manager.close)
- blob_id = 'remote_only_blob_id'
- yield manager._encrypt_and_upload(blob_id, BytesIO("X"))
- yield manager.fetch_missing()
- result = yield manager.local.get(blob_id)
- self.assertIsNotNone(result)
- self.assertEquals(result.getvalue(), "X")
-
- @defer.inlineCallbacks
- @pytest.mark.usefixtures("method_tmpdir")
- def test_upload_then_delete_updates_list(self):
- manager = BlobManager('', self.uri, self.secret,
- self.secret, uuid4().hex)
- yield manager._encrypt_and_upload('blob_id1', BytesIO("1"))
- yield manager._encrypt_and_upload('blob_id2', BytesIO("2"))
- yield manager._delete_from_remote('blob_id1')
- blobs_list = yield manager.remote_list()
- self.assertEquals(set(['blob_id2']), set(blobs_list))
-
- @defer.inlineCallbacks
- @pytest.mark.usefixtures("method_tmpdir")
- def test_upload_then_delete_updates_list_using_namespace(self):
- manager = BlobManager('', self.uri, self.secret,
- self.secret, uuid4().hex)
- namespace = 'special_archives'
- yield manager._encrypt_and_upload('blob_id1', BytesIO("1"),
- namespace=namespace)
- yield manager._encrypt_and_upload('blob_id2', BytesIO("2"),
- namespace=namespace)
- yield manager._delete_from_remote('blob_id1', namespace=namespace)
- blobs_list = yield manager.remote_list(namespace=namespace)
- self.assertEquals(set(['blob_id2']), set(blobs_list))
diff --git a/testing/tests/server/test_config.py b/testing/tests/server/test_config.py
deleted file mode 100644
index dfb09f4c..00000000
--- a/testing/tests/server/test_config.py
+++ /dev/null
@@ -1,70 +0,0 @@
-# -*- coding: utf-8 -*-
-# test_config.py
-# Copyright (C) 2017 LEAP
-#
-# This program is free software: you can redistribute it and/or modify
-# it under the terms of the GNU General Public License as published by
-# the Free Software Foundation, either version 3 of the License, or
-# (at your option) any later version.
-#
-# This program is distributed in the hope that it will be useful,
-# but WITHOUT ANY WARRANTY; without even the implied warranty of
-# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
-# GNU General Public License for more details.
-#
-# You should have received a copy of the GNU General Public License
-# along with this program. If not, see <http://www.gnu.org/licenses/>.
-"""
-Tests for server configuration.
-"""
-
-from twisted.trial import unittest
-from pkg_resources import resource_filename
-
-from leap.soledad.server._config import _load_config
-from leap.soledad.server._config import CONFIG_DEFAULTS
-
-
-class ConfigurationParsingTest(unittest.TestCase):
-
- def setUp(self):
- self.maxDiff = None
-
- def test_use_defaults_on_failure(self):
- config = _load_config('this file will never exist')
- expected = CONFIG_DEFAULTS
- self.assertEquals(expected, config)
-
- def test_security_values_configuration(self):
- # given
- config_path = resource_filename('test_soledad',
- 'fixture_soledad.conf')
- # when
- config = _load_config(config_path)
-
- # then
- expected = {'members': ['user1', 'user2'],
- 'members_roles': ['role1', 'role2'],
- 'admins': ['user3', 'user4'],
- 'admins_roles': ['role3', 'role3']}
- self.assertDictEqual(expected, config['database-security'])
-
- def test_server_values_configuration(self):
- # given
- config_path = resource_filename('test_soledad',
- 'fixture_soledad.conf')
- # when
- config = _load_config(config_path)
-
- # then
- expected = {'couch_url':
- 'http://soledad:passwd@localhost:5984',
- 'create_cmd':
- 'sudo -u soledad-admin /usr/bin/soledad-create-userdb',
- 'admin_netrc':
- '/etc/couchdb/couchdb-soledad-admin.netrc',
- 'batching': False,
- 'blobs': False,
- 'services_tokens_file': '/etc/soledad/services.tokens',
- 'blobs_path': '/var/lib/soledad/blobs'}
- self.assertDictEqual(expected, config['soledad-server'])
diff --git a/testing/tests/server/test_incoming_flow_integration.py b/testing/tests/server/test_incoming_flow_integration.py
deleted file mode 100644
index b492534f..00000000
--- a/testing/tests/server/test_incoming_flow_integration.py
+++ /dev/null
@@ -1,97 +0,0 @@
-# -*- coding: utf-8 -*-
-# test_incoming_flow_integration.py
-# Copyright (C) 2017 LEAP
-#
-# This program is free software: you can redistribute it and/or modify
-# it under the terms of the GNU General Public License as published by
-# the Free Software Foundation, either version 3 of the License, or
-# (at your option) any later version.
-#
-# This program is distributed in the hope that it will be useful,
-# but WITHOUT ANY WARRANTY; without even the implied warranty of
-# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
-# GNU General Public License for more details.
-#
-# You should have received a copy of the GNU General Public License
-# along with this program. If not, see <http://www.gnu.org/licenses/>.
-"""
-Integration tests for the complete flow of IncomingBox feature
-"""
-import pytest
-from uuid import uuid4
-from twisted.trial import unittest
-from twisted.web.server import Site
-from twisted.internet import reactor
-from twisted.internet import defer
-from twisted.web.resource import Resource
-from zope.interface import implementer
-
-from leap.soledad.client.incoming import IncomingBoxProcessingLoop
-from leap.soledad.client.incoming import IncomingBox
-from leap.soledad.server import _blobs as server_blobs
-from leap.soledad.client._db.blobs import BlobManager
-from leap.soledad.server._incoming import IncomingResource
-from leap.soledad.server._blobs import BlobsServerState
-from leap.soledad.client import interfaces
-
-
-@implementer(interfaces.IIncomingBoxConsumer)
-class GoodConsumer(object):
- def __init__(self):
- self.name = 'GoodConsumer'
- self.processed, self.saved = [], []
-
- def process(self, item, item_id, encrypted=True):
- self.processed.append(item_id)
- return defer.succeed([item_id])
-
- def save(self, parts, item_id):
- self.saved.append(item_id)
- return defer.succeed(None)
-
-
-class IncomingFlowIntegrationTestCase(unittest.TestCase):
-
- def setUp(self):
- root = Resource()
- state = BlobsServerState('filesystem', blobs_path=self.tempdir)
- incoming_resource = IncomingResource(state)
- blobs_resource = server_blobs.BlobsResource("filesystem", self.tempdir)
- root.putChild('blobs', blobs_resource)
- root.putChild('incoming', incoming_resource)
- site = Site(root)
- self.port = reactor.listenTCP(0, site, interface='127.0.0.1')
- self.host = self.port.getHost()
- self.uri = 'http://%s:%s/' % (self.host.host, self.host.port)
- self.blobs_uri = self.uri + 'blobs/'
- self.incoming_uri = self.uri + 'incoming'
- self.user_id = 'user-' + uuid4().hex
- self.secret = 'A' * 96
- self.blob_manager = BlobManager(self.tempdir, self.blobs_uri,
- self.secret, self.secret,
- self.user_id)
- self.box = IncomingBox(self.blob_manager, 'MX')
- self.loop = IncomingBoxProcessingLoop(self.box)
- # FIXME: We use blob_manager client only to avoid DelayedCalls
- # Somehow treq being used here keeps a connection pool open
- self.client = self.blob_manager._client
-
- def fill(self, messages):
- deferreds = []
- for message_id, message in messages:
- uri = '%s/%s/%s' % (self.incoming_uri, self.user_id, message_id)
- deferreds.append(self.blob_manager._client.put(uri, data=message))
- return defer.gatherResults(deferreds)
-
- def tearDown(self):
- self.port.stopListening()
- self.blob_manager.close()
-
- @defer.inlineCallbacks
- @pytest.mark.usefixtures("method_tmpdir")
- def test_consume_a_incoming_message(self):
- yield self.fill([('msg1', 'blob')])
- consumer = GoodConsumer()
- self.loop.add_consumer(consumer)
- yield self.loop()
- self.assertIn('msg1', consumer.processed)
diff --git a/testing/tests/server/test_incoming_resource.py b/testing/tests/server/test_incoming_resource.py
deleted file mode 100644
index 0d4918b9..00000000
--- a/testing/tests/server/test_incoming_resource.py
+++ /dev/null
@@ -1,57 +0,0 @@
-# -*- coding: utf-8 -*-
-# test_incoming_resource.py
-# Copyright (C) 2017 LEAP
-#
-# This program is free software: you can redistribute it and/or modify
-# it under the terms of the GNU General Public License as published by
-# the Free Software Foundation, either version 3 of the License, or
-# (at your option) any later version.
-#
-# This program is distributed in the hope that it will be useful,
-# but WITHOUT ANY WARRANTY; without even the implied warranty of
-# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
-# GNU General Public License for more details.
-#
-# You should have received a copy of the GNU General Public License
-# along with this program. If not, see <http://www.gnu.org/licenses/>.
-"""
-Unit tests for incoming API resource
-"""
-from twisted.trial import unittest
-from twisted.web.test.test_web import DummyRequest
-from leap.soledad.server._incoming import IncomingResource
-from leap.soledad.server._incoming import IncomingFormatter
-from leap.soledad.common.crypto import EncryptionSchemes
-from io import BytesIO
-from uuid import uuid4
-from mock import Mock
-
-
-class IncomingResourceTestCase(unittest.TestCase):
-
- def setUp(self):
- self.couchdb = Mock()
- self.backend_factory = Mock()
- self.backend_factory.open_database.return_value = self.couchdb
- self.resource = IncomingResource(self.backend_factory)
- self.user_uuid = uuid4().hex
-
- def test_save_document(self):
- formatter = IncomingFormatter()
- doc_id, scheme = uuid4().hex, EncryptionSchemes.PUBKEY
- content = 'Incoming content'
- request = DummyRequest([self.user_uuid, doc_id])
- request.content = BytesIO(content)
- self.resource.render_PUT(request)
-
- open_database = self.backend_factory.open_database
- open_database.assert_called_once_with(self.user_uuid)
- self.couchdb.put_doc.assert_called_once()
- doc = self.couchdb.put_doc.call_args[0][0]
- self.assertEquals(doc_id, doc.doc_id)
- self.assertEquals(formatter.format(content, scheme), doc.content)
-
- def test_formatter(self):
- formatter = IncomingFormatter()
- formatted = formatter.format('content', EncryptionSchemes.PUBKEY)
- self.assertEquals(formatted['_enc_scheme'], EncryptionSchemes.PUBKEY)
diff --git a/testing/tests/server/test_incoming_server.py b/testing/tests/server/test_incoming_server.py
deleted file mode 100644
index 241bc581..00000000
--- a/testing/tests/server/test_incoming_server.py
+++ /dev/null
@@ -1,92 +0,0 @@
-# -*- coding: utf-8 -*-
-# test_incoming_server.py
-# Copyright (C) 2017 LEAP
-#
-# This program is free software: you can redistribute it and/or modify
-# it under the terms of the GNU General Public License as published by
-# the Free Software Foundation, either version 3 of the License, or
-# (at your option) any later version.
-#
-# This program is distributed in the hope that it will be useful,
-# but WITHOUT ANY WARRANTY; without even the implied warranty of
-# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
-# GNU General Public License for more details.
-#
-# You should have received a copy of the GNU General Public License
-# along with this program. If not, see <http://www.gnu.org/licenses/>.
-"""
-Integration tests for incoming API
-"""
-import pytest
-import json
-from io import BytesIO
-from uuid import uuid4
-from twisted.web.test.test_web import DummyRequest
-from twisted.web.server import Site
-from twisted.internet import reactor
-from twisted.internet import defer
-import treq
-
-from leap.soledad.server._incoming import IncomingResource
-from leap.soledad.server._blobs import BlobsServerState
-from leap.soledad.server._incoming import IncomingFormatter
-from leap.soledad.common.crypto import EncryptionSchemes
-from leap.soledad.common.blobs import Flags
-from test_soledad.util import CouchServerStateForTests
-from test_soledad.util import CouchDBTestCase
-
-
-class IncomingOnCouchServerTestCase(CouchDBTestCase):
-
- def setUp(self):
- self.port = None
-
- def tearDown(self):
- if self.port:
- self.port.stopListening()
-
- def prepare(self, backend):
- self.user_id = 'user-' + uuid4().hex
- if backend == 'couch':
- self.state = CouchServerStateForTests(self.couch_url)
- self.state.ensure_database(self.user_id)
- else:
- self.state = BlobsServerState(backend)
- root = IncomingResource(self.state)
- site = Site(root)
- self.port = reactor.listenTCP(0, site, interface='127.0.0.1')
- self.host = self.port.getHost()
- self.uri = 'http://%s:%s/' % (self.host.host, self.host.port)
-
- @defer.inlineCallbacks
- @pytest.mark.usefixtures("method_tmpdir")
- def test_put_incoming_creates_a_document_using_couch(self):
- self.prepare('couch')
- user_id, doc_id = self.user_id, uuid4().hex
- content, scheme = 'Hi', EncryptionSchemes.PUBKEY
- formatter = IncomingFormatter()
- incoming_endpoint = self.uri + '%s/%s' % (user_id, doc_id)
- yield treq.put(incoming_endpoint, BytesIO(content), persistent=False)
- db = self.state.open_database(user_id)
-
- doc = db.get_doc(doc_id)
- self.assertEquals(doc.content, formatter.format(content, scheme))
-
- @defer.inlineCallbacks
- @pytest.mark.usefixtures("method_tmpdir")
- def test_put_incoming_creates_a_blob_using_filesystem(self):
- self.prepare('filesystem')
- user_id, doc_id = self.user_id, uuid4().hex
- content = 'Hi'
- formatter = IncomingFormatter()
- incoming_endpoint = self.uri + '%s/%s' % (user_id, doc_id)
- yield treq.put(incoming_endpoint, BytesIO(content), persistent=False)
-
- db = self.state.open_database(user_id)
- request = DummyRequest([user_id, doc_id])
- yield db.read_blob(user_id, doc_id, request, 'MX')
- flags = db.get_flags(user_id, doc_id, request, 'MX')
- flags = json.loads(flags)
- expected = formatter.preamble(content, doc_id) + ' ' + content
- self.assertEquals(expected, request.written[0])
- self.assertIn(Flags.PENDING, flags)
diff --git a/testing/tests/server/test_server.py b/testing/tests/server/test_server.py
deleted file mode 100644
index 25f0cc2d..00000000
--- a/testing/tests/server/test_server.py
+++ /dev/null
@@ -1,230 +0,0 @@
-# -*- coding: utf-8 -*-
-# test_server.py
-# Copyright (C) 2013 LEAP
-#
-# This program is free software: you can redistribute it and/or modify
-# it under the terms of the GNU General Public License as published by
-# the Free Software Foundation, either version 3 of the License, or
-# (at your option) any later version.
-#
-# This program is distributed in the hope that it will be useful,
-# but WITHOUT ANY WARRANTY; without even the implied warranty of
-# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
-# GNU General Public License for more details.
-#
-# You should have received a copy of the GNU General Public License
-# along with this program. If not, see <http://www.gnu.org/licenses/>.
-"""
-Tests for server-related functionality.
-"""
-import binascii
-import os
-import pytest
-
-from six.moves.urllib.parse import urljoin
-from uuid import uuid4
-
-from twisted.internet import defer
-
-from leap.soledad.common.couch.state import CouchServerState
-from leap.soledad.common.couch import CouchDatabase
-from test_soledad.u1db_tests import TestCaseWithServer
-from test_soledad.util import CouchDBTestCase
-from test_soledad.util import (
- make_token_soledad_app,
- make_soledad_document_for_test,
- soledad_sync_target,
-)
-
-from leap.soledad.client import _crypto
-from leap.soledad.client import Soledad
-
-
-@pytest.mark.needs_couch
-@pytest.mark.usefixtures("method_tmpdir")
-class EncryptedSyncTestCase(
- CouchDBTestCase, TestCaseWithServer):
-
- """
- Tests for encrypted sync using Soledad server backed by a couch database.
- """
-
- # increase twisted.trial's timeout because large files syncing might take
- # some time to finish.
- timeout = 500
-
- @staticmethod
- def make_app_with_state(state):
- return make_token_soledad_app(state)
-
- make_document_for_test = make_soledad_document_for_test
-
- sync_target = soledad_sync_target
-
- def _soledad_instance(self, user=None, passphrase=u'123',
- prefix='',
- secrets_path='secrets.json',
- local_db_path='soledad.u1db',
- server_url='',
- cert_file=None, auth_token=None):
- """
- Instantiate Soledad.
- """
-
- # this callback ensures we save a document which is sent to the shared
- # db.
- def _put_doc_side_effect(doc):
- self._doc_put = doc
-
- if not server_url:
- # attempt to find the soledad server url
- server_address = None
- server = getattr(self, 'server', None)
- if server:
- server_address = getattr(self.server, 'server_address', None)
- else:
- host = self.port.getHost()
- server_address = (host.host, host.port)
- if server_address:
- server_url = 'http://%s:%d' % (server_address)
-
- return Soledad(
- user,
- passphrase,
- secrets_path=os.path.join(self.tempdir, prefix, secrets_path),
- local_db_path=os.path.join(
- self.tempdir, prefix, local_db_path),
- server_url=server_url,
- cert_file=cert_file,
- auth_token=auth_token,
- shared_db=self.get_default_shared_mock(_put_doc_side_effect))
-
- def make_app(self):
- self.request_state = CouchServerState(self.couch_url)
- return self.make_app_with_state(self.request_state)
-
- def setUp(self):
- CouchDBTestCase.setUp(self)
- TestCaseWithServer.setUp(self)
-
- def tearDown(self):
- CouchDBTestCase.tearDown(self)
- TestCaseWithServer.tearDown(self)
-
- def _test_encrypted_sym_sync(self, passphrase=u'123', doc_size=2,
- number_of_docs=1):
- """
- Test the complete syncing chain between two soledad dbs using a
- Soledad server backed by a couch database.
- """
- self.startTwistedServer()
- user = 'user-' + uuid4().hex
-
- # this will store all docs ids to avoid get_all_docs
- created_ids = []
-
- # instantiate soledad and create a document
- sol1 = self._soledad_instance(
- user=user,
- # token is verified in test_target.make_token_soledad_app
- auth_token='auth-token',
- passphrase=passphrase)
-
- # instantiate another soledad using the same secret as the previous
- # one (so we can correctly verify the mac of the synced document)
- sol2 = self._soledad_instance(
- user=user,
- prefix='x',
- auth_token='auth-token',
- secrets_path=sol1.secrets_path,
- passphrase=passphrase)
-
- # ensure remote db exists before syncing
- db = CouchDatabase.open_database(
- urljoin(self.couch_url, 'user-' + user),
- create=True)
-
- def _db1AssertEmptyDocList(results):
- _, doclist = results
- self.assertEqual([], doclist)
-
- def _db1CreateDocs(results):
- deferreds = []
- for i in xrange(number_of_docs):
- content = binascii.hexlify(os.urandom(doc_size / 2))
- d = sol1.create_doc({'data': content})
- d.addCallback(created_ids.append)
- deferreds.append(d)
- return defer.DeferredList(deferreds)
-
- def _db1AssertDocsSyncedToServer(results):
- self.assertEqual(number_of_docs, len(created_ids))
- for soldoc in created_ids:
- couchdoc = db.get_doc(soldoc.doc_id)
- self.assertTrue(couchdoc)
- # assert document structure in couch server
- self.assertEqual(soldoc.doc_id, couchdoc.doc_id)
- self.assertEqual(soldoc.rev, couchdoc.rev)
- couch_content = couchdoc.content.keys()
- self.assertEqual(['raw'], couch_content)
- content = couchdoc.get_json()
- self.assertTrue(_crypto.is_symmetrically_encrypted(content))
-
- d = sol1.get_all_docs()
- d.addCallback(_db1AssertEmptyDocList)
- d.addCallback(_db1CreateDocs)
- d.addCallback(lambda _: sol1.sync())
- d.addCallback(_db1AssertDocsSyncedToServer)
-
- def _db2AssertEmptyDocList(results):
- _, doclist = results
- self.assertEqual([], doclist)
-
- def _getAllDocsFromBothDbs(results):
- d1 = sol1.get_all_docs()
- d2 = sol2.get_all_docs()
- return defer.DeferredList([d1, d2])
-
- d.addCallback(lambda _: sol2.get_all_docs())
- d.addCallback(_db2AssertEmptyDocList)
- d.addCallback(lambda _: sol2.sync())
- d.addCallback(_getAllDocsFromBothDbs)
-
- def _assertDocSyncedFromDb1ToDb2(results):
- r1, r2 = results
- _, (gen1, doclist1) = r1
- _, (gen2, doclist2) = r2
- self.assertEqual(number_of_docs, gen1)
- self.assertEqual(number_of_docs, gen2)
- self.assertEqual(number_of_docs, len(doclist1))
- self.assertEqual(number_of_docs, len(doclist2))
- self.assertEqual(doclist1[0], doclist2[0])
-
- d.addCallback(_assertDocSyncedFromDb1ToDb2)
-
- def _cleanUp(results):
- db.delete_database()
- db.close()
- sol1.close()
- sol2.close()
-
- d.addCallback(_cleanUp)
-
- return d
-
- def test_encrypted_sym_sync(self):
- return self._test_encrypted_sym_sync()
-
- def test_encrypted_sym_sync_with_unicode_passphrase(self):
- """
- Test the complete syncing chain between two soledad dbs using a
- Soledad server backed by a couch database, using an unicode
- passphrase.
- """
- return self._test_encrypted_sym_sync(passphrase=u'ãáàäéàëíìïóòöõúùüñç')
-
- def test_sync_many_small_files(self):
- """
- Test if Soledad can sync many smallfiles.
- """
- return self._test_encrypted_sym_sync(doc_size=2, number_of_docs=100)
diff --git a/testing/tests/server/test_session.py b/testing/tests/server/test_session.py
deleted file mode 100644
index 3dbd2740..00000000
--- a/testing/tests/server/test_session.py
+++ /dev/null
@@ -1,195 +0,0 @@
-# -*- coding: utf-8 -*-
-# test_session.py
-# Copyright (C) 2017 LEAP
-#
-# This program is free software: you can redistribute it and/or modify
-# it under the terms of the GNU General Public License as published by
-# the Free Software Foundation, either version 3 of the License, or
-# (at your option) any later version.
-#
-# This program is distributed in the hope that it will be useful,
-# but WITHOUT ANY WARRANTY; without even the implied warranty of
-# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
-# GNU General Public License for more details.
-#
-# You should have received a copy of the GNU General Public License
-# along with this program. If not, see <http://www.gnu.org/licenses/>.
-"""
-Tests for server session entrypoint.
-"""
-from twisted.trial import unittest
-
-from twisted.cred import portal
-from twisted.cred.checkers import InMemoryUsernamePasswordDatabaseDontUse
-from twisted.cred.credentials import IUsernamePassword
-from twisted.web.resource import getChildForRequest
-from twisted.web.static import Data
-from twisted.web.test.requesthelper import DummyRequest
-from twisted.web.test.test_httpauth import b64encode
-from twisted.web.test.test_httpauth import Realm
-from twisted.web._auth.wrapper import UnauthorizedResource
-
-from leap.soledad.server.session import SoledadSession
-
-
-class SoledadSessionTestCase(unittest.TestCase):
- """
- Tests adapted from for
- L{twisted.web.test.test_httpauth.HTTPAuthSessionWrapper}.
- """
-
- def makeRequest(self, *args, **kwargs):
- request = DummyRequest(*args, **kwargs)
- request.path = '/'
- return request
-
- def setUp(self):
- self.username = b'foo bar'
- self.password = b'bar baz'
- self.avatarContent = b"contents of the avatar resource itself"
- self.childName = b"foo-child"
- self.childContent = b"contents of the foo child of the avatar"
- self.checker = InMemoryUsernamePasswordDatabaseDontUse()
- self.checker.addUser(self.username, self.password)
- self.avatar = Data(self.avatarContent, 'text/plain')
- self.avatar.putChild(
- self.childName, Data(self.childContent, 'text/plain'))
- self.avatars = {self.username: self.avatar}
- self.realm = Realm(self.avatars.get)
- self.portal = portal.Portal(self.realm, [self.checker])
- self.wrapper = SoledadSession(self.portal)
-
- def _authorizedTokenLogin(self, request):
- authorization = b64encode(
- self.username + b':' + self.password)
- request.requestHeaders.addRawHeader(b'authorization',
- b'Token ' + authorization)
- return getChildForRequest(self.wrapper, request)
-
- def test_getChildWithDefault(self):
- request = self.makeRequest([self.childName])
- child = getChildForRequest(self.wrapper, request)
- d = request.notifyFinish()
-
- def cbFinished(result):
- self.assertEqual(request.responseCode, 401)
-
- d.addCallback(cbFinished)
- request.render(child)
- return d
-
- def _invalidAuthorizationTest(self, response):
- request = self.makeRequest([self.childName])
- request.requestHeaders.addRawHeader(b'authorization', response)
- child = getChildForRequest(self.wrapper, request)
- d = request.notifyFinish()
-
- def cbFinished(result):
- self.assertEqual(request.responseCode, 401)
-
- d.addCallback(cbFinished)
- request.render(child)
- return d
-
- def test_getChildWithDefaultUnauthorizedUser(self):
- return self._invalidAuthorizationTest(
- b'Basic ' + b64encode(b'foo:bar'))
-
- def test_getChildWithDefaultUnauthorizedPassword(self):
- return self._invalidAuthorizationTest(
- b'Basic ' + b64encode(self.username + b':bar'))
-
- def test_getChildWithDefaultUnrecognizedScheme(self):
- return self._invalidAuthorizationTest(b'Quux foo bar baz')
-
- def test_getChildWithDefaultAuthorized(self):
- request = self.makeRequest([self.childName])
- child = self._authorizedTokenLogin(request)
- d = request.notifyFinish()
-
- def cbFinished(ignored):
- self.assertEqual(request.written, [self.childContent])
-
- d.addCallback(cbFinished)
- request.render(child)
- return d
-
- def test_renderAuthorized(self):
- # Request it exactly, not any of its children.
- request = self.makeRequest([])
- child = self._authorizedTokenLogin(request)
- d = request.notifyFinish()
-
- def cbFinished(ignored):
- self.assertEqual(request.written, [self.avatarContent])
-
- d.addCallback(cbFinished)
- request.render(child)
- return d
-
- def test_decodeRaises(self):
- request = self.makeRequest([self.childName])
- request.requestHeaders.addRawHeader(b'authorization',
- b'Token decode should fail')
- child = getChildForRequest(self.wrapper, request)
- self.assertIsInstance(child, UnauthorizedResource)
-
- def test_parseResponse(self):
- basicAuthorization = b'Basic abcdef123456'
- self.assertEqual(
- self.wrapper._parseHeader(basicAuthorization),
- None)
- tokenAuthorization = b'Token abcdef123456'
- self.assertEqual(
- self.wrapper._parseHeader(tokenAuthorization),
- b'abcdef123456')
-
- def test_unexpectedDecodeError(self):
-
- class UnexpectedException(Exception):
- pass
-
- class BadFactory(object):
- scheme = b'bad'
-
- def getChallenge(self, client):
- return {}
-
- def decode(self, response, request):
- print("decode raised")
- raise UnexpectedException()
-
- self.wrapper._credentialFactory = BadFactory()
- request = self.makeRequest([self.childName])
- request.requestHeaders.addRawHeader(b'authorization', b'Bad abc')
- child = getChildForRequest(self.wrapper, request)
- request.render(child)
- self.assertEqual(request.responseCode, 500)
- errors = self.flushLoggedErrors(UnexpectedException)
- self.assertEqual(len(errors), 1)
-
- def test_unexpectedLoginError(self):
- class UnexpectedException(Exception):
- pass
-
- class BrokenChecker(object):
- credentialInterfaces = (IUsernamePassword,)
-
- def requestAvatarId(self, credentials):
- raise UnexpectedException()
-
- self.portal.registerChecker(BrokenChecker())
- request = self.makeRequest([self.childName])
- child = self._authorizedTokenLogin(request)
- request.render(child)
- self.assertEqual(request.responseCode, 500)
- self.assertEqual(len(self.flushLoggedErrors(UnexpectedException)), 1)
-
- def test_cantAccessOtherUserPathByDefault(self):
- request = self.makeRequest([])
- # valid url_mapper path, but for another user
- request.path = '/blobs/another-user/'
- child = self._authorizedTokenLogin(request)
-
- request.render(child)
- self.assertEqual(request.responseCode, 500)
diff --git a/testing/tests/server/test_shared_db.py b/testing/tests/server/test_shared_db.py
deleted file mode 100644
index 96af6dff..00000000
--- a/testing/tests/server/test_shared_db.py
+++ /dev/null
@@ -1,69 +0,0 @@
-# -*- coding: utf-8 -*-
-# test_shared_db.py
-# Copyright (C) 2017 LEAP
-#
-# This program is free software: you can redistribute it and/or modify
-# it under the terms of the GNU General Public License as published by
-# the Free Software Foundation, either version 3 of the License, or
-# (at your option) any later version.
-#
-# This program is distributed in the hope that it will be useful,
-# but WITHOUT ANY WARRANTY; without even the implied warranty of
-# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
-# GNU General Public License for more details.
-#
-# You should have received a copy of the GNU General Public License
-# along with this program. If not, see <http://www.gnu.org/licenses/>.
-"""
-Tests for the shared db on server side.
-"""
-
-
-import pytest
-
-from twisted.trial import unittest
-
-from leap.soledad.client.shared_db import SoledadSharedDatabase
-from leap.soledad.common.document import SoledadDocument
-from leap.soledad.common.l2db.errors import RevisionConflict
-
-
-@pytest.mark.needs_couch
-class SharedDbTests(unittest.TestCase):
- """
- """
-
- URL = 'http://127.0.0.1:2424/shared'
- CREDS = {'token': {'uuid': 'an-uuid', 'token': 'an-auth-token'}}
-
- @pytest.fixture(autouse=True)
- def soledad_client(self, soledad_server, soledad_dbs):
- soledad_dbs('an-uuid')
- self._db = SoledadSharedDatabase.open_database(self.URL, self.CREDS)
-
- @pytest.mark.thisone
- def test_doc_update_succeeds(self):
- doc_id = 'some-random-doc'
- self.assertIsNone(self._db.get_doc(doc_id))
- # create a document in shared db
- doc = SoledadDocument(doc_id=doc_id)
- self._db.put_doc(doc)
- # update that document
- expected = {'new': 'content'}
- doc.content = expected
- self._db.put_doc(doc)
- # ensure expected content was saved
- doc = self._db.get_doc(doc_id)
- self.assertEqual(expected, doc.content)
-
- @pytest.mark.thisone
- def test_doc_update_fails_with_wrong_rev(self):
- # create a document in shared db
- doc_id = 'some-random-doc'
- self.assertIsNone(self._db.get_doc(doc_id))
- # create a document in shared db
- doc = SoledadDocument(doc_id=doc_id)
- self._db.put_doc(doc)
- # try to update document without including revision of old version
- doc.rev = 'wrong-rev'
- self.assertRaises(RevisionConflict, self._db.put_doc, doc)
diff --git a/testing/tests/server/test_tac.py b/testing/tests/server/test_tac.py
deleted file mode 100644
index 7bb50e35..00000000
--- a/testing/tests/server/test_tac.py
+++ /dev/null
@@ -1,87 +0,0 @@
-# -*- coding: utf-8 -*-
-# test_tac.py
-# Copyright (C) 2017 LEAP
-#
-# This program is free software: you can redistribute it and/or modify
-# it under the terms of the GNU General Public License as published by
-# the Free Software Foundation, either version 3 of the License, or
-# (at your option) any later version.
-#
-# This program is distributed in the hope that it will be useful,
-# but WITHOUT ANY WARRANTY; without even the implied warranty of
-# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
-# GNU General Public License for more details.
-#
-# You should have received a copy of the GNU General Public License
-# along with this program. If not, see <http://www.gnu.org/licenses/>.
-"""
-Tests for the localhost/public APIs using .tac file.
-See docs/auth.rst
-"""
-
-
-import os
-import signal
-import socket
-import pytest
-import treq
-
-from pkg_resources import resource_filename
-from twisted.trial import unittest
-from twisted.internet import defer, reactor
-from twisted.internet.protocol import ProcessProtocol
-from twisted.web.client import Agent
-
-
-TAC_FILE_PATH = resource_filename('leap.soledad.server', 'server.tac')
-
-
-class TacServerTestCase(unittest.TestCase):
-
- def test_tac_file_exists(self):
- msg = "server.tac used on this test case was expected to be at %s"
- self.assertTrue(os.path.isfile(TAC_FILE_PATH), msg % TAC_FILE_PATH)
-
- @defer.inlineCallbacks
- def test_local_public_default_ports_on_server_tac(self):
- yield self._spawnServer()
- result = yield self._get('http://localhost:2525/incoming')
- fail_msg = "Localhost endpoint must require authentication!"
- self.assertEquals(401, result.code, fail_msg)
-
- public_endpoint_url = 'http://%s:2424/' % self._get_public_ip()
- result = yield self._get(public_endpoint_url)
- self.assertEquals(200, result.code, "server info not accessible")
-
- result = yield self._get(public_endpoint_url + 'other')
- self.assertEquals(401, result.code, "public server lacks auth!")
-
- public_using_local_port_url = 'http://%s:2525/' % self._get_public_ip()
- with pytest.raises(Exception):
- yield self._get(public_using_local_port_url)
-
- def _spawnServer(self):
- protocol = ProcessProtocol()
- env = os.environ.get('VIRTUAL_ENV', '/usr')
- executable = os.path.join(env, 'bin', 'twistd')
- no_pid_argument = '--pidfile='
- args = [executable, no_pid_argument, '-noy', TAC_FILE_PATH]
- env = {'DEBUG_SERVER': 'yes'}
- t = reactor.spawnProcess(protocol, executable, args, env=env)
- self.addCleanup(os.kill, t.pid, signal.SIGKILL)
- self.addCleanup(t.loseConnection)
- return self._sleep(1) # it takes a while to start server
-
- def _sleep(self, time):
- d = defer.Deferred()
- reactor.callLater(time, d.callback, True)
- return d
-
- def _get(self, *args, **kwargs):
- kwargs['agent'] = Agent(reactor)
- return treq.get(*args, **kwargs)
-
- def _get_public_ip(self):
- s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
- s.connect(("8.8.8.8", 80))
- return s.getsockname()[0]
diff --git a/testing/tests/server/test_url_mapper.py b/testing/tests/server/test_url_mapper.py
deleted file mode 100644
index a04e7593..00000000
--- a/testing/tests/server/test_url_mapper.py
+++ /dev/null
@@ -1,133 +0,0 @@
-# -*- coding: utf-8 -*-
-# test_url_mapper.py
-# Copyright (C) 2017 LEAP
-#
-# This program is free software: you can redistribute it and/or modify
-# it under the terms of the GNU General Public License as published by
-# the Free Software Foundation, either version 3 of the License, or
-# (at your option) any later version.
-#
-# This program is distributed in the hope that it will be useful,
-# but WITHOUT ANY WARRANTY; without even the implied warranty of
-# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
-# GNU General Public License for more details.
-#
-# You should have received a copy of the GNU General Public License
-# along with this program. If not, see <http://www.gnu.org/licenses/>.
-"""
-Tests for server-related functionality.
-"""
-import pytest
-
-from twisted.trial import unittest
-from uuid import uuid4
-
-from leap.soledad.server.url_mapper import URLMapper
-
-
-class URLMapperTestCase(unittest.TestCase):
- """
- Test if the URLMapper behaves as expected.
-
- The following table lists the authorized actions among all possible
- u1db remote actions:
-
- URL path | Authorized actions
- --------------------------------------------------
- / | GET
- /shared-db | GET
- /shared-db/docs | -
- /shared-db/doc/{id} | -
- /shared-db/sync-from/{source} | -
- /user-db | -
- /user-db/docs | -
- /user-db/doc/{id} | -
- /user-db/sync-from/{source} | GET, PUT, POST
- """
-
- def setUp(self):
- self._uuid = uuid4().hex
- self._urlmap = URLMapper()
- self._dbname = 'user-%s' % self._uuid
-
- @pytest.mark.needs_couch
- def test_root_authorized(self):
- match = self._urlmap.match('/', 'GET')
- self.assertIsNotNone(match)
-
- def test_shared_authorized(self):
- self.assertIsNotNone(self._urlmap.match('/shared', 'GET'))
-
- def test_shared_unauthorized(self):
- self.assertIsNone(self._urlmap.match('/shared', 'PUT'))
- self.assertIsNone(self._urlmap.match('/shared', 'DELETE'))
- self.assertIsNone(self._urlmap.match('/shared', 'POST'))
-
- def test_shared_docs_unauthorized(self):
- self.assertIsNone(self._urlmap.match('/shared/docs', 'GET'))
- self.assertIsNone(self._urlmap.match('/shared/docs', 'PUT'))
- self.assertIsNone(self._urlmap.match('/shared/docs', 'DELETE'))
- self.assertIsNone(self._urlmap.match('/shared/docs', 'POST'))
-
- def test_shared_doc_authorized(self):
- match = self._urlmap.match('/shared/doc/x', 'GET')
- self.assertIsNotNone(match)
- self.assertEqual('x', match.get('id'))
-
- match = self._urlmap.match('/shared/doc/x', 'PUT')
- self.assertIsNotNone(match)
- self.assertEqual('x', match.get('id'))
-
- match = self._urlmap.match('/shared/doc/x', 'DELETE')
- self.assertIsNotNone(match)
- self.assertEqual('x', match.get('id'))
-
- def test_shared_doc_unauthorized(self):
- self.assertIsNone(self._urlmap.match('/shared/doc/x', 'POST'))
-
- def test_shared_sync_unauthorized(self):
- self.assertIsNone(self._urlmap.match('/shared/sync-from/x', 'GET'))
- self.assertIsNone(self._urlmap.match('/shared/sync-from/x', 'PUT'))
- self.assertIsNone(self._urlmap.match('/shared/sync-from/x', 'DELETE'))
- self.assertIsNone(self._urlmap.match('/shared/sync-from/x', 'POST'))
-
- def test_user_db_unauthorized(self):
- dbname = self._dbname
- self.assertIsNone(self._urlmap.match('/%s' % dbname, 'GET'))
- self.assertIsNone(self._urlmap.match('/%s' % dbname, 'PUT'))
- self.assertIsNone(self._urlmap.match('/%s' % dbname, 'DELETE'))
- self.assertIsNone(self._urlmap.match('/%s' % dbname, 'POST'))
-
- def test_user_db_docs_unauthorized(self):
- dbname = self._dbname
- self.assertIsNone(self._urlmap.match('/%s/docs' % dbname, 'GET'))
- self.assertIsNone(self._urlmap.match('/%s/docs' % dbname, 'PUT'))
- self.assertIsNone(self._urlmap.match('/%s/docs' % dbname, 'DELETE'))
- self.assertIsNone(self._urlmap.match('/%s/docs' % dbname, 'POST'))
-
- def test_user_db_doc_unauthorized(self):
- dbname = self._dbname
- self.assertIsNone(self._urlmap.match('/%s/doc/x' % dbname, 'GET'))
- self.assertIsNone(self._urlmap.match('/%s/doc/x' % dbname, 'PUT'))
- self.assertIsNone(self._urlmap.match('/%s/doc/x' % dbname, 'DELETE'))
- self.assertIsNone(self._urlmap.match('/%s/doc/x' % dbname, 'POST'))
-
- def test_user_db_sync_authorized(self):
- uuid = self._uuid
- dbname = self._dbname
- match = self._urlmap.match('/%s/sync-from/x' % dbname, 'GET')
- self.assertEqual(uuid, match.get('uuid'))
- self.assertEqual('x', match.get('source_replica_uid'))
-
- match = self._urlmap.match('/%s/sync-from/x' % dbname, 'PUT')
- self.assertEqual(uuid, match.get('uuid'))
- self.assertEqual('x', match.get('source_replica_uid'))
-
- match = self._urlmap.match('/%s/sync-from/x' % dbname, 'POST')
- self.assertEqual(uuid, match.get('uuid'))
- self.assertEqual('x', match.get('source_replica_uid'))
-
- def test_user_db_sync_unauthorized(self):
- dbname = self._dbname
- self.assertIsNone(
- self._urlmap.match('/%s/sync-from/x' % dbname, 'DELETE'))