From 8fd77ba036cb78c81939bbfce312b12cdc90d881 Mon Sep 17 00:00:00 2001 From: kali Date: Fri, 9 Nov 2012 18:13:32 +0900 Subject: working version of the fake provider. wizard can now be completely tested against this. --- .../firstrun/tests/integration/fake_provider.py | 175 ++++++++++++++++++++- 1 file changed, 167 insertions(+), 8 deletions(-) (limited to 'src/leap/gui/firstrun') diff --git a/src/leap/gui/firstrun/tests/integration/fake_provider.py b/src/leap/gui/firstrun/tests/integration/fake_provider.py index 27886d3b..09c6c468 100755 --- a/src/leap/gui/firstrun/tests/integration/fake_provider.py +++ b/src/leap/gui/firstrun/tests/integration/fake_provider.py @@ -1,8 +1,8 @@ -#/usr/bin/env python +#!/usr/bin/env python """A server faking some of the provider resources and apis, -used for testing Leap Client requests. +used for testing Leap Client requests -Right needs that you create a subfolder named 'certs', +It needs that you create a subfolder named 'certs', and that you place the following files: [ ] certs/leaptestscert.pem @@ -14,10 +14,14 @@ and that you place the following files: [ ] eip-service.json """ +import binascii import json import os import sys +# python SRP LIB (! important MUST be >=1.0.1 !) +import srp + # GnuTLS Example -- is not working as expected from gnutls import crypto from gnutls.constants import COMP_LZO, COMP_DEFLATE, COMP_NULL @@ -27,6 +31,8 @@ from gnutls.interfaces.twisted import X509Credentials # But we DO NOT want to introduce this dependency. from OpenSSL import SSL +from zope.interface import Interface, Attribute, implements + from twisted.web.server import Site from twisted.web.static import File from twisted.web.resource import Resource @@ -36,21 +42,173 @@ from twisted.internet import reactor # http://twistedmatrix.com/documents/current/web/howto/web-in-60/index.htmln # for more examples +""" +Testing the FAKE_API: +##################### + + 1) register an user + >> curl -d "user[login]=me" -d "user[password_salt]=foo" -d "user[password_verifier]=beef" http://localhost:8000/1/users.json + << {"errors": null} + + 2) check that if you try to register again, it will fail: + >> curl -d "user[login]=me" -d "user[password_salt]=foo" -d "user[password_verifier]=beef" http://localhost:8000/1/users.json + << {"errors": {"login": "already taken!"}} + +""" + +# Globals to mock user/sessiondb + +USERDB = {} +SESSIONDB = {} + + +safe_unhexlify = lambda x: binascii.unhexlify(x) \ + if (len(x) % 2 == 0) else binascii.unhexlify('0' + x) + + +class IUser(Interface): + login = Attribute("User login.") + salt = Attribute("Password salt.") + verifier = Attribute("Password verifier.") + session = Attribute("Session.") + svr = Attribute("Server verifier.") + + +class User(object): + implements(IUser) + + def __init__(self, login, salt, verifier): + self.login = login + self.salt = salt + self.verifier = verifier + self.session = None + + def set_server_verifier(self, svr): + self.svr = svr + + def set_session(self, session): + SESSIONDB[session] = self + self.session = session + + +class FakeUsers(Resource): + def __init__(self, name): + self.name = name + + def render_POST(self, request): + args = request.args + + login = args['user[login]'][0] + salt = args['user[password_salt]'][0] + verifier = args['user[password_verifier]'][0] + + if login in USERDB: + return "%s\n" % json.dumps( + {'errors': {'login': 'already taken!'}}) + + print login, verifier, salt + user = User(login, salt, verifier) + USERDB[login] = user + return json.dumps({'errors': None}) + + +def get_user(request): + login = request.args.get('login') + if login: + user = USERDB.get(login[0], None) + if user: + return user + + session = request.getSession() + user = SESSIONDB.get(session, None) + return user + class FakeSession(Resource): def __init__(self, name): self.name = name def render_GET(self, request): - return json.dumps({'errors': None}) + return "%s\n" % json.dumps({'errors': None}) def render_POST(self, request): - return json.dumps( - {'salt': 'deadbeef', 'B': 'deadbeef', 'errors': None}) + + user = get_user(request) + + if not user: + # XXX get real error from demo provider + return json.dumps({'errors': 'no such user'}) + + A = request.args['A'][0] + + _A = safe_unhexlify(A) + _salt = safe_unhexlify(user.salt) + _verifier = safe_unhexlify(user.verifier) + + svr = srp.Verifier( + user.login, + _salt, + _verifier, + _A, + hash_alg=srp.SHA256, + ng_type=srp.NG_1024) + + s, B = svr.get_challenge() + + _B = binascii.hexlify(B) + + print 'login = %s' % user.login + print 'salt = %s' % user.salt + print 'len(_salt) = %s' % len(_salt) + print 'vkey = %s' % user.verifier + print 'len(vkey) = %s' % len(_verifier) + print 's = %s' % binascii.hexlify(s) + print 'B = %s' % _B + print 'len(B) = %s' % len(_B) + + session = request.getSession() + user.set_session(session) + user.set_server_verifier(svr) + + # yep, this is tricky. + # some things are *already* unhexlified. + data = { + 'salt': user.salt, + 'B': _B, + 'errors': None} + + return json.dumps(data) def render_PUT(self, request): + + # XXX check session??? + user = get_user(request) + + if not user: + print 'NO USER' + return json.dumps({'errors': 'no such user'}) + + data = request.content.read() + auth = data.split("client_auth=") + M = auth[1] if len(auth) > 1 else None + # if not H, return + if not M: + return json.dumps({'errors': 'no M proof passed by client'}) + + svr = user.svr + HAMK = svr.verify_session(binascii.unhexlify(M)) + if HAMK is None: + print 'verification failed!!!' + raise Exception("Authentication failed!") + #import ipdb;ipdb.set_trace() + + assert svr.authenticated() + print "***" + print 'server authenticated user SRP!' + print "***" + return json.dumps( - {'M2': 'deadbeef', 'errors': None}) + {'M2': binascii.hexlify(HAMK), 'errors': None}) class API_Sessions(Resource): @@ -113,6 +271,7 @@ if __name__ == "__main__": apiv1 = Resource() apiv1.putChild("config", config) apiv1.putChild("sessions.json", API_Sessions()) + apiv1.putChild("users.json", FakeUsers(None)) apiv1.putChild("cert", File(get_certs_path() + '/openvpn.pem')) root.putChild("1", apiv1) @@ -120,7 +279,7 @@ if __name__ == "__main__": factory = Site(root) - # regular http + # regular http (for debugging with curl) reactor.listenTCP(8000, factory) # TLS with gnutls --- seems broken :( -- cgit v1.2.3