diff options
Diffstat (limited to 'src/leap/crypto')
-rw-r--r-- | src/leap/crypto/certs.py | 112 | ||||
-rw-r--r-- | src/leap/crypto/certs_gnutls.py | 112 | ||||
-rw-r--r-- | src/leap/crypto/constants.py | 18 | ||||
-rw-r--r-- | src/leap/crypto/leapkeyring.py | 70 | ||||
-rw-r--r-- | src/leap/crypto/srpauth.py | 496 | ||||
-rw-r--r-- | src/leap/crypto/srpregister.py | 163 | ||||
-rw-r--r-- | src/leap/crypto/tests/__init__.py | 16 | ||||
-rw-r--r-- | src/leap/crypto/tests/eip-service.json | 43 | ||||
-rwxr-xr-x | src/leap/crypto/tests/fake_provider.py | 376 | ||||
-rw-r--r-- | src/leap/crypto/tests/openvpn.pem | 33 | ||||
-rw-r--r-- | src/leap/crypto/tests/test_certs.py | 22 | ||||
-rw-r--r-- | src/leap/crypto/tests/test_provider.json | 15 | ||||
-rw-r--r-- | src/leap/crypto/tests/test_srpregister.py | 202 | ||||
-rw-r--r-- | src/leap/crypto/tests/wrongcert.pem | 33 |
14 files changed, 1395 insertions, 316 deletions
diff --git a/src/leap/crypto/certs.py b/src/leap/crypto/certs.py deleted file mode 100644 index cbb5725a..00000000 --- a/src/leap/crypto/certs.py +++ /dev/null @@ -1,112 +0,0 @@ -import logging -import os -from StringIO import StringIO -import ssl -import time - -from dateutil.parser import parse -from OpenSSL import crypto - -from leap.util.misc import null_check - -logger = logging.getLogger(__name__) - - -class BadCertError(Exception): - """ - raised for malformed certs - """ - - -class NoCertError(Exception): - """ - raised for cert not found in given path - """ - - -def get_https_cert_from_domain(domain, port=443): - """ - @param domain: a domain name to get a certificate from. - """ - cert = ssl.get_server_certificate((domain, port)) - x509 = crypto.load_certificate(crypto.FILETYPE_PEM, cert) - return x509 - - -def get_cert_from_file(_file): - null_check(_file, "pem file") - if isinstance(_file, (str, unicode)): - if not os.path.isfile(_file): - raise NoCertError - with open(_file) as f: - cert = f.read() - else: - cert = _file.read() - x509 = crypto.load_certificate(crypto.FILETYPE_PEM, cert) - return x509 - - -def get_pkey_from_file(_file): - getkey = lambda f: crypto.load_privatekey( - crypto.FILETYPE_PEM, f.read()) - - if isinstance(_file, str): - with open(_file) as f: - key = getkey(f) - else: - key = getkey(_file) - return key - - -def can_load_cert_and_pkey(string): - """ - loads certificate and private key from - a buffer - """ - try: - f = StringIO(string) - cert = get_cert_from_file(f) - - f = StringIO(string) - key = get_pkey_from_file(f) - - null_check(cert, 'certificate') - null_check(key, 'private key') - except Exception as exc: - logger.error(type(exc), exc.message) - raise BadCertError - else: - return True - - -def get_cert_fingerprint(domain=None, port=443, filepath=None, - hash_type="SHA256", sep=":"): - """ - @param domain: a domain name to get a fingerprint from - @type domain: str - @param filepath: path to a file containing a PEM file - @type filepath: str - @param hash_type: the hash function to be used in the fingerprint. - must be one of SHA1, SHA224, SHA256, SHA384, SHA512 - @type hash_type: str - @rparam: hex_fpr, a hexadecimal representation of a bytestring - containing the fingerprint. - @rtype: string - """ - if domain: - cert = get_https_cert_from_domain(domain, port=port) - if filepath: - cert = get_cert_from_file(filepath) - hex_fpr = cert.digest(hash_type) - return hex_fpr - - -def get_time_boundaries(certfile): - cert = get_cert_from_file(certfile) - null_check(cert, 'certificate') - - fromts, tots = (cert.get_notBefore(), cert.get_notAfter()) - from_, to_ = map( - lambda ts: time.gmtime(time.mktime(parse(ts).timetuple())), - (fromts, tots)) - return from_, to_ diff --git a/src/leap/crypto/certs_gnutls.py b/src/leap/crypto/certs_gnutls.py deleted file mode 100644 index 20c0e043..00000000 --- a/src/leap/crypto/certs_gnutls.py +++ /dev/null @@ -1,112 +0,0 @@ -''' -We're using PyOpenSSL now - -import ctypes -from StringIO import StringIO -import socket - -import gnutls.connection -import gnutls.crypto -import gnutls.library - -from leap.util.misc import null_check - - -class BadCertError(Exception): - """raised for malformed certs""" - - -def get_https_cert_from_domain(domain): - """ - @param domain: a domain name to get a certificate from. - """ - sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) - cred = gnutls.connection.X509Credentials() - - session = gnutls.connection.ClientSession(sock, cred) - session.connect((domain, 443)) - session.handshake() - cert = session.peer_certificate - return cert - - -def get_cert_from_file(_file): - getcert = lambda f: gnutls.crypto.X509Certificate(f.read()) - if isinstance(_file, str): - with open(_file) as f: - cert = getcert(f) - else: - cert = getcert(_file) - return cert - - -def get_pkey_from_file(_file): - getkey = lambda f: gnutls.crypto.X509PrivateKey(f.read()) - if isinstance(_file, str): - with open(_file) as f: - key = getkey(f) - else: - key = getkey(_file) - return key - - -def can_load_cert_and_pkey(string): - try: - f = StringIO(string) - cert = get_cert_from_file(f) - - f = StringIO(string) - key = get_pkey_from_file(f) - - null_check(cert, 'certificate') - null_check(key, 'private key') - except: - # XXX catch GNUTLSError? - raise BadCertError - else: - return True - -def get_cert_fingerprint(domain=None, filepath=None, - hash_type="SHA256", sep=":"): - """ - @param domain: a domain name to get a fingerprint from - @type domain: str - @param filepath: path to a file containing a PEM file - @type filepath: str - @param hash_type: the hash function to be used in the fingerprint. - must be one of SHA1, SHA224, SHA256, SHA384, SHA512 - @type hash_type: str - @rparam: hex_fpr, a hexadecimal representation of a bytestring - containing the fingerprint. - @rtype: string - """ - if domain: - cert = get_https_cert_from_domain(domain) - if filepath: - cert = get_cert_from_file(filepath) - - _buffer = ctypes.create_string_buffer(64) - buffer_length = ctypes.c_size_t(64) - - SUPPORTED_DIGEST_FUN = ("SHA1", "SHA224", "SHA256", "SHA384", "SHA512") - if hash_type in SUPPORTED_DIGEST_FUN: - digestfunction = getattr( - gnutls.library.constants, - "GNUTLS_DIG_%s" % hash_type) - else: - # XXX improperlyconfigured or something - raise Exception("digest function not supported") - - gnutls.library.functions.gnutls_x509_crt_get_fingerprint( - cert._c_object, digestfunction, - ctypes.byref(_buffer), ctypes.byref(buffer_length)) - - # deinit - #server_cert._X509Certificate__deinit(server_cert._c_object) - # needed? is segfaulting - - fpr = ctypes.string_at(_buffer, buffer_length.value) - hex_fpr = sep.join(u"%02X" % ord(char) for char in fpr) - - return hex_fpr -''' diff --git a/src/leap/crypto/constants.py b/src/leap/crypto/constants.py new file mode 100644 index 00000000..c5eaef1f --- /dev/null +++ b/src/leap/crypto/constants.py @@ -0,0 +1,18 @@ +# -*- coding: utf-8 -*- +# constants.py +# Copyright (C) 2013 LEAP +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see <http://www.gnu.org/licenses/>. + +SIGNUP_TIMEOUT = 5 diff --git a/src/leap/crypto/leapkeyring.py b/src/leap/crypto/leapkeyring.py deleted file mode 100644 index c241d0bc..00000000 --- a/src/leap/crypto/leapkeyring.py +++ /dev/null @@ -1,70 +0,0 @@ -import keyring - -from leap.base.config import get_config_file - -############# -# Disclaimer -############# -# This currently is not a keyring, it's more like a joke. -# No, seriously. -# We're affected by this **bug** - -# https://bitbucket.org/kang/python-keyring-lib/ -# issue/65/dbusexception-method-opensession-with - -# so using the gnome keyring does not seem feasible right now. -# I thought this was the next best option to store secrets in plain sight. - -# in the future we should move to use the gnome/kde/macosx/win keyrings. - - -class LeapCryptedFileKeyring(keyring.backend.CryptedFileKeyring): - - filename = ".secrets" - - @property - def file_path(self): - return get_config_file(self.filename) - - def __init__(self, seed=None): - self.seed = seed - - def _get_new_password(self): - # XXX every time this method is called, - # $deity kills a kitten. - return "secret%s" % self.seed - - def _init_file(self): - self.keyring_key = self._get_new_password() - self.set_password('keyring_setting', 'pass_ref', 'pass_ref_value') - - def _unlock(self): - self.keyring_key = self._get_new_password() - print 'keyring key ', self.keyring_key - try: - ref_pw = self.get_password( - 'keyring_setting', - 'pass_ref') - print 'ref pw ', ref_pw - assert ref_pw == "pass_ref_value" - except AssertionError: - self._lock() - raise ValueError('Incorrect password') - - -def leap_set_password(key, value, seed="xxx"): - key, value = map(unicode, (key, value)) - keyring.set_keyring(LeapCryptedFileKeyring(seed=seed)) - keyring.set_password('leap', key, value) - - -def leap_get_password(key, seed="xxx"): - keyring.set_keyring(LeapCryptedFileKeyring(seed=seed)) - #import ipdb;ipdb.set_trace() - return keyring.get_password('leap', key) - - -if __name__ == "__main__": - leap_set_password('test', 'bar') - passwd = leap_get_password('test') - assert passwd == 'bar' diff --git a/src/leap/crypto/srpauth.py b/src/leap/crypto/srpauth.py new file mode 100644 index 00000000..0e95ae64 --- /dev/null +++ b/src/leap/crypto/srpauth.py @@ -0,0 +1,496 @@ +# -*- coding: utf-8 -*- +# srpauth.py +# Copyright (C) 2013 LEAP +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see <http://www.gnu.org/licenses/>. + +import binascii +import logging + +import requests +import srp +import json + +#this error is raised from requests +from simplejson.decoder import JSONDecodeError +from functools import partial + +from PySide import QtCore +from twisted.internet import threads + +from leap.common.check import leap_assert +from leap.util.request_helpers import get_content +from leap.common.events import signal as events_signal +from leap.common.events import events_pb2 as proto + +logger = logging.getLogger(__name__) + + +class SRPAuthenticationError(Exception): + """ + Exception raised for authentication errors + """ + pass + + +class SRPAuth(QtCore.QObject): + """ + SRPAuth singleton + """ + + class __impl(QtCore.QObject): + """ + Implementation of the SRPAuth interface + """ + + LOGIN_KEY = "login" + A_KEY = "A" + CLIENT_AUTH_KEY = "client_auth" + SESSION_ID_KEY = "_session_id" + + def __init__(self, provider_config): + """ + Constructor for SRPAuth implementation + + :param server: Server to which we will authenticate + :type server: str + """ + QtCore.QObject.__init__(self) + + leap_assert(provider_config, + "We need a provider config to authenticate") + + self._provider_config = provider_config + + # **************************************************** # + # Dependency injection helpers, override this for more + # granular testing + self._fetcher = requests + self._srp = srp + self._hashfun = self._srp.SHA256 + self._ng = self._srp.NG_1024 + # **************************************************** # + + self._session = self._fetcher.session() + self._session_id = None + self._session_id_lock = QtCore.QMutex() + self._uid = None + self._uid_lock = QtCore.QMutex() + self._token = None + self._token_lock = QtCore.QMutex() + + self._srp_user = None + self._srp_a = None + + def _safe_unhexlify(self, val): + """ + Rounds the val to a multiple of 2 and returns the + unhexlified value + + :param val: hexlified value + :type val: str + + :rtype: binary hex data + :return: unhexlified val + """ + return binascii.unhexlify(val) \ + if (len(val) % 2 == 0) else binascii.unhexlify('0' + val) + + def _authentication_preprocessing(self, username, password): + """ + Generates the SRP.User to get the A SRP parameter + + :param username: username to login + :type username: str + :param password: password for the username + :type password: str + """ + logger.debug("Authentication preprocessing...") + self._srp_user = self._srp.User(username, + password, + self._hashfun, + self._ng) + _, A = self._srp_user.start_authentication() + + self._srp_a = A + + def _start_authentication(self, _, username, password): + """ + Sends the first request for authentication to retrieve the + salt and B parameter + + Might raise SRPAuthenticationError + + :param _: IGNORED, output from the previous callback (None) + :type _: IGNORED + :param username: username to login + :type username: str + :param password: password for the username + :type password: str + + :return: salt and B parameters + :rtype: tuple + """ + logger.debug("Starting authentication process...") + try: + auth_data = { + self.LOGIN_KEY: username, + self.A_KEY: binascii.hexlify(self._srp_a) + } + sessions_url = "%s/%s/%s/" % \ + (self._provider_config.get_api_uri(), + self._provider_config.get_api_version(), + "sessions") + init_session = self._session.post(sessions_url, + data=auth_data, + verify=self._provider_config. + get_ca_cert_path()) + except requests.exceptions.ConnectionError as e: + logger.error("No connection made (salt): %r" % + (e,)) + raise SRPAuthenticationError("Could not establish a " + "connection") + except Exception as e: + logger.error("Unknown error: %r" % (e,)) + raise SRPAuthenticationError("Unknown error: %r" % + (e,)) + + content, mtime = get_content(init_session) + + if init_session.status_code not in (200,): + logger.error("No valid response (salt): " + "Status code = %r. Content: %r" % + (init_session.status_code, content)) + if init_session.status_code == 422: + raise SRPAuthenticationError(self.tr("Unknown user")) + + json_content = json.loads(content) + salt = json_content.get("salt", None) + B = json_content.get("B", None) + + if salt is None: + logger.error("No salt parameter sent") + raise SRPAuthenticationError(self.tr("The server did not send " + "the salt parameter")) + if B is None: + logger.error("No B parameter sent") + raise SRPAuthenticationError(self.tr("The server did not send " + "the B parameter")) + + return salt, B + + def _process_challenge(self, salt_B, username): + """ + Given the salt and B processes the auth challenge and + generates the M2 parameter + + Might throw SRPAuthenticationError + + :param salt_B: salt and B parameters for the username + :type salt_B: tuple + :param username: username for this session + :type username: str + + :return: the M2 SRP parameter + :rtype: str + """ + logger.debug("Processing challenge...") + try: + salt, B = salt_B + unhex_salt = self._safe_unhexlify(salt) + unhex_B = self._safe_unhexlify(B) + except TypeError as e: + logger.error("Bad data from server: %r" % (e,)) + raise SRPAuthenticationError(self.tr("The data sent from " + "the server had errors")) + M = self._srp_user.process_challenge(unhex_salt, unhex_B) + + auth_url = "%s/%s/%s/%s" % (self._provider_config.get_api_uri(), + self._provider_config. + get_api_version(), + "sessions", + username) + + auth_data = { + self.CLIENT_AUTH_KEY: binascii.hexlify(M) + } + + try: + auth_result = self._session.put(auth_url, + data=auth_data, + verify=self._provider_config. + get_ca_cert_path()) + except requests.exceptions.ConnectionError as e: + logger.error("No connection made (HAMK): %r" % (e,)) + raise SRPAuthenticationError(self.tr("Could not connect to " + "the server")) + + try: + content, mtime = get_content(auth_result) + except JSONDecodeError: + raise SRPAuthenticationError("Bad JSON content in auth result") + + if auth_result.status_code == 422: + error = "" + try: + error = json.loads(content).get("errors", "") + except ValueError: + logger.error("Problem parsing the received response: %s" + % (content,)) + except AttributeError: + logger.error("Expecting a dict but something else was " + "received: %s", (content,)) + logger.error("[%s] Wrong password (HAMK): [%s]" % + (auth_result.status_code, error)) + raise SRPAuthenticationError(self.tr("Wrong password")) + + if auth_result.status_code not in (200,): + logger.error("No valid response (HAMK): " + "Status code = %s. Content = %r" % + (auth_result.status_code, content)) + raise SRPAuthenticationError(self.tr("Unknown error (%s)") % + (auth_result.status_code,)) + + json_content = json.loads(content) + + try: + M2 = json_content.get("M2", None) + uid = json_content.get("id", None) + token = json_content.get("token", None) + except Exception as e: + logger.error(e) + raise Exception("Something went wrong with the login") + + events_signal(proto.CLIENT_UID, content=uid) + + self.set_uid(uid) + self.set_token(token) + + if M2 is None or self.get_uid() is None: + logger.error("Something went wrong. Content = %r" % + (content,)) + raise SRPAuthenticationError(self.tr("Problem getting data " + "from server")) + + return M2 + + def _verify_session(self, M2): + """ + Verifies the session based on the M2 parameter. If the + verification succeeds, it sets the session_id for this + session + + Might throw SRPAuthenticationError + + :param M2: M2 SRP parameter + :type M2: str + """ + logger.debug("Verifying session...") + try: + unhex_M2 = self._safe_unhexlify(M2) + except TypeError: + logger.error("Bad data from server (HAWK)") + raise SRPAuthenticationError(self.tr("Bad data from server")) + + self._srp_user.verify_session(unhex_M2) + + if not self._srp_user.authenticated(): + logger.error("Auth verification failed") + raise SRPAuthenticationError(self.tr("Auth verification " + "failed")) + logger.debug("Session verified.") + + session_id = self._session.cookies.get(self.SESSION_ID_KEY, None) + if not session_id: + logger.error("Bad cookie from server (missing _session_id)") + raise SRPAuthenticationError(self.tr("Session cookie " + "verification " + "failed")) + + events_signal(proto.CLIENT_SESSION_ID, content=session_id) + + self.set_session_id(session_id) + + def _threader(self, cb, res, *args, **kwargs): + return threads.deferToThread(cb, res, *args, **kwargs) + + def authenticate(self, username, password): + """ + Executes the whole authentication process for a user + + Might raise SRPAuthenticationError + + :param username: username for this session + :type username: str + :param password: password for this user + :type password: str + + :returns: A defer on a different thread + :rtype: twisted.internet.defer.Deferred + """ + leap_assert(self.get_session_id() is None, "Already logged in") + + d = threads.deferToThread(self._authentication_preprocessing, + username=username, + password=password) + + d.addCallback( + partial(self._threader, + self._start_authentication), + username=username, + password=password) + d.addCallback( + partial(self._threader, + self._process_challenge), + username=username) + d.addCallback(partial(self._threader, + self._verify_session)) + + return d + + def logout(self): + """ + Logs out the current session. + Expects a session_id to exists, might raise AssertionError + """ + logger.debug("Starting logout...") + + leap_assert(self.get_session_id(), + "Cannot logout an unexisting session") + + logout_url = "%s/%s/%s/" % (self._provider_config.get_api_uri(), + self._provider_config. + get_api_version(), + "sessions") + try: + self._session.delete(logout_url, + data=self.get_session_id(), + verify=self._provider_config. + get_ca_cert_path()) + except Exception as e: + logger.warning("Something went wrong with the logout: %r" % + (e,)) + + self.set_session_id(None) + self.set_uid(None) + # Also reset the session + self._session = self._fetcher.session() + logger.debug("Successfully logged out.") + + def set_session_id(self, session_id): + QtCore.QMutexLocker(self._session_id_lock) + self._session_id = session_id + + def get_session_id(self): + QtCore.QMutexLocker(self._session_id_lock) + return self._session_id + + def set_uid(self, uid): + QtCore.QMutexLocker(self._uid_lock) + self._uid = uid + + def get_uid(self): + QtCore.QMutexLocker(self._uid_lock) + return self._uid + + def set_token(self, token): + QtCore.QMutexLocker(self._token_lock) + self._token = token + + def get_token(self): + QtCore.QMutexLocker(self._token_lock) + return self._token + + __instance = None + + authentication_finished = QtCore.Signal(bool, str) + logout_finished = QtCore.Signal(bool, str) + + def __init__(self, provider_config): + """ + Creates a singleton instance if needed + """ + QtCore.QObject.__init__(self) + + # Check whether we already have an instance + if SRPAuth.__instance is None: + # Create and remember instance + SRPAuth.__instance = SRPAuth.__impl(provider_config) + + # Store instance reference as the only member in the handle + self.__dict__['_SRPAuth__instance'] = SRPAuth.__instance + + self._username = None + self._password = None + + def authenticate(self, username, password): + """ + Executes the whole authentication process for a user + + Might raise SRPAuthenticationError + + :param username: username for this session + :type username: str + :param password: password for this user + :type password: str + """ + + d = self.__instance.authenticate(username, password) + d.addCallback(self._gui_notify) + d.addErrback(self._errback) + return d + + def _gui_notify(self, _): + """ + Callback that notifies the UI with the proper signal. + + :param _: IGNORED, output from the previous callback (None) + :type _: IGNORED + """ + logger.debug("Successful login!") + self.authentication_finished.emit(True, self.tr("Succeeded")) + + def _errback(self, failure): + """ + General errback for the whole login process. Will notify the + UI with the proper signal. + + :param failure: Failure object captured from a callback. + :type failure: twisted.python.failure.Failure + """ + logger.error("Error logging in %s" % (failure,)) + self.authentication_finished.emit(False, "%s" % (failure.value,)) + failure.trap(Exception) + + def get_session_id(self): + return self.__instance.get_session_id() + + def get_uid(self): + return self.__instance.get_uid() + + def get_token(self): + return self.__instance.get_token() + + def logout(self): + """ + Logs out the current session. + Expects a session_id to exists, might raise AssertionError + """ + try: + self.__instance.logout() + self.logout_finished.emit(True, self.tr("Succeeded")) + return True + except Exception as e: + self.logout_finished.emit(False, "%s" % (e,)) + return False diff --git a/src/leap/crypto/srpregister.py b/src/leap/crypto/srpregister.py new file mode 100644 index 00000000..07b3c917 --- /dev/null +++ b/src/leap/crypto/srpregister.py @@ -0,0 +1,163 @@ +# -*- coding: utf-8 -*- +# srpregister.py +# Copyright (C) 2013 LEAP +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see <http://www.gnu.org/licenses/>. + +import binascii +import logging + +import requests +import srp + +from PySide import QtCore +from urlparse import urlparse + +from leap.config.providerconfig import ProviderConfig +from leap.crypto.constants import SIGNUP_TIMEOUT +from leap.common.check import leap_assert, leap_assert_type + +logger = logging.getLogger(__name__) + + +class SRPRegister(QtCore.QObject): + """ + Registers a user to a specific provider using SRP + """ + + USER_LOGIN_KEY = 'user[login]' + USER_VERIFIER_KEY = 'user[password_verifier]' + USER_SALT_KEY = 'user[password_salt]' + + registration_finished = QtCore.Signal(bool, object) + + def __init__(self, + provider_config=None, + register_path="users"): + """ + Constructor + + :param provider_config: provider configuration instance, + properly loaded + :type privider_config: ProviderConfig + :param register_path: webapp path for registering users + :type register_path; str + """ + QtCore.QObject.__init__(self) + leap_assert(provider_config, "Please provide a provider") + leap_assert_type(provider_config, ProviderConfig) + + self._provider_config = provider_config + + # **************************************************** # + # Dependency injection helpers, override this for more + # granular testing + self._fetcher = requests + self._srp = srp + self._hashfun = self._srp.SHA256 + self._ng = self._srp.NG_1024 + # **************************************************** # + + parsed_url = urlparse(provider_config.get_api_uri()) + self._provider = parsed_url.hostname + self._port = parsed_url.port + if self._port is None: + self._port = "443" + + self._register_path = register_path + + self._session = self._fetcher.session() + + def _get_registration_uri(self): + """ + Returns the URI where the register request should be made for + the provider + + :rtype: str + """ + + uri = "https://%s:%s/%s/%s" % ( + self._provider, + self._port, + self._provider_config.get_api_version(), + self._register_path) + + return uri + + def register_user(self, username, password): + """ + Registers a user with the validator based on the password provider + + :param username: username to register + :type username: str + :param password: password for this username + :type password: str + + :rtype: tuple + :rparam: (ok, request) + """ + salt, verifier = self._srp.create_salted_verification_key( + username, + password, + self._hashfun, + self._ng) + + user_data = { + self.USER_LOGIN_KEY: username, + self.USER_VERIFIER_KEY: binascii.hexlify(verifier), + self.USER_SALT_KEY: binascii.hexlify(salt) + } + + uri = self._get_registration_uri() + + logger.debug('Post to uri: %s' % uri) + logger.debug("Will try to register user = %s" % (username,)) + logger.debug("user_data => %r" % (user_data,)) + + ok = None + try: + req = self._session.post(uri, + data=user_data, + timeout=SIGNUP_TIMEOUT, + verify=self._provider_config. + get_ca_cert_path()) + + except requests.exceptions.SSLError as exc: + logger.error("SSLError: %s" % exc.message) + req = None + ok = False + else: + ok = req.ok + self.registration_finished.emit(ok, req) + return ok + + +if __name__ == "__main__": + logger = logging.getLogger(name='leap') + logger.setLevel(logging.DEBUG) + console = logging.StreamHandler() + console.setLevel(logging.DEBUG) + formatter = logging.Formatter( + '%(asctime)s ' + '- %(name)s - %(levelname)s - %(message)s') + console.setFormatter(formatter) + logger.addHandler(console) + + provider = ProviderConfig() + + if provider.load("leap/providers/bitmask.net/provider.json"): + register = SRPRegister(provider_config=provider) + print "Registering user..." + print register.register_user("test1", "sarasaaaa") + print register.register_user("test2", "sarasaaaa") diff --git a/src/leap/crypto/tests/__init__.py b/src/leap/crypto/tests/__init__.py index e69de29b..7f118735 100644 --- a/src/leap/crypto/tests/__init__.py +++ b/src/leap/crypto/tests/__init__.py @@ -0,0 +1,16 @@ +# -*- coding: utf-8 -*- +# __init__.py +# Copyright (C) 2013 LEAP +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see <http://www.gnu.org/licenses/>. diff --git a/src/leap/crypto/tests/eip-service.json b/src/leap/crypto/tests/eip-service.json new file mode 100644 index 00000000..24df42a2 --- /dev/null +++ b/src/leap/crypto/tests/eip-service.json @@ -0,0 +1,43 @@ +{ + "gateways": [ + { + "capabilities": { + "adblock": false, + "filter_dns": false, + "limited": true, + "ports": [ + "1194", + "443", + "53", + "80" + ], + "protocols": [ + "tcp", + "udp" + ], + "transport": [ + "openvpn" + ], + "user_ips": false + }, + "host": "harrier.cdev.bitmask.net", + "ip_address": "199.254.238.50", + "location": "seattle__wa" + } + ], + "locations": { + "seattle__wa": { + "country_code": "US", + "hemisphere": "N", + "name": "Seattle, WA", + "timezone": "-7" + } + }, + "openvpn_configuration": { + "auth": "SHA1", + "cipher": "AES-128-CBC", + "tls-cipher": "DHE-RSA-AES128-SHA" + }, + "serial": 1, + "version": 1 +}
\ No newline at end of file diff --git a/src/leap/crypto/tests/fake_provider.py b/src/leap/crypto/tests/fake_provider.py new file mode 100755 index 00000000..54af485d --- /dev/null +++ b/src/leap/crypto/tests/fake_provider.py @@ -0,0 +1,376 @@ +#!/usr/bin/env python +# -*- coding: utf-8 -*- +# fake_provider.py +# Copyright (C) 2013 LEAP +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see <http://www.gnu.org/licenses/>. +"""A server faking some of the provider resources and apis, +used for testing Leap Client requests + +It needs that you create a subfolder named 'certs', +and that you place the following files: + +XXX check if in use + +[ ] test-openvpn.pem +[ ] test-provider.json +[ ] test-eip-service.json +""" +import binascii +import json +import os +import sys +import time + +import srp + +from OpenSSL import SSL + +from zope.interface import Interface, Attribute, implements + +from twisted.web.server import Site, Request +from twisted.web.static import File, Data +from twisted.web.resource import Resource +from twisted.internet import reactor + +from leap.common.testing.https_server import where + +# See +# http://twistedmatrix.com/documents/current/web/howto/web-in-60/index.html +# for more examples + +""" +Testing the FAKE_API: +##################### + + 1) register an user + >> curl -d "user[login]=me" -d "user[password_salt]=foo" \ + -d "user[password_verifier]=beef" http://localhost:8000/1/users + << {"errors": null} + + 2) check that if you try to register again, it will fail: + >> curl -d "user[login]=me" -d "user[password_salt]=foo" \ + -d "user[password_verifier]=beef" http://localhost:8000/1/users + << {"errors": {"login": "already taken!"}} + +""" + +# Globals to mock user/sessiondb + +_USERDB = {} +_SESSIONDB = {} + +_here = os.path.split(__file__)[0] + + +safe_unhexlify = lambda x: binascii.unhexlify(x) \ + if (len(x) % 2 == 0) else binascii.unhexlify('0' + x) + + +class IUser(Interface): + """ + Defines the User Interface + """ + login = Attribute("User login.") + salt = Attribute("Password salt.") + verifier = Attribute("Password verifier.") + session = Attribute("Session.") + svr = Attribute("Server verifier.") + + +class User(object): + """ + User object. + We store it in our simple session mocks + """ + + implements(IUser) + + def __init__(self, login, salt, verifier): + self.login = login + self.salt = salt + self.verifier = verifier + self.session = None + self.svr = None + + def set_server_verifier(self, svr): + """ + Adds a svr verifier object to this + User instance + """ + self.svr = svr + + def set_session(self, session): + """ + Adds this instance of User to the + global session dict + """ + _SESSIONDB[session] = self + self.session = session + + +class FakeUsers(Resource): + """ + Resource that handles user registration. + """ + + def __init__(self, name): + self.name = name + + def render_POST(self, request): + """ + Handles POST to the users api resource + Simulates a login. + """ + args = request.args + + login = args['user[login]'][0] + salt = args['user[password_salt]'][0] + verifier = args['user[password_verifier]'][0] + + if login in _USERDB: + request.setResponseCode(422) + return "%s\n" % json.dumps( + {'errors': {'login': 'already taken!'}}) + + print '[server]', login, verifier, salt + user = User(login, salt, verifier) + _USERDB[login] = user + return json.dumps({'errors': None}) + + +def getSession(self, sessionInterface=None): + """ + we overwrite twisted.web.server.Request.getSession method to + put the right cookie name in place + """ + if not self.session: + #cookiename = b"_".join([b'TWISTED_SESSION'] + self.sitepath) + cookiename = b"_".join([b'_session_id'] + self.sitepath) + sessionCookie = self.getCookie(cookiename) + if sessionCookie: + try: + self.session = self.site.getSession(sessionCookie) + except KeyError: + pass + # if it still hasn't been set, fix it up. + if not self.session: + self.session = self.site.makeSession() + self.addCookie(cookiename, self.session.uid, path=b'/') + self.session.touch() + if sessionInterface: + return self.session.getComponent(sessionInterface) + return self.session + + +def get_user(request): + """ + Returns user from the session dict + """ + login = request.args.get('login') + if login: + user = _USERDB.get(login[0], None) + if user: + return user + + request.getSession = getSession.__get__(request, Request) + session = request.getSession() + + user = _SESSIONDB.get(session, None) + return user + + +class FakeSession(Resource): + def __init__(self, name): + """ + Initializes session + """ + self.name = name + + def render_GET(self, request): + """ + Handles GET requests. + """ + return "%s\n" % json.dumps({'errors': None}) + + def render_POST(self, request): + """ + Handles POST requests. + """ + user = get_user(request) + + if not user: + # XXX get real error from demo provider + return json.dumps({'errors': 'no such user'}) + + A = request.args['A'][0] + + _A = safe_unhexlify(A) + _salt = safe_unhexlify(user.salt) + _verifier = safe_unhexlify(user.verifier) + + svr = srp.Verifier( + user.login, + _salt, + _verifier, + _A, + hash_alg=srp.SHA256, + ng_type=srp.NG_1024) + + s, B = svr.get_challenge() + + _B = binascii.hexlify(B) + + print '[server] login = %s' % user.login + print '[server] salt = %s' % user.salt + print '[server] len(_salt) = %s' % len(_salt) + print '[server] vkey = %s' % user.verifier + print '[server] len(vkey) = %s' % len(_verifier) + print '[server] s = %s' % binascii.hexlify(s) + print '[server] B = %s' % _B + print '[server] len(B) = %s' % len(_B) + + # override Request.getSession + request.getSession = getSession.__get__(request, Request) + session = request.getSession() + + user.set_session(session) + user.set_server_verifier(svr) + + # yep, this is tricky. + # some things are *already* unhexlified. + data = { + 'salt': user.salt, + 'B': _B, + 'errors': None} + + return json.dumps(data) + + def render_PUT(self, request): + """ + Handles PUT requests. + """ + # XXX check session??? + user = get_user(request) + + if not user: + print '[server] NO USER' + return json.dumps({'errors': 'no such user'}) + + data = request.content.read() + auth = data.split("client_auth=") + M = auth[1] if len(auth) > 1 else None + # if not H, return + if not M: + return json.dumps({'errors': 'no M proof passed by client'}) + + svr = user.svr + HAMK = svr.verify_session(binascii.unhexlify(M)) + if HAMK is None: + print '[server] verification failed!!!' + raise Exception("Authentication failed!") + #import ipdb;ipdb.set_trace() + + assert svr.authenticated() + print "***" + print '[server] User successfully authenticated using SRP!' + print "***" + + return json.dumps( + {'M2': binascii.hexlify(HAMK), + 'id': '9c943eb9d96a6ff1b7a7030bdeadbeef', + 'errors': None}) + + +class API_Sessions(Resource): + """ + Top resource for the API v1 + """ + def getChild(self, name, request): + return FakeSession(name) + + +class FileModified(File): + def render_GET(self, request): + since = request.getHeader('if-modified-since') + if since: + tsince = time.strptime(since.replace(" GMT", "")) + tfrom = time.strptime(time.ctime(os.path.getmtime(self.path))) + if tfrom > tsince: + return File.render_GET(self, request) + else: + request.setResponseCode(304) + return "" + return File.render_GET(self, request) + + +class OpenSSLServerContextFactory(object): + + def getContext(self): + """ + Create an SSL context. + """ + ctx = SSL.Context(SSL.SSLv23_METHOD) + #ctx = SSL.Context(SSL.TLSv1_METHOD) + ctx.use_certificate_file(where('leaptestscert.pem')) + ctx.use_privatekey_file(where('leaptestskey.pem')) + + return ctx + + +def get_provider_factory(): + """ + Instantiates a Site that serves the resources + that we expect from a valid provider. + Listens on: + * port 8000 for http connections + * port 8443 for https connections + + :rparam: factory for a site + :rtype: Site instance + """ + root = Data("", "") + root.putChild("", root) + root.putChild("provider.json", FileModified( + os.path.join(_here, + "test_provider.json"))) + config = Resource() + config.putChild( + "eip-service.json", + FileModified( + os.path.join(_here, "eip-service.json"))) + apiv1 = Resource() + apiv1.putChild("config", config) + apiv1.putChild("sessions", API_Sessions()) + apiv1.putChild("users", FakeUsers(None)) + apiv1.putChild("cert", FileModified( + os.path.join(_here, + 'openvpn.pem'))) + root.putChild("1", apiv1) + + factory = Site(root) + return factory + + +if __name__ == "__main__": + + from twisted.python import log + log.startLogging(sys.stdout) + + factory = get_provider_factory() + + # regular http (for debugging with curl) + reactor.listenTCP(8000, factory) + reactor.listenSSL(8443, factory, OpenSSLServerContextFactory()) + reactor.run() diff --git a/src/leap/crypto/tests/openvpn.pem b/src/leap/crypto/tests/openvpn.pem new file mode 100644 index 00000000..a95e9370 --- /dev/null +++ b/src/leap/crypto/tests/openvpn.pem @@ -0,0 +1,33 @@ +-----BEGIN CERTIFICATE----- +MIIFtTCCA52gAwIBAgIJAIGJ8Dg+DtemMA0GCSqGSIb3DQEBBQUAMEUxCzAJBgNV +BAYTAkFVMRMwEQYDVQQIEwpTb21lLVN0YXRlMSEwHwYDVQQKExhJbnRlcm5ldCBX +aWRnaXRzIFB0eSBMdGQwHhcNMTMwNjI2MjAyMDIyWhcNMTgwNjI2MjAyMDIyWjBF +MQswCQYDVQQGEwJBVTETMBEGA1UECBMKU29tZS1TdGF0ZTEhMB8GA1UEChMYSW50 +ZXJuZXQgV2lkZ2l0cyBQdHkgTHRkMIICIjANBgkqhkiG9w0BAQEFAAOCAg8AMIIC +CgKCAgEAxJaN0lWjFu+3j48c0WG8BvmPUf026Xli5d5NE4EjGsirwfre0oTeWZT9 +WRxqLGd2wDh6Mc9r6UqH6dwqLZKbsgwB5zI2lag7UWFttJF1U1c6AJynhaLMoy73 +sL9USTmQ57iYRFrVP/nGj9/L6I1XnV6midPi7a5aZreH9q8dWaAhmc9eFDU+Y4vS +sTFS6aomajLrI6YWo5toKqLq8IMryD03IM78a7gJtLgfWs+pYZRUBlM5JaYX98eX +mVPAYYH9krWxLVN3hTt1ngECzK+epo275zQJh960/2fNCfVJSXqSXcficLs+bR7t +FEkNuOP1hFV6LuoLL+k5Su+hp5kXMYZTvYYDpW4nPJoBdSG1w5O5IxO6zh+9VLB7 +oLrlgoyWvBoou5coCBpZVU6UyWcOx58kuZF8wNr0GgdvWAFwOGVuVG5jmcVdhaKC +0C8NxHrxlhcrcp0zwtDaOxfmZfcxiXs35iwUip5vS18Nv+XBK8ad9T79Ox8nSzP3 +RGPVDpExz7gPbZglqSe47XBIk0ZuIzgOgYpJj4JrpoewoIYb+OmUgI7UZjoGsMrV ++B2BqOKs7kF0HW3i5bR9YAi0ZYvnhQgjBtwCKm4zvLqwuPZHz9VWgIk6uezgStCP +WyzQ8IcopK49fOjcKa6JT5JRU+27paIZf1BkQsTkJy/Nti4TvwMCAwEAAaOBpzCB +pDAdBgNVHQ4EFgQUEgXSd3Yl3xAzbkWa7xeNe27d99cwdQYDVR0jBG4wbIAUEgXS +d3Yl3xAzbkWa7xeNe27d99ehSaRHMEUxCzAJBgNVBAYTAkFVMRMwEQYDVQQIEwpT +b21lLVN0YXRlMSEwHwYDVQQKExhJbnRlcm5ldCBXaWRnaXRzIFB0eSBMdGSCCQCB +ifA4Pg7XpjAMBgNVHRMEBTADAQH/MA0GCSqGSIb3DQEBBQUAA4ICAQA6Vl9Ve4Qe +ewzXAxr0BabFRhtIuF7DV+/niT46qJhW2KgYe6rwZqdAhEbgH3kTPJ5JmmcUnAEH +nmrfoku/YAb5ObfdHUACsHy4cvSvFwBUQ9vXP6+oOFJhrGW4uzRI2pHGvnqB3lQ0 +JEPmPwduBCI5reRYauPbd4Wl4VhLGrjELb4JQZL24Q5ehXMnv415m7+aMkLzT2IA +p6B2xgRR+JAeUdyCNOV1f5AqJWyAUJPWGR0e1OTKNfc49+2skK0NmzrpGsoktSHa +uN6vGBCVGiZh7BTYblWMG5q9Am7idcdmC2fdpIf5yj7CKzV7WIPxPs0I7TuRcr41 +pUBLCAElcyCPB89lySol2BDs4gk4wZs4y2shUs3o0+mIpw/6o8tQF/9IL8ALkLqr +q9SuND7O1RXcg74o3HeVmRKtoI/KdgaVhJ0rFvcq83ftfu3KMyWB6SOKOu6ZYON8 +AcSjsDDpnDrwGFvjAYHiTkS9NaaJC1/g7Y6jjhxmbTkXPA6V8MvLKQiOvqk/9gCh +85FHsFkElIYnH6fbHIRxg20cnqmddTd+H5HgBIlhiKWuydtuoQFwzR/D3ypgLBaB +OWLcBP7I+RYhKlJFIWnfiyB0xbyI4W/UfL8p8jQI8TE9oIlm3WqxJXfebDEDEstj +8nS4Fb3G5Wr4pZMjfbtmBSAgHeWH6B90jg== +-----END CERTIFICATE----- diff --git a/src/leap/crypto/tests/test_certs.py b/src/leap/crypto/tests/test_certs.py deleted file mode 100644 index e476b630..00000000 --- a/src/leap/crypto/tests/test_certs.py +++ /dev/null @@ -1,22 +0,0 @@ -import unittest - -from leap.testing.https_server import where -from leap.crypto import certs - - -class CertTestCase(unittest.TestCase): - - def test_can_load_client_and_pkey(self): - with open(where('leaptestscert.pem')) as cf: - cs = cf.read() - with open(where('leaptestskey.pem')) as kf: - ks = kf.read() - certs.can_load_cert_and_pkey(cs + ks) - - with self.assertRaises(certs.BadCertError): - # screw header - certs.can_load_cert_and_pkey(cs.replace("BEGIN", "BEGINN") + ks) - - -if __name__ == "__main__": - unittest.main() diff --git a/src/leap/crypto/tests/test_provider.json b/src/leap/crypto/tests/test_provider.json new file mode 100644 index 00000000..c37bef8f --- /dev/null +++ b/src/leap/crypto/tests/test_provider.json @@ -0,0 +1,15 @@ +{ + "api_uri": "https://localhost:8443", + "api_version": "1", + "ca_cert_fingerprint": "SHA256: 0f17c033115f6b76ff67871872303ff65034efe7dd1b910062ca323eb4da5c7e", + "ca_cert_uri": "https://bitmask.net/ca.crt", + "default_language": "en", + "domain": "example.com", + "enrollment_policy": "open", + "name": { + "en": "Bitmask" + }, + "services": [ + "openvpn" + ] +} diff --git a/src/leap/crypto/tests/test_srpregister.py b/src/leap/crypto/tests/test_srpregister.py new file mode 100644 index 00000000..6d2b52e8 --- /dev/null +++ b/src/leap/crypto/tests/test_srpregister.py @@ -0,0 +1,202 @@ +# -*- coding: utf-8 -*- +# test_srpregister.py +# Copyright (C) 2013 LEAP +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see <http://www.gnu.org/licenses/>. +""" +Tests for: + * leap/crypto/srpregister.py + * leap/crypto/srpauth.py +""" +try: + import unittest2 as unittest +except ImportError: + import unittest +import os +import sys + +from mock import MagicMock +from nose.twistedtools import reactor, deferred +from twisted.python import log +from twisted.internet import threads + +from leap.common.testing.https_server import where +from leap.config.providerconfig import ProviderConfig +from leap.crypto import srpregister, srpauth +from leap.crypto.tests import fake_provider + +log.startLogging(sys.stdout) + + +def _get_capath(): + return where("cacert.pem") + +_here = os.path.split(__file__)[0] + + +class ImproperlyConfiguredError(Exception): + """ + Raised if the test provider is missing configuration + """ + + +class SRPTestCase(unittest.TestCase): + """ + Tests for the SRP Register and Auth classes + """ + __name__ = "SRPRegister and SRPAuth tests" + + @classmethod + def setUpClass(cls): + """ + Sets up this TestCase with a simple and faked provider instance: + + * runs a threaded reactor + * loads a mocked ProviderConfig that points to the certs in the + leap.common.testing module. + """ + factory = fake_provider.get_provider_factory() + http = reactor.listenTCP(8001, factory) + https = reactor.listenSSL( + 0, factory, + fake_provider.OpenSSLServerContextFactory()) + get_port = lambda p: p.getHost().port + cls.http_port = get_port(http) + cls.https_port = get_port(https) + + provider = ProviderConfig() + provider.get_ca_cert_path = MagicMock() + provider.get_ca_cert_path.return_value = _get_capath() + + provider.get_api_uri = MagicMock() + provider.get_api_uri.return_value = cls._get_https_uri() + + loaded = provider.load(path=os.path.join( + _here, "test_provider.json")) + if not loaded: + raise ImproperlyConfiguredError( + "Could not load test provider config") + cls.register = srpregister.SRPRegister(provider_config=provider) + + cls.auth = srpauth.SRPAuth(provider) + + # helper methods + + @classmethod + def _get_https_uri(cls): + """ + Returns a https uri with the right https port initialized + """ + return "https://localhost:%s" % (cls.https_port,) + + # Register tests + + def test_none_port(self): + provider = ProviderConfig() + provider.get_api_uri = MagicMock() + provider.get_api_uri.return_value = "http://localhost/" + loaded = provider.load(path=os.path.join( + _here, "test_provider.json")) + if not loaded: + raise ImproperlyConfiguredError( + "Could not load test provider config") + + register = srpregister.SRPRegister(provider_config=provider) + self.assertEquals(register._port, "443") + + @deferred() + def test_wrong_cert(self): + provider = ProviderConfig() + loaded = provider.load(path=os.path.join( + _here, "test_provider.json")) + provider.get_ca_cert_path = MagicMock() + provider.get_ca_cert_path.return_value = os.path.join( + _here, + "wrongcert.pem") + provider.get_api_uri = MagicMock() + provider.get_api_uri.return_value = self._get_https_uri() + if not loaded: + raise ImproperlyConfiguredError( + "Could not load test provider config") + + register = srpregister.SRPRegister(provider_config=provider) + d = threads.deferToThread(register.register_user, "foouser_firsttime", + "barpass") + d.addCallback(self.assertFalse) + return d + + @deferred() + def test_register_user(self): + """ + Checks if the registration of an unused name works as expected when + it is the first time that we attempt to register that user, as well as + when we request a user that is taken. + """ + # pristine registration + d = threads.deferToThread(self.register.register_user, + "foouser_firsttime", + "barpass") + d.addCallback(self.assertTrue) + return d + + @deferred() + def test_second_register_user(self): + # second registration attempt with the same user should return errors + d = threads.deferToThread(self.register.register_user, + "foouser_second", + "barpass") + d.addCallback(self.assertTrue) + + # FIXME currently we are catching this in an upper layer, + # we could bring the error validation to the SRPRegister class + def register_wrapper(_): + return threads.deferToThread(self.register.register_user, + "foouser_second", + "barpass") + d.addCallback(register_wrapper) + d.addCallback(self.assertFalse) + return d + + @deferred() + def test_correct_http_uri(self): + """ + Checks that registration autocorrect http uris to https ones. + """ + HTTP_URI = "http://localhost:%s" % (self.https_port, ) + HTTPS_URI = "https://localhost:%s/1/users" % (self.https_port, ) + provider = ProviderConfig() + provider.get_ca_cert_path = MagicMock() + provider.get_ca_cert_path.return_value = _get_capath() + provider.get_api_uri = MagicMock() + + # we introduce a http uri in the config file... + provider.get_api_uri.return_value = HTTP_URI + loaded = provider.load(path=os.path.join( + _here, "test_provider.json")) + if not loaded: + raise ImproperlyConfiguredError( + "Could not load test provider config") + + register = srpregister.SRPRegister(provider_config=provider) + + # ... and we check that we're correctly taking the HTTPS protocol + # instead + reg_uri = register._get_registration_uri() + self.assertEquals(reg_uri, HTTPS_URI) + register._get_registration_uri = MagicMock(return_value=HTTPS_URI) + d = threads.deferToThread(register.register_user, "test_failhttp", + "barpass") + d.addCallback(self.assertTrue) + + return d diff --git a/src/leap/crypto/tests/wrongcert.pem b/src/leap/crypto/tests/wrongcert.pem new file mode 100644 index 00000000..e6cff38a --- /dev/null +++ b/src/leap/crypto/tests/wrongcert.pem @@ -0,0 +1,33 @@ +-----BEGIN CERTIFICATE----- +MIIFtTCCA52gAwIBAgIJAIWZus5EIXNtMA0GCSqGSIb3DQEBBQUAMEUxCzAJBgNV +BAYTAkFVMRMwEQYDVQQIEwpTb21lLVN0YXRlMSEwHwYDVQQKExhJbnRlcm5ldCBX +aWRnaXRzIFB0eSBMdGQwHhcNMTMwNjI1MTc0NjExWhcNMTgwNjI1MTc0NjExWjBF +MQswCQYDVQQGEwJBVTETMBEGA1UECBMKU29tZS1TdGF0ZTEhMB8GA1UEChMYSW50 +ZXJuZXQgV2lkZ2l0cyBQdHkgTHRkMIICIjANBgkqhkiG9w0BAQEFAAOCAg8AMIIC +CgKCAgEA2ObM7ESjyuxFZYD/Y68qOPQgjgggW+cdXfBpU2p4n7clsrUeMhWdW40Y +77Phzor9VOeqs3ZpHuyLzsYVp/kFDm8tKyo2ah5fJwzL0VCSLYaZkUQQ7GNUmTCk +furaxl8cQx/fg395V7/EngsS9B3/y5iHbctbA4MnH3jaotO5EGeo6hw7/eyCotQ9 +KbBV9GJMcY94FsXBCmUB+XypKklWTLhSaS6Cu4Fo8YLW6WmcnsyEOGS2F7WVf5at +7CBWFQZHaSgIBLmc818/mDYCnYmCVMFn/6Ndx7V2NTlz+HctWrQn0dmIOnCUeCwS +wXq9PnBR1rSx/WxwyF/WpyjOFkcIo7vm72kS70pfrYsXcZD4BQqkXYj3FyKnPt3O +ibLKtCxL8/83wOtErPcYpG6LgFkgAAlHQ9MkUi5dbmjCJtpqQmlZeK1RALdDPiB3 +K1KZimrGsmcE624dJxUIOJJpuwJDy21F8kh5ZAsAtE1prWETrQYNElNFjQxM83rS +ZR1Ql2MPSB4usEZT57+KvpEzlOnAT3elgCg21XrjSFGi14hCEao4g2OEZH5GAwm5 +frf6UlSRZ/g3tLTfI8Hv1prw15W2qO+7q7SBAplTODCRk+Yb0YoA2mMM/QXBUcXs +vKEDLSSxzNIBi3T62l39RB/ml+gPKo87ZMDivex1ZhrcJc3Yu3sCAwEAAaOBpzCB +pDAdBgNVHQ4EFgQUPjE+4pun+8FreIdpoR8v6N7xKtUwdQYDVR0jBG4wbIAUPjE+ +4pun+8FreIdpoR8v6N7xKtWhSaRHMEUxCzAJBgNVBAYTAkFVMRMwEQYDVQQIEwpT +b21lLVN0YXRlMSEwHwYDVQQKExhJbnRlcm5ldCBXaWRnaXRzIFB0eSBMdGSCCQCF +mbrORCFzbTAMBgNVHRMEBTADAQH/MA0GCSqGSIb3DQEBBQUAA4ICAQCpvCPdtvXJ +muTj379TZuCJs7/l0FhA7AHa1WAlHjsXHaA7N0+3ZWAbdtXDsowal6S+ldgU/kfV +Lq7NrRq+amJWC7SYj6cvVwhrSwSvu01fe/TWuOzHrRv1uTfJ/VXLonVufMDd9opo +bhqYxMaxLdIx6t/MYmZH4Wpiq0yfZuv//M8i7BBl/qvaWbLhg0yVAKRwjFvf59h6 +6tRFCLddELOIhLDQtk8zMbioPEbfAlKdwwP8kYGtDGj6/9/YTd/oTKRdgHuwyup3 +m0L20Y6LddC+tb0WpK5EyrNbCbEqj1L4/U7r6f/FKNA3bx6nfdXbscaMfYonKAKg +1cRrRg45sErmCz0QyTnWzXyvbjR4oQRzyW3kJ1JZudZ+AwOi00J5FYa3NiLuxl1u +gIGKWSrASQWhEdpa1nlCgX7PhdaQgYjEMpQvA0GCA0OF5JDu8en1yZqsOt1hCLIN +lkz/5jKPqrclY5hV99bE3hgCHRmIPNHCZG3wbZv2yJKxJX1YLMmQwAmSh2N7YwGG +yXRvCxQs5ChPHyRairuf/5MZCZnSVb45ppTVuNUijsbflKRUgfj/XvfqQ22f+C9N +Om2dmNvAiS2TOIfuP47CF2OUa5q4plUwmr+nyXQGM0SIoHNCj+MBdFfb3oxxAtI+ +SLhbnzQv5e84Doqz3YF0XW8jyR7q8GFLNA== +-----END CERTIFICATE----- |