summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authordrebs <drebs@leap.se>2017-12-12 20:11:13 -0200
committerdrebs <drebs@leap.se>2017-12-13 13:52:30 -0200
commit383a19aaf87bd8e2665112c1350627140958eedf (patch)
tree402ba69a04158e6293ca3533d82b5515bba3c433 /src
parentcc480a2e6c11856759a5297e94a0ff128d0a1593 (diff)
[refactor] use producer/consumer on write/read_blob respectivelly
Diffstat (limited to 'src')
-rw-r--r--src/leap/soledad/server/_blobs.py135
-rw-r--r--src/leap/soledad/server/_incoming.py5
-rw-r--r--src/leap/soledad/server/interfaces.py15
3 files changed, 78 insertions, 77 deletions
diff --git a/src/leap/soledad/server/_blobs.py b/src/leap/soledad/server/_blobs.py
index 554fe5ac..70b55264 100644
--- a/src/leap/soledad/server/_blobs.py
+++ b/src/leap/soledad/server/_blobs.py
@@ -29,9 +29,6 @@ import base64
import json
import re
-from twisted.python.compat import intToBytes
-from twisted.web import http
-from twisted.web import static
from twisted.web import resource
from twisted.web.client import FileBodyProducer
from twisted.web.server import NOT_DONE_YET
@@ -93,15 +90,14 @@ class FilesystemBlobsBackend(object):
def __touch(self, path):
open(path, 'a')
- def read_blob(self, user, blob_id, namespace=''):
+ @defer.inlineCallbacks
+ def read_blob(self, user, blob_id, consumer, namespace=''):
logger.info('reading blob: %s - %s@%s' % (user, blob_id, namespace))
- try:
- path = self._get_path(user, blob_id, namespace)
- except Exception as e:
- return defer.fail(e)
+ path = self._get_path(user, blob_id, namespace)
logger.debug('blob path: %s' % path)
- fd = open(path)
- return defer.succeed(fd)
+ with open(path) as fd:
+ producer = FileBodyProducer(fd)
+ yield producer.startProducing(consumer)
def get_flags(self, user, blob_id, namespace=''):
try:
@@ -132,7 +128,7 @@ class FilesystemBlobsBackend(object):
return defer.succeed(None)
@defer.inlineCallbacks
- def write_blob(self, user, blob_id, fd, namespace=''):
+ def write_blob(self, user, blob_id, producer, namespace=''):
yield self.semaphore.acquire()
path = self._get_path(user, blob_id, namespace)
try:
@@ -145,9 +141,8 @@ class FilesystemBlobsBackend(object):
if used > self.quota:
raise QuotaExceeded
logger.info('writing blob: %s - %s' % (user, blob_id))
- fbp = FileBodyProducer(fd)
with open(path, 'wb') as blobfile:
- yield fbp.startProducing(blobfile)
+ yield producer.startProducing(blobfile)
yield self.semaphore.release()
def delete_blob(self, user, blob_id, namespace=''):
@@ -292,23 +287,6 @@ class ImproperlyConfiguredException(Exception):
pass
-class BlobFile(resource.Resource):
-
- def __init__(self, fd):
- self.fd = fd
- self.fd.seek(0, 2)
- self.size = self.fd.tell()
- self.fd.seek(0)
-
- def render_GET(self, request):
- request.setHeader(b'content-length', intToBytes(self.size))
- request.setHeader(b'content-type', 'application/octet-stream')
- request.setResponseCode(http.OK)
- producer = static.NoRangeStaticProducer(request, self.fd)
- producer.start()
- return NOT_DONE_YET
-
-
def _catchBlobNotFound(failure, request, user, blob_id):
failure.trap(BlobNotFound)
logger.error("Error 404: Blob %s does not exist for user %s"
@@ -370,50 +348,70 @@ class BlobsResource(resource.Resource):
# TODO double check credentials, we can have then
# under request.
- def render_GET(self, request):
- logger.info("http get: %s" % request.path)
- user, blob_id, namespace = self._validate(request)
- if not blob_id and request.args.get('only_count', [False])[0]:
- d = self._handler.count(user, namespace)
- d.addCallback(lambda count: json.dumps({"count": count}))
- d.addCallback(lambda count: request.write(count))
- d.addCallback(lambda _: request.finish())
- return NOT_DONE_YET
- elif not blob_id:
- order = request.args.get('order_by', [None])[0]
- filter_flag = request.args.get('filter_flag', [False])[0]
- deleted = request.args.get('deleted', [False])[0]
- d = self._handler.list_blobs(user, namespace,
- order_by=order, deleted=deleted,
- filter_flag=filter_flag)
- d.addCallback(lambda blobs: json.dumps(blobs))
- d.addCallback(lambda blobs: request.write(blobs))
- d.addCallback(lambda _: request.finish())
- return NOT_DONE_YET
+ def _only_count(self, request, user, namespace):
+ d = self._handler.count(user, namespace)
+ d.addCallback(lambda count: json.dumps({"count": count}))
+ d.addCallback(lambda count: request.write(count))
+ d.addCallback(lambda _: request.finish())
+ return NOT_DONE_YET
- only_flags = request.args.get('only_flags', [False])[0]
- if only_flags:
- d = self._handler.get_flags(user, blob_id, namespace)
- d.addCallback(lambda flags: json.dumps(flags))
- d.addCallback(lambda flags: request.write(flags))
- d.addCallback(lambda _: request.finish())
- d.addErrback(_catchBlobNotFound, request, user, blob_id)
- d.addErrback(_catchAllErrors, request)
- return NOT_DONE_YET
+ def _list(self, request, user, namespace):
+ order = request.args.get('order_by', [None])[0]
+ filter_flag = request.args.get('filter_flag', [False])[0]
+ deleted = request.args.get('deleted', [False])[0]
+ d = self._handler.list_blobs(user, namespace,
+ order_by=order, deleted=deleted,
+ filter_flag=filter_flag)
+ d.addCallback(lambda blobs: json.dumps(blobs))
+ d.addCallback(lambda blobs: request.write(blobs))
+ d.addCallback(lambda _: request.finish())
+ return NOT_DONE_YET
+
+ def _only_flags(self, request, user, blob_id, namespace):
+ d = self._handler.get_flags(user, blob_id, namespace)
+ d.addCallback(lambda flags: json.dumps(flags))
+ d.addCallback(lambda flags: request.write(flags))
+ d.addCallback(lambda _: request.finish())
+ d.addErrback(_catchBlobNotFound, request, user, blob_id)
+ d.addErrback(_catchAllErrors, request)
+ return NOT_DONE_YET
+
+ def _get_blob(self, request, user, blob_id, namespace):
+
+ def _set_tag_header(tag):
+ request.responseHeaders.setRawHeaders('Tag', [tag])
+
+ def _read_blob(_):
+ handler = self._handler
+ consumer = request
+ d = handler.read_blob(user, blob_id, consumer, namespace=namespace)
+ return d
d = self._handler.get_tag(user, blob_id, namespace)
- d.addCallback(
- lambda tag: request.responseHeaders.setRawHeaders(
- 'Tag', [tag]))
- d.addCallback(lambda _: self._handler.read_blob(user, blob_id,
- namespace=namespace))
- d.addCallback(lambda fd: BlobFile(fd))
- d.addCallback(lambda res: res.render_GET(request))
+ d.addCallback(_set_tag_header)
+ d.addCallback(_read_blob)
+ d.addCallback(lambda _: request.finish())
d.addErrback(_catchBlobNotFound, request, user, blob_id)
d.addErrback(_catchAllErrors, request, finishRequest=True)
return NOT_DONE_YET
+ def render_GET(self, request):
+ logger.info("http get: %s" % request.path)
+ user, blob_id, namespace = self._validate(request)
+ only_flags = request.args.get('only_flags', [False])[0]
+
+ if not blob_id and request.args.get('only_count', [False])[0]:
+ return self._only_count(request, user, namespace)
+
+ if not blob_id:
+ return self._list(request, user, namespace)
+
+ if only_flags:
+ return self._only_flags(request, user, blob_id, namespace)
+
+ return self._get_blob(request, user, blob_id, namespace)
+
def render_DELETE(self, request):
logger.info("http put: %s" % request.path)
user, blob_id, namespace = self._validate(request)
@@ -426,8 +424,9 @@ class BlobsResource(resource.Resource):
def render_PUT(self, request):
logger.info("http put: %s" % request.path)
user, blob_id, namespace = self._validate(request)
- fd = request.content
- d = self._handler.write_blob(user, blob_id, fd, namespace=namespace)
+ producer = FileBodyProducer(request.content)
+ handler = self._handler
+ d = handler.write_blob(user, blob_id, producer, namespace=namespace)
d.addCallback(lambda _: request.finish())
d.addErrback(_catchBlobExists, request, user, blob_id)
d.addErrback(_catchQuotaExceeded, request, user)
diff --git a/src/leap/soledad/server/_incoming.py b/src/leap/soledad/server/_incoming.py
index 362039af..071a94f6 100644
--- a/src/leap/soledad/server/_incoming.py
+++ b/src/leap/soledad/server/_incoming.py
@@ -22,6 +22,7 @@ import base64
from io import BytesIO
from twisted.web.server import NOT_DONE_YET
from twisted.web.resource import Resource
+from twisted.web.client import FileBodyProducer
from leap.soledad.common.blobs import Flags
from leap.soledad.common.blobs import preamble
@@ -100,8 +101,8 @@ class IncomingResource(Resource):
request.write('Quota Exceeded!')
request.finish()
- fd = request.content
- d = db.write_blob(user, blob_id, fd, namespace='MX')
+ producer = FileBodyProducer(request.content)
+ d = db.write_blob(user, blob_id, producer, namespace='MX')
flags = [Flags.PENDING]
d.addCallback(lambda _: db.set_flags(user, blob_id, flags,
namespace='MX'))
diff --git a/src/leap/soledad/server/interfaces.py b/src/leap/soledad/server/interfaces.py
index d7a4aa70..624c8ff6 100644
--- a/src/leap/soledad/server/interfaces.py
+++ b/src/leap/soledad/server/interfaces.py
@@ -25,7 +25,7 @@ class IBlobsBackend(Interface):
An interface for a backend that can store blobs.
"""
- def read_blob(user, blob_id, namespace=''):
+ def read_blob(user, blob_id, consumer, namespace=''):
"""
Read a blob from the backend storage.
@@ -33,15 +33,17 @@ class IBlobsBackend(Interface):
:type user: str
:param blob_id: The id of the blob.
:type blob_id: str
+ :param consumer: The object to write data to.
+ :type consumer: twisted.internet.interfaces.IConsumer provider
:param namespace: An optional namespace for the blob.
:type namespace: str
- :return: A deferred that fires with a file-like object that gives
- access to the contents of the blob.
+ :return: A deferred that fires when the blob has been written to the
+ consumer.
:rtype: twisted.internet.defer.Deferred
"""
- def write_blob(user, blob_id, fd, namespace=''):
+ def write_blob(user, blob_id, producer, namespace=''):
"""
Write a blob to the backend storage.
@@ -49,9 +51,8 @@ class IBlobsBackend(Interface):
:type user: str
:param blob_id: The id of the blob.
:type blob_id: str
- :param fd: A file-like object into which the contents of the blob
- should be written.
- :type fd: file-like
+ :param producer: The object to read data from.
+ :type producer: twisted.internet.interfaces.IProducer provider
:param namespace: An optional namespace for the blob.
:type namespace: str