From 5bbbdc6029855a2b0aeb830f4d2f9e5546d4585b Mon Sep 17 00:00:00 2001 From: antialias Date: Mon, 22 Oct 2012 17:29:26 -0400 Subject: Test HTTPS Server in place and returning canned responses. But verification failing. --- src/leap/base/auth.py | 17 ++++---- src/leap/base/tests/test_auth.py | 85 ++++++++++++++++++++++++++++++++++++++++ 2 files changed, 94 insertions(+), 8 deletions(-) create mode 100644 src/leap/base/tests/test_auth.py diff --git a/src/leap/base/auth.py b/src/leap/base/auth.py index 5a9ebe1d..d91e138b 100644 --- a/src/leap/base/auth.py +++ b/src/leap/base/auth.py @@ -119,12 +119,11 @@ safe_unhexlify = lambda x: binascii.unhexlify(x) \ class SRPAuth(requests.auth.AuthBase): - def __init__(self, username, password): + def __init__(self, username, password, server=SERVER, verify=True): self.username = username self.password = password - - # XXX init something similar to - # SERVER... + self.server = server + self.verify = verify self.init_data = None self.session = requests.session() @@ -153,8 +152,9 @@ class SRPAuth(requests.auth.AuthBase): def get_init_data(self): init_session = self.session.post( - SERVER + '/sessions', - data=self.get_auth_data()) + self.server + '/sessions', + data=self.get_auth_data(), + verify=self.verify) self.init_data = self.get_data(init_session) return self.init_data @@ -174,8 +174,9 @@ class SRPAuth(requests.auth.AuthBase): ) auth_result = self.session.put( - SERVER + '/sessions/' + self.username, - data={'client_auth': binascii.hexlify(self.M)}) + self.server + '/sessions/' + self.username, + data={'client_auth': binascii.hexlify(self.M)}, + verify=self.verify) # XXX check for errors auth_data = self.get_data(auth_result) diff --git a/src/leap/base/tests/test_auth.py b/src/leap/base/tests/test_auth.py new file mode 100644 index 00000000..7b5df99e --- /dev/null +++ b/src/leap/base/tests/test_auth.py @@ -0,0 +1,85 @@ +import cgi +import binascii +import json +import requests +import urlparse +try: + import unittest2 as unittest +except ImportError: + import unittest + +from mock import (patch, Mock) + +#XXX should be moved to a general location +from leap.eip.tests.test_checks import NoLogRequestHandler + +from leap.testing.basetest import BaseLeapTest +from BaseHTTPServer import BaseHTTPRequestHandler +from leap.testing.https_server import BaseHTTPSServerTestCase + +from leap.base.auth import SRPAuth + +USERNAME = "0ACOJK" +PASSWORD = "WG3HD06E7ZF3" +INIT_DATA = {u'B': u'd74a9f592193bba8a818dcf500f412f60ce1b999aa9b5166f59fbe02aee97be9ec71a5d62fd16dedd973041efd4c7de0568c0d0c38a3806c78fc96f9ffa59dde89e5a04969905a83b8e700ee9c03b5636ad99624ed1514319b3bdac10cde498c8e064adf2fe04bfc5ee5df0dd06693961190a16caa182c090e59ac52feec693e', + u'salt': u'd09ed33e'} +AUTH_RESULT = {u'M2': u'b040d0cd7ab1f93c4e87ffccdec07491782f2af303ad14f33dc4f0b4b2e40824'} + + +class SRP_SERVER_HTTPSTests(BaseHTTPSServerTestCase, BaseLeapTest): + class request_handler(NoLogRequestHandler, BaseHTTPRequestHandler): + responses = { + '/': [ 'OK', '' ], + '/1/sessions': [ json.dumps(INIT_DATA) ], + '/1/sessions/' + USERNAME: [ json.dumps(AUTH_RESULT) ] + } + + def do_GET(self): + path = urlparse.urlparse(self.path) + message = '\n'.join(self.responses.get( + path.path, None)) + self.send_response(200) + self.end_headers() + self.wfile.write(message) + + def do_PUT(self): + form = cgi.FieldStorage( + fp=self.rfile, + headers=self.headers, + environ={'REQUEST_METHOD': 'PUT', + 'CONTENT_TYPE': self.headers['Content-Type'], + }) + data = dict( + (key, form[key].value) for key in form.keys()) + path = urlparse.urlparse(self.path) + message = '\n'.join( + self.responses.get( + path.path, '')) + + self.send_response(200) + self.end_headers() + self.wfile.write(message) + + def do_POST(self): + form = cgi.FieldStorage( + fp=self.rfile, + headers=self.headers, + environ={'REQUEST_METHOD': 'POST', + 'CONTENT_TYPE': self.headers['Content-Type'], + }) + data = dict( + (key, form[key].value) for key in form.keys()) + path = urlparse.urlparse(self.path) + message = '\n'.join( + self.responses.get( + path.path, '')) + + self.send_response(200) + self.end_headers() + self.wfile.write(message) + + def test_srp_authenticate(self): + srp_auth = SRPAuth(USERNAME, PASSWORD, + "https://%s/1" % (self.get_server()), verify=False) + + srp_auth.authenticate() -- cgit v1.2.3 From 5029c3f63b44c30584f46a95aa3cf345bf64f0f5 Mon Sep 17 00:00:00 2001 From: antialias Date: Fri, 26 Oct 2012 18:21:16 -0400 Subject: Added generalize test to SRP URIs and failing test cases for SRP. --- src/leap/base/tests/test_auth.py | 53 +++++++++++++++++++++++++++++++++++++++- 1 file changed, 52 insertions(+), 1 deletion(-) diff --git a/src/leap/base/tests/test_auth.py b/src/leap/base/tests/test_auth.py index 7b5df99e..5652743d 100644 --- a/src/leap/base/tests/test_auth.py +++ b/src/leap/base/tests/test_auth.py @@ -17,7 +17,7 @@ from leap.testing.basetest import BaseLeapTest from BaseHTTPServer import BaseHTTPRequestHandler from leap.testing.https_server import BaseHTTPSServerTestCase -from leap.base.auth import SRPAuth +from leap.base.auth import SRPAuth, SRPAuthenticationError USERNAME = "0ACOJK" PASSWORD = "WG3HD06E7ZF3" @@ -82,4 +82,55 @@ class SRP_SERVER_HTTPSTests(BaseHTTPSServerTestCase, BaseLeapTest): srp_auth = SRPAuth(USERNAME, PASSWORD, "https://%s/1" % (self.get_server()), verify=False) + # XXX We might want to raise different errors for SRP failures + #This should fail at salt/B check time + with patch.object(SRPAuth, "get_data") as mocked_post: + with self.assertRaises(SRPAuthenticationError): + mocked_post.return_value = json.loads("{}") + srp_auth.authenticate() + + #This should fail at verification time + with patch.object(SRPAuth, "get_data") as mocked_post: + with self.assertRaises(SRPAuthenticationError): + mocked_post.return_value = json.loads( + '{"salt":"%s", "B":"%s", "M2":"%s"}' % + (binascii.hexlify("fake"), binascii.hexlify("sofake"), + binascii.hexlify("realfake"))) + srp_auth.authenticate() + srp_auth.authenticate() + +class SRP_Protected_URI_Sequence(BaseHTTPSServerTestCase, BaseLeapTest): + class request_handler(NoLogRequestHandler, BaseHTTPRequestHandler): + # XXX get the real URIs and find the server side auth sequence + responses = { + '/1/get_cookie' : '', + '/1/get_protected' : '', + } + + def do_GET(self): + path = urlparse.urlparse(self.path) + message = '\n'.join(self.responses.get( + path.path, None)) + self.send_response(200) + if path.path == "/1/get_cookie": + self.send_header("set-cookie", "authorized=True") + if path.path == "/1/get_protected": + # XXX use a cookie library to do some abstraction + # and make this prettier + if self.headers.has_key("cookie") and \ + self.headers["cookie"].find("authorized=True") > -1: + self.send_header("set-cookie", "damn=right") + self.end_headers() + self.wfile.write(message) + + + def test_srp_protected_uri(self): + print self.get_server() + s = requests.session() + r1 = s.get("https://%s/1/get_cookie" % self.get_server(), verify=False) + self.assertEquals(r1.cookies["authorized"], 'True') + r2 = s.get("https://%s/1/get_protected" % self.get_server(), verify=False) + self.assertEquals(r2.cookies["damn"], 'right') + + -- cgit v1.2.3