From 7868a869fda3e28230c19abc492057cc91b84902 Mon Sep 17 00:00:00 2001 From: k clair Date: Tue, 9 Oct 2012 12:24:40 -0700 Subject: remove debian packaging files from master branch. master should only have the source files and the README --- .../lib.linux-x86_64-2.5/gnutls/connection.py | 500 --------------------- 1 file changed, 500 deletions(-) delete mode 100644 gnutls-1.1.9/build/lib.linux-x86_64-2.5/gnutls/connection.py (limited to 'gnutls-1.1.9/build/lib.linux-x86_64-2.5/gnutls/connection.py') diff --git a/gnutls-1.1.9/build/lib.linux-x86_64-2.5/gnutls/connection.py b/gnutls-1.1.9/build/lib.linux-x86_64-2.5/gnutls/connection.py deleted file mode 100644 index 5990bd4..0000000 --- a/gnutls-1.1.9/build/lib.linux-x86_64-2.5/gnutls/connection.py +++ /dev/null @@ -1,500 +0,0 @@ -# Copyright (C) 2007-2008 AG Projects. See LICENSE for details. -# - -"""GNUTLS connection support""" - -__all__ = ['X509Credentials', 'ClientSession', 'ServerSession', 'ServerSessionFactory'] - -from time import time -from socket import SHUT_RDWR as SOCKET_SHUT_RDWR - -from _ctypes import PyObj_FromPtr -from ctypes import * - -from gnutls.validators import * -from gnutls.constants import * -from gnutls.crypto import * -from gnutls.errors import * - -from gnutls.library.constants import GNUTLS_SERVER, GNUTLS_CLIENT, GNUTLS_CRT_X509 -from gnutls.library.constants import GNUTLS_CERT_INVALID, GNUTLS_CERT_REVOKED, GNUTLS_CERT_INSECURE_ALGORITHM -from gnutls.library.constants import GNUTLS_CERT_SIGNER_NOT_FOUND, GNUTLS_CERT_SIGNER_NOT_CA -from gnutls.library.constants import GNUTLS_AL_FATAL, GNUTLS_A_BAD_CERTIFICATE -from gnutls.library.constants import GNUTLS_A_UNKNOWN_CA, GNUTLS_A_INSUFFICIENT_SECURITY -from gnutls.library.constants import GNUTLS_A_CERTIFICATE_EXPIRED, GNUTLS_A_CERTIFICATE_REVOKED -from gnutls.library.constants import GNUTLS_NAME_DNS -from gnutls.library.types import gnutls_certificate_credentials_t, gnutls_session_t, gnutls_x509_crt_t -from gnutls.library.types import gnutls_certificate_server_retrieve_function -from gnutls.library.functions import * - - -@gnutls_certificate_server_retrieve_function -def _retrieve_server_certificate(c_session, retr_st): - session = PyObj_FromPtr(gnutls_session_get_ptr(c_session)) - identity = session.credentials.select_server_identity(session) - retr_st.contents.type = GNUTLS_CRT_X509 - retr_st.contents.deinit_all = 0 - if identity is None: - retr_st.contents.ncerts = 0 - else: - retr_st.contents.ncerts = 1 - retr_st.contents.cert.x509.contents = identity.cert._c_object - retr_st.contents.key.x509 = identity.key._c_object - return 0 - - -class _ServerNameIdentities(dict): - """Used internally by X509Credentials to map server names to X509 identities for the server name extension""" - def __init__(self, identities): - dict.__init__(self) - for identity in identities: - self.add(identity) - def add(self, identity): - for name in identity.cert.alternative_names.dns: - self[name.lower()] = identity - for ip in identity.cert.alternative_names.ip: - self[ip] = identity - subject = identity.cert.subject - if subject.CN is not None: - self[subject.CN.lower()] = identity - def get(self, server_name, default=None): - server_name = server_name.lower() - if server_name in self: - return self[server_name] - for name in (n for n in self if n.startswith('*.')): - suffix = name[1:] - if server_name.endswith(suffix) and '.' not in server_name[:-len(suffix)]: - return self[name] - return default - - -class X509Credentials(object): - DH_BITS = 1024 - RSA_BITS = 1024 - - dh_params = None - rsa_params = None - - def __new__(cls, *args, **kwargs): - c_object = gnutls_certificate_credentials_t() - gnutls_certificate_allocate_credentials(byref(c_object)) - instance = object.__new__(cls) - instance.__deinit = gnutls_certificate_free_credentials - instance._c_object = c_object - return instance - - @method_args((X509Certificate, none), (X509PrivateKey, none), list_of(X509Certificate), list_of(X509CRL), list_of(X509Identity)) - def __init__(self, cert=None, key=None, trusted=[], crl_list=[], identities=[]): - """Credentials contain a X509 certificate, a private key, a list of trusted CAs and a list of CRLs (all optional). - An optional list of additional X509 identities can be specified for applications that need more that one identity""" - if cert and key: - gnutls_certificate_set_x509_key(self._c_object, byref(cert._c_object), 1, key._c_object) - elif (cert, key) != (None, None): - raise ValueError("Specify neither or both the certificate and private key") - gnutls_certificate_server_set_retrieve_function(self._c_object, _retrieve_server_certificate) - self._max_depth = 5 - self._max_bits = 8200 - self._type = CRED_CERTIFICATE - self._cert = cert - self._key = key - self._identities = tuple(identities) - self._trusted = () - self.add_trusted(trusted) - self.crl_list = crl_list - self.server_name_identities = _ServerNameIdentities(identities) - if cert and key: - self.server_name_identities.add(X509Identity(cert, key)) - self.session_params = SessionParams(self._type) - - def __del__(self): - self.__deinit(self._c_object) - - # Methods to alter the credentials at runtime - - @method_args(list_of(X509Certificate)) - def add_trusted(self, trusted): - size = len(trusted) - if size > 0: - ca_list = (gnutls_x509_crt_t * size)(*[cert._c_object for cert in trusted]) - gnutls_certificate_set_x509_trust(self._c_object, cast(byref(ca_list), POINTER(gnutls_x509_crt_t)), size) - self._trusted = self._trusted + tuple(trusted) - - def generate_dh_params(self, bits=DH_BITS): - reference = self.dh_params ## keep a reference to preserve it until replaced - X509Credentials.dh_params = DHParams(bits) - del reference - - def generate_rsa_params(self, bits=RSA_BITS): - reference = self.rsa_params ## keep a reference to preserve it until replaced - X509Credentials.rsa_params = RSAParams(bits) - del reference - - # Properties - - @property - def cert(self): - return self._cert - - @property - def key(self): - return self._key - - @property - def identities(self): - return self._identities - - @property - def trusted(self): - return self._trusted - - def _get_crl_list(self): - return self._crl_list - @method_args(list_of(X509CRL)) - def _set_crl_list(self, crl_list): - self._crl_list = tuple(crl_list) - crl_list = property(_get_crl_list, _set_crl_list) - del _get_crl_list, _set_crl_list - - def _get_max_verify_length(self): - return self._max_depth - @method_args(int) - def _set_max_verify_length(self, max_depth): - gnutls_certificate_set_verify_limits(self._c_object, self._max_bits, max_depth) - self._max_depth = max_depth - max_verify_length = property(_get_max_verify_length, _set_max_verify_length) - del _get_max_verify_length, _set_max_verify_length - - def _get_max_verify_bits(self): - return self._max_bits - @method_args(int) - def _set_max_verify_bits(self, max_bits): - gnutls_certificate_set_verify_limits(self._c_object, max_bits, self._max_depth) - self._max_bits = max_bits - max_verify_bits = property(_get_max_verify_bits, _set_max_verify_bits) - del _get_max_verify_bits, _set_max_verify_bits - - # Methods to select and validate certificates - - def check_certificate(self, cert, cert_name='certificate'): - """Verify activation, expiration and revocation for the given certificate""" - now = time() - if cert.activation_time > now: - raise CertificateExpiredError("%s is not yet activated" % cert_name) - if cert.expiration_time < now: - raise CertificateExpiredError("%s has expired" % cert_name) - for crl in self.crl_list: - crl.check_revocation(cert, cert_name=cert_name) - - def select_server_identity(self, session): - """Select which identity the server will use for a given session. The default selection algorithm uses - the server name extension. A subclass can overwrite it if a different selection algorithm is desired.""" - server_name = session.server_name - if server_name is not None: - return self.server_name_identities.get(server_name) - elif self.cert and self.key: - return self ## since we have the cert and key attributes we can behave like a X509Identity - else: - return None - - -class SessionParams(object): - _default_kx_algorithms = { - CRED_CERTIFICATE: (KX_RSA, KX_DHE_DSS, KX_DHE_RSA), - CRED_ANON: (KX_ANON_DH,)} - _all_kx_algorithms = { - CRED_CERTIFICATE: set((KX_RSA, KX_DHE_DSS, KX_DHE_RSA, KX_RSA_EXPORT)), - CRED_ANON: set((KX_ANON_DH,))} - - def __new__(cls, credentials_type): - if credentials_type not in cls._default_kx_algorithms: - raise TypeError("Unknown credentials type: %r" % credentials_type) - return object.__new__(cls) - - def __init__(self, credentials_type): - self._credentials_type = credentials_type - self._protocols = (PROTO_TLS1_1, PROTO_TLS1_0, PROTO_SSL3) - self._kx_algorithms = self._default_kx_algorithms[credentials_type] - self._ciphers = (CIPHER_AES_128_CBC, CIPHER_3DES_CBC, CIPHER_ARCFOUR_128) - self._mac_algorithms = (MAC_SHA1, MAC_MD5, MAC_RMD160) - self._compressions = (COMP_NULL,) - - def _get_protocols(self): - return self._protocols - def _set_protocols(self, protocols): - self._protocols = ProtocolListValidator(protocols) - protocols = property(_get_protocols, _set_protocols) - del _get_protocols, _set_protocols - - def _get_kx_algorithms(self): - return self._kx_algorithms - def _set_kx_algorithms(self, algorithms): - cred_type = self._credentials_type - algorithms = KeyExchangeListValidator(algorithms) - invalid = set(algorithms) - self._all_kx_algorithms[cred_type] - if invalid: - raise ValueError("Cannot specify %r with %r credentials" % (list(invalid), cred_type)) - self._kx_algorithms = algorithms - kx_algorithms = property(_get_kx_algorithms, _set_kx_algorithms) - del _get_kx_algorithms, _set_kx_algorithms - - def _get_ciphers(self): - return self._ciphers - def _set_ciphers(self, ciphers): - self._ciphers = CipherListValidator(ciphers) - ciphers = property(_get_ciphers, _set_ciphers) - del _get_ciphers, _set_ciphers - - def _get_mac_algorithms(self): - return self._mac_algorithms - def _set_mac_algorithms(self, algorithms): - self._mac_algorithms = MACListValidator(algorithms) - mac_algorithms = property(_get_mac_algorithms, _set_mac_algorithms) - del _get_mac_algorithms, _set_mac_algorithms - - def _get_compressions(self): - return self._compressions - def _set_compressions(self, compressions): - self._compressions = CompressionListValidator(compressions) - compressions = property(_get_compressions, _set_compressions) - del _get_compressions, _set_compressions - - -class Session(object): - """Abstract class representing a TLS session created from a TCP socket - and a Credentials object.""" - - session_type = None ## placeholder for GNUTLS_SERVER or GNUTLS_CLIENT as defined by subclass - - def __new__(cls, *args, **kwargs): - if cls is Session: - raise RuntimeError("Session cannot be instantiated directly") - instance = object.__new__(cls) - instance.__deinit = gnutls_deinit - instance._c_object = gnutls_session_t() - return instance - - def __init__(self, socket, credentials): - gnutls_init(byref(self._c_object), self.session_type) - ## Store a pointer to self on the C session - gnutls_session_set_ptr(self._c_object, id(self)) - # gnutls_dh_set_prime_bits(session, DH_BITS)? - gnutls_transport_set_ptr(self._c_object, socket.fileno()) - gnutls_handshake_set_private_extensions(self._c_object, 1) - self.socket = socket - self.credentials = credentials - self._update_params() - - def __del__(self): - self.__deinit(self._c_object) - - def __getattr__(self, name): - ## Generic wrapper for the underlying socket methods and attributes. - return getattr(self.socket, name) - - # Session properties - - def _get_credentials(self): - return self._credentials - @method_args(X509Credentials) - def _set_credentials(self, credentials): - ## Release all credentials, otherwise gnutls will only release an existing credential of - ## the same type as the one being set and we can end up with multiple credentials in C. - gnutls_credentials_clear(self._c_object) - gnutls_credentials_set(self._c_object, credentials._type, cast(credentials._c_object, c_void_p)) - self._credentials = credentials - credentials = property(_get_credentials, _set_credentials) - del _get_credentials, _set_credentials - - @property - def protocol(self): - return gnutls_protocol_get_name(gnutls_protocol_get_version(self._c_object)) - - @property - def kx_algorithm(self): - return gnutls_kx_get_name(gnutls_kx_get(self._c_object)) - - @property - def cipher(self): - return gnutls_cipher_get_name(gnutls_cipher_get(self._c_object)) - - @property - def mac_algorithm(self): - return gnutls_mac_get_name(gnutls_mac_get(self._c_object)) - - @property - def compression(self): - return gnutls_compression_get_name(gnutls_compression_get(self._c_object)) - - @property - def peer_certificate(self): - if gnutls_certificate_type_get(self._c_object) != GNUTLS_CRT_X509: - return None - list_size = c_uint() - cert_list = gnutls_certificate_get_peers(self._c_object, byref(list_size)) - if list_size.value == 0: - return None - cert = cert_list[0] - return X509Certificate(string_at(cert.data, cert.size), X509_FMT_DER) - - # Status checking after an operation was interrupted (these properties are - # only useful to check after an operation was interrupted, otherwise their - # value is meaningless). - - @property - def interrupted_while_writing(self): - """True if an operation was interrupted while writing""" - return gnutls_record_get_direction(self._c_object)==1 - - @property - def interrupted_while_reading(self): - """True if an operation was interrupted while reading""" - return gnutls_record_get_direction(self._c_object)==0 - - # Session methods - - def _update_params(self): - """Update the priorities of the session params using the credentials.""" - def c_priority_list(priorities): - size = len(priorities) + 1 - return (c_int * size)(*priorities) - session_params = self.credentials.session_params - # protocol order in the priority list is irrelevant (it always uses newer protocols first) - # the protocol list only specifies what protocols are to be enabled. - gnutls_protocol_set_priority(self._c_object, c_priority_list(session_params.protocols)) - gnutls_kx_set_priority(self._c_object, c_priority_list(session_params.kx_algorithms)) - gnutls_cipher_set_priority(self._c_object, c_priority_list(session_params.ciphers)) - gnutls_mac_set_priority(self._c_object, c_priority_list(session_params.mac_algorithms)) - gnutls_compression_set_priority(self._c_object, c_priority_list(session_params.compressions)) - - def handshake(self): - gnutls_handshake(self._c_object) - - #@method_args((basestring, buffer)) - def send(self, data): - data = str(data) - return gnutls_record_send(self._c_object, data, len(data)) - - def sendall(self, data): - size = len(data) - while size > 0: - sent = self.send(data[-size:]) - size -= sent - - def recv(self, limit): - data = create_string_buffer(limit) - size = gnutls_record_recv(self._c_object, data, limit) - return data[:size] - - def send_alert(self, exception): - alertdict = { - CertificateError: GNUTLS_A_BAD_CERTIFICATE, - CertificateAuthorityError: GNUTLS_A_UNKNOWN_CA, - CertificateSecurityError: GNUTLS_A_INSUFFICIENT_SECURITY, - CertificateExpiredError: GNUTLS_A_CERTIFICATE_EXPIRED, - CertificateRevokedError: GNUTLS_A_CERTIFICATE_REVOKED} - alert = alertdict.get(exception.__class__) - if alert: - gnutls_alert_send(self._c_object, GNUTLS_AL_FATAL, alert) - - @method_args(one_of(SHUT_RDWR, SHUT_WR)) - def bye(self, how=SHUT_RDWR): - gnutls_bye(self._c_object, how) - - def shutdown(self, how=SOCKET_SHUT_RDWR): - self.socket.shutdown(how) - - def close(self): - self.socket.close() - - def verify_peer(self): - status = c_uint() - gnutls_certificate_verify_peers2(self._c_object, byref(status)) - status = status.value - if status & GNUTLS_CERT_INVALID: - raise CertificateError("peer certificate is invalid") - elif status & GNUTLS_CERT_SIGNER_NOT_FOUND: - raise CertificateAuthorityError("peer certificate signer not found") - elif status & GNUTLS_CERT_SIGNER_NOT_CA: - raise CertificateAuthorityError("peer certificate signer is not a CA") - elif status & GNUTLS_CERT_INSECURE_ALGORITHM: - raise CertificateSecurityError("peer certificate uses an insecure algorithm") - elif status & GNUTLS_CERT_REVOKED: - raise CertificateRevokedError("peer certificate was revoked") - - -class ClientSession(Session): - session_type = GNUTLS_CLIENT - - def __init__(self, socket, credentials, server_name=None): - Session.__init__(self, socket, credentials) - self._server_name = None - if server_name is not None: - self.server_name = server_name - - def _get_server_name(self): - return self._server_name - @method_args(str) - def _set_server_name(self, server_name): - gnutls_server_name_set(self._c_object, GNUTLS_NAME_DNS, c_char_p(server_name), len(server_name)) - self._server_name = server_name - server_name = property(_get_server_name, _set_server_name) - del _get_server_name, _set_server_name - - -class ServerSession(Session): - session_type = GNUTLS_SERVER - - def __init__(self, socket, credentials): - Session.__init__(self, socket, credentials) - gnutls_certificate_server_set_request(self._c_object, CERT_REQUEST) - - @property - def server_name(self): - data_length = c_size_t(256) - data = create_string_buffer(data_length.value) - hostname_type = c_uint() - for i in xrange(2**16): - try: - gnutls_server_name_get(self._c_object, data, byref(data_length), byref(hostname_type), i) - except RequestedDataNotAvailable: - break - except MemoryError: - data_length.value += 1 ## one extra byte for the terminating 0 - data = create_string_buffer(data_length.value) - gnutls_server_name_get(self._c_object, data, byref(data_length), byref(hostname_type), i) - if hostname_type.value != GNUTLS_NAME_DNS: - continue - return data.value - return None - - -class ServerSessionFactory(object): - - def __init__(self, socket, credentials, session_class=ServerSession): - if not issubclass(session_class, ServerSession): - raise TypeError, "session_class must be a subclass of ServerSession" - self.socket = socket - self.credentials = credentials - self.session_class = session_class - - def __getattr__(self, name): - ## Generic wrapper for the underlying socket methods and attributes - return getattr(self.socket, name) - - def bind(self, address): - self.socket.bind(address) - - def listen(self, backlog): - self.socket.listen(backlog) - - def accept(self): - new_sock, address = self.socket.accept() - session = self.session_class(new_sock, self.credentials) - return (session, address) - - def shutdown(self, how=SOCKET_SHUT_RDWR): - self.socket.shutdown(how) - - def close(self): - self.socket.close() - -- cgit v1.2.3