summaryrefslogtreecommitdiff
path: root/testing/tests/blobs
diff options
context:
space:
mode:
Diffstat (limited to 'testing/tests/blobs')
-rw-r--r--testing/tests/blobs/test_blob_manager.py177
-rw-r--r--testing/tests/blobs/test_decrypter_buffer.py72
-rw-r--r--testing/tests/blobs/test_fs_backend.py173
-rw-r--r--testing/tests/blobs/test_sqlcipher_client_backend.py75
4 files changed, 0 insertions, 497 deletions
diff --git a/testing/tests/blobs/test_blob_manager.py b/testing/tests/blobs/test_blob_manager.py
deleted file mode 100644
index 7d985768..00000000
--- a/testing/tests/blobs/test_blob_manager.py
+++ /dev/null
@@ -1,177 +0,0 @@
-# -*- coding: utf-8 -*-
-# test_local_backend.py
-# Copyright (C) 2017 LEAP
-#
-# This program is free software: you can redistribute it and/or modify
-# it under the terms of the GNU General Public License as published by
-# the Free Software Foundation, either version 3 of the License, or
-# (at your option) any later version.
-#
-# This program is distributed in the hope that it will be useful,
-# but WITHOUT ANY WARRANTY; without even the implied warranty of
-# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
-# GNU General Public License for more details.
-#
-# You should have received a copy of the GNU General Public License
-# along with this program. If not, see <http://www.gnu.org/licenses/>.
-"""
-Tests for BlobManager.
-"""
-from twisted.trial import unittest
-from twisted.internet import defer
-from twisted.web.error import SchemeNotSupported
-from leap.soledad.client._db.blobs import BlobManager, BlobDoc, FIXED_REV
-from leap.soledad.client._db.blobs import BlobAlreadyExistsError
-from leap.soledad.client._db.blobs import SyncStatus
-from io import BytesIO
-from mock import Mock
-from uuid import uuid4
-import pytest
-import os
-
-
-class BlobManagerTestCase(unittest.TestCase):
-
- class doc_info:
- doc_id = 'D-deadbeef'
- rev = FIXED_REV
-
- def setUp(self):
- self.cleartext = BytesIO('rosa de foc')
- self.secret = 'A' * 96
- self.manager = BlobManager(
- self.tempdir, '',
- 'A' * 32, self.secret,
- uuid4().hex, 'token', None)
- self.addCleanup(self.manager.close)
-
- @defer.inlineCallbacks
- @pytest.mark.usefixtures("method_tmpdir")
- def test_get_missing(self):
- self.manager._download_and_decrypt = Mock(return_value=None)
- missing_blob_id = uuid4().hex
- result = yield self.manager.get(missing_blob_id)
- self.assertIsNone(result)
- args = missing_blob_id, ''
- self.manager._download_and_decrypt.assert_called_once_with(*args)
-
- @defer.inlineCallbacks
- @pytest.mark.usefixtures("method_tmpdir")
- def test_get_from_existing_value(self):
- self.manager._download_and_decrypt = Mock(return_value=None)
- msg, blob_id = "It's me, M4r10!", uuid4().hex
- yield self.manager.local.put(blob_id, BytesIO(msg),
- size=len(msg))
- result = yield self.manager.get(blob_id)
- self.assertEquals(result.getvalue(), msg)
- self.assertNot(self.manager._download_and_decrypt.called)
-
- @defer.inlineCallbacks
- @pytest.mark.usefixtures("method_tmpdir")
- def test_put_stores_on_local_db(self):
- self.manager._encrypt_and_upload = Mock(return_value=None)
- msg, blob_id = "Hey Joe", uuid4().hex
- doc = BlobDoc(BytesIO(msg), blob_id=blob_id)
- yield self.manager.put(doc, size=len(msg))
- result = yield self.manager.local.get(blob_id)
- self.assertEquals(result.getvalue(), msg)
- self.assertTrue(self.manager._encrypt_and_upload.called)
-
- @defer.inlineCallbacks
- @pytest.mark.usefixtures("method_tmpdir")
- def test_put_then_get_using_real_file_descriptor(self):
- self.manager._encrypt_and_upload = Mock(return_value=None)
- self.manager._download_and_decrypt = Mock(return_value=None)
- msg, blob_id = "Fuuuuull cycleee! \o/", uuid4().hex
- tmpfile = os.tmpfile()
- tmpfile.write(msg)
- tmpfile.seek(0)
- doc = BlobDoc(tmpfile, blob_id)
- yield self.manager.put(doc, size=len(msg))
- result = yield self.manager.get(doc.blob_id)
- self.assertEquals(result.getvalue(), msg)
- self.assertTrue(self.manager._encrypt_and_upload.called)
- self.assertFalse(self.manager._download_and_decrypt.called)
-
- @defer.inlineCallbacks
- @pytest.mark.usefixtures("method_tmpdir")
- def test_local_list_blobs(self):
- self.manager._encrypt_and_upload = Mock(return_value=None)
- msg, blob_id1, blob_id2 = "1337", uuid4().hex, uuid4().hex
- doc = BlobDoc(BytesIO(msg), blob_id1)
- yield self.manager.put(doc, size=len(msg))
- doc2 = BlobDoc(BytesIO(msg), blob_id2)
- yield self.manager.put(doc2, size=len(msg))
- blobs_list = yield self.manager.local_list()
-
- self.assertEquals(set([blob_id1, blob_id2]), set(blobs_list))
-
- @defer.inlineCallbacks
- @pytest.mark.usefixtures("method_tmpdir")
- def test_send_missing(self):
- fd, missing_id = BytesIO('test'), uuid4().hex
- self.manager._encrypt_and_upload = Mock(return_value=None)
- self.manager.remote_list = Mock(return_value=[])
- yield self.manager.local.put(missing_id, fd, 4)
- yield self.manager.send_missing()
-
- call_list = self.manager._encrypt_and_upload.call_args_list
- self.assertEquals(1, len(call_list))
- call_blob_id, call_fd = call_list[0][0]
- self.assertEquals(missing_id, call_blob_id)
- self.assertEquals('test', call_fd.getvalue())
-
- @defer.inlineCallbacks
- @pytest.mark.usefixtures("method_tmpdir")
- def test_duplicated_blob_error_on_put(self):
- self.manager._encrypt_and_upload = Mock(return_value=None)
- content, existing_id = "Blob content", uuid4().hex
- doc1 = BlobDoc(BytesIO(content), existing_id)
- yield self.manager.put(doc1, len(content))
- doc2 = BlobDoc(BytesIO(content), existing_id)
- self.manager._encrypt_and_upload.reset_mock()
- with pytest.raises(BlobAlreadyExistsError):
- yield self.manager.put(doc2, len(content))
- self.assertFalse(self.manager._encrypt_and_upload.called)
-
- @defer.inlineCallbacks
- @pytest.mark.usefixtures("method_tmpdir")
- def test_delete_from_local_and_remote(self):
- self.manager._encrypt_and_upload = Mock(return_value=None)
- self.manager._delete_from_remote = Mock(return_value=None)
- content, blob_id = "Blob content", uuid4().hex
- doc1 = BlobDoc(BytesIO(content), blob_id)
- yield self.manager.put(doc1, len(content))
- yield self.manager.delete(blob_id)
- local_list = yield self.manager.local_list()
- self.assertEquals(0, len(local_list))
- params = {'namespace': ''}
- self.manager._delete_from_remote.assert_called_with(blob_id, **params)
-
- @defer.inlineCallbacks
- @pytest.mark.usefixtures("method_tmpdir")
- def test_local_sync_status_pending_upload(self):
- upload_failure = defer.fail(Exception())
- self.manager._encrypt_and_upload = Mock(return_value=upload_failure)
- content, blob_id = "Blob content", uuid4().hex
- doc1 = BlobDoc(BytesIO(content), blob_id)
- with pytest.raises(Exception):
- yield self.manager.put(doc1, len(content))
- pending_upload = SyncStatus.PENDING_UPLOAD
- local_list = yield self.manager.local_list(sync_status=pending_upload)
- self.assertIn(blob_id, local_list)
-
- @defer.inlineCallbacks
- @pytest.mark.usefixtures("method_tmpdir")
- def test_upload_retry_limit(self):
- self.manager.remote_list = Mock(return_value=[])
- content, blob_id = "Blob content", uuid4().hex
- doc1 = BlobDoc(BytesIO(content), blob_id)
- with pytest.raises(Exception):
- yield self.manager.put(doc1, len(content))
- for _ in range(self.manager.max_retries + 1):
- with pytest.raises(SchemeNotSupported):
- yield self.manager.send_missing()
- failed_upload = SyncStatus.FAILED_UPLOAD
- local_list = yield self.manager.local_list(sync_status=failed_upload)
- self.assertIn(blob_id, local_list)
diff --git a/testing/tests/blobs/test_decrypter_buffer.py b/testing/tests/blobs/test_decrypter_buffer.py
deleted file mode 100644
index 83fbaad3..00000000
--- a/testing/tests/blobs/test_decrypter_buffer.py
+++ /dev/null
@@ -1,72 +0,0 @@
-# -*- coding: utf-8 -*-
-# test_blobs.py
-# Copyright (C) 2017 LEAP
-#
-# This program is free software: you can redistribute it and/or modify
-# it under the terms of the GNU General Public License as published by
-# the Free Software Foundation, either version 3 of the License, or
-# (at your option) any later version.
-#
-# This program is distributed in the hope that it will be useful,
-# but WITHOUT ANY WARRANTY; without even the implied warranty of
-# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
-# GNU General Public License for more details.
-#
-# You should have received a copy of the GNU General Public License
-# along with this program. If not, see <http://www.gnu.org/licenses/>.
-"""
-Tests for blobs decrypter buffer. A component which is used as a decryption
-sink during blob stream download.
-"""
-from io import BytesIO
-from mock import Mock
-
-from twisted.trial import unittest
-from twisted.internet import defer
-
-from leap.soledad.client._db.blobs import DecrypterBuffer
-from leap.soledad.client._db.blobs import BlobManager
-from leap.soledad.client._db.blobs import FIXED_REV
-from leap.soledad.client import _crypto
-
-
-class DecrypterBufferCase(unittest.TestCase):
-
- class doc_info:
- doc_id = 'D-BLOB-ID'
- rev = FIXED_REV
-
- def setUp(self):
- self.cleartext = BytesIO('rosa de foc')
- self.secret = 'A' * 96
- self.blob = _crypto.BlobEncryptor(
- self.doc_info, self.cleartext,
- armor=False,
- secret='A' * 96)
-
- @defer.inlineCallbacks
- def test_decrypt_buffer(self):
- encrypted = (yield self.blob.encrypt()).getvalue()
- tag = encrypted[-16:]
- buf = DecrypterBuffer(self.doc_info.doc_id, self.secret, tag)
- buf.write(encrypted)
- fd, size = buf.close()
- self.assertEquals(fd.getvalue(), 'rosa de foc')
-
- @defer.inlineCallbacks
- def test_decrypt_uploading_encrypted_blob(self):
-
- @defer.inlineCallbacks
- def _check_result(uri, data, *args, **kwargs):
- decryptor = _crypto.BlobDecryptor(
- self.doc_info, data,
- armor=False,
- secret=self.secret)
- decrypted = yield decryptor.decrypt()
- self.assertEquals(decrypted.getvalue(), 'up and up')
- defer.returnValue(Mock(code=200))
-
- manager = BlobManager('', '', self.secret, self.secret, 'user')
- fd = BytesIO('up and up')
- manager._client.put = _check_result
- yield manager._encrypt_and_upload(self.doc_info.doc_id, fd)
diff --git a/testing/tests/blobs/test_fs_backend.py b/testing/tests/blobs/test_fs_backend.py
deleted file mode 100644
index 53f3127d..00000000
--- a/testing/tests/blobs/test_fs_backend.py
+++ /dev/null
@@ -1,173 +0,0 @@
-# -*- coding: utf-8 -*-
-# test_fs_backend.py
-# Copyright (C) 2017 LEAP
-#
-# This program is free software: you can redistribute it and/or modify
-# it under the terms of the GNU General Public License as published by
-# the Free Software Foundation, either version 3 of the License, or
-# (at your option) any later version.
-#
-# This program is distributed in the hope that it will be useful,
-# but WITHOUT ANY WARRANTY; without even the implied warranty of
-# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
-# GNU General Public License for more details.
-#
-# You should have received a copy of the GNU General Public License
-# along with this program. If not, see <http://www.gnu.org/licenses/>.
-"""
-Tests for blobs backend on server side.
-"""
-from twisted.trial import unittest
-from twisted.internet import defer
-from twisted.web.test.test_web import DummyRequest
-from leap.soledad.server import _blobs
-from io import BytesIO
-from mock import Mock
-import mock
-import os
-import base64
-import json
-import pytest
-
-
-class FilesystemBackendTestCase(unittest.TestCase):
-
- @mock.patch.object(_blobs, 'open')
- def test_tag_header(self, open_mock):
- open_mock.return_value = BytesIO('A' * 40 + 'B' * 16)
- expected_tag = base64.urlsafe_b64encode('B' * 16)
- expected_method = Mock()
- backend = _blobs.FilesystemBlobsBackend()
- request = Mock(responseHeaders=Mock(setRawHeaders=expected_method))
- backend.add_tag_header('user', 'blob_id', request)
-
- expected_method.assert_called_once_with('Tag', [expected_tag])
-
- @mock.patch.object(_blobs.static, 'File')
- def test_read_blob(self, file_mock):
- render_mock = Mock()
- file_mock.return_value = render_mock
- backend = _blobs.FilesystemBlobsBackend()
- request = DummyRequest([''])
- backend._get_path = Mock(return_value='path')
- backend.read_blob('user', 'blob_id', request)
-
- backend._get_path.assert_called_once_with('user', 'blob_id', '')
- ctype = 'application/octet-stream'
- _blobs.static.File.assert_called_once_with('path', defaultType=ctype)
- render_mock.render_GET.assert_called_once_with(request)
-
- @mock.patch.object(os.path, 'isfile')
- @defer.inlineCallbacks
- def test_cannot_overwrite(self, isfile):
- isfile.return_value = True
- backend = _blobs.FilesystemBlobsBackend()
- backend._get_path = Mock(return_value='path')
- request = DummyRequest([''])
- yield backend.write_blob('user', 'blob_id', request)
- self.assertEquals(request.written[0], "Blob already exists: blob_id")
- self.assertEquals(request.responseCode, 409)
-
- @pytest.mark.usefixtures("method_tmpdir")
- @mock.patch.object(os.path, 'isfile')
- @defer.inlineCallbacks
- def test_write_cannot_exceed_quota(self, isfile):
- isfile.return_value = False
- backend = _blobs.FilesystemBlobsBackend()
- backend._get_path = Mock(return_value=self.tempdir)
- request = Mock()
-
- backend.get_total_storage = lambda x: 100
- backend.quota = 90
- yield backend.write_blob('user', 'blob_id', request)
-
- request.setResponseCode.assert_called_once_with(507)
- request.write.assert_called_once_with('Quota Exceeded!')
-
- def test_get_path_partitioning_by_default(self):
- backend = _blobs.FilesystemBlobsBackend()
- backend.path = '/somewhere/'
- path = backend._get_path('user', 'blob_id', '')
- expected = '/somewhere/user/default/b/blo/blob_i/blob_id'
- self.assertEquals(path, expected)
-
- def test_get_path_custom(self):
- backend = _blobs.FilesystemBlobsBackend()
- backend.path = '/somewhere/'
- path = backend._get_path('user', 'blob_id', 'wonderland')
- expected = '/somewhere/user/wonderland/b/blo/blob_i/blob_id'
- self.assertEquals(expected, path)
-
- def test_get_path_namespace_traversal_raises(self):
- backend = _blobs.FilesystemBlobsBackend()
- backend.path = '/somewhere/'
- with pytest.raises(Exception):
- backend._get_path('user', 'blob_id', '..')
-
- @pytest.mark.usefixtures("method_tmpdir")
- @mock.patch('leap.soledad.server._blobs.os.walk')
- def test_list_blobs(self, walk_mock):
- backend, _ = _blobs.FilesystemBlobsBackend(self.tempdir), None
- walk_mock.return_value = [('', _, ['blob_0']), ('', _, ['blob_1'])]
- result = json.loads(backend.list_blobs('user', DummyRequest([''])))
- self.assertEquals(result, ['blob_0', 'blob_1'])
-
- @pytest.mark.usefixtures("method_tmpdir")
- @mock.patch('leap.soledad.server._blobs.os.walk')
- def test_list_blobs_limited_by_namespace(self, walk_mock):
- backend, _ = _blobs.FilesystemBlobsBackend(self.tempdir), None
- walk_mock.return_value = [('', _, ['blob_0']), ('', _, ['blob_1'])]
- result = json.loads(backend.list_blobs('user', DummyRequest(['']),
- namespace='incoming'))
- self.assertEquals(result, ['blob_0', 'blob_1'])
- target_dir = os.path.join(self.tempdir, 'user', 'incoming')
- walk_mock.assert_called_once_with(target_dir)
-
- @pytest.mark.usefixtures("method_tmpdir")
- def test_path_validation_on_read_blob(self):
- blobs_path, request = self.tempdir, DummyRequest([''])
- backend = _blobs.FilesystemBlobsBackend(blobs_path)
- with pytest.raises(Exception):
- backend.read_blob('..', '..', request)
- with pytest.raises(Exception):
- backend.read_blob('user', '../../../', request)
- with pytest.raises(Exception):
- backend.read_blob('../../../', 'blob_id', request)
- with pytest.raises(Exception):
- backend.read_blob('user', 'blob_id', request, namespace='..')
-
- @pytest.mark.usefixtures("method_tmpdir")
- @defer.inlineCallbacks
- def test_path_validation_on_write_blob(self):
- blobs_path, request = self.tempdir, DummyRequest([''])
- backend = _blobs.FilesystemBlobsBackend(blobs_path)
- with pytest.raises(Exception):
- yield backend.write_blob('..', '..', request)
- with pytest.raises(Exception):
- yield backend.write_blob('user', '../../../', request)
- with pytest.raises(Exception):
- yield backend.write_blob('../../../', 'id1', request)
- with pytest.raises(Exception):
- yield backend.write_blob('user', 'id2', request, namespace='..')
-
- @pytest.mark.usefixtures("method_tmpdir")
- @mock.patch('leap.soledad.server._blobs.os.unlink')
- def test_delete_blob(self, unlink_mock):
- backend = _blobs.FilesystemBlobsBackend(self.tempdir)
- backend.delete_blob('user', 'blob_id')
- unlink_mock.assert_any_call(backend._get_path('user',
- 'blob_id'))
- unlink_mock.assert_any_call(backend._get_path('user',
- 'blob_id') + '.flags')
-
- @pytest.mark.usefixtures("method_tmpdir")
- @mock.patch('leap.soledad.server._blobs.os.unlink')
- def test_delete_blob_custom_namespace(self, unlink_mock):
- backend = _blobs.FilesystemBlobsBackend(self.tempdir)
- backend.delete_blob('user', 'blob_id', namespace='trash')
- unlink_mock.assert_any_call(backend._get_path('user',
- 'blob_id',
- 'trash'))
- unlink_mock.assert_any_call(backend._get_path('user',
- 'blob_id',
- 'trash') + '.flags')
diff --git a/testing/tests/blobs/test_sqlcipher_client_backend.py b/testing/tests/blobs/test_sqlcipher_client_backend.py
deleted file mode 100644
index daf561c7..00000000
--- a/testing/tests/blobs/test_sqlcipher_client_backend.py
+++ /dev/null
@@ -1,75 +0,0 @@
-# -*- coding: utf-8 -*-
-# test_sqlcipher_client_backend.py
-# Copyright (C) 2017 LEAP
-#
-# This program is free software: you can redistribute it and/or modify
-# it under the terms of the GNU General Public License as published by
-# the Free Software Foundation, either version 3 of the License, or
-# (at your option) any later version.
-#
-# This program is distributed in the hope that it will be useful,
-# but WITHOUT ANY WARRANTY; without even the implied warranty of
-# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
-# GNU General Public License for more details.
-#
-# You should have received a copy of the GNU General Public License
-# along with this program. If not, see <http://www.gnu.org/licenses/>.
-"""
-Tests for sqlcipher backend on blobs client.
-"""
-from twisted.trial import unittest
-from twisted.internet import defer
-from leap.soledad.client._db.blobs import SQLiteBlobBackend
-from io import BytesIO
-from uuid import uuid4
-import pytest
-
-
-class SQLBackendTestCase(unittest.TestCase):
-
- def setUp(self):
- self.key = "A" * 96
- self.local = SQLiteBlobBackend(self.tempdir, self.key)
- self.addCleanup(self.local.close)
-
- @defer.inlineCallbacks
- @pytest.mark.usefixtures("method_tmpdir")
- def test_get_inexisting(self):
- bad_blob_id = uuid4().hex
- self.assertFalse((yield self.local.exists(bad_blob_id)))
- result = yield self.local.get(bad_blob_id)
- self.assertIsNone(result)
-
- @defer.inlineCallbacks
- @pytest.mark.usefixtures("method_tmpdir")
- def test_get_existing(self):
- blob_id = uuid4().hex
- content = "x"
- yield self.local.put(blob_id, BytesIO(content), len(content))
- result = yield self.local.get(blob_id)
- self.assertTrue((yield self.local.exists(blob_id)))
- self.assertEquals(result.getvalue(), content)
-
- @defer.inlineCallbacks
- @pytest.mark.usefixtures("method_tmpdir")
- def test_delete(self):
- blob_id1, blob_id2 = uuid4().hex, uuid4().hex
- content = "x"
- yield self.local.put(blob_id1, BytesIO(content), len(content))
- yield self.local.put(blob_id2, BytesIO(content), len(content))
- yield self.local.delete(blob_id1)
- self.assertFalse((yield self.local.exists(blob_id1)))
- self.assertTrue((yield self.local.exists(blob_id2)))
-
- @defer.inlineCallbacks
- @pytest.mark.usefixtures("method_tmpdir")
- def test_list(self):
- blob_ids = [uuid4().hex for _ in range(10)]
- content = "x"
- deferreds = []
- for blob_id in blob_ids:
- deferreds.append(self.local.put(blob_id, BytesIO(content),
- len(content)))
- yield defer.gatherResults(deferreds)
- result = yield self.local.list()
- self.assertEquals(set(blob_ids), set(result))