diff options
author | kali <kali@leap.se> | 2013-02-15 09:31:51 +0900 |
---|---|---|
committer | kali <kali@leap.se> | 2013-02-15 09:31:51 +0900 |
commit | 9cea9c8a34343f8792d65b96f93ae22bd8685878 (patch) | |
tree | 9f512367b1d47ced5614702a00f3ff0a8fe746d7 /src/leap/eip/tests | |
parent | 7159734ec6c0b76fc7f3737134cd22fdaaaa7d58 (diff) | |
parent | 1032e07a50c8bb265ff9bd31b3bb00e83ddb451e (diff) |
Merge branch 'release/v0.2.0'
Conflicts:
README.txt
Diffstat (limited to 'src/leap/eip/tests')
-rw-r--r-- | src/leap/eip/tests/__init__.py | 0 | ||||
-rw-r--r-- | src/leap/eip/tests/data.py | 51 | ||||
-rw-r--r-- | src/leap/eip/tests/test_checks.py | 372 | ||||
-rw-r--r-- | src/leap/eip/tests/test_config.py | 298 | ||||
-rw-r--r-- | src/leap/eip/tests/test_eipconnection.py | 216 | ||||
-rw-r--r-- | src/leap/eip/tests/test_openvpnconnection.py | 161 |
6 files changed, 1098 insertions, 0 deletions
diff --git a/src/leap/eip/tests/__init__.py b/src/leap/eip/tests/__init__.py new file mode 100644 index 00000000..e69de29b --- /dev/null +++ b/src/leap/eip/tests/__init__.py diff --git a/src/leap/eip/tests/data.py b/src/leap/eip/tests/data.py new file mode 100644 index 00000000..a7fe1853 --- /dev/null +++ b/src/leap/eip/tests/data.py @@ -0,0 +1,51 @@ +from __future__ import unicode_literals +import os + +#from leap import __branding + +# sample data used in tests + +#PROVIDER = __branding.get('provider_domain') +PROVIDER = "testprovider.example.org" + +EIP_SAMPLE_CONFIG = { + "provider": "%s" % PROVIDER, + "transport": "openvpn", + "openvpn_protocol": "tcp", + "openvpn_port": 80, + "openvpn_ca_certificate": os.path.expanduser( + "~/.config/leap/providers/" + "%s/" + "keys/ca/cacert.pem" % PROVIDER), + "openvpn_client_certificate": os.path.expanduser( + "~/.config/leap/providers/" + "%s/" + "keys/client/openvpn.pem" % PROVIDER), + "connect_on_login": True, + "block_cleartext_traffic": True, + "primary_gateway": "location_unknown", + "secondary_gateway": "location_unknown2", + #"management_password": "oph7Que1othahwiech6J" +} + +EIP_SAMPLE_SERVICE = { + "serial": 1, + "version": 1, + "clusters": [ + {"label": { + "en": "Location Unknown"}, + "name": "location_unknown"} + ], + "gateways": [ + {"capabilities": { + "adblock": True, + "filter_dns": True, + "ports": ["80", "53", "443", "1194"], + "protocols": ["udp", "tcp"], + "transport": ["openvpn"], + "user_ips": False}, + "cluster": "location_unknown", + "host": "location.example.org", + "ip_address": "192.0.43.10"} + ] +} diff --git a/src/leap/eip/tests/test_checks.py b/src/leap/eip/tests/test_checks.py new file mode 100644 index 00000000..f42a0eeb --- /dev/null +++ b/src/leap/eip/tests/test_checks.py @@ -0,0 +1,372 @@ +from BaseHTTPServer import BaseHTTPRequestHandler +import copy +import json +try: + import unittest2 as unittest +except ImportError: + import unittest +import os +import time +import urlparse + +from mock import (patch, Mock) + +import requests + +from leap.base import config as baseconfig +from leap.base import pluggableconfig +from leap.base.constants import (DEFAULT_PROVIDER_DEFINITION, + DEFINITION_EXPECTED_PATH) +from leap.eip import checks as eipchecks +from leap.eip import specs as eipspecs +from leap.eip import exceptions as eipexceptions +from leap.eip.tests import data as testdata +from leap.testing.basetest import BaseLeapTest +from leap.testing.https_server import BaseHTTPSServerTestCase +from leap.testing.https_server import where as where_cert +from leap.util.fileutil import mkdir_f + + +class NoLogRequestHandler: + def log_message(self, *args): + # don't write log msg to stderr + pass + + def read(self, n=None): + return '' + + +class EIPCheckTest(BaseLeapTest): + + __name__ = "eip_check_tests" + provider = "testprovider.example.org" + maxDiff = None + + def setUp(self): + pass + + def tearDown(self): + pass + + # test methods are there, and can be called from run_all + + def test_checker_should_implement_check_methods(self): + checker = eipchecks.EIPConfigChecker(domain=self.provider) + + self.assertTrue(hasattr(checker, "check_default_eipconfig"), + "missing meth") + self.assertTrue(hasattr(checker, "check_is_there_default_provider"), + "missing meth") + self.assertTrue(hasattr(checker, "fetch_definition"), "missing meth") + self.assertTrue(hasattr(checker, "fetch_eip_service_config"), + "missing meth") + self.assertTrue(hasattr(checker, "check_complete_eip_config"), + "missing meth") + + def test_checker_should_actually_call_all_tests(self): + checker = eipchecks.EIPConfigChecker(domain=self.provider) + + mc = Mock() + checker.run_all(checker=mc) + self.assertTrue(mc.check_default_eipconfig.called, "not called") + self.assertTrue(mc.check_is_there_default_provider.called, + "not called") + self.assertTrue(mc.fetch_definition.called, + "not called") + self.assertTrue(mc.fetch_eip_service_config.called, + "not called") + self.assertTrue(mc.check_complete_eip_config.called, + "not called") + + # test individual check methods + + def test_check_default_eipconfig(self): + checker = eipchecks.EIPConfigChecker(domain=self.provider) + # no eip config (empty home) + eipconfig_path = checker.eipconfig.filename + self.assertFalse(os.path.isfile(eipconfig_path)) + checker.check_default_eipconfig() + # we've written one, so it should be there. + self.assertTrue(os.path.isfile(eipconfig_path)) + with open(eipconfig_path, 'rb') as fp: + deserialized = json.load(fp) + + # force re-evaluation of the paths + # small workaround for evaluating home dirs correctly + EIP_SAMPLE_CONFIG = copy.copy(testdata.EIP_SAMPLE_CONFIG) + EIP_SAMPLE_CONFIG['openvpn_client_certificate'] = \ + eipspecs.client_cert_path(self.provider) + EIP_SAMPLE_CONFIG['openvpn_ca_certificate'] = \ + eipspecs.provider_ca_path(self.provider) + self.assertEqual(deserialized, EIP_SAMPLE_CONFIG) + + # TODO: shold ALSO run validation methods. + + def test_check_is_there_default_provider(self): + checker = eipchecks.EIPConfigChecker(domain=self.provider) + # we do dump a sample eip config, but lacking a + # default provider entry. + # This error will be possible catched in a different + # place, when JSONConfig does validation of required fields. + + # passing direct config + with self.assertRaises(eipexceptions.EIPMissingDefaultProvider): + checker.check_is_there_default_provider(config={}) + + # ok. now, messing with real files... + # blank out default_provider + sampleconfig = copy.copy(testdata.EIP_SAMPLE_CONFIG) + sampleconfig['provider'] = None + eipcfg_path = checker.eipconfig.filename + mkdir_f(eipcfg_path) + with open(eipcfg_path, 'w') as fp: + json.dump(sampleconfig, fp) + #with self.assertRaises(eipexceptions.EIPMissingDefaultProvider): + # XXX we should catch this as one of our errors, but do not + # see how to do it quickly. + with self.assertRaises(pluggableconfig.ValidationError): + #import ipdb;ipdb.set_trace() + checker.eipconfig.load(fromfile=eipcfg_path) + checker.check_is_there_default_provider() + + sampleconfig = testdata.EIP_SAMPLE_CONFIG + #eipcfg_path = checker._get_default_eipconfig_path() + with open(eipcfg_path, 'w') as fp: + json.dump(sampleconfig, fp) + checker.eipconfig.load() + self.assertTrue(checker.check_is_there_default_provider()) + + def test_fetch_definition(self): + with patch.object(requests, "get") as mocked_get: + mocked_get.return_value.status_code = 200 + mocked_get.return_value.headers = { + 'last-modified': "Wed Dec 12 12:12:12 GMT 2012"} + mocked_get.return_value.json = DEFAULT_PROVIDER_DEFINITION + checker = eipchecks.EIPConfigChecker(fetcher=requests) + sampleconfig = testdata.EIP_SAMPLE_CONFIG + checker.fetch_definition(config=sampleconfig) + + fn = os.path.join(baseconfig.get_default_provider_path(), + DEFINITION_EXPECTED_PATH) + with open(fn, 'r') as fp: + deserialized = json.load(fp) + self.assertEqual(DEFAULT_PROVIDER_DEFINITION, deserialized) + + # XXX TODO check for ConnectionError, HTTPError, InvalidUrl + # (and proper EIPExceptions are raised). + # Look at base.test_config. + + def test_fetch_eip_service_config(self): + with patch.object(requests, "get") as mocked_get: + mocked_get.return_value.status_code = 200 + mocked_get.return_value.headers = { + 'last-modified': "Wed Dec 12 12:12:12 GMT 2012"} + mocked_get.return_value.json = testdata.EIP_SAMPLE_SERVICE + checker = eipchecks.EIPConfigChecker(fetcher=requests) + sampleconfig = testdata.EIP_SAMPLE_CONFIG + checker.fetch_eip_service_config(config=sampleconfig) + + def test_check_complete_eip_config(self): + checker = eipchecks.EIPConfigChecker() + with self.assertRaises(eipexceptions.EIPConfigurationError): + sampleconfig = copy.copy(testdata.EIP_SAMPLE_CONFIG) + sampleconfig['provider'] = None + checker.check_complete_eip_config(config=sampleconfig) + with self.assertRaises(eipexceptions.EIPConfigurationError): + sampleconfig = copy.copy(testdata.EIP_SAMPLE_CONFIG) + del sampleconfig['provider'] + checker.check_complete_eip_config(config=sampleconfig) + + # normal case + sampleconfig = copy.copy(testdata.EIP_SAMPLE_CONFIG) + checker.check_complete_eip_config(config=sampleconfig) + + +class ProviderCertCheckerTest(BaseLeapTest): + + __name__ = "provider_cert_checker_tests" + provider = "testprovider.example.org" + + def setUp(self): + pass + + def tearDown(self): + pass + + # test methods are there, and can be called from run_all + + def test_checker_should_implement_check_methods(self): + checker = eipchecks.ProviderCertChecker() + + # For MVS+ + self.assertTrue(hasattr(checker, "download_ca_cert"), + "missing meth") + self.assertTrue(hasattr(checker, "download_ca_signature"), + "missing meth") + self.assertTrue(hasattr(checker, "get_ca_signatures"), "missing meth") + self.assertTrue(hasattr(checker, "is_there_trust_path"), + "missing meth") + + # For MVS + self.assertTrue(hasattr(checker, "is_there_provider_ca"), + "missing meth") + self.assertTrue(hasattr(checker, "is_https_working"), "missing meth") + self.assertTrue(hasattr(checker, "check_new_cert_needed"), + "missing meth") + + def test_checker_should_actually_call_all_tests(self): + checker = eipchecks.ProviderCertChecker() + + mc = Mock() + checker.run_all(checker=mc) + # XXX MVS+ + #self.assertTrue(mc.download_ca_cert.called, "not called") + #self.assertTrue(mc.download_ca_signature.called, "not called") + #self.assertTrue(mc.get_ca_signatures.called, "not called") + #self.assertTrue(mc.is_there_trust_path.called, "not called") + + # For MVS + self.assertTrue(mc.is_there_provider_ca.called, "not called") + self.assertTrue(mc.is_https_working.called, + "not called") + self.assertTrue(mc.check_new_cert_needed.called, + "not called") + + # test individual check methods + + @unittest.skip + def test_is_there_provider_ca(self): + # XXX commenting out this test. + # With the generic client this does not make sense, + # we should dump one there. + # or test conductor logic. + checker = eipchecks.ProviderCertChecker() + self.assertTrue( + checker.is_there_provider_ca()) + + +class ProviderCertCheckerHTTPSTests(BaseHTTPSServerTestCase, BaseLeapTest): + provider = "testprovider.example.org" + + class request_handler(NoLogRequestHandler, BaseHTTPRequestHandler): + responses = { + '/': ['OK', ''], + '/client.cert': [ + # XXX get sample cert + '-----BEGIN CERTIFICATE-----', + '-----END CERTIFICATE-----'], + '/badclient.cert': [ + 'BADCERT']} + + def do_GET(self): + path = urlparse.urlparse(self.path) + message = '\n'.join(self.responses.get( + path.path, None)) + self.send_response(200) + self.end_headers() + self.wfile.write(message) + + def test_is_https_working(self): + fetcher = requests + uri = "https://%s/" % (self.get_server()) + # bare requests call. this should just pass (if there is + # an https service there). + fetcher.get(uri, verify=False) + checker = eipchecks.ProviderCertChecker(fetcher=fetcher) + self.assertTrue(checker.is_https_working(uri=uri, verify=False)) + + # for local debugs, when in doubt + #self.assertTrue(checker.is_https_working(uri="https://github.com", + #verify=True)) + + # for the two checks below, I know they fail because no ca + # cert is passed to them, and I know that's the error that + # requests return with our implementation. + # We're receiving this because our + # server is dying prematurely when the handshake is interrupted on the + # client side. + # Since we have access to the server, we could check that + # the error raised has been: + # SSL23_READ_BYTES: alert bad certificate + with self.assertRaises(requests.exceptions.SSLError) as exc: + fetcher.get(uri, verify=True) + self.assertTrue( + "SSL23_GET_SERVER_HELLO:unknown protocol" in exc.message) + + # XXX FIXME! Uncomment after #638 is done + #with self.assertRaises(eipexceptions.EIPBadCertError) as exc: + #checker.is_https_working(uri=uri, verify=True) + #self.assertTrue( + #"cert verification failed" in exc.message) + + # get cacert from testing.https_server + cacert = where_cert('cacert.pem') + fetcher.get(uri, verify=cacert) + self.assertTrue(checker.is_https_working(uri=uri, verify=cacert)) + + # same, but get cacert from leap.custom + # XXX TODO! + + @unittest.skip + def test_download_new_client_cert(self): + # FIXME + # Magick srp decorator broken right now... + # Have to mock the decorator and inject something that + # can bypass the authentication + + uri = "https://%s/client.cert" % (self.get_server()) + cacert = where_cert('cacert.pem') + checker = eipchecks.ProviderCertChecker(domain=self.provider) + credentials = "testuser", "testpassword" + self.assertTrue(checker.download_new_client_cert( + credentials=credentials, uri=uri, verify=cacert)) + + # now download a malformed cert + uri = "https://%s/badclient.cert" % (self.get_server()) + cacert = where_cert('cacert.pem') + checker = eipchecks.ProviderCertChecker() + with self.assertRaises(ValueError): + self.assertTrue(checker.download_new_client_cert( + credentials=credentials, uri=uri, verify=cacert)) + + # did we write cert to its path? + clientcertfile = eipspecs.client_cert_path() + self.assertTrue(os.path.isfile(clientcertfile)) + certfile = eipspecs.client_cert_path() + with open(certfile, 'r') as cf: + certcontent = cf.read() + self.assertEqual(certcontent, + '\n'.join( + self.request_handler.responses['/client.cert'])) + os.remove(clientcertfile) + + def test_is_cert_valid(self): + checker = eipchecks.ProviderCertChecker() + # TODO: better exception catching + # should raise eipexceptions.BadClientCertificate, and give reasons + # on msg. + with self.assertRaises(Exception) as exc: + self.assertFalse(checker.is_cert_valid()) + exc.message = "missing cert" + + def test_bad_validity_certs(self): + checker = eipchecks.ProviderCertChecker() + certfile = where_cert('leaptestscert.pem') + self.assertFalse(checker.is_cert_not_expired( + certfile=certfile, + now=lambda: time.mktime((2038, 1, 1, 1, 1, 1, 1, 1, 1)))) + self.assertFalse(checker.is_cert_not_expired( + certfile=certfile, + now=lambda: time.mktime((1970, 1, 1, 1, 1, 1, 1, 1, 1)))) + + def test_check_new_cert_needed(self): + # check: missing cert + checker = eipchecks.ProviderCertChecker(domain=self.provider) + self.assertTrue(checker.check_new_cert_needed(skip_download=True)) + # TODO check: malformed cert + # TODO check: expired cert + # TODO check: pass test server uri instead of skip + + +if __name__ == "__main__": + unittest.main() diff --git a/src/leap/eip/tests/test_config.py b/src/leap/eip/tests/test_config.py new file mode 100644 index 00000000..72ab3c8e --- /dev/null +++ b/src/leap/eip/tests/test_config.py @@ -0,0 +1,298 @@ +from collections import OrderedDict +import json +import os +import platform +import stat + +try: + import unittest2 as unittest +except ImportError: + import unittest + +#from leap.base import constants +#from leap.eip import config as eip_config +#from leap import __branding as BRANDING +from leap.eip import config as eipconfig +from leap.eip.tests.data import EIP_SAMPLE_CONFIG, EIP_SAMPLE_SERVICE +from leap.testing.basetest import BaseLeapTest +from leap.util.fileutil import mkdir_p, mkdir_f + +_system = platform.system() + +#PROVIDER = BRANDING.get('provider_domain') +#PROVIDER_SHORTNAME = BRANDING.get('short_name') + + +class EIPConfigTest(BaseLeapTest): + + __name__ = "eip_config_tests" + provider = "testprovider.example.org" + + maxDiff = None + + def setUp(self): + pass + + def tearDown(self): + pass + + # + # helpers + # + + def touch_exec(self): + path = os.path.join( + self.tempdir, 'bin') + mkdir_p(path) + tfile = os.path.join( + path, + 'openvpn') + open(tfile, 'wb').close() + os.chmod(tfile, stat.S_IRUSR | stat.S_IWUSR | stat.S_IXUSR) + + def write_sample_eipservice(self, vpnciphers=False, extra_vpnopts=None, + gateways=None): + conf = eipconfig.EIPServiceConfig() + mkdir_f(conf.filename) + if gateways: + EIP_SAMPLE_SERVICE['gateways'] = gateways + if vpnciphers: + openvpnconfig = OrderedDict({ + "auth": "SHA1", + "cipher": "AES-128-CBC", + "tls-cipher": "DHE-RSA-AES128-SHA"}) + if extra_vpnopts: + for k, v in extra_vpnopts.items(): + openvpnconfig[k] = v + EIP_SAMPLE_SERVICE['openvpn_configuration'] = openvpnconfig + + with open(conf.filename, 'w') as fd: + fd.write(json.dumps(EIP_SAMPLE_SERVICE)) + + def write_sample_eipconfig(self): + conf = eipconfig.EIPConfig() + folder, f = os.path.split(conf.filename) + if not os.path.isdir(folder): + mkdir_p(folder) + with open(conf.filename, 'w') as fd: + fd.write(json.dumps(EIP_SAMPLE_CONFIG)) + + def get_expected_openvpn_args(self, with_openvpn_ciphers=False): + """ + yeah, this is almost as duplicating the + code for building the command + """ + args = [] + eipconf = eipconfig.EIPConfig(domain=self.provider) + eipconf.load() + eipsconf = eipconfig.EIPServiceConfig(domain=self.provider) + eipsconf.load() + + username = self.get_username() + groupname = self.get_groupname() + + args.append('--client') + args.append('--dev') + #does this have to be tap for win?? + args.append('tun') + args.append('--persist-tun') + args.append('--persist-key') + args.append('--remote') + + args.append('%s' % eipconfig.get_eip_gateway( + eipconfig=eipconf, + eipserviceconfig=eipsconf)) + # XXX get port!? + args.append('1194') + # XXX get proto + args.append('udp') + args.append('--tls-client') + args.append('--remote-cert-tls') + args.append('server') + + if with_openvpn_ciphers: + CIPHERS = [ + "--tls-cipher", "DHE-RSA-AES128-SHA", + "--cipher", "AES-128-CBC", + "--auth", "SHA1"] + for opt in CIPHERS: + args.append(opt) + + args.append('--user') + args.append(username) + args.append('--group') + args.append(groupname) + args.append('--management-client-user') + args.append(username) + args.append('--management-signal') + + args.append('--management') + #XXX hey! + #get platform switches here! + args.append('/tmp/test.socket') + args.append('unix') + + args.append('--script-security') + args.append('2') + + if _system == "Linux": + UPDOWN_SCRIPT = "/etc/leap/resolv-update" + if os.path.isfile(UPDOWN_SCRIPT): + args.append('--up') + args.append('/etc/leap/resolv-update') + args.append('--down') + args.append('/etc/leap/resolv-update') + args.append('--plugin') + args.append('/usr/lib/openvpn/openvpn-down-root.so') + args.append("'script_type=down /etc/leap/resolv-update'") + + # certs + # XXX get values from specs? + args.append('--cert') + args.append(os.path.join( + self.home, + '.config', 'leap', 'providers', + '%s' % self.provider, + 'keys', 'client', + 'openvpn.pem')) + args.append('--key') + args.append(os.path.join( + self.home, + '.config', 'leap', 'providers', + '%s' % self.provider, + 'keys', 'client', + 'openvpn.pem')) + args.append('--ca') + args.append(os.path.join( + self.home, + '.config', 'leap', 'providers', + '%s' % self.provider, + 'keys', 'ca', + 'cacert.pem')) + return args + + # build command string + # these tests are going to have to check + # many combinations. we should inject some + # params in the function call, to disable + # some checks. + + def test_get_eip_gateway(self): + self.write_sample_eipconfig() + eipconf = eipconfig.EIPConfig(domain=self.provider) + + # default eipservice + self.write_sample_eipservice() + eipsconf = eipconfig.EIPServiceConfig(domain=self.provider) + + gateway = eipconfig.get_eip_gateway( + eipconfig=eipconf, + eipserviceconfig=eipsconf) + + # in spec is local gateway by default + self.assertEqual(gateway, '127.0.0.1') + + # change eipservice + # right now we only check that cluster == selected primary gw in + # eip.json, and pick first matching ip + eipconf._config.config['primary_gateway'] = "foo_provider" + newgateways = [{"cluster": "foo_provider", + "ip_address": "127.0.0.99"}] + self.write_sample_eipservice(gateways=newgateways) + eipsconf = eipconfig.EIPServiceConfig(domain=self.provider) + # load from disk file + eipsconf.load() + + gateway = eipconfig.get_eip_gateway( + eipconfig=eipconf, + eipserviceconfig=eipsconf) + self.assertEqual(gateway, '127.0.0.99') + + # change eipservice, several gateways + # right now we only check that cluster == selected primary gw in + # eip.json, and pick first matching ip + eipconf._config.config['primary_gateway'] = "bar_provider" + newgateways = [{"cluster": "foo_provider", + "ip_address": "127.0.0.99"}, + {'cluster': "bar_provider", + "ip_address": "127.0.0.88"}] + self.write_sample_eipservice(gateways=newgateways) + eipsconf = eipconfig.EIPServiceConfig(domain=self.provider) + # load from disk file + eipsconf.load() + + gateway = eipconfig.get_eip_gateway( + eipconfig=eipconf, + eipserviceconfig=eipsconf) + self.assertEqual(gateway, '127.0.0.88') + + def test_build_ovpn_command_empty_config(self): + self.touch_exec() + self.write_sample_eipservice() + self.write_sample_eipconfig() + + from leap.eip import config as eipconfig + from leap.util.fileutil import which + path = os.environ['PATH'] + vpnbin = which('openvpn', path=path) + #print 'path =', path + #print 'vpnbin = ', vpnbin + vpncommand, vpnargs = eipconfig.build_ovpn_command( + do_pkexec_check=False, vpnbin=vpnbin, + socket_path="/tmp/test.socket", + provider=self.provider) + self.assertEqual(vpncommand, self.home + '/bin/openvpn') + self.assertEqual(vpnargs, self.get_expected_openvpn_args()) + + def test_build_ovpn_command_openvpnoptions(self): + self.touch_exec() + + from leap.eip import config as eipconfig + from leap.util.fileutil import which + path = os.environ['PATH'] + vpnbin = which('openvpn', path=path) + + self.write_sample_eipconfig() + + # regular run, everything normal + self.write_sample_eipservice(vpnciphers=True) + vpncommand, vpnargs = eipconfig.build_ovpn_command( + do_pkexec_check=False, vpnbin=vpnbin, + socket_path="/tmp/test.socket", + provider=self.provider) + self.assertEqual(vpncommand, self.home + '/bin/openvpn') + expected = self.get_expected_openvpn_args( + with_openvpn_ciphers=True) + self.assertEqual(vpnargs, expected) + + # bad options -- illegal options + self.write_sample_eipservice( + vpnciphers=True, + # WE ONLY ALLOW vpn options in auth, cipher, tls-cipher + extra_vpnopts={"notallowedconfig": "badvalue"}) + vpncommand, vpnargs = eipconfig.build_ovpn_command( + do_pkexec_check=False, vpnbin=vpnbin, + socket_path="/tmp/test.socket", + provider=self.provider) + self.assertEqual(vpncommand, self.home + '/bin/openvpn') + expected = self.get_expected_openvpn_args( + with_openvpn_ciphers=True) + self.assertEqual(vpnargs, expected) + + # bad options -- illegal chars + self.write_sample_eipservice( + vpnciphers=True, + # WE ONLY ALLOW A-Z09\- + extra_vpnopts={"cipher": "AES-128-CBC;FOOTHING"}) + vpncommand, vpnargs = eipconfig.build_ovpn_command( + do_pkexec_check=False, vpnbin=vpnbin, + socket_path="/tmp/test.socket", + provider=self.provider) + self.assertEqual(vpncommand, self.home + '/bin/openvpn') + expected = self.get_expected_openvpn_args( + with_openvpn_ciphers=True) + self.assertEqual(vpnargs, expected) + + +if __name__ == "__main__": + unittest.main() diff --git a/src/leap/eip/tests/test_eipconnection.py b/src/leap/eip/tests/test_eipconnection.py new file mode 100644 index 00000000..163f8d45 --- /dev/null +++ b/src/leap/eip/tests/test_eipconnection.py @@ -0,0 +1,216 @@ +import glob +import logging +import platform +#import os +import shutil + +logging.basicConfig() +logger = logging.getLogger(name=__name__) + +try: + import unittest2 as unittest +except ImportError: + import unittest + +from mock import Mock, patch # MagicMock + +from leap.eip.eipconnection import EIPConnection +from leap.eip.exceptions import ConnectionRefusedError +from leap.eip import specs as eipspecs +from leap.testing.basetest import BaseLeapTest + +_system = platform.system() + +PROVIDER = "testprovider.example.org" + + +class NotImplementedError(Exception): + pass + + +@patch('OpenVPNConnection._get_or_create_config') +@patch('OpenVPNConnection._set_ovpn_command') +class MockedEIPConnection(EIPConnection): + + def _set_ovpn_command(self): + self.command = "mock_command" + self.args = [1, 2, 3] + + +class EIPConductorTest(BaseLeapTest): + + __name__ = "eip_conductor_tests" + provider = PROVIDER + + def setUp(self): + # XXX there's a conceptual/design + # mistake here. + # If we're testing just attrs after init, + # init shold not be doing so much side effects. + + # for instance: + # We have to TOUCH a keys file because + # we're triggerig the key checks FROM + # the constructor. me not like that, + # key checker should better be called explicitelly. + + # XXX change to keys_checker invocation + # (see config_checker) + + keyfiles = (eipspecs.provider_ca_path(domain=self.provider), + eipspecs.client_cert_path(domain=self.provider)) + for filepath in keyfiles: + self.touch(filepath) + self.chmod600(filepath) + + # we init the manager with only + # some methods mocked + self.manager = Mock(name="openvpnmanager_mock") + self.con = MockedEIPConnection() + self.con.provider = self.provider + + # XXX watch out. This sometimes is throwing the following error: + # NoSuchProcess: process no longer exists (pid=6571) + # because of a bad implementation of _check_if_running_instance + + self.con.run_openvpn_checks() + + def tearDown(self): + pass + + def doCleanups(self): + super(BaseLeapTest, self).doCleanups() + self.cleanupSocketDir() + del self.con + + def cleanupSocketDir(self): + ptt = ('/tmp/leap-tmp*') + for tmpdir in glob.glob(ptt): + shutil.rmtree(tmpdir) + + # + # tests + # + + def test_vpnconnection_defaults(self): + """ + default attrs as expected + """ + con = self.con + self.assertEqual(con.autostart, True) + # XXX moar! + + def test_ovpn_command(self): + """ + set_ovpn_command called + """ + self.assertEqual(self.con.command, + "mock_command") + self.assertEqual(self.con.args, + [1, 2, 3]) + + # config checks + + def test_config_checked_called(self): + # XXX this single test is taking half of the time + # needed to run tests. (roughly 3 secs for this only) + # We should modularize and inject Mocks on more places. + + oldcon = self.con + del(self.con) + config_checker = Mock() + self.con = MockedEIPConnection(config_checker=config_checker) + self.assertTrue(config_checker.called) + self.con.run_checks() + self.con.config_checker.run_all.assert_called_with( + skip_download=False) + + # XXX test for cert_checker also + self.con = oldcon + + # connect/disconnect calls + + def test_disconnect(self): + """ + disconnect method calls private and changes status + """ + self.con._disconnect = Mock( + name="_disconnect") + + # first we set status to connected + self.con.status.set_current(self.con.status.CONNECTED) + self.assertEqual(self.con.status.current, + self.con.status.CONNECTED) + + # disconnect + self.con.terminate_openvpn_connection = Mock() + self.con.disconnect() + self.con.terminate_openvpn_connection.assert_called_once_with( + shutdown=False) + self.con.terminate_openvpn_connection = Mock() + self.con.disconnect(shutdown=True) + self.con.terminate_openvpn_connection.assert_called_once_with( + shutdown=True) + + # new status should be disconnected + # XXX this should evolve and check no errors + # during disconnection + self.assertEqual(self.con.status.current, + self.con.status.DISCONNECTED) + + def test_connect(self): + """ + connect calls _launch_openvpn private + """ + self.con._launch_openvpn = Mock() + self.con.connect() + self.con._launch_openvpn.assert_called_once_with() + + # XXX tests breaking here ... + + def test_good_poll_connection_state(self): + """ + """ + #@patch -- + # self.manager.get_connection_state + + #XXX review this set of poll_state tests + #they SHOULD NOT NEED TO MOCK ANYTHING IN THE + #lower layers!! -- status, vpn_manager.. + #right now we're testing implementation, not + #behavior!!! + good_state = ["1345466946", "unknown_state", "ok", + "192.168.1.1", "192.168.1.100"] + self.con.get_connection_state = Mock(return_value=good_state) + self.con.status.set_vpn_state = Mock() + + state = self.con.poll_connection_state() + good_state[1] = "disconnected" + final_state = tuple(good_state) + self.con.status.set_vpn_state.assert_called_with("unknown_state") + self.assertEqual(state, final_state) + + # TODO between "good" and "bad" (exception raised) cases, + # we can still test for malformed states and see that only good + # states do have a change (and from only the expected transition + # states). + + def test_bad_poll_connection_state(self): + """ + get connection state raises ConnectionRefusedError + state is None + """ + self.con.get_connection_state = Mock( + side_effect=ConnectionRefusedError('foo!')) + state = self.con.poll_connection_state() + self.assertEqual(state, None) + + + # XXX more things to test: + # - called config routines during initz. + # - raising proper exceptions with no config + # - called proper checks on config / permissions + + +if __name__ == "__main__": + unittest.main() diff --git a/src/leap/eip/tests/test_openvpnconnection.py b/src/leap/eip/tests/test_openvpnconnection.py new file mode 100644 index 00000000..95bfb2f0 --- /dev/null +++ b/src/leap/eip/tests/test_openvpnconnection.py @@ -0,0 +1,161 @@ +import logging +import os +import platform +import psutil +import shutil +#import socket + +logging.basicConfig() +logger = logging.getLogger(name=__name__) + +try: + import unittest2 as unittest +except ImportError: + import unittest + +from mock import Mock, patch # MagicMock + +from leap.eip import config as eipconfig +from leap.eip import openvpnconnection +from leap.eip import exceptions as eipexceptions +from leap.eip.udstelnet import UDSTelnet +from leap.testing.basetest import BaseLeapTest + +_system = platform.system() + + +class NotImplementedError(Exception): + pass + + +mock_UDSTelnet = Mock(spec=UDSTelnet) +# XXX cautious!!! +# this might be fragile right now (counting a global +# reference of calls I think. +# investigate this other form instead: +# http://www.voidspace.org.uk/python/mock/patch.html#start-and-stop + +# XXX redo after merge-refactor + + +@patch('openvpnconnection.OpenVPNConnection.connect_to_management') +class MockedOpenVPNConnection(openvpnconnection.OpenVPNConnection): + def __init__(self, *args, **kwargs): + self.mock_UDSTelnet = Mock() + super(MockedOpenVPNConnection, self).__init__( + *args, **kwargs) + self.tn = self.mock_UDSTelnet(self.host, self.port) + + def connect_to_management(self): + #print 'patched connect' + self.tn = mock_UDSTelnet(self.host, port=self.port) + + +class OpenVPNConnectionTest(BaseLeapTest): + + __name__ = "vpnconnection_tests" + + def setUp(self): + # XXX this will have to change for win, host=localhost + host = eipconfig.get_socket_path() + self.host = host + self.manager = MockedOpenVPNConnection(host=host) + + def tearDown(self): + pass + + def doCleanups(self): + super(BaseLeapTest, self).doCleanups() + self.cleanupSocketDir() + + def cleanupSocketDir(self): + # remove the socket folder. + # XXX only if posix. in win, host is localhost, so nothing + # has to be done. + if self.host: + folder, fpath = os.path.split(self.host) + try: + assert folder.startswith('/tmp/leap-tmp') # safety check + shutil.rmtree(folder) + except: + self.fail("could not remove temp file") + + del self.manager + + # + # tests + # + + def test_detect_vpn(self): + # XXX review, not sure if captured all the logic + # while fixing. kali. + openvpn_connection = openvpnconnection.OpenVPNConnection() + + with patch.object(psutil, "process_iter") as mocked_psutil: + mocked_process = Mock() + mocked_process.name = "openvpn" + mocked_process.cmdline = ["openvpn", "-foo", "-bar", "-gaaz"] + mocked_psutil.return_value = [mocked_process] + with self.assertRaises(eipexceptions.OpenVPNAlreadyRunning): + openvpn_connection._check_if_running_instance() + + openvpn_connection._check_if_running_instance() + + @unittest.skipIf(_system == "Windows", "lin/mac only") + def test_lin_mac_default_init(self): + """ + check default host for management iface + """ + self.assertTrue(self.manager.host.startswith('/tmp/leap-tmp')) + self.assertEqual(self.manager.port, 'unix') + + @unittest.skipUnless(_system == "Windows", "win only") + def test_win_default_init(self): + """ + check default host for management iface + """ + # XXX should we make the platform specific switch + # here or in the vpn command string building? + self.assertEqual(self.manager.host, 'localhost') + self.assertEqual(self.manager.port, 7777) + + def test_port_types_init(self): + oldmanager = self.manager + self.manager = MockedOpenVPNConnection(port="42") + self.assertEqual(self.manager.port, 42) + self.manager = MockedOpenVPNConnection() + self.assertEqual(self.manager.port, "unix") + self.manager = MockedOpenVPNConnection(port="bad") + self.assertEqual(self.manager.port, None) + self.manager = oldmanager + + def test_uds_telnet_called_on_connect(self): + self.manager.connect_to_management() + mock_UDSTelnet.assert_called_with( + self.manager.host, + port=self.manager.port) + + @unittest.skip + def test_connect(self): + raise NotImplementedError + # XXX calls close + # calls UDSTelnet mock. + + # XXX + # tests to write: + # UDSTelnetTest (for real?) + # HAVE A LOOK AT CORE TESTS FOR TELNETLIB. + # very illustrative instead... + + # - raise MissingSocket + # - raise ConnectionRefusedError + # - test send command + # - tries connect + # - ... tries? + # - ... calls _seek_to_eof + # - ... read_until --> return value + # - ... + + +if __name__ == "__main__": + unittest.main() |