summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorazul <azul@riseup.net>2014-04-02 16:56:50 +0200
committerazul <azul@riseup.net>2014-04-02 16:56:50 +0200
commit81555ec6244ed76f92e3629880f68104b8705817 (patch)
treefd4e34d9a3904653e03b635658d3d50c02d6e78f
parent38474e94e3bfaeb40fb87bf2a9b8b1fbe546bc05 (diff)
parenta813647ecf95a8c965bff80342903613354a22e5 (diff)
Merge pull request #142 from varac/feature/include_nagios_checks
Feature/include nagios checks
-rwxr-xr-xtest/nagios/soledad_sync.py94
-rwxr-xr-xtest/nagios/webapp_login.py86
2 files changed, 180 insertions, 0 deletions
diff --git a/test/nagios/soledad_sync.py b/test/nagios/soledad_sync.py
new file mode 100755
index 0000000..3f176b5
--- /dev/null
+++ b/test/nagios/soledad_sync.py
@@ -0,0 +1,94 @@
+#!/usr/bin/env python
+
+# Test Soledad sync
+#
+# This script performs a slightly modified U1DB sync to the Soledad server and
+# returns whether that sync was succesful or not.
+
+
+import tempfile
+import requests
+import os
+import srp._pysrp as srp
+import shutil
+import u1db
+from u1db.remote.http_target import HTTPSyncTarget
+from webapp_login import read_config, parse, authenticate, fail
+
+
+# monkey patch U1DB's HTTPSyncTarget to perform token based auth
+
+def set_token_credentials(self, uuid, token):
+ self._creds = {'token': (uuid, token)}
+
+def _sign_request(self, method, url_query, params):
+ uuid, token = self._creds['token']
+ auth = '%s:%s' % (uuid, token)
+ return [('Authorization', 'Token %s' % auth.encode('base64')[:-1])]
+
+HTTPSyncTarget.set_token_credentials = set_token_credentials
+HTTPSyncTarget._sign_request = _sign_request
+
+
+# The following function could fetch all info needed to sync using soledad.
+# Despite that, we won't use all that info because we are instead faking a
+# Soledad sync by using U1DB slightly modified syncing capabilities. Part of
+# the code is commented and left here for future reference, in case we decide
+# to actually use the Soledad client in the future.
+
+def get_soledad_info(config, tempdir):
+ # get login and get user info
+ user = config['user']
+ api = config['api']
+ usr = srp.User( user['username'], user['password'], srp.SHA256, srp.NG_1024 )
+ try:
+ auth = parse(authenticate(api, usr))
+ except requests.exceptions.ConnectionError:
+ fail('no connection to server')
+ # get soledad server url
+ service_url = 'https://%s:%d/%d/config/soledad-service.json' % \
+ (api['domain'], api['port'], api['version'])
+ soledad_hosts = requests.get(service_url).json['hosts']
+ host = soledad_hosts.keys()[0]
+ server_url = 'https://%s:%d/user-%s' % \
+ (soledad_hosts[host]['hostname'], soledad_hosts[host]['port'],
+ auth['id'])
+ # get provider ca certificate
+ #ca_cert = requests.get('https://127.0.0.1/ca.crt', verify=False).text
+ #cert_file = os.path.join(tempdir, 'ca.crt')
+ cert_file = None # not used for now
+ #with open(cert_file, 'w') as f:
+ # f.write(ca_cert)
+ return auth['id'], user['password'], server_url, cert_file, auth['token']
+
+
+def run_tests():
+ tempdir = tempfile.mkdtemp()
+ uuid, password, server_url, cert_file, token = \
+ get_soledad_info(read_config(), tempdir)
+ exc = None
+ try:
+ # in the future, we can replace the following by an actual Soledad
+ # client sync, if needed
+ db = u1db.open(os.path.join(tempdir, '%s.db' % uuid), True)
+ creds = {'token': {'uuid': uuid, 'token': token}}
+ db.sync(server_url, creds=creds, autocreate=False)
+ except Exception as e:
+ exc = e
+ shutil.rmtree(tempdir)
+ exit(report(exc))
+
+
+def report(exc):
+ if exc is None:
+ print '0 soledad_sync - OK - can sync soledad fine'
+ return 0
+ if isinstance(exc, u1db.errors.U1DBError):
+ print '2 soledad_sync - CRITICAL - ' + exc.message
+ else:
+ print '2 soledad_sync - CRITICAL - ' + str(exc)
+ return 2
+
+
+if __name__ == '__main__':
+ run_tests()
diff --git a/test/nagios/webapp_login.py b/test/nagios/webapp_login.py
new file mode 100755
index 0000000..1711238
--- /dev/null
+++ b/test/nagios/webapp_login.py
@@ -0,0 +1,86 @@
+#!/usr/bin/env python
+
+# Test Authentication with the webapp API works.
+
+import requests
+import json
+import string
+import random
+import srp._pysrp as srp
+import binascii
+import yaml
+
+
+safe_unhexlify = lambda x: binascii.unhexlify(x) if (
+ len(x) % 2 == 0) else binascii.unhexlify('0' + x)
+
+
+def read_config():
+ with open("/etc/leap/hiera.yaml", 'r') as stream:
+ config = yaml.load(stream)
+ user = config['webapp']['nagios_test_user']
+ if 'username' not in user:
+ fail('nagios test user lacks username')
+ if 'password' not in user:
+ fail('nagios test user lacks password')
+ api = config['api']
+ api['version'] = config['webapp']['api_version']
+ return {'api': api, 'user': user}
+
+
+def run_tests(config):
+ user = config['user']
+ api = config['api']
+ usr = srp.User(user['username'], user['password'], srp.SHA256, srp.NG_1024)
+ try:
+ auth = parse(authenticate(api, usr))
+ except requests.exceptions.ConnectionError:
+ fail('no connection to server')
+ exit(report(auth, usr))
+
+# parse the server responses
+
+
+def parse(response):
+ request = response.request
+ try:
+ return json.loads(response.text)
+ except ValueError:
+ return None
+
+
+def authenticate(api, usr):
+ api_url = "https://{domain}:{port}/{version}".format(**api)
+ session = requests.session()
+ uname, A = usr.start_authentication()
+ params = {
+ 'login': uname,
+ 'A': binascii.hexlify(A)
+ }
+ init = parse(
+ session.post(api_url + '/sessions', data=params, verify=False))
+ if ('errors' in init):
+ fail('test user not found')
+ M = usr.process_challenge(
+ safe_unhexlify(init['salt']), safe_unhexlify(init['B']))
+ return session.put(api_url + '/sessions/' + uname, verify=False,
+ data={'client_auth': binascii.hexlify(M)})
+
+
+def report(auth, usr):
+ if ('errors' in auth):
+ fail('srp password auth failed')
+ usr.verify_session(safe_unhexlify(auth["M2"]))
+ if usr.authenticated():
+ print '0 webapp_login - OK - can login to webapp fine'
+ return 0
+ print '1 webapp_login - WARNING - failed to verify webapp server'
+ return 1
+
+
+def fail(reason):
+ print '2 webapp_login - CRITICAL - ' + reason
+ exit(2)
+
+if __name__ == '__main__':
+ run_tests(read_config())