summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorKali Kaneko <kali@leap.se>2017-08-19 17:04:04 -0400
committerKali Kaneko <kali@leap.se>2017-08-30 16:17:55 -0400
commit46eff942e4e3b3c7ddbecd170dd7d5078b8debc0 (patch)
tree4fd1f606d4ae7ffe87303b6372df5e43467957cb /src
parentf23f2fd7dc530e4b4502c2cf2e771f91644b35ef (diff)
[feature] add twisted protocol for handling openvpn management
Diffstat (limited to 'src')
-rw-r--r--src/leap/bitmask/vpn/_management.py443
-rw-r--r--src/leap/bitmask/vpn/_telnet.py60
-rw-r--r--src/leap/bitmask/vpn/management.py339
3 files changed, 443 insertions, 399 deletions
diff --git a/src/leap/bitmask/vpn/_management.py b/src/leap/bitmask/vpn/_management.py
index fac1b09..d05790c 100644
--- a/src/leap/bitmask/vpn/_management.py
+++ b/src/leap/bitmask/vpn/_management.py
@@ -15,7 +15,6 @@ except ImportError:
from psutil import AccessDenied as psutil_AccessDenied
PSUTIL_2 = True
-from leap.bitmask.vpn._telnet import UDSTelnet
class OpenVPNAlreadyRunning(Exception):
@@ -32,237 +31,8 @@ class ImproperlyConfigured(Exception):
pass
-class VPNManagement(object):
- """
- A class to handle the communication with the openvpn management
- interface.
-
- For more info about management methods::
-
- zcat `dpkg -L openvpn | grep management`
- """
- log = Logger()
-
- # Timers, in secs
- CONNECTION_RETRY_TIME = 1
-
- def __init__(self):
- self._tn = None
- self.aborted = False
- self._host = None
- self._port = None
-
- self._watcher = None
- self._logs = {}
-
- def set_connection(self, host, port):
- """
- :param host: either socket path (unix) or socket IP
- :type host: str
+class Management(object):
- :param port: either string "unix" if it's a unix socket, or port
- otherwise
- """
- self._host = host
- self._port = port
-
- def set_watcher(self, watcher):
- self._watcher = watcher
-
- def is_connected(self):
- return bool(self._tn)
-
- def connect_retry(self, retry=0, max_retries=None):
- """
- Attempts to connect to a management interface, and retries
- after CONNECTION_RETRY_TIME if not successful.
-
- :param retry: number of the retry
- :type retry: int
- """
- if max_retries and retry > max_retries:
- self.log.warn(
- 'Max retries reached while attempting to connect '
- 'to management. Aborting.')
- self.aborted = True
- return
-
- if not self.aborted and not self.is_connected():
- self._connect()
- reactor.callLater(
- self.CONNECTION_RETRY_TIME,
- self.connect_retry, retry + 1, max_retries)
-
- def _connect(self):
- if not self._host or not self._port:
- raise ImproperlyConfigured('Connection is not configured')
-
- try:
- self._tn = UDSTelnet(self._host, self._port)
- self._tn.read_eager()
-
- except Exception as e:
- self.log.warn('Could not connect to OpenVPN yet: %r' % (e,))
- self._tn = None
-
- if self._tn:
- return True
- else:
- self.log.error('Error while connecting to management!')
- return False
-
- def process_log(self):
- if not self._watcher or not self._tn:
- return
-
- lines = self._send_command('log 20')
- for line in lines:
- try:
- splitted = line.split(',')
- ts = splitted[0]
- msg = ','.join(splitted[2:])
- if msg.startswith('MANAGEMENT'):
- continue
- if ts not in self._logs:
- self._watcher.watch(msg)
- self.log.info('VPN: %s' % msg)
- self._logs[ts] = msg
- except Exception:
- pass
-
- def _seek_to_eof(self):
- """
- Read as much as available. Position seek pointer to end of stream
- """
- try:
- return self._tn.read_eager()
- except EOFError:
- self.log.debug('Could not read from socket. Assuming it died.')
- return
-
- def _send_command(self, command, until=b"END"):
- """
- Sends a command to the telnet connection and reads until END
- is reached.
-
- :param command: command to send
- :type command: str
-
- :param until: byte delimiter string for reading command output
- :type until: byte str
-
- :return: response read
- :rtype: list
- """
- try:
- self._tn.write("%s\n" % (command,))
- buf = self._tn.read_until(until)
- seek = self._seek_to_eof()
- blist = buf.split('\r\n')
- if blist[-1].startswith(until):
- del blist[-1]
- return blist
- else:
- return []
-
- except socket.error:
- # XXX should get a counter and repeat only
- # after mod X times.
- self.log.warn('Socket error (command was: "%s")' % (command,))
- self._close_management_socket(announce=False)
- self.log.debug('Trying to connect to management again')
- self.connect_retry(max_retries=5)
- return []
-
- except Exception as e:
- self.log.warn("Error sending command %s: %r" %
- (command, e))
- return []
-
- def _close_management_socket(self, announce=True):
- """
- Close connection to openvpn management interface.
- """
- if announce:
- self._tn.write("quit\n")
- self._tn.read_all()
- self._tn.get_socket().close()
- self._tn = None
-
- def _parse_state(self, output):
- """
- Parses the output of the state command.
-
- :param output: list of lines that the state command printed as
- its output
- :type output: list
- """
- for line in output:
- status_step = ''
- stripped = line.strip()
- if stripped == "END":
- continue
- parts = stripped.split(",")
- if len(parts) < 5:
- continue
- try:
- ts, status_step, ok, ip, remote, port, _, _, _ = parts
- except ValueError:
- try:
- ts, status_step, ok, ip, remote, port, _, _ = parts
- except ValueError:
- self.log.debug('Could not parse state line: %s' % line)
-
- return status_step
-
- def _parse_status(self, output):
- """
- Parses the output of the status command.
-
- :param output: list of lines that the status command printed
- as its output
- :type output: list
- """
- tun_tap_read = ""
- tun_tap_write = ""
-
- for line in output:
- stripped = line.strip()
- if stripped.endswith("STATISTICS") or stripped == "END":
- continue
- parts = stripped.split(",")
- if len(parts) < 2:
- continue
-
- try:
- text, value = parts
- except ValueError:
- self.log.debug('Could not parse status line %s' % line)
- return
- # text can be:
- # "TUN/TAP read bytes"
- # "TUN/TAP write bytes"
- # "TCP/UDP read bytes"
- # "TCP/UDP write bytes"
- # "Auth read bytes"
-
- if text == "TUN/TAP read bytes":
- tun_tap_read = value # download
- elif text == "TUN/TAP write bytes":
- tun_tap_write = value # upload
-
- return (tun_tap_read, tun_tap_write)
-
- def get_state(self):
- if not self.is_connected():
- return ""
- state = self._parse_state(self._send_command("state"))
- return state
-
- def get_traffic_status(self):
- if not self.is_connected():
- return (None, None)
- return self._parse_status(self._send_command("status"))
def terminate(self, shutdown=False):
"""
@@ -271,125 +41,120 @@ class VPNManagement(object):
if self.is_connected():
self._send_command("signal SIGTERM")
if shutdown:
- self._cleanup_tempfiles()
+ _cleanup_tempfiles()
- def _cleanup_tempfiles(self):
- """
- Remove all temporal files we might have left behind.
- Iif self.port is 'unix', we have created a temporal socket path that,
- under normal circumstances, we should be able to delete.
- """
- if self._socket_port == "unix":
- tempfolder = _first(os.path.split(self._host))
- if tempfolder and os.path.isdir(tempfolder):
- try:
- shutil.rmtree(tempfolder)
- except OSError:
- self.log.error(
- 'Could not delete tmpfolder %s' % tempfolder)
-
- def get_openvpn_process(self):
- """
- Looks for openvpn instances running.
+# TODO -- finish porting ----------------------------------------------------
- :rtype: process
- """
- openvpn_process = None
- for p in psutil.process_iter():
+def _cleanup_tempfiles(self):
+ """
+ Remove all temporal files we might have left behind.
+
+ Iif self.port is 'unix', we have created a temporal socket path that,
+ under normal circumstances, we should be able to delete.
+ """
+ if self._socket_port == "unix":
+ tempfolder = _first(os.path.split(self._host))
+ if tempfolder and os.path.isdir(tempfolder):
try:
- # XXX Not exact!
- # Will give false positives.
- # we should check that cmdline BEGINS
- # with openvpn or with our wrapper
- # (pkexec / osascript / whatever)
-
- # This needs more work, see #3268, but for the moment
- # we need to be able to filter out arguments in the form
- # --openvpn-foo, since otherwise we are shooting ourselves
- # in the feet.
-
- if PSUTIL_2:
- cmdline = p.cmdline()
- else:
- cmdline = p.cmdline
- if any(map(lambda s: s.find(
- "LEAPOPENVPN") != -1, cmdline)):
- openvpn_process = p
- break
- except psutil_AccessDenied:
- pass
- return openvpn_process
-
- def stop_if_already_running(self):
- """
- Checks if VPN is already running and tries to stop it.
+ shutil.rmtree(tempfolder)
+ except OSError:
+ self.log.error(
+ 'Could not delete tmpfolder %s' % tempfolder)
- Might raise OpenVPNAlreadyRunning.
+def _get_openvpn_process():
+ """
+ Looks for openvpn instances running.
- :return: True if stopped, False otherwise
+ :rtype: process
+ """
+ openvpn_process = None
+ for p in psutil.process_iter():
+ try:
+ # XXX Not exact!
+ # Will give false positives.
+ # we should check that cmdline BEGINS
+ # with openvpn or with our wrapper
+ # (pkexec / osascript / whatever)
+
+ # This needs more work, see #3268, but for the moment
+ # we need to be able to filter out arguments in the form
+ # --openvpn-foo, since otherwise we are shooting ourselves
+ # in the feet.
+
+ if PSUTIL_2:
+ cmdline = p.cmdline()
+ else:
+ cmdline = p.cmdline
+ if any(map(lambda s: s.find(
+ "LEAPOPENVPN") != -1, cmdline)):
+ openvpn_process = p
+ break
+ except psutil_AccessDenied:
+ pass
+ return openvpn_process
+
+def _stop_if_already_running():
+ """
+ Checks if VPN is already running and tries to stop it.
- """
- process = self.get_openvpn_process()
- if not process:
- self.log.debug('Could not find openvpn process while '
- 'trying to stop it.')
- return
+ Might raise OpenVPNAlreadyRunning.
- self.log.debug('OpenVPN is already running, trying to stop it...')
- cmdline = process.cmdline
+ :return: True if stopped, False otherwise
- manag_flag = "--management"
+ """
+ process = _get_openvpn_process()
+ if not process:
+ self.log.debug('Could not find openvpn process while '
+ 'trying to stop it.')
+ return
- if isinstance(cmdline, list) and manag_flag in cmdline:
+ log.debug('OpenVPN is already running, trying to stop it...')
+ cmdline = process.cmdline
- # we know that our invocation has this distinctive fragment, so
- # we use this fingerprint to tell other invocations apart.
- # this might break if we change the configuration path in the
- # launchers
+ manag_flag = "--management"
- def smellslikeleap(s):
- return "leap" in s and "providers" in s
+ if isinstance(cmdline, list) and manag_flag in cmdline:
- if not any(map(smellslikeleap, cmdline)):
- self.log.debug("We cannot stop this instance since we do not "
- "recognise it as a leap invocation.")
- raise AlienOpenVPNAlreadyRunning
+ # we know that our invocation has this distinctive fragment, so
+ # we use this fingerprint to tell other invocations apart.
+ # this might break if we change the configuration path in the
+ # launchers
- try:
- index = cmdline.index(manag_flag)
- host = cmdline[index + 1]
- port = cmdline[index + 2]
- self.log.debug("Trying to connect to %s:%s"
- % (host, port))
- self._connect()
-
- # XXX this has a problem with connections to different
- # remotes. So the reconnection will only work when we are
- # terminating instances left running for the same provider.
- # If we are killing an openvpn instance configured for another
- # provider, we will get:
- # TLS Error: local/remote TLS keys are out of sync
- # However, that should be a rare case right now.
- self._send_command("signal SIGTERM")
- self._close_management_socket(announce=True)
- except (Exception, AssertionError):
- self.log.failure('Problem trying to terminate OpenVPN')
- else:
- self.log.debug('Could not find the expected openvpn command line.')
-
- process = self.get_openvpn_process()
- if process is None:
- self.log.debug('Successfully finished already running '
- 'openvpn process.')
- return True
- else:
- self.log.warn('Unable to terminate OpenVPN')
- raise OpenVPNAlreadyRunning
-
-
-def _first(things):
- try:
- return things[0]
- except (IndexError, TypeError):
- return None
+ def smellslikeleap(s):
+ return "leap" in s and "providers" in s
+
+ if not any(map(smellslikeleap, cmdline)):
+ self.log.debug("We cannot stop this instance since we do not "
+ "recognise it as a leap invocation.")
+ raise AlienOpenVPNAlreadyRunning
+
+ try:
+ index = cmdline.index(manag_flag)
+ host = cmdline[index + 1]
+ port = cmdline[index + 2]
+ self.log.debug("Trying to connect to %s:%s"
+ % (host, port))
+ _connect()
+
+ # XXX this has a problem with connections to different
+ # remotes. So the reconnection will only work when we are
+ # terminating instances left running for the same provider.
+ # If we are killing an openvpn instance configured for another
+ # provider, we will get:
+ # TLS Error: local/remote TLS keys are out of sync
+ # However, that should be a rare case right now.
+ self._send_command("signal SIGTERM")
+ except (Exception, AssertionError):
+ log.failure('Problem trying to terminate OpenVPN')
+ else:
+ log.debug('Could not find the expected openvpn command line.')
+
+ process = _get_openvpn_process()
+ if process is None:
+ self.log.debug('Successfully finished already running '
+ 'openvpn process.')
+ return True
+ else:
+ self.log.warn('Unable to terminate OpenVPN')
+ raise OpenVPNAlreadyRunning
diff --git a/src/leap/bitmask/vpn/_telnet.py b/src/leap/bitmask/vpn/_telnet.py
deleted file mode 100644
index cfc82ef..0000000
--- a/src/leap/bitmask/vpn/_telnet.py
+++ /dev/null
@@ -1,60 +0,0 @@
-# -*- coding: utf-8 -*-
-# _telnet.py
-# Copyright (C) 2013-2017 LEAP
-#
-# This program is free software: you can redistribute it and/or modify
-# it under the terms of the GNU General Public License as published by
-# the Free Software Foundation, either version 3 of the License, or
-# (at your option) any later version.
-#
-# This program is distributed in the hope that it will be useful,
-# but WITHOUT ANY WARRANTY; without even the implied warranty of
-# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
-# GNU General Public License for more details.
-#
-# You should have received a copy of the GNU General Public License
-# along with this program. If not, see <http://www.gnu.org/licenses/>.
-
-import os
-import socket
-import telnetlib
-
-
-class ConnectionRefusedError(Exception):
- pass
-
-
-class MissingSocketError(Exception):
- pass
-
-
-class UDSTelnet(telnetlib.Telnet):
- """
- A telnet-alike class, that can listen on unix domain sockets
- """
-
- def open(self, host, port=23, timeout=socket._GLOBAL_DEFAULT_TIMEOUT):
- """
- Connect to a host. If port is 'unix', it will open a
- connection over unix docmain sockets.
-
- The optional second argument is the port number, which
- defaults to the standard telnet port (23).
- Don't try to reopen an already connected instance.
- """
- self.eof = 0
- self.host = host
- self.port = port
- self.timeout = timeout
-
- if self.port == "unix":
- # unix sockets spoken
- if not os.path.exists(self.host):
- raise MissingSocketError()
- self.sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
- try:
- self.sock.connect(self.host)
- except socket.error:
- raise ConnectionRefusedError()
- else:
- self.sock = socket.create_connection((host, port), timeout)
diff --git a/src/leap/bitmask/vpn/management.py b/src/leap/bitmask/vpn/management.py
new file mode 100644
index 0000000..b9bda6c
--- /dev/null
+++ b/src/leap/bitmask/vpn/management.py
@@ -0,0 +1,339 @@
+# -*- coding: utf-8 -*-
+# management.py
+# Copyright (c) 2012 Mike Mattice
+# Copyright (C) 2017 LEAP Encryption Access Project
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see <http://www.gnu.org/licenses/>.
+
+"""
+Handles an OpenVPN process through its Management Interface.
+"""
+
+import time
+from collections import OrderedDict
+
+from twisted.internet import defer
+from twisted.protocols.basic import LineReceiver
+from twisted.internet.defer import Deferred
+from twisted.python import log
+
+from zope.interface import Interface
+
+from _human import bytes2human
+
+
+class IStateListener(Interface):
+
+ def change_state(self, state):
+ pass
+
+
+class ManagementProtocol(LineReceiver):
+
+ def __init__(self, verbose=False):
+
+ self.verbose = verbose
+ self.state = None
+ self.remote = None
+ self.rport = None
+ self.traffic = TrafficCounter()
+ self.openvpn_version = ''
+ self.pid = None
+
+ self._defs = []
+ self._statelog = OrderedDict()
+ self._linebuf = []
+ self._state_listeners = set([])
+
+ def addStateListener(self, listener):
+ """
+ A Listener must implement change_state method,
+ and it will be called with a State object.
+ """
+ self._state_listeners.add(listener)
+
+ def getStateHistory(self):
+ return self._statelog
+
+ def lineReceived(self, line):
+ if self.verbose:
+ print line
+
+ if line[0] == '>':
+ try:
+ infotype, data = line[1:].split(':', 1)
+ infotype = infotype.replace('-', '_')
+ except Exception, msg:
+ print "failed to parse '%r': %s" % (line, msg)
+ raise
+ m = getattr(self, '_handle_%s' % infotype, None)
+ if m:
+ try:
+ m(data)
+ except Exception, msg:
+ print "Failure in _handle_%s: %s" % (infotype, msg)
+ else:
+ self._handle_unknown(infotype, data)
+ else:
+ if line.strip() == 'END':
+ try:
+ d = self._defs.pop(0)
+ d.callback('\n'.join(self._linebuf))
+ except IndexError:
+ pass
+ self._linebuf = []
+ return
+ try:
+ status, data = line.split(': ', 1)
+ except ValueError:
+ print "ERROR PARSING:", line
+ return
+ if status in ('ERROR', 'SUCCESS'):
+ try:
+ d = self._defs.pop(0)
+ if status == 'SUCCESS':
+ d.callback(line)
+ else:
+ d.errback(line)
+ except:
+ pass
+ else:
+ self._linebuf.append(line)
+
+ def _handle_unknown(self, infotype, data):
+ log.msg('Received unhandled infotype %s with data %s' %
+ (infotype, data))
+
+ def _handle_BYTECOUNT(self, data):
+ down, up = data.split(',')
+ self.traffic.update(down, up, time.time())
+
+ def _handle_ECHO(self, data):
+ pass
+
+ def _handle_FATAL(self, data):
+ pass
+
+ def _handle_HOLD(self, data):
+ pass
+
+ def _handle_INFO(self, data):
+ pass
+
+ def _handle_LOG(self, data):
+ pass
+
+ def _handle_NEED_OK(self, data):
+ pass
+
+ def _handle_NEED_STR(self, data):
+ pass
+
+ def _handle_STATE(self, data):
+ data = data.strip().split(',')
+ remote = rport = None
+ try:
+ if len(data) == 9:
+ (ts, state, verbose, localtun,
+ remote, rport, laddr, lport, ip6) = data
+ elif len(data) == 8:
+ ts, state = data[:2]
+ except Exception as exc:
+ print "ERROR", exc
+ log.error('Failure parsing data: %s' % exc)
+
+ if state != self.state:
+ now = time.time()
+ stateobj = State(state, ts)
+ self._statelog[now] = stateobj
+ for listener in self._state_listeners:
+ listener.change_state(stateobj)
+ self.state = stateobj
+ self.remote = remote
+ self.rport = rport
+
+ def _pushdef(self):
+ d = Deferred()
+ self._defs.append(d)
+ return d
+
+ def byteCount(self, interval=0):
+ d = self._pushdef()
+ self.sendLine('bytecount %d' % (interval,))
+ return d
+
+ def signal(self, signal='SIGTERM'):
+ d = self._pushdef()
+ self.sendLine('signal %s' % (signal,))
+ return d
+
+ def _parseHoldstatus(self, result):
+ return result.split('=')[0] == '1'
+
+ def hold(self, p=''):
+ d = self._pushdef()
+ self.sendLine('hold %s' % (p,))
+ if p == '':
+ d.addCallback(self._parseHoldstatus)
+ return d
+
+ def _parsePid(self, result):
+ self.pid = int(result.split('=')[1])
+
+ def get_pid(self):
+ d = self._pushdef()
+ self.sendLine('pid')
+ d.addCallback(self._parsePid)
+ return d
+
+ def logOn(self):
+ d = self._pushdef()
+ self.sendLine('log on')
+ return d
+
+ def stateOn(self):
+ d = self._pushdef()
+ self.sendLine('state on')
+ return d
+
+ def _parseVersion(self, data):
+ version = data.split('\n')[0].split(':')[1]
+ self.openvpn_version = version.strip()
+
+ def getVersion(self):
+ d = self._pushdef()
+ self.sendLine('version')
+ d.addCallback(self._parseVersion)
+ return d
+
+ def getInfo(self):
+ state = self._statelog.values()[-1]
+ return {
+ 'remote': self.remote,
+ 'rport': self.rport,
+ 'state': state.state,
+ 'state_simple': state.simple,
+ 'state_legend': state.legend,
+ 'openvpn_version': self.openvpn_version,
+ 'pid': self.pid,
+ 'traffic_down_total': self.traffic.down,
+ 'traffic_up_total': self.traffic.up}
+
+
+class State(object):
+
+ """
+ Possible States in an OpenVPN connection, according to the
+ OpenVPN Management documentation.
+ """
+
+ CONNECTING = 'CONNECTING'
+ WAIT = 'WAIT'
+ AUTH = 'AUTH'
+ GET_CONFIG = 'GET_CONFIG'
+ ASSIGN_IP = 'ASSIGN_IP'
+ ADD_ROUTES = 'ADD_ROUTES'
+ CONNECTED = 'CONNECTED'
+ RECONNECTING = 'RECONNECTING'
+ EXITING = 'EXITING'
+
+ OFF = 'OFF'
+ ON = 'ON'
+ STARTING = 'STARTING'
+ STOPPING = 'STOPPING'
+ FAILED = 'FAILED'
+
+ _legend = {
+ 'CONNECTING': 'Connecting to remote server',
+ 'WAIT': 'Waiting from initial response from server',
+ 'AUTH': 'Authenticating with server',
+ 'GET_CONFIG': 'Downloading configuration options from server',
+ 'ASSIGN_IP': 'Assigning IP address to virtual network interface',
+ 'ADD_ROUTES': 'Adding routes to system',
+ 'CONNECTED': 'Initialization Sequence Completed',
+ 'RECONNECTING': 'A restart has occurred',
+ 'EXITING': 'A graceful exit is in progress'
+ }
+
+ _simple = {
+ 'CONNECTING': STARTING,
+ 'WAIT': STARTING,
+ 'AUTH': STARTING,
+ 'GET_CONFIG': STARTING,
+ 'ASSIGN_IP': STARTING,
+ 'ADD_ROUTES': STARTING,
+ 'CONNECTED': ON,
+ 'RECONNECTING': STARTING,
+ 'EXITING': STOPPING
+ }
+
+ def __init__(self, state, timestamp):
+ self.state = state
+ self.timestamp = timestamp
+
+ @classmethod
+ def get_legend(cls, state):
+ return cls._legend.get(state)
+
+ @classmethod
+ def get_simple(cls, state):
+ return cls._simple.get(state)
+
+ @property
+ def simple(self):
+ return self.get_simple(self.state)
+
+ @property
+ def legend(self):
+ return self.get_legend(self.state)
+
+ def __repr__(self):
+ return '<State: %s [%s]>' % (
+ self.state, time.ctime(int(self.timestamp)))
+
+
+class TrafficCounter(object):
+
+ CAPACITY = 60
+
+ def __init__(self):
+ self.down = None
+ self.up = None
+ self._buf = OrderedDict()
+
+ def update(self, down, up, ts):
+ i_down = int(down)
+ i_up = int(up)
+ self.down = i_down
+ self.up = i_up
+ if len(self._buf) > self.CAPACITY:
+ self._buf.pop(self._buf.keys()[0])
+ self._buf[ts] = i_down, i_up
+
+ def get_rate(self, human=True):
+ points = self._buf.items()
+ if len(points) < 2:
+ return ['NA', 'NA']
+ ts1, prev = points[-2]
+ ts2, last = points[-1]
+ rate_down = _get_rate(last[0], prev[0], ts2, ts1)
+ rate_up = _get_rate(last[1], prev[1], ts2, ts1)
+ rates = rate_down, rate_up
+ if human:
+ rates = map(bytes2human, rates)
+ return rates
+
+
+def _get_rate(p2, p1, ts2, ts1):
+ return ((1.0 * (p2 - p1)) / (ts2 - ts1))