diff options
Diffstat (limited to 'testing/tests/client')
-rw-r--r-- | testing/tests/client/test_aux_methods.py | 4 | ||||
-rw-r--r-- | testing/tests/client/test_crypto.py | 282 | ||||
-rw-r--r-- | testing/tests/client/test_deprecated_crypto.py | 91 |
3 files changed, 290 insertions, 87 deletions
diff --git a/testing/tests/client/test_aux_methods.py b/testing/tests/client/test_aux_methods.py index c25ff8ca..9b4a175f 100644 --- a/testing/tests/client/test_aux_methods.py +++ b/testing/tests/client/test_aux_methods.py @@ -21,10 +21,10 @@ import os from twisted.internet import defer -from leap.soledad.common.errors import DatabaseAccessError from leap.soledad.client import Soledad from leap.soledad.client.adbapi import U1DBConnectionPool from leap.soledad.client.secrets import PassphraseTooShort +from leap.soledad.client.secrets import SecretsException from test_soledad.util import BaseSoledadTest @@ -108,7 +108,7 @@ class AuxMethodsTestCase(BaseSoledadTest): sol.change_passphrase(u'654321') sol.close() - with self.assertRaises(DatabaseAccessError): + with self.assertRaises(SecretsException): self._soledad_instance( 'leap@leap.se', passphrase=u'123', diff --git a/testing/tests/client/test_crypto.py b/testing/tests/client/test_crypto.py index 77252b46..49a61438 100644 --- a/testing/tests/client/test_crypto.py +++ b/testing/tests/client/test_crypto.py @@ -17,47 +17,173 @@ """ Tests for cryptographic related stuff. """ -import os -import hashlib import binascii +import base64 +import hashlib +import json +import os + +from io import BytesIO + +import pytest + +from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes +from cryptography.hazmat.backends import default_backend +from cryptography.exceptions import InvalidTag -from leap.soledad.client import crypto from leap.soledad.common.document import SoledadDocument from test_soledad.util import BaseSoledadTest -from leap.soledad.common.crypto import WrongMacError -from leap.soledad.common.crypto import UnknownMacMethodError -from leap.soledad.common.crypto import ENC_JSON_KEY -from leap.soledad.common.crypto import ENC_SCHEME_KEY -from leap.soledad.common.crypto import MAC_KEY -from leap.soledad.common.crypto import MAC_METHOD_KEY +from leap.soledad.client import _crypto + +from twisted.trial import unittest +from twisted.internet import defer + + +snowden1 = ( + "You can't come up against " + "the world's most powerful intelligence " + "agencies and not accept the risk. " + "If they want to get you, over time " + "they will.") + + +class AESTest(unittest.TestCase): + + def test_chunked_encryption(self): + key = 'A' * 32 + + fd = BytesIO() + aes = _crypto.AESWriter(key, _buffer=fd) + iv = aes.iv + + data = snowden1 + block = 16 + + for i in range(len(data) / block): + chunk = data[i * block:(i + 1) * block] + aes.write(chunk) + aes.end() + + ciphertext_chunked = fd.getvalue() + ciphertext, tag = _aes_encrypt(key, iv, data) + + assert ciphertext_chunked == ciphertext + + def test_decrypt(self): + key = 'A' * 32 + iv = 'A' * 16 + + data = snowden1 + block = 16 + + ciphertext, tag = _aes_encrypt(key, iv, data) + + fd = BytesIO() + aes = _crypto.AESWriter(key, iv, fd, tag=tag) + + for i in range(len(ciphertext) / block): + chunk = ciphertext[i * block:(i + 1) * block] + aes.write(chunk) + aes.end() + + cleartext_chunked = fd.getvalue() + assert cleartext_chunked == data + + +class BlobTestCase(unittest.TestCase): + + class doc_info: + doc_id = 'D-deadbeef' + rev = '397932e0c77f45fcb7c3732930e7e9b2:1' + + @defer.inlineCallbacks + def test_blob_encryptor(self): + + inf = BytesIO(snowden1) + + blob = _crypto.BlobEncryptor( + self.doc_info, inf, + secret='A' * 96) + encrypted = yield blob.encrypt() + preamble, ciphertext = _crypto._split(encrypted.getvalue()) + ciphertext = ciphertext[:-16] -class EncryptedSyncTestCase(BaseSoledadTest): + assert len(preamble) == _crypto.PACMAN.size + unpacked_data = _crypto.PACMAN.unpack(preamble) + magic, sch, meth, ts, iv, doc_id, rev = unpacked_data + assert magic == _crypto.BLOB_SIGNATURE_MAGIC + assert sch == 1 + assert meth == _crypto.ENC_METHOD.aes_256_gcm + assert iv == blob.iv + assert doc_id == 'D-deadbeef' + assert rev == self.doc_info.rev - """ - Tests that guarantee that data will always be encrypted when syncing. - """ + aes_key = _crypto._get_sym_key_for_doc( + self.doc_info.doc_id, 'A' * 96) + assert ciphertext == _aes_encrypt(aes_key, blob.iv, snowden1)[0] - def test_encrypt_decrypt_json(self): + decrypted = _aes_decrypt(aes_key, blob.iv, blob.tag, ciphertext, + preamble) + assert str(decrypted) == snowden1 + + @defer.inlineCallbacks + def test_blob_decryptor(self): + + inf = BytesIO(snowden1) + + blob = _crypto.BlobEncryptor( + self.doc_info, inf, + secret='A' * 96) + ciphertext = yield blob.encrypt() + + decryptor = _crypto.BlobDecryptor( + self.doc_info, ciphertext, + secret='A' * 96) + decrypted = yield decryptor.decrypt() + assert decrypted == snowden1 + + @defer.inlineCallbacks + def test_encrypt_and_decrypt(self): """ - Test encrypting and decrypting documents. + Check that encrypting and decrypting gives same doc. """ - simpledoc = {'key': 'val'} - doc1 = SoledadDocument(doc_id='id') - doc1.content = simpledoc - - # encrypt doc - doc1.set_json(self._soledad._crypto.encrypt_doc(doc1)) - # assert content is different and includes keys - self.assertNotEqual( - simpledoc, doc1.content, - 'incorrect document encryption') - self.assertTrue(ENC_JSON_KEY in doc1.content) - self.assertTrue(ENC_SCHEME_KEY in doc1.content) - # decrypt doc - doc1.set_json(self._soledad._crypto.decrypt_doc(doc1)) - self.assertEqual( - simpledoc, doc1.content, 'incorrect document encryption') + crypto = _crypto.SoledadCrypto('A' * 96) + payload = {'key': 'someval'} + doc1 = SoledadDocument('id1', '1', json.dumps(payload)) + + encrypted = yield crypto.encrypt_doc(doc1) + assert encrypted != payload + assert 'raw' in encrypted + doc2 = SoledadDocument('id1', '1') + doc2.set_json(encrypted) + assert _crypto.is_symmetrically_encrypted(encrypted) + decrypted = yield crypto.decrypt_doc(doc2) + assert len(decrypted) != 0 + assert json.loads(decrypted) == payload + + @defer.inlineCallbacks + def test_decrypt_with_wrong_tag_raises(self): + """ + Trying to decrypt a document with wrong MAC should raise. + """ + crypto = _crypto.SoledadCrypto('A' * 96) + payload = {'key': 'someval'} + doc1 = SoledadDocument('id1', '1', json.dumps(payload)) + + encrypted = yield crypto.encrypt_doc(doc1) + encdict = json.loads(encrypted) + preamble, raw = _crypto._split(str(encdict['raw'])) + # mess with tag + messed = raw[:-16] + '0' * 16 + + preamble = base64.urlsafe_b64encode(preamble) + newraw = preamble + ' ' + base64.urlsafe_b64encode(str(messed)) + doc2 = SoledadDocument('id1', '1') + doc2.set_json(json.dumps({"raw": str(newraw)})) + + with pytest.raises(_crypto.InvalidBlob): + yield crypto.decrypt_doc(doc2) class RecoveryDocumentTestCase(BaseSoledadTest): @@ -74,13 +200,14 @@ class RecoveryDocumentTestCase(BaseSoledadTest): encrypted_secret = rd[ self._soledad.secrets.STORAGE_SECRETS_KEY][secret_id] self.assertTrue(self._soledad.secrets.CIPHER_KEY in encrypted_secret) - self.assertTrue( - encrypted_secret[self._soledad.secrets.CIPHER_KEY] == 'aes256') + self.assertEquals( + _crypto.ENC_METHOD.aes_256_gcm, + encrypted_secret[self._soledad.secrets.CIPHER_KEY]) self.assertTrue(self._soledad.secrets.LENGTH_KEY in encrypted_secret) self.assertTrue(self._soledad.secrets.SECRET_KEY in encrypted_secret) - def test_import_recovery_document(self): - rd = self._soledad.secrets._export_recovery_document() + def test_import_recovery_document(self, cipher='aes256'): + rd = self._soledad.secrets._export_recovery_document(cipher) s = self._soledad_instance() s.secrets._import_recovery_document(rd) s.secrets.set_secret_id(self._soledad.secrets._secret_id) @@ -89,6 +216,14 @@ class RecoveryDocumentTestCase(BaseSoledadTest): 'Failed settinng secret for symmetric encryption.') s.close() + def test_import_GCM_recovery_document(self): + cipher = self._soledad.secrets.CIPHER_AES256_GCM + self.test_import_recovery_document(cipher) + + def test_import_legacy_CTR_recovery_document(self): + cipher = self._soledad.secrets.CIPHER_AES256 + self.test_import_recovery_document(cipher) + class SoledadSecretsTestCase(BaseSoledadTest): @@ -146,60 +281,21 @@ class SoledadSecretsTestCase(BaseSoledadTest): "Should have a secret at this point") -class MacAuthTestCase(BaseSoledadTest): - - def test_decrypt_with_wrong_mac_raises(self): - """ - Trying to decrypt a document with wrong MAC should raise. - """ - simpledoc = {'key': 'val'} - doc = SoledadDocument(doc_id='id') - doc.content = simpledoc - # encrypt doc - doc.set_json(self._soledad._crypto.encrypt_doc(doc)) - self.assertTrue(MAC_KEY in doc.content) - self.assertTrue(MAC_METHOD_KEY in doc.content) - # mess with MAC - doc.content[MAC_KEY] = '1234567890ABCDEF' - # try to decrypt doc - self.assertRaises( - WrongMacError, - self._soledad._crypto.decrypt_doc, doc) - - def test_decrypt_with_unknown_mac_method_raises(self): - """ - Trying to decrypt a document with unknown MAC method should raise. - """ - simpledoc = {'key': 'val'} - doc = SoledadDocument(doc_id='id') - doc.content = simpledoc - # encrypt doc - doc.set_json(self._soledad._crypto.encrypt_doc(doc)) - self.assertTrue(MAC_KEY in doc.content) - self.assertTrue(MAC_METHOD_KEY in doc.content) - # mess with MAC method - doc.content[MAC_METHOD_KEY] = 'mymac' - # try to decrypt doc - self.assertRaises( - UnknownMacMethodError, - self._soledad._crypto.decrypt_doc, doc) - - class SoledadCryptoAESTestCase(BaseSoledadTest): def test_encrypt_decrypt_sym(self): # generate 256-bit key key = os.urandom(32) - iv, cyphertext = crypto.encrypt_sym('data', key) + iv, cyphertext = _crypto.encrypt_sym('data', key) self.assertTrue(cyphertext is not None) self.assertTrue(cyphertext != '') self.assertTrue(cyphertext != 'data') - plaintext = crypto.decrypt_sym(cyphertext, key, iv) + plaintext = _crypto.decrypt_sym(cyphertext, key, iv) self.assertEqual('data', plaintext) - def test_decrypt_with_wrong_iv_fails(self): + def test_decrypt_with_wrong_iv_raises(self): key = os.urandom(32) - iv, cyphertext = crypto.encrypt_sym('data', key) + iv, cyphertext = _crypto.encrypt_sym('data', key) self.assertTrue(cyphertext is not None) self.assertTrue(cyphertext != '') self.assertTrue(cyphertext != 'data') @@ -208,13 +304,13 @@ class SoledadCryptoAESTestCase(BaseSoledadTest): wrongiv = rawiv while wrongiv == rawiv: wrongiv = os.urandom(1) + rawiv[1:] - plaintext = crypto.decrypt_sym( - cyphertext, key, iv=binascii.b2a_base64(wrongiv)) - self.assertNotEqual('data', plaintext) + with pytest.raises(InvalidTag): + _crypto.decrypt_sym( + cyphertext, key, iv=binascii.b2a_base64(wrongiv)) - def test_decrypt_with_wrong_key_fails(self): + def test_decrypt_with_wrong_key_raises(self): key = os.urandom(32) - iv, cyphertext = crypto.encrypt_sym('data', key) + iv, cyphertext = _crypto.encrypt_sym('data', key) self.assertTrue(cyphertext is not None) self.assertTrue(cyphertext != '') self.assertTrue(cyphertext != 'data') @@ -222,5 +318,21 @@ class SoledadCryptoAESTestCase(BaseSoledadTest): # ensure keys are different in case we are extremely lucky while wrongkey == key: wrongkey = os.urandom(32) - plaintext = crypto.decrypt_sym(cyphertext, wrongkey, iv) - self.assertNotEqual('data', plaintext) + with pytest.raises(InvalidTag): + _crypto.decrypt_sym(cyphertext, wrongkey, iv) + + +def _aes_encrypt(key, iv, data): + backend = default_backend() + cipher = Cipher(algorithms.AES(key), modes.GCM(iv), backend=backend) + encryptor = cipher.encryptor() + return encryptor.update(data) + encryptor.finalize(), encryptor.tag + + +def _aes_decrypt(key, iv, tag, data, aead=''): + backend = default_backend() + cipher = Cipher(algorithms.AES(key), modes.GCM(iv, tag), backend=backend) + decryptor = cipher.decryptor() + if aead: + decryptor.authenticate_additional_data(aead) + return decryptor.update(data) + decryptor.finalize() diff --git a/testing/tests/client/test_deprecated_crypto.py b/testing/tests/client/test_deprecated_crypto.py new file mode 100644 index 00000000..8ee3735c --- /dev/null +++ b/testing/tests/client/test_deprecated_crypto.py @@ -0,0 +1,91 @@ +import json +from twisted.internet import defer +from uuid import uuid4 +from urlparse import urljoin + +from leap.soledad.client import crypto as old_crypto +from leap.soledad.common.couch import CouchDatabase +from leap.soledad.common import crypto as common_crypto + +from test_soledad.u1db_tests import simple_doc +from test_soledad.util import SoledadWithCouchServerMixin +from test_soledad.util import make_token_soledad_app +from test_soledad.u1db_tests import TestCaseWithServer + + +def deprecate_client_crypto(client): + secret = client._crypto.secret + _crypto = old_crypto.SoledadCrypto(secret) + setattr(client._dbsyncer, '_crypto', _crypto) + return client + + +def couch_database(couch_url, uuid): + db = CouchDatabase(couch_url, "user-%s" % (uuid,)) + return db + + +class DeprecatedCryptoTest(SoledadWithCouchServerMixin, TestCaseWithServer): + + def setUp(self): + SoledadWithCouchServerMixin.setUp(self) + TestCaseWithServer.setUp(self) + + def tearDown(self): + SoledadWithCouchServerMixin.tearDown(self) + TestCaseWithServer.tearDown(self) + + @staticmethod + def make_app_with_state(state): + return make_token_soledad_app(state) + + @defer.inlineCallbacks + def test_touch_updates_remote_representation(self): + self.startTwistedServer() + user = 'user-' + uuid4().hex + server_url = 'http://%s:%d' % (self.server_address) + client = self._soledad_instance(user=user, server_url=server_url) + deprecated_client = deprecate_client_crypto( + self._soledad_instance(user=user, server_url=server_url)) + + self.make_app() + remote = self.request_state._create_database(replica_uid=client._uuid) + remote = CouchDatabase.open_database( + urljoin(self.couch_url, 'user-' + user), + create=True) + + # ensure remote db is empty + gen, docs = remote.get_all_docs() + assert gen == 0 + assert len(docs) == 0 + + # create a doc with deprecated client and sync + yield deprecated_client.create_doc(json.loads(simple_doc)) + yield deprecated_client.sync() + + # check for doc in remote db + gen, docs = remote.get_all_docs() + assert gen == 1 + assert len(docs) == 1 + doc = docs.pop() + content = doc.content + assert common_crypto.ENC_JSON_KEY in content + assert common_crypto.ENC_SCHEME_KEY in content + assert common_crypto.ENC_METHOD_KEY in content + assert common_crypto.ENC_IV_KEY in content + assert common_crypto.MAC_KEY in content + assert common_crypto.MAC_METHOD_KEY in content + + # "touch" the document with a newer client and synx + _, docs = yield client.get_all_docs() + yield client.put_doc(doc) + yield client.sync() + + # check for newer representation of doc in remote db + gen, docs = remote.get_all_docs() + assert gen == 2 + assert len(docs) == 1 + doc = docs.pop() + content = doc.content + assert len(content) == 1 + assert 'raw' in content |