summaryrefslogtreecommitdiff
path: root/testing/tests/client
diff options
context:
space:
mode:
Diffstat (limited to 'testing/tests/client')
-rw-r--r--testing/tests/client/test_aux_methods.py4
-rw-r--r--testing/tests/client/test_crypto.py282
-rw-r--r--testing/tests/client/test_deprecated_crypto.py91
3 files changed, 290 insertions, 87 deletions
diff --git a/testing/tests/client/test_aux_methods.py b/testing/tests/client/test_aux_methods.py
index c25ff8ca..9b4a175f 100644
--- a/testing/tests/client/test_aux_methods.py
+++ b/testing/tests/client/test_aux_methods.py
@@ -21,10 +21,10 @@ import os
from twisted.internet import defer
-from leap.soledad.common.errors import DatabaseAccessError
from leap.soledad.client import Soledad
from leap.soledad.client.adbapi import U1DBConnectionPool
from leap.soledad.client.secrets import PassphraseTooShort
+from leap.soledad.client.secrets import SecretsException
from test_soledad.util import BaseSoledadTest
@@ -108,7 +108,7 @@ class AuxMethodsTestCase(BaseSoledadTest):
sol.change_passphrase(u'654321')
sol.close()
- with self.assertRaises(DatabaseAccessError):
+ with self.assertRaises(SecretsException):
self._soledad_instance(
'leap@leap.se',
passphrase=u'123',
diff --git a/testing/tests/client/test_crypto.py b/testing/tests/client/test_crypto.py
index 77252b46..49a61438 100644
--- a/testing/tests/client/test_crypto.py
+++ b/testing/tests/client/test_crypto.py
@@ -17,47 +17,173 @@
"""
Tests for cryptographic related stuff.
"""
-import os
-import hashlib
import binascii
+import base64
+import hashlib
+import json
+import os
+
+from io import BytesIO
+
+import pytest
+
+from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes
+from cryptography.hazmat.backends import default_backend
+from cryptography.exceptions import InvalidTag
-from leap.soledad.client import crypto
from leap.soledad.common.document import SoledadDocument
from test_soledad.util import BaseSoledadTest
-from leap.soledad.common.crypto import WrongMacError
-from leap.soledad.common.crypto import UnknownMacMethodError
-from leap.soledad.common.crypto import ENC_JSON_KEY
-from leap.soledad.common.crypto import ENC_SCHEME_KEY
-from leap.soledad.common.crypto import MAC_KEY
-from leap.soledad.common.crypto import MAC_METHOD_KEY
+from leap.soledad.client import _crypto
+
+from twisted.trial import unittest
+from twisted.internet import defer
+
+
+snowden1 = (
+ "You can't come up against "
+ "the world's most powerful intelligence "
+ "agencies and not accept the risk. "
+ "If they want to get you, over time "
+ "they will.")
+
+
+class AESTest(unittest.TestCase):
+
+ def test_chunked_encryption(self):
+ key = 'A' * 32
+
+ fd = BytesIO()
+ aes = _crypto.AESWriter(key, _buffer=fd)
+ iv = aes.iv
+
+ data = snowden1
+ block = 16
+
+ for i in range(len(data) / block):
+ chunk = data[i * block:(i + 1) * block]
+ aes.write(chunk)
+ aes.end()
+
+ ciphertext_chunked = fd.getvalue()
+ ciphertext, tag = _aes_encrypt(key, iv, data)
+
+ assert ciphertext_chunked == ciphertext
+
+ def test_decrypt(self):
+ key = 'A' * 32
+ iv = 'A' * 16
+
+ data = snowden1
+ block = 16
+
+ ciphertext, tag = _aes_encrypt(key, iv, data)
+
+ fd = BytesIO()
+ aes = _crypto.AESWriter(key, iv, fd, tag=tag)
+
+ for i in range(len(ciphertext) / block):
+ chunk = ciphertext[i * block:(i + 1) * block]
+ aes.write(chunk)
+ aes.end()
+
+ cleartext_chunked = fd.getvalue()
+ assert cleartext_chunked == data
+
+
+class BlobTestCase(unittest.TestCase):
+
+ class doc_info:
+ doc_id = 'D-deadbeef'
+ rev = '397932e0c77f45fcb7c3732930e7e9b2:1'
+
+ @defer.inlineCallbacks
+ def test_blob_encryptor(self):
+
+ inf = BytesIO(snowden1)
+
+ blob = _crypto.BlobEncryptor(
+ self.doc_info, inf,
+ secret='A' * 96)
+ encrypted = yield blob.encrypt()
+ preamble, ciphertext = _crypto._split(encrypted.getvalue())
+ ciphertext = ciphertext[:-16]
-class EncryptedSyncTestCase(BaseSoledadTest):
+ assert len(preamble) == _crypto.PACMAN.size
+ unpacked_data = _crypto.PACMAN.unpack(preamble)
+ magic, sch, meth, ts, iv, doc_id, rev = unpacked_data
+ assert magic == _crypto.BLOB_SIGNATURE_MAGIC
+ assert sch == 1
+ assert meth == _crypto.ENC_METHOD.aes_256_gcm
+ assert iv == blob.iv
+ assert doc_id == 'D-deadbeef'
+ assert rev == self.doc_info.rev
- """
- Tests that guarantee that data will always be encrypted when syncing.
- """
+ aes_key = _crypto._get_sym_key_for_doc(
+ self.doc_info.doc_id, 'A' * 96)
+ assert ciphertext == _aes_encrypt(aes_key, blob.iv, snowden1)[0]
- def test_encrypt_decrypt_json(self):
+ decrypted = _aes_decrypt(aes_key, blob.iv, blob.tag, ciphertext,
+ preamble)
+ assert str(decrypted) == snowden1
+
+ @defer.inlineCallbacks
+ def test_blob_decryptor(self):
+
+ inf = BytesIO(snowden1)
+
+ blob = _crypto.BlobEncryptor(
+ self.doc_info, inf,
+ secret='A' * 96)
+ ciphertext = yield blob.encrypt()
+
+ decryptor = _crypto.BlobDecryptor(
+ self.doc_info, ciphertext,
+ secret='A' * 96)
+ decrypted = yield decryptor.decrypt()
+ assert decrypted == snowden1
+
+ @defer.inlineCallbacks
+ def test_encrypt_and_decrypt(self):
"""
- Test encrypting and decrypting documents.
+ Check that encrypting and decrypting gives same doc.
"""
- simpledoc = {'key': 'val'}
- doc1 = SoledadDocument(doc_id='id')
- doc1.content = simpledoc
-
- # encrypt doc
- doc1.set_json(self._soledad._crypto.encrypt_doc(doc1))
- # assert content is different and includes keys
- self.assertNotEqual(
- simpledoc, doc1.content,
- 'incorrect document encryption')
- self.assertTrue(ENC_JSON_KEY in doc1.content)
- self.assertTrue(ENC_SCHEME_KEY in doc1.content)
- # decrypt doc
- doc1.set_json(self._soledad._crypto.decrypt_doc(doc1))
- self.assertEqual(
- simpledoc, doc1.content, 'incorrect document encryption')
+ crypto = _crypto.SoledadCrypto('A' * 96)
+ payload = {'key': 'someval'}
+ doc1 = SoledadDocument('id1', '1', json.dumps(payload))
+
+ encrypted = yield crypto.encrypt_doc(doc1)
+ assert encrypted != payload
+ assert 'raw' in encrypted
+ doc2 = SoledadDocument('id1', '1')
+ doc2.set_json(encrypted)
+ assert _crypto.is_symmetrically_encrypted(encrypted)
+ decrypted = yield crypto.decrypt_doc(doc2)
+ assert len(decrypted) != 0
+ assert json.loads(decrypted) == payload
+
+ @defer.inlineCallbacks
+ def test_decrypt_with_wrong_tag_raises(self):
+ """
+ Trying to decrypt a document with wrong MAC should raise.
+ """
+ crypto = _crypto.SoledadCrypto('A' * 96)
+ payload = {'key': 'someval'}
+ doc1 = SoledadDocument('id1', '1', json.dumps(payload))
+
+ encrypted = yield crypto.encrypt_doc(doc1)
+ encdict = json.loads(encrypted)
+ preamble, raw = _crypto._split(str(encdict['raw']))
+ # mess with tag
+ messed = raw[:-16] + '0' * 16
+
+ preamble = base64.urlsafe_b64encode(preamble)
+ newraw = preamble + ' ' + base64.urlsafe_b64encode(str(messed))
+ doc2 = SoledadDocument('id1', '1')
+ doc2.set_json(json.dumps({"raw": str(newraw)}))
+
+ with pytest.raises(_crypto.InvalidBlob):
+ yield crypto.decrypt_doc(doc2)
class RecoveryDocumentTestCase(BaseSoledadTest):
@@ -74,13 +200,14 @@ class RecoveryDocumentTestCase(BaseSoledadTest):
encrypted_secret = rd[
self._soledad.secrets.STORAGE_SECRETS_KEY][secret_id]
self.assertTrue(self._soledad.secrets.CIPHER_KEY in encrypted_secret)
- self.assertTrue(
- encrypted_secret[self._soledad.secrets.CIPHER_KEY] == 'aes256')
+ self.assertEquals(
+ _crypto.ENC_METHOD.aes_256_gcm,
+ encrypted_secret[self._soledad.secrets.CIPHER_KEY])
self.assertTrue(self._soledad.secrets.LENGTH_KEY in encrypted_secret)
self.assertTrue(self._soledad.secrets.SECRET_KEY in encrypted_secret)
- def test_import_recovery_document(self):
- rd = self._soledad.secrets._export_recovery_document()
+ def test_import_recovery_document(self, cipher='aes256'):
+ rd = self._soledad.secrets._export_recovery_document(cipher)
s = self._soledad_instance()
s.secrets._import_recovery_document(rd)
s.secrets.set_secret_id(self._soledad.secrets._secret_id)
@@ -89,6 +216,14 @@ class RecoveryDocumentTestCase(BaseSoledadTest):
'Failed settinng secret for symmetric encryption.')
s.close()
+ def test_import_GCM_recovery_document(self):
+ cipher = self._soledad.secrets.CIPHER_AES256_GCM
+ self.test_import_recovery_document(cipher)
+
+ def test_import_legacy_CTR_recovery_document(self):
+ cipher = self._soledad.secrets.CIPHER_AES256
+ self.test_import_recovery_document(cipher)
+
class SoledadSecretsTestCase(BaseSoledadTest):
@@ -146,60 +281,21 @@ class SoledadSecretsTestCase(BaseSoledadTest):
"Should have a secret at this point")
-class MacAuthTestCase(BaseSoledadTest):
-
- def test_decrypt_with_wrong_mac_raises(self):
- """
- Trying to decrypt a document with wrong MAC should raise.
- """
- simpledoc = {'key': 'val'}
- doc = SoledadDocument(doc_id='id')
- doc.content = simpledoc
- # encrypt doc
- doc.set_json(self._soledad._crypto.encrypt_doc(doc))
- self.assertTrue(MAC_KEY in doc.content)
- self.assertTrue(MAC_METHOD_KEY in doc.content)
- # mess with MAC
- doc.content[MAC_KEY] = '1234567890ABCDEF'
- # try to decrypt doc
- self.assertRaises(
- WrongMacError,
- self._soledad._crypto.decrypt_doc, doc)
-
- def test_decrypt_with_unknown_mac_method_raises(self):
- """
- Trying to decrypt a document with unknown MAC method should raise.
- """
- simpledoc = {'key': 'val'}
- doc = SoledadDocument(doc_id='id')
- doc.content = simpledoc
- # encrypt doc
- doc.set_json(self._soledad._crypto.encrypt_doc(doc))
- self.assertTrue(MAC_KEY in doc.content)
- self.assertTrue(MAC_METHOD_KEY in doc.content)
- # mess with MAC method
- doc.content[MAC_METHOD_KEY] = 'mymac'
- # try to decrypt doc
- self.assertRaises(
- UnknownMacMethodError,
- self._soledad._crypto.decrypt_doc, doc)
-
-
class SoledadCryptoAESTestCase(BaseSoledadTest):
def test_encrypt_decrypt_sym(self):
# generate 256-bit key
key = os.urandom(32)
- iv, cyphertext = crypto.encrypt_sym('data', key)
+ iv, cyphertext = _crypto.encrypt_sym('data', key)
self.assertTrue(cyphertext is not None)
self.assertTrue(cyphertext != '')
self.assertTrue(cyphertext != 'data')
- plaintext = crypto.decrypt_sym(cyphertext, key, iv)
+ plaintext = _crypto.decrypt_sym(cyphertext, key, iv)
self.assertEqual('data', plaintext)
- def test_decrypt_with_wrong_iv_fails(self):
+ def test_decrypt_with_wrong_iv_raises(self):
key = os.urandom(32)
- iv, cyphertext = crypto.encrypt_sym('data', key)
+ iv, cyphertext = _crypto.encrypt_sym('data', key)
self.assertTrue(cyphertext is not None)
self.assertTrue(cyphertext != '')
self.assertTrue(cyphertext != 'data')
@@ -208,13 +304,13 @@ class SoledadCryptoAESTestCase(BaseSoledadTest):
wrongiv = rawiv
while wrongiv == rawiv:
wrongiv = os.urandom(1) + rawiv[1:]
- plaintext = crypto.decrypt_sym(
- cyphertext, key, iv=binascii.b2a_base64(wrongiv))
- self.assertNotEqual('data', plaintext)
+ with pytest.raises(InvalidTag):
+ _crypto.decrypt_sym(
+ cyphertext, key, iv=binascii.b2a_base64(wrongiv))
- def test_decrypt_with_wrong_key_fails(self):
+ def test_decrypt_with_wrong_key_raises(self):
key = os.urandom(32)
- iv, cyphertext = crypto.encrypt_sym('data', key)
+ iv, cyphertext = _crypto.encrypt_sym('data', key)
self.assertTrue(cyphertext is not None)
self.assertTrue(cyphertext != '')
self.assertTrue(cyphertext != 'data')
@@ -222,5 +318,21 @@ class SoledadCryptoAESTestCase(BaseSoledadTest):
# ensure keys are different in case we are extremely lucky
while wrongkey == key:
wrongkey = os.urandom(32)
- plaintext = crypto.decrypt_sym(cyphertext, wrongkey, iv)
- self.assertNotEqual('data', plaintext)
+ with pytest.raises(InvalidTag):
+ _crypto.decrypt_sym(cyphertext, wrongkey, iv)
+
+
+def _aes_encrypt(key, iv, data):
+ backend = default_backend()
+ cipher = Cipher(algorithms.AES(key), modes.GCM(iv), backend=backend)
+ encryptor = cipher.encryptor()
+ return encryptor.update(data) + encryptor.finalize(), encryptor.tag
+
+
+def _aes_decrypt(key, iv, tag, data, aead=''):
+ backend = default_backend()
+ cipher = Cipher(algorithms.AES(key), modes.GCM(iv, tag), backend=backend)
+ decryptor = cipher.decryptor()
+ if aead:
+ decryptor.authenticate_additional_data(aead)
+ return decryptor.update(data) + decryptor.finalize()
diff --git a/testing/tests/client/test_deprecated_crypto.py b/testing/tests/client/test_deprecated_crypto.py
new file mode 100644
index 00000000..8ee3735c
--- /dev/null
+++ b/testing/tests/client/test_deprecated_crypto.py
@@ -0,0 +1,91 @@
+import json
+from twisted.internet import defer
+from uuid import uuid4
+from urlparse import urljoin
+
+from leap.soledad.client import crypto as old_crypto
+from leap.soledad.common.couch import CouchDatabase
+from leap.soledad.common import crypto as common_crypto
+
+from test_soledad.u1db_tests import simple_doc
+from test_soledad.util import SoledadWithCouchServerMixin
+from test_soledad.util import make_token_soledad_app
+from test_soledad.u1db_tests import TestCaseWithServer
+
+
+def deprecate_client_crypto(client):
+ secret = client._crypto.secret
+ _crypto = old_crypto.SoledadCrypto(secret)
+ setattr(client._dbsyncer, '_crypto', _crypto)
+ return client
+
+
+def couch_database(couch_url, uuid):
+ db = CouchDatabase(couch_url, "user-%s" % (uuid,))
+ return db
+
+
+class DeprecatedCryptoTest(SoledadWithCouchServerMixin, TestCaseWithServer):
+
+ def setUp(self):
+ SoledadWithCouchServerMixin.setUp(self)
+ TestCaseWithServer.setUp(self)
+
+ def tearDown(self):
+ SoledadWithCouchServerMixin.tearDown(self)
+ TestCaseWithServer.tearDown(self)
+
+ @staticmethod
+ def make_app_with_state(state):
+ return make_token_soledad_app(state)
+
+ @defer.inlineCallbacks
+ def test_touch_updates_remote_representation(self):
+ self.startTwistedServer()
+ user = 'user-' + uuid4().hex
+ server_url = 'http://%s:%d' % (self.server_address)
+ client = self._soledad_instance(user=user, server_url=server_url)
+ deprecated_client = deprecate_client_crypto(
+ self._soledad_instance(user=user, server_url=server_url))
+
+ self.make_app()
+ remote = self.request_state._create_database(replica_uid=client._uuid)
+ remote = CouchDatabase.open_database(
+ urljoin(self.couch_url, 'user-' + user),
+ create=True)
+
+ # ensure remote db is empty
+ gen, docs = remote.get_all_docs()
+ assert gen == 0
+ assert len(docs) == 0
+
+ # create a doc with deprecated client and sync
+ yield deprecated_client.create_doc(json.loads(simple_doc))
+ yield deprecated_client.sync()
+
+ # check for doc in remote db
+ gen, docs = remote.get_all_docs()
+ assert gen == 1
+ assert len(docs) == 1
+ doc = docs.pop()
+ content = doc.content
+ assert common_crypto.ENC_JSON_KEY in content
+ assert common_crypto.ENC_SCHEME_KEY in content
+ assert common_crypto.ENC_METHOD_KEY in content
+ assert common_crypto.ENC_IV_KEY in content
+ assert common_crypto.MAC_KEY in content
+ assert common_crypto.MAC_METHOD_KEY in content
+
+ # "touch" the document with a newer client and synx
+ _, docs = yield client.get_all_docs()
+ yield client.put_doc(doc)
+ yield client.sync()
+
+ # check for newer representation of doc in remote db
+ gen, docs = remote.get_all_docs()
+ assert gen == 2
+ assert len(docs) == 1
+ doc = docs.pop()
+ content = doc.content
+ assert len(content) == 1
+ assert 'raw' in content