author    drebs <drebs@leap.se>  2017-12-12 20:11:13 -0200
committer drebs <drebs@leap.se>  2017-12-13 13:52:30 -0200
commit 383a19aaf87bd8e2665112c1350627140958eedf (patch)
tree   402ba69a04158e6293ca3533d82b5515bba3c433
parent cc480a2e6c11856759a5297e94a0ff128d0a1593 (diff)
[refactor] use producer/consumer on write/read_blob respectively
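
The pattern adopted here is Twisted's body-producer/consumer contract:
a FileBodyProducer reads from a file-like object and pushes chunks into
any object with a write() method, and startProducing() returns a
deferred that fires once the whole body has been delivered. A minimal
sketch of the idea (not part of this patch; all names are illustrative):

    from io import BytesIO
    from twisted.internet import defer
    from twisted.internet.task import react
    from twisted.web.client import FileBodyProducer

    class SinkConsumer(object):
        # collects every chunk the producer writes
        def __init__(self):
            self.chunks = []

        def write(self, data):
            self.chunks.append(data)

    @defer.inlineCallbacks
    def main(reactor):
        producer = FileBodyProducer(BytesIO(b'some blob content'))
        consumer = SinkConsumer()
        # fires when the whole body has been written to the consumer
        yield producer.startProducing(consumer)
        print(b''.join(consumer.chunks))

    react(main)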
-rw-r--r--  src/leap/soledad/server/_blobs.py           135
-rw-r--r--  src/leap/soledad/server/_incoming.py          5
-rw-r--r--  src/leap/soledad/server/interfaces.py        15
-rw-r--r--  tests/benchmarks/test_blobs_fs_backend.py    27
-rw-r--r--  tests/blobs/test_fs_backend.py               43
-rw-r--r--  tests/server/test_incoming_server.py          8
6 files changed, 125 insertions(+), 108 deletions(-)
diff --git a/src/leap/soledad/server/_blobs.py b/src/leap/soledad/server/_blobs.py
index 554fe5ac..70b55264 100644
--- a/src/leap/soledad/server/_blobs.py
+++ b/src/leap/soledad/server/_blobs.py
@@ -29,9 +29,6 @@ import base64
import json
import re
-from twisted.python.compat import intToBytes
-from twisted.web import http
-from twisted.web import static
from twisted.web import resource
from twisted.web.client import FileBodyProducer
from twisted.web.server import NOT_DONE_YET
@@ -93,15 +90,14 @@ class FilesystemBlobsBackend(object):
def __touch(self, path):
open(path, 'a')
- def read_blob(self, user, blob_id, namespace=''):
+ @defer.inlineCallbacks
+ def read_blob(self, user, blob_id, consumer, namespace=''):
logger.info('reading blob: %s - %s@%s' % (user, blob_id, namespace))
- try:
- path = self._get_path(user, blob_id, namespace)
- except Exception as e:
- return defer.fail(e)
+ path = self._get_path(user, blob_id, namespace)
logger.debug('blob path: %s' % path)
- fd = open(path)
- return defer.succeed(fd)
+ with open(path) as fd:
+ producer = FileBodyProducer(fd)
+ yield producer.startProducing(consumer)
def get_flags(self, user, blob_id, namespace=''):
try:
@@ -132,7 +128,7 @@ class FilesystemBlobsBackend(object):
return defer.succeed(None)
@defer.inlineCallbacks
- def write_blob(self, user, blob_id, fd, namespace=''):
+ def write_blob(self, user, blob_id, producer, namespace=''):
yield self.semaphore.acquire()
path = self._get_path(user, blob_id, namespace)
try:
@@ -145,9 +141,8 @@ class FilesystemBlobsBackend(object):
if used > self.quota:
raise QuotaExceeded
logger.info('writing blob: %s - %s' % (user, blob_id))
- fbp = FileBodyProducer(fd)
with open(path, 'wb') as blobfile:
- yield fbp.startProducing(blobfile)
+ yield producer.startProducing(blobfile)
yield self.semaphore.release()
def delete_blob(self, user, blob_id, namespace=''):
@@ -292,23 +287,6 @@ class ImproperlyConfiguredException(Exception):
pass
-class BlobFile(resource.Resource):
-
- def __init__(self, fd):
- self.fd = fd
- self.fd.seek(0, 2)
- self.size = self.fd.tell()
- self.fd.seek(0)
-
- def render_GET(self, request):
- request.setHeader(b'content-length', intToBytes(self.size))
- request.setHeader(b'content-type', 'application/octet-stream')
- request.setResponseCode(http.OK)
- producer = static.NoRangeStaticProducer(request, self.fd)
- producer.start()
- return NOT_DONE_YET
-
-
def _catchBlobNotFound(failure, request, user, blob_id):
failure.trap(BlobNotFound)
logger.error("Error 404: Blob %s does not exist for user %s"
@@ -370,50 +348,70 @@ class BlobsResource(resource.Resource):
# TODO double check credentials, we can have then
# under request.
- def render_GET(self, request):
- logger.info("http get: %s" % request.path)
- user, blob_id, namespace = self._validate(request)
- if not blob_id and request.args.get('only_count', [False])[0]:
- d = self._handler.count(user, namespace)
- d.addCallback(lambda count: json.dumps({"count": count}))
- d.addCallback(lambda count: request.write(count))
- d.addCallback(lambda _: request.finish())
- return NOT_DONE_YET
- elif not blob_id:
- order = request.args.get('order_by', [None])[0]
- filter_flag = request.args.get('filter_flag', [False])[0]
- deleted = request.args.get('deleted', [False])[0]
- d = self._handler.list_blobs(user, namespace,
- order_by=order, deleted=deleted,
- filter_flag=filter_flag)
- d.addCallback(lambda blobs: json.dumps(blobs))
- d.addCallback(lambda blobs: request.write(blobs))
- d.addCallback(lambda _: request.finish())
- return NOT_DONE_YET
+ def _only_count(self, request, user, namespace):
+ d = self._handler.count(user, namespace)
+ d.addCallback(lambda count: json.dumps({"count": count}))
+ d.addCallback(lambda count: request.write(count))
+ d.addCallback(lambda _: request.finish())
+ return NOT_DONE_YET
- only_flags = request.args.get('only_flags', [False])[0]
- if only_flags:
- d = self._handler.get_flags(user, blob_id, namespace)
- d.addCallback(lambda flags: json.dumps(flags))
- d.addCallback(lambda flags: request.write(flags))
- d.addCallback(lambda _: request.finish())
- d.addErrback(_catchBlobNotFound, request, user, blob_id)
- d.addErrback(_catchAllErrors, request)
- return NOT_DONE_YET
+ def _list(self, request, user, namespace):
+ order = request.args.get('order_by', [None])[0]
+ filter_flag = request.args.get('filter_flag', [False])[0]
+ deleted = request.args.get('deleted', [False])[0]
+ d = self._handler.list_blobs(user, namespace,
+ order_by=order, deleted=deleted,
+ filter_flag=filter_flag)
+ d.addCallback(lambda blobs: json.dumps(blobs))
+ d.addCallback(lambda blobs: request.write(blobs))
+ d.addCallback(lambda _: request.finish())
+ return NOT_DONE_YET
+
+ def _only_flags(self, request, user, blob_id, namespace):
+ d = self._handler.get_flags(user, blob_id, namespace)
+ d.addCallback(lambda flags: json.dumps(flags))
+ d.addCallback(lambda flags: request.write(flags))
+ d.addCallback(lambda _: request.finish())
+ d.addErrback(_catchBlobNotFound, request, user, blob_id)
+ d.addErrback(_catchAllErrors, request)
+ return NOT_DONE_YET
+
+ def _get_blob(self, request, user, blob_id, namespace):
+
+ def _set_tag_header(tag):
+ request.responseHeaders.setRawHeaders('Tag', [tag])
+
+ def _read_blob(_):
+ handler = self._handler
+ consumer = request
+ d = handler.read_blob(user, blob_id, consumer, namespace=namespace)
+ return d
d = self._handler.get_tag(user, blob_id, namespace)
- d.addCallback(
- lambda tag: request.responseHeaders.setRawHeaders(
- 'Tag', [tag]))
- d.addCallback(lambda _: self._handler.read_blob(user, blob_id,
- namespace=namespace))
- d.addCallback(lambda fd: BlobFile(fd))
- d.addCallback(lambda res: res.render_GET(request))
+ d.addCallback(_set_tag_header)
+ d.addCallback(_read_blob)
+ d.addCallback(lambda _: request.finish())
d.addErrback(_catchBlobNotFound, request, user, blob_id)
d.addErrback(_catchAllErrors, request, finishRequest=True)
return NOT_DONE_YET
+ def render_GET(self, request):
+ logger.info("http get: %s" % request.path)
+ user, blob_id, namespace = self._validate(request)
+ only_flags = request.args.get('only_flags', [False])[0]
+
+ if not blob_id and request.args.get('only_count', [False])[0]:
+ return self._only_count(request, user, namespace)
+
+ if not blob_id:
+ return self._list(request, user, namespace)
+
+ if only_flags:
+ return self._only_flags(request, user, blob_id, namespace)
+
+ return self._get_blob(request, user, blob_id, namespace)
+
def render_DELETE(self, request):
logger.info("http put: %s" % request.path)
user, blob_id, namespace = self._validate(request)
@@ -426,8 +424,9 @@ class BlobsResource(resource.Resource):
def render_PUT(self, request):
logger.info("http put: %s" % request.path)
user, blob_id, namespace = self._validate(request)
- fd = request.content
- d = self._handler.write_blob(user, blob_id, fd, namespace=namespace)
+ producer = FileBodyProducer(request.content)
+ handler = self._handler
+ d = handler.write_blob(user, blob_id, producer, namespace=namespace)
d.addCallback(lambda _: request.finish())
d.addErrback(_catchBlobExists, request, user, blob_id)
d.addErrback(_catchQuotaExceeded, request, user)
diff --git a/src/leap/soledad/server/_incoming.py b/src/leap/soledad/server/_incoming.py
index 362039af..071a94f6 100644
--- a/src/leap/soledad/server/_incoming.py
+++ b/src/leap/soledad/server/_incoming.py
@@ -22,6 +22,7 @@ import base64
from io import BytesIO
from twisted.web.server import NOT_DONE_YET
from twisted.web.resource import Resource
+from twisted.web.client import FileBodyProducer
from leap.soledad.common.blobs import Flags
from leap.soledad.common.blobs import preamble
@@ -100,8 +101,8 @@ class IncomingResource(Resource):
request.write('Quota Exceeded!')
request.finish()
- fd = request.content
- d = db.write_blob(user, blob_id, fd, namespace='MX')
+ producer = FileBodyProducer(request.content)
+ d = db.write_blob(user, blob_id, producer, namespace='MX')
flags = [Flags.PENDING]
d.addCallback(lambda _: db.set_flags(user, blob_id, flags,
namespace='MX'))
diff --git a/src/leap/soledad/server/interfaces.py b/src/leap/soledad/server/interfaces.py
index d7a4aa70..624c8ff6 100644
--- a/src/leap/soledad/server/interfaces.py
+++ b/src/leap/soledad/server/interfaces.py
@@ -25,7 +25,7 @@ class IBlobsBackend(Interface):
An interface for a backend that can store blobs.
"""
- def read_blob(user, blob_id, namespace=''):
+ def read_blob(user, blob_id, consumer, namespace=''):
"""
Read a blob from the backend storage.
@@ -33,15 +33,17 @@ class IBlobsBackend(Interface):
:type user: str
:param blob_id: The id of the blob.
:type blob_id: str
+ :param consumer: The object to write data to.
+ :type consumer: twisted.internet.interfaces.IConsumer provider
:param namespace: An optional namespace for the blob.
:type namespace: str
- :return: A deferred that fires with a file-like object that gives
- access to the contents of the blob.
+ :return: A deferred that fires when the blob has been written to the
+ consumer.
:rtype: twisted.internet.defer.Deferred
"""
- def write_blob(user, blob_id, fd, namespace=''):
+ def write_blob(user, blob_id, producer, namespace=''):
"""
Write a blob to the backend storage.
@@ -49,9 +51,8 @@ class IBlobsBackend(Interface):
:type user: str
:param blob_id: The id of the blob.
:type blob_id: str
- :param fd: A file-like object into which the contents of the blob
- should be written.
- :type fd: file-like
+ :param producer: The object to read data from.
+ :type producer: twisted.internet.interfaces.IProducer provider
:param namespace: An optional namespace for the blob.
:type namespace: str
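
(Illustration, not part of the patch: how a caller of the refactored
interface is expected to look. `backend` stands for any IBlobsBackend
provider; the in-memory sink is an assumption, since read_blob only
requires an object with a write() method.)

    from io import BytesIO
    from twisted.internet import defer
    from twisted.web.client import FileBodyProducer

    @defer.inlineCallbacks
    def roundtrip(backend):
        # write: the caller wraps its data source in a body producer
        producer = FileBodyProducer(BytesIO(b'payload'))
        yield backend.write_blob('user', 'blob-1', producer)
        # read: the caller supplies the consumer that receives the
        # data; a BytesIO works because only write() is called on it
        sink = BytesIO()
        yield backend.read_blob('user', 'blob-1', sink)
        defer.returnValue(sink.getvalue())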
diff --git a/tests/benchmarks/test_blobs_fs_backend.py b/tests/benchmarks/test_blobs_fs_backend.py
index d3a663d2..bb5b9b88 100644
--- a/tests/benchmarks/test_blobs_fs_backend.py
+++ b/tests/benchmarks/test_blobs_fs_backend.py
@@ -2,7 +2,7 @@ import pytest
from io import BytesIO
from leap.soledad.server._blobs import FilesystemBlobsBackend
from twisted.internet import defer
-from twisted.web.test.test_web import DummyRequest
+from twisted.web.client import FileBodyProducer
def create_write_test(amount, size):
@@ -12,12 +12,11 @@ def create_write_test(amount, size):
def test(txbenchmark, payload, tmpdir):
backend = FilesystemBlobsBackend(blobs_path=tmpdir.strpath)
data = payload(size)
+ semaphore = defer.DeferredSemaphore(100)
deferreds = []
for i in xrange(amount):
- fd = BytesIO(data)
- request = DummyRequest([''])
- request.content = fd
- d = backend.write_blob('user', str(i), request)
+ producer = FileBodyProducer(BytesIO(data))
+ d = semaphore.run(backend.write_blob, 'user', str(i), producer)
deferreds.append(d)
yield txbenchmark(defer.gatherResults, deferreds)
@@ -30,6 +29,12 @@ test_blobs_fs_backend_write_100_100k = create_write_test(100, 100 * 1000)
test_blobs_fs_backend_write_1000_10k = create_write_test(1000, 10 * 1000)
+class DevNull(object):
+
+ def write(self, data):
+ pass
+
+
def create_read_test(amount, size):
@pytest.inlineCallbacks
@@ -39,22 +44,20 @@ def create_read_test(amount, size):
data = payload(size)
# first write blobs to the backend...
+ semaphore = defer.DeferredSemaphore(100)
deferreds = []
for i in xrange(amount):
- fd = BytesIO(data)
- request = DummyRequest([''])
- request.content = fd
- d = backend.write_blob('user', str(i), request)
+ producer = FileBodyProducer(BytesIO(data))
+ d = semaphore.run(backend.write_blob, 'user', str(i), producer)
deferreds.append(d)
yield defer.gatherResults(deferreds)
# ... then measure the read operation
deferreds = []
for i in xrange(amount):
- request = DummyRequest([''])
- d = request.notifyFinish()
+ consumer = DevNull()
+ d = semaphore.run(backend.read_blob, 'user', str(i), consumer)
deferreds.append(d)
- backend.read_blob('user', str(i), request)
yield txbenchmark(defer.gatherResults, deferreds)
return test
diff --git a/tests/blobs/test_fs_backend.py b/tests/blobs/test_fs_backend.py
index 5b3ff30a..fc3d649a 100644
--- a/tests/blobs/test_fs_backend.py
+++ b/tests/blobs/test_fs_backend.py
@@ -28,7 +28,6 @@ import mock
import os
import base64
import io
-import json
import pytest
@@ -64,14 +63,16 @@ class FilesystemBackendTestCase(unittest.TestCase):
@pytest.mark.usefixtures("method_tmpdir")
@mock.patch('leap.soledad.server._blobs.open')
- @mock.patch.object(_blobs.FilesystemBlobsBackend, '_get_path',
- Mock(return_value='path'))
+ @mock.patch('leap.soledad.server._blobs.FilesystemBlobsBackend._get_path')
@defer.inlineCallbacks
- def test_read_blob(self, open):
+ def test_read_blob(self, get_path, open):
+ get_path.return_value = 'path'
+ open.return_value = io.BytesIO('content')
backend = _blobs.FilesystemBlobsBackend(blobs_path=self.tempdir)
- yield backend.read_blob('user', 'blob_id')
- open.assert_called_once_with('path')
- backend._get_path.assert_called_once_with('user', 'blob_id', '')
+ consumer = Mock()
+ yield backend.read_blob('user', 'blob_id', consumer)
+ consumer.write.assert_called_with('content')
+ get_path.assert_called_once_with('user', 'blob_id', '')
@pytest.mark.usefixtures("method_tmpdir")
@mock.patch.object(os.path, 'isfile')
@@ -215,22 +216,32 @@ class FilesystemBackendTestCase(unittest.TestCase):
namespace='custom')
default = yield backend.list_blobs('user')
custom = yield backend.list_blobs('user', namespace='custom')
- self.assertEquals([], json.loads(default))
- self.assertEquals(['blob_id'], json.loads(custom))
+ self.assertEquals([], default)
+ self.assertEquals(['blob_id'], custom)
@pytest.mark.usefixtures("method_tmpdir")
@defer.inlineCallbacks
def test_count(self):
backend = _blobs.FilesystemBlobsBackend(blobs_path=self.tempdir)
content = 'blah'
- yield backend.write_blob('user', 'blob_id_1', io.BytesIO(content))
- yield backend.write_blob('user', 'blob_id_2', io.BytesIO(content))
- yield backend.write_blob('user', 'blob_id_3', io.BytesIO(content))
+
+ ids = range(5)
+
+ def _write(namespace=''):
+ producer = FileBodyProducer(io.BytesIO(content))
+ d = backend.write_blob('user', str(ids.pop()), producer,
+ namespace=namespace)
+ return d
+
+ yield _write()
+ yield _write()
+ yield _write()
+
count = yield backend.count('user')
self.assertEqual(3, count)
- yield backend.write_blob('user', 'blob_id_1', io.BytesIO(content),
- namespace='xfiles')
- yield backend.write_blob('user', 'blob_id_2', io.BytesIO(content),
- namespace='xfiles')
+
+ yield _write(namespace='xfiles')
+ yield _write(namespace='xfiles')
+
count = yield backend.count('user', namespace='xfiles')
self.assertEqual(2, count)
diff --git a/tests/server/test_incoming_server.py b/tests/server/test_incoming_server.py
index f278be74..23c0aa90 100644
--- a/tests/server/test_incoming_server.py
+++ b/tests/server/test_incoming_server.py
@@ -18,12 +18,13 @@
Integration tests for incoming API
"""
import pytest
+import mock
+import treq
from io import BytesIO
from uuid import uuid4
from twisted.web.server import Site
from twisted.internet import reactor
from twisted.internet import defer
-import treq
from leap.soledad.server._incoming import IncomingResource
from leap.soledad.server._blobs import BlobsServerState
@@ -82,9 +83,10 @@ class IncomingOnCouchServerTestCase(CouchDBTestCase):
yield treq.put(incoming_endpoint, BytesIO(content), persistent=False)
db = self.state.open_database(user_id)
- fd = yield db.read_blob(user_id, doc_id, namespace='MX')
+ consumer = mock.Mock()
+ yield db.read_blob(user_id, doc_id, consumer, namespace='MX')
flags = yield db.get_flags(user_id, doc_id, namespace='MX')
- data = fd.read()
+ data = consumer.write.call_args[0][0]
expected_preamble = formatter.preamble(content, doc_id)
expected_preamble = decode_preamble(expected_preamble, True)
written_preamble, written_content = data.split()