summaryrefslogtreecommitdiff
path: root/testing
diff options
context:
space:
mode:
authorVictor Shyba <victor1984@riseup.net>2017-07-08 07:04:49 -0300
committerdrebs <drebs@leap.se>2017-07-21 10:58:47 -0300
commit64107009a869a6ddb6cea129e65735d9740e697b (patch)
treedabe4ff46f652a864c95db59e4776b59f1114cb3 /testing
parenteb000dac7a66ea249adb8c9b7d154101b897311c (diff)
[feature] add get/set flags
IncomingBox spec has a flags feature for the processing flow of messages. This commit adds it using a .flags file. -- Resolves: #8869
Diffstat (limited to 'testing')
-rw-r--r--testing/tests/blobs/test_fs_backend.py15
-rw-r--r--testing/tests/server/test_blobs_server.py46
2 files changed, 56 insertions, 5 deletions
diff --git a/testing/tests/blobs/test_fs_backend.py b/testing/tests/blobs/test_fs_backend.py
index a6d7272d..f742f702 100644
--- a/testing/tests/blobs/test_fs_backend.py
+++ b/testing/tests/blobs/test_fs_backend.py
@@ -153,14 +153,19 @@ class FilesystemBackendTestCase(unittest.TestCase):
def test_delete_blob(self, unlink_mock):
backend = _blobs.FilesystemBlobsBackend(self.tempdir)
backend.delete_blob('user', 'blob_id')
- unlink_mock.assert_called_once_with(backend._get_path('user',
- 'blob_id'))
+ unlink_mock.assert_any_call(backend._get_path('user',
+ 'blob_id'))
+ unlink_mock.assert_any_call(backend._get_path('user',
+ 'blob_id') + '.flags')
@pytest.mark.usefixtures("method_tmpdir")
@mock.patch('leap.soledad.server._blobs.os.unlink')
def test_delete_blob_custom_namespace(self, unlink_mock):
backend = _blobs.FilesystemBlobsBackend(self.tempdir)
backend.delete_blob('user', 'blob_id', namespace='trash')
- unlink_mock.assert_called_once_with(backend._get_path('user',
- 'blob_id',
- 'trash'))
+ unlink_mock.assert_any_call(backend._get_path('user',
+ 'blob_id',
+ 'trash'))
+ unlink_mock.assert_any_call(backend._get_path('user',
+ 'blob_id',
+ 'trash') + '.flags')
diff --git a/testing/tests/server/test_blobs_server.py b/testing/tests/server/test_blobs_server.py
index f4c9119b..0db64256 100644
--- a/testing/tests/server/test_blobs_server.py
+++ b/testing/tests/server/test_blobs_server.py
@@ -26,9 +26,11 @@ from twisted.internet import reactor
from twisted.internet import defer
from treq._utils import set_global_pool
+from leap.soledad.common.blobs import Flags
from leap.soledad.server import _blobs as server_blobs
from leap.soledad.client._db.blobs import BlobManager
from leap.soledad.client._db.blobs import BlobAlreadyExistsError
+from leap.soledad.client._db.blobs import InvalidFlagsError
class BlobServerTestCase(unittest.TestCase):
@@ -57,6 +59,50 @@ class BlobServerTestCase(unittest.TestCase):
@defer.inlineCallbacks
@pytest.mark.usefixtures("method_tmpdir")
+ def test_set_get_flags(self):
+ manager = BlobManager('', self.uri, self.secret,
+ self.secret, 'user')
+ fd = BytesIO("flag me")
+ yield manager._encrypt_and_upload('blob_id', fd)
+ yield manager.set_flags('blob_id', [Flags.PROCESSING])
+ flags = yield manager.get_flags('blob_id')
+ self.assertEquals([Flags.PROCESSING], flags)
+
+ @defer.inlineCallbacks
+ @pytest.mark.usefixtures("method_tmpdir")
+ def test_cant_set_invalid_flags(self):
+ manager = BlobManager('', self.uri, self.secret,
+ self.secret, 'user')
+ fd = BytesIO("flag me")
+ yield manager._encrypt_and_upload('blob_id', fd)
+ with pytest.raises(InvalidFlagsError):
+ yield manager.set_flags('blob_id', ['invalid'])
+ flags = yield manager.get_flags('blob_id')
+ self.assertEquals([], flags)
+
+ @defer.inlineCallbacks
+ @pytest.mark.usefixtures("method_tmpdir")
+ def test_get_empty_flags(self):
+ manager = BlobManager('', self.uri, self.secret,
+ self.secret, 'user')
+ fd = BytesIO("flag me")
+ yield manager._encrypt_and_upload('blob_id', fd)
+ flags = yield manager.get_flags('blob_id')
+ self.assertEquals([], flags)
+
+ @defer.inlineCallbacks
+ @pytest.mark.usefixtures("method_tmpdir")
+ def test_flags_ignored_by_listing(self):
+ manager = BlobManager('', self.uri, self.secret,
+ self.secret, 'user')
+ fd = BytesIO("flag me")
+ yield manager._encrypt_and_upload('blob_id', fd)
+ yield manager.set_flags('blob_id', [Flags.PROCESSING])
+ blobs_list = yield manager.remote_list()
+ self.assertEquals(['blob_id'], blobs_list)
+
+ @defer.inlineCallbacks
+ @pytest.mark.usefixtures("method_tmpdir")
def test_upload_changes_remote_list(self):
manager = BlobManager('', self.uri, self.secret,
self.secret, 'user')