diff options
Diffstat (limited to 'src/leap/eip/tests')
| -rw-r--r-- | src/leap/eip/tests/data.py | 33 | ||||
| -rw-r--r-- | src/leap/eip/tests/test_checks.py | 6 | ||||
| -rw-r--r-- | src/leap/eip/tests/test_config.py | 155 | ||||
| -rw-r--r-- | src/leap/eip/tests/test_eipconnection.py | 31 | ||||
| -rw-r--r-- | src/leap/eip/tests/test_openvpnconnection.py | 21 | 
5 files changed, 211 insertions, 35 deletions
| diff --git a/src/leap/eip/tests/data.py b/src/leap/eip/tests/data.py index cadf720e..a7fe1853 100644 --- a/src/leap/eip/tests/data.py +++ b/src/leap/eip/tests/data.py @@ -23,26 +23,29 @@ EIP_SAMPLE_CONFIG = {          "keys/client/openvpn.pem" % PROVIDER),      "connect_on_login": True,      "block_cleartext_traffic": True, -    "primary_gateway": "turkey", -    "secondary_gateway": "france", +    "primary_gateway": "location_unknown", +    "secondary_gateway": "location_unknown2",      #"management_password": "oph7Que1othahwiech6J"  }  EIP_SAMPLE_SERVICE = {      "serial": 1, -    "version": "0.1.0", -    "capabilities": { -        "transport": ["openvpn"], -        "ports": ["80", "53"], -        "protocols": ["udp", "tcp"], -        "static_ips": True, -        "adblock": True -    }, +    "version": 1, +    "clusters": [ +        {"label": { +            "en": "Location Unknown"}, +            "name": "location_unknown"} +    ],      "gateways": [ -    {"country_code": "tr", -     "name": "turkey", -     "label": {"en":"Ankara, Turkey"}, -     "capabilities": {}, -     "hosts": ["192.0.43.10"]} +        {"capabilities": { +            "adblock": True, +            "filter_dns": True, +            "ports": ["80", "53", "443", "1194"], +            "protocols": ["udp", "tcp"], +            "transport": ["openvpn"], +            "user_ips": False}, +         "cluster": "location_unknown", +         "host": "location.example.org", +         "ip_address": "192.0.43.10"}      ]  } diff --git a/src/leap/eip/tests/test_checks.py b/src/leap/eip/tests/test_checks.py index 1d7bfc17..ab11037a 100644 --- a/src/leap/eip/tests/test_checks.py +++ b/src/leap/eip/tests/test_checks.py @@ -25,6 +25,7 @@ from leap.eip.tests import data as testdata  from leap.testing.basetest import BaseLeapTest  from leap.testing.https_server import BaseHTTPSServerTestCase  from leap.testing.https_server import where as where_cert +from leap.util.fileutil import mkdir_f  class NoLogRequestHandler: @@ -118,6 +119,7 @@ class EIPCheckTest(BaseLeapTest):          sampleconfig = copy.copy(testdata.EIP_SAMPLE_CONFIG)          sampleconfig['provider'] = None          eipcfg_path = checker.eipconfig.filename +        mkdir_f(eipcfg_path)          with open(eipcfg_path, 'w') as fp:              json.dump(sampleconfig, fp)          #with self.assertRaises(eipexceptions.EIPMissingDefaultProvider): @@ -138,6 +140,8 @@ class EIPCheckTest(BaseLeapTest):      def test_fetch_definition(self):          with patch.object(requests, "get") as mocked_get:              mocked_get.return_value.status_code = 200 +            mocked_get.return_value.headers = { +                'last-modified': "Wed Dec 12 12:12:12 GMT 2012"}              mocked_get.return_value.json = DEFAULT_PROVIDER_DEFINITION              checker = eipchecks.EIPConfigChecker(fetcher=requests)              sampleconfig = testdata.EIP_SAMPLE_CONFIG @@ -156,6 +160,8 @@ class EIPCheckTest(BaseLeapTest):      def test_fetch_eip_service_config(self):          with patch.object(requests, "get") as mocked_get:              mocked_get.return_value.status_code = 200 +            mocked_get.return_value.headers = { +                'last-modified': "Wed Dec 12 12:12:12 GMT 2012"}              mocked_get.return_value.json = testdata.EIP_SAMPLE_SERVICE              checker = eipchecks.EIPConfigChecker(fetcher=requests)              sampleconfig = testdata.EIP_SAMPLE_CONFIG diff --git a/src/leap/eip/tests/test_config.py b/src/leap/eip/tests/test_config.py index 50538240..5977ef3c 100644 --- a/src/leap/eip/tests/test_config.py +++ b/src/leap/eip/tests/test_config.py @@ -1,3 +1,4 @@ +from collections import OrderedDict  import json  import os  import platform @@ -10,11 +11,11 @@ except ImportError:  #from leap.base import constants  #from leap.eip import config as eip_config -from leap import __branding as BRANDING +#from leap import __branding as BRANDING  from leap.eip import config as eipconfig  from leap.eip.tests.data import EIP_SAMPLE_CONFIG, EIP_SAMPLE_SERVICE  from leap.testing.basetest import BaseLeapTest -from leap.util.fileutil import mkdir_p +from leap.util.fileutil import mkdir_p, mkdir_f  _system = platform.system() @@ -47,11 +48,22 @@ class EIPConfigTest(BaseLeapTest):          open(tfile, 'wb').close()          os.chmod(tfile, stat.S_IRUSR | stat.S_IWUSR | stat.S_IXUSR) -    def write_sample_eipservice(self): +    def write_sample_eipservice(self, vpnciphers=False, extra_vpnopts=None, +                                gateways=None):          conf = eipconfig.EIPServiceConfig() -        folder, f = os.path.split(conf.filename) -        if not os.path.isdir(folder): -            mkdir_p(folder) +        mkdir_f(conf.filename) +        if gateways: +            EIP_SAMPLE_SERVICE['gateways'] = gateways +        if vpnciphers: +            openvpnconfig = OrderedDict({ +                "auth": "SHA1", +                "cipher": "AES-128-CBC", +                "tls-cipher": "DHE-RSA-AES128-SHA"}) +            if extra_vpnopts: +                for k, v in extra_vpnopts.items(): +                    openvpnconfig[k] = v +            EIP_SAMPLE_SERVICE['openvpn_configuration'] = openvpnconfig +          with open(conf.filename, 'w') as fd:              fd.write(json.dumps(EIP_SAMPLE_SERVICE)) @@ -63,8 +75,17 @@ class EIPConfigTest(BaseLeapTest):          with open(conf.filename, 'w') as fd:              fd.write(json.dumps(EIP_SAMPLE_CONFIG)) -    def get_expected_openvpn_args(self): +    def get_expected_openvpn_args(self, with_openvpn_ciphers=False): +        """ +        yeah, this is almost as duplicating the +        code for building the command +        """          args = [] +        eipconf = eipconfig.EIPConfig(domain=self.provider) +        eipconf.load() +        eipsconf = eipconfig.EIPServiceConfig(domain=self.provider) +        eipsconf.load() +          username = self.get_username()          groupname = self.get_groupname() @@ -75,8 +96,10 @@ class EIPConfigTest(BaseLeapTest):          args.append('--persist-tun')          args.append('--persist-key')          args.append('--remote') +          args.append('%s' % eipconfig.get_eip_gateway( -            provider=self.provider)) +            eipconfig=eipconf, +            eipserviceconfig=eipsconf))          # XXX get port!?          args.append('1194')          # XXX get proto @@ -85,6 +108,14 @@ class EIPConfigTest(BaseLeapTest):          args.append('--remote-cert-tls')          args.append('server') +        if with_openvpn_ciphers: +            CIPHERS = [ +                "--tls-cipher", "DHE-RSA-AES128-SHA", +                "--cipher", "AES-128-CBC", +                "--auth", "SHA1"] +            for opt in CIPHERS: +                args.append(opt) +          args.append('--user')          args.append(username)          args.append('--group') @@ -130,6 +161,55 @@ class EIPConfigTest(BaseLeapTest):      # params in the function call, to disable      # some checks. +    def test_get_eip_gateway(self): +        self.write_sample_eipconfig() +        eipconf = eipconfig.EIPConfig(domain=self.provider) + +        # default eipservice +        self.write_sample_eipservice() +        eipsconf = eipconfig.EIPServiceConfig(domain=self.provider) + +        gateway = eipconfig.get_eip_gateway( +            eipconfig=eipconf, +            eipserviceconfig=eipsconf) + +        # in spec is local gateway by default +        self.assertEqual(gateway, '127.0.0.1') + +        # change eipservice +        # right now we only check that cluster == selected primary gw in +        # eip.json, and pick first matching ip +        eipconf._config.config['primary_gateway'] = "foo_provider" +        newgateways = [{"cluster": "foo_provider", +                        "ip_address": "127.0.0.99"}] +        self.write_sample_eipservice(gateways=newgateways) +        eipsconf = eipconfig.EIPServiceConfig(domain=self.provider) +        # load from disk file +        eipsconf.load() + +        gateway = eipconfig.get_eip_gateway( +            eipconfig=eipconf, +            eipserviceconfig=eipsconf) +        self.assertEqual(gateway, '127.0.0.99') + +        # change eipservice, several gateways +        # right now we only check that cluster == selected primary gw in +        # eip.json, and pick first matching ip +        eipconf._config.config['primary_gateway'] = "bar_provider" +        newgateways = [{"cluster": "foo_provider", +                        "ip_address": "127.0.0.99"}, +                       {'cluster': "bar_provider", +                        "ip_address": "127.0.0.88"}] +        self.write_sample_eipservice(gateways=newgateways) +        eipsconf = eipconfig.EIPServiceConfig(domain=self.provider) +        # load from disk file +        eipsconf.load() + +        gateway = eipconfig.get_eip_gateway( +            eipconfig=eipconf, +            eipserviceconfig=eipsconf) +        self.assertEqual(gateway, '127.0.0.88') +      def test_build_ovpn_command_empty_config(self):          self.touch_exec()          self.write_sample_eipservice() @@ -139,14 +219,63 @@ class EIPConfigTest(BaseLeapTest):          from leap.util.fileutil import which          path = os.environ['PATH']          vpnbin = which('openvpn', path=path) -        print 'path =', path -        print 'vpnbin = ', vpnbin -        command, args = eipconfig.build_ovpn_command( +        #print 'path =', path +        #print 'vpnbin = ', vpnbin +        vpncommand, vpnargs = eipconfig.build_ovpn_command( +            do_pkexec_check=False, vpnbin=vpnbin, +            socket_path="/tmp/test.socket", +            provider=self.provider) +        self.assertEqual(vpncommand, self.home + '/bin/openvpn') +        self.assertEqual(vpnargs, self.get_expected_openvpn_args()) + +    def test_build_ovpn_command_openvpnoptions(self): +        self.touch_exec() + +        from leap.eip import config as eipconfig +        from leap.util.fileutil import which +        path = os.environ['PATH'] +        vpnbin = which('openvpn', path=path) + +        self.write_sample_eipconfig() + +        # regular run, everything normal +        self.write_sample_eipservice(vpnciphers=True) +        vpncommand, vpnargs = eipconfig.build_ovpn_command( +            do_pkexec_check=False, vpnbin=vpnbin, +            socket_path="/tmp/test.socket", +            provider=self.provider) +        self.assertEqual(vpncommand, self.home + '/bin/openvpn') +        expected = self.get_expected_openvpn_args( +            with_openvpn_ciphers=True) +        self.assertEqual(vpnargs, expected) + +        # bad options -- illegal options +        self.write_sample_eipservice( +            vpnciphers=True, +            # WE ONLY ALLOW vpn options in auth, cipher, tls-cipher +            extra_vpnopts={"notallowedconfig": "badvalue"}) +        vpncommand, vpnargs = eipconfig.build_ovpn_command( +            do_pkexec_check=False, vpnbin=vpnbin, +            socket_path="/tmp/test.socket", +            provider=self.provider) +        self.assertEqual(vpncommand, self.home + '/bin/openvpn') +        expected = self.get_expected_openvpn_args( +            with_openvpn_ciphers=True) +        self.assertEqual(vpnargs, expected) + +        # bad options -- illegal chars +        self.write_sample_eipservice( +            vpnciphers=True, +            # WE ONLY ALLOW A-Z09\- +            extra_vpnopts={"cipher": "AES-128-CBC;FOOTHING"}) +        vpncommand, vpnargs = eipconfig.build_ovpn_command(              do_pkexec_check=False, vpnbin=vpnbin,              socket_path="/tmp/test.socket",              provider=self.provider) -        self.assertEqual(command, self.home + '/bin/openvpn') -        self.assertEqual(args, self.get_expected_openvpn_args()) +        self.assertEqual(vpncommand, self.home + '/bin/openvpn') +        expected = self.get_expected_openvpn_args( +            with_openvpn_ciphers=True) +        self.assertEqual(vpnargs, expected)  if __name__ == "__main__": diff --git a/src/leap/eip/tests/test_eipconnection.py b/src/leap/eip/tests/test_eipconnection.py index aefca36f..163f8d45 100644 --- a/src/leap/eip/tests/test_eipconnection.py +++ b/src/leap/eip/tests/test_eipconnection.py @@ -1,6 +1,8 @@ +import glob  import logging  import platform -import os +#import os +import shutil  logging.basicConfig()  logger = logging.getLogger(name=__name__) @@ -66,11 +68,26 @@ class EIPConductorTest(BaseLeapTest):          self.manager = Mock(name="openvpnmanager_mock")          self.con = MockedEIPConnection()          self.con.provider = self.provider + +        # XXX watch out. This sometimes is throwing the following error: +        # NoSuchProcess: process no longer exists (pid=6571) +        # because of a bad implementation of _check_if_running_instance +          self.con.run_openvpn_checks()      def tearDown(self): +        pass + +    def doCleanups(self): +        super(BaseLeapTest, self).doCleanups() +        self.cleanupSocketDir()          del self.con +    def cleanupSocketDir(self): +        ptt = ('/tmp/leap-tmp*') +        for tmpdir in glob.glob(ptt): +            shutil.rmtree(tmpdir) +      #      # tests      # @@ -81,6 +98,7 @@ class EIPConductorTest(BaseLeapTest):          """          con = self.con          self.assertEqual(con.autostart, True) +        # XXX moar!      def test_ovpn_command(self):          """ @@ -98,6 +116,7 @@ class EIPConductorTest(BaseLeapTest):          # needed to run tests. (roughly 3 secs for this only)          # We should modularize and inject Mocks on more places. +        oldcon = self.con          del(self.con)          config_checker = Mock()          self.con = MockedEIPConnection(config_checker=config_checker) @@ -107,6 +126,7 @@ class EIPConductorTest(BaseLeapTest):              skip_download=False)          # XXX test for cert_checker also +        self.con = oldcon      # connect/disconnect calls @@ -123,9 +143,14 @@ class EIPConductorTest(BaseLeapTest):                           self.con.status.CONNECTED)          # disconnect -        self.con.cleanup = Mock() +        self.con.terminate_openvpn_connection = Mock()          self.con.disconnect() -        self.con.cleanup.assert_called_once_with() +        self.con.terminate_openvpn_connection.assert_called_once_with( +            shutdown=False) +        self.con.terminate_openvpn_connection = Mock() +        self.con.disconnect(shutdown=True) +        self.con.terminate_openvpn_connection.assert_called_once_with( +            shutdown=True)          # new status should be disconnected          # XXX this should evolve and check no errors diff --git a/src/leap/eip/tests/test_openvpnconnection.py b/src/leap/eip/tests/test_openvpnconnection.py index 0f27facf..f7493567 100644 --- a/src/leap/eip/tests/test_openvpnconnection.py +++ b/src/leap/eip/tests/test_openvpnconnection.py @@ -58,16 +58,27 @@ class OpenVPNConnectionTest(BaseLeapTest):      def setUp(self):          # XXX this will have to change for win, host=localhost          host = eipconfig.get_socket_path() +        self.host = host          self.manager = MockedOpenVPNConnection(host=host)      def tearDown(self): +        pass + +    def doCleanups(self): +        super(BaseLeapTest, self).doCleanups() +        self.cleanupSocketDir() + +    def cleanupSocketDir(self):          # remove the socket folder.          # XXX only if posix. in win, host is localhost, so nothing          # has to be done. -        if self.manager.host: -            folder, fpath = os.path.split(self.manager.host) -            assert folder.startswith('/tmp/leap-tmp')  # safety check -            shutil.rmtree(folder) +        if self.host: +            folder, fpath = os.path.split(self.host) +            try: +                assert folder.startswith('/tmp/leap-tmp')  # safety check +                shutil.rmtree(folder) +            except: +                self.fail("could not remove temp file")          del self.manager @@ -108,12 +119,14 @@ class OpenVPNConnectionTest(BaseLeapTest):          self.assertEqual(self.manager.port, 7777)      def test_port_types_init(self): +        oldmanager = self.manager          self.manager = MockedOpenVPNConnection(port="42")          self.assertEqual(self.manager.port, 42)          self.manager = MockedOpenVPNConnection()          self.assertEqual(self.manager.port, "unix")          self.manager = MockedOpenVPNConnection(port="bad")          self.assertEqual(self.manager.port, None) +        self.manager = oldmanager      def test_uds_telnet_called_on_connect(self):          self.manager.connect_to_management() | 
