summaryrefslogtreecommitdiff
path: root/tests/unit
diff options
context:
space:
mode:
authorKali Kaneko <kali@leap.se>2017-07-14 14:56:09 +0200
committerKali Kaneko <kali@leap.se>2017-07-14 14:56:09 +0200
commit581030aa3170bf0e1dfe61678348f97f42d4fe56 (patch)
tree588a54194f4b6432705d2aafa01cf20e09e72dc7 /tests/unit
parentaceff1bf17afe60378ff5ad7335573da7ba9ddde (diff)
[tests] move all tests to unit tests
Diffstat (limited to 'tests/unit')
-rw-r--r--tests/unit/config/test_baseconfig.py271
-rw-r--r--tests/unit/config/test_get_path_prefix.py75
-rw-r--r--tests/unit/events/test_auth.py61
-rw-r--r--tests/unit/events/test_events.py203
-rw-r--r--tests/unit/events/test_zmq_components.py51
-rw-r--r--tests/unit/test_certs.py97
-rw-r--r--tests/unit/test_check.py53
-rw-r--r--tests/unit/test_http.py73
-rw-r--r--tests/unit/test_memoize.py76
9 files changed, 960 insertions, 0 deletions
diff --git a/tests/unit/config/test_baseconfig.py b/tests/unit/config/test_baseconfig.py
new file mode 100644
index 0000000..22358ec
--- /dev/null
+++ b/tests/unit/config/test_baseconfig.py
@@ -0,0 +1,271 @@
+# -*- coding: utf-8 -*-
+# test_baseconfig.py
+# Copyright (C) 2013 LEAP
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see <http://www.gnu.org/licenses/>.
+"""
+Tests for baseconfig
+"""
+import json
+import unittest
+import copy
+
+from leap.common.config.baseconfig import BaseConfig, LocalizedKey
+from leap.common.testing.basetest import BaseLeapTest
+
+from mock import Mock
+
+# reduced eipconfig sample config
+sample_config = {
+ "gateways": [
+ {
+ "capabilities": {
+ "adblock": False,
+ "transport": ["openvpn"],
+ "user_ips": False
+ },
+ "host": "host.dev.example.org",
+ }, {
+ "capabilities": {
+ "adblock": False,
+ "transport": ["openvpn"],
+ "user_ips": False
+ },
+ "host": "host2.dev.example.org",
+ }
+ ],
+ "default_language": "en",
+ "languages": [
+ "en",
+ "es"
+ ],
+ "name": {
+ "en": "Baseconfig testing environment",
+ "es": "Entorno de pruebas de Baseconfig"
+ },
+ "serial": 1,
+ "version": 1
+}
+
+# reduced eipconfig.spec version
+sample_spec = {
+ 'description': 'sample eip service config',
+ 'type': 'object',
+ 'properties': {
+ 'serial': {
+ 'type': int,
+ 'default': 1,
+ 'required': ["True"]
+ },
+ 'version': {
+ 'type': int,
+ 'default': 1,
+ 'required': ["True"]
+ },
+ "default_language": {
+ 'type': unicode,
+ 'default': 'en'
+ },
+ 'languages': {
+ 'type': list,
+ 'default': ['en']
+ },
+ 'name': {
+ 'type': dict,
+ 'format': 'translatable',
+ 'default': {u'en': u'Test Provider'}
+ },
+ 'gateways': {
+ 'type': list,
+ 'default': [
+ {"capabilities": {
+ "adblock": True,
+ "transport": ["openvpn"],
+ "user_ips": False},
+ "host": "location.example.org",
+ }]
+ },
+ }
+}
+
+
+class Config(BaseConfig):
+ """
+ BaseConfig implementation for testing purposes only.
+ """
+ def get_gateways(self):
+ return self._safe_get_value("gateways")
+
+ def get_serial(self):
+ return self._safe_get_value("serial")
+
+ def get_version(self):
+ return self._safe_get_value("version")
+
+ def _get_schema(self):
+ return sample_spec
+
+ def _get_spec(self):
+ return self._get_schema()
+
+ def get_default_language(self):
+ return self._safe_get_value("default_language")
+
+ @LocalizedKey
+ def get_name(self):
+ return self._safe_get_value("name")
+
+
+class BaseConfigTest(BaseLeapTest):
+
+ def setUp(self):
+ pass
+
+ def tearDown(self):
+ pass
+
+ def _write_config(self, data):
+ """
+ Helper to write some data to a temp config file.
+
+ :param data: data to be used to save in the config file.
+ :data type: dict (valid json)
+ """
+ self.config_file = self.get_tempfile("config.json")
+ conf = open(self.config_file, "w")
+ conf.write(json.dumps(data))
+ conf.close()
+
+ def _get_config(self, fromfile=False, data=sample_config):
+ """
+ Helper that returns a Config object using the data parameter
+ or a sample data.
+
+ :param fromfile: sets if we should use a file or a string
+ :fromfile type: bool
+ :param data: sets the data to be used to load in the Config object
+ :data type: dict (valid json)
+ :rtype: Config
+ """
+ config = Config()
+
+ loaded = False
+ if fromfile:
+ self._write_config(data)
+ loaded = config.load(self.config_file, relative=False)
+ else:
+ json_string = json.dumps(data)
+ loaded = config.load(data=json_string)
+
+ if not loaded:
+ return None
+
+ return config
+
+ def test_loads_from_file(self):
+ config = self._get_config(fromfile=True)
+ self.assertIsNotNone(config)
+
+ def test_loads_from_data(self):
+ config = self._get_config()
+ self.assertIsNotNone(config)
+
+ def test_load_valid_config_from_file(self):
+ config = self._get_config(fromfile=True)
+ self.assertIsNotNone(config)
+
+ self.assertEqual(config.get_version(), sample_config["version"])
+ self.assertEqual(config.get_serial(), sample_config["serial"])
+ self.assertEqual(config.get_gateways(), sample_config["gateways"])
+
+ def test_load_valid_config_from_data(self):
+ config = self._get_config()
+ self.assertIsNotNone(config)
+
+ self.assertEqual(config.get_version(), sample_config["version"])
+ self.assertEqual(config.get_serial(), sample_config["serial"])
+ self.assertEqual(config.get_gateways(), sample_config["gateways"])
+
+ def test_safe_get_value_no_config(self):
+ config = Config()
+
+ with self.assertRaises(AssertionError):
+ config.get_version()
+
+ def test_safe_get_value_non_existent_value(self):
+ config = self._get_config()
+
+ self.assertIsNone(config._safe_get_value('non-existent-value'))
+
+ def test_loaded(self):
+ config = self._get_config()
+ self.assertTrue(config.loaded())
+
+ def test_not_loaded(self):
+ config = Config()
+ self.assertFalse(config.loaded())
+
+ def test_save_and_load(self):
+ config = self._get_config()
+ config.get_path_prefix = Mock(return_value=self.tempdir)
+ config_file = 'test_config.json'
+ self.assertTrue(config.save([config_file]))
+
+ config_saved = Config()
+ config_file_path = self.get_tempfile(config_file)
+ self.assertTrue(config_saved.load(config_file_path, relative=False))
+
+ self.assertEqual(config.get_version(), config_saved.get_version())
+ self.assertEqual(config.get_serial(), config_saved.get_serial())
+ self.assertEqual(config.get_gateways(), config_saved.get_gateways())
+
+ def test_localizations(self):
+ conf = self._get_config()
+
+ self.assertEqual(conf.get_name(lang='en'), sample_config['name']['en'])
+ self.assertEqual(conf.get_name(lang='es'), sample_config['name']['es'])
+
+ def _localized_config(self, lang):
+ """
+ Helper to change default language of the provider config.
+ """
+ conf = copy.deepcopy(sample_config)
+ conf['default_language'] = lang
+ json_string = json.dumps(conf)
+ config = Config()
+ config.load(data=json_string)
+
+ return config
+
+ def test_default_localization1(self):
+ default_language = sample_config['languages'][0]
+ config = self._localized_config(default_language)
+
+ default_name = sample_config['name'][default_language]
+
+ self.assertEqual(config.get_name(lang='xx'), default_name)
+ self.assertEqual(config.get_name(), default_name)
+
+ def test_default_localization2(self):
+ default_language = sample_config['languages'][1]
+ config = self._localized_config(default_language)
+
+ default_name = sample_config['name'][default_language]
+
+ self.assertEqual(config.get_name(lang='xx'), default_name)
+ self.assertEqual(config.get_name(), default_name)
+
+
+if __name__ == "__main__":
+ unittest.main(verbosity=2)
diff --git a/tests/unit/config/test_get_path_prefix.py b/tests/unit/config/test_get_path_prefix.py
new file mode 100644
index 0000000..878e2fe
--- /dev/null
+++ b/tests/unit/config/test_get_path_prefix.py
@@ -0,0 +1,75 @@
+# -*- coding: utf-8 -*-
+# test_get_path_prefix.py
+# Copyright (C) 2013 LEAP
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see <http://www.gnu.org/licenses/>.
+"""
+Tests for get_path_prefix
+"""
+import os
+import mock
+import pytest
+import sys
+
+from leap.common.config import get_path_prefix
+from leap.common.testing.basetest import BaseLeapTest
+
+
+class GetPathPrefixTest(BaseLeapTest):
+ """
+ Tests for the get_path_prefix helper.
+
+ Note: we only are testing that the path is correctly returned and that if
+ we are not in a bundle (standalone=False) then the paths are different.
+
+ xdg calculates the correct path using different methods and dlls
+ (in case of Windows) so we don't implement tests to check if the paths
+ are the correct ones.
+ """
+ def setUp(self):
+ pass
+
+ def tearDown(self):
+ pass
+
+ def test_standalone_path(self):
+ expected_path = os.path.join('expected', 'path', 'config')
+ fake_cwd = os.path.join('expected', 'path')
+ with mock.patch('os.getcwd', lambda: fake_cwd):
+ path = get_path_prefix(standalone=True)
+ self.assertEquals(path, expected_path)
+
+ def test_path_prefix(self):
+ standalone_path = get_path_prefix(standalone=True)
+ path = get_path_prefix(standalone=False)
+ self.assertNotEquals(path, standalone_path)
+
+homedir = os.environ.get('HOME')
+
+
+@pytest.mark.parametrize(
+ "scenario", [
+ # (platform, path_parts, standalone),
+ ('linux', [homedir, '.config'], False),
+ ('darwin', [homedir, 'Library/Preferences'], False),
+ ('win32', [homedir, 'xyz'], False),
+ ('standalone', [os.getcwd(), 'config'], True)])
+def test_get_path_prefix(scenario, monkeypatch):
+ platform, path_parts, standalone = scenario
+ if platform == 'win32':
+ pytest.skip() # TODO: find a way to add test for win32 platform
+ # set a custom temporary platform
+ monkeypatch.setattr(sys, 'platform', platform)
+ expected_prefix = os.path.join(*path_parts)
+ assert expected_prefix == get_path_prefix(standalone)
diff --git a/tests/unit/events/test_auth.py b/tests/unit/events/test_auth.py
new file mode 100644
index 0000000..5442ebd
--- /dev/null
+++ b/tests/unit/events/test_auth.py
@@ -0,0 +1,61 @@
+# -*- coding: utf-8 -*-
+# test_zmq_components.py
+# Copyright (C) 2014 LEAP
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see <http://www.gnu.org/licenses/>.
+"""
+Tests for the auth module.
+"""
+import os
+
+from twisted.trial import unittest
+from txzmq import ZmqFactory
+
+from leap.common.events import auth
+from leap.common.testing.basetest import BaseLeapTest
+from leap.common.zmq_utils import PUBLIC_KEYS_PREFIX
+from leap.common.zmq_utils import maybe_create_and_get_certificates
+
+from txzmq.test import _wait
+
+
+class ZmqAuthTestCase(unittest.TestCase, BaseLeapTest):
+
+ def setUp(self):
+ self.factory = ZmqFactory()
+ self._config_prefix = os.path.join(self.tempdir, "leap", "events")
+
+ self.public, self.secret = maybe_create_and_get_certificates(
+ self._config_prefix, 'server')
+
+ self.authenticator = auth.TxAuthenticator(self.factory)
+ self.authenticator.start()
+ self.auth_req = auth.TxAuthenticationRequest(self.factory)
+
+ def tearDown(self):
+ self.factory.shutdown()
+
+ def test_curve_auth(self):
+ self.auth_req.start()
+ self.auth_req.allow('127.0.0.1')
+ public_keys_dir = os.path.join(self._config_prefix, PUBLIC_KEYS_PREFIX)
+ self.auth_req.configure_curve(domain="*", location=public_keys_dir)
+
+ def check(ignored):
+ authenticator = self.authenticator.authenticator
+ certs = authenticator.certs['*']
+ self.failUnlessEqual(authenticator.whitelist, set([u'127.0.0.1']))
+ self.failUnlessEqual(certs[certs.keys()[0]], True)
+
+ return _wait(0.1).addCallback(check)
diff --git a/tests/unit/events/test_events.py b/tests/unit/events/test_events.py
new file mode 100644
index 0000000..d8435c6
--- /dev/null
+++ b/tests/unit/events/test_events.py
@@ -0,0 +1,203 @@
+# -*- coding: utf-8 -*-
+# test_events.py
+# Copyright (C) 2013 LEAP
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see <http://www.gnu.org/licenses/>.
+"""
+Tests for the events framework
+"""
+import os
+import logging
+
+from twisted.internet.reactor import callFromThread
+from twisted.trial import unittest
+from twisted.internet import defer
+
+from txzmq import ZmqFactory
+
+from leap.common.events import server
+from leap.common.events import client
+from leap.common.events import flags
+from leap.common.events import txclient
+from leap.common.events import catalog
+from leap.common.events.errors import CallbackAlreadyRegisteredError
+
+
+if 'DEBUG' in os.environ:
+ logging.basicConfig(level=logging.DEBUG)
+
+
+class EventsGenericClientTestCase(object):
+
+ def setUp(self):
+ flags.set_events_enabled(True)
+ self.factory = ZmqFactory()
+ self._server = server.ensure_server(
+ emit_addr="tcp://127.0.0.1:0",
+ reg_addr="tcp://127.0.0.1:0",
+ factory=self.factory,
+ enable_curve=False)
+
+ self._client.configure_client(
+ emit_addr="tcp://127.0.0.1:%d" % self._server.pull_port,
+ reg_addr="tcp://127.0.0.1:%d" % self._server.pub_port,
+ factory=self.factory, enable_curve=False)
+
+ def tearDown(self):
+ flags.set_events_enabled(False)
+ self.factory.shutdown()
+ self._client.instance().reset()
+
+ def test_client_register(self):
+ """
+ Ensure clients can register callbacks.
+ """
+ callbacks = self._client.instance().callbacks
+ self.assertTrue(len(callbacks) == 0,
+ 'There should be no callback for this event.')
+ # register one event
+ event1 = catalog.CLIENT_UID
+
+ def cbk1(event, _):
+ return True
+
+ uid1 = self._client.register(event1, cbk1)
+ # assert for correct registration
+ self.assertTrue(len(callbacks) == 1)
+ self.assertTrue(callbacks[event1][uid1] == cbk1,
+ 'Could not register event in local client.')
+ # register another event
+ event2 = catalog.CLIENT_SESSION_ID
+
+ def cbk2(event, _):
+ return True
+
+ uid2 = self._client.register(event2, cbk2)
+ # assert for correct registration
+ self.assertTrue(len(callbacks) == 2)
+ self.assertTrue(callbacks[event2][uid2] == cbk2,
+ 'Could not register event in local client.')
+
+ def test_register_signal_replace(self):
+ """
+ Make sure clients can replace already registered callbacks.
+ """
+ event = catalog.CLIENT_UID
+ d = defer.Deferred()
+
+ def cbk_fail(event, _):
+ return callFromThread(d.errback, event)
+
+ def cbk_succeed(event, _):
+ return callFromThread(d.callback, event)
+
+ self._client.register(event, cbk_fail, uid=1)
+ self._client.register(event, cbk_succeed, uid=1, replace=True)
+ self._client.emit(event, None)
+ return d
+
+ def test_register_signal_replace_fails_when_replace_is_false(self):
+ """
+ Make sure clients trying to replace already registered callbacks fail
+ when replace=False
+ """
+ event = catalog.CLIENT_UID
+ self._client.register(event, lambda event, _: None, uid=1)
+ self.assertRaises(
+ CallbackAlreadyRegisteredError,
+ self._client.register,
+ event, lambda event, _: None, uid=1, replace=False)
+
+ def test_register_more_than_one_callback_works(self):
+ """
+ Make sure clients can replace already registered callbacks.
+ """
+ event = catalog.CLIENT_UID
+ d1 = defer.Deferred()
+
+ def cbk1(event, _):
+ return callFromThread(d1.callback, event)
+
+ d2 = defer.Deferred()
+
+ def cbk2(event, _):
+ return d2.callback(event)
+
+ self._client.register(event, cbk1)
+ self._client.register(event, cbk2)
+ self._client.emit(event, None)
+ d = defer.gatherResults([d1, d2])
+ return d
+
+ def test_client_receives_signal(self):
+ """
+ Ensure clients can receive signals.
+ """
+ event = catalog.CLIENT_UID
+ d = defer.Deferred()
+
+ def cbk(events, _):
+ callFromThread(d.callback, event)
+
+ self._client.register(event, cbk)
+ self._client.emit(event, None)
+ return d
+
+ def test_client_unregister_all(self):
+ """
+ Test that the client can unregister all events for one signal.
+ """
+ event1 = catalog.CLIENT_UID
+ d = defer.Deferred()
+ # register more than one callback for the same event
+ self._client.register(
+ event1, lambda ev, _: callFromThread(d.errback, None))
+ self._client.register(
+ event1, lambda ev, _: callFromThread(d.errback, None))
+ # unregister and emit the event
+ self._client.unregister(event1)
+ self._client.emit(event1, None)
+ # register and emit another event so the deferred can succeed
+ event2 = catalog.CLIENT_SESSION_ID
+ self._client.register(
+ event2, lambda ev, _: callFromThread(d.callback, None))
+ self._client.emit(event2, None)
+ return d
+
+ def test_client_unregister_by_uid(self):
+ """
+ Test that the client can unregister an event by uid.
+ """
+ event = catalog.CLIENT_UID
+ d = defer.Deferred()
+ # register one callback that would fail
+ uid = self._client.register(
+ event, lambda ev, _: callFromThread(d.errback, None))
+ # register one callback that will succeed
+ self._client.register(
+ event, lambda ev, _: callFromThread(d.callback, None))
+ # unregister by uid and emit the event
+ self._client.unregister(event, uid=uid)
+ self._client.emit(event, None)
+ return d
+
+
+class EventsTxClientTestCase(EventsGenericClientTestCase, unittest.TestCase):
+
+ _client = txclient
+
+
+class EventsClientTestCase(EventsGenericClientTestCase, unittest.TestCase):
+
+ _client = client
diff --git a/tests/unit/events/test_zmq_components.py b/tests/unit/events/test_zmq_components.py
new file mode 100644
index 0000000..c51e37e
--- /dev/null
+++ b/tests/unit/events/test_zmq_components.py
@@ -0,0 +1,51 @@
+# -*- coding: utf-8 -*-
+# test_zmq_components.py
+# Copyright (C) 2014 LEAP
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see <http://www.gnu.org/licenses/>.
+"""
+Tests for the zmq_components module.
+"""
+try:
+ import unittest2 as unittest
+except ImportError:
+ import unittest
+
+from leap.common.events import zmq_components
+
+
+class AddrParseTestCase(unittest.TestCase):
+
+ def setUp(self):
+ pass
+
+ def tearDown(self):
+ pass
+
+ def test_addr_parsing(self):
+ addr_re = zmq_components.ADDRESS_RE
+
+ self.assertEqual(
+ addr_re.search("ipc:///tmp/foo/bar/baaz-2/foo.0").groups(),
+ ("ipc", "/tmp/foo/bar/baaz-2/foo.0", None))
+ self.assertEqual(
+ addr_re.search("tcp://localhost:9000").groups(),
+ ("tcp", "localhost", "9000"))
+ self.assertEqual(
+ addr_re.search("tcp://127.0.0.1:9000").groups(),
+ ("tcp", "127.0.0.1", "9000"))
+
+
+if __name__ == "__main__":
+ unittest.main()
diff --git a/tests/unit/test_certs.py b/tests/unit/test_certs.py
new file mode 100644
index 0000000..b06fbf8
--- /dev/null
+++ b/tests/unit/test_certs.py
@@ -0,0 +1,97 @@
+# -*- coding: utf-8 -*-
+# test_certs.py
+# Copyright (C) 2013 LEAP
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see <http://www.gnu.org/licenses/>.
+"""
+Tests for:
+ * leap/common/certs.py
+"""
+import time
+
+try:
+ import unittest2 as unittest
+except ImportError:
+ import unittest
+
+from leap.common import certs
+from leap.common.testing.basetest import BaseLeapTest
+from leap.common.testing.https_server import where
+
+TEST_CERT_PEM = where("leaptest_combined_keycert.pem")
+
+# Values from the test cert file:
+# Not Before: Sep 3 17:52:16 2013 GMT
+# Not After : Sep 1 17:52:16 2023 GMT
+CERT_NOT_BEFORE = (2013, 9, 3, 17, 52, 16, 1, 246, 0)
+CERT_NOT_AFTER = (2023, 9, 1, 17, 52, 16, 4, 244, 0)
+
+
+class CertsTest(BaseLeapTest):
+
+ def setUp(self):
+ pass
+
+ def tearDown(self):
+ pass
+
+ def test_should_redownload_if_no_cert(self):
+ self.assertTrue(certs.should_redownload(certfile=""))
+
+ def test_should_redownload_if_invalid_pem(self):
+ cert_path = self.get_tempfile('test_pem_file.pem')
+
+ with open(cert_path, 'w') as f:
+ f.write('this is some invalid data for the pem file')
+
+ self.assertTrue(certs.should_redownload(cert_path))
+
+ def test_should_redownload_if_before(self):
+ def new_now():
+ time.struct_time(CERT_NOT_BEFORE)
+ self.assertTrue(certs.should_redownload(TEST_CERT_PEM, now=new_now))
+
+ def test_should_redownload_if_after(self):
+ def new_now():
+ time.struct_time(CERT_NOT_AFTER)
+ self.assertTrue(certs.should_redownload(TEST_CERT_PEM, now=new_now))
+
+ def test_not_should_redownload(self):
+ self.assertFalse(certs.should_redownload(TEST_CERT_PEM))
+
+ def test_is_valid_pemfile(self):
+ with open(TEST_CERT_PEM) as f:
+ cert_data = f.read()
+
+ self.assertTrue(certs.is_valid_pemfile(cert_data))
+
+ def test_not_is_valid_pemfile(self):
+ cert_data = 'this is some invalid data for the pem file'
+
+ self.assertFalse(certs.is_valid_pemfile(cert_data))
+
+ def test_get_cert_time_boundaries(self):
+ """
+ This test ensures us that the returned values are returned in UTC/GMT.
+ """
+ with open(TEST_CERT_PEM) as f:
+ cert_data = f.read()
+
+ valid_from, valid_to = certs.get_cert_time_boundaries(cert_data)
+ self.assertEqual(tuple(valid_from), CERT_NOT_BEFORE)
+ self.assertEqual(tuple(valid_to), CERT_NOT_AFTER)
+
+
+if __name__ == "__main__":
+ unittest.main()
diff --git a/tests/unit/test_check.py b/tests/unit/test_check.py
new file mode 100644
index 0000000..cd488ff
--- /dev/null
+++ b/tests/unit/test_check.py
@@ -0,0 +1,53 @@
+# -*- coding: utf-8 -*-
+# test_check.py
+# Copyright (C) 2013 LEAP
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see <http://www.gnu.org/licenses/>.
+"""
+Tests for:
+ * leap/common/check.py
+"""
+try:
+ import unittest2 as unittest
+except ImportError:
+ import unittest
+
+import mock
+
+from leap.common import check
+
+
+class CheckTests(unittest.TestCase):
+ def test_raises_on_false_condition(self):
+ with self.assertRaises(AssertionError):
+ check.leap_assert(False, "Condition")
+
+ def test_raises_on_none_condition(self):
+ with self.assertRaises(AssertionError):
+ check.leap_assert(None, "Condition")
+
+ def test_suceeds_with_good_condition(self):
+ check.leap_assert(True, "")
+
+ def test_raises_on_bad_type(self):
+ with self.assertRaises(AssertionError):
+ check.leap_assert_type(42, str)
+
+ def test_succeeds_on_good_type(self):
+ check.leap_assert_type(42, int)
+
+ @mock.patch("traceback.extract_stack", mock.MagicMock(return_value=None))
+ def test_does_not_raise_on_bug(self):
+ with self.assertRaises(AssertionError):
+ check.leap_assert(False, "")
diff --git a/tests/unit/test_http.py b/tests/unit/test_http.py
new file mode 100644
index 0000000..d5526e6
--- /dev/null
+++ b/tests/unit/test_http.py
@@ -0,0 +1,73 @@
+# -*- coding: utf-8 -*-
+# test_http.py
+# Copyright (C) 2013 LEAP
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see <http://www.gnu.org/licenses/>.
+"""
+Tests for:
+ * leap/common/http.py
+"""
+try:
+ import unittest2 as unittest
+except ImportError:
+ import unittest
+
+from leap.common import http
+from leap.common.testing.basetest import BaseLeapTest
+from leap.common.testing.https_server import where
+
+TEST_CERT_PEM = where("leaptest_combined_keycert.pem")
+
+
+class HTTPClientTest(BaseLeapTest):
+
+ def setUp(self):
+ pass
+
+ def tearDown(self):
+ pass
+
+ def test_agents_sharing_pool_by_default(self):
+ client = http.HTTPClient()
+ client2 = http.HTTPClient(TEST_CERT_PEM)
+ self.assertNotEquals(
+ client._agent, client2._agent, "Expected dedicated agents")
+ self.assertEquals(
+ client._agent._pool, client2._agent._pool,
+ "Pool was not reused by default")
+
+ def test_agent_can_have_dedicated_custom_pool(self):
+ custom_pool = http._HTTPConnectionPool(
+ None,
+ timeout=10,
+ maxPersistentPerHost=42,
+ persistent=False
+ )
+ self.assertEquals(custom_pool.maxPersistentPerHost, 42,
+ "Custom persistent connections "
+ "limit is not being respected")
+ self.assertFalse(custom_pool.persistent,
+ "Custom persistence is not being respected")
+ default_client = http.HTTPClient()
+ custom_client = http.HTTPClient(pool=custom_pool)
+
+ self.assertNotEquals(
+ default_client._agent, custom_client._agent,
+ "No agent reuse is expected")
+ self.assertEquals(
+ custom_pool, custom_client._agent._pool,
+ "Custom pool usage was not respected")
+
+if __name__ == "__main__":
+ unittest.main()
diff --git a/tests/unit/test_memoize.py b/tests/unit/test_memoize.py
new file mode 100644
index 0000000..c923fc5
--- /dev/null
+++ b/tests/unit/test_memoize.py
@@ -0,0 +1,76 @@
+# -*- coding: utf-8 -*-
+# test_check.py
+# Copyright (C) 2013 LEAP
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see <http://www.gnu.org/licenses/>.
+"""
+Tests for:
+ * leap/common/decorators._memoized
+"""
+try:
+ import unittest2 as unittest
+except ImportError:
+ import unittest
+
+from time import sleep
+
+import mock
+
+from leap.common.decorators import _memoized
+
+
+class MemoizeTests(unittest.TestCase):
+
+ def test_memoized_call(self):
+ """
+ Test that a memoized function only calls once.
+ """
+ witness = mock.Mock()
+
+ @_memoized
+ def callmebaby():
+ return witness()
+
+ for i in range(10):
+ callmebaby()
+ witness.assert_called_once_with()
+
+ def test_cache_invalidation(self):
+ """
+ Test that time makes the cache invalidation expire.
+ """
+ witness = mock.Mock()
+
+ cache_with_alzheimer = _memoized
+ cache_with_alzheimer.CACHE_INVALIDATION_DELTA = 1
+
+ @cache_with_alzheimer
+ def callmebaby(*args):
+ return witness(*args)
+
+ for i in range(10):
+ callmebaby()
+ witness.assert_called_once_with()
+
+ sleep(2)
+ callmebaby("onemoretime")
+
+ expected = [mock.call(), mock.call("onemoretime")]
+ self.assertEqual(
+ witness.call_args_list,
+ expected)
+
+
+if __name__ == "__main__":
+ unittest.main()