diff options
Diffstat (limited to 'tests/blobs')
-rw-r--r-- | tests/blobs/test_blob_manager.py | 177 | ||||
-rw-r--r-- | tests/blobs/test_decrypter_buffer.py | 72 | ||||
-rw-r--r-- | tests/blobs/test_fs_backend.py | 173 | ||||
-rw-r--r-- | tests/blobs/test_sqlcipher_client_backend.py | 75 |
4 files changed, 497 insertions, 0 deletions
diff --git a/tests/blobs/test_blob_manager.py b/tests/blobs/test_blob_manager.py new file mode 100644 index 00000000..7d985768 --- /dev/null +++ b/tests/blobs/test_blob_manager.py @@ -0,0 +1,177 @@ +# -*- coding: utf-8 -*- +# test_local_backend.py +# Copyright (C) 2017 LEAP +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see <http://www.gnu.org/licenses/>. +""" +Tests for BlobManager. +""" +from twisted.trial import unittest +from twisted.internet import defer +from twisted.web.error import SchemeNotSupported +from leap.soledad.client._db.blobs import BlobManager, BlobDoc, FIXED_REV +from leap.soledad.client._db.blobs import BlobAlreadyExistsError +from leap.soledad.client._db.blobs import SyncStatus +from io import BytesIO +from mock import Mock +from uuid import uuid4 +import pytest +import os + + +class BlobManagerTestCase(unittest.TestCase): + + class doc_info: + doc_id = 'D-deadbeef' + rev = FIXED_REV + + def setUp(self): + self.cleartext = BytesIO('rosa de foc') + self.secret = 'A' * 96 + self.manager = BlobManager( + self.tempdir, '', + 'A' * 32, self.secret, + uuid4().hex, 'token', None) + self.addCleanup(self.manager.close) + + @defer.inlineCallbacks + @pytest.mark.usefixtures("method_tmpdir") + def test_get_missing(self): + self.manager._download_and_decrypt = Mock(return_value=None) + missing_blob_id = uuid4().hex + result = yield self.manager.get(missing_blob_id) + self.assertIsNone(result) + args = missing_blob_id, '' + self.manager._download_and_decrypt.assert_called_once_with(*args) + + @defer.inlineCallbacks + @pytest.mark.usefixtures("method_tmpdir") + def test_get_from_existing_value(self): + self.manager._download_and_decrypt = Mock(return_value=None) + msg, blob_id = "It's me, M4r10!", uuid4().hex + yield self.manager.local.put(blob_id, BytesIO(msg), + size=len(msg)) + result = yield self.manager.get(blob_id) + self.assertEquals(result.getvalue(), msg) + self.assertNot(self.manager._download_and_decrypt.called) + + @defer.inlineCallbacks + @pytest.mark.usefixtures("method_tmpdir") + def test_put_stores_on_local_db(self): + self.manager._encrypt_and_upload = Mock(return_value=None) + msg, blob_id = "Hey Joe", uuid4().hex + doc = BlobDoc(BytesIO(msg), blob_id=blob_id) + yield self.manager.put(doc, size=len(msg)) + result = yield self.manager.local.get(blob_id) + self.assertEquals(result.getvalue(), msg) + self.assertTrue(self.manager._encrypt_and_upload.called) + + @defer.inlineCallbacks + @pytest.mark.usefixtures("method_tmpdir") + def test_put_then_get_using_real_file_descriptor(self): + self.manager._encrypt_and_upload = Mock(return_value=None) + self.manager._download_and_decrypt = Mock(return_value=None) + msg, blob_id = "Fuuuuull cycleee! \o/", uuid4().hex + tmpfile = os.tmpfile() + tmpfile.write(msg) + tmpfile.seek(0) + doc = BlobDoc(tmpfile, blob_id) + yield self.manager.put(doc, size=len(msg)) + result = yield self.manager.get(doc.blob_id) + self.assertEquals(result.getvalue(), msg) + self.assertTrue(self.manager._encrypt_and_upload.called) + self.assertFalse(self.manager._download_and_decrypt.called) + + @defer.inlineCallbacks + @pytest.mark.usefixtures("method_tmpdir") + def test_local_list_blobs(self): + self.manager._encrypt_and_upload = Mock(return_value=None) + msg, blob_id1, blob_id2 = "1337", uuid4().hex, uuid4().hex + doc = BlobDoc(BytesIO(msg), blob_id1) + yield self.manager.put(doc, size=len(msg)) + doc2 = BlobDoc(BytesIO(msg), blob_id2) + yield self.manager.put(doc2, size=len(msg)) + blobs_list = yield self.manager.local_list() + + self.assertEquals(set([blob_id1, blob_id2]), set(blobs_list)) + + @defer.inlineCallbacks + @pytest.mark.usefixtures("method_tmpdir") + def test_send_missing(self): + fd, missing_id = BytesIO('test'), uuid4().hex + self.manager._encrypt_and_upload = Mock(return_value=None) + self.manager.remote_list = Mock(return_value=[]) + yield self.manager.local.put(missing_id, fd, 4) + yield self.manager.send_missing() + + call_list = self.manager._encrypt_and_upload.call_args_list + self.assertEquals(1, len(call_list)) + call_blob_id, call_fd = call_list[0][0] + self.assertEquals(missing_id, call_blob_id) + self.assertEquals('test', call_fd.getvalue()) + + @defer.inlineCallbacks + @pytest.mark.usefixtures("method_tmpdir") + def test_duplicated_blob_error_on_put(self): + self.manager._encrypt_and_upload = Mock(return_value=None) + content, existing_id = "Blob content", uuid4().hex + doc1 = BlobDoc(BytesIO(content), existing_id) + yield self.manager.put(doc1, len(content)) + doc2 = BlobDoc(BytesIO(content), existing_id) + self.manager._encrypt_and_upload.reset_mock() + with pytest.raises(BlobAlreadyExistsError): + yield self.manager.put(doc2, len(content)) + self.assertFalse(self.manager._encrypt_and_upload.called) + + @defer.inlineCallbacks + @pytest.mark.usefixtures("method_tmpdir") + def test_delete_from_local_and_remote(self): + self.manager._encrypt_and_upload = Mock(return_value=None) + self.manager._delete_from_remote = Mock(return_value=None) + content, blob_id = "Blob content", uuid4().hex + doc1 = BlobDoc(BytesIO(content), blob_id) + yield self.manager.put(doc1, len(content)) + yield self.manager.delete(blob_id) + local_list = yield self.manager.local_list() + self.assertEquals(0, len(local_list)) + params = {'namespace': ''} + self.manager._delete_from_remote.assert_called_with(blob_id, **params) + + @defer.inlineCallbacks + @pytest.mark.usefixtures("method_tmpdir") + def test_local_sync_status_pending_upload(self): + upload_failure = defer.fail(Exception()) + self.manager._encrypt_and_upload = Mock(return_value=upload_failure) + content, blob_id = "Blob content", uuid4().hex + doc1 = BlobDoc(BytesIO(content), blob_id) + with pytest.raises(Exception): + yield self.manager.put(doc1, len(content)) + pending_upload = SyncStatus.PENDING_UPLOAD + local_list = yield self.manager.local_list(sync_status=pending_upload) + self.assertIn(blob_id, local_list) + + @defer.inlineCallbacks + @pytest.mark.usefixtures("method_tmpdir") + def test_upload_retry_limit(self): + self.manager.remote_list = Mock(return_value=[]) + content, blob_id = "Blob content", uuid4().hex + doc1 = BlobDoc(BytesIO(content), blob_id) + with pytest.raises(Exception): + yield self.manager.put(doc1, len(content)) + for _ in range(self.manager.max_retries + 1): + with pytest.raises(SchemeNotSupported): + yield self.manager.send_missing() + failed_upload = SyncStatus.FAILED_UPLOAD + local_list = yield self.manager.local_list(sync_status=failed_upload) + self.assertIn(blob_id, local_list) diff --git a/tests/blobs/test_decrypter_buffer.py b/tests/blobs/test_decrypter_buffer.py new file mode 100644 index 00000000..83fbaad3 --- /dev/null +++ b/tests/blobs/test_decrypter_buffer.py @@ -0,0 +1,72 @@ +# -*- coding: utf-8 -*- +# test_blobs.py +# Copyright (C) 2017 LEAP +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see <http://www.gnu.org/licenses/>. +""" +Tests for blobs decrypter buffer. A component which is used as a decryption +sink during blob stream download. +""" +from io import BytesIO +from mock import Mock + +from twisted.trial import unittest +from twisted.internet import defer + +from leap.soledad.client._db.blobs import DecrypterBuffer +from leap.soledad.client._db.blobs import BlobManager +from leap.soledad.client._db.blobs import FIXED_REV +from leap.soledad.client import _crypto + + +class DecrypterBufferCase(unittest.TestCase): + + class doc_info: + doc_id = 'D-BLOB-ID' + rev = FIXED_REV + + def setUp(self): + self.cleartext = BytesIO('rosa de foc') + self.secret = 'A' * 96 + self.blob = _crypto.BlobEncryptor( + self.doc_info, self.cleartext, + armor=False, + secret='A' * 96) + + @defer.inlineCallbacks + def test_decrypt_buffer(self): + encrypted = (yield self.blob.encrypt()).getvalue() + tag = encrypted[-16:] + buf = DecrypterBuffer(self.doc_info.doc_id, self.secret, tag) + buf.write(encrypted) + fd, size = buf.close() + self.assertEquals(fd.getvalue(), 'rosa de foc') + + @defer.inlineCallbacks + def test_decrypt_uploading_encrypted_blob(self): + + @defer.inlineCallbacks + def _check_result(uri, data, *args, **kwargs): + decryptor = _crypto.BlobDecryptor( + self.doc_info, data, + armor=False, + secret=self.secret) + decrypted = yield decryptor.decrypt() + self.assertEquals(decrypted.getvalue(), 'up and up') + defer.returnValue(Mock(code=200)) + + manager = BlobManager('', '', self.secret, self.secret, 'user') + fd = BytesIO('up and up') + manager._client.put = _check_result + yield manager._encrypt_and_upload(self.doc_info.doc_id, fd) diff --git a/tests/blobs/test_fs_backend.py b/tests/blobs/test_fs_backend.py new file mode 100644 index 00000000..53f3127d --- /dev/null +++ b/tests/blobs/test_fs_backend.py @@ -0,0 +1,173 @@ +# -*- coding: utf-8 -*- +# test_fs_backend.py +# Copyright (C) 2017 LEAP +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see <http://www.gnu.org/licenses/>. +""" +Tests for blobs backend on server side. +""" +from twisted.trial import unittest +from twisted.internet import defer +from twisted.web.test.test_web import DummyRequest +from leap.soledad.server import _blobs +from io import BytesIO +from mock import Mock +import mock +import os +import base64 +import json +import pytest + + +class FilesystemBackendTestCase(unittest.TestCase): + + @mock.patch.object(_blobs, 'open') + def test_tag_header(self, open_mock): + open_mock.return_value = BytesIO('A' * 40 + 'B' * 16) + expected_tag = base64.urlsafe_b64encode('B' * 16) + expected_method = Mock() + backend = _blobs.FilesystemBlobsBackend() + request = Mock(responseHeaders=Mock(setRawHeaders=expected_method)) + backend.add_tag_header('user', 'blob_id', request) + + expected_method.assert_called_once_with('Tag', [expected_tag]) + + @mock.patch.object(_blobs.static, 'File') + def test_read_blob(self, file_mock): + render_mock = Mock() + file_mock.return_value = render_mock + backend = _blobs.FilesystemBlobsBackend() + request = DummyRequest(['']) + backend._get_path = Mock(return_value='path') + backend.read_blob('user', 'blob_id', request) + + backend._get_path.assert_called_once_with('user', 'blob_id', '') + ctype = 'application/octet-stream' + _blobs.static.File.assert_called_once_with('path', defaultType=ctype) + render_mock.render_GET.assert_called_once_with(request) + + @mock.patch.object(os.path, 'isfile') + @defer.inlineCallbacks + def test_cannot_overwrite(self, isfile): + isfile.return_value = True + backend = _blobs.FilesystemBlobsBackend() + backend._get_path = Mock(return_value='path') + request = DummyRequest(['']) + yield backend.write_blob('user', 'blob_id', request) + self.assertEquals(request.written[0], "Blob already exists: blob_id") + self.assertEquals(request.responseCode, 409) + + @pytest.mark.usefixtures("method_tmpdir") + @mock.patch.object(os.path, 'isfile') + @defer.inlineCallbacks + def test_write_cannot_exceed_quota(self, isfile): + isfile.return_value = False + backend = _blobs.FilesystemBlobsBackend() + backend._get_path = Mock(return_value=self.tempdir) + request = Mock() + + backend.get_total_storage = lambda x: 100 + backend.quota = 90 + yield backend.write_blob('user', 'blob_id', request) + + request.setResponseCode.assert_called_once_with(507) + request.write.assert_called_once_with('Quota Exceeded!') + + def test_get_path_partitioning_by_default(self): + backend = _blobs.FilesystemBlobsBackend() + backend.path = '/somewhere/' + path = backend._get_path('user', 'blob_id', '') + expected = '/somewhere/user/default/b/blo/blob_i/blob_id' + self.assertEquals(path, expected) + + def test_get_path_custom(self): + backend = _blobs.FilesystemBlobsBackend() + backend.path = '/somewhere/' + path = backend._get_path('user', 'blob_id', 'wonderland') + expected = '/somewhere/user/wonderland/b/blo/blob_i/blob_id' + self.assertEquals(expected, path) + + def test_get_path_namespace_traversal_raises(self): + backend = _blobs.FilesystemBlobsBackend() + backend.path = '/somewhere/' + with pytest.raises(Exception): + backend._get_path('user', 'blob_id', '..') + + @pytest.mark.usefixtures("method_tmpdir") + @mock.patch('leap.soledad.server._blobs.os.walk') + def test_list_blobs(self, walk_mock): + backend, _ = _blobs.FilesystemBlobsBackend(self.tempdir), None + walk_mock.return_value = [('', _, ['blob_0']), ('', _, ['blob_1'])] + result = json.loads(backend.list_blobs('user', DummyRequest(['']))) + self.assertEquals(result, ['blob_0', 'blob_1']) + + @pytest.mark.usefixtures("method_tmpdir") + @mock.patch('leap.soledad.server._blobs.os.walk') + def test_list_blobs_limited_by_namespace(self, walk_mock): + backend, _ = _blobs.FilesystemBlobsBackend(self.tempdir), None + walk_mock.return_value = [('', _, ['blob_0']), ('', _, ['blob_1'])] + result = json.loads(backend.list_blobs('user', DummyRequest(['']), + namespace='incoming')) + self.assertEquals(result, ['blob_0', 'blob_1']) + target_dir = os.path.join(self.tempdir, 'user', 'incoming') + walk_mock.assert_called_once_with(target_dir) + + @pytest.mark.usefixtures("method_tmpdir") + def test_path_validation_on_read_blob(self): + blobs_path, request = self.tempdir, DummyRequest(['']) + backend = _blobs.FilesystemBlobsBackend(blobs_path) + with pytest.raises(Exception): + backend.read_blob('..', '..', request) + with pytest.raises(Exception): + backend.read_blob('user', '../../../', request) + with pytest.raises(Exception): + backend.read_blob('../../../', 'blob_id', request) + with pytest.raises(Exception): + backend.read_blob('user', 'blob_id', request, namespace='..') + + @pytest.mark.usefixtures("method_tmpdir") + @defer.inlineCallbacks + def test_path_validation_on_write_blob(self): + blobs_path, request = self.tempdir, DummyRequest(['']) + backend = _blobs.FilesystemBlobsBackend(blobs_path) + with pytest.raises(Exception): + yield backend.write_blob('..', '..', request) + with pytest.raises(Exception): + yield backend.write_blob('user', '../../../', request) + with pytest.raises(Exception): + yield backend.write_blob('../../../', 'id1', request) + with pytest.raises(Exception): + yield backend.write_blob('user', 'id2', request, namespace='..') + + @pytest.mark.usefixtures("method_tmpdir") + @mock.patch('leap.soledad.server._blobs.os.unlink') + def test_delete_blob(self, unlink_mock): + backend = _blobs.FilesystemBlobsBackend(self.tempdir) + backend.delete_blob('user', 'blob_id') + unlink_mock.assert_any_call(backend._get_path('user', + 'blob_id')) + unlink_mock.assert_any_call(backend._get_path('user', + 'blob_id') + '.flags') + + @pytest.mark.usefixtures("method_tmpdir") + @mock.patch('leap.soledad.server._blobs.os.unlink') + def test_delete_blob_custom_namespace(self, unlink_mock): + backend = _blobs.FilesystemBlobsBackend(self.tempdir) + backend.delete_blob('user', 'blob_id', namespace='trash') + unlink_mock.assert_any_call(backend._get_path('user', + 'blob_id', + 'trash')) + unlink_mock.assert_any_call(backend._get_path('user', + 'blob_id', + 'trash') + '.flags') diff --git a/tests/blobs/test_sqlcipher_client_backend.py b/tests/blobs/test_sqlcipher_client_backend.py new file mode 100644 index 00000000..daf561c7 --- /dev/null +++ b/tests/blobs/test_sqlcipher_client_backend.py @@ -0,0 +1,75 @@ +# -*- coding: utf-8 -*- +# test_sqlcipher_client_backend.py +# Copyright (C) 2017 LEAP +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see <http://www.gnu.org/licenses/>. +""" +Tests for sqlcipher backend on blobs client. +""" +from twisted.trial import unittest +from twisted.internet import defer +from leap.soledad.client._db.blobs import SQLiteBlobBackend +from io import BytesIO +from uuid import uuid4 +import pytest + + +class SQLBackendTestCase(unittest.TestCase): + + def setUp(self): + self.key = "A" * 96 + self.local = SQLiteBlobBackend(self.tempdir, self.key) + self.addCleanup(self.local.close) + + @defer.inlineCallbacks + @pytest.mark.usefixtures("method_tmpdir") + def test_get_inexisting(self): + bad_blob_id = uuid4().hex + self.assertFalse((yield self.local.exists(bad_blob_id))) + result = yield self.local.get(bad_blob_id) + self.assertIsNone(result) + + @defer.inlineCallbacks + @pytest.mark.usefixtures("method_tmpdir") + def test_get_existing(self): + blob_id = uuid4().hex + content = "x" + yield self.local.put(blob_id, BytesIO(content), len(content)) + result = yield self.local.get(blob_id) + self.assertTrue((yield self.local.exists(blob_id))) + self.assertEquals(result.getvalue(), content) + + @defer.inlineCallbacks + @pytest.mark.usefixtures("method_tmpdir") + def test_delete(self): + blob_id1, blob_id2 = uuid4().hex, uuid4().hex + content = "x" + yield self.local.put(blob_id1, BytesIO(content), len(content)) + yield self.local.put(blob_id2, BytesIO(content), len(content)) + yield self.local.delete(blob_id1) + self.assertFalse((yield self.local.exists(blob_id1))) + self.assertTrue((yield self.local.exists(blob_id2))) + + @defer.inlineCallbacks + @pytest.mark.usefixtures("method_tmpdir") + def test_list(self): + blob_ids = [uuid4().hex for _ in range(10)] + content = "x" + deferreds = [] + for blob_id in blob_ids: + deferreds.append(self.local.put(blob_id, BytesIO(content), + len(content))) + yield defer.gatherResults(deferreds) + result = yield self.local.list() + self.assertEquals(set(blob_ids), set(result)) |