summaryrefslogtreecommitdiff
path: root/src/leap/eip/tests
diff options
context:
space:
mode:
authordrebs <drebs@leap.se>2012-12-24 10:14:58 -0200
committerdrebs <drebs@leap.se>2012-12-24 10:14:58 -0200
commit319e279b59ac080779d0a3375ae4d6582f5ee6a3 (patch)
tree118dd0f495c0d54f2b2c66ea235e4e4e6b8cefd5 /src/leap/eip/tests
parentca5fb41a55e1292005ed186baf3710831d9ad678 (diff)
parenta7b091a0553e6120f3e0eb6d4e73a89732c589b2 (diff)
Merge branch 'develop' of ssh://code.leap.se/leap_client into develop
Diffstat (limited to 'src/leap/eip/tests')
-rw-r--r--src/leap/eip/tests/data.py33
-rw-r--r--src/leap/eip/tests/test_checks.py6
-rw-r--r--src/leap/eip/tests/test_config.py155
-rw-r--r--src/leap/eip/tests/test_eipconnection.py31
-rw-r--r--src/leap/eip/tests/test_openvpnconnection.py21
5 files changed, 211 insertions, 35 deletions
diff --git a/src/leap/eip/tests/data.py b/src/leap/eip/tests/data.py
index cadf720e..a7fe1853 100644
--- a/src/leap/eip/tests/data.py
+++ b/src/leap/eip/tests/data.py
@@ -23,26 +23,29 @@ EIP_SAMPLE_CONFIG = {
"keys/client/openvpn.pem" % PROVIDER),
"connect_on_login": True,
"block_cleartext_traffic": True,
- "primary_gateway": "turkey",
- "secondary_gateway": "france",
+ "primary_gateway": "location_unknown",
+ "secondary_gateway": "location_unknown2",
#"management_password": "oph7Que1othahwiech6J"
}
EIP_SAMPLE_SERVICE = {
"serial": 1,
- "version": "0.1.0",
- "capabilities": {
- "transport": ["openvpn"],
- "ports": ["80", "53"],
- "protocols": ["udp", "tcp"],
- "static_ips": True,
- "adblock": True
- },
+ "version": 1,
+ "clusters": [
+ {"label": {
+ "en": "Location Unknown"},
+ "name": "location_unknown"}
+ ],
"gateways": [
- {"country_code": "tr",
- "name": "turkey",
- "label": {"en":"Ankara, Turkey"},
- "capabilities": {},
- "hosts": ["192.0.43.10"]}
+ {"capabilities": {
+ "adblock": True,
+ "filter_dns": True,
+ "ports": ["80", "53", "443", "1194"],
+ "protocols": ["udp", "tcp"],
+ "transport": ["openvpn"],
+ "user_ips": False},
+ "cluster": "location_unknown",
+ "host": "location.example.org",
+ "ip_address": "192.0.43.10"}
]
}
diff --git a/src/leap/eip/tests/test_checks.py b/src/leap/eip/tests/test_checks.py
index 1d7bfc17..ab11037a 100644
--- a/src/leap/eip/tests/test_checks.py
+++ b/src/leap/eip/tests/test_checks.py
@@ -25,6 +25,7 @@ from leap.eip.tests import data as testdata
from leap.testing.basetest import BaseLeapTest
from leap.testing.https_server import BaseHTTPSServerTestCase
from leap.testing.https_server import where as where_cert
+from leap.util.fileutil import mkdir_f
class NoLogRequestHandler:
@@ -118,6 +119,7 @@ class EIPCheckTest(BaseLeapTest):
sampleconfig = copy.copy(testdata.EIP_SAMPLE_CONFIG)
sampleconfig['provider'] = None
eipcfg_path = checker.eipconfig.filename
+ mkdir_f(eipcfg_path)
with open(eipcfg_path, 'w') as fp:
json.dump(sampleconfig, fp)
#with self.assertRaises(eipexceptions.EIPMissingDefaultProvider):
@@ -138,6 +140,8 @@ class EIPCheckTest(BaseLeapTest):
def test_fetch_definition(self):
with patch.object(requests, "get") as mocked_get:
mocked_get.return_value.status_code = 200
+ mocked_get.return_value.headers = {
+ 'last-modified': "Wed Dec 12 12:12:12 GMT 2012"}
mocked_get.return_value.json = DEFAULT_PROVIDER_DEFINITION
checker = eipchecks.EIPConfigChecker(fetcher=requests)
sampleconfig = testdata.EIP_SAMPLE_CONFIG
@@ -156,6 +160,8 @@ class EIPCheckTest(BaseLeapTest):
def test_fetch_eip_service_config(self):
with patch.object(requests, "get") as mocked_get:
mocked_get.return_value.status_code = 200
+ mocked_get.return_value.headers = {
+ 'last-modified': "Wed Dec 12 12:12:12 GMT 2012"}
mocked_get.return_value.json = testdata.EIP_SAMPLE_SERVICE
checker = eipchecks.EIPConfigChecker(fetcher=requests)
sampleconfig = testdata.EIP_SAMPLE_CONFIG
diff --git a/src/leap/eip/tests/test_config.py b/src/leap/eip/tests/test_config.py
index 50538240..5977ef3c 100644
--- a/src/leap/eip/tests/test_config.py
+++ b/src/leap/eip/tests/test_config.py
@@ -1,3 +1,4 @@
+from collections import OrderedDict
import json
import os
import platform
@@ -10,11 +11,11 @@ except ImportError:
#from leap.base import constants
#from leap.eip import config as eip_config
-from leap import __branding as BRANDING
+#from leap import __branding as BRANDING
from leap.eip import config as eipconfig
from leap.eip.tests.data import EIP_SAMPLE_CONFIG, EIP_SAMPLE_SERVICE
from leap.testing.basetest import BaseLeapTest
-from leap.util.fileutil import mkdir_p
+from leap.util.fileutil import mkdir_p, mkdir_f
_system = platform.system()
@@ -47,11 +48,22 @@ class EIPConfigTest(BaseLeapTest):
open(tfile, 'wb').close()
os.chmod(tfile, stat.S_IRUSR | stat.S_IWUSR | stat.S_IXUSR)
- def write_sample_eipservice(self):
+ def write_sample_eipservice(self, vpnciphers=False, extra_vpnopts=None,
+ gateways=None):
conf = eipconfig.EIPServiceConfig()
- folder, f = os.path.split(conf.filename)
- if not os.path.isdir(folder):
- mkdir_p(folder)
+ mkdir_f(conf.filename)
+ if gateways:
+ EIP_SAMPLE_SERVICE['gateways'] = gateways
+ if vpnciphers:
+ openvpnconfig = OrderedDict({
+ "auth": "SHA1",
+ "cipher": "AES-128-CBC",
+ "tls-cipher": "DHE-RSA-AES128-SHA"})
+ if extra_vpnopts:
+ for k, v in extra_vpnopts.items():
+ openvpnconfig[k] = v
+ EIP_SAMPLE_SERVICE['openvpn_configuration'] = openvpnconfig
+
with open(conf.filename, 'w') as fd:
fd.write(json.dumps(EIP_SAMPLE_SERVICE))
@@ -63,8 +75,17 @@ class EIPConfigTest(BaseLeapTest):
with open(conf.filename, 'w') as fd:
fd.write(json.dumps(EIP_SAMPLE_CONFIG))
- def get_expected_openvpn_args(self):
+ def get_expected_openvpn_args(self, with_openvpn_ciphers=False):
+ """
+ yeah, this is almost as duplicating the
+ code for building the command
+ """
args = []
+ eipconf = eipconfig.EIPConfig(domain=self.provider)
+ eipconf.load()
+ eipsconf = eipconfig.EIPServiceConfig(domain=self.provider)
+ eipsconf.load()
+
username = self.get_username()
groupname = self.get_groupname()
@@ -75,8 +96,10 @@ class EIPConfigTest(BaseLeapTest):
args.append('--persist-tun')
args.append('--persist-key')
args.append('--remote')
+
args.append('%s' % eipconfig.get_eip_gateway(
- provider=self.provider))
+ eipconfig=eipconf,
+ eipserviceconfig=eipsconf))
# XXX get port!?
args.append('1194')
# XXX get proto
@@ -85,6 +108,14 @@ class EIPConfigTest(BaseLeapTest):
args.append('--remote-cert-tls')
args.append('server')
+ if with_openvpn_ciphers:
+ CIPHERS = [
+ "--tls-cipher", "DHE-RSA-AES128-SHA",
+ "--cipher", "AES-128-CBC",
+ "--auth", "SHA1"]
+ for opt in CIPHERS:
+ args.append(opt)
+
args.append('--user')
args.append(username)
args.append('--group')
@@ -130,6 +161,55 @@ class EIPConfigTest(BaseLeapTest):
# params in the function call, to disable
# some checks.
+ def test_get_eip_gateway(self):
+ self.write_sample_eipconfig()
+ eipconf = eipconfig.EIPConfig(domain=self.provider)
+
+ # default eipservice
+ self.write_sample_eipservice()
+ eipsconf = eipconfig.EIPServiceConfig(domain=self.provider)
+
+ gateway = eipconfig.get_eip_gateway(
+ eipconfig=eipconf,
+ eipserviceconfig=eipsconf)
+
+ # in spec is local gateway by default
+ self.assertEqual(gateway, '127.0.0.1')
+
+ # change eipservice
+ # right now we only check that cluster == selected primary gw in
+ # eip.json, and pick first matching ip
+ eipconf._config.config['primary_gateway'] = "foo_provider"
+ newgateways = [{"cluster": "foo_provider",
+ "ip_address": "127.0.0.99"}]
+ self.write_sample_eipservice(gateways=newgateways)
+ eipsconf = eipconfig.EIPServiceConfig(domain=self.provider)
+ # load from disk file
+ eipsconf.load()
+
+ gateway = eipconfig.get_eip_gateway(
+ eipconfig=eipconf,
+ eipserviceconfig=eipsconf)
+ self.assertEqual(gateway, '127.0.0.99')
+
+ # change eipservice, several gateways
+ # right now we only check that cluster == selected primary gw in
+ # eip.json, and pick first matching ip
+ eipconf._config.config['primary_gateway'] = "bar_provider"
+ newgateways = [{"cluster": "foo_provider",
+ "ip_address": "127.0.0.99"},
+ {'cluster': "bar_provider",
+ "ip_address": "127.0.0.88"}]
+ self.write_sample_eipservice(gateways=newgateways)
+ eipsconf = eipconfig.EIPServiceConfig(domain=self.provider)
+ # load from disk file
+ eipsconf.load()
+
+ gateway = eipconfig.get_eip_gateway(
+ eipconfig=eipconf,
+ eipserviceconfig=eipsconf)
+ self.assertEqual(gateway, '127.0.0.88')
+
def test_build_ovpn_command_empty_config(self):
self.touch_exec()
self.write_sample_eipservice()
@@ -139,14 +219,63 @@ class EIPConfigTest(BaseLeapTest):
from leap.util.fileutil import which
path = os.environ['PATH']
vpnbin = which('openvpn', path=path)
- print 'path =', path
- print 'vpnbin = ', vpnbin
- command, args = eipconfig.build_ovpn_command(
+ #print 'path =', path
+ #print 'vpnbin = ', vpnbin
+ vpncommand, vpnargs = eipconfig.build_ovpn_command(
+ do_pkexec_check=False, vpnbin=vpnbin,
+ socket_path="/tmp/test.socket",
+ provider=self.provider)
+ self.assertEqual(vpncommand, self.home + '/bin/openvpn')
+ self.assertEqual(vpnargs, self.get_expected_openvpn_args())
+
+ def test_build_ovpn_command_openvpnoptions(self):
+ self.touch_exec()
+
+ from leap.eip import config as eipconfig
+ from leap.util.fileutil import which
+ path = os.environ['PATH']
+ vpnbin = which('openvpn', path=path)
+
+ self.write_sample_eipconfig()
+
+ # regular run, everything normal
+ self.write_sample_eipservice(vpnciphers=True)
+ vpncommand, vpnargs = eipconfig.build_ovpn_command(
+ do_pkexec_check=False, vpnbin=vpnbin,
+ socket_path="/tmp/test.socket",
+ provider=self.provider)
+ self.assertEqual(vpncommand, self.home + '/bin/openvpn')
+ expected = self.get_expected_openvpn_args(
+ with_openvpn_ciphers=True)
+ self.assertEqual(vpnargs, expected)
+
+ # bad options -- illegal options
+ self.write_sample_eipservice(
+ vpnciphers=True,
+ # WE ONLY ALLOW vpn options in auth, cipher, tls-cipher
+ extra_vpnopts={"notallowedconfig": "badvalue"})
+ vpncommand, vpnargs = eipconfig.build_ovpn_command(
+ do_pkexec_check=False, vpnbin=vpnbin,
+ socket_path="/tmp/test.socket",
+ provider=self.provider)
+ self.assertEqual(vpncommand, self.home + '/bin/openvpn')
+ expected = self.get_expected_openvpn_args(
+ with_openvpn_ciphers=True)
+ self.assertEqual(vpnargs, expected)
+
+ # bad options -- illegal chars
+ self.write_sample_eipservice(
+ vpnciphers=True,
+ # WE ONLY ALLOW A-Z09\-
+ extra_vpnopts={"cipher": "AES-128-CBC;FOOTHING"})
+ vpncommand, vpnargs = eipconfig.build_ovpn_command(
do_pkexec_check=False, vpnbin=vpnbin,
socket_path="/tmp/test.socket",
provider=self.provider)
- self.assertEqual(command, self.home + '/bin/openvpn')
- self.assertEqual(args, self.get_expected_openvpn_args())
+ self.assertEqual(vpncommand, self.home + '/bin/openvpn')
+ expected = self.get_expected_openvpn_args(
+ with_openvpn_ciphers=True)
+ self.assertEqual(vpnargs, expected)
if __name__ == "__main__":
diff --git a/src/leap/eip/tests/test_eipconnection.py b/src/leap/eip/tests/test_eipconnection.py
index aefca36f..163f8d45 100644
--- a/src/leap/eip/tests/test_eipconnection.py
+++ b/src/leap/eip/tests/test_eipconnection.py
@@ -1,6 +1,8 @@
+import glob
import logging
import platform
-import os
+#import os
+import shutil
logging.basicConfig()
logger = logging.getLogger(name=__name__)
@@ -66,11 +68,26 @@ class EIPConductorTest(BaseLeapTest):
self.manager = Mock(name="openvpnmanager_mock")
self.con = MockedEIPConnection()
self.con.provider = self.provider
+
+ # XXX watch out. This sometimes is throwing the following error:
+ # NoSuchProcess: process no longer exists (pid=6571)
+ # because of a bad implementation of _check_if_running_instance
+
self.con.run_openvpn_checks()
def tearDown(self):
+ pass
+
+ def doCleanups(self):
+ super(BaseLeapTest, self).doCleanups()
+ self.cleanupSocketDir()
del self.con
+ def cleanupSocketDir(self):
+ ptt = ('/tmp/leap-tmp*')
+ for tmpdir in glob.glob(ptt):
+ shutil.rmtree(tmpdir)
+
#
# tests
#
@@ -81,6 +98,7 @@ class EIPConductorTest(BaseLeapTest):
"""
con = self.con
self.assertEqual(con.autostart, True)
+ # XXX moar!
def test_ovpn_command(self):
"""
@@ -98,6 +116,7 @@ class EIPConductorTest(BaseLeapTest):
# needed to run tests. (roughly 3 secs for this only)
# We should modularize and inject Mocks on more places.
+ oldcon = self.con
del(self.con)
config_checker = Mock()
self.con = MockedEIPConnection(config_checker=config_checker)
@@ -107,6 +126,7 @@ class EIPConductorTest(BaseLeapTest):
skip_download=False)
# XXX test for cert_checker also
+ self.con = oldcon
# connect/disconnect calls
@@ -123,9 +143,14 @@ class EIPConductorTest(BaseLeapTest):
self.con.status.CONNECTED)
# disconnect
- self.con.cleanup = Mock()
+ self.con.terminate_openvpn_connection = Mock()
self.con.disconnect()
- self.con.cleanup.assert_called_once_with()
+ self.con.terminate_openvpn_connection.assert_called_once_with(
+ shutdown=False)
+ self.con.terminate_openvpn_connection = Mock()
+ self.con.disconnect(shutdown=True)
+ self.con.terminate_openvpn_connection.assert_called_once_with(
+ shutdown=True)
# new status should be disconnected
# XXX this should evolve and check no errors
diff --git a/src/leap/eip/tests/test_openvpnconnection.py b/src/leap/eip/tests/test_openvpnconnection.py
index 0f27facf..f7493567 100644
--- a/src/leap/eip/tests/test_openvpnconnection.py
+++ b/src/leap/eip/tests/test_openvpnconnection.py
@@ -58,16 +58,27 @@ class OpenVPNConnectionTest(BaseLeapTest):
def setUp(self):
# XXX this will have to change for win, host=localhost
host = eipconfig.get_socket_path()
+ self.host = host
self.manager = MockedOpenVPNConnection(host=host)
def tearDown(self):
+ pass
+
+ def doCleanups(self):
+ super(BaseLeapTest, self).doCleanups()
+ self.cleanupSocketDir()
+
+ def cleanupSocketDir(self):
# remove the socket folder.
# XXX only if posix. in win, host is localhost, so nothing
# has to be done.
- if self.manager.host:
- folder, fpath = os.path.split(self.manager.host)
- assert folder.startswith('/tmp/leap-tmp') # safety check
- shutil.rmtree(folder)
+ if self.host:
+ folder, fpath = os.path.split(self.host)
+ try:
+ assert folder.startswith('/tmp/leap-tmp') # safety check
+ shutil.rmtree(folder)
+ except:
+ self.fail("could not remove temp file")
del self.manager
@@ -108,12 +119,14 @@ class OpenVPNConnectionTest(BaseLeapTest):
self.assertEqual(self.manager.port, 7777)
def test_port_types_init(self):
+ oldmanager = self.manager
self.manager = MockedOpenVPNConnection(port="42")
self.assertEqual(self.manager.port, 42)
self.manager = MockedOpenVPNConnection()
self.assertEqual(self.manager.port, "unix")
self.manager = MockedOpenVPNConnection(port="bad")
self.assertEqual(self.manager.port, None)
+ self.manager = oldmanager
def test_uds_telnet_called_on_connect(self):
self.manager.connect_to_management()