summaryrefslogtreecommitdiff
path: root/service/pixelated/bitmask_libraries/certs.py
diff options
context:
space:
mode:
Diffstat (limited to 'service/pixelated/bitmask_libraries/certs.py')
-rw-r--r--service/pixelated/bitmask_libraries/certs.py104
1 files changed, 19 insertions, 85 deletions
diff --git a/service/pixelated/bitmask_libraries/certs.py b/service/pixelated/bitmask_libraries/certs.py
index a321e00e..9d543672 100644
--- a/service/pixelated/bitmask_libraries/certs.py
+++ b/service/pixelated/bitmask_libraries/certs.py
@@ -14,108 +14,42 @@
# You should have received a copy of the GNU Affero General Public License
# along with Pixelated. If not, see <http://www.gnu.org/licenses/>.
import os
-import requests
-import json
-from leap.common import ca_bundle
-from .config import AUTO_DETECT_CA_BUNDLE
-LEAP_CERT = None
-LEAP_FINGERPRINT = None
-PACKAGED_CERTS_HOME = os.path.abspath(os.path.join(os.path.abspath(__file__), "..", "..", "certificates"))
-
-
-def which_api_CA_bundle(provider):
- return str(LeapCertificate(provider).api_ca_bundle())
-
-
-def which_bootstrap_cert_fingerprint():
- return LEAP_FINGERPRINT
-
-
-def which_bootstrap_CA_bundle(provider):
- if LEAP_CERT is not None:
- return LEAP_CERT
- return str(LeapCertificate(provider).auto_detect_bootstrap_ca_bundle())
-
-
-def refresh_ca_bundle(provider):
- LeapCertificate(provider).refresh_ca_bundle()
+class LeapCertificate(object):
+ LEAP_CERT = None
+ LEAP_FINGERPRINT = None
-class LeapCertificate(object):
def __init__(self, provider):
self._config = provider.config
self._server_name = provider.server_name
self._provider = provider
- def auto_detect_bootstrap_ca_bundle(self):
- if self._config.bootstrap_ca_cert_bundle == AUTO_DETECT_CA_BUNDLE:
- local_cert = self._local_bootstrap_server_cert()
- if local_cert:
- return local_cert
- else:
- return ca_bundle.where()
+ @staticmethod
+ def set_cert_and_fingerprint(cert_file=None, cert_fingerprint=None):
+ if cert_fingerprint is None:
+ LeapCertificate.LEAP_CERT = str(cert_file) if cert_file else True
+ LeapCertificate.LEAP_FINGERPRINT = None
else:
- return self._config.bootstrap_ca_cert_bundle
-
- def api_ca_bundle(self):
- if self._provider.config.ca_cert_bundle:
- return self._provider.config.ca_cert_bundle
-
- cert_file = self._api_cert_file()
-
- if not os.path.isfile(cert_file):
- self._download_server_cert(cert_file)
+ LeapCertificate.LEAP_FINGERPRINT = cert_fingerprint
+ LeapCertificate.LEAP_CERT = False
- return cert_file
+ @property
+ def provider_web_cert(self):
+ return self.LEAP_CERT
- def refresh_ca_bundle(self):
- cert_file = self._api_cert_file()
- self._download_server_cert(cert_file)
+ @property
+ def provider_api_cert(self):
+ return str(os.path.join(self._provider.config.leap_home, 'providers', self._server_name, 'keys', 'client', 'api.pem'))
- def _api_cert_file(self):
- certs_root = self._api_certs_root_path()
- return os.path.join(certs_root, 'api.pem')
-
- def _api_certs_root_path(self):
+ def setup_ca_bundle(self):
path = os.path.join(self._provider.config.leap_home, 'providers', self._server_name, 'keys', 'client')
if not os.path.isdir(path):
os.makedirs(path, 0700)
- return path
-
- def _local_bootstrap_server_cert(self):
- cert_file = self._bootstrap_certs_cert_file()
- if os.path.isfile(cert_file):
- return cert_file
-
- cert_file = os.path.join(PACKAGED_CERTS_HOME, '%s.ca.crt' % self._server_name)
- if os.path.exists(cert_file):
- return cert_file
-
- # else download the file
- cert_file = self._bootstrap_certs_cert_file()
- response = requests.get('https://%s/provider.json' % self._server_name)
- provider_data = json.loads(response.content)
- ca_cert_uri = str(provider_data['ca_cert_uri'])
-
- response = requests.get(ca_cert_uri)
- with open(cert_file, 'w') as file:
- file.write(response.content)
-
- return cert_file
-
- def _bootstrap_certs_cert_file(self):
- path = os.path.join(self._provider.config.leap_home, 'providers', self._server_name)
- if not os.path.isdir(path):
- os.makedirs(path, 0700)
+ self._download_cert(self.provider_api_cert)
- file_path = os.path.join(path, '%s.ca.crt' % self._server_name)
-
- return file_path
-
- def _download_server_cert(self, cert_file_name):
+ def _download_cert(self, cert_file_name):
cert = self._provider.fetch_valid_certificate()
-
with open(cert_file_name, 'w') as file:
file.write(cert)