diff options
author | kali kaneko (leap communications) <kali@leap.se> | 2021-11-29 01:46:27 +0100 |
---|---|---|
committer | kali kaneko (leap communications) <kali@leap.se> | 2021-11-29 18:14:16 +0100 |
commit | 18f52af5be3a9a0c73811706108f790d65ee9c67 (patch) | |
tree | e13cbacb47d56919caa9c44a2b45dec1497a7860 /vendor/github.com/xtaci/smux | |
parent | ebcef0d57b6ecb5a40c6579f6be07182dd3033ba (diff) |
[pkg] update vendor
Diffstat (limited to 'vendor/github.com/xtaci/smux')
-rw-r--r-- | vendor/github.com/xtaci/smux/.gitignore | 24 | ||||
-rw-r--r-- | vendor/github.com/xtaci/smux/.travis.yml | 17 | ||||
-rw-r--r-- | vendor/github.com/xtaci/smux/LICENSE | 21 | ||||
-rw-r--r-- | vendor/github.com/xtaci/smux/README.md | 136 | ||||
-rw-r--r-- | vendor/github.com/xtaci/smux/alloc.go | 72 | ||||
-rw-r--r-- | vendor/github.com/xtaci/smux/curve.jpg | bin | 0 -> 106626 bytes | |||
-rw-r--r-- | vendor/github.com/xtaci/smux/frame.go | 81 | ||||
-rw-r--r-- | vendor/github.com/xtaci/smux/go.mod | 3 | ||||
-rw-r--r-- | vendor/github.com/xtaci/smux/go.sum | 0 | ||||
-rw-r--r-- | vendor/github.com/xtaci/smux/mux.go | 110 | ||||
-rw-r--r-- | vendor/github.com/xtaci/smux/mux.jpg | bin | 0 -> 6199 bytes | |||
-rw-r--r-- | vendor/github.com/xtaci/smux/session.go | 525 | ||||
-rw-r--r-- | vendor/github.com/xtaci/smux/shaper.go | 16 | ||||
-rw-r--r-- | vendor/github.com/xtaci/smux/smux.png | bin | 0 -> 9891 bytes | |||
-rw-r--r-- | vendor/github.com/xtaci/smux/stream.go | 549 |
15 files changed, 1554 insertions, 0 deletions
diff --git a/vendor/github.com/xtaci/smux/.gitignore b/vendor/github.com/xtaci/smux/.gitignore new file mode 100644 index 0000000..daf913b --- /dev/null +++ b/vendor/github.com/xtaci/smux/.gitignore @@ -0,0 +1,24 @@ +# Compiled Object files, Static and Dynamic libs (Shared Objects) +*.o +*.a +*.so + +# Folders +_obj +_test + +# Architecture specific extensions/prefixes +*.[568vq] +[568vq].out + +*.cgo1.go +*.cgo2.c +_cgo_defun.c +_cgo_gotypes.go +_cgo_export.* + +_testmain.go + +*.exe +*.test +*.prof diff --git a/vendor/github.com/xtaci/smux/.travis.yml b/vendor/github.com/xtaci/smux/.travis.yml new file mode 100644 index 0000000..e1d30fa --- /dev/null +++ b/vendor/github.com/xtaci/smux/.travis.yml @@ -0,0 +1,17 @@ +language: go +go: + - 1.9.x + - 1.10.x + - 1.11.x + +before_install: + - go get -t -v ./... + +install: + - go get github.com/xtaci/smux + +script: + - go test -coverprofile=coverage.txt -covermode=atomic -bench . + +after_success: + - bash <(curl -s https://codecov.io/bash) diff --git a/vendor/github.com/xtaci/smux/LICENSE b/vendor/github.com/xtaci/smux/LICENSE new file mode 100644 index 0000000..eed41ac --- /dev/null +++ b/vendor/github.com/xtaci/smux/LICENSE @@ -0,0 +1,21 @@ +MIT License + +Copyright (c) 2016-2017 Daniel Fu + +Permission is hereby granted, free of charge, to any person obtaining a copy +of this software and associated documentation files (the "Software"), to deal +in the Software without restriction, including without limitation the rights +to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +copies of the Software, and to permit persons to whom the Software is +furnished to do so, subject to the following conditions: + +The above copyright notice and this permission notice shall be included in all +copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +SOFTWARE. diff --git a/vendor/github.com/xtaci/smux/README.md b/vendor/github.com/xtaci/smux/README.md new file mode 100644 index 0000000..a5c4680 --- /dev/null +++ b/vendor/github.com/xtaci/smux/README.md @@ -0,0 +1,136 @@ +<img src="smux.png" alt="smux" height="35px" /> + +[![GoDoc][1]][2] [![MIT licensed][3]][4] [![Build Status][5]][6] [![Go Report Card][7]][8] [![Coverage Statusd][9]][10] [![Sourcegraph][11]][12] + +<img src="mux.jpg" alt="smux" height="120px" /> + +[1]: https://godoc.org/github.com/xtaci/smux?status.svg +[2]: https://godoc.org/github.com/xtaci/smux +[3]: https://img.shields.io/badge/license-MIT-blue.svg +[4]: LICENSE +[5]: https://travis-ci.org/xtaci/smux.svg?branch=master +[6]: https://travis-ci.org/xtaci/smux +[7]: https://goreportcard.com/badge/github.com/xtaci/smux +[8]: https://goreportcard.com/report/github.com/xtaci/smux +[9]: https://codecov.io/gh/xtaci/smux/branch/master/graph/badge.svg +[10]: https://codecov.io/gh/xtaci/smux +[11]: https://sourcegraph.com/github.com/xtaci/smux/-/badge.svg +[12]: https://sourcegraph.com/github.com/xtaci/smux?badge + +## Introduction + +Smux ( **S**imple **MU**ltiple**X**ing) is a multiplexing library for Golang. It relies on an underlying connection to provide reliability and ordering, such as TCP or [KCP](https://github.com/xtaci/kcp-go), and provides stream-oriented multiplexing. The original intention of this library is to power the connection management for [kcp-go](https://github.com/xtaci/kcp-go). + +## Features + +1. ***Token bucket*** controlled receiving, which provides smoother bandwidth graph(see picture below). +2. Session-wide receive buffer, shared among streams, **fully controlled** overall memory usage. +3. Minimized header(8Bytes), maximized payload. +4. Well-tested on millions of devices in [kcptun](https://github.com/xtaci/kcptun). +5. Builtin fair queue traffic shaping. +6. Per-stream sliding window to control congestion.(protocol version 2+). + +![smooth bandwidth curve](curve.jpg) + +## Documentation + +For complete documentation, see the associated [Godoc](https://godoc.org/github.com/xtaci/smux). + +## Benchmark +``` +$ go test -v -run=^$ -bench . +goos: darwin +goarch: amd64 +pkg: github.com/xtaci/smux +BenchmarkMSB-4 30000000 51.8 ns/op +BenchmarkAcceptClose-4 50000 36783 ns/op +BenchmarkConnSmux-4 30000 58335 ns/op 2246.88 MB/s 1208 B/op 19 allocs/op +BenchmarkConnTCP-4 50000 25579 ns/op 5124.04 MB/s 0 B/op 0 allocs/op +PASS +ok github.com/xtaci/smux 7.811s +``` + +## Specification + +``` +VERSION(1B) | CMD(1B) | LENGTH(2B) | STREAMID(4B) | DATA(LENGTH) + +VALUES FOR LATEST VERSION: +VERSION: + 1/2 + +CMD: + cmdSYN(0) + cmdFIN(1) + cmdPSH(2) + cmdNOP(3) + cmdUPD(4) // only supported on version 2 + +STREAMID: + client use odd numbers starts from 1 + server use even numbers starts from 0 + +cmdUPD: + | CONSUMED(4B) | WINDOW(4B) | +``` + +## Usage + +```go + +func client() { + // Get a TCP connection + conn, err := net.Dial(...) + if err != nil { + panic(err) + } + + // Setup client side of smux + session, err := smux.Client(conn, nil) + if err != nil { + panic(err) + } + + // Open a new stream + stream, err := session.OpenStream() + if err != nil { + panic(err) + } + + // Stream implements io.ReadWriteCloser + stream.Write([]byte("ping")) + stream.Close() + session.Close() +} + +func server() { + // Accept a TCP connection + conn, err := listener.Accept() + if err != nil { + panic(err) + } + + // Setup server side of smux + session, err := smux.Server(conn, nil) + if err != nil { + panic(err) + } + + // Accept a stream + stream, err := session.AcceptStream() + if err != nil { + panic(err) + } + + // Listen for a message + buf := make([]byte, 4) + stream.Read(buf) + stream.Close() + session.Close() +} + +``` + +## Status + +Stable diff --git a/vendor/github.com/xtaci/smux/alloc.go b/vendor/github.com/xtaci/smux/alloc.go new file mode 100644 index 0000000..9e3acf3 --- /dev/null +++ b/vendor/github.com/xtaci/smux/alloc.go @@ -0,0 +1,72 @@ +package smux + +import ( + "errors" + "sync" +) + +var ( + defaultAllocator *Allocator + debruijinPos = [...]byte{0, 9, 1, 10, 13, 21, 2, 29, 11, 14, 16, 18, 22, 25, 3, 30, 8, 12, 20, 28, 15, 17, 24, 7, 19, 27, 23, 6, 26, 5, 4, 31} +) + +func init() { + defaultAllocator = NewAllocator() +} + +// Allocator for incoming frames, optimized to prevent overwriting after zeroing +type Allocator struct { + buffers []sync.Pool +} + +// NewAllocator initiates a []byte allocator for frames less than 65536 bytes, +// the waste(memory fragmentation) of space allocation is guaranteed to be +// no more than 50%. +func NewAllocator() *Allocator { + alloc := new(Allocator) + alloc.buffers = make([]sync.Pool, 17) // 1B -> 64K + for k := range alloc.buffers { + i := k + alloc.buffers[k].New = func() interface{} { + return make([]byte, 1<<uint32(i)) + } + } + return alloc +} + +// Get a []byte from pool with most appropriate cap +func (alloc *Allocator) Get(size int) []byte { + if size <= 0 || size > 65536 { + return nil + } + + bits := msb(size) + if size == 1<<bits { + return alloc.buffers[bits].Get().([]byte)[:size] + } else { + return alloc.buffers[bits+1].Get().([]byte)[:size] + } +} + +// Put returns a []byte to pool for future use, +// which the cap must be exactly 2^n +func (alloc *Allocator) Put(buf []byte) error { + bits := msb(cap(buf)) + if cap(buf) == 0 || cap(buf) > 65536 || cap(buf) != 1<<bits { + return errors.New("allocator Put() incorrect buffer size") + } + alloc.buffers[bits].Put(buf) + return nil +} + +// msb return the pos of most significiant bit +// http://supertech.csail.mit.edu/papers/debruijn.pdf +func msb(size int) byte { + v := uint32(size) + v |= v >> 1 + v |= v >> 2 + v |= v >> 4 + v |= v >> 8 + v |= v >> 16 + return debruijinPos[(v*0x07C4ACDD)>>27] +} diff --git a/vendor/github.com/xtaci/smux/curve.jpg b/vendor/github.com/xtaci/smux/curve.jpg Binary files differnew file mode 100644 index 0000000..3fc4863 --- /dev/null +++ b/vendor/github.com/xtaci/smux/curve.jpg diff --git a/vendor/github.com/xtaci/smux/frame.go b/vendor/github.com/xtaci/smux/frame.go new file mode 100644 index 0000000..467a058 --- /dev/null +++ b/vendor/github.com/xtaci/smux/frame.go @@ -0,0 +1,81 @@ +package smux + +import ( + "encoding/binary" + "fmt" +) + +const ( // cmds + // protocol version 1: + cmdSYN byte = iota // stream open + cmdFIN // stream close, a.k.a EOF mark + cmdPSH // data push + cmdNOP // no operation + + // protocol version 2 extra commands + // notify bytes consumed by remote peer-end + cmdUPD +) + +const ( + // data size of cmdUPD, format: + // |4B data consumed(ACK)| 4B window size(WINDOW) | + szCmdUPD = 8 +) + +const ( + // initial peer window guess, a slow-start + initialPeerWindow = 262144 +) + +const ( + sizeOfVer = 1 + sizeOfCmd = 1 + sizeOfLength = 2 + sizeOfSid = 4 + headerSize = sizeOfVer + sizeOfCmd + sizeOfSid + sizeOfLength +) + +// Frame defines a packet from or to be multiplexed into a single connection +type Frame struct { + ver byte + cmd byte + sid uint32 + data []byte +} + +func newFrame(version byte, cmd byte, sid uint32) Frame { + return Frame{ver: version, cmd: cmd, sid: sid} +} + +type rawHeader [headerSize]byte + +func (h rawHeader) Version() byte { + return h[0] +} + +func (h rawHeader) Cmd() byte { + return h[1] +} + +func (h rawHeader) Length() uint16 { + return binary.LittleEndian.Uint16(h[2:]) +} + +func (h rawHeader) StreamID() uint32 { + return binary.LittleEndian.Uint32(h[4:]) +} + +func (h rawHeader) String() string { + return fmt.Sprintf("Version:%d Cmd:%d StreamID:%d Length:%d", + h.Version(), h.Cmd(), h.StreamID(), h.Length()) +} + +type updHeader [szCmdUPD]byte + +func (h updHeader) Consumed() uint32 { + return binary.LittleEndian.Uint32(h[:]) +} +func (h updHeader) Window() uint32 { + return binary.LittleEndian.Uint32(h[4:]) +} diff --git a/vendor/github.com/xtaci/smux/go.mod b/vendor/github.com/xtaci/smux/go.mod new file mode 100644 index 0000000..9ddead6 --- /dev/null +++ b/vendor/github.com/xtaci/smux/go.mod @@ -0,0 +1,3 @@ +module github.com/xtaci/smux + +go 1.13 diff --git a/vendor/github.com/xtaci/smux/go.sum b/vendor/github.com/xtaci/smux/go.sum new file mode 100644 index 0000000..e69de29 --- /dev/null +++ b/vendor/github.com/xtaci/smux/go.sum diff --git a/vendor/github.com/xtaci/smux/mux.go b/vendor/github.com/xtaci/smux/mux.go new file mode 100644 index 0000000..c0b8ab8 --- /dev/null +++ b/vendor/github.com/xtaci/smux/mux.go @@ -0,0 +1,110 @@ +// Package smux is a multiplexing library for Golang. +// +// It relies on an underlying connection to provide reliability and ordering, such as TCP or KCP, +// and provides stream-oriented multiplexing over a single channel. +package smux + +import ( + "errors" + "fmt" + "io" + "math" + "time" +) + +// Config is used to tune the Smux session +type Config struct { + // SMUX Protocol version, support 1,2 + Version int + + // Disabled keepalive + KeepAliveDisabled bool + + // KeepAliveInterval is how often to send a NOP command to the remote + KeepAliveInterval time.Duration + + // KeepAliveTimeout is how long the session + // will be closed if no data has arrived + KeepAliveTimeout time.Duration + + // MaxFrameSize is used to control the maximum + // frame size to sent to the remote + MaxFrameSize int + + // MaxReceiveBuffer is used to control the maximum + // number of data in the buffer pool + MaxReceiveBuffer int + + // MaxStreamBuffer is used to control the maximum + // number of data per stream + MaxStreamBuffer int +} + +// DefaultConfig is used to return a default configuration +func DefaultConfig() *Config { + return &Config{ + Version: 1, + KeepAliveInterval: 10 * time.Second, + KeepAliveTimeout: 30 * time.Second, + MaxFrameSize: 32768, + MaxReceiveBuffer: 4194304, + MaxStreamBuffer: 65536, + } +} + +// VerifyConfig is used to verify the sanity of configuration +func VerifyConfig(config *Config) error { + if !(config.Version == 1 || config.Version == 2) { + return errors.New("unsupported protocol version") + } + if !config.KeepAliveDisabled { + if config.KeepAliveInterval == 0 { + return errors.New("keep-alive interval must be positive") + } + if config.KeepAliveTimeout < config.KeepAliveInterval { + return fmt.Errorf("keep-alive timeout must be larger than keep-alive interval") + } + } + if config.MaxFrameSize <= 0 { + return errors.New("max frame size must be positive") + } + if config.MaxFrameSize > 65535 { + return errors.New("max frame size must not be larger than 65535") + } + if config.MaxReceiveBuffer <= 0 { + return errors.New("max receive buffer must be positive") + } + if config.MaxStreamBuffer <= 0 { + return errors.New("max stream buffer must be positive") + } + if config.MaxStreamBuffer > config.MaxReceiveBuffer { + return errors.New("max stream buffer must not be larger than max receive buffer") + } + if config.MaxStreamBuffer > math.MaxInt32 { + return errors.New("max stream buffer cannot be larger than 2147483647") + } + return nil +} + +// Server is used to initialize a new server-side connection. +func Server(conn io.ReadWriteCloser, config *Config) (*Session, error) { + if config == nil { + config = DefaultConfig() + } + if err := VerifyConfig(config); err != nil { + return nil, err + } + return newSession(config, conn, false), nil +} + +// Client is used to initialize a new client-side connection. +func Client(conn io.ReadWriteCloser, config *Config) (*Session, error) { + if config == nil { + config = DefaultConfig() + } + + if err := VerifyConfig(config); err != nil { + return nil, err + } + return newSession(config, conn, true), nil +} diff --git a/vendor/github.com/xtaci/smux/mux.jpg b/vendor/github.com/xtaci/smux/mux.jpg Binary files differnew file mode 100644 index 0000000..dde2e11 --- /dev/null +++ b/vendor/github.com/xtaci/smux/mux.jpg diff --git a/vendor/github.com/xtaci/smux/session.go b/vendor/github.com/xtaci/smux/session.go new file mode 100644 index 0000000..bc56066 --- /dev/null +++ b/vendor/github.com/xtaci/smux/session.go @@ -0,0 +1,525 @@ +package smux + +import ( + "container/heap" + "encoding/binary" + "errors" + "io" + "net" + "sync" + "sync/atomic" + "time" +) + +const ( + defaultAcceptBacklog = 1024 +) + +var ( + ErrInvalidProtocol = errors.New("invalid protocol") + ErrConsumed = errors.New("peer consumed more than sent") + ErrGoAway = errors.New("stream id overflows, should start a new connection") + ErrTimeout = errors.New("timeout") + ErrWouldBlock = errors.New("operation would block on IO") +) + +type writeRequest struct { + prio uint64 + frame Frame + result chan writeResult +} + +type writeResult struct { + n int + err error +} + +type buffersWriter interface { + WriteBuffers(v [][]byte) (n int, err error) +} + +// Session defines a multiplexed connection for streams +type Session struct { + conn io.ReadWriteCloser + + config *Config + nextStreamID uint32 // next stream identifier + nextStreamIDLock sync.Mutex + + bucket int32 // token bucket + bucketNotify chan struct{} // used for waiting for tokens + + streams map[uint32]*Stream // all streams in this session + streamLock sync.Mutex // locks streams + + die chan struct{} // flag session has died + dieOnce sync.Once + + // socket error handling + socketReadError atomic.Value + socketWriteError atomic.Value + chSocketReadError chan struct{} + chSocketWriteError chan struct{} + socketReadErrorOnce sync.Once + socketWriteErrorOnce sync.Once + + // smux protocol errors + protoError atomic.Value + chProtoError chan struct{} + protoErrorOnce sync.Once + + chAccepts chan *Stream + + dataReady int32 // flag data has arrived + + goAway int32 // flag id exhausted + + deadline atomic.Value + + shaper chan writeRequest // a shaper for writing + writes chan writeRequest +} + +func newSession(config *Config, conn io.ReadWriteCloser, client bool) *Session { + s := new(Session) + s.die = make(chan struct{}) + s.conn = conn + s.config = config + s.streams = make(map[uint32]*Stream) + s.chAccepts = make(chan *Stream, defaultAcceptBacklog) + s.bucket = int32(config.MaxReceiveBuffer) + s.bucketNotify = make(chan struct{}, 1) + s.shaper = make(chan writeRequest) + s.writes = make(chan writeRequest) + s.chSocketReadError = make(chan struct{}) + s.chSocketWriteError = make(chan struct{}) + s.chProtoError = make(chan struct{}) + + if client { + s.nextStreamID = 1 + } else { + s.nextStreamID = 0 + } + + go s.shaperLoop() + go s.recvLoop() + go s.sendLoop() + if !config.KeepAliveDisabled { + go s.keepalive() + } + return s +} + +// OpenStream is used to create a new stream +func (s *Session) OpenStream() (*Stream, error) { + if s.IsClosed() { + return nil, io.ErrClosedPipe + } + + // generate stream id + s.nextStreamIDLock.Lock() + if s.goAway > 0 { + s.nextStreamIDLock.Unlock() + return nil, ErrGoAway + } + + s.nextStreamID += 2 + sid := s.nextStreamID + if sid == sid%2 { // stream-id overflows + s.goAway = 1 + s.nextStreamIDLock.Unlock() + return nil, ErrGoAway + } + s.nextStreamIDLock.Unlock() + + stream := newStream(sid, s.config.MaxFrameSize, s) + + if _, err := s.writeFrame(newFrame(byte(s.config.Version), cmdSYN, sid)); err != nil { + return nil, err + } + + s.streamLock.Lock() + defer s.streamLock.Unlock() + select { + case <-s.chSocketReadError: + return nil, s.socketReadError.Load().(error) + case <-s.chSocketWriteError: + return nil, s.socketWriteError.Load().(error) + case <-s.die: + return nil, io.ErrClosedPipe + default: + s.streams[sid] = stream + return stream, nil + } +} + +// Open returns a generic ReadWriteCloser +func (s *Session) Open() (io.ReadWriteCloser, error) { + return s.OpenStream() +} + +// AcceptStream is used to block until the next available stream +// is ready to be accepted. +func (s *Session) AcceptStream() (*Stream, error) { + var deadline <-chan time.Time + if d, ok := s.deadline.Load().(time.Time); ok && !d.IsZero() { + timer := time.NewTimer(time.Until(d)) + defer timer.Stop() + deadline = timer.C + } + + select { + case stream := <-s.chAccepts: + return stream, nil + case <-deadline: + return nil, ErrTimeout + case <-s.chSocketReadError: + return nil, s.socketReadError.Load().(error) + case <-s.chProtoError: + return nil, s.protoError.Load().(error) + case <-s.die: + return nil, io.ErrClosedPipe + } +} + +// Accept Returns a generic ReadWriteCloser instead of smux.Stream +func (s *Session) Accept() (io.ReadWriteCloser, error) { + return s.AcceptStream() +} + +// Close is used to close the session and all streams. +func (s *Session) Close() error { + var once bool + s.dieOnce.Do(func() { + close(s.die) + once = true + }) + + if once { + s.streamLock.Lock() + for k := range s.streams { + s.streams[k].sessionClose() + } + s.streamLock.Unlock() + return s.conn.Close() + } else { + return io.ErrClosedPipe + } +} + +// notifyBucket notifies recvLoop that bucket is available +func (s *Session) notifyBucket() { + select { + case s.bucketNotify <- struct{}{}: + default: + } +} + +func (s *Session) notifyReadError(err error) { + s.socketReadErrorOnce.Do(func() { + s.socketReadError.Store(err) + close(s.chSocketReadError) + }) +} + +func (s *Session) notifyWriteError(err error) { + s.socketWriteErrorOnce.Do(func() { + s.socketWriteError.Store(err) + close(s.chSocketWriteError) + }) +} + +func (s *Session) notifyProtoError(err error) { + s.protoErrorOnce.Do(func() { + s.protoError.Store(err) + close(s.chProtoError) + }) +} + +// IsClosed does a safe check to see if we have shutdown +func (s *Session) IsClosed() bool { + select { + case <-s.die: + return true + default: + return false + } +} + +// NumStreams returns the number of currently open streams +func (s *Session) NumStreams() int { + if s.IsClosed() { + return 0 + } + s.streamLock.Lock() + defer s.streamLock.Unlock() + return len(s.streams) +} + +// SetDeadline sets a deadline used by Accept* calls. +// A zero time value disables the deadline. +func (s *Session) SetDeadline(t time.Time) error { + s.deadline.Store(t) + return nil +} + +// LocalAddr satisfies net.Conn interface +func (s *Session) LocalAddr() net.Addr { + if ts, ok := s.conn.(interface { + LocalAddr() net.Addr + }); ok { + return ts.LocalAddr() + } + return nil +} + +// RemoteAddr satisfies net.Conn interface +func (s *Session) RemoteAddr() net.Addr { + if ts, ok := s.conn.(interface { + RemoteAddr() net.Addr + }); ok { + return ts.RemoteAddr() + } + return nil +} + +// notify the session that a stream has closed +func (s *Session) streamClosed(sid uint32) { + s.streamLock.Lock() + if n := s.streams[sid].recycleTokens(); n > 0 { // return remaining tokens to the bucket + if atomic.AddInt32(&s.bucket, int32(n)) > 0 { + s.notifyBucket() + } + } + delete(s.streams, sid) + s.streamLock.Unlock() +} + +// returnTokens is called by stream to return token after read +func (s *Session) returnTokens(n int) { + if atomic.AddInt32(&s.bucket, int32(n)) > 0 { + s.notifyBucket() + } +} + +// recvLoop keeps on reading from underlying connection if tokens are available +func (s *Session) recvLoop() { + var hdr rawHeader + var updHdr updHeader + + for { + for atomic.LoadInt32(&s.bucket) <= 0 && !s.IsClosed() { + select { + case <-s.bucketNotify: + case <-s.die: + return + } + } + + // read header first + if _, err := io.ReadFull(s.conn, hdr[:]); err == nil { + atomic.StoreInt32(&s.dataReady, 1) + if hdr.Version() != byte(s.config.Version) { + s.notifyProtoError(ErrInvalidProtocol) + return + } + sid := hdr.StreamID() + switch hdr.Cmd() { + case cmdNOP: + case cmdSYN: + s.streamLock.Lock() + if _, ok := s.streams[sid]; !ok { + stream := newStream(sid, s.config.MaxFrameSize, s) + s.streams[sid] = stream + select { + case s.chAccepts <- stream: + case <-s.die: + } + } + s.streamLock.Unlock() + case cmdFIN: + s.streamLock.Lock() + if stream, ok := s.streams[sid]; ok { + stream.fin() + stream.notifyReadEvent() + } + s.streamLock.Unlock() + case cmdPSH: + if hdr.Length() > 0 { + newbuf := defaultAllocator.Get(int(hdr.Length())) + if written, err := io.ReadFull(s.conn, newbuf); err == nil { + s.streamLock.Lock() + if stream, ok := s.streams[sid]; ok { + stream.pushBytes(newbuf) + atomic.AddInt32(&s.bucket, -int32(written)) + stream.notifyReadEvent() + } + s.streamLock.Unlock() + } else { + s.notifyReadError(err) + return + } + } + case cmdUPD: + if _, err := io.ReadFull(s.conn, updHdr[:]); err == nil { + s.streamLock.Lock() + if stream, ok := s.streams[sid]; ok { + stream.update(updHdr.Consumed(), updHdr.Window()) + } + s.streamLock.Unlock() + } else { + s.notifyReadError(err) + return + } + default: + s.notifyProtoError(ErrInvalidProtocol) + return + } + } else { + s.notifyReadError(err) + return + } + } +} + +func (s *Session) keepalive() { + tickerPing := time.NewTicker(s.config.KeepAliveInterval) + tickerTimeout := time.NewTicker(s.config.KeepAliveTimeout) + defer tickerPing.Stop() + defer tickerTimeout.Stop() + for { + select { + case <-tickerPing.C: + s.writeFrameInternal(newFrame(byte(s.config.Version), cmdNOP, 0), tickerPing.C, 0) + s.notifyBucket() // force a signal to the recvLoop + case <-tickerTimeout.C: + if !atomic.CompareAndSwapInt32(&s.dataReady, 1, 0) { + // recvLoop may block while bucket is 0, in this case, + // session should not be closed. + if atomic.LoadInt32(&s.bucket) > 0 { + s.Close() + return + } + } + case <-s.die: + return + } + } +} + +// shaper shapes the sending sequence among streams +func (s *Session) shaperLoop() { + var reqs shaperHeap + var next writeRequest + var chWrite chan writeRequest + + for { + if len(reqs) > 0 { + chWrite = s.writes + next = heap.Pop(&reqs).(writeRequest) + } else { + chWrite = nil + } + + select { + case <-s.die: + return + case r := <-s.shaper: + if chWrite != nil { // next is valid, reshape + heap.Push(&reqs, next) + } + heap.Push(&reqs, r) + case chWrite <- next: + } + } +} + +func (s *Session) sendLoop() { + var buf []byte + var n int + var err error + var vec [][]byte // vector for writeBuffers + + bw, ok := s.conn.(buffersWriter) + if ok { + buf = make([]byte, headerSize) + vec = make([][]byte, 2) + } else { + buf = make([]byte, (1<<16)+headerSize) + } + + for { + select { + case <-s.die: + return + case request := <-s.writes: + buf[0] = request.frame.ver + buf[1] = request.frame.cmd + binary.LittleEndian.PutUint16(buf[2:], uint16(len(request.frame.data))) + binary.LittleEndian.PutUint32(buf[4:], request.frame.sid) + + if len(vec) > 0 { + vec[0] = buf[:headerSize] + vec[1] = request.frame.data + n, err = bw.WriteBuffers(vec) + } else { + copy(buf[headerSize:], request.frame.data) + n, err = s.conn.Write(buf[:headerSize+len(request.frame.data)]) + } + + n -= headerSize + if n < 0 { + n = 0 + } + + result := writeResult{ + n: n, + err: err, + } + + request.result <- result + close(request.result) + + // store conn error + if err != nil { + s.notifyWriteError(err) + return + } + } + } +} + +// writeFrame writes the frame to the underlying connection +// and returns the number of bytes written if successful +func (s *Session) writeFrame(f Frame) (n int, err error) { + return s.writeFrameInternal(f, nil, 0) +} + +// internal writeFrame version to support deadline used in keepalive +func (s *Session) writeFrameInternal(f Frame, deadline <-chan time.Time, prio uint64) (int, error) { + req := writeRequest{ + prio: prio, + frame: f, + result: make(chan writeResult, 1), + } + select { + case s.shaper <- req: + case <-s.die: + return 0, io.ErrClosedPipe + case <-s.chSocketWriteError: + return 0, s.socketWriteError.Load().(error) + case <-deadline: + return 0, ErrTimeout + } + + select { + case result := <-req.result: + return result.n, result.err + case <-s.die: + return 0, io.ErrClosedPipe + case <-s.chSocketWriteError: + return 0, s.socketWriteError.Load().(error) + case <-deadline: + return 0, ErrTimeout + } +} diff --git a/vendor/github.com/xtaci/smux/shaper.go b/vendor/github.com/xtaci/smux/shaper.go new file mode 100644 index 0000000..be03406 --- /dev/null +++ b/vendor/github.com/xtaci/smux/shaper.go @@ -0,0 +1,16 @@ +package smux + +type shaperHeap []writeRequest + +func (h shaperHeap) Len() int { return len(h) } +func (h shaperHeap) Less(i, j int) bool { return h[i].prio < h[j].prio } +func (h shaperHeap) Swap(i, j int) { h[i], h[j] = h[j], h[i] } +func (h *shaperHeap) Push(x interface{}) { *h = append(*h, x.(writeRequest)) } + +func (h *shaperHeap) Pop() interface{} { + old := *h + n := len(old) + x := old[n-1] + *h = old[0 : n-1] + return x +} diff --git a/vendor/github.com/xtaci/smux/smux.png b/vendor/github.com/xtaci/smux/smux.png Binary files differnew file mode 100644 index 0000000..26aba3b --- /dev/null +++ b/vendor/github.com/xtaci/smux/smux.png diff --git a/vendor/github.com/xtaci/smux/stream.go b/vendor/github.com/xtaci/smux/stream.go new file mode 100644 index 0000000..6c9499c --- /dev/null +++ b/vendor/github.com/xtaci/smux/stream.go @@ -0,0 +1,549 @@ +package smux + +import ( + "encoding/binary" + "io" + "net" + "sync" + "sync/atomic" + "time" +) + +// Stream implements net.Conn +type Stream struct { + id uint32 + sess *Session + + buffers [][]byte + heads [][]byte // slice heads kept for recycle + + bufferLock sync.Mutex + frameSize int + + // notify a read event + chReadEvent chan struct{} + + // flag the stream has closed + die chan struct{} + dieOnce sync.Once + + // FIN command + chFinEvent chan struct{} + finEventOnce sync.Once + + // deadlines + readDeadline atomic.Value + writeDeadline atomic.Value + + // per stream sliding window control + numRead uint32 // number of consumed bytes + numWritten uint32 // count num of bytes written + incr uint32 // counting for sending + + // UPD command + peerConsumed uint32 // num of bytes the peer has consumed + peerWindow uint32 // peer window, initialized to 256KB, updated by peer + chUpdate chan struct{} // notify of remote data consuming and window update +} + +// newStream initiates a Stream struct +func newStream(id uint32, frameSize int, sess *Session) *Stream { + s := new(Stream) + s.id = id + s.chReadEvent = make(chan struct{}, 1) + s.chUpdate = make(chan struct{}, 1) + s.frameSize = frameSize + s.sess = sess + s.die = make(chan struct{}) + s.chFinEvent = make(chan struct{}) + s.peerWindow = initialPeerWindow // set to initial window size + return s +} + +// ID returns the unique stream ID. +func (s *Stream) ID() uint32 { + return s.id +} + +// Read implements net.Conn +func (s *Stream) Read(b []byte) (n int, err error) { + for { + n, err = s.tryRead(b) + if err == ErrWouldBlock { + if ew := s.waitRead(); ew != nil { + return 0, ew + } + } else { + return n, err + } + } +} + +// tryRead is the nonblocking version of Read +func (s *Stream) tryRead(b []byte) (n int, err error) { + if s.sess.config.Version == 2 { + return s.tryReadv2(b) + } + + if len(b) == 0 { + return 0, nil + } + + s.bufferLock.Lock() + if len(s.buffers) > 0 { + n = copy(b, s.buffers[0]) + s.buffers[0] = s.buffers[0][n:] + if len(s.buffers[0]) == 0 { + s.buffers[0] = nil + s.buffers = s.buffers[1:] + // full recycle + defaultAllocator.Put(s.heads[0]) + s.heads = s.heads[1:] + } + } + s.bufferLock.Unlock() + + if n > 0 { + s.sess.returnTokens(n) + return n, nil + } + + select { + case <-s.die: + return 0, io.EOF + default: + return 0, ErrWouldBlock + } +} + +func (s *Stream) tryReadv2(b []byte) (n int, err error) { + if len(b) == 0 { + return 0, nil + } + + var notifyConsumed uint32 + s.bufferLock.Lock() + if len(s.buffers) > 0 { + n = copy(b, s.buffers[0]) + s.buffers[0] = s.buffers[0][n:] + if len(s.buffers[0]) == 0 { + s.buffers[0] = nil + s.buffers = s.buffers[1:] + // full recycle + defaultAllocator.Put(s.heads[0]) + s.heads = s.heads[1:] + } + } + + // in an ideal environment: + // if more than half of buffer has consumed, send read ack to peer + // based on round-trip time of ACK, continous flowing data + // won't slow down because of waiting for ACK, as long as the + // consumer keeps on reading data + // s.numRead == n also notify window at the first read + s.numRead += uint32(n) + s.incr += uint32(n) + if s.incr >= uint32(s.sess.config.MaxStreamBuffer/2) || s.numRead == uint32(n) { + notifyConsumed = s.numRead + s.incr = 0 + } + s.bufferLock.Unlock() + + if n > 0 { + s.sess.returnTokens(n) + if notifyConsumed > 0 { + err := s.sendWindowUpdate(notifyConsumed) + return n, err + } else { + return n, nil + } + } + + select { + case <-s.die: + return 0, io.EOF + default: + return 0, ErrWouldBlock + } +} + +// WriteTo implements io.WriteTo +func (s *Stream) WriteTo(w io.Writer) (n int64, err error) { + if s.sess.config.Version == 2 { + return s.writeTov2(w) + } + + for { + var buf []byte + s.bufferLock.Lock() + if len(s.buffers) > 0 { + buf = s.buffers[0] + s.buffers = s.buffers[1:] + s.heads = s.heads[1:] + } + s.bufferLock.Unlock() + + if buf != nil { + nw, ew := w.Write(buf) + s.sess.returnTokens(len(buf)) + defaultAllocator.Put(buf) + if nw > 0 { + n += int64(nw) + } + + if ew != nil { + return n, ew + } + } else if ew := s.waitRead(); ew != nil { + return n, ew + } + } +} + +func (s *Stream) writeTov2(w io.Writer) (n int64, err error) { + for { + var notifyConsumed uint32 + var buf []byte + s.bufferLock.Lock() + if len(s.buffers) > 0 { + buf = s.buffers[0] + s.buffers = s.buffers[1:] + s.heads = s.heads[1:] + } + s.numRead += uint32(len(buf)) + s.incr += uint32(len(buf)) + if s.incr >= uint32(s.sess.config.MaxStreamBuffer/2) || s.numRead == uint32(len(buf)) { + notifyConsumed = s.numRead + s.incr = 0 + } + s.bufferLock.Unlock() + + if buf != nil { + nw, ew := w.Write(buf) + s.sess.returnTokens(len(buf)) + defaultAllocator.Put(buf) + if nw > 0 { + n += int64(nw) + } + + if ew != nil { + return n, ew + } + + if notifyConsumed > 0 { + if err := s.sendWindowUpdate(notifyConsumed); err != nil { + return n, err + } + } + } else if ew := s.waitRead(); ew != nil { + return n, ew + } + } +} + +func (s *Stream) sendWindowUpdate(consumed uint32) error { + var timer *time.Timer + var deadline <-chan time.Time + if d, ok := s.readDeadline.Load().(time.Time); ok && !d.IsZero() { + timer = time.NewTimer(time.Until(d)) + defer timer.Stop() + deadline = timer.C + } + + frame := newFrame(byte(s.sess.config.Version), cmdUPD, s.id) + var hdr updHeader + binary.LittleEndian.PutUint32(hdr[:], consumed) + binary.LittleEndian.PutUint32(hdr[4:], uint32(s.sess.config.MaxStreamBuffer)) + frame.data = hdr[:] + _, err := s.sess.writeFrameInternal(frame, deadline, 0) + return err +} + +func (s *Stream) waitRead() error { + var timer *time.Timer + var deadline <-chan time.Time + if d, ok := s.readDeadline.Load().(time.Time); ok && !d.IsZero() { + timer = time.NewTimer(time.Until(d)) + defer timer.Stop() + deadline = timer.C + } + + select { + case <-s.chReadEvent: + return nil + case <-s.chFinEvent: + // BUG(xtaci): Fix for https://github.com/xtaci/smux/issues/82 + s.bufferLock.Lock() + defer s.bufferLock.Unlock() + if len(s.buffers) > 0 { + return nil + } + return io.EOF + case <-s.sess.chSocketReadError: + return s.sess.socketReadError.Load().(error) + case <-s.sess.chProtoError: + return s.sess.protoError.Load().(error) + case <-deadline: + return ErrTimeout + case <-s.die: + return io.ErrClosedPipe + } + +} + +// Write implements net.Conn +// +// Note that the behavior when multiple goroutines write concurrently is not deterministic, +// frames may interleave in random way. +func (s *Stream) Write(b []byte) (n int, err error) { + if s.sess.config.Version == 2 { + return s.writeV2(b) + } + + var deadline <-chan time.Time + if d, ok := s.writeDeadline.Load().(time.Time); ok && !d.IsZero() { + timer := time.NewTimer(time.Until(d)) + defer timer.Stop() + deadline = timer.C + } + + // check if stream has closed + select { + case <-s.die: + return 0, io.ErrClosedPipe + default: + } + + // frame split and transmit + sent := 0 + frame := newFrame(byte(s.sess.config.Version), cmdPSH, s.id) + bts := b + for len(bts) > 0 { + sz := len(bts) + if sz > s.frameSize { + sz = s.frameSize + } + frame.data = bts[:sz] + bts = bts[sz:] + n, err := s.sess.writeFrameInternal(frame, deadline, uint64(s.numWritten)) + s.numWritten++ + sent += n + if err != nil { + return sent, err + } + } + + return sent, nil +} + +func (s *Stream) writeV2(b []byte) (n int, err error) { + // check empty input + if len(b) == 0 { + return 0, nil + } + + // check if stream has closed + select { + case <-s.die: + return 0, io.ErrClosedPipe + default: + } + + // create write deadline timer + var deadline <-chan time.Time + if d, ok := s.writeDeadline.Load().(time.Time); ok && !d.IsZero() { + timer := time.NewTimer(time.Until(d)) + defer timer.Stop() + deadline = timer.C + } + + // frame split and transmit process + sent := 0 + frame := newFrame(byte(s.sess.config.Version), cmdPSH, s.id) + + for { + // per stream sliding window control + // [.... [consumed... numWritten] ... win... ] + // [.... [consumed...................+rmtwnd]] + var bts []byte + // note: + // even if uint32 overflow, this math still works: + // eg1: uint32(0) - uint32(math.MaxUint32) = 1 + // eg2: int32(uint32(0) - uint32(1)) = -1 + // security check for misbehavior + inflight := int32(atomic.LoadUint32(&s.numWritten) - atomic.LoadUint32(&s.peerConsumed)) + if inflight < 0 { + return 0, ErrConsumed + } + + win := int32(atomic.LoadUint32(&s.peerWindow)) - inflight + if win > 0 { + if win > int32(len(b)) { + bts = b + b = nil + } else { + bts = b[:win] + b = b[win:] + } + + for len(bts) > 0 { + sz := len(bts) + if sz > s.frameSize { + sz = s.frameSize + } + frame.data = bts[:sz] + bts = bts[sz:] + n, err := s.sess.writeFrameInternal(frame, deadline, uint64(atomic.LoadUint32(&s.numWritten))) + atomic.AddUint32(&s.numWritten, uint32(sz)) + sent += n + if err != nil { + return sent, err + } + } + } + + // if there is any data remaining to be sent + // wait until stream closes, window changes or deadline reached + // this blocking behavior will inform upper layer to do flow control + if len(b) > 0 { + select { + case <-s.chFinEvent: // if fin arrived, future window update is impossible + return 0, io.EOF + case <-s.die: + return sent, io.ErrClosedPipe + case <-deadline: + return sent, ErrTimeout + case <-s.sess.chSocketWriteError: + return sent, s.sess.socketWriteError.Load().(error) + case <-s.chUpdate: + continue + } + } else { + return sent, nil + } + } +} + +// Close implements net.Conn +func (s *Stream) Close() error { + var once bool + var err error + s.dieOnce.Do(func() { + close(s.die) + once = true + }) + + if once { + _, err = s.sess.writeFrame(newFrame(byte(s.sess.config.Version), cmdFIN, s.id)) + s.sess.streamClosed(s.id) + return err + } else { + return io.ErrClosedPipe + } +} + +// GetDieCh returns a readonly chan which can be readable +// when the stream is to be closed. +func (s *Stream) GetDieCh() <-chan struct{} { + return s.die +} + +// SetReadDeadline sets the read deadline as defined by +// net.Conn.SetReadDeadline. +// A zero time value disables the deadline. +func (s *Stream) SetReadDeadline(t time.Time) error { + s.readDeadline.Store(t) + s.notifyReadEvent() + return nil +} + +// SetWriteDeadline sets the write deadline as defined by +// net.Conn.SetWriteDeadline. +// A zero time value disables the deadline. +func (s *Stream) SetWriteDeadline(t time.Time) error { + s.writeDeadline.Store(t) + return nil +} + +// SetDeadline sets both read and write deadlines as defined by +// net.Conn.SetDeadline. +// A zero time value disables the deadlines. +func (s *Stream) SetDeadline(t time.Time) error { + if err := s.SetReadDeadline(t); err != nil { + return err + } + if err := s.SetWriteDeadline(t); err != nil { + return err + } + return nil +} + +// session closes +func (s *Stream) sessionClose() { s.dieOnce.Do(func() { close(s.die) }) } + +// LocalAddr satisfies net.Conn interface +func (s *Stream) LocalAddr() net.Addr { + if ts, ok := s.sess.conn.(interface { + LocalAddr() net.Addr + }); ok { + return ts.LocalAddr() + } + return nil +} + +// RemoteAddr satisfies net.Conn interface +func (s *Stream) RemoteAddr() net.Addr { + if ts, ok := s.sess.conn.(interface { + RemoteAddr() net.Addr + }); ok { + return ts.RemoteAddr() + } + return nil +} + +// pushBytes append buf to buffers +func (s *Stream) pushBytes(buf []byte) (written int, err error) { + s.bufferLock.Lock() + s.buffers = append(s.buffers, buf) + s.heads = append(s.heads, buf) + s.bufferLock.Unlock() + return +} + +// recycleTokens transform remaining bytes to tokens(will truncate buffer) +func (s *Stream) recycleTokens() (n int) { + s.bufferLock.Lock() + for k := range s.buffers { + n += len(s.buffers[k]) + defaultAllocator.Put(s.heads[k]) + } + s.buffers = nil + s.heads = nil + s.bufferLock.Unlock() + return +} + +// notify read event +func (s *Stream) notifyReadEvent() { + select { + case s.chReadEvent <- struct{}{}: + default: + } +} + +// update command +func (s *Stream) update(consumed uint32, window uint32) { + atomic.StoreUint32(&s.peerConsumed, consumed) + atomic.StoreUint32(&s.peerWindow, window) + select { + case s.chUpdate <- struct{}{}: + default: + } +} + +// mark this stream has been closed in protocol +func (s *Stream) fin() { + s.finEventOnce.Do(func() { + close(s.chFinEvent) + }) +} |