diff options
| -rw-r--r-- | client/src/leap/soledad/client/_blobs.py | 158 | ||||
| -rw-r--r-- | client/src/leap/soledad/client/_crypto.py | 11 | 
2 files changed, 112 insertions, 57 deletions
| diff --git a/client/src/leap/soledad/client/_blobs.py b/client/src/leap/soledad/client/_blobs.py index 952f1da7..426de56c 100644 --- a/client/src/leap/soledad/client/_blobs.py +++ b/client/src/leap/soledad/client/_blobs.py @@ -19,12 +19,64 @@ import treq  from leap.soledad.client.sqlcipher import SQLCipherOptions  from leap.soledad.client import pragmas -from _crypto import docinfo, BlobEncryptor, BlobDecryptor +from _crypto import DocInfo, BlobEncryptor, BlobDecryptor  logger = Logger() +class _ConnectionPool(adbapi.ConnectionPool): + +    def blob(self, table, column, key, value): +        conn = self.connectionFactory(self) +        # XXX FIXME what are these values??? +        # shouldn't I pass the doc_id key?? Why is it asking for an integer??? +        blob = conn.blob(table, column, 1, 1) +        print "GOT BLOB", blob +        return blob + + +""" +Ideally, the decrypting flow goes like this: + +- GET a blob from remote server. +- Decrypt the preamble +- Allocate a zeroblob in the sqlcipher sink +- Mark the blob as unusable (ie, not verified) +- Decrypt the payload incrementally, and write chunks to sqlcipher +- Finalize the AES decryption +- If preamble + payload verifies correctly, mark the blob as usable + +""" + + +class DecrypterBuffer(object): + +    def __init__(self, doc_id, rev, secret): +        self.decrypter = None +        self.buffer = BytesIO() +        self.doc_info = DocInfo(doc_id, rev) +        self.secret = secret +        self.d = None + +    def write(self, data): +        if not self.decrypter: +            self.buffer.write(data) +            self.decrypter = BlobDecryptor( +                self.doc_info, self.buffer, +                secret=self.secret, +                armor=True, +                start_stream=False) +            self.d = self.decrypter.decrypt() +        else: +            self.decrypter.write(data) + +    def close(self): +        if self.d: +            self.d.addCallback(lambda result: (result, self.decrypter.size)) +        return self.d + +  class BlobManager(object):      def __init__(self, local_path, remote, key, secret): @@ -38,11 +90,10 @@ class BlobManager(object):          yield self.local.put(doc.blob_id, fd)          fd.seek(0)          up = BytesIO() -        doc_info = docinfo(doc.doc_id, doc.rev) +        doc_info = DocInfo(doc.doc_id, doc.rev)          # TODO ------------------------------------------ -        # this is wrong, is doing 2 stages. Cutting corners! -        # We should connect the pipeline, use Tubes:  +        # this is wrong, is doing 2 stages.          # the crypto producer can be passed to           # the uploader and react as data is written.          # ------------------------------------------------ @@ -51,29 +102,23 @@ class BlobManager(object):      @defer.inlineCallbacks      def get(self, blob_id, doc_id, rev): -        print "IN MANAGER: GETTING BLOB..."          local_blob = yield self.local.get(blob_id)          if local_blob: -            print 'LOCAL BLOB', local_blob +            print 'LOCAL BLOB', local_blob.getvalue()              defer.returnValue(local_blob)          print "NO LOCAL BLOB, WILL DOWNLOAD" -        # TODO pass the fd to the downloader, possible? -        remoteblob = yield self._download(blob_id) -        ciphertext = BytesIO(str(remoteblob)) - -        print 'remote ciphertext', remoteblob[:10], '[...]' -        logger.debug('got remote blob %s [...]' % remoteblob[:100]) -        del remoteblob +        blob, size = yield self._download_and_decrypt(blob_id, doc_id, rev) +        print "Downloading", blob_id +        print "BLOB", blob.getvalue(), "SIZE", size -        doc_info = docinfo(doc_id, rev) -        blob = yield self._decrypt(doc_info, ciphertext) +        doc_info = DocInfo(doc_id, rev)          if blob:              print 'GOT DECRYPTED BLOB', type(blob) -            blob.seek(0)              print 'SAVING BLOB IN LOCAL STORE' -            yield self.local.put(blob_id, blob) +            blob.seek(0) +            yield self.local.put(blob_id, blob, size=size)              blob.seek(0)              defer.returnValue(blob)          else: @@ -87,51 +132,27 @@ class BlobManager(object):      @defer.inlineCallbacks      def _encrypt(self, doc_info, payload, result): -        # TODO WE SHOULD SKIP THE BASE64 STEP!!!! -        # this is going to be uploaded in binary mode +        # TODO pass armor=False when the decrypter is bug-free          crypter = BlobEncryptor(doc_info, payload, result=result, secret=self.secret)          yield crypter.encrypt()      @defer.inlineCallbacks -    def _decrypt(self, doc_info, ciphertext): -        decrypter = BlobDecryptor(doc_info, ciphertext, secret=self.secret) -        blob = yield decrypter.decrypt() -        defer.returnValue(blob) - - -    @defer.inlineCallbacks      def _upload(self, blob_id, payload_fd):          uri = self.remote + 'put'          yield treq.post(uri, data={'dafile_filename': blob_id},                          files={'dafile': payload_fd})      @defer.inlineCallbacks -    def _download(self, blob_id): -        uri = self.remote + 'blobs/' + blob_id -        blob_resp = yield treq.get(uri) -        blob = yield treq.text_content(blob_resp) +    def _download_and_decrypt(self, blob_id, doc_id, rev): +        uri = self.remote + blob_id +        buf = DecrypterBuffer(doc_id, rev, self.secret) +        data = yield treq.get(uri) +        yield treq.collect(data, buf.write) +        blob = yield buf.close()          defer.returnValue(blob) - -# --------------------8<---------------------------------------------- -class BlobDoc(object): - -    # TODO probably not needed, but convenient for testing for now. - -    def __init__(self, doc_id, rev, content, blob_id=None): - -        self.doc_id = doc_id -        self.rev = rev -        self.is_blob = True -        self.blob_fd = content -        if blob_id is None: -            blob_id = uuid4().get_hex() -        self.blob_id = blob_id -# --------------------8<---------------------------------------------- - -  class SQLiteBlobBackend(object):      def __init__(self, path, key=None): @@ -144,18 +165,24 @@ class SQLiteBlobBackend(object):          pragmafun = partial(pragmas.set_init_pragmas, opts=opts)          openfun = _sqlcipherInitFactory(pragmafun) -        self.dbpool = dbpool = adbapi.ConnectionPool( +        self.dbpool = dbpool = _ConnectionPool(              backend, self.path, check_same_thread=False, timeout=5,              cp_openfun=openfun, cp_min=1, cp_max=2, cp_name='blob_pool') -      @defer.inlineCallbacks -    def put(self, blob_id, blob_fd): -        insert = str('INSERT INTO blobs VALUES (?, ?)') -        print 'inserting...' -        raw = blob_fd.getvalue() -        yield self.dbpool.runQuery(insert, (blob_id, Binary(raw))) -         +    def put(self, blob_id, blob_fd, size=None): +        insert = str('INSERT INTO blobs VALUES (?, zeroblob(?))') +        yield self.dbpool.runQuery(insert, (blob_id, size)) +        cleartext = blob_fd.read() +        # FIXME --- I don't totally understand the parameters that are passed +        # to that call +        blob = self.dbpool.blob('blobs', 'payload', 'blob_id', blob_id) +        # TODO pass blob.write to a FileBodyProducer!!! +        # should be as simple as: +        # producer = BodyProducer(fd) +        # producer.startproducing(blob) +        blob.write(cleartext) +      @defer.inlineCallbacks      def get(self, blob_id):          select = 'SELECT payload FROM blobs WHERE blob_id = ?' @@ -164,7 +191,6 @@ class SQLiteBlobBackend(object):              defer.returnValue(BytesIO(str(result[0][0]))) -  def _init_blob_table(conn):      maybe_create = (          "CREATE TABLE IF NOT EXISTS " @@ -179,3 +205,21 @@ def _sqlcipherInitFactory(fun):          fun(conn)          _init_blob_table(conn)      return _initialize + + +# --------------------8<---------------------------------------------- +class BlobDoc(object): + +    # TODO probably not needed, but convenient for testing for now. + +    def __init__(self, doc_id, rev, content, blob_id=None): + +        self.doc_id = doc_id +        self.rev = rev +        self.is_blob = True +        self.blob_fd = content +        if blob_id is None: +            blob_id = uuid4().get_hex() +        self.blob_id = blob_id +# --------------------8<---------------------------------------------- + diff --git a/client/src/leap/soledad/client/_crypto.py b/client/src/leap/soledad/client/_crypto.py index c57f0921..8fc5154c 100644 --- a/client/src/leap/soledad/client/_crypto.py +++ b/client/src/leap/soledad/client/_crypto.py @@ -541,3 +541,14 @@ def _mode_by_method(method):          return modes.GCM      else:          return modes.CTR + + +def _ceiling(size): +    """ +    Some simplistic ceiling scheme that uses powers of 2. +    We report everything below 4096 bytes as that minimum threshold. +    """ +    for i in xrange(12, 31): +        step = 2**i +        if size < step: +            return step | 
