summaryrefslogtreecommitdiff
path: root/tests/server
diff options
context:
space:
mode:
authordrebs <drebs@riseup.net>2017-09-17 12:08:25 -0300
committerdrebs <drebs@riseup.net>2017-09-17 15:50:55 -0300
commitcfff46ff9becdbe5cf48816870e625ed253ecc57 (patch)
tree8d239e4499f559d86ed17ea3632008303b25d485 /tests/server
parentf29abe28bd778838626d12fcabe3980a8ce4fa8c (diff)
[refactor] move tests to root of repository
Tests entrypoint was in a testing/ subfolder in the root of the repository. This was made mainly because we had some common files for tests and we didn't want to ship them (files in testing/test_soledad, which is itself a python package. This sometimes causes errors when loading tests (it seems setuptools is confused with having one python package in a subdirectory of another). This commit moves the tests entrypoint to the root of the repository. Closes: #8952
Diffstat (limited to 'tests/server')
-rw-r--r--tests/server/__init__.py0
-rw-r--r--tests/server/test__resource.py85
-rw-r--r--tests/server/test__server_info.py43
-rw-r--r--tests/server/test_auth.py138
-rw-r--r--tests/server/test_blobs_resource_validation.py63
-rw-r--r--tests/server/test_blobs_server.py286
-rw-r--r--tests/server/test_config.py70
-rw-r--r--tests/server/test_incoming_flow_integration.py97
-rw-r--r--tests/server/test_incoming_resource.py57
-rw-r--r--tests/server/test_incoming_server.py92
-rw-r--r--tests/server/test_server.py230
-rw-r--r--tests/server/test_session.py195
-rw-r--r--tests/server/test_shared_db.py69
l---------tests/server/test_soledad1
-rw-r--r--tests/server/test_tac.py87
-rw-r--r--tests/server/test_url_mapper.py133
16 files changed, 1646 insertions, 0 deletions
diff --git a/tests/server/__init__.py b/tests/server/__init__.py
new file mode 100644
index 00000000..e69de29b
--- /dev/null
+++ b/tests/server/__init__.py
diff --git a/tests/server/test__resource.py b/tests/server/test__resource.py
new file mode 100644
index 00000000..a43ac19f
--- /dev/null
+++ b/tests/server/test__resource.py
@@ -0,0 +1,85 @@
+# -*- coding: utf-8 -*-
+# test__resource.py
+# Copyright (C) 2017 LEAP
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see <http://www.gnu.org/licenses/>.
+"""
+Tests for Soledad server main resource.
+"""
+from twisted.trial import unittest
+from twisted.web.test.test_web import DummyRequest
+from twisted.web.wsgi import WSGIResource
+from twisted.web.resource import getChildForRequest
+from twisted.internet import reactor
+
+from leap.soledad.server._resource import PublicResource
+from leap.soledad.server._resource import LocalResource
+from leap.soledad.server._server_info import ServerInfo
+from leap.soledad.server._blobs import BlobsResource
+from leap.soledad.server._incoming import IncomingResource
+from leap.soledad.server.gzip_middleware import GzipMiddleware
+
+
+_pool = reactor.getThreadPool()
+
+
+class PublicResourceTestCase(unittest.TestCase):
+
+ def test_get_root(self):
+ blobs_resource = None # doesn't matter
+ resource = PublicResource(
+ blobs_resource=blobs_resource, sync_pool=_pool)
+ request = DummyRequest([''])
+ child = getChildForRequest(resource, request)
+ self.assertIsInstance(child, ServerInfo)
+
+ def test_get_blobs_enabled(self):
+ blobs_resource = BlobsResource("filesystem", '/tmp')
+ resource = PublicResource(
+ blobs_resource=blobs_resource, sync_pool=_pool)
+ request = DummyRequest(['blobs'])
+ child = getChildForRequest(resource, request)
+ self.assertIsInstance(child, BlobsResource)
+
+ def test_get_blobs_disabled(self):
+ blobs_resource = None
+ resource = PublicResource(
+ blobs_resource=blobs_resource, sync_pool=_pool)
+ request = DummyRequest(['blobs'])
+ child = getChildForRequest(resource, request)
+ # if blobs is disabled, the request should be routed to sync
+ self.assertIsInstance(child, WSGIResource)
+ self.assertIsInstance(child._application, GzipMiddleware)
+
+ def test_get_sync(self):
+ blobs_resource = None # doesn't matter
+ resource = PublicResource(
+ blobs_resource=blobs_resource, sync_pool=_pool)
+ request = DummyRequest(['user-db', 'sync-from', 'source-id'])
+ child = getChildForRequest(resource, request)
+ self.assertIsInstance(child, WSGIResource)
+ self.assertIsInstance(child._application, GzipMiddleware)
+
+ def test_no_incoming_on_public_resource(self):
+ resource = PublicResource(None, sync_pool=_pool)
+ request = DummyRequest(['incoming'])
+ child = getChildForRequest(resource, request)
+ # WSGIResource is returned if a path is unknown
+ self.assertIsInstance(child, WSGIResource)
+
+ def test_get_incoming(self):
+ resource = LocalResource()
+ request = DummyRequest(['incoming'])
+ child = getChildForRequest(resource, request)
+ self.assertIsInstance(child, IncomingResource)
diff --git a/tests/server/test__server_info.py b/tests/server/test__server_info.py
new file mode 100644
index 00000000..40567ef1
--- /dev/null
+++ b/tests/server/test__server_info.py
@@ -0,0 +1,43 @@
+# -*- coding: utf-8 -*-
+# test__server_info.py
+# Copyright (C) 2017 LEAP
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see <http://www.gnu.org/licenses/>.
+
+"""
+Tests for Soledad server information announcement.
+"""
+import json
+
+from twisted.trial import unittest
+from twisted.web.test.test_web import DummyRequest
+
+from leap.soledad.server._server_info import ServerInfo
+
+
+class ServerInfoTestCase(unittest.TestCase):
+
+ def test_blobs_enabled(self):
+ resource = ServerInfo(True)
+ response = resource.render(DummyRequest(['']))
+ _info = json.loads(response)
+ self.assertEquals(_info['blobs'], True)
+ self.assertTrue(isinstance(_info['version'], basestring))
+
+ def test_blobs_disabled(self):
+ resource = ServerInfo(False)
+ response = resource.render(DummyRequest(['']))
+ _info = json.loads(response)
+ self.assertEquals(_info['blobs'], False)
+ self.assertTrue(isinstance(_info['version'], basestring))
diff --git a/tests/server/test_auth.py b/tests/server/test_auth.py
new file mode 100644
index 00000000..78cf20ab
--- /dev/null
+++ b/tests/server/test_auth.py
@@ -0,0 +1,138 @@
+# -*- coding: utf-8 -*-
+# test_auth.py
+# Copyright (C) 2017 LEAP
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see <http://www.gnu.org/licenses/>.
+"""
+Tests for auth pieces.
+"""
+import os
+import collections
+import pytest
+
+from contextlib import contextmanager
+
+from twisted.cred.credentials import UsernamePassword
+from twisted.cred.error import UnauthorizedLogin
+from twisted.internet import reactor
+from twisted.internet.defer import inlineCallbacks
+from twisted.trial import unittest
+from twisted.web.resource import IResource
+from twisted.web.test import test_httpauth
+
+import leap.soledad.server.auth as auth_module
+from leap.soledad.server.auth import SoledadRealm
+from leap.soledad.server.auth import CouchDBTokenChecker
+from leap.soledad.server.auth import FileTokenChecker
+from leap.soledad.server.auth import TokenCredentialFactory
+from leap.soledad.server._resource import PublicResource
+
+
+class SoledadRealmTestCase(unittest.TestCase):
+
+ def test_returned_resource(self):
+ # we have to pass a pool to the realm , otherwise tests will hang
+ conf = {'blobs': False}
+ pool = reactor.getThreadPool()
+ realm = SoledadRealm(conf=conf, sync_pool=pool)
+ iface, avatar, logout = realm.requestAvatar('any', None, IResource)
+ self.assertIsInstance(avatar, PublicResource)
+ self.assertIsNone(logout())
+
+
+class DummyServer(object):
+ """
+ I fake the `couchdb.client.Server` GET api and always return the token
+ given on my creation.
+ """
+
+ def __init__(self, token):
+ self._token = token
+
+ def get(self, _):
+ return self._token
+
+
+@contextmanager
+def dummy_server(token):
+ yield collections.defaultdict(lambda: DummyServer(token))
+
+
+class CouchDBTokenCheckerTestCase(unittest.TestCase):
+
+ @inlineCallbacks
+ def test_good_creds(self):
+ # set up a dummy server which always return a *valid* token document
+ token = {'user_id': 'user', 'type': 'Token'}
+ server = dummy_server(token)
+ # setup the checker with the custom server
+ checker = CouchDBTokenChecker()
+ auth_module.couch_server = lambda url: server
+ # assert the checker *can* verify the creds
+ creds = UsernamePassword('user', 'pass')
+ avatarId = yield checker.requestAvatarId(creds)
+ self.assertEqual('user', avatarId)
+
+ @inlineCallbacks
+ def test_bad_creds(self):
+ # set up a dummy server which always return an *invalid* token document
+ token = None
+ server = dummy_server(token)
+ # setup the checker with the custom server
+ checker = CouchDBTokenChecker()
+ auth_module.couch_server = lambda url: server
+ # assert the checker *cannot* verify the creds
+ creds = UsernamePassword('user', '')
+ with self.assertRaises(UnauthorizedLogin):
+ yield checker.requestAvatarId(creds)
+
+
+class FileTokenCheckerTestCase(unittest.TestCase):
+
+ @inlineCallbacks
+ @pytest.mark.usefixtures("method_tmpdir")
+ def test_good_creds(self):
+ auth_file_path = os.path.join(self.tempdir, 'auth.file')
+ with open(auth_file_path, 'w') as tempfile:
+ tempfile.write('goodservice:goodtoken')
+ # setup the checker with the auth tokens file
+ conf = {'services_tokens_file': auth_file_path}
+ checker = FileTokenChecker(conf)
+ # assert the checker *can* verify the creds
+ creds = UsernamePassword('goodservice', 'goodtoken')
+ avatarId = yield checker.requestAvatarId(creds)
+ self.assertEqual('goodservice', avatarId)
+
+ @inlineCallbacks
+ @pytest.mark.usefixtures("method_tmpdir")
+ def test_bad_creds(self):
+ auth_file_path = os.path.join(self.tempdir, 'auth.file')
+ with open(auth_file_path, 'w') as tempfile:
+ tempfile.write('service:token')
+ # setup the checker with the auth tokens file
+ conf = {'services_tokens_file': auth_file_path}
+ checker = FileTokenChecker(conf)
+ # assert the checker *cannot* verify the creds
+ creds = UsernamePassword('service', 'wrongtoken')
+ with self.assertRaises(UnauthorizedLogin):
+ yield checker.requestAvatarId(creds)
+
+
+class TokenCredentialFactoryTestcase(
+ test_httpauth.RequestMixin, test_httpauth.BasicAuthTestsMixin,
+ unittest.TestCase):
+
+ def setUp(self):
+ test_httpauth.BasicAuthTestsMixin.setUp(self)
+ self.credentialFactory = TokenCredentialFactory()
diff --git a/tests/server/test_blobs_resource_validation.py b/tests/server/test_blobs_resource_validation.py
new file mode 100644
index 00000000..9f6dfc2f
--- /dev/null
+++ b/tests/server/test_blobs_resource_validation.py
@@ -0,0 +1,63 @@
+# -*- coding: utf-8 -*-
+# test_blobs_resource_validation.py
+# Copyright (C) 2017 LEAP
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see <http://www.gnu.org/licenses/>.
+"""
+Tests for invalid user or blob_id on blobs resource
+"""
+import pytest
+from twisted.trial import unittest
+from twisted.web.test.test_web import DummyRequest
+from leap.soledad.server import _blobs as server_blobs
+
+
+class BlobServerTestCase(unittest.TestCase):
+
+ @pytest.mark.usefixtures("method_tmpdir")
+ def setUp(self):
+ self.resource = server_blobs.BlobsResource("filesystem", self.tempdir)
+
+ @pytest.mark.usefixtures("method_tmpdir")
+ def test_valid_arguments(self):
+ request = DummyRequest(['v4l1d-us3r', 'v4l1d-bl0b-1d'])
+ self.assertTrue(self.resource._validate(request))
+
+ @pytest.mark.usefixtures("method_tmpdir")
+ def test_invalid_user_get(self):
+ request = DummyRequest(['invalid user', 'valid-blob-id'])
+ request.path = '/blobs/'
+ with pytest.raises(Exception):
+ self.resource.render_GET(request)
+
+ @pytest.mark.usefixtures("method_tmpdir")
+ def test_invalid_user_put(self):
+ request = DummyRequest(['invalid user', 'valid-blob-id'])
+ request.path = '/blobs/'
+ with pytest.raises(Exception):
+ self.resource.render_PUT(request)
+
+ @pytest.mark.usefixtures("method_tmpdir")
+ def test_invalid_blob_id_get(self):
+ request = DummyRequest(['valid-user', 'invalid blob id'])
+ request.path = '/blobs/'
+ with pytest.raises(Exception):
+ self.resource.render_GET(request)
+
+ @pytest.mark.usefixtures("method_tmpdir")
+ def test_invalid_blob_id_put(self):
+ request = DummyRequest(['valid-user', 'invalid blob id'])
+ request.path = '/blobs/'
+ with pytest.raises(Exception):
+ self.resource.render_PUT(request)
diff --git a/tests/server/test_blobs_server.py b/tests/server/test_blobs_server.py
new file mode 100644
index 00000000..9eddf108
--- /dev/null
+++ b/tests/server/test_blobs_server.py
@@ -0,0 +1,286 @@
+# -*- coding: utf-8 -*-
+# test_blobs_server.py
+# Copyright (C) 2017 LEAP
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see <http://www.gnu.org/licenses/>.
+"""
+Integration tests for blobs server
+"""
+import os
+import pytest
+from uuid import uuid4
+from io import BytesIO
+from twisted.trial import unittest
+from twisted.web.server import Site
+from twisted.internet import reactor
+from twisted.internet import defer
+from treq._utils import set_global_pool
+
+from leap.soledad.common.blobs import Flags
+from leap.soledad.server import _blobs as server_blobs
+from leap.soledad.client._db.blobs import BlobManager
+from leap.soledad.client._db.blobs import BlobAlreadyExistsError
+from leap.soledad.client._db.blobs import InvalidFlagsError
+from leap.soledad.client._db.blobs import SoledadError
+
+
+class BlobServerTestCase(unittest.TestCase):
+
+ def setUp(self):
+ root = server_blobs.BlobsResource("filesystem", self.tempdir)
+ site = Site(root)
+ self.port = reactor.listenTCP(0, site, interface='127.0.0.1')
+ self.host = self.port.getHost()
+ self.uri = 'http://%s:%s/' % (self.host.host, self.host.port)
+ self.secret = 'A' * 96
+ set_global_pool(None)
+
+ def tearDown(self):
+ self.port.stopListening()
+
+ @defer.inlineCallbacks
+ @pytest.mark.usefixtures("method_tmpdir")
+ def test_upload_download(self):
+ manager = BlobManager('', self.uri, self.secret,
+ self.secret, uuid4().hex)
+ fd = BytesIO("save me")
+ yield manager._encrypt_and_upload('blob_id', fd)
+ blob, size = yield manager._download_and_decrypt('blob_id')
+ self.assertEquals(blob.getvalue(), "save me")
+
+ @defer.inlineCallbacks
+ @pytest.mark.usefixtures("method_tmpdir")
+ def test_set_get_flags(self):
+ manager = BlobManager('', self.uri, self.secret,
+ self.secret, uuid4().hex)
+ fd = BytesIO("flag me")
+ yield manager._encrypt_and_upload('blob_id', fd)
+ yield manager.set_flags('blob_id', [Flags.PROCESSING])
+ flags = yield manager.get_flags('blob_id')
+ self.assertEquals([Flags.PROCESSING], flags)
+
+ @defer.inlineCallbacks
+ @pytest.mark.usefixtures("method_tmpdir")
+ def test_set_flags_raises_if_no_blob_found(self):
+ manager = BlobManager('', self.uri, self.secret,
+ self.secret, uuid4().hex)
+ with pytest.raises(SoledadError):
+ yield manager.set_flags('missing_id', [Flags.PENDING])
+
+ @defer.inlineCallbacks
+ @pytest.mark.usefixtures("method_tmpdir")
+ def test_list_filter_flag(self):
+ manager = BlobManager('', self.uri, self.secret,
+ self.secret, uuid4().hex)
+ fd = BytesIO("flag me")
+ yield manager._encrypt_and_upload('blob_id', fd)
+ yield manager.set_flags('blob_id', [Flags.PROCESSING])
+ blobs_list = yield manager.remote_list(filter_flag=Flags.PENDING)
+ self.assertEquals([], blobs_list)
+ blobs_list = yield manager.remote_list(filter_flag=Flags.PROCESSING)
+ self.assertEquals(['blob_id'], blobs_list)
+
+ @defer.inlineCallbacks
+ @pytest.mark.usefixtures("method_tmpdir")
+ def test_list_filter_flag_order_by_date(self):
+ manager = BlobManager('', self.uri, self.secret,
+ self.secret, uuid4().hex)
+ yield manager._encrypt_and_upload('blob_id1', BytesIO("x"))
+ yield manager._encrypt_and_upload('blob_id2', BytesIO("x"))
+ yield manager._encrypt_and_upload('blob_id3', BytesIO("x"))
+ yield manager.set_flags('blob_id1', [Flags.PROCESSING])
+ yield manager.set_flags('blob_id2', [Flags.PROCESSING])
+ yield manager.set_flags('blob_id3', [Flags.PROCESSING])
+ blobs_list = yield manager.remote_list(filter_flag=Flags.PROCESSING,
+ order_by='+date')
+ expected_list = ['blob_id1', 'blob_id2', 'blob_id3']
+ self.assertEquals(expected_list, blobs_list)
+ blobs_list = yield manager.remote_list(filter_flag=Flags.PROCESSING,
+ order_by='-date')
+ self.assertEquals(list(reversed(expected_list)), blobs_list)
+
+ @defer.inlineCallbacks
+ @pytest.mark.usefixtures("method_tmpdir")
+ def test_cant_set_invalid_flags(self):
+ manager = BlobManager('', self.uri, self.secret,
+ self.secret, uuid4().hex)
+ fd = BytesIO("flag me")
+ yield manager._encrypt_and_upload('blob_id', fd)
+ with pytest.raises(InvalidFlagsError):
+ yield manager.set_flags('blob_id', ['invalid'])
+ flags = yield manager.get_flags('blob_id')
+ self.assertEquals([], flags)
+
+ @defer.inlineCallbacks
+ @pytest.mark.usefixtures("method_tmpdir")
+ def test_get_empty_flags(self):
+ manager = BlobManager('', self.uri, self.secret,
+ self.secret, uuid4().hex)
+ fd = BytesIO("flag me")
+ yield manager._encrypt_and_upload('blob_id', fd)
+ flags = yield manager.get_flags('blob_id')
+ self.assertEquals([], flags)
+
+ @defer.inlineCallbacks
+ @pytest.mark.usefixtures("method_tmpdir")
+ def test_flags_ignored_by_listing(self):
+ manager = BlobManager('', self.uri, self.secret,
+ self.secret, uuid4().hex)
+ fd = BytesIO("flag me")
+ yield manager._encrypt_and_upload('blob_id', fd)
+ yield manager.set_flags('blob_id', [Flags.PROCESSING])
+ blobs_list = yield manager.remote_list()
+ self.assertEquals(['blob_id'], blobs_list)
+
+ @defer.inlineCallbacks
+ @pytest.mark.usefixtures("method_tmpdir")
+ def test_upload_changes_remote_list(self):
+ manager = BlobManager('', self.uri, self.secret,
+ self.secret, uuid4().hex)
+ yield manager._encrypt_and_upload('blob_id1', BytesIO("1"))
+ yield manager._encrypt_and_upload('blob_id2', BytesIO("2"))
+ blobs_list = yield manager.remote_list()
+ self.assertEquals(set(['blob_id1', 'blob_id2']), set(blobs_list))
+
+ @defer.inlineCallbacks
+ @pytest.mark.usefixtures("method_tmpdir")
+ def test_list_orders_by_date(self):
+ user_uid = uuid4().hex
+ manager = BlobManager('', self.uri, self.secret,
+ self.secret, user_uid)
+ yield manager._encrypt_and_upload('blob_id1', BytesIO("1"))
+ yield manager._encrypt_and_upload('blob_id2', BytesIO("2"))
+ blobs_list = yield manager.remote_list(order_by='date')
+ self.assertEquals(['blob_id1', 'blob_id2'], blobs_list)
+ parts = [user_uid, 'default', 'b', 'blo', 'blob_i', 'blob_id1']
+ self.__touch(self.tempdir, *parts)
+ blobs_list = yield manager.remote_list(order_by='+date')
+ self.assertEquals(['blob_id2', 'blob_id1'], blobs_list)
+ blobs_list = yield manager.remote_list(order_by='-date')
+ self.assertEquals(['blob_id1', 'blob_id2'], blobs_list)
+
+ @defer.inlineCallbacks
+ @pytest.mark.usefixtures("method_tmpdir")
+ def test_count(self):
+ manager = BlobManager('', self.uri, self.secret,
+ self.secret, uuid4().hex)
+ deferreds = []
+ for i in range(10):
+ deferreds.append(manager._encrypt_and_upload(str(i), BytesIO("1")))
+ yield defer.gatherResults(deferreds)
+
+ result = yield manager.count()
+ self.assertEquals({"count": len(deferreds)}, result)
+
+ @defer.inlineCallbacks
+ @pytest.mark.usefixtures("method_tmpdir")
+ def test_list_restricted_by_namespace(self):
+ manager = BlobManager('', self.uri, self.secret,
+ self.secret, uuid4().hex)
+ namespace = 'incoming'
+ yield manager._encrypt_and_upload('blob_id1', BytesIO("1"),
+ namespace=namespace)
+ yield manager._encrypt_and_upload('blob_id2', BytesIO("2"))
+ blobs_list = yield manager.remote_list(namespace=namespace)
+ self.assertEquals(['blob_id1'], blobs_list)
+
+ @defer.inlineCallbacks
+ @pytest.mark.usefixtures("method_tmpdir")
+ def test_list_default_doesnt_list_other_namespaces(self):
+ manager = BlobManager('', self.uri, self.secret,
+ self.secret, uuid4().hex)
+ namespace = 'incoming'
+ yield manager._encrypt_and_upload('blob_id1', BytesIO("1"),
+ namespace=namespace)
+ yield manager._encrypt_and_upload('blob_id2', BytesIO("2"))
+ blobs_list = yield manager.remote_list()
+ self.assertEquals(['blob_id2'], blobs_list)
+
+ @defer.inlineCallbacks
+ @pytest.mark.usefixtures("method_tmpdir")
+ def test_download_from_namespace(self):
+ manager = BlobManager('', self.uri, self.secret,
+ self.secret, uuid4().hex)
+ namespace, blob_id, content = 'incoming', 'blob_id1', 'test'
+ yield manager._encrypt_and_upload(blob_id, BytesIO(content),
+ namespace=namespace)
+ got_blob = yield manager._download_and_decrypt(blob_id, namespace)
+ self.assertEquals(content, got_blob[0].getvalue())
+
+ def __touch(self, *args):
+ path = os.path.join(*args)
+ with open(path, 'a'):
+ os.utime(path, None)
+
+ @defer.inlineCallbacks
+ @pytest.mark.usefixtures("method_tmpdir")
+ def test_upload_deny_duplicates(self):
+ manager = BlobManager('', self.uri, self.secret,
+ self.secret, uuid4().hex)
+ fd = BytesIO("save me")
+ yield manager._encrypt_and_upload('blob_id', fd)
+ fd = BytesIO("save me")
+ with pytest.raises(BlobAlreadyExistsError):
+ yield manager._encrypt_and_upload('blob_id', fd)
+
+ @defer.inlineCallbacks
+ @pytest.mark.usefixtures("method_tmpdir")
+ def test_send_missing(self):
+ manager = BlobManager(self.tempdir, self.uri, self.secret,
+ self.secret, uuid4().hex)
+ self.addCleanup(manager.close)
+ blob_id = 'local_only_blob_id'
+ yield manager.local.put(blob_id, BytesIO("X"), size=1)
+ yield manager.send_missing()
+ result = yield manager._download_and_decrypt(blob_id)
+ self.assertIsNotNone(result)
+ self.assertEquals(result[0].getvalue(), "X")
+
+ @defer.inlineCallbacks
+ @pytest.mark.usefixtures("method_tmpdir")
+ def test_fetch_missing(self):
+ manager = BlobManager(self.tempdir, self.uri, self.secret,
+ self.secret, uuid4().hex)
+ self.addCleanup(manager.close)
+ blob_id = 'remote_only_blob_id'
+ yield manager._encrypt_and_upload(blob_id, BytesIO("X"))
+ yield manager.fetch_missing()
+ result = yield manager.local.get(blob_id)
+ self.assertIsNotNone(result)
+ self.assertEquals(result.getvalue(), "X")
+
+ @defer.inlineCallbacks
+ @pytest.mark.usefixtures("method_tmpdir")
+ def test_upload_then_delete_updates_list(self):
+ manager = BlobManager('', self.uri, self.secret,
+ self.secret, uuid4().hex)
+ yield manager._encrypt_and_upload('blob_id1', BytesIO("1"))
+ yield manager._encrypt_and_upload('blob_id2', BytesIO("2"))
+ yield manager._delete_from_remote('blob_id1')
+ blobs_list = yield manager.remote_list()
+ self.assertEquals(set(['blob_id2']), set(blobs_list))
+
+ @defer.inlineCallbacks
+ @pytest.mark.usefixtures("method_tmpdir")
+ def test_upload_then_delete_updates_list_using_namespace(self):
+ manager = BlobManager('', self.uri, self.secret,
+ self.secret, uuid4().hex)
+ namespace = 'special_archives'
+ yield manager._encrypt_and_upload('blob_id1', BytesIO("1"),
+ namespace=namespace)
+ yield manager._encrypt_and_upload('blob_id2', BytesIO("2"),
+ namespace=namespace)
+ yield manager._delete_from_remote('blob_id1', namespace=namespace)
+ blobs_list = yield manager.remote_list(namespace=namespace)
+ self.assertEquals(set(['blob_id2']), set(blobs_list))
diff --git a/tests/server/test_config.py b/tests/server/test_config.py
new file mode 100644
index 00000000..dfb09f4c
--- /dev/null
+++ b/tests/server/test_config.py
@@ -0,0 +1,70 @@
+# -*- coding: utf-8 -*-
+# test_config.py
+# Copyright (C) 2017 LEAP
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see <http://www.gnu.org/licenses/>.
+"""
+Tests for server configuration.
+"""
+
+from twisted.trial import unittest
+from pkg_resources import resource_filename
+
+from leap.soledad.server._config import _load_config
+from leap.soledad.server._config import CONFIG_DEFAULTS
+
+
+class ConfigurationParsingTest(unittest.TestCase):
+
+ def setUp(self):
+ self.maxDiff = None
+
+ def test_use_defaults_on_failure(self):
+ config = _load_config('this file will never exist')
+ expected = CONFIG_DEFAULTS
+ self.assertEquals(expected, config)
+
+ def test_security_values_configuration(self):
+ # given
+ config_path = resource_filename('test_soledad',
+ 'fixture_soledad.conf')
+ # when
+ config = _load_config(config_path)
+
+ # then
+ expected = {'members': ['user1', 'user2'],
+ 'members_roles': ['role1', 'role2'],
+ 'admins': ['user3', 'user4'],
+ 'admins_roles': ['role3', 'role3']}
+ self.assertDictEqual(expected, config['database-security'])
+
+ def test_server_values_configuration(self):
+ # given
+ config_path = resource_filename('test_soledad',
+ 'fixture_soledad.conf')
+ # when
+ config = _load_config(config_path)
+
+ # then
+ expected = {'couch_url':
+ 'http://soledad:passwd@localhost:5984',
+ 'create_cmd':
+ 'sudo -u soledad-admin /usr/bin/soledad-create-userdb',
+ 'admin_netrc':
+ '/etc/couchdb/couchdb-soledad-admin.netrc',
+ 'batching': False,
+ 'blobs': False,
+ 'services_tokens_file': '/etc/soledad/services.tokens',
+ 'blobs_path': '/var/lib/soledad/blobs'}
+ self.assertDictEqual(expected, config['soledad-server'])
diff --git a/tests/server/test_incoming_flow_integration.py b/tests/server/test_incoming_flow_integration.py
new file mode 100644
index 00000000..b492534f
--- /dev/null
+++ b/tests/server/test_incoming_flow_integration.py
@@ -0,0 +1,97 @@
+# -*- coding: utf-8 -*-
+# test_incoming_flow_integration.py
+# Copyright (C) 2017 LEAP
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see <http://www.gnu.org/licenses/>.
+"""
+Integration tests for the complete flow of IncomingBox feature
+"""
+import pytest
+from uuid import uuid4
+from twisted.trial import unittest
+from twisted.web.server import Site
+from twisted.internet import reactor
+from twisted.internet import defer
+from twisted.web.resource import Resource
+from zope.interface import implementer
+
+from leap.soledad.client.incoming import IncomingBoxProcessingLoop
+from leap.soledad.client.incoming import IncomingBox
+from leap.soledad.server import _blobs as server_blobs
+from leap.soledad.client._db.blobs import BlobManager
+from leap.soledad.server._incoming import IncomingResource
+from leap.soledad.server._blobs import BlobsServerState
+from leap.soledad.client import interfaces
+
+
+@implementer(interfaces.IIncomingBoxConsumer)
+class GoodConsumer(object):
+ def __init__(self):
+ self.name = 'GoodConsumer'
+ self.processed, self.saved = [], []
+
+ def process(self, item, item_id, encrypted=True):
+ self.processed.append(item_id)
+ return defer.succeed([item_id])
+
+ def save(self, parts, item_id):
+ self.saved.append(item_id)
+ return defer.succeed(None)
+
+
+class IncomingFlowIntegrationTestCase(unittest.TestCase):
+
+ def setUp(self):
+ root = Resource()
+ state = BlobsServerState('filesystem', blobs_path=self.tempdir)
+ incoming_resource = IncomingResource(state)
+ blobs_resource = server_blobs.BlobsResource("filesystem", self.tempdir)
+ root.putChild('blobs', blobs_resource)
+ root.putChild('incoming', incoming_resource)
+ site = Site(root)
+ self.port = reactor.listenTCP(0, site, interface='127.0.0.1')
+ self.host = self.port.getHost()
+ self.uri = 'http://%s:%s/' % (self.host.host, self.host.port)
+ self.blobs_uri = self.uri + 'blobs/'
+ self.incoming_uri = self.uri + 'incoming'
+ self.user_id = 'user-' + uuid4().hex
+ self.secret = 'A' * 96
+ self.blob_manager = BlobManager(self.tempdir, self.blobs_uri,
+ self.secret, self.secret,
+ self.user_id)
+ self.box = IncomingBox(self.blob_manager, 'MX')
+ self.loop = IncomingBoxProcessingLoop(self.box)
+ # FIXME: We use blob_manager client only to avoid DelayedCalls
+ # Somehow treq being used here keeps a connection pool open
+ self.client = self.blob_manager._client
+
+ def fill(self, messages):
+ deferreds = []
+ for message_id, message in messages:
+ uri = '%s/%s/%s' % (self.incoming_uri, self.user_id, message_id)
+ deferreds.append(self.blob_manager._client.put(uri, data=message))
+ return defer.gatherResults(deferreds)
+
+ def tearDown(self):
+ self.port.stopListening()
+ self.blob_manager.close()
+
+ @defer.inlineCallbacks
+ @pytest.mark.usefixtures("method_tmpdir")
+ def test_consume_a_incoming_message(self):
+ yield self.fill([('msg1', 'blob')])
+ consumer = GoodConsumer()
+ self.loop.add_consumer(consumer)
+ yield self.loop()
+ self.assertIn('msg1', consumer.processed)
diff --git a/tests/server/test_incoming_resource.py b/tests/server/test_incoming_resource.py
new file mode 100644
index 00000000..0d4918b9
--- /dev/null
+++ b/tests/server/test_incoming_resource.py
@@ -0,0 +1,57 @@
+# -*- coding: utf-8 -*-
+# test_incoming_resource.py
+# Copyright (C) 2017 LEAP
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see <http://www.gnu.org/licenses/>.
+"""
+Unit tests for incoming API resource
+"""
+from twisted.trial import unittest
+from twisted.web.test.test_web import DummyRequest
+from leap.soledad.server._incoming import IncomingResource
+from leap.soledad.server._incoming import IncomingFormatter
+from leap.soledad.common.crypto import EncryptionSchemes
+from io import BytesIO
+from uuid import uuid4
+from mock import Mock
+
+
+class IncomingResourceTestCase(unittest.TestCase):
+
+ def setUp(self):
+ self.couchdb = Mock()
+ self.backend_factory = Mock()
+ self.backend_factory.open_database.return_value = self.couchdb
+ self.resource = IncomingResource(self.backend_factory)
+ self.user_uuid = uuid4().hex
+
+ def test_save_document(self):
+ formatter = IncomingFormatter()
+ doc_id, scheme = uuid4().hex, EncryptionSchemes.PUBKEY
+ content = 'Incoming content'
+ request = DummyRequest([self.user_uuid, doc_id])
+ request.content = BytesIO(content)
+ self.resource.render_PUT(request)
+
+ open_database = self.backend_factory.open_database
+ open_database.assert_called_once_with(self.user_uuid)
+ self.couchdb.put_doc.assert_called_once()
+ doc = self.couchdb.put_doc.call_args[0][0]
+ self.assertEquals(doc_id, doc.doc_id)
+ self.assertEquals(formatter.format(content, scheme), doc.content)
+
+ def test_formatter(self):
+ formatter = IncomingFormatter()
+ formatted = formatter.format('content', EncryptionSchemes.PUBKEY)
+ self.assertEquals(formatted['_enc_scheme'], EncryptionSchemes.PUBKEY)
diff --git a/tests/server/test_incoming_server.py b/tests/server/test_incoming_server.py
new file mode 100644
index 00000000..241bc581
--- /dev/null
+++ b/tests/server/test_incoming_server.py
@@ -0,0 +1,92 @@
+# -*- coding: utf-8 -*-
+# test_incoming_server.py
+# Copyright (C) 2017 LEAP
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see <http://www.gnu.org/licenses/>.
+"""
+Integration tests for incoming API
+"""
+import pytest
+import json
+from io import BytesIO
+from uuid import uuid4
+from twisted.web.test.test_web import DummyRequest
+from twisted.web.server import Site
+from twisted.internet import reactor
+from twisted.internet import defer
+import treq
+
+from leap.soledad.server._incoming import IncomingResource
+from leap.soledad.server._blobs import BlobsServerState
+from leap.soledad.server._incoming import IncomingFormatter
+from leap.soledad.common.crypto import EncryptionSchemes
+from leap.soledad.common.blobs import Flags
+from test_soledad.util import CouchServerStateForTests
+from test_soledad.util import CouchDBTestCase
+
+
+class IncomingOnCouchServerTestCase(CouchDBTestCase):
+
+ def setUp(self):
+ self.port = None
+
+ def tearDown(self):
+ if self.port:
+ self.port.stopListening()
+
+ def prepare(self, backend):
+ self.user_id = 'user-' + uuid4().hex
+ if backend == 'couch':
+ self.state = CouchServerStateForTests(self.couch_url)
+ self.state.ensure_database(self.user_id)
+ else:
+ self.state = BlobsServerState(backend)
+ root = IncomingResource(self.state)
+ site = Site(root)
+ self.port = reactor.listenTCP(0, site, interface='127.0.0.1')
+ self.host = self.port.getHost()
+ self.uri = 'http://%s:%s/' % (self.host.host, self.host.port)
+
+ @defer.inlineCallbacks
+ @pytest.mark.usefixtures("method_tmpdir")
+ def test_put_incoming_creates_a_document_using_couch(self):
+ self.prepare('couch')
+ user_id, doc_id = self.user_id, uuid4().hex
+ content, scheme = 'Hi', EncryptionSchemes.PUBKEY
+ formatter = IncomingFormatter()
+ incoming_endpoint = self.uri + '%s/%s' % (user_id, doc_id)
+ yield treq.put(incoming_endpoint, BytesIO(content), persistent=False)
+ db = self.state.open_database(user_id)
+
+ doc = db.get_doc(doc_id)
+ self.assertEquals(doc.content, formatter.format(content, scheme))
+
+ @defer.inlineCallbacks
+ @pytest.mark.usefixtures("method_tmpdir")
+ def test_put_incoming_creates_a_blob_using_filesystem(self):
+ self.prepare('filesystem')
+ user_id, doc_id = self.user_id, uuid4().hex
+ content = 'Hi'
+ formatter = IncomingFormatter()
+ incoming_endpoint = self.uri + '%s/%s' % (user_id, doc_id)
+ yield treq.put(incoming_endpoint, BytesIO(content), persistent=False)
+
+ db = self.state.open_database(user_id)
+ request = DummyRequest([user_id, doc_id])
+ yield db.read_blob(user_id, doc_id, request, 'MX')
+ flags = db.get_flags(user_id, doc_id, request, 'MX')
+ flags = json.loads(flags)
+ expected = formatter.preamble(content, doc_id) + ' ' + content
+ self.assertEquals(expected, request.written[0])
+ self.assertIn(Flags.PENDING, flags)
diff --git a/tests/server/test_server.py b/tests/server/test_server.py
new file mode 100644
index 00000000..25f0cc2d
--- /dev/null
+++ b/tests/server/test_server.py
@@ -0,0 +1,230 @@
+# -*- coding: utf-8 -*-
+# test_server.py
+# Copyright (C) 2013 LEAP
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see <http://www.gnu.org/licenses/>.
+"""
+Tests for server-related functionality.
+"""
+import binascii
+import os
+import pytest
+
+from six.moves.urllib.parse import urljoin
+from uuid import uuid4
+
+from twisted.internet import defer
+
+from leap.soledad.common.couch.state import CouchServerState
+from leap.soledad.common.couch import CouchDatabase
+from test_soledad.u1db_tests import TestCaseWithServer
+from test_soledad.util import CouchDBTestCase
+from test_soledad.util import (
+ make_token_soledad_app,
+ make_soledad_document_for_test,
+ soledad_sync_target,
+)
+
+from leap.soledad.client import _crypto
+from leap.soledad.client import Soledad
+
+
+@pytest.mark.needs_couch
+@pytest.mark.usefixtures("method_tmpdir")
+class EncryptedSyncTestCase(
+ CouchDBTestCase, TestCaseWithServer):
+
+ """
+ Tests for encrypted sync using Soledad server backed by a couch database.
+ """
+
+ # increase twisted.trial's timeout because large files syncing might take
+ # some time to finish.
+ timeout = 500
+
+ @staticmethod
+ def make_app_with_state(state):
+ return make_token_soledad_app(state)
+
+ make_document_for_test = make_soledad_document_for_test
+
+ sync_target = soledad_sync_target
+
+ def _soledad_instance(self, user=None, passphrase=u'123',
+ prefix='',
+ secrets_path='secrets.json',
+ local_db_path='soledad.u1db',
+ server_url='',
+ cert_file=None, auth_token=None):
+ """
+ Instantiate Soledad.
+ """
+
+ # this callback ensures we save a document which is sent to the shared
+ # db.
+ def _put_doc_side_effect(doc):
+ self._doc_put = doc
+
+ if not server_url:
+ # attempt to find the soledad server url
+ server_address = None
+ server = getattr(self, 'server', None)
+ if server:
+ server_address = getattr(self.server, 'server_address', None)
+ else:
+ host = self.port.getHost()
+ server_address = (host.host, host.port)
+ if server_address:
+ server_url = 'http://%s:%d' % (server_address)
+
+ return Soledad(
+ user,
+ passphrase,
+ secrets_path=os.path.join(self.tempdir, prefix, secrets_path),
+ local_db_path=os.path.join(
+ self.tempdir, prefix, local_db_path),
+ server_url=server_url,
+ cert_file=cert_file,
+ auth_token=auth_token,
+ shared_db=self.get_default_shared_mock(_put_doc_side_effect))
+
+ def make_app(self):
+ self.request_state = CouchServerState(self.couch_url)
+ return self.make_app_with_state(self.request_state)
+
+ def setUp(self):
+ CouchDBTestCase.setUp(self)
+ TestCaseWithServer.setUp(self)
+
+ def tearDown(self):
+ CouchDBTestCase.tearDown(self)
+ TestCaseWithServer.tearDown(self)
+
+ def _test_encrypted_sym_sync(self, passphrase=u'123', doc_size=2,
+ number_of_docs=1):
+ """
+ Test the complete syncing chain between two soledad dbs using a
+ Soledad server backed by a couch database.
+ """
+ self.startTwistedServer()
+ user = 'user-' + uuid4().hex
+
+ # this will store all docs ids to avoid get_all_docs
+ created_ids = []
+
+ # instantiate soledad and create a document
+ sol1 = self._soledad_instance(
+ user=user,
+ # token is verified in test_target.make_token_soledad_app
+ auth_token='auth-token',
+ passphrase=passphrase)
+
+ # instantiate another soledad using the same secret as the previous
+ # one (so we can correctly verify the mac of the synced document)
+ sol2 = self._soledad_instance(
+ user=user,
+ prefix='x',
+ auth_token='auth-token',
+ secrets_path=sol1.secrets_path,
+ passphrase=passphrase)
+
+ # ensure remote db exists before syncing
+ db = CouchDatabase.open_database(
+ urljoin(self.couch_url, 'user-' + user),
+ create=True)
+
+ def _db1AssertEmptyDocList(results):
+ _, doclist = results
+ self.assertEqual([], doclist)
+
+ def _db1CreateDocs(results):
+ deferreds = []
+ for i in xrange(number_of_docs):
+ content = binascii.hexlify(os.urandom(doc_size / 2))
+ d = sol1.create_doc({'data': content})
+ d.addCallback(created_ids.append)
+ deferreds.append(d)
+ return defer.DeferredList(deferreds)
+
+ def _db1AssertDocsSyncedToServer(results):
+ self.assertEqual(number_of_docs, len(created_ids))
+ for soldoc in created_ids:
+ couchdoc = db.get_doc(soldoc.doc_id)
+ self.assertTrue(couchdoc)
+ # assert document structure in couch server
+ self.assertEqual(soldoc.doc_id, couchdoc.doc_id)
+ self.assertEqual(soldoc.rev, couchdoc.rev)
+ couch_content = couchdoc.content.keys()
+ self.assertEqual(['raw'], couch_content)
+ content = couchdoc.get_json()
+ self.assertTrue(_crypto.is_symmetrically_encrypted(content))
+
+ d = sol1.get_all_docs()
+ d.addCallback(_db1AssertEmptyDocList)
+ d.addCallback(_db1CreateDocs)
+ d.addCallback(lambda _: sol1.sync())
+ d.addCallback(_db1AssertDocsSyncedToServer)
+
+ def _db2AssertEmptyDocList(results):
+ _, doclist = results
+ self.assertEqual([], doclist)
+
+ def _getAllDocsFromBothDbs(results):
+ d1 = sol1.get_all_docs()
+ d2 = sol2.get_all_docs()
+ return defer.DeferredList([d1, d2])
+
+ d.addCallback(lambda _: sol2.get_all_docs())
+ d.addCallback(_db2AssertEmptyDocList)
+ d.addCallback(lambda _: sol2.sync())
+ d.addCallback(_getAllDocsFromBothDbs)
+
+ def _assertDocSyncedFromDb1ToDb2(results):
+ r1, r2 = results
+ _, (gen1, doclist1) = r1
+ _, (gen2, doclist2) = r2
+ self.assertEqual(number_of_docs, gen1)
+ self.assertEqual(number_of_docs, gen2)
+ self.assertEqual(number_of_docs, len(doclist1))
+ self.assertEqual(number_of_docs, len(doclist2))
+ self.assertEqual(doclist1[0], doclist2[0])
+
+ d.addCallback(_assertDocSyncedFromDb1ToDb2)
+
+ def _cleanUp(results):
+ db.delete_database()
+ db.close()
+ sol1.close()
+ sol2.close()
+
+ d.addCallback(_cleanUp)
+
+ return d
+
+ def test_encrypted_sym_sync(self):
+ return self._test_encrypted_sym_sync()
+
+ def test_encrypted_sym_sync_with_unicode_passphrase(self):
+ """
+ Test the complete syncing chain between two soledad dbs using a
+ Soledad server backed by a couch database, using an unicode
+ passphrase.
+ """
+ return self._test_encrypted_sym_sync(passphrase=u'ãáàäéàëíìïóòöõúùüñç')
+
+ def test_sync_many_small_files(self):
+ """
+ Test if Soledad can sync many smallfiles.
+ """
+ return self._test_encrypted_sym_sync(doc_size=2, number_of_docs=100)
diff --git a/tests/server/test_session.py b/tests/server/test_session.py
new file mode 100644
index 00000000..3dbd2740
--- /dev/null
+++ b/tests/server/test_session.py
@@ -0,0 +1,195 @@
+# -*- coding: utf-8 -*-
+# test_session.py
+# Copyright (C) 2017 LEAP
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see <http://www.gnu.org/licenses/>.
+"""
+Tests for server session entrypoint.
+"""
+from twisted.trial import unittest
+
+from twisted.cred import portal
+from twisted.cred.checkers import InMemoryUsernamePasswordDatabaseDontUse
+from twisted.cred.credentials import IUsernamePassword
+from twisted.web.resource import getChildForRequest
+from twisted.web.static import Data
+from twisted.web.test.requesthelper import DummyRequest
+from twisted.web.test.test_httpauth import b64encode
+from twisted.web.test.test_httpauth import Realm
+from twisted.web._auth.wrapper import UnauthorizedResource
+
+from leap.soledad.server.session import SoledadSession
+
+
+class SoledadSessionTestCase(unittest.TestCase):
+ """
+ Tests adapted from for
+ L{twisted.web.test.test_httpauth.HTTPAuthSessionWrapper}.
+ """
+
+ def makeRequest(self, *args, **kwargs):
+ request = DummyRequest(*args, **kwargs)
+ request.path = '/'
+ return request
+
+ def setUp(self):
+ self.username = b'foo bar'
+ self.password = b'bar baz'
+ self.avatarContent = b"contents of the avatar resource itself"
+ self.childName = b"foo-child"
+ self.childContent = b"contents of the foo child of the avatar"
+ self.checker = InMemoryUsernamePasswordDatabaseDontUse()
+ self.checker.addUser(self.username, self.password)
+ self.avatar = Data(self.avatarContent, 'text/plain')
+ self.avatar.putChild(
+ self.childName, Data(self.childContent, 'text/plain'))
+ self.avatars = {self.username: self.avatar}
+ self.realm = Realm(self.avatars.get)
+ self.portal = portal.Portal(self.realm, [self.checker])
+ self.wrapper = SoledadSession(self.portal)
+
+ def _authorizedTokenLogin(self, request):
+ authorization = b64encode(
+ self.username + b':' + self.password)
+ request.requestHeaders.addRawHeader(b'authorization',
+ b'Token ' + authorization)
+ return getChildForRequest(self.wrapper, request)
+
+ def test_getChildWithDefault(self):
+ request = self.makeRequest([self.childName])
+ child = getChildForRequest(self.wrapper, request)
+ d = request.notifyFinish()
+
+ def cbFinished(result):
+ self.assertEqual(request.responseCode, 401)
+
+ d.addCallback(cbFinished)
+ request.render(child)
+ return d
+
+ def _invalidAuthorizationTest(self, response):
+ request = self.makeRequest([self.childName])
+ request.requestHeaders.addRawHeader(b'authorization', response)
+ child = getChildForRequest(self.wrapper, request)
+ d = request.notifyFinish()
+
+ def cbFinished(result):
+ self.assertEqual(request.responseCode, 401)
+
+ d.addCallback(cbFinished)
+ request.render(child)
+ return d
+
+ def test_getChildWithDefaultUnauthorizedUser(self):
+ return self._invalidAuthorizationTest(
+ b'Basic ' + b64encode(b'foo:bar'))
+
+ def test_getChildWithDefaultUnauthorizedPassword(self):
+ return self._invalidAuthorizationTest(
+ b'Basic ' + b64encode(self.username + b':bar'))
+
+ def test_getChildWithDefaultUnrecognizedScheme(self):
+ return self._invalidAuthorizationTest(b'Quux foo bar baz')
+
+ def test_getChildWithDefaultAuthorized(self):
+ request = self.makeRequest([self.childName])
+ child = self._authorizedTokenLogin(request)
+ d = request.notifyFinish()
+
+ def cbFinished(ignored):
+ self.assertEqual(request.written, [self.childContent])
+
+ d.addCallback(cbFinished)
+ request.render(child)
+ return d
+
+ def test_renderAuthorized(self):
+ # Request it exactly, not any of its children.
+ request = self.makeRequest([])
+ child = self._authorizedTokenLogin(request)
+ d = request.notifyFinish()
+
+ def cbFinished(ignored):
+ self.assertEqual(request.written, [self.avatarContent])
+
+ d.addCallback(cbFinished)
+ request.render(child)
+ return d
+
+ def test_decodeRaises(self):
+ request = self.makeRequest([self.childName])
+ request.requestHeaders.addRawHeader(b'authorization',
+ b'Token decode should fail')
+ child = getChildForRequest(self.wrapper, request)
+ self.assertIsInstance(child, UnauthorizedResource)
+
+ def test_parseResponse(self):
+ basicAuthorization = b'Basic abcdef123456'
+ self.assertEqual(
+ self.wrapper._parseHeader(basicAuthorization),
+ None)
+ tokenAuthorization = b'Token abcdef123456'
+ self.assertEqual(
+ self.wrapper._parseHeader(tokenAuthorization),
+ b'abcdef123456')
+
+ def test_unexpectedDecodeError(self):
+
+ class UnexpectedException(Exception):
+ pass
+
+ class BadFactory(object):
+ scheme = b'bad'
+
+ def getChallenge(self, client):
+ return {}
+
+ def decode(self, response, request):
+ print("decode raised")
+ raise UnexpectedException()
+
+ self.wrapper._credentialFactory = BadFactory()
+ request = self.makeRequest([self.childName])
+ request.requestHeaders.addRawHeader(b'authorization', b'Bad abc')
+ child = getChildForRequest(self.wrapper, request)
+ request.render(child)
+ self.assertEqual(request.responseCode, 500)
+ errors = self.flushLoggedErrors(UnexpectedException)
+ self.assertEqual(len(errors), 1)
+
+ def test_unexpectedLoginError(self):
+ class UnexpectedException(Exception):
+ pass
+
+ class BrokenChecker(object):
+ credentialInterfaces = (IUsernamePassword,)
+
+ def requestAvatarId(self, credentials):
+ raise UnexpectedException()
+
+ self.portal.registerChecker(BrokenChecker())
+ request = self.makeRequest([self.childName])
+ child = self._authorizedTokenLogin(request)
+ request.render(child)
+ self.assertEqual(request.responseCode, 500)
+ self.assertEqual(len(self.flushLoggedErrors(UnexpectedException)), 1)
+
+ def test_cantAccessOtherUserPathByDefault(self):
+ request = self.makeRequest([])
+ # valid url_mapper path, but for another user
+ request.path = '/blobs/another-user/'
+ child = self._authorizedTokenLogin(request)
+
+ request.render(child)
+ self.assertEqual(request.responseCode, 500)
diff --git a/tests/server/test_shared_db.py b/tests/server/test_shared_db.py
new file mode 100644
index 00000000..96af6dff
--- /dev/null
+++ b/tests/server/test_shared_db.py
@@ -0,0 +1,69 @@
+# -*- coding: utf-8 -*-
+# test_shared_db.py
+# Copyright (C) 2017 LEAP
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see <http://www.gnu.org/licenses/>.
+"""
+Tests for the shared db on server side.
+"""
+
+
+import pytest
+
+from twisted.trial import unittest
+
+from leap.soledad.client.shared_db import SoledadSharedDatabase
+from leap.soledad.common.document import SoledadDocument
+from leap.soledad.common.l2db.errors import RevisionConflict
+
+
+@pytest.mark.needs_couch
+class SharedDbTests(unittest.TestCase):
+ """
+ """
+
+ URL = 'http://127.0.0.1:2424/shared'
+ CREDS = {'token': {'uuid': 'an-uuid', 'token': 'an-auth-token'}}
+
+ @pytest.fixture(autouse=True)
+ def soledad_client(self, soledad_server, soledad_dbs):
+ soledad_dbs('an-uuid')
+ self._db = SoledadSharedDatabase.open_database(self.URL, self.CREDS)
+
+ @pytest.mark.thisone
+ def test_doc_update_succeeds(self):
+ doc_id = 'some-random-doc'
+ self.assertIsNone(self._db.get_doc(doc_id))
+ # create a document in shared db
+ doc = SoledadDocument(doc_id=doc_id)
+ self._db.put_doc(doc)
+ # update that document
+ expected = {'new': 'content'}
+ doc.content = expected
+ self._db.put_doc(doc)
+ # ensure expected content was saved
+ doc = self._db.get_doc(doc_id)
+ self.assertEqual(expected, doc.content)
+
+ @pytest.mark.thisone
+ def test_doc_update_fails_with_wrong_rev(self):
+ # create a document in shared db
+ doc_id = 'some-random-doc'
+ self.assertIsNone(self._db.get_doc(doc_id))
+ # create a document in shared db
+ doc = SoledadDocument(doc_id=doc_id)
+ self._db.put_doc(doc)
+ # try to update document without including revision of old version
+ doc.rev = 'wrong-rev'
+ self.assertRaises(RevisionConflict, self._db.put_doc, doc)
diff --git a/tests/server/test_soledad b/tests/server/test_soledad
new file mode 120000
index 00000000..c1a35d32
--- /dev/null
+++ b/tests/server/test_soledad
@@ -0,0 +1 @@
+../test_soledad \ No newline at end of file
diff --git a/tests/server/test_tac.py b/tests/server/test_tac.py
new file mode 100644
index 00000000..7bb50e35
--- /dev/null
+++ b/tests/server/test_tac.py
@@ -0,0 +1,87 @@
+# -*- coding: utf-8 -*-
+# test_tac.py
+# Copyright (C) 2017 LEAP
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see <http://www.gnu.org/licenses/>.
+"""
+Tests for the localhost/public APIs using .tac file.
+See docs/auth.rst
+"""
+
+
+import os
+import signal
+import socket
+import pytest
+import treq
+
+from pkg_resources import resource_filename
+from twisted.trial import unittest
+from twisted.internet import defer, reactor
+from twisted.internet.protocol import ProcessProtocol
+from twisted.web.client import Agent
+
+
+TAC_FILE_PATH = resource_filename('leap.soledad.server', 'server.tac')
+
+
+class TacServerTestCase(unittest.TestCase):
+
+ def test_tac_file_exists(self):
+ msg = "server.tac used on this test case was expected to be at %s"
+ self.assertTrue(os.path.isfile(TAC_FILE_PATH), msg % TAC_FILE_PATH)
+
+ @defer.inlineCallbacks
+ def test_local_public_default_ports_on_server_tac(self):
+ yield self._spawnServer()
+ result = yield self._get('http://localhost:2525/incoming')
+ fail_msg = "Localhost endpoint must require authentication!"
+ self.assertEquals(401, result.code, fail_msg)
+
+ public_endpoint_url = 'http://%s:2424/' % self._get_public_ip()
+ result = yield self._get(public_endpoint_url)
+ self.assertEquals(200, result.code, "server info not accessible")
+
+ result = yield self._get(public_endpoint_url + 'other')
+ self.assertEquals(401, result.code, "public server lacks auth!")
+
+ public_using_local_port_url = 'http://%s:2525/' % self._get_public_ip()
+ with pytest.raises(Exception):
+ yield self._get(public_using_local_port_url)
+
+ def _spawnServer(self):
+ protocol = ProcessProtocol()
+ env = os.environ.get('VIRTUAL_ENV', '/usr')
+ executable = os.path.join(env, 'bin', 'twistd')
+ no_pid_argument = '--pidfile='
+ args = [executable, no_pid_argument, '-noy', TAC_FILE_PATH]
+ env = {'DEBUG_SERVER': 'yes'}
+ t = reactor.spawnProcess(protocol, executable, args, env=env)
+ self.addCleanup(os.kill, t.pid, signal.SIGKILL)
+ self.addCleanup(t.loseConnection)
+ return self._sleep(1) # it takes a while to start server
+
+ def _sleep(self, time):
+ d = defer.Deferred()
+ reactor.callLater(time, d.callback, True)
+ return d
+
+ def _get(self, *args, **kwargs):
+ kwargs['agent'] = Agent(reactor)
+ return treq.get(*args, **kwargs)
+
+ def _get_public_ip(self):
+ s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
+ s.connect(("8.8.8.8", 80))
+ return s.getsockname()[0]
diff --git a/tests/server/test_url_mapper.py b/tests/server/test_url_mapper.py
new file mode 100644
index 00000000..a04e7593
--- /dev/null
+++ b/tests/server/test_url_mapper.py
@@ -0,0 +1,133 @@
+# -*- coding: utf-8 -*-
+# test_url_mapper.py
+# Copyright (C) 2017 LEAP
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see <http://www.gnu.org/licenses/>.
+"""
+Tests for server-related functionality.
+"""
+import pytest
+
+from twisted.trial import unittest
+from uuid import uuid4
+
+from leap.soledad.server.url_mapper import URLMapper
+
+
+class URLMapperTestCase(unittest.TestCase):
+ """
+ Test if the URLMapper behaves as expected.
+
+ The following table lists the authorized actions among all possible
+ u1db remote actions:
+
+ URL path | Authorized actions
+ --------------------------------------------------
+ / | GET
+ /shared-db | GET
+ /shared-db/docs | -
+ /shared-db/doc/{id} | -
+ /shared-db/sync-from/{source} | -
+ /user-db | -
+ /user-db/docs | -
+ /user-db/doc/{id} | -
+ /user-db/sync-from/{source} | GET, PUT, POST
+ """
+
+ def setUp(self):
+ self._uuid = uuid4().hex
+ self._urlmap = URLMapper()
+ self._dbname = 'user-%s' % self._uuid
+
+ @pytest.mark.needs_couch
+ def test_root_authorized(self):
+ match = self._urlmap.match('/', 'GET')
+ self.assertIsNotNone(match)
+
+ def test_shared_authorized(self):
+ self.assertIsNotNone(self._urlmap.match('/shared', 'GET'))
+
+ def test_shared_unauthorized(self):
+ self.assertIsNone(self._urlmap.match('/shared', 'PUT'))
+ self.assertIsNone(self._urlmap.match('/shared', 'DELETE'))
+ self.assertIsNone(self._urlmap.match('/shared', 'POST'))
+
+ def test_shared_docs_unauthorized(self):
+ self.assertIsNone(self._urlmap.match('/shared/docs', 'GET'))
+ self.assertIsNone(self._urlmap.match('/shared/docs', 'PUT'))
+ self.assertIsNone(self._urlmap.match('/shared/docs', 'DELETE'))
+ self.assertIsNone(self._urlmap.match('/shared/docs', 'POST'))
+
+ def test_shared_doc_authorized(self):
+ match = self._urlmap.match('/shared/doc/x', 'GET')
+ self.assertIsNotNone(match)
+ self.assertEqual('x', match.get('id'))
+
+ match = self._urlmap.match('/shared/doc/x', 'PUT')
+ self.assertIsNotNone(match)
+ self.assertEqual('x', match.get('id'))
+
+ match = self._urlmap.match('/shared/doc/x', 'DELETE')
+ self.assertIsNotNone(match)
+ self.assertEqual('x', match.get('id'))
+
+ def test_shared_doc_unauthorized(self):
+ self.assertIsNone(self._urlmap.match('/shared/doc/x', 'POST'))
+
+ def test_shared_sync_unauthorized(self):
+ self.assertIsNone(self._urlmap.match('/shared/sync-from/x', 'GET'))
+ self.assertIsNone(self._urlmap.match('/shared/sync-from/x', 'PUT'))
+ self.assertIsNone(self._urlmap.match('/shared/sync-from/x', 'DELETE'))
+ self.assertIsNone(self._urlmap.match('/shared/sync-from/x', 'POST'))
+
+ def test_user_db_unauthorized(self):
+ dbname = self._dbname
+ self.assertIsNone(self._urlmap.match('/%s' % dbname, 'GET'))
+ self.assertIsNone(self._urlmap.match('/%s' % dbname, 'PUT'))
+ self.assertIsNone(self._urlmap.match('/%s' % dbname, 'DELETE'))
+ self.assertIsNone(self._urlmap.match('/%s' % dbname, 'POST'))
+
+ def test_user_db_docs_unauthorized(self):
+ dbname = self._dbname
+ self.assertIsNone(self._urlmap.match('/%s/docs' % dbname, 'GET'))
+ self.assertIsNone(self._urlmap.match('/%s/docs' % dbname, 'PUT'))
+ self.assertIsNone(self._urlmap.match('/%s/docs' % dbname, 'DELETE'))
+ self.assertIsNone(self._urlmap.match('/%s/docs' % dbname, 'POST'))
+
+ def test_user_db_doc_unauthorized(self):
+ dbname = self._dbname
+ self.assertIsNone(self._urlmap.match('/%s/doc/x' % dbname, 'GET'))
+ self.assertIsNone(self._urlmap.match('/%s/doc/x' % dbname, 'PUT'))
+ self.assertIsNone(self._urlmap.match('/%s/doc/x' % dbname, 'DELETE'))
+ self.assertIsNone(self._urlmap.match('/%s/doc/x' % dbname, 'POST'))
+
+ def test_user_db_sync_authorized(self):
+ uuid = self._uuid
+ dbname = self._dbname
+ match = self._urlmap.match('/%s/sync-from/x' % dbname, 'GET')
+ self.assertEqual(uuid, match.get('uuid'))
+ self.assertEqual('x', match.get('source_replica_uid'))
+
+ match = self._urlmap.match('/%s/sync-from/x' % dbname, 'PUT')
+ self.assertEqual(uuid, match.get('uuid'))
+ self.assertEqual('x', match.get('source_replica_uid'))
+
+ match = self._urlmap.match('/%s/sync-from/x' % dbname, 'POST')
+ self.assertEqual(uuid, match.get('uuid'))
+ self.assertEqual('x', match.get('source_replica_uid'))
+
+ def test_user_db_sync_unauthorized(self):
+ dbname = self._dbname
+ self.assertIsNone(
+ self._urlmap.match('/%s/sync-from/x' % dbname, 'DELETE'))