diff options
Diffstat (limited to 'vendor/github.com/pion/interceptor/pkg/nack')
8 files changed, 576 insertions, 0 deletions
diff --git a/vendor/github.com/pion/interceptor/pkg/nack/errors.go b/vendor/github.com/pion/interceptor/pkg/nack/errors.go new file mode 100644 index 0000000..bbfc773 --- /dev/null +++ b/vendor/github.com/pion/interceptor/pkg/nack/errors.go @@ -0,0 +1,6 @@ +package nack + +import "errors" + +// ErrInvalidSize is returned by newReceiveLog/newSendBuffer, when an incorrect buffer size is supplied. +var ErrInvalidSize = errors.New("invalid buffer size") diff --git a/vendor/github.com/pion/interceptor/pkg/nack/generator_interceptor.go b/vendor/github.com/pion/interceptor/pkg/nack/generator_interceptor.go new file mode 100644 index 0000000..447a949 --- /dev/null +++ b/vendor/github.com/pion/interceptor/pkg/nack/generator_interceptor.go @@ -0,0 +1,162 @@ +package nack + +import ( + "math/rand" + "sync" + "time" + + "github.com/pion/interceptor" + "github.com/pion/logging" + "github.com/pion/rtcp" + "github.com/pion/rtp" +) + +// GeneratorInterceptor interceptor generates nack feedback messages. +type GeneratorInterceptor struct { + interceptor.NoOp + size uint16 + skipLastN uint16 + interval time.Duration + m sync.Mutex + wg sync.WaitGroup + close chan struct{} + log logging.LeveledLogger + + receiveLogs map[uint32]*receiveLog + receiveLogsMu sync.Mutex +} + +// NewGeneratorInterceptor returns a new GeneratorInterceptor interceptor +func NewGeneratorInterceptor(opts ...GeneratorOption) (*GeneratorInterceptor, error) { + r := &GeneratorInterceptor{ + size: 8192, + skipLastN: 0, + interval: time.Millisecond * 100, + receiveLogs: map[uint32]*receiveLog{}, + close: make(chan struct{}), + log: logging.NewDefaultLoggerFactory().NewLogger("nack_generator"), + } + + for _, opt := range opts { + if err := opt(r); err != nil { + return nil, err + } + } + + if _, err := newReceiveLog(r.size); err != nil { + return nil, err + } + + return r, nil +} + +// BindRTCPWriter lets you modify any outgoing RTCP packets. It is called once per PeerConnection. The returned method +// will be called once per packet batch. +func (n *GeneratorInterceptor) BindRTCPWriter(writer interceptor.RTCPWriter) interceptor.RTCPWriter { + n.m.Lock() + defer n.m.Unlock() + + if n.isClosed() { + return writer + } + + n.wg.Add(1) + + go n.loop(writer) + + return writer +} + +// BindRemoteStream lets you modify any incoming RTP packets. It is called once for per RemoteStream. The returned method +// will be called once per rtp packet. +func (n *GeneratorInterceptor) BindRemoteStream(info *interceptor.StreamInfo, reader interceptor.RTPReader) interceptor.RTPReader { + if !streamSupportNack(info) { + return reader + } + + // error is already checked in NewGeneratorInterceptor + receiveLog, _ := newReceiveLog(n.size) + n.receiveLogsMu.Lock() + n.receiveLogs[info.SSRC] = receiveLog + n.receiveLogsMu.Unlock() + + return interceptor.RTPReaderFunc(func(b []byte, a interceptor.Attributes) (int, interceptor.Attributes, error) { + i, attr, err := reader.Read(b, a) + if err != nil { + return 0, nil, err + } + + pkt := rtp.Packet{} + if err = pkt.Unmarshal(b[:i]); err != nil { + return 0, nil, err + } + receiveLog.add(pkt.Header.SequenceNumber) + + return i, attr, nil + }) +} + +// UnbindLocalStream is called when the Stream is removed. It can be used to clean up any data related to that track. +func (n *GeneratorInterceptor) UnbindLocalStream(info *interceptor.StreamInfo) { + n.receiveLogsMu.Lock() + delete(n.receiveLogs, info.SSRC) + n.receiveLogsMu.Unlock() +} + +// Close closes the interceptor +func (n *GeneratorInterceptor) Close() error { + defer n.wg.Wait() + n.m.Lock() + defer n.m.Unlock() + + if !n.isClosed() { + close(n.close) + } + + return nil +} + +func (n *GeneratorInterceptor) loop(rtcpWriter interceptor.RTCPWriter) { + defer n.wg.Done() + + senderSSRC := rand.Uint32() // #nosec + + ticker := time.NewTicker(n.interval) + for { + select { + case <-ticker.C: + func() { + n.receiveLogsMu.Lock() + defer n.receiveLogsMu.Unlock() + + for ssrc, receiveLog := range n.receiveLogs { + missing := receiveLog.missingSeqNumbers(n.skipLastN) + if len(missing) == 0 { + continue + } + + nack := &rtcp.TransportLayerNack{ + SenderSSRC: senderSSRC, + MediaSSRC: ssrc, + Nacks: rtcp.NackPairsFromSequenceNumbers(missing), + } + + if _, err := rtcpWriter.Write([]rtcp.Packet{nack}, interceptor.Attributes{}); err != nil { + n.log.Warnf("failed sending nack: %+v", err) + } + } + }() + case <-n.close: + return + } + } +} + +func (n *GeneratorInterceptor) isClosed() bool { + select { + case <-n.close: + return true + default: + return false + } +} diff --git a/vendor/github.com/pion/interceptor/pkg/nack/generator_option.go b/vendor/github.com/pion/interceptor/pkg/nack/generator_option.go new file mode 100644 index 0000000..092f5db --- /dev/null +++ b/vendor/github.com/pion/interceptor/pkg/nack/generator_option.go @@ -0,0 +1,44 @@ +package nack + +import ( + "time" + + "github.com/pion/logging" +) + +// GeneratorOption can be used to configure GeneratorInterceptor +type GeneratorOption func(r *GeneratorInterceptor) error + +// GeneratorSize sets the size of the interceptor. +// Size must be one of: 64, 128, 256, 512, 1024, 2048, 4096, 8192, 16384, 32768 +func GeneratorSize(size uint16) GeneratorOption { + return func(r *GeneratorInterceptor) error { + r.size = size + return nil + } +} + +// GeneratorSkipLastN sets the number of packets (n-1 packets before the last received packets) to ignore when generating +// nack requests. +func GeneratorSkipLastN(skipLastN uint16) GeneratorOption { + return func(r *GeneratorInterceptor) error { + r.skipLastN = skipLastN + return nil + } +} + +// GeneratorLog sets a logger for the interceptor +func GeneratorLog(log logging.LeveledLogger) GeneratorOption { + return func(r *GeneratorInterceptor) error { + r.log = log + return nil + } +} + +// GeneratorInterval sets the nack send interval for the interceptor +func GeneratorInterval(interval time.Duration) GeneratorOption { + return func(r *GeneratorInterceptor) error { + r.interval = interval + return nil + } +} diff --git a/vendor/github.com/pion/interceptor/pkg/nack/nack.go b/vendor/github.com/pion/interceptor/pkg/nack/nack.go new file mode 100644 index 0000000..a658e7f --- /dev/null +++ b/vendor/github.com/pion/interceptor/pkg/nack/nack.go @@ -0,0 +1,14 @@ +// Package nack provides interceptors to implement sending and receiving negative acknowledgements +package nack + +import "github.com/pion/interceptor" + +func streamSupportNack(info *interceptor.StreamInfo) bool { + for _, fb := range info.RTCPFeedback { + if fb.Type == "nack" && fb.Parameter == "" { + return true + } + } + + return false +} diff --git a/vendor/github.com/pion/interceptor/pkg/nack/receive_log.go b/vendor/github.com/pion/interceptor/pkg/nack/receive_log.go new file mode 100644 index 0000000..8107f59 --- /dev/null +++ b/vendor/github.com/pion/interceptor/pkg/nack/receive_log.go @@ -0,0 +1,134 @@ +package nack + +import ( + "fmt" + "sync" +) + +type receiveLog struct { + packets []uint64 + size uint16 + end uint16 + started bool + lastConsecutive uint16 + m sync.RWMutex +} + +func newReceiveLog(size uint16) (*receiveLog, error) { + allowedSizes := make([]uint16, 0) + correctSize := false + for i := 6; i < 16; i++ { + if size == 1<<i { + correctSize = true + break + } + allowedSizes = append(allowedSizes, 1<<i) + } + + if !correctSize { + return nil, fmt.Errorf("%w: %d is not a valid size, allowed sizes: %v", ErrInvalidSize, size, allowedSizes) + } + + return &receiveLog{ + packets: make([]uint64, size/64), + size: size, + }, nil +} + +func (s *receiveLog) add(seq uint16) { + s.m.Lock() + defer s.m.Unlock() + + if !s.started { + s.setReceived(seq) + s.end = seq + s.started = true + s.lastConsecutive = seq + return + } + + diff := seq - s.end + switch { + case diff == 0: + return + case diff < uint16SizeHalf: + // this means a positive diff, in other words seq > end (with counting for rollovers) + for i := s.end + 1; i != seq; i++ { + // clear packets between end and seq (these may contain packets from a "size" ago) + s.delReceived(i) + } + s.end = seq + + if s.lastConsecutive+1 == seq { + s.lastConsecutive = seq + } else if seq-s.lastConsecutive > s.size { + s.lastConsecutive = seq - s.size + s.fixLastConsecutive() // there might be valid packets at the beginning of the buffer now + } + case s.lastConsecutive+1 == seq: + // negative diff, seq < end (with counting for rollovers) + s.lastConsecutive = seq + s.fixLastConsecutive() // there might be other valid packets after seq + } + + s.setReceived(seq) +} + +func (s *receiveLog) get(seq uint16) bool { + s.m.RLock() + defer s.m.RUnlock() + + diff := s.end - seq + if diff >= uint16SizeHalf { + return false + } + + if diff >= s.size { + return false + } + + return s.getReceived(seq) +} + +func (s *receiveLog) missingSeqNumbers(skipLastN uint16) []uint16 { + s.m.RLock() + defer s.m.RUnlock() + + until := s.end - skipLastN + if until-s.lastConsecutive >= uint16SizeHalf { + // until < s.lastConsecutive (counting for rollover) + return nil + } + + missingPacketSeqNums := make([]uint16, 0) + for i := s.lastConsecutive + 1; i != until+1; i++ { + if !s.getReceived(i) { + missingPacketSeqNums = append(missingPacketSeqNums, i) + } + } + + return missingPacketSeqNums +} + +func (s *receiveLog) setReceived(seq uint16) { + pos := seq % s.size + s.packets[pos/64] |= 1 << (pos % 64) +} + +func (s *receiveLog) delReceived(seq uint16) { + pos := seq % s.size + s.packets[pos/64] &^= 1 << (pos % 64) +} + +func (s *receiveLog) getReceived(seq uint16) bool { + pos := seq % s.size + return (s.packets[pos/64] & (1 << (pos % 64))) != 0 +} + +func (s *receiveLog) fixLastConsecutive() { + i := s.lastConsecutive + 1 + for ; i != s.end+1 && s.getReceived(i); i++ { + // find all consecutive packets + } + s.lastConsecutive = i - 1 +} diff --git a/vendor/github.com/pion/interceptor/pkg/nack/responder_interceptor.go b/vendor/github.com/pion/interceptor/pkg/nack/responder_interceptor.go new file mode 100644 index 0000000..121657e --- /dev/null +++ b/vendor/github.com/pion/interceptor/pkg/nack/responder_interceptor.go @@ -0,0 +1,119 @@ +package nack + +import ( + "sync" + + "github.com/pion/interceptor" + "github.com/pion/logging" + "github.com/pion/rtcp" + "github.com/pion/rtp" +) + +// ResponderInterceptor responds to nack feedback messages +type ResponderInterceptor struct { + interceptor.NoOp + size uint16 + log logging.LeveledLogger + + streams map[uint32]*localStream + streamsMu sync.Mutex +} + +type localStream struct { + sendBuffer *sendBuffer + rtpWriter interceptor.RTPWriter +} + +// NewResponderInterceptor returns a new GeneratorInterceptor interceptor +func NewResponderInterceptor(opts ...ResponderOption) (*ResponderInterceptor, error) { + r := &ResponderInterceptor{ + size: 8192, + log: logging.NewDefaultLoggerFactory().NewLogger("nack_responder"), + streams: map[uint32]*localStream{}, + } + + for _, opt := range opts { + if err := opt(r); err != nil { + return nil, err + } + } + + if _, err := newSendBuffer(r.size); err != nil { + return nil, err + } + + return r, nil +} + +// BindRTCPReader lets you modify any incoming RTCP packets. It is called once per sender/receiver, however this might +// change in the future. The returned method will be called once per packet batch. +func (n *ResponderInterceptor) BindRTCPReader(reader interceptor.RTCPReader) interceptor.RTCPReader { + return interceptor.RTCPReaderFunc(func(b []byte, a interceptor.Attributes) (int, interceptor.Attributes, error) { + i, attr, err := reader.Read(b, a) + if err != nil { + return 0, nil, err + } + + pkts, err := rtcp.Unmarshal(b[:i]) + if err != nil { + return 0, nil, err + } + for _, rtcpPacket := range pkts { + nack, ok := rtcpPacket.(*rtcp.TransportLayerNack) + if !ok { + continue + } + + go n.resendPackets(nack) + } + + return i, attr, err + }) +} + +// BindLocalStream lets you modify any outgoing RTP packets. It is called once for per LocalStream. The returned method +// will be called once per rtp packet. +func (n *ResponderInterceptor) BindLocalStream(info *interceptor.StreamInfo, writer interceptor.RTPWriter) interceptor.RTPWriter { + if !streamSupportNack(info) { + return writer + } + + // error is already checked in NewGeneratorInterceptor + sendBuffer, _ := newSendBuffer(n.size) + n.streamsMu.Lock() + n.streams[info.SSRC] = &localStream{sendBuffer: sendBuffer, rtpWriter: writer} + n.streamsMu.Unlock() + + return interceptor.RTPWriterFunc(func(header *rtp.Header, payload []byte, attributes interceptor.Attributes) (int, error) { + sendBuffer.add(&rtp.Packet{Header: *header, Payload: payload}) + return writer.Write(header, payload, attributes) + }) +} + +// UnbindLocalStream is called when the Stream is removed. It can be used to clean up any data related to that track. +func (n *ResponderInterceptor) UnbindLocalStream(info *interceptor.StreamInfo) { + n.streamsMu.Lock() + delete(n.streams, info.SSRC) + n.streamsMu.Unlock() +} + +func (n *ResponderInterceptor) resendPackets(nack *rtcp.TransportLayerNack) { + n.streamsMu.Lock() + stream, ok := n.streams[nack.MediaSSRC] + n.streamsMu.Unlock() + if !ok { + return + } + + for i := range nack.Nacks { + nack.Nacks[i].Range(func(seq uint16) bool { + if p := stream.sendBuffer.get(seq); p != nil { + if _, err := stream.rtpWriter.Write(&p.Header, p.Payload, interceptor.Attributes{}); err != nil { + n.log.Warnf("failed resending nacked packet: %+v", err) + } + } + + return true + }) + } +} diff --git a/vendor/github.com/pion/interceptor/pkg/nack/responder_option.go b/vendor/github.com/pion/interceptor/pkg/nack/responder_option.go new file mode 100644 index 0000000..7ad52c8 --- /dev/null +++ b/vendor/github.com/pion/interceptor/pkg/nack/responder_option.go @@ -0,0 +1,23 @@ +package nack + +import "github.com/pion/logging" + +// ResponderOption can be used to configure ResponderInterceptor +type ResponderOption func(s *ResponderInterceptor) error + +// ResponderSize sets the size of the interceptor. +// Size must be one of: 1, 2, 4, 8, 16, 32, 64, 128, 256, 512, 1024, 2048, 4096, 8192, 16384, 32768 +func ResponderSize(size uint16) ResponderOption { + return func(r *ResponderInterceptor) error { + r.size = size + return nil + } +} + +// ResponderLog sets a logger for the interceptor +func ResponderLog(log logging.LeveledLogger) ResponderOption { + return func(r *ResponderInterceptor) error { + r.log = log + return nil + } +} diff --git a/vendor/github.com/pion/interceptor/pkg/nack/send_buffer.go b/vendor/github.com/pion/interceptor/pkg/nack/send_buffer.go new file mode 100644 index 0000000..cf3f020 --- /dev/null +++ b/vendor/github.com/pion/interceptor/pkg/nack/send_buffer.go @@ -0,0 +1,74 @@ +package nack + +import ( + "fmt" + + "github.com/pion/rtp" +) + +const ( + uint16SizeHalf = 1 << 15 +) + +type sendBuffer struct { + packets []*rtp.Packet + size uint16 + lastAdded uint16 + started bool +} + +func newSendBuffer(size uint16) (*sendBuffer, error) { + allowedSizes := make([]uint16, 0) + correctSize := false + for i := 0; i < 16; i++ { + if size == 1<<i { + correctSize = true + break + } + allowedSizes = append(allowedSizes, 1<<i) + } + + if !correctSize { + return nil, fmt.Errorf("%w: %d is not a valid size, allowed sizes: %v", ErrInvalidSize, size, allowedSizes) + } + + return &sendBuffer{ + packets: make([]*rtp.Packet, size), + size: size, + }, nil +} + +func (s *sendBuffer) add(packet *rtp.Packet) { + seq := packet.SequenceNumber + if !s.started { + s.packets[seq%s.size] = packet + s.lastAdded = seq + s.started = true + return + } + + diff := seq - s.lastAdded + if diff == 0 { + return + } else if diff < uint16SizeHalf { + for i := s.lastAdded + 1; i != seq; i++ { + s.packets[i%s.size] = nil + } + } + + s.packets[seq%s.size] = packet + s.lastAdded = seq +} + +func (s *sendBuffer) get(seq uint16) *rtp.Packet { + diff := s.lastAdded - seq + if diff >= uint16SizeHalf { + return nil + } + + if diff >= s.size { + return nil + } + + return s.packets[seq%s.size] +} |