From 64071964e02fe730fdd62602ec40c93bc7813f7a Mon Sep 17 00:00:00 2001 From: Kali Kaneko Date: Wed, 22 Feb 2017 13:53:08 +0100 Subject: [feature] close mvp loop with client --- client/src/leap/soledad/client/_blobs.py | 189 ++++++++++++++++++------------ client/src/leap/soledad/client/_crypto.py | 42 +++++-- 2 files changed, 144 insertions(+), 87 deletions(-) (limited to 'client/src/leap/soledad') diff --git a/client/src/leap/soledad/client/_blobs.py b/client/src/leap/soledad/client/_blobs.py index 4581af60..f41f3cf4 100644 --- a/client/src/leap/soledad/client/_blobs.py +++ b/client/src/leap/soledad/client/_blobs.py @@ -2,6 +2,7 @@ Clientside BlobBackend Storage. """ +from copy import copy from uuid import uuid4 import os.path @@ -13,6 +14,7 @@ from sqlite3 import Binary from twisted.logger import Logger from twisted.enterprise import adbapi from twisted.internet import defer, reactor +from twisted.web.client import FileBodyProducer import treq @@ -36,22 +38,6 @@ class _ConnectionPool(adbapi.ConnectionPool): return blob -""" -Ideally, the decrypting flow goes like this: - -- GET a blob from remote server. -- Decrypt the preamble -- Allocate a zeroblob in the sqlcipher sink -- Mark the blob as unusable (ie, not verified) -- Decrypt the payload incrementally, and write chunks to sqlcipher - ** Is it possible to use a small buffer for the aes writer w/o - ** allocating all the memory in openssl? -- Finalize the AES decryption -- If preamble + payload verifies correctly, mark the blob as usable - -""" - - class DecrypterBuffer(object): def __init__(self, doc_id, rev, secret): @@ -80,42 +66,46 @@ class DecrypterBuffer(object): class BlobManager(object): - - def __init__(self, local_path, remote, key, secret): + """ + Ideally, the decrypting flow goes like this: + + - GET a blob from remote server. 
+ - Decrypt the preamble + - Allocate a zeroblob in the sqlcipher sink + - Mark the blob as unusable (ie, not verified) + - Decrypt the payload incrementally, and write chunks to sqlcipher + ** Is it possible to use a small buffer for the aes writer w/o + ** allocating all the memory in openssl? + - Finalize the AES decryption + - If preamble + payload verifies correctly, mark the blob as usable + + """ + + def __init__(self, local_path, remote, key, secret, user): self.local = SQLiteBlobBackend(local_path, key) self.remote = remote self.secret = secret + self.user = user @defer.inlineCallbacks def put(self, doc): fd = doc.blob_fd + # TODO this is a tee really, but ok... could do db and upload + # concurrently. not sure if we'd gain something. yield self.local.put(doc.blob_id, fd) fd.seek(0) - up = BytesIO() - doc_info = DocInfo(doc.doc_id, doc.rev) - - # TODO ------------------------------------------ - # this is wrong, is doing 2 stages. - # the crypto producer can be passed to - # the uploader and react as data is written. - # ------------------------------------------------ - yield self._encrypt(doc_info, fd, up) - yield self._upload(doc.blob_id, up) + yield self._encrypt_and_upload(doc.blob_id, fd, up) @defer.inlineCallbacks def get(self, blob_id, doc_id, rev): local_blob = yield self.local.get(blob_id) if local_blob: - print 'LOCAL BLOB', local_blob.getvalue() + print "GOT LOCAL BLOB", local_blob defer.returnValue(local_blob) - print "NO LOCAL BLOB, WILL DOWNLOAD" - blob, size = yield self._download_and_decrypt(blob_id, doc_id, rev) - print "Downloading", blob_id - print "BLOB", blob.getvalue(), "SIZE", size + print "DOWNLOADED BLOB, SIZE:", size - doc_info = DocInfo(doc_id, rev) if blob: print 'GOT DECRYPTED BLOB', type(blob) print 'SAVING BLOB IN LOCAL STORE' @@ -124,30 +114,33 @@ class BlobManager(object): blob.seek(0) defer.returnValue(blob) else: - # XXX we shouldn't get here, but we will... - # lots of ugly error handling possible: - # 1. 
retry, might be network error - # 2. try later, maybe didn't finished streaming - # 3.. resignation, might be error while verifying + # XXX we shouldn't get here, but we will... + # lots of ugly error handling possible: + # 1. retry, might be network error + # 2. try later, maybe it didn't finish streaming + # 3. resignation, might be error while verifying logger.error('sorry, dunno what happened') - - @defer.inlineCallbacks - def _encrypt(self, doc_info, payload, result): - # TODO pass armor=False when the decrypter is bug-free - crypter = BlobEncryptor(doc_info, payload, result=result, secret=self.secret) - yield crypter.encrypt() - - @defer.inlineCallbacks - def _upload(self, blob_id, payload_fd): - uri = self.remote + 'put' - yield treq.post(uri, data={'dafile_filename': blob_id}, - files={'dafile': payload_fd}) + def _encrypt_and_upload(self, blob_id, doc_id, rev, payload): + # TODO ------------------------------------------ + # this is wrong: it is doing 2 stages. + # the crypto producer can be passed to + # the uploader and react as data is written. + # try to rewrite as a tube: pass the fd to aes and let aes writer + # produce data to the treq request fd. 
+ # ------------------------------------------------ + doc_info = DocInfo(doc_id, rev) + uri = self.remote + '/' + self.user + '/' + blob_id + crypter = BlobEncryptor(doc_info, payload, secret=self.secret, + armor=True) + result = yield crypter.encrypt() + yield treq.put(uri, data=result) @defer.inlineCallbacks def _download_and_decrypt(self, blob_id, doc_id, rev): - uri = self.remote + blob_id + # TODO this needs to be connected in a tube + uri = self.remote + self.user + '/' + blob_id buf = DecrypterBuffer(doc_id, rev, self.secret) data = yield treq.get(uri) yield treq.collect(data, buf.write) @@ -167,7 +160,7 @@ class SQLiteBlobBackend(object): pragmafun = partial(pragmas.set_init_pragmas, opts=opts) openfun = _sqlcipherInitFactory(pragmafun) - self.dbpool = dbpool = _ConnectionPool( + self.dbpool = _ConnectionPool( backend, self.path, check_same_thread=False, timeout=5, cp_openfun=openfun, cp_min=1, cp_max=2, cp_name='blob_pool') @@ -175,18 +168,21 @@ class SQLiteBlobBackend(object): def put(self, blob_id, blob_fd, size=None): insert = str('INSERT INTO blobs VALUES (?, zeroblob(?))') yield self.dbpool.runQuery(insert, (blob_id, size)) - cleartext = blob_fd.read() - # FIXME --- I don't totally understand the parameters that are passed - # to that call - blob = self.dbpool.blob('blobs', 'payload', 'blob_id', blob_id) - # TODO pass blob.write to a FileBodyProducer!!! - # should be as simple as: - # producer = BodyProducer(fd) - # producer.startproducing(blob) - blob.write(cleartext) + dbblob = self.dbpool.blob('blobs', 'payload', 'blob_id', blob_id) + blob_fd.seek(0) + # XXX I have to copy the buffer here so that I'm able to + # return a non-closed file to the caller (blobmanager.get) + # FIXME should remove this duplication! 
+ # have a look at how treq does cope with closing the handle + # for uploading a file + producer = FileBodyProducer(copy(blob_fd)) + done = yield producer.startProducing(dbblob) + defer.returnValue(done) @defer.inlineCallbacks def get(self, blob_id): + # TODO we can also stream the blob value using sqlite + # incremental interface for blobs - and just return the raw fd instead select = 'SELECT payload FROM blobs WHERE blob_id = ?' result = yield self.dbpool.runQuery(select, (blob_id,)) if result: @@ -208,20 +204,63 @@ def _sqlcipherInitFactory(fun): _init_blob_table(conn) return _initialize - # --------------------8<---------------------------------------------- -class BlobDoc(object): - +#class BlobDoc(object): +# # TODO probably not needed, but convenient for testing for now. - - def __init__(self, doc_id, rev, content, blob_id=None): - - self.doc_id = doc_id - self.rev = rev - self.is_blob = True - self.blob_fd = content - if blob_id is None: - blob_id = uuid4().get_hex() - self.blob_id = blob_id +# + #def __init__(self, doc_id, rev, content, blob_id=None): +# + #self.doc_id = doc_id + #self.rev = rev + #self.is_blob = True + #self.blob_fd = content + #if blob_id is None: + #blob_id = uuid4().get_hex() + #self.blob_id = blob_id # --------------------8<---------------------------------------------- +@defer.inlineCallbacks +def testit(reactor): + + # TODO convert this into proper unittests + + import sys + try: + cmd = sys.argv[1] + except: + cmd = '' + + if cmd == 'upload': + src = sys.argv[2] + blob_id = sys.argv[3] + + doc_info = DocInfo('mydoc', '1') + print "DOC INFO", doc_info + + # I don't use BlobManager here because I need to avoid + # putting the blob on local db on upload + crypter = BlobEncryptor( + doc_info, open(src, 'r'), 'A' * 32, armor=True) + print "UPLOADING WITH ENCRYPTOR" + result = yield crypter.encrypt() + yield treq.put('http://localhost:9000/user/' + blob_id, data=result) + + elif cmd == 'download': + blob_id = sys.argv[2] + manager = 
BlobManager( + '/tmp/blobs', 'http://localhost:9000/', + 'A' * 32, 'secret', 'user') + result = yield manager.get(blob_id, 'mydoc', '1') + print result.getvalue() + + else: + print "Usage:" + print "cd server/src/leap/soledad/server/ && python _blobs.py" + print "python _blobs.py upload /path/to/file blob_id" + print "python _blobs.py download blob_id" + + +if __name__ == '__main__': + from twisted.internet.task import react + react(testit) diff --git a/client/src/leap/soledad/client/_crypto.py b/client/src/leap/soledad/client/_crypto.py index f72571c2..3eef2f65 100644 --- a/client/src/leap/soledad/client/_crypto.py +++ b/client/src/leap/soledad/client/_crypto.py @@ -206,6 +206,8 @@ def decrypt_sym(data, key, iv, method=ENC_METHOD.aes_256_gcm): return plaintext +# TODO maybe rename this to Encryptor, since it will be used by blobs and +# non-blobs in soledad. class BlobEncryptor(object): """ Produces encrypted data from the cleartext data associated with a given @@ -218,7 +220,15 @@ class BlobEncryptor(object): Both the production input and output are file descriptors, so they can be applied to a stream of data. """ - def __init__(self, doc_info, content_fd, secret=None, armor=True): + # TODO + # This class needs further work to allow for proper streaming. + # Right now we HAVE TO WAIT until the end of the stream before encoding the + # result. It should be possible to do that just encoding the chunks and + # passing them to a sink, but for that we have to encode the chunks at + # proper alignment (3 bytes?) with b64 if armor is defined. 
+ + def __init__(self, doc_info, content_fd, secret=None, armor=True, + sink=None): if not secret: raise EncryptionDecryptionError('no secret given') @@ -227,22 +237,25 @@ class BlobEncryptor(object): self.armor = armor self._content_fd = content_fd - content_fd.seek(0, os.SEEK_END) - self._content_size = _ceiling(content_fd.tell()) - content_fd.seek(0) + self._content_size = self._get_size(content_fd) self._producer = FileBodyProducer(content_fd, readSize=2**16) self.sym_key = _get_sym_key_for_doc(doc_info.doc_id, secret) - self._aes = AESWriter(self.sym_key) + self._aes = AESWriter(self.sym_key, _buffer=sink) self._aes.authenticate(self._encode_preamble()) + def _get_size(self, fd): + fd.seek(0, os.SEEK_END) + size = _ceiling(fd.tell()) + fd.seek(0) + return size + @property def iv(self): return self._aes.iv @property def tag(self): - print "TAG?", binascii.b2a_hex(self._aes.tag) return self._aes.tag def encrypt(self): @@ -250,11 +263,12 @@ class BlobEncryptor(object): Starts producing encrypted data from the cleartext data. :return: A deferred which will be fired when encryption ends and whose - callback will be invoked with the resulting ciphertext. + callback will be invoked with the resulting ciphertext. :rtype: twisted.internet.defer.Deferred """ + # XXX pass a sink to aes? d = self._producer.startProducing(self._aes) - d.addCallback(lambda _: self._end_crypto_stream()) + d.addCallback(lambda _: self._end_crypto_stream_and_encode_result()) return d def _encode_preamble(self): @@ -271,12 +285,13 @@ class BlobEncryptor(object): self._content_size) return preamble - def _end_crypto_stream(self): + def _end_crypto_stream_and_encode_result(self): # TODO ---- this needs to be refactored to allow PROPER streaming # We should write the preamble as soon as possible, # Is it possible to write the AES stream as soon as it is encrypted by # chunks? 
+ # FIXME also, it needs to be able to encode chunks with base64 if armor is set preamble, encrypted = self._aes.end() result = BytesIO() @@ -337,6 +352,8 @@ class CryptoStreamBodyProducer(FileBodyProducer): yield None +# TODO maybe rename this to just Decryptor, since it will be used by blobs and +# non-blobs in soledad. class BlobDecryptor(object): """ Decrypts an encrypted blob associated with a given Document. @@ -416,7 +433,7 @@ class BlobDecryptor(object): if rev != self.rev: raise InvalidBlob('invalid revision') if doc_id != self.doc_id: - raise InvalidBlob('invalid revision') + raise InvalidBlob('invalid doc id') self.fd.seek(0) tail = ''.join(parts[1:]) @@ -518,7 +535,8 @@ def is_symmetrically_encrypted(content): :rtype: bool """ - return content and content[:13] == '{"raw": "EzcB' + sym_signature = '{"raw": "EzcB' + return content and content.startswith(sym_signature) # utils @@ -552,6 +570,6 @@ def _ceiling(size): See #8759 for research pending for less simplistic/aggresive strategies. """ for i in xrange(12, 31): - step = 2**i + step = 2 ** i if size < step: return step -- cgit v1.2.3