From 5a3d6cd05e8f12aba2527b2d711cd0aa7a27f3e8 Mon Sep 17 00:00:00 2001 From: drebs Date: Tue, 19 Dec 2017 18:00:23 -0200 Subject: [feature] add ranges to blobs backend --- src/leap/soledad/server/_blobs/errors.py | 7 ++++ src/leap/soledad/server/_blobs/fs_backend.py | 53 ++++++++++++++++++++++++--- src/leap/soledad/server/_blobs/resource.py | 54 +++++++++++++++++++++++++--- src/leap/soledad/server/interfaces.py | 5 ++- 4 files changed, 109 insertions(+), 10 deletions(-) (limited to 'src/leap') diff --git a/src/leap/soledad/server/_blobs/errors.py b/src/leap/soledad/server/_blobs/errors.py index 8c1c532e..0cd2059f 100644 --- a/src/leap/soledad/server/_blobs/errors.py +++ b/src/leap/soledad/server/_blobs/errors.py @@ -42,3 +42,10 @@ class ImproperlyConfiguredException(Exception): """ Raised when there is a problem with the configuration of a backend. """ + + +class RangeNotSatisfiable(Exception): + """ + Raised when the Range: HTTP header was sent but the server doesn't know how + to satisfy it. + """ diff --git a/src/leap/soledad/server/_blobs/fs_backend.py b/src/leap/soledad/server/_blobs/fs_backend.py index 6274a1c5..e1c49910 100644 --- a/src/leap/soledad/server/_blobs/fs_backend.py +++ b/src/leap/soledad/server/_blobs/fs_backend.py @@ -27,7 +27,8 @@ from zope.interface import implementer from twisted.internet import defer from twisted.internet import utils -from twisted.web.client import FileBodyProducer +from twisted.web.static import NoRangeStaticProducer +from twisted.web.static import SingleRangeStaticProducer from leap.common.files import mkdir_p from leap.soledad.common.blobs import ACCEPTED_FLAGS @@ -44,6 +45,43 @@ from .util import VALID_STRINGS logger = getLogger(__name__) +class NoRangeProducer(NoRangeStaticProducer): + """ + A static file producer that fires a deferred when it's finished. + """ + + def start(self): + NoRangeStaticProducer.start(self) + if self.request is None: + return defer.succeed(None) + self.deferred = defer.Deferred() + return self.deferred + + def stopProducing(self): + NoRangeStaticProducer.stopProducing(self) + if hasattr(self, 'deferred'): + self.deferred.callback(None) + + +class SingleRangeProducer(SingleRangeStaticProducer): + """ + A static file producer of a single file range that fires a deferred when + it's finished. + """ + + def start(self): + SingleRangeStaticProducer.start(self) + if self.request is None: + return defer.succeed(None) + self.deferred = defer.Deferred() + return self.deferred + + def stopProducing(self): + SingleRangeStaticProducer.stopProducing(self) + if hasattr(self, 'deferred'): + self.deferred.callback(None) + + @implementer(interfaces.IBlobsBackend) class FilesystemBlobsBackend(object): @@ -63,13 +101,20 @@ class FilesystemBlobsBackend(object): open(path, 'a') @defer.inlineCallbacks - def read_blob(self, user, blob_id, consumer, namespace=''): + def read_blob(self, user, blob_id, consumer, namespace='', range=None): logger.info('reading blob: %s - %s@%s' % (user, blob_id, namespace)) path = self._get_path(user, blob_id, namespace) logger.debug('blob path: %s' % path) with open(path) as fd: - producer = FileBodyProducer(fd) - yield producer.startProducing(consumer) + if range is None: + producer = NoRangeProducer(consumer, fd) + else: + start, end = range + offset = start + size = end - start + args = (consumer, fd, offset, size) + producer = SingleRangeProducer(*args) + yield producer.start() def get_flags(self, user, blob_id, namespace=''): try: diff --git a/src/leap/soledad/server/_blobs/resource.py b/src/leap/soledad/server/_blobs/resource.py index a6c209f2..dd9af861 100644 --- a/src/leap/soledad/server/_blobs/resource.py +++ b/src/leap/soledad/server/_blobs/resource.py @@ -19,6 +19,8 @@ A Twisted Web resource for blobs. """ import json +from twisted.python.compat import intToBytes +from twisted.python.compat import networkString from twisted.web import resource from twisted.web.client import FileBodyProducer from twisted.web.server import NOT_DONE_YET @@ -31,6 +33,7 @@ from .errors import BlobNotFound from .errors import BlobExists from .errors import ImproperlyConfiguredException from .errors import QuotaExceeded +from .errors import RangeNotSatisfiable from .util import VALID_STRINGS from leap.soledad.common.log import getLogger @@ -44,7 +47,7 @@ def _catchBlobNotFound(failure, request, user, blob_id): logger.error("Error 404: Blob %s does not exist for user %s" % (blob_id, user)) request.setResponseCode(404) - request.write("Blob doesn't exists: %s" % blob_id) + request.write("Blob doesn't exist: %s" % blob_id) request.finish() @@ -128,7 +131,7 @@ class BlobsResource(resource.Resource): d.addErrback(_catchAllErrors, request) return NOT_DONE_YET - def _get_blob(self, request, user, blob_id, namespace): + def _get_blob(self, request, user, blob_id, namespace, range): def _set_tag_header(tag): request.responseHeaders.setRawHeaders('Tag', [tag]) @@ -136,18 +139,32 @@ class BlobsResource(resource.Resource): def _read_blob(_): handler = self._handler consumer = request - d = handler.read_blob(user, blob_id, consumer, namespace=namespace) + d = handler.read_blob( + user, blob_id, consumer, namespace=namespace, range=range) return d d = self._handler.get_tag(user, blob_id, namespace) d.addCallback(_set_tag_header) d.addCallback(_read_blob) - d.addCallback(lambda _: request.finish()) d.addErrback(_catchBlobNotFound, request, user, blob_id) d.addErrback(_catchAllErrors, request, finishRequest=True) return NOT_DONE_YET + def _parseRange(self, range): + if not range: + return None + try: + kind, value = range.split(b'=', 1) + if kind.strip() != b'bytes': + raise Exception('Unknown unit: %s' % kind) + start, end = value.split('-') + start = int(start) if start else None + end = int(end) if end else None + return start, end + except Exception as e: + raise RangeNotSatisfiable(e) + def render_GET(self, request): logger.info("http get: %s" % request.path) user, blob_id, namespace = self._validate(request) @@ -162,7 +179,34 @@ class BlobsResource(resource.Resource): if only_flags: return self._only_flags(request, user, blob_id, namespace) - return self._get_blob(request, user, blob_id, namespace) + def _handleRangeHeader(size): + try: + range = self._parseRange(request.getHeader('Range')) + except RangeNotSatisfiable: + content_range = 'bytes */%d' % size + content_range = networkString(content_range) + request.setResponseCode(416) + request.setHeader(b'content-range', content_range) + request.finish() + return + + if not range: + start = end = None + request.setResponseCode(200) + request.setHeader(b'content-length', intToBytes(size)) + else: + start, end = range + content_range = 'bytes %d-%d/%d' % (start, end, size) + content_range = networkString(content_range) + length = intToBytes(end - start) + request.setResponseCode(206) + request.setHeader(b'content-range', content_range) + request.setHeader(b'content-length', length) + return self._get_blob(request, user, blob_id, namespace, range) + + d = self._handler.get_blob_size(user, blob_id, namespace=namespace) + d.addCallback(_handleRangeHeader) + return NOT_DONE_YET def render_DELETE(self, request): logger.info("http put: %s" % request.path) diff --git a/src/leap/soledad/server/interfaces.py b/src/leap/soledad/server/interfaces.py index c2e79854..089111e3 100644 --- a/src/leap/soledad/server/interfaces.py +++ b/src/leap/soledad/server/interfaces.py @@ -25,7 +25,7 @@ class IBlobsBackend(Interface): An interface for a backend that can store blobs. """ - def read_blob(user, blob_id, consumer, namespace=''): + def read_blob(user, blob_id, consumer, namespace='', range=None): """ Read a blob from the backend storage. @@ -37,6 +37,9 @@ class IBlobsBackend(Interface): :type consumer: twisted.internet.interfaces.IConsumer provider :param namespace: An optional namespace for the blob. :type namespace: str + :param range: An optional tuple indicating start and end position of + the blob to be produced. + :type range: (int, int) :return: A deferred that fires when the blob has been written to the consumer. -- cgit v1.2.3