summaryrefslogtreecommitdiff
path: root/vendor/github.com/pion/webrtc/v3/datachannel.go
diff options
context:
space:
mode:
Diffstat (limited to 'vendor/github.com/pion/webrtc/v3/datachannel.go')
-rw-r--r--vendor/github.com/pion/webrtc/v3/datachannel.go597
1 files changed, 597 insertions, 0 deletions
diff --git a/vendor/github.com/pion/webrtc/v3/datachannel.go b/vendor/github.com/pion/webrtc/v3/datachannel.go
new file mode 100644
index 0000000..f391e7f
--- /dev/null
+++ b/vendor/github.com/pion/webrtc/v3/datachannel.go
@@ -0,0 +1,597 @@
+// +build !js
+
+package webrtc
+
+import (
+ "errors"
+ "fmt"
+ "io"
+ "math"
+ "sync"
+ "sync/atomic"
+ "time"
+
+ "github.com/pion/datachannel"
+ "github.com/pion/logging"
+ "github.com/pion/webrtc/v3/pkg/rtcerr"
+)
+
+const dataChannelBufferSize = math.MaxUint16 // message size limit for Chromium
+var errSCTPNotEstablished = errors.New("SCTP not established")
+
+// DataChannel represents a WebRTC DataChannel
+// The DataChannel interface represents a network channel
+// which can be used for bidirectional peer-to-peer transfers of arbitrary data
+type DataChannel struct {
+ mu sync.RWMutex
+
+ statsID string
+ label string
+ ordered bool
+ maxPacketLifeTime *uint16
+ maxRetransmits *uint16
+ protocol string
+ negotiated bool
+ id *uint16
+ readyState atomic.Value // DataChannelState
+ bufferedAmountLowThreshold uint64
+ detachCalled bool
+
+ // The binaryType represents attribute MUST, on getting, return the value to
+ // which it was last set. On setting, if the new value is either the string
+ // "blob" or the string "arraybuffer", then set the IDL attribute to this
+ // new value. Otherwise, throw a SyntaxError. When an DataChannel object
+ // is created, the binaryType attribute MUST be initialized to the string
+ // "blob". This attribute controls how binary data is exposed to scripts.
+ // binaryType string
+
+ onMessageHandler func(DataChannelMessage)
+ openHandlerOnce sync.Once
+ onOpenHandler func()
+ onCloseHandler func()
+ onBufferedAmountLow func()
+ onErrorHandler func(error)
+
+ sctpTransport *SCTPTransport
+ dataChannel *datachannel.DataChannel
+
+ // A reference to the associated api object used by this datachannel
+ api *API
+ log logging.LeveledLogger
+}
+
+// NewDataChannel creates a new DataChannel.
+// This constructor is part of the ORTC API. It is not
+// meant to be used together with the basic WebRTC API.
+func (api *API) NewDataChannel(transport *SCTPTransport, params *DataChannelParameters) (*DataChannel, error) {
+ d, err := api.newDataChannel(params, api.settingEngine.LoggerFactory.NewLogger("ortc"))
+ if err != nil {
+ return nil, err
+ }
+
+ err = d.open(transport)
+ if err != nil {
+ return nil, err
+ }
+
+ return d, nil
+}
+
+// newDataChannel is an internal constructor for the data channel used to
+// create the DataChannel object before the networking is set up.
+func (api *API) newDataChannel(params *DataChannelParameters, log logging.LeveledLogger) (*DataChannel, error) {
+ // https://w3c.github.io/webrtc-pc/#peer-to-peer-data-api (Step #5)
+ if len(params.Label) > 65535 {
+ return nil, &rtcerr.TypeError{Err: ErrStringSizeLimit}
+ }
+
+ d := &DataChannel{
+ statsID: fmt.Sprintf("DataChannel-%d", time.Now().UnixNano()),
+ label: params.Label,
+ protocol: params.Protocol,
+ negotiated: params.Negotiated,
+ id: params.ID,
+ ordered: params.Ordered,
+ maxPacketLifeTime: params.MaxPacketLifeTime,
+ maxRetransmits: params.MaxRetransmits,
+ api: api,
+ log: log,
+ }
+
+ d.setReadyState(DataChannelStateConnecting)
+ return d, nil
+}
+
+// open opens the datachannel over the sctp transport
+func (d *DataChannel) open(sctpTransport *SCTPTransport) error {
+ d.mu.Lock()
+ if d.sctpTransport != nil {
+ // already open
+ d.mu.Unlock()
+ return nil
+ }
+ d.sctpTransport = sctpTransport
+
+ if err := d.ensureSCTP(); err != nil {
+ d.mu.Unlock()
+ return err
+ }
+
+ var channelType datachannel.ChannelType
+ var reliabilityParameter uint32
+
+ switch {
+ case d.maxPacketLifeTime == nil && d.maxRetransmits == nil:
+ if d.ordered {
+ channelType = datachannel.ChannelTypeReliable
+ } else {
+ channelType = datachannel.ChannelTypeReliableUnordered
+ }
+
+ case d.maxRetransmits != nil:
+ reliabilityParameter = uint32(*d.maxRetransmits)
+ if d.ordered {
+ channelType = datachannel.ChannelTypePartialReliableRexmit
+ } else {
+ channelType = datachannel.ChannelTypePartialReliableRexmitUnordered
+ }
+ default:
+ reliabilityParameter = uint32(*d.maxPacketLifeTime)
+ if d.ordered {
+ channelType = datachannel.ChannelTypePartialReliableTimed
+ } else {
+ channelType = datachannel.ChannelTypePartialReliableTimedUnordered
+ }
+ }
+
+ cfg := &datachannel.Config{
+ ChannelType: channelType,
+ Priority: datachannel.ChannelPriorityNormal,
+ ReliabilityParameter: reliabilityParameter,
+ Label: d.label,
+ Protocol: d.protocol,
+ Negotiated: d.negotiated,
+ LoggerFactory: d.api.settingEngine.LoggerFactory,
+ }
+
+ if d.id == nil {
+ err := d.sctpTransport.generateAndSetDataChannelID(d.sctpTransport.dtlsTransport.role(), &d.id)
+ if err != nil {
+ return err
+ }
+ }
+
+ dc, err := datachannel.Dial(d.sctpTransport.association, *d.id, cfg)
+ if err != nil {
+ d.mu.Unlock()
+ return err
+ }
+
+ // bufferedAmountLowThreshold and onBufferedAmountLow might be set earlier
+ dc.SetBufferedAmountLowThreshold(d.bufferedAmountLowThreshold)
+ dc.OnBufferedAmountLow(d.onBufferedAmountLow)
+ d.mu.Unlock()
+
+ d.handleOpen(dc)
+ return nil
+}
+
+func (d *DataChannel) ensureSCTP() error {
+ if d.sctpTransport == nil {
+ return errSCTPNotEstablished
+ }
+
+ d.sctpTransport.lock.RLock()
+ defer d.sctpTransport.lock.RUnlock()
+ if d.sctpTransport.association == nil {
+ return errSCTPNotEstablished
+ }
+ return nil
+}
+
+// Transport returns the SCTPTransport instance the DataChannel is sending over.
+func (d *DataChannel) Transport() *SCTPTransport {
+ d.mu.RLock()
+ defer d.mu.RUnlock()
+
+ return d.sctpTransport
+}
+
+// After onOpen is complete check that the user called detach
+// and provide an error message if the call was missed
+func (d *DataChannel) checkDetachAfterOpen() {
+ d.mu.RLock()
+ defer d.mu.RUnlock()
+
+ if d.api.settingEngine.detach.DataChannels && !d.detachCalled {
+ d.log.Warn("webrtc.DetachDataChannels() enabled but didn't Detach, call Detach from OnOpen")
+ }
+}
+
+// OnOpen sets an event handler which is invoked when
+// the underlying data transport has been established (or re-established).
+func (d *DataChannel) OnOpen(f func()) {
+ d.mu.Lock()
+ d.openHandlerOnce = sync.Once{}
+ d.onOpenHandler = f
+ d.mu.Unlock()
+
+ if d.ReadyState() == DataChannelStateOpen {
+ // If the data channel is already open, call the handler immediately.
+ go d.openHandlerOnce.Do(func() {
+ f()
+ d.checkDetachAfterOpen()
+ })
+ }
+}
+
+func (d *DataChannel) onOpen() {
+ d.mu.RLock()
+ handler := d.onOpenHandler
+ d.mu.RUnlock()
+
+ if handler != nil {
+ go d.openHandlerOnce.Do(func() {
+ handler()
+ d.checkDetachAfterOpen()
+ })
+ }
+}
+
+// OnClose sets an event handler which is invoked when
+// the underlying data transport has been closed.
+func (d *DataChannel) OnClose(f func()) {
+ d.mu.Lock()
+ defer d.mu.Unlock()
+ d.onCloseHandler = f
+}
+
+func (d *DataChannel) onClose() {
+ d.mu.RLock()
+ handler := d.onCloseHandler
+ d.mu.RUnlock()
+
+ if handler != nil {
+ go handler()
+ }
+}
+
+// OnMessage sets an event handler which is invoked on a binary
+// message arrival over the sctp transport from a remote peer.
+// OnMessage can currently receive messages up to 16384 bytes
+// in size. Check out the detach API if you want to use larger
+// message sizes. Note that browser support for larger messages
+// is also limited.
+func (d *DataChannel) OnMessage(f func(msg DataChannelMessage)) {
+ d.mu.Lock()
+ defer d.mu.Unlock()
+ d.onMessageHandler = f
+}
+
+func (d *DataChannel) onMessage(msg DataChannelMessage) {
+ d.mu.RLock()
+ handler := d.onMessageHandler
+ d.mu.RUnlock()
+
+ if handler == nil {
+ return
+ }
+ handler(msg)
+}
+
+func (d *DataChannel) handleOpen(dc *datachannel.DataChannel) {
+ d.mu.Lock()
+ d.dataChannel = dc
+ d.mu.Unlock()
+ d.setReadyState(DataChannelStateOpen)
+
+ d.onOpen()
+
+ d.mu.Lock()
+ defer d.mu.Unlock()
+
+ if !d.api.settingEngine.detach.DataChannels {
+ go d.readLoop()
+ }
+}
+
+// OnError sets an event handler which is invoked when
+// the underlying data transport cannot be read.
+func (d *DataChannel) OnError(f func(err error)) {
+ d.mu.Lock()
+ defer d.mu.Unlock()
+ d.onErrorHandler = f
+}
+
+func (d *DataChannel) onError(err error) {
+ d.mu.RLock()
+ handler := d.onErrorHandler
+ d.mu.RUnlock()
+
+ if handler != nil {
+ go handler(err)
+ }
+}
+
+// See https://github.com/pion/webrtc/issues/1516
+// nolint:gochecknoglobals
+var rlBufPool = sync.Pool{New: func() interface{} {
+ return make([]byte, dataChannelBufferSize)
+}}
+
+func (d *DataChannel) readLoop() {
+ for {
+ buffer := rlBufPool.Get().([]byte)
+ n, isString, err := d.dataChannel.ReadDataChannel(buffer)
+ if err != nil {
+ rlBufPool.Put(buffer) // nolint:staticcheck
+ d.setReadyState(DataChannelStateClosed)
+ if err != io.EOF {
+ d.onError(err)
+ }
+ d.onClose()
+ return
+ }
+
+ m := DataChannelMessage{Data: make([]byte, n), IsString: isString}
+ copy(m.Data, buffer[:n])
+ // The 'staticcheck' pragma is a false positive on the part of the CI linter.
+ rlBufPool.Put(buffer) // nolint:staticcheck
+
+ // NB: Why was DataChannelMessage not passed as a pointer value?
+ d.onMessage(m) // nolint:staticcheck
+ }
+}
+
+// Send sends the binary message to the DataChannel peer
+func (d *DataChannel) Send(data []byte) error {
+ err := d.ensureOpen()
+ if err != nil {
+ return err
+ }
+
+ _, err = d.dataChannel.WriteDataChannel(data, false)
+ return err
+}
+
+// SendText sends the text message to the DataChannel peer
+func (d *DataChannel) SendText(s string) error {
+ err := d.ensureOpen()
+ if err != nil {
+ return err
+ }
+
+ _, err = d.dataChannel.WriteDataChannel([]byte(s), true)
+ return err
+}
+
+func (d *DataChannel) ensureOpen() error {
+ d.mu.RLock()
+ defer d.mu.RUnlock()
+ if d.ReadyState() != DataChannelStateOpen {
+ return io.ErrClosedPipe
+ }
+ return nil
+}
+
+// Detach allows you to detach the underlying datachannel. This provides
+// an idiomatic API to work with, however it disables the OnMessage callback.
+// Before calling Detach you have to enable this behavior by calling
+// webrtc.DetachDataChannels(). Combining detached and normal data channels
+// is not supported.
+// Please refer to the data-channels-detach example and the
+// pion/datachannel documentation for the correct way to handle the
+// resulting DataChannel object.
+func (d *DataChannel) Detach() (datachannel.ReadWriteCloser, error) {
+ d.mu.Lock()
+ defer d.mu.Unlock()
+
+ if !d.api.settingEngine.detach.DataChannels {
+ return nil, errDetachNotEnabled
+ }
+
+ if d.dataChannel == nil {
+ return nil, errDetachBeforeOpened
+ }
+
+ d.detachCalled = true
+
+ return d.dataChannel, nil
+}
+
+// Close Closes the DataChannel. It may be called regardless of whether
+// the DataChannel object was created by this peer or the remote peer.
+func (d *DataChannel) Close() error {
+ d.mu.Lock()
+ haveSctpTransport := d.dataChannel != nil
+ d.mu.Unlock()
+
+ if d.ReadyState() == DataChannelStateClosed {
+ return nil
+ }
+
+ d.setReadyState(DataChannelStateClosing)
+ if !haveSctpTransport {
+ return nil
+ }
+
+ return d.dataChannel.Close()
+}
+
+// Label represents a label that can be used to distinguish this
+// DataChannel object from other DataChannel objects. Scripts are
+// allowed to create multiple DataChannel objects with the same label.
+func (d *DataChannel) Label() string {
+ d.mu.RLock()
+ defer d.mu.RUnlock()
+
+ return d.label
+}
+
+// Ordered represents if the DataChannel is ordered, and false if
+// out-of-order delivery is allowed.
+func (d *DataChannel) Ordered() bool {
+ d.mu.RLock()
+ defer d.mu.RUnlock()
+
+ return d.ordered
+}
+
+// MaxPacketLifeTime represents the length of the time window (msec) during
+// which transmissions and retransmissions may occur in unreliable mode.
+func (d *DataChannel) MaxPacketLifeTime() *uint16 {
+ d.mu.RLock()
+ defer d.mu.RUnlock()
+
+ return d.maxPacketLifeTime
+}
+
+// MaxRetransmits represents the maximum number of retransmissions that are
+// attempted in unreliable mode.
+func (d *DataChannel) MaxRetransmits() *uint16 {
+ d.mu.RLock()
+ defer d.mu.RUnlock()
+
+ return d.maxRetransmits
+}
+
+// Protocol represents the name of the sub-protocol used with this
+// DataChannel.
+func (d *DataChannel) Protocol() string {
+ d.mu.RLock()
+ defer d.mu.RUnlock()
+
+ return d.protocol
+}
+
+// Negotiated represents whether this DataChannel was negotiated by the
+// application (true), or not (false).
+func (d *DataChannel) Negotiated() bool {
+ d.mu.RLock()
+ defer d.mu.RUnlock()
+
+ return d.negotiated
+}
+
+// ID represents the ID for this DataChannel. The value is initially
+// null, which is what will be returned if the ID was not provided at
+// channel creation time, and the DTLS role of the SCTP transport has not
+// yet been negotiated. Otherwise, it will return the ID that was either
+// selected by the script or generated. After the ID is set to a non-null
+// value, it will not change.
+func (d *DataChannel) ID() *uint16 {
+ d.mu.RLock()
+ defer d.mu.RUnlock()
+
+ return d.id
+}
+
+// ReadyState represents the state of the DataChannel object.
+func (d *DataChannel) ReadyState() DataChannelState {
+ if v := d.readyState.Load(); v != nil {
+ return v.(DataChannelState)
+ }
+ return DataChannelState(0)
+}
+
+// BufferedAmount represents the number of bytes of application data
+// (UTF-8 text and binary data) that have been queued using send(). Even
+// though the data transmission can occur in parallel, the returned value
+// MUST NOT be decreased before the current task yielded back to the event
+// loop to prevent race conditions. The value does not include framing
+// overhead incurred by the protocol, or buffering done by the operating
+// system or network hardware. The value of BufferedAmount slot will only
+// increase with each call to the send() method as long as the ReadyState is
+// open; however, BufferedAmount does not reset to zero once the channel
+// closes.
+func (d *DataChannel) BufferedAmount() uint64 {
+ d.mu.RLock()
+ defer d.mu.RUnlock()
+
+ if d.dataChannel == nil {
+ return 0
+ }
+ return d.dataChannel.BufferedAmount()
+}
+
+// BufferedAmountLowThreshold represents the threshold at which the
+// bufferedAmount is considered to be low. When the bufferedAmount decreases
+// from above this threshold to equal or below it, the bufferedamountlow
+// event fires. BufferedAmountLowThreshold is initially zero on each new
+// DataChannel, but the application may change its value at any time.
+// The threshold is set to 0 by default.
+func (d *DataChannel) BufferedAmountLowThreshold() uint64 {
+ d.mu.RLock()
+ defer d.mu.RUnlock()
+
+ if d.dataChannel == nil {
+ return d.bufferedAmountLowThreshold
+ }
+ return d.dataChannel.BufferedAmountLowThreshold()
+}
+
+// SetBufferedAmountLowThreshold is used to update the threshold.
+// See BufferedAmountLowThreshold().
+func (d *DataChannel) SetBufferedAmountLowThreshold(th uint64) {
+ d.mu.Lock()
+ defer d.mu.Unlock()
+
+ d.bufferedAmountLowThreshold = th
+
+ if d.dataChannel != nil {
+ d.dataChannel.SetBufferedAmountLowThreshold(th)
+ }
+}
+
+// OnBufferedAmountLow sets an event handler which is invoked when
+// the number of bytes of outgoing data becomes lower than the
+// BufferedAmountLowThreshold.
+func (d *DataChannel) OnBufferedAmountLow(f func()) {
+ d.mu.Lock()
+ defer d.mu.Unlock()
+
+ d.onBufferedAmountLow = f
+ if d.dataChannel != nil {
+ d.dataChannel.OnBufferedAmountLow(f)
+ }
+}
+
+func (d *DataChannel) getStatsID() string {
+ d.mu.Lock()
+ defer d.mu.Unlock()
+ return d.statsID
+}
+
+func (d *DataChannel) collectStats(collector *statsReportCollector) {
+ collector.Collecting()
+
+ d.mu.Lock()
+ defer d.mu.Unlock()
+
+ stats := DataChannelStats{
+ Timestamp: statsTimestampNow(),
+ Type: StatsTypeDataChannel,
+ ID: d.statsID,
+ Label: d.label,
+ Protocol: d.protocol,
+ // TransportID string `json:"transportId"`
+ State: d.ReadyState(),
+ }
+
+ if d.id != nil {
+ stats.DataChannelIdentifier = int32(*d.id)
+ }
+
+ if d.dataChannel != nil {
+ stats.MessagesSent = d.dataChannel.MessagesSent()
+ stats.BytesSent = d.dataChannel.BytesSent()
+ stats.MessagesReceived = d.dataChannel.MessagesReceived()
+ stats.BytesReceived = d.dataChannel.BytesReceived()
+ }
+
+ collector.Collect(stats.ID, stats)
+}
+
+func (d *DataChannel) setReadyState(r DataChannelState) {
+ d.readyState.Store(r)
+}