diff options
Diffstat (limited to 'vendor/github.com/pion/sctp')
54 files changed, 6633 insertions, 0 deletions
diff --git a/vendor/github.com/pion/sctp/.gitignore b/vendor/github.com/pion/sctp/.gitignore new file mode 100644 index 0000000..d39fb86 --- /dev/null +++ b/vendor/github.com/pion/sctp/.gitignore @@ -0,0 +1 @@ +*.sw[poe] diff --git a/vendor/github.com/pion/sctp/.golangci.yml b/vendor/github.com/pion/sctp/.golangci.yml new file mode 100644 index 0000000..4213697 --- /dev/null +++ b/vendor/github.com/pion/sctp/.golangci.yml @@ -0,0 +1,82 @@ +linters-settings: + govet: + check-shadowing: true + misspell: + locale: US + exhaustive: + default-signifies-exhaustive: true + +linters: + enable: + - asciicheck # Simple linter to check that your code does not contain non-ASCII identifiers + - bodyclose # checks whether HTTP response body is closed successfully + - deadcode # Finds unused code + - depguard # Go linter that checks if package imports are in a list of acceptable packages + - dogsled # Checks assignments with too many blank identifiers (e.g. x, _, _, _, := f()) + - dupl # Tool for code clone detection + - errcheck # Errcheck is a program for checking for unchecked errors in go programs. These unchecked errors can be critical bugs in some cases + - exhaustive # check exhaustiveness of enum switch statements + - exportloopref # checks for pointers to enclosing loop variables + - gci # Gci control golang package import order and make it always deterministic. + - gochecknoglobals # Checks that no globals are present in Go code + - gochecknoinits # Checks that no init functions are present in Go code + - gocognit # Computes and checks the cognitive complexity of functions + - goconst # Finds repeated strings that could be replaced by a constant + - gocritic # The most opinionated Go source code linter + - godox # Tool for detection of FIXME, TODO and other comment keywords + - goerr113 # Golang linter to check the errors handling expressions + - gofmt # Gofmt checks whether code was gofmt-ed. By default this tool runs with -s option to check for code simplification + - gofumpt # Gofumpt checks whether code was gofumpt-ed. + - goheader # Checks is file header matches to pattern + - goimports # Goimports does everything that gofmt does. Additionally it checks unused imports + - golint # Golint differs from gofmt. Gofmt reformats Go source code, whereas golint prints out style mistakes + - gomodguard # Allow and block list linter for direct Go module dependencies. This is different from depguard where there are different block types for example version constraints and module recommendations. + - goprintffuncname # Checks that printf-like functions are named with `f` at the end + - gosec # Inspects source code for security problems + - gosimple # Linter for Go source code that specializes in simplifying a code + - govet # Vet examines Go source code and reports suspicious constructs, such as Printf calls whose arguments do not align with the format string + - ineffassign # Detects when assignments to existing variables are not used + - misspell # Finds commonly misspelled English words in comments + - nakedret # Finds naked returns in functions greater than a specified function length + - noctx # noctx finds sending http request without context.Context + - scopelint # Scopelint checks for unpinned variables in go programs + - staticcheck # Staticcheck is a go vet on steroids, applying a ton of static analysis checks + - structcheck # Finds unused struct fields + - stylecheck # Stylecheck is a replacement for golint + - typecheck # Like the front-end of a Go compiler, parses and type-checks Go code + - unconvert # Remove unnecessary type conversions + - unparam # Reports unused function parameters + - unused # Checks Go code for unused constants, variables, functions and types + - varcheck # Finds unused global variables and constants + - whitespace # Tool for detection of leading and trailing whitespace + disable: + - funlen # Tool for detection of long functions + - gocyclo # Computes and checks the cyclomatic complexity of functions + - godot # Check if comments end in a period + - gomnd # An analyzer to detect magic numbers. + - lll # Reports long lines + - maligned # Tool to detect Go structs that would take less memory if their fields were sorted + - nestif # Reports deeply nested if statements + - nlreturn # nlreturn checks for a new line before return and branch statements to increase code clarity + - nolintlint # Reports ill-formed or insufficient nolint directives + - prealloc # Finds slice declarations that could potentially be preallocated + - rowserrcheck # checks whether Err of rows is checked successfully + - sqlclosecheck # Checks that sql.Rows and sql.Stmt are closed. + - testpackage # linter that makes you use a separate _test package + - wsl # Whitespace Linter - Forces you to use empty lines! + +issues: + exclude-rules: + # Allow complex tests, better to be self contained + - path: _test\.go + linters: + - gocognit + + # Allow complex main function in examples + - path: examples + text: "of func `main` is high" + linters: + - gocognit + +run: + skip-dirs-use-default: false diff --git a/vendor/github.com/pion/sctp/DESIGN.md b/vendor/github.com/pion/sctp/DESIGN.md new file mode 100644 index 0000000..02ac161 --- /dev/null +++ b/vendor/github.com/pion/sctp/DESIGN.md @@ -0,0 +1,20 @@ +<h1 align="center"> + Design +</h1> + +### Portable +Pion SCTP is written in Go and extremely portable. Anywhere Golang runs, Pion SCTP should work as well! Instead of dealing with complicated +cross-compiling of multiple libraries, you now can run anywhere with one `go build` + +### Simple API +The API is based on an io.ReadWriteCloser. + +### Readable +If code comes from an RFC we try to make sure everything is commented with a link to the spec. +This makes learning and debugging easier, this library was written to also serve as a guide for others. + +### Tested +Every commit is tested via travis-ci Go provides fantastic facilities for testing, and more will be added as time goes on. + +### Shared libraries +Every pion product is built using shared libraries, allowing others to review and reuse our libraries. diff --git a/vendor/github.com/pion/sctp/LICENSE b/vendor/github.com/pion/sctp/LICENSE new file mode 100644 index 0000000..ab60297 --- /dev/null +++ b/vendor/github.com/pion/sctp/LICENSE @@ -0,0 +1,21 @@ +MIT License + +Copyright (c) 2018 + +Permission is hereby granted, free of charge, to any person obtaining a copy +of this software and associated documentation files (the "Software"), to deal +in the Software without restriction, including without limitation the rights +to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +copies of the Software, and to permit persons to whom the Software is +furnished to do so, subject to the following conditions: + +The above copyright notice and this permission notice shall be included in all +copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +SOFTWARE. diff --git a/vendor/github.com/pion/sctp/README.md b/vendor/github.com/pion/sctp/README.md new file mode 100644 index 0000000..18b5693 --- /dev/null +++ b/vendor/github.com/pion/sctp/README.md @@ -0,0 +1,54 @@ +<h1 align="center"> + <br> + Pion SCTP + <br> +</h1> +<h4 align="center">A Go implementation of SCTP</h4> +<p align="center"> + <a href="https://pion.ly"><img src="https://img.shields.io/badge/pion-sctp-gray.svg?longCache=true&colorB=brightgreen" alt="Pion SCTP"></a> + <!--<a href="https://sourcegraph.com/github.com/pion/webrtc?badge"><img src="https://sourcegraph.com/github.com/pion/webrtc/-/badge.svg" alt="Sourcegraph Widget"></a>--> + <a href="https://pion.ly/slack"><img src="https://img.shields.io/badge/join-us%20on%20slack-gray.svg?longCache=true&logo=slack&colorB=brightgreen" alt="Slack Widget"></a> + <br> + <a href="https://travis-ci.org/pion/sctp"><img src="https://travis-ci.org/pion/sctp.svg?branch=master" alt="Build Status"></a> + <a href="https://pkg.go.dev/github.com/pion/sctp"><img src="https://godoc.org/github.com/pion/sctp?status.svg" alt="GoDoc"></a> + <a href="https://codecov.io/gh/pion/sctp"><img src="https://codecov.io/gh/pion/sctp/branch/master/graph/badge.svg" alt="Coverage Status"></a> + <a href="https://goreportcard.com/report/github.com/pion/sctp"><img src="https://goreportcard.com/badge/github.com/pion/sctp" alt="Go Report Card"></a> + <!--<a href="https://www.codacy.com/app/Sean-Der/webrtc"><img src="https://api.codacy.com/project/badge/Grade/18f4aec384894e6aac0b94effe51961d" alt="Codacy Badge"></a>--> + <a href="LICENSE"><img src="https://img.shields.io/badge/License-MIT-yellow.svg" alt="License: MIT"></a> +</p> +<br> + +See [DESIGN.md](DESIGN.md) for an overview of features and future goals. + +### Roadmap +The library is used as a part of our WebRTC implementation. Please refer to that [roadmap](https://github.com/pion/webrtc/issues/9) to track our major milestones. + +### Community +Pion has an active community on the [Golang Slack](https://invite.slack.golangbridge.org/). Sign up and join the **#pion** channel for discussions and support. You can also use [Pion mailing list](https://groups.google.com/forum/#!forum/pion). + +We are always looking to support **your projects**. Please reach out if you have something to build! + +If you need commercial support or don't want to use public methods you can contact us at [team@pion.ly](mailto:team@pion.ly) + +### Contributing +Check out the **[contributing wiki](https://github.com/pion/webrtc/wiki/Contributing)** to join the group of amazing people making this project possible: + +* [John Bradley](https://github.com/kc5nra) - *Original Author* +* [Sean DuBois](https://github.com/Sean-Der) - *Original Author* +* [Michiel De Backker](https://github.com/backkem) - *Public API, Initialization* +* [Konstantin Itskov](https://github.com/trivigy) - *Fix documentation* +* [chenkaiC4](https://github.com/chenkaiC4) - *Fix GolangCI Linter* +* [Ronan J](https://github.com/ronanj) - *Fix PPID* +* [Michael MacDonald](https://github.com/mjmac) - *Fix races* +* [Yutaka Takeda](https://github.com/enobufs) - *PR-SCTP, Retransmissions, Congestion Control* +* [Antoine Baché](https://github.com/Antonito) - *SCTP Profiling* +* [Cecylia Bocovich](https://github.com/cohosh) - *Fix SCTP reads* +* [Hugo Arregui](https://github.com/hugoArregui) +* [Atsushi Watanabe](https://github.com/at-wat) +* [Lukas Herman](https://github.com/lherman-cs) +* [Luke Curley](https://github.com/kixelated) - *Performance* +* [Aaron France](https://github.com/AeroNotix) +* [ZHENK](https://github.com/scorpionknifes) + +### License +MIT License - see [LICENSE](LICENSE) for full text diff --git a/vendor/github.com/pion/sctp/ack_timer.go b/vendor/github.com/pion/sctp/ack_timer.go new file mode 100644 index 0000000..ba23d54 --- /dev/null +++ b/vendor/github.com/pion/sctp/ack_timer.go @@ -0,0 +1,105 @@ +package sctp + +import ( + "sync" + "time" +) + +const ( + ackInterval time.Duration = 200 * time.Millisecond +) + +// ackTimerObserver is the inteface to an ack timer observer. +type ackTimerObserver interface { + onAckTimeout() +} + +// ackTimer provides the retnransmission timer conforms with RFC 4960 Sec 6.3.1 +type ackTimer struct { + observer ackTimerObserver + interval time.Duration + stopFunc stopAckTimerLoop + closed bool + mutex sync.RWMutex +} + +type stopAckTimerLoop func() + +// newAckTimer creates a new acknowledgement timer used to enable delayed ack. +func newAckTimer(observer ackTimerObserver) *ackTimer { + return &ackTimer{ + observer: observer, + interval: ackInterval, + } +} + +// start starts the timer. +func (t *ackTimer) start() bool { + t.mutex.Lock() + defer t.mutex.Unlock() + + // this timer is already closed + if t.closed { + return false + } + + // this is a noop if the timer is already running + if t.stopFunc != nil { + return false + } + + cancelCh := make(chan struct{}) + + go func() { + timer := time.NewTimer(t.interval) + + select { + case <-timer.C: + t.stop() + t.observer.onAckTimeout() + case <-cancelCh: + timer.Stop() + } + }() + + t.stopFunc = func() { + close(cancelCh) + } + + return true +} + +// stops the timer. this is similar to stop() but subsequent start() call +// will fail (the timer is no longer usable) +func (t *ackTimer) stop() { + t.mutex.Lock() + defer t.mutex.Unlock() + + if t.stopFunc != nil { + t.stopFunc() + t.stopFunc = nil + } +} + +// closes the timer. this is similar to stop() but subsequent start() call +// will fail (the timer is no longer usable) +func (t *ackTimer) close() { + t.mutex.Lock() + defer t.mutex.Unlock() + + if t.stopFunc != nil { + t.stopFunc() + t.stopFunc = nil + } + + t.closed = true +} + +// isRunning tests if the timer is running. +// Debug purpose only +func (t *ackTimer) isRunning() bool { + t.mutex.RLock() + defer t.mutex.RUnlock() + + return (t.stopFunc != nil) +} diff --git a/vendor/github.com/pion/sctp/association.go b/vendor/github.com/pion/sctp/association.go new file mode 100644 index 0000000..1393cb8 --- /dev/null +++ b/vendor/github.com/pion/sctp/association.go @@ -0,0 +1,2241 @@ +package sctp + +import ( + "bytes" + "fmt" + "io" + "math" + "net" + "sync" + "sync/atomic" + "time" + + "github.com/pion/logging" + "github.com/pion/randutil" + "github.com/pkg/errors" +) + +// Use global random generator to properly seed by crypto grade random. +var ( + globalMathRandomGenerator = randutil.NewMathRandomGenerator() // nolint:gochecknoglobals + errChunk = errors.New("Abort chunk, with following errors") +) + +const ( + receiveMTU uint32 = 8192 // MTU for inbound packet (from DTLS) + initialMTU uint32 = 1228 // initial MTU for outgoing packets (to DTLS) + initialRecvBufSize uint32 = 1024 * 1024 + commonHeaderSize uint32 = 12 + dataChunkHeaderSize uint32 = 16 + defaultMaxMessageSize uint32 = 65536 +) + +// association state enums +const ( + closed uint32 = iota + cookieWait + cookieEchoed + established + shutdownAckSent + shutdownPending + shutdownReceived + shutdownSent +) + +// retransmission timer IDs +const ( + timerT1Init int = iota + timerT1Cookie + timerT3RTX + timerReconfig +) + +// ack mode (for testing) +const ( + ackModeNormal int = iota + ackModeNoDelay + ackModeAlwaysDelay +) + +// ack transmission state +const ( + ackStateIdle int = iota // ack timer is off + ackStateImmediate // ack timer is on (ack is being delayed) + ackStateDelay // will send ack immediately +) + +// other constants +const ( + acceptChSize = 16 +) + +func getAssociationStateString(a uint32) string { + switch a { + case closed: + return "Closed" + case cookieWait: + return "CookieWait" + case cookieEchoed: + return "CookieEchoed" + case established: + return "Established" + case shutdownPending: + return "ShutdownPending" + case shutdownSent: + return "ShutdownSent" + case shutdownReceived: + return "ShutdownReceived" + case shutdownAckSent: + return "ShutdownAckSent" + default: + return fmt.Sprintf("Invalid association state %d", a) + } +} + +// Association represents an SCTP association +// 13.2. Parameters Necessary per Association (i.e., the TCB) +// Peer : Tag value to be sent in every packet and is received +// Verification: in the INIT or INIT ACK chunk. +// Tag : +// +// My : Tag expected in every inbound packet and sent in the +// Verification: INIT or INIT ACK chunk. +// +// Tag : +// State : A state variable indicating what state the association +// : is in, i.e., COOKIE-WAIT, COOKIE-ECHOED, ESTABLISHED, +// : SHUTDOWN-PENDING, SHUTDOWN-SENT, SHUTDOWN-RECEIVED, +// : SHUTDOWN-ACK-SENT. +// +// Note: No "CLOSED" state is illustrated since if a +// association is "CLOSED" its TCB SHOULD be removed. +type Association struct { + bytesReceived uint64 + bytesSent uint64 + + lock sync.RWMutex + + netConn net.Conn + + peerVerificationTag uint32 + myVerificationTag uint32 + state uint32 + myNextTSN uint32 // nextTSN + peerLastTSN uint32 // lastRcvdTSN + minTSN2MeasureRTT uint32 // for RTT measurement + willSendForwardTSN bool + willRetransmitFast bool + willRetransmitReconfig bool + + // Reconfig + myNextRSN uint32 + reconfigs map[uint32]*chunkReconfig + reconfigRequests map[uint32]*paramOutgoingResetRequest + + // Non-RFC internal data + sourcePort uint16 + destinationPort uint16 + myMaxNumInboundStreams uint16 + myMaxNumOutboundStreams uint16 + myCookie *paramStateCookie + payloadQueue *payloadQueue + inflightQueue *payloadQueue + pendingQueue *pendingQueue + controlQueue *controlQueue + mtu uint32 + maxPayloadSize uint32 // max DATA chunk payload size + cumulativeTSNAckPoint uint32 + advancedPeerTSNAckPoint uint32 + useForwardTSN bool + + // Congestion control parameters + maxReceiveBufferSize uint32 + maxMessageSize uint32 + cwnd uint32 // my congestion window size + rwnd uint32 // calculated peer's receiver windows size + ssthresh uint32 // slow start threshold + partialBytesAcked uint32 + inFastRecovery bool + fastRecoverExitPoint uint32 + + // RTX & Ack timer + rtoMgr *rtoManager + t1Init *rtxTimer + t1Cookie *rtxTimer + t3RTX *rtxTimer + tReconfig *rtxTimer + ackTimer *ackTimer + + // Chunks stored for retransmission + storedInit *chunkInit + storedCookieEcho *chunkCookieEcho + + streams map[uint16]*Stream + acceptCh chan *Stream + readLoopCloseCh chan struct{} + awakeWriteLoopCh chan struct{} + closeWriteLoopCh chan struct{} + handshakeCompletedCh chan error + + closeWriteLoopOnce sync.Once + + // local error + silentError error + + ackState int + ackMode int // for testing + + // stats + stats *associationStats + + // per inbound packet context + delayedAckTriggered bool + immediateAckTriggered bool + + name string + log logging.LeveledLogger +} + +// Config collects the arguments to createAssociation construction into +// a single structure +type Config struct { + NetConn net.Conn + MaxReceiveBufferSize uint32 + MaxMessageSize uint32 + LoggerFactory logging.LoggerFactory +} + +// Server accepts a SCTP stream over a conn +func Server(config Config) (*Association, error) { + a := createAssociation(config) + a.init(false) + + select { + case err := <-a.handshakeCompletedCh: + if err != nil { + return nil, err + } + return a, nil + case <-a.readLoopCloseCh: + return nil, errors.Errorf("association closed before connecting") + } +} + +// Client opens a SCTP stream over a conn +func Client(config Config) (*Association, error) { + a := createAssociation(config) + a.init(true) + + select { + case err := <-a.handshakeCompletedCh: + if err != nil { + return nil, err + } + return a, nil + case <-a.readLoopCloseCh: + return nil, errors.Errorf("association closed before connecting") + } +} + +func createAssociation(config Config) *Association { + var maxReceiveBufferSize uint32 + if config.MaxReceiveBufferSize == 0 { + maxReceiveBufferSize = initialRecvBufSize + } else { + maxReceiveBufferSize = config.MaxReceiveBufferSize + } + + var maxMessageSize uint32 + if config.MaxMessageSize == 0 { + maxMessageSize = defaultMaxMessageSize + } else { + maxMessageSize = config.MaxMessageSize + } + + tsn := globalMathRandomGenerator.Uint32() + a := &Association{ + netConn: config.NetConn, + maxReceiveBufferSize: maxReceiveBufferSize, + maxMessageSize: maxMessageSize, + myMaxNumOutboundStreams: math.MaxUint16, + myMaxNumInboundStreams: math.MaxUint16, + payloadQueue: newPayloadQueue(), + inflightQueue: newPayloadQueue(), + pendingQueue: newPendingQueue(), + controlQueue: newControlQueue(), + mtu: initialMTU, + maxPayloadSize: initialMTU - (commonHeaderSize + dataChunkHeaderSize), + myVerificationTag: globalMathRandomGenerator.Uint32(), + myNextTSN: tsn, + myNextRSN: tsn, + minTSN2MeasureRTT: tsn, + state: closed, + rtoMgr: newRTOManager(), + streams: map[uint16]*Stream{}, + reconfigs: map[uint32]*chunkReconfig{}, + reconfigRequests: map[uint32]*paramOutgoingResetRequest{}, + acceptCh: make(chan *Stream, acceptChSize), + readLoopCloseCh: make(chan struct{}), + awakeWriteLoopCh: make(chan struct{}, 1), + closeWriteLoopCh: make(chan struct{}), + handshakeCompletedCh: make(chan error), + cumulativeTSNAckPoint: tsn - 1, + advancedPeerTSNAckPoint: tsn - 1, + silentError: errors.Errorf("silently discard"), + stats: &associationStats{}, + log: config.LoggerFactory.NewLogger("sctp"), + } + + a.name = fmt.Sprintf("%p", a) + + // RFC 4690 Sec 7.2.1 + // o The initial cwnd before DATA transmission or after a sufficiently + // long idle period MUST be set to min(4*MTU, max (2*MTU, 4380 + // bytes)). + a.cwnd = min32(4*a.mtu, max32(2*a.mtu, 4380)) + a.log.Tracef("[%s] updated cwnd=%d ssthresh=%d inflight=%d (INI)", + a.name, a.cwnd, a.ssthresh, a.inflightQueue.getNumBytes()) + + a.t1Init = newRTXTimer(timerT1Init, a, maxInitRetrans) + a.t1Cookie = newRTXTimer(timerT1Cookie, a, maxInitRetrans) + a.t3RTX = newRTXTimer(timerT3RTX, a, noMaxRetrans) // retransmit forever + a.tReconfig = newRTXTimer(timerReconfig, a, noMaxRetrans) // retransmit forever + a.ackTimer = newAckTimer(a) + + return a +} + +func (a *Association) init(isClient bool) { + a.lock.Lock() + defer a.lock.Unlock() + + go a.readLoop() + go a.writeLoop() + + if isClient { + a.setState(cookieWait) + init := &chunkInit{} + init.initialTSN = a.myNextTSN + init.numOutboundStreams = a.myMaxNumOutboundStreams + init.numInboundStreams = a.myMaxNumInboundStreams + init.initiateTag = a.myVerificationTag + init.advertisedReceiverWindowCredit = a.maxReceiveBufferSize + setSupportedExtensions(&init.chunkInitCommon) + a.storedInit = init + + err := a.sendInit() + if err != nil { + a.log.Errorf("[%s] failed to send init: %s", a.name, err.Error()) + } + + a.t1Init.start(a.rtoMgr.getRTO()) + } +} + +// caller must hold a.lock +func (a *Association) sendInit() error { + a.log.Debugf("[%s] sending INIT", a.name) + if a.storedInit == nil { + return errors.Errorf("the init not stored to send") + } + + outbound := &packet{} + outbound.verificationTag = a.peerVerificationTag + a.sourcePort = 5000 // Spec?? + a.destinationPort = 5000 // Spec?? + outbound.sourcePort = a.sourcePort + outbound.destinationPort = a.destinationPort + + outbound.chunks = []chunk{a.storedInit} + + a.controlQueue.push(outbound) + a.awakeWriteLoop() + + return nil +} + +// caller must hold a.lock +func (a *Association) sendCookieEcho() error { + if a.storedCookieEcho == nil { + return errors.Errorf("cookieEcho not stored to send") + } + + a.log.Debugf("[%s] sending COOKIE-ECHO", a.name) + + outbound := &packet{} + outbound.verificationTag = a.peerVerificationTag + outbound.sourcePort = a.sourcePort + outbound.destinationPort = a.destinationPort + outbound.chunks = []chunk{a.storedCookieEcho} + + a.controlQueue.push(outbound) + a.awakeWriteLoop() + + return nil +} + +// Close ends the SCTP Association and cleans up any state +func (a *Association) Close() error { + a.log.Debugf("[%s] closing association..", a.name) + + a.setState(closed) + + err := a.netConn.Close() + + a.closeAllTimers() + + // awake writeLoop to exit + a.closeWriteLoopOnce.Do(func() { close(a.closeWriteLoopCh) }) + + // Wait for readLoop to end + <-a.readLoopCloseCh + + a.log.Debugf("[%s] association closed", a.name) + a.log.Debugf("[%s] stats nDATAs (in) : %d", a.name, a.stats.getNumDATAs()) + a.log.Debugf("[%s] stats nSACKs (in) : %d", a.name, a.stats.getNumSACKs()) + a.log.Debugf("[%s] stats nT3Timeouts : %d", a.name, a.stats.getNumT3Timeouts()) + a.log.Debugf("[%s] stats nAckTimeouts: %d", a.name, a.stats.getNumAckTimeouts()) + a.log.Debugf("[%s] stats nFastRetrans: %d", a.name, a.stats.getNumFastRetrans()) + return err +} + +func (a *Association) closeAllTimers() { + // Close all retransmission & ack timers + a.t1Init.close() + a.t1Cookie.close() + a.t3RTX.close() + a.tReconfig.close() + a.ackTimer.close() +} + +func (a *Association) readLoop() { + var closeErr error + defer func() { + // also stop writeLoop, otherwise writeLoop can be leaked + // if connection is lost when there is no writing packet. + a.closeWriteLoopOnce.Do(func() { close(a.closeWriteLoopCh) }) + + a.lock.Lock() + for _, s := range a.streams { + a.unregisterStream(s, closeErr) + } + a.lock.Unlock() + close(a.acceptCh) + close(a.readLoopCloseCh) + }() + + a.log.Debugf("[%s] readLoop entered", a.name) + buffer := make([]byte, receiveMTU) + + for { + n, err := a.netConn.Read(buffer) + if err != nil { + closeErr = err + break + } + // Make a buffer sized to what we read, then copy the data we + // read from the underlying transport. We do this because the + // user data is passed to the reassembly queue without + // copying. + inbound := make([]byte, n) + copy(inbound, buffer[:n]) + atomic.AddUint64(&a.bytesReceived, uint64(n)) + if err = a.handleInbound(inbound); err != nil { + closeErr = err + break + } + } + + a.log.Debugf("[%s] readLoop exited %s", a.name, closeErr) +} + +func (a *Association) writeLoop() { + a.log.Debugf("[%s] writeLoop entered", a.name) + +loop: + for { + rawPackets := a.gatherOutbound() + + for _, raw := range rawPackets { + _, err := a.netConn.Write(raw) + if err != nil { + if err != io.EOF { + a.log.Warnf("[%s] failed to write packets on netConn: %v", a.name, err) + } + a.log.Debugf("[%s] writeLoop ended", a.name) + break loop + } + atomic.AddUint64(&a.bytesSent, uint64(len(raw))) + } + + select { + case <-a.awakeWriteLoopCh: + case <-a.closeWriteLoopCh: + break loop + } + } + + a.setState(closed) + a.closeAllTimers() + + a.log.Debugf("[%s] writeLoop exited", a.name) +} + +func (a *Association) awakeWriteLoop() { + select { + case a.awakeWriteLoopCh <- struct{}{}: + default: + } +} + +// unregisterStream un-registers a stream from the association +// The caller should hold the association write lock. +func (a *Association) unregisterStream(s *Stream, err error) { + s.lock.Lock() + defer s.lock.Unlock() + + delete(a.streams, s.streamIdentifier) + s.readErr = err + s.readNotifier.Broadcast() +} + +// handleInbound parses incoming raw packets +func (a *Association) handleInbound(raw []byte) error { + p := &packet{} + if err := p.unmarshal(raw); err != nil { + a.log.Warnf("[%s] unable to parse SCTP packet %s", a.name, err) + return nil + } + + if err := checkPacket(p); err != nil { + a.log.Warnf("[%s] failed validating packet %s", a.name, err) + return nil + } + + a.handleChunkStart() + + for _, c := range p.chunks { + if err := a.handleChunk(p, c); err != nil { + return err + } + } + + a.handleChunkEnd() + + return nil +} + +// The caller should hold the lock +func (a *Association) gatherOutboundDataAndReconfigPackets(rawPackets [][]byte) [][]byte { + for _, p := range a.getDataPacketsToRetransmit() { + raw, err := p.marshal() + if err != nil { + a.log.Warnf("[%s] failed to serialize a DATA packet to be retransmitted", a.name) + continue + } + rawPackets = append(rawPackets, raw) + } + + // Pop unsent data chunks from the pending queue to send as much as + // cwnd and rwnd allow. + chunks, sisToReset := a.popPendingDataChunksToSend() + if len(chunks) > 0 { + // Start timer. (noop if already started) + a.log.Tracef("[%s] T3-rtx timer start (pt1)", a.name) + a.t3RTX.start(a.rtoMgr.getRTO()) + for _, p := range a.bundleDataChunksIntoPackets(chunks) { + raw, err := p.marshal() + if err != nil { + a.log.Warnf("[%s] failed to serialize a DATA packet", a.name) + continue + } + rawPackets = append(rawPackets, raw) + } + } + + if len(sisToReset) > 0 || a.willRetransmitReconfig { + if a.willRetransmitReconfig { + a.willRetransmitReconfig = false + a.log.Debugf("[%s] retransmit %d RECONFIG chunk(s)", a.name, len(a.reconfigs)) + for _, c := range a.reconfigs { + p := a.createPacket([]chunk{c}) + raw, err := p.marshal() + if err != nil { + a.log.Warnf("[%s] failed to serialize a RECONFIG packet to be retransmitted", a.name) + } else { + rawPackets = append(rawPackets, raw) + } + } + } + + if len(sisToReset) > 0 { + rsn := a.generateNextRSN() + tsn := a.myNextTSN - 1 + c := &chunkReconfig{ + paramA: ¶mOutgoingResetRequest{ + reconfigRequestSequenceNumber: rsn, + senderLastTSN: tsn, + streamIdentifiers: sisToReset, + }, + } + a.reconfigs[rsn] = c // store in the map for retransmission + a.log.Debugf("[%s] sending RECONFIG: rsn=%d tsn=%d streams=%v", + a.name, rsn, a.myNextTSN-1, sisToReset) + p := a.createPacket([]chunk{c}) + raw, err := p.marshal() + if err != nil { + a.log.Warnf("[%s] failed to serialize a RECONFIG packet to be transmitted", a.name) + } else { + rawPackets = append(rawPackets, raw) + } + } + + if len(a.reconfigs) > 0 { + a.tReconfig.start(a.rtoMgr.getRTO()) + } + } + + return rawPackets +} + +// The caller should hold the lock +func (a *Association) gatherOutboundFrastRetransmissionPackets(rawPackets [][]byte) [][]byte { + if a.willRetransmitFast { + a.willRetransmitFast = false + + toFastRetrans := []chunk{} + fastRetransSize := commonHeaderSize + + for i := 0; ; i++ { + c, ok := a.inflightQueue.get(a.cumulativeTSNAckPoint + uint32(i) + 1) + if !ok { + break // end of pending data + } + + if c.acked || c.abandoned() { + continue + } + + if c.nSent > 1 || c.missIndicator < 3 { + continue + } + + // RFC 4960 Sec 7.2.4 Fast Retransmit on Gap Reports + // 3) Determine how many of the earliest (i.e., lowest TSN) DATA chunks + // marked for retransmission will fit into a single packet, subject + // to constraint of the path MTU of the destination transport + // address to which the packet is being sent. Call this value K. + // Retransmit those K DATA chunks in a single packet. When a Fast + // Retransmit is being performed, the sender SHOULD ignore the value + // of cwnd and SHOULD NOT delay retransmission for this single + // packet. + + dataChunkSize := dataChunkHeaderSize + uint32(len(c.userData)) + if a.mtu < fastRetransSize+dataChunkSize { + break + } + + fastRetransSize += dataChunkSize + a.stats.incFastRetrans() + c.nSent++ + a.checkPartialReliabilityStatus(c) + toFastRetrans = append(toFastRetrans, c) + a.log.Tracef("[%s] fast-retransmit: tsn=%d sent=%d htna=%d", + a.name, c.tsn, c.nSent, a.fastRecoverExitPoint) + } + + if len(toFastRetrans) > 0 { + raw, err := a.createPacket(toFastRetrans).marshal() + if err != nil { + a.log.Warnf("[%s] failed to serialize a DATA packet to be fast-retransmitted", a.name) + } else { + rawPackets = append(rawPackets, raw) + } + } + } + + return rawPackets +} + +// The caller should hold the lock +func (a *Association) gatherOutboundSackPackets(rawPackets [][]byte) [][]byte { + if a.ackState == ackStateImmediate { + a.ackState = ackStateIdle + sack := a.createSelectiveAckChunk() + a.log.Debugf("[%s] sending SACK: %s", a.name, sack.String()) + raw, err := a.createPacket([]chunk{sack}).marshal() + if err != nil { + a.log.Warnf("[%s] failed to serialize a SACK packet", a.name) + } else { + rawPackets = append(rawPackets, raw) + } + } + + return rawPackets +} + +// The caller should hold the lock +func (a *Association) gatherOutboundForwardTSNPackets(rawPackets [][]byte) [][]byte { + if a.willSendForwardTSN { + a.willSendForwardTSN = false + if sna32GT(a.advancedPeerTSNAckPoint, a.cumulativeTSNAckPoint) { + fwdtsn := a.createForwardTSN() + raw, err := a.createPacket([]chunk{fwdtsn}).marshal() + if err != nil { + a.log.Warnf("[%s] failed to serialize a Forward TSN packet", a.name) + } else { + rawPackets = append(rawPackets, raw) + } + } + } + + return rawPackets +} + +// gatherOutbound gathers outgoing packets +func (a *Association) gatherOutbound() [][]byte { + a.lock.Lock() + defer a.lock.Unlock() + + rawPackets := [][]byte{} + + if a.controlQueue.size() > 0 { + for _, p := range a.controlQueue.popAll() { + raw, err := p.marshal() + if err != nil { + a.log.Warnf("[%s] failed to serialize a control packet", a.name) + continue + } + rawPackets = append(rawPackets, raw) + } + } + + state := a.getState() + + if state == established { + rawPackets = a.gatherOutboundDataAndReconfigPackets(rawPackets) + rawPackets = a.gatherOutboundFrastRetransmissionPackets(rawPackets) + rawPackets = a.gatherOutboundSackPackets(rawPackets) + rawPackets = a.gatherOutboundForwardTSNPackets(rawPackets) + } + + return rawPackets +} + +func checkPacket(p *packet) error { + // All packets must adhere to these rules + + // This is the SCTP sender's port number. It can be used by the + // receiver in combination with the source IP address, the SCTP + // destination port, and possibly the destination IP address to + // identify the association to which this packet belongs. The port + // number 0 MUST NOT be used. + if p.sourcePort == 0 { + return errors.Errorf("sctp packet must not have a source port of 0") + } + + // This is the SCTP port number to which this packet is destined. + // The receiving host will use this port number to de-multiplex the + // SCTP packet to the correct receiving endpoint/application. The + // port number 0 MUST NOT be used. + if p.destinationPort == 0 { + return errors.Errorf("sctp packet must not have a destination port of 0") + } + + // Check values on the packet that are specific to a particular chunk type + for _, c := range p.chunks { + switch c.(type) { // nolint:gocritic + case *chunkInit: + // An INIT or INIT ACK chunk MUST NOT be bundled with any other chunk. + // They MUST be the only chunks present in the SCTP packets that carry + // them. + if len(p.chunks) != 1 { + return errors.Errorf("init chunk must not be bundled with any other chunk") + } + + // A packet containing an INIT chunk MUST have a zero Verification + // Tag. + if p.verificationTag != 0 { + return errors.Errorf("init chunk expects a verification tag of 0 on the packet when out-of-the-blue") + } + } + } + + return nil +} + +func min16(a, b uint16) uint16 { + if a < b { + return a + } + return b +} + +func max32(a, b uint32) uint32 { + if a > b { + return a + } + return b +} + +func min32(a, b uint32) uint32 { + if a < b { + return a + } + return b +} + +// setState atomically sets the state of the Association. +// The caller should hold the lock. +func (a *Association) setState(newState uint32) { + oldState := atomic.SwapUint32(&a.state, newState) + if newState != oldState { + a.log.Debugf("[%s] state change: '%s' => '%s'", + a.name, + getAssociationStateString(oldState), + getAssociationStateString(newState)) + } +} + +// getState atomically returns the state of the Association. +func (a *Association) getState() uint32 { + return atomic.LoadUint32(&a.state) +} + +// BytesSent returns the number of bytes sent +func (a *Association) BytesSent() uint64 { + return atomic.LoadUint64(&a.bytesSent) +} + +// BytesReceived returns the number of bytes received +func (a *Association) BytesReceived() uint64 { + return atomic.LoadUint64(&a.bytesReceived) +} + +func setSupportedExtensions(init *chunkInitCommon) { + // nolint:godox + // TODO RFC5061 https://tools.ietf.org/html/rfc6525#section-5.2 + // An implementation supporting this (Supported Extensions Parameter) + // extension MUST list the ASCONF, the ASCONF-ACK, and the AUTH chunks + // in its INIT and INIT-ACK parameters. + init.params = append(init.params, ¶mSupportedExtensions{ + ChunkTypes: []chunkType{ctReconfig, ctForwardTSN}, + }) +} + +// The caller should hold the lock. +func (a *Association) handleInit(p *packet, i *chunkInit) ([]*packet, error) { + state := a.getState() + a.log.Debugf("[%s] chunkInit received in state '%s'", a.name, getAssociationStateString(state)) + + // https://tools.ietf.org/html/rfc4960#section-5.2.1 + // Upon receipt of an INIT in the COOKIE-WAIT state, an endpoint MUST + // respond with an INIT ACK using the same parameters it sent in its + // original INIT chunk (including its Initiate Tag, unchanged). When + // responding, the endpoint MUST send the INIT ACK back to the same + // address that the original INIT (sent by this endpoint) was sent. + + if state != closed && state != cookieWait && state != cookieEchoed { + // 5.2.2. Unexpected INIT in States Other than CLOSED, COOKIE-ECHOED, + // COOKIE-WAIT, and SHUTDOWN-ACK-SENT + return nil, errors.Errorf("todo: handle Init when in state %s", getAssociationStateString(state)) + } + + // Should we be setting any of these permanently until we've ACKed further? + a.myMaxNumInboundStreams = min16(i.numInboundStreams, a.myMaxNumInboundStreams) + a.myMaxNumOutboundStreams = min16(i.numOutboundStreams, a.myMaxNumOutboundStreams) + a.peerVerificationTag = i.initiateTag + a.sourcePort = p.destinationPort + a.destinationPort = p.sourcePort + + // 13.2 This is the last TSN received in sequence. This value + // is set initially by taking the peer's initial TSN, + // received in the INIT or INIT ACK chunk, and + // subtracting one from it. + a.peerLastTSN = i.initialTSN - 1 + + for _, param := range i.params { + switch v := param.(type) { // nolint:gocritic + case *paramSupportedExtensions: + for _, t := range v.ChunkTypes { + if t == ctForwardTSN { + a.log.Debugf("[%s] use ForwardTSN (on init)\n", a.name) + a.useForwardTSN = true + } + } + } + } + if !a.useForwardTSN { + a.log.Warnf("[%s] not using ForwardTSN (on init)\n", a.name) + } + + outbound := &packet{} + outbound.verificationTag = a.peerVerificationTag + outbound.sourcePort = a.sourcePort + outbound.destinationPort = a.destinationPort + + initAck := &chunkInitAck{} + + initAck.initialTSN = a.myNextTSN + initAck.numOutboundStreams = a.myMaxNumOutboundStreams + initAck.numInboundStreams = a.myMaxNumInboundStreams + initAck.initiateTag = a.myVerificationTag + initAck.advertisedReceiverWindowCredit = a.maxReceiveBufferSize + + if a.myCookie == nil { + var err error + if a.myCookie, err = newRandomStateCookie(); err != nil { + return nil, err + } + } + + initAck.params = []param{a.myCookie} + + setSupportedExtensions(&initAck.chunkInitCommon) + + outbound.chunks = []chunk{initAck} + + return pack(outbound), nil +} + +// The caller should hold the lock. +func (a *Association) handleInitAck(p *packet, i *chunkInitAck) error { + state := a.getState() + a.log.Debugf("[%s] chunkInitAck received in state '%s'", a.name, getAssociationStateString(state)) + if state != cookieWait { + // RFC 4960 + // 5.2.3. Unexpected INIT ACK + // If an INIT ACK is received by an endpoint in any state other than the + // COOKIE-WAIT state, the endpoint should discard the INIT ACK chunk. + // An unexpected INIT ACK usually indicates the processing of an old or + // duplicated INIT chunk. + return nil + } + + a.myMaxNumInboundStreams = min16(i.numInboundStreams, a.myMaxNumInboundStreams) + a.myMaxNumOutboundStreams = min16(i.numOutboundStreams, a.myMaxNumOutboundStreams) + a.peerVerificationTag = i.initiateTag + a.peerLastTSN = i.initialTSN - 1 + if a.sourcePort != p.destinationPort || + a.destinationPort != p.sourcePort { + a.log.Warnf("[%s] handleInitAck: port mismatch", a.name) + return nil + } + + a.rwnd = i.advertisedReceiverWindowCredit + a.log.Debugf("[%s] initial rwnd=%d", a.name, a.rwnd) + + // RFC 4690 Sec 7.2.1 + // o The initial value of ssthresh MAY be arbitrarily high (for + // example, implementations MAY use the size of the receiver + // advertised window). + a.ssthresh = a.rwnd + a.log.Tracef("[%s] updated cwnd=%d ssthresh=%d inflight=%d (INI)", + a.name, a.cwnd, a.ssthresh, a.inflightQueue.getNumBytes()) + + a.t1Init.stop() + a.storedInit = nil + + var cookieParam *paramStateCookie + for _, param := range i.params { + switch v := param.(type) { + case *paramStateCookie: + cookieParam = v + case *paramSupportedExtensions: + for _, t := range v.ChunkTypes { + if t == ctForwardTSN { + a.log.Debugf("[%s] use ForwardTSN (on initAck)\n", a.name) + a.useForwardTSN = true + } + } + } + } + if !a.useForwardTSN { + a.log.Warnf("[%s] not using ForwardTSN (on initAck)\n", a.name) + } + if cookieParam == nil { + return errors.Errorf("no cookie in InitAck") + } + + a.storedCookieEcho = &chunkCookieEcho{} + a.storedCookieEcho.cookie = cookieParam.cookie + + err := a.sendCookieEcho() + if err != nil { + a.log.Errorf("[%s] failed to send init: %s", a.name, err.Error()) + } + + a.t1Cookie.start(a.rtoMgr.getRTO()) + a.setState(cookieEchoed) + return nil +} + +// The caller should hold the lock. +func (a *Association) handleHeartbeat(c *chunkHeartbeat) []*packet { + a.log.Tracef("[%s] chunkHeartbeat", a.name) + hbi, ok := c.params[0].(*paramHeartbeatInfo) + if !ok { + a.log.Warnf("[%s] failed to handle Heartbeat, no ParamHeartbeatInfo", a.name) + } + + return pack(&packet{ + verificationTag: a.peerVerificationTag, + sourcePort: a.sourcePort, + destinationPort: a.destinationPort, + chunks: []chunk{&chunkHeartbeatAck{ + params: []param{ + ¶mHeartbeatInfo{ + heartbeatInformation: hbi.heartbeatInformation, + }, + }, + }}, + }) +} + +// The caller should hold the lock. +func (a *Association) handleCookieEcho(c *chunkCookieEcho) []*packet { + state := a.getState() + a.log.Debugf("[%s] COOKIE-ECHO received in state '%s'", a.name, getAssociationStateString(state)) + switch state { + default: + return nil + case established: + if !bytes.Equal(a.myCookie.cookie, c.cookie) { + return nil + } + case closed, cookieWait, cookieEchoed: + if !bytes.Equal(a.myCookie.cookie, c.cookie) { + return nil + } + + a.t1Init.stop() + a.storedInit = nil + + a.t1Cookie.stop() + a.storedCookieEcho = nil + + a.setState(established) + a.handshakeCompletedCh <- nil + } + + p := &packet{ + verificationTag: a.peerVerificationTag, + sourcePort: a.sourcePort, + destinationPort: a.destinationPort, + chunks: []chunk{&chunkCookieAck{}}, + } + return pack(p) +} + +// The caller should hold the lock. +func (a *Association) handleCookieAck() { + state := a.getState() + a.log.Debugf("[%s] COOKIE-ACK received in state '%s'", a.name, getAssociationStateString(state)) + if state != cookieEchoed { + // RFC 4960 + // 5.2.5. Handle Duplicate COOKIE-ACK. + // At any state other than COOKIE-ECHOED, an endpoint should silently + // discard a received COOKIE ACK chunk. + return + } + + a.t1Cookie.stop() + a.storedCookieEcho = nil + + a.setState(established) + a.handshakeCompletedCh <- nil +} + +// The caller should hold the lock. +func (a *Association) handleData(d *chunkPayloadData) []*packet { + a.log.Tracef("[%s] DATA: tsn=%d immediateSack=%v len=%d", + a.name, d.tsn, d.immediateSack, len(d.userData)) + a.stats.incDATAs() + + canPush := a.payloadQueue.canPush(d, a.peerLastTSN) + if canPush { + s := a.getOrCreateStream(d.streamIdentifier) + if s == nil { + // silentely discard the data. (sender will retry on T3-rtx timeout) + // see pion/sctp#30 + a.log.Debugf("discard %d", d.streamSequenceNumber) + return nil + } + + if a.getMyReceiverWindowCredit() > 0 { + // Pass the new chunk to stream level as soon as it arrives + a.payloadQueue.push(d, a.peerLastTSN) + s.handleData(d) + } else { + // Receive buffer is full + lastTSN, ok := a.payloadQueue.getLastTSNReceived() + if ok && sna32LT(d.tsn, lastTSN) { + a.log.Debugf("[%s] receive buffer full, but accepted as this is a missing chunk with tsn=%d ssn=%d", a.name, d.tsn, d.streamSequenceNumber) + a.payloadQueue.push(d, a.peerLastTSN) + s.handleData(d) + } else { + a.log.Debugf("[%s] receive buffer full. dropping DATA with tsn=%d ssn=%d", a.name, d.tsn, d.streamSequenceNumber) + } + } + } + + return a.handlePeerLastTSNAndAcknowledgement(d.immediateSack) +} + +// A common routine for handleData and handleForwardTSN routines +// The caller should hold the lock. +func (a *Association) handlePeerLastTSNAndAcknowledgement(sackImmediately bool) []*packet { + var reply []*packet + + // Try to advance peerLastTSN + + // From RFC 3758 Sec 3.6: + // .. and then MUST further advance its cumulative TSN point locally + // if possible + // Meaning, if peerLastTSN+1 points to a chunk that is received, + // advance peerLastTSN until peerLastTSN+1 points to unreceived chunk. + for { + if _, popOk := a.payloadQueue.pop(a.peerLastTSN + 1); !popOk { + break + } + a.peerLastTSN++ + + for _, rstReq := range a.reconfigRequests { + resp := a.resetStreamsIfAny(rstReq) + if resp != nil { + a.log.Debugf("[%s] RESET RESPONSE: %+v", a.name, resp) + reply = append(reply, resp) + } + } + } + + hasPacketLoss := (a.payloadQueue.size() > 0) + if hasPacketLoss { + a.log.Tracef("[%s] packetloss: %s", a.name, a.payloadQueue.getGapAckBlocksString(a.peerLastTSN)) + } + + if (a.ackState != ackStateImmediate && !sackImmediately && !hasPacketLoss && a.ackMode == ackModeNormal) || a.ackMode == ackModeAlwaysDelay { + if a.ackState == ackStateIdle { + a.delayedAckTriggered = true + } else { + a.immediateAckTriggered = true + } + } else { + a.immediateAckTriggered = true + } + + return reply +} + +// The caller should hold the lock. +func (a *Association) getMyReceiverWindowCredit() uint32 { + var bytesQueued uint32 + for _, s := range a.streams { + bytesQueued += uint32(s.getNumBytesInReassemblyQueue()) + } + + if bytesQueued >= a.maxReceiveBufferSize { + return 0 + } + return a.maxReceiveBufferSize - bytesQueued +} + +// OpenStream opens a stream +func (a *Association) OpenStream(streamIdentifier uint16, defaultPayloadType PayloadProtocolIdentifier) (*Stream, error) { + a.lock.Lock() + defer a.lock.Unlock() + + if _, ok := a.streams[streamIdentifier]; ok { + return nil, errors.Errorf("there already exists a stream with identifier %d", streamIdentifier) + } + + s := a.createStream(streamIdentifier, false) + s.setDefaultPayloadType(defaultPayloadType) + + return s, nil +} + +// AcceptStream accepts a stream +func (a *Association) AcceptStream() (*Stream, error) { + s, ok := <-a.acceptCh + if !ok { + return nil, io.EOF // no more incoming streams + } + return s, nil +} + +// createStream creates a stream. The caller should hold the lock and check no stream exists for this id. +func (a *Association) createStream(streamIdentifier uint16, accept bool) *Stream { + s := &Stream{ + association: a, + streamIdentifier: streamIdentifier, + reassemblyQueue: newReassemblyQueue(streamIdentifier), + log: a.log, + name: fmt.Sprintf("%d:%s", streamIdentifier, a.name), + } + + s.readNotifier = sync.NewCond(&s.lock) + + if accept { + select { + case a.acceptCh <- s: + a.streams[streamIdentifier] = s + a.log.Debugf("[%s] accepted a new stream (streamIdentifier: %d)", + a.name, streamIdentifier) + default: + a.log.Debugf("[%s] dropped a new stream (acceptCh size: %d)", + a.name, len(a.acceptCh)) + return nil + } + } else { + a.streams[streamIdentifier] = s + } + + return s +} + +// getOrCreateStream gets or creates a stream. The caller should hold the lock. +func (a *Association) getOrCreateStream(streamIdentifier uint16) *Stream { + if s, ok := a.streams[streamIdentifier]; ok { + return s + } + + return a.createStream(streamIdentifier, true) +} + +// The caller should hold the lock. +func (a *Association) processSelectiveAck(d *chunkSelectiveAck) (map[uint16]int, uint32, error) { // nolint:gocognit + bytesAckedPerStream := map[uint16]int{} + + // New ack point, so pop all ACKed packets from inflightQueue + // We add 1 because the "currentAckPoint" has already been popped from the inflight queue + // For the first SACK we take care of this by setting the ackpoint to cumAck - 1 + for i := a.cumulativeTSNAckPoint + 1; sna32LTE(i, d.cumulativeTSNAck); i++ { + c, ok := a.inflightQueue.pop(i) + if !ok { + return nil, 0, errors.Errorf("tsn %v unable to be popped from inflight queue", i) + } + + if !c.acked { + // RFC 4096 sec 6.3.2. Retransmission Timer Rules + // R3) Whenever a SACK is received that acknowledges the DATA chunk + // with the earliest outstanding TSN for that address, restart the + // T3-rtx timer for that address with its current RTO (if there is + // still outstanding data on that address). + if i == a.cumulativeTSNAckPoint+1 { + // T3 timer needs to be reset. Stop it for now. + a.t3RTX.stop() + } + + nBytesAcked := len(c.userData) + + // Sum the number of bytes acknowledged per stream + if amount, ok := bytesAckedPerStream[c.streamIdentifier]; ok { + bytesAckedPerStream[c.streamIdentifier] = amount + nBytesAcked + } else { + bytesAckedPerStream[c.streamIdentifier] = nBytesAcked + } + + // RFC 4960 sec 6.3.1. RTO Calculation + // C4) When data is in flight and when allowed by rule C5 below, a new + // RTT measurement MUST be made each round trip. Furthermore, new + // RTT measurements SHOULD be made no more than once per round trip + // for a given destination transport address. + // C5) Karn's algorithm: RTT measurements MUST NOT be made using + // packets that were retransmitted (and thus for which it is + // ambiguous whether the reply was for the first instance of the + // chunk or for a later instance) + if c.nSent == 1 && sna32GTE(c.tsn, a.minTSN2MeasureRTT) { + a.minTSN2MeasureRTT = a.myNextTSN + rtt := time.Since(c.since).Seconds() * 1000.0 + srtt := a.rtoMgr.setNewRTT(rtt) + a.log.Tracef("[%s] SACK: measured-rtt=%f srtt=%f new-rto=%f", + a.name, rtt, srtt, a.rtoMgr.getRTO()) + } + } + + if a.inFastRecovery && c.tsn == a.fastRecoverExitPoint { + a.log.Debugf("[%s] exit fast-recovery", a.name) + a.inFastRecovery = false + } + } + + htna := d.cumulativeTSNAck + + // Mark selectively acknowledged chunks as "acked" + for _, g := range d.gapAckBlocks { + for i := g.start; i <= g.end; i++ { + tsn := d.cumulativeTSNAck + uint32(i) + c, ok := a.inflightQueue.get(tsn) + if !ok { + return nil, 0, errors.Errorf("requested non-existent TSN %v", tsn) + } + + if !c.acked { + nBytesAcked := a.inflightQueue.markAsAcked(tsn) + + // Sum the number of bytes acknowledged per stream + if amount, ok := bytesAckedPerStream[c.streamIdentifier]; ok { + bytesAckedPerStream[c.streamIdentifier] = amount + nBytesAcked + } else { + bytesAckedPerStream[c.streamIdentifier] = nBytesAcked + } + + a.log.Tracef("[%s] tsn=%d has been sacked", a.name, c.tsn) + + if c.nSent == 1 { + a.minTSN2MeasureRTT = a.myNextTSN + rtt := time.Since(c.since).Seconds() * 1000.0 + srtt := a.rtoMgr.setNewRTT(rtt) + a.log.Tracef("[%s] SACK: measured-rtt=%f srtt=%f new-rto=%f", + a.name, rtt, srtt, a.rtoMgr.getRTO()) + } + + if sna32LT(htna, tsn) { + htna = tsn + } + } + } + } + + return bytesAckedPerStream, htna, nil +} + +// The caller should hold the lock. +func (a *Association) onCumulativeTSNAckPointAdvanced(totalBytesAcked int) { + // RFC 4096, sec 6.3.2. Retransmission Timer Rules + // R2) Whenever all outstanding data sent to an address have been + // acknowledged, turn off the T3-rtx timer of that address. + if a.inflightQueue.size() == 0 { + a.log.Tracef("[%s] SACK: no more packet in-flight (pending=%d)", a.name, a.pendingQueue.size()) + a.t3RTX.stop() + } else { + a.log.Tracef("[%s] T3-rtx timer start (pt2)", a.name) + a.t3RTX.start(a.rtoMgr.getRTO()) + } + + // Update congestion control parameters + if a.cwnd <= a.ssthresh { + // RFC 4096, sec 7.2.1. Slow-Start + // o When cwnd is less than or equal to ssthresh, an SCTP endpoint MUST + // use the slow-start algorithm to increase cwnd only if the current + // congestion window is being fully utilized, an incoming SACK + // advances the Cumulative TSN Ack Point, and the data sender is not + // in Fast Recovery. Only when these three conditions are met can + // the cwnd be increased; otherwise, the cwnd MUST not be increased. + // If these conditions are met, then cwnd MUST be increased by, at + // most, the lesser of 1) the total size of the previously + // outstanding DATA chunk(s) acknowledged, and 2) the destination's + // path MTU. + if !a.inFastRecovery && + a.pendingQueue.size() > 0 { + a.cwnd += min32(uint32(totalBytesAcked), a.cwnd) // TCP way + // a.cwnd += min32(uint32(totalBytesAcked), a.mtu) // SCTP way (slow) + a.log.Tracef("[%s] updated cwnd=%d ssthresh=%d acked=%d (SS)", + a.name, a.cwnd, a.ssthresh, totalBytesAcked) + } else { + a.log.Tracef("[%s] cwnd did not grow: cwnd=%d ssthresh=%d acked=%d FR=%v pending=%d", + a.name, a.cwnd, a.ssthresh, totalBytesAcked, a.inFastRecovery, a.pendingQueue.size()) + } + } else { + // RFC 4096, sec 7.2.2. Congestion Avoidance + // o Whenever cwnd is greater than ssthresh, upon each SACK arrival + // that advances the Cumulative TSN Ack Point, increase + // partial_bytes_acked by the total number of bytes of all new chunks + // acknowledged in that SACK including chunks acknowledged by the new + // Cumulative TSN Ack and by Gap Ack Blocks. + a.partialBytesAcked += uint32(totalBytesAcked) + + // o When partial_bytes_acked is equal to or greater than cwnd and + // before the arrival of the SACK the sender had cwnd or more bytes + // of data outstanding (i.e., before arrival of the SACK, flight size + // was greater than or equal to cwnd), increase cwnd by MTU, and + // reset partial_bytes_acked to (partial_bytes_acked - cwnd). + if a.partialBytesAcked >= a.cwnd && a.pendingQueue.size() > 0 { + a.partialBytesAcked -= a.cwnd + a.cwnd += a.mtu + a.log.Tracef("[%s] updated cwnd=%d ssthresh=%d acked=%d (CA)", + a.name, a.cwnd, a.ssthresh, totalBytesAcked) + } + } +} + +// The caller should hold the lock. +func (a *Association) processFastRetransmission(cumTSNAckPoint, htna uint32, cumTSNAckPointAdvanced bool) error { + // HTNA algorithm - RFC 4960 Sec 7.2.4 + // Increment missIndicator of each chunks that the SACK reported missing + // when either of the following is met: + // a) Not in fast-recovery + // miss indications are incremented only for missing TSNs prior to the + // highest TSN newly acknowledged in the SACK. + // b) In fast-recovery AND the Cumulative TSN Ack Point advanced + // the miss indications are incremented for all TSNs reported missing + // in the SACK. + if !a.inFastRecovery || (a.inFastRecovery && cumTSNAckPointAdvanced) { + var maxTSN uint32 + if !a.inFastRecovery { + // a) increment only for missing TSNs prior to the HTNA + maxTSN = htna + } else { + // b) increment for all TSNs reported missing + maxTSN = cumTSNAckPoint + uint32(a.inflightQueue.size()) + 1 + } + + for tsn := cumTSNAckPoint + 1; sna32LT(tsn, maxTSN); tsn++ { + c, ok := a.inflightQueue.get(tsn) + if !ok { + return errors.Errorf("requested non-existent TSN %v", tsn) + } + if !c.acked && !c.abandoned() && c.missIndicator < 3 { + c.missIndicator++ + if c.missIndicator == 3 { + if !a.inFastRecovery { + // 2) If not in Fast Recovery, adjust the ssthresh and cwnd of the + // destination address(es) to which the missing DATA chunks were + // last sent, according to the formula described in Section 7.2.3. + a.inFastRecovery = true + a.fastRecoverExitPoint = htna + a.ssthresh = max32(a.cwnd/2, 4*a.mtu) + a.cwnd = a.ssthresh + a.partialBytesAcked = 0 + a.willRetransmitFast = true + + a.log.Tracef("[%s] updated cwnd=%d ssthresh=%d inflight=%d (FR)", + a.name, a.cwnd, a.ssthresh, a.inflightQueue.getNumBytes()) + } + } + } + } + } + + if a.inFastRecovery && cumTSNAckPointAdvanced { + a.willRetransmitFast = true + } + + return nil +} + +// The caller should hold the lock. +func (a *Association) handleSack(d *chunkSelectiveAck) error { + a.log.Tracef("[%s] SACK: cumTSN=%d a_rwnd=%d", a.name, d.cumulativeTSNAck, d.advertisedReceiverWindowCredit) + state := a.getState() + if state != established { + return nil + } + + a.stats.incSACKs() + + if sna32GT(a.cumulativeTSNAckPoint, d.cumulativeTSNAck) { + // RFC 4960 sec 6.2.1. Processing a Received SACK + // D) + // i) If Cumulative TSN Ack is less than the Cumulative TSN Ack + // Point, then drop the SACK. Since Cumulative TSN Ack is + // monotonically increasing, a SACK whose Cumulative TSN Ack is + // less than the Cumulative TSN Ack Point indicates an out-of- + // order SACK. + + a.log.Debugf("[%s] SACK Cumulative ACK %v is older than ACK point %v", + a.name, + d.cumulativeTSNAck, + a.cumulativeTSNAckPoint) + + return nil + } + + // Process selective ack + bytesAckedPerStream, htna, err := a.processSelectiveAck(d) + if err != nil { + return err + } + + var totalBytesAcked int + for _, nBytesAcked := range bytesAckedPerStream { + totalBytesAcked += nBytesAcked + } + + cumTSNAckPointAdvanced := false + if sna32LT(a.cumulativeTSNAckPoint, d.cumulativeTSNAck) { + a.log.Tracef("[%s] SACK: cumTSN advanced: %d -> %d", + a.name, + a.cumulativeTSNAckPoint, + d.cumulativeTSNAck) + + a.cumulativeTSNAckPoint = d.cumulativeTSNAck + cumTSNAckPointAdvanced = true + a.onCumulativeTSNAckPointAdvanced(totalBytesAcked) + } + + for si, nBytesAcked := range bytesAckedPerStream { + if s, ok := a.streams[si]; ok { + a.lock.Unlock() + s.onBufferReleased(nBytesAcked) + a.lock.Lock() + } + } + + // New rwnd value + // RFC 4960 sec 6.2.1. Processing a Received SACK + // D) + // ii) Set rwnd equal to the newly received a_rwnd minus the number + // of bytes still outstanding after processing the Cumulative + // TSN Ack and the Gap Ack Blocks. + + // bytes acked were already subtracted by markAsAcked() method + bytesOutstanding := uint32(a.inflightQueue.getNumBytes()) + if bytesOutstanding >= d.advertisedReceiverWindowCredit { + a.rwnd = 0 + } else { + a.rwnd = d.advertisedReceiverWindowCredit - bytesOutstanding + } + + err = a.processFastRetransmission(d.cumulativeTSNAck, htna, cumTSNAckPointAdvanced) + if err != nil { + return err + } + + if a.useForwardTSN { + // RFC 3758 Sec 3.5 C1 + if sna32LT(a.advancedPeerTSNAckPoint, a.cumulativeTSNAckPoint) { + a.advancedPeerTSNAckPoint = a.cumulativeTSNAckPoint + } + + // RFC 3758 Sec 3.5 C2 + for i := a.advancedPeerTSNAckPoint + 1; ; i++ { + c, ok := a.inflightQueue.get(i) + if !ok { + break + } + if !c.abandoned() { + break + } + a.advancedPeerTSNAckPoint = i + } + + // RFC 3758 Sec 3.5 C3 + if sna32GT(a.advancedPeerTSNAckPoint, a.cumulativeTSNAckPoint) { + a.willSendForwardTSN = true + } + a.awakeWriteLoop() + } + + if a.inflightQueue.size() > 0 { + // Start timer. (noop if already started) + a.log.Tracef("[%s] T3-rtx timer start (pt3)", a.name) + a.t3RTX.start(a.rtoMgr.getRTO()) + } + + if cumTSNAckPointAdvanced { + a.awakeWriteLoop() + } + + return nil +} + +// createForwardTSN generates ForwardTSN chunk. +// This method will be be called if useForwardTSN is set to false. +// The caller should hold the lock. +func (a *Association) createForwardTSN() *chunkForwardTSN { + // RFC 3758 Sec 3.5 C4 + streamMap := map[uint16]uint16{} // to report only once per SI + for i := a.cumulativeTSNAckPoint + 1; sna32LTE(i, a.advancedPeerTSNAckPoint); i++ { + c, ok := a.inflightQueue.get(i) + if !ok { + break + } + + ssn, ok := streamMap[c.streamIdentifier] + if !ok { + streamMap[c.streamIdentifier] = c.streamSequenceNumber + } else if sna16LT(ssn, c.streamSequenceNumber) { + // to report only once with greatest SSN + streamMap[c.streamIdentifier] = c.streamSequenceNumber + } + } + + fwdtsn := &chunkForwardTSN{ + newCumulativeTSN: a.advancedPeerTSNAckPoint, + streams: []chunkForwardTSNStream{}, + } + + var streamStr string + for si, ssn := range streamMap { + streamStr += fmt.Sprintf("(si=%d ssn=%d)", si, ssn) + fwdtsn.streams = append(fwdtsn.streams, chunkForwardTSNStream{ + identifier: si, + sequence: ssn, + }) + } + a.log.Tracef("[%s] building fwdtsn: newCumulativeTSN=%d cumTSN=%d - %s", a.name, fwdtsn.newCumulativeTSN, a.cumulativeTSNAckPoint, streamStr) + + return fwdtsn +} + +// createPacket wraps chunks in a packet. +// The caller should hold the read lock. +func (a *Association) createPacket(cs []chunk) *packet { + return &packet{ + verificationTag: a.peerVerificationTag, + sourcePort: a.sourcePort, + destinationPort: a.destinationPort, + chunks: cs, + } +} + +// The caller should hold the lock. +func (a *Association) handleReconfig(c *chunkReconfig) ([]*packet, error) { + a.log.Tracef("[%s] handleReconfig", a.name) + + pp := make([]*packet, 0) + + p, err := a.handleReconfigParam(c.paramA) + if err != nil { + return nil, err + } + if p != nil { + pp = append(pp, p) + } + + if c.paramB != nil { + p, err = a.handleReconfigParam(c.paramB) + if err != nil { + return nil, err + } + if p != nil { + pp = append(pp, p) + } + } + return pp, nil +} + +// The caller should hold the lock. +func (a *Association) handleForwardTSN(c *chunkForwardTSN) []*packet { + a.log.Tracef("[%s] FwdTSN: %s", a.name, c.String()) + + if !a.useForwardTSN { + a.log.Warn("[%s] received FwdTSN but not enabled") + // Return an error chunk + cerr := &chunkError{ + errorCauses: []errorCause{&errorCauseUnrecognizedChunkType{}}, + } + outbound := &packet{} + outbound.verificationTag = a.peerVerificationTag + outbound.sourcePort = a.sourcePort + outbound.destinationPort = a.destinationPort + outbound.chunks = []chunk{cerr} + return []*packet{outbound} + } + + // From RFC 3758 Sec 3.6: + // Note, if the "New Cumulative TSN" value carried in the arrived + // FORWARD TSN chunk is found to be behind or at the current cumulative + // TSN point, the data receiver MUST treat this FORWARD TSN as out-of- + // date and MUST NOT update its Cumulative TSN. The receiver SHOULD + // send a SACK to its peer (the sender of the FORWARD TSN) since such a + // duplicate may indicate the previous SACK was lost in the network. + + a.log.Tracef("[%s] should send ack? newCumTSN=%d peerLastTSN=%d\n", + a.name, c.newCumulativeTSN, a.peerLastTSN) + if sna32LTE(c.newCumulativeTSN, a.peerLastTSN) { + a.log.Tracef("[%s] sending ack on Forward TSN", a.name) + a.ackState = ackStateImmediate + a.ackTimer.stop() + a.awakeWriteLoop() + return nil + } + + // From RFC 3758 Sec 3.6: + // the receiver MUST perform the same TSN handling, including duplicate + // detection, gap detection, SACK generation, cumulative TSN + // advancement, etc. as defined in RFC 2960 [2]---with the following + // exceptions and additions. + + // When a FORWARD TSN chunk arrives, the data receiver MUST first update + // its cumulative TSN point to the value carried in the FORWARD TSN + // chunk, + + // Advance peerLastTSN + for sna32LT(a.peerLastTSN, c.newCumulativeTSN) { + a.payloadQueue.pop(a.peerLastTSN + 1) // may not exist + a.peerLastTSN++ + } + + // Report new peerLastTSN value and abandoned largest SSN value to + // corresponding streams so that the abandoned chunks can be removed + // from the reassemblyQueue. + for _, forwarded := range c.streams { + if s, ok := a.streams[forwarded.identifier]; ok { + s.handleForwardTSNForOrdered(forwarded.sequence) + } + } + + // TSN may be forewared for unordered chunks. ForwardTSN chunk does not + // report which stream identifier it skipped for unordered chunks. + // Therefore, we need to broadcast this event to all existing streams for + // unordered chunks. + // See https://github.com/pion/sctp/issues/106 + for _, s := range a.streams { + s.handleForwardTSNForUnordered(c.newCumulativeTSN) + } + + return a.handlePeerLastTSNAndAcknowledgement(false) +} + +func (a *Association) sendResetRequest(streamIdentifier uint16) error { + a.lock.Lock() + defer a.lock.Unlock() + + state := a.getState() + if state != established { + return errors.Errorf("sending reset packet in non-established state: state=%s", + getAssociationStateString(state)) + } + + // Create DATA chunk which only contains valid stream identifier with + // nil userData and use it as a EOS from the stream. + c := &chunkPayloadData{ + streamIdentifier: streamIdentifier, + beginningFragment: true, + endingFragment: true, + userData: nil, + } + + a.pendingQueue.push(c) + a.awakeWriteLoop() + return nil +} + +// The caller should hold the lock. +func (a *Association) handleReconfigParam(raw param) (*packet, error) { + switch p := raw.(type) { + case *paramOutgoingResetRequest: + a.reconfigRequests[p.reconfigRequestSequenceNumber] = p + resp := a.resetStreamsIfAny(p) + if resp != nil { + return resp, nil + } + return nil, nil + + case *paramReconfigResponse: + delete(a.reconfigs, p.reconfigResponseSequenceNumber) + if len(a.reconfigs) == 0 { + a.tReconfig.stop() + } + return nil, nil + default: + return nil, errors.Errorf("unexpected parameter type %T", p) + } +} + +// The caller should hold the lock. +func (a *Association) resetStreamsIfAny(p *paramOutgoingResetRequest) *packet { + result := reconfigResultSuccessPerformed + if sna32LTE(p.senderLastTSN, a.peerLastTSN) { + a.log.Debugf("[%s] resetStream(): senderLastTSN=%d <= peerLastTSN=%d", + a.name, p.senderLastTSN, a.peerLastTSN) + for _, id := range p.streamIdentifiers { + s, ok := a.streams[id] + if !ok { + continue + } + a.unregisterStream(s, io.EOF) + } + delete(a.reconfigRequests, p.reconfigRequestSequenceNumber) + } else { + a.log.Debugf("[%s] resetStream(): senderLastTSN=%d > peerLastTSN=%d", + a.name, p.senderLastTSN, a.peerLastTSN) + result = reconfigResultInProgress + } + + return a.createPacket([]chunk{&chunkReconfig{ + paramA: ¶mReconfigResponse{ + reconfigResponseSequenceNumber: p.reconfigRequestSequenceNumber, + result: result, + }, + }}) +} + +// Move the chunk peeked with a.pendingQueue.peek() to the inflightQueue. +// The caller should hold the lock. +func (a *Association) movePendingDataChunkToInflightQueue(c *chunkPayloadData) { + if err := a.pendingQueue.pop(c); err != nil { + a.log.Errorf("[%s] failed to pop from pending queue: %s", a.name, err.Error()) + } + + // Mark all fragements are in-flight now + if c.endingFragment { + c.setAllInflight() + } + + // Assign TSN + c.tsn = a.generateNextTSN() + + c.since = time.Now() // use to calculate RTT and also for maxPacketLifeTime + c.nSent = 1 // being sent for the first time + + a.checkPartialReliabilityStatus(c) + + a.log.Tracef("[%s] sending ppi=%d tsn=%d ssn=%d sent=%d len=%d (%v,%v)", + a.name, c.payloadType, c.tsn, c.streamSequenceNumber, c.nSent, len(c.userData), c.beginningFragment, c.endingFragment) + + // Push it into the inflightQueue + a.inflightQueue.pushNoCheck(c) +} + +// popPendingDataChunksToSend pops chunks from the pending queues as many as +// the cwnd and rwnd allows to send. +// The caller should hold the lock. +func (a *Association) popPendingDataChunksToSend() ([]*chunkPayloadData, []uint16) { + chunks := []*chunkPayloadData{} + var sisToReset []uint16 // stream identifieres to reset + + if a.pendingQueue.size() > 0 { + // RFC 4960 sec 6.1. Transmission of DATA Chunks + // A) At any given time, the data sender MUST NOT transmit new data to + // any destination transport address if its peer's rwnd indicates + // that the peer has no buffer space (i.e., rwnd is 0; see Section + // 6.2.1). However, regardless of the value of rwnd (including if it + // is 0), the data sender can always have one DATA chunk in flight to + // the receiver if allowed by cwnd (see rule B, below). + + for { + c := a.pendingQueue.peek() + if c == nil { + break // no more pending data + } + + dataLen := uint32(len(c.userData)) + if dataLen == 0 { + sisToReset = append(sisToReset, c.streamIdentifier) + err := a.pendingQueue.pop(c) + if err != nil { + a.log.Errorf("failed to pop from pending queue: %s", err.Error()) + } + continue + } + + if uint32(a.inflightQueue.getNumBytes())+dataLen > a.cwnd { + break // would exceeds cwnd + } + + if dataLen > a.rwnd { + break // no more rwnd + } + + a.rwnd -= dataLen + + a.movePendingDataChunkToInflightQueue(c) + chunks = append(chunks, c) + } + + // the data sender can always have one DATA chunk in flight to the receiver + if len(chunks) == 0 && a.inflightQueue.size() == 0 { + // Send zero window probe + c := a.pendingQueue.peek() + if c != nil { + a.movePendingDataChunkToInflightQueue(c) + chunks = append(chunks, c) + } + } + } + + return chunks, sisToReset +} + +// bundleDataChunksIntoPackets packs DATA chunks into packets. It tries to bundle +// DATA chunks into a packet so long as the resulting packet size does not exceed +// the path MTU. +// The caller should hold the lock. +func (a *Association) bundleDataChunksIntoPackets(chunks []*chunkPayloadData) []*packet { + packets := []*packet{} + chunksToSend := []chunk{} + bytesInPacket := int(commonHeaderSize) + + for _, c := range chunks { + // RFC 4960 sec 6.1. Transmission of DATA Chunks + // Multiple DATA chunks committed for transmission MAY be bundled in a + // single packet. Furthermore, DATA chunks being retransmitted MAY be + // bundled with new DATA chunks, as long as the resulting packet size + // does not exceed the path MTU. + if bytesInPacket+len(c.userData) > int(a.mtu) { + packets = append(packets, a.createPacket(chunksToSend)) + chunksToSend = []chunk{} + bytesInPacket = int(commonHeaderSize) + } + + chunksToSend = append(chunksToSend, c) + bytesInPacket += int(dataChunkHeaderSize) + len(c.userData) + } + + if len(chunksToSend) > 0 { + packets = append(packets, a.createPacket(chunksToSend)) + } + + return packets +} + +// sendPayloadData sends the data chunks. +func (a *Association) sendPayloadData(chunks []*chunkPayloadData) error { + a.lock.Lock() + defer a.lock.Unlock() + + state := a.getState() + if state != established { + return errors.Errorf("sending payload data in non-established state: state=%s", + getAssociationStateString(state)) + } + + // Push the chunks into the pending queue first. + for _, c := range chunks { + a.pendingQueue.push(c) + } + + a.awakeWriteLoop() + return nil +} + +// The caller should hold the lock. +func (a *Association) checkPartialReliabilityStatus(c *chunkPayloadData) { + if !a.useForwardTSN { + return + } + + // draft-ietf-rtcweb-data-protocol-09.txt section 6 + // 6. Procedures + // All Data Channel Establishment Protocol messages MUST be sent using + // ordered delivery and reliable transmission. + // + if c.payloadType == PayloadTypeWebRTCDCEP { + return + } + + // PR-SCTP + if s, ok := a.streams[c.streamIdentifier]; ok { + s.lock.RLock() + if s.reliabilityType == ReliabilityTypeRexmit { + if c.nSent >= s.reliabilityValue { + c.setAbandoned(true) + a.log.Tracef("[%s] marked as abandoned: tsn=%d ppi=%d (remix: %d)", a.name, c.tsn, c.payloadType, c.nSent) + } + } else if s.reliabilityType == ReliabilityTypeTimed { + elapsed := int64(time.Since(c.since).Seconds() * 1000) + if elapsed >= int64(s.reliabilityValue) { + c.setAbandoned(true) + a.log.Tracef("[%s] marked as abandoned: tsn=%d ppi=%d (timed: %d)", a.name, c.tsn, c.payloadType, elapsed) + } + } + s.lock.RUnlock() + } else { + a.log.Errorf("[%s] stream %d not found)", a.name, c.streamIdentifier) + } +} + +// getDataPacketsToRetransmit is called when T3-rtx is timed out and retransmit outstanding data chunks +// that are not acked or abandoned yet. +// The caller should hold the lock. +func (a *Association) getDataPacketsToRetransmit() []*packet { + awnd := min32(a.cwnd, a.rwnd) + chunks := []*chunkPayloadData{} + var bytesToSend int + var done bool + + for i := 0; !done; i++ { + c, ok := a.inflightQueue.get(a.cumulativeTSNAckPoint + uint32(i) + 1) + if !ok { + break // end of pending data + } + + if !c.retransmit { + continue + } + + if i == 0 && int(a.rwnd) < len(c.userData) { + // Send it as a zero window probe + done = true + } else if bytesToSend+len(c.userData) > int(awnd) { + break + } + + // reset the retransmit flag not to retransmit again before the next + // t3-rtx timer fires + c.retransmit = false + bytesToSend += len(c.userData) + + c.nSent++ + + a.checkPartialReliabilityStatus(c) + + a.log.Tracef("[%s] retransmitting tsn=%d ssn=%d sent=%d", a.name, c.tsn, c.streamSequenceNumber, c.nSent) + + chunks = append(chunks, c) + } + + return a.bundleDataChunksIntoPackets(chunks) +} + +// generateNextTSN returns the myNextTSN and increases it. The caller should hold the lock. +// The caller should hold the lock. +func (a *Association) generateNextTSN() uint32 { + tsn := a.myNextTSN + a.myNextTSN++ + return tsn +} + +// generateNextRSN returns the myNextRSN and increases it. The caller should hold the lock. +// The caller should hold the lock. +func (a *Association) generateNextRSN() uint32 { + rsn := a.myNextRSN + a.myNextRSN++ + return rsn +} + +func (a *Association) createSelectiveAckChunk() *chunkSelectiveAck { + sack := &chunkSelectiveAck{} + sack.cumulativeTSNAck = a.peerLastTSN + sack.advertisedReceiverWindowCredit = a.getMyReceiverWindowCredit() + sack.duplicateTSN = a.payloadQueue.popDuplicates() + sack.gapAckBlocks = a.payloadQueue.getGapAckBlocks(a.peerLastTSN) + return sack +} + +func pack(p *packet) []*packet { + return []*packet{p} +} + +func (a *Association) handleChunkStart() { + a.lock.Lock() + defer a.lock.Unlock() + + a.delayedAckTriggered = false + a.immediateAckTriggered = false +} + +func (a *Association) handleChunkEnd() { + a.lock.Lock() + defer a.lock.Unlock() + + if a.immediateAckTriggered { + // Send SACK now! + a.ackState = ackStateImmediate + a.ackTimer.stop() + a.awakeWriteLoop() + } else if a.delayedAckTriggered { + // Will send delayed ack in the next ack timeout + a.ackState = ackStateDelay + a.ackTimer.start() + } +} + +func (a *Association) handleChunk(p *packet, c chunk) error { + a.lock.Lock() + defer a.lock.Unlock() + + var packets []*packet + var err error + + if _, err = c.check(); err != nil { + a.log.Errorf("[ %s ] failed validating chunk: %s ", a.name, err) + return nil + } + + switch c := c.(type) { + case *chunkInit: + packets, err = a.handleInit(p, c) + + case *chunkInitAck: + err = a.handleInitAck(p, c) + + case *chunkAbort: + var errStr string + for _, e := range c.errorCauses { + errStr += fmt.Sprintf("(%s)", e) + } + return fmt.Errorf("[%s] %w: %s", a.name, errChunk, errStr) + + case *chunkError: + var errStr string + for _, e := range c.errorCauses { + errStr += fmt.Sprintf("(%s)", e) + } + a.log.Debugf("[%s] Error chunk, with following errors: %s", a.name, errStr) + + case *chunkHeartbeat: + packets = a.handleHeartbeat(c) + + case *chunkCookieEcho: + packets = a.handleCookieEcho(c) + + case *chunkCookieAck: + a.handleCookieAck() + + case *chunkPayloadData: + packets = a.handleData(c) + + case *chunkSelectiveAck: + err = a.handleSack(c) + + case *chunkReconfig: + packets, err = a.handleReconfig(c) + + case *chunkForwardTSN: + packets = a.handleForwardTSN(c) + + default: + err = errors.Errorf("unhandled chunk type") + } + + // Log and return, the only condition that is fatal is a ABORT chunk + if err != nil { + a.log.Errorf("Failed to handle chunk: %v", err) + return nil + } + + if len(packets) > 0 { + a.controlQueue.pushAll(packets) + a.awakeWriteLoop() + } + + return nil +} + +func (a *Association) onRetransmissionTimeout(id int, nRtos uint) { + a.lock.Lock() + defer a.lock.Unlock() + + if id == timerT1Init { + err := a.sendInit() + if err != nil { + a.log.Debugf("[%s] failed to retransmit init (nRtos=%d): %v", a.name, nRtos, err) + } + return + } + + if id == timerT1Cookie { + err := a.sendCookieEcho() + if err != nil { + a.log.Debugf("[%s] failed to retransmit cookie-echo (nRtos=%d): %v", a.name, nRtos, err) + } + return + } + + if id == timerT3RTX { + a.stats.incT3Timeouts() + + // RFC 4960 sec 6.3.3 + // E1) For the destination address for which the timer expires, adjust + // its ssthresh with rules defined in Section 7.2.3 and set the + // cwnd <- MTU. + // RFC 4960 sec 7.2.3 + // When the T3-rtx timer expires on an address, SCTP should perform slow + // start by: + // ssthresh = max(cwnd/2, 4*MTU) + // cwnd = 1*MTU + + a.ssthresh = max32(a.cwnd/2, 4*a.mtu) + a.cwnd = a.mtu + a.log.Tracef("[%s] updated cwnd=%d ssthresh=%d inflight=%d (RTO)", + a.name, a.cwnd, a.ssthresh, a.inflightQueue.getNumBytes()) + + // RFC 3758 sec 3.5 + // A5) Any time the T3-rtx timer expires, on any destination, the sender + // SHOULD try to advance the "Advanced.Peer.Ack.Point" by following + // the procedures outlined in C2 - C5. + if a.useForwardTSN { + // RFC 3758 Sec 3.5 C2 + for i := a.advancedPeerTSNAckPoint + 1; ; i++ { + c, ok := a.inflightQueue.get(i) + if !ok { + break + } + if !c.abandoned() { + break + } + a.advancedPeerTSNAckPoint = i + } + + // RFC 3758 Sec 3.5 C3 + if sna32GT(a.advancedPeerTSNAckPoint, a.cumulativeTSNAckPoint) { + a.willSendForwardTSN = true + } + } + + a.log.Debugf("[%s] T3-rtx timed out: nRtos=%d cwnd=%d ssthresh=%d", a.name, nRtos, a.cwnd, a.ssthresh) + + /* + a.log.Debugf(" - advancedPeerTSNAckPoint=%d", a.advancedPeerTSNAckPoint) + a.log.Debugf(" - cumulativeTSNAckPoint=%d", a.cumulativeTSNAckPoint) + a.inflightQueue.updateSortedKeys() + for i, tsn := range a.inflightQueue.sorted { + if c, ok := a.inflightQueue.get(tsn); ok { + a.log.Debugf(" - [%d] tsn=%d acked=%v abandoned=%v (%v,%v) len=%d", + i, c.tsn, c.acked, c.abandoned(), c.beginningFragment, c.endingFragment, len(c.userData)) + } + } + */ + + a.inflightQueue.markAllToRetrasmit() + a.awakeWriteLoop() + + return + } + + if id == timerReconfig { + a.willRetransmitReconfig = true + a.awakeWriteLoop() + } +} + +func (a *Association) onRetransmissionFailure(id int) { + a.lock.Lock() + defer a.lock.Unlock() + + if id == timerT1Init { + a.log.Errorf("[%s] retransmission failure: T1-init", a.name) + a.handshakeCompletedCh <- errors.Errorf("handshake failed (INIT ACK)") + return + } + + if id == timerT1Cookie { + a.log.Errorf("[%s] retransmission failure: T1-cookie", a.name) + a.handshakeCompletedCh <- errors.Errorf("handshake failed (COOKIE ECHO)") + return + } + + if id == timerT3RTX { + // T3-rtx timer will not fail by design + // Justifications: + // * ICE would fail if the connectivity is lost + // * WebRTC spec is not clear how this incident should be reported to ULP + a.log.Errorf("[%s] retransmission failure: T3-rtx (DATA)", a.name) + return + } +} + +func (a *Association) onAckTimeout() { + a.lock.Lock() + defer a.lock.Unlock() + + a.log.Tracef("[%s] ack timed out (ackState: %d)", a.name, a.ackState) + a.stats.incAckTimeouts() + + a.ackState = ackStateImmediate + a.awakeWriteLoop() +} + +// bufferedAmount returns total amount (in bytes) of currently buffered user data. +// This is used only by testing. +func (a *Association) bufferedAmount() int { + a.lock.RLock() + defer a.lock.RUnlock() + + return a.pendingQueue.getNumBytes() + a.inflightQueue.getNumBytes() +} + +// MaxMessageSize returns the maximum message size you can send. +func (a *Association) MaxMessageSize() uint32 { + return atomic.LoadUint32(&a.maxMessageSize) +} + +// SetMaxMessageSize sets the maximum message size you can send. +func (a *Association) SetMaxMessageSize(maxMsgSize uint32) { + atomic.StoreUint32(&a.maxMessageSize, maxMsgSize) +} diff --git a/vendor/github.com/pion/sctp/association_stats.go b/vendor/github.com/pion/sctp/association_stats.go new file mode 100644 index 0000000..4ccb7be --- /dev/null +++ b/vendor/github.com/pion/sctp/association_stats.go @@ -0,0 +1,61 @@ +package sctp + +import ( + "sync/atomic" +) + +type associationStats struct { + nDATAs uint64 + nSACKs uint64 + nT3Timeouts uint64 + nAckTimeouts uint64 + nFastRetrans uint64 +} + +func (s *associationStats) incDATAs() { + atomic.AddUint64(&s.nDATAs, 1) +} + +func (s *associationStats) getNumDATAs() uint64 { + return atomic.LoadUint64(&s.nDATAs) +} + +func (s *associationStats) incSACKs() { + atomic.AddUint64(&s.nSACKs, 1) +} + +func (s *associationStats) getNumSACKs() uint64 { + return atomic.LoadUint64(&s.nSACKs) +} + +func (s *associationStats) incT3Timeouts() { + atomic.AddUint64(&s.nT3Timeouts, 1) +} + +func (s *associationStats) getNumT3Timeouts() uint64 { + return atomic.LoadUint64(&s.nT3Timeouts) +} + +func (s *associationStats) incAckTimeouts() { + atomic.AddUint64(&s.nAckTimeouts, 1) +} + +func (s *associationStats) getNumAckTimeouts() uint64 { + return atomic.LoadUint64(&s.nAckTimeouts) +} + +func (s *associationStats) incFastRetrans() { + atomic.AddUint64(&s.nFastRetrans, 1) +} + +func (s *associationStats) getNumFastRetrans() uint64 { + return atomic.LoadUint64(&s.nFastRetrans) +} + +func (s *associationStats) reset() { + atomic.StoreUint64(&s.nDATAs, 0) + atomic.StoreUint64(&s.nSACKs, 0) + atomic.StoreUint64(&s.nT3Timeouts, 0) + atomic.StoreUint64(&s.nAckTimeouts, 0) + atomic.StoreUint64(&s.nFastRetrans, 0) +} diff --git a/vendor/github.com/pion/sctp/chunk.go b/vendor/github.com/pion/sctp/chunk.go new file mode 100644 index 0000000..ec47da1 --- /dev/null +++ b/vendor/github.com/pion/sctp/chunk.go @@ -0,0 +1,9 @@ +package sctp + +type chunk interface { + unmarshal(raw []byte) error + marshal() ([]byte, error) + check() (bool, error) + + valueLength() int +} diff --git a/vendor/github.com/pion/sctp/chunk_abort.go b/vendor/github.com/pion/sctp/chunk_abort.go new file mode 100644 index 0000000..9fd4692 --- /dev/null +++ b/vendor/github.com/pion/sctp/chunk_abort.go @@ -0,0 +1,88 @@ +package sctp // nolint:dupl + +import ( + "fmt" + + "github.com/pkg/errors" +) + +/* +Abort represents an SCTP Chunk of type ABORT + +The ABORT chunk is sent to the peer of an association to close the +association. The ABORT chunk may contain Cause Parameters to inform +the receiver about the reason of the abort. DATA chunks MUST NOT be +bundled with ABORT. Control chunks (except for INIT, INIT ACK, and +SHUTDOWN COMPLETE) MAY be bundled with an ABORT, but they MUST be +placed before the ABORT in the SCTP packet or they will be ignored by +the receiver. + + 0 1 2 3 + 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 ++-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ +| Type = 6 |Reserved |T| Length | ++-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ +| | +| zero or more Error Causes | +| | ++-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ +*/ +type chunkAbort struct { + chunkHeader + errorCauses []errorCause +} + +func (a *chunkAbort) unmarshal(raw []byte) error { + if err := a.chunkHeader.unmarshal(raw); err != nil { + return err + } + + if a.typ != ctAbort { + return errors.Errorf("ChunkType is not of type ABORT, actually is %s", a.typ.String()) + } + + offset := chunkHeaderSize + for { + if len(raw)-offset < 4 { + break + } + + e, err := buildErrorCause(raw[offset:]) + if err != nil { + return errors.Wrap(err, "Failed build Abort Chunk") + } + + offset += int(e.length()) + a.errorCauses = append(a.errorCauses, e) + } + return nil +} + +func (a *chunkAbort) marshal() ([]byte, error) { + a.chunkHeader.typ = ctAbort + a.flags = 0x00 + a.raw = []byte{} + for _, ec := range a.errorCauses { + raw, err := ec.marshal() + if err != nil { + return nil, err + } + a.raw = append(a.raw, raw...) + } + return a.chunkHeader.marshal() +} + +func (a *chunkAbort) check() (abort bool, err error) { + return false, nil +} + +// String makes chunkAbort printable +func (a *chunkAbort) String() string { + res := a.chunkHeader.String() + + for _, cause := range a.errorCauses { + res += fmt.Sprintf("\n - %s", cause) + } + + return res +} diff --git a/vendor/github.com/pion/sctp/chunk_cookie_ack.go b/vendor/github.com/pion/sctp/chunk_cookie_ack.go new file mode 100644 index 0000000..83b6f71 --- /dev/null +++ b/vendor/github.com/pion/sctp/chunk_cookie_ack.go @@ -0,0 +1,44 @@ +package sctp + +import ( + "github.com/pkg/errors" +) + +/* +chunkCookieAck represents an SCTP Chunk of type chunkCookieAck + + 0 1 2 3 + 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 ++-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ +| Type = 11 |Chunk Flags | Length = 4 | ++-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ +*/ +type chunkCookieAck struct { + chunkHeader +} + +func (c *chunkCookieAck) unmarshal(raw []byte) error { + if err := c.chunkHeader.unmarshal(raw); err != nil { + return err + } + + if c.typ != ctCookieAck { + return errors.Errorf("ChunkType is not of type COOKIEACK, actually is %s", c.typ.String()) + } + + return nil +} + +func (c *chunkCookieAck) marshal() ([]byte, error) { + c.chunkHeader.typ = ctCookieAck + return c.chunkHeader.marshal() +} + +func (c *chunkCookieAck) check() (abort bool, err error) { + return false, nil +} + +// String makes chunkCookieAck printable +func (c *chunkCookieAck) String() string { + return c.chunkHeader.String() +} diff --git a/vendor/github.com/pion/sctp/chunk_cookie_echo.go b/vendor/github.com/pion/sctp/chunk_cookie_echo.go new file mode 100644 index 0000000..3f0ed36 --- /dev/null +++ b/vendor/github.com/pion/sctp/chunk_cookie_echo.go @@ -0,0 +1,46 @@ +package sctp + +import ( + "github.com/pkg/errors" +) + +/* +CookieEcho represents an SCTP Chunk of type CookieEcho + + 0 1 2 3 + 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 ++-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ +| Type = 10 |Chunk Flags | Length | ++-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ +| Cookie | +| | ++-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ + +*/ +type chunkCookieEcho struct { + chunkHeader + cookie []byte +} + +func (c *chunkCookieEcho) unmarshal(raw []byte) error { + if err := c.chunkHeader.unmarshal(raw); err != nil { + return err + } + + if c.typ != ctCookieEcho { + return errors.Errorf("ChunkType is not of type COOKIEECHO, actually is %s", c.typ.String()) + } + c.cookie = c.raw + + return nil +} + +func (c *chunkCookieEcho) marshal() ([]byte, error) { + c.chunkHeader.typ = ctCookieEcho + c.chunkHeader.raw = c.cookie + return c.chunkHeader.marshal() +} + +func (c *chunkCookieEcho) check() (abort bool, err error) { + return false, nil +} diff --git a/vendor/github.com/pion/sctp/chunk_error.go b/vendor/github.com/pion/sctp/chunk_error.go new file mode 100644 index 0000000..eb29324 --- /dev/null +++ b/vendor/github.com/pion/sctp/chunk_error.go @@ -0,0 +1,95 @@ +package sctp // nolint:dupl + +import ( + "fmt" + + "github.com/pkg/errors" +) + +/* + Operation Error (ERROR) (9) + + An endpoint sends this chunk to its peer endpoint to notify it of + certain error conditions. It contains one or more error causes. An + Operation Error is not considered fatal in and of itself, but may be + used with an ERROR chunk to report a fatal condition. It has the + following parameters: + + 0 1 2 3 + 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 + +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ + | Type = 9 | Chunk Flags | Length | + +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ + \ \ + / one or more Error Causes / + \ \ + +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ + + Chunk Flags: 8 bits + + Set to 0 on transmit and ignored on receipt. + + Length: 16 bits (unsigned integer) + + Set to the size of the chunk in bytes, including the chunk header + and all the Error Cause fields present. +*/ +type chunkError struct { + chunkHeader + errorCauses []errorCause +} + +func (a *chunkError) unmarshal(raw []byte) error { + if err := a.chunkHeader.unmarshal(raw); err != nil { + return err + } + + if a.typ != ctError { + return errors.Errorf("ChunkType is not of type ctError, actually is %s", a.typ.String()) + } + + offset := chunkHeaderSize + for { + if len(raw)-offset < 4 { + break + } + + e, err := buildErrorCause(raw[offset:]) + if err != nil { + return errors.Wrap(err, "Failed build Error Chunk") + } + + offset += int(e.length()) + a.errorCauses = append(a.errorCauses, e) + } + return nil +} + +func (a *chunkError) marshal() ([]byte, error) { + a.chunkHeader.typ = ctError + a.flags = 0x00 + a.raw = []byte{} + for _, ec := range a.errorCauses { + raw, err := ec.marshal() + if err != nil { + return nil, err + } + a.raw = append(a.raw, raw...) + } + return a.chunkHeader.marshal() +} + +func (a *chunkError) check() (abort bool, err error) { + return false, nil +} + +// String makes chunkError printable +func (a *chunkError) String() string { + res := a.chunkHeader.String() + + for _, cause := range a.errorCauses { + res += fmt.Sprintf("\n - %s", cause) + } + + return res +} diff --git a/vendor/github.com/pion/sctp/chunk_forward_tsn.go b/vendor/github.com/pion/sctp/chunk_forward_tsn.go new file mode 100644 index 0000000..f6ff357 --- /dev/null +++ b/vendor/github.com/pion/sctp/chunk_forward_tsn.go @@ -0,0 +1,145 @@ +package sctp + +import ( + "encoding/binary" + "fmt" + + "github.com/pkg/errors" +) + +// This chunk shall be used by the data sender to inform the data +// receiver to adjust its cumulative received TSN point forward because +// some missing TSNs are associated with data chunks that SHOULD NOT be +// transmitted or retransmitted by the sender. +// +// 0 1 2 3 +// 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 +// +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ +// | Type = 192 | Flags = 0x00 | Length = Variable | +// +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ +// | New Cumulative TSN | +// +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ +// | Stream-1 | Stream Sequence-1 | +// +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ +// \ / +// / \ +// +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ +// | Stream-N | Stream Sequence-N | +// +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ + +type chunkForwardTSN struct { + chunkHeader + + // This indicates the new cumulative TSN to the data receiver. Upon + // the reception of this value, the data receiver MUST consider + // any missing TSNs earlier than or equal to this value as received, + // and stop reporting them as gaps in any subsequent SACKs. + newCumulativeTSN uint32 + + streams []chunkForwardTSNStream +} + +const ( + newCumulativeTSNLength = 4 + forwardTSNStreamLength = 4 +) + +var errMarshalStreamFailed = errors.New("failed to marshal stream") + +func (c *chunkForwardTSN) unmarshal(raw []byte) error { + if err := c.chunkHeader.unmarshal(raw); err != nil { + return err + } + + if len(c.raw) < newCumulativeTSNLength { + return errors.New("chunk to short") + } + + c.newCumulativeTSN = binary.BigEndian.Uint32(c.raw[0:]) + + offset := newCumulativeTSNLength + remaining := len(c.raw) - offset + for remaining > 0 { + s := chunkForwardTSNStream{} + + if err := s.unmarshal(c.raw[offset:]); err != nil { + return fmt.Errorf("failed to unmarshal stream: %w", err) + } + + c.streams = append(c.streams, s) + + offset += s.length() + remaining -= s.length() + } + + return nil +} + +func (c *chunkForwardTSN) marshal() ([]byte, error) { + out := make([]byte, newCumulativeTSNLength) + binary.BigEndian.PutUint32(out[0:], c.newCumulativeTSN) + + for _, s := range c.streams { + b, err := s.marshal() + if err != nil { + return nil, fmt.Errorf("%w: %v", errMarshalStreamFailed, err) + } + out = append(out, b...) + } + + c.typ = ctForwardTSN + c.raw = out + return c.chunkHeader.marshal() +} + +func (c *chunkForwardTSN) check() (abort bool, err error) { + return true, nil +} + +// String makes chunkForwardTSN printable +func (c *chunkForwardTSN) String() string { + res := fmt.Sprintf("New Cumulative TSN: %d\n", c.newCumulativeTSN) + for _, s := range c.streams { + res += fmt.Sprintf(" - si=%d, ssn=%d\n", s.identifier, s.sequence) + } + return res +} + +type chunkForwardTSNStream struct { + // This field holds a stream number that was skipped by this + // FWD-TSN. + identifier uint16 + + // This field holds the sequence number associated with the stream + // that was skipped. The stream sequence field holds the largest + // stream sequence number in this stream being skipped. The receiver + // of the FWD-TSN's can use the Stream-N and Stream Sequence-N fields + // to enable delivery of any stranded TSN's that remain on the stream + // re-ordering queues. This field MUST NOT report TSN's corresponding + // to DATA chunks that are marked as unordered. For ordered DATA + // chunks this field MUST be filled in. + sequence uint16 +} + +func (s *chunkForwardTSNStream) length() int { + return forwardTSNStreamLength +} + +func (s *chunkForwardTSNStream) unmarshal(raw []byte) error { + if len(raw) < forwardTSNStreamLength { + return errors.New("stream to short") + } + s.identifier = binary.BigEndian.Uint16(raw[0:]) + s.sequence = binary.BigEndian.Uint16(raw[2:]) + + return nil +} + +func (s *chunkForwardTSNStream) marshal() ([]byte, error) { // nolint:unparam + out := make([]byte, forwardTSNStreamLength) + + binary.BigEndian.PutUint16(out[0:], s.identifier) + binary.BigEndian.PutUint16(out[2:], s.sequence) + + return out, nil +} diff --git a/vendor/github.com/pion/sctp/chunk_heartbeat.go b/vendor/github.com/pion/sctp/chunk_heartbeat.go new file mode 100644 index 0000000..f2026a4 --- /dev/null +++ b/vendor/github.com/pion/sctp/chunk_heartbeat.go @@ -0,0 +1,75 @@ +package sctp + +import ( + "github.com/pkg/errors" +) + +/* +chunkHeartbeat represents an SCTP Chunk of type HEARTBEAT + +An endpoint should send this chunk to its peer endpoint to probe the +reachability of a particular destination transport address defined in +the present association. + +The parameter field contains the Heartbeat Information, which is a +variable-length opaque data structure understood only by the sender. + + + 0 1 2 3 + 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 ++-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ +| Type = 4 | Chunk Flags | Heartbeat Length | ++-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ +| | +| Heartbeat Information TLV (Variable-Length) | +| | ++-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ + +Defined as a variable-length parameter using the format described +in Section 3.2.1, i.e.: + +Variable Parameters Status Type Value +------------------------------------------------------------- +heartbeat Info Mandatory 1 + +*/ +type chunkHeartbeat struct { + chunkHeader + params []param +} + +func (h *chunkHeartbeat) unmarshal(raw []byte) error { + if err := h.chunkHeader.unmarshal(raw); err != nil { + return err + } else if h.typ != ctHeartbeat { + return errors.Errorf("ChunkType is not of type HEARTBEAT, actually is %s", h.typ.String()) + } + + if len(raw) <= chunkHeaderSize { + return errors.Errorf("Heartbeat is not long enough to contain Heartbeat Info %d", len(raw)) + } + + pType, err := parseParamType(raw[chunkHeaderSize:]) + if err != nil { + return errors.Wrap(err, "failed to parse param type") + } + if pType != heartbeatInfo { + return errors.Errorf("Heartbeat should only have HEARTBEAT param, instead have %s", pType.String()) + } + + p, err := buildParam(pType, raw[chunkHeaderSize:]) + if err != nil { + return errors.Wrap(err, "Failed unmarshalling param in Heartbeat Chunk") + } + h.params = append(h.params, p) + + return nil +} + +func (h *chunkHeartbeat) Marshal() ([]byte, error) { + return nil, errors.Errorf("Unimplemented") +} + +func (h *chunkHeartbeat) check() (abort bool, err error) { + return false, nil +} diff --git a/vendor/github.com/pion/sctp/chunk_heartbeat_ack.go b/vendor/github.com/pion/sctp/chunk_heartbeat_ack.go new file mode 100644 index 0000000..ff84090 --- /dev/null +++ b/vendor/github.com/pion/sctp/chunk_heartbeat_ack.go @@ -0,0 +1,86 @@ +package sctp + +import ( + "github.com/pkg/errors" +) + +/* +chunkHeartbeatAck represents an SCTP Chunk of type HEARTBEAT ACK + +An endpoint should send this chunk to its peer endpoint as a response +to a HEARTBEAT chunk (see Section 8.3). A HEARTBEAT ACK is always +sent to the source IP address of the IP datagram containing the +HEARTBEAT chunk to which this ack is responding. + +The parameter field contains a variable-length opaque data structure. + + 0 1 2 3 + 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 ++-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ +| Type = 5 | Chunk Flags | Heartbeat Ack Length | ++-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ +| | +| Heartbeat Information TLV (Variable-Length) | +| | ++-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ + + +Defined as a variable-length parameter using the format described +in Section 3.2.1, i.e.: + +Variable Parameters Status Type Value +------------------------------------------------------------- +Heartbeat Info Mandatory 1 + +*/ +type chunkHeartbeatAck struct { + chunkHeader + params []param +} + +func (h *chunkHeartbeatAck) unmarshal(raw []byte) error { + return errors.Errorf("Unimplemented") +} + +func (h *chunkHeartbeatAck) marshal() ([]byte, error) { + if len(h.params) != 1 { + return nil, errors.Errorf("Heartbeat Ack must have one param") + } + + switch h.params[0].(type) { + case *paramHeartbeatInfo: + // ParamHeartbeatInfo is valid + default: + return nil, errors.Errorf("Heartbeat Ack must have one param, and it should be a HeartbeatInfo") + } + + out := make([]byte, 0) + for idx, p := range h.params { + pp, err := p.marshal() + if err != nil { + return nil, errors.Wrap(err, "Unable to marshal parameter for Heartbeat Ack") + } + + out = append(out, pp...) + + // Chunks (including Type, Length, and Value fields) are padded out + // by the sender with all zero bytes to be a multiple of 4 bytes + // long. This padding MUST NOT be more than 3 bytes in total. The + // Chunk Length value does not include terminating padding of the + // chunk. *However, it does include padding of any variable-length + // parameter except the last parameter in the chunk.* The receiver + // MUST ignore the padding. + if idx != len(h.params)-1 { + out = padByte(out, getPadding(len(pp))) + } + } + + h.chunkHeader.typ = ctHeartbeatAck + h.chunkHeader.raw = out + + return h.chunkHeader.marshal() +} + +func (h *chunkHeartbeatAck) check() (abort bool, err error) { + return false, nil +} diff --git a/vendor/github.com/pion/sctp/chunk_init.go b/vendor/github.com/pion/sctp/chunk_init.go new file mode 100644 index 0000000..65b9b1a --- /dev/null +++ b/vendor/github.com/pion/sctp/chunk_init.go @@ -0,0 +1,123 @@ +package sctp // nolint:dupl + +import ( + "fmt" + + "github.com/pkg/errors" +) + +/* +Init represents an SCTP Chunk of type INIT + +See chunkInitCommon for the fixed headers + +Variable Parameters Status Type Value +------------------------------------------------------------- +IPv4 IP (Note 1) Optional 5 +IPv6 IP (Note 1) Optional 6 +Cookie Preservative Optional 9 +Reserved for ECN Capable (Note 2) Optional 32768 (0x8000) +Host Name IP (Note 3) Optional 11 +Supported IP Types (Note 4) Optional 12 +*/ +type chunkInit struct { + chunkHeader + chunkInitCommon +} + +func (i *chunkInit) unmarshal(raw []byte) error { + if err := i.chunkHeader.unmarshal(raw); err != nil { + return err + } + + if i.typ != ctInit { + return errors.Errorf("ChunkType is not of type INIT, actually is %s", i.typ.String()) + } else if len(i.raw) < initChunkMinLength { + return errors.Errorf("Chunk Value isn't long enough for mandatory parameters exp: %d actual: %d", initChunkMinLength, len(i.raw)) + } + + // The Chunk Flags field in INIT is reserved, and all bits in it should + // be set to 0 by the sender and ignored by the receiver. The sequence + // of parameters within an INIT can be processed in any order. + if i.flags != 0 { + return errors.New("ChunkType of type INIT flags must be all 0") + } + + if err := i.chunkInitCommon.unmarshal(i.raw); err != nil { + return errors.Wrap(err, "Failed to unmarshal INIT body") + } + + return nil +} + +func (i *chunkInit) marshal() ([]byte, error) { + initShared, err := i.chunkInitCommon.marshal() + if err != nil { + return nil, errors.Wrap(err, "Failed marshaling INIT common data") + } + + i.chunkHeader.typ = ctInit + i.chunkHeader.raw = initShared + return i.chunkHeader.marshal() +} + +func (i *chunkInit) check() (abort bool, err error) { + // The receiver of the INIT (the responding end) records the value of + // the Initiate Tag parameter. This value MUST be placed into the + // Verification Tag field of every SCTP packet that the receiver of + // the INIT transmits within this association. + // + // The Initiate Tag is allowed to have any value except 0. See + // Section 5.3.1 for more on the selection of the tag value. + // + // If the value of the Initiate Tag in a received INIT chunk is found + // to be 0, the receiver MUST treat it as an error and close the + // association by transmitting an ABORT. + if i.initiateTag == 0 { + abort = true + return abort, errors.New("ChunkType of type INIT InitiateTag must not be 0") + } + + // Defines the maximum number of streams the sender of this INIT + // chunk allows the peer end to create in this association. The + // value 0 MUST NOT be used. + // + // Note: There is no negotiation of the actual number of streams but + // instead the two endpoints will use the min(requested, offered). + // See Section 5.1.1 for details. + // + // Note: A receiver of an INIT with the MIS value of 0 SHOULD abort + // the association. + if i.numInboundStreams == 0 { + abort = true + return abort, errors.New("INIT inbound stream request must be > 0") + } + + // Defines the number of outbound streams the sender of this INIT + // chunk wishes to create in this association. The value of 0 MUST + // NOT be used. + // + // Note: A receiver of an INIT with the OS value set to 0 SHOULD + // abort the association. + + if i.numOutboundStreams == 0 { + abort = true + return abort, errors.New("INIT outbound stream request must be > 0") + } + + // An SCTP receiver MUST be able to receive a minimum of 1500 bytes in + // one SCTP packet. This means that an SCTP endpoint MUST NOT indicate + // less than 1500 bytes in its initial a_rwnd sent in the INIT or INIT + // ACK. + if i.advertisedReceiverWindowCredit < 1500 { + abort = true + return abort, errors.New("INIT Advertised Receiver Window Credit (a_rwnd) must be >= 1500") + } + + return false, nil +} + +// String makes chunkInit printable +func (i *chunkInit) String() string { + return fmt.Sprintf("%s\n%s", i.chunkHeader, i.chunkInitCommon) +} diff --git a/vendor/github.com/pion/sctp/chunk_init_ack.go b/vendor/github.com/pion/sctp/chunk_init_ack.go new file mode 100644 index 0000000..551bcea --- /dev/null +++ b/vendor/github.com/pion/sctp/chunk_init_ack.go @@ -0,0 +1,126 @@ +package sctp // nolint:dupl + +import ( + "fmt" + + "github.com/pkg/errors" +) + +/* +chunkInitAck represents an SCTP Chunk of type INIT ACK + +See chunkInitCommon for the fixed headers + +Variable Parameters Status Type Value +------------------------------------------------------------- +State Cookie Mandatory 7 +IPv4 IP (Note 1) Optional 5 +IPv6 IP (Note 1) Optional 6 +Unrecognized Parameter Optional 8 +Reserved for ECN Capable (Note 2) Optional 32768 (0x8000) +Host Name IP (Note 3) Optional 11<Paste> + +*/ +type chunkInitAck struct { + chunkHeader + chunkInitCommon +} + +func (i *chunkInitAck) unmarshal(raw []byte) error { + if err := i.chunkHeader.unmarshal(raw); err != nil { + return err + } + + if i.typ != ctInitAck { + return errors.Errorf("ChunkType is not of type INIT ACK, actually is %s", i.typ.String()) + } else if len(i.raw) < initChunkMinLength { + return errors.Errorf("Chunk Value isn't long enough for mandatory parameters exp: %d actual: %d", initChunkMinLength, len(i.raw)) + } + + // The Chunk Flags field in INIT is reserved, and all bits in it should + // be set to 0 by the sender and ignored by the receiver. The sequence + // of parameters within an INIT can be processed in any order. + if i.flags != 0 { + return errors.New("ChunkType of type INIT ACK flags must be all 0") + } + + if err := i.chunkInitCommon.unmarshal(i.raw); err != nil { + return errors.Wrap(err, "Failed to unmarshal INIT body") + } + + return nil +} + +func (i *chunkInitAck) marshal() ([]byte, error) { + initShared, err := i.chunkInitCommon.marshal() + if err != nil { + return nil, errors.Wrap(err, "Failed marshaling INIT common data") + } + + i.chunkHeader.typ = ctInitAck + i.chunkHeader.raw = initShared + return i.chunkHeader.marshal() +} + +func (i *chunkInitAck) check() (abort bool, err error) { + // The receiver of the INIT ACK records the value of the Initiate Tag + // parameter. This value MUST be placed into the Verification Tag + // field of every SCTP packet that the INIT ACK receiver transmits + // within this association. + // + // The Initiate Tag MUST NOT take the value 0. See Section 5.3.1 for + // more on the selection of the Initiate Tag value. + // + // If the value of the Initiate Tag in a received INIT ACK chunk is + // found to be 0, the receiver MUST destroy the association + // discarding its TCB. The receiver MAY send an ABORT for debugging + // purpose. + if i.initiateTag == 0 { + abort = true + return abort, errors.New("ChunkType of type INIT ACK InitiateTag must not be 0") + } + + // Defines the maximum number of streams the sender of this INIT ACK + // chunk allows the peer end to create in this association. The + // value 0 MUST NOT be used. + // + // Note: There is no negotiation of the actual number of streams but + // instead the two endpoints will use the min(requested, offered). + // See Section 5.1.1 for details. + // + // Note: A receiver of an INIT ACK with the MIS value set to 0 SHOULD + // destroy the association discarding its TCB. + if i.numInboundStreams == 0 { + abort = true + return abort, errors.New("INIT ACK inbound stream request must be > 0") + } + + // Defines the number of outbound streams the sender of this INIT ACK + // chunk wishes to create in this association. The value of 0 MUST + // NOT be used, and the value MUST NOT be greater than the MIS value + // sent in the INIT chunk. + // + // Note: A receiver of an INIT ACK with the OS value set to 0 SHOULD + // destroy the association discarding its TCB. + + if i.numOutboundStreams == 0 { + abort = true + return abort, errors.New("INIT ACK outbound stream request must be > 0") + } + + // An SCTP receiver MUST be able to receive a minimum of 1500 bytes in + // one SCTP packet. This means that an SCTP endpoint MUST NOT indicate + // less than 1500 bytes in its initial a_rwnd sent in the INIT or INIT + // ACK. + if i.advertisedReceiverWindowCredit < 1500 { + abort = true + return abort, errors.New("INIT ACK Advertised Receiver Window Credit (a_rwnd) must be >= 1500") + } + + return false, nil +} + +// String makes chunkInitAck printable +func (i *chunkInitAck) String() string { + return fmt.Sprintf("%s\n%s", i.chunkHeader, i.chunkInitCommon) +} diff --git a/vendor/github.com/pion/sctp/chunk_init_common.go b/vendor/github.com/pion/sctp/chunk_init_common.go new file mode 100644 index 0000000..f64be78 --- /dev/null +++ b/vendor/github.com/pion/sctp/chunk_init_common.go @@ -0,0 +1,155 @@ +package sctp + +import ( + "encoding/binary" + "fmt" + + "github.com/pkg/errors" +) + +/* +chunkInitCommon represents an SCTP Chunk body of type INIT and INIT ACK + + 0 1 2 3 + 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 ++-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ +| Type = 1 | Chunk Flags | Chunk Length | ++-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ +| Initiate Tag | ++-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ +| Advertised Receiver Window Credit (a_rwnd) | ++-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ +| Number of Outbound Streams | Number of Inbound Streams | ++-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ +| Initial TSN | ++-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ +| | +| Optional/Variable-Length Parameters | +| | ++-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ + +The INIT chunk contains the following parameters. Unless otherwise +noted, each parameter MUST only be included once in the INIT chunk. + +Fixed Parameters Status +---------------------------------------------- +Initiate Tag Mandatory +Advertised Receiver Window Credit Mandatory +Number of Outbound Streams Mandatory +Number of Inbound Streams Mandatory +Initial TSN Mandatory +*/ + +type chunkInitCommon struct { + initiateTag uint32 + advertisedReceiverWindowCredit uint32 + numOutboundStreams uint16 + numInboundStreams uint16 + initialTSN uint32 + params []param +} + +const ( + initChunkMinLength = 16 + initOptionalVarHeaderLength = 4 +) + +func (i *chunkInitCommon) unmarshal(raw []byte) error { + i.initiateTag = binary.BigEndian.Uint32(raw[0:]) + i.advertisedReceiverWindowCredit = binary.BigEndian.Uint32(raw[4:]) + i.numOutboundStreams = binary.BigEndian.Uint16(raw[8:]) + i.numInboundStreams = binary.BigEndian.Uint16(raw[10:]) + i.initialTSN = binary.BigEndian.Uint32(raw[12:]) + + // https://tools.ietf.org/html/rfc4960#section-3.2.1 + // + // Chunk values of SCTP control chunks consist of a chunk-type-specific + // header of required fields, followed by zero or more parameters. The + // optional and variable-length parameters contained in a chunk are + // defined in a Type-Length-Value format as shown below. + // + // 0 1 2 3 + // 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 + // +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ + // | Parameter Type | Parameter Length | + // +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ + // | | + // | Parameter Value | + // | | + // +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ + + offset := initChunkMinLength + remaining := len(raw) - offset + for remaining > 0 { + if remaining > initOptionalVarHeaderLength { + pType, err := parseParamType(raw[offset:]) + if err != nil { + return errors.Wrap(err, "failed to parse param type") + } + p, err := buildParam(pType, raw[offset:]) + if err != nil { + return errors.Wrap(err, "Failed unmarshalling param in Init Chunk") + } + i.params = append(i.params, p) + padding := getPadding(p.length()) + offset += p.length() + padding + remaining -= p.length() + padding + } else { + break + } + } + + return nil +} + +func (i *chunkInitCommon) marshal() ([]byte, error) { + out := make([]byte, initChunkMinLength) + binary.BigEndian.PutUint32(out[0:], i.initiateTag) + binary.BigEndian.PutUint32(out[4:], i.advertisedReceiverWindowCredit) + binary.BigEndian.PutUint16(out[8:], i.numOutboundStreams) + binary.BigEndian.PutUint16(out[10:], i.numInboundStreams) + binary.BigEndian.PutUint32(out[12:], i.initialTSN) + for idx, p := range i.params { + pp, err := p.marshal() + if err != nil { + return nil, errors.Wrap(err, "Unable to marshal parameter for INIT/INITACK") + } + + out = append(out, pp...) + + // Chunks (including Type, Length, and Value fields) are padded out + // by the sender with all zero bytes to be a multiple of 4 bytes + // long. This padding MUST NOT be more than 3 bytes in total. The + // Chunk Length value does not include terminating padding of the + // chunk. *However, it does include padding of any variable-length + // parameter except the last parameter in the chunk.* The receiver + // MUST ignore the padding. + if idx != len(i.params)-1 { + out = padByte(out, getPadding(len(pp))) + } + } + + return out, nil +} + +// String makes chunkInitCommon printable +func (i chunkInitCommon) String() string { + format := `initiateTag: %d + advertisedReceiverWindowCredit: %d + numOutboundStreams: %d + numInboundStreams: %d + initialTSN: %d` + + res := fmt.Sprintf(format, + i.initiateTag, + i.advertisedReceiverWindowCredit, + i.numOutboundStreams, + i.numInboundStreams, + i.initialTSN, + ) + + for i, param := range i.params { + res += fmt.Sprintf("Param %d:\n %s", i, param) + } + return res +} diff --git a/vendor/github.com/pion/sctp/chunk_payload_data.go b/vendor/github.com/pion/sctp/chunk_payload_data.go new file mode 100644 index 0000000..3e5e346 --- /dev/null +++ b/vendor/github.com/pion/sctp/chunk_payload_data.go @@ -0,0 +1,195 @@ +package sctp + +import ( + "encoding/binary" + "fmt" + "time" +) + +/* +chunkPayloadData represents an SCTP Chunk of type DATA + + 0 1 2 3 + 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 ++-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ +| Type = 0 | Reserved|U|B|E| Length | ++-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ +| TSN | ++-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ +| Stream Identifier S | Stream Sequence Number n | ++-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ +| Payload Protocol Identifier | ++-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ +| | +| User Data (seq n of Stream S) | +| | ++-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ + + +An unfragmented user message shall have both the B and E bits set to +'1'. Setting both B and E bits to '0' indicates a middle fragment of +a multi-fragment user message, as summarized in the following table: + B E Description +============================================================ +| 1 0 | First piece of a fragmented user message | ++----------------------------------------------------------+ +| 0 0 | Middle piece of a fragmented user message | ++----------------------------------------------------------+ +| 0 1 | Last piece of a fragmented user message | ++----------------------------------------------------------+ +| 1 1 | Unfragmented message | +============================================================ +| Table 1: Fragment Description Flags | +============================================================ +*/ +type chunkPayloadData struct { + chunkHeader + + unordered bool + beginningFragment bool + endingFragment bool + immediateSack bool + + tsn uint32 + streamIdentifier uint16 + streamSequenceNumber uint16 + payloadType PayloadProtocolIdentifier + userData []byte + + // Whether this data chunk was acknowledged (received by peer) + acked bool + missIndicator uint32 + + // Partial-reliability parameters used only by sender + since time.Time + nSent uint32 // number of transmission made for this chunk + _abandoned bool + _allInflight bool // valid only with the first fragment + + // Retransmission flag set when T1-RTX timeout occurred and this + // chunk is still in the inflight queue + retransmit bool + + head *chunkPayloadData // link to the head of the fragment +} + +const ( + payloadDataEndingFragmentBitmask = 1 + payloadDataBeginingFragmentBitmask = 2 + payloadDataUnorderedBitmask = 4 + payloadDataImmediateSACK = 8 + + payloadDataHeaderSize = 12 +) + +// PayloadProtocolIdentifier is an enum for DataChannel payload types +type PayloadProtocolIdentifier uint32 + +// PayloadProtocolIdentifier enums +// https://www.iana.org/assignments/sctp-parameters/sctp-parameters.xhtml#sctp-parameters-25 +const ( + PayloadTypeWebRTCDCEP PayloadProtocolIdentifier = 50 + PayloadTypeWebRTCString PayloadProtocolIdentifier = 51 + PayloadTypeWebRTCBinary PayloadProtocolIdentifier = 53 + PayloadTypeWebRTCStringEmpty PayloadProtocolIdentifier = 56 + PayloadTypeWebRTCBinaryEmpty PayloadProtocolIdentifier = 57 +) + +func (p PayloadProtocolIdentifier) String() string { + switch p { + case PayloadTypeWebRTCDCEP: + return "WebRTC DCEP" + case PayloadTypeWebRTCString: + return "WebRTC String" + case PayloadTypeWebRTCBinary: + return "WebRTC Binary" + case PayloadTypeWebRTCStringEmpty: + return "WebRTC String (Empty)" + case PayloadTypeWebRTCBinaryEmpty: + return "WebRTC Binary (Empty)" + default: + return fmt.Sprintf("Unknown Payload Protocol Identifier: %d", p) + } +} + +func (p *chunkPayloadData) unmarshal(raw []byte) error { + if err := p.chunkHeader.unmarshal(raw); err != nil { + return err + } + + p.immediateSack = p.flags&payloadDataImmediateSACK != 0 + p.unordered = p.flags&payloadDataUnorderedBitmask != 0 + p.beginningFragment = p.flags&payloadDataBeginingFragmentBitmask != 0 + p.endingFragment = p.flags&payloadDataEndingFragmentBitmask != 0 + + p.tsn = binary.BigEndian.Uint32(p.raw[0:]) + p.streamIdentifier = binary.BigEndian.Uint16(p.raw[4:]) + p.streamSequenceNumber = binary.BigEndian.Uint16(p.raw[6:]) + p.payloadType = PayloadProtocolIdentifier(binary.BigEndian.Uint32(p.raw[8:])) + p.userData = p.raw[payloadDataHeaderSize:] + + return nil +} + +func (p *chunkPayloadData) marshal() ([]byte, error) { + payRaw := make([]byte, payloadDataHeaderSize+len(p.userData)) + + binary.BigEndian.PutUint32(payRaw[0:], p.tsn) + binary.BigEndian.PutUint16(payRaw[4:], p.streamIdentifier) + binary.BigEndian.PutUint16(payRaw[6:], p.streamSequenceNumber) + binary.BigEndian.PutUint32(payRaw[8:], uint32(p.payloadType)) + copy(payRaw[payloadDataHeaderSize:], p.userData) + + flags := uint8(0) + if p.endingFragment { + flags = 1 + } + if p.beginningFragment { + flags |= 1 << 1 + } + if p.unordered { + flags |= 1 << 2 + } + if p.immediateSack { + flags |= 1 << 3 + } + + p.chunkHeader.flags = flags + p.chunkHeader.typ = ctPayloadData + p.chunkHeader.raw = payRaw + return p.chunkHeader.marshal() +} + +func (p *chunkPayloadData) check() (abort bool, err error) { + return false, nil +} + +// String makes chunkPayloadData printable +func (p *chunkPayloadData) String() string { + return fmt.Sprintf("%s\n%d", p.chunkHeader, p.tsn) +} + +func (p *chunkPayloadData) abandoned() bool { + if p.head != nil { + return p.head._abandoned && p.head._allInflight + } + return p._abandoned && p._allInflight +} + +func (p *chunkPayloadData) setAbandoned(abandoned bool) { + if p.head != nil { + p.head._abandoned = abandoned + return + } + p._abandoned = abandoned +} + +func (p *chunkPayloadData) setAllInflight() { + if p.endingFragment { + if p.head != nil { + p.head._allInflight = true + } else { + p._allInflight = true + } + } +} diff --git a/vendor/github.com/pion/sctp/chunk_reconfig.go b/vendor/github.com/pion/sctp/chunk_reconfig.go new file mode 100644 index 0000000..a53ad5e --- /dev/null +++ b/vendor/github.com/pion/sctp/chunk_reconfig.go @@ -0,0 +1,99 @@ +package sctp + +import ( + "fmt" + + "github.com/pkg/errors" +) + +// https://tools.ietf.org/html/rfc6525#section-3.1 +// chunkReconfig represents an SCTP Chunk used to reconfigure streams. +// +// 0 1 2 3 +// 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 +// +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ +// | Type = 130 | Chunk Flags | Chunk Length | +// +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ +// \ \ +// / Re-configuration Parameter / +// \ \ +// +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ +// \ \ +// / Re-configuration Parameter (optional) / +// \ \ +// +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ + +type chunkReconfig struct { + chunkHeader + paramA param + paramB param +} + +func (c *chunkReconfig) unmarshal(raw []byte) error { + if err := c.chunkHeader.unmarshal(raw); err != nil { + return err + } + pType, err := parseParamType(c.raw) + if err != nil { + return errors.Wrap(err, "failed to parse param type") + } + a, err := buildParam(pType, c.raw) + if err != nil { + return err + } + c.paramA = a + + padding := getPadding(a.length()) + offset := a.length() + padding + if len(c.raw) > offset { + pType, err := parseParamType(c.raw[offset:]) + if err != nil { + return errors.Wrap(err, "failed to parse param type") + } + b, err := buildParam(pType, c.raw[offset:]) + if err != nil { + return err + } + c.paramB = b + } + + return nil +} + +func (c *chunkReconfig) marshal() ([]byte, error) { + out, err := c.paramA.marshal() + if err != nil { + return nil, errors.Wrap(err, "Unable to marshal parameter A for reconfig") + } + if c.paramB != nil { + // Pad param A + out = padByte(out, getPadding(len(out))) + + outB, err := c.paramB.marshal() + if err != nil { + return nil, errors.Wrap(err, "Unable to marshal parameter B for reconfig") + } + + out = append(out, outB...) + } + + c.typ = ctReconfig + c.raw = out + return c.chunkHeader.marshal() +} + +func (c *chunkReconfig) check() (abort bool, err error) { + // nolint:godox + // TODO: check allowed combinations: + // https://tools.ietf.org/html/rfc6525#section-3.1 + return true, nil +} + +// String makes chunkReconfig printable +func (c *chunkReconfig) String() string { + res := fmt.Sprintf("Param A:\n %s", c.paramA) + if c.paramB != nil { + res += fmt.Sprintf("Param B:\n %s", c.paramB) + } + return res +} diff --git a/vendor/github.com/pion/sctp/chunk_selective_ack.go b/vendor/github.com/pion/sctp/chunk_selective_ack.go new file mode 100644 index 0000000..920d562 --- /dev/null +++ b/vendor/github.com/pion/sctp/chunk_selective_ack.go @@ -0,0 +1,142 @@ +package sctp + +import ( + "encoding/binary" + "fmt" + + "github.com/pkg/errors" +) + +/* +chunkSelectiveAck represents an SCTP Chunk of type SACK + +This chunk is sent to the peer endpoint to acknowledge received DATA +chunks and to inform the peer endpoint of gaps in the received +subsequences of DATA chunks as represented by their TSNs. +0 1 2 3 +0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 ++-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ +| Type = 3 |Chunk Flags | Chunk Length | ++-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ +| Cumulative TSN Ack | ++-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ +| Advertised Receiver Window Credit (a_rwnd) | ++-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ +| Number of Gap Ack Blocks = N | Number of Duplicate TSNs = X | ++-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ +| Gap Ack Block #1 Start | Gap Ack Block #1 End | ++-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ +/ / +\ ... \ +/ / ++-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ +| Gap Ack Block #N Start | Gap Ack Block #N End | ++-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ +| Duplicate TSN 1 | ++-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ +/ / +\ ... \ +/ / ++-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ +| Duplicate TSN X | ++-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ +*/ + +type gapAckBlock struct { + start uint16 + end uint16 +} + +// String makes gapAckBlock printable +func (g gapAckBlock) String() string { + return fmt.Sprintf("%d - %d", g.start, g.end) +} + +type chunkSelectiveAck struct { + chunkHeader + cumulativeTSNAck uint32 + advertisedReceiverWindowCredit uint32 + gapAckBlocks []gapAckBlock + duplicateTSN []uint32 +} + +const ( + selectiveAckHeaderSize = 12 +) + +func (s *chunkSelectiveAck) unmarshal(raw []byte) error { + if err := s.chunkHeader.unmarshal(raw); err != nil { + return err + } + + if s.typ != ctSack { + return errors.Errorf("ChunkType is not of type SACK, actually is %s", s.typ.String()) + } + + if len(s.raw) < selectiveAckHeaderSize { + return errors.Errorf("SACK Chunk size is not large enough to contain header (%v remaining, needs %v bytes)", + len(s.raw), selectiveAckHeaderSize) + } + + s.cumulativeTSNAck = binary.BigEndian.Uint32(s.raw[0:]) + s.advertisedReceiverWindowCredit = binary.BigEndian.Uint32(s.raw[4:]) + s.gapAckBlocks = make([]gapAckBlock, binary.BigEndian.Uint16(s.raw[8:])) + s.duplicateTSN = make([]uint32, binary.BigEndian.Uint16(s.raw[10:])) + + if len(s.raw) != selectiveAckHeaderSize+(4*len(s.gapAckBlocks)+(4*len(s.duplicateTSN))) { + return errors.Errorf("SACK Chunk size does not match predicted amount from header values") + } + + offset := selectiveAckHeaderSize + for i := range s.gapAckBlocks { + s.gapAckBlocks[i].start = binary.BigEndian.Uint16(s.raw[offset:]) + s.gapAckBlocks[i].end = binary.BigEndian.Uint16(s.raw[offset+2:]) + offset += 4 + } + for i := range s.duplicateTSN { + s.duplicateTSN[i] = binary.BigEndian.Uint32(s.raw[offset:]) + offset += 4 + } + + return nil +} + +func (s *chunkSelectiveAck) marshal() ([]byte, error) { + sackRaw := make([]byte, selectiveAckHeaderSize+(4*len(s.gapAckBlocks)+(4*len(s.duplicateTSN)))) + binary.BigEndian.PutUint32(sackRaw[0:], s.cumulativeTSNAck) + binary.BigEndian.PutUint32(sackRaw[4:], s.advertisedReceiverWindowCredit) + binary.BigEndian.PutUint16(sackRaw[8:], uint16(len(s.gapAckBlocks))) + binary.BigEndian.PutUint16(sackRaw[10:], uint16(len(s.duplicateTSN))) + offset := selectiveAckHeaderSize + for _, g := range s.gapAckBlocks { + binary.BigEndian.PutUint16(sackRaw[offset:], g.start) + binary.BigEndian.PutUint16(sackRaw[offset+2:], g.end) + offset += 4 + } + for _, t := range s.duplicateTSN { + binary.BigEndian.PutUint32(sackRaw[offset:], t) + offset += 4 + } + + s.chunkHeader.typ = ctSack + s.chunkHeader.raw = sackRaw + return s.chunkHeader.marshal() +} + +func (s *chunkSelectiveAck) check() (abort bool, err error) { + return false, nil +} + +// String makes chunkSelectiveAck printable +func (s *chunkSelectiveAck) String() string { + res := fmt.Sprintf("SACK cumTsnAck=%d arwnd=%d dupTsn=%d", + s.cumulativeTSNAck, + s.advertisedReceiverWindowCredit, + s.duplicateTSN) + + for _, gap := range s.gapAckBlocks { + res = fmt.Sprintf("%s\n gap ack: %s", res, gap) + } + + return res +} diff --git a/vendor/github.com/pion/sctp/chunkheader.go b/vendor/github.com/pion/sctp/chunkheader.go new file mode 100644 index 0000000..a4a78db --- /dev/null +++ b/vendor/github.com/pion/sctp/chunkheader.go @@ -0,0 +1,90 @@ +package sctp + +import ( + "encoding/binary" + + "github.com/pkg/errors" +) + +/* +chunkHeader represents a SCTP Chunk header, defined in https://tools.ietf.org/html/rfc4960#section-3.2 +The figure below illustrates the field format for the chunks to be +transmitted in the SCTP packet. Each chunk is formatted with a Chunk +Type field, a chunk-specific Flag field, a Chunk Length field, and a +Value field. + + 0 1 2 3 + 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 ++-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ +| Chunk Type | Chunk Flags | Chunk Length | ++-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ +| | +| Chunk Value | +| | ++-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ +*/ +type chunkHeader struct { + typ chunkType + flags byte + raw []byte +} + +const ( + chunkHeaderSize = 4 +) + +func (c *chunkHeader) unmarshal(raw []byte) error { + if len(raw) < chunkHeaderSize { + return errors.Errorf("raw only %d bytes, %d is the minimum length for a SCTP chunk", len(raw), chunkHeaderSize) + } + + c.typ = chunkType(raw[0]) + c.flags = raw[1] + length := binary.BigEndian.Uint16(raw[2:]) + + // Length includes Chunk header + valueLength := int(length - chunkHeaderSize) + lengthAfterValue := len(raw) - (chunkHeaderSize + valueLength) + + if lengthAfterValue < 0 { + return errors.Errorf("Not enough data left in SCTP packet to satisfy requested length remain %d req %d ", valueLength, len(raw)-chunkHeaderSize) + } else if lengthAfterValue < 4 { + // https://tools.ietf.org/html/rfc4960#section-3.2 + // The Chunk Length field does not count any chunk padding. + // Chunks (including Type, Length, and Value fields) are padded out + // by the sender with all zero bytes to be a multiple of 4 bytes + // long. This padding MUST NOT be more than 3 bytes in total. The + // Chunk Length value does not include terminating padding of the + // chunk. However, it does include padding of any variable-length + // parameter except the last parameter in the chunk. The receiver + // MUST ignore the padding. + for i := lengthAfterValue; i > 0; i-- { + paddingOffset := chunkHeaderSize + valueLength + (i - 1) + if raw[paddingOffset] != 0 { + return errors.Errorf("Chunk padding is non-zero at offset %d ", paddingOffset) + } + } + } + + c.raw = raw[chunkHeaderSize : chunkHeaderSize+valueLength] + return nil +} + +func (c *chunkHeader) marshal() ([]byte, error) { + raw := make([]byte, 4+len(c.raw)) + + raw[0] = uint8(c.typ) + raw[1] = c.flags + binary.BigEndian.PutUint16(raw[2:], uint16(len(c.raw)+chunkHeaderSize)) + copy(raw[4:], c.raw) + return raw, nil +} + +func (c *chunkHeader) valueLength() int { + return len(c.raw) +} + +// String makes chunkHeader printable +func (c chunkHeader) String() string { + return c.typ.String() +} diff --git a/vendor/github.com/pion/sctp/chunktype.go b/vendor/github.com/pion/sctp/chunktype.go new file mode 100644 index 0000000..65b57a4 --- /dev/null +++ b/vendor/github.com/pion/sctp/chunktype.go @@ -0,0 +1,67 @@ +package sctp + +import "fmt" + +// chunkType is an enum for SCTP Chunk Type field +// This field identifies the type of information contained in the +// Chunk Value field. +type chunkType uint8 + +// List of known chunkType enums +const ( + ctPayloadData chunkType = 0 + ctInit chunkType = 1 + ctInitAck chunkType = 2 + ctSack chunkType = 3 + ctHeartbeat chunkType = 4 + ctHeartbeatAck chunkType = 5 + ctAbort chunkType = 6 + ctShutdown chunkType = 7 + ctShutdownAck chunkType = 8 + ctError chunkType = 9 + ctCookieEcho chunkType = 10 + ctCookieAck chunkType = 11 + ctCWR chunkType = 13 + ctShutdownComplete chunkType = 14 + ctReconfig chunkType = 130 + ctForwardTSN chunkType = 192 +) + +func (c chunkType) String() string { + switch c { + case ctPayloadData: + return "DATA" + case ctInit: + return "INIT" + case ctInitAck: + return "INIT-ACK" + case ctSack: + return "SACK" + case ctHeartbeat: + return "HEARTBEAT" + case ctHeartbeatAck: + return "HEARTBEAT-ACK" + case ctAbort: + return "ABORT" + case ctShutdown: + return "SHUTDOWN" + case ctShutdownAck: + return "SHUTDOWN-ACK" + case ctError: + return "ERROR" + case ctCookieEcho: + return "COOKIE-ECHO" + case ctCookieAck: + return "COOKIE-ACK" + case ctCWR: + return "ECNE" // Explicit Congestion Notification Echo + case ctShutdownComplete: + return "SHUTDOWN-COMPLETE" + case ctReconfig: + return "RECONFIG" // Re-configuration + case ctForwardTSN: + return "FORWARD-TSN" + default: + return fmt.Sprintf("Unknown ChunkType: %d", c) + } +} diff --git a/vendor/github.com/pion/sctp/codecov.yml b/vendor/github.com/pion/sctp/codecov.yml new file mode 100644 index 0000000..085200a --- /dev/null +++ b/vendor/github.com/pion/sctp/codecov.yml @@ -0,0 +1,20 @@ +# +# DO NOT EDIT THIS FILE +# +# It is automatically copied from https://github.com/pion/.goassets repository. +# + +coverage: + status: + project: + default: + # Allow decreasing 2% of total coverage to avoid noise. + threshold: 2% + patch: + default: + target: 70% + only_pulls: true + +ignore: + - "examples/*" + - "examples/**/*" diff --git a/vendor/github.com/pion/sctp/control_queue.go b/vendor/github.com/pion/sctp/control_queue.go new file mode 100644 index 0000000..7e13469 --- /dev/null +++ b/vendor/github.com/pion/sctp/control_queue.go @@ -0,0 +1,29 @@ +package sctp + +// control queue + +type controlQueue struct { + queue []*packet +} + +func newControlQueue() *controlQueue { + return &controlQueue{queue: []*packet{}} +} + +func (q *controlQueue) push(c *packet) { + q.queue = append(q.queue, c) +} + +func (q *controlQueue) pushAll(packets []*packet) { + q.queue = append(q.queue, packets...) +} + +func (q *controlQueue) popAll() []*packet { + packets := q.queue + q.queue = []*packet{} + return packets +} + +func (q *controlQueue) size() int { + return len(q.queue) +} diff --git a/vendor/github.com/pion/sctp/error_cause.go b/vendor/github.com/pion/sctp/error_cause.go new file mode 100644 index 0000000..e94cc5c --- /dev/null +++ b/vendor/github.com/pion/sctp/error_cause.go @@ -0,0 +1,91 @@ +package sctp + +import ( + "encoding/binary" + "fmt" + + "github.com/pkg/errors" +) + +// errorCauseCode is a cause code that appears in either a ERROR or ABORT chunk +type errorCauseCode uint16 + +type errorCause interface { + unmarshal([]byte) error + marshal() ([]byte, error) + length() uint16 + String() string + + errorCauseCode() errorCauseCode +} + +// buildErrorCause delegates the building of a error cause from raw bytes to the correct structure +func buildErrorCause(raw []byte) (errorCause, error) { + var e errorCause + + c := errorCauseCode(binary.BigEndian.Uint16(raw[0:])) + switch c { + case invalidMandatoryParameter: + e = &errorCauseInvalidMandatoryParameter{} + case unrecognizedChunkType: + e = &errorCauseUnrecognizedChunkType{} + case protocolViolation: + e = &errorCauseProtocolViolation{} + default: + return nil, errors.Errorf("BuildErrorCause does not handle %s", c.String()) + } + + if err := e.unmarshal(raw); err != nil { + return nil, err + } + return e, nil +} + +const ( + invalidStreamIdentifier errorCauseCode = 1 + missingMandatoryParameter errorCauseCode = 2 + staleCookieError errorCauseCode = 3 + outOfResource errorCauseCode = 4 + unresolvableAddress errorCauseCode = 5 + unrecognizedChunkType errorCauseCode = 6 + invalidMandatoryParameter errorCauseCode = 7 + unrecognizedParameters errorCauseCode = 8 + noUserData errorCauseCode = 9 + cookieReceivedWhileShuttingDown errorCauseCode = 10 + restartOfAnAssociationWithNewAddresses errorCauseCode = 11 + userInitiatedAbort errorCauseCode = 12 + protocolViolation errorCauseCode = 13 +) + +func (e errorCauseCode) String() string { + switch e { + case invalidStreamIdentifier: + return "Invalid Stream Identifier" + case missingMandatoryParameter: + return "Missing Mandatory Parameter" + case staleCookieError: + return "Stale Cookie Error" + case outOfResource: + return "Out Of Resource" + case unresolvableAddress: + return "Unresolvable IP" + case unrecognizedChunkType: + return "Unrecognized Chunk Type" + case invalidMandatoryParameter: + return "Invalid Mandatory Parameter" + case unrecognizedParameters: + return "Unrecognized Parameters" + case noUserData: + return "No User Data" + case cookieReceivedWhileShuttingDown: + return "Cookie Received While Shutting Down" + case restartOfAnAssociationWithNewAddresses: + return "Restart Of An Association With New Addresses" + case userInitiatedAbort: + return "User Initiated Abort" + case protocolViolation: + return "Protocol Violation" + default: + return fmt.Sprintf("Unknown CauseCode: %d", e) + } +} diff --git a/vendor/github.com/pion/sctp/error_cause_header.go b/vendor/github.com/pion/sctp/error_cause_header.go new file mode 100644 index 0000000..1ad6e1d --- /dev/null +++ b/vendor/github.com/pion/sctp/error_cause_header.go @@ -0,0 +1,47 @@ +package sctp + +import ( + "encoding/binary" +) + +// errorCauseHeader represents the shared header that is shared by all error causes +type errorCauseHeader struct { + code errorCauseCode + len uint16 + raw []byte +} + +const ( + errorCauseHeaderLength = 4 +) + +func (e *errorCauseHeader) marshal() ([]byte, error) { + e.len = uint16(len(e.raw)) + uint16(errorCauseHeaderLength) + raw := make([]byte, e.len) + binary.BigEndian.PutUint16(raw[0:], uint16(e.code)) + binary.BigEndian.PutUint16(raw[2:], e.len) + copy(raw[errorCauseHeaderLength:], e.raw) + + return raw, nil +} + +func (e *errorCauseHeader) unmarshal(raw []byte) error { + e.code = errorCauseCode(binary.BigEndian.Uint16(raw[0:])) + e.len = binary.BigEndian.Uint16(raw[2:]) + valueLength := e.len - errorCauseHeaderLength + e.raw = raw[errorCauseHeaderLength : errorCauseHeaderLength+valueLength] + return nil +} + +func (e *errorCauseHeader) length() uint16 { + return e.len +} + +func (e *errorCauseHeader) errorCauseCode() errorCauseCode { + return e.code +} + +// String makes errorCauseHeader printable +func (e errorCauseHeader) String() string { + return e.code.String() +} diff --git a/vendor/github.com/pion/sctp/error_cause_invalid_mandatory_parameter.go b/vendor/github.com/pion/sctp/error_cause_invalid_mandatory_parameter.go new file mode 100644 index 0000000..3da8b47 --- /dev/null +++ b/vendor/github.com/pion/sctp/error_cause_invalid_mandatory_parameter.go @@ -0,0 +1,19 @@ +package sctp + +// errorCauseInvalidMandatoryParameter represents an SCTP error cause +type errorCauseInvalidMandatoryParameter struct { + errorCauseHeader +} + +func (e *errorCauseInvalidMandatoryParameter) marshal() ([]byte, error) { + return e.errorCauseHeader.marshal() +} + +func (e *errorCauseInvalidMandatoryParameter) unmarshal(raw []byte) error { + return e.errorCauseHeader.unmarshal(raw) +} + +// String makes errorCauseInvalidMandatoryParameter printable +func (e *errorCauseInvalidMandatoryParameter) String() string { + return e.errorCauseHeader.String() +} diff --git a/vendor/github.com/pion/sctp/error_cause_protocol_violation.go b/vendor/github.com/pion/sctp/error_cause_protocol_violation.go new file mode 100644 index 0000000..8b457f4 --- /dev/null +++ b/vendor/github.com/pion/sctp/error_cause_protocol_violation.go @@ -0,0 +1,50 @@ +package sctp + +import ( + "fmt" + + "github.com/pkg/errors" +) + +/* + This error cause MAY be included in ABORT chunks that are sent + because an SCTP endpoint detects a protocol violation of the peer + that is not covered by the error causes described in Section 3.3.10.1 + to Section 3.3.10.12. An implementation MAY provide additional + information specifying what kind of protocol violation has been + detected. + + 0 1 2 3 + 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 + +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ + | Cause Code=13 | Cause Length=Variable | + +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ + / Additional Information / + \ \ + +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ +*/ +type errorCauseProtocolViolation struct { + errorCauseHeader + additionalInformation []byte +} + +func (e *errorCauseProtocolViolation) marshal() ([]byte, error) { + e.raw = e.additionalInformation + return e.errorCauseHeader.marshal() +} + +func (e *errorCauseProtocolViolation) unmarshal(raw []byte) error { + err := e.errorCauseHeader.unmarshal(raw) + if err != nil { + return errors.Wrap(err, "Unable to unmarshal Protocol Violation error") + } + + e.additionalInformation = e.raw + + return nil +} + +// String makes errorCauseProtocolViolation printable +func (e *errorCauseProtocolViolation) String() string { + return fmt.Sprintf("%s: %s", e.errorCauseHeader, e.additionalInformation) +} diff --git a/vendor/github.com/pion/sctp/error_cause_unrecognized_chunk_type.go b/vendor/github.com/pion/sctp/error_cause_unrecognized_chunk_type.go new file mode 100644 index 0000000..fee9a36 --- /dev/null +++ b/vendor/github.com/pion/sctp/error_cause_unrecognized_chunk_type.go @@ -0,0 +1,28 @@ +package sctp + +// errorCauseUnrecognizedChunkType represents an SCTP error cause +type errorCauseUnrecognizedChunkType struct { + errorCauseHeader + unrecognizedChunk []byte +} + +func (e *errorCauseUnrecognizedChunkType) marshal() ([]byte, error) { + e.code = unrecognizedChunkType + e.errorCauseHeader.raw = e.unrecognizedChunk + return e.errorCauseHeader.marshal() +} + +func (e *errorCauseUnrecognizedChunkType) unmarshal(raw []byte) error { + err := e.errorCauseHeader.unmarshal(raw) + if err != nil { + return err + } + + e.unrecognizedChunk = e.errorCauseHeader.raw + return nil +} + +// String makes errorCauseUnrecognizedChunkType printable +func (e *errorCauseUnrecognizedChunkType) String() string { + return e.errorCauseHeader.String() +} diff --git a/vendor/github.com/pion/sctp/go.mod b/vendor/github.com/pion/sctp/go.mod new file mode 100644 index 0000000..022e2fc --- /dev/null +++ b/vendor/github.com/pion/sctp/go.mod @@ -0,0 +1,14 @@ +module github.com/pion/sctp + +require ( + github.com/davecgh/go-spew v1.1.1 // indirect + github.com/kr/pretty v0.1.0 // indirect + github.com/pion/logging v0.2.2 + github.com/pion/randutil v0.1.0 + github.com/pion/transport v0.10.1 + github.com/pkg/errors v0.9.1 + github.com/stretchr/testify v1.6.1 + gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15 // indirect +) + +go 1.13 diff --git a/vendor/github.com/pion/sctp/go.sum b/vendor/github.com/pion/sctp/go.sum new file mode 100644 index 0000000..3744262 --- /dev/null +++ b/vendor/github.com/pion/sctp/go.sum @@ -0,0 +1,36 @@ +github.com/davecgh/go-spew v1.1.0 h1:ZDRjVQ15GmhC3fiQ8ni8+OwkZQO4DARzQgrnXU1Liz8= +github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= +github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/kr/pretty v0.1.0 h1:L/CwN0zerZDmRFUapSPitk6f+Q3+0za1rQkzVuMiMFI= +github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo= +github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ= +github.com/kr/text v0.1.0 h1:45sCR5RtlFHMR4UwH9sdQ5TC8v0qDQCHnXt+kaKSTVE= +github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI= +github.com/pion/logging v0.2.2 h1:M9+AIj/+pxNsDfAT64+MAVgJO0rsyLnoJKCqf//DoeY= +github.com/pion/logging v0.2.2/go.mod h1:k0/tDVsRCX2Mb2ZEmTqNa7CWsQPc+YYCB7Q+5pahoms= +github.com/pion/randutil v0.1.0 h1:CFG1UdESneORglEsnimhUjf33Rwjubwj6xfiOXBa3mA= +github.com/pion/randutil v0.1.0/go.mod h1:XcJrSMMbbMRhASFVOlj/5hQial/Y8oH/HVo7TBZq+j8= +github.com/pion/transport v0.10.1 h1:2W+yJT+0mOQ160ThZYUx5Zp2skzshiNgxrNE9GUfhJM= +github.com/pion/transport v0.10.1/go.mod h1:PBis1stIILMiis0PewDw91WJeLJkyIMcEk+DwKOzf4A= +github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4= +github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= +github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= +github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= +github.com/stretchr/testify v1.6.1 h1:hDPOHmpOpP40lSULcqw7IrRb/u7w6RpDC9399XyoNd0= +github.com/stretchr/testify v1.6.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= +golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= +golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= +golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg= +golang.org/x/net v0.0.0-20200625001655-4c5254603344 h1:vGXIOMxbNfDTk/aXCmfdLgkrSV+Z2tcbze+pEc3v5W4= +golang.org/x/net v0.0.0-20200625001655-4c5254603344/go.mod h1:/O7V0waA8r7cgGh81Ro3o1hOxt32SMVPicZroKQ2sZA= +golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= +golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20200323222414-85ca7c5b95cd/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= +gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15 h1:YR8cESwS4TdDjEe65xsg0ogRM/Nc3DYOhEAlW+xobZo= +gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c h1:dUUwHk2QECo/6vqA44rthZ8ie2QXMNeKRTHCNY2nXvo= +gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= diff --git a/vendor/github.com/pion/sctp/packet.go b/vendor/github.com/pion/sctp/packet.go new file mode 100644 index 0000000..dcc650b --- /dev/null +++ b/vendor/github.com/pion/sctp/packet.go @@ -0,0 +1,178 @@ +package sctp + +import ( + "encoding/binary" + "fmt" + "hash/crc32" + + "github.com/pkg/errors" +) + +// Create the crc32 table we'll use for the checksum +var castagnoliTable = crc32.MakeTable(crc32.Castagnoli) // nolint:gochecknoglobals + +// Allocate and zero this data once. +// We need to use it for the checksum and don't want to allocate/clear each time. +var fourZeroes [4]byte // nolint:gochecknoglobals + +/* +Packet represents an SCTP packet, defined in https://tools.ietf.org/html/rfc4960#section-3 +An SCTP packet is composed of a common header and chunks. A chunk +contains either control information or user data. + + + SCTP Packet Format + 0 1 2 3 + 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 ++-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ +| Common Header | ++-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ +| Chunk #1 | ++-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ +| ... | ++-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ +| Chunk #n | ++-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ + + + SCTP Common Header Format + + 0 1 2 3 + 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 ++-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ +| Source Value Number | Destination Value Number | ++-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ +| Verification Tag | ++-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ +| Checksum | ++-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ + + +*/ +type packet struct { + sourcePort uint16 + destinationPort uint16 + verificationTag uint32 + chunks []chunk +} + +const ( + packetHeaderSize = 12 +) + +func (p *packet) unmarshal(raw []byte) error { + if len(raw) < packetHeaderSize { + return errors.Errorf("raw only %d bytes, %d is the minimum length for a SCTP packet", len(raw), packetHeaderSize) + } + + p.sourcePort = binary.BigEndian.Uint16(raw[0:]) + p.destinationPort = binary.BigEndian.Uint16(raw[2:]) + p.verificationTag = binary.BigEndian.Uint32(raw[4:]) + + offset := packetHeaderSize + for { + // Exact match, no more chunks + if offset == len(raw) { + break + } else if offset+chunkHeaderSize > len(raw) { + return errors.Errorf("Unable to parse SCTP chunk, not enough data for complete header: offset %d remaining %d", offset, len(raw)) + } + + var c chunk + switch chunkType(raw[offset]) { + case ctInit: + c = &chunkInit{} + case ctInitAck: + c = &chunkInitAck{} + case ctAbort: + c = &chunkAbort{} + case ctCookieEcho: + c = &chunkCookieEcho{} + case ctCookieAck: + c = &chunkCookieAck{} + case ctHeartbeat: + c = &chunkHeartbeat{} + case ctPayloadData: + c = &chunkPayloadData{} + case ctSack: + c = &chunkSelectiveAck{} + case ctReconfig: + c = &chunkReconfig{} + case ctForwardTSN: + c = &chunkForwardTSN{} + case ctError: + c = &chunkError{} + default: + return errors.Errorf("Failed to unmarshal, contains unknown chunk type %s", chunkType(raw[offset]).String()) + } + + if err := c.unmarshal(raw[offset:]); err != nil { + return err + } + + p.chunks = append(p.chunks, c) + chunkValuePadding := getPadding(c.valueLength()) + offset += chunkHeaderSize + c.valueLength() + chunkValuePadding + } + theirChecksum := binary.LittleEndian.Uint32(raw[8:]) + ourChecksum := generatePacketChecksum(raw) + if theirChecksum != ourChecksum { + return errors.Errorf("Checksum mismatch theirs: %d ours: %d", theirChecksum, ourChecksum) + } + return nil +} + +func (p *packet) marshal() ([]byte, error) { + raw := make([]byte, packetHeaderSize) + + // Populate static headers + // 8-12 is Checksum which will be populated when packet is complete + binary.BigEndian.PutUint16(raw[0:], p.sourcePort) + binary.BigEndian.PutUint16(raw[2:], p.destinationPort) + binary.BigEndian.PutUint32(raw[4:], p.verificationTag) + + // Populate chunks + for _, c := range p.chunks { + chunkRaw, err := c.marshal() + if err != nil { + return nil, err + } + raw = append(raw, chunkRaw...) + + paddingNeeded := getPadding(len(raw)) + if paddingNeeded != 0 { + raw = append(raw, make([]byte, paddingNeeded)...) + } + } + + // Checksum is already in BigEndian + // Using LittleEndian.PutUint32 stops it from being flipped + binary.LittleEndian.PutUint32(raw[8:], generatePacketChecksum(raw)) + return raw, nil +} + +func generatePacketChecksum(raw []byte) (sum uint32) { + // Fastest way to do a crc32 without allocating. + sum = crc32.Update(sum, castagnoliTable, raw[0:8]) + sum = crc32.Update(sum, castagnoliTable, fourZeroes[:]) + sum = crc32.Update(sum, castagnoliTable, raw[12:]) + return sum +} + +// String makes packet printable +func (p *packet) String() string { + format := `Packet: + sourcePort: %d + destinationPort: %d + verificationTag: %d + ` + res := fmt.Sprintf(format, + p.sourcePort, + p.destinationPort, + p.verificationTag, + ) + for i, chunk := range p.chunks { + res += fmt.Sprintf("Chunk %d:\n %s", i, chunk) + } + return res +} diff --git a/vendor/github.com/pion/sctp/param.go b/vendor/github.com/pion/sctp/param.go new file mode 100644 index 0000000..08e46d1 --- /dev/null +++ b/vendor/github.com/pion/sctp/param.go @@ -0,0 +1,35 @@ +package sctp + +import ( + "github.com/pkg/errors" +) + +type param interface { + marshal() ([]byte, error) + length() int +} + +func buildParam(t paramType, rawParam []byte) (param, error) { + switch t { + case forwardTSNSupp: + return (¶mForwardTSNSupported{}).unmarshal(rawParam) + case supportedExt: + return (¶mSupportedExtensions{}).unmarshal(rawParam) + case random: + return (¶mRandom{}).unmarshal(rawParam) + case reqHMACAlgo: + return (¶mRequestedHMACAlgorithm{}).unmarshal(rawParam) + case chunkList: + return (¶mChunkList{}).unmarshal(rawParam) + case stateCookie: + return (¶mStateCookie{}).unmarshal(rawParam) + case heartbeatInfo: + return (¶mHeartbeatInfo{}).unmarshal(rawParam) + case outSSNResetReq: + return (¶mOutgoingResetRequest{}).unmarshal(rawParam) + case reconfigResp: + return (¶mReconfigResponse{}).unmarshal(rawParam) + default: + return nil, errors.Errorf("Unhandled ParamType %v", t) + } +} diff --git a/vendor/github.com/pion/sctp/param_chunk_list.go b/vendor/github.com/pion/sctp/param_chunk_list.go new file mode 100644 index 0000000..4ea484c --- /dev/null +++ b/vendor/github.com/pion/sctp/param_chunk_list.go @@ -0,0 +1,28 @@ +package sctp + +type paramChunkList struct { + paramHeader + chunkTypes []chunkType +} + +func (c *paramChunkList) marshal() ([]byte, error) { + c.typ = chunkList + c.raw = make([]byte, len(c.chunkTypes)) + for i, t := range c.chunkTypes { + c.raw[i] = byte(t) + } + + return c.paramHeader.marshal() +} + +func (c *paramChunkList) unmarshal(raw []byte) (param, error) { + err := c.paramHeader.unmarshal(raw) + if err != nil { + return nil, err + } + for _, t := range c.raw { + c.chunkTypes = append(c.chunkTypes, chunkType(t)) + } + + return c, nil +} diff --git a/vendor/github.com/pion/sctp/param_forward_tsn_supported.go b/vendor/github.com/pion/sctp/param_forward_tsn_supported.go new file mode 100644 index 0000000..62de155 --- /dev/null +++ b/vendor/github.com/pion/sctp/param_forward_tsn_supported.go @@ -0,0 +1,28 @@ +package sctp + +// At the initialization of the association, the sender of the INIT or +// INIT ACK chunk MAY include this OPTIONAL parameter to inform its peer +// that it is able to support the Forward TSN chunk +// +// 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 +// +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ +// | Parameter Type = 49152 | Parameter Length = 4 | +// +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ + +type paramForwardTSNSupported struct { + paramHeader +} + +func (f *paramForwardTSNSupported) marshal() ([]byte, error) { + f.typ = forwardTSNSupp + f.raw = []byte{} + return f.paramHeader.marshal() +} + +func (f *paramForwardTSNSupported) unmarshal(raw []byte) (param, error) { + err := f.paramHeader.unmarshal(raw) + if err != nil { + return nil, err + } + return f, nil +} diff --git a/vendor/github.com/pion/sctp/param_heartbeat_info.go b/vendor/github.com/pion/sctp/param_heartbeat_info.go new file mode 100644 index 0000000..47f64eb --- /dev/null +++ b/vendor/github.com/pion/sctp/param_heartbeat_info.go @@ -0,0 +1,21 @@ +package sctp + +type paramHeartbeatInfo struct { + paramHeader + heartbeatInformation []byte +} + +func (h *paramHeartbeatInfo) marshal() ([]byte, error) { + h.typ = heartbeatInfo + h.raw = h.heartbeatInformation + return h.paramHeader.marshal() +} + +func (h *paramHeartbeatInfo) unmarshal(raw []byte) (param, error) { + err := h.paramHeader.unmarshal(raw) + if err != nil { + return nil, err + } + h.heartbeatInformation = h.raw + return h, nil +} diff --git a/vendor/github.com/pion/sctp/param_outgoing_reset_request.go b/vendor/github.com/pion/sctp/param_outgoing_reset_request.go new file mode 100644 index 0000000..ceae178 --- /dev/null +++ b/vendor/github.com/pion/sctp/param_outgoing_reset_request.go @@ -0,0 +1,88 @@ +package sctp + +import ( + "encoding/binary" + "errors" +) + +const ( + paramOutgoingResetRequestStreamIdentifiersOffset = 12 +) + +// This parameter is used by the sender to request the reset of some or +// all outgoing streams. +// 0 1 2 3 +// 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 +// +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ +// | Parameter Type = 13 | Parameter Length = 16 + 2 * N | +// +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ +// | Re-configuration Request Sequence Number | +// +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ +// | Re-configuration Response Sequence Number | +// +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ +// | Sender's Last Assigned TSN | +// +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ +// | Stream Number 1 (optional) | Stream Number 2 (optional) | +// +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ +// / ...... / +// +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ +// | Stream Number N-1 (optional) | Stream Number N (optional) | +// +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ + +type paramOutgoingResetRequest struct { + paramHeader + // reconfigRequestSequenceNumber is used to identify the request. It is a monotonically + // increasing number that is initialized to the same value as the + // initial TSN. It is increased by 1 whenever sending a new Re- + // configuration Request Parameter. + reconfigRequestSequenceNumber uint32 + // When this Outgoing SSN Reset Request Parameter is sent in response + // to an Incoming SSN Reset Request Parameter, this parameter is also + // an implicit response to the incoming request. This field then + // holds the Re-configuration Request Sequence Number of the incoming + // request. In other cases, it holds the next expected + // Re-configuration Request Sequence Number minus 1. + reconfigResponseSequenceNumber uint32 + // This value holds the next TSN minus 1 -- in other words, the last + // TSN that this sender assigned. + senderLastTSN uint32 + // This optional field, if included, is used to indicate specific + // streams that are to be reset. If no streams are listed, then all + // streams are to be reset. + streamIdentifiers []uint16 +} + +var errSSNResetRequestParamTooShort = errors.New("outgoing SSN reset request parameter too short") + +func (r *paramOutgoingResetRequest) marshal() ([]byte, error) { + r.typ = outSSNResetReq + r.raw = make([]byte, paramOutgoingResetRequestStreamIdentifiersOffset+2*len(r.streamIdentifiers)) + binary.BigEndian.PutUint32(r.raw, r.reconfigRequestSequenceNumber) + binary.BigEndian.PutUint32(r.raw[4:], r.reconfigResponseSequenceNumber) + binary.BigEndian.PutUint32(r.raw[8:], r.senderLastTSN) + for i, sID := range r.streamIdentifiers { + binary.BigEndian.PutUint16(r.raw[paramOutgoingResetRequestStreamIdentifiersOffset+2*i:], sID) + } + return r.paramHeader.marshal() +} + +func (r *paramOutgoingResetRequest) unmarshal(raw []byte) (param, error) { + err := r.paramHeader.unmarshal(raw) + if err != nil { + return nil, err + } + if len(r.raw) < paramOutgoingResetRequestStreamIdentifiersOffset { + return nil, errSSNResetRequestParamTooShort + } + r.reconfigRequestSequenceNumber = binary.BigEndian.Uint32(r.raw) + r.reconfigResponseSequenceNumber = binary.BigEndian.Uint32(r.raw[4:]) + r.senderLastTSN = binary.BigEndian.Uint32(r.raw[8:]) + + lim := (len(r.raw) - paramOutgoingResetRequestStreamIdentifiersOffset) / 2 + r.streamIdentifiers = make([]uint16, lim) + for i := 0; i < lim; i++ { + r.streamIdentifiers[i] = binary.BigEndian.Uint16(r.raw[paramOutgoingResetRequestStreamIdentifiersOffset+2*i:]) + } + + return r, nil +} diff --git a/vendor/github.com/pion/sctp/param_random.go b/vendor/github.com/pion/sctp/param_random.go new file mode 100644 index 0000000..dc454b3 --- /dev/null +++ b/vendor/github.com/pion/sctp/param_random.go @@ -0,0 +1,21 @@ +package sctp + +type paramRandom struct { + paramHeader + randomData []byte +} + +func (r *paramRandom) marshal() ([]byte, error) { + r.typ = random + r.raw = r.randomData + return r.paramHeader.marshal() +} + +func (r *paramRandom) unmarshal(raw []byte) (param, error) { + err := r.paramHeader.unmarshal(raw) + if err != nil { + return nil, err + } + r.randomData = r.raw + return r, nil +} diff --git a/vendor/github.com/pion/sctp/param_reconfig_response.go b/vendor/github.com/pion/sctp/param_reconfig_response.go new file mode 100644 index 0000000..d9eab55 --- /dev/null +++ b/vendor/github.com/pion/sctp/param_reconfig_response.go @@ -0,0 +1,92 @@ +package sctp + +import ( + "encoding/binary" + "errors" + "fmt" +) + +// This parameter is used by the receiver of a Re-configuration Request +// Parameter to respond to the request. +// +// 0 1 2 3 +// 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 +// +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ +// | Parameter Type = 16 | Parameter Length | +// +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ +// | Re-configuration Response Sequence Number | +// +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ +// | Result | +// +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ +// | Sender's Next TSN (optional) | +// +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ +// | Receiver's Next TSN (optional) | +// +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ + +type paramReconfigResponse struct { + paramHeader + // This value is copied from the request parameter and is used by the + // receiver of the Re-configuration Response Parameter to tie the + // response to the request. + reconfigResponseSequenceNumber uint32 + // This value describes the result of the processing of the request. + result reconfigResult +} + +type reconfigResult uint32 + +const ( + reconfigResultSuccessNOP reconfigResult = 0 + reconfigResultSuccessPerformed reconfigResult = 1 + reconfigResultDenied reconfigResult = 2 + reconfigResultErrorWrongSSN reconfigResult = 3 + reconfigResultErrorRequestAlreadyInProgress reconfigResult = 4 + reconfigResultErrorBadSequenceNumber reconfigResult = 5 + reconfigResultInProgress reconfigResult = 6 +) + +var errReconfigRespParamTooShort = errors.New("reconfig response parameter too short") + +func (t reconfigResult) String() string { + switch t { + case reconfigResultSuccessNOP: + return "0: Success - Nothing to do" + case reconfigResultSuccessPerformed: + return "1: Success - Performed" + case reconfigResultDenied: + return "2: Denied" + case reconfigResultErrorWrongSSN: + return "3: Error - Wrong SSN" + case reconfigResultErrorRequestAlreadyInProgress: + return "4: Error - Request already in progress" + case reconfigResultErrorBadSequenceNumber: + return "5: Error - Bad Sequence Number" + case reconfigResultInProgress: + return "6: In progress" + default: + return fmt.Sprintf("Unknown reconfigResult: %d", t) + } +} + +func (r *paramReconfigResponse) marshal() ([]byte, error) { + r.typ = reconfigResp + r.raw = make([]byte, 8) + binary.BigEndian.PutUint32(r.raw, r.reconfigResponseSequenceNumber) + binary.BigEndian.PutUint32(r.raw[4:], uint32(r.result)) + + return r.paramHeader.marshal() +} + +func (r *paramReconfigResponse) unmarshal(raw []byte) (param, error) { + err := r.paramHeader.unmarshal(raw) + if err != nil { + return nil, err + } + if len(r.raw) < 8 { + return nil, errReconfigRespParamTooShort + } + r.reconfigResponseSequenceNumber = binary.BigEndian.Uint32(r.raw) + r.result = reconfigResult(binary.BigEndian.Uint32(r.raw[4:])) + + return r, nil +} diff --git a/vendor/github.com/pion/sctp/param_requested_hmac_algorithm.go b/vendor/github.com/pion/sctp/param_requested_hmac_algorithm.go new file mode 100644 index 0000000..b520fe3 --- /dev/null +++ b/vendor/github.com/pion/sctp/param_requested_hmac_algorithm.go @@ -0,0 +1,73 @@ +package sctp + +import ( + "encoding/binary" + "fmt" + + "github.com/pkg/errors" +) + +type hmacAlgorithm uint16 + +const ( + hmacResv1 hmacAlgorithm = 0 + hmacSHA128 = 1 + hmacResv2 hmacAlgorithm = 2 + hmacSHA256 hmacAlgorithm = 3 +) + +func (c hmacAlgorithm) String() string { + switch c { + case hmacResv1: + return "HMAC Reserved (0x00)" + case hmacSHA128: + return "HMAC SHA-128" + case hmacResv2: + return "HMAC Reserved (0x02)" + case hmacSHA256: + return "HMAC SHA-256" + default: + return fmt.Sprintf("Unknown HMAC Algorithm type: %d", c) + } +} + +type paramRequestedHMACAlgorithm struct { + paramHeader + availableAlgorithms []hmacAlgorithm +} + +func (r *paramRequestedHMACAlgorithm) marshal() ([]byte, error) { + r.typ = reqHMACAlgo + r.raw = make([]byte, len(r.availableAlgorithms)*2) + i := 0 + for _, a := range r.availableAlgorithms { + binary.BigEndian.PutUint16(r.raw[i:], uint16(a)) + i += 2 + } + + return r.paramHeader.marshal() +} + +func (r *paramRequestedHMACAlgorithm) unmarshal(raw []byte) (param, error) { + err := r.paramHeader.unmarshal(raw) + if err != nil { + return nil, err + } + + i := 0 + for i < len(r.raw) { + a := hmacAlgorithm(binary.BigEndian.Uint16(r.raw[i:])) + switch a { + case hmacSHA128: + fallthrough + case hmacSHA256: + r.availableAlgorithms = append(r.availableAlgorithms, a) + default: + return nil, errors.Errorf("Invalid algorithm type '%v'", a) + } + + i += 2 + } + + return r, nil +} diff --git a/vendor/github.com/pion/sctp/param_state_cookie.go b/vendor/github.com/pion/sctp/param_state_cookie.go new file mode 100644 index 0000000..9681267 --- /dev/null +++ b/vendor/github.com/pion/sctp/param_state_cookie.go @@ -0,0 +1,46 @@ +package sctp + +import ( + "crypto/rand" + "fmt" +) + +type paramStateCookie struct { + paramHeader + cookie []byte +} + +func newRandomStateCookie() (*paramStateCookie, error) { + randCookie := make([]byte, 32) + _, err := rand.Read(randCookie) + // crypto/rand.Read returns n == len(b) if and only if err == nil. + if err != nil { + return nil, err + } + + s := ¶mStateCookie{ + cookie: randCookie, + } + + return s, nil +} + +func (s *paramStateCookie) marshal() ([]byte, error) { + s.typ = stateCookie + s.raw = s.cookie + return s.paramHeader.marshal() +} + +func (s *paramStateCookie) unmarshal(raw []byte) (param, error) { + err := s.paramHeader.unmarshal(raw) + if err != nil { + return nil, err + } + s.cookie = s.raw + return s, nil +} + +// String makes paramStateCookie printable +func (s *paramStateCookie) String() string { + return fmt.Sprintf("%s: %s", s.paramHeader, s.cookie) +} diff --git a/vendor/github.com/pion/sctp/param_supported_extensions.go b/vendor/github.com/pion/sctp/param_supported_extensions.go new file mode 100644 index 0000000..2935524 --- /dev/null +++ b/vendor/github.com/pion/sctp/param_supported_extensions.go @@ -0,0 +1,29 @@ +package sctp + +type paramSupportedExtensions struct { + paramHeader + ChunkTypes []chunkType +} + +func (s *paramSupportedExtensions) marshal() ([]byte, error) { + s.typ = supportedExt + s.raw = make([]byte, len(s.ChunkTypes)) + for i, c := range s.ChunkTypes { + s.raw[i] = byte(c) + } + + return s.paramHeader.marshal() +} + +func (s *paramSupportedExtensions) unmarshal(raw []byte) (param, error) { + err := s.paramHeader.unmarshal(raw) + if err != nil { + return nil, err + } + + for _, t := range s.raw { + s.ChunkTypes = append(s.ChunkTypes, chunkType(t)) + } + + return s, nil +} diff --git a/vendor/github.com/pion/sctp/paramheader.go b/vendor/github.com/pion/sctp/paramheader.go new file mode 100644 index 0000000..d88642b --- /dev/null +++ b/vendor/github.com/pion/sctp/paramheader.go @@ -0,0 +1,63 @@ +package sctp + +import ( + "encoding/binary" + "encoding/hex" + "fmt" + + "github.com/pkg/errors" +) + +type paramHeader struct { + typ paramType + len int + raw []byte +} + +const ( + paramHeaderLength = 4 +) + +func (p *paramHeader) marshal() ([]byte, error) { + paramLengthPlusHeader := paramHeaderLength + len(p.raw) + + rawParam := make([]byte, paramLengthPlusHeader) + binary.BigEndian.PutUint16(rawParam[0:], uint16(p.typ)) + binary.BigEndian.PutUint16(rawParam[2:], uint16(paramLengthPlusHeader)) + copy(rawParam[paramHeaderLength:], p.raw) + + return rawParam, nil +} + +func (p *paramHeader) unmarshal(raw []byte) error { + if len(raw) < paramHeaderLength { + return errors.New("param header too short") + } + + paramLengthPlusHeader := binary.BigEndian.Uint16(raw[2:]) + if int(paramLengthPlusHeader) < paramHeaderLength { + return errors.Errorf("param self reported length (%d) smaller than header length (%d)", int(paramLengthPlusHeader), paramHeaderLength) + } + if len(raw) < int(paramLengthPlusHeader) { + return errors.Errorf("param length (%d) shorter than its self reported length (%d)", len(raw), int(paramLengthPlusHeader)) + } + + typ, err := parseParamType(raw[0:]) + if err != nil { + return errors.Wrap(err, "failed to parse param type") + } + p.typ = typ + p.raw = raw[paramHeaderLength:paramLengthPlusHeader] + p.len = int(paramLengthPlusHeader) + + return nil +} + +func (p *paramHeader) length() int { + return p.len +} + +// String makes paramHeader printable +func (p paramHeader) String() string { + return fmt.Sprintf("%s (%d): %s", p.typ, p.len, hex.Dump(p.raw)) +} diff --git a/vendor/github.com/pion/sctp/paramtype.go b/vendor/github.com/pion/sctp/paramtype.go new file mode 100644 index 0000000..bb0ee82 --- /dev/null +++ b/vendor/github.com/pion/sctp/paramtype.go @@ -0,0 +1,106 @@ +package sctp + +import ( + "encoding/binary" + "fmt" + + "github.com/pkg/errors" +) + +// paramType represents a SCTP INIT/INITACK parameter +type paramType uint16 + +const ( + heartbeatInfo paramType = 1 // Heartbeat Info [RFC4960] + ipV4Addr paramType = 5 // IPv4 IP [RFC4960] + ipV6Addr paramType = 6 // IPv6 IP [RFC4960] + stateCookie paramType = 7 // State Cookie [RFC4960] + unrecognizedParam paramType = 8 // Unrecognized Parameters [RFC4960] + cookiePreservative paramType = 9 // Cookie Preservative [RFC4960] + hostNameAddr paramType = 11 // Host Name IP [RFC4960] + supportedAddrTypes paramType = 12 // Supported IP Types [RFC4960] + outSSNResetReq paramType = 13 // Outgoing SSN Reset Request Parameter [RFC6525] + incSSNResetReq paramType = 14 // Incoming SSN Reset Request Parameter [RFC6525] + ssnTSNResetReq paramType = 15 // SSN/TSN Reset Request Parameter [RFC6525] + reconfigResp paramType = 16 // Re-configuration Response Parameter [RFC6525] + addOutStreamsReq paramType = 17 // Add Outgoing Streams Request Parameter [RFC6525] + addIncStreamsReq paramType = 18 // Add Incoming Streams Request Parameter [RFC6525] + random paramType = 32770 // Random (0x8002) [RFC4805] + chunkList paramType = 32771 // Chunk List (0x8003) [RFC4895] + reqHMACAlgo paramType = 32772 // Requested HMAC Algorithm Parameter (0x8004) [RFC4895] + padding paramType = 32773 // Padding (0x8005) + supportedExt paramType = 32776 // Supported Extensions (0x8008) [RFC5061] + forwardTSNSupp paramType = 49152 // Forward TSN supported (0xC000) [RFC3758] + addIPAddr paramType = 49153 // Add IP IP (0xC001) [RFC5061] + delIPAddr paramType = 49154 // Delete IP IP (0xC002) [RFC5061] + errClauseInd paramType = 49155 // Error Cause Indication (0xC003) [RFC5061] + setPriAddr paramType = 49156 // Set Primary IP (0xC004) [RFC5061] + successInd paramType = 49157 // Success Indication (0xC005) [RFC5061] + adaptLayerInd paramType = 49158 // Adaptation Layer Indication (0xC006) [RFC5061] +) + +func parseParamType(raw []byte) (paramType, error) { + if len(raw) < 2 { + return paramType(0), errors.New("packet to short") + } + return paramType(binary.BigEndian.Uint16(raw)), nil +} + +func (p paramType) String() string { + switch p { + case heartbeatInfo: + return "Heartbeat Info" + case ipV4Addr: + return "IPv4 IP" + case ipV6Addr: + return "IPv6 IP" + case stateCookie: + return "State Cookie" + case unrecognizedParam: + return "Unrecognized Parameters" + case cookiePreservative: + return "Cookie Preservative" + case hostNameAddr: + return "Host Name IP" + case supportedAddrTypes: + return "Supported IP Types" + case outSSNResetReq: + return "Outgoing SSN Reset Request Parameter" + case incSSNResetReq: + return "Incoming SSN Reset Request Parameter" + case ssnTSNResetReq: + return "SSN/TSN Reset Request Parameter" + case reconfigResp: + return "Re-configuration Response Parameter" + case addOutStreamsReq: + return "Add Outgoing Streams Request Parameter" + case addIncStreamsReq: + return "Add Incoming Streams Request Parameter" + case random: + return "Random" + case chunkList: + return "Chunk List" + case reqHMACAlgo: + return "Requested HMAC Algorithm Parameter" + case padding: + return "Padding" + case supportedExt: + return "Supported Extensions" + case forwardTSNSupp: + return "Forward TSN supported" + case addIPAddr: + return "Add IP IP" + case delIPAddr: + return "Delete IP IP" + case errClauseInd: + return "Error Cause Indication" + case setPriAddr: + return "Set Primary IP" + case successInd: + return "Success Indication" + case adaptLayerInd: + return "Adaptation Layer Indication" + default: + return fmt.Sprintf("Unknown ParamType: %d", p) + } +} diff --git a/vendor/github.com/pion/sctp/payload_queue.go b/vendor/github.com/pion/sctp/payload_queue.go new file mode 100644 index 0000000..2d1a35a --- /dev/null +++ b/vendor/github.com/pion/sctp/payload_queue.go @@ -0,0 +1,179 @@ +package sctp + +import ( + "fmt" + "sort" +) + +type payloadQueue struct { + chunkMap map[uint32]*chunkPayloadData + sorted []uint32 + dupTSN []uint32 + nBytes int +} + +func newPayloadQueue() *payloadQueue { + return &payloadQueue{chunkMap: map[uint32]*chunkPayloadData{}} +} + +func (q *payloadQueue) updateSortedKeys() { + if q.sorted != nil { + return + } + + q.sorted = make([]uint32, len(q.chunkMap)) + i := 0 + for k := range q.chunkMap { + q.sorted[i] = k + i++ + } + + sort.Slice(q.sorted, func(i, j int) bool { + return sna32LT(q.sorted[i], q.sorted[j]) + }) +} + +func (q *payloadQueue) canPush(p *chunkPayloadData, cumulativeTSN uint32) bool { + _, ok := q.chunkMap[p.tsn] + if ok || sna32LTE(p.tsn, cumulativeTSN) { + return false + } + return true +} + +func (q *payloadQueue) pushNoCheck(p *chunkPayloadData) { + q.chunkMap[p.tsn] = p + q.nBytes += len(p.userData) + q.sorted = nil +} + +// push pushes a payload data. If the payload data is already in our queue or +// older than our cumulativeTSN marker, it will be recored as duplications, +// which can later be retrieved using popDuplicates. +func (q *payloadQueue) push(p *chunkPayloadData, cumulativeTSN uint32) bool { + _, ok := q.chunkMap[p.tsn] + if ok || sna32LTE(p.tsn, cumulativeTSN) { + // Found the packet, log in dups + q.dupTSN = append(q.dupTSN, p.tsn) + return false + } + + q.chunkMap[p.tsn] = p + q.nBytes += len(p.userData) + q.sorted = nil + return true +} + +// pop pops only if the oldest chunk's TSN matches the given TSN. +func (q *payloadQueue) pop(tsn uint32) (*chunkPayloadData, bool) { + q.updateSortedKeys() + + if len(q.chunkMap) > 0 && tsn == q.sorted[0] { + q.sorted = q.sorted[1:] + if c, ok := q.chunkMap[tsn]; ok { + delete(q.chunkMap, tsn) + q.nBytes -= len(c.userData) + return c, true + } + } + + return nil, false +} + +// get returns reference to chunkPayloadData with the given TSN value. +func (q *payloadQueue) get(tsn uint32) (*chunkPayloadData, bool) { + c, ok := q.chunkMap[tsn] + return c, ok +} + +// popDuplicates returns an array of TSN values that were found duplicate. +func (q *payloadQueue) popDuplicates() []uint32 { + dups := q.dupTSN + q.dupTSN = []uint32{} + return dups +} + +func (q *payloadQueue) getGapAckBlocks(cumulativeTSN uint32) (gapAckBlocks []gapAckBlock) { + var b gapAckBlock + + if len(q.chunkMap) == 0 { + return []gapAckBlock{} + } + + q.updateSortedKeys() + + for i, tsn := range q.sorted { + if i == 0 { + b.start = uint16(tsn - cumulativeTSN) + b.end = b.start + continue + } + diff := uint16(tsn - cumulativeTSN) + if b.end+1 == diff { + b.end++ + } else { + gapAckBlocks = append(gapAckBlocks, gapAckBlock{ + start: b.start, + end: b.end, + }) + b.start = diff + b.end = diff + } + } + + gapAckBlocks = append(gapAckBlocks, gapAckBlock{ + start: b.start, + end: b.end, + }) + + return gapAckBlocks +} + +func (q *payloadQueue) getGapAckBlocksString(cumulativeTSN uint32) string { + gapAckBlocks := q.getGapAckBlocks(cumulativeTSN) + str := fmt.Sprintf("cumTSN=%d", cumulativeTSN) + for _, b := range gapAckBlocks { + str += fmt.Sprintf(",%d-%d", b.start, b.end) + } + return str +} + +func (q *payloadQueue) markAsAcked(tsn uint32) int { + var nBytesAcked int + if c, ok := q.chunkMap[tsn]; ok { + c.acked = true + c.retransmit = false + nBytesAcked = len(c.userData) + q.nBytes -= nBytesAcked + c.userData = []byte{} + } + + return nBytesAcked +} + +func (q *payloadQueue) getLastTSNReceived() (uint32, bool) { + q.updateSortedKeys() + + qlen := len(q.sorted) + if qlen == 0 { + return 0, false + } + return q.sorted[qlen-1], true +} + +func (q *payloadQueue) markAllToRetrasmit() { + for _, c := range q.chunkMap { + if c.acked || c.abandoned() { + continue + } + c.retransmit = true + } +} + +func (q *payloadQueue) getNumBytes() int { + return q.nBytes +} + +func (q *payloadQueue) size() int { + return len(q.chunkMap) +} diff --git a/vendor/github.com/pion/sctp/pending_queue.go b/vendor/github.com/pion/sctp/pending_queue.go new file mode 100644 index 0000000..5f204d3 --- /dev/null +++ b/vendor/github.com/pion/sctp/pending_queue.go @@ -0,0 +1,138 @@ +package sctp + +import ( + "github.com/pkg/errors" +) + +// pendingBaseQueue + +type pendingBaseQueue struct { + queue []*chunkPayloadData +} + +func newPendingBaseQueue() *pendingBaseQueue { + return &pendingBaseQueue{queue: []*chunkPayloadData{}} +} + +func (q *pendingBaseQueue) push(c *chunkPayloadData) { + q.queue = append(q.queue, c) +} + +func (q *pendingBaseQueue) pop() *chunkPayloadData { + if len(q.queue) == 0 { + return nil + } + c := q.queue[0] + q.queue = q.queue[1:] + return c +} + +func (q *pendingBaseQueue) get(i int) *chunkPayloadData { + if len(q.queue) == 0 || i < 0 || i >= len(q.queue) { + return nil + } + return q.queue[i] +} + +func (q *pendingBaseQueue) size() int { + return len(q.queue) +} + +// pendingQueue + +type pendingQueue struct { + unorderedQueue *pendingBaseQueue + orderedQueue *pendingBaseQueue + nBytes int + selected bool + unorderedIsSelected bool +} + +var ( + errUnexpectedChuckPoppedUnordered = errors.New("unexpected chunk popped (unordered)") + errUnexpectedChuckPoppedOrdered = errors.New("unexpected chunk popped (ordered)") + errUnexpectedQState = errors.New("unexpected q state (should've been selected)") +) + +func newPendingQueue() *pendingQueue { + return &pendingQueue{ + unorderedQueue: newPendingBaseQueue(), + orderedQueue: newPendingBaseQueue(), + } +} + +func (q *pendingQueue) push(c *chunkPayloadData) { + if c.unordered { + q.unorderedQueue.push(c) + } else { + q.orderedQueue.push(c) + } + q.nBytes += len(c.userData) +} + +func (q *pendingQueue) peek() *chunkPayloadData { + if q.selected { + if q.unorderedIsSelected { + return q.unorderedQueue.get(0) + } + return q.orderedQueue.get(0) + } + + if c := q.unorderedQueue.get(0); c != nil { + return c + } + return q.orderedQueue.get(0) +} + +func (q *pendingQueue) pop(c *chunkPayloadData) error { + if q.selected { + var popped *chunkPayloadData + if q.unorderedIsSelected { + popped = q.unorderedQueue.pop() + if popped != c { + return errUnexpectedChuckPoppedUnordered + } + } else { + popped = q.orderedQueue.pop() + if popped != c { + return errUnexpectedChuckPoppedOrdered + } + } + if popped.endingFragment { + q.selected = false + } + } else { + if !c.beginningFragment { + return errUnexpectedQState + } + if c.unordered { + popped := q.unorderedQueue.pop() + if popped != c { + return errUnexpectedChuckPoppedUnordered + } + if !popped.endingFragment { + q.selected = true + q.unorderedIsSelected = true + } + } else { + popped := q.orderedQueue.pop() + if popped != c { + return errUnexpectedChuckPoppedOrdered + } + if !popped.endingFragment { + q.selected = true + q.unorderedIsSelected = false + } + } + } + q.nBytes -= len(c.userData) + return nil +} + +func (q *pendingQueue) getNumBytes() int { + return q.nBytes +} + +func (q *pendingQueue) size() int { + return q.unorderedQueue.size() + q.orderedQueue.size() +} diff --git a/vendor/github.com/pion/sctp/reassembly_queue.go b/vendor/github.com/pion/sctp/reassembly_queue.go new file mode 100644 index 0000000..f71d59c --- /dev/null +++ b/vendor/github.com/pion/sctp/reassembly_queue.go @@ -0,0 +1,353 @@ +package sctp + +import ( + "io" + "sort" + "sync/atomic" + + "github.com/pkg/errors" +) + +func sortChunksByTSN(a []*chunkPayloadData) { + sort.Slice(a, func(i, j int) bool { + return sna32LT(a[i].tsn, a[j].tsn) + }) +} + +func sortChunksBySSN(a []*chunkSet) { + sort.Slice(a, func(i, j int) bool { + return sna16LT(a[i].ssn, a[j].ssn) + }) +} + +// chunkSet is a set of chunks that share the same SSN +type chunkSet struct { + ssn uint16 // used only with the ordered chunks + ppi PayloadProtocolIdentifier + chunks []*chunkPayloadData +} + +func newChunkSet(ssn uint16, ppi PayloadProtocolIdentifier) *chunkSet { + return &chunkSet{ + ssn: ssn, + ppi: ppi, + chunks: []*chunkPayloadData{}, + } +} + +func (set *chunkSet) push(chunk *chunkPayloadData) bool { + // check if dup + for _, c := range set.chunks { + if c.tsn == chunk.tsn { + return false + } + } + + // append and sort + set.chunks = append(set.chunks, chunk) + sortChunksByTSN(set.chunks) + + // Check if we now have a complete set + complete := set.isComplete() + return complete +} + +func (set *chunkSet) isComplete() bool { + // Condition for complete set + // 0. Has at least one chunk. + // 1. Begins with beginningFragment set to true + // 2. Ends with endingFragment set to true + // 3. TSN monotinically increase by 1 from beginning to end + + // 0. + nChunks := len(set.chunks) + if nChunks == 0 { + return false + } + + // 1. + if !set.chunks[0].beginningFragment { + return false + } + + // 2. + if !set.chunks[nChunks-1].endingFragment { + return false + } + + // 3. + var lastTSN uint32 + for i, c := range set.chunks { + if i > 0 { + // Fragments must have contiguous TSN + // From RFC 4960 Section 3.3.1: + // When a user message is fragmented into multiple chunks, the TSNs are + // used by the receiver to reassemble the message. This means that the + // TSNs for each fragment of a fragmented user message MUST be strictly + // sequential. + if c.tsn != lastTSN+1 { + // mid or end fragment is missing + return false + } + } + + lastTSN = c.tsn + } + + return true +} + +type reassemblyQueue struct { + si uint16 + nextSSN uint16 // expected SSN for next ordered chunk + ordered []*chunkSet + unordered []*chunkSet + unorderedChunks []*chunkPayloadData + nBytes uint64 +} + +var errTryAgain = errors.New("try again") + +func newReassemblyQueue(si uint16) *reassemblyQueue { + // From RFC 4960 Sec 6.5: + // The Stream Sequence Number in all the streams MUST start from 0 when + // the association is established. Also, when the Stream Sequence + // Number reaches the value 65535 the next Stream Sequence Number MUST + // be set to 0. + return &reassemblyQueue{ + si: si, + nextSSN: 0, // From RFC 4960 Sec 6.5: + ordered: make([]*chunkSet, 0), + unordered: make([]*chunkSet, 0), + } +} + +func (r *reassemblyQueue) push(chunk *chunkPayloadData) bool { + var cset *chunkSet + + if chunk.streamIdentifier != r.si { + return false + } + + if chunk.unordered { + // First, insert into unorderedChunks array + r.unorderedChunks = append(r.unorderedChunks, chunk) + atomic.AddUint64(&r.nBytes, uint64(len(chunk.userData))) + sortChunksByTSN(r.unorderedChunks) + + // Scan unorderedChunks that are contiguous (in TSN) + cset = r.findCompleteUnorderedChunkSet() + + // If found, append the complete set to the unordered array + if cset != nil { + r.unordered = append(r.unordered, cset) + return true + } + + return false + } + + // This is an ordered chunk + + if sna16LT(chunk.streamSequenceNumber, r.nextSSN) { + return false + } + + // Check if a chunkSet with the SSN already exists + for _, set := range r.ordered { + if set.ssn == chunk.streamSequenceNumber { + cset = set + break + } + } + + // If not found, create a new chunkSet + if cset == nil { + cset = newChunkSet(chunk.streamSequenceNumber, chunk.payloadType) + r.ordered = append(r.ordered, cset) + if !chunk.unordered { + sortChunksBySSN(r.ordered) + } + } + + atomic.AddUint64(&r.nBytes, uint64(len(chunk.userData))) + + return cset.push(chunk) +} + +func (r *reassemblyQueue) findCompleteUnorderedChunkSet() *chunkSet { + startIdx := -1 + nChunks := 0 + var lastTSN uint32 + var found bool + + for i, c := range r.unorderedChunks { + // seek beigining + if c.beginningFragment { + startIdx = i + nChunks = 1 + lastTSN = c.tsn + + if c.endingFragment { + found = true + break + } + continue + } + + if startIdx < 0 { + continue + } + + // Check if contiguous in TSN + if c.tsn != lastTSN+1 { + startIdx = -1 + continue + } + + lastTSN = c.tsn + nChunks++ + + if c.endingFragment { + found = true + break + } + } + + if !found { + return nil + } + + // Extract the range of chunks + var chunks []*chunkPayloadData + chunks = append(chunks, r.unorderedChunks[startIdx:startIdx+nChunks]...) + + r.unorderedChunks = append( + r.unorderedChunks[:startIdx], + r.unorderedChunks[startIdx+nChunks:]...) + + chunkSet := newChunkSet(0, chunks[0].payloadType) + chunkSet.chunks = chunks + + return chunkSet +} + +func (r *reassemblyQueue) isReadable() bool { + // Check unordered first + if len(r.unordered) > 0 { + // The chunk sets in r.unordered should all be complete. + return true + } + + // Check ordered sets + if len(r.ordered) > 0 { + cset := r.ordered[0] + if cset.isComplete() { + if sna16LTE(cset.ssn, r.nextSSN) { + return true + } + } + } + return false +} + +func (r *reassemblyQueue) read(buf []byte) (int, PayloadProtocolIdentifier, error) { + var cset *chunkSet + // Check unordered first + switch { + case len(r.unordered) > 0: + cset = r.unordered[0] + r.unordered = r.unordered[1:] + case len(r.ordered) > 0: + // Now, check ordered + cset = r.ordered[0] + if !cset.isComplete() { + return 0, 0, errTryAgain + } + if sna16GT(cset.ssn, r.nextSSN) { + return 0, 0, errTryAgain + } + r.ordered = r.ordered[1:] + if cset.ssn == r.nextSSN { + r.nextSSN++ + } + default: + return 0, 0, errTryAgain + } + + // Concat all fragments into the buffer + nWritten := 0 + ppi := cset.ppi + var err error + for _, c := range cset.chunks { + toCopy := len(c.userData) + r.subtractNumBytes(toCopy) + if err == nil { + n := copy(buf[nWritten:], c.userData) + nWritten += n + if n < toCopy { + err = io.ErrShortBuffer + } + } + } + + return nWritten, ppi, err +} + +func (r *reassemblyQueue) forwardTSNForOrdered(lastSSN uint16) { + // Use lastSSN to locate a chunkSet then remove it if the set has + // not been complete + keep := []*chunkSet{} + for _, set := range r.ordered { + if sna16LTE(set.ssn, lastSSN) { + if !set.isComplete() { + // drop the set + for _, c := range set.chunks { + r.subtractNumBytes(len(c.userData)) + } + continue + } + } + keep = append(keep, set) + } + r.ordered = keep + + // Finally, forward nextSSN + if sna16LTE(r.nextSSN, lastSSN) { + r.nextSSN = lastSSN + 1 + } +} + +func (r *reassemblyQueue) forwardTSNForUnordered(newCumulativeTSN uint32) { + // Remove all fragments in the unordered sets that contains chunks + // equal to or older than `newCumulativeTSN`. + // We know all sets in the r.unordered are complete ones. + // Just remove chunks that are equal to or older than newCumulativeTSN + // from the unorderedChunks + lastIdx := -1 + for i, c := range r.unorderedChunks { + if sna32GT(c.tsn, newCumulativeTSN) { + break + } + lastIdx = i + } + if lastIdx >= 0 { + for _, c := range r.unorderedChunks[0 : lastIdx+1] { + r.subtractNumBytes(len(c.userData)) + } + r.unorderedChunks = r.unorderedChunks[lastIdx+1:] + } +} + +func (r *reassemblyQueue) subtractNumBytes(nBytes int) { + cur := atomic.LoadUint64(&r.nBytes) + if int(cur) >= nBytes { + atomic.AddUint64(&r.nBytes, -uint64(nBytes)) + } else { + atomic.StoreUint64(&r.nBytes, 0) + } +} + +func (r *reassemblyQueue) getNumBytes() int { + return int(atomic.LoadUint64(&r.nBytes)) +} diff --git a/vendor/github.com/pion/sctp/renovate.json b/vendor/github.com/pion/sctp/renovate.json new file mode 100644 index 0000000..4400fd9 --- /dev/null +++ b/vendor/github.com/pion/sctp/renovate.json @@ -0,0 +1,15 @@ +{ + "extends": [ + "config:base" + ], + "postUpdateOptions": [ + "gomodTidy" + ], + "commitBody": "Generated by renovateBot", + "packageRules": [ + { + "packagePatterns": ["^golang.org/x/"], + "schedule": ["on the first day of the month"] + } + ] +} diff --git a/vendor/github.com/pion/sctp/rtx_timer.go b/vendor/github.com/pion/sctp/rtx_timer.go new file mode 100644 index 0000000..14bc39d --- /dev/null +++ b/vendor/github.com/pion/sctp/rtx_timer.go @@ -0,0 +1,219 @@ +package sctp + +import ( + "math" + "sync" + "time" +) + +const ( + rtoInitial float64 = 3.0 * 1000 // msec + rtoMin float64 = 1.0 * 1000 // msec + rtoMax float64 = 60.0 * 1000 // msec + rtoAlpha float64 = 0.125 + rtoBeta float64 = 0.25 + maxInitRetrans uint = 8 + pathMaxRetrans uint = 5 + noMaxRetrans uint = 0 +) + +// rtoManager manages Rtx timeout values. +// This is an implementation of RFC 4960 sec 6.3.1. +type rtoManager struct { + srtt float64 + rttvar float64 + rto float64 + noUpdate bool + mutex sync.RWMutex +} + +// newRTOManager creates a new rtoManager. +func newRTOManager() *rtoManager { + return &rtoManager{ + rto: rtoInitial, + } +} + +// setNewRTT takes a newly measured RTT then adjust the RTO in msec. +func (m *rtoManager) setNewRTT(rtt float64) float64 { + m.mutex.Lock() + defer m.mutex.Unlock() + + if m.noUpdate { + return m.srtt + } + + if m.srtt == 0 { + // First measurement + m.srtt = rtt + m.rttvar = rtt / 2 + } else { + // Subsequent rtt measurement + m.rttvar = (1-rtoBeta)*m.rttvar + rtoBeta*(math.Abs(m.srtt-rtt)) + m.srtt = (1-rtoAlpha)*m.srtt + rtoAlpha*rtt + } + m.rto = math.Min(math.Max(m.srtt+4*m.rttvar, rtoMin), rtoMax) + return m.srtt +} + +// getRTO simply returns the current RTO in msec. +func (m *rtoManager) getRTO() float64 { + m.mutex.RLock() + defer m.mutex.RUnlock() + + return m.rto +} + +// reset resets the RTO variables to the initial values. +func (m *rtoManager) reset() { + m.mutex.Lock() + defer m.mutex.Unlock() + + if m.noUpdate { + return + } + + m.srtt = 0 + m.rttvar = 0 + m.rto = rtoInitial +} + +// set RTO value for testing +func (m *rtoManager) setRTO(rto float64, noUpdate bool) { + m.mutex.Lock() + defer m.mutex.Unlock() + + m.rto = rto + m.noUpdate = noUpdate +} + +// rtxTimerObserver is the inteface to a timer observer. +// NOTE: Observers MUST NOT call start() or stop() method on rtxTimer +// from within these callbacks. +type rtxTimerObserver interface { + onRetransmissionTimeout(timerID int, n uint) + onRetransmissionFailure(timerID int) +} + +// rtxTimer provides the retnransmission timer conforms with RFC 4960 Sec 6.3.1 +type rtxTimer struct { + id int + observer rtxTimerObserver + maxRetrans uint + stopFunc stopTimerLoop + closed bool + mutex sync.RWMutex +} + +type stopTimerLoop func() + +// newRTXTimer creates a new retransmission timer. +// if maxRetrans is set to 0, it will keep retransmitting until stop() is called. +// (it will never make onRetransmissionFailure() callback. +func newRTXTimer(id int, observer rtxTimerObserver, maxRetrans uint) *rtxTimer { + return &rtxTimer{ + id: id, + observer: observer, + maxRetrans: maxRetrans, + } +} + +// start starts the timer. +func (t *rtxTimer) start(rto float64) bool { + t.mutex.Lock() + defer t.mutex.Unlock() + + // this timer is already closed + if t.closed { + return false + } + + // this is a noop if the timer is always running + if t.stopFunc != nil { + return false + } + + // Note: rto value is intentionally not capped by RTO.Min to allow + // fast timeout for the tests. Non-test code should pass in the + // rto generated by rtoManager getRTO() method which caps the + // value at RTO.Min or at RTO.Max. + var nRtos uint + + cancelCh := make(chan struct{}) + + go func() { + canceling := false + + for !canceling { + timeout := calculateNextTimeout(rto, nRtos) + timer := time.NewTimer(time.Duration(timeout) * time.Millisecond) + + select { + case <-timer.C: + nRtos++ + if t.maxRetrans == 0 || nRtos <= t.maxRetrans { + t.observer.onRetransmissionTimeout(t.id, nRtos) + } else { + t.stop() + t.observer.onRetransmissionFailure(t.id) + } + case <-cancelCh: + canceling = true + timer.Stop() + } + } + }() + + t.stopFunc = func() { + close(cancelCh) + } + + return true +} + +// stop stops the timer. +func (t *rtxTimer) stop() { + t.mutex.Lock() + defer t.mutex.Unlock() + + if t.stopFunc != nil { + t.stopFunc() + t.stopFunc = nil + } +} + +// closes the timer. this is similar to stop() but subsequent start() call +// will fail (the timer is no longer usable) +func (t *rtxTimer) close() { + t.mutex.Lock() + defer t.mutex.Unlock() + + if t.stopFunc != nil { + t.stopFunc() + t.stopFunc = nil + } + + t.closed = true +} + +// isRunning tests if the timer is running. +// Debug purpose only +func (t *rtxTimer) isRunning() bool { + t.mutex.RLock() + defer t.mutex.RUnlock() + + return (t.stopFunc != nil) +} + +func calculateNextTimeout(rto float64, nRtos uint) float64 { + // RFC 4096 sec 6.3.3. Handle T3-rtx Expiration + // E2) For the destination address for which the timer expires, set RTO + // <- RTO * 2 ("back off the timer"). The maximum value discussed + // in rule C7 above (RTO.max) may be used to provide an upper bound + // to this doubling operation. + if nRtos < 31 { + m := 1 << nRtos + return math.Min(rto*float64(m), rtoMax) + } + return rtoMax +} diff --git a/vendor/github.com/pion/sctp/sctp.go b/vendor/github.com/pion/sctp/sctp.go new file mode 100644 index 0000000..e601342 --- /dev/null +++ b/vendor/github.com/pion/sctp/sctp.go @@ -0,0 +1,2 @@ +// Package sctp implements the SCTP spec +package sctp diff --git a/vendor/github.com/pion/sctp/stream.go b/vendor/github.com/pion/sctp/stream.go new file mode 100644 index 0000000..8a17b97 --- /dev/null +++ b/vendor/github.com/pion/sctp/stream.go @@ -0,0 +1,357 @@ +package sctp + +import ( + "io" + "math" + "sync" + + "github.com/pion/logging" + "github.com/pkg/errors" +) + +const ( + // ReliabilityTypeReliable is used for reliable transmission + ReliabilityTypeReliable byte = 0 + // ReliabilityTypeRexmit is used for partial reliability by retransmission count + ReliabilityTypeRexmit byte = 1 + // ReliabilityTypeTimed is used for partial reliability by retransmission duration + ReliabilityTypeTimed byte = 2 +) + +// Stream represents an SCTP stream +type Stream struct { + association *Association + lock sync.RWMutex + streamIdentifier uint16 + defaultPayloadType PayloadProtocolIdentifier + reassemblyQueue *reassemblyQueue + sequenceNumber uint16 + readNotifier *sync.Cond + readErr error + writeErr error + unordered bool + reliabilityType byte + reliabilityValue uint32 + bufferedAmount uint64 + bufferedAmountLow uint64 + onBufferedAmountLow func() + log logging.LeveledLogger + name string +} + +// StreamIdentifier returns the Stream identifier associated to the stream. +func (s *Stream) StreamIdentifier() uint16 { + s.lock.RLock() + defer s.lock.RUnlock() + return s.streamIdentifier +} + +// SetDefaultPayloadType sets the default payload type used by Write. +func (s *Stream) SetDefaultPayloadType(defaultPayloadType PayloadProtocolIdentifier) { + s.lock.Lock() + defer s.lock.Unlock() + + s.setDefaultPayloadType(defaultPayloadType) +} + +// setDefaultPayloadType sets the defaultPayloadType. The caller should hold the lock. +func (s *Stream) setDefaultPayloadType(defaultPayloadType PayloadProtocolIdentifier) { + s.defaultPayloadType = defaultPayloadType +} + +// SetReliabilityParams sets reliability parameters for this stream. +func (s *Stream) SetReliabilityParams(unordered bool, relType byte, relVal uint32) { + s.lock.Lock() + defer s.lock.Unlock() + + s.setReliabilityParams(unordered, relType, relVal) +} + +// setReliabilityParams sets reliability parameters for this stream. +// The caller should hold the lock. +func (s *Stream) setReliabilityParams(unordered bool, relType byte, relVal uint32) { + s.log.Debugf("[%s] reliability params: ordered=%v type=%d value=%d", + s.name, !unordered, relType, relVal) + s.unordered = unordered + s.reliabilityType = relType + s.reliabilityValue = relVal +} + +// Read reads a packet of len(p) bytes, dropping the Payload Protocol Identifier. +// Returns EOF when the stream is reset or an error if the stream is closed +// otherwise. +func (s *Stream) Read(p []byte) (int, error) { + n, _, err := s.ReadSCTP(p) + return n, err +} + +// ReadSCTP reads a packet of len(p) bytes and returns the associated Payload +// Protocol Identifier. +// Returns EOF when the stream is reset or an error if the stream is closed +// otherwise. +func (s *Stream) ReadSCTP(p []byte) (int, PayloadProtocolIdentifier, error) { + s.lock.Lock() + defer s.lock.Unlock() + + for { + n, ppi, err := s.reassemblyQueue.read(p) + if err == nil { + return n, ppi, nil + } else if errors.Is(err, io.ErrShortBuffer) { + return 0, PayloadProtocolIdentifier(0), err + } + + err = s.readErr + if err != nil { + return 0, PayloadProtocolIdentifier(0), err + } + + s.readNotifier.Wait() + } +} + +func (s *Stream) handleData(pd *chunkPayloadData) { + s.lock.Lock() + defer s.lock.Unlock() + + var readable bool + if s.reassemblyQueue.push(pd) { + readable = s.reassemblyQueue.isReadable() + s.log.Debugf("[%s] reassemblyQueue readable=%v", s.name, readable) + if readable { + s.log.Debugf("[%s] readNotifier.signal()", s.name) + s.readNotifier.Signal() + s.log.Debugf("[%s] readNotifier.signal() done", s.name) + } + } +} + +func (s *Stream) handleForwardTSNForOrdered(ssn uint16) { + var readable bool + + func() { + s.lock.Lock() + defer s.lock.Unlock() + + if s.unordered { + return // unordered chunks are handled by handleForwardUnordered method + } + + // Remove all chunks older than or equal to the new TSN from + // the reassemblyQueue. + s.reassemblyQueue.forwardTSNForOrdered(ssn) + readable = s.reassemblyQueue.isReadable() + }() + + // Notify the reader asynchronously if there's a data chunk to read. + if readable { + s.readNotifier.Signal() + } +} + +func (s *Stream) handleForwardTSNForUnordered(newCumulativeTSN uint32) { + var readable bool + + func() { + s.lock.Lock() + defer s.lock.Unlock() + + if !s.unordered { + return // ordered chunks are handled by handleForwardTSNOrdered method + } + + // Remove all chunks older than or equal to the new TSN from + // the reassemblyQueue. + s.reassemblyQueue.forwardTSNForUnordered(newCumulativeTSN) + readable = s.reassemblyQueue.isReadable() + }() + + // Notify the reader asynchronously if there's a data chunk to read. + if readable { + s.readNotifier.Signal() + } +} + +// Write writes len(p) bytes from p with the default Payload Protocol Identifier +func (s *Stream) Write(p []byte) (n int, err error) { + return s.WriteSCTP(p, s.defaultPayloadType) +} + +// WriteSCTP writes len(p) bytes from p to the DTLS connection +func (s *Stream) WriteSCTP(p []byte, ppi PayloadProtocolIdentifier) (n int, err error) { + maxMessageSize := s.association.MaxMessageSize() + if len(p) > int(maxMessageSize) { + return 0, errors.Errorf("Outbound packet larger than maximum message size %v", math.MaxUint16) + } + + s.lock.RLock() + err = s.writeErr + s.lock.RUnlock() + if err != nil { + return 0, err + } + + chunks := s.packetize(p, ppi) + + return len(p), s.association.sendPayloadData(chunks) +} + +func (s *Stream) packetize(raw []byte, ppi PayloadProtocolIdentifier) []*chunkPayloadData { + s.lock.Lock() + defer s.lock.Unlock() + + i := uint32(0) + remaining := uint32(len(raw)) + + // From draft-ietf-rtcweb-data-protocol-09, section 6: + // All Data Channel Establishment Protocol messages MUST be sent using + // ordered delivery and reliable transmission. + unordered := ppi != PayloadTypeWebRTCDCEP && s.unordered + + var chunks []*chunkPayloadData + var head *chunkPayloadData + for remaining != 0 { + fragmentSize := min32(s.association.maxPayloadSize, remaining) + + // Copy the userdata since we'll have to store it until acked + // and the caller may re-use the buffer in the mean time + userData := make([]byte, fragmentSize) + copy(userData, raw[i:i+fragmentSize]) + + chunk := &chunkPayloadData{ + streamIdentifier: s.streamIdentifier, + userData: userData, + unordered: unordered, + beginningFragment: i == 0, + endingFragment: remaining-fragmentSize == 0, + immediateSack: false, + payloadType: ppi, + streamSequenceNumber: s.sequenceNumber, + head: head, + } + + if head == nil { + head = chunk + } + + chunks = append(chunks, chunk) + + remaining -= fragmentSize + i += fragmentSize + } + + // RFC 4960 Sec 6.6 + // Note: When transmitting ordered and unordered data, an endpoint does + // not increment its Stream Sequence Number when transmitting a DATA + // chunk with U flag set to 1. + if !unordered { + s.sequenceNumber++ + } + + s.bufferedAmount += uint64(len(raw)) + s.log.Tracef("[%s] bufferedAmount = %d", s.name, s.bufferedAmount) + + return chunks +} + +// Close closes the write-direction of the stream. +// Future calls to Write are not permitted after calling Close. +func (s *Stream) Close() error { + if sid, isOpen := func() (uint16, bool) { + s.lock.Lock() + defer s.lock.Unlock() + + isOpen := true + if s.writeErr == nil { + s.writeErr = errors.New("Stream closed") + } else { + isOpen = false + } + + if s.readErr == nil { + s.readErr = io.EOF + } else { + isOpen = false + } + s.readNotifier.Broadcast() // broadcast regardless + + return s.streamIdentifier, isOpen + }(); isOpen { + // Reset the outgoing stream + // https://tools.ietf.org/html/rfc6525 + return s.association.sendResetRequest(sid) + } + + return nil +} + +// BufferedAmount returns the number of bytes of data currently queued to be sent over this stream. +func (s *Stream) BufferedAmount() uint64 { + s.lock.RLock() + defer s.lock.RUnlock() + + return s.bufferedAmount +} + +// BufferedAmountLowThreshold returns the number of bytes of buffered outgoing data that is +// considered "low." Defaults to 0. +func (s *Stream) BufferedAmountLowThreshold() uint64 { + s.lock.RLock() + defer s.lock.RUnlock() + + return s.bufferedAmountLow +} + +// SetBufferedAmountLowThreshold is used to update the threshold. +// See BufferedAmountLowThreshold(). +func (s *Stream) SetBufferedAmountLowThreshold(th uint64) { + s.lock.Lock() + defer s.lock.Unlock() + + s.bufferedAmountLow = th +} + +// OnBufferedAmountLow sets the callback handler which would be called when the number of +// bytes of outgoing data buffered is lower than the threshold. +func (s *Stream) OnBufferedAmountLow(f func()) { + s.lock.Lock() + defer s.lock.Unlock() + + s.onBufferedAmountLow = f +} + +// This method is called by association's readLoop (go-)routine to notify this stream +// of the specified amount of outgoing data has been delivered to the peer. +func (s *Stream) onBufferReleased(nBytesReleased int) { + if nBytesReleased <= 0 { + return + } + + s.lock.Lock() + + fromAmount := s.bufferedAmount + + if s.bufferedAmount < uint64(nBytesReleased) { + s.bufferedAmount = 0 + s.log.Errorf("[%s] released buffer size %d should be <= %d", + s.name, nBytesReleased, s.bufferedAmount) + } else { + s.bufferedAmount -= uint64(nBytesReleased) + } + + s.log.Tracef("[%s] bufferedAmount = %d", s.name, s.bufferedAmount) + + if s.onBufferedAmountLow != nil && fromAmount > s.bufferedAmountLow && s.bufferedAmount <= s.bufferedAmountLow { + f := s.onBufferedAmountLow + s.lock.Unlock() + f() + return + } + + s.lock.Unlock() +} + +func (s *Stream) getNumBytesInReassemblyQueue() int { + // No lock is required as it reads the size with atomic load function. + return s.reassemblyQueue.getNumBytes() +} diff --git a/vendor/github.com/pion/sctp/util.go b/vendor/github.com/pion/sctp/util.go new file mode 100644 index 0000000..e2e54ab --- /dev/null +++ b/vendor/github.com/pion/sctp/util.go @@ -0,0 +1,58 @@ +package sctp + +const ( + paddingMultiple = 4 +) + +func getPadding(len int) int { + return (paddingMultiple - (len % paddingMultiple)) % paddingMultiple +} + +func padByte(in []byte, cnt int) []byte { + if cnt < 0 { + cnt = 0 + } + padding := make([]byte, cnt) + return append(in, padding...) +} + +// Serial Number Arithmetic (RFC 1982) +func sna32LT(i1, i2 uint32) bool { + return (i1 < i2 && i2-i1 < 1<<31) || (i1 > i2 && i1-i2 > 1<<31) +} + +func sna32LTE(i1, i2 uint32) bool { + return i1 == i2 || sna32LT(i1, i2) +} + +func sna32GT(i1, i2 uint32) bool { + return (i1 < i2 && (i2-i1) >= 1<<31) || (i1 > i2 && (i1-i2) <= 1<<31) +} + +func sna32GTE(i1, i2 uint32) bool { + return i1 == i2 || sna32GT(i1, i2) +} + +func sna32EQ(i1, i2 uint32) bool { + return i1 == i2 +} + +func sna16LT(i1, i2 uint16) bool { + return (i1 < i2 && (i2-i1) < 1<<15) || (i1 > i2 && (i1-i2) > 1<<15) +} + +func sna16LTE(i1, i2 uint16) bool { + return i1 == i2 || sna16LT(i1, i2) +} + +func sna16GT(i1, i2 uint16) bool { + return (i1 < i2 && (i2-i1) >= 1<<15) || (i1 > i2 && (i1-i2) <= 1<<15) +} + +func sna16GTE(i1, i2 uint16) bool { + return i1 == i2 || sna16GT(i1, i2) +} + +func sna16EQ(i1, i2 uint16) bool { + return i1 == i2 +} |