From 1bc85d13569635644f9954dea5f615c9256c8c56 Mon Sep 17 00:00:00 2001 From: Victor Shyba Date: Wed, 7 Dec 2016 01:24:53 -0300 Subject: [feature] Add retro compat on secrets.py ciphers Integrated the secrets's JSON key that specifies ciphers into _crypto and added optional GCM. Also added a test to check if both cipher types can be imported. Resolves: #8680 Signed-off-by: Victor Shyba --- client/src/leap/soledad/client/_crypto.py | 41 ++++++++++++++++++++----------- client/src/leap/soledad/client/secrets.py | 35 ++++++++++++++++---------- testing/tests/benchmarks/test_crypto.py | 4 +-- testing/tests/client/test_crypto.py | 30 +++++++++++++--------- 4 files changed, 69 insertions(+), 41 deletions(-) diff --git a/client/src/leap/soledad/client/_crypto.py b/client/src/leap/soledad/client/_crypto.py index d9211322..4bbdd044 100644 --- a/client/src/leap/soledad/client/_crypto.py +++ b/client/src/leap/soledad/client/_crypto.py @@ -54,7 +54,7 @@ BLOB_SIGNATURE_MAGIC = '\x13\x37' ENC_SCHEME = namedtuple('SCHEME', 'symkey')(1) -ENC_METHOD = namedtuple('METHOD', 'aes_256_gcm')(1) +ENC_METHOD = namedtuple('METHOD', 'aes_256_ctr aes_256_gcm')(1, 2) DocInfo = namedtuple('DocInfo', 'doc_id rev') @@ -123,9 +123,9 @@ class SoledadCrypto(object): return decryptor.decrypt() -def encrypt_sym(data, key): +def encrypt_sym(data, key, method=ENC_METHOD.aes_256_gcm): """ - Encrypt data using AES-256 cipher in GCM mode. + Encrypt data using AES-256 cipher in selected mode. :param data: The data to be encrypted. :type data: str @@ -136,17 +136,18 @@ def encrypt_sym(data, key): encoded as base64. :rtype: (str, str) """ - encryptor = AESWriter(key) + mode = _mode_by_method(method) + encryptor = AESWriter(key, mode=mode) encryptor.write(data) _, ciphertext = encryptor.end() iv = base64.b64encode(encryptor.iv) - tag = base64.b64encode(encryptor.tag) - return iv, tag, ciphertext + tag = encryptor.tag or '' + return iv, ciphertext + tag -def decrypt_sym(data, key, iv, tag): +def decrypt_sym(data, key, iv, method=ENC_METHOD.aes_256_gcm): """ - Decrypt data using AES-256 cipher in GCM mode. + Decrypt data using AES-256 cipher in selected mode. :param data: The data to be decrypted. :type data: str @@ -160,8 +161,11 @@ def decrypt_sym(data, key, iv, tag): :rtype: str """ _iv = base64.b64decode(str(iv)) - tag = base64.b64decode(str(tag)) - decryptor = AESWriter(key, _iv, tag=tag) + mode = _mode_by_method(method) + tag = None + if mode == modes.GCM: + data, tag = data[:-16], data[-16:] + decryptor = AESWriter(key, _iv, tag=tag, mode=mode) decryptor.write(data) _, plaintext = decryptor.end() return plaintext @@ -315,12 +319,12 @@ class AESWriter(object): """ implements(interfaces.IConsumer) - def __init__(self, key, iv=None, _buffer=None, tag=None): + def __init__(self, key, iv=None, _buffer=None, tag=None, mode=modes.GCM): if len(key) != 32: raise EncryptionDecryptionError('key is not 256 bits') self.iv = iv or os.urandom(16) self.buffer = _buffer or BytesIO() - cipher = _get_aes_gcm_cipher(key, self.iv, tag) + cipher = _get_aes_cipher(key, self.iv, tag, mode) cipher = cipher.decryptor() if tag else cipher.encryptor() self.cipher, self.aead = cipher, '' @@ -330,7 +334,7 @@ class AESWriter(object): @property def tag(self): - return self.cipher.tag + return getattr(self.cipher, 'tag', None) def write(self, data): self.buffer.write(self.cipher.update(data)) @@ -366,10 +370,17 @@ def _get_sym_key_for_doc(doc_id, secret): return _hmac_sha256(key, doc_id) -def _get_aes_gcm_cipher(key, iv, tag): - mode = modes.GCM(iv, tag) +def _get_aes_cipher(key, iv, tag, mode=modes.GCM): + mode = mode(iv, tag) if mode == modes.GCM else mode(iv) return Cipher(algorithms.AES(key), mode, backend=CRYPTO_BACKEND) def _split(base64_raw_payload): return imap(base64.urlsafe_b64decode, re.split(' ', base64_raw_payload)) + + +def _mode_by_method(method): + if method == ENC_METHOD.aes_256_gcm: + return modes.GCM + else: + return modes.CTR diff --git a/client/src/leap/soledad/client/secrets.py b/client/src/leap/soledad/client/secrets.py index 1eb6f31d..06488f74 100644 --- a/client/src/leap/soledad/client/secrets.py +++ b/client/src/leap/soledad/client/secrets.py @@ -34,7 +34,7 @@ from leap.soledad.common import soledad_assert_type from leap.soledad.common import document from leap.soledad.common.log import getLogger from leap.soledad.client import events -from leap.soledad.client.crypto import encrypt_sym, decrypt_sym +from leap.soledad.client import _crypto logger = getLogger(__name__) @@ -126,7 +126,7 @@ class SoledadSecrets(object): instantiates Soledad. """ - IV_SEPARATOR = ":" + SEPARATOR = ":" """ A separator used for storing the encryption initial value prepended to the ciphertext. @@ -142,7 +142,8 @@ class SoledadSecrets(object): KDF_SALT_KEY = 'kdf_salt' KDF_LENGTH_KEY = 'kdf_length' KDF_SCRYPT = 'scrypt' - CIPHER_AES256 = 'aes256' + CIPHER_AES256 = 'aes256' # deprecated, AES-GCM + CIPHER_AES256_GCM = _crypto.ENC_METHOD.aes_256_gcm RECOVERY_DOC_VERSION_KEY = 'version' RECOVERY_DOC_VERSION = 1 """ @@ -343,7 +344,7 @@ class SoledadSecrets(object): '%s%s' % (self._passphrase_as_string(), self._uuid)).hexdigest() - def _export_recovery_document(self): + def _export_recovery_document(self, cipher=None): """ Export the storage secrets. @@ -364,6 +365,9 @@ class SoledadSecrets(object): Note that multiple storage secrets might be stored in one recovery document. + :param cipher: (Optional) The ciper to use. Defaults to AES256 + :type cipher: str + :return: The recovery document. :rtype: dict """ @@ -371,7 +375,7 @@ class SoledadSecrets(object): encrypted_secrets = {} for secret_id in self._secrets: encrypted_secrets[secret_id] = self._encrypt_storage_secret( - self._secrets[secret_id]) + self._secrets[secret_id], doc_cipher=cipher) # create the recovery document data = { self.STORAGE_SECRETS_KEY: encrypted_secrets, @@ -537,18 +541,20 @@ class SoledadSecrets(object): ) if encrypted_secret_dict[self.KDF_LENGTH_KEY] != len(key): raise SecretsException("Wrong length of decryption key.") - if encrypted_secret_dict[self.CIPHER_KEY] != self.CIPHER_AES256: + supported_ciphers = [self.CIPHER_AES256, self.CIPHER_AES256_GCM] + doc_cipher = encrypted_secret_dict[self.CIPHER_KEY] + if doc_cipher not in supported_ciphers: raise SecretsException("Unknown cipher in stored secret.") # recover the initial value and ciphertext iv, ciphertext = encrypted_secret_dict[self.SECRET_KEY].split( - self.IV_SEPARATOR, 1) + self.SEPARATOR, 1) ciphertext = binascii.a2b_base64(ciphertext) - decrypted_secret = decrypt_sym(ciphertext, key, iv) + decrypted_secret = _crypto.decrypt_sym(ciphertext, key, iv, doc_cipher) if encrypted_secret_dict[self.LENGTH_KEY] != len(decrypted_secret): raise SecretsException("Wrong length of decrypted secret.") return decrypted_secret - def _encrypt_storage_secret(self, decrypted_secret): + def _encrypt_storage_secret(self, decrypted_secret, doc_cipher=None): """ Encrypt the storage secret. @@ -567,6 +573,8 @@ class SoledadSecrets(object): :param decrypted_secret: The decrypted storage secret. :type decrypted_secret: str + :param cipher: (Optional) The ciper to use. Defaults to AES256 + :type cipher: str :return: The encrypted storage secret. :rtype: dict @@ -575,17 +583,18 @@ class SoledadSecrets(object): salt = os.urandom(self.SALT_LENGTH) # get a 256-bit key key = scrypt.hash(self._passphrase_as_string(), salt, buflen=32) - iv, ciphertext = encrypt_sym(decrypted_secret, key) + doc_cipher = doc_cipher or self.CIPHER_AES256 + iv, ciphertext = _crypto.encrypt_sym(decrypted_secret, key, doc_cipher) + ciphertext = binascii.b2a_base64(ciphertext) encrypted_secret_dict = { # leap.soledad.crypto submodule uses AES256 for symmetric # encryption. self.KDF_KEY: self.KDF_SCRYPT, self.KDF_SALT_KEY: binascii.b2a_base64(salt), self.KDF_LENGTH_KEY: len(key), - self.CIPHER_KEY: self.CIPHER_AES256, + self.CIPHER_KEY: doc_cipher, self.LENGTH_KEY: len(decrypted_secret), - self.SECRET_KEY: '%s%s%s' % ( - str(iv), self.IV_SEPARATOR, binascii.b2a_base64(ciphertext)), + self.SECRET_KEY: self.SEPARATOR.join([str(iv), ciphertext]) } return encrypted_secret_dict diff --git a/testing/tests/benchmarks/test_crypto.py b/testing/tests/benchmarks/test_crypto.py index 631ac041..8ee9b899 100644 --- a/testing/tests/benchmarks/test_crypto.py +++ b/testing/tests/benchmarks/test_crypto.py @@ -66,8 +66,8 @@ def create_raw_decryption(size): @pytest.mark.benchmark(group="test_crypto_raw_decrypt") def test_raw_decrypt(benchmark, payload): key = payload(32) - iv, tag, ciphertext = _crypto.encrypt_sym(payload(size), key) - benchmark(_crypto.decrypt_sym, ciphertext, key, iv, tag) + iv, ciphertext = _crypto.encrypt_sym(payload(size), key) + benchmark(_crypto.decrypt_sym, ciphertext, key, iv) return test_raw_decrypt diff --git a/testing/tests/client/test_crypto.py b/testing/tests/client/test_crypto.py index 10acba56..277d5430 100644 --- a/testing/tests/client/test_crypto.py +++ b/testing/tests/client/test_crypto.py @@ -114,7 +114,7 @@ class BlobTestCase(unittest.TestCase): magic, sch, meth, ts, iv, doc_id, rev = unpacked_data assert magic == _crypto.BLOB_SIGNATURE_MAGIC assert sch == 1 - assert meth == 1 + assert meth == _crypto.ENC_METHOD.aes_256_gcm assert iv == blob.iv assert doc_id == 'D-deadbeef' assert rev == self.doc_info.rev @@ -163,7 +163,7 @@ class BlobTestCase(unittest.TestCase): assert json.loads(decrypted) == payload @defer.inlineCallbacks - def test_decrypt_with_wrong_mac_raises(self): + def test_decrypt_with_wrong_tag_raises(self): """ Trying to decrypt a document with wrong MAC should raise. """ @@ -174,7 +174,7 @@ class BlobTestCase(unittest.TestCase): encrypted = yield crypto.encrypt_doc(doc1) encdict = json.loads(encrypted) preamble, raw = _crypto._split(str(encdict['raw'])) - # mess with MAC + # mess with tag messed = raw[:-16] + '0' * 16 preamble = base64.urlsafe_b64encode(preamble) @@ -205,8 +205,8 @@ class RecoveryDocumentTestCase(BaseSoledadTest): self.assertTrue(self._soledad.secrets.LENGTH_KEY in encrypted_secret) self.assertTrue(self._soledad.secrets.SECRET_KEY in encrypted_secret) - def test_import_recovery_document(self): - rd = self._soledad.secrets._export_recovery_document() + def test_import_recovery_document(self, cipher='aes256'): + rd = self._soledad.secrets._export_recovery_document(cipher) s = self._soledad_instance() s.secrets._import_recovery_document(rd) s.secrets.set_secret_id(self._soledad.secrets._secret_id) @@ -215,6 +215,14 @@ class RecoveryDocumentTestCase(BaseSoledadTest): 'Failed settinng secret for symmetric encryption.') s.close() + def test_import_GCM_recovery_document(self): + cipher = self._soledad.secrets.CIPHER_AES256_GCM + self.test_import_recovery_document(cipher) + + def test_import_legacy_CTR_recovery_document(self): + cipher = self._soledad.secrets.CIPHER_AES256 + self.test_import_recovery_document(cipher) + class SoledadSecretsTestCase(BaseSoledadTest): @@ -277,16 +285,16 @@ class SoledadCryptoAESTestCase(BaseSoledadTest): def test_encrypt_decrypt_sym(self): # generate 256-bit key key = os.urandom(32) - iv, tag, cyphertext = _crypto.encrypt_sym('data', key) + iv, cyphertext = _crypto.encrypt_sym('data', key) self.assertTrue(cyphertext is not None) self.assertTrue(cyphertext != '') self.assertTrue(cyphertext != 'data') - plaintext = _crypto.decrypt_sym(cyphertext, key, iv, tag) + plaintext = _crypto.decrypt_sym(cyphertext, key, iv) self.assertEqual('data', plaintext) def test_decrypt_with_wrong_iv_raises(self): key = os.urandom(32) - iv, tag, cyphertext = _crypto.encrypt_sym('data', key) + iv, cyphertext = _crypto.encrypt_sym('data', key) self.assertTrue(cyphertext is not None) self.assertTrue(cyphertext != '') self.assertTrue(cyphertext != 'data') @@ -297,11 +305,11 @@ class SoledadCryptoAESTestCase(BaseSoledadTest): wrongiv = os.urandom(1) + rawiv[1:] with pytest.raises(InvalidTag): _crypto.decrypt_sym( - cyphertext, key, iv=binascii.b2a_base64(wrongiv), tag=tag) + cyphertext, key, iv=binascii.b2a_base64(wrongiv)) def test_decrypt_with_wrong_key_raises(self): key = os.urandom(32) - iv, tag, cyphertext = _crypto.encrypt_sym('data', key) + iv, cyphertext = _crypto.encrypt_sym('data', key) self.assertTrue(cyphertext is not None) self.assertTrue(cyphertext != '') self.assertTrue(cyphertext != 'data') @@ -310,7 +318,7 @@ class SoledadCryptoAESTestCase(BaseSoledadTest): while wrongkey == key: wrongkey = os.urandom(32) with pytest.raises(InvalidTag): - _crypto.decrypt_sym(cyphertext, wrongkey, iv, tag) + _crypto.decrypt_sym(cyphertext, wrongkey, iv) def _aes_encrypt(key, iv, data): -- cgit v1.2.3