summaryrefslogtreecommitdiff
path: root/src/leap/eip/checks.py
diff options
context:
space:
mode:
authorkali <kali@leap.se>2012-09-04 01:09:53 +0900
committerkali <kali@leap.se>2012-09-04 01:10:45 +0900
commit8b3ad186e947ad962252c5d4b47a52ccb5514d98 (patch)
treedfa70cab48f1ecb85e7fdc0489cbb48ff03e4f66 /src/leap/eip/checks.py
parent6c4012fc128c5af1b75cf33eef00590cf0e82438 (diff)
parent37d7e272b7f8a649034a0cf60f6c4a1424bf767a (diff)
Merge branch 'feature/provider-cert-check' into develop
For #501: write base.checks.ProviderCertChecks (in eip.checks, though.) Basic functionality is there, merging to tip. Might have to reopen to implement actual cert ts check.
Diffstat (limited to 'src/leap/eip/checks.py')
-rw-r--r--src/leap/eip/checks.py154
1 files changed, 151 insertions, 3 deletions
diff --git a/src/leap/eip/checks.py b/src/leap/eip/checks.py
index c6a7ca72..51a7e219 100644
--- a/src/leap/eip/checks.py
+++ b/src/leap/eip/checks.py
@@ -1,5 +1,6 @@
-import json
+#import json
import logging
+import ssl
import os
logging.basicConfig()
@@ -13,6 +14,7 @@ from leap.base import providers
from leap.eip import config as eipconfig
from leap.eip import constants as eipconstants
from leap.eip import exceptions as eipexceptions
+from leap.eip import specs as eipspecs
"""
EIPConfigChecker
@@ -40,12 +42,158 @@ class LeapNetworkChecker(object):
class ProviderCertChecker(object):
- pass
+ """
+ Several checks needed for getting
+ client certs and checking tls connection
+ with provider.
+ """
+ def __init__(self, fetcher=requests):
+ self.fetcher = fetcher
+ self.cacert = None
+
+ def run_all(self, checker=None, skip_download=False):
+ if not checker:
+ checker = self
+
+ # For MVS+
+ # checker.download_ca_cert()
+ # checker.download_ca_signature()
+ # checker.get_ca_signatures()
+ # checker.is_there_trust_path()
+
+ # For MVS
+ checker.is_there_provider_ca()
+ checker.is_https_working()
+ checker.check_new_cert_needed()
+ #checker.download_new_client_cert()
+
+ def download_ca_cert(self):
+ # MVS+
+ raise NotImplementedError
+
+ def download_ca_signature(self):
+ # MVS+
+ raise NotImplementedError
+
+ def get_ca_signatures(self):
+ # MVS+
+ raise NotImplementedError
+
+ def is_there_trust_path(self):
+ # MVS+
+ raise NotImplementedError
+
+ def is_there_provider_ca(self):
+ # XXX fake it till you make it! :P
+ return True
+
+ # enable this when we have
+ # a custom "branded" bundle
+ # certs package.
+ try:
+ from leap.custom import certs
+ except ImportError:
+ raise
+ self.cacert = certs.where('cacert.pem')
+
+ def is_https_working(self, uri=None, verify=True):
+ # XXX raise InsecureURI or something better
+ assert uri.startswith('https')
+ if verify is True and self.cacert is not None:
+ verify = self.cacert
+ self.fetcher.get(uri, verify=verify)
+ return True
+
+ def check_new_cert_needed(self, skip_download=False):
+ if not self.is_cert_valid(do_raise=False):
+ self.download_new_client_cert(skip_download=skip_download)
+ return True
+ return False
+
+ def download_new_client_cert(self, uri=None, verify=True,
+ skip_download=False):
+ if skip_download:
+ return True
+ if uri is None:
+ uri = self._get_client_cert_uri()
+ # XXX raise InsecureURI or something better
+ assert uri.startswith('https')
+ if verify is True and self.cacert is not None:
+ verify = self.cacert
+ req = self.fetcher.get(uri, verify=verify)
+ pemfile_content = req.content
+ self.is_valid_pemfile(pemfile_content)
+ cert_path = self._get_client_cert_path()
+ self.write_cert(pemfile_content, to=cert_path)
+ return True
+
+ def is_cert_valid(self, cert_path=None, do_raise=True):
+ exists = lambda: self.is_certificate_exists()
+ valid_pemfile = lambda: self.is_valid_pemfile()
+ not_expired = lambda: self.is_cert_not_expired()
+ print 'exists?', exists
+ print 'valid', valid_pemfile
+ print 'not expired', not_expired
+
+ valid = exists() and valid_pemfile() and not_expired()
+ if not valid:
+ if do_raise:
+ raise Exception('missing cert')
+ else:
+ return False
+ return True
+
+ def is_certificate_exists(self, certfile=None):
+ if certfile is None:
+ certfile = self._get_client_cert_path()
+ return os.path.isfile(certfile)
+
+ def is_cert_not_expired(self):
+ return True
+ # XXX TODO
+ # waiting on #507. If we're not using PyOpenSSL or anything alike
+ # we will have to roll our own x509 parsing to extract time info.
+
+ def is_valid_pemfile(self, cert_s=None):
+ """
+ checks that the passed string
+ is a valid pem certificate
+ @param cert_s: string containing pem content
+ @type cert_s: string
+ @rtype: bool
+ """
+ if cert_s is None:
+ certfile = self._get_client_cert_path()
+ with open(certfile) as cf:
+ cert_s = cf.read()
+ try:
+ # XXX get a real cert validation
+ # so far this is only checking begin/end
+ # delimiters :)
+ ssl.PEM_cert_to_DER_cert(cert_s)
+ except:
+ # XXX raise proper exception
+ raise
+ return True
+
+ def _get_client_cert_uri(self):
+ return "https://%s/cert/get" % (baseconstants.DEFAULT_TEST_PROVIDER)
+
+ def _get_client_cert_path(self):
+ # MVS+ : get provider path
+ return eipspecs.client_cert_path()
+
+ def is_cert_still_valid(self):
+ raise NotImplementedError
+
+ def write_cert(self, pemfile_content, to=None):
+ with open(to, 'w') as cert_f:
+ cert_f.write(pemfile_content)
class EIPConfigChecker(object):
"""
- Several tests needed
+ Several checks needed
to ensure a EIPConnection
can be sucessfully established.
use run_all to run all checks.