summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
author drebs <drebs@leap.se> 2017-12-19 18:00:23 -0200
committer drebs <drebs@leap.se> 2017-12-26 09:04:50 -0200
commit 5a3d6cd05e8f12aba2527b2d711cd0aa7a27f3e8 (patch)
tree b2f0b7b55feda0f3cd7ddd1e97a0975f2c1af794
parent 0cf535b26c8989a60df646f64dfcb14b59de1a86 (diff)
[feature] add ranges to blobs backend
-rw-r--r-- src/leap/soledad/server/_blobs/errors.py 7
-rw-r--r-- src/leap/soledad/server/_blobs/fs_backend.py 53
-rw-r--r-- src/leap/soledad/server/_blobs/resource.py 54
-rw-r--r-- src/leap/soledad/server/interfaces.py 5
-rw-r--r-- tests/blobs/test_fs_backend.py 25
-rw-r--r-- tests/server/test_blobs_server.py 52
-rw-r--r-- tests/server/test_incoming_server.py 6
7 files changed, 182 insertions, 20 deletions
diff --git a/src/leap/soledad/server/_blobs/errors.py b/src/leap/soledad/server/_blobs/errors.py
index 8c1c532e..0cd2059f 100644
--- a/src/leap/soledad/server/_blobs/errors.py
+++ b/src/leap/soledad/server/_blobs/errors.py
@@ -42,3 +42,10 @@ class ImproperlyConfiguredException(Exception):
"""
Raised when there is a problem with the configuration of a backend.
"""
+
+
+class RangeNotSatisfiable(Exception):
+ """
+ Raised when a Range: HTTP header was sent but the server could not parse
+ or satisfy it. The blobs resource answers such requests with a 416
+ response.
+ """
diff --git a/src/leap/soledad/server/_blobs/fs_backend.py b/src/leap/soledad/server/_blobs/fs_backend.py
index 6274a1c5..e1c49910 100644
--- a/src/leap/soledad/server/_blobs/fs_backend.py
+++ b/src/leap/soledad/server/_blobs/fs_backend.py
@@ -27,7 +27,8 @@ from zope.interface import implementer
from twisted.internet import defer
from twisted.internet import utils
-from twisted.web.client import FileBodyProducer
+from twisted.web.static import NoRangeStaticProducer
+from twisted.web.static import SingleRangeStaticProducer
from leap.common.files import mkdir_p
from leap.soledad.common.blobs import ACCEPTED_FLAGS
@@ -44,6 +45,43 @@ from .util import VALID_STRINGS
logger = getLogger(__name__)
+class NoRangeProducer(NoRangeStaticProducer):
+ """
+ A static file producer that fires a deferred when it's finished.
+ """
+
+ def start(self):
+ # Returns a deferred that fires with None once production has stopped.
+ # The parent has to be started first: the check below looks at the
+ # state the parent leaves behind.
+ NoRangeStaticProducer.start(self)
+ # NOTE(review): presumably the parent clears self.request when it
+ # stops producing, so None here would mean everything was already
+ # produced during start() -- confirm against twisted.web.static.
+ if self.request is None:
+ return defer.succeed(None)
+ self.deferred = defer.Deferred()
+ return self.deferred
+
+ def stopProducing(self):
+ NoRangeStaticProducer.stopProducing(self)
+ # self.deferred only exists when start() handed out a pending
+ # deferred; in that case this is where it gets fired.
+ if hasattr(self, 'deferred'):
+ self.deferred.callback(None)
+
+
+class SingleRangeProducer(SingleRangeStaticProducer):
+ """
+ A static file producer of a single file range that fires a deferred when
+ it's finished.
+ """
+
+ def start(self):
+ # Returns a deferred that fires with None once production has stopped.
+ # The parent has to be started first: the check below looks at the
+ # state the parent leaves behind.
+ SingleRangeStaticProducer.start(self)
+ # NOTE(review): presumably the parent clears self.request when it
+ # stops producing, so None here would mean the whole range was
+ # already produced during start() -- confirm against
+ # twisted.web.static.
+ if self.request is None:
+ return defer.succeed(None)
+ self.deferred = defer.Deferred()
+ return self.deferred
+
+ def stopProducing(self):
+ SingleRangeStaticProducer.stopProducing(self)
+ # self.deferred only exists when start() handed out a pending
+ # deferred; in that case this is where it gets fired.
+ if hasattr(self, 'deferred'):
+ self.deferred.callback(None)
+
+
@implementer(interfaces.IBlobsBackend)
class FilesystemBlobsBackend(object):
@@ -63,13 +101,20 @@ class FilesystemBlobsBackend(object):
open(path, 'a')
@defer.inlineCallbacks
- def read_blob(self, user, blob_id, consumer, namespace=''):
+ def read_blob(self, user, blob_id, consumer, namespace='', range=None):
logger.info('reading blob: %s - %s@%s' % (user, blob_id, namespace))
path = self._get_path(user, blob_id, namespace)
logger.debug('blob path: %s' % path)
with open(path) as fd:
- producer = FileBodyProducer(fd)
- yield producer.startProducing(consumer)
+ # Without a range the whole file is produced; with one, only the
+ # requested slice is.
+ if range is None:
+ producer = NoRangeProducer(consumer, fd)
+ else:
+ # range is a (start, end) pair. The end position is exclusive:
+ # the number of bytes produced is end - start.
+ start, end = range
+ offset = start
+ size = end - start
+ args = (consumer, fd, offset, size)
+ producer = SingleRangeProducer(*args)
+ # start() returns a deferred that fires when production has finished,
+ # which keeps the file open until then.
+ yield producer.start()
def get_flags(self, user, blob_id, namespace=''):
try:
diff --git a/src/leap/soledad/server/_blobs/resource.py b/src/leap/soledad/server/_blobs/resource.py
index a6c209f2..dd9af861 100644
--- a/src/leap/soledad/server/_blobs/resource.py
+++ b/src/leap/soledad/server/_blobs/resource.py
@@ -19,6 +19,8 @@ A Twisted Web resource for blobs.
"""
import json
+from twisted.python.compat import intToBytes
+from twisted.python.compat import networkString
from twisted.web import resource
from twisted.web.client import FileBodyProducer
from twisted.web.server import NOT_DONE_YET
@@ -31,6 +33,7 @@ from .errors import BlobNotFound
from .errors import BlobExists
from .errors import ImproperlyConfiguredException
from .errors import QuotaExceeded
+from .errors import RangeNotSatisfiable
from .util import VALID_STRINGS
from leap.soledad.common.log import getLogger
@@ -44,7 +47,7 @@ def _catchBlobNotFound(failure, request, user, blob_id):
logger.error("Error 404: Blob %s does not exist for user %s"
% (blob_id, user))
request.setResponseCode(404)
- request.write("Blob doesn't exists: %s" % blob_id)
+ request.write("Blob doesn't exist: %s" % blob_id)
request.finish()
@@ -128,7 +131,7 @@ class BlobsResource(resource.Resource):
d.addErrback(_catchAllErrors, request)
return NOT_DONE_YET
- def _get_blob(self, request, user, blob_id, namespace):
+ def _get_blob(self, request, user, blob_id, namespace, range):
def _set_tag_header(tag):
request.responseHeaders.setRawHeaders('Tag', [tag])
@@ -136,18 +139,32 @@ class BlobsResource(resource.Resource):
def _read_blob(_):
handler = self._handler
consumer = request
- d = handler.read_blob(user, blob_id, consumer, namespace=namespace)
+ d = handler.read_blob(
+ user, blob_id, consumer, namespace=namespace, range=range)
return d
d = self._handler.get_tag(user, blob_id, namespace)
d.addCallback(_set_tag_header)
d.addCallback(_read_blob)
- d.addCallback(lambda _: request.finish())
d.addErrback(_catchBlobNotFound, request, user, blob_id)
d.addErrback(_catchAllErrors, request, finishRequest=True)
return NOT_DONE_YET
+    def _parseRange(self, range):
+        """
+        Parse the value of a Range: HTTP header into a (start, end) pair.
+
+        Only a single range given in the ``bytes`` unit is understood.
+        Either side of the range may be left out, in which case None is
+        returned in its place, but at least one side has to be present.
+
+        :param range: The raw value of the Range header, or None if the
+                      header was not sent.
+        :type range: str
+
+        :return: None if there is no header to parse, otherwise a tuple
+                 (start, end) in which each item is an int or None.
+        :rtype: tuple or None
+
+        :raise RangeNotSatisfiable: Raised if the header can not be parsed,
+                                    uses a unit other than bytes, or gives
+                                    neither a start nor an end.
+        """
+        if not range:
+            return None
+        try:
+            kind, value = range.split(b'=', 1)
+            if kind.strip() != b'bytes':
+                raise ValueError('Unknown unit: %s' % kind)
+            # unpacking fails with ValueError unless there is exactly one
+            # dash, and so does int() on anything that is not a number
+            start, end = value.split(b'-')
+            start = int(start) if start else None
+            end = int(end) if end else None
+        except (TypeError, ValueError) as e:
+            raise RangeNotSatisfiable(e)
+        # "bytes=-" names no position at all, so there is nothing to serve
+        if start is None and end is None:
+            raise RangeNotSatisfiable('Empty range: %s' % range)
+        return start, end
+
def render_GET(self, request):
logger.info("http get: %s" % request.path)
user, blob_id, namespace = self._validate(request)
@@ -162,7 +179,34 @@ class BlobsResource(resource.Resource):
if only_flags:
return self._only_flags(request, user, blob_id, namespace)
- return self._get_blob(request, user, blob_id, namespace)
+        def _handleRangeHeader(size):
+            """
+            Set the response status and headers according to the Range
+            header of the request, then start sending the blob.
+
+            A range is treated as the half-open interval [start, end), so
+            end - start bytes are sent. A missing start ("-N") selects the
+            last N bytes, a missing end ("N-") selects everything from N
+            on, and an end beyond the blob is clamped to its size. A range
+            that can not be parsed or selects no bytes is answered with
+            416.
+
+            NOTE(review): RFC 7233 defines the last byte position as
+            inclusive; the exclusive end used here is what the backend and
+            the existing tests expect.
+
+            :param size: The size of the blob, in bytes.
+            :type size: int
+            """
+            def _notSatisfiable():
+                content_range = networkString('bytes */%d' % size)
+                request.setResponseCode(416)
+                request.setHeader(b'content-range', content_range)
+                request.finish()
+
+            try:
+                byte_range = self._parseRange(request.getHeader('Range'))
+            except RangeNotSatisfiable:
+                return _notSatisfiable()
+
+            # no Range header: send the whole blob
+            if not byte_range:
+                request.setResponseCode(200)
+                request.setHeader(b'content-length', intToBytes(size))
+                return self._get_blob(
+                    request, user, blob_id, namespace, None)
+
+            start, end = byte_range
+            if start is None and end is None:
+                return _notSatisfiable()
+            if start is None:
+                # suffix form: the last `end` bytes of the blob
+                start, end = max(size - end, 0), size
+            elif end is None:
+                end = size
+            else:
+                # never announce more bytes than the blob can provide
+                end = min(end, size)
+            # covers inverted ranges, empty ranges and a start past the
+            # end of the blob
+            if start >= end:
+                return _notSatisfiable()
+
+            content_range = 'bytes %d-%d/%d' % (start, end, size)
+            request.setResponseCode(206)
+            request.setHeader(b'content-range', networkString(content_range))
+            request.setHeader(b'content-length', intToBytes(end - start))
+            return self._get_blob(
+                request, user, blob_id, namespace, (start, end))
+
+ d = self._handler.get_blob_size(user, blob_id, namespace=namespace)
+ d.addCallback(_handleRangeHeader)
+ return NOT_DONE_YET
def render_DELETE(self, request):
logger.info("http put: %s" % request.path)
diff --git a/src/leap/soledad/server/interfaces.py b/src/leap/soledad/server/interfaces.py
index c2e79854..089111e3 100644
--- a/src/leap/soledad/server/interfaces.py
+++ b/src/leap/soledad/server/interfaces.py
@@ -25,7 +25,7 @@ class IBlobsBackend(Interface):
An interface for a backend that can store blobs.
"""
- def read_blob(user, blob_id, consumer, namespace=''):
+ def read_blob(user, blob_id, consumer, namespace='', range=None):
"""
Read a blob from the backend storage.
@@ -37,6 +37,9 @@ class IBlobsBackend(Interface):
:type consumer: twisted.internet.interfaces.IConsumer provider
:param namespace: An optional namespace for the blob.
:type namespace: str
+ :param range: An optional tuple indicating start and end position of
+ the blob to be produced.
+ :type range: (int, int)
:return: A deferred that fires when the blob has been written to the
consumer.
diff --git a/tests/blobs/test_fs_backend.py b/tests/blobs/test_fs_backend.py
index 5c086a51..2485ccd1 100644
--- a/tests/blobs/test_fs_backend.py
+++ b/tests/blobs/test_fs_backend.py
@@ -20,6 +20,7 @@ Tests for blobs backend on server side.
from twisted.trial import unittest
from twisted.internet import defer
from twisted.web.client import FileBodyProducer
+from twisted.web.test.requesthelper import DummyRequest
from leap.common.files import mkdir_p
from leap.soledad.server import _blobs
from mock import Mock
@@ -61,18 +62,18 @@ class FilesystemBackendTestCase(unittest.TestCase):
self.assertEquals(10, size)
@pytest.mark.usefixtures("method_tmpdir")
- @mock.patch('leap.soledad.server._blobs.fs_backend.open')
@mock.patch('leap.soledad.server._blobs.fs_backend'
'.FilesystemBlobsBackend._get_path')
@defer.inlineCallbacks
- def test_read_blob(self, get_path, open):
- get_path.return_value = 'path'
- open.return_value = io.BytesIO('content')
+ def test_read_blob(self, get_path):
+ # The backend now reads from a real file in the temporary directory
+ # instead of a mocked open().
+ path = os.path.join(self.tempdir, 'blob')
+ with open(path, 'w') as f:
+ f.write('bl0b')
+ get_path.return_value = path
backend = _blobs.FilesystemBlobsBackend(blobs_path=self.tempdir)
- consumer = Mock()
+ consumer = DummyRequest([''])
yield backend.read_blob('user', 'blob_id', consumer)
- consumer.write.assert_called_with('content')
- get_path.assert_called_once_with('user', 'blob_id', '')
+ # Whatever the backend wrote to the consumer ends up in .written.
+ self.assertEqual(['bl0b'], consumer.written)
@pytest.mark.usefixtures("method_tmpdir")
@mock.patch.object(os.path, 'isfile')
@@ -246,3 +247,13 @@ class FilesystemBackendTestCase(unittest.TestCase):
count = yield backend.count('user', namespace='xfiles')
self.assertEqual(2, count)
+
+    @pytest.mark.usefixtures("method_tmpdir")
+    @defer.inlineCallbacks
+    def test_read_range(self):
+        fs_backend = _blobs.FilesystemBlobsBackend(blobs_path=self.tempdir)
+        # store a ten byte blob to read a slice from
+        source = FileBodyProducer(io.BytesIO("0123456789"))
+        yield fs_backend.write_blob('user', 'blob-id', source)
+        # the end of the range is exclusive, so (1, 3) selects two bytes
+        request = DummyRequest([''])
+        yield fs_backend.read_blob('user', 'blob-id', request, range=(1, 3))
+        self.assertEqual(['12'], request.written)
diff --git a/tests/server/test_blobs_server.py b/tests/server/test_blobs_server.py
index bf929386..6fed6d65 100644
--- a/tests/server/test_blobs_server.py
+++ b/tests/server/test_blobs_server.py
@@ -19,6 +19,8 @@ Integration tests for blobs server
"""
import os
import pytest
+import re
+import treq
from urlparse import urljoin
from uuid import uuid4
from io import BytesIO
@@ -49,6 +51,11 @@ def sleep(x):
return d
+def _get(*args, **kwargs):
+    """
+    Issue a GET request through treq, always over a non-persistent
+    connection.
+    """
+    kwargs['persistent'] = False
+    return treq.get(*args, **kwargs)
+
+
class BlobServerTestCase(unittest.TestCase):
def setUp(self):
@@ -455,3 +462,48 @@ class BlobServerTestCase(unittest.TestCase):
self.addCleanup(manager.close)
with pytest.raises(SoledadError):
yield manager.delete('missing_id')
+
+    @defer.inlineCallbacks
+    @pytest.mark.usefixtures("method_tmpdir")
+    def test_get_range(self):
+        # upload a blob so there is something to request a range of
+        user_id = uuid4().hex
+        manager = BlobManager(self.tempdir, self.uri, self.secret,
+                              self.secret, user_id)
+        self.addCleanup(manager.close)
+        blob_id = 'blob_id'
+        content = '0123456789'
+        yield manager.put(BlobDoc(BytesIO(content), blob_id), len(content))
+        # ask the server for a slice of the stored blob
+        uri = urljoin(self.uri, '%s/%s' % (user_id, blob_id))
+        response = yield _get(uri, headers={'Range': 'bytes=10-20'})
+        body = yield response.text()
+        # the response has to describe the range and carry 10 bytes
+        headers = response.headers
+        self.assertTrue(headers.hasHeader('content-range'))
+        content_range = headers.getRawHeaders('content-range').pop()
+        self.assertIsNotNone(re.match('^bytes 10-20/[0-9]+$', content_range))
+        self.assertEqual(10, len(body))
+
+    @defer.inlineCallbacks
+    @pytest.mark.usefixtures("method_tmpdir")
+    def test_get_range_not_satisfiable(self):
+        # put a blob in place
+        user_id = uuid4().hex
+        manager = BlobManager(self.tempdir, self.uri, self.secret,
+                              self.secret, user_id)
+        self.addCleanup(manager.close)
+        blob_id, content = uuid4().hex, 'content'
+        doc = BlobDoc(BytesIO(content), blob_id)
+        yield manager.put(doc, len(content))
+        # and check possible parsing errors
+        uri = urljoin(self.uri, '%s/%s' % (user_id, blob_id))
+        # every item needs its own comma: adjacent string literals are
+        # silently joined into a single one
+        bad_ranges = [
+            'bytes',
+            'bytes=',
+            'bytes=1',
+            'bytes=blah-100',
+            'potatoes=10-100',
+            'blah',
+        ]
+        for bad_range in bad_ranges:
+            res = yield _get(uri, headers={'Range': bad_range})
+            self.assertEqual(416, res.code)
+            content_range = res.headers.getRawHeaders('content-range').pop()
+            self.assertIsNotNone(
+                re.match(r'^bytes \*/[0-9]+$', content_range))
diff --git a/tests/server/test_incoming_server.py b/tests/server/test_incoming_server.py
index 23c0aa90..16d5d5e6 100644
--- a/tests/server/test_incoming_server.py
+++ b/tests/server/test_incoming_server.py
@@ -18,13 +18,13 @@
Integration tests for incoming API
"""
import pytest
-import mock
import treq
from io import BytesIO
from uuid import uuid4
from twisted.web.server import Site
from twisted.internet import reactor
from twisted.internet import defer
+from twisted.web.test.requesthelper import DummyRequest
from leap.soledad.server._incoming import IncomingResource
from leap.soledad.server._blobs import BlobsServerState
@@ -83,10 +83,10 @@ class IncomingOnCouchServerTestCase(CouchDBTestCase):
yield treq.put(incoming_endpoint, BytesIO(content), persistent=False)
db = self.state.open_database(user_id)
- consumer = mock.Mock()
+ consumer = DummyRequest([''])
yield db.read_blob(user_id, doc_id, consumer, namespace='MX')
flags = yield db.get_flags(user_id, doc_id, namespace='MX')
- data = consumer.write.call_args[0][0]
+ data = consumer.written.pop()
expected_preamble = formatter.preamble(content, doc_id)
expected_preamble = decode_preamble(expected_preamble, True)
written_preamble, written_content = data.split()