diff options
-rw-r--r-- | changes/feature_remove-strict-dependency-on-leap.common | 1 | ||||
-rw-r--r-- | changes/feature_use-raw-sqlcipher-key-with-scrypt-as-kdf | 1 | ||||
-rw-r--r-- | setup.py | 8 | ||||
-rw-r--r-- | src/leap/soledad/__init__.py | 168 | ||||
-rw-r--r-- | src/leap/soledad/backends/leap_backend.py | 26 | ||||
-rw-r--r-- | src/leap/soledad/backends/sqlcipher.py | 2 | ||||
-rw-r--r-- | src/leap/soledad/crypto.py | 64 | ||||
-rw-r--r-- | src/leap/soledad/tests/test_crypto.py | 217 | ||||
-rw-r--r-- | src/leap/soledad/tests/test_soledad.py | 74 |
9 files changed, 344 insertions, 217 deletions
diff --git a/changes/feature_remove-strict-dependency-on-leap.common b/changes/feature_remove-strict-dependency-on-leap.common new file mode 100644 index 00000000..f25dcbf3 --- /dev/null +++ b/changes/feature_remove-strict-dependency-on-leap.common @@ -0,0 +1 @@ + o Remove strict dependency on leap.common. diff --git a/changes/feature_use-raw-sqlcipher-key-with-scrypt-as-kdf b/changes/feature_use-raw-sqlcipher-key-with-scrypt-as-kdf new file mode 100644 index 00000000..385c1c84 --- /dev/null +++ b/changes/feature_use-raw-sqlcipher-key-with-scrypt-as-kdf @@ -0,0 +1 @@ + o Use scrypt to derive the key for local encryption. @@ -26,7 +26,6 @@ from setuptools import ( install_requirements = [ 'configparser', 'couchdb', - 'leap.common', 'pysqlcipher', 'simplejson', 'twisted>=12.0.0', # TODO: maybe we just want twisted-web? @@ -35,9 +34,10 @@ install_requirements = [ 'u1db', 'requests', 'six==1.1.0', - 'pysqlite', 'scrypt', 'routes', + 'pyxdg', + 'pycrypto', ] @@ -45,6 +45,7 @@ tests_requirements = [ 'mock', 'nose2', 'testscenarios', + 'leap.common', ] @@ -89,4 +90,7 @@ setup( tests_require=tests_requirements, data_files=data_files, classifiers=trove_classifiers, + extras_require={ + 'signaling': ['leap.common'], + } ) diff --git a/src/leap/soledad/__init__.py b/src/leap/soledad/__init__.py index c7f8cff3..ea3f676b 100644 --- a/src/leap/soledad/__init__.py +++ b/src/leap/soledad/__init__.py @@ -36,6 +36,7 @@ import scrypt import httplib import socket import ssl +import errno from xdg import BaseDirectory @@ -47,9 +48,92 @@ from u1db.remote.ssl_match_hostname import ( # noqa ) -from leap.common import events -from leap.common.check import leap_assert -from leap.common.files import mkdir_p +# +# Assert functions +# + +def soledad_assert(condition, message): + """ + Asserts the condition and displays the message if that's not + met. + + @param condition: condition to check + @type condition: bool + @param message: message to display if the condition isn't met + @type message: str + """ + assert condition, message + + +# we want to use leap.common.check.leap_assert in case it is available, +# because it also logs in a way other parts of leap can access log messages. +try: + from leap.common.check import leap_assert + soledad_assert = leap_assert +except ImportError: + pass + + +def soledad_assert_type(var, expectedType): + """ + Helper assert check for a variable's expected type + + @param var: variable to check + @type var: any + @param expectedType: type to check agains + @type expectedType: type + """ + soledad_assert(isinstance(var, expectedType), + "Expected type %r instead of %r" % + (expectedType, type(var))) + +try: + from leap.common.check import leap_assert_type + soledad_assert_type = leap_assert_type +except ImportError: + pass + + +# +# Signaling function +# + +# we define a fake signaling function and fake signal constants that will +# allow for logging signaling attempts in case leap.common.events is not +# available. + +def signal(signal, content=""): + logger.info("Would signal: %s - %s." % (str(signal), content)) + +SOLEDAD_CREATING_KEYS = 'Creating keys...' +SOLEDAD_DONE_CREATING_KEYS = 'Done creating keys.' +SOLEDAD_DOWNLOADING_KEYS = 'Downloading keys...' +SOLEDAD_DONE_DOWNLOADING_KEYS = 'Done downloading keys.' +SOLEDAD_UPLOADING_KEYS = 'Uploading keys...' +SOLEDAD_DONE_UPLOADING_KEYS = 'Done uploading keys.' +SOLEDAD_NEW_DATA_TO_SYNC = 'New data available.' +SOLEDAD_DONE_DATA_SYNC = 'Done data sync.' + +# we want to use leap.common.events to emits signals, if it is available. +try: + from leap.common import events + # replace fake signaling function with real one + signal = events.signal + # replace fake string signals with real signals + SOLEDAD_CREATING_KEYS = events.events_pb2.SOLEDAD_CREATING_KEYS + SOLEDAD_DONE_CREATING_KEYS = events.events_pb2.SOLEDAD_DONE_CREATING_KEYS + SOLEDAD_DOWNLOADING_KEYS = events.events_pb2.SOLEDAD_DOWNLOADING_KEYS + SOLEDAD_DONE_DOWNLOADING_KEYS = \ + events.events_pb2.SOLEDAD_DONE_DOWNLOADING_KEYS + SOLEDAD_UPLOADING_KEYS = events.events_pb2.SOLEDAD_UPLOADING_KEYS + SOLEDAD_DONE_UPLOADING_KEYS = \ + events.events_pb2.SOLEDAD_DONE_UPLOADING_KEYS + SOLEDAD_NEW_DATA_TO_SYNC = events.events_pb2.SOLEDAD_NEW_DATA_TO_SYNC + SOLEDAD_DONE_DATA_SYNC = events.events_pb2.SOLEDAD_DONE_DATA_SYNC +except ImportError: + pass + + from leap.soledad.backends import sqlcipher from leap.soledad.backends.leap_backend import ( LeapDocument, @@ -64,6 +148,10 @@ from leap.soledad.crypto import SoledadCrypto logger = logging.getLogger(name=__name__) +# +# Constants +# + SOLEDAD_CERT = None """ Path to the certificate file used to certify the SSL connection between @@ -226,7 +314,7 @@ class Soledad(object): self.DEFAULT_PREFIX, self.LOCAL_DATABASE_FILE_NAME) # initialize server_url self._server_url = server_url - leap_assert( + soledad_assert( self._server_url is not None, 'Missing URL for Soledad server.') @@ -295,23 +383,48 @@ class Soledad(object): [self._local_db_path, self._secrets_path]) for path in paths: logger.info('Creating directory: %s.' % path) - mkdir_p(path) + try: + os.makedirs(path) + except OSError as exc: + if exc.errno == errno.EEXIST and os.path.isdir(path): + pass + else: + raise def _init_db(self): """ Initialize the U1DB SQLCipher database for local storage. - The local storage passphrase is hexlified version of the last - C{LOCAL_STORAGE_SECRET_LENGTH} bytes of the storage secret. - """ + Currently, Soledad uses the default SQLCipher cipher, i.e. + 'aes-256-cbc'. We use scrypt to derive a 256-bit encryption key and + uses the 'raw PRAGMA key' format to handle the key to SQLCipher. + + The first C{self.REMOTE_STORAGE_SECRET_LENGTH} bytes of the storage + secret are used for remote storage encryption. We use the next + C{self.LOCAL_STORAGE_SECRET} bytes to derive a key for local storage. + From these bytes, the first C{self.SALT_LENGTH} are used as the salt + and the rest as the password for the scrypt hashing. + """ + # salt indexes + salt_start = self.REMOTE_STORAGE_SECRET_LENGTH + salt_end = salt_start + self.SALT_LENGTH + # password indexes + pwd_start = salt_end + pwd_end = salt_start + self.LOCAL_STORAGE_SECRET_LENGTH + # calculate the key for local encryption + secret = self._get_storage_secret() + key = scrypt.hash( + secret[pwd_start:pwd_end], # the password + secret[salt_start:salt_end], # the salt + buflen=32, # we need a key with 256 bits (32 bytes) + ) self._db = sqlcipher.open( self._local_db_path, - # storage secret is binary but sqlcipher passphrase must be string - binascii.b2a_hex( - self._get_storage_secret()[self.LOCAL_STORAGE_SECRET_LENGTH:]), + binascii.b2a_hex(key), # sqlcipher only accepts the hex version create=True, document_factory=LeapDocument, - crypto=self._crypto) + crypto=self._crypto, + raw_key=True) def close(self): """ @@ -420,8 +533,8 @@ class Soledad(object): This method emits the following signals: - * leap.common.events.events_pb2.SOLEDAD_CREATING_KEYS - * leap.common.events.events_pb2.SOLEDAD_DONE_CREATING_KEYS + * SOLEDAD_CREATING_KEYS + * SOLEDAD_DONE_CREATING_KEYS A secret has the following structure: @@ -439,7 +552,7 @@ class Soledad(object): @return: The id of the generated secret. @rtype: str """ - events.signal(events.events_pb2.SOLEDAD_CREATING_KEYS, self._uuid) + signal(SOLEDAD_CREATING_KEYS, self._uuid) # generate random secret secret = os.urandom(self.GENERATED_SECRET_LENGTH) secret_id = sha256(secret).hexdigest() @@ -460,8 +573,7 @@ class Soledad(object): str(iv), self.IV_SEPARATOR, binascii.b2a_base64(ciphertext)), } self._store_secrets() - events.signal( - events.events_pb2.SOLEDAD_DONE_CREATING_KEYS, self._uuid) + signal(SOLEDAD_DONE_CREATING_KEYS, self._uuid) return secret_id def _store_secrets(self): @@ -524,15 +636,13 @@ class Soledad(object): @return: a document with encrypted key material in its contents @rtype: LeapDocument """ - events.signal( - events.events_pb2.SOLEDAD_DOWNLOADING_KEYS, self._uuid) + signal(SOLEDAD_DOWNLOADING_KEYS, self._uuid) db = self._shared_db() if not db: logger.warning('No shared db found') return doc = db.get_doc(self._uuid_hash()) - events.signal( - events.events_pb2.SOLEDAD_DONE_DOWNLOADING_KEYS, self._uuid) + signal(SOLEDAD_DONE_DOWNLOADING_KEYS, self._uuid) return doc def _put_secrets_in_shared_db(self): @@ -544,7 +654,7 @@ class Soledad(object): Otherwise, upload keys to shared recovery database. """ - leap_assert( + soledad_assert( self._has_secret(), 'Tried to send keys to server but they don\'t exist in local ' 'storage.') @@ -555,15 +665,13 @@ class Soledad(object): # fill doc with encrypted secrets doc.content = self.export_recovery_document(include_uuid=False) # upload secrets to server - events.signal( - events.events_pb2.SOLEDAD_UPLOADING_KEYS, self._uuid) + signal(SOLEDAD_UPLOADING_KEYS, self._uuid) db = self._shared_db() if not db: logger.warning('No shared db found') return db.put_doc(doc) - events.signal( - events.events_pb2.SOLEDAD_DONE_UPLOADING_KEYS, self._uuid) + signal(SOLEDAD_DONE_UPLOADING_KEYS, self._uuid) # # Document storage, retrieval and sync. @@ -816,7 +924,7 @@ class Soledad(object): local_gen = self._db.sync( urlparse.urljoin(self.server_url, 'user-%s' % self._uuid), creds=self._creds, autocreate=True) - events.signal(events.events_pb2.SOLEDAD_DONE_DATA_SYNC, self._uuid) + signal(SOLEDAD_DONE_DATA_SYNC, self._uuid) return local_gen def need_sync(self, url): @@ -833,8 +941,7 @@ class Soledad(object): info = target.get_sync_info(self._db._get_replica_uid()) # compare source generation with target's last known source generation if self._db._get_generation() != info[4]: - events.signal( - events.events_pb2.SOLEDAD_NEW_DATA_TO_SYNC, self._uuid) + signal(SOLEDAD_NEW_DATA_TO_SYNC, self._uuid) return True return False @@ -986,3 +1093,6 @@ class VerifiedHTTPSConnection(httplib.HTTPSConnection): old__VerifiedHTTPSConnection = http_client._VerifiedHTTPSConnection http_client._VerifiedHTTPSConnection = VerifiedHTTPSConnection + + +__all__ = ['soledad_assert', 'Soledad'] diff --git a/src/leap/soledad/backends/leap_backend.py b/src/leap/soledad/backends/leap_backend.py index d92025db..4d92db37 100644 --- a/src/leap/soledad/backends/leap_backend.py +++ b/src/leap/soledad/backends/leap_backend.py @@ -33,13 +33,11 @@ from u1db.errors import BrokenSyncStream from u1db.remote.http_target import HTTPSyncTarget -from leap.common.crypto import ( +from leap.soledad import soledad_assert +from leap.soledad.crypto import ( EncryptionMethods, UnknownEncryptionMethod, - encrypt_sym, - decrypt_sym, ) -from leap.common.check import leap_assert from leap.soledad.auth import TokenBasedAuth @@ -167,9 +165,9 @@ def encrypt_doc(crypto, doc): content. @rtype: str """ - leap_assert(doc.is_tombstone() is False) + soledad_assert(doc.is_tombstone() is False) # encrypt content using AES-256 CTR mode - iv, ciphertext = encrypt_sym( + iv, ciphertext = crypto.encrypt_sym( doc.get_json(), crypto.doc_passphrase(doc.doc_id), method=EncryptionMethods.AES_256_CTR) @@ -220,12 +218,12 @@ def decrypt_doc(crypto, doc): @return: The JSON serialization of the decrypted content. @rtype: str """ - leap_assert(doc.is_tombstone() is False) - leap_assert(ENC_JSON_KEY in doc.content) - leap_assert(ENC_SCHEME_KEY in doc.content) - leap_assert(ENC_METHOD_KEY in doc.content) - leap_assert(MAC_KEY in doc.content) - leap_assert(MAC_METHOD_KEY in doc.content) + soledad_assert(doc.is_tombstone() is False) + soledad_assert(ENC_JSON_KEY in doc.content) + soledad_assert(ENC_SCHEME_KEY in doc.content) + soledad_assert(ENC_METHOD_KEY in doc.content) + soledad_assert(MAC_KEY in doc.content) + soledad_assert(MAC_METHOD_KEY in doc.content) # verify MAC ciphertext = binascii.a2b_hex( # content is stored as hex. doc.content[ENC_JSON_KEY]) @@ -241,8 +239,8 @@ def decrypt_doc(crypto, doc): if enc_scheme == EncryptionSchemes.SYMKEY: enc_method = doc.content[ENC_METHOD_KEY] if enc_method == EncryptionMethods.AES_256_CTR: - leap_assert(ENC_IV_KEY in doc.content) - plainjson = decrypt_sym( + soledad_assert(ENC_IV_KEY in doc.content) + plainjson = crypto.decrypt_sym( ciphertext, crypto.doc_passphrase(doc.doc_id), method=enc_method, diff --git a/src/leap/soledad/backends/sqlcipher.py b/src/leap/soledad/backends/sqlcipher.py index 5825b844..d6d62f21 100644 --- a/src/leap/soledad/backends/sqlcipher.py +++ b/src/leap/soledad/backends/sqlcipher.py @@ -483,7 +483,7 @@ class SQLCipherDatabase(sqlite_backend.SQLitePartialExpandDatabase): """ if not all(c in string.hexdigits for c in key): raise NotAnHexString(key) - db_handle.cursor().execute('PRAGMA key = "x\'%s"' % passphrase) + db_handle.cursor().execute('PRAGMA key = "x\'%s"' % key) @classmethod def _pragma_cipher(cls, db_handle, cipher='aes-256-cbc'): diff --git a/src/leap/soledad/crypto.py b/src/leap/soledad/crypto.py index e020eee6..be83e4a2 100644 --- a/src/leap/soledad/crypto.py +++ b/src/leap/soledad/crypto.py @@ -21,11 +21,35 @@ Cryptographic utilities for Soledad. """ +import os +import binascii import hmac import hashlib -from leap.common import crypto +from Crypto.Cipher import AES +from Crypto.Util import Counter + + +from leap.soledad import ( + soledad_assert, + soledad_assert_type, +) + + +class EncryptionMethods(object): + """ + Representation of encryption methods that can be used. + """ + + AES_256_CTR = 'aes-256-ctr' + + +class UnknownEncryptionMethod(Exception): + """ + Raised when trying to encrypt/decrypt with unknown method. + """ + pass class NoSymmetricSecret(Exception): @@ -51,7 +75,7 @@ class SoledadCrypto(object): self._soledad = soledad def encrypt_sym(self, data, key, - method=crypto.EncryptionMethods.AES_256_CTR): + method=EncryptionMethods.AES_256_CTR): """ Encrypt C{data} using a {password}. @@ -67,10 +91,24 @@ class SoledadCrypto(object): @return: A tuple with the initial value and the encrypted data. @rtype: (long, str) """ - return crypto.encrypt_sym(data, key, method) + soledad_assert_type(key, str) + + # AES-256 in CTR mode + if method == EncryptionMethods.AES_256_CTR: + soledad_assert( + len(key) == 32, # 32 x 8 = 256 bits. + 'Wrong key size: %s bits (must be 256 bits long).' % + (len(key) * 8)) + iv = os.urandom(8) + ctr = Counter.new(64, prefix=iv) + cipher = AES.new(key=key, mode=AES.MODE_CTR, counter=ctr) + return binascii.b2a_base64(iv), cipher.encrypt(data) + + # raise if method is unknown + raise UnknownEncryptionMethod('Unkwnown method: %s' % method) def decrypt_sym(self, data, key, - method=crypto.EncryptionMethods.AES_256_CTR, **kwargs): + method=EncryptionMethods.AES_256_CTR, **kwargs): """ Decrypt data using symmetric secret. @@ -88,7 +126,23 @@ class SoledadCrypto(object): @return: The decrypted data. @rtype: str """ - return crypto.decrypt_sym(data, key, method, **kwargs) + soledad_assert_type(key, str) + + # AES-256 in CTR mode + if method == EncryptionMethods.AES_256_CTR: + # assert params + soledad_assert( + len(key) == 32, # 32 x 8 = 256 bits. + 'Wrong key size: %s (must be 256 bits long).' % len(key)) + soledad_assert( + 'iv' in kwargs, + 'AES-256-CTR needs an initial value given as.') + ctr = Counter.new(64, prefix=binascii.a2b_base64(kwargs['iv'])) + cipher = AES.new(key=key, mode=AES.MODE_CTR, counter=ctr) + return cipher.decrypt(data) + + # raise if method is unknown + raise UnknownEncryptionMethod('Unkwnown method: %s' % method) def doc_passphrase(self, doc_id): """ diff --git a/src/leap/soledad/tests/test_crypto.py b/src/leap/soledad/tests/test_crypto.py index ae84dad3..59d20fa7 100644 --- a/src/leap/soledad/tests/test_crypto.py +++ b/src/leap/soledad/tests/test_crypto.py @@ -25,27 +25,20 @@ import shutil import tempfile import simplejson as json import hashlib +import binascii -from leap.soledad.backends.leap_backend import ( - LeapDocument, - encrypt_doc, - decrypt_doc, - EncryptionSchemes, - LeapSyncTarget, - ENC_JSON_KEY, - ENC_SCHEME_KEY, - MAC_METHOD_KEY, - MAC_KEY, - UnknownMacMethod, - WrongMac, -) -from leap.soledad.backends.couch import CouchDatabase +from leap.common.testing.basetest import BaseLeapTest +from Crypto import Random + + +from leap.common.testing.basetest import BaseLeapTest from leap.soledad import Soledad -from leap.soledad.crypto import SoledadCrypto -from leap.soledad.tests import BaseSoledadTest -from leap.soledad.tests.test_couch import CouchDBTestCase +from leap.soledad import crypto +from leap.soledad.backends import leap_backend +from leap.soledad.backends.couch import CouchDatabase from leap.soledad.tests import ( + BaseSoledadTest, KEY_FINGERPRINT, PRIVATE_KEY, ) @@ -54,8 +47,12 @@ from leap.soledad.tests.u1db_tests import ( nested_doc, TestCaseWithServer, ) -from leap.soledad.tests.test_leap_backend import make_leap_document_for_test -from leap.soledad.backends.couch import CouchServerState + + +# These will be used in the future for EncryptedCouchSyncTestCase +#from leap.soledad.tests.test_couch import CouchDBTestCase +#from leap.soledad.tests.test_leap_backend import make_leap_document_for_test +#from leap.soledad.backends.couch import CouchServerState class EncryptedSyncTestCase(BaseSoledadTest): @@ -68,112 +65,22 @@ class EncryptedSyncTestCase(BaseSoledadTest): Test encrypting and decrypting documents. """ simpledoc = {'key': 'val'} - doc1 = LeapDocument(doc_id='id') + doc1 = leap_backend.LeapDocument(doc_id='id') doc1.content = simpledoc # encrypt doc - doc1.set_json(encrypt_doc(self._soledad._crypto, doc1)) + doc1.set_json(leap_backend.encrypt_doc(self._soledad._crypto, doc1)) # assert content is different and includes keys self.assertNotEqual( simpledoc, doc1.content, 'incorrect document encryption') - self.assertTrue(ENC_JSON_KEY in doc1.content) - self.assertTrue(ENC_SCHEME_KEY in doc1.content) + self.assertTrue(leap_backend.ENC_JSON_KEY in doc1.content) + self.assertTrue(leap_backend.ENC_SCHEME_KEY in doc1.content) # decrypt doc - doc1.set_json(decrypt_doc(self._soledad._crypto, doc1)) + doc1.set_json(leap_backend.decrypt_doc(self._soledad._crypto, doc1)) self.assertEqual( simpledoc, doc1.content, 'incorrect document encryption') -#from leap.soledad.server import SoledadApp, SoledadAuthMiddleware -# -# -#def make_token_leap_app(test, state): -# app = SoledadApp(state) -# application = SoledadAuthMiddleware(app, prefix='/soledad/') -# return application -# -# -#def leap_sync_target(test, path): -# return LeapSyncTarget(test.getURL(path)) -# -# -#def token_leap_sync_target(test, path): -# st = leap_sync_target(test, 'soledad/' + path) -# st.set_token_credentials('any_user', 'any_token') -# return st -# -# -#class EncryptedCouchSyncTest(CouchDBTestCase, TestCaseWithServer): -# -# make_app_with_state = make_token_leap_app -# -# make_document_for_test = make_leap_document_for_test -# -# sync_target = token_leap_sync_target -# -# def make_app(self): -# # potential hook point -# self.request_state = CouchServerState(self._couch_url) -# return self.make_app_with_state(self.request_state) -# -# def _soledad_instance(self, user='leap@leap.se', prefix='', -# bootstrap=False, gnupg_home='/gnupg', -# secrets_path='/secret.gpg', -# local_db_path='/soledad.u1db'): -# return Soledad( -# user, -# '123', -# gnupg_home=self.tempdir+prefix+gnupg_home, -# secrets_path=self.tempdir+prefix+secrets_path, -# local_db_path=self.tempdir+prefix+local_db_path, -# bootstrap=bootstrap) -# -# def setUp(self): -# CouchDBTestCase.setUp(self) -# TestCaseWithServer.setUp(self) -# self.tempdir = tempfile.mkdtemp(suffix='.couch.test') -# # initialize soledad by hand so we can control keys -# self._soledad = self._soledad_instance('leap@leap.se') -# self._soledad._init_dirs() -# self._soledad._crypto = SoledadCrypto(self._soledad) -# if not self._soledad._has_get_storage_secret()(): -# self._soledad._gen_get_storage_secret()() -# self._soledad._load_get_storage_secret()() -# self._soledad._init_db() -# -# def tearDown(self): -# shutil.rmtree(self.tempdir) -# -# def test_encrypted_sym_sync(self): -# # get direct access to couchdb -# import ipdb; ipdb.set_trace() -# self._couch_url = 'http://localhost:' + str(self.wrapper.port) -# db = CouchDatabase(self._couch_url, 'testdb') -# # create and encrypt a doc to insert directly in couchdb -# doc = LeapDocument('doc-id') -# doc.set_json( -# encrypt_doc( -# self._soledad._crypto, 'doc-id', json.dumps(simple_doc))) -# db.put_doc(doc) -# # setup credentials for access to soledad server -# creds = { -# 'token': { -# 'uuid': 'leap@leap.se', -# 'token': '1234', -# } -# } -# # sync local soledad db with server -# self.assertTrue(self._soledad.get_doc('doc-id') is None) -# self.startServer() -# # TODO fix sync for test. -# #self._soledad.sync(self.getURL('soledad/testdb'), creds) -# # get and check doc -# doc = self._soledad.get_doc('doc-id') -# # TODO: fix below. -# #self.assertTrue(doc is not None) -# #self.assertTrue(doc.content == simple_doc) - - class RecoveryDocumentTestCase(BaseSoledadTest): def test_export_recovery_document_raw(self): @@ -199,7 +106,7 @@ class RecoveryDocumentTestCase(BaseSoledadTest): 'Failed settinng secret for symmetric encryption.') -class CryptoMethodsTestCase(BaseSoledadTest): +class SoledadSecretsTestCase(BaseSoledadTest): def test__gen_secret(self): # instantiate and save secret_id @@ -254,33 +161,85 @@ class MacAuthTestCase(BaseSoledadTest): Trying to decrypt a document with wrong MAC should raise. """ simpledoc = {'key': 'val'} - doc = LeapDocument(doc_id='id') + doc = leap_backend.LeapDocument(doc_id='id') doc.content = simpledoc # encrypt doc - doc.set_json(encrypt_doc(self._soledad._crypto, doc)) - self.assertTrue(MAC_KEY in doc.content) - self.assertTrue(MAC_METHOD_KEY in doc.content) + doc.set_json(leap_backend.encrypt_doc(self._soledad._crypto, doc)) + self.assertTrue(leap_backend.MAC_KEY in doc.content) + self.assertTrue(leap_backend.MAC_METHOD_KEY in doc.content) # mess with MAC - doc.content[MAC_KEY] = '1234567890ABCDEF' + doc.content[leap_backend.MAC_KEY] = '1234567890ABCDEF' # try to decrypt doc self.assertRaises( - WrongMac, - decrypt_doc, self._soledad._crypto, doc) + leap_backend.WrongMac, + leap_backend.decrypt_doc, self._soledad._crypto, doc) def test_decrypt_with_unknown_mac_method_raises(self): """ Trying to decrypt a document with unknown MAC method should raise. """ simpledoc = {'key': 'val'} - doc = LeapDocument(doc_id='id') + doc = leap_backend.LeapDocument(doc_id='id') doc.content = simpledoc # encrypt doc - doc.set_json(encrypt_doc(self._soledad._crypto, doc)) - self.assertTrue(MAC_KEY in doc.content) - self.assertTrue(MAC_METHOD_KEY in doc.content) + doc.set_json(leap_backend.encrypt_doc(self._soledad._crypto, doc)) + self.assertTrue(leap_backend.MAC_KEY in doc.content) + self.assertTrue(leap_backend.MAC_METHOD_KEY in doc.content) # mess with MAC method - doc.content[MAC_METHOD_KEY] = 'mymac' + doc.content[leap_backend.MAC_METHOD_KEY] = 'mymac' # try to decrypt doc self.assertRaises( - UnknownMacMethod, - decrypt_doc, self._soledad._crypto, doc) + leap_backend.UnknownMacMethod, + leap_backend.decrypt_doc, self._soledad._crypto, doc) + + +class SoledadCryptoTestCase(BaseSoledadTest): + + def test_encrypt_decrypt_sym(self): + # generate 256-bit key + key = Random.new().read(32) + iv, cyphertext = self._soledad._crypto.encrypt_sym( + 'data', key, + method=crypto.EncryptionMethods.AES_256_CTR) + self.assertTrue(cyphertext is not None) + self.assertTrue(cyphertext != '') + self.assertTrue(cyphertext != 'data') + plaintext = self._soledad._crypto.decrypt_sym( + cyphertext, key, iv=iv, + method=crypto.EncryptionMethods.AES_256_CTR) + self.assertEqual('data', plaintext) + + def test_decrypt_with_wrong_iv_fails(self): + key = Random.new().read(32) + iv, cyphertext = self._soledad._crypto.encrypt_sym( + 'data', key, + method=crypto.EncryptionMethods.AES_256_CTR) + self.assertTrue(cyphertext is not None) + self.assertTrue(cyphertext != '') + self.assertTrue(cyphertext != 'data') + # get a different iv by changing the first byte + rawiv = binascii.a2b_base64(iv) + wrongiv = rawiv + while wrongiv == rawiv: + wrongiv = os.urandom(1) + rawiv[1:] + plaintext = self._soledad._crypto.decrypt_sym( + cyphertext, key, iv=binascii.b2a_base64(wrongiv), + method=crypto.EncryptionMethods.AES_256_CTR) + self.assertNotEqual('data', plaintext) + + def test_decrypt_with_wrong_key_fails(self): + key = Random.new().read(32) + iv, cyphertext = self._soledad._crypto.encrypt_sym( + 'data', key, + method=crypto.EncryptionMethods.AES_256_CTR) + self.assertTrue(cyphertext is not None) + self.assertTrue(cyphertext != '') + self.assertTrue(cyphertext != 'data') + wrongkey = Random.new().read(32) # 256-bits key + # ensure keys are different in case we are extremely lucky + while wrongkey == key: + wrongkey = Random.new().read(32) + plaintext = self._soledad._crypto.decrypt_sym( + cyphertext, wrongkey, iv=iv, + method=crypto.EncryptionMethods.AES_256_CTR) + self.assertNotEqual('data', plaintext) diff --git a/src/leap/soledad/tests/test_soledad.py b/src/leap/soledad/tests/test_soledad.py index 09711f19..21d36771 100644 --- a/src/leap/soledad/tests/test_soledad.py +++ b/src/leap/soledad/tests/test_soledad.py @@ -165,7 +165,7 @@ class SoledadSignalingTestCase(BaseSoledadTest): def setUp(self): BaseSoledadTest.setUp(self) # mock signaling - soledad.events.signal = Mock() + soledad.signal = Mock() def tearDown(self): pass @@ -179,54 +179,54 @@ class SoledadSignalingTestCase(BaseSoledadTest): """ Test that a fresh soledad emits all bootstrap signals. """ - soledad.events.signal.reset_mock() + soledad.signal.reset_mock() # get a fresh instance so it emits all bootstrap signals sol = self._soledad_instance( secrets_path='alternative.json', local_db_path='alternative.u1db') # reverse call order so we can verify in the order the signals were # expected - soledad.events.signal.mock_calls.reverse() - soledad.events.signal.call_args = \ - soledad.events.signal.call_args_list[0] - soledad.events.signal.call_args_list.reverse() + soledad.signal.mock_calls.reverse() + soledad.signal.call_args = \ + soledad.signal.call_args_list[0] + soledad.signal.call_args_list.reverse() # assert signals - soledad.events.signal.assert_called_with( + soledad.signal.assert_called_with( proto.SOLEDAD_DOWNLOADING_KEYS, ADDRESS, ) - self._pop_mock_call(soledad.events.signal) - soledad.events.signal.assert_called_with( + self._pop_mock_call(soledad.signal) + soledad.signal.assert_called_with( proto.SOLEDAD_DONE_DOWNLOADING_KEYS, ADDRESS, ) - self._pop_mock_call(soledad.events.signal) - soledad.events.signal.assert_called_with( + self._pop_mock_call(soledad.signal) + soledad.signal.assert_called_with( proto.SOLEDAD_CREATING_KEYS, ADDRESS, ) - self._pop_mock_call(soledad.events.signal) - soledad.events.signal.assert_called_with( + self._pop_mock_call(soledad.signal) + soledad.signal.assert_called_with( proto.SOLEDAD_DONE_CREATING_KEYS, ADDRESS, ) - self._pop_mock_call(soledad.events.signal) - soledad.events.signal.assert_called_with( + self._pop_mock_call(soledad.signal) + soledad.signal.assert_called_with( proto.SOLEDAD_DOWNLOADING_KEYS, ADDRESS, ) - self._pop_mock_call(soledad.events.signal) - soledad.events.signal.assert_called_with( + self._pop_mock_call(soledad.signal) + soledad.signal.assert_called_with( proto.SOLEDAD_DONE_DOWNLOADING_KEYS, ADDRESS, ) - self._pop_mock_call(soledad.events.signal) - soledad.events.signal.assert_called_with( + self._pop_mock_call(soledad.signal) + soledad.signal.assert_called_with( proto.SOLEDAD_UPLOADING_KEYS, ADDRESS, ) - self._pop_mock_call(soledad.events.signal) - soledad.events.signal.assert_called_with( + self._pop_mock_call(soledad.signal) + soledad.signal.assert_called_with( proto.SOLEDAD_DONE_UPLOADING_KEYS, ADDRESS, ) @@ -235,32 +235,32 @@ class SoledadSignalingTestCase(BaseSoledadTest): """ Test that an existent soledad emits some of the bootstrap signals. """ - soledad.events.signal.reset_mock() + soledad.signal.reset_mock() # get an existent instance so it emits only some of bootstrap signals sol = self._soledad_instance() # reverse call order so we can verify in the order the signals were # expected - soledad.events.signal.mock_calls.reverse() - soledad.events.signal.call_args = \ - soledad.events.signal.call_args_list[0] - soledad.events.signal.call_args_list.reverse() + soledad.signal.mock_calls.reverse() + soledad.signal.call_args = \ + soledad.signal.call_args_list[0] + soledad.signal.call_args_list.reverse() # assert signals - soledad.events.signal.assert_called_with( + soledad.signal.assert_called_with( proto.SOLEDAD_DOWNLOADING_KEYS, ADDRESS, ) - self._pop_mock_call(soledad.events.signal) - soledad.events.signal.assert_called_with( + self._pop_mock_call(soledad.signal) + soledad.signal.assert_called_with( proto.SOLEDAD_DONE_DOWNLOADING_KEYS, ADDRESS, ) - self._pop_mock_call(soledad.events.signal) - soledad.events.signal.assert_called_with( + self._pop_mock_call(soledad.signal) + soledad.signal.assert_called_with( proto.SOLEDAD_UPLOADING_KEYS, ADDRESS, ) - self._pop_mock_call(soledad.events.signal) - soledad.events.signal.assert_called_with( + self._pop_mock_call(soledad.signal) + soledad.signal.assert_called_with( proto.SOLEDAD_DONE_UPLOADING_KEYS, ADDRESS, ) @@ -269,7 +269,7 @@ class SoledadSignalingTestCase(BaseSoledadTest): """ Test Soledad emits SOLEDAD_CREATING_KEYS signal. """ - soledad.events.signal.reset_mock() + soledad.signal.reset_mock() # get a fresh instance so it emits all bootstrap signals sol = self._soledad_instance() # mock the actual db sync so soledad does not try to connect to the @@ -278,7 +278,7 @@ class SoledadSignalingTestCase(BaseSoledadTest): # do the sync sol.sync() # assert the signal has been emitted - soledad.events.signal.assert_called_with( + soledad.signal.assert_called_with( proto.SOLEDAD_DONE_DATA_SYNC, ADDRESS, ) @@ -287,7 +287,7 @@ class SoledadSignalingTestCase(BaseSoledadTest): """ Test Soledad emits SOLEDAD_CREATING_KEYS signal. """ - soledad.events.signal.reset_mock() + soledad.signal.reset_mock() sol = self._soledad_instance() # mock the sync target LeapSyncTarget.get_sync_info = Mock(return_value=[0, 0, 0, 0, 2]) @@ -296,7 +296,7 @@ class SoledadSignalingTestCase(BaseSoledadTest): # check for new data to sync sol.need_sync('http://provider/userdb') # assert the signal has been emitted - soledad.events.signal.assert_called_with( + soledad.signal.assert_called_with( proto.SOLEDAD_NEW_DATA_TO_SYNC, ADDRESS, ) |