From 7e90eed551bbe847201e5c62edcf0e6493ab2ec3 Mon Sep 17 00:00:00 2001 From: drebs Date: Wed, 6 Jul 2016 08:43:49 +0200 Subject: [test] toxify tests --- pkg/requirements-testing.pip | 3 - setup.cfg | 2 +- src/leap/common/config/tests/test_baseconfig.py | 271 --------------------- .../common/config/tests/test_get_path_prefix.py | 63 ----- src/leap/common/events/tests/__init__.py | 0 src/leap/common/events/tests/test_auth.py | 64 ----- src/leap/common/events/tests/test_events.py | 203 --------------- .../common/events/tests/test_zmq_components.py | 51 ---- src/leap/common/testing/test_basetest.py | 138 ----------- src/leap/common/tests/__init__.py | 0 src/leap/common/tests/test_certs.py | 99 -------- src/leap/common/tests/test_check.py | 53 ---- src/leap/common/tests/test_http.py | 75 ------ src/leap/common/tests/test_memoize.py | 76 ------ tests/config/test_baseconfig.py | 271 +++++++++++++++++++++ tests/config/test_get_path_prefix.py | 63 +++++ tests/events/test_auth.py | 61 +++++ tests/events/test_events.py | 203 +++++++++++++++ tests/events/test_zmq_components.py | 51 ++++ tests/test_certs.py | 97 ++++++++ tests/test_check.py | 53 ++++ tests/test_http.py | 73 ++++++ tests/test_memoize.py | 76 ++++++ tests/testing/test_basetest.py | 138 +++++++++++ tox.ini | 17 ++ 25 files changed, 1104 insertions(+), 1097 deletions(-) delete mode 100644 pkg/requirements-testing.pip delete mode 100644 src/leap/common/config/tests/test_baseconfig.py delete mode 100644 src/leap/common/config/tests/test_get_path_prefix.py delete mode 100644 src/leap/common/events/tests/__init__.py delete mode 100644 src/leap/common/events/tests/test_auth.py delete mode 100644 src/leap/common/events/tests/test_events.py delete mode 100644 src/leap/common/events/tests/test_zmq_components.py delete mode 100644 src/leap/common/testing/test_basetest.py delete mode 100644 src/leap/common/tests/__init__.py delete mode 100644 src/leap/common/tests/test_certs.py delete mode 100644 src/leap/common/tests/test_check.py delete mode 100644 src/leap/common/tests/test_http.py delete mode 100644 src/leap/common/tests/test_memoize.py create mode 100644 tests/config/test_baseconfig.py create mode 100644 tests/config/test_get_path_prefix.py create mode 100644 tests/events/test_auth.py create mode 100644 tests/events/test_events.py create mode 100644 tests/events/test_zmq_components.py create mode 100644 tests/test_certs.py create mode 100644 tests/test_check.py create mode 100644 tests/test_http.py create mode 100644 tests/test_memoize.py create mode 100644 tests/testing/test_basetest.py create mode 100644 tox.ini diff --git a/pkg/requirements-testing.pip b/pkg/requirements-testing.pip deleted file mode 100644 index c5a3ad0..0000000 --- a/pkg/requirements-testing.pip +++ /dev/null @@ -1,3 +0,0 @@ -mock -setuptools-trial -pep8 diff --git a/setup.cfg b/setup.cfg index 7828918..431f4b9 100644 --- a/setup.cfg +++ b/setup.cfg @@ -1,5 +1,5 @@ [aliases] -test = trial +test=pytest [pep8] exclude = versioneer.py,_version.py,*.egg,build,dist,docs diff --git a/src/leap/common/config/tests/test_baseconfig.py b/src/leap/common/config/tests/test_baseconfig.py deleted file mode 100644 index e17e82d..0000000 --- a/src/leap/common/config/tests/test_baseconfig.py +++ /dev/null @@ -1,271 +0,0 @@ -# -*- coding: utf-8 -*- -# test_baseconfig.py -# Copyright (C) 2013 LEAP -# -# This program is free software: you can redistribute it and/or modify -# it under the terms of the GNU General Public License as published by -# the Free Software Foundation, either version 3 of the License, or -# (at your option) any later version. -# -# This program is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -# GNU General Public License for more details. -# -# You should have received a copy of the GNU General Public License -# along with this program. If not, see . -""" -Tests for baseconfig -""" -import json -import unittest -import copy - -from leap.common.config.baseconfig import BaseConfig, LocalizedKey -from leap.common.testing.basetest import BaseLeapTest - -from mock import Mock - -# reduced eipconfig sample config -sample_config = { - "gateways": [ - { - "capabilities": { - "adblock": False, - "transport": ["openvpn"], - "user_ips": False - }, - "host": "host.dev.example.org", - }, { - "capabilities": { - "adblock": False, - "transport": ["openvpn"], - "user_ips": False - }, - "host": "host2.dev.example.org", - } - ], - "default_language": "en", - "languages": [ - "en", - "es" - ], - "name": { - "en": "Baseconfig testing environment", - "es": "Entorno de pruebas de Baseconfig" - }, - "serial": 1, - "version": 1 -} - -# reduced eipconfig.spec version -sample_spec = { - 'description': 'sample eip service config', - 'type': 'object', - 'properties': { - 'serial': { - 'type': int, - 'default': 1, - 'required': ["True"] - }, - 'version': { - 'type': int, - 'default': 1, - 'required': ["True"] - }, - "default_language": { - 'type': unicode, - 'default': 'en' - }, - 'languages': { - 'type': list, - 'default': ['en'] - }, - 'name': { - 'type': dict, - 'format': 'translatable', - 'default': {u'en': u'Test Provider'} - }, - 'gateways': { - 'type': list, - 'default': [ - {"capabilities": { - "adblock": True, - "transport": ["openvpn"], - "user_ips": False}, - "host": "location.example.org", - }] - }, - } -} - - -class TestConfig(BaseConfig): - """ - BaseConfig implementation for testing purposes only. - """ - def get_gateways(self): - return self._safe_get_value("gateways") - - def get_serial(self): - return self._safe_get_value("serial") - - def get_version(self): - return self._safe_get_value("version") - - def _get_schema(self): - return sample_spec - - def _get_spec(self): - return self._get_schema() - - def get_default_language(self): - return self._safe_get_value("default_language") - - @LocalizedKey - def get_name(self): - return self._safe_get_value("name") - - -class BaseConfigTest(BaseLeapTest): - - def setUp(self): - pass - - def tearDown(self): - pass - - def _write_config(self, data): - """ - Helper to write some data to a temp config file. - - :param data: data to be used to save in the config file. - :data type: dict (valid json) - """ - self.config_file = self.get_tempfile("config.json") - conf = open(self.config_file, "w") - conf.write(json.dumps(data)) - conf.close() - - def _get_config(self, fromfile=False, data=sample_config): - """ - Helper that returns a TestConfig object using the data parameter - or a sample data. - - :param fromfile: sets if we should use a file or a string - :fromfile type: bool - :param data: sets the data to be used to load in the TestConfig object - :data type: dict (valid json) - :rtype: TestConfig - """ - config = TestConfig() - - loaded = False - if fromfile: - self._write_config(data) - loaded = config.load(self.config_file, relative=False) - else: - json_string = json.dumps(data) - loaded = config.load(data=json_string) - - if not loaded: - return None - - return config - - def test_loads_from_file(self): - config = self._get_config(fromfile=True) - self.assertIsNotNone(config) - - def test_loads_from_data(self): - config = self._get_config() - self.assertIsNotNone(config) - - def test_load_valid_config_from_file(self): - config = self._get_config(fromfile=True) - self.assertIsNotNone(config) - - self.assertEqual(config.get_version(), sample_config["version"]) - self.assertEqual(config.get_serial(), sample_config["serial"]) - self.assertEqual(config.get_gateways(), sample_config["gateways"]) - - def test_load_valid_config_from_data(self): - config = self._get_config() - self.assertIsNotNone(config) - - self.assertEqual(config.get_version(), sample_config["version"]) - self.assertEqual(config.get_serial(), sample_config["serial"]) - self.assertEqual(config.get_gateways(), sample_config["gateways"]) - - def test_safe_get_value_no_config(self): - config = TestConfig() - - with self.assertRaises(AssertionError): - config.get_version() - - def test_safe_get_value_non_existent_value(self): - config = self._get_config() - - self.assertIsNone(config._safe_get_value('non-existent-value')) - - def test_loaded(self): - config = self._get_config() - self.assertTrue(config.loaded()) - - def test_not_loaded(self): - config = TestConfig() - self.assertFalse(config.loaded()) - - def test_save_and_load(self): - config = self._get_config() - config.get_path_prefix = Mock(return_value=self.tempdir) - config_file = 'test_config.json' - self.assertTrue(config.save([config_file])) - - config_saved = TestConfig() - config_file_path = self.get_tempfile(config_file) - self.assertTrue(config_saved.load(config_file_path, relative=False)) - - self.assertEqual(config.get_version(), config_saved.get_version()) - self.assertEqual(config.get_serial(), config_saved.get_serial()) - self.assertEqual(config.get_gateways(), config_saved.get_gateways()) - - def test_localizations(self): - conf = self._get_config() - - self.assertEqual(conf.get_name(lang='en'), sample_config['name']['en']) - self.assertEqual(conf.get_name(lang='es'), sample_config['name']['es']) - - def _localized_config(self, lang): - """ - Helper to change default language of the provider config. - """ - conf = copy.deepcopy(sample_config) - conf['default_language'] = lang - json_string = json.dumps(conf) - config = TestConfig() - config.load(data=json_string) - - return config - - def test_default_localization1(self): - default_language = sample_config['languages'][0] - config = self._localized_config(default_language) - - default_name = sample_config['name'][default_language] - - self.assertEqual(config.get_name(lang='xx'), default_name) - self.assertEqual(config.get_name(), default_name) - - def test_default_localization2(self): - default_language = sample_config['languages'][1] - config = self._localized_config(default_language) - - default_name = sample_config['name'][default_language] - - self.assertEqual(config.get_name(lang='xx'), default_name) - self.assertEqual(config.get_name(), default_name) - - -if __name__ == "__main__": - unittest.main(verbosity=2) diff --git a/src/leap/common/config/tests/test_get_path_prefix.py b/src/leap/common/config/tests/test_get_path_prefix.py deleted file mode 100644 index 27824fc..0000000 --- a/src/leap/common/config/tests/test_get_path_prefix.py +++ /dev/null @@ -1,63 +0,0 @@ -# -*- coding: utf-8 -*- -# test_get_path_prefix.py -# Copyright (C) 2013 LEAP -# -# This program is free software: you can redistribute it and/or modify -# it under the terms of the GNU General Public License as published by -# the Free Software Foundation, either version 3 of the License, or -# (at your option) any later version. -# -# This program is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -# GNU General Public License for more details. -# -# You should have received a copy of the GNU General Public License -# along with this program. If not, see . -""" -Tests for get_path_prefix -""" -import os -import mock - -try: - import unittest2 as unittest -except ImportError: - import unittest - -from leap.common.config import get_path_prefix -from leap.common.testing.basetest import BaseLeapTest - - -class GetPathPrefixTest(BaseLeapTest): - """ - Tests for the get_path_prefix helper. - - Note: we only are testing that the path is correctly returned and that if - we are not in a bundle (standalone=False) then the paths are different. - - dirspec calculates the correct path using different methods and dlls - (in case of Windows) so we don't implement tests to check if the paths - are the correct ones. - """ - def setUp(self): - pass - - def tearDown(self): - pass - - def test_standalone_path(self): - expected_path = os.path.join('expected', 'path', 'config') - fake_cwd = os.path.join('expected', 'path') - with mock.patch('os.getcwd', lambda: fake_cwd): - path = get_path_prefix(standalone=True) - self.assertEquals(path, expected_path) - - def test_path_prefix(self): - standalone_path = get_path_prefix(standalone=True) - path = get_path_prefix(standalone=False) - self.assertNotEquals(path, standalone_path) - - -if __name__ == "__main__": - unittest.main(verbosity=2) diff --git a/src/leap/common/events/tests/__init__.py b/src/leap/common/events/tests/__init__.py deleted file mode 100644 index e69de29..0000000 diff --git a/src/leap/common/events/tests/test_auth.py b/src/leap/common/events/tests/test_auth.py deleted file mode 100644 index 78ffd9f..0000000 --- a/src/leap/common/events/tests/test_auth.py +++ /dev/null @@ -1,64 +0,0 @@ -# -*- coding: utf-8 -*- -# test_zmq_components.py -# Copyright (C) 2014 LEAP -# -# This program is free software: you can redistribute it and/or modify -# it under the terms of the GNU General Public License as published by -# the Free Software Foundation, either version 3 of the License, or -# (at your option) any later version. -# -# This program is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -# GNU General Public License for more details. -# -# You should have received a copy of the GNU General Public License -# along with this program. If not, see . -""" -Tests for the auth module. -""" -import os - -from twisted.trial import unittest -from txzmq import ZmqFactory - -from leap.common.events import auth -from leap.common.testing.basetest import BaseLeapTest -from leap.common.zmq_utils import PUBLIC_KEYS_PREFIX -from leap.common.zmq_utils import maybe_create_and_get_certificates - -from txzmq.test import _wait - - -class ZmqAuthTestCase(unittest.TestCase, BaseLeapTest): - - def setUp(self): - self.setUpEnv(launch_events_server=False) - - self.factory = ZmqFactory() - self._config_prefix = os.path.join(self.tempdir, "leap", "events") - - self.public, self.secret = maybe_create_and_get_certificates( - self._config_prefix, 'server') - - self.authenticator = auth.TxAuthenticator(self.factory) - self.authenticator.start() - self.auth_req = auth.TxAuthenticationRequest(self.factory) - - def tearDown(self): - self.factory.shutdown() - self.tearDownEnv() - - def test_curve_auth(self): - self.auth_req.start() - self.auth_req.allow('127.0.0.1') - public_keys_dir = os.path.join(self._config_prefix, PUBLIC_KEYS_PREFIX) - self.auth_req.configure_curve(domain="*", location=public_keys_dir) - - def check(ignored): - authenticator = self.authenticator.authenticator - certs = authenticator.certs['*'] - self.failUnlessEqual(authenticator.whitelist, set([u'127.0.0.1'])) - self.failUnlessEqual(certs[certs.keys()[0]], True) - - return _wait(0.1).addCallback(check) diff --git a/src/leap/common/events/tests/test_events.py b/src/leap/common/events/tests/test_events.py deleted file mode 100644 index d8435c6..0000000 --- a/src/leap/common/events/tests/test_events.py +++ /dev/null @@ -1,203 +0,0 @@ -# -*- coding: utf-8 -*- -# test_events.py -# Copyright (C) 2013 LEAP -# -# This program is free software: you can redistribute it and/or modify -# it under the terms of the GNU General Public License as published by -# the Free Software Foundation, either version 3 of the License, or -# (at your option) any later version. -# -# This program is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -# GNU General Public License for more details. -# -# You should have received a copy of the GNU General Public License -# along with this program. If not, see . -""" -Tests for the events framework -""" -import os -import logging - -from twisted.internet.reactor import callFromThread -from twisted.trial import unittest -from twisted.internet import defer - -from txzmq import ZmqFactory - -from leap.common.events import server -from leap.common.events import client -from leap.common.events import flags -from leap.common.events import txclient -from leap.common.events import catalog -from leap.common.events.errors import CallbackAlreadyRegisteredError - - -if 'DEBUG' in os.environ: - logging.basicConfig(level=logging.DEBUG) - - -class EventsGenericClientTestCase(object): - - def setUp(self): - flags.set_events_enabled(True) - self.factory = ZmqFactory() - self._server = server.ensure_server( - emit_addr="tcp://127.0.0.1:0", - reg_addr="tcp://127.0.0.1:0", - factory=self.factory, - enable_curve=False) - - self._client.configure_client( - emit_addr="tcp://127.0.0.1:%d" % self._server.pull_port, - reg_addr="tcp://127.0.0.1:%d" % self._server.pub_port, - factory=self.factory, enable_curve=False) - - def tearDown(self): - flags.set_events_enabled(False) - self.factory.shutdown() - self._client.instance().reset() - - def test_client_register(self): - """ - Ensure clients can register callbacks. - """ - callbacks = self._client.instance().callbacks - self.assertTrue(len(callbacks) == 0, - 'There should be no callback for this event.') - # register one event - event1 = catalog.CLIENT_UID - - def cbk1(event, _): - return True - - uid1 = self._client.register(event1, cbk1) - # assert for correct registration - self.assertTrue(len(callbacks) == 1) - self.assertTrue(callbacks[event1][uid1] == cbk1, - 'Could not register event in local client.') - # register another event - event2 = catalog.CLIENT_SESSION_ID - - def cbk2(event, _): - return True - - uid2 = self._client.register(event2, cbk2) - # assert for correct registration - self.assertTrue(len(callbacks) == 2) - self.assertTrue(callbacks[event2][uid2] == cbk2, - 'Could not register event in local client.') - - def test_register_signal_replace(self): - """ - Make sure clients can replace already registered callbacks. - """ - event = catalog.CLIENT_UID - d = defer.Deferred() - - def cbk_fail(event, _): - return callFromThread(d.errback, event) - - def cbk_succeed(event, _): - return callFromThread(d.callback, event) - - self._client.register(event, cbk_fail, uid=1) - self._client.register(event, cbk_succeed, uid=1, replace=True) - self._client.emit(event, None) - return d - - def test_register_signal_replace_fails_when_replace_is_false(self): - """ - Make sure clients trying to replace already registered callbacks fail - when replace=False - """ - event = catalog.CLIENT_UID - self._client.register(event, lambda event, _: None, uid=1) - self.assertRaises( - CallbackAlreadyRegisteredError, - self._client.register, - event, lambda event, _: None, uid=1, replace=False) - - def test_register_more_than_one_callback_works(self): - """ - Make sure clients can replace already registered callbacks. - """ - event = catalog.CLIENT_UID - d1 = defer.Deferred() - - def cbk1(event, _): - return callFromThread(d1.callback, event) - - d2 = defer.Deferred() - - def cbk2(event, _): - return d2.callback(event) - - self._client.register(event, cbk1) - self._client.register(event, cbk2) - self._client.emit(event, None) - d = defer.gatherResults([d1, d2]) - return d - - def test_client_receives_signal(self): - """ - Ensure clients can receive signals. - """ - event = catalog.CLIENT_UID - d = defer.Deferred() - - def cbk(events, _): - callFromThread(d.callback, event) - - self._client.register(event, cbk) - self._client.emit(event, None) - return d - - def test_client_unregister_all(self): - """ - Test that the client can unregister all events for one signal. - """ - event1 = catalog.CLIENT_UID - d = defer.Deferred() - # register more than one callback for the same event - self._client.register( - event1, lambda ev, _: callFromThread(d.errback, None)) - self._client.register( - event1, lambda ev, _: callFromThread(d.errback, None)) - # unregister and emit the event - self._client.unregister(event1) - self._client.emit(event1, None) - # register and emit another event so the deferred can succeed - event2 = catalog.CLIENT_SESSION_ID - self._client.register( - event2, lambda ev, _: callFromThread(d.callback, None)) - self._client.emit(event2, None) - return d - - def test_client_unregister_by_uid(self): - """ - Test that the client can unregister an event by uid. - """ - event = catalog.CLIENT_UID - d = defer.Deferred() - # register one callback that would fail - uid = self._client.register( - event, lambda ev, _: callFromThread(d.errback, None)) - # register one callback that will succeed - self._client.register( - event, lambda ev, _: callFromThread(d.callback, None)) - # unregister by uid and emit the event - self._client.unregister(event, uid=uid) - self._client.emit(event, None) - return d - - -class EventsTxClientTestCase(EventsGenericClientTestCase, unittest.TestCase): - - _client = txclient - - -class EventsClientTestCase(EventsGenericClientTestCase, unittest.TestCase): - - _client = client diff --git a/src/leap/common/events/tests/test_zmq_components.py b/src/leap/common/events/tests/test_zmq_components.py deleted file mode 100644 index c51e37e..0000000 --- a/src/leap/common/events/tests/test_zmq_components.py +++ /dev/null @@ -1,51 +0,0 @@ -# -*- coding: utf-8 -*- -# test_zmq_components.py -# Copyright (C) 2014 LEAP -# -# This program is free software: you can redistribute it and/or modify -# it under the terms of the GNU General Public License as published by -# the Free Software Foundation, either version 3 of the License, or -# (at your option) any later version. -# -# This program is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -# GNU General Public License for more details. -# -# You should have received a copy of the GNU General Public License -# along with this program. If not, see . -""" -Tests for the zmq_components module. -""" -try: - import unittest2 as unittest -except ImportError: - import unittest - -from leap.common.events import zmq_components - - -class AddrParseTestCase(unittest.TestCase): - - def setUp(self): - pass - - def tearDown(self): - pass - - def test_addr_parsing(self): - addr_re = zmq_components.ADDRESS_RE - - self.assertEqual( - addr_re.search("ipc:///tmp/foo/bar/baaz-2/foo.0").groups(), - ("ipc", "/tmp/foo/bar/baaz-2/foo.0", None)) - self.assertEqual( - addr_re.search("tcp://localhost:9000").groups(), - ("tcp", "localhost", "9000")) - self.assertEqual( - addr_re.search("tcp://127.0.0.1:9000").groups(), - ("tcp", "127.0.0.1", "9000")) - - -if __name__ == "__main__": - unittest.main() diff --git a/src/leap/common/testing/test_basetest.py b/src/leap/common/testing/test_basetest.py deleted file mode 100644 index ec42a62..0000000 --- a/src/leap/common/testing/test_basetest.py +++ /dev/null @@ -1,138 +0,0 @@ -# -*- coding: utf-8 -*- -# test_basetest.py -# Copyright (C) 2013 LEAP -# -# This program is free software: you can redistribute it and/or modify -# it under the terms of the GNU General Public License as published by -# the Free Software Foundation, either version 3 of the License, or -# (at your option) any later version. -# -# This program is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -# GNU General Public License for more details. -# -# You should have received a copy of the GNU General Public License -# along with this program. If not, see . -""" -Unittests for BaseLeapTest ...becase it's oh so meta -""" -try: - import unittest2 as unittest -except ImportError: - import unittest - -import os -import StringIO - -from leap.common.testing.basetest import BaseLeapTest - -_tempdir = None # global for tempdir checking - - -class _TestCaseRunner(object): - """ - TestCaseRunner used to run BaseLeapTest - """ - def run_testcase(self, testcase=None): - """ - Runs a given TestCase - - :param testcase: the testcase - :type testcase: unittest.TestCase - """ - if not testcase: - return None - loader = unittest.TestLoader() - suite = loader.loadTestsFromTestCase(testcase) - - # Create runner, and run testcase - io = StringIO.StringIO() - runner = unittest.TextTestRunner(stream=io) - results = runner.run(suite) - return results - - -class TestAbstractBaseLeapTest(unittest.TestCase, _TestCaseRunner): - """ - TestCase for BaseLeapTest abs - """ - def test_abstract_base_class(self): - """ - Test errors raised when setup/teardown not overloaded - """ - class _BaseTest(BaseLeapTest): - def test_dummy_method(self): - pass - - def test_tautology(self): - assert True - - results = self.run_testcase(_BaseTest) - - # should be 2 errors: NotImplemented - # raised for setUp/tearDown - self.assertEquals(results.testsRun, 2) - self.assertEquals(len(results.failures), 0) - self.assertEquals(len(results.errors), 2) - - -class TestInitBaseLeapTest(BaseLeapTest): - """ - TestCase for testing initialization of BaseLeapTest - """ - - def setUp(self): - self.setUpEnv() - - def tearDown(self): - self.tearDownEnv() - - def test_path_is_changed(self): - """tests whether we have changed the PATH env var""" - os_path = os.environ['PATH'] - self.assertTrue(os_path.startswith(self.tempdir)) - - def test_old_path_is_saved(self): - """tests whether we restore the PATH env var""" - self.assertTrue(len(self.old_path) > 1) - - -class TestCleanedBaseLeapTest(unittest.TestCase, _TestCaseRunner): - """ - TestCase for testing tempdir creation and cleanup - """ - - def test_tempdir_is_cleaned_after_tests(self): - """ - test if a TestCase derived from BaseLeapTest creates and cleans the - temporal dir - """ - class _BaseTest(BaseLeapTest): - def setUp(self): - """set global _tempdir to this instance tempdir""" - global _tempdir - _tempdir = self.tempdir - - def tearDown(self): - """nothing""" - pass - - def test_tempdir_created(self): - """test if tempdir was created""" - self.assertTrue(os.path.isdir(self.tempdir)) - - def test_tempdir_created_on_setupclass(self): - """test if tempdir is the one created by setupclass""" - self.assertEqual(_tempdir, self.tempdir) - - results = self.run_testcase(_BaseTest) - self.assertEquals(results.testsRun, 2) - self.assertEquals(len(results.failures), 0) - self.assertEquals(len(results.errors), 0) - - # did we cleaned the tempdir? - self.assertFalse(os.path.isdir(_tempdir)) - -if __name__ == "__main__": - unittest.main() diff --git a/src/leap/common/tests/__init__.py b/src/leap/common/tests/__init__.py deleted file mode 100644 index e69de29..0000000 diff --git a/src/leap/common/tests/test_certs.py b/src/leap/common/tests/test_certs.py deleted file mode 100644 index 8ebc0f4..0000000 --- a/src/leap/common/tests/test_certs.py +++ /dev/null @@ -1,99 +0,0 @@ -# -*- coding: utf-8 -*- -# test_certs.py -# Copyright (C) 2013 LEAP -# -# This program is free software: you can redistribute it and/or modify -# it under the terms of the GNU General Public License as published by -# the Free Software Foundation, either version 3 of the License, or -# (at your option) any later version. -# -# This program is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -# GNU General Public License for more details. -# -# You should have received a copy of the GNU General Public License -# along with this program. If not, see . -""" -Tests for: - * leap/common/certs.py -""" -import os -import time - -try: - import unittest2 as unittest -except ImportError: - import unittest - -from leap.common import certs -from leap.common.testing.basetest import BaseLeapTest - -TEST_CERT_PEM = os.path.join( - os.path.split(__file__)[0], - '..', 'testing', "leaptest_combined_keycert.pem") - -# Values from the test cert file: -# Not Before: Sep 3 17:52:16 2013 GMT -# Not After : Sep 1 17:52:16 2023 GMT -CERT_NOT_BEFORE = (2013, 9, 3, 17, 52, 16, 1, 246, 0) -CERT_NOT_AFTER = (2023, 9, 1, 17, 52, 16, 4, 244, 0) - - -class CertsTest(BaseLeapTest): - - def setUp(self): - self.setUpEnv() - - def tearDown(self): - self.tearDownEnv() - - def test_should_redownload_if_no_cert(self): - self.assertTrue(certs.should_redownload(certfile="")) - - def test_should_redownload_if_invalid_pem(self): - cert_path = self.get_tempfile('test_pem_file.pem') - - with open(cert_path, 'w') as f: - f.write('this is some invalid data for the pem file') - - self.assertTrue(certs.should_redownload(cert_path)) - - def test_should_redownload_if_before(self): - def new_now(): - time.struct_time(CERT_NOT_BEFORE) - self.assertTrue(certs.should_redownload(TEST_CERT_PEM, now=new_now)) - - def test_should_redownload_if_after(self): - def new_now(): - time.struct_time(CERT_NOT_AFTER) - self.assertTrue(certs.should_redownload(TEST_CERT_PEM, now=new_now)) - - def test_not_should_redownload(self): - self.assertFalse(certs.should_redownload(TEST_CERT_PEM)) - - def test_is_valid_pemfile(self): - with open(TEST_CERT_PEM) as f: - cert_data = f.read() - - self.assertTrue(certs.is_valid_pemfile(cert_data)) - - def test_not_is_valid_pemfile(self): - cert_data = 'this is some invalid data for the pem file' - - self.assertFalse(certs.is_valid_pemfile(cert_data)) - - def test_get_cert_time_boundaries(self): - """ - This test ensures us that the returned values are returned in UTC/GMT. - """ - with open(TEST_CERT_PEM) as f: - cert_data = f.read() - - valid_from, valid_to = certs.get_cert_time_boundaries(cert_data) - self.assertEqual(tuple(valid_from), CERT_NOT_BEFORE) - self.assertEqual(tuple(valid_to), CERT_NOT_AFTER) - - -if __name__ == "__main__": - unittest.main() diff --git a/src/leap/common/tests/test_check.py b/src/leap/common/tests/test_check.py deleted file mode 100644 index cd488ff..0000000 --- a/src/leap/common/tests/test_check.py +++ /dev/null @@ -1,53 +0,0 @@ -# -*- coding: utf-8 -*- -# test_check.py -# Copyright (C) 2013 LEAP -# -# This program is free software: you can redistribute it and/or modify -# it under the terms of the GNU General Public License as published by -# the Free Software Foundation, either version 3 of the License, or -# (at your option) any later version. -# -# This program is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -# GNU General Public License for more details. -# -# You should have received a copy of the GNU General Public License -# along with this program. If not, see . -""" -Tests for: - * leap/common/check.py -""" -try: - import unittest2 as unittest -except ImportError: - import unittest - -import mock - -from leap.common import check - - -class CheckTests(unittest.TestCase): - def test_raises_on_false_condition(self): - with self.assertRaises(AssertionError): - check.leap_assert(False, "Condition") - - def test_raises_on_none_condition(self): - with self.assertRaises(AssertionError): - check.leap_assert(None, "Condition") - - def test_suceeds_with_good_condition(self): - check.leap_assert(True, "") - - def test_raises_on_bad_type(self): - with self.assertRaises(AssertionError): - check.leap_assert_type(42, str) - - def test_succeeds_on_good_type(self): - check.leap_assert_type(42, int) - - @mock.patch("traceback.extract_stack", mock.MagicMock(return_value=None)) - def test_does_not_raise_on_bug(self): - with self.assertRaises(AssertionError): - check.leap_assert(False, "") diff --git a/src/leap/common/tests/test_http.py b/src/leap/common/tests/test_http.py deleted file mode 100644 index f44550f..0000000 --- a/src/leap/common/tests/test_http.py +++ /dev/null @@ -1,75 +0,0 @@ -# -*- coding: utf-8 -*- -# test_http.py -# Copyright (C) 2013 LEAP -# -# This program is free software: you can redistribute it and/or modify -# it under the terms of the GNU General Public License as published by -# the Free Software Foundation, either version 3 of the License, or -# (at your option) any later version. -# -# This program is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -# GNU General Public License for more details. -# -# You should have received a copy of the GNU General Public License -# along with this program. If not, see . -""" -Tests for: - * leap/common/http.py -""" -import os -try: - import unittest2 as unittest -except ImportError: - import unittest - -from leap.common import http -from leap.common.testing.basetest import BaseLeapTest - -TEST_CERT_PEM = os.path.join( - os.path.split(__file__)[0], - '..', 'testing', "leaptest_combined_keycert.pem") - - -class HTTPClientTest(BaseLeapTest): - - def setUp(self): - pass - - def tearDown(self): - pass - - def test_agents_sharing_pool_by_default(self): - client = http.HTTPClient() - client2 = http.HTTPClient(TEST_CERT_PEM) - self.assertNotEquals( - client._agent, client2._agent, "Expected dedicated agents") - self.assertEquals( - client._agent._pool, client2._agent._pool, - "Pool was not reused by default") - - def test_agent_can_have_dedicated_custom_pool(self): - custom_pool = http._HTTPConnectionPool( - None, - timeout=10, - maxPersistentPerHost=42, - persistent=False - ) - self.assertEquals(custom_pool.maxPersistentPerHost, 42, - "Custom persistent connections " - "limit is not being respected") - self.assertFalse(custom_pool.persistent, - "Custom persistence is not being respected") - default_client = http.HTTPClient() - custom_client = http.HTTPClient(pool=custom_pool) - - self.assertNotEquals( - default_client._agent, custom_client._agent, - "No agent reuse is expected") - self.assertEquals( - custom_pool, custom_client._agent._pool, - "Custom pool usage was not respected") - -if __name__ == "__main__": - unittest.main() diff --git a/src/leap/common/tests/test_memoize.py b/src/leap/common/tests/test_memoize.py deleted file mode 100644 index c923fc5..0000000 --- a/src/leap/common/tests/test_memoize.py +++ /dev/null @@ -1,76 +0,0 @@ -# -*- coding: utf-8 -*- -# test_check.py -# Copyright (C) 2013 LEAP -# -# This program is free software: you can redistribute it and/or modify -# it under the terms of the GNU General Public License as published by -# the Free Software Foundation, either version 3 of the License, or -# (at your option) any later version. -# -# This program is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -# GNU General Public License for more details. -# -# You should have received a copy of the GNU General Public License -# along with this program. If not, see . -""" -Tests for: - * leap/common/decorators._memoized -""" -try: - import unittest2 as unittest -except ImportError: - import unittest - -from time import sleep - -import mock - -from leap.common.decorators import _memoized - - -class MemoizeTests(unittest.TestCase): - - def test_memoized_call(self): - """ - Test that a memoized function only calls once. - """ - witness = mock.Mock() - - @_memoized - def callmebaby(): - return witness() - - for i in range(10): - callmebaby() - witness.assert_called_once_with() - - def test_cache_invalidation(self): - """ - Test that time makes the cache invalidation expire. - """ - witness = mock.Mock() - - cache_with_alzheimer = _memoized - cache_with_alzheimer.CACHE_INVALIDATION_DELTA = 1 - - @cache_with_alzheimer - def callmebaby(*args): - return witness(*args) - - for i in range(10): - callmebaby() - witness.assert_called_once_with() - - sleep(2) - callmebaby("onemoretime") - - expected = [mock.call(), mock.call("onemoretime")] - self.assertEqual( - witness.call_args_list, - expected) - - -if __name__ == "__main__": - unittest.main() diff --git a/tests/config/test_baseconfig.py b/tests/config/test_baseconfig.py new file mode 100644 index 0000000..22358ec --- /dev/null +++ b/tests/config/test_baseconfig.py @@ -0,0 +1,271 @@ +# -*- coding: utf-8 -*- +# test_baseconfig.py +# Copyright (C) 2013 LEAP +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see . +""" +Tests for baseconfig +""" +import json +import unittest +import copy + +from leap.common.config.baseconfig import BaseConfig, LocalizedKey +from leap.common.testing.basetest import BaseLeapTest + +from mock import Mock + +# reduced eipconfig sample config +sample_config = { + "gateways": [ + { + "capabilities": { + "adblock": False, + "transport": ["openvpn"], + "user_ips": False + }, + "host": "host.dev.example.org", + }, { + "capabilities": { + "adblock": False, + "transport": ["openvpn"], + "user_ips": False + }, + "host": "host2.dev.example.org", + } + ], + "default_language": "en", + "languages": [ + "en", + "es" + ], + "name": { + "en": "Baseconfig testing environment", + "es": "Entorno de pruebas de Baseconfig" + }, + "serial": 1, + "version": 1 +} + +# reduced eipconfig.spec version +sample_spec = { + 'description': 'sample eip service config', + 'type': 'object', + 'properties': { + 'serial': { + 'type': int, + 'default': 1, + 'required': ["True"] + }, + 'version': { + 'type': int, + 'default': 1, + 'required': ["True"] + }, + "default_language": { + 'type': unicode, + 'default': 'en' + }, + 'languages': { + 'type': list, + 'default': ['en'] + }, + 'name': { + 'type': dict, + 'format': 'translatable', + 'default': {u'en': u'Test Provider'} + }, + 'gateways': { + 'type': list, + 'default': [ + {"capabilities": { + "adblock": True, + "transport": ["openvpn"], + "user_ips": False}, + "host": "location.example.org", + }] + }, + } +} + + +class Config(BaseConfig): + """ + BaseConfig implementation for testing purposes only. + """ + def get_gateways(self): + return self._safe_get_value("gateways") + + def get_serial(self): + return self._safe_get_value("serial") + + def get_version(self): + return self._safe_get_value("version") + + def _get_schema(self): + return sample_spec + + def _get_spec(self): + return self._get_schema() + + def get_default_language(self): + return self._safe_get_value("default_language") + + @LocalizedKey + def get_name(self): + return self._safe_get_value("name") + + +class BaseConfigTest(BaseLeapTest): + + def setUp(self): + pass + + def tearDown(self): + pass + + def _write_config(self, data): + """ + Helper to write some data to a temp config file. + + :param data: data to be used to save in the config file. + :data type: dict (valid json) + """ + self.config_file = self.get_tempfile("config.json") + conf = open(self.config_file, "w") + conf.write(json.dumps(data)) + conf.close() + + def _get_config(self, fromfile=False, data=sample_config): + """ + Helper that returns a Config object using the data parameter + or a sample data. + + :param fromfile: sets if we should use a file or a string + :fromfile type: bool + :param data: sets the data to be used to load in the Config object + :data type: dict (valid json) + :rtype: Config + """ + config = Config() + + loaded = False + if fromfile: + self._write_config(data) + loaded = config.load(self.config_file, relative=False) + else: + json_string = json.dumps(data) + loaded = config.load(data=json_string) + + if not loaded: + return None + + return config + + def test_loads_from_file(self): + config = self._get_config(fromfile=True) + self.assertIsNotNone(config) + + def test_loads_from_data(self): + config = self._get_config() + self.assertIsNotNone(config) + + def test_load_valid_config_from_file(self): + config = self._get_config(fromfile=True) + self.assertIsNotNone(config) + + self.assertEqual(config.get_version(), sample_config["version"]) + self.assertEqual(config.get_serial(), sample_config["serial"]) + self.assertEqual(config.get_gateways(), sample_config["gateways"]) + + def test_load_valid_config_from_data(self): + config = self._get_config() + self.assertIsNotNone(config) + + self.assertEqual(config.get_version(), sample_config["version"]) + self.assertEqual(config.get_serial(), sample_config["serial"]) + self.assertEqual(config.get_gateways(), sample_config["gateways"]) + + def test_safe_get_value_no_config(self): + config = Config() + + with self.assertRaises(AssertionError): + config.get_version() + + def test_safe_get_value_non_existent_value(self): + config = self._get_config() + + self.assertIsNone(config._safe_get_value('non-existent-value')) + + def test_loaded(self): + config = self._get_config() + self.assertTrue(config.loaded()) + + def test_not_loaded(self): + config = Config() + self.assertFalse(config.loaded()) + + def test_save_and_load(self): + config = self._get_config() + config.get_path_prefix = Mock(return_value=self.tempdir) + config_file = 'test_config.json' + self.assertTrue(config.save([config_file])) + + config_saved = Config() + config_file_path = self.get_tempfile(config_file) + self.assertTrue(config_saved.load(config_file_path, relative=False)) + + self.assertEqual(config.get_version(), config_saved.get_version()) + self.assertEqual(config.get_serial(), config_saved.get_serial()) + self.assertEqual(config.get_gateways(), config_saved.get_gateways()) + + def test_localizations(self): + conf = self._get_config() + + self.assertEqual(conf.get_name(lang='en'), sample_config['name']['en']) + self.assertEqual(conf.get_name(lang='es'), sample_config['name']['es']) + + def _localized_config(self, lang): + """ + Helper to change default language of the provider config. + """ + conf = copy.deepcopy(sample_config) + conf['default_language'] = lang + json_string = json.dumps(conf) + config = Config() + config.load(data=json_string) + + return config + + def test_default_localization1(self): + default_language = sample_config['languages'][0] + config = self._localized_config(default_language) + + default_name = sample_config['name'][default_language] + + self.assertEqual(config.get_name(lang='xx'), default_name) + self.assertEqual(config.get_name(), default_name) + + def test_default_localization2(self): + default_language = sample_config['languages'][1] + config = self._localized_config(default_language) + + default_name = sample_config['name'][default_language] + + self.assertEqual(config.get_name(lang='xx'), default_name) + self.assertEqual(config.get_name(), default_name) + + +if __name__ == "__main__": + unittest.main(verbosity=2) diff --git a/tests/config/test_get_path_prefix.py b/tests/config/test_get_path_prefix.py new file mode 100644 index 0000000..e383a7e --- /dev/null +++ b/tests/config/test_get_path_prefix.py @@ -0,0 +1,63 @@ +# -*- coding: utf-8 -*- +# test_get_path_prefix.py +# Copyright (C) 2013 LEAP +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see . +""" +Tests for get_path_prefix +""" +import os +import mock + +try: + import unittest2 as unittest +except ImportError: + import unittest + +from leap.common.config import get_path_prefix +from leap.common.testing.basetest import BaseLeapTest + + +class GetPathPrefixTest(BaseLeapTest): + """ + Tests for the get_path_prefix helper. + + Note: we only are testing that the path is correctly returned and that if + we are not in a bundle (standalone=False) then the paths are different. + + xdg calculates the correct path using different methods and dlls + (in case of Windows) so we don't implement tests to check if the paths + are the correct ones. + """ + def setUp(self): + pass + + def tearDown(self): + pass + + def test_standalone_path(self): + expected_path = os.path.join('expected', 'path', 'config') + fake_cwd = os.path.join('expected', 'path') + with mock.patch('os.getcwd', lambda: fake_cwd): + path = get_path_prefix(standalone=True) + self.assertEquals(path, expected_path) + + def test_path_prefix(self): + standalone_path = get_path_prefix(standalone=True) + path = get_path_prefix(standalone=False) + self.assertNotEquals(path, standalone_path) + + +if __name__ == "__main__": + unittest.main(verbosity=2) diff --git a/tests/events/test_auth.py b/tests/events/test_auth.py new file mode 100644 index 0000000..5442ebd --- /dev/null +++ b/tests/events/test_auth.py @@ -0,0 +1,61 @@ +# -*- coding: utf-8 -*- +# test_zmq_components.py +# Copyright (C) 2014 LEAP +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see . +""" +Tests for the auth module. +""" +import os + +from twisted.trial import unittest +from txzmq import ZmqFactory + +from leap.common.events import auth +from leap.common.testing.basetest import BaseLeapTest +from leap.common.zmq_utils import PUBLIC_KEYS_PREFIX +from leap.common.zmq_utils import maybe_create_and_get_certificates + +from txzmq.test import _wait + + +class ZmqAuthTestCase(unittest.TestCase, BaseLeapTest): + + def setUp(self): + self.factory = ZmqFactory() + self._config_prefix = os.path.join(self.tempdir, "leap", "events") + + self.public, self.secret = maybe_create_and_get_certificates( + self._config_prefix, 'server') + + self.authenticator = auth.TxAuthenticator(self.factory) + self.authenticator.start() + self.auth_req = auth.TxAuthenticationRequest(self.factory) + + def tearDown(self): + self.factory.shutdown() + + def test_curve_auth(self): + self.auth_req.start() + self.auth_req.allow('127.0.0.1') + public_keys_dir = os.path.join(self._config_prefix, PUBLIC_KEYS_PREFIX) + self.auth_req.configure_curve(domain="*", location=public_keys_dir) + + def check(ignored): + authenticator = self.authenticator.authenticator + certs = authenticator.certs['*'] + self.failUnlessEqual(authenticator.whitelist, set([u'127.0.0.1'])) + self.failUnlessEqual(certs[certs.keys()[0]], True) + + return _wait(0.1).addCallback(check) diff --git a/tests/events/test_events.py b/tests/events/test_events.py new file mode 100644 index 0000000..d8435c6 --- /dev/null +++ b/tests/events/test_events.py @@ -0,0 +1,203 @@ +# -*- coding: utf-8 -*- +# test_events.py +# Copyright (C) 2013 LEAP +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see . +""" +Tests for the events framework +""" +import os +import logging + +from twisted.internet.reactor import callFromThread +from twisted.trial import unittest +from twisted.internet import defer + +from txzmq import ZmqFactory + +from leap.common.events import server +from leap.common.events import client +from leap.common.events import flags +from leap.common.events import txclient +from leap.common.events import catalog +from leap.common.events.errors import CallbackAlreadyRegisteredError + + +if 'DEBUG' in os.environ: + logging.basicConfig(level=logging.DEBUG) + + +class EventsGenericClientTestCase(object): + + def setUp(self): + flags.set_events_enabled(True) + self.factory = ZmqFactory() + self._server = server.ensure_server( + emit_addr="tcp://127.0.0.1:0", + reg_addr="tcp://127.0.0.1:0", + factory=self.factory, + enable_curve=False) + + self._client.configure_client( + emit_addr="tcp://127.0.0.1:%d" % self._server.pull_port, + reg_addr="tcp://127.0.0.1:%d" % self._server.pub_port, + factory=self.factory, enable_curve=False) + + def tearDown(self): + flags.set_events_enabled(False) + self.factory.shutdown() + self._client.instance().reset() + + def test_client_register(self): + """ + Ensure clients can register callbacks. + """ + callbacks = self._client.instance().callbacks + self.assertTrue(len(callbacks) == 0, + 'There should be no callback for this event.') + # register one event + event1 = catalog.CLIENT_UID + + def cbk1(event, _): + return True + + uid1 = self._client.register(event1, cbk1) + # assert for correct registration + self.assertTrue(len(callbacks) == 1) + self.assertTrue(callbacks[event1][uid1] == cbk1, + 'Could not register event in local client.') + # register another event + event2 = catalog.CLIENT_SESSION_ID + + def cbk2(event, _): + return True + + uid2 = self._client.register(event2, cbk2) + # assert for correct registration + self.assertTrue(len(callbacks) == 2) + self.assertTrue(callbacks[event2][uid2] == cbk2, + 'Could not register event in local client.') + + def test_register_signal_replace(self): + """ + Make sure clients can replace already registered callbacks. + """ + event = catalog.CLIENT_UID + d = defer.Deferred() + + def cbk_fail(event, _): + return callFromThread(d.errback, event) + + def cbk_succeed(event, _): + return callFromThread(d.callback, event) + + self._client.register(event, cbk_fail, uid=1) + self._client.register(event, cbk_succeed, uid=1, replace=True) + self._client.emit(event, None) + return d + + def test_register_signal_replace_fails_when_replace_is_false(self): + """ + Make sure clients trying to replace already registered callbacks fail + when replace=False + """ + event = catalog.CLIENT_UID + self._client.register(event, lambda event, _: None, uid=1) + self.assertRaises( + CallbackAlreadyRegisteredError, + self._client.register, + event, lambda event, _: None, uid=1, replace=False) + + def test_register_more_than_one_callback_works(self): + """ + Make sure clients can replace already registered callbacks. + """ + event = catalog.CLIENT_UID + d1 = defer.Deferred() + + def cbk1(event, _): + return callFromThread(d1.callback, event) + + d2 = defer.Deferred() + + def cbk2(event, _): + return d2.callback(event) + + self._client.register(event, cbk1) + self._client.register(event, cbk2) + self._client.emit(event, None) + d = defer.gatherResults([d1, d2]) + return d + + def test_client_receives_signal(self): + """ + Ensure clients can receive signals. + """ + event = catalog.CLIENT_UID + d = defer.Deferred() + + def cbk(events, _): + callFromThread(d.callback, event) + + self._client.register(event, cbk) + self._client.emit(event, None) + return d + + def test_client_unregister_all(self): + """ + Test that the client can unregister all events for one signal. + """ + event1 = catalog.CLIENT_UID + d = defer.Deferred() + # register more than one callback for the same event + self._client.register( + event1, lambda ev, _: callFromThread(d.errback, None)) + self._client.register( + event1, lambda ev, _: callFromThread(d.errback, None)) + # unregister and emit the event + self._client.unregister(event1) + self._client.emit(event1, None) + # register and emit another event so the deferred can succeed + event2 = catalog.CLIENT_SESSION_ID + self._client.register( + event2, lambda ev, _: callFromThread(d.callback, None)) + self._client.emit(event2, None) + return d + + def test_client_unregister_by_uid(self): + """ + Test that the client can unregister an event by uid. + """ + event = catalog.CLIENT_UID + d = defer.Deferred() + # register one callback that would fail + uid = self._client.register( + event, lambda ev, _: callFromThread(d.errback, None)) + # register one callback that will succeed + self._client.register( + event, lambda ev, _: callFromThread(d.callback, None)) + # unregister by uid and emit the event + self._client.unregister(event, uid=uid) + self._client.emit(event, None) + return d + + +class EventsTxClientTestCase(EventsGenericClientTestCase, unittest.TestCase): + + _client = txclient + + +class EventsClientTestCase(EventsGenericClientTestCase, unittest.TestCase): + + _client = client diff --git a/tests/events/test_zmq_components.py b/tests/events/test_zmq_components.py new file mode 100644 index 0000000..c51e37e --- /dev/null +++ b/tests/events/test_zmq_components.py @@ -0,0 +1,51 @@ +# -*- coding: utf-8 -*- +# test_zmq_components.py +# Copyright (C) 2014 LEAP +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see . +""" +Tests for the zmq_components module. +""" +try: + import unittest2 as unittest +except ImportError: + import unittest + +from leap.common.events import zmq_components + + +class AddrParseTestCase(unittest.TestCase): + + def setUp(self): + pass + + def tearDown(self): + pass + + def test_addr_parsing(self): + addr_re = zmq_components.ADDRESS_RE + + self.assertEqual( + addr_re.search("ipc:///tmp/foo/bar/baaz-2/foo.0").groups(), + ("ipc", "/tmp/foo/bar/baaz-2/foo.0", None)) + self.assertEqual( + addr_re.search("tcp://localhost:9000").groups(), + ("tcp", "localhost", "9000")) + self.assertEqual( + addr_re.search("tcp://127.0.0.1:9000").groups(), + ("tcp", "127.0.0.1", "9000")) + + +if __name__ == "__main__": + unittest.main() diff --git a/tests/test_certs.py b/tests/test_certs.py new file mode 100644 index 0000000..b06fbf8 --- /dev/null +++ b/tests/test_certs.py @@ -0,0 +1,97 @@ +# -*- coding: utf-8 -*- +# test_certs.py +# Copyright (C) 2013 LEAP +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see . +""" +Tests for: + * leap/common/certs.py +""" +import time + +try: + import unittest2 as unittest +except ImportError: + import unittest + +from leap.common import certs +from leap.common.testing.basetest import BaseLeapTest +from leap.common.testing.https_server import where + +TEST_CERT_PEM = where("leaptest_combined_keycert.pem") + +# Values from the test cert file: +# Not Before: Sep 3 17:52:16 2013 GMT +# Not After : Sep 1 17:52:16 2023 GMT +CERT_NOT_BEFORE = (2013, 9, 3, 17, 52, 16, 1, 246, 0) +CERT_NOT_AFTER = (2023, 9, 1, 17, 52, 16, 4, 244, 0) + + +class CertsTest(BaseLeapTest): + + def setUp(self): + pass + + def tearDown(self): + pass + + def test_should_redownload_if_no_cert(self): + self.assertTrue(certs.should_redownload(certfile="")) + + def test_should_redownload_if_invalid_pem(self): + cert_path = self.get_tempfile('test_pem_file.pem') + + with open(cert_path, 'w') as f: + f.write('this is some invalid data for the pem file') + + self.assertTrue(certs.should_redownload(cert_path)) + + def test_should_redownload_if_before(self): + def new_now(): + time.struct_time(CERT_NOT_BEFORE) + self.assertTrue(certs.should_redownload(TEST_CERT_PEM, now=new_now)) + + def test_should_redownload_if_after(self): + def new_now(): + time.struct_time(CERT_NOT_AFTER) + self.assertTrue(certs.should_redownload(TEST_CERT_PEM, now=new_now)) + + def test_not_should_redownload(self): + self.assertFalse(certs.should_redownload(TEST_CERT_PEM)) + + def test_is_valid_pemfile(self): + with open(TEST_CERT_PEM) as f: + cert_data = f.read() + + self.assertTrue(certs.is_valid_pemfile(cert_data)) + + def test_not_is_valid_pemfile(self): + cert_data = 'this is some invalid data for the pem file' + + self.assertFalse(certs.is_valid_pemfile(cert_data)) + + def test_get_cert_time_boundaries(self): + """ + This test ensures us that the returned values are returned in UTC/GMT. + """ + with open(TEST_CERT_PEM) as f: + cert_data = f.read() + + valid_from, valid_to = certs.get_cert_time_boundaries(cert_data) + self.assertEqual(tuple(valid_from), CERT_NOT_BEFORE) + self.assertEqual(tuple(valid_to), CERT_NOT_AFTER) + + +if __name__ == "__main__": + unittest.main() diff --git a/tests/test_check.py b/tests/test_check.py new file mode 100644 index 0000000..cd488ff --- /dev/null +++ b/tests/test_check.py @@ -0,0 +1,53 @@ +# -*- coding: utf-8 -*- +# test_check.py +# Copyright (C) 2013 LEAP +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see . +""" +Tests for: + * leap/common/check.py +""" +try: + import unittest2 as unittest +except ImportError: + import unittest + +import mock + +from leap.common import check + + +class CheckTests(unittest.TestCase): + def test_raises_on_false_condition(self): + with self.assertRaises(AssertionError): + check.leap_assert(False, "Condition") + + def test_raises_on_none_condition(self): + with self.assertRaises(AssertionError): + check.leap_assert(None, "Condition") + + def test_suceeds_with_good_condition(self): + check.leap_assert(True, "") + + def test_raises_on_bad_type(self): + with self.assertRaises(AssertionError): + check.leap_assert_type(42, str) + + def test_succeeds_on_good_type(self): + check.leap_assert_type(42, int) + + @mock.patch("traceback.extract_stack", mock.MagicMock(return_value=None)) + def test_does_not_raise_on_bug(self): + with self.assertRaises(AssertionError): + check.leap_assert(False, "") diff --git a/tests/test_http.py b/tests/test_http.py new file mode 100644 index 0000000..d5526e6 --- /dev/null +++ b/tests/test_http.py @@ -0,0 +1,73 @@ +# -*- coding: utf-8 -*- +# test_http.py +# Copyright (C) 2013 LEAP +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see . +""" +Tests for: + * leap/common/http.py +""" +try: + import unittest2 as unittest +except ImportError: + import unittest + +from leap.common import http +from leap.common.testing.basetest import BaseLeapTest +from leap.common.testing.https_server import where + +TEST_CERT_PEM = where("leaptest_combined_keycert.pem") + + +class HTTPClientTest(BaseLeapTest): + + def setUp(self): + pass + + def tearDown(self): + pass + + def test_agents_sharing_pool_by_default(self): + client = http.HTTPClient() + client2 = http.HTTPClient(TEST_CERT_PEM) + self.assertNotEquals( + client._agent, client2._agent, "Expected dedicated agents") + self.assertEquals( + client._agent._pool, client2._agent._pool, + "Pool was not reused by default") + + def test_agent_can_have_dedicated_custom_pool(self): + custom_pool = http._HTTPConnectionPool( + None, + timeout=10, + maxPersistentPerHost=42, + persistent=False + ) + self.assertEquals(custom_pool.maxPersistentPerHost, 42, + "Custom persistent connections " + "limit is not being respected") + self.assertFalse(custom_pool.persistent, + "Custom persistence is not being respected") + default_client = http.HTTPClient() + custom_client = http.HTTPClient(pool=custom_pool) + + self.assertNotEquals( + default_client._agent, custom_client._agent, + "No agent reuse is expected") + self.assertEquals( + custom_pool, custom_client._agent._pool, + "Custom pool usage was not respected") + +if __name__ == "__main__": + unittest.main() diff --git a/tests/test_memoize.py b/tests/test_memoize.py new file mode 100644 index 0000000..c923fc5 --- /dev/null +++ b/tests/test_memoize.py @@ -0,0 +1,76 @@ +# -*- coding: utf-8 -*- +# test_check.py +# Copyright (C) 2013 LEAP +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see . +""" +Tests for: + * leap/common/decorators._memoized +""" +try: + import unittest2 as unittest +except ImportError: + import unittest + +from time import sleep + +import mock + +from leap.common.decorators import _memoized + + +class MemoizeTests(unittest.TestCase): + + def test_memoized_call(self): + """ + Test that a memoized function only calls once. + """ + witness = mock.Mock() + + @_memoized + def callmebaby(): + return witness() + + for i in range(10): + callmebaby() + witness.assert_called_once_with() + + def test_cache_invalidation(self): + """ + Test that time makes the cache invalidation expire. + """ + witness = mock.Mock() + + cache_with_alzheimer = _memoized + cache_with_alzheimer.CACHE_INVALIDATION_DELTA = 1 + + @cache_with_alzheimer + def callmebaby(*args): + return witness(*args) + + for i in range(10): + callmebaby() + witness.assert_called_once_with() + + sleep(2) + callmebaby("onemoretime") + + expected = [mock.call(), mock.call("onemoretime")] + self.assertEqual( + witness.call_args_list, + expected) + + +if __name__ == "__main__": + unittest.main() diff --git a/tests/testing/test_basetest.py b/tests/testing/test_basetest.py new file mode 100644 index 0000000..8be7aba --- /dev/null +++ b/tests/testing/test_basetest.py @@ -0,0 +1,138 @@ +# -*- coding: utf-8 -*- +# test_basetest.py +# Copyright (C) 2013 LEAP +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see . +""" +Unittests for BaseLeapTest ...becase it's oh so meta +""" +try: + import unittest2 as unittest +except ImportError: + import unittest + +import os +import StringIO + +from leap.common.testing.basetest import BaseLeapTest + +_tempdir = None # global for tempdir checking + + +class _TestCaseRunner(object): + """ + TestCaseRunner used to run BaseLeapTest + """ + def run_testcase(self, testcase=None): + """ + Runs a given TestCase + + :param testcase: the testcase + :type testcase: unittest.TestCase + """ + if not testcase: + return None + loader = unittest.TestLoader() + suite = loader.loadTestsFromTestCase(testcase) + + # Create runner, and run testcase + io = StringIO.StringIO() + runner = unittest.TextTestRunner(stream=io) + results = runner.run(suite) + return results + + +class TestAbstractBaseLeapTest(unittest.TestCase, _TestCaseRunner): + """ + TestCase for BaseLeapTest abs + """ + def test_abstract_base_class(self): + """ + Test errors raised when setup/teardown not overloaded + """ + class _BaseTest(BaseLeapTest): + def test_dummy_method(self): + pass + + def test_tautology(self): + assert True + + results = self.run_testcase(_BaseTest) + + # should be 2 errors: NotImplemented + # raised for setUp/tearDown + self.assertEquals(results.testsRun, 2) + self.assertEquals(len(results.failures), 0) + self.assertEquals(len(results.errors), 2) + + +class TestInitBaseLeapTest(BaseLeapTest): + """ + TestCase for testing initialization of BaseLeapTest + """ + + def setUp(self): + pass + + def tearDown(self): + pass + + def test_path_is_changed(self): + """tests whether we have changed the PATH env var""" + os_path = os.environ['PATH'] + self.assertTrue(os_path.startswith(self.tempdir)) + + def test_old_path_is_saved(self): + """tests whether we restore the PATH env var""" + self.assertTrue(len(self.old_path) > 1) + + +class TestCleanedBaseLeapTest(unittest.TestCase, _TestCaseRunner): + """ + TestCase for testing tempdir creation and cleanup + """ + + def test_tempdir_is_cleaned_after_tests(self): + """ + test if a TestCase derived from BaseLeapTest creates and cleans the + temporal dir + """ + class _BaseTest(BaseLeapTest): + def setUp(self): + """set global _tempdir to this instance tempdir""" + global _tempdir + _tempdir = self.tempdir + + def tearDown(self): + """nothing""" + pass + + def test_tempdir_created(self): + """test if tempdir was created""" + self.assertTrue(os.path.isdir(self.tempdir)) + + def test_tempdir_created_on_setupclass(self): + """test if tempdir is the one created by setupclass""" + self.assertEqual(_tempdir, self.tempdir) + + results = self.run_testcase(_BaseTest) + self.assertEquals(results.testsRun, 2) + self.assertEquals(len(results.failures), 0) + self.assertEquals(len(results.errors), 0) + + # did we cleaned the tempdir? + self.assertFalse(os.path.isdir(_tempdir)) + +if __name__ == "__main__": + unittest.main() diff --git a/tox.ini b/tox.ini new file mode 100644 index 0000000..5ca1ca4 --- /dev/null +++ b/tox.ini @@ -0,0 +1,17 @@ +# Tox (http://tox.testrun.org/) is a tool for running tests +# in multiple virtualenvs. This configuration file will run the +# test suite on all supported python versions. To use it, "pip install tox" +# and then run "tox" from this directory. + +[tox] +envlist = py27 + +[testenv] +commands = py.test {posargs} +deps = + pytest + mock + setuptools-trial + pep8 +setenv = + HOME=/tmp -- cgit v1.2.3