diff options
| -rw-r--r-- | src/leap/soledad/__init__.py | 113 | ||||
| -rw-r--r-- | src/leap/soledad/config.py | 102 | ||||
| -rw-r--r-- | src/leap/soledad/tests/__init__.py | 18 | ||||
| -rw-r--r-- | src/leap/soledad/tests/test_crypto.py | 82 | ||||
| -rw-r--r-- | src/leap/soledad/tests/test_soledad.py | 143 | 
5 files changed, 349 insertions, 109 deletions
| diff --git a/src/leap/soledad/__init__.py b/src/leap/soledad/__init__.py index b636b744..38b1f772 100644 --- a/src/leap/soledad/__init__.py +++ b/src/leap/soledad/__init__.py @@ -38,8 +38,9 @@ except ImportError:      import json  # noqa -from leap.soledad.backends import sqlcipher +from leap.soledad.config import SoledadConfig  from leap.soledad.util import GPGWrapper +from leap.soledad.backends import sqlcipher  from leap.soledad.backends.leap_backend import (      LeapDocument,      DocumentNotEncrypted, @@ -75,27 +76,21 @@ class Soledad(object):      storing/fetching them on Soledad server.      """ -    # other configs      SECRET_LENGTH = 50 -    DEFAULT_CONF = { -        'gnupg_home': '%s/gnupg', -        'secret_path': '%s/secret.gpg', -        'local_db_path': '%s/soledad.u1db', -        'config_file': '%s/soledad.ini', -        'shared_db_url': '', -    } - -    def __init__(self, user, prefix=None, gnupg_home=None, +    """ +    The length of the secret used for symmetric encryption. +    """ + +    def __init__(self, user, config_path=None, gnupg_home=None,                   secret_path=None, local_db_path=None, -                 config_file=None, shared_db_url=None, auth_token=None, -                 bootstrap=True): +                 shared_db_url=None, auth_token=None, bootstrap=True):          """          Initialize configuration, cryptographic keys and dbs.          @param user: Email address of the user (username@provider).          @type user: str -        @param prefix: Path to use as prefix for files. -        @type prefix: str +        @param config_path: Path for configuration file. +        @type config_path: str          @param gnupg_home: Home directory for gnupg.          @type gnupg_home: str          @param secret_path: Path for storing gpg-encrypted key used for @@ -103,8 +98,6 @@ class Soledad(object):          @type secret_path: str          @param local_db_path: Path for local encrypted storage db.          @type local_db_path: str -        @param config_file: Path for configuration file. -        @type config_file: str          @param shared_db_url: URL for shared Soledad DB for key storage and              unauth retrieval.          @type shared_db_url: str @@ -118,11 +111,10 @@ class Soledad(object):          self._user = user          self._auth_token = auth_token          self._init_config( -            prefix=prefix, +            config_path=config_path,              gnupg_home=gnupg_home,              secret_path=secret_path,              local_db_path=local_db_path, -            config_file=config_file,              shared_db_url=shared_db_url,          )          if bootstrap: @@ -156,7 +148,15 @@ class Soledad(object):          # TODO: log each bootstrap step.          # Stage 0  - Local environment setup          self._init_dirs() -        self._gpg = GPGWrapper(gnupghome=self.gnupg_home) +        self._gpg = GPGWrapper(gnupghome=self._config.get_gnupg_home()) +        if self._config.get_shared_db_url() and self._auth_token: +            # TODO: eliminate need to create db here. +            self._shared_db = SoledadSharedDatabase.open_database( +                self._config.get_shared_db_url(), +                True, +                token=self._auth_token) +        else: +            self._shared_db = None          # Stage 1 - Keys generation/loading          if self._has_keys():              self._load_keys() @@ -170,50 +170,44 @@ class Soledad(object):                  self._set_symkey(self.decrypt(doc.content['_symkey']))          # Stage 2 - Keys synchronization          self._assert_server_keys() -        # Stage 3 -Database initialization +        # Stage 3 - Local database initialization          self._init_db() -        if self.shared_db_url: -            # TODO: eliminate need to create db here. -            self._shared_db = SoledadSharedDatabase.open_database( -                self.shared_db_url, -                True, -                token=auth_token) -        else: -            self._shared_db = None      def _init_config(self, **kwargs):          """ -        Initialize configuration, with precedence order give by: instance -        parameters > config file > default values. +        Initialize configuration using SoledadConfig. -        @param kwargs: a dictionary with parameter values passed when -            instantiating this Soledad instance. +        Soledad configuration makes use of BaseLeapConfig to load values from +        a file or from default configuration. Parameters passed as arguments +        for this method will supersede file and default values. + +        @param kwargs: a dictionary with configuration parameter values passed +            when instantiating this Soledad instance.          @type kwargs: dict          """ -        # TODO: write tests for _init_config() -        self.prefix = kwargs['prefix'] or \ -            os.environ['HOME'] + '/.config/leap/soledad' -        m = re.compile('.*%s.*') -        for key, default_value in self.DEFAULT_CONF.iteritems(): -            val = kwargs[key] or default_value -            if m.match(val): -                val = val % self.prefix -            setattr(self, key, val) -        # get config from file -        # TODO: sanitize options from config file. -        config = configparser.ConfigParser() -        config.read(self.config_file) -        if 'soledad-client' in config: -            for key in self.DEFAULT_CONF: -                if key in config['soledad-client'] and not kwargs[key]: -                    setattr(self, key, config['soledad-client'][key]) +        self._config = SoledadConfig() +        config_file = kwargs.get('config_path', None) +        if config_file is not None: +            self._config.load(path=config_file) +        else: +            self._config.load(data='') +        # overwrite config with passed parameters +        for param in ['gnupg_home', 'secret_path', 'local_db_path', +                      'shared_db_url']: +            if param in kwargs and kwargs[param] is not None: +                self._config._config_checker.config[param] = kwargs[param]      def _init_dirs(self):          """          Create work directories.          """ -        if not os.path.isdir(self.prefix): -            os.makedirs(self.prefix) +        paths = map( +            lambda x: os.path.dirname(x), +            [self._config.get_gnupg_home(), self._config.get_local_db_path(), +             self._config.get_secret_path()]) +        for path in paths: +            if not os.path.isdir(path): +                os.makedirs(path)      def _init_keys(self):          """ @@ -238,7 +232,7 @@ class Soledad(object):          # TODO: verify if secret for sqlcipher should be the same as the          # one for symmetric encryption.          self._db = sqlcipher.open( -            self.local_db_path, +            self._config.get_local_db_path(),              self._symkey,              create=True,              document_factory=LeapDocument, @@ -267,14 +261,14 @@ class Soledad(object):          @rtype: bool          """          # does the file exist in disk? -        if not os.path.isfile(self.secret_path): +        if not os.path.isfile(self._config.get_secret_path()):              return False          # is it asymmetrically encrypted? -        f = open(self.secret_path, 'r') +        f = open(self._config.get_secret_path(), 'r')          content = f.read()          if not self.is_encrypted_asym(content):              raise DocumentNotEncrypted( -                "File %s is not encrypted!" % self.secret_path) +                "File %s is not encrypted!" % self._config.get_secret_path())          # can we decrypt it?          fp = self._gpg.encrypted_to(content)['fingerprint']          if fp != self._fingerprint: @@ -291,10 +285,11 @@ class Soledad(object):              raise KeyDoesNotExist("Tried to load key for symmetric "                                    "encryption but it does not exist on disk.")          try: -            with open(self.secret_path) as f: +            with open(self._config.get_secret_path()) as f:                  self._symkey = str(self._gpg.decrypt(f.read()))          except IOError: -            raise IOError('Failed to open secret file %s.' % self.secret_path) +            raise IOError('Failed to open secret file %s.' % +                          self._config.get_secret_path())      def _gen_symkey(self):          """ @@ -323,7 +318,7 @@ class Soledad(object):      def _store_symkey(self):          ciphertext = self._gpg.encrypt(self._symkey, self._fingerprint,                                         self._fingerprint) -        f = open(self.secret_path, 'w') +        f = open(self._config.get_secret_path(), 'w')          f.write(str(ciphertext))          f.close() diff --git a/src/leap/soledad/config.py b/src/leap/soledad/config.py new file mode 100644 index 00000000..2b2d910c --- /dev/null +++ b/src/leap/soledad/config.py @@ -0,0 +1,102 @@ +# -*- coding: utf-8 -*- +# config.py +# Copyright (C) 2013 LEAP +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see <http://www.gnu.org/licenses/>. + + +""" +Management of configuration sources for Soledad. +""" + +import os +import logging + + +from leap.config.baseconfig import BaseConfig + + +logger = logging.getLogger(name=__name__) + + +PREFIX = os.environ['HOME'] + '/.config/leap/soledad' + + +soledad_config_spec = { +    'description': 'sample soledad config', +    'type': 'object', +    'properties': { +        'gnupg_home': { +            'type': unicode, +            'default': PREFIX + '/gnupg', +            'required': True, +        }, +        'secret_path': { +            'type': unicode, +            'default': PREFIX + '/secret.gpg', +            'required': True, +        }, +        'local_db_path': { +            #'type': unicode, +            'default': PREFIX + '/soledad.u1db', +            'required': True, +        }, +        'shared_db_url': { +            'type': unicode, +            'default': 'http://provider/soledad/shared', +            'required': True,  # should this be True? +        }, +    } +} + + +class SoledadConfig(BaseConfig): + +    def _get_spec(self): +        """ +        Returns the spec object for the specific configuration +        """ +        return soledad_config_spec + +    def get_gnupg_home(self): +        return self._safe_get_value("gnupg_home") + +    def get_secret_path(self): +        return self._safe_get_value("secret_path") + +    def get_local_db_path(self): +        return self._safe_get_value("local_db_path") + +    def get_shared_db_url(self): +        return self._safe_get_value("shared_db_url") + + +if __name__ == "__main__": +    logger = logging.getLogger(name='leap') +    logger.setLevel(logging.DEBUG) +    console = logging.StreamHandler() +    console.setLevel(logging.DEBUG) +    formatter = logging.Formatter( +        '%(asctime)s ' +        '- %(name)s - %(levelname)s - %(message)s') +    console.setFormatter(formatter) +    logger.addHandler(console) + +    soledadconfig = SoledadConfig() + +    try: +        soledadconfig.get_local_db_path() +    except Exception as e: +        assert isinstance(e, AssertionError), "Expected an assert" +        print "Safe value getting is working" diff --git a/src/leap/soledad/tests/__init__.py b/src/leap/soledad/tests/__init__.py index 8ceafe99..1dbbe8ab 100644 --- a/src/leap/soledad/tests/__init__.py +++ b/src/leap/soledad/tests/__init__.py @@ -31,9 +31,7 @@ class BaseSoledadTest(BaseLeapTest):          self._db2 = u1db.open(self.db2_file, create=True,                                document_factory=LeapDocument)          # initialize soledad by hand so we can control keys -        self._soledad = Soledad(self.email, gnupg_home=self.gnupg_home, -                                bootstrap=False, -                                prefix=self.tempdir) +        self._soledad = self._soledad_instance(user=self.email)          self._soledad._init_dirs()          self._soledad._gpg = GPGWrapper(gnupghome=self.gnupg_home)          #self._soledad._gpg.import_keys(PUBLIC_KEY) @@ -49,6 +47,20 @@ class BaseSoledadTest(BaseLeapTest):          self._db2.close()          self._soledad.close() +    def _soledad_instance(self, user='leap@leap.se', prefix='', +                          bootstrap=False, gnupg_home='/gnupg', +                          secret_path='/secret.gpg', +                          local_db_path='/soledad.u1db'): +        return Soledad( +            user, +            gnupg_home=self.tempdir+prefix+gnupg_home, +            secret_path=self.tempdir+prefix+secret_path, +            local_db_path=self.tempdir+prefix+local_db_path, +            bootstrap=bootstrap) + +    def _gpgwrapper_instance(self): +        return GPGWrapper(gnupghome="%s/gnupg" % self.tempdir) +  # Key material for testing  KEY_FINGERPRINT = "E36E738D69173C13D709E44F2F455E2824D18DDF" diff --git a/src/leap/soledad/tests/test_crypto.py b/src/leap/soledad/tests/test_crypto.py index fdecbeef..919ec88c 100644 --- a/src/leap/soledad/tests/test_crypto.py +++ b/src/leap/soledad/tests/test_crypto.py @@ -1,4 +1,29 @@ +# -*- coding: utf-8 -*- +# test_crypto.py +# Copyright (C) 2013 LEAP +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see <http://www.gnu.org/licenses/>. + + +""" +Tests for cryptographic related stuff. +""" + +  import os + +  from leap.common.testing.basetest import BaseLeapTest  from leap.soledad.backends.leap_backend import LeapDocument  from leap.soledad.tests import BaseSoledadTest @@ -6,10 +31,7 @@ from leap.soledad.tests import (      KEY_FINGERPRINT,      PRIVATE_KEY,  ) -from leap.soledad import ( -    Soledad, -    KeyAlreadyExists, -) +from leap.soledad import KeyAlreadyExists  from leap.soledad.util import GPGWrapper  try: @@ -93,8 +115,7 @@ class RecoveryDocumentTestCase(BaseSoledadTest):      def test_import_recovery_document_raw(self):          rd = self._soledad.export_recovery_document(None)          gnupg_home = self.gnupg_home = "%s/gnupg2" % self.tempdir -        s = Soledad('anotheruser@leap.se', gnupg_home=gnupg_home, -                    bootstrap=False, prefix=self.tempdir) +        s = self._soledad_instance(user='anotheruser@leap.se', prefix='/2')          s._init_dirs()          s._gpg = GPGWrapper(gnupghome=gnupg_home)          s.import_recovery_document(rd, None) @@ -119,8 +140,7 @@ class RecoveryDocumentTestCase(BaseSoledadTest):      def test_import_recovery_document_crypt(self):          rd = self._soledad.export_recovery_document('123456')          gnupg_home = self.gnupg_home = "%s/gnupg2" % self.tempdir -        s = Soledad('anotheruser@leap.se', gnupg_home=gnupg_home, -                    bootstrap=False, prefix=self.tempdir) +        s = self._soledad_instance(user='anotheruser@leap.se')          s._init_dirs()          s._gpg = GPGWrapper(gnupghome=gnupg_home)          s.import_recovery_document(rd, '123456') @@ -143,52 +163,19 @@ class RecoveryDocumentTestCase(BaseSoledadTest):          ) -class SoledadAuxMethods(BaseLeapTest): - -    def setUp(self): -        pass - -    def tearDown(self): -        pass - -    def _soledad_instance(self, prefix=None): -        return Soledad('leap@leap.se', bootstrap=False, -                       prefix=prefix or self.tempdir+'/soledad') - -    def _gpgwrapper_instance(self): -        return GPGWrapper(gnupghome="%s/gnupg" % self.tempdir) - -    def test__init_dirs(self): -        sol = self._soledad_instance() -        sol._init_dirs() -        self.assertTrue(os.path.isdir(sol.prefix)) - -    def test__init_db(self): -        sol = self._soledad_instance() -        sol._init_dirs() -        sol._gpg = self._gpgwrapper_instance() -        #self._soledad._gpg.import_keys(PUBLIC_KEY) -        if not sol._has_privkey(): -            sol._set_privkey(PRIVATE_KEY) -        if not sol._has_symkey(): -            sol._gen_symkey() -        sol._load_symkey() -        sol._init_db() -        from leap.soledad.backends.sqlcipher import SQLCipherDatabase -        self.assertIsInstance(sol._db, SQLCipherDatabase) +class CryptoMethodsTestCase(BaseSoledadTest):      def test__gen_privkey(self): -        sol = self._soledad_instance() +        sol = self._soledad_instance(user='user@leap.se', prefix='/4')          sol._init_dirs()          sol._gpg = GPGWrapper(gnupghome="%s/gnupg2" % self.tempdir)          self.assertFalse(sol._has_privkey(), 'Should not have a private key '                                               'at this point.') -        sol._set_privkey(PRIVATE_KEY) +        sol._gen_privkey()          self.assertTrue(sol._has_privkey(), 'Could not generate privkey.')      def test__gen_symkey(self): -        sol = Soledad('leap@leap.se', bootstrap=False, -                      prefix=self.tempdir+'/soledad3') +        sol = self._soledad_instance(user='user@leap.se', prefix='/3')          sol._init_dirs()          sol._gpg = GPGWrapper(gnupghome="%s/gnupg3" % self.tempdir)          if not sol._has_privkey(): @@ -199,11 +186,12 @@ class SoledadAuxMethods(BaseLeapTest):          self.assertTrue(sol._has_symkey(), "Could not generate symkey.")      def test__has_keys(self): -        sol = self._soledad_instance() +        sol = self._soledad_instance(user='leap@leap.se', prefix='/5')          sol._init_dirs() -        sol._gpg = self._gpgwrapper_instance() +        sol._gpg = GPGWrapper(gnupghome=self.tempdir+"/5/gnupg")          self.assertFalse(sol._has_keys())          sol._set_privkey(PRIVATE_KEY) +        sol._has_privkey()          self.assertFalse(sol._has_keys())          sol._gen_symkey()          self.assertTrue(sol._has_keys()) diff --git a/src/leap/soledad/tests/test_soledad.py b/src/leap/soledad/tests/test_soledad.py new file mode 100644 index 00000000..92d5182b --- /dev/null +++ b/src/leap/soledad/tests/test_soledad.py @@ -0,0 +1,143 @@ +# -*- coding: utf-8 -*- +# test_soledad.py +# Copyright (C) 2013 LEAP +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see <http://www.gnu.org/licenses/>. + + +""" +Tests for general Soledad functionality. +""" + + +import os +import re +import tempfile +try: +    import simplejson as json +except ImportError: +    import json  # noqa + + +from leap.soledad.tests import BaseSoledadTest +from leap.soledad import Soledad + + +class AuxMethodsTestCase(BaseSoledadTest): + +    def test__init_dirs(self): +        sol = self._soledad_instance(prefix='/_init_dirs') +        sol._init_dirs() +        local_db_dir = os.path.dirname(sol._config.get_local_db_path()) +        gnupg_home = os.path.dirname(sol._config.get_gnupg_home()) +        secret_path = os.path.dirname(sol._config.get_secret_path()) +        self.assertTrue(os.path.isdir(local_db_dir)) +        self.assertTrue(os.path.isdir(gnupg_home)) +        self.assertTrue(os.path.isdir(secret_path)) + +    def test__init_db(self): +        sol = self._soledad_instance() +        sol._init_dirs() +        sol._gpg = self._gpgwrapper_instance() +        #self._soledad._gpg.import_keys(PUBLIC_KEY) +        if not sol._has_privkey(): +            sol._set_privkey(PRIVATE_KEY) +        if not sol._has_symkey(): +            sol._gen_symkey() +        sol._load_symkey() +        sol._init_db() +        from leap.soledad.backends.sqlcipher import SQLCipherDatabase +        self.assertIsInstance(sol._db, SQLCipherDatabase) + +    def test__init_config_default(self): +        """ +        Test if configuration defaults point to the correct place. +        """ +        sol = Soledad(user='leap@leap.se', bootstrap=False) +        self.assertTrue(bool(re.match( +            '.*/\.config/leap/soledad/gnupg', sol._config.get_gnupg_home()))) +        self.assertTrue(bool(re.match( +            '.*/\.config/leap/soledad/secret.gpg', +            sol._config.get_secret_path()))) +        self.assertTrue(bool(re.match( +            '.*/\.config/leap/soledad/soledad.u1db', +            sol._config.get_local_db_path()))) +        self.assertEqual( +            'http://provider/soledad/shared', +            sol._config.get_shared_db_url()) + +    def test__init_config_defaults(self): +        """ +        Test if configuration defaults point to the correct place. +        """ +        # we use regexp match here because HOME environment variable is +        # changed by the BaseLeapTest class but BaseConfig does not capture +        # that change. +        sol = Soledad(user='leap@leap.se', bootstrap=False) +        self.assertTrue(bool(re.match( +            '.*/\.config/leap/soledad/gnupg', sol._config.get_gnupg_home()))) +        self.assertTrue(bool(re.match( +            '.*/\.config/leap/soledad/secret.gpg', +            sol._config.get_secret_path()))) +        self.assertTrue(bool(re.match( +            '.*/\.config/leap/soledad/soledad.u1db', +            sol._config.get_local_db_path()))) +        self.assertEqual( +            'http://provider/soledad/shared', +            sol._config.get_shared_db_url()) + +    def test__init_config_from_file(self): +        """ +        Test if configuration is correctly read from file. +        """ +        # we use regexp match here because HOME environment variable is +        # changed by the BaseLeapTest class but BaseConfig does not capture +        # that change. +        config_values = { +            "gnupg_home": "value_1", +            "secret_path": "value_2", +            "local_db_path": "value_3", +            "shared_db_url": "value_4" +        } +        tmpfile = tempfile.mktemp(dir=self.tempdir) +        f = open(tmpfile, 'w') +        f.write(json.dumps(config_values)) +        f.close() +        sol = Soledad( +            user='leap@leap.se', +            bootstrap=False, +            config_path=tmpfile) +        self.assertEqual('value_1', sol._config.get_gnupg_home()) +        self.assertEqual('value_2', sol._config.get_secret_path()) +        self.assertEqual('value_3', sol._config.get_local_db_path()) +        self.assertEqual('value_4', sol._config.get_shared_db_url()) + +    def test__init_config_from_params(self): +        """ +        Test if configuration is correctly read from file. +        """ +        # we use regexp match here because HOME environment variable is +        # changed by the BaseLeapTest class but BaseConfig does not capture +        # that change. +        sol = Soledad( +            user='leap@leap.se', +            bootstrap=False, +            gnupg_home='value_4', +            secret_path='value_3', +            local_db_path='value_2', +            shared_db_url='value_1') +        self.assertEqual('value_4', sol._config.get_gnupg_home()) +        self.assertEqual('value_3', sol._config.get_secret_path()) +        self.assertEqual('value_2', sol._config.get_local_db_path()) +        self.assertEqual('value_1', sol._config.get_shared_db_url()) | 
