diff options
Diffstat (limited to 'vendor/github.com/pion/udp/conn.go')
-rw-r--r-- | vendor/github.com/pion/udp/conn.go | 296 |
1 files changed, 296 insertions, 0 deletions
diff --git a/vendor/github.com/pion/udp/conn.go b/vendor/github.com/pion/udp/conn.go new file mode 100644 index 0000000..9f680e8 --- /dev/null +++ b/vendor/github.com/pion/udp/conn.go @@ -0,0 +1,296 @@ +// Package udp provides a connection-oriented listener over a UDP PacketConn +package udp + +import ( + "context" + "errors" + "net" + "sync" + "sync/atomic" + "time" + + "github.com/pion/transport/deadline" + "github.com/pion/transport/packetio" +) + +const receiveMTU = 8192 +const defaultListenBacklog = 128 // same as Linux default + +var errClosedListener = errors.New("udp: listener closed") +var errListenQueueExceeded = errors.New("udp: listen queue exceeded") + +// listener augments a connection-oriented Listener over a UDP PacketConn +type listener struct { + pConn *net.UDPConn + + accepting atomic.Value // bool + acceptCh chan *Conn + doneCh chan struct{} + doneOnce sync.Once + acceptFilter func([]byte) bool + + connLock sync.Mutex + conns map[string]*Conn + connWG sync.WaitGroup + + readWG sync.WaitGroup + errClose atomic.Value // error +} + +// Accept waits for and returns the next connection to the listener. +func (l *listener) Accept() (net.Conn, error) { + select { + case c := <-l.acceptCh: + l.connWG.Add(1) + return c, nil + + case <-l.doneCh: + return nil, errClosedListener + } +} + +// Close closes the listener. +// Any blocked Accept operations will be unblocked and return errors. +func (l *listener) Close() error { + var err error + l.doneOnce.Do(func() { + l.accepting.Store(false) + close(l.doneCh) + + l.connLock.Lock() + // Close unaccepted connections + L_CLOSE: + for { + select { + case c := <-l.acceptCh: + close(c.doneCh) + delete(l.conns, c.rAddr.String()) + + default: + break L_CLOSE + } + } + nConns := len(l.conns) + l.connLock.Unlock() + + l.connWG.Done() + + if nConns == 0 { + // Wait if this is the final connection + l.readWG.Wait() + if errClose, ok := l.errClose.Load().(error); ok { + err = errClose + } + } else { + err = nil + } + }) + + return err +} + +// Addr returns the listener's network address. +func (l *listener) Addr() net.Addr { + return l.pConn.LocalAddr() +} + +// ListenConfig stores options for listening to an address. +type ListenConfig struct { + // Backlog defines the maximum length of the queue of pending + // connections. It is equivalent of the backlog argument of + // POSIX listen function. + // If a connection request arrives when the queue is full, + // the request will be silently discarded, unlike TCP. + // Set zero to use default value 128 which is same as Linux default. + Backlog int + + // AcceptFilter determines whether the new conn should be made for + // the incoming packet. If not set, any packet creates new conn. + AcceptFilter func([]byte) bool +} + +// Listen creates a new listener based on the ListenConfig. +func (lc *ListenConfig) Listen(network string, laddr *net.UDPAddr) (net.Listener, error) { + if lc.Backlog == 0 { + lc.Backlog = defaultListenBacklog + } + + conn, err := net.ListenUDP(network, laddr) + if err != nil { + return nil, err + } + + l := &listener{ + pConn: conn, + acceptCh: make(chan *Conn, lc.Backlog), + conns: make(map[string]*Conn), + doneCh: make(chan struct{}), + acceptFilter: lc.AcceptFilter, + } + l.accepting.Store(true) + l.connWG.Add(1) + l.readWG.Add(2) // wait readLoop and Close execution routine + + go l.readLoop() + go func() { + l.connWG.Wait() + if err := l.pConn.Close(); err != nil { + l.errClose.Store(err) + } + l.readWG.Done() + }() + + return l, nil +} + +// Listen creates a new listener using default ListenConfig. +func Listen(network string, laddr *net.UDPAddr) (net.Listener, error) { + return (&ListenConfig{}).Listen(network, laddr) +} + +var readBufferPool = &sync.Pool{ + New: func() interface{} { + buf := make([]byte, receiveMTU) + return &buf + }, +} + +// readLoop has to tasks: +// 1. Dispatching incoming packets to the correct Conn. +// It can therefore not be ended until all Conns are closed. +// 2. Creating a new Conn when receiving from a new remote. +func (l *listener) readLoop() { + defer l.readWG.Done() + + for { + buf := *(readBufferPool.Get().(*[]byte)) + n, raddr, err := l.pConn.ReadFrom(buf) + if err != nil { + return + } + conn, ok, err := l.getConn(raddr, buf[:n]) + if err != nil { + continue + } + if ok { + _, _ = conn.buffer.Write(buf[:n]) + } + } +} + +func (l *listener) getConn(raddr net.Addr, buf []byte) (*Conn, bool, error) { + l.connLock.Lock() + defer l.connLock.Unlock() + conn, ok := l.conns[raddr.String()] + if !ok { + if !l.accepting.Load().(bool) { + return nil, false, errClosedListener + } + if l.acceptFilter != nil { + if !l.acceptFilter(buf) { + return nil, false, nil + } + } + conn = l.newConn(raddr) + select { + case l.acceptCh <- conn: + l.conns[raddr.String()] = conn + default: + return nil, false, errListenQueueExceeded + } + } + return conn, true, nil +} + +// Conn augments a connection-oriented connection over a UDP PacketConn +type Conn struct { + listener *listener + + rAddr net.Addr + + buffer *packetio.Buffer + + doneCh chan struct{} + doneOnce sync.Once + + writeDeadline *deadline.Deadline +} + +func (l *listener) newConn(rAddr net.Addr) *Conn { + return &Conn{ + listener: l, + rAddr: rAddr, + buffer: packetio.NewBuffer(), + doneCh: make(chan struct{}), + writeDeadline: deadline.New(), + } +} + +// Read +func (c *Conn) Read(p []byte) (int, error) { + return c.buffer.Read(p) +} + +// Write writes len(p) bytes from p to the DTLS connection +func (c *Conn) Write(p []byte) (n int, err error) { + select { + case <-c.writeDeadline.Done(): + return 0, context.DeadlineExceeded + default: + } + return c.listener.pConn.WriteTo(p, c.rAddr) +} + +// Close closes the conn and releases any Read calls +func (c *Conn) Close() error { + var err error + c.doneOnce.Do(func() { + c.listener.connWG.Done() + close(c.doneCh) + c.listener.connLock.Lock() + delete(c.listener.conns, c.rAddr.String()) + nConns := len(c.listener.conns) + c.listener.connLock.Unlock() + + if nConns == 0 && !c.listener.accepting.Load().(bool) { + // Wait if this is the final connection + c.listener.readWG.Wait() + if errClose, ok := c.listener.errClose.Load().(error); ok { + err = errClose + } + } else { + err = nil + } + }) + + return err +} + +// LocalAddr implements net.Conn.LocalAddr +func (c *Conn) LocalAddr() net.Addr { + return c.listener.pConn.LocalAddr() +} + +// RemoteAddr implements net.Conn.RemoteAddr +func (c *Conn) RemoteAddr() net.Addr { + return c.rAddr +} + +// SetDeadline implements net.Conn.SetDeadline +func (c *Conn) SetDeadline(t time.Time) error { + c.writeDeadline.Set(t) + return c.SetReadDeadline(t) +} + +// SetReadDeadline implements net.Conn.SetDeadline +func (c *Conn) SetReadDeadline(t time.Time) error { + return c.buffer.SetReadDeadline(t) +} + +// SetWriteDeadline implements net.Conn.SetDeadline +func (c *Conn) SetWriteDeadline(t time.Time) error { + c.writeDeadline.Set(t) + // Write deadline of underlying connection should not be changed + // since the connection can be shared. + return nil +} |