From 6da8d09846db4d2eed01e488bc6a6f5ba48b959f Mon Sep 17 00:00:00 2001 From: Kali Kaneko Date: Mon, 12 Aug 2013 13:25:44 +0200 Subject: move everything into bitmask namespace --- src/leap/bitmask/config/__init__.py | 0 src/leap/bitmask/config/leapsettings.py | 253 +++++++++++++++++++ src/leap/bitmask/config/provider_spec.py | 105 ++++++++ src/leap/bitmask/config/providerconfig.py | 218 ++++++++++++++++ .../bitmask/config/tests/test_providerconfig.py | 279 +++++++++++++++++++++ 5 files changed, 855 insertions(+) create mode 100644 src/leap/bitmask/config/__init__.py create mode 100644 src/leap/bitmask/config/leapsettings.py create mode 100644 src/leap/bitmask/config/provider_spec.py create mode 100644 src/leap/bitmask/config/providerconfig.py create mode 100644 src/leap/bitmask/config/tests/test_providerconfig.py (limited to 'src/leap/bitmask/config') diff --git a/src/leap/bitmask/config/__init__.py b/src/leap/bitmask/config/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/src/leap/bitmask/config/leapsettings.py b/src/leap/bitmask/config/leapsettings.py new file mode 100644 index 00000000..35010280 --- /dev/null +++ b/src/leap/bitmask/config/leapsettings.py @@ -0,0 +1,253 @@ +# -*- coding: utf-8 -*- +# leapsettings.py +# Copyright (C) 2013 LEAP +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see . + +""" +QSettings abstraction +""" +import os +import logging + +from PySide import QtCore + +from leap.common.check import leap_assert, leap_assert_type +from leap.common.config.prefixers import get_platform_prefixer + +logger = logging.getLogger(__name__) + + +def to_bool(val): + """ + Returns the boolean value corresponding to val. Will return False + in case val is not a string or something that behaves like one. + + :param val: value to cast + :type val: either bool already or str + + :rtype: bool + """ + if isinstance(val, bool): + return val + + bool_val = False + try: + bool_val = val.lower() == "true" + except: + pass + + return bool_val + + +class LeapSettings(object): + """ + Leap client QSettings wrapper + """ + + CONFIG_NAME = "leap.conf" + + # keys + GEOMETRY_KEY = "Geometry" + WINDOWSTATE_KEY = "WindowState" + USER_KEY = "User" + PROPERPROVIDER_KEY = "ProperProvider" + REMEMBER_KEY = "RememberUserAndPass" + DEFAULTPROVIDER_KEY = "DefaultProvider" + ALERTMISSING_KEY = "AlertMissingScripts" + + def __init__(self, standalone=False): + """ + Constructor + + :param standalone: parameter used to define the location of + the config + :type standalone: bool + """ + + settings_path = os.path.join(get_platform_prefixer() + .get_path_prefix(standalone=standalone), + "leap", + self.CONFIG_NAME) + self._settings = QtCore.QSettings(settings_path, + QtCore.QSettings.IniFormat) + + def get_geometry(self): + """ + Returns the saved geometry or None if it wasn't saved + + :rtype: bytearray or None + """ + return self._settings.value(self.GEOMETRY_KEY, None) + + def set_geometry(self, geometry): + """ + Saves the geometry to the settings + + :param geometry: bytearray representing the geometry + :type geometry: bytearray + """ + leap_assert(geometry, "We need a geometry") + self._settings.setValue(self.GEOMETRY_KEY, geometry) + + def get_windowstate(self): + """ + Returns the window state or None if it wasn't saved + + :rtype: bytearray or None + """ + return self._settings.value(self.WINDOWSTATE_KEY, None) + + def set_windowstate(self, windowstate): + """ + Saves the window state to the settings + + :param windowstate: bytearray representing the window state + :type windowstate: bytearray + """ + leap_assert(windowstate, "We need a window state") + self._settings.setValue(self.WINDOWSTATE_KEY, windowstate) + + def get_enabled_services(self, provider): + """ + Returns a list of enabled services for the given provider + + :param provider: provider domain + :type provider: str + + :rtype: list of str + """ + + leap_assert(len(provider) > 0, "We need a nonempty provider") + enabled_services = self._settings.value("%s/Services" % (provider,), + []) + if isinstance(enabled_services, (str, unicode)): + enabled_services = enabled_services.split(",") + + return enabled_services + + def set_enabled_services(self, provider, services): + """ + Saves the list of enabled services for the given provider + + :param provider: provider domain + :type provider: str + + :param services: list of services to save + :type services: list of str + """ + + leap_assert(len(provider) > 0, "We need a nonempty provider") + leap_assert_type(services, list) + + self._settings.setValue("%s/Services" % (provider,), + services) + + def get_user(self): + """ + Returns the configured user to remember, None if there isn't one + + :rtype: str or None + """ + return self._settings.value(self.USER_KEY, None) + + def set_user(self, user): + """ + Saves the user to remember + + :param user: user name to remember + :type user: str + """ + leap_assert(len(user) > 0, "We cannot save an empty user") + self._settings.setValue(self.USER_KEY, user) + + def get_remember(self): + """ + Returns the value of the remember selection. + + :rtype: bool + """ + return to_bool(self._settings.value(self.REMEMBER_KEY, False)) + + def set_remember(self, remember): + """ + Sets wheter the app should remember username and password + + :param remember: True if the app should remember username and + password, False otherwise + :rtype: bool + """ + leap_assert_type(remember, bool) + self._settings.setValue(self.REMEMBER_KEY, remember) + + # TODO: make this scale with multiple providers, we are assuming + # just one for now + def get_properprovider(self): + """ + Returns True if there is a properly configured provider. + + .. note:: this assumes only one provider for now. + + :rtype: bool + """ + return to_bool(self._settings.value(self.PROPERPROVIDER_KEY, False)) + + def set_properprovider(self, properprovider): + """ + Sets whether the app should automatically login. + + :param properprovider: True if the provider is properly configured, + False otherwise. + :type properprovider: bool + """ + leap_assert_type(properprovider, bool) + self._settings.setValue(self.PROPERPROVIDER_KEY, properprovider) + + def get_defaultprovider(self): + """ + Returns the default provider to be used for autostarting EIP + + :rtype: str or None + """ + return self._settings.value(self.DEFAULTPROVIDER_KEY, None) + + def set_defaultprovider(self, provider): + """ + Sets the default provider to be used for autostarting EIP + + :param provider: provider to use + :type provider: str or None + """ + if provider is None: + self._settings.remove(self.DEFAULTPROVIDER_KEY) + else: + self._settings.setValue(self.DEFAULTPROVIDER_KEY, provider) + + def get_alert_missing_scripts(self): + """ + Returns the setting for alerting of missing up/down scripts. + + :rtype: bool + """ + return to_bool(self._settings.value(self.ALERTMISSING_KEY, True)) + + def set_alert_missing_scripts(self, value): + """ + Sets the setting for alerting of missing up/down scripts. + + :param value: the value to set + :type value: bool + """ + leap_assert_type(value, bool) + self._settings.setValue(self.ALERTMISSING_KEY, value) diff --git a/src/leap/bitmask/config/provider_spec.py b/src/leap/bitmask/config/provider_spec.py new file mode 100644 index 00000000..cf942c7b --- /dev/null +++ b/src/leap/bitmask/config/provider_spec.py @@ -0,0 +1,105 @@ +# -*- coding: utf-8 -*- +# provider_spec.py +# Copyright (C) 2013 LEAP +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see . + +leap_provider_spec = { + 'description': 'provider definition', + 'type': 'object', + 'properties': { + 'version': { + 'type': unicode, + 'default': '0.1.0' + }, + "default_language": { + 'type': unicode, + 'default': 'en' + }, + 'domain': { + 'type': unicode, # XXX define uri type + 'default': 'testprovider.example.org' + }, + 'name': { + 'type': dict, + 'format': 'translatable', + 'default': {u'en': u'Test Provider'} + }, + 'description': { + #'type': LEAPTranslatable, + 'type': dict, + 'format': 'translatable', + 'default': {u'en': u'Test provider'} + }, + 'enrollment_policy': { + 'type': unicode, # oneof ?? + 'default': 'open' + }, + 'services': { + 'type': list, # oneof ?? + 'default': ['eip'] + }, + 'api_version': { + 'type': unicode, + 'default': '0.1.0' # version regexp + }, + 'api_uri': { + 'type': unicode # uri + }, + 'public_key': { + 'type': unicode # fingerprint + }, + 'ca_cert_fingerprint': { + 'type': unicode, + }, + 'ca_cert_uri': { + 'type': unicode, + 'format': 'https-uri' + }, + 'languages': { + 'type': list, + 'default': ['en'] + }, + 'service': { + 'levels': { + 'type': list + }, + 'default_service_level': { + 'type': int, + 'default': 1 + }, + 'allow_free': { + 'type': unicode + }, + 'allow_paid': { + 'type': unicode + }, + 'allow_anonymous': { + 'type': unicode + }, + 'allow_registration': { + 'type': unicode + }, + 'bandwidth_limit': { + 'type': int + }, + 'allow_limited_bandwidth': { + 'type': unicode + }, + 'allow_unlimited_bandwidth': { + 'type': unicode + } + } + } +} diff --git a/src/leap/bitmask/config/providerconfig.py b/src/leap/bitmask/config/providerconfig.py new file mode 100644 index 00000000..f899b17c --- /dev/null +++ b/src/leap/bitmask/config/providerconfig.py @@ -0,0 +1,218 @@ +# -*- coding: utf-8 -*- +# providerconfig.py +# Copyright (C) 2013 LEAP +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see . + +""" +Provider configuration +""" +import logging +import os + +from leap.common.check import leap_check +from leap.common.config.baseconfig import BaseConfig, LocalizedKey +from leap.config.provider_spec import leap_provider_spec + +logger = logging.getLogger(__name__) + + +class MissingCACert(Exception): + """ + Raised when a CA certificate is needed but not found. + """ + pass + + +class ProviderConfig(BaseConfig): + """ + Provider configuration abstraction class + """ + def __init__(self): + BaseConfig.__init__(self) + + def _get_schema(self): + """ + Returns the schema corresponding to the version given. + + :rtype: dict or None if the version is not supported. + """ + return leap_provider_spec + + def _get_spec(self): + """ + Returns the spec object for the specific configuration. + + Override the BaseConfig one because we do not support multiple schemas + for the provider yet. + + :rtype: dict or None if the version is not supported. + """ + return self._get_schema() + + def get_api_uri(self): + return self._safe_get_value("api_uri") + + def get_api_version(self): + return self._safe_get_value("api_version") + + def get_ca_cert_fingerprint(self): + return self._safe_get_value("ca_cert_fingerprint") + + def get_ca_cert_uri(self): + return self._safe_get_value("ca_cert_uri") + + def get_default_language(self): + return self._safe_get_value("default_language") + + @LocalizedKey + def get_description(self): + return self._safe_get_value("description") + + @classmethod + def sanitize_path_component(cls, component): + """ + If the provider tries to instrument the component of a path + that is controlled by them, this will take care of + removing/escaping all the necessary elements. + + :param component: Path component to process + :type component: unicode or str + + :returns: The path component properly escaped + :rtype: unicode or str + """ + # TODO: Fix for windows, names like "aux" or "con" aren't + # allowed. + return component.replace(os.path.sep, "") + + def get_domain(self): + return ProviderConfig.sanitize_path_component( + self._safe_get_value("domain")) + + def get_enrollment_policy(self): + """ + Returns the enrollment policy + + :rtype: string + """ + return self._safe_get_value("enrollment_policy") + + def get_languages(self): + return self._safe_get_value("languages") + + @LocalizedKey + def get_name(self): + return self._safe_get_value("name") + + def get_services(self): + """ + Returns a list with the available services in the current provider. + + :rtype: list + """ + services = self._safe_get_value("services") + return services + + def get_services_string(self): + """ + Returns a string with the available services in the current + provider, ready to be shown to the user. + """ + services_str = ", ".join(self.get_services()) + services_str = services_str.replace( + "openvpn", "Encrypted Internet") + return services_str + + def get_ca_cert_path(self, about_to_download=False): + """ + Returns the path to the certificate for the current provider. + It may raise MissingCACert if + the certificate does not exists and not about_to_download + + :param about_to_download: defines wether we want the path to + download the cert or not. This helps avoid + checking if the cert exists because we + are about to write it. + :type about_to_download: bool + """ + + cert_path = os.path.join(self.get_path_prefix(), + "leap", + "providers", + self.get_domain(), + "keys", + "ca", + "cacert.pem") + + if not about_to_download: + cert_exists = os.path.exists(cert_path) + error_msg = "You need to download the certificate first" + leap_check(cert_exists, error_msg, MissingCACert) + logger.debug("Going to verify SSL against %s" % (cert_path,)) + + return cert_path + + def provides_eip(self): + """ + Returns True if this particular provider has the EIP service, + False otherwise. + + :rtype: bool + """ + return "openvpn" in self.get_services() + + def provides_mx(self): + """ + Returns True if this particular provider has the MX service, + False otherwise. + + :rtype: bool + """ + return "mx" in self.get_services() + + +if __name__ == "__main__": + logger = logging.getLogger(name='leap') + logger.setLevel(logging.DEBUG) + console = logging.StreamHandler() + console.setLevel(logging.DEBUG) + formatter = logging.Formatter( + '%(asctime)s ' + '- %(name)s - %(levelname)s - %(message)s') + console.setFormatter(formatter) + logger.addHandler(console) + + provider = ProviderConfig() + + try: + provider.get_api_version() + except Exception as e: + assert isinstance(e, AssertionError), "Expected an assert" + print "Safe value getting is working" + + # standalone minitest + #if provider.load("provider_bad.json"): + if provider.load("leap/providers/bitmask.net/provider.json"): + print provider.get_api_version() + print provider.get_ca_cert_fingerprint() + print provider.get_ca_cert_uri() + print provider.get_default_language() + print provider.get_description() + print provider.get_description(lang="asd") + print provider.get_domain() + print provider.get_enrollment_policy() + print provider.get_languages() + print provider.get_name() + print provider.get_services() diff --git a/src/leap/bitmask/config/tests/test_providerconfig.py b/src/leap/bitmask/config/tests/test_providerconfig.py new file mode 100644 index 00000000..ff2828e6 --- /dev/null +++ b/src/leap/bitmask/config/tests/test_providerconfig.py @@ -0,0 +1,279 @@ +# -*- coding: utf-8 -*- +# test_providerconfig.py +# Copyright (C) 2013 LEAP +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see . +""" +Tests for providerconfig +""" + +try: + import unittest2 as unittest +except ImportError: + import unittest + +import os +import json +import copy + +from leap.common.testing.basetest import BaseLeapTest +from leap.config.providerconfig import ProviderConfig, MissingCACert +from leap.services import get_supported + +from mock import Mock + + +sample_config = { + "api_uri": "https://api.test.bitmask.net:4430", + "api_version": "1", + "ca_cert_fingerprint": + "SHA256: 0f17c033115f6b76ff67871872303ff65034efe7dd1b910062ca323eb4da5c7e", + "ca_cert_uri": "https://test.bitmask.net/ca.crt", + "default_language": "en", + "description": { + "en": "Test description for provider", + "es": "Descripcion de prueba para el proveedor" + }, + "domain": "test.bitmask.net", + "enrollment_policy": "open", + "languages": [ + "en", + "es" + ], + "name": { + "en": "Bitmask testing environment", + "es": "Entorno de pruebas de Bitmask" + }, + "service": { + "allow_anonymous": True, + "allow_free": True, + "allow_limited_bandwidth": True, + "allow_paid": False, + "allow_registration": True, + "allow_unlimited_bandwidth": False, + "bandwidth_limit": 400000, + "default_service_level": 1, + "levels": [ + { + "bandwidth": "limited", + "id": 1, + "name": "anonymous" + }, + { + "bandwidth": "limited", + "id": 2, + "name": "free", + "storage": 50 + } + ] + }, + "services": [ + "openvpn" + ] +} + + +class ProviderConfigTest(BaseLeapTest): + """Tests for ProviderConfig""" + + def setUp(self): + self._provider_config = ProviderConfig() + json_string = json.dumps(sample_config) + self._provider_config.load(data=json_string) + + # At certain points we are going to be replacing these method + # to avoid creating a file. + # We need to save the old implementation and restore it in + # tearDown so we are sure everything is as expected for each + # test. If we do it inside each specific test, a failure in + # the test will leave the implementation with the mock. + self._old_ospath_exists = os.path.exists + + def tearDown(self): + os.path.exists = self._old_ospath_exists + + def test_configs_ok(self): + """ + Test if the configs loads ok + """ + # TODO: this test should go to the BaseConfig tests + pc = self._provider_config + self.assertEqual(pc.get_api_uri(), sample_config['api_uri']) + self.assertEqual(pc.get_api_version(), sample_config['api_version']) + self.assertEqual(pc.get_ca_cert_fingerprint(), + sample_config['ca_cert_fingerprint']) + self.assertEqual(pc.get_ca_cert_uri(), sample_config['ca_cert_uri']) + self.assertEqual(pc.get_default_language(), + sample_config['default_language']) + + self.assertEqual(pc.get_domain(), sample_config['domain']) + self.assertEqual(pc.get_enrollment_policy(), + sample_config['enrollment_policy']) + self.assertEqual(pc.get_languages(), sample_config['languages']) + + def test_localizations(self): + pc = self._provider_config + + self.assertEqual(pc.get_description(lang='en'), + sample_config['description']['en']) + self.assertEqual(pc.get_description(lang='es'), + sample_config['description']['es']) + + self.assertEqual(pc.get_name(lang='en'), sample_config['name']['en']) + self.assertEqual(pc.get_name(lang='es'), sample_config['name']['es']) + + def _localize(self, lang): + """ + Helper to change default language of the provider config. + """ + pc = self._provider_config + config = copy.deepcopy(sample_config) + config['default_language'] = lang + json_string = json.dumps(config) + pc.load(data=json_string) + + return config + + def test_default_localization1(self): + pc = self._provider_config + config = self._localize(sample_config['languages'][0]) + + default_language = config['default_language'] + default_description = config['description'][default_language] + default_name = config['name'][default_language] + + self.assertEqual(pc.get_description(lang='xx'), default_description) + self.assertEqual(pc.get_description(), default_description) + + self.assertEqual(pc.get_name(lang='xx'), default_name) + self.assertEqual(pc.get_name(), default_name) + + def test_default_localization2(self): + pc = self._provider_config + config = self._localize(sample_config['languages'][1]) + + default_language = config['default_language'] + default_description = config['description'][default_language] + default_name = config['name'][default_language] + + self.assertEqual(pc.get_description(lang='xx'), default_description) + self.assertEqual(pc.get_description(), default_description) + + self.assertEqual(pc.get_name(lang='xx'), default_name) + self.assertEqual(pc.get_name(), default_name) + + def test_get_ca_cert_path_as_expected(self): + pc = self._provider_config + pc.get_path_prefix = Mock(return_value='test') + + provider_domain = sample_config['domain'] + expected_path = os.path.join('test', 'leap', 'providers', + provider_domain, 'keys', 'ca', + 'cacert.pem') + + # mock 'os.path.exists' so we don't get an error for unexisting file + os.path.exists = Mock(return_value=True) + cert_path = pc.get_ca_cert_path() + + self.assertEqual(cert_path, expected_path) + + def test_get_ca_cert_path_about_to_download(self): + pc = self._provider_config + pc.get_path_prefix = Mock(return_value='test') + + provider_domain = sample_config['domain'] + expected_path = os.path.join('test', 'leap', 'providers', + provider_domain, 'keys', 'ca', + 'cacert.pem') + + cert_path = pc.get_ca_cert_path(about_to_download=True) + + self.assertEqual(cert_path, expected_path) + + def test_get_ca_cert_path_fails(self): + pc = self._provider_config + pc.get_path_prefix = Mock(return_value='test') + + # mock 'get_domain' so we don't need to load a config + provider_domain = 'test.provider.com' + pc.get_domain = Mock(return_value=provider_domain) + + with self.assertRaises(MissingCACert): + pc.get_ca_cert_path() + + def test_provides_eip(self): + pc = self._provider_config + config = copy.deepcopy(sample_config) + + # It provides + config['services'] = ['openvpn', 'test_service'] + json_string = json.dumps(config) + pc.load(data=json_string) + self.assertTrue(pc.provides_eip()) + + # It does not provides + config['services'] = ['test_service', 'other_service'] + json_string = json.dumps(config) + pc.load(data=json_string) + self.assertFalse(pc.provides_eip()) + + def test_provides_mx(self): + pc = self._provider_config + config = copy.deepcopy(sample_config) + + # It provides + config['services'] = ['mx', 'other_service'] + json_string = json.dumps(config) + pc.load(data=json_string) + self.assertTrue(pc.provides_mx()) + + # It does not provides + config['services'] = ['test_service', 'other_service'] + json_string = json.dumps(config) + pc.load(data=json_string) + self.assertFalse(pc.provides_mx()) + + def test_supports_unknown_service(self): + pc = self._provider_config + config = copy.deepcopy(sample_config) + + config['services'] = ['unknown'] + json_string = json.dumps(config) + pc.load(data=json_string) + self.assertFalse('unknown' in get_supported(pc.get_services())) + + def test_provides_unknown_service(self): + pc = self._provider_config + config = copy.deepcopy(sample_config) + + config['services'] = ['unknown'] + json_string = json.dumps(config) + pc.load(data=json_string) + self.assertTrue('unknown' in pc.get_services()) + + def test_get_services_string(self): + pc = self._provider_config + config = copy.deepcopy(sample_config) + config['services'] = [ + 'openvpn', 'asdf', 'openvpn', 'not_supported_service'] + json_string = json.dumps(config) + pc.load(data=json_string) + + self.assertEqual(pc.get_services_string(), + "Encrypted Internet, asdf, Encrypted Internet," + " not_supported_service") + + +if __name__ == "__main__": + unittest.main() -- cgit v1.2.3