summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorkali <kali@leap.se>2012-08-08 20:06:14 +0900
committerkali <kali@leap.se>2012-08-08 20:06:14 +0900
commitb1e025a343d0c6019127871acb47d7b295d342e9 (patch)
treeb5d175e1bf8b625433ba8d8ec83cb45b0ccc8618
parentc1fa99be4dc4174a34620324ec5056b793196b53 (diff)
parent60a51aed9c1ee9249a79b3d996ae86d93a9532de (diff)
Merge branch 'tests-cleanup' into develop
moved out old broken stuff; copied a run_scripts entry point for tests; created a bunch of (mostly stubs) simple tests on secondary functions.
-rw-r--r--.gitignore4
-rw-r--r--Makefile3
-rw-r--r--README.txt18
-rwxr-xr-xrun_tests.sh141
-rw-r--r--setup/install_venv.py246
-rw-r--r--setup/requirements.pip1
-rw-r--r--setup/test-requires5
-rwxr-xr-xsetup/tools/with_venv.sh4
-rw-r--r--src/leap/__init__.py5
-rw-r--r--src/leap/gui/mainwindow_rc.py2
-rw-r--r--src/leap/gui/test_mainwindow_rc.py26
-rw-r--r--src/leap/tests/fakeclient.py63
-rw-r--r--src/leap/tests/mocks/__init__.py1
-rw-r--r--src/leap/tests/mocks/manager.py20
-rw-r--r--src/leap/util/fileutil.py4
-rw-r--r--src/leap/util/test_fileutil.py99
-rw-r--r--src/leap/util/test_leap_argparse.py (renamed from tests/test_argparse.py)13
-rw-r--r--tests/README1
-rw-r--r--tests/__init__.py1
-rw-r--r--tests/support.py111
-rw-r--r--tests/support_tests.py1725
-rw-r--r--tests/test_conductor.py8
-rw-r--r--tests/test_mainwindow.py150
-rw-r--r--tests/test_mgminterface.py333
-rw-r--r--tests/test_qt_environment.py39
-rw-r--r--tests/test_vpn_management.py42
26 files changed, 600 insertions, 2465 deletions
diff --git a/.gitignore b/.gitignore
index 7c06293..26838e5 100644
--- a/.gitignore
+++ b/.gitignore
@@ -8,9 +8,11 @@ core
debian/python-leap-client/
dist/
docs/_build
+docs/covhtml
include/
lib/
local/
+man/
share/
src/leap.egg-info/
-src/leap_client.egg-info/
+src/leap_client.egg-info
diff --git a/Makefile b/Makefile
index 8f50f56..3ddcb0a 100644
--- a/Makefile
+++ b/Makefile
@@ -1,6 +1,9 @@
# ################################
# Makefile for compiling resources
# files.
+# TODO move to setup scripts
+# and implement it in python
+# http://die-offenbachs.homelinux.org:48888/hg/eric5/file/5072605ad4dd/compileUiFiles.py
###### EDIT ######################
#Directory with ui and resource files
RESOURCE_DIR = data/resources
diff --git a/README.txt b/README.txt
index 4a4ae52..bb6f082 100644
--- a/README.txt
+++ b/README.txt
@@ -27,13 +27,27 @@ leap --debug
Running tests
=============
-nosetests -v
-[ currently broken ]
+
+./run_tests.sh
+
+if you want to run specific tests, pass the (sub)module to nose:
+
+nosetests leap.util
+
+or
+
+nosetests leap.util.test_leap_argparse
Deps
====
+
apt-get install python-qt4 python-qt4-doc pyqt4-dev-tools
+Test-deps
+=========
+
+test-requires
+
Hack
====
diff --git a/run_tests.sh b/run_tests.sh
new file mode 100755
index 0000000..9a28d2f
--- /dev/null
+++ b/run_tests.sh
@@ -0,0 +1,141 @@
+#!/bin/bash
+
+set -eu
+
+function usage {
+ echo "Usage: $0 [OPTION]..."
+ echo "Run leap-client test suite"
+ echo ""
+ echo " -V, --virtual-env Always use virtualenv. Install automatically if not present"
+ echo " -N, --no-virtual-env Don't use virtualenv. Run tests in local environment"
+ echo " -s, --no-site-packages Isolate the virtualenv from the global Python environment"
+ echo " -x, --stop Stop running tests after the first error or failure."
+ echo " -f, --force Force a clean re-build of the virtual environment. Useful when dependencies have been added."
+ echo " -p, --pep8 Just run pep8"
+ echo " -P, --no-pep8 Don't run pep8"
+ echo " -c, --coverage Generate coverage report"
+ echo " -h, --help Print this usage message"
+ echo " --hide-elapsed Don't print the elapsed time for each test along with slow test list"
+ echo ""
+ echo "Note: with no options specified, the script will try to run the tests in a virtual environment,"
+ echo " If no virtualenv is found, the script will ask if you would like to create one. If you "
+ echo " prefer to run tests NOT in a virtual environment, simply pass the -N option."
+ exit
+}
+
+function process_option {
+ case "$1" in
+ -h|--help) usage;;
+ -V|--virtual-env) always_venv=1; never_venv=0;;
+ -N|--no-virtual-env) always_venv=0; never_venv=1;;
+ -s|--no-site-packages) no_site_packages=1;;
+ -f|--force) force=1;;
+ -p|--pep8) just_pep8=1;;
+ -P|--no-pep8) no_pep8=1;;
+ -c|--coverage) coverage=1;;
+ -*) noseopts="$noseopts $1";;
+ *) noseargs="$noseargs $1"
+ esac
+}
+
+venv=.venv
+with_venv=setup/tools/with_venv.sh
+always_venv=0
+never_venv=0
+force=0
+no_site_packages=0
+installvenvopts=
+noseargs=
+noseopts=
+wrapper=""
+just_pep8=0
+no_pep8=0
+coverage=0
+
+for arg in "$@"; do
+ process_option $arg
+done
+
+# If enabled, tell nose to collect coverage data
+if [ $coverage -eq 1 ]; then
+ noseopts="$noseopts --with-coverage --cover-package=leap-client"
+fi
+
+if [ $no_site_packages -eq 1 ]; then
+ installvenvopts="--no-site-packages"
+fi
+
+function run_tests {
+ # Just run the test suites in current environment
+ ${wrapper} $NOSETESTS
+ # If we get some short import error right away, print the error log directly
+ RESULT=$?
+ return $RESULT
+}
+
+function run_pep8 {
+ echo "Running pep8 ..."
+ srcfiles="src/leap tests"
+ # Just run PEP8 in current environment
+ pep8_opts="--ignore=E202,W602 --exclude=*_rc.py --repeat"
+ ${wrapper} pep8 ${pep8_opts} ${srcfiles}
+}
+
+NOSETESTS="nosetests $noseopts $noseargs"
+
+if [ $never_venv -eq 0 ]
+then
+ # Remove the virtual environment if --force used
+ if [ $force -eq 1 ]; then
+ echo "Cleaning virtualenv..."
+ rm -rf ${venv}
+ fi
+ if [ -e ${venv} ]; then
+ wrapper="${with_venv}"
+ else
+ if [ $always_venv -eq 1 ]; then
+ # Automatically install the virtualenv
+ python setup/install_venv.py $installvenvopts
+ wrapper="${with_venv}"
+ else
+ echo -e "No virtual environment found...create one? (Y/n) \c"
+ read use_ve
+ if [ "x$use_ve" = "xY" -o "x$use_ve" = "x" -o "x$use_ve" = "xy" ]; then
+ # Install the virtualenv and run the test suite in it
+ python setup/install_venv.py $installvenvopts
+ wrapper=${with_venv}
+ fi
+ fi
+ fi
+fi
+
+# Delete old coverage data from previous runs
+if [ $coverage -eq 1 ]; then
+ ${wrapper} coverage erase
+fi
+
+if [ $just_pep8 -eq 1 ]; then
+ run_pep8
+ exit
+fi
+
+run_tests
+
+if [ -z "$noseargs" ]; then
+ if [ $no_pep8 -eq 0 ]; then
+ run_pep8
+ fi
+fi
+
+function run_coverage {
+ # XXX not working? getting 3rd party modules
+ coverage_opts="--include `pwd`/src/leap/*,`pwd`/src/leap/eip/*"
+ ${wrapper} coverage html -d docs/covhtml -i $coverage_opts
+ echo "now point your browser at docs/covhtml/index.html"
+}
+
+if [ $coverage -eq 1 ]; then
+ echo "Generating coverage report in docs/covhtml/"
+ run_coverage
+ exit
+fi
diff --git a/setup/install_venv.py b/setup/install_venv.py
new file mode 100644
index 0000000..6cb3a69
--- /dev/null
+++ b/setup/install_venv.py
@@ -0,0 +1,246 @@
+# vim: tabstop=4 shiftwidth=4 softtabstop=4
+
+# Copyright 2010 United States Government as represented by the
+# Administrator of the National Aeronautics and Space Administration.
+# All Rights Reserved.
+#
+# Copyright 2010 OpenStack, LLC
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+"""
+Installation script for Nova's development virtualenv
+"""
+
+import optparse
+import os
+import subprocess
+import sys
+import platform
+
+
+ROOT = os.path.dirname(os.path.dirname(os.path.realpath(__file__)))
+VENV = os.path.join(ROOT, '.venv')
+PIP_REQUIRES = os.path.join(ROOT, 'tools', 'pip-requires')
+TEST_REQUIRES = os.path.join(ROOT, 'tools', 'test-requires')
+PY_VERSION = "python%s.%s" % (sys.version_info[0], sys.version_info[1])
+
+
+def die(message, *args):
+ print >> sys.stderr, message % args
+ sys.exit(1)
+
+
+def check_python_version():
+ if sys.version_info < (2, 6):
+ die("Need Python Version >= 2.6")
+
+
+def run_command_with_code(cmd, redirect_output=True, check_exit_code=True):
+ """
+ Runs a command in an out-of-process shell, returning the
+ output of that command. Working directory is ROOT.
+ """
+ if redirect_output:
+ stdout = subprocess.PIPE
+ else:
+ stdout = None
+
+ proc = subprocess.Popen(cmd, cwd=ROOT, stdout=stdout)
+ output = proc.communicate()[0]
+ if check_exit_code and proc.returncode != 0:
+ die('Command "%s" failed.\n%s', ' '.join(cmd), output)
+ return (output, proc.returncode)
+
+
+def run_command(cmd, redirect_output=True, check_exit_code=True):
+ return run_command_with_code(cmd, redirect_output, check_exit_code)[0]
+
+
+class Distro(object):
+
+ def check_cmd(self, cmd):
+ return bool(run_command(['which', cmd], check_exit_code=False).strip())
+
+ def install_virtualenv(self):
+ if self.check_cmd('virtualenv'):
+ return
+
+ if self.check_cmd('easy_install'):
+ print 'Installing virtualenv via easy_install...',
+ if run_command(['easy_install', 'virtualenv']):
+ print 'Succeeded'
+ return
+ else:
+ print 'Failed'
+
+ die('ERROR: virtualenv not found.\n\nDevelopment'
+ ' requires virtualenv, please install it using your'
+ ' favorite package management tool')
+
+ def post_process(self):
+ """Any distribution-specific post-processing gets done here.
+
+ In particular, this is useful for applying patches to code inside
+ the venv."""
+ pass
+
+
+class Debian(Distro):
+ """This covers all Debian-based distributions."""
+
+ def check_pkg(self, pkg):
+ return run_command_with_code(['dpkg', '-l', pkg],
+ check_exit_code=False)[1] == 0
+
+ def apt_install(self, pkg, **kwargs):
+ run_command(['sudo', 'apt-get', 'install', '-y', pkg], **kwargs)
+
+ def apply_patch(self, originalfile, patchfile):
+ run_command(['patch', originalfile, patchfile])
+
+ def install_virtualenv(self):
+ if self.check_cmd('virtualenv'):
+ return
+
+ if not self.check_pkg('python-virtualenv'):
+ self.apt_install('python-virtualenv', check_exit_code=False)
+
+ super(Debian, self).install_virtualenv()
+
+
+class Fedora(Distro):
+ """This covers all Fedora-based distributions.
+
+ Includes: Fedora, RHEL, CentOS, Scientific Linux"""
+
+ def check_pkg(self, pkg):
+ return run_command_with_code(['rpm', '-q', pkg],
+ check_exit_code=False)[1] == 0
+
+ def yum_install(self, pkg, **kwargs):
+ run_command(['sudo', 'yum', 'install', '-y', pkg], **kwargs)
+
+ def apply_patch(self, originalfile, patchfile):
+ run_command(['patch', originalfile, patchfile])
+
+ def install_virtualenv(self):
+ if self.check_cmd('virtualenv'):
+ return
+
+ if not self.check_pkg('python-virtualenv'):
+ self.yum_install('python-virtualenv', check_exit_code=False)
+
+ super(Fedora, self).install_virtualenv()
+
+
+def get_distro():
+ if os.path.exists('/etc/fedora-release') or \
+ os.path.exists('/etc/redhat-release'):
+ return Fedora()
+ elif os.path.exists('/etc/debian_version'):
+ return Debian()
+ else:
+ return Distro()
+
+
+def check_dependencies():
+ get_distro().install_virtualenv()
+
+
+def create_virtualenv(venv=VENV, no_site_packages=True):
+ """Creates the virtual environment and installs PIP only into the
+ virtual environment
+ """
+ print 'Creating venv...',
+ if no_site_packages:
+ run_command(['virtualenv', '-q', '--no-site-packages', VENV])
+ else:
+ run_command(['virtualenv', '-q', VENV])
+ print 'done.'
+ print 'Installing pip in virtualenv...',
+ if not run_command(['tools/with_venv.sh', 'easy_install',
+ 'pip>1.0']).strip():
+ die("Failed to install pip.")
+ print 'done.'
+
+
+def pip_install(*args):
+ run_command(['tools/with_venv.sh',
+ 'pip', 'install', '--upgrade'] + list(args),
+ redirect_output=False)
+
+
+def install_dependencies(venv=VENV):
+ print 'Installing dependencies with pip (this can take a while)...'
+
+ # First things first, make sure our venv has the latest pip and distribute.
+ pip_install('pip')
+ pip_install('distribute')
+
+ pip_install('-r', PIP_REQUIRES)
+ pip_install('-r', TEST_REQUIRES)
+
+ # Tell the virtual env how to "import nova"
+ pthfile = os.path.join(venv, "lib", PY_VERSION, "site-packages",
+ "novaclient.pth")
+ f = open(pthfile, 'w')
+ f.write("%s\n" % ROOT)
+
+
+def post_process():
+ get_distro().post_process()
+
+
+def print_help():
+ help = """
+ python-novaclient development environment setup is complete.
+
+ python-novaclient development uses virtualenv to track and manage Python
+ dependencies while in development and testing.
+
+ To activate the python-novaclient virtualenv for the extent of your current
+ shell session you can run:
+
+ $ source .venv/bin/activate
+
+ Or, if you prefer, you can run commands in the virtualenv on a case by case
+ basis by running:
+
+ $ tools/with_venv.sh <your command>
+
+ Also, make test will automatically use the virtualenv.
+ """
+ print help
+
+
+def parse_args():
+ """Parse command-line arguments"""
+ parser = optparse.OptionParser()
+ parser.add_option("-n", "--no-site-packages", dest="no_site_packages",
+ default=False, action="store_true",
+ help="Do not inherit packages from global Python install")
+ return parser.parse_args()
+
+
+def main(argv):
+ (options, args) = parse_args()
+ check_python_version()
+ check_dependencies()
+ create_virtualenv(no_site_packages=options.no_site_packages)
+ install_dependencies()
+ post_process()
+ print_help()
+
+if __name__ == '__main__':
+ main(sys.argv)
diff --git a/setup/requirements.pip b/setup/requirements.pip
index e69de29..1352d5e 100644
--- a/setup/requirements.pip
+++ b/setup/requirements.pip
@@ -0,0 +1 @@
+argparse
diff --git a/setup/test-requires b/setup/test-requires
new file mode 100644
index 0000000..26db61c
--- /dev/null
+++ b/setup/test-requires
@@ -0,0 +1,5 @@
+coverage
+mock
+nose
+pep8==1.1
+sphinx>=1.1.2
diff --git a/setup/tools/with_venv.sh b/setup/tools/with_venv.sh
new file mode 100755
index 0000000..0e58f1a
--- /dev/null
+++ b/setup/tools/with_venv.sh
@@ -0,0 +1,4 @@
+#!/bin/bash
+TOOLS=`dirname $0`
+VENV=$TOOLS/../../.venv
+source $VENV/bin/activate && $@
diff --git a/src/leap/__init__.py b/src/leap/__init__.py
index e69de29..a7ae10e 100644
--- a/src/leap/__init__.py
+++ b/src/leap/__init__.py
@@ -0,0 +1,5 @@
+from leap import eip
+from leap import baseapp
+from leap import util
+
+__all__ = [eip, baseapp, util]
diff --git a/src/leap/gui/mainwindow_rc.py b/src/leap/gui/mainwindow_rc.py
index e5a671f..4c003cf 100644
--- a/src/leap/gui/mainwindow_rc.py
+++ b/src/leap/gui/mainwindow_rc.py
@@ -2,7 +2,7 @@
# Resource object code
#
-# Created: Sun Jul 22 17:08:49 2012
+# Created: Mon Jul 23 11:10:54 2012
# by: The Resource Compiler for PyQt (Qt v4.8.2)
#
# WARNING! All changes made in this file will be lost!
diff --git a/src/leap/gui/test_mainwindow_rc.py b/src/leap/gui/test_mainwindow_rc.py
new file mode 100644
index 0000000..fd02704
--- /dev/null
+++ b/src/leap/gui/test_mainwindow_rc.py
@@ -0,0 +1,26 @@
+import unittest
+import hashlib
+
+import sip
+sip.setapi('QVariant', 2)
+
+from leap.gui import mainwindow_rc
+
+# I have to admit that there's something
+# perverse in testing this.
+# But I thought that it could be a good idea
+# to put a check to avoid non-updated resources files.
+
+# so, if you came here because an updated resource
+# did break a test, what you have to do is getting
+# the md5 hash of your qt_resource_data and change it here.
+
+# annoying? yep. try making a script for that :P
+
+
+class MainWindowResourcesTest(unittest.TestCase):
+
+ def test_mainwindow_resources_hash(self):
+ self.assertEqual(
+ hashlib.md5(mainwindow_rc.qt_resource_data).hexdigest(),
+ '5cc26322f96fabaa05c404f22774c716')
diff --git a/src/leap/tests/fakeclient.py b/src/leap/tests/fakeclient.py
deleted file mode 100644
index 45de2cd..0000000
--- a/src/leap/tests/fakeclient.py
+++ /dev/null
@@ -1,63 +0,0 @@
-fakeoutput = """
-mullvad Sun Jun 17 14:34:57 2012 OpenVPN 2.2.1 i486-linux-gnu [SSL] [LZO2] [EPOLL] [PKCS11] [eurephia] [MH] [PF_INET6] [IPv6 payload 20110424-2 (2.2RC2)] built
- on Mar 23 2012
-Sun Jun 17 14:34:57 2012 MANAGEMENT: TCP Socket listening on [AF_INET]127.0.0.1:7505
-Sun Jun 17 14:34:57 2012 NOTE: the current --script-security setting may allow this configuration to call user-defined scripts
-Sun Jun 17 14:34:57 2012 WARNING: file 'ssl/1021380964266.key' is group or others accessible
-Sun Jun 17 14:34:57 2012 LZO compression initialized
-Sun Jun 17 14:34:57 2012 Control Channel MTU parms [ L:1542 D:138 EF:38 EB:0 ET:0 EL:0 ]
-Sun Jun 17 14:34:57 2012 Socket Buffers: R=[163840->131072] S=[163840->131072]
-Sun Jun 17 14:34:57 2012 Data Channel MTU parms [ L:1542 D:1450 EF:42 EB:135 ET:0 EL:0 AF:3/1 ]
-Sun Jun 17 14:34:57 2012 Local Options hash (VER=V4): '41690919'
-Sun Jun 17 14:34:57 2012 Expected Remote Options hash (VER=V4): '530fdded'
-Sun Jun 17 14:34:57 2012 UDPv4 link local: [undef]
-Sun Jun 17 14:34:57 2012 UDPv4 link remote: [AF_INET]46.21.99.25:1197
-Sun Jun 17 14:34:57 2012 TLS: Initial packet from [AF_INET]46.21.99.25:1197, sid=63c29ace 1d3060d0
-Sun Jun 17 14:34:58 2012 VERIFY OK: depth=2, /C=NA/ST=None/L=None/O=Mullvad/CN=Mullvad_CA/emailAddress=info@mullvad.net
-Sun Jun 17 14:34:58 2012 VERIFY OK: depth=1, /C=NA/ST=None/L=None/O=Mullvad/CN=master.mullvad.net/emailAddress=info@mullvad.net
-Sun Jun 17 14:34:58 2012 Validating certificate key usage
-Sun Jun 17 14:34:58 2012 ++ Certificate has key usage 00a0, expects 00a0
-Sun Jun 17 14:34:58 2012 VERIFY KU OK
-Sun Jun 17 14:34:58 2012 Validating certificate extended key usage
-Sun Jun 17 14:34:58 2012 ++ Certificate has EKU (str) TLS Web Server Authentication, expects TLS Web Server Authentication
-Sun Jun 17 14:34:58 2012 VERIFY EKU OK
-Sun Jun 17 14:34:58 2012 VERIFY OK: depth=0, /C=NA/ST=None/L=None/O=Mullvad/CN=se2.mullvad.net/emailAddress=info@mullvad.net
-Sun Jun 17 14:34:59 2012 Data Channel Encrypt: Cipher 'BF-CBC' initialized with 128 bit key
-Sun Jun 17 14:34:59 2012 Data Channel Encrypt: Using 160 bit message hash 'SHA1' for HMAC authentication
-Sun Jun 17 14:34:59 2012 Data Channel Decrypt: Cipher 'BF-CBC' initialized with 128 bit key
-Sun Jun 17 14:34:59 2012 Data Channel Decrypt: Using 160 bit message hash 'SHA1' for HMAC authentication
-Sun Jun 17 14:34:59 2012 Control Channel: TLSv1, cipher TLSv1/SSLv3 DHE-RSA-AES256-SHA, 2048 bit RSA
-Sun Jun 17 14:34:59 2012 [se2.mullvad.net] Peer Connection Initiated with [AF_INET]46.21.99.25:1197
-Sun Jun 17 14:35:01 2012 SENT CONTROL [se2.mullvad.net]: 'PUSH_REQUEST' (status=1)
-Sun Jun 17 14:35:02 2012 PUSH: Received control message: 'PUSH_REPLY,redirect-gateway def1 bypass-dhcp,dhcp-option DNS 10.11.0.1,route 10.11.0.1,topology net30,ifconfig 10.11.0.202 10.11.0.201'
-Sun Jun 17 14:35:02 2012 OPTIONS IMPORT: --ifconfig/up options modified
-Sun Jun 17 14:35:02 2012 OPTIONS IMPORT: route options modified
-Sun Jun 17 14:35:02 2012 OPTIONS IMPORT: --ip-win32 and/or --dhcp-option options modified
-Sun Jun 17 14:35:02 2012 ROUTE default_gateway=192.168.0.1
-Sun Jun 17 14:35:02 2012 TUN/TAP device tun0 opened
-Sun Jun 17 14:35:02 2012 TUN/TAP TX queue length set to 100
-Sun Jun 17 14:35:02 2012 do_ifconfig, tt->ipv6=0, tt->did_ifconfig_ipv6_setup=0
-Sun Jun 17 14:35:02 2012 /sbin/ifconfig tun0 10.11.0.202 pointopoint 10.11.0.201 mtu 1500
-Sun Jun 17 14:35:02 2012 /etc/openvpn/update-resolv-conf tun0 1500 1542 10.11.0.202 10.11.0.201 init
-dhcp-option DNS 10.11.0.1
-Sun Jun 17 14:35:05 2012 /sbin/route add -net 46.21.99.25 netmask 255.255.255.255 gw 192.168.0.1
-Sun Jun 17 14:35:05 2012 /sbin/route add -net 0.0.0.0 netmask 128.0.0.0 gw 10.11.0.201
-Sun Jun 17 14:35:05 2012 /sbin/route add -net 128.0.0.0 netmask 128.0.0.0 gw 10.11.0.201
-Sun Jun 17 14:35:05 2012 /sbin/route add -net 10.11.0.1 netmask 255.255.255.255 gw 10.11.0.201
-Sun Jun 17 14:35:05 2012 Initialization Sequence Completed
-Sun Jun 17 14:34:57 2012 MANAGEMENT: TCP Socket listening on [AF_INET]127.0.0.1:7505
-"""
-
-import time
-import sys
-
-
-def write_output():
- for line in fakeoutput.split('\n'):
- sys.stdout.write(line + '\n')
- sys.stdout.flush()
- #print(line)
- time.sleep(0.1)
-
-if __name__ == "__main__":
- write_output()
diff --git a/src/leap/tests/mocks/__init__.py b/src/leap/tests/mocks/__init__.py
deleted file mode 100644
index 06f9687..0000000
--- a/src/leap/tests/mocks/__init__.py
+++ /dev/null
@@ -1 +0,0 @@
-import manager
diff --git a/src/leap/tests/mocks/manager.py b/src/leap/tests/mocks/manager.py
deleted file mode 100644
index 564631c..0000000
--- a/src/leap/tests/mocks/manager.py
+++ /dev/null
@@ -1,20 +0,0 @@
-from mock import Mock
-
-from eip_client.vpnmanager import OpenVPNManager
-
-vpn_commands = {
- 'status': [
- 'OpenVPN STATISTICS', 'Updated,Mon Jun 25 11:51:21 2012',
- 'TUN/TAP read bytes,306170', 'TUN/TAP write bytes,872102',
- 'TCP/UDP read bytes,986177', 'TCP/UDP write bytes,439329',
- 'Auth read bytes,872102'],
- 'state': ['1340616463,CONNECTED,SUCCESS,172.28.0.2,198.252.153.38'],
- # XXX add more tests
- }
-
-
-def get_openvpn_manager_mocks():
- manager = OpenVPNManager()
- manager.status = Mock(return_value='\n'.join(vpn_commands['status']))
- manager.state = Mock(return_value=vpn_commands['state'][0])
- return manager
diff --git a/src/leap/util/fileutil.py b/src/leap/util/fileutil.py
index cc3bf34..429e4b1 100644
--- a/src/leap/util/fileutil.py
+++ b/src/leap/util/fileutil.py
@@ -96,7 +96,9 @@ def check_and_fix_urw_only(_file):
test for 600 mode and try
to set it if anything different found
"""
- mode = os.stat(_file).st_mode
+ mode = stat.S_IMODE(
+ os.stat(_file).st_mode)
+
if mode != int('600', 8):
try:
logger.warning(
diff --git a/src/leap/util/test_fileutil.py b/src/leap/util/test_fileutil.py
new file mode 100644
index 0000000..849deca
--- /dev/null
+++ b/src/leap/util/test_fileutil.py
@@ -0,0 +1,99 @@
+import os
+import platform
+import shutil
+import stat
+import tempfile
+import unittest
+
+from leap.util import fileutil
+
+
+class FileUtilTest(unittest.TestCase):
+ """
+ test our file utils
+ """
+
+ def setUp(self):
+ self.system = platform.system()
+ self.create_temp_dir()
+
+ def tearDown(self):
+ self.remove_temp_dir()
+
+ #
+ # helpers
+ #
+
+ def create_temp_dir(self):
+ self.tmpdir = tempfile.mkdtemp()
+
+ def remove_temp_dir(self):
+ shutil.rmtree(self.tmpdir)
+
+ def get_file_path(self, filename):
+ return os.path.join(
+ self.tmpdir,
+ filename)
+
+ def touch_exec_file(self):
+ fp = self.get_file_path('testexec')
+ open(fp, 'w').close()
+ os.chmod(
+ fp,
+ stat.S_IRUSR | stat.S_IWUSR | stat.S_IXUSR)
+ return fp
+
+ def get_mode(self, fp):
+ return stat.S_IMODE(os.stat(fp).st_mode)
+
+ #
+ # tests
+ #
+
+ def test_is_user_executable(self):
+ """
+ test that a 700 file
+ is an 700 file. kindda oximoronic, but...
+ """
+ # XXX could check access X_OK
+
+ fp = self.touch_exec_file()
+ mode = self.get_mode(fp)
+ self.assertEqual(mode, int('700', 8))
+
+ def test_which(self):
+ """
+ not a very reliable test,
+ but I cannot think of anything smarter now
+ I guess it's highly improbable that copy
+ command is somewhere else..?
+ """
+ # XXX yep, we can change the syspath
+ # for the test... !
+
+ if self.system == "Linux":
+ self.assertEqual(
+ fileutil.which('cp'),
+ '/bin/cp')
+
+ def test_mkdir_p(self):
+ """
+ test our mkdir -p implementation
+ """
+ testdir = self.get_file_path(
+ os.path.join('test', 'foo', 'bar'))
+ self.assertEqual(os.path.isdir(testdir), False)
+ fileutil.mkdir_p(testdir)
+ self.assertEqual(os.path.isdir(testdir), True)
+
+ def test_check_and_fix_urw_only(self):
+ """
+ test function that fixes perms on
+ files that should be rw only for owner
+ """
+ fp = self.touch_exec_file()
+ mode = self.get_mode(fp)
+ self.assertEqual(mode, int('700', 8))
+ fileutil.check_and_fix_urw_only(fp)
+ mode = self.get_mode(fp)
+ self.assertEqual(mode, int('600', 8))
diff --git a/tests/test_argparse.py b/src/leap/util/test_leap_argparse.py
index a8ce509..1442e82 100644
--- a/tests/test_argparse.py
+++ b/src/leap/util/test_leap_argparse.py
@@ -1,7 +1,7 @@
from argparse import Namespace
import unittest
-from eip_client.utils import eip_argparse
+from leap.util import leap_argparse
class LeapArgParseTest(unittest.TestCase):
@@ -13,14 +13,15 @@ class LeapArgParseTest(unittest.TestCase):
"""
get the parser
"""
- self.parser = eip_argparse.build_parser()
+ self.parser = leap_argparse.build_parser()
def test_debug_mode(self):
"""
test debug mode option
"""
opts = self.parser.parse_args(
- ['--debug'])
- self.assertEqual(opts,
- Namespace(config=None,
- debug=True))
+ ['--debug'])
+ self.assertEqual(
+ opts,
+ Namespace(config_file=None,
+ debug=True))
diff --git a/tests/README b/tests/README
new file mode 100644
index 0000000..8745f03
--- /dev/null
+++ b/tests/README
@@ -0,0 +1 @@
+write here general, integration tests.
diff --git a/tests/__init__.py b/tests/__init__.py
index ef0df1c..e69de29 100644
--- a/tests/__init__.py
+++ b/tests/__init__.py
@@ -1 +0,0 @@
-# XXX put here a sample BaseEIPTestCase
diff --git a/tests/support.py b/tests/support.py
deleted file mode 100644
index 8ac4966..0000000
--- a/tests/support.py
+++ /dev/null
@@ -1,111 +0,0 @@
-# code borrowed from python stdlib tests
-# I think we're not using it at the end...
-# XXX Review and Remove
-
-import contextlib
-import socket
-import sys
-import unittest
-
-
-HOST = "localhost"
-
-
-class TestFailed(Exception):
- """Test failed."""
-
-
-def bind_port(sock, host=HOST):
- """Bind the socket to a free port and return the port number. Relies on
- ephemeral ports in order to ensure we are using an unbound port. This is
- important as many tests may be running simultaneously, especially in a
- buildbot environment. This method raises an exception if the sock.family
- is AF_INET and sock.type is SOCK_STREAM, *and* the socket has SO_REUSEADDR
- or SO_REUSEPORT set on it. Tests should *never* set these socket options
- for TCP/IP sockets. The only case for setting these options is testing
- multicasting via multiple UDP sockets.
-
- Additionally, if the SO_EXCLUSIVEADDRUSE socket option is available (i.e.
- on Windows), it will be set on the socket. This will prevent anyone else
- from bind()'ing to our host/port for the duration of the test.
- """
-
- if sock.family == socket.AF_INET and sock.type == socket.SOCK_STREAM:
- if hasattr(socket, 'SO_REUSEADDR'):
- if sock.getsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR) == 1:
- raise TestFailed("tests should never set the SO_REUSEADDR " \
- "socket option on TCP/IP sockets!")
- if hasattr(socket, 'SO_REUSEPORT'):
- if sock.getsockopt(socket.SOL_SOCKET, socket.SO_REUSEPORT) == 1:
- raise TestFailed("tests should never set the SO_REUSEPORT " \
- "socket option on TCP/IP sockets!")
- if hasattr(socket, 'SO_EXCLUSIVEADDRUSE'):
- sock.setsockopt(socket.SOL_SOCKET, socket.SO_EXCLUSIVEADDRUSE, 1)
-
- sock.bind((host, 0))
- port = sock.getsockname()[1]
- return port
-
-
-def _run_suite(suite):
- """Run tests from a unittest.TestSuite-derived class."""
- runner = unittest.TextTestRunner(sys.stdout, verbosity=2,
- failfast=False)
- result = runner.run(suite)
- if not result.wasSuccessful():
- if len(result.errors) == 1 and not result.failures:
- err = result.errors[0][1]
- elif len(result.failures) == 1 and not result.errors:
- err = result.failures[0][1]
- else:
- err = "multiple errors occurred"
- raise TestFailed(err)
-
-
-def run_unittest(*classes):
- """Run tests from unittest.TestCase-derived classes."""
- valid_types = (unittest.TestSuite, unittest.TestCase)
- suite = unittest.TestSuite()
- for cls in classes:
- if isinstance(cls, str):
- if cls in sys.modules:
- suite.addTest(unittest.findTestCases(sys.modules[cls]))
- else:
- raise ValueError("str arguments must be keys in sys.modules")
- elif isinstance(cls, valid_types):
- suite.addTest(cls)
- else:
- suite.addTest(unittest.makeSuite(cls))
-
- _run_suite(suite)
-
-
-@contextlib.contextmanager
-def captured_output(stream_name):
- """Return a context manager used by captured_stdout/stdin/stderr
- that temporarily replaces the sys stream *stream_name* with a StringIO."""
- import io
- orig_stdout = getattr(sys, stream_name)
- setattr(sys, stream_name, io.StringIO())
- try:
- yield getattr(sys, stream_name)
- finally:
- setattr(sys, stream_name, orig_stdout)
-
-
-def captured_stdout():
- """Capture the output of sys.stdout:
-
- with captured_stdout() as s:
- print("hello")
- self.assertEqual(s.getvalue(), "hello")
- """
- return captured_output("stdout")
-
-
-def captured_stderr():
- return captured_output("stderr")
-
-
-def captured_stdin():
- return captured_output("stdin")
diff --git a/tests/support_tests.py b/tests/support_tests.py
deleted file mode 100644
index 2c56e12..0000000
--- a/tests/support_tests.py
+++ /dev/null
@@ -1,1725 +0,0 @@
-"""Supporting definitions for the Python regression tests."""
-
-if __name__ != 'test.support':
- raise ImportError('support must be imported from the test package')
-
-import contextlib
-import errno
-import functools
-import gc
-import socket
-import sys
-import os
-import platform
-import shutil
-import warnings
-import unittest
-import importlib
-import collections.abc
-import re
-import subprocess
-import imp
-import time
-import sysconfig
-import fnmatch
-import logging.handlers
-import struct
-
-try:
- import _thread, threading
-except ImportError:
- _thread = None
- threading = None
-try:
- import multiprocessing.process
-except ImportError:
- multiprocessing = None
-
-try:
- import faulthandler
-except ImportError:
- faulthandler = None
-
-try:
- import zlib
-except ImportError:
- zlib = None
-
-__all__ = [
- "Error", "TestFailed", "ResourceDenied", "import_module",
- "verbose", "use_resources", "max_memuse", "record_original_stdout",
- "get_original_stdout", "unload", "unlink", "rmtree", "forget",
- "is_resource_enabled", "requires", "requires_freebsd_version",
- "requires_linux_version", "requires_mac_ver", "find_unused_port", "bind_port",
- "IPV6_ENABLED", "is_jython", "TESTFN", "HOST", "SAVEDCWD", "temp_cwd",
- "findfile", "create_empty_file", "sortdict", "check_syntax_error", "open_urlresource",
- "check_warnings", "CleanImport", "EnvironmentVarGuard", "TransientResource",
- "captured_stdout", "captured_stdin", "captured_stderr", "time_out",
- "socket_peer_reset", "ioerror_peer_reset", "run_with_locale", 'temp_umask',
- "transient_internet", "set_memlimit", "bigmemtest", "bigaddrspacetest",
- "BasicTestRunner", "run_unittest", "run_doctest", "threading_setup",
- "threading_cleanup", "reap_children", "cpython_only", "check_impl_detail",
- "get_attribute", "swap_item", "swap_attr", "requires_IEEE_754",
- "TestHandler", "Matcher", "can_symlink", "skip_unless_symlink",
- "import_fresh_module", "requires_zlib", "PIPE_MAX_SIZE", "failfast",
- "anticipate_failure"
- ]
-
-class Error(Exception):
- """Base class for regression test exceptions."""
-
-class TestFailed(Error):
- """Test failed."""
-
-class ResourceDenied(unittest.SkipTest):
- """Test skipped because it requested a disallowed resource.
-
- This is raised when a test calls requires() for a resource that
- has not be enabled. It is used to distinguish between expected
- and unexpected skips.
- """
-
-@contextlib.contextmanager
-def _ignore_deprecated_imports(ignore=True):
- """Context manager to suppress package and module deprecation
- warnings when importing them.
-
- If ignore is False, this context manager has no effect."""
- if ignore:
- with warnings.catch_warnings():
- warnings.filterwarnings("ignore", ".+ (module|package)",
- DeprecationWarning)
- yield
- else:
- yield
-
-
-def import_module(name, deprecated=False):
- """Import and return the module to be tested, raising SkipTest if
- it is not available.
-
- If deprecated is True, any module or package deprecation messages
- will be suppressed."""
- with _ignore_deprecated_imports(deprecated):
- try:
- return importlib.import_module(name)
- except ImportError as msg:
- raise unittest.SkipTest(str(msg))
-
-
-def _save_and_remove_module(name, orig_modules):
- """Helper function to save and remove a module from sys.modules
-
- Raise ImportError if the module can't be imported."""
- # try to import the module and raise an error if it can't be imported
- if name not in sys.modules:
- __import__(name)
- del sys.modules[name]
- for modname in list(sys.modules):
- if modname == name or modname.startswith(name + '.'):
- orig_modules[modname] = sys.modules[modname]
- del sys.modules[modname]
-
-def _save_and_block_module(name, orig_modules):
- """Helper function to save and block a module in sys.modules
-
- Return True if the module was in sys.modules, False otherwise."""
- saved = True
- try:
- orig_modules[name] = sys.modules[name]
- except KeyError:
- saved = False
- sys.modules[name] = None
- return saved
-
-
-def anticipate_failure(condition):
- """Decorator to mark a test that is known to be broken in some cases
-
- Any use of this decorator should have a comment identifying the
- associated tracker issue.
- """
- if condition:
- return unittest.expectedFailure
- return lambda f: f
-
-
-def import_fresh_module(name, fresh=(), blocked=(), deprecated=False):
- """Imports and returns a module, deliberately bypassing the sys.modules cache
- and importing a fresh copy of the module. Once the import is complete,
- the sys.modules cache is restored to its original state.
-
- Modules named in fresh are also imported anew if needed by the import.
- If one of these modules can't be imported, None is returned.
-
- Importing of modules named in blocked is prevented while the fresh import
- takes place.
-
- If deprecated is True, any module or package deprecation messages
- will be suppressed."""
- # NOTE: test_heapq, test_json and test_warnings include extra sanity checks
- # to make sure that this utility function is working as expected
- with _ignore_deprecated_imports(deprecated):
- # Keep track of modules saved for later restoration as well
- # as those which just need a blocking entry removed
- orig_modules = {}
- names_to_remove = []
- _save_and_remove_module(name, orig_modules)
- try:
- for fresh_name in fresh:
- _save_and_remove_module(fresh_name, orig_modules)
- for blocked_name in blocked:
- if not _save_and_block_module(blocked_name, orig_modules):
- names_to_remove.append(blocked_name)
- fresh_module = importlib.import_module(name)
- except ImportError:
- fresh_module = None
- finally:
- for orig_name, module in orig_modules.items():
- sys.modules[orig_name] = module
- for name_to_remove in names_to_remove:
- del sys.modules[name_to_remove]
- return fresh_module
-
-
-def get_attribute(obj, name):
- """Get an attribute, raising SkipTest if AttributeError is raised."""
- try:
- attribute = getattr(obj, name)
- except AttributeError:
- raise unittest.SkipTest("object %r has no attribute %r" % (obj, name))
- else:
- return attribute
-
-verbose = 1 # Flag set to 0 by regrtest.py
-use_resources = None # Flag set to [] by regrtest.py
-max_memuse = 0 # Disable bigmem tests (they will still be run with
- # small sizes, to make sure they work.)
-real_max_memuse = 0
-failfast = False
-match_tests = None
-
-# _original_stdout is meant to hold stdout at the time regrtest began.
-# This may be "the real" stdout, or IDLE's emulation of stdout, or whatever.
-# The point is to have some flavor of stdout the user can actually see.
-_original_stdout = None
-def record_original_stdout(stdout):
- global _original_stdout
- _original_stdout = stdout
-
-def get_original_stdout():
- return _original_stdout or sys.stdout
-
-def unload(name):
- try:
- del sys.modules[name]
- except KeyError:
- pass
-
-def unlink(filename):
- try:
- os.unlink(filename)
- except OSError as error:
- # The filename need not exist.
- if error.errno not in (errno.ENOENT, errno.ENOTDIR):
- raise
-
-def rmtree(path):
- try:
- shutil.rmtree(path)
- except OSError as error:
- if error.errno != errno.ENOENT:
- raise
-
-def make_legacy_pyc(source):
- """Move a PEP 3147 pyc/pyo file to its legacy pyc/pyo location.
-
- The choice of .pyc or .pyo extension is done based on the __debug__ flag
- value.
-
- :param source: The file system path to the source file. The source file
- does not need to exist, however the PEP 3147 pyc file must exist.
- :return: The file system path to the legacy pyc file.
- """
- pyc_file = imp.cache_from_source(source)
- up_one = os.path.dirname(os.path.abspath(source))
- legacy_pyc = os.path.join(up_one, source + ('c' if __debug__ else 'o'))
- os.rename(pyc_file, legacy_pyc)
- return legacy_pyc
-
-def forget(modname):
- """'Forget' a module was ever imported.
-
- This removes the module from sys.modules and deletes any PEP 3147 or
- legacy .pyc and .pyo files.
- """
- unload(modname)
- for dirname in sys.path:
- source = os.path.join(dirname, modname + '.py')
- # It doesn't matter if they exist or not, unlink all possible
- # combinations of PEP 3147 and legacy pyc and pyo files.
- unlink(source + 'c')
- unlink(source + 'o')
- unlink(imp.cache_from_source(source, debug_override=True))
- unlink(imp.cache_from_source(source, debug_override=False))
-
-# On some platforms, should not run gui test even if it is allowed
-# in `use_resources'.
-if sys.platform.startswith('win'):
- import ctypes
- import ctypes.wintypes
- def _is_gui_available():
- UOI_FLAGS = 1
- WSF_VISIBLE = 0x0001
- class USEROBJECTFLAGS(ctypes.Structure):
- _fields_ = [("fInherit", ctypes.wintypes.BOOL),
- ("fReserved", ctypes.wintypes.BOOL),
- ("dwFlags", ctypes.wintypes.DWORD)]
- dll = ctypes.windll.user32
- h = dll.GetProcessWindowStation()
- if not h:
- raise ctypes.WinError()
- uof = USEROBJECTFLAGS()
- needed = ctypes.wintypes.DWORD()
- res = dll.GetUserObjectInformationW(h,
- UOI_FLAGS,
- ctypes.byref(uof),
- ctypes.sizeof(uof),
- ctypes.byref(needed))
- if not res:
- raise ctypes.WinError()
- return bool(uof.dwFlags & WSF_VISIBLE)
-else:
- def _is_gui_available():
- return True
-
-def is_resource_enabled(resource):
- """Test whether a resource is enabled. Known resources are set by
- regrtest.py."""
- return use_resources is not None and resource in use_resources
-
-def requires(resource, msg=None):
- """Raise ResourceDenied if the specified resource is not available.
-
- If the caller's module is __main__ then automatically return True. The
- possibility of False being returned occurs when regrtest.py is
- executing.
- """
- if resource == 'gui' and not _is_gui_available():
- raise unittest.SkipTest("Cannot use the 'gui' resource")
- # see if the caller's module is __main__ - if so, treat as if
- # the resource was set
- if sys._getframe(1).f_globals.get("__name__") == "__main__":
- return
- if not is_resource_enabled(resource):
- if msg is None:
- msg = "Use of the %r resource not enabled" % resource
- raise ResourceDenied(msg)
-
-def _requires_unix_version(sysname, min_version):
- """Decorator raising SkipTest if the OS is `sysname` and the version is less
- than `min_version`.
-
- For example, @_requires_unix_version('FreeBSD', (7, 2)) raises SkipTest if
- the FreeBSD version is less than 7.2.
- """
- def decorator(func):
- @functools.wraps(func)
- def wrapper(*args, **kw):
- if platform.system() == sysname:
- version_txt = platform.release().split('-', 1)[0]
- try:
- version = tuple(map(int, version_txt.split('.')))
- except ValueError:
- pass
- else:
- if version < min_version:
- min_version_txt = '.'.join(map(str, min_version))
- raise unittest.SkipTest(
- "%s version %s or higher required, not %s"
- % (sysname, min_version_txt, version_txt))
- return wrapper
- return decorator
-
-def requires_freebsd_version(*min_version):
- """Decorator raising SkipTest if the OS is FreeBSD and the FreeBSD version is
- less than `min_version`.
-
- For example, @requires_freebsd_version(7, 2) raises SkipTest if the FreeBSD
- version is less than 7.2.
- """
- return _requires_unix_version('FreeBSD', min_version)
-
-def requires_linux_version(*min_version):
- """Decorator raising SkipTest if the OS is Linux and the Linux version is
- less than `min_version`.
-
- For example, @requires_linux_version(2, 6, 32) raises SkipTest if the Linux
- version is less than 2.6.32.
- """
- return _requires_unix_version('Linux', min_version)
-
-def requires_mac_ver(*min_version):
- """Decorator raising SkipTest if the OS is Mac OS X and the OS X
- version if less than min_version.
-
- For example, @requires_mac_ver(10, 5) raises SkipTest if the OS X version
- is lesser than 10.5.
- """
- def decorator(func):
- @functools.wraps(func)
- def wrapper(*args, **kw):
- if sys.platform == 'darwin':
- version_txt = platform.mac_ver()[0]
- try:
- version = tuple(map(int, version_txt.split('.')))
- except ValueError:
- pass
- else:
- if version < min_version:
- min_version_txt = '.'.join(map(str, min_version))
- raise unittest.SkipTest(
- "Mac OS X %s or higher required, not %s"
- % (min_version_txt, version_txt))
- return func(*args, **kw)
- wrapper.min_version = min_version
- return wrapper
- return decorator
-
-
-HOST = 'localhost'
-
-def find_unused_port(family=socket.AF_INET, socktype=socket.SOCK_STREAM):
- """Returns an unused port that should be suitable for binding. This is
- achieved by creating a temporary socket with the same family and type as
- the 'sock' parameter (default is AF_INET, SOCK_STREAM), and binding it to
- the specified host address (defaults to 0.0.0.0) with the port set to 0,
- eliciting an unused ephemeral port from the OS. The temporary socket is
- then closed and deleted, and the ephemeral port is returned.
-
- Either this method or bind_port() should be used for any tests where a
- server socket needs to be bound to a particular port for the duration of
- the test. Which one to use depends on whether the calling code is creating
- a python socket, or if an unused port needs to be provided in a constructor
- or passed to an external program (i.e. the -accept argument to openssl's
- s_server mode). Always prefer bind_port() over find_unused_port() where
- possible. Hard coded ports should *NEVER* be used. As soon as a server
- socket is bound to a hard coded port, the ability to run multiple instances
- of the test simultaneously on the same host is compromised, which makes the
- test a ticking time bomb in a buildbot environment. On Unix buildbots, this
- may simply manifest as a failed test, which can be recovered from without
- intervention in most cases, but on Windows, the entire python process can
- completely and utterly wedge, requiring someone to log in to the buildbot
- and manually kill the affected process.
-
- (This is easy to reproduce on Windows, unfortunately, and can be traced to
- the SO_REUSEADDR socket option having different semantics on Windows versus
- Unix/Linux. On Unix, you can't have two AF_INET SOCK_STREAM sockets bind,
- listen and then accept connections on identical host/ports. An EADDRINUSE
- socket.error will be raised at some point (depending on the platform and
- the order bind and listen were called on each socket).
-
- However, on Windows, if SO_REUSEADDR is set on the sockets, no EADDRINUSE
- will ever be raised when attempting to bind two identical host/ports. When
- accept() is called on each socket, the second caller's process will steal
- the port from the first caller, leaving them both in an awkwardly wedged
- state where they'll no longer respond to any signals or graceful kills, and
- must be forcibly killed via OpenProcess()/TerminateProcess().
-
- The solution on Windows is to use the SO_EXCLUSIVEADDRUSE socket option
- instead of SO_REUSEADDR, which effectively affords the same semantics as
- SO_REUSEADDR on Unix. Given the propensity of Unix developers in the Open
- Source world compared to Windows ones, this is a common mistake. A quick
- look over OpenSSL's 0.9.8g source shows that they use SO_REUSEADDR when
- openssl.exe is called with the 's_server' option, for example. See
- http://bugs.python.org/issue2550 for more info. The following site also
- has a very thorough description about the implications of both REUSEADDR
- and EXCLUSIVEADDRUSE on Windows:
- http://msdn2.microsoft.com/en-us/library/ms740621(VS.85).aspx)
-
- XXX: although this approach is a vast improvement on previous attempts to
- elicit unused ports, it rests heavily on the assumption that the ephemeral
- port returned to us by the OS won't immediately be dished back out to some
- other process when we close and delete our temporary socket but before our
- calling code has a chance to bind the returned port. We can deal with this
- issue if/when we come across it.
- """
-
- tempsock = socket.socket(family, socktype)
- port = bind_port(tempsock)
- tempsock.close()
- del tempsock
- return port
-
-def bind_port(sock, host=HOST):
- """Bind the socket to a free port and return the port number. Relies on
- ephemeral ports in order to ensure we are using an unbound port. This is
- important as many tests may be running simultaneously, especially in a
- buildbot environment. This method raises an exception if the sock.family
- is AF_INET and sock.type is SOCK_STREAM, *and* the socket has SO_REUSEADDR
- or SO_REUSEPORT set on it. Tests should *never* set these socket options
- for TCP/IP sockets. The only case for setting these options is testing
- multicasting via multiple UDP sockets.
-
- Additionally, if the SO_EXCLUSIVEADDRUSE socket option is available (i.e.
- on Windows), it will be set on the socket. This will prevent anyone else
- from bind()'ing to our host/port for the duration of the test.
- """
-
- if sock.family == socket.AF_INET and sock.type == socket.SOCK_STREAM:
- if hasattr(socket, 'SO_REUSEADDR'):
- if sock.getsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR) == 1:
- raise TestFailed("tests should never set the SO_REUSEADDR " \
- "socket option on TCP/IP sockets!")
- if hasattr(socket, 'SO_REUSEPORT'):
- if sock.getsockopt(socket.SOL_SOCKET, socket.SO_REUSEPORT) == 1:
- raise TestFailed("tests should never set the SO_REUSEPORT " \
- "socket option on TCP/IP sockets!")
- if hasattr(socket, 'SO_EXCLUSIVEADDRUSE'):
- sock.setsockopt(socket.SOL_SOCKET, socket.SO_EXCLUSIVEADDRUSE, 1)
-
- sock.bind((host, 0))
- port = sock.getsockname()[1]
- return port
-
-def _is_ipv6_enabled():
- """Check whether IPv6 is enabled on this host."""
- if socket.has_ipv6:
- try:
- sock = socket.socket(socket.AF_INET6, socket.SOCK_STREAM)
- sock.bind(('::1', 0))
- except (socket.error, socket.gaierror):
- pass
- else:
- sock.close()
- return True
- return False
-
-IPV6_ENABLED = _is_ipv6_enabled()
-
-
-# A constant likely larger than the underlying OS pipe buffer size.
-# Windows limit seems to be around 512B, and most Unix kernels have a 64K pipe
-# buffer size: take 1M to be sure.
-PIPE_MAX_SIZE = 1024 * 1024
-
-
-# decorator for skipping tests on non-IEEE 754 platforms
-requires_IEEE_754 = unittest.skipUnless(
- float.__getformat__("double").startswith("IEEE"),
- "test requires IEEE 754 doubles")
-
-requires_zlib = unittest.skipUnless(zlib, 'requires zlib')
-
-is_jython = sys.platform.startswith('java')
-
-# Filename used for testing
-if os.name == 'java':
- # Jython disallows @ in module names
- TESTFN = '$test'
-else:
- TESTFN = '@test'
-
-# Disambiguate TESTFN for parallel testing, while letting it remain a valid
-# module name.
-TESTFN = "{}_{}_tmp".format(TESTFN, os.getpid())
-
-
-# TESTFN_UNICODE is a non-ascii filename
-TESTFN_UNICODE = TESTFN + "-\xe0\xf2\u0258\u0141\u011f"
-if sys.platform == 'darwin':
- # In Mac OS X's VFS API file names are, by definition, canonically
- # decomposed Unicode, encoded using UTF-8. See QA1173:
- # http://developer.apple.com/mac/library/qa/qa2001/qa1173.html
- import unicodedata
- TESTFN_UNICODE = unicodedata.normalize('NFD', TESTFN_UNICODE)
-TESTFN_ENCODING = sys.getfilesystemencoding()
-
-# TESTFN_UNENCODABLE is a filename (str type) that should *not* be able to be
-# encoded by the filesystem encoding (in strict mode). It can be None if we
-# cannot generate such filename.
-TESTFN_UNENCODABLE = None
-if os.name in ('nt', 'ce'):
- # skip win32s (0) or Windows 9x/ME (1)
- if sys.getwindowsversion().platform >= 2:
- # Different kinds of characters from various languages to minimize the
- # probability that the whole name is encodable to MBCS (issue #9819)
- TESTFN_UNENCODABLE = TESTFN + "-\u5171\u0141\u2661\u0363\uDC80"
- try:
- TESTFN_UNENCODABLE.encode(TESTFN_ENCODING)
- except UnicodeEncodeError:
- pass
- else:
- print('WARNING: The filename %r CAN be encoded by the filesystem encoding (%s). '
- 'Unicode filename tests may not be effective'
- % (TESTFN_UNENCODABLE, TESTFN_ENCODING))
- TESTFN_UNENCODABLE = None
-# Mac OS X denies unencodable filenames (invalid utf-8)
-elif sys.platform != 'darwin':
- try:
- # ascii and utf-8 cannot encode the byte 0xff
- b'\xff'.decode(TESTFN_ENCODING)
- except UnicodeDecodeError:
- # 0xff will be encoded using the surrogate character u+DCFF
- TESTFN_UNENCODABLE = TESTFN \
- + b'-\xff'.decode(TESTFN_ENCODING, 'surrogateescape')
- else:
- # File system encoding (eg. ISO-8859-* encodings) can encode
- # the byte 0xff. Skip some unicode filename tests.
- pass
-
-# Save the initial cwd
-SAVEDCWD = os.getcwd()
-
-@contextlib.contextmanager
-def temp_cwd(name='tempcwd', quiet=False, path=None):
- """
- Context manager that temporarily changes the CWD.
-
- An existing path may be provided as *path*, in which case this
- function makes no changes to the file system.
-
- Otherwise, the new CWD is created in the current directory and it's
- named *name*. If *quiet* is False (default) and it's not possible to
- create or change the CWD, an error is raised. If it's True, only a
- warning is raised and the original CWD is used.
- """
- saved_dir = os.getcwd()
- is_temporary = False
- if path is None:
- path = name
- try:
- os.mkdir(name)
- is_temporary = True
- except OSError:
- if not quiet:
- raise
- warnings.warn('tests may fail, unable to create temp CWD ' + name,
- RuntimeWarning, stacklevel=3)
- try:
- os.chdir(path)
- except OSError:
- if not quiet:
- raise
- warnings.warn('tests may fail, unable to change the CWD to ' + name,
- RuntimeWarning, stacklevel=3)
- try:
- yield os.getcwd()
- finally:
- os.chdir(saved_dir)
- if is_temporary:
- rmtree(name)
-
-
-if hasattr(os, "umask"):
- @contextlib.contextmanager
- def temp_umask(umask):
- """Context manager that temporarily sets the process umask."""
- oldmask = os.umask(umask)
- try:
- yield
- finally:
- os.umask(oldmask)
-
-
-def findfile(file, here=__file__, subdir=None):
- """Try to find a file on sys.path and the working directory. If it is not
- found the argument passed to the function is returned (this does not
- necessarily signal failure; could still be the legitimate path)."""
- if os.path.isabs(file):
- return file
- if subdir is not None:
- file = os.path.join(subdir, file)
- path = sys.path
- path = [os.path.dirname(here)] + path
- for dn in path:
- fn = os.path.join(dn, file)
- if os.path.exists(fn): return fn
- return file
-
-def create_empty_file(filename):
- """Create an empty file. If the file already exists, truncate it."""
- fd = os.open(filename, os.O_WRONLY | os.O_CREAT | os.O_TRUNC)
- os.close(fd)
-
-def sortdict(dict):
- "Like repr(dict), but in sorted order."
- items = sorted(dict.items())
- reprpairs = ["%r: %r" % pair for pair in items]
- withcommas = ", ".join(reprpairs)
- return "{%s}" % withcommas
-
-def make_bad_fd():
- """
- Create an invalid file descriptor by opening and closing a file and return
- its fd.
- """
- file = open(TESTFN, "wb")
- try:
- return file.fileno()
- finally:
- file.close()
- unlink(TESTFN)
-
-def check_syntax_error(testcase, statement):
- testcase.assertRaises(SyntaxError, compile, statement,
- '<test string>', 'exec')
-
-def open_urlresource(url, *args, **kw):
- import urllib.request, urllib.parse
-
- check = kw.pop('check', None)
-
- filename = urllib.parse.urlparse(url)[2].split('/')[-1] # '/': it's URL!
-
- fn = os.path.join(os.path.dirname(__file__), "data", filename)
-
- def check_valid_file(fn):
- f = open(fn, *args, **kw)
- if check is None:
- return f
- elif check(f):
- f.seek(0)
- return f
- f.close()
-
- if os.path.exists(fn):
- f = check_valid_file(fn)
- if f is not None:
- return f
- unlink(fn)
-
- # Verify the requirement before downloading the file
- requires('urlfetch')
-
- print('\tfetching %s ...' % url, file=get_original_stdout())
- f = urllib.request.urlopen(url, timeout=15)
- try:
- with open(fn, "wb") as out:
- s = f.read()
- while s:
- out.write(s)
- s = f.read()
- finally:
- f.close()
-
- f = check_valid_file(fn)
- if f is not None:
- return f
- raise TestFailed('invalid resource %r' % fn)
-
-
-class WarningsRecorder(object):
- """Convenience wrapper for the warnings list returned on
- entry to the warnings.catch_warnings() context manager.
- """
- def __init__(self, warnings_list):
- self._warnings = warnings_list
- self._last = 0
-
- def __getattr__(self, attr):
- if len(self._warnings) > self._last:
- return getattr(self._warnings[-1], attr)
- elif attr in warnings.WarningMessage._WARNING_DETAILS:
- return None
- raise AttributeError("%r has no attribute %r" % (self, attr))
-
- @property
- def warnings(self):
- return self._warnings[self._last:]
-
- def reset(self):
- self._last = len(self._warnings)
-
-
-def _filterwarnings(filters, quiet=False):
- """Catch the warnings, then check if all the expected
- warnings have been raised and re-raise unexpected warnings.
- If 'quiet' is True, only re-raise the unexpected warnings.
- """
- # Clear the warning registry of the calling module
- # in order to re-raise the warnings.
- frame = sys._getframe(2)
- registry = frame.f_globals.get('__warningregistry__')
- if registry:
- registry.clear()
- with warnings.catch_warnings(record=True) as w:
- # Set filter "always" to record all warnings. Because
- # test_warnings swap the module, we need to look up in
- # the sys.modules dictionary.
- sys.modules['warnings'].simplefilter("always")
- yield WarningsRecorder(w)
- # Filter the recorded warnings
- reraise = list(w)
- missing = []
- for msg, cat in filters:
- seen = False
- for w in reraise[:]:
- warning = w.message
- # Filter out the matching messages
- if (re.match(msg, str(warning), re.I) and
- issubclass(warning.__class__, cat)):
- seen = True
- reraise.remove(w)
- if not seen and not quiet:
- # This filter caught nothing
- missing.append((msg, cat.__name__))
- if reraise:
- raise AssertionError("unhandled warning %s" % reraise[0])
- if missing:
- raise AssertionError("filter (%r, %s) did not catch any warning" %
- missing[0])
-
-
-@contextlib.contextmanager
-def check_warnings(*filters, **kwargs):
- """Context manager to silence warnings.
-
- Accept 2-tuples as positional arguments:
- ("message regexp", WarningCategory)
-
- Optional argument:
- - if 'quiet' is True, it does not fail if a filter catches nothing
- (default True without argument,
- default False if some filters are defined)
-
- Without argument, it defaults to:
- check_warnings(("", Warning), quiet=True)
- """
- quiet = kwargs.get('quiet')
- if not filters:
- filters = (("", Warning),)
- # Preserve backward compatibility
- if quiet is None:
- quiet = True
- return _filterwarnings(filters, quiet)
-
-
-class CleanImport(object):
- """Context manager to force import to return a new module reference.
-
- This is useful for testing module-level behaviours, such as
- the emission of a DeprecationWarning on import.
-
- Use like this:
-
- with CleanImport("foo"):
- importlib.import_module("foo") # new reference
- """
-
- def __init__(self, *module_names):
- self.original_modules = sys.modules.copy()
- for module_name in module_names:
- if module_name in sys.modules:
- module = sys.modules[module_name]
- # It is possible that module_name is just an alias for
- # another module (e.g. stub for modules renamed in 3.x).
- # In that case, we also need delete the real module to clear
- # the import cache.
- if module.__name__ != module_name:
- del sys.modules[module.__name__]
- del sys.modules[module_name]
-
- def __enter__(self):
- return self
-
- def __exit__(self, *ignore_exc):
- sys.modules.update(self.original_modules)
-
-
-class EnvironmentVarGuard(collections.abc.MutableMapping):
-
- """Class to help protect the environment variable properly. Can be used as
- a context manager."""
-
- def __init__(self):
- self._environ = os.environ
- self._changed = {}
-
- def __getitem__(self, envvar):
- return self._environ[envvar]
-
- def __setitem__(self, envvar, value):
- # Remember the initial value on the first access
- if envvar not in self._changed:
- self._changed[envvar] = self._environ.get(envvar)
- self._environ[envvar] = value
-
- def __delitem__(self, envvar):
- # Remember the initial value on the first access
- if envvar not in self._changed:
- self._changed[envvar] = self._environ.get(envvar)
- if envvar in self._environ:
- del self._environ[envvar]
-
- def keys(self):
- return self._environ.keys()
-
- def __iter__(self):
- return iter(self._environ)
-
- def __len__(self):
- return len(self._environ)
-
- def set(self, envvar, value):
- self[envvar] = value
-
- def unset(self, envvar):
- del self[envvar]
-
- def __enter__(self):
- return self
-
- def __exit__(self, *ignore_exc):
- for (k, v) in self._changed.items():
- if v is None:
- if k in self._environ:
- del self._environ[k]
- else:
- self._environ[k] = v
- os.environ = self._environ
-
-
-class DirsOnSysPath(object):
- """Context manager to temporarily add directories to sys.path.
-
- This makes a copy of sys.path, appends any directories given
- as positional arguments, then reverts sys.path to the copied
- settings when the context ends.
-
- Note that *all* sys.path modifications in the body of the
- context manager, including replacement of the object,
- will be reverted at the end of the block.
- """
-
- def __init__(self, *paths):
- self.original_value = sys.path[:]
- self.original_object = sys.path
- sys.path.extend(paths)
-
- def __enter__(self):
- return self
-
- def __exit__(self, *ignore_exc):
- sys.path = self.original_object
- sys.path[:] = self.original_value
-
-
-class TransientResource(object):
-
- """Raise ResourceDenied if an exception is raised while the context manager
- is in effect that matches the specified exception and attributes."""
-
- def __init__(self, exc, **kwargs):
- self.exc = exc
- self.attrs = kwargs
-
- def __enter__(self):
- return self
-
- def __exit__(self, type_=None, value=None, traceback=None):
- """If type_ is a subclass of self.exc and value has attributes matching
- self.attrs, raise ResourceDenied. Otherwise let the exception
- propagate (if any)."""
- if type_ is not None and issubclass(self.exc, type_):
- for attr, attr_value in self.attrs.items():
- if not hasattr(value, attr):
- break
- if getattr(value, attr) != attr_value:
- break
- else:
- raise ResourceDenied("an optional resource is not available")
-
-# Context managers that raise ResourceDenied when various issues
-# with the Internet connection manifest themselves as exceptions.
-# XXX deprecate these and use transient_internet() instead
-time_out = TransientResource(IOError, errno=errno.ETIMEDOUT)
-socket_peer_reset = TransientResource(socket.error, errno=errno.ECONNRESET)
-ioerror_peer_reset = TransientResource(IOError, errno=errno.ECONNRESET)
-
-
-@contextlib.contextmanager
-def transient_internet(resource_name, *, timeout=30.0, errnos=()):
- """Return a context manager that raises ResourceDenied when various issues
- with the Internet connection manifest themselves as exceptions."""
- default_errnos = [
- ('ECONNREFUSED', 111),
- ('ECONNRESET', 104),
- ('EHOSTUNREACH', 113),
- ('ENETUNREACH', 101),
- ('ETIMEDOUT', 110),
- ]
- default_gai_errnos = [
- ('EAI_AGAIN', -3),
- ('EAI_FAIL', -4),
- ('EAI_NONAME', -2),
- ('EAI_NODATA', -5),
- # Encountered when trying to resolve IPv6-only hostnames
- ('WSANO_DATA', 11004),
- ]
-
- denied = ResourceDenied("Resource %r is not available" % resource_name)
- captured_errnos = errnos
- gai_errnos = []
- if not captured_errnos:
- captured_errnos = [getattr(errno, name, num)
- for (name, num) in default_errnos]
- gai_errnos = [getattr(socket, name, num)
- for (name, num) in default_gai_errnos]
-
- def filter_error(err):
- n = getattr(err, 'errno', None)
- if (isinstance(err, socket.timeout) or
- (isinstance(err, socket.gaierror) and n in gai_errnos) or
- n in captured_errnos):
- if not verbose:
- sys.stderr.write(denied.args[0] + "\n")
- raise denied from err
-
- old_timeout = socket.getdefaulttimeout()
- try:
- if timeout is not None:
- socket.setdefaulttimeout(timeout)
- yield
- except IOError as err:
- # urllib can wrap original socket errors multiple times (!), we must
- # unwrap to get at the original error.
- while True:
- a = err.args
- if len(a) >= 1 and isinstance(a[0], IOError):
- err = a[0]
- # The error can also be wrapped as args[1]:
- # except socket.error as msg:
- # raise IOError('socket error', msg).with_traceback(sys.exc_info()[2])
- elif len(a) >= 2 and isinstance(a[1], IOError):
- err = a[1]
- else:
- break
- filter_error(err)
- raise
- # XXX should we catch generic exceptions and look for their
- # __cause__ or __context__?
- finally:
- socket.setdefaulttimeout(old_timeout)
-
-
-@contextlib.contextmanager
-def captured_output(stream_name):
- """Return a context manager used by captured_stdout/stdin/stderr
- that temporarily replaces the sys stream *stream_name* with a StringIO."""
- import io
- orig_stdout = getattr(sys, stream_name)
- setattr(sys, stream_name, io.StringIO())
- try:
- yield getattr(sys, stream_name)
- finally:
- setattr(sys, stream_name, orig_stdout)
-
-def captured_stdout():
- """Capture the output of sys.stdout:
-
- with captured_stdout() as s:
- print("hello")
- self.assertEqual(s.getvalue(), "hello")
- """
- return captured_output("stdout")
-
-def captured_stderr():
- return captured_output("stderr")
-
-def captured_stdin():
- return captured_output("stdin")
-
-
-def gc_collect():
- """Force as many objects as possible to be collected.
-
- In non-CPython implementations of Python, this is needed because timely
- deallocation is not guaranteed by the garbage collector. (Even in CPython
- this can be the case in case of reference cycles.) This means that __del__
- methods may be called later than expected and weakrefs may remain alive for
- longer than expected. This function tries its best to force all garbage
- objects to disappear.
- """
- gc.collect()
- if is_jython:
- time.sleep(0.1)
- gc.collect()
- gc.collect()
-
-@contextlib.contextmanager
-def disable_gc():
- have_gc = gc.isenabled()
- gc.disable()
- try:
- yield
- finally:
- if have_gc:
- gc.enable()
-
-
-def python_is_optimized():
- """Find if Python was built with optimizations."""
- cflags = sysconfig.get_config_var('PY_CFLAGS') or ''
- final_opt = ""
- for opt in cflags.split():
- if opt.startswith('-O'):
- final_opt = opt
- return final_opt != '' and final_opt != '-O0'
-
-
-#=======================================================================
-# Decorator for running a function in a different locale, correctly resetting
-# it afterwards.
-
-def run_with_locale(catstr, *locales):
- def decorator(func):
- def inner(*args, **kwds):
- try:
- import locale
- category = getattr(locale, catstr)
- orig_locale = locale.setlocale(category)
- except AttributeError:
- # if the test author gives us an invalid category string
- raise
- except:
- # cannot retrieve original locale, so do nothing
- locale = orig_locale = None
- else:
- for loc in locales:
- try:
- locale.setlocale(category, loc)
- break
- except:
- pass
-
- # now run the function, resetting the locale on exceptions
- try:
- return func(*args, **kwds)
- finally:
- if locale and orig_locale:
- locale.setlocale(category, orig_locale)
- inner.__name__ = func.__name__
- inner.__doc__ = func.__doc__
- return inner
- return decorator
-
-#=======================================================================
-# Big-memory-test support. Separate from 'resources' because memory use
-# should be configurable.
-
-# Some handy shorthands. Note that these are used for byte-limits as well
-# as size-limits, in the various bigmem tests
-_1M = 1024*1024
-_1G = 1024 * _1M
-_2G = 2 * _1G
-_4G = 4 * _1G
-
-MAX_Py_ssize_t = sys.maxsize
-
-def set_memlimit(limit):
- global max_memuse
- global real_max_memuse
- sizes = {
- 'k': 1024,
- 'm': _1M,
- 'g': _1G,
- 't': 1024*_1G,
- }
- m = re.match(r'(\d+(\.\d+)?) (K|M|G|T)b?$', limit,
- re.IGNORECASE | re.VERBOSE)
- if m is None:
- raise ValueError('Invalid memory limit %r' % (limit,))
- memlimit = int(float(m.group(1)) * sizes[m.group(3).lower()])
- real_max_memuse = memlimit
- if memlimit > MAX_Py_ssize_t:
- memlimit = MAX_Py_ssize_t
- if memlimit < _2G - 1:
- raise ValueError('Memory limit %r too low to be useful' % (limit,))
- max_memuse = memlimit
-
-class _MemoryWatchdog:
- """An object which periodically watches the process' memory consumption
- and prints it out.
- """
-
- def __init__(self):
- self.procfile = '/proc/{pid}/statm'.format(pid=os.getpid())
- self.started = False
- self.thread = None
- try:
- self.page_size = os.sysconf('SC_PAGESIZE')
- except (ValueError, AttributeError):
- try:
- self.page_size = os.sysconf('SC_PAGE_SIZE')
- except (ValueError, AttributeError):
- self.page_size = 4096
-
- def consumer(self, fd):
- HEADER = "l"
- header_size = struct.calcsize(HEADER)
- try:
- while True:
- header = os.read(fd, header_size)
- if len(header) < header_size:
- # Pipe closed on other end
- break
- data_len, = struct.unpack(HEADER, header)
- data = os.read(fd, data_len)
- statm = data.decode('ascii')
- data = int(statm.split()[5])
- print(" ... process data size: {data:.1f}G"
- .format(data=data * self.page_size / (1024 ** 3)))
- finally:
- os.close(fd)
-
- def start(self):
- if not faulthandler or not hasattr(faulthandler, '_file_watchdog'):
- return
- try:
- rfd = os.open(self.procfile, os.O_RDONLY)
- except OSError as e:
- warnings.warn('/proc not available for stats: {}'.format(e),
- RuntimeWarning)
- sys.stderr.flush()
- return
- pipe_fd, wfd = os.pipe()
- # _file_watchdog() doesn't take the GIL in its child thread, and
- # therefore collects statistics timely
- faulthandler._file_watchdog(rfd, wfd, 1.0)
- self.started = True
- self.thread = threading.Thread(target=self.consumer, args=(pipe_fd,))
- self.thread.daemon = True
- self.thread.start()
-
- def stop(self):
- if not self.started:
- return
- faulthandler._cancel_file_watchdog()
- self.thread.join()
-
-
-def bigmemtest(size, memuse, dry_run=True):
- """Decorator for bigmem tests.
-
- 'minsize' is the minimum useful size for the test (in arbitrary,
- test-interpreted units.) 'memuse' is the number of 'bytes per size' for
- the test, or a good estimate of it.
-
- if 'dry_run' is False, it means the test doesn't support dummy runs
- when -M is not specified.
- """
- def decorator(f):
- def wrapper(self):
- size = wrapper.size
- memuse = wrapper.memuse
- if not real_max_memuse:
- maxsize = 5147
- else:
- maxsize = size
-
- if ((real_max_memuse or not dry_run)
- and real_max_memuse < maxsize * memuse):
- raise unittest.SkipTest(
- "not enough memory: %.1fG minimum needed"
- % (size * memuse / (1024 ** 3)))
-
- if real_max_memuse and verbose and faulthandler and threading:
- print()
- print(" ... expected peak memory use: {peak:.1f}G"
- .format(peak=size * memuse / (1024 ** 3)))
- watchdog = _MemoryWatchdog()
- watchdog.start()
- else:
- watchdog = None
-
- try:
- return f(self, maxsize)
- finally:
- if watchdog:
- watchdog.stop()
-
- wrapper.size = size
- wrapper.memuse = memuse
- return wrapper
- return decorator
-
-def bigaddrspacetest(f):
- """Decorator for tests that fill the address space."""
- def wrapper(self):
- if max_memuse < MAX_Py_ssize_t:
- if MAX_Py_ssize_t >= 2**63 - 1 and max_memuse >= 2**31:
- raise unittest.SkipTest(
- "not enough memory: try a 32-bit build instead")
- else:
- raise unittest.SkipTest(
- "not enough memory: %.1fG minimum needed"
- % (MAX_Py_ssize_t / (1024 ** 3)))
- else:
- return f(self)
- return wrapper
-
-#=======================================================================
-# unittest integration.
-
-class BasicTestRunner:
- def run(self, test):
- result = unittest.TestResult()
- test(result)
- return result
-
-def _id(obj):
- return obj
-
-def requires_resource(resource):
- if resource == 'gui' and not _is_gui_available():
- return unittest.skip("resource 'gui' is not available")
- if is_resource_enabled(resource):
- return _id
- else:
- return unittest.skip("resource {0!r} is not enabled".format(resource))
-
-def cpython_only(test):
- """
- Decorator for tests only applicable on CPython.
- """
- return impl_detail(cpython=True)(test)
-
-def impl_detail(msg=None, **guards):
- if check_impl_detail(**guards):
- return _id
- if msg is None:
- guardnames, default = _parse_guards(guards)
- if default:
- msg = "implementation detail not available on {0}"
- else:
- msg = "implementation detail specific to {0}"
- guardnames = sorted(guardnames.keys())
- msg = msg.format(' or '.join(guardnames))
- return unittest.skip(msg)
-
-def _parse_guards(guards):
- # Returns a tuple ({platform_name: run_me}, default_value)
- if not guards:
- return ({'cpython': True}, False)
- is_true = list(guards.values())[0]
- assert list(guards.values()) == [is_true] * len(guards) # all True or all False
- return (guards, not is_true)
-
-# Use the following check to guard CPython's implementation-specific tests --
-# or to run them only on the implementation(s) guarded by the arguments.
-def check_impl_detail(**guards):
- """This function returns True or False depending on the host platform.
- Examples:
- if check_impl_detail(): # only on CPython (default)
- if check_impl_detail(jython=True): # only on Jython
- if check_impl_detail(cpython=False): # everywhere except on CPython
- """
- guards, default = _parse_guards(guards)
- return guards.get(platform.python_implementation().lower(), default)
-
-
-def no_tracing(func):
- """Decorator to temporarily turn off tracing for the duration of a test."""
- if not hasattr(sys, 'gettrace'):
- return func
- else:
- @functools.wraps(func)
- def wrapper(*args, **kwargs):
- original_trace = sys.gettrace()
- try:
- sys.settrace(None)
- return func(*args, **kwargs)
- finally:
- sys.settrace(original_trace)
- return wrapper
-
-
-def refcount_test(test):
- """Decorator for tests which involve reference counting.
-
- To start, the decorator does not run the test if is not run by CPython.
- After that, any trace function is unset during the test to prevent
- unexpected refcounts caused by the trace function.
-
- """
- return no_tracing(cpython_only(test))
-
-
-def _filter_suite(suite, pred):
- """Recursively filter test cases in a suite based on a predicate."""
- newtests = []
- for test in suite._tests:
- if isinstance(test, unittest.TestSuite):
- _filter_suite(test, pred)
- newtests.append(test)
- else:
- if pred(test):
- newtests.append(test)
- suite._tests = newtests
-
-def _run_suite(suite):
- """Run tests from a unittest.TestSuite-derived class."""
- if verbose:
- runner = unittest.TextTestRunner(sys.stdout, verbosity=2,
- failfast=failfast)
- else:
- runner = BasicTestRunner()
-
- result = runner.run(suite)
- if not result.wasSuccessful():
- if len(result.errors) == 1 and not result.failures:
- err = result.errors[0][1]
- elif len(result.failures) == 1 and not result.errors:
- err = result.failures[0][1]
- else:
- err = "multiple errors occurred"
- if not verbose: err += "; run in verbose mode for details"
- raise TestFailed(err)
-
-
-def run_unittest(*classes):
- """Run tests from unittest.TestCase-derived classes."""
- valid_types = (unittest.TestSuite, unittest.TestCase)
- suite = unittest.TestSuite()
- for cls in classes:
- if isinstance(cls, str):
- if cls in sys.modules:
- suite.addTest(unittest.findTestCases(sys.modules[cls]))
- else:
- raise ValueError("str arguments must be keys in sys.modules")
- elif isinstance(cls, valid_types):
- suite.addTest(cls)
- else:
- suite.addTest(unittest.makeSuite(cls))
- def case_pred(test):
- if match_tests is None:
- return True
- for name in test.id().split("."):
- if fnmatch.fnmatchcase(name, match_tests):
- return True
- return False
- _filter_suite(suite, case_pred)
- _run_suite(suite)
-
-
-#=======================================================================
-# doctest driver.
-
-def run_doctest(module, verbosity=None):
- """Run doctest on the given module. Return (#failures, #tests).
-
- If optional argument verbosity is not specified (or is None), pass
- support's belief about verbosity on to doctest. Else doctest's
- usual behavior is used (it searches sys.argv for -v).
- """
-
- import doctest
-
- if verbosity is None:
- verbosity = verbose
- else:
- verbosity = None
-
- f, t = doctest.testmod(module, verbose=verbosity)
- if f:
- raise TestFailed("%d of %d doctests failed" % (f, t))
- if verbose:
- print('doctest (%s) ... %d tests with zero failures' %
- (module.__name__, t))
- return f, t
-
-
-#=======================================================================
-# Support for saving and restoring the imported modules.
-
-def modules_setup():
- return sys.modules.copy(),
-
-def modules_cleanup(oldmodules):
- # Encoders/decoders are registered permanently within the internal
- # codec cache. If we destroy the corresponding modules their
- # globals will be set to None which will trip up the cached functions.
- encodings = [(k, v) for k, v in sys.modules.items()
- if k.startswith('encodings.')]
- sys.modules.clear()
- sys.modules.update(encodings)
- # XXX: This kind of problem can affect more than just encodings. In particular
- # extension modules (such as _ssl) don't cope with reloading properly.
- # Really, test modules should be cleaning out the test specific modules they
- # know they added (ala test_runpy) rather than relying on this function (as
- # test_importhooks and test_pkg do currently).
- # Implicitly imported *real* modules should be left alone (see issue 10556).
- sys.modules.update(oldmodules)
-
-#=======================================================================
-# Threading support to prevent reporting refleaks when running regrtest.py -R
-
-# NOTE: we use thread._count() rather than threading.enumerate() (or the
-# moral equivalent thereof) because a threading.Thread object is still alive
-# until its __bootstrap() method has returned, even after it has been
-# unregistered from the threading module.
-# thread._count(), on the other hand, only gets decremented *after* the
-# __bootstrap() method has returned, which gives us reliable reference counts
-# at the end of a test run.
-
-def threading_setup():
- if _thread:
- return _thread._count(), threading._dangling.copy()
- else:
- return 1, ()
-
-def threading_cleanup(*original_values):
- if not _thread:
- return
- _MAX_COUNT = 10
- for count in range(_MAX_COUNT):
- values = _thread._count(), threading._dangling
- if values == original_values:
- break
- time.sleep(0.1)
- gc_collect()
- # XXX print a warning in case of failure?
-
-def reap_threads(func):
- """Use this function when threads are being used. This will
- ensure that the threads are cleaned up even when the test fails.
- If threading is unavailable this function does nothing.
- """
- if not _thread:
- return func
-
- @functools.wraps(func)
- def decorator(*args):
- key = threading_setup()
- try:
- return func(*args)
- finally:
- threading_cleanup(*key)
- return decorator
-
-def reap_children():
- """Use this function at the end of test_main() whenever sub-processes
- are started. This will help ensure that no extra children (zombies)
- stick around to hog resources and create problems when looking
- for refleaks.
- """
-
- # Reap all our dead child processes so we don't leave zombies around.
- # These hog resources and might be causing some of the buildbots to die.
- if hasattr(os, 'waitpid'):
- any_process = -1
- while True:
- try:
- # This will raise an exception on Windows. That's ok.
- pid, status = os.waitpid(any_process, os.WNOHANG)
- if pid == 0:
- break
- except:
- break
-
-@contextlib.contextmanager
-def swap_attr(obj, attr, new_val):
- """Temporary swap out an attribute with a new object.
-
- Usage:
- with swap_attr(obj, "attr", 5):
- ...
-
- This will set obj.attr to 5 for the duration of the with: block,
- restoring the old value at the end of the block. If `attr` doesn't
- exist on `obj`, it will be created and then deleted at the end of the
- block.
- """
- if hasattr(obj, attr):
- real_val = getattr(obj, attr)
- setattr(obj, attr, new_val)
- try:
- yield
- finally:
- setattr(obj, attr, real_val)
- else:
- setattr(obj, attr, new_val)
- try:
- yield
- finally:
- delattr(obj, attr)
-
-@contextlib.contextmanager
-def swap_item(obj, item, new_val):
- """Temporary swap out an item with a new object.
-
- Usage:
- with swap_item(obj, "item", 5):
- ...
-
- This will set obj["item"] to 5 for the duration of the with: block,
- restoring the old value at the end of the block. If `item` doesn't
- exist on `obj`, it will be created and then deleted at the end of the
- block.
- """
- if item in obj:
- real_val = obj[item]
- obj[item] = new_val
- try:
- yield
- finally:
- obj[item] = real_val
- else:
- obj[item] = new_val
- try:
- yield
- finally:
- del obj[item]
-
-def strip_python_stderr(stderr):
- """Strip the stderr of a Python process from potential debug output
- emitted by the interpreter.
-
- This will typically be run on the result of the communicate() method
- of a subprocess.Popen object.
- """
- stderr = re.sub(br"\[\d+ refs\]\r?\n?$", b"", stderr).strip()
- return stderr
-
-def args_from_interpreter_flags():
- """Return a list of command-line arguments reproducing the current
- settings in sys.flags and sys.warnoptions."""
- flag_opt_map = {
- 'bytes_warning': 'b',
- 'dont_write_bytecode': 'B',
- 'ignore_environment': 'E',
- 'no_user_site': 's',
- 'no_site': 'S',
- 'optimize': 'O',
- 'verbose': 'v',
- }
- args = []
- for flag, opt in flag_opt_map.items():
- v = getattr(sys.flags, flag)
- if v > 0:
- args.append('-' + opt * v)
- for opt in sys.warnoptions:
- args.append('-W' + opt)
- return args
-
-#============================================================
-# Support for assertions about logging.
-#============================================================
-
-class TestHandler(logging.handlers.BufferingHandler):
- def __init__(self, matcher):
- # BufferingHandler takes a "capacity" argument
- # so as to know when to flush. As we're overriding
- # shouldFlush anyway, we can set a capacity of zero.
- # You can call flush() manually to clear out the
- # buffer.
- logging.handlers.BufferingHandler.__init__(self, 0)
- self.matcher = matcher
-
- def shouldFlush(self):
- return False
-
- def emit(self, record):
- self.format(record)
- self.buffer.append(record.__dict__)
-
- def matches(self, **kwargs):
- """
- Look for a saved dict whose keys/values match the supplied arguments.
- """
- result = False
- for d in self.buffer:
- if self.matcher.matches(d, **kwargs):
- result = True
- break
- return result
-
-class Matcher(object):
-
- _partial_matches = ('msg', 'message')
-
- def matches(self, d, **kwargs):
- """
- Try to match a single dict with the supplied arguments.
-
- Keys whose values are strings and which are in self._partial_matches
- will be checked for partial (i.e. substring) matches. You can extend
- this scheme to (for example) do regular expression matching, etc.
- """
- result = True
- for k in kwargs:
- v = kwargs[k]
- dv = d.get(k)
- if not self.match_value(k, dv, v):
- result = False
- break
- return result
-
- def match_value(self, k, dv, v):
- """
- Try to match a single stored value (dv) with a supplied value (v).
- """
- if type(v) != type(dv):
- result = False
- elif type(dv) is not str or k not in self._partial_matches:
- result = (v == dv)
- else:
- result = dv.find(v) >= 0
- return result
-
-
-_can_symlink = None
-def can_symlink():
- global _can_symlink
- if _can_symlink is not None:
- return _can_symlink
- symlink_path = TESTFN + "can_symlink"
- try:
- os.symlink(TESTFN, symlink_path)
- can = True
- except (OSError, NotImplementedError, AttributeError):
- can = False
- else:
- os.remove(symlink_path)
- _can_symlink = can
- return can
-
-def skip_unless_symlink(test):
- """Skip decorator for tests that require functional symlink"""
- ok = can_symlink()
- msg = "Requires functional symlink implementation"
- return test if ok else unittest.skip(msg)(test)
-
-def patch(test_instance, object_to_patch, attr_name, new_value):
- """Override 'object_to_patch'.'attr_name' with 'new_value'.
-
- Also, add a cleanup procedure to 'test_instance' to restore
- 'object_to_patch' value for 'attr_name'.
- The 'attr_name' should be a valid attribute for 'object_to_patch'.
-
- """
- # check that 'attr_name' is a real attribute for 'object_to_patch'
- # will raise AttributeError if it does not exist
- getattr(object_to_patch, attr_name)
-
- # keep a copy of the old value
- attr_is_local = False
- try:
- old_value = object_to_patch.__dict__[attr_name]
- except (AttributeError, KeyError):
- old_value = getattr(object_to_patch, attr_name, None)
- else:
- attr_is_local = True
-
- # restore the value when the test is done
- def cleanup():
- if attr_is_local:
- setattr(object_to_patch, attr_name, old_value)
- else:
- delattr(object_to_patch, attr_name)
-
- test_instance.addCleanup(cleanup)
-
- # actually override the attribute
- setattr(object_to_patch, attr_name, new_value)
diff --git a/tests/test_conductor.py b/tests/test_conductor.py
deleted file mode 100644
index c8e5d54..0000000
--- a/tests/test_conductor.py
+++ /dev/null
@@ -1,8 +0,0 @@
-
-# Ideas for testing conductor:
-# - test_process_spawning
-# - test_process_watching_pipe
-# - test_process_output_callback
-
-# - test_status_change
-# - test_status_change_callback
diff --git a/tests/test_mainwindow.py b/tests/test_mainwindow.py
deleted file mode 100644
index 323a257..0000000
--- a/tests/test_mainwindow.py
+++ /dev/null
@@ -1,150 +0,0 @@
-# vim: set fileencoding=utf-8 :
-from argparse import Namespace
-import logging
-logger = logging.getLogger(name=__name__)
-
-import sys
-import unittest
-
-# black magic XXX ??
-import sip
-sip.setapi('QVariant', 2)
-
-from PyQt4 import QtGui
-from PyQt4.QtTest import QTest
-from PyQt4.QtCore import Qt
-
-from eip_client import eipcmainwindow, conductor
-
-
-class MainWindowTest(unittest.TestCase):
- """
- Test our mainwindow GUI
- """
-
- ##################################################
- # FIXME
- # To be moved to BaseEIPTestCase
-
- def setUp(self):
- '''Create the GUI'''
- self.app = QtGui.QApplication(sys.argv)
- opts = Namespace(config=None,
- debug=False)
- self.win = eipcmainwindow.EIPCMainWindow(opts)
-
- def tearDown(self):
- """
- cleanup
- """
- # we have to delete references, otherwise
- # we get nice segfaults :)
- del(self.win)
- del(self.app)
-
- ##################################################
-
- def test_system_has_systray(self):
- """
- does this system has a systray?
- not the application response to that.
- """
- self.assertEqual(
- self.win.trayIcon.isSystemTrayAvailable(),
- True)
-
- def test_defaults(self):
- """
- test that the defaults are those expected
- """
- self.assertEqual(self.win.windowTitle(), "EIP")
- #self.assertEqual(self.win.durationSpinBox.value(), 15)
- #logger.debug('durationSpinBox: %s' % self.win.durationSpinBox.value())
-
- def test_main_window_has_conductor_instance(self):
- """
- test main window instantiates conductor class
- """
- self.assertEqual(hasattr(self.win, 'conductor'), True)
- self.assertEqual(isinstance(self.win.conductor,
- conductor.EIPConductor), True)
-
- # Let's roll... let's test serious things
- # ... better to have a different TestCase for this?
- # plan is:
- # 1) we signal to the app that we are running from the
- # testrunner -- so it knows, just in case :P
- # 2) we init the conductor with the default-for-testrunner
- # options -- like getting a fake-output client script
- # that mocks openvpn output to stdout.
- # 3) we check that the important things work as they
- # expected for the output of the binaries.
- # XXX TODO:
- # get generic helper methods for the base testcase class.
- # mock_good_output
- # mock_bad_output
- # check_status
-
- def test_connected_status_good_output(self):
- """
- check we get 'connected' state after mocked \
-good output from the fake openvpn process.
- """
- self.mock_good_output()
- # wait?
- self.check_state('connected')
-
- def test_unrecoverable_status_bad_output(self):
- """
- check we get 'unrecoverable' state after
- mocked bad output from the fake openvpn process.
- """
- self.mock_bad_output()
- self.check_state('unrecoverable')
-
- def test_icon_reflects_state(self):
- """
- test that the icon changes after an injection
- of a change-of-state event.
- """
- self.mock_status_change('connected')
- # icon == connectedIcon
- # examine: QSystemtrayIcon.MessageIcon ??
- self.mock_status_change('disconnected')
- # ico == disconnectedIcon
- self.mock_status_change('connecting')
- # icon == connectingIcon
-
- def test_status_signals_are_working(self):
- """
- test that status-change signals are being triggered
- """
- #???
- pass
-
-
- # sample tests below... to be removed
-
- #def test_show_message_button_does_show_message(self):
- #"""
- #test that clicking on main window button shows message
- #"""
- # FIXME
- #ok_show = self.win.showMessageButton
- #trayIcon = self.win.trayIcon
- # fake left click
- #QTest.mouseClick(ok_show, Qt.LeftButton)
- # how to assert that message has been shown?
- #import ipdb;ipdb.set_trace()
-
-
- #def test_do_fallback_if_not_systray(self):
- #"""
- #test that we do whatever we decide to do
- #when we detect no systray.
- #what happens with unity??
- #"""
- #pass
-
-if __name__ == "__main__":
- unittest.main()
diff --git a/tests/test_mgminterface.py b/tests/test_mgminterface.py
deleted file mode 100644
index 398b7f3..0000000
--- a/tests/test_mgminterface.py
+++ /dev/null
@@ -1,333 +0,0 @@
-import socket
-import select
-import telnetlib
-import contextlib
-
-from unittest import TestCase
-
-import support
-
-from eip_client.vpnmanager import OpenVPNManager
-
-HOST = "localhost"
-
-
-class SocketStub(object):
- ''' a socket proxy that re-defines sendall() '''
- def __init__(self, reads=[]):
- self.reads = reads
- self.writes = []
- self.block = False
-
- def sendall(self, data):
- self.writes.append(data)
-
- def recv(self, size):
- out = b''
- while self.reads and len(out) < size:
- out += self.reads.pop(0)
- #print(out)
- if len(out) > size:
- self.reads.insert(0, out[size:])
- out = out[:size]
- return out
-
-
-class TelnetAlike(telnetlib.Telnet):
- def fileno(self):
- raise NotImplementedError()
-
- def close(self):
- pass
-
- def sock_avail(self):
- return (not self.sock.block)
-
- def msg(self, msg, *args):
- with support.captured_stdout() as out:
- telnetlib.Telnet.msg(self, msg, *args)
- self._messages += out.getvalue()
- return
-
- def read_very_lazy(self):
- self.fill_rawq()
- _all = self.read_all()
- print 'faking lazy:', _all
- return _all
-
-
-def new_select(*s_args):
- block = False
- for l in s_args:
- for fob in l:
- if isinstance(fob, TelnetAlike):
- block = fob.sock.block
- if block:
- return [[], [], []]
- else:
- return s_args
-
-
-@contextlib.contextmanager
-def test_socket(reads):
- def new_conn(*ignored):
- return SocketStub(reads)
- try:
- old_conn = socket.create_connection
- socket.create_connection = new_conn
- yield None
- finally:
- socket.create_connection = old_conn
- return
-
-
-#
-# VPN Commands Dict
-#
-
-vpn_commands = {
- 'status': [
- 'OpenVPN STATISTICS', 'Updated,Mon Jun 25 11:51:21 2012',
- 'TUN/TAP read bytes,306170', 'TUN/TAP write bytes,872102',
- 'TCP/UDP read bytes,986177', 'TCP/UDP write bytes,439329',
- 'Auth read bytes,872102'],
- 'state': ['1340616463,CONNECTED,SUCCESS,172.28.0.2,198.252.153.38'],
- # XXX add more tests
- }
-
-
-class VPNManagementStub(TelnetAlike):
- epilogue = "\nEND\n"
-
- def write(self, data):
- #print('data written')
- data = data[:-1]
- if data not in vpn_commands:
- print('not in commands')
- telnetlib.Telnet.write(self, data)
- else:
- msg = '\n'.join(vpn_commands[data]) + self.epilogue
- print 'writing...'
- print msg
- for line in vpn_commands[data]:
- self.sock.reads.append(line)
- #telnetlib.Telnet.write(self, line)
- self.sock.reads.append(self.epilogue)
- #telnetlib.Telnet.write(self, self.epilogue)
-
-
-def test_telnet(reads=[], cls=VPNManagementStub):
- ''' return a telnetlib.Telnet object that uses a SocketStub with
- reads queued up to be read, and write method mocking a vpn
- management interface'''
- for x in reads:
- assert type(x) is bytes, x
- with test_socket(reads):
- telnet = cls('dummy', 0)
- telnet._messages = '' # debuglevel output
- return telnet
-
-
-class ReadTests(TestCase):
- def setUp(self):
- self.old_select = select.select
- select.select = new_select
-
- def tearDown(self):
- select.select = self.old_select
-
- def test_read_until(self):
- """
- read_until(expected, timeout=None)
- test the blocking version of read_util
- """
- want = [b'xxxmatchyyy']
- telnet = test_telnet(want)
- data = telnet.read_until(b'match')
- self.assertEqual(data, b'xxxmatch', msg=(telnet.cookedq,
- telnet.rawq, telnet.sock.reads))
-
- reads = [b'x' * 50, b'match', b'y' * 50]
- expect = b''.join(reads[:-1])
- telnet = test_telnet(reads)
- data = telnet.read_until(b'match')
- self.assertEqual(data, expect)
-
- def test_read_all(self):
- """
- read_all()
- Read all data until EOF; may block.
- """
- reads = [b'x' * 500, b'y' * 500, b'z' * 500]
- expect = b''.join(reads)
- telnet = test_telnet(reads)
- data = telnet.read_all()
- self.assertEqual(data, expect)
- return
-
- def test_read_some(self):
- """
- read_some()
- Read at least one byte or EOF; may block.
- """
- # test 'at least one byte'
- telnet = test_telnet([b'x' * 500])
- data = telnet.read_some()
- self.assertTrue(len(data) >= 1)
- # test EOF
- telnet = test_telnet()
- data = telnet.read_some()
- self.assertEqual(b'', data)
-
- def _read_eager(self, func_name):
- """
- read_*_eager()
- Read all data available already queued or on the socket,
- without blocking.
- """
- want = b'x' * 100
- telnet = test_telnet([want])
- func = getattr(telnet, func_name)
- telnet.sock.block = True
- self.assertEqual(b'', func())
- telnet.sock.block = False
- data = b''
- while True:
- try:
- data += func()
- except EOFError:
- break
- self.assertEqual(data, want)
-
- def test_read_eager(self):
- # read_eager and read_very_eager make the same gaurantees
- # (they behave differently but we only test the gaurantees)
- self._read_eager('read_eager')
- self._read_eager('read_very_eager')
- #self._read_eager('read_very_lazy')
- # NB -- we need to test the IAC block which is mentioned in the
- # docstring but not in the module docs
-
- def read_very_lazy(self):
- want = b'x' * 100
- telnet = test_telnet([want])
- self.assertEqual(b'', telnet.read_very_lazy())
- while telnet.sock.reads:
- telnet.fill_rawq()
- data = telnet.read_very_lazy()
- self.assertEqual(want, data)
- self.assertRaises(EOFError, telnet.read_very_lazy)
-
- def test_read_lazy(self):
- want = b'x' * 100
- telnet = test_telnet([want])
- self.assertEqual(b'', telnet.read_lazy())
- data = b''
- while True:
- try:
- read_data = telnet.read_lazy()
- data += read_data
- if not read_data:
- telnet.fill_rawq()
- except EOFError:
- break
- self.assertTrue(want.startswith(data))
- self.assertEqual(data, want)
-
-
-def _seek_to_eof(self):
- """
- Read as much as available. Position seek pointer to end of stream
- """
- #import ipdb;ipdb.set_trace()
- while self.tn.sock.reads:
- print 'reading...'
- print 'and filling rawq'
- self.tn.fill_rawq()
- self.tn.process_rawq()
- try:
- b = self.tn.read_eager()
- while b:
- b = self.tn.read_eager()
- except EOFError:
- pass
-
-
-def connect_to_stub(self):
- """
- stub to be added to manager
- """
- try:
- self.close()
- except:
- pass
- if self.connected():
- return True
- self.tn = test_telnet()
-
- self._seek_to_eof()
- return True
-
-
-
-class VPNManagerTests(TestCase):
-
- def setUp(self):
- self.old_select = select.select
- select.select = new_select
-
- patched_manager = OpenVPNManager
- patched_manager._seek_to_eof = _seek_to_eof
- patched_manager.connect = connect_to_stub
- self.manager = patched_manager()
-
- def tearDown(self):
- select.select = self.old_select
-
- # tests
-
-
- #def test_read_very_lazy(self):
- #want = b'x' * 100
- #telnet = test_telnet()
- #self.assertEqual(b'', telnet.read_very_lazy())
- #print 'writing to telnet'
- #telnet.write('status\n')
- #import ipdb;ipdb.set_trace()
- #while telnet.sock.reads:
- #print 'reading...'
- #print 'and filling rawq'
- #telnet.fill_rawq()
- #import ipdb;ipdb.set_trace()
- #data = telnet.read_very_lazy()
- #print 'data ->', data
-
- #def test_manager_status(self):
- #buf = self.manager._send_command('state')
- #import ipdb;ipdb.set_trace()
- #print 'buf-->'
- #print buf
-#
- def test_manager_state(self):
- buf = self.manager.state()
- print 'buf-->'
- print buf
- import ipdb;ipdb.set_trace()
-
- def test_command(self):
- commands = [b'status']
- for com in commands:
- telnet = test_telnet()
- telnet.write(com)
- buf = telnet.read_until(b'END')
- print 'buf '
- print buf
-
-
-def test_main(verbose=None):
- support.run_unittest(
- #ReadTests,
- VPNManagerTests)
-
-if __name__ == '__main__':
- test_main()
diff --git a/tests/test_qt_environment.py b/tests/test_qt_environment.py
new file mode 100644
index 0000000..08fccf4
--- /dev/null
+++ b/tests/test_qt_environment.py
@@ -0,0 +1,39 @@
+import sys
+import unittest
+
+import sip
+sip.setapi('QVariant', 2)
+
+from PyQt4 import QtGui
+
+
+class TestWin(QtGui.QMainWindow):
+ """
+ a _really_ minimal test window,
+ with only one tray icon
+ """
+ def __init__(self):
+ super(TestWin, self).__init__()
+ self.trayIcon = QtGui.QSystemTrayIcon(self)
+
+
+class QtEnvironTest(unittest.TestCase):
+ """
+ Test we're running a proper qt environment
+ """
+
+ def setUp(self):
+ self.app = QtGui.QApplication(sys.argv)
+ self.win = TestWin()
+
+ def tearDown(self):
+ del(self.win)
+ del(self.app)
+
+ def test_system_has_systray(self):
+ """
+ does system have systray available?
+ """
+ self.assertEqual(
+ self.win.trayIcon.isSystemTrayAvailable(),
+ True)
diff --git a/tests/test_vpn_management.py b/tests/test_vpn_management.py
deleted file mode 100644
index d8a0314..0000000
--- a/tests/test_vpn_management.py
+++ /dev/null
@@ -1,42 +0,0 @@
-import unittest
-import sys
-import time
-
-from eip_client.mocks.manager import get_openvpn_manager_mocks
-
-
-class VPNManagerTests(unittest.TestCase):
-
- def setUp(self):
- self.manager = get_openvpn_manager_mocks()
-
- #
- # tests
- #
-
- def test_status_command(self):
- ret = self.manager.status()
- #print ret
-
- def test_connection_state(self):
- ts, status, ok, ip, remote = self.manager.get_connection_state()
- self.assertTrue(status in ('CONNECTED', 'DISCONNECTED'))
- self.assertTrue(isinstance(ts, time.struct_time))
-
- def test_status_io(self):
- when_ts, counters = self.manager.get_status_io()
- self.assertTrue(isinstance(when_ts, time.struct_time))
- self.assertEqual(len(counters), 5)
- self.assertTrue(all(map(lambda x: x.isdigit(), counters)))
-
-
-def test():
- suite = unittest.TestSuite()
- for cls in (VPNManagerTests,):
- suite.addTest(unittest.makeSuite(cls))
- runner = unittest.TextTestRunner(sys.stdout, verbosity=2,
- failfast=False)
- result = runner.run(suite)
-
-if __name__ == "__main__":
- test()