diff options
-rw-r--r-- | src/leap/soledad/server/_blobs.py | 135 | ||||
-rw-r--r-- | src/leap/soledad/server/_incoming.py | 5 | ||||
-rw-r--r-- | src/leap/soledad/server/interfaces.py | 15 | ||||
-rw-r--r-- | tests/benchmarks/test_blobs_fs_backend.py | 27 | ||||
-rw-r--r-- | tests/blobs/test_fs_backend.py | 43 | ||||
-rw-r--r-- | tests/server/test_incoming_server.py | 8 |
6 files changed, 125 insertions, 108 deletions
diff --git a/src/leap/soledad/server/_blobs.py b/src/leap/soledad/server/_blobs.py index 554fe5ac..70b55264 100644 --- a/src/leap/soledad/server/_blobs.py +++ b/src/leap/soledad/server/_blobs.py @@ -29,9 +29,6 @@ import base64 import json import re -from twisted.python.compat import intToBytes -from twisted.web import http -from twisted.web import static from twisted.web import resource from twisted.web.client import FileBodyProducer from twisted.web.server import NOT_DONE_YET @@ -93,15 +90,14 @@ class FilesystemBlobsBackend(object): def __touch(self, path): open(path, 'a') - def read_blob(self, user, blob_id, namespace=''): + @defer.inlineCallbacks + def read_blob(self, user, blob_id, consumer, namespace=''): logger.info('reading blob: %s - %s@%s' % (user, blob_id, namespace)) - try: - path = self._get_path(user, blob_id, namespace) - except Exception as e: - return defer.fail(e) + path = self._get_path(user, blob_id, namespace) logger.debug('blob path: %s' % path) - fd = open(path) - return defer.succeed(fd) + with open(path) as fd: + producer = FileBodyProducer(fd) + yield producer.startProducing(consumer) def get_flags(self, user, blob_id, namespace=''): try: @@ -132,7 +128,7 @@ class FilesystemBlobsBackend(object): return defer.succeed(None) @defer.inlineCallbacks - def write_blob(self, user, blob_id, fd, namespace=''): + def write_blob(self, user, blob_id, producer, namespace=''): yield self.semaphore.acquire() path = self._get_path(user, blob_id, namespace) try: @@ -145,9 +141,8 @@ class FilesystemBlobsBackend(object): if used > self.quota: raise QuotaExceeded logger.info('writing blob: %s - %s' % (user, blob_id)) - fbp = FileBodyProducer(fd) with open(path, 'wb') as blobfile: - yield fbp.startProducing(blobfile) + yield producer.startProducing(blobfile) yield self.semaphore.release() def delete_blob(self, user, blob_id, namespace=''): @@ -292,23 +287,6 @@ class ImproperlyConfiguredException(Exception): pass -class BlobFile(resource.Resource): - - def __init__(self, fd): - self.fd = fd - self.fd.seek(0, 2) - self.size = self.fd.tell() - self.fd.seek(0) - - def render_GET(self, request): - request.setHeader(b'content-length', intToBytes(self.size)) - request.setHeader(b'content-type', 'application/octet-stream') - request.setResponseCode(http.OK) - producer = static.NoRangeStaticProducer(request, self.fd) - producer.start() - return NOT_DONE_YET - - def _catchBlobNotFound(failure, request, user, blob_id): failure.trap(BlobNotFound) logger.error("Error 404: Blob %s does not exist for user %s" @@ -370,50 +348,70 @@ class BlobsResource(resource.Resource): # TODO double check credentials, we can have then # under request. - def render_GET(self, request): - logger.info("http get: %s" % request.path) - user, blob_id, namespace = self._validate(request) - if not blob_id and request.args.get('only_count', [False])[0]: - d = self._handler.count(user, namespace) - d.addCallback(lambda count: json.dumps({"count": count})) - d.addCallback(lambda count: request.write(count)) - d.addCallback(lambda _: request.finish()) - return NOT_DONE_YET - elif not blob_id: - order = request.args.get('order_by', [None])[0] - filter_flag = request.args.get('filter_flag', [False])[0] - deleted = request.args.get('deleted', [False])[0] - d = self._handler.list_blobs(user, namespace, - order_by=order, deleted=deleted, - filter_flag=filter_flag) - d.addCallback(lambda blobs: json.dumps(blobs)) - d.addCallback(lambda blobs: request.write(blobs)) - d.addCallback(lambda _: request.finish()) - return NOT_DONE_YET + def _only_count(self, request, user, namespace): + d = self._handler.count(user, namespace) + d.addCallback(lambda count: json.dumps({"count": count})) + d.addCallback(lambda count: request.write(count)) + d.addCallback(lambda _: request.finish()) + return NOT_DONE_YET - only_flags = request.args.get('only_flags', [False])[0] - if only_flags: - d = self._handler.get_flags(user, blob_id, namespace) - d.addCallback(lambda flags: json.dumps(flags)) - d.addCallback(lambda flags: request.write(flags)) - d.addCallback(lambda _: request.finish()) - d.addErrback(_catchBlobNotFound, request, user, blob_id) - d.addErrback(_catchAllErrors, request) - return NOT_DONE_YET + def _list(self, request, user, namespace): + order = request.args.get('order_by', [None])[0] + filter_flag = request.args.get('filter_flag', [False])[0] + deleted = request.args.get('deleted', [False])[0] + d = self._handler.list_blobs(user, namespace, + order_by=order, deleted=deleted, + filter_flag=filter_flag) + d.addCallback(lambda blobs: json.dumps(blobs)) + d.addCallback(lambda blobs: request.write(blobs)) + d.addCallback(lambda _: request.finish()) + return NOT_DONE_YET + + def _only_flags(self, request, user, blob_id, namespace): + d = self._handler.get_flags(user, blob_id, namespace) + d.addCallback(lambda flags: json.dumps(flags)) + d.addCallback(lambda flags: request.write(flags)) + d.addCallback(lambda _: request.finish()) + d.addErrback(_catchBlobNotFound, request, user, blob_id) + d.addErrback(_catchAllErrors, request) + return NOT_DONE_YET + + def _get_blob(self, request, user, blob_id, namespace): + + def _set_tag_header(tag): + request.responseHeaders.setRawHeaders('Tag', [tag]) + + def _read_blob(_): + handler = self._handler + consumer = request + d = handler.read_blob(user, blob_id, consumer, namespace=namespace) + return d d = self._handler.get_tag(user, blob_id, namespace) - d.addCallback( - lambda tag: request.responseHeaders.setRawHeaders( - 'Tag', [tag])) - d.addCallback(lambda _: self._handler.read_blob(user, blob_id, - namespace=namespace)) - d.addCallback(lambda fd: BlobFile(fd)) - d.addCallback(lambda res: res.render_GET(request)) + d.addCallback(_set_tag_header) + d.addCallback(_read_blob) + d.addCallback(lambda _: request.finish()) d.addErrback(_catchBlobNotFound, request, user, blob_id) d.addErrback(_catchAllErrors, request, finishRequest=True) return NOT_DONE_YET + def render_GET(self, request): + logger.info("http get: %s" % request.path) + user, blob_id, namespace = self._validate(request) + only_flags = request.args.get('only_flags', [False])[0] + + if not blob_id and request.args.get('only_count', [False])[0]: + return self._only_count(request, user, namespace) + + if not blob_id: + return self._list(request, user, namespace) + + if only_flags: + return self._only_flags(request, user, blob_id, namespace) + + return self._get_blob(request, user, blob_id, namespace) + def render_DELETE(self, request): logger.info("http put: %s" % request.path) user, blob_id, namespace = self._validate(request) @@ -426,8 +424,9 @@ class BlobsResource(resource.Resource): def render_PUT(self, request): logger.info("http put: %s" % request.path) user, blob_id, namespace = self._validate(request) - fd = request.content - d = self._handler.write_blob(user, blob_id, fd, namespace=namespace) + producer = FileBodyProducer(request.content) + handler = self._handler + d = handler.write_blob(user, blob_id, producer, namespace=namespace) d.addCallback(lambda _: request.finish()) d.addErrback(_catchBlobExists, request, user, blob_id) d.addErrback(_catchQuotaExceeded, request, user) diff --git a/src/leap/soledad/server/_incoming.py b/src/leap/soledad/server/_incoming.py index 362039af..071a94f6 100644 --- a/src/leap/soledad/server/_incoming.py +++ b/src/leap/soledad/server/_incoming.py @@ -22,6 +22,7 @@ import base64 from io import BytesIO from twisted.web.server import NOT_DONE_YET from twisted.web.resource import Resource +from twisted.web.client import FileBodyProducer from leap.soledad.common.blobs import Flags from leap.soledad.common.blobs import preamble @@ -100,8 +101,8 @@ class IncomingResource(Resource): request.write('Quota Exceeded!') request.finish() - fd = request.content - d = db.write_blob(user, blob_id, fd, namespace='MX') + producer = FileBodyProducer(request.content) + d = db.write_blob(user, blob_id, producer, namespace='MX') flags = [Flags.PENDING] d.addCallback(lambda _: db.set_flags(user, blob_id, flags, namespace='MX')) diff --git a/src/leap/soledad/server/interfaces.py b/src/leap/soledad/server/interfaces.py index d7a4aa70..624c8ff6 100644 --- a/src/leap/soledad/server/interfaces.py +++ b/src/leap/soledad/server/interfaces.py @@ -25,7 +25,7 @@ class IBlobsBackend(Interface): An interface for a backend that can store blobs. """ - def read_blob(user, blob_id, namespace=''): + def read_blob(user, blob_id, consumer, namespace=''): """ Read a blob from the backend storage. @@ -33,15 +33,17 @@ class IBlobsBackend(Interface): :type user: str :param blob_id: The id of the blob. :type blob_id: str + :param consumer: The object to write data to. + :type consumer: twisted.internet.interfaces.IConsumer provider :param namespace: An optional namespace for the blob. :type namespace: str - :return: A deferred that fires with a file-like object that gives - access to the contents of the blob. + :return: A deferred that fires when the blob has been written to the + consumer. :rtype: twisted.internet.defer.Deferred """ - def write_blob(user, blob_id, fd, namespace=''): + def write_blob(user, blob_id, producer, namespace=''): """ Write a blob to the backend storage. @@ -49,9 +51,8 @@ class IBlobsBackend(Interface): :type user: str :param blob_id: The id of the blob. :type blob_id: str - :param fd: A file-like object into which the contents of the blob - should be written. - :type fd: file-like + :param producer: The object to read data from. + :type producer: twisted.internet.interfaces.IProducer provider :param namespace: An optional namespace for the blob. :type namespace: str diff --git a/tests/benchmarks/test_blobs_fs_backend.py b/tests/benchmarks/test_blobs_fs_backend.py index d3a663d2..bb5b9b88 100644 --- a/tests/benchmarks/test_blobs_fs_backend.py +++ b/tests/benchmarks/test_blobs_fs_backend.py @@ -2,7 +2,7 @@ import pytest from io import BytesIO from leap.soledad.server._blobs import FilesystemBlobsBackend from twisted.internet import defer -from twisted.web.test.test_web import DummyRequest +from twisted.web.client import FileBodyProducer def create_write_test(amount, size): @@ -12,12 +12,11 @@ def create_write_test(amount, size): def test(txbenchmark, payload, tmpdir): backend = FilesystemBlobsBackend(blobs_path=tmpdir.strpath) data = payload(size) + semaphore = defer.DeferredSemaphore(100) deferreds = [] for i in xrange(amount): - fd = BytesIO(data) - request = DummyRequest(['']) - request.content = fd - d = backend.write_blob('user', str(i), request) + producer = FileBodyProducer(BytesIO(data)) + d = semaphore.run(backend.write_blob, 'user', str(i), producer) deferreds.append(d) yield txbenchmark(defer.gatherResults, deferreds) @@ -30,6 +29,12 @@ test_blobs_fs_backend_write_100_100k = create_write_test(100, 100 * 1000) test_blobs_fs_backend_write_1000_10k = create_write_test(1000, 10 * 1000) +class DevNull(object): + + def write(self, data): + pass + + def create_read_test(amount, size): @pytest.inlineCallbacks @@ -39,22 +44,20 @@ def create_read_test(amount, size): data = payload(size) # first write blobs to the backend... + semaphore = defer.DeferredSemaphore(100) deferreds = [] for i in xrange(amount): - fd = BytesIO(data) - request = DummyRequest(['']) - request.content = fd - d = backend.write_blob('user', str(i), request) + producer = FileBodyProducer(BytesIO(data)) + d = semaphore.run(backend.write_blob, 'user', str(i), producer) deferreds.append(d) yield defer.gatherResults(deferreds) # ... then measure the read operation deferreds = [] for i in xrange(amount): - request = DummyRequest(['']) - d = request.notifyFinish() + consumer = DevNull() + d = semaphore.run(backend.read_blob, 'user', str(i), consumer) deferreds.append(d) - backend.read_blob('user', str(i), request) yield txbenchmark(defer.gatherResults, deferreds) return test diff --git a/tests/blobs/test_fs_backend.py b/tests/blobs/test_fs_backend.py index 5b3ff30a..fc3d649a 100644 --- a/tests/blobs/test_fs_backend.py +++ b/tests/blobs/test_fs_backend.py @@ -28,7 +28,6 @@ import mock import os import base64 import io -import json import pytest @@ -64,14 +63,16 @@ class FilesystemBackendTestCase(unittest.TestCase): @pytest.mark.usefixtures("method_tmpdir") @mock.patch('leap.soledad.server._blobs.open') - @mock.patch.object(_blobs.FilesystemBlobsBackend, '_get_path', - Mock(return_value='path')) + @mock.patch('leap.soledad.server._blobs.FilesystemBlobsBackend._get_path') @defer.inlineCallbacks - def test_read_blob(self, open): + def test_read_blob(self, get_path, open): + get_path.return_value = 'path' + open.return_value = io.BytesIO('content') backend = _blobs.FilesystemBlobsBackend(blobs_path=self.tempdir) - yield backend.read_blob('user', 'blob_id') - open.assert_called_once_with('path') - backend._get_path.assert_called_once_with('user', 'blob_id', '') + consumer = Mock() + yield backend.read_blob('user', 'blob_id', consumer) + consumer.write.assert_called_with('content') + get_path.assert_called_once_with('user', 'blob_id', '') @pytest.mark.usefixtures("method_tmpdir") @mock.patch.object(os.path, 'isfile') @@ -215,22 +216,32 @@ class FilesystemBackendTestCase(unittest.TestCase): namespace='custom') default = yield backend.list_blobs('user') custom = yield backend.list_blobs('user', namespace='custom') - self.assertEquals([], json.loads(default)) - self.assertEquals(['blob_id'], json.loads(custom)) + self.assertEquals([], default) + self.assertEquals(['blob_id'], custom) @pytest.mark.usefixtures("method_tmpdir") @defer.inlineCallbacks def test_count(self): backend = _blobs.FilesystemBlobsBackend(blobs_path=self.tempdir) content = 'blah' - yield backend.write_blob('user', 'blob_id_1', io.BytesIO(content)) - yield backend.write_blob('user', 'blob_id_2', io.BytesIO(content)) - yield backend.write_blob('user', 'blob_id_3', io.BytesIO(content)) + + ids = range(5) + + def _write(namespace=''): + producer = FileBodyProducer(io.BytesIO(content)) + d = backend.write_blob('user', str(ids.pop()), producer, + namespace=namespace) + return d + + yield _write() + yield _write() + yield _write() + count = yield backend.count('user') self.assertEqual(3, count) - yield backend.write_blob('user', 'blob_id_1', io.BytesIO(content), - namespace='xfiles') - yield backend.write_blob('user', 'blob_id_2', io.BytesIO(content), - namespace='xfiles') + + yield _write(namespace='xfiles') + yield _write(namespace='xfiles') + count = yield backend.count('user', namespace='xfiles') self.assertEqual(2, count) diff --git a/tests/server/test_incoming_server.py b/tests/server/test_incoming_server.py index f278be74..23c0aa90 100644 --- a/tests/server/test_incoming_server.py +++ b/tests/server/test_incoming_server.py @@ -18,12 +18,13 @@ Integration tests for incoming API """ import pytest +import mock +import treq from io import BytesIO from uuid import uuid4 from twisted.web.server import Site from twisted.internet import reactor from twisted.internet import defer -import treq from leap.soledad.server._incoming import IncomingResource from leap.soledad.server._blobs import BlobsServerState @@ -82,9 +83,10 @@ class IncomingOnCouchServerTestCase(CouchDBTestCase): yield treq.put(incoming_endpoint, BytesIO(content), persistent=False) db = self.state.open_database(user_id) - fd = yield db.read_blob(user_id, doc_id, namespace='MX') + consumer = mock.Mock() + yield db.read_blob(user_id, doc_id, consumer, namespace='MX') flags = yield db.get_flags(user_id, doc_id, namespace='MX') - data = fd.read() + data = consumer.write.call_args[0][0] expected_preamble = formatter.preamble(content, doc_id) expected_preamble = decode_preamble(expected_preamble, True) written_preamble, written_content = data.split() |