summaryrefslogtreecommitdiff
path: root/quicwrapper
diff options
context:
space:
mode:
Diffstat (limited to 'quicwrapper')
-rw-r--r--quicwrapper/README.md4
-rw-r--r--quicwrapper/listener.go206
-rw-r--r--quicwrapper/quicwrapper.go186
3 files changed, 396 insertions, 0 deletions
diff --git a/quicwrapper/README.md b/quicwrapper/README.md
new file mode 100644
index 0000000..25d18e6
--- /dev/null
+++ b/quicwrapper/README.md
@@ -0,0 +1,4 @@
+Code in this package taken from the lantern project:
+https://github.com/getlantern/quicwrapper
+
+License: Apache-2.0
diff --git a/quicwrapper/listener.go b/quicwrapper/listener.go
new file mode 100644
index 0000000..de28d2a
--- /dev/null
+++ b/quicwrapper/listener.go
@@ -0,0 +1,206 @@
+package quicwrapper
+
+import (
+ "context"
+ "crypto/tls"
+ "net"
+ "sync"
+ "sync/atomic"
+
+ "github.com/apex/log"
+ "github.com/getlantern/ops"
+ quic "github.com/lucas-clemente/quic-go"
+)
+
+// ListenAddr creates a QUIC server listening on a given address.
+// The net.Conn instances returned by the net.Listener may be multiplexed connections.
+func ListenAddr(addr string, tlsConf *tls.Config, config *Config) (net.Listener, error) {
+ tlsConf = defaultNextProtos(tlsConf, DefaultServerProtos)
+ ql, err := quic.ListenAddr(addr, tlsConf, config)
+ if err != nil {
+ return nil, err
+ }
+ return listen(ql, tlsConf, config)
+}
+
+// Listen creates a QUIC server listening on a given net.PacketConn
+// The net.Conn instances returned by the net.Listener may be multiplexed connections.
+// The caller is responsible for closing the net.PacketConn after the listener has been
+// closed.
+func Listen(pconn net.PacketConn, tlsConf *tls.Config, config *Config) (net.Listener, error) {
+ tlsConf = defaultNextProtos(tlsConf, DefaultServerProtos)
+ ql, err := quic.Listen(pconn, tlsConf, config)
+ if err != nil {
+ return nil, err
+ }
+ return listen(ql, tlsConf, config)
+}
+
+func listen(ql quic.Listener, tlsConf *tls.Config, config *Config) (net.Listener, error) {
+ l := &listener{
+ quicListener: ql,
+ config: config,
+ connections: make(chan net.Conn, 1000),
+ acceptError: make(chan error, 1),
+ closedSignal: make(chan struct{}),
+ }
+ // XXX wat is this?
+ ops.Go(l.listen)
+ //ops.Go(l.logStats)
+
+ return l, nil
+}
+
+var _ net.Listener = &listener{}
+
+// wraps quic.Listener to create a net.Listener
+type listener struct {
+ numConnections int64
+ numVirtualConnections int64
+ quicListener quic.Listener
+ config *Config
+ connections chan net.Conn
+ acceptError chan error
+ closedSignal chan struct{}
+ closeErr error
+ closeOnce sync.Once
+}
+
+// implements net.Listener.Accept
+func (l *listener) Accept() (net.Conn, error) {
+ select {
+ case conn, ok := <-l.connections:
+ if !ok {
+ return nil, ErrListenerClosed
+ }
+ return conn, nil
+ case err, ok := <-l.acceptError:
+ if !ok {
+ return nil, ErrListenerClosed
+ }
+ return nil, err
+ case <-l.closedSignal:
+ return nil, ErrListenerClosed
+ }
+}
+
+// implements net.Listener.Close
+// Shut down the QUIC listener.
+// this implicitly sends CONNECTION_CLOSE frames to peers
+// note: it is still the responsibility of the caller
+// to call Close() on any Conn returned from Accept()
+func (l *listener) Close() error {
+ l.closeOnce.Do(func() {
+ close(l.closedSignal)
+ l.closeErr = l.quicListener.Close()
+ })
+ return l.closeErr
+}
+
+func (l *listener) isClosed() bool {
+ select {
+ case <-l.closedSignal:
+ return true
+ default:
+ return false
+ }
+}
+
+// implements net.Listener.Addr
+func (l *listener) Addr() net.Addr {
+ return l.quicListener.Addr()
+}
+
+func (l *listener) listen() {
+ group := &sync.WaitGroup{}
+
+ defer func() {
+ l.Close()
+ close(l.acceptError)
+ // wait for writers to exit, drain connections
+ group.Wait()
+ close(l.connections)
+ for c := range l.connections {
+ c.Close()
+ }
+
+ log.Debugf("Listener finished with Connections: %d Virtual: %d", atomic.LoadInt64(&l.numConnections), atomic.LoadInt64(&l.numVirtualConnections))
+ }()
+
+ for {
+ session, err := l.quicListener.Accept(context.Background())
+ if err != nil {
+ if !l.isClosed() {
+ l.acceptError <- err
+ }
+ return
+ }
+ if l.isClosed() {
+ session.CloseWithError(0, "")
+ return
+ } else {
+ atomic.AddInt64(&l.numConnections, 1)
+ group.Add(1)
+ ops.Go(func() {
+ l.handleSession(session)
+ atomic.AddInt64(&l.numConnections, -1)
+ group.Done()
+ })
+ }
+ }
+}
+
+func (l *listener) handleSession(session quic.Connection) {
+
+ // keep a smoothed average of the bandwidth estimate
+ // for the session
+ // bw := NewEMABandwidthSampler(session)
+ // bw.Start()
+
+ // track active session connections
+ active := make(map[quic.StreamID]Conn)
+ var mx sync.Mutex
+
+ defer func() {
+ // bw.Stop()
+ session.CloseWithError(0, "")
+
+ // snapshot any non-closed connections, then nil out active list
+ // conns being closed will 'remove' themselves from the nil
+ // list, not the snapshot.
+ var snapshot map[quic.StreamID]Conn
+ mx.Lock()
+ snapshot = active
+ active = nil
+ mx.Unlock()
+
+ // immediately close any connections that are still active
+ for _, conn := range snapshot {
+ conn.Close()
+ }
+ }()
+
+ for {
+ stream, err := session.AcceptStream(context.Background())
+ if err != nil {
+ if isPeerGoingAway(err) {
+ // log.Tracef("Accepting stream: Peer going away (%v)", err)
+ return
+ } else {
+ // log.Errorf("Accepting stream: %v", err)
+ return
+ }
+ } else {
+ atomic.AddInt64(&l.numVirtualConnections, 1)
+ conn := newConn(stream, session, func(id quic.StreamID) {
+ atomic.AddInt64(&l.numVirtualConnections, -1)
+ // remove conn from active list
+ mx.Lock()
+ delete(active, id)
+ mx.Unlock()
+ })
+
+ l.connections <- conn
+ }
+ }
+}
diff --git a/quicwrapper/quicwrapper.go b/quicwrapper/quicwrapper.go
new file mode 100644
index 0000000..0a6473f
--- /dev/null
+++ b/quicwrapper/quicwrapper.go
@@ -0,0 +1,186 @@
+// Wraps quic structures in standard net interfaces and
+// improves context awareness.
+// Conn instances created by this package may be multiplexed
+package quicwrapper
+
+import (
+ "crypto/tls"
+ "crypto/x509"
+ "errors"
+ "io"
+ "net"
+ "strings"
+ "sync"
+ "time"
+
+ //"github.com/getlantern/golog"
+
+ quic "github.com/lucas-clemente/quic-go"
+)
+
+const (
+ // this is a very non-informative error string that quic-go
+ // gives back to indicate that something terminated with no explicit error
+ // e.g. this is returned when a session terminates "normally"
+ // (peer going away)
+ applicationNoError = "Application error 0x0"
+ closedConnError = "use of closed network connection"
+ noActivity = "recent network activity"
+
+ // This the value represents HTTP/3 protocol (over quic v1).
+ AlpnH3 = "h3"
+ // This the value represents HTTP/3 protocol (over quic draft 29).
+ AlpnH3_29 = "h3-29"
+)
+
+var (
+ //log = golog.LoggerFor("quicwrapper")
+ ErrListenerClosed = errors.New("listener closed")
+
+ // client asks for this unless explicitly specified in tls.Config
+ DefaultClientProtos = []string{AlpnH3}
+ // server accepts these unless explicitly specified in tls.Config
+ DefaultServerProtos = []string{AlpnH3, AlpnH3_29}
+)
+
+type Config = quic.Config
+
+var _ net.Conn = &Conn{}
+
+type streamClosedFn func(id quic.StreamID)
+
+// wraps quic.Stream and other info to implement net.Conn
+type Conn struct {
+ quic.Stream
+ session quic.Connection
+ // bw BandwidthEstimator
+ onClose streamClosedFn
+ closeOnce sync.Once
+ closeErr error
+}
+
+func newConn(stream quic.Stream, session quic.Connection, onClose streamClosedFn) *Conn {
+ if onClose == nil {
+ onClose = func(id quic.StreamID) {}
+ }
+
+ return &Conn{
+ Stream: stream,
+ session: session,
+ // bw: bw,
+ onClose: onClose,
+ }
+}
+
+// implements net.Conn.Read
+func (c *Conn) Read(b []byte) (int, error) {
+ n, err := c.Stream.Read(b)
+ if err != nil && err != io.EOF {
+ // remote end closed stream
+ if _, ok := err.(*quic.StreamError); ok {
+ err = io.EOF
+ }
+ // treat peer going away as EOF
+ if isPeerGoingAway(err) {
+ err = io.EOF
+ }
+ }
+ return n, err
+}
+
+// implements net.Conn.Write
+func (c *Conn) Write(b []byte) (int, error) {
+ n, err := c.Stream.Write(b)
+ if err != nil && err != io.EOF {
+ // treat "stop sending" as EOF
+ if _, ok := err.(*quic.StreamError); ok {
+ err = io.EOF
+ }
+ // treat peer going away as EOF
+ if isPeerGoingAway(err) {
+ err = io.EOF
+ }
+ }
+ return n, err
+}
+
+// implements net.Conn.Close
+func (c *Conn) Close() error {
+ c.closeOnce.Do(func() {
+ c.close()
+ })
+ return c.closeErr
+}
+
+func (c *Conn) close() error {
+ // this only closes the write side
+ c.closeErr = c.Stream.Close()
+ // to close both ends, this also forefully
+ // cancels any pending reads / in flight data.
+ c.Stream.CancelRead(0)
+ c.onClose(c.StreamID()) // XXX ?
+ return c.closeErr
+}
+
+// implements net.Conn.LocalAddr
+func (c *Conn) LocalAddr() net.Addr {
+ return c.session.LocalAddr()
+}
+
+// implements net.Conn.RemoteAddr
+func (c *Conn) RemoteAddr() net.Addr {
+ return c.session.RemoteAddr()
+}
+
+func (c *Conn) SetDeadline(t time.Time) error {
+ return nil
+}
+
+func (c *Conn) SetReadDeadline(t time.Time) error {
+ return nil
+}
+
+func (c *Conn) SetWriteDeadline(t time.Time) error {
+ return nil
+}
+
+// Returns certificates presented by peer
+func (c *Conn) PeerCertificates() []*x509.Certificate {
+ // the ConnectionState interface the quic-go api is
+ // considered unstable, so this is not exposed directly.
+ return c.session.ConnectionState().TLS.PeerCertificates
+}
+
+/*
+ func (c *Conn) BandwidthEstimate() Bandwidth {
+ return c.bw.BandwidthEstimate()
+ }
+*/
+
+func isPeerGoingAway(err error) bool {
+ if err == nil {
+ return false
+ }
+ str := err.Error()
+
+ if strings.Contains(str, closedConnError) ||
+ strings.Contains(str, applicationNoError) ||
+ strings.Contains(str, noActivity) {
+ return true
+ } else {
+ return false
+ }
+}
+
+// returns a tls.Config with NextProtos set to AlpnH3
+// if NextProtos is unset in the given tls.Config.
+func defaultNextProtos(tlsConf *tls.Config, defaultProtos []string) *tls.Config {
+ if len(tlsConf.NextProtos) == 0 {
+ c := tlsConf.Clone()
+ c.NextProtos = make([]string, len(defaultProtos))
+ copy(c.NextProtos, defaultProtos)
+ return c
+ } else {
+ return tlsConf
+ }
+}