diff options
author | Victor Shyba <victor1984@riseup.net> | 2017-12-14 14:30:52 -0300 |
---|---|---|
committer | Victor Shyba <victor1984@riseup.net> | 2017-12-22 13:51:27 -0300 |
commit | 5ca40e181f445393260641093e66d9b42e455f55 (patch) | |
tree | c76ef9ca947de240102b28ddcdd49c133fc2d2bb | |
parent | 91da999132b9c7afbbd9259915556c61f49497ba (diff) |
[feature] client implementation for upstream
-- Resolves #8773
-rw-r--r-- | src/leap/soledad/client/_db/blobs/__init__.py | 24 |
1 files changed, 24 insertions, 0 deletions
diff --git a/src/leap/soledad/client/_db/blobs/__init__.py b/src/leap/soledad/client/_db/blobs/__init__.py index 4db7df6a..a0b48035 100644 --- a/src/leap/soledad/client/_db/blobs/__init__.py +++ b/src/leap/soledad/client/_db/blobs/__init__.py @@ -41,6 +41,7 @@ from leap.soledad.client._crypto import InvalidBlob from leap.soledad.client._crypto import BlobEncryptor from leap.soledad.client._crypto import BlobDecryptor from leap.soledad.client._crypto import EncryptionSchemeNotImplementedException +from leap.soledad.client._crypto import get_unarmored_ciphertext_size from leap.soledad.client._http import HTTPClient from leap.soledad.client._pipes import TruncatedTailPipe from leap.soledad.client._pipes import PreamblePipe @@ -496,6 +497,29 @@ class BlobManager(BlobsSynchronizer): buf.close() @defer.inlineCallbacks + def _upstream(self, blobs_id_list, namespace=''): + uri = urljoin(self.remote_stream, self.user) + params = {'namespace': namespace} if namespace else None + sizes = yield self.local.get_size_list(blobs_id_list, namespace) + convert = get_unarmored_ciphertext_size + sizes = map(lambda (x, y): (x, convert(y)), sizes) + data = BytesIO() # TODO: stream from db + data.write(json.dumps(sizes) + '\n') + for blob_id in blobs_id_list: + blob_fd = yield self.local.get(blob_id, namespace=namespace) + doc_info = DocInfo(blob_id, FIXED_REV) + crypter = BlobEncryptor(doc_info, blob_fd, secret=self.secret, + armor=False) + fd = yield crypter.encrypt() + data.write(fd.read()) + data.seek(0) + params = {'namespace': namespace} if namespace else {} + params['direction'] = 'upload' + response = yield self._client.post(uri, data=data, params=params) + check_http_status(response.code, blob_id) + logger.info("Finished stream up: %s" % (blobs_id_list,)) + + @defer.inlineCallbacks def _download_and_decrypt(self, blob_id, namespace=''): logger.info("Staring download of blob: %s" % blob_id) # TODO this needs to be connected in a tube |