diff options
Diffstat (limited to 'src/leap/eip/tests/test_config.py')
-rw-r--r-- | src/leap/eip/tests/test_config.py | 155 |
1 files changed, 142 insertions, 13 deletions
diff --git a/src/leap/eip/tests/test_config.py b/src/leap/eip/tests/test_config.py index 50538240..5977ef3c 100644 --- a/src/leap/eip/tests/test_config.py +++ b/src/leap/eip/tests/test_config.py @@ -1,3 +1,4 @@ +from collections import OrderedDict import json import os import platform @@ -10,11 +11,11 @@ except ImportError: #from leap.base import constants #from leap.eip import config as eip_config -from leap import __branding as BRANDING +#from leap import __branding as BRANDING from leap.eip import config as eipconfig from leap.eip.tests.data import EIP_SAMPLE_CONFIG, EIP_SAMPLE_SERVICE from leap.testing.basetest import BaseLeapTest -from leap.util.fileutil import mkdir_p +from leap.util.fileutil import mkdir_p, mkdir_f _system = platform.system() @@ -47,11 +48,22 @@ class EIPConfigTest(BaseLeapTest): open(tfile, 'wb').close() os.chmod(tfile, stat.S_IRUSR | stat.S_IWUSR | stat.S_IXUSR) - def write_sample_eipservice(self): + def write_sample_eipservice(self, vpnciphers=False, extra_vpnopts=None, + gateways=None): conf = eipconfig.EIPServiceConfig() - folder, f = os.path.split(conf.filename) - if not os.path.isdir(folder): - mkdir_p(folder) + mkdir_f(conf.filename) + if gateways: + EIP_SAMPLE_SERVICE['gateways'] = gateways + if vpnciphers: + openvpnconfig = OrderedDict({ + "auth": "SHA1", + "cipher": "AES-128-CBC", + "tls-cipher": "DHE-RSA-AES128-SHA"}) + if extra_vpnopts: + for k, v in extra_vpnopts.items(): + openvpnconfig[k] = v + EIP_SAMPLE_SERVICE['openvpn_configuration'] = openvpnconfig + with open(conf.filename, 'w') as fd: fd.write(json.dumps(EIP_SAMPLE_SERVICE)) @@ -63,8 +75,17 @@ class EIPConfigTest(BaseLeapTest): with open(conf.filename, 'w') as fd: fd.write(json.dumps(EIP_SAMPLE_CONFIG)) - def get_expected_openvpn_args(self): + def get_expected_openvpn_args(self, with_openvpn_ciphers=False): + """ + yeah, this is almost as duplicating the + code for building the command + """ args = [] + eipconf = eipconfig.EIPConfig(domain=self.provider) + eipconf.load() + eipsconf = eipconfig.EIPServiceConfig(domain=self.provider) + eipsconf.load() + username = self.get_username() groupname = self.get_groupname() @@ -75,8 +96,10 @@ class EIPConfigTest(BaseLeapTest): args.append('--persist-tun') args.append('--persist-key') args.append('--remote') + args.append('%s' % eipconfig.get_eip_gateway( - provider=self.provider)) + eipconfig=eipconf, + eipserviceconfig=eipsconf)) # XXX get port!? args.append('1194') # XXX get proto @@ -85,6 +108,14 @@ class EIPConfigTest(BaseLeapTest): args.append('--remote-cert-tls') args.append('server') + if with_openvpn_ciphers: + CIPHERS = [ + "--tls-cipher", "DHE-RSA-AES128-SHA", + "--cipher", "AES-128-CBC", + "--auth", "SHA1"] + for opt in CIPHERS: + args.append(opt) + args.append('--user') args.append(username) args.append('--group') @@ -130,6 +161,55 @@ class EIPConfigTest(BaseLeapTest): # params in the function call, to disable # some checks. + def test_get_eip_gateway(self): + self.write_sample_eipconfig() + eipconf = eipconfig.EIPConfig(domain=self.provider) + + # default eipservice + self.write_sample_eipservice() + eipsconf = eipconfig.EIPServiceConfig(domain=self.provider) + + gateway = eipconfig.get_eip_gateway( + eipconfig=eipconf, + eipserviceconfig=eipsconf) + + # in spec is local gateway by default + self.assertEqual(gateway, '127.0.0.1') + + # change eipservice + # right now we only check that cluster == selected primary gw in + # eip.json, and pick first matching ip + eipconf._config.config['primary_gateway'] = "foo_provider" + newgateways = [{"cluster": "foo_provider", + "ip_address": "127.0.0.99"}] + self.write_sample_eipservice(gateways=newgateways) + eipsconf = eipconfig.EIPServiceConfig(domain=self.provider) + # load from disk file + eipsconf.load() + + gateway = eipconfig.get_eip_gateway( + eipconfig=eipconf, + eipserviceconfig=eipsconf) + self.assertEqual(gateway, '127.0.0.99') + + # change eipservice, several gateways + # right now we only check that cluster == selected primary gw in + # eip.json, and pick first matching ip + eipconf._config.config['primary_gateway'] = "bar_provider" + newgateways = [{"cluster": "foo_provider", + "ip_address": "127.0.0.99"}, + {'cluster': "bar_provider", + "ip_address": "127.0.0.88"}] + self.write_sample_eipservice(gateways=newgateways) + eipsconf = eipconfig.EIPServiceConfig(domain=self.provider) + # load from disk file + eipsconf.load() + + gateway = eipconfig.get_eip_gateway( + eipconfig=eipconf, + eipserviceconfig=eipsconf) + self.assertEqual(gateway, '127.0.0.88') + def test_build_ovpn_command_empty_config(self): self.touch_exec() self.write_sample_eipservice() @@ -139,14 +219,63 @@ class EIPConfigTest(BaseLeapTest): from leap.util.fileutil import which path = os.environ['PATH'] vpnbin = which('openvpn', path=path) - print 'path =', path - print 'vpnbin = ', vpnbin - command, args = eipconfig.build_ovpn_command( + #print 'path =', path + #print 'vpnbin = ', vpnbin + vpncommand, vpnargs = eipconfig.build_ovpn_command( + do_pkexec_check=False, vpnbin=vpnbin, + socket_path="/tmp/test.socket", + provider=self.provider) + self.assertEqual(vpncommand, self.home + '/bin/openvpn') + self.assertEqual(vpnargs, self.get_expected_openvpn_args()) + + def test_build_ovpn_command_openvpnoptions(self): + self.touch_exec() + + from leap.eip import config as eipconfig + from leap.util.fileutil import which + path = os.environ['PATH'] + vpnbin = which('openvpn', path=path) + + self.write_sample_eipconfig() + + # regular run, everything normal + self.write_sample_eipservice(vpnciphers=True) + vpncommand, vpnargs = eipconfig.build_ovpn_command( + do_pkexec_check=False, vpnbin=vpnbin, + socket_path="/tmp/test.socket", + provider=self.provider) + self.assertEqual(vpncommand, self.home + '/bin/openvpn') + expected = self.get_expected_openvpn_args( + with_openvpn_ciphers=True) + self.assertEqual(vpnargs, expected) + + # bad options -- illegal options + self.write_sample_eipservice( + vpnciphers=True, + # WE ONLY ALLOW vpn options in auth, cipher, tls-cipher + extra_vpnopts={"notallowedconfig": "badvalue"}) + vpncommand, vpnargs = eipconfig.build_ovpn_command( + do_pkexec_check=False, vpnbin=vpnbin, + socket_path="/tmp/test.socket", + provider=self.provider) + self.assertEqual(vpncommand, self.home + '/bin/openvpn') + expected = self.get_expected_openvpn_args( + with_openvpn_ciphers=True) + self.assertEqual(vpnargs, expected) + + # bad options -- illegal chars + self.write_sample_eipservice( + vpnciphers=True, + # WE ONLY ALLOW A-Z09\- + extra_vpnopts={"cipher": "AES-128-CBC;FOOTHING"}) + vpncommand, vpnargs = eipconfig.build_ovpn_command( do_pkexec_check=False, vpnbin=vpnbin, socket_path="/tmp/test.socket", provider=self.provider) - self.assertEqual(command, self.home + '/bin/openvpn') - self.assertEqual(args, self.get_expected_openvpn_args()) + self.assertEqual(vpncommand, self.home + '/bin/openvpn') + expected = self.get_expected_openvpn_args( + with_openvpn_ciphers=True) + self.assertEqual(vpnargs, expected) if __name__ == "__main__": |