summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authordrebs <drebs@leap.se>2013-04-13 12:33:59 -0300
committerdrebs <drebs@leap.se>2013-04-13 12:33:59 -0300
commit7d63e687e4c3b6e4da4599a7e685dc5c44457a64 (patch)
treec5caa8ca6d7a21dba66523f877e23569a46272aa
parent3e0d12b8130a317c11da5145bb878b70b3bd8cac (diff)
Use BaseConfig for configuring Soledad.
-rw-r--r--src/leap/soledad/__init__.py113
-rw-r--r--src/leap/soledad/config.py102
-rw-r--r--src/leap/soledad/tests/__init__.py18
-rw-r--r--src/leap/soledad/tests/test_crypto.py82
-rw-r--r--src/leap/soledad/tests/test_soledad.py143
5 files changed, 349 insertions, 109 deletions
diff --git a/src/leap/soledad/__init__.py b/src/leap/soledad/__init__.py
index b636b744..38b1f772 100644
--- a/src/leap/soledad/__init__.py
+++ b/src/leap/soledad/__init__.py
@@ -38,8 +38,9 @@ except ImportError:
import json # noqa
-from leap.soledad.backends import sqlcipher
+from leap.soledad.config import SoledadConfig
from leap.soledad.util import GPGWrapper
+from leap.soledad.backends import sqlcipher
from leap.soledad.backends.leap_backend import (
LeapDocument,
DocumentNotEncrypted,
@@ -75,27 +76,21 @@ class Soledad(object):
storing/fetching them on Soledad server.
"""
- # other configs
SECRET_LENGTH = 50
- DEFAULT_CONF = {
- 'gnupg_home': '%s/gnupg',
- 'secret_path': '%s/secret.gpg',
- 'local_db_path': '%s/soledad.u1db',
- 'config_file': '%s/soledad.ini',
- 'shared_db_url': '',
- }
-
- def __init__(self, user, prefix=None, gnupg_home=None,
+ """
+ The length of the secret used for symmetric encryption.
+ """
+
+ def __init__(self, user, config_path=None, gnupg_home=None,
secret_path=None, local_db_path=None,
- config_file=None, shared_db_url=None, auth_token=None,
- bootstrap=True):
+ shared_db_url=None, auth_token=None, bootstrap=True):
"""
Initialize configuration, cryptographic keys and dbs.
@param user: Email address of the user (username@provider).
@type user: str
- @param prefix: Path to use as prefix for files.
- @type prefix: str
+ @param config_path: Path for configuration file.
+ @type config_path: str
@param gnupg_home: Home directory for gnupg.
@type gnupg_home: str
@param secret_path: Path for storing gpg-encrypted key used for
@@ -103,8 +98,6 @@ class Soledad(object):
@type secret_path: str
@param local_db_path: Path for local encrypted storage db.
@type local_db_path: str
- @param config_file: Path for configuration file.
- @type config_file: str
@param shared_db_url: URL for shared Soledad DB for key storage and
unauth retrieval.
@type shared_db_url: str
@@ -118,11 +111,10 @@ class Soledad(object):
self._user = user
self._auth_token = auth_token
self._init_config(
- prefix=prefix,
+ config_path=config_path,
gnupg_home=gnupg_home,
secret_path=secret_path,
local_db_path=local_db_path,
- config_file=config_file,
shared_db_url=shared_db_url,
)
if bootstrap:
@@ -156,7 +148,15 @@ class Soledad(object):
# TODO: log each bootstrap step.
# Stage 0 - Local environment setup
self._init_dirs()
- self._gpg = GPGWrapper(gnupghome=self.gnupg_home)
+ self._gpg = GPGWrapper(gnupghome=self._config.get_gnupg_home())
+ if self._config.get_shared_db_url() and self._auth_token:
+ # TODO: eliminate need to create db here.
+ self._shared_db = SoledadSharedDatabase.open_database(
+ self._config.get_shared_db_url(),
+ True,
+ token=self._auth_token)
+ else:
+ self._shared_db = None
# Stage 1 - Keys generation/loading
if self._has_keys():
self._load_keys()
@@ -170,50 +170,44 @@ class Soledad(object):
self._set_symkey(self.decrypt(doc.content['_symkey']))
# Stage 2 - Keys synchronization
self._assert_server_keys()
- # Stage 3 -Database initialization
+ # Stage 3 - Local database initialization
self._init_db()
- if self.shared_db_url:
- # TODO: eliminate need to create db here.
- self._shared_db = SoledadSharedDatabase.open_database(
- self.shared_db_url,
- True,
- token=auth_token)
- else:
- self._shared_db = None
def _init_config(self, **kwargs):
"""
- Initialize configuration, with precedence order give by: instance
- parameters > config file > default values.
+ Initialize configuration using SoledadConfig.
- @param kwargs: a dictionary with parameter values passed when
- instantiating this Soledad instance.
+ Soledad configuration makes use of BaseLeapConfig to load values from
+ a file or from default configuration. Parameters passed as arguments
+ for this method will supersede file and default values.
+
+ @param kwargs: a dictionary with configuration parameter values passed
+ when instantiating this Soledad instance.
@type kwargs: dict
"""
- # TODO: write tests for _init_config()
- self.prefix = kwargs['prefix'] or \
- os.environ['HOME'] + '/.config/leap/soledad'
- m = re.compile('.*%s.*')
- for key, default_value in self.DEFAULT_CONF.iteritems():
- val = kwargs[key] or default_value
- if m.match(val):
- val = val % self.prefix
- setattr(self, key, val)
- # get config from file
- # TODO: sanitize options from config file.
- config = configparser.ConfigParser()
- config.read(self.config_file)
- if 'soledad-client' in config:
- for key in self.DEFAULT_CONF:
- if key in config['soledad-client'] and not kwargs[key]:
- setattr(self, key, config['soledad-client'][key])
+ self._config = SoledadConfig()
+ config_file = kwargs.get('config_path', None)
+ if config_file is not None:
+ self._config.load(path=config_file)
+ else:
+ self._config.load(data='')
+ # overwrite config with passed parameters
+ for param in ['gnupg_home', 'secret_path', 'local_db_path',
+ 'shared_db_url']:
+ if param in kwargs and kwargs[param] is not None:
+ self._config._config_checker.config[param] = kwargs[param]
def _init_dirs(self):
"""
Create work directories.
"""
- if not os.path.isdir(self.prefix):
- os.makedirs(self.prefix)
+ paths = map(
+ lambda x: os.path.dirname(x),
+ [self._config.get_gnupg_home(), self._config.get_local_db_path(),
+ self._config.get_secret_path()])
+ for path in paths:
+ if not os.path.isdir(path):
+ os.makedirs(path)
def _init_keys(self):
"""
@@ -238,7 +232,7 @@ class Soledad(object):
# TODO: verify if secret for sqlcipher should be the same as the
# one for symmetric encryption.
self._db = sqlcipher.open(
- self.local_db_path,
+ self._config.get_local_db_path(),
self._symkey,
create=True,
document_factory=LeapDocument,
@@ -267,14 +261,14 @@ class Soledad(object):
@rtype: bool
"""
# does the file exist in disk?
- if not os.path.isfile(self.secret_path):
+ if not os.path.isfile(self._config.get_secret_path()):
return False
# is it asymmetrically encrypted?
- f = open(self.secret_path, 'r')
+ f = open(self._config.get_secret_path(), 'r')
content = f.read()
if not self.is_encrypted_asym(content):
raise DocumentNotEncrypted(
- "File %s is not encrypted!" % self.secret_path)
+ "File %s is not encrypted!" % self._config.get_secret_path())
# can we decrypt it?
fp = self._gpg.encrypted_to(content)['fingerprint']
if fp != self._fingerprint:
@@ -291,10 +285,11 @@ class Soledad(object):
raise KeyDoesNotExist("Tried to load key for symmetric "
"encryption but it does not exist on disk.")
try:
- with open(self.secret_path) as f:
+ with open(self._config.get_secret_path()) as f:
self._symkey = str(self._gpg.decrypt(f.read()))
except IOError:
- raise IOError('Failed to open secret file %s.' % self.secret_path)
+ raise IOError('Failed to open secret file %s.' %
+ self._config.get_secret_path())
def _gen_symkey(self):
"""
@@ -323,7 +318,7 @@ class Soledad(object):
def _store_symkey(self):
ciphertext = self._gpg.encrypt(self._symkey, self._fingerprint,
self._fingerprint)
- f = open(self.secret_path, 'w')
+ f = open(self._config.get_secret_path(), 'w')
f.write(str(ciphertext))
f.close()
diff --git a/src/leap/soledad/config.py b/src/leap/soledad/config.py
new file mode 100644
index 00000000..2b2d910c
--- /dev/null
+++ b/src/leap/soledad/config.py
@@ -0,0 +1,102 @@
+# -*- coding: utf-8 -*-
+# config.py
+# Copyright (C) 2013 LEAP
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see <http://www.gnu.org/licenses/>.
+
+
+"""
+Management of configuration sources for Soledad.
+"""
+
+import os
+import logging
+
+
+from leap.config.baseconfig import BaseConfig
+
+
+logger = logging.getLogger(name=__name__)
+
+
+PREFIX = os.environ['HOME'] + '/.config/leap/soledad'
+
+
+soledad_config_spec = {
+ 'description': 'sample soledad config',
+ 'type': 'object',
+ 'properties': {
+ 'gnupg_home': {
+ 'type': unicode,
+ 'default': PREFIX + '/gnupg',
+ 'required': True,
+ },
+ 'secret_path': {
+ 'type': unicode,
+ 'default': PREFIX + '/secret.gpg',
+ 'required': True,
+ },
+ 'local_db_path': {
+ #'type': unicode,
+ 'default': PREFIX + '/soledad.u1db',
+ 'required': True,
+ },
+ 'shared_db_url': {
+ 'type': unicode,
+ 'default': 'http://provider/soledad/shared',
+ 'required': True, # should this be True?
+ },
+ }
+}
+
+
+class SoledadConfig(BaseConfig):
+
+ def _get_spec(self):
+ """
+ Returns the spec object for the specific configuration
+ """
+ return soledad_config_spec
+
+ def get_gnupg_home(self):
+ return self._safe_get_value("gnupg_home")
+
+ def get_secret_path(self):
+ return self._safe_get_value("secret_path")
+
+ def get_local_db_path(self):
+ return self._safe_get_value("local_db_path")
+
+ def get_shared_db_url(self):
+ return self._safe_get_value("shared_db_url")
+
+
+if __name__ == "__main__":
+ logger = logging.getLogger(name='leap')
+ logger.setLevel(logging.DEBUG)
+ console = logging.StreamHandler()
+ console.setLevel(logging.DEBUG)
+ formatter = logging.Formatter(
+ '%(asctime)s '
+ '- %(name)s - %(levelname)s - %(message)s')
+ console.setFormatter(formatter)
+ logger.addHandler(console)
+
+ soledadconfig = SoledadConfig()
+
+ try:
+ soledadconfig.get_local_db_path()
+ except Exception as e:
+ assert isinstance(e, AssertionError), "Expected an assert"
+ print "Safe value getting is working"
diff --git a/src/leap/soledad/tests/__init__.py b/src/leap/soledad/tests/__init__.py
index 8ceafe99..1dbbe8ab 100644
--- a/src/leap/soledad/tests/__init__.py
+++ b/src/leap/soledad/tests/__init__.py
@@ -31,9 +31,7 @@ class BaseSoledadTest(BaseLeapTest):
self._db2 = u1db.open(self.db2_file, create=True,
document_factory=LeapDocument)
# initialize soledad by hand so we can control keys
- self._soledad = Soledad(self.email, gnupg_home=self.gnupg_home,
- bootstrap=False,
- prefix=self.tempdir)
+ self._soledad = self._soledad_instance(user=self.email)
self._soledad._init_dirs()
self._soledad._gpg = GPGWrapper(gnupghome=self.gnupg_home)
#self._soledad._gpg.import_keys(PUBLIC_KEY)
@@ -49,6 +47,20 @@ class BaseSoledadTest(BaseLeapTest):
self._db2.close()
self._soledad.close()
+ def _soledad_instance(self, user='leap@leap.se', prefix='',
+ bootstrap=False, gnupg_home='/gnupg',
+ secret_path='/secret.gpg',
+ local_db_path='/soledad.u1db'):
+ return Soledad(
+ user,
+ gnupg_home=self.tempdir+prefix+gnupg_home,
+ secret_path=self.tempdir+prefix+secret_path,
+ local_db_path=self.tempdir+prefix+local_db_path,
+ bootstrap=bootstrap)
+
+ def _gpgwrapper_instance(self):
+ return GPGWrapper(gnupghome="%s/gnupg" % self.tempdir)
+
# Key material for testing
KEY_FINGERPRINT = "E36E738D69173C13D709E44F2F455E2824D18DDF"
diff --git a/src/leap/soledad/tests/test_crypto.py b/src/leap/soledad/tests/test_crypto.py
index fdecbeef..919ec88c 100644
--- a/src/leap/soledad/tests/test_crypto.py
+++ b/src/leap/soledad/tests/test_crypto.py
@@ -1,4 +1,29 @@
+# -*- coding: utf-8 -*-
+# test_crypto.py
+# Copyright (C) 2013 LEAP
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see <http://www.gnu.org/licenses/>.
+
+
+"""
+Tests for cryptographic related stuff.
+"""
+
+
import os
+
+
from leap.common.testing.basetest import BaseLeapTest
from leap.soledad.backends.leap_backend import LeapDocument
from leap.soledad.tests import BaseSoledadTest
@@ -6,10 +31,7 @@ from leap.soledad.tests import (
KEY_FINGERPRINT,
PRIVATE_KEY,
)
-from leap.soledad import (
- Soledad,
- KeyAlreadyExists,
-)
+from leap.soledad import KeyAlreadyExists
from leap.soledad.util import GPGWrapper
try:
@@ -93,8 +115,7 @@ class RecoveryDocumentTestCase(BaseSoledadTest):
def test_import_recovery_document_raw(self):
rd = self._soledad.export_recovery_document(None)
gnupg_home = self.gnupg_home = "%s/gnupg2" % self.tempdir
- s = Soledad('anotheruser@leap.se', gnupg_home=gnupg_home,
- bootstrap=False, prefix=self.tempdir)
+ s = self._soledad_instance(user='anotheruser@leap.se', prefix='/2')
s._init_dirs()
s._gpg = GPGWrapper(gnupghome=gnupg_home)
s.import_recovery_document(rd, None)
@@ -119,8 +140,7 @@ class RecoveryDocumentTestCase(BaseSoledadTest):
def test_import_recovery_document_crypt(self):
rd = self._soledad.export_recovery_document('123456')
gnupg_home = self.gnupg_home = "%s/gnupg2" % self.tempdir
- s = Soledad('anotheruser@leap.se', gnupg_home=gnupg_home,
- bootstrap=False, prefix=self.tempdir)
+ s = self._soledad_instance(user='anotheruser@leap.se')
s._init_dirs()
s._gpg = GPGWrapper(gnupghome=gnupg_home)
s.import_recovery_document(rd, '123456')
@@ -143,52 +163,19 @@ class RecoveryDocumentTestCase(BaseSoledadTest):
)
-class SoledadAuxMethods(BaseLeapTest):
-
- def setUp(self):
- pass
-
- def tearDown(self):
- pass
-
- def _soledad_instance(self, prefix=None):
- return Soledad('leap@leap.se', bootstrap=False,
- prefix=prefix or self.tempdir+'/soledad')
-
- def _gpgwrapper_instance(self):
- return GPGWrapper(gnupghome="%s/gnupg" % self.tempdir)
-
- def test__init_dirs(self):
- sol = self._soledad_instance()
- sol._init_dirs()
- self.assertTrue(os.path.isdir(sol.prefix))
-
- def test__init_db(self):
- sol = self._soledad_instance()
- sol._init_dirs()
- sol._gpg = self._gpgwrapper_instance()
- #self._soledad._gpg.import_keys(PUBLIC_KEY)
- if not sol._has_privkey():
- sol._set_privkey(PRIVATE_KEY)
- if not sol._has_symkey():
- sol._gen_symkey()
- sol._load_symkey()
- sol._init_db()
- from leap.soledad.backends.sqlcipher import SQLCipherDatabase
- self.assertIsInstance(sol._db, SQLCipherDatabase)
+class CryptoMethodsTestCase(BaseSoledadTest):
def test__gen_privkey(self):
- sol = self._soledad_instance()
+ sol = self._soledad_instance(user='user@leap.se', prefix='/4')
sol._init_dirs()
sol._gpg = GPGWrapper(gnupghome="%s/gnupg2" % self.tempdir)
self.assertFalse(sol._has_privkey(), 'Should not have a private key '
'at this point.')
- sol._set_privkey(PRIVATE_KEY)
+ sol._gen_privkey()
self.assertTrue(sol._has_privkey(), 'Could not generate privkey.')
def test__gen_symkey(self):
- sol = Soledad('leap@leap.se', bootstrap=False,
- prefix=self.tempdir+'/soledad3')
+ sol = self._soledad_instance(user='user@leap.se', prefix='/3')
sol._init_dirs()
sol._gpg = GPGWrapper(gnupghome="%s/gnupg3" % self.tempdir)
if not sol._has_privkey():
@@ -199,11 +186,12 @@ class SoledadAuxMethods(BaseLeapTest):
self.assertTrue(sol._has_symkey(), "Could not generate symkey.")
def test__has_keys(self):
- sol = self._soledad_instance()
+ sol = self._soledad_instance(user='leap@leap.se', prefix='/5')
sol._init_dirs()
- sol._gpg = self._gpgwrapper_instance()
+ sol._gpg = GPGWrapper(gnupghome=self.tempdir+"/5/gnupg")
self.assertFalse(sol._has_keys())
sol._set_privkey(PRIVATE_KEY)
+ sol._has_privkey()
self.assertFalse(sol._has_keys())
sol._gen_symkey()
self.assertTrue(sol._has_keys())
diff --git a/src/leap/soledad/tests/test_soledad.py b/src/leap/soledad/tests/test_soledad.py
new file mode 100644
index 00000000..92d5182b
--- /dev/null
+++ b/src/leap/soledad/tests/test_soledad.py
@@ -0,0 +1,143 @@
+# -*- coding: utf-8 -*-
+# test_soledad.py
+# Copyright (C) 2013 LEAP
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see <http://www.gnu.org/licenses/>.
+
+
+"""
+Tests for general Soledad functionality.
+"""
+
+
+import os
+import re
+import tempfile
+try:
+ import simplejson as json
+except ImportError:
+ import json # noqa
+
+
+from leap.soledad.tests import BaseSoledadTest
+from leap.soledad import Soledad
+
+
+class AuxMethodsTestCase(BaseSoledadTest):
+
+ def test__init_dirs(self):
+ sol = self._soledad_instance(prefix='/_init_dirs')
+ sol._init_dirs()
+ local_db_dir = os.path.dirname(sol._config.get_local_db_path())
+ gnupg_home = os.path.dirname(sol._config.get_gnupg_home())
+ secret_path = os.path.dirname(sol._config.get_secret_path())
+ self.assertTrue(os.path.isdir(local_db_dir))
+ self.assertTrue(os.path.isdir(gnupg_home))
+ self.assertTrue(os.path.isdir(secret_path))
+
+ def test__init_db(self):
+ sol = self._soledad_instance()
+ sol._init_dirs()
+ sol._gpg = self._gpgwrapper_instance()
+ #self._soledad._gpg.import_keys(PUBLIC_KEY)
+ if not sol._has_privkey():
+ sol._set_privkey(PRIVATE_KEY)
+ if not sol._has_symkey():
+ sol._gen_symkey()
+ sol._load_symkey()
+ sol._init_db()
+ from leap.soledad.backends.sqlcipher import SQLCipherDatabase
+ self.assertIsInstance(sol._db, SQLCipherDatabase)
+
+ def test__init_config_default(self):
+ """
+ Test if configuration defaults point to the correct place.
+ """
+ sol = Soledad(user='leap@leap.se', bootstrap=False)
+ self.assertTrue(bool(re.match(
+ '.*/\.config/leap/soledad/gnupg', sol._config.get_gnupg_home())))
+ self.assertTrue(bool(re.match(
+ '.*/\.config/leap/soledad/secret.gpg',
+ sol._config.get_secret_path())))
+ self.assertTrue(bool(re.match(
+ '.*/\.config/leap/soledad/soledad.u1db',
+ sol._config.get_local_db_path())))
+ self.assertEqual(
+ 'http://provider/soledad/shared',
+ sol._config.get_shared_db_url())
+
+ def test__init_config_defaults(self):
+ """
+ Test if configuration defaults point to the correct place.
+ """
+ # we use regexp match here because HOME environment variable is
+ # changed by the BaseLeapTest class but BaseConfig does not capture
+ # that change.
+ sol = Soledad(user='leap@leap.se', bootstrap=False)
+ self.assertTrue(bool(re.match(
+ '.*/\.config/leap/soledad/gnupg', sol._config.get_gnupg_home())))
+ self.assertTrue(bool(re.match(
+ '.*/\.config/leap/soledad/secret.gpg',
+ sol._config.get_secret_path())))
+ self.assertTrue(bool(re.match(
+ '.*/\.config/leap/soledad/soledad.u1db',
+ sol._config.get_local_db_path())))
+ self.assertEqual(
+ 'http://provider/soledad/shared',
+ sol._config.get_shared_db_url())
+
+ def test__init_config_from_file(self):
+ """
+ Test if configuration is correctly read from file.
+ """
+ # we use regexp match here because HOME environment variable is
+ # changed by the BaseLeapTest class but BaseConfig does not capture
+ # that change.
+ config_values = {
+ "gnupg_home": "value_1",
+ "secret_path": "value_2",
+ "local_db_path": "value_3",
+ "shared_db_url": "value_4"
+ }
+ tmpfile = tempfile.mktemp(dir=self.tempdir)
+ f = open(tmpfile, 'w')
+ f.write(json.dumps(config_values))
+ f.close()
+ sol = Soledad(
+ user='leap@leap.se',
+ bootstrap=False,
+ config_path=tmpfile)
+ self.assertEqual('value_1', sol._config.get_gnupg_home())
+ self.assertEqual('value_2', sol._config.get_secret_path())
+ self.assertEqual('value_3', sol._config.get_local_db_path())
+ self.assertEqual('value_4', sol._config.get_shared_db_url())
+
+ def test__init_config_from_params(self):
+ """
+ Test if configuration is correctly read from file.
+ """
+ # we use regexp match here because HOME environment variable is
+ # changed by the BaseLeapTest class but BaseConfig does not capture
+ # that change.
+ sol = Soledad(
+ user='leap@leap.se',
+ bootstrap=False,
+ gnupg_home='value_4',
+ secret_path='value_3',
+ local_db_path='value_2',
+ shared_db_url='value_1')
+ self.assertEqual('value_4', sol._config.get_gnupg_home())
+ self.assertEqual('value_3', sol._config.get_secret_path())
+ self.assertEqual('value_2', sol._config.get_local_db_path())
+ self.assertEqual('value_1', sol._config.get_shared_db_url())