diff --git a/CHANGES.txt b/CHANGES.txt
new file mode 100644
index 0000000..e92537c
--- /dev/null
+++ b/CHANGES.txt
@@ -0,0 +1 @@
-- 0.1.0 initial release
@@ -0,0 +1,37 @@
+= LEAP =
+= The Internet Encryption Toolkit =
+--- python-leap-client-0.1.0.orig/docs/leap.1
++++ python-leap-client-0.1.0/docs/leap.1
+@@ -29,6 +29,6 @@
+ You can report bugs at the bugtracker site of leap:
+-Kali <kaliyuga at riseup dot net>
++This manpage written by kali <kaliyuga at riseup dot net> for the debian package, but obviously can be used for any other distribution.
+ .BR PolicyKit.conf (7)
+override_dh_installchangelogs dh_installchangelogs
diff --git a/debian/python-leap-client.doc-base.EX b/debian/python-leap-client.doc-base.EX
+ @echo "makeinfo finished; the Info files are in $(BUILDDIR)/texinfo."
+ $(SPHINXBUILD) -b gettext $(I18NSPHINXOPTS) $(BUILDDIR)/locale
+ @echo
+ @echo "Build finished. The message catalogs are in $(BUILDDIR)/locale."
+ $(SPHINXBUILD) -b changes $(ALLSPHINXOPTS) $(BUILDDIR)/changes
+ @echo
+ @echo "The overview file is in $(BUILDDIR)/changes."
+ $(SPHINXBUILD) -b linkcheck $(ALLSPHINXOPTS) $(BUILDDIR)/linkcheck
+ @echo
+ @echo "Link check complete; look for any errors in the above output " \
+ "or in $(BUILDDIR)/linkcheck/output.txt."
+ $(SPHINXBUILD) -b doctest $(ALLSPHINXOPTS) $(BUILDDIR)/doctest
+ @echo "Testing of doctests in the sources finished, look at the " \
+ "results in $(BUILDDIR)/doctest/output.txt."
diff --git a/setup/linux/leap.desktop b/setup/linux/leap.desktop
+Comment[sv]=Anonymitet och avlyssningsskydd
diff --git a/setup/linux/polkit/net.openvpn.gui.leap.policy b/setup/linux/polkit/net.openvpn.gui.leap.policy
new file mode 100644
index 0000000..70a22b6
--- /dev/null
+++ b/setup/linux/polkit/net.openvpn.gui.leap.policy
@@ -0,0 +1,23 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!DOCTYPE policyconfig PUBLIC
+ "-//freedesktop//DTD PolicyKit Policy Configuration 1.0//EN"
+ "">
+ <vendor>LEAP Project</vendor>
+ <vendor_url></vendor_url>
+ <action id="net.openvpn,">
+ <description>Runs the openvpn binary</description>
+ <description xml:lang="es">Ejecuta el binario openvpn</description>
+ <message>OpenVPN needs that you authenticate to start</message>
+ <message xml:lang="es">OpenVPN necesita autorizacion para comenzar</message>
+ <icon_name>package-x-generic</icon_name>
+ <defaults>
+ <allow_any>auth_self_keep</allow_any>
+ <allow_inactive>auth_self_keep</allow_inactive>
+ <allow_active>auth_self_keep</allow_active>
+ </defaults>
+ <annotate key="org.freedesktop.policykit.exec.path">/usr/sbin/openvpn</annotate>
+ </action>
diff --git a/src/leap/ b/src/leap/
new file mode 100644
index 0000000..0a61fd4
--- /dev/null
+++ b/src/leap/
@@ -0,0 +1,41 @@
+import logging
+# This is only needed for Python v2 but is harmless for Python v3.
+import sip
+sip.setapi('QVariant', 2)
+from PyQt4.QtGui import (QApplication, QSystemTrayIcon, QMessageBox)
+from leap.baseapp.mainwindow import LeapWindow
+logger = logging.getLogger(name=__name__)
+def main():
+ """
+ launches the main event loop
+ long live to the (hidden) leap window!
+ """
+ import sys
+ from leap.utils import leap_argparse
+ parser, opts = leap_argparse.init_leapc_args()
+ debug = getattr(opts, 'debug', False)
+ #XXX get debug level and set logger accordingly
+ if debug:
+ logger.debug('args: ', opts)
+ app = QApplication(sys.argv)
+ if not QSystemTrayIcon.isSystemTrayAvailable():
+ QMessageBox.critical(None, "Systray",
+ "I couldn't detect any \
+system tray on this system.")
+ sys.exit(1)
+ if not debug:
+ QApplication.setQuitOnLastWindowClosed(False)
+ window = LeapWindow(opts)
+ sys.exit(app.exec_())
+if __name__ == "__main__":
+ main()
diff --git a/src/leap/baseapp/ b/src/leap/baseapp/
new file mode 100644
index 0000000..e69de29
--- /dev/null
+++ b/src/leap/baseapp/
diff --git a/src/leap/baseapp/ b/src/leap/baseapp/
new file mode 100644
index 0000000..efdb472
--- /dev/null
+++ b/src/leap/baseapp/
@@ -0,0 +1,40 @@
+import ConfigParser
+import os
+def get_config(config_file=None):
+ """
+ temporary method for getting configs,
+ mainly for early stage development process.
+ in the future we will get preferences
+ from the storage api
+ """
+ config = ConfigParser.ConfigParser()
+ #config.readfp(open('defaults.cfg'))
+ #XXX does this work on win / mac also???
+ conf_path_list = ['eip.cfg', # XXX build a
+ # proper path with platform-specific places
+ # XXX make .config/foo
+ os.path.expanduser('~/.eip.cfg')]
+ if config_file:
+ config.readfp(config_file)
+ else:
+ return config
+# XXX wrapper around config? to get default values
+def get_with_defaults(config, section, option):
+ if config.has_option(section, option):
+ return config.get(section, option)
+ else:
+ # XXX lookup in defaults dict???
+ pass
+def get_vpn_stdout_mockup():
+ command = "python"
+ args = ["-u", "-c", "from eip_client import fakeclient;\
+ return command, args
diff --git a/src/leap/baseapp/ b/src/leap/baseapp/
new file mode 100644
index 0000000..68b6de8
--- /dev/null
+++ b/src/leap/baseapp/
@@ -0,0 +1,398 @@
+# vim: set fileencoding=utf-8 :
+#!/usr/bin/env python
+import logging
+import time
+logger = logging.getLogger(name=__name__)
+from PyQt4.QtGui import (QMainWindow, QWidget, QVBoxLayout, QMessageBox,
+ QSystemTrayIcon, QGroupBox, QLabel, QPixmap,
+ QHBoxLayout, QIcon,
+ QPushButton, QGridLayout, QAction, QMenu,
+ QTextBrowser, qApp)
+from PyQt4.QtCore import (pyqtSlot, pyqtSignal, QTimer)
+from leap.gui import mainwindow_rc
+from leap.eip.conductor import EIPConductor
+class LeapWindow(QMainWindow):
+ #XXX tbd: refactor into model / view / controller
+ #and put in its own modules...
+ newLogLine = pyqtSignal([str])
+ statusChange = pyqtSignal([object])
+ def __init__(self, opts):
+ super(LeapWindow, self).__init__()
+ self.debugmode = getattr(opts, 'debug', False)
+ self.vpn_service_started = False
+ self.createWindowHeader()
+ self.createIconGroupBox()
+ self.createActions()
+ self.createTrayIcon()
+ if self.debugmode:
+ self.createLogBrowser()
+ # create timer
+ self.timer = QTimer()
+ # bind signals
+ self.trayIcon.activated.connect(self.iconActivated)
+ self.newLogLine.connect(self.onLoggerNewLine)
+ self.statusChange.connect(self.onStatusChange)
+ self.timer.timeout.connect(self.onTimerTick)
+ widget = QWidget()
+ self.setCentralWidget(widget)
+ # add widgets to layout
+ mainLayout = QVBoxLayout()
+ mainLayout.addWidget(self.headerBox)
+ mainLayout.addWidget(self.statusIconBox)
+ if self.debugmode:
+ mainLayout.addWidget(self.statusBox)
+ mainLayout.addWidget(self.loggerBox)
+ widget.setLayout(mainLayout)
+ #
+ # conductor is in charge of all
+ # vpn-related configuration / monitoring.
+ # we pass a tuple of signals that will be
+ # triggered when status changes.
+ #
+ config_file = getattr(opts, 'config_file', None)
+ self.conductor = EIPConductor(
+ watcher_cb=self.newLogLine.emit,
+ config_file=config_file,
+ status_signals=(self.statusChange.emit, ))
+ self.setWindowTitle("Leap")
+ self.resize(400, 300)
+ self.set_statusbarMessage('ready')
+ if self.conductor.autostart:
+ self.start_or_stopVPN()
+ def closeEvent(self, event):
+ """
+ redefines close event (persistent window behaviour)
+ """
+ if self.trayIcon.isVisible() and not self.debugmode:
+ QMessageBox.information(self, "Systray",
+ "The program will keep running "
+ "in the system tray. To "
+ "terminate the program, choose "
+ "<b>Quit</b> in the "
+ "context menu of the system tray entry.")
+ self.hide()
+ event.ignore()
+ if self.debugmode:
+ self.cleanupAndQuit()
+ def setIcon(self, name):
+ icon = self.Icons.get(name)
+ self.trayIcon.setIcon(icon)
+ self.setWindowIcon(icon)
+ def setToolTip(self):
+ """
+ get readable status and place it on systray tooltip
+ """
+ status = self.conductor.status.get_readable_status()
+ self.trayIcon.setToolTip(status)
+ def iconActivated(self, reason):
+ """
+ handles left click, left double click
+ showing the trayicon menu
+ """
+ #XXX there's a bug here!
+ #menu shows on (0,0) corner first time,
+ #until double clicked at least once.
+ if reason in (QSystemTrayIcon.Trigger,
+ QSystemTrayIcon.DoubleClick):
+ def createWindowHeader(self):
+ """
+ description lines for main window
+ """
+ #XXX good candidate to refactor out! :)
+ self.headerBox = QGroupBox()
+ self.headerLabel = QLabel("<font size=40><b>E</b>ncryption \
+<b>I</b>nternet <b>P</b>roxy</font>")
+ self.headerLabelSub = QLabel("<i>trust your \
+ pixmap = QPixmap(':/images/leapfrog.jpg')
+ frog_lbl = QLabel()
+ frog_lbl.setPixmap(pixmap)
+ headerLayout = QHBoxLayout()
+ headerLayout.addWidget(frog_lbl)
+ headerLayout.addWidget(self.headerLabel)
+ headerLayout.addWidget(self.headerLabelSub)
+ headerLayout.addStretch()
+ self.headerBox.setLayout(headerLayout)
+ def getIcon(self, icon_name):
+ # XXX get from connection dict
+ icons = {'disconnected': 0,
+ 'connecting': 1,
+ 'connected': 2}
+ return icons.get(icon_name, None)
+ def createIconGroupBox(self):
+ """
+ dummy icongroupbox
+ (to be removed from here -- reference only)
+ """
+ icons = {
+ 'disconnected': ':/images/conn_error.png',
+ 'connecting': ':/images/conn_connecting.png',
+ 'connected': ':/images/conn_connected.png'
+ }
+ con_widgets = {
+ 'disconnected': QLabel(),
+ 'connecting': QLabel(),
+ 'connected': QLabel(),
+ }
+ con_widgets['disconnected'].setPixmap(
+ QPixmap(icons['disconnected']))
+ con_widgets['connecting'].setPixmap(
+ QPixmap(icons['connecting']))
+ con_widgets['connected'].setPixmap(
+ QPixmap(icons['connected'])),
+ self.ConnectionWidgets = con_widgets
+ con_icons = {
+ 'disconnected': QIcon(icons['disconnected']),
+ 'connecting': QIcon(icons['connecting']),
+ 'connected': QIcon(icons['connected'])
+ }
+ self.Icons = con_icons
+ self.statusIconBox = QGroupBox("Connection Status")
+ statusIconLayout = QHBoxLayout()
+ statusIconLayout.addWidget(self.ConnectionWidgets['disconnected'])
+ statusIconLayout.addWidget(self.ConnectionWidgets['connecting'])
+ statusIconLayout.addWidget(self.ConnectionWidgets['connected'])
+ statusIconLayout.itemAt(1).widget().hide()
+ statusIconLayout.itemAt(2).widget().hide()
+ self.statusIconBox.setLayout(statusIconLayout)
+ def createActions(self):
+ """
+ creates actions to be binded to tray icon
+ """
+ self.connectVPNAction = QAction("Connect to &VPN", self,
+ triggered=self.hide)
+ # XXX change action name on (dis)connect
+ self.dis_connectAction = QAction("&(Dis)connect", self,
+ triggered=self.start_or_stopVPN)
+ self.minimizeAction = QAction("Mi&nimize", self,
+ triggered=self.hide)
+ self.maximizeAction = QAction("Ma&ximize", self,
+ triggered=self.showMaximized)
+ self.restoreAction = QAction("&Restore", self,
+ triggered=self.showNormal)
+ self.quitAction = QAction("&Quit", self,
+ triggered=self.cleanupAndQuit)
+ def createTrayIcon(self):
+ """
+ creates the tray icon
+ """
+ self.trayIconMenu = QMenu(self)
+ self.trayIconMenu.addAction(self.connectVPNAction)
+ self.trayIconMenu.addAction(self.dis_connectAction)
+ self.trayIconMenu.addSeparator()
+ self.trayIconMenu.addAction(self.minimizeAction)
+ self.trayIconMenu.addAction(self.maximizeAction)
+ self.trayIconMenu.addAction(self.restoreAction)
+ self.trayIconMenu.addSeparator()
+ self.trayIconMenu.addAction(self.quitAction)
+ self.trayIcon = QSystemTrayIcon(self)
+ self.trayIcon.setContextMenu(self.trayIconMenu)
+ def createLogBrowser(self):
+ """
+ creates Browser widget for displaying logs
+ (in debug mode only).
+ """
+ self.loggerBox = QGroupBox()
+ logging_layout = QVBoxLayout()
+ self.logbrowser = QTextBrowser()
+ startStopButton = QPushButton("&Connect")
+ startStopButton.clicked.connect(self.start_or_stopVPN)
+ self.startStopButton = startStopButton
+ logging_layout.addWidget(self.logbrowser)
+ logging_layout.addWidget(self.startStopButton)
+ self.loggerBox.setLayout(logging_layout)
+ # status box
+ self.statusBox = QGroupBox()
+ grid = QGridLayout()
+ self.updateTS = QLabel('')
+ self.status_label = QLabel('Disconnected')
+ self.ip_label = QLabel('')
+ self.remote_label = QLabel('')
+ tun_read_label = QLabel("tun read")
+ self.tun_read_bytes = QLabel("0")
+ tun_write_label = QLabel("tun write")
+ self.tun_write_bytes = QLabel("0")
+ grid.addWidget(self.updateTS, 0, 0)
+ grid.addWidget(self.status_label, 0, 1)
+ grid.addWidget(self.ip_label, 1, 0)
+ grid.addWidget(self.remote_label, 1, 1)
+ grid.addWidget(tun_read_label, 2, 0)
+ grid.addWidget(self.tun_read_bytes, 2, 1)
+ grid.addWidget(tun_write_label, 3, 0)
+ grid.addWidget(self.tun_write_bytes, 3, 1)
+ self.statusBox.setLayout(grid)
+ @pyqtSlot(str)
+ def onLoggerNewLine(self, line):
+ """
+ simple slot: writes new line to logger Pane.
+ """
+ if self.debugmode:
+ self.logbrowser.append(line[:-1])
+ def set_statusbarMessage(self, msg):
+ self.statusBar().showMessage(msg)
+ @pyqtSlot(object)
+ def onStatusChange(self, status):
+ """
+ slot for status changes. triggers new signals for
+ updating icon, status bar, etc.
+ """
+ print('STATUS CHANGED! (on Qt-land)')
+ print('%s -> %s' % (status.previous, status.current))
+ icon_name = self.conductor.get_icon_name()
+ self.setIcon(icon_name)
+ print 'icon = ', icon_name
+ # change connection pixmap widget
+ self.setConnWidget(icon_name)
+ def setConnWidget(self, icon_name):
+ #print 'changing icon to %s' % icon_name
+ oldlayout = self.statusIconBox.layout()
+ # XXX reuse with icons
+ # XXX move states to StateWidget
+ states = {"disconnected": 0,
+ "connecting": 1,
+ "connected": 2}
+ for i in range(3):
+ oldlayout.itemAt(i).widget().hide()
+ new = states[icon_name]
+ oldlayout.itemAt(new).widget().show()
+ @pyqtSlot()
+ def start_or_stopVPN(self):
+ """
+ stub for running child process with vpn
+ """
+ if self.vpn_service_started is False:
+ self.conductor.connect()
+ if self.debugmode:
+ self.startStopButton.setText('&Disconnect')
+ self.vpn_service_started = True
+ # XXX what is optimum polling interval?
+ # too little is overkill, too much
+ # will miss transition states..
+ self.timer.start(250.0)
+ return
+ if self.vpn_service_started is True:
+ self.conductor.disconnect()
+ # FIXME this should trigger also
+ # statuschange event. why isn't working??
+ if self.debugmode:
+ self.startStopButton.setText('&Connect')
+ self.vpn_service_started = False
+ self.timer.stop()
+ return
+ @pyqtSlot()
+ def onTimerTick(self):
+ self.statusUpdate()
+ @pyqtSlot()
+ def statusUpdate(self):
+ """
+ called on timer tick
+ polls status and updates ui with real time
+ info about transferred bytes / connection state.
+ """
+ # XXX it's too expensive to poll
+ # continously. move to signal events instead.
+ if not self.vpn_service_started:
+ return
+ # XXX remove all access to manager layer
+ # from here.
+ if self.conductor.manager.with_errors:
+ #XXX how to wait on pkexec???
+ #something better that this workaround, plz!!
+ time.sleep(10)
+ print('errors. disconnect.')
+ self.start_or_stopVPN() # is stop
+ state = self.conductor.poll_connection_state()
+ if not state:
+ return
+ ts, con_status, ok, ip, remote = state
+ self.set_statusbarMessage(con_status)
+ self.setToolTip()
+ ts = time.strftime("%a %b %d %X", ts)
+ if self.debugmode:
+ self.updateTS.setText(ts)
+ self.status_label.setText(con_status)
+ self.ip_label.setText(ip)
+ self.remote_label.setText(remote)
+ # status i/o
+ status = self.conductor.manager.get_status_io()
+ if status and self.debugmode:
+ #XXX move this to systray menu indicators
+ ts, (tun_read, tun_write, tcp_read, tcp_write, auth_read) = status
+ ts = time.strftime("%a %b %d %X", ts)
+ self.updateTS.setText(ts)
+ self.tun_read_bytes.setText(tun_read)
+ self.tun_write_bytes.setText(tun_write)
+ def cleanupAndQuit(self):
+ """
+ cleans state before shutting down app.
+ """
+ # TODO:make sure to shutdown all child process / threads
+ # in conductor
+ self.conductor.cleanup()
+ qApp.quit()
diff --git a/src/leap/eip/ b/src/leap/eip/
new file mode 100644
index 0000000..e69de29
--- /dev/null
+++ b/src/leap/eip/
diff --git a/src/leap/eip/ b/src/leap/eip/
new file mode 100644
index 0000000..e3adadc
--- /dev/null
+++ b/src/leap/eip/
@@ -0,0 +1,272 @@
+stablishes a vpn connection and monitors its state
+from __future__ import (division, unicode_literals, print_function)
+#import threading
+from functools import partial
+import logging
+from leap.utils.coroutines import spawn_and_watch_process
+from leap.baseapp.config import get_config, get_vpn_stdout_mockup
+from leap.eip.vpnwatcher import EIPConnectionStatus, status_watcher
+from leap.eip.vpnmanager import OpenVPNManager, ConnectionRefusedError
+logger = logging.getLogger(name=__name__)
+# TODO Move exceptions to their own module
+class ConnectionError(Exception):
+ """
+ generic connection error
+ """
+ pass
+class EIPClientError(Exception):
+ """
+ base EIPClient exception
+ """
+ def __str__(self):
+ if len(self.args) >= 1:
+ return repr(self.args[0])
+ else:
+ return ConnectionError
+class UnrecoverableError(EIPClientError):
+ """
+ we cannot do anything about it, sorry
+ """
+ pass
+class OpenVPNConnection(object):
+ """
+ All related to invocation
+ of the openvpn binary
+ """
+ # Connection Methods
+ def __init__(self, config_file=None, watcher_cb=None):
+ #change watcher_cb to line_observer
+ """
+ :param config_file: configuration file to read from
+ :param watcher_cb: callback to be \
+called for each line in watched stdout
+ :param signal_map: dictionary of signal names and callables \
+to be triggered for each one of them.
+ :type config_file: str
+ :type watcher_cb: function
+ :type signal_map: dict
+ """
+ # XXX get host/port from config
+ self.manager = OpenVPNManager()
+ self.config_file = config_file
+ self.watcher_cb = watcher_cb
+ #self.signal_maps = signal_maps
+ self.subp = None
+ self.watcher = None
+ self.server = None
+ self.port = None
+ self.proto = None
+ self.autostart = True
+ self._get_config()
+ def _set_command_mockup(self):
+ """
+ sets command and args for a command mockup
+ that just mimics the output from the real thing
+ """
+ command, args = get_vpn_stdout_mockup()
+ self.command, self.args = command, args
+ def _get_config(self):
+ """
+ retrieves the config options from defaults or
+ home file, or config file passed in command line.
+ """
+ config = get_config(config_file=self.config_file)
+ self.config = config
+ if config.has_option('openvpn', 'command'):
+ commandline = config.get('openvpn', 'command')
+ if commandline == "mockup":
+ self._set_command_mockup()
+ return
+ command_split = commandline.split(' ')
+ command = command_split[0]
+ if len(command_split) > 1:
+ args = command_split[1:]
+ else:
+ args = []
+ self.command = command
+ #print("debug: command = %s" % command)
+ self.args = args
+ else:
+ self._set_command_mockup()
+ if config.has_option('openvpn', 'autostart'):
+ autostart = config.get('openvpn', 'autostart')
+ self.autostart = autostart
+ def _launch_openvpn(self):
+ """
+ invocation of openvpn binaries in a subprocess.
+ """
+ #deprecate watcher_cb,
+ #use _only_ signal_maps instead
+ if self.watcher_cb is not None:
+ linewrite_callback = self.watcher_cb
+ else:
+ #XXX get logger instead
+ linewrite_callback = lambda line: print('watcher: %s' % line)
+ observers = (linewrite_callback,
+ partial(status_watcher, self.status))
+ subp, watcher = spawn_and_watch_process(
+ self.command,
+ self.args,
+ observers=observers)
+ self.subp = subp
+ self.watcher = watcher
+ conn_result = self.status.CONNECTED
+ return conn_result
+ def _try_connection(self):
+ """
+ attempts to connect
+ """
+ if self.subp is not None:
+ print('cowardly refusing to launch subprocess again')
+ return
+ self._launch_openvpn()
+ def cleanup(self):
+ """
+ terminates child subprocess
+ """
+ if self.subp:
+ self.subp.terminate()
+class EIPConductor(OpenVPNConnection):
+ """
+ Manages the execution of the OpenVPN process, auto starts, monitors the
+ network connection, handles configuration, fixes leaky hosts, handles
+ errors, etc.
+ Preferences will be stored via the Storage API. (TBD)
+ Status updates (connected, bandwidth, etc) are signaled to the GUI.
+ """
+ def __init__(self, *args, **kwargs):
+ self.settingsfile = kwargs.get('settingsfile', None)
+ self.logfile = kwargs.get('logfile', None)
+ self.error_queue = []
+ self.desired_con_state = None # ???
+ status_signals = kwargs.pop('status_signals', None)
+ self.status = EIPConnectionStatus(callbacks=status_signals)
+ super(EIPConductor, self).__init__(*args, **kwargs)
+ def connect(self):
+ """
+ entry point for connection process
+ """
+ self.manager.forget_errors()
+ self._try_connection()
+ # XXX should capture errors?
+ def disconnect(self):
+ """
+ disconnects client
+ """
+ self._disconnect()
+ self.status.change_to(self.status.DISCONNECTED)
+ pass
+ def shutdown(self):
+ """
+ shutdown and quit
+ """
+ self.desired_con_state = self.status.DISCONNECTED
+ def connection_state(self):
+ """
+ returns the current connection state
+ """
+ return self.status.current
+ def desired_connection_state(self):
+ """
+ returns the desired_connection state
+ """
+ return self.desired_con_state
+ def poll_connection_state(self):
+ """
+ """
+ try:
+ state = self.manager.get_connection_state()
+ except ConnectionRefusedError:
+ # connection refused. might be not ready yet.
+ return
+ if not state:
+ return
+ (ts, status_step,
+ ok, ip, remote) = state
+ self.status.set_vpn_state(status_step)
+ status_step = self.status.get_readable_status()
+ return (ts, status_step, ok, ip, remote)
+ def get_icon_name(self):
+ """
+ get icon name from status object
+ """
+ return self.status.get_state_icon()
+ #
+ # private methods
+ #
+ def _disconnect(self):
+ """
+ private method for disconnecting
+ """
+ if self.subp is not None:
+ self.subp.terminate()
+ self.subp = None
+ # XXX signal state changes! :)
+ def _is_alive(self):
+ """
+ don't know yet
+ """
+ pass
+ def _connect(self):
+ """
+ entry point for connection cascade methods.
+ """
+ #conn_result = ConState.DISCONNECTED
+ try:
+ conn_result = self._try_connection()
+ except UnrecoverableError as except_msg:
+ logger.error("FATAL: %s" % unicode(except_msg))
+ conn_result = self.status.UNRECOVERABLE
+ except Exception as except_msg:
+ self.error_queue.append(except_msg)
+ logger.error("Failed Connection: %s" %
+ unicode(except_msg))
+ return conn_result
diff --git a/src/leap/eip/ b/src/leap/eip/
new file mode 100644
index 0000000..78777cf
--- /dev/null
+++ b/src/leap/eip/
@@ -0,0 +1,262 @@
+from __future__ import (print_function)
+import logging
+import os
+import socket
+import telnetlib
+import time
+logger = logging.getLogger(name=__name__)
+class MissingSocketError(Exception):
+ pass
+class ConnectionRefusedError(Exception):
+ pass
+class UDSTelnet(telnetlib.Telnet):
+ def open(self, host, port=0, timeout=socket._GLOBAL_DEFAULT_TIMEOUT):
+ """Connect to a host. If port is 'unix', it
+ will open a connection over unix docmain sockets.
+ The optional second argument is the port number, which
+ defaults to the standard telnet port (23).
+ Don't try to reopen an already connected instance.
+ """
+ self.eof = 0
+ if not port:
+ port = TELNET_PORT
+ = host
+ self.port = port
+ self.timeout = timeout
+ if self.port == "unix":
+ # unix sockets spoken
+ if not os.path.exists(
+ raise MissingSocketError
+ self.sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
+ try:
+ self.sock.connect(
+ except socket.error:
+ raise ConnectionRefusedError
+ else:
+ self.sock = socket.create_connection((host, port), timeout)
+# this class based in code from cube-routed project
+class OpenVPNManager(object):
+ """
+ Run commands over OpenVPN management interface
+ and parses the output.
+ """
+ # XXX might need a lock to avoid
+ # race conditions here...
+ def __init__(self, host="/tmp/.eip.sock", port="unix", password=None):
+ #XXX hardcoded host here. change.
+ = host
+ if isinstance(port, str) and port.isdigit():
+ port = int(port)
+ self.port = port
+ self.password = password
+ = None
+ #XXX workaround for signaling
+ #the ui that we don't know how to
+ #manage a connection error
+ self.with_errors = False
+ def forget_errors(self):
+ print('forgetting errors')
+ self.with_errors = False
+ def connect(self):
+ """Connect to openvpn management interface"""
+ try:
+ self.close()
+ except:
+ #XXX don't like this general
+ #catch here.
+ pass
+ if self.connected():
+ return True
+ = UDSTelnet(, self.port)
+ # XXX make password optional
+ # specially for win plat. we should generate
+ # the pass on the fly when invoking manager
+ # from conductor
+ + '\n')
+'SUCCESS:', 2)
+ self._seek_to_eof()
+ self.forget_errors()
+ return True
+ def _seek_to_eof(self):
+ """
+ Read as much as available. Position seek pointer to end of stream
+ """
+ b =
+ while b:
+ b =
+ def connected(self):
+ """
+ Returns True if connected
+ rtype: bool
+ """
+ #return bool(getattr(self, 'tn', None))
+ try:
+ assert
+ return True
+ except:
+ #XXX get rid of
+ #this pokemon exception!!!
+ return False
+ def close(self, announce=True):
+ """
+ Close connection to openvpn management interface
+ """
+ if announce:
+ del
+ def _send_command(self, cmd, tries=0):
+ """
+ Send a command to openvpn and return response as list
+ """
+ if tries > 3:
+ return []
+ if not self.connected():
+ try:
+ self.connect()
+ except MissingSocketError:
+ #XXX capture more helpful error
+ #messages
+ #pass
+ return self.make_error()
+ try:
+ + "\n")
+ except socket.error:
+ logger.error('socket error')
+ print('socket error!')
+ self.close(announce=False)
+ self._send_command(cmd, tries=tries + 1)
+ return []
+ buf ="END", 2)
+ self._seek_to_eof()
+ blist = buf.split('\r\n')
+ if blist[-1].startswith('END'):
+ del blist[-1]
+ return blist
+ else:
+ return []
+ def _send_short_command(self, cmd):
+ """
+ parse output from commands that are
+ delimited by "success" instead
+ """
+ if not self.connected():
+ self.connect()
+ + "\n")
+ # XXX not working?
+ buf ="SUCCESS", 2)
+ self._seek_to_eof()
+ blist = buf.split('\r\n')
+ return blist
+ #
+ # useful vpn commands
+ #
+ def pid(self):
+ #XXX broken
+ return self._send_short_command("pid")
+ def make_error(self):
+ """
+ capture error and wrap it in an
+ understandable format
+ """
+ #XXX get helpful error codes
+ self.with_errors = True
+ now = int(time.time())
+ return '%s,LAUNCHER ERROR,ERROR,-,-' % now
+ def state(self):
+ """
+ OpenVPN command: state
+ """
+ state = self._send_command("state")
+ if not state:
+ return None
+ if isinstance(state, str):
+ return state
+ if isinstance(state, list):
+ if len(state) == 1:
+ return state[0]
+ else:
+ return state[-1]
+ def status(self):
+ """
+ OpenVPN command: status
+ """
+ status = self._send_command("status")
+ return status
+ def status2(self):
+ """
+ OpenVPN command: last 2 statuses
+ """
+ return self._send_command("status 2")
+ #
+ # parse info
+ #
+ def get_status_io(self):
+ status = self.status()
+ if isinstance(status, str):
+ lines = status.split('\n')
+ if isinstance(status, list):
+ lines = status
+ try:
+ (header, when, tun_read, tun_write,
+ tcp_read, tcp_write, auth_read) = tuple(lines)
+ except ValueError:
+ return None
+ when_ts = time.strptime(when.split(',')[1], "%a %b %d %H:%M:%S %Y")
+ sep = ','
+ # XXX cleanup!
+ tun_read = tun_read.split(sep)[1]
+ tun_write = tun_write.split(sep)[1]
+ tcp_read = tcp_read.split(sep)[1]
+ tcp_write = tcp_write.split(sep)[1]
+ auth_read = auth_read.split(sep)[1]
+ # XXX this could be a named tuple. prettier.
+ return when_ts, (tun_read, tun_write, tcp_read, tcp_write, auth_read)
+ def get_connection_state(self):
+ state = self.state()
+ if state is not None:
+ ts, status_step, ok, ip, remote = state.split(',')
+ ts = time.gmtime(float(ts))
+ # XXX this could be a named tuple. prettier.
+ return ts, status_step, ok, ip, remote
diff --git a/src/leap/eip/ b/src/leap/eip/
+"""generic watcher object that keeps track of connection status"""
+# This should be deprecated in favor of daemon mode + management
+# interface. But we can leave it here for debug purposes.
+class EIPConnectionStatus(object):
+ """
+ Keep track of client (gui) and openvpn
+ states.
+ These are the OpenVPN states:
+ CONNECTING -- OpenVPN's initial state.
+ WAIT -- (Client only) Waiting for initial response
+ from server.
+ AUTH -- (Client only) Authenticating with server.
+ GET_CONFIG -- (Client only) Downloading configuration options
+ from server.
+ ASSIGN_IP -- Assigning IP address to virtual network
+ interface.
+ ADD_ROUTES -- Adding routes to system.
+ CONNECTED -- Initialization Sequence Completed.
+ RECONNECTING -- A restart has occurred.
+ EXITING -- A graceful exit is in progress.
+ We add some extra states:
+ DISCONNECTED -- GUI initial state.
+ UNRECOVERABLE -- An unrecoverable error has been raised
+ while invoking openvpn service.
+ """
+ WAIT = 2
+ AUTH = 3
+ # gui specific states:
+ def __init__(self, callbacks=None):
+ """
+ EIPConnectionStatus is initialized with a tuple
+ of signals to be triggered.
+ :param callbacks: a tuple of (callable) observers
+ :type callbacks: tuple
+ """
+ # (callbacks to connect to signals in Qt-land)
+ self.current = self.DISCONNECTED
+ self.previous = None
+ self.callbacks = callbacks
+ def get_readable_status(self):
+ # XXX DRY status / labels a little bit.
+ # think we'll want to i18n this.
+ human_status = {
+ 0: 'disconnected',
+ 1: 'connecting',
+ 2: 'waiting',
+ 3: 'authenticating',
+ 4: 'getting config',
+ 5: 'assigning ip',
+ 6: 'adding routes',
+ 7: 'connected',
+ 8: 'reconnecting',
+ 9: 'exiting',
+ 11: 'unrecoverable error',
+ }
+ return human_status[self.current]
+ def get_state_icon(self):
+ """
+ returns the high level icon
+ for each fine-grain openvpn state
+ """
+ connecting = (self.CONNECTING,
+ self.WAIT,
+ self.AUTH,
+ self.GET_CONFIG,
+ self.ASSIGN_IP,
+ self.ADD_ROUTES)
+ connected = (self.CONNECTED,)
+ disconnected = (self.DISCONNECTED,
+ # this can be made smarter,
+ # but it's like it'll change,
+ # so +readability.
+ if self.current in connecting:
+ return "connecting"
+ if self.current in connected:
+ return "connected"
+ if self.current in disconnected:
+ return "disconnected"
+ def set_vpn_state(self, status):
+ """
+ accepts a state string from the management
+ interface, and sets the internal state.
+ :param status: openvpn STATE (uppercase).
+ :type status: str
+ """
+ if hasattr(self, status):
+ self.change_to(getattr(self, status))
+ def set_current(self, to):
+ """
+ setter for the 'current' property
+ :param to: destination state
+ :type to: int
+ """
+ self.current = to
+ def change_to(self, to):
+ """
+ :param to: destination state
+ :type to: int
+ """
+ if to == self.current:
+ return
+ changed = False
+ from_ = self.current
+ self.current = to
+ # We can add transition restrictions
+ # here to ensure no transitions are
+ # allowed outside the fsm.
+ self.set_current(to)
+ changed = True
+ #trigger signals (as callbacks)
+ #print('current state: %s' % self.current)
+ if changed:
+ self.previous = from_
+ if self.callbacks:
+ for cb in self.callbacks:
+ if callable(cb):
+ cb(self)
+def status_watcher(cs, line):
+ """
+ a wrapper that calls to ConnectionStatus object
+ :param cs: a EIPConnectionStatus instance
+ :type cs: EIPConnectionStatus object
+ :param line: a single line of the watched output
+ :type line: str
+ """
+ #print('status watcher watching')
+ # from the mullvad code, should watch for
+ # things like:
+ # "Initialization Sequence Completed"
+ # "With Errors"
+ # "Tap-Win32"
+ if "Completed" in line:
+ cs.change_to(cs.CONNECTED)
+ return
+ if "Initial packet from" in line:
+ cs.change_to(cs.CONNECTING)
+ return
diff --git a/src/leap/gui/ b/src/leap/gui/
+# -*- coding: utf-8 -*-
+# Resource object code
+# Created: Sun Jul 22 17:08:49 2012
+# by: The Resource Compiler for PyQt (Qt v4.8.2)
+# WARNING! All changes made in this file will be lost!
+from PyQt4 import QtCore
+qt_resource_data = "\
+qt_resource_name = "\
+qt_resource_struct = "\
+def qInitResources():
+ QtCore.qRegisterResourceData(0x01, qt_resource_struct, qt_resource_name, qt_resource_data)
+def qCleanupResources():
+ QtCore.qUnregisterResourceData(0x01, qt_resource_struct, qt_resource_name, qt_resource_data)
+fakeoutput = """
+mullvad Sun Jun 17 14:34:57 2012 OpenVPN 2.2.1 i486-linux-gnu [SSL] [LZO2] [EPOLL] [PKCS11] [eurephia] [MH] [PF_INET6] [IPv6 payload 20110424-2 (2.2RC2)] built
+ on Mar 23 2012
+Sun Jun 17 14:34:57 2012 MANAGEMENT: TCP Socket listening on [AF_INET]
+Sun Jun 17 14:34:57 2012 NOTE: the current --script-security setting may allow this configuration to call user-defined scripts
+Sun Jun 17 14:34:57 2012 WARNING: file 'ssl/1021380964266.key' is group or others accessible
+Sun Jun 17 14:34:57 2012 LZO compression initialized
+Sun Jun 17 14:34:57 2012 Control Channel MTU parms [ L:1542 D:138 EF:38 EB:0 ET:0 EL:0 ]
+Sun Jun 17 14:34:57 2012 Socket Buffers: R=[163840->131072] S=[163840->131072]
+Sun Jun 17 14:34:57 2012 Data Channel MTU parms [ L:1542 D:1450 EF:42 EB:135 ET:0 EL:0 AF:3/1 ]
+Sun Jun 17 14:34:57 2012 Local Options hash (VER=V4): '41690919'
+Sun Jun 17 14:34:57 2012 Expected Remote Options hash (VER=V4): '530fdded'
+Sun Jun 17 14:34:57 2012 UDPv4 link local: [undef]
+Sun Jun 17 14:34:57 2012 UDPv4 link remote: [AF_INET]
+Sun Jun 17 14:34:57 2012 TLS: Initial packet from [AF_INET], sid=63c29ace 1d3060d0
+Sun Jun 17 14:34:58 2012 VERIFY OK: depth=2, /C=NA/ST=None/L=None/O=Mullvad/CN=Mullvad_CA/
+Sun Jun 17 14:34:58 2012 VERIFY OK: depth=1, /C=NA/ST=None/L=None/O=Mullvad/
+Sun Jun 17 14:34:58 2012 Validating certificate key usage
+Sun Jun 17 14:34:58 2012 ++ Certificate has key usage 00a0, expects 00a0
+Sun Jun 17 14:34:58 2012 VERIFY KU OK
+Sun Jun 17 14:34:58 2012 Validating certificate extended key usage
+Sun Jun 17 14:34:58 2012 ++ Certificate has EKU (str) TLS Web Server Authentication, expects TLS Web Server Authentication
+Sun Jun 17 14:34:58 2012 VERIFY EKU OK
+Sun Jun 17 14:34:58 2012 VERIFY OK: depth=0, /C=NA/ST=None/L=None/O=Mullvad/
+Sun Jun 17 14:34:59 2012 Data Channel Encrypt: Cipher 'BF-CBC' initialized with 128 bit key
+Sun Jun 17 14:34:59 2012 Data Channel Encrypt: Using 160 bit message hash 'SHA1' for HMAC authentication
+Sun Jun 17 14:34:59 2012 Data Channel Decrypt: Cipher 'BF-CBC' initialized with 128 bit key
+Sun Jun 17 14:34:59 2012 Data Channel Decrypt: Using 160 bit message hash 'SHA1' for HMAC authentication
+Sun Jun 17 14:34:59 2012 Control Channel: TLSv1, cipher TLSv1/SSLv3 DHE-RSA-AES256-SHA, 2048 bit RSA
+Sun Jun 17 14:34:59 2012 [] Peer Connection Initiated with [AF_INET]
+Sun Jun 17 14:35:01 2012 SENT CONTROL []: 'PUSH_REQUEST' (status=1)
+Sun Jun 17 14:35:02 2012 PUSH: Received control message: 'PUSH_REPLY,redirect-gateway def1 bypass-dhcp,dhcp-option DNS,route,topology net30,ifconfig'
+Sun Jun 17 14:35:02 2012 OPTIONS IMPORT: --ifconfig/up options modified
+Sun Jun 17 14:35:02 2012 OPTIONS IMPORT: route options modified
+Sun Jun 17 14:35:02 2012 OPTIONS IMPORT: --ip-win32 and/or --dhcp-option options modified
+Sun Jun 17 14:35:02 2012 ROUTE default_gateway=
+Sun Jun 17 14:35:02 2012 TUN/TAP device tun0 opened
+Sun Jun 17 14:35:02 2012 TUN/TAP TX queue length set to 100
+Sun Jun 17 14:35:02 2012 do_ifconfig, tt->ipv6=0, tt->did_ifconfig_ipv6_setup=0
+Sun Jun 17 14:35:02 2012 /sbin/ifconfig tun0 pointopoint mtu 1500
+Sun Jun 17 14:35:02 2012 /etc/openvpn/update-resolv-conf tun0 1500 1542 init
+dhcp-option DNS
+Sun Jun 17 14:35:05 2012 /sbin/route add -net netmask gw
+Sun Jun 17 14:35:05 2012 /sbin/route add -net netmask gw
+Sun Jun 17 14:35:05 2012 /sbin/route add -net netmask gw
+Sun Jun 17 14:35:05 2012 /sbin/route add -net netmask gw
+Sun Jun 17 14:35:05 2012 Initialization Sequence Completed
+Sun Jun 17 14:34:57 2012 MANAGEMENT: TCP Socket listening on [AF_INET]
+import time
+import sys
+def write_output():
+ for line in fakeoutput.split('\n'):
+ sys.stdout.write(line + '\n')
+ sys.stdout.flush()
+ #print(line)
+ time.sleep(0.1)
+if __name__ == "__main__":
+ write_output()
+import manager
+from mock import Mock
+from eip_client.vpnmanager import OpenVPNManager
+vpn_commands = {
+ 'status': [
+ 'OpenVPN STATISTICS', 'Updated,Mon Jun 25 11:51:21 2012',
+ 'TUN/TAP read bytes,306170', 'TUN/TAP write bytes,872102',
+ 'TCP/UDP read bytes,986177', 'TCP/UDP write bytes,439329',
+ 'Auth read bytes,872102'],
+ 'state': ['1340616463,CONNECTED,SUCCESS,,'],
+ # XXX add more tests
+ }
+def get_openvpn_manager_mocks():
+ manager = OpenVPNManager()
+ manager.status = Mock(return_value='\n'.join(vpn_commands['status']))
+ manager.state = Mock(return_value=vpn_commands['state'][0])
+ return manager
+# the problem of watching a stdout pipe from
+# openvpn binary: using subprocess and coroutines
+# acting as event consumers
+from __future__ import division, print_function
+from subprocess import PIPE, Popen
+import sys
+from threading import Thread
+ON_POSIX = 'posix' in sys.builtin_module_names
+# Coroutines goodies
+def coroutine(func):
+ def start(*args, **kwargs):
+ cr = func(*args, **kwargs)
+ return cr
+ return start
+def process_events(callback):
+ """
+ coroutine loop that receives
+ events sent and dispatch the callback.
+ :param callback: callback to be called\
+for each event
+ :type callback: callable
+ """
+ try:
+ while True:
+ m = (yield)
+ if callable(callback):
+ callback(m)
+ else:
+ #XXX log instead
+ print('not a callable passed')
+ except GeneratorExit:
+ return
+# Threads
+def launch_thread(target, args):
+ """
+ launch and demonize thread.
+ :param target: target function that will run in thread
+ :type target: function
+ :param args: args to be passed to thread
+ :type args: list
+ """
+ t = Thread(target=target,
+ args=args)
+ t.daemon = True
+ t.start()
+ return t
+def watch_output(out, observers):
+ """
+ initializes dict of observer coroutines
+ and pushes lines to each of them as they are received
+ from the watched output.
+ :param out: stdout of a process.
+ :type out: fd
+ :param observers: tuple of coroutines to send data\
+for each event
+ :type ovservers: tuple
+ """
+ observer_dict = {observer: process_events(observer)
+ for observer in observers}
+ for line in iter(out.readline, b''):
+ for obs in observer_dict:
+ observer_dict[obs].send(line)
+ out.close()
+def spawn_and_watch_process(command, args, observers=None):
+ """
+ spawns a subprocess with command, args, and launch
+ a watcher thread.
+ :param command: command to be executed in the subprocess
+ :type command: str
+ :param args: arguments
+ :type args: list
+ :param observers: tuple of observer functions to be called \
+for each line in the subprocess output.
+ :type observers: tuple
+ :return: a tuple containing the child process instance, and watcher_thread,
+ :rtype: (Subprocess, Thread)
+ """
+ subp = Popen([command] + args,
+ stdout=PIPE,
+ stderr=PIPE,
+ bufsize=1,
+ close_fds=ON_POSIX)
+ watcher = launch_thread(
+ watch_output,
+ (subp.stdout, observers))
+ return subp, watcher
+import argparse
+def build_parser():
+ epilog = "Copyright 2012 The Leap Project"
+ parser = argparse.ArgumentParser(description="""
+Launches main LEAP Client""", epilog=epilog)
+ parser.add_argument('--debug', action="store_true",
+ help='launches in debug mode')
+ parser.add_argument('--config', metavar="CONFIG FILE", nargs='?',
+ action="store", dest="config_file",
+ type=argparse.FileType('r'),
+ help='optional config file')
+ return parser
+def init_leapc_args():
+ parser = build_parser()
+ opts = parser.parse_args()
+ return parser, opts
+# XXX put here a sample BaseEIPTestCase
+# code borrowed from python stdlib tests
+# I think we're not using it at the end...
+# XXX Review and Remove
+import contextlib
+import socket
+import sys
+import unittest
+HOST = "localhost"
+class TestFailed(Exception):
+ """Test failed."""
+def bind_port(sock, host=HOST):
+ """Bind the socket to a free port and return the port number. Relies on
+ ephemeral ports in order to ensure we are using an unbound port. This is
+ important as many tests may be running simultaneously, especially in a
+ buildbot environment. This method raises an exception if the
+ is AF_INET and sock.type is SOCK_STREAM, *and* the socket has SO_REUSEADDR
+ or SO_REUSEPORT set on it. Tests should *never* set these socket options
+ for TCP/IP sockets. The only case for setting these options is testing
+ multicasting via multiple UDP sockets.
+ Additionally, if the SO_EXCLUSIVEADDRUSE socket option is available (i.e.
+ on Windows), it will be set on the socket. This will prevent anyone else
+ from bind()'ing to our host/port for the duration of the test.
+ """
+ if == socket.AF_INET and sock.type == socket.SOCK_STREAM:
+ if hasattr(socket, 'SO_REUSEADDR'):
+ if sock.getsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR) == 1:
+ raise TestFailed("tests should never set the SO_REUSEADDR " \
+ "socket option on TCP/IP sockets!")
+ if hasattr(socket, 'SO_REUSEPORT'):
+ if sock.getsockopt(socket.SOL_SOCKET, socket.SO_REUSEPORT) == 1:
+ raise TestFailed("tests should never set the SO_REUSEPORT " \
+ "socket option on TCP/IP sockets!")
+ if hasattr(socket, 'SO_EXCLUSIVEADDRUSE'):
+ sock.setsockopt(socket.SOL_SOCKET, socket.SO_EXCLUSIVEADDRUSE, 1)
+ sock.bind((host, 0))
+ port = sock.getsockname()[1]
+ return port
+def _run_suite(suite):
+ """Run tests from a unittest.TestSuite-derived class."""
+ runner = unittest.TextTestRunner(sys.stdout, verbosity=2,
+ failfast=False)
+ result =
+ if not result.wasSuccessful():
+ if len(result.errors) == 1 and not result.failures:
+ err = result.errors[0][1]
+ elif len(result.failures) == 1 and not result.errors:
+ err = result.failures[0][1]
+ else:
+ err = "multiple errors occurred"
+ raise TestFailed(err)
+def run_unittest(*classes):
+ """Run tests from unittest.TestCase-derived classes."""
+ valid_types = (unittest.TestSuite, unittest.TestCase)
+ suite = unittest.TestSuite()
+ for cls in classes:
+ if isinstance(cls, str):
+ if cls in sys.modules:
+ suite.addTest(unittest.findTestCases(sys.modules[cls]))
+ else:
+ raise ValueError("str arguments must be keys in sys.modules")
+ elif isinstance(cls, valid_types):
+ suite.addTest(cls)
+ else:
+ suite.addTest(unittest.makeSuite(cls))
+ _run_suite(suite)
+def captured_output(stream_name):
+ """Return a context manager used by captured_stdout/stdin/stderr
+ that temporarily replaces the sys stream *stream_name* with a StringIO."""
+ import io
+ orig_stdout = getattr(sys, stream_name)
+ setattr(sys, stream_name, io.StringIO())
+ try:
+ yield getattr(sys, stream_name)
+ finally:
+ setattr(sys, stream_name, orig_stdout)
+def captured_stdout():
+ """Capture the output of sys.stdout:
+ with captured_stdout() as s:
+ print("hello")
+ self.assertEqual(s.getvalue(), "hello")
+ """
+ return captured_output("stdout")
+def captured_stderr():
+ return captured_output("stderr")
+def captured_stdin():
+ return captured_output("stdin")
+"""Supporting definitions for the Python regression tests."""
+if __name__ != '':
+ raise ImportError('support must be imported from the test package')
+import contextlib
+import errno
+import functools
+import gc
+import socket
+import sys
+import os
+import platform
+import shutil
+import warnings
+import unittest
+import importlib
+import re
+import subprocess
+import imp
+import time
+import sysconfig
+import fnmatch
+import logging.handlers
+import struct
+ import _thread, threading
+except ImportError:
+ _thread = None
+ threading = None
+ import multiprocessing.process
+except ImportError:
+ multiprocessing = None
+ import faulthandler
+except ImportError:
+ faulthandler = None
+ import zlib
+except ImportError:
+ zlib = None
+__all__ = [
+ "Error", "TestFailed", "ResourceDenied", "import_module",
+ "verbose", "use_resources", "max_memuse", "record_original_stdout",
+ "get_original_stdout", "unload", "unlink", "rmtree", "forget",
+ "is_resource_enabled", "requires", "requires_freebsd_version",
+ "requires_linux_version", "requires_mac_ver", "find_unused_port", "bind_port",
+ "IPV6_ENABLED", "is_jython", "TESTFN", "HOST", "SAVEDCWD", "temp_cwd",
+ "findfile", "create_empty_file", "sortdict", "check_syntax_error", "open_urlresource",
+ "check_warnings", "CleanImport", "EnvironmentVarGuard", "TransientResource",
+ "captured_stdout", "captured_stdin", "captured_stderr", "time_out",
+ "socket_peer_reset", "ioerror_peer_reset", "run_with_locale", 'temp_umask',
+ "transient_internet", "set_memlimit", "bigmemtest", "bigaddrspacetest",
+ "BasicTestRunner", "run_unittest", "run_doctest", "threading_setup",
+ "threading_cleanup", "reap_children", "cpython_only", "check_impl_detail",
+ "get_attribute", "swap_item", "swap_attr", "requires_IEEE_754",
+ "TestHandler", "Matcher", "can_symlink", "skip_unless_symlink",
+ "import_fresh_module", "requires_zlib", "PIPE_MAX_SIZE", "failfast",
+ "anticipate_failure"
+ ]
+class Error(Exception):
+ """Base class for regression test exceptions."""
+class TestFailed(Error):
+ """Test failed."""
+class ResourceDenied(unittest.SkipTest):
+ """Test skipped because it requested a disallowed resource.
+ This is raised when a test calls requires() for a resource that
+ has not be enabled. It is used to distinguish between expected
+ and unexpected skips.
+ """
+def _ignore_deprecated_imports(ignore=True):
+ """Context manager to suppress package and module deprecation
+ warnings when importing them.
+ If ignore is False, this context manager has no effect."""
+ if ignore:
+ with warnings.catch_warnings():
+ warnings.filterwarnings("ignore", ".+ (module|package)",
+ DeprecationWarning)
+ yield
+ else:
+ yield
+def import_module(name, deprecated=False):
+ """Import and return the module to be tested, raising SkipTest if
+ it is not available.
+ If deprecated is True, any module or package deprecation messages
+ will be suppressed."""
+ with _ignore_deprecated_imports(deprecated):
+ try:
+ return importlib.import_module(name)
+ except ImportError as msg:
+ raise unittest.SkipTest(str(msg))
+def _save_and_remove_module(name, orig_modules):
+ """Helper function to save and remove a module from sys.modules
+ Raise ImportError if the module can't be imported."""
+ # try to import the module and raise an error if it can't be imported
+ if name not in sys.modules:
+ __import__(name)
+ del sys.modules[name]
+ for modname in list(sys.modules):
+ if modname == name or modname.startswith(name + '.'):
+ orig_modules[modname] = sys.modules[modname]
+ del sys.modules[modname]
+def _save_and_block_module(name, orig_modules):
+ """Helper function to save and block a module in sys.modules
+ Return True if the module was in sys.modules, False otherwise."""
+ saved = True
+ try:
+ orig_modules[name] = sys.modules[name]
+ except KeyError:
+ saved = False
+ sys.modules[name] = None
+ return saved
+def anticipate_failure(condition):
+ """Decorator to mark a test that is known to be broken in some cases
+ Any use of this decorator should have a comment identifying the
+ associated tracker issue.
+ """
+ if condition:
+ return unittest.expectedFailure
+ return lambda f: f
+def import_fresh_module(name, fresh=(), blocked=(), deprecated=False):
+ """Imports and returns a module, deliberately bypassing the sys.modules cache
+ and importing a fresh copy of the module. Once the import is complete,
+ the sys.modules cache is restored to its original state.
+ Modules named in fresh are also imported anew if needed by the import.
+ If one of these modules can't be imported, None is returned.
+ Importing of modules named in blocked is prevented while the fresh import
+ takes place.
+ If deprecated is True, any module or package deprecation messages
+ will be suppressed."""
+ # NOTE: test_heapq, test_json and test_warnings include extra sanity checks
+ # to make sure that this utility function is working as expected
+ with _ignore_deprecated_imports(deprecated):
+ # Keep track of modules saved for later restoration as well
+ # as those which just need a blocking entry removed
+ orig_modules = {}
+ names_to_remove = []
+ _save_and_remove_module(name, orig_modules)
+ try:
+ for fresh_name in fresh:
+ _save_and_remove_module(fresh_name, orig_modules)
+ for blocked_name in blocked:
+ if not _save_and_block_module(blocked_name, orig_modules):
+ names_to_remove.append(blocked_name)
+ fresh_module = importlib.import_module(name)
+ except ImportError:
+ fresh_module = None
+ finally:
+ for orig_name, module in orig_modules.items():
+ sys.modules[orig_name] = module
+ for name_to_remove in names_to_remove:
+ del sys.modules[name_to_remove]
+ return fresh_module
+def get_attribute(obj, name):
+ """Get an attribute, raising SkipTest if AttributeError is raised."""
+ try:
+ attribute = getattr(obj, name)
+ except AttributeError:
+ raise unittest.SkipTest("object %r has no attribute %r" % (obj, name))
+ else:
+ return attribute
+verbose = 1 # Flag set to 0 by
+use_resources = None # Flag set to [] by
+max_memuse = 0 # Disable bigmem tests (they will still be run with
+ # small sizes, to make sure they work.)
+real_max_memuse = 0
+failfast = False
+match_tests = None
+# _original_stdout is meant to hold stdout at the time regrtest began.
+# This may be "the real" stdout, or IDLE's emulation of stdout, or whatever.
+# The point is to have some flavor of stdout the user can actually see.
+_original_stdout = None
+def record_original_stdout(stdout):
+ global _original_stdout
+ _original_stdout = stdout
+def get_original_stdout():
+ return _original_stdout or sys.stdout
+def unload(name):
+ try:
+ del sys.modules[name]
+ except KeyError:
+ pass
+def unlink(filename):
+ try:
+ os.unlink(filename)
+ except OSError as error:
+ # The filename need not exist.
+ if error.errno not in (errno.ENOENT, errno.ENOTDIR):
+ raise
+def rmtree(path):
+ try:
+ shutil.rmtree(path)
+ except OSError as error:
+ if error.errno != errno.ENOENT:
+ raise
+def make_legacy_pyc(source):
+ """Move a PEP 3147 pyc/pyo file to its legacy pyc/pyo location.
+ The choice of .pyc or .pyo extension is done based on the __debug__ flag
+ value.
+ :param source: The file system path to the source file. The source file
+ does not need to exist, however the PEP 3147 pyc file must exist.
+ :return: The file system path to the legacy pyc file.
+ """
+ pyc_file = imp.cache_from_source(source)
+ up_one = os.path.dirname(os.path.abspath(source))
+ legacy_pyc = os.path.join(up_one, source + ('c' if __debug__ else 'o'))
+ os.rename(pyc_file, legacy_pyc)
+ return legacy_pyc
+def forget(modname):
+ """'Forget' a module was ever imported.
+ This removes the module from sys.modules and deletes any PEP 3147 or
+ legacy .pyc and .pyo files.
+ """
+ unload(modname)
+ for dirname in sys.path:
+ source = os.path.join(dirname, modname + '.py')
+ # It doesn't matter if they exist or not, unlink all possible
+ # combinations of PEP 3147 and legacy pyc and pyo files.
+ unlink(source + 'c')
+ unlink(source + 'o')
+ unlink(imp.cache_from_source(source, debug_override=True))
+ unlink(imp.cache_from_source(source, debug_override=False))
+# On some platforms, should not run gui test even if it is allowed
+# in `use_resources'.
+if sys.platform.startswith('win'):
+ import ctypes
+ import ctypes.wintypes
+ def _is_gui_available():
+ WSF_VISIBLE = 0x0001
+ class USEROBJECTFLAGS(ctypes.Structure):
+ _fields_ = [("fInherit", ctypes.wintypes.BOOL),
+ ("fReserved", ctypes.wintypes.BOOL),
+ ("dwFlags", ctypes.wintypes.DWORD)]
+ dll = ctypes.windll.user32
+ h = dll.GetProcessWindowStation()
+ if not h:
+ raise ctypes.WinError()
+ needed = ctypes.wintypes.DWORD()
+ res = dll.GetUserObjectInformationW(h,
+ ctypes.byref(uof),
+ ctypes.sizeof(uof),
+ ctypes.byref(needed))
+ if not res:
+ raise ctypes.WinError()
+ return bool(uof.dwFlags & WSF_VISIBLE)
+ def _is_gui_available():
+ return True
+def is_resource_enabled(resource):
+ """Test whether a resource is enabled. Known resources are set by
+ return use_resources is not None and resource in use_resources
+def requires(resource, msg=None):
+ """Raise ResourceDenied if the specified resource is not available.
+ If the caller's module is __main__ then automatically return True. The
+ possibility of False being returned occurs when is
+ executing.
+ """
+ if resource == 'gui' and not _is_gui_available():
+ raise unittest.SkipTest("Cannot use the 'gui' resource")
+ # see if the caller's module is __main__ - if so, treat as if
+ # the resource was set
+ if sys._getframe(1).f_globals.get("__name__") == "__main__":
+ return
+ if not is_resource_enabled(resource):
+ if msg is None:
+ msg = "Use of the %r resource not enabled" % resource
+ raise ResourceDenied(msg)
+def _requires_unix_version(sysname, min_version):
+ """Decorator raising SkipTest if the OS is `sysname` and the version is less
+ than `min_version`.
+ For example, @_requires_unix_version('FreeBSD', (7, 2)) raises SkipTest if
+ the FreeBSD version is less than 7.2.
+ """
+ def decorator(func):
+ @functools.wraps(func)
+ def wrapper(*args, **kw):
+ if platform.system() == sysname:
+ version_txt = platform.release().split('-', 1)[0]
+ try:
+ version = tuple(map(int, version_txt.split('.')))
+ except ValueError:
+ pass
+ else:
+ if version < min_version:
+ min_version_txt = '.'.join(map(str, min_version))
+ raise unittest.SkipTest(
+ "%s version %s or higher required, not %s"
+ % (sysname, min_version_txt, version_txt))
+ return wrapper
+ return decorator
+def requires_freebsd_version(*min_version):
+ """Decorator raising SkipTest if the OS is FreeBSD and the FreeBSD version is
+ less than `min_version`.
+ For example, @requires_freebsd_version(7, 2) raises SkipTest if the FreeBSD
+ version is less than 7.2.
+ """
+ return _requires_unix_version('FreeBSD', min_version)
+def requires_linux_version(*min_version):
+ """Decorator raising SkipTest if the OS is Linux and the Linux version is
+ less than `min_version`.
+ For example, @requires_linux_version(2, 6, 32) raises SkipTest if the Linux
+ version is less than 2.6.32.
+ """
+ return _requires_unix_version('Linux', min_version)
+def requires_mac_ver(*min_version):
+ """Decorator raising SkipTest if the OS is Mac OS X and the OS X
+ version if less than min_version.
+ For example, @requires_mac_ver(10, 5) raises SkipTest if the OS X version
+ is lesser than 10.5.
+ """
+ def decorator(func):
+ @functools.wraps(func)
+ def wrapper(*args, **kw):
+ if sys.platform == 'darwin':
+ version_txt = platform.mac_ver()[0]
+ try:
+ version = tuple(map(int, version_txt.split('.')))
+ except ValueError:
+ pass
+ else:
+ if version < min_version:
+ min_version_txt = '.'.join(map(str, min_version))
+ raise unittest.SkipTest(
+ "Mac OS X %s or higher required, not %s"
+ % (min_version_txt, version_txt))
+ return func(*args, **kw)
+ wrapper.min_version = min_version
+ return wrapper
+ return decorator
+HOST = 'localhost'
+def find_unused_port(family=socket.AF_INET, socktype=socket.SOCK_STREAM):
+ """Returns an unused port that should be suitable for binding. This is
+ achieved by creating a temporary socket with the same family and type as
+ the 'sock' parameter (default is AF_INET, SOCK_STREAM), and binding it to
+ the specified host address (defaults to with the port set to 0,
+ eliciting an unused ephemeral port from the OS. The temporary socket is
+ then closed and deleted, and the ephemeral port is returned.
+ Either this method or bind_port() should be used for any tests where a
+ server socket needs to be bound to a particular port for the duration of
+ the test. Which one to use depends on whether the calling code is creating
+ a python socket, or if an unused port needs to be provided in a constructor
+ or passed to an external program (i.e. the -accept argument to openssl's
+ s_server mode). Always prefer bind_port() over find_unused_port() where
+ possible. Hard coded ports should *NEVER* be used. As soon as a server
+ socket is bound to a hard coded port, the ability to run multiple instances
+ of the test simultaneously on the same host is compromised, which makes the
+ test a ticking time bomb in a buildbot environment. On Unix buildbots, this
+ may simply manifest as a failed test, which can be recovered from without
+ intervention in most cases, but on Windows, the entire python process can
+ completely and utterly wedge, requiring someone to log in to the buildbot
+ and manually kill the affected process.
+ (This is easy to reproduce on Windows, unfortunately, and can be traced to
+ the SO_REUSEADDR socket option having different semantics on Windows versus
+ Unix/Linux. On Unix, you can't have two AF_INET SOCK_STREAM sockets bind,
+ listen and then accept connections on identical host/ports. An EADDRINUSE
+ socket.error will be raised at some point (depending on the platform and
+ the order bind and listen were called on each socket).
+ However, on Windows, if SO_REUSEADDR is set on the sockets, no EADDRINUSE
+ will ever be raised when attempting to bind two identical host/ports. When
+ accept() is called on each socket, the second caller's process will steal
+ the port from the first caller, leaving them both in an awkwardly wedged
+ state where they'll no longer respond to any signals or graceful kills, and
+ must be forcibly killed via OpenProcess()/TerminateProcess().
+ The solution on Windows is to use the SO_EXCLUSIVEADDRUSE socket option
+ instead of SO_REUSEADDR, which effectively affords the same semantics as
+ SO_REUSEADDR on Unix. Given the propensity of Unix developers in the Open
+ Source world compared to Windows ones, this is a common mistake. A quick
+ look over OpenSSL's 0.9.8g source shows that they use SO_REUSEADDR when
+ openssl.exe is called with the 's_server' option, for example. See
+ for more info. The following site also
+ has a very thorough description about the implications of both REUSEADDR
+ and EXCLUSIVEADDRUSE on Windows:
+ XXX: although this approach is a vast improvement on previous attempts to
+ elicit unused ports, it rests heavily on the assumption that the ephemeral
+ port returned to us by the OS won't immediately be dished back out to some
+ other process when we close and delete our temporary socket but before our
+ calling code has a chance to bind the returned port. We can deal with this
+ issue if/when we come across it.
+ """
+ tempsock = socket.socket(family, socktype)
+ port = bind_port(tempsock)
+ tempsock.close()
+ del tempsock
+ return port
+def bind_port(sock, host=HOST):
+ """Bind the socket to a free port and return the port number. Relies on
+ ephemeral ports in order to ensure we are using an unbound port. This is
+ important as many tests may be running simultaneously, especially in a
+ buildbot environment. This method raises an exception if the
+ is AF_INET and sock.type is SOCK_STREAM, *and* the socket has SO_REUSEADDR
+ or SO_REUSEPORT set on it. Tests should *never* set these socket options
+ for TCP/IP sockets. The only case for setting these options is testing
+ multicasting via multiple UDP sockets.
+ Additionally, if the SO_EXCLUSIVEADDRUSE socket option is available (i.e.
+ on Windows), it will be set on the socket. This will prevent anyone else
+ from bind()'ing to our host/port for the duration of the test.
+ """
+ if == socket.AF_INET and sock.type == socket.SOCK_STREAM:
+ if hasattr(socket, 'SO_REUSEADDR'):
+ if sock.getsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR) == 1:
+ raise TestFailed("tests should never set the SO_REUSEADDR " \
+ "socket option on TCP/IP sockets!")
+ if hasattr(socket, 'SO_REUSEPORT'):
+ if sock.getsockopt(socket.SOL_SOCKET, socket.SO_REUSEPORT) == 1:
+ raise TestFailed("tests should never set the SO_REUSEPORT " \
+ "socket option on TCP/IP sockets!")
+ if hasattr(socket, 'SO_EXCLUSIVEADDRUSE'):
+ sock.setsockopt(socket.SOL_SOCKET, socket.SO_EXCLUSIVEADDRUSE, 1)
+ sock.bind((host, 0))
+ port = sock.getsockname()[1]
+ return port
+def _is_ipv6_enabled():
+ """Check whether IPv6 is enabled on this host."""
+ if socket.has_ipv6:
+ try:
+ sock = socket.socket(socket.AF_INET6, socket.SOCK_STREAM)
+ sock.bind(('::1', 0))
+ except (socket.error, socket.gaierror):
+ pass
+ else:
+ sock.close()
+ return True
+ return False
+IPV6_ENABLED = _is_ipv6_enabled()
+# A constant likely larger than the underlying OS pipe buffer size.
+# Windows limit seems to be around 512B, and most Unix kernels have a 64K pipe
+# buffer size: take 1M to be sure.
+PIPE_MAX_SIZE = 1024 * 1024
+# decorator for skipping tests on non-IEEE 754 platforms
+requires_IEEE_754 = unittest.skipUnless(
+ float.__getformat__("double").startswith("IEEE"),
+ "test requires IEEE 754 doubles")
+requires_zlib = unittest.skipUnless(zlib, 'requires zlib')
+is_jython = sys.platform.startswith('java')
+# Filename used for testing
+if == 'java':
+ # Jython disallows @ in module names
+ TESTFN = '$test'
+ TESTFN = '@test'
+# Disambiguate TESTFN for parallel testing, while letting it remain a valid
+# module name.
+TESTFN = "{}_{}_tmp".format(TESTFN, os.getpid())
+# TESTFN_UNICODE is a non-ascii filename
+TESTFN_UNICODE = TESTFN + "-\xe0\xf2\u0258\u0141\u011f"
+if sys.platform == 'darwin':
+ # In Mac OS X's VFS API file names are, by definition, canonically
+ # decomposed Unicode, encoded using UTF-8. See QA1173:
+ #
+ import unicodedata
+ TESTFN_UNICODE = unicodedata.normalize('NFD', TESTFN_UNICODE)
+TESTFN_ENCODING = sys.getfilesystemencoding()
+# TESTFN_UNENCODABLE is a filename (str type) that should *not* be able to be
+# encoded by the filesystem encoding (in strict mode). It can be None if we
+# cannot generate such filename.
+if in ('nt', 'ce'):
+ # skip win32s (0) or Windows 9x/ME (1)
+ if sys.getwindowsversion().platform >= 2:
+ # Different kinds of characters from various languages to minimize the
+ # probability that the whole name is encodable to MBCS (issue #9819)
+ TESTFN_UNENCODABLE = TESTFN + "-\u5171\u0141\u2661\u0363\uDC80"
+ try:
+ except UnicodeEncodeError:
+ pass
+ else:
+ print('WARNING: The filename %r CAN be encoded by the filesystem encoding (%s). '
+ 'Unicode filename tests may not be effective'
+# Mac OS X denies unencodable filenames (invalid utf-8)
+elif sys.platform != 'darwin':
+ try:
+ # ascii and utf-8 cannot encode the byte 0xff
+ b'\xff'.decode(TESTFN_ENCODING)
+ except UnicodeDecodeError:
+ # 0xff will be encoded using the surrogate character u+DCFF
+ + b'-\xff'.decode(TESTFN_ENCODING, 'surrogateescape')
+ else:
+ # File system encoding (eg. ISO-8859-* encodings) can encode
+ # the byte 0xff. Skip some unicode filename tests.
+ pass
+# Save the initial cwd
+SAVEDCWD = os.getcwd()
+def temp_cwd(name='tempcwd', quiet=False, path=None):
+ """
+ Context manager that temporarily changes the CWD.
+ An existing path may be provided as *path*, in which case this
+ function makes no changes to the file system.
+ Otherwise, the new CWD is created in the current directory and it's
+ named *name*. If *quiet* is False (default) and it's not possible to
+ create or change the CWD, an error is raised. If it's True, only a
+ warning is raised and the original CWD is used.
+ """
+ saved_dir = os.getcwd()
+ is_temporary = False
+ if path is None:
+ path = name
+ try:
+ os.mkdir(name)
+ is_temporary = True
+ except OSError:
+ if not quiet:
+ raise
+ warnings.warn('tests may fail, unable to create temp CWD ' + name,
+ RuntimeWarning, stacklevel=3)
+ try:
+ os.chdir(path)
+ except OSError:
+ if not quiet:
+ raise
+ warnings.warn('tests may fail, unable to change the CWD to ' + name,
+ RuntimeWarning, stacklevel=3)
+ try:
+ yield os.getcwd()
+ finally:
+ os.chdir(saved_dir)
+ if is_temporary:
+ rmtree(name)
+if hasattr(os, "umask"):
+ @contextlib.contextmanager
+ def temp_umask(umask):
+ """Context manager that temporarily sets the process umask."""
+ oldmask = os.umask(umask)
+ try:
+ yield
+ finally:
+ os.umask(oldmask)
+def findfile(file, here=__file__, subdir=None):
+ """Try to find a file on sys.path and the working directory. If it is not
+ found the argument passed to the function is returned (this does not
+ necessarily signal failure; could still be the legitimate path)."""
+ if os.path.isabs(file):
+ return file
+ if subdir is not None:
+ file = os.path.join(subdir, file)
+ path = sys.path
+ path = [os.path.dirname(here)] + path
+ for dn in path:
+ fn = os.path.join(dn, file)
+ if os.path.exists(fn): return fn
+ return file
+def create_empty_file(filename):
+ """Create an empty file. If the file already exists, truncate it."""
+ fd =, os.O_WRONLY | os.O_CREAT | os.O_TRUNC)
+ os.close(fd)
+def sortdict(dict):
+ "Like repr(dict), but in sorted order."
+ items = sorted(dict.items())
+ reprpairs = ["%r: %r" % pair for pair in items]
+ withcommas = ", ".join(reprpairs)
+ return "{%s}" % withcommas
+def make_bad_fd():
+ """
+ Create an invalid file descriptor by opening and closing a file and return
+ its fd.
+ """
+ file = open(TESTFN, "wb")
+ try:
+ return file.fileno()
+ finally:
+ file.close()
+ unlink(TESTFN)
+def check_syntax_error(testcase, statement):
+ testcase.assertRaises(SyntaxError, compile, statement,
+ '<test string>', 'exec')
+def open_urlresource(url, *args, **kw):
+ import urllib.request, urllib.parse
+ check = kw.pop('check', None)
+ filename = urllib.parse.urlparse(url)[2].split('/')[-1] # '/': it's URL!
+ fn = os.path.join(os.path.dirname(__file__), "data", filename)
+ def check_valid_file(fn):
+ f = open(fn, *args, **kw)
+ if check is None:
+ return f
+ elif check(f):
+ return f
+ f.close()
+ if os.path.exists(fn):
+ f = check_valid_file(fn)
+ if f is not None:
+ return f
+ unlink(fn)
+ # Verify the requirement before downloading the file
+ requires('urlfetch')
+ print('\tfetching %s ...' % url, file=get_original_stdout())
+ f = urllib.request.urlopen(url, timeout=15)
+ try:
+ with open(fn, "wb") as out:
+ s =
+ while s:
+ out.write(s)
+ s =
+ finally:
+ f.close()
+ f = check_valid_file(fn)
+ if f is not None:
+ return f
+ raise TestFailed('invalid resource %r' % fn)
+class WarningsRecorder(object):
+ """Convenience wrapper for the warnings list returned on
+ entry to the warnings.catch_warnings() context manager.
+ """
+ def __init__(self, warnings_list):
+ self._warnings = warnings_list
+ self._last = 0
+ def __getattr__(self, attr):
+ if len(self._warnings) > self._last:
+ return getattr(self._warnings[-1], attr)
+ elif attr in warnings.WarningMessage._WARNING_DETAILS:
+ return None
+ raise AttributeError("%r has no attribute %r" % (self, attr))
+ @property
+ def warnings(self):
+ return self._warnings[self._last:]
+ def reset(self):
+ self._last = len(self._warnings)
+def _filterwarnings(filters, quiet=False):
+ """Catch the warnings, then check if all the expected
+ warnings have been raised and re-raise unexpected warnings.
+ If 'quiet' is True, only re-raise the unexpected warnings.
+ """
+ # Clear the warning registry of the calling module
+ # in order to re-raise the warnings.
+ frame = sys._getframe(2)
+ registry = frame.f_globals.get('__warningregistry__')
+ if registry:
+ registry.clear()
+ with warnings.catch_warnings(record=True) as w:
+ # Set filter "always" to record all warnings. Because
+ # test_warnings swap the module, we need to look up in
+ # the sys.modules dictionary.
+ sys.modules['warnings'].simplefilter("always")
+ yield WarningsRecorder(w)
+ # Filter the recorded warnings
+ reraise = list(w)
+ missing = []
+ for msg, cat in filters:
+ seen = False
+ for w in reraise[:]:
+ warning = w.message
+ # Filter out the matching messages
+ if (re.match(msg, str(warning), re.I) and
+ issubclass(warning.__class__, cat)):
+ seen = True
+ reraise.remove(w)
+ if not seen and not quiet:
+ # This filter caught nothing
+ missing.append((msg, cat.__name__))
+ if reraise:
+ raise AssertionError("unhandled warning %s" % reraise[0])
+ if missing:
+ raise AssertionError("filter (%r, %s) did not catch any warning" %
+ missing[0])
+def check_warnings(*filters, **kwargs):
+ """Context manager to silence warnings.
+ Accept 2-tuples as positional arguments:
+ ("message regexp", WarningCategory)
+ Optional argument:
+ - if 'quiet' is True, it does not fail if a filter catches nothing
+ (default True without argument,
+ default False if some filters are defined)
+ Without argument, it defaults to:
+ check_warnings(("", Warning), quiet=True)
+ """
+ quiet = kwargs.get('quiet')
+ if not filters:
+ filters = (("", Warning),)
+ # Preserve backward compatibility
+ if quiet is None:
+ quiet = True
+ return _filterwarnings(filters, quiet)
+class CleanImport(object):
+ """Context manager to force import to return a new module reference.
+ This is useful for testing module-level behaviours, such as
+ the emission of a DeprecationWarning on import.
+ Use like this:
+ with CleanImport("foo"):
+ importlib.import_module("foo") # new reference
+ """
+ def __init__(self, *module_names):
+ self.original_modules = sys.modules.copy()
+ for module_name in module_names:
+ if module_name in sys.modules:
+ module = sys.modules[module_name]
+ # It is possible that module_name is just an alias for
+ # another module (e.g. stub for modules renamed in 3.x).
+ # In that case, we also need delete the real module to clear
+ # the import cache.
+ if module.__name__ != module_name:
+ del sys.modules[module.__name__]
+ del sys.modules[module_name]
+ def __enter__(self):
+ return self
+ def __exit__(self, *ignore_exc):
+ sys.modules.update(self.original_modules)
+class EnvironmentVarGuard(
+ """Class to help protect the environment variable properly. Can be used as
+ a context manager."""
+ def __init__(self):
+ self._environ = os.environ
+ self._changed = {}
+ def __getitem__(self, envvar):
+ return self._environ[envvar]
+ def __setitem__(self, envvar, value):
+ # Remember the initial value on the first access
+ if envvar not in self._changed:
+ self._changed[envvar] = self._environ.get(envvar)
+ self._environ[envvar] = value
+ def __delitem__(self, envvar):
+ # Remember the initial value on the first access
+ if envvar not in self._changed:
+ self._changed[envvar] = self._environ.get(envvar)
+ if envvar in self._environ:
+ del self._environ[envvar]
+ def keys(self):
+ return self._environ.keys()
+ def __iter__(self):
+ return iter(self._environ)
+ def __len__(self):
+ return len(self._environ)
+ def set(self, envvar, value):
+ self[envvar] = value
+ def unset(self, envvar):
+ del self[envvar]
+ def __enter__(self):
+ return self
+ def __exit__(self, *ignore_exc):
+ for (k, v) in self._changed.items():
+ if v is None:
+ if k in self._environ:
+ del self._environ[k]
+ else:
+ self._environ[k] = v
+ os.environ = self._environ
+class DirsOnSysPath(object):
+ """Context manager to temporarily add directories to sys.path.
+ This makes a copy of sys.path, appends any directories given
+ as positional arguments, then reverts sys.path to the copied
+ settings when the context ends.
+ Note that *all* sys.path modifications in the body of the
+ context manager, including replacement of the object,
+ will be reverted at the end of the block.
+ """
+ def __init__(self, *paths):
+ self.original_value = sys.path[:]
+ self.original_object = sys.path
+ sys.path.extend(paths)
+ def __enter__(self):
+ return self
+ def __exit__(self, *ignore_exc):
+ sys.path = self.original_object
+ sys.path[:] = self.original_value
+class TransientResource(object):
+ """Raise ResourceDenied if an exception is raised while the context manager
+ is in effect that matches the specified exception and attributes."""
+ def __init__(self, exc, **kwargs):
+ self.exc = exc
+ self.attrs = kwargs
+ def __enter__(self):
+ return self
+ def __exit__(self, type_=None, value=None, traceback=None):
+ """If type_ is a subclass of self.exc and value has attributes matching
+ self.attrs, raise ResourceDenied. Otherwise let the exception
+ propagate (if any)."""
+ if type_ is not None and issubclass(self.exc, type_):
+ for attr, attr_value in self.attrs.items():
+ if not hasattr(value, attr):
+ break
+ if getattr(value, attr) != attr_value:
+ break
+ else:
+ raise ResourceDenied("an optional resource is not available")
+# Context managers that raise ResourceDenied when various issues
+# with the Internet connection manifest themselves as exceptions.
+# XXX deprecate these and use transient_internet() instead
+time_out = TransientResource(IOError, errno=errno.ETIMEDOUT)
+socket_peer_reset = TransientResource(socket.error, errno=errno.ECONNRESET)
+ioerror_peer_reset = TransientResource(IOError, errno=errno.ECONNRESET)
+def transient_internet(resource_name, *, timeout=30.0, errnos=()):
+ """Return a context manager that raises ResourceDenied when various issues
+ with the Internet connection manifest themselves as exceptions."""
+ default_errnos = [
+ ('ECONNREFUSED', 111),
+ ('ECONNRESET', 104),
+ ('EHOSTUNREACH', 113),
+ ('ENETUNREACH', 101),
+ ('ETIMEDOUT', 110),
+ ]
+ default_gai_errnos = [
+ ('EAI_AGAIN', -3),
+ ('EAI_FAIL', -4),
+ ('EAI_NONAME', -2),
+ ('EAI_NODATA', -5),
+ # Encountered when trying to resolve IPv6-only hostnames
+ ('WSANO_DATA', 11004),
+ ]
+ denied = ResourceDenied("Resource %r is not available" % resource_name)
+ captured_errnos = errnos
+ gai_errnos = []
+ if not captured_errnos:
+ captured_errnos = [getattr(errno, name, num)
+ for (name, num) in default_errnos]
+ gai_errnos = [getattr(socket, name, num)
+ for (name, num) in default_gai_errnos]
+ def filter_error(err):
+ n = getattr(err, 'errno', None)
+ if (isinstance(err, socket.timeout) or
+ (isinstance(err, socket.gaierror) and n in gai_errnos) or
+ n in captured_errnos):
+ if not verbose:
+ sys.stderr.write(denied.args[0] + "\n")
+ raise denied from err
+ old_timeout = socket.getdefaulttimeout()
+ try:
+ if timeout is not None:
+ socket.setdefaulttimeout(timeout)
+ yield
+ except IOError as err:
+ # urllib can wrap original socket errors multiple times (!), we must
+ # unwrap to get at the original error.
+ while True:
+ a = err.args
+ if len(a) >= 1 and isinstance(a[0], IOError):
+ err = a[0]
+ # The error can also be wrapped as args[1]:
+ # except socket.error as msg:
+ # raise IOError('socket error', msg).with_traceback(sys.exc_info()[2])
+ elif len(a) >= 2 and isinstance(a[1], IOError):
+ err = a[1]
+ else:
+ break
+ filter_error(err)
+ raise
+ # XXX should we catch generic exceptions and look for their
+ # __cause__ or __context__?
+ finally:
+ socket.setdefaulttimeout(old_timeout)
+def captured_output(stream_name):
+ """Return a context manager used by captured_stdout/stdin/stderr
+ that temporarily replaces the sys stream *stream_name* with a StringIO."""
+ import io
+ orig_stdout = getattr(sys, stream_name)
+ setattr(sys, stream_name, io.StringIO())
+ try:
+ yield getattr(sys, stream_name)
+ finally:
+ setattr(sys, stream_name, orig_stdout)
+def captured_stdout():
+ """Capture the output of sys.stdout:
+ with captured_stdout() as s:
+ print("hello")
+ self.assertEqual(s.getvalue(), "hello")
+ """
+ return captured_output("stdout")
+def captured_stderr():
+ return captured_output("stderr")
+def captured_stdin():
+ return captured_output("stdin")
+def gc_collect():
+ """Force as many objects as possible to be collected.
+ In non-CPython implementations of Python, this is needed because timely
+ deallocation is not guaranteed by the garbage collector. (Even in CPython
+ this can be the case in case of reference cycles.) This means that __del__
+ methods may be called later than expected and weakrefs may remain alive for
+ longer than expected. This function tries its best to force all garbage
+ objects to disappear.
+ """
+ gc.collect()
+ if is_jython:
+ time.sleep(0.1)
+ gc.collect()
+ gc.collect()
+def disable_gc():
+ have_gc = gc.isenabled()
+ gc.disable()
+ try:
+ yield
+ finally:
+ if have_gc:
+ gc.enable()
+def python_is_optimized():
+ """Find if Python was built with optimizations."""
+ cflags = sysconfig.get_config_var('PY_CFLAGS') or ''
+ final_opt = ""
+ for opt in cflags.split():
+ if opt.startswith('-O'):
+ final_opt = opt
+ return final_opt != '' and final_opt != '-O0'
+# Decorator for running a function in a different locale, correctly resetting
+# it afterwards.
+def run_with_locale(catstr, *locales):
+ def decorator(func):
+ def inner(*args, **kwds):
+ try:
+ import locale
+ category = getattr(locale, catstr)
+ orig_locale = locale.setlocale(category)
+ except AttributeError:
+ # if the test author gives us an invalid category string
+ raise
+ except:
+ # cannot retrieve original locale, so do nothing
+ locale = orig_locale = None
+ else:
+ for loc in locales:
+ try:
+ locale.setlocale(category, loc)
+ break
+ except:
+ pass
+ # now run the function, resetting the locale on exceptions
+ try:
+ return func(*args, **kwds)
+ finally:
+ if locale and orig_locale:
+ locale.setlocale(category, orig_locale)
+ inner.__name__ = func.__name__
+ inner.__doc__ = func.__doc__
+ return inner
+ return decorator
+# Big-memory-test support. Separate from 'resources' because memory use
+# should be configurable.
+# Some handy shorthands. Note that these are used for byte-limits as well
+# as size-limits, in the various bigmem tests
+_1M = 1024*1024
+_1G = 1024 * _1M
+_2G = 2 * _1G
+_4G = 4 * _1G
+MAX_Py_ssize_t = sys.maxsize
+def set_memlimit(limit):
+ global max_memuse
+ global real_max_memuse
+ sizes = {
+ 'k': 1024,
+ 'm': _1M,
+ 'g': _1G,
+ 't': 1024*_1G,
+ }
+ m = re.match(r'(\d+(\.\d+)?) (K|M|G|T)b?$', limit,
+ if m is None:
+ raise ValueError('Invalid memory limit %r' % (limit,))
+ memlimit = int(float( * sizes[])
+ real_max_memuse = memlimit
+ if memlimit > MAX_Py_ssize_t:
+ memlimit = MAX_Py_ssize_t
+ if memlimit < _2G - 1:
+ raise ValueError('Memory limit %r too low to be useful' % (limit,))
+ max_memuse = memlimit
+class _MemoryWatchdog:
+ """An object which periodically watches the process' memory consumption
+ and prints it out.
+ """
+ def __init__(self):
+ self.procfile = '/proc/{pid}/statm'.format(pid=os.getpid())
+ self.started = False
+ self.thread = None
+ try:
+ self.page_size = os.sysconf('SC_PAGESIZE')
+ except (ValueError, AttributeError):
+ try:
+ self.page_size = os.sysconf('SC_PAGE_SIZE')
+ except (ValueError, AttributeError):
+ self.page_size = 4096
+ def consumer(self, fd):
+ HEADER = "l"
+ header_size = struct.calcsize(HEADER)
+ try:
+ while True:
+ header =, header_size)
+ if len(header) < header_size:
+ # Pipe closed on other end
+ break
+ data_len, = struct.unpack(HEADER, header)
+ data =, data_len)
+ statm = data.decode('ascii')
+ data = int(statm.split()[5])
+ print(" ... process data size: {data:.1f}G"
+ .format(data=data * self.page_size / (1024 ** 3)))
+ finally:
+ os.close(fd)
+ def start(self):
+ if not faulthandler or not hasattr(faulthandler, '_file_watchdog'):
+ return
+ try:
+ rfd =, os.O_RDONLY)
+ except OSError as e:
+ warnings.warn('/proc not available for stats: {}'.format(e),
+ RuntimeWarning)
+ sys.stderr.flush()
+ return
+ pipe_fd, wfd = os.pipe()
+ # _file_watchdog() doesn't take the GIL in its child thread, and
+ # therefore collects statistics timely
+ faulthandler._file_watchdog(rfd, wfd, 1.0)
+ self.started = True
+ self.thread = threading.Thread(target=self.consumer, args=(pipe_fd,))
+ self.thread.daemon = True
+ self.thread.start()
+ def stop(self):
+ if not self.started:
+ return
+ faulthandler._cancel_file_watchdog()
+ self.thread.join()
+def bigmemtest(size, memuse, dry_run=True):
+ """Decorator for bigmem tests.
+ 'minsize' is the minimum useful size for the test (in arbitrary,
+ test-interpreted units.) 'memuse' is the number of 'bytes per size' for
+ the test, or a good estimate of it.
+ if 'dry_run' is False, it means the test doesn't support dummy runs
+ when -M is not specified.
+ """
+ def decorator(f):
+ def wrapper(self):
+ size = wrapper.size
+ memuse = wrapper.memuse
+ if not real_max_memuse:
+ maxsize = 5147
+ else:
+ maxsize = size
+ if ((real_max_memuse or not dry_run)
+ and real_max_memuse < maxsize * memuse):
+ raise unittest.SkipTest(
+ "not enough memory: %.1fG minimum needed"
+ % (size * memuse / (1024 ** 3)))
+ if real_max_memuse and verbose and faulthandler and threading:
+ print()
+ print(" ... expected peak memory use: {peak:.1f}G"
+ .format(peak=size * memuse / (1024 ** 3)))
+ watchdog = _MemoryWatchdog()
+ watchdog.start()
+ else:
+ watchdog = None
+ try:
+ return f(self, maxsize)
+ finally:
+ if watchdog:
+ watchdog.stop()
+ wrapper.size = size
+ wrapper.memuse = memuse
+ return wrapper
+ return decorator
+def bigaddrspacetest(f):
+ """Decorator for tests that fill the address space."""
+ def wrapper(self):
+ if max_memuse < MAX_Py_ssize_t:
+ if MAX_Py_ssize_t >= 2**63 - 1 and max_memuse >= 2**31:
+ raise unittest.SkipTest(
+ "not enough memory: try a 32-bit build instead")
+ else:
+ raise unittest.SkipTest(
+ "not enough memory: %.1fG minimum needed"
+ % (MAX_Py_ssize_t / (1024 ** 3)))
+ else:
+ return f(self)
+ return wrapper
+# unittest integration.
+class BasicTestRunner:
+ def run(self, test):
+ result = unittest.TestResult()
+ test(result)
+ return result
+def _id(obj):
+ return obj
+def requires_resource(resource):
+ if resource == 'gui' and not _is_gui_available():
+ return unittest.skip("resource 'gui' is not available")
+ if is_resource_enabled(resource):
+ return _id
+ else:
+ return unittest.skip("resource {0!r} is not enabled".format(resource))
+def cpython_only(test):
+ """
+ Decorator for tests only applicable on CPython.
+ """
+ return impl_detail(cpython=True)(test)
+def impl_detail(msg=None, **guards):
+ if check_impl_detail(**guards):
+ return _id
+ if msg is None:
+ guardnames, default = _parse_guards(guards)
+ if default:
+ msg = "implementation detail not available on {0}"
+ else:
+ msg = "implementation detail specific to {0}"
+ guardnames = sorted(guardnames.keys())
+ msg = msg.format(' or '.join(guardnames))
+ return unittest.skip(msg)
+def _parse_guards(guards):
+ # Returns a tuple ({platform_name: run_me}, default_value)
+ if not guards:
+ return ({'cpython': True}, False)
+ is_true = list(guards.values())[0]
+ assert list(guards.values()) == [is_true] * len(guards) # all True or all False
+ return (guards, not is_true)
+# Use the following check to guard CPython's implementation-specific tests --
+# or to run them only on the implementation(s) guarded by the arguments.
+def check_impl_detail(**guards):
+ """This function returns True or False depending on the host platform.
+ Examples:
+ if check_impl_detail(): # only on CPython (default)
+ if check_impl_detail(jython=True): # only on Jython
+ if check_impl_detail(cpython=False): # everywhere except on CPython
+ """
+ guards, default = _parse_guards(guards)
+ return guards.get(platform.python_implementation().lower(), default)
+def no_tracing(func):
+ """Decorator to temporarily turn off tracing for the duration of a test."""
+ if not hasattr(sys, 'gettrace'):
+ return func
+ else:
+ @functools.wraps(func)
+ def wrapper(*args, **kwargs):
+ original_trace = sys.gettrace()
+ try:
+ sys.settrace(None)
+ return func(*args, **kwargs)
+ finally:
+ sys.settrace(original_trace)
+ return wrapper
+def refcount_test(test):
+ """Decorator for tests which involve reference counting.
+ To start, the decorator does not run the test if is not run by CPython.
+ After that, any trace function is unset during the test to prevent
+ unexpected refcounts caused by the trace function.
+ """
+ return no_tracing(cpython_only(test))
+def _filter_suite(suite, pred):
+ """Recursively filter test cases in a suite based on a predicate."""
+ newtests = []
+ for test in suite._tests:
+ if isinstance(test, unittest.TestSuite):
+ _filter_suite(test, pred)
+ newtests.append(test)
+ else:
+ if pred(test):
+ newtests.append(test)
+ suite._tests = newtests
+def _run_suite(suite):
+ """Run tests from a unittest.TestSuite-derived class."""
+ if verbose:
+ runner = unittest.TextTestRunner(sys.stdout, verbosity=2,
+ failfast=failfast)
+ else:
+ runner = BasicTestRunner()
+ result =
+ if not result.wasSuccessful():
+ if len(result.errors) == 1 and not result.failures:
+ err = result.errors[0][1]
+ elif len(result.failures) == 1 and not result.errors:
+ err = result.failures[0][1]
+ else:
+ err = "multiple errors occurred"
+ if not verbose: err += "; run in verbose mode for details"
+ raise TestFailed(err)
+def run_unittest(*classes):
+ """Run tests from unittest.TestCase-derived classes."""
+ valid_types = (unittest.TestSuite, unittest.TestCase)
+ suite = unittest.TestSuite()
+ for cls in classes:
+ if isinstance(cls, str):
+ if cls in sys.modules:
+ suite.addTest(unittest.findTestCases(sys.modules[cls]))
+ else:
+ raise ValueError("str arguments must be keys in sys.modules")
+ elif isinstance(cls, valid_types):
+ suite.addTest(cls)
+ else:
+ suite.addTest(unittest.makeSuite(cls))
+ def case_pred(test):
+ if match_tests is None:
+ return True
+ for name in"."):
+ if fnmatch.fnmatchcase(name, match_tests):
+ return True
+ return False
+ _filter_suite(suite, case_pred)
+ _run_suite(suite)
+# doctest driver.
+def run_doctest(module, verbosity=None):
+ """Run doctest on the given module. Return (#failures, #tests).
+ If optional argument verbosity is not specified (or is None), pass
+ support's belief about verbosity on to doctest. Else doctest's
+ usual behavior is used (it searches sys.argv for -v).
+ """
+ import doctest
+ if verbosity is None:
+ verbosity = verbose
+ else:
+ verbosity = None
+ f, t = doctest.testmod(module, verbose=verbosity)
+ if f:
+ raise TestFailed("%d of %d doctests failed" % (f, t))
+ if verbose:
+ print('doctest (%s) ... %d tests with zero failures' %
+ (module.__name__, t))
+ return f, t
+# Support for saving and restoring the imported modules.
+def modules_setup():
+ return sys.modules.copy(),
+def modules_cleanup(oldmodules):
+ # Encoders/decoders are registered permanently within the internal
+ # codec cache. If we destroy the corresponding modules their
+ # globals will be set to None which will trip up the cached functions.
+ encodings = [(k, v) for k, v in sys.modules.items()
+ if k.startswith('encodings.')]
+ sys.modules.clear()
+ sys.modules.update(encodings)
+ # XXX: This kind of problem can affect more than just encodings. In particular
+ # extension modules (such as _ssl) don't cope with reloading properly.
+ # Really, test modules should be cleaning out the test specific modules they
+ # know they added (ala test_runpy) rather than relying on this function (as
+ # test_importhooks and test_pkg do currently).
+ # Implicitly imported *real* modules should be left alone (see issue 10556).
+ sys.modules.update(oldmodules)
+# Threading support to prevent reporting refleaks when running -R
+# NOTE: we use thread._count() rather than threading.enumerate() (or the
+# moral equivalent thereof) because a threading.Thread object is still alive
+# until its __bootstrap() method has returned, even after it has been
+# unregistered from the threading module.
+# thread._count(), on the other hand, only gets decremented *after* the
+# __bootstrap() method has returned, which gives us reliable reference counts
+# at the end of a test run.
+def threading_setup():
+ if _thread:
+ return _thread._count(), threading._dangling.copy()
+ else:
+ return 1, ()
+def threading_cleanup(*original_values):
+ if not _thread:
+ return
+ _MAX_COUNT = 10
+ for count in range(_MAX_COUNT):
+ values = _thread._count(), threading._dangling
+ if values == original_values:
+ break
+ time.sleep(0.1)
+ gc_collect()
+ # XXX print a warning in case of failure?
+def reap_threads(func):
+ """Use this function when threads are being used. This will
+ ensure that the threads are cleaned up even when the test fails.
+ If threading is unavailable this function does nothing.
+ """
+ if not _thread:
+ return func
+ @functools.wraps(func)
+ def decorator(*args):
+ key = threading_setup()
+ try:
+ return func(*args)
+ finally:
+ threading_cleanup(*key)
+ return decorator
+def reap_children():
+ """Use this function at the end of test_main() whenever sub-processes
+ are started. This will help ensure that no extra children (zombies)
+ stick around to hog resources and create problems when looking
+ for refleaks.
+ """
+ # Reap all our dead child processes so we don't leave zombies around.
+ # These hog resources and might be causing some of the buildbots to die.
+ if hasattr(os, 'waitpid'):
+ any_process = -1
+ while True:
+ try:
+ # This will raise an exception on Windows. That's ok.
+ pid, status = os.waitpid(any_process, os.WNOHANG)
+ if pid == 0:
+ break
+ except:
+ break
+def swap_attr(obj, attr, new_val):
+ """Temporary swap out an attribute with a new object.
+ Usage:
+ with swap_attr(obj, "attr", 5):
+ ...
+ This will set obj.attr to 5 for the duration of the with: block,
+ restoring the old value at the end of the block. If `attr` doesn't
+ exist on `obj`, it will be created and then deleted at the end of the
+ block.
+ """
+ if hasattr(obj, attr):
+ real_val = getattr(obj, attr)
+ setattr(obj, attr, new_val)
+ try:
+ yield
+ finally:
+ setattr(obj, attr, real_val)
+ else:
+ setattr(obj, attr, new_val)
+ try:
+ yield
+ finally:
+ delattr(obj, attr)
+def swap_item(obj, item, new_val):
+ """Temporary swap out an item with a new object.
+ Usage:
+ with swap_item(obj, "item", 5):
+ ...
+ This will set obj["item"] to 5 for the duration of the with: block,
+ restoring the old value at the end of the block. If `item` doesn't
+ exist on `obj`, it will be created and then deleted at the end of the
+ block.
+ """
+ if item in obj:
+ real_val = obj[item]
+ obj[item] = new_val
+ try:
+ yield
+ finally:
+ obj[item] = real_val
+ else:
+ obj[item] = new_val
+ try:
+ yield
+ finally:
+ del obj[item]
+def strip_python_stderr(stderr):
+ """Strip the stderr of a Python process from potential debug output
+ emitted by the interpreter.
+ This will typically be run on the result of the communicate() method
+ of a subprocess.Popen object.
+ """
+ stderr = re.sub(br"\[\d+ refs\]\r?\n?$", b"", stderr).strip()
+ return stderr
+def args_from_interpreter_flags():
+ """Return a list of command-line arguments reproducing the current
+ settings in sys.flags and sys.warnoptions."""
+ flag_opt_map = {
+ 'bytes_warning': 'b',
+ 'dont_write_bytecode': 'B',
+ 'ignore_environment': 'E',
+ 'no_user_site': 's',
+ 'no_site': 'S',
+ 'optimize': 'O',
+ 'verbose': 'v',
+ }
+ args = []
+ for flag, opt in flag_opt_map.items():
+ v = getattr(sys.flags, flag)
+ if v > 0:
+ args.append('-' + opt * v)
+ for opt in sys.warnoptions:
+ args.append('-W' + opt)
+ return args
+# Support for assertions about logging.
+class TestHandler(logging.handlers.BufferingHandler):
+ def __init__(self, matcher):
+ # BufferingHandler takes a "capacity" argument
+ # so as to know when to flush. As we're overriding
+ # shouldFlush anyway, we can set a capacity of zero.
+ # You can call flush() manually to clear out the
+ # buffer.
+ logging.handlers.BufferingHandler.__init__(self, 0)
+ self.matcher = matcher
+ def shouldFlush(self):
+ return False
+ def emit(self, record):
+ self.format(record)
+ self.buffer.append(record.__dict__)
+ def matches(self, **kwargs):
+ """
+ Look for a saved dict whose keys/values match the supplied arguments.
+ """
+ result = False
+ for d in self.buffer:
+ if self.matcher.matches(d, **kwargs):
+ result = True
+ break
+ return result
+class Matcher(object):
+ _partial_matches = ('msg', 'message')
+ def matches(self, d, **kwargs):
+ """
+ Try to match a single dict with the supplied arguments.
+ Keys whose values are strings and which are in self._partial_matches
+ will be checked for partial (i.e. substring) matches. You can extend
+ this scheme to (for example) do regular expression matching, etc.
+ """
+ result = True
+ for k in kwargs:
+ v = kwargs[k]
+ dv = d.get(k)
+ if not self.match_value(k, dv, v):
+ result = False
+ break
+ return result
+ def match_value(self, k, dv, v):
+ """
+ Try to match a single stored value (dv) with a supplied value (v).
+ """
+ if type(v) != type(dv):
+ result = False
+ elif type(dv) is not str or k not in self._partial_matches:
+ result = (v == dv)
+ else:
+ result = dv.find(v) >= 0
+ return result
+_can_symlink = None
+def can_symlink():
+ global _can_symlink
+ if _can_symlink is not None:
+ return _can_symlink
+ symlink_path = TESTFN + "can_symlink"
+ try:
+ os.symlink(TESTFN, symlink_path)
+ can = True
+ except (OSError, NotImplementedError, AttributeError):
+ can = False
+ else:
+ os.remove(symlink_path)
+ _can_symlink = can
+ return can
+def skip_unless_symlink(test):
+ """Skip decorator for tests that require functional symlink"""
+ ok = can_symlink()
+ msg = "Requires functional symlink implementation"
+ return test if ok else unittest.skip(msg)(test)
+def patch(test_instance, object_to_patch, attr_name, new_value):
+ """Override 'object_to_patch'.'attr_name' with 'new_value'.
+ Also, add a cleanup procedure to 'test_instance' to restore
+ 'object_to_patch' value for 'attr_name'.
+ The 'attr_name' should be a valid attribute for 'object_to_patch'.
+ """
+ # check that 'attr_name' is a real attribute for 'object_to_patch'
+ # will raise AttributeError if it does not exist
+ getattr(object_to_patch, attr_name)
+ # keep a copy of the old value
+ attr_is_local = False
+ try:
+ old_value = object_to_patch.__dict__[attr_name]
+ except (AttributeError, KeyError):
+ old_value = getattr(object_to_patch, attr_name, None)
+ else:
+ attr_is_local = True
+ # restore the value when the test is done
+ def cleanup():
+ if attr_is_local:
+ setattr(object_to_patch, attr_name, old_value)
+ else:
+ delattr(object_to_patch, attr_name)
+ test_instance.addCleanup(cleanup)
+ # actually override the attribute
+ setattr(object_to_patch, attr_name, new_value)
+from argparse import Namespace
+import unittest
+from eip_client.utils import eip_argparse
+class EIPArgParseTest(unittest.TestCase):
+ """
+ Test argparse options for eip client
+ """
+ def setUp(self):
+ """
+ get the parser
+ """
+ self.parser = eip_argparse.build_parser()
+ def test_debug_mode(self):
+ """
+ test debug mode option
+ """
+ opts = self.parser.parse_args(
+ ['--debug'])
+ self.assertEqual(opts,
+ Namespace(config=None,
+ debug=True))
+# Ideas for testing conductor:
+# - test_process_spawning
+# - test_process_watching_pipe
+# - test_process_output_callback
+# - test_status_change
+# - test_status_change_callback
+# vim: set fileencoding=utf-8 :
+from argparse import Namespace
+import logging
+logger = logging.getLogger(name=__name__)
+import sys
+import unittest
+# black magic XXX ??
+import sip
+sip.setapi('QVariant', 2)
+from PyQt4 import QtGui
+from PyQt4.QtTest import QTest
+from PyQt4.QtCore import Qt
+from eip_client import eipcmainwindow, conductor
+class MainWindowTest(unittest.TestCase):
+ """
+ Test our mainwindow GUI
+ """
+ ##################################################
+ # To be moved to BaseEIPTestCase
+ def setUp(self):
+ '''Create the GUI'''
+ = QtGui.QApplication(sys.argv)
+ opts = Namespace(config=None,
+ debug=False)
+ = eipcmainwindow.EIPCMainWindow(opts)
+ def tearDown(self):
+ """
+ cleanup
+ """
+ # we have to delete references, otherwise
+ # we get nice segfaults :)
+ del(
+ del(
+ ##################################################
+ def test_system_has_systray(self):
+ """
+ does this system has a systray?
+ not the application response to that.
+ """
+ self.assertEqual(
+ True)
+ def test_defaults(self):
+ """
+ test that the defaults are those expected
+ """
+ self.assertEqual(, "EIP")
+ #self.assertEqual(, 15)
+ #logger.debug('durationSpinBox: %s' %
+ def test_main_window_has_conductor_instance(self):
+ """
+ test main window instantiates conductor class
+ """
+ self.assertEqual(hasattr(, 'conductor'), True)
+ self.assertEqual(isinstance(,
+ conductor.EIPConductor), True)
+ # Let's roll... let's test serious things
+ # ... better to have a different TestCase for this?
+ # plan is:
+ # 1) we signal to the app that we are running from the
+ # testrunner -- so it knows, just in case :P
+ # 2) we init the conductor with the default-for-testrunner
+ # options -- like getting a fake-output client script
+ # that mocks openvpn output to stdout.
+ # 3) we check that the important things work as they
+ # expected for the output of the binaries.
+ # get generic helper methods for the base testcase class.
+ # mock_good_output
+ # mock_bad_output
+ # check_status
+ def test_connected_status_good_output(self):
+ """
+ check we get 'connected' state after mocked \
+good output from the fake openvpn process.
+ """
+ self.mock_good_output()
+ # wait?
+ self.check_state('connected')
+ def test_unrecoverable_status_bad_output(self):
+ """
+ check we get 'unrecoverable' state after
+ mocked bad output from the fake openvpn process.
+ """
+ self.mock_bad_output()
+ self.check_state('unrecoverable')
+ def test_icon_reflects_state(self):
+ """
+ test that the icon changes after an injection
+ of a change-of-state event.
+ """
+ self.mock_status_change('connected')
+ # icon == connectedIcon
+ # examine: QSystemtrayIcon.MessageIcon ??
+ self.mock_status_change('disconnected')
+ # ico == disconnectedIcon
+ self.mock_status_change('connecting')
+ # icon == connectingIcon
+ def test_status_signals_are_working(self):
+ """
+ test that status-change signals are being triggered
+ """
+ #???
+ pass
+ # sample tests below... to be removed
+ #def test_show_message_button_does_show_message(self):
+ #"""
+ #test that clicking on main window button shows message
+ #"""
+ #ok_show =
+ #trayIcon =
+ # fake left click
+ #QTest.mouseClick(ok_show, Qt.LeftButton)
+ # how to assert that message has been shown?
+ #import ipdb;ipdb.set_trace()
+ #def test_do_fallback_if_not_systray(self):
+ #"""
+ #test that we do whatever we decide to do
+ #when we detect no systray.
+ #what happens with unity??
+ #"""
+ #pass
+if __name__ == "__main__":
+ unittest.main()
+import socket
+import select
+import telnetlib
+import contextlib
+from unittest import TestCase
+import support
+from eip_client.vpnmanager import OpenVPNManager
+HOST = "localhost"
+class SocketStub(object):
+ ''' a socket proxy that re-defines sendall() '''
+ def __init__(self, reads=[]):
+ self.reads = reads
+ self.writes = []
+ self.block = False
+ def sendall(self, data):
+ self.writes.append(data)
+ def recv(self, size):
+ out = b''
+ while self.reads and len(out) < size:
+ out += self.reads.pop(0)
+ #print(out)
+ if len(out) > size:
+ self.reads.insert(0, out[size:])
+ out = out[:size]
+ return out
+class TelnetAlike(telnetlib.Telnet):
+ def fileno(self):
+ raise NotImplementedError()
+ def close(self):
+ pass
+ def sock_avail(self):
+ return (not self.sock.block)
+ def msg(self, msg, *args):
+ with support.captured_stdout() as out:
+ telnetlib.Telnet.msg(self, msg, *args)
+ self._messages += out.getvalue()
+ return
+ def read_very_lazy(self):
+ self.fill_rawq()
+ _all = self.read_all()
+ print 'faking lazy:', _all
+ return _all
+def new_select(*s_args):
+ block = False
+ for l in s_args:
+ for fob in l:
+ if isinstance(fob, TelnetAlike):
+ block = fob.sock.block
+ if block:
+ return [[], [], []]
+ else:
+ return s_args
+def test_socket(reads):
+ def new_conn(*ignored):
+ return SocketStub(reads)
+ try:
+ old_conn = socket.create_connection
+ socket.create_connection = new_conn
+ yield None
+ finally:
+ socket.create_connection = old_conn
+ return
+# VPN Commands Dict
+vpn_commands = {
+ 'status': [
+ 'OpenVPN STATISTICS', 'Updated,Mon Jun 25 11:51:21 2012',
+ 'TUN/TAP read bytes,306170', 'TUN/TAP write bytes,872102',
+ 'TCP/UDP read bytes,986177', 'TCP/UDP write bytes,439329',
+ 'Auth read bytes,872102'],
+ 'state': ['1340616463,CONNECTED,SUCCESS,,'],
+ # XXX add more tests
+ }
+class VPNManagementStub(TelnetAlike):
+ epilogue = "\nEND\n"
+ def write(self, data):
+ #print('data written')
+ data = data[:-1]
+ if data not in vpn_commands:
+ print('not in commands')
+ telnetlib.Telnet.write(self, data)
+ else:
+ msg = '\n'.join(vpn_commands[data]) + self.epilogue
+ print 'writing...'
+ print msg
+ for line in vpn_commands[data]:
+ self.sock.reads.append(line)
+ #telnetlib.Telnet.write(self, line)
+ self.sock.reads.append(self.epilogue)
+ #telnetlib.Telnet.write(self, self.epilogue)
+def test_telnet(reads=[], cls=VPNManagementStub):
+ ''' return a telnetlib.Telnet object that uses a SocketStub with
+ reads queued up to be read, and write method mocking a vpn
+ management interface'''
+ for x in reads:
+ assert type(x) is bytes, x
+ with test_socket(reads):
+ telnet = cls('dummy', 0)
+ telnet._messages = '' # debuglevel output
+ return telnet
+class ReadTests(TestCase):
+ def setUp(self):
+ self.old_select =
+ = new_select
+ def tearDown(self):
+ = self.old_select
+ def test_read_until(self):
+ """
+ read_until(expected, timeout=None)
+ test the blocking version of read_util
+ """
+ want = [b'xxxmatchyyy']
+ telnet = test_telnet(want)
+ data = telnet.read_until(b'match')
+ self.assertEqual(data, b'xxxmatch', msg=(telnet.cookedq,
+ telnet.rawq, telnet.sock.reads))
+ reads = [b'x' * 50, b'match', b'y' * 50]
+ expect = b''.join(reads[:-1])
+ telnet = test_telnet(reads)
+ data = telnet.read_until(b'match')
+ self.assertEqual(data, expect)
+ def test_read_all(self):
+ """
+ read_all()
+ Read all data until EOF; may block.
+ """
+ reads = [b'x' * 500, b'y' * 500, b'z' * 500]
+ expect = b''.join(reads)
+ telnet = test_telnet(reads)
+ data = telnet.read_all()
+ self.assertEqual(data, expect)
+ return
+ def test_read_some(self):
+ """
+ read_some()
+ Read at least one byte or EOF; may block.
+ """
+ # test 'at least one byte'
+ telnet = test_telnet([b'x' * 500])
+ data = telnet.read_some()
+ self.assertTrue(len(data) >= 1)
+ # test EOF
+ telnet = test_telnet()
+ data = telnet.read_some()
+ self.assertEqual(b'', data)
+ def _read_eager(self, func_name):
+ """
+ read_*_eager()
+ Read all data available already queued or on the socket,
+ without blocking.
+ """
+ want = b'x' * 100
+ telnet = test_telnet([want])
+ func = getattr(telnet, func_name)
+ telnet.sock.block = True
+ self.assertEqual(b'', func())
+ telnet.sock.block = False
+ data = b''
+ while True:
+ try:
+ data += func()
+ except EOFError:
+ break
+ self.assertEqual(data, want)
+ def test_read_eager(self):
+ # read_eager and read_very_eager make the same gaurantees
+ # (they behave differently but we only test the gaurantees)
+ self._read_eager('read_eager')
+ self._read_eager('read_very_eager')
+ #self._read_eager('read_very_lazy')
+ # NB -- we need to test the IAC block which is mentioned in the
+ # docstring but not in the module docs
+ def read_very_lazy(self):
+ want = b'x' * 100
+ telnet = test_telnet([want])
+ self.assertEqual(b'', telnet.read_very_lazy())
+ while telnet.sock.reads:
+ telnet.fill_rawq()
+ data = telnet.read_very_lazy()
+ self.assertEqual(want, data)
+ self.assertRaises(EOFError, telnet.read_very_lazy)
+ def test_read_lazy(self):
+ want = b'x' * 100
+ telnet = test_telnet([want])
+ self.assertEqual(b'', telnet.read_lazy())
+ data = b''
+ while True:
+ try:
+ read_data = telnet.read_lazy()
+ data += read_data
+ if not read_data:
+ telnet.fill_rawq()
+ except EOFError:
+ break
+ self.assertTrue(want.startswith(data))
+ self.assertEqual(data, want)
+def _seek_to_eof(self):
+ """
+ Read as much as available. Position seek pointer to end of stream
+ """
+ #import ipdb;ipdb.set_trace()
+ while
+ print 'reading...'
+ print 'and filling rawq'
+ try:
+ b =
+ while b:
+ b =
+ except EOFError:
+ pass
+def connect_to_stub(self):
+ """
+ stub to be added to manager
+ """
+ try:
+ self.close()
+ except:
+ pass
+ if self.connected():
+ return True
+ = test_telnet()
+ self._seek_to_eof()
+ return True
+class VPNManagerTests(TestCase):
+ def setUp(self):
+ self.old_select =
+ = new_select
+ patched_manager = OpenVPNManager
+ patched_manager._seek_to_eof = _seek_to_eof
+ patched_manager.connect = connect_to_stub
+ self.manager = patched_manager()
+ def tearDown(self):
+ = self.old_select
+ # tests
+ #def test_read_very_lazy(self):
+ #want = b'x' * 100
+ #telnet = test_telnet()
+ #self.assertEqual(b'', telnet.read_very_lazy())
+ #print 'writing to telnet'
+ #telnet.write('status\n')
+ #import ipdb;ipdb.set_trace()
+ #while telnet.sock.reads:
+ #print 'reading...'
+ #print 'and filling rawq'
+ #telnet.fill_rawq()
+ #import ipdb;ipdb.set_trace()
+ #data = telnet.read_very_lazy()
+ #print 'data ->', data
+ #def test_manager_status(self):
+ #buf = self.manager._send_command('state')
+ #import ipdb;ipdb.set_trace()
+ #print 'buf-->'
+ #print buf
+ def test_manager_state(self):
+ buf = self.manager.state()
+ print 'buf-->'
+ print buf
+ import ipdb;ipdb.set_trace()
+ def test_command(self):
+ commands = [b'status']
+ for com in commands:
+ telnet = test_telnet()
+ telnet.write(com)
+ buf = telnet.read_until(b'END')
+ print 'buf '
+ print buf
+def test_main(verbose=None):
+ support.run_unittest(
+ #ReadTests,
+ VPNManagerTests)
+if __name__ == '__main__':
+ test_main()
+import unittest
+import sys
+import time
+from eip_client.mocks.manager import get_openvpn_manager_mocks
+class VPNManagerTests(unittest.TestCase):
+ def setUp(self):
+ self.manager = get_openvpn_manager_mocks()
+ #
+ # tests
+ #
+ def test_status_command(self):
+ ret = self.manager.status()
+ #print ret
+ def test_connection_state(self):
+ ts, status, ok, ip, remote = self.manager.get_connection_state()
+ self.assertTrue(status in ('CONNECTED', 'DISCONNECTED'))
+ self.assertTrue(isinstance(ts, time.struct_time))
+ def test_status_io(self):
+ when_ts, counters = self.manager.get_status_io()
+ self.assertTrue(isinstance(when_ts, time.struct_time))
+ self.assertEqual(len(counters), 5)
+ self.assertTrue(all(map(lambda x: x.isdigit(), counters)))
+def test():
+ suite = unittest.TestSuite()
+ for cls in (VPNManagerTests,):
+ suite.addTest(unittest.makeSuite(cls))
+ runner = unittest.TextTestRunner(sys.stdout, verbosity=2,
+ failfast=False)
+ result =
+if __name__ == "__main__":
+ test()