path: root/src/leap/gui/firstrun/tests/integration
diff options
Diffstat (limited to 'src/leap/gui/firstrun/tests/integration')
1 files changed, 295 insertions, 0 deletions
diff --git a/src/leap/gui/firstrun/tests/integration/ b/src/leap/gui/firstrun/tests/integration/
new file mode 100755
index 00000000..33ee0ee6
--- /dev/null
+++ b/src/leap/gui/firstrun/tests/integration/
@@ -0,0 +1,295 @@
+#!/usr/bin/env python
+"""A server faking some of the provider resources and apis,
+used for testing Leap Client requests
+It needs that you create a subfolder named 'certs',
+and that you place the following files:
+[ ] certs/leaptestscert.pem
+[ ] certs/leaptestskey.pem
+[ ] certs/cacert.pem
+[ ] certs/openvpn.pem
+[ ] provider.json
+[ ] eip-service.json
+# XXX NOTE: intended for manual debug.
+# I intend to include this as a regular test after 0.2.0 release
+# (so we can add twisted as a dep there)
+import binascii
+import json
+import os
+import sys
+# python SRP LIB (! important MUST be >=1.0.1 !)
+import srp
+# GnuTLS Example -- is not working as expected
+from gnutls import crypto
+from gnutls.constants import COMP_LZO, COMP_DEFLATE, COMP_NULL
+from gnutls.interfaces.twisted import X509Credentials
+# Going with OpenSSL as a workaround instead
+# But we DO NOT want to introduce this dependency.
+from OpenSSL import SSL
+from zope.interface import Interface, Attribute, implements
+from twisted.web.server import Site
+from twisted.web.static import File
+from twisted.web.resource import Resource
+from twisted.internet import reactor
+# See
+# for more examples
+Testing the FAKE_API:
+ 1) register an user
+ >> curl -d "user[login]=me" -d "user[password_salt]=foo" \
+ -d "user[password_verifier]=beef" http://localhost:8000/1/users.json
+ << {"errors": null}
+ 2) check that if you try to register again, it will fail:
+ >> curl -d "user[login]=me" -d "user[password_salt]=foo" \
+ -d "user[password_verifier]=beef" http://localhost:8000/1/users.json
+ << {"errors": {"login": "already taken!"}}
+# Globals to mock user/sessiondb
+USERDB = {}
+safe_unhexlify = lambda x: binascii.unhexlify(x) \
+ if (len(x) % 2 == 0) else binascii.unhexlify('0' + x)
+class IUser(Interface):
+ login = Attribute("User login.")
+ salt = Attribute("Password salt.")
+ verifier = Attribute("Password verifier.")
+ session = Attribute("Session.")
+ svr = Attribute("Server verifier.")
+class User(object):
+ implements(IUser)
+ def __init__(self, login, salt, verifier):
+ self.login = login
+ self.salt = salt
+ self.verifier = verifier
+ self.session = None
+ def set_server_verifier(self, svr):
+ self.svr = svr
+ def set_session(self, session):
+ SESSIONDB[session] = self
+ self.session = session
+class FakeUsers(Resource):
+ def __init__(self, name):
+ = name
+ def render_POST(self, request):
+ args = request.args
+ login = args['user[login]'][0]
+ salt = args['user[password_salt]'][0]
+ verifier = args['user[password_verifier]'][0]
+ if login in USERDB:
+ return "%s\n" % json.dumps(
+ {'errors': {'login': 'already taken!'}})
+ print login, verifier, salt
+ user = User(login, salt, verifier)
+ USERDB[login] = user
+ return json.dumps({'errors': None})
+def get_user(request):
+ login = request.args.get('login')
+ if login:
+ user = USERDB.get(login[0], None)
+ if user:
+ return user
+ session = request.getSession()
+ user = SESSIONDB.get(session, None)
+ return user
+class FakeSession(Resource):
+ def __init__(self, name):
+ = name
+ def render_GET(self, request):
+ return "%s\n" % json.dumps({'errors': None})
+ def render_POST(self, request):
+ user = get_user(request)
+ if not user:
+ # XXX get real error from demo provider
+ return json.dumps({'errors': 'no such user'})
+ A = request.args['A'][0]
+ _A = safe_unhexlify(A)
+ _salt = safe_unhexlify(user.salt)
+ _verifier = safe_unhexlify(user.verifier)
+ svr = srp.Verifier(
+ user.login,
+ _salt,
+ _verifier,
+ _A,
+ hash_alg=srp.SHA256,
+ ng_type=srp.NG_1024)
+ s, B = svr.get_challenge()
+ _B = binascii.hexlify(B)
+ print 'login = %s' % user.login
+ print 'salt = %s' % user.salt
+ print 'len(_salt) = %s' % len(_salt)
+ print 'vkey = %s' % user.verifier
+ print 'len(vkey) = %s' % len(_verifier)
+ print 's = %s' % binascii.hexlify(s)
+ print 'B = %s' % _B
+ print 'len(B) = %s' % len(_B)
+ session = request.getSession()
+ user.set_session(session)
+ user.set_server_verifier(svr)
+ # yep, this is tricky.
+ # some things are *already* unhexlified.
+ data = {
+ 'salt': user.salt,
+ 'B': _B,
+ 'errors': None}
+ return json.dumps(data)
+ def render_PUT(self, request):
+ # XXX check session???
+ user = get_user(request)
+ if not user:
+ print 'NO USER'
+ return json.dumps({'errors': 'no such user'})
+ data =
+ auth = data.split("client_auth=")
+ M = auth[1] if len(auth) > 1 else None
+ # if not H, return
+ if not M:
+ return json.dumps({'errors': 'no M proof passed by client'})
+ svr = user.svr
+ HAMK = svr.verify_session(binascii.unhexlify(M))
+ if HAMK is None:
+ print 'verification failed!!!'
+ raise Exception("Authentication failed!")
+ #import ipdb;ipdb.set_trace()
+ assert svr.authenticated()
+ print "***"
+ print 'server authenticated user SRP!'
+ print "***"
+ return json.dumps(
+ {'M2': binascii.hexlify(HAMK), 'errors': None})
+class API_Sessions(Resource):
+ def getChild(self, name, request):
+ return FakeSession(name)
+def get_certs_path():
+ script_path = os.path.realpath(os.path.dirname(sys.argv[0]))
+ certs_path = os.path.join(script_path, 'certs')
+ return certs_path
+def get_TLS_credentials():
+ # XXX this is giving errors
+ # XXX REview! We want to use gnutls!
+ certs_path = get_certs_path()
+ cert = crypto.X509Certificate(
+ open(certs_path + '/leaptestscert.pem').read())
+ key = crypto.X509PrivateKey(
+ open(certs_path + '/leaptestskey.pem').read())
+ ca = crypto.X509Certificate(
+ open(certs_path + '/cacert.pem').read())
+ #crl = crypto.X509CRL(open(certs_path + '/crl.pem').read())
+ #cred = crypto.X509Credentials(cert, key, [ca], [crl])
+ cred = X509Credentials(cert, key, [ca])
+ cred.verify_peer = True
+ cred.session_params.compressions = (COMP_LZO, COMP_DEFLATE, COMP_NULL)
+ return cred
+class OpenSSLServerContextFactory:
+ # XXX workaround for broken TLS interface
+ # from gnuTLS.
+ def getContext(self):
+ """Create an SSL context.
+ This is a sample implementation that loads a certificate from a file
+ called 'server.pem'."""
+ certs_path = get_certs_path()
+ ctx = SSL.Context(SSL.SSLv23_METHOD)
+ ctx.use_certificate_file(certs_path + '/leaptestscert.pem')
+ ctx.use_privatekey_file(certs_path + '/leaptestskey.pem')
+ return ctx
+if __name__ == "__main__":
+ from twisted.python import log
+ log.startLogging(sys.stdout)
+ root = Resource()
+ root.putChild("provider.json", File("./provider.json"))
+ config = Resource()
+ config.putChild(
+ "eip-service.json",
+ File("./eip-service.json"))
+ apiv1 = Resource()
+ apiv1.putChild("config", config)
+ apiv1.putChild("sessions.json", API_Sessions())
+ apiv1.putChild("users.json", FakeUsers(None))
+ apiv1.putChild("cert", File(get_certs_path() + '/openvpn.pem'))
+ root.putChild("1", apiv1)
+ cred = get_TLS_credentials()
+ factory = Site(root)
+ # regular http (for debugging with curl)
+ reactor.listenTCP(8000, factory)
+ # TLS with gnutls --- seems broken :(
+ #reactor.listenTLS(8003, factory, cred)
+ # OpenSSL
+ reactor.listenSSL(8443, factory, OpenSSLServerContextFactory())