From 3462abf72b9057855fe7e1b92b12f04bdbd67537 Mon Sep 17 00:00:00 2001 From: drebs Date: Mon, 3 Jun 2013 16:19:13 -0300 Subject: Move symmetric encryption/decryption code from leap.common to leap.soledad. --- src/leap/soledad/tests/test_crypto.py | 217 ++++++++++++++-------------------- 1 file changed, 88 insertions(+), 129 deletions(-) (limited to 'src/leap/soledad/tests') diff --git a/src/leap/soledad/tests/test_crypto.py b/src/leap/soledad/tests/test_crypto.py index ae84dad3..59d20fa7 100644 --- a/src/leap/soledad/tests/test_crypto.py +++ b/src/leap/soledad/tests/test_crypto.py @@ -25,27 +25,20 @@ import shutil import tempfile import simplejson as json import hashlib +import binascii -from leap.soledad.backends.leap_backend import ( - LeapDocument, - encrypt_doc, - decrypt_doc, - EncryptionSchemes, - LeapSyncTarget, - ENC_JSON_KEY, - ENC_SCHEME_KEY, - MAC_METHOD_KEY, - MAC_KEY, - UnknownMacMethod, - WrongMac, -) -from leap.soledad.backends.couch import CouchDatabase +from leap.common.testing.basetest import BaseLeapTest +from Crypto import Random + + +from leap.common.testing.basetest import BaseLeapTest from leap.soledad import Soledad -from leap.soledad.crypto import SoledadCrypto -from leap.soledad.tests import BaseSoledadTest -from leap.soledad.tests.test_couch import CouchDBTestCase +from leap.soledad import crypto +from leap.soledad.backends import leap_backend +from leap.soledad.backends.couch import CouchDatabase from leap.soledad.tests import ( + BaseSoledadTest, KEY_FINGERPRINT, PRIVATE_KEY, ) @@ -54,8 +47,12 @@ from leap.soledad.tests.u1db_tests import ( nested_doc, TestCaseWithServer, ) -from leap.soledad.tests.test_leap_backend import make_leap_document_for_test -from leap.soledad.backends.couch import CouchServerState + + +# These will be used in the future for EncryptedCouchSyncTestCase +#from leap.soledad.tests.test_couch import CouchDBTestCase +#from leap.soledad.tests.test_leap_backend import make_leap_document_for_test +#from leap.soledad.backends.couch import CouchServerState class EncryptedSyncTestCase(BaseSoledadTest): @@ -68,112 +65,22 @@ class EncryptedSyncTestCase(BaseSoledadTest): Test encrypting and decrypting documents. """ simpledoc = {'key': 'val'} - doc1 = LeapDocument(doc_id='id') + doc1 = leap_backend.LeapDocument(doc_id='id') doc1.content = simpledoc # encrypt doc - doc1.set_json(encrypt_doc(self._soledad._crypto, doc1)) + doc1.set_json(leap_backend.encrypt_doc(self._soledad._crypto, doc1)) # assert content is different and includes keys self.assertNotEqual( simpledoc, doc1.content, 'incorrect document encryption') - self.assertTrue(ENC_JSON_KEY in doc1.content) - self.assertTrue(ENC_SCHEME_KEY in doc1.content) + self.assertTrue(leap_backend.ENC_JSON_KEY in doc1.content) + self.assertTrue(leap_backend.ENC_SCHEME_KEY in doc1.content) # decrypt doc - doc1.set_json(decrypt_doc(self._soledad._crypto, doc1)) + doc1.set_json(leap_backend.decrypt_doc(self._soledad._crypto, doc1)) self.assertEqual( simpledoc, doc1.content, 'incorrect document encryption') -#from leap.soledad.server import SoledadApp, SoledadAuthMiddleware -# -# -#def make_token_leap_app(test, state): -# app = SoledadApp(state) -# application = SoledadAuthMiddleware(app, prefix='/soledad/') -# return application -# -# -#def leap_sync_target(test, path): -# return LeapSyncTarget(test.getURL(path)) -# -# -#def token_leap_sync_target(test, path): -# st = leap_sync_target(test, 'soledad/' + path) -# st.set_token_credentials('any_user', 'any_token') -# return st -# -# -#class EncryptedCouchSyncTest(CouchDBTestCase, TestCaseWithServer): -# -# make_app_with_state = make_token_leap_app -# -# make_document_for_test = make_leap_document_for_test -# -# sync_target = token_leap_sync_target -# -# def make_app(self): -# # potential hook point -# self.request_state = CouchServerState(self._couch_url) -# return self.make_app_with_state(self.request_state) -# -# def _soledad_instance(self, user='leap@leap.se', prefix='', -# bootstrap=False, gnupg_home='/gnupg', -# secrets_path='/secret.gpg', -# local_db_path='/soledad.u1db'): -# return Soledad( -# user, -# '123', -# gnupg_home=self.tempdir+prefix+gnupg_home, -# secrets_path=self.tempdir+prefix+secrets_path, -# local_db_path=self.tempdir+prefix+local_db_path, -# bootstrap=bootstrap) -# -# def setUp(self): -# CouchDBTestCase.setUp(self) -# TestCaseWithServer.setUp(self) -# self.tempdir = tempfile.mkdtemp(suffix='.couch.test') -# # initialize soledad by hand so we can control keys -# self._soledad = self._soledad_instance('leap@leap.se') -# self._soledad._init_dirs() -# self._soledad._crypto = SoledadCrypto(self._soledad) -# if not self._soledad._has_get_storage_secret()(): -# self._soledad._gen_get_storage_secret()() -# self._soledad._load_get_storage_secret()() -# self._soledad._init_db() -# -# def tearDown(self): -# shutil.rmtree(self.tempdir) -# -# def test_encrypted_sym_sync(self): -# # get direct access to couchdb -# import ipdb; ipdb.set_trace() -# self._couch_url = 'http://localhost:' + str(self.wrapper.port) -# db = CouchDatabase(self._couch_url, 'testdb') -# # create and encrypt a doc to insert directly in couchdb -# doc = LeapDocument('doc-id') -# doc.set_json( -# encrypt_doc( -# self._soledad._crypto, 'doc-id', json.dumps(simple_doc))) -# db.put_doc(doc) -# # setup credentials for access to soledad server -# creds = { -# 'token': { -# 'uuid': 'leap@leap.se', -# 'token': '1234', -# } -# } -# # sync local soledad db with server -# self.assertTrue(self._soledad.get_doc('doc-id') is None) -# self.startServer() -# # TODO fix sync for test. -# #self._soledad.sync(self.getURL('soledad/testdb'), creds) -# # get and check doc -# doc = self._soledad.get_doc('doc-id') -# # TODO: fix below. -# #self.assertTrue(doc is not None) -# #self.assertTrue(doc.content == simple_doc) - - class RecoveryDocumentTestCase(BaseSoledadTest): def test_export_recovery_document_raw(self): @@ -199,7 +106,7 @@ class RecoveryDocumentTestCase(BaseSoledadTest): 'Failed settinng secret for symmetric encryption.') -class CryptoMethodsTestCase(BaseSoledadTest): +class SoledadSecretsTestCase(BaseSoledadTest): def test__gen_secret(self): # instantiate and save secret_id @@ -254,33 +161,85 @@ class MacAuthTestCase(BaseSoledadTest): Trying to decrypt a document with wrong MAC should raise. """ simpledoc = {'key': 'val'} - doc = LeapDocument(doc_id='id') + doc = leap_backend.LeapDocument(doc_id='id') doc.content = simpledoc # encrypt doc - doc.set_json(encrypt_doc(self._soledad._crypto, doc)) - self.assertTrue(MAC_KEY in doc.content) - self.assertTrue(MAC_METHOD_KEY in doc.content) + doc.set_json(leap_backend.encrypt_doc(self._soledad._crypto, doc)) + self.assertTrue(leap_backend.MAC_KEY in doc.content) + self.assertTrue(leap_backend.MAC_METHOD_KEY in doc.content) # mess with MAC - doc.content[MAC_KEY] = '1234567890ABCDEF' + doc.content[leap_backend.MAC_KEY] = '1234567890ABCDEF' # try to decrypt doc self.assertRaises( - WrongMac, - decrypt_doc, self._soledad._crypto, doc) + leap_backend.WrongMac, + leap_backend.decrypt_doc, self._soledad._crypto, doc) def test_decrypt_with_unknown_mac_method_raises(self): """ Trying to decrypt a document with unknown MAC method should raise. """ simpledoc = {'key': 'val'} - doc = LeapDocument(doc_id='id') + doc = leap_backend.LeapDocument(doc_id='id') doc.content = simpledoc # encrypt doc - doc.set_json(encrypt_doc(self._soledad._crypto, doc)) - self.assertTrue(MAC_KEY in doc.content) - self.assertTrue(MAC_METHOD_KEY in doc.content) + doc.set_json(leap_backend.encrypt_doc(self._soledad._crypto, doc)) + self.assertTrue(leap_backend.MAC_KEY in doc.content) + self.assertTrue(leap_backend.MAC_METHOD_KEY in doc.content) # mess with MAC method - doc.content[MAC_METHOD_KEY] = 'mymac' + doc.content[leap_backend.MAC_METHOD_KEY] = 'mymac' # try to decrypt doc self.assertRaises( - UnknownMacMethod, - decrypt_doc, self._soledad._crypto, doc) + leap_backend.UnknownMacMethod, + leap_backend.decrypt_doc, self._soledad._crypto, doc) + + +class SoledadCryptoTestCase(BaseSoledadTest): + + def test_encrypt_decrypt_sym(self): + # generate 256-bit key + key = Random.new().read(32) + iv, cyphertext = self._soledad._crypto.encrypt_sym( + 'data', key, + method=crypto.EncryptionMethods.AES_256_CTR) + self.assertTrue(cyphertext is not None) + self.assertTrue(cyphertext != '') + self.assertTrue(cyphertext != 'data') + plaintext = self._soledad._crypto.decrypt_sym( + cyphertext, key, iv=iv, + method=crypto.EncryptionMethods.AES_256_CTR) + self.assertEqual('data', plaintext) + + def test_decrypt_with_wrong_iv_fails(self): + key = Random.new().read(32) + iv, cyphertext = self._soledad._crypto.encrypt_sym( + 'data', key, + method=crypto.EncryptionMethods.AES_256_CTR) + self.assertTrue(cyphertext is not None) + self.assertTrue(cyphertext != '') + self.assertTrue(cyphertext != 'data') + # get a different iv by changing the first byte + rawiv = binascii.a2b_base64(iv) + wrongiv = rawiv + while wrongiv == rawiv: + wrongiv = os.urandom(1) + rawiv[1:] + plaintext = self._soledad._crypto.decrypt_sym( + cyphertext, key, iv=binascii.b2a_base64(wrongiv), + method=crypto.EncryptionMethods.AES_256_CTR) + self.assertNotEqual('data', plaintext) + + def test_decrypt_with_wrong_key_fails(self): + key = Random.new().read(32) + iv, cyphertext = self._soledad._crypto.encrypt_sym( + 'data', key, + method=crypto.EncryptionMethods.AES_256_CTR) + self.assertTrue(cyphertext is not None) + self.assertTrue(cyphertext != '') + self.assertTrue(cyphertext != 'data') + wrongkey = Random.new().read(32) # 256-bits key + # ensure keys are different in case we are extremely lucky + while wrongkey == key: + wrongkey = Random.new().read(32) + plaintext = self._soledad._crypto.decrypt_sym( + cyphertext, wrongkey, iv=iv, + method=crypto.EncryptionMethods.AES_256_CTR) + self.assertNotEqual('data', plaintext) -- cgit v1.2.3 From f4561f3462d8a265f11811d65c4b50db1dd61b45 Mon Sep 17 00:00:00 2001 From: drebs Date: Mon, 3 Jun 2013 17:10:15 -0300 Subject: Remove strict dependency on leap.common. * Encapsulate leap_assert and leap_assert_type so Soledad works without them. * Remove dependency on leap.common.files.mkdir_p(). * Encapsulate signaling. * Add changes file. --- src/leap/soledad/tests/test_soledad.py | 74 +++++++++++++++++----------------- 1 file changed, 37 insertions(+), 37 deletions(-) (limited to 'src/leap/soledad/tests') diff --git a/src/leap/soledad/tests/test_soledad.py b/src/leap/soledad/tests/test_soledad.py index 09711f19..21d36771 100644 --- a/src/leap/soledad/tests/test_soledad.py +++ b/src/leap/soledad/tests/test_soledad.py @@ -165,7 +165,7 @@ class SoledadSignalingTestCase(BaseSoledadTest): def setUp(self): BaseSoledadTest.setUp(self) # mock signaling - soledad.events.signal = Mock() + soledad.signal = Mock() def tearDown(self): pass @@ -179,54 +179,54 @@ class SoledadSignalingTestCase(BaseSoledadTest): """ Test that a fresh soledad emits all bootstrap signals. """ - soledad.events.signal.reset_mock() + soledad.signal.reset_mock() # get a fresh instance so it emits all bootstrap signals sol = self._soledad_instance( secrets_path='alternative.json', local_db_path='alternative.u1db') # reverse call order so we can verify in the order the signals were # expected - soledad.events.signal.mock_calls.reverse() - soledad.events.signal.call_args = \ - soledad.events.signal.call_args_list[0] - soledad.events.signal.call_args_list.reverse() + soledad.signal.mock_calls.reverse() + soledad.signal.call_args = \ + soledad.signal.call_args_list[0] + soledad.signal.call_args_list.reverse() # assert signals - soledad.events.signal.assert_called_with( + soledad.signal.assert_called_with( proto.SOLEDAD_DOWNLOADING_KEYS, ADDRESS, ) - self._pop_mock_call(soledad.events.signal) - soledad.events.signal.assert_called_with( + self._pop_mock_call(soledad.signal) + soledad.signal.assert_called_with( proto.SOLEDAD_DONE_DOWNLOADING_KEYS, ADDRESS, ) - self._pop_mock_call(soledad.events.signal) - soledad.events.signal.assert_called_with( + self._pop_mock_call(soledad.signal) + soledad.signal.assert_called_with( proto.SOLEDAD_CREATING_KEYS, ADDRESS, ) - self._pop_mock_call(soledad.events.signal) - soledad.events.signal.assert_called_with( + self._pop_mock_call(soledad.signal) + soledad.signal.assert_called_with( proto.SOLEDAD_DONE_CREATING_KEYS, ADDRESS, ) - self._pop_mock_call(soledad.events.signal) - soledad.events.signal.assert_called_with( + self._pop_mock_call(soledad.signal) + soledad.signal.assert_called_with( proto.SOLEDAD_DOWNLOADING_KEYS, ADDRESS, ) - self._pop_mock_call(soledad.events.signal) - soledad.events.signal.assert_called_with( + self._pop_mock_call(soledad.signal) + soledad.signal.assert_called_with( proto.SOLEDAD_DONE_DOWNLOADING_KEYS, ADDRESS, ) - self._pop_mock_call(soledad.events.signal) - soledad.events.signal.assert_called_with( + self._pop_mock_call(soledad.signal) + soledad.signal.assert_called_with( proto.SOLEDAD_UPLOADING_KEYS, ADDRESS, ) - self._pop_mock_call(soledad.events.signal) - soledad.events.signal.assert_called_with( + self._pop_mock_call(soledad.signal) + soledad.signal.assert_called_with( proto.SOLEDAD_DONE_UPLOADING_KEYS, ADDRESS, ) @@ -235,32 +235,32 @@ class SoledadSignalingTestCase(BaseSoledadTest): """ Test that an existent soledad emits some of the bootstrap signals. """ - soledad.events.signal.reset_mock() + soledad.signal.reset_mock() # get an existent instance so it emits only some of bootstrap signals sol = self._soledad_instance() # reverse call order so we can verify in the order the signals were # expected - soledad.events.signal.mock_calls.reverse() - soledad.events.signal.call_args = \ - soledad.events.signal.call_args_list[0] - soledad.events.signal.call_args_list.reverse() + soledad.signal.mock_calls.reverse() + soledad.signal.call_args = \ + soledad.signal.call_args_list[0] + soledad.signal.call_args_list.reverse() # assert signals - soledad.events.signal.assert_called_with( + soledad.signal.assert_called_with( proto.SOLEDAD_DOWNLOADING_KEYS, ADDRESS, ) - self._pop_mock_call(soledad.events.signal) - soledad.events.signal.assert_called_with( + self._pop_mock_call(soledad.signal) + soledad.signal.assert_called_with( proto.SOLEDAD_DONE_DOWNLOADING_KEYS, ADDRESS, ) - self._pop_mock_call(soledad.events.signal) - soledad.events.signal.assert_called_with( + self._pop_mock_call(soledad.signal) + soledad.signal.assert_called_with( proto.SOLEDAD_UPLOADING_KEYS, ADDRESS, ) - self._pop_mock_call(soledad.events.signal) - soledad.events.signal.assert_called_with( + self._pop_mock_call(soledad.signal) + soledad.signal.assert_called_with( proto.SOLEDAD_DONE_UPLOADING_KEYS, ADDRESS, ) @@ -269,7 +269,7 @@ class SoledadSignalingTestCase(BaseSoledadTest): """ Test Soledad emits SOLEDAD_CREATING_KEYS signal. """ - soledad.events.signal.reset_mock() + soledad.signal.reset_mock() # get a fresh instance so it emits all bootstrap signals sol = self._soledad_instance() # mock the actual db sync so soledad does not try to connect to the @@ -278,7 +278,7 @@ class SoledadSignalingTestCase(BaseSoledadTest): # do the sync sol.sync() # assert the signal has been emitted - soledad.events.signal.assert_called_with( + soledad.signal.assert_called_with( proto.SOLEDAD_DONE_DATA_SYNC, ADDRESS, ) @@ -287,7 +287,7 @@ class SoledadSignalingTestCase(BaseSoledadTest): """ Test Soledad emits SOLEDAD_CREATING_KEYS signal. """ - soledad.events.signal.reset_mock() + soledad.signal.reset_mock() sol = self._soledad_instance() # mock the sync target LeapSyncTarget.get_sync_info = Mock(return_value=[0, 0, 0, 0, 2]) @@ -296,7 +296,7 @@ class SoledadSignalingTestCase(BaseSoledadTest): # check for new data to sync sol.need_sync('http://provider/userdb') # assert the signal has been emitted - soledad.events.signal.assert_called_with( + soledad.signal.assert_called_with( proto.SOLEDAD_NEW_DATA_TO_SYNC, ADDRESS, ) -- cgit v1.2.3