From 91f6ecf09ebb03f584ceabb85effb45b5ef08f3e Mon Sep 17 00:00:00 2001 From: Victor Shyba Date: Mon, 27 Nov 2017 21:08:12 -0300 Subject: [feature] stream blobs from a CooperativeTask Adds a CooperativeTask to stream blobs to client as the transport is able to handle it, pausing and resuming as necessary. -- Related: #8809 --- src/leap/soledad/server/_streaming_resource.py | 50 ++++++++++++++++++++++---- 1 file changed, 44 insertions(+), 6 deletions(-) diff --git a/src/leap/soledad/server/_streaming_resource.py b/src/leap/soledad/server/_streaming_resource.py index 05f2bab6..f96220b0 100644 --- a/src/leap/soledad/server/_streaming_resource.py +++ b/src/leap/soledad/server/_streaming_resource.py @@ -22,6 +22,9 @@ A twisted resource that serves download as a single stream of multiple blobs. import json import struct +from zope.interface import implementer +from twisted.internet.interfaces import IPushProducer +from twisted.internet import task from twisted.web.server import NOT_DONE_YET from twisted.web.resource import Resource @@ -59,17 +62,52 @@ class StreamingResource(Resource): db = self._handler raw_content = request.content.read() blob_ids = json.loads(raw_content) + paths = [] for blob_id in blob_ids: path = db._get_path(user, blob_id, namespace) size = db.get_blob_size(user, blob_id, namespace) - request.write(SIZE_PACKER.pack(size)) + paths.append((blob_id, path, size)) + DownstreamProducer(request, paths).start() + return NOT_DONE_YET + + +@implementer(IPushProducer) +class DownstreamProducer(object): + chunk_size = 2**14 + + def __init__(self, request, paths): + self.request = request + self.paths = paths + + def start(self): + iterator = self._gen_data() + self.task = task.cooperate(iterator) + self.request.registerProducer(self, streaming=True) + + def resumeProducing(self): + return self.task.resume() + + def pauseProducing(self): + return self.task.pause() + + def _gen_data(self): + request, paths = self.request, self.paths + while paths: + blob_id, path, size = paths.pop(0) + request.write(SIZE_PACKER.pack(size)) # sends file size with open(path, 'rb') as blob_fd: - # TODO: use a producer blob_fd.seek(-16, 2) - request.write(blob_fd.read()) # sends tag + request.write(blob_fd.read()) # sends AES-GCM tag blob_fd.seek(0) request.write(' ') - request.write(blob_fd.read()) - + data = blob_fd.read(self.chunk_size) + while data: + yield + request.write(data) + data = blob_fd.read(self.chunk_size) + request.unregisterProducer() request.finish() - return NOT_DONE_YET + + def stopProducing(self): + self.request = None + return self.task.stop() -- cgit v1.2.3