summaryrefslogtreecommitdiff
path: root/src/leap/util/net.py
blob: a4104d075f74728340475775d96aa8377e6e5a42 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
#!/usr/bin/env python
# -*- encoding: utf-8 -*-
'''
net.py
-------
Utilities for networking.

@authors: Isis Agora Lovecruft, <isis@leap.se> 0x2cdb8b35
@license: see included LICENSE file
@copyright: 2013 Isis Agora Lovecruft
'''

import ipaddr
import sys
import socket

from random import randint

from leap.mx.utils import log


# Platform label -> True when ``sys.platform`` starts with the paired prefix.
# The flags are computed once, at import time.
PLATFORMS = dict(
    (label, sys.platform.startswith(prefix))
    for label, prefix in (
        ('LINUX', "linux"),
        ('OPENBSD', "openbsd"),
        ('FREEBSD', "freebsd"),
        ('NETBSD', "netbsd"),
        ('DARWIN', "darwin"),
        ('SOLARIS', "sunos"),
        ('WINDOWS', "win32"),
    )
)


class UnsupportedPlatform(Exception):
    """
    Support for this platform is not currently available.

    Raised by :func:`getIfaces` when it has no way of listing network
    interfaces for the detected platform.
    """

class IfaceError(Exception):
    """
    Could not find default network interface.

    :func:`getNonLoopbackIfaces` catches this around its call to
    ``checkInterfaces``, logs it and returns None.
    """

class PermissionsError(SystemExit):
    """
    This test requires admin or root privileges to run. Exiting...

    Derives from SystemExit rather than Exception, so an ``except Exception``
    handler does not catch it and, left uncaught, it makes the interpreter
    exit instead of printing a traceback.
    """


def checkIPaddress(addr):
    """
    Check that a given string is a valid IPv4 or IPv6 address.

    @param addr: Any string defining an IP address, i.e. '1.2.3.4' or '::1'.
    @returns: True if :param:`addr` defines a valid IPAddress, else False.
        When the address is rejected, the parser's error text is passed to
        ``log.warn`` before returning False.
    """
    import ipaddr

    try:
        # Only the success/failure of the parse matters; the parsed object
        # itself is not needed.
        ipaddr.IPAddress(addr)
    except ValueError as ve:
        # str(ve) rather than ve.message: the ``message`` attribute is
        # deprecated since Python 2.6 and does not exist on Python 3.
        log.warn(str(ve))
        return False
    return True

def getClientPlatform(platform_name=None):
    """
    Find the entry of ``PLATFORMS`` that describes the running platform.

    @param platform_name: Optional platform label, compared
        case-insensitively against the keys of ``PLATFORMS``. When empty or
        None, every entry is considered.
    @returns: A ``(name, True)`` tuple for the first considered entry whose
        flag is set, or None when no considered entry is set.
    """
    consider_all = not platform_name
    for label, detected in PLATFORMS.items():
        if not (consider_all or platform_name.upper() == label):
            continue
        if detected:
            return label, detected
    # Nothing matched: callers receive None, not a tuple.
    return None

def getPosixIfaces():
    """
    List network interfaces through Twisted's private ``_posixifaces``
    test helper.

    @returns: Whatever ``_posixifaces._interfaces()`` returns.
    """
    # Imported at call time, presumably so that merely importing this module
    # does not require the helper to be importable -- TODO confirm.
    from twisted.internet.test import _posixifaces as posix_helper
    log.msg("Attempting to discover network interfaces...")
    return posix_helper._interfaces()

def getWindowsIfaces():
    """
    List network interfaces through Twisted's private ``_win32ifaces``
    test helper.

    @returns: Whatever ``_win32ifaces._interfaces()`` returns.
    """
    # Imported at call time, presumably so that merely importing this module
    # does not require the helper to be importable -- TODO confirm.
    from twisted.internet.test import _win32ifaces as win32_helper
    log.msg("Attempting to discover network interfaces...")
    return win32_helper._interfaces()

