summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorVictor Shyba <victor1984@riseup.net>2017-11-27 21:08:12 -0300
committerVictor Shyba <victor1984@riseup.net>2017-12-01 01:38:08 -0300
commit91f6ecf09ebb03f584ceabb85effb45b5ef08f3e (patch)
tree4f2efb49143bd7c50055f8f74798832fa5e46fba
parentc3d079de4675b0fceca130ed3c6b8890ec28d873 (diff)
[feature] stream blobs from a CooperativeTask
Adds a CooperativeTask to stream blobs to client as the transport is able to handle it, pausing and resuming as necessary. -- Related: #8809
-rw-r--r--src/leap/soledad/server/_streaming_resource.py50
1 files changed, 44 insertions, 6 deletions
diff --git a/src/leap/soledad/server/_streaming_resource.py b/src/leap/soledad/server/_streaming_resource.py
index 05f2bab6..f96220b0 100644
--- a/src/leap/soledad/server/_streaming_resource.py
+++ b/src/leap/soledad/server/_streaming_resource.py
@@ -22,6 +22,9 @@ A twisted resource that serves download as a single stream of multiple blobs.
import json
import struct
+from zope.interface import implementer
+from twisted.internet.interfaces import IPushProducer
+from twisted.internet import task
from twisted.web.server import NOT_DONE_YET
from twisted.web.resource import Resource
@@ -59,17 +62,52 @@ class StreamingResource(Resource):
db = self._handler
raw_content = request.content.read()
blob_ids = json.loads(raw_content)
+ paths = []
for blob_id in blob_ids:
path = db._get_path(user, blob_id, namespace)
size = db.get_blob_size(user, blob_id, namespace)
- request.write(SIZE_PACKER.pack(size))
+ paths.append((blob_id, path, size))
+ DownstreamProducer(request, paths).start()
+ return NOT_DONE_YET
+
+
+@implementer(IPushProducer)
+class DownstreamProducer(object):
+ chunk_size = 2**14
+
+ def __init__(self, request, paths):
+ self.request = request
+ self.paths = paths
+
+ def start(self):
+ iterator = self._gen_data()
+ self.task = task.cooperate(iterator)
+ self.request.registerProducer(self, streaming=True)
+
+ def resumeProducing(self):
+ return self.task.resume()
+
+ def pauseProducing(self):
+ return self.task.pause()
+
+ def _gen_data(self):
+ request, paths = self.request, self.paths
+ while paths:
+ blob_id, path, size = paths.pop(0)
+ request.write(SIZE_PACKER.pack(size)) # sends file size
with open(path, 'rb') as blob_fd:
- # TODO: use a producer
blob_fd.seek(-16, 2)
- request.write(blob_fd.read()) # sends tag
+ request.write(blob_fd.read()) # sends AES-GCM tag
blob_fd.seek(0)
request.write(' ')
- request.write(blob_fd.read())
-
+ data = blob_fd.read(self.chunk_size)
+ while data:
+ yield
+ request.write(data)
+ data = blob_fd.read(self.chunk_size)
+ request.unregisterProducer()
request.finish()
- return NOT_DONE_YET
+
+ def stopProducing(self):
+ self.request = None
+ return self.task.stop()