summaryrefslogtreecommitdiff
path: root/src/leap/services/eip/tests
diff options
context:
space:
mode:
authorMicah Anderson <micah@riseup.net>2013-08-22 14:10:50 -0400
committerMicah Anderson <micah@riseup.net>2013-08-22 14:10:50 -0400
commit6bae67c3e7c66af9e8ce1605070f7e92f247f0cb (patch)
tree64dc7e2588cc5c17fdb6c801877792f83cfcecf3 /src/leap/services/eip/tests
parent00efccabb1eb6c3597282bc0e095cf2026f75a3b (diff)
parent00d98a47c60764475d97df1c2eb847e20a77cae5 (diff)
Merge tag '0.3.0' into debian
Tag bitmask version 0.3.0 Conflicts: .coveragerc .tx/config NEWS.rst setup.cfg src/leap/base/checks.py src/leap/base/exceptions.py src/leap/base/tests/test_checks.py src/leap/eip/checks.py src/leap/gui/firstrun/login.py src/leap/gui/locale_rc.py src/leap/util/polkit.py tests/test_qt_environment.py tox.ini
Diffstat (limited to 'src/leap/services/eip/tests')
-rw-r--r--src/leap/services/eip/tests/__init__.py0
-rw-r--r--src/leap/services/eip/tests/test_eipbootstrapper.py347
-rw-r--r--src/leap/services/eip/tests/test_eipconfig.py324
-rw-r--r--src/leap/services/eip/tests/test_providerbootstrapper.py531
-rw-r--r--src/leap/services/eip/tests/test_vpngatewayselector.py131
-rw-r--r--src/leap/services/eip/tests/wrongcert.pem33
6 files changed, 1366 insertions, 0 deletions
diff --git a/src/leap/services/eip/tests/__init__.py b/src/leap/services/eip/tests/__init__.py
new file mode 100644
index 00000000..e69de29b
--- /dev/null
+++ b/src/leap/services/eip/tests/__init__.py
diff --git a/src/leap/services/eip/tests/test_eipbootstrapper.py b/src/leap/services/eip/tests/test_eipbootstrapper.py
new file mode 100644
index 00000000..f2331eca
--- /dev/null
+++ b/src/leap/services/eip/tests/test_eipbootstrapper.py
@@ -0,0 +1,347 @@
+# -*- coding: utf-8 -*-
+# test_eipbootstrapper.py
+# Copyright (C) 2013 LEAP
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see <http://www.gnu.org/licenses/>.
+
+
+"""
+Tests for the EIP Boostrapper checks
+
+These will be whitebox tests since we want to make sure the private
+implementation is checking what we expect.
+"""
+
+import os
+import mock
+import tempfile
+import time
+try:
+ import unittest2 as unittest
+except ImportError:
+ import unittest
+
+from nose.twistedtools import deferred, reactor
+from twisted.internet import threads
+from requests.models import Response
+
+from leap.common.testing.basetest import BaseLeapTest
+from leap.services.eip.eipbootstrapper import EIPBootstrapper
+from leap.services.eip.eipconfig import EIPConfig
+from leap.config.providerconfig import ProviderConfig
+from leap.crypto.tests import fake_provider
+from leap.common.files import mkdir_p
+from leap.crypto.srpauth import SRPAuth
+
+
+class EIPBootstrapperActiveTest(BaseLeapTest):
+ @classmethod
+ def setUpClass(cls):
+ BaseLeapTest.setUpClass()
+ factory = fake_provider.get_provider_factory()
+ http = reactor.listenTCP(0, factory)
+ https = reactor.listenSSL(
+ 0, factory,
+ fake_provider.OpenSSLServerContextFactory())
+ get_port = lambda p: p.getHost().port
+ cls.http_port = get_port(http)
+ cls.https_port = get_port(https)
+
+ def setUp(self):
+ self.eb = EIPBootstrapper()
+ self.old_pp = EIPConfig.get_path_prefix
+ self.old_save = EIPConfig.save
+ self.old_load = EIPConfig.load
+ self.old_si = SRPAuth.get_session_id
+
+ def tearDown(self):
+ EIPConfig.get_path_prefix = self.old_pp
+ EIPConfig.save = self.old_save
+ EIPConfig.load = self.old_load
+ SRPAuth.get_session_id = self.old_si
+
+ def _download_config_test_template(self, ifneeded, new):
+ """
+ All download config tests have the same structure, so this is
+ a parametrized test for that.
+
+ :param ifneeded: sets _download_if_needed
+ :type ifneeded: bool
+ :param new: if True uses time.time() as mtime for the mocked
+ eip-service file, otherwise it uses 100 (a really
+ old mtime)
+ :type new: float or int (will be coersed)
+ """
+ pc = ProviderConfig()
+ pc.get_domain = mock.MagicMock(
+ return_value="localhost:%s" % (self.https_port))
+ self.eb._provider_config = pc
+
+ pc.get_api_uri = mock.MagicMock(
+ return_value="https://%s" % (pc.get_domain()))
+ pc.get_api_version = mock.MagicMock(return_value="1")
+
+ # This is to ignore https checking, since it's not the point
+ # of this test
+ pc.get_ca_cert_path = mock.MagicMock(return_value=False)
+
+ path_prefix = tempfile.mkdtemp()
+ EIPConfig.get_path_prefix = mock.MagicMock(return_value=path_prefix)
+ EIPConfig.save = mock.MagicMock()
+ EIPConfig.load = mock.MagicMock()
+
+ self.eb._download_if_needed = ifneeded
+
+ provider_dir = os.path.join(EIPConfig.get_path_prefix(),
+ "leap",
+ "providers",
+ pc.get_domain())
+ mkdir_p(provider_dir)
+ eip_config_path = os.path.join(provider_dir,
+ "eip-service.json")
+
+ with open(eip_config_path, "w") as ec:
+ ec.write("A")
+
+ # set mtime to something really new
+ if new:
+ os.utime(eip_config_path, (-1, time.time()))
+ else:
+ os.utime(eip_config_path, (-1, 100))
+
+ @deferred()
+ def test_download_config_not_modified(self):
+ self._download_config_test_template(True, True)
+
+ d = threads.deferToThread(self.eb._download_config)
+
+ def check(*args):
+ self.assertFalse(self.eb._eip_config.save.called)
+ d.addCallback(check)
+ return d
+
+ @deferred()
+ def test_download_config_modified(self):
+ self._download_config_test_template(True, False)
+
+ d = threads.deferToThread(self.eb._download_config)
+
+ def check(*args):
+ self.assertTrue(self.eb._eip_config.save.called)
+ d.addCallback(check)
+ return d
+
+ @deferred()
+ def test_download_config_ignores_mtime(self):
+ self._download_config_test_template(False, True)
+
+ d = threads.deferToThread(self.eb._download_config)
+
+ def check(*args):
+ self.eb._eip_config.save.assert_called_once_with(
+ ["leap",
+ "providers",
+ self.eb._provider_config.get_domain(),
+ "eip-service.json"])
+ d.addCallback(check)
+ return d
+
+ def _download_certificate_test_template(self, ifneeded, createcert):
+ """
+ All download client certificate tests have the same structure,
+ so this is a parametrized test for that.
+
+ :param ifneeded: sets _download_if_needed
+ :type ifneeded: bool
+ :param createcert: if True it creates a dummy file to play the
+ part of a downloaded certificate
+ :type createcert: bool
+
+ :returns: the temp eip cert path and the dummy cert contents
+ :rtype: tuple of str, str
+ """
+ pc = ProviderConfig()
+ ec = EIPConfig()
+ self.eb._provider_config = pc
+ self.eb._eip_config = ec
+
+ pc.get_domain = mock.MagicMock(
+ return_value="localhost:%s" % (self.https_port))
+ pc.get_api_uri = mock.MagicMock(
+ return_value="https://%s" % (pc.get_domain()))
+ pc.get_api_version = mock.MagicMock(return_value="1")
+ pc.get_ca_cert_path = mock.MagicMock(return_value=False)
+
+ path_prefix = tempfile.mkdtemp()
+ EIPConfig.get_path_prefix = mock.MagicMock(return_value=path_prefix)
+ EIPConfig.save = mock.MagicMock()
+ EIPConfig.load = mock.MagicMock()
+
+ self.eb._download_if_needed = ifneeded
+
+ provider_dir = os.path.join(EIPConfig.get_path_prefix(),
+ "leap",
+ "providers",
+ "somedomain")
+ mkdir_p(provider_dir)
+ eip_cert_path = os.path.join(provider_dir,
+ "cert")
+
+ ec.get_client_cert_path = mock.MagicMock(
+ return_value=eip_cert_path)
+
+ cert_content = "A"
+ if createcert:
+ with open(eip_cert_path, "w") as ec:
+ ec.write(cert_content)
+
+ return eip_cert_path, cert_content
+
+ def test_download_client_certificate_not_modified(self):
+ cert_path, old_cert_content = self._download_certificate_test_template(
+ True, True)
+
+ with mock.patch('leap.common.certs.should_redownload',
+ new_callable=mock.MagicMock,
+ return_value=False):
+ self.eb._download_client_certificates()
+ with open(cert_path, "r") as c:
+ self.assertEqual(c.read(), old_cert_content)
+
+ @deferred()
+ def test_download_client_certificate_old_cert(self):
+ cert_path, old_cert_content = self._download_certificate_test_template(
+ True, True)
+
+ def wrapper(*args):
+ with mock.patch('leap.common.certs.should_redownload',
+ new_callable=mock.MagicMock,
+ return_value=True):
+ with mock.patch('leap.common.certs.is_valid_pemfile',
+ new_callable=mock.MagicMock,
+ return_value=True):
+ self.eb._download_client_certificates()
+
+ def check(*args):
+ with open(cert_path, "r") as c:
+ self.assertNotEqual(c.read(), old_cert_content)
+ d = threads.deferToThread(wrapper)
+ d.addCallback(check)
+
+ return d
+
+ @deferred()
+ def test_download_client_certificate_no_cert(self):
+ cert_path, _ = self._download_certificate_test_template(
+ True, False)
+
+ def wrapper(*args):
+ with mock.patch('leap.common.certs.should_redownload',
+ new_callable=mock.MagicMock,
+ return_value=False):
+ with mock.patch('leap.common.certs.is_valid_pemfile',
+ new_callable=mock.MagicMock,
+ return_value=True):
+ self.eb._download_client_certificates()
+
+ def check(*args):
+ self.assertTrue(os.path.exists(cert_path))
+ d = threads.deferToThread(wrapper)
+ d.addCallback(check)
+
+ return d
+
+ @deferred()
+ def test_download_client_certificate_force_not_valid(self):
+ cert_path, old_cert_content = self._download_certificate_test_template(
+ True, True)
+
+ def wrapper(*args):
+ with mock.patch('leap.common.certs.should_redownload',
+ new_callable=mock.MagicMock,
+ return_value=True):
+ with mock.patch('leap.common.certs.is_valid_pemfile',
+ new_callable=mock.MagicMock,
+ return_value=True):
+ self.eb._download_client_certificates()
+
+ def check(*args):
+ with open(cert_path, "r") as c:
+ self.assertNotEqual(c.read(), old_cert_content)
+ d = threads.deferToThread(wrapper)
+ d.addCallback(check)
+
+ return d
+
+ @deferred()
+ def test_download_client_certificate_invalid_download(self):
+ cert_path, _ = self._download_certificate_test_template(
+ False, False)
+
+ def wrapper(*args):
+ with mock.patch('leap.common.certs.should_redownload',
+ new_callable=mock.MagicMock,
+ return_value=True):
+ with mock.patch('leap.common.certs.is_valid_pemfile',
+ new_callable=mock.MagicMock,
+ return_value=False):
+ with self.assertRaises(Exception):
+ self.eb._download_client_certificates()
+ d = threads.deferToThread(wrapper)
+
+ return d
+
+ @deferred()
+ def test_download_client_certificate_uses_session_id(self):
+ _, _ = self._download_certificate_test_template(
+ False, False)
+
+ SRPAuth.get_session_id = mock.MagicMock(return_value="1")
+
+ def check_cookie(*args, **kwargs):
+ cookies = kwargs.get("cookies", None)
+ self.assertEqual(cookies, {'_session_id': '1'})
+ return Response()
+
+ def wrapper(*args):
+ with mock.patch('leap.common.certs.should_redownload',
+ new_callable=mock.MagicMock,
+ return_value=False):
+ with mock.patch('leap.common.certs.is_valid_pemfile',
+ new_callable=mock.MagicMock,
+ return_value=True):
+ with mock.patch('requests.sessions.Session.get',
+ new_callable=mock.MagicMock,
+ side_effect=check_cookie):
+ with mock.patch('requests.models.Response.content',
+ new_callable=mock.PropertyMock,
+ return_value="A"):
+ self.eb._download_client_certificates()
+
+ d = threads.deferToThread(wrapper)
+
+ return d
+
+ @deferred()
+ def test_run_eip_setup_checks(self):
+ self.eb._download_config = mock.MagicMock()
+ self.eb._download_client_certificates = mock.MagicMock()
+
+ d = self.eb.run_eip_setup_checks(ProviderConfig())
+
+ def check(*args):
+ self.eb._download_config.assert_called_once_with()
+ self.eb._download_client_certificates.assert_called_once_with(None)
+ d.addCallback(check)
+ return d
diff --git a/src/leap/services/eip/tests/test_eipconfig.py b/src/leap/services/eip/tests/test_eipconfig.py
new file mode 100644
index 00000000..87ce04c2
--- /dev/null
+++ b/src/leap/services/eip/tests/test_eipconfig.py
@@ -0,0 +1,324 @@
+# -*- coding: utf-8 -*-
+# test_eipconfig.py
+# Copyright (C) 2013 LEAP
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see <http://www.gnu.org/licenses/>.
+"""
+Tests for eipconfig
+"""
+import copy
+import json
+import os
+import unittest
+
+from leap.common.testing.basetest import BaseLeapTest
+from leap.services.eip.eipconfig import EIPConfig
+from leap.config.providerconfig import ProviderConfig
+
+from mock import Mock
+
+
+sample_config = {
+ "gateways": [
+ {
+ "capabilities": {
+ "adblock": False,
+ "filter_dns": True,
+ "limited": True,
+ "ports": [
+ "1194",
+ "443",
+ "53",
+ "80"],
+ "protocols": [
+ "tcp",
+ "udp"],
+ "transport": ["openvpn"],
+ "user_ips": False},
+ "host": "host.dev.example.org",
+ "ip_address": "11.22.33.44",
+ "location": "cyberspace"
+ }, {
+ "capabilities": {
+ "adblock": False,
+ "filter_dns": True,
+ "limited": True,
+ "ports": [
+ "1194",
+ "443",
+ "53",
+ "80"],
+ "protocols": [
+ "tcp",
+ "udp"],
+ "transport": ["openvpn"],
+ "user_ips": False},
+ "host": "host2.dev.example.org",
+ "ip_address": "22.33.44.55",
+ "location": "cyberspace"
+ }
+ ],
+ "locations": {
+ "ankara": {
+ "country_code": "XX",
+ "hemisphere": "S",
+ "name": "Antarctica",
+ "timezone": "+2"
+ },
+ "cyberspace": {
+ "country_code": "XX",
+ "hemisphere": "X",
+ "name": "outer space",
+ "timezone": ""
+ }
+ },
+ "openvpn_configuration": {
+ "auth": "SHA1",
+ "cipher": "AES-128-CBC",
+ "tls-cipher": "DHE-RSA-AES128-SHA"
+ },
+ "serial": 1,
+ "version": 1
+}
+
+
+class EIPConfigTest(BaseLeapTest):
+
+ __name__ = "eip_config_tests"
+
+ maxDiff = None
+
+ def setUp(self):
+ self._old_ospath_exists = os.path.exists
+
+ def tearDown(self):
+ os.path.exists = self._old_ospath_exists
+
+ def _write_config(self, data):
+ """
+ Helper to write some data to a temp config file.
+
+ :param data: data to be used to save in the config file.
+ :data type: dict (valid json)
+ """
+ self.configfile = os.path.join(self.tempdir, "eipconfig.json")
+ conf = open(self.configfile, "w")
+ conf.write(json.dumps(data))
+ conf.close()
+
+ def _get_eipconfig(self, fromfile=True, data=sample_config, api_ver='1'):
+ """
+ Helper that returns an EIPConfig object using the data parameter
+ or a sample data.
+
+ :param fromfile: sets if we should use a file or a string
+ :type fromfile: bool
+ :param data: sets the data to be used to load in the EIPConfig object
+ :type data: dict (valid json)
+ :param api_ver: the api_version schema to use.
+ :type api_ver: str
+ :rtype: EIPConfig
+ """
+ config = EIPConfig()
+ config.set_api_version(api_ver)
+
+ loaded = False
+ if fromfile:
+ self._write_config(data)
+ loaded = config.load(self.configfile, relative=False)
+ else:
+ json_string = json.dumps(data)
+ loaded = config.load(data=json_string)
+
+ if not loaded:
+ return None
+
+ return config
+
+ def test_loads_from_file(self):
+ config = self._get_eipconfig()
+ self.assertIsNotNone(config)
+
+ def test_loads_from_data(self):
+ config = self._get_eipconfig(fromfile=False)
+ self.assertIsNotNone(config)
+
+ def test_load_valid_config_from_file(self):
+ config = self._get_eipconfig()
+ self.assertIsNotNone(config)
+
+ self.assertEqual(
+ config.get_openvpn_configuration(),
+ sample_config["openvpn_configuration"])
+
+ sample_ip = sample_config["gateways"][0]["ip_address"]
+ self.assertEqual(
+ config.get_gateway_ip(),
+ sample_ip)
+ self.assertEqual(config.get_version(), sample_config["version"])
+ self.assertEqual(config.get_serial(), sample_config["serial"])
+ self.assertEqual(config.get_gateways(), sample_config["gateways"])
+ self.assertEqual(config.get_locations(), sample_config["locations"])
+ self.assertEqual(config.get_clusters(), None)
+
+ def test_load_valid_config_from_data(self):
+ config = self._get_eipconfig(fromfile=False)
+ self.assertIsNotNone(config)
+
+ self.assertEqual(
+ config.get_openvpn_configuration(),
+ sample_config["openvpn_configuration"])
+
+ sample_ip = sample_config["gateways"][0]["ip_address"]
+ self.assertEqual(
+ config.get_gateway_ip(),
+ sample_ip)
+
+ self.assertEqual(config.get_version(), sample_config["version"])
+ self.assertEqual(config.get_serial(), sample_config["serial"])
+ self.assertEqual(config.get_gateways(), sample_config["gateways"])
+ self.assertEqual(config.get_locations(), sample_config["locations"])
+ self.assertEqual(config.get_clusters(), None)
+
+ def test_sanitize_extra_parameters(self):
+ data = copy.deepcopy(sample_config)
+ data['openvpn_configuration']["extra_param"] = "FOO"
+ config = self._get_eipconfig(data=data)
+
+ self.assertEqual(
+ config.get_openvpn_configuration(),
+ sample_config["openvpn_configuration"])
+
+ def test_sanitize_non_allowed_chars(self):
+ data = copy.deepcopy(sample_config)
+ data['openvpn_configuration']["auth"] = "SHA1;"
+ config = self._get_eipconfig(data=data)
+
+ self.assertEqual(
+ config.get_openvpn_configuration(),
+ sample_config["openvpn_configuration"])
+
+ data = copy.deepcopy(sample_config)
+ data['openvpn_configuration']["auth"] = "SHA1>`&|"
+ config = self._get_eipconfig(data=data)
+
+ self.assertEqual(
+ config.get_openvpn_configuration(),
+ sample_config["openvpn_configuration"])
+
+ def test_sanitize_lowercase(self):
+ data = copy.deepcopy(sample_config)
+ data['openvpn_configuration']["auth"] = "shaSHA1"
+ config = self._get_eipconfig(data=data)
+
+ self.assertEqual(
+ config.get_openvpn_configuration(),
+ sample_config["openvpn_configuration"])
+
+ def test_all_characters_invalid(self):
+ data = copy.deepcopy(sample_config)
+ data['openvpn_configuration']["auth"] = "sha&*!@#;"
+ config = self._get_eipconfig(data=data)
+
+ self.assertEqual(
+ config.get_openvpn_configuration(),
+ {'cipher': 'AES-128-CBC',
+ 'tls-cipher': 'DHE-RSA-AES128-SHA'})
+
+ def test_sanitize_bad_ip(self):
+ data = copy.deepcopy(sample_config)
+ data['gateways'][0]["ip_address"] = "11.22.33.44;"
+ config = self._get_eipconfig(data=data)
+
+ self.assertEqual(config.get_gateway_ip(), None)
+
+ data = copy.deepcopy(sample_config)
+ data['gateways'][0]["ip_address"] = "11.22.33.44`"
+ config = self._get_eipconfig(data=data)
+
+ self.assertEqual(config.get_gateway_ip(), None)
+
+ def test_default_gateway_on_unknown_index(self):
+ config = self._get_eipconfig()
+ sample_ip = sample_config["gateways"][0]["ip_address"]
+ self.assertEqual(config.get_gateway_ip(999), sample_ip)
+
+ def test_get_gateway_by_index(self):
+ config = self._get_eipconfig()
+ sample_ip_0 = sample_config["gateways"][0]["ip_address"]
+ sample_ip_1 = sample_config["gateways"][1]["ip_address"]
+ self.assertEqual(config.get_gateway_ip(0), sample_ip_0)
+ self.assertEqual(config.get_gateway_ip(1), sample_ip_1)
+
+ def test_get_client_cert_path_as_expected(self):
+ config = self._get_eipconfig()
+ config.get_path_prefix = Mock(return_value='test')
+
+ provider_config = ProviderConfig()
+
+ # mock 'get_domain' so we don't need to load a config
+ provider_domain = 'test.provider.com'
+ provider_config.get_domain = Mock(return_value=provider_domain)
+
+ expected_path = os.path.join('test', 'leap', 'providers',
+ provider_domain, 'keys', 'client',
+ 'openvpn.pem')
+
+ # mock 'os.path.exists' so we don't get an error for unexisting file
+ os.path.exists = Mock(return_value=True)
+ cert_path = config.get_client_cert_path(provider_config)
+
+ self.assertEqual(cert_path, expected_path)
+
+ def test_get_client_cert_path_about_to_download(self):
+ config = self._get_eipconfig()
+ config.get_path_prefix = Mock(return_value='test')
+
+ provider_config = ProviderConfig()
+
+ # mock 'get_domain' so we don't need to load a config
+ provider_domain = 'test.provider.com'
+ provider_config.get_domain = Mock(return_value=provider_domain)
+
+ expected_path = os.path.join('test', 'leap', 'providers',
+ provider_domain, 'keys', 'client',
+ 'openvpn.pem')
+
+ cert_path = config.get_client_cert_path(
+ provider_config, about_to_download=True)
+
+ self.assertEqual(cert_path, expected_path)
+
+ def test_get_client_cert_path_fails(self):
+ config = self._get_eipconfig()
+ provider_config = ProviderConfig()
+
+ # mock 'get_domain' so we don't need to load a config
+ provider_domain = 'test.provider.com'
+ provider_config.get_domain = Mock(return_value=provider_domain)
+
+ with self.assertRaises(AssertionError):
+ config.get_client_cert_path(provider_config)
+
+ def test_fails_without_api_set(self):
+ config = EIPConfig()
+ with self.assertRaises(AssertionError):
+ config.load('non-relevant-path')
+
+ def test_fails_with_api_without_schema(self):
+ with self.assertRaises(AssertionError):
+ self._get_eipconfig(api_ver='123')
+
+if __name__ == "__main__":
+ unittest.main()
diff --git a/src/leap/services/eip/tests/test_providerbootstrapper.py b/src/leap/services/eip/tests/test_providerbootstrapper.py
new file mode 100644
index 00000000..b24334a2
--- /dev/null
+++ b/src/leap/services/eip/tests/test_providerbootstrapper.py
@@ -0,0 +1,531 @@
+# -*- coding: utf-8 -*-
+# test_providerbootstrapper.py
+# Copyright (C) 2013 LEAP
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see <http://www.gnu.org/licenses/>.
+
+
+"""
+Tests for the Provider Boostrapper checks
+
+These will be whitebox tests since we want to make sure the private
+implementation is checking what we expect.
+"""
+
+import os
+import mock
+import socket
+import stat
+import tempfile
+import time
+import requests
+try:
+ import unittest2 as unittest
+except ImportError:
+ import unittest
+
+from nose.twistedtools import deferred, reactor
+from twisted.internet import threads
+from requests.models import Response
+
+from leap.common.testing.https_server import where
+from leap.common.testing.basetest import BaseLeapTest
+from leap.services.eip.providerbootstrapper import ProviderBootstrapper
+from leap.services.eip.providerbootstrapper import UnsupportedProviderAPI
+from leap.services.eip.providerbootstrapper import WrongFingerprint
+from leap.provider.supportedapis import SupportedAPIs
+from leap.config.providerconfig import ProviderConfig
+from leap.crypto.tests import fake_provider
+from leap.common.files import mkdir_p
+
+
+class ProviderBootstrapperTest(BaseLeapTest):
+ def setUp(self):
+ self.pb = ProviderBootstrapper()
+
+ def tearDown(self):
+ pass
+
+ def test_name_resolution_check(self):
+ # Something highly likely to success
+ self.pb._domain = "google.com"
+ self.pb._check_name_resolution()
+ # Something highly likely to fail
+ self.pb._domain = "uquhqweuihowquie.abc.def"
+ with self.assertRaises(socket.gaierror):
+ self.pb._check_name_resolution()
+
+ @deferred()
+ def test_run_provider_select_checks(self):
+ self.pb._check_name_resolution = mock.MagicMock()
+ self.pb._check_https = mock.MagicMock()
+ self.pb._download_provider_info = mock.MagicMock()
+
+ d = self.pb.run_provider_select_checks("somedomain")
+
+ def check(*args):
+ self.pb._check_name_resolution.assert_called_once_with()
+ self.pb._check_https.assert_called_once_with(None)
+ self.pb._download_provider_info.assert_called_once_with(None)
+ d.addCallback(check)
+ return d
+
+ @deferred()
+ def test_run_provider_setup_checks(self):
+ self.pb._download_ca_cert = mock.MagicMock()
+ self.pb._check_ca_fingerprint = mock.MagicMock()
+ self.pb._check_api_certificate = mock.MagicMock()
+
+ d = self.pb.run_provider_setup_checks(ProviderConfig())
+
+ def check(*args):
+ self.pb._download_ca_cert.assert_called_once_with()
+ self.pb._check_ca_fingerprint.assert_called_once_with(None)
+ self.pb._check_api_certificate.assert_called_once_with(None)
+ d.addCallback(check)
+ return d
+
+ def test_should_proceed_cert(self):
+ self.pb._provider_config = mock.Mock()
+ self.pb._provider_config.get_ca_cert_path = mock.MagicMock(
+ return_value=where("cacert.pem"))
+
+ self.pb._download_if_needed = False
+ self.assertTrue(self.pb._should_proceed_cert())
+
+ self.pb._download_if_needed = True
+ self.assertFalse(self.pb._should_proceed_cert())
+
+ self.pb._provider_config.get_ca_cert_path = mock.MagicMock(
+ return_value=where("somefilethatdoesntexist.pem"))
+ self.assertTrue(self.pb._should_proceed_cert())
+
+ def _check_download_ca_cert(self, should_proceed):
+ """
+ Helper to check different paths easily for the download ca
+ cert check
+
+ :param should_proceed: sets the _should_proceed_cert in the
+ provider bootstrapper being tested
+ :type should_proceed: bool
+
+ :returns: The contents of the certificate, the expected
+ content depending on should_proceed, and the mode of
+ the file to be checked by the caller
+ :rtype: tuple of str, str, int
+ """
+ old_content = "NOT THE NEW CERT"
+ new_content = "NEW CERT"
+ new_cert_path = os.path.join(tempfile.mkdtemp(),
+ "mynewcert.pem")
+
+ with open(new_cert_path, "w") as c:
+ c.write(old_content)
+
+ self.pb._provider_config = mock.Mock()
+ self.pb._provider_config.get_ca_cert_path = mock.MagicMock(
+ return_value=new_cert_path)
+ self.pb._domain = "somedomain"
+
+ self.pb._should_proceed_cert = mock.MagicMock(
+ return_value=should_proceed)
+
+ read = None
+ content_to_check = None
+ mode = None
+
+ with mock.patch('requests.models.Response.content',
+ new_callable=mock.PropertyMock) as \
+ content:
+ content.return_value = new_content
+ response_obj = Response()
+ response_obj.raise_for_status = mock.MagicMock()
+
+ self.pb._session.get = mock.MagicMock(return_value=response_obj)
+ self.pb._download_ca_cert()
+ with open(new_cert_path, "r") as nc:
+ read = nc.read()
+ if should_proceed:
+ content_to_check = new_content
+ else:
+ content_to_check = old_content
+ mode = stat.S_IMODE(os.stat(new_cert_path).st_mode)
+
+ os.unlink(new_cert_path)
+ return read, content_to_check, mode
+
+ def test_download_ca_cert_no_saving(self):
+ read, expected_read, mode = self._check_download_ca_cert(False)
+ self.assertEqual(read, expected_read)
+ self.assertEqual(mode, int("600", 8))
+
+ def test_download_ca_cert_saving(self):
+ read, expected_read, mode = self._check_download_ca_cert(True)
+ self.assertEqual(read, expected_read)
+ self.assertEqual(mode, int("600", 8))
+
+ def test_check_ca_fingerprint_skips(self):
+ self.pb._provider_config = mock.Mock()
+ self.pb._provider_config.get_ca_cert_fingerprint = mock.MagicMock(
+ return_value="")
+ self.pb._domain = "somedomain"
+
+ self.pb._should_proceed_cert = mock.MagicMock(return_value=False)
+
+ self.pb._check_ca_fingerprint()
+ self.assertFalse(self.pb._provider_config.
+ get_ca_cert_fingerprint.called)
+
+ def test_check_ca_cert_fingerprint_raises_bad_format(self):
+ self.pb._provider_config = mock.Mock()
+ self.pb._provider_config.get_ca_cert_fingerprint = mock.MagicMock(
+ return_value="wrongfprformat!!")
+ self.pb._domain = "somedomain"
+
+ self.pb._should_proceed_cert = mock.MagicMock(return_value=True)
+
+ with self.assertRaises(WrongFingerprint):
+ self.pb._check_ca_fingerprint()
+
+ # This two hashes different in the last byte, but that's good enough
+ # for the tests
+ KNOWN_BAD_HASH = "SHA256: 0f17c033115f6b76ff67871872303ff65034efe" \
+ "7dd1b910062ca323eb4da5c7f"
+ KNOWN_GOOD_HASH = "SHA256: 0f17c033115f6b76ff67871872303ff65034ef" \
+ "e7dd1b910062ca323eb4da5c7e"
+ KNOWN_GOOD_CERT = """
+-----BEGIN CERTIFICATE-----
+MIIFbzCCA1egAwIBAgIBATANBgkqhkiG9w0BAQ0FADBKMRgwFgYDVQQDDA9CaXRt
+YXNrIFJvb3QgQ0ExEDAOBgNVBAoMB0JpdG1hc2sxHDAaBgNVBAsME2h0dHBzOi8v
+Yml0bWFzay5uZXQwHhcNMTIxMTA2MDAwMDAwWhcNMjIxMTA2MDAwMDAwWjBKMRgw
+FgYDVQQDDA9CaXRtYXNrIFJvb3QgQ0ExEDAOBgNVBAoMB0JpdG1hc2sxHDAaBgNV
+BAsME2h0dHBzOi8vYml0bWFzay5uZXQwggIiMA0GCSqGSIb3DQEBAQUAA4ICDwAw
+ggIKAoICAQC1eV4YvayaU+maJbWrD4OHo3d7S1BtDlcvkIRS1Fw3iYDjsyDkZxai
+dHp4EUasfNQ+EVtXUvtk6170EmLco6Elg8SJBQ27trE6nielPRPCfX3fQzETRfvB
+7tNvGw4Jn2YKiYoMD79kkjgyZjkJ2r/bEHUSevmR09BRp86syHZerdNGpXYhcQ84
+CA1+V+603GFIHnrP+uQDdssW93rgDNYu+exT+Wj6STfnUkugyjmPRPjL7wh0tzy+
+znCeLl4xiV3g9sjPnc7r2EQKd5uaTe3j71sDPF92KRk0SSUndREz+B1+Dbe/RGk4
+MEqGFuOzrtsgEhPIX0hplhb0Tgz/rtug+yTT7oJjBa3u20AAOQ38/M99EfdeJvc4
+lPFF1XBBLh6X9UKF72an2NuANiX6XPySnJgZ7nZ09RiYZqVwu/qt3DfvLfhboq+0
+bQvLUPXrVDr70onv5UDjpmEA/cLmaIqqrduuTkFZOym65/PfAPvpGnt7crQj/Ibl
+DEDYZQmP7AS+6zBjoOzNjUGE5r40zWAR1RSi7zliXTu+yfsjXUIhUAWmYR6J3KxB
+lfsiHBQ+8dn9kC3YrUexWoOqBiqJOAJzZh5Y1tqgzfh+2nmHSB2dsQRs7rDRRlyy
+YMbkpzL9ZsOUO2eTP1mmar6YjCN+rggYjRrX71K2SpBG6b1zZxOG+wIDAQABo2Aw
+XjAdBgNVHQ4EFgQUuYGDLL2sswnYpHHvProt1JU+D48wDgYDVR0PAQH/BAQDAgIE
+MAwGA1UdEwQFMAMBAf8wHwYDVR0jBBgwFoAUuYGDLL2sswnYpHHvProt1JU+D48w
+DQYJKoZIhvcNAQENBQADggIBADeG67vaFcbITGpi51264kHPYPEWaXUa5XYbtmBl
+cXYyB6hY5hv/YNuVGJ1gWsDmdeXEyj0j2icGQjYdHRfwhrbEri+h1EZOm1cSBDuY
+k/P5+ctHyOXx8IE79DBsZ6IL61UKIaKhqZBfLGYcWu17DVV6+LT+AKtHhOrv3TSj
+RnAcKnCbKqXLhUPXpK0eTjPYS2zQGQGIhIy9sQXVXJJJsGrPgMxna1Xw2JikBOCG
+htD/JKwt6xBmNwktH0GI/LVtVgSp82Clbn9C4eZN9E5YbVYjLkIEDhpByeC71QhX
+EIQ0ZR56bFuJA/CwValBqV/G9gscTPQqd+iETp8yrFpAVHOW+YzSFbxjTEkBte1J
+aF0vmbqdMAWLk+LEFPQRptZh0B88igtx6tV5oVd+p5IVRM49poLhuPNJGPvMj99l
+mlZ4+AeRUnbOOeAEuvpLJbel4rhwFzmUiGoeTVoPZyMevWcVFq6BMkS+jRR2w0jK
+G6b0v5XDHlcFYPOgUrtsOBFJVwbutLvxdk6q37kIFnWCd8L3kmES5q4wjyFK47Co
+Ja8zlx64jmMZPg/t3wWqkZgXZ14qnbyG5/lGsj5CwVtfDljrhN0oCWK1FZaUmW3d
+69db12/g4f6phldhxiWuGC/W6fCW5kre7nmhshcltqAJJuU47iX+DarBFiIj816e
+yV8e
+-----END CERTIFICATE-----
+"""
+
+ def _prepare_provider_config_with(self, cert_path, cert_hash):
+ """
+ Mocks the provider config to give the cert_path and cert_hash
+ specified
+
+ :param cert_path: path for the certificate
+ :type cert_path: str
+ :param cert_hash: hash for the certificate as it would appear
+ in the provider config json
+ :type cert_hash: str
+ """
+ self.pb._provider_config = mock.Mock()
+ self.pb._provider_config.get_ca_cert_fingerprint = mock.MagicMock(
+ return_value=cert_hash)
+ self.pb._provider_config.get_ca_cert_path = mock.MagicMock(
+ return_value=cert_path)
+ self.pb._domain = "somedomain"
+
+ def test_check_ca_fingerprint_checksout(self):
+ cert_path = os.path.join(tempfile.mkdtemp(),
+ "mynewcert.pem")
+
+ with open(cert_path, "w") as c:
+ c.write(self.KNOWN_GOOD_CERT)
+
+ self._prepare_provider_config_with(cert_path, self.KNOWN_GOOD_HASH)
+
+ self.pb._should_proceed_cert = mock.MagicMock(return_value=True)
+
+ self.pb._check_ca_fingerprint()
+
+ os.unlink(cert_path)
+
+ def test_check_ca_fingerprint_fails(self):
+ cert_path = os.path.join(tempfile.mkdtemp(),
+ "mynewcert.pem")
+
+ with open(cert_path, "w") as c:
+ c.write(self.KNOWN_GOOD_CERT)
+
+ self._prepare_provider_config_with(cert_path, self.KNOWN_BAD_HASH)
+
+ self.pb._should_proceed_cert = mock.MagicMock(return_value=True)
+
+ with self.assertRaises(WrongFingerprint):
+ self.pb._check_ca_fingerprint()
+
+ os.unlink(cert_path)
+
+
+###############################################################################
+# Tests with a fake provider #
+###############################################################################
+
+class ProviderBootstrapperActiveTest(unittest.TestCase):
+ @classmethod
+ def setUpClass(cls):
+ factory = fake_provider.get_provider_factory()
+ http = reactor.listenTCP(8002, factory)
+ https = reactor.listenSSL(
+ 0, factory,
+ fake_provider.OpenSSLServerContextFactory())
+ get_port = lambda p: p.getHost().port
+ cls.http_port = get_port(http)
+ cls.https_port = get_port(https)
+
+ def setUp(self):
+ self.pb = ProviderBootstrapper()
+
+ # At certain points we are going to be replacing these methods
+ # directly in ProviderConfig to be able to catch calls from
+ # new ProviderConfig objects inside the methods tested. We
+ # need to save the old implementation and restore it in
+ # tearDown so we are sure everything is as expected for each
+ # test. If we do it inside each specific test, a failure in
+ # the test will leave the implementation with the mock.
+ self.old_gpp = ProviderConfig.get_path_prefix
+ self.old_load = ProviderConfig.load
+ self.old_save = ProviderConfig.save
+ self.old_api_version = ProviderConfig.get_api_version
+
+ def tearDown(self):
+ ProviderConfig.get_path_prefix = self.old_gpp
+ ProviderConfig.load = self.old_load
+ ProviderConfig.save = self.old_save
+ ProviderConfig.get_api_version = self.old_api_version
+
+ def test_check_https_succeeds(self):
+ # XXX: Need a proper CA signed cert to test this
+ pass
+
+ @deferred()
+ def test_check_https_fails(self):
+ self.pb._domain = "localhost:%s" % (self.https_port,)
+
+ def check(*args):
+ with self.assertRaises(requests.exceptions.SSLError):
+ self.pb._check_https()
+ return threads.deferToThread(check)
+
+ @deferred()
+ def test_second_check_https_fails(self):
+ self.pb._domain = "localhost:1234"
+
+ def check(*args):
+ with self.assertRaises(Exception):
+ self.pb._check_https()
+ return threads.deferToThread(check)
+
+ @deferred()
+ def test_check_https_succeeds_if_danger(self):
+ self.pb._domain = "localhost:%s" % (self.https_port,)
+ self.pb._bypass_checks = True
+
+ def check(*args):
+ self.pb._check_https()
+
+ return threads.deferToThread(check)
+
+ def _setup_provider_config_with(self, api, path_prefix):
+ """
+ Sets up the ProviderConfig with mocks for the path prefix, the
+ api returned and load/save methods.
+ It modifies ProviderConfig directly instead of an object
+ because the object used is created in the method itself and we
+ cannot control that.
+
+ :param api: API to return
+ :type api: str
+ :param path_prefix: path prefix to be used when calculating
+ paths
+ :type path_prefix: str
+ """
+ ProviderConfig.get_path_prefix = mock.MagicMock(
+ return_value=path_prefix)
+ ProviderConfig.get_api_version = mock.MagicMock(
+ return_value=api)
+ ProviderConfig.load = mock.MagicMock()
+ ProviderConfig.save = mock.MagicMock()
+
+ def _setup_providerbootstrapper(self, ifneeded):
+ """
+ Sets the provider bootstrapper's domain to
+ localhost:https_port, sets it to bypass https checks and sets
+ the download if needed based on the ifneeded value.
+
+ :param ifneeded: Value for _download_if_needed
+ :type ifneeded: bool
+ """
+ self.pb._domain = "localhost:%s" % (self.https_port,)
+ self.pb._bypass_checks = True
+ self.pb._download_if_needed = ifneeded
+
+ def _produce_dummy_provider_json(self):
+ """
+ Creates a dummy provider json on disk in order to test
+ behaviour around it (download if newer online, etc)
+
+ :returns: the provider.json path used
+ :rtype: str
+ """
+ provider_dir = os.path.join(ProviderConfig()
+ .get_path_prefix(),
+ "leap",
+ "providers",
+ self.pb._domain)
+ mkdir_p(provider_dir)
+ provider_path = os.path.join(provider_dir,
+ "provider.json")
+
+ with open(provider_path, "w") as p:
+ p.write("A")
+ return provider_path
+
+ def test_download_provider_info_new_provider(self):
+ self._setup_provider_config_with("1", tempfile.mkdtemp())
+ self._setup_providerbootstrapper(True)
+
+ self.pb._download_provider_info()
+ self.assertTrue(ProviderConfig.save.called)
+
+ @mock.patch('leap.config.providerconfig.ProviderConfig.get_ca_cert_path',
+ lambda x: where('cacert.pem'))
+ def test_download_provider_info_not_modified(self):
+ self._setup_provider_config_with("1", tempfile.mkdtemp())
+ self._setup_providerbootstrapper(True)
+ provider_path = self._produce_dummy_provider_json()
+
+ # set mtime to something really new
+ os.utime(provider_path, (-1, time.time()))
+
+ with mock.patch.object(
+ ProviderConfig, 'get_api_uri',
+ return_value="https://localhost:%s" % (self.https_port,)):
+ self.pb._download_provider_info()
+ # we check that it doesn't save the provider
+ # config, because it's new enough
+ self.assertFalse(ProviderConfig.save.called)
+
+ @mock.patch('leap.config.providerconfig.ProviderConfig.get_ca_cert_path',
+ lambda x: where('cacert.pem'))
+ def test_download_provider_info_modified(self):
+ self._setup_provider_config_with("1", tempfile.mkdtemp())
+ self._setup_providerbootstrapper(True)
+ provider_path = self._produce_dummy_provider_json()
+
+ # set mtime to something really old
+ os.utime(provider_path, (-1, 100))
+
+ with mock.patch.object(
+ ProviderConfig, 'get_api_uri',
+ return_value="https://localhost:%s" % (self.https_port,)):
+ self.pb._download_provider_info()
+ self.assertTrue(ProviderConfig.load.called)
+ self.assertTrue(ProviderConfig.save.called)
+
+ @mock.patch('leap.config.providerconfig.ProviderConfig.get_ca_cert_path',
+ lambda x: where('cacert.pem'))
+ def test_download_provider_info_unsupported_api_raises(self):
+ self._setup_provider_config_with("9999999", tempfile.mkdtemp())
+ self._setup_providerbootstrapper(False)
+ self._produce_dummy_provider_json()
+
+ with mock.patch.object(
+ ProviderConfig, 'get_api_uri',
+ return_value="https://localhost:%s" % (self.https_port,)):
+ with self.assertRaises(UnsupportedProviderAPI):
+ self.pb._download_provider_info()
+
+ @mock.patch('leap.config.providerconfig.ProviderConfig.get_ca_cert_path',
+ lambda x: where('cacert.pem'))
+ def test_download_provider_info_unsupported_api(self):
+ self._setup_provider_config_with(SupportedAPIs.SUPPORTED_APIS[0],
+ tempfile.mkdtemp())
+ self._setup_providerbootstrapper(False)
+ self._produce_dummy_provider_json()
+
+ with mock.patch.object(
+ ProviderConfig, 'get_api_uri',
+ return_value="https://localhost:%s" % (self.https_port,)):
+ self.pb._download_provider_info()
+
+ @mock.patch('leap.config.providerconfig.ProviderConfig.get_api_uri',
+ lambda x: 'api.uri')
+ @mock.patch('leap.config.providerconfig.ProviderConfig.get_ca_cert_path',
+ lambda x: '/cert/path')
+ def test_check_api_certificate_skips(self):
+ self.pb._provider_config = ProviderConfig()
+ self.pb._session.get = mock.MagicMock(return_value=Response())
+
+ self.pb._should_proceed_cert = mock.MagicMock(return_value=False)
+ self.pb._check_api_certificate()
+ self.assertFalse(self.pb._session.get.called)
+
+ @deferred()
+ def test_check_api_certificate_fails(self):
+ self.pb._provider_config = ProviderConfig()
+ self.pb._provider_config.get_api_uri = mock.MagicMock(
+ return_value="https://localhost:%s" % (self.https_port,))
+ self.pb._provider_config.get_ca_cert_path = mock.MagicMock(
+ return_value=os.path.join(
+ os.path.split(__file__)[0],
+ "wrongcert.pem"))
+ self.pb._provider_config.get_api_version = mock.MagicMock(
+ return_value="1")
+
+ self.pb._should_proceed_cert = mock.MagicMock(return_value=True)
+
+ def check(*args):
+ with self.assertRaises(requests.exceptions.SSLError):
+ self.pb._check_api_certificate()
+ d = threads.deferToThread(check)
+ return d
+
+ @deferred()
+ def test_check_api_certificate_succeeds(self):
+ self.pb._provider_config = ProviderConfig()
+ self.pb._provider_config.get_api_uri = mock.MagicMock(
+ return_value="https://localhost:%s" % (self.https_port,))
+ self.pb._provider_config.get_ca_cert_path = mock.MagicMock(
+ return_value=where('cacert.pem'))
+ self.pb._provider_config.get_api_version = mock.MagicMock(
+ return_value="1")
+
+ self.pb._should_proceed_cert = mock.MagicMock(return_value=True)
+
+ def check(*args):
+ self.pb._check_api_certificate()
+ d = threads.deferToThread(check)
+ return d
diff --git a/src/leap/services/eip/tests/test_vpngatewayselector.py b/src/leap/services/eip/tests/test_vpngatewayselector.py
new file mode 100644
index 00000000..c90681d7
--- /dev/null
+++ b/src/leap/services/eip/tests/test_vpngatewayselector.py
@@ -0,0 +1,131 @@
+# -*- coding: utf-8 -*-
+# test_vpngatewayselector.py
+# Copyright (C) 2013 LEAP
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see <http://www.gnu.org/licenses/>.
+"""
+tests for vpngatewayselector
+"""
+
+import unittest
+
+from leap.services.eip.eipconfig import EIPConfig, VPNGatewaySelector
+from leap.common.testing.basetest import BaseLeapTest
+from mock import Mock
+
+
+sample_gateways = [
+ {u'host': u'gateway1.com',
+ u'ip_address': u'1.2.3.4',
+ u'location': u'location1'},
+ {u'host': u'gateway2.com',
+ u'ip_address': u'2.3.4.5',
+ u'location': u'location2'},
+ {u'host': u'gateway3.com',
+ u'ip_address': u'3.4.5.6',
+ u'location': u'location3'},
+ {u'host': u'gateway4.com',
+ u'ip_address': u'4.5.6.7',
+ u'location': u'location4'}
+]
+
+sample_gateways_no_location = [
+ {u'host': u'gateway1.com',
+ u'ip_address': u'1.2.3.4'},
+ {u'host': u'gateway2.com',
+ u'ip_address': u'2.3.4.5'},
+ {u'host': u'gateway3.com',
+ u'ip_address': u'3.4.5.6'}
+]
+
+sample_locations = {
+ u'location1': {u'timezone': u'2'},
+ u'location2': {u'timezone': u'-7'},
+ u'location3': {u'timezone': u'-4'},
+ u'location4': {u'timezone': u'+13'}
+}
+
+# 0 is not used, only for indexing from 1 in tests
+ips = (0, u'1.2.3.4', u'2.3.4.5', u'3.4.5.6', u'4.5.6.7')
+
+
+class VPNGatewaySelectorTest(BaseLeapTest):
+ """
+ VPNGatewaySelector's tests.
+ """
+ def setUp(self):
+ self.eipconfig = EIPConfig()
+ self.eipconfig.get_gateways = Mock(return_value=sample_gateways)
+ self.eipconfig.get_locations = Mock(return_value=sample_locations)
+
+ def tearDown(self):
+ pass
+
+ def test_get_no_gateways(self):
+ gateway_selector = VPNGatewaySelector(self.eipconfig)
+ self.eipconfig.get_gateways = Mock(return_value=[])
+ gateways = gateway_selector.get_gateways()
+ self.assertEqual(gateways, [])
+
+ def test_get_gateway_with_no_locations(self):
+ gateway_selector = VPNGatewaySelector(self.eipconfig)
+ self.eipconfig.get_gateways = Mock(
+ return_value=sample_gateways_no_location)
+ self.eipconfig.get_locations = Mock(return_value=[])
+ gateways = gateway_selector.get_gateways()
+ gateways_default_order = [
+ sample_gateways[0]['ip_address'],
+ sample_gateways[1]['ip_address'],
+ sample_gateways[2]['ip_address']
+ ]
+ self.assertEqual(gateways, gateways_default_order)
+
+ def test_correct_order_gmt(self):
+ gateway_selector = VPNGatewaySelector(self.eipconfig, 0)
+ gateways = gateway_selector.get_gateways()
+ self.assertEqual(gateways, [ips[1], ips[3], ips[2], ips[4]])
+
+ def test_correct_order_gmt_minus_3(self):
+ gateway_selector = VPNGatewaySelector(self.eipconfig, -3)
+ gateways = gateway_selector.get_gateways()
+ self.assertEqual(gateways, [ips[3], ips[2], ips[1], ips[4]])
+
+ def test_correct_order_gmt_minus_7(self):
+ gateway_selector = VPNGatewaySelector(self.eipconfig, -7)
+ gateways = gateway_selector.get_gateways()
+ self.assertEqual(gateways, [ips[2], ips[3], ips[4], ips[1]])
+
+ def test_correct_order_gmt_plus_5(self):
+ gateway_selector = VPNGatewaySelector(self.eipconfig, 5)
+ gateways = gateway_selector.get_gateways()
+ self.assertEqual(gateways, [ips[1], ips[4], ips[3], ips[2]])
+
+ def test_correct_order_gmt_plus_12(self):
+ gateway_selector = VPNGatewaySelector(self.eipconfig, 12)
+ gateways = gateway_selector.get_gateways()
+ self.assertEqual(gateways, [ips[4], ips[2], ips[3], ips[1]])
+
+ def test_correct_order_gmt_minus_11(self):
+ gateway_selector = VPNGatewaySelector(self.eipconfig, -11)
+ gateways = gateway_selector.get_gateways()
+ self.assertEqual(gateways, [ips[4], ips[2], ips[3], ips[1]])
+
+ def test_correct_order_gmt_plus_14(self):
+ gateway_selector = VPNGatewaySelector(self.eipconfig, 14)
+ gateways = gateway_selector.get_gateways()
+ self.assertEqual(gateways, [ips[4], ips[2], ips[3], ips[1]])
+
+
+if __name__ == "__main__":
+ unittest.main()
diff --git a/src/leap/services/eip/tests/wrongcert.pem b/src/leap/services/eip/tests/wrongcert.pem
new file mode 100644
index 00000000..e6cff38a
--- /dev/null
+++ b/src/leap/services/eip/tests/wrongcert.pem
@@ -0,0 +1,33 @@
+-----BEGIN CERTIFICATE-----
+MIIFtTCCA52gAwIBAgIJAIWZus5EIXNtMA0GCSqGSIb3DQEBBQUAMEUxCzAJBgNV
+BAYTAkFVMRMwEQYDVQQIEwpTb21lLVN0YXRlMSEwHwYDVQQKExhJbnRlcm5ldCBX
+aWRnaXRzIFB0eSBMdGQwHhcNMTMwNjI1MTc0NjExWhcNMTgwNjI1MTc0NjExWjBF
+MQswCQYDVQQGEwJBVTETMBEGA1UECBMKU29tZS1TdGF0ZTEhMB8GA1UEChMYSW50
+ZXJuZXQgV2lkZ2l0cyBQdHkgTHRkMIICIjANBgkqhkiG9w0BAQEFAAOCAg8AMIIC
+CgKCAgEA2ObM7ESjyuxFZYD/Y68qOPQgjgggW+cdXfBpU2p4n7clsrUeMhWdW40Y
+77Phzor9VOeqs3ZpHuyLzsYVp/kFDm8tKyo2ah5fJwzL0VCSLYaZkUQQ7GNUmTCk
+furaxl8cQx/fg395V7/EngsS9B3/y5iHbctbA4MnH3jaotO5EGeo6hw7/eyCotQ9
+KbBV9GJMcY94FsXBCmUB+XypKklWTLhSaS6Cu4Fo8YLW6WmcnsyEOGS2F7WVf5at
+7CBWFQZHaSgIBLmc818/mDYCnYmCVMFn/6Ndx7V2NTlz+HctWrQn0dmIOnCUeCwS
+wXq9PnBR1rSx/WxwyF/WpyjOFkcIo7vm72kS70pfrYsXcZD4BQqkXYj3FyKnPt3O
+ibLKtCxL8/83wOtErPcYpG6LgFkgAAlHQ9MkUi5dbmjCJtpqQmlZeK1RALdDPiB3
+K1KZimrGsmcE624dJxUIOJJpuwJDy21F8kh5ZAsAtE1prWETrQYNElNFjQxM83rS
+ZR1Ql2MPSB4usEZT57+KvpEzlOnAT3elgCg21XrjSFGi14hCEao4g2OEZH5GAwm5
+frf6UlSRZ/g3tLTfI8Hv1prw15W2qO+7q7SBAplTODCRk+Yb0YoA2mMM/QXBUcXs
+vKEDLSSxzNIBi3T62l39RB/ml+gPKo87ZMDivex1ZhrcJc3Yu3sCAwEAAaOBpzCB
+pDAdBgNVHQ4EFgQUPjE+4pun+8FreIdpoR8v6N7xKtUwdQYDVR0jBG4wbIAUPjE+
+4pun+8FreIdpoR8v6N7xKtWhSaRHMEUxCzAJBgNVBAYTAkFVMRMwEQYDVQQIEwpT
+b21lLVN0YXRlMSEwHwYDVQQKExhJbnRlcm5ldCBXaWRnaXRzIFB0eSBMdGSCCQCF
+mbrORCFzbTAMBgNVHRMEBTADAQH/MA0GCSqGSIb3DQEBBQUAA4ICAQCpvCPdtvXJ
+muTj379TZuCJs7/l0FhA7AHa1WAlHjsXHaA7N0+3ZWAbdtXDsowal6S+ldgU/kfV
+Lq7NrRq+amJWC7SYj6cvVwhrSwSvu01fe/TWuOzHrRv1uTfJ/VXLonVufMDd9opo
+bhqYxMaxLdIx6t/MYmZH4Wpiq0yfZuv//M8i7BBl/qvaWbLhg0yVAKRwjFvf59h6
+6tRFCLddELOIhLDQtk8zMbioPEbfAlKdwwP8kYGtDGj6/9/YTd/oTKRdgHuwyup3
+m0L20Y6LddC+tb0WpK5EyrNbCbEqj1L4/U7r6f/FKNA3bx6nfdXbscaMfYonKAKg
+1cRrRg45sErmCz0QyTnWzXyvbjR4oQRzyW3kJ1JZudZ+AwOi00J5FYa3NiLuxl1u
+gIGKWSrASQWhEdpa1nlCgX7PhdaQgYjEMpQvA0GCA0OF5JDu8en1yZqsOt1hCLIN
+lkz/5jKPqrclY5hV99bE3hgCHRmIPNHCZG3wbZv2yJKxJX1YLMmQwAmSh2N7YwGG
+yXRvCxQs5ChPHyRairuf/5MZCZnSVb45ppTVuNUijsbflKRUgfj/XvfqQ22f+C9N
+Om2dmNvAiS2TOIfuP47CF2OUa5q4plUwmr+nyXQGM0SIoHNCj+MBdFfb3oxxAtI+
+SLhbnzQv5e84Doqz3YF0XW8jyR7q8GFLNA==
+-----END CERTIFICATE-----