diff options
Diffstat (limited to 'vendor/github.com/pion/webrtc/v3/rtpreceiver.go')
-rw-r--r-- | vendor/github.com/pion/webrtc/v3/rtpreceiver.go | 361 |
1 files changed, 361 insertions, 0 deletions
diff --git a/vendor/github.com/pion/webrtc/v3/rtpreceiver.go b/vendor/github.com/pion/webrtc/v3/rtpreceiver.go new file mode 100644 index 0000000..8d558e0 --- /dev/null +++ b/vendor/github.com/pion/webrtc/v3/rtpreceiver.go @@ -0,0 +1,361 @@ +// +build !js + +package webrtc + +import ( + "fmt" + "io" + "sync" + "time" + + "github.com/pion/interceptor" + "github.com/pion/rtcp" + "github.com/pion/srtp/v2" + "github.com/pion/webrtc/v3/internal/util" +) + +// trackStreams maintains a mapping of RTP/RTCP streams to a specific track +// a RTPReceiver may contain multiple streams if we are dealing with Multicast +type trackStreams struct { + track *TrackRemote + + rtpReadStream *srtp.ReadStreamSRTP + rtpInterceptor interceptor.RTPReader + + rtcpReadStream *srtp.ReadStreamSRTCP + rtcpInterceptor interceptor.RTCPReader +} + +// RTPReceiver allows an application to inspect the receipt of a TrackRemote +type RTPReceiver struct { + kind RTPCodecType + transport *DTLSTransport + + tracks []trackStreams + + closed, received chan interface{} + mu sync.RWMutex + + // A reference to the associated api object + api *API +} + +// NewRTPReceiver constructs a new RTPReceiver +func (api *API) NewRTPReceiver(kind RTPCodecType, transport *DTLSTransport) (*RTPReceiver, error) { + if transport == nil { + return nil, errRTPReceiverDTLSTransportNil + } + + r := &RTPReceiver{ + kind: kind, + transport: transport, + api: api, + closed: make(chan interface{}), + received: make(chan interface{}), + tracks: []trackStreams{}, + } + + return r, nil +} + +// Transport returns the currently-configured *DTLSTransport or nil +// if one has not yet been configured +func (r *RTPReceiver) Transport() *DTLSTransport { + r.mu.RLock() + defer r.mu.RUnlock() + return r.transport +} + +// GetParameters describes the current configuration for the encoding and +// transmission of media on the receiver's track. +func (r *RTPReceiver) GetParameters() RTPParameters { + return r.api.mediaEngine.getRTPParametersByKind(r.kind, []RTPTransceiverDirection{RTPTransceiverDirectionRecvonly}) +} + +// Track returns the RtpTransceiver TrackRemote +func (r *RTPReceiver) Track() *TrackRemote { + r.mu.RLock() + defer r.mu.RUnlock() + + if len(r.tracks) != 1 { + return nil + } + return r.tracks[0].track +} + +// Tracks returns the RtpTransceiver tracks +// A RTPReceiver to support Simulcast may now have multiple tracks +func (r *RTPReceiver) Tracks() []*TrackRemote { + r.mu.RLock() + defer r.mu.RUnlock() + + var tracks []*TrackRemote + for i := range r.tracks { + tracks = append(tracks, r.tracks[i].track) + } + return tracks +} + +// Receive initialize the track and starts all the transports +func (r *RTPReceiver) Receive(parameters RTPReceiveParameters) error { + r.mu.Lock() + defer r.mu.Unlock() + select { + case <-r.received: + return errRTPReceiverReceiveAlreadyCalled + default: + } + defer close(r.received) + + if len(parameters.Encodings) == 1 && parameters.Encodings[0].SSRC != 0 { + t := trackStreams{ + track: newTrackRemote( + r.kind, + parameters.Encodings[0].SSRC, + "", + r, + ), + } + + globalParams := r.GetParameters() + codec := RTPCodecCapability{} + if len(globalParams.Codecs) != 0 { + codec = globalParams.Codecs[0].RTPCodecCapability + } + + streamInfo := createStreamInfo("", parameters.Encodings[0].SSRC, 0, codec, globalParams.HeaderExtensions) + var err error + if t.rtpReadStream, t.rtpInterceptor, t.rtcpReadStream, t.rtcpInterceptor, err = r.streamsForSSRC(parameters.Encodings[0].SSRC, streamInfo); err != nil { + return err + } + + r.tracks = append(r.tracks, t) + } else { + for _, encoding := range parameters.Encodings { + r.tracks = append(r.tracks, trackStreams{ + track: newTrackRemote( + r.kind, + 0, + encoding.RID, + r, + ), + }) + } + } + + return nil +} + +// Read reads incoming RTCP for this RTPReceiver +func (r *RTPReceiver) Read(b []byte) (n int, a interceptor.Attributes, err error) { + select { + case <-r.received: + return r.tracks[0].rtcpInterceptor.Read(b, a) + case <-r.closed: + return 0, nil, io.ErrClosedPipe + } +} + +// ReadSimulcast reads incoming RTCP for this RTPReceiver for given rid +func (r *RTPReceiver) ReadSimulcast(b []byte, rid string) (n int, a interceptor.Attributes, err error) { + select { + case <-r.received: + for _, t := range r.tracks { + if t.track != nil && t.track.rid == rid { + return t.rtcpInterceptor.Read(b, a) + } + } + return 0, nil, fmt.Errorf("%w: %s", errRTPReceiverForRIDTrackStreamNotFound, rid) + case <-r.closed: + return 0, nil, io.ErrClosedPipe + } +} + +// ReadRTCP is a convenience method that wraps Read and unmarshal for you. +// It also runs any configured interceptors. +func (r *RTPReceiver) ReadRTCP() ([]rtcp.Packet, interceptor.Attributes, error) { + b := make([]byte, receiveMTU) + i, attributes, err := r.Read(b) + if err != nil { + return nil, nil, err + } + + pkts, err := rtcp.Unmarshal(b[:i]) + if err != nil { + return nil, nil, err + } + + return pkts, attributes, nil +} + +// ReadSimulcastRTCP is a convenience method that wraps ReadSimulcast and unmarshal for you +func (r *RTPReceiver) ReadSimulcastRTCP(rid string) ([]rtcp.Packet, interceptor.Attributes, error) { + b := make([]byte, receiveMTU) + i, attributes, err := r.ReadSimulcast(b, rid) + if err != nil { + return nil, nil, err + } + + pkts, err := rtcp.Unmarshal(b[:i]) + return pkts, attributes, err +} + +func (r *RTPReceiver) haveReceived() bool { + select { + case <-r.received: + return true + default: + return false + } +} + +// Stop irreversibly stops the RTPReceiver +func (r *RTPReceiver) Stop() error { + r.mu.Lock() + defer r.mu.Unlock() + var err error + + select { + case <-r.closed: + return err + default: + } + + select { + case <-r.received: + for i := range r.tracks { + errs := []error{} + + if r.tracks[i].rtcpReadStream != nil { + errs = append(errs, r.tracks[i].rtcpReadStream.Close()) + } + + if r.tracks[i].rtpReadStream != nil { + errs = append(errs, r.tracks[i].rtpReadStream.Close()) + } + + err = util.FlattenErrs(errs) + } + default: + } + + close(r.closed) + return err +} + +func (r *RTPReceiver) streamsForTrack(t *TrackRemote) *trackStreams { + for i := range r.tracks { + if r.tracks[i].track == t { + return &r.tracks[i] + } + } + return nil +} + +// readRTP should only be called by a track, this only exists so we can keep state in one place +func (r *RTPReceiver) readRTP(b []byte, reader *TrackRemote) (n int, a interceptor.Attributes, err error) { + <-r.received + if t := r.streamsForTrack(reader); t != nil { + return t.rtpInterceptor.Read(b, a) + } + + return 0, nil, fmt.Errorf("%w: %d", errRTPReceiverWithSSRCTrackStreamNotFound, reader.SSRC()) +} + +// receiveForRid is the sibling of Receive expect for RIDs instead of SSRCs +// It populates all the internal state for the given RID +func (r *RTPReceiver) receiveForRid(rid string, params RTPParameters, ssrc SSRC) (*TrackRemote, error) { + r.mu.Lock() + defer r.mu.Unlock() + + for i := range r.tracks { + if r.tracks[i].track.RID() == rid { + r.tracks[i].track.mu.Lock() + r.tracks[i].track.kind = r.kind + r.tracks[i].track.codec = params.Codecs[0] + r.tracks[i].track.params = params + r.tracks[i].track.ssrc = ssrc + streamInfo := createStreamInfo("", ssrc, params.Codecs[0].PayloadType, params.Codecs[0].RTPCodecCapability, params.HeaderExtensions) + r.tracks[i].track.mu.Unlock() + + var err error + if r.tracks[i].rtpReadStream, r.tracks[i].rtpInterceptor, r.tracks[i].rtcpReadStream, r.tracks[i].rtcpInterceptor, err = r.streamsForSSRC(ssrc, streamInfo); err != nil { + return nil, err + } + + return r.tracks[i].track, nil + } + } + + return nil, fmt.Errorf("%w: %d", errRTPReceiverForSSRCTrackStreamNotFound, ssrc) +} + +func (r *RTPReceiver) streamsForSSRC(ssrc SSRC, streamInfo interceptor.StreamInfo) (*srtp.ReadStreamSRTP, interceptor.RTPReader, *srtp.ReadStreamSRTCP, interceptor.RTCPReader, error) { + srtpSession, err := r.transport.getSRTPSession() + if err != nil { + return nil, nil, nil, nil, err + } + + rtpReadStream, err := srtpSession.OpenReadStream(uint32(ssrc)) + if err != nil { + return nil, nil, nil, nil, err + } + + rtpInterceptor := r.api.interceptor.BindRemoteStream(&streamInfo, interceptor.RTPReaderFunc(func(in []byte, a interceptor.Attributes) (n int, attributes interceptor.Attributes, err error) { + n, err = rtpReadStream.Read(in) + return n, a, err + })) + + srtcpSession, err := r.transport.getSRTCPSession() + if err != nil { + return nil, nil, nil, nil, err + } + + rtcpReadStream, err := srtcpSession.OpenReadStream(uint32(ssrc)) + if err != nil { + return nil, nil, nil, nil, err + } + + rtcpInterceptor := r.api.interceptor.BindRTCPReader(interceptor.RTPReaderFunc(func(in []byte, a interceptor.Attributes) (n int, attributes interceptor.Attributes, err error) { + n, err = rtcpReadStream.Read(in) + return n, a, err + })) + + return rtpReadStream, rtpInterceptor, rtcpReadStream, rtcpInterceptor, nil +} + +// SetReadDeadline sets the max amount of time the RTCP stream will block before returning. 0 is forever. +func (r *RTPReceiver) SetReadDeadline(t time.Time) error { + r.mu.RLock() + defer r.mu.RUnlock() + + if err := r.tracks[0].rtcpReadStream.SetReadDeadline(t); err != nil { + return err + } + return nil +} + +// SetReadDeadlineSimulcast sets the max amount of time the RTCP stream for a given rid will block before returning. 0 is forever. +func (r *RTPReceiver) SetReadDeadlineSimulcast(deadline time.Time, rid string) error { + r.mu.RLock() + defer r.mu.RUnlock() + + for _, t := range r.tracks { + if t.track != nil && t.track.rid == rid { + return t.rtcpReadStream.SetReadDeadline(deadline) + } + } + return fmt.Errorf("%w: %s", errRTPReceiverForRIDTrackStreamNotFound, rid) +} + +// setRTPReadDeadline sets the max amount of time the RTP stream will block before returning. 0 is forever. +// This should be fired by calling SetReadDeadline on the TrackRemote +func (r *RTPReceiver) setRTPReadDeadline(deadline time.Time, reader *TrackRemote) error { + r.mu.RLock() + defer r.mu.RUnlock() + + if t := r.streamsForTrack(reader); t != nil { + return t.rtpReadStream.SetReadDeadline(deadline) + } + return fmt.Errorf("%w: %d", errRTPReceiverWithSSRCTrackStreamNotFound, reader.SSRC()) +} |