From 21b57bfd059ff32201c3403bd5ecc00d4b7d3aed Mon Sep 17 00:00:00 2001 From: kali Date: Wed, 1 May 2013 04:11:26 +0900 Subject: whitelist openvpn cipher parameters --- src/leap/services/eip/tests/test_eipconfig.py | 174 ++++++++++++++++++++++++++ 1 file changed, 174 insertions(+) create mode 100644 src/leap/services/eip/tests/test_eipconfig.py (limited to 'src/leap/services/eip/tests/test_eipconfig.py') diff --git a/src/leap/services/eip/tests/test_eipconfig.py b/src/leap/services/eip/tests/test_eipconfig.py new file mode 100644 index 00000000..1675472f --- /dev/null +++ b/src/leap/services/eip/tests/test_eipconfig.py @@ -0,0 +1,174 @@ +# -*- coding: utf-8 -*- +# test_eipconfig.py +# Copyright (C) 2013 LEAP +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see . +""" +tests for eipconfig +""" +import copy +import json +import os +import unittest + +from leap.common.testing.basetest import BaseLeapTest +from leap.services.eip.eipconfig import EIPConfig + + +sample_config = { + "gateways": [ + { + "capabilities": { + "adblock": False, + "filter_dns": True, + "limited": True, + "ports": [ + "1194", + "443", + "53", + "80" + ], + "protocols": [ + "tcp", + "udp"], + "transport": [ + "openvpn"], + "user_ips": False}, + "host": "host.dev.example.org", + "ip_address": "11.22.33.44", + "location": "cyberspace" + }], + "locations": { + "ankara": { + "country_code": "XX", + "hemisphere": "S", + "name": "Antarctica", + "timezone": "+2" + } + }, + "openvpn_configuration": { + "auth": "SHA1", + "cipher": "AES-128-CBC", + "tls-cipher": "DHE-RSA-AES128-SHA" + }, + "serial": 1, + "version": 1 +} + + +class EIPConfigTest(BaseLeapTest): + + __name__ = "eip_config_tests" + #provider = "testprovider.example.org" + + maxDiff = None + + def setUp(self): + pass + + def tearDown(self): + pass + + # + # helpers + # + + def write_config(self, data): + self.configfile = os.path.join( + self.tempdir, "eipconfig.json") + conf = open(self.configfile, "w") + conf.write(json.dumps(data)) + conf.close() + + def test_load_valid_config(self): + """ + load a sample config + """ + self.write_config(sample_config) + config = EIPConfig() + self.assertRaises( + AssertionError, + config.get_clusters) + self.assertTrue(config.load(self.configfile)) + self.assertEqual( + config.get_openvpn_configuration(), + sample_config["openvpn_configuration"]) + self.assertEqual( + config.get_gateway_ip(), + "11.22.33.44") + self.assertEqual(config.get_version(), 1) + self.assertEqual(config.get_serial(), 1) + self.assertEqual(config.get_gateways(), + sample_config["gateways"]) + self.assertEqual( + config.get_clusters(), None) + + def test_openvpnoptions(self): + """ + check the sanitization of openvpn options + """ + # extra parameters + data = copy.deepcopy(sample_config) + data['openvpn_configuration']["extra_param"] = "FOO" + self.write_config(data) + config = EIPConfig() + config.load(self.configfile) + self.assertEqual( + config.get_openvpn_configuration(), + sample_config["openvpn_configuration"]) + + # non allowed chars + data = copy.deepcopy(sample_config) + data['openvpn_configuration']["auth"] = "SHA1;" + self.write_config(data) + config = EIPConfig() + config.load(self.configfile) + self.assertEqual( + config.get_openvpn_configuration(), + sample_config["openvpn_configuration"]) + + # non allowed chars + data = copy.deepcopy(sample_config) + data['openvpn_configuration']["auth"] = "SHA1>`&|" + self.write_config(data) + config = EIPConfig() + config.load(self.configfile) + self.assertEqual( + config.get_openvpn_configuration(), + sample_config["openvpn_configuration"]) + + # lowercase + data = copy.deepcopy(sample_config) + data['openvpn_configuration']["auth"] = "shaSHA1" + self.write_config(data) + config = EIPConfig() + config.load(self.configfile) + self.assertEqual( + config.get_openvpn_configuration(), + sample_config["openvpn_configuration"]) + + # all characters invalid -> null value + data = copy.deepcopy(sample_config) + data['openvpn_configuration']["auth"] = "sha&*!@#;" + self.write_config(data) + config = EIPConfig() + config.load(self.configfile) + self.assertEqual( + config.get_openvpn_configuration(), + {'cipher': 'AES-128-CBC', + 'tls-cipher': 'DHE-RSA-AES128-SHA'}) + + +if __name__ == "__main__": + unittest.main() -- cgit v1.2.3 From 544717da3e95a553fa2af8555df6b4e06d9e5af2 Mon Sep 17 00:00:00 2001 From: kali Date: Wed, 1 May 2013 04:41:11 +0900 Subject: sanitize ip address --- src/leap/services/eip/tests/test_eipconfig.py | 22 ++++++++++++++++++++-- 1 file changed, 20 insertions(+), 2 deletions(-) (limited to 'src/leap/services/eip/tests/test_eipconfig.py') diff --git a/src/leap/services/eip/tests/test_eipconfig.py b/src/leap/services/eip/tests/test_eipconfig.py index 1675472f..ce04c2fc 100644 --- a/src/leap/services/eip/tests/test_eipconfig.py +++ b/src/leap/services/eip/tests/test_eipconfig.py @@ -114,9 +114,9 @@ class EIPConfigTest(BaseLeapTest): self.assertEqual( config.get_clusters(), None) - def test_openvpnoptions(self): + def test_sanitize_config(self): """ - check the sanitization of openvpn options + check the sanitization of options """ # extra parameters data = copy.deepcopy(sample_config) @@ -169,6 +169,24 @@ class EIPConfigTest(BaseLeapTest): {'cipher': 'AES-128-CBC', 'tls-cipher': 'DHE-RSA-AES128-SHA'}) + # bad_ip + data = copy.deepcopy(sample_config) + data['gateways'][0]["ip_address"] = "11.22.33.44;" + self.write_config(data) + config = EIPConfig() + config.load(self.configfile) + self.assertEqual( + config.get_gateway_ip(), + None) + + data = copy.deepcopy(sample_config) + data['gateways'][0]["ip_address"] = "11.22.33.44`" + self.write_config(data) + config = EIPConfig() + config.load(self.configfile) + self.assertEqual( + config.get_gateway_ip(), + None) if __name__ == "__main__": unittest.main() -- cgit v1.2.3 From 884d0e0f4dbba34b6f6f5afe6e27390a7606a7fa Mon Sep 17 00:00:00 2001 From: kali Date: Wed, 29 May 2013 04:02:43 +0900 Subject: make tests pass & fix pep8 --- src/leap/services/eip/tests/test_eipconfig.py | 25 ++++++++++++++----------- 1 file changed, 14 insertions(+), 11 deletions(-) (limited to 'src/leap/services/eip/tests/test_eipconfig.py') diff --git a/src/leap/services/eip/tests/test_eipconfig.py b/src/leap/services/eip/tests/test_eipconfig.py index ce04c2fc..0bd19d5e 100644 --- a/src/leap/services/eip/tests/test_eipconfig.py +++ b/src/leap/services/eip/tests/test_eipconfig.py @@ -97,10 +97,12 @@ class EIPConfigTest(BaseLeapTest): """ self.write_config(sample_config) config = EIPConfig() - self.assertRaises( - AssertionError, - config.get_clusters) - self.assertTrue(config.load(self.configfile)) + #self.assertRaises( + #AssertionError, + #config.get_clusters) + + self.assertTrue(config.load( + self.configfile, relative=False)) self.assertEqual( config.get_openvpn_configuration(), sample_config["openvpn_configuration"]) @@ -123,7 +125,8 @@ class EIPConfigTest(BaseLeapTest): data['openvpn_configuration']["extra_param"] = "FOO" self.write_config(data) config = EIPConfig() - config.load(self.configfile) + config.load( + self.configfile, relative=False) self.assertEqual( config.get_openvpn_configuration(), sample_config["openvpn_configuration"]) @@ -133,7 +136,7 @@ class EIPConfigTest(BaseLeapTest): data['openvpn_configuration']["auth"] = "SHA1;" self.write_config(data) config = EIPConfig() - config.load(self.configfile) + config.load(self.configfile, relative=False) self.assertEqual( config.get_openvpn_configuration(), sample_config["openvpn_configuration"]) @@ -143,7 +146,7 @@ class EIPConfigTest(BaseLeapTest): data['openvpn_configuration']["auth"] = "SHA1>`&|" self.write_config(data) config = EIPConfig() - config.load(self.configfile) + config.load(self.configfile, relative=False) self.assertEqual( config.get_openvpn_configuration(), sample_config["openvpn_configuration"]) @@ -153,7 +156,7 @@ class EIPConfigTest(BaseLeapTest): data['openvpn_configuration']["auth"] = "shaSHA1" self.write_config(data) config = EIPConfig() - config.load(self.configfile) + config.load(self.configfile, relative=False) self.assertEqual( config.get_openvpn_configuration(), sample_config["openvpn_configuration"]) @@ -163,7 +166,7 @@ class EIPConfigTest(BaseLeapTest): data['openvpn_configuration']["auth"] = "sha&*!@#;" self.write_config(data) config = EIPConfig() - config.load(self.configfile) + config.load(self.configfile, relative=False) self.assertEqual( config.get_openvpn_configuration(), {'cipher': 'AES-128-CBC', @@ -174,7 +177,7 @@ class EIPConfigTest(BaseLeapTest): data['gateways'][0]["ip_address"] = "11.22.33.44;" self.write_config(data) config = EIPConfig() - config.load(self.configfile) + config.load(self.configfile, relative=False) self.assertEqual( config.get_gateway_ip(), None) @@ -183,7 +186,7 @@ class EIPConfigTest(BaseLeapTest): data['gateways'][0]["ip_address"] = "11.22.33.44`" self.write_config(data) config = EIPConfig() - config.load(self.configfile) + config.load(self.configfile, relative=False) self.assertEqual( config.get_gateway_ip(), None) -- cgit v1.2.3 From d88f1e79fa5b6e3ec3ee7691e1c45680e85c6f12 Mon Sep 17 00:00:00 2001 From: Ivan Alejandro Date: Tue, 25 Jun 2013 16:21:58 -0300 Subject: Refactor & add tests for eipconfig --- src/leap/services/eip/tests/test_eipconfig.py | 280 ++++++++++++++++++-------- 1 file changed, 199 insertions(+), 81 deletions(-) (limited to 'src/leap/services/eip/tests/test_eipconfig.py') diff --git a/src/leap/services/eip/tests/test_eipconfig.py b/src/leap/services/eip/tests/test_eipconfig.py index 0bd19d5e..8b746b78 100644 --- a/src/leap/services/eip/tests/test_eipconfig.py +++ b/src/leap/services/eip/tests/test_eipconfig.py @@ -15,7 +15,7 @@ # You should have received a copy of the GNU General Public License # along with this program. If not, see . """ -tests for eipconfig +Tests for eipconfig """ import copy import json @@ -24,6 +24,9 @@ import unittest from leap.common.testing.basetest import BaseLeapTest from leap.services.eip.eipconfig import EIPConfig +from leap.config.providerconfig import ProviderConfig + +from mock import Mock sample_config = { @@ -34,27 +37,50 @@ sample_config = { "filter_dns": True, "limited": True, "ports": [ - "1194", - "443", - "53", - "80" - ], - "protocols": [ - "tcp", - "udp"], - "transport": [ - "openvpn"], - "user_ips": False}, - "host": "host.dev.example.org", - "ip_address": "11.22.33.44", - "location": "cyberspace" - }], + "1194", + "443", + "53", + "80"], + "protocols": [ + "tcp", + "udp"], + "transport": ["openvpn"], + "user_ips": False}, + "host": "host.dev.example.org", + "ip_address": "11.22.33.44", + "location": "cyberspace" + }, { + "capabilities": { + "adblock": False, + "filter_dns": True, + "limited": True, + "ports": [ + "1194", + "443", + "53", + "80"], + "protocols": [ + "tcp", + "udp"], + "transport": ["openvpn"], + "user_ips": False}, + "host": "host2.dev.example.org", + "ip_address": "22.33.44.55", + "location": "cyberspace" + } + ], "locations": { "ankara": { - "country_code": "XX", - "hemisphere": "S", - "name": "Antarctica", - "timezone": "+2" + "country_code": "XX", + "hemisphere": "S", + "name": "Antarctica", + "timezone": "+2" + }, + "cyberspace": { + "country_code": "XX", + "hemisphere": "X", + "name": "outer space", + "timezone": "" } }, "openvpn_configuration": { @@ -70,126 +96,218 @@ sample_config = { class EIPConfigTest(BaseLeapTest): __name__ = "eip_config_tests" - #provider = "testprovider.example.org" maxDiff = None def setUp(self): - pass + self._old_ospath_exists = os.path.exists def tearDown(self): - pass + os.path.exists = self._old_ospath_exists - # - # helpers - # + def _write_config(self, data): + """ + Helper to write some data to a temp config file. - def write_config(self, data): - self.configfile = os.path.join( - self.tempdir, "eipconfig.json") + :param data: data to be used to save in the config file. + :data type: dict (valid json) + """ + self.configfile = os.path.join(self.tempdir, "eipconfig.json") conf = open(self.configfile, "w") conf.write(json.dumps(data)) conf.close() - def test_load_valid_config(self): + def _get_eipconfig(self, fromfile=True, data=sample_config): """ - load a sample config + Helper that returns an EIPConfig object using the data parameter + or a sample data. + + :param fromfile: sets if we should use a file or a string + :fromfile type: bool + :param data: sets the data to be used to load in the EIPConfig object + :data type: dict (valid json) + :rtype: EIPConfig """ - self.write_config(sample_config) config = EIPConfig() - #self.assertRaises( - #AssertionError, - #config.get_clusters) - self.assertTrue(config.load( - self.configfile, relative=False)) + loaded = False + if fromfile: + self._write_config(data) + loaded = config.load(self.configfile, relative=False) + else: + json_string = json.dumps(data) + loaded = config.load(data=json_string) + + if not loaded: + return None + + return config + + def test_loads_from_file(self): + config = self._get_eipconfig() + self.assertIsNotNone(config) + + def test_loads_from_data(self): + config = self._get_eipconfig(fromfile=False) + self.assertIsNotNone(config) + + def test_load_valid_config_from_file(self): + config = self._get_eipconfig() + self.assertIsNotNone(config) + self.assertEqual( config.get_openvpn_configuration(), sample_config["openvpn_configuration"]) + + sample_ip = sample_config["gateways"][0]["ip_address"] self.assertEqual( config.get_gateway_ip(), - "11.22.33.44") - self.assertEqual(config.get_version(), 1) - self.assertEqual(config.get_serial(), 1) - self.assertEqual(config.get_gateways(), - sample_config["gateways"]) + sample_ip) + self.assertEqual(config.get_version(), sample_config["version"]) + self.assertEqual(config.get_serial(), sample_config["serial"]) + self.assertEqual(config.get_gateways(), sample_config["gateways"]) + self.assertEqual(config.get_locations(), sample_config["locations"]) + self.assertEqual(config.get_clusters(), None) + + def test_load_valid_config_from_data(self): + config = self._get_eipconfig(fromfile=False) + self.assertIsNotNone(config) + self.assertEqual( - config.get_clusters(), None) + config.get_openvpn_configuration(), + sample_config["openvpn_configuration"]) - def test_sanitize_config(self): - """ - check the sanitization of options - """ - # extra parameters + sample_ip = sample_config["gateways"][0]["ip_address"] + self.assertEqual( + config.get_gateway_ip(), + sample_ip) + + self.assertEqual(config.get_version(), sample_config["version"]) + self.assertEqual(config.get_serial(), sample_config["serial"]) + self.assertEqual(config.get_gateways(), sample_config["gateways"]) + self.assertEqual(config.get_locations(), sample_config["locations"]) + self.assertEqual(config.get_clusters(), None) + + def test_sanitize_extra_parameters(self): data = copy.deepcopy(sample_config) data['openvpn_configuration']["extra_param"] = "FOO" - self.write_config(data) - config = EIPConfig() - config.load( - self.configfile, relative=False) + config = self._get_eipconfig(data=data) + self.assertEqual( config.get_openvpn_configuration(), sample_config["openvpn_configuration"]) - # non allowed chars + def test_sanitize_non_allowed_chars(self): data = copy.deepcopy(sample_config) data['openvpn_configuration']["auth"] = "SHA1;" - self.write_config(data) - config = EIPConfig() - config.load(self.configfile, relative=False) + config = self._get_eipconfig(data=data) + self.assertEqual( config.get_openvpn_configuration(), sample_config["openvpn_configuration"]) - # non allowed chars data = copy.deepcopy(sample_config) data['openvpn_configuration']["auth"] = "SHA1>`&|" - self.write_config(data) - config = EIPConfig() - config.load(self.configfile, relative=False) + config = self._get_eipconfig(data=data) + self.assertEqual( config.get_openvpn_configuration(), sample_config["openvpn_configuration"]) - # lowercase + def test_sanitize_lowercase(self): data = copy.deepcopy(sample_config) data['openvpn_configuration']["auth"] = "shaSHA1" - self.write_config(data) - config = EIPConfig() - config.load(self.configfile, relative=False) + config = self._get_eipconfig(data=data) + self.assertEqual( config.get_openvpn_configuration(), sample_config["openvpn_configuration"]) - # all characters invalid -> null value + def test_all_characters_invalid(self): data = copy.deepcopy(sample_config) data['openvpn_configuration']["auth"] = "sha&*!@#;" - self.write_config(data) - config = EIPConfig() - config.load(self.configfile, relative=False) + config = self._get_eipconfig(data=data) + self.assertEqual( config.get_openvpn_configuration(), {'cipher': 'AES-128-CBC', 'tls-cipher': 'DHE-RSA-AES128-SHA'}) - # bad_ip + def test_sanitize_bad_ip(self): data = copy.deepcopy(sample_config) data['gateways'][0]["ip_address"] = "11.22.33.44;" - self.write_config(data) - config = EIPConfig() - config.load(self.configfile, relative=False) - self.assertEqual( - config.get_gateway_ip(), - None) + config = self._get_eipconfig(data=data) + + self.assertEqual(config.get_gateway_ip(), None) data = copy.deepcopy(sample_config) data['gateways'][0]["ip_address"] = "11.22.33.44`" - self.write_config(data) - config = EIPConfig() - config.load(self.configfile, relative=False) - self.assertEqual( - config.get_gateway_ip(), - None) + config = self._get_eipconfig(data=data) + + self.assertEqual(config.get_gateway_ip(), None) + + def test_default_gateway_on_unknown_index(self): + config = self._get_eipconfig() + sample_ip = sample_config["gateways"][0]["ip_address"] + self.assertEqual(config.get_gateway_ip(999), sample_ip) + + def test_get_gateway_by_index(self): + config = self._get_eipconfig() + sample_ip_0 = sample_config["gateways"][0]["ip_address"] + sample_ip_1 = sample_config["gateways"][1]["ip_address"] + self.assertEqual(config.get_gateway_ip(0), sample_ip_0) + self.assertEqual(config.get_gateway_ip(1), sample_ip_1) + + def test_get_client_cert_path_as_expected(self): + config = self._get_eipconfig() + config.get_path_prefix = Mock(return_value='test') + + provider_config = ProviderConfig() + + # mock 'get_domain' so we don't need to load a config + provider_domain = 'test.provider.com' + provider_config.get_domain = Mock(return_value=provider_domain) + + expected_path = os.path.join('test', 'leap', 'providers', + provider_domain, 'keys', 'client', + 'openvpn.pem') + + # mock 'os.path.exists' so we don't get an error for unexisting file + os.path.exists = Mock(return_value=True) + cert_path = config.get_client_cert_path(provider_config) + + self.assertEqual(cert_path, expected_path) + + def test_get_client_cert_path_about_to_download(self): + config = self._get_eipconfig() + config.get_path_prefix = Mock(return_value='test') + + provider_config = ProviderConfig() + + # mock 'get_domain' so we don't need to load a config + provider_domain = 'test.provider.com' + provider_config.get_domain = Mock(return_value=provider_domain) + + expected_path = os.path.join('test', 'leap', 'providers', + provider_domain, 'keys', 'client', + 'openvpn.pem') + + cert_path = config.get_client_cert_path( + provider_config, about_to_download=True) + + self.assertEqual(cert_path, expected_path) + + def test_get_client_cert_path_fails(self): + config = self._get_eipconfig() + provider_config = ProviderConfig() + + # mock 'get_domain' so we don't need to load a config + provider_domain = 'test.provider.com' + provider_config.get_domain = Mock(return_value=provider_domain) + + with self.assertRaises(AssertionError): + config.get_client_cert_path(provider_config) + if __name__ == "__main__": unittest.main() -- cgit v1.2.3