From 04d423e2a89034dfb86fe305108162fd2a696079 Mon Sep 17 00:00:00 2001 From: kali Date: Wed, 12 Dec 2012 03:29:31 +0900 Subject: tests for openvpn options and make the rest of tests pass after some changes in this branch (dirtyness in config files) --- src/leap/eip/checks.py | 2 +- src/leap/eip/config.py | 30 ++++++++----- src/leap/eip/specs.py | 7 +++ src/leap/eip/tests/test_checks.py | 6 +++ src/leap/eip/tests/test_config.py | 93 +++++++++++++++++++++++++++++++++++---- 5 files changed, 117 insertions(+), 21 deletions(-) (limited to 'src/leap/eip') diff --git a/src/leap/eip/checks.py b/src/leap/eip/checks.py index a876eea1..8d615b94 100644 --- a/src/leap/eip/checks.py +++ b/src/leap/eip/checks.py @@ -502,7 +502,7 @@ class EIPConfigChecker(object): return self.eipconfig.exists() def _dump_default_eipconfig(self): - self.eipconfig.save() + self.eipconfig.save(force=True) def _get_provider_definition_uri(self, domain=None, path=None): if domain is None: diff --git a/src/leap/eip/config.py b/src/leap/eip/config.py index 1fe0530a..e40d2785 100644 --- a/src/leap/eip/config.py +++ b/src/leap/eip/config.py @@ -1,6 +1,7 @@ import logging import os import platform +import re import tempfile from leap import __branding as BRANDING @@ -110,14 +111,18 @@ def get_cipher_options(eipserviceconfig=None): eipsconf = eipserviceconfig.get_config() ALLOWED_KEYS = ("auth", "cipher", "tls-cipher") + CIPHERS_REGEX = re.compile("[A-Z0-9\-]+") opts = [] if 'openvpn_configuration' in eipsconf: - config = eipserviceconfig.openvpn_configuration + config = eipserviceconfig.config.get( + "openvpn_configuration", {}) for key, value in config.items(): if key in ALLOWED_KEYS and value is not None: - # I humbly think we should sanitize this - # input against `valid` openvpn settings. -- kali. - opts.append(['--%s' % key, value]) + sanitized_val = CIPHERS_REGEX.findall(value) + if len(sanitized_val) != 0: + _val = sanitized_val[0] + opts.append('--%s' % key) + opts.append('%s' % _val) return opts @@ -162,7 +167,9 @@ def build_ovpn_options(daemon=False, socket_path=None, **kwargs): opts.append('--verb') opts.append("%s" % verbosity) - # remote + # remote ############################## + # (server, port, protocol) + opts.append('--remote') gw = get_eip_gateway(eipconfig=eipconfig, @@ -170,12 +177,6 @@ def build_ovpn_options(daemon=False, socket_path=None, **kwargs): logger.debug('setting eip gateway to %s', gw) opts.append(str(gw)) - # get ciphers - ciphers = get_cipher_options( - eipserviceconfig=eipserviceconfig) - for cipheropt in ciphers: - opts.append(str(cipheropt)) - # get port/protocol from eipservice too opts.append('1194') #opts.append('80') @@ -185,6 +186,13 @@ def build_ovpn_options(daemon=False, socket_path=None, **kwargs): opts.append('--remote-cert-tls') opts.append('server') + # get ciphers ####################### + + ciphers = get_cipher_options( + eipserviceconfig=eipserviceconfig) + for cipheropt in ciphers: + opts.append(str(cipheropt)) + # set user and group opts.append('--user') opts.append('%s' % user) diff --git a/src/leap/eip/specs.py b/src/leap/eip/specs.py index 57e7537b..cf5d5359 100644 --- a/src/leap/eip/specs.py +++ b/src/leap/eip/specs.py @@ -119,6 +119,13 @@ eipservice_config_spec = { "label": {"en":"west"}, "capabilities": {}, "hosts": ["1.2.3.4", "1.2.3.5"]}] + }, + 'openvpn_configuration': { + 'type': dict, + 'default': { + "auth": None, + "cipher": None, + "tls-cipher": None} } } } diff --git a/src/leap/eip/tests/test_checks.py b/src/leap/eip/tests/test_checks.py index 1d7bfc17..ab11037a 100644 --- a/src/leap/eip/tests/test_checks.py +++ b/src/leap/eip/tests/test_checks.py @@ -25,6 +25,7 @@ from leap.eip.tests import data as testdata from leap.testing.basetest import BaseLeapTest from leap.testing.https_server import BaseHTTPSServerTestCase from leap.testing.https_server import where as where_cert +from leap.util.fileutil import mkdir_f class NoLogRequestHandler: @@ -118,6 +119,7 @@ class EIPCheckTest(BaseLeapTest): sampleconfig = copy.copy(testdata.EIP_SAMPLE_CONFIG) sampleconfig['provider'] = None eipcfg_path = checker.eipconfig.filename + mkdir_f(eipcfg_path) with open(eipcfg_path, 'w') as fp: json.dump(sampleconfig, fp) #with self.assertRaises(eipexceptions.EIPMissingDefaultProvider): @@ -138,6 +140,8 @@ class EIPCheckTest(BaseLeapTest): def test_fetch_definition(self): with patch.object(requests, "get") as mocked_get: mocked_get.return_value.status_code = 200 + mocked_get.return_value.headers = { + 'last-modified': "Wed Dec 12 12:12:12 GMT 2012"} mocked_get.return_value.json = DEFAULT_PROVIDER_DEFINITION checker = eipchecks.EIPConfigChecker(fetcher=requests) sampleconfig = testdata.EIP_SAMPLE_CONFIG @@ -156,6 +160,8 @@ class EIPCheckTest(BaseLeapTest): def test_fetch_eip_service_config(self): with patch.object(requests, "get") as mocked_get: mocked_get.return_value.status_code = 200 + mocked_get.return_value.headers = { + 'last-modified': "Wed Dec 12 12:12:12 GMT 2012"} mocked_get.return_value.json = testdata.EIP_SAMPLE_SERVICE checker = eipchecks.EIPConfigChecker(fetcher=requests) sampleconfig = testdata.EIP_SAMPLE_CONFIG diff --git a/src/leap/eip/tests/test_config.py b/src/leap/eip/tests/test_config.py index 50538240..404d543f 100644 --- a/src/leap/eip/tests/test_config.py +++ b/src/leap/eip/tests/test_config.py @@ -1,3 +1,4 @@ +from collections import OrderedDict import json import os import platform @@ -10,7 +11,7 @@ except ImportError: #from leap.base import constants #from leap.eip import config as eip_config -from leap import __branding as BRANDING +#from leap import __branding as BRANDING from leap.eip import config as eipconfig from leap.eip.tests.data import EIP_SAMPLE_CONFIG, EIP_SAMPLE_SERVICE from leap.testing.basetest import BaseLeapTest @@ -47,11 +48,21 @@ class EIPConfigTest(BaseLeapTest): open(tfile, 'wb').close() os.chmod(tfile, stat.S_IRUSR | stat.S_IWUSR | stat.S_IXUSR) - def write_sample_eipservice(self): + def write_sample_eipservice(self, vpnciphers=False, extra_vpnopts=None): conf = eipconfig.EIPServiceConfig() folder, f = os.path.split(conf.filename) if not os.path.isdir(folder): mkdir_p(folder) + if vpnciphers: + openvpnconfig = OrderedDict({ + "auth": "SHA1", + "cipher": "AES-128-CBC", + "tls-cipher": "DHE-RSA-AES128-SHA"}) + if extra_vpnopts: + for k, v in extra_vpnopts.items(): + openvpnconfig[k] = v + EIP_SAMPLE_SERVICE['openvpn_configuration'] = openvpnconfig + with open(conf.filename, 'w') as fd: fd.write(json.dumps(EIP_SAMPLE_SERVICE)) @@ -63,8 +74,13 @@ class EIPConfigTest(BaseLeapTest): with open(conf.filename, 'w') as fd: fd.write(json.dumps(EIP_SAMPLE_CONFIG)) - def get_expected_openvpn_args(self): + def get_expected_openvpn_args(self, with_openvpn_ciphers=False): args = [] + eipconf = eipconfig.EIPConfig(domain=self.provider) + eipconf.load() + eipsconf = eipconfig.EIPServiceConfig(domain=self.provider) + eipsconf.load() + username = self.get_username() groupname = self.get_groupname() @@ -75,8 +91,10 @@ class EIPConfigTest(BaseLeapTest): args.append('--persist-tun') args.append('--persist-key') args.append('--remote') + args.append('%s' % eipconfig.get_eip_gateway( - provider=self.provider)) + eipconfig=eipconf, + eipserviceconfig=eipsconf)) # XXX get port!? args.append('1194') # XXX get proto @@ -85,6 +103,14 @@ class EIPConfigTest(BaseLeapTest): args.append('--remote-cert-tls') args.append('server') + if with_openvpn_ciphers: + CIPHERS = [ + "--tls-cipher", "DHE-RSA-AES128-SHA", + "--cipher", "AES-128-CBC", + "--auth", "SHA1"] + for opt in CIPHERS: + args.append(opt) + args.append('--user') args.append(username) args.append('--group') @@ -139,14 +165,63 @@ class EIPConfigTest(BaseLeapTest): from leap.util.fileutil import which path = os.environ['PATH'] vpnbin = which('openvpn', path=path) - print 'path =', path - print 'vpnbin = ', vpnbin - command, args = eipconfig.build_ovpn_command( + #print 'path =', path + #print 'vpnbin = ', vpnbin + vpncommand, vpnargs = eipconfig.build_ovpn_command( + do_pkexec_check=False, vpnbin=vpnbin, + socket_path="/tmp/test.socket", + provider=self.provider) + self.assertEqual(vpncommand, self.home + '/bin/openvpn') + self.assertEqual(vpnargs, self.get_expected_openvpn_args()) + + def test_build_ovpn_command_openvpnoptions(self): + self.touch_exec() + + from leap.eip import config as eipconfig + from leap.util.fileutil import which + path = os.environ['PATH'] + vpnbin = which('openvpn', path=path) + + self.write_sample_eipconfig() + + # regular run, everything normal + self.write_sample_eipservice(vpnciphers=True) + vpncommand, vpnargs = eipconfig.build_ovpn_command( + do_pkexec_check=False, vpnbin=vpnbin, + socket_path="/tmp/test.socket", + provider=self.provider) + self.assertEqual(vpncommand, self.home + '/bin/openvpn') + expected = self.get_expected_openvpn_args( + with_openvpn_ciphers=True) + self.assertEqual(vpnargs, expected) + + # bad options -- illegal options + self.write_sample_eipservice( + vpnciphers=True, + # WE ONLY ALLOW vpn options in auth, cipher, tls-cipher + extra_vpnopts={"notallowedconfig": "badvalue"}) + vpncommand, vpnargs = eipconfig.build_ovpn_command( + do_pkexec_check=False, vpnbin=vpnbin, + socket_path="/tmp/test.socket", + provider=self.provider) + self.assertEqual(vpncommand, self.home + '/bin/openvpn') + expected = self.get_expected_openvpn_args( + with_openvpn_ciphers=True) + self.assertEqual(vpnargs, expected) + + # bad options -- illegal chars + self.write_sample_eipservice( + vpnciphers=True, + # WE ONLY ALLOW A-Z09\- + extra_vpnopts={"cipher": "AES-128-CBC;FOOTHING"}) + vpncommand, vpnargs = eipconfig.build_ovpn_command( do_pkexec_check=False, vpnbin=vpnbin, socket_path="/tmp/test.socket", provider=self.provider) - self.assertEqual(command, self.home + '/bin/openvpn') - self.assertEqual(args, self.get_expected_openvpn_args()) + self.assertEqual(vpncommand, self.home + '/bin/openvpn') + expected = self.get_expected_openvpn_args( + with_openvpn_ciphers=True) + self.assertEqual(vpnargs, expected) if __name__ == "__main__": -- cgit v1.2.3