# summary | refs | log | tree | commit | diff
# path: root/src/leap/bonafide/srp_auth.py
# blob: ac2cd67b06809c46ff46e9133e13ef27878b30bb (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
# -*- coding: utf-8 -*-
# srp_auth.py
# Copyright (C) 2014 LEAP
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program.  If not, see <http://www.gnu.org/licenses/>.

"""
SRP Authentication.
"""

import binascii
import logging
import json

import srp

logger = logging.getLogger(__name__)


class SRPAuthMechanism(object):

    """
    Implement a protocol-agnostic SRP Authentication mechanism.

    This class only builds the parameters to be sent to the server and
    parses the (JSON) responses it is handed; it performs no network I/O
    itself.
    """

    def initialize(self, username, password):
        """
        Create the SRP user object and start the authentication.

        :param username: the login name (must support ``.encode('utf-8')``).
        :param password: the password (must support ``.encode('utf-8')``).
        :returns: a tuple ``(srp_user, A)``, where ``srp_user`` is the
                  ``srp.User`` instance (SHA256, NG_1024) and ``A`` is the
                  second value returned by ``start_authentication``.
        """
        srp_user = srp.User(username.encode('utf-8'),
                            password.encode('utf-8'),
                            srp.SHA256, srp.NG_1024)
        _, A = srp_user.start_authentication()
        return srp_user, A

    def get_handshake_params(self, username, A):
        """
        Build the parameters for the handshake request.

        :param username: the login name, as text or as bytes.
        :param A: the raw (binary) ``A`` value returned by ``initialize``.
        :returns: a dict with the keys ``login`` (the username as bytes)
                  and ``A`` (hex-encoded).
        """
        if not isinstance(username, bytes):
            # Encode text the same way ``initialize`` does (UTF-8), instead
            # of relying on ``bytes(username)``, which breaks on non-ASCII
            # text in Python 2 and on any text string in Python 3.
            username = username.encode('utf-8')
        return {'login': username, 'A': binascii.hexlify(A)}

    def process_handshake(self, srp_user, handshake_response):
        """
        Process the server response to the handshake request.

        :param srp_user: the object returned by ``initialize``.
        :param handshake_response: the JSON-encoded response body.
        :returns: ``M``, the result of ``srp_user.process_challenge`` for
                  the salt and ``B`` sent by the server.
        :raises SRPAuthError: if the response contains an ``errors`` key.
        :raises SRPAuthNoSalt: if the response has no ``salt``.
        :raises SRPAuthNoB: if the response has no ``B``.
        :raises SRPAuthBadDataFromServer: if ``salt`` or ``B`` cannot be
                                          decoded from hex.
        """
        challenge = json.loads(handshake_response)
        self._check_for_errors(challenge)
        salt = challenge.get('salt', None)
        B = challenge.get('B', None)
        unhex_salt, unhex_B = self._unhex_salt_B(salt, B)
        M = srp_user.process_challenge(unhex_salt, unhex_B)
        return M

    def get_authentication_params(self, M, A):
        """
        Build the parameters for the authentication request.

        :param M: the raw (binary) value returned by ``process_handshake``.
        :param A: the raw (binary) ``A`` value returned by ``initialize``.
        :returns: a dict with the hex-encoded values under the keys
                  ``client_auth`` and ``A``.
        """
        # I think A is not used in the server side
        return {'client_auth': binascii.hexlify(M), 'A': binascii.hexlify(A)}

    def process_authentication(self, authentication_response):
        """
        Process the server response to the authentication request.

        :param authentication_response: the JSON-encoded response body.
        :returns: a tuple ``(uuid, token, M2)`` taken from the ``id``,
                  ``token`` and ``M2`` keys of the response.
        :raises SRPAuthBadDataFromServer: if any of the three values is
                                          missing or empty.
        """
        auth = json.loads(authentication_response)
        uuid = auth.get('id', None)
        token = auth.get('token', None)
        M2 = auth.get('M2', None)
        self._check_auth_params(uuid, token, M2)
        return uuid, token, M2

    def verify_authentication(self, srp_user, M2):
        """
        Verify the session against the ``M2`` value sent by the server.

        :param srp_user: the object returned by ``initialize``.
        :param M2: the hex-encoded ``M2`` value from the server.
        :raises AssertionError: if ``srp_user`` does not report itself as
                                authenticated after the verification.
        """
        unhex_M2 = _safe_unhexlify(M2)
        srp_user.verify_session(unhex_M2)
        # Raise explicitly rather than use an ``assert`` statement: asserts
        # are stripped when Python runs with -O, which would silently skip
        # this check. The exception type is kept for existing callers.
        if not srp_user.authenticated():
            raise AssertionError('SRP session could not be verified')

    def _check_for_errors(self, challenge):
        """
        Raise SRPAuthError if the decoded response reports an error.

        The error message is read from ``challenge['errors']['base']``.
        """
        if 'errors' in challenge:
            msg = challenge['errors']['base']
            raise SRPAuthError(msg)

    def _unhex_salt_B(self, salt, B):
        """
        Decode the hex-encoded salt and B.

        :returns: a tuple ``(unhex_salt, unhex_B)`` with the binary values.
        :raises SRPAuthNoSalt: if ``salt`` is None.
        :raises SRPAuthNoB: if ``B`` is None.
        :raises SRPAuthBadDataFromServer: if the hex decoding fails.
        """
        if salt is None:
            raise SRPAuthNoSalt()
        if B is None:
            raise SRPAuthNoB()
        try:
            unhex_salt = _safe_unhexlify(salt)
            unhex_B = _safe_unhexlify(B)
        except (TypeError, ValueError) as e:
            raise SRPAuthBadDataFromServer(str(e))
        return unhex_salt, unhex_B

    def _check_auth_params(self, uuid, token, M2):
        """
        Raise SRPAuthBadDataFromServer unless uuid, token and M2 are all
        truthy. The exception message is the repr of ``(M2, uuid, token)``.
        """
        if not all((uuid, token, M2)):
            # The values must be wrapped in a one-element tuple: passing the
            # three-tuple directly to ``%`` makes it the argument list for a
            # single ``%r`` and raises TypeError.
            msg = '%r' % ((M2, uuid, token),)
            raise SRPAuthBadDataFromServer(msg)

    # XXX move to session -----------------------
    def get_session_id(self, cookies):
        """
        Return the value of the ``_session_id`` cookie, or None if absent.

        :param cookies: a mapping-like object supporting ``.get``.
        """
        return cookies.get('_session_id', None)
    # XXX move to session -----------------------


def _safe_unhexlify(val):
    return binascii.unhexlify(val) \
        if (len(val) % 2 == 0) else binascii.unhexlify('0' + val)


class SRPAuthError(Exception):
    """
    Root of the exception hierarchy for SRP authentication failures.
    """


class SRPAuthNoSalt(SRPAuthError):
    """
    Raised when the server response does not include the salt parameter.
    """
    message = 'The server didn\'t send the salt parameter'

    def __init__(self, *args):
        # This exception is raised without arguments; fall back to the
        # class-level message so that str(exc) is not an empty string.
        if not args:
            args = (self.message,)
        super(SRPAuthNoSalt, self).__init__(*args)


class SRPAuthNoB(SRPAuthError):
    """
    Raised when the server response does not include the B parameter.
    """
    message = 'The server didn\'t send the B parameter'

    def __init__(self, *args):
        # This exception is raised without arguments; fall back to the
        # class-level message so that str(exc) is not an empty string.
        if not args:
            args = (self.message,)
        super(SRPAuthNoB, self).__init__(*args)

class SRPAuthBadDataFromServer(SRPAuthError):
    """
    Raised when values received from the server are missing or malformed
    (e.g. salt or B that cannot be decoded from hex, or an authentication
    response lacking id, token or M2).
    """