1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
|
#!/usr/bin/env python
"""Asynchronous server using Twisted with GNUTLS"""
import sys
import os
from twisted.internet.protocol import Factory
from twisted.protocols.basic import LineOnlyReceiver
from twisted.internet.error import CannotListenError, ConnectionDone
from twisted.internet import reactor
from gnutls.constants import *
from gnutls.crypto import *
from gnutls.errors import *
from gnutls.interfaces.twisted import X509Credentials
class EchoProtocol(LineOnlyReceiver):
    """Line-oriented echo protocol.

    Every received line is sent straight back to the peer, except the
    line ``quit``, which closes the connection instead.
    """

    def connectionMade(self):
        """Print the peer's certificate subject and the session parameters."""
        tls = self.transport.socket
        try:
            subject = tls.peer_certificate.subject
        except AttributeError:
            # The subject could not be read off the session (presumably
            # because the peer presented no certificate).
            subject = 'Unknown'
        print('%s %s' % ('\nNew connection from:', subject))
        # Attributes are looked up one at a time inside the loop so each
        # line is printed before the next attribute is read.
        details = (
            ('Protocol: ', 'protocol'),
            ('KX algorithm: ', 'kx_algorithm'),
            ('Cipher: ', 'cipher'),
            ('MAC algorithm:', 'mac_algorithm'),
            ('Compression: ', 'compression'),
        )
        for label, attr_name in details:
            print('%s %s' % (label, getattr(tls, attr_name)))

    def lineReceived(self, line):
        """Echo ``line`` back, or close the connection when it is ``quit``."""
        if line != 'quit':
            self.sendLine(line)
        else:
            self.transport.loseConnection()

    def connectionLost(self, reason):
        """Report the reason for a lost connection unless it is ConnectionDone."""
        if reason.type == ConnectionDone:
            return
        print('%s %s' % ("Connection was lost:", str(reason.value)))
class EchoFactory(Factory):
    """Factory whose protocol class is EchoProtocol."""

    # Protocol class attribute read by the Twisted Factory base class.
    protocol = EchoProtocol
def _read_file(path):
    """Return the whole content of the file at ``path``.

    The file is opened in the default (text) mode and is closed before
    returning, even if reading fails.
    """
    with open(path) as handle:
        return handle.read()


# The credential files are looked up in a ``certs`` directory located next
# to this script (resolved from sys.argv[0], not the current directory).
script_path = os.path.realpath(os.path.dirname(sys.argv[0]))
certs_path = os.path.join(script_path, 'certs')

# Load the certificate, private key, CA certificate and revocation list.
cert = X509Certificate(_read_file(os.path.join(certs_path, 'valid.crt')))
key = X509PrivateKey(_read_file(os.path.join(certs_path, 'valid.key')))
ca = X509Certificate(_read_file(os.path.join(certs_path, 'ca.pem')))
crl = X509CRL(_read_file(os.path.join(certs_path, 'crl.pem')))

# Build the credentials from the certificate/key pair plus one-element
# CA and CRL lists.
cred = X509Credentials(cert, key, [ca], [crl])
cred.verify_peer = True
# NOTE(review): presumably the tuple order is the order of preference for
# compression negotiation -- confirm against the python-gnutls docs.
cred.session_params.compressions = (COMP_LZO, COMP_DEFLATE, COMP_NULL)

# Listen on port 10000 with the echo factory, then run the reactor loop.
reactor.listenTLS(10000, EchoFactory(), cred)
reactor.run()
|