From 6da8d09846db4d2eed01e488bc6a6f5ba48b959f Mon Sep 17 00:00:00 2001 From: Kali Kaneko Date: Mon, 12 Aug 2013 13:25:44 +0200 Subject: move everything into bitmask namespace --- src/leap/crypto/__init__.py | 0 src/leap/crypto/srpauth.py | 606 ----------------------- src/leap/crypto/srpregister.py | 168 ------- src/leap/crypto/tests/__init__.py | 16 - src/leap/crypto/tests/eip-service.json | 43 -- src/leap/crypto/tests/fake_provider.py | 376 -------------- src/leap/crypto/tests/openvpn.pem | 33 -- src/leap/crypto/tests/test_provider.json | 15 - src/leap/crypto/tests/test_srpauth.py | 790 ------------------------------ src/leap/crypto/tests/test_srpregister.py | 201 -------- src/leap/crypto/tests/wrongcert.pem | 33 -- 11 files changed, 2281 deletions(-) delete mode 100644 src/leap/crypto/__init__.py delete mode 100644 src/leap/crypto/srpauth.py delete mode 100644 src/leap/crypto/srpregister.py delete mode 100644 src/leap/crypto/tests/__init__.py delete mode 100644 src/leap/crypto/tests/eip-service.json delete mode 100755 src/leap/crypto/tests/fake_provider.py delete mode 100644 src/leap/crypto/tests/openvpn.pem delete mode 100644 src/leap/crypto/tests/test_provider.json delete mode 100644 src/leap/crypto/tests/test_srpauth.py delete mode 100644 src/leap/crypto/tests/test_srpregister.py delete mode 100644 src/leap/crypto/tests/wrongcert.pem (limited to 'src/leap/crypto') diff --git a/src/leap/crypto/__init__.py b/src/leap/crypto/__init__.py deleted file mode 100644 index e69de29b..00000000 diff --git a/src/leap/crypto/srpauth.py b/src/leap/crypto/srpauth.py deleted file mode 100644 index fc0533fc..00000000 --- a/src/leap/crypto/srpauth.py +++ /dev/null @@ -1,606 +0,0 @@ -# -*- coding: utf-8 -*- -# srpauth.py -# Copyright (C) 2013 LEAP -# -# This program is free software: you can redistribute it and/or modify -# it under the terms of the GNU General Public License as published by -# the Free Software Foundation, either version 3 of the License, or -# (at your option) any later version. -# -# This program is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -# GNU General Public License for more details. -# -# You should have received a copy of the GNU General Public License -# along with this program. If not, see . - -import binascii -import logging - -import requests -import srp -import json - -#this error is raised from requests -from simplejson.decoder import JSONDecodeError -from functools import partial - -from PySide import QtCore -from twisted.internet import threads - -from leap.common.check import leap_assert -from leap.util.constants import REQUEST_TIMEOUT -from leap.util import request_helpers as reqhelper -from leap.common.events import signal as events_signal -from leap.common.events import events_pb2 as proto - -logger = logging.getLogger(__name__) - - -class SRPAuthenticationError(Exception): - """ - Exception raised for authentication errors - """ - pass - - -class SRPAuthConnectionError(SRPAuthenticationError): - """ - Exception raised when there's a connection error - """ - pass - - -class SRPAuthUnknownUser(SRPAuthenticationError): - """ - Exception raised when trying to authenticate an unknown user - """ - pass - - -class SRPAuthBadStatusCode(SRPAuthenticationError): - """ - Exception raised when we received an unknown bad status code - """ - pass - - -class SRPAuthNoSalt(SRPAuthenticationError): - """ - Exception raised when we don't receive the salt param at a - specific point in the auth process - """ - pass - - -class SRPAuthNoB(SRPAuthenticationError): - """ - Exception raised when we don't receive the B param at a specific - point in the auth process - """ - pass - - -class SRPAuthBadDataFromServer(SRPAuthenticationError): - """ - Generic exception when we receive bad data from the server. - """ - pass - - -class SRPAuthJSONDecodeError(SRPAuthenticationError): - """ - Exception raised when there's a problem decoding the JSON content - parsed as received from th e server. - """ - pass - - -class SRPAuthBadPassword(SRPAuthenticationError): - """ - Exception raised when the user provided a bad password to auth. - """ - pass - - -class SRPAuthVerificationFailed(SRPAuthenticationError): - """ - Exception raised when we can't verify the SRP data received from - the server. - """ - pass - - -class SRPAuthNoSessionId(SRPAuthenticationError): - """ - Exception raised when we don't receive a session id from the - server. - """ - pass - - -class SRPAuth(QtCore.QObject): - """ - SRPAuth singleton - """ - - class __impl(QtCore.QObject): - """ - Implementation of the SRPAuth interface - """ - - LOGIN_KEY = "login" - A_KEY = "A" - CLIENT_AUTH_KEY = "client_auth" - SESSION_ID_KEY = "_session_id" - - def __init__(self, provider_config): - """ - Constructor for SRPAuth implementation - - :param server: Server to which we will authenticate - :type server: str - """ - QtCore.QObject.__init__(self) - - leap_assert(provider_config, - "We need a provider config to authenticate") - - self._provider_config = provider_config - - # **************************************************** # - # Dependency injection helpers, override this for more - # granular testing - self._fetcher = requests - self._srp = srp - self._hashfun = self._srp.SHA256 - self._ng = self._srp.NG_1024 - # **************************************************** # - - self._session = self._fetcher.session() - self._session_id = None - self._session_id_lock = QtCore.QMutex() - self._uid = None - self._uid_lock = QtCore.QMutex() - self._token = None - self._token_lock = QtCore.QMutex() - - self._srp_user = None - self._srp_a = None - - def _safe_unhexlify(self, val): - """ - Rounds the val to a multiple of 2 and returns the - unhexlified value - - :param val: hexlified value - :type val: str - - :rtype: binary hex data - :return: unhexlified val - """ - return binascii.unhexlify(val) \ - if (len(val) % 2 == 0) else binascii.unhexlify('0' + val) - - def _authentication_preprocessing(self, username, password): - """ - Generates the SRP.User to get the A SRP parameter - - :param username: username to login - :type username: str - :param password: password for the username - :type password: str - """ - logger.debug("Authentication preprocessing...") - self._srp_user = self._srp.User(username, - password, - self._hashfun, - self._ng) - _, A = self._srp_user.start_authentication() - - self._srp_a = A - - def _start_authentication(self, _, username): - """ - Sends the first request for authentication to retrieve the - salt and B parameter - - Might raise all SRPAuthenticationError based: - SRPAuthenticationError - SRPAuthConnectionError - SRPAuthUnknownUser - SRPAuthBadStatusCode - SRPAuthNoSalt - SRPAuthNoB - - :param _: IGNORED, output from the previous callback (None) - :type _: IGNORED - :param username: username to login - :type username: str - - :return: salt and B parameters - :rtype: tuple - """ - logger.debug("Starting authentication process...") - try: - auth_data = { - self.LOGIN_KEY: username, - self.A_KEY: binascii.hexlify(self._srp_a) - } - sessions_url = "%s/%s/%s/" % \ - (self._provider_config.get_api_uri(), - self._provider_config.get_api_version(), - "sessions") - init_session = self._session.post(sessions_url, - data=auth_data, - verify=self._provider_config. - get_ca_cert_path(), - timeout=REQUEST_TIMEOUT) - # Clean up A value, we don't need it anymore - self._srp_a = None - except requests.exceptions.ConnectionError as e: - logger.error("No connection made (salt): %r" % - (e,)) - raise SRPAuthConnectionError("Could not establish a " - "connection") - except Exception as e: - logger.error("Unknown error: %r" % (e,)) - raise SRPAuthenticationError("Unknown error: %r" % - (e,)) - - content, mtime = reqhelper.get_content(init_session) - - if init_session.status_code not in (200,): - logger.error("No valid response (salt): " - "Status code = %r. Content: %r" % - (init_session.status_code, content)) - if init_session.status_code == 422: - raise SRPAuthUnknownUser(self.tr("Unknown user")) - - raise SRPAuthBadStatusCode(self.tr("There was a problem with" - " authentication")) - - json_content = json.loads(content) - salt = json_content.get("salt", None) - B = json_content.get("B", None) - - if salt is None: - logger.error("No salt parameter sent") - raise SRPAuthNoSalt(self.tr("The server did not send " - "the salt parameter")) - if B is None: - logger.error("No B parameter sent") - raise SRPAuthNoB(self.tr("The server did not send " - "the B parameter")) - - return salt, B - - def _process_challenge(self, salt_B, username): - """ - Given the salt and B processes the auth challenge and - generates the M2 parameter - - Might raise SRPAuthenticationError based: - SRPAuthenticationError - SRPAuthBadDataFromServer - SRPAuthConnectionError - SRPAuthJSONDecodeError - SRPAuthBadPassword - - :param salt_B: salt and B parameters for the username - :type salt_B: tuple - :param username: username for this session - :type username: str - - :return: the M2 SRP parameter - :rtype: str - """ - logger.debug("Processing challenge...") - try: - salt, B = salt_B - unhex_salt = self._safe_unhexlify(salt) - unhex_B = self._safe_unhexlify(B) - except (TypeError, ValueError) as e: - logger.error("Bad data from server: %r" % (e,)) - raise SRPAuthBadDataFromServer( - self.tr("The data sent from the server had errors")) - M = self._srp_user.process_challenge(unhex_salt, unhex_B) - - auth_url = "%s/%s/%s/%s" % (self._provider_config.get_api_uri(), - self._provider_config. - get_api_version(), - "sessions", - username) - - auth_data = { - self.CLIENT_AUTH_KEY: binascii.hexlify(M) - } - - try: - auth_result = self._session.put(auth_url, - data=auth_data, - verify=self._provider_config. - get_ca_cert_path(), - timeout=REQUEST_TIMEOUT) - except requests.exceptions.ConnectionError as e: - logger.error("No connection made (HAMK): %r" % (e,)) - raise SRPAuthConnectionError(self.tr("Could not connect to " - "the server")) - - try: - content, mtime = reqhelper.get_content(auth_result) - except JSONDecodeError: - raise SRPAuthJSONDecodeError("Bad JSON content in auth result") - - if auth_result.status_code == 422: - error = "" - try: - error = json.loads(content).get("errors", "") - except ValueError: - logger.error("Problem parsing the received response: %s" - % (content,)) - except AttributeError: - logger.error("Expecting a dict but something else was " - "received: %s", (content,)) - logger.error("[%s] Wrong password (HAMK): [%s]" % - (auth_result.status_code, error)) - raise SRPAuthBadPassword(self.tr("Wrong password")) - - if auth_result.status_code not in (200,): - logger.error("No valid response (HAMK): " - "Status code = %s. Content = %r" % - (auth_result.status_code, content)) - raise SRPAuthBadStatusCode(self.tr("Unknown error (%s)") % - (auth_result.status_code,)) - - return json.loads(content) - - def _extract_data(self, json_content): - """ - Extracts the necessary parameters from json_content (M2, - id, token) - - Might raise SRPAuthenticationError based: - SRPBadDataFromServer - - :param json_content: Data received from the server - :type json_content: dict - """ - try: - M2 = json_content.get("M2", None) - uid = json_content.get("id", None) - token = json_content.get("token", None) - except Exception as e: - logger.error(e) - raise SRPAuthBadDataFromServer("Something went wrong with the " - "login") - - self.set_uid(uid) - self.set_token(token) - - if M2 is None or self.get_uid() is None: - logger.error("Something went wrong. Content = %r" % - (json_content,)) - raise SRPAuthBadDataFromServer(self.tr("Problem getting data " - "from server")) - - events_signal( - proto.CLIENT_UID, content=uid, - reqcbk=lambda req, res: None) # make the rpc call async - - return M2 - - def _verify_session(self, M2): - """ - Verifies the session based on the M2 parameter. If the - verification succeeds, it sets the session_id for this - session - - Might raise SRPAuthenticationError based: - SRPAuthBadDataFromServer - SRPAuthVerificationFailed - - :param M2: M2 SRP parameter - :type M2: str - """ - logger.debug("Verifying session...") - try: - unhex_M2 = self._safe_unhexlify(M2) - except TypeError: - logger.error("Bad data from server (HAWK)") - raise SRPAuthBadDataFromServer(self.tr("Bad data from server")) - - self._srp_user.verify_session(unhex_M2) - - if not self._srp_user.authenticated(): - logger.error("Auth verification failed") - raise SRPAuthVerificationFailed(self.tr("Auth verification " - "failed")) - logger.debug("Session verified.") - - session_id = self._session.cookies.get(self.SESSION_ID_KEY, None) - if not session_id: - logger.error("Bad cookie from server (missing _session_id)") - raise SRPAuthNoSessionId(self.tr("Session cookie " - "verification " - "failed")) - - events_signal( - proto.CLIENT_SESSION_ID, content=session_id, - reqcbk=lambda req, res: None) # make the rpc call async - - self.set_session_id(session_id) - - def _threader(self, cb, res, *args, **kwargs): - return threads.deferToThread(cb, res, *args, **kwargs) - - def authenticate(self, username, password): - """ - Executes the whole authentication process for a user - - Might raise SRPAuthenticationError - - :param username: username for this session - :type username: str - :param password: password for this user - :type password: str - - :returns: A defer on a different thread - :rtype: twisted.internet.defer.Deferred - """ - leap_assert(self.get_session_id() is None, "Already logged in") - - d = threads.deferToThread(self._authentication_preprocessing, - username=username, - password=password) - - d.addCallback( - partial(self._threader, - self._start_authentication), - username=username) - d.addCallback( - partial(self._threader, - self._process_challenge), - username=username) - d.addCallback( - partial(self._threader, - self._extract_data)) - d.addCallback(partial(self._threader, - self._verify_session)) - - return d - - def logout(self): - """ - Logs out the current session. - Expects a session_id to exists, might raise AssertionError - """ - logger.debug("Starting logout...") - - leap_assert(self.get_session_id(), - "Cannot logout an unexisting session") - - logout_url = "%s/%s/%s/" % (self._provider_config.get_api_uri(), - self._provider_config. - get_api_version(), - "sessions") - try: - self._session.delete(logout_url, - data=self.get_session_id(), - verify=self._provider_config. - get_ca_cert_path(), - timeout=REQUEST_TIMEOUT) - except Exception as e: - logger.warning("Something went wrong with the logout: %r" % - (e,)) - - self.set_session_id(None) - self.set_uid(None) - # Also reset the session - self._session = self._fetcher.session() - logger.debug("Successfully logged out.") - - def set_session_id(self, session_id): - QtCore.QMutexLocker(self._session_id_lock) - self._session_id = session_id - - def get_session_id(self): - QtCore.QMutexLocker(self._session_id_lock) - return self._session_id - - def set_uid(self, uid): - QtCore.QMutexLocker(self._uid_lock) - self._uid = uid - - def get_uid(self): - QtCore.QMutexLocker(self._uid_lock) - return self._uid - - def set_token(self, token): - QtCore.QMutexLocker(self._token_lock) - self._token = token - - def get_token(self): - QtCore.QMutexLocker(self._token_lock) - return self._token - - __instance = None - - authentication_finished = QtCore.Signal(bool, str) - logout_finished = QtCore.Signal(bool, str) - - def __init__(self, provider_config): - """ - Creates a singleton instance if needed - """ - QtCore.QObject.__init__(self) - - # Check whether we already have an instance - if SRPAuth.__instance is None: - # Create and remember instance - SRPAuth.__instance = SRPAuth.__impl(provider_config) - - # Store instance reference as the only member in the handle - self.__dict__['_SRPAuth__instance'] = SRPAuth.__instance - - def authenticate(self, username, password): - """ - Executes the whole authentication process for a user - - Might raise SRPAuthenticationError based - - :param username: username for this session - :type username: str - :param password: password for this user - :type password: str - """ - - d = self.__instance.authenticate(username, password) - d.addCallback(self._gui_notify) - d.addErrback(self._errback) - return d - - def _gui_notify(self, _): - """ - Callback that notifies the UI with the proper signal. - - :param _: IGNORED, output from the previous callback (None) - :type _: IGNORED - """ - logger.debug("Successful login!") - self.authentication_finished.emit(True, self.tr("Succeeded")) - - def _errback(self, failure): - """ - General errback for the whole login process. Will notify the - UI with the proper signal. - - :param failure: Failure object captured from a callback. - :type failure: twisted.python.failure.Failure - """ - logger.error("Error logging in %s" % (failure,)) - self.authentication_finished.emit(False, "%s" % (failure.value,)) - failure.trap(Exception) - - def get_session_id(self): - return self.__instance.get_session_id() - - def get_uid(self): - return self.__instance.get_uid() - - def get_token(self): - return self.__instance.get_token() - - def logout(self): - """ - Logs out the current session. - Expects a session_id to exists, might raise AssertionError - """ - try: - self.__instance.logout() - self.logout_finished.emit(True, self.tr("Succeeded")) - return True - except Exception as e: - self.logout_finished.emit(False, "%s" % (e,)) - return False diff --git a/src/leap/crypto/srpregister.py b/src/leap/crypto/srpregister.py deleted file mode 100644 index de1978b5..00000000 --- a/src/leap/crypto/srpregister.py +++ /dev/null @@ -1,168 +0,0 @@ -# -*- coding: utf-8 -*- -# srpregister.py -# Copyright (C) 2013 LEAP -# -# This program is free software: you can redistribute it and/or modify -# it under the terms of the GNU General Public License as published by -# the Free Software Foundation, either version 3 of the License, or -# (at your option) any later version. -# -# This program is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -# GNU General Public License for more details. -# -# You should have received a copy of the GNU General Public License -# along with this program. If not, see . - -import binascii -import logging - -import requests -import srp - -from PySide import QtCore -from urlparse import urlparse - -from leap.config.providerconfig import ProviderConfig -from leap.util.constants import SIGNUP_TIMEOUT -from leap.common.check import leap_assert, leap_assert_type - -logger = logging.getLogger(__name__) - - -class SRPRegister(QtCore.QObject): - """ - Registers a user to a specific provider using SRP - """ - - USER_LOGIN_KEY = 'user[login]' - USER_VERIFIER_KEY = 'user[password_verifier]' - USER_SALT_KEY = 'user[password_salt]' - - registration_finished = QtCore.Signal(bool, object) - - def __init__(self, - provider_config=None, - register_path="users"): - """ - Constructor - - :param provider_config: provider configuration instance, - properly loaded - :type privider_config: ProviderConfig - :param register_path: webapp path for registering users - :type register_path; str - """ - QtCore.QObject.__init__(self) - leap_assert(provider_config, "Please provide a provider") - leap_assert_type(provider_config, ProviderConfig) - - self._provider_config = provider_config - - # **************************************************** # - # Dependency injection helpers, override this for more - # granular testing - self._fetcher = requests - self._srp = srp - self._hashfun = self._srp.SHA256 - self._ng = self._srp.NG_1024 - # **************************************************** # - - parsed_url = urlparse(provider_config.get_api_uri()) - self._provider = parsed_url.hostname - self._port = parsed_url.port - if self._port is None: - self._port = "443" - - self._register_path = register_path - - self._session = self._fetcher.session() - - def _get_registration_uri(self): - """ - Returns the URI where the register request should be made for - the provider - - :rtype: str - """ - - uri = "https://%s:%s/%s/%s" % ( - self._provider, - self._port, - self._provider_config.get_api_version(), - self._register_path) - - return uri - - def register_user(self, username, password): - """ - Registers a user with the validator based on the password provider - - :param username: username to register - :type username: str - :param password: password for this username - :type password: str - - :rtype: tuple - :rparam: (ok, request) - """ - salt, verifier = self._srp.create_salted_verification_key( - username, - password, - self._hashfun, - self._ng) - - user_data = { - self.USER_LOGIN_KEY: username, - self.USER_VERIFIER_KEY: binascii.hexlify(verifier), - self.USER_SALT_KEY: binascii.hexlify(salt) - } - - uri = self._get_registration_uri() - - logger.debug('Post to uri: %s' % uri) - logger.debug("Will try to register user = %s" % (username,)) - - ok = False - # This should be None, but we don't like when PySide segfaults, - # so it something else. - # To reproduce it, just do: - # self.registration_finished.emit(False, None) - req = [] - try: - req = self._session.post(uri, - data=user_data, - timeout=SIGNUP_TIMEOUT, - verify=self._provider_config. - get_ca_cert_path()) - - except (requests.exceptions.SSLError, - requests.exceptions.ConnectionError) as exc: - logger.error(exc.message) - ok = False - else: - ok = req.ok - - self.registration_finished.emit(ok, req) - return ok - - -if __name__ == "__main__": - logger = logging.getLogger(name='leap') - logger.setLevel(logging.DEBUG) - console = logging.StreamHandler() - console.setLevel(logging.DEBUG) - formatter = logging.Formatter( - '%(asctime)s ' - '- %(name)s - %(levelname)s - %(message)s') - console.setFormatter(formatter) - logger.addHandler(console) - - provider = ProviderConfig() - - if provider.load("leap/providers/bitmask.net/provider.json"): - register = SRPRegister(provider_config=provider) - print "Registering user..." - print register.register_user("test1", "sarasaaaa") - print register.register_user("test2", "sarasaaaa") diff --git a/src/leap/crypto/tests/__init__.py b/src/leap/crypto/tests/__init__.py deleted file mode 100644 index 7f118735..00000000 --- a/src/leap/crypto/tests/__init__.py +++ /dev/null @@ -1,16 +0,0 @@ -# -*- coding: utf-8 -*- -# __init__.py -# Copyright (C) 2013 LEAP -# -# This program is free software: you can redistribute it and/or modify -# it under the terms of the GNU General Public License as published by -# the Free Software Foundation, either version 3 of the License, or -# (at your option) any later version. -# -# This program is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -# GNU General Public License for more details. -# -# You should have received a copy of the GNU General Public License -# along with this program. If not, see . diff --git a/src/leap/crypto/tests/eip-service.json b/src/leap/crypto/tests/eip-service.json deleted file mode 100644 index 24df42a2..00000000 --- a/src/leap/crypto/tests/eip-service.json +++ /dev/null @@ -1,43 +0,0 @@ -{ - "gateways": [ - { - "capabilities": { - "adblock": false, - "filter_dns": false, - "limited": true, - "ports": [ - "1194", - "443", - "53", - "80" - ], - "protocols": [ - "tcp", - "udp" - ], - "transport": [ - "openvpn" - ], - "user_ips": false - }, - "host": "harrier.cdev.bitmask.net", - "ip_address": "199.254.238.50", - "location": "seattle__wa" - } - ], - "locations": { - "seattle__wa": { - "country_code": "US", - "hemisphere": "N", - "name": "Seattle, WA", - "timezone": "-7" - } - }, - "openvpn_configuration": { - "auth": "SHA1", - "cipher": "AES-128-CBC", - "tls-cipher": "DHE-RSA-AES128-SHA" - }, - "serial": 1, - "version": 1 -} \ No newline at end of file diff --git a/src/leap/crypto/tests/fake_provider.py b/src/leap/crypto/tests/fake_provider.py deleted file mode 100755 index 54af485d..00000000 --- a/src/leap/crypto/tests/fake_provider.py +++ /dev/null @@ -1,376 +0,0 @@ -#!/usr/bin/env python -# -*- coding: utf-8 -*- -# fake_provider.py -# Copyright (C) 2013 LEAP -# -# This program is free software: you can redistribute it and/or modify -# it under the terms of the GNU General Public License as published by -# the Free Software Foundation, either version 3 of the License, or -# (at your option) any later version. -# -# This program is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -# GNU General Public License for more details. -# -# You should have received a copy of the GNU General Public License -# along with this program. If not, see . -"""A server faking some of the provider resources and apis, -used for testing Leap Client requests - -It needs that you create a subfolder named 'certs', -and that you place the following files: - -XXX check if in use - -[ ] test-openvpn.pem -[ ] test-provider.json -[ ] test-eip-service.json -""" -import binascii -import json -import os -import sys -import time - -import srp - -from OpenSSL import SSL - -from zope.interface import Interface, Attribute, implements - -from twisted.web.server import Site, Request -from twisted.web.static import File, Data -from twisted.web.resource import Resource -from twisted.internet import reactor - -from leap.common.testing.https_server import where - -# See -# http://twistedmatrix.com/documents/current/web/howto/web-in-60/index.html -# for more examples - -""" -Testing the FAKE_API: -##################### - - 1) register an user - >> curl -d "user[login]=me" -d "user[password_salt]=foo" \ - -d "user[password_verifier]=beef" http://localhost:8000/1/users - << {"errors": null} - - 2) check that if you try to register again, it will fail: - >> curl -d "user[login]=me" -d "user[password_salt]=foo" \ - -d "user[password_verifier]=beef" http://localhost:8000/1/users - << {"errors": {"login": "already taken!"}} - -""" - -# Globals to mock user/sessiondb - -_USERDB = {} -_SESSIONDB = {} - -_here = os.path.split(__file__)[0] - - -safe_unhexlify = lambda x: binascii.unhexlify(x) \ - if (len(x) % 2 == 0) else binascii.unhexlify('0' + x) - - -class IUser(Interface): - """ - Defines the User Interface - """ - login = Attribute("User login.") - salt = Attribute("Password salt.") - verifier = Attribute("Password verifier.") - session = Attribute("Session.") - svr = Attribute("Server verifier.") - - -class User(object): - """ - User object. - We store it in our simple session mocks - """ - - implements(IUser) - - def __init__(self, login, salt, verifier): - self.login = login - self.salt = salt - self.verifier = verifier - self.session = None - self.svr = None - - def set_server_verifier(self, svr): - """ - Adds a svr verifier object to this - User instance - """ - self.svr = svr - - def set_session(self, session): - """ - Adds this instance of User to the - global session dict - """ - _SESSIONDB[session] = self - self.session = session - - -class FakeUsers(Resource): - """ - Resource that handles user registration. - """ - - def __init__(self, name): - self.name = name - - def render_POST(self, request): - """ - Handles POST to the users api resource - Simulates a login. - """ - args = request.args - - login = args['user[login]'][0] - salt = args['user[password_salt]'][0] - verifier = args['user[password_verifier]'][0] - - if login in _USERDB: - request.setResponseCode(422) - return "%s\n" % json.dumps( - {'errors': {'login': 'already taken!'}}) - - print '[server]', login, verifier, salt - user = User(login, salt, verifier) - _USERDB[login] = user - return json.dumps({'errors': None}) - - -def getSession(self, sessionInterface=None): - """ - we overwrite twisted.web.server.Request.getSession method to - put the right cookie name in place - """ - if not self.session: - #cookiename = b"_".join([b'TWISTED_SESSION'] + self.sitepath) - cookiename = b"_".join([b'_session_id'] + self.sitepath) - sessionCookie = self.getCookie(cookiename) - if sessionCookie: - try: - self.session = self.site.getSession(sessionCookie) - except KeyError: - pass - # if it still hasn't been set, fix it up. - if not self.session: - self.session = self.site.makeSession() - self.addCookie(cookiename, self.session.uid, path=b'/') - self.session.touch() - if sessionInterface: - return self.session.getComponent(sessionInterface) - return self.session - - -def get_user(request): - """ - Returns user from the session dict - """ - login = request.args.get('login') - if login: - user = _USERDB.get(login[0], None) - if user: - return user - - request.getSession = getSession.__get__(request, Request) - session = request.getSession() - - user = _SESSIONDB.get(session, None) - return user - - -class FakeSession(Resource): - def __init__(self, name): - """ - Initializes session - """ - self.name = name - - def render_GET(self, request): - """ - Handles GET requests. - """ - return "%s\n" % json.dumps({'errors': None}) - - def render_POST(self, request): - """ - Handles POST requests. - """ - user = get_user(request) - - if not user: - # XXX get real error from demo provider - return json.dumps({'errors': 'no such user'}) - - A = request.args['A'][0] - - _A = safe_unhexlify(A) - _salt = safe_unhexlify(user.salt) - _verifier = safe_unhexlify(user.verifier) - - svr = srp.Verifier( - user.login, - _salt, - _verifier, - _A, - hash_alg=srp.SHA256, - ng_type=srp.NG_1024) - - s, B = svr.get_challenge() - - _B = binascii.hexlify(B) - - print '[server] login = %s' % user.login - print '[server] salt = %s' % user.salt - print '[server] len(_salt) = %s' % len(_salt) - print '[server] vkey = %s' % user.verifier - print '[server] len(vkey) = %s' % len(_verifier) - print '[server] s = %s' % binascii.hexlify(s) - print '[server] B = %s' % _B - print '[server] len(B) = %s' % len(_B) - - # override Request.getSession - request.getSession = getSession.__get__(request, Request) - session = request.getSession() - - user.set_session(session) - user.set_server_verifier(svr) - - # yep, this is tricky. - # some things are *already* unhexlified. - data = { - 'salt': user.salt, - 'B': _B, - 'errors': None} - - return json.dumps(data) - - def render_PUT(self, request): - """ - Handles PUT requests. - """ - # XXX check session??? - user = get_user(request) - - if not user: - print '[server] NO USER' - return json.dumps({'errors': 'no such user'}) - - data = request.content.read() - auth = data.split("client_auth=") - M = auth[1] if len(auth) > 1 else None - # if not H, return - if not M: - return json.dumps({'errors': 'no M proof passed by client'}) - - svr = user.svr - HAMK = svr.verify_session(binascii.unhexlify(M)) - if HAMK is None: - print '[server] verification failed!!!' - raise Exception("Authentication failed!") - #import ipdb;ipdb.set_trace() - - assert svr.authenticated() - print "***" - print '[server] User successfully authenticated using SRP!' - print "***" - - return json.dumps( - {'M2': binascii.hexlify(HAMK), - 'id': '9c943eb9d96a6ff1b7a7030bdeadbeef', - 'errors': None}) - - -class API_Sessions(Resource): - """ - Top resource for the API v1 - """ - def getChild(self, name, request): - return FakeSession(name) - - -class FileModified(File): - def render_GET(self, request): - since = request.getHeader('if-modified-since') - if since: - tsince = time.strptime(since.replace(" GMT", "")) - tfrom = time.strptime(time.ctime(os.path.getmtime(self.path))) - if tfrom > tsince: - return File.render_GET(self, request) - else: - request.setResponseCode(304) - return "" - return File.render_GET(self, request) - - -class OpenSSLServerContextFactory(object): - - def getContext(self): - """ - Create an SSL context. - """ - ctx = SSL.Context(SSL.SSLv23_METHOD) - #ctx = SSL.Context(SSL.TLSv1_METHOD) - ctx.use_certificate_file(where('leaptestscert.pem')) - ctx.use_privatekey_file(where('leaptestskey.pem')) - - return ctx - - -def get_provider_factory(): - """ - Instantiates a Site that serves the resources - that we expect from a valid provider. - Listens on: - * port 8000 for http connections - * port 8443 for https connections - - :rparam: factory for a site - :rtype: Site instance - """ - root = Data("", "") - root.putChild("", root) - root.putChild("provider.json", FileModified( - os.path.join(_here, - "test_provider.json"))) - config = Resource() - config.putChild( - "eip-service.json", - FileModified( - os.path.join(_here, "eip-service.json"))) - apiv1 = Resource() - apiv1.putChild("config", config) - apiv1.putChild("sessions", API_Sessions()) - apiv1.putChild("users", FakeUsers(None)) - apiv1.putChild("cert", FileModified( - os.path.join(_here, - 'openvpn.pem'))) - root.putChild("1", apiv1) - - factory = Site(root) - return factory - - -if __name__ == "__main__": - - from twisted.python import log - log.startLogging(sys.stdout) - - factory = get_provider_factory() - - # regular http (for debugging with curl) - reactor.listenTCP(8000, factory) - reactor.listenSSL(8443, factory, OpenSSLServerContextFactory()) - reactor.run() diff --git a/src/leap/crypto/tests/openvpn.pem b/src/leap/crypto/tests/openvpn.pem deleted file mode 100644 index a95e9370..00000000 --- a/src/leap/crypto/tests/openvpn.pem +++ /dev/null @@ -1,33 +0,0 @@ ------BEGIN CERTIFICATE----- -MIIFtTCCA52gAwIBAgIJAIGJ8Dg+DtemMA0GCSqGSIb3DQEBBQUAMEUxCzAJBgNV -BAYTAkFVMRMwEQYDVQQIEwpTb21lLVN0YXRlMSEwHwYDVQQKExhJbnRlcm5ldCBX -aWRnaXRzIFB0eSBMdGQwHhcNMTMwNjI2MjAyMDIyWhcNMTgwNjI2MjAyMDIyWjBF -MQswCQYDVQQGEwJBVTETMBEGA1UECBMKU29tZS1TdGF0ZTEhMB8GA1UEChMYSW50 -ZXJuZXQgV2lkZ2l0cyBQdHkgTHRkMIICIjANBgkqhkiG9w0BAQEFAAOCAg8AMIIC -CgKCAgEAxJaN0lWjFu+3j48c0WG8BvmPUf026Xli5d5NE4EjGsirwfre0oTeWZT9 -WRxqLGd2wDh6Mc9r6UqH6dwqLZKbsgwB5zI2lag7UWFttJF1U1c6AJynhaLMoy73 -sL9USTmQ57iYRFrVP/nGj9/L6I1XnV6midPi7a5aZreH9q8dWaAhmc9eFDU+Y4vS -sTFS6aomajLrI6YWo5toKqLq8IMryD03IM78a7gJtLgfWs+pYZRUBlM5JaYX98eX -mVPAYYH9krWxLVN3hTt1ngECzK+epo275zQJh960/2fNCfVJSXqSXcficLs+bR7t -FEkNuOP1hFV6LuoLL+k5Su+hp5kXMYZTvYYDpW4nPJoBdSG1w5O5IxO6zh+9VLB7 -oLrlgoyWvBoou5coCBpZVU6UyWcOx58kuZF8wNr0GgdvWAFwOGVuVG5jmcVdhaKC -0C8NxHrxlhcrcp0zwtDaOxfmZfcxiXs35iwUip5vS18Nv+XBK8ad9T79Ox8nSzP3 -RGPVDpExz7gPbZglqSe47XBIk0ZuIzgOgYpJj4JrpoewoIYb+OmUgI7UZjoGsMrV -+B2BqOKs7kF0HW3i5bR9YAi0ZYvnhQgjBtwCKm4zvLqwuPZHz9VWgIk6uezgStCP -WyzQ8IcopK49fOjcKa6JT5JRU+27paIZf1BkQsTkJy/Nti4TvwMCAwEAAaOBpzCB -pDAdBgNVHQ4EFgQUEgXSd3Yl3xAzbkWa7xeNe27d99cwdQYDVR0jBG4wbIAUEgXS -d3Yl3xAzbkWa7xeNe27d99ehSaRHMEUxCzAJBgNVBAYTAkFVMRMwEQYDVQQIEwpT -b21lLVN0YXRlMSEwHwYDVQQKExhJbnRlcm5ldCBXaWRnaXRzIFB0eSBMdGSCCQCB -ifA4Pg7XpjAMBgNVHRMEBTADAQH/MA0GCSqGSIb3DQEBBQUAA4ICAQA6Vl9Ve4Qe -ewzXAxr0BabFRhtIuF7DV+/niT46qJhW2KgYe6rwZqdAhEbgH3kTPJ5JmmcUnAEH -nmrfoku/YAb5ObfdHUACsHy4cvSvFwBUQ9vXP6+oOFJhrGW4uzRI2pHGvnqB3lQ0 -JEPmPwduBCI5reRYauPbd4Wl4VhLGrjELb4JQZL24Q5ehXMnv415m7+aMkLzT2IA -p6B2xgRR+JAeUdyCNOV1f5AqJWyAUJPWGR0e1OTKNfc49+2skK0NmzrpGsoktSHa -uN6vGBCVGiZh7BTYblWMG5q9Am7idcdmC2fdpIf5yj7CKzV7WIPxPs0I7TuRcr41 -pUBLCAElcyCPB89lySol2BDs4gk4wZs4y2shUs3o0+mIpw/6o8tQF/9IL8ALkLqr -q9SuND7O1RXcg74o3HeVmRKtoI/KdgaVhJ0rFvcq83ftfu3KMyWB6SOKOu6ZYON8 -AcSjsDDpnDrwGFvjAYHiTkS9NaaJC1/g7Y6jjhxmbTkXPA6V8MvLKQiOvqk/9gCh -85FHsFkElIYnH6fbHIRxg20cnqmddTd+H5HgBIlhiKWuydtuoQFwzR/D3ypgLBaB -OWLcBP7I+RYhKlJFIWnfiyB0xbyI4W/UfL8p8jQI8TE9oIlm3WqxJXfebDEDEstj -8nS4Fb3G5Wr4pZMjfbtmBSAgHeWH6B90jg== ------END CERTIFICATE----- diff --git a/src/leap/crypto/tests/test_provider.json b/src/leap/crypto/tests/test_provider.json deleted file mode 100644 index c37bef8f..00000000 --- a/src/leap/crypto/tests/test_provider.json +++ /dev/null @@ -1,15 +0,0 @@ -{ - "api_uri": "https://localhost:8443", - "api_version": "1", - "ca_cert_fingerprint": "SHA256: 0f17c033115f6b76ff67871872303ff65034efe7dd1b910062ca323eb4da5c7e", - "ca_cert_uri": "https://bitmask.net/ca.crt", - "default_language": "en", - "domain": "example.com", - "enrollment_policy": "open", - "name": { - "en": "Bitmask" - }, - "services": [ - "openvpn" - ] -} diff --git a/src/leap/crypto/tests/test_srpauth.py b/src/leap/crypto/tests/test_srpauth.py deleted file mode 100644 index e3258fd3..00000000 --- a/src/leap/crypto/tests/test_srpauth.py +++ /dev/null @@ -1,790 +0,0 @@ -# -*- coding: utf-8 -*- -# test_srpauth.py -# Copyright (C) 2013 LEAP -# -# This program is free software: you can redistribute it and/or modify -# it under the terms of the GNU General Public License as published by -# the Free Software Foundation, either version 3 of the License, or -# (at your option) any later version. -# -# This program is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -# GNU General Public License for more details. -# -# You should have received a copy of the GNU General Public License -# along with this program. If not, see . -""" -Tests for: - * leap/crypto/srpauth.py -""" -try: - import unittest2 as unittest -except ImportError: - import unittest -import os -import sys -import binascii -import requests -import mock - -from mock import MagicMock -from nose.twistedtools import reactor, deferred -from twisted.python import log -from twisted.internet import threads -from functools import partial -from requests.models import Response -from simplejson.decoder import JSONDecodeError - -from leap.common.testing.https_server import where -from leap.config.providerconfig import ProviderConfig -from leap.crypto import srpregister, srpauth -from leap.crypto.tests import fake_provider -from leap.util.request_helpers import get_content - -log.startLogging(sys.stdout) - - -def _get_capath(): - return where("cacert.pem") - -_here = os.path.split(__file__)[0] - - -class ImproperlyConfiguredError(Exception): - """ - Raised if the test provider is missing configuration - """ - - -class SRPAuthTestCase(unittest.TestCase): - """ - Tests for the SRPAuth class - """ - __name__ = "SRPAuth tests" - - def setUp(self): - """ - Sets up this TestCase with a simple and faked provider instance: - - * runs a threaded reactor - * loads a mocked ProviderConfig that points to the certs in the - leap.common.testing module. - """ - factory = fake_provider.get_provider_factory() - http = reactor.listenTCP(0, factory) - https = reactor.listenSSL( - 0, factory, - fake_provider.OpenSSLServerContextFactory()) - get_port = lambda p: p.getHost().port - self.http_port = get_port(http) - self.https_port = get_port(https) - - provider = ProviderConfig() - provider.get_ca_cert_path = mock.create_autospec( - provider.get_ca_cert_path) - provider.get_ca_cert_path.return_value = _get_capath() - - provider.get_api_uri = mock.create_autospec( - provider.get_api_uri) - provider.get_api_uri.return_value = self._get_https_uri() - - loaded = provider.load(path=os.path.join( - _here, "test_provider.json")) - if not loaded: - raise ImproperlyConfiguredError( - "Could not load test provider config") - self.register = srpregister.SRPRegister(provider_config=provider) - self.provider = provider - self.TEST_USER = "register_test_auth" - self.TEST_PASS = "pass" - - # Reset the singleton - srpauth.SRPAuth._SRPAuth__instance = None - self.auth = srpauth.SRPAuth(self.provider) - self.auth_backend = self.auth._SRPAuth__instance - - self.old_post = self.auth_backend._session.post - self.old_put = self.auth_backend._session.put - self.old_delete = self.auth_backend._session.delete - - self.old_start_auth = self.auth_backend._start_authentication - self.old_proc_challenge = self.auth_backend._process_challenge - self.old_extract_data = self.auth_backend._extract_data - self.old_verify_session = self.auth_backend._verify_session - self.old_auth_preproc = self.auth_backend._authentication_preprocessing - self.old_get_sid = self.auth_backend.get_session_id - self.old_cookie_get = self.auth_backend._session.cookies.get - self.old_auth = self.auth_backend.authenticate - - def tearDown(self): - self.auth_backend._session.post = self.old_post - self.auth_backend._session.put = self.old_put - self.auth_backend._session.delete = self.old_delete - - self.auth_backend._start_authentication = self.old_start_auth - self.auth_backend._process_challenge = self.old_proc_challenge - self.auth_backend._extract_data = self.old_extract_data - self.auth_backend._verify_session = self.old_verify_session - self.auth_backend._authentication_preprocessing = self.old_auth_preproc - self.auth_backend.get_session_id = self.old_get_sid - self.auth_backend._session.cookies.get = self.old_cookie_get - self.auth_backend.authenticate = self.old_auth - - # helper methods - - def _get_https_uri(self): - """ - Returns a https uri with the right https port initialized - """ - return "https://localhost:%s" % (self.https_port,) - - # Auth tests - - def _prepare_auth_test(self, code=200, side_effect=None): - """ - Creates the needed defers to test several test situations. It - adds up to the auth preprocessing step. - - :param code: status code for the response of POST in requests - :type code: int - :param side_effect: side effect triggered by the POST method - in requests - :type side_effect: some kind of Exception - - :returns: the defer that is created - :rtype: defer.Deferred - """ - res = Response() - res.status_code = code - self.auth_backend._session.post = mock.create_autospec( - self.auth_backend._session.post, - return_value=res, - side_effect=side_effect) - - d = threads.deferToThread(self.register.register_user, - self.TEST_USER, - self.TEST_PASS) - - def wrapper_preproc(*args): - return threads.deferToThread( - self.auth_backend._authentication_preprocessing, - self.TEST_USER, self.TEST_PASS) - - d.addCallback(wrapper_preproc) - - return d - - def test_safe_unhexlify(self): - input_value = "somestring" - test_value = binascii.hexlify(input_value) - self.assertEqual( - self.auth_backend._safe_unhexlify(test_value), - input_value) - - def test_safe_unhexlify_not_raises(self): - input_value = "somestring" - test_value = binascii.hexlify(input_value)[:-1] - - with self.assertRaises(TypeError): - binascii.unhexlify(test_value) - - self.auth_backend._safe_unhexlify(test_value) - - def test_preprocessing_loads_a(self): - self.assertEqual(self.auth_backend._srp_a, None) - self.auth_backend._authentication_preprocessing("user", "pass") - self.assertIsNotNone(self.auth_backend._srp_a) - self.assertTrue(len(self.auth_backend._srp_a) > 0) - - @deferred() - def test_start_authentication(self): - d = threads.deferToThread(self.register.register_user, self.TEST_USER, - self.TEST_PASS) - - def wrapper_preproc(*args): - return threads.deferToThread( - self.auth_backend._authentication_preprocessing, - self.TEST_USER, self.TEST_PASS) - - d.addCallback(wrapper_preproc) - - def wrapper(_): - return threads.deferToThread( - self.auth_backend._start_authentication, - None, self.TEST_USER) - - d.addCallback(wrapper) - return d - - @deferred() - def test_start_authentication_fails_connerror(self): - d = self._prepare_auth_test( - side_effect=requests.exceptions.ConnectionError()) - - def wrapper(_): - with self.assertRaises(srpauth.SRPAuthConnectionError): - self.auth_backend._start_authentication(None, self.TEST_USER) - - d.addCallback(partial(threads.deferToThread, wrapper)) - return d - - @deferred() - def test_start_authentication_fails_any_error(self): - d = self._prepare_auth_test(side_effect=Exception()) - - def wrapper(_): - with self.assertRaises(srpauth.SRPAuthenticationError): - self.auth_backend._start_authentication(None, self.TEST_USER) - - d.addCallback(partial(threads.deferToThread, wrapper)) - return d - - @deferred() - def test_start_authentication_fails_unknown_user(self): - d = self._prepare_auth_test(422) - - def wrapper(_): - with self.assertRaises(srpauth.SRPAuthUnknownUser): - with mock.patch('leap.util.request_helpers.get_content', - new=mock.create_autospec(get_content)) as \ - content: - content.return_value = ("{}", 0) - - self.auth_backend._start_authentication( - None, self.TEST_USER) - - d.addCallback(partial(threads.deferToThread, wrapper)) - return d - - @deferred() - def test_start_authentication_fails_errorcode(self): - d = self._prepare_auth_test(302) - - def wrapper(_): - with self.assertRaises(srpauth.SRPAuthBadStatusCode): - with mock.patch('leap.util.request_helpers.get_content', - new=mock.create_autospec(get_content)) as \ - content: - content.return_value = ("{}", 0) - - self.auth_backend._start_authentication(None, - self.TEST_USER) - - d.addCallback(partial(threads.deferToThread, wrapper)) - return d - - @deferred() - def test_start_authentication_fails_no_salt(self): - d = self._prepare_auth_test(200) - - def wrapper(_): - with self.assertRaises(srpauth.SRPAuthNoSalt): - with mock.patch('leap.util.request_helpers.get_content', - new=mock.create_autospec(get_content)) as \ - content: - content.return_value = ("{}", 0) - - self.auth_backend._start_authentication(None, - self.TEST_USER) - - d.addCallback(partial(threads.deferToThread, wrapper)) - return d - - @deferred() - def test_start_authentication_fails_no_B(self): - d = self._prepare_auth_test(200) - - def wrapper(_): - with self.assertRaises(srpauth.SRPAuthNoB): - with mock.patch('leap.util.request_helpers.get_content', - new=mock.create_autospec(get_content)) as \ - content: - content.return_value = ('{"salt": ""}', 0) - - self.auth_backend._start_authentication(None, - self.TEST_USER) - - d.addCallback(partial(threads.deferToThread, wrapper)) - return d - - @deferred() - def test_start_authentication_correct_saltb(self): - d = self._prepare_auth_test(200) - - test_salt = "12345" - test_B = "67890" - - def wrapper(_): - with mock.patch('leap.util.request_helpers.get_content', - new=mock.create_autospec(get_content)) as \ - content: - content.return_value = ('{"salt":"%s", "B":"%s"}' % (test_salt, - test_B), - 0) - - salt, B = self.auth_backend._start_authentication( - None, - self.TEST_USER) - self.assertEqual(salt, test_salt) - self.assertEqual(B, test_B) - - d.addCallback(partial(threads.deferToThread, wrapper)) - return d - - def _prepare_auth_challenge(self): - """ - Creates the needed defers to test several test situations. It - adds up to the start authentication step. - - :returns: the defer that is created - :rtype: defer.Deferred - """ - d = threads.deferToThread(self.register.register_user, - self.TEST_USER, - self.TEST_PASS) - - def wrapper_preproc(*args): - return threads.deferToThread( - self.auth_backend._authentication_preprocessing, - self.TEST_USER, self.TEST_PASS) - - d.addCallback(wrapper_preproc) - - def wrapper_start(*args): - return threads.deferToThread( - self.auth_backend._start_authentication, - None, self.TEST_USER) - - d.addCallback(wrapper_start) - - return d - - @deferred() - def test_process_challenge_wrong_saltb(self): - d = self._prepare_auth_challenge() - - def wrapper(salt_B): - with self.assertRaises(srpauth.SRPAuthBadDataFromServer): - self.auth_backend._process_challenge("", - username=self.TEST_USER) - - d.addCallback(partial(threads.deferToThread, wrapper)) - return d - - @deferred() - def test_process_challenge_requests_problem_raises(self): - d = self._prepare_auth_challenge() - - self.auth_backend._session.put = mock.create_autospec( - self.auth_backend._session.put, - side_effect=requests.exceptions.ConnectionError()) - - def wrapper(salt_B): - with self.assertRaises(srpauth.SRPAuthConnectionError): - self.auth_backend._process_challenge(salt_B, - username=self.TEST_USER) - - d.addCallback(partial(threads.deferToThread, wrapper)) - - return d - - @deferred() - def test_process_challenge_json_decode_error(self): - d = self._prepare_auth_challenge() - - def wrapper(salt_B): - with mock.patch('leap.util.request_helpers.get_content', - new=mock.create_autospec(get_content)) as \ - content: - content.return_value = ("{", 0) - content.side_effect = JSONDecodeError("", "", 0) - - with self.assertRaises(srpauth.SRPAuthJSONDecodeError): - self.auth_backend._process_challenge( - salt_B, - username=self.TEST_USER) - - d.addCallback(partial(threads.deferToThread, wrapper)) - - return d - - @deferred() - def test_process_challenge_bad_password(self): - d = self._prepare_auth_challenge() - - res = Response() - res.status_code = 422 - self.auth_backend._session.put = mock.create_autospec( - self.auth_backend._session.put, - return_value=res) - - def wrapper(salt_B): - with mock.patch('leap.util.request_helpers.get_content', - new=mock.create_autospec(get_content)) as \ - content: - content.return_value = ("", 0) - with self.assertRaises(srpauth.SRPAuthBadPassword): - self.auth_backend._process_challenge( - salt_B, - username=self.TEST_USER) - - d.addCallback(partial(threads.deferToThread, wrapper)) - - return d - - @deferred() - def test_process_challenge_bad_password2(self): - d = self._prepare_auth_challenge() - - res = Response() - res.status_code = 422 - self.auth_backend._session.put = mock.create_autospec( - self.auth_backend._session.put, - return_value=res) - - def wrapper(salt_B): - with mock.patch('leap.util.request_helpers.get_content', - new=mock.create_autospec(get_content)) as \ - content: - content.return_value = ("[]", 0) - with self.assertRaises(srpauth.SRPAuthBadPassword): - self.auth_backend._process_challenge( - salt_B, - username=self.TEST_USER) - - d.addCallback(partial(threads.deferToThread, wrapper)) - - return d - - @deferred() - def test_process_challenge_other_error_code(self): - d = self._prepare_auth_challenge() - - res = Response() - res.status_code = 300 - self.auth_backend._session.put = mock.create_autospec( - self.auth_backend._session.put, - return_value=res) - - def wrapper(salt_B): - with mock.patch('leap.util.request_helpers.get_content', - new=mock.create_autospec(get_content)) as \ - content: - content.return_value = ("{}", 0) - with self.assertRaises(srpauth.SRPAuthBadStatusCode): - self.auth_backend._process_challenge( - salt_B, - username=self.TEST_USER) - - d.addCallback(partial(threads.deferToThread, wrapper)) - - return d - - @deferred() - def test_process_challenge(self): - d = self._prepare_auth_challenge() - - def wrapper(salt_B): - self.auth_backend._process_challenge(salt_B, - username=self.TEST_USER) - - d.addCallback(partial(threads.deferToThread, wrapper)) - - return d - - def test_extract_data_wrong_data(self): - with self.assertRaises(srpauth.SRPAuthBadDataFromServer): - self.auth_backend._extract_data(None) - - with self.assertRaises(srpauth.SRPAuthBadDataFromServer): - self.auth_backend._extract_data("") - - def test_extract_data_fails_on_wrong_data_from_server(self): - with self.assertRaises(srpauth.SRPAuthBadDataFromServer): - self.auth_backend._extract_data({}) - - with self.assertRaises(srpauth.SRPAuthBadDataFromServer): - self.auth_backend._extract_data({"M2": ""}) - - def test_extract_data_sets_uidtoken(self): - test_uid = "someuid" - test_m2 = "somem2" - test_token = "sometoken" - test_data = { - "M2": test_m2, - "id": test_uid, - "token": test_token - } - m2 = self.auth_backend._extract_data(test_data) - - self.assertEqual(m2, test_m2) - self.assertEqual(self.auth_backend.get_uid(), test_uid) - self.assertEqual(self.auth_backend.get_uid(), - self.auth.get_uid()) - self.assertEqual(self.auth_backend.get_token(), test_token) - self.assertEqual(self.auth_backend.get_token(), - self.auth.get_token()) - - def _prepare_verify_session(self): - """ - Prepares the tests for verify session with needed steps - before. It adds up to the extract_data step. - - :returns: The defer to chain to - :rtype: defer.Deferred - """ - d = self._prepare_auth_challenge() - - def wrapper_proc_challenge(salt_B): - return self.auth_backend._process_challenge( - salt_B, - username=self.TEST_USER) - - def wrapper_extract_data(data): - return self.auth_backend._extract_data(data) - - d.addCallback(partial(threads.deferToThread, wrapper_proc_challenge)) - d.addCallback(partial(threads.deferToThread, wrapper_extract_data)) - - return d - - @deferred() - def test_verify_session_unhexlifiable_m2(self): - d = self._prepare_verify_session() - - def wrapper(M2): - with self.assertRaises(srpauth.SRPAuthBadDataFromServer): - self.auth_backend._verify_session("za") # unhexlifiable value - - d.addCallback(wrapper) - - return d - - @deferred() - def test_verify_session_unverifiable_m2(self): - d = self._prepare_verify_session() - - def wrapper(M2): - with self.assertRaises(srpauth.SRPAuthVerificationFailed): - # Correctly unhelifiable value, but not for verifying the - # session - self.auth_backend._verify_session("abc12") - - d.addCallback(wrapper) - - return d - - @deferred() - def test_verify_session_fails_on_no_session_id(self): - d = self._prepare_verify_session() - - def wrapper(M2): - self.auth_backend._session.cookies.get = mock.create_autospec( - self.auth_backend._session.cookies.get, - return_value=None) - with self.assertRaises(srpauth.SRPAuthNoSessionId): - self.auth_backend._verify_session(M2) - - d.addCallback(wrapper) - - return d - - @deferred() - def test_verify_session_session_id(self): - d = self._prepare_verify_session() - - test_session_id = "12345" - - def wrapper(M2): - self.auth_backend._session.cookies.get = mock.create_autospec( - self.auth_backend._session.cookies.get, - return_value=test_session_id) - self.auth_backend._verify_session(M2) - self.assertEqual(self.auth_backend.get_session_id(), - test_session_id) - self.assertEqual(self.auth_backend.get_session_id(), - self.auth.get_session_id()) - - d.addCallback(wrapper) - - return d - - @deferred() - def test_verify_session(self): - d = self._prepare_verify_session() - - def wrapper(M2): - self.auth_backend._verify_session(M2) - - d.addCallback(wrapper) - - return d - - @deferred() - def test_authenticate(self): - self.auth_backend._authentication_preprocessing = mock.create_autospec( - self.auth_backend._authentication_preprocessing, - return_value=None) - self.auth_backend._start_authentication = mock.create_autospec( - self.auth_backend._start_authentication, - return_value=None) - self.auth_backend._process_challenge = mock.create_autospec( - self.auth_backend._process_challenge, - return_value=None) - self.auth_backend._extract_data = mock.create_autospec( - self.auth_backend._extract_data, - return_value=None) - self.auth_backend._verify_session = mock.create_autospec( - self.auth_backend._verify_session, - return_value=None) - - d = self.auth_backend.authenticate(self.TEST_USER, self.TEST_PASS) - - def check(*args): - self.auth_backend._authentication_preprocessing.\ - assert_called_once_with( - username=self.TEST_USER, - password=self.TEST_PASS - ) - self.auth_backend._start_authentication.assert_called_once_with( - None, - username=self.TEST_USER) - self.auth_backend._process_challenge.assert_called_once_with( - None, - username=self.TEST_USER) - self.auth_backend._extract_data.assert_called_once_with( - None) - self.auth_backend._verify_session.assert_called_once_with(None) - - d.addCallback(check) - - return d - - @deferred() - def test_logout_fails_if_not_logged_in(self): - - def wrapper(*args): - with self.assertRaises(AssertionError): - self.auth_backend.logout() - - d = threads.deferToThread(wrapper) - return d - - @deferred() - def test_logout_traps_delete(self): - self.auth_backend.get_session_id = mock.create_autospec( - self.auth_backend.get_session_id, - return_value="1234") - self.auth_backend._session.delete = mock.create_autospec( - self.auth_backend._session.delete, - side_effect=Exception()) - - def wrapper(*args): - self.auth_backend.logout() - - d = threads.deferToThread(wrapper) - return d - - @deferred() - def test_logout_clears(self): - self.auth_backend._session_id = "1234" - - def wrapper(*args): - old_session = self.auth_backend._session - self.auth_backend.logout() - self.assertIsNone(self.auth_backend.get_session_id()) - self.assertIsNone(self.auth_backend.get_uid()) - self.assertNotEqual(old_session, self.auth_backend._session) - - d = threads.deferToThread(wrapper) - return d - - -class SRPAuthSingletonTestCase(unittest.TestCase): - def setUp(self): - self.old_auth = srpauth.SRPAuth._SRPAuth__impl.authenticate - - def tearDown(self): - srpauth.SRPAuth._SRPAuth__impl.authenticate = self.old_auth - - def test_singleton(self): - obj1 = srpauth.SRPAuth(ProviderConfig()) - obj2 = srpauth.SRPAuth(ProviderConfig()) - self.assertEqual(obj1._SRPAuth__instance, obj2._SRPAuth__instance) - - @deferred() - def test_authenticate_notifies_gui(self): - auth = srpauth.SRPAuth(ProviderConfig()) - auth._SRPAuth__instance.authenticate = mock.create_autospec( - auth._SRPAuth__instance.authenticate, - return_value=threads.deferToThread(lambda: None)) - auth._gui_notify = mock.create_autospec( - auth._gui_notify) - - d = auth.authenticate("", "") - - def check(*args): - auth._gui_notify.assert_called_once_with(None) - - d.addCallback(check) - return d - - @deferred() - def test_authenticate_errsback(self): - auth = srpauth.SRPAuth(ProviderConfig()) - auth._SRPAuth__instance.authenticate = mock.create_autospec( - auth._SRPAuth__instance.authenticate, - return_value=threads.deferToThread(MagicMock( - side_effect=Exception()))) - auth._gui_notify = mock.create_autospec( - auth._gui_notify) - auth._errback = mock.create_autospec( - auth._errback) - - d = auth.authenticate("", "") - - def check(*args): - self.assertFalse(auth._gui_notify.called) - self.assertEqual(auth._errback.call_count, 1) - - d.addCallback(check) - return d - - @deferred() - def test_authenticate_runs_cleanly_when_raises(self): - auth = srpauth.SRPAuth(ProviderConfig()) - auth._SRPAuth__instance.authenticate = mock.create_autospec( - auth._SRPAuth__instance.authenticate, - return_value=threads.deferToThread(MagicMock( - side_effect=Exception()))) - - d = auth.authenticate("", "") - - return d - - @deferred() - def test_authenticate_runs_cleanly(self): - auth = srpauth.SRPAuth(ProviderConfig()) - auth._SRPAuth__instance.authenticate = mock.create_autospec( - auth._SRPAuth__instance.authenticate, - return_value=threads.deferToThread(MagicMock())) - - d = auth.authenticate("", "") - - return d - - def test_logout(self): - auth = srpauth.SRPAuth(ProviderConfig()) - auth._SRPAuth__instance.logout = mock.create_autospec( - auth._SRPAuth__instance.logout) - - self.assertTrue(auth.logout()) - - def test_logout_rets_false_when_raises(self): - auth = srpauth.SRPAuth(ProviderConfig()) - auth._SRPAuth__instance.logout = mock.create_autospec( - auth._SRPAuth__instance.logout, - side_effect=Exception()) - - self.assertFalse(auth.logout()) diff --git a/src/leap/crypto/tests/test_srpregister.py b/src/leap/crypto/tests/test_srpregister.py deleted file mode 100644 index 66b815f2..00000000 --- a/src/leap/crypto/tests/test_srpregister.py +++ /dev/null @@ -1,201 +0,0 @@ -# -*- coding: utf-8 -*- -# test_srpregister.py -# Copyright (C) 2013 LEAP -# -# This program is free software: you can redistribute it and/or modify -# it under the terms of the GNU General Public License as published by -# the Free Software Foundation, either version 3 of the License, or -# (at your option) any later version. -# -# This program is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -# GNU General Public License for more details. -# -# You should have received a copy of the GNU General Public License -# along with this program. If not, see . -""" -Tests for: - * leap/crypto/srpregister.py -""" -try: - import unittest2 as unittest -except ImportError: - import unittest -import os -import sys - -from mock import MagicMock -from nose.twistedtools import reactor, deferred -from twisted.python import log -from twisted.internet import threads - -from leap.common.testing.https_server import where -from leap.config.providerconfig import ProviderConfig -from leap.crypto import srpregister, srpauth -from leap.crypto.tests import fake_provider - -log.startLogging(sys.stdout) - - -def _get_capath(): - return where("cacert.pem") - -_here = os.path.split(__file__)[0] - - -class ImproperlyConfiguredError(Exception): - """ - Raised if the test provider is missing configuration - """ - - -class SRPTestCase(unittest.TestCase): - """ - Tests for the SRPRegister class - """ - __name__ = "SRPRegister tests" - - @classmethod - def setUpClass(cls): - """ - Sets up this TestCase with a simple and faked provider instance: - - * runs a threaded reactor - * loads a mocked ProviderConfig that points to the certs in the - leap.common.testing module. - """ - factory = fake_provider.get_provider_factory() - http = reactor.listenTCP(8001, factory) - https = reactor.listenSSL( - 0, factory, - fake_provider.OpenSSLServerContextFactory()) - get_port = lambda p: p.getHost().port - cls.http_port = get_port(http) - cls.https_port = get_port(https) - - provider = ProviderConfig() - provider.get_ca_cert_path = MagicMock() - provider.get_ca_cert_path.return_value = _get_capath() - - provider.get_api_uri = MagicMock() - provider.get_api_uri.return_value = cls._get_https_uri() - - loaded = provider.load(path=os.path.join( - _here, "test_provider.json")) - if not loaded: - raise ImproperlyConfiguredError( - "Could not load test provider config") - cls.register = srpregister.SRPRegister(provider_config=provider) - - cls.auth = srpauth.SRPAuth(provider) - - # helper methods - - @classmethod - def _get_https_uri(cls): - """ - Returns a https uri with the right https port initialized - """ - return "https://localhost:%s" % (cls.https_port,) - - # Register tests - - def test_none_port(self): - provider = ProviderConfig() - provider.get_api_uri = MagicMock() - provider.get_api_uri.return_value = "http://localhost/" - loaded = provider.load(path=os.path.join( - _here, "test_provider.json")) - if not loaded: - raise ImproperlyConfiguredError( - "Could not load test provider config") - - register = srpregister.SRPRegister(provider_config=provider) - self.assertEquals(register._port, "443") - - @deferred() - def test_wrong_cert(self): - provider = ProviderConfig() - loaded = provider.load(path=os.path.join( - _here, "test_provider.json")) - provider.get_ca_cert_path = MagicMock() - provider.get_ca_cert_path.return_value = os.path.join( - _here, - "wrongcert.pem") - provider.get_api_uri = MagicMock() - provider.get_api_uri.return_value = self._get_https_uri() - if not loaded: - raise ImproperlyConfiguredError( - "Could not load test provider config") - - register = srpregister.SRPRegister(provider_config=provider) - d = threads.deferToThread(register.register_user, "foouser_firsttime", - "barpass") - d.addCallback(self.assertFalse) - return d - - @deferred() - def test_register_user(self): - """ - Checks if the registration of an unused name works as expected when - it is the first time that we attempt to register that user, as well as - when we request a user that is taken. - """ - # pristine registration - d = threads.deferToThread(self.register.register_user, - "foouser_firsttime", - "barpass") - d.addCallback(self.assertTrue) - return d - - @deferred() - def test_second_register_user(self): - # second registration attempt with the same user should return errors - d = threads.deferToThread(self.register.register_user, - "foouser_second", - "barpass") - d.addCallback(self.assertTrue) - - # FIXME currently we are catching this in an upper layer, - # we could bring the error validation to the SRPRegister class - def register_wrapper(_): - return threads.deferToThread(self.register.register_user, - "foouser_second", - "barpass") - d.addCallback(register_wrapper) - d.addCallback(self.assertFalse) - return d - - @deferred() - def test_correct_http_uri(self): - """ - Checks that registration autocorrect http uris to https ones. - """ - HTTP_URI = "http://localhost:%s" % (self.https_port, ) - HTTPS_URI = "https://localhost:%s/1/users" % (self.https_port, ) - provider = ProviderConfig() - provider.get_ca_cert_path = MagicMock() - provider.get_ca_cert_path.return_value = _get_capath() - provider.get_api_uri = MagicMock() - - # we introduce a http uri in the config file... - provider.get_api_uri.return_value = HTTP_URI - loaded = provider.load(path=os.path.join( - _here, "test_provider.json")) - if not loaded: - raise ImproperlyConfiguredError( - "Could not load test provider config") - - register = srpregister.SRPRegister(provider_config=provider) - - # ... and we check that we're correctly taking the HTTPS protocol - # instead - reg_uri = register._get_registration_uri() - self.assertEquals(reg_uri, HTTPS_URI) - register._get_registration_uri = MagicMock(return_value=HTTPS_URI) - d = threads.deferToThread(register.register_user, "test_failhttp", - "barpass") - d.addCallback(self.assertTrue) - - return d diff --git a/src/leap/crypto/tests/wrongcert.pem b/src/leap/crypto/tests/wrongcert.pem deleted file mode 100644 index e6cff38a..00000000 --- a/src/leap/crypto/tests/wrongcert.pem +++ /dev/null @@ -1,33 +0,0 @@ ------BEGIN CERTIFICATE----- -MIIFtTCCA52gAwIBAgIJAIWZus5EIXNtMA0GCSqGSIb3DQEBBQUAMEUxCzAJBgNV -BAYTAkFVMRMwEQYDVQQIEwpTb21lLVN0YXRlMSEwHwYDVQQKExhJbnRlcm5ldCBX -aWRnaXRzIFB0eSBMdGQwHhcNMTMwNjI1MTc0NjExWhcNMTgwNjI1MTc0NjExWjBF -MQswCQYDVQQGEwJBVTETMBEGA1UECBMKU29tZS1TdGF0ZTEhMB8GA1UEChMYSW50 -ZXJuZXQgV2lkZ2l0cyBQdHkgTHRkMIICIjANBgkqhkiG9w0BAQEFAAOCAg8AMIIC -CgKCAgEA2ObM7ESjyuxFZYD/Y68qOPQgjgggW+cdXfBpU2p4n7clsrUeMhWdW40Y -77Phzor9VOeqs3ZpHuyLzsYVp/kFDm8tKyo2ah5fJwzL0VCSLYaZkUQQ7GNUmTCk -furaxl8cQx/fg395V7/EngsS9B3/y5iHbctbA4MnH3jaotO5EGeo6hw7/eyCotQ9 -KbBV9GJMcY94FsXBCmUB+XypKklWTLhSaS6Cu4Fo8YLW6WmcnsyEOGS2F7WVf5at -7CBWFQZHaSgIBLmc818/mDYCnYmCVMFn/6Ndx7V2NTlz+HctWrQn0dmIOnCUeCwS -wXq9PnBR1rSx/WxwyF/WpyjOFkcIo7vm72kS70pfrYsXcZD4BQqkXYj3FyKnPt3O -ibLKtCxL8/83wOtErPcYpG6LgFkgAAlHQ9MkUi5dbmjCJtpqQmlZeK1RALdDPiB3 -K1KZimrGsmcE624dJxUIOJJpuwJDy21F8kh5ZAsAtE1prWETrQYNElNFjQxM83rS -ZR1Ql2MPSB4usEZT57+KvpEzlOnAT3elgCg21XrjSFGi14hCEao4g2OEZH5GAwm5 -frf6UlSRZ/g3tLTfI8Hv1prw15W2qO+7q7SBAplTODCRk+Yb0YoA2mMM/QXBUcXs -vKEDLSSxzNIBi3T62l39RB/ml+gPKo87ZMDivex1ZhrcJc3Yu3sCAwEAAaOBpzCB -pDAdBgNVHQ4EFgQUPjE+4pun+8FreIdpoR8v6N7xKtUwdQYDVR0jBG4wbIAUPjE+ -4pun+8FreIdpoR8v6N7xKtWhSaRHMEUxCzAJBgNVBAYTAkFVMRMwEQYDVQQIEwpT -b21lLVN0YXRlMSEwHwYDVQQKExhJbnRlcm5ldCBXaWRnaXRzIFB0eSBMdGSCCQCF -mbrORCFzbTAMBgNVHRMEBTADAQH/MA0GCSqGSIb3DQEBBQUAA4ICAQCpvCPdtvXJ -muTj379TZuCJs7/l0FhA7AHa1WAlHjsXHaA7N0+3ZWAbdtXDsowal6S+ldgU/kfV -Lq7NrRq+amJWC7SYj6cvVwhrSwSvu01fe/TWuOzHrRv1uTfJ/VXLonVufMDd9opo -bhqYxMaxLdIx6t/MYmZH4Wpiq0yfZuv//M8i7BBl/qvaWbLhg0yVAKRwjFvf59h6 -6tRFCLddELOIhLDQtk8zMbioPEbfAlKdwwP8kYGtDGj6/9/YTd/oTKRdgHuwyup3 -m0L20Y6LddC+tb0WpK5EyrNbCbEqj1L4/U7r6f/FKNA3bx6nfdXbscaMfYonKAKg -1cRrRg45sErmCz0QyTnWzXyvbjR4oQRzyW3kJ1JZudZ+AwOi00J5FYa3NiLuxl1u -gIGKWSrASQWhEdpa1nlCgX7PhdaQgYjEMpQvA0GCA0OF5JDu8en1yZqsOt1hCLIN -lkz/5jKPqrclY5hV99bE3hgCHRmIPNHCZG3wbZv2yJKxJX1YLMmQwAmSh2N7YwGG -yXRvCxQs5ChPHyRairuf/5MZCZnSVb45ppTVuNUijsbflKRUgfj/XvfqQ22f+C9N -Om2dmNvAiS2TOIfuP47CF2OUa5q4plUwmr+nyXQGM0SIoHNCj+MBdFfb3oxxAtI+ -SLhbnzQv5e84Doqz3YF0XW8jyR7q8GFLNA== ------END CERTIFICATE----- -- cgit v1.2.3