summaryrefslogtreecommitdiff
path: root/vendor/github.com/pion/interceptor/pkg/report/receiver_stream.go
diff options
context:
space:
mode:
Diffstat (limited to 'vendor/github.com/pion/interceptor/pkg/report/receiver_stream.go')
-rw-r--r--vendor/github.com/pion/interceptor/pkg/report/receiver_stream.go159
1 files changed, 159 insertions, 0 deletions
diff --git a/vendor/github.com/pion/interceptor/pkg/report/receiver_stream.go b/vendor/github.com/pion/interceptor/pkg/report/receiver_stream.go
new file mode 100644
index 0000000..569715d
--- /dev/null
+++ b/vendor/github.com/pion/interceptor/pkg/report/receiver_stream.go
@@ -0,0 +1,159 @@
+package report
+
+import (
+ "math/rand"
+ "sync"
+ "time"
+
+ "github.com/pion/rtcp"
+ "github.com/pion/rtp"
+)
+
+type receiverStream struct {
+ ssrc uint32
+ receiverSSRC uint32
+ clockRate float64
+
+ m sync.Mutex
+ size uint16
+ packets []uint64
+ started bool
+ seqnumCycles uint16
+ lastSeqnum uint16
+ lastReportSeqnum uint16
+ lastRTPTimeRTP uint32
+ lastRTPTimeTime time.Time
+ jitter float64
+ lastSenderReport uint32
+ lastSenderReportTime time.Time
+ totalLost uint32
+}
+
+func newReceiverStream(ssrc uint32, clockRate uint32) *receiverStream {
+ receiverSSRC := rand.Uint32() // #nosec
+ return &receiverStream{
+ ssrc: ssrc,
+ receiverSSRC: receiverSSRC,
+ clockRate: float64(clockRate),
+ size: 128,
+ packets: make([]uint64, 128),
+ }
+}
+
+func (stream *receiverStream) processRTP(now time.Time, pkt *rtp.Packet) {
+ stream.m.Lock()
+ defer stream.m.Unlock()
+
+ if !stream.started { // first frame
+ stream.started = true
+ stream.setReceived(pkt.SequenceNumber)
+ stream.lastSeqnum = pkt.SequenceNumber
+ stream.lastReportSeqnum = pkt.SequenceNumber - 1
+ stream.lastRTPTimeRTP = pkt.Timestamp
+ stream.lastRTPTimeTime = now
+ } else { // following frames
+ stream.setReceived(pkt.SequenceNumber)
+
+ diff := int32(pkt.SequenceNumber) - int32(stream.lastSeqnum)
+ if diff > 0 || diff < -0x0FFF {
+ // overflow
+ if diff < -0x0FFF {
+ stream.seqnumCycles++
+ }
+
+ // set missing packets as missing
+ for i := stream.lastSeqnum + 1; i != pkt.SequenceNumber; i++ {
+ stream.delReceived(i)
+ }
+
+ stream.lastSeqnum = pkt.SequenceNumber
+ }
+
+ // compute jitter
+ // https://tools.ietf.org/html/rfc3550#page-39
+ D := now.Sub(stream.lastRTPTimeTime).Seconds()*stream.clockRate -
+ (float64(pkt.Timestamp) - float64(stream.lastRTPTimeRTP))
+ if D < 0 {
+ D = -D
+ }
+ stream.jitter += (D - stream.jitter) / 16
+ stream.lastRTPTimeRTP = pkt.Timestamp
+ stream.lastRTPTimeTime = now
+ }
+}
+
+func (stream *receiverStream) setReceived(seq uint16) {
+ pos := seq % stream.size
+ stream.packets[pos/64] |= 1 << (pos % 64)
+}
+
+func (stream *receiverStream) delReceived(seq uint16) {
+ pos := seq % stream.size
+ stream.packets[pos/64] &^= 1 << (pos % 64)
+}
+
+func (stream *receiverStream) getReceived(seq uint16) bool {
+ pos := seq % stream.size
+ return (stream.packets[pos/64] & (1 << (pos % 64))) != 0
+}
+
+func (stream *receiverStream) processSenderReport(now time.Time, sr *rtcp.SenderReport) {
+ stream.m.Lock()
+ defer stream.m.Unlock()
+
+ stream.lastSenderReport = uint32(sr.NTPTime >> 16)
+ stream.lastSenderReportTime = now
+}
+
+func (stream *receiverStream) generateReport(now time.Time) *rtcp.ReceiverReport {
+ stream.m.Lock()
+ defer stream.m.Unlock()
+
+ totalSinceReport := stream.lastSeqnum - stream.lastReportSeqnum
+ totalLostSinceReport := func() uint32 {
+ if stream.lastSeqnum == stream.lastReportSeqnum {
+ return 0
+ }
+
+ ret := uint32(0)
+ for i := stream.lastReportSeqnum + 1; i != stream.lastSeqnum; i++ {
+ if !stream.getReceived(i) {
+ ret++
+ }
+ }
+ return ret
+ }()
+ stream.totalLost += totalLostSinceReport
+
+ // allow up to 24 bits
+ if totalLostSinceReport > 0xFFFFFF {
+ totalLostSinceReport = 0xFFFFFF
+ }
+ if stream.totalLost > 0xFFFFFF {
+ stream.totalLost = 0xFFFFFF
+ }
+
+ r := &rtcp.ReceiverReport{
+ SSRC: stream.receiverSSRC,
+ Reports: []rtcp.ReceptionReport{
+ {
+ SSRC: stream.ssrc,
+ LastSequenceNumber: uint32(stream.seqnumCycles)<<16 | uint32(stream.lastSeqnum),
+ LastSenderReport: stream.lastSenderReport,
+ FractionLost: uint8(float64(totalLostSinceReport*256) / float64(totalSinceReport)),
+ TotalLost: stream.totalLost,
+ Delay: func() uint32 {
+ if stream.lastSenderReportTime.IsZero() {
+ return 0
+ }
+ return uint32(now.Sub(stream.lastSenderReportTime).Seconds() * 65536)
+ }(),
+ Jitter: uint32(stream.jitter),
+ },
+ },
+ }
+
+ stream.lastReportSeqnum = stream.lastSeqnum
+
+ return r
+}