diff options
author | kali <kali@leap.se> | 2013-02-15 09:31:51 +0900 |
---|---|---|
committer | kali <kali@leap.se> | 2013-02-15 09:31:51 +0900 |
commit | 9cea9c8a34343f8792d65b96f93ae22bd8685878 (patch) | |
tree | 9f512367b1d47ced5614702a00f3ff0a8fe746d7 /src/leap/base/tests | |
parent | 7159734ec6c0b76fc7f3737134cd22fdaaaa7d58 (diff) | |
parent | 1032e07a50c8bb265ff9bd31b3bb00e83ddb451e (diff) |
Merge branch 'release/v0.2.0'
Conflicts:
README.txt
Diffstat (limited to 'src/leap/base/tests')
-rw-r--r-- | src/leap/base/tests/__init__.py | 0 | ||||
-rw-r--r-- | src/leap/base/tests/test_auth.py | 58 | ||||
-rw-r--r-- | src/leap/base/tests/test_checks.py | 177 | ||||
-rw-r--r-- | src/leap/base/tests/test_config.py | 247 | ||||
-rw-r--r-- | src/leap/base/tests/test_providers.py | 148 | ||||
-rw-r--r-- | src/leap/base/tests/test_validation.py | 93 |
6 files changed, 723 insertions, 0 deletions
diff --git a/src/leap/base/tests/__init__.py b/src/leap/base/tests/__init__.py new file mode 100644 index 00000000..e69de29b --- /dev/null +++ b/src/leap/base/tests/__init__.py diff --git a/src/leap/base/tests/test_auth.py b/src/leap/base/tests/test_auth.py new file mode 100644 index 00000000..b3009a9b --- /dev/null +++ b/src/leap/base/tests/test_auth.py @@ -0,0 +1,58 @@ +from BaseHTTPServer import BaseHTTPRequestHandler +import urlparse +try: + import unittest2 as unittest +except ImportError: + import unittest + +import requests +#from mock import Mock + +from leap.base import auth +#from leap.base import exceptions +from leap.eip.tests.test_checks import NoLogRequestHandler +from leap.testing.basetest import BaseLeapTest +from leap.testing.https_server import BaseHTTPSServerTestCase + + +class LeapSRPRegisterTests(BaseHTTPSServerTestCase, BaseLeapTest): + __name__ = "leap_srp_register_test" + provider = "testprovider.example.org" + + class request_handler(NoLogRequestHandler, BaseHTTPRequestHandler): + responses = { + '/': ['OK', '']} + + def do_GET(self): + path = urlparse.urlparse(self.path) + message = '\n'.join(self.responses.get( + path.path, None)) + self.send_response(200) + self.end_headers() + self.wfile.write(message) + + def setUp(self): + pass + + def tearDown(self): + pass + + def test_srp_auth_should_implement_check_methods(self): + SERVER = "https://localhost:8443" + srp_auth = auth.LeapSRPRegister(provider=SERVER, verify=False) + + self.assertTrue(hasattr(srp_auth, "init_session"), + "missing meth") + self.assertTrue(hasattr(srp_auth, "get_registration_uri"), + "missing meth") + self.assertTrue(hasattr(srp_auth, "register_user"), + "missing meth") + + def test_srp_auth_basic_functionality(self): + SERVER = "https://localhost:8443" + srp_auth = auth.LeapSRPRegister(provider=SERVER, verify=False) + + self.assertIsInstance(srp_auth.session, requests.sessions.Session) + self.assertEqual( + srp_auth.get_registration_uri(), + "https://localhost:8443/1/users") diff --git a/src/leap/base/tests/test_checks.py b/src/leap/base/tests/test_checks.py new file mode 100644 index 00000000..8126755b --- /dev/null +++ b/src/leap/base/tests/test_checks.py @@ -0,0 +1,177 @@ +try: + import unittest2 as unittest +except ImportError: + import unittest +import os +import sh + +from mock import (patch, Mock) +from StringIO import StringIO + +from leap.base import checks +from leap.base import exceptions +from leap.testing.basetest import BaseLeapTest + +_uid = os.getuid() + + +class LeapNetworkCheckTest(BaseLeapTest): + __name__ = "leap_network_check_tests" + + def setUp(self): + os.environ['PATH'] += ':/bin' + pass + + def tearDown(self): + pass + + def test_checker_should_implement_check_methods(self): + checker = checks.LeapNetworkChecker() + + self.assertTrue(hasattr(checker, "check_internet_connection"), + "missing meth") + self.assertTrue(hasattr(checker, "check_tunnel_default_interface"), + "missing meth") + self.assertTrue(hasattr(checker, "is_internet_up"), + "missing meth") + self.assertTrue(hasattr(checker, "ping_gateway"), + "missing meth") + self.assertTrue(hasattr(checker, "parse_log_and_react"), + "missing meth") + + def test_checker_should_actually_call_all_tests(self): + checker = checks.LeapNetworkChecker() + mc = Mock() + checker.run_all(checker=mc) + self.assertTrue(mc.check_internet_connection.called, "not called") + self.assertTrue(mc.check_tunnel_default_interface.called, "not called") + self.assertTrue(mc.is_internet_up.called, "not called") + self.assertTrue(mc.parse_log_and_react.called, "not called") + + # ping gateway only called if we pass provider_gw + checker = checks.LeapNetworkChecker(provider_gw="0.0.0.0") + mc = Mock() + checker.run_all(checker=mc) + self.assertTrue(mc.check_internet_connection.called, "not called") + self.assertTrue(mc.check_tunnel_default_interface.called, "not called") + self.assertTrue(mc.ping_gateway.called, "not called") + self.assertTrue(mc.is_internet_up.called, "not called") + self.assertTrue(mc.parse_log_and_react.called, "not called") + + def test_get_default_interface_no_interface(self): + checker = checks.LeapNetworkChecker() + with patch('leap.base.checks.open', create=True) as mock_open: + with self.assertRaises(exceptions.NoDefaultInterfaceFoundError): + mock_open.return_value = StringIO( + "Iface\tDestination Gateway\t" + "Flags\tRefCntd\tUse\tMetric\t" + "Mask\tMTU\tWindow\tIRTT") + checker.get_default_interface_gateway() + + def test_check_tunnel_default_interface(self): + checker = checks.LeapNetworkChecker() + with patch('leap.base.checks.open', create=True) as mock_open: + with self.assertRaises(exceptions.TunnelNotDefaultRouteError): + mock_open.return_value = StringIO( + "Iface\tDestination Gateway\t" + "Flags\tRefCntd\tUse\tMetric\t" + "Mask\tMTU\tWindow\tIRTT\n" + "wlan0\t00000000\t0102A8C0\t" + "0003\t0\t0\t0\t00000000\t0\t0\t0") + checker.check_tunnel_default_interface() + + with patch('leap.base.checks.open', create=True) as mock_open: + mock_open.return_value = StringIO( + "Iface\tDestination Gateway\t" + "Flags\tRefCntd\tUse\tMetric\t" + "Mask\tMTU\tWindow\tIRTT\n" + "tun0\t00000000\t01002A0A\t0003\t0\t0\t0\t00000080\t0\t0\t0") + checker.check_tunnel_default_interface() + + def test_ping_gateway_fail(self): + checker = checks.LeapNetworkChecker() + with patch.object(sh, "ping") as mocked_ping: + with self.assertRaises(exceptions.NoConnectionToGateway): + mocked_ping.return_value = Mock + mocked_ping.return_value.stdout = "11% packet loss" + checker.ping_gateway("4.2.2.2") + + def test_ping_gateway(self): + checker = checks.LeapNetworkChecker() + with patch.object(sh, "ping") as mocked_ping: + mocked_ping.return_value = Mock + mocked_ping.return_value.stdout = """ +PING 4.2.2.2 (4.2.2.2) 56(84) bytes of data. +64 bytes from 4.2.2.2: icmp_req=1 ttl=54 time=33.8 ms +64 bytes from 4.2.2.2: icmp_req=2 ttl=54 time=30.6 ms +64 bytes from 4.2.2.2: icmp_req=3 ttl=54 time=31.4 ms +64 bytes from 4.2.2.2: icmp_req=4 ttl=54 time=36.1 ms +64 bytes from 4.2.2.2: icmp_req=5 ttl=54 time=30.8 ms +64 bytes from 4.2.2.2: icmp_req=6 ttl=54 time=30.4 ms +64 bytes from 4.2.2.2: icmp_req=7 ttl=54 time=30.7 ms +64 bytes from 4.2.2.2: icmp_req=8 ttl=54 time=32.7 ms +64 bytes from 4.2.2.2: icmp_req=9 ttl=54 time=31.4 ms +64 bytes from 4.2.2.2: icmp_req=10 ttl=54 time=33.3 ms + +--- 4.2.2.2 ping statistics --- +10 packets transmitted, 10 received, 0% packet loss, time 9016ms +rtt min/avg/max/mdev = 30.497/32.172/36.161/1.755 ms""" + checker.ping_gateway("4.2.2.2") + + def test_check_internet_connection_failures(self): + checker = checks.LeapNetworkChecker() + TimeoutError = get_ping_timeout_error() + with patch.object(sh, "ping") as mocked_ping: + mocked_ping.side_effect = TimeoutError + with self.assertRaises(exceptions.NoInternetConnection): + with patch.object(checker, "ping_gateway") as mock_gateway: + mock_gateway.side_effect = exceptions.NoConnectionToGateway + checker.check_internet_connection() + + with patch.object(sh, "ping") as mocked_ping: + mocked_ping.side_effect = TimeoutError + with self.assertRaises(exceptions.NoInternetConnection): + with patch.object(checker, "ping_gateway") as mock_gateway: + mock_gateway.return_value = True + checker.check_internet_connection() + + def test_parse_log_and_react(self): + checker = checks.LeapNetworkChecker() + to_call = Mock() + log = [("leap.openvpn - INFO - Mon Nov 19 13:36:24 2012 " + "read UDPv4 [ECONNREFUSED]: Connection refused (code=111)")] + err_matrix = [(checks.EVENT_CONNECT_REFUSED, (to_call, ))] + checker.parse_log_and_react(log, err_matrix) + self.assertTrue(to_call.called) + + log = [("2012-11-19 13:36:26,177 - leap.openvpn - INFO - " + "Mon Nov 19 13:36:24 2012 ERROR: Linux route delete command " + "failed: external program exited"), + ("2012-11-19 13:36:26,178 - leap.openvpn - INFO - " + "Mon Nov 19 13:36:24 2012 ERROR: Linux route delete command " + "failed: external program exited"), + ("2012-11-19 13:36:26,180 - leap.openvpn - INFO - " + "Mon Nov 19 13:36:24 2012 ERROR: Linux route delete command " + "failed: external program exited"), + ("2012-11-19 13:36:26,181 - leap.openvpn - INFO - " + "Mon Nov 19 13:36:24 2012 /sbin/ifconfig tun0 0.0.0.0"), + ("2012-11-19 13:36:26,182 - leap.openvpn - INFO - " + "Mon Nov 19 13:36:24 2012 Linux ip addr del failed: external " + "program exited with error stat"), + ("2012-11-19 13:36:26,183 - leap.openvpn - INFO - " + "Mon Nov 19 13:36:26 2012 SIGTERM[hard,] received, process" + "exiting"), ] + to_call.reset_mock() + checker.parse_log_and_react(log, err_matrix) + self.assertFalse(to_call.called) + + to_call.reset_mock() + checker.parse_log_and_react([], err_matrix) + self.assertFalse(to_call.called) + + +def get_ping_timeout_error(): + try: + sh.ping("-c", "1", "-w", "1", "8.8.7.7") + except Exception as e: + return e diff --git a/src/leap/base/tests/test_config.py b/src/leap/base/tests/test_config.py new file mode 100644 index 00000000..d03149b2 --- /dev/null +++ b/src/leap/base/tests/test_config.py @@ -0,0 +1,247 @@ +import json +import os +import platform +import socket +#import tempfile + +import mock +import requests + +from leap.base import config +from leap.base import constants +from leap.base import exceptions +from leap.eip import constants as eipconstants +from leap.util.fileutil import mkdir_p +from leap.testing.basetest import BaseLeapTest + + +try: + import unittest2 as unittest +except ImportError: + import unittest + +_system = platform.system() + + +class JSONLeapConfigTest(BaseLeapTest): + def setUp(self): + pass + + def tearDown(self): + pass + + def test_metaclass(self): + with self.assertRaises(exceptions.ImproperlyConfigured) as exc: + class DummyTestConfig(config.JSONLeapConfig): + __metaclass__ = config.MetaConfigWithSpec + exc.startswith("missing spec dict") + + class DummyTestConfig(config.JSONLeapConfig): + __metaclass__ = config.MetaConfigWithSpec + spec = {'properties': {}} + with self.assertRaises(exceptions.ImproperlyConfigured) as exc: + DummyTestConfig() + exc.startswith("missing slug") + + class DummyTestConfig(config.JSONLeapConfig): + __metaclass__ = config.MetaConfigWithSpec + spec = {'properties': {}} + slug = "foo" + DummyTestConfig() + +######################################3 +# +# provider fetch tests block +# + + +class ProviderTest(BaseLeapTest): + # override per test fixtures + + def setUp(self): + pass + + def tearDown(self): + pass + + +# XXX depreacated. similar test in eip.checks + +#class BareHomeTestCase(ProviderTest): +# + #__name__ = "provider_config_tests_bare_home" +# + #def test_should_raise_if_missing_eip_json(self): + #with self.assertRaises(exceptions.MissingConfigFileError): + #config.get_config_json(os.path.join(self.home, 'eip.json')) + + +class ProviderDefinitionTestCase(ProviderTest): + # XXX MOVE TO eip.test_checks + # -- kali 2012-08-24 00:38 + + __name__ = "provider_config_tests" + + def setUp(self): + # dump a sample eip file + # XXX Move to Use EIP Spec Instead!!! + # XXX tests to be moved to eip.checks and eip.providers + # XXX can use eipconfig.dump_default_eipconfig + + path = os.path.join(self.home, '.config', 'leap') + mkdir_p(path) + with open(os.path.join(path, 'eip.json'), 'w') as fp: + json.dump(eipconstants.EIP_SAMPLE_JSON, fp) + + +# these tests below should move to +# eip.checks +# config.Configuration has been deprecated + +# TODO: +# - We're instantiating a ProviderTest because we're doing the home wipeoff +# on setUpClass instead of the setUp (for speedup of the general cases). + +# We really should be testing all of them in the same testCase, and +# doing an extra wipe of the tempdir... but be careful!!!! do not mess with +# os.environ home more than needed... that could potentially bite! + +# XXX actually, another thing to fix here is separating tests: +# - test that requests has been called. +# - check deeper for error types/msgs + +# we SHOULD inject requests dep in the constructor +# (so we can pass mock easily). + + +#class ProviderFetchConError(ProviderTest): + #def test_connection_error(self): + #with mock.patch.object(requests, "get") as mock_method: + #mock_method.side_effect = requests.ConnectionError + #cf = config.Configuration() + #self.assertIsInstance(cf.error, str) +# +# +#class ProviderFetchHttpError(ProviderTest): + #def test_file_not_found(self): + #with mock.patch.object(requests, "get") as mock_method: + #mock_method.side_effect = requests.HTTPError + #cf = config.Configuration() + #self.assertIsInstance(cf.error, str) +# +# +#class ProviderFetchInvalidUrl(ProviderTest): + #def test_invalid_url(self): + #cf = config.Configuration("ht") + #self.assertTrue(cf.error) + + +# end provider fetch tests +########################################### + + +class ConfigHelperFunctions(BaseLeapTest): + + __name__ = "config_helper_tests" + + def setUp(self): + pass + + def tearDown(self): + pass + + # tests + + @unittest.skipUnless(_system == "Linux", "linux only") + def test_lin_get_config_file(self): + """ + config file path where expected? (linux) + """ + self.assertEqual( + config.get_config_file( + 'test', folder="foo/bar"), + os.path.expanduser( + '~/.config/leap/foo/bar/test') + ) + + @unittest.skipUnless(_system == "Darwin", "mac only") + def test_mac_get_config_file(self): + """ + config file path where expected? (mac) + """ + self._missing_test_for_plat(do_raise=True) + + @unittest.skipUnless(_system == "Windows", "win only") + def test_win_get_config_file(self): + """ + config file path where expected? + """ + self._missing_test_for_plat(do_raise=True) + + # + # XXX hey, I'm raising exceptions here + # on purpose. just wanted to make sure + # that the skip stuff is doing it right. + # If you're working on win/macos tests, + # feel free to remove tests that you see + # are too redundant. + + @unittest.skipUnless(_system == "Linux", "linux only") + def test_lin_get_config_dir(self): + """ + nice config dir? (linux) + """ + self.assertEqual( + config.get_config_dir(), + os.path.expanduser('~/.config/leap')) + + @unittest.skipUnless(_system == "Darwin", "mac only") + def test_mac_get_config_dir(self): + """ + nice config dir? (mac) + """ + self._missing_test_for_plat(do_raise=True) + + @unittest.skipUnless(_system == "Windows", "win only") + def test_win_get_config_dir(self): + """ + nice config dir? (win) + """ + self._missing_test_for_plat(do_raise=True) + + # provider paths + + @unittest.skipUnless(_system == "Linux", "linux only") + def test_get_default_provider_path(self): + """ + is default provider path ok? + """ + self.assertEqual( + config.get_default_provider_path(), + os.path.expanduser( + '~/.config/leap/providers/%s/' % + constants.DEFAULT_PROVIDER) + ) + + # validate ip + + def test_validate_ip(self): + """ + check our ip validation + """ + config.validate_ip('3.3.3.3') + with self.assertRaises(socket.error): + config.validate_ip('255.255.255.256') + with self.assertRaises(socket.error): + config.validate_ip('foobar') + + @unittest.skip + def test_validate_domain(self): + """ + code to be written yet + """ + raise NotImplementedError + + +if __name__ == "__main__": + unittest.main() diff --git a/src/leap/base/tests/test_providers.py b/src/leap/base/tests/test_providers.py new file mode 100644 index 00000000..92bc1f2f --- /dev/null +++ b/src/leap/base/tests/test_providers.py @@ -0,0 +1,148 @@ +import copy +import json +try: + import unittest2 as unittest +except ImportError: + import unittest +import os + +from leap.base.pluggableconfig import ValidationError +from leap.testing.basetest import BaseLeapTest +from leap.base import providers + + +EXPECTED_DEFAULT_CONFIG = { + u"api_version": u"0.1.0", + #u"description": "LEAPTranslatable<{u'en': u'Test provider'}>", + u"description": {u'en': u'Test provider'}, + u"default_language": u"en", + #u"display_name": {u'en': u"Test Provider"}, + u"domain": u"testprovider.example.org", + #u'name': "LEAPTranslatable<{u'en': u'Test Provider'}>", + u'name': {u'en': u'Test Provider'}, + u"enrollment_policy": u"open", + #u"serial": 1, + u"services": [ + u"eip" + ], + u"languages": [u"en"], + u"version": u"0.1.0" +} + + +class TestLeapProviderDefinition(BaseLeapTest): + def setUp(self): + self.domain = "testprovider.example.org" + self.definition = providers.LeapProviderDefinition( + domain=self.domain) + self.definition.save(force=True) + self.definition.load() # why have to load after save?? + self.config = self.definition.config + + def tearDown(self): + if hasattr(self, 'testfile') and os.path.isfile(self.testfile): + os.remove(self.testfile) + + # tests + + # XXX most of these tests can be made more abstract + # and moved to test_baseconfig *triangulate!* + + def test_provider_slug_property(self): + slug = self.definition.slug + self.assertEquals( + slug, + os.path.join( + self.home, + '.config', 'leap', 'providers', + '%s' % self.domain, + 'provider.json')) + with self.assertRaises(AttributeError): + self.definition.slug = 23 + + def test_provider_dump(self): + # check a good provider definition is dumped to disk + self.testfile = self.get_tempfile('test.json') + self.definition.save(to=self.testfile, force=True) + deserialized = json.load(open(self.testfile, 'rb')) + self.maxDiff = None + #import ipdb;ipdb.set_trace() + self.assertEqual(deserialized, EXPECTED_DEFAULT_CONFIG) + + def test_provider_dump_to_slug(self): + # same as above, but we test the ability to save to a + # file generated from the slug. + # XXX THIS TEST SHOULD MOVE TO test_baseconfig + self.definition.save() + filename = self.definition.filename + self.assertTrue(os.path.isfile(filename)) + deserialized = json.load(open(filename, 'rb')) + self.assertEqual(deserialized, EXPECTED_DEFAULT_CONFIG) + + def test_provider_load(self): + # check loading provider from disk file + self.testfile = self.get_tempfile('test_load.json') + with open(self.testfile, 'w') as wf: + wf.write(json.dumps(EXPECTED_DEFAULT_CONFIG)) + self.definition.load(fromfile=self.testfile) + #self.assertDictEqual(self.config, + #EXPECTED_DEFAULT_CONFIG) + self.assertItemsEqual(self.config, EXPECTED_DEFAULT_CONFIG) + + def test_provider_validation(self): + self.definition.validate(self.config) + _config = copy.deepcopy(self.config) + # bad type, raise validation error + _config['domain'] = 111 + with self.assertRaises(ValidationError): + self.definition.validate(_config) + + @unittest.skip + def test_load_malformed_json_definition(self): + raise NotImplementedError + + @unittest.skip + def test_type_validation(self): + # check various type validation + # type cast + raise NotImplementedError + + +class TestLeapProviderSet(BaseLeapTest): + + def setUp(self): + self.providers = providers.LeapProviderSet() + + def tearDown(self): + pass + ### + + def test_get_zero_count(self): + self.assertEqual(self.providers.count, 0) + + @unittest.skip + def test_count_defined_providers(self): + # check the method used for making + # the list of providers + raise NotImplementedError + + @unittest.skip + def test_get_default_provider(self): + raise NotImplementedError + + @unittest.skip + def test_should_be_at_least_one_provider_after_init(self): + # when we init an empty environment, + # there should be at least one provider, + # that will be a dump of the default provider definition + # somehow a high level test + raise NotImplementedError + + @unittest.skip + def test_get_eip_remote_from_default_provider(self): + # from: default provider + # expect: remote eip domain + raise NotImplementedError + +if __name__ == "__main__": + unittest.main() diff --git a/src/leap/base/tests/test_validation.py b/src/leap/base/tests/test_validation.py new file mode 100644 index 00000000..b45fbe3a --- /dev/null +++ b/src/leap/base/tests/test_validation.py @@ -0,0 +1,93 @@ +import copy +import datetime +from functools import partial +#import json +try: + import unittest2 as unittest +except ImportError: + import unittest +import os + +from leap.base.config import JSONLeapConfig +from leap.base import pluggableconfig +from leap.testing.basetest import BaseLeapTest + +SAMPLE_CONFIG_DICT = { + 'prop_one': 1, + 'prop_uri': "http://example.org", + 'prop_date': '2012-12-12', +} + +EXPECTED_CONFIG = { + 'prop_one': 1, + 'prop_uri': "http://example.org", + 'prop_date': datetime.datetime(2012, 12, 12) +} + +sample_spec = { + 'description': 'sample schema definition', + 'type': 'object', + 'properties': { + 'prop_one': { + 'type': int, + 'default': 1, + 'required': True + }, + 'prop_uri': { + 'type': str, + 'default': 'http://example.org', + 'required': True, + 'format': 'uri' + }, + 'prop_date': { + 'type': str, + 'default': '2012-12-12', + 'format': 'date' + } + } +} + + +class SampleConfig(JSONLeapConfig): + spec = sample_spec + + @property + def slug(self): + return os.path.expanduser('~/sampleconfig.json') + + +class TestJSONLeapConfigValidation(BaseLeapTest): + def setUp(self): + self.sampleconfig = SampleConfig() + self.sampleconfig.save() + self.sampleconfig.load() + self.config = self.sampleconfig.config + + def tearDown(self): + if hasattr(self, 'testfile') and os.path.isfile(self.testfile): + os.remove(self.testfile) + + # tests + + def test_good_validation(self): + self.sampleconfig.validate(SAMPLE_CONFIG_DICT) + + def test_broken_int(self): + _config = copy.deepcopy(SAMPLE_CONFIG_DICT) + _config['prop_one'] = '1' + self.assertRaises( + pluggableconfig.ValidationError, + partial(self.sampleconfig.validate, _config)) + + def test_format_property(self): + # JsonSchema Validator does not check the format property. + # We should have to extend the Configuration class + blah = copy.deepcopy(SAMPLE_CONFIG_DICT) + blah['prop_uri'] = 'xxx' + self.assertRaises( + pluggableconfig.TypeCastException, + partial(self.sampleconfig.validate, blah)) + + +if __name__ == "__main__": + unittest.main() |