From 090aed5e7c569e07b14d74ca71068a277cc39152 Mon Sep 17 00:00:00 2001 From: kali Date: Mon, 3 Sep 2012 04:18:15 +0900 Subject: basic download cert functionality --- src/leap/eip/checks.py | 58 +++++++++++++++++++++++++++++++---- src/leap/eip/tests/test_checks.py | 63 ++++++++++++++++++++++++++++----------- src/leap/testing/https_server.py | 5 +++- 3 files changed, 103 insertions(+), 23 deletions(-) diff --git a/src/leap/eip/checks.py b/src/leap/eip/checks.py index 4112ef57..b0fd6323 100644 --- a/src/leap/eip/checks.py +++ b/src/leap/eip/checks.py @@ -1,6 +1,7 @@ -import json +#import json import logging -import os +import ssl +#import os logging.basicConfig() logger = logging.getLogger(name=__name__) @@ -13,6 +14,7 @@ from leap.base import providers from leap.eip import config as eipconfig from leap.eip import constants as eipconstants from leap.eip import exceptions as eipexceptions +from leap.eip import specs as eipspecs """ EIPConfigChecker @@ -47,6 +49,7 @@ class ProviderCertChecker(object): """ def __init__(self, fetcher=requests): self.fetcher = fetcher + self.cacert = None def run_all(self, checker=None, skip_download=False): if not checker: @@ -88,18 +91,63 @@ class ProviderCertChecker(object): # certs package. try: from leap.custom import certs - certs.ca.pemfile except ImportError: raise + self.cacert = certs.where('cacert.pem') - def is_https_working(self, uri=None, cacert=None, verify=True): + def is_https_working(self, uri=None, verify=True): + # XXX raise InsecureURI or something better assert uri.startswith('https') + if verify is True and self.cacert is not None: + verify = self.cacert self.fetcher.get(uri, verify=verify) return True - def download_new_client_cert(self): + def download_new_client_cert(self, uri=None, verify=True): + if uri is None: + uri = self._get_client_cert_uri() + # XXX raise InsecureURI or something better + assert uri.startswith('https') + if verify is True and self.cacert is not None: + verify = self.cacert + req = self.fetcher.get(uri, verify=verify) + pemfile_content = req.content + self.validate_pemfile(pemfile_content) + cert_path = self._get_client_cert_path() + self.write_cert(pemfile_content, to=cert_path) return True + def validate_pemfile(self, cert_s): + """ + checks that the passed string + is a valid pem certificate + @param cert_s: string containing pem content + @type cert_s: string + @rtype: bool + """ + try: + # XXX get a real cert validation + # so far this is only checking begin/end + # delimiters :) + ssl.PEM_cert_to_DER_cert(cert_s) + except: + # XXX raise proper exception + raise + return True + + def _get_client_cert_uri(self): + # XXX TODO + # get from provider definition? + pass + + def _get_client_cert_path(self): + # MVS+ : get provider path + return eipspecs.client_cert_path() + + def write_cert(self, pemfile_content, to=None): + with open(to, 'w') as cert_f: + cert_f.write(pemfile_content) + class EIPConfigChecker(object): """ diff --git a/src/leap/eip/tests/test_checks.py b/src/leap/eip/tests/test_checks.py index 781fdad5..541b884b 100644 --- a/src/leap/eip/tests/test_checks.py +++ b/src/leap/eip/tests/test_checks.py @@ -6,6 +6,7 @@ try: except ImportError: import unittest import os +import urlparse from mock import patch, Mock @@ -20,6 +21,7 @@ from leap.eip import exceptions as eipexceptions from leap.eip.tests import data as testdata from leap.testing.basetest import BaseLeapTest from leap.testing.https_server import BaseHTTPSServerTestCase +from leap.testing.https_server import where as where_cert class NoLogRequestHandler: @@ -228,13 +230,18 @@ class ProviderCertCheckerTest(BaseLeapTest): class ProviderCertCheckerHTTPSTests(BaseHTTPSServerTestCase): class request_handler(NoLogRequestHandler, BaseHTTPRequestHandler): + responses = { + '/': ['OK', ''], + '/client.cert': [ + '-----BEGIN CERTIFICATE-----', + '-----END CERTIFICATE-----'], + '/badclient.cert': [ + 'BADCERT']} + def do_GET(self): - #XXX use path to deliver foo stuff - #path = urlparse.urlparse(self.path) - #print path - message = '\n'.join([ - 'OK', - '']) + path = urlparse.urlparse(self.path) + message = '\n'.join(self.responses.get( + path.path, None)) self.send_response(200) self.end_headers() self.wfile.write(message) @@ -254,13 +261,13 @@ class ProviderCertCheckerHTTPSTests(BaseHTTPSServerTestCase): # for the two checks below, I know they fail because no ca # cert is passed to them, and I know that's the error that - # requests return with our implementation. However, I believe - # the right error should be SSL23_READ_BYTES: alert bad certificate - # or something similar. I guess we're receiving this because our + # requests return with our implementation. + # We're receiving this because our # server is dying prematurely when the handshake is interrupted on the - # client side. In any case I think that requests could handle - # this error more consistently and return a ConnectionError on a - # higher level. + # client side. + # Since we have access to the server, we could check that + # the error raised has been: + # SSL23_READ_BYTES: alert bad certificate with self.assertRaises(requests.exceptions.SSLError) as exc: fetcher.get(uri, verify=True) self.assertTrue( @@ -270,15 +277,37 @@ class ProviderCertCheckerHTTPSTests(BaseHTTPSServerTestCase): self.assertTrue( "SSL23_GET_SERVER_HELLO:unknown protocol" in exc.message) - # XXX get cacert from testing.https_server + # get cacert from testing.https_server + cacert = where_cert('cacert.pem') + fetcher.get(uri, verify=cacert) + self.assertTrue(checker.is_https_working(uri=uri, verify=cacert)) + + # same, but get cacert from leap.custom + # XXX TODO! def test_download_new_client_cert(self): + uri = "https://%s/client.cert" % (self.get_server()) + cacert = where_cert('cacert.pem') checker = eipchecks.ProviderCertChecker() - self.assertTrue(checker.download_new_client_cert()) + self.assertTrue(checker.download_new_client_cert( + uri=uri, verify=cacert)) - #def test_download_bad_client_cert(self): - #checker = eipchecks.ProviderCertChecker() - #self.assertTrue(checker.download_new_client_cert()) + # now download a malformed cert + uri = "https://%s/badclient.cert" % (self.get_server()) + cacert = where_cert('cacert.pem') + checker = eipchecks.ProviderCertChecker() + with self.assertRaises(ValueError): + self.assertTrue(checker.download_new_client_cert( + uri=uri, verify=cacert)) + + # did we write cert to its path? + self.assertTrue(os.path.isfile(eipspecs.client_cert_path())) + certfile = eipspecs.client_cert_path() + with open(certfile, 'r') as cf: + certcontent = cf.read() + self.assertEqual(certcontent, + '\n'.join( + self.request_handler.responses['/client.cert'])) if __name__ == "__main__": diff --git a/src/leap/testing/https_server.py b/src/leap/testing/https_server.py index 18fc0da8..21191c32 100644 --- a/src/leap/testing/https_server.py +++ b/src/leap/testing/https_server.py @@ -58,7 +58,10 @@ class BaseHTTPSServerTestCase(unittest.TestCase): self.thread.stop() def get_server(self): - return "%s:%s" % (self.HOST, self.PORT) + host, port = self.HOST, self.PORT + if host == "127.0.0.1": + host = "localhost" + return "%s:%s" % (host, port) if __name__ == "__main__": -- cgit v1.2.3