diff options
Diffstat (limited to 'tests/client/test_crypto.py')
-rw-r--r-- | tests/client/test_crypto.py | 384 |
1 files changed, 384 insertions, 0 deletions
diff --git a/tests/client/test_crypto.py b/tests/client/test_crypto.py new file mode 100644 index 00000000..5b647b73 --- /dev/null +++ b/tests/client/test_crypto.py @@ -0,0 +1,384 @@ +# -*- coding: utf-8 -*- +# test_crypto.py +# Copyright (C) 2013 LEAP +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see <http://www.gnu.org/licenses/>. +""" +Tests for cryptographic related stuff. +""" +import binascii +import base64 +import json +import os + +from io import BytesIO + +import pytest + +from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes +from cryptography.hazmat.backends import default_backend +from cryptography.exceptions import InvalidTag + +from leap.soledad.common.document import SoledadDocument +from test_soledad.util import BaseSoledadTest +from leap.soledad.client import _crypto +from leap.soledad.client import _scrypt +from leap.soledad.common.blobs import preamble as _preamble + +from twisted.trial import unittest +from twisted.internet import defer + + +snowden1 = ( + "You can't come up against " + "the world's most powerful intelligence " + "agencies and not accept the risk. " + "If they want to get you, over time " + "they will.") + + +class ScryptTest(unittest.TestCase): + + def test_scrypt(self): + secret = 'supersikret' + salt = 'randomsalt' + key = _scrypt.hash(secret, salt, buflen=32) + expected = ('47996b569ea58d51ccbcc318d710' + 'a537acd28bb7a94615ab8d061d4b2a920f01') + assert binascii.b2a_hex(key) == expected + + +class AESTest(unittest.TestCase): + + def test_chunked_encryption(self): + key = 'A' * 32 + + fd = BytesIO() + aes = _crypto.AESWriter(key, _buffer=fd) + iv = aes.iv + + data = snowden1 + block = 16 + + for i in range(len(data) / block): + chunk = data[i * block:(i + 1) * block] + aes.write(chunk) + aes.end() + + ciphertext_chunked = fd.getvalue() + ciphertext, tag = _aes_encrypt(key, iv, data) + + assert ciphertext_chunked == ciphertext + + def test_decrypt(self): + key = 'A' * 32 + iv = 'A' * 16 + + data = snowden1 + block = 16 + + ciphertext, tag = _aes_encrypt(key, iv, data) + + fd = BytesIO() + aes = _crypto.AESWriter(key, iv, fd, tag=tag) + + for i in range(len(ciphertext) / block): + chunk = ciphertext[i * block:(i + 1) * block] + aes.write(chunk) + aes.end() + + cleartext_chunked = fd.getvalue() + assert cleartext_chunked == data + + +class BlobTestCase(unittest.TestCase): + + class doc_info: + doc_id = 'D-deadbeef' + rev = '397932e0c77f45fcb7c3732930e7e9b2:1' + + def setUp(self): + self.inf = BytesIO(snowden1) + self.blob = _crypto.BlobEncryptor( + self.doc_info, self.inf, + armor=True, + secret='A' * 96) + + @defer.inlineCallbacks + def test_unarmored_blob_encrypt(self): + self.blob.armor = False + encrypted = yield self.blob.encrypt() + + decryptor = _crypto.BlobDecryptor( + self.doc_info, encrypted, armor=False, + secret='A' * 96) + decrypted = yield decryptor.decrypt() + assert decrypted.getvalue() == snowden1 + + @defer.inlineCallbacks + def test_default_armored_blob_encrypt(self): + encrypted = yield self.blob.encrypt() + decode = base64.urlsafe_b64decode + assert map(decode, encrypted.getvalue().split()) + + @defer.inlineCallbacks + def test_blob_encryptor(self): + encrypted = yield self.blob.encrypt() + preamble, ciphertext = encrypted.getvalue().split() + preamble = base64.urlsafe_b64decode(preamble) + ciphertext = base64.urlsafe_b64decode(ciphertext) + ciphertext = ciphertext[:-16] + + assert len(preamble) == _preamble.PACMAN.size + unpacked_data = _preamble.PACMAN.unpack(preamble) + magic, sch, meth, ts, iv, doc_id, rev, _ = unpacked_data + assert magic == _crypto.MAGIC + assert sch == 1 + assert meth == _crypto.ENC_METHOD.aes_256_gcm + assert iv == self.blob.iv + assert doc_id == 'D-deadbeef' + assert rev == self.doc_info.rev + + aes_key = _crypto._get_sym_key_for_doc( + self.doc_info.doc_id, 'A' * 96) + assert ciphertext == _aes_encrypt(aes_key, self.blob.iv, snowden1)[0] + + decrypted = _aes_decrypt(aes_key, self.blob.iv, self.blob.tag, + ciphertext, preamble) + assert str(decrypted) == snowden1 + + @defer.inlineCallbacks + def test_init_with_preamble_alone(self): + ciphertext = yield self.blob.encrypt() + preamble = ciphertext.getvalue().split()[0] + decryptor = _crypto.BlobDecryptor( + self.doc_info, BytesIO(preamble), + start_stream=False, + secret='A' * 96) + assert decryptor._consume_preamble() + + @defer.inlineCallbacks + def test_incremental_blob_decryptor(self): + ciphertext = yield self.blob.encrypt() + preamble, ciphertext = ciphertext.getvalue().split() + ciphertext = base64.urlsafe_b64decode(ciphertext) + + decryptor = _crypto.BlobDecryptor( + self.doc_info, BytesIO(preamble), + start_stream=False, + secret='A' * 96, + tag=ciphertext[-16:]) + ciphertext = BytesIO(ciphertext[:-16]) + chunk = ciphertext.read(10) + while chunk: + decryptor.write(chunk) + chunk = ciphertext.read(10) + decrypted = decryptor._end_stream() + assert decrypted.getvalue() == snowden1 + + @defer.inlineCallbacks + def test_blob_decryptor(self): + ciphertext = yield self.blob.encrypt() + + decryptor = _crypto.BlobDecryptor( + self.doc_info, ciphertext, + secret='A' * 96) + decrypted = yield decryptor.decrypt() + assert decrypted.getvalue() == snowden1 + + @defer.inlineCallbacks + def test_unarmored_blob_decryptor(self): + self.blob.armor = False + ciphertext = yield self.blob.encrypt() + + decryptor = _crypto.BlobDecryptor( + self.doc_info, ciphertext, + armor=False, + secret='A' * 96) + decrypted = yield decryptor.decrypt() + assert decrypted.getvalue() == snowden1 + + @defer.inlineCallbacks + def test_encrypt_and_decrypt(self): + """ + Check that encrypting and decrypting gives same doc. + """ + crypto = _crypto.SoledadCrypto('A' * 96) + payload = {'key': 'someval'} + doc1 = SoledadDocument('id1', '1', json.dumps(payload)) + + encrypted = yield crypto.encrypt_doc(doc1) + assert encrypted != payload + assert 'raw' in encrypted + doc2 = SoledadDocument('id1', '1') + doc2.set_json(encrypted) + assert _crypto.is_symmetrically_encrypted(encrypted) + decrypted = (yield crypto.decrypt_doc(doc2)).getvalue() + assert len(decrypted) != 0 + assert json.loads(decrypted) == payload + + @defer.inlineCallbacks + def test_decrypt_with_wrong_tag_raises(self): + """ + Trying to decrypt a document with wrong MAC should raise. + """ + crypto = _crypto.SoledadCrypto('A' * 96) + payload = {'key': 'someval'} + doc1 = SoledadDocument('id1', '1', json.dumps(payload)) + + encrypted = yield crypto.encrypt_doc(doc1) + encdict = json.loads(encrypted) + preamble, raw = str(encdict['raw']).split() + preamble = base64.urlsafe_b64decode(preamble) + raw = base64.urlsafe_b64decode(raw) + # mess with tag + messed = raw[:-16] + '0' * 16 + + preamble = base64.urlsafe_b64encode(preamble) + newraw = preamble + ' ' + base64.urlsafe_b64encode(str(messed)) + doc2 = SoledadDocument('id1', '1') + doc2.set_json(json.dumps({"raw": str(newraw)})) + + with pytest.raises(_crypto.InvalidBlob): + yield crypto.decrypt_doc(doc2) + + +class SoledadSecretsTestCase(BaseSoledadTest): + + def test_generated_secrets_have_correct_length(self): + expected = self._soledad.secrets.lengths + for name, length in expected.iteritems(): + secret = getattr(self._soledad.secrets, name) + self.assertEqual(length, len(secret)) + + +class SoledadCryptoAESTestCase(BaseSoledadTest): + + def test_encrypt_decrypt_sym(self): + # generate 256-bit key + key = os.urandom(32) + iv, cyphertext = _crypto.encrypt_sym('data', key) + self.assertTrue(cyphertext is not None) + self.assertTrue(cyphertext != '') + self.assertTrue(cyphertext != 'data') + plaintext = _crypto.decrypt_sym(cyphertext, key, iv) + self.assertEqual('data', plaintext) + + def test_decrypt_with_wrong_iv_raises(self): + key = os.urandom(32) + iv, cyphertext = _crypto.encrypt_sym('data', key) + self.assertTrue(cyphertext is not None) + self.assertTrue(cyphertext != '') + self.assertTrue(cyphertext != 'data') + # get a different iv by changing the first byte + rawiv = binascii.a2b_base64(iv) + wrongiv = rawiv + while wrongiv == rawiv: + wrongiv = os.urandom(1) + rawiv[1:] + with pytest.raises(InvalidTag): + _crypto.decrypt_sym( + cyphertext, key, iv=binascii.b2a_base64(wrongiv)) + + def test_decrypt_with_wrong_key_raises(self): + key = os.urandom(32) + iv, cyphertext = _crypto.encrypt_sym('data', key) + self.assertTrue(cyphertext is not None) + self.assertTrue(cyphertext != '') + self.assertTrue(cyphertext != 'data') + wrongkey = os.urandom(32) # 256-bits key + # ensure keys are different in case we are extremely lucky + while wrongkey == key: + wrongkey = os.urandom(32) + with pytest.raises(InvalidTag): + _crypto.decrypt_sym(cyphertext, wrongkey, iv) + + +class PreambleTestCase(unittest.TestCase): + class doc_info: + doc_id = 'D-deadbeef' + rev = '397932e0c77f45fcb7c3732930e7e9b2:1' + + def setUp(self): + self.cleartext = BytesIO(snowden1) + self.blob = _crypto.BlobEncryptor( + self.doc_info, self.cleartext, + secret='A' * 96) + + def test_preamble_starts_with_magic_signature(self): + preamble = self.blob._encode_preamble() + assert preamble.startswith(_crypto.MAGIC) + + def test_preamble_has_cipher_metadata(self): + preamble = self.blob._encode_preamble() + unpacked = _preamble.PACMAN.unpack(preamble) + encryption_scheme, encryption_method = unpacked[1:3] + assert encryption_scheme in _crypto.ENC_SCHEME + assert encryption_method in _crypto.ENC_METHOD + assert unpacked[4] == self.blob.iv + + def test_preamble_has_document_sync_metadata(self): + preamble = self.blob._encode_preamble() + unpacked = _preamble.PACMAN.unpack(preamble) + doc_id, doc_rev = unpacked[5:7] + assert doc_id == self.doc_info.doc_id + assert doc_rev == self.doc_info.rev + + def test_preamble_has_document_size(self): + preamble = self.blob._encode_preamble() + unpacked = _preamble.PACMAN.unpack(preamble) + size = unpacked[7] + assert size == _crypto._ceiling(len(snowden1)) + + @defer.inlineCallbacks + def test_preamble_can_come_without_size(self): + # XXX: This test case is here only to test backwards compatibility! + preamble = self.blob._encode_preamble() + # repack preamble using legacy format, without doc size + unpacked = _preamble.PACMAN.unpack(preamble) + preamble_without_size = _preamble.LEGACY_PACMAN.pack(*unpacked[0:7]) + # encrypt it manually for custom tag + ciphertext, tag = _aes_encrypt(self.blob.sym_key, self.blob.iv, + self.cleartext.getvalue(), + aead=preamble_without_size) + ciphertext = ciphertext + tag + # encode it + ciphertext = base64.urlsafe_b64encode(ciphertext) + preamble_without_size = base64.urlsafe_b64encode(preamble_without_size) + # decrypt it + ciphertext = preamble_without_size + ' ' + ciphertext + cleartext = yield _crypto.BlobDecryptor( + self.doc_info, BytesIO(ciphertext), + secret='A' * 96).decrypt() + assert cleartext.getvalue() == self.cleartext.getvalue() + warnings = self.flushWarnings() + assert len(warnings) == 1 + assert 'legacy preamble without size' in warnings[0]['message'] + + +def _aes_encrypt(key, iv, data, aead=''): + backend = default_backend() + cipher = Cipher(algorithms.AES(key), modes.GCM(iv), backend=backend) + encryptor = cipher.encryptor() + if aead: + encryptor.authenticate_additional_data(aead) + return encryptor.update(data) + encryptor.finalize(), encryptor.tag + + +def _aes_decrypt(key, iv, tag, data, aead=''): + backend = default_backend() + cipher = Cipher(algorithms.AES(key), modes.GCM(iv, tag), backend=backend) + decryptor = cipher.decryptor() + if aead: + decryptor.authenticate_additional_data(aead) + return decryptor.update(data) + decryptor.finalize() |