diff options
-rw-r--r-- | src/leap/soledad/server/_blobs.py | 121 | ||||
-rw-r--r-- | src/leap/soledad/server/_streaming_resource.py | 18 |
2 files changed, 75 insertions, 64 deletions
diff --git a/src/leap/soledad/server/_blobs.py b/src/leap/soledad/server/_blobs.py index 4a963e10..cb726788 100644 --- a/src/leap/soledad/server/_blobs.py +++ b/src/leap/soledad/server/_blobs.py @@ -103,7 +103,7 @@ class FilesystemBlobsBackend(object): def get_flags(self, user, blob_id, namespace=''): path = self._get_path(user, blob_id, namespace) if not os.path.isfile(path): - return defer.fail(BlobNotFound) + return defer.fail(BlobNotFound()) if not os.path.isfile(path + '.flags'): return defer.succeed([]) with open(path + '.flags', 'r') as flags_file: @@ -113,13 +113,14 @@ class FilesystemBlobsBackend(object): def set_flags(self, user, blob_id, flags, namespace=''): path = self._get_path(user, blob_id, namespace) if not os.path.isfile(path): - raise BlobNotFound + return defer.fail(BlobNotFound()) for flag in flags: if flag not in ACCEPTED_FLAGS: - raise InvalidFlag(flag) + return defer.fail(InvalidFlag(flag)) with open(path + '.flags', 'w') as flags_file: raw_flags = json.dumps(flags) flags_file.write(raw_flags) + return defer.succeed(None) @defer.inlineCallbacks def write_blob(self, user, blob_id, fd, namespace=''): @@ -143,7 +144,7 @@ class FilesystemBlobsBackend(object): def delete_blob(self, user, blob_id, namespace=''): blob_path = self._get_path(user, blob_id, namespace) if not os.path.isfile(blob_path): - raise BlobNotFound + return defer.fail(BlobNotFound()) self.__touch(blob_path + '.deleted') os.unlink(blob_path) try: @@ -182,7 +183,8 @@ class FilesystemBlobsBackend(object): elif order_by == '-date': blob_ids.sort(key=lambda x: os.path.getmtime(x), reverse=True) elif order_by: - raise Exception("Unsupported order_by parameter: %s" % order_by) + exc = Exception("Unsupported order_by parameter: %s" % order_by) + return defer.fail(exc) if filter_flag: blob_ids = list(self._filter_flag(blob_ids, filter_flag)) blob_ids = [os.path.basename(path).replace('.deleted', '') @@ -205,7 +207,7 @@ class FilesystemBlobsBackend(object): def get_tag(self, user, blob_id, namespace=''): blob_path = self._get_path(user, blob_id, namespace) if not os.path.isfile(blob_path): - raise BlobNotFound + return defer.fail(BlobNotFound()) with open(blob_path) as doc_file: doc_file.seek(-16, 2) tag = base64.urlsafe_b64encode(doc_file.read()) @@ -276,6 +278,48 @@ class BlobFile(resource.Resource): return NOT_DONE_YET +def _catchBlobNotFound(failure, request, user, blob_id): + failure.trap(BlobNotFound) + logger.error("Error 404: Blob %s does not exist for user %s" + % (blob_id, user)) + request.setResponseCode(404) + request.write("Blob doesn't exists: %s" % blob_id) + request.finish() + + +def _catchBlobExists(failure, request, user, blob_id): + failure.trap(BlobExists) + logger.error("Error 409: Blob %s already exists for user %s" + % (blob_id, user)) + request.setResponseCode(409) + request.write("Blob already exists: %s" % blob_id) + request.finish() + + +def _catchQuotaExceeded(failure, request, user): + failure.trap(QuotaExceeded) + logger.error("Error 507: Quota exceeded for user: %s" % user) + request.setResponseCode(507) + request.write('Quota Exceeded!') + request.finish() + + +def _catchInvalidFlag(failure, request, user, blob_id): + failure.trap(InvalidFlag) + flag = failure.value.message + logger.error("Error 406: Attempted to set invalid flag %s for blob %s " + "for user %s" % (flag, blob_id, user)) + request.setResponseCode(406) + request.write("Invalid flag: %s" % str(flag)) + request.finish() + + +def _catchAllErrors(self, e, request): + logger.error('Error processing request: %s' % e.getErrorMessage()) + request.setResponseCode(500) + request.finish() + + class BlobsResource(resource.Resource): isLeaf = True @@ -316,19 +360,14 @@ class BlobsResource(resource.Resource): d.addCallback(lambda _: request.finish()) return NOT_DONE_YET - def catchBlobNotFound(failure): - failure.trap(BlobNotFound) - request.setResponseCode(404) - request.write("Blob doesn't exists: %s" % blob_id) - request.finish() - only_flags = request.args.get('only_flags', [False])[0] if only_flags: d = self._handler.get_flags(user, blob_id, namespace) - d.addErrback(catchBlobNotFound) d.addCallback(lambda flags: json.dumps(flags)) d.addCallback(lambda flags: request.write(flags)) d.addCallback(lambda _: request.finish()) + d.addErrback(_catchBlobNotFound, request, user, blob_id) + d.addErrback(_catchAllErrors, request) return NOT_DONE_YET d = self._handler.get_tag(user, blob_id, namespace) @@ -339,47 +378,29 @@ class BlobsResource(resource.Resource): namespace=namespace)) d.addCallback(lambda fd: BlobFile(fd)) d.addCallback(lambda res: res.render_GET(request)) + d.addErrback(_catchBlobNotFound, request, user, blob_id) + d.addErrback(_catchAllErrors, request, finishRequest=True) return NOT_DONE_YET def render_DELETE(self, request): logger.info("http put: %s" % request.path) user, blob_id, namespace = self._validate(request) - - def catchBlobNotFound(failure): - failure.trap(BlobNotFound) - request.setResponseCode(404) - request.write("Blob doesn't exists: %s" % blob_id) - request.finish() - d = self._handler.delete_blob(user, blob_id, namespace=namespace) d.addCallback(lambda _: request.finish()) - d.addErrback(catchBlobNotFound) + d.addErrback(_catchBlobNotFound, request, user, blob_id) + d.addErrback(_catchAllErrors, request) return NOT_DONE_YET def render_PUT(self, request): logger.info("http put: %s" % request.path) user, blob_id, namespace = self._validate(request) - - def catchBlobExists(failure): - failure.trap(BlobExists) - request.setResponseCode(409) - request.write("Blob already exists: %s" % blob_id) - request.finish() - - def catchQuotaExceeded(failure): - failure.trap(QuotaExceeded) - logger.error("Error 507: Quota exceeded for user: %s" % user) - request.setResponseCode(507) - request.write('Quota Exceeded!') - request.finish() - fd = request.content d = self._handler.write_blob(user, blob_id, fd, namespace=namespace) d.addCallback(lambda _: request.finish()) - d.addErrback(catchBlobExists) - d.addErrback(catchQuotaExceeded) - d.addErrback(self._error, request) + d.addErrback(_catchBlobExists, request, user, blob_id) + d.addErrback(_catchQuotaExceeded, request, user) + d.addErrback(_catchAllErrors, request) return NOT_DONE_YET def render_POST(self, request): @@ -387,32 +408,14 @@ class BlobsResource(resource.Resource): user, blob_id, namespace = self._validate(request) raw_flags = request.content.read() flags = json.loads(raw_flags) - - def catchBlobNotFound(failure): - failure.trap(BlobNotFound) - request.setResponseCode(404) - request.write("Blob doesn't exists: %s" % blob_id) - request.finish() - - def catchInvalidFlag(failure): - e = failure.trap(InvalidFlag) - request.setResponseCode(406) - flag = e.message - request.write("Invalid flag: %s" % str(flag)) - request.finish() - d = self._handler.set_flags(user, blob_id, flags, namespace=namespace) d.addCallback(lambda _: request.write('')) d.addCallback(lambda _: request.finish()) - d.addErrback(catchBlobNotFound) - d.addErrback(catchInvalidFlag) + d.addErrback(_catchBlobNotFound, request, user, blob_id) + d.addErrback(_catchInvalidFlag, request, user, blob_id) + d.addErrback(_catchAllErrors, request) return NOT_DONE_YET - def _error(self, e, request): - logger.error('Error processing request: %s' % e.getErrorMessage()) - request.setResponseCode(500) - request.finish() - def _validate(self, request): for arg in request.postpath: if arg and not VALID_STRINGS.match(arg): diff --git a/src/leap/soledad/server/_streaming_resource.py b/src/leap/soledad/server/_streaming_resource.py index 9b672107..59bb383e 100644 --- a/src/leap/soledad/server/_streaming_resource.py +++ b/src/leap/soledad/server/_streaming_resource.py @@ -24,7 +24,7 @@ import base64 from zope.interface import implementer from twisted.internet.interfaces import IPushProducer -from twisted.internet import task +from twisted.internet import task, defer from twisted.web.server import NOT_DONE_YET from twisted.web.resource import Resource @@ -61,12 +61,20 @@ class StreamingResource(Resource): db = self._handler raw_content = request.content.read() blob_ids = json.loads(raw_content) - paths = [] + deferreds = [] for blob_id in blob_ids: + + def _get_blob_info(blob_id, path): + d = db.get_blob_size(user, blob_id, namespace) + d.addCallback(lambda size: (blob_id, path, size)) + return d + path = db._get_path(user, blob_id, namespace) - size = db.get_blob_size(user, blob_id, namespace) - paths.append((blob_id, path, size)) - DownstreamProducer(request, paths).start() + d = _get_blob_info(blob_id, path) + deferreds.append(d) + d = defer.gatherResults(deferreds) + d.addCallback( + lambda paths: DownstreamProducer(request, paths).start()) return NOT_DONE_YET |