summaryrefslogtreecommitdiff
path: root/src/leap/bitmask/vpn/_observer.py
blob: 1bb363d6559e39b326679a7a87520c3e31be989c (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
from twisted.logger import Logger

logger = Logger()


class VPNObserver(object):
    """
    A class containing different patterns in the openvpn output that
    we can react upon.
    """

    _events = {
        'NETWORK_UNREACHABLE': (
            'Network is unreachable (code=101)',),
        'PROCESS_RESTART_TLS': (
            "SIGTERM[soft,tls-error]",),
        'PROCESS_RESTART_PING': (
            "SIGTERM[soft,ping-restart]",),
        'INITIALIZATION_COMPLETED': (
            "Initialization Sequence Completed",),
    }

    def __init__(self, signaler=None):
        self._signaler = signaler

    def watch(self, line):
        """
        Inspects line searching for the different patterns. If a match
        is found, try to emit the corresponding signal.

        :param line: a line of openvpn output
        :type line: str
        """
        chained_iter = chain(*[
            zip(repeat(key, len(l)), l)
            for key, l in self._events.iteritems()])
        for event, pattern in chained_iter:
            if pattern in line:
                logger.debug('pattern matched! %s' % pattern)
                break
        else:
            return

        sig = self._get_signal(event)
        if sig is not None:
            if self._signaler is not None:
                self._signaler.signal(sig)
            return
        else:
            logger.debug('We got %s event from openvpn output but we could '
                         'not find a matching signal for it.' % event)

    def _get_signal(self, event):
        """
        Tries to get the matching signal from the eip signals
        objects based on the name of the passed event (in lowercase)

        :param event: the name of the event that we want to get a signal for
        :type event: str
        :returns: a Signaler signal or None
        :rtype: str or None
        """
        if self._signaler is None:
            return
        sig = self._signaler
        signals = {
            "network_unreachable": sig.eip_network_unreachable,
            "process_restart_tls": sig.eip_process_restart_tls,
            "process_restart_ping": sig.eip_process_restart_ping,
            "initialization_completed": sig.eip_connected
        }
        return signals.get(event.lower())