summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--client/src/leap/soledad/client/_crypto.py41
-rw-r--r--client/src/leap/soledad/client/secrets.py35
-rw-r--r--testing/tests/benchmarks/test_crypto.py4
-rw-r--r--testing/tests/client/test_crypto.py30
4 files changed, 69 insertions, 41 deletions
diff --git a/client/src/leap/soledad/client/_crypto.py b/client/src/leap/soledad/client/_crypto.py
index d9211322..4bbdd044 100644
--- a/client/src/leap/soledad/client/_crypto.py
+++ b/client/src/leap/soledad/client/_crypto.py
@@ -54,7 +54,7 @@ BLOB_SIGNATURE_MAGIC = '\x13\x37'
ENC_SCHEME = namedtuple('SCHEME', 'symkey')(1)
-ENC_METHOD = namedtuple('METHOD', 'aes_256_gcm')(1)
+ENC_METHOD = namedtuple('METHOD', 'aes_256_ctr aes_256_gcm')(1, 2)
DocInfo = namedtuple('DocInfo', 'doc_id rev')
@@ -123,9 +123,9 @@ class SoledadCrypto(object):
return decryptor.decrypt()
-def encrypt_sym(data, key):
+def encrypt_sym(data, key, method=ENC_METHOD.aes_256_gcm):
"""
- Encrypt data using AES-256 cipher in GCM mode.
+ Encrypt data using AES-256 cipher in selected mode.
:param data: The data to be encrypted.
:type data: str
@@ -136,17 +136,18 @@ def encrypt_sym(data, key):
encoded as base64.
:rtype: (str, str)
"""
- encryptor = AESWriter(key)
+ mode = _mode_by_method(method)
+ encryptor = AESWriter(key, mode=mode)
encryptor.write(data)
_, ciphertext = encryptor.end()
iv = base64.b64encode(encryptor.iv)
- tag = base64.b64encode(encryptor.tag)
- return iv, tag, ciphertext
+ tag = encryptor.tag or ''
+ return iv, ciphertext + tag
-def decrypt_sym(data, key, iv, tag):
+def decrypt_sym(data, key, iv, method=ENC_METHOD.aes_256_gcm):
"""
- Decrypt data using AES-256 cipher in GCM mode.
+ Decrypt data using AES-256 cipher in selected mode.
:param data: The data to be decrypted.
:type data: str
@@ -160,8 +161,11 @@ def decrypt_sym(data, key, iv, tag):
:rtype: str
"""
_iv = base64.b64decode(str(iv))
- tag = base64.b64decode(str(tag))
- decryptor = AESWriter(key, _iv, tag=tag)
+ mode = _mode_by_method(method)
+ tag = None
+ if mode == modes.GCM:
+ data, tag = data[:-16], data[-16:]
+ decryptor = AESWriter(key, _iv, tag=tag, mode=mode)
decryptor.write(data)
_, plaintext = decryptor.end()
return plaintext
@@ -315,12 +319,12 @@ class AESWriter(object):
"""
implements(interfaces.IConsumer)
- def __init__(self, key, iv=None, _buffer=None, tag=None):
+ def __init__(self, key, iv=None, _buffer=None, tag=None, mode=modes.GCM):
if len(key) != 32:
raise EncryptionDecryptionError('key is not 256 bits')
self.iv = iv or os.urandom(16)
self.buffer = _buffer or BytesIO()
- cipher = _get_aes_gcm_cipher(key, self.iv, tag)
+ cipher = _get_aes_cipher(key, self.iv, tag, mode)
cipher = cipher.decryptor() if tag else cipher.encryptor()
self.cipher, self.aead = cipher, ''
@@ -330,7 +334,7 @@ class AESWriter(object):
@property
def tag(self):
- return self.cipher.tag
+ return getattr(self.cipher, 'tag', None)
def write(self, data):
self.buffer.write(self.cipher.update(data))
@@ -366,10 +370,17 @@ def _get_sym_key_for_doc(doc_id, secret):
return _hmac_sha256(key, doc_id)
-def _get_aes_gcm_cipher(key, iv, tag):
- mode = modes.GCM(iv, tag)
+def _get_aes_cipher(key, iv, tag, mode=modes.GCM):
+ mode = mode(iv, tag) if mode == modes.GCM else mode(iv)
return Cipher(algorithms.AES(key), mode, backend=CRYPTO_BACKEND)
def _split(base64_raw_payload):
return imap(base64.urlsafe_b64decode, re.split(' ', base64_raw_payload))
+
+
+def _mode_by_method(method):
+ if method == ENC_METHOD.aes_256_gcm:
+ return modes.GCM
+ else:
+ return modes.CTR
diff --git a/client/src/leap/soledad/client/secrets.py b/client/src/leap/soledad/client/secrets.py
index 1eb6f31d..06488f74 100644
--- a/client/src/leap/soledad/client/secrets.py
+++ b/client/src/leap/soledad/client/secrets.py
@@ -34,7 +34,7 @@ from leap.soledad.common import soledad_assert_type
from leap.soledad.common import document
from leap.soledad.common.log import getLogger
from leap.soledad.client import events
-from leap.soledad.client.crypto import encrypt_sym, decrypt_sym
+from leap.soledad.client import _crypto
logger = getLogger(__name__)
@@ -126,7 +126,7 @@ class SoledadSecrets(object):
instantiates Soledad.
"""
- IV_SEPARATOR = ":"
+ SEPARATOR = ":"
"""
A separator used for storing the encryption initial value prepended to the
ciphertext.
@@ -142,7 +142,8 @@ class SoledadSecrets(object):
KDF_SALT_KEY = 'kdf_salt'
KDF_LENGTH_KEY = 'kdf_length'
KDF_SCRYPT = 'scrypt'
- CIPHER_AES256 = 'aes256'
+ CIPHER_AES256 = 'aes256' # deprecated, AES-GCM
+ CIPHER_AES256_GCM = _crypto.ENC_METHOD.aes_256_gcm
RECOVERY_DOC_VERSION_KEY = 'version'
RECOVERY_DOC_VERSION = 1
"""
@@ -343,7 +344,7 @@ class SoledadSecrets(object):
'%s%s' %
(self._passphrase_as_string(), self._uuid)).hexdigest()
- def _export_recovery_document(self):
+ def _export_recovery_document(self, cipher=None):
"""
Export the storage secrets.
@@ -364,6 +365,9 @@ class SoledadSecrets(object):
Note that multiple storage secrets might be stored in one recovery
document.
+ :param cipher: (Optional) The ciper to use. Defaults to AES256
+ :type cipher: str
+
:return: The recovery document.
:rtype: dict
"""
@@ -371,7 +375,7 @@ class SoledadSecrets(object):
encrypted_secrets = {}
for secret_id in self._secrets:
encrypted_secrets[secret_id] = self._encrypt_storage_secret(
- self._secrets[secret_id])
+ self._secrets[secret_id], doc_cipher=cipher)
# create the recovery document
data = {
self.STORAGE_SECRETS_KEY: encrypted_secrets,
@@ -537,18 +541,20 @@ class SoledadSecrets(object):
)
if encrypted_secret_dict[self.KDF_LENGTH_KEY] != len(key):
raise SecretsException("Wrong length of decryption key.")
- if encrypted_secret_dict[self.CIPHER_KEY] != self.CIPHER_AES256:
+ supported_ciphers = [self.CIPHER_AES256, self.CIPHER_AES256_GCM]
+ doc_cipher = encrypted_secret_dict[self.CIPHER_KEY]
+ if doc_cipher not in supported_ciphers:
raise SecretsException("Unknown cipher in stored secret.")
# recover the initial value and ciphertext
iv, ciphertext = encrypted_secret_dict[self.SECRET_KEY].split(
- self.IV_SEPARATOR, 1)
+ self.SEPARATOR, 1)
ciphertext = binascii.a2b_base64(ciphertext)
- decrypted_secret = decrypt_sym(ciphertext, key, iv)
+ decrypted_secret = _crypto.decrypt_sym(ciphertext, key, iv, doc_cipher)
if encrypted_secret_dict[self.LENGTH_KEY] != len(decrypted_secret):
raise SecretsException("Wrong length of decrypted secret.")
return decrypted_secret
- def _encrypt_storage_secret(self, decrypted_secret):
+ def _encrypt_storage_secret(self, decrypted_secret, doc_cipher=None):
"""
Encrypt the storage secret.
@@ -567,6 +573,8 @@ class SoledadSecrets(object):
:param decrypted_secret: The decrypted storage secret.
:type decrypted_secret: str
+ :param cipher: (Optional) The ciper to use. Defaults to AES256
+ :type cipher: str
:return: The encrypted storage secret.
:rtype: dict
@@ -575,17 +583,18 @@ class SoledadSecrets(object):
salt = os.urandom(self.SALT_LENGTH)
# get a 256-bit key
key = scrypt.hash(self._passphrase_as_string(), salt, buflen=32)
- iv, ciphertext = encrypt_sym(decrypted_secret, key)
+ doc_cipher = doc_cipher or self.CIPHER_AES256
+ iv, ciphertext = _crypto.encrypt_sym(decrypted_secret, key, doc_cipher)
+ ciphertext = binascii.b2a_base64(ciphertext)
encrypted_secret_dict = {
# leap.soledad.crypto submodule uses AES256 for symmetric
# encryption.
self.KDF_KEY: self.KDF_SCRYPT,
self.KDF_SALT_KEY: binascii.b2a_base64(salt),
self.KDF_LENGTH_KEY: len(key),
- self.CIPHER_KEY: self.CIPHER_AES256,
+ self.CIPHER_KEY: doc_cipher,
self.LENGTH_KEY: len(decrypted_secret),
- self.SECRET_KEY: '%s%s%s' % (
- str(iv), self.IV_SEPARATOR, binascii.b2a_base64(ciphertext)),
+ self.SECRET_KEY: self.SEPARATOR.join([str(iv), ciphertext])
}
return encrypted_secret_dict
diff --git a/testing/tests/benchmarks/test_crypto.py b/testing/tests/benchmarks/test_crypto.py
index 631ac041..8ee9b899 100644
--- a/testing/tests/benchmarks/test_crypto.py
+++ b/testing/tests/benchmarks/test_crypto.py
@@ -66,8 +66,8 @@ def create_raw_decryption(size):
@pytest.mark.benchmark(group="test_crypto_raw_decrypt")
def test_raw_decrypt(benchmark, payload):
key = payload(32)
- iv, tag, ciphertext = _crypto.encrypt_sym(payload(size), key)
- benchmark(_crypto.decrypt_sym, ciphertext, key, iv, tag)
+ iv, ciphertext = _crypto.encrypt_sym(payload(size), key)
+ benchmark(_crypto.decrypt_sym, ciphertext, key, iv)
return test_raw_decrypt
diff --git a/testing/tests/client/test_crypto.py b/testing/tests/client/test_crypto.py
index 10acba56..277d5430 100644
--- a/testing/tests/client/test_crypto.py
+++ b/testing/tests/client/test_crypto.py
@@ -114,7 +114,7 @@ class BlobTestCase(unittest.TestCase):
magic, sch, meth, ts, iv, doc_id, rev = unpacked_data
assert magic == _crypto.BLOB_SIGNATURE_MAGIC
assert sch == 1
- assert meth == 1
+ assert meth == _crypto.ENC_METHOD.aes_256_gcm
assert iv == blob.iv
assert doc_id == 'D-deadbeef'
assert rev == self.doc_info.rev
@@ -163,7 +163,7 @@ class BlobTestCase(unittest.TestCase):
assert json.loads(decrypted) == payload
@defer.inlineCallbacks
- def test_decrypt_with_wrong_mac_raises(self):
+ def test_decrypt_with_wrong_tag_raises(self):
"""
Trying to decrypt a document with wrong MAC should raise.
"""
@@ -174,7 +174,7 @@ class BlobTestCase(unittest.TestCase):
encrypted = yield crypto.encrypt_doc(doc1)
encdict = json.loads(encrypted)
preamble, raw = _crypto._split(str(encdict['raw']))
- # mess with MAC
+ # mess with tag
messed = raw[:-16] + '0' * 16
preamble = base64.urlsafe_b64encode(preamble)
@@ -205,8 +205,8 @@ class RecoveryDocumentTestCase(BaseSoledadTest):
self.assertTrue(self._soledad.secrets.LENGTH_KEY in encrypted_secret)
self.assertTrue(self._soledad.secrets.SECRET_KEY in encrypted_secret)
- def test_import_recovery_document(self):
- rd = self._soledad.secrets._export_recovery_document()
+ def test_import_recovery_document(self, cipher='aes256'):
+ rd = self._soledad.secrets._export_recovery_document(cipher)
s = self._soledad_instance()
s.secrets._import_recovery_document(rd)
s.secrets.set_secret_id(self._soledad.secrets._secret_id)
@@ -215,6 +215,14 @@ class RecoveryDocumentTestCase(BaseSoledadTest):
'Failed settinng secret for symmetric encryption.')
s.close()
+ def test_import_GCM_recovery_document(self):
+ cipher = self._soledad.secrets.CIPHER_AES256_GCM
+ self.test_import_recovery_document(cipher)
+
+ def test_import_legacy_CTR_recovery_document(self):
+ cipher = self._soledad.secrets.CIPHER_AES256
+ self.test_import_recovery_document(cipher)
+
class SoledadSecretsTestCase(BaseSoledadTest):
@@ -277,16 +285,16 @@ class SoledadCryptoAESTestCase(BaseSoledadTest):
def test_encrypt_decrypt_sym(self):
# generate 256-bit key
key = os.urandom(32)
- iv, tag, cyphertext = _crypto.encrypt_sym('data', key)
+ iv, cyphertext = _crypto.encrypt_sym('data', key)
self.assertTrue(cyphertext is not None)
self.assertTrue(cyphertext != '')
self.assertTrue(cyphertext != 'data')
- plaintext = _crypto.decrypt_sym(cyphertext, key, iv, tag)
+ plaintext = _crypto.decrypt_sym(cyphertext, key, iv)
self.assertEqual('data', plaintext)
def test_decrypt_with_wrong_iv_raises(self):
key = os.urandom(32)
- iv, tag, cyphertext = _crypto.encrypt_sym('data', key)
+ iv, cyphertext = _crypto.encrypt_sym('data', key)
self.assertTrue(cyphertext is not None)
self.assertTrue(cyphertext != '')
self.assertTrue(cyphertext != 'data')
@@ -297,11 +305,11 @@ class SoledadCryptoAESTestCase(BaseSoledadTest):
wrongiv = os.urandom(1) + rawiv[1:]
with pytest.raises(InvalidTag):
_crypto.decrypt_sym(
- cyphertext, key, iv=binascii.b2a_base64(wrongiv), tag=tag)
+ cyphertext, key, iv=binascii.b2a_base64(wrongiv))
def test_decrypt_with_wrong_key_raises(self):
key = os.urandom(32)
- iv, tag, cyphertext = _crypto.encrypt_sym('data', key)
+ iv, cyphertext = _crypto.encrypt_sym('data', key)
self.assertTrue(cyphertext is not None)
self.assertTrue(cyphertext != '')
self.assertTrue(cyphertext != 'data')
@@ -310,7 +318,7 @@ class SoledadCryptoAESTestCase(BaseSoledadTest):
while wrongkey == key:
wrongkey = os.urandom(32)
with pytest.raises(InvalidTag):
- _crypto.decrypt_sym(cyphertext, wrongkey, iv, tag)
+ _crypto.decrypt_sym(cyphertext, wrongkey, iv)
def _aes_encrypt(key, iv, data):