diff options
author | kali <kali@leap.se> | 2012-09-04 01:09:53 +0900 |
---|---|---|
committer | kali <kali@leap.se> | 2012-09-04 01:10:45 +0900 |
commit | 8b3ad186e947ad962252c5d4b47a52ccb5514d98 (patch) | |
tree | dfa70cab48f1ecb85e7fdc0489cbb48ff03e4f66 /src/leap/eip/checks.py | |
parent | 6c4012fc128c5af1b75cf33eef00590cf0e82438 (diff) | |
parent | 37d7e272b7f8a649034a0cf60f6c4a1424bf767a (diff) |
Merge branch 'feature/provider-cert-check' into develop
For #501: write base.checks.ProviderCertChecks
(in eip.checks, though.) Basic functionality is there, merging to
tip. Might have to reopen to implement actual cert ts check.
Diffstat (limited to 'src/leap/eip/checks.py')
-rw-r--r-- | src/leap/eip/checks.py | 154 |
1 files changed, 151 insertions, 3 deletions
diff --git a/src/leap/eip/checks.py b/src/leap/eip/checks.py index c6a7ca72..51a7e219 100644 --- a/src/leap/eip/checks.py +++ b/src/leap/eip/checks.py @@ -1,5 +1,6 @@ -import json +#import json import logging +import ssl import os logging.basicConfig() @@ -13,6 +14,7 @@ from leap.base import providers from leap.eip import config as eipconfig from leap.eip import constants as eipconstants from leap.eip import exceptions as eipexceptions +from leap.eip import specs as eipspecs """ EIPConfigChecker @@ -40,12 +42,158 @@ class LeapNetworkChecker(object): class ProviderCertChecker(object): - pass + """ + Several checks needed for getting + client certs and checking tls connection + with provider. + """ + def __init__(self, fetcher=requests): + self.fetcher = fetcher + self.cacert = None + + def run_all(self, checker=None, skip_download=False): + if not checker: + checker = self + + # For MVS+ + # checker.download_ca_cert() + # checker.download_ca_signature() + # checker.get_ca_signatures() + # checker.is_there_trust_path() + + # For MVS + checker.is_there_provider_ca() + checker.is_https_working() + checker.check_new_cert_needed() + #checker.download_new_client_cert() + + def download_ca_cert(self): + # MVS+ + raise NotImplementedError + + def download_ca_signature(self): + # MVS+ + raise NotImplementedError + + def get_ca_signatures(self): + # MVS+ + raise NotImplementedError + + def is_there_trust_path(self): + # MVS+ + raise NotImplementedError + + def is_there_provider_ca(self): + # XXX fake it till you make it! :P + return True + + # enable this when we have + # a custom "branded" bundle + # certs package. + try: + from leap.custom import certs + except ImportError: + raise + self.cacert = certs.where('cacert.pem') + + def is_https_working(self, uri=None, verify=True): + # XXX raise InsecureURI or something better + assert uri.startswith('https') + if verify is True and self.cacert is not None: + verify = self.cacert + self.fetcher.get(uri, verify=verify) + return True + + def check_new_cert_needed(self, skip_download=False): + if not self.is_cert_valid(do_raise=False): + self.download_new_client_cert(skip_download=skip_download) + return True + return False + + def download_new_client_cert(self, uri=None, verify=True, + skip_download=False): + if skip_download: + return True + if uri is None: + uri = self._get_client_cert_uri() + # XXX raise InsecureURI or something better + assert uri.startswith('https') + if verify is True and self.cacert is not None: + verify = self.cacert + req = self.fetcher.get(uri, verify=verify) + pemfile_content = req.content + self.is_valid_pemfile(pemfile_content) + cert_path = self._get_client_cert_path() + self.write_cert(pemfile_content, to=cert_path) + return True + + def is_cert_valid(self, cert_path=None, do_raise=True): + exists = lambda: self.is_certificate_exists() + valid_pemfile = lambda: self.is_valid_pemfile() + not_expired = lambda: self.is_cert_not_expired() + print 'exists?', exists + print 'valid', valid_pemfile + print 'not expired', not_expired + + valid = exists() and valid_pemfile() and not_expired() + if not valid: + if do_raise: + raise Exception('missing cert') + else: + return False + return True + + def is_certificate_exists(self, certfile=None): + if certfile is None: + certfile = self._get_client_cert_path() + return os.path.isfile(certfile) + + def is_cert_not_expired(self): + return True + # XXX TODO + # waiting on #507. If we're not using PyOpenSSL or anything alike + # we will have to roll our own x509 parsing to extract time info. + + def is_valid_pemfile(self, cert_s=None): + """ + checks that the passed string + is a valid pem certificate + @param cert_s: string containing pem content + @type cert_s: string + @rtype: bool + """ + if cert_s is None: + certfile = self._get_client_cert_path() + with open(certfile) as cf: + cert_s = cf.read() + try: + # XXX get a real cert validation + # so far this is only checking begin/end + # delimiters :) + ssl.PEM_cert_to_DER_cert(cert_s) + except: + # XXX raise proper exception + raise + return True + + def _get_client_cert_uri(self): + return "https://%s/cert/get" % (baseconstants.DEFAULT_TEST_PROVIDER) + + def _get_client_cert_path(self): + # MVS+ : get provider path + return eipspecs.client_cert_path() + + def is_cert_still_valid(self): + raise NotImplementedError + + def write_cert(self, pemfile_content, to=None): + with open(to, 'w') as cert_f: + cert_f.write(pemfile_content) class EIPConfigChecker(object): """ - Several tests needed + Several checks needed to ensure a EIPConnection can be sucessfully established. use run_all to run all checks. |