From a3d267e087c6b268f64f32f5f6e65e89b0c577df Mon Sep 17 00:00:00 2001 From: Victor Shyba Date: Wed, 20 Dec 2017 13:59:47 -0300 Subject: [feature] consume received stream in a thread As it's blocking and Twisted already stored everything on a file. -- Related: #8809 --- src/leap/soledad/server/_streaming_resource.py | 15 +++++++++++---- 1 file changed, 11 insertions(+), 4 deletions(-) diff --git a/src/leap/soledad/server/_streaming_resource.py b/src/leap/soledad/server/_streaming_resource.py index bb5677ca..3cf798e3 100644 --- a/src/leap/soledad/server/_streaming_resource.py +++ b/src/leap/soledad/server/_streaming_resource.py @@ -25,7 +25,7 @@ import base64 from zope.interface import implementer from twisted.internet.interfaces import IPushProducer -from twisted.internet import task, defer +from twisted.internet import task, defer, threads from twisted.web.server import NOT_DONE_YET from twisted.web.resource import Resource @@ -75,11 +75,19 @@ class StreamingResource(Resource): # TODO: at this point, Twisted wrote the request to a temporary file, # so it's a disk->disk operation. This has to be improved if benchmark # shows its worth. + args = (user, namespace, request) + d = threads.deferToThread(self._consume_stream, *args) + d.addCallback(lambda _: request.finish()) + + return NOT_DONE_YET + + def _consume_stream(self, user, namespace, request): + chunk_size = 2**14 content = request.content incoming_list = json.loads(content.readline()) - # TODO: NEEDS SANITIZING for (blob_id, size) in incoming_list: db = self._handler + # TODO: NEEDS SANITIZING path = db._get_path(user, blob_id, namespace) try: mkdir_p(os.path.split(path)[0]) @@ -88,11 +96,10 @@ class StreamingResource(Resource): with open(path, 'wb') as blob_fd: consumed = 0 while consumed < size: - read_size = min(size - consumed, 2**16) + read_size = min(size - consumed, chunk_size) data = content.read(read_size) consumed += read_size blob_fd.write(data) - return '' def _startDownstream(self, user, namespace, request): raw_content = request.content.read() -- cgit v1.2.3