diff options
author | antialias <antialias@leap.se> | 2012-08-21 16:27:37 -0700 |
---|---|---|
committer | antialias <antialias@leap.se> | 2012-08-21 16:27:37 -0700 |
commit | 5d848fe4f127ea01615102e6d8f14bfb12dfe2da (patch) | |
tree | 51dff8de10f0276786c0c83a55932a7f8932f6e9 /src/leap/eip | |
parent | e9a817ceadab842ec5382e074e4d7861dbd2b898 (diff) | |
parent | 24f288b5214b814e2e7daa6ef41b226a27d96b81 (diff) |
Merge branch 'develop' of ssh://leap.se:4422/leap-client into develop
Diffstat (limited to 'src/leap/eip')
-rw-r--r-- | src/leap/eip/conductor.py | 305 | ||||
-rw-r--r-- | src/leap/eip/config.py | 76 | ||||
-rw-r--r-- | src/leap/eip/openvpnconnection.py | 5 | ||||
-rw-r--r-- | src/leap/eip/tests/test_config.py | 218 | ||||
-rw-r--r-- | src/leap/eip/tests/test_eipconnection.py | 180 | ||||
-rw-r--r-- | src/leap/eip/tests/test_openvpnconnection.py | 136 | ||||
-rw-r--r-- | src/leap/eip/vpnwatcher.py | 169 |
7 files changed, 593 insertions, 496 deletions
diff --git a/src/leap/eip/conductor.py b/src/leap/eip/conductor.py deleted file mode 100644 index f528d639..00000000 --- a/src/leap/eip/conductor.py +++ /dev/null @@ -1,305 +0,0 @@ -""" -stablishes a vpn connection and monitors its state -""" -from __future__ import (division, unicode_literals, print_function) -#import threading -from functools import partial -import logging - -from leap.util.coroutines import spawn_and_watch_process - -# XXX from leap.eip import config as eipconfig -# from leap.eip import exceptions as eip_exceptions - -from leap.eip.config import (get_config, build_ovpn_command, - check_or_create_default_vpnconf, - check_vpn_keys, - EIPNoPkexecAvailable, - EIPNoPolkitAuthAgentAvailable, - EIPInitNoProviderError, - EIPInitBadProviderError, - EIPInitNoKeyFileError, - EIPInitBadKeyFilePermError) -from leap.eip.vpnwatcher import EIPConnectionStatus, status_watcher -from leap.eip.vpnmanager import OpenVPNManager, ConnectionRefusedError - -logger = logging.getLogger(name=__name__) - - -# -# Openvpn related classes -# -# XXX deprecated! moved to eipconnection - - -class OpenVPNConnection(object): - """ - All related to invocation - of the openvpn binary - """ - # Connection Methods - - def __init__(self, config_file=None, - watcher_cb=None, debug=False): - #XXX FIXME - #change watcher_cb to line_observer - """ - :param config_file: configuration file to read from - :param watcher_cb: callback to be \ -called for each line in watched stdout - :param signal_map: dictionary of signal names and callables \ -to be triggered for each one of them. - :type config_file: str - :type watcher_cb: function - :type signal_map: dict - """ - # XXX get host/port from config - self.manager = OpenVPNManager() - self.debug = debug - #print('conductor:%s' % debug) - - self.config_file = config_file - self.watcher_cb = watcher_cb - #self.signal_maps = signal_maps - - self.subp = None - self.watcher = None - - self.server = None - self.port = None - self.proto = None - - self.missing_pkexec = False - self.missing_auth_agent = False - self.bad_keyfile_perms = False - self.missing_vpn_keyfile = False - self.missing_provider = False - self.bad_provider = False - - self.command = None - self.args = None - - self.autostart = True - self._get_or_create_config() - self._check_vpn_keys() - - def _set_autostart(self): - config = self.config - if config.has_option('openvpn', 'autostart'): - autostart = config.getboolean('openvpn', - 'autostart') - self.autostart = autostart - else: - if config.has_option('DEFAULT', 'autostart'): - autostart = config.getboolean('DEFAULT', - 'autostart') - self.autostart = autostart - - def _set_ovpn_command(self): - config = self.config - if config.has_option('openvpn', 'command'): - commandline = config.get('openvpn', 'command') - - command_split = commandline.split(' ') - command = command_split[0] - if len(command_split) > 1: - args = command_split[1:] - else: - args = [] - - self.command = command - self.args = args - else: - # no command in config, we build it up. - # XXX check also for command-line --command flag - try: - command, args = build_ovpn_command(config, - debug=self.debug) - except EIPNoPolkitAuthAgentAvailable: - command = args = None - self.missing_auth_agent = True - except EIPNoPkexecAvailable: - command = args = None - self.missing_pkexec = True - - # XXX if not command, signal error. - self.command = command - self.args = args - - def _check_ovpn_config(self): - """ - checks if there is a default openvpn config. - if not, it writes one with info from the provider - definition file - """ - # TODO - # - get --with-openvpn-config from opts - try: - check_or_create_default_vpnconf(self.config) - except EIPInitNoProviderError: - logger.error('missing default provider definition') - self.missing_provider = True - except EIPInitBadProviderError: - logger.error('bad provider definition') - self.bad_provider = True - - def _get_or_create_config(self): - """ - retrieves the config options from defaults or - home file, or config file passed in command line. - populates command and args to be passed to subprocess. - """ - config = get_config(config_file=self.config_file) - self.config = config - - self._set_autostart() - self._set_ovpn_command() - self._check_ovpn_config() - - def _check_vpn_keys(self): - """ - checks for correct permissions on vpn keys - """ - try: - check_vpn_keys(self.config) - except EIPInitNoKeyFileError: - self.missing_vpn_keyfile = True - except EIPInitBadKeyFilePermError: - logger.error('error while checking vpn keys') - self.bad_keyfile_perms = True - - def _launch_openvpn(self): - """ - invocation of openvpn binaries in a subprocess. - """ - #XXX TODO: - #deprecate watcher_cb, - #use _only_ signal_maps instead - - if self.watcher_cb is not None: - linewrite_callback = self.watcher_cb - else: - #XXX get logger instead - linewrite_callback = lambda line: print('watcher: %s' % line) - - observers = (linewrite_callback, - partial(status_watcher, self.status)) - subp, watcher = spawn_and_watch_process( - self.command, - self.args, - observers=observers) - self.subp = subp - self.watcher = watcher - - #conn_result = self.status.CONNECTED - #return conn_result - - def _try_connection(self): - """ - attempts to connect - """ - if self.command is None: - raise EIPNoCommandError - if self.subp is not None: - print('cowardly refusing to launch subprocess again') - return - self._launch_openvpn() - - def cleanup(self): - """ - terminates child subprocess - """ - if self.subp: - self.subp.terminate() - - -class EIPConductor(OpenVPNConnection): - """ - Manages the execution of the OpenVPN process, auto starts, monitors the - network connection, handles configuration, fixes leaky hosts, handles - errors, etc. - Preferences will be stored via the Storage API. (TBD) - Status updates (connected, bandwidth, etc) are signaled to the GUI. - """ - - def __init__(self, *args, **kwargs): - self.settingsfile = kwargs.get('settingsfile', None) - self.logfile = kwargs.get('logfile', None) - self.error_queue = [] - self.desired_con_state = None # ??? - - status_signals = kwargs.pop('status_signals', None) - self.status = EIPConnectionStatus(callbacks=status_signals) - - super(EIPConductor, self).__init__(*args, **kwargs) - - def connect(self): - """ - entry point for connection process - """ - self.manager.forget_errors() - self._try_connection() - # XXX should capture errors here? - - def disconnect(self): - """ - disconnects client - """ - self._disconnect() - self.status.change_to(self.status.DISCONNECTED) - - def poll_connection_state(self): - """ - """ - try: - state = self.manager.get_connection_state() - except ConnectionRefusedError: - # connection refused. might be not ready yet. - return - if not state: - return - (ts, status_step, - ok, ip, remote) = state - self.status.set_vpn_state(status_step) - status_step = self.status.get_readable_status() - return (ts, status_step, ok, ip, remote) - - def get_icon_name(self): - """ - get icon name from status object - """ - return self.status.get_state_icon() - - # - # private methods - # - - def _disconnect(self): - """ - private method for disconnecting - """ - if self.subp is not None: - self.subp.terminate() - self.subp = None - # XXX signal state changes! :) - - def _is_alive(self): - """ - don't know yet - """ - pass - - def _connect(self): - """ - entry point for connection cascade methods. - """ - #conn_result = ConState.DISCONNECTED - try: - conn_result = self._try_connection() - except UnrecoverableError as except_msg: - logger.error("FATAL: %s" % unicode(except_msg)) - conn_result = self.status.UNRECOVERABLE - except Exception as except_msg: - self.error_queue.append(except_msg) - logger.error("Failed Connection: %s" % - unicode(except_msg)) diff --git a/src/leap/eip/config.py b/src/leap/eip/config.py index 8e55d789..f38268e2 100644 --- a/src/leap/eip/config.py +++ b/src/leap/eip/config.py @@ -2,6 +2,7 @@ import ConfigParser import grp import logging import os +import json import platform import socket @@ -14,11 +15,6 @@ from leap.eip import exceptions as eip_exceptions logger = logging.getLogger(name=__name__) logger.setLevel('DEBUG') -# XXX this has to be REMOVED -# and all these options passed in the -# command line --> move to build_ovpn_command -# issue #447 - OPENVPN_CONFIG_TEMPLATE = """#Autogenerated by eip-client wizard remote {VPN_REMOTE_HOST} {VPN_REMOTE_PORT} @@ -114,16 +110,20 @@ def check_or_create_default_vpnconf(config): # instead. try: + # XXX by now, we're expecting + # only IP format for remote. + # We should allow also domain names, + # and make a reverse resolv. remote_ip = config.get('provider', 'remote_ip') validate_ip(remote_ip) - except ConfigParser.NoOptionError: - raise EIPInitNoProviderError + except ConfigParser.NoSectionError: + raise eip_exceptions.EIPInitNoProviderError except socket.error: # this does not look like an ip, dave - raise EIPInitBadProviderError + raise eip_exceptions.EIPInitBadProviderError if config.has_option('provider', 'remote_port'): remote_port = config.get('provider', @@ -158,6 +158,15 @@ def check_or_create_default_vpnconf(config): f.write(ovpn_config) +def get_username(): + return os.getlogin() + + +def get_groupname(): + gid = os.getgroups()[-1] + return grp.getgrgid(gid).gr_name + + def build_ovpn_options(daemon=False): """ build a list of options @@ -175,16 +184,11 @@ def build_ovpn_options(daemon=False): # get user/group name # also from config. - user = os.getlogin() - gid = os.getgroups()[-1] - group = grp.getgrgid(gid).gr_name + user = get_username() + group = get_groupname() opts = [] - #moved to config files - #opts.append('--persist-tun') - #opts.append('--persist-key') - # set user and group opts.append('--user') opts.append('%s' % user) @@ -219,6 +223,8 @@ def build_ovpn_options(daemon=False): opts.append('--config') default_provider_path = get_default_provider_path() + + # XXX get rid of config_file at all ovpncnf = get_config_file( 'openvpn.conf', folder=default_provider_path) @@ -233,7 +239,7 @@ def build_ovpn_options(daemon=False): return opts -def build_ovpn_command(config, debug=False): +def build_ovpn_command(config, debug=False, do_pkexec_check=True): """ build a string with the complete openvpn invocation @@ -251,12 +257,11 @@ def build_ovpn_command(config, debug=False): if config.has_option('openvpn', 'use_pkexec'): use_pkexec = config.get('openvpn', 'use_pkexec') - if platform.system() == "Linux" and use_pkexec: + if platform.system() == "Linux" and use_pkexec and do_pkexec_check: # XXX check for both pkexec (done) # AND a suitable authentication # agent running. - # (until we implement setuid helper) logger.info('use_pkexec set to True') if not is_pkexec_in_system(): @@ -283,7 +288,11 @@ def build_ovpn_command(config, debug=False): 'openvpn_binary') if ovpn: - command.append(ovpn) + vpn_command = ovpn + else: + vpn_command = "openvpn" + + command.append(vpn_command) daemon_mode = not debug @@ -291,6 +300,7 @@ def build_ovpn_command(config, debug=False): command.append(opt) # XXX check len and raise proper error + return [command[0], command[1:]] @@ -394,11 +404,35 @@ def check_vpn_keys(config): if not os.path.isfile(keyfile): logger.error('key file %s not found. aborting.', keyfile) - raise EIPInitNoKeyFileError + raise eip_exceptions.EIPInitNoKeyFileError # check proper permission on keys # bad perms? try to fix them try: check_and_fix_urw_only(keyfile) except OSError: - raise EIPInitBadKeyFilePermError + raise eip_exceptions.EIPInitBadKeyFilePermError + + +def get_config_json(config_file=None): + """ + will replace get_config function be developing them + in parralel for branch purposes. + @param: configuration file + @type: file + @rparam: configuration turples + @rtype: dictionary + """ + if not config_file: + fpath = get_config_file('eip.json') + if not os.path.isfile(fpath): + dpath, cfile = os.path.split(fpath) + if not os.path.isdir(dpath): + mkdir_p(dpath) + with open(fpath, 'wb') as configfile: + configfile.flush() + config_file = open(fpath) + + config = json.load(config_file) + + return config diff --git a/src/leap/eip/openvpnconnection.py b/src/leap/eip/openvpnconnection.py index a230d229..3972b617 100644 --- a/src/leap/eip/openvpnconnection.py +++ b/src/leap/eip/openvpnconnection.py @@ -85,9 +85,12 @@ to be triggered for each one of them. self.host = host if isinstance(port, str) and port.isdigit(): port = int(port) + elif port == "unix": + port = "unix" + else: + port = None self.port = port self.password = password - #self.tn = None def _set_autostart(self): config = self.config diff --git a/src/leap/eip/tests/test_config.py b/src/leap/eip/tests/test_config.py new file mode 100644 index 00000000..051faa29 --- /dev/null +++ b/src/leap/eip/tests/test_config.py @@ -0,0 +1,218 @@ +import ConfigParser +import os +import platform +import shutil +import socket +import tempfile + +try: + import unittest2 as unittest +except ImportError: + import unittest + +from leap.eip import config + +_system = platform.system() + + +class NotImplementedError(Exception): + pass + +# XXX use mock_open here? + + +class EIPConfigTest(unittest.TestCase): + + __name__ = "eip_config_tests" + + def setUp(self): + self.old_path = os.environ['PATH'] + + self.tdir = tempfile.mkdtemp() + + bin_tdir = os.path.join( + self.tdir, + 'bin') + os.mkdir(bin_tdir) + os.environ['PATH'] = bin_tdir + + def tearDown(self): + os.environ['PATH'] = self.old_path + shutil.rmtree(self.tdir) + # + # helpers + # + + def get_username(self): + return config.get_username() + + def get_groupname(self): + return config.get_groupname() + + def _missing_test_for_plat(self, do_raise=False): + if do_raise: + raise NotImplementedError( + "This test is not implemented " + "for the running platform: %s" % + _system) + + def touch_exec(self): + tfile = os.path.join( + self.tdir, + 'bin', + 'openvpn') + open(tfile, 'bw').close() + + def get_empty_config(self): + _config = ConfigParser.ConfigParser() + return _config + + def get_minimal_config(self): + _config = ConfigParser.ConfigParser() + return _config + + def get_expected_openvpn_args(self): + args = [] + username = self.get_username() + groupname = self.get_groupname() + + args.append('--user') + args.append(username) + args.append('--group') + args.append(groupname) + args.append('--management-client-user') + args.append(username) + args.append('--management-signal') + args.append('--management') + + #XXX hey! + #get platform switches here! + args.append('/tmp/.eip.sock') + args.append('unix') + args.append('--config') + #XXX bad assumption. FIXME: expand $HOME + args.append('/home/%s/.config/leap/providers/default/openvpn.conf' % + username) + return args + + # + # tests + # + + # XXX fixme! /home/user should + # be replaced for proper home lookup. + + @unittest.skipUnless(_system == "Linux", "linux only") + def test_lin_get_config_file(self): + """ + config file path where expected? (linux) + """ + self.assertEqual( + config.get_config_file( + 'test', folder="foo/bar"), + '/home/%s/.config/leap/foo/bar/test' % + self.get_username()) + + @unittest.skipUnless(_system == "Darwin", "mac only") + def test_mac_get_config_file(self): + """ + config file path where expected? (mac) + """ + self._missing_test_for_plat(do_raise=True) + + @unittest.skipUnless(_system == "Windows", "win only") + def test_win_get_config_file(self): + """ + config file path where expected? + """ + self._missing_test_for_plat(do_raise=True) + + # + # XXX hey, I'm raising exceptions here + # on purpose. just wanted to make sure + # that the skip stuff is doing it right. + # If you're working on win/macos tests, + # feel free to remove tests that you see + # are too redundant. + + @unittest.skipUnless(_system == "Linux", "linux only") + def test_lin_get_config_dir(self): + """ + nice config dir? (linux) + """ + self.assertEqual( + config.get_config_dir(), + '/home/%s/.config/leap' % + self.get_username()) + + @unittest.skipUnless(_system == "Darwin", "mac only") + def test_mac_get_config_dir(self): + """ + nice config dir? (mac) + """ + self._missing_test_for_plat(do_raise=True) + + @unittest.skipUnless(_system == "Windows", "win only") + def test_win_get_config_dir(self): + """ + nice config dir? (win) + """ + self._missing_test_for_plat(do_raise=True) + + # provider paths + + @unittest.skipUnless(_system == "Linux", "linux only") + def test_get_default_provider_path(self): + """ + is default provider path ok? + """ + self.assertEqual( + config.get_default_provider_path(), + '/home/%s/.config/leap/providers/default/' % + self.get_username()) + + # validate ip + + def test_validate_ip(self): + """ + check our ip validation + """ + config.validate_ip('3.3.3.3') + with self.assertRaises(socket.error): + config.validate_ip('255.255.255.256') + with self.assertRaises(socket.error): + config.validate_ip('foobar') + + @unittest.skip + def test_validate_domain(self): + """ + code to be written yet + """ + pass + + # build command string + # these tests are going to have to check + # many combinations. we should inject some + # params in the function call, to disable + # some checks. + + def test_build_ovpn_command_empty_config(self): + _config = self.get_empty_config() + command, args = config.build_ovpn_command( + _config, + do_pkexec_check=False) + self.assertEqual(command, 'openvpn') + self.assertEqual(args, self.get_expected_openvpn_args()) + + # json config + + def test_get_config_json(self): + config_js = config.get_config_json() + self.assertTrue(isinstance(config_js, dict)) + self.assertTrue('transport' in config_js) + self.assertTrue('provider' in config_js) + self.assertEqual(config_js['provider'], "testprovider.org") + + +if __name__ == "__main__": + unittest.main() diff --git a/src/leap/eip/tests/test_eipconnection.py b/src/leap/eip/tests/test_eipconnection.py new file mode 100644 index 00000000..51772b7c --- /dev/null +++ b/src/leap/eip/tests/test_eipconnection.py @@ -0,0 +1,180 @@ +import ConfigParser +import logging +import platform + +logging.basicConfig() +logger = logging.getLogger(name=__name__) + +try: + import unittest2 as unittest +except ImportError: + import unittest + +from mock import Mock, patch # MagicMock + +from leap.eip.eipconnection import EIPConnection +from leap.eip.exceptions import ConnectionRefusedError + +_system = platform.system() + + +class NotImplementedError(Exception): + pass + + +@patch('OpenVPNConnection._get_or_create_config') +@patch('OpenVPNConnection._set_ovpn_command') +class MockedEIPConnection(EIPConnection): + def _get_or_create_config(self): + self.config = ConfigParser.ConfigParser() + self._set_ovpn_command() + + def _set_ovpn_command(self): + self.command = "mock_command" + self.args = [1, 2, 3] + + +class EIPConductorTest(unittest.TestCase): + + __name__ = "eip_conductor_tests" + + def setUp(self): + self.manager = Mock( + name="openvpnmanager_mock") + + self.con = MockedEIPConnection() + #manager=self.manager) + + def tearDown(self): + del self.con + + # + # helpers + # + + def _missing_test_for_plat(self, do_raise=False): + if do_raise: + raise NotImplementedError( + "This test is not implemented " + "for the running platform: %s" % + _system) + + # + # tests + # + + @unittest.skip + #ain't manager anymore! + def test_manager_was_initialized(self): + """ + manager init ok during conductor init? + """ + self.manager.assert_called_once_with() + + def test_vpnconnection_defaults(self): + """ + default attrs as expected + """ + con = self.con + self.assertEqual(con.autostart, True) + self.assertEqual(con.missing_pkexec, False) + self.assertEqual(con.missing_vpn_keyfile, False) + self.assertEqual(con.missing_provider, False) + self.assertEqual(con.bad_provider, False) + + def test_config_was_init(self): + """ + is there a config object? + """ + self.assertTrue(isinstance(self.con.config, + ConfigParser.ConfigParser)) + + def test_ovpn_command(self): + """ + set_ovpn_command called + """ + self.assertEqual(self.con.command, + "mock_command") + self.assertEqual(self.con.args, + [1, 2, 3]) + + # connect/disconnect calls + + def test_disconnect(self): + """ + disconnect method calls private and changes status + """ + self.con._disconnect = Mock( + name="_disconnect") + + # first we set status to connected + self.con.status.set_current(self.con.status.CONNECTED) + self.assertEqual(self.con.status.current, + self.con.status.CONNECTED) + + # disconnect + self.con.disconnect() + self.con._disconnect.assert_called_once_with() + + # new status should be disconnected + # XXX this should evolve and check no errors + # during disconnection + self.assertEqual(self.con.status.current, + self.con.status.DISCONNECTED) + + def test_connect(self): + """ + connect calls _launch_openvpn private + """ + self.con._launch_openvpn = Mock() + self.con.connect() + self.con._launch_openvpn.assert_called_once_with() + + # XXX tests breaking here ... + + def test_good_poll_connection_state(self): + """ + """ + #@patch -- + # self.manager.get_connection_state + + #XXX review this set of poll_state tests + #they SHOULD NOT NEED TO MOCK ANYTHING IN THE + #lower layers!! -- status, vpn_manager.. + #right now we're testing implementation, not + #behavior!!! + good_state = ["1345466946", "unknown_state", "ok", + "192.168.1.1", "192.168.1.100"] + self.con.get_connection_state = Mock(return_value=good_state) + self.con.status.set_vpn_state = Mock() + + state = self.con.poll_connection_state() + good_state[1] = "disconnected" + final_state = tuple(good_state) + self.con.status.set_vpn_state.assert_called_with("unknown_state") + self.assertEqual(state, final_state) + + # TODO between "good" and "bad" (exception raised) cases, + # we can still test for malformed states and see that only good + # states do have a change (and from only the expected transition + # states). + + def test_bad_poll_connection_state(self): + """ + get connection state raises ConnectionRefusedError + state is None + """ + self.con.get_connection_state = Mock( + side_effect=ConnectionRefusedError('foo!')) + state = self.con.poll_connection_state() + self.assertEqual(state, None) + + + # XXX more things to test: + # - called config routines during initz. + # - raising proper exceptions with no config + # - called proper checks on config / permissions + + +if __name__ == "__main__": + unittest.main() diff --git a/src/leap/eip/tests/test_openvpnconnection.py b/src/leap/eip/tests/test_openvpnconnection.py new file mode 100644 index 00000000..dea75b55 --- /dev/null +++ b/src/leap/eip/tests/test_openvpnconnection.py @@ -0,0 +1,136 @@ +import logging +import platform +#import socket + +logging.basicConfig() +logger = logging.getLogger(name=__name__) + +try: + import unittest2 as unittest +except ImportError: + import unittest + +from mock import Mock, patch # MagicMock + +from leap.eip import openvpnconnection +from leap.eip import exceptions as eip_exceptions +from leap.eip.udstelnet import UDSTelnet + +_system = platform.system() + + +class NotImplementedError(Exception): + pass + + +mock_UDSTelnet = Mock(spec=UDSTelnet) +# XXX cautious!!! +# this might be fragile right now (counting a global +# reference of calls I think. +# investigate this other form instead: +# http://www.voidspace.org.uk/python/mock/patch.html#start-and-stop + +# XXX redo after merge-refactor + + +@patch('openvpnconnection.OpenVPNConnection.connect_to_management') +class MockedOpenVPNConnection(openvpnconnection.OpenVPNConnection): + def __init__(self, *args, **kwargs): + self.mock_UDSTelnet = Mock() + super(MockedOpenVPNConnection, self).__init__( + *args, **kwargs) + self.tn = self.mock_UDSTelnet(self.host, self.port) + + def connect_to_management(self): + #print 'patched connect' + self.tn = mock_UDSTelnet(self.host, port=self.port) + + +class OpenVPNConnectionTest(unittest.TestCase): + + __name__ = "vpnconnection_tests" + + def setUp(self): + self.manager = MockedOpenVPNConnection() + + def tearDown(self): + del self.manager + + # + # helpers + # + + # XXX hey, refactor this to basetestclass + + def _missing_test_for_plat(self, do_raise=False): + if do_raise: + raise NotImplementedError( + "This test is not implemented " + "for the running platform: %s" % + _system) + + # + # tests + # + + @unittest.skipIf(_system == "Windows", "lin/mac only") + def test_lin_mac_default_init(self): + """ + check default host for management iface + """ + self.assertEqual(self.manager.host, '/tmp/.eip.sock') + self.assertEqual(self.manager.port, 'unix') + + @unittest.skipUnless(_system == "Windows", "win only") + def test_win_default_init(self): + """ + check default host for management iface + """ + # XXX should we make the platform specific switch + # here or in the vpn command string building? + self.assertEqual(self.manager.host, 'localhost') + self.assertEqual(self.manager.port, 7777) + + def test_port_types_init(self): + self.manager = MockedOpenVPNConnection(port="42") + self.assertEqual(self.manager.port, 42) + self.manager = MockedOpenVPNConnection() + self.assertEqual(self.manager.port, "unix") + self.manager = MockedOpenVPNConnection(port="bad") + self.assertEqual(self.manager.port, None) + + def test_connect_raises_missing_socket(self): + self.manager = openvpnconnection.OpenVPNConnection() + with self.assertRaises(eip_exceptions.MissingSocketError): + self.manager.connect_to_management() + + def test_uds_telnet_called_on_connect(self): + self.manager.connect_to_management() + mock_UDSTelnet.assert_called_with( + self.manager.host, + port=self.manager.port) + + @unittest.skip + def test_connect(self): + raise NotImplementedError + # XXX calls close + # calls UDSTelnet mock. + + # XXX + # tests to write: + # UDSTelnetTest (for real?) + # HAVE A LOOK AT CORE TESTS FOR TELNETLIB. + # very illustrative instead... + + # - raise MissingSocket + # - raise ConnectionRefusedError + # - test send command + # - tries connect + # - ... tries? + # - ... calls _seek_to_eof + # - ... read_until --> return value + # - ... + + +if __name__ == "__main__": + unittest.main() diff --git a/src/leap/eip/vpnwatcher.py b/src/leap/eip/vpnwatcher.py deleted file mode 100644 index 09bd5811..00000000 --- a/src/leap/eip/vpnwatcher.py +++ /dev/null @@ -1,169 +0,0 @@ -"""generic watcher object that keeps track of connection status""" -# This should be deprecated in favor of daemon mode + management -# interface. But we can leave it here for debug purposes. - - -class EIPConnectionStatus(object): - """ - Keep track of client (gui) and openvpn - states. - - These are the OpenVPN states: - CONNECTING -- OpenVPN's initial state. - WAIT -- (Client only) Waiting for initial response - from server. - AUTH -- (Client only) Authenticating with server. - GET_CONFIG -- (Client only) Downloading configuration options - from server. - ASSIGN_IP -- Assigning IP address to virtual network - interface. - ADD_ROUTES -- Adding routes to system. - CONNECTED -- Initialization Sequence Completed. - RECONNECTING -- A restart has occurred. - EXITING -- A graceful exit is in progress. - - We add some extra states: - - DISCONNECTED -- GUI initial state. - UNRECOVERABLE -- An unrecoverable error has been raised - while invoking openvpn service. - """ - CONNECTING = 1 - WAIT = 2 - AUTH = 3 - GET_CONFIG = 4 - ASSIGN_IP = 5 - ADD_ROUTES = 6 - CONNECTED = 7 - RECONNECTING = 8 - EXITING = 9 - - # gui specific states: - UNRECOVERABLE = 11 - DISCONNECTED = 0 - - def __init__(self, callbacks=None): - """ - EIPConnectionStatus is initialized with a tuple - of signals to be triggered. - :param callbacks: a tuple of (callable) observers - :type callbacks: tuple - """ - # (callbacks to connect to signals in Qt-land) - self.current = self.DISCONNECTED - self.previous = None - self.callbacks = callbacks - - def get_readable_status(self): - # XXX DRY status / labels a little bit. - # think we'll want to i18n this. - human_status = { - 0: 'disconnected', - 1: 'connecting', - 2: 'waiting', - 3: 'authenticating', - 4: 'getting config', - 5: 'assigning ip', - 6: 'adding routes', - 7: 'connected', - 8: 'reconnecting', - 9: 'exiting', - 11: 'unrecoverable error', - } - return human_status[self.current] - - def get_state_icon(self): - """ - returns the high level icon - for each fine-grain openvpn state - """ - connecting = (self.CONNECTING, - self.WAIT, - self.AUTH, - self.GET_CONFIG, - self.ASSIGN_IP, - self.ADD_ROUTES) - connected = (self.CONNECTED,) - disconnected = (self.DISCONNECTED, - self.UNRECOVERABLE) - - # this can be made smarter, - # but it's like it'll change, - # so +readability. - - if self.current in connecting: - return "connecting" - if self.current in connected: - return "connected" - if self.current in disconnected: - return "disconnected" - - def set_vpn_state(self, status): - """ - accepts a state string from the management - interface, and sets the internal state. - :param status: openvpn STATE (uppercase). - :type status: str - """ - if hasattr(self, status): - self.change_to(getattr(self, status)) - - def set_current(self, to): - """ - setter for the 'current' property - :param to: destination state - :type to: int - """ - self.current = to - - def change_to(self, to): - """ - :param to: destination state - :type to: int - """ - if to == self.current: - return - changed = False - from_ = self.current - self.current = to - - # We can add transition restrictions - # here to ensure no transitions are - # allowed outside the fsm. - - self.set_current(to) - changed = True - - #trigger signals (as callbacks) - #print('current state: %s' % self.current) - if changed: - self.previous = from_ - if self.callbacks: - for cb in self.callbacks: - if callable(cb): - cb(self) - - -def status_watcher(cs, line): - """ - a wrapper that calls to ConnectionStatus object - :param cs: a EIPConnectionStatus instance - :type cs: EIPConnectionStatus object - :param line: a single line of the watched output - :type line: str - """ - #print('status watcher watching') - - # from the mullvad code, should watch for - # things like: - # "Initialization Sequence Completed" - # "With Errors" - # "Tap-Win32" - - if "Completed" in line: - cs.change_to(cs.CONNECTED) - return - - if "Initial packet from" in line: - cs.change_to(cs.CONNECTING) - return |