diff options
Diffstat (limited to 'src/leap/soledad/client/_db')
-rw-r--r-- | src/leap/soledad/client/_db/blobs/__init__.py | 72 |
1 files changed, 71 insertions, 1 deletions
diff --git a/src/leap/soledad/client/_db/blobs/__init__.py b/src/leap/soledad/client/_db/blobs/__init__.py index ddd22b4b..3daf8d1a 100644 --- a/src/leap/soledad/client/_db/blobs/__init__.py +++ b/src/leap/soledad/client/_db/blobs/__init__.py @@ -23,6 +23,7 @@ from urlparse import urljoin import os import json import base64 +import struct from collections import defaultdict from io import BytesIO @@ -104,6 +105,55 @@ class DecrypterBuffer(object): return self.raw_data, self.raw_data.tell() +class StreamDecrypterBuffer(object): + size_pack = struct.Struct('<I') + + def __init__(self, secret, blobs_list, done_callback): + self.blobs_list = blobs_list + self.secret = secret + self.done_callback = done_callback + self.buf = b'' + self.reset() + + def reset(self): + self.current_blob_size = False + self.current_blob_id = None + self.received = 0 + + def write(self, data): + if not self.current_blob_size: + self.buf += data + if ' ' in self.buf: + marker, self.buf = self.buf.split(' ') + assert(len(marker) == 20) # 16 byte tag + 4 byte size + size, tag = marker[:4], marker[4:] + self.current_blob_size = self.size_pack.unpack(size)[0] + self.received = len(self.buf) + blob_id = self.blobs_list.pop(0) + buf = DecrypterBuffer(blob_id, self.secret, tag) + self.current_blob_id = blob_id + buf.write(self.buf) + self.buf = buf + elif (self.received + len(data)) < self.current_blob_size: + self.buf.write(data) + self.received += len(data) + else: + missing = self.current_blob_size - self.received + self.buf.write(data[:missing]) + blob_id = self.current_blob_id + fd, size = self.buf.close() + self.done_callback(blob_id, fd, size) + self.buf = data[missing:] + self.reset() + + def close(self): + if self.received != 0: + missing = self.current_blob_size - self.received + raise Exception("Incomplete download! missing: %s" % missing) + if self.blobs_list: + raise Exception("Missing from stream: %s" % self.blobs_list) + + class BlobManager(BlobsSynchronizer): """ The BlobManager can list, put, get, set flags and synchronize blobs stored @@ -115,7 +165,7 @@ class BlobManager(BlobsSynchronizer): def __init__( self, local_path, remote, key, secret, user, token=None, - cert_file=None): + cert_file=None, remote_stream=None): """ Initialize the blob manager. @@ -131,12 +181,15 @@ class BlobManager(BlobsSynchronizer): :type token: str :param cert_file: The path to the CA certificate file. :type cert_file: str + :param cert_file: Remote storage stream URL, if supported. + :type cert_file: str """ super(BlobsSynchronizer, self).__init__() if local_path: mkdir_p(os.path.dirname(local_path)) self.local = SQLiteBlobBackend(local_path, key=key, user=user) self.remote = remote + self.remote_stream = remote_stream self.secret = secret self.user = user self._client = HTTPClient(user, token, cert_file) @@ -424,6 +477,23 @@ class BlobManager(BlobsSynchronizer): logger.info("Finished upload: %s" % (blob_id,)) @defer.inlineCallbacks + def _downstream(self, blobs_id_list, namespace=''): + uri = urljoin(self.remote_stream, self.user) + params = {'namespace': namespace} if namespace else None + data = BytesIO(json.dumps(list(blobs_id_list))) + response = yield self._client.post(uri, params=params, data=data) + deferreds = [] + + def done_cb(blob_id, blobfd, size): + d = self.local.put(blob_id, blobfd, size=size, namespace=namespace) + deferreds.append(d) + buf = StreamDecrypterBuffer(self.secret, blobs_id_list, done_cb) + + yield treq.collect(response, buf.write) + yield defer.gatherResults(deferreds, consumeErrors=True) + buf.close() + + @defer.inlineCallbacks def _download_and_decrypt(self, blob_id, namespace=''): logger.info("Staring download of blob: %s" % blob_id) # TODO this needs to be connected in a tube |