summaryrefslogtreecommitdiff
path: root/src/leap/eip/tests
diff options
context:
space:
mode:
authorTomás Touceda <chiiph@leap.se>2013-06-28 14:25:19 -0300
committerTomás Touceda <chiiph@leap.se>2013-06-28 14:25:19 -0300
commit5b975799ce9b7a6e0a88be4bcb48bdfb90800bb3 (patch)
treebda674a79b1aeccb37b67609517bc4761db7ae07 /src/leap/eip/tests
parent9cea9c8a34343f8792d65b96f93ae22bd8685878 (diff)
parentc088a1544a5f7a51359d2802019c0740aab0cc5b (diff)
Merge branch 'release-0.2.2'0.2.2
Diffstat (limited to 'src/leap/eip/tests')
-rw-r--r--src/leap/eip/tests/__init__.py0
-rw-r--r--src/leap/eip/tests/data.py51
-rw-r--r--src/leap/eip/tests/test_checks.py372
-rw-r--r--src/leap/eip/tests/test_config.py298
-rw-r--r--src/leap/eip/tests/test_eipconnection.py216
-rw-r--r--src/leap/eip/tests/test_openvpnconnection.py161
6 files changed, 0 insertions, 1098 deletions
diff --git a/src/leap/eip/tests/__init__.py b/src/leap/eip/tests/__init__.py
deleted file mode 100644
index e69de29b..00000000
--- a/src/leap/eip/tests/__init__.py
+++ /dev/null
diff --git a/src/leap/eip/tests/data.py b/src/leap/eip/tests/data.py
deleted file mode 100644
index a7fe1853..00000000
--- a/src/leap/eip/tests/data.py
+++ /dev/null
@@ -1,51 +0,0 @@
-from __future__ import unicode_literals
-import os
-
-#from leap import __branding
-
-# sample data used in tests
-
-#PROVIDER = __branding.get('provider_domain')
-PROVIDER = "testprovider.example.org"
-
-EIP_SAMPLE_CONFIG = {
- "provider": "%s" % PROVIDER,
- "transport": "openvpn",
- "openvpn_protocol": "tcp",
- "openvpn_port": 80,
- "openvpn_ca_certificate": os.path.expanduser(
- "~/.config/leap/providers/"
- "%s/"
- "keys/ca/cacert.pem" % PROVIDER),
- "openvpn_client_certificate": os.path.expanduser(
- "~/.config/leap/providers/"
- "%s/"
- "keys/client/openvpn.pem" % PROVIDER),
- "connect_on_login": True,
- "block_cleartext_traffic": True,
- "primary_gateway": "location_unknown",
- "secondary_gateway": "location_unknown2",
- #"management_password": "oph7Que1othahwiech6J"
-}
-
-EIP_SAMPLE_SERVICE = {
- "serial": 1,
- "version": 1,
- "clusters": [
- {"label": {
- "en": "Location Unknown"},
- "name": "location_unknown"}
- ],
- "gateways": [
- {"capabilities": {
- "adblock": True,
- "filter_dns": True,
- "ports": ["80", "53", "443", "1194"],
- "protocols": ["udp", "tcp"],
- "transport": ["openvpn"],
- "user_ips": False},
- "cluster": "location_unknown",
- "host": "location.example.org",
- "ip_address": "192.0.43.10"}
- ]
-}
diff --git a/src/leap/eip/tests/test_checks.py b/src/leap/eip/tests/test_checks.py
deleted file mode 100644
index f42a0eeb..00000000
--- a/src/leap/eip/tests/test_checks.py
+++ /dev/null
@@ -1,372 +0,0 @@
-from BaseHTTPServer import BaseHTTPRequestHandler
-import copy
-import json
-try:
- import unittest2 as unittest
-except ImportError:
- import unittest
-import os
-import time
-import urlparse
-
-from mock import (patch, Mock)
-
-import requests
-
-from leap.base import config as baseconfig
-from leap.base import pluggableconfig
-from leap.base.constants import (DEFAULT_PROVIDER_DEFINITION,
- DEFINITION_EXPECTED_PATH)
-from leap.eip import checks as eipchecks
-from leap.eip import specs as eipspecs
-from leap.eip import exceptions as eipexceptions
-from leap.eip.tests import data as testdata
-from leap.testing.basetest import BaseLeapTest
-from leap.testing.https_server import BaseHTTPSServerTestCase
-from leap.testing.https_server import where as where_cert
-from leap.util.fileutil import mkdir_f
-
-
-class NoLogRequestHandler:
- def log_message(self, *args):
- # don't write log msg to stderr
- pass
-
- def read(self, n=None):
- return ''
-
-
-class EIPCheckTest(BaseLeapTest):
-
- __name__ = "eip_check_tests"
- provider = "testprovider.example.org"
- maxDiff = None
-
- def setUp(self):
- pass
-
- def tearDown(self):
- pass
-
- # test methods are there, and can be called from run_all
-
- def test_checker_should_implement_check_methods(self):
- checker = eipchecks.EIPConfigChecker(domain=self.provider)
-
- self.assertTrue(hasattr(checker, "check_default_eipconfig"),
- "missing meth")
- self.assertTrue(hasattr(checker, "check_is_there_default_provider"),
- "missing meth")
- self.assertTrue(hasattr(checker, "fetch_definition"), "missing meth")
- self.assertTrue(hasattr(checker, "fetch_eip_service_config"),
- "missing meth")
- self.assertTrue(hasattr(checker, "check_complete_eip_config"),
- "missing meth")
-
- def test_checker_should_actually_call_all_tests(self):
- checker = eipchecks.EIPConfigChecker(domain=self.provider)
-
- mc = Mock()
- checker.run_all(checker=mc)
- self.assertTrue(mc.check_default_eipconfig.called, "not called")
- self.assertTrue(mc.check_is_there_default_provider.called,
- "not called")
- self.assertTrue(mc.fetch_definition.called,
- "not called")
- self.assertTrue(mc.fetch_eip_service_config.called,
- "not called")
- self.assertTrue(mc.check_complete_eip_config.called,
- "not called")
-
- # test individual check methods
-
- def test_check_default_eipconfig(self):
- checker = eipchecks.EIPConfigChecker(domain=self.provider)
- # no eip config (empty home)
- eipconfig_path = checker.eipconfig.filename
- self.assertFalse(os.path.isfile(eipconfig_path))
- checker.check_default_eipconfig()
- # we've written one, so it should be there.
- self.assertTrue(os.path.isfile(eipconfig_path))
- with open(eipconfig_path, 'rb') as fp:
- deserialized = json.load(fp)
-
- # force re-evaluation of the paths
- # small workaround for evaluating home dirs correctly
- EIP_SAMPLE_CONFIG = copy.copy(testdata.EIP_SAMPLE_CONFIG)
- EIP_SAMPLE_CONFIG['openvpn_client_certificate'] = \
- eipspecs.client_cert_path(self.provider)
- EIP_SAMPLE_CONFIG['openvpn_ca_certificate'] = \
- eipspecs.provider_ca_path(self.provider)
- self.assertEqual(deserialized, EIP_SAMPLE_CONFIG)
-
- # TODO: shold ALSO run validation methods.
-
- def test_check_is_there_default_provider(self):
- checker = eipchecks.EIPConfigChecker(domain=self.provider)
- # we do dump a sample eip config, but lacking a
- # default provider entry.
- # This error will be possible catched in a different
- # place, when JSONConfig does validation of required fields.
-
- # passing direct config
- with self.assertRaises(eipexceptions.EIPMissingDefaultProvider):
- checker.check_is_there_default_provider(config={})
-
- # ok. now, messing with real files...
- # blank out default_provider
- sampleconfig = copy.copy(testdata.EIP_SAMPLE_CONFIG)
- sampleconfig['provider'] = None
- eipcfg_path = checker.eipconfig.filename
- mkdir_f(eipcfg_path)
- with open(eipcfg_path, 'w') as fp:
- json.dump(sampleconfig, fp)
- #with self.assertRaises(eipexceptions.EIPMissingDefaultProvider):
- # XXX we should catch this as one of our errors, but do not
- # see how to do it quickly.
- with self.assertRaises(pluggableconfig.ValidationError):
- #import ipdb;ipdb.set_trace()
- checker.eipconfig.load(fromfile=eipcfg_path)
- checker.check_is_there_default_provider()
-
- sampleconfig = testdata.EIP_SAMPLE_CONFIG
- #eipcfg_path = checker._get_default_eipconfig_path()
- with open(eipcfg_path, 'w') as fp:
- json.dump(sampleconfig, fp)
- checker.eipconfig.load()
- self.assertTrue(checker.check_is_there_default_provider())
-
- def test_fetch_definition(self):
- with patch.object(requests, "get") as mocked_get:
- mocked_get.return_value.status_code = 200
- mocked_get.return_value.headers = {
- 'last-modified': "Wed Dec 12 12:12:12 GMT 2012"}
- mocked_get.return_value.json = DEFAULT_PROVIDER_DEFINITION
- checker = eipchecks.EIPConfigChecker(fetcher=requests)
- sampleconfig = testdata.EIP_SAMPLE_CONFIG
- checker.fetch_definition(config=sampleconfig)
-
- fn = os.path.join(baseconfig.get_default_provider_path(),
- DEFINITION_EXPECTED_PATH)
- with open(fn, 'r') as fp:
- deserialized = json.load(fp)
- self.assertEqual(DEFAULT_PROVIDER_DEFINITION, deserialized)
-
- # XXX TODO check for ConnectionError, HTTPError, InvalidUrl
- # (and proper EIPExceptions are raised).
- # Look at base.test_config.
-
- def test_fetch_eip_service_config(self):
- with patch.object(requests, "get") as mocked_get:
- mocked_get.return_value.status_code = 200
- mocked_get.return_value.headers = {
- 'last-modified': "Wed Dec 12 12:12:12 GMT 2012"}
- mocked_get.return_value.json = testdata.EIP_SAMPLE_SERVICE
- checker = eipchecks.EIPConfigChecker(fetcher=requests)
- sampleconfig = testdata.EIP_SAMPLE_CONFIG
- checker.fetch_eip_service_config(config=sampleconfig)
-
- def test_check_complete_eip_config(self):
- checker = eipchecks.EIPConfigChecker()
- with self.assertRaises(eipexceptions.EIPConfigurationError):
- sampleconfig = copy.copy(testdata.EIP_SAMPLE_CONFIG)
- sampleconfig['provider'] = None
- checker.check_complete_eip_config(config=sampleconfig)
- with self.assertRaises(eipexceptions.EIPConfigurationError):
- sampleconfig = copy.copy(testdata.EIP_SAMPLE_CONFIG)
- del sampleconfig['provider']
- checker.check_complete_eip_config(config=sampleconfig)
-
- # normal case
- sampleconfig = copy.copy(testdata.EIP_SAMPLE_CONFIG)
- checker.check_complete_eip_config(config=sampleconfig)
-
-
-class ProviderCertCheckerTest(BaseLeapTest):
-
- __name__ = "provider_cert_checker_tests"
- provider = "testprovider.example.org"
-
- def setUp(self):
- pass
-
- def tearDown(self):
- pass
-
- # test methods are there, and can be called from run_all
-
- def test_checker_should_implement_check_methods(self):
- checker = eipchecks.ProviderCertChecker()
-
- # For MVS+
- self.assertTrue(hasattr(checker, "download_ca_cert"),
- "missing meth")
- self.assertTrue(hasattr(checker, "download_ca_signature"),
- "missing meth")
- self.assertTrue(hasattr(checker, "get_ca_signatures"), "missing meth")
- self.assertTrue(hasattr(checker, "is_there_trust_path"),
- "missing meth")
-
- # For MVS
- self.assertTrue(hasattr(checker, "is_there_provider_ca"),
- "missing meth")
- self.assertTrue(hasattr(checker, "is_https_working"), "missing meth")
- self.assertTrue(hasattr(checker, "check_new_cert_needed"),
- "missing meth")
-
- def test_checker_should_actually_call_all_tests(self):
- checker = eipchecks.ProviderCertChecker()
-
- mc = Mock()
- checker.run_all(checker=mc)
- # XXX MVS+
- #self.assertTrue(mc.download_ca_cert.called, "not called")
- #self.assertTrue(mc.download_ca_signature.called, "not called")
- #self.assertTrue(mc.get_ca_signatures.called, "not called")
- #self.assertTrue(mc.is_there_trust_path.called, "not called")
-
- # For MVS
- self.assertTrue(mc.is_there_provider_ca.called, "not called")
- self.assertTrue(mc.is_https_working.called,
- "not called")
- self.assertTrue(mc.check_new_cert_needed.called,
- "not called")
-
- # test individual check methods
-
- @unittest.skip
- def test_is_there_provider_ca(self):
- # XXX commenting out this test.
- # With the generic client this does not make sense,
- # we should dump one there.
- # or test conductor logic.
- checker = eipchecks.ProviderCertChecker()
- self.assertTrue(
- checker.is_there_provider_ca())
-
-
-class ProviderCertCheckerHTTPSTests(BaseHTTPSServerTestCase, BaseLeapTest):
- provider = "testprovider.example.org"
-
- class request_handler(NoLogRequestHandler, BaseHTTPRequestHandler):
- responses = {
- '/': ['OK', ''],
- '/client.cert': [
- # XXX get sample cert
- '-----BEGIN CERTIFICATE-----',
- '-----END CERTIFICATE-----'],
- '/badclient.cert': [
- 'BADCERT']}
-
- def do_GET(self):
- path = urlparse.urlparse(self.path)
- message = '\n'.join(self.responses.get(
- path.path, None))
- self.send_response(200)
- self.end_headers()
- self.wfile.write(message)
-
- def test_is_https_working(self):
- fetcher = requests
- uri = "https://%s/" % (self.get_server())
- # bare requests call. this should just pass (if there is
- # an https service there).
- fetcher.get(uri, verify=False)
- checker = eipchecks.ProviderCertChecker(fetcher=fetcher)
- self.assertTrue(checker.is_https_working(uri=uri, verify=False))
-
- # for local debugs, when in doubt
- #self.assertTrue(checker.is_https_working(uri="https://github.com",
- #verify=True))
-
- # for the two checks below, I know they fail because no ca
- # cert is passed to them, and I know that's the error that
- # requests return with our implementation.
- # We're receiving this because our
- # server is dying prematurely when the handshake is interrupted on the
- # client side.
- # Since we have access to the server, we could check that
- # the error raised has been:
- # SSL23_READ_BYTES: alert bad certificate
- with self.assertRaises(requests.exceptions.SSLError) as exc:
- fetcher.get(uri, verify=True)
- self.assertTrue(
- "SSL23_GET_SERVER_HELLO:unknown protocol" in exc.message)
-
- # XXX FIXME! Uncomment after #638 is done
- #with self.assertRaises(eipexceptions.EIPBadCertError) as exc:
- #checker.is_https_working(uri=uri, verify=True)
- #self.assertTrue(
- #"cert verification failed" in exc.message)
-
- # get cacert from testing.https_server
- cacert = where_cert('cacert.pem')
- fetcher.get(uri, verify=cacert)
- self.assertTrue(checker.is_https_working(uri=uri, verify=cacert))
-
- # same, but get cacert from leap.custom
- # XXX TODO!
-
- @unittest.skip
- def test_download_new_client_cert(self):
- # FIXME
- # Magick srp decorator broken right now...
- # Have to mock the decorator and inject something that
- # can bypass the authentication
-
- uri = "https://%s/client.cert" % (self.get_server())
- cacert = where_cert('cacert.pem')
- checker = eipchecks.ProviderCertChecker(domain=self.provider)
- credentials = "testuser", "testpassword"
- self.assertTrue(checker.download_new_client_cert(
- credentials=credentials, uri=uri, verify=cacert))
-
- # now download a malformed cert
- uri = "https://%s/badclient.cert" % (self.get_server())
- cacert = where_cert('cacert.pem')
- checker = eipchecks.ProviderCertChecker()
- with self.assertRaises(ValueError):
- self.assertTrue(checker.download_new_client_cert(
- credentials=credentials, uri=uri, verify=cacert))
-
- # did we write cert to its path?
- clientcertfile = eipspecs.client_cert_path()
- self.assertTrue(os.path.isfile(clientcertfile))
- certfile = eipspecs.client_cert_path()
- with open(certfile, 'r') as cf:
- certcontent = cf.read()
- self.assertEqual(certcontent,
- '\n'.join(
- self.request_handler.responses['/client.cert']))
- os.remove(clientcertfile)
-
- def test_is_cert_valid(self):
- checker = eipchecks.ProviderCertChecker()
- # TODO: better exception catching
- # should raise eipexceptions.BadClientCertificate, and give reasons
- # on msg.
- with self.assertRaises(Exception) as exc:
- self.assertFalse(checker.is_cert_valid())
- exc.message = "missing cert"
-
- def test_bad_validity_certs(self):
- checker = eipchecks.ProviderCertChecker()
- certfile = where_cert('leaptestscert.pem')
- self.assertFalse(checker.is_cert_not_expired(
- certfile=certfile,
- now=lambda: time.mktime((2038, 1, 1, 1, 1, 1, 1, 1, 1))))
- self.assertFalse(checker.is_cert_not_expired(
- certfile=certfile,
- now=lambda: time.mktime((1970, 1, 1, 1, 1, 1, 1, 1, 1))))
-
- def test_check_new_cert_needed(self):
- # check: missing cert
- checker = eipchecks.ProviderCertChecker(domain=self.provider)
- self.assertTrue(checker.check_new_cert_needed(skip_download=True))
- # TODO check: malformed cert
- # TODO check: expired cert
- # TODO check: pass test server uri instead of skip
-
-
-if __name__ == "__main__":
- unittest.main()
diff --git a/src/leap/eip/tests/test_config.py b/src/leap/eip/tests/test_config.py
deleted file mode 100644
index 72ab3c8e..00000000
--- a/src/leap/eip/tests/test_config.py
+++ /dev/null
@@ -1,298 +0,0 @@
-from collections import OrderedDict
-import json
-import os
-import platform
-import stat
-
-try:
- import unittest2 as unittest
-except ImportError:
- import unittest
-
-#from leap.base import constants
-#from leap.eip import config as eip_config
-#from leap import __branding as BRANDING
-from leap.eip import config as eipconfig
-from leap.eip.tests.data import EIP_SAMPLE_CONFIG, EIP_SAMPLE_SERVICE
-from leap.testing.basetest import BaseLeapTest
-from leap.util.fileutil import mkdir_p, mkdir_f
-
-_system = platform.system()
-
-#PROVIDER = BRANDING.get('provider_domain')
-#PROVIDER_SHORTNAME = BRANDING.get('short_name')
-
-
-class EIPConfigTest(BaseLeapTest):
-
- __name__ = "eip_config_tests"
- provider = "testprovider.example.org"
-
- maxDiff = None
-
- def setUp(self):
- pass
-
- def tearDown(self):
- pass
-
- #
- # helpers
- #
-
- def touch_exec(self):
- path = os.path.join(
- self.tempdir, 'bin')
- mkdir_p(path)
- tfile = os.path.join(
- path,
- 'openvpn')
- open(tfile, 'wb').close()
- os.chmod(tfile, stat.S_IRUSR | stat.S_IWUSR | stat.S_IXUSR)
-
- def write_sample_eipservice(self, vpnciphers=False, extra_vpnopts=None,
- gateways=None):
- conf = eipconfig.EIPServiceConfig()
- mkdir_f(conf.filename)
- if gateways:
- EIP_SAMPLE_SERVICE['gateways'] = gateways
- if vpnciphers:
- openvpnconfig = OrderedDict({
- "auth": "SHA1",
- "cipher": "AES-128-CBC",
- "tls-cipher": "DHE-RSA-AES128-SHA"})
- if extra_vpnopts:
- for k, v in extra_vpnopts.items():
- openvpnconfig[k] = v
- EIP_SAMPLE_SERVICE['openvpn_configuration'] = openvpnconfig
-
- with open(conf.filename, 'w') as fd:
- fd.write(json.dumps(EIP_SAMPLE_SERVICE))
-
- def write_sample_eipconfig(self):
- conf = eipconfig.EIPConfig()
- folder, f = os.path.split(conf.filename)
- if not os.path.isdir(folder):
- mkdir_p(folder)
- with open(conf.filename, 'w') as fd:
- fd.write(json.dumps(EIP_SAMPLE_CONFIG))
-
- def get_expected_openvpn_args(self, with_openvpn_ciphers=False):
- """
- yeah, this is almost as duplicating the
- code for building the command
- """
- args = []
- eipconf = eipconfig.EIPConfig(domain=self.provider)
- eipconf.load()
- eipsconf = eipconfig.EIPServiceConfig(domain=self.provider)
- eipsconf.load()
-
- username = self.get_username()
- groupname = self.get_groupname()
-
- args.append('--client')
- args.append('--dev')
- #does this have to be tap for win??
- args.append('tun')
- args.append('--persist-tun')
- args.append('--persist-key')
- args.append('--remote')
-
- args.append('%s' % eipconfig.get_eip_gateway(
- eipconfig=eipconf,
- eipserviceconfig=eipsconf))
- # XXX get port!?
- args.append('1194')
- # XXX get proto
- args.append('udp')
- args.append('--tls-client')
- args.append('--remote-cert-tls')
- args.append('server')
-
- if with_openvpn_ciphers:
- CIPHERS = [
- "--tls-cipher", "DHE-RSA-AES128-SHA",
- "--cipher", "AES-128-CBC",
- "--auth", "SHA1"]
- for opt in CIPHERS:
- args.append(opt)
-
- args.append('--user')
- args.append(username)
- args.append('--group')
- args.append(groupname)
- args.append('--management-client-user')
- args.append(username)
- args.append('--management-signal')
-
- args.append('--management')
- #XXX hey!
- #get platform switches here!
- args.append('/tmp/test.socket')
- args.append('unix')
-
- args.append('--script-security')
- args.append('2')
-
- if _system == "Linux":
- UPDOWN_SCRIPT = "/etc/leap/resolv-update"
- if os.path.isfile(UPDOWN_SCRIPT):
- args.append('--up')
- args.append('/etc/leap/resolv-update')
- args.append('--down')
- args.append('/etc/leap/resolv-update')
- args.append('--plugin')
- args.append('/usr/lib/openvpn/openvpn-down-root.so')
- args.append("'script_type=down /etc/leap/resolv-update'")
-
- # certs
- # XXX get values from specs?
- args.append('--cert')
- args.append(os.path.join(
- self.home,
- '.config', 'leap', 'providers',
- '%s' % self.provider,
- 'keys', 'client',
- 'openvpn.pem'))
- args.append('--key')
- args.append(os.path.join(
- self.home,
- '.config', 'leap', 'providers',
- '%s' % self.provider,
- 'keys', 'client',
- 'openvpn.pem'))
- args.append('--ca')
- args.append(os.path.join(
- self.home,
- '.config', 'leap', 'providers',
- '%s' % self.provider,
- 'keys', 'ca',
- 'cacert.pem'))
- return args
-
- # build command string
- # these tests are going to have to check
- # many combinations. we should inject some
- # params in the function call, to disable
- # some checks.
-
- def test_get_eip_gateway(self):
- self.write_sample_eipconfig()
- eipconf = eipconfig.EIPConfig(domain=self.provider)
-
- # default eipservice
- self.write_sample_eipservice()
- eipsconf = eipconfig.EIPServiceConfig(domain=self.provider)
-
- gateway = eipconfig.get_eip_gateway(
- eipconfig=eipconf,
- eipserviceconfig=eipsconf)
-
- # in spec is local gateway by default
- self.assertEqual(gateway, '127.0.0.1')
-
- # change eipservice
- # right now we only check that cluster == selected primary gw in
- # eip.json, and pick first matching ip
- eipconf._config.config['primary_gateway'] = "foo_provider"
- newgateways = [{"cluster": "foo_provider",
- "ip_address": "127.0.0.99"}]
- self.write_sample_eipservice(gateways=newgateways)
- eipsconf = eipconfig.EIPServiceConfig(domain=self.provider)
- # load from disk file
- eipsconf.load()
-
- gateway = eipconfig.get_eip_gateway(
- eipconfig=eipconf,
- eipserviceconfig=eipsconf)
- self.assertEqual(gateway, '127.0.0.99')
-
- # change eipservice, several gateways
- # right now we only check that cluster == selected primary gw in
- # eip.json, and pick first matching ip
- eipconf._config.config['primary_gateway'] = "bar_provider"
- newgateways = [{"cluster": "foo_provider",
- "ip_address": "127.0.0.99"},
- {'cluster': "bar_provider",
- "ip_address": "127.0.0.88"}]
- self.write_sample_eipservice(gateways=newgateways)
- eipsconf = eipconfig.EIPServiceConfig(domain=self.provider)
- # load from disk file
- eipsconf.load()
-
- gateway = eipconfig.get_eip_gateway(
- eipconfig=eipconf,
- eipserviceconfig=eipsconf)
- self.assertEqual(gateway, '127.0.0.88')
-
- def test_build_ovpn_command_empty_config(self):
- self.touch_exec()
- self.write_sample_eipservice()
- self.write_sample_eipconfig()
-
- from leap.eip import config as eipconfig
- from leap.util.fileutil import which
- path = os.environ['PATH']
- vpnbin = which('openvpn', path=path)
- #print 'path =', path
- #print 'vpnbin = ', vpnbin
- vpncommand, vpnargs = eipconfig.build_ovpn_command(
- do_pkexec_check=False, vpnbin=vpnbin,
- socket_path="/tmp/test.socket",
- provider=self.provider)
- self.assertEqual(vpncommand, self.home + '/bin/openvpn')
- self.assertEqual(vpnargs, self.get_expected_openvpn_args())
-
- def test_build_ovpn_command_openvpnoptions(self):
- self.touch_exec()
-
- from leap.eip import config as eipconfig
- from leap.util.fileutil import which
- path = os.environ['PATH']
- vpnbin = which('openvpn', path=path)
-
- self.write_sample_eipconfig()
-
- # regular run, everything normal
- self.write_sample_eipservice(vpnciphers=True)
- vpncommand, vpnargs = eipconfig.build_ovpn_command(
- do_pkexec_check=False, vpnbin=vpnbin,
- socket_path="/tmp/test.socket",
- provider=self.provider)
- self.assertEqual(vpncommand, self.home + '/bin/openvpn')
- expected = self.get_expected_openvpn_args(
- with_openvpn_ciphers=True)
- self.assertEqual(vpnargs, expected)
-
- # bad options -- illegal options
- self.write_sample_eipservice(
- vpnciphers=True,
- # WE ONLY ALLOW vpn options in auth, cipher, tls-cipher
- extra_vpnopts={"notallowedconfig": "badvalue"})
- vpncommand, vpnargs = eipconfig.build_ovpn_command(
- do_pkexec_check=False, vpnbin=vpnbin,
- socket_path="/tmp/test.socket",
- provider=self.provider)
- self.assertEqual(vpncommand, self.home + '/bin/openvpn')
- expected = self.get_expected_openvpn_args(
- with_openvpn_ciphers=True)
- self.assertEqual(vpnargs, expected)
-
- # bad options -- illegal chars
- self.write_sample_eipservice(
- vpnciphers=True,
- # WE ONLY ALLOW A-Z09\-
- extra_vpnopts={"cipher": "AES-128-CBC;FOOTHING"})
- vpncommand, vpnargs = eipconfig.build_ovpn_command(
- do_pkexec_check=False, vpnbin=vpnbin,
- socket_path="/tmp/test.socket",
- provider=self.provider)
- self.assertEqual(vpncommand, self.home + '/bin/openvpn')
- expected = self.get_expected_openvpn_args(
- with_openvpn_ciphers=True)
- self.assertEqual(vpnargs, expected)
-
-
-if __name__ == "__main__":
- unittest.main()
diff --git a/src/leap/eip/tests/test_eipconnection.py b/src/leap/eip/tests/test_eipconnection.py
deleted file mode 100644
index 163f8d45..00000000
--- a/src/leap/eip/tests/test_eipconnection.py
+++ /dev/null
@@ -1,216 +0,0 @@
-import glob
-import logging
-import platform
-#import os
-import shutil
-
-logging.basicConfig()
-logger = logging.getLogger(name=__name__)
-
-try:
- import unittest2 as unittest
-except ImportError:
- import unittest
-
-from mock import Mock, patch # MagicMock
-
-from leap.eip.eipconnection import EIPConnection
-from leap.eip.exceptions import ConnectionRefusedError
-from leap.eip import specs as eipspecs
-from leap.testing.basetest import BaseLeapTest
-
-_system = platform.system()
-
-PROVIDER = "testprovider.example.org"
-
-
-class NotImplementedError(Exception):
- pass
-
-
-@patch('OpenVPNConnection._get_or_create_config')
-@patch('OpenVPNConnection._set_ovpn_command')
-class MockedEIPConnection(EIPConnection):
-
- def _set_ovpn_command(self):
- self.command = "mock_command"
- self.args = [1, 2, 3]
-
-
-class EIPConductorTest(BaseLeapTest):
-
- __name__ = "eip_conductor_tests"
- provider = PROVIDER
-
- def setUp(self):
- # XXX there's a conceptual/design
- # mistake here.
- # If we're testing just attrs after init,
- # init shold not be doing so much side effects.
-
- # for instance:
- # We have to TOUCH a keys file because
- # we're triggerig the key checks FROM
- # the constructor. me not like that,
- # key checker should better be called explicitelly.
-
- # XXX change to keys_checker invocation
- # (see config_checker)
-
- keyfiles = (eipspecs.provider_ca_path(domain=self.provider),
- eipspecs.client_cert_path(domain=self.provider))
- for filepath in keyfiles:
- self.touch(filepath)
- self.chmod600(filepath)
-
- # we init the manager with only
- # some methods mocked
- self.manager = Mock(name="openvpnmanager_mock")
- self.con = MockedEIPConnection()
- self.con.provider = self.provider
-
- # XXX watch out. This sometimes is throwing the following error:
- # NoSuchProcess: process no longer exists (pid=6571)
- # because of a bad implementation of _check_if_running_instance
-
- self.con.run_openvpn_checks()
-
- def tearDown(self):
- pass
-
- def doCleanups(self):
- super(BaseLeapTest, self).doCleanups()
- self.cleanupSocketDir()
- del self.con
-
- def cleanupSocketDir(self):
- ptt = ('/tmp/leap-tmp*')
- for tmpdir in glob.glob(ptt):
- shutil.rmtree(tmpdir)
-
- #
- # tests
- #
-
- def test_vpnconnection_defaults(self):
- """
- default attrs as expected
- """
- con = self.con
- self.assertEqual(con.autostart, True)
- # XXX moar!
-
- def test_ovpn_command(self):
- """
- set_ovpn_command called
- """
- self.assertEqual(self.con.command,
- "mock_command")
- self.assertEqual(self.con.args,
- [1, 2, 3])
-
- # config checks
-
- def test_config_checked_called(self):
- # XXX this single test is taking half of the time
- # needed to run tests. (roughly 3 secs for this only)
- # We should modularize and inject Mocks on more places.
-
- oldcon = self.con
- del(self.con)
- config_checker = Mock()
- self.con = MockedEIPConnection(config_checker=config_checker)
- self.assertTrue(config_checker.called)
- self.con.run_checks()
- self.con.config_checker.run_all.assert_called_with(
- skip_download=False)
-
- # XXX test for cert_checker also
- self.con = oldcon
-
- # connect/disconnect calls
-
- def test_disconnect(self):
- """
- disconnect method calls private and changes status
- """
- self.con._disconnect = Mock(
- name="_disconnect")
-
- # first we set status to connected
- self.con.status.set_current(self.con.status.CONNECTED)
- self.assertEqual(self.con.status.current,
- self.con.status.CONNECTED)
-
- # disconnect
- self.con.terminate_openvpn_connection = Mock()
- self.con.disconnect()
- self.con.terminate_openvpn_connection.assert_called_once_with(
- shutdown=False)
- self.con.terminate_openvpn_connection = Mock()
- self.con.disconnect(shutdown=True)
- self.con.terminate_openvpn_connection.assert_called_once_with(
- shutdown=True)
-
- # new status should be disconnected
- # XXX this should evolve and check no errors
- # during disconnection
- self.assertEqual(self.con.status.current,
- self.con.status.DISCONNECTED)
-
- def test_connect(self):
- """
- connect calls _launch_openvpn private
- """
- self.con._launch_openvpn = Mock()
- self.con.connect()
- self.con._launch_openvpn.assert_called_once_with()
-
- # XXX tests breaking here ...
-
- def test_good_poll_connection_state(self):
- """
- """
- #@patch --
- # self.manager.get_connection_state
-
- #XXX review this set of poll_state tests
- #they SHOULD NOT NEED TO MOCK ANYTHING IN THE
- #lower layers!! -- status, vpn_manager..
- #right now we're testing implementation, not
- #behavior!!!
- good_state = ["1345466946", "unknown_state", "ok",
- "192.168.1.1", "192.168.1.100"]
- self.con.get_connection_state = Mock(return_value=good_state)
- self.con.status.set_vpn_state = Mock()
-
- state = self.con.poll_connection_state()
- good_state[1] = "disconnected"
- final_state = tuple(good_state)
- self.con.status.set_vpn_state.assert_called_with("unknown_state")
- self.assertEqual(state, final_state)
-
- # TODO between "good" and "bad" (exception raised) cases,
- # we can still test for malformed states and see that only good
- # states do have a change (and from only the expected transition
- # states).
-
- def test_bad_poll_connection_state(self):
- """
- get connection state raises ConnectionRefusedError
- state is None
- """
- self.con.get_connection_state = Mock(
- side_effect=ConnectionRefusedError('foo!'))
- state = self.con.poll_connection_state()
- self.assertEqual(state, None)
-
-
- # XXX more things to test:
- # - called config routines during initz.
- # - raising proper exceptions with no config
- # - called proper checks on config / permissions
-
-
-if __name__ == "__main__":
- unittest.main()
diff --git a/src/leap/eip/tests/test_openvpnconnection.py b/src/leap/eip/tests/test_openvpnconnection.py
deleted file mode 100644
index 95bfb2f0..00000000
--- a/src/leap/eip/tests/test_openvpnconnection.py
+++ /dev/null
@@ -1,161 +0,0 @@
-import logging
-import os
-import platform
-import psutil
-import shutil
-#import socket
-
-logging.basicConfig()
-logger = logging.getLogger(name=__name__)
-
-try:
- import unittest2 as unittest
-except ImportError:
- import unittest
-
-from mock import Mock, patch # MagicMock
-
-from leap.eip import config as eipconfig
-from leap.eip import openvpnconnection
-from leap.eip import exceptions as eipexceptions
-from leap.eip.udstelnet import UDSTelnet
-from leap.testing.basetest import BaseLeapTest
-
-_system = platform.system()
-
-
-class NotImplementedError(Exception):
- pass
-
-
-mock_UDSTelnet = Mock(spec=UDSTelnet)
-# XXX cautious!!!
-# this might be fragile right now (counting a global
-# reference of calls I think.
-# investigate this other form instead:
-# http://www.voidspace.org.uk/python/mock/patch.html#start-and-stop
-
-# XXX redo after merge-refactor
-
-
-@patch('openvpnconnection.OpenVPNConnection.connect_to_management')
-class MockedOpenVPNConnection(openvpnconnection.OpenVPNConnection):
- def __init__(self, *args, **kwargs):
- self.mock_UDSTelnet = Mock()
- super(MockedOpenVPNConnection, self).__init__(
- *args, **kwargs)
- self.tn = self.mock_UDSTelnet(self.host, self.port)
-
- def connect_to_management(self):
- #print 'patched connect'
- self.tn = mock_UDSTelnet(self.host, port=self.port)
-
-
-class OpenVPNConnectionTest(BaseLeapTest):
-
- __name__ = "vpnconnection_tests"
-
- def setUp(self):
- # XXX this will have to change for win, host=localhost
- host = eipconfig.get_socket_path()
- self.host = host
- self.manager = MockedOpenVPNConnection(host=host)
-
- def tearDown(self):
- pass
-
- def doCleanups(self):
- super(BaseLeapTest, self).doCleanups()
- self.cleanupSocketDir()
-
- def cleanupSocketDir(self):
- # remove the socket folder.
- # XXX only if posix. in win, host is localhost, so nothing
- # has to be done.
- if self.host:
- folder, fpath = os.path.split(self.host)
- try:
- assert folder.startswith('/tmp/leap-tmp') # safety check
- shutil.rmtree(folder)
- except:
- self.fail("could not remove temp file")
-
- del self.manager
-
- #
- # tests
- #
-
- def test_detect_vpn(self):
- # XXX review, not sure if captured all the logic
- # while fixing. kali.
- openvpn_connection = openvpnconnection.OpenVPNConnection()
-
- with patch.object(psutil, "process_iter") as mocked_psutil:
- mocked_process = Mock()
- mocked_process.name = "openvpn"
- mocked_process.cmdline = ["openvpn", "-foo", "-bar", "-gaaz"]
- mocked_psutil.return_value = [mocked_process]
- with self.assertRaises(eipexceptions.OpenVPNAlreadyRunning):
- openvpn_connection._check_if_running_instance()
-
- openvpn_connection._check_if_running_instance()
-
- @unittest.skipIf(_system == "Windows", "lin/mac only")
- def test_lin_mac_default_init(self):
- """
- check default host for management iface
- """
- self.assertTrue(self.manager.host.startswith('/tmp/leap-tmp'))
- self.assertEqual(self.manager.port, 'unix')
-
- @unittest.skipUnless(_system == "Windows", "win only")
- def test_win_default_init(self):
- """
- check default host for management iface
- """
- # XXX should we make the platform specific switch
- # here or in the vpn command string building?
- self.assertEqual(self.manager.host, 'localhost')
- self.assertEqual(self.manager.port, 7777)
-
- def test_port_types_init(self):
- oldmanager = self.manager
- self.manager = MockedOpenVPNConnection(port="42")
- self.assertEqual(self.manager.port, 42)
- self.manager = MockedOpenVPNConnection()
- self.assertEqual(self.manager.port, "unix")
- self.manager = MockedOpenVPNConnection(port="bad")
- self.assertEqual(self.manager.port, None)
- self.manager = oldmanager
-
- def test_uds_telnet_called_on_connect(self):
- self.manager.connect_to_management()
- mock_UDSTelnet.assert_called_with(
- self.manager.host,
- port=self.manager.port)
-
- @unittest.skip
- def test_connect(self):
- raise NotImplementedError
- # XXX calls close
- # calls UDSTelnet mock.
-
- # XXX
- # tests to write:
- # UDSTelnetTest (for real?)
- # HAVE A LOOK AT CORE TESTS FOR TELNETLIB.
- # very illustrative instead...
-
- # - raise MissingSocket
- # - raise ConnectionRefusedError
- # - test send command
- # - tries connect
- # - ... tries?
- # - ... calls _seek_to_eof
- # - ... read_until --> return value
- # - ...
-
-
-if __name__ == "__main__":
- unittest.main()