From fcf3b3046dd2005992638ebf993d53897af8ed3a Mon Sep 17 00:00:00 2001 From: Kali Kaneko Date: Wed, 14 Sep 2016 01:52:36 -0400 Subject: [refactor] remove encryption pool --- testing/tests/client/test_crypto2.py | 63 ++++++++++++++++++++++++++++++++++++ 1 file changed, 63 insertions(+) create mode 100644 testing/tests/client/test_crypto2.py (limited to 'testing/tests/client') diff --git a/testing/tests/client/test_crypto2.py b/testing/tests/client/test_crypto2.py new file mode 100644 index 00000000..ae280020 --- /dev/null +++ b/testing/tests/client/test_crypto2.py @@ -0,0 +1,63 @@ +# -*- coding: utf-8 -*- +# test_crypto2.py +# Copyright (C) 2016 LEAP Encryption Access Project +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see . + +""" +Tests for the _crypto module +""" + +import StringIO + + +import leap.soledad.client +from leap.soledad.client import _crypto + + +from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes +from cryptography.hazmat.backends import default_backend + + +def _aes_encrypt(key, iv, data): + backend = default_backend() + cipher = Cipher(algorithms.AES(key), modes.CTR(iv), backend=backend) + encryptor = cipher.encryptor() + return encryptor.update(data) + encryptor.finalize() + + +def test_chunked_encryption(): + key = 'A' * 32 + iv = 'A' * 16 + data = ( + "You can't come up against " + "the world's most powerful intelligence " + "agencies and not accept the risk. " + "If they want to get you, over time " + "they will.") + + fd = StringIO.StringIO() + aes = _crypto.AESWriter(key, fd, iv) + + block = 16 + + for i in range(len(data)/block): + chunk = data[i * block:(i+1)*block] + aes.write(chunk) + aes.end() + + ciphertext_chunked = fd.getvalue() + ciphertext = _aes_encrypt(key, iv, data) + + assert ciphertext_chunked == ciphertext -- cgit v1.2.3 From 510c0ba3a0c0ade334090a1c36dab9ccae0ba1b4 Mon Sep 17 00:00:00 2001 From: Kali Kaneko Date: Fri, 16 Sep 2016 19:24:55 -0400 Subject: [feature] blob encryptor / decryptor --- testing/tests/client/test_crypto2.py | 126 ++++++++++++++++++++++++++++++++--- 1 file changed, 117 insertions(+), 9 deletions(-) (limited to 'testing/tests/client') diff --git a/testing/tests/client/test_crypto2.py b/testing/tests/client/test_crypto2.py index ae280020..f0f6c4af 100644 --- a/testing/tests/client/test_crypto2.py +++ b/testing/tests/client/test_crypto2.py @@ -19,16 +19,28 @@ Tests for the _crypto module """ +import base64 +import binascii +import time +import struct import StringIO - import leap.soledad.client from leap.soledad.client import _crypto - from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes from cryptography.hazmat.backends import default_backend +from twisted.trial import unittest + + +snowden1 = ( + "You can't come up against " + "the world's most powerful intelligence " + "agencies and not accept the risk. " + "If they want to get you, over time " + "they will.") + def _aes_encrypt(key, iv, data): backend = default_backend() @@ -36,20 +48,21 @@ def _aes_encrypt(key, iv, data): encryptor = cipher.encryptor() return encryptor.update(data) + encryptor.finalize() +def _aes_decrypt(key, iv, data): + backend = default_backend() + cipher = Cipher(algorithms.AES(key), modes.CTR(iv), backend=backend) + decryptor = cipher.decryptor() + return decryptor.update(data) + decryptor.finalize() + def test_chunked_encryption(): key = 'A' * 32 iv = 'A' * 16 - data = ( - "You can't come up against " - "the world's most powerful intelligence " - "agencies and not accept the risk. " - "If they want to get you, over time " - "they will.") fd = StringIO.StringIO() - aes = _crypto.AESWriter(key, fd, iv) + aes = _crypto.AESEncryptor(key, iv, fd) + data = snowden1 block = 16 for i in range(len(data)/block): @@ -61,3 +74,98 @@ def test_chunked_encryption(): ciphertext = _aes_encrypt(key, iv, data) assert ciphertext_chunked == ciphertext + + +def test_decrypt(): + key = 'A' * 32 + iv = 'A' * 16 + + data = snowden1 + block = 16 + + ciphertext = _aes_encrypt(key, iv, data) + + fd = StringIO.StringIO() + aes = _crypto.AESDecryptor(key, iv, fd) + + for i in range(len(ciphertext)/block): + chunk = ciphertext[i * block:(i+1)*block] + aes.write(chunk) + aes.end() + + cleartext_chunked = fd.getvalue() + assert cleartext_chunked == data + + + +class BlobTestCase(unittest.TestCase): + + class doc_info: + doc_id = 'D-deadbeef' + rev = '397932e0c77f45fcb7c3732930e7e9b2:1' + + def test_blob_encryptor(self): + + inf = StringIO.StringIO() + inf.write(snowden1) + inf.seek(0) + outf = StringIO.StringIO() + + blob = _crypto.BlobEncryptor( + self.doc_info, inf, result=outf, + secret='A' * 96, iv='B'*16) + + d = blob.encrypt() + d.addCallback(self._test_blob_encryptor_cb, outf) + return d + + def _test_blob_encryptor_cb(self, _, outf): + encrypted = outf.getvalue() + data = base64.urlsafe_b64decode(encrypted) + + assert data[0] == '\x80' + ts, sch, meth = struct.unpack( + 'Qbb', data[1:11]) + assert sch == 1 + assert meth == 1 + iv = data[11:27] + assert iv == 'B' * 16 + doc_id = data[27:37] + assert doc_id == 'D-deadbeef' + + rev = data[37:71] + assert rev == self.doc_info.rev + + ciphertext = data[71:-64] + aes_key = _crypto._get_sym_key_for_doc( + self.doc_info.doc_id, 'A'*96) + assert ciphertext == _aes_encrypt(aes_key, 'B'*16, snowden1) + + decrypted = _aes_decrypt(aes_key, 'B'*16, ciphertext) + assert str(decrypted) == snowden1 + + def test_blob_decryptor(self): + + inf = StringIO.StringIO() + inf.write(snowden1) + inf.seek(0) + outf = StringIO.StringIO() + + blob = _crypto.BlobEncryptor( + self.doc_info, inf, result=outf, + secret='A' * 96, iv='B' * 16) + + def do_decrypt(_, outf): + decryptor = _crypto.BlobDecryptor( + self.doc_info, outf, + secret='A' * 96) + d = decryptor.decrypt() + return d + + d = blob.encrypt() + d.addCallback(do_decrypt, outf) + d.addCallback(self._test_blob_decryptor_cb) + return d + + def _test_blob_decryptor_cb(self, decrypted): + assert decrypted.getvalue() == snowden1 -- cgit v1.2.3 From 75208477a2f1634664b80b8501818e5a905aa0f3 Mon Sep 17 00:00:00 2001 From: Kali Kaneko Date: Thu, 22 Sep 2016 01:42:26 -0400 Subject: [tests] adapt tests --- testing/tests/client/test_crypto.py | 263 +++++++++++++++++++++++++---------- testing/tests/client/test_crypto2.py | 171 ----------------------- 2 files changed, 188 insertions(+), 246 deletions(-) delete mode 100644 testing/tests/client/test_crypto2.py (limited to 'testing/tests/client') diff --git a/testing/tests/client/test_crypto.py b/testing/tests/client/test_crypto.py index 77252b46..dc3054f2 100644 --- a/testing/tests/client/test_crypto.py +++ b/testing/tests/client/test_crypto.py @@ -17,47 +17,184 @@ """ Tests for cryptographic related stuff. """ -import os -import hashlib import binascii +import base64 +import hashlib +import json +import os +import struct + +from io import BytesIO + +import pytest + +from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes +from cryptography.hazmat.backends import default_backend -from leap.soledad.client import crypto from leap.soledad.common.document import SoledadDocument from test_soledad.util import BaseSoledadTest -from leap.soledad.common.crypto import WrongMacError -from leap.soledad.common.crypto import UnknownMacMethodError -from leap.soledad.common.crypto import ENC_JSON_KEY -from leap.soledad.common.crypto import ENC_SCHEME_KEY -from leap.soledad.common.crypto import MAC_KEY -from leap.soledad.common.crypto import MAC_METHOD_KEY +from leap.soledad.client import _crypto + +from twisted.trial import unittest +from twisted.internet import defer + + +snowden1 = ( + "You can't come up against " + "the world's most powerful intelligence " + "agencies and not accept the risk. " + "If they want to get you, over time " + "they will.") + + +class AESTest(unittest.TestCase): + + def test_chunked_encryption(self): + key = 'A' * 32 + iv = 'A' * 16 + + fd = BytesIO() + aes = _crypto.AESEncryptor(key, iv, fd) + + data = snowden1 + block = 16 + + for i in range(len(data)/block): + chunk = data[i * block:(i+1)*block] + aes.write(chunk) + aes.end() + + ciphertext_chunked = fd.getvalue() + ciphertext = _aes_encrypt(key, iv, data) + + assert ciphertext_chunked == ciphertext + + + def test_decrypt(self): + key = 'A' * 32 + iv = 'A' * 16 + + data = snowden1 + block = 16 + + ciphertext = _aes_encrypt(key, iv, data) + + fd = BytesIO() + aes = _crypto.AESDecryptor(key, iv, fd) + + for i in range(len(ciphertext)/block): + chunk = ciphertext[i * block:(i+1)*block] + aes.write(chunk) + aes.end() + + cleartext_chunked = fd.getvalue() + assert cleartext_chunked == data + + + +class BlobTestCase(unittest.TestCase): + + class doc_info: + doc_id = 'D-deadbeef' + rev = '397932e0c77f45fcb7c3732930e7e9b2:1' + + @defer.inlineCallbacks + def test_blob_encryptor(self): + + inf = BytesIO() + inf.write(snowden1) + inf.seek(0) + outf = BytesIO() + + blob = _crypto.BlobEncryptor( + self.doc_info, inf, result=outf, + secret='A' * 96, iv='B'*16) + + encrypted = yield blob.encrypt() + data = base64.urlsafe_b64decode(encrypted.getvalue()) + assert data[0] == '\x80' + ts, sch, meth = struct.unpack( + 'Qbb', data[1:11]) + assert sch == 1 + assert meth == 1 + iv = data[11:27] + assert iv == 'B' * 16 + doc_id = data[27:37] + assert doc_id == 'D-deadbeef' -class EncryptedSyncTestCase(BaseSoledadTest): + rev = data[37:71] + assert rev == self.doc_info.rev - """ - Tests that guarantee that data will always be encrypted when syncing. - """ + ciphertext = data[71:-64] + aes_key = _crypto._get_sym_key_for_doc( + self.doc_info.doc_id, 'A'*96) + assert ciphertext == _aes_encrypt(aes_key, 'B'*16, snowden1) - def test_encrypt_decrypt_json(self): + decrypted = _aes_decrypt(aes_key, 'B'*16, ciphertext) + assert str(decrypted) == snowden1 + + + @defer.inlineCallbacks + def test_blob_decryptor(self): + + inf = BytesIO() + inf.write(snowden1) + inf.seek(0) + outf = BytesIO() + + blob = _crypto.BlobEncryptor( + self.doc_info, inf, result=outf, + secret='A' * 96, iv='B' * 16) + yield blob.encrypt() + + decryptor = _crypto.BlobDecryptor( + self.doc_info, outf, + secret='A' * 96) + decrypted = yield decryptor.decrypt() + assert decrypted.getvalue() == snowden1 + + + @defer.inlineCallbacks + def test_encrypt_and_decrypt(self): + """ + Check that encrypting and decrypting gives same doc. """ - Test encrypting and decrypting documents. + crypto = _crypto.SoledadCrypto('A' * 96) + payload = {'key': 'someval'} + doc1 = SoledadDocument('id1', '1', json.dumps(payload)) + + encrypted = yield crypto.encrypt_doc(doc1) + assert encrypted != payload + assert 'raw' in encrypted + doc2 = SoledadDocument('id1', '1') + doc2.set_json(encrypted) + decrypted = yield crypto.decrypt_doc(doc2) + assert len(decrypted) != 0 + assert json.loads(decrypted) == payload + + + @defer.inlineCallbacks + def test_decrypt_with_wrong_mac_raises(self): """ - simpledoc = {'key': 'val'} - doc1 = SoledadDocument(doc_id='id') - doc1.content = simpledoc - - # encrypt doc - doc1.set_json(self._soledad._crypto.encrypt_doc(doc1)) - # assert content is different and includes keys - self.assertNotEqual( - simpledoc, doc1.content, - 'incorrect document encryption') - self.assertTrue(ENC_JSON_KEY in doc1.content) - self.assertTrue(ENC_SCHEME_KEY in doc1.content) - # decrypt doc - doc1.set_json(self._soledad._crypto.decrypt_doc(doc1)) - self.assertEqual( - simpledoc, doc1.content, 'incorrect document encryption') + Trying to decrypt a document with wrong MAC should raise. + """ + crypto = _crypto.SoledadCrypto('A' * 96) + payload = {'key': 'someval'} + doc1 = SoledadDocument('id1', '1', json.dumps(payload)) + + encrypted = yield crypto.encrypt_doc(doc1) + encdict = json.loads(encrypted) + raw = base64.urlsafe_b64decode(str(encdict['raw'])) + # mess with MAC + messed = raw[:-64] + '0' * 64 + newraw = base64.urlsafe_b64encode(str(messed)) + doc2 = SoledadDocument('id1', '1') + doc2.set_json(json.dumps({"raw": str(newraw)})) + + with pytest.raises(_crypto.InvalidBlob): + decrypted = yield crypto.decrypt_doc(doc2) + class RecoveryDocumentTestCase(BaseSoledadTest): @@ -146,60 +283,22 @@ class SoledadSecretsTestCase(BaseSoledadTest): "Should have a secret at this point") -class MacAuthTestCase(BaseSoledadTest): - - def test_decrypt_with_wrong_mac_raises(self): - """ - Trying to decrypt a document with wrong MAC should raise. - """ - simpledoc = {'key': 'val'} - doc = SoledadDocument(doc_id='id') - doc.content = simpledoc - # encrypt doc - doc.set_json(self._soledad._crypto.encrypt_doc(doc)) - self.assertTrue(MAC_KEY in doc.content) - self.assertTrue(MAC_METHOD_KEY in doc.content) - # mess with MAC - doc.content[MAC_KEY] = '1234567890ABCDEF' - # try to decrypt doc - self.assertRaises( - WrongMacError, - self._soledad._crypto.decrypt_doc, doc) - - def test_decrypt_with_unknown_mac_method_raises(self): - """ - Trying to decrypt a document with unknown MAC method should raise. - """ - simpledoc = {'key': 'val'} - doc = SoledadDocument(doc_id='id') - doc.content = simpledoc - # encrypt doc - doc.set_json(self._soledad._crypto.encrypt_doc(doc)) - self.assertTrue(MAC_KEY in doc.content) - self.assertTrue(MAC_METHOD_KEY in doc.content) - # mess with MAC method - doc.content[MAC_METHOD_KEY] = 'mymac' - # try to decrypt doc - self.assertRaises( - UnknownMacMethodError, - self._soledad._crypto.decrypt_doc, doc) - class SoledadCryptoAESTestCase(BaseSoledadTest): def test_encrypt_decrypt_sym(self): # generate 256-bit key key = os.urandom(32) - iv, cyphertext = crypto.encrypt_sym('data', key) + iv, cyphertext = _crypto.encrypt_sym('data', key) self.assertTrue(cyphertext is not None) self.assertTrue(cyphertext != '') self.assertTrue(cyphertext != 'data') - plaintext = crypto.decrypt_sym(cyphertext, key, iv) + plaintext = _crypto.decrypt_sym(cyphertext, key, iv) self.assertEqual('data', plaintext) def test_decrypt_with_wrong_iv_fails(self): key = os.urandom(32) - iv, cyphertext = crypto.encrypt_sym('data', key) + iv, cyphertext = _crypto.encrypt_sym('data', key) self.assertTrue(cyphertext is not None) self.assertTrue(cyphertext != '') self.assertTrue(cyphertext != 'data') @@ -208,13 +307,13 @@ class SoledadCryptoAESTestCase(BaseSoledadTest): wrongiv = rawiv while wrongiv == rawiv: wrongiv = os.urandom(1) + rawiv[1:] - plaintext = crypto.decrypt_sym( + plaintext = _crypto.decrypt_sym( cyphertext, key, iv=binascii.b2a_base64(wrongiv)) self.assertNotEqual('data', plaintext) def test_decrypt_with_wrong_key_fails(self): key = os.urandom(32) - iv, cyphertext = crypto.encrypt_sym('data', key) + iv, cyphertext = _crypto.encrypt_sym('data', key) self.assertTrue(cyphertext is not None) self.assertTrue(cyphertext != '') self.assertTrue(cyphertext != 'data') @@ -222,5 +321,19 @@ class SoledadCryptoAESTestCase(BaseSoledadTest): # ensure keys are different in case we are extremely lucky while wrongkey == key: wrongkey = os.urandom(32) - plaintext = crypto.decrypt_sym(cyphertext, wrongkey, iv) + plaintext = _crypto.decrypt_sym(cyphertext, wrongkey, iv) self.assertNotEqual('data', plaintext) + + +def _aes_encrypt(key, iv, data): + backend = default_backend() + cipher = Cipher(algorithms.AES(key), modes.CTR(iv), backend=backend) + encryptor = cipher.encryptor() + return encryptor.update(data) + encryptor.finalize() + + +def _aes_decrypt(key, iv, data): + backend = default_backend() + cipher = Cipher(algorithms.AES(key), modes.CTR(iv), backend=backend) + decryptor = cipher.decryptor() + return decryptor.update(data) + decryptor.finalize() diff --git a/testing/tests/client/test_crypto2.py b/testing/tests/client/test_crypto2.py deleted file mode 100644 index f0f6c4af..00000000 --- a/testing/tests/client/test_crypto2.py +++ /dev/null @@ -1,171 +0,0 @@ -# -*- coding: utf-8 -*- -# test_crypto2.py -# Copyright (C) 2016 LEAP Encryption Access Project -# -# This program is free software: you can redistribute it and/or modify -# it under the terms of the GNU General Public License as published by -# the Free Software Foundation, either version 3 of the License, or -# (at your option) any later version. -# -# This program is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -# GNU General Public License for more details. -# -# You should have received a copy of the GNU General Public License -# along with this program. If not, see . - -""" -Tests for the _crypto module -""" - -import base64 -import binascii -import time -import struct -import StringIO - -import leap.soledad.client -from leap.soledad.client import _crypto - -from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes -from cryptography.hazmat.backends import default_backend - -from twisted.trial import unittest - - -snowden1 = ( - "You can't come up against " - "the world's most powerful intelligence " - "agencies and not accept the risk. " - "If they want to get you, over time " - "they will.") - - -def _aes_encrypt(key, iv, data): - backend = default_backend() - cipher = Cipher(algorithms.AES(key), modes.CTR(iv), backend=backend) - encryptor = cipher.encryptor() - return encryptor.update(data) + encryptor.finalize() - -def _aes_decrypt(key, iv, data): - backend = default_backend() - cipher = Cipher(algorithms.AES(key), modes.CTR(iv), backend=backend) - decryptor = cipher.decryptor() - return decryptor.update(data) + decryptor.finalize() - - -def test_chunked_encryption(): - key = 'A' * 32 - iv = 'A' * 16 - - fd = StringIO.StringIO() - aes = _crypto.AESEncryptor(key, iv, fd) - - data = snowden1 - block = 16 - - for i in range(len(data)/block): - chunk = data[i * block:(i+1)*block] - aes.write(chunk) - aes.end() - - ciphertext_chunked = fd.getvalue() - ciphertext = _aes_encrypt(key, iv, data) - - assert ciphertext_chunked == ciphertext - - -def test_decrypt(): - key = 'A' * 32 - iv = 'A' * 16 - - data = snowden1 - block = 16 - - ciphertext = _aes_encrypt(key, iv, data) - - fd = StringIO.StringIO() - aes = _crypto.AESDecryptor(key, iv, fd) - - for i in range(len(ciphertext)/block): - chunk = ciphertext[i * block:(i+1)*block] - aes.write(chunk) - aes.end() - - cleartext_chunked = fd.getvalue() - assert cleartext_chunked == data - - - -class BlobTestCase(unittest.TestCase): - - class doc_info: - doc_id = 'D-deadbeef' - rev = '397932e0c77f45fcb7c3732930e7e9b2:1' - - def test_blob_encryptor(self): - - inf = StringIO.StringIO() - inf.write(snowden1) - inf.seek(0) - outf = StringIO.StringIO() - - blob = _crypto.BlobEncryptor( - self.doc_info, inf, result=outf, - secret='A' * 96, iv='B'*16) - - d = blob.encrypt() - d.addCallback(self._test_blob_encryptor_cb, outf) - return d - - def _test_blob_encryptor_cb(self, _, outf): - encrypted = outf.getvalue() - data = base64.urlsafe_b64decode(encrypted) - - assert data[0] == '\x80' - ts, sch, meth = struct.unpack( - 'Qbb', data[1:11]) - assert sch == 1 - assert meth == 1 - iv = data[11:27] - assert iv == 'B' * 16 - doc_id = data[27:37] - assert doc_id == 'D-deadbeef' - - rev = data[37:71] - assert rev == self.doc_info.rev - - ciphertext = data[71:-64] - aes_key = _crypto._get_sym_key_for_doc( - self.doc_info.doc_id, 'A'*96) - assert ciphertext == _aes_encrypt(aes_key, 'B'*16, snowden1) - - decrypted = _aes_decrypt(aes_key, 'B'*16, ciphertext) - assert str(decrypted) == snowden1 - - def test_blob_decryptor(self): - - inf = StringIO.StringIO() - inf.write(snowden1) - inf.seek(0) - outf = StringIO.StringIO() - - blob = _crypto.BlobEncryptor( - self.doc_info, inf, result=outf, - secret='A' * 96, iv='B' * 16) - - def do_decrypt(_, outf): - decryptor = _crypto.BlobDecryptor( - self.doc_info, outf, - secret='A' * 96) - d = decryptor.decrypt() - return d - - d = blob.encrypt() - d.addCallback(do_decrypt, outf) - d.addCallback(self._test_blob_decryptor_cb) - return d - - def _test_blob_decryptor_cb(self, decrypted): - assert decrypted.getvalue() == snowden1 -- cgit v1.2.3 From 529dbdf27804f12da80907d25c412d10e9fa3763 Mon Sep 17 00:00:00 2001 From: Victor Shyba Date: Thu, 17 Nov 2016 01:33:04 -0300 Subject: [style] fix pep8 and confs Fixes setup.cfg, adding current exclude rules, simplified tox.ini to use setup.cfg and fixed all. --- testing/tests/client/test_crypto.py | 27 ++++++++++----------------- 1 file changed, 10 insertions(+), 17 deletions(-) (limited to 'testing/tests/client') diff --git a/testing/tests/client/test_crypto.py b/testing/tests/client/test_crypto.py index dc3054f2..483c7803 100644 --- a/testing/tests/client/test_crypto.py +++ b/testing/tests/client/test_crypto.py @@ -59,8 +59,8 @@ class AESTest(unittest.TestCase): data = snowden1 block = 16 - for i in range(len(data)/block): - chunk = data[i * block:(i+1)*block] + for i in range(len(data) / block): + chunk = data[i * block:(i + 1) * block] aes.write(chunk) aes.end() @@ -69,7 +69,6 @@ class AESTest(unittest.TestCase): assert ciphertext_chunked == ciphertext - def test_decrypt(self): key = 'A' * 32 iv = 'A' * 16 @@ -82,8 +81,8 @@ class AESTest(unittest.TestCase): fd = BytesIO() aes = _crypto.AESDecryptor(key, iv, fd) - for i in range(len(ciphertext)/block): - chunk = ciphertext[i * block:(i+1)*block] + for i in range(len(ciphertext) / block): + chunk = ciphertext[i * block:(i + 1) * block] aes.write(chunk) aes.end() @@ -91,7 +90,6 @@ class AESTest(unittest.TestCase): assert cleartext_chunked == data - class BlobTestCase(unittest.TestCase): class doc_info: @@ -108,13 +106,13 @@ class BlobTestCase(unittest.TestCase): blob = _crypto.BlobEncryptor( self.doc_info, inf, result=outf, - secret='A' * 96, iv='B'*16) + secret='A' * 96, iv='B' * 16) encrypted = yield blob.encrypt() data = base64.urlsafe_b64decode(encrypted.getvalue()) assert data[0] == '\x80' - ts, sch, meth = struct.unpack( + ts, sch, meth = struct.unpack( 'Qbb', data[1:11]) assert sch == 1 assert meth == 1 @@ -128,13 +126,12 @@ class BlobTestCase(unittest.TestCase): ciphertext = data[71:-64] aes_key = _crypto._get_sym_key_for_doc( - self.doc_info.doc_id, 'A'*96) - assert ciphertext == _aes_encrypt(aes_key, 'B'*16, snowden1) + self.doc_info.doc_id, 'A' * 96) + assert ciphertext == _aes_encrypt(aes_key, 'B' * 16, snowden1) - decrypted = _aes_decrypt(aes_key, 'B'*16, ciphertext) + decrypted = _aes_decrypt(aes_key, 'B' * 16, ciphertext) assert str(decrypted) == snowden1 - @defer.inlineCallbacks def test_blob_decryptor(self): @@ -154,7 +151,6 @@ class BlobTestCase(unittest.TestCase): decrypted = yield decryptor.decrypt() assert decrypted.getvalue() == snowden1 - @defer.inlineCallbacks def test_encrypt_and_decrypt(self): """ @@ -173,7 +169,6 @@ class BlobTestCase(unittest.TestCase): assert len(decrypted) != 0 assert json.loads(decrypted) == payload - @defer.inlineCallbacks def test_decrypt_with_wrong_mac_raises(self): """ @@ -193,8 +188,7 @@ class BlobTestCase(unittest.TestCase): doc2.set_json(json.dumps({"raw": str(newraw)})) with pytest.raises(_crypto.InvalidBlob): - decrypted = yield crypto.decrypt_doc(doc2) - + yield crypto.decrypt_doc(doc2) class RecoveryDocumentTestCase(BaseSoledadTest): @@ -283,7 +277,6 @@ class SoledadSecretsTestCase(BaseSoledadTest): "Should have a secret at this point") - class SoledadCryptoAESTestCase(BaseSoledadTest): def test_encrypt_decrypt_sym(self): -- cgit v1.2.3 From 87259d4210e3488b00876d7ec83a8cc21e341712 Mon Sep 17 00:00:00 2001 From: drebs Date: Thu, 17 Nov 2016 22:16:28 -0200 Subject: [test] add test for deprecated crypto format update --- testing/tests/client/test_deprecated_crypto.py | 67 ++++++++++++++++++++++++++ 1 file changed, 67 insertions(+) create mode 100644 testing/tests/client/test_deprecated_crypto.py (limited to 'testing/tests/client') diff --git a/testing/tests/client/test_deprecated_crypto.py b/testing/tests/client/test_deprecated_crypto.py new file mode 100644 index 00000000..ca1b1558 --- /dev/null +++ b/testing/tests/client/test_deprecated_crypto.py @@ -0,0 +1,67 @@ +import json +import pytest + +from leap.soledad.client import crypto as old_crypto +from leap.soledad.common.couch import CouchDatabase +from leap.soledad.common import crypto as common_crypto + +from test_soledad.u1db_tests import simple_doc + + +def deprecate_client_crypto(client): + secret = client._crypto.secret + _crypto = old_crypto.SoledadCrypto(secret) + setattr(client._dbsyncer, '_crypto', _crypto) + return client + + +def couch_database(couch_url, uuid): + db = CouchDatabase(couch_url, "user-%s" % (uuid,)) + return db + + +@pytest.inlineCallbacks +def test_touch_updates_remote_representation( + soledad_client, request): + + client = soledad_client() + deprecated_client = deprecate_client_crypto(soledad_client()) + + couch_url = request.config.option.couch_url + remote = couch_database(couch_url, client._uuid) + + # ensure remote db is empty + gen, docs = remote.get_all_docs() + assert gen == 0 + assert len(docs) == 0 + + # create a doc with deprecated client and sync + yield deprecated_client.create_doc(json.loads(simple_doc)) + yield deprecated_client.sync() + + # check for doc in remote db + gen, docs = remote.get_all_docs() + assert gen == 1 + assert len(docs) == 1 + doc = docs.pop() + content = doc.content + assert common_crypto.ENC_JSON_KEY in content + assert common_crypto.ENC_SCHEME_KEY in content + assert common_crypto.ENC_METHOD_KEY in content + assert common_crypto.ENC_IV_KEY in content + assert common_crypto.MAC_KEY in content + assert common_crypto.MAC_METHOD_KEY in content + + # "touch" the document with a newer client and synx + _, docs = yield client.get_all_docs() + yield client.put_doc(doc) + yield client.sync() + + # check for newer representation of doc in remote db + gen, docs = remote.get_all_docs() + assert gen == 2 + assert len(docs) == 1 + doc = docs.pop() + content = doc.content + assert len(content) == 1 + assert 'raw' in content -- cgit v1.2.3 From 1804b5f74a4efa6b25c06fe353ac960fd42e4fb6 Mon Sep 17 00:00:00 2001 From: Victor Shyba Date: Sat, 19 Nov 2016 04:13:52 -0300 Subject: [tests] migrate pytest to trial test_deprecated_crypto was using pytest, which unfortunately doesnt work when mixed with trial. Migrated back. Also added norecursedirs option back, as it is necessary for parallel testing mode. --- testing/tests/client/test_deprecated_crypto.py | 116 +++++++++++++++---------- 1 file changed, 70 insertions(+), 46 deletions(-) (limited to 'testing/tests/client') diff --git a/testing/tests/client/test_deprecated_crypto.py b/testing/tests/client/test_deprecated_crypto.py index ca1b1558..8ee3735c 100644 --- a/testing/tests/client/test_deprecated_crypto.py +++ b/testing/tests/client/test_deprecated_crypto.py @@ -1,11 +1,16 @@ import json -import pytest +from twisted.internet import defer +from uuid import uuid4 +from urlparse import urljoin from leap.soledad.client import crypto as old_crypto from leap.soledad.common.couch import CouchDatabase from leap.soledad.common import crypto as common_crypto from test_soledad.u1db_tests import simple_doc +from test_soledad.util import SoledadWithCouchServerMixin +from test_soledad.util import make_token_soledad_app +from test_soledad.u1db_tests import TestCaseWithServer def deprecate_client_crypto(client): @@ -20,48 +25,67 @@ def couch_database(couch_url, uuid): return db -@pytest.inlineCallbacks -def test_touch_updates_remote_representation( - soledad_client, request): - - client = soledad_client() - deprecated_client = deprecate_client_crypto(soledad_client()) - - couch_url = request.config.option.couch_url - remote = couch_database(couch_url, client._uuid) - - # ensure remote db is empty - gen, docs = remote.get_all_docs() - assert gen == 0 - assert len(docs) == 0 - - # create a doc with deprecated client and sync - yield deprecated_client.create_doc(json.loads(simple_doc)) - yield deprecated_client.sync() - - # check for doc in remote db - gen, docs = remote.get_all_docs() - assert gen == 1 - assert len(docs) == 1 - doc = docs.pop() - content = doc.content - assert common_crypto.ENC_JSON_KEY in content - assert common_crypto.ENC_SCHEME_KEY in content - assert common_crypto.ENC_METHOD_KEY in content - assert common_crypto.ENC_IV_KEY in content - assert common_crypto.MAC_KEY in content - assert common_crypto.MAC_METHOD_KEY in content - - # "touch" the document with a newer client and synx - _, docs = yield client.get_all_docs() - yield client.put_doc(doc) - yield client.sync() - - # check for newer representation of doc in remote db - gen, docs = remote.get_all_docs() - assert gen == 2 - assert len(docs) == 1 - doc = docs.pop() - content = doc.content - assert len(content) == 1 - assert 'raw' in content +class DeprecatedCryptoTest(SoledadWithCouchServerMixin, TestCaseWithServer): + + def setUp(self): + SoledadWithCouchServerMixin.setUp(self) + TestCaseWithServer.setUp(self) + + def tearDown(self): + SoledadWithCouchServerMixin.tearDown(self) + TestCaseWithServer.tearDown(self) + + @staticmethod + def make_app_with_state(state): + return make_token_soledad_app(state) + + @defer.inlineCallbacks + def test_touch_updates_remote_representation(self): + self.startTwistedServer() + user = 'user-' + uuid4().hex + server_url = 'http://%s:%d' % (self.server_address) + client = self._soledad_instance(user=user, server_url=server_url) + deprecated_client = deprecate_client_crypto( + self._soledad_instance(user=user, server_url=server_url)) + + self.make_app() + remote = self.request_state._create_database(replica_uid=client._uuid) + remote = CouchDatabase.open_database( + urljoin(self.couch_url, 'user-' + user), + create=True) + + # ensure remote db is empty + gen, docs = remote.get_all_docs() + assert gen == 0 + assert len(docs) == 0 + + # create a doc with deprecated client and sync + yield deprecated_client.create_doc(json.loads(simple_doc)) + yield deprecated_client.sync() + + # check for doc in remote db + gen, docs = remote.get_all_docs() + assert gen == 1 + assert len(docs) == 1 + doc = docs.pop() + content = doc.content + assert common_crypto.ENC_JSON_KEY in content + assert common_crypto.ENC_SCHEME_KEY in content + assert common_crypto.ENC_METHOD_KEY in content + assert common_crypto.ENC_IV_KEY in content + assert common_crypto.MAC_KEY in content + assert common_crypto.MAC_METHOD_KEY in content + + # "touch" the document with a newer client and synx + _, docs = yield client.get_all_docs() + yield client.put_doc(doc) + yield client.sync() + + # check for newer representation of doc in remote db + gen, docs = remote.get_all_docs() + assert gen == 2 + assert len(docs) == 1 + doc = docs.pop() + content = doc.content + assert len(content) == 1 + assert 'raw' in content -- cgit v1.2.3 From 1c8e3359734831562fca76b529c0b1f95af565d5 Mon Sep 17 00:00:00 2001 From: Victor Shyba Date: Fri, 25 Nov 2016 19:55:36 -0300 Subject: [refactor] Hide IV, simplify some calls IV was being set during tests and this required some defensive coding to avoid IV being set in production. This commits makes the test use the generated IV and "hides" it using a read-only property to let it clear this should never happen. Also refactored out some parameters that are generated automatically to reduce some lines of code and enhance readability. --- testing/tests/client/test_crypto.py | 32 +++++++++++++------------------- 1 file changed, 13 insertions(+), 19 deletions(-) (limited to 'testing/tests/client') diff --git a/testing/tests/client/test_crypto.py b/testing/tests/client/test_crypto.py index 483c7803..6d896604 100644 --- a/testing/tests/client/test_crypto.py +++ b/testing/tests/client/test_crypto.py @@ -51,10 +51,10 @@ class AESTest(unittest.TestCase): def test_chunked_encryption(self): key = 'A' * 32 - iv = 'A' * 16 fd = BytesIO() - aes = _crypto.AESEncryptor(key, iv, fd) + aes = _crypto.AESEncryptor(key, fd) + iv = aes.iv data = snowden1 block = 16 @@ -99,14 +99,11 @@ class BlobTestCase(unittest.TestCase): @defer.inlineCallbacks def test_blob_encryptor(self): - inf = BytesIO() - inf.write(snowden1) - inf.seek(0) - outf = BytesIO() + inf = BytesIO(snowden1) blob = _crypto.BlobEncryptor( - self.doc_info, inf, result=outf, - secret='A' * 96, iv='B' * 16) + self.doc_info, inf, + secret='A' * 96) encrypted = yield blob.encrypt() data = base64.urlsafe_b64decode(encrypted.getvalue()) @@ -117,7 +114,7 @@ class BlobTestCase(unittest.TestCase): assert sch == 1 assert meth == 1 iv = data[11:27] - assert iv == 'B' * 16 + assert iv == blob.iv doc_id = data[27:37] assert doc_id == 'D-deadbeef' @@ -127,26 +124,23 @@ class BlobTestCase(unittest.TestCase): ciphertext = data[71:-64] aes_key = _crypto._get_sym_key_for_doc( self.doc_info.doc_id, 'A' * 96) - assert ciphertext == _aes_encrypt(aes_key, 'B' * 16, snowden1) + assert ciphertext == _aes_encrypt(aes_key, blob.iv, snowden1) - decrypted = _aes_decrypt(aes_key, 'B' * 16, ciphertext) + decrypted = _aes_decrypt(aes_key, blob.iv, ciphertext) assert str(decrypted) == snowden1 @defer.inlineCallbacks def test_blob_decryptor(self): - inf = BytesIO() - inf.write(snowden1) - inf.seek(0) - outf = BytesIO() + inf = BytesIO(snowden1) blob = _crypto.BlobEncryptor( - self.doc_info, inf, result=outf, - secret='A' * 96, iv='B' * 16) - yield blob.encrypt() + self.doc_info, inf, + secret='A' * 96) + ciphertext = yield blob.encrypt() decryptor = _crypto.BlobDecryptor( - self.doc_info, outf, + self.doc_info, ciphertext, secret='A' * 96) decrypted = yield decryptor.decrypt() assert decrypted.getvalue() == snowden1 -- cgit v1.2.3 From e65cb7bfecd530252e86878dfec117c2793aa04b Mon Sep 17 00:00:00 2001 From: Victor Shyba Date: Sat, 26 Nov 2016 01:11:28 -0300 Subject: [feature] delimit preamble from ciphertext We now encode preamble and ciphertext+hmac in two distinct payloads separated by a space. This allows metadata to be extracted and used before decoding the whole document. It also introduces a single packer for packing and unpacking of data instead of reads and writes. Downside: doc_id and rev are limited to 255 chars now. --- testing/tests/client/test_crypto.py | 23 +++++++++++------------ 1 file changed, 11 insertions(+), 12 deletions(-) (limited to 'testing/tests/client') diff --git a/testing/tests/client/test_crypto.py b/testing/tests/client/test_crypto.py index 6d896604..78da8d24 100644 --- a/testing/tests/client/test_crypto.py +++ b/testing/tests/client/test_crypto.py @@ -22,7 +22,6 @@ import base64 import hashlib import json import os -import struct from io import BytesIO @@ -106,22 +105,19 @@ class BlobTestCase(unittest.TestCase): secret='A' * 96) encrypted = yield blob.encrypt() - data = base64.urlsafe_b64decode(encrypted.getvalue()) + preamble, ciphertext = _crypto._split(encrypted.getvalue()) + ciphertext = ciphertext[:-64] - assert data[0] == '\x80' - ts, sch, meth = struct.unpack( - 'Qbb', data[1:11]) + assert len(preamble) == _crypto.PACMAN.size + unpacked_data = _crypto.PACMAN.unpack(preamble) + pad, ts, sch, meth, iv, doc_id, rev = unpacked_data + assert pad == '\x80' assert sch == 1 assert meth == 1 - iv = data[11:27] assert iv == blob.iv - doc_id = data[27:37] assert doc_id == 'D-deadbeef' - - rev = data[37:71] assert rev == self.doc_info.rev - ciphertext = data[71:-64] aes_key = _crypto._get_sym_key_for_doc( self.doc_info.doc_id, 'A' * 96) assert ciphertext == _aes_encrypt(aes_key, blob.iv, snowden1) @@ -159,6 +155,7 @@ class BlobTestCase(unittest.TestCase): assert 'raw' in encrypted doc2 = SoledadDocument('id1', '1') doc2.set_json(encrypted) + assert _crypto.is_symmetrically_encrypted(doc2) decrypted = yield crypto.decrypt_doc(doc2) assert len(decrypted) != 0 assert json.loads(decrypted) == payload @@ -174,10 +171,12 @@ class BlobTestCase(unittest.TestCase): encrypted = yield crypto.encrypt_doc(doc1) encdict = json.loads(encrypted) - raw = base64.urlsafe_b64decode(str(encdict['raw'])) + preamble, raw = _crypto._split(str(encdict['raw'])) # mess with MAC messed = raw[:-64] + '0' * 64 - newraw = base64.urlsafe_b64encode(str(messed)) + + preamble = base64.urlsafe_b64encode(preamble) + newraw = preamble + ' ' + base64.urlsafe_b64encode(str(messed)) doc2 = SoledadDocument('id1', '1') doc2.set_json(json.dumps({"raw": str(newraw)})) -- cgit v1.2.3 From 42082cfa648ec10612823086e72dc2a70a0e773c Mon Sep 17 00:00:00 2001 From: Victor Shyba Date: Sat, 26 Nov 2016 18:09:26 -0300 Subject: [feature] make _crypto stream on decryption We are already doing this on encryption, now we can stream also from decryption. This unblocks the reactor and will be valuable for blobs-io. --- testing/tests/client/test_crypto.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'testing/tests/client') diff --git a/testing/tests/client/test_crypto.py b/testing/tests/client/test_crypto.py index 78da8d24..863873f7 100644 --- a/testing/tests/client/test_crypto.py +++ b/testing/tests/client/test_crypto.py @@ -139,7 +139,7 @@ class BlobTestCase(unittest.TestCase): self.doc_info, ciphertext, secret='A' * 96) decrypted = yield decryptor.decrypt() - assert decrypted.getvalue() == snowden1 + assert decrypted == snowden1 @defer.inlineCallbacks def test_encrypt_and_decrypt(self): -- cgit v1.2.3 From bae95c183e68481db0fe36f066cd14c97bff3013 Mon Sep 17 00:00:00 2001 From: Victor Shyba Date: Sat, 26 Nov 2016 21:26:23 -0300 Subject: [refactor] simplify _crypto After adding the streaming decrypt, some classes were doing almost the same thing. Unified them. Also fixed some module level variables to upper case and some class name to camel case. --- testing/tests/client/test_crypto.py | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) (limited to 'testing/tests/client') diff --git a/testing/tests/client/test_crypto.py b/testing/tests/client/test_crypto.py index 863873f7..7643f75d 100644 --- a/testing/tests/client/test_crypto.py +++ b/testing/tests/client/test_crypto.py @@ -52,7 +52,7 @@ class AESTest(unittest.TestCase): key = 'A' * 32 fd = BytesIO() - aes = _crypto.AESEncryptor(key, fd) + aes = _crypto.AESConsumer(key, _buffer=fd) iv = aes.iv data = snowden1 @@ -78,7 +78,8 @@ class AESTest(unittest.TestCase): ciphertext = _aes_encrypt(key, iv, data) fd = BytesIO() - aes = _crypto.AESDecryptor(key, iv, fd) + operation = _crypto.AESConsumer.decrypt + aes = _crypto.AESConsumer(key, iv, fd, operation) for i in range(len(ciphertext) / block): chunk = ciphertext[i * block:(i + 1) * block] -- cgit v1.2.3 From dc80d2b59edd14ab463dc74e5fa19d1a04c27ca1 Mon Sep 17 00:00:00 2001 From: Victor Shyba Date: Sun, 27 Nov 2016 02:25:07 -0300 Subject: [refactor] introduces a GenericWriter AESWriter and HMACWriter are just applying hmac or aes into a flow of data. Abstracted the application of those operations into a super class and highlighted just the difference on each implementation. --- testing/tests/client/test_crypto.py | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) (limited to 'testing/tests/client') diff --git a/testing/tests/client/test_crypto.py b/testing/tests/client/test_crypto.py index 7643f75d..aad588c0 100644 --- a/testing/tests/client/test_crypto.py +++ b/testing/tests/client/test_crypto.py @@ -52,7 +52,7 @@ class AESTest(unittest.TestCase): key = 'A' * 32 fd = BytesIO() - aes = _crypto.AESConsumer(key, _buffer=fd) + aes = _crypto.AESWriter(key, _buffer=fd) iv = aes.iv data = snowden1 @@ -78,8 +78,7 @@ class AESTest(unittest.TestCase): ciphertext = _aes_encrypt(key, iv, data) fd = BytesIO() - operation = _crypto.AESConsumer.decrypt - aes = _crypto.AESConsumer(key, iv, fd, operation) + aes = _crypto.AESWriter(key, iv, fd, encrypt=False) for i in range(len(ciphertext) / block): chunk = ciphertext[i * block:(i + 1) * block] -- cgit v1.2.3 From 694e5670da53e923cf809948e400cd546154162b Mon Sep 17 00:00:00 2001 From: Victor Shyba Date: Wed, 30 Nov 2016 00:07:24 -0300 Subject: [refactor] improve blob signature magic usage Our magic value wasn't being used and were represented as a string. Refactored it to a constant, increased it's size to 2 bytes and optimzed is_symmetrically_encrypted to look for the magic and symmetrically encrypted flag under base64 encoding. Most file types will use this feature to help identifying themselves, so it got refactored to serve the purpose it was created. --- testing/tests/client/test_crypto.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) (limited to 'testing/tests/client') diff --git a/testing/tests/client/test_crypto.py b/testing/tests/client/test_crypto.py index aad588c0..33a660c9 100644 --- a/testing/tests/client/test_crypto.py +++ b/testing/tests/client/test_crypto.py @@ -110,8 +110,8 @@ class BlobTestCase(unittest.TestCase): assert len(preamble) == _crypto.PACMAN.size unpacked_data = _crypto.PACMAN.unpack(preamble) - pad, ts, sch, meth, iv, doc_id, rev = unpacked_data - assert pad == '\x80' + magic, sch, meth, ts, iv, doc_id, rev = unpacked_data + assert magic == _crypto.BLOB_SIGNATURE_MAGIC assert sch == 1 assert meth == 1 assert iv == blob.iv @@ -155,7 +155,7 @@ class BlobTestCase(unittest.TestCase): assert 'raw' in encrypted doc2 = SoledadDocument('id1', '1') doc2.set_json(encrypted) - assert _crypto.is_symmetrically_encrypted(doc2) + assert _crypto.is_symmetrically_encrypted(encrypted) decrypted = yield crypto.decrypt_doc(doc2) assert len(decrypted) != 0 assert json.loads(decrypted) == payload -- cgit v1.2.3 From 349a49d2be011a428023a4ece14001fda57e65c4 Mon Sep 17 00:00:00 2001 From: Victor Shyba Date: Tue, 6 Dec 2016 23:16:28 -0300 Subject: [feature] use GCM instead of CTR+HMAC Resolves: #8668 - client: substitute usage of CTR mode + HMAC by GCM cipher mode Signed-off-by: Victor Shyba --- testing/tests/client/test_crypto.py | 48 ++++++++++++++++++++----------------- 1 file changed, 26 insertions(+), 22 deletions(-) (limited to 'testing/tests/client') diff --git a/testing/tests/client/test_crypto.py b/testing/tests/client/test_crypto.py index 33a660c9..10acba56 100644 --- a/testing/tests/client/test_crypto.py +++ b/testing/tests/client/test_crypto.py @@ -29,6 +29,7 @@ import pytest from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes from cryptography.hazmat.backends import default_backend +from cryptography.exceptions import InvalidTag from leap.soledad.common.document import SoledadDocument from test_soledad.util import BaseSoledadTest @@ -64,7 +65,7 @@ class AESTest(unittest.TestCase): aes.end() ciphertext_chunked = fd.getvalue() - ciphertext = _aes_encrypt(key, iv, data) + ciphertext, tag = _aes_encrypt(key, iv, data) assert ciphertext_chunked == ciphertext @@ -75,10 +76,10 @@ class AESTest(unittest.TestCase): data = snowden1 block = 16 - ciphertext = _aes_encrypt(key, iv, data) + ciphertext, tag = _aes_encrypt(key, iv, data) fd = BytesIO() - aes = _crypto.AESWriter(key, iv, fd, encrypt=False) + aes = _crypto.AESWriter(key, iv, fd, tag=tag) for i in range(len(ciphertext) / block): chunk = ciphertext[i * block:(i + 1) * block] @@ -106,7 +107,7 @@ class BlobTestCase(unittest.TestCase): encrypted = yield blob.encrypt() preamble, ciphertext = _crypto._split(encrypted.getvalue()) - ciphertext = ciphertext[:-64] + ciphertext = ciphertext[:-16] assert len(preamble) == _crypto.PACMAN.size unpacked_data = _crypto.PACMAN.unpack(preamble) @@ -120,9 +121,10 @@ class BlobTestCase(unittest.TestCase): aes_key = _crypto._get_sym_key_for_doc( self.doc_info.doc_id, 'A' * 96) - assert ciphertext == _aes_encrypt(aes_key, blob.iv, snowden1) + assert ciphertext == _aes_encrypt(aes_key, blob.iv, snowden1)[0] - decrypted = _aes_decrypt(aes_key, blob.iv, ciphertext) + decrypted = _aes_decrypt(aes_key, blob.iv, blob.tag, ciphertext, + preamble) assert str(decrypted) == snowden1 @defer.inlineCallbacks @@ -173,7 +175,7 @@ class BlobTestCase(unittest.TestCase): encdict = json.loads(encrypted) preamble, raw = _crypto._split(str(encdict['raw'])) # mess with MAC - messed = raw[:-64] + '0' * 64 + messed = raw[:-16] + '0' * 16 preamble = base64.urlsafe_b64encode(preamble) newraw = preamble + ' ' + base64.urlsafe_b64encode(str(messed)) @@ -275,16 +277,16 @@ class SoledadCryptoAESTestCase(BaseSoledadTest): def test_encrypt_decrypt_sym(self): # generate 256-bit key key = os.urandom(32) - iv, cyphertext = _crypto.encrypt_sym('data', key) + iv, tag, cyphertext = _crypto.encrypt_sym('data', key) self.assertTrue(cyphertext is not None) self.assertTrue(cyphertext != '') self.assertTrue(cyphertext != 'data') - plaintext = _crypto.decrypt_sym(cyphertext, key, iv) + plaintext = _crypto.decrypt_sym(cyphertext, key, iv, tag) self.assertEqual('data', plaintext) - def test_decrypt_with_wrong_iv_fails(self): + def test_decrypt_with_wrong_iv_raises(self): key = os.urandom(32) - iv, cyphertext = _crypto.encrypt_sym('data', key) + iv, tag, cyphertext = _crypto.encrypt_sym('data', key) self.assertTrue(cyphertext is not None) self.assertTrue(cyphertext != '') self.assertTrue(cyphertext != 'data') @@ -293,13 +295,13 @@ class SoledadCryptoAESTestCase(BaseSoledadTest): wrongiv = rawiv while wrongiv == rawiv: wrongiv = os.urandom(1) + rawiv[1:] - plaintext = _crypto.decrypt_sym( - cyphertext, key, iv=binascii.b2a_base64(wrongiv)) - self.assertNotEqual('data', plaintext) + with pytest.raises(InvalidTag): + _crypto.decrypt_sym( + cyphertext, key, iv=binascii.b2a_base64(wrongiv), tag=tag) - def test_decrypt_with_wrong_key_fails(self): + def test_decrypt_with_wrong_key_raises(self): key = os.urandom(32) - iv, cyphertext = _crypto.encrypt_sym('data', key) + iv, tag, cyphertext = _crypto.encrypt_sym('data', key) self.assertTrue(cyphertext is not None) self.assertTrue(cyphertext != '') self.assertTrue(cyphertext != 'data') @@ -307,19 +309,21 @@ class SoledadCryptoAESTestCase(BaseSoledadTest): # ensure keys are different in case we are extremely lucky while wrongkey == key: wrongkey = os.urandom(32) - plaintext = _crypto.decrypt_sym(cyphertext, wrongkey, iv) - self.assertNotEqual('data', plaintext) + with pytest.raises(InvalidTag): + _crypto.decrypt_sym(cyphertext, wrongkey, iv, tag) def _aes_encrypt(key, iv, data): backend = default_backend() - cipher = Cipher(algorithms.AES(key), modes.CTR(iv), backend=backend) + cipher = Cipher(algorithms.AES(key), modes.GCM(iv), backend=backend) encryptor = cipher.encryptor() - return encryptor.update(data) + encryptor.finalize() + return encryptor.update(data) + encryptor.finalize(), encryptor.tag -def _aes_decrypt(key, iv, data): +def _aes_decrypt(key, iv, tag, data, aead=''): backend = default_backend() - cipher = Cipher(algorithms.AES(key), modes.CTR(iv), backend=backend) + cipher = Cipher(algorithms.AES(key), modes.GCM(iv, tag), backend=backend) decryptor = cipher.decryptor() + if aead: + decryptor.authenticate_additional_data(aead) return decryptor.update(data) + decryptor.finalize() -- cgit v1.2.3 From b3fcc5c5bddc73475596c4fe74e3402f0d5c021a Mon Sep 17 00:00:00 2001 From: Victor Shyba Date: Wed, 7 Dec 2016 01:24:53 -0300 Subject: [feature] Add retro compat on secrets.py ciphers Integrated the secrets's JSON key that specifies ciphers into _crypto and added optional GCM. Also added a test to check if both cipher types can be imported. Resolves: #8680 Signed-off-by: Victor Shyba --- testing/tests/client/test_crypto.py | 30 +++++++++++++++++++----------- 1 file changed, 19 insertions(+), 11 deletions(-) (limited to 'testing/tests/client') diff --git a/testing/tests/client/test_crypto.py b/testing/tests/client/test_crypto.py index 10acba56..277d5430 100644 --- a/testing/tests/client/test_crypto.py +++ b/testing/tests/client/test_crypto.py @@ -114,7 +114,7 @@ class BlobTestCase(unittest.TestCase): magic, sch, meth, ts, iv, doc_id, rev = unpacked_data assert magic == _crypto.BLOB_SIGNATURE_MAGIC assert sch == 1 - assert meth == 1 + assert meth == _crypto.ENC_METHOD.aes_256_gcm assert iv == blob.iv assert doc_id == 'D-deadbeef' assert rev == self.doc_info.rev @@ -163,7 +163,7 @@ class BlobTestCase(unittest.TestCase): assert json.loads(decrypted) == payload @defer.inlineCallbacks - def test_decrypt_with_wrong_mac_raises(self): + def test_decrypt_with_wrong_tag_raises(self): """ Trying to decrypt a document with wrong MAC should raise. """ @@ -174,7 +174,7 @@ class BlobTestCase(unittest.TestCase): encrypted = yield crypto.encrypt_doc(doc1) encdict = json.loads(encrypted) preamble, raw = _crypto._split(str(encdict['raw'])) - # mess with MAC + # mess with tag messed = raw[:-16] + '0' * 16 preamble = base64.urlsafe_b64encode(preamble) @@ -205,8 +205,8 @@ class RecoveryDocumentTestCase(BaseSoledadTest): self.assertTrue(self._soledad.secrets.LENGTH_KEY in encrypted_secret) self.assertTrue(self._soledad.secrets.SECRET_KEY in encrypted_secret) - def test_import_recovery_document(self): - rd = self._soledad.secrets._export_recovery_document() + def test_import_recovery_document(self, cipher='aes256'): + rd = self._soledad.secrets._export_recovery_document(cipher) s = self._soledad_instance() s.secrets._import_recovery_document(rd) s.secrets.set_secret_id(self._soledad.secrets._secret_id) @@ -215,6 +215,14 @@ class RecoveryDocumentTestCase(BaseSoledadTest): 'Failed settinng secret for symmetric encryption.') s.close() + def test_import_GCM_recovery_document(self): + cipher = self._soledad.secrets.CIPHER_AES256_GCM + self.test_import_recovery_document(cipher) + + def test_import_legacy_CTR_recovery_document(self): + cipher = self._soledad.secrets.CIPHER_AES256 + self.test_import_recovery_document(cipher) + class SoledadSecretsTestCase(BaseSoledadTest): @@ -277,16 +285,16 @@ class SoledadCryptoAESTestCase(BaseSoledadTest): def test_encrypt_decrypt_sym(self): # generate 256-bit key key = os.urandom(32) - iv, tag, cyphertext = _crypto.encrypt_sym('data', key) + iv, cyphertext = _crypto.encrypt_sym('data', key) self.assertTrue(cyphertext is not None) self.assertTrue(cyphertext != '') self.assertTrue(cyphertext != 'data') - plaintext = _crypto.decrypt_sym(cyphertext, key, iv, tag) + plaintext = _crypto.decrypt_sym(cyphertext, key, iv) self.assertEqual('data', plaintext) def test_decrypt_with_wrong_iv_raises(self): key = os.urandom(32) - iv, tag, cyphertext = _crypto.encrypt_sym('data', key) + iv, cyphertext = _crypto.encrypt_sym('data', key) self.assertTrue(cyphertext is not None) self.assertTrue(cyphertext != '') self.assertTrue(cyphertext != 'data') @@ -297,11 +305,11 @@ class SoledadCryptoAESTestCase(BaseSoledadTest): wrongiv = os.urandom(1) + rawiv[1:] with pytest.raises(InvalidTag): _crypto.decrypt_sym( - cyphertext, key, iv=binascii.b2a_base64(wrongiv), tag=tag) + cyphertext, key, iv=binascii.b2a_base64(wrongiv)) def test_decrypt_with_wrong_key_raises(self): key = os.urandom(32) - iv, tag, cyphertext = _crypto.encrypt_sym('data', key) + iv, cyphertext = _crypto.encrypt_sym('data', key) self.assertTrue(cyphertext is not None) self.assertTrue(cyphertext != '') self.assertTrue(cyphertext != 'data') @@ -310,7 +318,7 @@ class SoledadCryptoAESTestCase(BaseSoledadTest): while wrongkey == key: wrongkey = os.urandom(32) with pytest.raises(InvalidTag): - _crypto.decrypt_sym(cyphertext, wrongkey, iv, tag) + _crypto.decrypt_sym(cyphertext, wrongkey, iv) def _aes_encrypt(key, iv, data): -- cgit v1.2.3 From 7877527fe64eaee1f7f107913a4a3dc78767a338 Mon Sep 17 00:00:00 2001 From: Victor Shyba Date: Wed, 7 Dec 2016 02:03:58 -0300 Subject: [feature] Change CTR to GCM on secrets.py Current implementation can allow tampering and the CTR->GCM exchange can help to avoid it. This commits also alters a behaviour where we moved ahead after failing to decrypt a recovery document. IMHO we can't move ahead as this is a fatal error. Signed-off-by: Victor Shyba --- testing/tests/client/test_aux_methods.py | 4 ++-- testing/tests/client/test_crypto.py | 5 +++-- 2 files changed, 5 insertions(+), 4 deletions(-) (limited to 'testing/tests/client') diff --git a/testing/tests/client/test_aux_methods.py b/testing/tests/client/test_aux_methods.py index c25ff8ca..9b4a175f 100644 --- a/testing/tests/client/test_aux_methods.py +++ b/testing/tests/client/test_aux_methods.py @@ -21,10 +21,10 @@ import os from twisted.internet import defer -from leap.soledad.common.errors import DatabaseAccessError from leap.soledad.client import Soledad from leap.soledad.client.adbapi import U1DBConnectionPool from leap.soledad.client.secrets import PassphraseTooShort +from leap.soledad.client.secrets import SecretsException from test_soledad.util import BaseSoledadTest @@ -108,7 +108,7 @@ class AuxMethodsTestCase(BaseSoledadTest): sol.change_passphrase(u'654321') sol.close() - with self.assertRaises(DatabaseAccessError): + with self.assertRaises(SecretsException): self._soledad_instance( 'leap@leap.se', passphrase=u'123', diff --git a/testing/tests/client/test_crypto.py b/testing/tests/client/test_crypto.py index 277d5430..49a61438 100644 --- a/testing/tests/client/test_crypto.py +++ b/testing/tests/client/test_crypto.py @@ -200,8 +200,9 @@ class RecoveryDocumentTestCase(BaseSoledadTest): encrypted_secret = rd[ self._soledad.secrets.STORAGE_SECRETS_KEY][secret_id] self.assertTrue(self._soledad.secrets.CIPHER_KEY in encrypted_secret) - self.assertTrue( - encrypted_secret[self._soledad.secrets.CIPHER_KEY] == 'aes256') + self.assertEquals( + _crypto.ENC_METHOD.aes_256_gcm, + encrypted_secret[self._soledad.secrets.CIPHER_KEY]) self.assertTrue(self._soledad.secrets.LENGTH_KEY in encrypted_secret) self.assertTrue(self._soledad.secrets.SECRET_KEY in encrypted_secret) -- cgit v1.2.3