summaryrefslogtreecommitdiff
path: root/vendor/github.com/pion/interceptor/pkg/report/receiver_interceptor.go
diff options
context:
space:
mode:
Diffstat (limited to 'vendor/github.com/pion/interceptor/pkg/report/receiver_interceptor.go')
-rw-r--r--vendor/github.com/pion/interceptor/pkg/report/receiver_interceptor.go166
1 files changed, 166 insertions, 0 deletions
diff --git a/vendor/github.com/pion/interceptor/pkg/report/receiver_interceptor.go b/vendor/github.com/pion/interceptor/pkg/report/receiver_interceptor.go
new file mode 100644
index 0000000..5235b99
--- /dev/null
+++ b/vendor/github.com/pion/interceptor/pkg/report/receiver_interceptor.go
@@ -0,0 +1,166 @@
+package report
+
+import (
+ "sync"
+ "time"
+
+ "github.com/pion/interceptor"
+ "github.com/pion/logging"
+ "github.com/pion/rtcp"
+ "github.com/pion/rtp"
+)
+
+// ReceiverInterceptor interceptor generates receiver reports.
+type ReceiverInterceptor struct {
+ interceptor.NoOp
+ interval time.Duration
+ now func() time.Time
+ streams sync.Map
+ log logging.LeveledLogger
+ m sync.Mutex
+ wg sync.WaitGroup
+ close chan struct{}
+}
+
+// NewReceiverInterceptor returns a new ReceiverInterceptor interceptor.
+func NewReceiverInterceptor(opts ...ReceiverOption) (*ReceiverInterceptor, error) {
+ r := &ReceiverInterceptor{
+ interval: 1 * time.Second,
+ now: time.Now,
+ log: logging.NewDefaultLoggerFactory().NewLogger("receiver_interceptor"),
+ close: make(chan struct{}),
+ }
+
+ for _, opt := range opts {
+ if err := opt(r); err != nil {
+ return nil, err
+ }
+ }
+
+ return r, nil
+}
+
+func (r *ReceiverInterceptor) isClosed() bool {
+ select {
+ case <-r.close:
+ return true
+ default:
+ return false
+ }
+}
+
+// Close closes the interceptor.
+func (r *ReceiverInterceptor) Close() error {
+ defer r.wg.Wait()
+ r.m.Lock()
+ defer r.m.Unlock()
+
+ if !r.isClosed() {
+ close(r.close)
+ }
+
+ return nil
+}
+
+// BindRTCPWriter lets you modify any outgoing RTCP packets. It is called once per PeerConnection. The returned method
+// will be called once per packet batch.
+func (r *ReceiverInterceptor) BindRTCPWriter(writer interceptor.RTCPWriter) interceptor.RTCPWriter {
+ r.m.Lock()
+ defer r.m.Unlock()
+
+ if r.isClosed() {
+ return writer
+ }
+
+ r.wg.Add(1)
+
+ go r.loop(writer)
+
+ return writer
+}
+
+func (r *ReceiverInterceptor) loop(rtcpWriter interceptor.RTCPWriter) {
+ defer r.wg.Done()
+
+ ticker := time.NewTicker(r.interval)
+ for {
+ select {
+ case <-ticker.C:
+ now := r.now()
+ r.streams.Range(func(key, value interface{}) bool {
+ stream := value.(*receiverStream)
+
+ var pkts []rtcp.Packet
+
+ pkts = append(pkts, stream.generateReport(now))
+
+ if _, err := rtcpWriter.Write(pkts, interceptor.Attributes{}); err != nil {
+ r.log.Warnf("failed sending: %+v", err)
+ }
+
+ return true
+ })
+
+ case <-r.close:
+ return
+ }
+ }
+}
+
+// BindRemoteStream lets you modify any incoming RTP packets. It is called once for per RemoteStream. The returned method
+// will be called once per rtp packet.
+func (r *ReceiverInterceptor) BindRemoteStream(info *interceptor.StreamInfo, reader interceptor.RTPReader) interceptor.RTPReader {
+ stream := newReceiverStream(info.SSRC, info.ClockRate)
+ r.streams.Store(info.SSRC, stream)
+
+ return interceptor.RTPReaderFunc(func(b []byte, a interceptor.Attributes) (int, interceptor.Attributes, error) {
+ i, attr, err := reader.Read(b, a)
+ if err != nil {
+ return 0, nil, err
+ }
+
+ pkt := rtp.Packet{}
+ if err = pkt.Unmarshal(b[:i]); err != nil {
+ return 0, nil, err
+ }
+
+ stream.processRTP(r.now(), &pkt)
+
+ return i, attr, nil
+ })
+}
+
+// UnbindLocalStream is called when the Stream is removed. It can be used to clean up any data related to that track.
+func (r *ReceiverInterceptor) UnbindLocalStream(info *interceptor.StreamInfo) {
+ r.streams.Delete(info.SSRC)
+}
+
+// BindRTCPReader lets you modify any incoming RTCP packets. It is called once per sender/receiver, however this might
+// change in the future. The returned method will be called once per packet batch.
+func (r *ReceiverInterceptor) BindRTCPReader(reader interceptor.RTCPReader) interceptor.RTCPReader {
+ return interceptor.RTCPReaderFunc(func(b []byte, a interceptor.Attributes) (int, interceptor.Attributes, error) {
+ i, attr, err := reader.Read(b, a)
+ if err != nil {
+ return 0, nil, err
+ }
+
+ pkts, err := rtcp.Unmarshal(b[:i])
+ if err != nil {
+ return 0, nil, err
+ }
+
+ for _, pkt := range pkts {
+ if sr, ok := (pkt).(*rtcp.SenderReport); ok {
+ value, ok := r.streams.Load(sr.SSRC)
+ if !ok {
+ continue
+ }
+
+ stream := value.(*receiverStream)
+ stream.processSenderReport(r.now(), sr)
+ }
+ }
+
+ return i, attr, nil
+ })
+}