| author | Victor Shyba <victor1984@riseup.net> | 2017-11-27 21:08:12 -0300 |
|---|---|---|
| committer | Victor Shyba <victor1984@riseup.net> | 2017-12-01 01:38:08 -0300 |
| commit | 91f6ecf09ebb03f584ceabb85effb45b5ef08f3e | (patch) |
| tree | 4f2efb49143bd7c50055f8f74798832fa5e46fba | /src |
| parent | c3d079de4675b0fceca130ed3c6b8890ec28d873 | (diff) |
[feature] stream blobs from a CooperativeTask
Adds a CooperativeTask that streams blobs to the client as fast as the
transport can handle them, pausing and resuming the stream as necessary.
-- Related: #8809
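
For context, the pattern behind this change is Twisted's `task.cooperate()`, which steps a generator between reactor events and exposes pause/resume/stop, combined with `IPushProducer` registration so the transport can throttle the writer. Below is a minimal, self-contained sketch of that pattern; the `EchoStreamer` name and its chunk source are illustrative and not part of this changeset.

```python
# Minimal sketch of the cooperate + IPushProducer pattern this commit uses.
# Only task.cooperate(), registerProducer() and the IPushProducer methods
# mirror the real change; EchoStreamer and its chunk list are made up here.
from zope.interface import implementer
from twisted.internet import task
from twisted.internet.interfaces import IPushProducer


@implementer(IPushProducer)
class EchoStreamer(object):

    def __init__(self, consumer, chunks):
        self._consumer = consumer  # any IConsumer, e.g. an HTTP request
        self._chunks = chunks      # iterable of bytes to write

    def start(self):
        # cooperate() iterates the generator one step at a time between
        # reactor events; the CooperativeTask it returns can be paused,
        # resumed and stopped, which is exactly what IPushProducer needs.
        self._task = task.cooperate(self._write_chunks())
        self._consumer.registerProducer(self, streaming=True)

    def _write_chunks(self):
        for chunk in self._chunks:
            yield                        # hand control back to the reactor
            self._consumer.write(chunk)
        self._consumer.unregisterProducer()

    # IPushProducer: the transport calls these as its buffers fill and drain.
    def pauseProducing(self):
        self._task.pause()

    def resumeProducing(self):
        self._task.resume()

    def stopProducing(self):
        self._task.stop()
```

When the transport's write buffer fills, it calls `pauseProducing()`, the cooperative task stops iterating the generator, and no more data is read or written until `resumeProducing()` is called.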
Diffstat (limited to 'src')
| mode | file | lines |
|---|---|---|
| -rw-r--r-- | src/leap/soledad/server/_streaming_resource.py | 50 |
1 file changed, 44 insertions(+), 6 deletions(-)
```diff
diff --git a/src/leap/soledad/server/_streaming_resource.py b/src/leap/soledad/server/_streaming_resource.py
index 05f2bab6..f96220b0 100644
--- a/src/leap/soledad/server/_streaming_resource.py
+++ b/src/leap/soledad/server/_streaming_resource.py
@@ -22,6 +22,9 @@ A twisted resource that serves download as a single stream of multiple blobs.
 import json
 import struct
 
+from zope.interface import implementer
+from twisted.internet.interfaces import IPushProducer
+from twisted.internet import task
 from twisted.web.server import NOT_DONE_YET
 from twisted.web.resource import Resource
 
@@ -59,17 +62,52 @@ class StreamingResource(Resource):
         db = self._handler
         raw_content = request.content.read()
         blob_ids = json.loads(raw_content)
+        paths = []
         for blob_id in blob_ids:
             path = db._get_path(user, blob_id, namespace)
             size = db.get_blob_size(user, blob_id, namespace)
-            request.write(SIZE_PACKER.pack(size))
+            paths.append((blob_id, path, size))
+        DownstreamProducer(request, paths).start()
+        return NOT_DONE_YET
+
+
+@implementer(IPushProducer)
+class DownstreamProducer(object):
+    chunk_size = 2**14
+
+    def __init__(self, request, paths):
+        self.request = request
+        self.paths = paths
+
+    def start(self):
+        iterator = self._gen_data()
+        self.task = task.cooperate(iterator)
+        self.request.registerProducer(self, streaming=True)
+
+    def resumeProducing(self):
+        return self.task.resume()
+
+    def pauseProducing(self):
+        return self.task.pause()
+
+    def _gen_data(self):
+        request, paths = self.request, self.paths
+        while paths:
+            blob_id, path, size = paths.pop(0)
+            request.write(SIZE_PACKER.pack(size))  # sends file size
             with open(path, 'rb') as blob_fd:
-                # TODO: use a producer
                 blob_fd.seek(-16, 2)
-                request.write(blob_fd.read())  # sends tag
+                request.write(blob_fd.read())  # sends AES-GCM tag
                 blob_fd.seek(0)
                 request.write(' ')
-                request.write(blob_fd.read())
-
+                data = blob_fd.read(self.chunk_size)
+                while data:
+                    yield
+                    request.write(data)
+                    data = blob_fd.read(self.chunk_size)
+        request.unregisterProducer()
         request.finish()
-        return NOT_DONE_YET
+
+    def stopProducing(self):
+        self.request = None
+        return self.task.stop()
```
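As a reading aid for the framing that `_gen_data()` emits per blob (packed size, then the 16-byte AES-GCM tag taken from the end of the file, a single space, then the whole blob in 16 KiB chunks), here is a hedged client-side sketch. `SIZE_PACKER` is defined elsewhere in the file; it is assumed here to be a 4-byte little-endian unsigned int, and `size` is assumed to be the full on-disk blob length, neither of which is shown in this hunk.

```python
# Illustrative reader for the per-blob framing written by _gen_data() above.
# Assumptions not visible in this diff: SIZE_PACKER == struct.Struct('<I')
# and `size` covers the full on-disk blob (tag included).
import struct

SIZE_PACKER = struct.Struct('<I')  # assumed packing format
TAG_LENGTH = 16                    # AES-GCM tag size in bytes


def read_one_blob(stream):
    """Read a single (size, tag, blob) frame from a file-like byte stream."""
    size, = SIZE_PACKER.unpack(stream.read(SIZE_PACKER.size))
    tag = stream.read(TAG_LENGTH)   # tag is sent ahead of the blob data
    assert stream.read(1) == b' '   # single-space separator
    blob = stream.read(size)        # the blob itself, streamed in chunks
    return size, tag, blob
```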
