summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--changes/feature_remove-strict-dependency-on-leap.common1
-rw-r--r--changes/feature_use-raw-sqlcipher-key-with-scrypt-as-kdf1
-rw-r--r--setup.py8
-rw-r--r--src/leap/soledad/__init__.py168
-rw-r--r--src/leap/soledad/backends/leap_backend.py26
-rw-r--r--src/leap/soledad/backends/sqlcipher.py2
-rw-r--r--src/leap/soledad/crypto.py64
-rw-r--r--src/leap/soledad/tests/test_crypto.py217
-rw-r--r--src/leap/soledad/tests/test_soledad.py74
9 files changed, 344 insertions, 217 deletions
diff --git a/changes/feature_remove-strict-dependency-on-leap.common b/changes/feature_remove-strict-dependency-on-leap.common
new file mode 100644
index 00000000..f25dcbf3
--- /dev/null
+++ b/changes/feature_remove-strict-dependency-on-leap.common
@@ -0,0 +1 @@
+ o Remove strict dependency on leap.common.
diff --git a/changes/feature_use-raw-sqlcipher-key-with-scrypt-as-kdf b/changes/feature_use-raw-sqlcipher-key-with-scrypt-as-kdf
new file mode 100644
index 00000000..385c1c84
--- /dev/null
+++ b/changes/feature_use-raw-sqlcipher-key-with-scrypt-as-kdf
@@ -0,0 +1 @@
+ o Use scrypt to derive the key for local encryption.
diff --git a/setup.py b/setup.py
index b1c0b9db..b989f48a 100644
--- a/setup.py
+++ b/setup.py
@@ -26,7 +26,6 @@ from setuptools import (
install_requirements = [
'configparser',
'couchdb',
- 'leap.common',
'pysqlcipher',
'simplejson',
'twisted>=12.0.0', # TODO: maybe we just want twisted-web?
@@ -35,9 +34,10 @@ install_requirements = [
'u1db',
'requests',
'six==1.1.0',
- 'pysqlite',
'scrypt',
'routes',
+ 'pyxdg',
+ 'pycrypto',
]
@@ -45,6 +45,7 @@ tests_requirements = [
'mock',
'nose2',
'testscenarios',
+ 'leap.common',
]
@@ -89,4 +90,7 @@ setup(
tests_require=tests_requirements,
data_files=data_files,
classifiers=trove_classifiers,
+ extras_require={
+ 'signaling': ['leap.common'],
+ }
)
diff --git a/src/leap/soledad/__init__.py b/src/leap/soledad/__init__.py
index c7f8cff3..ea3f676b 100644
--- a/src/leap/soledad/__init__.py
+++ b/src/leap/soledad/__init__.py
@@ -36,6 +36,7 @@ import scrypt
import httplib
import socket
import ssl
+import errno
from xdg import BaseDirectory
@@ -47,9 +48,92 @@ from u1db.remote.ssl_match_hostname import ( # noqa
)
-from leap.common import events
-from leap.common.check import leap_assert
-from leap.common.files import mkdir_p
+#
+# Assert functions
+#
+
+def soledad_assert(condition, message):
+ """
+ Asserts the condition and displays the message if that's not
+ met.
+
+ @param condition: condition to check
+ @type condition: bool
+ @param message: message to display if the condition isn't met
+ @type message: str
+ """
+ assert condition, message
+
+
+# we want to use leap.common.check.leap_assert in case it is available,
+# because it also logs in a way other parts of leap can access log messages.
+try:
+ from leap.common.check import leap_assert
+ soledad_assert = leap_assert
+except ImportError:
+ pass
+
+
+def soledad_assert_type(var, expectedType):
+ """
+ Helper assert check for a variable's expected type
+
+ @param var: variable to check
+ @type var: any
+ @param expectedType: type to check agains
+ @type expectedType: type
+ """
+ soledad_assert(isinstance(var, expectedType),
+ "Expected type %r instead of %r" %
+ (expectedType, type(var)))
+
+try:
+ from leap.common.check import leap_assert_type
+ soledad_assert_type = leap_assert_type
+except ImportError:
+ pass
+
+
+#
+# Signaling function
+#
+
+# we define a fake signaling function and fake signal constants that will
+# allow for logging signaling attempts in case leap.common.events is not
+# available.
+
+def signal(signal, content=""):
+ logger.info("Would signal: %s - %s." % (str(signal), content))
+
+SOLEDAD_CREATING_KEYS = 'Creating keys...'
+SOLEDAD_DONE_CREATING_KEYS = 'Done creating keys.'
+SOLEDAD_DOWNLOADING_KEYS = 'Downloading keys...'
+SOLEDAD_DONE_DOWNLOADING_KEYS = 'Done downloading keys.'
+SOLEDAD_UPLOADING_KEYS = 'Uploading keys...'
+SOLEDAD_DONE_UPLOADING_KEYS = 'Done uploading keys.'
+SOLEDAD_NEW_DATA_TO_SYNC = 'New data available.'
+SOLEDAD_DONE_DATA_SYNC = 'Done data sync.'
+
+# we want to use leap.common.events to emits signals, if it is available.
+try:
+ from leap.common import events
+ # replace fake signaling function with real one
+ signal = events.signal
+ # replace fake string signals with real signals
+ SOLEDAD_CREATING_KEYS = events.events_pb2.SOLEDAD_CREATING_KEYS
+ SOLEDAD_DONE_CREATING_KEYS = events.events_pb2.SOLEDAD_DONE_CREATING_KEYS
+ SOLEDAD_DOWNLOADING_KEYS = events.events_pb2.SOLEDAD_DOWNLOADING_KEYS
+ SOLEDAD_DONE_DOWNLOADING_KEYS = \
+ events.events_pb2.SOLEDAD_DONE_DOWNLOADING_KEYS
+ SOLEDAD_UPLOADING_KEYS = events.events_pb2.SOLEDAD_UPLOADING_KEYS
+ SOLEDAD_DONE_UPLOADING_KEYS = \
+ events.events_pb2.SOLEDAD_DONE_UPLOADING_KEYS
+ SOLEDAD_NEW_DATA_TO_SYNC = events.events_pb2.SOLEDAD_NEW_DATA_TO_SYNC
+ SOLEDAD_DONE_DATA_SYNC = events.events_pb2.SOLEDAD_DONE_DATA_SYNC
+except ImportError:
+ pass
+
+
from leap.soledad.backends import sqlcipher
from leap.soledad.backends.leap_backend import (
LeapDocument,
@@ -64,6 +148,10 @@ from leap.soledad.crypto import SoledadCrypto
logger = logging.getLogger(name=__name__)
+#
+# Constants
+#
+
SOLEDAD_CERT = None
"""
Path to the certificate file used to certify the SSL connection between
@@ -226,7 +314,7 @@ class Soledad(object):
self.DEFAULT_PREFIX, self.LOCAL_DATABASE_FILE_NAME)
# initialize server_url
self._server_url = server_url
- leap_assert(
+ soledad_assert(
self._server_url is not None,
'Missing URL for Soledad server.')
@@ -295,23 +383,48 @@ class Soledad(object):
[self._local_db_path, self._secrets_path])
for path in paths:
logger.info('Creating directory: %s.' % path)
- mkdir_p(path)
+ try:
+ os.makedirs(path)
+ except OSError as exc:
+ if exc.errno == errno.EEXIST and os.path.isdir(path):
+ pass
+ else:
+ raise
def _init_db(self):
"""
Initialize the U1DB SQLCipher database for local storage.
- The local storage passphrase is hexlified version of the last
- C{LOCAL_STORAGE_SECRET_LENGTH} bytes of the storage secret.
- """
+ Currently, Soledad uses the default SQLCipher cipher, i.e.
+ 'aes-256-cbc'. We use scrypt to derive a 256-bit encryption key and
+ uses the 'raw PRAGMA key' format to handle the key to SQLCipher.
+
+ The first C{self.REMOTE_STORAGE_SECRET_LENGTH} bytes of the storage
+ secret are used for remote storage encryption. We use the next
+ C{self.LOCAL_STORAGE_SECRET} bytes to derive a key for local storage.
+ From these bytes, the first C{self.SALT_LENGTH} are used as the salt
+ and the rest as the password for the scrypt hashing.
+ """
+ # salt indexes
+ salt_start = self.REMOTE_STORAGE_SECRET_LENGTH
+ salt_end = salt_start + self.SALT_LENGTH
+ # password indexes
+ pwd_start = salt_end
+ pwd_end = salt_start + self.LOCAL_STORAGE_SECRET_LENGTH
+ # calculate the key for local encryption
+ secret = self._get_storage_secret()
+ key = scrypt.hash(
+ secret[pwd_start:pwd_end], # the password
+ secret[salt_start:salt_end], # the salt
+ buflen=32, # we need a key with 256 bits (32 bytes)
+ )
self._db = sqlcipher.open(
self._local_db_path,
- # storage secret is binary but sqlcipher passphrase must be string
- binascii.b2a_hex(
- self._get_storage_secret()[self.LOCAL_STORAGE_SECRET_LENGTH:]),
+ binascii.b2a_hex(key), # sqlcipher only accepts the hex version
create=True,
document_factory=LeapDocument,
- crypto=self._crypto)
+ crypto=self._crypto,
+ raw_key=True)
def close(self):
"""
@@ -420,8 +533,8 @@ class Soledad(object):
This method emits the following signals:
- * leap.common.events.events_pb2.SOLEDAD_CREATING_KEYS
- * leap.common.events.events_pb2.SOLEDAD_DONE_CREATING_KEYS
+ * SOLEDAD_CREATING_KEYS
+ * SOLEDAD_DONE_CREATING_KEYS
A secret has the following structure:
@@ -439,7 +552,7 @@ class Soledad(object):
@return: The id of the generated secret.
@rtype: str
"""
- events.signal(events.events_pb2.SOLEDAD_CREATING_KEYS, self._uuid)
+ signal(SOLEDAD_CREATING_KEYS, self._uuid)
# generate random secret
secret = os.urandom(self.GENERATED_SECRET_LENGTH)
secret_id = sha256(secret).hexdigest()
@@ -460,8 +573,7 @@ class Soledad(object):
str(iv), self.IV_SEPARATOR, binascii.b2a_base64(ciphertext)),
}
self._store_secrets()
- events.signal(
- events.events_pb2.SOLEDAD_DONE_CREATING_KEYS, self._uuid)
+ signal(SOLEDAD_DONE_CREATING_KEYS, self._uuid)
return secret_id
def _store_secrets(self):
@@ -524,15 +636,13 @@ class Soledad(object):
@return: a document with encrypted key material in its contents
@rtype: LeapDocument
"""
- events.signal(
- events.events_pb2.SOLEDAD_DOWNLOADING_KEYS, self._uuid)
+ signal(SOLEDAD_DOWNLOADING_KEYS, self._uuid)
db = self._shared_db()
if not db:
logger.warning('No shared db found')
return
doc = db.get_doc(self._uuid_hash())
- events.signal(
- events.events_pb2.SOLEDAD_DONE_DOWNLOADING_KEYS, self._uuid)
+ signal(SOLEDAD_DONE_DOWNLOADING_KEYS, self._uuid)
return doc
def _put_secrets_in_shared_db(self):
@@ -544,7 +654,7 @@ class Soledad(object):
Otherwise, upload keys to shared recovery database.
"""
- leap_assert(
+ soledad_assert(
self._has_secret(),
'Tried to send keys to server but they don\'t exist in local '
'storage.')
@@ -555,15 +665,13 @@ class Soledad(object):
# fill doc with encrypted secrets
doc.content = self.export_recovery_document(include_uuid=False)
# upload secrets to server
- events.signal(
- events.events_pb2.SOLEDAD_UPLOADING_KEYS, self._uuid)
+ signal(SOLEDAD_UPLOADING_KEYS, self._uuid)
db = self._shared_db()
if not db:
logger.warning('No shared db found')
return
db.put_doc(doc)
- events.signal(
- events.events_pb2.SOLEDAD_DONE_UPLOADING_KEYS, self._uuid)
+ signal(SOLEDAD_DONE_UPLOADING_KEYS, self._uuid)
#
# Document storage, retrieval and sync.
@@ -816,7 +924,7 @@ class Soledad(object):
local_gen = self._db.sync(
urlparse.urljoin(self.server_url, 'user-%s' % self._uuid),
creds=self._creds, autocreate=True)
- events.signal(events.events_pb2.SOLEDAD_DONE_DATA_SYNC, self._uuid)
+ signal(SOLEDAD_DONE_DATA_SYNC, self._uuid)
return local_gen
def need_sync(self, url):
@@ -833,8 +941,7 @@ class Soledad(object):
info = target.get_sync_info(self._db._get_replica_uid())
# compare source generation with target's last known source generation
if self._db._get_generation() != info[4]:
- events.signal(
- events.events_pb2.SOLEDAD_NEW_DATA_TO_SYNC, self._uuid)
+ signal(SOLEDAD_NEW_DATA_TO_SYNC, self._uuid)
return True
return False
@@ -986,3 +1093,6 @@ class VerifiedHTTPSConnection(httplib.HTTPSConnection):
old__VerifiedHTTPSConnection = http_client._VerifiedHTTPSConnection
http_client._VerifiedHTTPSConnection = VerifiedHTTPSConnection
+
+
+__all__ = ['soledad_assert', 'Soledad']
diff --git a/src/leap/soledad/backends/leap_backend.py b/src/leap/soledad/backends/leap_backend.py
index d92025db..4d92db37 100644
--- a/src/leap/soledad/backends/leap_backend.py
+++ b/src/leap/soledad/backends/leap_backend.py
@@ -33,13 +33,11 @@ from u1db.errors import BrokenSyncStream
from u1db.remote.http_target import HTTPSyncTarget
-from leap.common.crypto import (
+from leap.soledad import soledad_assert
+from leap.soledad.crypto import (
EncryptionMethods,
UnknownEncryptionMethod,
- encrypt_sym,
- decrypt_sym,
)
-from leap.common.check import leap_assert
from leap.soledad.auth import TokenBasedAuth
@@ -167,9 +165,9 @@ def encrypt_doc(crypto, doc):
content.
@rtype: str
"""
- leap_assert(doc.is_tombstone() is False)
+ soledad_assert(doc.is_tombstone() is False)
# encrypt content using AES-256 CTR mode
- iv, ciphertext = encrypt_sym(
+ iv, ciphertext = crypto.encrypt_sym(
doc.get_json(),
crypto.doc_passphrase(doc.doc_id),
method=EncryptionMethods.AES_256_CTR)
@@ -220,12 +218,12 @@ def decrypt_doc(crypto, doc):
@return: The JSON serialization of the decrypted content.
@rtype: str
"""
- leap_assert(doc.is_tombstone() is False)
- leap_assert(ENC_JSON_KEY in doc.content)
- leap_assert(ENC_SCHEME_KEY in doc.content)
- leap_assert(ENC_METHOD_KEY in doc.content)
- leap_assert(MAC_KEY in doc.content)
- leap_assert(MAC_METHOD_KEY in doc.content)
+ soledad_assert(doc.is_tombstone() is False)
+ soledad_assert(ENC_JSON_KEY in doc.content)
+ soledad_assert(ENC_SCHEME_KEY in doc.content)
+ soledad_assert(ENC_METHOD_KEY in doc.content)
+ soledad_assert(MAC_KEY in doc.content)
+ soledad_assert(MAC_METHOD_KEY in doc.content)
# verify MAC
ciphertext = binascii.a2b_hex( # content is stored as hex.
doc.content[ENC_JSON_KEY])
@@ -241,8 +239,8 @@ def decrypt_doc(crypto, doc):
if enc_scheme == EncryptionSchemes.SYMKEY:
enc_method = doc.content[ENC_METHOD_KEY]
if enc_method == EncryptionMethods.AES_256_CTR:
- leap_assert(ENC_IV_KEY in doc.content)
- plainjson = decrypt_sym(
+ soledad_assert(ENC_IV_KEY in doc.content)
+ plainjson = crypto.decrypt_sym(
ciphertext,
crypto.doc_passphrase(doc.doc_id),
method=enc_method,
diff --git a/src/leap/soledad/backends/sqlcipher.py b/src/leap/soledad/backends/sqlcipher.py
index 5825b844..d6d62f21 100644
--- a/src/leap/soledad/backends/sqlcipher.py
+++ b/src/leap/soledad/backends/sqlcipher.py
@@ -483,7 +483,7 @@ class SQLCipherDatabase(sqlite_backend.SQLitePartialExpandDatabase):
"""
if not all(c in string.hexdigits for c in key):
raise NotAnHexString(key)
- db_handle.cursor().execute('PRAGMA key = "x\'%s"' % passphrase)
+ db_handle.cursor().execute('PRAGMA key = "x\'%s"' % key)
@classmethod
def _pragma_cipher(cls, db_handle, cipher='aes-256-cbc'):
diff --git a/src/leap/soledad/crypto.py b/src/leap/soledad/crypto.py
index e020eee6..be83e4a2 100644
--- a/src/leap/soledad/crypto.py
+++ b/src/leap/soledad/crypto.py
@@ -21,11 +21,35 @@ Cryptographic utilities for Soledad.
"""
+import os
+import binascii
import hmac
import hashlib
-from leap.common import crypto
+from Crypto.Cipher import AES
+from Crypto.Util import Counter
+
+
+from leap.soledad import (
+ soledad_assert,
+ soledad_assert_type,
+)
+
+
+class EncryptionMethods(object):
+ """
+ Representation of encryption methods that can be used.
+ """
+
+ AES_256_CTR = 'aes-256-ctr'
+
+
+class UnknownEncryptionMethod(Exception):
+ """
+ Raised when trying to encrypt/decrypt with unknown method.
+ """
+ pass
class NoSymmetricSecret(Exception):
@@ -51,7 +75,7 @@ class SoledadCrypto(object):
self._soledad = soledad
def encrypt_sym(self, data, key,
- method=crypto.EncryptionMethods.AES_256_CTR):
+ method=EncryptionMethods.AES_256_CTR):
"""
Encrypt C{data} using a {password}.
@@ -67,10 +91,24 @@ class SoledadCrypto(object):
@return: A tuple with the initial value and the encrypted data.
@rtype: (long, str)
"""
- return crypto.encrypt_sym(data, key, method)
+ soledad_assert_type(key, str)
+
+ # AES-256 in CTR mode
+ if method == EncryptionMethods.AES_256_CTR:
+ soledad_assert(
+ len(key) == 32, # 32 x 8 = 256 bits.
+ 'Wrong key size: %s bits (must be 256 bits long).' %
+ (len(key) * 8))
+ iv = os.urandom(8)
+ ctr = Counter.new(64, prefix=iv)
+ cipher = AES.new(key=key, mode=AES.MODE_CTR, counter=ctr)
+ return binascii.b2a_base64(iv), cipher.encrypt(data)
+
+ # raise if method is unknown
+ raise UnknownEncryptionMethod('Unkwnown method: %s' % method)
def decrypt_sym(self, data, key,
- method=crypto.EncryptionMethods.AES_256_CTR, **kwargs):
+ method=EncryptionMethods.AES_256_CTR, **kwargs):
"""
Decrypt data using symmetric secret.
@@ -88,7 +126,23 @@ class SoledadCrypto(object):
@return: The decrypted data.
@rtype: str
"""
- return crypto.decrypt_sym(data, key, method, **kwargs)
+ soledad_assert_type(key, str)
+
+ # AES-256 in CTR mode
+ if method == EncryptionMethods.AES_256_CTR:
+ # assert params
+ soledad_assert(
+ len(key) == 32, # 32 x 8 = 256 bits.
+ 'Wrong key size: %s (must be 256 bits long).' % len(key))
+ soledad_assert(
+ 'iv' in kwargs,
+ 'AES-256-CTR needs an initial value given as.')
+ ctr = Counter.new(64, prefix=binascii.a2b_base64(kwargs['iv']))
+ cipher = AES.new(key=key, mode=AES.MODE_CTR, counter=ctr)
+ return cipher.decrypt(data)
+
+ # raise if method is unknown
+ raise UnknownEncryptionMethod('Unkwnown method: %s' % method)
def doc_passphrase(self, doc_id):
"""
diff --git a/src/leap/soledad/tests/test_crypto.py b/src/leap/soledad/tests/test_crypto.py
index ae84dad3..59d20fa7 100644
--- a/src/leap/soledad/tests/test_crypto.py
+++ b/src/leap/soledad/tests/test_crypto.py
@@ -25,27 +25,20 @@ import shutil
import tempfile
import simplejson as json
import hashlib
+import binascii
-from leap.soledad.backends.leap_backend import (
- LeapDocument,
- encrypt_doc,
- decrypt_doc,
- EncryptionSchemes,
- LeapSyncTarget,
- ENC_JSON_KEY,
- ENC_SCHEME_KEY,
- MAC_METHOD_KEY,
- MAC_KEY,
- UnknownMacMethod,
- WrongMac,
-)
-from leap.soledad.backends.couch import CouchDatabase
+from leap.common.testing.basetest import BaseLeapTest
+from Crypto import Random
+
+
+from leap.common.testing.basetest import BaseLeapTest
from leap.soledad import Soledad
-from leap.soledad.crypto import SoledadCrypto
-from leap.soledad.tests import BaseSoledadTest
-from leap.soledad.tests.test_couch import CouchDBTestCase
+from leap.soledad import crypto
+from leap.soledad.backends import leap_backend
+from leap.soledad.backends.couch import CouchDatabase
from leap.soledad.tests import (
+ BaseSoledadTest,
KEY_FINGERPRINT,
PRIVATE_KEY,
)
@@ -54,8 +47,12 @@ from leap.soledad.tests.u1db_tests import (
nested_doc,
TestCaseWithServer,
)
-from leap.soledad.tests.test_leap_backend import make_leap_document_for_test
-from leap.soledad.backends.couch import CouchServerState
+
+
+# These will be used in the future for EncryptedCouchSyncTestCase
+#from leap.soledad.tests.test_couch import CouchDBTestCase
+#from leap.soledad.tests.test_leap_backend import make_leap_document_for_test
+#from leap.soledad.backends.couch import CouchServerState
class EncryptedSyncTestCase(BaseSoledadTest):
@@ -68,112 +65,22 @@ class EncryptedSyncTestCase(BaseSoledadTest):
Test encrypting and decrypting documents.
"""
simpledoc = {'key': 'val'}
- doc1 = LeapDocument(doc_id='id')
+ doc1 = leap_backend.LeapDocument(doc_id='id')
doc1.content = simpledoc
# encrypt doc
- doc1.set_json(encrypt_doc(self._soledad._crypto, doc1))
+ doc1.set_json(leap_backend.encrypt_doc(self._soledad._crypto, doc1))
# assert content is different and includes keys
self.assertNotEqual(
simpledoc, doc1.content,
'incorrect document encryption')
- self.assertTrue(ENC_JSON_KEY in doc1.content)
- self.assertTrue(ENC_SCHEME_KEY in doc1.content)
+ self.assertTrue(leap_backend.ENC_JSON_KEY in doc1.content)
+ self.assertTrue(leap_backend.ENC_SCHEME_KEY in doc1.content)
# decrypt doc
- doc1.set_json(decrypt_doc(self._soledad._crypto, doc1))
+ doc1.set_json(leap_backend.decrypt_doc(self._soledad._crypto, doc1))
self.assertEqual(
simpledoc, doc1.content, 'incorrect document encryption')
-#from leap.soledad.server import SoledadApp, SoledadAuthMiddleware
-#
-#
-#def make_token_leap_app(test, state):
-# app = SoledadApp(state)
-# application = SoledadAuthMiddleware(app, prefix='/soledad/')
-# return application
-#
-#
-#def leap_sync_target(test, path):
-# return LeapSyncTarget(test.getURL(path))
-#
-#
-#def token_leap_sync_target(test, path):
-# st = leap_sync_target(test, 'soledad/' + path)
-# st.set_token_credentials('any_user', 'any_token')
-# return st
-#
-#
-#class EncryptedCouchSyncTest(CouchDBTestCase, TestCaseWithServer):
-#
-# make_app_with_state = make_token_leap_app
-#
-# make_document_for_test = make_leap_document_for_test
-#
-# sync_target = token_leap_sync_target
-#
-# def make_app(self):
-# # potential hook point
-# self.request_state = CouchServerState(self._couch_url)
-# return self.make_app_with_state(self.request_state)
-#
-# def _soledad_instance(self, user='leap@leap.se', prefix='',
-# bootstrap=False, gnupg_home='/gnupg',
-# secrets_path='/secret.gpg',
-# local_db_path='/soledad.u1db'):
-# return Soledad(
-# user,
-# '123',
-# gnupg_home=self.tempdir+prefix+gnupg_home,
-# secrets_path=self.tempdir+prefix+secrets_path,
-# local_db_path=self.tempdir+prefix+local_db_path,
-# bootstrap=bootstrap)
-#
-# def setUp(self):
-# CouchDBTestCase.setUp(self)
-# TestCaseWithServer.setUp(self)
-# self.tempdir = tempfile.mkdtemp(suffix='.couch.test')
-# # initialize soledad by hand so we can control keys
-# self._soledad = self._soledad_instance('leap@leap.se')
-# self._soledad._init_dirs()
-# self._soledad._crypto = SoledadCrypto(self._soledad)
-# if not self._soledad._has_get_storage_secret()():
-# self._soledad._gen_get_storage_secret()()
-# self._soledad._load_get_storage_secret()()
-# self._soledad._init_db()
-#
-# def tearDown(self):
-# shutil.rmtree(self.tempdir)
-#
-# def test_encrypted_sym_sync(self):
-# # get direct access to couchdb
-# import ipdb; ipdb.set_trace()
-# self._couch_url = 'http://localhost:' + str(self.wrapper.port)
-# db = CouchDatabase(self._couch_url, 'testdb')
-# # create and encrypt a doc to insert directly in couchdb
-# doc = LeapDocument('doc-id')
-# doc.set_json(
-# encrypt_doc(
-# self._soledad._crypto, 'doc-id', json.dumps(simple_doc)))
-# db.put_doc(doc)
-# # setup credentials for access to soledad server
-# creds = {
-# 'token': {
-# 'uuid': 'leap@leap.se',
-# 'token': '1234',
-# }
-# }
-# # sync local soledad db with server
-# self.assertTrue(self._soledad.get_doc('doc-id') is None)
-# self.startServer()
-# # TODO fix sync for test.
-# #self._soledad.sync(self.getURL('soledad/testdb'), creds)
-# # get and check doc
-# doc = self._soledad.get_doc('doc-id')
-# # TODO: fix below.
-# #self.assertTrue(doc is not None)
-# #self.assertTrue(doc.content == simple_doc)
-
-
class RecoveryDocumentTestCase(BaseSoledadTest):
def test_export_recovery_document_raw(self):
@@ -199,7 +106,7 @@ class RecoveryDocumentTestCase(BaseSoledadTest):
'Failed settinng secret for symmetric encryption.')
-class CryptoMethodsTestCase(BaseSoledadTest):
+class SoledadSecretsTestCase(BaseSoledadTest):
def test__gen_secret(self):
# instantiate and save secret_id
@@ -254,33 +161,85 @@ class MacAuthTestCase(BaseSoledadTest):
Trying to decrypt a document with wrong MAC should raise.
"""
simpledoc = {'key': 'val'}
- doc = LeapDocument(doc_id='id')
+ doc = leap_backend.LeapDocument(doc_id='id')
doc.content = simpledoc
# encrypt doc
- doc.set_json(encrypt_doc(self._soledad._crypto, doc))
- self.assertTrue(MAC_KEY in doc.content)
- self.assertTrue(MAC_METHOD_KEY in doc.content)
+ doc.set_json(leap_backend.encrypt_doc(self._soledad._crypto, doc))
+ self.assertTrue(leap_backend.MAC_KEY in doc.content)
+ self.assertTrue(leap_backend.MAC_METHOD_KEY in doc.content)
# mess with MAC
- doc.content[MAC_KEY] = '1234567890ABCDEF'
+ doc.content[leap_backend.MAC_KEY] = '1234567890ABCDEF'
# try to decrypt doc
self.assertRaises(
- WrongMac,
- decrypt_doc, self._soledad._crypto, doc)
+ leap_backend.WrongMac,
+ leap_backend.decrypt_doc, self._soledad._crypto, doc)
def test_decrypt_with_unknown_mac_method_raises(self):
"""
Trying to decrypt a document with unknown MAC method should raise.
"""
simpledoc = {'key': 'val'}
- doc = LeapDocument(doc_id='id')
+ doc = leap_backend.LeapDocument(doc_id='id')
doc.content = simpledoc
# encrypt doc
- doc.set_json(encrypt_doc(self._soledad._crypto, doc))
- self.assertTrue(MAC_KEY in doc.content)
- self.assertTrue(MAC_METHOD_KEY in doc.content)
+ doc.set_json(leap_backend.encrypt_doc(self._soledad._crypto, doc))
+ self.assertTrue(leap_backend.MAC_KEY in doc.content)
+ self.assertTrue(leap_backend.MAC_METHOD_KEY in doc.content)
# mess with MAC method
- doc.content[MAC_METHOD_KEY] = 'mymac'
+ doc.content[leap_backend.MAC_METHOD_KEY] = 'mymac'
# try to decrypt doc
self.assertRaises(
- UnknownMacMethod,
- decrypt_doc, self._soledad._crypto, doc)
+ leap_backend.UnknownMacMethod,
+ leap_backend.decrypt_doc, self._soledad._crypto, doc)
+
+
+class SoledadCryptoTestCase(BaseSoledadTest):
+
+ def test_encrypt_decrypt_sym(self):
+ # generate 256-bit key
+ key = Random.new().read(32)
+ iv, cyphertext = self._soledad._crypto.encrypt_sym(
+ 'data', key,
+ method=crypto.EncryptionMethods.AES_256_CTR)
+ self.assertTrue(cyphertext is not None)
+ self.assertTrue(cyphertext != '')
+ self.assertTrue(cyphertext != 'data')
+ plaintext = self._soledad._crypto.decrypt_sym(
+ cyphertext, key, iv=iv,
+ method=crypto.EncryptionMethods.AES_256_CTR)
+ self.assertEqual('data', plaintext)
+
+ def test_decrypt_with_wrong_iv_fails(self):
+ key = Random.new().read(32)
+ iv, cyphertext = self._soledad._crypto.encrypt_sym(
+ 'data', key,
+ method=crypto.EncryptionMethods.AES_256_CTR)
+ self.assertTrue(cyphertext is not None)
+ self.assertTrue(cyphertext != '')
+ self.assertTrue(cyphertext != 'data')
+ # get a different iv by changing the first byte
+ rawiv = binascii.a2b_base64(iv)
+ wrongiv = rawiv
+ while wrongiv == rawiv:
+ wrongiv = os.urandom(1) + rawiv[1:]
+ plaintext = self._soledad._crypto.decrypt_sym(
+ cyphertext, key, iv=binascii.b2a_base64(wrongiv),
+ method=crypto.EncryptionMethods.AES_256_CTR)
+ self.assertNotEqual('data', plaintext)
+
+ def test_decrypt_with_wrong_key_fails(self):
+ key = Random.new().read(32)
+ iv, cyphertext = self._soledad._crypto.encrypt_sym(
+ 'data', key,
+ method=crypto.EncryptionMethods.AES_256_CTR)
+ self.assertTrue(cyphertext is not None)
+ self.assertTrue(cyphertext != '')
+ self.assertTrue(cyphertext != 'data')
+ wrongkey = Random.new().read(32) # 256-bits key
+ # ensure keys are different in case we are extremely lucky
+ while wrongkey == key:
+ wrongkey = Random.new().read(32)
+ plaintext = self._soledad._crypto.decrypt_sym(
+ cyphertext, wrongkey, iv=iv,
+ method=crypto.EncryptionMethods.AES_256_CTR)
+ self.assertNotEqual('data', plaintext)
diff --git a/src/leap/soledad/tests/test_soledad.py b/src/leap/soledad/tests/test_soledad.py
index 09711f19..21d36771 100644
--- a/src/leap/soledad/tests/test_soledad.py
+++ b/src/leap/soledad/tests/test_soledad.py
@@ -165,7 +165,7 @@ class SoledadSignalingTestCase(BaseSoledadTest):
def setUp(self):
BaseSoledadTest.setUp(self)
# mock signaling
- soledad.events.signal = Mock()
+ soledad.signal = Mock()
def tearDown(self):
pass
@@ -179,54 +179,54 @@ class SoledadSignalingTestCase(BaseSoledadTest):
"""
Test that a fresh soledad emits all bootstrap signals.
"""
- soledad.events.signal.reset_mock()
+ soledad.signal.reset_mock()
# get a fresh instance so it emits all bootstrap signals
sol = self._soledad_instance(
secrets_path='alternative.json',
local_db_path='alternative.u1db')
# reverse call order so we can verify in the order the signals were
# expected
- soledad.events.signal.mock_calls.reverse()
- soledad.events.signal.call_args = \
- soledad.events.signal.call_args_list[0]
- soledad.events.signal.call_args_list.reverse()
+ soledad.signal.mock_calls.reverse()
+ soledad.signal.call_args = \
+ soledad.signal.call_args_list[0]
+ soledad.signal.call_args_list.reverse()
# assert signals
- soledad.events.signal.assert_called_with(
+ soledad.signal.assert_called_with(
proto.SOLEDAD_DOWNLOADING_KEYS,
ADDRESS,
)
- self._pop_mock_call(soledad.events.signal)
- soledad.events.signal.assert_called_with(
+ self._pop_mock_call(soledad.signal)
+ soledad.signal.assert_called_with(
proto.SOLEDAD_DONE_DOWNLOADING_KEYS,
ADDRESS,
)
- self._pop_mock_call(soledad.events.signal)
- soledad.events.signal.assert_called_with(
+ self._pop_mock_call(soledad.signal)
+ soledad.signal.assert_called_with(
proto.SOLEDAD_CREATING_KEYS,
ADDRESS,
)
- self._pop_mock_call(soledad.events.signal)
- soledad.events.signal.assert_called_with(
+ self._pop_mock_call(soledad.signal)
+ soledad.signal.assert_called_with(
proto.SOLEDAD_DONE_CREATING_KEYS,
ADDRESS,
)
- self._pop_mock_call(soledad.events.signal)
- soledad.events.signal.assert_called_with(
+ self._pop_mock_call(soledad.signal)
+ soledad.signal.assert_called_with(
proto.SOLEDAD_DOWNLOADING_KEYS,
ADDRESS,
)
- self._pop_mock_call(soledad.events.signal)
- soledad.events.signal.assert_called_with(
+ self._pop_mock_call(soledad.signal)
+ soledad.signal.assert_called_with(
proto.SOLEDAD_DONE_DOWNLOADING_KEYS,
ADDRESS,
)
- self._pop_mock_call(soledad.events.signal)
- soledad.events.signal.assert_called_with(
+ self._pop_mock_call(soledad.signal)
+ soledad.signal.assert_called_with(
proto.SOLEDAD_UPLOADING_KEYS,
ADDRESS,
)
- self._pop_mock_call(soledad.events.signal)
- soledad.events.signal.assert_called_with(
+ self._pop_mock_call(soledad.signal)
+ soledad.signal.assert_called_with(
proto.SOLEDAD_DONE_UPLOADING_KEYS,
ADDRESS,
)
@@ -235,32 +235,32 @@ class SoledadSignalingTestCase(BaseSoledadTest):
"""
Test that an existent soledad emits some of the bootstrap signals.
"""
- soledad.events.signal.reset_mock()
+ soledad.signal.reset_mock()
# get an existent instance so it emits only some of bootstrap signals
sol = self._soledad_instance()
# reverse call order so we can verify in the order the signals were
# expected
- soledad.events.signal.mock_calls.reverse()
- soledad.events.signal.call_args = \
- soledad.events.signal.call_args_list[0]
- soledad.events.signal.call_args_list.reverse()
+ soledad.signal.mock_calls.reverse()
+ soledad.signal.call_args = \
+ soledad.signal.call_args_list[0]
+ soledad.signal.call_args_list.reverse()
# assert signals
- soledad.events.signal.assert_called_with(
+ soledad.signal.assert_called_with(
proto.SOLEDAD_DOWNLOADING_KEYS,
ADDRESS,
)
- self._pop_mock_call(soledad.events.signal)
- soledad.events.signal.assert_called_with(
+ self._pop_mock_call(soledad.signal)
+ soledad.signal.assert_called_with(
proto.SOLEDAD_DONE_DOWNLOADING_KEYS,
ADDRESS,
)
- self._pop_mock_call(soledad.events.signal)
- soledad.events.signal.assert_called_with(
+ self._pop_mock_call(soledad.signal)
+ soledad.signal.assert_called_with(
proto.SOLEDAD_UPLOADING_KEYS,
ADDRESS,
)
- self._pop_mock_call(soledad.events.signal)
- soledad.events.signal.assert_called_with(
+ self._pop_mock_call(soledad.signal)
+ soledad.signal.assert_called_with(
proto.SOLEDAD_DONE_UPLOADING_KEYS,
ADDRESS,
)
@@ -269,7 +269,7 @@ class SoledadSignalingTestCase(BaseSoledadTest):
"""
Test Soledad emits SOLEDAD_CREATING_KEYS signal.
"""
- soledad.events.signal.reset_mock()
+ soledad.signal.reset_mock()
# get a fresh instance so it emits all bootstrap signals
sol = self._soledad_instance()
# mock the actual db sync so soledad does not try to connect to the
@@ -278,7 +278,7 @@ class SoledadSignalingTestCase(BaseSoledadTest):
# do the sync
sol.sync()
# assert the signal has been emitted
- soledad.events.signal.assert_called_with(
+ soledad.signal.assert_called_with(
proto.SOLEDAD_DONE_DATA_SYNC,
ADDRESS,
)
@@ -287,7 +287,7 @@ class SoledadSignalingTestCase(BaseSoledadTest):
"""
Test Soledad emits SOLEDAD_CREATING_KEYS signal.
"""
- soledad.events.signal.reset_mock()
+ soledad.signal.reset_mock()
sol = self._soledad_instance()
# mock the sync target
LeapSyncTarget.get_sync_info = Mock(return_value=[0, 0, 0, 0, 2])
@@ -296,7 +296,7 @@ class SoledadSignalingTestCase(BaseSoledadTest):
# check for new data to sync
sol.need_sync('http://provider/userdb')
# assert the signal has been emitted
- soledad.events.signal.assert_called_with(
+ soledad.signal.assert_called_with(
proto.SOLEDAD_NEW_DATA_TO_SYNC,
ADDRESS,
)