summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorTomás Touceda <chiiph@leap.se>2013-03-13 11:33:42 -0300
committerTomás Touceda <chiiph@leap.se>2013-03-13 11:33:42 -0300
commit240d6b7762f7cc8f4c6fd229e4538aa9aa2262a5 (patch)
tree092380d6a067a4da799fcd81eab97fd018d99838
parent60bcc7b27aa934a0d62033e7152b87d5af638491 (diff)
Check validity for downloaded certs and re-download if needed
-rw-r--r--src/leap/services/eip/eipbootstrapper.py6
-rw-r--r--src/leap/util/certs.py99
2 files changed, 100 insertions, 5 deletions
diff --git a/src/leap/services/eip/eipbootstrapper.py b/src/leap/services/eip/eipbootstrapper.py
index ec3dfe7b..c83cb1b5 100644
--- a/src/leap/services/eip/eipbootstrapper.py
+++ b/src/leap/services/eip/eipbootstrapper.py
@@ -32,7 +32,7 @@ from leap.util.check import leap_assert, leap_assert_type
from leap.util.checkerthread import CheckerThread
from leap.util.files import check_and_fix_urw_only, get_mtime, mkdir_p
from leap.util.request_helpers import get_content
-from leap.util.certs import is_valid_pemfile
+from leap.util.certs import is_valid_pemfile, should_redownload
logger = logging.getLogger(__name__)
@@ -154,6 +154,10 @@ class EIPBootstrapper(QtCore.QObject):
get_client_cert_path(self._provider_config,
about_to_download=True)
+ # For re-download if something is wrong with the cert
+ self._download_if_needed = self._download_if_needed and \
+ not should_redownload(client_cert_path)
+
if self._download_if_needed and \
os.path.exists(client_cert_path):
try:
diff --git a/src/leap/util/certs.py b/src/leap/util/certs.py
index d6065474..a8bcd65e 100644
--- a/src/leap/util/certs.py
+++ b/src/leap/util/certs.py
@@ -19,15 +19,58 @@
Implements cert checks and helpers
"""
+import os
+import time
import logging
from OpenSSL import crypto
+from dateutil.parser import parse as dateparse
from leap.util.check import leap_assert
logger = logging.getLogger(__name__)
+def get_cert_from_string(string):
+ """
+ Returns the x509 from the contents of this string
+
+ @param string: certificate contents as downloaded
+ @type string: str
+
+ @return: x509 or None
+ """
+ leap_assert(string, "We need something to load")
+
+ x509 = None
+ try:
+ x509 = crypto.load_certificate(crypto.FILETYPE_PEM, string)
+ except Exception as e:
+ logger.error("Something went wrong while loading the certificate: %r"
+ % (e,))
+ return x509
+
+
+def get_privatekey_from_string(string):
+ """
+ Returns the private key from the contents of this string
+
+ @param string: private key contents as downloaded
+ @type string: str
+
+ @return: private key or None
+ """
+ leap_assert(string, "We need something to load")
+
+ pkey = None
+ try:
+ pkey = crypto.load_privatekey(crypto.FILETYPE_PEM, string)
+ except Exception as e:
+ logger.error("Something went wrong while loading the certificate: %r"
+ % (e,))
+ return pkey
+
+
def get_digest(cert_data, method):
"""
Returns the digest for the cert_data using the method specified
@@ -39,7 +82,7 @@ def get_digest(cert_data, method):
@rtype: str
"""
- x509 = crypto.load_certificate(crypto.FILETYPE_PEM, cert_data)
+ x509 = get_cert_from_string(cert_data)
digest = x509.digest(method).replace(":", "").lower()
return digest
@@ -55,12 +98,11 @@ def can_load_cert_and_pkey(string):
@rtype: bool
"""
-
can_load = True
try:
- cert = crypto.load_certificate(crypto.FILETYPE_PEM, string)
- key = crypto.load_privatekey(crypto.FILETYPE_PEM, string)
+ cert = get_cert_from_string(string)
+ key = get_privatekey_from_string(string)
leap_assert(cert, 'The certificate could not be loaded')
leap_assert(key, 'The private key could not be loaded')
@@ -84,3 +126,52 @@ def is_valid_pemfile(cert):
leap_assert(cert, "We need a cert to load")
return can_load_cert_and_pkey(cert)
+
+
+def get_cert_time_boundaries(certfile):
+ """
+ Returns the time boundaries for the certificate saved in certfile
+
+ @param certfile: path to certificate
+ @type certfile: str
+
+ @rtype: tuple (from, to)
+ """
+ cert = get_cert_from_string(certfile)
+ leap_assert(cert, 'There was a problem loading the certificate')
+
+ fromts, tots = (cert.get_notBefore(), cert.get_notAfter())
+ from_, to_ = map(
+ lambda ts: time.gmtime(time.mktime(dateparse(ts).timetuple())),
+ (fromts, tots))
+ return from_, to_
+
+
+def should_redownload(certfile, now=time.gmtime):
+ """
+ Returns True if any of the checks don't pass, False otherwise
+
+ @param certfile: path to certificate
+ @type certfile: str
+ @param now: current date function, ONLY USED FOR TESTING
+
+ @rtype: bool
+ """
+ exists = os.path.isfile(certfile)
+
+ if not exists:
+ return True
+
+ try:
+ with open(certfile, "r") as f:
+ if not is_valid_pemfile(f.read()):
+ return True
+ except:
+ return True
+
+ valid_from, valid_to = get_cert_time_boundaries(certfile)
+
+ if not (valid_from < now() < valid_to):
+ return True
+
+ return False