diff options
Diffstat (limited to 'src')
| -rw-r--r-- | src/leap/eip/config.py | 72 | ||||
| -rw-r--r-- | src/leap/eip/tests/test_config.py | 210 | 
2 files changed, 264 insertions, 18 deletions
| diff --git a/src/leap/eip/config.py b/src/leap/eip/config.py index 8e55d789..8c67a258 100644 --- a/src/leap/eip/config.py +++ b/src/leap/eip/config.py @@ -9,15 +9,37 @@ from leap.util.fileutil import (which, mkdir_p,                                  check_and_fix_urw_only)  from leap.baseapp.permcheck import (is_pkexec_in_system,                                      is_auth_agent_running) -from leap.eip import exceptions as eip_exceptions  logger = logging.getLogger(name=__name__)  logger.setLevel('DEBUG') -# XXX this has to be REMOVED -# and all these options passed in the -# command line --> move to build_ovpn_command -# issue #447 +# XXX move exceptions: +# from leap.eip import exceptions as eip_exceptions + + +class EIPNoPkexecAvailable(Exception): +    pass + + +class EIPNoPolkitAuthAgentAvailable(Exception): +    pass + + +class EIPInitNoProviderError(Exception): +    pass + + +class EIPInitBadProviderError(Exception): +    pass + + +class EIPInitNoKeyFileError(Exception): +    pass + + +class EIPInitBadKeyFilePermError(Exception): +    pass +  OPENVPN_CONFIG_TEMPLATE = """#Autogenerated by eip-client wizard  remote {VPN_REMOTE_HOST} {VPN_REMOTE_PORT} @@ -114,6 +136,10 @@ def check_or_create_default_vpnconf(config):      # instead.      try: +        # XXX by now, we're expecting +        # only IP format for remote. +        # We should allow also domain names, +        # and make a reverse resolv.          remote_ip = config.get('provider',                                 'remote_ip')          validate_ip(remote_ip) @@ -158,6 +184,15 @@ def check_or_create_default_vpnconf(config):          f.write(ovpn_config) +def get_username(): +    return os.getlogin() + + +def get_groupname(): +    gid = os.getgroups()[-1] +    return grp.getgrgid(gid).gr_name + +  def build_ovpn_options(daemon=False):      """      build a list of options @@ -175,16 +210,11 @@ def build_ovpn_options(daemon=False):      # get user/group name      # also from config. -    user = os.getlogin() -    gid = os.getgroups()[-1] -    group = grp.getgrgid(gid).gr_name +    user = get_username() +    group = get_groupname()      opts = [] -    #moved to config files -    #opts.append('--persist-tun') -    #opts.append('--persist-key') -      # set user and group      opts.append('--user')      opts.append('%s' % user) @@ -219,6 +249,8 @@ def build_ovpn_options(daemon=False):      opts.append('--config')      default_provider_path = get_default_provider_path() + +    # XXX get rid of config_file at all      ovpncnf = get_config_file(          'openvpn.conf',          folder=default_provider_path) @@ -233,7 +265,7 @@ def build_ovpn_options(daemon=False):      return opts -def build_ovpn_command(config, debug=False): +def build_ovpn_command(config, debug=False, do_pkexec_check=True):      """      build a string with the      complete openvpn invocation @@ -251,17 +283,16 @@ def build_ovpn_command(config, debug=False):      if config.has_option('openvpn', 'use_pkexec'):          use_pkexec = config.get('openvpn', 'use_pkexec') -    if platform.system() == "Linux" and use_pkexec: +    if platform.system() == "Linux" and use_pkexec and do_pkexec_check:          # XXX check for both pkexec (done)          # AND a suitable authentication          # agent running. -        # (until we implement setuid helper)          logger.info('use_pkexec set to True')          if not is_pkexec_in_system():              logger.error('no pkexec in system') -            raise eip_exceptions.EIPNoPkexecAvailable +            raise EIPNoPkexecAvailable          if not is_auth_agent_running():              logger.warning( @@ -269,7 +300,7 @@ def build_ovpn_command(config, debug=False):                  "pkexec will use its own text "                  "based authentication agent. "                  "that's probably a bad idea") -            raise eip_exceptions.EIPNoPolkitAuthAgentAvailable +            raise EIPNoPolkitAuthAgentAvailable          command.append('pkexec') @@ -283,7 +314,11 @@ def build_ovpn_command(config, debug=False):                            'openvpn_binary')      if ovpn: -        command.append(ovpn) +        vpn_command = ovpn +    else: +        vpn_command = "openvpn" + +    command.append(vpn_command)      daemon_mode = not debug @@ -291,6 +326,7 @@ def build_ovpn_command(config, debug=False):          command.append(opt)      # XXX check len and raise proper error +      return [command[0], command[1:]] diff --git a/src/leap/eip/tests/test_config.py b/src/leap/eip/tests/test_config.py new file mode 100644 index 00000000..12679ec6 --- /dev/null +++ b/src/leap/eip/tests/test_config.py @@ -0,0 +1,210 @@ +import ConfigParser +import os +import platform +import shutil +import socket +import tempfile + +try: +    import unittest2 as unittest +except ImportError: +    import unittest + +from leap.eip import config + +_system = platform.system() + + +class NotImplementedError(Exception): +    pass + +# XXX use mock_open here? + + +class EIPConfigTest(unittest.TestCase): + +    __name__ = "eip_config_tests" + +    def setUp(self): +        self.old_path = os.environ['PATH'] + +        self.tdir = tempfile.mkdtemp() + +        bin_tdir = os.path.join( +            self.tdir, +            'bin') +        os.mkdir(bin_tdir) +        os.environ['PATH'] = bin_tdir + +    def tearDown(self): +        os.environ['PATH'] = self.old_path +        shutil.rmtree(self.tdir) +    # +    # helpers +    # + +    def get_username(self): +        return config.get_username() + +    def get_groupname(self): +        return config.get_groupname() + +    def _missing_test_for_plat(self, do_raise=False): +        if do_raise: +            raise NotImplementedError( +                "This test is not implemented " +                "for the running platform: %s" % +                _system) + +    def touch_exec(self): +        tfile = os.path.join( +            self.tdir, +            'bin', +            'openvpn') +        open(tfile, 'bw').close() + +    def get_empty_config(self): +        _config = ConfigParser.ConfigParser() +        return _config + +    def get_minimal_config(self): +        _config = ConfigParser.ConfigParser() +        return _config + +    def get_expected_openvpn_args(self): +        args = [] +        username = self.get_username() +        groupname = self.get_groupname() + +        args.append('--user') +        args.append(username) +        args.append('--group') +        args.append(groupname) +        args.append('--management-client-user') +        args.append(username) +        args.append('--management-signal') +        args.append('--management') + +        #XXX hey! +        #get platform switches here! +        args.append('/tmp/.eip.sock') +        args.append('unix') +        args.append('--config') +        #XXX bad assumption. FIXME: expand $HOME +        args.append('/home/%s/.config/leap/providers/default/openvpn.conf' % +                    username) +        return args + +    # +    # tests +    # + +    # XXX fixme! /home/user should +    # be replaced for proper home lookup. + +    @unittest.skipUnless(_system == "Linux", "linux only") +    def test_lin_get_config_file(self): +        """ +        config file path where expected? (linux) +        """ +        self.assertEqual( +            config.get_config_file( +                'test', folder="foo/bar"), +            '/home/%s/.config/leap/foo/bar/test' % +            self.get_username()) + +    @unittest.skipUnless(_system == "Darwin", "mac only") +    def test_mac_get_config_file(self): +        """ +        config file path where expected? (mac) +        """ +        self._missing_test_for_plat(do_raise=True) + +    @unittest.skipUnless(_system == "Windows", "win only") +    def test_win_get_config_file(self): +        """ +        config file path where expected? +        """ +        self._missing_test_for_plat(do_raise=True) + +    # +    # XXX hey, I'm raising exceptions here +    # on purpose. just wanted to make sure +    # that the skip stuff is doing it right. +    # If you're working on win/macos tests, +    # feel free to remove tests that you see +    # are too redundant. + +    @unittest.skipUnless(_system == "Linux", "linux only") +    def test_lin_get_config_dir(self): +        """ +        nice config dir? (linux) +        """ +        self.assertEqual( +            config.get_config_dir(), +            '/home/%s/.config/leap' % +            self.get_username()) + +    @unittest.skipUnless(_system == "Darwin", "mac only") +    def test_mac_get_config_dir(self): +        """ +        nice config dir? (mac) +        """ +        self._missing_test_for_plat(do_raise=True) + +    @unittest.skipUnless(_system == "Windows", "win only") +    def test_win_get_config_dir(self): +        """ +        nice config dir? (win) +        """ +        self._missing_test_for_plat(do_raise=True) + +    # provider paths + +    @unittest.skipUnless(_system == "Linux", "linux only") +    def test_get_default_provider_path(self): +        """ +        is default provider path ok? +        """ +        self.assertEqual( +            config.get_default_provider_path(), +            '/home/%s/.config/leap/providers/default/' % +            self.get_username()) + +    # validate ip + +    def test_validate_ip(self): +        """ +        check our ip validation +        """ +        config.validate_ip('3.3.3.3') +        with self.assertRaises(socket.error): +            config.validate_ip('255.255.255.256') +        with self.assertRaises(socket.error): +            config.validate_ip('foobar') + +    @unittest.skip +    def test_validate_domain(self): +        """ +        code to be written yet +        """ +        pass + +    # build command string +    # these tests are going to have to check +    # many combinations. we should inject some +    # params in the function call, to disable +    # some checks. +    # XXX breaking! + +    def test_build_ovpn_command_empty_config(self): +        _config = self.get_empty_config() +        command, args = config.build_ovpn_command( +            _config, +            do_pkexec_check=False) +        self.assertEqual(command, 'openvpn') +        self.assertEqual(args, self.get_expected_openvpn_args()) + + +if __name__ == "__main__": +    unittest.main() | 
