From 7d63e687e4c3b6e4da4599a7e685dc5c44457a64 Mon Sep 17 00:00:00 2001 From: drebs Date: Sat, 13 Apr 2013 12:33:59 -0300 Subject: Use BaseConfig for configuring Soledad. --- src/leap/soledad/__init__.py | 113 +++++++++++++------------- src/leap/soledad/config.py | 102 +++++++++++++++++++++++ src/leap/soledad/tests/__init__.py | 18 ++++- src/leap/soledad/tests/test_crypto.py | 82 ++++++++----------- src/leap/soledad/tests/test_soledad.py | 143 +++++++++++++++++++++++++++++++++ 5 files changed, 349 insertions(+), 109 deletions(-) create mode 100644 src/leap/soledad/config.py create mode 100644 src/leap/soledad/tests/test_soledad.py (limited to 'src') diff --git a/src/leap/soledad/__init__.py b/src/leap/soledad/__init__.py index b636b744..38b1f772 100644 --- a/src/leap/soledad/__init__.py +++ b/src/leap/soledad/__init__.py @@ -38,8 +38,9 @@ except ImportError: import json # noqa -from leap.soledad.backends import sqlcipher +from leap.soledad.config import SoledadConfig from leap.soledad.util import GPGWrapper +from leap.soledad.backends import sqlcipher from leap.soledad.backends.leap_backend import ( LeapDocument, DocumentNotEncrypted, @@ -75,27 +76,21 @@ class Soledad(object): storing/fetching them on Soledad server. """ - # other configs SECRET_LENGTH = 50 - DEFAULT_CONF = { - 'gnupg_home': '%s/gnupg', - 'secret_path': '%s/secret.gpg', - 'local_db_path': '%s/soledad.u1db', - 'config_file': '%s/soledad.ini', - 'shared_db_url': '', - } - - def __init__(self, user, prefix=None, gnupg_home=None, + """ + The length of the secret used for symmetric encryption. + """ + + def __init__(self, user, config_path=None, gnupg_home=None, secret_path=None, local_db_path=None, - config_file=None, shared_db_url=None, auth_token=None, - bootstrap=True): + shared_db_url=None, auth_token=None, bootstrap=True): """ Initialize configuration, cryptographic keys and dbs. @param user: Email address of the user (username@provider). @type user: str - @param prefix: Path to use as prefix for files. - @type prefix: str + @param config_path: Path for configuration file. + @type config_path: str @param gnupg_home: Home directory for gnupg. @type gnupg_home: str @param secret_path: Path for storing gpg-encrypted key used for @@ -103,8 +98,6 @@ class Soledad(object): @type secret_path: str @param local_db_path: Path for local encrypted storage db. @type local_db_path: str - @param config_file: Path for configuration file. - @type config_file: str @param shared_db_url: URL for shared Soledad DB for key storage and unauth retrieval. @type shared_db_url: str @@ -118,11 +111,10 @@ class Soledad(object): self._user = user self._auth_token = auth_token self._init_config( - prefix=prefix, + config_path=config_path, gnupg_home=gnupg_home, secret_path=secret_path, local_db_path=local_db_path, - config_file=config_file, shared_db_url=shared_db_url, ) if bootstrap: @@ -156,7 +148,15 @@ class Soledad(object): # TODO: log each bootstrap step. # Stage 0 - Local environment setup self._init_dirs() - self._gpg = GPGWrapper(gnupghome=self.gnupg_home) + self._gpg = GPGWrapper(gnupghome=self._config.get_gnupg_home()) + if self._config.get_shared_db_url() and self._auth_token: + # TODO: eliminate need to create db here. + self._shared_db = SoledadSharedDatabase.open_database( + self._config.get_shared_db_url(), + True, + token=self._auth_token) + else: + self._shared_db = None # Stage 1 - Keys generation/loading if self._has_keys(): self._load_keys() @@ -170,50 +170,44 @@ class Soledad(object): self._set_symkey(self.decrypt(doc.content['_symkey'])) # Stage 2 - Keys synchronization self._assert_server_keys() - # Stage 3 -Database initialization + # Stage 3 - Local database initialization self._init_db() - if self.shared_db_url: - # TODO: eliminate need to create db here. - self._shared_db = SoledadSharedDatabase.open_database( - self.shared_db_url, - True, - token=auth_token) - else: - self._shared_db = None def _init_config(self, **kwargs): """ - Initialize configuration, with precedence order give by: instance - parameters > config file > default values. + Initialize configuration using SoledadConfig. - @param kwargs: a dictionary with parameter values passed when - instantiating this Soledad instance. + Soledad configuration makes use of BaseLeapConfig to load values from + a file or from default configuration. Parameters passed as arguments + for this method will supersede file and default values. + + @param kwargs: a dictionary with configuration parameter values passed + when instantiating this Soledad instance. @type kwargs: dict """ - # TODO: write tests for _init_config() - self.prefix = kwargs['prefix'] or \ - os.environ['HOME'] + '/.config/leap/soledad' - m = re.compile('.*%s.*') - for key, default_value in self.DEFAULT_CONF.iteritems(): - val = kwargs[key] or default_value - if m.match(val): - val = val % self.prefix - setattr(self, key, val) - # get config from file - # TODO: sanitize options from config file. - config = configparser.ConfigParser() - config.read(self.config_file) - if 'soledad-client' in config: - for key in self.DEFAULT_CONF: - if key in config['soledad-client'] and not kwargs[key]: - setattr(self, key, config['soledad-client'][key]) + self._config = SoledadConfig() + config_file = kwargs.get('config_path', None) + if config_file is not None: + self._config.load(path=config_file) + else: + self._config.load(data='') + # overwrite config with passed parameters + for param in ['gnupg_home', 'secret_path', 'local_db_path', + 'shared_db_url']: + if param in kwargs and kwargs[param] is not None: + self._config._config_checker.config[param] = kwargs[param] def _init_dirs(self): """ Create work directories. """ - if not os.path.isdir(self.prefix): - os.makedirs(self.prefix) + paths = map( + lambda x: os.path.dirname(x), + [self._config.get_gnupg_home(), self._config.get_local_db_path(), + self._config.get_secret_path()]) + for path in paths: + if not os.path.isdir(path): + os.makedirs(path) def _init_keys(self): """ @@ -238,7 +232,7 @@ class Soledad(object): # TODO: verify if secret for sqlcipher should be the same as the # one for symmetric encryption. self._db = sqlcipher.open( - self.local_db_path, + self._config.get_local_db_path(), self._symkey, create=True, document_factory=LeapDocument, @@ -267,14 +261,14 @@ class Soledad(object): @rtype: bool """ # does the file exist in disk? - if not os.path.isfile(self.secret_path): + if not os.path.isfile(self._config.get_secret_path()): return False # is it asymmetrically encrypted? - f = open(self.secret_path, 'r') + f = open(self._config.get_secret_path(), 'r') content = f.read() if not self.is_encrypted_asym(content): raise DocumentNotEncrypted( - "File %s is not encrypted!" % self.secret_path) + "File %s is not encrypted!" % self._config.get_secret_path()) # can we decrypt it? fp = self._gpg.encrypted_to(content)['fingerprint'] if fp != self._fingerprint: @@ -291,10 +285,11 @@ class Soledad(object): raise KeyDoesNotExist("Tried to load key for symmetric " "encryption but it does not exist on disk.") try: - with open(self.secret_path) as f: + with open(self._config.get_secret_path()) as f: self._symkey = str(self._gpg.decrypt(f.read())) except IOError: - raise IOError('Failed to open secret file %s.' % self.secret_path) + raise IOError('Failed to open secret file %s.' % + self._config.get_secret_path()) def _gen_symkey(self): """ @@ -323,7 +318,7 @@ class Soledad(object): def _store_symkey(self): ciphertext = self._gpg.encrypt(self._symkey, self._fingerprint, self._fingerprint) - f = open(self.secret_path, 'w') + f = open(self._config.get_secret_path(), 'w') f.write(str(ciphertext)) f.close() diff --git a/src/leap/soledad/config.py b/src/leap/soledad/config.py new file mode 100644 index 00000000..2b2d910c --- /dev/null +++ b/src/leap/soledad/config.py @@ -0,0 +1,102 @@ +# -*- coding: utf-8 -*- +# config.py +# Copyright (C) 2013 LEAP +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see . + + +""" +Management of configuration sources for Soledad. +""" + +import os +import logging + + +from leap.config.baseconfig import BaseConfig + + +logger = logging.getLogger(name=__name__) + + +PREFIX = os.environ['HOME'] + '/.config/leap/soledad' + + +soledad_config_spec = { + 'description': 'sample soledad config', + 'type': 'object', + 'properties': { + 'gnupg_home': { + 'type': unicode, + 'default': PREFIX + '/gnupg', + 'required': True, + }, + 'secret_path': { + 'type': unicode, + 'default': PREFIX + '/secret.gpg', + 'required': True, + }, + 'local_db_path': { + #'type': unicode, + 'default': PREFIX + '/soledad.u1db', + 'required': True, + }, + 'shared_db_url': { + 'type': unicode, + 'default': 'http://provider/soledad/shared', + 'required': True, # should this be True? + }, + } +} + + +class SoledadConfig(BaseConfig): + + def _get_spec(self): + """ + Returns the spec object for the specific configuration + """ + return soledad_config_spec + + def get_gnupg_home(self): + return self._safe_get_value("gnupg_home") + + def get_secret_path(self): + return self._safe_get_value("secret_path") + + def get_local_db_path(self): + return self._safe_get_value("local_db_path") + + def get_shared_db_url(self): + return self._safe_get_value("shared_db_url") + + +if __name__ == "__main__": + logger = logging.getLogger(name='leap') + logger.setLevel(logging.DEBUG) + console = logging.StreamHandler() + console.setLevel(logging.DEBUG) + formatter = logging.Formatter( + '%(asctime)s ' + '- %(name)s - %(levelname)s - %(message)s') + console.setFormatter(formatter) + logger.addHandler(console) + + soledadconfig = SoledadConfig() + + try: + soledadconfig.get_local_db_path() + except Exception as e: + assert isinstance(e, AssertionError), "Expected an assert" + print "Safe value getting is working" diff --git a/src/leap/soledad/tests/__init__.py b/src/leap/soledad/tests/__init__.py index 8ceafe99..1dbbe8ab 100644 --- a/src/leap/soledad/tests/__init__.py +++ b/src/leap/soledad/tests/__init__.py @@ -31,9 +31,7 @@ class BaseSoledadTest(BaseLeapTest): self._db2 = u1db.open(self.db2_file, create=True, document_factory=LeapDocument) # initialize soledad by hand so we can control keys - self._soledad = Soledad(self.email, gnupg_home=self.gnupg_home, - bootstrap=False, - prefix=self.tempdir) + self._soledad = self._soledad_instance(user=self.email) self._soledad._init_dirs() self._soledad._gpg = GPGWrapper(gnupghome=self.gnupg_home) #self._soledad._gpg.import_keys(PUBLIC_KEY) @@ -49,6 +47,20 @@ class BaseSoledadTest(BaseLeapTest): self._db2.close() self._soledad.close() + def _soledad_instance(self, user='leap@leap.se', prefix='', + bootstrap=False, gnupg_home='/gnupg', + secret_path='/secret.gpg', + local_db_path='/soledad.u1db'): + return Soledad( + user, + gnupg_home=self.tempdir+prefix+gnupg_home, + secret_path=self.tempdir+prefix+secret_path, + local_db_path=self.tempdir+prefix+local_db_path, + bootstrap=bootstrap) + + def _gpgwrapper_instance(self): + return GPGWrapper(gnupghome="%s/gnupg" % self.tempdir) + # Key material for testing KEY_FINGERPRINT = "E36E738D69173C13D709E44F2F455E2824D18DDF" diff --git a/src/leap/soledad/tests/test_crypto.py b/src/leap/soledad/tests/test_crypto.py index fdecbeef..919ec88c 100644 --- a/src/leap/soledad/tests/test_crypto.py +++ b/src/leap/soledad/tests/test_crypto.py @@ -1,4 +1,29 @@ +# -*- coding: utf-8 -*- +# test_crypto.py +# Copyright (C) 2013 LEAP +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see . + + +""" +Tests for cryptographic related stuff. +""" + + import os + + from leap.common.testing.basetest import BaseLeapTest from leap.soledad.backends.leap_backend import LeapDocument from leap.soledad.tests import BaseSoledadTest @@ -6,10 +31,7 @@ from leap.soledad.tests import ( KEY_FINGERPRINT, PRIVATE_KEY, ) -from leap.soledad import ( - Soledad, - KeyAlreadyExists, -) +from leap.soledad import KeyAlreadyExists from leap.soledad.util import GPGWrapper try: @@ -93,8 +115,7 @@ class RecoveryDocumentTestCase(BaseSoledadTest): def test_import_recovery_document_raw(self): rd = self._soledad.export_recovery_document(None) gnupg_home = self.gnupg_home = "%s/gnupg2" % self.tempdir - s = Soledad('anotheruser@leap.se', gnupg_home=gnupg_home, - bootstrap=False, prefix=self.tempdir) + s = self._soledad_instance(user='anotheruser@leap.se', prefix='/2') s._init_dirs() s._gpg = GPGWrapper(gnupghome=gnupg_home) s.import_recovery_document(rd, None) @@ -119,8 +140,7 @@ class RecoveryDocumentTestCase(BaseSoledadTest): def test_import_recovery_document_crypt(self): rd = self._soledad.export_recovery_document('123456') gnupg_home = self.gnupg_home = "%s/gnupg2" % self.tempdir - s = Soledad('anotheruser@leap.se', gnupg_home=gnupg_home, - bootstrap=False, prefix=self.tempdir) + s = self._soledad_instance(user='anotheruser@leap.se') s._init_dirs() s._gpg = GPGWrapper(gnupghome=gnupg_home) s.import_recovery_document(rd, '123456') @@ -143,52 +163,19 @@ class RecoveryDocumentTestCase(BaseSoledadTest): ) -class SoledadAuxMethods(BaseLeapTest): - - def setUp(self): - pass - - def tearDown(self): - pass - - def _soledad_instance(self, prefix=None): - return Soledad('leap@leap.se', bootstrap=False, - prefix=prefix or self.tempdir+'/soledad') - - def _gpgwrapper_instance(self): - return GPGWrapper(gnupghome="%s/gnupg" % self.tempdir) - - def test__init_dirs(self): - sol = self._soledad_instance() - sol._init_dirs() - self.assertTrue(os.path.isdir(sol.prefix)) - - def test__init_db(self): - sol = self._soledad_instance() - sol._init_dirs() - sol._gpg = self._gpgwrapper_instance() - #self._soledad._gpg.import_keys(PUBLIC_KEY) - if not sol._has_privkey(): - sol._set_privkey(PRIVATE_KEY) - if not sol._has_symkey(): - sol._gen_symkey() - sol._load_symkey() - sol._init_db() - from leap.soledad.backends.sqlcipher import SQLCipherDatabase - self.assertIsInstance(sol._db, SQLCipherDatabase) +class CryptoMethodsTestCase(BaseSoledadTest): def test__gen_privkey(self): - sol = self._soledad_instance() + sol = self._soledad_instance(user='user@leap.se', prefix='/4') sol._init_dirs() sol._gpg = GPGWrapper(gnupghome="%s/gnupg2" % self.tempdir) self.assertFalse(sol._has_privkey(), 'Should not have a private key ' 'at this point.') - sol._set_privkey(PRIVATE_KEY) + sol._gen_privkey() self.assertTrue(sol._has_privkey(), 'Could not generate privkey.') def test__gen_symkey(self): - sol = Soledad('leap@leap.se', bootstrap=False, - prefix=self.tempdir+'/soledad3') + sol = self._soledad_instance(user='user@leap.se', prefix='/3') sol._init_dirs() sol._gpg = GPGWrapper(gnupghome="%s/gnupg3" % self.tempdir) if not sol._has_privkey(): @@ -199,11 +186,12 @@ class SoledadAuxMethods(BaseLeapTest): self.assertTrue(sol._has_symkey(), "Could not generate symkey.") def test__has_keys(self): - sol = self._soledad_instance() + sol = self._soledad_instance(user='leap@leap.se', prefix='/5') sol._init_dirs() - sol._gpg = self._gpgwrapper_instance() + sol._gpg = GPGWrapper(gnupghome=self.tempdir+"/5/gnupg") self.assertFalse(sol._has_keys()) sol._set_privkey(PRIVATE_KEY) + sol._has_privkey() self.assertFalse(sol._has_keys()) sol._gen_symkey() self.assertTrue(sol._has_keys()) diff --git a/src/leap/soledad/tests/test_soledad.py b/src/leap/soledad/tests/test_soledad.py new file mode 100644 index 00000000..92d5182b --- /dev/null +++ b/src/leap/soledad/tests/test_soledad.py @@ -0,0 +1,143 @@ +# -*- coding: utf-8 -*- +# test_soledad.py +# Copyright (C) 2013 LEAP +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see . + + +""" +Tests for general Soledad functionality. +""" + + +import os +import re +import tempfile +try: + import simplejson as json +except ImportError: + import json # noqa + + +from leap.soledad.tests import BaseSoledadTest +from leap.soledad import Soledad + + +class AuxMethodsTestCase(BaseSoledadTest): + + def test__init_dirs(self): + sol = self._soledad_instance(prefix='/_init_dirs') + sol._init_dirs() + local_db_dir = os.path.dirname(sol._config.get_local_db_path()) + gnupg_home = os.path.dirname(sol._config.get_gnupg_home()) + secret_path = os.path.dirname(sol._config.get_secret_path()) + self.assertTrue(os.path.isdir(local_db_dir)) + self.assertTrue(os.path.isdir(gnupg_home)) + self.assertTrue(os.path.isdir(secret_path)) + + def test__init_db(self): + sol = self._soledad_instance() + sol._init_dirs() + sol._gpg = self._gpgwrapper_instance() + #self._soledad._gpg.import_keys(PUBLIC_KEY) + if not sol._has_privkey(): + sol._set_privkey(PRIVATE_KEY) + if not sol._has_symkey(): + sol._gen_symkey() + sol._load_symkey() + sol._init_db() + from leap.soledad.backends.sqlcipher import SQLCipherDatabase + self.assertIsInstance(sol._db, SQLCipherDatabase) + + def test__init_config_default(self): + """ + Test if configuration defaults point to the correct place. + """ + sol = Soledad(user='leap@leap.se', bootstrap=False) + self.assertTrue(bool(re.match( + '.*/\.config/leap/soledad/gnupg', sol._config.get_gnupg_home()))) + self.assertTrue(bool(re.match( + '.*/\.config/leap/soledad/secret.gpg', + sol._config.get_secret_path()))) + self.assertTrue(bool(re.match( + '.*/\.config/leap/soledad/soledad.u1db', + sol._config.get_local_db_path()))) + self.assertEqual( + 'http://provider/soledad/shared', + sol._config.get_shared_db_url()) + + def test__init_config_defaults(self): + """ + Test if configuration defaults point to the correct place. + """ + # we use regexp match here because HOME environment variable is + # changed by the BaseLeapTest class but BaseConfig does not capture + # that change. + sol = Soledad(user='leap@leap.se', bootstrap=False) + self.assertTrue(bool(re.match( + '.*/\.config/leap/soledad/gnupg', sol._config.get_gnupg_home()))) + self.assertTrue(bool(re.match( + '.*/\.config/leap/soledad/secret.gpg', + sol._config.get_secret_path()))) + self.assertTrue(bool(re.match( + '.*/\.config/leap/soledad/soledad.u1db', + sol._config.get_local_db_path()))) + self.assertEqual( + 'http://provider/soledad/shared', + sol._config.get_shared_db_url()) + + def test__init_config_from_file(self): + """ + Test if configuration is correctly read from file. + """ + # we use regexp match here because HOME environment variable is + # changed by the BaseLeapTest class but BaseConfig does not capture + # that change. + config_values = { + "gnupg_home": "value_1", + "secret_path": "value_2", + "local_db_path": "value_3", + "shared_db_url": "value_4" + } + tmpfile = tempfile.mktemp(dir=self.tempdir) + f = open(tmpfile, 'w') + f.write(json.dumps(config_values)) + f.close() + sol = Soledad( + user='leap@leap.se', + bootstrap=False, + config_path=tmpfile) + self.assertEqual('value_1', sol._config.get_gnupg_home()) + self.assertEqual('value_2', sol._config.get_secret_path()) + self.assertEqual('value_3', sol._config.get_local_db_path()) + self.assertEqual('value_4', sol._config.get_shared_db_url()) + + def test__init_config_from_params(self): + """ + Test if configuration is correctly read from file. + """ + # we use regexp match here because HOME environment variable is + # changed by the BaseLeapTest class but BaseConfig does not capture + # that change. + sol = Soledad( + user='leap@leap.se', + bootstrap=False, + gnupg_home='value_4', + secret_path='value_3', + local_db_path='value_2', + shared_db_url='value_1') + self.assertEqual('value_4', sol._config.get_gnupg_home()) + self.assertEqual('value_3', sol._config.get_secret_path()) + self.assertEqual('value_2', sol._config.get_local_db_path()) + self.assertEqual('value_1', sol._config.get_shared_db_url()) -- cgit v1.2.3