summaryrefslogtreecommitdiff
path: root/src/leap/soledad
diff options
context:
space:
mode:
authordrebs <drebs@leap.se>2013-05-12 19:16:16 -0300
committerdrebs <drebs@leap.se>2013-05-13 15:49:33 -0300
commit3cefb8291c69ff4354f5b4cfde1d92117aac3d26 (patch)
tree24541105fd58f15a7977bdac5583d99ef8248315 /src/leap/soledad
parent028cf283c1dae800b2ea70a05201cb9c696be973 (diff)
Encrypt storage secret using scrypt KDF.
* Change format of storage. * Refactor and modify export/import recovery documents logic. * Change storage secret length to 512. * Encrypt the storage document with derived function. * Refactor property name inside crypto submodule. * Add docstrings for new methods. * Fix wrong method call import_recovery_document. * Fix base64 encoding and secret/kdf length info. * Add missing header. * Add missing fields to comment. Closes 2475 and 2423.
Diffstat (limited to 'src/leap/soledad')
-rw-r--r--src/leap/soledad/__init__.py384
-rw-r--r--src/leap/soledad/crypto.py18
-rw-r--r--src/leap/soledad/tests/__init__.py11
-rw-r--r--src/leap/soledad/tests/test_crypto.py59
-rw-r--r--src/leap/soledad/tests/test_soledad.py28
5 files changed, 298 insertions, 202 deletions
diff --git a/src/leap/soledad/__init__.py b/src/leap/soledad/__init__.py
index 2a7b945e..407e28f0 100644
--- a/src/leap/soledad/__init__.py
+++ b/src/leap/soledad/__init__.py
@@ -31,10 +31,8 @@ import string
import binascii
import logging
import urlparse
-try:
- import simplejson as json
-except ImportError:
- import json # noqa
+import simplejson as json
+import scrypt
from xdg import BaseDirectory
@@ -44,6 +42,7 @@ from hashlib import sha256
from leap.common import events
from leap.common.check import leap_assert
from leap.common.files import mkdir_p
+from leap.common.keymanager.errors import DecryptionFailed
from leap.soledad.backends import sqlcipher
from leap.soledad.backends.leap_backend import (
LeapDocument,
@@ -82,9 +81,26 @@ class NotADirectory(Exception):
#
+# Helper functions
+#
+
+def base64_encode(data):
+ """
+ Return the base64 encoded version of C{data}.
+
+ @return: The base64 encoded version of C{data}.
+ @rtype: str
+ """
+ # binascii.b2a_base64 returns a new line character in the end of the
+ # string, so we strip that here.
+ return binascii.b2a_base64(data)[:-1]
+
+
+#
# Soledad: local encrypted storage and remote encrypted sync.
#
+
class Soledad(object):
"""
Soledad provides encrypted data storage and sync.
@@ -118,15 +134,32 @@ class Soledad(object):
finished synchronizing with remote replica.
"""
- SECRET_LENGTH = 50
+ STORAGE_SECRETS_FILE_NAME = "soledad.json"
+ """
+ The name of the file where the storage secrets will be stored.
+ """
+
+ STORAGE_SECRET_LENGTH = 512
"""
The length of the secret used for symmetric encryption.
"""
- SYMKEY_KEY = '_symkey'
- UUID_KEY = '_uuid'
+ SALT_LENGTH = 64
"""
- Key used to access symmetric keys in recovery documents.
+ The length of the salt used to derive the key for the storage secret
+ encryption.
+ """
+
+ UUID_KEY = 'uuid'
+ STORAGE_SECRETS_KEY = 'storage_secrets'
+ SECRET_KEY = 'secret'
+ CIPHER_KEY = 'cipher'
+ LENGTH_KEY = 'length'
+ KDF_KEY = 'kdf'
+ KDF_SALT_KEY = 'kdf_salt'
+ KDF_LENGTH_KEY = 'kdf_length'
+ """
+ Keys used to access storage secrets in recovery documents.
"""
DEFAULT_PREFIX = os.path.join(
@@ -136,7 +169,7 @@ class Soledad(object):
Prefix for default values for path.
"""
- def __init__(self, uuid, passphrase, secret_path, local_db_path,
+ def __init__(self, uuid, passphrase, secrets_path, local_db_path,
server_url, cert_file, auth_token=None, bootstrap=True):
"""
Initialize configuration, cryptographic keys and dbs.
@@ -146,9 +179,9 @@ class Soledad(object):
@param passphrase: The passphrase for locking and unlocking encryption
secrets for disk storage.
@type passphrase: str
- @param secret_path: Path for storing encrypted key used for
+ @param secrets_path: Path for storing encrypted key used for
symmetric encryption.
- @type secret_path: str
+ @type secrets_path: str
@param local_db_path: Path for local encrypted storage db.
@type local_db_path: str
@param server_url: URL for Soledad server. This is used either to sync
@@ -164,26 +197,29 @@ class Soledad(object):
for testing purposes but can be useful for initialization control.
@type bootstrap: bool
"""
- # TODO: allow for fingerprint enforcing.
+ # get config params
self._uuid = uuid
self._passphrase = passphrase
- self._init_config(secret_path, local_db_path, server_url)
+ # init crypto variables
+ self._secrets = {}
+ self._secret_id = None
+ # init config (possibly with default values)
+ self._init_config(secrets_path, local_db_path, server_url)
self._set_token(auth_token)
-
+ # configure SSL certificate
shared_db.SOLEDAD_CERT = cert_file
-
if bootstrap:
self._bootstrap()
- def _init_config(self, secret_path, local_db_path, server_url):
+ def _init_config(self, secrets_path, local_db_path, server_url):
"""
Initialize configuration using default values for missing params.
"""
- # initialize secret_path
- self._secret_path = secret_path
- if self._secret_path is None:
- self._secret_path = os.path.join(
- self.DEFAULT_PREFIX, 'secret.gpg')
+ # initialize secrets_path
+ self._secrets_path = secrets_path
+ if self._secrets_path is None:
+ self._secrets_path = os.path.join(
+ self.DEFAULT_PREFIX, self.STORAGE_SECRETS_FILE_NAME)
# initialize local_db_path
self._local_db_path = local_db_path
if self._local_db_path is None:
@@ -229,13 +265,11 @@ class Soledad(object):
self._init_dirs()
self._crypto = SoledadCrypto(self)
# Stage 1 - Keys generation/loading
- if self._has_keys():
- self._load_keys()
- else:
+ if not self._has_secret():
logger.info(
'Trying to fetch cryptographic secrets from shared recovery '
'database...')
- doc = self._fetch_keys_from_shared_db()
+ doc = self._get_secrets_from_shared_db()
if not doc:
logger.info(
'No cryptographic secrets found, creating new secrets...')
@@ -244,12 +278,11 @@ class Soledad(object):
logger.info(
'Found cryptographic secrets in shared recovery '
'database.')
- self._set_symkey(
- self._crypto.decrypt_sym(
- doc.content[self.SYMKEY_KEY],
- passphrase=self._passphrase))
+ self.import_recovery_document(
+ doc.content[self.SECRET_KEY],
+ passphrase=self._passphrase)
# Stage 2 - Keys synchronization
- self._assert_keys_in_shared_db()
+ self._put_secrets_in_shared_db()
# Stage 3 - Local database initialization
self._init_db()
@@ -261,7 +294,7 @@ class Soledad(object):
"""
paths = map(
lambda x: os.path.dirname(x),
- [self.local_db_path, self.secret_path])
+ [self.local_db_path, self._secrets_path])
for path in paths:
logger.info('Creating directory: %s.' % path)
mkdir_p(path)
@@ -272,9 +305,8 @@ class Soledad(object):
"""
events.signal(events.events_pb2.SOLEDAD_CREATING_KEYS, self._uuid)
# load/generate secret
- if not self._has_symkey():
- self._gen_symkey()
- self._load_symkey()
+ if not self._has_secret():
+ self._set_secret_id(self._gen_secret())
events.signal(
events.events_pb2.SOLEDAD_DONE_CREATING_KEYS, self._uuid)
@@ -287,7 +319,7 @@ class Soledad(object):
# one for symmetric encryption.
self._db = sqlcipher.open(
self.local_db_path,
- self._symkey,
+ self._get_storage_secret(),
create=True,
document_factory=LeapDocument,
crypto=self._crypto)
@@ -311,97 +343,172 @@ class Soledad(object):
# Management of secret for symmetric encryption.
#
- def _has_symkey(self):
+ def _get_storage_secret(self):
"""
- Verify if a key for symmetric encryption exists in a local encrypted
- file.
+ Return the base64 encoding of the storage secret.
- @return: whether this soledad instance has a key for symmetric
- encryption
- @rtype: bool
- """
- # does the file exist in disk?
- if not os.path.isfile(self.secret_path):
- return False
- # is it symfetrically encrypted?
- content = None
- with open(self.secret_path, 'r') as f:
- content = f.read()
- if not self._crypto.is_encrypted_sym(content):
- raise DocumentNotEncrypted(
- "File %s is not encrypted!" % self.secret_path)
- # can we decrypt it?
- plaintext = self._crypto.decrypt_sym(
- content, passphrase=self._passphrase)
- return plaintext != ''
+ Storage secret is first base64 encoded and then encrypted before being
+ stored. This message only decrypts the stored secret and returns the
+ base64 encoded version.
- def _load_symkey(self):
- """
- Load secret for symmetric encryption from local encrypted file.
+ @return: The base64 encoding of the storage secret.
+ @rtype: str
"""
- if not self._has_symkey():
- raise KeyDoesNotExist("Tried to load key for symmetric "
- "encryption but it does not exist on disk.")
- with open(self.secret_path) as f:
- self._symkey = \
- self._crypto.decrypt_sym(
- f.read(), passphrase=self._passphrase)
- self._crypto.symkey = self._symkey
+ key = base64_encode(
+ scrypt.hash(
+ self._passphrase,
+ # the salt is also stored as base64 encoded string, so make
+ # direct use of this encoded version to derive the encryption
+ # key.
+ self._secrets[self._secret_id][self.KDF_SALT_KEY]))
+ return self._crypto.decrypt_sym(
+ self._secrets[self._secret_id][self.SECRET_KEY],
+ passphrase=key)
- def _gen_symkey(self):
+ def _set_secret_id(self, secret_id):
"""
- Generate a secret for symmetric encryption and store in a local
- encrypted file.
- """
- symkey = binascii.b2a_base64(os.urandom(self.SECRET_LENGTH))
- self._set_symkey(symkey)
+ Define the id of the storage secret to be used.
- def _set_symkey(self, symkey):
+ This method will also replace the secret in the crypto object.
"""
- Define and store the key to be used for symmetric encryption.
+ self._secret_id = secret_id
+ self._crypto.secret = self._get_storage_secret()
- @param symkey: the symmetric key
- @type symkey: str
+ def _load_secret(self):
"""
- if self._has_symkey():
- raise KeyAlreadyExists("Tried to set the value of the key for "
- "symmetric encryption but it already "
- "exists on disk.")
- self._symkey = symkey
- self._crypto.symkey = self._symkey
- self._store_symkey()
+ Load symmetric storage secret from local file.
- def _store_symkey(self):
- ciphertext = self._crypto.encrypt_sym(
- self._symkey, self._passphrase)
- with open(self.secret_path, 'w') as f:
- f.write(ciphertext)
+ The content of the file has the following format:
- #
- # General crypto utility methods.
- #
+ {
+ "storage_secrets": {
+ "<secret_id>": {
+ 'kdf': 'scrypt',
+ 'kdf_salt': '<b64 repr of salt>'
+ 'kdf_length': <key length>
+ "cipher": "aes256",
+ "length": <secret length>,
+ "secret": "<encrypted storage_secret 1>",
+ }
+ }
+ }
+
+ @raise leap.common.keymanager.errors.DecryptionFailed: Raised if could
+ not decrypt the secret with the given passphrase.
+ """
+ # does the file exist in disk?
+ if not os.path.isfile(self._secrets_path):
+ raise IOError('File does not exist: %s' % self._secrets_path)
+ # read storage secrets from file
+ content = None
+ with open(self._secrets_path, 'r') as f:
+ content = json.loads(f.read())
+ self._secrets = content[self.STORAGE_SECRETS_KEY]
+ # choose first secret if no secret_id was given
+ if self._secret_id == None:
+ self._set_secret_id(self._secrets.items()[0][0])
+ # check secret is isncrypted
+ if not self._crypto.is_encrypted_sym(
+ self._secrets[self._secret_id][self.SECRET_KEY]):
+ raise DocumentNotEncrypted(
+ "File %s is not encrypted!" % self._secrets_path)
- def _has_keys(self):
+ def _has_secret(self):
"""
- Return whether this instance has the key for symmetric encryption.
+ Return whether there is a storage secret available for use or not.
- @return: whether keys are available for this instance
+ @return: Whether there's a storage secret for symmetric encryption.
@rtype: bool
"""
- return self._has_symkey()
+ # if the secret is already loaded, return true
+ if self._secret_id is not None and self._secret_id in self._secrets:
+ return True
+ # try to load from disk
+ try:
+ self._load_secret()
+ return True
+ except DecryptionFailed:
+ logger.error('Could not decrypt storage secret.')
+ except IOError, e:
+ logger.error('IOError: %s' % str(e))
+ return False
- def _load_keys(self):
- """
- Load the key for symmetric encryption from persistent storage.
+ def _gen_secret(self):
"""
- logger.info('Loading cryptographic secrets from local storage...')
- self._load_symkey()
+ Generate a secret for symmetric encryption and store in a local
+ encrypted file.
+
+ This method emits the following signals:
+
+ * leap.common.events.events_pb2.SOLEDAD_CREATING_KEYS
+ * leap.common.events.events_pb2.SOLEDAD_DONE_CREATING_KEYS
- def _gen_keys(self):
+ A secret has the following structure:
+
+ {
+ '<secret_id>': {
+ 'kdf': 'scrypt',
+ 'kdf_salt': '<b64 repr of salt>'
+ 'kdf_length': <key length>
+ 'cipher': 'aes256',
+ 'length': <secret length>,
+ 'secret': '<encrypted b64 repr of storage_secret>',
+ }
+ }
+
+ @return: The id of the generated secret.
+ @rtype: str
"""
- Generate a key for symmetric encryption.
+ events.signal(events.events_pb2.SOLEDAD_CREATING_KEYS, self._uuid)
+ # generate random secret
+ secret = os.urandom(self.STORAGE_SECRET_LENGTH)
+ secret_id = sha256(secret).hexdigest()
+ # generate random salt
+ base64_salt = base64_encode(os.urandom(self.SALT_LENGTH))
+ key = scrypt.hash(self._passphrase, base64_salt)
+ self._secrets[secret_id] = {
+ # leap.common.keymanager.openpgp uses AES256 for symmetric
+ # encryption.
+ self.KDF_KEY: 'scrypt', # TODO: remove hard coded kdf
+ self.KDF_SALT_KEY: base64_salt,
+ self.KDF_LENGTH_KEY: len(key),
+ self.CIPHER_KEY: 'aes256', # TODO: remove hard coded cipher
+ self.LENGTH_KEY: len(secret),
+ self.SECRET_KEY: self._crypto.encrypt_sym(
+ base64_encode(secret),
+ base64_encode(key)),
+ }
+ self._store_secrets()
+ return secret_id
+
+ def _store_secrets(self):
+ """
+ Store a secret in C{Soledad.STORAGE_SECRETS_FILE_PATH}.
+
+ The contents of the stored file have the following format:
+
+ {
+ 'storage_secrets': {
+ '<secret_id>': {
+ 'kdf': 'scrypt',
+ 'kdf_salt': '<salt>'
+ 'kdf_length': <len>
+ 'cipher': 'aes256',
+ 'length': 512,
+ 'secret': '<encrypted storage_secret 1>',
+ }
+ }
+ }
"""
- self._gen_symkey()
+ data = {
+ self.STORAGE_SECRETS_KEY: self._secrets,
+ }
+ with open(self._secrets_path, 'w') as f:
+ f.write(json.dumps(data))
+
+ #
+ # General crypto utility methods.
+ #
def _uuid_hash(self):
"""
@@ -422,7 +529,7 @@ class Soledad(object):
False, # TODO: eliminate need to create db here.
creds=self._creds)
- def _fetch_keys_from_shared_db(self):
+ def _get_secrets_from_shared_db(self):
"""
Retrieve the document with encrypted key material from the shared
database.
@@ -437,7 +544,7 @@ class Soledad(object):
events.events_pb2.SOLEDAD_DONE_DOWNLOADING_KEYS, self._uuid)
return doc
- def _assert_keys_in_shared_db(self):
+ def _put_secrets_in_shared_db(self):
"""
Assert local keys are the same as shared db's ones.
@@ -447,29 +554,24 @@ class Soledad(object):
"""
leap_assert(
- self._has_keys(),
+ self._has_secret(),
'Tried to send keys to server but they don\'t exist in local '
'storage.')
- doc = self._fetch_keys_from_shared_db()
- if doc:
- remote_symkey = self._crypto.decrypt_sym(
- doc.content[self.SYMKEY_KEY],
- passphrase=self._passphrase)
- leap_assert(
- remote_symkey == self._symkey,
- 'Local and remote symmetric secrets differ!')
- else:
- events.signal(
- events.events_pb2.SOLEDAD_UPLOADING_KEYS, self._uuid)
- content = {
- self.SYMKEY_KEY: self._crypto.encrypt_sym(
- self._symkey, self._passphrase),
- }
+ # try to get secrets doc from server, otherwise create it
+ doc = self._get_secrets_from_shared_db()
+ if doc is None:
doc = LeapDocument(doc_id=self._uuid_hash())
- doc.content = content
- self._shared_db().put_doc(doc)
- events.signal(
- events.events_pb2.SOLEDAD_DONE_UPLOADING_KEYS, self._uuid)
+ # fill doc with encrypted secrets
+ doc.content = {
+ self.SECRET_KEY: self.export_recovery_document(
+ self._passphrase)
+ }
+ # upload secrets to server
+ events.signal(
+ events.events_pb2.SOLEDAD_UPLOADING_KEYS, self._uuid)
+ self._shared_db().put_doc(doc)
+ events.signal(
+ events.events_pb2.SOLEDAD_DONE_UPLOADING_KEYS, self._uuid)
#
# Document storage, retrieval and sync.
@@ -778,9 +880,7 @@ class Soledad(object):
token = property(_get_token, _set_token, doc='The authentication Token.')
#
- # Recovery document export and import methods.
- #
-
+ # Recovery document export and import methodsecret
def export_recovery_document(self, passphrase=None):
"""
Exports username, provider, private key and key for symmetric
@@ -809,7 +909,7 @@ class Soledad(object):
"""
data = json.dumps({
self.UUID_KEY: self._uuid,
- self.SYMKEY_KEY: self._symkey,
+ self.STORAGE_SECRETS_KEY: self._secrets,
})
if passphrase:
data = self._crypto.encrypt_sym(data, passphrase)
@@ -825,21 +925,21 @@ class Soledad(object):
@param passphrase: an optional passphrase for decrypting the document
@type passphrase: str
"""
- if self._has_keys():
- raise KeyAlreadyExists("You tried to import a recovery document "
- "but secret keys are already present.")
if passphrase and not self._crypto.is_encrypted_sym(data):
raise DocumentNotEncrypted("You provided a password but the "
"recovery document is not encrypted.")
if passphrase:
data = self._crypto.decrypt_sym(data, passphrase=passphrase)
data = json.loads(data)
+ # include new secrets in our secret pool.
+ for secret_id, secret_data in data[self.STORAGE_SECRETS_KEY].items():
+ if secret_id not in self._secrets:
+ self._secrets[secret_id] = secret_data
+ self._store_secrets()
+ # set uuid
self._uuid = data[self.UUID_KEY]
- self._symkey = data[self.SYMKEY_KEY]
- self._crypto.symkey = self._symkey
- self._store_symkey()
- # TODO: make this work well with bootstrap.
- self._load_keys()
+ # choose first secret to use
+ self._set_secret_id(self._secrets.items()[0][0])
#
# Setters/getters
@@ -850,11 +950,11 @@ class Soledad(object):
uuid = property(_get_uuid, doc='The user uuid.')
- def _get_secret_path(self):
- return self._secret_path
+ def _get_secrets_path(self):
+ return self._secrets_path
- secret_path = property(
- _get_secret_path,
+ secrets_path = property(
+ _get_secrets_path,
doc='The path for the file containing the encrypted symmetric secret.')
def _get_local_db_path(self):
diff --git a/src/leap/soledad/crypto.py b/src/leap/soledad/crypto.py
index 9d2227bd..605380ec 100644
--- a/src/leap/soledad/crypto.py
+++ b/src/leap/soledad/crypto.py
@@ -47,7 +47,7 @@ class SoledadCrypto(object):
"""
self._soledad = soledad
self._pgp = openpgp.OpenPGPScheme(self._soledad)
- self._symkey = None
+ self._secret = None
def encrypt_sym(self, data, passphrase):
"""
@@ -112,19 +112,19 @@ class SoledadCrypto(object):
@rtype: str
@raise NoSymmetricSecret: if no symmetric secret was supplied.
"""
- if self._symkey is None:
+ if self._secret is None:
raise NoSymmetricSecret()
- return sha256('%s%s' % (self._symkey, suffix)).hexdigest()
+ return sha256('%s%s' % (self._secret, suffix)).hexdigest()
#
- # symkey setters/getters
+ # secret setters/getters
#
- def _get_symkey(self):
- return self._symkey
+ def _get_secret(self):
+ return self._secret
- def _set_symkey(self, symkey):
- self._symkey = symkey
+ def _set_secret(self, secret):
+ self._secret = secret
- symkey = property(_get_symkey, _set_symkey,
+ secret = property(_get_secret, _set_secret,
doc='The key used for symmetric encryption')
diff --git a/src/leap/soledad/tests/__init__.py b/src/leap/soledad/tests/__init__.py
index d947f5b3..5cd23e45 100644
--- a/src/leap/soledad/tests/__init__.py
+++ b/src/leap/soledad/tests/__init__.py
@@ -40,9 +40,9 @@ class BaseSoledadTest(BaseLeapTest):
self._soledad._init_dirs()
#self._soledad._gpg.import_keys(PUBLIC_KEY)
self._soledad._crypto = SoledadCrypto(self._soledad)
- if not self._soledad._has_symkey():
- self._soledad._gen_symkey()
- self._soledad._load_symkey()
+ if not self._soledad._has_secret():
+ self._soledad._gen_secret()
+ self._soledad._load_secret()
self._soledad._init_db()
def tearDown(self):
@@ -51,12 +51,13 @@ class BaseSoledadTest(BaseLeapTest):
self._soledad.close()
def _soledad_instance(self, user='leap@leap.se', prefix='',
- bootstrap=False, secret_path='/secret.gpg',
+ bootstrap=False,
+ secrets_path=Soledad.STORAGE_SECRETS_FILE_NAME,
local_db_path='/soledad.u1db'):
return Soledad(
user,
'123',
- secret_path=self.tempdir+prefix+secret_path,
+ secrets_path=self.tempdir+prefix+secrets_path,
local_db_path=self.tempdir+prefix+local_db_path,
server_url='', # Soledad will fail if not given an url.
cert_file=None,
diff --git a/src/leap/soledad/tests/test_crypto.py b/src/leap/soledad/tests/test_crypto.py
index abe32661..8f9145c8 100644
--- a/src/leap/soledad/tests/test_crypto.py
+++ b/src/leap/soledad/tests/test_crypto.py
@@ -127,13 +127,13 @@ class EncryptedSyncTestCase(BaseSoledadTest):
#
# def _soledad_instance(self, user='leap@leap.se', prefix='',
# bootstrap=False, gnupg_home='/gnupg',
-# secret_path='/secret.gpg',
+# secrets_path='/secret.gpg',
# local_db_path='/soledad.u1db'):
# return Soledad(
# user,
# '123',
# gnupg_home=self.tempdir+prefix+gnupg_home,
-# secret_path=self.tempdir+prefix+secret_path,
+# secrets_path=self.tempdir+prefix+secrets_path,
# local_db_path=self.tempdir+prefix+local_db_path,
# bootstrap=bootstrap)
#
@@ -145,9 +145,9 @@ class EncryptedSyncTestCase(BaseSoledadTest):
# self._soledad = self._soledad_instance('leap@leap.se')
# self._soledad._init_dirs()
# self._soledad._crypto = SoledadCrypto(self._soledad)
-# if not self._soledad._has_symkey():
-# self._soledad._gen_symkey()
-# self._soledad._load_symkey()
+# if not self._soledad._has_get_storage_secret()():
+# self._soledad._gen_get_storage_secret()()
+# self._soledad._load_get_storage_secret()()
# self._soledad._init_db()
#
# def tearDown(self):
@@ -186,15 +186,15 @@ class EncryptedSyncTestCase(BaseSoledadTest):
class RecoveryDocumentTestCase(BaseSoledadTest):
def test_export_recovery_document_raw(self):
- rd = self._soledad.export_recovery_document(None)
- self.assertEqual(
- {
- self._soledad.UUID_KEY: self._soledad._uuid,
- self._soledad.SYMKEY_KEY: self._soledad._symkey
- },
- json.loads(rd),
- "Could not export raw recovery document."
- )
+ rd = json.loads(self._soledad.export_recovery_document(None))
+ secret_id = rd[self._soledad.STORAGE_SECRETS_KEY].items()[0][0]
+ secret = rd[self._soledad.STORAGE_SECRETS_KEY][secret_id]
+ self.assertEqual(secret_id, self._soledad._secret_id)
+ self.assertEqual(secret, self._soledad._secrets[secret_id])
+ self.assertTrue(self._soledad.CIPHER_KEY in secret)
+ self.assertTrue(secret[self._soledad.CIPHER_KEY] == 'aes256')
+ self.assertTrue(self._soledad.LENGTH_KEY in secret)
+ self.assertTrue(self._soledad.SECRET_KEY in secret)
def test_export_recovery_document_crypt(self):
rd = self._soledad.export_recovery_document('123456')
@@ -202,7 +202,7 @@ class RecoveryDocumentTestCase(BaseSoledadTest):
self._soledad._crypto.is_encrypted_sym(rd))
data = {
self._soledad.UUID_KEY: self._soledad._uuid,
- self._soledad.SYMKEY_KEY: self._soledad._symkey,
+ self._soledad.STORAGE_SECRETS_KEY: self._soledad._secrets,
}
raw_data = json.loads(self._soledad._crypto.decrypt_sym(
rd,
@@ -213,11 +213,6 @@ class RecoveryDocumentTestCase(BaseSoledadTest):
"Could not export raw recovery document."
)
- def test_import_recovery_document_raises_exception(self):
- rd = self._soledad.export_recovery_document(None)
- self.assertRaises(KeyAlreadyExists,
- self._soledad.import_recovery_document, rd, None)
-
def test_import_recovery_document_raw(self):
rd = self._soledad.export_recovery_document(None)
s = self._soledad_instance(user='anotheruser@leap.se', prefix='/2')
@@ -226,8 +221,8 @@ class RecoveryDocumentTestCase(BaseSoledadTest):
s.import_recovery_document(rd, None)
self.assertEqual(self._soledad._uuid,
s._uuid, 'Failed setting user uuid.')
- self.assertEqual(self._soledad._symkey,
- s._symkey,
+ self.assertEqual(self._soledad._get_storage_secret(),
+ s._get_storage_secret(),
'Failed settinng secret for symmetric encryption.')
def test_import_recovery_document_crypt(self):
@@ -238,26 +233,26 @@ class RecoveryDocumentTestCase(BaseSoledadTest):
s.import_recovery_document(rd, '123456')
self.assertEqual(self._soledad._uuid,
s._uuid, 'Failed setting user uuid.')
- self.assertEqual(self._soledad._symkey,
- s._symkey,
+ self.assertEqual(self._soledad._get_storage_secret(),
+ s._get_storage_secret(),
'Failed settinng secret for symmetric encryption.')
class CryptoMethodsTestCase(BaseSoledadTest):
- def test__gen_symkey(self):
+ def test__gen_secret(self):
sol = self._soledad_instance(user='user@leap.se', prefix='/3')
sol._init_dirs()
sol._crypto = SoledadCrypto(sol)
- self.assertFalse(sol._has_symkey(), "Should not have a symkey at "
+ self.assertFalse(sol._has_secret(), "Should not have a secret at "
"this point")
- sol._gen_symkey()
- self.assertTrue(sol._has_symkey(), "Could not generate symkey.")
+ sol._gen_secret()
+ self.assertTrue(sol._has_secret(), "Could not generate secret.")
- def test__has_keys(self):
+ def test__has_secret(self):
sol = self._soledad_instance(user='leap@leap.se', prefix='/5')
sol._init_dirs()
sol._crypto = SoledadCrypto(sol)
- self.assertFalse(sol._has_keys())
- sol._gen_symkey()
- self.assertTrue(sol._has_keys())
+ self.assertFalse(sol._has_secret())
+ sol._gen_secret()
+ self.assertTrue(sol._has_secret())
diff --git a/src/leap/soledad/tests/test_soledad.py b/src/leap/soledad/tests/test_soledad.py
index 6031c704..018ce8d7 100644
--- a/src/leap/soledad/tests/test_soledad.py
+++ b/src/leap/soledad/tests/test_soledad.py
@@ -45,18 +45,18 @@ class AuxMethodsTestCase(BaseSoledadTest):
sol = self._soledad_instance(prefix='/_init_dirs')
sol._init_dirs()
local_db_dir = os.path.dirname(sol.local_db_path)
- secret_path = os.path.dirname(sol.secret_path)
+ secrets_path = os.path.dirname(sol.secrets_path)
self.assertTrue(os.path.isdir(local_db_dir))
- self.assertTrue(os.path.isdir(secret_path))
+ self.assertTrue(os.path.isdir(secrets_path))
def test__init_db(self):
sol = self._soledad_instance()
sol._init_dirs()
sol._crypto = SoledadCrypto(sol)
#self._soledad._gpg.import_keys(PUBLIC_KEY)
- if not sol._has_symkey():
- sol._gen_symkey()
- sol._load_symkey()
+ if not sol._has_secret():
+ sol._gen_secret()
+ sol._load_secret()
sol._init_db()
from leap.soledad.backends.sqlcipher import SQLCipherDatabase
self.assertIsInstance(sol._db, SQLCipherDatabase)
@@ -66,11 +66,11 @@ class AuxMethodsTestCase(BaseSoledadTest):
Test if configuration defaults point to the correct place.
"""
sol = Soledad('leap@leap.se', passphrase='123', bootstrap=False,
- secret_path=None, local_db_path=None,
+ secrets_path=None, local_db_path=None,
server_url='', cert_file=None) # otherwise Soledad will fail.
self.assertEquals(
- os.path.join(sol.DEFAULT_PREFIX, 'secret.gpg'),
- sol.secret_path)
+ os.path.join(sol.DEFAULT_PREFIX, Soledad.STORAGE_SECRETS_FILE_NAME),
+ sol.secrets_path)
self.assertEquals(
os.path.join(sol.DEFAULT_PREFIX, 'soledad.u1db'),
sol.local_db_path)
@@ -83,11 +83,11 @@ class AuxMethodsTestCase(BaseSoledadTest):
'leap@leap.se',
passphrase='123',
bootstrap=False,
- secret_path='value_3',
+ secrets_path='value_3',
local_db_path='value_2',
server_url='value_1',
cert_file=None)
- self.assertEqual('value_3', sol.secret_path)
+ self.assertEqual('value_3', sol.secrets_path)
self.assertEqual('value_2', sol.local_db_path)
self.assertEqual('value_1', sol.server_url)
@@ -102,7 +102,7 @@ class SoledadSharedDBTestCase(BaseSoledadTest):
self._shared_db = SoledadSharedDatabase(
'https://provider/', LeapDocument, None)
- def test__fetch_keys_from_shared_db(self):
+ def test__get_secrets_from_shared_db(self):
"""
Ensure the shared db is queried with the correct doc_id.
"""
@@ -116,13 +116,13 @@ class SoledadSharedDBTestCase(BaseSoledadTest):
self._soledad._shared_db = MockSharedDB()
doc_id = self._soledad._uuid_hash()
- self._soledad._fetch_keys_from_shared_db()
+ self._soledad._get_secrets_from_shared_db()
self.assertTrue(
self._soledad._shared_db().get_doc.assert_called_once_with(
doc_id) is None,
'Wrong doc_id when fetching recovery document.')
- def test__assert_keys_in_shared_db(self):
+ def test__put_secrets_in_shared_db(self):
"""
Ensure recovery document is put into shared recover db.
"""
@@ -140,7 +140,7 @@ class SoledadSharedDBTestCase(BaseSoledadTest):
self._soledad._shared_db = MockSharedDB()
doc_id = self._soledad._uuid_hash()
- self._soledad._assert_keys_in_shared_db()
+ self._soledad._put_secrets_in_shared_db()
self.assertTrue(
self._soledad._shared_db().get_doc.assert_called_once_with(
doc_id) is None,