summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/leap/soledad/server/_blobs.py121
-rw-r--r--src/leap/soledad/server/_streaming_resource.py18
2 files changed, 75 insertions, 64 deletions
diff --git a/src/leap/soledad/server/_blobs.py b/src/leap/soledad/server/_blobs.py
index 4a963e10..cb726788 100644
--- a/src/leap/soledad/server/_blobs.py
+++ b/src/leap/soledad/server/_blobs.py
@@ -103,7 +103,7 @@ class FilesystemBlobsBackend(object):
def get_flags(self, user, blob_id, namespace=''):
path = self._get_path(user, blob_id, namespace)
if not os.path.isfile(path):
- return defer.fail(BlobNotFound)
+ return defer.fail(BlobNotFound())
if not os.path.isfile(path + '.flags'):
return defer.succeed([])
with open(path + '.flags', 'r') as flags_file:
@@ -113,13 +113,14 @@ class FilesystemBlobsBackend(object):
def set_flags(self, user, blob_id, flags, namespace=''):
path = self._get_path(user, blob_id, namespace)
if not os.path.isfile(path):
- raise BlobNotFound
+ return defer.fail(BlobNotFound())
for flag in flags:
if flag not in ACCEPTED_FLAGS:
- raise InvalidFlag(flag)
+ return defer.fail(InvalidFlag(flag))
with open(path + '.flags', 'w') as flags_file:
raw_flags = json.dumps(flags)
flags_file.write(raw_flags)
+ return defer.succeed(None)
@defer.inlineCallbacks
def write_blob(self, user, blob_id, fd, namespace=''):
@@ -143,7 +144,7 @@ class FilesystemBlobsBackend(object):
def delete_blob(self, user, blob_id, namespace=''):
blob_path = self._get_path(user, blob_id, namespace)
if not os.path.isfile(blob_path):
- raise BlobNotFound
+ return defer.fail(BlobNotFound())
self.__touch(blob_path + '.deleted')
os.unlink(blob_path)
try:
@@ -182,7 +183,8 @@ class FilesystemBlobsBackend(object):
elif order_by == '-date':
blob_ids.sort(key=lambda x: os.path.getmtime(x), reverse=True)
elif order_by:
- raise Exception("Unsupported order_by parameter: %s" % order_by)
+ exc = Exception("Unsupported order_by parameter: %s" % order_by)
+ return defer.fail(exc)
if filter_flag:
blob_ids = list(self._filter_flag(blob_ids, filter_flag))
blob_ids = [os.path.basename(path).replace('.deleted', '')
@@ -205,7 +207,7 @@ class FilesystemBlobsBackend(object):
def get_tag(self, user, blob_id, namespace=''):
blob_path = self._get_path(user, blob_id, namespace)
if not os.path.isfile(blob_path):
- raise BlobNotFound
+ return defer.fail(BlobNotFound())
with open(blob_path) as doc_file:
doc_file.seek(-16, 2)
tag = base64.urlsafe_b64encode(doc_file.read())
@@ -276,6 +278,48 @@ class BlobFile(resource.Resource):
return NOT_DONE_YET
+def _catchBlobNotFound(failure, request, user, blob_id):
+ failure.trap(BlobNotFound)
+ logger.error("Error 404: Blob %s does not exist for user %s"
+ % (blob_id, user))
+ request.setResponseCode(404)
+ request.write("Blob doesn't exists: %s" % blob_id)
+ request.finish()
+
+
+def _catchBlobExists(failure, request, user, blob_id):
+ failure.trap(BlobExists)
+ logger.error("Error 409: Blob %s already exists for user %s"
+ % (blob_id, user))
+ request.setResponseCode(409)
+ request.write("Blob already exists: %s" % blob_id)
+ request.finish()
+
+
+def _catchQuotaExceeded(failure, request, user):
+ failure.trap(QuotaExceeded)
+ logger.error("Error 507: Quota exceeded for user: %s" % user)
+ request.setResponseCode(507)
+ request.write('Quota Exceeded!')
+ request.finish()
+
+
+def _catchInvalidFlag(failure, request, user, blob_id):
+ failure.trap(InvalidFlag)
+ flag = failure.value.message
+ logger.error("Error 406: Attempted to set invalid flag %s for blob %s "
+ "for user %s" % (flag, blob_id, user))
+ request.setResponseCode(406)
+ request.write("Invalid flag: %s" % str(flag))
+ request.finish()
+
+
+def _catchAllErrors(self, e, request):
+ logger.error('Error processing request: %s' % e.getErrorMessage())
+ request.setResponseCode(500)
+ request.finish()
+
+
class BlobsResource(resource.Resource):
isLeaf = True
@@ -316,19 +360,14 @@ class BlobsResource(resource.Resource):
d.addCallback(lambda _: request.finish())
return NOT_DONE_YET
- def catchBlobNotFound(failure):
- failure.trap(BlobNotFound)
- request.setResponseCode(404)
- request.write("Blob doesn't exists: %s" % blob_id)
- request.finish()
-
only_flags = request.args.get('only_flags', [False])[0]
if only_flags:
d = self._handler.get_flags(user, blob_id, namespace)
- d.addErrback(catchBlobNotFound)
d.addCallback(lambda flags: json.dumps(flags))
d.addCallback(lambda flags: request.write(flags))
d.addCallback(lambda _: request.finish())
+ d.addErrback(_catchBlobNotFound, request, user, blob_id)
+ d.addErrback(_catchAllErrors, request)
return NOT_DONE_YET
d = self._handler.get_tag(user, blob_id, namespace)
@@ -339,47 +378,29 @@ class BlobsResource(resource.Resource):
namespace=namespace))
d.addCallback(lambda fd: BlobFile(fd))
d.addCallback(lambda res: res.render_GET(request))
+ d.addErrback(_catchBlobNotFound, request, user, blob_id)
+ d.addErrback(_catchAllErrors, request, finishRequest=True)
return NOT_DONE_YET
def render_DELETE(self, request):
logger.info("http put: %s" % request.path)
user, blob_id, namespace = self._validate(request)
-
- def catchBlobNotFound(failure):
- failure.trap(BlobNotFound)
- request.setResponseCode(404)
- request.write("Blob doesn't exists: %s" % blob_id)
- request.finish()
-
d = self._handler.delete_blob(user, blob_id, namespace=namespace)
d.addCallback(lambda _: request.finish())
- d.addErrback(catchBlobNotFound)
+ d.addErrback(_catchBlobNotFound, request, user, blob_id)
+ d.addErrback(_catchAllErrors, request)
return NOT_DONE_YET
def render_PUT(self, request):
logger.info("http put: %s" % request.path)
user, blob_id, namespace = self._validate(request)
-
- def catchBlobExists(failure):
- failure.trap(BlobExists)
- request.setResponseCode(409)
- request.write("Blob already exists: %s" % blob_id)
- request.finish()
-
- def catchQuotaExceeded(failure):
- failure.trap(QuotaExceeded)
- logger.error("Error 507: Quota exceeded for user: %s" % user)
- request.setResponseCode(507)
- request.write('Quota Exceeded!')
- request.finish()
-
fd = request.content
d = self._handler.write_blob(user, blob_id, fd, namespace=namespace)
d.addCallback(lambda _: request.finish())
- d.addErrback(catchBlobExists)
- d.addErrback(catchQuotaExceeded)
- d.addErrback(self._error, request)
+ d.addErrback(_catchBlobExists, request, user, blob_id)
+ d.addErrback(_catchQuotaExceeded, request, user)
+ d.addErrback(_catchAllErrors, request)
return NOT_DONE_YET
def render_POST(self, request):
@@ -387,32 +408,14 @@ class BlobsResource(resource.Resource):
user, blob_id, namespace = self._validate(request)
raw_flags = request.content.read()
flags = json.loads(raw_flags)
-
- def catchBlobNotFound(failure):
- failure.trap(BlobNotFound)
- request.setResponseCode(404)
- request.write("Blob doesn't exists: %s" % blob_id)
- request.finish()
-
- def catchInvalidFlag(failure):
- e = failure.trap(InvalidFlag)
- request.setResponseCode(406)
- flag = e.message
- request.write("Invalid flag: %s" % str(flag))
- request.finish()
-
d = self._handler.set_flags(user, blob_id, flags, namespace=namespace)
d.addCallback(lambda _: request.write(''))
d.addCallback(lambda _: request.finish())
- d.addErrback(catchBlobNotFound)
- d.addErrback(catchInvalidFlag)
+ d.addErrback(_catchBlobNotFound, request, user, blob_id)
+ d.addErrback(_catchInvalidFlag, request, user, blob_id)
+ d.addErrback(_catchAllErrors, request)
return NOT_DONE_YET
- def _error(self, e, request):
- logger.error('Error processing request: %s' % e.getErrorMessage())
- request.setResponseCode(500)
- request.finish()
-
def _validate(self, request):
for arg in request.postpath:
if arg and not VALID_STRINGS.match(arg):
diff --git a/src/leap/soledad/server/_streaming_resource.py b/src/leap/soledad/server/_streaming_resource.py
index 9b672107..59bb383e 100644
--- a/src/leap/soledad/server/_streaming_resource.py
+++ b/src/leap/soledad/server/_streaming_resource.py
@@ -24,7 +24,7 @@ import base64
from zope.interface import implementer
from twisted.internet.interfaces import IPushProducer
-from twisted.internet import task
+from twisted.internet import task, defer
from twisted.web.server import NOT_DONE_YET
from twisted.web.resource import Resource
@@ -61,12 +61,20 @@ class StreamingResource(Resource):
db = self._handler
raw_content = request.content.read()
blob_ids = json.loads(raw_content)
- paths = []
+ deferreds = []
for blob_id in blob_ids:
+
+ def _get_blob_info(blob_id, path):
+ d = db.get_blob_size(user, blob_id, namespace)
+ d.addCallback(lambda size: (blob_id, path, size))
+ return d
+
path = db._get_path(user, blob_id, namespace)
- size = db.get_blob_size(user, blob_id, namespace)
- paths.append((blob_id, path, size))
- DownstreamProducer(request, paths).start()
+ d = _get_blob_info(blob_id, path)
+ deferreds.append(d)
+ d = defer.gatherResults(deferreds)
+ d.addCallback(
+ lambda paths: DownstreamProducer(request, paths).start())
return NOT_DONE_YET