summaryrefslogtreecommitdiff
path: root/testing
diff options
context:
space:
mode:
Diffstat (limited to 'testing')
-rw-r--r--testing/pytest.ini1
-rw-r--r--testing/test_soledad/util.py17
-rw-r--r--testing/tests/client/test_crypto.py263
-rw-r--r--testing/tests/client/test_crypto2.py171
-rw-r--r--testing/tests/sync/test_encdecpool.py48
-rw-r--r--testing/tests/sync/test_sqlcipher_sync.py14
-rw-r--r--testing/tox.ini3
7 files changed, 203 insertions, 314 deletions
diff --git a/testing/pytest.ini b/testing/pytest.ini
index 2d34c607..39d1e1c6 100644
--- a/testing/pytest.ini
+++ b/testing/pytest.ini
@@ -1,3 +1,4 @@
[pytest]
testpaths = tests
norecursedirs = tests/perf
+twisted = yes
diff --git a/testing/test_soledad/util.py b/testing/test_soledad/util.py
index d53f6cda..bde0b1b7 100644
--- a/testing/test_soledad/util.py
+++ b/testing/test_soledad/util.py
@@ -15,12 +15,10 @@
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
-
"""
Utilities used by multiple test suites.
"""
-
import os
import random
import string
@@ -45,14 +43,13 @@ from leap.soledad.common.document import SoledadDocument
from leap.soledad.common.couch import CouchDatabase
from leap.soledad.common.couch.state import CouchServerState
-from leap.soledad.common.crypto import ENC_SCHEME_KEY
from leap.soledad.client import Soledad
from leap.soledad.client import http_target
from leap.soledad.client import auth
-from leap.soledad.client.crypto import decrypt_doc_dict
from leap.soledad.client.sqlcipher import SQLCipherDatabase
from leap.soledad.client.sqlcipher import SQLCipherOptions
+from leap.soledad.client._crypto import is_symmetrically_encrypted
from leap.soledad.server import SoledadApp
from leap.soledad.server.auth import SoledadTokenAuthMiddleware
@@ -212,6 +209,7 @@ def soledad_sync_target(
# redefine the base leap test class so it inherits from twisted trial's
# TestCase. This is needed so trial knows that it has to manage a reactor and
# wait for deferreds returned by tests to be fired.
+
BaseLeapTest = type(
'BaseLeapTest', (unittest.TestCase,), dict(BaseLeapTest.__dict__))
@@ -311,6 +309,7 @@ class BaseSoledadTest(BaseLeapTest, MockedSharedDBTest):
self.addCleanup(soledad.close)
return soledad
+ @pytest.inlineCallbacks
def assertGetEncryptedDoc(
self, db, doc_id, doc_rev, content, has_conflicts):
"""
@@ -320,13 +319,9 @@ class BaseSoledadTest(BaseLeapTest, MockedSharedDBTest):
has_conflicts=has_conflicts)
doc = db.get_doc(doc_id)
- if ENC_SCHEME_KEY in doc.content:
- # XXX check for SYM_KEY too
- key = self._soledad._crypto.doc_passphrase(doc.doc_id)
- secret = self._soledad._crypto.secret
- decrypted = decrypt_doc_dict(
- doc.content, doc.doc_id, doc.rev,
- key, secret)
+ if is_symmetrically_encrypted(doc.content['raw']):
+ crypt = self._soledad._crypto
+ decrypted = yield crypt.decrypt_doc(doc)
doc.set_json(decrypted)
self.assertEqual(exp_doc.doc_id, doc.doc_id)
self.assertEqual(exp_doc.rev, doc.rev)
diff --git a/testing/tests/client/test_crypto.py b/testing/tests/client/test_crypto.py
index 77252b46..dc3054f2 100644
--- a/testing/tests/client/test_crypto.py
+++ b/testing/tests/client/test_crypto.py
@@ -17,47 +17,184 @@
"""
Tests for cryptographic related stuff.
"""
-import os
-import hashlib
import binascii
+import base64
+import hashlib
+import json
+import os
+import struct
+
+from io import BytesIO
+
+import pytest
+
+from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes
+from cryptography.hazmat.backends import default_backend
-from leap.soledad.client import crypto
from leap.soledad.common.document import SoledadDocument
from test_soledad.util import BaseSoledadTest
-from leap.soledad.common.crypto import WrongMacError
-from leap.soledad.common.crypto import UnknownMacMethodError
-from leap.soledad.common.crypto import ENC_JSON_KEY
-from leap.soledad.common.crypto import ENC_SCHEME_KEY
-from leap.soledad.common.crypto import MAC_KEY
-from leap.soledad.common.crypto import MAC_METHOD_KEY
+from leap.soledad.client import _crypto
+
+from twisted.trial import unittest
+from twisted.internet import defer
+
+
+snowden1 = (
+ "You can't come up against "
+ "the world's most powerful intelligence "
+ "agencies and not accept the risk. "
+ "If they want to get you, over time "
+ "they will.")
+
+
+class AESTest(unittest.TestCase):
+
+ def test_chunked_encryption(self):
+ key = 'A' * 32
+ iv = 'A' * 16
+
+ fd = BytesIO()
+ aes = _crypto.AESEncryptor(key, iv, fd)
+
+ data = snowden1
+ block = 16
+
+ for i in range(len(data)/block):
+ chunk = data[i * block:(i+1)*block]
+ aes.write(chunk)
+ aes.end()
+
+ ciphertext_chunked = fd.getvalue()
+ ciphertext = _aes_encrypt(key, iv, data)
+
+ assert ciphertext_chunked == ciphertext
+
+
+ def test_decrypt(self):
+ key = 'A' * 32
+ iv = 'A' * 16
+
+ data = snowden1
+ block = 16
+
+ ciphertext = _aes_encrypt(key, iv, data)
+
+ fd = BytesIO()
+ aes = _crypto.AESDecryptor(key, iv, fd)
+
+ for i in range(len(ciphertext)/block):
+ chunk = ciphertext[i * block:(i+1)*block]
+ aes.write(chunk)
+ aes.end()
+
+ cleartext_chunked = fd.getvalue()
+ assert cleartext_chunked == data
+
+
+
+class BlobTestCase(unittest.TestCase):
+
+ class doc_info:
+ doc_id = 'D-deadbeef'
+ rev = '397932e0c77f45fcb7c3732930e7e9b2:1'
+
+ @defer.inlineCallbacks
+ def test_blob_encryptor(self):
+
+ inf = BytesIO()
+ inf.write(snowden1)
+ inf.seek(0)
+ outf = BytesIO()
+
+ blob = _crypto.BlobEncryptor(
+ self.doc_info, inf, result=outf,
+ secret='A' * 96, iv='B'*16)
+
+ encrypted = yield blob.encrypt()
+ data = base64.urlsafe_b64decode(encrypted.getvalue())
+ assert data[0] == '\x80'
+ ts, sch, meth = struct.unpack(
+ 'Qbb', data[1:11])
+ assert sch == 1
+ assert meth == 1
+ iv = data[11:27]
+ assert iv == 'B' * 16
+ doc_id = data[27:37]
+ assert doc_id == 'D-deadbeef'
-class EncryptedSyncTestCase(BaseSoledadTest):
+ rev = data[37:71]
+ assert rev == self.doc_info.rev
- """
- Tests that guarantee that data will always be encrypted when syncing.
- """
+ ciphertext = data[71:-64]
+ aes_key = _crypto._get_sym_key_for_doc(
+ self.doc_info.doc_id, 'A'*96)
+ assert ciphertext == _aes_encrypt(aes_key, 'B'*16, snowden1)
- def test_encrypt_decrypt_json(self):
+ decrypted = _aes_decrypt(aes_key, 'B'*16, ciphertext)
+ assert str(decrypted) == snowden1
+
+
+ @defer.inlineCallbacks
+ def test_blob_decryptor(self):
+
+ inf = BytesIO()
+ inf.write(snowden1)
+ inf.seek(0)
+ outf = BytesIO()
+
+ blob = _crypto.BlobEncryptor(
+ self.doc_info, inf, result=outf,
+ secret='A' * 96, iv='B' * 16)
+ yield blob.encrypt()
+
+ decryptor = _crypto.BlobDecryptor(
+ self.doc_info, outf,
+ secret='A' * 96)
+ decrypted = yield decryptor.decrypt()
+ assert decrypted.getvalue() == snowden1
+
+
+ @defer.inlineCallbacks
+ def test_encrypt_and_decrypt(self):
+ """
+ Check that encrypting and decrypting gives same doc.
"""
- Test encrypting and decrypting documents.
+ crypto = _crypto.SoledadCrypto('A' * 96)
+ payload = {'key': 'someval'}
+ doc1 = SoledadDocument('id1', '1', json.dumps(payload))
+
+ encrypted = yield crypto.encrypt_doc(doc1)
+ assert encrypted != payload
+ assert 'raw' in encrypted
+ doc2 = SoledadDocument('id1', '1')
+ doc2.set_json(encrypted)
+ decrypted = yield crypto.decrypt_doc(doc2)
+ assert len(decrypted) != 0
+ assert json.loads(decrypted) == payload
+
+
+ @defer.inlineCallbacks
+ def test_decrypt_with_wrong_mac_raises(self):
"""
- simpledoc = {'key': 'val'}
- doc1 = SoledadDocument(doc_id='id')
- doc1.content = simpledoc
-
- # encrypt doc
- doc1.set_json(self._soledad._crypto.encrypt_doc(doc1))
- # assert content is different and includes keys
- self.assertNotEqual(
- simpledoc, doc1.content,
- 'incorrect document encryption')
- self.assertTrue(ENC_JSON_KEY in doc1.content)
- self.assertTrue(ENC_SCHEME_KEY in doc1.content)
- # decrypt doc
- doc1.set_json(self._soledad._crypto.decrypt_doc(doc1))
- self.assertEqual(
- simpledoc, doc1.content, 'incorrect document encryption')
+ Trying to decrypt a document with wrong MAC should raise.
+ """
+ crypto = _crypto.SoledadCrypto('A' * 96)
+ payload = {'key': 'someval'}
+ doc1 = SoledadDocument('id1', '1', json.dumps(payload))
+
+ encrypted = yield crypto.encrypt_doc(doc1)
+ encdict = json.loads(encrypted)
+ raw = base64.urlsafe_b64decode(str(encdict['raw']))
+ # mess with MAC
+ messed = raw[:-64] + '0' * 64
+ newraw = base64.urlsafe_b64encode(str(messed))
+ doc2 = SoledadDocument('id1', '1')
+ doc2.set_json(json.dumps({"raw": str(newraw)}))
+
+ with pytest.raises(_crypto.InvalidBlob):
+ decrypted = yield crypto.decrypt_doc(doc2)
+
class RecoveryDocumentTestCase(BaseSoledadTest):
@@ -146,60 +283,22 @@ class SoledadSecretsTestCase(BaseSoledadTest):
"Should have a secret at this point")
-class MacAuthTestCase(BaseSoledadTest):
-
- def test_decrypt_with_wrong_mac_raises(self):
- """
- Trying to decrypt a document with wrong MAC should raise.
- """
- simpledoc = {'key': 'val'}
- doc = SoledadDocument(doc_id='id')
- doc.content = simpledoc
- # encrypt doc
- doc.set_json(self._soledad._crypto.encrypt_doc(doc))
- self.assertTrue(MAC_KEY in doc.content)
- self.assertTrue(MAC_METHOD_KEY in doc.content)
- # mess with MAC
- doc.content[MAC_KEY] = '1234567890ABCDEF'
- # try to decrypt doc
- self.assertRaises(
- WrongMacError,
- self._soledad._crypto.decrypt_doc, doc)
-
- def test_decrypt_with_unknown_mac_method_raises(self):
- """
- Trying to decrypt a document with unknown MAC method should raise.
- """
- simpledoc = {'key': 'val'}
- doc = SoledadDocument(doc_id='id')
- doc.content = simpledoc
- # encrypt doc
- doc.set_json(self._soledad._crypto.encrypt_doc(doc))
- self.assertTrue(MAC_KEY in doc.content)
- self.assertTrue(MAC_METHOD_KEY in doc.content)
- # mess with MAC method
- doc.content[MAC_METHOD_KEY] = 'mymac'
- # try to decrypt doc
- self.assertRaises(
- UnknownMacMethodError,
- self._soledad._crypto.decrypt_doc, doc)
-
class SoledadCryptoAESTestCase(BaseSoledadTest):
def test_encrypt_decrypt_sym(self):
# generate 256-bit key
key = os.urandom(32)
- iv, cyphertext = crypto.encrypt_sym('data', key)
+ iv, cyphertext = _crypto.encrypt_sym('data', key)
self.assertTrue(cyphertext is not None)
self.assertTrue(cyphertext != '')
self.assertTrue(cyphertext != 'data')
- plaintext = crypto.decrypt_sym(cyphertext, key, iv)
+ plaintext = _crypto.decrypt_sym(cyphertext, key, iv)
self.assertEqual('data', plaintext)
def test_decrypt_with_wrong_iv_fails(self):
key = os.urandom(32)
- iv, cyphertext = crypto.encrypt_sym('data', key)
+ iv, cyphertext = _crypto.encrypt_sym('data', key)
self.assertTrue(cyphertext is not None)
self.assertTrue(cyphertext != '')
self.assertTrue(cyphertext != 'data')
@@ -208,13 +307,13 @@ class SoledadCryptoAESTestCase(BaseSoledadTest):
wrongiv = rawiv
while wrongiv == rawiv:
wrongiv = os.urandom(1) + rawiv[1:]
- plaintext = crypto.decrypt_sym(
+ plaintext = _crypto.decrypt_sym(
cyphertext, key, iv=binascii.b2a_base64(wrongiv))
self.assertNotEqual('data', plaintext)
def test_decrypt_with_wrong_key_fails(self):
key = os.urandom(32)
- iv, cyphertext = crypto.encrypt_sym('data', key)
+ iv, cyphertext = _crypto.encrypt_sym('data', key)
self.assertTrue(cyphertext is not None)
self.assertTrue(cyphertext != '')
self.assertTrue(cyphertext != 'data')
@@ -222,5 +321,19 @@ class SoledadCryptoAESTestCase(BaseSoledadTest):
# ensure keys are different in case we are extremely lucky
while wrongkey == key:
wrongkey = os.urandom(32)
- plaintext = crypto.decrypt_sym(cyphertext, wrongkey, iv)
+ plaintext = _crypto.decrypt_sym(cyphertext, wrongkey, iv)
self.assertNotEqual('data', plaintext)
+
+
+def _aes_encrypt(key, iv, data):
+ backend = default_backend()
+ cipher = Cipher(algorithms.AES(key), modes.CTR(iv), backend=backend)
+ encryptor = cipher.encryptor()
+ return encryptor.update(data) + encryptor.finalize()
+
+
+def _aes_decrypt(key, iv, data):
+ backend = default_backend()
+ cipher = Cipher(algorithms.AES(key), modes.CTR(iv), backend=backend)
+ decryptor = cipher.decryptor()
+ return decryptor.update(data) + decryptor.finalize()
diff --git a/testing/tests/client/test_crypto2.py b/testing/tests/client/test_crypto2.py
deleted file mode 100644
index f0f6c4af..00000000
--- a/testing/tests/client/test_crypto2.py
+++ /dev/null
@@ -1,171 +0,0 @@
-# -*- coding: utf-8 -*-
-# test_crypto2.py
-# Copyright (C) 2016 LEAP Encryption Access Project
-#
-# This program is free software: you can redistribute it and/or modify
-# it under the terms of the GNU General Public License as published by
-# the Free Software Foundation, either version 3 of the License, or
-# (at your option) any later version.
-#
-# This program is distributed in the hope that it will be useful,
-# but WITHOUT ANY WARRANTY; without even the implied warranty of
-# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
-# GNU General Public License for more details.
-#
-# You should have received a copy of the GNU General Public License
-# along with this program. If not, see <http://www.gnu.org/licenses/>.
-
-"""
-Tests for the _crypto module
-"""
-
-import base64
-import binascii
-import time
-import struct
-import StringIO
-
-import leap.soledad.client
-from leap.soledad.client import _crypto
-
-from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes
-from cryptography.hazmat.backends import default_backend
-
-from twisted.trial import unittest
-
-
-snowden1 = (
- "You can't come up against "
- "the world's most powerful intelligence "
- "agencies and not accept the risk. "
- "If they want to get you, over time "
- "they will.")
-
-
-def _aes_encrypt(key, iv, data):
- backend = default_backend()
- cipher = Cipher(algorithms.AES(key), modes.CTR(iv), backend=backend)
- encryptor = cipher.encryptor()
- return encryptor.update(data) + encryptor.finalize()
-
-def _aes_decrypt(key, iv, data):
- backend = default_backend()
- cipher = Cipher(algorithms.AES(key), modes.CTR(iv), backend=backend)
- decryptor = cipher.decryptor()
- return decryptor.update(data) + decryptor.finalize()
-
-
-def test_chunked_encryption():
- key = 'A' * 32
- iv = 'A' * 16
-
- fd = StringIO.StringIO()
- aes = _crypto.AESEncryptor(key, iv, fd)
-
- data = snowden1
- block = 16
-
- for i in range(len(data)/block):
- chunk = data[i * block:(i+1)*block]
- aes.write(chunk)
- aes.end()
-
- ciphertext_chunked = fd.getvalue()
- ciphertext = _aes_encrypt(key, iv, data)
-
- assert ciphertext_chunked == ciphertext
-
-
-def test_decrypt():
- key = 'A' * 32
- iv = 'A' * 16
-
- data = snowden1
- block = 16
-
- ciphertext = _aes_encrypt(key, iv, data)
-
- fd = StringIO.StringIO()
- aes = _crypto.AESDecryptor(key, iv, fd)
-
- for i in range(len(ciphertext)/block):
- chunk = ciphertext[i * block:(i+1)*block]
- aes.write(chunk)
- aes.end()
-
- cleartext_chunked = fd.getvalue()
- assert cleartext_chunked == data
-
-
-
-class BlobTestCase(unittest.TestCase):
-
- class doc_info:
- doc_id = 'D-deadbeef'
- rev = '397932e0c77f45fcb7c3732930e7e9b2:1'
-
- def test_blob_encryptor(self):
-
- inf = StringIO.StringIO()
- inf.write(snowden1)
- inf.seek(0)
- outf = StringIO.StringIO()
-
- blob = _crypto.BlobEncryptor(
- self.doc_info, inf, result=outf,
- secret='A' * 96, iv='B'*16)
-
- d = blob.encrypt()
- d.addCallback(self._test_blob_encryptor_cb, outf)
- return d
-
- def _test_blob_encryptor_cb(self, _, outf):
- encrypted = outf.getvalue()
- data = base64.urlsafe_b64decode(encrypted)
-
- assert data[0] == '\x80'
- ts, sch, meth = struct.unpack(
- 'Qbb', data[1:11])
- assert sch == 1
- assert meth == 1
- iv = data[11:27]
- assert iv == 'B' * 16
- doc_id = data[27:37]
- assert doc_id == 'D-deadbeef'
-
- rev = data[37:71]
- assert rev == self.doc_info.rev
-
- ciphertext = data[71:-64]
- aes_key = _crypto._get_sym_key_for_doc(
- self.doc_info.doc_id, 'A'*96)
- assert ciphertext == _aes_encrypt(aes_key, 'B'*16, snowden1)
-
- decrypted = _aes_decrypt(aes_key, 'B'*16, ciphertext)
- assert str(decrypted) == snowden1
-
- def test_blob_decryptor(self):
-
- inf = StringIO.StringIO()
- inf.write(snowden1)
- inf.seek(0)
- outf = StringIO.StringIO()
-
- blob = _crypto.BlobEncryptor(
- self.doc_info, inf, result=outf,
- secret='A' * 96, iv='B' * 16)
-
- def do_decrypt(_, outf):
- decryptor = _crypto.BlobDecryptor(
- self.doc_info, outf,
- secret='A' * 96)
- d = decryptor.decrypt()
- return d
-
- d = blob.encrypt()
- d.addCallback(do_decrypt, outf)
- d.addCallback(self._test_blob_decryptor_cb)
- return d
-
- def _test_blob_decryptor_cb(self, decrypted):
- assert decrypted.getvalue() == snowden1
diff --git a/testing/tests/sync/test_encdecpool.py b/testing/tests/sync/test_encdecpool.py
deleted file mode 100644
index 7055a765..00000000
--- a/testing/tests/sync/test_encdecpool.py
+++ /dev/null
@@ -1,48 +0,0 @@
-# -*- coding: utf-8 -*-
-import json
-from twisted.internet.defer import inlineCallbacks
-
-from leap.soledad.client.encdecpool import SyncEncrypterPool
-
-from leap.soledad.common.document import SoledadDocument
-from test_soledad.util import BaseSoledadTest
-
-DOC_ID = "mydoc"
-DOC_REV = "rev"
-DOC_CONTENT = {'simple': 'document'}
-
-
-class TestSyncEncrypterPool(BaseSoledadTest):
-
- def setUp(self):
- BaseSoledadTest.setUp(self)
- crypto = self._soledad._crypto
- sync_db = self._soledad._sync_db
- self._pool = SyncEncrypterPool(crypto, sync_db)
- self._pool.start()
-
- def tearDown(self):
- self._pool.stop()
- BaseSoledadTest.tearDown(self)
-
- @inlineCallbacks
- def test_get_encrypted_doc_returns_none(self):
- """
- Test that trying to get an encrypted doc from the pool returns None if
- the document was never added for encryption.
- """
- doc = yield self._pool.get_encrypted_doc(DOC_ID, DOC_REV)
- self.assertIsNone(doc)
-
- @inlineCallbacks
- def test_encrypt_doc_and_get_it_back(self):
- """
- Test that the pool actually encrypts a document added to the queue.
- """
- doc = SoledadDocument(
- doc_id=DOC_ID, rev=DOC_REV, json=json.dumps(DOC_CONTENT))
-
- yield self._pool.encrypt_doc(doc)
- encrypted = yield self._pool.get_encrypted_doc(DOC_ID, DOC_REV)
-
- self.assertIsNotNone(encrypted)
diff --git a/testing/tests/sync/test_sqlcipher_sync.py b/testing/tests/sync/test_sqlcipher_sync.py
index 3cbefc8b..2528600d 100644
--- a/testing/tests/sync/test_sqlcipher_sync.py
+++ b/testing/tests/sync/test_sqlcipher_sync.py
@@ -27,8 +27,6 @@ from leap.soledad.common.l2db import sync
from leap.soledad.common.l2db import vectorclock
from leap.soledad.common.l2db import errors
-from leap.soledad.common.crypto import ENC_SCHEME_KEY
-from leap.soledad.client.crypto import decrypt_doc_dict
from leap.soledad.client.http_target import SoledadHTTPSyncTarget
from test_soledad import u1db_tests as tests
@@ -545,13 +543,11 @@ class SQLCipherDatabaseSyncTests(
self.assertFalse(doc2.has_conflicts)
self.sync(self.db2, db3)
doc3 = db3.get_doc('the-doc')
- if ENC_SCHEME_KEY in doc3.content:
- _crypto = self._soledad._crypto
- key = _crypto.doc_passphrase(doc3.doc_id)
- secret = _crypto.secret
- doc3.set_json(decrypt_doc_dict(
- doc3.content,
- doc3.doc_id, doc3.rev, key, secret))
+
+ _crypto = self._soledad._crypto
+ decrypted = _crypto.decrypt_doc(doc3)
+ doc3.set_json(decrypted)
+
self.assertEqual(doc4.get_json(), doc3.get_json())
self.assertFalse(doc3.has_conflicts)
self.db1.close()
diff --git a/testing/tox.ini b/testing/tox.ini
index 31cb8a4f..0eeeab9e 100644
--- a/testing/tox.ini
+++ b/testing/tox.ini
@@ -1,5 +1,6 @@
[tox]
envlist = py27
+skipsdist=True
[testenv]
basepython = python2.7
@@ -7,6 +8,7 @@ commands = py.test --cov-report=html \
--cov-report=term \
--cov=leap.soledad \
{posargs}
+usedevelop = True
deps =
coverage
pytest
@@ -18,6 +20,7 @@ deps =
pdbpp
couchdb
requests
+ service_identity
# install soledad local packages
-e../common
-e../client