From fc7b99dab54ed59f0465f77f17b61486d4323fd0 Mon Sep 17 00:00:00 2001 From: Kali Kaneko Date: Tue, 21 Feb 2017 10:17:59 +0100 Subject: [feature] save blob to db, incrementally --- client/src/leap/soledad/client/_blobs.py | 158 +++++++++++++++++++----------- client/src/leap/soledad/client/_crypto.py | 11 +++ 2 files changed, 112 insertions(+), 57 deletions(-) (limited to 'client') diff --git a/client/src/leap/soledad/client/_blobs.py b/client/src/leap/soledad/client/_blobs.py index 952f1da7..426de56c 100644 --- a/client/src/leap/soledad/client/_blobs.py +++ b/client/src/leap/soledad/client/_blobs.py @@ -19,12 +19,64 @@ import treq from leap.soledad.client.sqlcipher import SQLCipherOptions from leap.soledad.client import pragmas -from _crypto import docinfo, BlobEncryptor, BlobDecryptor +from _crypto import DocInfo, BlobEncryptor, BlobDecryptor logger = Logger() +class _ConnectionPool(adbapi.ConnectionPool): + + def blob(self, table, column, key, value): + conn = self.connectionFactory(self) + # XXX FIXME what are these values??? + # shouldn't I pass the doc_id key?? Why is it asking for an integer??? + blob = conn.blob(table, column, 1, 1) + print "GOT BLOB", blob + return blob + + +""" +Ideally, the decrypting flow goes like this: + +- GET a blob from remote server. +- Decrypt the preamble +- Allocate a zeroblob in the sqlcipher sink +- Mark the blob as unusable (ie, not verified) +- Decrypt the payload incrementally, and write chunks to sqlcipher +- Finalize the AES decryption +- If preamble + payload verifies correctly, mark the blob as usable + +""" + + +class DecrypterBuffer(object): + + def __init__(self, doc_id, rev, secret): + self.decrypter = None + self.buffer = BytesIO() + self.doc_info = DocInfo(doc_id, rev) + self.secret = secret + self.d = None + + def write(self, data): + if not self.decrypter: + self.buffer.write(data) + self.decrypter = BlobDecryptor( + self.doc_info, self.buffer, + secret=self.secret, + armor=True, + start_stream=False) + self.d = self.decrypter.decrypt() + else: + self.decrypter.write(data) + + def close(self): + if self.d: + self.d.addCallback(lambda result: (result, self.decrypter.size)) + return self.d + + class BlobManager(object): def __init__(self, local_path, remote, key, secret): @@ -38,11 +90,10 @@ class BlobManager(object): yield self.local.put(doc.blob_id, fd) fd.seek(0) up = BytesIO() - doc_info = docinfo(doc.doc_id, doc.rev) + doc_info = DocInfo(doc.doc_id, doc.rev) # TODO ------------------------------------------ - # this is wrong, is doing 2 stages. Cutting corners! - # We should connect the pipeline, use Tubes: + # this is wrong, is doing 2 stages. # the crypto producer can be passed to # the uploader and react as data is written. # ------------------------------------------------ @@ -51,29 +102,23 @@ class BlobManager(object): @defer.inlineCallbacks def get(self, blob_id, doc_id, rev): - print "IN MANAGER: GETTING BLOB..." local_blob = yield self.local.get(blob_id) if local_blob: - print 'LOCAL BLOB', local_blob + print 'LOCAL BLOB', local_blob.getvalue() defer.returnValue(local_blob) print "NO LOCAL BLOB, WILL DOWNLOAD" - # TODO pass the fd to the downloader, possible? - remoteblob = yield self._download(blob_id) - ciphertext = BytesIO(str(remoteblob)) - - print 'remote ciphertext', remoteblob[:10], '[...]' - logger.debug('got remote blob %s [...]' % remoteblob[:100]) - del remoteblob + blob, size = yield self._download_and_decrypt(blob_id, doc_id, rev) + print "Downloading", blob_id + print "BLOB", blob.getvalue(), "SIZE", size - doc_info = docinfo(doc_id, rev) - blob = yield self._decrypt(doc_info, ciphertext) + doc_info = DocInfo(doc_id, rev) if blob: print 'GOT DECRYPTED BLOB', type(blob) - blob.seek(0) print 'SAVING BLOB IN LOCAL STORE' - yield self.local.put(blob_id, blob) + blob.seek(0) + yield self.local.put(blob_id, blob, size=size) blob.seek(0) defer.returnValue(blob) else: @@ -87,19 +132,11 @@ class BlobManager(object): @defer.inlineCallbacks def _encrypt(self, doc_info, payload, result): - # TODO WE SHOULD SKIP THE BASE64 STEP!!!! - # this is going to be uploaded in binary mode + # TODO pass armor=False when the decrypter is bug-free crypter = BlobEncryptor(doc_info, payload, result=result, secret=self.secret) yield crypter.encrypt() - @defer.inlineCallbacks - def _decrypt(self, doc_info, ciphertext): - decrypter = BlobDecryptor(doc_info, ciphertext, secret=self.secret) - blob = yield decrypter.decrypt() - defer.returnValue(blob) - - @defer.inlineCallbacks def _upload(self, blob_id, payload_fd): uri = self.remote + 'put' @@ -107,31 +144,15 @@ class BlobManager(object): files={'dafile': payload_fd}) @defer.inlineCallbacks - def _download(self, blob_id): - uri = self.remote + 'blobs/' + blob_id - blob_resp = yield treq.get(uri) - blob = yield treq.text_content(blob_resp) + def _download_and_decrypt(self, blob_id, doc_id, rev): + uri = self.remote + blob_id + buf = DecrypterBuffer(doc_id, rev, self.secret) + data = yield treq.get(uri) + yield treq.collect(data, buf.write) + blob = yield buf.close() defer.returnValue(blob) - -# --------------------8<---------------------------------------------- -class BlobDoc(object): - - # TODO probably not needed, but convenient for testing for now. - - def __init__(self, doc_id, rev, content, blob_id=None): - - self.doc_id = doc_id - self.rev = rev - self.is_blob = True - self.blob_fd = content - if blob_id is None: - blob_id = uuid4().get_hex() - self.blob_id = blob_id -# --------------------8<---------------------------------------------- - - class SQLiteBlobBackend(object): def __init__(self, path, key=None): @@ -144,18 +165,24 @@ class SQLiteBlobBackend(object): pragmafun = partial(pragmas.set_init_pragmas, opts=opts) openfun = _sqlcipherInitFactory(pragmafun) - self.dbpool = dbpool = adbapi.ConnectionPool( + self.dbpool = dbpool = _ConnectionPool( backend, self.path, check_same_thread=False, timeout=5, cp_openfun=openfun, cp_min=1, cp_max=2, cp_name='blob_pool') - @defer.inlineCallbacks - def put(self, blob_id, blob_fd): - insert = str('INSERT INTO blobs VALUES (?, ?)') - print 'inserting...' - raw = blob_fd.getvalue() - yield self.dbpool.runQuery(insert, (blob_id, Binary(raw))) - + def put(self, blob_id, blob_fd, size=None): + insert = str('INSERT INTO blobs VALUES (?, zeroblob(?))') + yield self.dbpool.runQuery(insert, (blob_id, size)) + cleartext = blob_fd.read() + # FIXME --- I don't totally understand the parameters that are passed + # to that call + blob = self.dbpool.blob('blobs', 'payload', 'blob_id', blob_id) + # TODO pass blob.write to a FileBodyProducer!!! + # should be as simple as: + # producer = BodyProducer(fd) + # producer.startproducing(blob) + blob.write(cleartext) + @defer.inlineCallbacks def get(self, blob_id): select = 'SELECT payload FROM blobs WHERE blob_id = ?' @@ -164,7 +191,6 @@ class SQLiteBlobBackend(object): defer.returnValue(BytesIO(str(result[0][0]))) - def _init_blob_table(conn): maybe_create = ( "CREATE TABLE IF NOT EXISTS " @@ -179,3 +205,21 @@ def _sqlcipherInitFactory(fun): fun(conn) _init_blob_table(conn) return _initialize + + +# --------------------8<---------------------------------------------- +class BlobDoc(object): + + # TODO probably not needed, but convenient for testing for now. + + def __init__(self, doc_id, rev, content, blob_id=None): + + self.doc_id = doc_id + self.rev = rev + self.is_blob = True + self.blob_fd = content + if blob_id is None: + blob_id = uuid4().get_hex() + self.blob_id = blob_id +# --------------------8<---------------------------------------------- + diff --git a/client/src/leap/soledad/client/_crypto.py b/client/src/leap/soledad/client/_crypto.py index c57f0921..8fc5154c 100644 --- a/client/src/leap/soledad/client/_crypto.py +++ b/client/src/leap/soledad/client/_crypto.py @@ -541,3 +541,14 @@ def _mode_by_method(method): return modes.GCM else: return modes.CTR + + +def _ceiling(size): + """ + Some simplistic ceiling scheme that uses powers of 2. + We report everything below 4096 bytes as that minimum threshold. + """ + for i in xrange(12, 31): + step = 2**i + if size < step: + return step -- cgit v1.2.3