From 50c5fdc8a15f37d506292b02eef992e83752152b Mon Sep 17 00:00:00 2001 From: atanarjuat Date: Mon, 30 May 2022 13:37:16 +0200 Subject: quic explorations --- quicwrapper/README.md | 4 + quicwrapper/listener.go | 206 +++++++++++++++++++++++++++++++++++++++++++++ quicwrapper/quicwrapper.go | 186 ++++++++++++++++++++++++++++++++++++++++ 3 files changed, 396 insertions(+) create mode 100644 quicwrapper/README.md create mode 100644 quicwrapper/listener.go create mode 100644 quicwrapper/quicwrapper.go (limited to 'quicwrapper') diff --git a/quicwrapper/README.md b/quicwrapper/README.md new file mode 100644 index 0000000..25d18e6 --- /dev/null +++ b/quicwrapper/README.md @@ -0,0 +1,4 @@ +Code in this package taken from the lantern project: +https://github.com/getlantern/quicwrapper + +License: Apache-2.0 diff --git a/quicwrapper/listener.go b/quicwrapper/listener.go new file mode 100644 index 0000000..de28d2a --- /dev/null +++ b/quicwrapper/listener.go @@ -0,0 +1,206 @@ +package quicwrapper + +import ( + "context" + "crypto/tls" + "net" + "sync" + "sync/atomic" + + "github.com/apex/log" + "github.com/getlantern/ops" + quic "github.com/lucas-clemente/quic-go" +) + +// ListenAddr creates a QUIC server listening on a given address. +// The net.Conn instances returned by the net.Listener may be multiplexed connections. +func ListenAddr(addr string, tlsConf *tls.Config, config *Config) (net.Listener, error) { + tlsConf = defaultNextProtos(tlsConf, DefaultServerProtos) + ql, err := quic.ListenAddr(addr, tlsConf, config) + if err != nil { + return nil, err + } + return listen(ql, tlsConf, config) +} + +// Listen creates a QUIC server listening on a given net.PacketConn +// The net.Conn instances returned by the net.Listener may be multiplexed connections. +// The caller is responsible for closing the net.PacketConn after the listener has been +// closed. +func Listen(pconn net.PacketConn, tlsConf *tls.Config, config *Config) (net.Listener, error) { + tlsConf = defaultNextProtos(tlsConf, DefaultServerProtos) + ql, err := quic.Listen(pconn, tlsConf, config) + if err != nil { + return nil, err + } + return listen(ql, tlsConf, config) +} + +func listen(ql quic.Listener, tlsConf *tls.Config, config *Config) (net.Listener, error) { + l := &listener{ + quicListener: ql, + config: config, + connections: make(chan net.Conn, 1000), + acceptError: make(chan error, 1), + closedSignal: make(chan struct{}), + } + // XXX wat is this? + ops.Go(l.listen) + //ops.Go(l.logStats) + + return l, nil +} + +var _ net.Listener = &listener{} + +// wraps quic.Listener to create a net.Listener +type listener struct { + numConnections int64 + numVirtualConnections int64 + quicListener quic.Listener + config *Config + connections chan net.Conn + acceptError chan error + closedSignal chan struct{} + closeErr error + closeOnce sync.Once +} + +// implements net.Listener.Accept +func (l *listener) Accept() (net.Conn, error) { + select { + case conn, ok := <-l.connections: + if !ok { + return nil, ErrListenerClosed + } + return conn, nil + case err, ok := <-l.acceptError: + if !ok { + return nil, ErrListenerClosed + } + return nil, err + case <-l.closedSignal: + return nil, ErrListenerClosed + } +} + +// implements net.Listener.Close +// Shut down the QUIC listener. +// this implicitly sends CONNECTION_CLOSE frames to peers +// note: it is still the responsibility of the caller +// to call Close() on any Conn returned from Accept() +func (l *listener) Close() error { + l.closeOnce.Do(func() { + close(l.closedSignal) + l.closeErr = l.quicListener.Close() + }) + return l.closeErr +} + +func (l *listener) isClosed() bool { + select { + case <-l.closedSignal: + return true + default: + return false + } +} + +// implements net.Listener.Addr +func (l *listener) Addr() net.Addr { + return l.quicListener.Addr() +} + +func (l *listener) listen() { + group := &sync.WaitGroup{} + + defer func() { + l.Close() + close(l.acceptError) + // wait for writers to exit, drain connections + group.Wait() + close(l.connections) + for c := range l.connections { + c.Close() + } + + log.Debugf("Listener finished with Connections: %d Virtual: %d", atomic.LoadInt64(&l.numConnections), atomic.LoadInt64(&l.numVirtualConnections)) + }() + + for { + session, err := l.quicListener.Accept(context.Background()) + if err != nil { + if !l.isClosed() { + l.acceptError <- err + } + return + } + if l.isClosed() { + session.CloseWithError(0, "") + return + } else { + atomic.AddInt64(&l.numConnections, 1) + group.Add(1) + ops.Go(func() { + l.handleSession(session) + atomic.AddInt64(&l.numConnections, -1) + group.Done() + }) + } + } +} + +func (l *listener) handleSession(session quic.Connection) { + + // keep a smoothed average of the bandwidth estimate + // for the session + // bw := NewEMABandwidthSampler(session) + // bw.Start() + + // track active session connections + active := make(map[quic.StreamID]Conn) + var mx sync.Mutex + + defer func() { + // bw.Stop() + session.CloseWithError(0, "") + + // snapshot any non-closed connections, then nil out active list + // conns being closed will 'remove' themselves from the nil + // list, not the snapshot. + var snapshot map[quic.StreamID]Conn + mx.Lock() + snapshot = active + active = nil + mx.Unlock() + + // immediately close any connections that are still active + for _, conn := range snapshot { + conn.Close() + } + }() + + for { + stream, err := session.AcceptStream(context.Background()) + if err != nil { + if isPeerGoingAway(err) { + // log.Tracef("Accepting stream: Peer going away (%v)", err) + return + } else { + // log.Errorf("Accepting stream: %v", err) + return + } + } else { + atomic.AddInt64(&l.numVirtualConnections, 1) + conn := newConn(stream, session, func(id quic.StreamID) { + atomic.AddInt64(&l.numVirtualConnections, -1) + // remove conn from active list + mx.Lock() + delete(active, id) + mx.Unlock() + }) + + l.connections <- conn + } + } +} diff --git a/quicwrapper/quicwrapper.go b/quicwrapper/quicwrapper.go new file mode 100644 index 0000000..0a6473f --- /dev/null +++ b/quicwrapper/quicwrapper.go @@ -0,0 +1,186 @@ +// Wraps quic structures in standard net interfaces and +// improves context awareness. +// Conn instances created by this package may be multiplexed +package quicwrapper + +import ( + "crypto/tls" + "crypto/x509" + "errors" + "io" + "net" + "strings" + "sync" + "time" + + //"github.com/getlantern/golog" + + quic "github.com/lucas-clemente/quic-go" +) + +const ( + // this is a very non-informative error string that quic-go + // gives back to indicate that something terminated with no explicit error + // e.g. this is returned when a session terminates "normally" + // (peer going away) + applicationNoError = "Application error 0x0" + closedConnError = "use of closed network connection" + noActivity = "recent network activity" + + // This the value represents HTTP/3 protocol (over quic v1). + AlpnH3 = "h3" + // This the value represents HTTP/3 protocol (over quic draft 29). + AlpnH3_29 = "h3-29" +) + +var ( + //log = golog.LoggerFor("quicwrapper") + ErrListenerClosed = errors.New("listener closed") + + // client asks for this unless explicitly specified in tls.Config + DefaultClientProtos = []string{AlpnH3} + // server accepts these unless explicitly specified in tls.Config + DefaultServerProtos = []string{AlpnH3, AlpnH3_29} +) + +type Config = quic.Config + +var _ net.Conn = &Conn{} + +type streamClosedFn func(id quic.StreamID) + +// wraps quic.Stream and other info to implement net.Conn +type Conn struct { + quic.Stream + session quic.Connection + // bw BandwidthEstimator + onClose streamClosedFn + closeOnce sync.Once + closeErr error +} + +func newConn(stream quic.Stream, session quic.Connection, onClose streamClosedFn) *Conn { + if onClose == nil { + onClose = func(id quic.StreamID) {} + } + + return &Conn{ + Stream: stream, + session: session, + // bw: bw, + onClose: onClose, + } +} + +// implements net.Conn.Read +func (c *Conn) Read(b []byte) (int, error) { + n, err := c.Stream.Read(b) + if err != nil && err != io.EOF { + // remote end closed stream + if _, ok := err.(*quic.StreamError); ok { + err = io.EOF + } + // treat peer going away as EOF + if isPeerGoingAway(err) { + err = io.EOF + } + } + return n, err +} + +// implements net.Conn.Write +func (c *Conn) Write(b []byte) (int, error) { + n, err := c.Stream.Write(b) + if err != nil && err != io.EOF { + // treat "stop sending" as EOF + if _, ok := err.(*quic.StreamError); ok { + err = io.EOF + } + // treat peer going away as EOF + if isPeerGoingAway(err) { + err = io.EOF + } + } + return n, err +} + +// implements net.Conn.Close +func (c *Conn) Close() error { + c.closeOnce.Do(func() { + c.close() + }) + return c.closeErr +} + +func (c *Conn) close() error { + // this only closes the write side + c.closeErr = c.Stream.Close() + // to close both ends, this also forefully + // cancels any pending reads / in flight data. + c.Stream.CancelRead(0) + c.onClose(c.StreamID()) // XXX ? + return c.closeErr +} + +// implements net.Conn.LocalAddr +func (c *Conn) LocalAddr() net.Addr { + return c.session.LocalAddr() +} + +// implements net.Conn.RemoteAddr +func (c *Conn) RemoteAddr() net.Addr { + return c.session.RemoteAddr() +} + +func (c *Conn) SetDeadline(t time.Time) error { + return nil +} + +func (c *Conn) SetReadDeadline(t time.Time) error { + return nil +} + +func (c *Conn) SetWriteDeadline(t time.Time) error { + return nil +} + +// Returns certificates presented by peer +func (c *Conn) PeerCertificates() []*x509.Certificate { + // the ConnectionState interface the quic-go api is + // considered unstable, so this is not exposed directly. + return c.session.ConnectionState().TLS.PeerCertificates +} + +/* + func (c *Conn) BandwidthEstimate() Bandwidth { + return c.bw.BandwidthEstimate() + } +*/ + +func isPeerGoingAway(err error) bool { + if err == nil { + return false + } + str := err.Error() + + if strings.Contains(str, closedConnError) || + strings.Contains(str, applicationNoError) || + strings.Contains(str, noActivity) { + return true + } else { + return false + } +} + +// returns a tls.Config with NextProtos set to AlpnH3 +// if NextProtos is unset in the given tls.Config. +func defaultNextProtos(tlsConf *tls.Config, defaultProtos []string) *tls.Config { + if len(tlsConf.NextProtos) == 0 { + c := tlsConf.Clone() + c.NextProtos = make([]string, len(defaultProtos)) + copy(c.NextProtos, defaultProtos) + return c + } else { + return tlsConf + } +} -- cgit v1.2.3