diff options
Diffstat (limited to 'vendor/github.com/pion/interceptor/pkg/nack/generator_interceptor.go')
-rw-r--r-- | vendor/github.com/pion/interceptor/pkg/nack/generator_interceptor.go | 162 |
1 files changed, 162 insertions, 0 deletions
diff --git a/vendor/github.com/pion/interceptor/pkg/nack/generator_interceptor.go b/vendor/github.com/pion/interceptor/pkg/nack/generator_interceptor.go new file mode 100644 index 0000000..447a949 --- /dev/null +++ b/vendor/github.com/pion/interceptor/pkg/nack/generator_interceptor.go @@ -0,0 +1,162 @@ +package nack + +import ( + "math/rand" + "sync" + "time" + + "github.com/pion/interceptor" + "github.com/pion/logging" + "github.com/pion/rtcp" + "github.com/pion/rtp" +) + +// GeneratorInterceptor interceptor generates nack feedback messages. +type GeneratorInterceptor struct { + interceptor.NoOp + size uint16 + skipLastN uint16 + interval time.Duration + m sync.Mutex + wg sync.WaitGroup + close chan struct{} + log logging.LeveledLogger + + receiveLogs map[uint32]*receiveLog + receiveLogsMu sync.Mutex +} + +// NewGeneratorInterceptor returns a new GeneratorInterceptor interceptor +func NewGeneratorInterceptor(opts ...GeneratorOption) (*GeneratorInterceptor, error) { + r := &GeneratorInterceptor{ + size: 8192, + skipLastN: 0, + interval: time.Millisecond * 100, + receiveLogs: map[uint32]*receiveLog{}, + close: make(chan struct{}), + log: logging.NewDefaultLoggerFactory().NewLogger("nack_generator"), + } + + for _, opt := range opts { + if err := opt(r); err != nil { + return nil, err + } + } + + if _, err := newReceiveLog(r.size); err != nil { + return nil, err + } + + return r, nil +} + +// BindRTCPWriter lets you modify any outgoing RTCP packets. It is called once per PeerConnection. The returned method +// will be called once per packet batch. +func (n *GeneratorInterceptor) BindRTCPWriter(writer interceptor.RTCPWriter) interceptor.RTCPWriter { + n.m.Lock() + defer n.m.Unlock() + + if n.isClosed() { + return writer + } + + n.wg.Add(1) + + go n.loop(writer) + + return writer +} + +// BindRemoteStream lets you modify any incoming RTP packets. It is called once for per RemoteStream. The returned method +// will be called once per rtp packet. +func (n *GeneratorInterceptor) BindRemoteStream(info *interceptor.StreamInfo, reader interceptor.RTPReader) interceptor.RTPReader { + if !streamSupportNack(info) { + return reader + } + + // error is already checked in NewGeneratorInterceptor + receiveLog, _ := newReceiveLog(n.size) + n.receiveLogsMu.Lock() + n.receiveLogs[info.SSRC] = receiveLog + n.receiveLogsMu.Unlock() + + return interceptor.RTPReaderFunc(func(b []byte, a interceptor.Attributes) (int, interceptor.Attributes, error) { + i, attr, err := reader.Read(b, a) + if err != nil { + return 0, nil, err + } + + pkt := rtp.Packet{} + if err = pkt.Unmarshal(b[:i]); err != nil { + return 0, nil, err + } + receiveLog.add(pkt.Header.SequenceNumber) + + return i, attr, nil + }) +} + +// UnbindLocalStream is called when the Stream is removed. It can be used to clean up any data related to that track. +func (n *GeneratorInterceptor) UnbindLocalStream(info *interceptor.StreamInfo) { + n.receiveLogsMu.Lock() + delete(n.receiveLogs, info.SSRC) + n.receiveLogsMu.Unlock() +} + +// Close closes the interceptor +func (n *GeneratorInterceptor) Close() error { + defer n.wg.Wait() + n.m.Lock() + defer n.m.Unlock() + + if !n.isClosed() { + close(n.close) + } + + return nil +} + +func (n *GeneratorInterceptor) loop(rtcpWriter interceptor.RTCPWriter) { + defer n.wg.Done() + + senderSSRC := rand.Uint32() // #nosec + + ticker := time.NewTicker(n.interval) + for { + select { + case <-ticker.C: + func() { + n.receiveLogsMu.Lock() + defer n.receiveLogsMu.Unlock() + + for ssrc, receiveLog := range n.receiveLogs { + missing := receiveLog.missingSeqNumbers(n.skipLastN) + if len(missing) == 0 { + continue + } + + nack := &rtcp.TransportLayerNack{ + SenderSSRC: senderSSRC, + MediaSSRC: ssrc, + Nacks: rtcp.NackPairsFromSequenceNumbers(missing), + } + + if _, err := rtcpWriter.Write([]rtcp.Packet{nack}, interceptor.Attributes{}); err != nil { + n.log.Warnf("failed sending nack: %+v", err) + } + } + }() + case <-n.close: + return + } + } +} + +func (n *GeneratorInterceptor) isClosed() bool { + select { + case <-n.close: + return true + default: + return false + } +} |