summaryrefslogtreecommitdiff
path: root/src/leap/bitmask/crypto
diff options
context:
space:
mode:
authorTomás Touceda <chiiph@leap.se>2013-08-12 16:25:50 -0300
committerTomás Touceda <chiiph@leap.se>2013-08-12 16:25:50 -0300
commit75a1b6e96b789a8d3d4b9b22bbf62e30ffe62751 (patch)
treecc39f23e95bdbff7495cc866e2f51c1c4f54bc32 /src/leap/bitmask/crypto
parent733fd79e1da439604bd45587417fe466a6af9d92 (diff)
parent3c7981e61d3b48f9a000d08056ff79e993c71ce1 (diff)
Merge remote-tracking branch 'kali/feature/create_bitmask_namespace' into develop
Diffstat (limited to 'src/leap/bitmask/crypto')
-rw-r--r--src/leap/bitmask/crypto/__init__.py0
-rw-r--r--src/leap/bitmask/crypto/srpauth.py606
-rw-r--r--src/leap/bitmask/crypto/srpregister.py168
-rw-r--r--src/leap/bitmask/crypto/tests/__init__.py16
-rw-r--r--src/leap/bitmask/crypto/tests/eip-service.json43
-rwxr-xr-xsrc/leap/bitmask/crypto/tests/fake_provider.py376
-rw-r--r--src/leap/bitmask/crypto/tests/openvpn.pem33
-rw-r--r--src/leap/bitmask/crypto/tests/test_provider.json15
-rw-r--r--src/leap/bitmask/crypto/tests/test_srpauth.py791
-rw-r--r--src/leap/bitmask/crypto/tests/test_srpregister.py201
-rw-r--r--src/leap/bitmask/crypto/tests/wrongcert.pem33
11 files changed, 2282 insertions, 0 deletions
diff --git a/src/leap/bitmask/crypto/__init__.py b/src/leap/bitmask/crypto/__init__.py
new file mode 100644
index 00000000..e69de29b
--- /dev/null
+++ b/src/leap/bitmask/crypto/__init__.py
diff --git a/src/leap/bitmask/crypto/srpauth.py b/src/leap/bitmask/crypto/srpauth.py
new file mode 100644
index 00000000..2d34bb74
--- /dev/null
+++ b/src/leap/bitmask/crypto/srpauth.py
@@ -0,0 +1,606 @@
+# -*- coding: utf-8 -*-
+# srpauth.py
+# Copyright (C) 2013 LEAP
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see <http://www.gnu.org/licenses/>.
+
+import binascii
+import logging
+
+import requests
+import srp
+import json
+
+#this error is raised from requests
+from simplejson.decoder import JSONDecodeError
+from functools import partial
+
+from PySide import QtCore
+from twisted.internet import threads
+
+from leap.bitmask.util import request_helpers as reqhelper
+from leap.bitmask.util.constants import REQUEST_TIMEOUT
+from leap.common.check import leap_assert
+from leap.common.events import signal as events_signal
+from leap.common.events import events_pb2 as proto
+
+logger = logging.getLogger(__name__)
+
+
+class SRPAuthenticationError(Exception):
+ """
+ Exception raised for authentication errors
+ """
+ pass
+
+
+class SRPAuthConnectionError(SRPAuthenticationError):
+ """
+ Exception raised when there's a connection error
+ """
+ pass
+
+
+class SRPAuthUnknownUser(SRPAuthenticationError):
+ """
+ Exception raised when trying to authenticate an unknown user
+ """
+ pass
+
+
+class SRPAuthBadStatusCode(SRPAuthenticationError):
+ """
+ Exception raised when we received an unknown bad status code
+ """
+ pass
+
+
+class SRPAuthNoSalt(SRPAuthenticationError):
+ """
+ Exception raised when we don't receive the salt param at a
+ specific point in the auth process
+ """
+ pass
+
+
+class SRPAuthNoB(SRPAuthenticationError):
+ """
+ Exception raised when we don't receive the B param at a specific
+ point in the auth process
+ """
+ pass
+
+
+class SRPAuthBadDataFromServer(SRPAuthenticationError):
+ """
+ Generic exception when we receive bad data from the server.
+ """
+ pass
+
+
+class SRPAuthJSONDecodeError(SRPAuthenticationError):
+ """
+ Exception raised when there's a problem decoding the JSON content
+ parsed as received from th e server.
+ """
+ pass
+
+
+class SRPAuthBadPassword(SRPAuthenticationError):
+ """
+ Exception raised when the user provided a bad password to auth.
+ """
+ pass
+
+
+class SRPAuthVerificationFailed(SRPAuthenticationError):
+ """
+ Exception raised when we can't verify the SRP data received from
+ the server.
+ """
+ pass
+
+
+class SRPAuthNoSessionId(SRPAuthenticationError):
+ """
+ Exception raised when we don't receive a session id from the
+ server.
+ """
+ pass
+
+
+class SRPAuth(QtCore.QObject):
+ """
+ SRPAuth singleton
+ """
+
+ class __impl(QtCore.QObject):
+ """
+ Implementation of the SRPAuth interface
+ """
+
+ LOGIN_KEY = "login"
+ A_KEY = "A"
+ CLIENT_AUTH_KEY = "client_auth"
+ SESSION_ID_KEY = "_session_id"
+
+ def __init__(self, provider_config):
+ """
+ Constructor for SRPAuth implementation
+
+ :param server: Server to which we will authenticate
+ :type server: str
+ """
+ QtCore.QObject.__init__(self)
+
+ leap_assert(provider_config,
+ "We need a provider config to authenticate")
+
+ self._provider_config = provider_config
+
+ # **************************************************** #
+ # Dependency injection helpers, override this for more
+ # granular testing
+ self._fetcher = requests
+ self._srp = srp
+ self._hashfun = self._srp.SHA256
+ self._ng = self._srp.NG_1024
+ # **************************************************** #
+
+ self._session = self._fetcher.session()
+ self._session_id = None
+ self._session_id_lock = QtCore.QMutex()
+ self._uid = None
+ self._uid_lock = QtCore.QMutex()
+ self._token = None
+ self._token_lock = QtCore.QMutex()
+
+ self._srp_user = None
+ self._srp_a = None
+
+ def _safe_unhexlify(self, val):
+ """
+ Rounds the val to a multiple of 2 and returns the
+ unhexlified value
+
+ :param val: hexlified value
+ :type val: str
+
+ :rtype: binary hex data
+ :return: unhexlified val
+ """
+ return binascii.unhexlify(val) \
+ if (len(val) % 2 == 0) else binascii.unhexlify('0' + val)
+
+ def _authentication_preprocessing(self, username, password):
+ """
+ Generates the SRP.User to get the A SRP parameter
+
+ :param username: username to login
+ :type username: str
+ :param password: password for the username
+ :type password: str
+ """
+ logger.debug("Authentication preprocessing...")
+ self._srp_user = self._srp.User(username,
+ password,
+ self._hashfun,
+ self._ng)
+ _, A = self._srp_user.start_authentication()
+
+ self._srp_a = A
+
+ def _start_authentication(self, _, username):
+ """
+ Sends the first request for authentication to retrieve the
+ salt and B parameter
+
+ Might raise all SRPAuthenticationError based:
+ SRPAuthenticationError
+ SRPAuthConnectionError
+ SRPAuthUnknownUser
+ SRPAuthBadStatusCode
+ SRPAuthNoSalt
+ SRPAuthNoB
+
+ :param _: IGNORED, output from the previous callback (None)
+ :type _: IGNORED
+ :param username: username to login
+ :type username: str
+
+ :return: salt and B parameters
+ :rtype: tuple
+ """
+ logger.debug("Starting authentication process...")
+ try:
+ auth_data = {
+ self.LOGIN_KEY: username,
+ self.A_KEY: binascii.hexlify(self._srp_a)
+ }
+ sessions_url = "%s/%s/%s/" % \
+ (self._provider_config.get_api_uri(),
+ self._provider_config.get_api_version(),
+ "sessions")
+ init_session = self._session.post(sessions_url,
+ data=auth_data,
+ verify=self._provider_config.
+ get_ca_cert_path(),
+ timeout=REQUEST_TIMEOUT)
+ # Clean up A value, we don't need it anymore
+ self._srp_a = None
+ except requests.exceptions.ConnectionError as e:
+ logger.error("No connection made (salt): %r" %
+ (e,))
+ raise SRPAuthConnectionError("Could not establish a "
+ "connection")
+ except Exception as e:
+ logger.error("Unknown error: %r" % (e,))
+ raise SRPAuthenticationError("Unknown error: %r" %
+ (e,))
+
+ content, mtime = reqhelper.get_content(init_session)
+
+ if init_session.status_code not in (200,):
+ logger.error("No valid response (salt): "
+ "Status code = %r. Content: %r" %
+ (init_session.status_code, content))
+ if init_session.status_code == 422:
+ raise SRPAuthUnknownUser(self.tr("Unknown user"))
+
+ raise SRPAuthBadStatusCode(self.tr("There was a problem with"
+ " authentication"))
+
+ json_content = json.loads(content)
+ salt = json_content.get("salt", None)
+ B = json_content.get("B", None)
+
+ if salt is None:
+ logger.error("No salt parameter sent")
+ raise SRPAuthNoSalt(self.tr("The server did not send "
+ "the salt parameter"))
+ if B is None:
+ logger.error("No B parameter sent")
+ raise SRPAuthNoB(self.tr("The server did not send "
+ "the B parameter"))
+
+ return salt, B
+
+ def _process_challenge(self, salt_B, username):
+ """
+ Given the salt and B processes the auth challenge and
+ generates the M2 parameter
+
+ Might raise SRPAuthenticationError based:
+ SRPAuthenticationError
+ SRPAuthBadDataFromServer
+ SRPAuthConnectionError
+ SRPAuthJSONDecodeError
+ SRPAuthBadPassword
+
+ :param salt_B: salt and B parameters for the username
+ :type salt_B: tuple
+ :param username: username for this session
+ :type username: str
+
+ :return: the M2 SRP parameter
+ :rtype: str
+ """
+ logger.debug("Processing challenge...")
+ try:
+ salt, B = salt_B
+ unhex_salt = self._safe_unhexlify(salt)
+ unhex_B = self._safe_unhexlify(B)
+ except (TypeError, ValueError) as e:
+ logger.error("Bad data from server: %r" % (e,))
+ raise SRPAuthBadDataFromServer(
+ self.tr("The data sent from the server had errors"))
+ M = self._srp_user.process_challenge(unhex_salt, unhex_B)
+
+ auth_url = "%s/%s/%s/%s" % (self._provider_config.get_api_uri(),
+ self._provider_config.
+ get_api_version(),
+ "sessions",
+ username)
+
+ auth_data = {
+ self.CLIENT_AUTH_KEY: binascii.hexlify(M)
+ }
+
+ try:
+ auth_result = self._session.put(auth_url,
+ data=auth_data,
+ verify=self._provider_config.
+ get_ca_cert_path(),
+ timeout=REQUEST_TIMEOUT)
+ except requests.exceptions.ConnectionError as e:
+ logger.error("No connection made (HAMK): %r" % (e,))
+ raise SRPAuthConnectionError(self.tr("Could not connect to "
+ "the server"))
+
+ try:
+ content, mtime = reqhelper.get_content(auth_result)
+ except JSONDecodeError:
+ raise SRPAuthJSONDecodeError("Bad JSON content in auth result")
+
+ if auth_result.status_code == 422:
+ error = ""
+ try:
+ error = json.loads(content).get("errors", "")
+ except ValueError:
+ logger.error("Problem parsing the received response: %s"
+ % (content,))
+ except AttributeError:
+ logger.error("Expecting a dict but something else was "
+ "received: %s", (content,))
+ logger.error("[%s] Wrong password (HAMK): [%s]" %
+ (auth_result.status_code, error))
+ raise SRPAuthBadPassword(self.tr("Wrong password"))
+
+ if auth_result.status_code not in (200,):
+ logger.error("No valid response (HAMK): "
+ "Status code = %s. Content = %r" %
+ (auth_result.status_code, content))
+ raise SRPAuthBadStatusCode(self.tr("Unknown error (%s)") %
+ (auth_result.status_code,))
+
+ return json.loads(content)
+
+ def _extract_data(self, json_content):
+ """
+ Extracts the necessary parameters from json_content (M2,
+ id, token)
+
+ Might raise SRPAuthenticationError based:
+ SRPBadDataFromServer
+
+ :param json_content: Data received from the server
+ :type json_content: dict
+ """
+ try:
+ M2 = json_content.get("M2", None)
+ uid = json_content.get("id", None)
+ token = json_content.get("token", None)
+ except Exception as e:
+ logger.error(e)
+ raise SRPAuthBadDataFromServer("Something went wrong with the "
+ "login")
+
+ self.set_uid(uid)
+ self.set_token(token)
+
+ if M2 is None or self.get_uid() is None:
+ logger.error("Something went wrong. Content = %r" %
+ (json_content,))
+ raise SRPAuthBadDataFromServer(self.tr("Problem getting data "
+ "from server"))
+
+ events_signal(
+ proto.CLIENT_UID, content=uid,
+ reqcbk=lambda req, res: None) # make the rpc call async
+
+ return M2
+
+ def _verify_session(self, M2):
+ """
+ Verifies the session based on the M2 parameter. If the
+ verification succeeds, it sets the session_id for this
+ session
+
+ Might raise SRPAuthenticationError based:
+ SRPAuthBadDataFromServer
+ SRPAuthVerificationFailed
+
+ :param M2: M2 SRP parameter
+ :type M2: str
+ """
+ logger.debug("Verifying session...")
+ try:
+ unhex_M2 = self._safe_unhexlify(M2)
+ except TypeError:
+ logger.error("Bad data from server (HAWK)")
+ raise SRPAuthBadDataFromServer(self.tr("Bad data from server"))
+
+ self._srp_user.verify_session(unhex_M2)
+
+ if not self._srp_user.authenticated():
+ logger.error("Auth verification failed")
+ raise SRPAuthVerificationFailed(self.tr("Auth verification "
+ "failed"))
+ logger.debug("Session verified.")
+
+ session_id = self._session.cookies.get(self.SESSION_ID_KEY, None)
+ if not session_id:
+ logger.error("Bad cookie from server (missing _session_id)")
+ raise SRPAuthNoSessionId(self.tr("Session cookie "
+ "verification "
+ "failed"))
+
+ events_signal(
+ proto.CLIENT_SESSION_ID, content=session_id,
+ reqcbk=lambda req, res: None) # make the rpc call async
+
+ self.set_session_id(session_id)
+
+ def _threader(self, cb, res, *args, **kwargs):
+ return threads.deferToThread(cb, res, *args, **kwargs)
+
+ def authenticate(self, username, password):
+ """
+ Executes the whole authentication process for a user
+
+ Might raise SRPAuthenticationError
+
+ :param username: username for this session
+ :type username: str
+ :param password: password for this user
+ :type password: str
+
+ :returns: A defer on a different thread
+ :rtype: twisted.internet.defer.Deferred
+ """
+ leap_assert(self.get_session_id() is None, "Already logged in")
+
+ d = threads.deferToThread(self._authentication_preprocessing,
+ username=username,
+ password=password)
+
+ d.addCallback(
+ partial(self._threader,
+ self._start_authentication),
+ username=username)
+ d.addCallback(
+ partial(self._threader,
+ self._process_challenge),
+ username=username)
+ d.addCallback(
+ partial(self._threader,
+ self._extract_data))
+ d.addCallback(partial(self._threader,
+ self._verify_session))
+
+ return d
+
+ def logout(self):
+ """
+ Logs out the current session.
+ Expects a session_id to exists, might raise AssertionError
+ """
+ logger.debug("Starting logout...")
+
+ leap_assert(self.get_session_id(),
+ "Cannot logout an unexisting session")
+
+ logout_url = "%s/%s/%s/" % (self._provider_config.get_api_uri(),
+ self._provider_config.
+ get_api_version(),
+ "sessions")
+ try:
+ self._session.delete(logout_url,
+ data=self.get_session_id(),
+ verify=self._provider_config.
+ get_ca_cert_path(),
+ timeout=REQUEST_TIMEOUT)
+ except Exception as e:
+ logger.warning("Something went wrong with the logout: %r" %
+ (e,))
+
+ self.set_session_id(None)
+ self.set_uid(None)
+ # Also reset the session
+ self._session = self._fetcher.session()
+ logger.debug("Successfully logged out.")
+
+ def set_session_id(self, session_id):
+ QtCore.QMutexLocker(self._session_id_lock)
+ self._session_id = session_id
+
+ def get_session_id(self):
+ QtCore.QMutexLocker(self._session_id_lock)
+ return self._session_id
+
+ def set_uid(self, uid):
+ QtCore.QMutexLocker(self._uid_lock)
+ self._uid = uid
+
+ def get_uid(self):
+ QtCore.QMutexLocker(self._uid_lock)
+ return self._uid
+
+ def set_token(self, token):
+ QtCore.QMutexLocker(self._token_lock)
+ self._token = token
+
+ def get_token(self):
+ QtCore.QMutexLocker(self._token_lock)
+ return self._token
+
+ __instance = None
+
+ authentication_finished = QtCore.Signal(bool, str)
+ logout_finished = QtCore.Signal(bool, str)
+
+ def __init__(self, provider_config):
+ """
+ Creates a singleton instance if needed
+ """
+ QtCore.QObject.__init__(self)
+
+ # Check whether we already have an instance
+ if SRPAuth.__instance is None:
+ # Create and remember instance
+ SRPAuth.__instance = SRPAuth.__impl(provider_config)
+
+ # Store instance reference as the only member in the handle
+ self.__dict__['_SRPAuth__instance'] = SRPAuth.__instance
+
+ def authenticate(self, username, password):
+ """
+ Executes the whole authentication process for a user
+
+ Might raise SRPAuthenticationError based
+
+ :param username: username for this session
+ :type username: str
+ :param password: password for this user
+ :type password: str
+ """
+
+ d = self.__instance.authenticate(username, password)
+ d.addCallback(self._gui_notify)
+ d.addErrback(self._errback)
+ return d
+
+ def _gui_notify(self, _):
+ """
+ Callback that notifies the UI with the proper signal.
+
+ :param _: IGNORED, output from the previous callback (None)
+ :type _: IGNORED
+ """
+ logger.debug("Successful login!")
+ self.authentication_finished.emit(True, self.tr("Succeeded"))
+
+ def _errback(self, failure):
+ """
+ General errback for the whole login process. Will notify the
+ UI with the proper signal.
+
+ :param failure: Failure object captured from a callback.
+ :type failure: twisted.python.failure.Failure
+ """
+ logger.error("Error logging in %s" % (failure,))
+ self.authentication_finished.emit(False, "%s" % (failure.value,))
+ failure.trap(Exception)
+
+ def get_session_id(self):
+ return self.__instance.get_session_id()
+
+ def get_uid(self):
+ return self.__instance.get_uid()
+
+ def get_token(self):
+ return self.__instance.get_token()
+
+ def logout(self):
+ """
+ Logs out the current session.
+ Expects a session_id to exists, might raise AssertionError
+ """
+ try:
+ self.__instance.logout()
+ self.logout_finished.emit(True, self.tr("Succeeded"))
+ return True
+ except Exception as e:
+ self.logout_finished.emit(False, "%s" % (e,))
+ return False
diff --git a/src/leap/bitmask/crypto/srpregister.py b/src/leap/bitmask/crypto/srpregister.py
new file mode 100644
index 00000000..c69294d7
--- /dev/null
+++ b/src/leap/bitmask/crypto/srpregister.py
@@ -0,0 +1,168 @@
+# -*- coding: utf-8 -*-
+# srpregister.py
+# Copyright (C) 2013 LEAP
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see <http://www.gnu.org/licenses/>.
+
+import binascii
+import logging
+
+import requests
+import srp
+
+from PySide import QtCore
+from urlparse import urlparse
+
+from leap.bitmask.config.providerconfig import ProviderConfig
+from leap.bitmask.util.constants import SIGNUP_TIMEOUT
+from leap.common.check import leap_assert, leap_assert_type
+
+logger = logging.getLogger(__name__)
+
+
+class SRPRegister(QtCore.QObject):
+ """
+ Registers a user to a specific provider using SRP
+ """
+
+ USER_LOGIN_KEY = 'user[login]'
+ USER_VERIFIER_KEY = 'user[password_verifier]'
+ USER_SALT_KEY = 'user[password_salt]'
+
+ registration_finished = QtCore.Signal(bool, object)
+
+ def __init__(self,
+ provider_config=None,
+ register_path="users"):
+ """
+ Constructor
+
+ :param provider_config: provider configuration instance,
+ properly loaded
+ :type privider_config: ProviderConfig
+ :param register_path: webapp path for registering users
+ :type register_path; str
+ """
+ QtCore.QObject.__init__(self)
+ leap_assert(provider_config, "Please provide a provider")
+ leap_assert_type(provider_config, ProviderConfig)
+
+ self._provider_config = provider_config
+
+ # **************************************************** #
+ # Dependency injection helpers, override this for more
+ # granular testing
+ self._fetcher = requests
+ self._srp = srp
+ self._hashfun = self._srp.SHA256
+ self._ng = self._srp.NG_1024
+ # **************************************************** #
+
+ parsed_url = urlparse(provider_config.get_api_uri())
+ self._provider = parsed_url.hostname
+ self._port = parsed_url.port
+ if self._port is None:
+ self._port = "443"
+
+ self._register_path = register_path
+
+ self._session = self._fetcher.session()
+
+ def _get_registration_uri(self):
+ """
+ Returns the URI where the register request should be made for
+ the provider
+
+ :rtype: str
+ """
+
+ uri = "https://%s:%s/%s/%s" % (
+ self._provider,
+ self._port,
+ self._provider_config.get_api_version(),
+ self._register_path)
+
+ return uri
+
+ def register_user(self, username, password):
+ """
+ Registers a user with the validator based on the password provider
+
+ :param username: username to register
+ :type username: str
+ :param password: password for this username
+ :type password: str
+
+ :rtype: tuple
+ :rparam: (ok, request)
+ """
+ salt, verifier = self._srp.create_salted_verification_key(
+ username,
+ password,
+ self._hashfun,
+ self._ng)
+
+ user_data = {
+ self.USER_LOGIN_KEY: username,
+ self.USER_VERIFIER_KEY: binascii.hexlify(verifier),
+ self.USER_SALT_KEY: binascii.hexlify(salt)
+ }
+
+ uri = self._get_registration_uri()
+
+ logger.debug('Post to uri: %s' % uri)
+ logger.debug("Will try to register user = %s" % (username,))
+
+ ok = False
+ # This should be None, but we don't like when PySide segfaults,
+ # so it something else.
+ # To reproduce it, just do:
+ # self.registration_finished.emit(False, None)
+ req = []
+ try:
+ req = self._session.post(uri,
+ data=user_data,
+ timeout=SIGNUP_TIMEOUT,
+ verify=self._provider_config.
+ get_ca_cert_path())
+
+ except (requests.exceptions.SSLError,
+ requests.exceptions.ConnectionError) as exc:
+ logger.error(exc.message)
+ ok = False
+ else:
+ ok = req.ok
+
+ self.registration_finished.emit(ok, req)
+ return ok
+
+
+if __name__ == "__main__":
+ logger = logging.getLogger(name='leap')
+ logger.setLevel(logging.DEBUG)
+ console = logging.StreamHandler()
+ console.setLevel(logging.DEBUG)
+ formatter = logging.Formatter(
+ '%(asctime)s '
+ '- %(name)s - %(levelname)s - %(message)s')
+ console.setFormatter(formatter)
+ logger.addHandler(console)
+
+ provider = ProviderConfig()
+
+ if provider.load("leap/providers/bitmask.net/provider.json"):
+ register = SRPRegister(provider_config=provider)
+ print "Registering user..."
+ print register.register_user("test1", "sarasaaaa")
+ print register.register_user("test2", "sarasaaaa")
diff --git a/src/leap/bitmask/crypto/tests/__init__.py b/src/leap/bitmask/crypto/tests/__init__.py
new file mode 100644
index 00000000..7f118735
--- /dev/null
+++ b/src/leap/bitmask/crypto/tests/__init__.py
@@ -0,0 +1,16 @@
+# -*- coding: utf-8 -*-
+# __init__.py
+# Copyright (C) 2013 LEAP
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see <http://www.gnu.org/licenses/>.
diff --git a/src/leap/bitmask/crypto/tests/eip-service.json b/src/leap/bitmask/crypto/tests/eip-service.json
new file mode 100644
index 00000000..24df42a2
--- /dev/null
+++ b/src/leap/bitmask/crypto/tests/eip-service.json
@@ -0,0 +1,43 @@
+{
+ "gateways": [
+ {
+ "capabilities": {
+ "adblock": false,
+ "filter_dns": false,
+ "limited": true,
+ "ports": [
+ "1194",
+ "443",
+ "53",
+ "80"
+ ],
+ "protocols": [
+ "tcp",
+ "udp"
+ ],
+ "transport": [
+ "openvpn"
+ ],
+ "user_ips": false
+ },
+ "host": "harrier.cdev.bitmask.net",
+ "ip_address": "199.254.238.50",
+ "location": "seattle__wa"
+ }
+ ],
+ "locations": {
+ "seattle__wa": {
+ "country_code": "US",
+ "hemisphere": "N",
+ "name": "Seattle, WA",
+ "timezone": "-7"
+ }
+ },
+ "openvpn_configuration": {
+ "auth": "SHA1",
+ "cipher": "AES-128-CBC",
+ "tls-cipher": "DHE-RSA-AES128-SHA"
+ },
+ "serial": 1,
+ "version": 1
+} \ No newline at end of file
diff --git a/src/leap/bitmask/crypto/tests/fake_provider.py b/src/leap/bitmask/crypto/tests/fake_provider.py
new file mode 100755
index 00000000..54af485d
--- /dev/null
+++ b/src/leap/bitmask/crypto/tests/fake_provider.py
@@ -0,0 +1,376 @@
+#!/usr/bin/env python
+# -*- coding: utf-8 -*-
+# fake_provider.py
+# Copyright (C) 2013 LEAP
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see <http://www.gnu.org/licenses/>.
+"""A server faking some of the provider resources and apis,
+used for testing Leap Client requests
+
+It needs that you create a subfolder named 'certs',
+and that you place the following files:
+
+XXX check if in use
+
+[ ] test-openvpn.pem
+[ ] test-provider.json
+[ ] test-eip-service.json
+"""
+import binascii
+import json
+import os
+import sys
+import time
+
+import srp
+
+from OpenSSL import SSL
+
+from zope.interface import Interface, Attribute, implements
+
+from twisted.web.server import Site, Request
+from twisted.web.static import File, Data
+from twisted.web.resource import Resource
+from twisted.internet import reactor
+
+from leap.common.testing.https_server import where
+
+# See
+# http://twistedmatrix.com/documents/current/web/howto/web-in-60/index.html
+# for more examples
+
+"""
+Testing the FAKE_API:
+#####################
+
+ 1) register an user
+ >> curl -d "user[login]=me" -d "user[password_salt]=foo" \
+ -d "user[password_verifier]=beef" http://localhost:8000/1/users
+ << {"errors": null}
+
+ 2) check that if you try to register again, it will fail:
+ >> curl -d "user[login]=me" -d "user[password_salt]=foo" \
+ -d "user[password_verifier]=beef" http://localhost:8000/1/users
+ << {"errors": {"login": "already taken!"}}
+
+"""
+
+# Globals to mock user/sessiondb
+
+_USERDB = {}
+_SESSIONDB = {}
+
+_here = os.path.split(__file__)[0]
+
+
+safe_unhexlify = lambda x: binascii.unhexlify(x) \
+ if (len(x) % 2 == 0) else binascii.unhexlify('0' + x)
+
+
+class IUser(Interface):
+ """
+ Defines the User Interface
+ """
+ login = Attribute("User login.")
+ salt = Attribute("Password salt.")
+ verifier = Attribute("Password verifier.")
+ session = Attribute("Session.")
+ svr = Attribute("Server verifier.")
+
+
+class User(object):
+ """
+ User object.
+ We store it in our simple session mocks
+ """
+
+ implements(IUser)
+
+ def __init__(self, login, salt, verifier):
+ self.login = login
+ self.salt = salt
+ self.verifier = verifier
+ self.session = None
+ self.svr = None
+
+ def set_server_verifier(self, svr):
+ """
+ Adds a svr verifier object to this
+ User instance
+ """
+ self.svr = svr
+
+ def set_session(self, session):
+ """
+ Adds this instance of User to the
+ global session dict
+ """
+ _SESSIONDB[session] = self
+ self.session = session
+
+
+class FakeUsers(Resource):
+ """
+ Resource that handles user registration.
+ """
+
+ def __init__(self, name):
+ self.name = name
+
+ def render_POST(self, request):
+ """
+ Handles POST to the users api resource
+ Simulates a login.
+ """
+ args = request.args
+
+ login = args['user[login]'][0]
+ salt = args['user[password_salt]'][0]
+ verifier = args['user[password_verifier]'][0]
+
+ if login in _USERDB:
+ request.setResponseCode(422)
+ return "%s\n" % json.dumps(
+ {'errors': {'login': 'already taken!'}})
+
+ print '[server]', login, verifier, salt
+ user = User(login, salt, verifier)
+ _USERDB[login] = user
+ return json.dumps({'errors': None})
+
+
+def getSession(self, sessionInterface=None):
+ """
+ we overwrite twisted.web.server.Request.getSession method to
+ put the right cookie name in place
+ """
+ if not self.session:
+ #cookiename = b"_".join([b'TWISTED_SESSION'] + self.sitepath)
+ cookiename = b"_".join([b'_session_id'] + self.sitepath)
+ sessionCookie = self.getCookie(cookiename)
+ if sessionCookie:
+ try:
+ self.session = self.site.getSession(sessionCookie)
+ except KeyError:
+ pass
+ # if it still hasn't been set, fix it up.
+ if not self.session:
+ self.session = self.site.makeSession()
+ self.addCookie(cookiename, self.session.uid, path=b'/')
+ self.session.touch()
+ if sessionInterface:
+ return self.session.getComponent(sessionInterface)
+ return self.session
+
+
+def get_user(request):
+ """
+ Returns user from the session dict
+ """
+ login = request.args.get('login')
+ if login:
+ user = _USERDB.get(login[0], None)
+ if user:
+ return user
+
+ request.getSession = getSession.__get__(request, Request)
+ session = request.getSession()
+
+ user = _SESSIONDB.get(session, None)
+ return user
+
+
+class FakeSession(Resource):
+ def __init__(self, name):
+ """
+ Initializes session
+ """
+ self.name = name
+
+ def render_GET(self, request):
+ """
+ Handles GET requests.
+ """
+ return "%s\n" % json.dumps({'errors': None})
+
+ def render_POST(self, request):
+ """
+ Handles POST requests.
+ """
+ user = get_user(request)
+
+ if not user:
+ # XXX get real error from demo provider
+ return json.dumps({'errors': 'no such user'})
+
+ A = request.args['A'][0]
+
+ _A = safe_unhexlify(A)
+ _salt = safe_unhexlify(user.salt)
+ _verifier = safe_unhexlify(user.verifier)
+
+ svr = srp.Verifier(
+ user.login,
+ _salt,
+ _verifier,
+ _A,
+ hash_alg=srp.SHA256,
+ ng_type=srp.NG_1024)
+
+ s, B = svr.get_challenge()
+
+ _B = binascii.hexlify(B)
+
+ print '[server] login = %s' % user.login
+ print '[server] salt = %s' % user.salt
+ print '[server] len(_salt) = %s' % len(_salt)
+ print '[server] vkey = %s' % user.verifier
+ print '[server] len(vkey) = %s' % len(_verifier)
+ print '[server] s = %s' % binascii.hexlify(s)
+ print '[server] B = %s' % _B
+ print '[server] len(B) = %s' % len(_B)
+
+ # override Request.getSession
+ request.getSession = getSession.__get__(request, Request)
+ session = request.getSession()
+
+ user.set_session(session)
+ user.set_server_verifier(svr)
+
+ # yep, this is tricky.
+ # some things are *already* unhexlified.
+ data = {
+ 'salt': user.salt,
+ 'B': _B,
+ 'errors': None}
+
+ return json.dumps(data)
+
+ def render_PUT(self, request):
+ """
+ Handles PUT requests.
+ """
+ # XXX check session???
+ user = get_user(request)
+
+ if not user:
+ print '[server] NO USER'
+ return json.dumps({'errors': 'no such user'})
+
+ data = request.content.read()
+ auth = data.split("client_auth=")
+ M = auth[1] if len(auth) > 1 else None
+ # if not H, return
+ if not M:
+ return json.dumps({'errors': 'no M proof passed by client'})
+
+ svr = user.svr
+ HAMK = svr.verify_session(binascii.unhexlify(M))
+ if HAMK is None:
+ print '[server] verification failed!!!'
+ raise Exception("Authentication failed!")
+ #import ipdb;ipdb.set_trace()
+
+ assert svr.authenticated()
+ print "***"
+ print '[server] User successfully authenticated using SRP!'
+ print "***"
+
+ return json.dumps(
+ {'M2': binascii.hexlify(HAMK),
+ 'id': '9c943eb9d96a6ff1b7a7030bdeadbeef',
+ 'errors': None})
+
+
+class API_Sessions(Resource):
+ """
+ Top resource for the API v1
+ """
+ def getChild(self, name, request):
+ return FakeSession(name)
+
+
+class FileModified(File):
+ def render_GET(self, request):
+ since = request.getHeader('if-modified-since')
+ if since:
+ tsince = time.strptime(since.replace(" GMT", ""))
+ tfrom = time.strptime(time.ctime(os.path.getmtime(self.path)))
+ if tfrom > tsince:
+ return File.render_GET(self, request)
+ else:
+ request.setResponseCode(304)
+ return ""
+ return File.render_GET(self, request)
+
+
+class OpenSSLServerContextFactory(object):
+
+ def getContext(self):
+ """
+ Create an SSL context.
+ """
+ ctx = SSL.Context(SSL.SSLv23_METHOD)
+ #ctx = SSL.Context(SSL.TLSv1_METHOD)
+ ctx.use_certificate_file(where('leaptestscert.pem'))
+ ctx.use_privatekey_file(where('leaptestskey.pem'))
+
+ return ctx
+
+
+def get_provider_factory():
+ """
+ Instantiates a Site that serves the resources
+ that we expect from a valid provider.
+ Listens on:
+ * port 8000 for http connections
+ * port 8443 for https connections
+
+ :rparam: factory for a site
+ :rtype: Site instance
+ """
+ root = Data("", "")
+ root.putChild("", root)
+ root.putChild("provider.json", FileModified(
+ os.path.join(_here,
+ "test_provider.json")))
+ config = Resource()
+ config.putChild(
+ "eip-service.json",
+ FileModified(
+ os.path.join(_here, "eip-service.json")))
+ apiv1 = Resource()
+ apiv1.putChild("config", config)
+ apiv1.putChild("sessions", API_Sessions())
+ apiv1.putChild("users", FakeUsers(None))
+ apiv1.putChild("cert", FileModified(
+ os.path.join(_here,
+ 'openvpn.pem')))
+ root.putChild("1", apiv1)
+
+ factory = Site(root)
+ return factory
+
+
+if __name__ == "__main__":
+
+ from twisted.python import log
+ log.startLogging(sys.stdout)
+
+ factory = get_provider_factory()
+
+ # regular http (for debugging with curl)
+ reactor.listenTCP(8000, factory)
+ reactor.listenSSL(8443, factory, OpenSSLServerContextFactory())
+ reactor.run()
diff --git a/src/leap/bitmask/crypto/tests/openvpn.pem b/src/leap/bitmask/crypto/tests/openvpn.pem
new file mode 100644
index 00000000..a95e9370
--- /dev/null
+++ b/src/leap/bitmask/crypto/tests/openvpn.pem
@@ -0,0 +1,33 @@
+-----BEGIN CERTIFICATE-----
+MIIFtTCCA52gAwIBAgIJAIGJ8Dg+DtemMA0GCSqGSIb3DQEBBQUAMEUxCzAJBgNV
+BAYTAkFVMRMwEQYDVQQIEwpTb21lLVN0YXRlMSEwHwYDVQQKExhJbnRlcm5ldCBX
+aWRnaXRzIFB0eSBMdGQwHhcNMTMwNjI2MjAyMDIyWhcNMTgwNjI2MjAyMDIyWjBF
+MQswCQYDVQQGEwJBVTETMBEGA1UECBMKU29tZS1TdGF0ZTEhMB8GA1UEChMYSW50
+ZXJuZXQgV2lkZ2l0cyBQdHkgTHRkMIICIjANBgkqhkiG9w0BAQEFAAOCAg8AMIIC
+CgKCAgEAxJaN0lWjFu+3j48c0WG8BvmPUf026Xli5d5NE4EjGsirwfre0oTeWZT9
+WRxqLGd2wDh6Mc9r6UqH6dwqLZKbsgwB5zI2lag7UWFttJF1U1c6AJynhaLMoy73
+sL9USTmQ57iYRFrVP/nGj9/L6I1XnV6midPi7a5aZreH9q8dWaAhmc9eFDU+Y4vS
+sTFS6aomajLrI6YWo5toKqLq8IMryD03IM78a7gJtLgfWs+pYZRUBlM5JaYX98eX
+mVPAYYH9krWxLVN3hTt1ngECzK+epo275zQJh960/2fNCfVJSXqSXcficLs+bR7t
+FEkNuOP1hFV6LuoLL+k5Su+hp5kXMYZTvYYDpW4nPJoBdSG1w5O5IxO6zh+9VLB7
+oLrlgoyWvBoou5coCBpZVU6UyWcOx58kuZF8wNr0GgdvWAFwOGVuVG5jmcVdhaKC
+0C8NxHrxlhcrcp0zwtDaOxfmZfcxiXs35iwUip5vS18Nv+XBK8ad9T79Ox8nSzP3
+RGPVDpExz7gPbZglqSe47XBIk0ZuIzgOgYpJj4JrpoewoIYb+OmUgI7UZjoGsMrV
++B2BqOKs7kF0HW3i5bR9YAi0ZYvnhQgjBtwCKm4zvLqwuPZHz9VWgIk6uezgStCP
+WyzQ8IcopK49fOjcKa6JT5JRU+27paIZf1BkQsTkJy/Nti4TvwMCAwEAAaOBpzCB
+pDAdBgNVHQ4EFgQUEgXSd3Yl3xAzbkWa7xeNe27d99cwdQYDVR0jBG4wbIAUEgXS
+d3Yl3xAzbkWa7xeNe27d99ehSaRHMEUxCzAJBgNVBAYTAkFVMRMwEQYDVQQIEwpT
+b21lLVN0YXRlMSEwHwYDVQQKExhJbnRlcm5ldCBXaWRnaXRzIFB0eSBMdGSCCQCB
+ifA4Pg7XpjAMBgNVHRMEBTADAQH/MA0GCSqGSIb3DQEBBQUAA4ICAQA6Vl9Ve4Qe
+ewzXAxr0BabFRhtIuF7DV+/niT46qJhW2KgYe6rwZqdAhEbgH3kTPJ5JmmcUnAEH
+nmrfoku/YAb5ObfdHUACsHy4cvSvFwBUQ9vXP6+oOFJhrGW4uzRI2pHGvnqB3lQ0
+JEPmPwduBCI5reRYauPbd4Wl4VhLGrjELb4JQZL24Q5ehXMnv415m7+aMkLzT2IA
+p6B2xgRR+JAeUdyCNOV1f5AqJWyAUJPWGR0e1OTKNfc49+2skK0NmzrpGsoktSHa
+uN6vGBCVGiZh7BTYblWMG5q9Am7idcdmC2fdpIf5yj7CKzV7WIPxPs0I7TuRcr41
+pUBLCAElcyCPB89lySol2BDs4gk4wZs4y2shUs3o0+mIpw/6o8tQF/9IL8ALkLqr
+q9SuND7O1RXcg74o3HeVmRKtoI/KdgaVhJ0rFvcq83ftfu3KMyWB6SOKOu6ZYON8
+AcSjsDDpnDrwGFvjAYHiTkS9NaaJC1/g7Y6jjhxmbTkXPA6V8MvLKQiOvqk/9gCh
+85FHsFkElIYnH6fbHIRxg20cnqmddTd+H5HgBIlhiKWuydtuoQFwzR/D3ypgLBaB
+OWLcBP7I+RYhKlJFIWnfiyB0xbyI4W/UfL8p8jQI8TE9oIlm3WqxJXfebDEDEstj
+8nS4Fb3G5Wr4pZMjfbtmBSAgHeWH6B90jg==
+-----END CERTIFICATE-----
diff --git a/src/leap/bitmask/crypto/tests/test_provider.json b/src/leap/bitmask/crypto/tests/test_provider.json
new file mode 100644
index 00000000..c37bef8f
--- /dev/null
+++ b/src/leap/bitmask/crypto/tests/test_provider.json
@@ -0,0 +1,15 @@
+{
+ "api_uri": "https://localhost:8443",
+ "api_version": "1",
+ "ca_cert_fingerprint": "SHA256: 0f17c033115f6b76ff67871872303ff65034efe7dd1b910062ca323eb4da5c7e",
+ "ca_cert_uri": "https://bitmask.net/ca.crt",
+ "default_language": "en",
+ "domain": "example.com",
+ "enrollment_policy": "open",
+ "name": {
+ "en": "Bitmask"
+ },
+ "services": [
+ "openvpn"
+ ]
+}
diff --git a/src/leap/bitmask/crypto/tests/test_srpauth.py b/src/leap/bitmask/crypto/tests/test_srpauth.py
new file mode 100644
index 00000000..043da15e
--- /dev/null
+++ b/src/leap/bitmask/crypto/tests/test_srpauth.py
@@ -0,0 +1,791 @@
+# -*- coding: utf-8 -*-
+# test_srpauth.py
+# Copyright (C) 2013 LEAP
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see <http://www.gnu.org/licenses/>.
+"""
+Tests for:
+ * leap/crypto/srpauth.py
+"""
+try:
+ import unittest2 as unittest
+except ImportError:
+ import unittest
+import os
+import sys
+import binascii
+import requests
+import mock
+
+from functools import partial
+
+from mock import MagicMock
+from nose.twistedtools import reactor, deferred
+from twisted.python import log
+from twisted.internet import threads
+from requests.models import Response
+from simplejson.decoder import JSONDecodeError
+
+from leap.bitmask.config.providerconfig import ProviderConfig
+from leap.bitmask.crypto import srpregister, srpauth
+from leap.bitmask.crypto.tests import fake_provider
+from leap.bitmask.util.request_helpers import get_content
+from leap.common.testing.https_server import where
+
+log.startLogging(sys.stdout)
+
+
+def _get_capath():
+ return where("cacert.pem")
+
+_here = os.path.split(__file__)[0]
+
+
+class ImproperlyConfiguredError(Exception):
+ """
+ Raised if the test provider is missing configuration
+ """
+
+
+class SRPAuthTestCase(unittest.TestCase):
+ """
+ Tests for the SRPAuth class
+ """
+ __name__ = "SRPAuth tests"
+
+ def setUp(self):
+ """
+ Sets up this TestCase with a simple and faked provider instance:
+
+ * runs a threaded reactor
+ * loads a mocked ProviderConfig that points to the certs in the
+ leap.common.testing module.
+ """
+ factory = fake_provider.get_provider_factory()
+ http = reactor.listenTCP(0, factory)
+ https = reactor.listenSSL(
+ 0, factory,
+ fake_provider.OpenSSLServerContextFactory())
+ get_port = lambda p: p.getHost().port
+ self.http_port = get_port(http)
+ self.https_port = get_port(https)
+
+ provider = ProviderConfig()
+ provider.get_ca_cert_path = mock.create_autospec(
+ provider.get_ca_cert_path)
+ provider.get_ca_cert_path.return_value = _get_capath()
+
+ provider.get_api_uri = mock.create_autospec(
+ provider.get_api_uri)
+ provider.get_api_uri.return_value = self._get_https_uri()
+
+ loaded = provider.load(path=os.path.join(
+ _here, "test_provider.json"))
+ if not loaded:
+ raise ImproperlyConfiguredError(
+ "Could not load test provider config")
+ self.register = srpregister.SRPRegister(provider_config=provider)
+ self.provider = provider
+ self.TEST_USER = "register_test_auth"
+ self.TEST_PASS = "pass"
+
+ # Reset the singleton
+ srpauth.SRPAuth._SRPAuth__instance = None
+ self.auth = srpauth.SRPAuth(self.provider)
+ self.auth_backend = self.auth._SRPAuth__instance
+
+ self.old_post = self.auth_backend._session.post
+ self.old_put = self.auth_backend._session.put
+ self.old_delete = self.auth_backend._session.delete
+
+ self.old_start_auth = self.auth_backend._start_authentication
+ self.old_proc_challenge = self.auth_backend._process_challenge
+ self.old_extract_data = self.auth_backend._extract_data
+ self.old_verify_session = self.auth_backend._verify_session
+ self.old_auth_preproc = self.auth_backend._authentication_preprocessing
+ self.old_get_sid = self.auth_backend.get_session_id
+ self.old_cookie_get = self.auth_backend._session.cookies.get
+ self.old_auth = self.auth_backend.authenticate
+
+ def tearDown(self):
+ self.auth_backend._session.post = self.old_post
+ self.auth_backend._session.put = self.old_put
+ self.auth_backend._session.delete = self.old_delete
+
+ self.auth_backend._start_authentication = self.old_start_auth
+ self.auth_backend._process_challenge = self.old_proc_challenge
+ self.auth_backend._extract_data = self.old_extract_data
+ self.auth_backend._verify_session = self.old_verify_session
+ self.auth_backend._authentication_preprocessing = self.old_auth_preproc
+ self.auth_backend.get_session_id = self.old_get_sid
+ self.auth_backend._session.cookies.get = self.old_cookie_get
+ self.auth_backend.authenticate = self.old_auth
+
+ # helper methods
+
+ def _get_https_uri(self):
+ """
+ Returns a https uri with the right https port initialized
+ """
+ return "https://localhost:%s" % (self.https_port,)
+
+ # Auth tests
+
+ def _prepare_auth_test(self, code=200, side_effect=None):
+ """
+ Creates the needed defers to test several test situations. It
+ adds up to the auth preprocessing step.
+
+ :param code: status code for the response of POST in requests
+ :type code: int
+ :param side_effect: side effect triggered by the POST method
+ in requests
+ :type side_effect: some kind of Exception
+
+ :returns: the defer that is created
+ :rtype: defer.Deferred
+ """
+ res = Response()
+ res.status_code = code
+ self.auth_backend._session.post = mock.create_autospec(
+ self.auth_backend._session.post,
+ return_value=res,
+ side_effect=side_effect)
+
+ d = threads.deferToThread(self.register.register_user,
+ self.TEST_USER,
+ self.TEST_PASS)
+
+ def wrapper_preproc(*args):
+ return threads.deferToThread(
+ self.auth_backend._authentication_preprocessing,
+ self.TEST_USER, self.TEST_PASS)
+
+ d.addCallback(wrapper_preproc)
+
+ return d
+
+ def test_safe_unhexlify(self):
+ input_value = "somestring"
+ test_value = binascii.hexlify(input_value)
+ self.assertEqual(
+ self.auth_backend._safe_unhexlify(test_value),
+ input_value)
+
+ def test_safe_unhexlify_not_raises(self):
+ input_value = "somestring"
+ test_value = binascii.hexlify(input_value)[:-1]
+
+ with self.assertRaises(TypeError):
+ binascii.unhexlify(test_value)
+
+ self.auth_backend._safe_unhexlify(test_value)
+
+ def test_preprocessing_loads_a(self):
+ self.assertEqual(self.auth_backend._srp_a, None)
+ self.auth_backend._authentication_preprocessing("user", "pass")
+ self.assertIsNotNone(self.auth_backend._srp_a)
+ self.assertTrue(len(self.auth_backend._srp_a) > 0)
+
+ @deferred()
+ def test_start_authentication(self):
+ d = threads.deferToThread(self.register.register_user, self.TEST_USER,
+ self.TEST_PASS)
+
+ def wrapper_preproc(*args):
+ return threads.deferToThread(
+ self.auth_backend._authentication_preprocessing,
+ self.TEST_USER, self.TEST_PASS)
+
+ d.addCallback(wrapper_preproc)
+
+ def wrapper(_):
+ return threads.deferToThread(
+ self.auth_backend._start_authentication,
+ None, self.TEST_USER)
+
+ d.addCallback(wrapper)
+ return d
+
+ @deferred()
+ def test_start_authentication_fails_connerror(self):
+ d = self._prepare_auth_test(
+ side_effect=requests.exceptions.ConnectionError())
+
+ def wrapper(_):
+ with self.assertRaises(srpauth.SRPAuthConnectionError):
+ self.auth_backend._start_authentication(None, self.TEST_USER)
+
+ d.addCallback(partial(threads.deferToThread, wrapper))
+ return d
+
+ @deferred()
+ def test_start_authentication_fails_any_error(self):
+ d = self._prepare_auth_test(side_effect=Exception())
+
+ def wrapper(_):
+ with self.assertRaises(srpauth.SRPAuthenticationError):
+ self.auth_backend._start_authentication(None, self.TEST_USER)
+
+ d.addCallback(partial(threads.deferToThread, wrapper))
+ return d
+
+ @deferred()
+ def test_start_authentication_fails_unknown_user(self):
+ d = self._prepare_auth_test(422)
+
+ def wrapper(_):
+ with self.assertRaises(srpauth.SRPAuthUnknownUser):
+ with mock.patch('leap.util.request_helpers.get_content',
+ new=mock.create_autospec(get_content)) as \
+ content:
+ content.return_value = ("{}", 0)
+
+ self.auth_backend._start_authentication(
+ None, self.TEST_USER)
+
+ d.addCallback(partial(threads.deferToThread, wrapper))
+ return d
+
+ @deferred()
+ def test_start_authentication_fails_errorcode(self):
+ d = self._prepare_auth_test(302)
+
+ def wrapper(_):
+ with self.assertRaises(srpauth.SRPAuthBadStatusCode):
+ with mock.patch('leap.util.request_helpers.get_content',
+ new=mock.create_autospec(get_content)) as \
+ content:
+ content.return_value = ("{}", 0)
+
+ self.auth_backend._start_authentication(None,
+ self.TEST_USER)
+
+ d.addCallback(partial(threads.deferToThread, wrapper))
+ return d
+
+ @deferred()
+ def test_start_authentication_fails_no_salt(self):
+ d = self._prepare_auth_test(200)
+
+ def wrapper(_):
+ with self.assertRaises(srpauth.SRPAuthNoSalt):
+ with mock.patch('leap.util.request_helpers.get_content',
+ new=mock.create_autospec(get_content)) as \
+ content:
+ content.return_value = ("{}", 0)
+
+ self.auth_backend._start_authentication(None,
+ self.TEST_USER)
+
+ d.addCallback(partial(threads.deferToThread, wrapper))
+ return d
+
+ @deferred()
+ def test_start_authentication_fails_no_B(self):
+ d = self._prepare_auth_test(200)
+
+ def wrapper(_):
+ with self.assertRaises(srpauth.SRPAuthNoB):
+ with mock.patch('leap.util.request_helpers.get_content',
+ new=mock.create_autospec(get_content)) as \
+ content:
+ content.return_value = ('{"salt": ""}', 0)
+
+ self.auth_backend._start_authentication(None,
+ self.TEST_USER)
+
+ d.addCallback(partial(threads.deferToThread, wrapper))
+ return d
+
+ @deferred()
+ def test_start_authentication_correct_saltb(self):
+ d = self._prepare_auth_test(200)
+
+ test_salt = "12345"
+ test_B = "67890"
+
+ def wrapper(_):
+ with mock.patch('leap.util.request_helpers.get_content',
+ new=mock.create_autospec(get_content)) as \
+ content:
+ content.return_value = ('{"salt":"%s", "B":"%s"}' % (test_salt,
+ test_B),
+ 0)
+
+ salt, B = self.auth_backend._start_authentication(
+ None,
+ self.TEST_USER)
+ self.assertEqual(salt, test_salt)
+ self.assertEqual(B, test_B)
+
+ d.addCallback(partial(threads.deferToThread, wrapper))
+ return d
+
+ def _prepare_auth_challenge(self):
+ """
+ Creates the needed defers to test several test situations. It
+ adds up to the start authentication step.
+
+ :returns: the defer that is created
+ :rtype: defer.Deferred
+ """
+ d = threads.deferToThread(self.register.register_user,
+ self.TEST_USER,
+ self.TEST_PASS)
+
+ def wrapper_preproc(*args):
+ return threads.deferToThread(
+ self.auth_backend._authentication_preprocessing,
+ self.TEST_USER, self.TEST_PASS)
+
+ d.addCallback(wrapper_preproc)
+
+ def wrapper_start(*args):
+ return threads.deferToThread(
+ self.auth_backend._start_authentication,
+ None, self.TEST_USER)
+
+ d.addCallback(wrapper_start)
+
+ return d
+
+ @deferred()
+ def test_process_challenge_wrong_saltb(self):
+ d = self._prepare_auth_challenge()
+
+ def wrapper(salt_B):
+ with self.assertRaises(srpauth.SRPAuthBadDataFromServer):
+ self.auth_backend._process_challenge("",
+ username=self.TEST_USER)
+
+ d.addCallback(partial(threads.deferToThread, wrapper))
+ return d
+
+ @deferred()
+ def test_process_challenge_requests_problem_raises(self):
+ d = self._prepare_auth_challenge()
+
+ self.auth_backend._session.put = mock.create_autospec(
+ self.auth_backend._session.put,
+ side_effect=requests.exceptions.ConnectionError())
+
+ def wrapper(salt_B):
+ with self.assertRaises(srpauth.SRPAuthConnectionError):
+ self.auth_backend._process_challenge(salt_B,
+ username=self.TEST_USER)
+
+ d.addCallback(partial(threads.deferToThread, wrapper))
+
+ return d
+
+ @deferred()
+ def test_process_challenge_json_decode_error(self):
+ d = self._prepare_auth_challenge()
+
+ def wrapper(salt_B):
+ with mock.patch('leap.util.request_helpers.get_content',
+ new=mock.create_autospec(get_content)) as \
+ content:
+ content.return_value = ("{", 0)
+ content.side_effect = JSONDecodeError("", "", 0)
+
+ with self.assertRaises(srpauth.SRPAuthJSONDecodeError):
+ self.auth_backend._process_challenge(
+ salt_B,
+ username=self.TEST_USER)
+
+ d.addCallback(partial(threads.deferToThread, wrapper))
+
+ return d
+
+ @deferred()
+ def test_process_challenge_bad_password(self):
+ d = self._prepare_auth_challenge()
+
+ res = Response()
+ res.status_code = 422
+ self.auth_backend._session.put = mock.create_autospec(
+ self.auth_backend._session.put,
+ return_value=res)
+
+ def wrapper(salt_B):
+ with mock.patch('leap.util.request_helpers.get_content',
+ new=mock.create_autospec(get_content)) as \
+ content:
+ content.return_value = ("", 0)
+ with self.assertRaises(srpauth.SRPAuthBadPassword):
+ self.auth_backend._process_challenge(
+ salt_B,
+ username=self.TEST_USER)
+
+ d.addCallback(partial(threads.deferToThread, wrapper))
+
+ return d
+
+ @deferred()
+ def test_process_challenge_bad_password2(self):
+ d = self._prepare_auth_challenge()
+
+ res = Response()
+ res.status_code = 422
+ self.auth_backend._session.put = mock.create_autospec(
+ self.auth_backend._session.put,
+ return_value=res)
+
+ def wrapper(salt_B):
+ with mock.patch('leap.util.request_helpers.get_content',
+ new=mock.create_autospec(get_content)) as \
+ content:
+ content.return_value = ("[]", 0)
+ with self.assertRaises(srpauth.SRPAuthBadPassword):
+ self.auth_backend._process_challenge(
+ salt_B,
+ username=self.TEST_USER)
+
+ d.addCallback(partial(threads.deferToThread, wrapper))
+
+ return d
+
+ @deferred()
+ def test_process_challenge_other_error_code(self):
+ d = self._prepare_auth_challenge()
+
+ res = Response()
+ res.status_code = 300
+ self.auth_backend._session.put = mock.create_autospec(
+ self.auth_backend._session.put,
+ return_value=res)
+
+ def wrapper(salt_B):
+ with mock.patch('leap.util.request_helpers.get_content',
+ new=mock.create_autospec(get_content)) as \
+ content:
+ content.return_value = ("{}", 0)
+ with self.assertRaises(srpauth.SRPAuthBadStatusCode):
+ self.auth_backend._process_challenge(
+ salt_B,
+ username=self.TEST_USER)
+
+ d.addCallback(partial(threads.deferToThread, wrapper))
+
+ return d
+
+ @deferred()
+ def test_process_challenge(self):
+ d = self._prepare_auth_challenge()
+
+ def wrapper(salt_B):
+ self.auth_backend._process_challenge(salt_B,
+ username=self.TEST_USER)
+
+ d.addCallback(partial(threads.deferToThread, wrapper))
+
+ return d
+
+ def test_extract_data_wrong_data(self):
+ with self.assertRaises(srpauth.SRPAuthBadDataFromServer):
+ self.auth_backend._extract_data(None)
+
+ with self.assertRaises(srpauth.SRPAuthBadDataFromServer):
+ self.auth_backend._extract_data("")
+
+ def test_extract_data_fails_on_wrong_data_from_server(self):
+ with self.assertRaises(srpauth.SRPAuthBadDataFromServer):
+ self.auth_backend._extract_data({})
+
+ with self.assertRaises(srpauth.SRPAuthBadDataFromServer):
+ self.auth_backend._extract_data({"M2": ""})
+
+ def test_extract_data_sets_uidtoken(self):
+ test_uid = "someuid"
+ test_m2 = "somem2"
+ test_token = "sometoken"
+ test_data = {
+ "M2": test_m2,
+ "id": test_uid,
+ "token": test_token
+ }
+ m2 = self.auth_backend._extract_data(test_data)
+
+ self.assertEqual(m2, test_m2)
+ self.assertEqual(self.auth_backend.get_uid(), test_uid)
+ self.assertEqual(self.auth_backend.get_uid(),
+ self.auth.get_uid())
+ self.assertEqual(self.auth_backend.get_token(), test_token)
+ self.assertEqual(self.auth_backend.get_token(),
+ self.auth.get_token())
+
+ def _prepare_verify_session(self):
+ """
+ Prepares the tests for verify session with needed steps
+ before. It adds up to the extract_data step.
+
+ :returns: The defer to chain to
+ :rtype: defer.Deferred
+ """
+ d = self._prepare_auth_challenge()
+
+ def wrapper_proc_challenge(salt_B):
+ return self.auth_backend._process_challenge(
+ salt_B,
+ username=self.TEST_USER)
+
+ def wrapper_extract_data(data):
+ return self.auth_backend._extract_data(data)
+
+ d.addCallback(partial(threads.deferToThread, wrapper_proc_challenge))
+ d.addCallback(partial(threads.deferToThread, wrapper_extract_data))
+
+ return d
+
+ @deferred()
+ def test_verify_session_unhexlifiable_m2(self):
+ d = self._prepare_verify_session()
+
+ def wrapper(M2):
+ with self.assertRaises(srpauth.SRPAuthBadDataFromServer):
+ self.auth_backend._verify_session("za") # unhexlifiable value
+
+ d.addCallback(wrapper)
+
+ return d
+
+ @deferred()
+ def test_verify_session_unverifiable_m2(self):
+ d = self._prepare_verify_session()
+
+ def wrapper(M2):
+ with self.assertRaises(srpauth.SRPAuthVerificationFailed):
+ # Correctly unhelifiable value, but not for verifying the
+ # session
+ self.auth_backend._verify_session("abc12")
+
+ d.addCallback(wrapper)
+
+ return d
+
+ @deferred()
+ def test_verify_session_fails_on_no_session_id(self):
+ d = self._prepare_verify_session()
+
+ def wrapper(M2):
+ self.auth_backend._session.cookies.get = mock.create_autospec(
+ self.auth_backend._session.cookies.get,
+ return_value=None)
+ with self.assertRaises(srpauth.SRPAuthNoSessionId):
+ self.auth_backend._verify_session(M2)
+
+ d.addCallback(wrapper)
+
+ return d
+
+ @deferred()
+ def test_verify_session_session_id(self):
+ d = self._prepare_verify_session()
+
+ test_session_id = "12345"
+
+ def wrapper(M2):
+ self.auth_backend._session.cookies.get = mock.create_autospec(
+ self.auth_backend._session.cookies.get,
+ return_value=test_session_id)
+ self.auth_backend._verify_session(M2)
+ self.assertEqual(self.auth_backend.get_session_id(),
+ test_session_id)
+ self.assertEqual(self.auth_backend.get_session_id(),
+ self.auth.get_session_id())
+
+ d.addCallback(wrapper)
+
+ return d
+
+ @deferred()
+ def test_verify_session(self):
+ d = self._prepare_verify_session()
+
+ def wrapper(M2):
+ self.auth_backend._verify_session(M2)
+
+ d.addCallback(wrapper)
+
+ return d
+
+ @deferred()
+ def test_authenticate(self):
+ self.auth_backend._authentication_preprocessing = mock.create_autospec(
+ self.auth_backend._authentication_preprocessing,
+ return_value=None)
+ self.auth_backend._start_authentication = mock.create_autospec(
+ self.auth_backend._start_authentication,
+ return_value=None)
+ self.auth_backend._process_challenge = mock.create_autospec(
+ self.auth_backend._process_challenge,
+ return_value=None)
+ self.auth_backend._extract_data = mock.create_autospec(
+ self.auth_backend._extract_data,
+ return_value=None)
+ self.auth_backend._verify_session = mock.create_autospec(
+ self.auth_backend._verify_session,
+ return_value=None)
+
+ d = self.auth_backend.authenticate(self.TEST_USER, self.TEST_PASS)
+
+ def check(*args):
+ self.auth_backend._authentication_preprocessing.\
+ assert_called_once_with(
+ username=self.TEST_USER,
+ password=self.TEST_PASS
+ )
+ self.auth_backend._start_authentication.assert_called_once_with(
+ None,
+ username=self.TEST_USER)
+ self.auth_backend._process_challenge.assert_called_once_with(
+ None,
+ username=self.TEST_USER)
+ self.auth_backend._extract_data.assert_called_once_with(
+ None)
+ self.auth_backend._verify_session.assert_called_once_with(None)
+
+ d.addCallback(check)
+
+ return d
+
+ @deferred()
+ def test_logout_fails_if_not_logged_in(self):
+
+ def wrapper(*args):
+ with self.assertRaises(AssertionError):
+ self.auth_backend.logout()
+
+ d = threads.deferToThread(wrapper)
+ return d
+
+ @deferred()
+ def test_logout_traps_delete(self):
+ self.auth_backend.get_session_id = mock.create_autospec(
+ self.auth_backend.get_session_id,
+ return_value="1234")
+ self.auth_backend._session.delete = mock.create_autospec(
+ self.auth_backend._session.delete,
+ side_effect=Exception())
+
+ def wrapper(*args):
+ self.auth_backend.logout()
+
+ d = threads.deferToThread(wrapper)
+ return d
+
+ @deferred()
+ def test_logout_clears(self):
+ self.auth_backend._session_id = "1234"
+
+ def wrapper(*args):
+ old_session = self.auth_backend._session
+ self.auth_backend.logout()
+ self.assertIsNone(self.auth_backend.get_session_id())
+ self.assertIsNone(self.auth_backend.get_uid())
+ self.assertNotEqual(old_session, self.auth_backend._session)
+
+ d = threads.deferToThread(wrapper)
+ return d
+
+
+class SRPAuthSingletonTestCase(unittest.TestCase):
+ def setUp(self):
+ self.old_auth = srpauth.SRPAuth._SRPAuth__impl.authenticate
+
+ def tearDown(self):
+ srpauth.SRPAuth._SRPAuth__impl.authenticate = self.old_auth
+
+ def test_singleton(self):
+ obj1 = srpauth.SRPAuth(ProviderConfig())
+ obj2 = srpauth.SRPAuth(ProviderConfig())
+ self.assertEqual(obj1._SRPAuth__instance, obj2._SRPAuth__instance)
+
+ @deferred()
+ def test_authenticate_notifies_gui(self):
+ auth = srpauth.SRPAuth(ProviderConfig())
+ auth._SRPAuth__instance.authenticate = mock.create_autospec(
+ auth._SRPAuth__instance.authenticate,
+ return_value=threads.deferToThread(lambda: None))
+ auth._gui_notify = mock.create_autospec(
+ auth._gui_notify)
+
+ d = auth.authenticate("", "")
+
+ def check(*args):
+ auth._gui_notify.assert_called_once_with(None)
+
+ d.addCallback(check)
+ return d
+
+ @deferred()
+ def test_authenticate_errsback(self):
+ auth = srpauth.SRPAuth(ProviderConfig())
+ auth._SRPAuth__instance.authenticate = mock.create_autospec(
+ auth._SRPAuth__instance.authenticate,
+ return_value=threads.deferToThread(MagicMock(
+ side_effect=Exception())))
+ auth._gui_notify = mock.create_autospec(
+ auth._gui_notify)
+ auth._errback = mock.create_autospec(
+ auth._errback)
+
+ d = auth.authenticate("", "")
+
+ def check(*args):
+ self.assertFalse(auth._gui_notify.called)
+ self.assertEqual(auth._errback.call_count, 1)
+
+ d.addCallback(check)
+ return d
+
+ @deferred()
+ def test_authenticate_runs_cleanly_when_raises(self):
+ auth = srpauth.SRPAuth(ProviderConfig())
+ auth._SRPAuth__instance.authenticate = mock.create_autospec(
+ auth._SRPAuth__instance.authenticate,
+ return_value=threads.deferToThread(MagicMock(
+ side_effect=Exception())))
+
+ d = auth.authenticate("", "")
+
+ return d
+
+ @deferred()
+ def test_authenticate_runs_cleanly(self):
+ auth = srpauth.SRPAuth(ProviderConfig())
+ auth._SRPAuth__instance.authenticate = mock.create_autospec(
+ auth._SRPAuth__instance.authenticate,
+ return_value=threads.deferToThread(MagicMock()))
+
+ d = auth.authenticate("", "")
+
+ return d
+
+ def test_logout(self):
+ auth = srpauth.SRPAuth(ProviderConfig())
+ auth._SRPAuth__instance.logout = mock.create_autospec(
+ auth._SRPAuth__instance.logout)
+
+ self.assertTrue(auth.logout())
+
+ def test_logout_rets_false_when_raises(self):
+ auth = srpauth.SRPAuth(ProviderConfig())
+ auth._SRPAuth__instance.logout = mock.create_autospec(
+ auth._SRPAuth__instance.logout,
+ side_effect=Exception())
+
+ self.assertFalse(auth.logout())
diff --git a/src/leap/bitmask/crypto/tests/test_srpregister.py b/src/leap/bitmask/crypto/tests/test_srpregister.py
new file mode 100644
index 00000000..4d6e7be3
--- /dev/null
+++ b/src/leap/bitmask/crypto/tests/test_srpregister.py
@@ -0,0 +1,201 @@
+# -*- coding: utf-8 -*-
+# test_srpregister.py
+# Copyright (C) 2013 LEAP
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see <http://www.gnu.org/licenses/>.
+"""
+Tests for:
+ * leap/crypto/srpregister.py
+"""
+try:
+ import unittest2 as unittest
+except ImportError:
+ import unittest
+import os
+import sys
+
+from mock import MagicMock
+from nose.twistedtools import reactor, deferred
+from twisted.python import log
+from twisted.internet import threads
+
+from leap.bitmask.config.providerconfig import ProviderConfig
+from leap.bitmask.crypto import srpregister, srpauth
+from leap.bitmask.crypto.tests import fake_provider
+from leap.common.testing.https_server import where
+
+log.startLogging(sys.stdout)
+
+
+def _get_capath():
+ return where("cacert.pem")
+
+_here = os.path.split(__file__)[0]
+
+
+class ImproperlyConfiguredError(Exception):
+ """
+ Raised if the test provider is missing configuration
+ """
+
+
+class SRPTestCase(unittest.TestCase):
+ """
+ Tests for the SRPRegister class
+ """
+ __name__ = "SRPRegister tests"
+
+ @classmethod
+ def setUpClass(cls):
+ """
+ Sets up this TestCase with a simple and faked provider instance:
+
+ * runs a threaded reactor
+ * loads a mocked ProviderConfig that points to the certs in the
+ leap.common.testing module.
+ """
+ factory = fake_provider.get_provider_factory()
+ http = reactor.listenTCP(8001, factory)
+ https = reactor.listenSSL(
+ 0, factory,
+ fake_provider.OpenSSLServerContextFactory())
+ get_port = lambda p: p.getHost().port
+ cls.http_port = get_port(http)
+ cls.https_port = get_port(https)
+
+ provider = ProviderConfig()
+ provider.get_ca_cert_path = MagicMock()
+ provider.get_ca_cert_path.return_value = _get_capath()
+
+ provider.get_api_uri = MagicMock()
+ provider.get_api_uri.return_value = cls._get_https_uri()
+
+ loaded = provider.load(path=os.path.join(
+ _here, "test_provider.json"))
+ if not loaded:
+ raise ImproperlyConfiguredError(
+ "Could not load test provider config")
+ cls.register = srpregister.SRPRegister(provider_config=provider)
+
+ cls.auth = srpauth.SRPAuth(provider)
+
+ # helper methods
+
+ @classmethod
+ def _get_https_uri(cls):
+ """
+ Returns a https uri with the right https port initialized
+ """
+ return "https://localhost:%s" % (cls.https_port,)
+
+ # Register tests
+
+ def test_none_port(self):
+ provider = ProviderConfig()
+ provider.get_api_uri = MagicMock()
+ provider.get_api_uri.return_value = "http://localhost/"
+ loaded = provider.load(path=os.path.join(
+ _here, "test_provider.json"))
+ if not loaded:
+ raise ImproperlyConfiguredError(
+ "Could not load test provider config")
+
+ register = srpregister.SRPRegister(provider_config=provider)
+ self.assertEquals(register._port, "443")
+
+ @deferred()
+ def test_wrong_cert(self):
+ provider = ProviderConfig()
+ loaded = provider.load(path=os.path.join(
+ _here, "test_provider.json"))
+ provider.get_ca_cert_path = MagicMock()
+ provider.get_ca_cert_path.return_value = os.path.join(
+ _here,
+ "wrongcert.pem")
+ provider.get_api_uri = MagicMock()
+ provider.get_api_uri.return_value = self._get_https_uri()
+ if not loaded:
+ raise ImproperlyConfiguredError(
+ "Could not load test provider config")
+
+ register = srpregister.SRPRegister(provider_config=provider)
+ d = threads.deferToThread(register.register_user, "foouser_firsttime",
+ "barpass")
+ d.addCallback(self.assertFalse)
+ return d
+
+ @deferred()
+ def test_register_user(self):
+ """
+ Checks if the registration of an unused name works as expected when
+ it is the first time that we attempt to register that user, as well as
+ when we request a user that is taken.
+ """
+ # pristine registration
+ d = threads.deferToThread(self.register.register_user,
+ "foouser_firsttime",
+ "barpass")
+ d.addCallback(self.assertTrue)
+ return d
+
+ @deferred()
+ def test_second_register_user(self):
+ # second registration attempt with the same user should return errors
+ d = threads.deferToThread(self.register.register_user,
+ "foouser_second",
+ "barpass")
+ d.addCallback(self.assertTrue)
+
+ # FIXME currently we are catching this in an upper layer,
+ # we could bring the error validation to the SRPRegister class
+ def register_wrapper(_):
+ return threads.deferToThread(self.register.register_user,
+ "foouser_second",
+ "barpass")
+ d.addCallback(register_wrapper)
+ d.addCallback(self.assertFalse)
+ return d
+
+ @deferred()
+ def test_correct_http_uri(self):
+ """
+ Checks that registration autocorrect http uris to https ones.
+ """
+ HTTP_URI = "http://localhost:%s" % (self.https_port, )
+ HTTPS_URI = "https://localhost:%s/1/users" % (self.https_port, )
+ provider = ProviderConfig()
+ provider.get_ca_cert_path = MagicMock()
+ provider.get_ca_cert_path.return_value = _get_capath()
+ provider.get_api_uri = MagicMock()
+
+ # we introduce a http uri in the config file...
+ provider.get_api_uri.return_value = HTTP_URI
+ loaded = provider.load(path=os.path.join(
+ _here, "test_provider.json"))
+ if not loaded:
+ raise ImproperlyConfiguredError(
+ "Could not load test provider config")
+
+ register = srpregister.SRPRegister(provider_config=provider)
+
+ # ... and we check that we're correctly taking the HTTPS protocol
+ # instead
+ reg_uri = register._get_registration_uri()
+ self.assertEquals(reg_uri, HTTPS_URI)
+ register._get_registration_uri = MagicMock(return_value=HTTPS_URI)
+ d = threads.deferToThread(register.register_user, "test_failhttp",
+ "barpass")
+ d.addCallback(self.assertTrue)
+
+ return d
diff --git a/src/leap/bitmask/crypto/tests/wrongcert.pem b/src/leap/bitmask/crypto/tests/wrongcert.pem
new file mode 100644
index 00000000..e6cff38a
--- /dev/null
+++ b/src/leap/bitmask/crypto/tests/wrongcert.pem
@@ -0,0 +1,33 @@
+-----BEGIN CERTIFICATE-----
+MIIFtTCCA52gAwIBAgIJAIWZus5EIXNtMA0GCSqGSIb3DQEBBQUAMEUxCzAJBgNV
+BAYTAkFVMRMwEQYDVQQIEwpTb21lLVN0YXRlMSEwHwYDVQQKExhJbnRlcm5ldCBX
+aWRnaXRzIFB0eSBMdGQwHhcNMTMwNjI1MTc0NjExWhcNMTgwNjI1MTc0NjExWjBF
+MQswCQYDVQQGEwJBVTETMBEGA1UECBMKU29tZS1TdGF0ZTEhMB8GA1UEChMYSW50
+ZXJuZXQgV2lkZ2l0cyBQdHkgTHRkMIICIjANBgkqhkiG9w0BAQEFAAOCAg8AMIIC
+CgKCAgEA2ObM7ESjyuxFZYD/Y68qOPQgjgggW+cdXfBpU2p4n7clsrUeMhWdW40Y
+77Phzor9VOeqs3ZpHuyLzsYVp/kFDm8tKyo2ah5fJwzL0VCSLYaZkUQQ7GNUmTCk
+furaxl8cQx/fg395V7/EngsS9B3/y5iHbctbA4MnH3jaotO5EGeo6hw7/eyCotQ9
+KbBV9GJMcY94FsXBCmUB+XypKklWTLhSaS6Cu4Fo8YLW6WmcnsyEOGS2F7WVf5at
+7CBWFQZHaSgIBLmc818/mDYCnYmCVMFn/6Ndx7V2NTlz+HctWrQn0dmIOnCUeCwS
+wXq9PnBR1rSx/WxwyF/WpyjOFkcIo7vm72kS70pfrYsXcZD4BQqkXYj3FyKnPt3O
+ibLKtCxL8/83wOtErPcYpG6LgFkgAAlHQ9MkUi5dbmjCJtpqQmlZeK1RALdDPiB3
+K1KZimrGsmcE624dJxUIOJJpuwJDy21F8kh5ZAsAtE1prWETrQYNElNFjQxM83rS
+ZR1Ql2MPSB4usEZT57+KvpEzlOnAT3elgCg21XrjSFGi14hCEao4g2OEZH5GAwm5
+frf6UlSRZ/g3tLTfI8Hv1prw15W2qO+7q7SBAplTODCRk+Yb0YoA2mMM/QXBUcXs
+vKEDLSSxzNIBi3T62l39RB/ml+gPKo87ZMDivex1ZhrcJc3Yu3sCAwEAAaOBpzCB
+pDAdBgNVHQ4EFgQUPjE+4pun+8FreIdpoR8v6N7xKtUwdQYDVR0jBG4wbIAUPjE+
+4pun+8FreIdpoR8v6N7xKtWhSaRHMEUxCzAJBgNVBAYTAkFVMRMwEQYDVQQIEwpT
+b21lLVN0YXRlMSEwHwYDVQQKExhJbnRlcm5ldCBXaWRnaXRzIFB0eSBMdGSCCQCF
+mbrORCFzbTAMBgNVHRMEBTADAQH/MA0GCSqGSIb3DQEBBQUAA4ICAQCpvCPdtvXJ
+muTj379TZuCJs7/l0FhA7AHa1WAlHjsXHaA7N0+3ZWAbdtXDsowal6S+ldgU/kfV
+Lq7NrRq+amJWC7SYj6cvVwhrSwSvu01fe/TWuOzHrRv1uTfJ/VXLonVufMDd9opo
+bhqYxMaxLdIx6t/MYmZH4Wpiq0yfZuv//M8i7BBl/qvaWbLhg0yVAKRwjFvf59h6
+6tRFCLddELOIhLDQtk8zMbioPEbfAlKdwwP8kYGtDGj6/9/YTd/oTKRdgHuwyup3
+m0L20Y6LddC+tb0WpK5EyrNbCbEqj1L4/U7r6f/FKNA3bx6nfdXbscaMfYonKAKg
+1cRrRg45sErmCz0QyTnWzXyvbjR4oQRzyW3kJ1JZudZ+AwOi00J5FYa3NiLuxl1u
+gIGKWSrASQWhEdpa1nlCgX7PhdaQgYjEMpQvA0GCA0OF5JDu8en1yZqsOt1hCLIN
+lkz/5jKPqrclY5hV99bE3hgCHRmIPNHCZG3wbZv2yJKxJX1YLMmQwAmSh2N7YwGG
+yXRvCxQs5ChPHyRairuf/5MZCZnSVb45ppTVuNUijsbflKRUgfj/XvfqQ22f+C9N
+Om2dmNvAiS2TOIfuP47CF2OUa5q4plUwmr+nyXQGM0SIoHNCj+MBdFfb3oxxAtI+
+SLhbnzQv5e84Doqz3YF0XW8jyR7q8GFLNA==
+-----END CERTIFICATE-----