summaryrefslogtreecommitdiff
path: root/testing/tests/client/test_crypto.py
diff options
context:
space:
mode:
Diffstat (limited to 'testing/tests/client/test_crypto.py')
-rw-r--r--testing/tests/client/test_crypto.py282
1 files changed, 197 insertions, 85 deletions
diff --git a/testing/tests/client/test_crypto.py b/testing/tests/client/test_crypto.py
index 77252b46..49a61438 100644
--- a/testing/tests/client/test_crypto.py
+++ b/testing/tests/client/test_crypto.py
@@ -17,47 +17,173 @@
"""
Tests for cryptographic related stuff.
"""
-import os
-import hashlib
import binascii
+import base64
+import hashlib
+import json
+import os
+
+from io import BytesIO
+
+import pytest
+
+from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes
+from cryptography.hazmat.backends import default_backend
+from cryptography.exceptions import InvalidTag
-from leap.soledad.client import crypto
from leap.soledad.common.document import SoledadDocument
from test_soledad.util import BaseSoledadTest
-from leap.soledad.common.crypto import WrongMacError
-from leap.soledad.common.crypto import UnknownMacMethodError
-from leap.soledad.common.crypto import ENC_JSON_KEY
-from leap.soledad.common.crypto import ENC_SCHEME_KEY
-from leap.soledad.common.crypto import MAC_KEY
-from leap.soledad.common.crypto import MAC_METHOD_KEY
+from leap.soledad.client import _crypto
+
+from twisted.trial import unittest
+from twisted.internet import defer
+
+
+snowden1 = (
+ "You can't come up against "
+ "the world's most powerful intelligence "
+ "agencies and not accept the risk. "
+ "If they want to get you, over time "
+ "they will.")
+
+
+class AESTest(unittest.TestCase):
+
+ def test_chunked_encryption(self):
+ key = 'A' * 32
+
+ fd = BytesIO()
+ aes = _crypto.AESWriter(key, _buffer=fd)
+ iv = aes.iv
+
+ data = snowden1
+ block = 16
+
+ for i in range(len(data) / block):
+ chunk = data[i * block:(i + 1) * block]
+ aes.write(chunk)
+ aes.end()
+
+ ciphertext_chunked = fd.getvalue()
+ ciphertext, tag = _aes_encrypt(key, iv, data)
+
+ assert ciphertext_chunked == ciphertext
+
+ def test_decrypt(self):
+ key = 'A' * 32
+ iv = 'A' * 16
+
+ data = snowden1
+ block = 16
+
+ ciphertext, tag = _aes_encrypt(key, iv, data)
+
+ fd = BytesIO()
+ aes = _crypto.AESWriter(key, iv, fd, tag=tag)
+
+ for i in range(len(ciphertext) / block):
+ chunk = ciphertext[i * block:(i + 1) * block]
+ aes.write(chunk)
+ aes.end()
+
+ cleartext_chunked = fd.getvalue()
+ assert cleartext_chunked == data
+
+
+class BlobTestCase(unittest.TestCase):
+
+ class doc_info:
+ doc_id = 'D-deadbeef'
+ rev = '397932e0c77f45fcb7c3732930e7e9b2:1'
+
+ @defer.inlineCallbacks
+ def test_blob_encryptor(self):
+
+ inf = BytesIO(snowden1)
+
+ blob = _crypto.BlobEncryptor(
+ self.doc_info, inf,
+ secret='A' * 96)
+ encrypted = yield blob.encrypt()
+ preamble, ciphertext = _crypto._split(encrypted.getvalue())
+ ciphertext = ciphertext[:-16]
-class EncryptedSyncTestCase(BaseSoledadTest):
+ assert len(preamble) == _crypto.PACMAN.size
+ unpacked_data = _crypto.PACMAN.unpack(preamble)
+ magic, sch, meth, ts, iv, doc_id, rev = unpacked_data
+ assert magic == _crypto.BLOB_SIGNATURE_MAGIC
+ assert sch == 1
+ assert meth == _crypto.ENC_METHOD.aes_256_gcm
+ assert iv == blob.iv
+ assert doc_id == 'D-deadbeef'
+ assert rev == self.doc_info.rev
- """
- Tests that guarantee that data will always be encrypted when syncing.
- """
+ aes_key = _crypto._get_sym_key_for_doc(
+ self.doc_info.doc_id, 'A' * 96)
+ assert ciphertext == _aes_encrypt(aes_key, blob.iv, snowden1)[0]
- def test_encrypt_decrypt_json(self):
+ decrypted = _aes_decrypt(aes_key, blob.iv, blob.tag, ciphertext,
+ preamble)
+ assert str(decrypted) == snowden1
+
+ @defer.inlineCallbacks
+ def test_blob_decryptor(self):
+
+ inf = BytesIO(snowden1)
+
+ blob = _crypto.BlobEncryptor(
+ self.doc_info, inf,
+ secret='A' * 96)
+ ciphertext = yield blob.encrypt()
+
+ decryptor = _crypto.BlobDecryptor(
+ self.doc_info, ciphertext,
+ secret='A' * 96)
+ decrypted = yield decryptor.decrypt()
+ assert decrypted == snowden1
+
+ @defer.inlineCallbacks
+ def test_encrypt_and_decrypt(self):
"""
- Test encrypting and decrypting documents.
+ Check that encrypting and decrypting gives same doc.
"""
- simpledoc = {'key': 'val'}
- doc1 = SoledadDocument(doc_id='id')
- doc1.content = simpledoc
-
- # encrypt doc
- doc1.set_json(self._soledad._crypto.encrypt_doc(doc1))
- # assert content is different and includes keys
- self.assertNotEqual(
- simpledoc, doc1.content,
- 'incorrect document encryption')
- self.assertTrue(ENC_JSON_KEY in doc1.content)
- self.assertTrue(ENC_SCHEME_KEY in doc1.content)
- # decrypt doc
- doc1.set_json(self._soledad._crypto.decrypt_doc(doc1))
- self.assertEqual(
- simpledoc, doc1.content, 'incorrect document encryption')
+ crypto = _crypto.SoledadCrypto('A' * 96)
+ payload = {'key': 'someval'}
+ doc1 = SoledadDocument('id1', '1', json.dumps(payload))
+
+ encrypted = yield crypto.encrypt_doc(doc1)
+ assert encrypted != payload
+ assert 'raw' in encrypted
+ doc2 = SoledadDocument('id1', '1')
+ doc2.set_json(encrypted)
+ assert _crypto.is_symmetrically_encrypted(encrypted)
+ decrypted = yield crypto.decrypt_doc(doc2)
+ assert len(decrypted) != 0
+ assert json.loads(decrypted) == payload
+
+ @defer.inlineCallbacks
+ def test_decrypt_with_wrong_tag_raises(self):
+ """
+ Trying to decrypt a document with wrong MAC should raise.
+ """
+ crypto = _crypto.SoledadCrypto('A' * 96)
+ payload = {'key': 'someval'}
+ doc1 = SoledadDocument('id1', '1', json.dumps(payload))
+
+ encrypted = yield crypto.encrypt_doc(doc1)
+ encdict = json.loads(encrypted)
+ preamble, raw = _crypto._split(str(encdict['raw']))
+ # mess with tag
+ messed = raw[:-16] + '0' * 16
+
+ preamble = base64.urlsafe_b64encode(preamble)
+ newraw = preamble + ' ' + base64.urlsafe_b64encode(str(messed))
+ doc2 = SoledadDocument('id1', '1')
+ doc2.set_json(json.dumps({"raw": str(newraw)}))
+
+ with pytest.raises(_crypto.InvalidBlob):
+ yield crypto.decrypt_doc(doc2)
class RecoveryDocumentTestCase(BaseSoledadTest):
@@ -74,13 +200,14 @@ class RecoveryDocumentTestCase(BaseSoledadTest):
encrypted_secret = rd[
self._soledad.secrets.STORAGE_SECRETS_KEY][secret_id]
self.assertTrue(self._soledad.secrets.CIPHER_KEY in encrypted_secret)
- self.assertTrue(
- encrypted_secret[self._soledad.secrets.CIPHER_KEY] == 'aes256')
+ self.assertEquals(
+ _crypto.ENC_METHOD.aes_256_gcm,
+ encrypted_secret[self._soledad.secrets.CIPHER_KEY])
self.assertTrue(self._soledad.secrets.LENGTH_KEY in encrypted_secret)
self.assertTrue(self._soledad.secrets.SECRET_KEY in encrypted_secret)
- def test_import_recovery_document(self):
- rd = self._soledad.secrets._export_recovery_document()
+ def test_import_recovery_document(self, cipher='aes256'):
+ rd = self._soledad.secrets._export_recovery_document(cipher)
s = self._soledad_instance()
s.secrets._import_recovery_document(rd)
s.secrets.set_secret_id(self._soledad.secrets._secret_id)
@@ -89,6 +216,14 @@ class RecoveryDocumentTestCase(BaseSoledadTest):
'Failed settinng secret for symmetric encryption.')
s.close()
+ def test_import_GCM_recovery_document(self):
+ cipher = self._soledad.secrets.CIPHER_AES256_GCM
+ self.test_import_recovery_document(cipher)
+
+ def test_import_legacy_CTR_recovery_document(self):
+ cipher = self._soledad.secrets.CIPHER_AES256
+ self.test_import_recovery_document(cipher)
+
class SoledadSecretsTestCase(BaseSoledadTest):
@@ -146,60 +281,21 @@ class SoledadSecretsTestCase(BaseSoledadTest):
"Should have a secret at this point")
-class MacAuthTestCase(BaseSoledadTest):
-
- def test_decrypt_with_wrong_mac_raises(self):
- """
- Trying to decrypt a document with wrong MAC should raise.
- """
- simpledoc = {'key': 'val'}
- doc = SoledadDocument(doc_id='id')
- doc.content = simpledoc
- # encrypt doc
- doc.set_json(self._soledad._crypto.encrypt_doc(doc))
- self.assertTrue(MAC_KEY in doc.content)
- self.assertTrue(MAC_METHOD_KEY in doc.content)
- # mess with MAC
- doc.content[MAC_KEY] = '1234567890ABCDEF'
- # try to decrypt doc
- self.assertRaises(
- WrongMacError,
- self._soledad._crypto.decrypt_doc, doc)
-
- def test_decrypt_with_unknown_mac_method_raises(self):
- """
- Trying to decrypt a document with unknown MAC method should raise.
- """
- simpledoc = {'key': 'val'}
- doc = SoledadDocument(doc_id='id')
- doc.content = simpledoc
- # encrypt doc
- doc.set_json(self._soledad._crypto.encrypt_doc(doc))
- self.assertTrue(MAC_KEY in doc.content)
- self.assertTrue(MAC_METHOD_KEY in doc.content)
- # mess with MAC method
- doc.content[MAC_METHOD_KEY] = 'mymac'
- # try to decrypt doc
- self.assertRaises(
- UnknownMacMethodError,
- self._soledad._crypto.decrypt_doc, doc)
-
-
class SoledadCryptoAESTestCase(BaseSoledadTest):
def test_encrypt_decrypt_sym(self):
# generate 256-bit key
key = os.urandom(32)
- iv, cyphertext = crypto.encrypt_sym('data', key)
+ iv, cyphertext = _crypto.encrypt_sym('data', key)
self.assertTrue(cyphertext is not None)
self.assertTrue(cyphertext != '')
self.assertTrue(cyphertext != 'data')
- plaintext = crypto.decrypt_sym(cyphertext, key, iv)
+ plaintext = _crypto.decrypt_sym(cyphertext, key, iv)
self.assertEqual('data', plaintext)
- def test_decrypt_with_wrong_iv_fails(self):
+ def test_decrypt_with_wrong_iv_raises(self):
key = os.urandom(32)
- iv, cyphertext = crypto.encrypt_sym('data', key)
+ iv, cyphertext = _crypto.encrypt_sym('data', key)
self.assertTrue(cyphertext is not None)
self.assertTrue(cyphertext != '')
self.assertTrue(cyphertext != 'data')
@@ -208,13 +304,13 @@ class SoledadCryptoAESTestCase(BaseSoledadTest):
wrongiv = rawiv
while wrongiv == rawiv:
wrongiv = os.urandom(1) + rawiv[1:]
- plaintext = crypto.decrypt_sym(
- cyphertext, key, iv=binascii.b2a_base64(wrongiv))
- self.assertNotEqual('data', plaintext)
+ with pytest.raises(InvalidTag):
+ _crypto.decrypt_sym(
+ cyphertext, key, iv=binascii.b2a_base64(wrongiv))
- def test_decrypt_with_wrong_key_fails(self):
+ def test_decrypt_with_wrong_key_raises(self):
key = os.urandom(32)
- iv, cyphertext = crypto.encrypt_sym('data', key)
+ iv, cyphertext = _crypto.encrypt_sym('data', key)
self.assertTrue(cyphertext is not None)
self.assertTrue(cyphertext != '')
self.assertTrue(cyphertext != 'data')
@@ -222,5 +318,21 @@ class SoledadCryptoAESTestCase(BaseSoledadTest):
# ensure keys are different in case we are extremely lucky
while wrongkey == key:
wrongkey = os.urandom(32)
- plaintext = crypto.decrypt_sym(cyphertext, wrongkey, iv)
- self.assertNotEqual('data', plaintext)
+ with pytest.raises(InvalidTag):
+ _crypto.decrypt_sym(cyphertext, wrongkey, iv)
+
+
+def _aes_encrypt(key, iv, data):
+ backend = default_backend()
+ cipher = Cipher(algorithms.AES(key), modes.GCM(iv), backend=backend)
+ encryptor = cipher.encryptor()
+ return encryptor.update(data) + encryptor.finalize(), encryptor.tag
+
+
+def _aes_decrypt(key, iv, tag, data, aead=''):
+ backend = default_backend()
+ cipher = Cipher(algorithms.AES(key), modes.GCM(iv, tag), backend=backend)
+ decryptor = cipher.decryptor()
+ if aead:
+ decryptor.authenticate_additional_data(aead)
+ return decryptor.update(data) + decryptor.finalize()