diff options
| -rw-r--r-- | src/leap/services/eip/eipconfig.py | 27 | ||||
| -rw-r--r-- | src/leap/services/eip/tests/__init__.py | 0 | ||||
| -rw-r--r-- | src/leap/services/eip/tests/test_eipconfig.py | 174 | 
3 files changed, 199 insertions, 2 deletions
diff --git a/src/leap/services/eip/eipconfig.py b/src/leap/services/eip/eipconfig.py index 4e74687a..baf26bca 100644 --- a/src/leap/services/eip/eipconfig.py +++ b/src/leap/services/eip/eipconfig.py @@ -18,8 +18,9 @@  """  Provider configuration  """ -import os  import logging +import os +import re  from leap.common.check import leap_assert, leap_assert_type  from leap.common.config.baseconfig import BaseConfig @@ -33,6 +34,8 @@ class EIPConfig(BaseConfig):      """      Provider configuration abstraction class      """ +    OPENVPN_ALLOWED_KEYS = ("auth", "cipher", "tls-cipher") +    OPENVPN_CIPHERS_REGEX = re.compile("[A-Z0-9\-]+")      def __init__(self):          BaseConfig.__init__(self) @@ -52,7 +55,24 @@ class EIPConfig(BaseConfig):          return self._safe_get_value("gateways")      def get_openvpn_configuration(self): -        return self._safe_get_value("openvpn_configuration") +        """ +        Returns a dictionary containing the openvpn configuration +        parameters. + +        These are sanitized with alphanumeric whitelist. + +        @returns: openvpn configuration dict +        @rtype: C{dict} +        """ +        ovpncfg = self._safe_get_value("openvpn_configuration") +        config = {} +        for key, value in ovpncfg.items(): +            if key in self.OPENVPN_ALLOWED_KEYS and value is not None: +                sanitized_val = self.OPENVPN_CIPHERS_REGEX.findall(value) +                if len(sanitized_val) != 0: +                    _val = sanitized_val[0] +                    config[str(key)] = str(_val) +        return config      def get_serial(self):          return self._safe_get_value("serial") @@ -61,6 +81,9 @@ class EIPConfig(BaseConfig):          return self._safe_get_value("version")      def get_gateway_ip(self, index=0): +        """ +        Returns the ip of the gateway +        """          gateways = self.get_gateways()          leap_assert(len(gateways) > 0, "We don't have any gateway!")          if index > len(gateways): diff --git a/src/leap/services/eip/tests/__init__.py b/src/leap/services/eip/tests/__init__.py new file mode 100644 index 00000000..e69de29b --- /dev/null +++ b/src/leap/services/eip/tests/__init__.py diff --git a/src/leap/services/eip/tests/test_eipconfig.py b/src/leap/services/eip/tests/test_eipconfig.py new file mode 100644 index 00000000..1675472f --- /dev/null +++ b/src/leap/services/eip/tests/test_eipconfig.py @@ -0,0 +1,174 @@ +# -*- coding: utf-8 -*- +# test_eipconfig.py +# Copyright (C) 2013 LEAP +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program.  If not, see <http://www.gnu.org/licenses/>. +""" +tests for eipconfig +""" +import copy +import json +import os +import unittest + +from leap.common.testing.basetest import BaseLeapTest +from leap.services.eip.eipconfig import EIPConfig + + +sample_config = { +    "gateways": [ +    { +        "capabilities": { +            "adblock": False, +            "filter_dns": True, +            "limited": True, +            "ports": [ +            "1194", +            "443", +            "53", +            "80" +            ], +        "protocols": [ +            "tcp", +            "udp"], +        "transport": [ +            "openvpn"], +        "user_ips": False}, +    "host": "host.dev.example.org", +    "ip_address": "11.22.33.44", +    "location": "cyberspace" +    }], +    "locations": { +        "ankara": { +        "country_code": "XX", +        "hemisphere": "S", +        "name": "Antarctica", +        "timezone": "+2" +        } +    }, +    "openvpn_configuration": { +        "auth": "SHA1", +        "cipher": "AES-128-CBC", +        "tls-cipher": "DHE-RSA-AES128-SHA" +    }, +    "serial": 1, +    "version": 1 +} + + +class EIPConfigTest(BaseLeapTest): + +    __name__ = "eip_config_tests" +    #provider = "testprovider.example.org" + +    maxDiff = None + +    def setUp(self): +        pass + +    def tearDown(self): +        pass + +    # +    # helpers +    # + +    def write_config(self, data): +        self.configfile = os.path.join( +            self.tempdir, "eipconfig.json") +        conf = open(self.configfile, "w") +        conf.write(json.dumps(data)) +        conf.close() + +    def test_load_valid_config(self): +        """ +        load a sample config +        """ +        self.write_config(sample_config) +        config = EIPConfig() +        self.assertRaises( +            AssertionError, +            config.get_clusters) +        self.assertTrue(config.load(self.configfile)) +        self.assertEqual( +            config.get_openvpn_configuration(), +            sample_config["openvpn_configuration"]) +        self.assertEqual( +            config.get_gateway_ip(), +            "11.22.33.44") +        self.assertEqual(config.get_version(), 1) +        self.assertEqual(config.get_serial(), 1) +        self.assertEqual(config.get_gateways(), +                         sample_config["gateways"]) +        self.assertEqual( +            config.get_clusters(), None) + +    def test_openvpnoptions(self): +        """ +        check the sanitization of openvpn options +        """ +        # extra parameters +        data = copy.deepcopy(sample_config) +        data['openvpn_configuration']["extra_param"] = "FOO" +        self.write_config(data) +        config = EIPConfig() +        config.load(self.configfile) +        self.assertEqual( +            config.get_openvpn_configuration(), +            sample_config["openvpn_configuration"]) + +        # non allowed chars +        data = copy.deepcopy(sample_config) +        data['openvpn_configuration']["auth"] = "SHA1;" +        self.write_config(data) +        config = EIPConfig() +        config.load(self.configfile) +        self.assertEqual( +            config.get_openvpn_configuration(), +            sample_config["openvpn_configuration"]) + +        # non allowed chars +        data = copy.deepcopy(sample_config) +        data['openvpn_configuration']["auth"] = "SHA1>`&|" +        self.write_config(data) +        config = EIPConfig() +        config.load(self.configfile) +        self.assertEqual( +            config.get_openvpn_configuration(), +            sample_config["openvpn_configuration"]) + +        # lowercase +        data = copy.deepcopy(sample_config) +        data['openvpn_configuration']["auth"] = "shaSHA1" +        self.write_config(data) +        config = EIPConfig() +        config.load(self.configfile) +        self.assertEqual( +            config.get_openvpn_configuration(), +            sample_config["openvpn_configuration"]) + +        # all characters invalid -> null value +        data = copy.deepcopy(sample_config) +        data['openvpn_configuration']["auth"] = "sha&*!@#;" +        self.write_config(data) +        config = EIPConfig() +        config.load(self.configfile) +        self.assertEqual( +            config.get_openvpn_configuration(), +            {'cipher': 'AES-128-CBC', +             'tls-cipher': 'DHE-RSA-AES128-SHA'}) + + +if __name__ == "__main__": +    unittest.main()  | 
