diff options
| author | Kali Kaneko <kali@leap.se> | 2017-07-14 14:56:09 +0200 | 
|---|---|---|
| committer | Kali Kaneko <kali@leap.se> | 2017-07-14 14:56:09 +0200 | 
| commit | 581030aa3170bf0e1dfe61678348f97f42d4fe56 (patch) | |
| tree | 588a54194f4b6432705d2aafa01cf20e09e72dc7 /tests/unit | |
| parent | aceff1bf17afe60378ff5ad7335573da7ba9ddde (diff) | |
[tests] move all tests to unit tests
Diffstat (limited to 'tests/unit')
| -rw-r--r-- | tests/unit/config/test_baseconfig.py | 271 | ||||
| -rw-r--r-- | tests/unit/config/test_get_path_prefix.py | 75 | ||||
| -rw-r--r-- | tests/unit/events/test_auth.py | 61 | ||||
| -rw-r--r-- | tests/unit/events/test_events.py | 203 | ||||
| -rw-r--r-- | tests/unit/events/test_zmq_components.py | 51 | ||||
| -rw-r--r-- | tests/unit/test_certs.py | 97 | ||||
| -rw-r--r-- | tests/unit/test_check.py | 53 | ||||
| -rw-r--r-- | tests/unit/test_http.py | 73 | ||||
| -rw-r--r-- | tests/unit/test_memoize.py | 76 | 
9 files changed, 960 insertions, 0 deletions
| diff --git a/tests/unit/config/test_baseconfig.py b/tests/unit/config/test_baseconfig.py new file mode 100644 index 0000000..22358ec --- /dev/null +++ b/tests/unit/config/test_baseconfig.py @@ -0,0 +1,271 @@ +# -*- coding: utf-8 -*- +# test_baseconfig.py +# Copyright (C) 2013 LEAP +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program.  If not, see <http://www.gnu.org/licenses/>. +""" +Tests for baseconfig +""" +import json +import unittest +import copy + +from leap.common.config.baseconfig import BaseConfig, LocalizedKey +from leap.common.testing.basetest import BaseLeapTest + +from mock import Mock + +# reduced eipconfig sample config +sample_config = { +    "gateways": [ +        { +            "capabilities": { +                "adblock": False, +                "transport": ["openvpn"], +                "user_ips": False +            }, +            "host": "host.dev.example.org", +        }, { +            "capabilities": { +                "adblock": False, +                "transport": ["openvpn"], +                "user_ips": False +            }, +            "host": "host2.dev.example.org", +        } +    ], +    "default_language": "en", +    "languages": [ +        "en", +        "es" +    ], +    "name": { +        "en": "Baseconfig testing environment", +        "es": "Entorno de pruebas de Baseconfig" +    }, +    "serial": 1, +    "version": 1 +} + +# reduced eipconfig.spec version +sample_spec = { +    'description': 'sample eip service config', +    'type': 'object', +    'properties': { +        'serial': { +            'type': int, +            'default': 1, +            'required': ["True"] +        }, +        'version': { +            'type': int, +            'default': 1, +            'required': ["True"] +        }, +        "default_language": { +            'type': unicode, +            'default': 'en' +        }, +        'languages': { +            'type': list, +            'default': ['en'] +        }, +        'name': { +            'type': dict, +            'format': 'translatable', +            'default': {u'en': u'Test Provider'} +        }, +        'gateways': { +            'type': list, +            'default': [ +                {"capabilities": { +                    "adblock": True, +                    "transport": ["openvpn"], +                    "user_ips": False}, +                 "host": "location.example.org", +                 }] +        }, +    } +} + + +class Config(BaseConfig): +    """ +    BaseConfig implementation for testing purposes only. +    """ +    def get_gateways(self): +        return self._safe_get_value("gateways") + +    def get_serial(self): +        return self._safe_get_value("serial") + +    def get_version(self): +        return self._safe_get_value("version") + +    def _get_schema(self): +        return sample_spec + +    def _get_spec(self): +        return self._get_schema() + +    def get_default_language(self): +        return self._safe_get_value("default_language") + +    @LocalizedKey +    def get_name(self): +        return self._safe_get_value("name") + + +class BaseConfigTest(BaseLeapTest): + +    def setUp(self): +        pass + +    def tearDown(self): +        pass + +    def _write_config(self, data): +        """ +        Helper to write some data to a temp config file. + +        :param data: data to be used to save in the config file. +        :data type: dict (valid json) +        """ +        self.config_file = self.get_tempfile("config.json") +        conf = open(self.config_file, "w") +        conf.write(json.dumps(data)) +        conf.close() + +    def _get_config(self, fromfile=False, data=sample_config): +        """ +        Helper that returns a Config object using the data parameter +        or a sample data. + +        :param fromfile: sets if we should use a file or a string +        :fromfile type: bool +        :param data: sets the data to be used to load in the Config object +        :data type: dict (valid json) +        :rtype: Config +        """ +        config = Config() + +        loaded = False +        if fromfile: +            self._write_config(data) +            loaded = config.load(self.config_file, relative=False) +        else: +            json_string = json.dumps(data) +            loaded = config.load(data=json_string) + +        if not loaded: +            return None + +        return config + +    def test_loads_from_file(self): +        config = self._get_config(fromfile=True) +        self.assertIsNotNone(config) + +    def test_loads_from_data(self): +        config = self._get_config() +        self.assertIsNotNone(config) + +    def test_load_valid_config_from_file(self): +        config = self._get_config(fromfile=True) +        self.assertIsNotNone(config) + +        self.assertEqual(config.get_version(), sample_config["version"]) +        self.assertEqual(config.get_serial(), sample_config["serial"]) +        self.assertEqual(config.get_gateways(), sample_config["gateways"]) + +    def test_load_valid_config_from_data(self): +        config = self._get_config() +        self.assertIsNotNone(config) + +        self.assertEqual(config.get_version(), sample_config["version"]) +        self.assertEqual(config.get_serial(), sample_config["serial"]) +        self.assertEqual(config.get_gateways(), sample_config["gateways"]) + +    def test_safe_get_value_no_config(self): +        config = Config() + +        with self.assertRaises(AssertionError): +            config.get_version() + +    def test_safe_get_value_non_existent_value(self): +        config = self._get_config() + +        self.assertIsNone(config._safe_get_value('non-existent-value')) + +    def test_loaded(self): +        config = self._get_config() +        self.assertTrue(config.loaded()) + +    def test_not_loaded(self): +        config = Config() +        self.assertFalse(config.loaded()) + +    def test_save_and_load(self): +        config = self._get_config() +        config.get_path_prefix = Mock(return_value=self.tempdir) +        config_file = 'test_config.json' +        self.assertTrue(config.save([config_file])) + +        config_saved = Config() +        config_file_path = self.get_tempfile(config_file) +        self.assertTrue(config_saved.load(config_file_path, relative=False)) + +        self.assertEqual(config.get_version(), config_saved.get_version()) +        self.assertEqual(config.get_serial(), config_saved.get_serial()) +        self.assertEqual(config.get_gateways(), config_saved.get_gateways()) + +    def test_localizations(self): +        conf = self._get_config() + +        self.assertEqual(conf.get_name(lang='en'), sample_config['name']['en']) +        self.assertEqual(conf.get_name(lang='es'), sample_config['name']['es']) + +    def _localized_config(self, lang): +        """ +        Helper to change default language of the provider config. +        """ +        conf = copy.deepcopy(sample_config) +        conf['default_language'] = lang +        json_string = json.dumps(conf) +        config = Config() +        config.load(data=json_string) + +        return config + +    def test_default_localization1(self): +        default_language = sample_config['languages'][0] +        config = self._localized_config(default_language) + +        default_name = sample_config['name'][default_language] + +        self.assertEqual(config.get_name(lang='xx'), default_name) +        self.assertEqual(config.get_name(), default_name) + +    def test_default_localization2(self): +        default_language = sample_config['languages'][1] +        config = self._localized_config(default_language) + +        default_name = sample_config['name'][default_language] + +        self.assertEqual(config.get_name(lang='xx'), default_name) +        self.assertEqual(config.get_name(), default_name) + + +if __name__ == "__main__": +    unittest.main(verbosity=2) diff --git a/tests/unit/config/test_get_path_prefix.py b/tests/unit/config/test_get_path_prefix.py new file mode 100644 index 0000000..878e2fe --- /dev/null +++ b/tests/unit/config/test_get_path_prefix.py @@ -0,0 +1,75 @@ +# -*- coding: utf-8 -*- +# test_get_path_prefix.py +# Copyright (C) 2013 LEAP +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program.  If not, see <http://www.gnu.org/licenses/>. +""" +Tests for get_path_prefix +""" +import os +import mock +import pytest +import sys + +from leap.common.config import get_path_prefix +from leap.common.testing.basetest import BaseLeapTest + + +class GetPathPrefixTest(BaseLeapTest): +    """ +    Tests for the get_path_prefix helper. + +    Note: we only are testing that the path is correctly returned and that if +    we are not in a bundle (standalone=False) then the paths are different. + +    xdg calculates the correct path using different methods and dlls +    (in case of Windows) so we don't implement tests to check if the paths +    are the correct ones. +    """ +    def setUp(self): +        pass + +    def tearDown(self): +        pass + +    def test_standalone_path(self): +        expected_path = os.path.join('expected', 'path', 'config') +        fake_cwd = os.path.join('expected', 'path') +        with mock.patch('os.getcwd', lambda: fake_cwd): +            path = get_path_prefix(standalone=True) +        self.assertEquals(path, expected_path) + +    def test_path_prefix(self): +        standalone_path = get_path_prefix(standalone=True) +        path = get_path_prefix(standalone=False) +        self.assertNotEquals(path, standalone_path) + +homedir = os.environ.get('HOME') + + +@pytest.mark.parametrize( +    "scenario", [ +        # (platform, path_parts, standalone), +        ('linux', [homedir, '.config'], False), +        ('darwin', [homedir, 'Library/Preferences'], False), +        ('win32', [homedir, 'xyz'], False), +        ('standalone', [os.getcwd(), 'config'], True)]) +def test_get_path_prefix(scenario, monkeypatch): +    platform, path_parts, standalone = scenario +    if platform == 'win32': +        pytest.skip()  # TODO: find a way to add test for win32 platform +    # set a custom temporary platform +    monkeypatch.setattr(sys, 'platform', platform) +    expected_prefix = os.path.join(*path_parts) +    assert expected_prefix == get_path_prefix(standalone) diff --git a/tests/unit/events/test_auth.py b/tests/unit/events/test_auth.py new file mode 100644 index 0000000..5442ebd --- /dev/null +++ b/tests/unit/events/test_auth.py @@ -0,0 +1,61 @@ +# -*- coding: utf-8 -*- +# test_zmq_components.py +# Copyright (C) 2014 LEAP +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program.  If not, see <http://www.gnu.org/licenses/>. +""" +Tests for the auth module. +""" +import os + +from twisted.trial import unittest +from txzmq import ZmqFactory + +from leap.common.events import auth +from leap.common.testing.basetest import BaseLeapTest +from leap.common.zmq_utils import PUBLIC_KEYS_PREFIX +from leap.common.zmq_utils import maybe_create_and_get_certificates + +from txzmq.test import _wait + + +class ZmqAuthTestCase(unittest.TestCase, BaseLeapTest): + +    def setUp(self): +        self.factory = ZmqFactory() +        self._config_prefix = os.path.join(self.tempdir, "leap", "events") + +        self.public, self.secret = maybe_create_and_get_certificates( +            self._config_prefix, 'server') + +        self.authenticator = auth.TxAuthenticator(self.factory) +        self.authenticator.start() +        self.auth_req = auth.TxAuthenticationRequest(self.factory) + +    def tearDown(self): +        self.factory.shutdown() + +    def test_curve_auth(self): +        self.auth_req.start() +        self.auth_req.allow('127.0.0.1') +        public_keys_dir = os.path.join(self._config_prefix, PUBLIC_KEYS_PREFIX) +        self.auth_req.configure_curve(domain="*", location=public_keys_dir) + +        def check(ignored): +            authenticator = self.authenticator.authenticator +            certs = authenticator.certs['*'] +            self.failUnlessEqual(authenticator.whitelist, set([u'127.0.0.1'])) +            self.failUnlessEqual(certs[certs.keys()[0]], True) + +        return _wait(0.1).addCallback(check) diff --git a/tests/unit/events/test_events.py b/tests/unit/events/test_events.py new file mode 100644 index 0000000..d8435c6 --- /dev/null +++ b/tests/unit/events/test_events.py @@ -0,0 +1,203 @@ +# -*- coding: utf-8 -*- +# test_events.py +# Copyright (C) 2013 LEAP +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see <http://www.gnu.org/licenses/>. +""" +Tests for the events framework +""" +import os +import logging + +from twisted.internet.reactor import callFromThread +from twisted.trial import unittest +from twisted.internet import defer + +from txzmq import ZmqFactory + +from leap.common.events import server +from leap.common.events import client +from leap.common.events import flags +from leap.common.events import txclient +from leap.common.events import catalog +from leap.common.events.errors import CallbackAlreadyRegisteredError + + +if 'DEBUG' in os.environ: +    logging.basicConfig(level=logging.DEBUG) + + +class EventsGenericClientTestCase(object): + +    def setUp(self): +        flags.set_events_enabled(True) +        self.factory = ZmqFactory() +        self._server = server.ensure_server( +            emit_addr="tcp://127.0.0.1:0", +            reg_addr="tcp://127.0.0.1:0", +            factory=self.factory, +            enable_curve=False) + +        self._client.configure_client( +            emit_addr="tcp://127.0.0.1:%d" % self._server.pull_port, +            reg_addr="tcp://127.0.0.1:%d" % self._server.pub_port, +            factory=self.factory, enable_curve=False) + +    def tearDown(self): +        flags.set_events_enabled(False) +        self.factory.shutdown() +        self._client.instance().reset() + +    def test_client_register(self): +        """ +        Ensure clients can register callbacks. +        """ +        callbacks = self._client.instance().callbacks +        self.assertTrue(len(callbacks) == 0, +                        'There should be no callback for this event.') +        # register one event +        event1 = catalog.CLIENT_UID + +        def cbk1(event, _): +            return True + +        uid1 = self._client.register(event1, cbk1) +        # assert for correct registration +        self.assertTrue(len(callbacks) == 1) +        self.assertTrue(callbacks[event1][uid1] == cbk1, +                        'Could not register event in local client.') +        # register another event +        event2 = catalog.CLIENT_SESSION_ID + +        def cbk2(event, _): +            return True + +        uid2 = self._client.register(event2, cbk2) +        # assert for correct registration +        self.assertTrue(len(callbacks) == 2) +        self.assertTrue(callbacks[event2][uid2] == cbk2, +                        'Could not register event in local client.') + +    def test_register_signal_replace(self): +        """ +        Make sure clients can replace already registered callbacks. +        """ +        event = catalog.CLIENT_UID +        d = defer.Deferred() + +        def cbk_fail(event, _): +            return callFromThread(d.errback, event) + +        def cbk_succeed(event, _): +            return callFromThread(d.callback, event) + +        self._client.register(event, cbk_fail, uid=1) +        self._client.register(event, cbk_succeed, uid=1, replace=True) +        self._client.emit(event, None) +        return d + +    def test_register_signal_replace_fails_when_replace_is_false(self): +        """ +        Make sure clients trying to replace already registered callbacks fail +        when replace=False +        """ +        event = catalog.CLIENT_UID +        self._client.register(event, lambda event, _: None, uid=1) +        self.assertRaises( +            CallbackAlreadyRegisteredError, +            self._client.register, +            event, lambda event, _: None, uid=1, replace=False) + +    def test_register_more_than_one_callback_works(self): +        """ +        Make sure clients can replace already registered callbacks. +        """ +        event = catalog.CLIENT_UID +        d1 = defer.Deferred() + +        def cbk1(event, _): +            return callFromThread(d1.callback, event) + +        d2 = defer.Deferred() + +        def cbk2(event, _): +            return d2.callback(event) + +        self._client.register(event, cbk1) +        self._client.register(event, cbk2) +        self._client.emit(event, None) +        d = defer.gatherResults([d1, d2]) +        return d + +    def test_client_receives_signal(self): +        """ +        Ensure clients can receive signals. +        """ +        event = catalog.CLIENT_UID +        d = defer.Deferred() + +        def cbk(events, _): +            callFromThread(d.callback, event) + +        self._client.register(event, cbk) +        self._client.emit(event, None) +        return d + +    def test_client_unregister_all(self): +        """ +        Test that the client can unregister all events for one signal. +        """ +        event1 = catalog.CLIENT_UID +        d = defer.Deferred() +        # register more than one callback for the same event +        self._client.register( +            event1, lambda ev, _: callFromThread(d.errback, None)) +        self._client.register( +            event1, lambda ev, _: callFromThread(d.errback, None)) +        # unregister and emit the event +        self._client.unregister(event1) +        self._client.emit(event1, None) +        # register and emit another event so the deferred can succeed +        event2 = catalog.CLIENT_SESSION_ID +        self._client.register( +            event2, lambda ev, _: callFromThread(d.callback, None)) +        self._client.emit(event2, None) +        return d + +    def test_client_unregister_by_uid(self): +        """ +        Test that the client can unregister an event by uid. +        """ +        event = catalog.CLIENT_UID +        d = defer.Deferred() +        # register one callback that would fail +        uid = self._client.register( +            event, lambda ev, _: callFromThread(d.errback, None)) +        # register one callback that will succeed +        self._client.register( +            event, lambda ev, _: callFromThread(d.callback, None)) +        # unregister by uid and emit the event +        self._client.unregister(event, uid=uid) +        self._client.emit(event, None) +        return d + + +class EventsTxClientTestCase(EventsGenericClientTestCase, unittest.TestCase): + +    _client = txclient + + +class EventsClientTestCase(EventsGenericClientTestCase, unittest.TestCase): + +    _client = client diff --git a/tests/unit/events/test_zmq_components.py b/tests/unit/events/test_zmq_components.py new file mode 100644 index 0000000..c51e37e --- /dev/null +++ b/tests/unit/events/test_zmq_components.py @@ -0,0 +1,51 @@ +# -*- coding: utf-8 -*- +# test_zmq_components.py +# Copyright (C) 2014 LEAP +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program.  If not, see <http://www.gnu.org/licenses/>. +""" +Tests for the zmq_components module. +""" +try: +    import unittest2 as unittest +except ImportError: +    import unittest + +from leap.common.events import zmq_components + + +class AddrParseTestCase(unittest.TestCase): + +    def setUp(self): +        pass + +    def tearDown(self): +        pass + +    def test_addr_parsing(self): +        addr_re = zmq_components.ADDRESS_RE + +        self.assertEqual( +            addr_re.search("ipc:///tmp/foo/bar/baaz-2/foo.0").groups(), +            ("ipc", "/tmp/foo/bar/baaz-2/foo.0", None)) +        self.assertEqual( +            addr_re.search("tcp://localhost:9000").groups(), +            ("tcp", "localhost", "9000")) +        self.assertEqual( +            addr_re.search("tcp://127.0.0.1:9000").groups(), +            ("tcp", "127.0.0.1", "9000")) + + +if __name__ == "__main__": +    unittest.main() diff --git a/tests/unit/test_certs.py b/tests/unit/test_certs.py new file mode 100644 index 0000000..b06fbf8 --- /dev/null +++ b/tests/unit/test_certs.py @@ -0,0 +1,97 @@ +# -*- coding: utf-8 -*- +# test_certs.py +# Copyright (C) 2013 LEAP +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program.  If not, see <http://www.gnu.org/licenses/>. +""" +Tests for: +    * leap/common/certs.py +""" +import time + +try: +    import unittest2 as unittest +except ImportError: +    import unittest + +from leap.common import certs +from leap.common.testing.basetest import BaseLeapTest +from leap.common.testing.https_server import where + +TEST_CERT_PEM = where("leaptest_combined_keycert.pem") + +# Values from the test cert file: +# Not Before: Sep  3 17:52:16 2013 GMT +# Not After : Sep  1 17:52:16 2023 GMT +CERT_NOT_BEFORE = (2013, 9, 3, 17, 52, 16, 1, 246, 0) +CERT_NOT_AFTER = (2023, 9, 1, 17, 52, 16, 4, 244, 0) + + +class CertsTest(BaseLeapTest): + +    def setUp(self): +        pass + +    def tearDown(self): +        pass + +    def test_should_redownload_if_no_cert(self): +        self.assertTrue(certs.should_redownload(certfile="")) + +    def test_should_redownload_if_invalid_pem(self): +        cert_path = self.get_tempfile('test_pem_file.pem') + +        with open(cert_path, 'w') as f: +            f.write('this is some invalid data for the pem file') + +        self.assertTrue(certs.should_redownload(cert_path)) + +    def test_should_redownload_if_before(self): +        def new_now(): +            time.struct_time(CERT_NOT_BEFORE) +        self.assertTrue(certs.should_redownload(TEST_CERT_PEM, now=new_now)) + +    def test_should_redownload_if_after(self): +        def new_now(): +            time.struct_time(CERT_NOT_AFTER) +        self.assertTrue(certs.should_redownload(TEST_CERT_PEM, now=new_now)) + +    def test_not_should_redownload(self): +        self.assertFalse(certs.should_redownload(TEST_CERT_PEM)) + +    def test_is_valid_pemfile(self): +        with open(TEST_CERT_PEM) as f: +            cert_data = f.read() + +        self.assertTrue(certs.is_valid_pemfile(cert_data)) + +    def test_not_is_valid_pemfile(self): +        cert_data = 'this is some invalid data for the pem file' + +        self.assertFalse(certs.is_valid_pemfile(cert_data)) + +    def test_get_cert_time_boundaries(self): +        """ +        This test ensures us that the returned values are returned in UTC/GMT. +        """ +        with open(TEST_CERT_PEM) as f: +            cert_data = f.read() + +        valid_from, valid_to = certs.get_cert_time_boundaries(cert_data) +        self.assertEqual(tuple(valid_from), CERT_NOT_BEFORE) +        self.assertEqual(tuple(valid_to), CERT_NOT_AFTER) + + +if __name__ == "__main__": +    unittest.main() diff --git a/tests/unit/test_check.py b/tests/unit/test_check.py new file mode 100644 index 0000000..cd488ff --- /dev/null +++ b/tests/unit/test_check.py @@ -0,0 +1,53 @@ +# -*- coding: utf-8 -*- +# test_check.py +# Copyright (C) 2013 LEAP +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program.  If not, see <http://www.gnu.org/licenses/>. +""" +Tests for: +    * leap/common/check.py +""" +try: +    import unittest2 as unittest +except ImportError: +    import unittest + +import mock + +from leap.common import check + + +class CheckTests(unittest.TestCase): +    def test_raises_on_false_condition(self): +        with self.assertRaises(AssertionError): +            check.leap_assert(False, "Condition") + +    def test_raises_on_none_condition(self): +        with self.assertRaises(AssertionError): +            check.leap_assert(None, "Condition") + +    def test_suceeds_with_good_condition(self): +        check.leap_assert(True, "") + +    def test_raises_on_bad_type(self): +        with self.assertRaises(AssertionError): +            check.leap_assert_type(42, str) + +    def test_succeeds_on_good_type(self): +        check.leap_assert_type(42, int) + +    @mock.patch("traceback.extract_stack", mock.MagicMock(return_value=None)) +    def test_does_not_raise_on_bug(self): +        with self.assertRaises(AssertionError): +            check.leap_assert(False, "") diff --git a/tests/unit/test_http.py b/tests/unit/test_http.py new file mode 100644 index 0000000..d5526e6 --- /dev/null +++ b/tests/unit/test_http.py @@ -0,0 +1,73 @@ +# -*- coding: utf-8 -*- +# test_http.py +# Copyright (C) 2013 LEAP +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program.  If not, see <http://www.gnu.org/licenses/>. +""" +Tests for: +    * leap/common/http.py +""" +try: +    import unittest2 as unittest +except ImportError: +    import unittest + +from leap.common import http +from leap.common.testing.basetest import BaseLeapTest +from leap.common.testing.https_server import where + +TEST_CERT_PEM = where("leaptest_combined_keycert.pem") + + +class HTTPClientTest(BaseLeapTest): + +    def setUp(self): +        pass + +    def tearDown(self): +        pass + +    def test_agents_sharing_pool_by_default(self): +        client = http.HTTPClient() +        client2 = http.HTTPClient(TEST_CERT_PEM) +        self.assertNotEquals( +            client._agent, client2._agent, "Expected dedicated agents") +        self.assertEquals( +            client._agent._pool, client2._agent._pool, +            "Pool was not reused by default") + +    def test_agent_can_have_dedicated_custom_pool(self): +        custom_pool = http._HTTPConnectionPool( +            None, +            timeout=10, +            maxPersistentPerHost=42, +            persistent=False +        ) +        self.assertEquals(custom_pool.maxPersistentPerHost, 42, +                          "Custom persistent connections " +                          "limit is not being respected") +        self.assertFalse(custom_pool.persistent, +                         "Custom persistence is not being respected") +        default_client = http.HTTPClient() +        custom_client = http.HTTPClient(pool=custom_pool) + +        self.assertNotEquals( +            default_client._agent, custom_client._agent, +            "No agent reuse is expected") +        self.assertEquals( +            custom_pool, custom_client._agent._pool, +            "Custom pool usage was not respected") + +if __name__ == "__main__": +    unittest.main() diff --git a/tests/unit/test_memoize.py b/tests/unit/test_memoize.py new file mode 100644 index 0000000..c923fc5 --- /dev/null +++ b/tests/unit/test_memoize.py @@ -0,0 +1,76 @@ +# -*- coding: utf-8 -*- +# test_check.py +# Copyright (C) 2013 LEAP +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program.  If not, see <http://www.gnu.org/licenses/>. +""" +Tests for: +    * leap/common/decorators._memoized +""" +try: +    import unittest2 as unittest +except ImportError: +    import unittest + +from time import sleep + +import mock + +from leap.common.decorators import _memoized + + +class MemoizeTests(unittest.TestCase): + +    def test_memoized_call(self): +        """ +        Test that a memoized function only calls once. +        """ +        witness = mock.Mock() + +        @_memoized +        def callmebaby(): +            return witness() + +        for i in range(10): +            callmebaby() +        witness.assert_called_once_with() + +    def test_cache_invalidation(self): +        """ +        Test that time makes the cache invalidation expire. +        """ +        witness = mock.Mock() + +        cache_with_alzheimer = _memoized +        cache_with_alzheimer.CACHE_INVALIDATION_DELTA = 1 + +        @cache_with_alzheimer +        def callmebaby(*args): +            return witness(*args) + +        for i in range(10): +            callmebaby() +        witness.assert_called_once_with() + +        sleep(2) +        callmebaby("onemoretime") + +        expected = [mock.call(), mock.call("onemoretime")] +        self.assertEqual( +            witness.call_args_list, +            expected) + + +if __name__ == "__main__": +    unittest.main() | 
