1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
|
#!/usr/bin/env python
# Test Authentication with the webapp API works.
import requests
import json
import string
import random
import srp._pysrp as srp
import binascii
import yaml
safe_unhexlify = lambda x: binascii.unhexlify(x) if (
len(x) % 2 == 0) else binascii.unhexlify('0' + x)
class Config():
def __init__(self, filename="/etc/leap/hiera.yaml"):
with open("/etc/leap/hiera.yaml", 'r') as stream:
config = yaml.load(stream)
self.user = config['webapp']['nagios_test_user']
if 'username' not in self.user:
raise Exception('nagios test user lacks username')
if 'password' not in self.user:
raise Exception('nagios test user lacks password')
self.api = config['api']
self.api['version'] = config['webapp']['api_version']
class Api():
def __init__(self, config, verify=True):
self.config = config.api
self.session = requests.session()
self.verify = verify
def api_url(self, path):
return self.api_root() + path
def api_root(self):
return "https://{domain}:{port}/{version}/".format(**self.config)
def get(self, path, **args):
response = self.session.get(self.api_url(path),
verify=self.verify,
**args)
return response.json()
def post(self, path, **args):
response = self.session.post(self.api_url(path),
verify=self.verify,
**args)
return response.json()
def put(self, path, **args):
response = self.session.put(self.api_url(path),
verify=self.verify,
**args)
return response.json()
class User():
def __init__(self, config):
self.config = config.user
self.srp_user = srp.User(self.config['username'], self.config['password'], srp.SHA256, srp.NG_1024)
def login(self, api):
init=self.init_authentication(api)
if ('errors' in init):
raise Exception('test user not found')
auth=self.authenticate(api, init)
if ('errors' in auth):
raise Exception('srp password auth failed')
self.verify_server(auth)
if not self.is_authenticated():
raise Exception('user is not authenticated')
def init_authentication(self, api):
uname, A = self.srp_user.start_authentication()
params = {
'login': uname,
'A': binascii.hexlify(A)
}
return api.post('sessions', data=params)
def authenticate(self, api, init):
M = self.srp_user.process_challenge(
safe_unhexlify(init['salt']), safe_unhexlify(init['B']))
auth = api.put('sessions/' + self.config["username"],
data={'client_auth': binascii.hexlify(M)})
return auth
def verify_server(self, auth):
self.srp_user.verify_session(safe_unhexlify(auth["M2"]))
def is_authenticated(self):
return self.srp_user.authenticated()
def login_successfully():
config = Config()
user = User(config)
api = Api(config, verify=False)
user.login(api)
if __name__ == '__main__':
import nagios_test
exit_code = nagios_test.run(login_successfully)
exit(exit_code)
|