From 46eff942e4e3b3c7ddbecd170dd7d5078b8debc0 Mon Sep 17 00:00:00 2001 From: Kali Kaneko Date: Sat, 19 Aug 2017 17:04:04 -0400 Subject: [feature] add twisted protocol for handling openvpn management --- src/leap/bitmask/vpn/_management.py | 443 +++++++++--------------------------- src/leap/bitmask/vpn/_telnet.py | 60 ----- src/leap/bitmask/vpn/management.py | 339 +++++++++++++++++++++++++++ 3 files changed, 443 insertions(+), 399 deletions(-) delete mode 100644 src/leap/bitmask/vpn/_telnet.py create mode 100644 src/leap/bitmask/vpn/management.py (limited to 'src') diff --git a/src/leap/bitmask/vpn/_management.py b/src/leap/bitmask/vpn/_management.py index fac1b099..d05790c4 100644 --- a/src/leap/bitmask/vpn/_management.py +++ b/src/leap/bitmask/vpn/_management.py @@ -15,7 +15,6 @@ except ImportError: from psutil import AccessDenied as psutil_AccessDenied PSUTIL_2 = True -from leap.bitmask.vpn._telnet import UDSTelnet class OpenVPNAlreadyRunning(Exception): @@ -32,237 +31,8 @@ class ImproperlyConfigured(Exception): pass -class VPNManagement(object): - """ - A class to handle the communication with the openvpn management - interface. - - For more info about management methods:: - - zcat `dpkg -L openvpn | grep management` - """ - log = Logger() - - # Timers, in secs - CONNECTION_RETRY_TIME = 1 - - def __init__(self): - self._tn = None - self.aborted = False - self._host = None - self._port = None - - self._watcher = None - self._logs = {} - - def set_connection(self, host, port): - """ - :param host: either socket path (unix) or socket IP - :type host: str +class Management(object): - :param port: either string "unix" if it's a unix socket, or port - otherwise - """ - self._host = host - self._port = port - - def set_watcher(self, watcher): - self._watcher = watcher - - def is_connected(self): - return bool(self._tn) - - def connect_retry(self, retry=0, max_retries=None): - """ - Attempts to connect to a management interface, and retries - after CONNECTION_RETRY_TIME if not successful. - - :param retry: number of the retry - :type retry: int - """ - if max_retries and retry > max_retries: - self.log.warn( - 'Max retries reached while attempting to connect ' - 'to management. Aborting.') - self.aborted = True - return - - if not self.aborted and not self.is_connected(): - self._connect() - reactor.callLater( - self.CONNECTION_RETRY_TIME, - self.connect_retry, retry + 1, max_retries) - - def _connect(self): - if not self._host or not self._port: - raise ImproperlyConfigured('Connection is not configured') - - try: - self._tn = UDSTelnet(self._host, self._port) - self._tn.read_eager() - - except Exception as e: - self.log.warn('Could not connect to OpenVPN yet: %r' % (e,)) - self._tn = None - - if self._tn: - return True - else: - self.log.error('Error while connecting to management!') - return False - - def process_log(self): - if not self._watcher or not self._tn: - return - - lines = self._send_command('log 20') - for line in lines: - try: - splitted = line.split(',') - ts = splitted[0] - msg = ','.join(splitted[2:]) - if msg.startswith('MANAGEMENT'): - continue - if ts not in self._logs: - self._watcher.watch(msg) - self.log.info('VPN: %s' % msg) - self._logs[ts] = msg - except Exception: - pass - - def _seek_to_eof(self): - """ - Read as much as available. Position seek pointer to end of stream - """ - try: - return self._tn.read_eager() - except EOFError: - self.log.debug('Could not read from socket. Assuming it died.') - return - - def _send_command(self, command, until=b"END"): - """ - Sends a command to the telnet connection and reads until END - is reached. - - :param command: command to send - :type command: str - - :param until: byte delimiter string for reading command output - :type until: byte str - - :return: response read - :rtype: list - """ - try: - self._tn.write("%s\n" % (command,)) - buf = self._tn.read_until(until) - seek = self._seek_to_eof() - blist = buf.split('\r\n') - if blist[-1].startswith(until): - del blist[-1] - return blist - else: - return [] - - except socket.error: - # XXX should get a counter and repeat only - # after mod X times. - self.log.warn('Socket error (command was: "%s")' % (command,)) - self._close_management_socket(announce=False) - self.log.debug('Trying to connect to management again') - self.connect_retry(max_retries=5) - return [] - - except Exception as e: - self.log.warn("Error sending command %s: %r" % - (command, e)) - return [] - - def _close_management_socket(self, announce=True): - """ - Close connection to openvpn management interface. - """ - if announce: - self._tn.write("quit\n") - self._tn.read_all() - self._tn.get_socket().close() - self._tn = None - - def _parse_state(self, output): - """ - Parses the output of the state command. - - :param output: list of lines that the state command printed as - its output - :type output: list - """ - for line in output: - status_step = '' - stripped = line.strip() - if stripped == "END": - continue - parts = stripped.split(",") - if len(parts) < 5: - continue - try: - ts, status_step, ok, ip, remote, port, _, _, _ = parts - except ValueError: - try: - ts, status_step, ok, ip, remote, port, _, _ = parts - except ValueError: - self.log.debug('Could not parse state line: %s' % line) - - return status_step - - def _parse_status(self, output): - """ - Parses the output of the status command. - - :param output: list of lines that the status command printed - as its output - :type output: list - """ - tun_tap_read = "" - tun_tap_write = "" - - for line in output: - stripped = line.strip() - if stripped.endswith("STATISTICS") or stripped == "END": - continue - parts = stripped.split(",") - if len(parts) < 2: - continue - - try: - text, value = parts - except ValueError: - self.log.debug('Could not parse status line %s' % line) - return - # text can be: - # "TUN/TAP read bytes" - # "TUN/TAP write bytes" - # "TCP/UDP read bytes" - # "TCP/UDP write bytes" - # "Auth read bytes" - - if text == "TUN/TAP read bytes": - tun_tap_read = value # download - elif text == "TUN/TAP write bytes": - tun_tap_write = value # upload - - return (tun_tap_read, tun_tap_write) - - def get_state(self): - if not self.is_connected(): - return "" - state = self._parse_state(self._send_command("state")) - return state - - def get_traffic_status(self): - if not self.is_connected(): - return (None, None) - return self._parse_status(self._send_command("status")) def terminate(self, shutdown=False): """ @@ -271,125 +41,120 @@ class VPNManagement(object): if self.is_connected(): self._send_command("signal SIGTERM") if shutdown: - self._cleanup_tempfiles() + _cleanup_tempfiles() - def _cleanup_tempfiles(self): - """ - Remove all temporal files we might have left behind. - Iif self.port is 'unix', we have created a temporal socket path that, - under normal circumstances, we should be able to delete. - """ - if self._socket_port == "unix": - tempfolder = _first(os.path.split(self._host)) - if tempfolder and os.path.isdir(tempfolder): - try: - shutil.rmtree(tempfolder) - except OSError: - self.log.error( - 'Could not delete tmpfolder %s' % tempfolder) - - def get_openvpn_process(self): - """ - Looks for openvpn instances running. +# TODO -- finish porting ---------------------------------------------------- - :rtype: process - """ - openvpn_process = None - for p in psutil.process_iter(): +def _cleanup_tempfiles(self): + """ + Remove all temporal files we might have left behind. + + Iif self.port is 'unix', we have created a temporal socket path that, + under normal circumstances, we should be able to delete. + """ + if self._socket_port == "unix": + tempfolder = _first(os.path.split(self._host)) + if tempfolder and os.path.isdir(tempfolder): try: - # XXX Not exact! - # Will give false positives. - # we should check that cmdline BEGINS - # with openvpn or with our wrapper - # (pkexec / osascript / whatever) - - # This needs more work, see #3268, but for the moment - # we need to be able to filter out arguments in the form - # --openvpn-foo, since otherwise we are shooting ourselves - # in the feet. - - if PSUTIL_2: - cmdline = p.cmdline() - else: - cmdline = p.cmdline - if any(map(lambda s: s.find( - "LEAPOPENVPN") != -1, cmdline)): - openvpn_process = p - break - except psutil_AccessDenied: - pass - return openvpn_process - - def stop_if_already_running(self): - """ - Checks if VPN is already running and tries to stop it. + shutil.rmtree(tempfolder) + except OSError: + self.log.error( + 'Could not delete tmpfolder %s' % tempfolder) - Might raise OpenVPNAlreadyRunning. +def _get_openvpn_process(): + """ + Looks for openvpn instances running. - :return: True if stopped, False otherwise + :rtype: process + """ + openvpn_process = None + for p in psutil.process_iter(): + try: + # XXX Not exact! + # Will give false positives. + # we should check that cmdline BEGINS + # with openvpn or with our wrapper + # (pkexec / osascript / whatever) + + # This needs more work, see #3268, but for the moment + # we need to be able to filter out arguments in the form + # --openvpn-foo, since otherwise we are shooting ourselves + # in the feet. + + if PSUTIL_2: + cmdline = p.cmdline() + else: + cmdline = p.cmdline + if any(map(lambda s: s.find( + "LEAPOPENVPN") != -1, cmdline)): + openvpn_process = p + break + except psutil_AccessDenied: + pass + return openvpn_process + +def _stop_if_already_running(): + """ + Checks if VPN is already running and tries to stop it. - """ - process = self.get_openvpn_process() - if not process: - self.log.debug('Could not find openvpn process while ' - 'trying to stop it.') - return + Might raise OpenVPNAlreadyRunning. - self.log.debug('OpenVPN is already running, trying to stop it...') - cmdline = process.cmdline + :return: True if stopped, False otherwise - manag_flag = "--management" + """ + process = _get_openvpn_process() + if not process: + self.log.debug('Could not find openvpn process while ' + 'trying to stop it.') + return - if isinstance(cmdline, list) and manag_flag in cmdline: + log.debug('OpenVPN is already running, trying to stop it...') + cmdline = process.cmdline - # we know that our invocation has this distinctive fragment, so - # we use this fingerprint to tell other invocations apart. - # this might break if we change the configuration path in the - # launchers + manag_flag = "--management" - def smellslikeleap(s): - return "leap" in s and "providers" in s + if isinstance(cmdline, list) and manag_flag in cmdline: - if not any(map(smellslikeleap, cmdline)): - self.log.debug("We cannot stop this instance since we do not " - "recognise it as a leap invocation.") - raise AlienOpenVPNAlreadyRunning + # we know that our invocation has this distinctive fragment, so + # we use this fingerprint to tell other invocations apart. + # this might break if we change the configuration path in the + # launchers - try: - index = cmdline.index(manag_flag) - host = cmdline[index + 1] - port = cmdline[index + 2] - self.log.debug("Trying to connect to %s:%s" - % (host, port)) - self._connect() - - # XXX this has a problem with connections to different - # remotes. So the reconnection will only work when we are - # terminating instances left running for the same provider. - # If we are killing an openvpn instance configured for another - # provider, we will get: - # TLS Error: local/remote TLS keys are out of sync - # However, that should be a rare case right now. - self._send_command("signal SIGTERM") - self._close_management_socket(announce=True) - except (Exception, AssertionError): - self.log.failure('Problem trying to terminate OpenVPN') - else: - self.log.debug('Could not find the expected openvpn command line.') - - process = self.get_openvpn_process() - if process is None: - self.log.debug('Successfully finished already running ' - 'openvpn process.') - return True - else: - self.log.warn('Unable to terminate OpenVPN') - raise OpenVPNAlreadyRunning - - -def _first(things): - try: - return things[0] - except (IndexError, TypeError): - return None + def smellslikeleap(s): + return "leap" in s and "providers" in s + + if not any(map(smellslikeleap, cmdline)): + self.log.debug("We cannot stop this instance since we do not " + "recognise it as a leap invocation.") + raise AlienOpenVPNAlreadyRunning + + try: + index = cmdline.index(manag_flag) + host = cmdline[index + 1] + port = cmdline[index + 2] + self.log.debug("Trying to connect to %s:%s" + % (host, port)) + _connect() + + # XXX this has a problem with connections to different + # remotes. So the reconnection will only work when we are + # terminating instances left running for the same provider. + # If we are killing an openvpn instance configured for another + # provider, we will get: + # TLS Error: local/remote TLS keys are out of sync + # However, that should be a rare case right now. + self._send_command("signal SIGTERM") + except (Exception, AssertionError): + log.failure('Problem trying to terminate OpenVPN') + else: + log.debug('Could not find the expected openvpn command line.') + + process = _get_openvpn_process() + if process is None: + self.log.debug('Successfully finished already running ' + 'openvpn process.') + return True + else: + self.log.warn('Unable to terminate OpenVPN') + raise OpenVPNAlreadyRunning diff --git a/src/leap/bitmask/vpn/_telnet.py b/src/leap/bitmask/vpn/_telnet.py deleted file mode 100644 index cfc82ef0..00000000 --- a/src/leap/bitmask/vpn/_telnet.py +++ /dev/null @@ -1,60 +0,0 @@ -# -*- coding: utf-8 -*- -# _telnet.py -# Copyright (C) 2013-2017 LEAP -# -# This program is free software: you can redistribute it and/or modify -# it under the terms of the GNU General Public License as published by -# the Free Software Foundation, either version 3 of the License, or -# (at your option) any later version. -# -# This program is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -# GNU General Public License for more details. -# -# You should have received a copy of the GNU General Public License -# along with this program. If not, see . - -import os -import socket -import telnetlib - - -class ConnectionRefusedError(Exception): - pass - - -class MissingSocketError(Exception): - pass - - -class UDSTelnet(telnetlib.Telnet): - """ - A telnet-alike class, that can listen on unix domain sockets - """ - - def open(self, host, port=23, timeout=socket._GLOBAL_DEFAULT_TIMEOUT): - """ - Connect to a host. If port is 'unix', it will open a - connection over unix docmain sockets. - - The optional second argument is the port number, which - defaults to the standard telnet port (23). - Don't try to reopen an already connected instance. - """ - self.eof = 0 - self.host = host - self.port = port - self.timeout = timeout - - if self.port == "unix": - # unix sockets spoken - if not os.path.exists(self.host): - raise MissingSocketError() - self.sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) - try: - self.sock.connect(self.host) - except socket.error: - raise ConnectionRefusedError() - else: - self.sock = socket.create_connection((host, port), timeout) diff --git a/src/leap/bitmask/vpn/management.py b/src/leap/bitmask/vpn/management.py new file mode 100644 index 00000000..b9bda6c9 --- /dev/null +++ b/src/leap/bitmask/vpn/management.py @@ -0,0 +1,339 @@ +# -*- coding: utf-8 -*- +# management.py +# Copyright (c) 2012 Mike Mattice +# Copyright (C) 2017 LEAP Encryption Access Project +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see . + +""" +Handles an OpenVPN process through its Management Interface. +""" + +import time +from collections import OrderedDict + +from twisted.internet import defer +from twisted.protocols.basic import LineReceiver +from twisted.internet.defer import Deferred +from twisted.python import log + +from zope.interface import Interface + +from _human import bytes2human + + +class IStateListener(Interface): + + def change_state(self, state): + pass + + +class ManagementProtocol(LineReceiver): + + def __init__(self, verbose=False): + + self.verbose = verbose + self.state = None + self.remote = None + self.rport = None + self.traffic = TrafficCounter() + self.openvpn_version = '' + self.pid = None + + self._defs = [] + self._statelog = OrderedDict() + self._linebuf = [] + self._state_listeners = set([]) + + def addStateListener(self, listener): + """ + A Listener must implement change_state method, + and it will be called with a State object. + """ + self._state_listeners.add(listener) + + def getStateHistory(self): + return self._statelog + + def lineReceived(self, line): + if self.verbose: + print line + + if line[0] == '>': + try: + infotype, data = line[1:].split(':', 1) + infotype = infotype.replace('-', '_') + except Exception, msg: + print "failed to parse '%r': %s" % (line, msg) + raise + m = getattr(self, '_handle_%s' % infotype, None) + if m: + try: + m(data) + except Exception, msg: + print "Failure in _handle_%s: %s" % (infotype, msg) + else: + self._handle_unknown(infotype, data) + else: + if line.strip() == 'END': + try: + d = self._defs.pop(0) + d.callback('\n'.join(self._linebuf)) + except IndexError: + pass + self._linebuf = [] + return + try: + status, data = line.split(': ', 1) + except ValueError: + print "ERROR PARSING:", line + return + if status in ('ERROR', 'SUCCESS'): + try: + d = self._defs.pop(0) + if status == 'SUCCESS': + d.callback(line) + else: + d.errback(line) + except: + pass + else: + self._linebuf.append(line) + + def _handle_unknown(self, infotype, data): + log.msg('Received unhandled infotype %s with data %s' % + (infotype, data)) + + def _handle_BYTECOUNT(self, data): + down, up = data.split(',') + self.traffic.update(down, up, time.time()) + + def _handle_ECHO(self, data): + pass + + def _handle_FATAL(self, data): + pass + + def _handle_HOLD(self, data): + pass + + def _handle_INFO(self, data): + pass + + def _handle_LOG(self, data): + pass + + def _handle_NEED_OK(self, data): + pass + + def _handle_NEED_STR(self, data): + pass + + def _handle_STATE(self, data): + data = data.strip().split(',') + remote = rport = None + try: + if len(data) == 9: + (ts, state, verbose, localtun, + remote, rport, laddr, lport, ip6) = data + elif len(data) == 8: + ts, state = data[:2] + except Exception as exc: + print "ERROR", exc + log.error('Failure parsing data: %s' % exc) + + if state != self.state: + now = time.time() + stateobj = State(state, ts) + self._statelog[now] = stateobj + for listener in self._state_listeners: + listener.change_state(stateobj) + self.state = stateobj + self.remote = remote + self.rport = rport + + def _pushdef(self): + d = Deferred() + self._defs.append(d) + return d + + def byteCount(self, interval=0): + d = self._pushdef() + self.sendLine('bytecount %d' % (interval,)) + return d + + def signal(self, signal='SIGTERM'): + d = self._pushdef() + self.sendLine('signal %s' % (signal,)) + return d + + def _parseHoldstatus(self, result): + return result.split('=')[0] == '1' + + def hold(self, p=''): + d = self._pushdef() + self.sendLine('hold %s' % (p,)) + if p == '': + d.addCallback(self._parseHoldstatus) + return d + + def _parsePid(self, result): + self.pid = int(result.split('=')[1]) + + def get_pid(self): + d = self._pushdef() + self.sendLine('pid') + d.addCallback(self._parsePid) + return d + + def logOn(self): + d = self._pushdef() + self.sendLine('log on') + return d + + def stateOn(self): + d = self._pushdef() + self.sendLine('state on') + return d + + def _parseVersion(self, data): + version = data.split('\n')[0].split(':')[1] + self.openvpn_version = version.strip() + + def getVersion(self): + d = self._pushdef() + self.sendLine('version') + d.addCallback(self._parseVersion) + return d + + def getInfo(self): + state = self._statelog.values()[-1] + return { + 'remote': self.remote, + 'rport': self.rport, + 'state': state.state, + 'state_simple': state.simple, + 'state_legend': state.legend, + 'openvpn_version': self.openvpn_version, + 'pid': self.pid, + 'traffic_down_total': self.traffic.down, + 'traffic_up_total': self.traffic.up} + + +class State(object): + + """ + Possible States in an OpenVPN connection, according to the + OpenVPN Management documentation. + """ + + CONNECTING = 'CONNECTING' + WAIT = 'WAIT' + AUTH = 'AUTH' + GET_CONFIG = 'GET_CONFIG' + ASSIGN_IP = 'ASSIGN_IP' + ADD_ROUTES = 'ADD_ROUTES' + CONNECTED = 'CONNECTED' + RECONNECTING = 'RECONNECTING' + EXITING = 'EXITING' + + OFF = 'OFF' + ON = 'ON' + STARTING = 'STARTING' + STOPPING = 'STOPPING' + FAILED = 'FAILED' + + _legend = { + 'CONNECTING': 'Connecting to remote server', + 'WAIT': 'Waiting from initial response from server', + 'AUTH': 'Authenticating with server', + 'GET_CONFIG': 'Downloading configuration options from server', + 'ASSIGN_IP': 'Assigning IP address to virtual network interface', + 'ADD_ROUTES': 'Adding routes to system', + 'CONNECTED': 'Initialization Sequence Completed', + 'RECONNECTING': 'A restart has occurred', + 'EXITING': 'A graceful exit is in progress' + } + + _simple = { + 'CONNECTING': STARTING, + 'WAIT': STARTING, + 'AUTH': STARTING, + 'GET_CONFIG': STARTING, + 'ASSIGN_IP': STARTING, + 'ADD_ROUTES': STARTING, + 'CONNECTED': ON, + 'RECONNECTING': STARTING, + 'EXITING': STOPPING + } + + def __init__(self, state, timestamp): + self.state = state + self.timestamp = timestamp + + @classmethod + def get_legend(cls, state): + return cls._legend.get(state) + + @classmethod + def get_simple(cls, state): + return cls._simple.get(state) + + @property + def simple(self): + return self.get_simple(self.state) + + @property + def legend(self): + return self.get_legend(self.state) + + def __repr__(self): + return '' % ( + self.state, time.ctime(int(self.timestamp))) + + +class TrafficCounter(object): + + CAPACITY = 60 + + def __init__(self): + self.down = None + self.up = None + self._buf = OrderedDict() + + def update(self, down, up, ts): + i_down = int(down) + i_up = int(up) + self.down = i_down + self.up = i_up + if len(self._buf) > self.CAPACITY: + self._buf.pop(self._buf.keys()[0]) + self._buf[ts] = i_down, i_up + + def get_rate(self, human=True): + points = self._buf.items() + if len(points) < 2: + return ['NA', 'NA'] + ts1, prev = points[-2] + ts2, last = points[-1] + rate_down = _get_rate(last[0], prev[0], ts2, ts1) + rate_up = _get_rate(last[1], prev[1], ts2, ts1) + rates = rate_down, rate_up + if human: + rates = map(bytes2human, rates) + return rates + + +def _get_rate(p2, p1, ts2, ts1): + return ((1.0 * (p2 - p1)) / (ts2 - ts1)) -- cgit v1.2.3