summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/leap/soledad/server/_streaming_resource.py15
1 files changed, 11 insertions, 4 deletions
diff --git a/src/leap/soledad/server/_streaming_resource.py b/src/leap/soledad/server/_streaming_resource.py
index bb5677ca..3cf798e3 100644
--- a/src/leap/soledad/server/_streaming_resource.py
+++ b/src/leap/soledad/server/_streaming_resource.py
@@ -25,7 +25,7 @@ import base64
from zope.interface import implementer
from twisted.internet.interfaces import IPushProducer
-from twisted.internet import task, defer
+from twisted.internet import task, defer, threads
from twisted.web.server import NOT_DONE_YET
from twisted.web.resource import Resource
@@ -75,11 +75,19 @@ class StreamingResource(Resource):
# TODO: at this point, Twisted wrote the request to a temporary file,
# so it's a disk->disk operation. This has to be improved if benchmark
# shows its worth.
+ args = (user, namespace, request)
+ d = threads.deferToThread(self._consume_stream, *args)
+ d.addCallback(lambda _: request.finish())
+
+ return NOT_DONE_YET
+
+ def _consume_stream(self, user, namespace, request):
+ chunk_size = 2**14
content = request.content
incoming_list = json.loads(content.readline())
- # TODO: NEEDS SANITIZING
for (blob_id, size) in incoming_list:
db = self._handler
+ # TODO: NEEDS SANITIZING
path = db._get_path(user, blob_id, namespace)
try:
mkdir_p(os.path.split(path)[0])
@@ -88,11 +96,10 @@ class StreamingResource(Resource):
with open(path, 'wb') as blob_fd:
consumed = 0
while consumed < size:
- read_size = min(size - consumed, 2**16)
+ read_size = min(size - consumed, chunk_size)
data = content.read(read_size)
consumed += read_size
blob_fd.write(data)
- return ''
def _startDownstream(self, user, namespace, request):
raw_content = request.content.read()