diff options
author | Kali Kaneko <kali@leap.se> | 2013-06-26 05:00:51 +0900 |
---|---|---|
committer | Kali Kaneko <kali@leap.se> | 2013-06-26 05:01:25 +0900 |
commit | 3a0de3f36929f06edc9fa8a5c9e7b5471976a03c (patch) | |
tree | 7d42a12c470dc1315e5e66214c68a862dba00fb0 /src/leap/crypto/tests/test_srpregister.py | |
parent | 04a0fa945a0ec239960255bc2a0f2f01a5b4a224 (diff) | |
parent | bc3652f5c51bdd414d85a2388ee6cba757eca19c (diff) |
Merge remote-tracking branch 'chiiph/feature/fix_test_srpregister' into develop
Nice reworking, all things deferreds!
Note that in this commit we're removing a couple of tests that should
go in test_srpauth... soon.
Diffstat (limited to 'src/leap/crypto/tests/test_srpregister.py')
-rw-r--r-- | src/leap/crypto/tests/test_srpregister.py | 139 |
1 files changed, 67 insertions, 72 deletions
diff --git a/src/leap/crypto/tests/test_srpregister.py b/src/leap/crypto/tests/test_srpregister.py index 5ba7306f..f70382ce 100644 --- a/src/leap/crypto/tests/test_srpregister.py +++ b/src/leap/crypto/tests/test_srpregister.py @@ -27,8 +27,9 @@ import os import sys from mock import MagicMock -from nose.twistedtools import reactor, threaded_reactor, stop_reactor +from nose.twistedtools import reactor, deferred from twisted.python import log +from twisted.internet import threads from leap.common.testing.https_server import where from leap.config.providerconfig import ProviderConfig @@ -89,19 +90,6 @@ class SRPTestCase(unittest.TestCase): cls.register = srpregister.SRPRegister(provider_config=provider) cls.auth = srpauth.SRPAuth(provider) - cls._auth_instance = cls.auth.__dict__['_SRPAuth__instance'] - cls.authenticate = cls._auth_instance.authenticate - cls.logout = cls._auth_instance.logout - - # run! - threaded_reactor() - - @classmethod - def tearDownClass(cls): - """ - Stops reactor when tearing down the class - """ - stop_reactor() # helper methods @@ -114,6 +102,41 @@ class SRPTestCase(unittest.TestCase): # Register tests + def test_none_port(self): + provider = ProviderConfig() + provider.get_api_uri = MagicMock() + provider.get_api_uri.return_value = "http://localhost/" + loaded = provider.load(path=os.path.join( + _here, "test_provider.json")) + if not loaded: + raise ImproperlyConfiguredError( + "Could not load test provider config") + + register = srpregister.SRPRegister(provider_config=provider) + self.assertEquals(register._port, "443") + + @deferred() + def test_wrong_cert(self): + provider = ProviderConfig() + loaded = provider.load(path=os.path.join( + _here, "test_provider.json")) + provider.get_ca_cert_path = MagicMock() + provider.get_ca_cert_path.return_value = os.path.join( + _here, + "wrongcacert.pem") + provider.get_api_uri = MagicMock() + provider.get_api_uri.return_value = self._get_https_uri() + if not loaded: + raise ImproperlyConfiguredError( + "Could not load test provider config") + + register = srpregister.SRPRegister(provider_config=provider) + d = threads.deferToThread(register.register_user, "foouser_firsttime", + "barpass") + d.addCallback(self.assertFalse) + return d + + @deferred() def test_register_user(self): """ Checks if the registration of an unused name works as expected when @@ -121,17 +144,31 @@ class SRPTestCase(unittest.TestCase): when we request a user that is taken. """ # pristine registration - ok = self.register.register_user("foouser_firsttime", "barpass") - self.assertTrue(ok) - + d = threads.deferToThread(self.register.register_user, + "foouser_firsttime", + "barpass") + d.addCallback(self.assertTrue) + return d + + @deferred() + def test_second_register_user(self): # second registration attempt with the same user should return errors - ok = self.register.register_user("foouser_second", "barpass") - self.assertTrue(ok) + d = threads.deferToThread(self.register.register_user, + "foouser_second", + "barpass") + d.addCallback(self.assertTrue) # FIXME currently we are catching this in an upper layer, # we could bring the error validation to the SRPRegister class - ok = self.register.register_user("foouser_second", "barpass") - + def register_wrapper(_): + return threads.deferToThread(self.register.register_user, + "foouser_second", + "barpass") + d.addCallback(register_wrapper) + d.addCallback(self.assertFalse) + return d + + @deferred() def test_correct_http_uri(self): """ Checks that registration autocorrect http uris to https ones. @@ -151,57 +188,15 @@ class SRPTestCase(unittest.TestCase): raise ImproperlyConfiguredError( "Could not load test provider config") - self.register = srpregister.SRPRegister(provider_config=provider) + register = srpregister.SRPRegister(provider_config=provider) # ... and we check that we're correctly taking the HTTPS protocol # instead - self.assertEquals(self.register._get_registration_uri(), - HTTPS_URI) - ok = self.register.register_user("test_failhttp", "barpass") - self.assertTrue(ok) - - # XXX need to assert that _get_registration_uri was called too - - # Auth tests - - def test_auth(self): - """ - Checks whether a pair of valid credentials is able to be authenticated. - """ - TEST_USER = "register_test_auth" - TEST_PASS = "pass" - - # pristine registration, should go well - ok = self.register.register_user(TEST_USER, TEST_PASS) - self.assertTrue(ok) - - self.authenticate(TEST_USER, TEST_PASS) - with self.assertRaises(AssertionError): - # AssertionError: already logged in - # We probably could take this as its own exception - self.authenticate(TEST_USER, TEST_PASS) - - self.logout() - - # cannot log out two times in a row (there's no session) - with self.assertRaises(AssertionError): - self.logout() - - def test_auth_with_bad_credentials(self): - """ - Checks that auth does not succeed with bad credentials. - """ - TEST_USER = "register_test_auth" - TEST_PASS = "pass" - - # non-existent credentials, should fail - with self.assertRaises(srpauth.SRPAuthenticationError): - self.authenticate("baduser_1", "passwrong") - - # good user, bad password, should fail - with self.assertRaises(srpauth.SRPAuthenticationError): - self.authenticate(TEST_USER, "passwrong") - - # bad user, good password, should fail too :) - with self.assertRaises(srpauth.SRPAuthenticationError): - self.authenticate("myunclejoe", TEST_PASS) + reg_uri = register._get_registration_uri() + self.assertEquals(reg_uri, HTTPS_URI) + register._get_registration_uri = MagicMock(return_value=HTTPS_URI) + d = threads.deferToThread(register.register_user, "test_failhttp", + "barpass") + d.addCallback(self.assertTrue) + + return d |