diff options
-rw-r--r-- | client/src/leap/soledad/client/_crypto.py | 30 | ||||
-rw-r--r-- | testing/tests/client/test_crypto.py | 52 |
2 files changed, 49 insertions, 33 deletions
diff --git a/client/src/leap/soledad/client/_crypto.py b/client/src/leap/soledad/client/_crypto.py index 8dbdc094..923891fc 100644 --- a/client/src/leap/soledad/client/_crypto.py +++ b/client/src/leap/soledad/client/_crypto.py @@ -300,8 +300,7 @@ class BlobEncryptor(object): result.write( base64.urlsafe_b64encode(encrypted + self.tag)) else: - result.write(encrypted) - result.write(self.tag) + result.write(encrypted + self.tag) result.seek(0) return defer.succeed(result) @@ -319,28 +318,25 @@ class BlobDecryptor(object): # TODO enable the ascii armor = False def __init__(self, doc_info, ciphertext_fd, result=None, - secret=None, armor=True, start_stream=True): + secret=None, armor=True, start_stream=True, tag=None): if not secret: raise EncryptionDecryptionError('no secret given') - if armor is False: - raise NotImplementedError self.doc_id = doc_info.doc_id self.rev = doc_info.rev self.fd = ciphertext_fd self.armor = armor self._producer = None + self.result = result or BytesIO() + self.sym_key = _get_sym_key_for_doc(doc_info.doc_id, secret) self.size = None - + self.tag = tag preamble, iv = self._consume_preamble() assert preamble assert iv - - self.result = result or BytesIO() - sym_key = _get_sym_key_for_doc(doc_info.doc_id, secret) - - self._aes = AESWriter(sym_key, iv, self.result, tag=self.tag) + self._aes = AESWriter(self.sym_key, iv, self.result, tag=self.tag) self._aes.authenticate(preamble) + if start_stream: self._start_stream() @@ -348,14 +344,14 @@ class BlobDecryptor(object): self._producer = FileBodyProducer(self.fd, readSize=2**16) def _consume_preamble(self): - self.fd.seek(0) try: - parts = self.fd.getvalue().split() - encoded_preamble = parts[0] - preamble = base64.urlsafe_b64decode(encoded_preamble) - ciphertext = base64.urlsafe_b64decode(parts[1]) - self.tag, ciphertext = ciphertext[-16:], ciphertext[:-16] + preamble, ciphertext = self.fd.getvalue().split(SEPARATOR, 1) + preamble = base64.urlsafe_b64decode(preamble) + if self.armor: + ciphertext = base64.urlsafe_b64decode(ciphertext) + tag, ciphertext = ciphertext[-16:], ciphertext[:-16] + self.tag = self.tag or tag except (TypeError, ValueError): raise InvalidBlob diff --git a/testing/tests/client/test_crypto.py b/testing/tests/client/test_crypto.py index e268428d..44e1b2a5 100644 --- a/testing/tests/client/test_crypto.py +++ b/testing/tests/client/test_crypto.py @@ -95,16 +95,30 @@ class BlobTestCase(unittest.TestCase): doc_id = 'D-deadbeef' rev = '397932e0c77f45fcb7c3732930e7e9b2:1' - @defer.inlineCallbacks - def test_blob_encryptor(self): + def setUp(self): + self.inf = BytesIO(snowden1) + self.blob = _crypto.BlobEncryptor( + self.doc_info, self.inf, + armor=True, + secret='A' * 96) - inf = BytesIO(snowden1) + @defer.inlineCallbacks + def test_unarmored_blob_encrypt(self): + self.blob.armor = False + encrypted = yield self.blob.encrypt() + decode = base64.urlsafe_b64decode + with pytest.raises(TypeError): + assert map(decode, encrypted.getvalue().split()) - blob = _crypto.BlobEncryptor( - self.doc_info, inf, - secret='A' * 96) + @defer.inlineCallbacks + def test_default_armored_blob_encrypt(self): + encrypted = yield self.blob.encrypt() + decode = base64.urlsafe_b64decode + assert map(decode, encrypted.getvalue().split()) - encrypted = yield blob.encrypt() + @defer.inlineCallbacks + def test_blob_encryptor(self): + encrypted = yield self.blob.encrypt() preamble, ciphertext = encrypted.getvalue().split() preamble = base64.urlsafe_b64decode(preamble) ciphertext = base64.urlsafe_b64decode(ciphertext) @@ -116,30 +130,36 @@ class BlobTestCase(unittest.TestCase): assert magic == _crypto.BLOB_SIGNATURE_MAGIC assert sch == 1 assert meth == _crypto.ENC_METHOD.aes_256_gcm - assert iv == blob.iv + assert iv == self.blob.iv assert doc_id == 'D-deadbeef' assert rev == self.doc_info.rev aes_key = _crypto._get_sym_key_for_doc( self.doc_info.doc_id, 'A' * 96) - assert ciphertext == _aes_encrypt(aes_key, blob.iv, snowden1)[0] + assert ciphertext == _aes_encrypt(aes_key, self.blob.iv, snowden1)[0] - decrypted = _aes_decrypt(aes_key, blob.iv, blob.tag, ciphertext, - preamble) + decrypted = _aes_decrypt(aes_key, self.blob.iv, self.blob.tag, + ciphertext, preamble) assert str(decrypted) == snowden1 @defer.inlineCallbacks def test_blob_decryptor(self): + ciphertext = yield self.blob.encrypt() - inf = BytesIO(snowden1) - - blob = _crypto.BlobEncryptor( - self.doc_info, inf, + decryptor = _crypto.BlobDecryptor( + self.doc_info, ciphertext, secret='A' * 96) - ciphertext = yield blob.encrypt() + decrypted = yield decryptor.decrypt() + assert decrypted.getvalue() == snowden1 + + @defer.inlineCallbacks + def test_unarmored_blob_decryptor(self): + self.blob.armor = False + ciphertext = yield self.blob.encrypt() decryptor = _crypto.BlobDecryptor( self.doc_info, ciphertext, + armor=False, secret='A' * 96) decrypted = yield decryptor.decrypt() assert decrypted.getvalue() == snowden1 |