[tests] adapt events tests to recent changes
[leap_pycommon.git] / src / leap / common / certs.py
1 # -*- coding: utf-8 -*-
2 # certs.py
3 # Copyright (C) 2013 LEAP
4 #
5 # This program is free software: you can redistribute it and/or modify
6 # it under the terms of the GNU General Public License as published by
7 # the Free Software Foundation, either version 3 of the License, or
8 # (at your option) any later version.
9 #
10 # This program is distributed in the hope that it will be useful,
11 # but WITHOUT ANY WARRANTY; without even the implied warranty of
12 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13 # GNU General Public License for more details.
14 #
15 # You should have received a copy of the GNU General Public License
16 # along with this program.  If not, see <http://www.gnu.org/licenses/>.
17
18 """
19 Implements cert checks and helpers
20 """
21
22 import os
23 import time
24 import logging
25
26 from OpenSSL import crypto
27 from dateutil.parser import parse as dateparse
28
29 from leap.common.check import leap_assert
30
31 logger = logging.getLogger(__name__)
32
33 SKIP_SSL_CHECK = os.environ.get('SKIP_TWISTED_SSL_CHECK', False)
34
35
36 def get_cert_from_string(string):
37     """
38     Returns the x509 from the contents of this string
39
40     :param string: certificate contents as downloaded
41     :type string: str
42
43     :return: x509 or None
44     """
45     leap_assert(string, "We need something to load")
46
47     x509 = None
48     try:
49         x509 = crypto.load_certificate(crypto.FILETYPE_PEM, string)
50     except Exception as e:
51         logger.error("Something went wrong while loading the certificate: %r"
52                      % (e,))
53     return x509
54
55
56 def get_privatekey_from_string(string):
57     """
58     Returns the private key from the contents of this string
59
60     :param string: private key contents as downloaded
61     :type string: str
62
63     :return: private key or None
64     """
65     leap_assert(string, "We need something to load")
66
67     pkey = None
68     try:
69         pkey = crypto.load_privatekey(crypto.FILETYPE_PEM, string)
70     except Exception as e:
71         logger.error("Something went wrong while loading the certificate: %r"
72                      % (e,))
73     return pkey
74
75
76 def get_digest(cert_data, method):
77     """
78     Returns the digest for the cert_data using the method specified
79
80     :param cert_data: certificate data in string form
81     :type cert_data: str
82     :param method: method to be used for digest
83     :type method: str
84
85     :rtype: str
86     """
87     x509 = get_cert_from_string(cert_data)
88     digest = x509.digest(method).replace(":", "").lower()
89
90     return digest
91
92
93 def can_load_cert_and_pkey(string):
94     """
95     Loads certificate and private key from a buffer, returns True if
96     everything went well, False otherwise
97
98     :param string: buffer containing the cert and private key
99     :type string: str or any kind of buffer
100
101     :rtype: bool
102     """
103     can_load = True
104
105     try:
106         cert = get_cert_from_string(string)
107         key = get_privatekey_from_string(string)
108
109         leap_assert(cert, 'The certificate could not be loaded')
110         leap_assert(key, 'The private key could not be loaded')
111     except Exception as e:
112         can_load = False
113         logger.error("Something went wrong while trying to load "
114                      "the certificate: %r" % (e,))
115
116     return can_load
117
118
119 def is_valid_pemfile(cert):
120     """
121     Checks that the passed string is a valid pem certificate
122
123     :param cert: String containing pem content
124     :type cert: str
125
126     :rtype: bool
127     """
128     leap_assert(cert, "We need a cert to load")
129
130     return can_load_cert_and_pkey(cert)
131
132
133 def get_cert_time_boundaries(certdata):
134     """
135     Return the time boundaries for the given certificate.
136     The returned values are UTC/GMT time.struct_time objects
137
138     :param certdata: the certificate contents
139     :type certdata: str
140
141     :rtype: tuple (from, to)
142     """
143     cert = get_cert_from_string(certdata)
144     leap_assert(cert, 'There was a problem loading the certificate')
145
146     fromts, tots = (cert.get_notBefore(), cert.get_notAfter())
147     from_ = dateparse(fromts).timetuple()
148     to_ = dateparse(tots).timetuple()
149
150     return from_, to_
151
152
153 def should_redownload(certfile, now=time.gmtime):
154     """
155     Returns True if any of the checks don't pass, False otherwise
156
157     :param certfile: path to certificate
158     :type certfile: str
159     :param now: current date function, ONLY USED FOR TESTING
160
161     :rtype: bool
162     """
163     exists = os.path.isfile(certfile)
164
165     if not exists:
166         return True
167
168     certdata = None
169     try:
170         with open(certfile, "r") as f:
171             certdata = f.read()
172             if not is_valid_pemfile(certdata):
173                 return True
174     except:
175         return True
176
177     valid_from, valid_to = get_cert_time_boundaries(certdata)
178
179     if not (valid_from < now() < valid_to):
180         return True
181
182     return False
183
184
185 def get_compatible_ssl_context_factory(cert_path=None):
186     import twisted
187     from twisted.internet import ssl
188     cert = None
189
190     if SKIP_SSL_CHECK:
191         # This should be used *only* for testing purposes.
192
193         class WebClientContextFactory(ssl.ClientContextFactory):
194             """
195             A web context factory which ignores the hostname and port and does
196             no certificate verification.
197             """
198             def getContext(self, hostname, port):
199                 return ssl.ClientContextFactory.getContext(self)
200
201         contextFactory = WebClientContextFactory()
202         return contextFactory
203
204     if twisted.version.base() > '14.0.1':
205         from twisted.web.client import BrowserLikePolicyForHTTPS
206         if cert_path:
207             cert = ssl.Certificate.loadPEM(open(cert_path).read())
208         policy = BrowserLikePolicyForHTTPS(cert)
209         return policy
210     else:
211         raise Exception(("""
212             Twisted 14.0.2 is needed in order to have secure
213             Client Web SSL Contexts, not %s
214             See: http://twistedmatrix.com/trac/ticket/7647
215             """) % (twisted.version.base()))