diff options
Diffstat (limited to 'src/leap/eip/tests')
| -rw-r--r-- | src/leap/eip/tests/__init__.py | 0 | ||||
| -rw-r--r-- | src/leap/eip/tests/data.py | 51 | ||||
| -rw-r--r-- | src/leap/eip/tests/test_checks.py | 372 | ||||
| -rw-r--r-- | src/leap/eip/tests/test_config.py | 298 | ||||
| -rw-r--r-- | src/leap/eip/tests/test_eipconnection.py | 216 | ||||
| -rw-r--r-- | src/leap/eip/tests/test_openvpnconnection.py | 161 | 
6 files changed, 0 insertions, 1098 deletions
diff --git a/src/leap/eip/tests/__init__.py b/src/leap/eip/tests/__init__.py deleted file mode 100644 index e69de29b..00000000 --- a/src/leap/eip/tests/__init__.py +++ /dev/null diff --git a/src/leap/eip/tests/data.py b/src/leap/eip/tests/data.py deleted file mode 100644 index a7fe1853..00000000 --- a/src/leap/eip/tests/data.py +++ /dev/null @@ -1,51 +0,0 @@ -from __future__ import unicode_literals -import os - -#from leap import __branding - -# sample data used in tests - -#PROVIDER = __branding.get('provider_domain') -PROVIDER = "testprovider.example.org" - -EIP_SAMPLE_CONFIG = { -    "provider": "%s" % PROVIDER, -    "transport": "openvpn", -    "openvpn_protocol": "tcp", -    "openvpn_port": 80, -    "openvpn_ca_certificate": os.path.expanduser( -        "~/.config/leap/providers/" -        "%s/" -        "keys/ca/cacert.pem" % PROVIDER), -    "openvpn_client_certificate": os.path.expanduser( -        "~/.config/leap/providers/" -        "%s/" -        "keys/client/openvpn.pem" % PROVIDER), -    "connect_on_login": True, -    "block_cleartext_traffic": True, -    "primary_gateway": "location_unknown", -    "secondary_gateway": "location_unknown2", -    #"management_password": "oph7Que1othahwiech6J" -} - -EIP_SAMPLE_SERVICE = { -    "serial": 1, -    "version": 1, -    "clusters": [ -        {"label": { -            "en": "Location Unknown"}, -            "name": "location_unknown"} -    ], -    "gateways": [ -        {"capabilities": { -            "adblock": True, -            "filter_dns": True, -            "ports": ["80", "53", "443", "1194"], -            "protocols": ["udp", "tcp"], -            "transport": ["openvpn"], -            "user_ips": False}, -         "cluster": "location_unknown", -         "host": "location.example.org", -         "ip_address": "192.0.43.10"} -    ] -} diff --git a/src/leap/eip/tests/test_checks.py b/src/leap/eip/tests/test_checks.py deleted file mode 100644 index f42a0eeb..00000000 --- a/src/leap/eip/tests/test_checks.py +++ /dev/null @@ -1,372 +0,0 @@ -from BaseHTTPServer import BaseHTTPRequestHandler -import copy -import json -try: -    import unittest2 as unittest -except ImportError: -    import unittest -import os -import time -import urlparse - -from mock import (patch, Mock) - -import requests - -from leap.base import config as baseconfig -from leap.base import pluggableconfig -from leap.base.constants import (DEFAULT_PROVIDER_DEFINITION, -                                 DEFINITION_EXPECTED_PATH) -from leap.eip import checks as eipchecks -from leap.eip import specs as eipspecs -from leap.eip import exceptions as eipexceptions -from leap.eip.tests import data as testdata -from leap.testing.basetest import BaseLeapTest -from leap.testing.https_server import BaseHTTPSServerTestCase -from leap.testing.https_server import where as where_cert -from leap.util.fileutil import mkdir_f - - -class NoLogRequestHandler: -    def log_message(self, *args): -        # don't write log msg to stderr -        pass - -    def read(self, n=None): -        return '' - - -class EIPCheckTest(BaseLeapTest): - -    __name__ = "eip_check_tests" -    provider = "testprovider.example.org" -    maxDiff = None - -    def setUp(self): -        pass - -    def tearDown(self): -        pass - -    # test methods are there, and can be called from run_all - -    def test_checker_should_implement_check_methods(self): -        checker = eipchecks.EIPConfigChecker(domain=self.provider) - -        self.assertTrue(hasattr(checker, "check_default_eipconfig"), -                        "missing meth") -        self.assertTrue(hasattr(checker, "check_is_there_default_provider"), -                        "missing meth") -        self.assertTrue(hasattr(checker, "fetch_definition"), "missing meth") -        self.assertTrue(hasattr(checker, "fetch_eip_service_config"), -                        "missing meth") -        self.assertTrue(hasattr(checker, "check_complete_eip_config"), -                        "missing meth") - -    def test_checker_should_actually_call_all_tests(self): -        checker = eipchecks.EIPConfigChecker(domain=self.provider) - -        mc = Mock() -        checker.run_all(checker=mc) -        self.assertTrue(mc.check_default_eipconfig.called, "not called") -        self.assertTrue(mc.check_is_there_default_provider.called, -                        "not called") -        self.assertTrue(mc.fetch_definition.called, -                        "not called") -        self.assertTrue(mc.fetch_eip_service_config.called, -                        "not called") -        self.assertTrue(mc.check_complete_eip_config.called, -                        "not called") - -    # test individual check methods - -    def test_check_default_eipconfig(self): -        checker = eipchecks.EIPConfigChecker(domain=self.provider) -        # no eip config (empty home) -        eipconfig_path = checker.eipconfig.filename -        self.assertFalse(os.path.isfile(eipconfig_path)) -        checker.check_default_eipconfig() -        # we've written one, so it should be there. -        self.assertTrue(os.path.isfile(eipconfig_path)) -        with open(eipconfig_path, 'rb') as fp: -            deserialized = json.load(fp) - -        # force re-evaluation of the paths -        # small workaround for evaluating home dirs correctly -        EIP_SAMPLE_CONFIG = copy.copy(testdata.EIP_SAMPLE_CONFIG) -        EIP_SAMPLE_CONFIG['openvpn_client_certificate'] = \ -            eipspecs.client_cert_path(self.provider) -        EIP_SAMPLE_CONFIG['openvpn_ca_certificate'] = \ -            eipspecs.provider_ca_path(self.provider) -        self.assertEqual(deserialized, EIP_SAMPLE_CONFIG) - -        # TODO: shold ALSO run validation methods. - -    def test_check_is_there_default_provider(self): -        checker = eipchecks.EIPConfigChecker(domain=self.provider) -        # we do dump a sample eip config, but lacking a -        # default provider entry. -        # This error will be possible catched in a different -        # place, when JSONConfig does validation of required fields. - -        # passing direct config -        with self.assertRaises(eipexceptions.EIPMissingDefaultProvider): -            checker.check_is_there_default_provider(config={}) - -        # ok. now, messing with real files... -        # blank out default_provider -        sampleconfig = copy.copy(testdata.EIP_SAMPLE_CONFIG) -        sampleconfig['provider'] = None -        eipcfg_path = checker.eipconfig.filename -        mkdir_f(eipcfg_path) -        with open(eipcfg_path, 'w') as fp: -            json.dump(sampleconfig, fp) -        #with self.assertRaises(eipexceptions.EIPMissingDefaultProvider): -        # XXX we should catch this as one of our errors, but do not -        # see how to do it quickly. -        with self.assertRaises(pluggableconfig.ValidationError): -            #import ipdb;ipdb.set_trace() -            checker.eipconfig.load(fromfile=eipcfg_path) -            checker.check_is_there_default_provider() - -        sampleconfig = testdata.EIP_SAMPLE_CONFIG -        #eipcfg_path = checker._get_default_eipconfig_path() -        with open(eipcfg_path, 'w') as fp: -            json.dump(sampleconfig, fp) -        checker.eipconfig.load() -        self.assertTrue(checker.check_is_there_default_provider()) - -    def test_fetch_definition(self): -        with patch.object(requests, "get") as mocked_get: -            mocked_get.return_value.status_code = 200 -            mocked_get.return_value.headers = { -                'last-modified': "Wed Dec 12 12:12:12 GMT 2012"} -            mocked_get.return_value.json = DEFAULT_PROVIDER_DEFINITION -            checker = eipchecks.EIPConfigChecker(fetcher=requests) -            sampleconfig = testdata.EIP_SAMPLE_CONFIG -            checker.fetch_definition(config=sampleconfig) - -        fn = os.path.join(baseconfig.get_default_provider_path(), -                          DEFINITION_EXPECTED_PATH) -        with open(fn, 'r') as fp: -            deserialized = json.load(fp) -        self.assertEqual(DEFAULT_PROVIDER_DEFINITION, deserialized) - -        # XXX TODO check for ConnectionError, HTTPError, InvalidUrl -        # (and proper EIPExceptions are raised). -        # Look at base.test_config. - -    def test_fetch_eip_service_config(self): -        with patch.object(requests, "get") as mocked_get: -            mocked_get.return_value.status_code = 200 -            mocked_get.return_value.headers = { -                'last-modified': "Wed Dec 12 12:12:12 GMT 2012"} -            mocked_get.return_value.json = testdata.EIP_SAMPLE_SERVICE -            checker = eipchecks.EIPConfigChecker(fetcher=requests) -            sampleconfig = testdata.EIP_SAMPLE_CONFIG -            checker.fetch_eip_service_config(config=sampleconfig) - -    def test_check_complete_eip_config(self): -        checker = eipchecks.EIPConfigChecker() -        with self.assertRaises(eipexceptions.EIPConfigurationError): -            sampleconfig = copy.copy(testdata.EIP_SAMPLE_CONFIG) -            sampleconfig['provider'] = None -            checker.check_complete_eip_config(config=sampleconfig) -        with self.assertRaises(eipexceptions.EIPConfigurationError): -            sampleconfig = copy.copy(testdata.EIP_SAMPLE_CONFIG) -            del sampleconfig['provider'] -            checker.check_complete_eip_config(config=sampleconfig) - -        # normal case -        sampleconfig = copy.copy(testdata.EIP_SAMPLE_CONFIG) -        checker.check_complete_eip_config(config=sampleconfig) - - -class ProviderCertCheckerTest(BaseLeapTest): - -    __name__ = "provider_cert_checker_tests" -    provider = "testprovider.example.org" - -    def setUp(self): -        pass - -    def tearDown(self): -        pass - -    # test methods are there, and can be called from run_all - -    def test_checker_should_implement_check_methods(self): -        checker = eipchecks.ProviderCertChecker() - -        # For MVS+ -        self.assertTrue(hasattr(checker, "download_ca_cert"), -                        "missing meth") -        self.assertTrue(hasattr(checker, "download_ca_signature"), -                        "missing meth") -        self.assertTrue(hasattr(checker, "get_ca_signatures"), "missing meth") -        self.assertTrue(hasattr(checker, "is_there_trust_path"), -                        "missing meth") - -        # For MVS -        self.assertTrue(hasattr(checker, "is_there_provider_ca"), -                        "missing meth") -        self.assertTrue(hasattr(checker, "is_https_working"), "missing meth") -        self.assertTrue(hasattr(checker, "check_new_cert_needed"), -                        "missing meth") - -    def test_checker_should_actually_call_all_tests(self): -        checker = eipchecks.ProviderCertChecker() - -        mc = Mock() -        checker.run_all(checker=mc) -        # XXX MVS+ -        #self.assertTrue(mc.download_ca_cert.called, "not called") -        #self.assertTrue(mc.download_ca_signature.called, "not called") -        #self.assertTrue(mc.get_ca_signatures.called, "not called") -        #self.assertTrue(mc.is_there_trust_path.called, "not called") - -        # For MVS -        self.assertTrue(mc.is_there_provider_ca.called, "not called") -        self.assertTrue(mc.is_https_working.called, -                        "not called") -        self.assertTrue(mc.check_new_cert_needed.called, -                        "not called") - -    # test individual check methods - -    @unittest.skip -    def test_is_there_provider_ca(self): -        # XXX commenting out this test. -        # With the generic client this does not make sense, -        # we should dump one there. -        # or test conductor logic. -        checker = eipchecks.ProviderCertChecker() -        self.assertTrue( -            checker.is_there_provider_ca()) - - -class ProviderCertCheckerHTTPSTests(BaseHTTPSServerTestCase, BaseLeapTest): -    provider = "testprovider.example.org" - -    class request_handler(NoLogRequestHandler, BaseHTTPRequestHandler): -        responses = { -            '/': ['OK', ''], -            '/client.cert': [ -                # XXX get sample cert -                '-----BEGIN CERTIFICATE-----', -                '-----END CERTIFICATE-----'], -            '/badclient.cert': [ -                'BADCERT']} - -        def do_GET(self): -            path = urlparse.urlparse(self.path) -            message = '\n'.join(self.responses.get( -                path.path, None)) -            self.send_response(200) -            self.end_headers() -            self.wfile.write(message) - -    def test_is_https_working(self): -        fetcher = requests -        uri = "https://%s/" % (self.get_server()) -        # bare requests call. this should just pass (if there is -        # an https service there). -        fetcher.get(uri, verify=False) -        checker = eipchecks.ProviderCertChecker(fetcher=fetcher) -        self.assertTrue(checker.is_https_working(uri=uri, verify=False)) - -        # for local debugs, when in doubt -        #self.assertTrue(checker.is_https_working(uri="https://github.com", -                        #verify=True)) - -        # for the two checks below, I know they fail because no ca -        # cert is passed to them, and I know that's the error that -        # requests return with our implementation. -        # We're receiving this because our -        # server is dying prematurely when the handshake is interrupted on the -        # client side. -        # Since we have access to the server, we could check that -        # the error raised has been: -        # SSL23_READ_BYTES: alert bad certificate -        with self.assertRaises(requests.exceptions.SSLError) as exc: -            fetcher.get(uri, verify=True) -            self.assertTrue( -                "SSL23_GET_SERVER_HELLO:unknown protocol" in exc.message) - -        # XXX FIXME! Uncomment after #638 is done -        #with self.assertRaises(eipexceptions.EIPBadCertError) as exc: -            #checker.is_https_working(uri=uri, verify=True) -            #self.assertTrue( -                #"cert verification failed" in exc.message) - -        # get cacert from testing.https_server -        cacert = where_cert('cacert.pem') -        fetcher.get(uri, verify=cacert) -        self.assertTrue(checker.is_https_working(uri=uri, verify=cacert)) - -        # same, but get cacert from leap.custom -        # XXX TODO! - -    @unittest.skip -    def test_download_new_client_cert(self): -        # FIXME -        # Magick srp decorator broken right now... -        # Have to mock the decorator and inject something that -        # can bypass the authentication - -        uri = "https://%s/client.cert" % (self.get_server()) -        cacert = where_cert('cacert.pem') -        checker = eipchecks.ProviderCertChecker(domain=self.provider) -        credentials = "testuser", "testpassword" -        self.assertTrue(checker.download_new_client_cert( -                        credentials=credentials, uri=uri, verify=cacert)) - -        # now download a malformed cert -        uri = "https://%s/badclient.cert" % (self.get_server()) -        cacert = where_cert('cacert.pem') -        checker = eipchecks.ProviderCertChecker() -        with self.assertRaises(ValueError): -            self.assertTrue(checker.download_new_client_cert( -                            credentials=credentials, uri=uri, verify=cacert)) - -        # did we write cert to its path? -        clientcertfile = eipspecs.client_cert_path() -        self.assertTrue(os.path.isfile(clientcertfile)) -        certfile = eipspecs.client_cert_path() -        with open(certfile, 'r') as cf: -            certcontent = cf.read() -        self.assertEqual(certcontent, -                         '\n'.join( -                             self.request_handler.responses['/client.cert'])) -        os.remove(clientcertfile) - -    def test_is_cert_valid(self): -        checker = eipchecks.ProviderCertChecker() -        # TODO: better exception catching -        # should raise eipexceptions.BadClientCertificate, and give reasons -        # on msg. -        with self.assertRaises(Exception) as exc: -            self.assertFalse(checker.is_cert_valid()) -            exc.message = "missing cert" - -    def test_bad_validity_certs(self): -        checker = eipchecks.ProviderCertChecker() -        certfile = where_cert('leaptestscert.pem') -        self.assertFalse(checker.is_cert_not_expired( -            certfile=certfile, -            now=lambda: time.mktime((2038, 1, 1, 1, 1, 1, 1, 1, 1)))) -        self.assertFalse(checker.is_cert_not_expired( -            certfile=certfile, -            now=lambda: time.mktime((1970, 1, 1, 1, 1, 1, 1, 1, 1)))) - -    def test_check_new_cert_needed(self): -        # check: missing cert -        checker = eipchecks.ProviderCertChecker(domain=self.provider) -        self.assertTrue(checker.check_new_cert_needed(skip_download=True)) -        # TODO check: malformed cert -        # TODO check: expired cert -        # TODO check: pass test server uri instead of skip - - -if __name__ == "__main__": -    unittest.main() diff --git a/src/leap/eip/tests/test_config.py b/src/leap/eip/tests/test_config.py deleted file mode 100644 index 72ab3c8e..00000000 --- a/src/leap/eip/tests/test_config.py +++ /dev/null @@ -1,298 +0,0 @@ -from collections import OrderedDict -import json -import os -import platform -import stat - -try: -    import unittest2 as unittest -except ImportError: -    import unittest - -#from leap.base import constants -#from leap.eip import config as eip_config -#from leap import __branding as BRANDING -from leap.eip import config as eipconfig -from leap.eip.tests.data import EIP_SAMPLE_CONFIG, EIP_SAMPLE_SERVICE -from leap.testing.basetest import BaseLeapTest -from leap.util.fileutil import mkdir_p, mkdir_f - -_system = platform.system() - -#PROVIDER = BRANDING.get('provider_domain') -#PROVIDER_SHORTNAME = BRANDING.get('short_name') - - -class EIPConfigTest(BaseLeapTest): - -    __name__ = "eip_config_tests" -    provider = "testprovider.example.org" - -    maxDiff = None - -    def setUp(self): -        pass - -    def tearDown(self): -        pass - -    # -    # helpers -    # - -    def touch_exec(self): -        path = os.path.join( -            self.tempdir, 'bin') -        mkdir_p(path) -        tfile = os.path.join( -            path, -            'openvpn') -        open(tfile, 'wb').close() -        os.chmod(tfile, stat.S_IRUSR | stat.S_IWUSR | stat.S_IXUSR) - -    def write_sample_eipservice(self, vpnciphers=False, extra_vpnopts=None, -                                gateways=None): -        conf = eipconfig.EIPServiceConfig() -        mkdir_f(conf.filename) -        if gateways: -            EIP_SAMPLE_SERVICE['gateways'] = gateways -        if vpnciphers: -            openvpnconfig = OrderedDict({ -                "auth": "SHA1", -                "cipher": "AES-128-CBC", -                "tls-cipher": "DHE-RSA-AES128-SHA"}) -            if extra_vpnopts: -                for k, v in extra_vpnopts.items(): -                    openvpnconfig[k] = v -            EIP_SAMPLE_SERVICE['openvpn_configuration'] = openvpnconfig - -        with open(conf.filename, 'w') as fd: -            fd.write(json.dumps(EIP_SAMPLE_SERVICE)) - -    def write_sample_eipconfig(self): -        conf = eipconfig.EIPConfig() -        folder, f = os.path.split(conf.filename) -        if not os.path.isdir(folder): -            mkdir_p(folder) -        with open(conf.filename, 'w') as fd: -            fd.write(json.dumps(EIP_SAMPLE_CONFIG)) - -    def get_expected_openvpn_args(self, with_openvpn_ciphers=False): -        """ -        yeah, this is almost as duplicating the -        code for building the command -        """ -        args = [] -        eipconf = eipconfig.EIPConfig(domain=self.provider) -        eipconf.load() -        eipsconf = eipconfig.EIPServiceConfig(domain=self.provider) -        eipsconf.load() - -        username = self.get_username() -        groupname = self.get_groupname() - -        args.append('--client') -        args.append('--dev') -        #does this have to be tap for win?? -        args.append('tun') -        args.append('--persist-tun') -        args.append('--persist-key') -        args.append('--remote') - -        args.append('%s' % eipconfig.get_eip_gateway( -            eipconfig=eipconf, -            eipserviceconfig=eipsconf)) -        # XXX get port!? -        args.append('1194') -        # XXX get proto -        args.append('udp') -        args.append('--tls-client') -        args.append('--remote-cert-tls') -        args.append('server') - -        if with_openvpn_ciphers: -            CIPHERS = [ -                "--tls-cipher", "DHE-RSA-AES128-SHA", -                "--cipher", "AES-128-CBC", -                "--auth", "SHA1"] -            for opt in CIPHERS: -                args.append(opt) - -        args.append('--user') -        args.append(username) -        args.append('--group') -        args.append(groupname) -        args.append('--management-client-user') -        args.append(username) -        args.append('--management-signal') - -        args.append('--management') -        #XXX hey! -        #get platform switches here! -        args.append('/tmp/test.socket') -        args.append('unix') - -        args.append('--script-security') -        args.append('2') - -        if _system == "Linux": -            UPDOWN_SCRIPT = "/etc/leap/resolv-update" -            if os.path.isfile(UPDOWN_SCRIPT): -                args.append('--up') -                args.append('/etc/leap/resolv-update') -                args.append('--down') -                args.append('/etc/leap/resolv-update') -                args.append('--plugin') -                args.append('/usr/lib/openvpn/openvpn-down-root.so') -                args.append("'script_type=down /etc/leap/resolv-update'") - -        # certs -        # XXX get values from specs? -        args.append('--cert') -        args.append(os.path.join( -            self.home, -            '.config', 'leap', 'providers', -            '%s' % self.provider, -            'keys', 'client', -            'openvpn.pem')) -        args.append('--key') -        args.append(os.path.join( -            self.home, -            '.config', 'leap', 'providers', -            '%s' % self.provider, -            'keys', 'client', -            'openvpn.pem')) -        args.append('--ca') -        args.append(os.path.join( -            self.home, -            '.config', 'leap', 'providers', -            '%s' % self.provider, -            'keys', 'ca', -            'cacert.pem')) -        return args - -    # build command string -    # these tests are going to have to check -    # many combinations. we should inject some -    # params in the function call, to disable -    # some checks. - -    def test_get_eip_gateway(self): -        self.write_sample_eipconfig() -        eipconf = eipconfig.EIPConfig(domain=self.provider) - -        # default eipservice -        self.write_sample_eipservice() -        eipsconf = eipconfig.EIPServiceConfig(domain=self.provider) - -        gateway = eipconfig.get_eip_gateway( -            eipconfig=eipconf, -            eipserviceconfig=eipsconf) - -        # in spec is local gateway by default -        self.assertEqual(gateway, '127.0.0.1') - -        # change eipservice -        # right now we only check that cluster == selected primary gw in -        # eip.json, and pick first matching ip -        eipconf._config.config['primary_gateway'] = "foo_provider" -        newgateways = [{"cluster": "foo_provider", -                        "ip_address": "127.0.0.99"}] -        self.write_sample_eipservice(gateways=newgateways) -        eipsconf = eipconfig.EIPServiceConfig(domain=self.provider) -        # load from disk file -        eipsconf.load() - -        gateway = eipconfig.get_eip_gateway( -            eipconfig=eipconf, -            eipserviceconfig=eipsconf) -        self.assertEqual(gateway, '127.0.0.99') - -        # change eipservice, several gateways -        # right now we only check that cluster == selected primary gw in -        # eip.json, and pick first matching ip -        eipconf._config.config['primary_gateway'] = "bar_provider" -        newgateways = [{"cluster": "foo_provider", -                        "ip_address": "127.0.0.99"}, -                       {'cluster': "bar_provider", -                        "ip_address": "127.0.0.88"}] -        self.write_sample_eipservice(gateways=newgateways) -        eipsconf = eipconfig.EIPServiceConfig(domain=self.provider) -        # load from disk file -        eipsconf.load() - -        gateway = eipconfig.get_eip_gateway( -            eipconfig=eipconf, -            eipserviceconfig=eipsconf) -        self.assertEqual(gateway, '127.0.0.88') - -    def test_build_ovpn_command_empty_config(self): -        self.touch_exec() -        self.write_sample_eipservice() -        self.write_sample_eipconfig() - -        from leap.eip import config as eipconfig -        from leap.util.fileutil import which -        path = os.environ['PATH'] -        vpnbin = which('openvpn', path=path) -        #print 'path =', path -        #print 'vpnbin = ', vpnbin -        vpncommand, vpnargs = eipconfig.build_ovpn_command( -            do_pkexec_check=False, vpnbin=vpnbin, -            socket_path="/tmp/test.socket", -            provider=self.provider) -        self.assertEqual(vpncommand, self.home + '/bin/openvpn') -        self.assertEqual(vpnargs, self.get_expected_openvpn_args()) - -    def test_build_ovpn_command_openvpnoptions(self): -        self.touch_exec() - -        from leap.eip import config as eipconfig -        from leap.util.fileutil import which -        path = os.environ['PATH'] -        vpnbin = which('openvpn', path=path) - -        self.write_sample_eipconfig() - -        # regular run, everything normal -        self.write_sample_eipservice(vpnciphers=True) -        vpncommand, vpnargs = eipconfig.build_ovpn_command( -            do_pkexec_check=False, vpnbin=vpnbin, -            socket_path="/tmp/test.socket", -            provider=self.provider) -        self.assertEqual(vpncommand, self.home + '/bin/openvpn') -        expected = self.get_expected_openvpn_args( -            with_openvpn_ciphers=True) -        self.assertEqual(vpnargs, expected) - -        # bad options -- illegal options -        self.write_sample_eipservice( -            vpnciphers=True, -            # WE ONLY ALLOW vpn options in auth, cipher, tls-cipher -            extra_vpnopts={"notallowedconfig": "badvalue"}) -        vpncommand, vpnargs = eipconfig.build_ovpn_command( -            do_pkexec_check=False, vpnbin=vpnbin, -            socket_path="/tmp/test.socket", -            provider=self.provider) -        self.assertEqual(vpncommand, self.home + '/bin/openvpn') -        expected = self.get_expected_openvpn_args( -            with_openvpn_ciphers=True) -        self.assertEqual(vpnargs, expected) - -        # bad options -- illegal chars -        self.write_sample_eipservice( -            vpnciphers=True, -            # WE ONLY ALLOW A-Z09\- -            extra_vpnopts={"cipher": "AES-128-CBC;FOOTHING"}) -        vpncommand, vpnargs = eipconfig.build_ovpn_command( -            do_pkexec_check=False, vpnbin=vpnbin, -            socket_path="/tmp/test.socket", -            provider=self.provider) -        self.assertEqual(vpncommand, self.home + '/bin/openvpn') -        expected = self.get_expected_openvpn_args( -            with_openvpn_ciphers=True) -        self.assertEqual(vpnargs, expected) - - -if __name__ == "__main__": -    unittest.main() diff --git a/src/leap/eip/tests/test_eipconnection.py b/src/leap/eip/tests/test_eipconnection.py deleted file mode 100644 index 163f8d45..00000000 --- a/src/leap/eip/tests/test_eipconnection.py +++ /dev/null @@ -1,216 +0,0 @@ -import glob -import logging -import platform -#import os -import shutil - -logging.basicConfig() -logger = logging.getLogger(name=__name__) - -try: -    import unittest2 as unittest -except ImportError: -    import unittest - -from mock import Mock, patch  # MagicMock - -from leap.eip.eipconnection import EIPConnection -from leap.eip.exceptions import ConnectionRefusedError -from leap.eip import specs as eipspecs -from leap.testing.basetest import BaseLeapTest - -_system = platform.system() - -PROVIDER = "testprovider.example.org" - - -class NotImplementedError(Exception): -    pass - - -@patch('OpenVPNConnection._get_or_create_config') -@patch('OpenVPNConnection._set_ovpn_command') -class MockedEIPConnection(EIPConnection): - -    def _set_ovpn_command(self): -        self.command = "mock_command" -        self.args = [1, 2, 3] - - -class EIPConductorTest(BaseLeapTest): - -    __name__ = "eip_conductor_tests" -    provider = PROVIDER - -    def setUp(self): -        # XXX there's a conceptual/design -        # mistake here. -        # If we're testing just attrs after init, -        # init shold not be doing so much side effects. - -        # for instance: -        # We have to TOUCH a keys file because -        # we're triggerig the key checks FROM -        # the constructor. me not like that, -        # key checker should better be called explicitelly. - -        # XXX change to keys_checker invocation -        # (see config_checker) - -        keyfiles = (eipspecs.provider_ca_path(domain=self.provider), -                    eipspecs.client_cert_path(domain=self.provider)) -        for filepath in keyfiles: -            self.touch(filepath) -            self.chmod600(filepath) - -        # we init the manager with only -        # some methods mocked -        self.manager = Mock(name="openvpnmanager_mock") -        self.con = MockedEIPConnection() -        self.con.provider = self.provider - -        # XXX watch out. This sometimes is throwing the following error: -        # NoSuchProcess: process no longer exists (pid=6571) -        # because of a bad implementation of _check_if_running_instance - -        self.con.run_openvpn_checks() - -    def tearDown(self): -        pass - -    def doCleanups(self): -        super(BaseLeapTest, self).doCleanups() -        self.cleanupSocketDir() -        del self.con - -    def cleanupSocketDir(self): -        ptt = ('/tmp/leap-tmp*') -        for tmpdir in glob.glob(ptt): -            shutil.rmtree(tmpdir) - -    # -    # tests -    # - -    def test_vpnconnection_defaults(self): -        """ -        default attrs as expected -        """ -        con = self.con -        self.assertEqual(con.autostart, True) -        # XXX moar! - -    def test_ovpn_command(self): -        """ -        set_ovpn_command called -        """ -        self.assertEqual(self.con.command, -                         "mock_command") -        self.assertEqual(self.con.args, -                         [1, 2, 3]) - -    # config checks - -    def test_config_checked_called(self): -        # XXX this single test is taking half of the time -        # needed to run tests. (roughly 3 secs for this only) -        # We should modularize and inject Mocks on more places. - -        oldcon = self.con -        del(self.con) -        config_checker = Mock() -        self.con = MockedEIPConnection(config_checker=config_checker) -        self.assertTrue(config_checker.called) -        self.con.run_checks() -        self.con.config_checker.run_all.assert_called_with( -            skip_download=False) - -        # XXX test for cert_checker also -        self.con = oldcon - -    # connect/disconnect calls - -    def test_disconnect(self): -        """ -        disconnect method calls private and changes status -        """ -        self.con._disconnect = Mock( -            name="_disconnect") - -        # first we set status to connected -        self.con.status.set_current(self.con.status.CONNECTED) -        self.assertEqual(self.con.status.current, -                         self.con.status.CONNECTED) - -        # disconnect -        self.con.terminate_openvpn_connection = Mock() -        self.con.disconnect() -        self.con.terminate_openvpn_connection.assert_called_once_with( -            shutdown=False) -        self.con.terminate_openvpn_connection = Mock() -        self.con.disconnect(shutdown=True) -        self.con.terminate_openvpn_connection.assert_called_once_with( -            shutdown=True) - -        # new status should be disconnected -        # XXX this should evolve and check no errors -        # during disconnection -        self.assertEqual(self.con.status.current, -                         self.con.status.DISCONNECTED) - -    def test_connect(self): -        """ -        connect calls _launch_openvpn private -        """ -        self.con._launch_openvpn = Mock() -        self.con.connect() -        self.con._launch_openvpn.assert_called_once_with() - -    # XXX tests breaking here ... - -    def test_good_poll_connection_state(self): -        """ -        """ -        #@patch -- -        # self.manager.get_connection_state - -        #XXX review this set of poll_state tests -        #they SHOULD NOT NEED TO MOCK ANYTHING IN THE -        #lower layers!! -- status, vpn_manager.. -        #right now we're testing implementation, not -        #behavior!!! -        good_state = ["1345466946", "unknown_state", "ok", -                      "192.168.1.1", "192.168.1.100"] -        self.con.get_connection_state = Mock(return_value=good_state) -        self.con.status.set_vpn_state = Mock() - -        state = self.con.poll_connection_state() -        good_state[1] = "disconnected" -        final_state = tuple(good_state) -        self.con.status.set_vpn_state.assert_called_with("unknown_state") -        self.assertEqual(state, final_state) - -    # TODO between "good" and "bad" (exception raised) cases, -    # we can still test for malformed states and see that only good -    # states do have a change (and from only the expected transition -    # states). - -    def test_bad_poll_connection_state(self): -        """ -        get connection state raises ConnectionRefusedError -        state is None -        """ -        self.con.get_connection_state = Mock( -            side_effect=ConnectionRefusedError('foo!')) -        state = self.con.poll_connection_state() -        self.assertEqual(state, None) - - -    # XXX more things to test: -    # - called config routines during initz. -    # - raising proper exceptions with no config -    # - called proper checks on config / permissions - - -if __name__ == "__main__": -    unittest.main() diff --git a/src/leap/eip/tests/test_openvpnconnection.py b/src/leap/eip/tests/test_openvpnconnection.py deleted file mode 100644 index 95bfb2f0..00000000 --- a/src/leap/eip/tests/test_openvpnconnection.py +++ /dev/null @@ -1,161 +0,0 @@ -import logging -import os -import platform -import psutil -import shutil -#import socket - -logging.basicConfig() -logger = logging.getLogger(name=__name__) - -try: -    import unittest2 as unittest -except ImportError: -    import unittest - -from mock import Mock, patch  # MagicMock - -from leap.eip import config as eipconfig -from leap.eip import openvpnconnection -from leap.eip import exceptions as eipexceptions -from leap.eip.udstelnet import UDSTelnet -from leap.testing.basetest import BaseLeapTest - -_system = platform.system() - - -class NotImplementedError(Exception): -    pass - - -mock_UDSTelnet = Mock(spec=UDSTelnet) -# XXX cautious!!! -# this might be fragile right now (counting a global -# reference of calls I think. -# investigate this other form instead: -# http://www.voidspace.org.uk/python/mock/patch.html#start-and-stop - -# XXX redo after merge-refactor - - -@patch('openvpnconnection.OpenVPNConnection.connect_to_management') -class MockedOpenVPNConnection(openvpnconnection.OpenVPNConnection): -    def __init__(self, *args, **kwargs): -        self.mock_UDSTelnet = Mock() -        super(MockedOpenVPNConnection, self).__init__( -            *args, **kwargs) -        self.tn = self.mock_UDSTelnet(self.host, self.port) - -    def connect_to_management(self): -        #print 'patched connect' -        self.tn = mock_UDSTelnet(self.host, port=self.port) - - -class OpenVPNConnectionTest(BaseLeapTest): - -    __name__ = "vpnconnection_tests" - -    def setUp(self): -        # XXX this will have to change for win, host=localhost -        host = eipconfig.get_socket_path() -        self.host = host -        self.manager = MockedOpenVPNConnection(host=host) - -    def tearDown(self): -        pass - -    def doCleanups(self): -        super(BaseLeapTest, self).doCleanups() -        self.cleanupSocketDir() - -    def cleanupSocketDir(self): -        # remove the socket folder. -        # XXX only if posix. in win, host is localhost, so nothing -        # has to be done. -        if self.host: -            folder, fpath = os.path.split(self.host) -            try: -                assert folder.startswith('/tmp/leap-tmp')  # safety check -                shutil.rmtree(folder) -            except: -                self.fail("could not remove temp file") - -        del self.manager - -    # -    # tests -    # - -    def test_detect_vpn(self): -        # XXX review, not sure if captured all the logic -        # while fixing. kali. -        openvpn_connection = openvpnconnection.OpenVPNConnection() - -        with patch.object(psutil, "process_iter") as mocked_psutil: -            mocked_process = Mock() -            mocked_process.name = "openvpn" -            mocked_process.cmdline = ["openvpn", "-foo", "-bar", "-gaaz"] -            mocked_psutil.return_value = [mocked_process] -            with self.assertRaises(eipexceptions.OpenVPNAlreadyRunning): -                openvpn_connection._check_if_running_instance() - -        openvpn_connection._check_if_running_instance() - -    @unittest.skipIf(_system == "Windows", "lin/mac only") -    def test_lin_mac_default_init(self): -        """ -        check default host for management iface -        """ -        self.assertTrue(self.manager.host.startswith('/tmp/leap-tmp')) -        self.assertEqual(self.manager.port, 'unix') - -    @unittest.skipUnless(_system == "Windows", "win only") -    def test_win_default_init(self): -        """ -        check default host for management iface -        """ -        # XXX should we make the platform specific switch -        # here or in the vpn command string building? -        self.assertEqual(self.manager.host, 'localhost') -        self.assertEqual(self.manager.port, 7777) - -    def test_port_types_init(self): -        oldmanager = self.manager -        self.manager = MockedOpenVPNConnection(port="42") -        self.assertEqual(self.manager.port, 42) -        self.manager = MockedOpenVPNConnection() -        self.assertEqual(self.manager.port, "unix") -        self.manager = MockedOpenVPNConnection(port="bad") -        self.assertEqual(self.manager.port, None) -        self.manager = oldmanager - -    def test_uds_telnet_called_on_connect(self): -        self.manager.connect_to_management() -        mock_UDSTelnet.assert_called_with( -            self.manager.host, -            port=self.manager.port) - -    @unittest.skip -    def test_connect(self): -        raise NotImplementedError -        # XXX calls close -        # calls UDSTelnet mock. - -    # XXX -    # tests to write: -    # UDSTelnetTest (for real?) -    # HAVE A LOOK AT CORE TESTS FOR TELNETLIB. -    # very illustrative instead... - -    # - raise MissingSocket -    # - raise ConnectionRefusedError -    # - test send command -    #   - tries connect -    #   - ... tries? -    #   - ... calls _seek_to_eof -    #   - ... read_until --> return value -    #   - ... - - -if __name__ == "__main__": -    unittest.main()  | 
