From 6d1d18faec5caa60c26b8245f0ab17c63d0b80d8 Mon Sep 17 00:00:00 2001 From: "Kali Kaneko (leap communications)" Date: Wed, 1 Feb 2017 00:00:20 +0100 Subject: [refactor] split vpn control into some modules --- src/leap/bitmask/vpn/_observer.py | 73 +++++++++++++++++++++++++++++++++++++++ 1 file changed, 73 insertions(+) create mode 100644 src/leap/bitmask/vpn/_observer.py (limited to 'src/leap/bitmask/vpn/_observer.py') diff --git a/src/leap/bitmask/vpn/_observer.py b/src/leap/bitmask/vpn/_observer.py new file mode 100644 index 00000000..1bb363d6 --- /dev/null +++ b/src/leap/bitmask/vpn/_observer.py @@ -0,0 +1,73 @@ +from twisted.logger import Logger + +logger = Logger() + + +class VPNObserver(object): + """ + A class containing different patterns in the openvpn output that + we can react upon. + """ + + _events = { + 'NETWORK_UNREACHABLE': ( + 'Network is unreachable (code=101)',), + 'PROCESS_RESTART_TLS': ( + "SIGTERM[soft,tls-error]",), + 'PROCESS_RESTART_PING': ( + "SIGTERM[soft,ping-restart]",), + 'INITIALIZATION_COMPLETED': ( + "Initialization Sequence Completed",), + } + + def __init__(self, signaler=None): + self._signaler = signaler + + def watch(self, line): + """ + Inspects line searching for the different patterns. If a match + is found, try to emit the corresponding signal. + + :param line: a line of openvpn output + :type line: str + """ + chained_iter = chain(*[ + zip(repeat(key, len(l)), l) + for key, l in self._events.iteritems()]) + for event, pattern in chained_iter: + if pattern in line: + logger.debug('pattern matched! %s' % pattern) + break + else: + return + + sig = self._get_signal(event) + if sig is not None: + if self._signaler is not None: + self._signaler.signal(sig) + return + else: + logger.debug('We got %s event from openvpn output but we could ' + 'not find a matching signal for it.' % event) + + def _get_signal(self, event): + """ + Tries to get the matching signal from the eip signals + objects based on the name of the passed event (in lowercase) + + :param event: the name of the event that we want to get a signal for + :type event: str + :returns: a Signaler signal or None + :rtype: str or None + """ + if self._signaler is None: + return + sig = self._signaler + signals = { + "network_unreachable": sig.eip_network_unreachable, + "process_restart_tls": sig.eip_process_restart_tls, + "process_restart_ping": sig.eip_process_restart_ping, + "initialization_completed": sig.eip_connected + } + return signals.get(event.lower()) + -- cgit v1.2.3