diff options
author | Victor Shyba <victor1984@riseup.net> | 2017-05-01 04:21:39 -0300 |
---|---|---|
committer | Victor Shyba <victor1984@riseup.net> | 2017-05-01 04:34:18 -0300 |
commit | a79e59656b08c9c60e607e9863191dca41591ae0 (patch) | |
tree | c650833aae23475039890315385ecf8144e6ec14 /testing/tests/blobs/test_local_backend.py | |
parent | a1899b45442bc20ea0fe485e7b317521c53672f8 (diff) |
[test] rename file to match test case name
- Related: #8812
Diffstat (limited to 'testing/tests/blobs/test_local_backend.py')
-rw-r--r-- | testing/tests/blobs/test_local_backend.py | 132 |
1 files changed, 0 insertions, 132 deletions
diff --git a/testing/tests/blobs/test_local_backend.py b/testing/tests/blobs/test_local_backend.py deleted file mode 100644 index b5a14de4..00000000 --- a/testing/tests/blobs/test_local_backend.py +++ /dev/null @@ -1,132 +0,0 @@ -# -*- coding: utf-8 -*- -# test_local_backend.py -# Copyright (C) 2017 LEAP -# -# This program is free software: you can redistribute it and/or modify -# it under the terms of the GNU General Public License as published by -# the Free Software Foundation, either version 3 of the License, or -# (at your option) any later version. -# -# This program is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -# GNU General Public License for more details. -# -# You should have received a copy of the GNU General Public License -# along with this program. If not, see <http://www.gnu.org/licenses/>. -""" -Tests for sqlcipher backend on blobs client. -""" -from twisted.trial import unittest -from twisted.internet import defer -from leap.soledad.client._blobs import BlobManager, BlobDoc, FIXED_REV -from leap.soledad.client._blobs import BlobAlreadyExistsError -from io import BytesIO -from mock import Mock -import pytest -import os - - -class BlobManagerTestCase(unittest.TestCase): - - class doc_info: - doc_id = 'D-deadbeef' - rev = FIXED_REV - - def setUp(self): - self.cleartext = BytesIO('rosa de foc') - self.secret = 'A' * 96 - self.manager = BlobManager( - self.tempdir, '', - 'A' * 32, self.secret, - 'uuid', 'token', None) - self.addCleanup(self.manager.close) - - @defer.inlineCallbacks - @pytest.mark.usefixtures("method_tmpdir") - def test_get_inexistent(self): - self.manager._download_and_decrypt = Mock(return_value=None) - bad_blob_id = 'inexsistent_id' - result = yield self.manager.get(bad_blob_id) - self.assertIsNone(result) - self.manager._download_and_decrypt.assert_called_once_with(bad_blob_id) - - @defer.inlineCallbacks - @pytest.mark.usefixtures("method_tmpdir") - def test_get_from_existing_value(self): - self.manager._download_and_decrypt = Mock(return_value=None) - msg = "It's me, M4r10!" - yield self.manager.local.put('myblob_id', BytesIO(msg), - size=len(msg)) - result = yield self.manager.get('myblob_id') - self.assertEquals(result.getvalue(), msg) - self.assertNot(self.manager._download_and_decrypt.called) - - @defer.inlineCallbacks - @pytest.mark.usefixtures("method_tmpdir") - def test_put_stores_on_local_db(self): - self.manager._encrypt_and_upload = Mock(return_value=None) - msg = "Hey Joe" - doc = BlobDoc(BytesIO(msg), blob_id='myblob_id') - yield self.manager.put(doc, size=len(msg)) - result = yield self.manager.local.get('myblob_id') - self.assertEquals(result.getvalue(), msg) - self.assertTrue(self.manager._encrypt_and_upload.called) - - @defer.inlineCallbacks - @pytest.mark.usefixtures("method_tmpdir") - def test_put_then_get_using_real_file_descriptor(self): - self.manager._encrypt_and_upload = Mock(return_value=None) - self.manager._download_and_decrypt = Mock(return_value=None) - msg = "Fuuuuull cycleee! \o/" - tmpfile = os.tmpfile() - tmpfile.write(msg) - tmpfile.seek(0) - doc = BlobDoc(tmpfile, 'myblob_id') - yield self.manager.put(doc, size=len(msg)) - result = yield self.manager.get(doc.blob_id) - self.assertEquals(result.getvalue(), msg) - self.assertTrue(self.manager._encrypt_and_upload.called) - self.assertFalse(self.manager._download_and_decrypt.called) - - @defer.inlineCallbacks - @pytest.mark.usefixtures("method_tmpdir") - def test_local_list_blobs(self): - self.manager._encrypt_and_upload = Mock(return_value=None) - msg = "1337" - doc = BlobDoc(BytesIO(msg), 'myblob_id') - yield self.manager.put(doc, size=len(msg)) - doc2 = BlobDoc(BytesIO(msg), 'myblob_id2') - yield self.manager.put(doc2, size=len(msg)) - blobs_list = yield self.manager.local_list() - - self.assertEquals(set(['myblob_id', 'myblob_id2']), set(blobs_list)) - - @defer.inlineCallbacks - @pytest.mark.usefixtures("method_tmpdir") - def test_send_missing(self): - fd = BytesIO('test') - self.manager._encrypt_and_upload = Mock(return_value=None) - self.manager.remote_list = Mock(return_value=[]) - yield self.manager.local.put('missing_id', fd, 4) - yield self.manager.send_missing() - - call_list = self.manager._encrypt_and_upload.call_args_list - self.assertEquals(1, len(call_list)) - call_blob_id, call_fd = call_list[0][0] - self.assertEquals('missing_id', call_blob_id) - self.assertEquals('test', call_fd.getvalue()) - - @defer.inlineCallbacks - @pytest.mark.usefixtures("method_tmpdir") - def test_duplicated_blob_error_on_put(self): - self.manager._encrypt_and_upload = Mock(return_value=None) - content = "Blob content" - doc1 = BlobDoc(BytesIO(content), 'existing_id') - yield self.manager.put(doc1, len(content)) - doc2 = BlobDoc(BytesIO(content), 'existing_id') - # reset mock, so we can check that upload wasnt called - self.manager._encrypt_and_upload = Mock(return_value=None) - with pytest.raises(BlobAlreadyExistsError): - yield self.manager.put(doc2, len(content)) - self.assertFalse(self.manager._encrypt_and_upload.called) |