diff options
author | Victor Shyba <victor1984@riseup.net> | 2017-07-08 07:04:49 -0300 |
---|---|---|
committer | drebs <drebs@leap.se> | 2017-07-21 10:58:47 -0300 |
commit | 64107009a869a6ddb6cea129e65735d9740e697b (patch) | |
tree | dabe4ff46f652a864c95db59e4776b59f1114cb3 /testing | |
parent | eb000dac7a66ea249adb8c9b7d154101b897311c (diff) |
[feature] add get/set flags
IncomingBox spec has a flags feature for the processing flow of
messages. This commit adds it using a .flags file.
-- Resolves: #8869
Diffstat (limited to 'testing')
-rw-r--r-- | testing/tests/blobs/test_fs_backend.py | 15 | ||||
-rw-r--r-- | testing/tests/server/test_blobs_server.py | 46 |
2 files changed, 56 insertions, 5 deletions
diff --git a/testing/tests/blobs/test_fs_backend.py b/testing/tests/blobs/test_fs_backend.py index a6d7272d..f742f702 100644 --- a/testing/tests/blobs/test_fs_backend.py +++ b/testing/tests/blobs/test_fs_backend.py @@ -153,14 +153,19 @@ class FilesystemBackendTestCase(unittest.TestCase): def test_delete_blob(self, unlink_mock): backend = _blobs.FilesystemBlobsBackend(self.tempdir) backend.delete_blob('user', 'blob_id') - unlink_mock.assert_called_once_with(backend._get_path('user', - 'blob_id')) + unlink_mock.assert_any_call(backend._get_path('user', + 'blob_id')) + unlink_mock.assert_any_call(backend._get_path('user', + 'blob_id') + '.flags') @pytest.mark.usefixtures("method_tmpdir") @mock.patch('leap.soledad.server._blobs.os.unlink') def test_delete_blob_custom_namespace(self, unlink_mock): backend = _blobs.FilesystemBlobsBackend(self.tempdir) backend.delete_blob('user', 'blob_id', namespace='trash') - unlink_mock.assert_called_once_with(backend._get_path('user', - 'blob_id', - 'trash')) + unlink_mock.assert_any_call(backend._get_path('user', + 'blob_id', + 'trash')) + unlink_mock.assert_any_call(backend._get_path('user', + 'blob_id', + 'trash') + '.flags') diff --git a/testing/tests/server/test_blobs_server.py b/testing/tests/server/test_blobs_server.py index f4c9119b..0db64256 100644 --- a/testing/tests/server/test_blobs_server.py +++ b/testing/tests/server/test_blobs_server.py @@ -26,9 +26,11 @@ from twisted.internet import reactor from twisted.internet import defer from treq._utils import set_global_pool +from leap.soledad.common.blobs import Flags from leap.soledad.server import _blobs as server_blobs from leap.soledad.client._db.blobs import BlobManager from leap.soledad.client._db.blobs import BlobAlreadyExistsError +from leap.soledad.client._db.blobs import InvalidFlagsError class BlobServerTestCase(unittest.TestCase): @@ -57,6 +59,50 @@ class BlobServerTestCase(unittest.TestCase): @defer.inlineCallbacks @pytest.mark.usefixtures("method_tmpdir") + def test_set_get_flags(self): + manager = BlobManager('', self.uri, self.secret, + self.secret, 'user') + fd = BytesIO("flag me") + yield manager._encrypt_and_upload('blob_id', fd) + yield manager.set_flags('blob_id', [Flags.PROCESSING]) + flags = yield manager.get_flags('blob_id') + self.assertEquals([Flags.PROCESSING], flags) + + @defer.inlineCallbacks + @pytest.mark.usefixtures("method_tmpdir") + def test_cant_set_invalid_flags(self): + manager = BlobManager('', self.uri, self.secret, + self.secret, 'user') + fd = BytesIO("flag me") + yield manager._encrypt_and_upload('blob_id', fd) + with pytest.raises(InvalidFlagsError): + yield manager.set_flags('blob_id', ['invalid']) + flags = yield manager.get_flags('blob_id') + self.assertEquals([], flags) + + @defer.inlineCallbacks + @pytest.mark.usefixtures("method_tmpdir") + def test_get_empty_flags(self): + manager = BlobManager('', self.uri, self.secret, + self.secret, 'user') + fd = BytesIO("flag me") + yield manager._encrypt_and_upload('blob_id', fd) + flags = yield manager.get_flags('blob_id') + self.assertEquals([], flags) + + @defer.inlineCallbacks + @pytest.mark.usefixtures("method_tmpdir") + def test_flags_ignored_by_listing(self): + manager = BlobManager('', self.uri, self.secret, + self.secret, 'user') + fd = BytesIO("flag me") + yield manager._encrypt_and_upload('blob_id', fd) + yield manager.set_flags('blob_id', [Flags.PROCESSING]) + blobs_list = yield manager.remote_list() + self.assertEquals(['blob_id'], blobs_list) + + @defer.inlineCallbacks + @pytest.mark.usefixtures("method_tmpdir") def test_upload_changes_remote_list(self): manager = BlobManager('', self.uri, self.secret, self.secret, 'user') |