diff options
| author | Ivan Alejandro <ivanalejandro0@yahoo.com.ar> | 2013-07-05 17:16:57 -0300 | 
|---|---|---|
| committer | Ivan Alejandro <ivanalejandro0@yahoo.com.ar> | 2013-07-05 17:16:57 -0300 | 
| commit | 84c82e2e70eb92f8871f64b294f055c89c8be331 (patch) | |
| tree | 388afd548cede8272168da10bc2544fb669fe316 /src/leap/crypto/tests/test_srpauth.py | |
| parent | b7a323da8610e7722cad2353394f4843c5437ed3 (diff) | |
| parent | ec64e0245c13b6fbc928989d9b412fec034cdcc8 (diff) | |
Merge branch 'feature/srpauth_coverage' into develop
Diffstat (limited to 'src/leap/crypto/tests/test_srpauth.py')
| -rw-r--r-- | src/leap/crypto/tests/test_srpauth.py | 755 | 
1 files changed, 755 insertions, 0 deletions
| diff --git a/src/leap/crypto/tests/test_srpauth.py b/src/leap/crypto/tests/test_srpauth.py new file mode 100644 index 00000000..9a684a8f --- /dev/null +++ b/src/leap/crypto/tests/test_srpauth.py @@ -0,0 +1,755 @@ +# -*- coding: utf-8 -*- +# test_srpauth.py +# Copyright (C) 2013 LEAP +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program.  If not, see <http://www.gnu.org/licenses/>. +""" +Tests for: +    * leap/crypto/srpauth.py +""" +try: +    import unittest2 as unittest +except ImportError: +    import unittest +import os +import sys +import binascii +import requests +import mock + +from mock import MagicMock +from nose.twistedtools import reactor, deferred +from twisted.python import log +from twisted.internet import threads +from functools import partial +from requests.models import Response +from simplejson.decoder import JSONDecodeError + +from leap.common.testing.https_server import where +from leap.config.providerconfig import ProviderConfig +from leap.crypto import srpregister, srpauth +from leap.crypto.tests import fake_provider + +log.startLogging(sys.stdout) + + +def _get_capath(): +    return where("cacert.pem") + +_here = os.path.split(__file__)[0] + + +class ImproperlyConfiguredError(Exception): +    """ +    Raised if the test provider is missing configuration +    """ + + +class SRPAuthTestCase(unittest.TestCase): +    """ +    Tests for the SRPAuth class +    """ +    __name__ = "SRPAuth tests" + +    def setUp(self): +        """ +        Sets up this TestCase with a simple and faked provider instance: + +        * runs a threaded reactor +        * loads a mocked ProviderConfig that points to the certs in the +          leap.common.testing module. +        """ +        factory = fake_provider.get_provider_factory() +        http = reactor.listenTCP(0, factory) +        https = reactor.listenSSL( +            0, factory, +            fake_provider.OpenSSLServerContextFactory()) +        get_port = lambda p: p.getHost().port +        self.http_port = get_port(http) +        self.https_port = get_port(https) + +        provider = ProviderConfig() +        provider.get_ca_cert_path = MagicMock() +        provider.get_ca_cert_path.return_value = _get_capath() + +        provider.get_api_uri = MagicMock() +        provider.get_api_uri.return_value = self._get_https_uri() + +        loaded = provider.load(path=os.path.join( +            _here, "test_provider.json")) +        if not loaded: +            raise ImproperlyConfiguredError( +                "Could not load test provider config") +        self.register = srpregister.SRPRegister(provider_config=provider) +        self.provider = provider +        self.TEST_USER = "register_test_auth" +        self.TEST_PASS = "pass" + +        # Reset the singleton +        srpauth.SRPAuth._SRPAuth__instance = None +        self.auth = srpauth.SRPAuth(self.provider) +        self.auth_backend = self.auth._SRPAuth__instance + +        self.old_post = self.auth_backend._session.post +        self.old_put = self.auth_backend._session.put +        self.old_delete = self.auth_backend._session.delete + +        self.old_start_auth = self.auth_backend._start_authentication +        self.old_proc_challenge = self.auth_backend._process_challenge +        self.old_extract_data = self.auth_backend._extract_data +        self.old_verify_session = self.auth_backend._verify_session +        self.old_auth_preproc = self.auth_backend._authentication_preprocessing +        self.old_get_sid = self.auth_backend.get_session_id +        self.old_cookie_get = self.auth_backend._session.cookies.get +        self.old_auth = self.auth_backend.authenticate + +    def tearDown(self): +        self.auth_backend._session.post = self.old_post +        self.auth_backend._session.put = self.old_put +        self.auth_backend._session.delete = self.old_delete + +        self.auth_backend._start_authentication = self.old_start_auth +        self.auth_backend._process_challenge = self.old_proc_challenge +        self.auth_backend._extract_data = self.old_extract_data +        self.auth_backend._verify_session = self.old_verify_session +        self.auth_backend._authentication_preprocessing = self.old_auth_preproc +        self.auth_backend.get_session_id = self.old_get_sid +        self.auth_backend._session.cookies.get = self.old_cookie_get +        self.auth_backend.authenticate = self.old_auth + +    # helper methods + +    def _get_https_uri(self): +        """ +        Returns a https uri with the right https port initialized +        """ +        return "https://localhost:%s" % (self.https_port,) + +    # Auth tests + +    def _prepare_auth_test(self, code=200, side_effect=None): +        """ +        Creates the needed defers to test several test situations. It +        adds up to the auth preprocessing step. + +        :param code: status code for the response of POST in requests +        :type code: int +        :param side_effect: side effect triggered by the POST method +                            in requests +        :type side_effect: some kind of Exception + +        :returns: the defer that is created +        :rtype: defer.Deferred +        """ +        res = Response() +        res.status_code = code +        self.auth_backend._session.post = MagicMock(return_value=res, +                                                    side_effect=side_effect) + +        d = threads.deferToThread(self.register.register_user, +                                  self.TEST_USER, +                                  self.TEST_PASS) + +        def wrapper_preproc(*args): +            return threads.deferToThread( +                self.auth_backend._authentication_preprocessing, +                self.TEST_USER, self.TEST_PASS) + +        d.addCallback(wrapper_preproc) + +        return d + +    def test_safe_unhexlify(self): +        input_value = "somestring" +        test_value = binascii.hexlify(input_value) +        self.assertEqual( +            self.auth_backend._safe_unhexlify(test_value), +            input_value) + +    def test_safe_unhexlify_not_raises(self): +        input_value = "somestring" +        test_value = binascii.hexlify(input_value)[:-1] + +        with self.assertRaises(TypeError): +            binascii.unhexlify(test_value) + +        self.auth_backend._safe_unhexlify(test_value) + +    def test_preprocessing_loads_a(self): +        self.assertEqual(self.auth_backend._srp_a, None) +        self.auth_backend._authentication_preprocessing("user", "pass") +        self.assertIsNotNone(self.auth_backend._srp_a) +        self.assertTrue(len(self.auth_backend._srp_a) > 0) + +    @deferred() +    def test_start_authentication(self): +        d = threads.deferToThread(self.register.register_user, self.TEST_USER, +                                  self.TEST_PASS) + +        def wrapper_preproc(*args): +            return threads.deferToThread( +                self.auth_backend._authentication_preprocessing, +                self.TEST_USER, self.TEST_PASS) + +        d.addCallback(wrapper_preproc) + +        def wrapper(_): +            return threads.deferToThread( +                self.auth_backend._start_authentication, +                None, self.TEST_USER) + +        d.addCallback(wrapper) +        return d + +    @deferred() +    def test_start_authentication_fails_connerror(self): +        d = self._prepare_auth_test( +            side_effect=requests.exceptions.ConnectionError()) + +        def wrapper(_): +            with self.assertRaises(srpauth.SRPAuthConnectionError): +                self.auth_backend._start_authentication(None, self.TEST_USER) + +        d.addCallback(partial(threads.deferToThread, wrapper)) +        return d + +    @deferred() +    def test_start_authentication_fails_any_error(self): +        d = self._prepare_auth_test(side_effect=Exception()) + +        def wrapper(_): +            with self.assertRaises(srpauth.SRPAuthenticationError): +                self.auth_backend._start_authentication(None, self.TEST_USER) + +        d.addCallback(partial(threads.deferToThread, wrapper)) +        return d + +    @deferred() +    def test_start_authentication_fails_unknown_user(self): +        d = self._prepare_auth_test(422) + +        def wrapper(_): +            with self.assertRaises(srpauth.SRPAuthUnknownUser): +                with mock.patch('leap.util.request_helpers.get_content', +                                new_callable=MagicMock()) as \ +                        content: +                    content.return_value = ("{}", 0) + +                    self.auth_backend._start_authentication( +                        None, self.TEST_USER) + +        d.addCallback(partial(threads.deferToThread, wrapper)) +        return d + +    @deferred() +    def test_start_authentication_fails_errorcode(self): +        d = self._prepare_auth_test(302) + +        def wrapper(_): +            with self.assertRaises(srpauth.SRPAuthBadStatusCode): +                with mock.patch('leap.util.request_helpers.get_content', +                                new_callable=MagicMock()) as \ +                        content: +                    content.return_value = ("{}", 0) + +                    self.auth_backend._start_authentication(None, +                                                            self.TEST_USER) + +        d.addCallback(partial(threads.deferToThread, wrapper)) +        return d + +    @deferred() +    def test_start_authentication_fails_no_salt(self): +        d = self._prepare_auth_test(200) + +        def wrapper(_): +            with self.assertRaises(srpauth.SRPAuthNoSalt): +                with mock.patch('leap.util.request_helpers.get_content', +                                new_callable=MagicMock()) as \ +                        content: +                    content.return_value = ("{}", 0) + +                    self.auth_backend._start_authentication(None, +                                                            self.TEST_USER) + +        d.addCallback(partial(threads.deferToThread, wrapper)) +        return d + +    @deferred() +    def test_start_authentication_fails_no_B(self): +        d = self._prepare_auth_test(200) + +        def wrapper(_): +            with self.assertRaises(srpauth.SRPAuthNoB): +                with mock.patch('leap.util.request_helpers.get_content', +                                new_callable=MagicMock()) as \ +                        content: +                    content.return_value = ('{"salt": ""}', 0) + +                    self.auth_backend._start_authentication(None, +                                                            self.TEST_USER) + +        d.addCallback(partial(threads.deferToThread, wrapper)) +        return d + +    @deferred() +    def test_start_authentication_correct_saltb(self): +        d = self._prepare_auth_test(200) + +        test_salt = "12345" +        test_B = "67890" + +        def wrapper(_): +            with mock.patch('leap.util.request_helpers.get_content', +                            new_callable=MagicMock()) as \ +                    content: +                content.return_value = ('{"salt":"%s", "B":"%s"}' % (test_salt, +                                                                     test_B), +                                        0) + +                salt, B = self.auth_backend._start_authentication( +                    None, +                    self.TEST_USER) +                self.assertEqual(salt, test_salt) +                self.assertEqual(B, test_B) + +        d.addCallback(partial(threads.deferToThread, wrapper)) +        return d + +    def _prepare_auth_challenge(self): +        """ +        Creates the needed defers to test several test situations. It +        adds up to the start authentication step. + +        :returns: the defer that is created +        :rtype: defer.Deferred +        """ +        d = threads.deferToThread(self.register.register_user, +                                  self.TEST_USER, +                                  self.TEST_PASS) + +        def wrapper_preproc(*args): +            return threads.deferToThread( +                self.auth_backend._authentication_preprocessing, +                self.TEST_USER, self.TEST_PASS) + +        d.addCallback(wrapper_preproc) + +        def wrapper_start(*args): +            return threads.deferToThread( +                self.auth_backend._start_authentication, +                None, self.TEST_USER) + +        d.addCallback(wrapper_start) + +        return d + +    @deferred() +    def test_process_challenge_wrong_saltb(self): +        d = self._prepare_auth_challenge() + +        def wrapper(salt_B): +            with self.assertRaises(srpauth.SRPAuthBadDataFromServer): +                self.auth_backend._process_challenge("", +                                                     username=self.TEST_USER) + +        d.addCallback(partial(threads.deferToThread, wrapper)) +        return d + +    @deferred() +    def test_process_challenge_requests_problem_raises(self): +        d = self._prepare_auth_challenge() + +        self.auth_backend._session.put = MagicMock( +            side_effect=requests.exceptions.ConnectionError()) + +        def wrapper(salt_B): +            with self.assertRaises(srpauth.SRPAuthConnectionError): +                self.auth_backend._process_challenge(salt_B, +                                                     username=self.TEST_USER) + +        d.addCallback(partial(threads.deferToThread, wrapper)) + +        return d + +    @deferred() +    def test_process_challenge_json_decode_error(self): +        d = self._prepare_auth_challenge() + +        def wrapper(salt_B): +            with mock.patch('leap.util.request_helpers.get_content', +                            new_callable=MagicMock()) as \ +                    content: +                content.return_value = ("{", 0) +                content.side_effect = JSONDecodeError("", "", 0) + +                with self.assertRaises(srpauth.SRPAuthJSONDecodeError): +                   self.auth_backend._process_challenge( +                       salt_B, +                       username=self.TEST_USER) + +        d.addCallback(partial(threads.deferToThread, wrapper)) + +        return d + +    @deferred() +    def test_process_challenge_bad_password(self): +        d = self._prepare_auth_challenge() + +        res = Response() +        res.status_code = 422 +        self.auth_backend._session.put = MagicMock(return_value=res) + +        def wrapper(salt_B): +            with mock.patch('leap.util.request_helpers.get_content', +                            new_callable=MagicMock()) as \ +                    content: +                content.return_value = ("", 0) +                with self.assertRaises(srpauth.SRPAuthBadPassword): +                    self.auth_backend._process_challenge( +                        salt_B, +                        username=self.TEST_USER) + +        d.addCallback(partial(threads.deferToThread, wrapper)) + +        return d + +    @deferred() +    def test_process_challenge_bad_password2(self): +        d = self._prepare_auth_challenge() + +        res = Response() +        res.status_code = 422 +        self.auth_backend._session.put = MagicMock(return_value=res) + +        def wrapper(salt_B): +            with mock.patch('leap.util.request_helpers.get_content', +                            new_callable=MagicMock()) as \ +                    content: +                content.return_value = ("[]", 0) +                with self.assertRaises(srpauth.SRPAuthBadPassword): +                    self.auth_backend._process_challenge( +                        salt_B, +                        username=self.TEST_USER) + +        d.addCallback(partial(threads.deferToThread, wrapper)) + +        return d + +    @deferred() +    def test_process_challenge_other_error_code(self): +        d = self._prepare_auth_challenge() + +        res = Response() +        res.status_code = 300 +        self.auth_backend._session.put = MagicMock(return_value=res) + +        def wrapper(salt_B): +            with mock.patch('leap.util.request_helpers.get_content', +                            new_callable=MagicMock()) as \ +                    content: +                content.return_value = ("{}", 0) +                with self.assertRaises(srpauth.SRPAuthBadStatusCode): +                    self.auth_backend._process_challenge( +                        salt_B, +                        username=self.TEST_USER) + +        d.addCallback(partial(threads.deferToThread, wrapper)) + +        return d + +    @deferred() +    def test_process_challenge(self): +        d = self._prepare_auth_challenge() + +        def wrapper(salt_B): +            self.auth_backend._process_challenge(salt_B, +                                                 username=self.TEST_USER) + +        d.addCallback(partial(threads.deferToThread, wrapper)) + +        return d + +    def test_extract_data_wrong_data(self): +        with self.assertRaises(srpauth.SRPAuthBadDataFromServer): +            self.auth_backend._extract_data(None) + +        with self.assertRaises(srpauth.SRPAuthBadDataFromServer): +            self.auth_backend._extract_data("") + +    def test_extract_data_fails_on_wrong_data_from_server(self): +        with self.assertRaises(srpauth.SRPAuthBadDataFromServer): +            self.auth_backend._extract_data({}) + +        with self.assertRaises(srpauth.SRPAuthBadDataFromServer): +            self.auth_backend._extract_data({"M2": ""}) + +    def test_extract_data_sets_uidtoken(self): +        test_uid = "someuid" +        test_m2 = "somem2" +        test_token = "sometoken" +        test_data = { +            "M2": test_m2, +            "id": test_uid, +            "token": test_token +        } +        m2 = self.auth_backend._extract_data(test_data) + +        self.assertEqual(m2, test_m2) +        self.assertEqual(self.auth_backend.get_uid(), test_uid) +        self.assertEqual(self.auth_backend.get_uid(), +                         self.auth.get_uid()) +        self.assertEqual(self.auth_backend.get_token(), test_token) +        self.assertEqual(self.auth_backend.get_token(), +                         self.auth.get_token()) + +    def _prepare_verify_session(self): +        """ +        Prepares the tests for verify session with needed steps +        before. It adds up to the extract_data step. + +        :returns: The defer to chain to +        :rtype: defer.Deferred +        """ +        d = self._prepare_auth_challenge() + +        def wrapper_proc_challenge(salt_B): +            return self.auth_backend._process_challenge( +                salt_B, +                username=self.TEST_USER) + +        def wrapper_extract_data(data): +            return self.auth_backend._extract_data(data) + +        d.addCallback(partial(threads.deferToThread, wrapper_proc_challenge)) +        d.addCallback(partial(threads.deferToThread, wrapper_extract_data)) + +        return d + +    @deferred() +    def test_verify_session_unhexlifiable_m2(self): +        d = self._prepare_verify_session() + +        def wrapper(M2): +            with self.assertRaises(srpauth.SRPAuthBadDataFromServer): +                self.auth_backend._verify_session("za")  # unhexlifiable value + +        d.addCallback(wrapper) + +        return d + +    @deferred() +    def test_verify_session_unverifiable_m2(self): +        d = self._prepare_verify_session() + +        def wrapper(M2): +            with self.assertRaises(srpauth.SRPAuthVerificationFailed): +                # Correctly unhelifiable value, but not for verifying the +                # session +                self.auth_backend._verify_session("abc12") + +        d.addCallback(wrapper) + +        return d + +    @deferred() +    def test_verify_session_fails_on_no_session_id(self): +        d = self._prepare_verify_session() + +        def wrapper(M2): +            self.auth_backend._session.cookies.get = MagicMock( +                return_value=None) +            with self.assertRaises(srpauth.SRPAuthNoSessionId): +                self.auth_backend._verify_session(M2) + +        d.addCallback(wrapper) + +        return d + +    @deferred() +    def test_verify_session_session_id(self): +        d = self._prepare_verify_session() + +        test_session_id = "12345" + +        def wrapper(M2): +            self.auth_backend._session.cookies.get = MagicMock( +                return_value=test_session_id) +            self.auth_backend._verify_session(M2) +            self.assertEqual(self.auth_backend.get_session_id(), +                             test_session_id) +            self.assertEqual(self.auth_backend.get_session_id(), +                             self.auth.get_session_id()) + +        d.addCallback(wrapper) + +        return d + +    @deferred() +    def test_verify_session(self): +        d = self._prepare_verify_session() + +        def wrapper(M2): +            self.auth_backend._verify_session(M2) + +        d.addCallback(wrapper) + +        return d + +    @deferred() +    def test_authenticate(self): +        self.auth_backend._authentication_preprocessing = MagicMock( +            return_value=None) +        self.auth_backend._start_authentication = MagicMock(return_value=None) +        self.auth_backend._process_challenge = MagicMock(return_value=None) +        self.auth_backend._extract_data = MagicMock(return_value=None) +        self.auth_backend._verify_session = MagicMock(return_value=None) + +        d = self.auth_backend.authenticate(self.TEST_USER, self.TEST_PASS) + +        def check(*args): +            self.auth_backend._authentication_preprocessing.\ +                assert_called_once_with( +                    username=self.TEST_USER, +                    password=self.TEST_PASS +                ) +            self.auth_backend._start_authentication.assert_called_once_with( +                None, +                username=self.TEST_USER) +            self.auth_backend._process_challenge.assert_called_once_with( +                None, +                username=self.TEST_USER) +            self.auth_backend._extract_data.assert_called_once_with( +                None, +                username=self.TEST_USER) +            self.auth_backend._verify_session.assert_called_once_with(None) + +        d.addCallback(check) + +        return d + +    @deferred() +    def test_logout_fails_if_not_logged_in(self): + +        def wrapper(*args): +            with self.assertRaises(AssertionError): +                self.auth_backend.logout() + +        d = threads.deferToThread(wrapper) +        return d + +    @deferred() +    def test_logout_traps_delete(self): +        self.auth_backend.get_session_id = MagicMock(return_value="1234") +        self.auth_backend._session.delete = MagicMock(side_effect=Exception()) + +        def wrapper(*args): +            self.auth_backend.logout() + +        d = threads.deferToThread(wrapper) +        return d + +    @deferred() +    def test_logout_clears(self): +        self.auth_backend._session_id = "1234" + +        def wrapper(*args): +            old_session = self.auth_backend._session +            self.auth_backend.logout() +            self.assertIsNone(self.auth_backend.get_session_id()) +            self.assertIsNone(self.auth_backend.get_uid()) +            self.assertNotEqual(old_session, self.auth_backend._session) + +        d = threads.deferToThread(wrapper) +        return d + + +class SRPAuthSingletonTestCase(unittest.TestCase): +    def setUp(self): +        self.old_auth = srpauth.SRPAuth._SRPAuth__impl.authenticate + +    def tearDown(self): +        srpauth.SRPAuth._SRPAuth__impl.authenticate = self.old_auth + +    def test_singleton(self): +        obj1 = srpauth.SRPAuth(ProviderConfig()) +        obj2 = srpauth.SRPAuth(ProviderConfig()) +        self.assertEqual(obj1._SRPAuth__instance, obj2._SRPAuth__instance) + +    @deferred() +    def test_authenticate_notifies_gui(self): +        auth = srpauth.SRPAuth(ProviderConfig()) +        auth._SRPAuth__instance.authenticate = MagicMock( +            return_value=threads.deferToThread(lambda: None)) +        auth._gui_notify = MagicMock() + +        d = auth.authenticate("", "") + +        def check(*args): +            auth._gui_notify.assert_called_once_with(None) + +        d.addCallback(check) +        return d + +    @deferred() +    def test_authenticate_errsback(self): +        auth = srpauth.SRPAuth(ProviderConfig()) +        auth._SRPAuth__instance.authenticate = MagicMock( +            return_value=threads.deferToThread(MagicMock( +                side_effect=Exception()))) +        auth._gui_notify = MagicMock() +        auth._errback = MagicMock() + +        d = auth.authenticate("", "") + +        def check(*args): +            self.assertFalse(auth._gui_notify.called) +            self.assertEqual(auth._errback.call_count, 1) + +        d.addCallback(check) +        return d + +    @deferred() +    def test_authenticate_runs_cleanly_when_raises(self): +        auth = srpauth.SRPAuth(ProviderConfig()) +        auth._SRPAuth__instance.authenticate = MagicMock( +            return_value=threads.deferToThread(MagicMock( +                side_effect=Exception()))) + +        d = auth.authenticate("", "") + +        return d + +    @deferred() +    def test_authenticate_runs_cleanly(self): +        auth = srpauth.SRPAuth(ProviderConfig()) +        auth._SRPAuth__instance.authenticate = MagicMock( +            return_value=threads.deferToThread(MagicMock())) + +        d = auth.authenticate("", "") + +        return d + +    def test_logout(self): +        auth = srpauth.SRPAuth(ProviderConfig()) +        auth._SRPAuth__instance.logout = MagicMock() + +        self.assertTrue(auth.logout()) + +    def test_logout_rets_false_when_raises(self): +        auth = srpauth.SRPAuth(ProviderConfig()) +        auth._SRPAuth__instance.logout = MagicMock( +            side_effect=Exception()) + +        self.assertFalse(auth.logout()) | 
