summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorVictor Shyba <victor1984@riseup.net>2017-07-08 07:04:49 -0300
committerdrebs <drebs@leap.se>2017-07-21 10:58:47 -0300
commit64107009a869a6ddb6cea129e65735d9740e697b (patch)
treedabe4ff46f652a864c95db59e4776b59f1114cb3
parenteb000dac7a66ea249adb8c9b7d154101b897311c (diff)
[feature] add get/set flags
IncomingBox spec has a flags feature for the processing flow of messages. This commit adds it using a .flags file. -- Resolves: #8869
-rw-r--r--src/leap/soledad/client/_db/blobs.py25
-rw-r--r--src/leap/soledad/common/blobs/__init__.py3
-rw-r--r--src/leap/soledad/server/_blobs.py50
-rw-r--r--testing/tests/blobs/test_fs_backend.py15
-rw-r--r--testing/tests/server/test_blobs_server.py46
5 files changed, 133 insertions, 6 deletions
diff --git a/src/leap/soledad/client/_db/blobs.py b/src/leap/soledad/client/_db/blobs.py
index 604d4bef..f693ec20 100644
--- a/src/leap/soledad/client/_db/blobs.py
+++ b/src/leap/soledad/client/_db/blobs.py
@@ -22,6 +22,7 @@ from urlparse import urljoin
import binascii
import os
+import json
import base64
from io import BytesIO
@@ -57,6 +58,10 @@ class BlobAlreadyExistsError(SoledadError):
pass
+class InvalidFlagsError(SoledadError):
+ pass
+
+
class ConnectionPool(adbapi.ConnectionPool):
def insertAndGetLastRowid(self, *args, **kwargs):
@@ -106,6 +111,8 @@ class ConnectionPool(adbapi.ConnectionPool):
def check_http_status(code):
if code == 409:
raise BlobAlreadyExistsError()
+ elif code == 406:
+ raise InvalidFlagsError()
elif code != 200:
raise SoledadError("Server Error")
@@ -214,6 +221,24 @@ class BlobManager(object):
yield self._encrypt_and_upload(doc.blob_id, fd)
@defer.inlineCallbacks
+ def set_flags(self, blob_id, flags, **params):
+ flags = BytesIO(json.dumps(flags))
+ uri = urljoin(self.remote, self.user + "/" + blob_id)
+ response = yield self._client.post(uri, data=flags, params=params)
+ check_http_status(response.code)
+
+ @defer.inlineCallbacks
+ def get_flags(self, blob_id, **params):
+ uri = urljoin(self.remote, self.user + "/" + blob_id)
+ params.update({'only_flags': True})
+ data = yield self._client.get(uri, params=params)
+ try:
+ data = yield data.json()
+ except:
+ data = []
+ defer.returnValue(data)
+
+ @defer.inlineCallbacks
def get(self, blob_id):
local_blob = yield self.local.get(blob_id)
if local_blob:
diff --git a/src/leap/soledad/common/blobs/__init__.py b/src/leap/soledad/common/blobs/__init__.py
index e69de29b..814829ab 100644
--- a/src/leap/soledad/common/blobs/__init__.py
+++ b/src/leap/soledad/common/blobs/__init__.py
@@ -0,0 +1,3 @@
+from collections import namedtuple
+ACCEPTED_FLAGS = ['PENDING', 'PROCESSING', 'PROCESSED', 'FAILED']
+Flags = namedtuple('Flags', ' '.join(ACCEPTED_FLAGS))(*ACCEPTED_FLAGS)
diff --git a/src/leap/soledad/server/_blobs.py b/src/leap/soledad/server/_blobs.py
index 37247c9e..2a0e1d57 100644
--- a/src/leap/soledad/server/_blobs.py
+++ b/src/leap/soledad/server/_blobs.py
@@ -40,6 +40,7 @@ from zope.interface import implementer
from leap.common.files import mkdir_p
from leap.soledad.server import interfaces
+from leap.soledad.common.blobs import ACCEPTED_FLAGS
__all__ = ['BlobsResource']
@@ -73,6 +74,40 @@ class FilesystemBlobsBackend(object):
_file = static.File(path, defaultType='application/octet-stream')
return _file.render_GET(request)
+ def get_flags(self, user, blob_id, request, namespace=''):
+ path = self._get_path(user, blob_id, namespace)
+ try:
+ mkdir_p(os.path.split(path)[0])
+ except OSError:
+ pass
+ if not os.path.isfile(path):
+ # 404 - Not Found
+ request.setResponseCode(404)
+ return "Blob doesn't exists: %s" % blob_id
+ if not os.path.isfile(path + '.flags'):
+ return '[]'
+ with open(path + '.flags', 'r') as flags_file:
+ return flags_file.read()
+
+ def set_flags(self, user, blob_id, request, namespace=''):
+ path = self._get_path(user, blob_id, namespace)
+ try:
+ mkdir_p(os.path.split(path)[0])
+ except OSError:
+ pass
+ if not os.path.isfile(path):
+ # 404 - Not Found
+ request.setResponseCode(404)
+ return "Blob doesn't exists: %s" % blob_id
+ raw_flags = request.content.read()
+ flags = json.loads(raw_flags)
+ for flag in flags:
+ if flag not in ACCEPTED_FLAGS:
+ request.setResponseCode(406)
+ return "Unsupported flag: %s" % flag
+ with open(path + '.flags', 'w') as flags_file:
+ flags_file.write(raw_flags)
+
@defer.inlineCallbacks
def write_blob(self, user, blob_id, request, namespace=''):
path = self._get_path(user, blob_id, namespace)
@@ -98,6 +133,10 @@ class FilesystemBlobsBackend(object):
def delete_blob(self, user, blob_id, namespace=''):
blob_path = self._get_path(user, blob_id, namespace)
os.unlink(blob_path)
+ try:
+ os.unlink(blob_path + '.flags')
+ except:
+ pass
def get_blob_size(user, blob_id, namespace=''):
raise NotImplementedError
@@ -106,7 +145,8 @@ class FilesystemBlobsBackend(object):
blob_ids = []
base_path = self._get_path(user, custom_preffix=namespace)
for root, dirs, filenames in os.walk(base_path):
- blob_ids += [os.path.join(root, name) for name in filenames]
+ blob_ids += [os.path.join(root, name) for name in filenames
+ if not name.endswith('.flags')]
if order_by in ['date', '+date']:
blob_ids.sort(key=lambda x: os.path.getmtime(x))
elif order_by == '-date':
@@ -191,6 +231,8 @@ class BlobsResource(resource.Resource):
order = request.args.get('order_by', [None])[0]
return self._handler.list_blobs(user, request, namespace,
order_by=order)
+ if 'only_flags' in request.args:
+ return self._handler.get_flags(user, blob_id, request, namespace)
self._handler.add_tag_header(user, blob_id, request)
return self._handler.read_blob(user, blob_id, request, namespace)
@@ -208,6 +250,12 @@ class BlobsResource(resource.Resource):
d.addErrback(self._error, request)
return NOT_DONE_YET
+ def render_POST(self, request):
+ logger.info("http post: %s" % request.path)
+ user, blob_id, namespace = self._validate(request)
+ self._handler.set_flags(user, blob_id, request, namespace)
+ return ''
+
def _error(self, e, request):
logger.error('Error processing request: %s' % e.getErrorMessage())
request.setResponseCode(500)
diff --git a/testing/tests/blobs/test_fs_backend.py b/testing/tests/blobs/test_fs_backend.py
index a6d7272d..f742f702 100644
--- a/testing/tests/blobs/test_fs_backend.py
+++ b/testing/tests/blobs/test_fs_backend.py
@@ -153,14 +153,19 @@ class FilesystemBackendTestCase(unittest.TestCase):
def test_delete_blob(self, unlink_mock):
backend = _blobs.FilesystemBlobsBackend(self.tempdir)
backend.delete_blob('user', 'blob_id')
- unlink_mock.assert_called_once_with(backend._get_path('user',
- 'blob_id'))
+ unlink_mock.assert_any_call(backend._get_path('user',
+ 'blob_id'))
+ unlink_mock.assert_any_call(backend._get_path('user',
+ 'blob_id') + '.flags')
@pytest.mark.usefixtures("method_tmpdir")
@mock.patch('leap.soledad.server._blobs.os.unlink')
def test_delete_blob_custom_namespace(self, unlink_mock):
backend = _blobs.FilesystemBlobsBackend(self.tempdir)
backend.delete_blob('user', 'blob_id', namespace='trash')
- unlink_mock.assert_called_once_with(backend._get_path('user',
- 'blob_id',
- 'trash'))
+ unlink_mock.assert_any_call(backend._get_path('user',
+ 'blob_id',
+ 'trash'))
+ unlink_mock.assert_any_call(backend._get_path('user',
+ 'blob_id',
+ 'trash') + '.flags')
diff --git a/testing/tests/server/test_blobs_server.py b/testing/tests/server/test_blobs_server.py
index f4c9119b..0db64256 100644
--- a/testing/tests/server/test_blobs_server.py
+++ b/testing/tests/server/test_blobs_server.py
@@ -26,9 +26,11 @@ from twisted.internet import reactor
from twisted.internet import defer
from treq._utils import set_global_pool
+from leap.soledad.common.blobs import Flags
from leap.soledad.server import _blobs as server_blobs
from leap.soledad.client._db.blobs import BlobManager
from leap.soledad.client._db.blobs import BlobAlreadyExistsError
+from leap.soledad.client._db.blobs import InvalidFlagsError
class BlobServerTestCase(unittest.TestCase):
@@ -57,6 +59,50 @@ class BlobServerTestCase(unittest.TestCase):
@defer.inlineCallbacks
@pytest.mark.usefixtures("method_tmpdir")
+ def test_set_get_flags(self):
+ manager = BlobManager('', self.uri, self.secret,
+ self.secret, 'user')
+ fd = BytesIO("flag me")
+ yield manager._encrypt_and_upload('blob_id', fd)
+ yield manager.set_flags('blob_id', [Flags.PROCESSING])
+ flags = yield manager.get_flags('blob_id')
+ self.assertEquals([Flags.PROCESSING], flags)
+
+ @defer.inlineCallbacks
+ @pytest.mark.usefixtures("method_tmpdir")
+ def test_cant_set_invalid_flags(self):
+ manager = BlobManager('', self.uri, self.secret,
+ self.secret, 'user')
+ fd = BytesIO("flag me")
+ yield manager._encrypt_and_upload('blob_id', fd)
+ with pytest.raises(InvalidFlagsError):
+ yield manager.set_flags('blob_id', ['invalid'])
+ flags = yield manager.get_flags('blob_id')
+ self.assertEquals([], flags)
+
+ @defer.inlineCallbacks
+ @pytest.mark.usefixtures("method_tmpdir")
+ def test_get_empty_flags(self):
+ manager = BlobManager('', self.uri, self.secret,
+ self.secret, 'user')
+ fd = BytesIO("flag me")
+ yield manager._encrypt_and_upload('blob_id', fd)
+ flags = yield manager.get_flags('blob_id')
+ self.assertEquals([], flags)
+
+ @defer.inlineCallbacks
+ @pytest.mark.usefixtures("method_tmpdir")
+ def test_flags_ignored_by_listing(self):
+ manager = BlobManager('', self.uri, self.secret,
+ self.secret, 'user')
+ fd = BytesIO("flag me")
+ yield manager._encrypt_and_upload('blob_id', fd)
+ yield manager.set_flags('blob_id', [Flags.PROCESSING])
+ blobs_list = yield manager.remote_list()
+ self.assertEquals(['blob_id'], blobs_list)
+
+ @defer.inlineCallbacks
+ @pytest.mark.usefixtures("method_tmpdir")
def test_upload_changes_remote_list(self):
manager = BlobManager('', self.uri, self.secret,
self.secret, 'user')