From 383a19aaf87bd8e2665112c1350627140958eedf Mon Sep 17 00:00:00 2001 From: drebs Date: Tue, 12 Dec 2017 20:11:13 -0200 Subject: [refactor] use producer/consumer on write/read_blob respectivelly --- src/leap/soledad/server/_blobs.py | 135 +++++++++++++++++----------------- src/leap/soledad/server/_incoming.py | 5 +- src/leap/soledad/server/interfaces.py | 15 ++-- 3 files changed, 78 insertions(+), 77 deletions(-) (limited to 'src/leap/soledad/server') diff --git a/src/leap/soledad/server/_blobs.py b/src/leap/soledad/server/_blobs.py index 554fe5ac..70b55264 100644 --- a/src/leap/soledad/server/_blobs.py +++ b/src/leap/soledad/server/_blobs.py @@ -29,9 +29,6 @@ import base64 import json import re -from twisted.python.compat import intToBytes -from twisted.web import http -from twisted.web import static from twisted.web import resource from twisted.web.client import FileBodyProducer from twisted.web.server import NOT_DONE_YET @@ -93,15 +90,14 @@ class FilesystemBlobsBackend(object): def __touch(self, path): open(path, 'a') - def read_blob(self, user, blob_id, namespace=''): + @defer.inlineCallbacks + def read_blob(self, user, blob_id, consumer, namespace=''): logger.info('reading blob: %s - %s@%s' % (user, blob_id, namespace)) - try: - path = self._get_path(user, blob_id, namespace) - except Exception as e: - return defer.fail(e) + path = self._get_path(user, blob_id, namespace) logger.debug('blob path: %s' % path) - fd = open(path) - return defer.succeed(fd) + with open(path) as fd: + producer = FileBodyProducer(fd) + yield producer.startProducing(consumer) def get_flags(self, user, blob_id, namespace=''): try: @@ -132,7 +128,7 @@ class FilesystemBlobsBackend(object): return defer.succeed(None) @defer.inlineCallbacks - def write_blob(self, user, blob_id, fd, namespace=''): + def write_blob(self, user, blob_id, producer, namespace=''): yield self.semaphore.acquire() path = self._get_path(user, blob_id, namespace) try: @@ -145,9 +141,8 @@ class FilesystemBlobsBackend(object): if used > self.quota: raise QuotaExceeded logger.info('writing blob: %s - %s' % (user, blob_id)) - fbp = FileBodyProducer(fd) with open(path, 'wb') as blobfile: - yield fbp.startProducing(blobfile) + yield producer.startProducing(blobfile) yield self.semaphore.release() def delete_blob(self, user, blob_id, namespace=''): @@ -292,23 +287,6 @@ class ImproperlyConfiguredException(Exception): pass -class BlobFile(resource.Resource): - - def __init__(self, fd): - self.fd = fd - self.fd.seek(0, 2) - self.size = self.fd.tell() - self.fd.seek(0) - - def render_GET(self, request): - request.setHeader(b'content-length', intToBytes(self.size)) - request.setHeader(b'content-type', 'application/octet-stream') - request.setResponseCode(http.OK) - producer = static.NoRangeStaticProducer(request, self.fd) - producer.start() - return NOT_DONE_YET - - def _catchBlobNotFound(failure, request, user, blob_id): failure.trap(BlobNotFound) logger.error("Error 404: Blob %s does not exist for user %s" @@ -370,50 +348,70 @@ class BlobsResource(resource.Resource): # TODO double check credentials, we can have then # under request. - def render_GET(self, request): - logger.info("http get: %s" % request.path) - user, blob_id, namespace = self._validate(request) - if not blob_id and request.args.get('only_count', [False])[0]: - d = self._handler.count(user, namespace) - d.addCallback(lambda count: json.dumps({"count": count})) - d.addCallback(lambda count: request.write(count)) - d.addCallback(lambda _: request.finish()) - return NOT_DONE_YET - elif not blob_id: - order = request.args.get('order_by', [None])[0] - filter_flag = request.args.get('filter_flag', [False])[0] - deleted = request.args.get('deleted', [False])[0] - d = self._handler.list_blobs(user, namespace, - order_by=order, deleted=deleted, - filter_flag=filter_flag) - d.addCallback(lambda blobs: json.dumps(blobs)) - d.addCallback(lambda blobs: request.write(blobs)) - d.addCallback(lambda _: request.finish()) - return NOT_DONE_YET + def _only_count(self, request, user, namespace): + d = self._handler.count(user, namespace) + d.addCallback(lambda count: json.dumps({"count": count})) + d.addCallback(lambda count: request.write(count)) + d.addCallback(lambda _: request.finish()) + return NOT_DONE_YET - only_flags = request.args.get('only_flags', [False])[0] - if only_flags: - d = self._handler.get_flags(user, blob_id, namespace) - d.addCallback(lambda flags: json.dumps(flags)) - d.addCallback(lambda flags: request.write(flags)) - d.addCallback(lambda _: request.finish()) - d.addErrback(_catchBlobNotFound, request, user, blob_id) - d.addErrback(_catchAllErrors, request) - return NOT_DONE_YET + def _list(self, request, user, namespace): + order = request.args.get('order_by', [None])[0] + filter_flag = request.args.get('filter_flag', [False])[0] + deleted = request.args.get('deleted', [False])[0] + d = self._handler.list_blobs(user, namespace, + order_by=order, deleted=deleted, + filter_flag=filter_flag) + d.addCallback(lambda blobs: json.dumps(blobs)) + d.addCallback(lambda blobs: request.write(blobs)) + d.addCallback(lambda _: request.finish()) + return NOT_DONE_YET + + def _only_flags(self, request, user, blob_id, namespace): + d = self._handler.get_flags(user, blob_id, namespace) + d.addCallback(lambda flags: json.dumps(flags)) + d.addCallback(lambda flags: request.write(flags)) + d.addCallback(lambda _: request.finish()) + d.addErrback(_catchBlobNotFound, request, user, blob_id) + d.addErrback(_catchAllErrors, request) + return NOT_DONE_YET + + def _get_blob(self, request, user, blob_id, namespace): + + def _set_tag_header(tag): + request.responseHeaders.setRawHeaders('Tag', [tag]) + + def _read_blob(_): + handler = self._handler + consumer = request + d = handler.read_blob(user, blob_id, consumer, namespace=namespace) + return d d = self._handler.get_tag(user, blob_id, namespace) - d.addCallback( - lambda tag: request.responseHeaders.setRawHeaders( - 'Tag', [tag])) - d.addCallback(lambda _: self._handler.read_blob(user, blob_id, - namespace=namespace)) - d.addCallback(lambda fd: BlobFile(fd)) - d.addCallback(lambda res: res.render_GET(request)) + d.addCallback(_set_tag_header) + d.addCallback(_read_blob) + d.addCallback(lambda _: request.finish()) d.addErrback(_catchBlobNotFound, request, user, blob_id) d.addErrback(_catchAllErrors, request, finishRequest=True) return NOT_DONE_YET + def render_GET(self, request): + logger.info("http get: %s" % request.path) + user, blob_id, namespace = self._validate(request) + only_flags = request.args.get('only_flags', [False])[0] + + if not blob_id and request.args.get('only_count', [False])[0]: + return self._only_count(request, user, namespace) + + if not blob_id: + return self._list(request, user, namespace) + + if only_flags: + return self._only_flags(request, user, blob_id, namespace) + + return self._get_blob(request, user, blob_id, namespace) + def render_DELETE(self, request): logger.info("http put: %s" % request.path) user, blob_id, namespace = self._validate(request) @@ -426,8 +424,9 @@ class BlobsResource(resource.Resource): def render_PUT(self, request): logger.info("http put: %s" % request.path) user, blob_id, namespace = self._validate(request) - fd = request.content - d = self._handler.write_blob(user, blob_id, fd, namespace=namespace) + producer = FileBodyProducer(request.content) + handler = self._handler + d = handler.write_blob(user, blob_id, producer, namespace=namespace) d.addCallback(lambda _: request.finish()) d.addErrback(_catchBlobExists, request, user, blob_id) d.addErrback(_catchQuotaExceeded, request, user) diff --git a/src/leap/soledad/server/_incoming.py b/src/leap/soledad/server/_incoming.py index 362039af..071a94f6 100644 --- a/src/leap/soledad/server/_incoming.py +++ b/src/leap/soledad/server/_incoming.py @@ -22,6 +22,7 @@ import base64 from io import BytesIO from twisted.web.server import NOT_DONE_YET from twisted.web.resource import Resource +from twisted.web.client import FileBodyProducer from leap.soledad.common.blobs import Flags from leap.soledad.common.blobs import preamble @@ -100,8 +101,8 @@ class IncomingResource(Resource): request.write('Quota Exceeded!') request.finish() - fd = request.content - d = db.write_blob(user, blob_id, fd, namespace='MX') + producer = FileBodyProducer(request.content) + d = db.write_blob(user, blob_id, producer, namespace='MX') flags = [Flags.PENDING] d.addCallback(lambda _: db.set_flags(user, blob_id, flags, namespace='MX')) diff --git a/src/leap/soledad/server/interfaces.py b/src/leap/soledad/server/interfaces.py index d7a4aa70..624c8ff6 100644 --- a/src/leap/soledad/server/interfaces.py +++ b/src/leap/soledad/server/interfaces.py @@ -25,7 +25,7 @@ class IBlobsBackend(Interface): An interface for a backend that can store blobs. """ - def read_blob(user, blob_id, namespace=''): + def read_blob(user, blob_id, consumer, namespace=''): """ Read a blob from the backend storage. @@ -33,15 +33,17 @@ class IBlobsBackend(Interface): :type user: str :param blob_id: The id of the blob. :type blob_id: str + :param consumer: The object to write data to. + :type consumer: twisted.internet.interfaces.IConsumer provider :param namespace: An optional namespace for the blob. :type namespace: str - :return: A deferred that fires with a file-like object that gives - access to the contents of the blob. + :return: A deferred that fires when the blob has been written to the + consumer. :rtype: twisted.internet.defer.Deferred """ - def write_blob(user, blob_id, fd, namespace=''): + def write_blob(user, blob_id, producer, namespace=''): """ Write a blob to the backend storage. @@ -49,9 +51,8 @@ class IBlobsBackend(Interface): :type user: str :param blob_id: The id of the blob. :type blob_id: str - :param fd: A file-like object into which the contents of the blob - should be written. - :type fd: file-like + :param producer: The object to read data from. + :type producer: twisted.internet.interfaces.IProducer provider :param namespace: An optional namespace for the blob. :type namespace: str -- cgit v1.2.3