summaryrefslogtreecommitdiff
path: root/src/leap/crypto/tests/test_srpregister.py
diff options
context:
space:
mode:
Diffstat (limited to 'src/leap/crypto/tests/test_srpregister.py')
-rw-r--r--src/leap/crypto/tests/test_srpregister.py139
1 files changed, 67 insertions, 72 deletions
diff --git a/src/leap/crypto/tests/test_srpregister.py b/src/leap/crypto/tests/test_srpregister.py
index 5ba7306f..f70382ce 100644
--- a/src/leap/crypto/tests/test_srpregister.py
+++ b/src/leap/crypto/tests/test_srpregister.py
@@ -27,8 +27,9 @@ import os
import sys
from mock import MagicMock
-from nose.twistedtools import reactor, threaded_reactor, stop_reactor
+from nose.twistedtools import reactor, deferred
from twisted.python import log
+from twisted.internet import threads
from leap.common.testing.https_server import where
from leap.config.providerconfig import ProviderConfig
@@ -89,19 +90,6 @@ class SRPTestCase(unittest.TestCase):
cls.register = srpregister.SRPRegister(provider_config=provider)
cls.auth = srpauth.SRPAuth(provider)
- cls._auth_instance = cls.auth.__dict__['_SRPAuth__instance']
- cls.authenticate = cls._auth_instance.authenticate
- cls.logout = cls._auth_instance.logout
-
- # run!
- threaded_reactor()
-
- @classmethod
- def tearDownClass(cls):
- """
- Stops reactor when tearing down the class
- """
- stop_reactor()
# helper methods
@@ -114,6 +102,41 @@ class SRPTestCase(unittest.TestCase):
# Register tests
+ def test_none_port(self):
+ provider = ProviderConfig()
+ provider.get_api_uri = MagicMock()
+ provider.get_api_uri.return_value = "http://localhost/"
+ loaded = provider.load(path=os.path.join(
+ _here, "test_provider.json"))
+ if not loaded:
+ raise ImproperlyConfiguredError(
+ "Could not load test provider config")
+
+ register = srpregister.SRPRegister(provider_config=provider)
+ self.assertEquals(register._port, "443")
+
+ @deferred()
+ def test_wrong_cert(self):
+ provider = ProviderConfig()
+ loaded = provider.load(path=os.path.join(
+ _here, "test_provider.json"))
+ provider.get_ca_cert_path = MagicMock()
+ provider.get_ca_cert_path.return_value = os.path.join(
+ _here,
+ "wrongcacert.pem")
+ provider.get_api_uri = MagicMock()
+ provider.get_api_uri.return_value = self._get_https_uri()
+ if not loaded:
+ raise ImproperlyConfiguredError(
+ "Could not load test provider config")
+
+ register = srpregister.SRPRegister(provider_config=provider)
+ d = threads.deferToThread(register.register_user, "foouser_firsttime",
+ "barpass")
+ d.addCallback(self.assertFalse)
+ return d
+
+ @deferred()
def test_register_user(self):
"""
Checks if the registration of an unused name works as expected when
@@ -121,17 +144,31 @@ class SRPTestCase(unittest.TestCase):
when we request a user that is taken.
"""
# pristine registration
- ok = self.register.register_user("foouser_firsttime", "barpass")
- self.assertTrue(ok)
-
+ d = threads.deferToThread(self.register.register_user,
+ "foouser_firsttime",
+ "barpass")
+ d.addCallback(self.assertTrue)
+ return d
+
+ @deferred()
+ def test_second_register_user(self):
# second registration attempt with the same user should return errors
- ok = self.register.register_user("foouser_second", "barpass")
- self.assertTrue(ok)
+ d = threads.deferToThread(self.register.register_user,
+ "foouser_second",
+ "barpass")
+ d.addCallback(self.assertTrue)
# FIXME currently we are catching this in an upper layer,
# we could bring the error validation to the SRPRegister class
- ok = self.register.register_user("foouser_second", "barpass")
-
+ def register_wrapper(_):
+ return threads.deferToThread(self.register.register_user,
+ "foouser_second",
+ "barpass")
+ d.addCallback(register_wrapper)
+ d.addCallback(self.assertFalse)
+ return d
+
+ @deferred()
def test_correct_http_uri(self):
"""
Checks that registration autocorrect http uris to https ones.
@@ -151,57 +188,15 @@ class SRPTestCase(unittest.TestCase):
raise ImproperlyConfiguredError(
"Could not load test provider config")
- self.register = srpregister.SRPRegister(provider_config=provider)
+ register = srpregister.SRPRegister(provider_config=provider)
# ... and we check that we're correctly taking the HTTPS protocol
# instead
- self.assertEquals(self.register._get_registration_uri(),
- HTTPS_URI)
- ok = self.register.register_user("test_failhttp", "barpass")
- self.assertTrue(ok)
-
- # XXX need to assert that _get_registration_uri was called too
-
- # Auth tests
-
- def test_auth(self):
- """
- Checks whether a pair of valid credentials is able to be authenticated.
- """
- TEST_USER = "register_test_auth"
- TEST_PASS = "pass"
-
- # pristine registration, should go well
- ok = self.register.register_user(TEST_USER, TEST_PASS)
- self.assertTrue(ok)
-
- self.authenticate(TEST_USER, TEST_PASS)
- with self.assertRaises(AssertionError):
- # AssertionError: already logged in
- # We probably could take this as its own exception
- self.authenticate(TEST_USER, TEST_PASS)
-
- self.logout()
-
- # cannot log out two times in a row (there's no session)
- with self.assertRaises(AssertionError):
- self.logout()
-
- def test_auth_with_bad_credentials(self):
- """
- Checks that auth does not succeed with bad credentials.
- """
- TEST_USER = "register_test_auth"
- TEST_PASS = "pass"
-
- # non-existent credentials, should fail
- with self.assertRaises(srpauth.SRPAuthenticationError):
- self.authenticate("baduser_1", "passwrong")
-
- # good user, bad password, should fail
- with self.assertRaises(srpauth.SRPAuthenticationError):
- self.authenticate(TEST_USER, "passwrong")
-
- # bad user, good password, should fail too :)
- with self.assertRaises(srpauth.SRPAuthenticationError):
- self.authenticate("myunclejoe", TEST_PASS)
+ reg_uri = register._get_registration_uri()
+ self.assertEquals(reg_uri, HTTPS_URI)
+ register._get_registration_uri = MagicMock(return_value=HTTPS_URI)
+ d = threads.deferToThread(register.register_user, "test_failhttp",
+ "barpass")
+ d.addCallback(self.assertTrue)
+
+ return d