summaryrefslogtreecommitdiff
path: root/src/leap/services/eip/tests/test_eipbootstrapper.py
diff options
context:
space:
mode:
Diffstat (limited to 'src/leap/services/eip/tests/test_eipbootstrapper.py')
-rw-r--r--src/leap/services/eip/tests/test_eipbootstrapper.py347
1 files changed, 0 insertions, 347 deletions
diff --git a/src/leap/services/eip/tests/test_eipbootstrapper.py b/src/leap/services/eip/tests/test_eipbootstrapper.py
deleted file mode 100644
index f2331eca..00000000
--- a/src/leap/services/eip/tests/test_eipbootstrapper.py
+++ /dev/null
@@ -1,347 +0,0 @@
-# -*- coding: utf-8 -*-
-# test_eipbootstrapper.py
-# Copyright (C) 2013 LEAP
-#
-# This program is free software: you can redistribute it and/or modify
-# it under the terms of the GNU General Public License as published by
-# the Free Software Foundation, either version 3 of the License, or
-# (at your option) any later version.
-#
-# This program is distributed in the hope that it will be useful,
-# but WITHOUT ANY WARRANTY; without even the implied warranty of
-# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
-# GNU General Public License for more details.
-#
-# You should have received a copy of the GNU General Public License
-# along with this program. If not, see <http://www.gnu.org/licenses/>.
-
-
-"""
-Tests for the EIP Boostrapper checks
-
-These will be whitebox tests since we want to make sure the private
-implementation is checking what we expect.
-"""
-
-import os
-import mock
-import tempfile
-import time
-try:
- import unittest2 as unittest
-except ImportError:
- import unittest
-
-from nose.twistedtools import deferred, reactor
-from twisted.internet import threads
-from requests.models import Response
-
-from leap.common.testing.basetest import BaseLeapTest
-from leap.services.eip.eipbootstrapper import EIPBootstrapper
-from leap.services.eip.eipconfig import EIPConfig
-from leap.config.providerconfig import ProviderConfig
-from leap.crypto.tests import fake_provider
-from leap.common.files import mkdir_p
-from leap.crypto.srpauth import SRPAuth
-
-
-class EIPBootstrapperActiveTest(BaseLeapTest):
- @classmethod
- def setUpClass(cls):
- BaseLeapTest.setUpClass()
- factory = fake_provider.get_provider_factory()
- http = reactor.listenTCP(0, factory)
- https = reactor.listenSSL(
- 0, factory,
- fake_provider.OpenSSLServerContextFactory())
- get_port = lambda p: p.getHost().port
- cls.http_port = get_port(http)
- cls.https_port = get_port(https)
-
- def setUp(self):
- self.eb = EIPBootstrapper()
- self.old_pp = EIPConfig.get_path_prefix
- self.old_save = EIPConfig.save
- self.old_load = EIPConfig.load
- self.old_si = SRPAuth.get_session_id
-
- def tearDown(self):
- EIPConfig.get_path_prefix = self.old_pp
- EIPConfig.save = self.old_save
- EIPConfig.load = self.old_load
- SRPAuth.get_session_id = self.old_si
-
- def _download_config_test_template(self, ifneeded, new):
- """
- All download config tests have the same structure, so this is
- a parametrized test for that.
-
- :param ifneeded: sets _download_if_needed
- :type ifneeded: bool
- :param new: if True uses time.time() as mtime for the mocked
- eip-service file, otherwise it uses 100 (a really
- old mtime)
- :type new: float or int (will be coersed)
- """
- pc = ProviderConfig()
- pc.get_domain = mock.MagicMock(
- return_value="localhost:%s" % (self.https_port))
- self.eb._provider_config = pc
-
- pc.get_api_uri = mock.MagicMock(
- return_value="https://%s" % (pc.get_domain()))
- pc.get_api_version = mock.MagicMock(return_value="1")
-
- # This is to ignore https checking, since it's not the point
- # of this test
- pc.get_ca_cert_path = mock.MagicMock(return_value=False)
-
- path_prefix = tempfile.mkdtemp()
- EIPConfig.get_path_prefix = mock.MagicMock(return_value=path_prefix)
- EIPConfig.save = mock.MagicMock()
- EIPConfig.load = mock.MagicMock()
-
- self.eb._download_if_needed = ifneeded
-
- provider_dir = os.path.join(EIPConfig.get_path_prefix(),
- "leap",
- "providers",
- pc.get_domain())
- mkdir_p(provider_dir)
- eip_config_path = os.path.join(provider_dir,
- "eip-service.json")
-
- with open(eip_config_path, "w") as ec:
- ec.write("A")
-
- # set mtime to something really new
- if new:
- os.utime(eip_config_path, (-1, time.time()))
- else:
- os.utime(eip_config_path, (-1, 100))
-
- @deferred()
- def test_download_config_not_modified(self):
- self._download_config_test_template(True, True)
-
- d = threads.deferToThread(self.eb._download_config)
-
- def check(*args):
- self.assertFalse(self.eb._eip_config.save.called)
- d.addCallback(check)
- return d
-
- @deferred()
- def test_download_config_modified(self):
- self._download_config_test_template(True, False)
-
- d = threads.deferToThread(self.eb._download_config)
-
- def check(*args):
- self.assertTrue(self.eb._eip_config.save.called)
- d.addCallback(check)
- return d
-
- @deferred()
- def test_download_config_ignores_mtime(self):
- self._download_config_test_template(False, True)
-
- d = threads.deferToThread(self.eb._download_config)
-
- def check(*args):
- self.eb._eip_config.save.assert_called_once_with(
- ["leap",
- "providers",
- self.eb._provider_config.get_domain(),
- "eip-service.json"])
- d.addCallback(check)
- return d
-
- def _download_certificate_test_template(self, ifneeded, createcert):
- """
- All download client certificate tests have the same structure,
- so this is a parametrized test for that.
-
- :param ifneeded: sets _download_if_needed
- :type ifneeded: bool
- :param createcert: if True it creates a dummy file to play the
- part of a downloaded certificate
- :type createcert: bool
-
- :returns: the temp eip cert path and the dummy cert contents
- :rtype: tuple of str, str
- """
- pc = ProviderConfig()
- ec = EIPConfig()
- self.eb._provider_config = pc
- self.eb._eip_config = ec
-
- pc.get_domain = mock.MagicMock(
- return_value="localhost:%s" % (self.https_port))
- pc.get_api_uri = mock.MagicMock(
- return_value="https://%s" % (pc.get_domain()))
- pc.get_api_version = mock.MagicMock(return_value="1")
- pc.get_ca_cert_path = mock.MagicMock(return_value=False)
-
- path_prefix = tempfile.mkdtemp()
- EIPConfig.get_path_prefix = mock.MagicMock(return_value=path_prefix)
- EIPConfig.save = mock.MagicMock()
- EIPConfig.load = mock.MagicMock()
-
- self.eb._download_if_needed = ifneeded
-
- provider_dir = os.path.join(EIPConfig.get_path_prefix(),
- "leap",
- "providers",
- "somedomain")
- mkdir_p(provider_dir)
- eip_cert_path = os.path.join(provider_dir,
- "cert")
-
- ec.get_client_cert_path = mock.MagicMock(
- return_value=eip_cert_path)
-
- cert_content = "A"
- if createcert:
- with open(eip_cert_path, "w") as ec:
- ec.write(cert_content)
-
- return eip_cert_path, cert_content
-
- def test_download_client_certificate_not_modified(self):
- cert_path, old_cert_content = self._download_certificate_test_template(
- True, True)
-
- with mock.patch('leap.common.certs.should_redownload',
- new_callable=mock.MagicMock,
- return_value=False):
- self.eb._download_client_certificates()
- with open(cert_path, "r") as c:
- self.assertEqual(c.read(), old_cert_content)
-
- @deferred()
- def test_download_client_certificate_old_cert(self):
- cert_path, old_cert_content = self._download_certificate_test_template(
- True, True)
-
- def wrapper(*args):
- with mock.patch('leap.common.certs.should_redownload',
- new_callable=mock.MagicMock,
- return_value=True):
- with mock.patch('leap.common.certs.is_valid_pemfile',
- new_callable=mock.MagicMock,
- return_value=True):
- self.eb._download_client_certificates()
-
- def check(*args):
- with open(cert_path, "r") as c:
- self.assertNotEqual(c.read(), old_cert_content)
- d = threads.deferToThread(wrapper)
- d.addCallback(check)
-
- return d
-
- @deferred()
- def test_download_client_certificate_no_cert(self):
- cert_path, _ = self._download_certificate_test_template(
- True, False)
-
- def wrapper(*args):
- with mock.patch('leap.common.certs.should_redownload',
- new_callable=mock.MagicMock,
- return_value=False):
- with mock.patch('leap.common.certs.is_valid_pemfile',
- new_callable=mock.MagicMock,
- return_value=True):
- self.eb._download_client_certificates()
-
- def check(*args):
- self.assertTrue(os.path.exists(cert_path))
- d = threads.deferToThread(wrapper)
- d.addCallback(check)
-
- return d
-
- @deferred()
- def test_download_client_certificate_force_not_valid(self):
- cert_path, old_cert_content = self._download_certificate_test_template(
- True, True)
-
- def wrapper(*args):
- with mock.patch('leap.common.certs.should_redownload',
- new_callable=mock.MagicMock,
- return_value=True):
- with mock.patch('leap.common.certs.is_valid_pemfile',
- new_callable=mock.MagicMock,
- return_value=True):
- self.eb._download_client_certificates()
-
- def check(*args):
- with open(cert_path, "r") as c:
- self.assertNotEqual(c.read(), old_cert_content)
- d = threads.deferToThread(wrapper)
- d.addCallback(check)
-
- return d
-
- @deferred()
- def test_download_client_certificate_invalid_download(self):
- cert_path, _ = self._download_certificate_test_template(
- False, False)
-
- def wrapper(*args):
- with mock.patch('leap.common.certs.should_redownload',
- new_callable=mock.MagicMock,
- return_value=True):
- with mock.patch('leap.common.certs.is_valid_pemfile',
- new_callable=mock.MagicMock,
- return_value=False):
- with self.assertRaises(Exception):
- self.eb._download_client_certificates()
- d = threads.deferToThread(wrapper)
-
- return d
-
- @deferred()
- def test_download_client_certificate_uses_session_id(self):
- _, _ = self._download_certificate_test_template(
- False, False)
-
- SRPAuth.get_session_id = mock.MagicMock(return_value="1")
-
- def check_cookie(*args, **kwargs):
- cookies = kwargs.get("cookies", None)
- self.assertEqual(cookies, {'_session_id': '1'})
- return Response()
-
- def wrapper(*args):
- with mock.patch('leap.common.certs.should_redownload',
- new_callable=mock.MagicMock,
- return_value=False):
- with mock.patch('leap.common.certs.is_valid_pemfile',
- new_callable=mock.MagicMock,
- return_value=True):
- with mock.patch('requests.sessions.Session.get',
- new_callable=mock.MagicMock,
- side_effect=check_cookie):
- with mock.patch('requests.models.Response.content',
- new_callable=mock.PropertyMock,
- return_value="A"):
- self.eb._download_client_certificates()
-
- d = threads.deferToThread(wrapper)
-
- return d
-
- @deferred()
- def test_run_eip_setup_checks(self):
- self.eb._download_config = mock.MagicMock()
- self.eb._download_client_certificates = mock.MagicMock()
-
- d = self.eb.run_eip_setup_checks(ProviderConfig())
-
- def check(*args):
- self.eb._download_config.assert_called_once_with()
- self.eb._download_client_certificates.assert_called_once_with(None)
- d.addCallback(check)
- return d