diff options
| author | Ivan Alejandro <ivanalejandro0@yahoo.com.ar> | 2013-06-28 11:14:08 -0300 | 
|---|---|---|
| committer | Ivan Alejandro <ivanalejandro0@yahoo.com.ar> | 2013-06-28 11:14:08 -0300 | 
| commit | 432a5e058f1d1a3722ed3ab5c350d522b7795bf4 (patch) | |
| tree | 9febdba365812c65e9a2296ca6a857353371217b | |
| parent | 1a1cd679a6f6c86ff8965f26f4fc463b98eba4ec (diff) | |
| parent | 6fcf0faa8b23bbdc1b9dd462d24cbb30d0c79652 (diff) | |
Merge branch 'feature/coverage_eipbootstrapper' into develop
| -rw-r--r-- | src/leap/crypto/tests/eip-service.json | 43 | ||||
| -rwxr-xr-x | src/leap/crypto/tests/fake_provider.py | 9 | ||||
| -rw-r--r-- | src/leap/services/eip/eipbootstrapper.py | 10 | ||||
| -rw-r--r-- | src/leap/services/eip/tests/test_eipbootstrapper.py | 347 | 
4 files changed, 398 insertions, 11 deletions
| diff --git a/src/leap/crypto/tests/eip-service.json b/src/leap/crypto/tests/eip-service.json new file mode 100644 index 00000000..24df42a2 --- /dev/null +++ b/src/leap/crypto/tests/eip-service.json @@ -0,0 +1,43 @@ +{ +  "gateways": [ +    { +      "capabilities": { +        "adblock": false,  +        "filter_dns": false,  +        "limited": true,  +        "ports": [ +          "1194",  +          "443",  +          "53",  +          "80" +        ],  +        "protocols": [ +          "tcp",  +          "udp" +        ],  +        "transport": [ +          "openvpn" +        ],  +        "user_ips": false +      },  +      "host": "harrier.cdev.bitmask.net",  +      "ip_address": "199.254.238.50",  +      "location": "seattle__wa" +    } +  ],  +  "locations": { +    "seattle__wa": { +      "country_code": "US",  +      "hemisphere": "N",  +      "name": "Seattle, WA",  +      "timezone": "-7" +    } +  },  +  "openvpn_configuration": { +    "auth": "SHA1",  +    "cipher": "AES-128-CBC",  +    "tls-cipher": "DHE-RSA-AES128-SHA" +  },  +  "serial": 1,  +  "version": 1 +}
\ No newline at end of file diff --git a/src/leap/crypto/tests/fake_provider.py b/src/leap/crypto/tests/fake_provider.py index f86d5ca9..54af485d 100755 --- a/src/leap/crypto/tests/fake_provider.py +++ b/src/leap/crypto/tests/fake_provider.py @@ -306,9 +306,7 @@ class FileModified(File):          since = request.getHeader('if-modified-since')          if since:              tsince = time.strptime(since.replace(" GMT", "")) -            tfrom = time.strptime(time.ctime(os.path.getmtime( -                os.path.join(_here, -                             "test_provider.json")))) +            tfrom = time.strptime(time.ctime(os.path.getmtime(self.path)))              if tfrom > tsince:                  return File.render_GET(self, request)              else: @@ -350,12 +348,13 @@ def get_provider_factory():      config = Resource()      config.putChild(          "eip-service.json", -        File("./eip-service.json")) +        FileModified( +            os.path.join(_here, "eip-service.json")))      apiv1 = Resource()      apiv1.putChild("config", config)      apiv1.putChild("sessions", API_Sessions())      apiv1.putChild("users", FakeUsers(None)) -    apiv1.putChild("cert", File( +    apiv1.putChild("cert", FileModified(          os.path.join(_here,                       'openvpn.pem')))      root.putChild("1", apiv1) diff --git a/src/leap/services/eip/eipbootstrapper.py b/src/leap/services/eip/eipbootstrapper.py index 4da8f90f..b2af0aea 100644 --- a/src/leap/services/eip/eipbootstrapper.py +++ b/src/leap/services/eip/eipbootstrapper.py @@ -25,7 +25,7 @@ import os  from PySide import QtCore  from leap.common.check import leap_assert, leap_assert_type -from leap.common.certs import is_valid_pemfile, should_redownload +from leap.common import certs  from leap.common.files import check_and_fix_urw_only, get_mtime, mkdir_p  from leap.config.providerconfig import ProviderConfig  from leap.crypto.srpauth import SRPAuth @@ -120,7 +120,7 @@ class EIPBootstrapper(AbstractBootstrapper):          # For re-download if something is wrong with the cert          self._download_if_needed = self._download_if_needed and \ -            not should_redownload(client_cert_path) +            not certs.should_redownload(client_cert_path)          if self._download_if_needed and \                  os.path.exists(client_cert_path): @@ -143,9 +143,7 @@ class EIPBootstrapper(AbstractBootstrapper):          res.raise_for_status()          client_cert = res.content -        # TODO: check certificate validity - -        if not is_valid_pemfile(client_cert): +        if not certs.is_valid_pemfile(client_cert):              raise Exception(self.tr("The downloaded certificate is not a "                                      "valid PEM file")) @@ -177,4 +175,4 @@ class EIPBootstrapper(AbstractBootstrapper):               self.download_client_certificate)          ] -        self.addCallbackChain(cb_chain) +        return self.addCallbackChain(cb_chain) diff --git a/src/leap/services/eip/tests/test_eipbootstrapper.py b/src/leap/services/eip/tests/test_eipbootstrapper.py new file mode 100644 index 00000000..f2331eca --- /dev/null +++ b/src/leap/services/eip/tests/test_eipbootstrapper.py @@ -0,0 +1,347 @@ +# -*- coding: utf-8 -*- +# test_eipbootstrapper.py +# Copyright (C) 2013 LEAP +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see <http://www.gnu.org/licenses/>. + + +""" +Tests for the EIP Boostrapper checks + +These will be whitebox tests since we want to make sure the private +implementation is checking what we expect. +""" + +import os +import mock +import tempfile +import time +try: +    import unittest2 as unittest +except ImportError: +    import unittest + +from nose.twistedtools import deferred, reactor +from twisted.internet import threads +from requests.models import Response + +from leap.common.testing.basetest import BaseLeapTest +from leap.services.eip.eipbootstrapper import EIPBootstrapper +from leap.services.eip.eipconfig import EIPConfig +from leap.config.providerconfig import ProviderConfig +from leap.crypto.tests import fake_provider +from leap.common.files import mkdir_p +from leap.crypto.srpauth import SRPAuth + + +class EIPBootstrapperActiveTest(BaseLeapTest): +    @classmethod +    def setUpClass(cls): +        BaseLeapTest.setUpClass() +        factory = fake_provider.get_provider_factory() +        http = reactor.listenTCP(0, factory) +        https = reactor.listenSSL( +            0, factory, +            fake_provider.OpenSSLServerContextFactory()) +        get_port = lambda p: p.getHost().port +        cls.http_port = get_port(http) +        cls.https_port = get_port(https) + +    def setUp(self): +        self.eb = EIPBootstrapper() +        self.old_pp = EIPConfig.get_path_prefix +        self.old_save = EIPConfig.save +        self.old_load = EIPConfig.load +        self.old_si = SRPAuth.get_session_id + +    def tearDown(self): +        EIPConfig.get_path_prefix = self.old_pp +        EIPConfig.save = self.old_save +        EIPConfig.load = self.old_load +        SRPAuth.get_session_id = self.old_si + +    def _download_config_test_template(self, ifneeded, new): +        """ +        All download config tests have the same structure, so this is +        a parametrized test for that. + +        :param ifneeded: sets _download_if_needed +        :type ifneeded: bool +        :param new: if True uses time.time() as mtime for the mocked +                    eip-service file, otherwise it uses 100 (a really +                    old mtime) +        :type new: float or int (will be coersed) +        """ +        pc = ProviderConfig() +        pc.get_domain = mock.MagicMock( +            return_value="localhost:%s" % (self.https_port)) +        self.eb._provider_config = pc + +        pc.get_api_uri = mock.MagicMock( +            return_value="https://%s" % (pc.get_domain())) +        pc.get_api_version = mock.MagicMock(return_value="1") + +        # This is to ignore https checking, since it's not the point +        # of this test +        pc.get_ca_cert_path = mock.MagicMock(return_value=False) + +        path_prefix = tempfile.mkdtemp() +        EIPConfig.get_path_prefix = mock.MagicMock(return_value=path_prefix) +        EIPConfig.save = mock.MagicMock() +        EIPConfig.load = mock.MagicMock() + +        self.eb._download_if_needed = ifneeded + +        provider_dir = os.path.join(EIPConfig.get_path_prefix(), +                                    "leap", +                                    "providers", +                                    pc.get_domain()) +        mkdir_p(provider_dir) +        eip_config_path = os.path.join(provider_dir, +                                       "eip-service.json") + +        with open(eip_config_path, "w") as ec: +            ec.write("A") + +        # set mtime to something really new +        if new: +            os.utime(eip_config_path, (-1, time.time())) +        else: +            os.utime(eip_config_path, (-1, 100)) + +    @deferred() +    def test_download_config_not_modified(self): +        self._download_config_test_template(True, True) + +        d = threads.deferToThread(self.eb._download_config) + +        def check(*args): +            self.assertFalse(self.eb._eip_config.save.called) +        d.addCallback(check) +        return d + +    @deferred() +    def test_download_config_modified(self): +        self._download_config_test_template(True, False) + +        d = threads.deferToThread(self.eb._download_config) + +        def check(*args): +            self.assertTrue(self.eb._eip_config.save.called) +        d.addCallback(check) +        return d + +    @deferred() +    def test_download_config_ignores_mtime(self): +        self._download_config_test_template(False, True) + +        d = threads.deferToThread(self.eb._download_config) + +        def check(*args): +            self.eb._eip_config.save.assert_called_once_with( +                ["leap", +                 "providers", +                 self.eb._provider_config.get_domain(), +                 "eip-service.json"]) +        d.addCallback(check) +        return d + +    def _download_certificate_test_template(self, ifneeded, createcert): +        """ +        All download client certificate tests have the same structure, +        so this is a parametrized test for that. + +        :param ifneeded: sets _download_if_needed +        :type ifneeded: bool +        :param createcert: if True it creates a dummy file to play the +                           part of a downloaded certificate +        :type createcert: bool + +        :returns: the temp eip cert path and the dummy cert contents +        :rtype: tuple of str, str +        """ +        pc = ProviderConfig() +        ec = EIPConfig() +        self.eb._provider_config = pc +        self.eb._eip_config = ec + +        pc.get_domain = mock.MagicMock( +            return_value="localhost:%s" % (self.https_port)) +        pc.get_api_uri = mock.MagicMock( +            return_value="https://%s" % (pc.get_domain())) +        pc.get_api_version = mock.MagicMock(return_value="1") +        pc.get_ca_cert_path = mock.MagicMock(return_value=False) + +        path_prefix = tempfile.mkdtemp() +        EIPConfig.get_path_prefix = mock.MagicMock(return_value=path_prefix) +        EIPConfig.save = mock.MagicMock() +        EIPConfig.load = mock.MagicMock() + +        self.eb._download_if_needed = ifneeded + +        provider_dir = os.path.join(EIPConfig.get_path_prefix(), +                                    "leap", +                                    "providers", +                                    "somedomain") +        mkdir_p(provider_dir) +        eip_cert_path = os.path.join(provider_dir, +                                     "cert") + +        ec.get_client_cert_path = mock.MagicMock( +            return_value=eip_cert_path) + +        cert_content = "A" +        if createcert: +            with open(eip_cert_path, "w") as ec: +                ec.write(cert_content) + +        return eip_cert_path, cert_content + +    def test_download_client_certificate_not_modified(self): +        cert_path, old_cert_content = self._download_certificate_test_template( +            True, True) + +        with mock.patch('leap.common.certs.should_redownload', +                        new_callable=mock.MagicMock, +                        return_value=False): +            self.eb._download_client_certificates() +            with open(cert_path, "r") as c: +                self.assertEqual(c.read(), old_cert_content) + +    @deferred() +    def test_download_client_certificate_old_cert(self): +        cert_path, old_cert_content = self._download_certificate_test_template( +            True, True) + +        def wrapper(*args): +            with mock.patch('leap.common.certs.should_redownload', +                            new_callable=mock.MagicMock, +                            return_value=True): +                with mock.patch('leap.common.certs.is_valid_pemfile', +                                new_callable=mock.MagicMock, +                                return_value=True): +                    self.eb._download_client_certificates() + +        def check(*args): +            with open(cert_path, "r") as c: +                self.assertNotEqual(c.read(), old_cert_content) +        d = threads.deferToThread(wrapper) +        d.addCallback(check) + +        return d + +    @deferred() +    def test_download_client_certificate_no_cert(self): +        cert_path, _ = self._download_certificate_test_template( +            True, False) + +        def wrapper(*args): +            with mock.patch('leap.common.certs.should_redownload', +                            new_callable=mock.MagicMock, +                            return_value=False): +                with mock.patch('leap.common.certs.is_valid_pemfile', +                                new_callable=mock.MagicMock, +                                return_value=True): +                    self.eb._download_client_certificates() + +        def check(*args): +            self.assertTrue(os.path.exists(cert_path)) +        d = threads.deferToThread(wrapper) +        d.addCallback(check) + +        return d + +    @deferred() +    def test_download_client_certificate_force_not_valid(self): +        cert_path, old_cert_content = self._download_certificate_test_template( +            True, True) + +        def wrapper(*args): +            with mock.patch('leap.common.certs.should_redownload', +                            new_callable=mock.MagicMock, +                            return_value=True): +                with mock.patch('leap.common.certs.is_valid_pemfile', +                                new_callable=mock.MagicMock, +                                return_value=True): +                    self.eb._download_client_certificates() + +        def check(*args): +            with open(cert_path, "r") as c: +                self.assertNotEqual(c.read(), old_cert_content) +        d = threads.deferToThread(wrapper) +        d.addCallback(check) + +        return d + +    @deferred() +    def test_download_client_certificate_invalid_download(self): +        cert_path, _ = self._download_certificate_test_template( +            False, False) + +        def wrapper(*args): +            with mock.patch('leap.common.certs.should_redownload', +                            new_callable=mock.MagicMock, +                            return_value=True): +                with mock.patch('leap.common.certs.is_valid_pemfile', +                                new_callable=mock.MagicMock, +                                return_value=False): +                    with self.assertRaises(Exception): +                        self.eb._download_client_certificates() +        d = threads.deferToThread(wrapper) + +        return d + +    @deferred() +    def test_download_client_certificate_uses_session_id(self): +        _, _ = self._download_certificate_test_template( +            False, False) + +        SRPAuth.get_session_id = mock.MagicMock(return_value="1") + +        def check_cookie(*args, **kwargs): +            cookies = kwargs.get("cookies", None) +            self.assertEqual(cookies, {'_session_id': '1'}) +            return Response() + +        def wrapper(*args): +            with mock.patch('leap.common.certs.should_redownload', +                            new_callable=mock.MagicMock, +                            return_value=False): +                with mock.patch('leap.common.certs.is_valid_pemfile', +                                new_callable=mock.MagicMock, +                                return_value=True): +                    with mock.patch('requests.sessions.Session.get', +                                    new_callable=mock.MagicMock, +                                    side_effect=check_cookie): +                        with mock.patch('requests.models.Response.content', +                                        new_callable=mock.PropertyMock, +                                        return_value="A"): +                            self.eb._download_client_certificates() + +        d = threads.deferToThread(wrapper) + +        return d + +    @deferred() +    def test_run_eip_setup_checks(self): +        self.eb._download_config = mock.MagicMock() +        self.eb._download_client_certificates = mock.MagicMock() + +        d = self.eb.run_eip_setup_checks(ProviderConfig()) + +        def check(*args): +            self.eb._download_config.assert_called_once_with() +            self.eb._download_client_certificates.assert_called_once_with(None) +        d.addCallback(check) +        return d | 
