summaryrefslogtreecommitdiff
path: root/test/nagios/webapp_login.py
blob: 7e2efd7839e85d6c8cd47d387130212d9caa815b (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
#!/usr/bin/env python

# Test Authentication with the webapp API works.

import requests
import json
import string
import random
import srp._pysrp as srp
import binascii
import yaml

safe_unhexlify = lambda x: binascii.unhexlify(x) if (
    len(x) % 2 == 0) else binascii.unhexlify('0' + x)

class Config():
    def __init__(self, filename="/etc/leap/hiera.yaml"):
        with open("/etc/leap/hiera.yaml", 'r') as stream:
            config = yaml.load(stream)
        self.user = config['webapp']['nagios_test_user']
        if 'username' not in self.user:
            raise Exception('nagios test user lacks username')
        if 'password' not in self.user:
            raise Exception('nagios test user lacks password')
        self.api = config['api']
        self.api['version'] = config['webapp']['api_version']

class Api():
    def __init__(self, config, verify=True):
        self.config = config.api
        self.session = requests.session()
        self.verify = verify
    
    def api_url(self, path):
        return self.api_root() + path

    def api_root(self):
        return "https://{domain}:{port}/{version}/".format(**self.config)

    def get(self, path, **args):
        response = self.session.get(self.api_url(path),
                verify=self.verify,
                **args)
        return response.json()

    def post(self, path, **args):
        response = self.session.post(self.api_url(path),
                verify=self.verify,
                **args)
        return response.json()

    def put(self, path, **args):
        response = self.session.put(self.api_url(path),
                verify=self.verify,
                **args)
        return response.json()

class User():
    def __init__(self, config):
        self.config = config.user
        self.srp_user = srp.User(self.config['username'], self.config['password'], srp.SHA256, srp.NG_1024)

    def login(self, api):
        init=self.init_authentication(api)
        if ('errors' in init):
            raise Exception('test user not found')
        auth=self.authenticate(api, init)
        if ('errors' in auth):
            raise Exception('srp password auth failed')
        self.verify_server(auth)
        if not self.is_authenticated():
            raise Exception('user is not authenticated')

    def init_authentication(self, api):
        uname, A = self.srp_user.start_authentication()
        params = {
            'login': uname,
            'A': binascii.hexlify(A)
        }
        return api.post('sessions', data=params)

    def authenticate(self, api, init):
        M = self.srp_user.process_challenge(
            safe_unhexlify(init['salt']), safe_unhexlify(init['B']))
        auth = api.put('sessions/' + self.config["username"],
                           data={'client_auth': binascii.hexlify(M)})
        return auth

    def verify_server(self, auth):
        self.srp_user.verify_session(safe_unhexlify(auth["M2"]))

    def is_authenticated(self):
        return self.srp_user.authenticated()


def login_successfully():
    config = Config()
    user = User(config)
    api = Api(config, verify=False)
    user.login(api)

if __name__ == '__main__':
    import nagios_test
    exit_code = nagios_test.run(login_successfully)
    exit(exit_code)