summaryrefslogtreecommitdiff
path: root/service/app/bitmask_libraries/leap_srp.py
blob: a1de7de37900c0ae38bea0853ec8fb36ed25c070 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
import binascii
import json

from requests import Session
from srp import User, srp
from requests.exceptions import HTTPError, SSLError, Timeout
from config import SYSTEM_CA_BUNDLE


class LeapAuthException(Exception):
    """Raised when authentication against the LEAP API fails.

    Construction is inherited unchanged from ``Exception``: positional
    arguments end up in ``args``.
    """


class LeapSRPSession(object):
    """Plain holder for the values that identify a LEAP SRP session.

    Attributes mirror the constructor arguments one-to-one: ``user_name``,
    ``api_server_name``, ``uuid``, ``token``, ``session_id`` and
    ``api_version`` (defaults to ``'1'``).
    """

    def __init__(self, user_name, api_server_name, uuid, token, session_id, api_version='1'):
        self.user_name, self.api_server_name = user_name, api_server_name
        self.uuid, self.token, self.session_id = uuid, token, session_id
        self.api_version = api_version

    def __str__(self):
        # NOTE: the rendered text includes the token and the session id.
        fields = (
            self.user_name,
            self.api_server_name,
            self.uuid,
            self.token,
            self.session_id,
            self.api_version,
        )
        return 'LeapSRPSession(%s, %s, %s, %s, %s, %s)' % fields


class LeapSecureRemotePassword(object):
    """Runs the SRP login exchange against a LEAP API over HTTP.

    The exchange consists of three steps:

    1. POST the login and the public value ``A`` to
       ``<api_uri>/<leap_api_version>/sessions`` and read ``salt`` and ``B``
       from the JSON reply.
    2. PUT the client proof to
       ``<api_uri>/<leap_api_version>/sessions/<login>`` and read ``M2``,
       ``id`` and ``token`` from the JSON reply.
    3. Check ``M2`` with the SRP user object.
    """

    def __init__(self, hash_alg=srp.SHA256, ng_type=srp.NG_1024, ca_bundle=SYSTEM_CA_BUNDLE, timeout_in_s=15,
                 leap_api_version='1'):
        """Store the settings used for every authentication attempt.

        :param hash_alg: hash algorithm handed to the SRP ``User``.
        :param ng_type: prime/generator selection handed to the SRP ``User``.
        :param ca_bundle: value passed as ``verify=`` to each HTTP request.
        :param timeout_in_s: value passed as ``timeout=`` to each HTTP request.
        :param leap_api_version: path segment inserted after ``api_uri``.
        """
        self.hash_alg = hash_alg
        self.ng_type = ng_type
        self.timeout_in_s = timeout_in_s
        self.ca_bundle = ca_bundle
        self.leap_api_version = leap_api_version

    def authenticate(self, api_uri, username, password):
        """Authenticate ``username`` and return a ``LeapSRPSession``.

        A fresh HTTP session is opened for the exchange and always closed
        afterwards.

        :raises LeapAuthException: on request timeout, on HTTP/SSL errors, or
            when the SRP exchange cannot be completed or verified.
        """
        session = Session()
        try:
            return self._authenticate_with_session(session, api_uri, username, password)
        except Timeout as e:
            raise LeapAuthException(e)
        finally:
            session.close()

    def _authenticate_with_session(self, http_session, api_uri, username, password):
        """Run the full exchange on ``http_session``; HTTP/SSL errors become LeapAuthException."""
        try:
            srp_user = User(username.encode('utf-8'), password.encode('utf-8'), self.hash_alg, self.ng_type)

            salt, B_challenge = self._begin_authentication(srp_user, http_session, api_uri)
            M2_verification_code, leap_session = self._process_challenge(srp_user, http_session, api_uri, salt,
                                                                         B_challenge)
            self._verify_session(srp_user, M2_verification_code)

            return leap_session
        except (HTTPError, SSLError) as e:
            raise LeapAuthException(e)

    def _begin_authentication(self, user, session, api_uri):
        """Send the login and public value ``A``; return the decoded ``(salt, B)`` pair."""
        _, A = user.start_authentication()

        auth_data = {
            "login": user.get_username(),
            "A": binascii.hexlify(A)
        }
        session_url = '%s/%s/sessions' % (api_uri, self.leap_api_version)
        response = session.post(session_url, data=auth_data, verify=self.ca_bundle, timeout=self.timeout_in_s)
        response.raise_for_status()
        json_content = json.loads(response.content)

        salt = _safe_unhexlify(json_content.get('salt'))
        B = _safe_unhexlify(json_content.get('B'))

        return salt, B

    def _process_challenge(self, user, session, api_uri, salt, B):
        """Answer the server challenge; return ``(M2, LeapSRPSession)``.

        :raises LeapAuthException: when the SRP user object rejects the
            challenge and yields no client proof.
        """
        M = user.process_challenge(salt, B)
        if M is None:
            # Per the pysrp documentation process_challenge() returns None when
            # its safety checks fail; hexlify(None) would raise a TypeError.
            raise LeapAuthException('SRP challenge was rejected by the client-side safety checks')

        auth_data = {
            "client_auth": binascii.hexlify(M)
        }

        auth_url = '%s/%s/sessions/%s' % (api_uri, self.leap_api_version, user.get_username())
        response = session.put(auth_url, data=auth_data, verify=self.ca_bundle, timeout=self.timeout_in_s)
        response.raise_for_status()
        auth_json = json.loads(response.content)

        M2 = _safe_unhexlify(auth_json.get('M2'))
        uuid = auth_json.get('id')
        token = auth_json.get('token')
        session_id = response.cookies.get('_session_id')

        return M2, LeapSRPSession(user.get_username(), api_uri, uuid, token, session_id)

    def _verify_session(self, user, M2):
        """Check ``M2`` with the SRP user; raise LeapAuthException if it is not authenticated."""
        user.verify_session(M2)
        if not user.authenticated():
            raise LeapAuthException('SRP session verification failed')


def _safe_unhexlify(hex_str):
    return binascii.unhexlify(hex_str) \
        if (len(hex_str) % 2 == 0) else binascii.unhexlify('0' + hex_str)