From d5af0f112eca2c9af0344c589d731cd6b3051acf Mon Sep 17 00:00:00 2001 From: antialias Date: Mon, 20 Aug 2012 17:41:50 -0700 Subject: added json parsing from eip.json file and some basic tests. --- src/leap/eip/tests/tests_config.py | 18 ++++++++++++++++++ 1 file changed, 18 insertions(+) create mode 100644 src/leap/eip/tests/tests_config.py (limited to 'src/leap') diff --git a/src/leap/eip/tests/tests_config.py b/src/leap/eip/tests/tests_config.py new file mode 100644 index 00000000..6534723e --- /dev/null +++ b/src/leap/eip/tests/tests_config.py @@ -0,0 +1,18 @@ + +"""Test config helper functions""" + +import unittest + +from leap.eip import config + +class TestConfig(unittest.TestCase): + """ + Test configuration help functions. + """ + def test_get_config_json(self): + config_js = config.get_config_json() + self.assertTrue(isinstance(config_js, dict)) + self.assertTrue(config_js.has_key('transport')) + self.assertTrue(config_js.has_key('provider')) + self.assertEqual(config_js['provider'], "testprovider.org") + -- cgit v1.2.3 From ac00ec313a142e910447857c0e46e6d36c7f2ab2 Mon Sep 17 00:00:00 2001 From: antialias Date: Tue, 21 Aug 2012 10:12:22 -0700 Subject: Error fixes and json commit. --- src/leap/eip/config.py | 31 ++++++++++++++++++++++++++++--- 1 file changed, 28 insertions(+), 3 deletions(-) (limited to 'src/leap') diff --git a/src/leap/eip/config.py b/src/leap/eip/config.py index 8e55d789..a219fedb 100644 --- a/src/leap/eip/config.py +++ b/src/leap/eip/config.py @@ -2,6 +2,7 @@ import ConfigParser import grp import logging import os +import json import platform import socket @@ -118,8 +119,8 @@ def check_or_create_default_vpnconf(config): 'remote_ip') validate_ip(remote_ip) - except ConfigParser.NoOptionError: - raise EIPInitNoProviderError + except ConfigParser.NoSectionError: + raise eip_exceptions.EIPInitNoProviderError except socket.error: # this does not look like an ip, dave @@ -394,7 +395,7 @@ def check_vpn_keys(config): if not os.path.isfile(keyfile): logger.error('key file %s not found. aborting.', keyfile) - raise EIPInitNoKeyFileError + raise eip_exceptions.EIPInitNoKeyFileError # check proper permission on keys # bad perms? try to fix them @@ -402,3 +403,27 @@ def check_vpn_keys(config): check_and_fix_urw_only(keyfile) except OSError: raise EIPInitBadKeyFilePermError + + +def get_config_json(config_file=None): + """ + will replace get_config function be developing them + in parralel for branch purposes. + @param: configuration file + @type: file + @rparam: configuration turples + @rtype: dictionary + """ + if not config_file: + fpath = get_config_file('eip.json') + if not os.path.isfile(fpath): + dpath, cfile = os.path.split(fpath) + if not os.path.isdir(dpath): + mkdir_p(dpath) + with open(fpath, 'wb') as configfile: + configfile.write() + config_file = open(fpath) + + config = json.load(config_file) + + return config -- cgit v1.2.3 From 78c83ab3bb7f95564bdc537d29f6d278b1710b17 Mon Sep 17 00:00:00 2001 From: kali Date: Wed, 22 Aug 2012 03:31:07 +0900 Subject: pep8 --- src/leap/eip/exceptions.py | 2 -- 1 file changed, 2 deletions(-) (limited to 'src/leap') diff --git a/src/leap/eip/exceptions.py b/src/leap/eip/exceptions.py index ac61f42b..3719c605 100644 --- a/src/leap/eip/exceptions.py +++ b/src/leap/eip/exceptions.py @@ -61,5 +61,3 @@ class EIPInitNoKeyFileError(Exception): class EIPInitBadKeyFilePermError(Exception): pass - - -- cgit v1.2.3 From 04cf64af3702ab85a670efe6850c60f20bbf7eb0 Mon Sep 17 00:00:00 2001 From: kali Date: Wed, 22 Aug 2012 04:11:44 +0900 Subject: conductor tests --- src/leap/eip/test_conductor.py | 180 +++++++++++++++++++++++++++++++++++++++++ src/leap/util/test_fileutil.py | 10 +-- 2 files changed, 184 insertions(+), 6 deletions(-) create mode 100644 src/leap/eip/test_conductor.py (limited to 'src/leap') diff --git a/src/leap/eip/test_conductor.py b/src/leap/eip/test_conductor.py new file mode 100644 index 00000000..51772b7c --- /dev/null +++ b/src/leap/eip/test_conductor.py @@ -0,0 +1,180 @@ +import ConfigParser +import logging +import platform + +logging.basicConfig() +logger = logging.getLogger(name=__name__) + +try: + import unittest2 as unittest +except ImportError: + import unittest + +from mock import Mock, patch # MagicMock + +from leap.eip.eipconnection import EIPConnection +from leap.eip.exceptions import ConnectionRefusedError + +_system = platform.system() + + +class NotImplementedError(Exception): + pass + + +@patch('OpenVPNConnection._get_or_create_config') +@patch('OpenVPNConnection._set_ovpn_command') +class MockedEIPConnection(EIPConnection): + def _get_or_create_config(self): + self.config = ConfigParser.ConfigParser() + self._set_ovpn_command() + + def _set_ovpn_command(self): + self.command = "mock_command" + self.args = [1, 2, 3] + + +class EIPConductorTest(unittest.TestCase): + + __name__ = "eip_conductor_tests" + + def setUp(self): + self.manager = Mock( + name="openvpnmanager_mock") + + self.con = MockedEIPConnection() + #manager=self.manager) + + def tearDown(self): + del self.con + + # + # helpers + # + + def _missing_test_for_plat(self, do_raise=False): + if do_raise: + raise NotImplementedError( + "This test is not implemented " + "for the running platform: %s" % + _system) + + # + # tests + # + + @unittest.skip + #ain't manager anymore! + def test_manager_was_initialized(self): + """ + manager init ok during conductor init? + """ + self.manager.assert_called_once_with() + + def test_vpnconnection_defaults(self): + """ + default attrs as expected + """ + con = self.con + self.assertEqual(con.autostart, True) + self.assertEqual(con.missing_pkexec, False) + self.assertEqual(con.missing_vpn_keyfile, False) + self.assertEqual(con.missing_provider, False) + self.assertEqual(con.bad_provider, False) + + def test_config_was_init(self): + """ + is there a config object? + """ + self.assertTrue(isinstance(self.con.config, + ConfigParser.ConfigParser)) + + def test_ovpn_command(self): + """ + set_ovpn_command called + """ + self.assertEqual(self.con.command, + "mock_command") + self.assertEqual(self.con.args, + [1, 2, 3]) + + # connect/disconnect calls + + def test_disconnect(self): + """ + disconnect method calls private and changes status + """ + self.con._disconnect = Mock( + name="_disconnect") + + # first we set status to connected + self.con.status.set_current(self.con.status.CONNECTED) + self.assertEqual(self.con.status.current, + self.con.status.CONNECTED) + + # disconnect + self.con.disconnect() + self.con._disconnect.assert_called_once_with() + + # new status should be disconnected + # XXX this should evolve and check no errors + # during disconnection + self.assertEqual(self.con.status.current, + self.con.status.DISCONNECTED) + + def test_connect(self): + """ + connect calls _launch_openvpn private + """ + self.con._launch_openvpn = Mock() + self.con.connect() + self.con._launch_openvpn.assert_called_once_with() + + # XXX tests breaking here ... + + def test_good_poll_connection_state(self): + """ + """ + #@patch -- + # self.manager.get_connection_state + + #XXX review this set of poll_state tests + #they SHOULD NOT NEED TO MOCK ANYTHING IN THE + #lower layers!! -- status, vpn_manager.. + #right now we're testing implementation, not + #behavior!!! + good_state = ["1345466946", "unknown_state", "ok", + "192.168.1.1", "192.168.1.100"] + self.con.get_connection_state = Mock(return_value=good_state) + self.con.status.set_vpn_state = Mock() + + state = self.con.poll_connection_state() + good_state[1] = "disconnected" + final_state = tuple(good_state) + self.con.status.set_vpn_state.assert_called_with("unknown_state") + self.assertEqual(state, final_state) + + # TODO between "good" and "bad" (exception raised) cases, + # we can still test for malformed states and see that only good + # states do have a change (and from only the expected transition + # states). + + def test_bad_poll_connection_state(self): + """ + get connection state raises ConnectionRefusedError + state is None + """ + self.con.get_connection_state = Mock( + side_effect=ConnectionRefusedError('foo!')) + state = self.con.poll_connection_state() + self.assertEqual(state, None) + + + # XXX more things to test: + # - called config routines during initz. + # - raising proper exceptions with no config + # - called proper checks on config / permissions + + +if __name__ == "__main__": + unittest.main() diff --git a/src/leap/util/test_fileutil.py b/src/leap/util/test_fileutil.py index 849decaf..f5dbe108 100644 --- a/src/leap/util/test_fileutil.py +++ b/src/leap/util/test_fileutil.py @@ -52,8 +52,7 @@ class FileUtilTest(unittest.TestCase): def test_is_user_executable(self): """ - test that a 700 file - is an 700 file. kindda oximoronic, but... + touch_exec_file creates in mode 700? """ # XXX could check access X_OK @@ -63,10 +62,10 @@ class FileUtilTest(unittest.TestCase): def test_which(self): """ + which implementation ok? not a very reliable test, but I cannot think of anything smarter now I guess it's highly improbable that copy - command is somewhere else..? """ # XXX yep, we can change the syspath # for the test... ! @@ -78,7 +77,7 @@ class FileUtilTest(unittest.TestCase): def test_mkdir_p(self): """ - test our mkdir -p implementation + our own mkdir -p implementation ok? """ testdir = self.get_file_path( os.path.join('test', 'foo', 'bar')) @@ -88,8 +87,7 @@ class FileUtilTest(unittest.TestCase): def test_check_and_fix_urw_only(self): """ - test function that fixes perms on - files that should be rw only for owner + ensure check_and_fix_urx_only ok? """ fp = self.touch_exec_file() mode = self.get_mode(fp) -- cgit v1.2.3 From a048ecc7d709f6378ccba6201131b8c03df94716 Mon Sep 17 00:00:00 2001 From: kali Date: Wed, 22 Aug 2012 04:30:41 +0900 Subject: add conductor + manager tests (red) --- src/leap/eip/test_conductor.py | 180 --------------------------------- src/leap/eip/test_eipconnection.py | 180 +++++++++++++++++++++++++++++++++ src/leap/eip/test_openvpnconnection.py | 136 +++++++++++++++++++++++++ 3 files changed, 316 insertions(+), 180 deletions(-) delete mode 100644 src/leap/eip/test_conductor.py create mode 100644 src/leap/eip/test_eipconnection.py create mode 100644 src/leap/eip/test_openvpnconnection.py (limited to 'src/leap') diff --git a/src/leap/eip/test_conductor.py b/src/leap/eip/test_conductor.py deleted file mode 100644 index 51772b7c..00000000 --- a/src/leap/eip/test_conductor.py +++ /dev/null @@ -1,180 +0,0 @@ -import ConfigParser -import logging -import platform - -logging.basicConfig() -logger = logging.getLogger(name=__name__) - -try: - import unittest2 as unittest -except ImportError: - import unittest - -from mock import Mock, patch # MagicMock - -from leap.eip.eipconnection import EIPConnection -from leap.eip.exceptions import ConnectionRefusedError - -_system = platform.system() - - -class NotImplementedError(Exception): - pass - - -@patch('OpenVPNConnection._get_or_create_config') -@patch('OpenVPNConnection._set_ovpn_command') -class MockedEIPConnection(EIPConnection): - def _get_or_create_config(self): - self.config = ConfigParser.ConfigParser() - self._set_ovpn_command() - - def _set_ovpn_command(self): - self.command = "mock_command" - self.args = [1, 2, 3] - - -class EIPConductorTest(unittest.TestCase): - - __name__ = "eip_conductor_tests" - - def setUp(self): - self.manager = Mock( - name="openvpnmanager_mock") - - self.con = MockedEIPConnection() - #manager=self.manager) - - def tearDown(self): - del self.con - - # - # helpers - # - - def _missing_test_for_plat(self, do_raise=False): - if do_raise: - raise NotImplementedError( - "This test is not implemented " - "for the running platform: %s" % - _system) - - # - # tests - # - - @unittest.skip - #ain't manager anymore! - def test_manager_was_initialized(self): - """ - manager init ok during conductor init? - """ - self.manager.assert_called_once_with() - - def test_vpnconnection_defaults(self): - """ - default attrs as expected - """ - con = self.con - self.assertEqual(con.autostart, True) - self.assertEqual(con.missing_pkexec, False) - self.assertEqual(con.missing_vpn_keyfile, False) - self.assertEqual(con.missing_provider, False) - self.assertEqual(con.bad_provider, False) - - def test_config_was_init(self): - """ - is there a config object? - """ - self.assertTrue(isinstance(self.con.config, - ConfigParser.ConfigParser)) - - def test_ovpn_command(self): - """ - set_ovpn_command called - """ - self.assertEqual(self.con.command, - "mock_command") - self.assertEqual(self.con.args, - [1, 2, 3]) - - # connect/disconnect calls - - def test_disconnect(self): - """ - disconnect method calls private and changes status - """ - self.con._disconnect = Mock( - name="_disconnect") - - # first we set status to connected - self.con.status.set_current(self.con.status.CONNECTED) - self.assertEqual(self.con.status.current, - self.con.status.CONNECTED) - - # disconnect - self.con.disconnect() - self.con._disconnect.assert_called_once_with() - - # new status should be disconnected - # XXX this should evolve and check no errors - # during disconnection - self.assertEqual(self.con.status.current, - self.con.status.DISCONNECTED) - - def test_connect(self): - """ - connect calls _launch_openvpn private - """ - self.con._launch_openvpn = Mock() - self.con.connect() - self.con._launch_openvpn.assert_called_once_with() - - # XXX tests breaking here ... - - def test_good_poll_connection_state(self): - """ - """ - #@patch -- - # self.manager.get_connection_state - - #XXX review this set of poll_state tests - #they SHOULD NOT NEED TO MOCK ANYTHING IN THE - #lower layers!! -- status, vpn_manager.. - #right now we're testing implementation, not - #behavior!!! - good_state = ["1345466946", "unknown_state", "ok", - "192.168.1.1", "192.168.1.100"] - self.con.get_connection_state = Mock(return_value=good_state) - self.con.status.set_vpn_state = Mock() - - state = self.con.poll_connection_state() - good_state[1] = "disconnected" - final_state = tuple(good_state) - self.con.status.set_vpn_state.assert_called_with("unknown_state") - self.assertEqual(state, final_state) - - # TODO between "good" and "bad" (exception raised) cases, - # we can still test for malformed states and see that only good - # states do have a change (and from only the expected transition - # states). - - def test_bad_poll_connection_state(self): - """ - get connection state raises ConnectionRefusedError - state is None - """ - self.con.get_connection_state = Mock( - side_effect=ConnectionRefusedError('foo!')) - state = self.con.poll_connection_state() - self.assertEqual(state, None) - - - # XXX more things to test: - # - called config routines during initz. - # - raising proper exceptions with no config - # - called proper checks on config / permissions - - -if __name__ == "__main__": - unittest.main() diff --git a/src/leap/eip/test_eipconnection.py b/src/leap/eip/test_eipconnection.py new file mode 100644 index 00000000..51772b7c --- /dev/null +++ b/src/leap/eip/test_eipconnection.py @@ -0,0 +1,180 @@ +import ConfigParser +import logging +import platform + +logging.basicConfig() +logger = logging.getLogger(name=__name__) + +try: + import unittest2 as unittest +except ImportError: + import unittest + +from mock import Mock, patch # MagicMock + +from leap.eip.eipconnection import EIPConnection +from leap.eip.exceptions import ConnectionRefusedError + +_system = platform.system() + + +class NotImplementedError(Exception): + pass + + +@patch('OpenVPNConnection._get_or_create_config') +@patch('OpenVPNConnection._set_ovpn_command') +class MockedEIPConnection(EIPConnection): + def _get_or_create_config(self): + self.config = ConfigParser.ConfigParser() + self._set_ovpn_command() + + def _set_ovpn_command(self): + self.command = "mock_command" + self.args = [1, 2, 3] + + +class EIPConductorTest(unittest.TestCase): + + __name__ = "eip_conductor_tests" + + def setUp(self): + self.manager = Mock( + name="openvpnmanager_mock") + + self.con = MockedEIPConnection() + #manager=self.manager) + + def tearDown(self): + del self.con + + # + # helpers + # + + def _missing_test_for_plat(self, do_raise=False): + if do_raise: + raise NotImplementedError( + "This test is not implemented " + "for the running platform: %s" % + _system) + + # + # tests + # + + @unittest.skip + #ain't manager anymore! + def test_manager_was_initialized(self): + """ + manager init ok during conductor init? + """ + self.manager.assert_called_once_with() + + def test_vpnconnection_defaults(self): + """ + default attrs as expected + """ + con = self.con + self.assertEqual(con.autostart, True) + self.assertEqual(con.missing_pkexec, False) + self.assertEqual(con.missing_vpn_keyfile, False) + self.assertEqual(con.missing_provider, False) + self.assertEqual(con.bad_provider, False) + + def test_config_was_init(self): + """ + is there a config object? + """ + self.assertTrue(isinstance(self.con.config, + ConfigParser.ConfigParser)) + + def test_ovpn_command(self): + """ + set_ovpn_command called + """ + self.assertEqual(self.con.command, + "mock_command") + self.assertEqual(self.con.args, + [1, 2, 3]) + + # connect/disconnect calls + + def test_disconnect(self): + """ + disconnect method calls private and changes status + """ + self.con._disconnect = Mock( + name="_disconnect") + + # first we set status to connected + self.con.status.set_current(self.con.status.CONNECTED) + self.assertEqual(self.con.status.current, + self.con.status.CONNECTED) + + # disconnect + self.con.disconnect() + self.con._disconnect.assert_called_once_with() + + # new status should be disconnected + # XXX this should evolve and check no errors + # during disconnection + self.assertEqual(self.con.status.current, + self.con.status.DISCONNECTED) + + def test_connect(self): + """ + connect calls _launch_openvpn private + """ + self.con._launch_openvpn = Mock() + self.con.connect() + self.con._launch_openvpn.assert_called_once_with() + + # XXX tests breaking here ... + + def test_good_poll_connection_state(self): + """ + """ + #@patch -- + # self.manager.get_connection_state + + #XXX review this set of poll_state tests + #they SHOULD NOT NEED TO MOCK ANYTHING IN THE + #lower layers!! -- status, vpn_manager.. + #right now we're testing implementation, not + #behavior!!! + good_state = ["1345466946", "unknown_state", "ok", + "192.168.1.1", "192.168.1.100"] + self.con.get_connection_state = Mock(return_value=good_state) + self.con.status.set_vpn_state = Mock() + + state = self.con.poll_connection_state() + good_state[1] = "disconnected" + final_state = tuple(good_state) + self.con.status.set_vpn_state.assert_called_with("unknown_state") + self.assertEqual(state, final_state) + + # TODO between "good" and "bad" (exception raised) cases, + # we can still test for malformed states and see that only good + # states do have a change (and from only the expected transition + # states). + + def test_bad_poll_connection_state(self): + """ + get connection state raises ConnectionRefusedError + state is None + """ + self.con.get_connection_state = Mock( + side_effect=ConnectionRefusedError('foo!')) + state = self.con.poll_connection_state() + self.assertEqual(state, None) + + + # XXX more things to test: + # - called config routines during initz. + # - raising proper exceptions with no config + # - called proper checks on config / permissions + + +if __name__ == "__main__": + unittest.main() diff --git a/src/leap/eip/test_openvpnconnection.py b/src/leap/eip/test_openvpnconnection.py new file mode 100644 index 00000000..821c1ed4 --- /dev/null +++ b/src/leap/eip/test_openvpnconnection.py @@ -0,0 +1,136 @@ +import logging +import platform +#import socket + +logging.basicConfig() +logger = logging.getLogger(name=__name__) + +try: + import unittest2 as unittest +except ImportError: + import unittest + +from mock import Mock, patch # MagicMock + +from leap.eip import openvpnconnection +from leap.eip import exceptions as eip_exceptions +from leap.eip.udstelnet import UDSTelnet + +_system = platform.system() + + +class NotImplementedError(Exception): + pass + + +mock_UDSTelnet = Mock(spec=UDSTelnet) +# XXX cautious!!! +# this might be fragile right now (counting a global +# reference of calls I think. +# investigate this other form instead: +# http://www.voidspace.org.uk/python/mock/patch.html#start-and-stop + +# XXX redo after merge-refactor + + +@patch('openvpnconnection.OpenVPNConnection.connect_to_management') +class MockedOpenVPNConnection(openvpnconnection.OpenVPNConnection): + def __init__(self, *args, **kwargs): + self.mock_UDSTelnet = Mock() + super(MockedOpenVPNConnection, self).__init__( + *args, **kwargs) + self.tn = self.mock_UDSTelnet(self.host, self.port) + + def connect_to_management(self): + #print 'patched connect' + self.tn = mock_UDSTelnet(self.host, port=self.port) + + +class OpenVPNConnectionTest(unittest.TestCase): + + __name__ = "vpnconnection_tests" + + def setUp(self): + self.manager = MockedOpenVPNConnection() + + def tearDown(self): + del self.manager + + # + # helpers + # + + # XXX hey, refactor this to basetestclass + + def _missing_test_for_plat(self, do_raise=False): + if do_raise: + raise NotImplementedError( + "This test is not implemented " + "for the running platform: %s" % + _system) + + # + # tests + # + + @unittest.skipIf(_system == "Windows", "lin/mac only") + def test_lin_mac_default_init(self): + """ + check default host for management iface + """ + self.assertEqual(self.manager.host, '/tmp/.eip.sock') + self.assertEqual(self.manager.port, 'unix') + + @unittest.skipUnless(_system == "Windows", "win only") + def test_win_default_init(self): + """ + check default host for management iface + """ + # XXX should we make the platform specific switch + # here or in the vpn command string building? + self.assertEqual(self.manager.host, 'localhost') + self.assertEqual(self.manager.port, 7777) + + def test_port_types_init(self): + self.manager = MockedOpenVPNConnection(port="42") + self.assertEqual(self.manager.port, 42) + self.manager = MockedOpenVPNConnection() + self.assertEqual(self.manager.port, "unix") + self.manager = MockedOpenVPNConnection(port="bad") + self.assertEqual(self.manager.port, None) + + def test_connect_raises_missing_socket(self): + self.manager = openvpnconnection.OpenVPNConnection() + with self.assertRaises(eip_exceptions.MissingSocketError): + self.manager.connect_to_management() + + def test_uds_telnet_called_on_connect(self): + self.manager.connect() + mock_UDSTelnet.assert_called_with( + self.manager.host, + port=self.manager.port) + + @unittest.skip + def test_connect(self): + raise NotImplementedError + # XXX calls close + # calls UDSTelnet mock. + + # XXX + # tests to write: + # UDSTelnetTest (for real?) + # HAVE A LOOK AT CORE TESTS FOR TELNETLIB. + # very illustrative instead... + + # - raise MissingSocket + # - raise ConnectionRefusedError + # - test send command + # - tries connect + # - ... tries? + # - ... calls _seek_to_eof + # - ... read_until --> return value + # - ... + + +if __name__ == "__main__": + unittest.main() -- cgit v1.2.3 From 5ab2163a89ad7bc303f436af738aa0e7e6bb24d4 Mon Sep 17 00:00:00 2001 From: kali Date: Wed, 22 Aug 2012 04:37:41 +0900 Subject: fix for failing bad port init test --- src/leap/eip/openvpnconnection.py | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) (limited to 'src/leap') diff --git a/src/leap/eip/openvpnconnection.py b/src/leap/eip/openvpnconnection.py index a230d229..3972b617 100644 --- a/src/leap/eip/openvpnconnection.py +++ b/src/leap/eip/openvpnconnection.py @@ -85,9 +85,12 @@ to be triggered for each one of them. self.host = host if isinstance(port, str) and port.isdigit(): port = int(port) + elif port == "unix": + port = "unix" + else: + port = None self.port = port self.password = password - #self.tn = None def _set_autostart(self): config = self.config -- cgit v1.2.3 From 40e7706d7291605b7759561ae59550213af83a94 Mon Sep 17 00:00:00 2001 From: kali Date: Wed, 22 Aug 2012 04:39:35 +0900 Subject: fix udstelnet called test (green) --- src/leap/eip/test_openvpnconnection.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'src/leap') diff --git a/src/leap/eip/test_openvpnconnection.py b/src/leap/eip/test_openvpnconnection.py index 821c1ed4..dea75b55 100644 --- a/src/leap/eip/test_openvpnconnection.py +++ b/src/leap/eip/test_openvpnconnection.py @@ -105,7 +105,7 @@ class OpenVPNConnectionTest(unittest.TestCase): self.manager.connect_to_management() def test_uds_telnet_called_on_connect(self): - self.manager.connect() + self.manager.connect_to_management() mock_UDSTelnet.assert_called_with( self.manager.host, port=self.manager.port) -- cgit v1.2.3 From b9f9e2d5df2d9aa64377a02eba03fd877b134a8a Mon Sep 17 00:00:00 2001 From: kali Date: Wed, 22 Aug 2012 04:41:59 +0900 Subject: moved tests to directory --- src/leap/eip/test_eipconnection.py | 180 --------------------------- src/leap/eip/test_openvpnconnection.py | 136 -------------------- src/leap/eip/tests/test_eipconnection.py | 180 +++++++++++++++++++++++++++ src/leap/eip/tests/test_openvpnconnection.py | 136 ++++++++++++++++++++ 4 files changed, 316 insertions(+), 316 deletions(-) delete mode 100644 src/leap/eip/test_eipconnection.py delete mode 100644 src/leap/eip/test_openvpnconnection.py create mode 100644 src/leap/eip/tests/test_eipconnection.py create mode 100644 src/leap/eip/tests/test_openvpnconnection.py (limited to 'src/leap') diff --git a/src/leap/eip/test_eipconnection.py b/src/leap/eip/test_eipconnection.py deleted file mode 100644 index 51772b7c..00000000 --- a/src/leap/eip/test_eipconnection.py +++ /dev/null @@ -1,180 +0,0 @@ -import ConfigParser -import logging -import platform - -logging.basicConfig() -logger = logging.getLogger(name=__name__) - -try: - import unittest2 as unittest -except ImportError: - import unittest - -from mock import Mock, patch # MagicMock - -from leap.eip.eipconnection import EIPConnection -from leap.eip.exceptions import ConnectionRefusedError - -_system = platform.system() - - -class NotImplementedError(Exception): - pass - - -@patch('OpenVPNConnection._get_or_create_config') -@patch('OpenVPNConnection._set_ovpn_command') -class MockedEIPConnection(EIPConnection): - def _get_or_create_config(self): - self.config = ConfigParser.ConfigParser() - self._set_ovpn_command() - - def _set_ovpn_command(self): - self.command = "mock_command" - self.args = [1, 2, 3] - - -class EIPConductorTest(unittest.TestCase): - - __name__ = "eip_conductor_tests" - - def setUp(self): - self.manager = Mock( - name="openvpnmanager_mock") - - self.con = MockedEIPConnection() - #manager=self.manager) - - def tearDown(self): - del self.con - - # - # helpers - # - - def _missing_test_for_plat(self, do_raise=False): - if do_raise: - raise NotImplementedError( - "This test is not implemented " - "for the running platform: %s" % - _system) - - # - # tests - # - - @unittest.skip - #ain't manager anymore! - def test_manager_was_initialized(self): - """ - manager init ok during conductor init? - """ - self.manager.assert_called_once_with() - - def test_vpnconnection_defaults(self): - """ - default attrs as expected - """ - con = self.con - self.assertEqual(con.autostart, True) - self.assertEqual(con.missing_pkexec, False) - self.assertEqual(con.missing_vpn_keyfile, False) - self.assertEqual(con.missing_provider, False) - self.assertEqual(con.bad_provider, False) - - def test_config_was_init(self): - """ - is there a config object? - """ - self.assertTrue(isinstance(self.con.config, - ConfigParser.ConfigParser)) - - def test_ovpn_command(self): - """ - set_ovpn_command called - """ - self.assertEqual(self.con.command, - "mock_command") - self.assertEqual(self.con.args, - [1, 2, 3]) - - # connect/disconnect calls - - def test_disconnect(self): - """ - disconnect method calls private and changes status - """ - self.con._disconnect = Mock( - name="_disconnect") - - # first we set status to connected - self.con.status.set_current(self.con.status.CONNECTED) - self.assertEqual(self.con.status.current, - self.con.status.CONNECTED) - - # disconnect - self.con.disconnect() - self.con._disconnect.assert_called_once_with() - - # new status should be disconnected - # XXX this should evolve and check no errors - # during disconnection - self.assertEqual(self.con.status.current, - self.con.status.DISCONNECTED) - - def test_connect(self): - """ - connect calls _launch_openvpn private - """ - self.con._launch_openvpn = Mock() - self.con.connect() - self.con._launch_openvpn.assert_called_once_with() - - # XXX tests breaking here ... - - def test_good_poll_connection_state(self): - """ - """ - #@patch -- - # self.manager.get_connection_state - - #XXX review this set of poll_state tests - #they SHOULD NOT NEED TO MOCK ANYTHING IN THE - #lower layers!! -- status, vpn_manager.. - #right now we're testing implementation, not - #behavior!!! - good_state = ["1345466946", "unknown_state", "ok", - "192.168.1.1", "192.168.1.100"] - self.con.get_connection_state = Mock(return_value=good_state) - self.con.status.set_vpn_state = Mock() - - state = self.con.poll_connection_state() - good_state[1] = "disconnected" - final_state = tuple(good_state) - self.con.status.set_vpn_state.assert_called_with("unknown_state") - self.assertEqual(state, final_state) - - # TODO between "good" and "bad" (exception raised) cases, - # we can still test for malformed states and see that only good - # states do have a change (and from only the expected transition - # states). - - def test_bad_poll_connection_state(self): - """ - get connection state raises ConnectionRefusedError - state is None - """ - self.con.get_connection_state = Mock( - side_effect=ConnectionRefusedError('foo!')) - state = self.con.poll_connection_state() - self.assertEqual(state, None) - - - # XXX more things to test: - # - called config routines during initz. - # - raising proper exceptions with no config - # - called proper checks on config / permissions - - -if __name__ == "__main__": - unittest.main() diff --git a/src/leap/eip/test_openvpnconnection.py b/src/leap/eip/test_openvpnconnection.py deleted file mode 100644 index dea75b55..00000000 --- a/src/leap/eip/test_openvpnconnection.py +++ /dev/null @@ -1,136 +0,0 @@ -import logging -import platform -#import socket - -logging.basicConfig() -logger = logging.getLogger(name=__name__) - -try: - import unittest2 as unittest -except ImportError: - import unittest - -from mock import Mock, patch # MagicMock - -from leap.eip import openvpnconnection -from leap.eip import exceptions as eip_exceptions -from leap.eip.udstelnet import UDSTelnet - -_system = platform.system() - - -class NotImplementedError(Exception): - pass - - -mock_UDSTelnet = Mock(spec=UDSTelnet) -# XXX cautious!!! -# this might be fragile right now (counting a global -# reference of calls I think. -# investigate this other form instead: -# http://www.voidspace.org.uk/python/mock/patch.html#start-and-stop - -# XXX redo after merge-refactor - - -@patch('openvpnconnection.OpenVPNConnection.connect_to_management') -class MockedOpenVPNConnection(openvpnconnection.OpenVPNConnection): - def __init__(self, *args, **kwargs): - self.mock_UDSTelnet = Mock() - super(MockedOpenVPNConnection, self).__init__( - *args, **kwargs) - self.tn = self.mock_UDSTelnet(self.host, self.port) - - def connect_to_management(self): - #print 'patched connect' - self.tn = mock_UDSTelnet(self.host, port=self.port) - - -class OpenVPNConnectionTest(unittest.TestCase): - - __name__ = "vpnconnection_tests" - - def setUp(self): - self.manager = MockedOpenVPNConnection() - - def tearDown(self): - del self.manager - - # - # helpers - # - - # XXX hey, refactor this to basetestclass - - def _missing_test_for_plat(self, do_raise=False): - if do_raise: - raise NotImplementedError( - "This test is not implemented " - "for the running platform: %s" % - _system) - - # - # tests - # - - @unittest.skipIf(_system == "Windows", "lin/mac only") - def test_lin_mac_default_init(self): - """ - check default host for management iface - """ - self.assertEqual(self.manager.host, '/tmp/.eip.sock') - self.assertEqual(self.manager.port, 'unix') - - @unittest.skipUnless(_system == "Windows", "win only") - def test_win_default_init(self): - """ - check default host for management iface - """ - # XXX should we make the platform specific switch - # here or in the vpn command string building? - self.assertEqual(self.manager.host, 'localhost') - self.assertEqual(self.manager.port, 7777) - - def test_port_types_init(self): - self.manager = MockedOpenVPNConnection(port="42") - self.assertEqual(self.manager.port, 42) - self.manager = MockedOpenVPNConnection() - self.assertEqual(self.manager.port, "unix") - self.manager = MockedOpenVPNConnection(port="bad") - self.assertEqual(self.manager.port, None) - - def test_connect_raises_missing_socket(self): - self.manager = openvpnconnection.OpenVPNConnection() - with self.assertRaises(eip_exceptions.MissingSocketError): - self.manager.connect_to_management() - - def test_uds_telnet_called_on_connect(self): - self.manager.connect_to_management() - mock_UDSTelnet.assert_called_with( - self.manager.host, - port=self.manager.port) - - @unittest.skip - def test_connect(self): - raise NotImplementedError - # XXX calls close - # calls UDSTelnet mock. - - # XXX - # tests to write: - # UDSTelnetTest (for real?) - # HAVE A LOOK AT CORE TESTS FOR TELNETLIB. - # very illustrative instead... - - # - raise MissingSocket - # - raise ConnectionRefusedError - # - test send command - # - tries connect - # - ... tries? - # - ... calls _seek_to_eof - # - ... read_until --> return value - # - ... - - -if __name__ == "__main__": - unittest.main() diff --git a/src/leap/eip/tests/test_eipconnection.py b/src/leap/eip/tests/test_eipconnection.py new file mode 100644 index 00000000..51772b7c --- /dev/null +++ b/src/leap/eip/tests/test_eipconnection.py @@ -0,0 +1,180 @@ +import ConfigParser +import logging +import platform + +logging.basicConfig() +logger = logging.getLogger(name=__name__) + +try: + import unittest2 as unittest +except ImportError: + import unittest + +from mock import Mock, patch # MagicMock + +from leap.eip.eipconnection import EIPConnection +from leap.eip.exceptions import ConnectionRefusedError + +_system = platform.system() + + +class NotImplementedError(Exception): + pass + + +@patch('OpenVPNConnection._get_or_create_config') +@patch('OpenVPNConnection._set_ovpn_command') +class MockedEIPConnection(EIPConnection): + def _get_or_create_config(self): + self.config = ConfigParser.ConfigParser() + self._set_ovpn_command() + + def _set_ovpn_command(self): + self.command = "mock_command" + self.args = [1, 2, 3] + + +class EIPConductorTest(unittest.TestCase): + + __name__ = "eip_conductor_tests" + + def setUp(self): + self.manager = Mock( + name="openvpnmanager_mock") + + self.con = MockedEIPConnection() + #manager=self.manager) + + def tearDown(self): + del self.con + + # + # helpers + # + + def _missing_test_for_plat(self, do_raise=False): + if do_raise: + raise NotImplementedError( + "This test is not implemented " + "for the running platform: %s" % + _system) + + # + # tests + # + + @unittest.skip + #ain't manager anymore! + def test_manager_was_initialized(self): + """ + manager init ok during conductor init? + """ + self.manager.assert_called_once_with() + + def test_vpnconnection_defaults(self): + """ + default attrs as expected + """ + con = self.con + self.assertEqual(con.autostart, True) + self.assertEqual(con.missing_pkexec, False) + self.assertEqual(con.missing_vpn_keyfile, False) + self.assertEqual(con.missing_provider, False) + self.assertEqual(con.bad_provider, False) + + def test_config_was_init(self): + """ + is there a config object? + """ + self.assertTrue(isinstance(self.con.config, + ConfigParser.ConfigParser)) + + def test_ovpn_command(self): + """ + set_ovpn_command called + """ + self.assertEqual(self.con.command, + "mock_command") + self.assertEqual(self.con.args, + [1, 2, 3]) + + # connect/disconnect calls + + def test_disconnect(self): + """ + disconnect method calls private and changes status + """ + self.con._disconnect = Mock( + name="_disconnect") + + # first we set status to connected + self.con.status.set_current(self.con.status.CONNECTED) + self.assertEqual(self.con.status.current, + self.con.status.CONNECTED) + + # disconnect + self.con.disconnect() + self.con._disconnect.assert_called_once_with() + + # new status should be disconnected + # XXX this should evolve and check no errors + # during disconnection + self.assertEqual(self.con.status.current, + self.con.status.DISCONNECTED) + + def test_connect(self): + """ + connect calls _launch_openvpn private + """ + self.con._launch_openvpn = Mock() + self.con.connect() + self.con._launch_openvpn.assert_called_once_with() + + # XXX tests breaking here ... + + def test_good_poll_connection_state(self): + """ + """ + #@patch -- + # self.manager.get_connection_state + + #XXX review this set of poll_state tests + #they SHOULD NOT NEED TO MOCK ANYTHING IN THE + #lower layers!! -- status, vpn_manager.. + #right now we're testing implementation, not + #behavior!!! + good_state = ["1345466946", "unknown_state", "ok", + "192.168.1.1", "192.168.1.100"] + self.con.get_connection_state = Mock(return_value=good_state) + self.con.status.set_vpn_state = Mock() + + state = self.con.poll_connection_state() + good_state[1] = "disconnected" + final_state = tuple(good_state) + self.con.status.set_vpn_state.assert_called_with("unknown_state") + self.assertEqual(state, final_state) + + # TODO between "good" and "bad" (exception raised) cases, + # we can still test for malformed states and see that only good + # states do have a change (and from only the expected transition + # states). + + def test_bad_poll_connection_state(self): + """ + get connection state raises ConnectionRefusedError + state is None + """ + self.con.get_connection_state = Mock( + side_effect=ConnectionRefusedError('foo!')) + state = self.con.poll_connection_state() + self.assertEqual(state, None) + + + # XXX more things to test: + # - called config routines during initz. + # - raising proper exceptions with no config + # - called proper checks on config / permissions + + +if __name__ == "__main__": + unittest.main() diff --git a/src/leap/eip/tests/test_openvpnconnection.py b/src/leap/eip/tests/test_openvpnconnection.py new file mode 100644 index 00000000..dea75b55 --- /dev/null +++ b/src/leap/eip/tests/test_openvpnconnection.py @@ -0,0 +1,136 @@ +import logging +import platform +#import socket + +logging.basicConfig() +logger = logging.getLogger(name=__name__) + +try: + import unittest2 as unittest +except ImportError: + import unittest + +from mock import Mock, patch # MagicMock + +from leap.eip import openvpnconnection +from leap.eip import exceptions as eip_exceptions +from leap.eip.udstelnet import UDSTelnet + +_system = platform.system() + + +class NotImplementedError(Exception): + pass + + +mock_UDSTelnet = Mock(spec=UDSTelnet) +# XXX cautious!!! +# this might be fragile right now (counting a global +# reference of calls I think. +# investigate this other form instead: +# http://www.voidspace.org.uk/python/mock/patch.html#start-and-stop + +# XXX redo after merge-refactor + + +@patch('openvpnconnection.OpenVPNConnection.connect_to_management') +class MockedOpenVPNConnection(openvpnconnection.OpenVPNConnection): + def __init__(self, *args, **kwargs): + self.mock_UDSTelnet = Mock() + super(MockedOpenVPNConnection, self).__init__( + *args, **kwargs) + self.tn = self.mock_UDSTelnet(self.host, self.port) + + def connect_to_management(self): + #print 'patched connect' + self.tn = mock_UDSTelnet(self.host, port=self.port) + + +class OpenVPNConnectionTest(unittest.TestCase): + + __name__ = "vpnconnection_tests" + + def setUp(self): + self.manager = MockedOpenVPNConnection() + + def tearDown(self): + del self.manager + + # + # helpers + # + + # XXX hey, refactor this to basetestclass + + def _missing_test_for_plat(self, do_raise=False): + if do_raise: + raise NotImplementedError( + "This test is not implemented " + "for the running platform: %s" % + _system) + + # + # tests + # + + @unittest.skipIf(_system == "Windows", "lin/mac only") + def test_lin_mac_default_init(self): + """ + check default host for management iface + """ + self.assertEqual(self.manager.host, '/tmp/.eip.sock') + self.assertEqual(self.manager.port, 'unix') + + @unittest.skipUnless(_system == "Windows", "win only") + def test_win_default_init(self): + """ + check default host for management iface + """ + # XXX should we make the platform specific switch + # here or in the vpn command string building? + self.assertEqual(self.manager.host, 'localhost') + self.assertEqual(self.manager.port, 7777) + + def test_port_types_init(self): + self.manager = MockedOpenVPNConnection(port="42") + self.assertEqual(self.manager.port, 42) + self.manager = MockedOpenVPNConnection() + self.assertEqual(self.manager.port, "unix") + self.manager = MockedOpenVPNConnection(port="bad") + self.assertEqual(self.manager.port, None) + + def test_connect_raises_missing_socket(self): + self.manager = openvpnconnection.OpenVPNConnection() + with self.assertRaises(eip_exceptions.MissingSocketError): + self.manager.connect_to_management() + + def test_uds_telnet_called_on_connect(self): + self.manager.connect_to_management() + mock_UDSTelnet.assert_called_with( + self.manager.host, + port=self.manager.port) + + @unittest.skip + def test_connect(self): + raise NotImplementedError + # XXX calls close + # calls UDSTelnet mock. + + # XXX + # tests to write: + # UDSTelnetTest (for real?) + # HAVE A LOOK AT CORE TESTS FOR TELNETLIB. + # very illustrative instead... + + # - raise MissingSocket + # - raise ConnectionRefusedError + # - test send command + # - tries connect + # - ... tries? + # - ... calls _seek_to_eof + # - ... read_until --> return value + # - ... + + +if __name__ == "__main__": + unittest.main() -- cgit v1.2.3 From 5f6064b9dfa102b1115d5e3a6ecfb22cdcf82d14 Mon Sep 17 00:00:00 2001 From: kali Date: Wed, 22 Aug 2012 04:47:14 +0900 Subject: config tests --- src/leap/eip/config.py | 72 +++++++++---- src/leap/eip/tests/test_config.py | 210 ++++++++++++++++++++++++++++++++++++++ 2 files changed, 264 insertions(+), 18 deletions(-) create mode 100644 src/leap/eip/tests/test_config.py (limited to 'src/leap') diff --git a/src/leap/eip/config.py b/src/leap/eip/config.py index 8e55d789..8c67a258 100644 --- a/src/leap/eip/config.py +++ b/src/leap/eip/config.py @@ -9,15 +9,37 @@ from leap.util.fileutil import (which, mkdir_p, check_and_fix_urw_only) from leap.baseapp.permcheck import (is_pkexec_in_system, is_auth_agent_running) -from leap.eip import exceptions as eip_exceptions logger = logging.getLogger(name=__name__) logger.setLevel('DEBUG') -# XXX this has to be REMOVED -# and all these options passed in the -# command line --> move to build_ovpn_command -# issue #447 +# XXX move exceptions: +# from leap.eip import exceptions as eip_exceptions + + +class EIPNoPkexecAvailable(Exception): + pass + + +class EIPNoPolkitAuthAgentAvailable(Exception): + pass + + +class EIPInitNoProviderError(Exception): + pass + + +class EIPInitBadProviderError(Exception): + pass + + +class EIPInitNoKeyFileError(Exception): + pass + + +class EIPInitBadKeyFilePermError(Exception): + pass + OPENVPN_CONFIG_TEMPLATE = """#Autogenerated by eip-client wizard remote {VPN_REMOTE_HOST} {VPN_REMOTE_PORT} @@ -114,6 +136,10 @@ def check_or_create_default_vpnconf(config): # instead. try: + # XXX by now, we're expecting + # only IP format for remote. + # We should allow also domain names, + # and make a reverse resolv. remote_ip = config.get('provider', 'remote_ip') validate_ip(remote_ip) @@ -158,6 +184,15 @@ def check_or_create_default_vpnconf(config): f.write(ovpn_config) +def get_username(): + return os.getlogin() + + +def get_groupname(): + gid = os.getgroups()[-1] + return grp.getgrgid(gid).gr_name + + def build_ovpn_options(daemon=False): """ build a list of options @@ -175,16 +210,11 @@ def build_ovpn_options(daemon=False): # get user/group name # also from config. - user = os.getlogin() - gid = os.getgroups()[-1] - group = grp.getgrgid(gid).gr_name + user = get_username() + group = get_groupname() opts = [] - #moved to config files - #opts.append('--persist-tun') - #opts.append('--persist-key') - # set user and group opts.append('--user') opts.append('%s' % user) @@ -219,6 +249,8 @@ def build_ovpn_options(daemon=False): opts.append('--config') default_provider_path = get_default_provider_path() + + # XXX get rid of config_file at all ovpncnf = get_config_file( 'openvpn.conf', folder=default_provider_path) @@ -233,7 +265,7 @@ def build_ovpn_options(daemon=False): return opts -def build_ovpn_command(config, debug=False): +def build_ovpn_command(config, debug=False, do_pkexec_check=True): """ build a string with the complete openvpn invocation @@ -251,17 +283,16 @@ def build_ovpn_command(config, debug=False): if config.has_option('openvpn', 'use_pkexec'): use_pkexec = config.get('openvpn', 'use_pkexec') - if platform.system() == "Linux" and use_pkexec: + if platform.system() == "Linux" and use_pkexec and do_pkexec_check: # XXX check for both pkexec (done) # AND a suitable authentication # agent running. - # (until we implement setuid helper) logger.info('use_pkexec set to True') if not is_pkexec_in_system(): logger.error('no pkexec in system') - raise eip_exceptions.EIPNoPkexecAvailable + raise EIPNoPkexecAvailable if not is_auth_agent_running(): logger.warning( @@ -269,7 +300,7 @@ def build_ovpn_command(config, debug=False): "pkexec will use its own text " "based authentication agent. " "that's probably a bad idea") - raise eip_exceptions.EIPNoPolkitAuthAgentAvailable + raise EIPNoPolkitAuthAgentAvailable command.append('pkexec') @@ -283,7 +314,11 @@ def build_ovpn_command(config, debug=False): 'openvpn_binary') if ovpn: - command.append(ovpn) + vpn_command = ovpn + else: + vpn_command = "openvpn" + + command.append(vpn_command) daemon_mode = not debug @@ -291,6 +326,7 @@ def build_ovpn_command(config, debug=False): command.append(opt) # XXX check len and raise proper error + return [command[0], command[1:]] diff --git a/src/leap/eip/tests/test_config.py b/src/leap/eip/tests/test_config.py new file mode 100644 index 00000000..12679ec6 --- /dev/null +++ b/src/leap/eip/tests/test_config.py @@ -0,0 +1,210 @@ +import ConfigParser +import os +import platform +import shutil +import socket +import tempfile + +try: + import unittest2 as unittest +except ImportError: + import unittest + +from leap.eip import config + +_system = platform.system() + + +class NotImplementedError(Exception): + pass + +# XXX use mock_open here? + + +class EIPConfigTest(unittest.TestCase): + + __name__ = "eip_config_tests" + + def setUp(self): + self.old_path = os.environ['PATH'] + + self.tdir = tempfile.mkdtemp() + + bin_tdir = os.path.join( + self.tdir, + 'bin') + os.mkdir(bin_tdir) + os.environ['PATH'] = bin_tdir + + def tearDown(self): + os.environ['PATH'] = self.old_path + shutil.rmtree(self.tdir) + # + # helpers + # + + def get_username(self): + return config.get_username() + + def get_groupname(self): + return config.get_groupname() + + def _missing_test_for_plat(self, do_raise=False): + if do_raise: + raise NotImplementedError( + "This test is not implemented " + "for the running platform: %s" % + _system) + + def touch_exec(self): + tfile = os.path.join( + self.tdir, + 'bin', + 'openvpn') + open(tfile, 'bw').close() + + def get_empty_config(self): + _config = ConfigParser.ConfigParser() + return _config + + def get_minimal_config(self): + _config = ConfigParser.ConfigParser() + return _config + + def get_expected_openvpn_args(self): + args = [] + username = self.get_username() + groupname = self.get_groupname() + + args.append('--user') + args.append(username) + args.append('--group') + args.append(groupname) + args.append('--management-client-user') + args.append(username) + args.append('--management-signal') + args.append('--management') + + #XXX hey! + #get platform switches here! + args.append('/tmp/.eip.sock') + args.append('unix') + args.append('--config') + #XXX bad assumption. FIXME: expand $HOME + args.append('/home/%s/.config/leap/providers/default/openvpn.conf' % + username) + return args + + # + # tests + # + + # XXX fixme! /home/user should + # be replaced for proper home lookup. + + @unittest.skipUnless(_system == "Linux", "linux only") + def test_lin_get_config_file(self): + """ + config file path where expected? (linux) + """ + self.assertEqual( + config.get_config_file( + 'test', folder="foo/bar"), + '/home/%s/.config/leap/foo/bar/test' % + self.get_username()) + + @unittest.skipUnless(_system == "Darwin", "mac only") + def test_mac_get_config_file(self): + """ + config file path where expected? (mac) + """ + self._missing_test_for_plat(do_raise=True) + + @unittest.skipUnless(_system == "Windows", "win only") + def test_win_get_config_file(self): + """ + config file path where expected? + """ + self._missing_test_for_plat(do_raise=True) + + # + # XXX hey, I'm raising exceptions here + # on purpose. just wanted to make sure + # that the skip stuff is doing it right. + # If you're working on win/macos tests, + # feel free to remove tests that you see + # are too redundant. + + @unittest.skipUnless(_system == "Linux", "linux only") + def test_lin_get_config_dir(self): + """ + nice config dir? (linux) + """ + self.assertEqual( + config.get_config_dir(), + '/home/%s/.config/leap' % + self.get_username()) + + @unittest.skipUnless(_system == "Darwin", "mac only") + def test_mac_get_config_dir(self): + """ + nice config dir? (mac) + """ + self._missing_test_for_plat(do_raise=True) + + @unittest.skipUnless(_system == "Windows", "win only") + def test_win_get_config_dir(self): + """ + nice config dir? (win) + """ + self._missing_test_for_plat(do_raise=True) + + # provider paths + + @unittest.skipUnless(_system == "Linux", "linux only") + def test_get_default_provider_path(self): + """ + is default provider path ok? + """ + self.assertEqual( + config.get_default_provider_path(), + '/home/%s/.config/leap/providers/default/' % + self.get_username()) + + # validate ip + + def test_validate_ip(self): + """ + check our ip validation + """ + config.validate_ip('3.3.3.3') + with self.assertRaises(socket.error): + config.validate_ip('255.255.255.256') + with self.assertRaises(socket.error): + config.validate_ip('foobar') + + @unittest.skip + def test_validate_domain(self): + """ + code to be written yet + """ + pass + + # build command string + # these tests are going to have to check + # many combinations. we should inject some + # params in the function call, to disable + # some checks. + # XXX breaking! + + def test_build_ovpn_command_empty_config(self): + _config = self.get_empty_config() + command, args = config.build_ovpn_command( + _config, + do_pkexec_check=False) + self.assertEqual(command, 'openvpn') + self.assertEqual(args, self.get_expected_openvpn_args()) + + +if __name__ == "__main__": + unittest.main() -- cgit v1.2.3 From 6fcbd68152689f98d9c5b7526eee2e1e9b7dd0a2 Mon Sep 17 00:00:00 2001 From: kali Date: Wed, 22 Aug 2012 04:52:31 +0900 Subject: minor tweaks to setup + env test --- src/leap/eip/tests/test_config.py | 1 - 1 file changed, 1 deletion(-) (limited to 'src/leap') diff --git a/src/leap/eip/tests/test_config.py b/src/leap/eip/tests/test_config.py index 12679ec6..11433777 100644 --- a/src/leap/eip/tests/test_config.py +++ b/src/leap/eip/tests/test_config.py @@ -195,7 +195,6 @@ class EIPConfigTest(unittest.TestCase): # many combinations. we should inject some # params in the function call, to disable # some checks. - # XXX breaking! def test_build_ovpn_command_empty_config(self): _config = self.get_empty_config() -- cgit v1.2.3 From 38e6c9c6345ca28ed0134ce6f4d43ec650103709 Mon Sep 17 00:00:00 2001 From: kali Date: Wed, 22 Aug 2012 04:57:38 +0900 Subject: mv util tests to folder --- src/leap/util/test_fileutil.py | 97 ----------------------------- src/leap/util/test_leap_argparse.py | 27 -------- src/leap/util/tests/test_fileutil.py | 100 ++++++++++++++++++++++++++++++ src/leap/util/tests/test_leap_argparse.py | 30 +++++++++ 4 files changed, 130 insertions(+), 124 deletions(-) delete mode 100644 src/leap/util/test_fileutil.py delete mode 100644 src/leap/util/test_leap_argparse.py create mode 100644 src/leap/util/tests/test_fileutil.py create mode 100644 src/leap/util/tests/test_leap_argparse.py (limited to 'src/leap') diff --git a/src/leap/util/test_fileutil.py b/src/leap/util/test_fileutil.py deleted file mode 100644 index f5dbe108..00000000 --- a/src/leap/util/test_fileutil.py +++ /dev/null @@ -1,97 +0,0 @@ -import os -import platform -import shutil -import stat -import tempfile -import unittest - -from leap.util import fileutil - - -class FileUtilTest(unittest.TestCase): - """ - test our file utils - """ - - def setUp(self): - self.system = platform.system() - self.create_temp_dir() - - def tearDown(self): - self.remove_temp_dir() - - # - # helpers - # - - def create_temp_dir(self): - self.tmpdir = tempfile.mkdtemp() - - def remove_temp_dir(self): - shutil.rmtree(self.tmpdir) - - def get_file_path(self, filename): - return os.path.join( - self.tmpdir, - filename) - - def touch_exec_file(self): - fp = self.get_file_path('testexec') - open(fp, 'w').close() - os.chmod( - fp, - stat.S_IRUSR | stat.S_IWUSR | stat.S_IXUSR) - return fp - - def get_mode(self, fp): - return stat.S_IMODE(os.stat(fp).st_mode) - - # - # tests - # - - def test_is_user_executable(self): - """ - touch_exec_file creates in mode 700? - """ - # XXX could check access X_OK - - fp = self.touch_exec_file() - mode = self.get_mode(fp) - self.assertEqual(mode, int('700', 8)) - - def test_which(self): - """ - which implementation ok? - not a very reliable test, - but I cannot think of anything smarter now - I guess it's highly improbable that copy - """ - # XXX yep, we can change the syspath - # for the test... ! - - if self.system == "Linux": - self.assertEqual( - fileutil.which('cp'), - '/bin/cp') - - def test_mkdir_p(self): - """ - our own mkdir -p implementation ok? - """ - testdir = self.get_file_path( - os.path.join('test', 'foo', 'bar')) - self.assertEqual(os.path.isdir(testdir), False) - fileutil.mkdir_p(testdir) - self.assertEqual(os.path.isdir(testdir), True) - - def test_check_and_fix_urw_only(self): - """ - ensure check_and_fix_urx_only ok? - """ - fp = self.touch_exec_file() - mode = self.get_mode(fp) - self.assertEqual(mode, int('700', 8)) - fileutil.check_and_fix_urw_only(fp) - mode = self.get_mode(fp) - self.assertEqual(mode, int('600', 8)) diff --git a/src/leap/util/test_leap_argparse.py b/src/leap/util/test_leap_argparse.py deleted file mode 100644 index 1442e827..00000000 --- a/src/leap/util/test_leap_argparse.py +++ /dev/null @@ -1,27 +0,0 @@ -from argparse import Namespace -import unittest - -from leap.util import leap_argparse - - -class LeapArgParseTest(unittest.TestCase): - """ - Test argparse options for eip client - """ - - def setUp(self): - """ - get the parser - """ - self.parser = leap_argparse.build_parser() - - def test_debug_mode(self): - """ - test debug mode option - """ - opts = self.parser.parse_args( - ['--debug']) - self.assertEqual( - opts, - Namespace(config_file=None, - debug=True)) diff --git a/src/leap/util/tests/test_fileutil.py b/src/leap/util/tests/test_fileutil.py new file mode 100644 index 00000000..f5131b3d --- /dev/null +++ b/src/leap/util/tests/test_fileutil.py @@ -0,0 +1,100 @@ +import os +import platform +import shutil +import stat +import tempfile +import unittest + +from leap.util import fileutil + + +class FileUtilTest(unittest.TestCase): + """ + test our file utils + """ + + def setUp(self): + self.system = platform.system() + self.create_temp_dir() + + def tearDown(self): + self.remove_temp_dir() + + # + # helpers + # + + def create_temp_dir(self): + self.tmpdir = tempfile.mkdtemp() + + def remove_temp_dir(self): + shutil.rmtree(self.tmpdir) + + def get_file_path(self, filename): + return os.path.join( + self.tmpdir, + filename) + + def touch_exec_file(self): + fp = self.get_file_path('testexec') + open(fp, 'w').close() + os.chmod( + fp, + stat.S_IRUSR | stat.S_IWUSR | stat.S_IXUSR) + return fp + + def get_mode(self, fp): + return stat.S_IMODE(os.stat(fp).st_mode) + + # + # tests + # + + def test_is_user_executable(self): + """ + touch_exec_file creates in mode 700? + """ + # XXX could check access X_OK + + fp = self.touch_exec_file() + mode = self.get_mode(fp) + self.assertEqual(mode, int('700', 8)) + + def test_which(self): + """ + which implementation ok? + not a very reliable test, + but I cannot think of anything smarter now + I guess it's highly improbable that copy + """ + # XXX yep, we can change the syspath + # for the test... ! + + if self.system == "Linux": + self.assertEqual( + fileutil.which('cp'), + '/bin/cp') + + def test_mkdir_p(self): + """ + our own mkdir -p implementation ok? + """ + testdir = self.get_file_path( + os.path.join('test', 'foo', 'bar')) + self.assertEqual(os.path.isdir(testdir), False) + fileutil.mkdir_p(testdir) + self.assertEqual(os.path.isdir(testdir), True) + + def test_check_and_fix_urw_only(self): + """ + ensure check_and_fix_urx_only ok? + """ + fp = self.touch_exec_file() + mode = self.get_mode(fp) + self.assertEqual(mode, int('700', 8)) + fileutil.check_and_fix_urw_only(fp) + mode = self.get_mode(fp) + self.assertEqual(mode, int('600', 8)) + +if __name__ == "__main__": + unittest.main() diff --git a/src/leap/util/tests/test_leap_argparse.py b/src/leap/util/tests/test_leap_argparse.py new file mode 100644 index 00000000..f4c86e36 --- /dev/null +++ b/src/leap/util/tests/test_leap_argparse.py @@ -0,0 +1,30 @@ +from argparse import Namespace +import unittest + +from leap.util import leap_argparse + + +class LeapArgParseTest(unittest.TestCase): + """ + Test argparse options for eip client + """ + + def setUp(self): + """ + get the parser + """ + self.parser = leap_argparse.build_parser() + + def test_debug_mode(self): + """ + test debug mode option + """ + opts = self.parser.parse_args( + ['--debug']) + self.assertEqual( + opts, + Namespace(config_file=None, + debug=True)) + +if __name__ == "__main__": + unittest.main() -- cgit v1.2.3 From 6ce22c7ebd293550473bfa5453a2f720dffad3e8 Mon Sep 17 00:00:00 2001 From: antialias Date: Tue, 21 Aug 2012 13:46:01 -0700 Subject: minor pep8 clean up. --- src/leap/eip/config.py | 8 ++++---- src/leap/eip/exceptions.py | 2 -- src/leap/eip/tests/tests_config.py | 7 ++++--- 3 files changed, 8 insertions(+), 9 deletions(-) (limited to 'src/leap') diff --git a/src/leap/eip/config.py b/src/leap/eip/config.py index a219fedb..e0151e87 100644 --- a/src/leap/eip/config.py +++ b/src/leap/eip/config.py @@ -403,14 +403,14 @@ def check_vpn_keys(config): check_and_fix_urw_only(keyfile) except OSError: raise EIPInitBadKeyFilePermError - - + + def get_config_json(config_file=None): """ will replace get_config function be developing them in parralel for branch purposes. @param: configuration file - @type: file + @type: file @rparam: configuration turples @rtype: dictionary """ @@ -421,7 +421,7 @@ def get_config_json(config_file=None): if not os.path.isdir(dpath): mkdir_p(dpath) with open(fpath, 'wb') as configfile: - configfile.write() + configfile.flush() config_file = open(fpath) config = json.load(config_file) diff --git a/src/leap/eip/exceptions.py b/src/leap/eip/exceptions.py index ac61f42b..3719c605 100644 --- a/src/leap/eip/exceptions.py +++ b/src/leap/eip/exceptions.py @@ -61,5 +61,3 @@ class EIPInitNoKeyFileError(Exception): class EIPInitBadKeyFilePermError(Exception): pass - - diff --git a/src/leap/eip/tests/tests_config.py b/src/leap/eip/tests/tests_config.py index 6534723e..5a0e2d94 100644 --- a/src/leap/eip/tests/tests_config.py +++ b/src/leap/eip/tests/tests_config.py @@ -5,14 +5,15 @@ import unittest from leap.eip import config + class TestConfig(unittest.TestCase): """ Test configuration help functions. """ + def test_get_config_json(self): config_js = config.get_config_json() self.assertTrue(isinstance(config_js, dict)) - self.assertTrue(config_js.has_key('transport')) - self.assertTrue(config_js.has_key('provider')) + self.assertTrue('transport' in config_js) + self.assertTrue('provider' in config_js) self.assertEqual(config_js['provider'], "testprovider.org") - -- cgit v1.2.3 From 3bd45c8e1e020bebf041bc266c5092a41f944130 Mon Sep 17 00:00:00 2001 From: kali Date: Wed, 22 Aug 2012 07:05:39 +0900 Subject: removed dup exceptions --- src/leap/eip/config.py | 28 ---------------------------- 1 file changed, 28 deletions(-) (limited to 'src/leap') diff --git a/src/leap/eip/config.py b/src/leap/eip/config.py index 8c67a258..a1dc2764 100644 --- a/src/leap/eip/config.py +++ b/src/leap/eip/config.py @@ -13,34 +13,6 @@ from leap.baseapp.permcheck import (is_pkexec_in_system, logger = logging.getLogger(name=__name__) logger.setLevel('DEBUG') -# XXX move exceptions: -# from leap.eip import exceptions as eip_exceptions - - -class EIPNoPkexecAvailable(Exception): - pass - - -class EIPNoPolkitAuthAgentAvailable(Exception): - pass - - -class EIPInitNoProviderError(Exception): - pass - - -class EIPInitBadProviderError(Exception): - pass - - -class EIPInitNoKeyFileError(Exception): - pass - - -class EIPInitBadKeyFilePermError(Exception): - pass - - OPENVPN_CONFIG_TEMPLATE = """#Autogenerated by eip-client wizard remote {VPN_REMOTE_HOST} {VPN_REMOTE_PORT} -- cgit v1.2.3 From 04676d5869c33a419d199b1be7dbb616c31434c2 Mon Sep 17 00:00:00 2001 From: kali Date: Wed, 22 Aug 2012 07:12:03 +0900 Subject: moved json-config tests --- src/leap/eip/tests/test_config.py | 9 +++++++++ src/leap/eip/tests/tests_config.py | 19 ------------------- 2 files changed, 9 insertions(+), 19 deletions(-) delete mode 100644 src/leap/eip/tests/tests_config.py (limited to 'src/leap') diff --git a/src/leap/eip/tests/test_config.py b/src/leap/eip/tests/test_config.py index 11433777..051faa29 100644 --- a/src/leap/eip/tests/test_config.py +++ b/src/leap/eip/tests/test_config.py @@ -204,6 +204,15 @@ class EIPConfigTest(unittest.TestCase): self.assertEqual(command, 'openvpn') self.assertEqual(args, self.get_expected_openvpn_args()) + # json config + + def test_get_config_json(self): + config_js = config.get_config_json() + self.assertTrue(isinstance(config_js, dict)) + self.assertTrue('transport' in config_js) + self.assertTrue('provider' in config_js) + self.assertEqual(config_js['provider'], "testprovider.org") + if __name__ == "__main__": unittest.main() diff --git a/src/leap/eip/tests/tests_config.py b/src/leap/eip/tests/tests_config.py deleted file mode 100644 index 5a0e2d94..00000000 --- a/src/leap/eip/tests/tests_config.py +++ /dev/null @@ -1,19 +0,0 @@ - -"""Test config helper functions""" - -import unittest - -from leap.eip import config - - -class TestConfig(unittest.TestCase): - """ - Test configuration help functions. - """ - - def test_get_config_json(self): - config_js = config.get_config_json() - self.assertTrue(isinstance(config_js, dict)) - self.assertTrue('transport' in config_js) - self.assertTrue('provider' in config_js) - self.assertEqual(config_js['provider'], "testprovider.org") -- cgit v1.2.3 From 83ac2efaa10de68f7fd35189f6cf272b03d60a30 Mon Sep 17 00:00:00 2001 From: kali Date: Wed, 22 Aug 2012 07:46:51 +0900 Subject: fix exceptions location --- src/leap/eip/config.py | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) (limited to 'src/leap') diff --git a/src/leap/eip/config.py b/src/leap/eip/config.py index c77bb142..f38268e2 100644 --- a/src/leap/eip/config.py +++ b/src/leap/eip/config.py @@ -10,6 +10,7 @@ from leap.util.fileutil import (which, mkdir_p, check_and_fix_urw_only) from leap.baseapp.permcheck import (is_pkexec_in_system, is_auth_agent_running) +from leap.eip import exceptions as eip_exceptions logger = logging.getLogger(name=__name__) logger.setLevel('DEBUG') @@ -122,7 +123,7 @@ def check_or_create_default_vpnconf(config): except socket.error: # this does not look like an ip, dave - raise EIPInitBadProviderError + raise eip_exceptions.EIPInitBadProviderError if config.has_option('provider', 'remote_port'): remote_port = config.get('provider', @@ -265,7 +266,7 @@ def build_ovpn_command(config, debug=False, do_pkexec_check=True): if not is_pkexec_in_system(): logger.error('no pkexec in system') - raise EIPNoPkexecAvailable + raise eip_exceptions.EIPNoPkexecAvailable if not is_auth_agent_running(): logger.warning( @@ -273,7 +274,7 @@ def build_ovpn_command(config, debug=False, do_pkexec_check=True): "pkexec will use its own text " "based authentication agent. " "that's probably a bad idea") - raise EIPNoPolkitAuthAgentAvailable + raise eip_exceptions.EIPNoPolkitAuthAgentAvailable command.append('pkexec') @@ -410,7 +411,7 @@ def check_vpn_keys(config): try: check_and_fix_urw_only(keyfile) except OSError: - raise EIPInitBadKeyFilePermError + raise eip_exceptions.EIPInitBadKeyFilePermError def get_config_json(config_file=None): -- cgit v1.2.3 From 24f288b5214b814e2e7daa6ef41b226a27d96b81 Mon Sep 17 00:00:00 2001 From: kali Date: Wed, 22 Aug 2012 07:49:19 +0900 Subject: bye bye conductor, watcher (after refactor) --- src/leap/eip/conductor.py | 305 --------------------------------------------- src/leap/eip/vpnwatcher.py | 169 ------------------------- 2 files changed, 474 deletions(-) delete mode 100644 src/leap/eip/conductor.py delete mode 100644 src/leap/eip/vpnwatcher.py (limited to 'src/leap') diff --git a/src/leap/eip/conductor.py b/src/leap/eip/conductor.py deleted file mode 100644 index f528d639..00000000 --- a/src/leap/eip/conductor.py +++ /dev/null @@ -1,305 +0,0 @@ -""" -stablishes a vpn connection and monitors its state -""" -from __future__ import (division, unicode_literals, print_function) -#import threading -from functools import partial -import logging - -from leap.util.coroutines import spawn_and_watch_process - -# XXX from leap.eip import config as eipconfig -# from leap.eip import exceptions as eip_exceptions - -from leap.eip.config import (get_config, build_ovpn_command, - check_or_create_default_vpnconf, - check_vpn_keys, - EIPNoPkexecAvailable, - EIPNoPolkitAuthAgentAvailable, - EIPInitNoProviderError, - EIPInitBadProviderError, - EIPInitNoKeyFileError, - EIPInitBadKeyFilePermError) -from leap.eip.vpnwatcher import EIPConnectionStatus, status_watcher -from leap.eip.vpnmanager import OpenVPNManager, ConnectionRefusedError - -logger = logging.getLogger(name=__name__) - - -# -# Openvpn related classes -# -# XXX deprecated! moved to eipconnection - - -class OpenVPNConnection(object): - """ - All related to invocation - of the openvpn binary - """ - # Connection Methods - - def __init__(self, config_file=None, - watcher_cb=None, debug=False): - #XXX FIXME - #change watcher_cb to line_observer - """ - :param config_file: configuration file to read from - :param watcher_cb: callback to be \ -called for each line in watched stdout - :param signal_map: dictionary of signal names and callables \ -to be triggered for each one of them. - :type config_file: str - :type watcher_cb: function - :type signal_map: dict - """ - # XXX get host/port from config - self.manager = OpenVPNManager() - self.debug = debug - #print('conductor:%s' % debug) - - self.config_file = config_file - self.watcher_cb = watcher_cb - #self.signal_maps = signal_maps - - self.subp = None - self.watcher = None - - self.server = None - self.port = None - self.proto = None - - self.missing_pkexec = False - self.missing_auth_agent = False - self.bad_keyfile_perms = False - self.missing_vpn_keyfile = False - self.missing_provider = False - self.bad_provider = False - - self.command = None - self.args = None - - self.autostart = True - self._get_or_create_config() - self._check_vpn_keys() - - def _set_autostart(self): - config = self.config - if config.has_option('openvpn', 'autostart'): - autostart = config.getboolean('openvpn', - 'autostart') - self.autostart = autostart - else: - if config.has_option('DEFAULT', 'autostart'): - autostart = config.getboolean('DEFAULT', - 'autostart') - self.autostart = autostart - - def _set_ovpn_command(self): - config = self.config - if config.has_option('openvpn', 'command'): - commandline = config.get('openvpn', 'command') - - command_split = commandline.split(' ') - command = command_split[0] - if len(command_split) > 1: - args = command_split[1:] - else: - args = [] - - self.command = command - self.args = args - else: - # no command in config, we build it up. - # XXX check also for command-line --command flag - try: - command, args = build_ovpn_command(config, - debug=self.debug) - except EIPNoPolkitAuthAgentAvailable: - command = args = None - self.missing_auth_agent = True - except EIPNoPkexecAvailable: - command = args = None - self.missing_pkexec = True - - # XXX if not command, signal error. - self.command = command - self.args = args - - def _check_ovpn_config(self): - """ - checks if there is a default openvpn config. - if not, it writes one with info from the provider - definition file - """ - # TODO - # - get --with-openvpn-config from opts - try: - check_or_create_default_vpnconf(self.config) - except EIPInitNoProviderError: - logger.error('missing default provider definition') - self.missing_provider = True - except EIPInitBadProviderError: - logger.error('bad provider definition') - self.bad_provider = True - - def _get_or_create_config(self): - """ - retrieves the config options from defaults or - home file, or config file passed in command line. - populates command and args to be passed to subprocess. - """ - config = get_config(config_file=self.config_file) - self.config = config - - self._set_autostart() - self._set_ovpn_command() - self._check_ovpn_config() - - def _check_vpn_keys(self): - """ - checks for correct permissions on vpn keys - """ - try: - check_vpn_keys(self.config) - except EIPInitNoKeyFileError: - self.missing_vpn_keyfile = True - except EIPInitBadKeyFilePermError: - logger.error('error while checking vpn keys') - self.bad_keyfile_perms = True - - def _launch_openvpn(self): - """ - invocation of openvpn binaries in a subprocess. - """ - #XXX TODO: - #deprecate watcher_cb, - #use _only_ signal_maps instead - - if self.watcher_cb is not None: - linewrite_callback = self.watcher_cb - else: - #XXX get logger instead - linewrite_callback = lambda line: print('watcher: %s' % line) - - observers = (linewrite_callback, - partial(status_watcher, self.status)) - subp, watcher = spawn_and_watch_process( - self.command, - self.args, - observers=observers) - self.subp = subp - self.watcher = watcher - - #conn_result = self.status.CONNECTED - #return conn_result - - def _try_connection(self): - """ - attempts to connect - """ - if self.command is None: - raise EIPNoCommandError - if self.subp is not None: - print('cowardly refusing to launch subprocess again') - return - self._launch_openvpn() - - def cleanup(self): - """ - terminates child subprocess - """ - if self.subp: - self.subp.terminate() - - -class EIPConductor(OpenVPNConnection): - """ - Manages the execution of the OpenVPN process, auto starts, monitors the - network connection, handles configuration, fixes leaky hosts, handles - errors, etc. - Preferences will be stored via the Storage API. (TBD) - Status updates (connected, bandwidth, etc) are signaled to the GUI. - """ - - def __init__(self, *args, **kwargs): - self.settingsfile = kwargs.get('settingsfile', None) - self.logfile = kwargs.get('logfile', None) - self.error_queue = [] - self.desired_con_state = None # ??? - - status_signals = kwargs.pop('status_signals', None) - self.status = EIPConnectionStatus(callbacks=status_signals) - - super(EIPConductor, self).__init__(*args, **kwargs) - - def connect(self): - """ - entry point for connection process - """ - self.manager.forget_errors() - self._try_connection() - # XXX should capture errors here? - - def disconnect(self): - """ - disconnects client - """ - self._disconnect() - self.status.change_to(self.status.DISCONNECTED) - - def poll_connection_state(self): - """ - """ - try: - state = self.manager.get_connection_state() - except ConnectionRefusedError: - # connection refused. might be not ready yet. - return - if not state: - return - (ts, status_step, - ok, ip, remote) = state - self.status.set_vpn_state(status_step) - status_step = self.status.get_readable_status() - return (ts, status_step, ok, ip, remote) - - def get_icon_name(self): - """ - get icon name from status object - """ - return self.status.get_state_icon() - - # - # private methods - # - - def _disconnect(self): - """ - private method for disconnecting - """ - if self.subp is not None: - self.subp.terminate() - self.subp = None - # XXX signal state changes! :) - - def _is_alive(self): - """ - don't know yet - """ - pass - - def _connect(self): - """ - entry point for connection cascade methods. - """ - #conn_result = ConState.DISCONNECTED - try: - conn_result = self._try_connection() - except UnrecoverableError as except_msg: - logger.error("FATAL: %s" % unicode(except_msg)) - conn_result = self.status.UNRECOVERABLE - except Exception as except_msg: - self.error_queue.append(except_msg) - logger.error("Failed Connection: %s" % - unicode(except_msg)) diff --git a/src/leap/eip/vpnwatcher.py b/src/leap/eip/vpnwatcher.py deleted file mode 100644 index 09bd5811..00000000 --- a/src/leap/eip/vpnwatcher.py +++ /dev/null @@ -1,169 +0,0 @@ -"""generic watcher object that keeps track of connection status""" -# This should be deprecated in favor of daemon mode + management -# interface. But we can leave it here for debug purposes. - - -class EIPConnectionStatus(object): - """ - Keep track of client (gui) and openvpn - states. - - These are the OpenVPN states: - CONNECTING -- OpenVPN's initial state. - WAIT -- (Client only) Waiting for initial response - from server. - AUTH -- (Client only) Authenticating with server. - GET_CONFIG -- (Client only) Downloading configuration options - from server. - ASSIGN_IP -- Assigning IP address to virtual network - interface. - ADD_ROUTES -- Adding routes to system. - CONNECTED -- Initialization Sequence Completed. - RECONNECTING -- A restart has occurred. - EXITING -- A graceful exit is in progress. - - We add some extra states: - - DISCONNECTED -- GUI initial state. - UNRECOVERABLE -- An unrecoverable error has been raised - while invoking openvpn service. - """ - CONNECTING = 1 - WAIT = 2 - AUTH = 3 - GET_CONFIG = 4 - ASSIGN_IP = 5 - ADD_ROUTES = 6 - CONNECTED = 7 - RECONNECTING = 8 - EXITING = 9 - - # gui specific states: - UNRECOVERABLE = 11 - DISCONNECTED = 0 - - def __init__(self, callbacks=None): - """ - EIPConnectionStatus is initialized with a tuple - of signals to be triggered. - :param callbacks: a tuple of (callable) observers - :type callbacks: tuple - """ - # (callbacks to connect to signals in Qt-land) - self.current = self.DISCONNECTED - self.previous = None - self.callbacks = callbacks - - def get_readable_status(self): - # XXX DRY status / labels a little bit. - # think we'll want to i18n this. - human_status = { - 0: 'disconnected', - 1: 'connecting', - 2: 'waiting', - 3: 'authenticating', - 4: 'getting config', - 5: 'assigning ip', - 6: 'adding routes', - 7: 'connected', - 8: 'reconnecting', - 9: 'exiting', - 11: 'unrecoverable error', - } - return human_status[self.current] - - def get_state_icon(self): - """ - returns the high level icon - for each fine-grain openvpn state - """ - connecting = (self.CONNECTING, - self.WAIT, - self.AUTH, - self.GET_CONFIG, - self.ASSIGN_IP, - self.ADD_ROUTES) - connected = (self.CONNECTED,) - disconnected = (self.DISCONNECTED, - self.UNRECOVERABLE) - - # this can be made smarter, - # but it's like it'll change, - # so +readability. - - if self.current in connecting: - return "connecting" - if self.current in connected: - return "connected" - if self.current in disconnected: - return "disconnected" - - def set_vpn_state(self, status): - """ - accepts a state string from the management - interface, and sets the internal state. - :param status: openvpn STATE (uppercase). - :type status: str - """ - if hasattr(self, status): - self.change_to(getattr(self, status)) - - def set_current(self, to): - """ - setter for the 'current' property - :param to: destination state - :type to: int - """ - self.current = to - - def change_to(self, to): - """ - :param to: destination state - :type to: int - """ - if to == self.current: - return - changed = False - from_ = self.current - self.current = to - - # We can add transition restrictions - # here to ensure no transitions are - # allowed outside the fsm. - - self.set_current(to) - changed = True - - #trigger signals (as callbacks) - #print('current state: %s' % self.current) - if changed: - self.previous = from_ - if self.callbacks: - for cb in self.callbacks: - if callable(cb): - cb(self) - - -def status_watcher(cs, line): - """ - a wrapper that calls to ConnectionStatus object - :param cs: a EIPConnectionStatus instance - :type cs: EIPConnectionStatus object - :param line: a single line of the watched output - :type line: str - """ - #print('status watcher watching') - - # from the mullvad code, should watch for - # things like: - # "Initialization Sequence Completed" - # "With Errors" - # "Tap-Win32" - - if "Completed" in line: - cs.change_to(cs.CONNECTED) - return - - if "Initial packet from" in line: - cs.change_to(cs.CONNECTING) - return -- cgit v1.2.3