def getIfaces(platform_name=None):
    """
    List the network interfaces of the running platform.

    Linux, Darwin and the BSDs are served by :func:`getPosixIfaces`,
    Windows by :func:`getWindowsIfaces`.

    @param platform_name: Optional platform label handed through to
        :func:`getClientPlatform`.
    @returns: The interface listing produced by the platform-specific helper.
    @raises UnsupportedPlatform: If no platform is detected, or the detected
        platform (e.g. Solaris) has no interface-listing helper.
    """
    detected = getClientPlatform(platform_name)
    if not detected:
        # getClientPlatform() returns None when nothing matches; unpacking
        # that directly would raise TypeError instead of the documented
        # UnsupportedPlatform.
        raise UnsupportedPlatform

    client = detected[0]
    if not client:
        raise UnsupportedPlatform

    # A membership test is required here: ``client == ('LINUX' or 'DARWIN')``
    # evaluates to ``client == 'LINUX'`` and silently rejects Darwin.
    if client in ('LINUX', 'DARWIN') or client.endswith('BSD'):
        return getPosixIfaces()
    if client == 'WINDOWS':
        return getWindowsIfaces()
    ## XXX fixme figure out how to get iface for Solaris
    raise UnsupportedPlatform

def getRandomUnusedPort(addr=None):
    """
    Pick a random port in the range 1024-65535 that can currently be bound.

    Each candidate is probed by binding a default ``socket.socket()`` (an
    IPv4 stream socket) to it, so only IPv4 addresses are supported.

    @param addr: Local address to bind the probe socket to. None (the
        default) or '' binds on all IPv4 interfaces.
    @returns: A port number that was bindable when it was probed. The probe
        socket is closed before returning, so another process may still
        claim the port afterwards.
    @raises socket.error: If binding fails for a reason other than the port
        being in use or reserved, e.g. :param:`addr` is not a local address.
    """
    import errno

    # bind() does not accept None as a host; '' is the spelling for "any
    # interface". Without this the default argument made every bind fail
    # and the retry loop never terminated.
    host = '' if addr is None else addr

    while True:
        port = randint(1024, 65535)
        s = socket.socket()
        try:
            s.bind((host, port))
        except socket.error as err:
            # Only a busy or reserved port is worth another roll; any other
            # failure would repeat forever, so let it propagate.
            if err.errno not in (errno.EADDRINUSE, errno.EACCES):
                raise
        else:
            return port
        finally:
            s.close()

def getNonLoopbackIfaces(platform_name=None):
    """
    Discover the network interfaces other than the one named 'lo'.

    @param platform_name: Optional platform label handed through to
        :func:`getIfaces`.
    @returns: The result of ``checkInterfaces`` applied to a list of
        single-entry dicts (``{entry[0]: entry[2]}`` per discovered
        interface), or None if the platform is unsupported, no interfaces
        were found, or ``checkInterfaces`` raised :class:`IfaceError`.
    """
    try:
        ifaces = getIfaces(platform_name)
    except UnsupportedPlatform as up:
        log.err(up)
        # Without this assignment the check below would hit an unbound
        # local and raise NameError instead of returning None.
        ifaces = None

    if not ifaces:
        log.msg("Unable to discover network interfaces...")
        return None

    # Presumably entry[0] is the interface name and entry[2] its address --
    # confirm against the helpers used by getIfaces().
    # NOTE(review): only the name 'lo' is filtered; BSD/Darwin call their
    # loopback 'lo0' -- confirm whether that should be excluded as well.
    found = [{i[0]: i[2]} for i in ifaces if i[0] != 'lo']
    log.debug("Found non-loopback interfaces: %s" % found)

    # NOTE(review): checkInterfaces is not defined in the visible part of
    # this module -- confirm where it is supposed to come from.
    try:
        interfaces = checkInterfaces(found)
    except IfaceError as ie:
        log.err(ie)
        return None
    return interfaces


def getLocalAddress():
    """
    Return the ``ipaddr`` attribute of the default network interface.

    @returns: ``getDefaultIface().ipaddr``.
    """
    # NOTE(review): getDefaultIface is not defined in the visible part of
    # this module -- confirm where it is supposed to come from.
    return getDefaultIface().ipaddr