summaryrefslogtreecommitdiff
path: root/src/leap/common/certs.py
blob: 37ede8eb3c2524863393070cb321a95a0c6551e2 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
# -*- coding: utf-8 -*-
# certs.py
# Copyright (C) 2013 LEAP
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program.  If not, see <http://www.gnu.org/licenses/>.

"""
Implements cert checks and helpers
"""

import os
import time
import logging

from OpenSSL import crypto
from dateutil.parser import parse as dateparse

from leap.common.check import leap_assert

logger = logging.getLogger(__name__)


def get_cert_from_string(string):
    """
    Returns the x509 from the contents of this string

    :param string: certificate contents as downloaded
    :type string: str

    :return: x509 or None
    """
    leap_assert(string, "We need something to load")

    x509 = None
    try:
        x509 = crypto.load_certificate(crypto.FILETYPE_PEM, string)
    except Exception as e:
        logger.error("Something went wrong while loading the certificate: %r"
                     % (e,))
    return x509


def get_privatekey_from_string(string):
    """
    Returns the private key from the contents of this string

    :param string: private key contents as downloaded
    :type string: str

    :return: private key or None
    """
    leap_assert(string, "We need something to load")

    pkey = None
    try:
        pkey = crypto.load_privatekey(crypto.FILETYPE_PEM, string)
    except Exception as e:
        logger.error("Something went wrong while loading the certificate: %r"
                     % (e,))
    return pkey


def get_digest(cert_data, method):
    """
    Returns the digest for the cert_data using the method specified

    :param cert_data: certificate data in string form
    :type cert_data: str
    :param method: method to be used for digest
    :type method: str

    :rtype: str
    """
    x509 = get_cert_from_string(cert_data)
    digest = x509.digest(method).replace(":", "").lower()

    return digest


def can_load_cert_and_pkey(string):
    """
    Loads certificate and private key from a buffer, returns True if
    everything went well, False otherwise

    :param string: buffer containing the cert and private key
    :type string: str or any kind of buffer

    :rtype: bool
    """
    can_load = True

    try:
        cert = get_cert_from_string(string)
        key = get_privatekey_from_string(string)

        leap_assert(cert, 'The certificate could not be loaded')
        leap_assert(key, 'The private key could not be loaded')
    except Exception as e:
        can_load = False
        logger.error("Something went wrong while trying to load "
                     "the certificate: %r" % (e,))

    return can_load


def is_valid_pemfile(cert):
    """
    Checks that the passed string is a valid pem certificate

    :param cert: String containing pem content
    :type cert: str

    :rtype: bool
    """
    leap_assert(cert, "We need a cert to load")

    return can_load_cert_and_pkey(cert)


def get_cert_time_boundaries(certdata):
    """
    Return the time boundaries for the given certificate.
    The returned values are UTC/GMT time.struct_time objects

    :param certdata: the certificate contents
    :type certdata: str

    :rtype: tuple (from, to)
    """
    cert = get_cert_from_string(certdata)
    leap_assert(cert, 'There was a problem loading the certificate')

    fromts, tots = (cert.get_notBefore(), cert.get_notAfter())
    from_ = dateparse(fromts).timetuple()
    to_ = dateparse(tots).timetuple()

    return from_, to_


def should_redownload(certfile, now=time.gmtime):
    """
    Returns True if any of the checks don't pass, False otherwise

    :param certfile: path to certificate
    :type certfile: str
    :param now: current date function, ONLY USED FOR TESTING

    :rtype: bool
    """
    exists = os.path.isfile(certfile)

    if not exists:
        return True

    certdata = None
    try:
        with open(certfile, "r") as f:
            certdata = f.read()
            if not is_valid_pemfile(certdata):
                return True
    except:
        return True

    valid_from, valid_to = get_cert_time_boundaries(certdata)

    if not (valid_from < now() < valid_to):
        return True

    return False


def get_compatible_ssl_context_factory(cert_path=None):
    import twisted
    cert = None
    if twisted.version.base() > '14.0.1':
        from twisted.web.client import BrowserLikePolicyForHTTPS
        from twisted.internet import ssl
        if cert_path:
            cert = ssl.Certificate.loadPEM(open(cert_path).read())
        policy = BrowserLikePolicyForHTTPS(cert)
        return policy
    else:
        raise Exception(("""
            Twisted 14.0.2 is needed in order to have secure
            Client Web SSL Contexts, not %s
            See: http://twistedmatrix.com/trac/ticket/7647
            """) % (twisted.version.base()))