From 5ca40e181f445393260641093e66d9b42e455f55 Mon Sep 17 00:00:00 2001 From: Victor Shyba Date: Thu, 14 Dec 2017 14:30:52 -0300 Subject: [feature] client implementation for upstream -- Resolves #8773 --- src/leap/soledad/client/_db/blobs/__init__.py | 24 ++++++++++++++++++++++++ 1 file changed, 24 insertions(+) diff --git a/src/leap/soledad/client/_db/blobs/__init__.py b/src/leap/soledad/client/_db/blobs/__init__.py index 4db7df6a..a0b48035 100644 --- a/src/leap/soledad/client/_db/blobs/__init__.py +++ b/src/leap/soledad/client/_db/blobs/__init__.py @@ -41,6 +41,7 @@ from leap.soledad.client._crypto import InvalidBlob from leap.soledad.client._crypto import BlobEncryptor from leap.soledad.client._crypto import BlobDecryptor from leap.soledad.client._crypto import EncryptionSchemeNotImplementedException +from leap.soledad.client._crypto import get_unarmored_ciphertext_size from leap.soledad.client._http import HTTPClient from leap.soledad.client._pipes import TruncatedTailPipe from leap.soledad.client._pipes import PreamblePipe @@ -495,6 +496,29 @@ class BlobManager(BlobsSynchronizer): yield defer.gatherResults(deferreds, consumeErrors=True) buf.close() + @defer.inlineCallbacks + def _upstream(self, blobs_id_list, namespace=''): + uri = urljoin(self.remote_stream, self.user) + params = {'namespace': namespace} if namespace else None + sizes = yield self.local.get_size_list(blobs_id_list, namespace) + convert = get_unarmored_ciphertext_size + sizes = map(lambda (x, y): (x, convert(y)), sizes) + data = BytesIO() # TODO: stream from db + data.write(json.dumps(sizes) + '\n') + for blob_id in blobs_id_list: + blob_fd = yield self.local.get(blob_id, namespace=namespace) + doc_info = DocInfo(blob_id, FIXED_REV) + crypter = BlobEncryptor(doc_info, blob_fd, secret=self.secret, + armor=False) + fd = yield crypter.encrypt() + data.write(fd.read()) + data.seek(0) + params = {'namespace': namespace} if namespace else {} + params['direction'] = 'upload' + response = yield self._client.post(uri, data=data, params=params) + check_http_status(response.code, blob_id) + logger.info("Finished stream up: %s" % (blobs_id_list,)) + @defer.inlineCallbacks def _download_and_decrypt(self, blob_id, namespace=''): logger.info("Staring download of blob: %s" % blob_id) -- cgit v1.2.3