summaryrefslogtreecommitdiff
path: root/vendor/github.com/pion/interceptor/pkg/nack
diff options
context:
space:
mode:
Diffstat (limited to 'vendor/github.com/pion/interceptor/pkg/nack')
-rw-r--r--vendor/github.com/pion/interceptor/pkg/nack/errors.go6
-rw-r--r--vendor/github.com/pion/interceptor/pkg/nack/generator_interceptor.go162
-rw-r--r--vendor/github.com/pion/interceptor/pkg/nack/generator_option.go44
-rw-r--r--vendor/github.com/pion/interceptor/pkg/nack/nack.go14
-rw-r--r--vendor/github.com/pion/interceptor/pkg/nack/receive_log.go134
-rw-r--r--vendor/github.com/pion/interceptor/pkg/nack/responder_interceptor.go119
-rw-r--r--vendor/github.com/pion/interceptor/pkg/nack/responder_option.go23
-rw-r--r--vendor/github.com/pion/interceptor/pkg/nack/send_buffer.go74
8 files changed, 576 insertions, 0 deletions
diff --git a/vendor/github.com/pion/interceptor/pkg/nack/errors.go b/vendor/github.com/pion/interceptor/pkg/nack/errors.go
new file mode 100644
index 0000000..bbfc773
--- /dev/null
+++ b/vendor/github.com/pion/interceptor/pkg/nack/errors.go
@@ -0,0 +1,6 @@
+package nack
+
+import "errors"
+
+// ErrInvalidSize is returned by newReceiveLog/newSendBuffer, when an incorrect buffer size is supplied.
+var ErrInvalidSize = errors.New("invalid buffer size")
diff --git a/vendor/github.com/pion/interceptor/pkg/nack/generator_interceptor.go b/vendor/github.com/pion/interceptor/pkg/nack/generator_interceptor.go
new file mode 100644
index 0000000..447a949
--- /dev/null
+++ b/vendor/github.com/pion/interceptor/pkg/nack/generator_interceptor.go
@@ -0,0 +1,162 @@
+package nack
+
+import (
+ "math/rand"
+ "sync"
+ "time"
+
+ "github.com/pion/interceptor"
+ "github.com/pion/logging"
+ "github.com/pion/rtcp"
+ "github.com/pion/rtp"
+)
+
+// GeneratorInterceptor interceptor generates nack feedback messages.
+type GeneratorInterceptor struct {
+ interceptor.NoOp
+ size uint16
+ skipLastN uint16
+ interval time.Duration
+ m sync.Mutex
+ wg sync.WaitGroup
+ close chan struct{}
+ log logging.LeveledLogger
+
+ receiveLogs map[uint32]*receiveLog
+ receiveLogsMu sync.Mutex
+}
+
+// NewGeneratorInterceptor returns a new GeneratorInterceptor interceptor
+func NewGeneratorInterceptor(opts ...GeneratorOption) (*GeneratorInterceptor, error) {
+ r := &GeneratorInterceptor{
+ size: 8192,
+ skipLastN: 0,
+ interval: time.Millisecond * 100,
+ receiveLogs: map[uint32]*receiveLog{},
+ close: make(chan struct{}),
+ log: logging.NewDefaultLoggerFactory().NewLogger("nack_generator"),
+ }
+
+ for _, opt := range opts {
+ if err := opt(r); err != nil {
+ return nil, err
+ }
+ }
+
+ if _, err := newReceiveLog(r.size); err != nil {
+ return nil, err
+ }
+
+ return r, nil
+}
+
+// BindRTCPWriter lets you modify any outgoing RTCP packets. It is called once per PeerConnection. The returned method
+// will be called once per packet batch.
+func (n *GeneratorInterceptor) BindRTCPWriter(writer interceptor.RTCPWriter) interceptor.RTCPWriter {
+ n.m.Lock()
+ defer n.m.Unlock()
+
+ if n.isClosed() {
+ return writer
+ }
+
+ n.wg.Add(1)
+
+ go n.loop(writer)
+
+ return writer
+}
+
+// BindRemoteStream lets you modify any incoming RTP packets. It is called once for per RemoteStream. The returned method
+// will be called once per rtp packet.
+func (n *GeneratorInterceptor) BindRemoteStream(info *interceptor.StreamInfo, reader interceptor.RTPReader) interceptor.RTPReader {
+ if !streamSupportNack(info) {
+ return reader
+ }
+
+ // error is already checked in NewGeneratorInterceptor
+ receiveLog, _ := newReceiveLog(n.size)
+ n.receiveLogsMu.Lock()
+ n.receiveLogs[info.SSRC] = receiveLog
+ n.receiveLogsMu.Unlock()
+
+ return interceptor.RTPReaderFunc(func(b []byte, a interceptor.Attributes) (int, interceptor.Attributes, error) {
+ i, attr, err := reader.Read(b, a)
+ if err != nil {
+ return 0, nil, err
+ }
+
+ pkt := rtp.Packet{}
+ if err = pkt.Unmarshal(b[:i]); err != nil {
+ return 0, nil, err
+ }
+ receiveLog.add(pkt.Header.SequenceNumber)
+
+ return i, attr, nil
+ })
+}
+
+// UnbindLocalStream is called when the Stream is removed. It can be used to clean up any data related to that track.
+func (n *GeneratorInterceptor) UnbindLocalStream(info *interceptor.StreamInfo) {
+ n.receiveLogsMu.Lock()
+ delete(n.receiveLogs, info.SSRC)
+ n.receiveLogsMu.Unlock()
+}
+
+// Close closes the interceptor
+func (n *GeneratorInterceptor) Close() error {
+ defer n.wg.Wait()
+ n.m.Lock()
+ defer n.m.Unlock()
+
+ if !n.isClosed() {
+ close(n.close)
+ }
+
+ return nil
+}
+
+func (n *GeneratorInterceptor) loop(rtcpWriter interceptor.RTCPWriter) {
+ defer n.wg.Done()
+
+ senderSSRC := rand.Uint32() // #nosec
+
+ ticker := time.NewTicker(n.interval)
+ for {
+ select {
+ case <-ticker.C:
+ func() {
+ n.receiveLogsMu.Lock()
+ defer n.receiveLogsMu.Unlock()
+
+ for ssrc, receiveLog := range n.receiveLogs {
+ missing := receiveLog.missingSeqNumbers(n.skipLastN)
+ if len(missing) == 0 {
+ continue
+ }
+
+ nack := &rtcp.TransportLayerNack{
+ SenderSSRC: senderSSRC,
+ MediaSSRC: ssrc,
+ Nacks: rtcp.NackPairsFromSequenceNumbers(missing),
+ }
+
+ if _, err := rtcpWriter.Write([]rtcp.Packet{nack}, interceptor.Attributes{}); err != nil {
+ n.log.Warnf("failed sending nack: %+v", err)
+ }
+ }
+ }()
+ case <-n.close:
+ return
+ }
+ }
+}
+
+func (n *GeneratorInterceptor) isClosed() bool {
+ select {
+ case <-n.close:
+ return true
+ default:
+ return false
+ }
+}
diff --git a/vendor/github.com/pion/interceptor/pkg/nack/generator_option.go b/vendor/github.com/pion/interceptor/pkg/nack/generator_option.go
new file mode 100644
index 0000000..092f5db
--- /dev/null
+++ b/vendor/github.com/pion/interceptor/pkg/nack/generator_option.go
@@ -0,0 +1,44 @@
+package nack
+
+import (
+ "time"
+
+ "github.com/pion/logging"
+)
+
+// GeneratorOption can be used to configure GeneratorInterceptor
+type GeneratorOption func(r *GeneratorInterceptor) error
+
+// GeneratorSize sets the size of the interceptor.
+// Size must be one of: 64, 128, 256, 512, 1024, 2048, 4096, 8192, 16384, 32768
+func GeneratorSize(size uint16) GeneratorOption {
+ return func(r *GeneratorInterceptor) error {
+ r.size = size
+ return nil
+ }
+}
+
+// GeneratorSkipLastN sets the number of packets (n-1 packets before the last received packets) to ignore when generating
+// nack requests.
+func GeneratorSkipLastN(skipLastN uint16) GeneratorOption {
+ return func(r *GeneratorInterceptor) error {
+ r.skipLastN = skipLastN
+ return nil
+ }
+}
+
+// GeneratorLog sets a logger for the interceptor
+func GeneratorLog(log logging.LeveledLogger) GeneratorOption {
+ return func(r *GeneratorInterceptor) error {
+ r.log = log
+ return nil
+ }
+}
+
+// GeneratorInterval sets the nack send interval for the interceptor
+func GeneratorInterval(interval time.Duration) GeneratorOption {
+ return func(r *GeneratorInterceptor) error {
+ r.interval = interval
+ return nil
+ }
+}
diff --git a/vendor/github.com/pion/interceptor/pkg/nack/nack.go b/vendor/github.com/pion/interceptor/pkg/nack/nack.go
new file mode 100644
index 0000000..a658e7f
--- /dev/null
+++ b/vendor/github.com/pion/interceptor/pkg/nack/nack.go
@@ -0,0 +1,14 @@
+// Package nack provides interceptors to implement sending and receiving negative acknowledgements
+package nack
+
+import "github.com/pion/interceptor"
+
+func streamSupportNack(info *interceptor.StreamInfo) bool {
+ for _, fb := range info.RTCPFeedback {
+ if fb.Type == "nack" && fb.Parameter == "" {
+ return true
+ }
+ }
+
+ return false
+}
diff --git a/vendor/github.com/pion/interceptor/pkg/nack/receive_log.go b/vendor/github.com/pion/interceptor/pkg/nack/receive_log.go
new file mode 100644
index 0000000..8107f59
--- /dev/null
+++ b/vendor/github.com/pion/interceptor/pkg/nack/receive_log.go
@@ -0,0 +1,134 @@
+package nack
+
+import (
+ "fmt"
+ "sync"
+)
+
+type receiveLog struct {
+ packets []uint64
+ size uint16
+ end uint16
+ started bool
+ lastConsecutive uint16
+ m sync.RWMutex
+}
+
+func newReceiveLog(size uint16) (*receiveLog, error) {
+ allowedSizes := make([]uint16, 0)
+ correctSize := false
+ for i := 6; i < 16; i++ {
+ if size == 1<<i {
+ correctSize = true
+ break
+ }
+ allowedSizes = append(allowedSizes, 1<<i)
+ }
+
+ if !correctSize {
+ return nil, fmt.Errorf("%w: %d is not a valid size, allowed sizes: %v", ErrInvalidSize, size, allowedSizes)
+ }
+
+ return &receiveLog{
+ packets: make([]uint64, size/64),
+ size: size,
+ }, nil
+}
+
+func (s *receiveLog) add(seq uint16) {
+ s.m.Lock()
+ defer s.m.Unlock()
+
+ if !s.started {
+ s.setReceived(seq)
+ s.end = seq
+ s.started = true
+ s.lastConsecutive = seq
+ return
+ }
+
+ diff := seq - s.end
+ switch {
+ case diff == 0:
+ return
+ case diff < uint16SizeHalf:
+ // this means a positive diff, in other words seq > end (with counting for rollovers)
+ for i := s.end + 1; i != seq; i++ {
+ // clear packets between end and seq (these may contain packets from a "size" ago)
+ s.delReceived(i)
+ }
+ s.end = seq
+
+ if s.lastConsecutive+1 == seq {
+ s.lastConsecutive = seq
+ } else if seq-s.lastConsecutive > s.size {
+ s.lastConsecutive = seq - s.size
+ s.fixLastConsecutive() // there might be valid packets at the beginning of the buffer now
+ }
+ case s.lastConsecutive+1 == seq:
+ // negative diff, seq < end (with counting for rollovers)
+ s.lastConsecutive = seq
+ s.fixLastConsecutive() // there might be other valid packets after seq
+ }
+
+ s.setReceived(seq)
+}
+
+func (s *receiveLog) get(seq uint16) bool {
+ s.m.RLock()
+ defer s.m.RUnlock()
+
+ diff := s.end - seq
+ if diff >= uint16SizeHalf {
+ return false
+ }
+
+ if diff >= s.size {
+ return false
+ }
+
+ return s.getReceived(seq)
+}
+
+func (s *receiveLog) missingSeqNumbers(skipLastN uint16) []uint16 {
+ s.m.RLock()
+ defer s.m.RUnlock()
+
+ until := s.end - skipLastN
+ if until-s.lastConsecutive >= uint16SizeHalf {
+ // until < s.lastConsecutive (counting for rollover)
+ return nil
+ }
+
+ missingPacketSeqNums := make([]uint16, 0)
+ for i := s.lastConsecutive + 1; i != until+1; i++ {
+ if !s.getReceived(i) {
+ missingPacketSeqNums = append(missingPacketSeqNums, i)
+ }
+ }
+
+ return missingPacketSeqNums
+}
+
+func (s *receiveLog) setReceived(seq uint16) {
+ pos := seq % s.size
+ s.packets[pos/64] |= 1 << (pos % 64)
+}
+
+func (s *receiveLog) delReceived(seq uint16) {
+ pos := seq % s.size
+ s.packets[pos/64] &^= 1 << (pos % 64)
+}
+
+func (s *receiveLog) getReceived(seq uint16) bool {
+ pos := seq % s.size
+ return (s.packets[pos/64] & (1 << (pos % 64))) != 0
+}
+
+func (s *receiveLog) fixLastConsecutive() {
+ i := s.lastConsecutive + 1
+ for ; i != s.end+1 && s.getReceived(i); i++ {
+ // find all consecutive packets
+ }
+ s.lastConsecutive = i - 1
+}
diff --git a/vendor/github.com/pion/interceptor/pkg/nack/responder_interceptor.go b/vendor/github.com/pion/interceptor/pkg/nack/responder_interceptor.go
new file mode 100644
index 0000000..121657e
--- /dev/null
+++ b/vendor/github.com/pion/interceptor/pkg/nack/responder_interceptor.go
@@ -0,0 +1,119 @@
+package nack
+
+import (
+ "sync"
+
+ "github.com/pion/interceptor"
+ "github.com/pion/logging"
+ "github.com/pion/rtcp"
+ "github.com/pion/rtp"
+)
+
+// ResponderInterceptor responds to nack feedback messages
+type ResponderInterceptor struct {
+ interceptor.NoOp
+ size uint16
+ log logging.LeveledLogger
+
+ streams map[uint32]*localStream
+ streamsMu sync.Mutex
+}
+
+type localStream struct {
+ sendBuffer *sendBuffer
+ rtpWriter interceptor.RTPWriter
+}
+
+// NewResponderInterceptor returns a new GeneratorInterceptor interceptor
+func NewResponderInterceptor(opts ...ResponderOption) (*ResponderInterceptor, error) {
+ r := &ResponderInterceptor{
+ size: 8192,
+ log: logging.NewDefaultLoggerFactory().NewLogger("nack_responder"),
+ streams: map[uint32]*localStream{},
+ }
+
+ for _, opt := range opts {
+ if err := opt(r); err != nil {
+ return nil, err
+ }
+ }
+
+ if _, err := newSendBuffer(r.size); err != nil {
+ return nil, err
+ }
+
+ return r, nil
+}
+
+// BindRTCPReader lets you modify any incoming RTCP packets. It is called once per sender/receiver, however this might
+// change in the future. The returned method will be called once per packet batch.
+func (n *ResponderInterceptor) BindRTCPReader(reader interceptor.RTCPReader) interceptor.RTCPReader {
+ return interceptor.RTCPReaderFunc(func(b []byte, a interceptor.Attributes) (int, interceptor.Attributes, error) {
+ i, attr, err := reader.Read(b, a)
+ if err != nil {
+ return 0, nil, err
+ }
+
+ pkts, err := rtcp.Unmarshal(b[:i])
+ if err != nil {
+ return 0, nil, err
+ }
+ for _, rtcpPacket := range pkts {
+ nack, ok := rtcpPacket.(*rtcp.TransportLayerNack)
+ if !ok {
+ continue
+ }
+
+ go n.resendPackets(nack)
+ }
+
+ return i, attr, err
+ })
+}
+
+// BindLocalStream lets you modify any outgoing RTP packets. It is called once for per LocalStream. The returned method
+// will be called once per rtp packet.
+func (n *ResponderInterceptor) BindLocalStream(info *interceptor.StreamInfo, writer interceptor.RTPWriter) interceptor.RTPWriter {
+ if !streamSupportNack(info) {
+ return writer
+ }
+
+ // error is already checked in NewGeneratorInterceptor
+ sendBuffer, _ := newSendBuffer(n.size)
+ n.streamsMu.Lock()
+ n.streams[info.SSRC] = &localStream{sendBuffer: sendBuffer, rtpWriter: writer}
+ n.streamsMu.Unlock()
+
+ return interceptor.RTPWriterFunc(func(header *rtp.Header, payload []byte, attributes interceptor.Attributes) (int, error) {
+ sendBuffer.add(&rtp.Packet{Header: *header, Payload: payload})
+ return writer.Write(header, payload, attributes)
+ })
+}
+
+// UnbindLocalStream is called when the Stream is removed. It can be used to clean up any data related to that track.
+func (n *ResponderInterceptor) UnbindLocalStream(info *interceptor.StreamInfo) {
+ n.streamsMu.Lock()
+ delete(n.streams, info.SSRC)
+ n.streamsMu.Unlock()
+}
+
+func (n *ResponderInterceptor) resendPackets(nack *rtcp.TransportLayerNack) {
+ n.streamsMu.Lock()
+ stream, ok := n.streams[nack.MediaSSRC]
+ n.streamsMu.Unlock()
+ if !ok {
+ return
+ }
+
+ for i := range nack.Nacks {
+ nack.Nacks[i].Range(func(seq uint16) bool {
+ if p := stream.sendBuffer.get(seq); p != nil {
+ if _, err := stream.rtpWriter.Write(&p.Header, p.Payload, interceptor.Attributes{}); err != nil {
+ n.log.Warnf("failed resending nacked packet: %+v", err)
+ }
+ }
+
+ return true
+ })
+ }
+}
diff --git a/vendor/github.com/pion/interceptor/pkg/nack/responder_option.go b/vendor/github.com/pion/interceptor/pkg/nack/responder_option.go
new file mode 100644
index 0000000..7ad52c8
--- /dev/null
+++ b/vendor/github.com/pion/interceptor/pkg/nack/responder_option.go
@@ -0,0 +1,23 @@
+package nack
+
+import "github.com/pion/logging"
+
+// ResponderOption can be used to configure ResponderInterceptor
+type ResponderOption func(s *ResponderInterceptor) error
+
+// ResponderSize sets the size of the interceptor.
+// Size must be one of: 1, 2, 4, 8, 16, 32, 64, 128, 256, 512, 1024, 2048, 4096, 8192, 16384, 32768
+func ResponderSize(size uint16) ResponderOption {
+ return func(r *ResponderInterceptor) error {
+ r.size = size
+ return nil
+ }
+}
+
+// ResponderLog sets a logger for the interceptor
+func ResponderLog(log logging.LeveledLogger) ResponderOption {
+ return func(r *ResponderInterceptor) error {
+ r.log = log
+ return nil
+ }
+}
diff --git a/vendor/github.com/pion/interceptor/pkg/nack/send_buffer.go b/vendor/github.com/pion/interceptor/pkg/nack/send_buffer.go
new file mode 100644
index 0000000..cf3f020
--- /dev/null
+++ b/vendor/github.com/pion/interceptor/pkg/nack/send_buffer.go
@@ -0,0 +1,74 @@
+package nack
+
+import (
+ "fmt"
+
+ "github.com/pion/rtp"
+)
+
+const (
+ uint16SizeHalf = 1 << 15
+)
+
+type sendBuffer struct {
+ packets []*rtp.Packet
+ size uint16
+ lastAdded uint16
+ started bool
+}
+
+func newSendBuffer(size uint16) (*sendBuffer, error) {
+ allowedSizes := make([]uint16, 0)
+ correctSize := false
+ for i := 0; i < 16; i++ {
+ if size == 1<<i {
+ correctSize = true
+ break
+ }
+ allowedSizes = append(allowedSizes, 1<<i)
+ }
+
+ if !correctSize {
+ return nil, fmt.Errorf("%w: %d is not a valid size, allowed sizes: %v", ErrInvalidSize, size, allowedSizes)
+ }
+
+ return &sendBuffer{
+ packets: make([]*rtp.Packet, size),
+ size: size,
+ }, nil
+}
+
+func (s *sendBuffer) add(packet *rtp.Packet) {
+ seq := packet.SequenceNumber
+ if !s.started {
+ s.packets[seq%s.size] = packet
+ s.lastAdded = seq
+ s.started = true
+ return
+ }
+
+ diff := seq - s.lastAdded
+ if diff == 0 {
+ return
+ } else if diff < uint16SizeHalf {
+ for i := s.lastAdded + 1; i != seq; i++ {
+ s.packets[i%s.size] = nil
+ }
+ }
+
+ s.packets[seq%s.size] = packet
+ s.lastAdded = seq
+}
+
+func (s *sendBuffer) get(seq uint16) *rtp.Packet {
+ diff := s.lastAdded - seq
+ if diff >= uint16SizeHalf {
+ return nil
+ }
+
+ if diff >= s.size {
+ return nil
+ }
+
+ return s.packets[seq%s.size]
+}