diff options
author | Tomás Touceda <chiiph@leap.se> | 2013-06-28 14:25:19 -0300 |
---|---|---|
committer | Tomás Touceda <chiiph@leap.se> | 2013-06-28 14:25:19 -0300 |
commit | 5b975799ce9b7a6e0a88be4bcb48bdfb90800bb3 (patch) | |
tree | bda674a79b1aeccb37b67609517bc4761db7ae07 /src/leap/eip/tests | |
parent | 9cea9c8a34343f8792d65b96f93ae22bd8685878 (diff) | |
parent | c088a1544a5f7a51359d2802019c0740aab0cc5b (diff) |
Merge branch 'release-0.2.2'0.2.2
Diffstat (limited to 'src/leap/eip/tests')
-rw-r--r-- | src/leap/eip/tests/__init__.py | 0 | ||||
-rw-r--r-- | src/leap/eip/tests/data.py | 51 | ||||
-rw-r--r-- | src/leap/eip/tests/test_checks.py | 372 | ||||
-rw-r--r-- | src/leap/eip/tests/test_config.py | 298 | ||||
-rw-r--r-- | src/leap/eip/tests/test_eipconnection.py | 216 | ||||
-rw-r--r-- | src/leap/eip/tests/test_openvpnconnection.py | 161 |
6 files changed, 0 insertions, 1098 deletions
diff --git a/src/leap/eip/tests/__init__.py b/src/leap/eip/tests/__init__.py deleted file mode 100644 index e69de29b..00000000 --- a/src/leap/eip/tests/__init__.py +++ /dev/null diff --git a/src/leap/eip/tests/data.py b/src/leap/eip/tests/data.py deleted file mode 100644 index a7fe1853..00000000 --- a/src/leap/eip/tests/data.py +++ /dev/null @@ -1,51 +0,0 @@ -from __future__ import unicode_literals -import os - -#from leap import __branding - -# sample data used in tests - -#PROVIDER = __branding.get('provider_domain') -PROVIDER = "testprovider.example.org" - -EIP_SAMPLE_CONFIG = { - "provider": "%s" % PROVIDER, - "transport": "openvpn", - "openvpn_protocol": "tcp", - "openvpn_port": 80, - "openvpn_ca_certificate": os.path.expanduser( - "~/.config/leap/providers/" - "%s/" - "keys/ca/cacert.pem" % PROVIDER), - "openvpn_client_certificate": os.path.expanduser( - "~/.config/leap/providers/" - "%s/" - "keys/client/openvpn.pem" % PROVIDER), - "connect_on_login": True, - "block_cleartext_traffic": True, - "primary_gateway": "location_unknown", - "secondary_gateway": "location_unknown2", - #"management_password": "oph7Que1othahwiech6J" -} - -EIP_SAMPLE_SERVICE = { - "serial": 1, - "version": 1, - "clusters": [ - {"label": { - "en": "Location Unknown"}, - "name": "location_unknown"} - ], - "gateways": [ - {"capabilities": { - "adblock": True, - "filter_dns": True, - "ports": ["80", "53", "443", "1194"], - "protocols": ["udp", "tcp"], - "transport": ["openvpn"], - "user_ips": False}, - "cluster": "location_unknown", - "host": "location.example.org", - "ip_address": "192.0.43.10"} - ] -} diff --git a/src/leap/eip/tests/test_checks.py b/src/leap/eip/tests/test_checks.py deleted file mode 100644 index f42a0eeb..00000000 --- a/src/leap/eip/tests/test_checks.py +++ /dev/null @@ -1,372 +0,0 @@ -from BaseHTTPServer import BaseHTTPRequestHandler -import copy -import json -try: - import unittest2 as unittest -except ImportError: - import unittest -import os -import time -import urlparse - -from mock import (patch, Mock) - -import requests - -from leap.base import config as baseconfig -from leap.base import pluggableconfig -from leap.base.constants import (DEFAULT_PROVIDER_DEFINITION, - DEFINITION_EXPECTED_PATH) -from leap.eip import checks as eipchecks -from leap.eip import specs as eipspecs -from leap.eip import exceptions as eipexceptions -from leap.eip.tests import data as testdata -from leap.testing.basetest import BaseLeapTest -from leap.testing.https_server import BaseHTTPSServerTestCase -from leap.testing.https_server import where as where_cert -from leap.util.fileutil import mkdir_f - - -class NoLogRequestHandler: - def log_message(self, *args): - # don't write log msg to stderr - pass - - def read(self, n=None): - return '' - - -class EIPCheckTest(BaseLeapTest): - - __name__ = "eip_check_tests" - provider = "testprovider.example.org" - maxDiff = None - - def setUp(self): - pass - - def tearDown(self): - pass - - # test methods are there, and can be called from run_all - - def test_checker_should_implement_check_methods(self): - checker = eipchecks.EIPConfigChecker(domain=self.provider) - - self.assertTrue(hasattr(checker, "check_default_eipconfig"), - "missing meth") - self.assertTrue(hasattr(checker, "check_is_there_default_provider"), - "missing meth") - self.assertTrue(hasattr(checker, "fetch_definition"), "missing meth") - self.assertTrue(hasattr(checker, "fetch_eip_service_config"), - "missing meth") - self.assertTrue(hasattr(checker, "check_complete_eip_config"), - "missing meth") - - def test_checker_should_actually_call_all_tests(self): - checker = eipchecks.EIPConfigChecker(domain=self.provider) - - mc = Mock() - checker.run_all(checker=mc) - self.assertTrue(mc.check_default_eipconfig.called, "not called") - self.assertTrue(mc.check_is_there_default_provider.called, - "not called") - self.assertTrue(mc.fetch_definition.called, - "not called") - self.assertTrue(mc.fetch_eip_service_config.called, - "not called") - self.assertTrue(mc.check_complete_eip_config.called, - "not called") - - # test individual check methods - - def test_check_default_eipconfig(self): - checker = eipchecks.EIPConfigChecker(domain=self.provider) - # no eip config (empty home) - eipconfig_path = checker.eipconfig.filename - self.assertFalse(os.path.isfile(eipconfig_path)) - checker.check_default_eipconfig() - # we've written one, so it should be there. - self.assertTrue(os.path.isfile(eipconfig_path)) - with open(eipconfig_path, 'rb') as fp: - deserialized = json.load(fp) - - # force re-evaluation of the paths - # small workaround for evaluating home dirs correctly - EIP_SAMPLE_CONFIG = copy.copy(testdata.EIP_SAMPLE_CONFIG) - EIP_SAMPLE_CONFIG['openvpn_client_certificate'] = \ - eipspecs.client_cert_path(self.provider) - EIP_SAMPLE_CONFIG['openvpn_ca_certificate'] = \ - eipspecs.provider_ca_path(self.provider) - self.assertEqual(deserialized, EIP_SAMPLE_CONFIG) - - # TODO: shold ALSO run validation methods. - - def test_check_is_there_default_provider(self): - checker = eipchecks.EIPConfigChecker(domain=self.provider) - # we do dump a sample eip config, but lacking a - # default provider entry. - # This error will be possible catched in a different - # place, when JSONConfig does validation of required fields. - - # passing direct config - with self.assertRaises(eipexceptions.EIPMissingDefaultProvider): - checker.check_is_there_default_provider(config={}) - - # ok. now, messing with real files... - # blank out default_provider - sampleconfig = copy.copy(testdata.EIP_SAMPLE_CONFIG) - sampleconfig['provider'] = None - eipcfg_path = checker.eipconfig.filename - mkdir_f(eipcfg_path) - with open(eipcfg_path, 'w') as fp: - json.dump(sampleconfig, fp) - #with self.assertRaises(eipexceptions.EIPMissingDefaultProvider): - # XXX we should catch this as one of our errors, but do not - # see how to do it quickly. - with self.assertRaises(pluggableconfig.ValidationError): - #import ipdb;ipdb.set_trace() - checker.eipconfig.load(fromfile=eipcfg_path) - checker.check_is_there_default_provider() - - sampleconfig = testdata.EIP_SAMPLE_CONFIG - #eipcfg_path = checker._get_default_eipconfig_path() - with open(eipcfg_path, 'w') as fp: - json.dump(sampleconfig, fp) - checker.eipconfig.load() - self.assertTrue(checker.check_is_there_default_provider()) - - def test_fetch_definition(self): - with patch.object(requests, "get") as mocked_get: - mocked_get.return_value.status_code = 200 - mocked_get.return_value.headers = { - 'last-modified': "Wed Dec 12 12:12:12 GMT 2012"} - mocked_get.return_value.json = DEFAULT_PROVIDER_DEFINITION - checker = eipchecks.EIPConfigChecker(fetcher=requests) - sampleconfig = testdata.EIP_SAMPLE_CONFIG - checker.fetch_definition(config=sampleconfig) - - fn = os.path.join(baseconfig.get_default_provider_path(), - DEFINITION_EXPECTED_PATH) - with open(fn, 'r') as fp: - deserialized = json.load(fp) - self.assertEqual(DEFAULT_PROVIDER_DEFINITION, deserialized) - - # XXX TODO check for ConnectionError, HTTPError, InvalidUrl - # (and proper EIPExceptions are raised). - # Look at base.test_config. - - def test_fetch_eip_service_config(self): - with patch.object(requests, "get") as mocked_get: - mocked_get.return_value.status_code = 200 - mocked_get.return_value.headers = { - 'last-modified': "Wed Dec 12 12:12:12 GMT 2012"} - mocked_get.return_value.json = testdata.EIP_SAMPLE_SERVICE - checker = eipchecks.EIPConfigChecker(fetcher=requests) - sampleconfig = testdata.EIP_SAMPLE_CONFIG - checker.fetch_eip_service_config(config=sampleconfig) - - def test_check_complete_eip_config(self): - checker = eipchecks.EIPConfigChecker() - with self.assertRaises(eipexceptions.EIPConfigurationError): - sampleconfig = copy.copy(testdata.EIP_SAMPLE_CONFIG) - sampleconfig['provider'] = None - checker.check_complete_eip_config(config=sampleconfig) - with self.assertRaises(eipexceptions.EIPConfigurationError): - sampleconfig = copy.copy(testdata.EIP_SAMPLE_CONFIG) - del sampleconfig['provider'] - checker.check_complete_eip_config(config=sampleconfig) - - # normal case - sampleconfig = copy.copy(testdata.EIP_SAMPLE_CONFIG) - checker.check_complete_eip_config(config=sampleconfig) - - -class ProviderCertCheckerTest(BaseLeapTest): - - __name__ = "provider_cert_checker_tests" - provider = "testprovider.example.org" - - def setUp(self): - pass - - def tearDown(self): - pass - - # test methods are there, and can be called from run_all - - def test_checker_should_implement_check_methods(self): - checker = eipchecks.ProviderCertChecker() - - # For MVS+ - self.assertTrue(hasattr(checker, "download_ca_cert"), - "missing meth") - self.assertTrue(hasattr(checker, "download_ca_signature"), - "missing meth") - self.assertTrue(hasattr(checker, "get_ca_signatures"), "missing meth") - self.assertTrue(hasattr(checker, "is_there_trust_path"), - "missing meth") - - # For MVS - self.assertTrue(hasattr(checker, "is_there_provider_ca"), - "missing meth") - self.assertTrue(hasattr(checker, "is_https_working"), "missing meth") - self.assertTrue(hasattr(checker, "check_new_cert_needed"), - "missing meth") - - def test_checker_should_actually_call_all_tests(self): - checker = eipchecks.ProviderCertChecker() - - mc = Mock() - checker.run_all(checker=mc) - # XXX MVS+ - #self.assertTrue(mc.download_ca_cert.called, "not called") - #self.assertTrue(mc.download_ca_signature.called, "not called") - #self.assertTrue(mc.get_ca_signatures.called, "not called") - #self.assertTrue(mc.is_there_trust_path.called, "not called") - - # For MVS - self.assertTrue(mc.is_there_provider_ca.called, "not called") - self.assertTrue(mc.is_https_working.called, - "not called") - self.assertTrue(mc.check_new_cert_needed.called, - "not called") - - # test individual check methods - - @unittest.skip - def test_is_there_provider_ca(self): - # XXX commenting out this test. - # With the generic client this does not make sense, - # we should dump one there. - # or test conductor logic. - checker = eipchecks.ProviderCertChecker() - self.assertTrue( - checker.is_there_provider_ca()) - - -class ProviderCertCheckerHTTPSTests(BaseHTTPSServerTestCase, BaseLeapTest): - provider = "testprovider.example.org" - - class request_handler(NoLogRequestHandler, BaseHTTPRequestHandler): - responses = { - '/': ['OK', ''], - '/client.cert': [ - # XXX get sample cert - '-----BEGIN CERTIFICATE-----', - '-----END CERTIFICATE-----'], - '/badclient.cert': [ - 'BADCERT']} - - def do_GET(self): - path = urlparse.urlparse(self.path) - message = '\n'.join(self.responses.get( - path.path, None)) - self.send_response(200) - self.end_headers() - self.wfile.write(message) - - def test_is_https_working(self): - fetcher = requests - uri = "https://%s/" % (self.get_server()) - # bare requests call. this should just pass (if there is - # an https service there). - fetcher.get(uri, verify=False) - checker = eipchecks.ProviderCertChecker(fetcher=fetcher) - self.assertTrue(checker.is_https_working(uri=uri, verify=False)) - - # for local debugs, when in doubt - #self.assertTrue(checker.is_https_working(uri="https://github.com", - #verify=True)) - - # for the two checks below, I know they fail because no ca - # cert is passed to them, and I know that's the error that - # requests return with our implementation. - # We're receiving this because our - # server is dying prematurely when the handshake is interrupted on the - # client side. - # Since we have access to the server, we could check that - # the error raised has been: - # SSL23_READ_BYTES: alert bad certificate - with self.assertRaises(requests.exceptions.SSLError) as exc: - fetcher.get(uri, verify=True) - self.assertTrue( - "SSL23_GET_SERVER_HELLO:unknown protocol" in exc.message) - - # XXX FIXME! Uncomment after #638 is done - #with self.assertRaises(eipexceptions.EIPBadCertError) as exc: - #checker.is_https_working(uri=uri, verify=True) - #self.assertTrue( - #"cert verification failed" in exc.message) - - # get cacert from testing.https_server - cacert = where_cert('cacert.pem') - fetcher.get(uri, verify=cacert) - self.assertTrue(checker.is_https_working(uri=uri, verify=cacert)) - - # same, but get cacert from leap.custom - # XXX TODO! - - @unittest.skip - def test_download_new_client_cert(self): - # FIXME - # Magick srp decorator broken right now... - # Have to mock the decorator and inject something that - # can bypass the authentication - - uri = "https://%s/client.cert" % (self.get_server()) - cacert = where_cert('cacert.pem') - checker = eipchecks.ProviderCertChecker(domain=self.provider) - credentials = "testuser", "testpassword" - self.assertTrue(checker.download_new_client_cert( - credentials=credentials, uri=uri, verify=cacert)) - - # now download a malformed cert - uri = "https://%s/badclient.cert" % (self.get_server()) - cacert = where_cert('cacert.pem') - checker = eipchecks.ProviderCertChecker() - with self.assertRaises(ValueError): - self.assertTrue(checker.download_new_client_cert( - credentials=credentials, uri=uri, verify=cacert)) - - # did we write cert to its path? - clientcertfile = eipspecs.client_cert_path() - self.assertTrue(os.path.isfile(clientcertfile)) - certfile = eipspecs.client_cert_path() - with open(certfile, 'r') as cf: - certcontent = cf.read() - self.assertEqual(certcontent, - '\n'.join( - self.request_handler.responses['/client.cert'])) - os.remove(clientcertfile) - - def test_is_cert_valid(self): - checker = eipchecks.ProviderCertChecker() - # TODO: better exception catching - # should raise eipexceptions.BadClientCertificate, and give reasons - # on msg. - with self.assertRaises(Exception) as exc: - self.assertFalse(checker.is_cert_valid()) - exc.message = "missing cert" - - def test_bad_validity_certs(self): - checker = eipchecks.ProviderCertChecker() - certfile = where_cert('leaptestscert.pem') - self.assertFalse(checker.is_cert_not_expired( - certfile=certfile, - now=lambda: time.mktime((2038, 1, 1, 1, 1, 1, 1, 1, 1)))) - self.assertFalse(checker.is_cert_not_expired( - certfile=certfile, - now=lambda: time.mktime((1970, 1, 1, 1, 1, 1, 1, 1, 1)))) - - def test_check_new_cert_needed(self): - # check: missing cert - checker = eipchecks.ProviderCertChecker(domain=self.provider) - self.assertTrue(checker.check_new_cert_needed(skip_download=True)) - # TODO check: malformed cert - # TODO check: expired cert - # TODO check: pass test server uri instead of skip - - -if __name__ == "__main__": - unittest.main() diff --git a/src/leap/eip/tests/test_config.py b/src/leap/eip/tests/test_config.py deleted file mode 100644 index 72ab3c8e..00000000 --- a/src/leap/eip/tests/test_config.py +++ /dev/null @@ -1,298 +0,0 @@ -from collections import OrderedDict -import json -import os -import platform -import stat - -try: - import unittest2 as unittest -except ImportError: - import unittest - -#from leap.base import constants -#from leap.eip import config as eip_config -#from leap import __branding as BRANDING -from leap.eip import config as eipconfig -from leap.eip.tests.data import EIP_SAMPLE_CONFIG, EIP_SAMPLE_SERVICE -from leap.testing.basetest import BaseLeapTest -from leap.util.fileutil import mkdir_p, mkdir_f - -_system = platform.system() - -#PROVIDER = BRANDING.get('provider_domain') -#PROVIDER_SHORTNAME = BRANDING.get('short_name') - - -class EIPConfigTest(BaseLeapTest): - - __name__ = "eip_config_tests" - provider = "testprovider.example.org" - - maxDiff = None - - def setUp(self): - pass - - def tearDown(self): - pass - - # - # helpers - # - - def touch_exec(self): - path = os.path.join( - self.tempdir, 'bin') - mkdir_p(path) - tfile = os.path.join( - path, - 'openvpn') - open(tfile, 'wb').close() - os.chmod(tfile, stat.S_IRUSR | stat.S_IWUSR | stat.S_IXUSR) - - def write_sample_eipservice(self, vpnciphers=False, extra_vpnopts=None, - gateways=None): - conf = eipconfig.EIPServiceConfig() - mkdir_f(conf.filename) - if gateways: - EIP_SAMPLE_SERVICE['gateways'] = gateways - if vpnciphers: - openvpnconfig = OrderedDict({ - "auth": "SHA1", - "cipher": "AES-128-CBC", - "tls-cipher": "DHE-RSA-AES128-SHA"}) - if extra_vpnopts: - for k, v in extra_vpnopts.items(): - openvpnconfig[k] = v - EIP_SAMPLE_SERVICE['openvpn_configuration'] = openvpnconfig - - with open(conf.filename, 'w') as fd: - fd.write(json.dumps(EIP_SAMPLE_SERVICE)) - - def write_sample_eipconfig(self): - conf = eipconfig.EIPConfig() - folder, f = os.path.split(conf.filename) - if not os.path.isdir(folder): - mkdir_p(folder) - with open(conf.filename, 'w') as fd: - fd.write(json.dumps(EIP_SAMPLE_CONFIG)) - - def get_expected_openvpn_args(self, with_openvpn_ciphers=False): - """ - yeah, this is almost as duplicating the - code for building the command - """ - args = [] - eipconf = eipconfig.EIPConfig(domain=self.provider) - eipconf.load() - eipsconf = eipconfig.EIPServiceConfig(domain=self.provider) - eipsconf.load() - - username = self.get_username() - groupname = self.get_groupname() - - args.append('--client') - args.append('--dev') - #does this have to be tap for win?? - args.append('tun') - args.append('--persist-tun') - args.append('--persist-key') - args.append('--remote') - - args.append('%s' % eipconfig.get_eip_gateway( - eipconfig=eipconf, - eipserviceconfig=eipsconf)) - # XXX get port!? - args.append('1194') - # XXX get proto - args.append('udp') - args.append('--tls-client') - args.append('--remote-cert-tls') - args.append('server') - - if with_openvpn_ciphers: - CIPHERS = [ - "--tls-cipher", "DHE-RSA-AES128-SHA", - "--cipher", "AES-128-CBC", - "--auth", "SHA1"] - for opt in CIPHERS: - args.append(opt) - - args.append('--user') - args.append(username) - args.append('--group') - args.append(groupname) - args.append('--management-client-user') - args.append(username) - args.append('--management-signal') - - args.append('--management') - #XXX hey! - #get platform switches here! - args.append('/tmp/test.socket') - args.append('unix') - - args.append('--script-security') - args.append('2') - - if _system == "Linux": - UPDOWN_SCRIPT = "/etc/leap/resolv-update" - if os.path.isfile(UPDOWN_SCRIPT): - args.append('--up') - args.append('/etc/leap/resolv-update') - args.append('--down') - args.append('/etc/leap/resolv-update') - args.append('--plugin') - args.append('/usr/lib/openvpn/openvpn-down-root.so') - args.append("'script_type=down /etc/leap/resolv-update'") - - # certs - # XXX get values from specs? - args.append('--cert') - args.append(os.path.join( - self.home, - '.config', 'leap', 'providers', - '%s' % self.provider, - 'keys', 'client', - 'openvpn.pem')) - args.append('--key') - args.append(os.path.join( - self.home, - '.config', 'leap', 'providers', - '%s' % self.provider, - 'keys', 'client', - 'openvpn.pem')) - args.append('--ca') - args.append(os.path.join( - self.home, - '.config', 'leap', 'providers', - '%s' % self.provider, - 'keys', 'ca', - 'cacert.pem')) - return args - - # build command string - # these tests are going to have to check - # many combinations. we should inject some - # params in the function call, to disable - # some checks. - - def test_get_eip_gateway(self): - self.write_sample_eipconfig() - eipconf = eipconfig.EIPConfig(domain=self.provider) - - # default eipservice - self.write_sample_eipservice() - eipsconf = eipconfig.EIPServiceConfig(domain=self.provider) - - gateway = eipconfig.get_eip_gateway( - eipconfig=eipconf, - eipserviceconfig=eipsconf) - - # in spec is local gateway by default - self.assertEqual(gateway, '127.0.0.1') - - # change eipservice - # right now we only check that cluster == selected primary gw in - # eip.json, and pick first matching ip - eipconf._config.config['primary_gateway'] = "foo_provider" - newgateways = [{"cluster": "foo_provider", - "ip_address": "127.0.0.99"}] - self.write_sample_eipservice(gateways=newgateways) - eipsconf = eipconfig.EIPServiceConfig(domain=self.provider) - # load from disk file - eipsconf.load() - - gateway = eipconfig.get_eip_gateway( - eipconfig=eipconf, - eipserviceconfig=eipsconf) - self.assertEqual(gateway, '127.0.0.99') - - # change eipservice, several gateways - # right now we only check that cluster == selected primary gw in - # eip.json, and pick first matching ip - eipconf._config.config['primary_gateway'] = "bar_provider" - newgateways = [{"cluster": "foo_provider", - "ip_address": "127.0.0.99"}, - {'cluster': "bar_provider", - "ip_address": "127.0.0.88"}] - self.write_sample_eipservice(gateways=newgateways) - eipsconf = eipconfig.EIPServiceConfig(domain=self.provider) - # load from disk file - eipsconf.load() - - gateway = eipconfig.get_eip_gateway( - eipconfig=eipconf, - eipserviceconfig=eipsconf) - self.assertEqual(gateway, '127.0.0.88') - - def test_build_ovpn_command_empty_config(self): - self.touch_exec() - self.write_sample_eipservice() - self.write_sample_eipconfig() - - from leap.eip import config as eipconfig - from leap.util.fileutil import which - path = os.environ['PATH'] - vpnbin = which('openvpn', path=path) - #print 'path =', path - #print 'vpnbin = ', vpnbin - vpncommand, vpnargs = eipconfig.build_ovpn_command( - do_pkexec_check=False, vpnbin=vpnbin, - socket_path="/tmp/test.socket", - provider=self.provider) - self.assertEqual(vpncommand, self.home + '/bin/openvpn') - self.assertEqual(vpnargs, self.get_expected_openvpn_args()) - - def test_build_ovpn_command_openvpnoptions(self): - self.touch_exec() - - from leap.eip import config as eipconfig - from leap.util.fileutil import which - path = os.environ['PATH'] - vpnbin = which('openvpn', path=path) - - self.write_sample_eipconfig() - - # regular run, everything normal - self.write_sample_eipservice(vpnciphers=True) - vpncommand, vpnargs = eipconfig.build_ovpn_command( - do_pkexec_check=False, vpnbin=vpnbin, - socket_path="/tmp/test.socket", - provider=self.provider) - self.assertEqual(vpncommand, self.home + '/bin/openvpn') - expected = self.get_expected_openvpn_args( - with_openvpn_ciphers=True) - self.assertEqual(vpnargs, expected) - - # bad options -- illegal options - self.write_sample_eipservice( - vpnciphers=True, - # WE ONLY ALLOW vpn options in auth, cipher, tls-cipher - extra_vpnopts={"notallowedconfig": "badvalue"}) - vpncommand, vpnargs = eipconfig.build_ovpn_command( - do_pkexec_check=False, vpnbin=vpnbin, - socket_path="/tmp/test.socket", - provider=self.provider) - self.assertEqual(vpncommand, self.home + '/bin/openvpn') - expected = self.get_expected_openvpn_args( - with_openvpn_ciphers=True) - self.assertEqual(vpnargs, expected) - - # bad options -- illegal chars - self.write_sample_eipservice( - vpnciphers=True, - # WE ONLY ALLOW A-Z09\- - extra_vpnopts={"cipher": "AES-128-CBC;FOOTHING"}) - vpncommand, vpnargs = eipconfig.build_ovpn_command( - do_pkexec_check=False, vpnbin=vpnbin, - socket_path="/tmp/test.socket", - provider=self.provider) - self.assertEqual(vpncommand, self.home + '/bin/openvpn') - expected = self.get_expected_openvpn_args( - with_openvpn_ciphers=True) - self.assertEqual(vpnargs, expected) - - -if __name__ == "__main__": - unittest.main() diff --git a/src/leap/eip/tests/test_eipconnection.py b/src/leap/eip/tests/test_eipconnection.py deleted file mode 100644 index 163f8d45..00000000 --- a/src/leap/eip/tests/test_eipconnection.py +++ /dev/null @@ -1,216 +0,0 @@ -import glob -import logging -import platform -#import os -import shutil - -logging.basicConfig() -logger = logging.getLogger(name=__name__) - -try: - import unittest2 as unittest -except ImportError: - import unittest - -from mock import Mock, patch # MagicMock - -from leap.eip.eipconnection import EIPConnection -from leap.eip.exceptions import ConnectionRefusedError -from leap.eip import specs as eipspecs -from leap.testing.basetest import BaseLeapTest - -_system = platform.system() - -PROVIDER = "testprovider.example.org" - - -class NotImplementedError(Exception): - pass - - -@patch('OpenVPNConnection._get_or_create_config') -@patch('OpenVPNConnection._set_ovpn_command') -class MockedEIPConnection(EIPConnection): - - def _set_ovpn_command(self): - self.command = "mock_command" - self.args = [1, 2, 3] - - -class EIPConductorTest(BaseLeapTest): - - __name__ = "eip_conductor_tests" - provider = PROVIDER - - def setUp(self): - # XXX there's a conceptual/design - # mistake here. - # If we're testing just attrs after init, - # init shold not be doing so much side effects. - - # for instance: - # We have to TOUCH a keys file because - # we're triggerig the key checks FROM - # the constructor. me not like that, - # key checker should better be called explicitelly. - - # XXX change to keys_checker invocation - # (see config_checker) - - keyfiles = (eipspecs.provider_ca_path(domain=self.provider), - eipspecs.client_cert_path(domain=self.provider)) - for filepath in keyfiles: - self.touch(filepath) - self.chmod600(filepath) - - # we init the manager with only - # some methods mocked - self.manager = Mock(name="openvpnmanager_mock") - self.con = MockedEIPConnection() - self.con.provider = self.provider - - # XXX watch out. This sometimes is throwing the following error: - # NoSuchProcess: process no longer exists (pid=6571) - # because of a bad implementation of _check_if_running_instance - - self.con.run_openvpn_checks() - - def tearDown(self): - pass - - def doCleanups(self): - super(BaseLeapTest, self).doCleanups() - self.cleanupSocketDir() - del self.con - - def cleanupSocketDir(self): - ptt = ('/tmp/leap-tmp*') - for tmpdir in glob.glob(ptt): - shutil.rmtree(tmpdir) - - # - # tests - # - - def test_vpnconnection_defaults(self): - """ - default attrs as expected - """ - con = self.con - self.assertEqual(con.autostart, True) - # XXX moar! - - def test_ovpn_command(self): - """ - set_ovpn_command called - """ - self.assertEqual(self.con.command, - "mock_command") - self.assertEqual(self.con.args, - [1, 2, 3]) - - # config checks - - def test_config_checked_called(self): - # XXX this single test is taking half of the time - # needed to run tests. (roughly 3 secs for this only) - # We should modularize and inject Mocks on more places. - - oldcon = self.con - del(self.con) - config_checker = Mock() - self.con = MockedEIPConnection(config_checker=config_checker) - self.assertTrue(config_checker.called) - self.con.run_checks() - self.con.config_checker.run_all.assert_called_with( - skip_download=False) - - # XXX test for cert_checker also - self.con = oldcon - - # connect/disconnect calls - - def test_disconnect(self): - """ - disconnect method calls private and changes status - """ - self.con._disconnect = Mock( - name="_disconnect") - - # first we set status to connected - self.con.status.set_current(self.con.status.CONNECTED) - self.assertEqual(self.con.status.current, - self.con.status.CONNECTED) - - # disconnect - self.con.terminate_openvpn_connection = Mock() - self.con.disconnect() - self.con.terminate_openvpn_connection.assert_called_once_with( - shutdown=False) - self.con.terminate_openvpn_connection = Mock() - self.con.disconnect(shutdown=True) - self.con.terminate_openvpn_connection.assert_called_once_with( - shutdown=True) - - # new status should be disconnected - # XXX this should evolve and check no errors - # during disconnection - self.assertEqual(self.con.status.current, - self.con.status.DISCONNECTED) - - def test_connect(self): - """ - connect calls _launch_openvpn private - """ - self.con._launch_openvpn = Mock() - self.con.connect() - self.con._launch_openvpn.assert_called_once_with() - - # XXX tests breaking here ... - - def test_good_poll_connection_state(self): - """ - """ - #@patch -- - # self.manager.get_connection_state - - #XXX review this set of poll_state tests - #they SHOULD NOT NEED TO MOCK ANYTHING IN THE - #lower layers!! -- status, vpn_manager.. - #right now we're testing implementation, not - #behavior!!! - good_state = ["1345466946", "unknown_state", "ok", - "192.168.1.1", "192.168.1.100"] - self.con.get_connection_state = Mock(return_value=good_state) - self.con.status.set_vpn_state = Mock() - - state = self.con.poll_connection_state() - good_state[1] = "disconnected" - final_state = tuple(good_state) - self.con.status.set_vpn_state.assert_called_with("unknown_state") - self.assertEqual(state, final_state) - - # TODO between "good" and "bad" (exception raised) cases, - # we can still test for malformed states and see that only good - # states do have a change (and from only the expected transition - # states). - - def test_bad_poll_connection_state(self): - """ - get connection state raises ConnectionRefusedError - state is None - """ - self.con.get_connection_state = Mock( - side_effect=ConnectionRefusedError('foo!')) - state = self.con.poll_connection_state() - self.assertEqual(state, None) - - - # XXX more things to test: - # - called config routines during initz. - # - raising proper exceptions with no config - # - called proper checks on config / permissions - - -if __name__ == "__main__": - unittest.main() diff --git a/src/leap/eip/tests/test_openvpnconnection.py b/src/leap/eip/tests/test_openvpnconnection.py deleted file mode 100644 index 95bfb2f0..00000000 --- a/src/leap/eip/tests/test_openvpnconnection.py +++ /dev/null @@ -1,161 +0,0 @@ -import logging -import os -import platform -import psutil -import shutil -#import socket - -logging.basicConfig() -logger = logging.getLogger(name=__name__) - -try: - import unittest2 as unittest -except ImportError: - import unittest - -from mock import Mock, patch # MagicMock - -from leap.eip import config as eipconfig -from leap.eip import openvpnconnection -from leap.eip import exceptions as eipexceptions -from leap.eip.udstelnet import UDSTelnet -from leap.testing.basetest import BaseLeapTest - -_system = platform.system() - - -class NotImplementedError(Exception): - pass - - -mock_UDSTelnet = Mock(spec=UDSTelnet) -# XXX cautious!!! -# this might be fragile right now (counting a global -# reference of calls I think. -# investigate this other form instead: -# http://www.voidspace.org.uk/python/mock/patch.html#start-and-stop - -# XXX redo after merge-refactor - - -@patch('openvpnconnection.OpenVPNConnection.connect_to_management') -class MockedOpenVPNConnection(openvpnconnection.OpenVPNConnection): - def __init__(self, *args, **kwargs): - self.mock_UDSTelnet = Mock() - super(MockedOpenVPNConnection, self).__init__( - *args, **kwargs) - self.tn = self.mock_UDSTelnet(self.host, self.port) - - def connect_to_management(self): - #print 'patched connect' - self.tn = mock_UDSTelnet(self.host, port=self.port) - - -class OpenVPNConnectionTest(BaseLeapTest): - - __name__ = "vpnconnection_tests" - - def setUp(self): - # XXX this will have to change for win, host=localhost - host = eipconfig.get_socket_path() - self.host = host - self.manager = MockedOpenVPNConnection(host=host) - - def tearDown(self): - pass - - def doCleanups(self): - super(BaseLeapTest, self).doCleanups() - self.cleanupSocketDir() - - def cleanupSocketDir(self): - # remove the socket folder. - # XXX only if posix. in win, host is localhost, so nothing - # has to be done. - if self.host: - folder, fpath = os.path.split(self.host) - try: - assert folder.startswith('/tmp/leap-tmp') # safety check - shutil.rmtree(folder) - except: - self.fail("could not remove temp file") - - del self.manager - - # - # tests - # - - def test_detect_vpn(self): - # XXX review, not sure if captured all the logic - # while fixing. kali. - openvpn_connection = openvpnconnection.OpenVPNConnection() - - with patch.object(psutil, "process_iter") as mocked_psutil: - mocked_process = Mock() - mocked_process.name = "openvpn" - mocked_process.cmdline = ["openvpn", "-foo", "-bar", "-gaaz"] - mocked_psutil.return_value = [mocked_process] - with self.assertRaises(eipexceptions.OpenVPNAlreadyRunning): - openvpn_connection._check_if_running_instance() - - openvpn_connection._check_if_running_instance() - - @unittest.skipIf(_system == "Windows", "lin/mac only") - def test_lin_mac_default_init(self): - """ - check default host for management iface - """ - self.assertTrue(self.manager.host.startswith('/tmp/leap-tmp')) - self.assertEqual(self.manager.port, 'unix') - - @unittest.skipUnless(_system == "Windows", "win only") - def test_win_default_init(self): - """ - check default host for management iface - """ - # XXX should we make the platform specific switch - # here or in the vpn command string building? - self.assertEqual(self.manager.host, 'localhost') - self.assertEqual(self.manager.port, 7777) - - def test_port_types_init(self): - oldmanager = self.manager - self.manager = MockedOpenVPNConnection(port="42") - self.assertEqual(self.manager.port, 42) - self.manager = MockedOpenVPNConnection() - self.assertEqual(self.manager.port, "unix") - self.manager = MockedOpenVPNConnection(port="bad") - self.assertEqual(self.manager.port, None) - self.manager = oldmanager - - def test_uds_telnet_called_on_connect(self): - self.manager.connect_to_management() - mock_UDSTelnet.assert_called_with( - self.manager.host, - port=self.manager.port) - - @unittest.skip - def test_connect(self): - raise NotImplementedError - # XXX calls close - # calls UDSTelnet mock. - - # XXX - # tests to write: - # UDSTelnetTest (for real?) - # HAVE A LOOK AT CORE TESTS FOR TELNETLIB. - # very illustrative instead... - - # - raise MissingSocket - # - raise ConnectionRefusedError - # - test send command - # - tries connect - # - ... tries? - # - ... calls _seek_to_eof - # - ... read_until --> return value - # - ... - - -if __name__ == "__main__": - unittest.main() |