summaryrefslogtreecommitdiff
path: root/client/src/leap/soledad
diff options
context:
space:
mode:
authorKali Kaneko <kali@leap.se>2017-02-21 10:17:59 +0100
committerdrebs <drebs@leap.se>2017-04-04 18:27:31 +0200
commitfc7b99dab54ed59f0465f77f17b61486d4323fd0 (patch)
tree781647ec52e635a274d32f9417364535f136f67c /client/src/leap/soledad
parentdf858f17066dbebf3892efe85414b57951767ac5 (diff)
[feature] save blob to db, incrementally
Diffstat (limited to 'client/src/leap/soledad')
-rw-r--r--client/src/leap/soledad/client/_blobs.py158
-rw-r--r--client/src/leap/soledad/client/_crypto.py11
2 files changed, 112 insertions, 57 deletions
diff --git a/client/src/leap/soledad/client/_blobs.py b/client/src/leap/soledad/client/_blobs.py
index 952f1da7..426de56c 100644
--- a/client/src/leap/soledad/client/_blobs.py
+++ b/client/src/leap/soledad/client/_blobs.py
@@ -19,12 +19,64 @@ import treq
from leap.soledad.client.sqlcipher import SQLCipherOptions
from leap.soledad.client import pragmas
-from _crypto import docinfo, BlobEncryptor, BlobDecryptor
+from _crypto import DocInfo, BlobEncryptor, BlobDecryptor
logger = Logger()
+class _ConnectionPool(adbapi.ConnectionPool):
+
+ def blob(self, table, column, key, value):
+ conn = self.connectionFactory(self)
+ # XXX FIXME what are these values???
+ # shouldn't I pass the doc_id key?? Why is it asking for an integer???
+ blob = conn.blob(table, column, 1, 1)
+ print "GOT BLOB", blob
+ return blob
+
+
+"""
+Ideally, the decrypting flow goes like this:
+
+- GET a blob from remote server.
+- Decrypt the preamble
+- Allocate a zeroblob in the sqlcipher sink
+- Mark the blob as unusable (ie, not verified)
+- Decrypt the payload incrementally, and write chunks to sqlcipher
+- Finalize the AES decryption
+- If preamble + payload verifies correctly, mark the blob as usable
+
+"""
+
+
+class DecrypterBuffer(object):
+
+ def __init__(self, doc_id, rev, secret):
+ self.decrypter = None
+ self.buffer = BytesIO()
+ self.doc_info = DocInfo(doc_id, rev)
+ self.secret = secret
+ self.d = None
+
+ def write(self, data):
+ if not self.decrypter:
+ self.buffer.write(data)
+ self.decrypter = BlobDecryptor(
+ self.doc_info, self.buffer,
+ secret=self.secret,
+ armor=True,
+ start_stream=False)
+ self.d = self.decrypter.decrypt()
+ else:
+ self.decrypter.write(data)
+
+ def close(self):
+ if self.d:
+ self.d.addCallback(lambda result: (result, self.decrypter.size))
+ return self.d
+
+
class BlobManager(object):
def __init__(self, local_path, remote, key, secret):
@@ -38,11 +90,10 @@ class BlobManager(object):
yield self.local.put(doc.blob_id, fd)
fd.seek(0)
up = BytesIO()
- doc_info = docinfo(doc.doc_id, doc.rev)
+ doc_info = DocInfo(doc.doc_id, doc.rev)
# TODO ------------------------------------------
- # this is wrong, is doing 2 stages. Cutting corners!
- # We should connect the pipeline, use Tubes:
+ # this is wrong, is doing 2 stages.
# the crypto producer can be passed to
# the uploader and react as data is written.
# ------------------------------------------------
@@ -51,29 +102,23 @@ class BlobManager(object):
@defer.inlineCallbacks
def get(self, blob_id, doc_id, rev):
- print "IN MANAGER: GETTING BLOB..."
local_blob = yield self.local.get(blob_id)
if local_blob:
- print 'LOCAL BLOB', local_blob
+ print 'LOCAL BLOB', local_blob.getvalue()
defer.returnValue(local_blob)
print "NO LOCAL BLOB, WILL DOWNLOAD"
- # TODO pass the fd to the downloader, possible?
- remoteblob = yield self._download(blob_id)
- ciphertext = BytesIO(str(remoteblob))
-
- print 'remote ciphertext', remoteblob[:10], '[...]'
- logger.debug('got remote blob %s [...]' % remoteblob[:100])
- del remoteblob
+ blob, size = yield self._download_and_decrypt(blob_id, doc_id, rev)
+ print "Downloading", blob_id
+ print "BLOB", blob.getvalue(), "SIZE", size
- doc_info = docinfo(doc_id, rev)
- blob = yield self._decrypt(doc_info, ciphertext)
+ doc_info = DocInfo(doc_id, rev)
if blob:
print 'GOT DECRYPTED BLOB', type(blob)
- blob.seek(0)
print 'SAVING BLOB IN LOCAL STORE'
- yield self.local.put(blob_id, blob)
+ blob.seek(0)
+ yield self.local.put(blob_id, blob, size=size)
blob.seek(0)
defer.returnValue(blob)
else:
@@ -87,51 +132,27 @@ class BlobManager(object):
@defer.inlineCallbacks
def _encrypt(self, doc_info, payload, result):
- # TODO WE SHOULD SKIP THE BASE64 STEP!!!!
- # this is going to be uploaded in binary mode
+ # TODO pass armor=False when the decrypter is bug-free
crypter = BlobEncryptor(doc_info, payload, result=result, secret=self.secret)
yield crypter.encrypt()
@defer.inlineCallbacks
- def _decrypt(self, doc_info, ciphertext):
- decrypter = BlobDecryptor(doc_info, ciphertext, secret=self.secret)
- blob = yield decrypter.decrypt()
- defer.returnValue(blob)
-
-
- @defer.inlineCallbacks
def _upload(self, blob_id, payload_fd):
uri = self.remote + 'put'
yield treq.post(uri, data={'dafile_filename': blob_id},
files={'dafile': payload_fd})
@defer.inlineCallbacks
- def _download(self, blob_id):
- uri = self.remote + 'blobs/' + blob_id
- blob_resp = yield treq.get(uri)
- blob = yield treq.text_content(blob_resp)
+ def _download_and_decrypt(self, blob_id, doc_id, rev):
+ uri = self.remote + blob_id
+ buf = DecrypterBuffer(doc_id, rev, self.secret)
+ data = yield treq.get(uri)
+ yield treq.collect(data, buf.write)
+ blob = yield buf.close()
defer.returnValue(blob)
-
-# --------------------8<----------------------------------------------
-class BlobDoc(object):
-
- # TODO probably not needed, but convenient for testing for now.
-
- def __init__(self, doc_id, rev, content, blob_id=None):
-
- self.doc_id = doc_id
- self.rev = rev
- self.is_blob = True
- self.blob_fd = content
- if blob_id is None:
- blob_id = uuid4().get_hex()
- self.blob_id = blob_id
-# --------------------8<----------------------------------------------
-
-
class SQLiteBlobBackend(object):
def __init__(self, path, key=None):
@@ -144,18 +165,24 @@ class SQLiteBlobBackend(object):
pragmafun = partial(pragmas.set_init_pragmas, opts=opts)
openfun = _sqlcipherInitFactory(pragmafun)
- self.dbpool = dbpool = adbapi.ConnectionPool(
+ self.dbpool = dbpool = _ConnectionPool(
backend, self.path, check_same_thread=False, timeout=5,
cp_openfun=openfun, cp_min=1, cp_max=2, cp_name='blob_pool')
-
@defer.inlineCallbacks
- def put(self, blob_id, blob_fd):
- insert = str('INSERT INTO blobs VALUES (?, ?)')
- print 'inserting...'
- raw = blob_fd.getvalue()
- yield self.dbpool.runQuery(insert, (blob_id, Binary(raw)))
-
+ def put(self, blob_id, blob_fd, size=None):
+ insert = str('INSERT INTO blobs VALUES (?, zeroblob(?))')
+ yield self.dbpool.runQuery(insert, (blob_id, size))
+ cleartext = blob_fd.read()
+ # FIXME --- I don't totally understand the parameters that are passed
+ # to that call
+ blob = self.dbpool.blob('blobs', 'payload', 'blob_id', blob_id)
+ # TODO pass blob.write to a FileBodyProducer!!!
+ # should be as simple as:
+ # producer = BodyProducer(fd)
+ # producer.startproducing(blob)
+ blob.write(cleartext)
+
@defer.inlineCallbacks
def get(self, blob_id):
select = 'SELECT payload FROM blobs WHERE blob_id = ?'
@@ -164,7 +191,6 @@ class SQLiteBlobBackend(object):
defer.returnValue(BytesIO(str(result[0][0])))
-
def _init_blob_table(conn):
maybe_create = (
"CREATE TABLE IF NOT EXISTS "
@@ -179,3 +205,21 @@ def _sqlcipherInitFactory(fun):
fun(conn)
_init_blob_table(conn)
return _initialize
+
+
+# --------------------8<----------------------------------------------
+class BlobDoc(object):
+
+ # TODO probably not needed, but convenient for testing for now.
+
+ def __init__(self, doc_id, rev, content, blob_id=None):
+
+ self.doc_id = doc_id
+ self.rev = rev
+ self.is_blob = True
+ self.blob_fd = content
+ if blob_id is None:
+ blob_id = uuid4().get_hex()
+ self.blob_id = blob_id
+# --------------------8<----------------------------------------------
+
diff --git a/client/src/leap/soledad/client/_crypto.py b/client/src/leap/soledad/client/_crypto.py
index c57f0921..8fc5154c 100644
--- a/client/src/leap/soledad/client/_crypto.py
+++ b/client/src/leap/soledad/client/_crypto.py
@@ -541,3 +541,14 @@ def _mode_by_method(method):
return modes.GCM
else:
return modes.CTR
+
+
+def _ceiling(size):
+ """
+ Some simplistic ceiling scheme that uses powers of 2.
+ We report everything below 4096 bytes as that minimum threshold.
+ """
+ for i in xrange(12, 31):
+ step = 2**i
+ if size < step:
+ return step