summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/leap/base/pluggableconfig.py3
-rw-r--r--src/leap/crypto/leapkeyring.py1
-rw-r--r--src/leap/eip/tests/test_eipconnection.py22
-rw-r--r--src/leap/eip/tests/test_openvpnconnection.py21
-rw-r--r--src/leap/gui/__init__.py3
-rw-r--r--src/leap/gui/firstrun/__init__.py4
-rw-r--r--src/leap/gui/firstrun/connect.py231
-rw-r--r--src/leap/gui/firstrun/login.py247
-rw-r--r--src/leap/gui/firstrun/providerselect.py22
-rw-r--r--src/leap/gui/firstrun/register.py83
-rwxr-xr-xsrc/leap/gui/firstrun/tests/integration/fake_provider.py31
-rwxr-xr-xsrc/leap/gui/firstrun/wizard.py88
-rw-r--r--src/leap/gui/progress.py103
-rw-r--r--src/leap/gui/tests/__init__.py0
-rw-r--r--src/leap/gui/tests/test_firstrun_login.py212
-rw-r--r--src/leap/gui/tests/test_firstrun_providerselect.py201
-rw-r--r--src/leap/gui/tests/test_firstrun_register.py244
-rw-r--r--src/leap/gui/tests/test_firstrun_wizard.py137
-rw-r--r--src/leap/gui/tests/test_mainwindow_rc.py (renamed from src/leap/gui/test_mainwindow_rc.py)3
-rw-r--r--src/leap/gui/tests/test_progress.py449
-rw-r--r--src/leap/gui/tests/test_threads.py27
-rw-r--r--src/leap/testing/pyqt.py52
-rw-r--r--src/leap/testing/qunittest.py302
-rw-r--r--src/leap/util/web.py1
24 files changed, 2003 insertions, 484 deletions
diff --git a/src/leap/base/pluggableconfig.py b/src/leap/base/pluggableconfig.py
index 34c1e060..0ca985ea 100644
--- a/src/leap/base/pluggableconfig.py
+++ b/src/leap/base/pluggableconfig.py
@@ -419,7 +419,8 @@ class PluggableConfig(object):
return True
-def testmain():
+def testmain(): # pragma: no cover
+
from tests import test_validation as t
import pprint
diff --git a/src/leap/crypto/leapkeyring.py b/src/leap/crypto/leapkeyring.py
index d4be7bf9..c241d0bc 100644
--- a/src/leap/crypto/leapkeyring.py
+++ b/src/leap/crypto/leapkeyring.py
@@ -53,6 +53,7 @@ class LeapCryptedFileKeyring(keyring.backend.CryptedFileKeyring):
def leap_set_password(key, value, seed="xxx"):
+ key, value = map(unicode, (key, value))
keyring.set_keyring(LeapCryptedFileKeyring(seed=seed))
keyring.set_password('leap', key, value)
diff --git a/src/leap/eip/tests/test_eipconnection.py b/src/leap/eip/tests/test_eipconnection.py
index 4ee5ae30..163f8d45 100644
--- a/src/leap/eip/tests/test_eipconnection.py
+++ b/src/leap/eip/tests/test_eipconnection.py
@@ -1,6 +1,8 @@
+import glob
import logging
import platform
-import os
+#import os
+import shutil
logging.basicConfig()
logger = logging.getLogger(name=__name__)
@@ -66,11 +68,26 @@ class EIPConductorTest(BaseLeapTest):
self.manager = Mock(name="openvpnmanager_mock")
self.con = MockedEIPConnection()
self.con.provider = self.provider
+
+ # XXX watch out. This sometimes is throwing the following error:
+ # NoSuchProcess: process no longer exists (pid=6571)
+ # because of a bad implementation of _check_if_running_instance
+
self.con.run_openvpn_checks()
def tearDown(self):
+ pass
+
+ def doCleanups(self):
+ super(BaseLeapTest, self).doCleanups()
+ self.cleanupSocketDir()
del self.con
+ def cleanupSocketDir(self):
+ ptt = ('/tmp/leap-tmp*')
+ for tmpdir in glob.glob(ptt):
+ shutil.rmtree(tmpdir)
+
#
# tests
#
@@ -81,6 +98,7 @@ class EIPConductorTest(BaseLeapTest):
"""
con = self.con
self.assertEqual(con.autostart, True)
+ # XXX moar!
def test_ovpn_command(self):
"""
@@ -98,6 +116,7 @@ class EIPConductorTest(BaseLeapTest):
# needed to run tests. (roughly 3 secs for this only)
# We should modularize and inject Mocks on more places.
+ oldcon = self.con
del(self.con)
config_checker = Mock()
self.con = MockedEIPConnection(config_checker=config_checker)
@@ -107,6 +126,7 @@ class EIPConductorTest(BaseLeapTest):
skip_download=False)
# XXX test for cert_checker also
+ self.con = oldcon
# connect/disconnect calls
diff --git a/src/leap/eip/tests/test_openvpnconnection.py b/src/leap/eip/tests/test_openvpnconnection.py
index 0f27facf..f7493567 100644
--- a/src/leap/eip/tests/test_openvpnconnection.py
+++ b/src/leap/eip/tests/test_openvpnconnection.py
@@ -58,16 +58,27 @@ class OpenVPNConnectionTest(BaseLeapTest):
def setUp(self):
# XXX this will have to change for win, host=localhost
host = eipconfig.get_socket_path()
+ self.host = host
self.manager = MockedOpenVPNConnection(host=host)
def tearDown(self):
+ pass
+
+ def doCleanups(self):
+ super(BaseLeapTest, self).doCleanups()
+ self.cleanupSocketDir()
+
+ def cleanupSocketDir(self):
# remove the socket folder.
# XXX only if posix. in win, host is localhost, so nothing
# has to be done.
- if self.manager.host:
- folder, fpath = os.path.split(self.manager.host)
- assert folder.startswith('/tmp/leap-tmp') # safety check
- shutil.rmtree(folder)
+ if self.host:
+ folder, fpath = os.path.split(self.host)
+ try:
+ assert folder.startswith('/tmp/leap-tmp') # safety check
+ shutil.rmtree(folder)
+ except:
+ self.fail("could not remove temp file")
del self.manager
@@ -108,12 +119,14 @@ class OpenVPNConnectionTest(BaseLeapTest):
self.assertEqual(self.manager.port, 7777)
def test_port_types_init(self):
+ oldmanager = self.manager
self.manager = MockedOpenVPNConnection(port="42")
self.assertEqual(self.manager.port, 42)
self.manager = MockedOpenVPNConnection()
self.assertEqual(self.manager.port, "unix")
self.manager = MockedOpenVPNConnection(port="bad")
self.assertEqual(self.manager.port, None)
+ self.manager = oldmanager
def test_uds_telnet_called_on_connect(self):
self.manager.connect_to_management()
diff --git a/src/leap/gui/__init__.py b/src/leap/gui/__init__.py
index 9b8f8746..804bfbc1 100644
--- a/src/leap/gui/__init__.py
+++ b/src/leap/gui/__init__.py
@@ -6,5 +6,6 @@ except ValueError:
pass
import firstrun
+import firstrun.wizard
-__all__ = ['firstrun']
+__all__ = ['firstrun', 'firstrun.wizard']
diff --git a/src/leap/gui/firstrun/__init__.py b/src/leap/gui/firstrun/__init__.py
index 8a70d90e..d380b75a 100644
--- a/src/leap/gui/firstrun/__init__.py
+++ b/src/leap/gui/firstrun/__init__.py
@@ -5,7 +5,6 @@ try:
except ValueError:
pass
-import connect
import intro
import last
import login
@@ -17,7 +16,6 @@ import register
import regvalidation
__all__ = [
- 'connect',
'intro',
'last',
'login',
@@ -26,4 +24,4 @@ __all__ = [
'providerselect',
'providersetup',
'register',
- 'regvalidation']
+ 'regvalidation'] # ,'wizard']
diff --git a/src/leap/gui/firstrun/connect.py b/src/leap/gui/firstrun/connect.py
deleted file mode 100644
index a0fe021c..00000000
--- a/src/leap/gui/firstrun/connect.py
+++ /dev/null
@@ -1,231 +0,0 @@
-"""
-Connecting Page, used in First Run Wizard
-"""
-# XXX FIXME
-# DEPRECATED. All functionality moved to regvalidation
-# This file should be removed after checking that one is ok.
-# XXX
-
-import logging
-
-from PyQt4 import QtGui
-
-logger = logging.getLogger(__name__)
-
-from leap.base import auth
-
-from leap.gui.constants import APP_LOGO
-from leap.gui.styles import ErrorLabelStyleSheet
-
-
-class ConnectingPage(QtGui.QWizardPage):
-
- # XXX change to a ValidationPage
-
- def __init__(self, parent=None):
- super(ConnectingPage, self).__init__(parent)
-
- self.setTitle("Connecting")
- self.setSubTitle('Connecting to provider.')
-
- self.setPixmap(
- QtGui.QWizard.LogoPixmap,
- QtGui.QPixmap(APP_LOGO))
-
- self.status = QtGui.QLabel("")
- self.status.setWordWrap(True)
- self.progress = QtGui.QProgressBar()
- self.progress.setMaximum(100)
- self.progress.hide()
-
- # for pre-checks
- self.status_line_1 = QtGui.QLabel()
- self.status_line_2 = QtGui.QLabel()
- self.status_line_3 = QtGui.QLabel()
- self.status_line_4 = QtGui.QLabel()
-
- # for connecting signals...
- self.status_line_5 = QtGui.QLabel()
-
- layout = QtGui.QGridLayout()
- layout.addWidget(self.status, 0, 1)
- layout.addWidget(self.progress, 5, 1)
- layout.addWidget(self.status_line_1, 8, 1)
- layout.addWidget(self.status_line_2, 9, 1)
- layout.addWidget(self.status_line_3, 10, 1)
- layout.addWidget(self.status_line_4, 11, 1)
-
- # XXX to be used?
- #self.validation_status = QtGui.QLabel("")
- #self.validation_status.setStyleSheet(
- #ErrorLabelStyleSheet)
- #self.validation_msg = QtGui.QLabel("")
-
- self.setLayout(layout)
-
- self.goto_login_again = False
-
- def set_status(self, status):
- self.status.setText(status)
- self.status.setWordWrap(True)
-
- def set_status_line(self, line, status):
- line = getattr(self, 'status_line_%s' % line)
- if line:
- line.setText(status)
-
- def set_validation_status(self, status):
- # Do not remember if we're using
- # status lines > 3 now...
- # if we are, move below
- self.status_line_3.setStyleSheet(
- ErrorLabelStyleSheet)
- self.status_line_3.setText(status)
-
- def set_validation_message(self, message):
- self.status_line_4.setText(message)
- self.status_line_4.setWordWrap(True)
-
- def get_donemsg(self, msg):
- return "%s ... done" % msg
-
- def run_eip_checks_for_provider_and_connect(self, domain):
- wizard = self.wizard()
- conductor = wizard.conductor
- start_eip_signal = getattr(
- wizard,
- 'start_eipconnection_signal', None)
-
- if conductor:
- conductor.set_provider_domain(domain)
- conductor.run_checks()
- self.conductor = conductor
- errors = self.eip_error_check()
- if not errors and start_eip_signal:
- start_eip_signal.emit()
-
- else:
- logger.warning(
- "No conductor found. This means that "
- "probably the wizard has been launched "
- "in an stand-alone way")
-
- def eip_error_check(self):
- """
- a version of the main app error checker,
- but integrated within the connecting page of the wizard.
- consumes the conductor error queue.
- pops errors, and add those to the wizard page
- """
- logger.debug('eip error check from connecting page')
- errq = self.conductor.error_queue
- # XXX missing!
-
- def fetch_and_validate(self):
- # XXX MOVE TO validate function in register-validation
- import time
- domain = self.field('provider_domain')
- wizard = self.wizard()
- #pconfig = wizard.providerconfig
- eipconfigchecker = wizard.eipconfigchecker()
- pCertChecker = wizard.providercertchecker(
- domain=domain)
-
- # username and password are in different fields
- # if they were stored in log_in or sign_up pages.
- from_login = self.wizard().from_login
- unamek_base = 'userName'
- passwk_base = 'userPassword'
- unamek = 'login_%s' % unamek_base if from_login else unamek_base
- passwk = 'login_%s' % passwk_base if from_login else passwk_base
-
- username = self.field(unamek)
- password = self.field(passwk)
- credentials = username, password
-
- self.progress.show()
-
- fetching_eip_conf_msg = 'Fetching eip service configuration'
- self.set_status(fetching_eip_conf_msg)
- self.progress.setValue(30)
-
- # Fetching eip service
- eipconfigchecker.fetch_eip_service_config(
- domain=domain)
-
- self.status_line_1.setText(
- self.get_donemsg(fetching_eip_conf_msg))
-
- getting_client_cert_msg = 'Getting client certificate'
- self.set_status(getting_client_cert_msg)
- self.progress.setValue(66)
-
- # Download cert
- try:
- pCertChecker.download_new_client_cert(
- credentials=credentials,
- # FIXME FIXME FIXME
- # XXX FIX THIS!!!!!
- # BUG #638. remove verify
- # FIXME FIXME FIXME
- verify=False)
- except auth.SRPAuthenticationError as exc:
- self.set_validation_status(
- "Authentication error: %s" % exc.message)
- return False
-
- time.sleep(2)
- self.status_line_2.setText(
- self.get_donemsg(getting_client_cert_msg))
-
- validating_clientcert_msg = 'Validating client certificate'
- self.set_status(validating_clientcert_msg)
- self.progress.setValue(90)
- time.sleep(2)
- self.status_line_3.setText(
- self.get_donemsg(validating_clientcert_msg))
-
- self.progress.setValue(100)
- time.sleep(3)
-
- # here we go! :)
- self.run_eip_checks_for_provider_and_connect(domain)
-
- #self.validation_block = self.wait_for_validation_block()
-
- # XXX signal timeout!
- return True
-
- #
- # wizardpage methods
- #
-
- def nextId(self):
- wizard = self.wizard()
- # XXX this does not work because
- # page login has already been met
- #if self.goto_login_again:
- #next_ = "login"
- #else:
- #next_ = "lastpage"
- next_ = "lastpage"
- return wizard.get_page_index(next_)
-
- def initializePage(self):
- # XXX if we're coming from signup page
- # we could say something like
- # 'registration successful!'
- self.status.setText(
- "We have "
- "all we need to connect with the provider.<br><br> "
- "Click <i>next</i> to continue. ")
- self.progress.setValue(0)
- self.progress.hide()
- self.status_line_1.setText('')
- self.status_line_2.setText('')
- self.status_line_3.setText('')
-
- def validatePage(self):
- # XXX remove
- validated = self.fetch_and_validate()
- return validated
diff --git a/src/leap/gui/firstrun/login.py b/src/leap/gui/firstrun/login.py
index 02bace86..e7afee9f 100644
--- a/src/leap/gui/firstrun/login.py
+++ b/src/leap/gui/firstrun/login.py
@@ -82,6 +82,120 @@ class LogInPage(InlineValidationPage, UserFormMixIn): # InlineValidationPage
#self.registerField('is_login_wizard')
+ # actual checks
+
+ def _do_checks(self):
+
+ full_username = self.userNameLineEdit.text()
+ ###########################
+ # 0) check user@domain form
+ ###########################
+
+ def checkusername():
+ if full_username.count('@') != 1:
+ return self.fail(
+ self.tr(
+ "Username must be in the username@provider form."))
+ else:
+ return True
+
+ yield(("head_sentinel", 0), checkusername)
+
+ username, domain = full_username.split('@')
+ password = self.userPasswordLineEdit.text()
+
+ # We try a call to an authenticated
+ # page here as a mean to catch
+ # srp authentication errors while
+ wizard = self.wizard()
+ eipconfigchecker = wizard.eipconfigchecker()
+
+ ########################
+ # 1) try name resolution
+ ########################
+ # show the frame before going on...
+ QtCore.QMetaObject.invokeMethod(
+ self, "showStepsFrame")
+
+ # Able to contact domain?
+ # can get definition?
+ # two-by-one
+ def resolvedomain():
+ try:
+ eipconfigchecker.fetch_definition(domain=domain)
+
+ # we're using requests here for all
+ # the possible error cases that it catches.
+ except requests.exceptions.ConnectionError as exc:
+ return self.fail(exc.message[1])
+ except requests.exceptions.HTTPError as exc:
+ return self.fail(exc.message)
+ except Exception as exc:
+ # XXX get catchall error msg
+ return self.fail(
+ exc.message)
+ else:
+ return True
+
+ yield((self.tr("Resolving domain name"), 20), resolvedomain)
+
+ wizard.set_providerconfig(
+ eipconfigchecker.defaultprovider.config)
+
+ ########################
+ # 2) do authentication
+ ########################
+ credentials = username, password
+ pCertChecker = wizard.providercertchecker(
+ domain=domain)
+
+ def validate_credentials():
+ #################
+ # FIXME #BUG #638
+ verify = False
+
+ try:
+ pCertChecker.download_new_client_cert(
+ credentials=credentials,
+ verify=verify)
+
+ except auth.SRPAuthenticationError as exc:
+ return self.fail(
+ self.tr("Authentication error: %s" % exc.message))
+
+ except Exception as exc:
+ return self.fail(exc.message)
+
+ else:
+ return True
+
+ yield(('Validating credentials', 60), validate_credentials)
+
+ self.set_done()
+ yield(("end_sentinel", 100), lambda: None)
+
+ def green_validation_status(self):
+ val = self.validationMsg
+ val.setText(self.tr('Credentials validated.'))
+ val.setStyleSheet(styles.GreenLineEdit)
+
+ def on_checks_validation_ready(self):
+ """
+ after checks
+ """
+ if self.is_done():
+ self.disableFields()
+ self.cleanup_errormsg()
+ self.clean_wizard_errors(self.current_page)
+ # make the user confirm the transition
+ # to next page.
+ self.nextText('&Next')
+ self.nextFocus()
+ self.green_validation_status()
+ self.do_confirm_next = True
+
+ # ui update
+
def nextText(self, text):
self.setButtonText(
QtGui.QWizard.NextButton, text)
@@ -94,12 +208,18 @@ class LogInPage(InlineValidationPage, UserFormMixIn): # InlineValidationPage
self.wizard().button(
QtGui.QWizard.NextButton).setDisabled(True)
- def onUserNameEdit(self, *args):
+ def onUserNamePositionChanged(self, *args):
if self.initial_username_sample:
self.userNameLineEdit.setText('')
# XXX set regular color
self.initial_username_sample = None
+ def onUserNameTextChanged(self, *args):
+ if self.initial_username_sample:
+ k = args[0][-1]
+ self.initial_username_sample = None
+ self.userNameLineEdit.setText(k)
+
def disableFields(self):
for field in (self.userNameLineEdit,
self.userPasswordLineEdit):
@@ -111,13 +231,8 @@ class LogInPage(InlineValidationPage, UserFormMixIn): # InlineValidationPage
errors = self.wizard().get_validation_error(
self.current_page)
- #prev_er = getattr(self, 'prevalidation_error', None)
showerr = self.validationMsg.setText
- #if not errors and prev_er:
- #showerr(prev_er)
- #return
-#
if errors:
bad_str = getattr(self, 'bad_string', None)
cur_str = self.userNameLineEdit.text()
@@ -128,9 +243,6 @@ class LogInPage(InlineValidationPage, UserFormMixIn): # InlineValidationPage
self.bad_string = cur_str
showerr(errors)
else:
- #if prev_er:
- #showerr(prev_er)
- #return
# not the first time
if cur_str == bad_str:
showerr(errors)
@@ -177,7 +289,9 @@ class LogInPage(InlineValidationPage, UserFormMixIn): # InlineValidationPage
username = self.userNameLineEdit
username.setText('username@provider.example.org')
username.cursorPositionChanged.connect(
- self.onUserNameEdit)
+ self.onUserNamePositionChanged)
+ username.textChanged.connect(
+ self.onUserNameTextChanged)
self.initial_username_sample = True
self.validationMsg.setText('')
self.valFrame.hide()
@@ -215,116 +329,3 @@ class LogInPage(InlineValidationPage, UserFormMixIn): # InlineValidationPage
self.do_checks()
return self.is_done()
-
- def _do_checks(self):
- # XXX convert this to inline
-
- full_username = self.userNameLineEdit.text()
- ###########################
- # 0) check user@domain form
- ###########################
-
- def checkusername():
- if full_username.count('@') != 1:
- return self.fail(
- self.tr(
- "Username must be in the username@provider form."))
- else:
- return True
-
- yield(("head_sentinel", 0), checkusername)
-
- # XXX I think this is not needed
- # since we're also checking for the is_signup field.
- #self.wizard().from_login = True
-
- username, domain = full_username.split('@')
- password = self.userPasswordLineEdit.text()
-
- # We try a call to an authenticated
- # page here as a mean to catch
- # srp authentication errors while
- wizard = self.wizard()
- eipconfigchecker = wizard.eipconfigchecker()
-
- ########################
- # 1) try name resolution
- ########################
- # show the frame before going on...
- QtCore.QMetaObject.invokeMethod(
- self, "showStepsFrame")
-
- # Able to contact domain?
- # can get definition?
- # two-by-one
- def resolvedomain():
- try:
- eipconfigchecker.fetch_definition(domain=domain)
-
- # we're using requests here for all
- # the possible error cases that it catches.
- except requests.exceptions.ConnectionError as exc:
- return self.fail(exc.message[1])
- except requests.exceptions.HTTPError as exc:
- return self.fail(exc.message)
- except Exception as exc:
- # XXX get catchall error msg
- return self.fail(
- exc.message)
-
- yield((self.tr("resolving domain name"), 20), resolvedomain)
-
- wizard.set_providerconfig(
- eipconfigchecker.defaultprovider.config)
-
- ########################
- # 2) do authentication
- ########################
- credentials = username, password
- pCertChecker = wizard.providercertchecker(
- domain=domain)
-
- def validate_credentials():
- #################
- # FIXME #BUG #638
- verify = False
-
- try:
- pCertChecker.download_new_client_cert(
- credentials=credentials,
- verify=verify)
-
- except auth.SRPAuthenticationError as exc:
- return self.fail(
- self.tr("Authentication error: %s" % exc.message))
-
- except Exception as exc:
- return self.fail(exc.message)
-
- else:
- return True
-
- yield(('Validating credentials', 20), validate_credentials)
-
- self.set_done()
- yield(("end_sentinel", 0), lambda: None)
-
- def green_validation_status(self):
- val = self.validationMsg
- val.setText(self.tr('Credentials validated.'))
- val.setStyleSheet(styles.GreenLineEdit)
-
- def on_checks_validation_ready(self):
- """
- after checks
- """
- if self.is_done():
- self.disableFields()
- self.cleanup_errormsg()
- self.clean_wizard_errors(self.current_page)
- # make the user confirm the transition
- # to next page.
- self.nextText('&Next')
- self.nextFocus()
- self.green_validation_status()
- self.do_confirm_next = True
diff --git a/src/leap/gui/firstrun/providerselect.py b/src/leap/gui/firstrun/providerselect.py
index a4be51a9..fd48f7f9 100644
--- a/src/leap/gui/firstrun/providerselect.py
+++ b/src/leap/gui/firstrun/providerselect.py
@@ -40,7 +40,7 @@ class SelectProviderPage(InlineValidationPage):
self.did_cert_check = False
- self.is_done = False
+ self.done = False
self.setupSteps()
self.setupUI()
@@ -131,7 +131,7 @@ class SelectProviderPage(InlineValidationPage):
# certinfo
- def setupCertInfoGroup(self):
+ def setupCertInfoGroup(self): # pragma: no cover
# XXX not used now.
certinfoGroup = QtGui.QGroupBox(
self.tr("Certificate validation"))
@@ -188,7 +188,6 @@ class SelectProviderPage(InlineValidationPage):
_domain = u"%s:%s" % (domain, port) if port != 443 else unicode(domain)
netchecker = wizard.netchecker()
-
providercertchecker = wizard.providercertchecker()
eipconfigchecker = wizard.eipconfigchecker(domain=_domain)
@@ -205,6 +204,7 @@ class SelectProviderPage(InlineValidationPage):
this domain
"""
try:
+ #import ipdb;ipdb.set_trace()
netchecker.check_name_resolution(
domain)
@@ -306,7 +306,7 @@ class SelectProviderPage(InlineValidationPage):
# done!
- self.is_done = True
+ self.done = True
yield(("end_sentinel", 100), lambda: None)
def on_checks_validation_ready(self):
@@ -316,7 +316,7 @@ class SelectProviderPage(InlineValidationPage):
self.domain_checked = True
self.completeChanged.emit()
# let's set focus...
- if self.is_done:
+ if self.is_done():
self.wizard().clean_validation_error(self.current_page)
nextbutton = self.wizard().button(QtGui.QWizard.NextButton)
nextbutton.setFocus()
@@ -329,7 +329,7 @@ class SelectProviderPage(InlineValidationPage):
def is_insecure_cert_trusted(self):
return self.trustProviderCertCheckBox.isChecked()
- def onTrustCheckChanged(self, state):
+ def onTrustCheckChanged(self, state): # pragma: no cover XXX
checked = False
if state == 2:
checked = True
@@ -342,7 +342,7 @@ class SelectProviderPage(InlineValidationPage):
# trigger signal to redraw next button
self.completeChanged.emit()
- def add_cert_info(self, certinfo):
+ def add_cert_info(self, certinfo): # pragma: no cover XXX
self.certWarning.setText(
"Do you want to <b>trust this provider certificate?</b>")
self.certInfo.setText(
@@ -351,7 +351,7 @@ class SelectProviderPage(InlineValidationPage):
self.certinfoGroup.show()
def onProviderChanged(self, text):
- self.is_done = False
+ self.done = False
provider = self.providerNameEdit.text()
if provider:
self.providerCheckButton.setDisabled(False)
@@ -374,7 +374,7 @@ class SelectProviderPage(InlineValidationPage):
def isComplete(self):
provider = self.providerNameEdit.text()
- if not self.is_done:
+ if not self.is_done():
return False
if not provider:
@@ -383,7 +383,7 @@ class SelectProviderPage(InlineValidationPage):
if self.is_insecure_cert_trusted():
return True
if not self.did_cert_check:
- if self.is_done:
+ if self.is_done():
# XXX sure?
return True
return False
@@ -452,7 +452,7 @@ class SelectProviderPage(InlineValidationPage):
if hasattr(self, 'certinfoGroup'):
# XXX remove ?
self.certinfoGroup.hide()
- self.is_done = False
+ self.done = False
self.providerCheckButton.setDisabled(True)
self.valFrame.hide()
self.steps.removeAllSteps()
diff --git a/src/leap/gui/firstrun/register.py b/src/leap/gui/firstrun/register.py
index e85723cb..4c811093 100644
--- a/src/leap/gui/firstrun/register.py
+++ b/src/leap/gui/firstrun/register.py
@@ -131,6 +131,16 @@ class RegisterUserPage(InlineValidationPage, UserFormMixIn):
field.setDisabled(True)
# error painting
+ def paintEvent(self, event):
+ """
+ we hook our populate errors
+ on paintEvent because we need it to catch
+ when user enters the page coming from next,
+ and initializePage does not cover that case.
+ Maybe there's a better event to hook upon.
+ """
+ super(RegisterUserPage, self).paintEvent(event)
+ self.populateErrors()
def markRedAndGetFocus(self, field):
field.setStyleSheet(styles.ErrorLineEdit)
@@ -193,16 +203,21 @@ class RegisterUserPage(InlineValidationPage, UserFormMixIn):
"""
self.bad_string = None
- def paintEvent(self, event):
+ def green_validation_status(self):
+ val = self.validationMsg
+ val.setText(self.tr('Registration succeeded!'))
+ val.setStyleSheet(styles.GreenLineEdit)
+
+ def reset_validation_status(self):
"""
- we hook our populate errors
- on paintEvent because we need it to catch
- when user enters the page coming from next,
- and initializePage does not cover that case.
- Maybe there's a better event to hook upon.
+ empty the validation msg
+ and clean the inline validation widget.
"""
- super(RegisterUserPage, self).paintEvent(event)
- self.populateErrors()
+ self.validationMsg.setText('')
+ self.steps.removeAllSteps()
+ self.clearTable()
+
+ # actual checks
def _do_checks(self):
"""
@@ -255,6 +270,7 @@ class RegisterUserPage(InlineValidationPage, UserFormMixIn):
schema="https",
provider=provider,
verify=verify)
+ #import ipdb;ipdb.set_trace()
try:
ok, req = signup.register_user(
username, password)
@@ -277,9 +293,15 @@ class RegisterUserPage(InlineValidationPage, UserFormMixIn):
self.tr(
"Error during registration (%s)") % req.status_code)
- validation_msgs = json.loads(req.content)
- errors = validation_msgs.get('errors', None)
- logger.debug('validation errors: %s' % validation_msgs)
+ try:
+ validation_msgs = json.loads(req.content)
+ errors = validation_msgs.get('errors', None)
+ logger.debug('validation errors: %s' % validation_msgs)
+ except ValueError:
+ # probably bad json returned
+ return self.fail(
+ self.tr(
+ "Could not register (bad response)"))
if errors and errors.get('login', None):
# XXX this sometimes catch the blank username
@@ -287,11 +309,13 @@ class RegisterUserPage(InlineValidationPage, UserFormMixIn):
return self.fail(
self.tr('Username not available.'))
+ return True
+
logger.debug('registering user')
yield(("registering with provider", 40), register)
self.set_done()
- yield(("end_sentinel", 0), lambda: None)
+ yield(("end_sentinel", 100), lambda: None)
def on_checks_validation_ready(self):
"""
@@ -308,20 +332,6 @@ class RegisterUserPage(InlineValidationPage, UserFormMixIn):
self.green_validation_status()
self.do_confirm_next = True
- def green_validation_status(self):
- val = self.validationMsg
- val.setText(self.tr('Registration succeeded!'))
- val.setStyleSheet(styles.GreenLineEdit)
-
- def reset_validation_status(self):
- """
- empty the validation msg
- and clean the inline validation widget.
- """
- self.validationMsg.setText('')
- self.steps.removeAllSteps()
- self.clearTable()
-
# pagewizard methods
def validatePage(self):
@@ -352,17 +362,26 @@ class RegisterUserPage(InlineValidationPage, UserFormMixIn):
"""
inits wizard page
"""
- provider = self.field('provider_domain')
- self.setSubTitle(
- self.tr("Register a new user with provider %s.") %
- provider)
+ provider = unicode(self.field('provider_domain'))
+ if provider:
+ # here we should have provider
+ # but in tests we might not.
+
+ # XXX this error causes a segfault on free()
+ # that we might want to get fixed ...
+ #self.setSubTitle(
+ #self.tr("Register a new user with provider %s.") %
+ #provider)
+ self.setSubTitle(
+ self.tr("Register a new user with provider %s." %
+ provider))
self.validationMsg.setText('')
self.userPassword2LineEdit.setText('')
self.valFrame.hide()
def nextId(self):
wizard = self.wizard()
- if not wizard:
- return
+ #if not wizard:
+ #return
# XXX this should be called connect
return wizard.get_page_index('signupvalidation')
diff --git a/src/leap/gui/firstrun/tests/integration/fake_provider.py b/src/leap/gui/firstrun/tests/integration/fake_provider.py
index 33ee0ee6..445b4487 100755
--- a/src/leap/gui/firstrun/tests/integration/fake_provider.py
+++ b/src/leap/gui/firstrun/tests/integration/fake_provider.py
@@ -40,6 +40,8 @@ from twisted.web.static import File
from twisted.web.resource import Resource
from twisted.internet import reactor
+from leap.testing.https_server import where
+
# See
# http://twistedmatrix.com/documents/current/web/howto/web-in-60/index.htmln
# for more examples
@@ -229,14 +231,13 @@ def get_certs_path():
def get_TLS_credentials():
# XXX this is giving errors
# XXX REview! We want to use gnutls!
- certs_path = get_certs_path()
cert = crypto.X509Certificate(
- open(certs_path + '/leaptestscert.pem').read())
+ open(where('leaptestscert.pem')).read())
key = crypto.X509PrivateKey(
- open(certs_path + '/leaptestskey.pem').read())
+ open(where('leaptestskey.pem')).read())
ca = crypto.X509Certificate(
- open(certs_path + '/cacert.pem').read())
+ open(where('cacert.pem')).read())
#crl = crypto.X509CRL(open(certs_path + '/crl.pem').read())
#cred = crypto.X509Credentials(cert, key, [ca], [crl])
cred = X509Credentials(cert, key, [ca])
@@ -253,19 +254,17 @@ class OpenSSLServerContextFactory:
"""Create an SSL context.
This is a sample implementation that loads a certificate from a file
called 'server.pem'."""
- certs_path = get_certs_path()
ctx = SSL.Context(SSL.SSLv23_METHOD)
- ctx.use_certificate_file(certs_path + '/leaptestscert.pem')
- ctx.use_privatekey_file(certs_path + '/leaptestskey.pem')
+ #certs_path = get_certs_path()
+ #ctx.use_certificate_file(certs_path + '/leaptestscert.pem')
+ #ctx.use_privatekey_file(certs_path + '/leaptestskey.pem')
+ ctx.use_certificate_file(where('leaptestscert.pem'))
+ ctx.use_privatekey_file(where('leaptestskey.pem'))
return ctx
-if __name__ == "__main__":
-
- from twisted.python import log
- log.startLogging(sys.stdout)
-
+def serve_fake_provider():
root = Resource()
root.putChild("provider.json", File("./provider.json"))
config = Resource()
@@ -293,3 +292,11 @@ if __name__ == "__main__":
reactor.listenSSL(8443, factory, OpenSSLServerContextFactory())
reactor.run()
+
+
+if __name__ == "__main__":
+
+ from twisted.python import log
+ log.startLogging(sys.stdout)
+
+ serve_fake_provider()
diff --git a/src/leap/gui/firstrun/wizard.py b/src/leap/gui/firstrun/wizard.py
index 9b77b877..89209401 100755
--- a/src/leap/gui/firstrun/wizard.py
+++ b/src/leap/gui/firstrun/wizard.py
@@ -2,8 +2,11 @@
import logging
import sip
-sip.setapi('QString', 2)
-sip.setapi('QVariant', 2)
+try:
+ sip.setapi('QString', 2)
+ sip.setapi('QVariant', 2)
+except ValueError:
+ pass
from PyQt4 import QtCore
from PyQt4 import QtGui
@@ -46,12 +49,29 @@ TODO-ish:
"""
+def get_pages_dict():
+ return OrderedDict((
+ ('intro', firstrun.intro.IntroPage),
+ ('providerselection',
+ firstrun.providerselect.SelectProviderPage),
+ ('login', firstrun.login.LogInPage),
+ ('providerinfo', firstrun.providerinfo.ProviderInfoPage),
+ ('providersetupvalidation',
+ firstrun.providersetup.ProviderSetupValidationPage),
+ ('signup', firstrun.register.RegisterUserPage),
+ ('signupvalidation',
+ firstrun.regvalidation.RegisterUserValidationPage),
+ ('lastpage', firstrun.last.LastPage)
+ ))
+
+
class FirstRunWizard(QtGui.QWizard):
def __init__(
self,
conductor_instance,
parent=None,
+ pages_dict=None,
eip_username=None,
providers=None,
success_cb=None, is_provider_setup=False,
@@ -112,20 +132,7 @@ class FirstRunWizard(QtGui.QWizard):
self.is_previously_registered = bool(self.eip_username)
self.from_login = False
- pages_dict = OrderedDict((
- ('intro', firstrun.intro.IntroPage),
- ('providerselection',
- firstrun.providerselect.SelectProviderPage),
- ('login', firstrun.login.LogInPage),
- ('providerinfo', firstrun.providerinfo.ProviderInfoPage),
- ('providersetupvalidation',
- firstrun.providersetup.ProviderSetupValidationPage),
- ('signup', firstrun.register.RegisterUserPage),
- ('signupvalidation',
- firstrun.regvalidation.RegisterUserValidationPage),
- ('connecting', firstrun.connect.ConnectingPage),
- ('lastpage', firstrun.last.LastPage)
- ))
+ pages_dict = pages_dict or get_pages_dict()
self.add_pages_from_dict(pages_dict)
self.validation_errors = {}
@@ -146,6 +153,10 @@ class FirstRunWizard(QtGui.QWizard):
# TODO: set style for MAC / windows ...
#self.setWizardStyle()
+ #
+ # setup pages in wizard
+ #
+
def add_pages_from_dict(self, pages_dict):
"""
@param pages_dict: the dictionary with pages, where
@@ -168,6 +179,10 @@ class FirstRunWizard(QtGui.QWizard):
"""
return self.pages_dict.keys().index(page_name)
+ #
+ # validation errors
+ #
+
def set_validation_error(self, pagename, error):
self.validation_errors[pagename] = error
@@ -179,20 +194,6 @@ class FirstRunWizard(QtGui.QWizard):
def get_validation_error(self, pagename):
return self.validation_errors.get(pagename, None)
- def set_providerconfig(self, providerconfig):
- self.providerconfig = providerconfig
-
- def setWindowFlags(self, flags):
- logger.debug('setting window flags')
- QtGui.QWizard.setWindowFlags(self, flags)
-
- def focusOutEvent(self, event):
- # needed ?
- self.setFocus(True)
- self.activateWindow()
- self.raise_()
- self.show()
-
def accept(self):
"""
final step in the wizard.
@@ -246,11 +247,14 @@ class FirstRunWizard(QtGui.QWizard):
if cb and callable(cb):
self.success_cb()
- def get_provider_by_index(self):
- provider = self.field('provider_index')
- return self.providers[provider]
+ # misc helpers
def get_random_str(self, n):
+ """
+ returns a random string
+ :param n: the length of the desired string
+ :rvalue: str
+ """
from string import (ascii_uppercase, ascii_lowercase, digits)
from random import choice
return ''.join(choice(
@@ -258,6 +262,24 @@ class FirstRunWizard(QtGui.QWizard):
ascii_lowercase +
digits) for x in range(n))
+ def set_providerconfig(self, providerconfig):
+ """
+ sets a providerconfig attribute
+ used when we fetch and parse a json configuration
+ """
+ self.providerconfig = providerconfig
+
+ def get_provider_by_index(self): # pragma: no cover
+ """
+ returns the value of a provider given its index.
+ this was used in the select provider page,
+ in the case where we were preseeding providers in a combobox
+ """
+ # Leaving it here for the moment when we go back at the
+ # option of preseeding with known provider values.
+ provider = self.field('provider_index')
+ return self.providers[provider]
+
if __name__ == '__main__':
# standalone test
diff --git a/src/leap/gui/progress.py b/src/leap/gui/progress.py
index 64b87b2c..ffea80de 100644
--- a/src/leap/gui/progress.py
+++ b/src/leap/gui/progress.py
@@ -4,7 +4,7 @@ from first run wizard
"""
try:
from collections import OrderedDict
-except ImportError:
+except ImportError: # pragma: no cover
# We must be in 2.6
from leap.util.dicts import OrderedDict
@@ -73,15 +73,16 @@ class ProgressStepContainer(object):
self.steps = {}
def step(self, identity):
- return self.step.get(identity)
+ return self.steps.get(identity, None)
def addStep(self, step):
self.steps[step.index] = step
def removeStep(self, step):
- del self.steps[step.index]
- del step
- self.dirty = True
+ if step and self.steps.get(step.index, None):
+ del self.steps[step.index]
+ del step
+ self.dirty = True
def removeAllSteps(self):
for item in iter(self):
@@ -107,7 +108,7 @@ class StepsTableWidget(QtGui.QTableWidget):
"""
def __init__(self, parent=None):
- super(StepsTableWidget, self).__init__(parent)
+ super(StepsTableWidget, self).__init__(parent=parent)
# remove headers and all edit/select behavior
self.horizontalHeader().hide()
@@ -149,18 +150,39 @@ class StepsTableWidget(QtGui.QTableWidget):
class WithStepsMixIn(object):
+ """
+ This Class is a mixin that can be inherited
+ by InlineValidation pages (which will display
+ a progress steps widget in the same page as the form)
+ or by Validation Pages (which will only display
+ the progress steps in the page, below a progress bar widget)
+ """
+ STEPS_TIMER_MS = 100
- # worker threads for checks
+ #
+ # methods related to worker threads
+ # launched for individual checks
+ #
def setupStepsProcessingQueue(self):
+ """
+ should be called from the init method
+ of the derived classes
+ """
self.steps_queue = Queue.Queue()
self.stepscheck_timer = QtCore.QTimer()
self.stepscheck_timer.timeout.connect(self.processStepsQueue)
- self.stepscheck_timer.start(100)
+ self.stepscheck_timer.start(self.STEPS_TIMER_MS)
# we need to keep a reference to child threads
self.threads = []
def do_checks(self):
+ """
+ main entry point for checks.
+ it calls _do_checks in derived classes,
+ and it expects it to be a generator
+ yielding a tuple in the form (("message", progress_int), checkfunction)
+ """
# yo dawg, I heard you like checks
# so I put a __do_checks in your do_checks
@@ -168,7 +190,7 @@ class WithStepsMixIn(object):
def __do_checks(fun=None, queue=None):
- for checkcase in fun():
+ for checkcase in fun(): # pragma: no cover
checkmsg, checkfun = checkcase
queue.put(checkmsg)
@@ -180,15 +202,34 @@ class WithStepsMixIn(object):
__do_checks,
fun=self._do_checks,
queue=self.steps_queue))
- t.finished.connect(self.on_checks_validation_ready)
+ if hasattr(self, 'on_checks_validation_ready'):
+ t.finished.connect(self.on_checks_validation_ready)
t.begin()
self.threads.append(t)
+ def processStepsQueue(self):
+ """
+ consume steps queue
+ and pass messages
+ to the ui updater functions
+ """
+ while self.steps_queue.qsize():
+ try:
+ status = self.steps_queue.get(0)
+ if status == "failed":
+ self.set_failed_icon()
+ else:
+ self.onStepStatusChanged(*status)
+ except Queue.Empty: # pragma: no cover
+ pass
+
def fail(self, err=None):
"""
return failed state
and send error notification as
- a nice side effect
+ a nice side effect. this function is called from
+ the _do_checks check functions returned in the
+ generator.
"""
wizard = self.wizard()
senderr = lambda err: wizard.set_validation_error(
@@ -202,38 +243,29 @@ class WithStepsMixIn(object):
def launch_checks(self):
self.do_checks()
+ # (gui) presentation stuff begins #####################
+
# slot
#@QtCore.pyqtSlot(str, int)
def onStepStatusChanged(self, status, progress=None):
+ status = unicode(status)
if status not in ("head_sentinel", "end_sentinel"):
self.add_status_line(status)
if status in ("end_sentinel"):
- self.checks_finished = True
+ #self.checks_finished = True
self.set_checked_icon()
if progress and hasattr(self, 'progress'):
self.progress.setValue(progress)
self.progress.update()
- def processStepsQueue(self):
- """
- consume steps queue
- and pass messages
- to the ui updater functions
- """
- while self.steps_queue.qsize():
- try:
- status = self.steps_queue.get(0)
- if status == "failed":
- self.set_failed_icon()
- else:
- self.onStepStatusChanged(*status)
- except Queue.Empty:
- pass
-
def setupSteps(self):
self.steps = ProgressStepContainer()
# steps table widget
- self.stepsTableWidget = StepsTableWidget(self)
+ if isinstance(self, QtCore.QObject):
+ parent = self
+ else:
+ parent = None
+ self.stepsTableWidget = StepsTableWidget(parent=parent)
zeros = (0, 0, 0, 0)
self.stepsTableWidget.setContentsMargins(*zeros)
self.errors = OrderedDict()
@@ -242,15 +274,17 @@ class WithStepsMixIn(object):
self.errors[name] = error
def pop_first_error(self):
- return list(reversed(self.errors.items())).pop()
+ errkey, errval = list(reversed(self.errors.items())).pop()
+ del self.errors[errkey]
+ return errkey, errval
def clean_errors(self):
self.errors = OrderedDict()
def clean_wizard_errors(self, pagename=None):
- if pagename is None:
+ if pagename is None: # pragma: no cover
pagename = getattr(self, 'prev_page', None)
- if pagename is None:
+ if pagename is None: # pragma: no cover
return
logger.debug('cleaning wizard errors for %s' % pagename)
self.wizard().set_validation_error(pagename, None)
@@ -295,6 +329,8 @@ class WithStepsMixIn(object):
# setting cell widget.
# see note on StepsTableWidget about plans to
# change this for a better solution.
+ if not hasattr(self, 'steps'):
+ return
index = len(self.steps)
table = self.stepsTableWidget
_index = index - 1 if current else index - 2
@@ -340,6 +376,9 @@ class WithStepsMixIn(object):
def is_done(self):
return self.done
+ # convenience for going back and forth
+ # in the wizard pages.
+
def go_back(self):
self.wizard().back()
diff --git a/src/leap/gui/tests/__init__.py b/src/leap/gui/tests/__init__.py
new file mode 100644
index 00000000..e69de29b
--- /dev/null
+++ b/src/leap/gui/tests/__init__.py
diff --git a/src/leap/gui/tests/test_firstrun_login.py b/src/leap/gui/tests/test_firstrun_login.py
new file mode 100644
index 00000000..fa800c23
--- /dev/null
+++ b/src/leap/gui/tests/test_firstrun_login.py
@@ -0,0 +1,212 @@
+import sys
+import unittest
+
+import mock
+
+from leap.testing import qunittest
+#from leap.testing import pyqt
+
+from PyQt4 import QtGui
+#from PyQt4 import QtCore
+#import PyQt4.QtCore # some weirdness with mock module
+
+from PyQt4.QtTest import QTest
+from PyQt4.QtCore import Qt
+
+from leap.gui import firstrun
+
+try:
+ from collections import OrderedDict
+except ImportError:
+ # We must be in 2.6
+ from leap.util.dicts import OrderedDict
+
+
+class TestPage(firstrun.login.LogInPage):
+ pass
+
+
+class LogInPageLogicTestCase(qunittest.TestCase):
+
+ # XXX can spy on signal connections
+ __name__ = "register user page logic tests"
+
+ def setUp(self):
+ self.app = QtGui.QApplication(sys.argv)
+ QtGui.qApp = self.app
+ self.page = TestPage(None)
+ self.page.wizard = mock.MagicMock()
+
+ def tearDown(self):
+ QtGui.qApp = None
+ self.app = None
+ self.page = None
+
+ def test__do_checks(self):
+ eq = self.assertEqual
+
+ self.page.userNameLineEdit.setText('testuser@domain')
+ self.page.userPasswordLineEdit.setText('testpassword')
+
+ # fake register process
+ with mock.patch('leap.base.auth.LeapSRPRegister') as mockAuth:
+ mockSignup = mock.MagicMock()
+
+ reqMockup = mock.Mock()
+ # XXX should inject bad json to get error
+ reqMockup.content = '{"errors": null}'
+ mockSignup.register_user.return_value = (True, reqMockup)
+ mockAuth.return_value = mockSignup
+ checks = [x for x in self.page._do_checks()]
+
+ eq(len(checks), 4)
+ labels = [str(x) for (x, y), z in checks]
+ eq(labels, ['head_sentinel',
+ 'Resolving domain name',
+ 'Validating credentials',
+ 'end_sentinel'])
+ progress = [y for (x, y), z in checks]
+ eq(progress, [0, 20, 60, 100])
+
+ # normal run, ie, no exceptions
+
+ checkfuns = [z for (x, y), z in checks]
+ checkusername, resolvedomain, valcreds = checkfuns[:-1]
+
+ self.assertTrue(checkusername())
+ #self.mocknetchecker.check_name_resolution.assert_called_with(
+ #'test_provider1')
+
+ self.assertTrue(resolvedomain())
+ #self.mockpcertchecker.is_https_working.assert_called_with(
+ #"https://test_provider1", verify=True)
+
+ self.assertTrue(valcreds())
+
+ # XXX missing: inject failing exceptions
+ # XXX TODO make it break
+
+
+class RegisterUserPageUITestCase(qunittest.TestCase):
+
+ # XXX can spy on signal connections
+ __name__ = "Register User Page UI tests"
+
+ def setUp(self):
+ self.app = QtGui.QApplication(sys.argv)
+ QtGui.qApp = self.app
+
+ self.pagename = "signup"
+ pages = OrderedDict((
+ (self.pagename, TestPage),
+ ('providersetupvalidation',
+ firstrun.regvalidation.RegisterUserValidationPage)))
+ self.wizard = firstrun.wizard.FirstRunWizard(None, pages_dict=pages)
+ self.page = self.wizard.page(self.wizard.get_page_index(self.pagename))
+
+ self.page.do_checks = mock.Mock()
+
+ # wizard would do this for us
+ self.page.initializePage()
+
+ def tearDown(self):
+ QtGui.qApp = None
+ self.app = None
+ self.wizard = None
+
+ # XXX refactor out
+ def fill_field(self, field, text):
+ """
+ fills a field (line edit) that is passed along
+ :param field: the qLineEdit
+ :param text: the text to be filled
+ :type field: QLineEdit widget
+ :type text: str
+ """
+ keyp = QTest.keyPress
+ field.setFocus(True)
+ for c in text:
+ keyp(field, c)
+ self.assertEqual(field.text(), text)
+
+ def del_field(self, field):
+ """
+ deletes entried text in
+ field line edit
+ :param field: the QLineEdit
+ :type field: QLineEdit widget
+ """
+ keyp = QTest.keyPress
+ for c in range(len(field.text())):
+ keyp(field, Qt.Key_Backspace)
+ self.assertEqual(field.text(), "")
+
+ def test_buttons_disabled_until_textentry(self):
+ # it's a commit button this time
+ nextbutton = self.wizard.button(QtGui.QWizard.CommitButton)
+
+ self.assertFalse(nextbutton.isEnabled())
+
+ f_username = self.page.userNameLineEdit
+ f_password = self.page.userPasswordLineEdit
+
+ self.fill_field(f_username, "testuser")
+ self.fill_field(f_password, "testpassword")
+
+ # commit should be enabled
+ # XXX Need a workaround here
+ # because the isComplete is not being evaluated...
+ # (no event loop running??)
+ #import ipdb;ipdb.set_trace()
+ #self.assertTrue(nextbutton.isEnabled())
+ self.assertTrue(self.page.isComplete())
+
+ self.del_field(f_username)
+ self.del_field(f_password)
+
+ # after rm fields commit button
+ # should be disabled again
+ #self.assertFalse(nextbutton.isEnabled())
+ self.assertFalse(self.page.isComplete())
+
+ def test_validate_page(self):
+ self.assertFalse(self.page.validatePage())
+ # XXX TODO MOAR CASES...
+ # add errors, False
+ # change done, False
+ # not done, do_checks called
+ # click confirm, True
+ # done and do_confirm, True
+
+ def test_next_id(self):
+ self.assertEqual(self.page.nextId(), 1)
+
+ def test_paint_event(self):
+ self.page.populateErrors = mock.Mock()
+ self.page.paintEvent(None)
+ self.page.populateErrors.assert_called_with()
+
+ def test_validation_ready(self):
+ f_username = self.page.userNameLineEdit
+ f_password = self.page.userPasswordLineEdit
+
+ self.fill_field(f_username, "testuser")
+ self.fill_field(f_password, "testpassword")
+
+ self.page.done = True
+ self.page.on_checks_validation_ready()
+ self.assertFalse(f_username.isEnabled())
+ self.assertFalse(f_password.isEnabled())
+
+ self.assertEqual(self.page.validationMsg.text(),
+ "Credentials validated.")
+ self.assertEqual(self.page.do_confirm_next, True)
+
+ def test_regex(self):
+ # XXX enter invalid username with key presses
+ # check text is not updated
+ pass
+
+
+if __name__ == "__main__":
+ unittest.main()
diff --git a/src/leap/gui/tests/test_firstrun_providerselect.py b/src/leap/gui/tests/test_firstrun_providerselect.py
new file mode 100644
index 00000000..976c68cd
--- /dev/null
+++ b/src/leap/gui/tests/test_firstrun_providerselect.py
@@ -0,0 +1,201 @@
+import sys
+import unittest
+
+import mock
+
+from leap.testing import qunittest
+#from leap.testing import pyqt
+
+from PyQt4 import QtGui
+#from PyQt4 import QtCore
+#import PyQt4.QtCore # some weirdness with mock module
+
+from PyQt4.QtTest import QTest
+from PyQt4.QtCore import Qt
+
+from leap.gui import firstrun
+
+try:
+ from collections import OrderedDict
+except ImportError:
+ # We must be in 2.6
+ from leap.util.dicts import OrderedDict
+
+
+class TestPage(firstrun.providerselect.SelectProviderPage):
+ pass
+
+
+class SelectProviderPageLogicTestCase(qunittest.TestCase):
+
+ # XXX can spy on signal connections
+
+ def setUp(self):
+ self.app = QtGui.QApplication(sys.argv)
+ QtGui.qApp = self.app
+ self.page = TestPage(None)
+ self.page.wizard = mock.MagicMock()
+
+ mocknetchecker = mock.Mock()
+ self.page.wizard().netchecker.return_value = mocknetchecker
+ self.mocknetchecker = mocknetchecker
+
+ mockpcertchecker = mock.Mock()
+ self.page.wizard().providercertchecker.return_value = mockpcertchecker
+ self.mockpcertchecker = mockpcertchecker
+
+ mockeipconfchecker = mock.Mock()
+ self.page.wizard().eipconfigchecker.return_value = mockeipconfchecker
+ self.mockeipconfchecker = mockeipconfchecker
+
+ def tearDown(self):
+ QtGui.qApp = None
+ self.app = None
+ self.page = None
+
+ def test__do_checks(self):
+ eq = self.assertEqual
+
+ self.page.providerNameEdit.setText('test_provider1')
+
+ checks = [x for x in self.page._do_checks()]
+ eq(len(checks), 5)
+ labels = [str(x) for (x, y), z in checks]
+ eq(labels, ['head_sentinel', 'checking domain name',
+ 'checking https connection',
+ 'fetching provider info', 'end_sentinel'])
+ progress = [y for (x, y), z in checks]
+ eq(progress, [0, 20, 40, 80, 100])
+
+ # normal run, ie, no exceptions
+
+ checkfuns = [z for (x, y), z in checks]
+ namecheck, httpscheck, fetchinfo = checkfuns[1:-1]
+
+ self.assertTrue(namecheck())
+ self.mocknetchecker.check_name_resolution.assert_called_with(
+ 'test_provider1')
+
+ self.assertTrue(httpscheck())
+ self.mockpcertchecker.is_https_working.assert_called_with(
+ "https://test_provider1", verify=True)
+
+ self.assertTrue(fetchinfo())
+ self.mockeipconfchecker.fetch_definition.assert_called_with(
+ domain="test_provider1")
+
+ # XXX missing: inject failing exceptions
+ # XXX TODO make it break
+
+
+class SelectProviderPageUITestCase(qunittest.TestCase):
+
+ # XXX can spy on signal connections
+ __name__ = "Select Provider Page UI tests"
+
+ def setUp(self):
+ self.app = QtGui.QApplication(sys.argv)
+ QtGui.qApp = self.app
+
+ self.pagename = "providerselection"
+ pages = OrderedDict((
+ (self.pagename, TestPage),
+ ('providerinfo',
+ firstrun.providerinfo.ProviderInfoPage)))
+ self.wizard = firstrun.wizard.FirstRunWizard(None, pages_dict=pages)
+ self.page = self.wizard.page(self.wizard.get_page_index(self.pagename))
+
+ self.page.do_checks = mock.Mock()
+
+ # wizard would do this for us
+ self.page.initializePage()
+
+ def tearDown(self):
+ QtGui.qApp = None
+ self.app = None
+ self.wizard = None
+
+ def fill_provider(self):
+ """
+ fills provider line edit
+ """
+ keyp = QTest.keyPress
+ pedit = self.page.providerNameEdit
+ pedit.setFocus(True)
+ for c in "testprovider":
+ keyp(pedit, c)
+ self.assertEqual(pedit.text(), "testprovider")
+
+ def del_provider(self):
+ """
+ deletes entried provider in
+ line edit
+ """
+ keyp = QTest.keyPress
+ pedit = self.page.providerNameEdit
+ for c in range(len("testprovider")):
+ keyp(pedit, Qt.Key_Backspace)
+ self.assertEqual(pedit.text(), "")
+
+ def test_buttons_disabled_until_textentry(self):
+ nextbutton = self.wizard.button(QtGui.QWizard.NextButton)
+ checkbutton = self.page.providerCheckButton
+
+ self.assertFalse(nextbutton.isEnabled())
+ self.assertFalse(checkbutton.isEnabled())
+
+ self.fill_provider()
+ # checkbutton should be enabled
+ self.assertTrue(checkbutton.isEnabled())
+ self.assertFalse(nextbutton.isEnabled())
+
+ self.del_provider()
+ # after rm provider checkbutton disabled again
+ self.assertFalse(checkbutton.isEnabled())
+ self.assertFalse(nextbutton.isEnabled())
+
+ def test_check_button_triggers_tests(self):
+ checkbutton = self.page.providerCheckButton
+ self.assertFalse(checkbutton.isEnabled())
+ self.assertFalse(self.page.do_checks.called)
+
+ self.fill_provider()
+
+ self.assertTrue(checkbutton.isEnabled())
+ mclick = QTest.mouseClick
+ # click!
+ mclick(checkbutton, Qt.LeftButton)
+ self.waitFor(seconds=0.1)
+ self.assertTrue(self.page.do_checks.called)
+
+ # XXX
+ # can play with different side_effects for do_checks mock...
+ # so we can see what happens with errors and so on
+
+ def test_page_completed_after_checks(self):
+ nextbutton = self.wizard.button(QtGui.QWizard.NextButton)
+ self.assertFalse(nextbutton.isEnabled())
+
+ self.assertFalse(self.page.isComplete())
+ self.fill_provider()
+ # simulate checks done
+ self.page.done = True
+ self.page.on_checks_validation_ready()
+ self.assertTrue(self.page.isComplete())
+ # cannot test for nexbutton enabled
+ # cause it's the the wizard loop
+ # that would do that I think
+
+ def test_validate_page(self):
+ self.assertTrue(self.page.validatePage())
+
+ def test_next_id(self):
+ self.assertEqual(self.page.nextId(), 1)
+
+ def test_paint_event(self):
+ self.page.populateErrors = mock.Mock()
+ self.page.paintEvent(None)
+ self.page.populateErrors.assert_called_with()
+
+if __name__ == "__main__":
+ unittest.main()
diff --git a/src/leap/gui/tests/test_firstrun_register.py b/src/leap/gui/tests/test_firstrun_register.py
new file mode 100644
index 00000000..3447fe9d
--- /dev/null
+++ b/src/leap/gui/tests/test_firstrun_register.py
@@ -0,0 +1,244 @@
+import sys
+import unittest
+
+import mock
+
+from leap.testing import qunittest
+#from leap.testing import pyqt
+
+from PyQt4 import QtGui
+#from PyQt4 import QtCore
+#import PyQt4.QtCore # some weirdness with mock module
+
+from PyQt4.QtTest import QTest
+from PyQt4.QtCore import Qt
+
+from leap.gui import firstrun
+
+try:
+ from collections import OrderedDict
+except ImportError:
+ # We must be in 2.6
+ from leap.util.dicts import OrderedDict
+
+
+class TestPage(firstrun.register.RegisterUserPage):
+
+ def field(self, field):
+ if field == "provider_domain":
+ return "testprovider"
+
+
+class RegisterUserPageLogicTestCase(qunittest.TestCase):
+
+ # XXX can spy on signal connections
+ __name__ = "register user page logic tests"
+
+ def setUp(self):
+ self.app = QtGui.QApplication(sys.argv)
+ QtGui.qApp = self.app
+ self.page = TestPage(None)
+ self.page.wizard = mock.MagicMock()
+
+ #mocknetchecker = mock.Mock()
+ #self.page.wizard().netchecker.return_value = mocknetchecker
+ #self.mocknetchecker = mocknetchecker
+#
+ #mockpcertchecker = mock.Mock()
+ #self.page.wizard().providercertchecker.return_value = mockpcertchecker
+ #self.mockpcertchecker = mockpcertchecker
+#
+ #mockeipconfchecker = mock.Mock()
+ #self.page.wizard().eipconfigchecker.return_value = mockeipconfchecker
+ #self.mockeipconfchecker = mockeipconfchecker
+
+ def tearDown(self):
+ QtGui.qApp = None
+ self.app = None
+ self.page = None
+
+ def test__do_checks(self):
+ eq = self.assertEqual
+
+ self.page.userNameLineEdit.setText('testuser')
+ self.page.userPasswordLineEdit.setText('testpassword')
+ self.page.userPassword2LineEdit.setText('testpassword')
+
+ # fake register process
+ with mock.patch('leap.base.auth.LeapSRPRegister') as mockAuth:
+ mockSignup = mock.MagicMock()
+
+ reqMockup = mock.Mock()
+ # XXX should inject bad json to get error
+ reqMockup.content = '{"errors": null}'
+ mockSignup.register_user.return_value = (True, reqMockup)
+ mockAuth.return_value = mockSignup
+ checks = [x for x in self.page._do_checks()]
+
+ eq(len(checks), 3)
+ labels = [str(x) for (x, y), z in checks]
+ eq(labels, ['head_sentinel',
+ 'registering with provider',
+ 'end_sentinel'])
+ progress = [y for (x, y), z in checks]
+ eq(progress, [0, 40, 100])
+
+ # normal run, ie, no exceptions
+
+ checkfuns = [z for (x, y), z in checks]
+ passcheck, register = checkfuns[:-1]
+
+ self.assertTrue(passcheck())
+ #self.mocknetchecker.check_name_resolution.assert_called_with(
+ #'test_provider1')
+
+ self.assertTrue(register())
+ #self.mockpcertchecker.is_https_working.assert_called_with(
+ #"https://test_provider1", verify=True)
+
+ # XXX missing: inject failing exceptions
+ # XXX TODO make it break
+
+
+class RegisterUserPageUITestCase(qunittest.TestCase):
+
+ # XXX can spy on signal connections
+ __name__ = "Register User Page UI tests"
+
+ def setUp(self):
+ self.app = QtGui.QApplication(sys.argv)
+ QtGui.qApp = self.app
+
+ self.pagename = "signup"
+ pages = OrderedDict((
+ (self.pagename, TestPage),
+ ('signupvalidation',
+ firstrun.regvalidation.RegisterUserValidationPage)))
+ self.wizard = firstrun.wizard.FirstRunWizard(None, pages_dict=pages)
+ self.page = self.wizard.page(self.wizard.get_page_index(self.pagename))
+
+ self.page.do_checks = mock.Mock()
+
+ # wizard would do this for us
+ self.page.initializePage()
+
+ def tearDown(self):
+ QtGui.qApp = None
+ self.app = None
+ self.wizard = None
+
+ def fill_field(self, field, text):
+ """
+ fills a field (line edit) that is passed along
+ :param field: the qLineEdit
+ :param text: the text to be filled
+ :type field: QLineEdit widget
+ :type text: str
+ """
+ keyp = QTest.keyPress
+ field.setFocus(True)
+ for c in text:
+ keyp(field, c)
+ self.assertEqual(field.text(), text)
+
+ def del_field(self, field):
+ """
+ deletes entried text in
+ field line edit
+ :param field: the QLineEdit
+ :type field: QLineEdit widget
+ """
+ keyp = QTest.keyPress
+ for c in range(len(field.text())):
+ keyp(field, Qt.Key_Backspace)
+ self.assertEqual(field.text(), "")
+
+ def test_buttons_disabled_until_textentry(self):
+ # it's a commit button this time
+ nextbutton = self.wizard.button(QtGui.QWizard.CommitButton)
+
+ self.assertFalse(nextbutton.isEnabled())
+
+ f_username = self.page.userNameLineEdit
+ f_password = self.page.userPasswordLineEdit
+ f_passwor2 = self.page.userPassword2LineEdit
+
+ self.fill_field(f_username, "testuser")
+ self.fill_field(f_password, "testpassword")
+ self.fill_field(f_passwor2, "testpassword")
+
+ # commit should be enabled
+ # XXX Need a workaround here
+ # because the isComplete is not being evaluated...
+ # (no event loop running??)
+ #import ipdb;ipdb.set_trace()
+ #self.assertTrue(nextbutton.isEnabled())
+ self.assertTrue(self.page.isComplete())
+
+ self.del_field(f_username)
+ self.del_field(f_password)
+ self.del_field(f_passwor2)
+
+ # after rm fields commit button
+ # should be disabled again
+ #self.assertFalse(nextbutton.isEnabled())
+ self.assertFalse(self.page.isComplete())
+
+ @unittest.skip
+ def test_check_button_triggers_tests(self):
+ checkbutton = self.page.providerCheckButton
+ self.assertFalse(checkbutton.isEnabled())
+ self.assertFalse(self.page.do_checks.called)
+
+ self.fill_provider()
+
+ self.assertTrue(checkbutton.isEnabled())
+ mclick = QTest.mouseClick
+ # click!
+ mclick(checkbutton, Qt.LeftButton)
+ self.waitFor(seconds=0.1)
+ self.assertTrue(self.page.do_checks.called)
+
+ # XXX
+ # can play with different side_effects for do_checks mock...
+ # so we can see what happens with errors and so on
+
+ def test_validate_page(self):
+ self.assertFalse(self.page.validatePage())
+ # XXX TODO MOAR CASES...
+ # add errors, False
+ # change done, False
+ # not done, do_checks called
+ # click confirm, True
+ # done and do_confirm, True
+
+ def test_next_id(self):
+ self.assertEqual(self.page.nextId(), 1)
+
+ def test_paint_event(self):
+ self.page.populateErrors = mock.Mock()
+ self.page.paintEvent(None)
+ self.page.populateErrors.assert_called_with()
+
+ def test_validation_ready(self):
+ f_username = self.page.userNameLineEdit
+ f_password = self.page.userPasswordLineEdit
+ f_passwor2 = self.page.userPassword2LineEdit
+
+ self.fill_field(f_username, "testuser")
+ self.fill_field(f_password, "testpassword")
+ self.fill_field(f_passwor2, "testpassword")
+
+ self.page.done = True
+ self.page.on_checks_validation_ready()
+ self.assertFalse(f_username.isEnabled())
+ self.assertFalse(f_password.isEnabled())
+ self.assertFalse(f_passwor2.isEnabled())
+
+ self.assertEqual(self.page.validationMsg.text(),
+ "Registration succeeded!")
+ self.assertEqual(self.page.do_confirm_next, True)
+
+
+if __name__ == "__main__":
+ unittest.main()
diff --git a/src/leap/gui/tests/test_firstrun_wizard.py b/src/leap/gui/tests/test_firstrun_wizard.py
new file mode 100644
index 00000000..091cd932
--- /dev/null
+++ b/src/leap/gui/tests/test_firstrun_wizard.py
@@ -0,0 +1,137 @@
+import sys
+import unittest
+
+import mock
+
+from leap.testing import qunittest
+from leap.testing import pyqt
+
+from PyQt4 import QtGui
+#from PyQt4 import QtCore
+import PyQt4.QtCore # some weirdness with mock module
+
+from PyQt4.QtTest import QTest
+#from PyQt4.QtCore import Qt
+
+from leap.gui import firstrun
+
+
+class TestWizard(firstrun.wizard.FirstRunWizard):
+ pass
+
+
+PAGES_DICT = dict((
+ ('intro', firstrun.intro.IntroPage),
+ ('providerselection',
+ firstrun.providerselect.SelectProviderPage),
+ ('login', firstrun.login.LogInPage),
+ ('providerinfo', firstrun.providerinfo.ProviderInfoPage),
+ ('providersetupvalidation',
+ firstrun.providersetup.ProviderSetupValidationPage),
+ ('signup', firstrun.register.RegisterUserPage),
+ ('signupvalidation',
+ firstrun.regvalidation.RegisterUserValidationPage),
+ ('lastpage', firstrun.last.LastPage)
+))
+
+
+mockQSettings = mock.MagicMock()
+mockQSettings().setValue.return_value = True
+
+#PyQt4.QtCore.QSettings = mockQSettings
+
+
+class FirstRunWizardTestCase(qunittest.TestCase):
+
+ # XXX can spy on signal connections
+
+ def setUp(self):
+ self.app = QtGui.QApplication(sys.argv)
+ QtGui.qApp = self.app
+ self.wizard = TestWizard(None)
+
+ def tearDown(self):
+ QtGui.qApp = None
+ self.app = None
+ self.wizard = None
+
+ def test_defaults(self):
+ self.assertEqual(self.wizard.pages_dict, PAGES_DICT)
+
+ @mock.patch('PyQt4.QtCore.QSettings', mockQSettings)
+ def test_accept(self):
+ """
+ test the main accept method
+ that gets called when user has gone
+ thru all the wizard and click on finish button
+ """
+
+ self.wizard.success_cb = mock.Mock()
+ self.wizard.success_cb.return_value = True
+
+ # dummy values; we inject them in the field
+ # mocks (where wizard gets them) and then
+ # we check that they are passed to QSettings.setValue
+ field_returns = ["testuser", "1234", "testprovider", True]
+
+ def field_side_effects(*args):
+ return field_returns.pop(0)
+
+ self.wizard.field = mock.Mock(side_effect=field_side_effects)
+ self.wizard.get_random_str = mock.Mock()
+ RANDOMSTR = "thisisarandomstringTM"
+ self.wizard.get_random_str.return_value = RANDOMSTR
+
+ # mocked settings (see decorator on this method)
+ mqs = PyQt4.QtCore.QSettings
+
+ # go! call accept...
+ self.wizard.accept()
+
+ # did settings().setValue get called with the proper
+ # arguments?
+ call = mock.call
+ calls = [call("FirstRunWizardDone", True),
+ call("provider_domain", "testprovider"),
+ call("remember_user_and_pass", True),
+ call("eip_username", "testuser@testprovider"),
+ call("testprovider_seed", RANDOMSTR)]
+ mqs().setValue.assert_has_calls(calls, any_order=True)
+
+ # assert success callback is success oh boy
+ self.wizard.success_cb.assert_called_with()
+
+ def test_random_str(self):
+ r = self.wizard.get_random_str(42)
+ self.assertTrue(len(r) == 42)
+
+ def test_page_index(self):
+ """
+ we test both the get_page_index function
+ and the correct ordering of names
+ """
+ # remember it's implemented as an ordered dict
+
+ pagenames = ('intro', 'providerselection', 'login', 'providerinfo',
+ 'providersetupvalidation', 'signup', 'signupvalidation',
+ 'lastpage')
+ eq = self.assertEqual
+ w = self.wizard
+ for index, name in enumerate(pagenames):
+ eq(w.get_page_index(name), index)
+
+ def test_validation_errors(self):
+ """
+ tests getters and setters for validation errors
+ """
+ page = "testpage"
+ eq = self.assertEqual
+ w = self.wizard
+ eq(w.get_validation_error(page), None)
+ w.set_validation_error(page, "error")
+ eq(w.get_validation_error(page), "error")
+ w.clean_validation_error(page)
+ eq(w.get_validation_error(page), None)
+
+if __name__ == "__main__":
+ unittest.main()
diff --git a/src/leap/gui/test_mainwindow_rc.py b/src/leap/gui/tests/test_mainwindow_rc.py
index c5abb4aa..67b9fae0 100644
--- a/src/leap/gui/test_mainwindow_rc.py
+++ b/src/leap/gui/tests/test_mainwindow_rc.py
@@ -27,3 +27,6 @@ class MainWindowResourcesTest(unittest.TestCase):
self.assertEqual(
hashlib.md5(mainwindow_rc.qt_resource_data).hexdigest(),
'53e196f29061d8f08f112e5a2e64eb53')
+
+if __name__ == "__main__":
+ unittest.main()
diff --git a/src/leap/gui/tests/test_progress.py b/src/leap/gui/tests/test_progress.py
new file mode 100644
index 00000000..1f9f9e38
--- /dev/null
+++ b/src/leap/gui/tests/test_progress.py
@@ -0,0 +1,449 @@
+from collections import namedtuple
+import sys
+import unittest
+import Queue
+
+import mock
+
+from leap.testing import qunittest
+from leap.testing import pyqt
+
+from PyQt4 import QtGui
+from PyQt4 import QtCore
+from PyQt4.QtTest import QTest
+from PyQt4.QtCore import Qt
+
+from leap.gui import progress
+
+
+class ProgressStepTestCase(unittest.TestCase):
+
+ def test_step_attrs(self):
+ ps = progress.ProgressStep
+ step = ps('test', False, 1)
+ # instance
+ self.assertEqual(step.index, 1)
+ self.assertEqual(step.name, "test")
+ self.assertEqual(step.done, False)
+ step = ps('test2', True, 2)
+ self.assertEqual(step.index, 2)
+ self.assertEqual(step.name, "test2")
+ self.assertEqual(step.done, True)
+
+ # class methods and attrs
+ self.assertEqual(ps.columns(), ('name', 'done'))
+ self.assertEqual(ps.NAME, 0)
+ self.assertEqual(ps.DONE, 1)
+
+
+class ProgressStepContainerTestCase(unittest.TestCase):
+ def setUp(self):
+ self.psc = progress.ProgressStepContainer()
+
+ def addSteps(self, number):
+ Step = progress.ProgressStep
+ for n in range(number):
+ self.psc.addStep(Step("%s" % n, False, n))
+
+ def test_attrs(self):
+ self.assertEqual(self.psc.columns,
+ ('name', 'done'))
+
+ def test_add_steps(self):
+ Step = progress.ProgressStep
+ self.assertTrue(len(self.psc) == 0)
+ self.psc.addStep(Step('one', False, 0))
+ self.assertTrue(len(self.psc) == 1)
+ self.psc.addStep(Step('two', False, 1))
+ self.assertTrue(len(self.psc) == 2)
+
+ def test_del_all_steps(self):
+ self.assertTrue(len(self.psc) == 0)
+ self.addSteps(5)
+ self.assertTrue(len(self.psc) == 5)
+ self.psc.removeAllSteps()
+ self.assertTrue(len(self.psc) == 0)
+
+ def test_del_step(self):
+ Step = progress.ProgressStep
+ self.addSteps(5)
+ self.assertTrue(len(self.psc) == 5)
+ self.psc.removeStep(self.psc.step(4))
+ self.assertTrue(len(self.psc) == 4)
+ self.psc.removeStep(self.psc.step(4))
+ self.psc.removeStep(Step('none', False, 5))
+ self.psc.removeStep(self.psc.step(4))
+
+ def test_iter(self):
+ self.addSteps(10)
+ self.assertEqual(
+ [x.index for x in self.psc],
+ [x for x in range(10)])
+
+
+class StepsTableWidgetTestCase(unittest.TestCase):
+
+ def setUp(self):
+ self.app = QtGui.QApplication(sys.argv)
+ QtGui.qApp = self.app
+ self.stw = progress.StepsTableWidget()
+
+ def tearDown(self):
+ QtGui.qApp = None
+ self.app = None
+
+ def test_defaults(self):
+ self.assertTrue(isinstance(self.stw, QtGui.QTableWidget))
+ self.assertEqual(self.stw.focusPolicy(), 0)
+
+
+class TestWithStepsClass(QtGui.QWidget, progress.WithStepsMixIn):
+
+ def __init__(self, parent=None):
+ super(TestWithStepsClass, self).__init__(parent=parent)
+ self.setupStepsProcessingQueue()
+ self.statuses = []
+ self.current_page = "testpage"
+
+ def onStepStatusChanged(self, *args):
+ """
+ blank out this gui method
+ that will add status lines
+ """
+ self.statuses.append(args)
+
+
+class WithStepsMixInTestCase(qunittest.TestCase):
+
+ TIMER_WAIT = 2 * progress.WithStepsMixIn.STEPS_TIMER_MS / 1000.0
+
+ # XXX can spy on signal connections
+
+ def setUp(self):
+ self.app = QtGui.QApplication(sys.argv)
+ QtGui.qApp = self.app
+ self.stepy = TestWithStepsClass()
+ #self.connects = []
+ #pyqt.enableSignalDebugging(
+ #connectCall=lambda *args: self.connects.append(args))
+ #self.assertEqual(self.connects, [])
+ #self.stepy.stepscheck_timer.timeout.disconnect(
+ #self.stepy.processStepsQueue)
+
+ def tearDown(self):
+ QtGui.qApp = None
+ self.app = None
+
+ def test_has_queue(self):
+ s = self.stepy
+ self.assertTrue(hasattr(s, 'steps_queue'))
+ self.assertTrue(isinstance(s.steps_queue, Queue.Queue))
+ self.assertTrue(isinstance(s.stepscheck_timer, QtCore.QTimer))
+
+ def test_do_checks_delegation(self):
+ s = self.stepy
+
+ _do_checks = mock.Mock()
+ _do_checks.return_value = (
+ (("test", 0), lambda: None),
+ (("test", 0), lambda: None))
+ s._do_checks = _do_checks
+ s.do_checks()
+ self.waitFor(seconds=self.TIMER_WAIT)
+ _do_checks.assert_called_with()
+ self.assertEqual(len(s.statuses), 2)
+
+ # test that a failed test interrupts the run
+
+ s.statuses = []
+ _do_checks = mock.Mock()
+ _do_checks.return_value = (
+ (("test", 0), lambda: None),
+ (("test", 0), lambda: False),
+ (("test", 0), lambda: None))
+ s._do_checks = _do_checks
+ s.do_checks()
+ self.waitFor(seconds=self.TIMER_WAIT)
+ _do_checks.assert_called_with()
+ self.assertEqual(len(s.statuses), 2)
+
+ def test_process_queue(self):
+ s = self.stepy
+ q = s.steps_queue
+ s.set_failed_icon = mock.MagicMock()
+ with self.assertRaises(AssertionError):
+ q.put('foo')
+ self.waitFor(seconds=self.TIMER_WAIT)
+ s.set_failed_icon.assert_called_with()
+ q.put("failed")
+ self.waitFor(seconds=self.TIMER_WAIT)
+ s.set_failed_icon.assert_called_with()
+
+ def test_on_checks_validation_ready_called(self):
+ s = self.stepy
+ s.on_checks_validation_ready = mock.MagicMock()
+
+ _do_checks = mock.Mock()
+ _do_checks.return_value = (
+ (("test", 0), lambda: None),)
+ s._do_checks = _do_checks
+ s.do_checks()
+
+ self.waitFor(seconds=self.TIMER_WAIT)
+ s.on_checks_validation_ready.assert_called_with()
+
+ def test_fail(self):
+ s = self.stepy
+
+ s.wizard = mock.Mock()
+ wizard = s.wizard.return_value
+ wizard.set_validation_error.return_value = True
+ s.completeChanged = mock.Mock()
+ s.completeChanged.emit.return_value = True
+
+ self.assertFalse(s.fail(err="foo"))
+ self.waitFor(seconds=self.TIMER_WAIT)
+ wizard.set_validation_error.assert_called_with('testpage', 'foo')
+ s.completeChanged.emit.assert_called_with()
+
+ # with no args
+ s.wizard = mock.Mock()
+ wizard = s.wizard.return_value
+ wizard.set_validation_error.return_value = True
+ s.completeChanged = mock.Mock()
+ s.completeChanged.emit.return_value = True
+
+ self.assertFalse(s.fail())
+ self.waitFor(seconds=self.TIMER_WAIT)
+ with self.assertRaises(AssertionError):
+ wizard.set_validation_error.assert_called_with()
+ s.completeChanged.emit.assert_called_with()
+
+ def test_done(self):
+ s = self.stepy
+ s.done = False
+
+ s.completeChanged = mock.Mock()
+ s.completeChanged.emit.return_value = True
+
+ self.assertFalse(s.is_done())
+ s.set_done()
+ self.assertTrue(s.is_done())
+ s.completeChanged.emit.assert_called_with()
+
+ s.completeChanged = mock.Mock()
+ s.completeChanged.emit.return_value = True
+ s.set_undone()
+ self.assertFalse(s.is_done())
+
+ def test_back_and_next(self):
+ s = self.stepy
+ s.wizard = mock.Mock()
+ wizard = s.wizard.return_value
+ wizard.back.return_value = True
+ wizard.next.return_value = True
+ s.go_back()
+ wizard.back.assert_called_with()
+ s.go_next()
+ wizard.next.assert_called_with()
+
+ def test_on_step_statuschanged_slot(self):
+ s = self.stepy
+ s.onStepStatusChanged = progress.WithStepsMixIn.onStepStatusChanged
+ s.add_status_line = mock.Mock()
+ s.set_checked_icon = mock.Mock()
+ s.progress = mock.Mock()
+ s.progress.setValue.return_value = True
+ s.progress.update.return_value = True
+
+ s.onStepStatusChanged(s, "end_sentinel")
+ s.set_checked_icon.assert_called_with()
+
+ s.onStepStatusChanged(s, "foo")
+ s.add_status_line.assert_called_with("foo")
+
+ s.onStepStatusChanged(s, "bar", 42)
+ s.progress.setValue.assert_called_with(42)
+ s.progress.update.assert_called_with()
+
+ def test_steps_and_errors(self):
+ s = self.stepy
+ s.setupSteps()
+ self.assertTrue(isinstance(s.steps, progress.ProgressStepContainer))
+ self.assertEqual(s.errors, {})
+ s.set_error('fooerror', 'barerror')
+ self.assertEqual(s.errors, {'fooerror': 'barerror'})
+ s.set_error('2', 42)
+ self.assertEqual(s.errors, {'fooerror': 'barerror', '2': 42})
+ fe = s.pop_first_error()
+ self.assertEqual(fe, ('fooerror', 'barerror'))
+ self.assertEqual(s.errors, {'2': 42})
+ s.clean_errors()
+ self.assertEqual(s.errors, {})
+
+ def test_launch_chechs_slot(self):
+ s = self.stepy
+ s.do_checks = mock.Mock()
+ s.launch_checks()
+ s.do_checks.assert_called_with()
+
+ def test_clean_wizard_errors(self):
+ s = self.stepy
+ s.wizard = mock.Mock()
+ wizard = s.wizard.return_value
+ wizard.set_validation_error.return_value = True
+ s.clean_wizard_errors(pagename="foopage")
+ wizard.set_validation_error.assert_called_with("foopage", None)
+
+ def test_clear_table(self):
+ s = self.stepy
+ s.stepsTableWidget = mock.Mock()
+ s.stepsTableWidget.clearContents.return_value = True
+ s.clearTable()
+ s.stepsTableWidget.clearContents.assert_called_with()
+
+ def test_populate_steps_table(self):
+ s = self.stepy
+ Step = namedtuple('Step', ['name', 'done'])
+
+ class Steps(object):
+ columns = ("name", "done")
+ _items = (Step('step1', False), Step('step2', False))
+
+ def __len__(self):
+ return 2
+
+ def __iter__(self):
+ for i in self._items:
+ yield i
+
+ s.steps = Steps()
+
+ s.stepsTableWidget = mock.Mock()
+ s.stepsTableWidget.setItem.return_value = True
+ s.resizeTable = mock.Mock()
+ s.update = mock.Mock()
+ s.populateStepsTable()
+ s.update.assert_called_with()
+ s.resizeTable.assert_called_with()
+
+ # assert stepsTableWidget.setItem called ...
+ # we do not want to get into the actual
+ # <QTableWidgetItem object at 0x92a565c>
+ call_list = s.stepsTableWidget.setItem.call_args_list
+ indexes = [(y, z) for y, z, xx in [x[0] for x in call_list]]
+ self.assertEqual(indexes,
+ [(0, 0), (0, 1), (1, 0), (1, 1)])
+
+ def test_add_status_line(self):
+ s = self.stepy
+ s.steps = progress.ProgressStepContainer()
+ s.stepsTableWidget = mock.Mock()
+ s.stepsTableWidget.width.return_value = 100
+ s.set_item = mock.Mock()
+ s.set_item_icon = mock.Mock()
+ s.add_status_line("new status")
+ s.set_item_icon.assert_called_with(current=False)
+
+ def test_set_item_icon(self):
+ s = self.stepy
+ s.steps = progress.ProgressStepContainer()
+ s.stepsTableWidget = mock.Mock()
+ s.stepsTableWidget.setCellWidget.return_value = True
+ s.stepsTableWidget.width.return_value = 100
+ #s.set_item = mock.Mock()
+ #s.set_item_icon = mock.Mock()
+ s.add_status_line("new status")
+ s.add_status_line("new 2 status")
+ s.add_status_line("new 3 status")
+ call_list = s.stepsTableWidget.setCellWidget.call_args_list
+ indexes = [(y, z) for y, z, xx in [x[0] for x in call_list]]
+ self.assertEqual(
+ indexes,
+ [(0, 1), (-1, 1), (1, 1), (0, 1), (2, 1), (1, 1)])
+
+
+class TestInlineValidationPage(progress.InlineValidationPage):
+ pass
+
+
+class InlineValidationPageTestCase(unittest.TestCase):
+
+ def setUp(self):
+ self.app = QtGui.QApplication(sys.argv)
+ QtGui.qApp = self.app
+ self.page = TestInlineValidationPage()
+
+ def tearDown(self):
+ QtGui.qApp = None
+ self.app = None
+
+ def test_defaults(self):
+ self.assertFalse(self.page.done)
+ # if setupProcessingQueue was called
+ self.assertTrue(isinstance(self.page.stepscheck_timer, QtCore.QTimer))
+ self.assertTrue(isinstance(self.page.steps_queue, Queue.Queue))
+
+ def test_validation_frame(self):
+ # test frame creation
+ self.page.stepsTableWidget = progress.StepsTableWidget(
+ parent=self.page)
+ self.page.setupValidationFrame()
+ self.assertTrue(isinstance(self.page.valFrame, QtGui.QFrame))
+
+ # test show steps calls frame.show
+ self.page.valFrame = mock.Mock()
+ self.page.valFrame.show.return_value = True
+ self.page.showStepsFrame()
+ self.page.valFrame.show.assert_called_with()
+
+
+class TestValidationPage(progress.ValidationPage):
+ pass
+
+
+class ValidationPageTestCase(unittest.TestCase):
+
+ def setUp(self):
+ self.app = QtGui.QApplication(sys.argv)
+ QtGui.qApp = self.app
+ self.page = TestValidationPage()
+
+ def tearDown(self):
+ QtGui.qApp = None
+ self.app = None
+
+ def test_defaults(self):
+ self.assertFalse(self.page.done)
+ # if setupProcessingQueue was called
+ self.assertTrue(isinstance(self.page.timer, QtCore.QTimer))
+ self.assertTrue(isinstance(self.page.stepscheck_timer, QtCore.QTimer))
+ self.assertTrue(isinstance(self.page.steps_queue, Queue.Queue))
+
+ def test_is_complete(self):
+ self.assertFalse(self.page.isComplete())
+ self.page.done = True
+ self.assertTrue(self.page.isComplete())
+ self.page.done = False
+ self.assertFalse(self.page.isComplete())
+
+ def test_show_hide_progress(self):
+ p = self.page
+ p.progress = mock.Mock()
+ p.progress.show.return_code = True
+ p.show_progress()
+ p.progress.show.assert_called_with()
+ p.progress.hide.return_code = True
+ p.hide_progress()
+ p.progress.hide.assert_called_with()
+
+ def test_initialize_page(self):
+ p = self.page
+ p.timer = mock.Mock()
+ p.timer.singleShot.return_code = True
+ p.initializePage()
+ p.timer.singleShot.assert_called_with(0, p.do_checks)
+
+
+if __name__ == "__main__":
+ unittest.main()
diff --git a/src/leap/gui/tests/test_threads.py b/src/leap/gui/tests/test_threads.py
new file mode 100644
index 00000000..06c19606
--- /dev/null
+++ b/src/leap/gui/tests/test_threads.py
@@ -0,0 +1,27 @@
+import unittest
+
+import mock
+from leap.gui import threads
+
+
+class FunThreadTestCase(unittest.TestCase):
+
+ def setUp(self):
+ self.fun = mock.MagicMock()
+ self.fun.return_value = "foo"
+ self.t = threads.FunThread(fun=self.fun)
+
+ def test_thread(self):
+ self.t.begin()
+ self.t.wait()
+ self.fun.assert_called()
+ del self.t
+
+ def test_run(self):
+ # this is called by PyQt
+ self.t.run()
+ del self.t
+ self.fun.assert_called()
+
+if __name__ == "__main__":
+ unittest.main()
diff --git a/src/leap/testing/pyqt.py b/src/leap/testing/pyqt.py
new file mode 100644
index 00000000..6edaf059
--- /dev/null
+++ b/src/leap/testing/pyqt.py
@@ -0,0 +1,52 @@
+from PyQt4 import QtCore
+
+_oldConnect = QtCore.QObject.connect
+_oldDisconnect = QtCore.QObject.disconnect
+_oldEmit = QtCore.QObject.emit
+
+
+def _wrapConnect(callableObject):
+ """
+ Returns a wrapped call to the old version of QtCore.QObject.connect
+ """
+ @staticmethod
+ def call(*args):
+ callableObject(*args)
+ _oldConnect(*args)
+ return call
+
+
+def _wrapDisconnect(callableObject):
+ """
+ Returns a wrapped call to the old version of QtCore.QObject.disconnect
+ """
+ @staticmethod
+ def call(*args):
+ callableObject(*args)
+ _oldDisconnect(*args)
+ return call
+
+
+def enableSignalDebugging(**kwargs):
+ """
+ Call this to enable Qt Signal debugging. This will trap all
+ connect, and disconnect calls.
+ """
+
+ f = lambda *args: None
+ connectCall = kwargs.get('connectCall', f)
+ disconnectCall = kwargs.get('disconnectCall', f)
+ emitCall = kwargs.get('emitCall', f)
+
+ def printIt(msg):
+ def call(*args):
+ print msg, args
+ return call
+ QtCore.QObject.connect = _wrapConnect(connectCall)
+ QtCore.QObject.disconnect = _wrapDisconnect(disconnectCall)
+
+ def new_emit(self, *args):
+ emitCall(self, *args)
+ _oldEmit(self, *args)
+
+ QtCore.QObject.emit = new_emit
diff --git a/src/leap/testing/qunittest.py b/src/leap/testing/qunittest.py
new file mode 100644
index 00000000..b89ccec3
--- /dev/null
+++ b/src/leap/testing/qunittest.py
@@ -0,0 +1,302 @@
+# -*- coding: utf-8 -*-
+
+# **qunittest** is an standard Python `unittest` enhancement for PyQt4,
+# allowing
+# you to test asynchronous code using standard synchronous testing facility.
+#
+# The source for `qunittest` is available on [GitHub][gh], and released under
+# the MIT license.
+#
+# Slightly modified by The Leap Project.
+
+### Prerequisites
+
+# Import unittest2 or unittest
+try:
+ import unittest2 as unittest
+except ImportError:
+ import unittest
+
+# ... and some standard Python libraries
+import sys
+import functools
+import contextlib
+import re
+
+# ... and several PyQt classes
+from PyQt4.QtCore import QTimer
+from PyQt4.QtTest import QTest
+from PyQt4 import QtGui
+
+### The code
+
+
+# Override standard main method, by invoking it inside PyQt event loop
+
+def main(*args, **kwargs):
+ qapplication = QtGui.QApplication(sys.argv)
+
+ QTimer.singleShot(0, unittest.main(*args, **kwargs))
+ qapplication.exec_()
+
+"""
+This main substitute does not integrate with unittest.
+
+Note about mixing the event loop and unittests:
+
+Unittest will fail if we keep more than one reference to a QApplication.
+(pyqt expects to be and only one).
+So, for the things that need a QApplication to exist, do something like:
+
+ self.app = QApplication()
+ QtGui.qApp = self.app
+
+in the class setUp, and::
+
+ QtGui.qApp = None
+ self.app = None
+
+in the class tearDown.
+
+For some explanation about this, see
+ http://stuvel.eu/blog/127/multiple-instances-of-qapplication-in-one-process
+and
+ http://www.riverbankcomputing.com/pipermail/pyqt/2010-September/027705.html
+"""
+
+
+# Helper returning the name of a given signal
+
+def _signal_name(signal):
+ s = repr(signal)
+ name_re = "signal (\w+) of (\w+)"
+ match = re.search(name_re, s, re.I)
+ if not match:
+ return "??"
+ return "%s#%s" % (match.group(2), match.group(1))
+
+
+class _SignalConnector(object):
+ """ Encapsulates signal assertion testing """
+ def __init__(self, test, signal, callable_):
+ self.test = test
+ self.callable_ = callable_
+ self.called_with = None
+ self.emited = False
+ self.signal = signal
+ self._asserted = False
+
+ signal.connect(self.on_signal_emited)
+
+ # Store given parameters and mark signal as `emited`
+ def on_signal_emited(self, *args, **kwargs):
+ self.called_with = (args, kwargs)
+ self.emited = True
+
+ def assertEmission(self):
+ # Assert once wheter signal was emited or not
+ was_asserted = self._asserted
+ self._asserted = True
+
+ if not was_asserted:
+ if not self.emited:
+ self.test.fail(
+ "signal %s not emited" % (_signal_name(self.signal)))
+
+ # Call given callable is necessary
+ if self.callable_:
+ args, kwargs = self.called_with
+ self.callable_(*args, **kwargs)
+
+ def __enter__(self):
+ # Assert emission when context is entered
+ self.assertEmission()
+ return self.called_with
+
+ def __exit__(self, *_):
+ return False
+
+### Unit Testing
+
+# `qunittest` does not force much abould how test should look - it just adds
+# several helpers for asynchronous code testing.
+#
+# Common test case may look like this:
+#
+# import qunittest
+# from calculator import Calculator
+#
+# class TestCalculator(qunittest.TestCase):
+# def setUp(self):
+# self.calc = Calculator()
+#
+# def test_should_add_two_numbers_synchronously(self):
+# # given
+# a, b = 2, 3
+#
+# # when
+# r = self.calc.add(a, b)
+#
+# # then
+# self.assertEqual(5, r)
+#
+# def test_should_calculate_factorial_in_background(self):
+# # given
+#
+# # when
+# self.calc.factorial(20)
+#
+# # then
+# self.assertEmited(self.calc.done) with (args, kwargs):
+# self.assertEqual([2432902008176640000], args)
+#
+# if __name__ == "__main__":
+# main()
+#
+# Test can be run by typing:
+#
+# python test_calculator.py
+#
+# Automatic test discovery is not supported now, because testing PyQt needs
+# an instance of `QApplication` and its `exec_` method is blocking.
+#
+
+
+### TestCase class
+
+class TestCase(unittest.TestCase):
+ """
+ Extends standard `unittest.TestCase` with several PyQt4 testing features
+ useful for asynchronous testing.
+ """
+ def __init__(self, *args, **kwargs):
+ super(TestCase, self).__init__(*args, **kwargs)
+
+ self._clearSignalConnectors()
+ self._succeeded = False
+ self.addCleanup(self._clearSignalConnectors)
+ self.tearDown = self._decorateTearDown(self.tearDown)
+
+ ### Protected methods
+
+ def _clearSignalConnectors(self):
+ self._connectedSignals = []
+
+ def _decorateTearDown(self, tearDown):
+ @functools.wraps(tearDown)
+ def decorator():
+ self._ensureEmitedSignals()
+ return tearDown()
+ return decorator
+
+ def _ensureEmitedSignals(self):
+ """
+ Checks if signals were acually emited. Raises AssertionError if no.
+ """
+ # TODO: add information about line
+ for signal in self._connectedSignals:
+ signal.assertEmission()
+
+ ### Assertions
+
+ def assertEmited(self, signal, callable_=None, timeout=1):
+ """
+ Asserts if given `signal` was emited. Waits 1 second by default,
+ before asserts signal emission.
+
+ If `callable_` is given, it should be a function which takes two
+ arguments: `args` and `kwargs`. It will be called after blocking
+ operation or when assertion about signal emission is made and
+ signal was emited.
+
+ When timeout is not `False`, method call is blocking, and ends
+ after `timeout` seconds. After that time, it validates wether
+ signal was emited.
+
+ When timeout is `False`, method is non blocking, and test should wait
+ for signals afterwards. Otherwise, at the end of the test, all
+ signal emissions are checked if appeared.
+
+ Function returns context, which yields to list of parameters given
+ to signal. It can be useful for testing given parameters. Following
+ code:
+
+ with self.assertEmited(widget.signal) as (args, kwargs):
+ self.assertEqual(1, len(args))
+ self.assertEqual("Hello World!", args[0])
+
+ will wait 1 second and test for correct parameters, is signal was
+ emtied.
+
+ Note that code:
+
+ with self.assertEmited(widget.signal, timeout=False) as (a, k):
+ # Will not be invoked
+
+ will always fail since signal cannot be emited in the time of its
+ connection - code inside the context will not be invoked at all.
+ """
+
+ connector = _SignalConnector(self, signal, callable_)
+ self._connectedSignals.append(connector)
+ if timeout:
+ self.waitFor(timeout)
+ connector.assertEmission()
+
+ return connector
+
+ ### Helper methods
+
+ @contextlib.contextmanager
+ def invokeAfter(self, seconds, callable_=None):
+ """
+ Waits given amount of time and executes the context.
+
+ If `callable_` is given, executes it, instead of context.
+ """
+ self.waitFor(seconds)
+ if callable_:
+ callable_()
+ else:
+ yield
+
+ def waitFor(self, seconds):
+ """
+ Waits given amount of time.
+
+ self.widget.loadImage(url)
+ self.waitFor(seconds=10)
+ """
+ QTest.qWait(seconds * 1000)
+
+ def succeed(self, bool_=True):
+ """ Marks test as suceeded for next `failAfter()` invocation. """
+ self._succeeded = self._succeeded or bool_
+
+ def failAfter(self, seconds, message=None):
+ """
+ Waits given amount of time, and fails the test if `succeed(bool)`
+ is not called - in most common case, `succeed(bool)` should be called
+ asynchronously (in signal handler):
+
+ self.widget.signal.connect(lambda: self.succeed())
+ self.failAfter(1, "signal not emited?")
+
+ After invocation, test is no longer consider as succeeded.
+ """
+ self.waitFor(seconds)
+ if not self._succeeded:
+ self.fail(message)
+
+ self._succeeded = False
+
+### Credits
+#
+# * **Who is responsible:** [Dawid Fatyga][df]
+# * **Source:** [GitHub][gh]
+# * **Doc. generator:** [rocco][ro]
+#
+# [gh]: https://www.github.com/dejw/qunittest
+# [df]: https://github.com/dejw
+# [ro]: http://rtomayko.github.com/rocco/
+#
diff --git a/src/leap/util/web.py b/src/leap/util/web.py
index b2aef058..15de0561 100644
--- a/src/leap/util/web.py
+++ b/src/leap/util/web.py
@@ -13,6 +13,7 @@ def get_https_domain_and_port(full_domain):
from a full_domain string that can
contain a colon
"""
+ full_domain = unicode(full_domain)
if full_domain is None:
return None, None