diff options
Diffstat (limited to 'src')
| -rw-r--r-- | src/leap/base/checks.py | 27 | ||||
| -rw-r--r-- | src/leap/base/exceptions.py | 4 | ||||
| -rw-r--r-- | src/leap/base/tests/test_checks.py | 30 | 
3 files changed, 61 insertions, 0 deletions
diff --git a/src/leap/base/checks.py b/src/leap/base/checks.py index c5438b09..a775e162 100644 --- a/src/leap/base/checks.py +++ b/src/leap/base/checks.py @@ -1,5 +1,6 @@  # -*- coding: utf-8 -*- +import logging  import platform  import ping @@ -8,6 +9,7 @@ import requests  from leap.base import constants  from leap.base import exceptions +logger = logging.getLogger(name=__name__)  class LeapNetworkChecker(object):      """ @@ -22,6 +24,7 @@ class LeapNetworkChecker(object):          self.error = None  # ?          # for MVS +        checker.check_tunnel_default_interface()          checker.check_internet_connection()          checker.is_internet_up()          checker.ping_gateway() @@ -40,11 +43,35 @@ class LeapNetworkChecker(object):                  else:                      error = "Provider server appears to be down."              raise exceptions.NoInternetConnection(error) +        logger.debug('Network appears to be up.')      def is_internet_up(self):          iface, gateway = self.get_default_interface_gateway()          self.ping_gateway(self) +    def check_tunnel_default_interface(self): +        """ +        Raises an TunnelNotDefaultRouteError  +        (including when no routes are present) +        """ +        if not platform.system() == "Linux": +            raise NotImplementedError + +        f = open("/proc/net/route") +        route_table = f.readlines() +        f.close() +        #toss out header +        route_table.pop(0) + +        if not route_table: +            raise exceptions.TunnelNotDefaultRouteError() + +        line = route_table.pop(0) +        iface, destination = line.split('\t')[0:2] +        if not destination == '00000000' or not iface == 'tun0': +            raise exceptions.TunnelNotDefaultRouteError() + +      def get_default_interface_gateway(self):          """only impletemented for linux so far."""          if not platform.system() == "Linux": diff --git a/src/leap/base/exceptions.py b/src/leap/base/exceptions.py index 7771d1f9..48d827f5 100644 --- a/src/leap/base/exceptions.py +++ b/src/leap/base/exceptions.py @@ -23,3 +23,7 @@ class NoConnectionToGateway(Exception):  class NoInternetConnection(Exception):      message = "No Internet connection found" + + +class TunnelNotDefaultRouteError(Exception): +    message = "VPN Maybe be down." diff --git a/src/leap/base/tests/test_checks.py b/src/leap/base/tests/test_checks.py index a3b3ea91..30746991 100644 --- a/src/leap/base/tests/test_checks.py +++ b/src/leap/base/tests/test_checks.py @@ -31,6 +31,8 @@ class LeapNetworkCheckTest(BaseLeapTest):          self.assertTrue(hasattr(checker, "check_internet_connection"),                          "missing meth") +        self.assertTrue(hasattr(checker, "check_tunnel_default_interface"), +                        "missing meth")          self.assertTrue(hasattr(checker, "is_internet_up"),                          "missing meth")          self.assertTrue(hasattr(checker, "ping_gateway"), @@ -42,6 +44,7 @@ class LeapNetworkCheckTest(BaseLeapTest):          mc = Mock()          checker.run_all(checker=mc)          self.assertTrue(mc.check_internet_connection.called, "not called") +        self.assertTrue(mc.check_tunnel_default_interface.called, "not called")          self.assertTrue(mc.ping_gateway.called, "not called")          self.assertTrue(mc.is_internet_up.called, "not called") @@ -55,6 +58,33 @@ class LeapNetworkCheckTest(BaseLeapTest):                      "Mask\tMTU\tWindow\tIRTT")                  checker.get_default_interface_gateway() +    def test_check_tunnel_default_interface(self): +        checker = checks.LeapNetworkChecker() +        with patch('leap.base.checks.open', create=True) as mock_open: +            with self.assertRaises(exceptions.TunnelNotDefaultRouteError): +                mock_open.return_value = StringIO( +                    "Iface\tDestination Gateway\t" +                    "Flags\tRefCntd\tUse\tMetric\t" +                    "Mask\tMTU\tWindow\tIRTT") +                checker.check_tunnel_default_interface() + +        with patch('leap.base.checks.open', create=True) as mock_open: +            with self.assertRaises(exceptions.TunnelNotDefaultRouteError): +                mock_open.return_value = StringIO( +                    "Iface\tDestination Gateway\t" +                    "Flags\tRefCntd\tUse\tMetric\t" +                    "Mask\tMTU\tWindow\tIRTT\n" +                    "wlan0\t00000000\t0102A8C0\t0003\t0\t0\t0\t00000000\t0\t0\t0") +                checker.check_tunnel_default_interface() + +        with patch('leap.base.checks.open', create=True) as mock_open: +            mock_open.return_value = StringIO( +                "Iface\tDestination Gateway\t" +                "Flags\tRefCntd\tUse\tMetric\t" +                "Mask\tMTU\tWindow\tIRTT\n" +                "tun0\t00000000\t01002A0A\t0003\t0\t0\t0\t00000080\t0\t0\t0") +            checker.check_tunnel_default_interface() +      def test_ping_gateway_fail(self):          checker = checks.LeapNetworkChecker()          with patch.object(ping, "quiet_ping") as mocked_ping:  | 
