diff options
author | Kali Kaneko <kali@leap.se> | 2013-07-02 22:29:32 +0900 |
---|---|---|
committer | Kali Kaneko <kali@leap.se> | 2013-07-02 22:29:32 +0900 |
commit | 1d30e2580592ef905d9b21c475459da1e40b1cd6 (patch) | |
tree | 6ec48c6b5234da55ecc91ad3c6235fb20b61315c /src/leap/base/auth.py | |
parent | 81dc8ebe9ef46c0fafa75cba5c4959bb822da686 (diff) | |
parent | 5b975799ce9b7a6e0a88be4bcb48bdfb90800bb3 (diff) |
Merge branch 'master' of ssh://leap.se/leap_client
Diffstat (limited to 'src/leap/base/auth.py')
-rw-r--r-- | src/leap/base/auth.py | 355 |
1 files changed, 0 insertions, 355 deletions
diff --git a/src/leap/base/auth.py b/src/leap/base/auth.py deleted file mode 100644 index c2d3f424..00000000 --- a/src/leap/base/auth.py +++ /dev/null @@ -1,355 +0,0 @@ -import binascii -import json -import logging -#import urlparse - -import requests -import srp - -from PyQt4 import QtCore - -from leap.base import constants as baseconstants -from leap.crypto import leapkeyring -from leap.util.misc import null_check -from leap.util.web import get_https_domain_and_port - -logger = logging.getLogger(__name__) - -SIGNUP_TIMEOUT = getattr(baseconstants, 'SIGNUP_TIMEOUT', 5) - -""" -Registration and authentication classes for the -SRP auth mechanism used in the leap platform. - -We're using the srp library which uses a c-based implementation -of the protocol if the c extension is available, and a python-based -one if not. -""" - - -class SRPAuthenticationError(Exception): - """ - exception raised - for authentication errors - """ - - -safe_unhexlify = lambda x: binascii.unhexlify(x) \ - if (len(x) % 2 == 0) else binascii.unhexlify('0' + x) - - -class LeapSRPRegister(object): - - def __init__(self, - schema="https", - provider=None, - verify=True, - register_path="1/users", - method="POST", - fetcher=requests, - srp=srp, - hashfun=srp.SHA256, - ng_constant=srp.NG_1024): - - null_check(provider, "provider") - - self.schema = schema - - domain, port = get_https_domain_and_port(provider) - self.provider = domain - self.port = port - - self.verify = verify - self.register_path = register_path - self.method = method - self.fetcher = fetcher - self.srp = srp - self.HASHFUN = hashfun - self.NG = ng_constant - - self.init_session() - - def init_session(self): - self.session = self.fetcher.session() - - def get_registration_uri(self): - # XXX assert is https! - # use urlparse - if self.port: - uri = "%s://%s:%s/%s" % ( - self.schema, - self.provider, - self.port, - self.register_path) - else: - uri = "%s://%s/%s" % ( - self.schema, - self.provider, - self.register_path) - - return uri - - def register_user(self, username, password, keep=False): - """ - @rtype: tuple - @rparam: (ok, request) - """ - salt, vkey = self.srp.create_salted_verification_key( - username, - password, - self.HASHFUN, - self.NG) - - user_data = { - 'user[login]': username, - 'user[password_verifier]': binascii.hexlify(vkey), - 'user[password_salt]': binascii.hexlify(salt)} - - uri = self.get_registration_uri() - logger.debug('post to uri: %s' % uri) - - # XXX get self.method - req = self.session.post( - uri, data=user_data, - timeout=SIGNUP_TIMEOUT, - verify=self.verify) - # we catch it in the form - #req.raise_for_status() - return (req.ok, req) - - -class SRPAuth(requests.auth.AuthBase): - - def __init__(self, username, password, server=None, verify=None): - # sanity check - null_check(server, 'server') - self.username = username - self.password = password - self.server = server - self.verify = verify - - logger.debug('SRPAuth. verify=%s' % verify) - logger.debug('server: %s. username=%s' % (server, username)) - - self.init_data = None - self.session = requests.session() - - self.init_srp() - - def init_srp(self): - usr = srp.User( - self.username, - self.password, - srp.SHA256, - srp.NG_1024) - uname, A = usr.start_authentication() - - self.srp_usr = usr - self.A = A - - def get_auth_data(self): - return { - 'login': self.username, - 'A': binascii.hexlify(self.A) - } - - def get_init_data(self): - try: - init_session = self.session.post( - self.server + '/1/sessions/', - data=self.get_auth_data(), - verify=self.verify) - except requests.exceptions.ConnectionError: - raise SRPAuthenticationError( - "No connection made (salt).") - except: - raise SRPAuthenticationError( - "Unknown error (salt).") - if init_session.status_code not in (200, ): - raise SRPAuthenticationError( - "No valid response (salt).") - - self.init_data = init_session.json - return self.init_data - - def get_server_proof_data(self): - try: - auth_result = self.session.put( - #self.server + '/1/sessions.json/' + self.username, - self.server + '/1/sessions/' + self.username, - data={'client_auth': binascii.hexlify(self.M)}, - verify=self.verify) - except requests.exceptions.ConnectionError: - raise SRPAuthenticationError( - "No connection made (HAMK).") - - if auth_result.status_code not in (200, ): - raise SRPAuthenticationError( - "No valid response (HAMK).") - - self.auth_data = auth_result.json - return self.auth_data - - def authenticate(self): - logger.debug('start authentication...') - - init_data = self.get_init_data() - salt = init_data.get('salt', None) - B = init_data.get('B', None) - - # XXX refactor this function - # move checks and un-hex - # to routines - - if not salt or not B: - raise SRPAuthenticationError( - "Server did not send initial data.") - - try: - unhex_salt = safe_unhexlify(salt) - except TypeError: - raise SRPAuthenticationError( - "Bad data from server (salt)") - try: - unhex_B = safe_unhexlify(B) - except TypeError: - raise SRPAuthenticationError( - "Bad data from server (B)") - - self.M = self.srp_usr.process_challenge( - unhex_salt, - unhex_B - ) - - proof_data = self.get_server_proof_data() - - HAMK = proof_data.get("M2", None) - if not HAMK: - errors = proof_data.get('errors', None) - if errors: - logger.error(errors) - raise SRPAuthenticationError("Server did not send HAMK.") - - try: - unhex_HAMK = safe_unhexlify(HAMK) - except TypeError: - raise SRPAuthenticationError( - "Bad data from server (HAMK)") - - self.srp_usr.verify_session( - unhex_HAMK) - - try: - assert self.srp_usr.authenticated() - logger.debug('user is authenticated!') - except (AssertionError): - raise SRPAuthenticationError( - "Auth verification failed.") - - def __call__(self, req): - self.authenticate() - req.cookies = self.session.cookies - return req - - -def srpauth_protected(user=None, passwd=None, server=None, verify=True): - """ - decorator factory that accepts - user and password keyword arguments - and add those to the decorated request - """ - def srpauth(fn): - def wrapper(*args, **kwargs): - if user and passwd: - auth = SRPAuth(user, passwd, server, verify) - kwargs['auth'] = auth - kwargs['verify'] = verify - if not args: - logger.warning('attempting to get from empty uri!') - return fn(*args, **kwargs) - return wrapper - return srpauth - - -def get_leap_credentials(): - settings = QtCore.QSettings() - full_username = settings.value('username') - username, domain = full_username.split('@') - seed = settings.value('%s_seed' % domain, None) - password = leapkeyring.leap_get_password(full_username, seed=seed) - return (username, password) - - -# XXX TODO -# Pass verify as single argument, -# in srpauth_protected style - -def magick_srpauth(fn): - """ - decorator that gets user and password - from the config file and adds those to - the decorated request - """ - logger.debug('magick srp auth decorator called') - - def wrapper(*args, **kwargs): - #uri = args[0] - # XXX Ugh! - # Problem with this approach. - # This won't work when we're using - # api.foo.bar - # Unless we keep a table with the - # equivalencies... - user, passwd = get_leap_credentials() - - # XXX pass verify and server too - # (pop) - auth = SRPAuth(user, passwd) - kwargs['auth'] = auth - return fn(*args, **kwargs) - return wrapper - - -if __name__ == "__main__": - """ - To test against test_provider (twisted version) - Register an user: (will be valid during the session) - >>> python auth.py add test password - - Test login with that user: - >>> python auth.py login test password - """ - - import sys - - if len(sys.argv) not in (4, 5): - print 'Usage: auth <add|login> <user> <pass> [server]' - sys.exit(0) - - action = sys.argv[1] - user = sys.argv[2] - passwd = sys.argv[3] - - if len(sys.argv) == 5: - SERVER = sys.argv[4] - else: - SERVER = "https://localhost:8443" - - if action == "login": - - @srpauth_protected( - user=user, passwd=passwd, server=SERVER, verify=False) - def test_srp_protected_get(*args, **kwargs): - req = requests.get(*args, **kwargs) - req.raise_for_status - return req - - #req = test_srp_protected_get('https://localhost:8443/1/cert') - req = test_srp_protected_get('%s/1/cert' % SERVER) - #print 'cert :', req.content[:200] + "..." - print req.content - sys.exit(0) - - if action == "add": - auth = LeapSRPRegister(provider=SERVER, verify=False) - auth.register_user(user, passwd) |