summaryrefslogtreecommitdiff
path: root/vendor/github.com/pion/sctp
diff options
context:
space:
mode:
Diffstat (limited to 'vendor/github.com/pion/sctp')
-rw-r--r--vendor/github.com/pion/sctp/.gitignore1
-rw-r--r--vendor/github.com/pion/sctp/.golangci.yml82
-rw-r--r--vendor/github.com/pion/sctp/DESIGN.md20
-rw-r--r--vendor/github.com/pion/sctp/LICENSE21
-rw-r--r--vendor/github.com/pion/sctp/README.md54
-rw-r--r--vendor/github.com/pion/sctp/ack_timer.go105
-rw-r--r--vendor/github.com/pion/sctp/association.go2241
-rw-r--r--vendor/github.com/pion/sctp/association_stats.go61
-rw-r--r--vendor/github.com/pion/sctp/chunk.go9
-rw-r--r--vendor/github.com/pion/sctp/chunk_abort.go88
-rw-r--r--vendor/github.com/pion/sctp/chunk_cookie_ack.go44
-rw-r--r--vendor/github.com/pion/sctp/chunk_cookie_echo.go46
-rw-r--r--vendor/github.com/pion/sctp/chunk_error.go95
-rw-r--r--vendor/github.com/pion/sctp/chunk_forward_tsn.go145
-rw-r--r--vendor/github.com/pion/sctp/chunk_heartbeat.go75
-rw-r--r--vendor/github.com/pion/sctp/chunk_heartbeat_ack.go86
-rw-r--r--vendor/github.com/pion/sctp/chunk_init.go123
-rw-r--r--vendor/github.com/pion/sctp/chunk_init_ack.go126
-rw-r--r--vendor/github.com/pion/sctp/chunk_init_common.go155
-rw-r--r--vendor/github.com/pion/sctp/chunk_payload_data.go195
-rw-r--r--vendor/github.com/pion/sctp/chunk_reconfig.go99
-rw-r--r--vendor/github.com/pion/sctp/chunk_selective_ack.go142
-rw-r--r--vendor/github.com/pion/sctp/chunkheader.go90
-rw-r--r--vendor/github.com/pion/sctp/chunktype.go67
-rw-r--r--vendor/github.com/pion/sctp/codecov.yml20
-rw-r--r--vendor/github.com/pion/sctp/control_queue.go29
-rw-r--r--vendor/github.com/pion/sctp/error_cause.go91
-rw-r--r--vendor/github.com/pion/sctp/error_cause_header.go47
-rw-r--r--vendor/github.com/pion/sctp/error_cause_invalid_mandatory_parameter.go19
-rw-r--r--vendor/github.com/pion/sctp/error_cause_protocol_violation.go50
-rw-r--r--vendor/github.com/pion/sctp/error_cause_unrecognized_chunk_type.go28
-rw-r--r--vendor/github.com/pion/sctp/go.mod14
-rw-r--r--vendor/github.com/pion/sctp/go.sum36
-rw-r--r--vendor/github.com/pion/sctp/packet.go178
-rw-r--r--vendor/github.com/pion/sctp/param.go35
-rw-r--r--vendor/github.com/pion/sctp/param_chunk_list.go28
-rw-r--r--vendor/github.com/pion/sctp/param_forward_tsn_supported.go28
-rw-r--r--vendor/github.com/pion/sctp/param_heartbeat_info.go21
-rw-r--r--vendor/github.com/pion/sctp/param_outgoing_reset_request.go88
-rw-r--r--vendor/github.com/pion/sctp/param_random.go21
-rw-r--r--vendor/github.com/pion/sctp/param_reconfig_response.go92
-rw-r--r--vendor/github.com/pion/sctp/param_requested_hmac_algorithm.go73
-rw-r--r--vendor/github.com/pion/sctp/param_state_cookie.go46
-rw-r--r--vendor/github.com/pion/sctp/param_supported_extensions.go29
-rw-r--r--vendor/github.com/pion/sctp/paramheader.go63
-rw-r--r--vendor/github.com/pion/sctp/paramtype.go106
-rw-r--r--vendor/github.com/pion/sctp/payload_queue.go179
-rw-r--r--vendor/github.com/pion/sctp/pending_queue.go138
-rw-r--r--vendor/github.com/pion/sctp/reassembly_queue.go353
-rw-r--r--vendor/github.com/pion/sctp/renovate.json15
-rw-r--r--vendor/github.com/pion/sctp/rtx_timer.go219
-rw-r--r--vendor/github.com/pion/sctp/sctp.go2
-rw-r--r--vendor/github.com/pion/sctp/stream.go357
-rw-r--r--vendor/github.com/pion/sctp/util.go58
54 files changed, 6633 insertions, 0 deletions
diff --git a/vendor/github.com/pion/sctp/.gitignore b/vendor/github.com/pion/sctp/.gitignore
new file mode 100644
index 0000000..d39fb86
--- /dev/null
+++ b/vendor/github.com/pion/sctp/.gitignore
@@ -0,0 +1 @@
+*.sw[poe]
diff --git a/vendor/github.com/pion/sctp/.golangci.yml b/vendor/github.com/pion/sctp/.golangci.yml
new file mode 100644
index 0000000..4213697
--- /dev/null
+++ b/vendor/github.com/pion/sctp/.golangci.yml
@@ -0,0 +1,82 @@
+linters-settings:
+ govet:
+ check-shadowing: true
+ misspell:
+ locale: US
+ exhaustive:
+ default-signifies-exhaustive: true
+
+linters:
+ enable:
+ - asciicheck # Simple linter to check that your code does not contain non-ASCII identifiers
+ - bodyclose # checks whether HTTP response body is closed successfully
+ - deadcode # Finds unused code
+ - depguard # Go linter that checks if package imports are in a list of acceptable packages
+ - dogsled # Checks assignments with too many blank identifiers (e.g. x, _, _, _, := f())
+ - dupl # Tool for code clone detection
+ - errcheck # Errcheck is a program for checking for unchecked errors in go programs. These unchecked errors can be critical bugs in some cases
+ - exhaustive # check exhaustiveness of enum switch statements
+ - exportloopref # checks for pointers to enclosing loop variables
+ - gci # Gci control golang package import order and make it always deterministic.
+ - gochecknoglobals # Checks that no globals are present in Go code
+ - gochecknoinits # Checks that no init functions are present in Go code
+ - gocognit # Computes and checks the cognitive complexity of functions
+ - goconst # Finds repeated strings that could be replaced by a constant
+ - gocritic # The most opinionated Go source code linter
+ - godox # Tool for detection of FIXME, TODO and other comment keywords
+ - goerr113 # Golang linter to check the errors handling expressions
+ - gofmt # Gofmt checks whether code was gofmt-ed. By default this tool runs with -s option to check for code simplification
+ - gofumpt # Gofumpt checks whether code was gofumpt-ed.
+ - goheader # Checks is file header matches to pattern
+ - goimports # Goimports does everything that gofmt does. Additionally it checks unused imports
+ - golint # Golint differs from gofmt. Gofmt reformats Go source code, whereas golint prints out style mistakes
+ - gomodguard # Allow and block list linter for direct Go module dependencies. This is different from depguard where there are different block types for example version constraints and module recommendations.
+ - goprintffuncname # Checks that printf-like functions are named with `f` at the end
+ - gosec # Inspects source code for security problems
+ - gosimple # Linter for Go source code that specializes in simplifying a code
+ - govet # Vet examines Go source code and reports suspicious constructs, such as Printf calls whose arguments do not align with the format string
+ - ineffassign # Detects when assignments to existing variables are not used
+ - misspell # Finds commonly misspelled English words in comments
+ - nakedret # Finds naked returns in functions greater than a specified function length
+ - noctx # noctx finds sending http request without context.Context
+ - scopelint # Scopelint checks for unpinned variables in go programs
+ - staticcheck # Staticcheck is a go vet on steroids, applying a ton of static analysis checks
+ - structcheck # Finds unused struct fields
+ - stylecheck # Stylecheck is a replacement for golint
+ - typecheck # Like the front-end of a Go compiler, parses and type-checks Go code
+ - unconvert # Remove unnecessary type conversions
+ - unparam # Reports unused function parameters
+ - unused # Checks Go code for unused constants, variables, functions and types
+ - varcheck # Finds unused global variables and constants
+ - whitespace # Tool for detection of leading and trailing whitespace
+ disable:
+ - funlen # Tool for detection of long functions
+ - gocyclo # Computes and checks the cyclomatic complexity of functions
+ - godot # Check if comments end in a period
+ - gomnd # An analyzer to detect magic numbers.
+ - lll # Reports long lines
+ - maligned # Tool to detect Go structs that would take less memory if their fields were sorted
+ - nestif # Reports deeply nested if statements
+ - nlreturn # nlreturn checks for a new line before return and branch statements to increase code clarity
+ - nolintlint # Reports ill-formed or insufficient nolint directives
+ - prealloc # Finds slice declarations that could potentially be preallocated
+ - rowserrcheck # checks whether Err of rows is checked successfully
+ - sqlclosecheck # Checks that sql.Rows and sql.Stmt are closed.
+ - testpackage # linter that makes you use a separate _test package
+ - wsl # Whitespace Linter - Forces you to use empty lines!
+
+issues:
+ exclude-rules:
+ # Allow complex tests, better to be self contained
+ - path: _test\.go
+ linters:
+ - gocognit
+
+ # Allow complex main function in examples
+ - path: examples
+ text: "of func `main` is high"
+ linters:
+ - gocognit
+
+run:
+ skip-dirs-use-default: false
diff --git a/vendor/github.com/pion/sctp/DESIGN.md b/vendor/github.com/pion/sctp/DESIGN.md
new file mode 100644
index 0000000..02ac161
--- /dev/null
+++ b/vendor/github.com/pion/sctp/DESIGN.md
@@ -0,0 +1,20 @@
+<h1 align="center">
+ Design
+</h1>
+
+### Portable
+Pion SCTP is written in Go and extremely portable. Anywhere Golang runs, Pion SCTP should work as well! Instead of dealing with complicated
+cross-compiling of multiple libraries, you now can run anywhere with one `go build`
+
+### Simple API
+The API is based on an io.ReadWriteCloser.
+
+### Readable
+If code comes from an RFC we try to make sure everything is commented with a link to the spec.
+This makes learning and debugging easier, this library was written to also serve as a guide for others.
+
+### Tested
+Every commit is tested via travis-ci Go provides fantastic facilities for testing, and more will be added as time goes on.
+
+### Shared libraries
+Every pion product is built using shared libraries, allowing others to review and reuse our libraries.
diff --git a/vendor/github.com/pion/sctp/LICENSE b/vendor/github.com/pion/sctp/LICENSE
new file mode 100644
index 0000000..ab60297
--- /dev/null
+++ b/vendor/github.com/pion/sctp/LICENSE
@@ -0,0 +1,21 @@
+MIT License
+
+Copyright (c) 2018
+
+Permission is hereby granted, free of charge, to any person obtaining a copy
+of this software and associated documentation files (the "Software"), to deal
+in the Software without restriction, including without limitation the rights
+to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+copies of the Software, and to permit persons to whom the Software is
+furnished to do so, subject to the following conditions:
+
+The above copyright notice and this permission notice shall be included in all
+copies or substantial portions of the Software.
+
+THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+SOFTWARE.
diff --git a/vendor/github.com/pion/sctp/README.md b/vendor/github.com/pion/sctp/README.md
new file mode 100644
index 0000000..18b5693
--- /dev/null
+++ b/vendor/github.com/pion/sctp/README.md
@@ -0,0 +1,54 @@
+<h1 align="center">
+ <br>
+ Pion SCTP
+ <br>
+</h1>
+<h4 align="center">A Go implementation of SCTP</h4>
+<p align="center">
+ <a href="https://pion.ly"><img src="https://img.shields.io/badge/pion-sctp-gray.svg?longCache=true&colorB=brightgreen" alt="Pion SCTP"></a>
+ <!--<a href="https://sourcegraph.com/github.com/pion/webrtc?badge"><img src="https://sourcegraph.com/github.com/pion/webrtc/-/badge.svg" alt="Sourcegraph Widget"></a>-->
+ <a href="https://pion.ly/slack"><img src="https://img.shields.io/badge/join-us%20on%20slack-gray.svg?longCache=true&logo=slack&colorB=brightgreen" alt="Slack Widget"></a>
+ <br>
+ <a href="https://travis-ci.org/pion/sctp"><img src="https://travis-ci.org/pion/sctp.svg?branch=master" alt="Build Status"></a>
+ <a href="https://pkg.go.dev/github.com/pion/sctp"><img src="https://godoc.org/github.com/pion/sctp?status.svg" alt="GoDoc"></a>
+ <a href="https://codecov.io/gh/pion/sctp"><img src="https://codecov.io/gh/pion/sctp/branch/master/graph/badge.svg" alt="Coverage Status"></a>
+ <a href="https://goreportcard.com/report/github.com/pion/sctp"><img src="https://goreportcard.com/badge/github.com/pion/sctp" alt="Go Report Card"></a>
+ <!--<a href="https://www.codacy.com/app/Sean-Der/webrtc"><img src="https://api.codacy.com/project/badge/Grade/18f4aec384894e6aac0b94effe51961d" alt="Codacy Badge"></a>-->
+ <a href="LICENSE"><img src="https://img.shields.io/badge/License-MIT-yellow.svg" alt="License: MIT"></a>
+</p>
+<br>
+
+See [DESIGN.md](DESIGN.md) for an overview of features and future goals.
+
+### Roadmap
+The library is used as a part of our WebRTC implementation. Please refer to that [roadmap](https://github.com/pion/webrtc/issues/9) to track our major milestones.
+
+### Community
+Pion has an active community on the [Golang Slack](https://invite.slack.golangbridge.org/). Sign up and join the **#pion** channel for discussions and support. You can also use [Pion mailing list](https://groups.google.com/forum/#!forum/pion).
+
+We are always looking to support **your projects**. Please reach out if you have something to build!
+
+If you need commercial support or don't want to use public methods you can contact us at [team@pion.ly](mailto:team@pion.ly)
+
+### Contributing
+Check out the **[contributing wiki](https://github.com/pion/webrtc/wiki/Contributing)** to join the group of amazing people making this project possible:
+
+* [John Bradley](https://github.com/kc5nra) - *Original Author*
+* [Sean DuBois](https://github.com/Sean-Der) - *Original Author*
+* [Michiel De Backker](https://github.com/backkem) - *Public API, Initialization*
+* [Konstantin Itskov](https://github.com/trivigy) - *Fix documentation*
+* [chenkaiC4](https://github.com/chenkaiC4) - *Fix GolangCI Linter*
+* [Ronan J](https://github.com/ronanj) - *Fix PPID*
+* [Michael MacDonald](https://github.com/mjmac) - *Fix races*
+* [Yutaka Takeda](https://github.com/enobufs) - *PR-SCTP, Retransmissions, Congestion Control*
+* [Antoine Baché](https://github.com/Antonito) - *SCTP Profiling*
+* [Cecylia Bocovich](https://github.com/cohosh) - *Fix SCTP reads*
+* [Hugo Arregui](https://github.com/hugoArregui)
+* [Atsushi Watanabe](https://github.com/at-wat)
+* [Lukas Herman](https://github.com/lherman-cs)
+* [Luke Curley](https://github.com/kixelated) - *Performance*
+* [Aaron France](https://github.com/AeroNotix)
+* [ZHENK](https://github.com/scorpionknifes)
+
+### License
+MIT License - see [LICENSE](LICENSE) for full text
diff --git a/vendor/github.com/pion/sctp/ack_timer.go b/vendor/github.com/pion/sctp/ack_timer.go
new file mode 100644
index 0000000..ba23d54
--- /dev/null
+++ b/vendor/github.com/pion/sctp/ack_timer.go
@@ -0,0 +1,105 @@
+package sctp
+
+import (
+ "sync"
+ "time"
+)
+
+const (
+ ackInterval time.Duration = 200 * time.Millisecond
+)
+
+// ackTimerObserver is the inteface to an ack timer observer.
+type ackTimerObserver interface {
+ onAckTimeout()
+}
+
+// ackTimer provides the retnransmission timer conforms with RFC 4960 Sec 6.3.1
+type ackTimer struct {
+ observer ackTimerObserver
+ interval time.Duration
+ stopFunc stopAckTimerLoop
+ closed bool
+ mutex sync.RWMutex
+}
+
+type stopAckTimerLoop func()
+
+// newAckTimer creates a new acknowledgement timer used to enable delayed ack.
+func newAckTimer(observer ackTimerObserver) *ackTimer {
+ return &ackTimer{
+ observer: observer,
+ interval: ackInterval,
+ }
+}
+
+// start starts the timer.
+func (t *ackTimer) start() bool {
+ t.mutex.Lock()
+ defer t.mutex.Unlock()
+
+ // this timer is already closed
+ if t.closed {
+ return false
+ }
+
+ // this is a noop if the timer is already running
+ if t.stopFunc != nil {
+ return false
+ }
+
+ cancelCh := make(chan struct{})
+
+ go func() {
+ timer := time.NewTimer(t.interval)
+
+ select {
+ case <-timer.C:
+ t.stop()
+ t.observer.onAckTimeout()
+ case <-cancelCh:
+ timer.Stop()
+ }
+ }()
+
+ t.stopFunc = func() {
+ close(cancelCh)
+ }
+
+ return true
+}
+
+// stops the timer. this is similar to stop() but subsequent start() call
+// will fail (the timer is no longer usable)
+func (t *ackTimer) stop() {
+ t.mutex.Lock()
+ defer t.mutex.Unlock()
+
+ if t.stopFunc != nil {
+ t.stopFunc()
+ t.stopFunc = nil
+ }
+}
+
+// closes the timer. this is similar to stop() but subsequent start() call
+// will fail (the timer is no longer usable)
+func (t *ackTimer) close() {
+ t.mutex.Lock()
+ defer t.mutex.Unlock()
+
+ if t.stopFunc != nil {
+ t.stopFunc()
+ t.stopFunc = nil
+ }
+
+ t.closed = true
+}
+
+// isRunning tests if the timer is running.
+// Debug purpose only
+func (t *ackTimer) isRunning() bool {
+ t.mutex.RLock()
+ defer t.mutex.RUnlock()
+
+ return (t.stopFunc != nil)
+}
diff --git a/vendor/github.com/pion/sctp/association.go b/vendor/github.com/pion/sctp/association.go
new file mode 100644
index 0000000..1393cb8
--- /dev/null
+++ b/vendor/github.com/pion/sctp/association.go
@@ -0,0 +1,2241 @@
+package sctp
+
+import (
+ "bytes"
+ "fmt"
+ "io"
+ "math"
+ "net"
+ "sync"
+ "sync/atomic"
+ "time"
+
+ "github.com/pion/logging"
+ "github.com/pion/randutil"
+ "github.com/pkg/errors"
+)
+
+// Use global random generator to properly seed by crypto grade random.
+var (
+ globalMathRandomGenerator = randutil.NewMathRandomGenerator() // nolint:gochecknoglobals
+ errChunk = errors.New("Abort chunk, with following errors")
+)
+
+const (
+ receiveMTU uint32 = 8192 // MTU for inbound packet (from DTLS)
+ initialMTU uint32 = 1228 // initial MTU for outgoing packets (to DTLS)
+ initialRecvBufSize uint32 = 1024 * 1024
+ commonHeaderSize uint32 = 12
+ dataChunkHeaderSize uint32 = 16
+ defaultMaxMessageSize uint32 = 65536
+)
+
+// association state enums
+const (
+ closed uint32 = iota
+ cookieWait
+ cookieEchoed
+ established
+ shutdownAckSent
+ shutdownPending
+ shutdownReceived
+ shutdownSent
+)
+
+// retransmission timer IDs
+const (
+ timerT1Init int = iota
+ timerT1Cookie
+ timerT3RTX
+ timerReconfig
+)
+
+// ack mode (for testing)
+const (
+ ackModeNormal int = iota
+ ackModeNoDelay
+ ackModeAlwaysDelay
+)
+
+// ack transmission state
+const (
+ ackStateIdle int = iota // ack timer is off
+ ackStateImmediate // ack timer is on (ack is being delayed)
+ ackStateDelay // will send ack immediately
+)
+
+// other constants
+const (
+ acceptChSize = 16
+)
+
+func getAssociationStateString(a uint32) string {
+ switch a {
+ case closed:
+ return "Closed"
+ case cookieWait:
+ return "CookieWait"
+ case cookieEchoed:
+ return "CookieEchoed"
+ case established:
+ return "Established"
+ case shutdownPending:
+ return "ShutdownPending"
+ case shutdownSent:
+ return "ShutdownSent"
+ case shutdownReceived:
+ return "ShutdownReceived"
+ case shutdownAckSent:
+ return "ShutdownAckSent"
+ default:
+ return fmt.Sprintf("Invalid association state %d", a)
+ }
+}
+
+// Association represents an SCTP association
+// 13.2. Parameters Necessary per Association (i.e., the TCB)
+// Peer : Tag value to be sent in every packet and is received
+// Verification: in the INIT or INIT ACK chunk.
+// Tag :
+//
+// My : Tag expected in every inbound packet and sent in the
+// Verification: INIT or INIT ACK chunk.
+//
+// Tag :
+// State : A state variable indicating what state the association
+// : is in, i.e., COOKIE-WAIT, COOKIE-ECHOED, ESTABLISHED,
+// : SHUTDOWN-PENDING, SHUTDOWN-SENT, SHUTDOWN-RECEIVED,
+// : SHUTDOWN-ACK-SENT.
+//
+// Note: No "CLOSED" state is illustrated since if a
+// association is "CLOSED" its TCB SHOULD be removed.
+type Association struct {
+ bytesReceived uint64
+ bytesSent uint64
+
+ lock sync.RWMutex
+
+ netConn net.Conn
+
+ peerVerificationTag uint32
+ myVerificationTag uint32
+ state uint32
+ myNextTSN uint32 // nextTSN
+ peerLastTSN uint32 // lastRcvdTSN
+ minTSN2MeasureRTT uint32 // for RTT measurement
+ willSendForwardTSN bool
+ willRetransmitFast bool
+ willRetransmitReconfig bool
+
+ // Reconfig
+ myNextRSN uint32
+ reconfigs map[uint32]*chunkReconfig
+ reconfigRequests map[uint32]*paramOutgoingResetRequest
+
+ // Non-RFC internal data
+ sourcePort uint16
+ destinationPort uint16
+ myMaxNumInboundStreams uint16
+ myMaxNumOutboundStreams uint16
+ myCookie *paramStateCookie
+ payloadQueue *payloadQueue
+ inflightQueue *payloadQueue
+ pendingQueue *pendingQueue
+ controlQueue *controlQueue
+ mtu uint32
+ maxPayloadSize uint32 // max DATA chunk payload size
+ cumulativeTSNAckPoint uint32
+ advancedPeerTSNAckPoint uint32
+ useForwardTSN bool
+
+ // Congestion control parameters
+ maxReceiveBufferSize uint32
+ maxMessageSize uint32
+ cwnd uint32 // my congestion window size
+ rwnd uint32 // calculated peer's receiver windows size
+ ssthresh uint32 // slow start threshold
+ partialBytesAcked uint32
+ inFastRecovery bool
+ fastRecoverExitPoint uint32
+
+ // RTX & Ack timer
+ rtoMgr *rtoManager
+ t1Init *rtxTimer
+ t1Cookie *rtxTimer
+ t3RTX *rtxTimer
+ tReconfig *rtxTimer
+ ackTimer *ackTimer
+
+ // Chunks stored for retransmission
+ storedInit *chunkInit
+ storedCookieEcho *chunkCookieEcho
+
+ streams map[uint16]*Stream
+ acceptCh chan *Stream
+ readLoopCloseCh chan struct{}
+ awakeWriteLoopCh chan struct{}
+ closeWriteLoopCh chan struct{}
+ handshakeCompletedCh chan error
+
+ closeWriteLoopOnce sync.Once
+
+ // local error
+ silentError error
+
+ ackState int
+ ackMode int // for testing
+
+ // stats
+ stats *associationStats
+
+ // per inbound packet context
+ delayedAckTriggered bool
+ immediateAckTriggered bool
+
+ name string
+ log logging.LeveledLogger
+}
+
+// Config collects the arguments to createAssociation construction into
+// a single structure
+type Config struct {
+ NetConn net.Conn
+ MaxReceiveBufferSize uint32
+ MaxMessageSize uint32
+ LoggerFactory logging.LoggerFactory
+}
+
+// Server accepts a SCTP stream over a conn
+func Server(config Config) (*Association, error) {
+ a := createAssociation(config)
+ a.init(false)
+
+ select {
+ case err := <-a.handshakeCompletedCh:
+ if err != nil {
+ return nil, err
+ }
+ return a, nil
+ case <-a.readLoopCloseCh:
+ return nil, errors.Errorf("association closed before connecting")
+ }
+}
+
+// Client opens a SCTP stream over a conn
+func Client(config Config) (*Association, error) {
+ a := createAssociation(config)
+ a.init(true)
+
+ select {
+ case err := <-a.handshakeCompletedCh:
+ if err != nil {
+ return nil, err
+ }
+ return a, nil
+ case <-a.readLoopCloseCh:
+ return nil, errors.Errorf("association closed before connecting")
+ }
+}
+
+func createAssociation(config Config) *Association {
+ var maxReceiveBufferSize uint32
+ if config.MaxReceiveBufferSize == 0 {
+ maxReceiveBufferSize = initialRecvBufSize
+ } else {
+ maxReceiveBufferSize = config.MaxReceiveBufferSize
+ }
+
+ var maxMessageSize uint32
+ if config.MaxMessageSize == 0 {
+ maxMessageSize = defaultMaxMessageSize
+ } else {
+ maxMessageSize = config.MaxMessageSize
+ }
+
+ tsn := globalMathRandomGenerator.Uint32()
+ a := &Association{
+ netConn: config.NetConn,
+ maxReceiveBufferSize: maxReceiveBufferSize,
+ maxMessageSize: maxMessageSize,
+ myMaxNumOutboundStreams: math.MaxUint16,
+ myMaxNumInboundStreams: math.MaxUint16,
+ payloadQueue: newPayloadQueue(),
+ inflightQueue: newPayloadQueue(),
+ pendingQueue: newPendingQueue(),
+ controlQueue: newControlQueue(),
+ mtu: initialMTU,
+ maxPayloadSize: initialMTU - (commonHeaderSize + dataChunkHeaderSize),
+ myVerificationTag: globalMathRandomGenerator.Uint32(),
+ myNextTSN: tsn,
+ myNextRSN: tsn,
+ minTSN2MeasureRTT: tsn,
+ state: closed,
+ rtoMgr: newRTOManager(),
+ streams: map[uint16]*Stream{},
+ reconfigs: map[uint32]*chunkReconfig{},
+ reconfigRequests: map[uint32]*paramOutgoingResetRequest{},
+ acceptCh: make(chan *Stream, acceptChSize),
+ readLoopCloseCh: make(chan struct{}),
+ awakeWriteLoopCh: make(chan struct{}, 1),
+ closeWriteLoopCh: make(chan struct{}),
+ handshakeCompletedCh: make(chan error),
+ cumulativeTSNAckPoint: tsn - 1,
+ advancedPeerTSNAckPoint: tsn - 1,
+ silentError: errors.Errorf("silently discard"),
+ stats: &associationStats{},
+ log: config.LoggerFactory.NewLogger("sctp"),
+ }
+
+ a.name = fmt.Sprintf("%p", a)
+
+ // RFC 4690 Sec 7.2.1
+ // o The initial cwnd before DATA transmission or after a sufficiently
+ // long idle period MUST be set to min(4*MTU, max (2*MTU, 4380
+ // bytes)).
+ a.cwnd = min32(4*a.mtu, max32(2*a.mtu, 4380))
+ a.log.Tracef("[%s] updated cwnd=%d ssthresh=%d inflight=%d (INI)",
+ a.name, a.cwnd, a.ssthresh, a.inflightQueue.getNumBytes())
+
+ a.t1Init = newRTXTimer(timerT1Init, a, maxInitRetrans)
+ a.t1Cookie = newRTXTimer(timerT1Cookie, a, maxInitRetrans)
+ a.t3RTX = newRTXTimer(timerT3RTX, a, noMaxRetrans) // retransmit forever
+ a.tReconfig = newRTXTimer(timerReconfig, a, noMaxRetrans) // retransmit forever
+ a.ackTimer = newAckTimer(a)
+
+ return a
+}
+
+func (a *Association) init(isClient bool) {
+ a.lock.Lock()
+ defer a.lock.Unlock()
+
+ go a.readLoop()
+ go a.writeLoop()
+
+ if isClient {
+ a.setState(cookieWait)
+ init := &chunkInit{}
+ init.initialTSN = a.myNextTSN
+ init.numOutboundStreams = a.myMaxNumOutboundStreams
+ init.numInboundStreams = a.myMaxNumInboundStreams
+ init.initiateTag = a.myVerificationTag
+ init.advertisedReceiverWindowCredit = a.maxReceiveBufferSize
+ setSupportedExtensions(&init.chunkInitCommon)
+ a.storedInit = init
+
+ err := a.sendInit()
+ if err != nil {
+ a.log.Errorf("[%s] failed to send init: %s", a.name, err.Error())
+ }
+
+ a.t1Init.start(a.rtoMgr.getRTO())
+ }
+}
+
+// caller must hold a.lock
+func (a *Association) sendInit() error {
+ a.log.Debugf("[%s] sending INIT", a.name)
+ if a.storedInit == nil {
+ return errors.Errorf("the init not stored to send")
+ }
+
+ outbound := &packet{}
+ outbound.verificationTag = a.peerVerificationTag
+ a.sourcePort = 5000 // Spec??
+ a.destinationPort = 5000 // Spec??
+ outbound.sourcePort = a.sourcePort
+ outbound.destinationPort = a.destinationPort
+
+ outbound.chunks = []chunk{a.storedInit}
+
+ a.controlQueue.push(outbound)
+ a.awakeWriteLoop()
+
+ return nil
+}
+
+// caller must hold a.lock
+func (a *Association) sendCookieEcho() error {
+ if a.storedCookieEcho == nil {
+ return errors.Errorf("cookieEcho not stored to send")
+ }
+
+ a.log.Debugf("[%s] sending COOKIE-ECHO", a.name)
+
+ outbound := &packet{}
+ outbound.verificationTag = a.peerVerificationTag
+ outbound.sourcePort = a.sourcePort
+ outbound.destinationPort = a.destinationPort
+ outbound.chunks = []chunk{a.storedCookieEcho}
+
+ a.controlQueue.push(outbound)
+ a.awakeWriteLoop()
+
+ return nil
+}
+
+// Close ends the SCTP Association and cleans up any state
+func (a *Association) Close() error {
+ a.log.Debugf("[%s] closing association..", a.name)
+
+ a.setState(closed)
+
+ err := a.netConn.Close()
+
+ a.closeAllTimers()
+
+ // awake writeLoop to exit
+ a.closeWriteLoopOnce.Do(func() { close(a.closeWriteLoopCh) })
+
+ // Wait for readLoop to end
+ <-a.readLoopCloseCh
+
+ a.log.Debugf("[%s] association closed", a.name)
+ a.log.Debugf("[%s] stats nDATAs (in) : %d", a.name, a.stats.getNumDATAs())
+ a.log.Debugf("[%s] stats nSACKs (in) : %d", a.name, a.stats.getNumSACKs())
+ a.log.Debugf("[%s] stats nT3Timeouts : %d", a.name, a.stats.getNumT3Timeouts())
+ a.log.Debugf("[%s] stats nAckTimeouts: %d", a.name, a.stats.getNumAckTimeouts())
+ a.log.Debugf("[%s] stats nFastRetrans: %d", a.name, a.stats.getNumFastRetrans())
+ return err
+}
+
+func (a *Association) closeAllTimers() {
+ // Close all retransmission & ack timers
+ a.t1Init.close()
+ a.t1Cookie.close()
+ a.t3RTX.close()
+ a.tReconfig.close()
+ a.ackTimer.close()
+}
+
+func (a *Association) readLoop() {
+ var closeErr error
+ defer func() {
+ // also stop writeLoop, otherwise writeLoop can be leaked
+ // if connection is lost when there is no writing packet.
+ a.closeWriteLoopOnce.Do(func() { close(a.closeWriteLoopCh) })
+
+ a.lock.Lock()
+ for _, s := range a.streams {
+ a.unregisterStream(s, closeErr)
+ }
+ a.lock.Unlock()
+ close(a.acceptCh)
+ close(a.readLoopCloseCh)
+ }()
+
+ a.log.Debugf("[%s] readLoop entered", a.name)
+ buffer := make([]byte, receiveMTU)
+
+ for {
+ n, err := a.netConn.Read(buffer)
+ if err != nil {
+ closeErr = err
+ break
+ }
+ // Make a buffer sized to what we read, then copy the data we
+ // read from the underlying transport. We do this because the
+ // user data is passed to the reassembly queue without
+ // copying.
+ inbound := make([]byte, n)
+ copy(inbound, buffer[:n])
+ atomic.AddUint64(&a.bytesReceived, uint64(n))
+ if err = a.handleInbound(inbound); err != nil {
+ closeErr = err
+ break
+ }
+ }
+
+ a.log.Debugf("[%s] readLoop exited %s", a.name, closeErr)
+}
+
+func (a *Association) writeLoop() {
+ a.log.Debugf("[%s] writeLoop entered", a.name)
+
+loop:
+ for {
+ rawPackets := a.gatherOutbound()
+
+ for _, raw := range rawPackets {
+ _, err := a.netConn.Write(raw)
+ if err != nil {
+ if err != io.EOF {
+ a.log.Warnf("[%s] failed to write packets on netConn: %v", a.name, err)
+ }
+ a.log.Debugf("[%s] writeLoop ended", a.name)
+ break loop
+ }
+ atomic.AddUint64(&a.bytesSent, uint64(len(raw)))
+ }
+
+ select {
+ case <-a.awakeWriteLoopCh:
+ case <-a.closeWriteLoopCh:
+ break loop
+ }
+ }
+
+ a.setState(closed)
+ a.closeAllTimers()
+
+ a.log.Debugf("[%s] writeLoop exited", a.name)
+}
+
+func (a *Association) awakeWriteLoop() {
+ select {
+ case a.awakeWriteLoopCh <- struct{}{}:
+ default:
+ }
+}
+
+// unregisterStream un-registers a stream from the association
+// The caller should hold the association write lock.
+func (a *Association) unregisterStream(s *Stream, err error) {
+ s.lock.Lock()
+ defer s.lock.Unlock()
+
+ delete(a.streams, s.streamIdentifier)
+ s.readErr = err
+ s.readNotifier.Broadcast()
+}
+
+// handleInbound parses incoming raw packets
+func (a *Association) handleInbound(raw []byte) error {
+ p := &packet{}
+ if err := p.unmarshal(raw); err != nil {
+ a.log.Warnf("[%s] unable to parse SCTP packet %s", a.name, err)
+ return nil
+ }
+
+ if err := checkPacket(p); err != nil {
+ a.log.Warnf("[%s] failed validating packet %s", a.name, err)
+ return nil
+ }
+
+ a.handleChunkStart()
+
+ for _, c := range p.chunks {
+ if err := a.handleChunk(p, c); err != nil {
+ return err
+ }
+ }
+
+ a.handleChunkEnd()
+
+ return nil
+}
+
+// The caller should hold the lock
+func (a *Association) gatherOutboundDataAndReconfigPackets(rawPackets [][]byte) [][]byte {
+ for _, p := range a.getDataPacketsToRetransmit() {
+ raw, err := p.marshal()
+ if err != nil {
+ a.log.Warnf("[%s] failed to serialize a DATA packet to be retransmitted", a.name)
+ continue
+ }
+ rawPackets = append(rawPackets, raw)
+ }
+
+ // Pop unsent data chunks from the pending queue to send as much as
+ // cwnd and rwnd allow.
+ chunks, sisToReset := a.popPendingDataChunksToSend()
+ if len(chunks) > 0 {
+ // Start timer. (noop if already started)
+ a.log.Tracef("[%s] T3-rtx timer start (pt1)", a.name)
+ a.t3RTX.start(a.rtoMgr.getRTO())
+ for _, p := range a.bundleDataChunksIntoPackets(chunks) {
+ raw, err := p.marshal()
+ if err != nil {
+ a.log.Warnf("[%s] failed to serialize a DATA packet", a.name)
+ continue
+ }
+ rawPackets = append(rawPackets, raw)
+ }
+ }
+
+ if len(sisToReset) > 0 || a.willRetransmitReconfig {
+ if a.willRetransmitReconfig {
+ a.willRetransmitReconfig = false
+ a.log.Debugf("[%s] retransmit %d RECONFIG chunk(s)", a.name, len(a.reconfigs))
+ for _, c := range a.reconfigs {
+ p := a.createPacket([]chunk{c})
+ raw, err := p.marshal()
+ if err != nil {
+ a.log.Warnf("[%s] failed to serialize a RECONFIG packet to be retransmitted", a.name)
+ } else {
+ rawPackets = append(rawPackets, raw)
+ }
+ }
+ }
+
+ if len(sisToReset) > 0 {
+ rsn := a.generateNextRSN()
+ tsn := a.myNextTSN - 1
+ c := &chunkReconfig{
+ paramA: &paramOutgoingResetRequest{
+ reconfigRequestSequenceNumber: rsn,
+ senderLastTSN: tsn,
+ streamIdentifiers: sisToReset,
+ },
+ }
+ a.reconfigs[rsn] = c // store in the map for retransmission
+ a.log.Debugf("[%s] sending RECONFIG: rsn=%d tsn=%d streams=%v",
+ a.name, rsn, a.myNextTSN-1, sisToReset)
+ p := a.createPacket([]chunk{c})
+ raw, err := p.marshal()
+ if err != nil {
+ a.log.Warnf("[%s] failed to serialize a RECONFIG packet to be transmitted", a.name)
+ } else {
+ rawPackets = append(rawPackets, raw)
+ }
+ }
+
+ if len(a.reconfigs) > 0 {
+ a.tReconfig.start(a.rtoMgr.getRTO())
+ }
+ }
+
+ return rawPackets
+}
+
+// The caller should hold the lock
+func (a *Association) gatherOutboundFrastRetransmissionPackets(rawPackets [][]byte) [][]byte {
+ if a.willRetransmitFast {
+ a.willRetransmitFast = false
+
+ toFastRetrans := []chunk{}
+ fastRetransSize := commonHeaderSize
+
+ for i := 0; ; i++ {
+ c, ok := a.inflightQueue.get(a.cumulativeTSNAckPoint + uint32(i) + 1)
+ if !ok {
+ break // end of pending data
+ }
+
+ if c.acked || c.abandoned() {
+ continue
+ }
+
+ if c.nSent > 1 || c.missIndicator < 3 {
+ continue
+ }
+
+ // RFC 4960 Sec 7.2.4 Fast Retransmit on Gap Reports
+ // 3) Determine how many of the earliest (i.e., lowest TSN) DATA chunks
+ // marked for retransmission will fit into a single packet, subject
+ // to constraint of the path MTU of the destination transport
+ // address to which the packet is being sent. Call this value K.
+ // Retransmit those K DATA chunks in a single packet. When a Fast
+ // Retransmit is being performed, the sender SHOULD ignore the value
+ // of cwnd and SHOULD NOT delay retransmission for this single
+ // packet.
+
+ dataChunkSize := dataChunkHeaderSize + uint32(len(c.userData))
+ if a.mtu < fastRetransSize+dataChunkSize {
+ break
+ }
+
+ fastRetransSize += dataChunkSize
+ a.stats.incFastRetrans()
+ c.nSent++
+ a.checkPartialReliabilityStatus(c)
+ toFastRetrans = append(toFastRetrans, c)
+ a.log.Tracef("[%s] fast-retransmit: tsn=%d sent=%d htna=%d",
+ a.name, c.tsn, c.nSent, a.fastRecoverExitPoint)
+ }
+
+ if len(toFastRetrans) > 0 {
+ raw, err := a.createPacket(toFastRetrans).marshal()
+ if err != nil {
+ a.log.Warnf("[%s] failed to serialize a DATA packet to be fast-retransmitted", a.name)
+ } else {
+ rawPackets = append(rawPackets, raw)
+ }
+ }
+ }
+
+ return rawPackets
+}
+
+// The caller should hold the lock
+func (a *Association) gatherOutboundSackPackets(rawPackets [][]byte) [][]byte {
+ if a.ackState == ackStateImmediate {
+ a.ackState = ackStateIdle
+ sack := a.createSelectiveAckChunk()
+ a.log.Debugf("[%s] sending SACK: %s", a.name, sack.String())
+ raw, err := a.createPacket([]chunk{sack}).marshal()
+ if err != nil {
+ a.log.Warnf("[%s] failed to serialize a SACK packet", a.name)
+ } else {
+ rawPackets = append(rawPackets, raw)
+ }
+ }
+
+ return rawPackets
+}
+
+// The caller should hold the lock
+func (a *Association) gatherOutboundForwardTSNPackets(rawPackets [][]byte) [][]byte {
+ if a.willSendForwardTSN {
+ a.willSendForwardTSN = false
+ if sna32GT(a.advancedPeerTSNAckPoint, a.cumulativeTSNAckPoint) {
+ fwdtsn := a.createForwardTSN()
+ raw, err := a.createPacket([]chunk{fwdtsn}).marshal()
+ if err != nil {
+ a.log.Warnf("[%s] failed to serialize a Forward TSN packet", a.name)
+ } else {
+ rawPackets = append(rawPackets, raw)
+ }
+ }
+ }
+
+ return rawPackets
+}
+
+// gatherOutbound gathers outgoing packets
+func (a *Association) gatherOutbound() [][]byte {
+ a.lock.Lock()
+ defer a.lock.Unlock()
+
+ rawPackets := [][]byte{}
+
+ if a.controlQueue.size() > 0 {
+ for _, p := range a.controlQueue.popAll() {
+ raw, err := p.marshal()
+ if err != nil {
+ a.log.Warnf("[%s] failed to serialize a control packet", a.name)
+ continue
+ }
+ rawPackets = append(rawPackets, raw)
+ }
+ }
+
+ state := a.getState()
+
+ if state == established {
+ rawPackets = a.gatherOutboundDataAndReconfigPackets(rawPackets)
+ rawPackets = a.gatherOutboundFrastRetransmissionPackets(rawPackets)
+ rawPackets = a.gatherOutboundSackPackets(rawPackets)
+ rawPackets = a.gatherOutboundForwardTSNPackets(rawPackets)
+ }
+
+ return rawPackets
+}
+
+func checkPacket(p *packet) error {
+ // All packets must adhere to these rules
+
+ // This is the SCTP sender's port number. It can be used by the
+ // receiver in combination with the source IP address, the SCTP
+ // destination port, and possibly the destination IP address to
+ // identify the association to which this packet belongs. The port
+ // number 0 MUST NOT be used.
+ if p.sourcePort == 0 {
+ return errors.Errorf("sctp packet must not have a source port of 0")
+ }
+
+ // This is the SCTP port number to which this packet is destined.
+ // The receiving host will use this port number to de-multiplex the
+ // SCTP packet to the correct receiving endpoint/application. The
+ // port number 0 MUST NOT be used.
+ if p.destinationPort == 0 {
+ return errors.Errorf("sctp packet must not have a destination port of 0")
+ }
+
+ // Check values on the packet that are specific to a particular chunk type
+ for _, c := range p.chunks {
+ switch c.(type) { // nolint:gocritic
+ case *chunkInit:
+ // An INIT or INIT ACK chunk MUST NOT be bundled with any other chunk.
+ // They MUST be the only chunks present in the SCTP packets that carry
+ // them.
+ if len(p.chunks) != 1 {
+ return errors.Errorf("init chunk must not be bundled with any other chunk")
+ }
+
+ // A packet containing an INIT chunk MUST have a zero Verification
+ // Tag.
+ if p.verificationTag != 0 {
+ return errors.Errorf("init chunk expects a verification tag of 0 on the packet when out-of-the-blue")
+ }
+ }
+ }
+
+ return nil
+}
+
+func min16(a, b uint16) uint16 {
+ if a < b {
+ return a
+ }
+ return b
+}
+
+func max32(a, b uint32) uint32 {
+ if a > b {
+ return a
+ }
+ return b
+}
+
+func min32(a, b uint32) uint32 {
+ if a < b {
+ return a
+ }
+ return b
+}
+
+// setState atomically sets the state of the Association.
+// The caller should hold the lock.
+func (a *Association) setState(newState uint32) {
+ oldState := atomic.SwapUint32(&a.state, newState)
+ if newState != oldState {
+ a.log.Debugf("[%s] state change: '%s' => '%s'",
+ a.name,
+ getAssociationStateString(oldState),
+ getAssociationStateString(newState))
+ }
+}
+
+// getState atomically returns the state of the Association.
+func (a *Association) getState() uint32 {
+ return atomic.LoadUint32(&a.state)
+}
+
+// BytesSent returns the number of bytes sent
+func (a *Association) BytesSent() uint64 {
+ return atomic.LoadUint64(&a.bytesSent)
+}
+
+// BytesReceived returns the number of bytes received
+func (a *Association) BytesReceived() uint64 {
+ return atomic.LoadUint64(&a.bytesReceived)
+}
+
+func setSupportedExtensions(init *chunkInitCommon) {
+ // nolint:godox
+ // TODO RFC5061 https://tools.ietf.org/html/rfc6525#section-5.2
+ // An implementation supporting this (Supported Extensions Parameter)
+ // extension MUST list the ASCONF, the ASCONF-ACK, and the AUTH chunks
+ // in its INIT and INIT-ACK parameters.
+ init.params = append(init.params, &paramSupportedExtensions{
+ ChunkTypes: []chunkType{ctReconfig, ctForwardTSN},
+ })
+}
+
+// The caller should hold the lock.
+func (a *Association) handleInit(p *packet, i *chunkInit) ([]*packet, error) {
+ state := a.getState()
+ a.log.Debugf("[%s] chunkInit received in state '%s'", a.name, getAssociationStateString(state))
+
+ // https://tools.ietf.org/html/rfc4960#section-5.2.1
+ // Upon receipt of an INIT in the COOKIE-WAIT state, an endpoint MUST
+ // respond with an INIT ACK using the same parameters it sent in its
+ // original INIT chunk (including its Initiate Tag, unchanged). When
+ // responding, the endpoint MUST send the INIT ACK back to the same
+ // address that the original INIT (sent by this endpoint) was sent.
+
+ if state != closed && state != cookieWait && state != cookieEchoed {
+ // 5.2.2. Unexpected INIT in States Other than CLOSED, COOKIE-ECHOED,
+ // COOKIE-WAIT, and SHUTDOWN-ACK-SENT
+ return nil, errors.Errorf("todo: handle Init when in state %s", getAssociationStateString(state))
+ }
+
+ // Should we be setting any of these permanently until we've ACKed further?
+ a.myMaxNumInboundStreams = min16(i.numInboundStreams, a.myMaxNumInboundStreams)
+ a.myMaxNumOutboundStreams = min16(i.numOutboundStreams, a.myMaxNumOutboundStreams)
+ a.peerVerificationTag = i.initiateTag
+ a.sourcePort = p.destinationPort
+ a.destinationPort = p.sourcePort
+
+ // 13.2 This is the last TSN received in sequence. This value
+ // is set initially by taking the peer's initial TSN,
+ // received in the INIT or INIT ACK chunk, and
+ // subtracting one from it.
+ a.peerLastTSN = i.initialTSN - 1
+
+ for _, param := range i.params {
+ switch v := param.(type) { // nolint:gocritic
+ case *paramSupportedExtensions:
+ for _, t := range v.ChunkTypes {
+ if t == ctForwardTSN {
+ a.log.Debugf("[%s] use ForwardTSN (on init)\n", a.name)
+ a.useForwardTSN = true
+ }
+ }
+ }
+ }
+ if !a.useForwardTSN {
+ a.log.Warnf("[%s] not using ForwardTSN (on init)\n", a.name)
+ }
+
+ outbound := &packet{}
+ outbound.verificationTag = a.peerVerificationTag
+ outbound.sourcePort = a.sourcePort
+ outbound.destinationPort = a.destinationPort
+
+ initAck := &chunkInitAck{}
+
+ initAck.initialTSN = a.myNextTSN
+ initAck.numOutboundStreams = a.myMaxNumOutboundStreams
+ initAck.numInboundStreams = a.myMaxNumInboundStreams
+ initAck.initiateTag = a.myVerificationTag
+ initAck.advertisedReceiverWindowCredit = a.maxReceiveBufferSize
+
+ if a.myCookie == nil {
+ var err error
+ if a.myCookie, err = newRandomStateCookie(); err != nil {
+ return nil, err
+ }
+ }
+
+ initAck.params = []param{a.myCookie}
+
+ setSupportedExtensions(&initAck.chunkInitCommon)
+
+ outbound.chunks = []chunk{initAck}
+
+ return pack(outbound), nil
+}
+
+// The caller should hold the lock.
+func (a *Association) handleInitAck(p *packet, i *chunkInitAck) error {
+ state := a.getState()
+ a.log.Debugf("[%s] chunkInitAck received in state '%s'", a.name, getAssociationStateString(state))
+ if state != cookieWait {
+ // RFC 4960
+ // 5.2.3. Unexpected INIT ACK
+ // If an INIT ACK is received by an endpoint in any state other than the
+ // COOKIE-WAIT state, the endpoint should discard the INIT ACK chunk.
+ // An unexpected INIT ACK usually indicates the processing of an old or
+ // duplicated INIT chunk.
+ return nil
+ }
+
+ a.myMaxNumInboundStreams = min16(i.numInboundStreams, a.myMaxNumInboundStreams)
+ a.myMaxNumOutboundStreams = min16(i.numOutboundStreams, a.myMaxNumOutboundStreams)
+ a.peerVerificationTag = i.initiateTag
+ a.peerLastTSN = i.initialTSN - 1
+ if a.sourcePort != p.destinationPort ||
+ a.destinationPort != p.sourcePort {
+ a.log.Warnf("[%s] handleInitAck: port mismatch", a.name)
+ return nil
+ }
+
+ a.rwnd = i.advertisedReceiverWindowCredit
+ a.log.Debugf("[%s] initial rwnd=%d", a.name, a.rwnd)
+
+ // RFC 4690 Sec 7.2.1
+ // o The initial value of ssthresh MAY be arbitrarily high (for
+ // example, implementations MAY use the size of the receiver
+ // advertised window).
+ a.ssthresh = a.rwnd
+ a.log.Tracef("[%s] updated cwnd=%d ssthresh=%d inflight=%d (INI)",
+ a.name, a.cwnd, a.ssthresh, a.inflightQueue.getNumBytes())
+
+ a.t1Init.stop()
+ a.storedInit = nil
+
+ var cookieParam *paramStateCookie
+ for _, param := range i.params {
+ switch v := param.(type) {
+ case *paramStateCookie:
+ cookieParam = v
+ case *paramSupportedExtensions:
+ for _, t := range v.ChunkTypes {
+ if t == ctForwardTSN {
+ a.log.Debugf("[%s] use ForwardTSN (on initAck)\n", a.name)
+ a.useForwardTSN = true
+ }
+ }
+ }
+ }
+ if !a.useForwardTSN {
+ a.log.Warnf("[%s] not using ForwardTSN (on initAck)\n", a.name)
+ }
+ if cookieParam == nil {
+ return errors.Errorf("no cookie in InitAck")
+ }
+
+ a.storedCookieEcho = &chunkCookieEcho{}
+ a.storedCookieEcho.cookie = cookieParam.cookie
+
+ err := a.sendCookieEcho()
+ if err != nil {
+ a.log.Errorf("[%s] failed to send init: %s", a.name, err.Error())
+ }
+
+ a.t1Cookie.start(a.rtoMgr.getRTO())
+ a.setState(cookieEchoed)
+ return nil
+}
+
+// The caller should hold the lock.
+func (a *Association) handleHeartbeat(c *chunkHeartbeat) []*packet {
+ a.log.Tracef("[%s] chunkHeartbeat", a.name)
+ hbi, ok := c.params[0].(*paramHeartbeatInfo)
+ if !ok {
+ a.log.Warnf("[%s] failed to handle Heartbeat, no ParamHeartbeatInfo", a.name)
+ }
+
+ return pack(&packet{
+ verificationTag: a.peerVerificationTag,
+ sourcePort: a.sourcePort,
+ destinationPort: a.destinationPort,
+ chunks: []chunk{&chunkHeartbeatAck{
+ params: []param{
+ &paramHeartbeatInfo{
+ heartbeatInformation: hbi.heartbeatInformation,
+ },
+ },
+ }},
+ })
+}
+
+// The caller should hold the lock.
+func (a *Association) handleCookieEcho(c *chunkCookieEcho) []*packet {
+ state := a.getState()
+ a.log.Debugf("[%s] COOKIE-ECHO received in state '%s'", a.name, getAssociationStateString(state))
+ switch state {
+ default:
+ return nil
+ case established:
+ if !bytes.Equal(a.myCookie.cookie, c.cookie) {
+ return nil
+ }
+ case closed, cookieWait, cookieEchoed:
+ if !bytes.Equal(a.myCookie.cookie, c.cookie) {
+ return nil
+ }
+
+ a.t1Init.stop()
+ a.storedInit = nil
+
+ a.t1Cookie.stop()
+ a.storedCookieEcho = nil
+
+ a.setState(established)
+ a.handshakeCompletedCh <- nil
+ }
+
+ p := &packet{
+ verificationTag: a.peerVerificationTag,
+ sourcePort: a.sourcePort,
+ destinationPort: a.destinationPort,
+ chunks: []chunk{&chunkCookieAck{}},
+ }
+ return pack(p)
+}
+
+// The caller should hold the lock.
+func (a *Association) handleCookieAck() {
+ state := a.getState()
+ a.log.Debugf("[%s] COOKIE-ACK received in state '%s'", a.name, getAssociationStateString(state))
+ if state != cookieEchoed {
+ // RFC 4960
+ // 5.2.5. Handle Duplicate COOKIE-ACK.
+ // At any state other than COOKIE-ECHOED, an endpoint should silently
+ // discard a received COOKIE ACK chunk.
+ return
+ }
+
+ a.t1Cookie.stop()
+ a.storedCookieEcho = nil
+
+ a.setState(established)
+ a.handshakeCompletedCh <- nil
+}
+
+// The caller should hold the lock.
+func (a *Association) handleData(d *chunkPayloadData) []*packet {
+ a.log.Tracef("[%s] DATA: tsn=%d immediateSack=%v len=%d",
+ a.name, d.tsn, d.immediateSack, len(d.userData))
+ a.stats.incDATAs()
+
+ canPush := a.payloadQueue.canPush(d, a.peerLastTSN)
+ if canPush {
+ s := a.getOrCreateStream(d.streamIdentifier)
+ if s == nil {
+ // silentely discard the data. (sender will retry on T3-rtx timeout)
+ // see pion/sctp#30
+ a.log.Debugf("discard %d", d.streamSequenceNumber)
+ return nil
+ }
+
+ if a.getMyReceiverWindowCredit() > 0 {
+ // Pass the new chunk to stream level as soon as it arrives
+ a.payloadQueue.push(d, a.peerLastTSN)
+ s.handleData(d)
+ } else {
+ // Receive buffer is full
+ lastTSN, ok := a.payloadQueue.getLastTSNReceived()
+ if ok && sna32LT(d.tsn, lastTSN) {
+ a.log.Debugf("[%s] receive buffer full, but accepted as this is a missing chunk with tsn=%d ssn=%d", a.name, d.tsn, d.streamSequenceNumber)
+ a.payloadQueue.push(d, a.peerLastTSN)
+ s.handleData(d)
+ } else {
+ a.log.Debugf("[%s] receive buffer full. dropping DATA with tsn=%d ssn=%d", a.name, d.tsn, d.streamSequenceNumber)
+ }
+ }
+ }
+
+ return a.handlePeerLastTSNAndAcknowledgement(d.immediateSack)
+}
+
+// A common routine for handleData and handleForwardTSN routines
+// The caller should hold the lock.
+func (a *Association) handlePeerLastTSNAndAcknowledgement(sackImmediately bool) []*packet {
+ var reply []*packet
+
+ // Try to advance peerLastTSN
+
+ // From RFC 3758 Sec 3.6:
+ // .. and then MUST further advance its cumulative TSN point locally
+ // if possible
+ // Meaning, if peerLastTSN+1 points to a chunk that is received,
+ // advance peerLastTSN until peerLastTSN+1 points to unreceived chunk.
+ for {
+ if _, popOk := a.payloadQueue.pop(a.peerLastTSN + 1); !popOk {
+ break
+ }
+ a.peerLastTSN++
+
+ for _, rstReq := range a.reconfigRequests {
+ resp := a.resetStreamsIfAny(rstReq)
+ if resp != nil {
+ a.log.Debugf("[%s] RESET RESPONSE: %+v", a.name, resp)
+ reply = append(reply, resp)
+ }
+ }
+ }
+
+ hasPacketLoss := (a.payloadQueue.size() > 0)
+ if hasPacketLoss {
+ a.log.Tracef("[%s] packetloss: %s", a.name, a.payloadQueue.getGapAckBlocksString(a.peerLastTSN))
+ }
+
+ if (a.ackState != ackStateImmediate && !sackImmediately && !hasPacketLoss && a.ackMode == ackModeNormal) || a.ackMode == ackModeAlwaysDelay {
+ if a.ackState == ackStateIdle {
+ a.delayedAckTriggered = true
+ } else {
+ a.immediateAckTriggered = true
+ }
+ } else {
+ a.immediateAckTriggered = true
+ }
+
+ return reply
+}
+
+// The caller should hold the lock.
+func (a *Association) getMyReceiverWindowCredit() uint32 {
+ var bytesQueued uint32
+ for _, s := range a.streams {
+ bytesQueued += uint32(s.getNumBytesInReassemblyQueue())
+ }
+
+ if bytesQueued >= a.maxReceiveBufferSize {
+ return 0
+ }
+ return a.maxReceiveBufferSize - bytesQueued
+}
+
+// OpenStream opens a stream
+func (a *Association) OpenStream(streamIdentifier uint16, defaultPayloadType PayloadProtocolIdentifier) (*Stream, error) {
+ a.lock.Lock()
+ defer a.lock.Unlock()
+
+ if _, ok := a.streams[streamIdentifier]; ok {
+ return nil, errors.Errorf("there already exists a stream with identifier %d", streamIdentifier)
+ }
+
+ s := a.createStream(streamIdentifier, false)
+ s.setDefaultPayloadType(defaultPayloadType)
+
+ return s, nil
+}
+
+// AcceptStream accepts a stream
+func (a *Association) AcceptStream() (*Stream, error) {
+ s, ok := <-a.acceptCh
+ if !ok {
+ return nil, io.EOF // no more incoming streams
+ }
+ return s, nil
+}
+
+// createStream creates a stream. The caller should hold the lock and check no stream exists for this id.
+func (a *Association) createStream(streamIdentifier uint16, accept bool) *Stream {
+ s := &Stream{
+ association: a,
+ streamIdentifier: streamIdentifier,
+ reassemblyQueue: newReassemblyQueue(streamIdentifier),
+ log: a.log,
+ name: fmt.Sprintf("%d:%s", streamIdentifier, a.name),
+ }
+
+ s.readNotifier = sync.NewCond(&s.lock)
+
+ if accept {
+ select {
+ case a.acceptCh <- s:
+ a.streams[streamIdentifier] = s
+ a.log.Debugf("[%s] accepted a new stream (streamIdentifier: %d)",
+ a.name, streamIdentifier)
+ default:
+ a.log.Debugf("[%s] dropped a new stream (acceptCh size: %d)",
+ a.name, len(a.acceptCh))
+ return nil
+ }
+ } else {
+ a.streams[streamIdentifier] = s
+ }
+
+ return s
+}
+
+// getOrCreateStream gets or creates a stream. The caller should hold the lock.
+func (a *Association) getOrCreateStream(streamIdentifier uint16) *Stream {
+ if s, ok := a.streams[streamIdentifier]; ok {
+ return s
+ }
+
+ return a.createStream(streamIdentifier, true)
+}
+
+// The caller should hold the lock.
+func (a *Association) processSelectiveAck(d *chunkSelectiveAck) (map[uint16]int, uint32, error) { // nolint:gocognit
+ bytesAckedPerStream := map[uint16]int{}
+
+ // New ack point, so pop all ACKed packets from inflightQueue
+ // We add 1 because the "currentAckPoint" has already been popped from the inflight queue
+ // For the first SACK we take care of this by setting the ackpoint to cumAck - 1
+ for i := a.cumulativeTSNAckPoint + 1; sna32LTE(i, d.cumulativeTSNAck); i++ {
+ c, ok := a.inflightQueue.pop(i)
+ if !ok {
+ return nil, 0, errors.Errorf("tsn %v unable to be popped from inflight queue", i)
+ }
+
+ if !c.acked {
+ // RFC 4096 sec 6.3.2. Retransmission Timer Rules
+ // R3) Whenever a SACK is received that acknowledges the DATA chunk
+ // with the earliest outstanding TSN for that address, restart the
+ // T3-rtx timer for that address with its current RTO (if there is
+ // still outstanding data on that address).
+ if i == a.cumulativeTSNAckPoint+1 {
+ // T3 timer needs to be reset. Stop it for now.
+ a.t3RTX.stop()
+ }
+
+ nBytesAcked := len(c.userData)
+
+ // Sum the number of bytes acknowledged per stream
+ if amount, ok := bytesAckedPerStream[c.streamIdentifier]; ok {
+ bytesAckedPerStream[c.streamIdentifier] = amount + nBytesAcked
+ } else {
+ bytesAckedPerStream[c.streamIdentifier] = nBytesAcked
+ }
+
+ // RFC 4960 sec 6.3.1. RTO Calculation
+ // C4) When data is in flight and when allowed by rule C5 below, a new
+ // RTT measurement MUST be made each round trip. Furthermore, new
+ // RTT measurements SHOULD be made no more than once per round trip
+ // for a given destination transport address.
+ // C5) Karn's algorithm: RTT measurements MUST NOT be made using
+ // packets that were retransmitted (and thus for which it is
+ // ambiguous whether the reply was for the first instance of the
+ // chunk or for a later instance)
+ if c.nSent == 1 && sna32GTE(c.tsn, a.minTSN2MeasureRTT) {
+ a.minTSN2MeasureRTT = a.myNextTSN
+ rtt := time.Since(c.since).Seconds() * 1000.0
+ srtt := a.rtoMgr.setNewRTT(rtt)
+ a.log.Tracef("[%s] SACK: measured-rtt=%f srtt=%f new-rto=%f",
+ a.name, rtt, srtt, a.rtoMgr.getRTO())
+ }
+ }
+
+ if a.inFastRecovery && c.tsn == a.fastRecoverExitPoint {
+ a.log.Debugf("[%s] exit fast-recovery", a.name)
+ a.inFastRecovery = false
+ }
+ }
+
+ htna := d.cumulativeTSNAck
+
+ // Mark selectively acknowledged chunks as "acked"
+ for _, g := range d.gapAckBlocks {
+ for i := g.start; i <= g.end; i++ {
+ tsn := d.cumulativeTSNAck + uint32(i)
+ c, ok := a.inflightQueue.get(tsn)
+ if !ok {
+ return nil, 0, errors.Errorf("requested non-existent TSN %v", tsn)
+ }
+
+ if !c.acked {
+ nBytesAcked := a.inflightQueue.markAsAcked(tsn)
+
+ // Sum the number of bytes acknowledged per stream
+ if amount, ok := bytesAckedPerStream[c.streamIdentifier]; ok {
+ bytesAckedPerStream[c.streamIdentifier] = amount + nBytesAcked
+ } else {
+ bytesAckedPerStream[c.streamIdentifier] = nBytesAcked
+ }
+
+ a.log.Tracef("[%s] tsn=%d has been sacked", a.name, c.tsn)
+
+ if c.nSent == 1 {
+ a.minTSN2MeasureRTT = a.myNextTSN
+ rtt := time.Since(c.since).Seconds() * 1000.0
+ srtt := a.rtoMgr.setNewRTT(rtt)
+ a.log.Tracef("[%s] SACK: measured-rtt=%f srtt=%f new-rto=%f",
+ a.name, rtt, srtt, a.rtoMgr.getRTO())
+ }
+
+ if sna32LT(htna, tsn) {
+ htna = tsn
+ }
+ }
+ }
+ }
+
+ return bytesAckedPerStream, htna, nil
+}
+
+// The caller should hold the lock.
+func (a *Association) onCumulativeTSNAckPointAdvanced(totalBytesAcked int) {
+ // RFC 4096, sec 6.3.2. Retransmission Timer Rules
+ // R2) Whenever all outstanding data sent to an address have been
+ // acknowledged, turn off the T3-rtx timer of that address.
+ if a.inflightQueue.size() == 0 {
+ a.log.Tracef("[%s] SACK: no more packet in-flight (pending=%d)", a.name, a.pendingQueue.size())
+ a.t3RTX.stop()
+ } else {
+ a.log.Tracef("[%s] T3-rtx timer start (pt2)", a.name)
+ a.t3RTX.start(a.rtoMgr.getRTO())
+ }
+
+ // Update congestion control parameters
+ if a.cwnd <= a.ssthresh {
+ // RFC 4096, sec 7.2.1. Slow-Start
+ // o When cwnd is less than or equal to ssthresh, an SCTP endpoint MUST
+ // use the slow-start algorithm to increase cwnd only if the current
+ // congestion window is being fully utilized, an incoming SACK
+ // advances the Cumulative TSN Ack Point, and the data sender is not
+ // in Fast Recovery. Only when these three conditions are met can
+ // the cwnd be increased; otherwise, the cwnd MUST not be increased.
+ // If these conditions are met, then cwnd MUST be increased by, at
+ // most, the lesser of 1) the total size of the previously
+ // outstanding DATA chunk(s) acknowledged, and 2) the destination's
+ // path MTU.
+ if !a.inFastRecovery &&
+ a.pendingQueue.size() > 0 {
+ a.cwnd += min32(uint32(totalBytesAcked), a.cwnd) // TCP way
+ // a.cwnd += min32(uint32(totalBytesAcked), a.mtu) // SCTP way (slow)
+ a.log.Tracef("[%s] updated cwnd=%d ssthresh=%d acked=%d (SS)",
+ a.name, a.cwnd, a.ssthresh, totalBytesAcked)
+ } else {
+ a.log.Tracef("[%s] cwnd did not grow: cwnd=%d ssthresh=%d acked=%d FR=%v pending=%d",
+ a.name, a.cwnd, a.ssthresh, totalBytesAcked, a.inFastRecovery, a.pendingQueue.size())
+ }
+ } else {
+ // RFC 4096, sec 7.2.2. Congestion Avoidance
+ // o Whenever cwnd is greater than ssthresh, upon each SACK arrival
+ // that advances the Cumulative TSN Ack Point, increase
+ // partial_bytes_acked by the total number of bytes of all new chunks
+ // acknowledged in that SACK including chunks acknowledged by the new
+ // Cumulative TSN Ack and by Gap Ack Blocks.
+ a.partialBytesAcked += uint32(totalBytesAcked)
+
+ // o When partial_bytes_acked is equal to or greater than cwnd and
+ // before the arrival of the SACK the sender had cwnd or more bytes
+ // of data outstanding (i.e., before arrival of the SACK, flight size
+ // was greater than or equal to cwnd), increase cwnd by MTU, and
+ // reset partial_bytes_acked to (partial_bytes_acked - cwnd).
+ if a.partialBytesAcked >= a.cwnd && a.pendingQueue.size() > 0 {
+ a.partialBytesAcked -= a.cwnd
+ a.cwnd += a.mtu
+ a.log.Tracef("[%s] updated cwnd=%d ssthresh=%d acked=%d (CA)",
+ a.name, a.cwnd, a.ssthresh, totalBytesAcked)
+ }
+ }
+}
+
+// The caller should hold the lock.
+func (a *Association) processFastRetransmission(cumTSNAckPoint, htna uint32, cumTSNAckPointAdvanced bool) error {
+ // HTNA algorithm - RFC 4960 Sec 7.2.4
+ // Increment missIndicator of each chunks that the SACK reported missing
+ // when either of the following is met:
+ // a) Not in fast-recovery
+ // miss indications are incremented only for missing TSNs prior to the
+ // highest TSN newly acknowledged in the SACK.
+ // b) In fast-recovery AND the Cumulative TSN Ack Point advanced
+ // the miss indications are incremented for all TSNs reported missing
+ // in the SACK.
+ if !a.inFastRecovery || (a.inFastRecovery && cumTSNAckPointAdvanced) {
+ var maxTSN uint32
+ if !a.inFastRecovery {
+ // a) increment only for missing TSNs prior to the HTNA
+ maxTSN = htna
+ } else {
+ // b) increment for all TSNs reported missing
+ maxTSN = cumTSNAckPoint + uint32(a.inflightQueue.size()) + 1
+ }
+
+ for tsn := cumTSNAckPoint + 1; sna32LT(tsn, maxTSN); tsn++ {
+ c, ok := a.inflightQueue.get(tsn)
+ if !ok {
+ return errors.Errorf("requested non-existent TSN %v", tsn)
+ }
+ if !c.acked && !c.abandoned() && c.missIndicator < 3 {
+ c.missIndicator++
+ if c.missIndicator == 3 {
+ if !a.inFastRecovery {
+ // 2) If not in Fast Recovery, adjust the ssthresh and cwnd of the
+ // destination address(es) to which the missing DATA chunks were
+ // last sent, according to the formula described in Section 7.2.3.
+ a.inFastRecovery = true
+ a.fastRecoverExitPoint = htna
+ a.ssthresh = max32(a.cwnd/2, 4*a.mtu)
+ a.cwnd = a.ssthresh
+ a.partialBytesAcked = 0
+ a.willRetransmitFast = true
+
+ a.log.Tracef("[%s] updated cwnd=%d ssthresh=%d inflight=%d (FR)",
+ a.name, a.cwnd, a.ssthresh, a.inflightQueue.getNumBytes())
+ }
+ }
+ }
+ }
+ }
+
+ if a.inFastRecovery && cumTSNAckPointAdvanced {
+ a.willRetransmitFast = true
+ }
+
+ return nil
+}
+
+// The caller should hold the lock.
+func (a *Association) handleSack(d *chunkSelectiveAck) error {
+ a.log.Tracef("[%s] SACK: cumTSN=%d a_rwnd=%d", a.name, d.cumulativeTSNAck, d.advertisedReceiverWindowCredit)
+ state := a.getState()
+ if state != established {
+ return nil
+ }
+
+ a.stats.incSACKs()
+
+ if sna32GT(a.cumulativeTSNAckPoint, d.cumulativeTSNAck) {
+ // RFC 4960 sec 6.2.1. Processing a Received SACK
+ // D)
+ // i) If Cumulative TSN Ack is less than the Cumulative TSN Ack
+ // Point, then drop the SACK. Since Cumulative TSN Ack is
+ // monotonically increasing, a SACK whose Cumulative TSN Ack is
+ // less than the Cumulative TSN Ack Point indicates an out-of-
+ // order SACK.
+
+ a.log.Debugf("[%s] SACK Cumulative ACK %v is older than ACK point %v",
+ a.name,
+ d.cumulativeTSNAck,
+ a.cumulativeTSNAckPoint)
+
+ return nil
+ }
+
+ // Process selective ack
+ bytesAckedPerStream, htna, err := a.processSelectiveAck(d)
+ if err != nil {
+ return err
+ }
+
+ var totalBytesAcked int
+ for _, nBytesAcked := range bytesAckedPerStream {
+ totalBytesAcked += nBytesAcked
+ }
+
+ cumTSNAckPointAdvanced := false
+ if sna32LT(a.cumulativeTSNAckPoint, d.cumulativeTSNAck) {
+ a.log.Tracef("[%s] SACK: cumTSN advanced: %d -> %d",
+ a.name,
+ a.cumulativeTSNAckPoint,
+ d.cumulativeTSNAck)
+
+ a.cumulativeTSNAckPoint = d.cumulativeTSNAck
+ cumTSNAckPointAdvanced = true
+ a.onCumulativeTSNAckPointAdvanced(totalBytesAcked)
+ }
+
+ for si, nBytesAcked := range bytesAckedPerStream {
+ if s, ok := a.streams[si]; ok {
+ a.lock.Unlock()
+ s.onBufferReleased(nBytesAcked)
+ a.lock.Lock()
+ }
+ }
+
+ // New rwnd value
+ // RFC 4960 sec 6.2.1. Processing a Received SACK
+ // D)
+ // ii) Set rwnd equal to the newly received a_rwnd minus the number
+ // of bytes still outstanding after processing the Cumulative
+ // TSN Ack and the Gap Ack Blocks.
+
+ // bytes acked were already subtracted by markAsAcked() method
+ bytesOutstanding := uint32(a.inflightQueue.getNumBytes())
+ if bytesOutstanding >= d.advertisedReceiverWindowCredit {
+ a.rwnd = 0
+ } else {
+ a.rwnd = d.advertisedReceiverWindowCredit - bytesOutstanding
+ }
+
+ err = a.processFastRetransmission(d.cumulativeTSNAck, htna, cumTSNAckPointAdvanced)
+ if err != nil {
+ return err
+ }
+
+ if a.useForwardTSN {
+ // RFC 3758 Sec 3.5 C1
+ if sna32LT(a.advancedPeerTSNAckPoint, a.cumulativeTSNAckPoint) {
+ a.advancedPeerTSNAckPoint = a.cumulativeTSNAckPoint
+ }
+
+ // RFC 3758 Sec 3.5 C2
+ for i := a.advancedPeerTSNAckPoint + 1; ; i++ {
+ c, ok := a.inflightQueue.get(i)
+ if !ok {
+ break
+ }
+ if !c.abandoned() {
+ break
+ }
+ a.advancedPeerTSNAckPoint = i
+ }
+
+ // RFC 3758 Sec 3.5 C3
+ if sna32GT(a.advancedPeerTSNAckPoint, a.cumulativeTSNAckPoint) {
+ a.willSendForwardTSN = true
+ }
+ a.awakeWriteLoop()
+ }
+
+ if a.inflightQueue.size() > 0 {
+ // Start timer. (noop if already started)
+ a.log.Tracef("[%s] T3-rtx timer start (pt3)", a.name)
+ a.t3RTX.start(a.rtoMgr.getRTO())
+ }
+
+ if cumTSNAckPointAdvanced {
+ a.awakeWriteLoop()
+ }
+
+ return nil
+}
+
+// createForwardTSN generates ForwardTSN chunk.
+// This method will be be called if useForwardTSN is set to false.
+// The caller should hold the lock.
+func (a *Association) createForwardTSN() *chunkForwardTSN {
+ // RFC 3758 Sec 3.5 C4
+ streamMap := map[uint16]uint16{} // to report only once per SI
+ for i := a.cumulativeTSNAckPoint + 1; sna32LTE(i, a.advancedPeerTSNAckPoint); i++ {
+ c, ok := a.inflightQueue.get(i)
+ if !ok {
+ break
+ }
+
+ ssn, ok := streamMap[c.streamIdentifier]
+ if !ok {
+ streamMap[c.streamIdentifier] = c.streamSequenceNumber
+ } else if sna16LT(ssn, c.streamSequenceNumber) {
+ // to report only once with greatest SSN
+ streamMap[c.streamIdentifier] = c.streamSequenceNumber
+ }
+ }
+
+ fwdtsn := &chunkForwardTSN{
+ newCumulativeTSN: a.advancedPeerTSNAckPoint,
+ streams: []chunkForwardTSNStream{},
+ }
+
+ var streamStr string
+ for si, ssn := range streamMap {
+ streamStr += fmt.Sprintf("(si=%d ssn=%d)", si, ssn)
+ fwdtsn.streams = append(fwdtsn.streams, chunkForwardTSNStream{
+ identifier: si,
+ sequence: ssn,
+ })
+ }
+ a.log.Tracef("[%s] building fwdtsn: newCumulativeTSN=%d cumTSN=%d - %s", a.name, fwdtsn.newCumulativeTSN, a.cumulativeTSNAckPoint, streamStr)
+
+ return fwdtsn
+}
+
+// createPacket wraps chunks in a packet.
+// The caller should hold the read lock.
+func (a *Association) createPacket(cs []chunk) *packet {
+ return &packet{
+ verificationTag: a.peerVerificationTag,
+ sourcePort: a.sourcePort,
+ destinationPort: a.destinationPort,
+ chunks: cs,
+ }
+}
+
+// The caller should hold the lock.
+func (a *Association) handleReconfig(c *chunkReconfig) ([]*packet, error) {
+ a.log.Tracef("[%s] handleReconfig", a.name)
+
+ pp := make([]*packet, 0)
+
+ p, err := a.handleReconfigParam(c.paramA)
+ if err != nil {
+ return nil, err
+ }
+ if p != nil {
+ pp = append(pp, p)
+ }
+
+ if c.paramB != nil {
+ p, err = a.handleReconfigParam(c.paramB)
+ if err != nil {
+ return nil, err
+ }
+ if p != nil {
+ pp = append(pp, p)
+ }
+ }
+ return pp, nil
+}
+
+// The caller should hold the lock.
+func (a *Association) handleForwardTSN(c *chunkForwardTSN) []*packet {
+ a.log.Tracef("[%s] FwdTSN: %s", a.name, c.String())
+
+ if !a.useForwardTSN {
+ a.log.Warn("[%s] received FwdTSN but not enabled")
+ // Return an error chunk
+ cerr := &chunkError{
+ errorCauses: []errorCause{&errorCauseUnrecognizedChunkType{}},
+ }
+ outbound := &packet{}
+ outbound.verificationTag = a.peerVerificationTag
+ outbound.sourcePort = a.sourcePort
+ outbound.destinationPort = a.destinationPort
+ outbound.chunks = []chunk{cerr}
+ return []*packet{outbound}
+ }
+
+ // From RFC 3758 Sec 3.6:
+ // Note, if the "New Cumulative TSN" value carried in the arrived
+ // FORWARD TSN chunk is found to be behind or at the current cumulative
+ // TSN point, the data receiver MUST treat this FORWARD TSN as out-of-
+ // date and MUST NOT update its Cumulative TSN. The receiver SHOULD
+ // send a SACK to its peer (the sender of the FORWARD TSN) since such a
+ // duplicate may indicate the previous SACK was lost in the network.
+
+ a.log.Tracef("[%s] should send ack? newCumTSN=%d peerLastTSN=%d\n",
+ a.name, c.newCumulativeTSN, a.peerLastTSN)
+ if sna32LTE(c.newCumulativeTSN, a.peerLastTSN) {
+ a.log.Tracef("[%s] sending ack on Forward TSN", a.name)
+ a.ackState = ackStateImmediate
+ a.ackTimer.stop()
+ a.awakeWriteLoop()
+ return nil
+ }
+
+ // From RFC 3758 Sec 3.6:
+ // the receiver MUST perform the same TSN handling, including duplicate
+ // detection, gap detection, SACK generation, cumulative TSN
+ // advancement, etc. as defined in RFC 2960 [2]---with the following
+ // exceptions and additions.
+
+ // When a FORWARD TSN chunk arrives, the data receiver MUST first update
+ // its cumulative TSN point to the value carried in the FORWARD TSN
+ // chunk,
+
+ // Advance peerLastTSN
+ for sna32LT(a.peerLastTSN, c.newCumulativeTSN) {
+ a.payloadQueue.pop(a.peerLastTSN + 1) // may not exist
+ a.peerLastTSN++
+ }
+
+ // Report new peerLastTSN value and abandoned largest SSN value to
+ // corresponding streams so that the abandoned chunks can be removed
+ // from the reassemblyQueue.
+ for _, forwarded := range c.streams {
+ if s, ok := a.streams[forwarded.identifier]; ok {
+ s.handleForwardTSNForOrdered(forwarded.sequence)
+ }
+ }
+
+ // TSN may be forewared for unordered chunks. ForwardTSN chunk does not
+ // report which stream identifier it skipped for unordered chunks.
+ // Therefore, we need to broadcast this event to all existing streams for
+ // unordered chunks.
+ // See https://github.com/pion/sctp/issues/106
+ for _, s := range a.streams {
+ s.handleForwardTSNForUnordered(c.newCumulativeTSN)
+ }
+
+ return a.handlePeerLastTSNAndAcknowledgement(false)
+}
+
+func (a *Association) sendResetRequest(streamIdentifier uint16) error {
+ a.lock.Lock()
+ defer a.lock.Unlock()
+
+ state := a.getState()
+ if state != established {
+ return errors.Errorf("sending reset packet in non-established state: state=%s",
+ getAssociationStateString(state))
+ }
+
+ // Create DATA chunk which only contains valid stream identifier with
+ // nil userData and use it as a EOS from the stream.
+ c := &chunkPayloadData{
+ streamIdentifier: streamIdentifier,
+ beginningFragment: true,
+ endingFragment: true,
+ userData: nil,
+ }
+
+ a.pendingQueue.push(c)
+ a.awakeWriteLoop()
+ return nil
+}
+
+// The caller should hold the lock.
+func (a *Association) handleReconfigParam(raw param) (*packet, error) {
+ switch p := raw.(type) {
+ case *paramOutgoingResetRequest:
+ a.reconfigRequests[p.reconfigRequestSequenceNumber] = p
+ resp := a.resetStreamsIfAny(p)
+ if resp != nil {
+ return resp, nil
+ }
+ return nil, nil
+
+ case *paramReconfigResponse:
+ delete(a.reconfigs, p.reconfigResponseSequenceNumber)
+ if len(a.reconfigs) == 0 {
+ a.tReconfig.stop()
+ }
+ return nil, nil
+ default:
+ return nil, errors.Errorf("unexpected parameter type %T", p)
+ }
+}
+
+// The caller should hold the lock.
+func (a *Association) resetStreamsIfAny(p *paramOutgoingResetRequest) *packet {
+ result := reconfigResultSuccessPerformed
+ if sna32LTE(p.senderLastTSN, a.peerLastTSN) {
+ a.log.Debugf("[%s] resetStream(): senderLastTSN=%d <= peerLastTSN=%d",
+ a.name, p.senderLastTSN, a.peerLastTSN)
+ for _, id := range p.streamIdentifiers {
+ s, ok := a.streams[id]
+ if !ok {
+ continue
+ }
+ a.unregisterStream(s, io.EOF)
+ }
+ delete(a.reconfigRequests, p.reconfigRequestSequenceNumber)
+ } else {
+ a.log.Debugf("[%s] resetStream(): senderLastTSN=%d > peerLastTSN=%d",
+ a.name, p.senderLastTSN, a.peerLastTSN)
+ result = reconfigResultInProgress
+ }
+
+ return a.createPacket([]chunk{&chunkReconfig{
+ paramA: &paramReconfigResponse{
+ reconfigResponseSequenceNumber: p.reconfigRequestSequenceNumber,
+ result: result,
+ },
+ }})
+}
+
+// Move the chunk peeked with a.pendingQueue.peek() to the inflightQueue.
+// The caller should hold the lock.
+func (a *Association) movePendingDataChunkToInflightQueue(c *chunkPayloadData) {
+ if err := a.pendingQueue.pop(c); err != nil {
+ a.log.Errorf("[%s] failed to pop from pending queue: %s", a.name, err.Error())
+ }
+
+ // Mark all fragements are in-flight now
+ if c.endingFragment {
+ c.setAllInflight()
+ }
+
+ // Assign TSN
+ c.tsn = a.generateNextTSN()
+
+ c.since = time.Now() // use to calculate RTT and also for maxPacketLifeTime
+ c.nSent = 1 // being sent for the first time
+
+ a.checkPartialReliabilityStatus(c)
+
+ a.log.Tracef("[%s] sending ppi=%d tsn=%d ssn=%d sent=%d len=%d (%v,%v)",
+ a.name, c.payloadType, c.tsn, c.streamSequenceNumber, c.nSent, len(c.userData), c.beginningFragment, c.endingFragment)
+
+ // Push it into the inflightQueue
+ a.inflightQueue.pushNoCheck(c)
+}
+
+// popPendingDataChunksToSend pops chunks from the pending queues as many as
+// the cwnd and rwnd allows to send.
+// The caller should hold the lock.
+func (a *Association) popPendingDataChunksToSend() ([]*chunkPayloadData, []uint16) {
+ chunks := []*chunkPayloadData{}
+ var sisToReset []uint16 // stream identifieres to reset
+
+ if a.pendingQueue.size() > 0 {
+ // RFC 4960 sec 6.1. Transmission of DATA Chunks
+ // A) At any given time, the data sender MUST NOT transmit new data to
+ // any destination transport address if its peer's rwnd indicates
+ // that the peer has no buffer space (i.e., rwnd is 0; see Section
+ // 6.2.1). However, regardless of the value of rwnd (including if it
+ // is 0), the data sender can always have one DATA chunk in flight to
+ // the receiver if allowed by cwnd (see rule B, below).
+
+ for {
+ c := a.pendingQueue.peek()
+ if c == nil {
+ break // no more pending data
+ }
+
+ dataLen := uint32(len(c.userData))
+ if dataLen == 0 {
+ sisToReset = append(sisToReset, c.streamIdentifier)
+ err := a.pendingQueue.pop(c)
+ if err != nil {
+ a.log.Errorf("failed to pop from pending queue: %s", err.Error())
+ }
+ continue
+ }
+
+ if uint32(a.inflightQueue.getNumBytes())+dataLen > a.cwnd {
+ break // would exceeds cwnd
+ }
+
+ if dataLen > a.rwnd {
+ break // no more rwnd
+ }
+
+ a.rwnd -= dataLen
+
+ a.movePendingDataChunkToInflightQueue(c)
+ chunks = append(chunks, c)
+ }
+
+ // the data sender can always have one DATA chunk in flight to the receiver
+ if len(chunks) == 0 && a.inflightQueue.size() == 0 {
+ // Send zero window probe
+ c := a.pendingQueue.peek()
+ if c != nil {
+ a.movePendingDataChunkToInflightQueue(c)
+ chunks = append(chunks, c)
+ }
+ }
+ }
+
+ return chunks, sisToReset
+}
+
+// bundleDataChunksIntoPackets packs DATA chunks into packets. It tries to bundle
+// DATA chunks into a packet so long as the resulting packet size does not exceed
+// the path MTU.
+// The caller should hold the lock.
+func (a *Association) bundleDataChunksIntoPackets(chunks []*chunkPayloadData) []*packet {
+ packets := []*packet{}
+ chunksToSend := []chunk{}
+ bytesInPacket := int(commonHeaderSize)
+
+ for _, c := range chunks {
+ // RFC 4960 sec 6.1. Transmission of DATA Chunks
+ // Multiple DATA chunks committed for transmission MAY be bundled in a
+ // single packet. Furthermore, DATA chunks being retransmitted MAY be
+ // bundled with new DATA chunks, as long as the resulting packet size
+ // does not exceed the path MTU.
+ if bytesInPacket+len(c.userData) > int(a.mtu) {
+ packets = append(packets, a.createPacket(chunksToSend))
+ chunksToSend = []chunk{}
+ bytesInPacket = int(commonHeaderSize)
+ }
+
+ chunksToSend = append(chunksToSend, c)
+ bytesInPacket += int(dataChunkHeaderSize) + len(c.userData)
+ }
+
+ if len(chunksToSend) > 0 {
+ packets = append(packets, a.createPacket(chunksToSend))
+ }
+
+ return packets
+}
+
+// sendPayloadData sends the data chunks.
+func (a *Association) sendPayloadData(chunks []*chunkPayloadData) error {
+ a.lock.Lock()
+ defer a.lock.Unlock()
+
+ state := a.getState()
+ if state != established {
+ return errors.Errorf("sending payload data in non-established state: state=%s",
+ getAssociationStateString(state))
+ }
+
+ // Push the chunks into the pending queue first.
+ for _, c := range chunks {
+ a.pendingQueue.push(c)
+ }
+
+ a.awakeWriteLoop()
+ return nil
+}
+
+// The caller should hold the lock.
+func (a *Association) checkPartialReliabilityStatus(c *chunkPayloadData) {
+ if !a.useForwardTSN {
+ return
+ }
+
+ // draft-ietf-rtcweb-data-protocol-09.txt section 6
+ // 6. Procedures
+ // All Data Channel Establishment Protocol messages MUST be sent using
+ // ordered delivery and reliable transmission.
+ //
+ if c.payloadType == PayloadTypeWebRTCDCEP {
+ return
+ }
+
+ // PR-SCTP
+ if s, ok := a.streams[c.streamIdentifier]; ok {
+ s.lock.RLock()
+ if s.reliabilityType == ReliabilityTypeRexmit {
+ if c.nSent >= s.reliabilityValue {
+ c.setAbandoned(true)
+ a.log.Tracef("[%s] marked as abandoned: tsn=%d ppi=%d (remix: %d)", a.name, c.tsn, c.payloadType, c.nSent)
+ }
+ } else if s.reliabilityType == ReliabilityTypeTimed {
+ elapsed := int64(time.Since(c.since).Seconds() * 1000)
+ if elapsed >= int64(s.reliabilityValue) {
+ c.setAbandoned(true)
+ a.log.Tracef("[%s] marked as abandoned: tsn=%d ppi=%d (timed: %d)", a.name, c.tsn, c.payloadType, elapsed)
+ }
+ }
+ s.lock.RUnlock()
+ } else {
+ a.log.Errorf("[%s] stream %d not found)", a.name, c.streamIdentifier)
+ }
+}
+
+// getDataPacketsToRetransmit is called when T3-rtx is timed out and retransmit outstanding data chunks
+// that are not acked or abandoned yet.
+// The caller should hold the lock.
+func (a *Association) getDataPacketsToRetransmit() []*packet {
+ awnd := min32(a.cwnd, a.rwnd)
+ chunks := []*chunkPayloadData{}
+ var bytesToSend int
+ var done bool
+
+ for i := 0; !done; i++ {
+ c, ok := a.inflightQueue.get(a.cumulativeTSNAckPoint + uint32(i) + 1)
+ if !ok {
+ break // end of pending data
+ }
+
+ if !c.retransmit {
+ continue
+ }
+
+ if i == 0 && int(a.rwnd) < len(c.userData) {
+ // Send it as a zero window probe
+ done = true
+ } else if bytesToSend+len(c.userData) > int(awnd) {
+ break
+ }
+
+ // reset the retransmit flag not to retransmit again before the next
+ // t3-rtx timer fires
+ c.retransmit = false
+ bytesToSend += len(c.userData)
+
+ c.nSent++
+
+ a.checkPartialReliabilityStatus(c)
+
+ a.log.Tracef("[%s] retransmitting tsn=%d ssn=%d sent=%d", a.name, c.tsn, c.streamSequenceNumber, c.nSent)
+
+ chunks = append(chunks, c)
+ }
+
+ return a.bundleDataChunksIntoPackets(chunks)
+}
+
+// generateNextTSN returns the myNextTSN and increases it. The caller should hold the lock.
+// The caller should hold the lock.
+func (a *Association) generateNextTSN() uint32 {
+ tsn := a.myNextTSN
+ a.myNextTSN++
+ return tsn
+}
+
+// generateNextRSN returns the myNextRSN and increases it. The caller should hold the lock.
+// The caller should hold the lock.
+func (a *Association) generateNextRSN() uint32 {
+ rsn := a.myNextRSN
+ a.myNextRSN++
+ return rsn
+}
+
+func (a *Association) createSelectiveAckChunk() *chunkSelectiveAck {
+ sack := &chunkSelectiveAck{}
+ sack.cumulativeTSNAck = a.peerLastTSN
+ sack.advertisedReceiverWindowCredit = a.getMyReceiverWindowCredit()
+ sack.duplicateTSN = a.payloadQueue.popDuplicates()
+ sack.gapAckBlocks = a.payloadQueue.getGapAckBlocks(a.peerLastTSN)
+ return sack
+}
+
+func pack(p *packet) []*packet {
+ return []*packet{p}
+}
+
+func (a *Association) handleChunkStart() {
+ a.lock.Lock()
+ defer a.lock.Unlock()
+
+ a.delayedAckTriggered = false
+ a.immediateAckTriggered = false
+}
+
+func (a *Association) handleChunkEnd() {
+ a.lock.Lock()
+ defer a.lock.Unlock()
+
+ if a.immediateAckTriggered {
+ // Send SACK now!
+ a.ackState = ackStateImmediate
+ a.ackTimer.stop()
+ a.awakeWriteLoop()
+ } else if a.delayedAckTriggered {
+ // Will send delayed ack in the next ack timeout
+ a.ackState = ackStateDelay
+ a.ackTimer.start()
+ }
+}
+
+func (a *Association) handleChunk(p *packet, c chunk) error {
+ a.lock.Lock()
+ defer a.lock.Unlock()
+
+ var packets []*packet
+ var err error
+
+ if _, err = c.check(); err != nil {
+ a.log.Errorf("[ %s ] failed validating chunk: %s ", a.name, err)
+ return nil
+ }
+
+ switch c := c.(type) {
+ case *chunkInit:
+ packets, err = a.handleInit(p, c)
+
+ case *chunkInitAck:
+ err = a.handleInitAck(p, c)
+
+ case *chunkAbort:
+ var errStr string
+ for _, e := range c.errorCauses {
+ errStr += fmt.Sprintf("(%s)", e)
+ }
+ return fmt.Errorf("[%s] %w: %s", a.name, errChunk, errStr)
+
+ case *chunkError:
+ var errStr string
+ for _, e := range c.errorCauses {
+ errStr += fmt.Sprintf("(%s)", e)
+ }
+ a.log.Debugf("[%s] Error chunk, with following errors: %s", a.name, errStr)
+
+ case *chunkHeartbeat:
+ packets = a.handleHeartbeat(c)
+
+ case *chunkCookieEcho:
+ packets = a.handleCookieEcho(c)
+
+ case *chunkCookieAck:
+ a.handleCookieAck()
+
+ case *chunkPayloadData:
+ packets = a.handleData(c)
+
+ case *chunkSelectiveAck:
+ err = a.handleSack(c)
+
+ case *chunkReconfig:
+ packets, err = a.handleReconfig(c)
+
+ case *chunkForwardTSN:
+ packets = a.handleForwardTSN(c)
+
+ default:
+ err = errors.Errorf("unhandled chunk type")
+ }
+
+ // Log and return, the only condition that is fatal is a ABORT chunk
+ if err != nil {
+ a.log.Errorf("Failed to handle chunk: %v", err)
+ return nil
+ }
+
+ if len(packets) > 0 {
+ a.controlQueue.pushAll(packets)
+ a.awakeWriteLoop()
+ }
+
+ return nil
+}
+
+func (a *Association) onRetransmissionTimeout(id int, nRtos uint) {
+ a.lock.Lock()
+ defer a.lock.Unlock()
+
+ if id == timerT1Init {
+ err := a.sendInit()
+ if err != nil {
+ a.log.Debugf("[%s] failed to retransmit init (nRtos=%d): %v", a.name, nRtos, err)
+ }
+ return
+ }
+
+ if id == timerT1Cookie {
+ err := a.sendCookieEcho()
+ if err != nil {
+ a.log.Debugf("[%s] failed to retransmit cookie-echo (nRtos=%d): %v", a.name, nRtos, err)
+ }
+ return
+ }
+
+ if id == timerT3RTX {
+ a.stats.incT3Timeouts()
+
+ // RFC 4960 sec 6.3.3
+ // E1) For the destination address for which the timer expires, adjust
+ // its ssthresh with rules defined in Section 7.2.3 and set the
+ // cwnd <- MTU.
+ // RFC 4960 sec 7.2.3
+ // When the T3-rtx timer expires on an address, SCTP should perform slow
+ // start by:
+ // ssthresh = max(cwnd/2, 4*MTU)
+ // cwnd = 1*MTU
+
+ a.ssthresh = max32(a.cwnd/2, 4*a.mtu)
+ a.cwnd = a.mtu
+ a.log.Tracef("[%s] updated cwnd=%d ssthresh=%d inflight=%d (RTO)",
+ a.name, a.cwnd, a.ssthresh, a.inflightQueue.getNumBytes())
+
+ // RFC 3758 sec 3.5
+ // A5) Any time the T3-rtx timer expires, on any destination, the sender
+ // SHOULD try to advance the "Advanced.Peer.Ack.Point" by following
+ // the procedures outlined in C2 - C5.
+ if a.useForwardTSN {
+ // RFC 3758 Sec 3.5 C2
+ for i := a.advancedPeerTSNAckPoint + 1; ; i++ {
+ c, ok := a.inflightQueue.get(i)
+ if !ok {
+ break
+ }
+ if !c.abandoned() {
+ break
+ }
+ a.advancedPeerTSNAckPoint = i
+ }
+
+ // RFC 3758 Sec 3.5 C3
+ if sna32GT(a.advancedPeerTSNAckPoint, a.cumulativeTSNAckPoint) {
+ a.willSendForwardTSN = true
+ }
+ }
+
+ a.log.Debugf("[%s] T3-rtx timed out: nRtos=%d cwnd=%d ssthresh=%d", a.name, nRtos, a.cwnd, a.ssthresh)
+
+ /*
+ a.log.Debugf(" - advancedPeerTSNAckPoint=%d", a.advancedPeerTSNAckPoint)
+ a.log.Debugf(" - cumulativeTSNAckPoint=%d", a.cumulativeTSNAckPoint)
+ a.inflightQueue.updateSortedKeys()
+ for i, tsn := range a.inflightQueue.sorted {
+ if c, ok := a.inflightQueue.get(tsn); ok {
+ a.log.Debugf(" - [%d] tsn=%d acked=%v abandoned=%v (%v,%v) len=%d",
+ i, c.tsn, c.acked, c.abandoned(), c.beginningFragment, c.endingFragment, len(c.userData))
+ }
+ }
+ */
+
+ a.inflightQueue.markAllToRetrasmit()
+ a.awakeWriteLoop()
+
+ return
+ }
+
+ if id == timerReconfig {
+ a.willRetransmitReconfig = true
+ a.awakeWriteLoop()
+ }
+}
+
+func (a *Association) onRetransmissionFailure(id int) {
+ a.lock.Lock()
+ defer a.lock.Unlock()
+
+ if id == timerT1Init {
+ a.log.Errorf("[%s] retransmission failure: T1-init", a.name)
+ a.handshakeCompletedCh <- errors.Errorf("handshake failed (INIT ACK)")
+ return
+ }
+
+ if id == timerT1Cookie {
+ a.log.Errorf("[%s] retransmission failure: T1-cookie", a.name)
+ a.handshakeCompletedCh <- errors.Errorf("handshake failed (COOKIE ECHO)")
+ return
+ }
+
+ if id == timerT3RTX {
+ // T3-rtx timer will not fail by design
+ // Justifications:
+ // * ICE would fail if the connectivity is lost
+ // * WebRTC spec is not clear how this incident should be reported to ULP
+ a.log.Errorf("[%s] retransmission failure: T3-rtx (DATA)", a.name)
+ return
+ }
+}
+
+func (a *Association) onAckTimeout() {
+ a.lock.Lock()
+ defer a.lock.Unlock()
+
+ a.log.Tracef("[%s] ack timed out (ackState: %d)", a.name, a.ackState)
+ a.stats.incAckTimeouts()
+
+ a.ackState = ackStateImmediate
+ a.awakeWriteLoop()
+}
+
+// bufferedAmount returns total amount (in bytes) of currently buffered user data.
+// This is used only by testing.
+func (a *Association) bufferedAmount() int {
+ a.lock.RLock()
+ defer a.lock.RUnlock()
+
+ return a.pendingQueue.getNumBytes() + a.inflightQueue.getNumBytes()
+}
+
+// MaxMessageSize returns the maximum message size you can send.
+func (a *Association) MaxMessageSize() uint32 {
+ return atomic.LoadUint32(&a.maxMessageSize)
+}
+
+// SetMaxMessageSize sets the maximum message size you can send.
+func (a *Association) SetMaxMessageSize(maxMsgSize uint32) {
+ atomic.StoreUint32(&a.maxMessageSize, maxMsgSize)
+}
diff --git a/vendor/github.com/pion/sctp/association_stats.go b/vendor/github.com/pion/sctp/association_stats.go
new file mode 100644
index 0000000..4ccb7be
--- /dev/null
+++ b/vendor/github.com/pion/sctp/association_stats.go
@@ -0,0 +1,61 @@
+package sctp
+
+import (
+ "sync/atomic"
+)
+
+type associationStats struct {
+ nDATAs uint64
+ nSACKs uint64
+ nT3Timeouts uint64
+ nAckTimeouts uint64
+ nFastRetrans uint64
+}
+
+func (s *associationStats) incDATAs() {
+ atomic.AddUint64(&s.nDATAs, 1)
+}
+
+func (s *associationStats) getNumDATAs() uint64 {
+ return atomic.LoadUint64(&s.nDATAs)
+}
+
+func (s *associationStats) incSACKs() {
+ atomic.AddUint64(&s.nSACKs, 1)
+}
+
+func (s *associationStats) getNumSACKs() uint64 {
+ return atomic.LoadUint64(&s.nSACKs)
+}
+
+func (s *associationStats) incT3Timeouts() {
+ atomic.AddUint64(&s.nT3Timeouts, 1)
+}
+
+func (s *associationStats) getNumT3Timeouts() uint64 {
+ return atomic.LoadUint64(&s.nT3Timeouts)
+}
+
+func (s *associationStats) incAckTimeouts() {
+ atomic.AddUint64(&s.nAckTimeouts, 1)
+}
+
+func (s *associationStats) getNumAckTimeouts() uint64 {
+ return atomic.LoadUint64(&s.nAckTimeouts)
+}
+
+func (s *associationStats) incFastRetrans() {
+ atomic.AddUint64(&s.nFastRetrans, 1)
+}
+
+func (s *associationStats) getNumFastRetrans() uint64 {
+ return atomic.LoadUint64(&s.nFastRetrans)
+}
+
+func (s *associationStats) reset() {
+ atomic.StoreUint64(&s.nDATAs, 0)
+ atomic.StoreUint64(&s.nSACKs, 0)
+ atomic.StoreUint64(&s.nT3Timeouts, 0)
+ atomic.StoreUint64(&s.nAckTimeouts, 0)
+ atomic.StoreUint64(&s.nFastRetrans, 0)
+}
diff --git a/vendor/github.com/pion/sctp/chunk.go b/vendor/github.com/pion/sctp/chunk.go
new file mode 100644
index 0000000..ec47da1
--- /dev/null
+++ b/vendor/github.com/pion/sctp/chunk.go
@@ -0,0 +1,9 @@
+package sctp
+
+type chunk interface {
+ unmarshal(raw []byte) error
+ marshal() ([]byte, error)
+ check() (bool, error)
+
+ valueLength() int
+}
diff --git a/vendor/github.com/pion/sctp/chunk_abort.go b/vendor/github.com/pion/sctp/chunk_abort.go
new file mode 100644
index 0000000..9fd4692
--- /dev/null
+++ b/vendor/github.com/pion/sctp/chunk_abort.go
@@ -0,0 +1,88 @@
+package sctp // nolint:dupl
+
+import (
+ "fmt"
+
+ "github.com/pkg/errors"
+)
+
+/*
+Abort represents an SCTP Chunk of type ABORT
+
+The ABORT chunk is sent to the peer of an association to close the
+association. The ABORT chunk may contain Cause Parameters to inform
+the receiver about the reason of the abort. DATA chunks MUST NOT be
+bundled with ABORT. Control chunks (except for INIT, INIT ACK, and
+SHUTDOWN COMPLETE) MAY be bundled with an ABORT, but they MUST be
+placed before the ABORT in the SCTP packet or they will be ignored by
+the receiver.
+
+ 0 1 2 3
+ 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1
++-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
+| Type = 6 |Reserved |T| Length |
++-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
+| |
+| zero or more Error Causes |
+| |
++-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
+*/
+type chunkAbort struct {
+ chunkHeader
+ errorCauses []errorCause
+}
+
+func (a *chunkAbort) unmarshal(raw []byte) error {
+ if err := a.chunkHeader.unmarshal(raw); err != nil {
+ return err
+ }
+
+ if a.typ != ctAbort {
+ return errors.Errorf("ChunkType is not of type ABORT, actually is %s", a.typ.String())
+ }
+
+ offset := chunkHeaderSize
+ for {
+ if len(raw)-offset < 4 {
+ break
+ }
+
+ e, err := buildErrorCause(raw[offset:])
+ if err != nil {
+ return errors.Wrap(err, "Failed build Abort Chunk")
+ }
+
+ offset += int(e.length())
+ a.errorCauses = append(a.errorCauses, e)
+ }
+ return nil
+}
+
+func (a *chunkAbort) marshal() ([]byte, error) {
+ a.chunkHeader.typ = ctAbort
+ a.flags = 0x00
+ a.raw = []byte{}
+ for _, ec := range a.errorCauses {
+ raw, err := ec.marshal()
+ if err != nil {
+ return nil, err
+ }
+ a.raw = append(a.raw, raw...)
+ }
+ return a.chunkHeader.marshal()
+}
+
+func (a *chunkAbort) check() (abort bool, err error) {
+ return false, nil
+}
+
+// String makes chunkAbort printable
+func (a *chunkAbort) String() string {
+ res := a.chunkHeader.String()
+
+ for _, cause := range a.errorCauses {
+ res += fmt.Sprintf("\n - %s", cause)
+ }
+
+ return res
+}
diff --git a/vendor/github.com/pion/sctp/chunk_cookie_ack.go b/vendor/github.com/pion/sctp/chunk_cookie_ack.go
new file mode 100644
index 0000000..83b6f71
--- /dev/null
+++ b/vendor/github.com/pion/sctp/chunk_cookie_ack.go
@@ -0,0 +1,44 @@
+package sctp
+
+import (
+ "github.com/pkg/errors"
+)
+
+/*
+chunkCookieAck represents an SCTP Chunk of type chunkCookieAck
+
+ 0 1 2 3
+ 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1
++-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
+| Type = 11 |Chunk Flags | Length = 4 |
++-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
+*/
+type chunkCookieAck struct {
+ chunkHeader
+}
+
+func (c *chunkCookieAck) unmarshal(raw []byte) error {
+ if err := c.chunkHeader.unmarshal(raw); err != nil {
+ return err
+ }
+
+ if c.typ != ctCookieAck {
+ return errors.Errorf("ChunkType is not of type COOKIEACK, actually is %s", c.typ.String())
+ }
+
+ return nil
+}
+
+func (c *chunkCookieAck) marshal() ([]byte, error) {
+ c.chunkHeader.typ = ctCookieAck
+ return c.chunkHeader.marshal()
+}
+
+func (c *chunkCookieAck) check() (abort bool, err error) {
+ return false, nil
+}
+
+// String makes chunkCookieAck printable
+func (c *chunkCookieAck) String() string {
+ return c.chunkHeader.String()
+}
diff --git a/vendor/github.com/pion/sctp/chunk_cookie_echo.go b/vendor/github.com/pion/sctp/chunk_cookie_echo.go
new file mode 100644
index 0000000..3f0ed36
--- /dev/null
+++ b/vendor/github.com/pion/sctp/chunk_cookie_echo.go
@@ -0,0 +1,46 @@
+package sctp
+
+import (
+ "github.com/pkg/errors"
+)
+
+/*
+CookieEcho represents an SCTP Chunk of type CookieEcho
+
+ 0 1 2 3
+ 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1
++-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
+| Type = 10 |Chunk Flags | Length |
++-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
+| Cookie |
+| |
++-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
+
+*/
+type chunkCookieEcho struct {
+ chunkHeader
+ cookie []byte
+}
+
+func (c *chunkCookieEcho) unmarshal(raw []byte) error {
+ if err := c.chunkHeader.unmarshal(raw); err != nil {
+ return err
+ }
+
+ if c.typ != ctCookieEcho {
+ return errors.Errorf("ChunkType is not of type COOKIEECHO, actually is %s", c.typ.String())
+ }
+ c.cookie = c.raw
+
+ return nil
+}
+
+func (c *chunkCookieEcho) marshal() ([]byte, error) {
+ c.chunkHeader.typ = ctCookieEcho
+ c.chunkHeader.raw = c.cookie
+ return c.chunkHeader.marshal()
+}
+
+func (c *chunkCookieEcho) check() (abort bool, err error) {
+ return false, nil
+}
diff --git a/vendor/github.com/pion/sctp/chunk_error.go b/vendor/github.com/pion/sctp/chunk_error.go
new file mode 100644
index 0000000..eb29324
--- /dev/null
+++ b/vendor/github.com/pion/sctp/chunk_error.go
@@ -0,0 +1,95 @@
+package sctp // nolint:dupl
+
+import (
+ "fmt"
+
+ "github.com/pkg/errors"
+)
+
+/*
+ Operation Error (ERROR) (9)
+
+ An endpoint sends this chunk to its peer endpoint to notify it of
+ certain error conditions. It contains one or more error causes. An
+ Operation Error is not considered fatal in and of itself, but may be
+ used with an ERROR chunk to report a fatal condition. It has the
+ following parameters:
+
+ 0 1 2 3
+ 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1
+ +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
+ | Type = 9 | Chunk Flags | Length |
+ +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
+ \ \
+ / one or more Error Causes /
+ \ \
+ +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
+
+ Chunk Flags: 8 bits
+
+ Set to 0 on transmit and ignored on receipt.
+
+ Length: 16 bits (unsigned integer)
+
+ Set to the size of the chunk in bytes, including the chunk header
+ and all the Error Cause fields present.
+*/
+type chunkError struct {
+ chunkHeader
+ errorCauses []errorCause
+}
+
+func (a *chunkError) unmarshal(raw []byte) error {
+ if err := a.chunkHeader.unmarshal(raw); err != nil {
+ return err
+ }
+
+ if a.typ != ctError {
+ return errors.Errorf("ChunkType is not of type ctError, actually is %s", a.typ.String())
+ }
+
+ offset := chunkHeaderSize
+ for {
+ if len(raw)-offset < 4 {
+ break
+ }
+
+ e, err := buildErrorCause(raw[offset:])
+ if err != nil {
+ return errors.Wrap(err, "Failed build Error Chunk")
+ }
+
+ offset += int(e.length())
+ a.errorCauses = append(a.errorCauses, e)
+ }
+ return nil
+}
+
+func (a *chunkError) marshal() ([]byte, error) {
+ a.chunkHeader.typ = ctError
+ a.flags = 0x00
+ a.raw = []byte{}
+ for _, ec := range a.errorCauses {
+ raw, err := ec.marshal()
+ if err != nil {
+ return nil, err
+ }
+ a.raw = append(a.raw, raw...)
+ }
+ return a.chunkHeader.marshal()
+}
+
+func (a *chunkError) check() (abort bool, err error) {
+ return false, nil
+}
+
+// String makes chunkError printable
+func (a *chunkError) String() string {
+ res := a.chunkHeader.String()
+
+ for _, cause := range a.errorCauses {
+ res += fmt.Sprintf("\n - %s", cause)
+ }
+
+ return res
+}
diff --git a/vendor/github.com/pion/sctp/chunk_forward_tsn.go b/vendor/github.com/pion/sctp/chunk_forward_tsn.go
new file mode 100644
index 0000000..f6ff357
--- /dev/null
+++ b/vendor/github.com/pion/sctp/chunk_forward_tsn.go
@@ -0,0 +1,145 @@
+package sctp
+
+import (
+ "encoding/binary"
+ "fmt"
+
+ "github.com/pkg/errors"
+)
+
+// This chunk shall be used by the data sender to inform the data
+// receiver to adjust its cumulative received TSN point forward because
+// some missing TSNs are associated with data chunks that SHOULD NOT be
+// transmitted or retransmitted by the sender.
+//
+// 0 1 2 3
+// 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1
+// +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
+// | Type = 192 | Flags = 0x00 | Length = Variable |
+// +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
+// | New Cumulative TSN |
+// +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
+// | Stream-1 | Stream Sequence-1 |
+// +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
+// \ /
+// / \
+// +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
+// | Stream-N | Stream Sequence-N |
+// +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
+
+type chunkForwardTSN struct {
+ chunkHeader
+
+ // This indicates the new cumulative TSN to the data receiver. Upon
+ // the reception of this value, the data receiver MUST consider
+ // any missing TSNs earlier than or equal to this value as received,
+ // and stop reporting them as gaps in any subsequent SACKs.
+ newCumulativeTSN uint32
+
+ streams []chunkForwardTSNStream
+}
+
+const (
+ newCumulativeTSNLength = 4
+ forwardTSNStreamLength = 4
+)
+
+var errMarshalStreamFailed = errors.New("failed to marshal stream")
+
+func (c *chunkForwardTSN) unmarshal(raw []byte) error {
+ if err := c.chunkHeader.unmarshal(raw); err != nil {
+ return err
+ }
+
+ if len(c.raw) < newCumulativeTSNLength {
+ return errors.New("chunk to short")
+ }
+
+ c.newCumulativeTSN = binary.BigEndian.Uint32(c.raw[0:])
+
+ offset := newCumulativeTSNLength
+ remaining := len(c.raw) - offset
+ for remaining > 0 {
+ s := chunkForwardTSNStream{}
+
+ if err := s.unmarshal(c.raw[offset:]); err != nil {
+ return fmt.Errorf("failed to unmarshal stream: %w", err)
+ }
+
+ c.streams = append(c.streams, s)
+
+ offset += s.length()
+ remaining -= s.length()
+ }
+
+ return nil
+}
+
+func (c *chunkForwardTSN) marshal() ([]byte, error) {
+ out := make([]byte, newCumulativeTSNLength)
+ binary.BigEndian.PutUint32(out[0:], c.newCumulativeTSN)
+
+ for _, s := range c.streams {
+ b, err := s.marshal()
+ if err != nil {
+ return nil, fmt.Errorf("%w: %v", errMarshalStreamFailed, err)
+ }
+ out = append(out, b...)
+ }
+
+ c.typ = ctForwardTSN
+ c.raw = out
+ return c.chunkHeader.marshal()
+}
+
+func (c *chunkForwardTSN) check() (abort bool, err error) {
+ return true, nil
+}
+
+// String makes chunkForwardTSN printable
+func (c *chunkForwardTSN) String() string {
+ res := fmt.Sprintf("New Cumulative TSN: %d\n", c.newCumulativeTSN)
+ for _, s := range c.streams {
+ res += fmt.Sprintf(" - si=%d, ssn=%d\n", s.identifier, s.sequence)
+ }
+ return res
+}
+
+type chunkForwardTSNStream struct {
+ // This field holds a stream number that was skipped by this
+ // FWD-TSN.
+ identifier uint16
+
+ // This field holds the sequence number associated with the stream
+ // that was skipped. The stream sequence field holds the largest
+ // stream sequence number in this stream being skipped. The receiver
+ // of the FWD-TSN's can use the Stream-N and Stream Sequence-N fields
+ // to enable delivery of any stranded TSN's that remain on the stream
+ // re-ordering queues. This field MUST NOT report TSN's corresponding
+ // to DATA chunks that are marked as unordered. For ordered DATA
+ // chunks this field MUST be filled in.
+ sequence uint16
+}
+
+func (s *chunkForwardTSNStream) length() int {
+ return forwardTSNStreamLength
+}
+
+func (s *chunkForwardTSNStream) unmarshal(raw []byte) error {
+ if len(raw) < forwardTSNStreamLength {
+ return errors.New("stream to short")
+ }
+ s.identifier = binary.BigEndian.Uint16(raw[0:])
+ s.sequence = binary.BigEndian.Uint16(raw[2:])
+
+ return nil
+}
+
+func (s *chunkForwardTSNStream) marshal() ([]byte, error) { // nolint:unparam
+ out := make([]byte, forwardTSNStreamLength)
+
+ binary.BigEndian.PutUint16(out[0:], s.identifier)
+ binary.BigEndian.PutUint16(out[2:], s.sequence)
+
+ return out, nil
+}
diff --git a/vendor/github.com/pion/sctp/chunk_heartbeat.go b/vendor/github.com/pion/sctp/chunk_heartbeat.go
new file mode 100644
index 0000000..f2026a4
--- /dev/null
+++ b/vendor/github.com/pion/sctp/chunk_heartbeat.go
@@ -0,0 +1,75 @@
+package sctp
+
+import (
+ "github.com/pkg/errors"
+)
+
+/*
+chunkHeartbeat represents an SCTP Chunk of type HEARTBEAT
+
+An endpoint should send this chunk to its peer endpoint to probe the
+reachability of a particular destination transport address defined in
+the present association.
+
+The parameter field contains the Heartbeat Information, which is a
+variable-length opaque data structure understood only by the sender.
+
+
+ 0 1 2 3
+ 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1
++-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
+| Type = 4 | Chunk Flags | Heartbeat Length |
++-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
+| |
+| Heartbeat Information TLV (Variable-Length) |
+| |
++-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
+
+Defined as a variable-length parameter using the format described
+in Section 3.2.1, i.e.:
+
+Variable Parameters Status Type Value
+-------------------------------------------------------------
+heartbeat Info Mandatory 1
+
+*/
+type chunkHeartbeat struct {
+ chunkHeader
+ params []param
+}
+
+func (h *chunkHeartbeat) unmarshal(raw []byte) error {
+ if err := h.chunkHeader.unmarshal(raw); err != nil {
+ return err
+ } else if h.typ != ctHeartbeat {
+ return errors.Errorf("ChunkType is not of type HEARTBEAT, actually is %s", h.typ.String())
+ }
+
+ if len(raw) <= chunkHeaderSize {
+ return errors.Errorf("Heartbeat is not long enough to contain Heartbeat Info %d", len(raw))
+ }
+
+ pType, err := parseParamType(raw[chunkHeaderSize:])
+ if err != nil {
+ return errors.Wrap(err, "failed to parse param type")
+ }
+ if pType != heartbeatInfo {
+ return errors.Errorf("Heartbeat should only have HEARTBEAT param, instead have %s", pType.String())
+ }
+
+ p, err := buildParam(pType, raw[chunkHeaderSize:])
+ if err != nil {
+ return errors.Wrap(err, "Failed unmarshalling param in Heartbeat Chunk")
+ }
+ h.params = append(h.params, p)
+
+ return nil
+}
+
+func (h *chunkHeartbeat) Marshal() ([]byte, error) {
+ return nil, errors.Errorf("Unimplemented")
+}
+
+func (h *chunkHeartbeat) check() (abort bool, err error) {
+ return false, nil
+}
diff --git a/vendor/github.com/pion/sctp/chunk_heartbeat_ack.go b/vendor/github.com/pion/sctp/chunk_heartbeat_ack.go
new file mode 100644
index 0000000..ff84090
--- /dev/null
+++ b/vendor/github.com/pion/sctp/chunk_heartbeat_ack.go
@@ -0,0 +1,86 @@
+package sctp
+
+import (
+ "github.com/pkg/errors"
+)
+
+/*
+chunkHeartbeatAck represents an SCTP Chunk of type HEARTBEAT ACK
+
+An endpoint should send this chunk to its peer endpoint as a response
+to a HEARTBEAT chunk (see Section 8.3). A HEARTBEAT ACK is always
+sent to the source IP address of the IP datagram containing the
+HEARTBEAT chunk to which this ack is responding.
+
+The parameter field contains a variable-length opaque data structure.
+
+ 0 1 2 3
+ 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1
++-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
+| Type = 5 | Chunk Flags | Heartbeat Ack Length |
++-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
+| |
+| Heartbeat Information TLV (Variable-Length) |
+| |
++-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
+
+
+Defined as a variable-length parameter using the format described
+in Section 3.2.1, i.e.:
+
+Variable Parameters Status Type Value
+-------------------------------------------------------------
+Heartbeat Info Mandatory 1
+
+*/
+type chunkHeartbeatAck struct {
+ chunkHeader
+ params []param
+}
+
+func (h *chunkHeartbeatAck) unmarshal(raw []byte) error {
+ return errors.Errorf("Unimplemented")
+}
+
+func (h *chunkHeartbeatAck) marshal() ([]byte, error) {
+ if len(h.params) != 1 {
+ return nil, errors.Errorf("Heartbeat Ack must have one param")
+ }
+
+ switch h.params[0].(type) {
+ case *paramHeartbeatInfo:
+ // ParamHeartbeatInfo is valid
+ default:
+ return nil, errors.Errorf("Heartbeat Ack must have one param, and it should be a HeartbeatInfo")
+ }
+
+ out := make([]byte, 0)
+ for idx, p := range h.params {
+ pp, err := p.marshal()
+ if err != nil {
+ return nil, errors.Wrap(err, "Unable to marshal parameter for Heartbeat Ack")
+ }
+
+ out = append(out, pp...)
+
+ // Chunks (including Type, Length, and Value fields) are padded out
+ // by the sender with all zero bytes to be a multiple of 4 bytes
+ // long. This padding MUST NOT be more than 3 bytes in total. The
+ // Chunk Length value does not include terminating padding of the
+ // chunk. *However, it does include padding of any variable-length
+ // parameter except the last parameter in the chunk.* The receiver
+ // MUST ignore the padding.
+ if idx != len(h.params)-1 {
+ out = padByte(out, getPadding(len(pp)))
+ }
+ }
+
+ h.chunkHeader.typ = ctHeartbeatAck
+ h.chunkHeader.raw = out
+
+ return h.chunkHeader.marshal()
+}
+
+func (h *chunkHeartbeatAck) check() (abort bool, err error) {
+ return false, nil
+}
diff --git a/vendor/github.com/pion/sctp/chunk_init.go b/vendor/github.com/pion/sctp/chunk_init.go
new file mode 100644
index 0000000..65b9b1a
--- /dev/null
+++ b/vendor/github.com/pion/sctp/chunk_init.go
@@ -0,0 +1,123 @@
+package sctp // nolint:dupl
+
+import (
+ "fmt"
+
+ "github.com/pkg/errors"
+)
+
+/*
+Init represents an SCTP Chunk of type INIT
+
+See chunkInitCommon for the fixed headers
+
+Variable Parameters Status Type Value
+-------------------------------------------------------------
+IPv4 IP (Note 1) Optional 5
+IPv6 IP (Note 1) Optional 6
+Cookie Preservative Optional 9
+Reserved for ECN Capable (Note 2) Optional 32768 (0x8000)
+Host Name IP (Note 3) Optional 11
+Supported IP Types (Note 4) Optional 12
+*/
+type chunkInit struct {
+ chunkHeader
+ chunkInitCommon
+}
+
+func (i *chunkInit) unmarshal(raw []byte) error {
+ if err := i.chunkHeader.unmarshal(raw); err != nil {
+ return err
+ }
+
+ if i.typ != ctInit {
+ return errors.Errorf("ChunkType is not of type INIT, actually is %s", i.typ.String())
+ } else if len(i.raw) < initChunkMinLength {
+ return errors.Errorf("Chunk Value isn't long enough for mandatory parameters exp: %d actual: %d", initChunkMinLength, len(i.raw))
+ }
+
+ // The Chunk Flags field in INIT is reserved, and all bits in it should
+ // be set to 0 by the sender and ignored by the receiver. The sequence
+ // of parameters within an INIT can be processed in any order.
+ if i.flags != 0 {
+ return errors.New("ChunkType of type INIT flags must be all 0")
+ }
+
+ if err := i.chunkInitCommon.unmarshal(i.raw); err != nil {
+ return errors.Wrap(err, "Failed to unmarshal INIT body")
+ }
+
+ return nil
+}
+
+func (i *chunkInit) marshal() ([]byte, error) {
+ initShared, err := i.chunkInitCommon.marshal()
+ if err != nil {
+ return nil, errors.Wrap(err, "Failed marshaling INIT common data")
+ }
+
+ i.chunkHeader.typ = ctInit
+ i.chunkHeader.raw = initShared
+ return i.chunkHeader.marshal()
+}
+
+func (i *chunkInit) check() (abort bool, err error) {
+ // The receiver of the INIT (the responding end) records the value of
+ // the Initiate Tag parameter. This value MUST be placed into the
+ // Verification Tag field of every SCTP packet that the receiver of
+ // the INIT transmits within this association.
+ //
+ // The Initiate Tag is allowed to have any value except 0. See
+ // Section 5.3.1 for more on the selection of the tag value.
+ //
+ // If the value of the Initiate Tag in a received INIT chunk is found
+ // to be 0, the receiver MUST treat it as an error and close the
+ // association by transmitting an ABORT.
+ if i.initiateTag == 0 {
+ abort = true
+ return abort, errors.New("ChunkType of type INIT InitiateTag must not be 0")
+ }
+
+ // Defines the maximum number of streams the sender of this INIT
+ // chunk allows the peer end to create in this association. The
+ // value 0 MUST NOT be used.
+ //
+ // Note: There is no negotiation of the actual number of streams but
+ // instead the two endpoints will use the min(requested, offered).
+ // See Section 5.1.1 for details.
+ //
+ // Note: A receiver of an INIT with the MIS value of 0 SHOULD abort
+ // the association.
+ if i.numInboundStreams == 0 {
+ abort = true
+ return abort, errors.New("INIT inbound stream request must be > 0")
+ }
+
+ // Defines the number of outbound streams the sender of this INIT
+ // chunk wishes to create in this association. The value of 0 MUST
+ // NOT be used.
+ //
+ // Note: A receiver of an INIT with the OS value set to 0 SHOULD
+ // abort the association.
+
+ if i.numOutboundStreams == 0 {
+ abort = true
+ return abort, errors.New("INIT outbound stream request must be > 0")
+ }
+
+ // An SCTP receiver MUST be able to receive a minimum of 1500 bytes in
+ // one SCTP packet. This means that an SCTP endpoint MUST NOT indicate
+ // less than 1500 bytes in its initial a_rwnd sent in the INIT or INIT
+ // ACK.
+ if i.advertisedReceiverWindowCredit < 1500 {
+ abort = true
+ return abort, errors.New("INIT Advertised Receiver Window Credit (a_rwnd) must be >= 1500")
+ }
+
+ return false, nil
+}
+
+// String makes chunkInit printable
+func (i *chunkInit) String() string {
+ return fmt.Sprintf("%s\n%s", i.chunkHeader, i.chunkInitCommon)
+}
diff --git a/vendor/github.com/pion/sctp/chunk_init_ack.go b/vendor/github.com/pion/sctp/chunk_init_ack.go
new file mode 100644
index 0000000..551bcea
--- /dev/null
+++ b/vendor/github.com/pion/sctp/chunk_init_ack.go
@@ -0,0 +1,126 @@
+package sctp // nolint:dupl
+
+import (
+ "fmt"
+
+ "github.com/pkg/errors"
+)
+
+/*
+chunkInitAck represents an SCTP Chunk of type INIT ACK
+
+See chunkInitCommon for the fixed headers
+
+Variable Parameters Status Type Value
+-------------------------------------------------------------
+State Cookie Mandatory 7
+IPv4 IP (Note 1) Optional 5
+IPv6 IP (Note 1) Optional 6
+Unrecognized Parameter Optional 8
+Reserved for ECN Capable (Note 2) Optional 32768 (0x8000)
+Host Name IP (Note 3) Optional 11<Paste>
+
+*/
+type chunkInitAck struct {
+ chunkHeader
+ chunkInitCommon
+}
+
+func (i *chunkInitAck) unmarshal(raw []byte) error {
+ if err := i.chunkHeader.unmarshal(raw); err != nil {
+ return err
+ }
+
+ if i.typ != ctInitAck {
+ return errors.Errorf("ChunkType is not of type INIT ACK, actually is %s", i.typ.String())
+ } else if len(i.raw) < initChunkMinLength {
+ return errors.Errorf("Chunk Value isn't long enough for mandatory parameters exp: %d actual: %d", initChunkMinLength, len(i.raw))
+ }
+
+ // The Chunk Flags field in INIT is reserved, and all bits in it should
+ // be set to 0 by the sender and ignored by the receiver. The sequence
+ // of parameters within an INIT can be processed in any order.
+ if i.flags != 0 {
+ return errors.New("ChunkType of type INIT ACK flags must be all 0")
+ }
+
+ if err := i.chunkInitCommon.unmarshal(i.raw); err != nil {
+ return errors.Wrap(err, "Failed to unmarshal INIT body")
+ }
+
+ return nil
+}
+
+func (i *chunkInitAck) marshal() ([]byte, error) {
+ initShared, err := i.chunkInitCommon.marshal()
+ if err != nil {
+ return nil, errors.Wrap(err, "Failed marshaling INIT common data")
+ }
+
+ i.chunkHeader.typ = ctInitAck
+ i.chunkHeader.raw = initShared
+ return i.chunkHeader.marshal()
+}
+
+func (i *chunkInitAck) check() (abort bool, err error) {
+ // The receiver of the INIT ACK records the value of the Initiate Tag
+ // parameter. This value MUST be placed into the Verification Tag
+ // field of every SCTP packet that the INIT ACK receiver transmits
+ // within this association.
+ //
+ // The Initiate Tag MUST NOT take the value 0. See Section 5.3.1 for
+ // more on the selection of the Initiate Tag value.
+ //
+ // If the value of the Initiate Tag in a received INIT ACK chunk is
+ // found to be 0, the receiver MUST destroy the association
+ // discarding its TCB. The receiver MAY send an ABORT for debugging
+ // purpose.
+ if i.initiateTag == 0 {
+ abort = true
+ return abort, errors.New("ChunkType of type INIT ACK InitiateTag must not be 0")
+ }
+
+ // Defines the maximum number of streams the sender of this INIT ACK
+ // chunk allows the peer end to create in this association. The
+ // value 0 MUST NOT be used.
+ //
+ // Note: There is no negotiation of the actual number of streams but
+ // instead the two endpoints will use the min(requested, offered).
+ // See Section 5.1.1 for details.
+ //
+ // Note: A receiver of an INIT ACK with the MIS value set to 0 SHOULD
+ // destroy the association discarding its TCB.
+ if i.numInboundStreams == 0 {
+ abort = true
+ return abort, errors.New("INIT ACK inbound stream request must be > 0")
+ }
+
+ // Defines the number of outbound streams the sender of this INIT ACK
+ // chunk wishes to create in this association. The value of 0 MUST
+ // NOT be used, and the value MUST NOT be greater than the MIS value
+ // sent in the INIT chunk.
+ //
+ // Note: A receiver of an INIT ACK with the OS value set to 0 SHOULD
+ // destroy the association discarding its TCB.
+
+ if i.numOutboundStreams == 0 {
+ abort = true
+ return abort, errors.New("INIT ACK outbound stream request must be > 0")
+ }
+
+ // An SCTP receiver MUST be able to receive a minimum of 1500 bytes in
+ // one SCTP packet. This means that an SCTP endpoint MUST NOT indicate
+ // less than 1500 bytes in its initial a_rwnd sent in the INIT or INIT
+ // ACK.
+ if i.advertisedReceiverWindowCredit < 1500 {
+ abort = true
+ return abort, errors.New("INIT ACK Advertised Receiver Window Credit (a_rwnd) must be >= 1500")
+ }
+
+ return false, nil
+}
+
+// String makes chunkInitAck printable
+func (i *chunkInitAck) String() string {
+ return fmt.Sprintf("%s\n%s", i.chunkHeader, i.chunkInitCommon)
+}
diff --git a/vendor/github.com/pion/sctp/chunk_init_common.go b/vendor/github.com/pion/sctp/chunk_init_common.go
new file mode 100644
index 0000000..f64be78
--- /dev/null
+++ b/vendor/github.com/pion/sctp/chunk_init_common.go
@@ -0,0 +1,155 @@
+package sctp
+
+import (
+ "encoding/binary"
+ "fmt"
+
+ "github.com/pkg/errors"
+)
+
+/*
+chunkInitCommon represents an SCTP Chunk body of type INIT and INIT ACK
+
+ 0 1 2 3
+ 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1
++-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
+| Type = 1 | Chunk Flags | Chunk Length |
++-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
+| Initiate Tag |
++-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
+| Advertised Receiver Window Credit (a_rwnd) |
++-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
+| Number of Outbound Streams | Number of Inbound Streams |
++-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
+| Initial TSN |
++-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
+| |
+| Optional/Variable-Length Parameters |
+| |
++-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
+
+The INIT chunk contains the following parameters. Unless otherwise
+noted, each parameter MUST only be included once in the INIT chunk.
+
+Fixed Parameters Status
+----------------------------------------------
+Initiate Tag Mandatory
+Advertised Receiver Window Credit Mandatory
+Number of Outbound Streams Mandatory
+Number of Inbound Streams Mandatory
+Initial TSN Mandatory
+*/
+
+type chunkInitCommon struct {
+ initiateTag uint32
+ advertisedReceiverWindowCredit uint32
+ numOutboundStreams uint16
+ numInboundStreams uint16
+ initialTSN uint32
+ params []param
+}
+
+const (
+ initChunkMinLength = 16
+ initOptionalVarHeaderLength = 4
+)
+
+func (i *chunkInitCommon) unmarshal(raw []byte) error {
+ i.initiateTag = binary.BigEndian.Uint32(raw[0:])
+ i.advertisedReceiverWindowCredit = binary.BigEndian.Uint32(raw[4:])
+ i.numOutboundStreams = binary.BigEndian.Uint16(raw[8:])
+ i.numInboundStreams = binary.BigEndian.Uint16(raw[10:])
+ i.initialTSN = binary.BigEndian.Uint32(raw[12:])
+
+ // https://tools.ietf.org/html/rfc4960#section-3.2.1
+ //
+ // Chunk values of SCTP control chunks consist of a chunk-type-specific
+ // header of required fields, followed by zero or more parameters. The
+ // optional and variable-length parameters contained in a chunk are
+ // defined in a Type-Length-Value format as shown below.
+ //
+ // 0 1 2 3
+ // 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1
+ // +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
+ // | Parameter Type | Parameter Length |
+ // +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
+ // | |
+ // | Parameter Value |
+ // | |
+ // +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
+
+ offset := initChunkMinLength
+ remaining := len(raw) - offset
+ for remaining > 0 {
+ if remaining > initOptionalVarHeaderLength {
+ pType, err := parseParamType(raw[offset:])
+ if err != nil {
+ return errors.Wrap(err, "failed to parse param type")
+ }
+ p, err := buildParam(pType, raw[offset:])
+ if err != nil {
+ return errors.Wrap(err, "Failed unmarshalling param in Init Chunk")
+ }
+ i.params = append(i.params, p)
+ padding := getPadding(p.length())
+ offset += p.length() + padding
+ remaining -= p.length() + padding
+ } else {
+ break
+ }
+ }
+
+ return nil
+}
+
+func (i *chunkInitCommon) marshal() ([]byte, error) {
+ out := make([]byte, initChunkMinLength)
+ binary.BigEndian.PutUint32(out[0:], i.initiateTag)
+ binary.BigEndian.PutUint32(out[4:], i.advertisedReceiverWindowCredit)
+ binary.BigEndian.PutUint16(out[8:], i.numOutboundStreams)
+ binary.BigEndian.PutUint16(out[10:], i.numInboundStreams)
+ binary.BigEndian.PutUint32(out[12:], i.initialTSN)
+ for idx, p := range i.params {
+ pp, err := p.marshal()
+ if err != nil {
+ return nil, errors.Wrap(err, "Unable to marshal parameter for INIT/INITACK")
+ }
+
+ out = append(out, pp...)
+
+ // Chunks (including Type, Length, and Value fields) are padded out
+ // by the sender with all zero bytes to be a multiple of 4 bytes
+ // long. This padding MUST NOT be more than 3 bytes in total. The
+ // Chunk Length value does not include terminating padding of the
+ // chunk. *However, it does include padding of any variable-length
+ // parameter except the last parameter in the chunk.* The receiver
+ // MUST ignore the padding.
+ if idx != len(i.params)-1 {
+ out = padByte(out, getPadding(len(pp)))
+ }
+ }
+
+ return out, nil
+}
+
+// String makes chunkInitCommon printable
+func (i chunkInitCommon) String() string {
+ format := `initiateTag: %d
+ advertisedReceiverWindowCredit: %d
+ numOutboundStreams: %d
+ numInboundStreams: %d
+ initialTSN: %d`
+
+ res := fmt.Sprintf(format,
+ i.initiateTag,
+ i.advertisedReceiverWindowCredit,
+ i.numOutboundStreams,
+ i.numInboundStreams,
+ i.initialTSN,
+ )
+
+ for i, param := range i.params {
+ res += fmt.Sprintf("Param %d:\n %s", i, param)
+ }
+ return res
+}
diff --git a/vendor/github.com/pion/sctp/chunk_payload_data.go b/vendor/github.com/pion/sctp/chunk_payload_data.go
new file mode 100644
index 0000000..3e5e346
--- /dev/null
+++ b/vendor/github.com/pion/sctp/chunk_payload_data.go
@@ -0,0 +1,195 @@
+package sctp
+
+import (
+ "encoding/binary"
+ "fmt"
+ "time"
+)
+
+/*
+chunkPayloadData represents an SCTP Chunk of type DATA
+
+ 0 1 2 3
+ 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1
++-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
+| Type = 0 | Reserved|U|B|E| Length |
++-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
+| TSN |
++-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
+| Stream Identifier S | Stream Sequence Number n |
++-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
+| Payload Protocol Identifier |
++-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
+| |
+| User Data (seq n of Stream S) |
+| |
++-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
+
+
+An unfragmented user message shall have both the B and E bits set to
+'1'. Setting both B and E bits to '0' indicates a middle fragment of
+a multi-fragment user message, as summarized in the following table:
+ B E Description
+============================================================
+| 1 0 | First piece of a fragmented user message |
++----------------------------------------------------------+
+| 0 0 | Middle piece of a fragmented user message |
++----------------------------------------------------------+
+| 0 1 | Last piece of a fragmented user message |
++----------------------------------------------------------+
+| 1 1 | Unfragmented message |
+============================================================
+| Table 1: Fragment Description Flags |
+============================================================
+*/
+type chunkPayloadData struct {
+ chunkHeader
+
+ unordered bool
+ beginningFragment bool
+ endingFragment bool
+ immediateSack bool
+
+ tsn uint32
+ streamIdentifier uint16
+ streamSequenceNumber uint16
+ payloadType PayloadProtocolIdentifier
+ userData []byte
+
+ // Whether this data chunk was acknowledged (received by peer)
+ acked bool
+ missIndicator uint32
+
+ // Partial-reliability parameters used only by sender
+ since time.Time
+ nSent uint32 // number of transmission made for this chunk
+ _abandoned bool
+ _allInflight bool // valid only with the first fragment
+
+ // Retransmission flag set when T1-RTX timeout occurred and this
+ // chunk is still in the inflight queue
+ retransmit bool
+
+ head *chunkPayloadData // link to the head of the fragment
+}
+
+const (
+ payloadDataEndingFragmentBitmask = 1
+ payloadDataBeginingFragmentBitmask = 2
+ payloadDataUnorderedBitmask = 4
+ payloadDataImmediateSACK = 8
+
+ payloadDataHeaderSize = 12
+)
+
+// PayloadProtocolIdentifier is an enum for DataChannel payload types
+type PayloadProtocolIdentifier uint32
+
+// PayloadProtocolIdentifier enums
+// https://www.iana.org/assignments/sctp-parameters/sctp-parameters.xhtml#sctp-parameters-25
+const (
+ PayloadTypeWebRTCDCEP PayloadProtocolIdentifier = 50
+ PayloadTypeWebRTCString PayloadProtocolIdentifier = 51
+ PayloadTypeWebRTCBinary PayloadProtocolIdentifier = 53
+ PayloadTypeWebRTCStringEmpty PayloadProtocolIdentifier = 56
+ PayloadTypeWebRTCBinaryEmpty PayloadProtocolIdentifier = 57
+)
+
+func (p PayloadProtocolIdentifier) String() string {
+ switch p {
+ case PayloadTypeWebRTCDCEP:
+ return "WebRTC DCEP"
+ case PayloadTypeWebRTCString:
+ return "WebRTC String"
+ case PayloadTypeWebRTCBinary:
+ return "WebRTC Binary"
+ case PayloadTypeWebRTCStringEmpty:
+ return "WebRTC String (Empty)"
+ case PayloadTypeWebRTCBinaryEmpty:
+ return "WebRTC Binary (Empty)"
+ default:
+ return fmt.Sprintf("Unknown Payload Protocol Identifier: %d", p)
+ }
+}
+
+func (p *chunkPayloadData) unmarshal(raw []byte) error {
+ if err := p.chunkHeader.unmarshal(raw); err != nil {
+ return err
+ }
+
+ p.immediateSack = p.flags&payloadDataImmediateSACK != 0
+ p.unordered = p.flags&payloadDataUnorderedBitmask != 0
+ p.beginningFragment = p.flags&payloadDataBeginingFragmentBitmask != 0
+ p.endingFragment = p.flags&payloadDataEndingFragmentBitmask != 0
+
+ p.tsn = binary.BigEndian.Uint32(p.raw[0:])
+ p.streamIdentifier = binary.BigEndian.Uint16(p.raw[4:])
+ p.streamSequenceNumber = binary.BigEndian.Uint16(p.raw[6:])
+ p.payloadType = PayloadProtocolIdentifier(binary.BigEndian.Uint32(p.raw[8:]))
+ p.userData = p.raw[payloadDataHeaderSize:]
+
+ return nil
+}
+
+func (p *chunkPayloadData) marshal() ([]byte, error) {
+ payRaw := make([]byte, payloadDataHeaderSize+len(p.userData))
+
+ binary.BigEndian.PutUint32(payRaw[0:], p.tsn)
+ binary.BigEndian.PutUint16(payRaw[4:], p.streamIdentifier)
+ binary.BigEndian.PutUint16(payRaw[6:], p.streamSequenceNumber)
+ binary.BigEndian.PutUint32(payRaw[8:], uint32(p.payloadType))
+ copy(payRaw[payloadDataHeaderSize:], p.userData)
+
+ flags := uint8(0)
+ if p.endingFragment {
+ flags = 1
+ }
+ if p.beginningFragment {
+ flags |= 1 << 1
+ }
+ if p.unordered {
+ flags |= 1 << 2
+ }
+ if p.immediateSack {
+ flags |= 1 << 3
+ }
+
+ p.chunkHeader.flags = flags
+ p.chunkHeader.typ = ctPayloadData
+ p.chunkHeader.raw = payRaw
+ return p.chunkHeader.marshal()
+}
+
+func (p *chunkPayloadData) check() (abort bool, err error) {
+ return false, nil
+}
+
+// String makes chunkPayloadData printable
+func (p *chunkPayloadData) String() string {
+ return fmt.Sprintf("%s\n%d", p.chunkHeader, p.tsn)
+}
+
+func (p *chunkPayloadData) abandoned() bool {
+ if p.head != nil {
+ return p.head._abandoned && p.head._allInflight
+ }
+ return p._abandoned && p._allInflight
+}
+
+func (p *chunkPayloadData) setAbandoned(abandoned bool) {
+ if p.head != nil {
+ p.head._abandoned = abandoned
+ return
+ }
+ p._abandoned = abandoned
+}
+
+func (p *chunkPayloadData) setAllInflight() {
+ if p.endingFragment {
+ if p.head != nil {
+ p.head._allInflight = true
+ } else {
+ p._allInflight = true
+ }
+ }
+}
diff --git a/vendor/github.com/pion/sctp/chunk_reconfig.go b/vendor/github.com/pion/sctp/chunk_reconfig.go
new file mode 100644
index 0000000..a53ad5e
--- /dev/null
+++ b/vendor/github.com/pion/sctp/chunk_reconfig.go
@@ -0,0 +1,99 @@
+package sctp
+
+import (
+ "fmt"
+
+ "github.com/pkg/errors"
+)
+
+// https://tools.ietf.org/html/rfc6525#section-3.1
+// chunkReconfig represents an SCTP Chunk used to reconfigure streams.
+//
+// 0 1 2 3
+// 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1
+// +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
+// | Type = 130 | Chunk Flags | Chunk Length |
+// +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
+// \ \
+// / Re-configuration Parameter /
+// \ \
+// +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
+// \ \
+// / Re-configuration Parameter (optional) /
+// \ \
+// +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
+
+type chunkReconfig struct {
+ chunkHeader
+ paramA param
+ paramB param
+}
+
+func (c *chunkReconfig) unmarshal(raw []byte) error {
+ if err := c.chunkHeader.unmarshal(raw); err != nil {
+ return err
+ }
+ pType, err := parseParamType(c.raw)
+ if err != nil {
+ return errors.Wrap(err, "failed to parse param type")
+ }
+ a, err := buildParam(pType, c.raw)
+ if err != nil {
+ return err
+ }
+ c.paramA = a
+
+ padding := getPadding(a.length())
+ offset := a.length() + padding
+ if len(c.raw) > offset {
+ pType, err := parseParamType(c.raw[offset:])
+ if err != nil {
+ return errors.Wrap(err, "failed to parse param type")
+ }
+ b, err := buildParam(pType, c.raw[offset:])
+ if err != nil {
+ return err
+ }
+ c.paramB = b
+ }
+
+ return nil
+}
+
+func (c *chunkReconfig) marshal() ([]byte, error) {
+ out, err := c.paramA.marshal()
+ if err != nil {
+ return nil, errors.Wrap(err, "Unable to marshal parameter A for reconfig")
+ }
+ if c.paramB != nil {
+ // Pad param A
+ out = padByte(out, getPadding(len(out)))
+
+ outB, err := c.paramB.marshal()
+ if err != nil {
+ return nil, errors.Wrap(err, "Unable to marshal parameter B for reconfig")
+ }
+
+ out = append(out, outB...)
+ }
+
+ c.typ = ctReconfig
+ c.raw = out
+ return c.chunkHeader.marshal()
+}
+
+func (c *chunkReconfig) check() (abort bool, err error) {
+ // nolint:godox
+ // TODO: check allowed combinations:
+ // https://tools.ietf.org/html/rfc6525#section-3.1
+ return true, nil
+}
+
+// String makes chunkReconfig printable
+func (c *chunkReconfig) String() string {
+ res := fmt.Sprintf("Param A:\n %s", c.paramA)
+ if c.paramB != nil {
+ res += fmt.Sprintf("Param B:\n %s", c.paramB)
+ }
+ return res
+}
diff --git a/vendor/github.com/pion/sctp/chunk_selective_ack.go b/vendor/github.com/pion/sctp/chunk_selective_ack.go
new file mode 100644
index 0000000..920d562
--- /dev/null
+++ b/vendor/github.com/pion/sctp/chunk_selective_ack.go
@@ -0,0 +1,142 @@
+package sctp
+
+import (
+ "encoding/binary"
+ "fmt"
+
+ "github.com/pkg/errors"
+)
+
+/*
+chunkSelectiveAck represents an SCTP Chunk of type SACK
+
+This chunk is sent to the peer endpoint to acknowledge received DATA
+chunks and to inform the peer endpoint of gaps in the received
+subsequences of DATA chunks as represented by their TSNs.
+0 1 2 3
+0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1
++-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
+| Type = 3 |Chunk Flags | Chunk Length |
++-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
+| Cumulative TSN Ack |
++-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
+| Advertised Receiver Window Credit (a_rwnd) |
++-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
+| Number of Gap Ack Blocks = N | Number of Duplicate TSNs = X |
++-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
+| Gap Ack Block #1 Start | Gap Ack Block #1 End |
++-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
+/ /
+\ ... \
+/ /
++-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
+| Gap Ack Block #N Start | Gap Ack Block #N End |
++-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
+| Duplicate TSN 1 |
++-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
+/ /
+\ ... \
+/ /
++-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
+| Duplicate TSN X |
++-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
+*/
+
+type gapAckBlock struct {
+ start uint16
+ end uint16
+}
+
+// String makes gapAckBlock printable
+func (g gapAckBlock) String() string {
+ return fmt.Sprintf("%d - %d", g.start, g.end)
+}
+
+type chunkSelectiveAck struct {
+ chunkHeader
+ cumulativeTSNAck uint32
+ advertisedReceiverWindowCredit uint32
+ gapAckBlocks []gapAckBlock
+ duplicateTSN []uint32
+}
+
+const (
+ selectiveAckHeaderSize = 12
+)
+
+func (s *chunkSelectiveAck) unmarshal(raw []byte) error {
+ if err := s.chunkHeader.unmarshal(raw); err != nil {
+ return err
+ }
+
+ if s.typ != ctSack {
+ return errors.Errorf("ChunkType is not of type SACK, actually is %s", s.typ.String())
+ }
+
+ if len(s.raw) < selectiveAckHeaderSize {
+ return errors.Errorf("SACK Chunk size is not large enough to contain header (%v remaining, needs %v bytes)",
+ len(s.raw), selectiveAckHeaderSize)
+ }
+
+ s.cumulativeTSNAck = binary.BigEndian.Uint32(s.raw[0:])
+ s.advertisedReceiverWindowCredit = binary.BigEndian.Uint32(s.raw[4:])
+ s.gapAckBlocks = make([]gapAckBlock, binary.BigEndian.Uint16(s.raw[8:]))
+ s.duplicateTSN = make([]uint32, binary.BigEndian.Uint16(s.raw[10:]))
+
+ if len(s.raw) != selectiveAckHeaderSize+(4*len(s.gapAckBlocks)+(4*len(s.duplicateTSN))) {
+ return errors.Errorf("SACK Chunk size does not match predicted amount from header values")
+ }
+
+ offset := selectiveAckHeaderSize
+ for i := range s.gapAckBlocks {
+ s.gapAckBlocks[i].start = binary.BigEndian.Uint16(s.raw[offset:])
+ s.gapAckBlocks[i].end = binary.BigEndian.Uint16(s.raw[offset+2:])
+ offset += 4
+ }
+ for i := range s.duplicateTSN {
+ s.duplicateTSN[i] = binary.BigEndian.Uint32(s.raw[offset:])
+ offset += 4
+ }
+
+ return nil
+}
+
+func (s *chunkSelectiveAck) marshal() ([]byte, error) {
+ sackRaw := make([]byte, selectiveAckHeaderSize+(4*len(s.gapAckBlocks)+(4*len(s.duplicateTSN))))
+ binary.BigEndian.PutUint32(sackRaw[0:], s.cumulativeTSNAck)
+ binary.BigEndian.PutUint32(sackRaw[4:], s.advertisedReceiverWindowCredit)
+ binary.BigEndian.PutUint16(sackRaw[8:], uint16(len(s.gapAckBlocks)))
+ binary.BigEndian.PutUint16(sackRaw[10:], uint16(len(s.duplicateTSN)))
+ offset := selectiveAckHeaderSize
+ for _, g := range s.gapAckBlocks {
+ binary.BigEndian.PutUint16(sackRaw[offset:], g.start)
+ binary.BigEndian.PutUint16(sackRaw[offset+2:], g.end)
+ offset += 4
+ }
+ for _, t := range s.duplicateTSN {
+ binary.BigEndian.PutUint32(sackRaw[offset:], t)
+ offset += 4
+ }
+
+ s.chunkHeader.typ = ctSack
+ s.chunkHeader.raw = sackRaw
+ return s.chunkHeader.marshal()
+}
+
+func (s *chunkSelectiveAck) check() (abort bool, err error) {
+ return false, nil
+}
+
+// String makes chunkSelectiveAck printable
+func (s *chunkSelectiveAck) String() string {
+ res := fmt.Sprintf("SACK cumTsnAck=%d arwnd=%d dupTsn=%d",
+ s.cumulativeTSNAck,
+ s.advertisedReceiverWindowCredit,
+ s.duplicateTSN)
+
+ for _, gap := range s.gapAckBlocks {
+ res = fmt.Sprintf("%s\n gap ack: %s", res, gap)
+ }
+
+ return res
+}
diff --git a/vendor/github.com/pion/sctp/chunkheader.go b/vendor/github.com/pion/sctp/chunkheader.go
new file mode 100644
index 0000000..a4a78db
--- /dev/null
+++ b/vendor/github.com/pion/sctp/chunkheader.go
@@ -0,0 +1,90 @@
+package sctp
+
+import (
+ "encoding/binary"
+
+ "github.com/pkg/errors"
+)
+
+/*
+chunkHeader represents a SCTP Chunk header, defined in https://tools.ietf.org/html/rfc4960#section-3.2
+The figure below illustrates the field format for the chunks to be
+transmitted in the SCTP packet. Each chunk is formatted with a Chunk
+Type field, a chunk-specific Flag field, a Chunk Length field, and a
+Value field.
+
+ 0 1 2 3
+ 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1
++-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
+| Chunk Type | Chunk Flags | Chunk Length |
++-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
+| |
+| Chunk Value |
+| |
++-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
+*/
+type chunkHeader struct {
+ typ chunkType
+ flags byte
+ raw []byte
+}
+
+const (
+ chunkHeaderSize = 4
+)
+
+func (c *chunkHeader) unmarshal(raw []byte) error {
+ if len(raw) < chunkHeaderSize {
+ return errors.Errorf("raw only %d bytes, %d is the minimum length for a SCTP chunk", len(raw), chunkHeaderSize)
+ }
+
+ c.typ = chunkType(raw[0])
+ c.flags = raw[1]
+ length := binary.BigEndian.Uint16(raw[2:])
+
+ // Length includes Chunk header
+ valueLength := int(length - chunkHeaderSize)
+ lengthAfterValue := len(raw) - (chunkHeaderSize + valueLength)
+
+ if lengthAfterValue < 0 {
+ return errors.Errorf("Not enough data left in SCTP packet to satisfy requested length remain %d req %d ", valueLength, len(raw)-chunkHeaderSize)
+ } else if lengthAfterValue < 4 {
+ // https://tools.ietf.org/html/rfc4960#section-3.2
+ // The Chunk Length field does not count any chunk padding.
+ // Chunks (including Type, Length, and Value fields) are padded out
+ // by the sender with all zero bytes to be a multiple of 4 bytes
+ // long. This padding MUST NOT be more than 3 bytes in total. The
+ // Chunk Length value does not include terminating padding of the
+ // chunk. However, it does include padding of any variable-length
+ // parameter except the last parameter in the chunk. The receiver
+ // MUST ignore the padding.
+ for i := lengthAfterValue; i > 0; i-- {
+ paddingOffset := chunkHeaderSize + valueLength + (i - 1)
+ if raw[paddingOffset] != 0 {
+ return errors.Errorf("Chunk padding is non-zero at offset %d ", paddingOffset)
+ }
+ }
+ }
+
+ c.raw = raw[chunkHeaderSize : chunkHeaderSize+valueLength]
+ return nil
+}
+
+func (c *chunkHeader) marshal() ([]byte, error) {
+ raw := make([]byte, 4+len(c.raw))
+
+ raw[0] = uint8(c.typ)
+ raw[1] = c.flags
+ binary.BigEndian.PutUint16(raw[2:], uint16(len(c.raw)+chunkHeaderSize))
+ copy(raw[4:], c.raw)
+ return raw, nil
+}
+
+func (c *chunkHeader) valueLength() int {
+ return len(c.raw)
+}
+
+// String makes chunkHeader printable
+func (c chunkHeader) String() string {
+ return c.typ.String()
+}
diff --git a/vendor/github.com/pion/sctp/chunktype.go b/vendor/github.com/pion/sctp/chunktype.go
new file mode 100644
index 0000000..65b57a4
--- /dev/null
+++ b/vendor/github.com/pion/sctp/chunktype.go
@@ -0,0 +1,67 @@
+package sctp
+
+import "fmt"
+
+// chunkType is an enum for SCTP Chunk Type field
+// This field identifies the type of information contained in the
+// Chunk Value field.
+type chunkType uint8
+
+// List of known chunkType enums
+const (
+ ctPayloadData chunkType = 0
+ ctInit chunkType = 1
+ ctInitAck chunkType = 2
+ ctSack chunkType = 3
+ ctHeartbeat chunkType = 4
+ ctHeartbeatAck chunkType = 5
+ ctAbort chunkType = 6
+ ctShutdown chunkType = 7
+ ctShutdownAck chunkType = 8
+ ctError chunkType = 9
+ ctCookieEcho chunkType = 10
+ ctCookieAck chunkType = 11
+ ctCWR chunkType = 13
+ ctShutdownComplete chunkType = 14
+ ctReconfig chunkType = 130
+ ctForwardTSN chunkType = 192
+)
+
+func (c chunkType) String() string {
+ switch c {
+ case ctPayloadData:
+ return "DATA"
+ case ctInit:
+ return "INIT"
+ case ctInitAck:
+ return "INIT-ACK"
+ case ctSack:
+ return "SACK"
+ case ctHeartbeat:
+ return "HEARTBEAT"
+ case ctHeartbeatAck:
+ return "HEARTBEAT-ACK"
+ case ctAbort:
+ return "ABORT"
+ case ctShutdown:
+ return "SHUTDOWN"
+ case ctShutdownAck:
+ return "SHUTDOWN-ACK"
+ case ctError:
+ return "ERROR"
+ case ctCookieEcho:
+ return "COOKIE-ECHO"
+ case ctCookieAck:
+ return "COOKIE-ACK"
+ case ctCWR:
+ return "ECNE" // Explicit Congestion Notification Echo
+ case ctShutdownComplete:
+ return "SHUTDOWN-COMPLETE"
+ case ctReconfig:
+ return "RECONFIG" // Re-configuration
+ case ctForwardTSN:
+ return "FORWARD-TSN"
+ default:
+ return fmt.Sprintf("Unknown ChunkType: %d", c)
+ }
+}
diff --git a/vendor/github.com/pion/sctp/codecov.yml b/vendor/github.com/pion/sctp/codecov.yml
new file mode 100644
index 0000000..085200a
--- /dev/null
+++ b/vendor/github.com/pion/sctp/codecov.yml
@@ -0,0 +1,20 @@
+#
+# DO NOT EDIT THIS FILE
+#
+# It is automatically copied from https://github.com/pion/.goassets repository.
+#
+
+coverage:
+ status:
+ project:
+ default:
+ # Allow decreasing 2% of total coverage to avoid noise.
+ threshold: 2%
+ patch:
+ default:
+ target: 70%
+ only_pulls: true
+
+ignore:
+ - "examples/*"
+ - "examples/**/*"
diff --git a/vendor/github.com/pion/sctp/control_queue.go b/vendor/github.com/pion/sctp/control_queue.go
new file mode 100644
index 0000000..7e13469
--- /dev/null
+++ b/vendor/github.com/pion/sctp/control_queue.go
@@ -0,0 +1,29 @@
+package sctp
+
+// control queue
+
+type controlQueue struct {
+ queue []*packet
+}
+
+func newControlQueue() *controlQueue {
+ return &controlQueue{queue: []*packet{}}
+}
+
+func (q *controlQueue) push(c *packet) {
+ q.queue = append(q.queue, c)
+}
+
+func (q *controlQueue) pushAll(packets []*packet) {
+ q.queue = append(q.queue, packets...)
+}
+
+func (q *controlQueue) popAll() []*packet {
+ packets := q.queue
+ q.queue = []*packet{}
+ return packets
+}
+
+func (q *controlQueue) size() int {
+ return len(q.queue)
+}
diff --git a/vendor/github.com/pion/sctp/error_cause.go b/vendor/github.com/pion/sctp/error_cause.go
new file mode 100644
index 0000000..e94cc5c
--- /dev/null
+++ b/vendor/github.com/pion/sctp/error_cause.go
@@ -0,0 +1,91 @@
+package sctp
+
+import (
+ "encoding/binary"
+ "fmt"
+
+ "github.com/pkg/errors"
+)
+
+// errorCauseCode is a cause code that appears in either a ERROR or ABORT chunk
+type errorCauseCode uint16
+
+type errorCause interface {
+ unmarshal([]byte) error
+ marshal() ([]byte, error)
+ length() uint16
+ String() string
+
+ errorCauseCode() errorCauseCode
+}
+
+// buildErrorCause delegates the building of a error cause from raw bytes to the correct structure
+func buildErrorCause(raw []byte) (errorCause, error) {
+ var e errorCause
+
+ c := errorCauseCode(binary.BigEndian.Uint16(raw[0:]))
+ switch c {
+ case invalidMandatoryParameter:
+ e = &errorCauseInvalidMandatoryParameter{}
+ case unrecognizedChunkType:
+ e = &errorCauseUnrecognizedChunkType{}
+ case protocolViolation:
+ e = &errorCauseProtocolViolation{}
+ default:
+ return nil, errors.Errorf("BuildErrorCause does not handle %s", c.String())
+ }
+
+ if err := e.unmarshal(raw); err != nil {
+ return nil, err
+ }
+ return e, nil
+}
+
+const (
+ invalidStreamIdentifier errorCauseCode = 1
+ missingMandatoryParameter errorCauseCode = 2
+ staleCookieError errorCauseCode = 3
+ outOfResource errorCauseCode = 4
+ unresolvableAddress errorCauseCode = 5
+ unrecognizedChunkType errorCauseCode = 6
+ invalidMandatoryParameter errorCauseCode = 7
+ unrecognizedParameters errorCauseCode = 8
+ noUserData errorCauseCode = 9
+ cookieReceivedWhileShuttingDown errorCauseCode = 10
+ restartOfAnAssociationWithNewAddresses errorCauseCode = 11
+ userInitiatedAbort errorCauseCode = 12
+ protocolViolation errorCauseCode = 13
+)
+
+func (e errorCauseCode) String() string {
+ switch e {
+ case invalidStreamIdentifier:
+ return "Invalid Stream Identifier"
+ case missingMandatoryParameter:
+ return "Missing Mandatory Parameter"
+ case staleCookieError:
+ return "Stale Cookie Error"
+ case outOfResource:
+ return "Out Of Resource"
+ case unresolvableAddress:
+ return "Unresolvable IP"
+ case unrecognizedChunkType:
+ return "Unrecognized Chunk Type"
+ case invalidMandatoryParameter:
+ return "Invalid Mandatory Parameter"
+ case unrecognizedParameters:
+ return "Unrecognized Parameters"
+ case noUserData:
+ return "No User Data"
+ case cookieReceivedWhileShuttingDown:
+ return "Cookie Received While Shutting Down"
+ case restartOfAnAssociationWithNewAddresses:
+ return "Restart Of An Association With New Addresses"
+ case userInitiatedAbort:
+ return "User Initiated Abort"
+ case protocolViolation:
+ return "Protocol Violation"
+ default:
+ return fmt.Sprintf("Unknown CauseCode: %d", e)
+ }
+}
diff --git a/vendor/github.com/pion/sctp/error_cause_header.go b/vendor/github.com/pion/sctp/error_cause_header.go
new file mode 100644
index 0000000..1ad6e1d
--- /dev/null
+++ b/vendor/github.com/pion/sctp/error_cause_header.go
@@ -0,0 +1,47 @@
+package sctp
+
+import (
+ "encoding/binary"
+)
+
+// errorCauseHeader represents the shared header that is shared by all error causes
+type errorCauseHeader struct {
+ code errorCauseCode
+ len uint16
+ raw []byte
+}
+
+const (
+ errorCauseHeaderLength = 4
+)
+
+func (e *errorCauseHeader) marshal() ([]byte, error) {
+ e.len = uint16(len(e.raw)) + uint16(errorCauseHeaderLength)
+ raw := make([]byte, e.len)
+ binary.BigEndian.PutUint16(raw[0:], uint16(e.code))
+ binary.BigEndian.PutUint16(raw[2:], e.len)
+ copy(raw[errorCauseHeaderLength:], e.raw)
+
+ return raw, nil
+}
+
+func (e *errorCauseHeader) unmarshal(raw []byte) error {
+ e.code = errorCauseCode(binary.BigEndian.Uint16(raw[0:]))
+ e.len = binary.BigEndian.Uint16(raw[2:])
+ valueLength := e.len - errorCauseHeaderLength
+ e.raw = raw[errorCauseHeaderLength : errorCauseHeaderLength+valueLength]
+ return nil
+}
+
+func (e *errorCauseHeader) length() uint16 {
+ return e.len
+}
+
+func (e *errorCauseHeader) errorCauseCode() errorCauseCode {
+ return e.code
+}
+
+// String makes errorCauseHeader printable
+func (e errorCauseHeader) String() string {
+ return e.code.String()
+}
diff --git a/vendor/github.com/pion/sctp/error_cause_invalid_mandatory_parameter.go b/vendor/github.com/pion/sctp/error_cause_invalid_mandatory_parameter.go
new file mode 100644
index 0000000..3da8b47
--- /dev/null
+++ b/vendor/github.com/pion/sctp/error_cause_invalid_mandatory_parameter.go
@@ -0,0 +1,19 @@
+package sctp
+
+// errorCauseInvalidMandatoryParameter represents an SCTP error cause
+type errorCauseInvalidMandatoryParameter struct {
+ errorCauseHeader
+}
+
+func (e *errorCauseInvalidMandatoryParameter) marshal() ([]byte, error) {
+ return e.errorCauseHeader.marshal()
+}
+
+func (e *errorCauseInvalidMandatoryParameter) unmarshal(raw []byte) error {
+ return e.errorCauseHeader.unmarshal(raw)
+}
+
+// String makes errorCauseInvalidMandatoryParameter printable
+func (e *errorCauseInvalidMandatoryParameter) String() string {
+ return e.errorCauseHeader.String()
+}
diff --git a/vendor/github.com/pion/sctp/error_cause_protocol_violation.go b/vendor/github.com/pion/sctp/error_cause_protocol_violation.go
new file mode 100644
index 0000000..8b457f4
--- /dev/null
+++ b/vendor/github.com/pion/sctp/error_cause_protocol_violation.go
@@ -0,0 +1,50 @@
+package sctp
+
+import (
+ "fmt"
+
+ "github.com/pkg/errors"
+)
+
+/*
+ This error cause MAY be included in ABORT chunks that are sent
+ because an SCTP endpoint detects a protocol violation of the peer
+ that is not covered by the error causes described in Section 3.3.10.1
+ to Section 3.3.10.12. An implementation MAY provide additional
+ information specifying what kind of protocol violation has been
+ detected.
+
+ 0 1 2 3
+ 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1
+ +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
+ | Cause Code=13 | Cause Length=Variable |
+ +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
+ / Additional Information /
+ \ \
+ +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
+*/
+type errorCauseProtocolViolation struct {
+ errorCauseHeader
+ additionalInformation []byte
+}
+
+func (e *errorCauseProtocolViolation) marshal() ([]byte, error) {
+ e.raw = e.additionalInformation
+ return e.errorCauseHeader.marshal()
+}
+
+func (e *errorCauseProtocolViolation) unmarshal(raw []byte) error {
+ err := e.errorCauseHeader.unmarshal(raw)
+ if err != nil {
+ return errors.Wrap(err, "Unable to unmarshal Protocol Violation error")
+ }
+
+ e.additionalInformation = e.raw
+
+ return nil
+}
+
+// String makes errorCauseProtocolViolation printable
+func (e *errorCauseProtocolViolation) String() string {
+ return fmt.Sprintf("%s: %s", e.errorCauseHeader, e.additionalInformation)
+}
diff --git a/vendor/github.com/pion/sctp/error_cause_unrecognized_chunk_type.go b/vendor/github.com/pion/sctp/error_cause_unrecognized_chunk_type.go
new file mode 100644
index 0000000..fee9a36
--- /dev/null
+++ b/vendor/github.com/pion/sctp/error_cause_unrecognized_chunk_type.go
@@ -0,0 +1,28 @@
+package sctp
+
+// errorCauseUnrecognizedChunkType represents an SCTP error cause
+type errorCauseUnrecognizedChunkType struct {
+ errorCauseHeader
+ unrecognizedChunk []byte
+}
+
+func (e *errorCauseUnrecognizedChunkType) marshal() ([]byte, error) {
+ e.code = unrecognizedChunkType
+ e.errorCauseHeader.raw = e.unrecognizedChunk
+ return e.errorCauseHeader.marshal()
+}
+
+func (e *errorCauseUnrecognizedChunkType) unmarshal(raw []byte) error {
+ err := e.errorCauseHeader.unmarshal(raw)
+ if err != nil {
+ return err
+ }
+
+ e.unrecognizedChunk = e.errorCauseHeader.raw
+ return nil
+}
+
+// String makes errorCauseUnrecognizedChunkType printable
+func (e *errorCauseUnrecognizedChunkType) String() string {
+ return e.errorCauseHeader.String()
+}
diff --git a/vendor/github.com/pion/sctp/go.mod b/vendor/github.com/pion/sctp/go.mod
new file mode 100644
index 0000000..022e2fc
--- /dev/null
+++ b/vendor/github.com/pion/sctp/go.mod
@@ -0,0 +1,14 @@
+module github.com/pion/sctp
+
+require (
+ github.com/davecgh/go-spew v1.1.1 // indirect
+ github.com/kr/pretty v0.1.0 // indirect
+ github.com/pion/logging v0.2.2
+ github.com/pion/randutil v0.1.0
+ github.com/pion/transport v0.10.1
+ github.com/pkg/errors v0.9.1
+ github.com/stretchr/testify v1.6.1
+ gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15 // indirect
+)
+
+go 1.13
diff --git a/vendor/github.com/pion/sctp/go.sum b/vendor/github.com/pion/sctp/go.sum
new file mode 100644
index 0000000..3744262
--- /dev/null
+++ b/vendor/github.com/pion/sctp/go.sum
@@ -0,0 +1,36 @@
+github.com/davecgh/go-spew v1.1.0 h1:ZDRjVQ15GmhC3fiQ8ni8+OwkZQO4DARzQgrnXU1Liz8=
+github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
+github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
+github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
+github.com/kr/pretty v0.1.0 h1:L/CwN0zerZDmRFUapSPitk6f+Q3+0za1rQkzVuMiMFI=
+github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo=
+github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ=
+github.com/kr/text v0.1.0 h1:45sCR5RtlFHMR4UwH9sdQ5TC8v0qDQCHnXt+kaKSTVE=
+github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI=
+github.com/pion/logging v0.2.2 h1:M9+AIj/+pxNsDfAT64+MAVgJO0rsyLnoJKCqf//DoeY=
+github.com/pion/logging v0.2.2/go.mod h1:k0/tDVsRCX2Mb2ZEmTqNa7CWsQPc+YYCB7Q+5pahoms=
+github.com/pion/randutil v0.1.0 h1:CFG1UdESneORglEsnimhUjf33Rwjubwj6xfiOXBa3mA=
+github.com/pion/randutil v0.1.0/go.mod h1:XcJrSMMbbMRhASFVOlj/5hQial/Y8oH/HVo7TBZq+j8=
+github.com/pion/transport v0.10.1 h1:2W+yJT+0mOQ160ThZYUx5Zp2skzshiNgxrNE9GUfhJM=
+github.com/pion/transport v0.10.1/go.mod h1:PBis1stIILMiis0PewDw91WJeLJkyIMcEk+DwKOzf4A=
+github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4=
+github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
+github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
+github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
+github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
+github.com/stretchr/testify v1.6.1 h1:hDPOHmpOpP40lSULcqw7IrRb/u7w6RpDC9399XyoNd0=
+github.com/stretchr/testify v1.6.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
+golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
+golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto=
+golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg=
+golang.org/x/net v0.0.0-20200625001655-4c5254603344 h1:vGXIOMxbNfDTk/aXCmfdLgkrSV+Z2tcbze+pEc3v5W4=
+golang.org/x/net v0.0.0-20200625001655-4c5254603344/go.mod h1:/O7V0waA8r7cgGh81Ro3o1hOxt32SMVPicZroKQ2sZA=
+golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
+golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
+golang.org/x/sys v0.0.0-20200323222414-85ca7c5b95cd/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
+golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
+gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
+gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15 h1:YR8cESwS4TdDjEe65xsg0ogRM/Nc3DYOhEAlW+xobZo=
+gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
+gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c h1:dUUwHk2QECo/6vqA44rthZ8ie2QXMNeKRTHCNY2nXvo=
+gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
diff --git a/vendor/github.com/pion/sctp/packet.go b/vendor/github.com/pion/sctp/packet.go
new file mode 100644
index 0000000..dcc650b
--- /dev/null
+++ b/vendor/github.com/pion/sctp/packet.go
@@ -0,0 +1,178 @@
+package sctp
+
+import (
+ "encoding/binary"
+ "fmt"
+ "hash/crc32"
+
+ "github.com/pkg/errors"
+)
+
+// Create the crc32 table we'll use for the checksum
+var castagnoliTable = crc32.MakeTable(crc32.Castagnoli) // nolint:gochecknoglobals
+
+// Allocate and zero this data once.
+// We need to use it for the checksum and don't want to allocate/clear each time.
+var fourZeroes [4]byte // nolint:gochecknoglobals
+
+/*
+Packet represents an SCTP packet, defined in https://tools.ietf.org/html/rfc4960#section-3
+An SCTP packet is composed of a common header and chunks. A chunk
+contains either control information or user data.
+
+
+ SCTP Packet Format
+ 0 1 2 3
+ 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1
++-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
+| Common Header |
++-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
+| Chunk #1 |
++-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
+| ... |
++-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
+| Chunk #n |
++-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
+
+
+ SCTP Common Header Format
+
+ 0 1 2 3
+ 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1
++-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
+| Source Value Number | Destination Value Number |
++-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
+| Verification Tag |
++-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
+| Checksum |
++-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
+
+
+*/
+type packet struct {
+ sourcePort uint16
+ destinationPort uint16
+ verificationTag uint32
+ chunks []chunk
+}
+
+const (
+ packetHeaderSize = 12
+)
+
+func (p *packet) unmarshal(raw []byte) error {
+ if len(raw) < packetHeaderSize {
+ return errors.Errorf("raw only %d bytes, %d is the minimum length for a SCTP packet", len(raw), packetHeaderSize)
+ }
+
+ p.sourcePort = binary.BigEndian.Uint16(raw[0:])
+ p.destinationPort = binary.BigEndian.Uint16(raw[2:])
+ p.verificationTag = binary.BigEndian.Uint32(raw[4:])
+
+ offset := packetHeaderSize
+ for {
+ // Exact match, no more chunks
+ if offset == len(raw) {
+ break
+ } else if offset+chunkHeaderSize > len(raw) {
+ return errors.Errorf("Unable to parse SCTP chunk, not enough data for complete header: offset %d remaining %d", offset, len(raw))
+ }
+
+ var c chunk
+ switch chunkType(raw[offset]) {
+ case ctInit:
+ c = &chunkInit{}
+ case ctInitAck:
+ c = &chunkInitAck{}
+ case ctAbort:
+ c = &chunkAbort{}
+ case ctCookieEcho:
+ c = &chunkCookieEcho{}
+ case ctCookieAck:
+ c = &chunkCookieAck{}
+ case ctHeartbeat:
+ c = &chunkHeartbeat{}
+ case ctPayloadData:
+ c = &chunkPayloadData{}
+ case ctSack:
+ c = &chunkSelectiveAck{}
+ case ctReconfig:
+ c = &chunkReconfig{}
+ case ctForwardTSN:
+ c = &chunkForwardTSN{}
+ case ctError:
+ c = &chunkError{}
+ default:
+ return errors.Errorf("Failed to unmarshal, contains unknown chunk type %s", chunkType(raw[offset]).String())
+ }
+
+ if err := c.unmarshal(raw[offset:]); err != nil {
+ return err
+ }
+
+ p.chunks = append(p.chunks, c)
+ chunkValuePadding := getPadding(c.valueLength())
+ offset += chunkHeaderSize + c.valueLength() + chunkValuePadding
+ }
+ theirChecksum := binary.LittleEndian.Uint32(raw[8:])
+ ourChecksum := generatePacketChecksum(raw)
+ if theirChecksum != ourChecksum {
+ return errors.Errorf("Checksum mismatch theirs: %d ours: %d", theirChecksum, ourChecksum)
+ }
+ return nil
+}
+
+func (p *packet) marshal() ([]byte, error) {
+ raw := make([]byte, packetHeaderSize)
+
+ // Populate static headers
+ // 8-12 is Checksum which will be populated when packet is complete
+ binary.BigEndian.PutUint16(raw[0:], p.sourcePort)
+ binary.BigEndian.PutUint16(raw[2:], p.destinationPort)
+ binary.BigEndian.PutUint32(raw[4:], p.verificationTag)
+
+ // Populate chunks
+ for _, c := range p.chunks {
+ chunkRaw, err := c.marshal()
+ if err != nil {
+ return nil, err
+ }
+ raw = append(raw, chunkRaw...)
+
+ paddingNeeded := getPadding(len(raw))
+ if paddingNeeded != 0 {
+ raw = append(raw, make([]byte, paddingNeeded)...)
+ }
+ }
+
+ // Checksum is already in BigEndian
+ // Using LittleEndian.PutUint32 stops it from being flipped
+ binary.LittleEndian.PutUint32(raw[8:], generatePacketChecksum(raw))
+ return raw, nil
+}
+
+func generatePacketChecksum(raw []byte) (sum uint32) {
+ // Fastest way to do a crc32 without allocating.
+ sum = crc32.Update(sum, castagnoliTable, raw[0:8])
+ sum = crc32.Update(sum, castagnoliTable, fourZeroes[:])
+ sum = crc32.Update(sum, castagnoliTable, raw[12:])
+ return sum
+}
+
+// String makes packet printable
+func (p *packet) String() string {
+ format := `Packet:
+ sourcePort: %d
+ destinationPort: %d
+ verificationTag: %d
+ `
+ res := fmt.Sprintf(format,
+ p.sourcePort,
+ p.destinationPort,
+ p.verificationTag,
+ )
+ for i, chunk := range p.chunks {
+ res += fmt.Sprintf("Chunk %d:\n %s", i, chunk)
+ }
+ return res
+}
diff --git a/vendor/github.com/pion/sctp/param.go b/vendor/github.com/pion/sctp/param.go
new file mode 100644
index 0000000..08e46d1
--- /dev/null
+++ b/vendor/github.com/pion/sctp/param.go
@@ -0,0 +1,35 @@
+package sctp
+
+import (
+ "github.com/pkg/errors"
+)
+
+type param interface {
+ marshal() ([]byte, error)
+ length() int
+}
+
+func buildParam(t paramType, rawParam []byte) (param, error) {
+ switch t {
+ case forwardTSNSupp:
+ return (&paramForwardTSNSupported{}).unmarshal(rawParam)
+ case supportedExt:
+ return (&paramSupportedExtensions{}).unmarshal(rawParam)
+ case random:
+ return (&paramRandom{}).unmarshal(rawParam)
+ case reqHMACAlgo:
+ return (&paramRequestedHMACAlgorithm{}).unmarshal(rawParam)
+ case chunkList:
+ return (&paramChunkList{}).unmarshal(rawParam)
+ case stateCookie:
+ return (&paramStateCookie{}).unmarshal(rawParam)
+ case heartbeatInfo:
+ return (&paramHeartbeatInfo{}).unmarshal(rawParam)
+ case outSSNResetReq:
+ return (&paramOutgoingResetRequest{}).unmarshal(rawParam)
+ case reconfigResp:
+ return (&paramReconfigResponse{}).unmarshal(rawParam)
+ default:
+ return nil, errors.Errorf("Unhandled ParamType %v", t)
+ }
+}
diff --git a/vendor/github.com/pion/sctp/param_chunk_list.go b/vendor/github.com/pion/sctp/param_chunk_list.go
new file mode 100644
index 0000000..4ea484c
--- /dev/null
+++ b/vendor/github.com/pion/sctp/param_chunk_list.go
@@ -0,0 +1,28 @@
+package sctp
+
+type paramChunkList struct {
+ paramHeader
+ chunkTypes []chunkType
+}
+
+func (c *paramChunkList) marshal() ([]byte, error) {
+ c.typ = chunkList
+ c.raw = make([]byte, len(c.chunkTypes))
+ for i, t := range c.chunkTypes {
+ c.raw[i] = byte(t)
+ }
+
+ return c.paramHeader.marshal()
+}
+
+func (c *paramChunkList) unmarshal(raw []byte) (param, error) {
+ err := c.paramHeader.unmarshal(raw)
+ if err != nil {
+ return nil, err
+ }
+ for _, t := range c.raw {
+ c.chunkTypes = append(c.chunkTypes, chunkType(t))
+ }
+
+ return c, nil
+}
diff --git a/vendor/github.com/pion/sctp/param_forward_tsn_supported.go b/vendor/github.com/pion/sctp/param_forward_tsn_supported.go
new file mode 100644
index 0000000..62de155
--- /dev/null
+++ b/vendor/github.com/pion/sctp/param_forward_tsn_supported.go
@@ -0,0 +1,28 @@
+package sctp
+
+// At the initialization of the association, the sender of the INIT or
+// INIT ACK chunk MAY include this OPTIONAL parameter to inform its peer
+// that it is able to support the Forward TSN chunk
+//
+// 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1
+// +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
+// | Parameter Type = 49152 | Parameter Length = 4 |
+// +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
+
+type paramForwardTSNSupported struct {
+ paramHeader
+}
+
+func (f *paramForwardTSNSupported) marshal() ([]byte, error) {
+ f.typ = forwardTSNSupp
+ f.raw = []byte{}
+ return f.paramHeader.marshal()
+}
+
+func (f *paramForwardTSNSupported) unmarshal(raw []byte) (param, error) {
+ err := f.paramHeader.unmarshal(raw)
+ if err != nil {
+ return nil, err
+ }
+ return f, nil
+}
diff --git a/vendor/github.com/pion/sctp/param_heartbeat_info.go b/vendor/github.com/pion/sctp/param_heartbeat_info.go
new file mode 100644
index 0000000..47f64eb
--- /dev/null
+++ b/vendor/github.com/pion/sctp/param_heartbeat_info.go
@@ -0,0 +1,21 @@
+package sctp
+
+type paramHeartbeatInfo struct {
+ paramHeader
+ heartbeatInformation []byte
+}
+
+func (h *paramHeartbeatInfo) marshal() ([]byte, error) {
+ h.typ = heartbeatInfo
+ h.raw = h.heartbeatInformation
+ return h.paramHeader.marshal()
+}
+
+func (h *paramHeartbeatInfo) unmarshal(raw []byte) (param, error) {
+ err := h.paramHeader.unmarshal(raw)
+ if err != nil {
+ return nil, err
+ }
+ h.heartbeatInformation = h.raw
+ return h, nil
+}
diff --git a/vendor/github.com/pion/sctp/param_outgoing_reset_request.go b/vendor/github.com/pion/sctp/param_outgoing_reset_request.go
new file mode 100644
index 0000000..ceae178
--- /dev/null
+++ b/vendor/github.com/pion/sctp/param_outgoing_reset_request.go
@@ -0,0 +1,88 @@
+package sctp
+
+import (
+ "encoding/binary"
+ "errors"
+)
+
+const (
+ paramOutgoingResetRequestStreamIdentifiersOffset = 12
+)
+
+// This parameter is used by the sender to request the reset of some or
+// all outgoing streams.
+// 0 1 2 3
+// 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1
+// +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
+// | Parameter Type = 13 | Parameter Length = 16 + 2 * N |
+// +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
+// | Re-configuration Request Sequence Number |
+// +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
+// | Re-configuration Response Sequence Number |
+// +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
+// | Sender's Last Assigned TSN |
+// +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
+// | Stream Number 1 (optional) | Stream Number 2 (optional) |
+// +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
+// / ...... /
+// +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
+// | Stream Number N-1 (optional) | Stream Number N (optional) |
+// +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
+
+type paramOutgoingResetRequest struct {
+ paramHeader
+ // reconfigRequestSequenceNumber is used to identify the request. It is a monotonically
+ // increasing number that is initialized to the same value as the
+ // initial TSN. It is increased by 1 whenever sending a new Re-
+ // configuration Request Parameter.
+ reconfigRequestSequenceNumber uint32
+ // When this Outgoing SSN Reset Request Parameter is sent in response
+ // to an Incoming SSN Reset Request Parameter, this parameter is also
+ // an implicit response to the incoming request. This field then
+ // holds the Re-configuration Request Sequence Number of the incoming
+ // request. In other cases, it holds the next expected
+ // Re-configuration Request Sequence Number minus 1.
+ reconfigResponseSequenceNumber uint32
+ // This value holds the next TSN minus 1 -- in other words, the last
+ // TSN that this sender assigned.
+ senderLastTSN uint32
+ // This optional field, if included, is used to indicate specific
+ // streams that are to be reset. If no streams are listed, then all
+ // streams are to be reset.
+ streamIdentifiers []uint16
+}
+
+var errSSNResetRequestParamTooShort = errors.New("outgoing SSN reset request parameter too short")
+
+func (r *paramOutgoingResetRequest) marshal() ([]byte, error) {
+ r.typ = outSSNResetReq
+ r.raw = make([]byte, paramOutgoingResetRequestStreamIdentifiersOffset+2*len(r.streamIdentifiers))
+ binary.BigEndian.PutUint32(r.raw, r.reconfigRequestSequenceNumber)
+ binary.BigEndian.PutUint32(r.raw[4:], r.reconfigResponseSequenceNumber)
+ binary.BigEndian.PutUint32(r.raw[8:], r.senderLastTSN)
+ for i, sID := range r.streamIdentifiers {
+ binary.BigEndian.PutUint16(r.raw[paramOutgoingResetRequestStreamIdentifiersOffset+2*i:], sID)
+ }
+ return r.paramHeader.marshal()
+}
+
+func (r *paramOutgoingResetRequest) unmarshal(raw []byte) (param, error) {
+ err := r.paramHeader.unmarshal(raw)
+ if err != nil {
+ return nil, err
+ }
+ if len(r.raw) < paramOutgoingResetRequestStreamIdentifiersOffset {
+ return nil, errSSNResetRequestParamTooShort
+ }
+ r.reconfigRequestSequenceNumber = binary.BigEndian.Uint32(r.raw)
+ r.reconfigResponseSequenceNumber = binary.BigEndian.Uint32(r.raw[4:])
+ r.senderLastTSN = binary.BigEndian.Uint32(r.raw[8:])
+
+ lim := (len(r.raw) - paramOutgoingResetRequestStreamIdentifiersOffset) / 2
+ r.streamIdentifiers = make([]uint16, lim)
+ for i := 0; i < lim; i++ {
+ r.streamIdentifiers[i] = binary.BigEndian.Uint16(r.raw[paramOutgoingResetRequestStreamIdentifiersOffset+2*i:])
+ }
+
+ return r, nil
+}
diff --git a/vendor/github.com/pion/sctp/param_random.go b/vendor/github.com/pion/sctp/param_random.go
new file mode 100644
index 0000000..dc454b3
--- /dev/null
+++ b/vendor/github.com/pion/sctp/param_random.go
@@ -0,0 +1,21 @@
+package sctp
+
+type paramRandom struct {
+ paramHeader
+ randomData []byte
+}
+
+func (r *paramRandom) marshal() ([]byte, error) {
+ r.typ = random
+ r.raw = r.randomData
+ return r.paramHeader.marshal()
+}
+
+func (r *paramRandom) unmarshal(raw []byte) (param, error) {
+ err := r.paramHeader.unmarshal(raw)
+ if err != nil {
+ return nil, err
+ }
+ r.randomData = r.raw
+ return r, nil
+}
diff --git a/vendor/github.com/pion/sctp/param_reconfig_response.go b/vendor/github.com/pion/sctp/param_reconfig_response.go
new file mode 100644
index 0000000..d9eab55
--- /dev/null
+++ b/vendor/github.com/pion/sctp/param_reconfig_response.go
@@ -0,0 +1,92 @@
+package sctp
+
+import (
+ "encoding/binary"
+ "errors"
+ "fmt"
+)
+
+// This parameter is used by the receiver of a Re-configuration Request
+// Parameter to respond to the request.
+//
+// 0 1 2 3
+// 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1
+// +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
+// | Parameter Type = 16 | Parameter Length |
+// +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
+// | Re-configuration Response Sequence Number |
+// +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
+// | Result |
+// +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
+// | Sender's Next TSN (optional) |
+// +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
+// | Receiver's Next TSN (optional) |
+// +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
+
+type paramReconfigResponse struct {
+ paramHeader
+ // This value is copied from the request parameter and is used by the
+ // receiver of the Re-configuration Response Parameter to tie the
+ // response to the request.
+ reconfigResponseSequenceNumber uint32
+ // This value describes the result of the processing of the request.
+ result reconfigResult
+}
+
+type reconfigResult uint32
+
+const (
+ reconfigResultSuccessNOP reconfigResult = 0
+ reconfigResultSuccessPerformed reconfigResult = 1
+ reconfigResultDenied reconfigResult = 2
+ reconfigResultErrorWrongSSN reconfigResult = 3
+ reconfigResultErrorRequestAlreadyInProgress reconfigResult = 4
+ reconfigResultErrorBadSequenceNumber reconfigResult = 5
+ reconfigResultInProgress reconfigResult = 6
+)
+
+var errReconfigRespParamTooShort = errors.New("reconfig response parameter too short")
+
+func (t reconfigResult) String() string {
+ switch t {
+ case reconfigResultSuccessNOP:
+ return "0: Success - Nothing to do"
+ case reconfigResultSuccessPerformed:
+ return "1: Success - Performed"
+ case reconfigResultDenied:
+ return "2: Denied"
+ case reconfigResultErrorWrongSSN:
+ return "3: Error - Wrong SSN"
+ case reconfigResultErrorRequestAlreadyInProgress:
+ return "4: Error - Request already in progress"
+ case reconfigResultErrorBadSequenceNumber:
+ return "5: Error - Bad Sequence Number"
+ case reconfigResultInProgress:
+ return "6: In progress"
+ default:
+ return fmt.Sprintf("Unknown reconfigResult: %d", t)
+ }
+}
+
+func (r *paramReconfigResponse) marshal() ([]byte, error) {
+ r.typ = reconfigResp
+ r.raw = make([]byte, 8)
+ binary.BigEndian.PutUint32(r.raw, r.reconfigResponseSequenceNumber)
+ binary.BigEndian.PutUint32(r.raw[4:], uint32(r.result))
+
+ return r.paramHeader.marshal()
+}
+
+func (r *paramReconfigResponse) unmarshal(raw []byte) (param, error) {
+ err := r.paramHeader.unmarshal(raw)
+ if err != nil {
+ return nil, err
+ }
+ if len(r.raw) < 8 {
+ return nil, errReconfigRespParamTooShort
+ }
+ r.reconfigResponseSequenceNumber = binary.BigEndian.Uint32(r.raw)
+ r.result = reconfigResult(binary.BigEndian.Uint32(r.raw[4:]))
+
+ return r, nil
+}
diff --git a/vendor/github.com/pion/sctp/param_requested_hmac_algorithm.go b/vendor/github.com/pion/sctp/param_requested_hmac_algorithm.go
new file mode 100644
index 0000000..b520fe3
--- /dev/null
+++ b/vendor/github.com/pion/sctp/param_requested_hmac_algorithm.go
@@ -0,0 +1,73 @@
+package sctp
+
+import (
+ "encoding/binary"
+ "fmt"
+
+ "github.com/pkg/errors"
+)
+
+type hmacAlgorithm uint16
+
+const (
+ hmacResv1 hmacAlgorithm = 0
+ hmacSHA128 = 1
+ hmacResv2 hmacAlgorithm = 2
+ hmacSHA256 hmacAlgorithm = 3
+)
+
+func (c hmacAlgorithm) String() string {
+ switch c {
+ case hmacResv1:
+ return "HMAC Reserved (0x00)"
+ case hmacSHA128:
+ return "HMAC SHA-128"
+ case hmacResv2:
+ return "HMAC Reserved (0x02)"
+ case hmacSHA256:
+ return "HMAC SHA-256"
+ default:
+ return fmt.Sprintf("Unknown HMAC Algorithm type: %d", c)
+ }
+}
+
+type paramRequestedHMACAlgorithm struct {
+ paramHeader
+ availableAlgorithms []hmacAlgorithm
+}
+
+func (r *paramRequestedHMACAlgorithm) marshal() ([]byte, error) {
+ r.typ = reqHMACAlgo
+ r.raw = make([]byte, len(r.availableAlgorithms)*2)
+ i := 0
+ for _, a := range r.availableAlgorithms {
+ binary.BigEndian.PutUint16(r.raw[i:], uint16(a))
+ i += 2
+ }
+
+ return r.paramHeader.marshal()
+}
+
+func (r *paramRequestedHMACAlgorithm) unmarshal(raw []byte) (param, error) {
+ err := r.paramHeader.unmarshal(raw)
+ if err != nil {
+ return nil, err
+ }
+
+ i := 0
+ for i < len(r.raw) {
+ a := hmacAlgorithm(binary.BigEndian.Uint16(r.raw[i:]))
+ switch a {
+ case hmacSHA128:
+ fallthrough
+ case hmacSHA256:
+ r.availableAlgorithms = append(r.availableAlgorithms, a)
+ default:
+ return nil, errors.Errorf("Invalid algorithm type '%v'", a)
+ }
+
+ i += 2
+ }
+
+ return r, nil
+}
diff --git a/vendor/github.com/pion/sctp/param_state_cookie.go b/vendor/github.com/pion/sctp/param_state_cookie.go
new file mode 100644
index 0000000..9681267
--- /dev/null
+++ b/vendor/github.com/pion/sctp/param_state_cookie.go
@@ -0,0 +1,46 @@
+package sctp
+
+import (
+ "crypto/rand"
+ "fmt"
+)
+
+type paramStateCookie struct {
+ paramHeader
+ cookie []byte
+}
+
+func newRandomStateCookie() (*paramStateCookie, error) {
+ randCookie := make([]byte, 32)
+ _, err := rand.Read(randCookie)
+ // crypto/rand.Read returns n == len(b) if and only if err == nil.
+ if err != nil {
+ return nil, err
+ }
+
+ s := &paramStateCookie{
+ cookie: randCookie,
+ }
+
+ return s, nil
+}
+
+func (s *paramStateCookie) marshal() ([]byte, error) {
+ s.typ = stateCookie
+ s.raw = s.cookie
+ return s.paramHeader.marshal()
+}
+
+func (s *paramStateCookie) unmarshal(raw []byte) (param, error) {
+ err := s.paramHeader.unmarshal(raw)
+ if err != nil {
+ return nil, err
+ }
+ s.cookie = s.raw
+ return s, nil
+}
+
+// String makes paramStateCookie printable
+func (s *paramStateCookie) String() string {
+ return fmt.Sprintf("%s: %s", s.paramHeader, s.cookie)
+}
diff --git a/vendor/github.com/pion/sctp/param_supported_extensions.go b/vendor/github.com/pion/sctp/param_supported_extensions.go
new file mode 100644
index 0000000..2935524
--- /dev/null
+++ b/vendor/github.com/pion/sctp/param_supported_extensions.go
@@ -0,0 +1,29 @@
+package sctp
+
+type paramSupportedExtensions struct {
+ paramHeader
+ ChunkTypes []chunkType
+}
+
+func (s *paramSupportedExtensions) marshal() ([]byte, error) {
+ s.typ = supportedExt
+ s.raw = make([]byte, len(s.ChunkTypes))
+ for i, c := range s.ChunkTypes {
+ s.raw[i] = byte(c)
+ }
+
+ return s.paramHeader.marshal()
+}
+
+func (s *paramSupportedExtensions) unmarshal(raw []byte) (param, error) {
+ err := s.paramHeader.unmarshal(raw)
+ if err != nil {
+ return nil, err
+ }
+
+ for _, t := range s.raw {
+ s.ChunkTypes = append(s.ChunkTypes, chunkType(t))
+ }
+
+ return s, nil
+}
diff --git a/vendor/github.com/pion/sctp/paramheader.go b/vendor/github.com/pion/sctp/paramheader.go
new file mode 100644
index 0000000..d88642b
--- /dev/null
+++ b/vendor/github.com/pion/sctp/paramheader.go
@@ -0,0 +1,63 @@
+package sctp
+
+import (
+ "encoding/binary"
+ "encoding/hex"
+ "fmt"
+
+ "github.com/pkg/errors"
+)
+
+type paramHeader struct {
+ typ paramType
+ len int
+ raw []byte
+}
+
+const (
+ paramHeaderLength = 4
+)
+
+func (p *paramHeader) marshal() ([]byte, error) {
+ paramLengthPlusHeader := paramHeaderLength + len(p.raw)
+
+ rawParam := make([]byte, paramLengthPlusHeader)
+ binary.BigEndian.PutUint16(rawParam[0:], uint16(p.typ))
+ binary.BigEndian.PutUint16(rawParam[2:], uint16(paramLengthPlusHeader))
+ copy(rawParam[paramHeaderLength:], p.raw)
+
+ return rawParam, nil
+}
+
+func (p *paramHeader) unmarshal(raw []byte) error {
+ if len(raw) < paramHeaderLength {
+ return errors.New("param header too short")
+ }
+
+ paramLengthPlusHeader := binary.BigEndian.Uint16(raw[2:])
+ if int(paramLengthPlusHeader) < paramHeaderLength {
+ return errors.Errorf("param self reported length (%d) smaller than header length (%d)", int(paramLengthPlusHeader), paramHeaderLength)
+ }
+ if len(raw) < int(paramLengthPlusHeader) {
+ return errors.Errorf("param length (%d) shorter than its self reported length (%d)", len(raw), int(paramLengthPlusHeader))
+ }
+
+ typ, err := parseParamType(raw[0:])
+ if err != nil {
+ return errors.Wrap(err, "failed to parse param type")
+ }
+ p.typ = typ
+ p.raw = raw[paramHeaderLength:paramLengthPlusHeader]
+ p.len = int(paramLengthPlusHeader)
+
+ return nil
+}
+
+func (p *paramHeader) length() int {
+ return p.len
+}
+
+// String makes paramHeader printable
+func (p paramHeader) String() string {
+ return fmt.Sprintf("%s (%d): %s", p.typ, p.len, hex.Dump(p.raw))
+}
diff --git a/vendor/github.com/pion/sctp/paramtype.go b/vendor/github.com/pion/sctp/paramtype.go
new file mode 100644
index 0000000..bb0ee82
--- /dev/null
+++ b/vendor/github.com/pion/sctp/paramtype.go
@@ -0,0 +1,106 @@
+package sctp
+
+import (
+ "encoding/binary"
+ "fmt"
+
+ "github.com/pkg/errors"
+)
+
+// paramType represents a SCTP INIT/INITACK parameter
+type paramType uint16
+
+const (
+ heartbeatInfo paramType = 1 // Heartbeat Info [RFC4960]
+ ipV4Addr paramType = 5 // IPv4 IP [RFC4960]
+ ipV6Addr paramType = 6 // IPv6 IP [RFC4960]
+ stateCookie paramType = 7 // State Cookie [RFC4960]
+ unrecognizedParam paramType = 8 // Unrecognized Parameters [RFC4960]
+ cookiePreservative paramType = 9 // Cookie Preservative [RFC4960]
+ hostNameAddr paramType = 11 // Host Name IP [RFC4960]
+ supportedAddrTypes paramType = 12 // Supported IP Types [RFC4960]
+ outSSNResetReq paramType = 13 // Outgoing SSN Reset Request Parameter [RFC6525]
+ incSSNResetReq paramType = 14 // Incoming SSN Reset Request Parameter [RFC6525]
+ ssnTSNResetReq paramType = 15 // SSN/TSN Reset Request Parameter [RFC6525]
+ reconfigResp paramType = 16 // Re-configuration Response Parameter [RFC6525]
+ addOutStreamsReq paramType = 17 // Add Outgoing Streams Request Parameter [RFC6525]
+ addIncStreamsReq paramType = 18 // Add Incoming Streams Request Parameter [RFC6525]
+ random paramType = 32770 // Random (0x8002) [RFC4805]
+ chunkList paramType = 32771 // Chunk List (0x8003) [RFC4895]
+ reqHMACAlgo paramType = 32772 // Requested HMAC Algorithm Parameter (0x8004) [RFC4895]
+ padding paramType = 32773 // Padding (0x8005)
+ supportedExt paramType = 32776 // Supported Extensions (0x8008) [RFC5061]
+ forwardTSNSupp paramType = 49152 // Forward TSN supported (0xC000) [RFC3758]
+ addIPAddr paramType = 49153 // Add IP IP (0xC001) [RFC5061]
+ delIPAddr paramType = 49154 // Delete IP IP (0xC002) [RFC5061]
+ errClauseInd paramType = 49155 // Error Cause Indication (0xC003) [RFC5061]
+ setPriAddr paramType = 49156 // Set Primary IP (0xC004) [RFC5061]
+ successInd paramType = 49157 // Success Indication (0xC005) [RFC5061]
+ adaptLayerInd paramType = 49158 // Adaptation Layer Indication (0xC006) [RFC5061]
+)
+
+func parseParamType(raw []byte) (paramType, error) {
+ if len(raw) < 2 {
+ return paramType(0), errors.New("packet to short")
+ }
+ return paramType(binary.BigEndian.Uint16(raw)), nil
+}
+
+func (p paramType) String() string {
+ switch p {
+ case heartbeatInfo:
+ return "Heartbeat Info"
+ case ipV4Addr:
+ return "IPv4 IP"
+ case ipV6Addr:
+ return "IPv6 IP"
+ case stateCookie:
+ return "State Cookie"
+ case unrecognizedParam:
+ return "Unrecognized Parameters"
+ case cookiePreservative:
+ return "Cookie Preservative"
+ case hostNameAddr:
+ return "Host Name IP"
+ case supportedAddrTypes:
+ return "Supported IP Types"
+ case outSSNResetReq:
+ return "Outgoing SSN Reset Request Parameter"
+ case incSSNResetReq:
+ return "Incoming SSN Reset Request Parameter"
+ case ssnTSNResetReq:
+ return "SSN/TSN Reset Request Parameter"
+ case reconfigResp:
+ return "Re-configuration Response Parameter"
+ case addOutStreamsReq:
+ return "Add Outgoing Streams Request Parameter"
+ case addIncStreamsReq:
+ return "Add Incoming Streams Request Parameter"
+ case random:
+ return "Random"
+ case chunkList:
+ return "Chunk List"
+ case reqHMACAlgo:
+ return "Requested HMAC Algorithm Parameter"
+ case padding:
+ return "Padding"
+ case supportedExt:
+ return "Supported Extensions"
+ case forwardTSNSupp:
+ return "Forward TSN supported"
+ case addIPAddr:
+ return "Add IP IP"
+ case delIPAddr:
+ return "Delete IP IP"
+ case errClauseInd:
+ return "Error Cause Indication"
+ case setPriAddr:
+ return "Set Primary IP"
+ case successInd:
+ return "Success Indication"
+ case adaptLayerInd:
+ return "Adaptation Layer Indication"
+ default:
+ return fmt.Sprintf("Unknown ParamType: %d", p)
+ }
+}
diff --git a/vendor/github.com/pion/sctp/payload_queue.go b/vendor/github.com/pion/sctp/payload_queue.go
new file mode 100644
index 0000000..2d1a35a
--- /dev/null
+++ b/vendor/github.com/pion/sctp/payload_queue.go
@@ -0,0 +1,179 @@
+package sctp
+
+import (
+ "fmt"
+ "sort"
+)
+
+type payloadQueue struct {
+ chunkMap map[uint32]*chunkPayloadData
+ sorted []uint32
+ dupTSN []uint32
+ nBytes int
+}
+
+func newPayloadQueue() *payloadQueue {
+ return &payloadQueue{chunkMap: map[uint32]*chunkPayloadData{}}
+}
+
+func (q *payloadQueue) updateSortedKeys() {
+ if q.sorted != nil {
+ return
+ }
+
+ q.sorted = make([]uint32, len(q.chunkMap))
+ i := 0
+ for k := range q.chunkMap {
+ q.sorted[i] = k
+ i++
+ }
+
+ sort.Slice(q.sorted, func(i, j int) bool {
+ return sna32LT(q.sorted[i], q.sorted[j])
+ })
+}
+
+func (q *payloadQueue) canPush(p *chunkPayloadData, cumulativeTSN uint32) bool {
+ _, ok := q.chunkMap[p.tsn]
+ if ok || sna32LTE(p.tsn, cumulativeTSN) {
+ return false
+ }
+ return true
+}
+
+func (q *payloadQueue) pushNoCheck(p *chunkPayloadData) {
+ q.chunkMap[p.tsn] = p
+ q.nBytes += len(p.userData)
+ q.sorted = nil
+}
+
+// push pushes a payload data. If the payload data is already in our queue or
+// older than our cumulativeTSN marker, it will be recored as duplications,
+// which can later be retrieved using popDuplicates.
+func (q *payloadQueue) push(p *chunkPayloadData, cumulativeTSN uint32) bool {
+ _, ok := q.chunkMap[p.tsn]
+ if ok || sna32LTE(p.tsn, cumulativeTSN) {
+ // Found the packet, log in dups
+ q.dupTSN = append(q.dupTSN, p.tsn)
+ return false
+ }
+
+ q.chunkMap[p.tsn] = p
+ q.nBytes += len(p.userData)
+ q.sorted = nil
+ return true
+}
+
+// pop pops only if the oldest chunk's TSN matches the given TSN.
+func (q *payloadQueue) pop(tsn uint32) (*chunkPayloadData, bool) {
+ q.updateSortedKeys()
+
+ if len(q.chunkMap) > 0 && tsn == q.sorted[0] {
+ q.sorted = q.sorted[1:]
+ if c, ok := q.chunkMap[tsn]; ok {
+ delete(q.chunkMap, tsn)
+ q.nBytes -= len(c.userData)
+ return c, true
+ }
+ }
+
+ return nil, false
+}
+
+// get returns reference to chunkPayloadData with the given TSN value.
+func (q *payloadQueue) get(tsn uint32) (*chunkPayloadData, bool) {
+ c, ok := q.chunkMap[tsn]
+ return c, ok
+}
+
+// popDuplicates returns an array of TSN values that were found duplicate.
+func (q *payloadQueue) popDuplicates() []uint32 {
+ dups := q.dupTSN
+ q.dupTSN = []uint32{}
+ return dups
+}
+
+func (q *payloadQueue) getGapAckBlocks(cumulativeTSN uint32) (gapAckBlocks []gapAckBlock) {
+ var b gapAckBlock
+
+ if len(q.chunkMap) == 0 {
+ return []gapAckBlock{}
+ }
+
+ q.updateSortedKeys()
+
+ for i, tsn := range q.sorted {
+ if i == 0 {
+ b.start = uint16(tsn - cumulativeTSN)
+ b.end = b.start
+ continue
+ }
+ diff := uint16(tsn - cumulativeTSN)
+ if b.end+1 == diff {
+ b.end++
+ } else {
+ gapAckBlocks = append(gapAckBlocks, gapAckBlock{
+ start: b.start,
+ end: b.end,
+ })
+ b.start = diff
+ b.end = diff
+ }
+ }
+
+ gapAckBlocks = append(gapAckBlocks, gapAckBlock{
+ start: b.start,
+ end: b.end,
+ })
+
+ return gapAckBlocks
+}
+
+func (q *payloadQueue) getGapAckBlocksString(cumulativeTSN uint32) string {
+ gapAckBlocks := q.getGapAckBlocks(cumulativeTSN)
+ str := fmt.Sprintf("cumTSN=%d", cumulativeTSN)
+ for _, b := range gapAckBlocks {
+ str += fmt.Sprintf(",%d-%d", b.start, b.end)
+ }
+ return str
+}
+
+func (q *payloadQueue) markAsAcked(tsn uint32) int {
+ var nBytesAcked int
+ if c, ok := q.chunkMap[tsn]; ok {
+ c.acked = true
+ c.retransmit = false
+ nBytesAcked = len(c.userData)
+ q.nBytes -= nBytesAcked
+ c.userData = []byte{}
+ }
+
+ return nBytesAcked
+}
+
+func (q *payloadQueue) getLastTSNReceived() (uint32, bool) {
+ q.updateSortedKeys()
+
+ qlen := len(q.sorted)
+ if qlen == 0 {
+ return 0, false
+ }
+ return q.sorted[qlen-1], true
+}
+
+func (q *payloadQueue) markAllToRetrasmit() {
+ for _, c := range q.chunkMap {
+ if c.acked || c.abandoned() {
+ continue
+ }
+ c.retransmit = true
+ }
+}
+
+func (q *payloadQueue) getNumBytes() int {
+ return q.nBytes
+}
+
+func (q *payloadQueue) size() int {
+ return len(q.chunkMap)
+}
diff --git a/vendor/github.com/pion/sctp/pending_queue.go b/vendor/github.com/pion/sctp/pending_queue.go
new file mode 100644
index 0000000..5f204d3
--- /dev/null
+++ b/vendor/github.com/pion/sctp/pending_queue.go
@@ -0,0 +1,138 @@
+package sctp
+
+import (
+ "github.com/pkg/errors"
+)
+
+// pendingBaseQueue
+
+type pendingBaseQueue struct {
+ queue []*chunkPayloadData
+}
+
+func newPendingBaseQueue() *pendingBaseQueue {
+ return &pendingBaseQueue{queue: []*chunkPayloadData{}}
+}
+
+func (q *pendingBaseQueue) push(c *chunkPayloadData) {
+ q.queue = append(q.queue, c)
+}
+
+func (q *pendingBaseQueue) pop() *chunkPayloadData {
+ if len(q.queue) == 0 {
+ return nil
+ }
+ c := q.queue[0]
+ q.queue = q.queue[1:]
+ return c
+}
+
+func (q *pendingBaseQueue) get(i int) *chunkPayloadData {
+ if len(q.queue) == 0 || i < 0 || i >= len(q.queue) {
+ return nil
+ }
+ return q.queue[i]
+}
+
+func (q *pendingBaseQueue) size() int {
+ return len(q.queue)
+}
+
+// pendingQueue
+
+type pendingQueue struct {
+ unorderedQueue *pendingBaseQueue
+ orderedQueue *pendingBaseQueue
+ nBytes int
+ selected bool
+ unorderedIsSelected bool
+}
+
+var (
+ errUnexpectedChuckPoppedUnordered = errors.New("unexpected chunk popped (unordered)")
+ errUnexpectedChuckPoppedOrdered = errors.New("unexpected chunk popped (ordered)")
+ errUnexpectedQState = errors.New("unexpected q state (should've been selected)")
+)
+
+func newPendingQueue() *pendingQueue {
+ return &pendingQueue{
+ unorderedQueue: newPendingBaseQueue(),
+ orderedQueue: newPendingBaseQueue(),
+ }
+}
+
+func (q *pendingQueue) push(c *chunkPayloadData) {
+ if c.unordered {
+ q.unorderedQueue.push(c)
+ } else {
+ q.orderedQueue.push(c)
+ }
+ q.nBytes += len(c.userData)
+}
+
+func (q *pendingQueue) peek() *chunkPayloadData {
+ if q.selected {
+ if q.unorderedIsSelected {
+ return q.unorderedQueue.get(0)
+ }
+ return q.orderedQueue.get(0)
+ }
+
+ if c := q.unorderedQueue.get(0); c != nil {
+ return c
+ }
+ return q.orderedQueue.get(0)
+}
+
+func (q *pendingQueue) pop(c *chunkPayloadData) error {
+ if q.selected {
+ var popped *chunkPayloadData
+ if q.unorderedIsSelected {
+ popped = q.unorderedQueue.pop()
+ if popped != c {
+ return errUnexpectedChuckPoppedUnordered
+ }
+ } else {
+ popped = q.orderedQueue.pop()
+ if popped != c {
+ return errUnexpectedChuckPoppedOrdered
+ }
+ }
+ if popped.endingFragment {
+ q.selected = false
+ }
+ } else {
+ if !c.beginningFragment {
+ return errUnexpectedQState
+ }
+ if c.unordered {
+ popped := q.unorderedQueue.pop()
+ if popped != c {
+ return errUnexpectedChuckPoppedUnordered
+ }
+ if !popped.endingFragment {
+ q.selected = true
+ q.unorderedIsSelected = true
+ }
+ } else {
+ popped := q.orderedQueue.pop()
+ if popped != c {
+ return errUnexpectedChuckPoppedOrdered
+ }
+ if !popped.endingFragment {
+ q.selected = true
+ q.unorderedIsSelected = false
+ }
+ }
+ }
+ q.nBytes -= len(c.userData)
+ return nil
+}
+
+func (q *pendingQueue) getNumBytes() int {
+ return q.nBytes
+}
+
+func (q *pendingQueue) size() int {
+ return q.unorderedQueue.size() + q.orderedQueue.size()
+}
diff --git a/vendor/github.com/pion/sctp/reassembly_queue.go b/vendor/github.com/pion/sctp/reassembly_queue.go
new file mode 100644
index 0000000..f71d59c
--- /dev/null
+++ b/vendor/github.com/pion/sctp/reassembly_queue.go
@@ -0,0 +1,353 @@
+package sctp
+
+import (
+ "io"
+ "sort"
+ "sync/atomic"
+
+ "github.com/pkg/errors"
+)
+
+func sortChunksByTSN(a []*chunkPayloadData) {
+ sort.Slice(a, func(i, j int) bool {
+ return sna32LT(a[i].tsn, a[j].tsn)
+ })
+}
+
+func sortChunksBySSN(a []*chunkSet) {
+ sort.Slice(a, func(i, j int) bool {
+ return sna16LT(a[i].ssn, a[j].ssn)
+ })
+}
+
+// chunkSet is a set of chunks that share the same SSN
+type chunkSet struct {
+ ssn uint16 // used only with the ordered chunks
+ ppi PayloadProtocolIdentifier
+ chunks []*chunkPayloadData
+}
+
+func newChunkSet(ssn uint16, ppi PayloadProtocolIdentifier) *chunkSet {
+ return &chunkSet{
+ ssn: ssn,
+ ppi: ppi,
+ chunks: []*chunkPayloadData{},
+ }
+}
+
+func (set *chunkSet) push(chunk *chunkPayloadData) bool {
+ // check if dup
+ for _, c := range set.chunks {
+ if c.tsn == chunk.tsn {
+ return false
+ }
+ }
+
+ // append and sort
+ set.chunks = append(set.chunks, chunk)
+ sortChunksByTSN(set.chunks)
+
+ // Check if we now have a complete set
+ complete := set.isComplete()
+ return complete
+}
+
+func (set *chunkSet) isComplete() bool {
+ // Condition for complete set
+ // 0. Has at least one chunk.
+ // 1. Begins with beginningFragment set to true
+ // 2. Ends with endingFragment set to true
+ // 3. TSN monotinically increase by 1 from beginning to end
+
+ // 0.
+ nChunks := len(set.chunks)
+ if nChunks == 0 {
+ return false
+ }
+
+ // 1.
+ if !set.chunks[0].beginningFragment {
+ return false
+ }
+
+ // 2.
+ if !set.chunks[nChunks-1].endingFragment {
+ return false
+ }
+
+ // 3.
+ var lastTSN uint32
+ for i, c := range set.chunks {
+ if i > 0 {
+ // Fragments must have contiguous TSN
+ // From RFC 4960 Section 3.3.1:
+ // When a user message is fragmented into multiple chunks, the TSNs are
+ // used by the receiver to reassemble the message. This means that the
+ // TSNs for each fragment of a fragmented user message MUST be strictly
+ // sequential.
+ if c.tsn != lastTSN+1 {
+ // mid or end fragment is missing
+ return false
+ }
+ }
+
+ lastTSN = c.tsn
+ }
+
+ return true
+}
+
+type reassemblyQueue struct {
+ si uint16
+ nextSSN uint16 // expected SSN for next ordered chunk
+ ordered []*chunkSet
+ unordered []*chunkSet
+ unorderedChunks []*chunkPayloadData
+ nBytes uint64
+}
+
+var errTryAgain = errors.New("try again")
+
+func newReassemblyQueue(si uint16) *reassemblyQueue {
+ // From RFC 4960 Sec 6.5:
+ // The Stream Sequence Number in all the streams MUST start from 0 when
+ // the association is established. Also, when the Stream Sequence
+ // Number reaches the value 65535 the next Stream Sequence Number MUST
+ // be set to 0.
+ return &reassemblyQueue{
+ si: si,
+ nextSSN: 0, // From RFC 4960 Sec 6.5:
+ ordered: make([]*chunkSet, 0),
+ unordered: make([]*chunkSet, 0),
+ }
+}
+
+func (r *reassemblyQueue) push(chunk *chunkPayloadData) bool {
+ var cset *chunkSet
+
+ if chunk.streamIdentifier != r.si {
+ return false
+ }
+
+ if chunk.unordered {
+ // First, insert into unorderedChunks array
+ r.unorderedChunks = append(r.unorderedChunks, chunk)
+ atomic.AddUint64(&r.nBytes, uint64(len(chunk.userData)))
+ sortChunksByTSN(r.unorderedChunks)
+
+ // Scan unorderedChunks that are contiguous (in TSN)
+ cset = r.findCompleteUnorderedChunkSet()
+
+ // If found, append the complete set to the unordered array
+ if cset != nil {
+ r.unordered = append(r.unordered, cset)
+ return true
+ }
+
+ return false
+ }
+
+ // This is an ordered chunk
+
+ if sna16LT(chunk.streamSequenceNumber, r.nextSSN) {
+ return false
+ }
+
+ // Check if a chunkSet with the SSN already exists
+ for _, set := range r.ordered {
+ if set.ssn == chunk.streamSequenceNumber {
+ cset = set
+ break
+ }
+ }
+
+ // If not found, create a new chunkSet
+ if cset == nil {
+ cset = newChunkSet(chunk.streamSequenceNumber, chunk.payloadType)
+ r.ordered = append(r.ordered, cset)
+ if !chunk.unordered {
+ sortChunksBySSN(r.ordered)
+ }
+ }
+
+ atomic.AddUint64(&r.nBytes, uint64(len(chunk.userData)))
+
+ return cset.push(chunk)
+}
+
+func (r *reassemblyQueue) findCompleteUnorderedChunkSet() *chunkSet {
+ startIdx := -1
+ nChunks := 0
+ var lastTSN uint32
+ var found bool
+
+ for i, c := range r.unorderedChunks {
+ // seek beigining
+ if c.beginningFragment {
+ startIdx = i
+ nChunks = 1
+ lastTSN = c.tsn
+
+ if c.endingFragment {
+ found = true
+ break
+ }
+ continue
+ }
+
+ if startIdx < 0 {
+ continue
+ }
+
+ // Check if contiguous in TSN
+ if c.tsn != lastTSN+1 {
+ startIdx = -1
+ continue
+ }
+
+ lastTSN = c.tsn
+ nChunks++
+
+ if c.endingFragment {
+ found = true
+ break
+ }
+ }
+
+ if !found {
+ return nil
+ }
+
+ // Extract the range of chunks
+ var chunks []*chunkPayloadData
+ chunks = append(chunks, r.unorderedChunks[startIdx:startIdx+nChunks]...)
+
+ r.unorderedChunks = append(
+ r.unorderedChunks[:startIdx],
+ r.unorderedChunks[startIdx+nChunks:]...)
+
+ chunkSet := newChunkSet(0, chunks[0].payloadType)
+ chunkSet.chunks = chunks
+
+ return chunkSet
+}
+
+func (r *reassemblyQueue) isReadable() bool {
+ // Check unordered first
+ if len(r.unordered) > 0 {
+ // The chunk sets in r.unordered should all be complete.
+ return true
+ }
+
+ // Check ordered sets
+ if len(r.ordered) > 0 {
+ cset := r.ordered[0]
+ if cset.isComplete() {
+ if sna16LTE(cset.ssn, r.nextSSN) {
+ return true
+ }
+ }
+ }
+ return false
+}
+
+func (r *reassemblyQueue) read(buf []byte) (int, PayloadProtocolIdentifier, error) {
+ var cset *chunkSet
+ // Check unordered first
+ switch {
+ case len(r.unordered) > 0:
+ cset = r.unordered[0]
+ r.unordered = r.unordered[1:]
+ case len(r.ordered) > 0:
+ // Now, check ordered
+ cset = r.ordered[0]
+ if !cset.isComplete() {
+ return 0, 0, errTryAgain
+ }
+ if sna16GT(cset.ssn, r.nextSSN) {
+ return 0, 0, errTryAgain
+ }
+ r.ordered = r.ordered[1:]
+ if cset.ssn == r.nextSSN {
+ r.nextSSN++
+ }
+ default:
+ return 0, 0, errTryAgain
+ }
+
+ // Concat all fragments into the buffer
+ nWritten := 0
+ ppi := cset.ppi
+ var err error
+ for _, c := range cset.chunks {
+ toCopy := len(c.userData)
+ r.subtractNumBytes(toCopy)
+ if err == nil {
+ n := copy(buf[nWritten:], c.userData)
+ nWritten += n
+ if n < toCopy {
+ err = io.ErrShortBuffer
+ }
+ }
+ }
+
+ return nWritten, ppi, err
+}
+
+func (r *reassemblyQueue) forwardTSNForOrdered(lastSSN uint16) {
+ // Use lastSSN to locate a chunkSet then remove it if the set has
+ // not been complete
+ keep := []*chunkSet{}
+ for _, set := range r.ordered {
+ if sna16LTE(set.ssn, lastSSN) {
+ if !set.isComplete() {
+ // drop the set
+ for _, c := range set.chunks {
+ r.subtractNumBytes(len(c.userData))
+ }
+ continue
+ }
+ }
+ keep = append(keep, set)
+ }
+ r.ordered = keep
+
+ // Finally, forward nextSSN
+ if sna16LTE(r.nextSSN, lastSSN) {
+ r.nextSSN = lastSSN + 1
+ }
+}
+
+func (r *reassemblyQueue) forwardTSNForUnordered(newCumulativeTSN uint32) {
+ // Remove all fragments in the unordered sets that contains chunks
+ // equal to or older than `newCumulativeTSN`.
+ // We know all sets in the r.unordered are complete ones.
+ // Just remove chunks that are equal to or older than newCumulativeTSN
+ // from the unorderedChunks
+ lastIdx := -1
+ for i, c := range r.unorderedChunks {
+ if sna32GT(c.tsn, newCumulativeTSN) {
+ break
+ }
+ lastIdx = i
+ }
+ if lastIdx >= 0 {
+ for _, c := range r.unorderedChunks[0 : lastIdx+1] {
+ r.subtractNumBytes(len(c.userData))
+ }
+ r.unorderedChunks = r.unorderedChunks[lastIdx+1:]
+ }
+}
+
+func (r *reassemblyQueue) subtractNumBytes(nBytes int) {
+ cur := atomic.LoadUint64(&r.nBytes)
+ if int(cur) >= nBytes {
+ atomic.AddUint64(&r.nBytes, -uint64(nBytes))
+ } else {
+ atomic.StoreUint64(&r.nBytes, 0)
+ }
+}
+
+func (r *reassemblyQueue) getNumBytes() int {
+ return int(atomic.LoadUint64(&r.nBytes))
+}
diff --git a/vendor/github.com/pion/sctp/renovate.json b/vendor/github.com/pion/sctp/renovate.json
new file mode 100644
index 0000000..4400fd9
--- /dev/null
+++ b/vendor/github.com/pion/sctp/renovate.json
@@ -0,0 +1,15 @@
+{
+ "extends": [
+ "config:base"
+ ],
+ "postUpdateOptions": [
+ "gomodTidy"
+ ],
+ "commitBody": "Generated by renovateBot",
+ "packageRules": [
+ {
+ "packagePatterns": ["^golang.org/x/"],
+ "schedule": ["on the first day of the month"]
+ }
+ ]
+}
diff --git a/vendor/github.com/pion/sctp/rtx_timer.go b/vendor/github.com/pion/sctp/rtx_timer.go
new file mode 100644
index 0000000..14bc39d
--- /dev/null
+++ b/vendor/github.com/pion/sctp/rtx_timer.go
@@ -0,0 +1,219 @@
+package sctp
+
+import (
+ "math"
+ "sync"
+ "time"
+)
+
+const (
+ rtoInitial float64 = 3.0 * 1000 // msec
+ rtoMin float64 = 1.0 * 1000 // msec
+ rtoMax float64 = 60.0 * 1000 // msec
+ rtoAlpha float64 = 0.125
+ rtoBeta float64 = 0.25
+ maxInitRetrans uint = 8
+ pathMaxRetrans uint = 5
+ noMaxRetrans uint = 0
+)
+
+// rtoManager manages Rtx timeout values.
+// This is an implementation of RFC 4960 sec 6.3.1.
+type rtoManager struct {
+ srtt float64
+ rttvar float64
+ rto float64
+ noUpdate bool
+ mutex sync.RWMutex
+}
+
+// newRTOManager creates a new rtoManager.
+func newRTOManager() *rtoManager {
+ return &rtoManager{
+ rto: rtoInitial,
+ }
+}
+
+// setNewRTT takes a newly measured RTT then adjust the RTO in msec.
+func (m *rtoManager) setNewRTT(rtt float64) float64 {
+ m.mutex.Lock()
+ defer m.mutex.Unlock()
+
+ if m.noUpdate {
+ return m.srtt
+ }
+
+ if m.srtt == 0 {
+ // First measurement
+ m.srtt = rtt
+ m.rttvar = rtt / 2
+ } else {
+ // Subsequent rtt measurement
+ m.rttvar = (1-rtoBeta)*m.rttvar + rtoBeta*(math.Abs(m.srtt-rtt))
+ m.srtt = (1-rtoAlpha)*m.srtt + rtoAlpha*rtt
+ }
+ m.rto = math.Min(math.Max(m.srtt+4*m.rttvar, rtoMin), rtoMax)
+ return m.srtt
+}
+
+// getRTO simply returns the current RTO in msec.
+func (m *rtoManager) getRTO() float64 {
+ m.mutex.RLock()
+ defer m.mutex.RUnlock()
+
+ return m.rto
+}
+
+// reset resets the RTO variables to the initial values.
+func (m *rtoManager) reset() {
+ m.mutex.Lock()
+ defer m.mutex.Unlock()
+
+ if m.noUpdate {
+ return
+ }
+
+ m.srtt = 0
+ m.rttvar = 0
+ m.rto = rtoInitial
+}
+
+// set RTO value for testing
+func (m *rtoManager) setRTO(rto float64, noUpdate bool) {
+ m.mutex.Lock()
+ defer m.mutex.Unlock()
+
+ m.rto = rto
+ m.noUpdate = noUpdate
+}
+
+// rtxTimerObserver is the inteface to a timer observer.
+// NOTE: Observers MUST NOT call start() or stop() method on rtxTimer
+// from within these callbacks.
+type rtxTimerObserver interface {
+ onRetransmissionTimeout(timerID int, n uint)
+ onRetransmissionFailure(timerID int)
+}
+
+// rtxTimer provides the retnransmission timer conforms with RFC 4960 Sec 6.3.1
+type rtxTimer struct {
+ id int
+ observer rtxTimerObserver
+ maxRetrans uint
+ stopFunc stopTimerLoop
+ closed bool
+ mutex sync.RWMutex
+}
+
+type stopTimerLoop func()
+
+// newRTXTimer creates a new retransmission timer.
+// if maxRetrans is set to 0, it will keep retransmitting until stop() is called.
+// (it will never make onRetransmissionFailure() callback.
+func newRTXTimer(id int, observer rtxTimerObserver, maxRetrans uint) *rtxTimer {
+ return &rtxTimer{
+ id: id,
+ observer: observer,
+ maxRetrans: maxRetrans,
+ }
+}
+
+// start starts the timer.
+func (t *rtxTimer) start(rto float64) bool {
+ t.mutex.Lock()
+ defer t.mutex.Unlock()
+
+ // this timer is already closed
+ if t.closed {
+ return false
+ }
+
+ // this is a noop if the timer is always running
+ if t.stopFunc != nil {
+ return false
+ }
+
+ // Note: rto value is intentionally not capped by RTO.Min to allow
+ // fast timeout for the tests. Non-test code should pass in the
+ // rto generated by rtoManager getRTO() method which caps the
+ // value at RTO.Min or at RTO.Max.
+ var nRtos uint
+
+ cancelCh := make(chan struct{})
+
+ go func() {
+ canceling := false
+
+ for !canceling {
+ timeout := calculateNextTimeout(rto, nRtos)
+ timer := time.NewTimer(time.Duration(timeout) * time.Millisecond)
+
+ select {
+ case <-timer.C:
+ nRtos++
+ if t.maxRetrans == 0 || nRtos <= t.maxRetrans {
+ t.observer.onRetransmissionTimeout(t.id, nRtos)
+ } else {
+ t.stop()
+ t.observer.onRetransmissionFailure(t.id)
+ }
+ case <-cancelCh:
+ canceling = true
+ timer.Stop()
+ }
+ }
+ }()
+
+ t.stopFunc = func() {
+ close(cancelCh)
+ }
+
+ return true
+}
+
+// stop stops the timer.
+func (t *rtxTimer) stop() {
+ t.mutex.Lock()
+ defer t.mutex.Unlock()
+
+ if t.stopFunc != nil {
+ t.stopFunc()
+ t.stopFunc = nil
+ }
+}
+
+// closes the timer. this is similar to stop() but subsequent start() call
+// will fail (the timer is no longer usable)
+func (t *rtxTimer) close() {
+ t.mutex.Lock()
+ defer t.mutex.Unlock()
+
+ if t.stopFunc != nil {
+ t.stopFunc()
+ t.stopFunc = nil
+ }
+
+ t.closed = true
+}
+
+// isRunning tests if the timer is running.
+// Debug purpose only
+func (t *rtxTimer) isRunning() bool {
+ t.mutex.RLock()
+ defer t.mutex.RUnlock()
+
+ return (t.stopFunc != nil)
+}
+
+func calculateNextTimeout(rto float64, nRtos uint) float64 {
+ // RFC 4096 sec 6.3.3. Handle T3-rtx Expiration
+ // E2) For the destination address for which the timer expires, set RTO
+ // <- RTO * 2 ("back off the timer"). The maximum value discussed
+ // in rule C7 above (RTO.max) may be used to provide an upper bound
+ // to this doubling operation.
+ if nRtos < 31 {
+ m := 1 << nRtos
+ return math.Min(rto*float64(m), rtoMax)
+ }
+ return rtoMax
+}
diff --git a/vendor/github.com/pion/sctp/sctp.go b/vendor/github.com/pion/sctp/sctp.go
new file mode 100644
index 0000000..e601342
--- /dev/null
+++ b/vendor/github.com/pion/sctp/sctp.go
@@ -0,0 +1,2 @@
+// Package sctp implements the SCTP spec
+package sctp
diff --git a/vendor/github.com/pion/sctp/stream.go b/vendor/github.com/pion/sctp/stream.go
new file mode 100644
index 0000000..8a17b97
--- /dev/null
+++ b/vendor/github.com/pion/sctp/stream.go
@@ -0,0 +1,357 @@
+package sctp
+
+import (
+ "io"
+ "math"
+ "sync"
+
+ "github.com/pion/logging"
+ "github.com/pkg/errors"
+)
+
+const (
+ // ReliabilityTypeReliable is used for reliable transmission
+ ReliabilityTypeReliable byte = 0
+ // ReliabilityTypeRexmit is used for partial reliability by retransmission count
+ ReliabilityTypeRexmit byte = 1
+ // ReliabilityTypeTimed is used for partial reliability by retransmission duration
+ ReliabilityTypeTimed byte = 2
+)
+
+// Stream represents an SCTP stream
+type Stream struct {
+ association *Association
+ lock sync.RWMutex
+ streamIdentifier uint16
+ defaultPayloadType PayloadProtocolIdentifier
+ reassemblyQueue *reassemblyQueue
+ sequenceNumber uint16
+ readNotifier *sync.Cond
+ readErr error
+ writeErr error
+ unordered bool
+ reliabilityType byte
+ reliabilityValue uint32
+ bufferedAmount uint64
+ bufferedAmountLow uint64
+ onBufferedAmountLow func()
+ log logging.LeveledLogger
+ name string
+}
+
+// StreamIdentifier returns the Stream identifier associated to the stream.
+func (s *Stream) StreamIdentifier() uint16 {
+ s.lock.RLock()
+ defer s.lock.RUnlock()
+ return s.streamIdentifier
+}
+
+// SetDefaultPayloadType sets the default payload type used by Write.
+func (s *Stream) SetDefaultPayloadType(defaultPayloadType PayloadProtocolIdentifier) {
+ s.lock.Lock()
+ defer s.lock.Unlock()
+
+ s.setDefaultPayloadType(defaultPayloadType)
+}
+
+// setDefaultPayloadType sets the defaultPayloadType. The caller should hold the lock.
+func (s *Stream) setDefaultPayloadType(defaultPayloadType PayloadProtocolIdentifier) {
+ s.defaultPayloadType = defaultPayloadType
+}
+
+// SetReliabilityParams sets reliability parameters for this stream.
+func (s *Stream) SetReliabilityParams(unordered bool, relType byte, relVal uint32) {
+ s.lock.Lock()
+ defer s.lock.Unlock()
+
+ s.setReliabilityParams(unordered, relType, relVal)
+}
+
+// setReliabilityParams sets reliability parameters for this stream.
+// The caller should hold the lock.
+func (s *Stream) setReliabilityParams(unordered bool, relType byte, relVal uint32) {
+ s.log.Debugf("[%s] reliability params: ordered=%v type=%d value=%d",
+ s.name, !unordered, relType, relVal)
+ s.unordered = unordered
+ s.reliabilityType = relType
+ s.reliabilityValue = relVal
+}
+
+// Read reads a packet of len(p) bytes, dropping the Payload Protocol Identifier.
+// Returns EOF when the stream is reset or an error if the stream is closed
+// otherwise.
+func (s *Stream) Read(p []byte) (int, error) {
+ n, _, err := s.ReadSCTP(p)
+ return n, err
+}
+
+// ReadSCTP reads a packet of len(p) bytes and returns the associated Payload
+// Protocol Identifier.
+// Returns EOF when the stream is reset or an error if the stream is closed
+// otherwise.
+func (s *Stream) ReadSCTP(p []byte) (int, PayloadProtocolIdentifier, error) {
+ s.lock.Lock()
+ defer s.lock.Unlock()
+
+ for {
+ n, ppi, err := s.reassemblyQueue.read(p)
+ if err == nil {
+ return n, ppi, nil
+ } else if errors.Is(err, io.ErrShortBuffer) {
+ return 0, PayloadProtocolIdentifier(0), err
+ }
+
+ err = s.readErr
+ if err != nil {
+ return 0, PayloadProtocolIdentifier(0), err
+ }
+
+ s.readNotifier.Wait()
+ }
+}
+
+func (s *Stream) handleData(pd *chunkPayloadData) {
+ s.lock.Lock()
+ defer s.lock.Unlock()
+
+ var readable bool
+ if s.reassemblyQueue.push(pd) {
+ readable = s.reassemblyQueue.isReadable()
+ s.log.Debugf("[%s] reassemblyQueue readable=%v", s.name, readable)
+ if readable {
+ s.log.Debugf("[%s] readNotifier.signal()", s.name)
+ s.readNotifier.Signal()
+ s.log.Debugf("[%s] readNotifier.signal() done", s.name)
+ }
+ }
+}
+
+func (s *Stream) handleForwardTSNForOrdered(ssn uint16) {
+ var readable bool
+
+ func() {
+ s.lock.Lock()
+ defer s.lock.Unlock()
+
+ if s.unordered {
+ return // unordered chunks are handled by handleForwardUnordered method
+ }
+
+ // Remove all chunks older than or equal to the new TSN from
+ // the reassemblyQueue.
+ s.reassemblyQueue.forwardTSNForOrdered(ssn)
+ readable = s.reassemblyQueue.isReadable()
+ }()
+
+ // Notify the reader asynchronously if there's a data chunk to read.
+ if readable {
+ s.readNotifier.Signal()
+ }
+}
+
+func (s *Stream) handleForwardTSNForUnordered(newCumulativeTSN uint32) {
+ var readable bool
+
+ func() {
+ s.lock.Lock()
+ defer s.lock.Unlock()
+
+ if !s.unordered {
+ return // ordered chunks are handled by handleForwardTSNOrdered method
+ }
+
+ // Remove all chunks older than or equal to the new TSN from
+ // the reassemblyQueue.
+ s.reassemblyQueue.forwardTSNForUnordered(newCumulativeTSN)
+ readable = s.reassemblyQueue.isReadable()
+ }()
+
+ // Notify the reader asynchronously if there's a data chunk to read.
+ if readable {
+ s.readNotifier.Signal()
+ }
+}
+
+// Write writes len(p) bytes from p with the default Payload Protocol Identifier
+func (s *Stream) Write(p []byte) (n int, err error) {
+ return s.WriteSCTP(p, s.defaultPayloadType)
+}
+
+// WriteSCTP writes len(p) bytes from p to the DTLS connection
+func (s *Stream) WriteSCTP(p []byte, ppi PayloadProtocolIdentifier) (n int, err error) {
+ maxMessageSize := s.association.MaxMessageSize()
+ if len(p) > int(maxMessageSize) {
+ return 0, errors.Errorf("Outbound packet larger than maximum message size %v", math.MaxUint16)
+ }
+
+ s.lock.RLock()
+ err = s.writeErr
+ s.lock.RUnlock()
+ if err != nil {
+ return 0, err
+ }
+
+ chunks := s.packetize(p, ppi)
+
+ return len(p), s.association.sendPayloadData(chunks)
+}
+
+func (s *Stream) packetize(raw []byte, ppi PayloadProtocolIdentifier) []*chunkPayloadData {
+ s.lock.Lock()
+ defer s.lock.Unlock()
+
+ i := uint32(0)
+ remaining := uint32(len(raw))
+
+ // From draft-ietf-rtcweb-data-protocol-09, section 6:
+ // All Data Channel Establishment Protocol messages MUST be sent using
+ // ordered delivery and reliable transmission.
+ unordered := ppi != PayloadTypeWebRTCDCEP && s.unordered
+
+ var chunks []*chunkPayloadData
+ var head *chunkPayloadData
+ for remaining != 0 {
+ fragmentSize := min32(s.association.maxPayloadSize, remaining)
+
+ // Copy the userdata since we'll have to store it until acked
+ // and the caller may re-use the buffer in the mean time
+ userData := make([]byte, fragmentSize)
+ copy(userData, raw[i:i+fragmentSize])
+
+ chunk := &chunkPayloadData{
+ streamIdentifier: s.streamIdentifier,
+ userData: userData,
+ unordered: unordered,
+ beginningFragment: i == 0,
+ endingFragment: remaining-fragmentSize == 0,
+ immediateSack: false,
+ payloadType: ppi,
+ streamSequenceNumber: s.sequenceNumber,
+ head: head,
+ }
+
+ if head == nil {
+ head = chunk
+ }
+
+ chunks = append(chunks, chunk)
+
+ remaining -= fragmentSize
+ i += fragmentSize
+ }
+
+ // RFC 4960 Sec 6.6
+ // Note: When transmitting ordered and unordered data, an endpoint does
+ // not increment its Stream Sequence Number when transmitting a DATA
+ // chunk with U flag set to 1.
+ if !unordered {
+ s.sequenceNumber++
+ }
+
+ s.bufferedAmount += uint64(len(raw))
+ s.log.Tracef("[%s] bufferedAmount = %d", s.name, s.bufferedAmount)
+
+ return chunks
+}
+
+// Close closes the write-direction of the stream.
+// Future calls to Write are not permitted after calling Close.
+func (s *Stream) Close() error {
+ if sid, isOpen := func() (uint16, bool) {
+ s.lock.Lock()
+ defer s.lock.Unlock()
+
+ isOpen := true
+ if s.writeErr == nil {
+ s.writeErr = errors.New("Stream closed")
+ } else {
+ isOpen = false
+ }
+
+ if s.readErr == nil {
+ s.readErr = io.EOF
+ } else {
+ isOpen = false
+ }
+ s.readNotifier.Broadcast() // broadcast regardless
+
+ return s.streamIdentifier, isOpen
+ }(); isOpen {
+ // Reset the outgoing stream
+ // https://tools.ietf.org/html/rfc6525
+ return s.association.sendResetRequest(sid)
+ }
+
+ return nil
+}
+
+// BufferedAmount returns the number of bytes of data currently queued to be sent over this stream.
+func (s *Stream) BufferedAmount() uint64 {
+ s.lock.RLock()
+ defer s.lock.RUnlock()
+
+ return s.bufferedAmount
+}
+
+// BufferedAmountLowThreshold returns the number of bytes of buffered outgoing data that is
+// considered "low." Defaults to 0.
+func (s *Stream) BufferedAmountLowThreshold() uint64 {
+ s.lock.RLock()
+ defer s.lock.RUnlock()
+
+ return s.bufferedAmountLow
+}
+
+// SetBufferedAmountLowThreshold is used to update the threshold.
+// See BufferedAmountLowThreshold().
+func (s *Stream) SetBufferedAmountLowThreshold(th uint64) {
+ s.lock.Lock()
+ defer s.lock.Unlock()
+
+ s.bufferedAmountLow = th
+}
+
+// OnBufferedAmountLow sets the callback handler which would be called when the number of
+// bytes of outgoing data buffered is lower than the threshold.
+func (s *Stream) OnBufferedAmountLow(f func()) {
+ s.lock.Lock()
+ defer s.lock.Unlock()
+
+ s.onBufferedAmountLow = f
+}
+
+// This method is called by association's readLoop (go-)routine to notify this stream
+// of the specified amount of outgoing data has been delivered to the peer.
+func (s *Stream) onBufferReleased(nBytesReleased int) {
+ if nBytesReleased <= 0 {
+ return
+ }
+
+ s.lock.Lock()
+
+ fromAmount := s.bufferedAmount
+
+ if s.bufferedAmount < uint64(nBytesReleased) {
+ s.bufferedAmount = 0
+ s.log.Errorf("[%s] released buffer size %d should be <= %d",
+ s.name, nBytesReleased, s.bufferedAmount)
+ } else {
+ s.bufferedAmount -= uint64(nBytesReleased)
+ }
+
+ s.log.Tracef("[%s] bufferedAmount = %d", s.name, s.bufferedAmount)
+
+ if s.onBufferedAmountLow != nil && fromAmount > s.bufferedAmountLow && s.bufferedAmount <= s.bufferedAmountLow {
+ f := s.onBufferedAmountLow
+ s.lock.Unlock()
+ f()
+ return
+ }
+
+ s.lock.Unlock()
+}
+
+func (s *Stream) getNumBytesInReassemblyQueue() int {
+ // No lock is required as it reads the size with atomic load function.
+ return s.reassemblyQueue.getNumBytes()
+}
diff --git a/vendor/github.com/pion/sctp/util.go b/vendor/github.com/pion/sctp/util.go
new file mode 100644
index 0000000..e2e54ab
--- /dev/null
+++ b/vendor/github.com/pion/sctp/util.go
@@ -0,0 +1,58 @@
+package sctp
+
+const (
+ paddingMultiple = 4
+)
+
+func getPadding(len int) int {
+ return (paddingMultiple - (len % paddingMultiple)) % paddingMultiple
+}
+
+func padByte(in []byte, cnt int) []byte {
+ if cnt < 0 {
+ cnt = 0
+ }
+ padding := make([]byte, cnt)
+ return append(in, padding...)
+}
+
+// Serial Number Arithmetic (RFC 1982)
+func sna32LT(i1, i2 uint32) bool {
+ return (i1 < i2 && i2-i1 < 1<<31) || (i1 > i2 && i1-i2 > 1<<31)
+}
+
+func sna32LTE(i1, i2 uint32) bool {
+ return i1 == i2 || sna32LT(i1, i2)
+}
+
+func sna32GT(i1, i2 uint32) bool {
+ return (i1 < i2 && (i2-i1) >= 1<<31) || (i1 > i2 && (i1-i2) <= 1<<31)
+}
+
+func sna32GTE(i1, i2 uint32) bool {
+ return i1 == i2 || sna32GT(i1, i2)
+}
+
+func sna32EQ(i1, i2 uint32) bool {
+ return i1 == i2
+}
+
+func sna16LT(i1, i2 uint16) bool {
+ return (i1 < i2 && (i2-i1) < 1<<15) || (i1 > i2 && (i1-i2) > 1<<15)
+}
+
+func sna16LTE(i1, i2 uint16) bool {
+ return i1 == i2 || sna16LT(i1, i2)
+}
+
+func sna16GT(i1, i2 uint16) bool {
+ return (i1 < i2 && (i2-i1) >= 1<<15) || (i1 > i2 && (i1-i2) <= 1<<15)
+}
+
+func sna16GTE(i1, i2 uint16) bool {
+ return i1 == i2 || sna16GT(i1, i2)
+}
+
+func sna16EQ(i1, i2 uint16) bool {
+ return i1 == i2
+}