summaryrefslogtreecommitdiff
path: root/src/leap/base
diff options
context:
space:
mode:
Diffstat (limited to 'src/leap/base')
-rw-r--r--src/leap/base/checks.py53
-rw-r--r--src/leap/base/config.py27
-rw-r--r--src/leap/base/tests/test_checks.py89
3 files changed, 86 insertions, 83 deletions
diff --git a/src/leap/base/checks.py b/src/leap/base/checks.py
index 0ebf4f2f..0bf44f59 100644
--- a/src/leap/base/checks.py
+++ b/src/leap/base/checks.py
@@ -5,8 +5,6 @@ import re
import socket
import netifaces
-import ping
-import requests
import sh
from leap.base import constants
@@ -45,26 +43,25 @@ class LeapNetworkChecker(object):
checker.parse_log_and_react([], ())
def check_internet_connection(self):
- try:
- # XXX remove this hardcoded random ip
- # ping leap.se or eip provider instead...?
- # XXX could use icmp instead..
- requests.get('http://216.172.161.165')
- except requests.ConnectionError as e:
- error = "Unidentified Connection Error"
- if e.message == "[Errno 113] No route to host":
+ if _platform == "Linux":
+ try:
+ output = sh.ping("-c", "5", "-w", "5", ICMP_TARGET)
+ # XXX should redirect this to netcheck logger.
+ # and don't clutter main log.
+ logger.debug('Network appears to be up.')
+ except sh.ErrorReturnCode_1 as e:
+ packet_loss = re.findall("\d+% packet loss", e.message)[0]
+ logger.debug("Unidentified Connection Error: " + packet_loss)
if not self.is_internet_up():
error = "No valid internet connection found."
else:
error = "Provider server appears to be down."
- logger.error(error)
- raise exceptions.NoInternetConnection(error)
- except (requests.HTTPError, requests.RequestException) as e:
- raise exceptions.NoInternetConnection(e.message)
- # XXX should redirect this to netcheck logger.
- # and don't clutter main log.
- logger.debug('Network appears to be up.')
+ logger.error(error)
+ raise exceptions.NoInternetConnection(error)
+
+ else:
+ raise NotImplementedError
def is_internet_up(self):
iface, gateway = self.get_default_interface_gateway()
@@ -82,7 +79,7 @@ class LeapNetworkChecker(object):
#toss out header
route_table.pop(0)
if not route_table:
- raise exceptions.TunnelNotDefaultRouteError()
+ raise exceptions.NoDefaultInterfaceFoundError
return route_table
def _get_def_iface_osx(self):
@@ -158,7 +155,7 @@ class LeapNetworkChecker(object):
if _platform == "Linux":
default_iface, gw = self._get_def_iface_linux()
elif _platform == "Darwin":
- default_iface, gw = self.get_def_iface_osx()
+ default_iface, gw = self._get_def_iface_osx()
else:
raise NotImplementedError
@@ -167,7 +164,7 @@ class LeapNetworkChecker(object):
if default_iface not in netifaces.interfaces():
raise exceptions.InterfaceNotFoundError
- logger.debug('-- default iface', default_iface)
+ logger.debug('-- default iface %s', default_iface)
return default_iface, gw
def ping_gateway(self, gateway):
@@ -178,13 +175,15 @@ class LeapNetworkChecker(object):
# -- is it a domain?
# -- can we resolve? -- raise NoDNSError if not.
- # XXX -- needs review!
- # We cannout use this ping implementation; it needs root.
- # We need to look for another, poors-man implementation
- # or wrap around system traceroute (using sh module, fi)
- # -- kali
- packet_loss = ping.quiet_ping(gateway)[0]
- logger.debug('packet loss %s' % packet_loss)
+ # XXX -- sh.ping implemtation needs review!
+ try:
+ output = sh.ping("-c", "10", gateway).stdout
+ except sh.ErrorReturnCode_1 as e:
+ output = e.message
+ finally:
+ packet_loss = int(re.findall("(\d+)% packet loss", output)[0])
+
+ logger.debug('packet loss %s%%' % packet_loss)
if packet_loss > constants.MAX_ICMP_PACKET_LOSS:
raise exceptions.NoConnectionToGateway
diff --git a/src/leap/base/config.py b/src/leap/base/config.py
index e235e5c3..d796bcf1 100644
--- a/src/leap/base/config.py
+++ b/src/leap/base/config.py
@@ -4,6 +4,7 @@ Configuration Base Class
import grp
import json
import logging
+import re
import socket
import time
import os
@@ -11,6 +12,7 @@ import os
logger = logging.getLogger(name=__name__)
from dateutil import parser as dateparser
+from xdg import BaseDirectory
import requests
from leap.base import exceptions
@@ -279,15 +281,16 @@ def get_config_dir():
@rparam: config path
@rtype: string
"""
- # TODO
- # check for $XDG_CONFIG_HOME var?
- # get a more sensible path for win/mac
- # kclair: opinion? ^^
-
- return os.path.expanduser(
- os.path.join('~',
- '.config',
- 'leap'))
+ home = os.path.expanduser("~")
+ if re.findall("leap_tests-[_a-zA-Z0-9]{6}", home):
+ # we're inside a test! :)
+ return os.path.join(home, ".config/leap")
+ else:
+ # XXX dirspec is cross-platform,
+ # we should borrow some of those
+ # routines for osx/win and wrap this call.
+ return os.path.join(BaseDirectory.xdg_config_home,
+ 'leap')
def get_config_file(filename, folder=None):
@@ -333,7 +336,11 @@ def validate_ip(ip_str):
def get_username():
- return os.getlogin()
+ try:
+ return os.getlogin()
+ except OSError as e:
+ import pwd
+ return pwd.getpwuid(os.getuid())[0]
def get_groupname():
diff --git a/src/leap/base/tests/test_checks.py b/src/leap/base/tests/test_checks.py
index 51586f02..8126755b 100644
--- a/src/leap/base/tests/test_checks.py
+++ b/src/leap/base/tests/test_checks.py
@@ -3,13 +3,11 @@ try:
except ImportError:
import unittest
import os
+import sh
from mock import (patch, Mock)
from StringIO import StringIO
-import ping
-import requests
-
from leap.base import checks
from leap.base import exceptions
from leap.testing.basetest import BaseLeapTest
@@ -21,6 +19,7 @@ class LeapNetworkCheckTest(BaseLeapTest):
__name__ = "leap_network_check_tests"
def setUp(self):
+ os.environ['PATH'] += ':/bin'
pass
def tearDown(self):
@@ -62,9 +61,7 @@ class LeapNetworkCheckTest(BaseLeapTest):
def test_get_default_interface_no_interface(self):
checker = checks.LeapNetworkChecker()
with patch('leap.base.checks.open', create=True) as mock_open:
- # aa is working on this and probably will merge this
- # correctly. By now just writing something so test pass
- with self.assertRaises(exceptions.TunnelNotDefaultRouteError):
+ with self.assertRaises(exceptions.NoDefaultInterfaceFoundError):
mock_open.return_value = StringIO(
"Iface\tDestination Gateway\t"
"Flags\tRefCntd\tUse\tMetric\t"
@@ -78,14 +75,6 @@ class LeapNetworkCheckTest(BaseLeapTest):
mock_open.return_value = StringIO(
"Iface\tDestination Gateway\t"
"Flags\tRefCntd\tUse\tMetric\t"
- "Mask\tMTU\tWindow\tIRTT")
- checker.check_tunnel_default_interface()
-
- with patch('leap.base.checks.open', create=True) as mock_open:
- with self.assertRaises(exceptions.TunnelNotDefaultRouteError):
- mock_open.return_value = StringIO(
- "Iface\tDestination Gateway\t"
- "Flags\tRefCntd\tUse\tMetric\t"
"Mask\tMTU\tWindow\tIRTT\n"
"wlan0\t00000000\t0102A8C0\t"
"0003\t0\t0\t0\t00000000\t0\t0\t0")
@@ -101,43 +90,49 @@ class LeapNetworkCheckTest(BaseLeapTest):
def test_ping_gateway_fail(self):
checker = checks.LeapNetworkChecker()
- with patch.object(ping, "quiet_ping") as mocked_ping:
+ with patch.object(sh, "ping") as mocked_ping:
with self.assertRaises(exceptions.NoConnectionToGateway):
- mocked_ping.return_value = [11, "", ""]
+ mocked_ping.return_value = Mock
+ mocked_ping.return_value.stdout = "11% packet loss"
checker.ping_gateway("4.2.2.2")
- def test_check_internet_connection_failures(self):
+ def test_ping_gateway(self):
checker = checks.LeapNetworkChecker()
- with patch.object(requests, "get") as mocked_get:
- mocked_get.side_effect = requests.HTTPError
- with self.assertRaises(exceptions.NoInternetConnection):
- checker.check_internet_connection()
-
- with patch.object(requests, "get") as mocked_get:
- mocked_get.side_effect = requests.RequestException
- with self.assertRaises(exceptions.NoInternetConnection):
- checker.check_internet_connection()
-
- #TODO: Mock possible errors that can be raised by is_internet_up
- with patch.object(requests, "get") as mocked_get:
- mocked_get.side_effect = requests.ConnectionError
- with self.assertRaises(exceptions.NoInternetConnection):
- checker.check_internet_connection()
+ with patch.object(sh, "ping") as mocked_ping:
+ mocked_ping.return_value = Mock
+ mocked_ping.return_value.stdout = """
+PING 4.2.2.2 (4.2.2.2) 56(84) bytes of data.
+64 bytes from 4.2.2.2: icmp_req=1 ttl=54 time=33.8 ms
+64 bytes from 4.2.2.2: icmp_req=2 ttl=54 time=30.6 ms
+64 bytes from 4.2.2.2: icmp_req=3 ttl=54 time=31.4 ms
+64 bytes from 4.2.2.2: icmp_req=4 ttl=54 time=36.1 ms
+64 bytes from 4.2.2.2: icmp_req=5 ttl=54 time=30.8 ms
+64 bytes from 4.2.2.2: icmp_req=6 ttl=54 time=30.4 ms
+64 bytes from 4.2.2.2: icmp_req=7 ttl=54 time=30.7 ms
+64 bytes from 4.2.2.2: icmp_req=8 ttl=54 time=32.7 ms
+64 bytes from 4.2.2.2: icmp_req=9 ttl=54 time=31.4 ms
+64 bytes from 4.2.2.2: icmp_req=10 ttl=54 time=33.3 ms
+
+--- 4.2.2.2 ping statistics ---
+10 packets transmitted, 10 received, 0% packet loss, time 9016ms
+rtt min/avg/max/mdev = 30.497/32.172/36.161/1.755 ms"""
+ checker.ping_gateway("4.2.2.2")
- with patch.object(requests, "get") as mocked_get:
- mocked_get.side_effect = requests.ConnectionError(
- "[Errno 113] No route to host")
+ def test_check_internet_connection_failures(self):
+ checker = checks.LeapNetworkChecker()
+ TimeoutError = get_ping_timeout_error()
+ with patch.object(sh, "ping") as mocked_ping:
+ mocked_ping.side_effect = TimeoutError
with self.assertRaises(exceptions.NoInternetConnection):
- with patch.object(checker, "ping_gateway") as mock_ping:
- mock_ping.return_value = True
+ with patch.object(checker, "ping_gateway") as mock_gateway:
+ mock_gateway.side_effect = exceptions.NoConnectionToGateway
checker.check_internet_connection()
- with patch.object(requests, "get") as mocked_get:
- mocked_get.side_effect = requests.ConnectionError(
- "[Errno 113] No route to host")
+ with patch.object(sh, "ping") as mocked_ping:
+ mocked_ping.side_effect = TimeoutError
with self.assertRaises(exceptions.NoInternetConnection):
- with patch.object(checker, "ping_gateway") as mock_ping:
- mock_ping.side_effect = exceptions.NoConnectionToGateway
+ with patch.object(checker, "ping_gateway") as mock_gateway:
+ mock_gateway.return_value = True
checker.check_internet_connection()
def test_parse_log_and_react(self):
@@ -174,7 +169,9 @@ class LeapNetworkCheckTest(BaseLeapTest):
checker.parse_log_and_react([], err_matrix)
self.assertFalse(to_call.called)
- @unittest.skipUnless(_uid == 0, "root only")
- def test_ping_gateway(self):
- checker = checks.LeapNetworkChecker()
- checker.ping_gateway("4.2.2.2")
+
+def get_ping_timeout_error():
+ try:
+ sh.ping("-c", "1", "-w", "1", "8.8.7.7")
+ except Exception as e:
+ return e