1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
|
# -*- coding: utf-8 -*-
# srp_auth.py
# Copyright (C) 2015 LEAP
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
"""
SRP Authentication.
"""
import binascii
import json
import srp
class SRPAuthMechanism(object):
"""
Implement a protocol-agnostic SRP Authentication mechanism.
"""
def initialize(self, username, password):
srp_user = srp.User(username.encode('utf-8'),
password.encode('utf-8'),
srp.SHA256, srp.NG_1024)
_, A = srp_user.start_authentication()
return srp_user, A
def get_handshake_params(self, username, A):
return {'login': bytes(username), 'A': binascii.hexlify(A)}
def process_handshake(self, srp_user, handshake_response):
challenge = json.loads(handshake_response)
self._check_for_errors(challenge)
salt = challenge.get('salt', None)
B = challenge.get('B', None)
unhex_salt, unhex_B = self._unhex_salt_B(salt, B)
M = srp_user.process_challenge(unhex_salt, unhex_B)
return M
def get_authentication_params(self, M, A):
# It looks A is not used server side
return {'client_auth': binascii.hexlify(M), 'A': binascii.hexlify(A)}
def process_authentication(self, authentication_response):
auth = json.loads(authentication_response)
self._check_for_errors(auth)
uuid = auth.get('id', None)
token = auth.get('token', None)
M2 = auth.get('M2', None)
self._check_auth_params(uuid, token, M2)
return uuid, token, M2
def verify_authentication(self, srp_user, M2):
unhex_M2 = _safe_unhexlify(M2)
srp_user.verify_session(unhex_M2)
assert srp_user.authenticated()
def _check_for_errors(self, response):
if 'errors' in response:
msg = response['errors']['base']
raise SRPAuthError(msg)
def _unhex_salt_B(self, salt, B):
if salt is None:
raise SRPAuthNoSalt()
if B is None:
raise SRPAuthNoB()
try:
unhex_salt = _safe_unhexlify(salt)
unhex_B = _safe_unhexlify(B)
except (TypeError, ValueError) as e:
raise SRPAuthBadDataFromServer(str(e))
return unhex_salt, unhex_B
def _check_auth_params(self, uuid, token, M2):
if not all((uuid, token, M2)):
msg = '%s' % str((M2, uuid, token))
raise SRPAuthBadDataFromServer(msg)
def _safe_unhexlify(val):
return binascii.unhexlify(val) \
if (len(val) % 2 == 0) else binascii.unhexlify('0' + val)
class SRPAuthError(Exception):
"""
Base exception for srp authentication errors
"""
class SRPAuthNoSalt(SRPAuthError):
message = 'The server didn\'t send the salt parameter'
class SRPAuthNoB(SRPAuthError):
message = 'The server didn\'t send the B parameter'
class SRPAuthBadDataFromServer(SRPAuthError):
pass
|