From e8fbdbb58693e0901032b47d9b2fa4a8ace1d8af Mon Sep 17 00:00:00 2001 From: Kali Kaneko Date: Fri, 17 Feb 2017 15:50:53 +0100 Subject: [refactor] refactor crypto api to better allow streaming Motivation is that I need to pass partial data to the decryptor, mainly. --- client/src/leap/soledad/client/_crypto.py | 190 ++++++++++++++++++++++++++---- 1 file changed, 166 insertions(+), 24 deletions(-) (limited to 'client/src/leap') diff --git a/client/src/leap/soledad/client/_crypto.py b/client/src/leap/soledad/client/_crypto.py index f9a20285..b9f9030f 100644 --- a/client/src/leap/soledad/client/_crypto.py +++ b/client/src/leap/soledad/client/_crypto.py @@ -16,9 +16,41 @@ # along with this program. If not, see . """ -Cryptographic operations for the soledad client +Cryptographic operations for the soledad client. + +This module implements streaming crypto operations. +It replaces the old client.crypto module, that will be deprecated in soledad +0.12. + +The algorithm for encryptig and decrypting is as follow: + +The KEY is a 32 bytes value. +The PREAMBLE is a packed_structure with encryption metadata. +The SEPARATOR is a space. + +Encryption +---------- + +ciphertext = b64_encode(packed_preamble) + + SEPARATOR + + b64(AES_GCM(ciphertext) + tag) + + +Decryption +---------- + +PREAMBLE + SEPARATOR + PAYLOAD + +Ciphertext and Tag CAN be encoded in b64 (armor=True) or raw (False) + +check_preamble(b64_decode(ciphertext.split(SEPARATOR)[0]) + +PAYLOAD = ciphertext + tag + +decrypt(PAYLOAD) """ + import binascii import base64 import hashlib @@ -47,6 +79,7 @@ from zope.interface import implementer SECRET_LENGTH = 64 +SEPARATOR = ' ' CRYPTO_BACKEND = MultiBackend([OpenSSLBackend()]) @@ -183,12 +216,14 @@ class BlobEncryptor(object): Both the production input and output are file descriptors, so they can be applied to a stream of data. """ - def __init__(self, doc_info, content_fd, secret=None): + def __init__(self, doc_info, content_fd, secret=None, armor=True): if not secret: raise EncryptionDecryptionError('no secret given') self.doc_id = doc_info.doc_id self.rev = doc_info.rev + self.armor = armor + self._content_fd = content_fd content_fd.seek(0, os.SEEK_END) self._content_size = content_fd.tell() @@ -205,6 +240,7 @@ class BlobEncryptor(object): @property def tag(self): + print "TAG?", binascii.b2a_hex(self._aes.tag) return self._aes.tag def encrypt(self): @@ -222,7 +258,7 @@ class BlobEncryptor(object): def _encode_preamble(self): current_time = int(time.time()) - return PACMAN.pack( + preamble = PACMAN.pack( BLOB_SIGNATURE_MAGIC, ENC_SCHEME.symkey, ENC_METHOD.aes_256_gcm, @@ -231,18 +267,74 @@ class BlobEncryptor(object): str(self.doc_id), str(self.rev), self._content_size) + return preamble def _end_crypto_stream(self): + + # TODO ---- this needs to be refactored to allow PROPER streaming + # We should write the preamble as soon as possible, + # Is it possible to write the AES stream as soon as it is encrypted by + # chunks? + preamble, encrypted = self._aes.end() result = BytesIO() result.write( base64.urlsafe_b64encode(preamble)) - result.write(' ') - result.write( - base64.urlsafe_b64encode(encrypted + self.tag)) + result.write(SEPARATOR) + + if self.armor: + result.write( + base64.urlsafe_b64encode(encrypted + self.tag)) + else: + result.write(encrypted) + result.write(self.tag) + + result.seek(0) return defer.succeed(result) +class CryptoStreamBodyProducer(FileBodyProducer): + + """ + A BodyProducer that gets the tag from the last 16 bytes before closing the + fd. + """ + _tag = None + + @property + def tag(self): + # XXX this is a bit tricky. If you call this + # before the end of the stream, you will ruin everything + if not self._tag: + self._writeTag() + return self._tag + + def _writeTag(self): + fd = self._inputFile + fd.seek(-16, os.SEEK_END) + self._tag = fd.read(16) + fd.seek(0) + + def stopProducing(self): + self._writeTag() + self._inputFile.close() + self._task.stop() + + def _writeloop(self, consumer): + """ + Return an iterator which reads one chunk of bytes from the input file + and writes them to the consumer for each time it is iterated. + """ + while True: + bytes = self._inputFile.read(self._readSize) + if not bytes: + self._writeTag() + self._inputFile.close() + break + consumer.write(base64.urlsafe_b64decode(bytes)) + yield None + + class BlobDecryptor(object): """ Decrypts an encrypted blob associated with a given Document. @@ -250,32 +342,47 @@ class BlobDecryptor(object): Will raise an exception if the blob doesn't have the expected structure, or if the GCM tag doesn't verify. """ + # TODO enable the ascii armor = False def __init__(self, doc_info, ciphertext_fd, result=None, - secret=None): + secret=None, armor=True, start_stream=True): if not secret: raise EncryptionDecryptionError('no secret given') + if armor is False: + raise NotImplementedError self.doc_id = doc_info.doc_id self.rev = doc_info.rev + self.fd = ciphertext_fd + self.armor = armor + self._producer = None - ciphertext_fd, preamble, iv = self._consume_preamble(ciphertext_fd) + preamble, iv = self._consume_preamble() + assert preamble + assert iv self.result = result or BytesIO() sym_key = _get_sym_key_for_doc(doc_info.doc_id, secret) - self._aes = AESWriter(sym_key, iv, self.result, tag=self.tag) + + self._aes = AESWriter(sym_key, iv, self.result, tag=None) self._aes.authenticate(preamble) + if start_stream: + self._start_stream() + + def _start_stream(self): + self._producer = CryptoStreamBodyProducer(self.fd, readSize=2**16) + self._producer.armor = self.armor - self._producer = FileBodyProducer(ciphertext_fd, readSize=2**16) + def _consume_preamble(self): - def _consume_preamble(self, ciphertext_fd): - ciphertext_fd.seek(0) + self.fd.seek(0) try: - preamble, ciphertext = _split(ciphertext_fd.getvalue()) - self.tag, ciphertext = ciphertext[-16:], ciphertext[:-16] - except (TypeError, binascii.Error): + parts = self.fd.getvalue().split() + encoded_preamble = parts[0] + preamble = base64.urlsafe_b64decode(encoded_preamble) + + except (TypeError, ValueError) as exc: raise InvalidBlob - ciphertext_fd.close() try: if len(preamble) == LEGACY_PACMAN.size: @@ -295,7 +402,9 @@ class BlobDecryptor(object): if magic != BLOB_SIGNATURE_MAGIC: raise InvalidBlob - # TODO check timestamp + # TODO check timestamp. Just as a sanity check, but for instance + # we can refuse to process something that is in the future or + # too far in the past (1984 would be nice, hehe) if sch != ENC_SCHEME.symkey: raise InvalidBlob('invalid scheme') if meth != ENC_METHOD.aes_256_gcm: @@ -304,13 +413,25 @@ class BlobDecryptor(object): raise InvalidBlob('invalid revision') if doc_id != self.doc_id: raise InvalidBlob('invalid revision') - return BytesIO(ciphertext), preamble, iv + + self.fd.seek(0) + tail = ''.join(parts[1:]) + self.fd.write(tail) + self.fd.seek(len(tail)) + self.fd.truncate() + self.fd.seek(0) + return preamble, iv def _end_stream(self): try: - return self._aes.end()[1] + self._aes.end()[1] except InvalidTag: raise InvalidBlob('Invalid Tag. Blob authentication failed.') + fd = self.result + fd.seek(-16, os.SEEK_END) + fd.truncate() + fd.seek(0) + return self.result def decrypt(self): """ @@ -320,21 +441,46 @@ class BlobDecryptor(object): callback will be invoked with the resulting ciphertext. :rtype: twisted.internet.defer.Deferred """ - d = self._producer.startProducing(self._aes) + d = self.startProducing() d.addCallback(lambda _: self._end_stream()) return d + def startProducing(self): + if not self._producer: + self._start_stream() + return self._producer.startProducing(self._aes) + + def endStream(self): + self._end_stream() + + def write(self, data): + self._aes.write(data) + + def close(self): + result = self._aes.end() + return result + @implementer(interfaces.IConsumer) class AESWriter(object): """ A Twisted's Consumer implementation that takes an input file descriptor and applies AES-256 cipher in GCM mode. + + It is used both for encryption and decryption of a stream, depending of the + value of the tag parameter. If you pass a tag, it will operate in + decryption mode, authenticating the preamble. If no tag is passed, + encryption mode is assumed. """ def __init__(self, key, iv=None, _buffer=None, tag=None, mode=modes.GCM): if len(key) != 32: raise EncryptionDecryptionError('key is not 256 bits') + + if tag is not None: + # if tag, we're decrypting + assert iv is not None + self.iv = iv or os.urandom(16) self.buffer = _buffer or BytesIO() cipher = _get_aes_cipher(key, self.iv, tag, mode) @@ -388,10 +534,6 @@ def _get_aes_cipher(key, iv, tag, mode=modes.GCM): return Cipher(algorithms.AES(key), mode, backend=CRYPTO_BACKEND) -def _split(base64_raw_payload): - return imap(base64.urlsafe_b64decode, re.split(' ', base64_raw_payload)) - - def _mode_by_method(method): if method == ENC_METHOD.aes_256_gcm: return modes.GCM -- cgit v1.2.3