From 8fd77ba036cb78c81939bbfce312b12cdc90d881 Mon Sep 17 00:00:00 2001 From: kali Date: Fri, 9 Nov 2012 18:13:32 +0900 Subject: working version of the fake provider. wizard can now be completely tested against this. --- src/leap/base/auth.py | 126 +++++++++++++++++++++++++++++++++++--------------- 1 file changed, 88 insertions(+), 38 deletions(-) (limited to 'src/leap/base') diff --git a/src/leap/base/auth.py b/src/leap/base/auth.py index f1b618ba..58ae9d69 100644 --- a/src/leap/base/auth.py +++ b/src/leap/base/auth.py @@ -10,27 +10,46 @@ from PyQt4 import QtCore from leap.base import constants as baseconstants from leap.crypto import leapkeyring +from leap.util.web import get_https_domain_and_port logger = logging.getLogger(__name__) SIGNUP_TIMEOUT = getattr(baseconstants, 'SIGNUP_TIMEOUT', 5) -# XXX remove me!! -SERVER = "https://localhost:8443/1" - - """ Registration and authentication classes for the SRP auth mechanism used in the leap platform. -We're currently using the (pure python?) srp library since -it seemed the fastest way of getting something working. - -In the future we can switch to use python-gnutls, since -libgnutls implements srp protocol. +We're using the srp library which uses a c-based implementation +of the protocol if the c extension is available, and a python-based +one if not. """ +class ImproperlyConfigured(Exception): + """ + """ + + +class SRPAuthenticationError(Exception): + """ + exception raised + for authentication errors + """ + + +def null_check(value, value_name): + try: + assert value is not None + except AssertionError: + raise ImproperlyConfigured( + "%s parameter cannot be None" % value_name) + + +safe_unhexlify = lambda x: binascii.unhexlify(x) \ + if (len(x) % 2 == 0) else binascii.unhexlify('0' + x) + + class LeapSRPRegister(object): def __init__(self, @@ -45,9 +64,19 @@ class LeapSRPRegister(object): hashfun=srp.SHA256, ng_constant=srp.NG_1024): + null_check(provider, provider) + self.schema = schema + + # XXX FIXME self.provider = provider self.port = port + # XXX splitting server,port + # deprecate port call. + domain, port = get_https_domain_and_port(provider) + self.provider = domain + self.port = port + self.verify = verify self.register_path = register_path self.method = method @@ -110,27 +139,16 @@ class LeapSRPRegister(object): return (req.ok, req) -class SRPAuthenticationError(Exception): - """ - exception raised - for authentication errors - """ - pass - -safe_unhexlify = lambda x: binascii.unhexlify(x) \ - if (len(x) % 2 == 0) else binascii.unhexlify('0' + x) - - class SRPAuth(requests.auth.AuthBase): - def __init__(self, username, password, verify=None): + def __init__(self, username, password, server=None, verify=None): + # sanity check + null_check(server, 'server') self.username = username self.password = password + self.server = server self.verify = verify - # XXX init something similar to - # SERVER... - self.init_data = None self.session = requests.session() @@ -159,7 +177,7 @@ class SRPAuth(requests.auth.AuthBase): def get_init_data(self): try: init_session = self.session.post( - SERVER + '/sessions.json/', + self.server + '/1/sessions.json/', data=self.get_auth_data(), verify=self.verify) except requests.exceptions.ConnectionError: @@ -176,7 +194,7 @@ class SRPAuth(requests.auth.AuthBase): def get_server_proof_data(self): try: auth_result = self.session.put( - SERVER + '/sessions.json/' + self.username, + self.server + '/1/sessions.json/' + self.username, data={'client_auth': binascii.hexlify(self.M)}, verify=self.verify) except requests.exceptions.ConnectionError: @@ -258,18 +276,18 @@ class SRPAuth(requests.auth.AuthBase): return req -def srpauth_protected(user=None, passwd=None, verify=True): +def srpauth_protected(user=None, passwd=None, server=None, verify=True): """ decorator factory that accepts user and password keyword arguments and add those to the decorated request """ - def srpauth(fn, user=user, passwd=passwd): + def srpauth(fn): def wrapper(*args, **kwargs): - print 'uri is ', args[0] if user and passwd: - auth = SRPAuth(user, passwd, verify) + auth = SRPAuth(user, passwd, server, verify) kwargs['auth'] = auth + kwargs['verify'] = verify return fn(*args, **kwargs) return wrapper return srpauth @@ -305,6 +323,9 @@ def magick_srpauth(fn): # Unless we keep a table with the # equivalencies... user, passwd = get_leap_credentials() + + # XXX pass verify and server too + # (pop) auth = SRPAuth(user, passwd) kwargs['auth'] = auth return fn(*args, **kwargs) @@ -312,14 +333,43 @@ def magick_srpauth(fn): if __name__ == "__main__": + """ + To test against test_provider (twisted version) + Register an user: (will be valid during the session) + >>> python auth.py add test password + + Test login with that user: + >>> python auth.py login test password + """ + import sys - user = sys.argv[1] - passwd = sys.argv[2] - @srpauth_protected(user=user, passwd=passwd) - def test_srp_protected_get(*args, **kwargs): - req = requests.get(*args, **kwargs) - req.raise_for_status - #print req.content + if len(sys.argv) not in (4, 5): + print 'Usage: auth [server]' + sys.exit(0) + + action = sys.argv[1] + user = sys.argv[2] + passwd = sys.argv[3] + + if len(sys.argv) == 5: + SERVER = sys.argv[4] + else: + SERVER = "https://localhost:8443" + + if action == "login": + + @srpauth_protected( + user=user, passwd=passwd, server=SERVER, verify=False) + def test_srp_protected_get(*args, **kwargs): + req = requests.get(*args, **kwargs) + req.raise_for_status + return req + + req = test_srp_protected_get('https://localhost:8443/1/cert') + print 'cert :', req.content[:200] + "..." + sys.exit(0) - test_srp_protected_get('http://localhost:8443/1/cert') + if action == "add": + auth = LeapSRPRegister(provider=SERVER, verify=False) + auth.register_user(user, passwd) -- cgit v1.2.3