summaryrefslogtreecommitdiff
path: root/src/leap/crypto
diff options
context:
space:
mode:
authorkali <kali@leap.se>2013-02-15 09:31:51 +0900
committerkali <kali@leap.se>2013-02-15 09:31:51 +0900
commit9cea9c8a34343f8792d65b96f93ae22bd8685878 (patch)
tree9f512367b1d47ced5614702a00f3ff0a8fe746d7 /src/leap/crypto
parent7159734ec6c0b76fc7f3737134cd22fdaaaa7d58 (diff)
parent1032e07a50c8bb265ff9bd31b3bb00e83ddb451e (diff)
Merge branch 'release/v0.2.0'
Conflicts: README.txt
Diffstat (limited to 'src/leap/crypto')
-rw-r--r--src/leap/crypto/__init__.py0
-rw-r--r--src/leap/crypto/certs.py112
-rw-r--r--src/leap/crypto/certs_gnutls.py112
-rw-r--r--src/leap/crypto/leapkeyring.py70
-rw-r--r--src/leap/crypto/tests/__init__.py0
-rw-r--r--src/leap/crypto/tests/test_certs.py22
6 files changed, 316 insertions, 0 deletions
diff --git a/src/leap/crypto/__init__.py b/src/leap/crypto/__init__.py
new file mode 100644
index 00000000..e69de29b
--- /dev/null
+++ b/src/leap/crypto/__init__.py
diff --git a/src/leap/crypto/certs.py b/src/leap/crypto/certs.py
new file mode 100644
index 00000000..cbb5725a
--- /dev/null
+++ b/src/leap/crypto/certs.py
@@ -0,0 +1,112 @@
+import logging
+import os
+from StringIO import StringIO
+import ssl
+import time
+
+from dateutil.parser import parse
+from OpenSSL import crypto
+
+from leap.util.misc import null_check
+
+logger = logging.getLogger(__name__)
+
+
+class BadCertError(Exception):
+ """
+ raised for malformed certs
+ """
+
+
+class NoCertError(Exception):
+ """
+ raised for cert not found in given path
+ """
+
+
+def get_https_cert_from_domain(domain, port=443):
+ """
+ @param domain: a domain name to get a certificate from.
+ """
+ cert = ssl.get_server_certificate((domain, port))
+ x509 = crypto.load_certificate(crypto.FILETYPE_PEM, cert)
+ return x509
+
+
+def get_cert_from_file(_file):
+ null_check(_file, "pem file")
+ if isinstance(_file, (str, unicode)):
+ if not os.path.isfile(_file):
+ raise NoCertError
+ with open(_file) as f:
+ cert = f.read()
+ else:
+ cert = _file.read()
+ x509 = crypto.load_certificate(crypto.FILETYPE_PEM, cert)
+ return x509
+
+
+def get_pkey_from_file(_file):
+ getkey = lambda f: crypto.load_privatekey(
+ crypto.FILETYPE_PEM, f.read())
+
+ if isinstance(_file, str):
+ with open(_file) as f:
+ key = getkey(f)
+ else:
+ key = getkey(_file)
+ return key
+
+
+def can_load_cert_and_pkey(string):
+ """
+ loads certificate and private key from
+ a buffer
+ """
+ try:
+ f = StringIO(string)
+ cert = get_cert_from_file(f)
+
+ f = StringIO(string)
+ key = get_pkey_from_file(f)
+
+ null_check(cert, 'certificate')
+ null_check(key, 'private key')
+ except Exception as exc:
+ logger.error(type(exc), exc.message)
+ raise BadCertError
+ else:
+ return True
+
+
+def get_cert_fingerprint(domain=None, port=443, filepath=None,
+ hash_type="SHA256", sep=":"):
+ """
+ @param domain: a domain name to get a fingerprint from
+ @type domain: str
+ @param filepath: path to a file containing a PEM file
+ @type filepath: str
+ @param hash_type: the hash function to be used in the fingerprint.
+ must be one of SHA1, SHA224, SHA256, SHA384, SHA512
+ @type hash_type: str
+ @rparam: hex_fpr, a hexadecimal representation of a bytestring
+ containing the fingerprint.
+ @rtype: string
+ """
+ if domain:
+ cert = get_https_cert_from_domain(domain, port=port)
+ if filepath:
+ cert = get_cert_from_file(filepath)
+ hex_fpr = cert.digest(hash_type)
+ return hex_fpr
+
+
+def get_time_boundaries(certfile):
+ cert = get_cert_from_file(certfile)
+ null_check(cert, 'certificate')
+
+ fromts, tots = (cert.get_notBefore(), cert.get_notAfter())
+ from_, to_ = map(
+ lambda ts: time.gmtime(time.mktime(parse(ts).timetuple())),
+ (fromts, tots))
+ return from_, to_
diff --git a/src/leap/crypto/certs_gnutls.py b/src/leap/crypto/certs_gnutls.py
new file mode 100644
index 00000000..20c0e043
--- /dev/null
+++ b/src/leap/crypto/certs_gnutls.py
@@ -0,0 +1,112 @@
+'''
+We're using PyOpenSSL now
+
+import ctypes
+from StringIO import StringIO
+import socket
+
+import gnutls.connection
+import gnutls.crypto
+import gnutls.library
+
+from leap.util.misc import null_check
+
+
+class BadCertError(Exception):
+ """raised for malformed certs"""
+
+
+def get_https_cert_from_domain(domain):
+ """
+ @param domain: a domain name to get a certificate from.
+ """
+ sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
+ cred = gnutls.connection.X509Credentials()
+
+ session = gnutls.connection.ClientSession(sock, cred)
+ session.connect((domain, 443))
+ session.handshake()
+ cert = session.peer_certificate
+ return cert
+
+
+def get_cert_from_file(_file):
+ getcert = lambda f: gnutls.crypto.X509Certificate(f.read())
+ if isinstance(_file, str):
+ with open(_file) as f:
+ cert = getcert(f)
+ else:
+ cert = getcert(_file)
+ return cert
+
+
+def get_pkey_from_file(_file):
+ getkey = lambda f: gnutls.crypto.X509PrivateKey(f.read())
+ if isinstance(_file, str):
+ with open(_file) as f:
+ key = getkey(f)
+ else:
+ key = getkey(_file)
+ return key
+
+
+def can_load_cert_and_pkey(string):
+ try:
+ f = StringIO(string)
+ cert = get_cert_from_file(f)
+
+ f = StringIO(string)
+ key = get_pkey_from_file(f)
+
+ null_check(cert, 'certificate')
+ null_check(key, 'private key')
+ except:
+ # XXX catch GNUTLSError?
+ raise BadCertError
+ else:
+ return True
+
+def get_cert_fingerprint(domain=None, filepath=None,
+ hash_type="SHA256", sep=":"):
+ """
+ @param domain: a domain name to get a fingerprint from
+ @type domain: str
+ @param filepath: path to a file containing a PEM file
+ @type filepath: str
+ @param hash_type: the hash function to be used in the fingerprint.
+ must be one of SHA1, SHA224, SHA256, SHA384, SHA512
+ @type hash_type: str
+ @rparam: hex_fpr, a hexadecimal representation of a bytestring
+ containing the fingerprint.
+ @rtype: string
+ """
+ if domain:
+ cert = get_https_cert_from_domain(domain)
+ if filepath:
+ cert = get_cert_from_file(filepath)
+
+ _buffer = ctypes.create_string_buffer(64)
+ buffer_length = ctypes.c_size_t(64)
+
+ SUPPORTED_DIGEST_FUN = ("SHA1", "SHA224", "SHA256", "SHA384", "SHA512")
+ if hash_type in SUPPORTED_DIGEST_FUN:
+ digestfunction = getattr(
+ gnutls.library.constants,
+ "GNUTLS_DIG_%s" % hash_type)
+ else:
+ # XXX improperlyconfigured or something
+ raise Exception("digest function not supported")
+
+ gnutls.library.functions.gnutls_x509_crt_get_fingerprint(
+ cert._c_object, digestfunction,
+ ctypes.byref(_buffer), ctypes.byref(buffer_length))
+
+ # deinit
+ #server_cert._X509Certificate__deinit(server_cert._c_object)
+ # needed? is segfaulting
+
+ fpr = ctypes.string_at(_buffer, buffer_length.value)
+ hex_fpr = sep.join(u"%02X" % ord(char) for char in fpr)
+
+ return hex_fpr
+'''
diff --git a/src/leap/crypto/leapkeyring.py b/src/leap/crypto/leapkeyring.py
new file mode 100644
index 00000000..c241d0bc
--- /dev/null
+++ b/src/leap/crypto/leapkeyring.py
@@ -0,0 +1,70 @@
+import keyring
+
+from leap.base.config import get_config_file
+
+#############
+# Disclaimer
+#############
+# This currently is not a keyring, it's more like a joke.
+# No, seriously.
+# We're affected by this **bug**
+
+# https://bitbucket.org/kang/python-keyring-lib/
+# issue/65/dbusexception-method-opensession-with
+
+# so using the gnome keyring does not seem feasible right now.
+# I thought this was the next best option to store secrets in plain sight.
+
+# in the future we should move to use the gnome/kde/macosx/win keyrings.
+
+
+class LeapCryptedFileKeyring(keyring.backend.CryptedFileKeyring):
+
+ filename = ".secrets"
+
+ @property
+ def file_path(self):
+ return get_config_file(self.filename)
+
+ def __init__(self, seed=None):
+ self.seed = seed
+
+ def _get_new_password(self):
+ # XXX every time this method is called,
+ # $deity kills a kitten.
+ return "secret%s" % self.seed
+
+ def _init_file(self):
+ self.keyring_key = self._get_new_password()
+ self.set_password('keyring_setting', 'pass_ref', 'pass_ref_value')
+
+ def _unlock(self):
+ self.keyring_key = self._get_new_password()
+ print 'keyring key ', self.keyring_key
+ try:
+ ref_pw = self.get_password(
+ 'keyring_setting',
+ 'pass_ref')
+ print 'ref pw ', ref_pw
+ assert ref_pw == "pass_ref_value"
+ except AssertionError:
+ self._lock()
+ raise ValueError('Incorrect password')
+
+
+def leap_set_password(key, value, seed="xxx"):
+ key, value = map(unicode, (key, value))
+ keyring.set_keyring(LeapCryptedFileKeyring(seed=seed))
+ keyring.set_password('leap', key, value)
+
+
+def leap_get_password(key, seed="xxx"):
+ keyring.set_keyring(LeapCryptedFileKeyring(seed=seed))
+ #import ipdb;ipdb.set_trace()
+ return keyring.get_password('leap', key)
+
+
+if __name__ == "__main__":
+ leap_set_password('test', 'bar')
+ passwd = leap_get_password('test')
+ assert passwd == 'bar'
diff --git a/src/leap/crypto/tests/__init__.py b/src/leap/crypto/tests/__init__.py
new file mode 100644
index 00000000..e69de29b
--- /dev/null
+++ b/src/leap/crypto/tests/__init__.py
diff --git a/src/leap/crypto/tests/test_certs.py b/src/leap/crypto/tests/test_certs.py
new file mode 100644
index 00000000..e476b630
--- /dev/null
+++ b/src/leap/crypto/tests/test_certs.py
@@ -0,0 +1,22 @@
+import unittest
+
+from leap.testing.https_server import where
+from leap.crypto import certs
+
+
+class CertTestCase(unittest.TestCase):
+
+ def test_can_load_client_and_pkey(self):
+ with open(where('leaptestscert.pem')) as cf:
+ cs = cf.read()
+ with open(where('leaptestskey.pem')) as kf:
+ ks = kf.read()
+ certs.can_load_cert_and_pkey(cs + ks)
+
+ with self.assertRaises(certs.BadCertError):
+ # screw header
+ certs.can_load_cert_and_pkey(cs.replace("BEGIN", "BEGINN") + ks)
+
+
+if __name__ == "__main__":
+ unittest.main()