summaryrefslogtreecommitdiff
path: root/src/leap
diff options
context:
space:
mode:
authorkali <kali@leap.se>2012-08-22 07:08:02 +0900
committerkali <kali@leap.se>2012-08-22 07:08:02 +0900
commitb5f7999e43e2c6504b43534e03bfc5a130dfac87 (patch)
treeac920902bc0e258559d0ffe88909eae182773f7e /src/leap
parent6ce22c7ebd293550473bfa5453a2f720dffad3e8 (diff)
parent3bd45c8e1e020bebf041bc266c5092a41f944130 (diff)
Merge branch 'refactor-tests' into refactor
Diffstat (limited to 'src/leap')
-rw-r--r--src/leap/eip/config.py46
-rw-r--r--src/leap/eip/openvpnconnection.py5
-rw-r--r--src/leap/eip/tests/test_config.py209
-rw-r--r--src/leap/eip/tests/test_eipconnection.py180
-rw-r--r--src/leap/eip/tests/test_openvpnconnection.py136
-rw-r--r--src/leap/util/tests/test_fileutil.py (renamed from src/leap/util/test_fileutil.py)13
-rw-r--r--src/leap/util/tests/test_leap_argparse.py (renamed from src/leap/util/test_leap_argparse.py)3
7 files changed, 566 insertions, 26 deletions
diff --git a/src/leap/eip/config.py b/src/leap/eip/config.py
index e0151e87..c77bb142 100644
--- a/src/leap/eip/config.py
+++ b/src/leap/eip/config.py
@@ -10,16 +10,10 @@ from leap.util.fileutil import (which, mkdir_p,
check_and_fix_urw_only)
from leap.baseapp.permcheck import (is_pkexec_in_system,
is_auth_agent_running)
-from leap.eip import exceptions as eip_exceptions
logger = logging.getLogger(name=__name__)
logger.setLevel('DEBUG')
-# XXX this has to be REMOVED
-# and all these options passed in the
-# command line --> move to build_ovpn_command
-# issue #447
-
OPENVPN_CONFIG_TEMPLATE = """#Autogenerated by eip-client wizard
remote {VPN_REMOTE_HOST} {VPN_REMOTE_PORT}
@@ -115,6 +109,10 @@ def check_or_create_default_vpnconf(config):
# instead.
try:
+ # XXX by now, we're expecting
+ # only IP format for remote.
+ # We should allow also domain names,
+ # and make a reverse resolv.
remote_ip = config.get('provider',
'remote_ip')
validate_ip(remote_ip)
@@ -159,6 +157,15 @@ def check_or_create_default_vpnconf(config):
f.write(ovpn_config)
+def get_username():
+ return os.getlogin()
+
+
+def get_groupname():
+ gid = os.getgroups()[-1]
+ return grp.getgrgid(gid).gr_name
+
+
def build_ovpn_options(daemon=False):
"""
build a list of options
@@ -176,16 +183,11 @@ def build_ovpn_options(daemon=False):
# get user/group name
# also from config.
- user = os.getlogin()
- gid = os.getgroups()[-1]
- group = grp.getgrgid(gid).gr_name
+ user = get_username()
+ group = get_groupname()
opts = []
- #moved to config files
- #opts.append('--persist-tun')
- #opts.append('--persist-key')
-
# set user and group
opts.append('--user')
opts.append('%s' % user)
@@ -220,6 +222,8 @@ def build_ovpn_options(daemon=False):
opts.append('--config')
default_provider_path = get_default_provider_path()
+
+ # XXX get rid of config_file at all
ovpncnf = get_config_file(
'openvpn.conf',
folder=default_provider_path)
@@ -234,7 +238,7 @@ def build_ovpn_options(daemon=False):
return opts
-def build_ovpn_command(config, debug=False):
+def build_ovpn_command(config, debug=False, do_pkexec_check=True):
"""
build a string with the
complete openvpn invocation
@@ -252,17 +256,16 @@ def build_ovpn_command(config, debug=False):
if config.has_option('openvpn', 'use_pkexec'):
use_pkexec = config.get('openvpn', 'use_pkexec')
- if platform.system() == "Linux" and use_pkexec:
+ if platform.system() == "Linux" and use_pkexec and do_pkexec_check:
# XXX check for both pkexec (done)
# AND a suitable authentication
# agent running.
- # (until we implement setuid helper)
logger.info('use_pkexec set to True')
if not is_pkexec_in_system():
logger.error('no pkexec in system')
- raise eip_exceptions.EIPNoPkexecAvailable
+ raise EIPNoPkexecAvailable
if not is_auth_agent_running():
logger.warning(
@@ -270,7 +273,7 @@ def build_ovpn_command(config, debug=False):
"pkexec will use its own text "
"based authentication agent. "
"that's probably a bad idea")
- raise eip_exceptions.EIPNoPolkitAuthAgentAvailable
+ raise EIPNoPolkitAuthAgentAvailable
command.append('pkexec')
@@ -284,7 +287,11 @@ def build_ovpn_command(config, debug=False):
'openvpn_binary')
if ovpn:
- command.append(ovpn)
+ vpn_command = ovpn
+ else:
+ vpn_command = "openvpn"
+
+ command.append(vpn_command)
daemon_mode = not debug
@@ -292,6 +299,7 @@ def build_ovpn_command(config, debug=False):
command.append(opt)
# XXX check len and raise proper error
+
return [command[0], command[1:]]
diff --git a/src/leap/eip/openvpnconnection.py b/src/leap/eip/openvpnconnection.py
index a230d229..3972b617 100644
--- a/src/leap/eip/openvpnconnection.py
+++ b/src/leap/eip/openvpnconnection.py
@@ -85,9 +85,12 @@ to be triggered for each one of them.
self.host = host
if isinstance(port, str) and port.isdigit():
port = int(port)
+ elif port == "unix":
+ port = "unix"
+ else:
+ port = None
self.port = port
self.password = password
- #self.tn = None
def _set_autostart(self):
config = self.config
diff --git a/src/leap/eip/tests/test_config.py b/src/leap/eip/tests/test_config.py
new file mode 100644
index 00000000..11433777
--- /dev/null
+++ b/src/leap/eip/tests/test_config.py
@@ -0,0 +1,209 @@
+import ConfigParser
+import os
+import platform
+import shutil
+import socket
+import tempfile
+
+try:
+ import unittest2 as unittest
+except ImportError:
+ import unittest
+
+from leap.eip import config
+
+_system = platform.system()
+
+
+class NotImplementedError(Exception):
+ pass
+
+# XXX use mock_open here?
+
+
+class EIPConfigTest(unittest.TestCase):
+
+ __name__ = "eip_config_tests"
+
+ def setUp(self):
+ self.old_path = os.environ['PATH']
+
+ self.tdir = tempfile.mkdtemp()
+
+ bin_tdir = os.path.join(
+ self.tdir,
+ 'bin')
+ os.mkdir(bin_tdir)
+ os.environ['PATH'] = bin_tdir
+
+ def tearDown(self):
+ os.environ['PATH'] = self.old_path
+ shutil.rmtree(self.tdir)
+ #
+ # helpers
+ #
+
+ def get_username(self):
+ return config.get_username()
+
+ def get_groupname(self):
+ return config.get_groupname()
+
+ def _missing_test_for_plat(self, do_raise=False):
+ if do_raise:
+ raise NotImplementedError(
+ "This test is not implemented "
+ "for the running platform: %s" %
+ _system)
+
+ def touch_exec(self):
+ tfile = os.path.join(
+ self.tdir,
+ 'bin',
+ 'openvpn')
+ open(tfile, 'bw').close()
+
+ def get_empty_config(self):
+ _config = ConfigParser.ConfigParser()
+ return _config
+
+ def get_minimal_config(self):
+ _config = ConfigParser.ConfigParser()
+ return _config
+
+ def get_expected_openvpn_args(self):
+ args = []
+ username = self.get_username()
+ groupname = self.get_groupname()
+
+ args.append('--user')
+ args.append(username)
+ args.append('--group')
+ args.append(groupname)
+ args.append('--management-client-user')
+ args.append(username)
+ args.append('--management-signal')
+ args.append('--management')
+
+ #XXX hey!
+ #get platform switches here!
+ args.append('/tmp/.eip.sock')
+ args.append('unix')
+ args.append('--config')
+ #XXX bad assumption. FIXME: expand $HOME
+ args.append('/home/%s/.config/leap/providers/default/openvpn.conf' %
+ username)
+ return args
+
+ #
+ # tests
+ #
+
+ # XXX fixme! /home/user should
+ # be replaced for proper home lookup.
+
+ @unittest.skipUnless(_system == "Linux", "linux only")
+ def test_lin_get_config_file(self):
+ """
+ config file path where expected? (linux)
+ """
+ self.assertEqual(
+ config.get_config_file(
+ 'test', folder="foo/bar"),
+ '/home/%s/.config/leap/foo/bar/test' %
+ self.get_username())
+
+ @unittest.skipUnless(_system == "Darwin", "mac only")
+ def test_mac_get_config_file(self):
+ """
+ config file path where expected? (mac)
+ """
+ self._missing_test_for_plat(do_raise=True)
+
+ @unittest.skipUnless(_system == "Windows", "win only")
+ def test_win_get_config_file(self):
+ """
+ config file path where expected?
+ """
+ self._missing_test_for_plat(do_raise=True)
+
+ #
+ # XXX hey, I'm raising exceptions here
+ # on purpose. just wanted to make sure
+ # that the skip stuff is doing it right.
+ # If you're working on win/macos tests,
+ # feel free to remove tests that you see
+ # are too redundant.
+
+ @unittest.skipUnless(_system == "Linux", "linux only")
+ def test_lin_get_config_dir(self):
+ """
+ nice config dir? (linux)
+ """
+ self.assertEqual(
+ config.get_config_dir(),
+ '/home/%s/.config/leap' %
+ self.get_username())
+
+ @unittest.skipUnless(_system == "Darwin", "mac only")
+ def test_mac_get_config_dir(self):
+ """
+ nice config dir? (mac)
+ """
+ self._missing_test_for_plat(do_raise=True)
+
+ @unittest.skipUnless(_system == "Windows", "win only")
+ def test_win_get_config_dir(self):
+ """
+ nice config dir? (win)
+ """
+ self._missing_test_for_plat(do_raise=True)
+
+ # provider paths
+
+ @unittest.skipUnless(_system == "Linux", "linux only")
+ def test_get_default_provider_path(self):
+ """
+ is default provider path ok?
+ """
+ self.assertEqual(
+ config.get_default_provider_path(),
+ '/home/%s/.config/leap/providers/default/' %
+ self.get_username())
+
+ # validate ip
+
+ def test_validate_ip(self):
+ """
+ check our ip validation
+ """
+ config.validate_ip('3.3.3.3')
+ with self.assertRaises(socket.error):
+ config.validate_ip('255.255.255.256')
+ with self.assertRaises(socket.error):
+ config.validate_ip('foobar')
+
+ @unittest.skip
+ def test_validate_domain(self):
+ """
+ code to be written yet
+ """
+ pass
+
+ # build command string
+ # these tests are going to have to check
+ # many combinations. we should inject some
+ # params in the function call, to disable
+ # some checks.
+
+ def test_build_ovpn_command_empty_config(self):
+ _config = self.get_empty_config()
+ command, args = config.build_ovpn_command(
+ _config,
+ do_pkexec_check=False)
+ self.assertEqual(command, 'openvpn')
+ self.assertEqual(args, self.get_expected_openvpn_args())
+
+
+if __name__ == "__main__":
+ unittest.main()
diff --git a/src/leap/eip/tests/test_eipconnection.py b/src/leap/eip/tests/test_eipconnection.py
new file mode 100644
index 00000000..51772b7c
--- /dev/null
+++ b/src/leap/eip/tests/test_eipconnection.py
@@ -0,0 +1,180 @@
+import ConfigParser
+import logging
+import platform
+
+logging.basicConfig()
+logger = logging.getLogger(name=__name__)
+
+try:
+ import unittest2 as unittest
+except ImportError:
+ import unittest
+
+from mock import Mock, patch # MagicMock
+
+from leap.eip.eipconnection import EIPConnection
+from leap.eip.exceptions import ConnectionRefusedError
+
+_system = platform.system()
+
+
+class NotImplementedError(Exception):
+ pass
+
+
+@patch('OpenVPNConnection._get_or_create_config')
+@patch('OpenVPNConnection._set_ovpn_command')
+class MockedEIPConnection(EIPConnection):
+ def _get_or_create_config(self):
+ self.config = ConfigParser.ConfigParser()
+ self._set_ovpn_command()
+
+ def _set_ovpn_command(self):
+ self.command = "mock_command"
+ self.args = [1, 2, 3]
+
+
+class EIPConductorTest(unittest.TestCase):
+
+ __name__ = "eip_conductor_tests"
+
+ def setUp(self):
+ self.manager = Mock(
+ name="openvpnmanager_mock")
+
+ self.con = MockedEIPConnection()
+ #manager=self.manager)
+
+ def tearDown(self):
+ del self.con
+
+ #
+ # helpers
+ #
+
+ def _missing_test_for_plat(self, do_raise=False):
+ if do_raise:
+ raise NotImplementedError(
+ "This test is not implemented "
+ "for the running platform: %s" %
+ _system)
+
+ #
+ # tests
+ #
+
+ @unittest.skip
+ #ain't manager anymore!
+ def test_manager_was_initialized(self):
+ """
+ manager init ok during conductor init?
+ """
+ self.manager.assert_called_once_with()
+
+ def test_vpnconnection_defaults(self):
+ """
+ default attrs as expected
+ """
+ con = self.con
+ self.assertEqual(con.autostart, True)
+ self.assertEqual(con.missing_pkexec, False)
+ self.assertEqual(con.missing_vpn_keyfile, False)
+ self.assertEqual(con.missing_provider, False)
+ self.assertEqual(con.bad_provider, False)
+
+ def test_config_was_init(self):
+ """
+ is there a config object?
+ """
+ self.assertTrue(isinstance(self.con.config,
+ ConfigParser.ConfigParser))
+
+ def test_ovpn_command(self):
+ """
+ set_ovpn_command called
+ """
+ self.assertEqual(self.con.command,
+ "mock_command")
+ self.assertEqual(self.con.args,
+ [1, 2, 3])
+
+ # connect/disconnect calls
+
+ def test_disconnect(self):
+ """
+ disconnect method calls private and changes status
+ """
+ self.con._disconnect = Mock(
+ name="_disconnect")
+
+ # first we set status to connected
+ self.con.status.set_current(self.con.status.CONNECTED)
+ self.assertEqual(self.con.status.current,
+ self.con.status.CONNECTED)
+
+ # disconnect
+ self.con.disconnect()
+ self.con._disconnect.assert_called_once_with()
+
+ # new status should be disconnected
+ # XXX this should evolve and check no errors
+ # during disconnection
+ self.assertEqual(self.con.status.current,
+ self.con.status.DISCONNECTED)
+
+ def test_connect(self):
+ """
+ connect calls _launch_openvpn private
+ """
+ self.con._launch_openvpn = Mock()
+ self.con.connect()
+ self.con._launch_openvpn.assert_called_once_with()
+
+ # XXX tests breaking here ...
+
+ def test_good_poll_connection_state(self):
+ """
+ """
+ #@patch --
+ # self.manager.get_connection_state
+
+ #XXX review this set of poll_state tests
+ #they SHOULD NOT NEED TO MOCK ANYTHING IN THE
+ #lower layers!! -- status, vpn_manager..
+ #right now we're testing implementation, not
+ #behavior!!!
+ good_state = ["1345466946", "unknown_state", "ok",
+ "192.168.1.1", "192.168.1.100"]
+ self.con.get_connection_state = Mock(return_value=good_state)
+ self.con.status.set_vpn_state = Mock()
+
+ state = self.con.poll_connection_state()
+ good_state[1] = "disconnected"
+ final_state = tuple(good_state)
+ self.con.status.set_vpn_state.assert_called_with("unknown_state")
+ self.assertEqual(state, final_state)
+
+ # TODO between "good" and "bad" (exception raised) cases,
+ # we can still test for malformed states and see that only good
+ # states do have a change (and from only the expected transition
+ # states).
+
+ def test_bad_poll_connection_state(self):
+ """
+ get connection state raises ConnectionRefusedError
+ state is None
+ """
+ self.con.get_connection_state = Mock(
+ side_effect=ConnectionRefusedError('foo!'))
+ state = self.con.poll_connection_state()
+ self.assertEqual(state, None)
+
+
+ # XXX more things to test:
+ # - called config routines during initz.
+ # - raising proper exceptions with no config
+ # - called proper checks on config / permissions
+
+
+if __name__ == "__main__":
+ unittest.main()
diff --git a/src/leap/eip/tests/test_openvpnconnection.py b/src/leap/eip/tests/test_openvpnconnection.py
new file mode 100644
index 00000000..dea75b55
--- /dev/null
+++ b/src/leap/eip/tests/test_openvpnconnection.py
@@ -0,0 +1,136 @@
+import logging
+import platform
+#import socket
+
+logging.basicConfig()
+logger = logging.getLogger(name=__name__)
+
+try:
+ import unittest2 as unittest
+except ImportError:
+ import unittest
+
+from mock import Mock, patch # MagicMock
+
+from leap.eip import openvpnconnection
+from leap.eip import exceptions as eip_exceptions
+from leap.eip.udstelnet import UDSTelnet
+
+_system = platform.system()
+
+
+class NotImplementedError(Exception):
+ pass
+
+
+mock_UDSTelnet = Mock(spec=UDSTelnet)
+# XXX cautious!!!
+# this might be fragile right now (counting a global
+# reference of calls I think.
+# investigate this other form instead:
+# http://www.voidspace.org.uk/python/mock/patch.html#start-and-stop
+
+# XXX redo after merge-refactor
+
+
+@patch('openvpnconnection.OpenVPNConnection.connect_to_management')
+class MockedOpenVPNConnection(openvpnconnection.OpenVPNConnection):
+ def __init__(self, *args, **kwargs):
+ self.mock_UDSTelnet = Mock()
+ super(MockedOpenVPNConnection, self).__init__(
+ *args, **kwargs)
+ self.tn = self.mock_UDSTelnet(self.host, self.port)
+
+ def connect_to_management(self):
+ #print 'patched connect'
+ self.tn = mock_UDSTelnet(self.host, port=self.port)
+
+
+class OpenVPNConnectionTest(unittest.TestCase):
+
+ __name__ = "vpnconnection_tests"
+
+ def setUp(self):
+ self.manager = MockedOpenVPNConnection()
+
+ def tearDown(self):
+ del self.manager
+
+ #
+ # helpers
+ #
+
+ # XXX hey, refactor this to basetestclass
+
+ def _missing_test_for_plat(self, do_raise=False):
+ if do_raise:
+ raise NotImplementedError(
+ "This test is not implemented "
+ "for the running platform: %s" %
+ _system)
+
+ #
+ # tests
+ #
+
+ @unittest.skipIf(_system == "Windows", "lin/mac only")
+ def test_lin_mac_default_init(self):
+ """
+ check default host for management iface
+ """
+ self.assertEqual(self.manager.host, '/tmp/.eip.sock')
+ self.assertEqual(self.manager.port, 'unix')
+
+ @unittest.skipUnless(_system == "Windows", "win only")
+ def test_win_default_init(self):
+ """
+ check default host for management iface
+ """
+ # XXX should we make the platform specific switch
+ # here or in the vpn command string building?
+ self.assertEqual(self.manager.host, 'localhost')
+ self.assertEqual(self.manager.port, 7777)
+
+ def test_port_types_init(self):
+ self.manager = MockedOpenVPNConnection(port="42")
+ self.assertEqual(self.manager.port, 42)
+ self.manager = MockedOpenVPNConnection()
+ self.assertEqual(self.manager.port, "unix")
+ self.manager = MockedOpenVPNConnection(port="bad")
+ self.assertEqual(self.manager.port, None)
+
+ def test_connect_raises_missing_socket(self):
+ self.manager = openvpnconnection.OpenVPNConnection()
+ with self.assertRaises(eip_exceptions.MissingSocketError):
+ self.manager.connect_to_management()
+
+ def test_uds_telnet_called_on_connect(self):
+ self.manager.connect_to_management()
+ mock_UDSTelnet.assert_called_with(
+ self.manager.host,
+ port=self.manager.port)
+
+ @unittest.skip
+ def test_connect(self):
+ raise NotImplementedError
+ # XXX calls close
+ # calls UDSTelnet mock.
+
+ # XXX
+ # tests to write:
+ # UDSTelnetTest (for real?)
+ # HAVE A LOOK AT CORE TESTS FOR TELNETLIB.
+ # very illustrative instead...
+
+ # - raise MissingSocket
+ # - raise ConnectionRefusedError
+ # - test send command
+ # - tries connect
+ # - ... tries?
+ # - ... calls _seek_to_eof
+ # - ... read_until --> return value
+ # - ...
+
+
+if __name__ == "__main__":
+ unittest.main()
diff --git a/src/leap/util/test_fileutil.py b/src/leap/util/tests/test_fileutil.py
index 849decaf..f5131b3d 100644
--- a/src/leap/util/test_fileutil.py
+++ b/src/leap/util/tests/test_fileutil.py
@@ -52,8 +52,7 @@ class FileUtilTest(unittest.TestCase):
def test_is_user_executable(self):
"""
- test that a 700 file
- is an 700 file. kindda oximoronic, but...
+ touch_exec_file creates in mode 700?
"""
# XXX could check access X_OK
@@ -63,10 +62,10 @@ class FileUtilTest(unittest.TestCase):
def test_which(self):
"""
+ which implementation ok?
not a very reliable test,
but I cannot think of anything smarter now
I guess it's highly improbable that copy
- command is somewhere else..?
"""
# XXX yep, we can change the syspath
# for the test... !
@@ -78,7 +77,7 @@ class FileUtilTest(unittest.TestCase):
def test_mkdir_p(self):
"""
- test our mkdir -p implementation
+ our own mkdir -p implementation ok?
"""
testdir = self.get_file_path(
os.path.join('test', 'foo', 'bar'))
@@ -88,8 +87,7 @@ class FileUtilTest(unittest.TestCase):
def test_check_and_fix_urw_only(self):
"""
- test function that fixes perms on
- files that should be rw only for owner
+ ensure check_and_fix_urx_only ok?
"""
fp = self.touch_exec_file()
mode = self.get_mode(fp)
@@ -97,3 +95,6 @@ class FileUtilTest(unittest.TestCase):
fileutil.check_and_fix_urw_only(fp)
mode = self.get_mode(fp)
self.assertEqual(mode, int('600', 8))
+
+if __name__ == "__main__":
+ unittest.main()
diff --git a/src/leap/util/test_leap_argparse.py b/src/leap/util/tests/test_leap_argparse.py
index 1442e827..f4c86e36 100644
--- a/src/leap/util/test_leap_argparse.py
+++ b/src/leap/util/tests/test_leap_argparse.py
@@ -25,3 +25,6 @@ class LeapArgParseTest(unittest.TestCase):
opts,
Namespace(config_file=None,
debug=True))
+
+if __name__ == "__main__":
+ unittest.main()