From 56a4ffd94a529722d50367ada38c1bcff64446fe Mon Sep 17 00:00:00 2001 From: Victor Shyba Date: Thu, 14 Dec 2017 14:33:21 -0300 Subject: [feature] add a resource for streaming -- Related: #8809 --- src/leap/soledad/server/_streaming_resource.py | 39 +++++++++++++++++++++++++- 1 file changed, 38 insertions(+), 1 deletion(-) (limited to 'src/leap/soledad/server') diff --git a/src/leap/soledad/server/_streaming_resource.py b/src/leap/soledad/server/_streaming_resource.py index 59bb383e..bb5677ca 100644 --- a/src/leap/soledad/server/_streaming_resource.py +++ b/src/leap/soledad/server/_streaming_resource.py @@ -19,6 +19,7 @@ A twisted resource that serves download as a single stream of multiple blobs. -> POST .../uuid/ DATA: [blob_id, blob_id2, ..., blob_idn] <- [(size(blob_id), content(blob_id)) for blob_id in DATA] (as a binary stream) """ +import os import json import base64 @@ -28,6 +29,7 @@ from twisted.internet import task, defer from twisted.web.server import NOT_DONE_YET from twisted.web.resource import Resource +from leap.common.files import mkdir_p from leap.soledad.common.log import getLogger from . import interfaces from ._blobs import FilesystemBlobsBackend @@ -58,10 +60,45 @@ class StreamingResource(Resource): def render_POST(self, request): user = request.postpath[0] namespace = request.args.get('namespace', ['default'])[0] - db = self._handler + direction = request.args.get('direction', ['download'])[0] + if direction == 'download': + return self._startDownstream(user, namespace, request) + elif direction == 'upload': + return self._startUpstream(user, namespace, request) + logger.error("Invalid direction value: %s - %s" % (user, direction)) + request.setResponseCode(500) + request.write('error, supported direction values are download/upload') + request.finish() + return '' + + def _startUpstream(self, user, namespace, request): + # TODO: at this point, Twisted wrote the request to a temporary file, + # so it's a disk->disk operation. This has to be improved if benchmark + # shows its worth. + content = request.content + incoming_list = json.loads(content.readline()) + # TODO: NEEDS SANITIZING + for (blob_id, size) in incoming_list: + db = self._handler + path = db._get_path(user, blob_id, namespace) + try: + mkdir_p(os.path.split(path)[0]) + except OSError as e: + logger.warn("Got exception trying to create directory: %r" % e) + with open(path, 'wb') as blob_fd: + consumed = 0 + while consumed < size: + read_size = min(size - consumed, 2**16) + data = content.read(read_size) + consumed += read_size + blob_fd.write(data) + return '' + + def _startDownstream(self, user, namespace, request): raw_content = request.content.read() blob_ids = json.loads(raw_content) deferreds = [] + db = self._handler for blob_id in blob_ids: def _get_blob_info(blob_id, path): -- cgit v1.2.3