summaryrefslogtreecommitdiff
path: root/src/leap/soledad/client/_db/blobs/__init__.py
diff options
context:
space:
mode:
Diffstat (limited to 'src/leap/soledad/client/_db/blobs/__init__.py')
-rw-r--r--src/leap/soledad/client/_db/blobs/__init__.py19
1 files changed, 6 insertions, 13 deletions
diff --git a/src/leap/soledad/client/_db/blobs/__init__.py b/src/leap/soledad/client/_db/blobs/__init__.py
index a0b48035..8d469760 100644
--- a/src/leap/soledad/client/_db/blobs/__init__.py
+++ b/src/leap/soledad/client/_db/blobs/__init__.py
@@ -50,6 +50,7 @@ from .sql import SyncStatus
from .sql import Priority
from .sql import SQLiteBlobBackend
from .sync import BlobsSynchronizer
+from .upstream_producer import BlobsUpstreamProducer
from .errors import (
BlobAlreadyExistsError, MaximumRetriesError,
RetriableTransferError, BlobNotFoundError, InvalidFlagsError)
@@ -498,25 +499,17 @@ class BlobManager(BlobsSynchronizer):
@defer.inlineCallbacks
def _upstream(self, blobs_id_list, namespace=''):
+ local, secret = self.local, self.secret
uri = urljoin(self.remote_stream, self.user)
params = {'namespace': namespace} if namespace else None
sizes = yield self.local.get_size_list(blobs_id_list, namespace)
convert = get_unarmored_ciphertext_size
- sizes = map(lambda (x, y): (x, convert(y)), sizes)
- data = BytesIO() # TODO: stream from db
- data.write(json.dumps(sizes) + '\n')
- for blob_id in blobs_id_list:
- blob_fd = yield self.local.get(blob_id, namespace=namespace)
- doc_info = DocInfo(blob_id, FIXED_REV)
- crypter = BlobEncryptor(doc_info, blob_fd, secret=self.secret,
- armor=False)
- fd = yield crypter.encrypt()
- data.write(fd.read())
- data.seek(0)
+ sizes = map(lambda (blob_id, size): (blob_id, convert(size)), sizes)
+ producer = BlobsUpstreamProducer(local, sizes, namespace, secret)
params = {'namespace': namespace} if namespace else {}
params['direction'] = 'upload'
- response = yield self._client.post(uri, data=data, params=params)
- check_http_status(response.code, blob_id)
+ response = yield self._client.post(uri, data=producer, params=params)
+ check_http_status(response.code, 'stream')
logger.info("Finished stream up: %s" % (blobs_id_list,))
@defer.inlineCallbacks