summaryrefslogtreecommitdiff
path: root/vendor/github.com/xtaci/smux
diff options
context:
space:
mode:
authorkali kaneko (leap communications) <kali@leap.se>2021-11-29 01:46:27 +0100
committerkali kaneko (leap communications) <kali@leap.se>2021-11-29 18:14:16 +0100
commit18f52af5be3a9a0c73811706108f790d65ee9c67 (patch)
treee13cbacb47d56919caa9c44a2b45dec1497a7860 /vendor/github.com/xtaci/smux
parentebcef0d57b6ecb5a40c6579f6be07182dd3033ba (diff)
[pkg] update vendor
Diffstat (limited to 'vendor/github.com/xtaci/smux')
-rw-r--r--vendor/github.com/xtaci/smux/.gitignore24
-rw-r--r--vendor/github.com/xtaci/smux/.travis.yml17
-rw-r--r--vendor/github.com/xtaci/smux/LICENSE21
-rw-r--r--vendor/github.com/xtaci/smux/README.md136
-rw-r--r--vendor/github.com/xtaci/smux/alloc.go72
-rw-r--r--vendor/github.com/xtaci/smux/curve.jpgbin0 -> 106626 bytes
-rw-r--r--vendor/github.com/xtaci/smux/frame.go81
-rw-r--r--vendor/github.com/xtaci/smux/go.mod3
-rw-r--r--vendor/github.com/xtaci/smux/go.sum0
-rw-r--r--vendor/github.com/xtaci/smux/mux.go110
-rw-r--r--vendor/github.com/xtaci/smux/mux.jpgbin0 -> 6199 bytes
-rw-r--r--vendor/github.com/xtaci/smux/session.go525
-rw-r--r--vendor/github.com/xtaci/smux/shaper.go16
-rw-r--r--vendor/github.com/xtaci/smux/smux.pngbin0 -> 9891 bytes
-rw-r--r--vendor/github.com/xtaci/smux/stream.go549
15 files changed, 1554 insertions, 0 deletions
diff --git a/vendor/github.com/xtaci/smux/.gitignore b/vendor/github.com/xtaci/smux/.gitignore
new file mode 100644
index 0000000..daf913b
--- /dev/null
+++ b/vendor/github.com/xtaci/smux/.gitignore
@@ -0,0 +1,24 @@
+# Compiled Object files, Static and Dynamic libs (Shared Objects)
+*.o
+*.a
+*.so
+
+# Folders
+_obj
+_test
+
+# Architecture specific extensions/prefixes
+*.[568vq]
+[568vq].out
+
+*.cgo1.go
+*.cgo2.c
+_cgo_defun.c
+_cgo_gotypes.go
+_cgo_export.*
+
+_testmain.go
+
+*.exe
+*.test
+*.prof
diff --git a/vendor/github.com/xtaci/smux/.travis.yml b/vendor/github.com/xtaci/smux/.travis.yml
new file mode 100644
index 0000000..e1d30fa
--- /dev/null
+++ b/vendor/github.com/xtaci/smux/.travis.yml
@@ -0,0 +1,17 @@
+language: go
+go:
+ - 1.9.x
+ - 1.10.x
+ - 1.11.x
+
+before_install:
+ - go get -t -v ./...
+
+install:
+ - go get github.com/xtaci/smux
+
+script:
+ - go test -coverprofile=coverage.txt -covermode=atomic -bench .
+
+after_success:
+ - bash <(curl -s https://codecov.io/bash)
diff --git a/vendor/github.com/xtaci/smux/LICENSE b/vendor/github.com/xtaci/smux/LICENSE
new file mode 100644
index 0000000..eed41ac
--- /dev/null
+++ b/vendor/github.com/xtaci/smux/LICENSE
@@ -0,0 +1,21 @@
+MIT License
+
+Copyright (c) 2016-2017 Daniel Fu
+
+Permission is hereby granted, free of charge, to any person obtaining a copy
+of this software and associated documentation files (the "Software"), to deal
+in the Software without restriction, including without limitation the rights
+to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+copies of the Software, and to permit persons to whom the Software is
+furnished to do so, subject to the following conditions:
+
+The above copyright notice and this permission notice shall be included in all
+copies or substantial portions of the Software.
+
+THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+SOFTWARE.
diff --git a/vendor/github.com/xtaci/smux/README.md b/vendor/github.com/xtaci/smux/README.md
new file mode 100644
index 0000000..a5c4680
--- /dev/null
+++ b/vendor/github.com/xtaci/smux/README.md
@@ -0,0 +1,136 @@
+<img src="smux.png" alt="smux" height="35px" />
+
+[![GoDoc][1]][2] [![MIT licensed][3]][4] [![Build Status][5]][6] [![Go Report Card][7]][8] [![Coverage Statusd][9]][10] [![Sourcegraph][11]][12]
+
+<img src="mux.jpg" alt="smux" height="120px" />
+
+[1]: https://godoc.org/github.com/xtaci/smux?status.svg
+[2]: https://godoc.org/github.com/xtaci/smux
+[3]: https://img.shields.io/badge/license-MIT-blue.svg
+[4]: LICENSE
+[5]: https://travis-ci.org/xtaci/smux.svg?branch=master
+[6]: https://travis-ci.org/xtaci/smux
+[7]: https://goreportcard.com/badge/github.com/xtaci/smux
+[8]: https://goreportcard.com/report/github.com/xtaci/smux
+[9]: https://codecov.io/gh/xtaci/smux/branch/master/graph/badge.svg
+[10]: https://codecov.io/gh/xtaci/smux
+[11]: https://sourcegraph.com/github.com/xtaci/smux/-/badge.svg
+[12]: https://sourcegraph.com/github.com/xtaci/smux?badge
+
+## Introduction
+
+Smux ( **S**imple **MU**ltiple**X**ing) is a multiplexing library for Golang. It relies on an underlying connection to provide reliability and ordering, such as TCP or [KCP](https://github.com/xtaci/kcp-go), and provides stream-oriented multiplexing. The original intention of this library is to power the connection management for [kcp-go](https://github.com/xtaci/kcp-go).
+
+## Features
+
+1. ***Token bucket*** controlled receiving, which provides smoother bandwidth graph(see picture below).
+2. Session-wide receive buffer, shared among streams, **fully controlled** overall memory usage.
+3. Minimized header(8Bytes), maximized payload.
+4. Well-tested on millions of devices in [kcptun](https://github.com/xtaci/kcptun).
+5. Builtin fair queue traffic shaping.
+6. Per-stream sliding window to control congestion.(protocol version 2+).
+
+![smooth bandwidth curve](curve.jpg)
+
+## Documentation
+
+For complete documentation, see the associated [Godoc](https://godoc.org/github.com/xtaci/smux).
+
+## Benchmark
+```
+$ go test -v -run=^$ -bench .
+goos: darwin
+goarch: amd64
+pkg: github.com/xtaci/smux
+BenchmarkMSB-4 30000000 51.8 ns/op
+BenchmarkAcceptClose-4 50000 36783 ns/op
+BenchmarkConnSmux-4 30000 58335 ns/op 2246.88 MB/s 1208 B/op 19 allocs/op
+BenchmarkConnTCP-4 50000 25579 ns/op 5124.04 MB/s 0 B/op 0 allocs/op
+PASS
+ok github.com/xtaci/smux 7.811s
+```
+
+## Specification
+
+```
+VERSION(1B) | CMD(1B) | LENGTH(2B) | STREAMID(4B) | DATA(LENGTH)
+
+VALUES FOR LATEST VERSION:
+VERSION:
+ 1/2
+
+CMD:
+ cmdSYN(0)
+ cmdFIN(1)
+ cmdPSH(2)
+ cmdNOP(3)
+ cmdUPD(4) // only supported on version 2
+
+STREAMID:
+ client use odd numbers starts from 1
+ server use even numbers starts from 0
+
+cmdUPD:
+ | CONSUMED(4B) | WINDOW(4B) |
+```
+
+## Usage
+
+```go
+
+func client() {
+ // Get a TCP connection
+ conn, err := net.Dial(...)
+ if err != nil {
+ panic(err)
+ }
+
+ // Setup client side of smux
+ session, err := smux.Client(conn, nil)
+ if err != nil {
+ panic(err)
+ }
+
+ // Open a new stream
+ stream, err := session.OpenStream()
+ if err != nil {
+ panic(err)
+ }
+
+ // Stream implements io.ReadWriteCloser
+ stream.Write([]byte("ping"))
+ stream.Close()
+ session.Close()
+}
+
+func server() {
+ // Accept a TCP connection
+ conn, err := listener.Accept()
+ if err != nil {
+ panic(err)
+ }
+
+ // Setup server side of smux
+ session, err := smux.Server(conn, nil)
+ if err != nil {
+ panic(err)
+ }
+
+ // Accept a stream
+ stream, err := session.AcceptStream()
+ if err != nil {
+ panic(err)
+ }
+
+ // Listen for a message
+ buf := make([]byte, 4)
+ stream.Read(buf)
+ stream.Close()
+ session.Close()
+}
+
+```
+
+## Status
+
+Stable
diff --git a/vendor/github.com/xtaci/smux/alloc.go b/vendor/github.com/xtaci/smux/alloc.go
new file mode 100644
index 0000000..9e3acf3
--- /dev/null
+++ b/vendor/github.com/xtaci/smux/alloc.go
@@ -0,0 +1,72 @@
+package smux
+
+import (
+ "errors"
+ "sync"
+)
+
+var (
+ defaultAllocator *Allocator
+ debruijinPos = [...]byte{0, 9, 1, 10, 13, 21, 2, 29, 11, 14, 16, 18, 22, 25, 3, 30, 8, 12, 20, 28, 15, 17, 24, 7, 19, 27, 23, 6, 26, 5, 4, 31}
+)
+
+func init() {
+ defaultAllocator = NewAllocator()
+}
+
+// Allocator for incoming frames, optimized to prevent overwriting after zeroing
+type Allocator struct {
+ buffers []sync.Pool
+}
+
+// NewAllocator initiates a []byte allocator for frames less than 65536 bytes,
+// the waste(memory fragmentation) of space allocation is guaranteed to be
+// no more than 50%.
+func NewAllocator() *Allocator {
+ alloc := new(Allocator)
+ alloc.buffers = make([]sync.Pool, 17) // 1B -> 64K
+ for k := range alloc.buffers {
+ i := k
+ alloc.buffers[k].New = func() interface{} {
+ return make([]byte, 1<<uint32(i))
+ }
+ }
+ return alloc
+}
+
+// Get a []byte from pool with most appropriate cap
+func (alloc *Allocator) Get(size int) []byte {
+ if size <= 0 || size > 65536 {
+ return nil
+ }
+
+ bits := msb(size)
+ if size == 1<<bits {
+ return alloc.buffers[bits].Get().([]byte)[:size]
+ } else {
+ return alloc.buffers[bits+1].Get().([]byte)[:size]
+ }
+}
+
+// Put returns a []byte to pool for future use,
+// which the cap must be exactly 2^n
+func (alloc *Allocator) Put(buf []byte) error {
+ bits := msb(cap(buf))
+ if cap(buf) == 0 || cap(buf) > 65536 || cap(buf) != 1<<bits {
+ return errors.New("allocator Put() incorrect buffer size")
+ }
+ alloc.buffers[bits].Put(buf)
+ return nil
+}
+
+// msb return the pos of most significiant bit
+// http://supertech.csail.mit.edu/papers/debruijn.pdf
+func msb(size int) byte {
+ v := uint32(size)
+ v |= v >> 1
+ v |= v >> 2
+ v |= v >> 4
+ v |= v >> 8
+ v |= v >> 16
+ return debruijinPos[(v*0x07C4ACDD)>>27]
+}
diff --git a/vendor/github.com/xtaci/smux/curve.jpg b/vendor/github.com/xtaci/smux/curve.jpg
new file mode 100644
index 0000000..3fc4863
--- /dev/null
+++ b/vendor/github.com/xtaci/smux/curve.jpg
Binary files differ
diff --git a/vendor/github.com/xtaci/smux/frame.go b/vendor/github.com/xtaci/smux/frame.go
new file mode 100644
index 0000000..467a058
--- /dev/null
+++ b/vendor/github.com/xtaci/smux/frame.go
@@ -0,0 +1,81 @@
+package smux
+
+import (
+ "encoding/binary"
+ "fmt"
+)
+
+const ( // cmds
+ // protocol version 1:
+ cmdSYN byte = iota // stream open
+ cmdFIN // stream close, a.k.a EOF mark
+ cmdPSH // data push
+ cmdNOP // no operation
+
+ // protocol version 2 extra commands
+ // notify bytes consumed by remote peer-end
+ cmdUPD
+)
+
+const (
+ // data size of cmdUPD, format:
+ // |4B data consumed(ACK)| 4B window size(WINDOW) |
+ szCmdUPD = 8
+)
+
+const (
+ // initial peer window guess, a slow-start
+ initialPeerWindow = 262144
+)
+
+const (
+ sizeOfVer = 1
+ sizeOfCmd = 1
+ sizeOfLength = 2
+ sizeOfSid = 4
+ headerSize = sizeOfVer + sizeOfCmd + sizeOfSid + sizeOfLength
+)
+
+// Frame defines a packet from or to be multiplexed into a single connection
+type Frame struct {
+ ver byte
+ cmd byte
+ sid uint32
+ data []byte
+}
+
+func newFrame(version byte, cmd byte, sid uint32) Frame {
+ return Frame{ver: version, cmd: cmd, sid: sid}
+}
+
+type rawHeader [headerSize]byte
+
+func (h rawHeader) Version() byte {
+ return h[0]
+}
+
+func (h rawHeader) Cmd() byte {
+ return h[1]
+}
+
+func (h rawHeader) Length() uint16 {
+ return binary.LittleEndian.Uint16(h[2:])
+}
+
+func (h rawHeader) StreamID() uint32 {
+ return binary.LittleEndian.Uint32(h[4:])
+}
+
+func (h rawHeader) String() string {
+ return fmt.Sprintf("Version:%d Cmd:%d StreamID:%d Length:%d",
+ h.Version(), h.Cmd(), h.StreamID(), h.Length())
+}
+
+type updHeader [szCmdUPD]byte
+
+func (h updHeader) Consumed() uint32 {
+ return binary.LittleEndian.Uint32(h[:])
+}
+func (h updHeader) Window() uint32 {
+ return binary.LittleEndian.Uint32(h[4:])
+}
diff --git a/vendor/github.com/xtaci/smux/go.mod b/vendor/github.com/xtaci/smux/go.mod
new file mode 100644
index 0000000..9ddead6
--- /dev/null
+++ b/vendor/github.com/xtaci/smux/go.mod
@@ -0,0 +1,3 @@
+module github.com/xtaci/smux
+
+go 1.13
diff --git a/vendor/github.com/xtaci/smux/go.sum b/vendor/github.com/xtaci/smux/go.sum
new file mode 100644
index 0000000..e69de29
--- /dev/null
+++ b/vendor/github.com/xtaci/smux/go.sum
diff --git a/vendor/github.com/xtaci/smux/mux.go b/vendor/github.com/xtaci/smux/mux.go
new file mode 100644
index 0000000..c0b8ab8
--- /dev/null
+++ b/vendor/github.com/xtaci/smux/mux.go
@@ -0,0 +1,110 @@
+// Package smux is a multiplexing library for Golang.
+//
+// It relies on an underlying connection to provide reliability and ordering, such as TCP or KCP,
+// and provides stream-oriented multiplexing over a single channel.
+package smux
+
+import (
+ "errors"
+ "fmt"
+ "io"
+ "math"
+ "time"
+)
+
+// Config is used to tune the Smux session
+type Config struct {
+ // SMUX Protocol version, support 1,2
+ Version int
+
+ // Disabled keepalive
+ KeepAliveDisabled bool
+
+ // KeepAliveInterval is how often to send a NOP command to the remote
+ KeepAliveInterval time.Duration
+
+ // KeepAliveTimeout is how long the session
+ // will be closed if no data has arrived
+ KeepAliveTimeout time.Duration
+
+ // MaxFrameSize is used to control the maximum
+ // frame size to sent to the remote
+ MaxFrameSize int
+
+ // MaxReceiveBuffer is used to control the maximum
+ // number of data in the buffer pool
+ MaxReceiveBuffer int
+
+ // MaxStreamBuffer is used to control the maximum
+ // number of data per stream
+ MaxStreamBuffer int
+}
+
+// DefaultConfig is used to return a default configuration
+func DefaultConfig() *Config {
+ return &Config{
+ Version: 1,
+ KeepAliveInterval: 10 * time.Second,
+ KeepAliveTimeout: 30 * time.Second,
+ MaxFrameSize: 32768,
+ MaxReceiveBuffer: 4194304,
+ MaxStreamBuffer: 65536,
+ }
+}
+
+// VerifyConfig is used to verify the sanity of configuration
+func VerifyConfig(config *Config) error {
+ if !(config.Version == 1 || config.Version == 2) {
+ return errors.New("unsupported protocol version")
+ }
+ if !config.KeepAliveDisabled {
+ if config.KeepAliveInterval == 0 {
+ return errors.New("keep-alive interval must be positive")
+ }
+ if config.KeepAliveTimeout < config.KeepAliveInterval {
+ return fmt.Errorf("keep-alive timeout must be larger than keep-alive interval")
+ }
+ }
+ if config.MaxFrameSize <= 0 {
+ return errors.New("max frame size must be positive")
+ }
+ if config.MaxFrameSize > 65535 {
+ return errors.New("max frame size must not be larger than 65535")
+ }
+ if config.MaxReceiveBuffer <= 0 {
+ return errors.New("max receive buffer must be positive")
+ }
+ if config.MaxStreamBuffer <= 0 {
+ return errors.New("max stream buffer must be positive")
+ }
+ if config.MaxStreamBuffer > config.MaxReceiveBuffer {
+ return errors.New("max stream buffer must not be larger than max receive buffer")
+ }
+ if config.MaxStreamBuffer > math.MaxInt32 {
+ return errors.New("max stream buffer cannot be larger than 2147483647")
+ }
+ return nil
+}
+
+// Server is used to initialize a new server-side connection.
+func Server(conn io.ReadWriteCloser, config *Config) (*Session, error) {
+ if config == nil {
+ config = DefaultConfig()
+ }
+ if err := VerifyConfig(config); err != nil {
+ return nil, err
+ }
+ return newSession(config, conn, false), nil
+}
+
+// Client is used to initialize a new client-side connection.
+func Client(conn io.ReadWriteCloser, config *Config) (*Session, error) {
+ if config == nil {
+ config = DefaultConfig()
+ }
+
+ if err := VerifyConfig(config); err != nil {
+ return nil, err
+ }
+ return newSession(config, conn, true), nil
+}
diff --git a/vendor/github.com/xtaci/smux/mux.jpg b/vendor/github.com/xtaci/smux/mux.jpg
new file mode 100644
index 0000000..dde2e11
--- /dev/null
+++ b/vendor/github.com/xtaci/smux/mux.jpg
Binary files differ
diff --git a/vendor/github.com/xtaci/smux/session.go b/vendor/github.com/xtaci/smux/session.go
new file mode 100644
index 0000000..bc56066
--- /dev/null
+++ b/vendor/github.com/xtaci/smux/session.go
@@ -0,0 +1,525 @@
+package smux
+
+import (
+ "container/heap"
+ "encoding/binary"
+ "errors"
+ "io"
+ "net"
+ "sync"
+ "sync/atomic"
+ "time"
+)
+
+const (
+ defaultAcceptBacklog = 1024
+)
+
+var (
+ ErrInvalidProtocol = errors.New("invalid protocol")
+ ErrConsumed = errors.New("peer consumed more than sent")
+ ErrGoAway = errors.New("stream id overflows, should start a new connection")
+ ErrTimeout = errors.New("timeout")
+ ErrWouldBlock = errors.New("operation would block on IO")
+)
+
+type writeRequest struct {
+ prio uint64
+ frame Frame
+ result chan writeResult
+}
+
+type writeResult struct {
+ n int
+ err error
+}
+
+type buffersWriter interface {
+ WriteBuffers(v [][]byte) (n int, err error)
+}
+
+// Session defines a multiplexed connection for streams
+type Session struct {
+ conn io.ReadWriteCloser
+
+ config *Config
+ nextStreamID uint32 // next stream identifier
+ nextStreamIDLock sync.Mutex
+
+ bucket int32 // token bucket
+ bucketNotify chan struct{} // used for waiting for tokens
+
+ streams map[uint32]*Stream // all streams in this session
+ streamLock sync.Mutex // locks streams
+
+ die chan struct{} // flag session has died
+ dieOnce sync.Once
+
+ // socket error handling
+ socketReadError atomic.Value
+ socketWriteError atomic.Value
+ chSocketReadError chan struct{}
+ chSocketWriteError chan struct{}
+ socketReadErrorOnce sync.Once
+ socketWriteErrorOnce sync.Once
+
+ // smux protocol errors
+ protoError atomic.Value
+ chProtoError chan struct{}
+ protoErrorOnce sync.Once
+
+ chAccepts chan *Stream
+
+ dataReady int32 // flag data has arrived
+
+ goAway int32 // flag id exhausted
+
+ deadline atomic.Value
+
+ shaper chan writeRequest // a shaper for writing
+ writes chan writeRequest
+}
+
+func newSession(config *Config, conn io.ReadWriteCloser, client bool) *Session {
+ s := new(Session)
+ s.die = make(chan struct{})
+ s.conn = conn
+ s.config = config
+ s.streams = make(map[uint32]*Stream)
+ s.chAccepts = make(chan *Stream, defaultAcceptBacklog)
+ s.bucket = int32(config.MaxReceiveBuffer)
+ s.bucketNotify = make(chan struct{}, 1)
+ s.shaper = make(chan writeRequest)
+ s.writes = make(chan writeRequest)
+ s.chSocketReadError = make(chan struct{})
+ s.chSocketWriteError = make(chan struct{})
+ s.chProtoError = make(chan struct{})
+
+ if client {
+ s.nextStreamID = 1
+ } else {
+ s.nextStreamID = 0
+ }
+
+ go s.shaperLoop()
+ go s.recvLoop()
+ go s.sendLoop()
+ if !config.KeepAliveDisabled {
+ go s.keepalive()
+ }
+ return s
+}
+
+// OpenStream is used to create a new stream
+func (s *Session) OpenStream() (*Stream, error) {
+ if s.IsClosed() {
+ return nil, io.ErrClosedPipe
+ }
+
+ // generate stream id
+ s.nextStreamIDLock.Lock()
+ if s.goAway > 0 {
+ s.nextStreamIDLock.Unlock()
+ return nil, ErrGoAway
+ }
+
+ s.nextStreamID += 2
+ sid := s.nextStreamID
+ if sid == sid%2 { // stream-id overflows
+ s.goAway = 1
+ s.nextStreamIDLock.Unlock()
+ return nil, ErrGoAway
+ }
+ s.nextStreamIDLock.Unlock()
+
+ stream := newStream(sid, s.config.MaxFrameSize, s)
+
+ if _, err := s.writeFrame(newFrame(byte(s.config.Version), cmdSYN, sid)); err != nil {
+ return nil, err
+ }
+
+ s.streamLock.Lock()
+ defer s.streamLock.Unlock()
+ select {
+ case <-s.chSocketReadError:
+ return nil, s.socketReadError.Load().(error)
+ case <-s.chSocketWriteError:
+ return nil, s.socketWriteError.Load().(error)
+ case <-s.die:
+ return nil, io.ErrClosedPipe
+ default:
+ s.streams[sid] = stream
+ return stream, nil
+ }
+}
+
+// Open returns a generic ReadWriteCloser
+func (s *Session) Open() (io.ReadWriteCloser, error) {
+ return s.OpenStream()
+}
+
+// AcceptStream is used to block until the next available stream
+// is ready to be accepted.
+func (s *Session) AcceptStream() (*Stream, error) {
+ var deadline <-chan time.Time
+ if d, ok := s.deadline.Load().(time.Time); ok && !d.IsZero() {
+ timer := time.NewTimer(time.Until(d))
+ defer timer.Stop()
+ deadline = timer.C
+ }
+
+ select {
+ case stream := <-s.chAccepts:
+ return stream, nil
+ case <-deadline:
+ return nil, ErrTimeout
+ case <-s.chSocketReadError:
+ return nil, s.socketReadError.Load().(error)
+ case <-s.chProtoError:
+ return nil, s.protoError.Load().(error)
+ case <-s.die:
+ return nil, io.ErrClosedPipe
+ }
+}
+
+// Accept Returns a generic ReadWriteCloser instead of smux.Stream
+func (s *Session) Accept() (io.ReadWriteCloser, error) {
+ return s.AcceptStream()
+}
+
+// Close is used to close the session and all streams.
+func (s *Session) Close() error {
+ var once bool
+ s.dieOnce.Do(func() {
+ close(s.die)
+ once = true
+ })
+
+ if once {
+ s.streamLock.Lock()
+ for k := range s.streams {
+ s.streams[k].sessionClose()
+ }
+ s.streamLock.Unlock()
+ return s.conn.Close()
+ } else {
+ return io.ErrClosedPipe
+ }
+}
+
+// notifyBucket notifies recvLoop that bucket is available
+func (s *Session) notifyBucket() {
+ select {
+ case s.bucketNotify <- struct{}{}:
+ default:
+ }
+}
+
+func (s *Session) notifyReadError(err error) {
+ s.socketReadErrorOnce.Do(func() {
+ s.socketReadError.Store(err)
+ close(s.chSocketReadError)
+ })
+}
+
+func (s *Session) notifyWriteError(err error) {
+ s.socketWriteErrorOnce.Do(func() {
+ s.socketWriteError.Store(err)
+ close(s.chSocketWriteError)
+ })
+}
+
+func (s *Session) notifyProtoError(err error) {
+ s.protoErrorOnce.Do(func() {
+ s.protoError.Store(err)
+ close(s.chProtoError)
+ })
+}
+
+// IsClosed does a safe check to see if we have shutdown
+func (s *Session) IsClosed() bool {
+ select {
+ case <-s.die:
+ return true
+ default:
+ return false
+ }
+}
+
+// NumStreams returns the number of currently open streams
+func (s *Session) NumStreams() int {
+ if s.IsClosed() {
+ return 0
+ }
+ s.streamLock.Lock()
+ defer s.streamLock.Unlock()
+ return len(s.streams)
+}
+
+// SetDeadline sets a deadline used by Accept* calls.
+// A zero time value disables the deadline.
+func (s *Session) SetDeadline(t time.Time) error {
+ s.deadline.Store(t)
+ return nil
+}
+
+// LocalAddr satisfies net.Conn interface
+func (s *Session) LocalAddr() net.Addr {
+ if ts, ok := s.conn.(interface {
+ LocalAddr() net.Addr
+ }); ok {
+ return ts.LocalAddr()
+ }
+ return nil
+}
+
+// RemoteAddr satisfies net.Conn interface
+func (s *Session) RemoteAddr() net.Addr {
+ if ts, ok := s.conn.(interface {
+ RemoteAddr() net.Addr
+ }); ok {
+ return ts.RemoteAddr()
+ }
+ return nil
+}
+
+// notify the session that a stream has closed
+func (s *Session) streamClosed(sid uint32) {
+ s.streamLock.Lock()
+ if n := s.streams[sid].recycleTokens(); n > 0 { // return remaining tokens to the bucket
+ if atomic.AddInt32(&s.bucket, int32(n)) > 0 {
+ s.notifyBucket()
+ }
+ }
+ delete(s.streams, sid)
+ s.streamLock.Unlock()
+}
+
+// returnTokens is called by stream to return token after read
+func (s *Session) returnTokens(n int) {
+ if atomic.AddInt32(&s.bucket, int32(n)) > 0 {
+ s.notifyBucket()
+ }
+}
+
+// recvLoop keeps on reading from underlying connection if tokens are available
+func (s *Session) recvLoop() {
+ var hdr rawHeader
+ var updHdr updHeader
+
+ for {
+ for atomic.LoadInt32(&s.bucket) <= 0 && !s.IsClosed() {
+ select {
+ case <-s.bucketNotify:
+ case <-s.die:
+ return
+ }
+ }
+
+ // read header first
+ if _, err := io.ReadFull(s.conn, hdr[:]); err == nil {
+ atomic.StoreInt32(&s.dataReady, 1)
+ if hdr.Version() != byte(s.config.Version) {
+ s.notifyProtoError(ErrInvalidProtocol)
+ return
+ }
+ sid := hdr.StreamID()
+ switch hdr.Cmd() {
+ case cmdNOP:
+ case cmdSYN:
+ s.streamLock.Lock()
+ if _, ok := s.streams[sid]; !ok {
+ stream := newStream(sid, s.config.MaxFrameSize, s)
+ s.streams[sid] = stream
+ select {
+ case s.chAccepts <- stream:
+ case <-s.die:
+ }
+ }
+ s.streamLock.Unlock()
+ case cmdFIN:
+ s.streamLock.Lock()
+ if stream, ok := s.streams[sid]; ok {
+ stream.fin()
+ stream.notifyReadEvent()
+ }
+ s.streamLock.Unlock()
+ case cmdPSH:
+ if hdr.Length() > 0 {
+ newbuf := defaultAllocator.Get(int(hdr.Length()))
+ if written, err := io.ReadFull(s.conn, newbuf); err == nil {
+ s.streamLock.Lock()
+ if stream, ok := s.streams[sid]; ok {
+ stream.pushBytes(newbuf)
+ atomic.AddInt32(&s.bucket, -int32(written))
+ stream.notifyReadEvent()
+ }
+ s.streamLock.Unlock()
+ } else {
+ s.notifyReadError(err)
+ return
+ }
+ }
+ case cmdUPD:
+ if _, err := io.ReadFull(s.conn, updHdr[:]); err == nil {
+ s.streamLock.Lock()
+ if stream, ok := s.streams[sid]; ok {
+ stream.update(updHdr.Consumed(), updHdr.Window())
+ }
+ s.streamLock.Unlock()
+ } else {
+ s.notifyReadError(err)
+ return
+ }
+ default:
+ s.notifyProtoError(ErrInvalidProtocol)
+ return
+ }
+ } else {
+ s.notifyReadError(err)
+ return
+ }
+ }
+}
+
+func (s *Session) keepalive() {
+ tickerPing := time.NewTicker(s.config.KeepAliveInterval)
+ tickerTimeout := time.NewTicker(s.config.KeepAliveTimeout)
+ defer tickerPing.Stop()
+ defer tickerTimeout.Stop()
+ for {
+ select {
+ case <-tickerPing.C:
+ s.writeFrameInternal(newFrame(byte(s.config.Version), cmdNOP, 0), tickerPing.C, 0)
+ s.notifyBucket() // force a signal to the recvLoop
+ case <-tickerTimeout.C:
+ if !atomic.CompareAndSwapInt32(&s.dataReady, 1, 0) {
+ // recvLoop may block while bucket is 0, in this case,
+ // session should not be closed.
+ if atomic.LoadInt32(&s.bucket) > 0 {
+ s.Close()
+ return
+ }
+ }
+ case <-s.die:
+ return
+ }
+ }
+}
+
+// shaper shapes the sending sequence among streams
+func (s *Session) shaperLoop() {
+ var reqs shaperHeap
+ var next writeRequest
+ var chWrite chan writeRequest
+
+ for {
+ if len(reqs) > 0 {
+ chWrite = s.writes
+ next = heap.Pop(&reqs).(writeRequest)
+ } else {
+ chWrite = nil
+ }
+
+ select {
+ case <-s.die:
+ return
+ case r := <-s.shaper:
+ if chWrite != nil { // next is valid, reshape
+ heap.Push(&reqs, next)
+ }
+ heap.Push(&reqs, r)
+ case chWrite <- next:
+ }
+ }
+}
+
+func (s *Session) sendLoop() {
+ var buf []byte
+ var n int
+ var err error
+ var vec [][]byte // vector for writeBuffers
+
+ bw, ok := s.conn.(buffersWriter)
+ if ok {
+ buf = make([]byte, headerSize)
+ vec = make([][]byte, 2)
+ } else {
+ buf = make([]byte, (1<<16)+headerSize)
+ }
+
+ for {
+ select {
+ case <-s.die:
+ return
+ case request := <-s.writes:
+ buf[0] = request.frame.ver
+ buf[1] = request.frame.cmd
+ binary.LittleEndian.PutUint16(buf[2:], uint16(len(request.frame.data)))
+ binary.LittleEndian.PutUint32(buf[4:], request.frame.sid)
+
+ if len(vec) > 0 {
+ vec[0] = buf[:headerSize]
+ vec[1] = request.frame.data
+ n, err = bw.WriteBuffers(vec)
+ } else {
+ copy(buf[headerSize:], request.frame.data)
+ n, err = s.conn.Write(buf[:headerSize+len(request.frame.data)])
+ }
+
+ n -= headerSize
+ if n < 0 {
+ n = 0
+ }
+
+ result := writeResult{
+ n: n,
+ err: err,
+ }
+
+ request.result <- result
+ close(request.result)
+
+ // store conn error
+ if err != nil {
+ s.notifyWriteError(err)
+ return
+ }
+ }
+ }
+}
+
+// writeFrame writes the frame to the underlying connection
+// and returns the number of bytes written if successful
+func (s *Session) writeFrame(f Frame) (n int, err error) {
+ return s.writeFrameInternal(f, nil, 0)
+}
+
+// internal writeFrame version to support deadline used in keepalive
+func (s *Session) writeFrameInternal(f Frame, deadline <-chan time.Time, prio uint64) (int, error) {
+ req := writeRequest{
+ prio: prio,
+ frame: f,
+ result: make(chan writeResult, 1),
+ }
+ select {
+ case s.shaper <- req:
+ case <-s.die:
+ return 0, io.ErrClosedPipe
+ case <-s.chSocketWriteError:
+ return 0, s.socketWriteError.Load().(error)
+ case <-deadline:
+ return 0, ErrTimeout
+ }
+
+ select {
+ case result := <-req.result:
+ return result.n, result.err
+ case <-s.die:
+ return 0, io.ErrClosedPipe
+ case <-s.chSocketWriteError:
+ return 0, s.socketWriteError.Load().(error)
+ case <-deadline:
+ return 0, ErrTimeout
+ }
+}
diff --git a/vendor/github.com/xtaci/smux/shaper.go b/vendor/github.com/xtaci/smux/shaper.go
new file mode 100644
index 0000000..be03406
--- /dev/null
+++ b/vendor/github.com/xtaci/smux/shaper.go
@@ -0,0 +1,16 @@
+package smux
+
+type shaperHeap []writeRequest
+
+func (h shaperHeap) Len() int { return len(h) }
+func (h shaperHeap) Less(i, j int) bool { return h[i].prio < h[j].prio }
+func (h shaperHeap) Swap(i, j int) { h[i], h[j] = h[j], h[i] }
+func (h *shaperHeap) Push(x interface{}) { *h = append(*h, x.(writeRequest)) }
+
+func (h *shaperHeap) Pop() interface{} {
+ old := *h
+ n := len(old)
+ x := old[n-1]
+ *h = old[0 : n-1]
+ return x
+}
diff --git a/vendor/github.com/xtaci/smux/smux.png b/vendor/github.com/xtaci/smux/smux.png
new file mode 100644
index 0000000..26aba3b
--- /dev/null
+++ b/vendor/github.com/xtaci/smux/smux.png
Binary files differ
diff --git a/vendor/github.com/xtaci/smux/stream.go b/vendor/github.com/xtaci/smux/stream.go
new file mode 100644
index 0000000..6c9499c
--- /dev/null
+++ b/vendor/github.com/xtaci/smux/stream.go
@@ -0,0 +1,549 @@
+package smux
+
+import (
+ "encoding/binary"
+ "io"
+ "net"
+ "sync"
+ "sync/atomic"
+ "time"
+)
+
+// Stream implements net.Conn
+type Stream struct {
+ id uint32
+ sess *Session
+
+ buffers [][]byte
+ heads [][]byte // slice heads kept for recycle
+
+ bufferLock sync.Mutex
+ frameSize int
+
+ // notify a read event
+ chReadEvent chan struct{}
+
+ // flag the stream has closed
+ die chan struct{}
+ dieOnce sync.Once
+
+ // FIN command
+ chFinEvent chan struct{}
+ finEventOnce sync.Once
+
+ // deadlines
+ readDeadline atomic.Value
+ writeDeadline atomic.Value
+
+ // per stream sliding window control
+ numRead uint32 // number of consumed bytes
+ numWritten uint32 // count num of bytes written
+ incr uint32 // counting for sending
+
+ // UPD command
+ peerConsumed uint32 // num of bytes the peer has consumed
+ peerWindow uint32 // peer window, initialized to 256KB, updated by peer
+ chUpdate chan struct{} // notify of remote data consuming and window update
+}
+
+// newStream initiates a Stream struct
+func newStream(id uint32, frameSize int, sess *Session) *Stream {
+ s := new(Stream)
+ s.id = id
+ s.chReadEvent = make(chan struct{}, 1)
+ s.chUpdate = make(chan struct{}, 1)
+ s.frameSize = frameSize
+ s.sess = sess
+ s.die = make(chan struct{})
+ s.chFinEvent = make(chan struct{})
+ s.peerWindow = initialPeerWindow // set to initial window size
+ return s
+}
+
+// ID returns the unique stream ID.
+func (s *Stream) ID() uint32 {
+ return s.id
+}
+
+// Read implements net.Conn
+func (s *Stream) Read(b []byte) (n int, err error) {
+ for {
+ n, err = s.tryRead(b)
+ if err == ErrWouldBlock {
+ if ew := s.waitRead(); ew != nil {
+ return 0, ew
+ }
+ } else {
+ return n, err
+ }
+ }
+}
+
+// tryRead is the nonblocking version of Read
+func (s *Stream) tryRead(b []byte) (n int, err error) {
+ if s.sess.config.Version == 2 {
+ return s.tryReadv2(b)
+ }
+
+ if len(b) == 0 {
+ return 0, nil
+ }
+
+ s.bufferLock.Lock()
+ if len(s.buffers) > 0 {
+ n = copy(b, s.buffers[0])
+ s.buffers[0] = s.buffers[0][n:]
+ if len(s.buffers[0]) == 0 {
+ s.buffers[0] = nil
+ s.buffers = s.buffers[1:]
+ // full recycle
+ defaultAllocator.Put(s.heads[0])
+ s.heads = s.heads[1:]
+ }
+ }
+ s.bufferLock.Unlock()
+
+ if n > 0 {
+ s.sess.returnTokens(n)
+ return n, nil
+ }
+
+ select {
+ case <-s.die:
+ return 0, io.EOF
+ default:
+ return 0, ErrWouldBlock
+ }
+}
+
+func (s *Stream) tryReadv2(b []byte) (n int, err error) {
+ if len(b) == 0 {
+ return 0, nil
+ }
+
+ var notifyConsumed uint32
+ s.bufferLock.Lock()
+ if len(s.buffers) > 0 {
+ n = copy(b, s.buffers[0])
+ s.buffers[0] = s.buffers[0][n:]
+ if len(s.buffers[0]) == 0 {
+ s.buffers[0] = nil
+ s.buffers = s.buffers[1:]
+ // full recycle
+ defaultAllocator.Put(s.heads[0])
+ s.heads = s.heads[1:]
+ }
+ }
+
+ // in an ideal environment:
+ // if more than half of buffer has consumed, send read ack to peer
+ // based on round-trip time of ACK, continous flowing data
+ // won't slow down because of waiting for ACK, as long as the
+ // consumer keeps on reading data
+ // s.numRead == n also notify window at the first read
+ s.numRead += uint32(n)
+ s.incr += uint32(n)
+ if s.incr >= uint32(s.sess.config.MaxStreamBuffer/2) || s.numRead == uint32(n) {
+ notifyConsumed = s.numRead
+ s.incr = 0
+ }
+ s.bufferLock.Unlock()
+
+ if n > 0 {
+ s.sess.returnTokens(n)
+ if notifyConsumed > 0 {
+ err := s.sendWindowUpdate(notifyConsumed)
+ return n, err
+ } else {
+ return n, nil
+ }
+ }
+
+ select {
+ case <-s.die:
+ return 0, io.EOF
+ default:
+ return 0, ErrWouldBlock
+ }
+}
+
+// WriteTo implements io.WriteTo
+func (s *Stream) WriteTo(w io.Writer) (n int64, err error) {
+ if s.sess.config.Version == 2 {
+ return s.writeTov2(w)
+ }
+
+ for {
+ var buf []byte
+ s.bufferLock.Lock()
+ if len(s.buffers) > 0 {
+ buf = s.buffers[0]
+ s.buffers = s.buffers[1:]
+ s.heads = s.heads[1:]
+ }
+ s.bufferLock.Unlock()
+
+ if buf != nil {
+ nw, ew := w.Write(buf)
+ s.sess.returnTokens(len(buf))
+ defaultAllocator.Put(buf)
+ if nw > 0 {
+ n += int64(nw)
+ }
+
+ if ew != nil {
+ return n, ew
+ }
+ } else if ew := s.waitRead(); ew != nil {
+ return n, ew
+ }
+ }
+}
+
+func (s *Stream) writeTov2(w io.Writer) (n int64, err error) {
+ for {
+ var notifyConsumed uint32
+ var buf []byte
+ s.bufferLock.Lock()
+ if len(s.buffers) > 0 {
+ buf = s.buffers[0]
+ s.buffers = s.buffers[1:]
+ s.heads = s.heads[1:]
+ }
+ s.numRead += uint32(len(buf))
+ s.incr += uint32(len(buf))
+ if s.incr >= uint32(s.sess.config.MaxStreamBuffer/2) || s.numRead == uint32(len(buf)) {
+ notifyConsumed = s.numRead
+ s.incr = 0
+ }
+ s.bufferLock.Unlock()
+
+ if buf != nil {
+ nw, ew := w.Write(buf)
+ s.sess.returnTokens(len(buf))
+ defaultAllocator.Put(buf)
+ if nw > 0 {
+ n += int64(nw)
+ }
+
+ if ew != nil {
+ return n, ew
+ }
+
+ if notifyConsumed > 0 {
+ if err := s.sendWindowUpdate(notifyConsumed); err != nil {
+ return n, err
+ }
+ }
+ } else if ew := s.waitRead(); ew != nil {
+ return n, ew
+ }
+ }
+}
+
+func (s *Stream) sendWindowUpdate(consumed uint32) error {
+ var timer *time.Timer
+ var deadline <-chan time.Time
+ if d, ok := s.readDeadline.Load().(time.Time); ok && !d.IsZero() {
+ timer = time.NewTimer(time.Until(d))
+ defer timer.Stop()
+ deadline = timer.C
+ }
+
+ frame := newFrame(byte(s.sess.config.Version), cmdUPD, s.id)
+ var hdr updHeader
+ binary.LittleEndian.PutUint32(hdr[:], consumed)
+ binary.LittleEndian.PutUint32(hdr[4:], uint32(s.sess.config.MaxStreamBuffer))
+ frame.data = hdr[:]
+ _, err := s.sess.writeFrameInternal(frame, deadline, 0)
+ return err
+}
+
+func (s *Stream) waitRead() error {
+ var timer *time.Timer
+ var deadline <-chan time.Time
+ if d, ok := s.readDeadline.Load().(time.Time); ok && !d.IsZero() {
+ timer = time.NewTimer(time.Until(d))
+ defer timer.Stop()
+ deadline = timer.C
+ }
+
+ select {
+ case <-s.chReadEvent:
+ return nil
+ case <-s.chFinEvent:
+ // BUG(xtaci): Fix for https://github.com/xtaci/smux/issues/82
+ s.bufferLock.Lock()
+ defer s.bufferLock.Unlock()
+ if len(s.buffers) > 0 {
+ return nil
+ }
+ return io.EOF
+ case <-s.sess.chSocketReadError:
+ return s.sess.socketReadError.Load().(error)
+ case <-s.sess.chProtoError:
+ return s.sess.protoError.Load().(error)
+ case <-deadline:
+ return ErrTimeout
+ case <-s.die:
+ return io.ErrClosedPipe
+ }
+
+}
+
+// Write implements net.Conn
+//
+// Note that the behavior when multiple goroutines write concurrently is not deterministic,
+// frames may interleave in random way.
+func (s *Stream) Write(b []byte) (n int, err error) {
+ if s.sess.config.Version == 2 {
+ return s.writeV2(b)
+ }
+
+ var deadline <-chan time.Time
+ if d, ok := s.writeDeadline.Load().(time.Time); ok && !d.IsZero() {
+ timer := time.NewTimer(time.Until(d))
+ defer timer.Stop()
+ deadline = timer.C
+ }
+
+ // check if stream has closed
+ select {
+ case <-s.die:
+ return 0, io.ErrClosedPipe
+ default:
+ }
+
+ // frame split and transmit
+ sent := 0
+ frame := newFrame(byte(s.sess.config.Version), cmdPSH, s.id)
+ bts := b
+ for len(bts) > 0 {
+ sz := len(bts)
+ if sz > s.frameSize {
+ sz = s.frameSize
+ }
+ frame.data = bts[:sz]
+ bts = bts[sz:]
+ n, err := s.sess.writeFrameInternal(frame, deadline, uint64(s.numWritten))
+ s.numWritten++
+ sent += n
+ if err != nil {
+ return sent, err
+ }
+ }
+
+ return sent, nil
+}
+
+func (s *Stream) writeV2(b []byte) (n int, err error) {
+ // check empty input
+ if len(b) == 0 {
+ return 0, nil
+ }
+
+ // check if stream has closed
+ select {
+ case <-s.die:
+ return 0, io.ErrClosedPipe
+ default:
+ }
+
+ // create write deadline timer
+ var deadline <-chan time.Time
+ if d, ok := s.writeDeadline.Load().(time.Time); ok && !d.IsZero() {
+ timer := time.NewTimer(time.Until(d))
+ defer timer.Stop()
+ deadline = timer.C
+ }
+
+ // frame split and transmit process
+ sent := 0
+ frame := newFrame(byte(s.sess.config.Version), cmdPSH, s.id)
+
+ for {
+ // per stream sliding window control
+ // [.... [consumed... numWritten] ... win... ]
+ // [.... [consumed...................+rmtwnd]]
+ var bts []byte
+ // note:
+ // even if uint32 overflow, this math still works:
+ // eg1: uint32(0) - uint32(math.MaxUint32) = 1
+ // eg2: int32(uint32(0) - uint32(1)) = -1
+ // security check for misbehavior
+ inflight := int32(atomic.LoadUint32(&s.numWritten) - atomic.LoadUint32(&s.peerConsumed))
+ if inflight < 0 {
+ return 0, ErrConsumed
+ }
+
+ win := int32(atomic.LoadUint32(&s.peerWindow)) - inflight
+ if win > 0 {
+ if win > int32(len(b)) {
+ bts = b
+ b = nil
+ } else {
+ bts = b[:win]
+ b = b[win:]
+ }
+
+ for len(bts) > 0 {
+ sz := len(bts)
+ if sz > s.frameSize {
+ sz = s.frameSize
+ }
+ frame.data = bts[:sz]
+ bts = bts[sz:]
+ n, err := s.sess.writeFrameInternal(frame, deadline, uint64(atomic.LoadUint32(&s.numWritten)))
+ atomic.AddUint32(&s.numWritten, uint32(sz))
+ sent += n
+ if err != nil {
+ return sent, err
+ }
+ }
+ }
+
+ // if there is any data remaining to be sent
+ // wait until stream closes, window changes or deadline reached
+ // this blocking behavior will inform upper layer to do flow control
+ if len(b) > 0 {
+ select {
+ case <-s.chFinEvent: // if fin arrived, future window update is impossible
+ return 0, io.EOF
+ case <-s.die:
+ return sent, io.ErrClosedPipe
+ case <-deadline:
+ return sent, ErrTimeout
+ case <-s.sess.chSocketWriteError:
+ return sent, s.sess.socketWriteError.Load().(error)
+ case <-s.chUpdate:
+ continue
+ }
+ } else {
+ return sent, nil
+ }
+ }
+}
+
+// Close implements net.Conn
+func (s *Stream) Close() error {
+ var once bool
+ var err error
+ s.dieOnce.Do(func() {
+ close(s.die)
+ once = true
+ })
+
+ if once {
+ _, err = s.sess.writeFrame(newFrame(byte(s.sess.config.Version), cmdFIN, s.id))
+ s.sess.streamClosed(s.id)
+ return err
+ } else {
+ return io.ErrClosedPipe
+ }
+}
+
+// GetDieCh returns a readonly chan which can be readable
+// when the stream is to be closed.
+func (s *Stream) GetDieCh() <-chan struct{} {
+ return s.die
+}
+
+// SetReadDeadline sets the read deadline as defined by
+// net.Conn.SetReadDeadline.
+// A zero time value disables the deadline.
+func (s *Stream) SetReadDeadline(t time.Time) error {
+ s.readDeadline.Store(t)
+ s.notifyReadEvent()
+ return nil
+}
+
+// SetWriteDeadline sets the write deadline as defined by
+// net.Conn.SetWriteDeadline.
+// A zero time value disables the deadline.
+func (s *Stream) SetWriteDeadline(t time.Time) error {
+ s.writeDeadline.Store(t)
+ return nil
+}
+
+// SetDeadline sets both read and write deadlines as defined by
+// net.Conn.SetDeadline.
+// A zero time value disables the deadlines.
+func (s *Stream) SetDeadline(t time.Time) error {
+ if err := s.SetReadDeadline(t); err != nil {
+ return err
+ }
+ if err := s.SetWriteDeadline(t); err != nil {
+ return err
+ }
+ return nil
+}
+
+// session closes
+func (s *Stream) sessionClose() { s.dieOnce.Do(func() { close(s.die) }) }
+
+// LocalAddr satisfies net.Conn interface
+func (s *Stream) LocalAddr() net.Addr {
+ if ts, ok := s.sess.conn.(interface {
+ LocalAddr() net.Addr
+ }); ok {
+ return ts.LocalAddr()
+ }
+ return nil
+}
+
+// RemoteAddr satisfies net.Conn interface
+func (s *Stream) RemoteAddr() net.Addr {
+ if ts, ok := s.sess.conn.(interface {
+ RemoteAddr() net.Addr
+ }); ok {
+ return ts.RemoteAddr()
+ }
+ return nil
+}
+
+// pushBytes append buf to buffers
+func (s *Stream) pushBytes(buf []byte) (written int, err error) {
+ s.bufferLock.Lock()
+ s.buffers = append(s.buffers, buf)
+ s.heads = append(s.heads, buf)
+ s.bufferLock.Unlock()
+ return
+}
+
+// recycleTokens transform remaining bytes to tokens(will truncate buffer)
+func (s *Stream) recycleTokens() (n int) {
+ s.bufferLock.Lock()
+ for k := range s.buffers {
+ n += len(s.buffers[k])
+ defaultAllocator.Put(s.heads[k])
+ }
+ s.buffers = nil
+ s.heads = nil
+ s.bufferLock.Unlock()
+ return
+}
+
+// notify read event
+func (s *Stream) notifyReadEvent() {
+ select {
+ case s.chReadEvent <- struct{}{}:
+ default:
+ }
+}
+
+// update command
+func (s *Stream) update(consumed uint32, window uint32) {
+ atomic.StoreUint32(&s.peerConsumed, consumed)
+ atomic.StoreUint32(&s.peerWindow, window)
+ select {
+ case s.chUpdate <- struct{}{}:
+ default:
+ }
+}
+
+// mark this stream has been closed in protocol
+func (s *Stream) fin() {
+ s.finEventOnce.Do(func() {
+ close(s.chFinEvent)
+ })
+}