diff options
Diffstat (limited to 'testing/tests/client/test_crypto.py')
-rw-r--r-- | testing/tests/client/test_crypto.py | 384 |
1 files changed, 0 insertions, 384 deletions
diff --git a/testing/tests/client/test_crypto.py b/testing/tests/client/test_crypto.py deleted file mode 100644 index 5b647b73..00000000 --- a/testing/tests/client/test_crypto.py +++ /dev/null @@ -1,384 +0,0 @@ -# -*- coding: utf-8 -*- -# test_crypto.py -# Copyright (C) 2013 LEAP -# -# This program is free software: you can redistribute it and/or modify -# it under the terms of the GNU General Public License as published by -# the Free Software Foundation, either version 3 of the License, or -# (at your option) any later version. -# -# This program is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -# GNU General Public License for more details. -# -# You should have received a copy of the GNU General Public License -# along with this program. If not, see <http://www.gnu.org/licenses/>. -""" -Tests for cryptographic related stuff. -""" -import binascii -import base64 -import json -import os - -from io import BytesIO - -import pytest - -from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes -from cryptography.hazmat.backends import default_backend -from cryptography.exceptions import InvalidTag - -from leap.soledad.common.document import SoledadDocument -from test_soledad.util import BaseSoledadTest -from leap.soledad.client import _crypto -from leap.soledad.client import _scrypt -from leap.soledad.common.blobs import preamble as _preamble - -from twisted.trial import unittest -from twisted.internet import defer - - -snowden1 = ( - "You can't come up against " - "the world's most powerful intelligence " - "agencies and not accept the risk. " - "If they want to get you, over time " - "they will.") - - -class ScryptTest(unittest.TestCase): - - def test_scrypt(self): - secret = 'supersikret' - salt = 'randomsalt' - key = _scrypt.hash(secret, salt, buflen=32) - expected = ('47996b569ea58d51ccbcc318d710' - 'a537acd28bb7a94615ab8d061d4b2a920f01') - assert binascii.b2a_hex(key) == expected - - -class AESTest(unittest.TestCase): - - def test_chunked_encryption(self): - key = 'A' * 32 - - fd = BytesIO() - aes = _crypto.AESWriter(key, _buffer=fd) - iv = aes.iv - - data = snowden1 - block = 16 - - for i in range(len(data) / block): - chunk = data[i * block:(i + 1) * block] - aes.write(chunk) - aes.end() - - ciphertext_chunked = fd.getvalue() - ciphertext, tag = _aes_encrypt(key, iv, data) - - assert ciphertext_chunked == ciphertext - - def test_decrypt(self): - key = 'A' * 32 - iv = 'A' * 16 - - data = snowden1 - block = 16 - - ciphertext, tag = _aes_encrypt(key, iv, data) - - fd = BytesIO() - aes = _crypto.AESWriter(key, iv, fd, tag=tag) - - for i in range(len(ciphertext) / block): - chunk = ciphertext[i * block:(i + 1) * block] - aes.write(chunk) - aes.end() - - cleartext_chunked = fd.getvalue() - assert cleartext_chunked == data - - -class BlobTestCase(unittest.TestCase): - - class doc_info: - doc_id = 'D-deadbeef' - rev = '397932e0c77f45fcb7c3732930e7e9b2:1' - - def setUp(self): - self.inf = BytesIO(snowden1) - self.blob = _crypto.BlobEncryptor( - self.doc_info, self.inf, - armor=True, - secret='A' * 96) - - @defer.inlineCallbacks - def test_unarmored_blob_encrypt(self): - self.blob.armor = False - encrypted = yield self.blob.encrypt() - - decryptor = _crypto.BlobDecryptor( - self.doc_info, encrypted, armor=False, - secret='A' * 96) - decrypted = yield decryptor.decrypt() - assert decrypted.getvalue() == snowden1 - - @defer.inlineCallbacks - def test_default_armored_blob_encrypt(self): - encrypted = yield self.blob.encrypt() - decode = base64.urlsafe_b64decode - assert map(decode, encrypted.getvalue().split()) - - @defer.inlineCallbacks - def test_blob_encryptor(self): - encrypted = yield self.blob.encrypt() - preamble, ciphertext = encrypted.getvalue().split() - preamble = base64.urlsafe_b64decode(preamble) - ciphertext = base64.urlsafe_b64decode(ciphertext) - ciphertext = ciphertext[:-16] - - assert len(preamble) == _preamble.PACMAN.size - unpacked_data = _preamble.PACMAN.unpack(preamble) - magic, sch, meth, ts, iv, doc_id, rev, _ = unpacked_data - assert magic == _crypto.MAGIC - assert sch == 1 - assert meth == _crypto.ENC_METHOD.aes_256_gcm - assert iv == self.blob.iv - assert doc_id == 'D-deadbeef' - assert rev == self.doc_info.rev - - aes_key = _crypto._get_sym_key_for_doc( - self.doc_info.doc_id, 'A' * 96) - assert ciphertext == _aes_encrypt(aes_key, self.blob.iv, snowden1)[0] - - decrypted = _aes_decrypt(aes_key, self.blob.iv, self.blob.tag, - ciphertext, preamble) - assert str(decrypted) == snowden1 - - @defer.inlineCallbacks - def test_init_with_preamble_alone(self): - ciphertext = yield self.blob.encrypt() - preamble = ciphertext.getvalue().split()[0] - decryptor = _crypto.BlobDecryptor( - self.doc_info, BytesIO(preamble), - start_stream=False, - secret='A' * 96) - assert decryptor._consume_preamble() - - @defer.inlineCallbacks - def test_incremental_blob_decryptor(self): - ciphertext = yield self.blob.encrypt() - preamble, ciphertext = ciphertext.getvalue().split() - ciphertext = base64.urlsafe_b64decode(ciphertext) - - decryptor = _crypto.BlobDecryptor( - self.doc_info, BytesIO(preamble), - start_stream=False, - secret='A' * 96, - tag=ciphertext[-16:]) - ciphertext = BytesIO(ciphertext[:-16]) - chunk = ciphertext.read(10) - while chunk: - decryptor.write(chunk) - chunk = ciphertext.read(10) - decrypted = decryptor._end_stream() - assert decrypted.getvalue() == snowden1 - - @defer.inlineCallbacks - def test_blob_decryptor(self): - ciphertext = yield self.blob.encrypt() - - decryptor = _crypto.BlobDecryptor( - self.doc_info, ciphertext, - secret='A' * 96) - decrypted = yield decryptor.decrypt() - assert decrypted.getvalue() == snowden1 - - @defer.inlineCallbacks - def test_unarmored_blob_decryptor(self): - self.blob.armor = False - ciphertext = yield self.blob.encrypt() - - decryptor = _crypto.BlobDecryptor( - self.doc_info, ciphertext, - armor=False, - secret='A' * 96) - decrypted = yield decryptor.decrypt() - assert decrypted.getvalue() == snowden1 - - @defer.inlineCallbacks - def test_encrypt_and_decrypt(self): - """ - Check that encrypting and decrypting gives same doc. - """ - crypto = _crypto.SoledadCrypto('A' * 96) - payload = {'key': 'someval'} - doc1 = SoledadDocument('id1', '1', json.dumps(payload)) - - encrypted = yield crypto.encrypt_doc(doc1) - assert encrypted != payload - assert 'raw' in encrypted - doc2 = SoledadDocument('id1', '1') - doc2.set_json(encrypted) - assert _crypto.is_symmetrically_encrypted(encrypted) - decrypted = (yield crypto.decrypt_doc(doc2)).getvalue() - assert len(decrypted) != 0 - assert json.loads(decrypted) == payload - - @defer.inlineCallbacks - def test_decrypt_with_wrong_tag_raises(self): - """ - Trying to decrypt a document with wrong MAC should raise. - """ - crypto = _crypto.SoledadCrypto('A' * 96) - payload = {'key': 'someval'} - doc1 = SoledadDocument('id1', '1', json.dumps(payload)) - - encrypted = yield crypto.encrypt_doc(doc1) - encdict = json.loads(encrypted) - preamble, raw = str(encdict['raw']).split() - preamble = base64.urlsafe_b64decode(preamble) - raw = base64.urlsafe_b64decode(raw) - # mess with tag - messed = raw[:-16] + '0' * 16 - - preamble = base64.urlsafe_b64encode(preamble) - newraw = preamble + ' ' + base64.urlsafe_b64encode(str(messed)) - doc2 = SoledadDocument('id1', '1') - doc2.set_json(json.dumps({"raw": str(newraw)})) - - with pytest.raises(_crypto.InvalidBlob): - yield crypto.decrypt_doc(doc2) - - -class SoledadSecretsTestCase(BaseSoledadTest): - - def test_generated_secrets_have_correct_length(self): - expected = self._soledad.secrets.lengths - for name, length in expected.iteritems(): - secret = getattr(self._soledad.secrets, name) - self.assertEqual(length, len(secret)) - - -class SoledadCryptoAESTestCase(BaseSoledadTest): - - def test_encrypt_decrypt_sym(self): - # generate 256-bit key - key = os.urandom(32) - iv, cyphertext = _crypto.encrypt_sym('data', key) - self.assertTrue(cyphertext is not None) - self.assertTrue(cyphertext != '') - self.assertTrue(cyphertext != 'data') - plaintext = _crypto.decrypt_sym(cyphertext, key, iv) - self.assertEqual('data', plaintext) - - def test_decrypt_with_wrong_iv_raises(self): - key = os.urandom(32) - iv, cyphertext = _crypto.encrypt_sym('data', key) - self.assertTrue(cyphertext is not None) - self.assertTrue(cyphertext != '') - self.assertTrue(cyphertext != 'data') - # get a different iv by changing the first byte - rawiv = binascii.a2b_base64(iv) - wrongiv = rawiv - while wrongiv == rawiv: - wrongiv = os.urandom(1) + rawiv[1:] - with pytest.raises(InvalidTag): - _crypto.decrypt_sym( - cyphertext, key, iv=binascii.b2a_base64(wrongiv)) - - def test_decrypt_with_wrong_key_raises(self): - key = os.urandom(32) - iv, cyphertext = _crypto.encrypt_sym('data', key) - self.assertTrue(cyphertext is not None) - self.assertTrue(cyphertext != '') - self.assertTrue(cyphertext != 'data') - wrongkey = os.urandom(32) # 256-bits key - # ensure keys are different in case we are extremely lucky - while wrongkey == key: - wrongkey = os.urandom(32) - with pytest.raises(InvalidTag): - _crypto.decrypt_sym(cyphertext, wrongkey, iv) - - -class PreambleTestCase(unittest.TestCase): - class doc_info: - doc_id = 'D-deadbeef' - rev = '397932e0c77f45fcb7c3732930e7e9b2:1' - - def setUp(self): - self.cleartext = BytesIO(snowden1) - self.blob = _crypto.BlobEncryptor( - self.doc_info, self.cleartext, - secret='A' * 96) - - def test_preamble_starts_with_magic_signature(self): - preamble = self.blob._encode_preamble() - assert preamble.startswith(_crypto.MAGIC) - - def test_preamble_has_cipher_metadata(self): - preamble = self.blob._encode_preamble() - unpacked = _preamble.PACMAN.unpack(preamble) - encryption_scheme, encryption_method = unpacked[1:3] - assert encryption_scheme in _crypto.ENC_SCHEME - assert encryption_method in _crypto.ENC_METHOD - assert unpacked[4] == self.blob.iv - - def test_preamble_has_document_sync_metadata(self): - preamble = self.blob._encode_preamble() - unpacked = _preamble.PACMAN.unpack(preamble) - doc_id, doc_rev = unpacked[5:7] - assert doc_id == self.doc_info.doc_id - assert doc_rev == self.doc_info.rev - - def test_preamble_has_document_size(self): - preamble = self.blob._encode_preamble() - unpacked = _preamble.PACMAN.unpack(preamble) - size = unpacked[7] - assert size == _crypto._ceiling(len(snowden1)) - - @defer.inlineCallbacks - def test_preamble_can_come_without_size(self): - # XXX: This test case is here only to test backwards compatibility! - preamble = self.blob._encode_preamble() - # repack preamble using legacy format, without doc size - unpacked = _preamble.PACMAN.unpack(preamble) - preamble_without_size = _preamble.LEGACY_PACMAN.pack(*unpacked[0:7]) - # encrypt it manually for custom tag - ciphertext, tag = _aes_encrypt(self.blob.sym_key, self.blob.iv, - self.cleartext.getvalue(), - aead=preamble_without_size) - ciphertext = ciphertext + tag - # encode it - ciphertext = base64.urlsafe_b64encode(ciphertext) - preamble_without_size = base64.urlsafe_b64encode(preamble_without_size) - # decrypt it - ciphertext = preamble_without_size + ' ' + ciphertext - cleartext = yield _crypto.BlobDecryptor( - self.doc_info, BytesIO(ciphertext), - secret='A' * 96).decrypt() - assert cleartext.getvalue() == self.cleartext.getvalue() - warnings = self.flushWarnings() - assert len(warnings) == 1 - assert 'legacy preamble without size' in warnings[0]['message'] - - -def _aes_encrypt(key, iv, data, aead=''): - backend = default_backend() - cipher = Cipher(algorithms.AES(key), modes.GCM(iv), backend=backend) - encryptor = cipher.encryptor() - if aead: - encryptor.authenticate_additional_data(aead) - return encryptor.update(data) + encryptor.finalize(), encryptor.tag - - -def _aes_decrypt(key, iv, tag, data, aead=''): - backend = default_backend() - cipher = Cipher(algorithms.AES(key), modes.GCM(iv, tag), backend=backend) - decryptor = cipher.decryptor() - if aead: - decryptor.authenticate_additional_data(aead) - return decryptor.update(data) + decryptor.finalize() |