diff options
| author | Tomás Touceda <chiiph@leap.se> | 2013-03-13 11:33:42 -0300 | 
|---|---|---|
| committer | Tomás Touceda <chiiph@leap.se> | 2013-03-13 11:33:42 -0300 | 
| commit | 240d6b7762f7cc8f4c6fd229e4538aa9aa2262a5 (patch) | |
| tree | 092380d6a067a4da799fcd81eab97fd018d99838 | |
| parent | 60bcc7b27aa934a0d62033e7152b87d5af638491 (diff) | |
Check validity for downloaded certs and re-download if needed
| -rw-r--r-- | src/leap/services/eip/eipbootstrapper.py | 6 | ||||
| -rw-r--r-- | src/leap/util/certs.py | 99 | 
2 files changed, 100 insertions, 5 deletions
| diff --git a/src/leap/services/eip/eipbootstrapper.py b/src/leap/services/eip/eipbootstrapper.py index ec3dfe7b..c83cb1b5 100644 --- a/src/leap/services/eip/eipbootstrapper.py +++ b/src/leap/services/eip/eipbootstrapper.py @@ -32,7 +32,7 @@ from leap.util.check import leap_assert, leap_assert_type  from leap.util.checkerthread import CheckerThread  from leap.util.files import check_and_fix_urw_only, get_mtime, mkdir_p  from leap.util.request_helpers import get_content -from leap.util.certs import is_valid_pemfile +from leap.util.certs import is_valid_pemfile, should_redownload  logger = logging.getLogger(__name__) @@ -154,6 +154,10 @@ class EIPBootstrapper(QtCore.QObject):              get_client_cert_path(self._provider_config,                                   about_to_download=True) +        # For re-download if something is wrong with the cert +        self._download_if_needed = self._download_if_needed and \ +            not should_redownload(client_cert_path) +          if self._download_if_needed and \                  os.path.exists(client_cert_path):              try: diff --git a/src/leap/util/certs.py b/src/leap/util/certs.py index d6065474..a8bcd65e 100644 --- a/src/leap/util/certs.py +++ b/src/leap/util/certs.py @@ -19,15 +19,58 @@  Implements cert checks and helpers  """ +import os +import time  import logging  from OpenSSL import crypto +from dateutil.parser import parse as dateparse  from leap.util.check import leap_assert  logger = logging.getLogger(__name__) +def get_cert_from_string(string): +    """ +    Returns the x509 from the contents of this string + +    @param string: certificate contents as downloaded +    @type string: str + +    @return: x509 or None +    """ +    leap_assert(string, "We need something to load") + +    x509 = None +    try: +        x509 = crypto.load_certificate(crypto.FILETYPE_PEM, string) +    except Exception as e: +        logger.error("Something went wrong while loading the certificate: %r" +                     % (e,)) +    return x509 + + +def get_privatekey_from_string(string): +    """ +    Returns the private key from the contents of this string + +    @param string: private key contents as downloaded +    @type string: str + +    @return: private key or None +    """ +    leap_assert(string, "We need something to load") + +    pkey = None +    try: +        pkey = crypto.load_privatekey(crypto.FILETYPE_PEM, string) +    except Exception as e: +        logger.error("Something went wrong while loading the certificate: %r" +                     % (e,)) +    return pkey + +  def get_digest(cert_data, method):      """      Returns the digest for the cert_data using the method specified @@ -39,7 +82,7 @@ def get_digest(cert_data, method):      @rtype: str      """ -    x509 = crypto.load_certificate(crypto.FILETYPE_PEM, cert_data) +    x509 = get_cert_from_string(cert_data)      digest = x509.digest(method).replace(":", "").lower()      return digest @@ -55,12 +98,11 @@ def can_load_cert_and_pkey(string):      @rtype: bool      """ -      can_load = True      try: -        cert = crypto.load_certificate(crypto.FILETYPE_PEM, string) -        key = crypto.load_privatekey(crypto.FILETYPE_PEM, string) +        cert = get_cert_from_string(string) +        key = get_privatekey_from_string(string)          leap_assert(cert, 'The certificate could not be loaded')          leap_assert(key, 'The private key could not be loaded') @@ -84,3 +126,52 @@ def is_valid_pemfile(cert):      leap_assert(cert, "We need a cert to load")      return can_load_cert_and_pkey(cert) + + +def get_cert_time_boundaries(certfile): +    """ +    Returns the time boundaries for the certificate saved in certfile + +    @param certfile: path to certificate +    @type certfile: str + +    @rtype: tuple (from, to) +    """ +    cert = get_cert_from_string(certfile) +    leap_assert(cert, 'There was a problem loading the certificate') + +    fromts, tots = (cert.get_notBefore(), cert.get_notAfter()) +    from_, to_ = map( +        lambda ts: time.gmtime(time.mktime(dateparse(ts).timetuple())), +        (fromts, tots)) +    return from_, to_ + + +def should_redownload(certfile, now=time.gmtime): +    """ +    Returns True if any of the checks don't pass, False otherwise + +    @param certfile: path to certificate +    @type certfile: str +    @param now: current date function, ONLY USED FOR TESTING + +    @rtype: bool +    """ +    exists = os.path.isfile(certfile) + +    if not exists: +        return True + +    try: +        with open(certfile, "r") as f: +            if not is_valid_pemfile(f.read()): +                return True +    except: +        return True + +    valid_from, valid_to = get_cert_time_boundaries(certfile) + +    if not (valid_from < now() < valid_to): +        return True + +    return False | 
