summaryrefslogtreecommitdiff
path: root/src/leap/bitmask/auth/auth.py
diff options
context:
space:
mode:
Diffstat (limited to 'src/leap/bitmask/auth/auth.py')
-rw-r--r--src/leap/bitmask/auth/auth.py336
1 files changed, 336 insertions, 0 deletions
diff --git a/src/leap/bitmask/auth/auth.py b/src/leap/bitmask/auth/auth.py
new file mode 100644
index 00000000..03065571
--- /dev/null
+++ b/src/leap/bitmask/auth/auth.py
@@ -0,0 +1,336 @@
+import binascii
+import logging
+import os.path
+import requests
+import srp
+import json
+import re
+
+from requests.adapters import HTTPAdapter
+
+from leap.exceptions import (SRPAuthenticationError,
+ SRPAuthConnectionError,
+ SRPAuthBadStatusCode,
+ SRPAuthNoSalt,
+ SRPAuthNoB,
+ SRPAuthBadDataFromServer,
+ SRPAuthBadUserOrPassword,
+ SRPAuthVerificationFailed,
+ SRPAuthNoSessionId)
+
+from leap.srp_session import SRPSession
+
+logger = logging.getLogger(__name__)
+
+
+class SRPAuth(object):
+
+ def __init__(self, api_uri, verify_certificate=True, api_version=1):
+ self.api_uri = api_uri
+ self.api_version = api_version
+
+ if verify_certificate is None:
+ verify_certificate = True
+
+ if isinstance(verify_certificate, (str, unicode)) and not os.path.isfile(verify_certificate):
+ raise ValueError(
+ 'Path {0} is not a valid file'.format(verify_certificate))
+
+ self.verify_certificate = verify_certificate
+
+ def reset_session(self):
+ adapter = HTTPAdapter(max_retries=50)
+ self._session = requests.session()
+ self._session.mount('https://', adapter)
+ self.session_id = None
+
+ def _authentication_preprocessing(self, username, password):
+
+ logger.debug('Authentication preprocessing...')
+
+ user = srp.User(username.encode('utf-8'),
+ password.encode('utf-8'),
+ srp.SHA256, srp.NG_1024)
+ _, A = user.start_authentication()
+
+ return user, A
+
+ def _start_authentication(self, username, A):
+
+ logger.debug('Starting authentication process...')
+ try:
+ auth_data = {
+ 'login': username,
+ 'A': binascii.hexlify(A)
+ }
+ sessions_url = '%s/%s/%s/' % \
+ (self.api_uri,
+ self.api_version,
+ 'sessions')
+
+ verify_certificate = self.verify_certificate
+
+ init_session = self._session.post(sessions_url,
+ data=auth_data,
+ verify=verify_certificate,
+ timeout=30)
+ except requests.exceptions.ConnectionError as e:
+ logger.error('No connection made (salt): {0!r}'.format(e))
+ raise SRPAuthConnectionError()
+ except Exception as e:
+ logger.error('Unknown error: %r' % (e,))
+ raise SRPAuthenticationError()
+
+ if init_session.status_code not in (200,):
+ logger.error('No valid response (salt): '
+ 'Status code = %r. Content: %r' %
+ (init_session.status_code, init_session.content))
+ if init_session.status_code == 422:
+ logger.error('Invalid username or password.')
+ raise SRPAuthBadUserOrPassword()
+
+ logger.error('There was a problem with authentication.')
+ raise SRPAuthBadStatusCode()
+
+ json_content = json.loads(init_session.content)
+ salt = json_content.get('salt', None)
+ B = json_content.get('B', None)
+
+ if salt is None:
+ logger.error('The server didn\'t send the salt parameter.')
+ raise SRPAuthNoSalt()
+ if B is None:
+ logger.error('The server didn\'t send the B parameter.')
+ raise SRPAuthNoB()
+
+ return salt, B
+
+ def _process_challenge(self, user, salt_B, username):
+ logger.debug('Processing challenge...')
+ try:
+ salt, B = salt_B
+ unhex_salt = _safe_unhexlify(salt)
+ unhex_B = _safe_unhexlify(B)
+ except (TypeError, ValueError) as e:
+ logger.error('Bad data from server: %r' % (e,))
+ raise SRPAuthBadDataFromServer()
+ M = user.process_challenge(unhex_salt, unhex_B)
+
+ auth_url = '%s/%s/%s/%s' % (self.api_uri,
+ self.api_version,
+ 'sessions',
+ username)
+
+ auth_data = {
+ 'client_auth': binascii.hexlify(M)
+ }
+
+ try:
+ auth_result = self._session.put(auth_url,
+ data=auth_data,
+ verify=self.verify_certificate,
+ timeout=30)
+ except requests.exceptions.ConnectionError as e:
+ logger.error('No connection made (HAMK): %r' % (e,))
+ raise SRPAuthConnectionError()
+
+ if auth_result.status_code == 422:
+ error = ''
+ try:
+ error = json.loads(auth_result.content).get('errors', '')
+ except ValueError:
+ logger.error('Problem parsing the received response: %s'
+ % (auth_result.content,))
+ except AttributeError:
+ logger.error('Expecting a dict but something else was '
+ 'received: %s', (auth_result.content,))
+ logger.error('[%s] Wrong password (HAMK): [%s]' %
+ (auth_result.status_code, error))
+ raise SRPAuthBadUserOrPassword()
+
+ if auth_result.status_code not in (200,):
+ logger.error('No valid response (HAMK): '
+ 'Status code = %s. Content = %r' %
+ (auth_result.status_code, auth_result.content))
+ raise SRPAuthBadStatusCode()
+
+ return json.loads(auth_result.content)
+
+ def _extract_data(self, json_content):
+
+ try:
+ M2 = json_content.get('M2', None)
+ uuid = json_content.get('id', None)
+ token = json_content.get('token', None)
+ except Exception as e:
+ logger.error(e)
+ raise SRPAuthBadDataFromServer()
+
+ if M2 is None or uuid is None:
+ logger.error('Something went wrong. Content = %r' %
+ (json_content,))
+ raise SRPAuthBadDataFromServer()
+
+ return uuid, token, M2
+
+ def _verify_session(self, user, M2):
+
+ logger.debug('Verifying session...')
+ try:
+ unhex_M2 = _safe_unhexlify(M2)
+ except TypeError:
+ logger.error('Bad data from server (HAMK)')
+ raise SRPAuthBadDataFromServer()
+
+ user.verify_session(unhex_M2)
+
+ if not user.authenticated():
+ logger.error('Auth verification failed.')
+ raise SRPAuthVerificationFailed()
+ logger.debug('Session verified.')
+
+ session_id = self._session.cookies.get('_session_id', None)
+ if not session_id:
+ logger.error('Bad cookie from server (missing _session_id)')
+ raise SRPAuthNoSessionId()
+
+ logger.debug('SUCCESS LOGIN')
+ return session_id
+
+ def authenticate(self, username, password):
+
+ self.reset_session()
+
+ user, A = self._authentication_preprocessing(username, password)
+ salt_B = self._start_authentication(username, A)
+
+ json_content = self._process_challenge(user, salt_B, username)
+
+ uuid, token, M2 = self._extract_data(json_content)
+ session_id = self._verify_session(user, M2)
+
+ self.session_id = session_id
+
+ return SRPSession(username, token, uuid, session_id)
+
+ def logout(self):
+ logger.debug('Starting logout...')
+
+ if self.session_id is None:
+ logger.debug('Already logged out')
+ return
+
+ logout_url = '%s/%s/%s/' % (self.api_uri,
+ self.api_version,
+ 'logout')
+ try:
+ self._session.delete(logout_url,
+ data=self.session_id,
+ verify=self.verify_certificate,
+ timeout=30)
+ self.reset_session()
+ except Exception as e:
+ logger.warning('Something went wrong with the logout: %r' %
+ (e,))
+ raise
+ else:
+ logger.debug('Successfully logged out.')
+
+ def change_password(self,
+ username,
+ current_password,
+ new_password,
+ token,
+ uuid):
+
+ if self.session_id is None:
+ logger.debug('Already logged out')
+ return
+
+ url = '%s/%s/users/%s.json' % (
+ self.api_uri,
+ self.api_version,
+ uuid)
+
+ salt, verifier = srp.create_salted_verification_key(
+ username, new_password.encode('utf-8'),
+ srp.SHA256, srp.NG_1024)
+
+ cookies = {'_session_id': self.session_id}
+ headers = {
+ 'Authorization':
+ 'Token token={0}'.format(token)
+ }
+ user_data = {
+ 'user[password_verifier]': binascii.hexlify(verifier),
+ 'user[password_salt]': binascii.hexlify(salt)
+ }
+
+ change_password = self._session.put(
+ url, data=user_data,
+ verify=self.verify_certificate,
+ cookies=cookies,
+ timeout=30,
+ headers=headers)
+
+ change_password.raise_for_status()
+
+ def register(self, username, password):
+ self.reset_session()
+
+ username = username.encode('utf-8')
+ password = password.encode('utf-8')
+
+ validate_username(username)
+
+ salt, verifier = srp.create_salted_verification_key(
+ username,
+ password,
+ srp.SHA256,
+ srp.NG_1024)
+
+ user_data = {
+ 'user[login]': username,
+ 'user[password_verifier]': binascii.hexlify(verifier),
+ 'user[password_salt]': binascii.hexlify(salt)
+ }
+
+ url = "%s/%s/users" % (
+ self.api_uri,
+ self.api_version)
+
+ logger.debug("Registering user: %s" % username)
+
+ try:
+ response = self._session.post(
+ url,
+ data=user_data,
+ timeout=30,
+ verify=self.verify_certificate)
+
+ except requests.exceptions.RequestException as exc:
+ logger.error(exc.message)
+ raise
+
+ if not response.ok:
+ try:
+ json_content = json.loads(response.content)
+ error_msg = json_content.get("errors").get("login")[0]
+ if not error_msg.istitle():
+ error_msg = "%s %s" % (username, error_msg)
+ logger.error(error_msg)
+ except Exception as e:
+ logger.error("Unknown error: %s" % e.message)
+
+ return response.ok
+
+
+def _safe_unhexlify(val):
+ return binascii.unhexlify(val) \
+ if (len(val) % 2 == 0) else binascii.unhexlify('0' + val)
+
+
+def validate_username(username):
+ accepted_characters = '^[a-z0-9\-\_\.]*$'
+ if not re.match(accepted_characters, username):
+ raise ValueError('Only lowercase letters, digits, . - and _ allowed.')