summaryrefslogtreecommitdiff
path: root/vendor/github.com/pion/interceptor/pkg/report/sender_interceptor.go
diff options
context:
space:
mode:
Diffstat (limited to 'vendor/github.com/pion/interceptor/pkg/report/sender_interceptor.go')
-rw-r--r--vendor/github.com/pion/interceptor/pkg/report/sender_interceptor.go139
1 files changed, 139 insertions, 0 deletions
diff --git a/vendor/github.com/pion/interceptor/pkg/report/sender_interceptor.go b/vendor/github.com/pion/interceptor/pkg/report/sender_interceptor.go
new file mode 100644
index 0000000..e56fd8c
--- /dev/null
+++ b/vendor/github.com/pion/interceptor/pkg/report/sender_interceptor.go
@@ -0,0 +1,139 @@
+package report
+
+import (
+ "sync"
+ "time"
+
+ "github.com/pion/interceptor"
+ "github.com/pion/logging"
+ "github.com/pion/rtcp"
+ "github.com/pion/rtp"
+)
+
+func ntpTime(t time.Time) uint64 {
+ // seconds since 1st January 1900
+ s := (float64(t.UnixNano()) / 1000000000) + 2208988800
+
+ // higher 32 bits are the integer part, lower 32 bits are the fractional part
+ integerPart := uint32(s)
+ fractionalPart := uint32((s - float64(integerPart)) * 0xFFFFFFFF)
+ return uint64(integerPart)<<32 | uint64(fractionalPart)
+}
+
+// SenderInterceptor interceptor generates sender reports.
+type SenderInterceptor struct {
+ interceptor.NoOp
+ interval time.Duration
+ now func() time.Time
+ streams sync.Map
+ log logging.LeveledLogger
+ m sync.Mutex
+ wg sync.WaitGroup
+ close chan struct{}
+}
+
+// NewSenderInterceptor returns a new SenderInterceptor interceptor.
+func NewSenderInterceptor(opts ...SenderOption) (*SenderInterceptor, error) {
+ s := &SenderInterceptor{
+ interval: 1 * time.Second,
+ now: time.Now,
+ log: logging.NewDefaultLoggerFactory().NewLogger("sender_interceptor"),
+ close: make(chan struct{}),
+ }
+
+ for _, opt := range opts {
+ if err := opt(s); err != nil {
+ return nil, err
+ }
+ }
+
+ return s, nil
+}
+
+func (s *SenderInterceptor) isClosed() bool {
+ select {
+ case <-s.close:
+ return true
+ default:
+ return false
+ }
+}
+
+// Close closes the interceptor.
+func (s *SenderInterceptor) Close() error {
+ defer s.wg.Wait()
+ s.m.Lock()
+ defer s.m.Unlock()
+
+ if !s.isClosed() {
+ close(s.close)
+ }
+
+ return nil
+}
+
+// BindRTCPWriter lets you modify any outgoing RTCP packets. It is called once per PeerConnection. The returned method
+// will be called once per packet batch.
+func (s *SenderInterceptor) BindRTCPWriter(writer interceptor.RTCPWriter) interceptor.RTCPWriter {
+ s.m.Lock()
+ defer s.m.Unlock()
+
+ if s.isClosed() {
+ return writer
+ }
+
+ s.wg.Add(1)
+
+ go s.loop(writer)
+
+ return writer
+}
+
+func (s *SenderInterceptor) loop(rtcpWriter interceptor.RTCPWriter) {
+ defer s.wg.Done()
+
+ ticker := time.NewTicker(s.interval)
+ for {
+ select {
+ case <-ticker.C:
+ now := s.now()
+ s.streams.Range(func(key, value interface{}) bool {
+ ssrc := key.(uint32)
+ stream := value.(*senderStream)
+
+ stream.m.Lock()
+ defer stream.m.Unlock()
+
+ sr := &rtcp.SenderReport{
+ SSRC: ssrc,
+ NTPTime: ntpTime(now),
+ RTPTime: stream.lastRTPTimeRTP + uint32(now.Sub(stream.lastRTPTimeTime).Seconds()*stream.clockRate),
+ PacketCount: stream.packetCount,
+ OctetCount: stream.octetCount,
+ }
+
+ if _, err := rtcpWriter.Write([]rtcp.Packet{sr}, interceptor.Attributes{}); err != nil {
+ s.log.Warnf("failed sending: %+v", err)
+ }
+
+ return true
+ })
+
+ case <-s.close:
+ return
+ }
+ }
+}
+
+// BindLocalStream lets you modify any outgoing RTP packets. It is called once for per LocalStream. The returned method
+// will be called once per rtp packet.
+func (s *SenderInterceptor) BindLocalStream(info *interceptor.StreamInfo, writer interceptor.RTPWriter) interceptor.RTPWriter {
+ stream := newSenderStream(info.ClockRate)
+ s.streams.Store(info.SSRC, stream)
+
+ return interceptor.RTPWriterFunc(func(header *rtp.Header, payload []byte, a interceptor.Attributes) (int, error) {
+ stream.processRTP(s.now(), header, payload)
+
+ return writer.Write(header, payload, a)
+ })
+}