summaryrefslogtreecommitdiff
path: root/src/leap/eip/tests
diff options
context:
space:
mode:
Diffstat (limited to 'src/leap/eip/tests')
-rw-r--r--src/leap/eip/tests/__init__.py0
-rw-r--r--src/leap/eip/tests/data.py51
-rw-r--r--src/leap/eip/tests/test_checks.py372
-rw-r--r--src/leap/eip/tests/test_config.py298
-rw-r--r--src/leap/eip/tests/test_eipconnection.py216
-rw-r--r--src/leap/eip/tests/test_openvpnconnection.py161
6 files changed, 1098 insertions, 0 deletions
diff --git a/src/leap/eip/tests/__init__.py b/src/leap/eip/tests/__init__.py
new file mode 100644
index 00000000..e69de29b
--- /dev/null
+++ b/src/leap/eip/tests/__init__.py
diff --git a/src/leap/eip/tests/data.py b/src/leap/eip/tests/data.py
new file mode 100644
index 00000000..a7fe1853
--- /dev/null
+++ b/src/leap/eip/tests/data.py
@@ -0,0 +1,51 @@
+from __future__ import unicode_literals
+import os
+
+#from leap import __branding
+
+# sample data used in tests
+
+#PROVIDER = __branding.get('provider_domain')
+PROVIDER = "testprovider.example.org"
+
+EIP_SAMPLE_CONFIG = {
+ "provider": "%s" % PROVIDER,
+ "transport": "openvpn",
+ "openvpn_protocol": "tcp",
+ "openvpn_port": 80,
+ "openvpn_ca_certificate": os.path.expanduser(
+ "~/.config/leap/providers/"
+ "%s/"
+ "keys/ca/cacert.pem" % PROVIDER),
+ "openvpn_client_certificate": os.path.expanduser(
+ "~/.config/leap/providers/"
+ "%s/"
+ "keys/client/openvpn.pem" % PROVIDER),
+ "connect_on_login": True,
+ "block_cleartext_traffic": True,
+ "primary_gateway": "location_unknown",
+ "secondary_gateway": "location_unknown2",
+ #"management_password": "oph7Que1othahwiech6J"
+}
+
+EIP_SAMPLE_SERVICE = {
+ "serial": 1,
+ "version": 1,
+ "clusters": [
+ {"label": {
+ "en": "Location Unknown"},
+ "name": "location_unknown"}
+ ],
+ "gateways": [
+ {"capabilities": {
+ "adblock": True,
+ "filter_dns": True,
+ "ports": ["80", "53", "443", "1194"],
+ "protocols": ["udp", "tcp"],
+ "transport": ["openvpn"],
+ "user_ips": False},
+ "cluster": "location_unknown",
+ "host": "location.example.org",
+ "ip_address": "192.0.43.10"}
+ ]
+}
diff --git a/src/leap/eip/tests/test_checks.py b/src/leap/eip/tests/test_checks.py
new file mode 100644
index 00000000..f42a0eeb
--- /dev/null
+++ b/src/leap/eip/tests/test_checks.py
@@ -0,0 +1,372 @@
+from BaseHTTPServer import BaseHTTPRequestHandler
+import copy
+import json
+try:
+ import unittest2 as unittest
+except ImportError:
+ import unittest
+import os
+import time
+import urlparse
+
+from mock import (patch, Mock)
+
+import requests
+
+from leap.base import config as baseconfig
+from leap.base import pluggableconfig
+from leap.base.constants import (DEFAULT_PROVIDER_DEFINITION,
+ DEFINITION_EXPECTED_PATH)
+from leap.eip import checks as eipchecks
+from leap.eip import specs as eipspecs
+from leap.eip import exceptions as eipexceptions
+from leap.eip.tests import data as testdata
+from leap.testing.basetest import BaseLeapTest
+from leap.testing.https_server import BaseHTTPSServerTestCase
+from leap.testing.https_server import where as where_cert
+from leap.util.fileutil import mkdir_f
+
+
+class NoLogRequestHandler:
+ def log_message(self, *args):
+ # don't write log msg to stderr
+ pass
+
+ def read(self, n=None):
+ return ''
+
+
+class EIPCheckTest(BaseLeapTest):
+
+ __name__ = "eip_check_tests"
+ provider = "testprovider.example.org"
+ maxDiff = None
+
+ def setUp(self):
+ pass
+
+ def tearDown(self):
+ pass
+
+ # test methods are there, and can be called from run_all
+
+ def test_checker_should_implement_check_methods(self):
+ checker = eipchecks.EIPConfigChecker(domain=self.provider)
+
+ self.assertTrue(hasattr(checker, "check_default_eipconfig"),
+ "missing meth")
+ self.assertTrue(hasattr(checker, "check_is_there_default_provider"),
+ "missing meth")
+ self.assertTrue(hasattr(checker, "fetch_definition"), "missing meth")
+ self.assertTrue(hasattr(checker, "fetch_eip_service_config"),
+ "missing meth")
+ self.assertTrue(hasattr(checker, "check_complete_eip_config"),
+ "missing meth")
+
+ def test_checker_should_actually_call_all_tests(self):
+ checker = eipchecks.EIPConfigChecker(domain=self.provider)
+
+ mc = Mock()
+ checker.run_all(checker=mc)
+ self.assertTrue(mc.check_default_eipconfig.called, "not called")
+ self.assertTrue(mc.check_is_there_default_provider.called,
+ "not called")
+ self.assertTrue(mc.fetch_definition.called,
+ "not called")
+ self.assertTrue(mc.fetch_eip_service_config.called,
+ "not called")
+ self.assertTrue(mc.check_complete_eip_config.called,
+ "not called")
+
+ # test individual check methods
+
+ def test_check_default_eipconfig(self):
+ checker = eipchecks.EIPConfigChecker(domain=self.provider)
+ # no eip config (empty home)
+ eipconfig_path = checker.eipconfig.filename
+ self.assertFalse(os.path.isfile(eipconfig_path))
+ checker.check_default_eipconfig()
+ # we've written one, so it should be there.
+ self.assertTrue(os.path.isfile(eipconfig_path))
+ with open(eipconfig_path, 'rb') as fp:
+ deserialized = json.load(fp)
+
+ # force re-evaluation of the paths
+ # small workaround for evaluating home dirs correctly
+ EIP_SAMPLE_CONFIG = copy.copy(testdata.EIP_SAMPLE_CONFIG)
+ EIP_SAMPLE_CONFIG['openvpn_client_certificate'] = \
+ eipspecs.client_cert_path(self.provider)
+ EIP_SAMPLE_CONFIG['openvpn_ca_certificate'] = \
+ eipspecs.provider_ca_path(self.provider)
+ self.assertEqual(deserialized, EIP_SAMPLE_CONFIG)
+
+ # TODO: shold ALSO run validation methods.
+
+ def test_check_is_there_default_provider(self):
+ checker = eipchecks.EIPConfigChecker(domain=self.provider)
+ # we do dump a sample eip config, but lacking a
+ # default provider entry.
+ # This error will be possible catched in a different
+ # place, when JSONConfig does validation of required fields.
+
+ # passing direct config
+ with self.assertRaises(eipexceptions.EIPMissingDefaultProvider):
+ checker.check_is_there_default_provider(config={})
+
+ # ok. now, messing with real files...
+ # blank out default_provider
+ sampleconfig = copy.copy(testdata.EIP_SAMPLE_CONFIG)
+ sampleconfig['provider'] = None
+ eipcfg_path = checker.eipconfig.filename
+ mkdir_f(eipcfg_path)
+ with open(eipcfg_path, 'w') as fp:
+ json.dump(sampleconfig, fp)
+ #with self.assertRaises(eipexceptions.EIPMissingDefaultProvider):
+ # XXX we should catch this as one of our errors, but do not
+ # see how to do it quickly.
+ with self.assertRaises(pluggableconfig.ValidationError):
+ #import ipdb;ipdb.set_trace()
+ checker.eipconfig.load(fromfile=eipcfg_path)
+ checker.check_is_there_default_provider()
+
+ sampleconfig = testdata.EIP_SAMPLE_CONFIG
+ #eipcfg_path = checker._get_default_eipconfig_path()
+ with open(eipcfg_path, 'w') as fp:
+ json.dump(sampleconfig, fp)
+ checker.eipconfig.load()
+ self.assertTrue(checker.check_is_there_default_provider())
+
+ def test_fetch_definition(self):
+ with patch.object(requests, "get") as mocked_get:
+ mocked_get.return_value.status_code = 200
+ mocked_get.return_value.headers = {
+ 'last-modified': "Wed Dec 12 12:12:12 GMT 2012"}
+ mocked_get.return_value.json = DEFAULT_PROVIDER_DEFINITION
+ checker = eipchecks.EIPConfigChecker(fetcher=requests)
+ sampleconfig = testdata.EIP_SAMPLE_CONFIG
+ checker.fetch_definition(config=sampleconfig)
+
+ fn = os.path.join(baseconfig.get_default_provider_path(),
+ DEFINITION_EXPECTED_PATH)
+ with open(fn, 'r') as fp:
+ deserialized = json.load(fp)
+ self.assertEqual(DEFAULT_PROVIDER_DEFINITION, deserialized)
+
+ # XXX TODO check for ConnectionError, HTTPError, InvalidUrl
+ # (and proper EIPExceptions are raised).
+ # Look at base.test_config.
+
+ def test_fetch_eip_service_config(self):
+ with patch.object(requests, "get") as mocked_get:
+ mocked_get.return_value.status_code = 200
+ mocked_get.return_value.headers = {
+ 'last-modified': "Wed Dec 12 12:12:12 GMT 2012"}
+ mocked_get.return_value.json = testdata.EIP_SAMPLE_SERVICE
+ checker = eipchecks.EIPConfigChecker(fetcher=requests)
+ sampleconfig = testdata.EIP_SAMPLE_CONFIG
+ checker.fetch_eip_service_config(config=sampleconfig)
+
+ def test_check_complete_eip_config(self):
+ checker = eipchecks.EIPConfigChecker()
+ with self.assertRaises(eipexceptions.EIPConfigurationError):
+ sampleconfig = copy.copy(testdata.EIP_SAMPLE_CONFIG)
+ sampleconfig['provider'] = None
+ checker.check_complete_eip_config(config=sampleconfig)
+ with self.assertRaises(eipexceptions.EIPConfigurationError):
+ sampleconfig = copy.copy(testdata.EIP_SAMPLE_CONFIG)
+ del sampleconfig['provider']
+ checker.check_complete_eip_config(config=sampleconfig)
+
+ # normal case
+ sampleconfig = copy.copy(testdata.EIP_SAMPLE_CONFIG)
+ checker.check_complete_eip_config(config=sampleconfig)
+
+
+class ProviderCertCheckerTest(BaseLeapTest):
+
+ __name__ = "provider_cert_checker_tests"
+ provider = "testprovider.example.org"
+
+ def setUp(self):
+ pass
+
+ def tearDown(self):
+ pass
+
+ # test methods are there, and can be called from run_all
+
+ def test_checker_should_implement_check_methods(self):
+ checker = eipchecks.ProviderCertChecker()
+
+ # For MVS+
+ self.assertTrue(hasattr(checker, "download_ca_cert"),
+ "missing meth")
+ self.assertTrue(hasattr(checker, "download_ca_signature"),
+ "missing meth")
+ self.assertTrue(hasattr(checker, "get_ca_signatures"), "missing meth")
+ self.assertTrue(hasattr(checker, "is_there_trust_path"),
+ "missing meth")
+
+ # For MVS
+ self.assertTrue(hasattr(checker, "is_there_provider_ca"),
+ "missing meth")
+ self.assertTrue(hasattr(checker, "is_https_working"), "missing meth")
+ self.assertTrue(hasattr(checker, "check_new_cert_needed"),
+ "missing meth")
+
+ def test_checker_should_actually_call_all_tests(self):
+ checker = eipchecks.ProviderCertChecker()
+
+ mc = Mock()
+ checker.run_all(checker=mc)
+ # XXX MVS+
+ #self.assertTrue(mc.download_ca_cert.called, "not called")
+ #self.assertTrue(mc.download_ca_signature.called, "not called")
+ #self.assertTrue(mc.get_ca_signatures.called, "not called")
+ #self.assertTrue(mc.is_there_trust_path.called, "not called")
+
+ # For MVS
+ self.assertTrue(mc.is_there_provider_ca.called, "not called")
+ self.assertTrue(mc.is_https_working.called,
+ "not called")
+ self.assertTrue(mc.check_new_cert_needed.called,
+ "not called")
+
+ # test individual check methods
+
+ @unittest.skip
+ def test_is_there_provider_ca(self):
+ # XXX commenting out this test.
+ # With the generic client this does not make sense,
+ # we should dump one there.
+ # or test conductor logic.
+ checker = eipchecks.ProviderCertChecker()
+ self.assertTrue(
+ checker.is_there_provider_ca())
+
+
+class ProviderCertCheckerHTTPSTests(BaseHTTPSServerTestCase, BaseLeapTest):
+ provider = "testprovider.example.org"
+
+ class request_handler(NoLogRequestHandler, BaseHTTPRequestHandler):
+ responses = {
+ '/': ['OK', ''],
+ '/client.cert': [
+ # XXX get sample cert
+ '-----BEGIN CERTIFICATE-----',
+ '-----END CERTIFICATE-----'],
+ '/badclient.cert': [
+ 'BADCERT']}
+
+ def do_GET(self):
+ path = urlparse.urlparse(self.path)
+ message = '\n'.join(self.responses.get(
+ path.path, None))
+ self.send_response(200)
+ self.end_headers()
+ self.wfile.write(message)
+
+ def test_is_https_working(self):
+ fetcher = requests
+ uri = "https://%s/" % (self.get_server())
+ # bare requests call. this should just pass (if there is
+ # an https service there).
+ fetcher.get(uri, verify=False)
+ checker = eipchecks.ProviderCertChecker(fetcher=fetcher)
+ self.assertTrue(checker.is_https_working(uri=uri, verify=False))
+
+ # for local debugs, when in doubt
+ #self.assertTrue(checker.is_https_working(uri="https://github.com",
+ #verify=True))
+
+ # for the two checks below, I know they fail because no ca
+ # cert is passed to them, and I know that's the error that
+ # requests return with our implementation.
+ # We're receiving this because our
+ # server is dying prematurely when the handshake is interrupted on the
+ # client side.
+ # Since we have access to the server, we could check that
+ # the error raised has been:
+ # SSL23_READ_BYTES: alert bad certificate
+ with self.assertRaises(requests.exceptions.SSLError) as exc:
+ fetcher.get(uri, verify=True)
+ self.assertTrue(
+ "SSL23_GET_SERVER_HELLO:unknown protocol" in exc.message)
+
+ # XXX FIXME! Uncomment after #638 is done
+ #with self.assertRaises(eipexceptions.EIPBadCertError) as exc:
+ #checker.is_https_working(uri=uri, verify=True)
+ #self.assertTrue(
+ #"cert verification failed" in exc.message)
+
+ # get cacert from testing.https_server
+ cacert = where_cert('cacert.pem')
+ fetcher.get(uri, verify=cacert)
+ self.assertTrue(checker.is_https_working(uri=uri, verify=cacert))
+
+ # same, but get cacert from leap.custom
+ # XXX TODO!
+
+ @unittest.skip
+ def test_download_new_client_cert(self):
+ # FIXME
+ # Magick srp decorator broken right now...
+ # Have to mock the decorator and inject something that
+ # can bypass the authentication
+
+ uri = "https://%s/client.cert" % (self.get_server())
+ cacert = where_cert('cacert.pem')
+ checker = eipchecks.ProviderCertChecker(domain=self.provider)
+ credentials = "testuser", "testpassword"
+ self.assertTrue(checker.download_new_client_cert(
+ credentials=credentials, uri=uri, verify=cacert))
+
+ # now download a malformed cert
+ uri = "https://%s/badclient.cert" % (self.get_server())
+ cacert = where_cert('cacert.pem')
+ checker = eipchecks.ProviderCertChecker()
+ with self.assertRaises(ValueError):
+ self.assertTrue(checker.download_new_client_cert(
+ credentials=credentials, uri=uri, verify=cacert))
+
+ # did we write cert to its path?
+ clientcertfile = eipspecs.client_cert_path()
+ self.assertTrue(os.path.isfile(clientcertfile))
+ certfile = eipspecs.client_cert_path()
+ with open(certfile, 'r') as cf:
+ certcontent = cf.read()
+ self.assertEqual(certcontent,
+ '\n'.join(
+ self.request_handler.responses['/client.cert']))
+ os.remove(clientcertfile)
+
+ def test_is_cert_valid(self):
+ checker = eipchecks.ProviderCertChecker()
+ # TODO: better exception catching
+ # should raise eipexceptions.BadClientCertificate, and give reasons
+ # on msg.
+ with self.assertRaises(Exception) as exc:
+ self.assertFalse(checker.is_cert_valid())
+ exc.message = "missing cert"
+
+ def test_bad_validity_certs(self):
+ checker = eipchecks.ProviderCertChecker()
+ certfile = where_cert('leaptestscert.pem')
+ self.assertFalse(checker.is_cert_not_expired(
+ certfile=certfile,
+ now=lambda: time.mktime((2038, 1, 1, 1, 1, 1, 1, 1, 1))))
+ self.assertFalse(checker.is_cert_not_expired(
+ certfile=certfile,
+ now=lambda: time.mktime((1970, 1, 1, 1, 1, 1, 1, 1, 1))))
+
+ def test_check_new_cert_needed(self):
+ # check: missing cert
+ checker = eipchecks.ProviderCertChecker(domain=self.provider)
+ self.assertTrue(checker.check_new_cert_needed(skip_download=True))
+ # TODO check: malformed cert
+ # TODO check: expired cert
+ # TODO check: pass test server uri instead of skip
+
+
+if __name__ == "__main__":
+ unittest.main()
diff --git a/src/leap/eip/tests/test_config.py b/src/leap/eip/tests/test_config.py
new file mode 100644
index 00000000..72ab3c8e
--- /dev/null
+++ b/src/leap/eip/tests/test_config.py
@@ -0,0 +1,298 @@
+from collections import OrderedDict
+import json
+import os
+import platform
+import stat
+
+try:
+ import unittest2 as unittest
+except ImportError:
+ import unittest
+
+#from leap.base import constants
+#from leap.eip import config as eip_config
+#from leap import __branding as BRANDING
+from leap.eip import config as eipconfig
+from leap.eip.tests.data import EIP_SAMPLE_CONFIG, EIP_SAMPLE_SERVICE
+from leap.testing.basetest import BaseLeapTest
+from leap.util.fileutil import mkdir_p, mkdir_f
+
+_system = platform.system()
+
+#PROVIDER = BRANDING.get('provider_domain')
+#PROVIDER_SHORTNAME = BRANDING.get('short_name')
+
+
+class EIPConfigTest(BaseLeapTest):
+
+ __name__ = "eip_config_tests"
+ provider = "testprovider.example.org"
+
+ maxDiff = None
+
+ def setUp(self):
+ pass
+
+ def tearDown(self):
+ pass
+
+ #
+ # helpers
+ #
+
+ def touch_exec(self):
+ path = os.path.join(
+ self.tempdir, 'bin')
+ mkdir_p(path)
+ tfile = os.path.join(
+ path,
+ 'openvpn')
+ open(tfile, 'wb').close()
+ os.chmod(tfile, stat.S_IRUSR | stat.S_IWUSR | stat.S_IXUSR)
+
+ def write_sample_eipservice(self, vpnciphers=False, extra_vpnopts=None,
+ gateways=None):
+ conf = eipconfig.EIPServiceConfig()
+ mkdir_f(conf.filename)
+ if gateways:
+ EIP_SAMPLE_SERVICE['gateways'] = gateways
+ if vpnciphers:
+ openvpnconfig = OrderedDict({
+ "auth": "SHA1",
+ "cipher": "AES-128-CBC",
+ "tls-cipher": "DHE-RSA-AES128-SHA"})
+ if extra_vpnopts:
+ for k, v in extra_vpnopts.items():
+ openvpnconfig[k] = v
+ EIP_SAMPLE_SERVICE['openvpn_configuration'] = openvpnconfig
+
+ with open(conf.filename, 'w') as fd:
+ fd.write(json.dumps(EIP_SAMPLE_SERVICE))
+
+ def write_sample_eipconfig(self):
+ conf = eipconfig.EIPConfig()
+ folder, f = os.path.split(conf.filename)
+ if not os.path.isdir(folder):
+ mkdir_p(folder)
+ with open(conf.filename, 'w') as fd:
+ fd.write(json.dumps(EIP_SAMPLE_CONFIG))
+
+ def get_expected_openvpn_args(self, with_openvpn_ciphers=False):
+ """
+ yeah, this is almost as duplicating the
+ code for building the command
+ """
+ args = []
+ eipconf = eipconfig.EIPConfig(domain=self.provider)
+ eipconf.load()
+ eipsconf = eipconfig.EIPServiceConfig(domain=self.provider)
+ eipsconf.load()
+
+ username = self.get_username()
+ groupname = self.get_groupname()
+
+ args.append('--client')
+ args.append('--dev')
+ #does this have to be tap for win??
+ args.append('tun')
+ args.append('--persist-tun')
+ args.append('--persist-key')
+ args.append('--remote')
+
+ args.append('%s' % eipconfig.get_eip_gateway(
+ eipconfig=eipconf,
+ eipserviceconfig=eipsconf))
+ # XXX get port!?
+ args.append('1194')
+ # XXX get proto
+ args.append('udp')
+ args.append('--tls-client')
+ args.append('--remote-cert-tls')
+ args.append('server')
+
+ if with_openvpn_ciphers:
+ CIPHERS = [
+ "--tls-cipher", "DHE-RSA-AES128-SHA",
+ "--cipher", "AES-128-CBC",
+ "--auth", "SHA1"]
+ for opt in CIPHERS:
+ args.append(opt)
+
+ args.append('--user')
+ args.append(username)
+ args.append('--group')
+ args.append(groupname)
+ args.append('--management-client-user')
+ args.append(username)
+ args.append('--management-signal')
+
+ args.append('--management')
+ #XXX hey!
+ #get platform switches here!
+ args.append('/tmp/test.socket')
+ args.append('unix')
+
+ args.append('--script-security')
+ args.append('2')
+
+ if _system == "Linux":
+ UPDOWN_SCRIPT = "/etc/leap/resolv-update"
+ if os.path.isfile(UPDOWN_SCRIPT):
+ args.append('--up')
+ args.append('/etc/leap/resolv-update')
+ args.append('--down')
+ args.append('/etc/leap/resolv-update')
+ args.append('--plugin')
+ args.append('/usr/lib/openvpn/openvpn-down-root.so')
+ args.append("'script_type=down /etc/leap/resolv-update'")
+
+ # certs
+ # XXX get values from specs?
+ args.append('--cert')
+ args.append(os.path.join(
+ self.home,
+ '.config', 'leap', 'providers',
+ '%s' % self.provider,
+ 'keys', 'client',
+ 'openvpn.pem'))
+ args.append('--key')
+ args.append(os.path.join(
+ self.home,
+ '.config', 'leap', 'providers',
+ '%s' % self.provider,
+ 'keys', 'client',
+ 'openvpn.pem'))
+ args.append('--ca')
+ args.append(os.path.join(
+ self.home,
+ '.config', 'leap', 'providers',
+ '%s' % self.provider,
+ 'keys', 'ca',
+ 'cacert.pem'))
+ return args
+
+ # build command string
+ # these tests are going to have to check
+ # many combinations. we should inject some
+ # params in the function call, to disable
+ # some checks.
+
+ def test_get_eip_gateway(self):
+ self.write_sample_eipconfig()
+ eipconf = eipconfig.EIPConfig(domain=self.provider)
+
+ # default eipservice
+ self.write_sample_eipservice()
+ eipsconf = eipconfig.EIPServiceConfig(domain=self.provider)
+
+ gateway = eipconfig.get_eip_gateway(
+ eipconfig=eipconf,
+ eipserviceconfig=eipsconf)
+
+ # in spec is local gateway by default
+ self.assertEqual(gateway, '127.0.0.1')
+
+ # change eipservice
+ # right now we only check that cluster == selected primary gw in
+ # eip.json, and pick first matching ip
+ eipconf._config.config['primary_gateway'] = "foo_provider"
+ newgateways = [{"cluster": "foo_provider",
+ "ip_address": "127.0.0.99"}]
+ self.write_sample_eipservice(gateways=newgateways)
+ eipsconf = eipconfig.EIPServiceConfig(domain=self.provider)
+ # load from disk file
+ eipsconf.load()
+
+ gateway = eipconfig.get_eip_gateway(
+ eipconfig=eipconf,
+ eipserviceconfig=eipsconf)
+ self.assertEqual(gateway, '127.0.0.99')
+
+ # change eipservice, several gateways
+ # right now we only check that cluster == selected primary gw in
+ # eip.json, and pick first matching ip
+ eipconf._config.config['primary_gateway'] = "bar_provider"
+ newgateways = [{"cluster": "foo_provider",
+ "ip_address": "127.0.0.99"},
+ {'cluster': "bar_provider",
+ "ip_address": "127.0.0.88"}]
+ self.write_sample_eipservice(gateways=newgateways)
+ eipsconf = eipconfig.EIPServiceConfig(domain=self.provider)
+ # load from disk file
+ eipsconf.load()
+
+ gateway = eipconfig.get_eip_gateway(
+ eipconfig=eipconf,
+ eipserviceconfig=eipsconf)
+ self.assertEqual(gateway, '127.0.0.88')
+
+ def test_build_ovpn_command_empty_config(self):
+ self.touch_exec()
+ self.write_sample_eipservice()
+ self.write_sample_eipconfig()
+
+ from leap.eip import config as eipconfig
+ from leap.util.fileutil import which
+ path = os.environ['PATH']
+ vpnbin = which('openvpn', path=path)
+ #print 'path =', path
+ #print 'vpnbin = ', vpnbin
+ vpncommand, vpnargs = eipconfig.build_ovpn_command(
+ do_pkexec_check=False, vpnbin=vpnbin,
+ socket_path="/tmp/test.socket",
+ provider=self.provider)
+ self.assertEqual(vpncommand, self.home + '/bin/openvpn')
+ self.assertEqual(vpnargs, self.get_expected_openvpn_args())
+
+ def test_build_ovpn_command_openvpnoptions(self):
+ self.touch_exec()
+
+ from leap.eip import config as eipconfig
+ from leap.util.fileutil import which
+ path = os.environ['PATH']
+ vpnbin = which('openvpn', path=path)
+
+ self.write_sample_eipconfig()
+
+ # regular run, everything normal
+ self.write_sample_eipservice(vpnciphers=True)
+ vpncommand, vpnargs = eipconfig.build_ovpn_command(
+ do_pkexec_check=False, vpnbin=vpnbin,
+ socket_path="/tmp/test.socket",
+ provider=self.provider)
+ self.assertEqual(vpncommand, self.home + '/bin/openvpn')
+ expected = self.get_expected_openvpn_args(
+ with_openvpn_ciphers=True)
+ self.assertEqual(vpnargs, expected)
+
+ # bad options -- illegal options
+ self.write_sample_eipservice(
+ vpnciphers=True,
+ # WE ONLY ALLOW vpn options in auth, cipher, tls-cipher
+ extra_vpnopts={"notallowedconfig": "badvalue"})
+ vpncommand, vpnargs = eipconfig.build_ovpn_command(
+ do_pkexec_check=False, vpnbin=vpnbin,
+ socket_path="/tmp/test.socket",
+ provider=self.provider)
+ self.assertEqual(vpncommand, self.home + '/bin/openvpn')
+ expected = self.get_expected_openvpn_args(
+ with_openvpn_ciphers=True)
+ self.assertEqual(vpnargs, expected)
+
+ # bad options -- illegal chars
+ self.write_sample_eipservice(
+ vpnciphers=True,
+ # WE ONLY ALLOW A-Z09\-
+ extra_vpnopts={"cipher": "AES-128-CBC;FOOTHING"})
+ vpncommand, vpnargs = eipconfig.build_ovpn_command(
+ do_pkexec_check=False, vpnbin=vpnbin,
+ socket_path="/tmp/test.socket",
+ provider=self.provider)
+ self.assertEqual(vpncommand, self.home + '/bin/openvpn')
+ expected = self.get_expected_openvpn_args(
+ with_openvpn_ciphers=True)
+ self.assertEqual(vpnargs, expected)
+
+
+if __name__ == "__main__":
+ unittest.main()
diff --git a/src/leap/eip/tests/test_eipconnection.py b/src/leap/eip/tests/test_eipconnection.py
new file mode 100644
index 00000000..163f8d45
--- /dev/null
+++ b/src/leap/eip/tests/test_eipconnection.py
@@ -0,0 +1,216 @@
+import glob
+import logging
+import platform
+#import os
+import shutil
+
+logging.basicConfig()
+logger = logging.getLogger(name=__name__)
+
+try:
+ import unittest2 as unittest
+except ImportError:
+ import unittest
+
+from mock import Mock, patch # MagicMock
+
+from leap.eip.eipconnection import EIPConnection
+from leap.eip.exceptions import ConnectionRefusedError
+from leap.eip import specs as eipspecs
+from leap.testing.basetest import BaseLeapTest
+
+_system = platform.system()
+
+PROVIDER = "testprovider.example.org"
+
+
+class NotImplementedError(Exception):
+ pass
+
+
+@patch('OpenVPNConnection._get_or_create_config')
+@patch('OpenVPNConnection._set_ovpn_command')
+class MockedEIPConnection(EIPConnection):
+
+ def _set_ovpn_command(self):
+ self.command = "mock_command"
+ self.args = [1, 2, 3]
+
+
+class EIPConductorTest(BaseLeapTest):
+
+ __name__ = "eip_conductor_tests"
+ provider = PROVIDER
+
+ def setUp(self):
+ # XXX there's a conceptual/design
+ # mistake here.
+ # If we're testing just attrs after init,
+ # init shold not be doing so much side effects.
+
+ # for instance:
+ # We have to TOUCH a keys file because
+ # we're triggerig the key checks FROM
+ # the constructor. me not like that,
+ # key checker should better be called explicitelly.
+
+ # XXX change to keys_checker invocation
+ # (see config_checker)
+
+ keyfiles = (eipspecs.provider_ca_path(domain=self.provider),
+ eipspecs.client_cert_path(domain=self.provider))
+ for filepath in keyfiles:
+ self.touch(filepath)
+ self.chmod600(filepath)
+
+ # we init the manager with only
+ # some methods mocked
+ self.manager = Mock(name="openvpnmanager_mock")
+ self.con = MockedEIPConnection()
+ self.con.provider = self.provider
+
+ # XXX watch out. This sometimes is throwing the following error:
+ # NoSuchProcess: process no longer exists (pid=6571)
+ # because of a bad implementation of _check_if_running_instance
+
+ self.con.run_openvpn_checks()
+
+ def tearDown(self):
+ pass
+
+ def doCleanups(self):
+ super(BaseLeapTest, self).doCleanups()
+ self.cleanupSocketDir()
+ del self.con
+
+ def cleanupSocketDir(self):
+ ptt = ('/tmp/leap-tmp*')
+ for tmpdir in glob.glob(ptt):
+ shutil.rmtree(tmpdir)
+
+ #
+ # tests
+ #
+
+ def test_vpnconnection_defaults(self):
+ """
+ default attrs as expected
+ """
+ con = self.con
+ self.assertEqual(con.autostart, True)
+ # XXX moar!
+
+ def test_ovpn_command(self):
+ """
+ set_ovpn_command called
+ """
+ self.assertEqual(self.con.command,
+ "mock_command")
+ self.assertEqual(self.con.args,
+ [1, 2, 3])
+
+ # config checks
+
+ def test_config_checked_called(self):
+ # XXX this single test is taking half of the time
+ # needed to run tests. (roughly 3 secs for this only)
+ # We should modularize and inject Mocks on more places.
+
+ oldcon = self.con
+ del(self.con)
+ config_checker = Mock()
+ self.con = MockedEIPConnection(config_checker=config_checker)
+ self.assertTrue(config_checker.called)
+ self.con.run_checks()
+ self.con.config_checker.run_all.assert_called_with(
+ skip_download=False)
+
+ # XXX test for cert_checker also
+ self.con = oldcon
+
+ # connect/disconnect calls
+
+ def test_disconnect(self):
+ """
+ disconnect method calls private and changes status
+ """
+ self.con._disconnect = Mock(
+ name="_disconnect")
+
+ # first we set status to connected
+ self.con.status.set_current(self.con.status.CONNECTED)
+ self.assertEqual(self.con.status.current,
+ self.con.status.CONNECTED)
+
+ # disconnect
+ self.con.terminate_openvpn_connection = Mock()
+ self.con.disconnect()
+ self.con.terminate_openvpn_connection.assert_called_once_with(
+ shutdown=False)
+ self.con.terminate_openvpn_connection = Mock()
+ self.con.disconnect(shutdown=True)
+ self.con.terminate_openvpn_connection.assert_called_once_with(
+ shutdown=True)
+
+ # new status should be disconnected
+ # XXX this should evolve and check no errors
+ # during disconnection
+ self.assertEqual(self.con.status.current,
+ self.con.status.DISCONNECTED)
+
+ def test_connect(self):
+ """
+ connect calls _launch_openvpn private
+ """
+ self.con._launch_openvpn = Mock()
+ self.con.connect()
+ self.con._launch_openvpn.assert_called_once_with()
+
+ # XXX tests breaking here ...
+
+ def test_good_poll_connection_state(self):
+ """
+ """
+ #@patch --
+ # self.manager.get_connection_state
+
+ #XXX review this set of poll_state tests
+ #they SHOULD NOT NEED TO MOCK ANYTHING IN THE
+ #lower layers!! -- status, vpn_manager..
+ #right now we're testing implementation, not
+ #behavior!!!
+ good_state = ["1345466946", "unknown_state", "ok",
+ "192.168.1.1", "192.168.1.100"]
+ self.con.get_connection_state = Mock(return_value=good_state)
+ self.con.status.set_vpn_state = Mock()
+
+ state = self.con.poll_connection_state()
+ good_state[1] = "disconnected"
+ final_state = tuple(good_state)
+ self.con.status.set_vpn_state.assert_called_with("unknown_state")
+ self.assertEqual(state, final_state)
+
+ # TODO between "good" and "bad" (exception raised) cases,
+ # we can still test for malformed states and see that only good
+ # states do have a change (and from only the expected transition
+ # states).
+
+ def test_bad_poll_connection_state(self):
+ """
+ get connection state raises ConnectionRefusedError
+ state is None
+ """
+ self.con.get_connection_state = Mock(
+ side_effect=ConnectionRefusedError('foo!'))
+ state = self.con.poll_connection_state()
+ self.assertEqual(state, None)
+
+
+ # XXX more things to test:
+ # - called config routines during initz.
+ # - raising proper exceptions with no config
+ # - called proper checks on config / permissions
+
+
+if __name__ == "__main__":
+ unittest.main()
diff --git a/src/leap/eip/tests/test_openvpnconnection.py b/src/leap/eip/tests/test_openvpnconnection.py
new file mode 100644
index 00000000..95bfb2f0
--- /dev/null
+++ b/src/leap/eip/tests/test_openvpnconnection.py
@@ -0,0 +1,161 @@
+import logging
+import os
+import platform
+import psutil
+import shutil
+#import socket
+
+logging.basicConfig()
+logger = logging.getLogger(name=__name__)
+
+try:
+ import unittest2 as unittest
+except ImportError:
+ import unittest
+
+from mock import Mock, patch # MagicMock
+
+from leap.eip import config as eipconfig
+from leap.eip import openvpnconnection
+from leap.eip import exceptions as eipexceptions
+from leap.eip.udstelnet import UDSTelnet
+from leap.testing.basetest import BaseLeapTest
+
+_system = platform.system()
+
+
+class NotImplementedError(Exception):
+ pass
+
+
+mock_UDSTelnet = Mock(spec=UDSTelnet)
+# XXX cautious!!!
+# this might be fragile right now (counting a global
+# reference of calls I think.
+# investigate this other form instead:
+# http://www.voidspace.org.uk/python/mock/patch.html#start-and-stop
+
+# XXX redo after merge-refactor
+
+
+@patch('openvpnconnection.OpenVPNConnection.connect_to_management')
+class MockedOpenVPNConnection(openvpnconnection.OpenVPNConnection):
+ def __init__(self, *args, **kwargs):
+ self.mock_UDSTelnet = Mock()
+ super(MockedOpenVPNConnection, self).__init__(
+ *args, **kwargs)
+ self.tn = self.mock_UDSTelnet(self.host, self.port)
+
+ def connect_to_management(self):
+ #print 'patched connect'
+ self.tn = mock_UDSTelnet(self.host, port=self.port)
+
+
+class OpenVPNConnectionTest(BaseLeapTest):
+
+ __name__ = "vpnconnection_tests"
+
+ def setUp(self):
+ # XXX this will have to change for win, host=localhost
+ host = eipconfig.get_socket_path()
+ self.host = host
+ self.manager = MockedOpenVPNConnection(host=host)
+
+ def tearDown(self):
+ pass
+
+ def doCleanups(self):
+ super(BaseLeapTest, self).doCleanups()
+ self.cleanupSocketDir()
+
+ def cleanupSocketDir(self):
+ # remove the socket folder.
+ # XXX only if posix. in win, host is localhost, so nothing
+ # has to be done.
+ if self.host:
+ folder, fpath = os.path.split(self.host)
+ try:
+ assert folder.startswith('/tmp/leap-tmp') # safety check
+ shutil.rmtree(folder)
+ except:
+ self.fail("could not remove temp file")
+
+ del self.manager
+
+ #
+ # tests
+ #
+
+ def test_detect_vpn(self):
+ # XXX review, not sure if captured all the logic
+ # while fixing. kali.
+ openvpn_connection = openvpnconnection.OpenVPNConnection()
+
+ with patch.object(psutil, "process_iter") as mocked_psutil:
+ mocked_process = Mock()
+ mocked_process.name = "openvpn"
+ mocked_process.cmdline = ["openvpn", "-foo", "-bar", "-gaaz"]
+ mocked_psutil.return_value = [mocked_process]
+ with self.assertRaises(eipexceptions.OpenVPNAlreadyRunning):
+ openvpn_connection._check_if_running_instance()
+
+ openvpn_connection._check_if_running_instance()
+
+ @unittest.skipIf(_system == "Windows", "lin/mac only")
+ def test_lin_mac_default_init(self):
+ """
+ check default host for management iface
+ """
+ self.assertTrue(self.manager.host.startswith('/tmp/leap-tmp'))
+ self.assertEqual(self.manager.port, 'unix')
+
+ @unittest.skipUnless(_system == "Windows", "win only")
+ def test_win_default_init(self):
+ """
+ check default host for management iface
+ """
+ # XXX should we make the platform specific switch
+ # here or in the vpn command string building?
+ self.assertEqual(self.manager.host, 'localhost')
+ self.assertEqual(self.manager.port, 7777)
+
+ def test_port_types_init(self):
+ oldmanager = self.manager
+ self.manager = MockedOpenVPNConnection(port="42")
+ self.assertEqual(self.manager.port, 42)
+ self.manager = MockedOpenVPNConnection()
+ self.assertEqual(self.manager.port, "unix")
+ self.manager = MockedOpenVPNConnection(port="bad")
+ self.assertEqual(self.manager.port, None)
+ self.manager = oldmanager
+
+ def test_uds_telnet_called_on_connect(self):
+ self.manager.connect_to_management()
+ mock_UDSTelnet.assert_called_with(
+ self.manager.host,
+ port=self.manager.port)
+
+ @unittest.skip
+ def test_connect(self):
+ raise NotImplementedError
+ # XXX calls close
+ # calls UDSTelnet mock.
+
+ # XXX
+ # tests to write:
+ # UDSTelnetTest (for real?)
+ # HAVE A LOOK AT CORE TESTS FOR TELNETLIB.
+ # very illustrative instead...
+
+ # - raise MissingSocket
+ # - raise ConnectionRefusedError
+ # - test send command
+ # - tries connect
+ # - ... tries?
+ # - ... calls _seek_to_eof
+ # - ... read_until --> return value
+ # - ...
+
+
+if __name__ == "__main__":
+ unittest.main()