From d193fee401d606f6120ac11819a0127e7ee92458 Mon Sep 17 00:00:00 2001 From: kali Date: Tue, 26 Mar 2013 01:15:44 +0900 Subject: tests for srpregister and srpauth in this commit too, the twisted fake_provider implementation --- src/leap/crypto/srpauth.py | 31 +-- src/leap/crypto/srpregister.py | 25 ++- src/leap/crypto/tests/__init__.py | 16 ++ src/leap/crypto/tests/fake_provider.py | 333 ++++++++++++++++++++++++++++++ src/leap/crypto/tests/test.txt | 1 + src/leap/crypto/tests/test_provider.json | 15 ++ src/leap/crypto/tests/test_srpauth.py | 136 ++++++++++++ src/leap/crypto/tests/test_srpregister.py | 142 +++++++++++++ 8 files changed, 677 insertions(+), 22 deletions(-) create mode 100644 src/leap/crypto/tests/__init__.py create mode 100755 src/leap/crypto/tests/fake_provider.py create mode 100644 src/leap/crypto/tests/test.txt create mode 100644 src/leap/crypto/tests/test_provider.json create mode 100644 src/leap/crypto/tests/test_srpauth.py create mode 100644 src/leap/crypto/tests/test_srpregister.py (limited to 'src/leap/crypto') diff --git a/src/leap/crypto/srpauth.py b/src/leap/crypto/srpauth.py index 152d77b5..027ee0d7 100644 --- a/src/leap/crypto/srpauth.py +++ b/src/leap/crypto/srpauth.py @@ -272,7 +272,14 @@ class SRPAuth(QtCore.QObject): "failed")) logger.debug("Session verified.") - self.set_session_id(self._session.cookies["_session_id"]) + SESSION_ID_KEY = "_session_id" + session_id = self._session.cookies.get(SESSION_ID_KEY, None) + if not session_id: + logger.error("Bad cookie from server (missing _session_id)") + raise SRPAuthenticationError(self.tr("Session cookie " + "verification " + "failed")) + self.set_session_id(session_id) def authenticate(self, username, password): """ @@ -409,11 +416,18 @@ class SRPAuth(QtCore.QObject): if __name__ == "__main__": + import signal import sys + from functools import partial app = QtGui.QApplication(sys.argv) - import signal + if not len(sys.argv) == 3: + print 'Usage: srpauth.py ' + sys.exit(0) + + _user = sys.argv[1] + _pass = sys.argv[2] def sigint_handler(*args, **kwargs): logger.debug('SIGINT catched. shutting down...') @@ -452,20 +466,9 @@ if __name__ == "__main__": provider = ProviderConfig() if provider.load("leap/providers/bitmask.net/provider.json"): - # url = "%s/tickets" % (provider.get_api_uri(),) - # print url - # res = requests.session().get(url, verify=provider.get_ca_cert_path()) - # print res.content - # res.raise_for_status() auth = SRPAuth(provider) - auth_instantiated = partial(auth.authenticate, "test2", "sarasaaaa") + auth_instantiated = partial(auth.authenticate, _user, _pass) checker.add_checks([auth_instantiated, auth.logout]) - #auth.authenticate("test2", "sarasaaaa") - #res = requests.session().get("%s/cert" % (provider.get_api_uri(),), - #verify=provider.get_ca_cert_path()) - #print res.content - #auth.logout() - sys.exit(app.exec_()) diff --git a/src/leap/crypto/srpregister.py b/src/leap/crypto/srpregister.py index 9a9cac76..dc137aeb 100644 --- a/src/leap/crypto/srpregister.py +++ b/src/leap/crypto/srpregister.py @@ -55,7 +55,7 @@ class SRPRegister(QtCore.QObject): @type register_path; str """ QtCore.QObject.__init__(self) - leap_assert(provider_config, "Please provider a provider") + leap_assert(provider_config, "Please provide a provider") leap_assert_type(provider_config, ProviderConfig) self._provider_config = provider_config @@ -125,15 +125,24 @@ class SRPRegister(QtCore.QObject): logger.debug("Will try to register user = %s" % (username,)) logger.debug("user_data => %r" % (user_data,)) - req = self._session.post(uri, - data=user_data, - timeout=SIGNUP_TIMEOUT, - verify=self._provider_config. - get_ca_cert_path()) + try: + req = self._session.post(uri, + data=user_data, + timeout=SIGNUP_TIMEOUT, + verify=self._provider_config. + get_ca_cert_path()) - self.registration_finished.emit(req.ok, req) + except requests.exceptions.SSLError as exc: + logger.error("SSLError: %s" % exc.message) + _ok = False + req = None - return req.ok + else: + _ok = req.ok + + self.registration_finished.emit(_ok, req) + + return _ok if __name__ == "__main__": diff --git a/src/leap/crypto/tests/__init__.py b/src/leap/crypto/tests/__init__.py new file mode 100644 index 00000000..7f118735 --- /dev/null +++ b/src/leap/crypto/tests/__init__.py @@ -0,0 +1,16 @@ +# -*- coding: utf-8 -*- +# __init__.py +# Copyright (C) 2013 LEAP +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see . diff --git a/src/leap/crypto/tests/fake_provider.py b/src/leap/crypto/tests/fake_provider.py new file mode 100755 index 00000000..4b05bbff --- /dev/null +++ b/src/leap/crypto/tests/fake_provider.py @@ -0,0 +1,333 @@ +#!/usr/bin/env python +# -*- coding: utf-8 -*- +# fake_provider.py +# Copyright (C) 2013 LEAP +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see . +"""A server faking some of the provider resources and apis, +used for testing Leap Client requests + +It needs that you create a subfolder named 'certs', +and that you place the following files: + +XXX check if in use + +[ ] test-openvpn.pem +[ ] test-provider.json +[ ] test-eip-service.json +""" +import binascii +import json +import os +import sys + +import srp + +from OpenSSL import SSL + +from zope.interface import Interface, Attribute, implements + +from twisted.web.server import Site, Request +from twisted.web.static import File +from twisted.web.resource import Resource +from twisted.internet import reactor + +from leap.common.testing.https_server import where + +# See +# http://twistedmatrix.com/documents/current/web/howto/web-in-60/index.html +# for more examples + +""" +Testing the FAKE_API: +##################### + + 1) register an user + >> curl -d "user[login]=me" -d "user[password_salt]=foo" \ + -d "user[password_verifier]=beef" http://localhost:8000/1/users + << {"errors": null} + + 2) check that if you try to register again, it will fail: + >> curl -d "user[login]=me" -d "user[password_salt]=foo" \ + -d "user[password_verifier]=beef" http://localhost:8000/1/users + << {"errors": {"login": "already taken!"}} + +""" + +# Globals to mock user/sessiondb + +_USERDB = {} +_SESSIONDB = {} + +_here = os.path.split(__file__)[0] + + +safe_unhexlify = lambda x: binascii.unhexlify(x) \ + if (len(x) % 2 == 0) else binascii.unhexlify('0' + x) + + +class IUser(Interface): + login = Attribute("User login.") + salt = Attribute("Password salt.") + verifier = Attribute("Password verifier.") + session = Attribute("Session.") + svr = Attribute("Server verifier.") + + +class User(object): + + implements(IUser) + + def __init__(self, login, salt, verifier): + self.login = login + self.salt = salt + self.verifier = verifier + self.session = None + + def set_server_verifier(self, svr): + self.svr = svr + + def set_session(self, session): + _SESSIONDB[session] = self + self.session = session + + +class FakeUsers(Resource): + def __init__(self, name): + self.name = name + + def render_POST(self, request): + args = request.args + + login = args['user[login]'][0] + salt = args['user[password_salt]'][0] + verifier = args['user[password_verifier]'][0] + + if login in _USERDB: + return "%s\n" % json.dumps( + {'errors': {'login': 'already taken!'}}) + + print '[server]', login, verifier, salt + user = User(login, salt, verifier) + _USERDB[login] = user + return json.dumps({'errors': None}) + + +def getSession(self, sessionInterface=None): + """ + we overwrite twisted.web.server.Request.getSession method to + put the right cookie name in place + """ + if not self.session: + #cookiename = b"_".join([b'TWISTED_SESSION'] + self.sitepath) + cookiename = b"_".join([b'_session_id'] + self.sitepath) + sessionCookie = self.getCookie(cookiename) + if sessionCookie: + try: + self.session = self.site.getSession(sessionCookie) + except KeyError: + pass + # if it still hasn't been set, fix it up. + if not self.session: + self.session = self.site.makeSession() + self.addCookie(cookiename, self.session.uid, path=b'/') + self.session.touch() + if sessionInterface: + return self.session.getComponent(sessionInterface) + return self.session + + +def get_user(request): + """ + Returns user from the session dict + """ + login = request.args.get('login') + if login: + user = _USERDB.get(login[0], None) + if user: + return user + + request.getSession = getSession.__get__(request, Request) + session = request.getSession() + + user = _SESSIONDB.get(session, None) + return user + + +class FakeSession(Resource): + def __init__(self, name): + """ + Initializes session + """ + self.name = name + + def render_GET(self, request): + """ + Handles GET requests. + """ + return "%s\n" % json.dumps({'errors': None}) + + def render_POST(self, request): + """ + Handles POST requests. + """ + user = get_user(request) + + if not user: + # XXX get real error from demo provider + return json.dumps({'errors': 'no such user'}) + + A = request.args['A'][0] + + _A = safe_unhexlify(A) + _salt = safe_unhexlify(user.salt) + _verifier = safe_unhexlify(user.verifier) + + svr = srp.Verifier( + user.login, + _salt, + _verifier, + _A, + hash_alg=srp.SHA256, + ng_type=srp.NG_1024) + + s, B = svr.get_challenge() + + _B = binascii.hexlify(B) + + print '[server] login = %s' % user.login + print '[server] salt = %s' % user.salt + print '[server] len(_salt) = %s' % len(_salt) + print '[server] vkey = %s' % user.verifier + print '[server] len(vkey) = %s' % len(_verifier) + print '[server] s = %s' % binascii.hexlify(s) + print '[server] B = %s' % _B + print '[server] len(B) = %s' % len(_B) + + # override Request.getSession + request.getSession = getSession.__get__(request, Request) + session = request.getSession() + + user.set_session(session) + user.set_server_verifier(svr) + + # yep, this is tricky. + # some things are *already* unhexlified. + data = { + 'salt': user.salt, + 'B': _B, + 'errors': None} + + return json.dumps(data) + + def render_PUT(self, request): + """ + Handles PUT requests. + """ + # XXX check session??? + user = get_user(request) + + if not user: + print '[server] NO USER' + return json.dumps({'errors': 'no such user'}) + + data = request.content.read() + auth = data.split("client_auth=") + M = auth[1] if len(auth) > 1 else None + # if not H, return + if not M: + return json.dumps({'errors': 'no M proof passed by client'}) + + svr = user.svr + HAMK = svr.verify_session(binascii.unhexlify(M)) + if HAMK is None: + print '[server] verification failed!!!' + raise Exception("Authentication failed!") + #import ipdb;ipdb.set_trace() + + assert svr.authenticated() + print "***" + print '[server] User successfully authenticated using SRP!' + print "***" + + return json.dumps( + {'M2': binascii.hexlify(HAMK), + 'id': '9c943eb9d96a6ff1b7a7030bdeadbeef', + 'errors': None}) + + +class API_Sessions(Resource): + def getChild(self, name, request): + return FakeSession(name) + + +class OpenSSLServerContextFactory: + + def getContext(self): + """ + Create an SSL context. + """ + ctx = SSL.Context(SSL.SSLv23_METHOD) + #ctx = SSL.Context(SSL.TLSv1_METHOD) + ctx.use_certificate_file(where('leaptestscert.pem')) + ctx.use_privatekey_file(where('leaptestskey.pem')) + + return ctx + + +def get_provider_factory(): + """ + Instantiates a Site that serves the resources + that we expect from a valid provider. + Listens on: + * port 8000 for http connections + * port 8443 for https connections + + @rparam: factory for a site + @rtype: Site instance + """ + root = Resource() + root.putChild("provider.json", File( + os.path.join(_here, + "test_provider.json"))) + config = Resource() + config.putChild( + "eip-service.json", + File("./eip-service.json")) + apiv1 = Resource() + apiv1.putChild("config", config) + apiv1.putChild("sessions", API_Sessions()) + apiv1.putChild("users", FakeUsers(None)) + apiv1.putChild("cert", File( + os.path.join(_here, + 'openvpn.pem'))) + root.putChild("1", apiv1) + + factory = Site(root) + return factory + + +if __name__ == "__main__": + + from twisted.python import log + log.startLogging(sys.stdout) + + factory = get_provider_factory() + + # regular http (for debugging with curl) + reactor.listenTCP(8000, factory) + reactor.listenSSL(8443, factory, OpenSSLServerContextFactory()) + reactor.run() + + diff --git a/src/leap/crypto/tests/test.txt b/src/leap/crypto/tests/test.txt new file mode 100644 index 00000000..d6406617 --- /dev/null +++ b/src/leap/crypto/tests/test.txt @@ -0,0 +1 @@ +OK! diff --git a/src/leap/crypto/tests/test_provider.json b/src/leap/crypto/tests/test_provider.json new file mode 100644 index 00000000..c37bef8f --- /dev/null +++ b/src/leap/crypto/tests/test_provider.json @@ -0,0 +1,15 @@ +{ + "api_uri": "https://localhost:8443", + "api_version": "1", + "ca_cert_fingerprint": "SHA256: 0f17c033115f6b76ff67871872303ff65034efe7dd1b910062ca323eb4da5c7e", + "ca_cert_uri": "https://bitmask.net/ca.crt", + "default_language": "en", + "domain": "example.com", + "enrollment_policy": "open", + "name": { + "en": "Bitmask" + }, + "services": [ + "openvpn" + ] +} diff --git a/src/leap/crypto/tests/test_srpauth.py b/src/leap/crypto/tests/test_srpauth.py new file mode 100644 index 00000000..ce9403c7 --- /dev/null +++ b/src/leap/crypto/tests/test_srpauth.py @@ -0,0 +1,136 @@ +# -*- coding: utf-8 -*- +# test_srpauth.py +# Copyright (C) 2013 LEAP +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see . +""" +Tests for leap/crypto/srpauth.py +""" +try: + import unittest +except ImportError: + import unittest +import os +import sys + +from mock import MagicMock +from nose.twistedtools import reactor, threaded_reactor, stop_reactor +from twisted.python import log + +from leap.common.testing.https_server import where +from leap.config.providerconfig import ProviderConfig +from leap.crypto import srpauth +from leap.crypto import srpregister +from leap.crypto.tests import fake_provider + +log.startLogging(sys.stdout) + + +def _get_capath(): + return where("cacert.pem") + +_here = os.path.split(__file__)[0] + + +class ImproperlyConfiguredError(Exception): + """ + Raised if the test provider is missing configuration + """ + + +class SRPRegisterTestCase(unittest.TestCase): + """ + Tests for the SRP Authentication class + """ + __name__ = "SRPAuth tests" + + @classmethod + def setUpClass(cls): + """ + Sets up this TestCase with a simple and faked provider instance: + + * runs a threaded reactor + * loads a mocked ProviderConfig that points to the certs in the + leap.common.testing module. + """ + factory = fake_provider.get_provider_factory() + reactor.listenTCP(8000, factory) + reactor.listenSSL( + 8443, factory, + fake_provider.OpenSSLServerContextFactory()) + threaded_reactor() + + provider = ProviderConfig() + provider.get_ca_cert_path = MagicMock() + provider.get_ca_cert_path.return_value = _get_capath() + loaded = provider.load(path=os.path.join( + _here, "test_provider.json")) + if not loaded: + raise ImproperlyConfiguredError( + "Could not load test provider config") + cls.provider = provider + cls.register = srpregister.SRPRegister(provider_config=provider) + cls.auth = srpauth.SRPAuth(provider) + cls._auth_instance = cls.auth.__dict__['_SRPAuth__instance'] + cls.authenticate = cls._auth_instance.authenticate + cls.logout = cls._auth_instance.logout + + @classmethod + def tearDownClass(cls): + """ + Stops reactor when tearing down the class + """ + stop_reactor() + + def test_auth(self): + """ + Checks whether a pair of valid credentials is able to be authenticated. + """ + TEST_USER = "register_test_auth" + TEST_PASS = "pass" + + # pristine registration, should go well + ok = self.register.register_user(TEST_USER, TEST_PASS) + self.assertTrue(ok) + + self.authenticate(TEST_USER, TEST_PASS) + with self.assertRaises(AssertionError): + # AssertionError: already logged in + # We probably could take this as its own exception + self.authenticate(TEST_USER, TEST_PASS) + + self.logout() + + # cannot log out two times in a row (there's no session) + with self.assertRaises(AssertionError): + self.logout() + + def test_auth_with_bad_credentials(self): + """ + Checks that auth does not succeed with bad credentials. + """ + TEST_USER = "register_test_auth" + TEST_PASS = "pass" + + # non-existent credentials, should fail + with self.assertRaises(srpauth.SRPAuthenticationError): + self.authenticate("baduser_1", "passwrong") + + # good user, bad password, should fail + with self.assertRaises(srpauth.SRPAuthenticationError): + self.authenticate(TEST_USER, "passwrong") + + # bad user, good password, should fail too :) + with self.assertRaises(srpauth.SRPAuthenticationError): + self.authenticate("myunclejoe", TEST_PASS) diff --git a/src/leap/crypto/tests/test_srpregister.py b/src/leap/crypto/tests/test_srpregister.py new file mode 100644 index 00000000..b065958d --- /dev/null +++ b/src/leap/crypto/tests/test_srpregister.py @@ -0,0 +1,142 @@ +# -*- coding: utf-8 -*- +# test_srpregister.py +# Copyright (C) 2013 LEAP +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see . +""" +Tests for leap/crypto/srpregister.py +""" +try: + import unittest +except ImportError: + import unittest +import os +import sys + +from mock import MagicMock +from nose.twistedtools import reactor, threaded_reactor, stop_reactor +from twisted.python import log + +from leap.common.testing.https_server import where +from leap.config.providerconfig import ProviderConfig +from leap.crypto import srpregister +from leap.crypto.tests import fake_provider + +log.startLogging(sys.stdout) + + +def _get_capath(): + return where("cacert.pem") + +_here = os.path.split(__file__)[0] + + +class ImproperlyConfiguredError(Exception): + """ + Raised if the test provider is missing configuration + """ + + +class SRPRegisterTestCase(unittest.TestCase): + """ + Tests for the SRP Register class + """ + __name__ = "SRPRegister tests" + + @classmethod + def setUpClass(cls): + """ + Sets up this TestCase with a simple and faked provider instance: + + * runs a threaded reactor + """ + factory = fake_provider.get_provider_factory() + reactor.listenTCP(8000, factory) + reactor.listenSSL( + 8443, factory, + fake_provider.OpenSSLServerContextFactory()) + threaded_reactor() + + def setUp(self): + """ + Sets up common parameters for each test: + + * loads a mocked ProviderConfig that points to the certs in the + leap.common.testing module. + """ + provider = ProviderConfig() + provider.get_ca_cert_path = MagicMock() + provider.get_ca_cert_path.return_value = _get_capath() + loaded = provider.load(path=os.path.join( + _here, "test_provider.json")) + if not loaded: + raise ImproperlyConfiguredError( + "Could not load test provider config") + self.register = srpregister.SRPRegister(provider_config=provider) + + @classmethod + def tearDownClass(cls): + """ + Stops reactor when tearing down the class + """ + stop_reactor() + + def test_register_user(self): + """ + Checks if the registration of an unused name works as expected when + it is the first time that we attempt to register that user, as well as + when we request a user that is taken. + """ + # pristine registration + ok = self.register.register_user("foouser_firsttime", "barpass") + self.assertTrue(ok) + + # second registration attempt with the same user should return errors + ok = self.register.register_user("foouser_second", "barpass") + self.assertTrue(ok) + + # FIXME currently we are catching this in an upper layer, + # we could bring the error validation to the SRPRegister class + ok = self.register.register_user("foouser_second", "barpass") + # XXX + #self.assertFalse(ok) + + def test_correct_http_uri(self): + """ + Checks that registration autocorrect http uris to https ones. + """ + HTTP_URI = "http://localhost:8443" + HTTPS_URI = "https://localhost:8443/1/users" + provider = ProviderConfig() + provider.get_ca_cert_path = MagicMock() + provider.get_ca_cert_path.return_value = _get_capath() + provider.get_api_uri = MagicMock() + + # we introduce a http uri in the config file... + provider.get_api_uri.return_value = HTTP_URI + loaded = provider.load(path=os.path.join( + _here, "test_provider.json")) + if not loaded: + raise ImproperlyConfiguredError( + "Could not load test provider config") + self.register = srpregister.SRPRegister(provider_config=provider) + + # ... and we check that we're correctly taking the HTTPS protocol + # instead + self.assertEquals(self.register._get_registration_uri(), + HTTPS_URI) + ok = self.register.register_user("test_failhttp", "barpass") + self.assertTrue(ok) + + # XXX need to assert that _get_registration_uri was called too -- cgit v1.2.3 From 05fe7f44a899288a8a69b9a46793513b87f8d228 Mon Sep 17 00:00:00 2001 From: kali Date: Tue, 26 Mar 2013 02:55:55 +0900 Subject: workaround for srp server timing out on consecutive runs --- src/leap/crypto/tests/fake_provider.py | 2 - src/leap/crypto/tests/test_srpauth.py | 136 ------------------------------ src/leap/crypto/tests/test_srpregister.py | 107 ++++++++++++++++++----- 3 files changed, 86 insertions(+), 159 deletions(-) delete mode 100644 src/leap/crypto/tests/test_srpauth.py (limited to 'src/leap/crypto') diff --git a/src/leap/crypto/tests/fake_provider.py b/src/leap/crypto/tests/fake_provider.py index 4b05bbff..d3e05812 100755 --- a/src/leap/crypto/tests/fake_provider.py +++ b/src/leap/crypto/tests/fake_provider.py @@ -329,5 +329,3 @@ if __name__ == "__main__": reactor.listenTCP(8000, factory) reactor.listenSSL(8443, factory, OpenSSLServerContextFactory()) reactor.run() - - diff --git a/src/leap/crypto/tests/test_srpauth.py b/src/leap/crypto/tests/test_srpauth.py deleted file mode 100644 index ce9403c7..00000000 --- a/src/leap/crypto/tests/test_srpauth.py +++ /dev/null @@ -1,136 +0,0 @@ -# -*- coding: utf-8 -*- -# test_srpauth.py -# Copyright (C) 2013 LEAP -# -# This program is free software: you can redistribute it and/or modify -# it under the terms of the GNU General Public License as published by -# the Free Software Foundation, either version 3 of the License, or -# (at your option) any later version. -# -# This program is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -# GNU General Public License for more details. -# -# You should have received a copy of the GNU General Public License -# along with this program. If not, see . -""" -Tests for leap/crypto/srpauth.py -""" -try: - import unittest -except ImportError: - import unittest -import os -import sys - -from mock import MagicMock -from nose.twistedtools import reactor, threaded_reactor, stop_reactor -from twisted.python import log - -from leap.common.testing.https_server import where -from leap.config.providerconfig import ProviderConfig -from leap.crypto import srpauth -from leap.crypto import srpregister -from leap.crypto.tests import fake_provider - -log.startLogging(sys.stdout) - - -def _get_capath(): - return where("cacert.pem") - -_here = os.path.split(__file__)[0] - - -class ImproperlyConfiguredError(Exception): - """ - Raised if the test provider is missing configuration - """ - - -class SRPRegisterTestCase(unittest.TestCase): - """ - Tests for the SRP Authentication class - """ - __name__ = "SRPAuth tests" - - @classmethod - def setUpClass(cls): - """ - Sets up this TestCase with a simple and faked provider instance: - - * runs a threaded reactor - * loads a mocked ProviderConfig that points to the certs in the - leap.common.testing module. - """ - factory = fake_provider.get_provider_factory() - reactor.listenTCP(8000, factory) - reactor.listenSSL( - 8443, factory, - fake_provider.OpenSSLServerContextFactory()) - threaded_reactor() - - provider = ProviderConfig() - provider.get_ca_cert_path = MagicMock() - provider.get_ca_cert_path.return_value = _get_capath() - loaded = provider.load(path=os.path.join( - _here, "test_provider.json")) - if not loaded: - raise ImproperlyConfiguredError( - "Could not load test provider config") - cls.provider = provider - cls.register = srpregister.SRPRegister(provider_config=provider) - cls.auth = srpauth.SRPAuth(provider) - cls._auth_instance = cls.auth.__dict__['_SRPAuth__instance'] - cls.authenticate = cls._auth_instance.authenticate - cls.logout = cls._auth_instance.logout - - @classmethod - def tearDownClass(cls): - """ - Stops reactor when tearing down the class - """ - stop_reactor() - - def test_auth(self): - """ - Checks whether a pair of valid credentials is able to be authenticated. - """ - TEST_USER = "register_test_auth" - TEST_PASS = "pass" - - # pristine registration, should go well - ok = self.register.register_user(TEST_USER, TEST_PASS) - self.assertTrue(ok) - - self.authenticate(TEST_USER, TEST_PASS) - with self.assertRaises(AssertionError): - # AssertionError: already logged in - # We probably could take this as its own exception - self.authenticate(TEST_USER, TEST_PASS) - - self.logout() - - # cannot log out two times in a row (there's no session) - with self.assertRaises(AssertionError): - self.logout() - - def test_auth_with_bad_credentials(self): - """ - Checks that auth does not succeed with bad credentials. - """ - TEST_USER = "register_test_auth" - TEST_PASS = "pass" - - # non-existent credentials, should fail - with self.assertRaises(srpauth.SRPAuthenticationError): - self.authenticate("baduser_1", "passwrong") - - # good user, bad password, should fail - with self.assertRaises(srpauth.SRPAuthenticationError): - self.authenticate(TEST_USER, "passwrong") - - # bad user, good password, should fail too :) - with self.assertRaises(srpauth.SRPAuthenticationError): - self.authenticate("myunclejoe", TEST_PASS) diff --git a/src/leap/crypto/tests/test_srpregister.py b/src/leap/crypto/tests/test_srpregister.py index b065958d..a59f71cb 100644 --- a/src/leap/crypto/tests/test_srpregister.py +++ b/src/leap/crypto/tests/test_srpregister.py @@ -15,7 +15,9 @@ # You should have received a copy of the GNU General Public License # along with this program. If not, see . """ -Tests for leap/crypto/srpregister.py +Tests for: + * leap/crypto/srpregister.py + * leap/crypto/srpauth.py """ try: import unittest @@ -30,7 +32,7 @@ from twisted.python import log from leap.common.testing.https_server import where from leap.config.providerconfig import ProviderConfig -from leap.crypto import srpregister +from leap.crypto import srpregister, srpauth from leap.crypto.tests import fake_provider log.startLogging(sys.stdout) @@ -48,11 +50,11 @@ class ImproperlyConfiguredError(Exception): """ -class SRPRegisterTestCase(unittest.TestCase): +class SRPTestCase(unittest.TestCase): """ - Tests for the SRP Register class + Tests for the SRP Register and Auth classes """ - __name__ = "SRPRegister tests" + __name__ = "SRPRegister and SRPAuth tests" @classmethod def setUpClass(cls): @@ -60,30 +62,39 @@ class SRPRegisterTestCase(unittest.TestCase): Sets up this TestCase with a simple and faked provider instance: * runs a threaded reactor + * loads a mocked ProviderConfig that points to the certs in the + leap.common.testing module. """ factory = fake_provider.get_provider_factory() - reactor.listenTCP(8000, factory) - reactor.listenSSL( - 8443, factory, + http = reactor.listenTCP(8001, factory) + https = reactor.listenSSL( + 0, factory, fake_provider.OpenSSLServerContextFactory()) - threaded_reactor() - - def setUp(self): - """ - Sets up common parameters for each test: + get_port = lambda p: p.getHost().port + cls.http_port = get_port(http) + cls.https_port = get_port(https) - * loads a mocked ProviderConfig that points to the certs in the - leap.common.testing module. - """ provider = ProviderConfig() provider.get_ca_cert_path = MagicMock() provider.get_ca_cert_path.return_value = _get_capath() + + provider.get_api_uri = MagicMock() + provider.get_api_uri.return_value = cls._get_https_uri() + loaded = provider.load(path=os.path.join( _here, "test_provider.json")) if not loaded: raise ImproperlyConfiguredError( "Could not load test provider config") - self.register = srpregister.SRPRegister(provider_config=provider) + cls.register = srpregister.SRPRegister(provider_config=provider) + + cls.auth = srpauth.SRPAuth(provider) + cls._auth_instance = cls.auth.__dict__['_SRPAuth__instance'] + cls.authenticate = cls._auth_instance.authenticate + cls.logout = cls._auth_instance.logout + + # run! + threaded_reactor() @classmethod def tearDownClass(cls): @@ -92,6 +103,17 @@ class SRPRegisterTestCase(unittest.TestCase): """ stop_reactor() + # helper methods + + @classmethod + def _get_https_uri(cls): + """ + Returns a https uri with the right https port initialized + """ + return "https://localhost:%s" % (cls.https_port,) + + # Register tests + def test_register_user(self): """ Checks if the registration of an unused name works as expected when @@ -109,15 +131,13 @@ class SRPRegisterTestCase(unittest.TestCase): # FIXME currently we are catching this in an upper layer, # we could bring the error validation to the SRPRegister class ok = self.register.register_user("foouser_second", "barpass") - # XXX - #self.assertFalse(ok) def test_correct_http_uri(self): """ Checks that registration autocorrect http uris to https ones. """ - HTTP_URI = "http://localhost:8443" - HTTPS_URI = "https://localhost:8443/1/users" + HTTP_URI = "http://localhost:%s" % (self.https_port, ) + HTTPS_URI = "https://localhost:%s/1/users" % (self.https_port, ) provider = ProviderConfig() provider.get_ca_cert_path = MagicMock() provider.get_ca_cert_path.return_value = _get_capath() @@ -130,6 +150,7 @@ class SRPRegisterTestCase(unittest.TestCase): if not loaded: raise ImproperlyConfiguredError( "Could not load test provider config") + self.register = srpregister.SRPRegister(provider_config=provider) # ... and we check that we're correctly taking the HTTPS protocol @@ -140,3 +161,47 @@ class SRPRegisterTestCase(unittest.TestCase): self.assertTrue(ok) # XXX need to assert that _get_registration_uri was called too + + # Auth tests + + def test_auth(self): + """ + Checks whether a pair of valid credentials is able to be authenticated. + """ + TEST_USER = "register_test_auth" + TEST_PASS = "pass" + + # pristine registration, should go well + ok = self.register.register_user(TEST_USER, TEST_PASS) + self.assertTrue(ok) + + self.authenticate(TEST_USER, TEST_PASS) + with self.assertRaises(AssertionError): + # AssertionError: already logged in + # We probably could take this as its own exception + self.authenticate(TEST_USER, TEST_PASS) + + self.logout() + + # cannot log out two times in a row (there's no session) + with self.assertRaises(AssertionError): + self.logout() + + def test_auth_with_bad_credentials(self): + """ + Checks that auth does not succeed with bad credentials. + """ + TEST_USER = "register_test_auth" + TEST_PASS = "pass" + + # non-existent credentials, should fail + with self.assertRaises(srpauth.SRPAuthenticationError): + self.authenticate("baduser_1", "passwrong") + + # good user, bad password, should fail + with self.assertRaises(srpauth.SRPAuthenticationError): + self.authenticate(TEST_USER, "passwrong") + + # bad user, good password, should fail too :) + with self.assertRaises(srpauth.SRPAuthenticationError): + self.authenticate("myunclejoe", TEST_PASS) -- cgit v1.2.3 From 42593d4c6bda51a544a72abc0f935633939dad49 Mon Sep 17 00:00:00 2001 From: kali Date: Mon, 8 Apr 2013 23:44:22 +0900 Subject: Several fixes as per review --- src/leap/crypto/srpauth.py | 4 ++-- src/leap/crypto/srpregister.py | 12 +++++------- src/leap/crypto/tests/fake_provider.py | 29 ++++++++++++++++++++++++++++- src/leap/crypto/tests/test.txt | 1 - src/leap/crypto/tests/test_srpregister.py | 2 +- 5 files changed, 36 insertions(+), 12 deletions(-) delete mode 100644 src/leap/crypto/tests/test.txt (limited to 'src/leap/crypto') diff --git a/src/leap/crypto/srpauth.py b/src/leap/crypto/srpauth.py index 027ee0d7..8028a6dc 100644 --- a/src/leap/crypto/srpauth.py +++ b/src/leap/crypto/srpauth.py @@ -50,6 +50,7 @@ class SRPAuth(QtCore.QObject): LOGIN_KEY = "login" A_KEY = "A" CLIENT_AUTH_KEY = "client_auth" + SESSION_ID_KEY = "_session_id" def __init__(self, provider_config): """ @@ -272,8 +273,7 @@ class SRPAuth(QtCore.QObject): "failed")) logger.debug("Session verified.") - SESSION_ID_KEY = "_session_id" - session_id = self._session.cookies.get(SESSION_ID_KEY, None) + session_id = self._session.cookies.get(self.SESSION_ID_KEY, None) if not session_id: logger.error("Bad cookie from server (missing _session_id)") raise SRPAuthenticationError(self.tr("Session cookie " diff --git a/src/leap/crypto/srpregister.py b/src/leap/crypto/srpregister.py index dc137aeb..59aaf257 100644 --- a/src/leap/crypto/srpregister.py +++ b/src/leap/crypto/srpregister.py @@ -125,6 +125,7 @@ class SRPRegister(QtCore.QObject): logger.debug("Will try to register user = %s" % (username,)) logger.debug("user_data => %r" % (user_data,)) + ok = None try: req = self._session.post(uri, data=user_data, @@ -134,15 +135,12 @@ class SRPRegister(QtCore.QObject): except requests.exceptions.SSLError as exc: logger.error("SSLError: %s" % exc.message) - _ok = False req = None - + ok = False else: - _ok = req.ok - - self.registration_finished.emit(_ok, req) - - return _ok + ok = req.ok + self.registration_finished.emit(ok, req) + return ok if __name__ == "__main__": diff --git a/src/leap/crypto/tests/fake_provider.py b/src/leap/crypto/tests/fake_provider.py index d3e05812..d533b82b 100755 --- a/src/leap/crypto/tests/fake_provider.py +++ b/src/leap/crypto/tests/fake_provider.py @@ -78,6 +78,9 @@ safe_unhexlify = lambda x: binascii.unhexlify(x) \ class IUser(Interface): + """ + Defines the User Interface + """ login = Attribute("User login.") salt = Attribute("Password salt.") verifier = Attribute("Password verifier.") @@ -86,6 +89,10 @@ class IUser(Interface): class User(object): + """ + User object. + We store it in our simple session mocks + """ implements(IUser) @@ -94,20 +101,37 @@ class User(object): self.salt = salt self.verifier = verifier self.session = None + self.svr = None def set_server_verifier(self, svr): + """ + Adds a svr verifier object to this + User instance + """ self.svr = svr def set_session(self, session): + """ + Adds this instance of User to the + global session dict + """ _SESSIONDB[session] = self self.session = session class FakeUsers(Resource): + """ + Resource that handles user registration. + """ + def __init__(self, name): self.name = name def render_POST(self, request): + """ + Handles POST to the users api resource + Simulates a login. + """ args = request.args login = args['user[login]'][0] @@ -268,11 +292,14 @@ class FakeSession(Resource): class API_Sessions(Resource): + """ + Top resource for the API v1 + """ def getChild(self, name, request): return FakeSession(name) -class OpenSSLServerContextFactory: +class OpenSSLServerContextFactory(object): def getContext(self): """ diff --git a/src/leap/crypto/tests/test.txt b/src/leap/crypto/tests/test.txt deleted file mode 100644 index d6406617..00000000 --- a/src/leap/crypto/tests/test.txt +++ /dev/null @@ -1 +0,0 @@ -OK! diff --git a/src/leap/crypto/tests/test_srpregister.py b/src/leap/crypto/tests/test_srpregister.py index a59f71cb..5ba7306f 100644 --- a/src/leap/crypto/tests/test_srpregister.py +++ b/src/leap/crypto/tests/test_srpregister.py @@ -20,7 +20,7 @@ Tests for: * leap/crypto/srpauth.py """ try: - import unittest + import unittest2 as unittest except ImportError: import unittest import os -- cgit v1.2.3