1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
|
import binascii
import json
from requests import Session
from srp import User, srp
from requests.exceptions import HTTPError, SSLError, Timeout
from config import SYSTEM_CA_BUNDLE
class LeapAuthException(Exception):
def __init__(self, *args, **kwargs):
super(LeapAuthException, self).__init__(*args, **kwargs)
class LeapSRPSession(object):
def __init__(self, user_name, api_server_name, uuid, token, session_id, api_version='1'):
self.user_name = user_name
self.api_server_name = api_server_name
self.uuid = uuid
self.token = token
self.session_id = session_id
self.api_version = api_version
def __str__(self):
return 'LeapSRPSession(%s, %s, %s, %s, %s, %s)' % (self.user_name, self.api_server_name, self.uuid, self.token, self.session_id, self.api_version)
class LeapSecureRemotePassword(object):
def __init__(self, hash_alg=srp.SHA256, ng_type=srp.NG_1024, ca_bundle=SYSTEM_CA_BUNDLE, timeout_in_s=15,
leap_api_version='1'):
self.hash_alg = hash_alg
self.ng_type = ng_type
self.timeout_in_s = timeout_in_s
self.ca_bundle = ca_bundle
self.leap_api_version = leap_api_version
def authenticate(self, api_uri, username, password):
session = Session()
try:
return self._authenticate_with_session(session, api_uri, username, password)
except Timeout, e:
raise LeapAuthException(e)
finally:
session.close()
def _authenticate_with_session(self, http_session, api_uri, username, password):
try:
srp_user = User(username.encode('utf-8'), password.encode('utf-8'), self.hash_alg, self.ng_type)
salt, B_challenge = self._begin_authentication(srp_user, http_session, api_uri)
M2_verfication_code, leap_session = self._process_challenge(srp_user, http_session, api_uri, salt,
B_challenge)
self._verify_session(srp_user, M2_verfication_code)
return leap_session
except (HTTPError, SSLError), e:
raise LeapAuthException(e)
def _begin_authentication(self, user, session, api_uri):
_, A = user.start_authentication()
auth_data = {
"login": user.get_username(),
"A": binascii.hexlify(A)
}
session_url = '%s/%s/sessions' % (api_uri, self.leap_api_version)
response = session.post(session_url, data=auth_data, verify=self.ca_bundle, timeout=self.timeout_in_s)
response.raise_for_status()
json_content = json.loads(response.content)
salt = _safe_unhexlify(json_content.get('salt'))
B = _safe_unhexlify(json_content.get('B'))
return salt, B
def _process_challenge(self, user, session, api_uri, salt, B):
M = user.process_challenge(salt, B)
auth_data = {
"client_auth": binascii.hexlify(M)
}
auth_url = '%s/%s/sessions/%s' % (api_uri, self.leap_api_version, user.get_username())
response = session.put(auth_url, data=auth_data, verify=self.ca_bundle, timeout=self.timeout_in_s)
response.raise_for_status()
auth_json = json.loads(response.content)
M2 = _safe_unhexlify(auth_json.get('M2'))
uuid = auth_json.get('id')
token = auth_json.get('token')
session_id = response.cookies.get('_session_id')
return M2, LeapSRPSession(user.get_username(), api_uri, uuid, token, session_id)
def _verify_session(self, user, M2):
user.verify_session(M2)
if not user.authenticated():
raise LeapAuthException()
def _safe_unhexlify(hex_str):
return binascii.unhexlify(hex_str) \
if (len(hex_str) % 2 == 0) else binascii.unhexlify('0' + hex_str)
|