summaryrefslogtreecommitdiff
path: root/vendor/github.com/pion/webrtc/v3/rtpreceiver.go
diff options
context:
space:
mode:
Diffstat (limited to 'vendor/github.com/pion/webrtc/v3/rtpreceiver.go')
-rw-r--r--vendor/github.com/pion/webrtc/v3/rtpreceiver.go361
1 files changed, 361 insertions, 0 deletions
diff --git a/vendor/github.com/pion/webrtc/v3/rtpreceiver.go b/vendor/github.com/pion/webrtc/v3/rtpreceiver.go
new file mode 100644
index 0000000..8d558e0
--- /dev/null
+++ b/vendor/github.com/pion/webrtc/v3/rtpreceiver.go
@@ -0,0 +1,361 @@
+// +build !js
+
+package webrtc
+
+import (
+ "fmt"
+ "io"
+ "sync"
+ "time"
+
+ "github.com/pion/interceptor"
+ "github.com/pion/rtcp"
+ "github.com/pion/srtp/v2"
+ "github.com/pion/webrtc/v3/internal/util"
+)
+
+// trackStreams maintains a mapping of RTP/RTCP streams to a specific track
+// a RTPReceiver may contain multiple streams if we are dealing with Multicast
+type trackStreams struct {
+ track *TrackRemote
+
+ rtpReadStream *srtp.ReadStreamSRTP
+ rtpInterceptor interceptor.RTPReader
+
+ rtcpReadStream *srtp.ReadStreamSRTCP
+ rtcpInterceptor interceptor.RTCPReader
+}
+
+// RTPReceiver allows an application to inspect the receipt of a TrackRemote
+type RTPReceiver struct {
+ kind RTPCodecType
+ transport *DTLSTransport
+
+ tracks []trackStreams
+
+ closed, received chan interface{}
+ mu sync.RWMutex
+
+ // A reference to the associated api object
+ api *API
+}
+
+// NewRTPReceiver constructs a new RTPReceiver
+func (api *API) NewRTPReceiver(kind RTPCodecType, transport *DTLSTransport) (*RTPReceiver, error) {
+ if transport == nil {
+ return nil, errRTPReceiverDTLSTransportNil
+ }
+
+ r := &RTPReceiver{
+ kind: kind,
+ transport: transport,
+ api: api,
+ closed: make(chan interface{}),
+ received: make(chan interface{}),
+ tracks: []trackStreams{},
+ }
+
+ return r, nil
+}
+
+// Transport returns the currently-configured *DTLSTransport or nil
+// if one has not yet been configured
+func (r *RTPReceiver) Transport() *DTLSTransport {
+ r.mu.RLock()
+ defer r.mu.RUnlock()
+ return r.transport
+}
+
+// GetParameters describes the current configuration for the encoding and
+// transmission of media on the receiver's track.
+func (r *RTPReceiver) GetParameters() RTPParameters {
+ return r.api.mediaEngine.getRTPParametersByKind(r.kind, []RTPTransceiverDirection{RTPTransceiverDirectionRecvonly})
+}
+
+// Track returns the RtpTransceiver TrackRemote
+func (r *RTPReceiver) Track() *TrackRemote {
+ r.mu.RLock()
+ defer r.mu.RUnlock()
+
+ if len(r.tracks) != 1 {
+ return nil
+ }
+ return r.tracks[0].track
+}
+
+// Tracks returns the RtpTransceiver tracks
+// A RTPReceiver to support Simulcast may now have multiple tracks
+func (r *RTPReceiver) Tracks() []*TrackRemote {
+ r.mu.RLock()
+ defer r.mu.RUnlock()
+
+ var tracks []*TrackRemote
+ for i := range r.tracks {
+ tracks = append(tracks, r.tracks[i].track)
+ }
+ return tracks
+}
+
+// Receive initialize the track and starts all the transports
+func (r *RTPReceiver) Receive(parameters RTPReceiveParameters) error {
+ r.mu.Lock()
+ defer r.mu.Unlock()
+ select {
+ case <-r.received:
+ return errRTPReceiverReceiveAlreadyCalled
+ default:
+ }
+ defer close(r.received)
+
+ if len(parameters.Encodings) == 1 && parameters.Encodings[0].SSRC != 0 {
+ t := trackStreams{
+ track: newTrackRemote(
+ r.kind,
+ parameters.Encodings[0].SSRC,
+ "",
+ r,
+ ),
+ }
+
+ globalParams := r.GetParameters()
+ codec := RTPCodecCapability{}
+ if len(globalParams.Codecs) != 0 {
+ codec = globalParams.Codecs[0].RTPCodecCapability
+ }
+
+ streamInfo := createStreamInfo("", parameters.Encodings[0].SSRC, 0, codec, globalParams.HeaderExtensions)
+ var err error
+ if t.rtpReadStream, t.rtpInterceptor, t.rtcpReadStream, t.rtcpInterceptor, err = r.streamsForSSRC(parameters.Encodings[0].SSRC, streamInfo); err != nil {
+ return err
+ }
+
+ r.tracks = append(r.tracks, t)
+ } else {
+ for _, encoding := range parameters.Encodings {
+ r.tracks = append(r.tracks, trackStreams{
+ track: newTrackRemote(
+ r.kind,
+ 0,
+ encoding.RID,
+ r,
+ ),
+ })
+ }
+ }
+
+ return nil
+}
+
+// Read reads incoming RTCP for this RTPReceiver
+func (r *RTPReceiver) Read(b []byte) (n int, a interceptor.Attributes, err error) {
+ select {
+ case <-r.received:
+ return r.tracks[0].rtcpInterceptor.Read(b, a)
+ case <-r.closed:
+ return 0, nil, io.ErrClosedPipe
+ }
+}
+
+// ReadSimulcast reads incoming RTCP for this RTPReceiver for given rid
+func (r *RTPReceiver) ReadSimulcast(b []byte, rid string) (n int, a interceptor.Attributes, err error) {
+ select {
+ case <-r.received:
+ for _, t := range r.tracks {
+ if t.track != nil && t.track.rid == rid {
+ return t.rtcpInterceptor.Read(b, a)
+ }
+ }
+ return 0, nil, fmt.Errorf("%w: %s", errRTPReceiverForRIDTrackStreamNotFound, rid)
+ case <-r.closed:
+ return 0, nil, io.ErrClosedPipe
+ }
+}
+
+// ReadRTCP is a convenience method that wraps Read and unmarshal for you.
+// It also runs any configured interceptors.
+func (r *RTPReceiver) ReadRTCP() ([]rtcp.Packet, interceptor.Attributes, error) {
+ b := make([]byte, receiveMTU)
+ i, attributes, err := r.Read(b)
+ if err != nil {
+ return nil, nil, err
+ }
+
+ pkts, err := rtcp.Unmarshal(b[:i])
+ if err != nil {
+ return nil, nil, err
+ }
+
+ return pkts, attributes, nil
+}
+
+// ReadSimulcastRTCP is a convenience method that wraps ReadSimulcast and unmarshal for you
+func (r *RTPReceiver) ReadSimulcastRTCP(rid string) ([]rtcp.Packet, interceptor.Attributes, error) {
+ b := make([]byte, receiveMTU)
+ i, attributes, err := r.ReadSimulcast(b, rid)
+ if err != nil {
+ return nil, nil, err
+ }
+
+ pkts, err := rtcp.Unmarshal(b[:i])
+ return pkts, attributes, err
+}
+
+func (r *RTPReceiver) haveReceived() bool {
+ select {
+ case <-r.received:
+ return true
+ default:
+ return false
+ }
+}
+
+// Stop irreversibly stops the RTPReceiver
+func (r *RTPReceiver) Stop() error {
+ r.mu.Lock()
+ defer r.mu.Unlock()
+ var err error
+
+ select {
+ case <-r.closed:
+ return err
+ default:
+ }
+
+ select {
+ case <-r.received:
+ for i := range r.tracks {
+ errs := []error{}
+
+ if r.tracks[i].rtcpReadStream != nil {
+ errs = append(errs, r.tracks[i].rtcpReadStream.Close())
+ }
+
+ if r.tracks[i].rtpReadStream != nil {
+ errs = append(errs, r.tracks[i].rtpReadStream.Close())
+ }
+
+ err = util.FlattenErrs(errs)
+ }
+ default:
+ }
+
+ close(r.closed)
+ return err
+}
+
+func (r *RTPReceiver) streamsForTrack(t *TrackRemote) *trackStreams {
+ for i := range r.tracks {
+ if r.tracks[i].track == t {
+ return &r.tracks[i]
+ }
+ }
+ return nil
+}
+
+// readRTP should only be called by a track, this only exists so we can keep state in one place
+func (r *RTPReceiver) readRTP(b []byte, reader *TrackRemote) (n int, a interceptor.Attributes, err error) {
+ <-r.received
+ if t := r.streamsForTrack(reader); t != nil {
+ return t.rtpInterceptor.Read(b, a)
+ }
+
+ return 0, nil, fmt.Errorf("%w: %d", errRTPReceiverWithSSRCTrackStreamNotFound, reader.SSRC())
+}
+
+// receiveForRid is the sibling of Receive expect for RIDs instead of SSRCs
+// It populates all the internal state for the given RID
+func (r *RTPReceiver) receiveForRid(rid string, params RTPParameters, ssrc SSRC) (*TrackRemote, error) {
+ r.mu.Lock()
+ defer r.mu.Unlock()
+
+ for i := range r.tracks {
+ if r.tracks[i].track.RID() == rid {
+ r.tracks[i].track.mu.Lock()
+ r.tracks[i].track.kind = r.kind
+ r.tracks[i].track.codec = params.Codecs[0]
+ r.tracks[i].track.params = params
+ r.tracks[i].track.ssrc = ssrc
+ streamInfo := createStreamInfo("", ssrc, params.Codecs[0].PayloadType, params.Codecs[0].RTPCodecCapability, params.HeaderExtensions)
+ r.tracks[i].track.mu.Unlock()
+
+ var err error
+ if r.tracks[i].rtpReadStream, r.tracks[i].rtpInterceptor, r.tracks[i].rtcpReadStream, r.tracks[i].rtcpInterceptor, err = r.streamsForSSRC(ssrc, streamInfo); err != nil {
+ return nil, err
+ }
+
+ return r.tracks[i].track, nil
+ }
+ }
+
+ return nil, fmt.Errorf("%w: %d", errRTPReceiverForSSRCTrackStreamNotFound, ssrc)
+}
+
+func (r *RTPReceiver) streamsForSSRC(ssrc SSRC, streamInfo interceptor.StreamInfo) (*srtp.ReadStreamSRTP, interceptor.RTPReader, *srtp.ReadStreamSRTCP, interceptor.RTCPReader, error) {
+ srtpSession, err := r.transport.getSRTPSession()
+ if err != nil {
+ return nil, nil, nil, nil, err
+ }
+
+ rtpReadStream, err := srtpSession.OpenReadStream(uint32(ssrc))
+ if err != nil {
+ return nil, nil, nil, nil, err
+ }
+
+ rtpInterceptor := r.api.interceptor.BindRemoteStream(&streamInfo, interceptor.RTPReaderFunc(func(in []byte, a interceptor.Attributes) (n int, attributes interceptor.Attributes, err error) {
+ n, err = rtpReadStream.Read(in)
+ return n, a, err
+ }))
+
+ srtcpSession, err := r.transport.getSRTCPSession()
+ if err != nil {
+ return nil, nil, nil, nil, err
+ }
+
+ rtcpReadStream, err := srtcpSession.OpenReadStream(uint32(ssrc))
+ if err != nil {
+ return nil, nil, nil, nil, err
+ }
+
+ rtcpInterceptor := r.api.interceptor.BindRTCPReader(interceptor.RTPReaderFunc(func(in []byte, a interceptor.Attributes) (n int, attributes interceptor.Attributes, err error) {
+ n, err = rtcpReadStream.Read(in)
+ return n, a, err
+ }))
+
+ return rtpReadStream, rtpInterceptor, rtcpReadStream, rtcpInterceptor, nil
+}
+
+// SetReadDeadline sets the max amount of time the RTCP stream will block before returning. 0 is forever.
+func (r *RTPReceiver) SetReadDeadline(t time.Time) error {
+ r.mu.RLock()
+ defer r.mu.RUnlock()
+
+ if err := r.tracks[0].rtcpReadStream.SetReadDeadline(t); err != nil {
+ return err
+ }
+ return nil
+}
+
+// SetReadDeadlineSimulcast sets the max amount of time the RTCP stream for a given rid will block before returning. 0 is forever.
+func (r *RTPReceiver) SetReadDeadlineSimulcast(deadline time.Time, rid string) error {
+ r.mu.RLock()
+ defer r.mu.RUnlock()
+
+ for _, t := range r.tracks {
+ if t.track != nil && t.track.rid == rid {
+ return t.rtcpReadStream.SetReadDeadline(deadline)
+ }
+ }
+ return fmt.Errorf("%w: %s", errRTPReceiverForRIDTrackStreamNotFound, rid)
+}
+
+// setRTPReadDeadline sets the max amount of time the RTP stream will block before returning. 0 is forever.
+// This should be fired by calling SetReadDeadline on the TrackRemote
+func (r *RTPReceiver) setRTPReadDeadline(deadline time.Time, reader *TrackRemote) error {
+ r.mu.RLock()
+ defer r.mu.RUnlock()
+
+ if t := r.streamsForTrack(reader); t != nil {
+ return t.rtpReadStream.SetReadDeadline(deadline)
+ }
+ return fmt.Errorf("%w: %d", errRTPReceiverWithSSRCTrackStreamNotFound, reader.SSRC())
+}