diff options
Diffstat (limited to 'testing/tests/blobs')
-rw-r--r-- | testing/tests/blobs/test_blob_manager.py | 177 | ||||
-rw-r--r-- | testing/tests/blobs/test_decrypter_buffer.py | 72 | ||||
-rw-r--r-- | testing/tests/blobs/test_fs_backend.py | 173 | ||||
-rw-r--r-- | testing/tests/blobs/test_sqlcipher_client_backend.py | 75 |
4 files changed, 0 insertions, 497 deletions
diff --git a/testing/tests/blobs/test_blob_manager.py b/testing/tests/blobs/test_blob_manager.py deleted file mode 100644 index 7d985768..00000000 --- a/testing/tests/blobs/test_blob_manager.py +++ /dev/null @@ -1,177 +0,0 @@ -# -*- coding: utf-8 -*- -# test_local_backend.py -# Copyright (C) 2017 LEAP -# -# This program is free software: you can redistribute it and/or modify -# it under the terms of the GNU General Public License as published by -# the Free Software Foundation, either version 3 of the License, or -# (at your option) any later version. -# -# This program is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -# GNU General Public License for more details. -# -# You should have received a copy of the GNU General Public License -# along with this program. If not, see <http://www.gnu.org/licenses/>. -""" -Tests for BlobManager. -""" -from twisted.trial import unittest -from twisted.internet import defer -from twisted.web.error import SchemeNotSupported -from leap.soledad.client._db.blobs import BlobManager, BlobDoc, FIXED_REV -from leap.soledad.client._db.blobs import BlobAlreadyExistsError -from leap.soledad.client._db.blobs import SyncStatus -from io import BytesIO -from mock import Mock -from uuid import uuid4 -import pytest -import os - - -class BlobManagerTestCase(unittest.TestCase): - - class doc_info: - doc_id = 'D-deadbeef' - rev = FIXED_REV - - def setUp(self): - self.cleartext = BytesIO('rosa de foc') - self.secret = 'A' * 96 - self.manager = BlobManager( - self.tempdir, '', - 'A' * 32, self.secret, - uuid4().hex, 'token', None) - self.addCleanup(self.manager.close) - - @defer.inlineCallbacks - @pytest.mark.usefixtures("method_tmpdir") - def test_get_missing(self): - self.manager._download_and_decrypt = Mock(return_value=None) - missing_blob_id = uuid4().hex - result = yield self.manager.get(missing_blob_id) - self.assertIsNone(result) - args = missing_blob_id, '' - self.manager._download_and_decrypt.assert_called_once_with(*args) - - @defer.inlineCallbacks - @pytest.mark.usefixtures("method_tmpdir") - def test_get_from_existing_value(self): - self.manager._download_and_decrypt = Mock(return_value=None) - msg, blob_id = "It's me, M4r10!", uuid4().hex - yield self.manager.local.put(blob_id, BytesIO(msg), - size=len(msg)) - result = yield self.manager.get(blob_id) - self.assertEquals(result.getvalue(), msg) - self.assertNot(self.manager._download_and_decrypt.called) - - @defer.inlineCallbacks - @pytest.mark.usefixtures("method_tmpdir") - def test_put_stores_on_local_db(self): - self.manager._encrypt_and_upload = Mock(return_value=None) - msg, blob_id = "Hey Joe", uuid4().hex - doc = BlobDoc(BytesIO(msg), blob_id=blob_id) - yield self.manager.put(doc, size=len(msg)) - result = yield self.manager.local.get(blob_id) - self.assertEquals(result.getvalue(), msg) - self.assertTrue(self.manager._encrypt_and_upload.called) - - @defer.inlineCallbacks - @pytest.mark.usefixtures("method_tmpdir") - def test_put_then_get_using_real_file_descriptor(self): - self.manager._encrypt_and_upload = Mock(return_value=None) - self.manager._download_and_decrypt = Mock(return_value=None) - msg, blob_id = "Fuuuuull cycleee! \o/", uuid4().hex - tmpfile = os.tmpfile() - tmpfile.write(msg) - tmpfile.seek(0) - doc = BlobDoc(tmpfile, blob_id) - yield self.manager.put(doc, size=len(msg)) - result = yield self.manager.get(doc.blob_id) - self.assertEquals(result.getvalue(), msg) - self.assertTrue(self.manager._encrypt_and_upload.called) - self.assertFalse(self.manager._download_and_decrypt.called) - - @defer.inlineCallbacks - @pytest.mark.usefixtures("method_tmpdir") - def test_local_list_blobs(self): - self.manager._encrypt_and_upload = Mock(return_value=None) - msg, blob_id1, blob_id2 = "1337", uuid4().hex, uuid4().hex - doc = BlobDoc(BytesIO(msg), blob_id1) - yield self.manager.put(doc, size=len(msg)) - doc2 = BlobDoc(BytesIO(msg), blob_id2) - yield self.manager.put(doc2, size=len(msg)) - blobs_list = yield self.manager.local_list() - - self.assertEquals(set([blob_id1, blob_id2]), set(blobs_list)) - - @defer.inlineCallbacks - @pytest.mark.usefixtures("method_tmpdir") - def test_send_missing(self): - fd, missing_id = BytesIO('test'), uuid4().hex - self.manager._encrypt_and_upload = Mock(return_value=None) - self.manager.remote_list = Mock(return_value=[]) - yield self.manager.local.put(missing_id, fd, 4) - yield self.manager.send_missing() - - call_list = self.manager._encrypt_and_upload.call_args_list - self.assertEquals(1, len(call_list)) - call_blob_id, call_fd = call_list[0][0] - self.assertEquals(missing_id, call_blob_id) - self.assertEquals('test', call_fd.getvalue()) - - @defer.inlineCallbacks - @pytest.mark.usefixtures("method_tmpdir") - def test_duplicated_blob_error_on_put(self): - self.manager._encrypt_and_upload = Mock(return_value=None) - content, existing_id = "Blob content", uuid4().hex - doc1 = BlobDoc(BytesIO(content), existing_id) - yield self.manager.put(doc1, len(content)) - doc2 = BlobDoc(BytesIO(content), existing_id) - self.manager._encrypt_and_upload.reset_mock() - with pytest.raises(BlobAlreadyExistsError): - yield self.manager.put(doc2, len(content)) - self.assertFalse(self.manager._encrypt_and_upload.called) - - @defer.inlineCallbacks - @pytest.mark.usefixtures("method_tmpdir") - def test_delete_from_local_and_remote(self): - self.manager._encrypt_and_upload = Mock(return_value=None) - self.manager._delete_from_remote = Mock(return_value=None) - content, blob_id = "Blob content", uuid4().hex - doc1 = BlobDoc(BytesIO(content), blob_id) - yield self.manager.put(doc1, len(content)) - yield self.manager.delete(blob_id) - local_list = yield self.manager.local_list() - self.assertEquals(0, len(local_list)) - params = {'namespace': ''} - self.manager._delete_from_remote.assert_called_with(blob_id, **params) - - @defer.inlineCallbacks - @pytest.mark.usefixtures("method_tmpdir") - def test_local_sync_status_pending_upload(self): - upload_failure = defer.fail(Exception()) - self.manager._encrypt_and_upload = Mock(return_value=upload_failure) - content, blob_id = "Blob content", uuid4().hex - doc1 = BlobDoc(BytesIO(content), blob_id) - with pytest.raises(Exception): - yield self.manager.put(doc1, len(content)) - pending_upload = SyncStatus.PENDING_UPLOAD - local_list = yield self.manager.local_list(sync_status=pending_upload) - self.assertIn(blob_id, local_list) - - @defer.inlineCallbacks - @pytest.mark.usefixtures("method_tmpdir") - def test_upload_retry_limit(self): - self.manager.remote_list = Mock(return_value=[]) - content, blob_id = "Blob content", uuid4().hex - doc1 = BlobDoc(BytesIO(content), blob_id) - with pytest.raises(Exception): - yield self.manager.put(doc1, len(content)) - for _ in range(self.manager.max_retries + 1): - with pytest.raises(SchemeNotSupported): - yield self.manager.send_missing() - failed_upload = SyncStatus.FAILED_UPLOAD - local_list = yield self.manager.local_list(sync_status=failed_upload) - self.assertIn(blob_id, local_list) diff --git a/testing/tests/blobs/test_decrypter_buffer.py b/testing/tests/blobs/test_decrypter_buffer.py deleted file mode 100644 index 83fbaad3..00000000 --- a/testing/tests/blobs/test_decrypter_buffer.py +++ /dev/null @@ -1,72 +0,0 @@ -# -*- coding: utf-8 -*- -# test_blobs.py -# Copyright (C) 2017 LEAP -# -# This program is free software: you can redistribute it and/or modify -# it under the terms of the GNU General Public License as published by -# the Free Software Foundation, either version 3 of the License, or -# (at your option) any later version. -# -# This program is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -# GNU General Public License for more details. -# -# You should have received a copy of the GNU General Public License -# along with this program. If not, see <http://www.gnu.org/licenses/>. -""" -Tests for blobs decrypter buffer. A component which is used as a decryption -sink during blob stream download. -""" -from io import BytesIO -from mock import Mock - -from twisted.trial import unittest -from twisted.internet import defer - -from leap.soledad.client._db.blobs import DecrypterBuffer -from leap.soledad.client._db.blobs import BlobManager -from leap.soledad.client._db.blobs import FIXED_REV -from leap.soledad.client import _crypto - - -class DecrypterBufferCase(unittest.TestCase): - - class doc_info: - doc_id = 'D-BLOB-ID' - rev = FIXED_REV - - def setUp(self): - self.cleartext = BytesIO('rosa de foc') - self.secret = 'A' * 96 - self.blob = _crypto.BlobEncryptor( - self.doc_info, self.cleartext, - armor=False, - secret='A' * 96) - - @defer.inlineCallbacks - def test_decrypt_buffer(self): - encrypted = (yield self.blob.encrypt()).getvalue() - tag = encrypted[-16:] - buf = DecrypterBuffer(self.doc_info.doc_id, self.secret, tag) - buf.write(encrypted) - fd, size = buf.close() - self.assertEquals(fd.getvalue(), 'rosa de foc') - - @defer.inlineCallbacks - def test_decrypt_uploading_encrypted_blob(self): - - @defer.inlineCallbacks - def _check_result(uri, data, *args, **kwargs): - decryptor = _crypto.BlobDecryptor( - self.doc_info, data, - armor=False, - secret=self.secret) - decrypted = yield decryptor.decrypt() - self.assertEquals(decrypted.getvalue(), 'up and up') - defer.returnValue(Mock(code=200)) - - manager = BlobManager('', '', self.secret, self.secret, 'user') - fd = BytesIO('up and up') - manager._client.put = _check_result - yield manager._encrypt_and_upload(self.doc_info.doc_id, fd) diff --git a/testing/tests/blobs/test_fs_backend.py b/testing/tests/blobs/test_fs_backend.py deleted file mode 100644 index 53f3127d..00000000 --- a/testing/tests/blobs/test_fs_backend.py +++ /dev/null @@ -1,173 +0,0 @@ -# -*- coding: utf-8 -*- -# test_fs_backend.py -# Copyright (C) 2017 LEAP -# -# This program is free software: you can redistribute it and/or modify -# it under the terms of the GNU General Public License as published by -# the Free Software Foundation, either version 3 of the License, or -# (at your option) any later version. -# -# This program is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -# GNU General Public License for more details. -# -# You should have received a copy of the GNU General Public License -# along with this program. If not, see <http://www.gnu.org/licenses/>. -""" -Tests for blobs backend on server side. -""" -from twisted.trial import unittest -from twisted.internet import defer -from twisted.web.test.test_web import DummyRequest -from leap.soledad.server import _blobs -from io import BytesIO -from mock import Mock -import mock -import os -import base64 -import json -import pytest - - -class FilesystemBackendTestCase(unittest.TestCase): - - @mock.patch.object(_blobs, 'open') - def test_tag_header(self, open_mock): - open_mock.return_value = BytesIO('A' * 40 + 'B' * 16) - expected_tag = base64.urlsafe_b64encode('B' * 16) - expected_method = Mock() - backend = _blobs.FilesystemBlobsBackend() - request = Mock(responseHeaders=Mock(setRawHeaders=expected_method)) - backend.add_tag_header('user', 'blob_id', request) - - expected_method.assert_called_once_with('Tag', [expected_tag]) - - @mock.patch.object(_blobs.static, 'File') - def test_read_blob(self, file_mock): - render_mock = Mock() - file_mock.return_value = render_mock - backend = _blobs.FilesystemBlobsBackend() - request = DummyRequest(['']) - backend._get_path = Mock(return_value='path') - backend.read_blob('user', 'blob_id', request) - - backend._get_path.assert_called_once_with('user', 'blob_id', '') - ctype = 'application/octet-stream' - _blobs.static.File.assert_called_once_with('path', defaultType=ctype) - render_mock.render_GET.assert_called_once_with(request) - - @mock.patch.object(os.path, 'isfile') - @defer.inlineCallbacks - def test_cannot_overwrite(self, isfile): - isfile.return_value = True - backend = _blobs.FilesystemBlobsBackend() - backend._get_path = Mock(return_value='path') - request = DummyRequest(['']) - yield backend.write_blob('user', 'blob_id', request) - self.assertEquals(request.written[0], "Blob already exists: blob_id") - self.assertEquals(request.responseCode, 409) - - @pytest.mark.usefixtures("method_tmpdir") - @mock.patch.object(os.path, 'isfile') - @defer.inlineCallbacks - def test_write_cannot_exceed_quota(self, isfile): - isfile.return_value = False - backend = _blobs.FilesystemBlobsBackend() - backend._get_path = Mock(return_value=self.tempdir) - request = Mock() - - backend.get_total_storage = lambda x: 100 - backend.quota = 90 - yield backend.write_blob('user', 'blob_id', request) - - request.setResponseCode.assert_called_once_with(507) - request.write.assert_called_once_with('Quota Exceeded!') - - def test_get_path_partitioning_by_default(self): - backend = _blobs.FilesystemBlobsBackend() - backend.path = '/somewhere/' - path = backend._get_path('user', 'blob_id', '') - expected = '/somewhere/user/default/b/blo/blob_i/blob_id' - self.assertEquals(path, expected) - - def test_get_path_custom(self): - backend = _blobs.FilesystemBlobsBackend() - backend.path = '/somewhere/' - path = backend._get_path('user', 'blob_id', 'wonderland') - expected = '/somewhere/user/wonderland/b/blo/blob_i/blob_id' - self.assertEquals(expected, path) - - def test_get_path_namespace_traversal_raises(self): - backend = _blobs.FilesystemBlobsBackend() - backend.path = '/somewhere/' - with pytest.raises(Exception): - backend._get_path('user', 'blob_id', '..') - - @pytest.mark.usefixtures("method_tmpdir") - @mock.patch('leap.soledad.server._blobs.os.walk') - def test_list_blobs(self, walk_mock): - backend, _ = _blobs.FilesystemBlobsBackend(self.tempdir), None - walk_mock.return_value = [('', _, ['blob_0']), ('', _, ['blob_1'])] - result = json.loads(backend.list_blobs('user', DummyRequest(['']))) - self.assertEquals(result, ['blob_0', 'blob_1']) - - @pytest.mark.usefixtures("method_tmpdir") - @mock.patch('leap.soledad.server._blobs.os.walk') - def test_list_blobs_limited_by_namespace(self, walk_mock): - backend, _ = _blobs.FilesystemBlobsBackend(self.tempdir), None - walk_mock.return_value = [('', _, ['blob_0']), ('', _, ['blob_1'])] - result = json.loads(backend.list_blobs('user', DummyRequest(['']), - namespace='incoming')) - self.assertEquals(result, ['blob_0', 'blob_1']) - target_dir = os.path.join(self.tempdir, 'user', 'incoming') - walk_mock.assert_called_once_with(target_dir) - - @pytest.mark.usefixtures("method_tmpdir") - def test_path_validation_on_read_blob(self): - blobs_path, request = self.tempdir, DummyRequest(['']) - backend = _blobs.FilesystemBlobsBackend(blobs_path) - with pytest.raises(Exception): - backend.read_blob('..', '..', request) - with pytest.raises(Exception): - backend.read_blob('user', '../../../', request) - with pytest.raises(Exception): - backend.read_blob('../../../', 'blob_id', request) - with pytest.raises(Exception): - backend.read_blob('user', 'blob_id', request, namespace='..') - - @pytest.mark.usefixtures("method_tmpdir") - @defer.inlineCallbacks - def test_path_validation_on_write_blob(self): - blobs_path, request = self.tempdir, DummyRequest(['']) - backend = _blobs.FilesystemBlobsBackend(blobs_path) - with pytest.raises(Exception): - yield backend.write_blob('..', '..', request) - with pytest.raises(Exception): - yield backend.write_blob('user', '../../../', request) - with pytest.raises(Exception): - yield backend.write_blob('../../../', 'id1', request) - with pytest.raises(Exception): - yield backend.write_blob('user', 'id2', request, namespace='..') - - @pytest.mark.usefixtures("method_tmpdir") - @mock.patch('leap.soledad.server._blobs.os.unlink') - def test_delete_blob(self, unlink_mock): - backend = _blobs.FilesystemBlobsBackend(self.tempdir) - backend.delete_blob('user', 'blob_id') - unlink_mock.assert_any_call(backend._get_path('user', - 'blob_id')) - unlink_mock.assert_any_call(backend._get_path('user', - 'blob_id') + '.flags') - - @pytest.mark.usefixtures("method_tmpdir") - @mock.patch('leap.soledad.server._blobs.os.unlink') - def test_delete_blob_custom_namespace(self, unlink_mock): - backend = _blobs.FilesystemBlobsBackend(self.tempdir) - backend.delete_blob('user', 'blob_id', namespace='trash') - unlink_mock.assert_any_call(backend._get_path('user', - 'blob_id', - 'trash')) - unlink_mock.assert_any_call(backend._get_path('user', - 'blob_id', - 'trash') + '.flags') diff --git a/testing/tests/blobs/test_sqlcipher_client_backend.py b/testing/tests/blobs/test_sqlcipher_client_backend.py deleted file mode 100644 index daf561c7..00000000 --- a/testing/tests/blobs/test_sqlcipher_client_backend.py +++ /dev/null @@ -1,75 +0,0 @@ -# -*- coding: utf-8 -*- -# test_sqlcipher_client_backend.py -# Copyright (C) 2017 LEAP -# -# This program is free software: you can redistribute it and/or modify -# it under the terms of the GNU General Public License as published by -# the Free Software Foundation, either version 3 of the License, or -# (at your option) any later version. -# -# This program is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -# GNU General Public License for more details. -# -# You should have received a copy of the GNU General Public License -# along with this program. If not, see <http://www.gnu.org/licenses/>. -""" -Tests for sqlcipher backend on blobs client. -""" -from twisted.trial import unittest -from twisted.internet import defer -from leap.soledad.client._db.blobs import SQLiteBlobBackend -from io import BytesIO -from uuid import uuid4 -import pytest - - -class SQLBackendTestCase(unittest.TestCase): - - def setUp(self): - self.key = "A" * 96 - self.local = SQLiteBlobBackend(self.tempdir, self.key) - self.addCleanup(self.local.close) - - @defer.inlineCallbacks - @pytest.mark.usefixtures("method_tmpdir") - def test_get_inexisting(self): - bad_blob_id = uuid4().hex - self.assertFalse((yield self.local.exists(bad_blob_id))) - result = yield self.local.get(bad_blob_id) - self.assertIsNone(result) - - @defer.inlineCallbacks - @pytest.mark.usefixtures("method_tmpdir") - def test_get_existing(self): - blob_id = uuid4().hex - content = "x" - yield self.local.put(blob_id, BytesIO(content), len(content)) - result = yield self.local.get(blob_id) - self.assertTrue((yield self.local.exists(blob_id))) - self.assertEquals(result.getvalue(), content) - - @defer.inlineCallbacks - @pytest.mark.usefixtures("method_tmpdir") - def test_delete(self): - blob_id1, blob_id2 = uuid4().hex, uuid4().hex - content = "x" - yield self.local.put(blob_id1, BytesIO(content), len(content)) - yield self.local.put(blob_id2, BytesIO(content), len(content)) - yield self.local.delete(blob_id1) - self.assertFalse((yield self.local.exists(blob_id1))) - self.assertTrue((yield self.local.exists(blob_id2))) - - @defer.inlineCallbacks - @pytest.mark.usefixtures("method_tmpdir") - def test_list(self): - blob_ids = [uuid4().hex for _ in range(10)] - content = "x" - deferreds = [] - for blob_id in blob_ids: - deferreds.append(self.local.put(blob_id, BytesIO(content), - len(content))) - yield defer.gatherResults(deferreds) - result = yield self.local.list() - self.assertEquals(set(blob_ids), set(result)) |