diff options
-rw-r--r-- | client/src/leap/soledad/client/_crypto.py | 45 | ||||
-rw-r--r-- | testing/tests/client/test_crypto.py | 32 |
2 files changed, 29 insertions, 48 deletions
diff --git a/client/src/leap/soledad/client/_crypto.py b/client/src/leap/soledad/client/_crypto.py index 4a59159c..109cf299 100644 --- a/client/src/leap/soledad/client/_crypto.py +++ b/client/src/leap/soledad/client/_crypto.py @@ -146,12 +146,11 @@ def encrypt_sym(data, key): encoded as base64. :rtype: (str, str) """ - iv = os.urandom(16) - encryptor = AESEncryptor(key, iv) + encryptor = AESEncryptor(key) encryptor.write(data) encryptor.end() ciphertext = encryptor.fd.getvalue() - return base64.b64encode(iv), ciphertext + return base64.b64encode(encryptor.iv), ciphertext def decrypt_sym(data, key, iv): @@ -188,13 +187,7 @@ class BlobEncryptor(object): Both the production input and output are file descriptors, so they can be applied to a stream of data. """ - def __init__(self, doc_info, content_fd, result=None, secret=None, - iv=None): - if iv is None: - iv = os.urandom(16) - else: - log.warn('Using a fixed IV. Use only for testing!') - self.iv = iv + def __init__(self, doc_info, content_fd, result=None, secret=None): if not secret: raise EncryptionDecryptionError('no secret given') @@ -206,20 +199,22 @@ class BlobEncryptor(object): self._content_fd = content_fd self._preamble = BytesIO() - if result is None: - result = BytesIO() - self.result = result + self.result = result or BytesIO() sym_key = _get_sym_key_for_doc(doc_info.doc_id, secret) mac_key = _get_mac_key_for_doc(doc_info.doc_id, secret) self._aes_fd = BytesIO() - self._aes = AESEncryptor(sym_key, self.iv, self._aes_fd) + self._aes = AESEncryptor(sym_key, self._aes_fd) self._hmac = HMACWriter(mac_key) self._write_preamble() self._crypter = VerifiedEncrypter(self._aes, self._hmac) + @property + def iv(self): + return self._aes.iv + def encrypt(self): """ Starts producing encrypted data from the cleartext data. @@ -298,9 +293,7 @@ class BlobDecryptor(object): self.sym_key = _get_sym_key_for_doc(doc_info.doc_id, secret) self.mac_key = _get_mac_key_for_doc(doc_info.doc_id, secret) - if result is None: - result = BytesIO() - self.result = result + self.result = result or BytesIO() def decrypt(self): try: @@ -360,18 +353,15 @@ class AESEncryptor(object): """ implements(interfaces.IConsumer) - def __init__(self, key, iv, fd=None): + def __init__(self, key, fd=None): if len(key) != 32: raise EncryptionDecryptionError('key is not 256 bits') - if len(iv) != 16: - raise EncryptionDecryptionError('iv is not 128 bits') + self.iv = os.urandom(16) - cipher = _get_aes_ctr_cipher(key, iv) + cipher = _get_aes_ctr_cipher(key, self.iv) self.encryptor = cipher.encryptor() - if fd is None: - fd = BytesIO() - self.fd = fd + self.fd = fd or BytesIO() self.done = False @@ -429,8 +419,7 @@ class AESDecryptor(object): implements(interfaces.IConsumer) def __init__(self, key, iv, fd=None): - if iv is None: - iv = os.urandom(16) + iv = iv or os.urandom(16) if len(key) != 32: raise EncryptionDecryptionError('key is not 256 bits') if len(iv) != 16: @@ -439,9 +428,7 @@ class AESDecryptor(object): cipher = _get_aes_ctr_cipher(key, iv) self.decryptor = cipher.decryptor() - if fd is None: - fd = BytesIO() - self.fd = fd + self.fd = fd or BytesIO() self.done = False self.deferred = defer.Deferred() diff --git a/testing/tests/client/test_crypto.py b/testing/tests/client/test_crypto.py index 483c7803..6d896604 100644 --- a/testing/tests/client/test_crypto.py +++ b/testing/tests/client/test_crypto.py @@ -51,10 +51,10 @@ class AESTest(unittest.TestCase): def test_chunked_encryption(self): key = 'A' * 32 - iv = 'A' * 16 fd = BytesIO() - aes = _crypto.AESEncryptor(key, iv, fd) + aes = _crypto.AESEncryptor(key, fd) + iv = aes.iv data = snowden1 block = 16 @@ -99,14 +99,11 @@ class BlobTestCase(unittest.TestCase): @defer.inlineCallbacks def test_blob_encryptor(self): - inf = BytesIO() - inf.write(snowden1) - inf.seek(0) - outf = BytesIO() + inf = BytesIO(snowden1) blob = _crypto.BlobEncryptor( - self.doc_info, inf, result=outf, - secret='A' * 96, iv='B' * 16) + self.doc_info, inf, + secret='A' * 96) encrypted = yield blob.encrypt() data = base64.urlsafe_b64decode(encrypted.getvalue()) @@ -117,7 +114,7 @@ class BlobTestCase(unittest.TestCase): assert sch == 1 assert meth == 1 iv = data[11:27] - assert iv == 'B' * 16 + assert iv == blob.iv doc_id = data[27:37] assert doc_id == 'D-deadbeef' @@ -127,26 +124,23 @@ class BlobTestCase(unittest.TestCase): ciphertext = data[71:-64] aes_key = _crypto._get_sym_key_for_doc( self.doc_info.doc_id, 'A' * 96) - assert ciphertext == _aes_encrypt(aes_key, 'B' * 16, snowden1) + assert ciphertext == _aes_encrypt(aes_key, blob.iv, snowden1) - decrypted = _aes_decrypt(aes_key, 'B' * 16, ciphertext) + decrypted = _aes_decrypt(aes_key, blob.iv, ciphertext) assert str(decrypted) == snowden1 @defer.inlineCallbacks def test_blob_decryptor(self): - inf = BytesIO() - inf.write(snowden1) - inf.seek(0) - outf = BytesIO() + inf = BytesIO(snowden1) blob = _crypto.BlobEncryptor( - self.doc_info, inf, result=outf, - secret='A' * 96, iv='B' * 16) - yield blob.encrypt() + self.doc_info, inf, + secret='A' * 96) + ciphertext = yield blob.encrypt() decryptor = _crypto.BlobDecryptor( - self.doc_info, outf, + self.doc_info, ciphertext, secret='A' * 96) decrypted = yield decryptor.decrypt() assert decrypted.getvalue() == snowden1 |