From 04d423e2a89034dfb86fe305108162fd2a696079 Mon Sep 17 00:00:00 2001 From: kali Date: Wed, 12 Dec 2012 03:29:31 +0900 Subject: tests for openvpn options and make the rest of tests pass after some changes in this branch (dirtyness in config files) --- src/leap/base/config.py | 23 ++++++++- src/leap/base/tests/test_providers.py | 8 +-- src/leap/baseapp/systray.py | 2 + src/leap/eip/checks.py | 2 +- src/leap/eip/config.py | 30 ++++++----- src/leap/eip/specs.py | 7 +++ src/leap/eip/tests/test_checks.py | 6 +++ src/leap/eip/tests/test_config.py | 93 +++++++++++++++++++++++++++++++---- src/leap/util/fileutil.py | 5 ++ 9 files changed, 149 insertions(+), 27 deletions(-) (limited to 'src/leap') diff --git a/src/leap/base/config.py b/src/leap/base/config.py index 321fbdcd..b307ad05 100644 --- a/src/leap/base/config.py +++ b/src/leap/base/config.py @@ -126,14 +126,33 @@ class JSONLeapConfig(BaseLeapConfig): # mandatory baseconfig interface - def save(self, to=None): - if self._config.is_dirty(): + def save(self, to=None, force=False): + """ + force param will skip the dirty check. + :type force: bool + """ + # XXX this force=True does not feel to right + # but still have to look for a better way + # of dealing with dirtiness and the + # trick of loading remote config only + # when newer. + + if force: + do_save = True + else: + do_save = self._config.is_dirty() + + if do_save: if to is None: to = self.filename folder, filename = os.path.split(to) if folder and not os.path.isdir(folder): mkdir_p(folder) self._config.serialize(to) + return True + + else: + return False def load(self, fromfile=None, from_uri=None, fetcher=None, force_download=False, verify=False): diff --git a/src/leap/base/tests/test_providers.py b/src/leap/base/tests/test_providers.py index 15c4ed58..d9604fab 100644 --- a/src/leap/base/tests/test_providers.py +++ b/src/leap/base/tests/test_providers.py @@ -8,7 +8,7 @@ import os import jsonschema -from leap import __branding as BRANDING +#from leap import __branding as BRANDING from leap.testing.basetest import BaseLeapTest from leap.base import providers @@ -33,8 +33,8 @@ class TestLeapProviderDefinition(BaseLeapTest): self.domain = "testprovider.example.org" self.definition = providers.LeapProviderDefinition( domain=self.domain) - self.definition.save() - self.definition.load() + self.definition.save(force=True) + self.definition.load() # why have to load after save?? self.config = self.definition.config def tearDown(self): @@ -61,7 +61,7 @@ class TestLeapProviderDefinition(BaseLeapTest): def test_provider_dump(self): # check a good provider definition is dumped to disk self.testfile = self.get_tempfile('test.json') - self.definition.save(to=self.testfile) + self.definition.save(to=self.testfile, force=True) deserialized = json.load(open(self.testfile, 'rb')) self.maxDiff = None self.assertEqual(deserialized, EXPECTED_DEFAULT_CONFIG) diff --git a/src/leap/baseapp/systray.py b/src/leap/baseapp/systray.py index 49f044aa..52060ae2 100644 --- a/src/leap/baseapp/systray.py +++ b/src/leap/baseapp/systray.py @@ -217,6 +217,8 @@ class StatusAwareTrayIconMixin(object): updates icon, according to the openvpn status change. """ icon_name = self.conductor.get_icon_name() + if not icon_name: + return # XXX refactor. Use QStateMachine diff --git a/src/leap/eip/checks.py b/src/leap/eip/checks.py index a876eea1..8d615b94 100644 --- a/src/leap/eip/checks.py +++ b/src/leap/eip/checks.py @@ -502,7 +502,7 @@ class EIPConfigChecker(object): return self.eipconfig.exists() def _dump_default_eipconfig(self): - self.eipconfig.save() + self.eipconfig.save(force=True) def _get_provider_definition_uri(self, domain=None, path=None): if domain is None: diff --git a/src/leap/eip/config.py b/src/leap/eip/config.py index 1fe0530a..e40d2785 100644 --- a/src/leap/eip/config.py +++ b/src/leap/eip/config.py @@ -1,6 +1,7 @@ import logging import os import platform +import re import tempfile from leap import __branding as BRANDING @@ -110,14 +111,18 @@ def get_cipher_options(eipserviceconfig=None): eipsconf = eipserviceconfig.get_config() ALLOWED_KEYS = ("auth", "cipher", "tls-cipher") + CIPHERS_REGEX = re.compile("[A-Z0-9\-]+") opts = [] if 'openvpn_configuration' in eipsconf: - config = eipserviceconfig.openvpn_configuration + config = eipserviceconfig.config.get( + "openvpn_configuration", {}) for key, value in config.items(): if key in ALLOWED_KEYS and value is not None: - # I humbly think we should sanitize this - # input against `valid` openvpn settings. -- kali. - opts.append(['--%s' % key, value]) + sanitized_val = CIPHERS_REGEX.findall(value) + if len(sanitized_val) != 0: + _val = sanitized_val[0] + opts.append('--%s' % key) + opts.append('%s' % _val) return opts @@ -162,7 +167,9 @@ def build_ovpn_options(daemon=False, socket_path=None, **kwargs): opts.append('--verb') opts.append("%s" % verbosity) - # remote + # remote ############################## + # (server, port, protocol) + opts.append('--remote') gw = get_eip_gateway(eipconfig=eipconfig, @@ -170,12 +177,6 @@ def build_ovpn_options(daemon=False, socket_path=None, **kwargs): logger.debug('setting eip gateway to %s', gw) opts.append(str(gw)) - # get ciphers - ciphers = get_cipher_options( - eipserviceconfig=eipserviceconfig) - for cipheropt in ciphers: - opts.append(str(cipheropt)) - # get port/protocol from eipservice too opts.append('1194') #opts.append('80') @@ -185,6 +186,13 @@ def build_ovpn_options(daemon=False, socket_path=None, **kwargs): opts.append('--remote-cert-tls') opts.append('server') + # get ciphers ####################### + + ciphers = get_cipher_options( + eipserviceconfig=eipserviceconfig) + for cipheropt in ciphers: + opts.append(str(cipheropt)) + # set user and group opts.append('--user') opts.append('%s' % user) diff --git a/src/leap/eip/specs.py b/src/leap/eip/specs.py index 57e7537b..cf5d5359 100644 --- a/src/leap/eip/specs.py +++ b/src/leap/eip/specs.py @@ -119,6 +119,13 @@ eipservice_config_spec = { "label": {"en":"west"}, "capabilities": {}, "hosts": ["1.2.3.4", "1.2.3.5"]}] + }, + 'openvpn_configuration': { + 'type': dict, + 'default': { + "auth": None, + "cipher": None, + "tls-cipher": None} } } } diff --git a/src/leap/eip/tests/test_checks.py b/src/leap/eip/tests/test_checks.py index 1d7bfc17..ab11037a 100644 --- a/src/leap/eip/tests/test_checks.py +++ b/src/leap/eip/tests/test_checks.py @@ -25,6 +25,7 @@ from leap.eip.tests import data as testdata from leap.testing.basetest import BaseLeapTest from leap.testing.https_server import BaseHTTPSServerTestCase from leap.testing.https_server import where as where_cert +from leap.util.fileutil import mkdir_f class NoLogRequestHandler: @@ -118,6 +119,7 @@ class EIPCheckTest(BaseLeapTest): sampleconfig = copy.copy(testdata.EIP_SAMPLE_CONFIG) sampleconfig['provider'] = None eipcfg_path = checker.eipconfig.filename + mkdir_f(eipcfg_path) with open(eipcfg_path, 'w') as fp: json.dump(sampleconfig, fp) #with self.assertRaises(eipexceptions.EIPMissingDefaultProvider): @@ -138,6 +140,8 @@ class EIPCheckTest(BaseLeapTest): def test_fetch_definition(self): with patch.object(requests, "get") as mocked_get: mocked_get.return_value.status_code = 200 + mocked_get.return_value.headers = { + 'last-modified': "Wed Dec 12 12:12:12 GMT 2012"} mocked_get.return_value.json = DEFAULT_PROVIDER_DEFINITION checker = eipchecks.EIPConfigChecker(fetcher=requests) sampleconfig = testdata.EIP_SAMPLE_CONFIG @@ -156,6 +160,8 @@ class EIPCheckTest(BaseLeapTest): def test_fetch_eip_service_config(self): with patch.object(requests, "get") as mocked_get: mocked_get.return_value.status_code = 200 + mocked_get.return_value.headers = { + 'last-modified': "Wed Dec 12 12:12:12 GMT 2012"} mocked_get.return_value.json = testdata.EIP_SAMPLE_SERVICE checker = eipchecks.EIPConfigChecker(fetcher=requests) sampleconfig = testdata.EIP_SAMPLE_CONFIG diff --git a/src/leap/eip/tests/test_config.py b/src/leap/eip/tests/test_config.py index 50538240..404d543f 100644 --- a/src/leap/eip/tests/test_config.py +++ b/src/leap/eip/tests/test_config.py @@ -1,3 +1,4 @@ +from collections import OrderedDict import json import os import platform @@ -10,7 +11,7 @@ except ImportError: #from leap.base import constants #from leap.eip import config as eip_config -from leap import __branding as BRANDING +#from leap import __branding as BRANDING from leap.eip import config as eipconfig from leap.eip.tests.data import EIP_SAMPLE_CONFIG, EIP_SAMPLE_SERVICE from leap.testing.basetest import BaseLeapTest @@ -47,11 +48,21 @@ class EIPConfigTest(BaseLeapTest): open(tfile, 'wb').close() os.chmod(tfile, stat.S_IRUSR | stat.S_IWUSR | stat.S_IXUSR) - def write_sample_eipservice(self): + def write_sample_eipservice(self, vpnciphers=False, extra_vpnopts=None): conf = eipconfig.EIPServiceConfig() folder, f = os.path.split(conf.filename) if not os.path.isdir(folder): mkdir_p(folder) + if vpnciphers: + openvpnconfig = OrderedDict({ + "auth": "SHA1", + "cipher": "AES-128-CBC", + "tls-cipher": "DHE-RSA-AES128-SHA"}) + if extra_vpnopts: + for k, v in extra_vpnopts.items(): + openvpnconfig[k] = v + EIP_SAMPLE_SERVICE['openvpn_configuration'] = openvpnconfig + with open(conf.filename, 'w') as fd: fd.write(json.dumps(EIP_SAMPLE_SERVICE)) @@ -63,8 +74,13 @@ class EIPConfigTest(BaseLeapTest): with open(conf.filename, 'w') as fd: fd.write(json.dumps(EIP_SAMPLE_CONFIG)) - def get_expected_openvpn_args(self): + def get_expected_openvpn_args(self, with_openvpn_ciphers=False): args = [] + eipconf = eipconfig.EIPConfig(domain=self.provider) + eipconf.load() + eipsconf = eipconfig.EIPServiceConfig(domain=self.provider) + eipsconf.load() + username = self.get_username() groupname = self.get_groupname() @@ -75,8 +91,10 @@ class EIPConfigTest(BaseLeapTest): args.append('--persist-tun') args.append('--persist-key') args.append('--remote') + args.append('%s' % eipconfig.get_eip_gateway( - provider=self.provider)) + eipconfig=eipconf, + eipserviceconfig=eipsconf)) # XXX get port!? args.append('1194') # XXX get proto @@ -85,6 +103,14 @@ class EIPConfigTest(BaseLeapTest): args.append('--remote-cert-tls') args.append('server') + if with_openvpn_ciphers: + CIPHERS = [ + "--tls-cipher", "DHE-RSA-AES128-SHA", + "--cipher", "AES-128-CBC", + "--auth", "SHA1"] + for opt in CIPHERS: + args.append(opt) + args.append('--user') args.append(username) args.append('--group') @@ -139,14 +165,63 @@ class EIPConfigTest(BaseLeapTest): from leap.util.fileutil import which path = os.environ['PATH'] vpnbin = which('openvpn', path=path) - print 'path =', path - print 'vpnbin = ', vpnbin - command, args = eipconfig.build_ovpn_command( + #print 'path =', path + #print 'vpnbin = ', vpnbin + vpncommand, vpnargs = eipconfig.build_ovpn_command( + do_pkexec_check=False, vpnbin=vpnbin, + socket_path="/tmp/test.socket", + provider=self.provider) + self.assertEqual(vpncommand, self.home + '/bin/openvpn') + self.assertEqual(vpnargs, self.get_expected_openvpn_args()) + + def test_build_ovpn_command_openvpnoptions(self): + self.touch_exec() + + from leap.eip import config as eipconfig + from leap.util.fileutil import which + path = os.environ['PATH'] + vpnbin = which('openvpn', path=path) + + self.write_sample_eipconfig() + + # regular run, everything normal + self.write_sample_eipservice(vpnciphers=True) + vpncommand, vpnargs = eipconfig.build_ovpn_command( + do_pkexec_check=False, vpnbin=vpnbin, + socket_path="/tmp/test.socket", + provider=self.provider) + self.assertEqual(vpncommand, self.home + '/bin/openvpn') + expected = self.get_expected_openvpn_args( + with_openvpn_ciphers=True) + self.assertEqual(vpnargs, expected) + + # bad options -- illegal options + self.write_sample_eipservice( + vpnciphers=True, + # WE ONLY ALLOW vpn options in auth, cipher, tls-cipher + extra_vpnopts={"notallowedconfig": "badvalue"}) + vpncommand, vpnargs = eipconfig.build_ovpn_command( + do_pkexec_check=False, vpnbin=vpnbin, + socket_path="/tmp/test.socket", + provider=self.provider) + self.assertEqual(vpncommand, self.home + '/bin/openvpn') + expected = self.get_expected_openvpn_args( + with_openvpn_ciphers=True) + self.assertEqual(vpnargs, expected) + + # bad options -- illegal chars + self.write_sample_eipservice( + vpnciphers=True, + # WE ONLY ALLOW A-Z09\- + extra_vpnopts={"cipher": "AES-128-CBC;FOOTHING"}) + vpncommand, vpnargs = eipconfig.build_ovpn_command( do_pkexec_check=False, vpnbin=vpnbin, socket_path="/tmp/test.socket", provider=self.provider) - self.assertEqual(command, self.home + '/bin/openvpn') - self.assertEqual(args, self.get_expected_openvpn_args()) + self.assertEqual(vpncommand, self.home + '/bin/openvpn') + expected = self.get_expected_openvpn_args( + with_openvpn_ciphers=True) + self.assertEqual(vpnargs, expected) if __name__ == "__main__": diff --git a/src/leap/util/fileutil.py b/src/leap/util/fileutil.py index aef4cfe0..820ffe46 100644 --- a/src/leap/util/fileutil.py +++ b/src/leap/util/fileutil.py @@ -93,6 +93,11 @@ def mkdir_p(path): raise +def mkdir_f(path): + folder, fname = os.path.split(path) + mkdir_p(folder) + + def check_and_fix_urw_only(_file): """ test for 600 mode and try -- cgit v1.2.3