summaryrefslogtreecommitdiff
path: root/src/leap/crypto
diff options
context:
space:
mode:
authorTomás Touceda <chiiph@leap.se>2013-06-28 14:25:19 -0300
committerTomás Touceda <chiiph@leap.se>2013-06-28 14:25:19 -0300
commit5b975799ce9b7a6e0a88be4bcb48bdfb90800bb3 (patch)
treebda674a79b1aeccb37b67609517bc4761db7ae07 /src/leap/crypto
parent9cea9c8a34343f8792d65b96f93ae22bd8685878 (diff)
parentc088a1544a5f7a51359d2802019c0740aab0cc5b (diff)
Merge branch 'release-0.2.2'0.2.2
Diffstat (limited to 'src/leap/crypto')
-rw-r--r--src/leap/crypto/certs.py112
-rw-r--r--src/leap/crypto/certs_gnutls.py112
-rw-r--r--src/leap/crypto/constants.py18
-rw-r--r--src/leap/crypto/leapkeyring.py70
-rw-r--r--src/leap/crypto/srpauth.py496
-rw-r--r--src/leap/crypto/srpregister.py163
-rw-r--r--src/leap/crypto/tests/__init__.py16
-rw-r--r--src/leap/crypto/tests/eip-service.json43
-rwxr-xr-xsrc/leap/crypto/tests/fake_provider.py376
-rw-r--r--src/leap/crypto/tests/openvpn.pem33
-rw-r--r--src/leap/crypto/tests/test_certs.py22
-rw-r--r--src/leap/crypto/tests/test_provider.json15
-rw-r--r--src/leap/crypto/tests/test_srpregister.py202
-rw-r--r--src/leap/crypto/tests/wrongcert.pem33
14 files changed, 1395 insertions, 316 deletions
diff --git a/src/leap/crypto/certs.py b/src/leap/crypto/certs.py
deleted file mode 100644
index cbb5725a..00000000
--- a/src/leap/crypto/certs.py
+++ /dev/null
@@ -1,112 +0,0 @@
-import logging
-import os
-from StringIO import StringIO
-import ssl
-import time
-
-from dateutil.parser import parse
-from OpenSSL import crypto
-
-from leap.util.misc import null_check
-
-logger = logging.getLogger(__name__)
-
-
-class BadCertError(Exception):
- """
- raised for malformed certs
- """
-
-
-class NoCertError(Exception):
- """
- raised for cert not found in given path
- """
-
-
-def get_https_cert_from_domain(domain, port=443):
- """
- @param domain: a domain name to get a certificate from.
- """
- cert = ssl.get_server_certificate((domain, port))
- x509 = crypto.load_certificate(crypto.FILETYPE_PEM, cert)
- return x509
-
-
-def get_cert_from_file(_file):
- null_check(_file, "pem file")
- if isinstance(_file, (str, unicode)):
- if not os.path.isfile(_file):
- raise NoCertError
- with open(_file) as f:
- cert = f.read()
- else:
- cert = _file.read()
- x509 = crypto.load_certificate(crypto.FILETYPE_PEM, cert)
- return x509
-
-
-def get_pkey_from_file(_file):
- getkey = lambda f: crypto.load_privatekey(
- crypto.FILETYPE_PEM, f.read())
-
- if isinstance(_file, str):
- with open(_file) as f:
- key = getkey(f)
- else:
- key = getkey(_file)
- return key
-
-
-def can_load_cert_and_pkey(string):
- """
- loads certificate and private key from
- a buffer
- """
- try:
- f = StringIO(string)
- cert = get_cert_from_file(f)
-
- f = StringIO(string)
- key = get_pkey_from_file(f)
-
- null_check(cert, 'certificate')
- null_check(key, 'private key')
- except Exception as exc:
- logger.error(type(exc), exc.message)
- raise BadCertError
- else:
- return True
-
-
-def get_cert_fingerprint(domain=None, port=443, filepath=None,
- hash_type="SHA256", sep=":"):
- """
- @param domain: a domain name to get a fingerprint from
- @type domain: str
- @param filepath: path to a file containing a PEM file
- @type filepath: str
- @param hash_type: the hash function to be used in the fingerprint.
- must be one of SHA1, SHA224, SHA256, SHA384, SHA512
- @type hash_type: str
- @rparam: hex_fpr, a hexadecimal representation of a bytestring
- containing the fingerprint.
- @rtype: string
- """
- if domain:
- cert = get_https_cert_from_domain(domain, port=port)
- if filepath:
- cert = get_cert_from_file(filepath)
- hex_fpr = cert.digest(hash_type)
- return hex_fpr
-
-
-def get_time_boundaries(certfile):
- cert = get_cert_from_file(certfile)
- null_check(cert, 'certificate')
-
- fromts, tots = (cert.get_notBefore(), cert.get_notAfter())
- from_, to_ = map(
- lambda ts: time.gmtime(time.mktime(parse(ts).timetuple())),
- (fromts, tots))
- return from_, to_
diff --git a/src/leap/crypto/certs_gnutls.py b/src/leap/crypto/certs_gnutls.py
deleted file mode 100644
index 20c0e043..00000000
--- a/src/leap/crypto/certs_gnutls.py
+++ /dev/null
@@ -1,112 +0,0 @@
-'''
-We're using PyOpenSSL now
-
-import ctypes
-from StringIO import StringIO
-import socket
-
-import gnutls.connection
-import gnutls.crypto
-import gnutls.library
-
-from leap.util.misc import null_check
-
-
-class BadCertError(Exception):
- """raised for malformed certs"""
-
-
-def get_https_cert_from_domain(domain):
- """
- @param domain: a domain name to get a certificate from.
- """
- sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
- cred = gnutls.connection.X509Credentials()
-
- session = gnutls.connection.ClientSession(sock, cred)
- session.connect((domain, 443))
- session.handshake()
- cert = session.peer_certificate
- return cert
-
-
-def get_cert_from_file(_file):
- getcert = lambda f: gnutls.crypto.X509Certificate(f.read())
- if isinstance(_file, str):
- with open(_file) as f:
- cert = getcert(f)
- else:
- cert = getcert(_file)
- return cert
-
-
-def get_pkey_from_file(_file):
- getkey = lambda f: gnutls.crypto.X509PrivateKey(f.read())
- if isinstance(_file, str):
- with open(_file) as f:
- key = getkey(f)
- else:
- key = getkey(_file)
- return key
-
-
-def can_load_cert_and_pkey(string):
- try:
- f = StringIO(string)
- cert = get_cert_from_file(f)
-
- f = StringIO(string)
- key = get_pkey_from_file(f)
-
- null_check(cert, 'certificate')
- null_check(key, 'private key')
- except:
- # XXX catch GNUTLSError?
- raise BadCertError
- else:
- return True
-
-def get_cert_fingerprint(domain=None, filepath=None,
- hash_type="SHA256", sep=":"):
- """
- @param domain: a domain name to get a fingerprint from
- @type domain: str
- @param filepath: path to a file containing a PEM file
- @type filepath: str
- @param hash_type: the hash function to be used in the fingerprint.
- must be one of SHA1, SHA224, SHA256, SHA384, SHA512
- @type hash_type: str
- @rparam: hex_fpr, a hexadecimal representation of a bytestring
- containing the fingerprint.
- @rtype: string
- """
- if domain:
- cert = get_https_cert_from_domain(domain)
- if filepath:
- cert = get_cert_from_file(filepath)
-
- _buffer = ctypes.create_string_buffer(64)
- buffer_length = ctypes.c_size_t(64)
-
- SUPPORTED_DIGEST_FUN = ("SHA1", "SHA224", "SHA256", "SHA384", "SHA512")
- if hash_type in SUPPORTED_DIGEST_FUN:
- digestfunction = getattr(
- gnutls.library.constants,
- "GNUTLS_DIG_%s" % hash_type)
- else:
- # XXX improperlyconfigured or something
- raise Exception("digest function not supported")
-
- gnutls.library.functions.gnutls_x509_crt_get_fingerprint(
- cert._c_object, digestfunction,
- ctypes.byref(_buffer), ctypes.byref(buffer_length))
-
- # deinit
- #server_cert._X509Certificate__deinit(server_cert._c_object)
- # needed? is segfaulting
-
- fpr = ctypes.string_at(_buffer, buffer_length.value)
- hex_fpr = sep.join(u"%02X" % ord(char) for char in fpr)
-
- return hex_fpr
-'''
diff --git a/src/leap/crypto/constants.py b/src/leap/crypto/constants.py
new file mode 100644
index 00000000..c5eaef1f
--- /dev/null
+++ b/src/leap/crypto/constants.py
@@ -0,0 +1,18 @@
+# -*- coding: utf-8 -*-
+# constants.py
+# Copyright (C) 2013 LEAP
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see <http://www.gnu.org/licenses/>.
+
+SIGNUP_TIMEOUT = 5
diff --git a/src/leap/crypto/leapkeyring.py b/src/leap/crypto/leapkeyring.py
deleted file mode 100644
index c241d0bc..00000000
--- a/src/leap/crypto/leapkeyring.py
+++ /dev/null
@@ -1,70 +0,0 @@
-import keyring
-
-from leap.base.config import get_config_file
-
-#############
-# Disclaimer
-#############
-# This currently is not a keyring, it's more like a joke.
-# No, seriously.
-# We're affected by this **bug**
-
-# https://bitbucket.org/kang/python-keyring-lib/
-# issue/65/dbusexception-method-opensession-with
-
-# so using the gnome keyring does not seem feasible right now.
-# I thought this was the next best option to store secrets in plain sight.
-
-# in the future we should move to use the gnome/kde/macosx/win keyrings.
-
-
-class LeapCryptedFileKeyring(keyring.backend.CryptedFileKeyring):
-
- filename = ".secrets"
-
- @property
- def file_path(self):
- return get_config_file(self.filename)
-
- def __init__(self, seed=None):
- self.seed = seed
-
- def _get_new_password(self):
- # XXX every time this method is called,
- # $deity kills a kitten.
- return "secret%s" % self.seed
-
- def _init_file(self):
- self.keyring_key = self._get_new_password()
- self.set_password('keyring_setting', 'pass_ref', 'pass_ref_value')
-
- def _unlock(self):
- self.keyring_key = self._get_new_password()
- print 'keyring key ', self.keyring_key
- try:
- ref_pw = self.get_password(
- 'keyring_setting',
- 'pass_ref')
- print 'ref pw ', ref_pw
- assert ref_pw == "pass_ref_value"
- except AssertionError:
- self._lock()
- raise ValueError('Incorrect password')
-
-
-def leap_set_password(key, value, seed="xxx"):
- key, value = map(unicode, (key, value))
- keyring.set_keyring(LeapCryptedFileKeyring(seed=seed))
- keyring.set_password('leap', key, value)
-
-
-def leap_get_password(key, seed="xxx"):
- keyring.set_keyring(LeapCryptedFileKeyring(seed=seed))
- #import ipdb;ipdb.set_trace()
- return keyring.get_password('leap', key)
-
-
-if __name__ == "__main__":
- leap_set_password('test', 'bar')
- passwd = leap_get_password('test')
- assert passwd == 'bar'
diff --git a/src/leap/crypto/srpauth.py b/src/leap/crypto/srpauth.py
new file mode 100644
index 00000000..0e95ae64
--- /dev/null
+++ b/src/leap/crypto/srpauth.py
@@ -0,0 +1,496 @@
+# -*- coding: utf-8 -*-
+# srpauth.py
+# Copyright (C) 2013 LEAP
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see <http://www.gnu.org/licenses/>.
+
+import binascii
+import logging
+
+import requests
+import srp
+import json
+
+#this error is raised from requests
+from simplejson.decoder import JSONDecodeError
+from functools import partial
+
+from PySide import QtCore
+from twisted.internet import threads
+
+from leap.common.check import leap_assert
+from leap.util.request_helpers import get_content
+from leap.common.events import signal as events_signal
+from leap.common.events import events_pb2 as proto
+
+logger = logging.getLogger(__name__)
+
+
+class SRPAuthenticationError(Exception):
+ """
+ Exception raised for authentication errors
+ """
+ pass
+
+
+class SRPAuth(QtCore.QObject):
+ """
+ SRPAuth singleton
+ """
+
+ class __impl(QtCore.QObject):
+ """
+ Implementation of the SRPAuth interface
+ """
+
+ LOGIN_KEY = "login"
+ A_KEY = "A"
+ CLIENT_AUTH_KEY = "client_auth"
+ SESSION_ID_KEY = "_session_id"
+
+ def __init__(self, provider_config):
+ """
+ Constructor for SRPAuth implementation
+
+ :param server: Server to which we will authenticate
+ :type server: str
+ """
+ QtCore.QObject.__init__(self)
+
+ leap_assert(provider_config,
+ "We need a provider config to authenticate")
+
+ self._provider_config = provider_config
+
+ # **************************************************** #
+ # Dependency injection helpers, override this for more
+ # granular testing
+ self._fetcher = requests
+ self._srp = srp
+ self._hashfun = self._srp.SHA256
+ self._ng = self._srp.NG_1024
+ # **************************************************** #
+
+ self._session = self._fetcher.session()
+ self._session_id = None
+ self._session_id_lock = QtCore.QMutex()
+ self._uid = None
+ self._uid_lock = QtCore.QMutex()
+ self._token = None
+ self._token_lock = QtCore.QMutex()
+
+ self._srp_user = None
+ self._srp_a = None
+
+ def _safe_unhexlify(self, val):
+ """
+ Rounds the val to a multiple of 2 and returns the
+ unhexlified value
+
+ :param val: hexlified value
+ :type val: str
+
+ :rtype: binary hex data
+ :return: unhexlified val
+ """
+ return binascii.unhexlify(val) \
+ if (len(val) % 2 == 0) else binascii.unhexlify('0' + val)
+
+ def _authentication_preprocessing(self, username, password):
+ """
+ Generates the SRP.User to get the A SRP parameter
+
+ :param username: username to login
+ :type username: str
+ :param password: password for the username
+ :type password: str
+ """
+ logger.debug("Authentication preprocessing...")
+ self._srp_user = self._srp.User(username,
+ password,
+ self._hashfun,
+ self._ng)
+ _, A = self._srp_user.start_authentication()
+
+ self._srp_a = A
+
+ def _start_authentication(self, _, username, password):
+ """
+ Sends the first request for authentication to retrieve the
+ salt and B parameter
+
+ Might raise SRPAuthenticationError
+
+ :param _: IGNORED, output from the previous callback (None)
+ :type _: IGNORED
+ :param username: username to login
+ :type username: str
+ :param password: password for the username
+ :type password: str
+
+ :return: salt and B parameters
+ :rtype: tuple
+ """
+ logger.debug("Starting authentication process...")
+ try:
+ auth_data = {
+ self.LOGIN_KEY: username,
+ self.A_KEY: binascii.hexlify(self._srp_a)
+ }
+ sessions_url = "%s/%s/%s/" % \
+ (self._provider_config.get_api_uri(),
+ self._provider_config.get_api_version(),
+ "sessions")
+ init_session = self._session.post(sessions_url,
+ data=auth_data,
+ verify=self._provider_config.
+ get_ca_cert_path())
+ except requests.exceptions.ConnectionError as e:
+ logger.error("No connection made (salt): %r" %
+ (e,))
+ raise SRPAuthenticationError("Could not establish a "
+ "connection")
+ except Exception as e:
+ logger.error("Unknown error: %r" % (e,))
+ raise SRPAuthenticationError("Unknown error: %r" %
+ (e,))
+
+ content, mtime = get_content(init_session)
+
+ if init_session.status_code not in (200,):
+ logger.error("No valid response (salt): "
+ "Status code = %r. Content: %r" %
+ (init_session.status_code, content))
+ if init_session.status_code == 422:
+ raise SRPAuthenticationError(self.tr("Unknown user"))
+
+ json_content = json.loads(content)
+ salt = json_content.get("salt", None)
+ B = json_content.get("B", None)
+
+ if salt is None:
+ logger.error("No salt parameter sent")
+ raise SRPAuthenticationError(self.tr("The server did not send "
+ "the salt parameter"))
+ if B is None:
+ logger.error("No B parameter sent")
+ raise SRPAuthenticationError(self.tr("The server did not send "
+ "the B parameter"))
+
+ return salt, B
+
+ def _process_challenge(self, salt_B, username):
+ """
+ Given the salt and B processes the auth challenge and
+ generates the M2 parameter
+
+ Might throw SRPAuthenticationError
+
+ :param salt_B: salt and B parameters for the username
+ :type salt_B: tuple
+ :param username: username for this session
+ :type username: str
+
+ :return: the M2 SRP parameter
+ :rtype: str
+ """
+ logger.debug("Processing challenge...")
+ try:
+ salt, B = salt_B
+ unhex_salt = self._safe_unhexlify(salt)
+ unhex_B = self._safe_unhexlify(B)
+ except TypeError as e:
+ logger.error("Bad data from server: %r" % (e,))
+ raise SRPAuthenticationError(self.tr("The data sent from "
+ "the server had errors"))
+ M = self._srp_user.process_challenge(unhex_salt, unhex_B)
+
+ auth_url = "%s/%s/%s/%s" % (self._provider_config.get_api_uri(),
+ self._provider_config.
+ get_api_version(),
+ "sessions",
+ username)
+
+ auth_data = {
+ self.CLIENT_AUTH_KEY: binascii.hexlify(M)
+ }
+
+ try:
+ auth_result = self._session.put(auth_url,
+ data=auth_data,
+ verify=self._provider_config.
+ get_ca_cert_path())
+ except requests.exceptions.ConnectionError as e:
+ logger.error("No connection made (HAMK): %r" % (e,))
+ raise SRPAuthenticationError(self.tr("Could not connect to "
+ "the server"))
+
+ try:
+ content, mtime = get_content(auth_result)
+ except JSONDecodeError:
+ raise SRPAuthenticationError("Bad JSON content in auth result")
+
+ if auth_result.status_code == 422:
+ error = ""
+ try:
+ error = json.loads(content).get("errors", "")
+ except ValueError:
+ logger.error("Problem parsing the received response: %s"
+ % (content,))
+ except AttributeError:
+ logger.error("Expecting a dict but something else was "
+ "received: %s", (content,))
+ logger.error("[%s] Wrong password (HAMK): [%s]" %
+ (auth_result.status_code, error))
+ raise SRPAuthenticationError(self.tr("Wrong password"))
+
+ if auth_result.status_code not in (200,):
+ logger.error("No valid response (HAMK): "
+ "Status code = %s. Content = %r" %
+ (auth_result.status_code, content))
+ raise SRPAuthenticationError(self.tr("Unknown error (%s)") %
+ (auth_result.status_code,))
+
+ json_content = json.loads(content)
+
+ try:
+ M2 = json_content.get("M2", None)
+ uid = json_content.get("id", None)
+ token = json_content.get("token", None)
+ except Exception as e:
+ logger.error(e)
+ raise Exception("Something went wrong with the login")
+
+ events_signal(proto.CLIENT_UID, content=uid)
+
+ self.set_uid(uid)
+ self.set_token(token)
+
+ if M2 is None or self.get_uid() is None:
+ logger.error("Something went wrong. Content = %r" %
+ (content,))
+ raise SRPAuthenticationError(self.tr("Problem getting data "
+ "from server"))
+
+ return M2
+
+ def _verify_session(self, M2):
+ """
+ Verifies the session based on the M2 parameter. If the
+ verification succeeds, it sets the session_id for this
+ session
+
+ Might throw SRPAuthenticationError
+
+ :param M2: M2 SRP parameter
+ :type M2: str
+ """
+ logger.debug("Verifying session...")
+ try:
+ unhex_M2 = self._safe_unhexlify(M2)
+ except TypeError:
+ logger.error("Bad data from server (HAWK)")
+ raise SRPAuthenticationError(self.tr("Bad data from server"))
+
+ self._srp_user.verify_session(unhex_M2)
+
+ if not self._srp_user.authenticated():
+ logger.error("Auth verification failed")
+ raise SRPAuthenticationError(self.tr("Auth verification "
+ "failed"))
+ logger.debug("Session verified.")
+
+ session_id = self._session.cookies.get(self.SESSION_ID_KEY, None)
+ if not session_id:
+ logger.error("Bad cookie from server (missing _session_id)")
+ raise SRPAuthenticationError(self.tr("Session cookie "
+ "verification "
+ "failed"))
+
+ events_signal(proto.CLIENT_SESSION_ID, content=session_id)
+
+ self.set_session_id(session_id)
+
+ def _threader(self, cb, res, *args, **kwargs):
+ return threads.deferToThread(cb, res, *args, **kwargs)
+
+ def authenticate(self, username, password):
+ """
+ Executes the whole authentication process for a user
+
+ Might raise SRPAuthenticationError
+
+ :param username: username for this session
+ :type username: str
+ :param password: password for this user
+ :type password: str
+
+ :returns: A defer on a different thread
+ :rtype: twisted.internet.defer.Deferred
+ """
+ leap_assert(self.get_session_id() is None, "Already logged in")
+
+ d = threads.deferToThread(self._authentication_preprocessing,
+ username=username,
+ password=password)
+
+ d.addCallback(
+ partial(self._threader,
+ self._start_authentication),
+ username=username,
+ password=password)
+ d.addCallback(
+ partial(self._threader,
+ self._process_challenge),
+ username=username)
+ d.addCallback(partial(self._threader,
+ self._verify_session))
+
+ return d
+
+ def logout(self):
+ """
+ Logs out the current session.
+ Expects a session_id to exists, might raise AssertionError
+ """
+ logger.debug("Starting logout...")
+
+ leap_assert(self.get_session_id(),
+ "Cannot logout an unexisting session")
+
+ logout_url = "%s/%s/%s/" % (self._provider_config.get_api_uri(),
+ self._provider_config.
+ get_api_version(),
+ "sessions")
+ try:
+ self._session.delete(logout_url,
+ data=self.get_session_id(),
+ verify=self._provider_config.
+ get_ca_cert_path())
+ except Exception as e:
+ logger.warning("Something went wrong with the logout: %r" %
+ (e,))
+
+ self.set_session_id(None)
+ self.set_uid(None)
+ # Also reset the session
+ self._session = self._fetcher.session()
+ logger.debug("Successfully logged out.")
+
+ def set_session_id(self, session_id):
+ QtCore.QMutexLocker(self._session_id_lock)
+ self._session_id = session_id
+
+ def get_session_id(self):
+ QtCore.QMutexLocker(self._session_id_lock)
+ return self._session_id
+
+ def set_uid(self, uid):
+ QtCore.QMutexLocker(self._uid_lock)
+ self._uid = uid
+
+ def get_uid(self):
+ QtCore.QMutexLocker(self._uid_lock)
+ return self._uid
+
+ def set_token(self, token):
+ QtCore.QMutexLocker(self._token_lock)
+ self._token = token
+
+ def get_token(self):
+ QtCore.QMutexLocker(self._token_lock)
+ return self._token
+
+ __instance = None
+
+ authentication_finished = QtCore.Signal(bool, str)
+ logout_finished = QtCore.Signal(bool, str)
+
+ def __init__(self, provider_config):
+ """
+ Creates a singleton instance if needed
+ """
+ QtCore.QObject.__init__(self)
+
+ # Check whether we already have an instance
+ if SRPAuth.__instance is None:
+ # Create and remember instance
+ SRPAuth.__instance = SRPAuth.__impl(provider_config)
+
+ # Store instance reference as the only member in the handle
+ self.__dict__['_SRPAuth__instance'] = SRPAuth.__instance
+
+ self._username = None
+ self._password = None
+
+ def authenticate(self, username, password):
+ """
+ Executes the whole authentication process for a user
+
+ Might raise SRPAuthenticationError
+
+ :param username: username for this session
+ :type username: str
+ :param password: password for this user
+ :type password: str
+ """
+
+ d = self.__instance.authenticate(username, password)
+ d.addCallback(self._gui_notify)
+ d.addErrback(self._errback)
+ return d
+
+ def _gui_notify(self, _):
+ """
+ Callback that notifies the UI with the proper signal.
+
+ :param _: IGNORED, output from the previous callback (None)
+ :type _: IGNORED
+ """
+ logger.debug("Successful login!")
+ self.authentication_finished.emit(True, self.tr("Succeeded"))
+
+ def _errback(self, failure):
+ """
+ General errback for the whole login process. Will notify the
+ UI with the proper signal.
+
+ :param failure: Failure object captured from a callback.
+ :type failure: twisted.python.failure.Failure
+ """
+ logger.error("Error logging in %s" % (failure,))
+ self.authentication_finished.emit(False, "%s" % (failure.value,))
+ failure.trap(Exception)
+
+ def get_session_id(self):
+ return self.__instance.get_session_id()
+
+ def get_uid(self):
+ return self.__instance.get_uid()
+
+ def get_token(self):
+ return self.__instance.get_token()
+
+ def logout(self):
+ """
+ Logs out the current session.
+ Expects a session_id to exists, might raise AssertionError
+ """
+ try:
+ self.__instance.logout()
+ self.logout_finished.emit(True, self.tr("Succeeded"))
+ return True
+ except Exception as e:
+ self.logout_finished.emit(False, "%s" % (e,))
+ return False
diff --git a/src/leap/crypto/srpregister.py b/src/leap/crypto/srpregister.py
new file mode 100644
index 00000000..07b3c917
--- /dev/null
+++ b/src/leap/crypto/srpregister.py
@@ -0,0 +1,163 @@
+# -*- coding: utf-8 -*-
+# srpregister.py
+# Copyright (C) 2013 LEAP
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see <http://www.gnu.org/licenses/>.
+
+import binascii
+import logging
+
+import requests
+import srp
+
+from PySide import QtCore
+from urlparse import urlparse
+
+from leap.config.providerconfig import ProviderConfig
+from leap.crypto.constants import SIGNUP_TIMEOUT
+from leap.common.check import leap_assert, leap_assert_type
+
+logger = logging.getLogger(__name__)
+
+
+class SRPRegister(QtCore.QObject):
+ """
+ Registers a user to a specific provider using SRP
+ """
+
+ USER_LOGIN_KEY = 'user[login]'
+ USER_VERIFIER_KEY = 'user[password_verifier]'
+ USER_SALT_KEY = 'user[password_salt]'
+
+ registration_finished = QtCore.Signal(bool, object)
+
+ def __init__(self,
+ provider_config=None,
+ register_path="users"):
+ """
+ Constructor
+
+ :param provider_config: provider configuration instance,
+ properly loaded
+ :type privider_config: ProviderConfig
+ :param register_path: webapp path for registering users
+ :type register_path; str
+ """
+ QtCore.QObject.__init__(self)
+ leap_assert(provider_config, "Please provide a provider")
+ leap_assert_type(provider_config, ProviderConfig)
+
+ self._provider_config = provider_config
+
+ # **************************************************** #
+ # Dependency injection helpers, override this for more
+ # granular testing
+ self._fetcher = requests
+ self._srp = srp
+ self._hashfun = self._srp.SHA256
+ self._ng = self._srp.NG_1024
+ # **************************************************** #
+
+ parsed_url = urlparse(provider_config.get_api_uri())
+ self._provider = parsed_url.hostname
+ self._port = parsed_url.port
+ if self._port is None:
+ self._port = "443"
+
+ self._register_path = register_path
+
+ self._session = self._fetcher.session()
+
+ def _get_registration_uri(self):
+ """
+ Returns the URI where the register request should be made for
+ the provider
+
+ :rtype: str
+ """
+
+ uri = "https://%s:%s/%s/%s" % (
+ self._provider,
+ self._port,
+ self._provider_config.get_api_version(),
+ self._register_path)
+
+ return uri
+
+ def register_user(self, username, password):
+ """
+ Registers a user with the validator based on the password provider
+
+ :param username: username to register
+ :type username: str
+ :param password: password for this username
+ :type password: str
+
+ :rtype: tuple
+ :rparam: (ok, request)
+ """
+ salt, verifier = self._srp.create_salted_verification_key(
+ username,
+ password,
+ self._hashfun,
+ self._ng)
+
+ user_data = {
+ self.USER_LOGIN_KEY: username,
+ self.USER_VERIFIER_KEY: binascii.hexlify(verifier),
+ self.USER_SALT_KEY: binascii.hexlify(salt)
+ }
+
+ uri = self._get_registration_uri()
+
+ logger.debug('Post to uri: %s' % uri)
+ logger.debug("Will try to register user = %s" % (username,))
+ logger.debug("user_data => %r" % (user_data,))
+
+ ok = None
+ try:
+ req = self._session.post(uri,
+ data=user_data,
+ timeout=SIGNUP_TIMEOUT,
+ verify=self._provider_config.
+ get_ca_cert_path())
+
+ except requests.exceptions.SSLError as exc:
+ logger.error("SSLError: %s" % exc.message)
+ req = None
+ ok = False
+ else:
+ ok = req.ok
+ self.registration_finished.emit(ok, req)
+ return ok
+
+
+if __name__ == "__main__":
+ logger = logging.getLogger(name='leap')
+ logger.setLevel(logging.DEBUG)
+ console = logging.StreamHandler()
+ console.setLevel(logging.DEBUG)
+ formatter = logging.Formatter(
+ '%(asctime)s '
+ '- %(name)s - %(levelname)s - %(message)s')
+ console.setFormatter(formatter)
+ logger.addHandler(console)
+
+ provider = ProviderConfig()
+
+ if provider.load("leap/providers/bitmask.net/provider.json"):
+ register = SRPRegister(provider_config=provider)
+ print "Registering user..."
+ print register.register_user("test1", "sarasaaaa")
+ print register.register_user("test2", "sarasaaaa")
diff --git a/src/leap/crypto/tests/__init__.py b/src/leap/crypto/tests/__init__.py
index e69de29b..7f118735 100644
--- a/src/leap/crypto/tests/__init__.py
+++ b/src/leap/crypto/tests/__init__.py
@@ -0,0 +1,16 @@
+# -*- coding: utf-8 -*-
+# __init__.py
+# Copyright (C) 2013 LEAP
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see <http://www.gnu.org/licenses/>.
diff --git a/src/leap/crypto/tests/eip-service.json b/src/leap/crypto/tests/eip-service.json
new file mode 100644
index 00000000..24df42a2
--- /dev/null
+++ b/src/leap/crypto/tests/eip-service.json
@@ -0,0 +1,43 @@
+{
+ "gateways": [
+ {
+ "capabilities": {
+ "adblock": false,
+ "filter_dns": false,
+ "limited": true,
+ "ports": [
+ "1194",
+ "443",
+ "53",
+ "80"
+ ],
+ "protocols": [
+ "tcp",
+ "udp"
+ ],
+ "transport": [
+ "openvpn"
+ ],
+ "user_ips": false
+ },
+ "host": "harrier.cdev.bitmask.net",
+ "ip_address": "199.254.238.50",
+ "location": "seattle__wa"
+ }
+ ],
+ "locations": {
+ "seattle__wa": {
+ "country_code": "US",
+ "hemisphere": "N",
+ "name": "Seattle, WA",
+ "timezone": "-7"
+ }
+ },
+ "openvpn_configuration": {
+ "auth": "SHA1",
+ "cipher": "AES-128-CBC",
+ "tls-cipher": "DHE-RSA-AES128-SHA"
+ },
+ "serial": 1,
+ "version": 1
+} \ No newline at end of file
diff --git a/src/leap/crypto/tests/fake_provider.py b/src/leap/crypto/tests/fake_provider.py
new file mode 100755
index 00000000..54af485d
--- /dev/null
+++ b/src/leap/crypto/tests/fake_provider.py
@@ -0,0 +1,376 @@
+#!/usr/bin/env python
+# -*- coding: utf-8 -*-
+# fake_provider.py
+# Copyright (C) 2013 LEAP
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see <http://www.gnu.org/licenses/>.
+"""A server faking some of the provider resources and apis,
+used for testing Leap Client requests
+
+It needs that you create a subfolder named 'certs',
+and that you place the following files:
+
+XXX check if in use
+
+[ ] test-openvpn.pem
+[ ] test-provider.json
+[ ] test-eip-service.json
+"""
+import binascii
+import json
+import os
+import sys
+import time
+
+import srp
+
+from OpenSSL import SSL
+
+from zope.interface import Interface, Attribute, implements
+
+from twisted.web.server import Site, Request
+from twisted.web.static import File, Data
+from twisted.web.resource import Resource
+from twisted.internet import reactor
+
+from leap.common.testing.https_server import where
+
+# See
+# http://twistedmatrix.com/documents/current/web/howto/web-in-60/index.html
+# for more examples
+
+"""
+Testing the FAKE_API:
+#####################
+
+ 1) register an user
+ >> curl -d "user[login]=me" -d "user[password_salt]=foo" \
+ -d "user[password_verifier]=beef" http://localhost:8000/1/users
+ << {"errors": null}
+
+ 2) check that if you try to register again, it will fail:
+ >> curl -d "user[login]=me" -d "user[password_salt]=foo" \
+ -d "user[password_verifier]=beef" http://localhost:8000/1/users
+ << {"errors": {"login": "already taken!"}}
+
+"""
+
+# Globals to mock user/sessiondb
+
+_USERDB = {}
+_SESSIONDB = {}
+
+_here = os.path.split(__file__)[0]
+
+
+safe_unhexlify = lambda x: binascii.unhexlify(x) \
+ if (len(x) % 2 == 0) else binascii.unhexlify('0' + x)
+
+
+class IUser(Interface):
+ """
+ Defines the User Interface
+ """
+ login = Attribute("User login.")
+ salt = Attribute("Password salt.")
+ verifier = Attribute("Password verifier.")
+ session = Attribute("Session.")
+ svr = Attribute("Server verifier.")
+
+
+class User(object):
+ """
+ User object.
+ We store it in our simple session mocks
+ """
+
+ implements(IUser)
+
+ def __init__(self, login, salt, verifier):
+ self.login = login
+ self.salt = salt
+ self.verifier = verifier
+ self.session = None
+ self.svr = None
+
+ def set_server_verifier(self, svr):
+ """
+ Adds a svr verifier object to this
+ User instance
+ """
+ self.svr = svr
+
+ def set_session(self, session):
+ """
+ Adds this instance of User to the
+ global session dict
+ """
+ _SESSIONDB[session] = self
+ self.session = session
+
+
+class FakeUsers(Resource):
+ """
+ Resource that handles user registration.
+ """
+
+ def __init__(self, name):
+ self.name = name
+
+ def render_POST(self, request):
+ """
+ Handles POST to the users api resource
+ Simulates a login.
+ """
+ args = request.args
+
+ login = args['user[login]'][0]
+ salt = args['user[password_salt]'][0]
+ verifier = args['user[password_verifier]'][0]
+
+ if login in _USERDB:
+ request.setResponseCode(422)
+ return "%s\n" % json.dumps(
+ {'errors': {'login': 'already taken!'}})
+
+ print '[server]', login, verifier, salt
+ user = User(login, salt, verifier)
+ _USERDB[login] = user
+ return json.dumps({'errors': None})
+
+
+def getSession(self, sessionInterface=None):
+ """
+ we overwrite twisted.web.server.Request.getSession method to
+ put the right cookie name in place
+ """
+ if not self.session:
+ #cookiename = b"_".join([b'TWISTED_SESSION'] + self.sitepath)
+ cookiename = b"_".join([b'_session_id'] + self.sitepath)
+ sessionCookie = self.getCookie(cookiename)
+ if sessionCookie:
+ try:
+ self.session = self.site.getSession(sessionCookie)
+ except KeyError:
+ pass
+ # if it still hasn't been set, fix it up.
+ if not self.session:
+ self.session = self.site.makeSession()
+ self.addCookie(cookiename, self.session.uid, path=b'/')
+ self.session.touch()
+ if sessionInterface:
+ return self.session.getComponent(sessionInterface)
+ return self.session
+
+
+def get_user(request):
+ """
+ Returns user from the session dict
+ """
+ login = request.args.get('login')
+ if login:
+ user = _USERDB.get(login[0], None)
+ if user:
+ return user
+
+ request.getSession = getSession.__get__(request, Request)
+ session = request.getSession()
+
+ user = _SESSIONDB.get(session, None)
+ return user
+
+
+class FakeSession(Resource):
+ def __init__(self, name):
+ """
+ Initializes session
+ """
+ self.name = name
+
+ def render_GET(self, request):
+ """
+ Handles GET requests.
+ """
+ return "%s\n" % json.dumps({'errors': None})
+
+ def render_POST(self, request):
+ """
+ Handles POST requests.
+ """
+ user = get_user(request)
+
+ if not user:
+ # XXX get real error from demo provider
+ return json.dumps({'errors': 'no such user'})
+
+ A = request.args['A'][0]
+
+ _A = safe_unhexlify(A)
+ _salt = safe_unhexlify(user.salt)
+ _verifier = safe_unhexlify(user.verifier)
+
+ svr = srp.Verifier(
+ user.login,
+ _salt,
+ _verifier,
+ _A,
+ hash_alg=srp.SHA256,
+ ng_type=srp.NG_1024)
+
+ s, B = svr.get_challenge()
+
+ _B = binascii.hexlify(B)
+
+ print '[server] login = %s' % user.login
+ print '[server] salt = %s' % user.salt
+ print '[server] len(_salt) = %s' % len(_salt)
+ print '[server] vkey = %s' % user.verifier
+ print '[server] len(vkey) = %s' % len(_verifier)
+ print '[server] s = %s' % binascii.hexlify(s)
+ print '[server] B = %s' % _B
+ print '[server] len(B) = %s' % len(_B)
+
+ # override Request.getSession
+ request.getSession = getSession.__get__(request, Request)
+ session = request.getSession()
+
+ user.set_session(session)
+ user.set_server_verifier(svr)
+
+ # yep, this is tricky.
+ # some things are *already* unhexlified.
+ data = {
+ 'salt': user.salt,
+ 'B': _B,
+ 'errors': None}
+
+ return json.dumps(data)
+
+ def render_PUT(self, request):
+ """
+ Handles PUT requests.
+ """
+ # XXX check session???
+ user = get_user(request)
+
+ if not user:
+ print '[server] NO USER'
+ return json.dumps({'errors': 'no such user'})
+
+ data = request.content.read()
+ auth = data.split("client_auth=")
+ M = auth[1] if len(auth) > 1 else None
+ # if not H, return
+ if not M:
+ return json.dumps({'errors': 'no M proof passed by client'})
+
+ svr = user.svr
+ HAMK = svr.verify_session(binascii.unhexlify(M))
+ if HAMK is None:
+ print '[server] verification failed!!!'
+ raise Exception("Authentication failed!")
+ #import ipdb;ipdb.set_trace()
+
+ assert svr.authenticated()
+ print "***"
+ print '[server] User successfully authenticated using SRP!'
+ print "***"
+
+ return json.dumps(
+ {'M2': binascii.hexlify(HAMK),
+ 'id': '9c943eb9d96a6ff1b7a7030bdeadbeef',
+ 'errors': None})
+
+
+class API_Sessions(Resource):
+ """
+ Top resource for the API v1
+ """
+ def getChild(self, name, request):
+ return FakeSession(name)
+
+
+class FileModified(File):
+ def render_GET(self, request):
+ since = request.getHeader('if-modified-since')
+ if since:
+ tsince = time.strptime(since.replace(" GMT", ""))
+ tfrom = time.strptime(time.ctime(os.path.getmtime(self.path)))
+ if tfrom > tsince:
+ return File.render_GET(self, request)
+ else:
+ request.setResponseCode(304)
+ return ""
+ return File.render_GET(self, request)
+
+
+class OpenSSLServerContextFactory(object):
+
+ def getContext(self):
+ """
+ Create an SSL context.
+ """
+ ctx = SSL.Context(SSL.SSLv23_METHOD)
+ #ctx = SSL.Context(SSL.TLSv1_METHOD)
+ ctx.use_certificate_file(where('leaptestscert.pem'))
+ ctx.use_privatekey_file(where('leaptestskey.pem'))
+
+ return ctx
+
+
+def get_provider_factory():
+ """
+ Instantiates a Site that serves the resources
+ that we expect from a valid provider.
+ Listens on:
+ * port 8000 for http connections
+ * port 8443 for https connections
+
+ :rparam: factory for a site
+ :rtype: Site instance
+ """
+ root = Data("", "")
+ root.putChild("", root)
+ root.putChild("provider.json", FileModified(
+ os.path.join(_here,
+ "test_provider.json")))
+ config = Resource()
+ config.putChild(
+ "eip-service.json",
+ FileModified(
+ os.path.join(_here, "eip-service.json")))
+ apiv1 = Resource()
+ apiv1.putChild("config", config)
+ apiv1.putChild("sessions", API_Sessions())
+ apiv1.putChild("users", FakeUsers(None))
+ apiv1.putChild("cert", FileModified(
+ os.path.join(_here,
+ 'openvpn.pem')))
+ root.putChild("1", apiv1)
+
+ factory = Site(root)
+ return factory
+
+
+if __name__ == "__main__":
+
+ from twisted.python import log
+ log.startLogging(sys.stdout)
+
+ factory = get_provider_factory()
+
+ # regular http (for debugging with curl)
+ reactor.listenTCP(8000, factory)
+ reactor.listenSSL(8443, factory, OpenSSLServerContextFactory())
+ reactor.run()
diff --git a/src/leap/crypto/tests/openvpn.pem b/src/leap/crypto/tests/openvpn.pem
new file mode 100644
index 00000000..a95e9370
--- /dev/null
+++ b/src/leap/crypto/tests/openvpn.pem
@@ -0,0 +1,33 @@
+-----BEGIN CERTIFICATE-----
+MIIFtTCCA52gAwIBAgIJAIGJ8Dg+DtemMA0GCSqGSIb3DQEBBQUAMEUxCzAJBgNV
+BAYTAkFVMRMwEQYDVQQIEwpTb21lLVN0YXRlMSEwHwYDVQQKExhJbnRlcm5ldCBX
+aWRnaXRzIFB0eSBMdGQwHhcNMTMwNjI2MjAyMDIyWhcNMTgwNjI2MjAyMDIyWjBF
+MQswCQYDVQQGEwJBVTETMBEGA1UECBMKU29tZS1TdGF0ZTEhMB8GA1UEChMYSW50
+ZXJuZXQgV2lkZ2l0cyBQdHkgTHRkMIICIjANBgkqhkiG9w0BAQEFAAOCAg8AMIIC
+CgKCAgEAxJaN0lWjFu+3j48c0WG8BvmPUf026Xli5d5NE4EjGsirwfre0oTeWZT9
+WRxqLGd2wDh6Mc9r6UqH6dwqLZKbsgwB5zI2lag7UWFttJF1U1c6AJynhaLMoy73
+sL9USTmQ57iYRFrVP/nGj9/L6I1XnV6midPi7a5aZreH9q8dWaAhmc9eFDU+Y4vS
+sTFS6aomajLrI6YWo5toKqLq8IMryD03IM78a7gJtLgfWs+pYZRUBlM5JaYX98eX
+mVPAYYH9krWxLVN3hTt1ngECzK+epo275zQJh960/2fNCfVJSXqSXcficLs+bR7t
+FEkNuOP1hFV6LuoLL+k5Su+hp5kXMYZTvYYDpW4nPJoBdSG1w5O5IxO6zh+9VLB7
+oLrlgoyWvBoou5coCBpZVU6UyWcOx58kuZF8wNr0GgdvWAFwOGVuVG5jmcVdhaKC
+0C8NxHrxlhcrcp0zwtDaOxfmZfcxiXs35iwUip5vS18Nv+XBK8ad9T79Ox8nSzP3
+RGPVDpExz7gPbZglqSe47XBIk0ZuIzgOgYpJj4JrpoewoIYb+OmUgI7UZjoGsMrV
++B2BqOKs7kF0HW3i5bR9YAi0ZYvnhQgjBtwCKm4zvLqwuPZHz9VWgIk6uezgStCP
+WyzQ8IcopK49fOjcKa6JT5JRU+27paIZf1BkQsTkJy/Nti4TvwMCAwEAAaOBpzCB
+pDAdBgNVHQ4EFgQUEgXSd3Yl3xAzbkWa7xeNe27d99cwdQYDVR0jBG4wbIAUEgXS
+d3Yl3xAzbkWa7xeNe27d99ehSaRHMEUxCzAJBgNVBAYTAkFVMRMwEQYDVQQIEwpT
+b21lLVN0YXRlMSEwHwYDVQQKExhJbnRlcm5ldCBXaWRnaXRzIFB0eSBMdGSCCQCB
+ifA4Pg7XpjAMBgNVHRMEBTADAQH/MA0GCSqGSIb3DQEBBQUAA4ICAQA6Vl9Ve4Qe
+ewzXAxr0BabFRhtIuF7DV+/niT46qJhW2KgYe6rwZqdAhEbgH3kTPJ5JmmcUnAEH
+nmrfoku/YAb5ObfdHUACsHy4cvSvFwBUQ9vXP6+oOFJhrGW4uzRI2pHGvnqB3lQ0
+JEPmPwduBCI5reRYauPbd4Wl4VhLGrjELb4JQZL24Q5ehXMnv415m7+aMkLzT2IA
+p6B2xgRR+JAeUdyCNOV1f5AqJWyAUJPWGR0e1OTKNfc49+2skK0NmzrpGsoktSHa
+uN6vGBCVGiZh7BTYblWMG5q9Am7idcdmC2fdpIf5yj7CKzV7WIPxPs0I7TuRcr41
+pUBLCAElcyCPB89lySol2BDs4gk4wZs4y2shUs3o0+mIpw/6o8tQF/9IL8ALkLqr
+q9SuND7O1RXcg74o3HeVmRKtoI/KdgaVhJ0rFvcq83ftfu3KMyWB6SOKOu6ZYON8
+AcSjsDDpnDrwGFvjAYHiTkS9NaaJC1/g7Y6jjhxmbTkXPA6V8MvLKQiOvqk/9gCh
+85FHsFkElIYnH6fbHIRxg20cnqmddTd+H5HgBIlhiKWuydtuoQFwzR/D3ypgLBaB
+OWLcBP7I+RYhKlJFIWnfiyB0xbyI4W/UfL8p8jQI8TE9oIlm3WqxJXfebDEDEstj
+8nS4Fb3G5Wr4pZMjfbtmBSAgHeWH6B90jg==
+-----END CERTIFICATE-----
diff --git a/src/leap/crypto/tests/test_certs.py b/src/leap/crypto/tests/test_certs.py
deleted file mode 100644
index e476b630..00000000
--- a/src/leap/crypto/tests/test_certs.py
+++ /dev/null
@@ -1,22 +0,0 @@
-import unittest
-
-from leap.testing.https_server import where
-from leap.crypto import certs
-
-
-class CertTestCase(unittest.TestCase):
-
- def test_can_load_client_and_pkey(self):
- with open(where('leaptestscert.pem')) as cf:
- cs = cf.read()
- with open(where('leaptestskey.pem')) as kf:
- ks = kf.read()
- certs.can_load_cert_and_pkey(cs + ks)
-
- with self.assertRaises(certs.BadCertError):
- # screw header
- certs.can_load_cert_and_pkey(cs.replace("BEGIN", "BEGINN") + ks)
-
-
-if __name__ == "__main__":
- unittest.main()
diff --git a/src/leap/crypto/tests/test_provider.json b/src/leap/crypto/tests/test_provider.json
new file mode 100644
index 00000000..c37bef8f
--- /dev/null
+++ b/src/leap/crypto/tests/test_provider.json
@@ -0,0 +1,15 @@
+{
+ "api_uri": "https://localhost:8443",
+ "api_version": "1",
+ "ca_cert_fingerprint": "SHA256: 0f17c033115f6b76ff67871872303ff65034efe7dd1b910062ca323eb4da5c7e",
+ "ca_cert_uri": "https://bitmask.net/ca.crt",
+ "default_language": "en",
+ "domain": "example.com",
+ "enrollment_policy": "open",
+ "name": {
+ "en": "Bitmask"
+ },
+ "services": [
+ "openvpn"
+ ]
+}
diff --git a/src/leap/crypto/tests/test_srpregister.py b/src/leap/crypto/tests/test_srpregister.py
new file mode 100644
index 00000000..6d2b52e8
--- /dev/null
+++ b/src/leap/crypto/tests/test_srpregister.py
@@ -0,0 +1,202 @@
+# -*- coding: utf-8 -*-
+# test_srpregister.py
+# Copyright (C) 2013 LEAP
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see <http://www.gnu.org/licenses/>.
+"""
+Tests for:
+ * leap/crypto/srpregister.py
+ * leap/crypto/srpauth.py
+"""
+try:
+ import unittest2 as unittest
+except ImportError:
+ import unittest
+import os
+import sys
+
+from mock import MagicMock
+from nose.twistedtools import reactor, deferred
+from twisted.python import log
+from twisted.internet import threads
+
+from leap.common.testing.https_server import where
+from leap.config.providerconfig import ProviderConfig
+from leap.crypto import srpregister, srpauth
+from leap.crypto.tests import fake_provider
+
+log.startLogging(sys.stdout)
+
+
+def _get_capath():
+ return where("cacert.pem")
+
+_here = os.path.split(__file__)[0]
+
+
+class ImproperlyConfiguredError(Exception):
+ """
+ Raised if the test provider is missing configuration
+ """
+
+
+class SRPTestCase(unittest.TestCase):
+ """
+ Tests for the SRP Register and Auth classes
+ """
+ __name__ = "SRPRegister and SRPAuth tests"
+
+ @classmethod
+ def setUpClass(cls):
+ """
+ Sets up this TestCase with a simple and faked provider instance:
+
+ * runs a threaded reactor
+ * loads a mocked ProviderConfig that points to the certs in the
+ leap.common.testing module.
+ """
+ factory = fake_provider.get_provider_factory()
+ http = reactor.listenTCP(8001, factory)
+ https = reactor.listenSSL(
+ 0, factory,
+ fake_provider.OpenSSLServerContextFactory())
+ get_port = lambda p: p.getHost().port
+ cls.http_port = get_port(http)
+ cls.https_port = get_port(https)
+
+ provider = ProviderConfig()
+ provider.get_ca_cert_path = MagicMock()
+ provider.get_ca_cert_path.return_value = _get_capath()
+
+ provider.get_api_uri = MagicMock()
+ provider.get_api_uri.return_value = cls._get_https_uri()
+
+ loaded = provider.load(path=os.path.join(
+ _here, "test_provider.json"))
+ if not loaded:
+ raise ImproperlyConfiguredError(
+ "Could not load test provider config")
+ cls.register = srpregister.SRPRegister(provider_config=provider)
+
+ cls.auth = srpauth.SRPAuth(provider)
+
+ # helper methods
+
+ @classmethod
+ def _get_https_uri(cls):
+ """
+ Returns a https uri with the right https port initialized
+ """
+ return "https://localhost:%s" % (cls.https_port,)
+
+ # Register tests
+
+ def test_none_port(self):
+ provider = ProviderConfig()
+ provider.get_api_uri = MagicMock()
+ provider.get_api_uri.return_value = "http://localhost/"
+ loaded = provider.load(path=os.path.join(
+ _here, "test_provider.json"))
+ if not loaded:
+ raise ImproperlyConfiguredError(
+ "Could not load test provider config")
+
+ register = srpregister.SRPRegister(provider_config=provider)
+ self.assertEquals(register._port, "443")
+
+ @deferred()
+ def test_wrong_cert(self):
+ provider = ProviderConfig()
+ loaded = provider.load(path=os.path.join(
+ _here, "test_provider.json"))
+ provider.get_ca_cert_path = MagicMock()
+ provider.get_ca_cert_path.return_value = os.path.join(
+ _here,
+ "wrongcert.pem")
+ provider.get_api_uri = MagicMock()
+ provider.get_api_uri.return_value = self._get_https_uri()
+ if not loaded:
+ raise ImproperlyConfiguredError(
+ "Could not load test provider config")
+
+ register = srpregister.SRPRegister(provider_config=provider)
+ d = threads.deferToThread(register.register_user, "foouser_firsttime",
+ "barpass")
+ d.addCallback(self.assertFalse)
+ return d
+
+ @deferred()
+ def test_register_user(self):
+ """
+ Checks if the registration of an unused name works as expected when
+ it is the first time that we attempt to register that user, as well as
+ when we request a user that is taken.
+ """
+ # pristine registration
+ d = threads.deferToThread(self.register.register_user,
+ "foouser_firsttime",
+ "barpass")
+ d.addCallback(self.assertTrue)
+ return d
+
+ @deferred()
+ def test_second_register_user(self):
+ # second registration attempt with the same user should return errors
+ d = threads.deferToThread(self.register.register_user,
+ "foouser_second",
+ "barpass")
+ d.addCallback(self.assertTrue)
+
+ # FIXME currently we are catching this in an upper layer,
+ # we could bring the error validation to the SRPRegister class
+ def register_wrapper(_):
+ return threads.deferToThread(self.register.register_user,
+ "foouser_second",
+ "barpass")
+ d.addCallback(register_wrapper)
+ d.addCallback(self.assertFalse)
+ return d
+
+ @deferred()
+ def test_correct_http_uri(self):
+ """
+ Checks that registration autocorrect http uris to https ones.
+ """
+ HTTP_URI = "http://localhost:%s" % (self.https_port, )
+ HTTPS_URI = "https://localhost:%s/1/users" % (self.https_port, )
+ provider = ProviderConfig()
+ provider.get_ca_cert_path = MagicMock()
+ provider.get_ca_cert_path.return_value = _get_capath()
+ provider.get_api_uri = MagicMock()
+
+ # we introduce a http uri in the config file...
+ provider.get_api_uri.return_value = HTTP_URI
+ loaded = provider.load(path=os.path.join(
+ _here, "test_provider.json"))
+ if not loaded:
+ raise ImproperlyConfiguredError(
+ "Could not load test provider config")
+
+ register = srpregister.SRPRegister(provider_config=provider)
+
+ # ... and we check that we're correctly taking the HTTPS protocol
+ # instead
+ reg_uri = register._get_registration_uri()
+ self.assertEquals(reg_uri, HTTPS_URI)
+ register._get_registration_uri = MagicMock(return_value=HTTPS_URI)
+ d = threads.deferToThread(register.register_user, "test_failhttp",
+ "barpass")
+ d.addCallback(self.assertTrue)
+
+ return d
diff --git a/src/leap/crypto/tests/wrongcert.pem b/src/leap/crypto/tests/wrongcert.pem
new file mode 100644
index 00000000..e6cff38a
--- /dev/null
+++ b/src/leap/crypto/tests/wrongcert.pem
@@ -0,0 +1,33 @@
+-----BEGIN CERTIFICATE-----
+MIIFtTCCA52gAwIBAgIJAIWZus5EIXNtMA0GCSqGSIb3DQEBBQUAMEUxCzAJBgNV
+BAYTAkFVMRMwEQYDVQQIEwpTb21lLVN0YXRlMSEwHwYDVQQKExhJbnRlcm5ldCBX
+aWRnaXRzIFB0eSBMdGQwHhcNMTMwNjI1MTc0NjExWhcNMTgwNjI1MTc0NjExWjBF
+MQswCQYDVQQGEwJBVTETMBEGA1UECBMKU29tZS1TdGF0ZTEhMB8GA1UEChMYSW50
+ZXJuZXQgV2lkZ2l0cyBQdHkgTHRkMIICIjANBgkqhkiG9w0BAQEFAAOCAg8AMIIC
+CgKCAgEA2ObM7ESjyuxFZYD/Y68qOPQgjgggW+cdXfBpU2p4n7clsrUeMhWdW40Y
+77Phzor9VOeqs3ZpHuyLzsYVp/kFDm8tKyo2ah5fJwzL0VCSLYaZkUQQ7GNUmTCk
+furaxl8cQx/fg395V7/EngsS9B3/y5iHbctbA4MnH3jaotO5EGeo6hw7/eyCotQ9
+KbBV9GJMcY94FsXBCmUB+XypKklWTLhSaS6Cu4Fo8YLW6WmcnsyEOGS2F7WVf5at
+7CBWFQZHaSgIBLmc818/mDYCnYmCVMFn/6Ndx7V2NTlz+HctWrQn0dmIOnCUeCwS
+wXq9PnBR1rSx/WxwyF/WpyjOFkcIo7vm72kS70pfrYsXcZD4BQqkXYj3FyKnPt3O
+ibLKtCxL8/83wOtErPcYpG6LgFkgAAlHQ9MkUi5dbmjCJtpqQmlZeK1RALdDPiB3
+K1KZimrGsmcE624dJxUIOJJpuwJDy21F8kh5ZAsAtE1prWETrQYNElNFjQxM83rS
+ZR1Ql2MPSB4usEZT57+KvpEzlOnAT3elgCg21XrjSFGi14hCEao4g2OEZH5GAwm5
+frf6UlSRZ/g3tLTfI8Hv1prw15W2qO+7q7SBAplTODCRk+Yb0YoA2mMM/QXBUcXs
+vKEDLSSxzNIBi3T62l39RB/ml+gPKo87ZMDivex1ZhrcJc3Yu3sCAwEAAaOBpzCB
+pDAdBgNVHQ4EFgQUPjE+4pun+8FreIdpoR8v6N7xKtUwdQYDVR0jBG4wbIAUPjE+
+4pun+8FreIdpoR8v6N7xKtWhSaRHMEUxCzAJBgNVBAYTAkFVMRMwEQYDVQQIEwpT
+b21lLVN0YXRlMSEwHwYDVQQKExhJbnRlcm5ldCBXaWRnaXRzIFB0eSBMdGSCCQCF
+mbrORCFzbTAMBgNVHRMEBTADAQH/MA0GCSqGSIb3DQEBBQUAA4ICAQCpvCPdtvXJ
+muTj379TZuCJs7/l0FhA7AHa1WAlHjsXHaA7N0+3ZWAbdtXDsowal6S+ldgU/kfV
+Lq7NrRq+amJWC7SYj6cvVwhrSwSvu01fe/TWuOzHrRv1uTfJ/VXLonVufMDd9opo
+bhqYxMaxLdIx6t/MYmZH4Wpiq0yfZuv//M8i7BBl/qvaWbLhg0yVAKRwjFvf59h6
+6tRFCLddELOIhLDQtk8zMbioPEbfAlKdwwP8kYGtDGj6/9/YTd/oTKRdgHuwyup3
+m0L20Y6LddC+tb0WpK5EyrNbCbEqj1L4/U7r6f/FKNA3bx6nfdXbscaMfYonKAKg
+1cRrRg45sErmCz0QyTnWzXyvbjR4oQRzyW3kJ1JZudZ+AwOi00J5FYa3NiLuxl1u
+gIGKWSrASQWhEdpa1nlCgX7PhdaQgYjEMpQvA0GCA0OF5JDu8en1yZqsOt1hCLIN
+lkz/5jKPqrclY5hV99bE3hgCHRmIPNHCZG3wbZv2yJKxJX1YLMmQwAmSh2N7YwGG
+yXRvCxQs5ChPHyRairuf/5MZCZnSVb45ppTVuNUijsbflKRUgfj/XvfqQ22f+C9N
+Om2dmNvAiS2TOIfuP47CF2OUa5q4plUwmr+nyXQGM0SIoHNCj+MBdFfb3oxxAtI+
+SLhbnzQv5e84Doqz3YF0XW8jyR7q8GFLNA==
+-----END CERTIFICATE-----