summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/leap/soledad/server/_streaming_resource.py39
1 files changed, 38 insertions, 1 deletions
diff --git a/src/leap/soledad/server/_streaming_resource.py b/src/leap/soledad/server/_streaming_resource.py
index 59bb383e..bb5677ca 100644
--- a/src/leap/soledad/server/_streaming_resource.py
+++ b/src/leap/soledad/server/_streaming_resource.py
@@ -19,6 +19,7 @@ A twisted resource that serves download as a single stream of multiple blobs.
-> POST .../uuid/ DATA: [blob_id, blob_id2, ..., blob_idn]
<- [(size(blob_id), content(blob_id)) for blob_id in DATA] (as a binary stream)
"""
+import os
import json
import base64
@@ -28,6 +29,7 @@ from twisted.internet import task, defer
from twisted.web.server import NOT_DONE_YET
from twisted.web.resource import Resource
+from leap.common.files import mkdir_p
from leap.soledad.common.log import getLogger
from . import interfaces
from ._blobs import FilesystemBlobsBackend
@@ -58,10 +60,45 @@ class StreamingResource(Resource):
def render_POST(self, request):
user = request.postpath[0]
namespace = request.args.get('namespace', ['default'])[0]
- db = self._handler
+ direction = request.args.get('direction', ['download'])[0]
+ if direction == 'download':
+ return self._startDownstream(user, namespace, request)
+ elif direction == 'upload':
+ return self._startUpstream(user, namespace, request)
+ logger.error("Invalid direction value: %s - %s" % (user, direction))
+ request.setResponseCode(500)
+ request.write('error, supported direction values are download/upload')
+ request.finish()
+ return ''
+
+ def _startUpstream(self, user, namespace, request):
+ # TODO: at this point, Twisted wrote the request to a temporary file,
+ # so it's a disk->disk operation. This has to be improved if benchmark
+ # shows its worth.
+ content = request.content
+ incoming_list = json.loads(content.readline())
+ # TODO: NEEDS SANITIZING
+ for (blob_id, size) in incoming_list:
+ db = self._handler
+ path = db._get_path(user, blob_id, namespace)
+ try:
+ mkdir_p(os.path.split(path)[0])
+ except OSError as e:
+ logger.warn("Got exception trying to create directory: %r" % e)
+ with open(path, 'wb') as blob_fd:
+ consumed = 0
+ while consumed < size:
+ read_size = min(size - consumed, 2**16)
+ data = content.read(read_size)
+ consumed += read_size
+ blob_fd.write(data)
+ return ''
+
+ def _startDownstream(self, user, namespace, request):
raw_content = request.content.read()
blob_ids = json.loads(raw_content)
deferreds = []
+ db = self._handler
for blob_id in blob_ids:
def _get_blob_info(blob_id, path):