From 64107009a869a6ddb6cea129e65735d9740e697b Mon Sep 17 00:00:00 2001 From: Victor Shyba Date: Sat, 8 Jul 2017 07:04:49 -0300 Subject: [feature] add get/set flags IncomingBox spec has a flags feature for the processing flow of messages. This commit adds it using a .flags file. -- Resolves: #8869 --- src/leap/soledad/client/_db/blobs.py | 25 ++++++++++++++++ src/leap/soledad/common/blobs/__init__.py | 3 ++ src/leap/soledad/server/_blobs.py | 50 ++++++++++++++++++++++++++++++- testing/tests/blobs/test_fs_backend.py | 15 ++++++---- testing/tests/server/test_blobs_server.py | 46 ++++++++++++++++++++++++++++ 5 files changed, 133 insertions(+), 6 deletions(-) diff --git a/src/leap/soledad/client/_db/blobs.py b/src/leap/soledad/client/_db/blobs.py index 604d4bef..f693ec20 100644 --- a/src/leap/soledad/client/_db/blobs.py +++ b/src/leap/soledad/client/_db/blobs.py @@ -22,6 +22,7 @@ from urlparse import urljoin import binascii import os +import json import base64 from io import BytesIO @@ -57,6 +58,10 @@ class BlobAlreadyExistsError(SoledadError): pass +class InvalidFlagsError(SoledadError): + pass + + class ConnectionPool(adbapi.ConnectionPool): def insertAndGetLastRowid(self, *args, **kwargs): @@ -106,6 +111,8 @@ class ConnectionPool(adbapi.ConnectionPool): def check_http_status(code): if code == 409: raise BlobAlreadyExistsError() + elif code == 406: + raise InvalidFlagsError() elif code != 200: raise SoledadError("Server Error") @@ -213,6 +220,24 @@ class BlobManager(object): fd = yield self.local.get(doc.blob_id) yield self._encrypt_and_upload(doc.blob_id, fd) + @defer.inlineCallbacks + def set_flags(self, blob_id, flags, **params): + flags = BytesIO(json.dumps(flags)) + uri = urljoin(self.remote, self.user + "/" + blob_id) + response = yield self._client.post(uri, data=flags, params=params) + check_http_status(response.code) + + @defer.inlineCallbacks + def get_flags(self, blob_id, **params): + uri = urljoin(self.remote, self.user + "/" + blob_id) + params.update({'only_flags': True}) + data = yield self._client.get(uri, params=params) + try: + data = yield data.json() + except: + data = [] + defer.returnValue(data) + @defer.inlineCallbacks def get(self, blob_id): local_blob = yield self.local.get(blob_id) diff --git a/src/leap/soledad/common/blobs/__init__.py b/src/leap/soledad/common/blobs/__init__.py index e69de29b..814829ab 100644 --- a/src/leap/soledad/common/blobs/__init__.py +++ b/src/leap/soledad/common/blobs/__init__.py @@ -0,0 +1,3 @@ +from collections import namedtuple +ACCEPTED_FLAGS = ['PENDING', 'PROCESSING', 'PROCESSED', 'FAILED'] +Flags = namedtuple('Flags', ' '.join(ACCEPTED_FLAGS))(*ACCEPTED_FLAGS) diff --git a/src/leap/soledad/server/_blobs.py b/src/leap/soledad/server/_blobs.py index 37247c9e..2a0e1d57 100644 --- a/src/leap/soledad/server/_blobs.py +++ b/src/leap/soledad/server/_blobs.py @@ -40,6 +40,7 @@ from zope.interface import implementer from leap.common.files import mkdir_p from leap.soledad.server import interfaces +from leap.soledad.common.blobs import ACCEPTED_FLAGS __all__ = ['BlobsResource'] @@ -73,6 +74,40 @@ class FilesystemBlobsBackend(object): _file = static.File(path, defaultType='application/octet-stream') return _file.render_GET(request) + def get_flags(self, user, blob_id, request, namespace=''): + path = self._get_path(user, blob_id, namespace) + try: + mkdir_p(os.path.split(path)[0]) + except OSError: + pass + if not os.path.isfile(path): + # 404 - Not Found + request.setResponseCode(404) + return "Blob doesn't exists: %s" % blob_id + if not os.path.isfile(path + '.flags'): + return '[]' + with open(path + '.flags', 'r') as flags_file: + return flags_file.read() + + def set_flags(self, user, blob_id, request, namespace=''): + path = self._get_path(user, blob_id, namespace) + try: + mkdir_p(os.path.split(path)[0]) + except OSError: + pass + if not os.path.isfile(path): + # 404 - Not Found + request.setResponseCode(404) + return "Blob doesn't exists: %s" % blob_id + raw_flags = request.content.read() + flags = json.loads(raw_flags) + for flag in flags: + if flag not in ACCEPTED_FLAGS: + request.setResponseCode(406) + return "Unsupported flag: %s" % flag + with open(path + '.flags', 'w') as flags_file: + flags_file.write(raw_flags) + @defer.inlineCallbacks def write_blob(self, user, blob_id, request, namespace=''): path = self._get_path(user, blob_id, namespace) @@ -98,6 +133,10 @@ class FilesystemBlobsBackend(object): def delete_blob(self, user, blob_id, namespace=''): blob_path = self._get_path(user, blob_id, namespace) os.unlink(blob_path) + try: + os.unlink(blob_path + '.flags') + except: + pass def get_blob_size(user, blob_id, namespace=''): raise NotImplementedError @@ -106,7 +145,8 @@ class FilesystemBlobsBackend(object): blob_ids = [] base_path = self._get_path(user, custom_preffix=namespace) for root, dirs, filenames in os.walk(base_path): - blob_ids += [os.path.join(root, name) for name in filenames] + blob_ids += [os.path.join(root, name) for name in filenames + if not name.endswith('.flags')] if order_by in ['date', '+date']: blob_ids.sort(key=lambda x: os.path.getmtime(x)) elif order_by == '-date': @@ -191,6 +231,8 @@ class BlobsResource(resource.Resource): order = request.args.get('order_by', [None])[0] return self._handler.list_blobs(user, request, namespace, order_by=order) + if 'only_flags' in request.args: + return self._handler.get_flags(user, blob_id, request, namespace) self._handler.add_tag_header(user, blob_id, request) return self._handler.read_blob(user, blob_id, request, namespace) @@ -208,6 +250,12 @@ class BlobsResource(resource.Resource): d.addErrback(self._error, request) return NOT_DONE_YET + def render_POST(self, request): + logger.info("http post: %s" % request.path) + user, blob_id, namespace = self._validate(request) + self._handler.set_flags(user, blob_id, request, namespace) + return '' + def _error(self, e, request): logger.error('Error processing request: %s' % e.getErrorMessage()) request.setResponseCode(500) diff --git a/testing/tests/blobs/test_fs_backend.py b/testing/tests/blobs/test_fs_backend.py index a6d7272d..f742f702 100644 --- a/testing/tests/blobs/test_fs_backend.py +++ b/testing/tests/blobs/test_fs_backend.py @@ -153,14 +153,19 @@ class FilesystemBackendTestCase(unittest.TestCase): def test_delete_blob(self, unlink_mock): backend = _blobs.FilesystemBlobsBackend(self.tempdir) backend.delete_blob('user', 'blob_id') - unlink_mock.assert_called_once_with(backend._get_path('user', - 'blob_id')) + unlink_mock.assert_any_call(backend._get_path('user', + 'blob_id')) + unlink_mock.assert_any_call(backend._get_path('user', + 'blob_id') + '.flags') @pytest.mark.usefixtures("method_tmpdir") @mock.patch('leap.soledad.server._blobs.os.unlink') def test_delete_blob_custom_namespace(self, unlink_mock): backend = _blobs.FilesystemBlobsBackend(self.tempdir) backend.delete_blob('user', 'blob_id', namespace='trash') - unlink_mock.assert_called_once_with(backend._get_path('user', - 'blob_id', - 'trash')) + unlink_mock.assert_any_call(backend._get_path('user', + 'blob_id', + 'trash')) + unlink_mock.assert_any_call(backend._get_path('user', + 'blob_id', + 'trash') + '.flags') diff --git a/testing/tests/server/test_blobs_server.py b/testing/tests/server/test_blobs_server.py index f4c9119b..0db64256 100644 --- a/testing/tests/server/test_blobs_server.py +++ b/testing/tests/server/test_blobs_server.py @@ -26,9 +26,11 @@ from twisted.internet import reactor from twisted.internet import defer from treq._utils import set_global_pool +from leap.soledad.common.blobs import Flags from leap.soledad.server import _blobs as server_blobs from leap.soledad.client._db.blobs import BlobManager from leap.soledad.client._db.blobs import BlobAlreadyExistsError +from leap.soledad.client._db.blobs import InvalidFlagsError class BlobServerTestCase(unittest.TestCase): @@ -55,6 +57,50 @@ class BlobServerTestCase(unittest.TestCase): blob, size = yield manager._download_and_decrypt('blob_id') self.assertEquals(blob.getvalue(), "save me") + @defer.inlineCallbacks + @pytest.mark.usefixtures("method_tmpdir") + def test_set_get_flags(self): + manager = BlobManager('', self.uri, self.secret, + self.secret, 'user') + fd = BytesIO("flag me") + yield manager._encrypt_and_upload('blob_id', fd) + yield manager.set_flags('blob_id', [Flags.PROCESSING]) + flags = yield manager.get_flags('blob_id') + self.assertEquals([Flags.PROCESSING], flags) + + @defer.inlineCallbacks + @pytest.mark.usefixtures("method_tmpdir") + def test_cant_set_invalid_flags(self): + manager = BlobManager('', self.uri, self.secret, + self.secret, 'user') + fd = BytesIO("flag me") + yield manager._encrypt_and_upload('blob_id', fd) + with pytest.raises(InvalidFlagsError): + yield manager.set_flags('blob_id', ['invalid']) + flags = yield manager.get_flags('blob_id') + self.assertEquals([], flags) + + @defer.inlineCallbacks + @pytest.mark.usefixtures("method_tmpdir") + def test_get_empty_flags(self): + manager = BlobManager('', self.uri, self.secret, + self.secret, 'user') + fd = BytesIO("flag me") + yield manager._encrypt_and_upload('blob_id', fd) + flags = yield manager.get_flags('blob_id') + self.assertEquals([], flags) + + @defer.inlineCallbacks + @pytest.mark.usefixtures("method_tmpdir") + def test_flags_ignored_by_listing(self): + manager = BlobManager('', self.uri, self.secret, + self.secret, 'user') + fd = BytesIO("flag me") + yield manager._encrypt_and_upload('blob_id', fd) + yield manager.set_flags('blob_id', [Flags.PROCESSING]) + blobs_list = yield manager.remote_list() + self.assertEquals(['blob_id'], blobs_list) + @defer.inlineCallbacks @pytest.mark.usefixtures("method_tmpdir") def test_upload_changes_remote_list(self): -- cgit v1.2.3