summaryrefslogtreecommitdiff
path: root/src/leap/base
diff options
context:
space:
mode:
Diffstat (limited to 'src/leap/base')
-rw-r--r--src/leap/base/auth.py151
-rw-r--r--src/leap/base/connection.py10
-rw-r--r--src/leap/base/tests/__init__.py0
-rw-r--r--src/leap/base/tests/test_auth.py137
4 files changed, 113 insertions, 185 deletions
diff --git a/src/leap/base/auth.py b/src/leap/base/auth.py
index d91e138b..f1b618ba 100644
--- a/src/leap/base/auth.py
+++ b/src/leap/base/auth.py
@@ -1,7 +1,7 @@
import binascii
import json
import logging
-import urlparse
+#import urlparse
import requests
import srp
@@ -9,13 +9,14 @@ import srp
from PyQt4 import QtCore
from leap.base import constants as baseconstants
+from leap.crypto import leapkeyring
logger = logging.getLogger(__name__)
SIGNUP_TIMEOUT = getattr(baseconstants, 'SIGNUP_TIMEOUT', 5)
# XXX remove me!!
-SERVER = "http://springbok/1"
+SERVER = "https://localhost:8443/1"
"""
@@ -36,6 +37,7 @@ class LeapSRPRegister(object):
schema="https",
provider=None,
port=None,
+ verify=True,
register_path="1/users.json",
method="POST",
fetcher=requests,
@@ -46,6 +48,7 @@ class LeapSRPRegister(object):
self.schema = schema
self.provider = provider
self.port = port
+ self.verify = verify
self.register_path = register_path
self.method = method
self.fetcher = fetcher
@@ -97,7 +100,8 @@ class LeapSRPRegister(object):
# XXX get self.method
req = self.session.post(
uri, data=user_data,
- timeout=SIGNUP_TIMEOUT)
+ timeout=SIGNUP_TIMEOUT,
+ verify=self.verify)
logger.debug(req)
logger.debug('user_data: %s', user_data)
#logger.debug('response: %s', req.text)
@@ -119,18 +123,20 @@ safe_unhexlify = lambda x: binascii.unhexlify(x) \
class SRPAuth(requests.auth.AuthBase):
- def __init__(self, username, password, server=SERVER, verify=True):
+ def __init__(self, username, password, verify=None):
self.username = username
self.password = password
- self.server = server
self.verify = verify
+ # XXX init something similar to
+ # SERVER...
+
self.init_data = None
self.session = requests.session()
self.init_srp()
- def get_data(self, response):
+ def get_json_data(self, response):
return json.loads(response.content)
def init_srp(self):
@@ -151,43 +157,100 @@ class SRPAuth(requests.auth.AuthBase):
}
def get_init_data(self):
- init_session = self.session.post(
- self.server + '/sessions',
- data=self.get_auth_data(),
- verify=self.verify)
- self.init_data = self.get_data(init_session)
+ try:
+ init_session = self.session.post(
+ SERVER + '/sessions.json/',
+ data=self.get_auth_data(),
+ verify=self.verify)
+ except requests.exceptions.ConnectionError:
+ raise SRPAuthenticationError(
+ "No connection made (salt).")
+ if init_session.status_code not in (200, ):
+ raise SRPAuthenticationError(
+ "No valid response (salt).")
+
+ # XXX should get auth_result.json instead
+ self.init_data = self.get_json_data(init_session)
return self.init_data
+ def get_server_proof_data(self):
+ try:
+ auth_result = self.session.put(
+ SERVER + '/sessions.json/' + self.username,
+ data={'client_auth': binascii.hexlify(self.M)},
+ verify=self.verify)
+ except requests.exceptions.ConnectionError:
+ raise SRPAuthenticationError(
+ "No connection made (HAMK).")
+
+ if auth_result.status_code not in (200, ):
+ raise SRPAuthenticationError(
+ "No valid response (HAMK).")
+
+ # XXX should get auth_result.json instead
+ try:
+ self.auth_data = self.get_json_data(auth_result)
+ except ValueError:
+ raise SRPAuthenticationError(
+ "No valid data sent (HAMK)")
+
+ return self.auth_data
+
def authenticate(self):
- print 'start authentication...'
+ logger.debug('start authentication...')
init_data = self.get_init_data()
salt = init_data.get('salt', None)
B = init_data.get('B', None)
+ # XXX refactor this function
+ # move checks and un-hex
+ # to routines
+
if not salt or not B:
- raise SRPAuthenticationError
+ raise SRPAuthenticationError(
+ "Server did not send initial data.")
+
+ try:
+ unhex_salt = safe_unhexlify(salt)
+ except TypeError:
+ raise SRPAuthenticationError(
+ "Bad data from server (salt)")
+ try:
+ unhex_B = safe_unhexlify(B)
+ except TypeError:
+ raise SRPAuthenticationError(
+ "Bad data from server (B)")
self.M = self.srp_usr.process_challenge(
- safe_unhexlify(salt),
- safe_unhexlify(B)
+ unhex_salt,
+ unhex_B
)
- auth_result = self.session.put(
- self.server + '/sessions/' + self.username,
- data={'client_auth': binascii.hexlify(self.M)},
- verify=self.verify)
+ proof_data = self.get_server_proof_data()
+
+ HAMK = proof_data.get("M2", None)
+ if not HAMK:
+ errors = proof_data.get('errors', None)
+ if errors:
+ logger.error(errors)
+ raise SRPAuthenticationError("Server did not send HAMK.")
+
+ try:
+ unhex_HAMK = safe_unhexlify(HAMK)
+ except TypeError:
+ raise SRPAuthenticationError(
+ "Bad data from server (HAMK)")
- # XXX check for errors
- auth_data = self.get_data(auth_result)
self.srp_usr.verify_session(
- safe_unhexlify(auth_data["M2"]))
+ unhex_HAMK)
try:
assert self.srp_usr.authenticated()
- print 'user is authenticated!'
+ logger.debug('user is authenticated!')
except (AssertionError):
- raise SRPAuthenticationError
+ raise SRPAuthenticationError(
+ "Auth verification failed.")
def __call__(self, req):
self.authenticate()
@@ -195,7 +258,7 @@ class SRPAuth(requests.auth.AuthBase):
return req
-def srpauth_protected(user=None, passwd=None):
+def srpauth_protected(user=None, passwd=None, verify=True):
"""
decorator factory that accepts
user and password keyword arguments
@@ -205,41 +268,43 @@ def srpauth_protected(user=None, passwd=None):
def wrapper(*args, **kwargs):
print 'uri is ', args[0]
if user and passwd:
- auth = SRPAuth(user, passwd)
+ auth = SRPAuth(user, passwd, verify)
kwargs['auth'] = auth
return fn(*args, **kwargs)
return wrapper
return srpauth
-def magic_srpauth(fn):
+def get_leap_credentials():
+ settings = QtCore.QSettings()
+ full_username = settings.value('eip_username')
+ username, domain = full_username.split('@')
+ seed = settings.value('%s_seed' % domain, None)
+ password = leapkeyring.leap_get_password(full_username, seed=seed)
+ return (username, password)
+
+
+# XXX TODO
+# Pass verify as single argument,
+# in srpauth_protected style
+
+def magick_srpauth(fn):
"""
decorator that gets user and password
from the config file and adds those to
the decorated request
"""
- # TODO --- finish this...
- # currently broken.
+ logger.debug('magick srp auth decorator called')
+
def wrapper(*args, **kwargs):
- uri = args[0]
+ #uri = args[0]
# XXX Ugh!
# Problem with this approach.
# This won't work when we're using
# api.foo.bar
# Unless we keep a table with the
# equivalencies...
-
- domain = urlparse.urlparse(uri).netloc
-
- # XXX check this settings init...
- settings = QtCore.QSettings()
- user = settings.get('%s_username' % domain, None)
-
- # uh... I forgot.
- # get secret?
- # leapkeyring.get_password(foo?)
- passwd = settings.get('%s_password' % domain, None)
-
+ user, passwd = get_leap_credentials()
auth = SRPAuth(user, passwd)
kwargs['auth'] = auth
return fn(*args, **kwargs)
@@ -257,4 +322,4 @@ if __name__ == "__main__":
req.raise_for_status
#print req.content
- test_srp_protected_get('http://springbok/1/cert')
+ test_srp_protected_get('http://localhost:8443/1/cert')
diff --git a/src/leap/base/connection.py b/src/leap/base/connection.py
index e478538d..41d13935 100644
--- a/src/leap/base/connection.py
+++ b/src/leap/base/connection.py
@@ -37,11 +37,11 @@ class Connection(Authentication):
"""
pass
- def shutdown(self):
- """
- shutdown and quit
- """
- self.desired_con_state = self.status.DISCONNECTED
+ #def shutdown(self):
+ #"""
+ #shutdown and quit
+ #"""
+ #self.desired_con_state = self.status.DISCONNECTED
def connection_state(self):
"""
diff --git a/src/leap/base/tests/__init__.py b/src/leap/base/tests/__init__.py
new file mode 100644
index 00000000..e69de29b
--- /dev/null
+++ b/src/leap/base/tests/__init__.py
diff --git a/src/leap/base/tests/test_auth.py b/src/leap/base/tests/test_auth.py
deleted file mode 100644
index 541dea61..00000000
--- a/src/leap/base/tests/test_auth.py
+++ /dev/null
@@ -1,137 +0,0 @@
-import cgi
-import binascii
-import json
-import requests
-import urlparse
-try:
- import unittest2 as unittest
-except ImportError:
- import unittest
-
-from mock import (patch, Mock)
-
-#XXX should be moved to a general location
-from leap.eip.tests.test_checks import NoLogRequestHandler
-
-from leap.testing.basetest import BaseLeapTest
-from BaseHTTPServer import BaseHTTPRequestHandler
-from leap.testing.https_server import BaseHTTPSServerTestCase
-
-from leap.base.auth import SRPAuth, SRPAuthenticationError
-
-USERNAME = "0ACOJK"
-PASSWORD = "WG3HD06E7ZF3"
-INIT_DATA = {u'B': u'd74a9f592193bba8a818dcf500f412f60ce1b999aa9b5166f59fbe02aee97be9ec71a5d62fd16dedd973041efd4c7de0568c0d0c38a3806c78fc96f9ffa59dde89e5a04969905a83b8e700ee9c03b5636ad99624ed1514319b3bdac10cde498c8e064adf2fe04bfc5ee5df0dd06693961190a16caa182c090e59ac52feec693e',
- u'salt': u'd09ed33e'}
-AUTH_RESULT = {u'M2': u'b040d0cd7ab1f93c4e87ffccdec07491782f2af303ad14f33dc4f0b4b2e40824'}
-session_id = "'BAh7ByIPc2Vzc2lvbl9pZCIlNGU2ZGNhZDc4ZjNmMzE5YzRlMGUyNzJkMzBhYTA5ZTgiDHVzZXJfaWQiJWRhYzJmZGI4YTM5YmFjZGY4M2YyOWI4NDk2NTYzMDFl--6a322f6acb2f52b995bade4eaf54bd21820ab742"
-
-
-class SRP_SERVER_HTTPSTests(BaseHTTPSServerTestCase, BaseLeapTest):
- class request_handler(NoLogRequestHandler, BaseHTTPRequestHandler):
- responses = {
- '/': ['OK', ''],
- '/1/sessions': [json.dumps(INIT_DATA)],
- '/1/sessions/' + USERNAME: [json.dumps(AUTH_RESULT)]
- }
-
- def do_GET(self):
- path = urlparse.urlparse(self.path)
- message = '\n'.join(self.responses.get(
- path.path, None))
- self.send_response(200)
- self.end_headers()
- self.wfile.write(message)
-
- def do_PUT(self):
- form = cgi.FieldStorage(
- fp=self.rfile,
- headers=self.headers,
- environ={'REQUEST_METHOD': 'PUT',
- 'CONTENT_TYPE': self.headers['Content-Type'],
- })
- data = dict(
- (key, form[key].value) for key in form.keys())
- path = urlparse.urlparse(self.path)
- message = '\n'.join(
- self.responses.get(
- path.path, ''))
-
- self.send_response(200)
- self.end_headers()
- self.wfile.write(message)
-
- def do_POST(self):
- form = cgi.FieldStorage(
- fp=self.rfile,
- headers=self.headers,
- environ={'REQUEST_METHOD': 'POST',
- 'CONTENT_TYPE': self.headers['Content-Type'],
- })
- data = dict(
- (key, form[key].value) for key in form.keys())
- path = urlparse.urlparse(self.path)
- message = '\n'.join(
- self.responses.get(
- path.path, ''))
-
- self.send_response(200)
- self.end_headers()
- self.wfile.write(message)
-
- def test_srp_authenticate(self):
- srp_auth = SRPAuth(USERNAME, PASSWORD,
- "https://%s/1" % (self.get_server()), verify=False)
-
- # XXX We might want to raise different errors for SRP failures
- #This should fail at salt/B check time
- with patch.object(SRPAuth, "get_data") as mocked_post:
- with self.assertRaises(SRPAuthenticationError):
- mocked_post.return_value = json.loads("{}")
- srp_auth.authenticate()
-
- #This should fail at verification time
- with patch.object(SRPAuth, "get_data") as mocked_post:
- with self.assertRaises(SRPAuthenticationError):
- mocked_post.return_value = json.loads(
- '{"salt":"%s", "B":"%s", "M2":"%s"}' %
- (binascii.hexlify("fake"),
- binascii.hexlify("sofake"),
- binascii.hexlify("realfake")))
- srp_auth.authenticate()
-
- srp_auth.authenticate()
-
-
-class SRP_Protected_URI_Sequence(BaseHTTPSServerTestCase, BaseLeapTest):
- class request_handler(NoLogRequestHandler, BaseHTTPRequestHandler):
- # XXX get the real URIs and find the server side auth sequence
- responses = {
- '/1/cert': '',
- '/1/get_protected': '',
- }
-
- def do_GET(self):
- path = urlparse.urlparse(self.path)
- message = '\n'.join(self.responses.get(
- path.path, None))
- self.send_response(200)
- if path.path == "/1/cert":
- self.send_header("set-cookie", "_session_id=" + session_id)
- if path.path == "/1/get_protected":
- # XXX use a cookie library to do some abstraction
- # and make this prettier
- if "cookie" in self.headers and \
- self.headers["cookie"].find("_session_id") > -1:
- self.send_header("set-cookie", "damn=right")
- self.end_headers()
- self.wfile.write(message)
-
- def test_srp_protected_uri(self):
- s = requests.session()
- r1 = s.get("https://%s/1/cert" %
- self.get_server(), verify=False)
- self.assertEquals(r1.cookies["_session_id"], session_id)
- r2 = s.get("https://%s/1/get_protected" %
- self.get_server(), verify=False)
- self.assertEquals(r2.cookies["damn"], 'right')