diff options
Diffstat (limited to 'src/leap/base')
-rw-r--r-- | src/leap/base/auth.py | 151 | ||||
-rw-r--r-- | src/leap/base/connection.py | 10 | ||||
-rw-r--r-- | src/leap/base/tests/__init__.py | 0 | ||||
-rw-r--r-- | src/leap/base/tests/test_auth.py | 137 |
4 files changed, 113 insertions, 185 deletions
diff --git a/src/leap/base/auth.py b/src/leap/base/auth.py index d91e138b..f1b618ba 100644 --- a/src/leap/base/auth.py +++ b/src/leap/base/auth.py @@ -1,7 +1,7 @@ import binascii import json import logging -import urlparse +#import urlparse import requests import srp @@ -9,13 +9,14 @@ import srp from PyQt4 import QtCore from leap.base import constants as baseconstants +from leap.crypto import leapkeyring logger = logging.getLogger(__name__) SIGNUP_TIMEOUT = getattr(baseconstants, 'SIGNUP_TIMEOUT', 5) # XXX remove me!! -SERVER = "http://springbok/1" +SERVER = "https://localhost:8443/1" """ @@ -36,6 +37,7 @@ class LeapSRPRegister(object): schema="https", provider=None, port=None, + verify=True, register_path="1/users.json", method="POST", fetcher=requests, @@ -46,6 +48,7 @@ class LeapSRPRegister(object): self.schema = schema self.provider = provider self.port = port + self.verify = verify self.register_path = register_path self.method = method self.fetcher = fetcher @@ -97,7 +100,8 @@ class LeapSRPRegister(object): # XXX get self.method req = self.session.post( uri, data=user_data, - timeout=SIGNUP_TIMEOUT) + timeout=SIGNUP_TIMEOUT, + verify=self.verify) logger.debug(req) logger.debug('user_data: %s', user_data) #logger.debug('response: %s', req.text) @@ -119,18 +123,20 @@ safe_unhexlify = lambda x: binascii.unhexlify(x) \ class SRPAuth(requests.auth.AuthBase): - def __init__(self, username, password, server=SERVER, verify=True): + def __init__(self, username, password, verify=None): self.username = username self.password = password - self.server = server self.verify = verify + # XXX init something similar to + # SERVER... + self.init_data = None self.session = requests.session() self.init_srp() - def get_data(self, response): + def get_json_data(self, response): return json.loads(response.content) def init_srp(self): @@ -151,43 +157,100 @@ class SRPAuth(requests.auth.AuthBase): } def get_init_data(self): - init_session = self.session.post( - self.server + '/sessions', - data=self.get_auth_data(), - verify=self.verify) - self.init_data = self.get_data(init_session) + try: + init_session = self.session.post( + SERVER + '/sessions.json/', + data=self.get_auth_data(), + verify=self.verify) + except requests.exceptions.ConnectionError: + raise SRPAuthenticationError( + "No connection made (salt).") + if init_session.status_code not in (200, ): + raise SRPAuthenticationError( + "No valid response (salt).") + + # XXX should get auth_result.json instead + self.init_data = self.get_json_data(init_session) return self.init_data + def get_server_proof_data(self): + try: + auth_result = self.session.put( + SERVER + '/sessions.json/' + self.username, + data={'client_auth': binascii.hexlify(self.M)}, + verify=self.verify) + except requests.exceptions.ConnectionError: + raise SRPAuthenticationError( + "No connection made (HAMK).") + + if auth_result.status_code not in (200, ): + raise SRPAuthenticationError( + "No valid response (HAMK).") + + # XXX should get auth_result.json instead + try: + self.auth_data = self.get_json_data(auth_result) + except ValueError: + raise SRPAuthenticationError( + "No valid data sent (HAMK)") + + return self.auth_data + def authenticate(self): - print 'start authentication...' + logger.debug('start authentication...') init_data = self.get_init_data() salt = init_data.get('salt', None) B = init_data.get('B', None) + # XXX refactor this function + # move checks and un-hex + # to routines + if not salt or not B: - raise SRPAuthenticationError + raise SRPAuthenticationError( + "Server did not send initial data.") + + try: + unhex_salt = safe_unhexlify(salt) + except TypeError: + raise SRPAuthenticationError( + "Bad data from server (salt)") + try: + unhex_B = safe_unhexlify(B) + except TypeError: + raise SRPAuthenticationError( + "Bad data from server (B)") self.M = self.srp_usr.process_challenge( - safe_unhexlify(salt), - safe_unhexlify(B) + unhex_salt, + unhex_B ) - auth_result = self.session.put( - self.server + '/sessions/' + self.username, - data={'client_auth': binascii.hexlify(self.M)}, - verify=self.verify) + proof_data = self.get_server_proof_data() + + HAMK = proof_data.get("M2", None) + if not HAMK: + errors = proof_data.get('errors', None) + if errors: + logger.error(errors) + raise SRPAuthenticationError("Server did not send HAMK.") + + try: + unhex_HAMK = safe_unhexlify(HAMK) + except TypeError: + raise SRPAuthenticationError( + "Bad data from server (HAMK)") - # XXX check for errors - auth_data = self.get_data(auth_result) self.srp_usr.verify_session( - safe_unhexlify(auth_data["M2"])) + unhex_HAMK) try: assert self.srp_usr.authenticated() - print 'user is authenticated!' + logger.debug('user is authenticated!') except (AssertionError): - raise SRPAuthenticationError + raise SRPAuthenticationError( + "Auth verification failed.") def __call__(self, req): self.authenticate() @@ -195,7 +258,7 @@ class SRPAuth(requests.auth.AuthBase): return req -def srpauth_protected(user=None, passwd=None): +def srpauth_protected(user=None, passwd=None, verify=True): """ decorator factory that accepts user and password keyword arguments @@ -205,41 +268,43 @@ def srpauth_protected(user=None, passwd=None): def wrapper(*args, **kwargs): print 'uri is ', args[0] if user and passwd: - auth = SRPAuth(user, passwd) + auth = SRPAuth(user, passwd, verify) kwargs['auth'] = auth return fn(*args, **kwargs) return wrapper return srpauth -def magic_srpauth(fn): +def get_leap_credentials(): + settings = QtCore.QSettings() + full_username = settings.value('eip_username') + username, domain = full_username.split('@') + seed = settings.value('%s_seed' % domain, None) + password = leapkeyring.leap_get_password(full_username, seed=seed) + return (username, password) + + +# XXX TODO +# Pass verify as single argument, +# in srpauth_protected style + +def magick_srpauth(fn): """ decorator that gets user and password from the config file and adds those to the decorated request """ - # TODO --- finish this... - # currently broken. + logger.debug('magick srp auth decorator called') + def wrapper(*args, **kwargs): - uri = args[0] + #uri = args[0] # XXX Ugh! # Problem with this approach. # This won't work when we're using # api.foo.bar # Unless we keep a table with the # equivalencies... - - domain = urlparse.urlparse(uri).netloc - - # XXX check this settings init... - settings = QtCore.QSettings() - user = settings.get('%s_username' % domain, None) - - # uh... I forgot. - # get secret? - # leapkeyring.get_password(foo?) - passwd = settings.get('%s_password' % domain, None) - + user, passwd = get_leap_credentials() auth = SRPAuth(user, passwd) kwargs['auth'] = auth return fn(*args, **kwargs) @@ -257,4 +322,4 @@ if __name__ == "__main__": req.raise_for_status #print req.content - test_srp_protected_get('http://springbok/1/cert') + test_srp_protected_get('http://localhost:8443/1/cert') diff --git a/src/leap/base/connection.py b/src/leap/base/connection.py index e478538d..41d13935 100644 --- a/src/leap/base/connection.py +++ b/src/leap/base/connection.py @@ -37,11 +37,11 @@ class Connection(Authentication): """ pass - def shutdown(self): - """ - shutdown and quit - """ - self.desired_con_state = self.status.DISCONNECTED + #def shutdown(self): + #""" + #shutdown and quit + #""" + #self.desired_con_state = self.status.DISCONNECTED def connection_state(self): """ diff --git a/src/leap/base/tests/__init__.py b/src/leap/base/tests/__init__.py new file mode 100644 index 00000000..e69de29b --- /dev/null +++ b/src/leap/base/tests/__init__.py diff --git a/src/leap/base/tests/test_auth.py b/src/leap/base/tests/test_auth.py deleted file mode 100644 index 541dea61..00000000 --- a/src/leap/base/tests/test_auth.py +++ /dev/null @@ -1,137 +0,0 @@ -import cgi -import binascii -import json -import requests -import urlparse -try: - import unittest2 as unittest -except ImportError: - import unittest - -from mock import (patch, Mock) - -#XXX should be moved to a general location -from leap.eip.tests.test_checks import NoLogRequestHandler - -from leap.testing.basetest import BaseLeapTest -from BaseHTTPServer import BaseHTTPRequestHandler -from leap.testing.https_server import BaseHTTPSServerTestCase - -from leap.base.auth import SRPAuth, SRPAuthenticationError - -USERNAME = "0ACOJK" -PASSWORD = "WG3HD06E7ZF3" -INIT_DATA = {u'B': u'd74a9f592193bba8a818dcf500f412f60ce1b999aa9b5166f59fbe02aee97be9ec71a5d62fd16dedd973041efd4c7de0568c0d0c38a3806c78fc96f9ffa59dde89e5a04969905a83b8e700ee9c03b5636ad99624ed1514319b3bdac10cde498c8e064adf2fe04bfc5ee5df0dd06693961190a16caa182c090e59ac52feec693e', - u'salt': u'd09ed33e'} -AUTH_RESULT = {u'M2': u'b040d0cd7ab1f93c4e87ffccdec07491782f2af303ad14f33dc4f0b4b2e40824'} -session_id = "'BAh7ByIPc2Vzc2lvbl9pZCIlNGU2ZGNhZDc4ZjNmMzE5YzRlMGUyNzJkMzBhYTA5ZTgiDHVzZXJfaWQiJWRhYzJmZGI4YTM5YmFjZGY4M2YyOWI4NDk2NTYzMDFl--6a322f6acb2f52b995bade4eaf54bd21820ab742" - - -class SRP_SERVER_HTTPSTests(BaseHTTPSServerTestCase, BaseLeapTest): - class request_handler(NoLogRequestHandler, BaseHTTPRequestHandler): - responses = { - '/': ['OK', ''], - '/1/sessions': [json.dumps(INIT_DATA)], - '/1/sessions/' + USERNAME: [json.dumps(AUTH_RESULT)] - } - - def do_GET(self): - path = urlparse.urlparse(self.path) - message = '\n'.join(self.responses.get( - path.path, None)) - self.send_response(200) - self.end_headers() - self.wfile.write(message) - - def do_PUT(self): - form = cgi.FieldStorage( - fp=self.rfile, - headers=self.headers, - environ={'REQUEST_METHOD': 'PUT', - 'CONTENT_TYPE': self.headers['Content-Type'], - }) - data = dict( - (key, form[key].value) for key in form.keys()) - path = urlparse.urlparse(self.path) - message = '\n'.join( - self.responses.get( - path.path, '')) - - self.send_response(200) - self.end_headers() - self.wfile.write(message) - - def do_POST(self): - form = cgi.FieldStorage( - fp=self.rfile, - headers=self.headers, - environ={'REQUEST_METHOD': 'POST', - 'CONTENT_TYPE': self.headers['Content-Type'], - }) - data = dict( - (key, form[key].value) for key in form.keys()) - path = urlparse.urlparse(self.path) - message = '\n'.join( - self.responses.get( - path.path, '')) - - self.send_response(200) - self.end_headers() - self.wfile.write(message) - - def test_srp_authenticate(self): - srp_auth = SRPAuth(USERNAME, PASSWORD, - "https://%s/1" % (self.get_server()), verify=False) - - # XXX We might want to raise different errors for SRP failures - #This should fail at salt/B check time - with patch.object(SRPAuth, "get_data") as mocked_post: - with self.assertRaises(SRPAuthenticationError): - mocked_post.return_value = json.loads("{}") - srp_auth.authenticate() - - #This should fail at verification time - with patch.object(SRPAuth, "get_data") as mocked_post: - with self.assertRaises(SRPAuthenticationError): - mocked_post.return_value = json.loads( - '{"salt":"%s", "B":"%s", "M2":"%s"}' % - (binascii.hexlify("fake"), - binascii.hexlify("sofake"), - binascii.hexlify("realfake"))) - srp_auth.authenticate() - - srp_auth.authenticate() - - -class SRP_Protected_URI_Sequence(BaseHTTPSServerTestCase, BaseLeapTest): - class request_handler(NoLogRequestHandler, BaseHTTPRequestHandler): - # XXX get the real URIs and find the server side auth sequence - responses = { - '/1/cert': '', - '/1/get_protected': '', - } - - def do_GET(self): - path = urlparse.urlparse(self.path) - message = '\n'.join(self.responses.get( - path.path, None)) - self.send_response(200) - if path.path == "/1/cert": - self.send_header("set-cookie", "_session_id=" + session_id) - if path.path == "/1/get_protected": - # XXX use a cookie library to do some abstraction - # and make this prettier - if "cookie" in self.headers and \ - self.headers["cookie"].find("_session_id") > -1: - self.send_header("set-cookie", "damn=right") - self.end_headers() - self.wfile.write(message) - - def test_srp_protected_uri(self): - s = requests.session() - r1 = s.get("https://%s/1/cert" % - self.get_server(), verify=False) - self.assertEquals(r1.cookies["_session_id"], session_id) - r2 = s.get("https://%s/1/get_protected" % - self.get_server(), verify=False) - self.assertEquals(r2.cookies["damn"], 'right') |