summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorkali <kali@leap.se>2012-09-03 04:18:15 +0900
committerkali <kali@leap.se>2012-09-03 04:18:15 +0900
commit090aed5e7c569e07b14d74ca71068a277cc39152 (patch)
tree55fcb679d00a274925d75b0360b56b0162a3bd41
parentb612f422bf156a3b3927038472ad885b1afa556e (diff)
basic download cert functionality
-rw-r--r--src/leap/eip/checks.py58
-rw-r--r--src/leap/eip/tests/test_checks.py63
-rw-r--r--src/leap/testing/https_server.py5
3 files changed, 103 insertions, 23 deletions
diff --git a/src/leap/eip/checks.py b/src/leap/eip/checks.py
index 4112ef57..b0fd6323 100644
--- a/src/leap/eip/checks.py
+++ b/src/leap/eip/checks.py
@@ -1,6 +1,7 @@
-import json
+#import json
import logging
-import os
+import ssl
+#import os
logging.basicConfig()
logger = logging.getLogger(name=__name__)
@@ -13,6 +14,7 @@ from leap.base import providers
from leap.eip import config as eipconfig
from leap.eip import constants as eipconstants
from leap.eip import exceptions as eipexceptions
+from leap.eip import specs as eipspecs
"""
EIPConfigChecker
@@ -47,6 +49,7 @@ class ProviderCertChecker(object):
"""
def __init__(self, fetcher=requests):
self.fetcher = fetcher
+ self.cacert = None
def run_all(self, checker=None, skip_download=False):
if not checker:
@@ -88,18 +91,63 @@ class ProviderCertChecker(object):
# certs package.
try:
from leap.custom import certs
- certs.ca.pemfile
except ImportError:
raise
+ self.cacert = certs.where('cacert.pem')
- def is_https_working(self, uri=None, cacert=None, verify=True):
+ def is_https_working(self, uri=None, verify=True):
+ # XXX raise InsecureURI or something better
assert uri.startswith('https')
+ if verify is True and self.cacert is not None:
+ verify = self.cacert
self.fetcher.get(uri, verify=verify)
return True
- def download_new_client_cert(self):
+ def download_new_client_cert(self, uri=None, verify=True):
+ if uri is None:
+ uri = self._get_client_cert_uri()
+ # XXX raise InsecureURI or something better
+ assert uri.startswith('https')
+ if verify is True and self.cacert is not None:
+ verify = self.cacert
+ req = self.fetcher.get(uri, verify=verify)
+ pemfile_content = req.content
+ self.validate_pemfile(pemfile_content)
+ cert_path = self._get_client_cert_path()
+ self.write_cert(pemfile_content, to=cert_path)
return True
+ def validate_pemfile(self, cert_s):
+ """
+ checks that the passed string
+ is a valid pem certificate
+ @param cert_s: string containing pem content
+ @type cert_s: string
+ @rtype: bool
+ """
+ try:
+ # XXX get a real cert validation
+ # so far this is only checking begin/end
+ # delimiters :)
+ ssl.PEM_cert_to_DER_cert(cert_s)
+ except:
+ # XXX raise proper exception
+ raise
+ return True
+
+ def _get_client_cert_uri(self):
+ # XXX TODO
+ # get from provider definition?
+ pass
+
+ def _get_client_cert_path(self):
+ # MVS+ : get provider path
+ return eipspecs.client_cert_path()
+
+ def write_cert(self, pemfile_content, to=None):
+ with open(to, 'w') as cert_f:
+ cert_f.write(pemfile_content)
+
class EIPConfigChecker(object):
"""
diff --git a/src/leap/eip/tests/test_checks.py b/src/leap/eip/tests/test_checks.py
index 781fdad5..541b884b 100644
--- a/src/leap/eip/tests/test_checks.py
+++ b/src/leap/eip/tests/test_checks.py
@@ -6,6 +6,7 @@ try:
except ImportError:
import unittest
import os
+import urlparse
from mock import patch, Mock
@@ -20,6 +21,7 @@ from leap.eip import exceptions as eipexceptions
from leap.eip.tests import data as testdata
from leap.testing.basetest import BaseLeapTest
from leap.testing.https_server import BaseHTTPSServerTestCase
+from leap.testing.https_server import where as where_cert
class NoLogRequestHandler:
@@ -228,13 +230,18 @@ class ProviderCertCheckerTest(BaseLeapTest):
class ProviderCertCheckerHTTPSTests(BaseHTTPSServerTestCase):
class request_handler(NoLogRequestHandler, BaseHTTPRequestHandler):
+ responses = {
+ '/': ['OK', ''],
+ '/client.cert': [
+ '-----BEGIN CERTIFICATE-----',
+ '-----END CERTIFICATE-----'],
+ '/badclient.cert': [
+ 'BADCERT']}
+
def do_GET(self):
- #XXX use path to deliver foo stuff
- #path = urlparse.urlparse(self.path)
- #print path
- message = '\n'.join([
- 'OK',
- ''])
+ path = urlparse.urlparse(self.path)
+ message = '\n'.join(self.responses.get(
+ path.path, None))
self.send_response(200)
self.end_headers()
self.wfile.write(message)
@@ -254,13 +261,13 @@ class ProviderCertCheckerHTTPSTests(BaseHTTPSServerTestCase):
# for the two checks below, I know they fail because no ca
# cert is passed to them, and I know that's the error that
- # requests return with our implementation. However, I believe
- # the right error should be SSL23_READ_BYTES: alert bad certificate
- # or something similar. I guess we're receiving this because our
+ # requests return with our implementation.
+ # We're receiving this because our
# server is dying prematurely when the handshake is interrupted on the
- # client side. In any case I think that requests could handle
- # this error more consistently and return a ConnectionError on a
- # higher level.
+ # client side.
+ # Since we have access to the server, we could check that
+ # the error raised has been:
+ # SSL23_READ_BYTES: alert bad certificate
with self.assertRaises(requests.exceptions.SSLError) as exc:
fetcher.get(uri, verify=True)
self.assertTrue(
@@ -270,15 +277,37 @@ class ProviderCertCheckerHTTPSTests(BaseHTTPSServerTestCase):
self.assertTrue(
"SSL23_GET_SERVER_HELLO:unknown protocol" in exc.message)
- # XXX get cacert from testing.https_server
+ # get cacert from testing.https_server
+ cacert = where_cert('cacert.pem')
+ fetcher.get(uri, verify=cacert)
+ self.assertTrue(checker.is_https_working(uri=uri, verify=cacert))
+
+ # same, but get cacert from leap.custom
+ # XXX TODO!
def test_download_new_client_cert(self):
+ uri = "https://%s/client.cert" % (self.get_server())
+ cacert = where_cert('cacert.pem')
checker = eipchecks.ProviderCertChecker()
- self.assertTrue(checker.download_new_client_cert())
+ self.assertTrue(checker.download_new_client_cert(
+ uri=uri, verify=cacert))
- #def test_download_bad_client_cert(self):
- #checker = eipchecks.ProviderCertChecker()
- #self.assertTrue(checker.download_new_client_cert())
+ # now download a malformed cert
+ uri = "https://%s/badclient.cert" % (self.get_server())
+ cacert = where_cert('cacert.pem')
+ checker = eipchecks.ProviderCertChecker()
+ with self.assertRaises(ValueError):
+ self.assertTrue(checker.download_new_client_cert(
+ uri=uri, verify=cacert))
+
+ # did we write cert to its path?
+ self.assertTrue(os.path.isfile(eipspecs.client_cert_path()))
+ certfile = eipspecs.client_cert_path()
+ with open(certfile, 'r') as cf:
+ certcontent = cf.read()
+ self.assertEqual(certcontent,
+ '\n'.join(
+ self.request_handler.responses['/client.cert']))
if __name__ == "__main__":
diff --git a/src/leap/testing/https_server.py b/src/leap/testing/https_server.py
index 18fc0da8..21191c32 100644
--- a/src/leap/testing/https_server.py
+++ b/src/leap/testing/https_server.py
@@ -58,7 +58,10 @@ class BaseHTTPSServerTestCase(unittest.TestCase):
self.thread.stop()
def get_server(self):
- return "%s:%s" % (self.HOST, self.PORT)
+ host, port = self.HOST, self.PORT
+ if host == "127.0.0.1":
+ host = "localhost"
+ return "%s:%s" % (host, port)
if __name__ == "__main__":