diff options
| -rw-r--r-- | client/src/leap/soledad/client/_blobs.py | 189 | ||||
| -rw-r--r-- | client/src/leap/soledad/client/_crypto.py | 42 | ||||
| -rw-r--r-- | server/src/leap/soledad/server/_blobs.py | 1 | 
3 files changed, 145 insertions, 87 deletions
| diff --git a/client/src/leap/soledad/client/_blobs.py b/client/src/leap/soledad/client/_blobs.py index 4581af60..f41f3cf4 100644 --- a/client/src/leap/soledad/client/_blobs.py +++ b/client/src/leap/soledad/client/_blobs.py @@ -2,6 +2,7 @@  Clientside BlobBackend Storage.  """ +from copy import copy  from uuid import uuid4  import os.path @@ -13,6 +14,7 @@ from sqlite3 import Binary  from twisted.logger import Logger  from twisted.enterprise import adbapi  from twisted.internet import defer, reactor +from twisted.web.client import FileBodyProducer  import treq @@ -36,22 +38,6 @@ class _ConnectionPool(adbapi.ConnectionPool):          return blob -""" -Ideally, the decrypting flow goes like this: - -- GET a blob from remote server. -- Decrypt the preamble -- Allocate a zeroblob in the sqlcipher sink -- Mark the blob as unusable (ie, not verified) -- Decrypt the payload incrementally, and write chunks to sqlcipher -  ** Is it possible to use a small buffer for the aes writer w/o  -  ** allocating all the memory in openssl? -- Finalize the AES decryption -- If preamble + payload verifies correctly, mark the blob as usable - -""" - -  class DecrypterBuffer(object):      def __init__(self, doc_id, rev, secret): @@ -80,42 +66,46 @@ class DecrypterBuffer(object):  class BlobManager(object): - -    def __init__(self, local_path, remote, key, secret): +    """ +    Ideally, the decrypting flow goes like this: + +    - GET a blob from remote server. +    - Decrypt the preamble +    - Allocate a zeroblob in the sqlcipher sink +    - Mark the blob as unusable (ie, not verified) +    - Decrypt the payload incrementally, and write chunks to sqlcipher +      ** Is it possible to use a small buffer for the aes writer w/o +      ** allocating all the memory in openssl? +    - Finalize the AES decryption +    - If preamble + payload verifies correctly, mark the blob as usable + +    """ + +    def __init__(self, local_path, remote, key, secret, user):          self.local = SQLiteBlobBackend(local_path, key)          self.remote = remote          self.secret = secret +        self.user = user      @defer.inlineCallbacks      def put(self, doc):          fd = doc.blob_fd +        # TODO this is a tee really, but ok... could do db and upload +        # concurrently. not sure if we'd gain something.          yield self.local.put(doc.blob_id, fd)          fd.seek(0) -        up = BytesIO() -        doc_info = DocInfo(doc.doc_id, doc.rev) - -        # TODO ------------------------------------------ -        # this is wrong, is doing 2 stages. -        # the crypto producer can be passed to  -        # the uploader and react as data is written. -        # ------------------------------------------------ -        yield self._encrypt(doc_info, fd, up) -        yield self._upload(doc.blob_id, up) +        yield self._encrypt_and_upload(doc.blob_id,  fd, up)      @defer.inlineCallbacks      def get(self, blob_id, doc_id, rev):          local_blob = yield self.local.get(blob_id)          if local_blob: -            print 'LOCAL BLOB', local_blob.getvalue() +            print "GOT LOCAL BLOB", local_blob              defer.returnValue(local_blob) -        print "NO LOCAL BLOB, WILL DOWNLOAD" -          blob, size = yield self._download_and_decrypt(blob_id, doc_id, rev) -        print "Downloading", blob_id -        print "BLOB", blob.getvalue(), "SIZE", size +        print "DOWNLOADED BLOB, SIZE:", size -        doc_info = DocInfo(doc_id, rev)          if blob:              print 'GOT DECRYPTED BLOB', type(blob)              print 'SAVING BLOB IN LOCAL STORE' @@ -124,30 +114,33 @@ class BlobManager(object):              blob.seek(0)              defer.returnValue(blob)          else: -        # XXX we shouldn't get here, but we will... -        # lots of ugly error handling possible: -        # 1. retry, might be network error -        # 2. try later, maybe didn't finished streaming -        # 3.. resignation, might be error while verifying +            # XXX we shouldn't get here, but we will... +            # lots of ugly error handling possible: +            # 1. retry, might be network error +            # 2. try later, maybe didn't finished streaming +            # 3.. resignation, might be error while verifying              logger.error('sorry, dunno what happened') - -    @defer.inlineCallbacks -    def _encrypt(self, doc_info, payload, result): -        # TODO pass armor=False when the decrypter is bug-free -        crypter = BlobEncryptor(doc_info, payload, result=result, secret=self.secret) -        yield crypter.encrypt() - -      @defer.inlineCallbacks -    def _upload(self, blob_id, payload_fd): -        uri = self.remote + 'put' -        yield treq.post(uri, data={'dafile_filename': blob_id}, -                        files={'dafile': payload_fd}) +    def _encrypt_and_upload(self, blob_id, doc_id, rev, payload): +        # TODO ------------------------------------------ +        # this is wrong, is doing 2 stages. +        # the crypto producer can be passed to  +        # the uploader and react as data is written. +        # try to rewrite as a tube: pass the fd to aes and let aes writer +        # produce data to the treq request fd. +        # ------------------------------------------------ +        doc_info = DocInfo(doc_id, rev) +        uri = self.remote + '/' + self.user + '/' + blob_id +        crypter = BlobEncryptor(doc_info, payload, secret=self.secret, +                                armor=True) +        result = yield crypter.encrypt() +        yield treq.put(uri, data=result)      @defer.inlineCallbacks      def _download_and_decrypt(self, blob_id, doc_id, rev): -        uri = self.remote + blob_id +        # TODO this needs to be connected in a tube +        uri = self.remote + self.user + '/' + blob_id          buf = DecrypterBuffer(doc_id, rev, self.secret)          data = yield treq.get(uri)          yield treq.collect(data, buf.write) @@ -167,7 +160,7 @@ class SQLiteBlobBackend(object):          pragmafun = partial(pragmas.set_init_pragmas, opts=opts)          openfun = _sqlcipherInitFactory(pragmafun) -        self.dbpool = dbpool = _ConnectionPool( +        self.dbpool = _ConnectionPool(              backend, self.path, check_same_thread=False, timeout=5,              cp_openfun=openfun, cp_min=1, cp_max=2, cp_name='blob_pool') @@ -175,18 +168,21 @@ class SQLiteBlobBackend(object):      def put(self, blob_id, blob_fd, size=None):          insert = str('INSERT INTO blobs VALUES (?, zeroblob(?))')          yield self.dbpool.runQuery(insert, (blob_id, size)) -        cleartext = blob_fd.read() -        # FIXME --- I don't totally understand the parameters that are passed -        # to that call -        blob = self.dbpool.blob('blobs', 'payload', 'blob_id', blob_id) -        # TODO pass blob.write to a FileBodyProducer!!! -        # should be as simple as: -        # producer = BodyProducer(fd) -        # producer.startproducing(blob) -        blob.write(cleartext) +        dbblob = self.dbpool.blob('blobs', 'payload', 'blob_id', blob_id) +        blob_fd.seek(0) +        # XXX I have to copy the buffer here so that I'm able to +        # return a non-closed file to the caller (blobmanager.get) +        # FIXME should remove this duplication! +        # have a look at how treq does cope with closing the handle +        # for uploading a file +        producer = FileBodyProducer(copy(blob_fd)) +        done = yield producer.startProducing(dbblob) +        defer.returnValue(done)      @defer.inlineCallbacks      def get(self, blob_id): +        # TODO we can also stream the blob value using sqlite +        # incremental interface for blobs - and just return the raw fd instead          select = 'SELECT payload FROM blobs WHERE blob_id = ?'          result = yield self.dbpool.runQuery(select, (blob_id,))          if result: @@ -208,20 +204,63 @@ def _sqlcipherInitFactory(fun):          _init_blob_table(conn)      return _initialize -  # --------------------8<---------------------------------------------- -class BlobDoc(object): - +#class BlobDoc(object): +#      # TODO probably not needed, but convenient for testing for now. - -    def __init__(self, doc_id, rev, content, blob_id=None): - -        self.doc_id = doc_id -        self.rev = rev -        self.is_blob = True -        self.blob_fd = content -        if blob_id is None: -            blob_id = uuid4().get_hex() -        self.blob_id = blob_id +# +    #def __init__(self, doc_id, rev, content, blob_id=None): +# +        #self.doc_id = doc_id +        #self.rev = rev +        #self.is_blob = True +        #self.blob_fd = content +        #if blob_id is None: +            #blob_id = uuid4().get_hex() +        #self.blob_id = blob_id  # --------------------8<---------------------------------------------- +@defer.inlineCallbacks +def testit(reactor): + +    # TODO convert this into proper unittests + +    import sys +    try: +        cmd = sys.argv[1] +    except: +        cmd = '' + +    if cmd == 'upload': +        src = sys.argv[2] +        blob_id = sys.argv[3] + +        doc_info = DocInfo('mydoc', '1') +        print "DOC INFO", doc_info + +        # I don't use BlobManager here because I need to avoid +        # putting the blob on local db on upload +        crypter = BlobEncryptor( +            doc_info, open(src, 'r'), 'A' * 32, armor=True) +        print "UPLOADING WITH ENCRYPTOR" +        result = yield crypter.encrypt() +        yield treq.put('http://localhost:9000/user/' + blob_id, data=result) + +    elif cmd == 'download': +        blob_id = sys.argv[2] +        manager = BlobManager( +            '/tmp/blobs', 'http://localhost:9000/', +            'A' * 32, 'secret', 'user') +        result = yield manager.get(blob_id, 'mydoc', '1') +        print result.getvalue() + +    else: +        print "Usage:" +        print "cd server/src/leap/soledad/server/ && python _blobs.py" +        print "python _blobs.py upload /path/to/file blob_id" +        print "python _blobs.py download blob_id" + + +if __name__ == '__main__': +    from twisted.internet.task import react +    react(testit) diff --git a/client/src/leap/soledad/client/_crypto.py b/client/src/leap/soledad/client/_crypto.py index f72571c2..3eef2f65 100644 --- a/client/src/leap/soledad/client/_crypto.py +++ b/client/src/leap/soledad/client/_crypto.py @@ -206,6 +206,8 @@ def decrypt_sym(data, key, iv, method=ENC_METHOD.aes_256_gcm):      return plaintext +# TODO maybe rename this to Encryptor, since it will be used by blobs an non +# blobs in soledad.  class BlobEncryptor(object):      """      Produces encrypted data from the cleartext data associated with a given @@ -218,7 +220,15 @@ class BlobEncryptor(object):      Both the production input and output are file descriptors, so they can be      applied to a stream of data.      """ -    def __init__(self, doc_info, content_fd, secret=None, armor=True): +    # TODO +    # This class needs further work to allow for proper streaming. +    # RIght now we HAVE TO WAIT until the end of the stream before encoding the +    # result. It should be possible to do that just encoding the chunks and +    # passing them to a sink, but for that we have to encode the chunks at +    # proper alignment (3 byes?) with b64 if armor is defined. + +    def __init__(self, doc_info, content_fd, secret=None, armor=True, +                 sink=None):          if not secret:              raise EncryptionDecryptionError('no secret given') @@ -227,22 +237,25 @@ class BlobEncryptor(object):          self.armor = armor          self._content_fd = content_fd -        content_fd.seek(0, os.SEEK_END) -        self._content_size = _ceiling(content_fd.tell()) -        content_fd.seek(0) +        self._content_size = self._get_size(content_fd)          self._producer = FileBodyProducer(content_fd, readSize=2**16)          self.sym_key = _get_sym_key_for_doc(doc_info.doc_id, secret) -        self._aes = AESWriter(self.sym_key) +        self._aes = AESWriter(self.sym_key, _buffer=sink)          self._aes.authenticate(self._encode_preamble()) +    def _get_size(self, fd): +        fd.seek(0, os.SEEK_END) +        size = _ceiling(fd.tell()) +        fd.seek(0) +        return size +      @property      def iv(self):          return self._aes.iv      @property      def tag(self): -        print "TAG?", binascii.b2a_hex(self._aes.tag)          return self._aes.tag      def encrypt(self): @@ -250,11 +263,12 @@ class BlobEncryptor(object):          Starts producing encrypted data from the cleartext data.          :return: A deferred which will be fired when encryption ends and whose -            callback will be invoked with the resulting ciphertext. +                 callback will be invoked with the resulting ciphertext.          :rtype: twisted.internet.defer.Deferred          """ +        # XXX pass a sink to aes?          d = self._producer.startProducing(self._aes) -        d.addCallback(lambda _: self._end_crypto_stream()) +        d.addCallback(lambda _: self._end_crypto_stream_and_encode_result())          return d      def _encode_preamble(self): @@ -271,12 +285,13 @@ class BlobEncryptor(object):              self._content_size)          return preamble -    def _end_crypto_stream(self): +    def _end_crypto_stream_and_encode_result(self):          # TODO ---- this needs to be refactored to allow PROPER streaming          # We should write the preamble as soon as possible,          # Is it possible to write the AES stream as soon as it is encrypted by          # chunks? +        # FIXME also, it needs to be able to encode chunks with base64 if armor          preamble, encrypted = self._aes.end()          result = BytesIO() @@ -337,6 +352,8 @@ class CryptoStreamBodyProducer(FileBodyProducer):              yield None +# TODO maybe rename this to just Decryptor, since it will be used by blobs an non +# blobs in soledad.  class BlobDecryptor(object):      """      Decrypts an encrypted blob associated with a given Document. @@ -416,7 +433,7 @@ class BlobDecryptor(object):          if rev != self.rev:              raise InvalidBlob('invalid revision')          if doc_id != self.doc_id: -            raise InvalidBlob('invalid revision') +            raise InvalidBlob('invalid doc id')          self.fd.seek(0)          tail = ''.join(parts[1:]) @@ -518,7 +535,8 @@ def is_symmetrically_encrypted(content):      :rtype: bool      """ -    return content and content[:13] == '{"raw": "EzcB' +    sym_signature = '{"raw": "EzcB' +    return content and content.startswith(sym_signature)  # utils @@ -552,6 +570,6 @@ def _ceiling(size):      See #8759 for research pending for less simplistic/aggresive strategies.      """      for i in xrange(12, 31): -        step = 2**i +        step = 2 ** i          if size < step:              return step diff --git a/server/src/leap/soledad/server/_blobs.py b/server/src/leap/soledad/server/_blobs.py index 03d37059..6ff0de4b 100644 --- a/server/src/leap/soledad/server/_blobs.py +++ b/server/src/leap/soledad/server/_blobs.py @@ -143,6 +143,7 @@ class BlobsResource(resource.Resource):      # under request.      def render_GET(self, request): +        print "GETTING", request.path          user, blob_id = self._split_path(request.path)          return self._handler.read_blob(user, blob_id, request) | 
