summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--client/src/leap/soledad/client/_blobs.py189
-rw-r--r--client/src/leap/soledad/client/_crypto.py42
-rw-r--r--server/src/leap/soledad/server/_blobs.py1
3 files changed, 145 insertions, 87 deletions
diff --git a/client/src/leap/soledad/client/_blobs.py b/client/src/leap/soledad/client/_blobs.py
index 4581af60..f41f3cf4 100644
--- a/client/src/leap/soledad/client/_blobs.py
+++ b/client/src/leap/soledad/client/_blobs.py
@@ -2,6 +2,7 @@
Clientside BlobBackend Storage.
"""
+from copy import copy
from uuid import uuid4
import os.path
@@ -13,6 +14,7 @@ from sqlite3 import Binary
from twisted.logger import Logger
from twisted.enterprise import adbapi
from twisted.internet import defer, reactor
+from twisted.web.client import FileBodyProducer
import treq
@@ -36,22 +38,6 @@ class _ConnectionPool(adbapi.ConnectionPool):
return blob
-"""
-Ideally, the decrypting flow goes like this:
-
-- GET a blob from remote server.
-- Decrypt the preamble
-- Allocate a zeroblob in the sqlcipher sink
-- Mark the blob as unusable (ie, not verified)
-- Decrypt the payload incrementally, and write chunks to sqlcipher
- ** Is it possible to use a small buffer for the aes writer w/o
- ** allocating all the memory in openssl?
-- Finalize the AES decryption
-- If preamble + payload verifies correctly, mark the blob as usable
-
-"""
-
-
class DecrypterBuffer(object):
def __init__(self, doc_id, rev, secret):
@@ -80,42 +66,46 @@ class DecrypterBuffer(object):
class BlobManager(object):
-
- def __init__(self, local_path, remote, key, secret):
+ """
+ Ideally, the decrypting flow goes like this:
+
+ - GET a blob from remote server.
+ - Decrypt the preamble
+ - Allocate a zeroblob in the sqlcipher sink
+ - Mark the blob as unusable (ie, not verified)
+ - Decrypt the payload incrementally, and write chunks to sqlcipher
+ ** Is it possible to use a small buffer for the aes writer w/o
+ ** allocating all the memory in openssl?
+ - Finalize the AES decryption
+ - If preamble + payload verifies correctly, mark the blob as usable
+
+ """
+
+ def __init__(self, local_path, remote, key, secret, user):
self.local = SQLiteBlobBackend(local_path, key)
self.remote = remote
self.secret = secret
+ self.user = user
@defer.inlineCallbacks
def put(self, doc):
fd = doc.blob_fd
+ # TODO this is a tee really, but ok... could do db and upload
+ # concurrently. not sure if we'd gain something.
yield self.local.put(doc.blob_id, fd)
fd.seek(0)
- up = BytesIO()
- doc_info = DocInfo(doc.doc_id, doc.rev)
-
- # TODO ------------------------------------------
- # this is wrong, is doing 2 stages.
- # the crypto producer can be passed to
- # the uploader and react as data is written.
- # ------------------------------------------------
- yield self._encrypt(doc_info, fd, up)
- yield self._upload(doc.blob_id, up)
+ yield self._encrypt_and_upload(doc.blob_id, fd, up)
@defer.inlineCallbacks
def get(self, blob_id, doc_id, rev):
local_blob = yield self.local.get(blob_id)
if local_blob:
- print 'LOCAL BLOB', local_blob.getvalue()
+ print "GOT LOCAL BLOB", local_blob
defer.returnValue(local_blob)
- print "NO LOCAL BLOB, WILL DOWNLOAD"
-
blob, size = yield self._download_and_decrypt(blob_id, doc_id, rev)
- print "Downloading", blob_id
- print "BLOB", blob.getvalue(), "SIZE", size
+ print "DOWNLOADED BLOB, SIZE:", size
- doc_info = DocInfo(doc_id, rev)
if blob:
print 'GOT DECRYPTED BLOB', type(blob)
print 'SAVING BLOB IN LOCAL STORE'
@@ -124,30 +114,33 @@ class BlobManager(object):
blob.seek(0)
defer.returnValue(blob)
else:
- # XXX we shouldn't get here, but we will...
- # lots of ugly error handling possible:
- # 1. retry, might be network error
- # 2. try later, maybe didn't finished streaming
- # 3.. resignation, might be error while verifying
+ # XXX we shouldn't get here, but we will...
+ # lots of ugly error handling possible:
+ # 1. retry, might be network error
+ # 2. try later, maybe didn't finished streaming
+ # 3.. resignation, might be error while verifying
logger.error('sorry, dunno what happened')
-
- @defer.inlineCallbacks
- def _encrypt(self, doc_info, payload, result):
- # TODO pass armor=False when the decrypter is bug-free
- crypter = BlobEncryptor(doc_info, payload, result=result, secret=self.secret)
- yield crypter.encrypt()
-
-
@defer.inlineCallbacks
- def _upload(self, blob_id, payload_fd):
- uri = self.remote + 'put'
- yield treq.post(uri, data={'dafile_filename': blob_id},
- files={'dafile': payload_fd})
+ def _encrypt_and_upload(self, blob_id, doc_id, rev, payload):
+ # TODO ------------------------------------------
+ # this is wrong, is doing 2 stages.
+ # the crypto producer can be passed to
+ # the uploader and react as data is written.
+ # try to rewrite as a tube: pass the fd to aes and let aes writer
+ # produce data to the treq request fd.
+ # ------------------------------------------------
+ doc_info = DocInfo(doc_id, rev)
+ uri = self.remote + '/' + self.user + '/' + blob_id
+ crypter = BlobEncryptor(doc_info, payload, secret=self.secret,
+ armor=True)
+ result = yield crypter.encrypt()
+ yield treq.put(uri, data=result)
@defer.inlineCallbacks
def _download_and_decrypt(self, blob_id, doc_id, rev):
- uri = self.remote + blob_id
+ # TODO this needs to be connected in a tube
+ uri = self.remote + self.user + '/' + blob_id
buf = DecrypterBuffer(doc_id, rev, self.secret)
data = yield treq.get(uri)
yield treq.collect(data, buf.write)
@@ -167,7 +160,7 @@ class SQLiteBlobBackend(object):
pragmafun = partial(pragmas.set_init_pragmas, opts=opts)
openfun = _sqlcipherInitFactory(pragmafun)
- self.dbpool = dbpool = _ConnectionPool(
+ self.dbpool = _ConnectionPool(
backend, self.path, check_same_thread=False, timeout=5,
cp_openfun=openfun, cp_min=1, cp_max=2, cp_name='blob_pool')
@@ -175,18 +168,21 @@ class SQLiteBlobBackend(object):
def put(self, blob_id, blob_fd, size=None):
insert = str('INSERT INTO blobs VALUES (?, zeroblob(?))')
yield self.dbpool.runQuery(insert, (blob_id, size))
- cleartext = blob_fd.read()
- # FIXME --- I don't totally understand the parameters that are passed
- # to that call
- blob = self.dbpool.blob('blobs', 'payload', 'blob_id', blob_id)
- # TODO pass blob.write to a FileBodyProducer!!!
- # should be as simple as:
- # producer = BodyProducer(fd)
- # producer.startproducing(blob)
- blob.write(cleartext)
+ dbblob = self.dbpool.blob('blobs', 'payload', 'blob_id', blob_id)
+ blob_fd.seek(0)
+ # XXX I have to copy the buffer here so that I'm able to
+ # return a non-closed file to the caller (blobmanager.get)
+ # FIXME should remove this duplication!
+ # have a look at how treq does cope with closing the handle
+ # for uploading a file
+ producer = FileBodyProducer(copy(blob_fd))
+ done = yield producer.startProducing(dbblob)
+ defer.returnValue(done)
@defer.inlineCallbacks
def get(self, blob_id):
+ # TODO we can also stream the blob value using sqlite
+ # incremental interface for blobs - and just return the raw fd instead
select = 'SELECT payload FROM blobs WHERE blob_id = ?'
result = yield self.dbpool.runQuery(select, (blob_id,))
if result:
@@ -208,20 +204,63 @@ def _sqlcipherInitFactory(fun):
_init_blob_table(conn)
return _initialize
-
# --------------------8<----------------------------------------------
-class BlobDoc(object):
-
+#class BlobDoc(object):
+#
# TODO probably not needed, but convenient for testing for now.
-
- def __init__(self, doc_id, rev, content, blob_id=None):
-
- self.doc_id = doc_id
- self.rev = rev
- self.is_blob = True
- self.blob_fd = content
- if blob_id is None:
- blob_id = uuid4().get_hex()
- self.blob_id = blob_id
+#
+ #def __init__(self, doc_id, rev, content, blob_id=None):
+#
+ #self.doc_id = doc_id
+ #self.rev = rev
+ #self.is_blob = True
+ #self.blob_fd = content
+ #if blob_id is None:
+ #blob_id = uuid4().get_hex()
+ #self.blob_id = blob_id
# --------------------8<----------------------------------------------
+@defer.inlineCallbacks
+def testit(reactor):
+
+ # TODO convert this into proper unittests
+
+ import sys
+ try:
+ cmd = sys.argv[1]
+ except:
+ cmd = ''
+
+ if cmd == 'upload':
+ src = sys.argv[2]
+ blob_id = sys.argv[3]
+
+ doc_info = DocInfo('mydoc', '1')
+ print "DOC INFO", doc_info
+
+ # I don't use BlobManager here because I need to avoid
+ # putting the blob on local db on upload
+ crypter = BlobEncryptor(
+ doc_info, open(src, 'r'), 'A' * 32, armor=True)
+ print "UPLOADING WITH ENCRYPTOR"
+ result = yield crypter.encrypt()
+ yield treq.put('http://localhost:9000/user/' + blob_id, data=result)
+
+ elif cmd == 'download':
+ blob_id = sys.argv[2]
+ manager = BlobManager(
+ '/tmp/blobs', 'http://localhost:9000/',
+ 'A' * 32, 'secret', 'user')
+ result = yield manager.get(blob_id, 'mydoc', '1')
+ print result.getvalue()
+
+ else:
+ print "Usage:"
+ print "cd server/src/leap/soledad/server/ && python _blobs.py"
+ print "python _blobs.py upload /path/to/file blob_id"
+ print "python _blobs.py download blob_id"
+
+
+if __name__ == '__main__':
+ from twisted.internet.task import react
+ react(testit)
diff --git a/client/src/leap/soledad/client/_crypto.py b/client/src/leap/soledad/client/_crypto.py
index f72571c2..3eef2f65 100644
--- a/client/src/leap/soledad/client/_crypto.py
+++ b/client/src/leap/soledad/client/_crypto.py
@@ -206,6 +206,8 @@ def decrypt_sym(data, key, iv, method=ENC_METHOD.aes_256_gcm):
return plaintext
+# TODO maybe rename this to Encryptor, since it will be used by blobs an non
+# blobs in soledad.
class BlobEncryptor(object):
"""
Produces encrypted data from the cleartext data associated with a given
@@ -218,7 +220,15 @@ class BlobEncryptor(object):
Both the production input and output are file descriptors, so they can be
applied to a stream of data.
"""
- def __init__(self, doc_info, content_fd, secret=None, armor=True):
+ # TODO
+ # This class needs further work to allow for proper streaming.
+ # RIght now we HAVE TO WAIT until the end of the stream before encoding the
+ # result. It should be possible to do that just encoding the chunks and
+ # passing them to a sink, but for that we have to encode the chunks at
+ # proper alignment (3 byes?) with b64 if armor is defined.
+
+ def __init__(self, doc_info, content_fd, secret=None, armor=True,
+ sink=None):
if not secret:
raise EncryptionDecryptionError('no secret given')
@@ -227,22 +237,25 @@ class BlobEncryptor(object):
self.armor = armor
self._content_fd = content_fd
- content_fd.seek(0, os.SEEK_END)
- self._content_size = _ceiling(content_fd.tell())
- content_fd.seek(0)
+ self._content_size = self._get_size(content_fd)
self._producer = FileBodyProducer(content_fd, readSize=2**16)
self.sym_key = _get_sym_key_for_doc(doc_info.doc_id, secret)
- self._aes = AESWriter(self.sym_key)
+ self._aes = AESWriter(self.sym_key, _buffer=sink)
self._aes.authenticate(self._encode_preamble())
+ def _get_size(self, fd):
+ fd.seek(0, os.SEEK_END)
+ size = _ceiling(fd.tell())
+ fd.seek(0)
+ return size
+
@property
def iv(self):
return self._aes.iv
@property
def tag(self):
- print "TAG?", binascii.b2a_hex(self._aes.tag)
return self._aes.tag
def encrypt(self):
@@ -250,11 +263,12 @@ class BlobEncryptor(object):
Starts producing encrypted data from the cleartext data.
:return: A deferred which will be fired when encryption ends and whose
- callback will be invoked with the resulting ciphertext.
+ callback will be invoked with the resulting ciphertext.
:rtype: twisted.internet.defer.Deferred
"""
+ # XXX pass a sink to aes?
d = self._producer.startProducing(self._aes)
- d.addCallback(lambda _: self._end_crypto_stream())
+ d.addCallback(lambda _: self._end_crypto_stream_and_encode_result())
return d
def _encode_preamble(self):
@@ -271,12 +285,13 @@ class BlobEncryptor(object):
self._content_size)
return preamble
- def _end_crypto_stream(self):
+ def _end_crypto_stream_and_encode_result(self):
# TODO ---- this needs to be refactored to allow PROPER streaming
# We should write the preamble as soon as possible,
# Is it possible to write the AES stream as soon as it is encrypted by
# chunks?
+ # FIXME also, it needs to be able to encode chunks with base64 if armor
preamble, encrypted = self._aes.end()
result = BytesIO()
@@ -337,6 +352,8 @@ class CryptoStreamBodyProducer(FileBodyProducer):
yield None
+# TODO maybe rename this to just Decryptor, since it will be used by blobs an non
+# blobs in soledad.
class BlobDecryptor(object):
"""
Decrypts an encrypted blob associated with a given Document.
@@ -416,7 +433,7 @@ class BlobDecryptor(object):
if rev != self.rev:
raise InvalidBlob('invalid revision')
if doc_id != self.doc_id:
- raise InvalidBlob('invalid revision')
+ raise InvalidBlob('invalid doc id')
self.fd.seek(0)
tail = ''.join(parts[1:])
@@ -518,7 +535,8 @@ def is_symmetrically_encrypted(content):
:rtype: bool
"""
- return content and content[:13] == '{"raw": "EzcB'
+ sym_signature = '{"raw": "EzcB'
+ return content and content.startswith(sym_signature)
# utils
@@ -552,6 +570,6 @@ def _ceiling(size):
See #8759 for research pending for less simplistic/aggresive strategies.
"""
for i in xrange(12, 31):
- step = 2**i
+ step = 2 ** i
if size < step:
return step
diff --git a/server/src/leap/soledad/server/_blobs.py b/server/src/leap/soledad/server/_blobs.py
index 03d37059..6ff0de4b 100644
--- a/server/src/leap/soledad/server/_blobs.py
+++ b/server/src/leap/soledad/server/_blobs.py
@@ -143,6 +143,7 @@ class BlobsResource(resource.Resource):
# under request.
def render_GET(self, request):
+ print "GETTING", request.path
user, blob_id = self._split_path(request.path)
return self._handler.read_blob(user, blob_id, request)