diff options
author | Victor Shyba <victor1984@riseup.net> | 2017-12-20 13:59:47 -0300 |
---|---|---|
committer | Victor Shyba <victor1984@riseup.net> | 2017-12-22 13:51:28 -0300 |
commit | a3d267e087c6b268f64f32f5f6e65e89b0c577df (patch) | |
tree | 924072278d51a4d281716b049ff93efe34bc76ab | |
parent | 1bce0175b98f94c784181b02a4e17d4c14c732c5 (diff) |
[feature] consume received stream in a thread
As it's blocking and Twisted already stored everything on a file.
-- Related: #8809
-rw-r--r-- | src/leap/soledad/server/_streaming_resource.py | 15 |
1 files changed, 11 insertions, 4 deletions
diff --git a/src/leap/soledad/server/_streaming_resource.py b/src/leap/soledad/server/_streaming_resource.py index bb5677ca..3cf798e3 100644 --- a/src/leap/soledad/server/_streaming_resource.py +++ b/src/leap/soledad/server/_streaming_resource.py @@ -25,7 +25,7 @@ import base64 from zope.interface import implementer from twisted.internet.interfaces import IPushProducer -from twisted.internet import task, defer +from twisted.internet import task, defer, threads from twisted.web.server import NOT_DONE_YET from twisted.web.resource import Resource @@ -75,11 +75,19 @@ class StreamingResource(Resource): # TODO: at this point, Twisted wrote the request to a temporary file, # so it's a disk->disk operation. This has to be improved if benchmark # shows its worth. + args = (user, namespace, request) + d = threads.deferToThread(self._consume_stream, *args) + d.addCallback(lambda _: request.finish()) + + return NOT_DONE_YET + + def _consume_stream(self, user, namespace, request): + chunk_size = 2**14 content = request.content incoming_list = json.loads(content.readline()) - # TODO: NEEDS SANITIZING for (blob_id, size) in incoming_list: db = self._handler + # TODO: NEEDS SANITIZING path = db._get_path(user, blob_id, namespace) try: mkdir_p(os.path.split(path)[0]) @@ -88,11 +96,10 @@ class StreamingResource(Resource): with open(path, 'wb') as blob_fd: consumed = 0 while consumed < size: - read_size = min(size - consumed, 2**16) + read_size = min(size - consumed, chunk_size) data = content.read(read_size) consumed += read_size blob_fd.write(data) - return '' def _startDownstream(self, user, namespace, request): raw_content = request.content.read() |