-rw-r--r--  client/src/leap/soledad/client/_blobs.py  181
1 file changed, 181 insertions, 0 deletions
diff --git a/client/src/leap/soledad/client/_blobs.py b/client/src/leap/soledad/client/_blobs.py
new file mode 100644
index 00000000..952f1da7
--- /dev/null
+++ b/client/src/leap/soledad/client/_blobs.py
@@ -0,0 +1,181 @@
+"""
+Client-side BlobBackend storage.
+"""
+
+from uuid import uuid4
+import os.path
+
+from io import BytesIO
+from functools import partial
+
+from sqlite3 import Binary
+
+from twisted.logger import Logger
+from twisted.enterprise import adbapi
+from twisted.internet import defer, reactor
+
+import treq
+
+from leap.soledad.client.sqlcipher import SQLCipherOptions
+from leap.soledad.client import pragmas
+
+from _crypto import docinfo, BlobEncryptor, BlobDecryptor
+
+
+logger = Logger()
+
+
+class BlobManager(object):
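+    """
+    Handle storage of blobs for a client: keep a local copy in an
+    encrypted SQLite database, and move encrypted payloads to and from
+    a remote blobs server.
+    """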
+
+    def __init__(self, local_path, remote, key, secret):
+        self.local = SQLiteBlobBackend(local_path, key)
+        self.remote = remote
+        self.secret = secret
+
+    @defer.inlineCallbacks
+    def put(self, doc):
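+        """
+        Store the blob for a document locally and upload an encrypted
+        copy of it to the remote server.
+        """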
+        fd = doc.blob_fd
+        yield self.local.put(doc.blob_id, fd)
+        fd.seek(0)
+        up = BytesIO()
+        doc_info = docinfo(doc.doc_id, doc.rev)
+
+        # TODO ------------------------------------------
+        # this is wrong: it does the work in two stages.
+        # Cutting corners! We should connect the pipeline,
+        # use Tubes: the crypto producer can be passed to
+        # the uploader and react as data is written.
+        # ------------------------------------------------
+        yield self._encrypt(doc_info, fd, up)
+        yield self._upload(doc.blob_id, up)
+
+    @defer.inlineCallbacks
+    def get(self, blob_id, doc_id, rev):
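+        """
+        Get a blob: try local storage first and, if it is not found
+        there, download the ciphertext, decrypt it and cache the result
+        locally before returning it.
+        """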
+ print "IN MANAGER: GETTING BLOB..."
+ local_blob = yield self.local.get(blob_id)
+ if local_blob:
+ print 'LOCAL BLOB', local_blob
+ defer.returnValue(local_blob)
+
+ print "NO LOCAL BLOB, WILL DOWNLOAD"
+
+ # TODO pass the fd to the downloader, possible?
+ remoteblob = yield self._download(blob_id)
+ ciphertext = BytesIO(str(remoteblob))
+
+ print 'remote ciphertext', remoteblob[:10], '[...]'
+ logger.debug('got remote blob %s [...]' % remoteblob[:100])
+ del remoteblob
+
+ doc_info = docinfo(doc_id, rev)
+ blob = yield self._decrypt(doc_info, ciphertext)
+ if blob:
+ print 'GOT DECRYPTED BLOB', type(blob)
+ blob.seek(0)
+ print 'SAVING BLOB IN LOCAL STORE'
+ yield self.local.put(blob_id, blob)
+ blob.seek(0)
+ defer.returnValue(blob)
+ else:
+ # XXX we shouldn't get here, but we will...
+ # lots of ugly error handling possible:
+ # 1. retry, might be network error
+ # 2. try later, maybe didn't finished streaming
+ # 3.. resignation, might be error while verifying
+ logger.error('sorry, dunno what happened')
+
+
+    @defer.inlineCallbacks
+    def _encrypt(self, doc_info, payload, result):
+        # TODO WE SHOULD SKIP THE BASE64 STEP!!!!
+        # this is going to be uploaded in binary mode
+        crypter = BlobEncryptor(doc_info, payload, result=result,
+                                secret=self.secret)
+        yield crypter.encrypt()
+
+
+    @defer.inlineCallbacks
+    def _decrypt(self, doc_info, ciphertext):
+        decrypter = BlobDecryptor(doc_info, ciphertext, secret=self.secret)
+        blob = yield decrypter.decrypt()
+        defer.returnValue(blob)
+
+
+    @defer.inlineCallbacks
+    def _upload(self, blob_id, payload_fd):
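+        """
+        POST the encrypted payload to the remote blobs server.
+        """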
+        uri = self.remote + 'put'
+        yield treq.post(uri, data={'dafile_filename': blob_id},
+                        files={'dafile': payload_fd})
+
+    @defer.inlineCallbacks
+    def _download(self, blob_id):
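+        """
+        GET the ciphertext for a blob from the remote blobs server.
+        """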
+        uri = self.remote + 'blobs/' + blob_id
+        blob_resp = yield treq.get(uri)
+        blob = yield treq.text_content(blob_resp)
+        defer.returnValue(blob)
+
+
+
+# --------------------8<----------------------------------------------
+class BlobDoc(object):
+
+    # TODO probably not needed, but convenient for testing for now.
+
+    def __init__(self, doc_id, rev, content, blob_id=None):
+
+        self.doc_id = doc_id
+        self.rev = rev
+        self.is_blob = True
+        self.blob_fd = content
+        if blob_id is None:
+            blob_id = uuid4().get_hex()
+        self.blob_id = blob_id
+# --------------------8<----------------------------------------------
+
+
+class SQLiteBlobBackend(object):
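+    """
+    Store blobs in a local SQLCipher-encrypted SQLite database, accessed
+    through a Twisted adbapi connection pool.
+    """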
+
+    def __init__(self, path, key=None):
+        self.path = os.path.abspath(
+            os.path.join(path, 'soledad_blob.db'))
+        if not key:
+            raise ValueError('key cannot be None')
+        backend = 'pysqlcipher.dbapi2'
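+        # the path passed in these options is never used: opts only
+        # carries the key for the initialization pragmas set below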
+        opts = SQLCipherOptions('/tmp/ignored', key)
+        pragmafun = partial(pragmas.set_init_pragmas, opts=opts)
+        openfun = _sqlcipherInitFactory(pragmafun)
+
+        self.dbpool = adbapi.ConnectionPool(
+            backend, self.path, check_same_thread=False, timeout=5,
+            cp_openfun=openfun, cp_min=1, cp_max=2, cp_name='blob_pool')
+
+
+    @defer.inlineCallbacks
+    def put(self, blob_id, blob_fd):
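+        """
+        Store the whole contents of the given file-like object under the
+        given blob_id.
+        """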
+        insert = str('INSERT INTO blobs VALUES (?, ?)')
+        logger.debug('inserting blob %s into local storage' % blob_id)
+        raw = blob_fd.getvalue()
+        yield self.dbpool.runQuery(insert, (blob_id, Binary(raw)))
+
+    @defer.inlineCallbacks
+    def get(self, blob_id):
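+        """
+        Return a BytesIO with the payload stored for blob_id, or None if
+        there is no local copy.
+        """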
+        select = 'SELECT payload FROM blobs WHERE blob_id = ?'
+        result = yield self.dbpool.runQuery(select, (blob_id,))
+        if result:
+            defer.returnValue(BytesIO(str(result[0][0])))
+
+
+
+def _init_blob_table(conn):
+    maybe_create = (
+        "CREATE TABLE IF NOT EXISTS "
+        "blobs ("
+        "blob_id PRIMARY KEY, "
+        "payload BLOB)")
+    conn.execute(maybe_create)
+
+
+def _sqlcipherInitFactory(fun):
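+    """
+    Return a connection initializer that sets the SQLCipher pragmas and
+    creates the blobs table; meant to be used as the pool's cp_openfun.
+    """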
+    def _initialize(conn):
+        fun(conn)
+        _init_blob_table(conn)
+    return _initialize