From b612f422bf156a3b3927038472ad885b1afa556e Mon Sep 17 00:00:00 2001 From: kali Date: Mon, 3 Sep 2012 02:51:41 +0900 Subject: providercertchecks:check_https_is_working implementing a https server with its own base testcase for convenience. https is delicate, and I think it's better checking against a real implementation than mocking everything here. --- src/leap/eip/checks.py | 62 ++++++++++++++++++++++++++++++++++++++++++++++++-- 1 file changed, 60 insertions(+), 2 deletions(-) (limited to 'src/leap/eip/checks.py') diff --git a/src/leap/eip/checks.py b/src/leap/eip/checks.py index c6a7ca72..4112ef57 100644 --- a/src/leap/eip/checks.py +++ b/src/leap/eip/checks.py @@ -40,12 +40,70 @@ class LeapNetworkChecker(object): class ProviderCertChecker(object): - pass + """ + Several checks needed for getting + client certs and checking tls connection + with provider. + """ + def __init__(self, fetcher=requests): + self.fetcher = fetcher + + def run_all(self, checker=None, skip_download=False): + if not checker: + checker = self + + # For MVS+ + # checker.download_ca_cert() + # checker.download_ca_signature() + # checker.get_ca_signatures() + # checker.is_there_trust_path() + + # For MVS + checker.is_there_provider_ca() + checker.is_https_working() + checker.download_new_client_cert() + + def download_ca_cert(self): + # MVS+ + raise NotImplementedError + + def download_ca_signature(self): + # MVS+ + raise NotImplementedError + + def get_ca_signatures(self): + # MVS+ + raise NotImplementedError + + def is_there_trust_path(self): + # MVS+ + raise NotImplementedError + + def is_there_provider_ca(self): + # XXX fake it till you make it! :P + return True + + # enable this when we have + # a custom "branded" bundle + # certs package. + try: + from leap.custom import certs + certs.ca.pemfile + except ImportError: + raise + + def is_https_working(self, uri=None, cacert=None, verify=True): + assert uri.startswith('https') + self.fetcher.get(uri, verify=verify) + return True + + def download_new_client_cert(self): + return True class EIPConfigChecker(object): """ - Several tests needed + Several checks needed to ensure a EIPConnection can be sucessfully established. use run_all to run all checks. -- cgit v1.2.3 From 090aed5e7c569e07b14d74ca71068a277cc39152 Mon Sep 17 00:00:00 2001 From: kali Date: Mon, 3 Sep 2012 04:18:15 +0900 Subject: basic download cert functionality --- src/leap/eip/checks.py | 58 +++++++++++++++++++++++++++++++++++++++++++++----- 1 file changed, 53 insertions(+), 5 deletions(-) (limited to 'src/leap/eip/checks.py') diff --git a/src/leap/eip/checks.py b/src/leap/eip/checks.py index 4112ef57..b0fd6323 100644 --- a/src/leap/eip/checks.py +++ b/src/leap/eip/checks.py @@ -1,6 +1,7 @@ -import json +#import json import logging -import os +import ssl +#import os logging.basicConfig() logger = logging.getLogger(name=__name__) @@ -13,6 +14,7 @@ from leap.base import providers from leap.eip import config as eipconfig from leap.eip import constants as eipconstants from leap.eip import exceptions as eipexceptions +from leap.eip import specs as eipspecs """ EIPConfigChecker @@ -47,6 +49,7 @@ class ProviderCertChecker(object): """ def __init__(self, fetcher=requests): self.fetcher = fetcher + self.cacert = None def run_all(self, checker=None, skip_download=False): if not checker: @@ -88,18 +91,63 @@ class ProviderCertChecker(object): # certs package. try: from leap.custom import certs - certs.ca.pemfile except ImportError: raise + self.cacert = certs.where('cacert.pem') - def is_https_working(self, uri=None, cacert=None, verify=True): + def is_https_working(self, uri=None, verify=True): + # XXX raise InsecureURI or something better assert uri.startswith('https') + if verify is True and self.cacert is not None: + verify = self.cacert self.fetcher.get(uri, verify=verify) return True - def download_new_client_cert(self): + def download_new_client_cert(self, uri=None, verify=True): + if uri is None: + uri = self._get_client_cert_uri() + # XXX raise InsecureURI or something better + assert uri.startswith('https') + if verify is True and self.cacert is not None: + verify = self.cacert + req = self.fetcher.get(uri, verify=verify) + pemfile_content = req.content + self.validate_pemfile(pemfile_content) + cert_path = self._get_client_cert_path() + self.write_cert(pemfile_content, to=cert_path) return True + def validate_pemfile(self, cert_s): + """ + checks that the passed string + is a valid pem certificate + @param cert_s: string containing pem content + @type cert_s: string + @rtype: bool + """ + try: + # XXX get a real cert validation + # so far this is only checking begin/end + # delimiters :) + ssl.PEM_cert_to_DER_cert(cert_s) + except: + # XXX raise proper exception + raise + return True + + def _get_client_cert_uri(self): + # XXX TODO + # get from provider definition? + pass + + def _get_client_cert_path(self): + # MVS+ : get provider path + return eipspecs.client_cert_path() + + def write_cert(self, pemfile_content, to=None): + with open(to, 'w') as cert_f: + cert_f.write(pemfile_content) + class EIPConfigChecker(object): """ -- cgit v1.2.3 From 37d7e272b7f8a649034a0cf60f6c4a1424bf767a Mon Sep 17 00:00:00 2001 From: kali Date: Tue, 4 Sep 2012 01:08:05 +0900 Subject: better separate cert validation/download logic stubbing out the timestamp validity check (waiting for #507) also some more deep tests are missing, wrote todo in tests. --- src/leap/eip/checks.py | 58 +++++++++++++++++++++++++++++++++++++++++++------- 1 file changed, 50 insertions(+), 8 deletions(-) (limited to 'src/leap/eip/checks.py') diff --git a/src/leap/eip/checks.py b/src/leap/eip/checks.py index b0fd6323..51a7e219 100644 --- a/src/leap/eip/checks.py +++ b/src/leap/eip/checks.py @@ -1,7 +1,7 @@ #import json import logging import ssl -#import os +import os logging.basicConfig() logger = logging.getLogger(name=__name__) @@ -64,7 +64,8 @@ class ProviderCertChecker(object): # For MVS checker.is_there_provider_ca() checker.is_https_working() - checker.download_new_client_cert() + checker.check_new_cert_needed() + #checker.download_new_client_cert() def download_ca_cert(self): # MVS+ @@ -103,7 +104,16 @@ class ProviderCertChecker(object): self.fetcher.get(uri, verify=verify) return True - def download_new_client_cert(self, uri=None, verify=True): + def check_new_cert_needed(self, skip_download=False): + if not self.is_cert_valid(do_raise=False): + self.download_new_client_cert(skip_download=skip_download) + return True + return False + + def download_new_client_cert(self, uri=None, verify=True, + skip_download=False): + if skip_download: + return True if uri is None: uri = self._get_client_cert_uri() # XXX raise InsecureURI or something better @@ -112,12 +122,39 @@ class ProviderCertChecker(object): verify = self.cacert req = self.fetcher.get(uri, verify=verify) pemfile_content = req.content - self.validate_pemfile(pemfile_content) + self.is_valid_pemfile(pemfile_content) cert_path = self._get_client_cert_path() self.write_cert(pemfile_content, to=cert_path) return True - def validate_pemfile(self, cert_s): + def is_cert_valid(self, cert_path=None, do_raise=True): + exists = lambda: self.is_certificate_exists() + valid_pemfile = lambda: self.is_valid_pemfile() + not_expired = lambda: self.is_cert_not_expired() + print 'exists?', exists + print 'valid', valid_pemfile + print 'not expired', not_expired + + valid = exists() and valid_pemfile() and not_expired() + if not valid: + if do_raise: + raise Exception('missing cert') + else: + return False + return True + + def is_certificate_exists(self, certfile=None): + if certfile is None: + certfile = self._get_client_cert_path() + return os.path.isfile(certfile) + + def is_cert_not_expired(self): + return True + # XXX TODO + # waiting on #507. If we're not using PyOpenSSL or anything alike + # we will have to roll our own x509 parsing to extract time info. + + def is_valid_pemfile(self, cert_s=None): """ checks that the passed string is a valid pem certificate @@ -125,6 +162,10 @@ class ProviderCertChecker(object): @type cert_s: string @rtype: bool """ + if cert_s is None: + certfile = self._get_client_cert_path() + with open(certfile) as cf: + cert_s = cf.read() try: # XXX get a real cert validation # so far this is only checking begin/end @@ -136,14 +177,15 @@ class ProviderCertChecker(object): return True def _get_client_cert_uri(self): - # XXX TODO - # get from provider definition? - pass + return "https://%s/cert/get" % (baseconstants.DEFAULT_TEST_PROVIDER) def _get_client_cert_path(self): # MVS+ : get provider path return eipspecs.client_cert_path() + def is_cert_still_valid(self): + raise NotImplementedError + def write_cert(self, pemfile_content, to=None): with open(to, 'w') as cert_f: cert_f.write(pemfile_content) -- cgit v1.2.3