# -*- coding: utf-8 -*- # test_eipbootstrapper.py # Copyright (C) 2013 LEAP # # This program is free software: you can redistribute it and/or modify # it under the terms of the GNU General Public License as published by # the Free Software Foundation, either version 3 of the License, or # (at your option) any later version. # # This program is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU General Public License for more details. # # You should have received a copy of the GNU General Public License # along with this program. If not, see <http://www.gnu.org/licenses/>. """ Tests for the EIP Boostrapper checks These will be whitebox tests since we want to make sure the private implementation is checking what we expect. """ import os import mock import tempfile import time try: import unittest2 as unittest except ImportError: import unittest from nose.twistedtools import deferred, reactor from twisted.internet import threads from requests.models import Response from leap.common.testing.basetest import BaseLeapTest from leap.services.eip.eipbootstrapper import EIPBootstrapper from leap.services.eip.eipconfig import EIPConfig from leap.config.providerconfig import ProviderConfig from leap.crypto.tests import fake_provider from leap.common.files import mkdir_p from leap.crypto.srpauth import SRPAuth class EIPBootstrapperActiveTest(BaseLeapTest): @classmethod def setUpClass(cls): BaseLeapTest.setUpClass() factory = fake_provider.get_provider_factory() http = reactor.listenTCP(0, factory) https = reactor.listenSSL( 0, factory, fake_provider.OpenSSLServerContextFactory()) get_port = lambda p: p.getHost().port cls.http_port = get_port(http) cls.https_port = get_port(https) def setUp(self): self.eb = EIPBootstrapper() self.old_pp = EIPConfig.get_path_prefix self.old_save = EIPConfig.save self.old_load = EIPConfig.load self.old_si = SRPAuth.get_session_id def tearDown(self): EIPConfig.get_path_prefix = self.old_pp EIPConfig.save = self.old_save EIPConfig.load = self.old_load SRPAuth.get_session_id = self.old_si def _download_config_test_template(self, ifneeded, new): """ All download config tests have the same structure, so this is a parametrized test for that. :param ifneeded: sets _download_if_needed :type ifneeded: bool :param new: if True uses time.time() as mtime for the mocked eip-service file, otherwise it uses 100 (a really old mtime) :type new: float or int (will be coersed) """ pc = ProviderConfig() pc.get_domain = mock.MagicMock( return_value="localhost:%s" % (self.https_port)) self.eb._provider_config = pc pc.get_api_uri = mock.MagicMock( return_value="https://%s" % (pc.get_domain())) pc.get_api_version = mock.MagicMock(return_value="1") # This is to ignore https checking, since it's not the point # of this test pc.get_ca_cert_path = mock.MagicMock(return_value=False) path_prefix = tempfile.mkdtemp() EIPConfig.get_path_prefix = mock.MagicMock(return_value=path_prefix) EIPConfig.save = mock.MagicMock() EIPConfig.load = mock.MagicMock() self.eb._download_if_needed = ifneeded provider_dir = os.path.join(EIPConfig.get_path_prefix(), "leap", "providers", pc.get_domain()) mkdir_p(provider_dir) eip_config_path = os.path.join(provider_dir, "eip-service.json") with open(eip_config_path, "w") as ec: ec.write("A") # set mtime to something really new if new: os.utime(eip_config_path, (-1, time.time())) else: os.utime(eip_config_path, (-1, 100)) @deferred() def test_download_config_not_modified(self): self._download_config_test_template(True, True) d = threads.deferToThread(self.eb._download_config) def check(*args): self.assertFalse(self.eb._eip_config.save.called) d.addCallback(check) return d @deferred() def test_download_config_modified(self): self._download_config_test_template(True, False) d = threads.deferToThread(self.eb._download_config) def check(*args): self.assertTrue(self.eb._eip_config.save.called) d.addCallback(check) return d @deferred() def test_download_config_ignores_mtime(self): self._download_config_test_template(False, True) d = threads.deferToThread(self.eb._download_config) def check(*args): self.eb._eip_config.save.assert_called_once_with( ["leap", "providers", self.eb._provider_config.get_domain(), "eip-service.json"]) d.addCallback(check) return d def _download_certificate_test_template(self, ifneeded, createcert): """ All download client certificate tests have the same structure, so this is a parametrized test for that. :param ifneeded: sets _download_if_needed :type ifneeded: bool :param createcert: if True it creates a dummy file to play the part of a downloaded certificate :type createcert: bool :returns: the temp eip cert path and the dummy cert contents :rtype: tuple of str, str """ pc = ProviderConfig() ec = EIPConfig() self.eb._provider_config = pc self.eb._eip_config = ec pc.get_domain = mock.MagicMock( return_value="localhost:%s" % (self.https_port)) pc.get_api_uri = mock.MagicMock( return_value="https://%s" % (pc.get_domain())) pc.get_api_version = mock.MagicMock(return_value="1") pc.get_ca_cert_path = mock.MagicMock(return_value=False) path_prefix = tempfile.mkdtemp() EIPConfig.get_path_prefix = mock.MagicMock(return_value=path_prefix) EIPConfig.save = mock.MagicMock() EIPConfig.load = mock.MagicMock() self.eb._download_if_needed = ifneeded provider_dir = os.path.join(EIPConfig.get_path_prefix(), "leap", "providers", "somedomain") mkdir_p(provider_dir) eip_cert_path = os.path.join(provider_dir, "cert") ec.get_client_cert_path = mock.MagicMock( return_value=eip_cert_path) cert_content = "A" if createcert: with open(eip_cert_path, "w") as ec: ec.write(cert_content) return eip_cert_path, cert_content def test_download_client_certificate_not_modified(self): cert_path, old_cert_content = self._download_certificate_test_template( True, True) with mock.patch('leap.common.certs.should_redownload', new_callable=mock.MagicMock, return_value=False): self.eb._download_client_certificates() with open(cert_path, "r") as c: self.assertEqual(c.read(), old_cert_content) @deferred() def test_download_client_certificate_old_cert(self): cert_path, old_cert_content = self._download_certificate_test_template( True, True) def wrapper(*args): with mock.patch('leap.common.certs.should_redownload', new_callable=mock.MagicMock, return_value=True): with mock.patch('leap.common.certs.is_valid_pemfile', new_callable=mock.MagicMock, return_value=True): self.eb._download_client_certificates() def check(*args): with open(cert_path, "r") as c: self.assertNotEqual(c.read(), old_cert_content) d = threads.deferToThread(wrapper) d.addCallback(check) return d @deferred() def test_download_client_certificate_no_cert(self): cert_path, _ = self._download_certificate_test_template( True, False) def wrapper(*args): with mock.patch('leap.common.certs.should_redownload', new_callable=mock.MagicMock, return_value=False): with mock.patch('leap.common.certs.is_valid_pemfile', new_callable=mock.MagicMock, return_value=True): self.eb._download_client_certificates() def check(*args): self.assertTrue(os.path.exists(cert_path)) d = threads.deferToThread(wrapper) d.addCallback(check) return d @deferred() def test_download_client_certificate_force_not_valid(self): cert_path, old_cert_content = self._download_certificate_test_template( True, True) def wrapper(*args): with mock.patch('leap.common.certs.should_redownload', new_callable=mock.MagicMock, return_value=True): with mock.patch('leap.common.certs.is_valid_pemfile', new_callable=mock.MagicMock, return_value=True): self.eb._download_client_certificates() def check(*args): with open(cert_path, "r") as c: self.assertNotEqual(c.read(), old_cert_content) d = threads.deferToThread(wrapper) d.addCallback(check) return d @deferred() def test_download_client_certificate_invalid_download(self): cert_path, _ = self._download_certificate_test_template( False, False) def wrapper(*args): with mock.patch('leap.common.certs.should_redownload', new_callable=mock.MagicMock, return_value=True): with mock.patch('leap.common.certs.is_valid_pemfile', new_callable=mock.MagicMock, return_value=False): with self.assertRaises(Exception): self.eb._download_client_certificates() d = threads.deferToThread(wrapper) return d @deferred() def test_download_client_certificate_uses_session_id(self): _, _ = self._download_certificate_test_template( False, False) SRPAuth.get_session_id = mock.MagicMock(return_value="1") def check_cookie(*args, **kwargs): cookies = kwargs.get("cookies", None) self.assertEqual(cookies, {'_session_id': '1'}) return Response() def wrapper(*args): with mock.patch('leap.common.certs.should_redownload', new_callable=mock.MagicMock, return_value=False): with mock.patch('leap.common.certs.is_valid_pemfile', new_callable=mock.MagicMock, return_value=True): with mock.patch('requests.sessions.Session.get', new_callable=mock.MagicMock, side_effect=check_cookie): with mock.patch('requests.models.Response.content', new_callable=mock.PropertyMock, return_value="A"): self.eb._download_client_certificates() d = threads.deferToThread(wrapper) return d @deferred() def test_run_eip_setup_checks(self): self.eb._download_config = mock.MagicMock() self.eb._download_client_certificates = mock.MagicMock() d = self.eb.run_eip_setup_checks(ProviderConfig()) def check(*args): self.eb._download_config.assert_called_once_with() self.eb._download_client_certificates.assert_called_once_with(None) d.addCallback(check) return d