diff options
Diffstat (limited to 'vendor/github.com/xtaci')
41 files changed, 5834 insertions, 0 deletions
diff --git a/vendor/github.com/xtaci/kcp-go/v5/.gitignore b/vendor/github.com/xtaci/kcp-go/v5/.gitignore new file mode 100644 index 0000000..2f4178c --- /dev/null +++ b/vendor/github.com/xtaci/kcp-go/v5/.gitignore @@ -0,0 +1,25 @@ +# Compiled Object files, Static and Dynamic libs (Shared Objects) +*.o +*.a +*.so + +# Folders +_obj +_test +/vendor/ + +# Architecture specific extensions/prefixes +*.[568vq] +[568vq].out + +*.cgo1.go +*.cgo2.c +_cgo_defun.c +_cgo_gotypes.go +_cgo_export.* + +_testmain.go + +*.exe +*.test +*.prof diff --git a/vendor/github.com/xtaci/kcp-go/v5/.travis.yml b/vendor/github.com/xtaci/kcp-go/v5/.travis.yml new file mode 100644 index 0000000..0e7c4ac --- /dev/null +++ b/vendor/github.com/xtaci/kcp-go/v5/.travis.yml @@ -0,0 +1,20 @@ +language: go +go: + - 1.11.x + - 1.12.x + - 1.13.x + +env: + - GO111MODULE=on + +before_install: + - go get -t -v ./... + +install: + - go get github.com/xtaci/kcp-go + +script: + - go test -coverprofile=coverage.txt -covermode=atomic -bench . -timeout 10m + +after_success: + - bash <(curl -s https://codecov.io/bash) diff --git a/vendor/github.com/xtaci/kcp-go/v5/LICENSE b/vendor/github.com/xtaci/kcp-go/v5/LICENSE new file mode 100644 index 0000000..8294d13 --- /dev/null +++ b/vendor/github.com/xtaci/kcp-go/v5/LICENSE @@ -0,0 +1,22 @@ +The MIT License (MIT) + +Copyright (c) 2015 Daniel Fu + +Permission is hereby granted, free of charge, to any person obtaining a copy +of this software and associated documentation files (the "Software"), to deal +in the Software without restriction, including without limitation the rights +to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +copies of the Software, and to permit persons to whom the Software is +furnished to do so, subject to the following conditions: + +The above copyright notice and this permission notice shall be included in all +copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +SOFTWARE. + diff --git a/vendor/github.com/xtaci/kcp-go/v5/README.md b/vendor/github.com/xtaci/kcp-go/v5/README.md new file mode 100644 index 0000000..d2273f8 --- /dev/null +++ b/vendor/github.com/xtaci/kcp-go/v5/README.md @@ -0,0 +1,285 @@ +<img src="kcp-go.png" alt="kcp-go" height="50px" /> + + +[![GoDoc][1]][2] [![Powered][9]][10] [![MIT licensed][11]][12] [![Build Status][3]][4] [![Go Report Card][5]][6] [![Coverage Statusd][7]][8] [![Sourcegraph][13]][14] + +[1]: https://godoc.org/github.com/xtaci/kcp-go?status.svg +[2]: https://pkg.go.dev/github.com/xtaci/kcp-go +[3]: https://travis-ci.org/xtaci/kcp-go.svg?branch=master +[4]: https://travis-ci.org/xtaci/kcp-go +[5]: https://goreportcard.com/badge/github.com/xtaci/kcp-go +[6]: https://goreportcard.com/report/github.com/xtaci/kcp-go +[7]: https://codecov.io/gh/xtaci/kcp-go/branch/master/graph/badge.svg +[8]: https://codecov.io/gh/xtaci/kcp-go +[9]: https://img.shields.io/badge/KCP-Powered-blue.svg +[10]: https://github.com/skywind3000/kcp +[11]: https://img.shields.io/badge/license-MIT-blue.svg +[12]: LICENSE +[13]: https://sourcegraph.com/github.com/xtaci/kcp-go/-/badge.svg +[14]: https://sourcegraph.com/github.com/xtaci/kcp-go?badge + +## Introduction + +**kcp-go** is a **Production-Grade Reliable-UDP** library for [golang](https://golang.org/). + +This library intents to provide a **smooth, resilient, ordered, error-checked and anonymous** delivery of streams over **UDP** packets, it has been battle-tested with opensource project [kcptun](https://github.com/xtaci/kcptun). Millions of devices(from low-end MIPS routers to high-end servers) have deployed **kcp-go** powered program in a variety of forms like **online games, live broadcasting, file synchronization and network acceleration**. + +[Lastest Release](https://github.com/xtaci/kcp-go/releases) + +## Features + +1. Designed for **Latency-sensitive** scenarios. +1. **Cache friendly** and **Memory optimized** design, offers extremely **High Performance** core. +1. Handles **>5K concurrent connections** on a single commodity server. +1. Compatible with [net.Conn](https://golang.org/pkg/net/#Conn) and [net.Listener](https://golang.org/pkg/net/#Listener), a drop-in replacement for [net.TCPConn](https://golang.org/pkg/net/#TCPConn). +1. [FEC(Forward Error Correction)](https://en.wikipedia.org/wiki/Forward_error_correction) Support with [Reed-Solomon Codes](https://en.wikipedia.org/wiki/Reed%E2%80%93Solomon_error_correction) +1. Packet level encryption support with [AES](https://en.wikipedia.org/wiki/Advanced_Encryption_Standard), [TEA](https://en.wikipedia.org/wiki/Tiny_Encryption_Algorithm), [3DES](https://en.wikipedia.org/wiki/Triple_DES), [Blowfish](https://en.wikipedia.org/wiki/Blowfish_(cipher)), [Cast5](https://en.wikipedia.org/wiki/CAST-128), [Salsa20]( https://en.wikipedia.org/wiki/Salsa20), etc. in [CFB](https://en.wikipedia.org/wiki/Block_cipher_mode_of_operation#Cipher_Feedback_.28CFB.29) mode, which generates completely anonymous packet. +1. Only **A fixed number of goroutines** will be created for the entire server application, costs in **context switch** between goroutines have been taken into consideration. +1. Compatible with [skywind3000's](https://github.com/skywind3000) C version with various improvements. +1. Platform-dependent optimizations: [sendmmsg](http://man7.org/linux/man-pages/man2/sendmmsg.2.html) and [recvmmsg](http://man7.org/linux/man-pages/man2/recvmmsg.2.html) were expoloited for linux. + +## Documentation + +For complete documentation, see the associated [Godoc](https://godoc.org/github.com/xtaci/kcp-go). + +## Specification + +<img src="frame.png" alt="Frame Format" height="109px" /> + +``` +NONCE: + 16bytes cryptographically secure random number, nonce changes for every packet. + +CRC32: + CRC-32 checksum of data using the IEEE polynomial + +FEC TYPE: + typeData = 0xF1 + typeParity = 0xF2 + +FEC SEQID: + monotonically increasing in range: [0, (0xffffffff/shardSize) * shardSize - 1] + +SIZE: + The size of KCP frame plus 2 +``` + +``` ++-----------------+ +| SESSION | ++-----------------+ +| KCP(ARQ) | ++-----------------+ +| FEC(OPTIONAL) | ++-----------------+ +| CRYPTO(OPTIONAL)| ++-----------------+ +| UDP(PACKET) | ++-----------------+ +| IP | ++-----------------+ +| LINK | ++-----------------+ +| PHY | ++-----------------+ +(LAYER MODEL OF KCP-GO) +``` + + +## Examples + +1. [simple examples](https://github.com/xtaci/kcp-go/tree/master/examples) +2. [kcptun client](https://github.com/xtaci/kcptun/blob/master/client/main.go) +3. [kcptun server](https://github.com/xtaci/kcptun/blob/master/server/main.go) + +## Benchmark +``` +=== +Model Name: MacBook Pro +Model Identifier: MacBookPro14,1 +Processor Name: Intel Core i5 +Processor Speed: 3.1 GHz +Number of Processors: 1 +Total Number of Cores: 2 +L2 Cache (per Core): 256 KB +L3 Cache: 4 MB +Memory: 8 GB +=== + +$ go test -v -run=^$ -bench . +beginning tests, encryption:salsa20, fec:10/3 +goos: darwin +goarch: amd64 +pkg: github.com/xtaci/kcp-go +BenchmarkSM4-4 50000 32180 ns/op 93.23 MB/s 0 B/op 0 allocs/op +BenchmarkAES128-4 500000 3285 ns/op 913.21 MB/s 0 B/op 0 allocs/op +BenchmarkAES192-4 300000 3623 ns/op 827.85 MB/s 0 B/op 0 allocs/op +BenchmarkAES256-4 300000 3874 ns/op 774.20 MB/s 0 B/op 0 allocs/op +BenchmarkTEA-4 100000 15384 ns/op 195.00 MB/s 0 B/op 0 allocs/op +BenchmarkXOR-4 20000000 89.9 ns/op 33372.00 MB/s 0 B/op 0 allocs/op +BenchmarkBlowfish-4 50000 26927 ns/op 111.41 MB/s 0 B/op 0 allocs/op +BenchmarkNone-4 30000000 45.7 ns/op 65597.94 MB/s 0 B/op 0 allocs/op +BenchmarkCast5-4 50000 34258 ns/op 87.57 MB/s 0 B/op 0 allocs/op +Benchmark3DES-4 10000 117149 ns/op 25.61 MB/s 0 B/op 0 allocs/op +BenchmarkTwofish-4 50000 33538 ns/op 89.45 MB/s 0 B/op 0 allocs/op +BenchmarkXTEA-4 30000 45666 ns/op 65.69 MB/s 0 B/op 0 allocs/op +BenchmarkSalsa20-4 500000 3308 ns/op 906.76 MB/s 0 B/op 0 allocs/op +BenchmarkCRC32-4 20000000 65.2 ns/op 15712.43 MB/s +BenchmarkCsprngSystem-4 1000000 1150 ns/op 13.91 MB/s +BenchmarkCsprngMD5-4 10000000 145 ns/op 110.26 MB/s +BenchmarkCsprngSHA1-4 10000000 158 ns/op 126.54 MB/s +BenchmarkCsprngNonceMD5-4 10000000 153 ns/op 104.22 MB/s +BenchmarkCsprngNonceAES128-4 100000000 19.1 ns/op 837.81 MB/s +BenchmarkFECDecode-4 1000000 1119 ns/op 1339.61 MB/s 1606 B/op 2 allocs/op +BenchmarkFECEncode-4 2000000 832 ns/op 1801.83 MB/s 17 B/op 0 allocs/op +BenchmarkFlush-4 5000000 272 ns/op 0 B/op 0 allocs/op +BenchmarkEchoSpeed4K-4 5000 259617 ns/op 15.78 MB/s 5451 B/op 149 allocs/op +BenchmarkEchoSpeed64K-4 1000 1706084 ns/op 38.41 MB/s 56002 B/op 1604 allocs/op +BenchmarkEchoSpeed512K-4 100 14345505 ns/op 36.55 MB/s 482597 B/op 13045 allocs/op +BenchmarkEchoSpeed1M-4 30 34859104 ns/op 30.08 MB/s 1143773 B/op 27186 allocs/op +BenchmarkSinkSpeed4K-4 50000 31369 ns/op 130.57 MB/s 1566 B/op 30 allocs/op +BenchmarkSinkSpeed64K-4 5000 329065 ns/op 199.16 MB/s 21529 B/op 453 allocs/op +BenchmarkSinkSpeed256K-4 500 2373354 ns/op 220.91 MB/s 166332 B/op 3554 allocs/op +BenchmarkSinkSpeed1M-4 300 5117927 ns/op 204.88 MB/s 310378 B/op 6988 allocs/op +PASS +ok github.com/xtaci/kcp-go 50.349s +``` + +``` +=== Raspberry Pi 4 === + +➜ kcp-go git:(master) cat /proc/cpuinfo +processor : 0 +model name : ARMv7 Processor rev 3 (v7l) +BogoMIPS : 108.00 +Features : half thumb fastmult vfp edsp neon vfpv3 tls vfpv4 idiva idivt vfpd32 lpae evtstrm crc32 +CPU implementer : 0x41 +CPU architecture: 7 +CPU variant : 0x0 +CPU part : 0xd08 +CPU revision : 3 + +➜ kcp-go git:(master) go test -run=^$ -bench . +2020/01/05 19:25:13 beginning tests, encryption:salsa20, fec:10/3 +goos: linux +goarch: arm +pkg: github.com/xtaci/kcp-go/v5 +BenchmarkSM4-4 20000 86475 ns/op 34.69 MB/s 0 B/op 0 allocs/op +BenchmarkAES128-4 20000 62254 ns/op 48.19 MB/s 0 B/op 0 allocs/op +BenchmarkAES192-4 20000 71802 ns/op 41.78 MB/s 0 B/op 0 allocs/op +BenchmarkAES256-4 20000 80570 ns/op 37.23 MB/s 0 B/op 0 allocs/op +BenchmarkTEA-4 50000 37343 ns/op 80.34 MB/s 0 B/op 0 allocs/op +BenchmarkXOR-4 100000 22266 ns/op 134.73 MB/s 0 B/op 0 allocs/op +BenchmarkBlowfish-4 20000 66123 ns/op 45.37 MB/s 0 B/op 0 allocs/op +BenchmarkNone-4 3000000 518 ns/op 5786.77 MB/s 0 B/op 0 allocs/op +BenchmarkCast5-4 20000 76705 ns/op 39.11 MB/s 0 B/op 0 allocs/op +Benchmark3DES-4 5000 418868 ns/op 7.16 MB/s 0 B/op 0 allocs/op +BenchmarkTwofish-4 5000 326896 ns/op 9.18 MB/s 0 B/op 0 allocs/op +BenchmarkXTEA-4 10000 114418 ns/op 26.22 MB/s 0 B/op 0 allocs/op +BenchmarkSalsa20-4 50000 36736 ns/op 81.66 MB/s 0 B/op 0 allocs/op +BenchmarkCRC32-4 1000000 1735 ns/op 589.98 MB/s +BenchmarkCsprngSystem-4 1000000 2179 ns/op 7.34 MB/s +BenchmarkCsprngMD5-4 2000000 811 ns/op 19.71 MB/s +BenchmarkCsprngSHA1-4 2000000 862 ns/op 23.19 MB/s +BenchmarkCsprngNonceMD5-4 2000000 878 ns/op 18.22 MB/s +BenchmarkCsprngNonceAES128-4 5000000 326 ns/op 48.97 MB/s +BenchmarkFECDecode-4 200000 9081 ns/op 165.16 MB/s 140 B/op 1 allocs/op +BenchmarkFECEncode-4 100000 12039 ns/op 124.59 MB/s 11 B/op 0 allocs/op +BenchmarkFlush-4 100000 21704 ns/op 0 B/op 0 allocs/op +BenchmarkEchoSpeed4K-4 2000 981182 ns/op 4.17 MB/s 12384 B/op 424 allocs/op +BenchmarkEchoSpeed64K-4 100 10503324 ns/op 6.24 MB/s 123616 B/op 3779 allocs/op +BenchmarkEchoSpeed512K-4 20 138633802 ns/op 3.78 MB/s 1606584 B/op 29233 allocs/op +BenchmarkEchoSpeed1M-4 5 372903568 ns/op 2.81 MB/s 4080504 B/op 63600 allocs/op +BenchmarkSinkSpeed4K-4 10000 121239 ns/op 33.78 MB/s 4647 B/op 104 allocs/op +BenchmarkSinkSpeed64K-4 1000 1587906 ns/op 41.27 MB/s 50914 B/op 1115 allocs/op +BenchmarkSinkSpeed256K-4 100 16277830 ns/op 32.21 MB/s 453027 B/op 9296 allocs/op +BenchmarkSinkSpeed1M-4 100 31040703 ns/op 33.78 MB/s 898097 B/op 18932 allocs/op +PASS +ok github.com/xtaci/kcp-go/v5 64.151s +``` + + +## Typical Flame Graph +![Flame Graph in kcptun](flame.png) + +## Key Design Considerations + +1. slice vs. container/list + +`kcp.flush()` loops through the send queue for retransmission checking for every 20ms(interval). + +I've wrote a benchmark for comparing sequential loop through *slice* and *container/list* here: + +https://github.com/xtaci/notes/blob/master/golang/benchmark2/cachemiss_test.go + +``` +BenchmarkLoopSlice-4 2000000000 0.39 ns/op +BenchmarkLoopList-4 100000000 54.6 ns/op +``` + +List structure introduces **heavy cache misses** compared to slice which owns better **locality**, 5000 connections with 32 window size and 20ms interval will cost 6us/0.03%(cpu) using slice, and 8.7ms/43.5%(cpu) for list for each `kcp.flush()`. + +2. Timing accuracy vs. syscall clock_gettime + +Timing is **critical** to **RTT estimator**, inaccurate timing leads to false retransmissions in KCP, but calling `time.Now()` costs 42 cycles(10.5ns on 4GHz CPU, 15.6ns on my MacBook Pro 2.7GHz). + +The benchmark for time.Now() lies here: + +https://github.com/xtaci/notes/blob/master/golang/benchmark2/syscall_test.go + +``` +BenchmarkNow-4 100000000 15.6 ns/op +``` + +In kcp-go, after each `kcp.output()` function call, current clock time will be updated upon return, and for a single `kcp.flush()` operation, current time will be queried from system once. For most of the time, 5000 connections costs 5000 * 15.6ns = 78us(a fixed cost while no packet needs to be sent), as for 10MB/s data transfering with 1400 MTU, `kcp.output()` will be called around 7500 times and costs 117us for `time.Now()` in **every second**. + +3. Memory management + +Primary memory allocation are done from a global buffer pool xmit.Buf, in kcp-go, when we need to allocate some bytes, we can get from that pool, and a fixed-capacity 1500 bytes(mtuLimit) will be returned, the rx queue, tx queue and fec queue all receive bytes from there, and they will return the bytes to the pool after using to prevent unnecessary zer0ing of bytes. The pool mechanism maintained a high watermark for slice objects, these in-flight objects from the pool will survive from the perodical garbage collection, meanwhile the pool kept the ability to return the memory to runtime if in idle. + +4. Information security + +kcp-go is shipped with builtin packet encryption powered by various block encryption algorithms and works in [Cipher Feedback Mode](https://en.wikipedia.org/wiki/Block_cipher_mode_of_operation#Cipher_Feedback_(CFB)), for each packet to be sent, the encryption process will start from encrypting a [nonce](https://en.wikipedia.org/wiki/Cryptographic_nonce) from the [system entropy](https://en.wikipedia.org/wiki//dev/random), so encryption to same plaintexts never leads to a same ciphertexts thereafter. + +The contents of the packets are completely anonymous with encryption, including the headers(FEC,KCP), checksums and contents. Note that, no matter which encryption method you choose on you upper layer, if you disable encryption, the transmit will be insecure somehow, since the header is ***PLAINTEXT*** to everyone it would be susceptible to header tampering, such as jamming the *sliding window size*, *round-trip time*, *FEC property* and *checksums*. ```AES-128``` is suggested for minimal encryption since modern CPUs are shipped with [AES-NI](https://en.wikipedia.org/wiki/AES_instruction_set) instructions and performs even better than `salsa20`(check the table above). + +Other possible attacks to kcp-go includes: a) [traffic analysis](https://en.wikipedia.org/wiki/Traffic_analysis), dataflow on specific websites may have pattern while interchanging data, but this type of eavesdropping has been mitigated by adapting [smux](https://github.com/xtaci/smux) to mix data streams so as to introduce noises, perfect solution to this has not appeared yet, theroretically by shuffling/mixing messages on larger scale network may mitigate this problem. b) [replay attack](https://en.wikipedia.org/wiki/Replay_attack), since the asymmetrical encryption has not been introduced into kcp-go for some reason, capturing the packets and replay them on a different machine is possible, (notice: hijacking the session and decrypting the contents is still *impossible*), so upper layers should contain a asymmetrical encryption system to guarantee the authenticity of each message(to process message exactly once), such as HTTPS/OpenSSL/LibreSSL, only by signing the requests with private keys can eliminate this type of attack. + +## Connection Termination + +Control messages like **SYN/FIN/RST** in TCP **are not defined** in KCP, you need some **keepalive/heartbeat mechanism** in the application-level. A real world example is to use some **multiplexing** protocol over session, such as [smux](https://github.com/xtaci/smux)(with embedded keepalive mechanism), see [kcptun](https://github.com/xtaci/kcptun) for example. + +## FAQ + +Q: I'm handling >5K connections on my server, the CPU utilization is so high. + +A: A standalone `agent` or `gate` server for running kcp-go is suggested, not only for CPU utilization, but also important to the **precision** of RTT measurements(timing) which indirectly affects retransmission. By increasing update `interval` with `SetNoDelay` like `conn.SetNoDelay(1, 40, 1, 1)` will dramatically reduce system load, but lower the performance. + +Q: When should I enable FEC? + +A: Forward error correction is critical to long-distance transmission, because a packet loss will lead to a huge penalty in time. And for the complicated packet routing network in modern world, round-trip time based loss check will not always be efficient, the big deviation of RTT samples in the long way usually leads to a larger RTO value in typical rtt estimator, which in other words, slows down the transmission. + +Q: Should I enable encryption? + +A: Yes, for the safety of protocol, even if the upper layer has encrypted. + +## Who is using this? + +1. https://github.com/xtaci/kcptun -- A Secure Tunnel Based On KCP over UDP. +2. https://github.com/getlantern/lantern -- Lantern delivers fast access to the open Internet. +3. https://github.com/smallnest/rpcx -- A RPC service framework based on net/rpc like alibaba Dubbo and weibo Motan. +4. https://github.com/gonet2/agent -- A gateway for games with stream multiplexing. +5. https://github.com/syncthing/syncthing -- Open Source Continuous File Synchronization. + +## Links + +1. https://github.com/xtaci/smux/ -- A Stream Multiplexing Library for golang with least memory +1. https://github.com/xtaci/libkcp -- FEC enhanced KCP session library for iOS/Android in C++ +1. https://github.com/skywind3000/kcp -- A Fast and Reliable ARQ Protocol +1. https://github.com/klauspost/reedsolomon -- Reed-Solomon Erasure Coding in Go + +## Consulting + +WeChat(付费技术咨询) + +<img src="wechat_donate.jpg" alt="kcptun" height="120px" /> diff --git a/vendor/github.com/xtaci/kcp-go/v5/autotune.go b/vendor/github.com/xtaci/kcp-go/v5/autotune.go new file mode 100644 index 0000000..1f85be3 --- /dev/null +++ b/vendor/github.com/xtaci/kcp-go/v5/autotune.go @@ -0,0 +1,64 @@ +package kcp + +const maxAutoTuneSamples = 258 + +// pulse represents a 0/1 signal with time sequence +type pulse struct { + bit bool // 0 or 1 + seq uint32 // sequence of the signal +} + +// autoTune object +type autoTune struct { + pulses [maxAutoTuneSamples]pulse +} + +// Sample adds a signal sample to the pulse buffer +func (tune *autoTune) Sample(bit bool, seq uint32) { + tune.pulses[seq%maxAutoTuneSamples] = pulse{bit, seq} +} + +// Find a period for a given signal +// returns -1 if not found +// +// --- ------ +// | | +// |______________| +// Period +// Falling Edge Rising Edge +func (tune *autoTune) FindPeriod(bit bool) int { + // last pulse and initial index setup + lastPulse := tune.pulses[0] + idx := 1 + + // left edge + var leftEdge int + for ; idx < len(tune.pulses); idx++ { + if lastPulse.bit != bit && tune.pulses[idx].bit == bit { // edge found + if lastPulse.seq+1 == tune.pulses[idx].seq { // ensure edge continuity + leftEdge = idx + break + } + } + lastPulse = tune.pulses[idx] + } + + // right edge + var rightEdge int + lastPulse = tune.pulses[leftEdge] + idx = leftEdge + 1 + + for ; idx < len(tune.pulses); idx++ { + if lastPulse.seq+1 == tune.pulses[idx].seq { // ensure pulses in this level monotonic + if lastPulse.bit == bit && tune.pulses[idx].bit != bit { // edge found + rightEdge = idx + break + } + } else { + return -1 + } + lastPulse = tune.pulses[idx] + } + + return rightEdge - leftEdge +} diff --git a/vendor/github.com/xtaci/kcp-go/v5/batchconn.go b/vendor/github.com/xtaci/kcp-go/v5/batchconn.go new file mode 100644 index 0000000..6c30701 --- /dev/null +++ b/vendor/github.com/xtaci/kcp-go/v5/batchconn.go @@ -0,0 +1,12 @@ +package kcp + +import "golang.org/x/net/ipv4" + +const ( + batchSize = 16 +) + +type batchConn interface { + WriteBatch(ms []ipv4.Message, flags int) (int, error) + ReadBatch(ms []ipv4.Message, flags int) (int, error) +} diff --git a/vendor/github.com/xtaci/kcp-go/v5/crypt.go b/vendor/github.com/xtaci/kcp-go/v5/crypt.go new file mode 100644 index 0000000..d882852 --- /dev/null +++ b/vendor/github.com/xtaci/kcp-go/v5/crypt.go @@ -0,0 +1,618 @@ +package kcp + +import ( + "crypto/aes" + "crypto/cipher" + "crypto/des" + "crypto/sha1" + "unsafe" + + xor "github.com/templexxx/xorsimd" + "github.com/tjfoc/gmsm/sm4" + + "golang.org/x/crypto/blowfish" + "golang.org/x/crypto/cast5" + "golang.org/x/crypto/pbkdf2" + "golang.org/x/crypto/salsa20" + "golang.org/x/crypto/tea" + "golang.org/x/crypto/twofish" + "golang.org/x/crypto/xtea" +) + +var ( + initialVector = []byte{167, 115, 79, 156, 18, 172, 27, 1, 164, 21, 242, 193, 252, 120, 230, 107} + saltxor = `sH3CIVoF#rWLtJo6` +) + +// BlockCrypt defines encryption/decryption methods for a given byte slice. +// Notes on implementing: the data to be encrypted contains a builtin +// nonce at the first 16 bytes +type BlockCrypt interface { + // Encrypt encrypts the whole block in src into dst. + // Dst and src may point at the same memory. + Encrypt(dst, src []byte) + + // Decrypt decrypts the whole block in src into dst. + // Dst and src may point at the same memory. + Decrypt(dst, src []byte) +} + +type salsa20BlockCrypt struct { + key [32]byte +} + +// NewSalsa20BlockCrypt https://en.wikipedia.org/wiki/Salsa20 +func NewSalsa20BlockCrypt(key []byte) (BlockCrypt, error) { + c := new(salsa20BlockCrypt) + copy(c.key[:], key) + return c, nil +} + +func (c *salsa20BlockCrypt) Encrypt(dst, src []byte) { + salsa20.XORKeyStream(dst[8:], src[8:], src[:8], &c.key) + copy(dst[:8], src[:8]) +} +func (c *salsa20BlockCrypt) Decrypt(dst, src []byte) { + salsa20.XORKeyStream(dst[8:], src[8:], src[:8], &c.key) + copy(dst[:8], src[:8]) +} + +type sm4BlockCrypt struct { + encbuf [sm4.BlockSize]byte // 64bit alignment enc/dec buffer + decbuf [2 * sm4.BlockSize]byte + block cipher.Block +} + +// NewSM4BlockCrypt https://github.com/tjfoc/gmsm/tree/master/sm4 +func NewSM4BlockCrypt(key []byte) (BlockCrypt, error) { + c := new(sm4BlockCrypt) + block, err := sm4.NewCipher(key) + if err != nil { + return nil, err + } + c.block = block + return c, nil +} + +func (c *sm4BlockCrypt) Encrypt(dst, src []byte) { encrypt(c.block, dst, src, c.encbuf[:]) } +func (c *sm4BlockCrypt) Decrypt(dst, src []byte) { decrypt(c.block, dst, src, c.decbuf[:]) } + +type twofishBlockCrypt struct { + encbuf [twofish.BlockSize]byte + decbuf [2 * twofish.BlockSize]byte + block cipher.Block +} + +// NewTwofishBlockCrypt https://en.wikipedia.org/wiki/Twofish +func NewTwofishBlockCrypt(key []byte) (BlockCrypt, error) { + c := new(twofishBlockCrypt) + block, err := twofish.NewCipher(key) + if err != nil { + return nil, err + } + c.block = block + return c, nil +} + +func (c *twofishBlockCrypt) Encrypt(dst, src []byte) { encrypt(c.block, dst, src, c.encbuf[:]) } +func (c *twofishBlockCrypt) Decrypt(dst, src []byte) { decrypt(c.block, dst, src, c.decbuf[:]) } + +type tripleDESBlockCrypt struct { + encbuf [des.BlockSize]byte + decbuf [2 * des.BlockSize]byte + block cipher.Block +} + +// NewTripleDESBlockCrypt https://en.wikipedia.org/wiki/Triple_DES +func NewTripleDESBlockCrypt(key []byte) (BlockCrypt, error) { + c := new(tripleDESBlockCrypt) + block, err := des.NewTripleDESCipher(key) + if err != nil { + return nil, err + } + c.block = block + return c, nil +} + +func (c *tripleDESBlockCrypt) Encrypt(dst, src []byte) { encrypt(c.block, dst, src, c.encbuf[:]) } +func (c *tripleDESBlockCrypt) Decrypt(dst, src []byte) { decrypt(c.block, dst, src, c.decbuf[:]) } + +type cast5BlockCrypt struct { + encbuf [cast5.BlockSize]byte + decbuf [2 * cast5.BlockSize]byte + block cipher.Block +} + +// NewCast5BlockCrypt https://en.wikipedia.org/wiki/CAST-128 +func NewCast5BlockCrypt(key []byte) (BlockCrypt, error) { + c := new(cast5BlockCrypt) + block, err := cast5.NewCipher(key) + if err != nil { + return nil, err + } + c.block = block + return c, nil +} + +func (c *cast5BlockCrypt) Encrypt(dst, src []byte) { encrypt(c.block, dst, src, c.encbuf[:]) } +func (c *cast5BlockCrypt) Decrypt(dst, src []byte) { decrypt(c.block, dst, src, c.decbuf[:]) } + +type blowfishBlockCrypt struct { + encbuf [blowfish.BlockSize]byte + decbuf [2 * blowfish.BlockSize]byte + block cipher.Block +} + +// NewBlowfishBlockCrypt https://en.wikipedia.org/wiki/Blowfish_(cipher) +func NewBlowfishBlockCrypt(key []byte) (BlockCrypt, error) { + c := new(blowfishBlockCrypt) + block, err := blowfish.NewCipher(key) + if err != nil { + return nil, err + } + c.block = block + return c, nil +} + +func (c *blowfishBlockCrypt) Encrypt(dst, src []byte) { encrypt(c.block, dst, src, c.encbuf[:]) } +func (c *blowfishBlockCrypt) Decrypt(dst, src []byte) { decrypt(c.block, dst, src, c.decbuf[:]) } + +type aesBlockCrypt struct { + encbuf [aes.BlockSize]byte + decbuf [2 * aes.BlockSize]byte + block cipher.Block +} + +// NewAESBlockCrypt https://en.wikipedia.org/wiki/Advanced_Encryption_Standard +func NewAESBlockCrypt(key []byte) (BlockCrypt, error) { + c := new(aesBlockCrypt) + block, err := aes.NewCipher(key) + if err != nil { + return nil, err + } + c.block = block + return c, nil +} + +func (c *aesBlockCrypt) Encrypt(dst, src []byte) { encrypt(c.block, dst, src, c.encbuf[:]) } +func (c *aesBlockCrypt) Decrypt(dst, src []byte) { decrypt(c.block, dst, src, c.decbuf[:]) } + +type teaBlockCrypt struct { + encbuf [tea.BlockSize]byte + decbuf [2 * tea.BlockSize]byte + block cipher.Block +} + +// NewTEABlockCrypt https://en.wikipedia.org/wiki/Tiny_Encryption_Algorithm +func NewTEABlockCrypt(key []byte) (BlockCrypt, error) { + c := new(teaBlockCrypt) + block, err := tea.NewCipherWithRounds(key, 16) + if err != nil { + return nil, err + } + c.block = block + return c, nil +} + +func (c *teaBlockCrypt) Encrypt(dst, src []byte) { encrypt(c.block, dst, src, c.encbuf[:]) } +func (c *teaBlockCrypt) Decrypt(dst, src []byte) { decrypt(c.block, dst, src, c.decbuf[:]) } + +type xteaBlockCrypt struct { + encbuf [xtea.BlockSize]byte + decbuf [2 * xtea.BlockSize]byte + block cipher.Block +} + +// NewXTEABlockCrypt https://en.wikipedia.org/wiki/XTEA +func NewXTEABlockCrypt(key []byte) (BlockCrypt, error) { + c := new(xteaBlockCrypt) + block, err := xtea.NewCipher(key) + if err != nil { + return nil, err + } + c.block = block + return c, nil +} + +func (c *xteaBlockCrypt) Encrypt(dst, src []byte) { encrypt(c.block, dst, src, c.encbuf[:]) } +func (c *xteaBlockCrypt) Decrypt(dst, src []byte) { decrypt(c.block, dst, src, c.decbuf[:]) } + +type simpleXORBlockCrypt struct { + xortbl []byte +} + +// NewSimpleXORBlockCrypt simple xor with key expanding +func NewSimpleXORBlockCrypt(key []byte) (BlockCrypt, error) { + c := new(simpleXORBlockCrypt) + c.xortbl = pbkdf2.Key(key, []byte(saltxor), 32, mtuLimit, sha1.New) + return c, nil +} + +func (c *simpleXORBlockCrypt) Encrypt(dst, src []byte) { xor.Bytes(dst, src, c.xortbl) } +func (c *simpleXORBlockCrypt) Decrypt(dst, src []byte) { xor.Bytes(dst, src, c.xortbl) } + +type noneBlockCrypt struct{} + +// NewNoneBlockCrypt does nothing but copying +func NewNoneBlockCrypt(key []byte) (BlockCrypt, error) { + return new(noneBlockCrypt), nil +} + +func (c *noneBlockCrypt) Encrypt(dst, src []byte) { copy(dst, src) } +func (c *noneBlockCrypt) Decrypt(dst, src []byte) { copy(dst, src) } + +// packet encryption with local CFB mode +func encrypt(block cipher.Block, dst, src, buf []byte) { + switch block.BlockSize() { + case 8: + encrypt8(block, dst, src, buf) + case 16: + encrypt16(block, dst, src, buf) + default: + panic("unsupported cipher block size") + } +} + +// optimized encryption for the ciphers which works in 8-bytes +func encrypt8(block cipher.Block, dst, src, buf []byte) { + tbl := buf[:8] + block.Encrypt(tbl, initialVector) + n := len(src) / 8 + base := 0 + repeat := n / 8 + left := n % 8 + ptr_tbl := (*uint64)(unsafe.Pointer(&tbl[0])) + + for i := 0; i < repeat; i++ { + s := src[base:][0:64] + d := dst[base:][0:64] + // 1 + *(*uint64)(unsafe.Pointer(&d[0])) = *(*uint64)(unsafe.Pointer(&s[0])) ^ *ptr_tbl + block.Encrypt(tbl, d[0:8]) + // 2 + *(*uint64)(unsafe.Pointer(&d[8])) = *(*uint64)(unsafe.Pointer(&s[8])) ^ *ptr_tbl + block.Encrypt(tbl, d[8:16]) + // 3 + *(*uint64)(unsafe.Pointer(&d[16])) = *(*uint64)(unsafe.Pointer(&s[16])) ^ *ptr_tbl + block.Encrypt(tbl, d[16:24]) + // 4 + *(*uint64)(unsafe.Pointer(&d[24])) = *(*uint64)(unsafe.Pointer(&s[24])) ^ *ptr_tbl + block.Encrypt(tbl, d[24:32]) + // 5 + *(*uint64)(unsafe.Pointer(&d[32])) = *(*uint64)(unsafe.Pointer(&s[32])) ^ *ptr_tbl + block.Encrypt(tbl, d[32:40]) + // 6 + *(*uint64)(unsafe.Pointer(&d[40])) = *(*uint64)(unsafe.Pointer(&s[40])) ^ *ptr_tbl + block.Encrypt(tbl, d[40:48]) + // 7 + *(*uint64)(unsafe.Pointer(&d[48])) = *(*uint64)(unsafe.Pointer(&s[48])) ^ *ptr_tbl + block.Encrypt(tbl, d[48:56]) + // 8 + *(*uint64)(unsafe.Pointer(&d[56])) = *(*uint64)(unsafe.Pointer(&s[56])) ^ *ptr_tbl + block.Encrypt(tbl, d[56:64]) + base += 64 + } + + switch left { + case 7: + *(*uint64)(unsafe.Pointer(&dst[base])) = *(*uint64)(unsafe.Pointer(&src[base])) ^ *ptr_tbl + block.Encrypt(tbl, dst[base:]) + base += 8 + fallthrough + case 6: + *(*uint64)(unsafe.Pointer(&dst[base])) = *(*uint64)(unsafe.Pointer(&src[base])) ^ *ptr_tbl + block.Encrypt(tbl, dst[base:]) + base += 8 + fallthrough + case 5: + *(*uint64)(unsafe.Pointer(&dst[base])) = *(*uint64)(unsafe.Pointer(&src[base])) ^ *ptr_tbl + block.Encrypt(tbl, dst[base:]) + base += 8 + fallthrough + case 4: + *(*uint64)(unsafe.Pointer(&dst[base])) = *(*uint64)(unsafe.Pointer(&src[base])) ^ *ptr_tbl + block.Encrypt(tbl, dst[base:]) + base += 8 + fallthrough + case 3: + *(*uint64)(unsafe.Pointer(&dst[base])) = *(*uint64)(unsafe.Pointer(&src[base])) ^ *ptr_tbl + block.Encrypt(tbl, dst[base:]) + base += 8 + fallthrough + case 2: + *(*uint64)(unsafe.Pointer(&dst[base])) = *(*uint64)(unsafe.Pointer(&src[base])) ^ *ptr_tbl + block.Encrypt(tbl, dst[base:]) + base += 8 + fallthrough + case 1: + *(*uint64)(unsafe.Pointer(&dst[base])) = *(*uint64)(unsafe.Pointer(&src[base])) ^ *ptr_tbl + block.Encrypt(tbl, dst[base:]) + base += 8 + fallthrough + case 0: + xorBytes(dst[base:], src[base:], tbl) + } +} + +// optimized encryption for the ciphers which works in 16-bytes +func encrypt16(block cipher.Block, dst, src, buf []byte) { + tbl := buf[:16] + block.Encrypt(tbl, initialVector) + n := len(src) / 16 + base := 0 + repeat := n / 8 + left := n % 8 + for i := 0; i < repeat; i++ { + s := src[base:][0:128] + d := dst[base:][0:128] + // 1 + xor.Bytes16Align(d[0:16], s[0:16], tbl) + block.Encrypt(tbl, d[0:16]) + // 2 + xor.Bytes16Align(d[16:32], s[16:32], tbl) + block.Encrypt(tbl, d[16:32]) + // 3 + xor.Bytes16Align(d[32:48], s[32:48], tbl) + block.Encrypt(tbl, d[32:48]) + // 4 + xor.Bytes16Align(d[48:64], s[48:64], tbl) + block.Encrypt(tbl, d[48:64]) + // 5 + xor.Bytes16Align(d[64:80], s[64:80], tbl) + block.Encrypt(tbl, d[64:80]) + // 6 + xor.Bytes16Align(d[80:96], s[80:96], tbl) + block.Encrypt(tbl, d[80:96]) + // 7 + xor.Bytes16Align(d[96:112], s[96:112], tbl) + block.Encrypt(tbl, d[96:112]) + // 8 + xor.Bytes16Align(d[112:128], s[112:128], tbl) + block.Encrypt(tbl, d[112:128]) + base += 128 + } + + switch left { + case 7: + xor.Bytes16Align(dst[base:], src[base:], tbl) + block.Encrypt(tbl, dst[base:]) + base += 16 + fallthrough + case 6: + xor.Bytes16Align(dst[base:], src[base:], tbl) + block.Encrypt(tbl, dst[base:]) + base += 16 + fallthrough + case 5: + xor.Bytes16Align(dst[base:], src[base:], tbl) + block.Encrypt(tbl, dst[base:]) + base += 16 + fallthrough + case 4: + xor.Bytes16Align(dst[base:], src[base:], tbl) + block.Encrypt(tbl, dst[base:]) + base += 16 + fallthrough + case 3: + xor.Bytes16Align(dst[base:], src[base:], tbl) + block.Encrypt(tbl, dst[base:]) + base += 16 + fallthrough + case 2: + xor.Bytes16Align(dst[base:], src[base:], tbl) + block.Encrypt(tbl, dst[base:]) + base += 16 + fallthrough + case 1: + xor.Bytes16Align(dst[base:], src[base:], tbl) + block.Encrypt(tbl, dst[base:]) + base += 16 + fallthrough + case 0: + xorBytes(dst[base:], src[base:], tbl) + } +} + +// decryption +func decrypt(block cipher.Block, dst, src, buf []byte) { + switch block.BlockSize() { + case 8: + decrypt8(block, dst, src, buf) + case 16: + decrypt16(block, dst, src, buf) + default: + panic("unsupported cipher block size") + } +} + +// decrypt 8 bytes block, all byte slices are supposed to be 64bit aligned +func decrypt8(block cipher.Block, dst, src, buf []byte) { + tbl := buf[0:8] + next := buf[8:16] + block.Encrypt(tbl, initialVector) + n := len(src) / 8 + base := 0 + repeat := n / 8 + left := n % 8 + ptr_tbl := (*uint64)(unsafe.Pointer(&tbl[0])) + ptr_next := (*uint64)(unsafe.Pointer(&next[0])) + + for i := 0; i < repeat; i++ { + s := src[base:][0:64] + d := dst[base:][0:64] + // 1 + block.Encrypt(next, s[0:8]) + *(*uint64)(unsafe.Pointer(&d[0])) = *(*uint64)(unsafe.Pointer(&s[0])) ^ *ptr_tbl + // 2 + block.Encrypt(tbl, s[8:16]) + *(*uint64)(unsafe.Pointer(&d[8])) = *(*uint64)(unsafe.Pointer(&s[8])) ^ *ptr_next + // 3 + block.Encrypt(next, s[16:24]) + *(*uint64)(unsafe.Pointer(&d[16])) = *(*uint64)(unsafe.Pointer(&s[16])) ^ *ptr_tbl + // 4 + block.Encrypt(tbl, s[24:32]) + *(*uint64)(unsafe.Pointer(&d[24])) = *(*uint64)(unsafe.Pointer(&s[24])) ^ *ptr_next + // 5 + block.Encrypt(next, s[32:40]) + *(*uint64)(unsafe.Pointer(&d[32])) = *(*uint64)(unsafe.Pointer(&s[32])) ^ *ptr_tbl + // 6 + block.Encrypt(tbl, s[40:48]) + *(*uint64)(unsafe.Pointer(&d[40])) = *(*uint64)(unsafe.Pointer(&s[40])) ^ *ptr_next + // 7 + block.Encrypt(next, s[48:56]) + *(*uint64)(unsafe.Pointer(&d[48])) = *(*uint64)(unsafe.Pointer(&s[48])) ^ *ptr_tbl + // 8 + block.Encrypt(tbl, s[56:64]) + *(*uint64)(unsafe.Pointer(&d[56])) = *(*uint64)(unsafe.Pointer(&s[56])) ^ *ptr_next + base += 64 + } + + switch left { + case 7: + block.Encrypt(next, src[base:]) + *(*uint64)(unsafe.Pointer(&dst[base])) = *(*uint64)(unsafe.Pointer(&src[base])) ^ *(*uint64)(unsafe.Pointer(&tbl[0])) + tbl, next = next, tbl + base += 8 + fallthrough + case 6: + block.Encrypt(next, src[base:]) + *(*uint64)(unsafe.Pointer(&dst[base])) = *(*uint64)(unsafe.Pointer(&src[base])) ^ *(*uint64)(unsafe.Pointer(&tbl[0])) + tbl, next = next, tbl + base += 8 + fallthrough + case 5: + block.Encrypt(next, src[base:]) + *(*uint64)(unsafe.Pointer(&dst[base])) = *(*uint64)(unsafe.Pointer(&src[base])) ^ *(*uint64)(unsafe.Pointer(&tbl[0])) + tbl, next = next, tbl + base += 8 + fallthrough + case 4: + block.Encrypt(next, src[base:]) + *(*uint64)(unsafe.Pointer(&dst[base])) = *(*uint64)(unsafe.Pointer(&src[base])) ^ *(*uint64)(unsafe.Pointer(&tbl[0])) + tbl, next = next, tbl + base += 8 + fallthrough + case 3: + block.Encrypt(next, src[base:]) + *(*uint64)(unsafe.Pointer(&dst[base])) = *(*uint64)(unsafe.Pointer(&src[base])) ^ *(*uint64)(unsafe.Pointer(&tbl[0])) + tbl, next = next, tbl + base += 8 + fallthrough + case 2: + block.Encrypt(next, src[base:]) + *(*uint64)(unsafe.Pointer(&dst[base])) = *(*uint64)(unsafe.Pointer(&src[base])) ^ *(*uint64)(unsafe.Pointer(&tbl[0])) + tbl, next = next, tbl + base += 8 + fallthrough + case 1: + block.Encrypt(next, src[base:]) + *(*uint64)(unsafe.Pointer(&dst[base])) = *(*uint64)(unsafe.Pointer(&src[base])) ^ *(*uint64)(unsafe.Pointer(&tbl[0])) + tbl, next = next, tbl + base += 8 + fallthrough + case 0: + xorBytes(dst[base:], src[base:], tbl) + } +} + +func decrypt16(block cipher.Block, dst, src, buf []byte) { + tbl := buf[0:16] + next := buf[16:32] + block.Encrypt(tbl, initialVector) + n := len(src) / 16 + base := 0 + repeat := n / 8 + left := n % 8 + for i := 0; i < repeat; i++ { + s := src[base:][0:128] + d := dst[base:][0:128] + // 1 + block.Encrypt(next, s[0:16]) + xor.Bytes16Align(d[0:16], s[0:16], tbl) + // 2 + block.Encrypt(tbl, s[16:32]) + xor.Bytes16Align(d[16:32], s[16:32], next) + // 3 + block.Encrypt(next, s[32:48]) + xor.Bytes16Align(d[32:48], s[32:48], tbl) + // 4 + block.Encrypt(tbl, s[48:64]) + xor.Bytes16Align(d[48:64], s[48:64], next) + // 5 + block.Encrypt(next, s[64:80]) + xor.Bytes16Align(d[64:80], s[64:80], tbl) + // 6 + block.Encrypt(tbl, s[80:96]) + xor.Bytes16Align(d[80:96], s[80:96], next) + // 7 + block.Encrypt(next, s[96:112]) + xor.Bytes16Align(d[96:112], s[96:112], tbl) + // 8 + block.Encrypt(tbl, s[112:128]) + xor.Bytes16Align(d[112:128], s[112:128], next) + base += 128 + } + + switch left { + case 7: + block.Encrypt(next, src[base:]) + xor.Bytes16Align(dst[base:], src[base:], tbl) + tbl, next = next, tbl + base += 16 + fallthrough + case 6: + block.Encrypt(next, src[base:]) + xor.Bytes16Align(dst[base:], src[base:], tbl) + tbl, next = next, tbl + base += 16 + fallthrough + case 5: + block.Encrypt(next, src[base:]) + xor.Bytes16Align(dst[base:], src[base:], tbl) + tbl, next = next, tbl + base += 16 + fallthrough + case 4: + block.Encrypt(next, src[base:]) + xor.Bytes16Align(dst[base:], src[base:], tbl) + tbl, next = next, tbl + base += 16 + fallthrough + case 3: + block.Encrypt(next, src[base:]) + xor.Bytes16Align(dst[base:], src[base:], tbl) + tbl, next = next, tbl + base += 16 + fallthrough + case 2: + block.Encrypt(next, src[base:]) + xor.Bytes16Align(dst[base:], src[base:], tbl) + tbl, next = next, tbl + base += 16 + fallthrough + case 1: + block.Encrypt(next, src[base:]) + xor.Bytes16Align(dst[base:], src[base:], tbl) + tbl, next = next, tbl + base += 16 + fallthrough + case 0: + xorBytes(dst[base:], src[base:], tbl) + } +} + +// per bytes xors +func xorBytes(dst, a, b []byte) int { + n := len(a) + if len(b) < n { + n = len(b) + } + if n == 0 { + return 0 + } + + for i := 0; i < n; i++ { + dst[i] = a[i] ^ b[i] + } + return n +} diff --git a/vendor/github.com/xtaci/kcp-go/v5/donate.png b/vendor/github.com/xtaci/kcp-go/v5/donate.png Binary files differnew file mode 100644 index 0000000..0f353d9 --- /dev/null +++ b/vendor/github.com/xtaci/kcp-go/v5/donate.png diff --git a/vendor/github.com/xtaci/kcp-go/v5/entropy.go b/vendor/github.com/xtaci/kcp-go/v5/entropy.go new file mode 100644 index 0000000..156c1cd --- /dev/null +++ b/vendor/github.com/xtaci/kcp-go/v5/entropy.go @@ -0,0 +1,52 @@ +package kcp + +import ( + "crypto/aes" + "crypto/cipher" + "crypto/md5" + "crypto/rand" + "io" +) + +// Entropy defines a entropy source +type Entropy interface { + Init() + Fill(nonce []byte) +} + +// nonceMD5 nonce generator for packet header +type nonceMD5 struct { + seed [md5.Size]byte +} + +func (n *nonceMD5) Init() { /*nothing required*/ } + +func (n *nonceMD5) Fill(nonce []byte) { + if n.seed[0] == 0 { // entropy update + io.ReadFull(rand.Reader, n.seed[:]) + } + n.seed = md5.Sum(n.seed[:]) + copy(nonce, n.seed[:]) +} + +// nonceAES128 nonce generator for packet headers +type nonceAES128 struct { + seed [aes.BlockSize]byte + block cipher.Block +} + +func (n *nonceAES128) Init() { + var key [16]byte //aes-128 + io.ReadFull(rand.Reader, key[:]) + io.ReadFull(rand.Reader, n.seed[:]) + block, _ := aes.NewCipher(key[:]) + n.block = block +} + +func (n *nonceAES128) Fill(nonce []byte) { + if n.seed[0] == 0 { // entropy update + io.ReadFull(rand.Reader, n.seed[:]) + } + n.block.Encrypt(n.seed[:], n.seed[:]) + copy(nonce, n.seed[:]) +} diff --git a/vendor/github.com/xtaci/kcp-go/v5/fec.go b/vendor/github.com/xtaci/kcp-go/v5/fec.go new file mode 100644 index 0000000..0a203ee --- /dev/null +++ b/vendor/github.com/xtaci/kcp-go/v5/fec.go @@ -0,0 +1,381 @@ +package kcp + +import ( + "encoding/binary" + "sync/atomic" + + "github.com/klauspost/reedsolomon" +) + +const ( + fecHeaderSize = 6 + fecHeaderSizePlus2 = fecHeaderSize + 2 // plus 2B data size + typeData = 0xf1 + typeParity = 0xf2 + fecExpire = 60000 + rxFECMulti = 3 // FEC keeps rxFECMulti* (dataShard+parityShard) ordered packets in memory +) + +// fecPacket is a decoded FEC packet +type fecPacket []byte + +func (bts fecPacket) seqid() uint32 { return binary.LittleEndian.Uint32(bts) } +func (bts fecPacket) flag() uint16 { return binary.LittleEndian.Uint16(bts[4:]) } +func (bts fecPacket) data() []byte { return bts[6:] } + +// fecElement has auxcilliary time field +type fecElement struct { + fecPacket + ts uint32 +} + +// fecDecoder for decoding incoming packets +type fecDecoder struct { + rxlimit int // queue size limit + dataShards int + parityShards int + shardSize int + rx []fecElement // ordered receive queue + + // caches + decodeCache [][]byte + flagCache []bool + + // zeros + zeros []byte + + // RS decoder + codec reedsolomon.Encoder + + // auto tune fec parameter + autoTune autoTune +} + +func newFECDecoder(dataShards, parityShards int) *fecDecoder { + if dataShards <= 0 || parityShards <= 0 { + return nil + } + + dec := new(fecDecoder) + dec.dataShards = dataShards + dec.parityShards = parityShards + dec.shardSize = dataShards + parityShards + dec.rxlimit = rxFECMulti * dec.shardSize + codec, err := reedsolomon.New(dataShards, parityShards) + if err != nil { + return nil + } + dec.codec = codec + dec.decodeCache = make([][]byte, dec.shardSize) + dec.flagCache = make([]bool, dec.shardSize) + dec.zeros = make([]byte, mtuLimit) + return dec +} + +// decode a fec packet +func (dec *fecDecoder) decode(in fecPacket) (recovered [][]byte) { + // sample to auto FEC tuner + if in.flag() == typeData { + dec.autoTune.Sample(true, in.seqid()) + } else { + dec.autoTune.Sample(false, in.seqid()) + } + + // check if FEC parameters is out of sync + var shouldTune bool + if int(in.seqid())%dec.shardSize < dec.dataShards { + if in.flag() != typeData { // expect typeData + shouldTune = true + } + } else { + if in.flag() != typeParity { + shouldTune = true + } + } + + if shouldTune { + autoDS := dec.autoTune.FindPeriod(true) + autoPS := dec.autoTune.FindPeriod(false) + + // edges found, we can tune parameters now + if autoDS > 0 && autoPS > 0 && autoDS < 256 && autoPS < 256 { + // and make sure it's different + if autoDS != dec.dataShards || autoPS != dec.parityShards { + dec.dataShards = autoDS + dec.parityShards = autoPS + dec.shardSize = autoDS + autoPS + dec.rxlimit = rxFECMulti * dec.shardSize + codec, err := reedsolomon.New(autoDS, autoPS) + if err != nil { + return nil + } + dec.codec = codec + dec.decodeCache = make([][]byte, dec.shardSize) + dec.flagCache = make([]bool, dec.shardSize) + //log.Println("autotune to :", dec.dataShards, dec.parityShards) + } + } + } + + // insertion + n := len(dec.rx) - 1 + insertIdx := 0 + for i := n; i >= 0; i-- { + if in.seqid() == dec.rx[i].seqid() { // de-duplicate + return nil + } else if _itimediff(in.seqid(), dec.rx[i].seqid()) > 0 { // insertion + insertIdx = i + 1 + break + } + } + + // make a copy + pkt := fecPacket(xmitBuf.Get().([]byte)[:len(in)]) + copy(pkt, in) + elem := fecElement{pkt, currentMs()} + + // insert into ordered rx queue + if insertIdx == n+1 { + dec.rx = append(dec.rx, elem) + } else { + dec.rx = append(dec.rx, fecElement{}) + copy(dec.rx[insertIdx+1:], dec.rx[insertIdx:]) // shift right + dec.rx[insertIdx] = elem + } + + // shard range for current packet + shardBegin := pkt.seqid() - pkt.seqid()%uint32(dec.shardSize) + shardEnd := shardBegin + uint32(dec.shardSize) - 1 + + // max search range in ordered queue for current shard + searchBegin := insertIdx - int(pkt.seqid()%uint32(dec.shardSize)) + if searchBegin < 0 { + searchBegin = 0 + } + searchEnd := searchBegin + dec.shardSize - 1 + if searchEnd >= len(dec.rx) { + searchEnd = len(dec.rx) - 1 + } + + // re-construct datashards + if searchEnd-searchBegin+1 >= dec.dataShards { + var numshard, numDataShard, first, maxlen int + + // zero caches + shards := dec.decodeCache + shardsflag := dec.flagCache + for k := range dec.decodeCache { + shards[k] = nil + shardsflag[k] = false + } + + // shard assembly + for i := searchBegin; i <= searchEnd; i++ { + seqid := dec.rx[i].seqid() + if _itimediff(seqid, shardEnd) > 0 { + break + } else if _itimediff(seqid, shardBegin) >= 0 { + shards[seqid%uint32(dec.shardSize)] = dec.rx[i].data() + shardsflag[seqid%uint32(dec.shardSize)] = true + numshard++ + if dec.rx[i].flag() == typeData { + numDataShard++ + } + if numshard == 1 { + first = i + } + if len(dec.rx[i].data()) > maxlen { + maxlen = len(dec.rx[i].data()) + } + } + } + + if numDataShard == dec.dataShards { + // case 1: no loss on data shards + dec.rx = dec.freeRange(first, numshard, dec.rx) + } else if numshard >= dec.dataShards { + // case 2: loss on data shards, but it's recoverable from parity shards + for k := range shards { + if shards[k] != nil { + dlen := len(shards[k]) + shards[k] = shards[k][:maxlen] + copy(shards[k][dlen:], dec.zeros) + } else if k < dec.dataShards { + shards[k] = xmitBuf.Get().([]byte)[:0] + } + } + if err := dec.codec.ReconstructData(shards); err == nil { + for k := range shards[:dec.dataShards] { + if !shardsflag[k] { + // recovered data should be recycled + recovered = append(recovered, shards[k]) + } + } + } + dec.rx = dec.freeRange(first, numshard, dec.rx) + } + } + + // keep rxlimit + if len(dec.rx) > dec.rxlimit { + if dec.rx[0].flag() == typeData { // track the unrecoverable data + atomic.AddUint64(&DefaultSnmp.FECShortShards, 1) + } + dec.rx = dec.freeRange(0, 1, dec.rx) + } + + // timeout policy + current := currentMs() + numExpired := 0 + for k := range dec.rx { + if _itimediff(current, dec.rx[k].ts) > fecExpire { + numExpired++ + continue + } + break + } + if numExpired > 0 { + dec.rx = dec.freeRange(0, numExpired, dec.rx) + } + return +} + +// free a range of fecPacket +func (dec *fecDecoder) freeRange(first, n int, q []fecElement) []fecElement { + for i := first; i < first+n; i++ { // recycle buffer + xmitBuf.Put([]byte(q[i].fecPacket)) + } + + if first == 0 && n < cap(q)/2 { + return q[n:] + } + copy(q[first:], q[first+n:]) + return q[:len(q)-n] +} + +// release all segments back to xmitBuf +func (dec *fecDecoder) release() { + if n := len(dec.rx); n > 0 { + dec.rx = dec.freeRange(0, n, dec.rx) + } +} + +type ( + // fecEncoder for encoding outgoing packets + fecEncoder struct { + dataShards int + parityShards int + shardSize int + paws uint32 // Protect Against Wrapped Sequence numbers + next uint32 // next seqid + + shardCount int // count the number of datashards collected + maxSize int // track maximum data length in datashard + + headerOffset int // FEC header offset + payloadOffset int // FEC payload offset + + // caches + shardCache [][]byte + encodeCache [][]byte + + // zeros + zeros []byte + + // RS encoder + codec reedsolomon.Encoder + } +) + +func newFECEncoder(dataShards, parityShards, offset int) *fecEncoder { + if dataShards <= 0 || parityShards <= 0 { + return nil + } + enc := new(fecEncoder) + enc.dataShards = dataShards + enc.parityShards = parityShards + enc.shardSize = dataShards + parityShards + enc.paws = 0xffffffff / uint32(enc.shardSize) * uint32(enc.shardSize) + enc.headerOffset = offset + enc.payloadOffset = enc.headerOffset + fecHeaderSize + + codec, err := reedsolomon.New(dataShards, parityShards) + if err != nil { + return nil + } + enc.codec = codec + + // caches + enc.encodeCache = make([][]byte, enc.shardSize) + enc.shardCache = make([][]byte, enc.shardSize) + for k := range enc.shardCache { + enc.shardCache[k] = make([]byte, mtuLimit) + } + enc.zeros = make([]byte, mtuLimit) + return enc +} + +// encodes the packet, outputs parity shards if we have collected quorum datashards +// notice: the contents of 'ps' will be re-written in successive calling +func (enc *fecEncoder) encode(b []byte) (ps [][]byte) { + // The header format: + // | FEC SEQID(4B) | FEC TYPE(2B) | SIZE (2B) | PAYLOAD(SIZE-2) | + // |<-headerOffset |<-payloadOffset + enc.markData(b[enc.headerOffset:]) + binary.LittleEndian.PutUint16(b[enc.payloadOffset:], uint16(len(b[enc.payloadOffset:]))) + + // copy data from payloadOffset to fec shard cache + sz := len(b) + enc.shardCache[enc.shardCount] = enc.shardCache[enc.shardCount][:sz] + copy(enc.shardCache[enc.shardCount][enc.payloadOffset:], b[enc.payloadOffset:]) + enc.shardCount++ + + // track max datashard length + if sz > enc.maxSize { + enc.maxSize = sz + } + + // Generation of Reed-Solomon Erasure Code + if enc.shardCount == enc.dataShards { + // fill '0' into the tail of each datashard + for i := 0; i < enc.dataShards; i++ { + shard := enc.shardCache[i] + slen := len(shard) + copy(shard[slen:enc.maxSize], enc.zeros) + } + + // construct equal-sized slice with stripped header + cache := enc.encodeCache + for k := range cache { + cache[k] = enc.shardCache[k][enc.payloadOffset:enc.maxSize] + } + + // encoding + if err := enc.codec.Encode(cache); err == nil { + ps = enc.shardCache[enc.dataShards:] + for k := range ps { + enc.markParity(ps[k][enc.headerOffset:]) + ps[k] = ps[k][:enc.maxSize] + } + } + + // counters resetting + enc.shardCount = 0 + enc.maxSize = 0 + } + + return +} + +func (enc *fecEncoder) markData(data []byte) { + binary.LittleEndian.PutUint32(data, enc.next) + binary.LittleEndian.PutUint16(data[4:], typeData) + enc.next++ +} + +func (enc *fecEncoder) markParity(data []byte) { + binary.LittleEndian.PutUint32(data, enc.next) + binary.LittleEndian.PutUint16(data[4:], typeParity) + // sequence wrap will only happen at parity shard + enc.next = (enc.next + 1) % enc.paws +} diff --git a/vendor/github.com/xtaci/kcp-go/v5/flame.png b/vendor/github.com/xtaci/kcp-go/v5/flame.png Binary files differnew file mode 100644 index 0000000..672f649 --- /dev/null +++ b/vendor/github.com/xtaci/kcp-go/v5/flame.png diff --git a/vendor/github.com/xtaci/kcp-go/v5/frame.png b/vendor/github.com/xtaci/kcp-go/v5/frame.png Binary files differnew file mode 100644 index 0000000..0b0aefd --- /dev/null +++ b/vendor/github.com/xtaci/kcp-go/v5/frame.png diff --git a/vendor/github.com/xtaci/kcp-go/v5/go.mod b/vendor/github.com/xtaci/kcp-go/v5/go.mod new file mode 100644 index 0000000..ff51020 --- /dev/null +++ b/vendor/github.com/xtaci/kcp-go/v5/go.mod @@ -0,0 +1,20 @@ +module github.com/xtaci/kcp-go/v5 + +require ( + github.com/klauspost/cpuid v1.3.1 // indirect + github.com/klauspost/reedsolomon v1.9.9 + github.com/mmcloughlin/avo v0.0.0-20200803215136-443f81d77104 // indirect + github.com/pkg/errors v0.9.1 + github.com/stretchr/testify v1.6.1 + github.com/templexxx/cpu v0.0.7 // indirect + github.com/templexxx/xorsimd v0.4.1 + github.com/tjfoc/gmsm v1.3.2 + github.com/xtaci/lossyconn v0.0.0-20190602105132-8df528c0c9ae + golang.org/x/crypto v0.0.0-20200728195943-123391ffb6de + golang.org/x/net v0.0.0-20200707034311-ab3426394381 + golang.org/x/sys v0.0.0-20200808120158-1030fc2bf1d9 // indirect + golang.org/x/tools v0.0.0-20200808161706-5bf02b21f123 // indirect + golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1 // indirect +) + +go 1.13 diff --git a/vendor/github.com/xtaci/kcp-go/v5/go.sum b/vendor/github.com/xtaci/kcp-go/v5/go.sum new file mode 100644 index 0000000..db81c71 --- /dev/null +++ b/vendor/github.com/xtaci/kcp-go/v5/go.sum @@ -0,0 +1,69 @@ +github.com/davecgh/go-spew v1.1.0 h1:ZDRjVQ15GmhC3fiQ8ni8+OwkZQO4DARzQgrnXU1Liz8= +github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/klauspost/cpuid v1.2.4/go.mod h1:Pj4uuM528wm8OyEC2QMXAi2YiTZ96dNQPGgoMS4s3ek= +github.com/klauspost/cpuid v1.3.1 h1:5JNjFYYQrZeKRJ0734q51WCEEn2huer72Dc7K+R/b6s= +github.com/klauspost/cpuid v1.3.1/go.mod h1:bYW4mA6ZgKPob1/Dlai2LviZJO7KGI3uoWLd42rAQw4= +github.com/klauspost/reedsolomon v1.9.9 h1:qCL7LZlv17xMixl55nq2/Oa1Y86nfO8EqDfv2GHND54= +github.com/klauspost/reedsolomon v1.9.9/go.mod h1:O7yFFHiQwDR6b2t63KPUpccPtNdp5ADgh1gg4fd12wo= +github.com/mmcloughlin/avo v0.0.0-20200803215136-443f81d77104 h1:ULR/QWMgcgRiZLUjSSJMU+fW+RDMstRdmnDWj9Q+AsA= +github.com/mmcloughlin/avo v0.0.0-20200803215136-443f81d77104/go.mod h1:wqKykBG2QzQDJEzvRkcS8x6MiSJkF52hXZsXcjaB3ls= +github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4= +github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= +github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= +github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= +github.com/stretchr/testify v1.6.1 h1:hDPOHmpOpP40lSULcqw7IrRb/u7w6RpDC9399XyoNd0= +github.com/stretchr/testify v1.6.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= +github.com/templexxx/cpu v0.0.1 h1:hY4WdLOgKdc8y13EYklu9OUTXik80BkxHoWvTO6MQQY= +github.com/templexxx/cpu v0.0.1/go.mod h1:w7Tb+7qgcAlIyX4NhLuDKt78AHA5SzPmq0Wj6HiEnnk= +github.com/templexxx/cpu v0.0.7 h1:pUEZn8JBy/w5yzdYWgx+0m0xL9uk6j4K91C5kOViAzo= +github.com/templexxx/cpu v0.0.7/go.mod h1:w7Tb+7qgcAlIyX4NhLuDKt78AHA5SzPmq0Wj6HiEnnk= +github.com/templexxx/xorsimd v0.4.1 h1:iUZcywbOYDRAZUasAs2eSCUW8eobuZDy0I9FJiORkVg= +github.com/templexxx/xorsimd v0.4.1/go.mod h1:W+ffZz8jJMH2SXwuKu9WhygqBMbFnp14G2fqEr8qaNo= +github.com/tjfoc/gmsm v1.3.2 h1:7JVkAn5bvUJ7HtU08iW6UiD+UTmJTIToHCfeFzkcCxM= +github.com/tjfoc/gmsm v1.3.2/go.mod h1:HaUcFuY0auTiaHB9MHFGCPx5IaLhTUd2atbCFBQXn9w= +github.com/xtaci/lossyconn v0.0.0-20190602105132-8df528c0c9ae h1:J0GxkO96kL4WF+AIT3M4mfUVinOCPgf2uUWYFUzN0sM= +github.com/xtaci/lossyconn v0.0.0-20190602105132-8df528c0c9ae/go.mod h1:gXtu8J62kEgmN++bm9BVICuT/e8yiLI2KFobd/TRFsE= +github.com/yuin/goldmark v1.1.27/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= +github.com/yuin/goldmark v1.1.32/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= +golang.org/x/arch v0.0.0-20190909030613-46d78d1859ac/go.mod h1:flIaEI6LNU6xOCD5PaJvn9wGP0agmIOqjrtsKGRguv4= +golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= +golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= +golang.org/x/crypto v0.0.0-20191219195013-becbf705a915/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= +golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= +golang.org/x/crypto v0.0.0-20200728195943-123391ffb6de h1:ikNHVSjEfnvz6sxdSPCaPt572qowuyMDMJLLm3Db3ig= +golang.org/x/crypto v0.0.0-20200728195943-123391ffb6de/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= +golang.org/x/mod v0.2.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA= +golang.org/x/mod v0.3.0 h1:RM4zey1++hCTbCVQfnWeKs9/IEsaBLA8vTkd0WVtmH4= +golang.org/x/mod v0.3.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA= +golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg= +golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= +golang.org/x/net v0.0.0-20200226121028-0de0cce0169b/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= +golang.org/x/net v0.0.0-20200625001655-4c5254603344/go.mod h1:/O7V0waA8r7cgGh81Ro3o1hOxt32SMVPicZroKQ2sZA= +golang.org/x/net v0.0.0-20200707034311-ab3426394381 h1:VXak5I6aEWmAXeQjA+QSZzlgNrpq9mjcfDemuexIKsU= +golang.org/x/net v0.0.0-20200707034311-ab3426394381/go.mod h1:/O7V0waA8r7cgGh81Ro3o1hOxt32SMVPicZroKQ2sZA= +golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sync v0.0.0-20190911185100-cd5d95a43a6e/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sync v0.0.0-20200625203802-6e8e738ad208/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= +golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20200323222414-85ca7c5b95cd/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20200808120158-1030fc2bf1d9 h1:yi1hN8dcqI9l8klZfy4B8mJvFmmAxJEePIQQFNSd7Cs= +golang.org/x/sys v0.0.0-20200808120158-1030fc2bf1d9/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= +golang.org/x/tools v0.0.0-20191119224855-298f0cb1881e/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo= +golang.org/x/tools v0.0.0-20200425043458-8463f397d07c h1:iHhCR0b26amDCiiO+kBguKZom9aMF+NrFxh9zeKR/XU= +golang.org/x/tools v0.0.0-20200425043458-8463f397d07c/go.mod h1:EkVYQZoAsY45+roYkvgYkIh4xh/qjgUK9TdY2XT94GE= +golang.org/x/tools v0.0.0-20200808161706-5bf02b21f123 h1:4JSJPND/+4555t1HfXYF4UEqDqiSKCgeV0+hbA8hMs4= +golang.org/x/tools v0.0.0-20200808161706-5bf02b21f123/go.mod h1:njjCfa9FT2d7l9Bc6FUM5FLjQPp3cFF28FI3qnDFljA= +golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= +golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= +golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543 h1:E7g+9GITq07hpfrRu66IVDexMakfv52eLZ2CXBWiKr4= +golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= +golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1 h1:go1bK/D/BFZV2I8cIQd1NKEZ+0owSTG1fDTci4IqFcE= +golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= +gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM= +gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c h1:dUUwHk2QECo/6vqA44rthZ8ie2QXMNeKRTHCNY2nXvo= +gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= +rsc.io/pdf v0.1.1/go.mod h1:n8OzWcQ6Sp37PL01nO98y4iUCRdTGarVfzxY20ICaU4= diff --git a/vendor/github.com/xtaci/kcp-go/v5/kcp-go.png b/vendor/github.com/xtaci/kcp-go/v5/kcp-go.png Binary files differnew file mode 100644 index 0000000..151b7c4 --- /dev/null +++ b/vendor/github.com/xtaci/kcp-go/v5/kcp-go.png diff --git a/vendor/github.com/xtaci/kcp-go/v5/kcp.go b/vendor/github.com/xtaci/kcp-go/v5/kcp.go new file mode 100644 index 0000000..0c6c304 --- /dev/null +++ b/vendor/github.com/xtaci/kcp-go/v5/kcp.go @@ -0,0 +1,1080 @@ +package kcp + +import ( + "encoding/binary" + "sync/atomic" + "time" +) + +const ( + IKCP_RTO_NDL = 30 // no delay min rto + IKCP_RTO_MIN = 100 // normal min rto + IKCP_RTO_DEF = 200 + IKCP_RTO_MAX = 60000 + IKCP_CMD_PUSH = 81 // cmd: push data + IKCP_CMD_ACK = 82 // cmd: ack + IKCP_CMD_WASK = 83 // cmd: window probe (ask) + IKCP_CMD_WINS = 84 // cmd: window size (tell) + IKCP_ASK_SEND = 1 // need to send IKCP_CMD_WASK + IKCP_ASK_TELL = 2 // need to send IKCP_CMD_WINS + IKCP_WND_SND = 32 + IKCP_WND_RCV = 32 + IKCP_MTU_DEF = 1400 + IKCP_ACK_FAST = 3 + IKCP_INTERVAL = 100 + IKCP_OVERHEAD = 24 + IKCP_DEADLINK = 20 + IKCP_THRESH_INIT = 2 + IKCP_THRESH_MIN = 2 + IKCP_PROBE_INIT = 7000 // 7 secs to probe window size + IKCP_PROBE_LIMIT = 120000 // up to 120 secs to probe window + IKCP_SN_OFFSET = 12 +) + +// monotonic reference time point +var refTime time.Time = time.Now() + +// currentMs returns current elapsed monotonic milliseconds since program startup +func currentMs() uint32 { return uint32(time.Since(refTime) / time.Millisecond) } + +// output_callback is a prototype which ought capture conn and call conn.Write +type output_callback func(buf []byte, size int) + +/* encode 8 bits unsigned int */ +func ikcp_encode8u(p []byte, c byte) []byte { + p[0] = c + return p[1:] +} + +/* decode 8 bits unsigned int */ +func ikcp_decode8u(p []byte, c *byte) []byte { + *c = p[0] + return p[1:] +} + +/* encode 16 bits unsigned int (lsb) */ +func ikcp_encode16u(p []byte, w uint16) []byte { + binary.LittleEndian.PutUint16(p, w) + return p[2:] +} + +/* decode 16 bits unsigned int (lsb) */ +func ikcp_decode16u(p []byte, w *uint16) []byte { + *w = binary.LittleEndian.Uint16(p) + return p[2:] +} + +/* encode 32 bits unsigned int (lsb) */ +func ikcp_encode32u(p []byte, l uint32) []byte { + binary.LittleEndian.PutUint32(p, l) + return p[4:] +} + +/* decode 32 bits unsigned int (lsb) */ +func ikcp_decode32u(p []byte, l *uint32) []byte { + *l = binary.LittleEndian.Uint32(p) + return p[4:] +} + +func _imin_(a, b uint32) uint32 { + if a <= b { + return a + } + return b +} + +func _imax_(a, b uint32) uint32 { + if a >= b { + return a + } + return b +} + +func _ibound_(lower, middle, upper uint32) uint32 { + return _imin_(_imax_(lower, middle), upper) +} + +func _itimediff(later, earlier uint32) int32 { + return (int32)(later - earlier) +} + +// segment defines a KCP segment +type segment struct { + conv uint32 + cmd uint8 + frg uint8 + wnd uint16 + ts uint32 + sn uint32 + una uint32 + rto uint32 + xmit uint32 + resendts uint32 + fastack uint32 + acked uint32 // mark if the seg has acked + data []byte +} + +// encode a segment into buffer +func (seg *segment) encode(ptr []byte) []byte { + ptr = ikcp_encode32u(ptr, seg.conv) + ptr = ikcp_encode8u(ptr, seg.cmd) + ptr = ikcp_encode8u(ptr, seg.frg) + ptr = ikcp_encode16u(ptr, seg.wnd) + ptr = ikcp_encode32u(ptr, seg.ts) + ptr = ikcp_encode32u(ptr, seg.sn) + ptr = ikcp_encode32u(ptr, seg.una) + ptr = ikcp_encode32u(ptr, uint32(len(seg.data))) + atomic.AddUint64(&DefaultSnmp.OutSegs, 1) + return ptr +} + +// KCP defines a single KCP connection +type KCP struct { + conv, mtu, mss, state uint32 + snd_una, snd_nxt, rcv_nxt uint32 + ssthresh uint32 + rx_rttvar, rx_srtt int32 + rx_rto, rx_minrto uint32 + snd_wnd, rcv_wnd, rmt_wnd, cwnd, probe uint32 + interval, ts_flush uint32 + nodelay, updated uint32 + ts_probe, probe_wait uint32 + dead_link, incr uint32 + + fastresend int32 + nocwnd, stream int32 + + snd_queue []segment + rcv_queue []segment + snd_buf []segment + rcv_buf []segment + + acklist []ackItem + + buffer []byte + reserved int + output output_callback +} + +type ackItem struct { + sn uint32 + ts uint32 +} + +// NewKCP create a new kcp state machine +// +// 'conv' must be equal in the connection peers, or else data will be silently rejected. +// +// 'output' function will be called whenever these is data to be sent on wire. +func NewKCP(conv uint32, output output_callback) *KCP { + kcp := new(KCP) + kcp.conv = conv + kcp.snd_wnd = IKCP_WND_SND + kcp.rcv_wnd = IKCP_WND_RCV + kcp.rmt_wnd = IKCP_WND_RCV + kcp.mtu = IKCP_MTU_DEF + kcp.mss = kcp.mtu - IKCP_OVERHEAD + kcp.buffer = make([]byte, kcp.mtu) + kcp.rx_rto = IKCP_RTO_DEF + kcp.rx_minrto = IKCP_RTO_MIN + kcp.interval = IKCP_INTERVAL + kcp.ts_flush = IKCP_INTERVAL + kcp.ssthresh = IKCP_THRESH_INIT + kcp.dead_link = IKCP_DEADLINK + kcp.output = output + return kcp +} + +// newSegment creates a KCP segment +func (kcp *KCP) newSegment(size int) (seg segment) { + seg.data = xmitBuf.Get().([]byte)[:size] + return +} + +// delSegment recycles a KCP segment +func (kcp *KCP) delSegment(seg *segment) { + if seg.data != nil { + xmitBuf.Put(seg.data) + seg.data = nil + } +} + +// ReserveBytes keeps n bytes untouched from the beginning of the buffer, +// the output_callback function should be aware of this. +// +// Return false if n >= mss +func (kcp *KCP) ReserveBytes(n int) bool { + if n >= int(kcp.mtu-IKCP_OVERHEAD) || n < 0 { + return false + } + kcp.reserved = n + kcp.mss = kcp.mtu - IKCP_OVERHEAD - uint32(n) + return true +} + +// PeekSize checks the size of next message in the recv queue +func (kcp *KCP) PeekSize() (length int) { + if len(kcp.rcv_queue) == 0 { + return -1 + } + + seg := &kcp.rcv_queue[0] + if seg.frg == 0 { + return len(seg.data) + } + + if len(kcp.rcv_queue) < int(seg.frg+1) { + return -1 + } + + for k := range kcp.rcv_queue { + seg := &kcp.rcv_queue[k] + length += len(seg.data) + if seg.frg == 0 { + break + } + } + return +} + +// Receive data from kcp state machine +// +// Return number of bytes read. +// +// Return -1 when there is no readable data. +// +// Return -2 if len(buffer) is smaller than kcp.PeekSize(). +func (kcp *KCP) Recv(buffer []byte) (n int) { + peeksize := kcp.PeekSize() + if peeksize < 0 { + return -1 + } + + if peeksize > len(buffer) { + return -2 + } + + var fast_recover bool + if len(kcp.rcv_queue) >= int(kcp.rcv_wnd) { + fast_recover = true + } + + // merge fragment + count := 0 + for k := range kcp.rcv_queue { + seg := &kcp.rcv_queue[k] + copy(buffer, seg.data) + buffer = buffer[len(seg.data):] + n += len(seg.data) + count++ + kcp.delSegment(seg) + if seg.frg == 0 { + break + } + } + if count > 0 { + kcp.rcv_queue = kcp.remove_front(kcp.rcv_queue, count) + } + + // move available data from rcv_buf -> rcv_queue + count = 0 + for k := range kcp.rcv_buf { + seg := &kcp.rcv_buf[k] + if seg.sn == kcp.rcv_nxt && len(kcp.rcv_queue)+count < int(kcp.rcv_wnd) { + kcp.rcv_nxt++ + count++ + } else { + break + } + } + + if count > 0 { + kcp.rcv_queue = append(kcp.rcv_queue, kcp.rcv_buf[:count]...) + kcp.rcv_buf = kcp.remove_front(kcp.rcv_buf, count) + } + + // fast recover + if len(kcp.rcv_queue) < int(kcp.rcv_wnd) && fast_recover { + // ready to send back IKCP_CMD_WINS in ikcp_flush + // tell remote my window size + kcp.probe |= IKCP_ASK_TELL + } + return +} + +// Send is user/upper level send, returns below zero for error +func (kcp *KCP) Send(buffer []byte) int { + var count int + if len(buffer) == 0 { + return -1 + } + + // append to previous segment in streaming mode (if possible) + if kcp.stream != 0 { + n := len(kcp.snd_queue) + if n > 0 { + seg := &kcp.snd_queue[n-1] + if len(seg.data) < int(kcp.mss) { + capacity := int(kcp.mss) - len(seg.data) + extend := capacity + if len(buffer) < capacity { + extend = len(buffer) + } + + // grow slice, the underlying cap is guaranteed to + // be larger than kcp.mss + oldlen := len(seg.data) + seg.data = seg.data[:oldlen+extend] + copy(seg.data[oldlen:], buffer) + buffer = buffer[extend:] + } + } + + if len(buffer) == 0 { + return 0 + } + } + + if len(buffer) <= int(kcp.mss) { + count = 1 + } else { + count = (len(buffer) + int(kcp.mss) - 1) / int(kcp.mss) + } + + if count > 255 { + return -2 + } + + if count == 0 { + count = 1 + } + + for i := 0; i < count; i++ { + var size int + if len(buffer) > int(kcp.mss) { + size = int(kcp.mss) + } else { + size = len(buffer) + } + seg := kcp.newSegment(size) + copy(seg.data, buffer[:size]) + if kcp.stream == 0 { // message mode + seg.frg = uint8(count - i - 1) + } else { // stream mode + seg.frg = 0 + } + kcp.snd_queue = append(kcp.snd_queue, seg) + buffer = buffer[size:] + } + return 0 +} + +func (kcp *KCP) update_ack(rtt int32) { + // https://tools.ietf.org/html/rfc6298 + var rto uint32 + if kcp.rx_srtt == 0 { + kcp.rx_srtt = rtt + kcp.rx_rttvar = rtt >> 1 + } else { + delta := rtt - kcp.rx_srtt + kcp.rx_srtt += delta >> 3 + if delta < 0 { + delta = -delta + } + if rtt < kcp.rx_srtt-kcp.rx_rttvar { + // if the new RTT sample is below the bottom of the range of + // what an RTT measurement is expected to be. + // give an 8x reduced weight versus its normal weighting + kcp.rx_rttvar += (delta - kcp.rx_rttvar) >> 5 + } else { + kcp.rx_rttvar += (delta - kcp.rx_rttvar) >> 2 + } + } + rto = uint32(kcp.rx_srtt) + _imax_(kcp.interval, uint32(kcp.rx_rttvar)<<2) + kcp.rx_rto = _ibound_(kcp.rx_minrto, rto, IKCP_RTO_MAX) +} + +func (kcp *KCP) shrink_buf() { + if len(kcp.snd_buf) > 0 { + seg := &kcp.snd_buf[0] + kcp.snd_una = seg.sn + } else { + kcp.snd_una = kcp.snd_nxt + } +} + +func (kcp *KCP) parse_ack(sn uint32) { + if _itimediff(sn, kcp.snd_una) < 0 || _itimediff(sn, kcp.snd_nxt) >= 0 { + return + } + + for k := range kcp.snd_buf { + seg := &kcp.snd_buf[k] + if sn == seg.sn { + // mark and free space, but leave the segment here, + // and wait until `una` to delete this, then we don't + // have to shift the segments behind forward, + // which is an expensive operation for large window + seg.acked = 1 + kcp.delSegment(seg) + break + } + if _itimediff(sn, seg.sn) < 0 { + break + } + } +} + +func (kcp *KCP) parse_fastack(sn, ts uint32) { + if _itimediff(sn, kcp.snd_una) < 0 || _itimediff(sn, kcp.snd_nxt) >= 0 { + return + } + + for k := range kcp.snd_buf { + seg := &kcp.snd_buf[k] + if _itimediff(sn, seg.sn) < 0 { + break + } else if sn != seg.sn && _itimediff(seg.ts, ts) <= 0 { + seg.fastack++ + } + } +} + +func (kcp *KCP) parse_una(una uint32) int { + count := 0 + for k := range kcp.snd_buf { + seg := &kcp.snd_buf[k] + if _itimediff(una, seg.sn) > 0 { + kcp.delSegment(seg) + count++ + } else { + break + } + } + if count > 0 { + kcp.snd_buf = kcp.remove_front(kcp.snd_buf, count) + } + return count +} + +// ack append +func (kcp *KCP) ack_push(sn, ts uint32) { + kcp.acklist = append(kcp.acklist, ackItem{sn, ts}) +} + +// returns true if data has repeated +func (kcp *KCP) parse_data(newseg segment) bool { + sn := newseg.sn + if _itimediff(sn, kcp.rcv_nxt+kcp.rcv_wnd) >= 0 || + _itimediff(sn, kcp.rcv_nxt) < 0 { + return true + } + + n := len(kcp.rcv_buf) - 1 + insert_idx := 0 + repeat := false + for i := n; i >= 0; i-- { + seg := &kcp.rcv_buf[i] + if seg.sn == sn { + repeat = true + break + } + if _itimediff(sn, seg.sn) > 0 { + insert_idx = i + 1 + break + } + } + + if !repeat { + // replicate the content if it's new + dataCopy := xmitBuf.Get().([]byte)[:len(newseg.data)] + copy(dataCopy, newseg.data) + newseg.data = dataCopy + + if insert_idx == n+1 { + kcp.rcv_buf = append(kcp.rcv_buf, newseg) + } else { + kcp.rcv_buf = append(kcp.rcv_buf, segment{}) + copy(kcp.rcv_buf[insert_idx+1:], kcp.rcv_buf[insert_idx:]) + kcp.rcv_buf[insert_idx] = newseg + } + } + + // move available data from rcv_buf -> rcv_queue + count := 0 + for k := range kcp.rcv_buf { + seg := &kcp.rcv_buf[k] + if seg.sn == kcp.rcv_nxt && len(kcp.rcv_queue)+count < int(kcp.rcv_wnd) { + kcp.rcv_nxt++ + count++ + } else { + break + } + } + if count > 0 { + kcp.rcv_queue = append(kcp.rcv_queue, kcp.rcv_buf[:count]...) + kcp.rcv_buf = kcp.remove_front(kcp.rcv_buf, count) + } + + return repeat +} + +// Input a packet into kcp state machine. +// +// 'regular' indicates it's a real data packet from remote, and it means it's not generated from ReedSolomon +// codecs. +// +// 'ackNoDelay' will trigger immediate ACK, but surely it will not be efficient in bandwidth +func (kcp *KCP) Input(data []byte, regular, ackNoDelay bool) int { + snd_una := kcp.snd_una + if len(data) < IKCP_OVERHEAD { + return -1 + } + + var latest uint32 // the latest ack packet + var flag int + var inSegs uint64 + var windowSlides bool + + for { + var ts, sn, length, una, conv uint32 + var wnd uint16 + var cmd, frg uint8 + + if len(data) < int(IKCP_OVERHEAD) { + break + } + + data = ikcp_decode32u(data, &conv) + if conv != kcp.conv { + return -1 + } + + data = ikcp_decode8u(data, &cmd) + data = ikcp_decode8u(data, &frg) + data = ikcp_decode16u(data, &wnd) + data = ikcp_decode32u(data, &ts) + data = ikcp_decode32u(data, &sn) + data = ikcp_decode32u(data, &una) + data = ikcp_decode32u(data, &length) + if len(data) < int(length) { + return -2 + } + + if cmd != IKCP_CMD_PUSH && cmd != IKCP_CMD_ACK && + cmd != IKCP_CMD_WASK && cmd != IKCP_CMD_WINS { + return -3 + } + + // only trust window updates from regular packets. i.e: latest update + if regular { + kcp.rmt_wnd = uint32(wnd) + } + if kcp.parse_una(una) > 0 { + windowSlides = true + } + kcp.shrink_buf() + + if cmd == IKCP_CMD_ACK { + kcp.parse_ack(sn) + kcp.parse_fastack(sn, ts) + flag |= 1 + latest = ts + } else if cmd == IKCP_CMD_PUSH { + repeat := true + if _itimediff(sn, kcp.rcv_nxt+kcp.rcv_wnd) < 0 { + kcp.ack_push(sn, ts) + if _itimediff(sn, kcp.rcv_nxt) >= 0 { + var seg segment + seg.conv = conv + seg.cmd = cmd + seg.frg = frg + seg.wnd = wnd + seg.ts = ts + seg.sn = sn + seg.una = una + seg.data = data[:length] // delayed data copying + repeat = kcp.parse_data(seg) + } + } + if regular && repeat { + atomic.AddUint64(&DefaultSnmp.RepeatSegs, 1) + } + } else if cmd == IKCP_CMD_WASK { + // ready to send back IKCP_CMD_WINS in Ikcp_flush + // tell remote my window size + kcp.probe |= IKCP_ASK_TELL + } else if cmd == IKCP_CMD_WINS { + // do nothing + } else { + return -3 + } + + inSegs++ + data = data[length:] + } + atomic.AddUint64(&DefaultSnmp.InSegs, inSegs) + + // update rtt with the latest ts + // ignore the FEC packet + if flag != 0 && regular { + current := currentMs() + if _itimediff(current, latest) >= 0 { + kcp.update_ack(_itimediff(current, latest)) + } + } + + // cwnd update when packet arrived + if kcp.nocwnd == 0 { + if _itimediff(kcp.snd_una, snd_una) > 0 { + if kcp.cwnd < kcp.rmt_wnd { + mss := kcp.mss + if kcp.cwnd < kcp.ssthresh { + kcp.cwnd++ + kcp.incr += mss + } else { + if kcp.incr < mss { + kcp.incr = mss + } + kcp.incr += (mss*mss)/kcp.incr + (mss / 16) + if (kcp.cwnd+1)*mss <= kcp.incr { + if mss > 0 { + kcp.cwnd = (kcp.incr + mss - 1) / mss + } else { + kcp.cwnd = kcp.incr + mss - 1 + } + } + } + if kcp.cwnd > kcp.rmt_wnd { + kcp.cwnd = kcp.rmt_wnd + kcp.incr = kcp.rmt_wnd * mss + } + } + } + } + + if windowSlides { // if window has slided, flush + kcp.flush(false) + } else if ackNoDelay && len(kcp.acklist) > 0 { // ack immediately + kcp.flush(true) + } + return 0 +} + +func (kcp *KCP) wnd_unused() uint16 { + if len(kcp.rcv_queue) < int(kcp.rcv_wnd) { + return uint16(int(kcp.rcv_wnd) - len(kcp.rcv_queue)) + } + return 0 +} + +// flush pending data +func (kcp *KCP) flush(ackOnly bool) uint32 { + var seg segment + seg.conv = kcp.conv + seg.cmd = IKCP_CMD_ACK + seg.wnd = kcp.wnd_unused() + seg.una = kcp.rcv_nxt + + buffer := kcp.buffer + ptr := buffer[kcp.reserved:] // keep n bytes untouched + + // makeSpace makes room for writing + makeSpace := func(space int) { + size := len(buffer) - len(ptr) + if size+space > int(kcp.mtu) { + kcp.output(buffer, size) + ptr = buffer[kcp.reserved:] + } + } + + // flush bytes in buffer if there is any + flushBuffer := func() { + size := len(buffer) - len(ptr) + if size > kcp.reserved { + kcp.output(buffer, size) + } + } + + // flush acknowledges + for i, ack := range kcp.acklist { + makeSpace(IKCP_OVERHEAD) + // filter jitters caused by bufferbloat + if _itimediff(ack.sn, kcp.rcv_nxt) >= 0 || len(kcp.acklist)-1 == i { + seg.sn, seg.ts = ack.sn, ack.ts + ptr = seg.encode(ptr) + } + } + kcp.acklist = kcp.acklist[0:0] + + if ackOnly { // flash remain ack segments + flushBuffer() + return kcp.interval + } + + // probe window size (if remote window size equals zero) + if kcp.rmt_wnd == 0 { + current := currentMs() + if kcp.probe_wait == 0 { + kcp.probe_wait = IKCP_PROBE_INIT + kcp.ts_probe = current + kcp.probe_wait + } else { + if _itimediff(current, kcp.ts_probe) >= 0 { + if kcp.probe_wait < IKCP_PROBE_INIT { + kcp.probe_wait = IKCP_PROBE_INIT + } + kcp.probe_wait += kcp.probe_wait / 2 + if kcp.probe_wait > IKCP_PROBE_LIMIT { + kcp.probe_wait = IKCP_PROBE_LIMIT + } + kcp.ts_probe = current + kcp.probe_wait + kcp.probe |= IKCP_ASK_SEND + } + } + } else { + kcp.ts_probe = 0 + kcp.probe_wait = 0 + } + + // flush window probing commands + if (kcp.probe & IKCP_ASK_SEND) != 0 { + seg.cmd = IKCP_CMD_WASK + makeSpace(IKCP_OVERHEAD) + ptr = seg.encode(ptr) + } + + // flush window probing commands + if (kcp.probe & IKCP_ASK_TELL) != 0 { + seg.cmd = IKCP_CMD_WINS + makeSpace(IKCP_OVERHEAD) + ptr = seg.encode(ptr) + } + + kcp.probe = 0 + + // calculate window size + cwnd := _imin_(kcp.snd_wnd, kcp.rmt_wnd) + if kcp.nocwnd == 0 { + cwnd = _imin_(kcp.cwnd, cwnd) + } + + // sliding window, controlled by snd_nxt && sna_una+cwnd + newSegsCount := 0 + for k := range kcp.snd_queue { + if _itimediff(kcp.snd_nxt, kcp.snd_una+cwnd) >= 0 { + break + } + newseg := kcp.snd_queue[k] + newseg.conv = kcp.conv + newseg.cmd = IKCP_CMD_PUSH + newseg.sn = kcp.snd_nxt + kcp.snd_buf = append(kcp.snd_buf, newseg) + kcp.snd_nxt++ + newSegsCount++ + } + if newSegsCount > 0 { + kcp.snd_queue = kcp.remove_front(kcp.snd_queue, newSegsCount) + } + + // calculate resent + resent := uint32(kcp.fastresend) + if kcp.fastresend <= 0 { + resent = 0xffffffff + } + + // check for retransmissions + current := currentMs() + var change, lostSegs, fastRetransSegs, earlyRetransSegs uint64 + minrto := int32(kcp.interval) + + ref := kcp.snd_buf[:len(kcp.snd_buf)] // for bounds check elimination + for k := range ref { + segment := &ref[k] + needsend := false + if segment.acked == 1 { + continue + } + if segment.xmit == 0 { // initial transmit + needsend = true + segment.rto = kcp.rx_rto + segment.resendts = current + segment.rto + } else if segment.fastack >= resent { // fast retransmit + needsend = true + segment.fastack = 0 + segment.rto = kcp.rx_rto + segment.resendts = current + segment.rto + change++ + fastRetransSegs++ + } else if segment.fastack > 0 && newSegsCount == 0 { // early retransmit + needsend = true + segment.fastack = 0 + segment.rto = kcp.rx_rto + segment.resendts = current + segment.rto + change++ + earlyRetransSegs++ + } else if _itimediff(current, segment.resendts) >= 0 { // RTO + needsend = true + if kcp.nodelay == 0 { + segment.rto += kcp.rx_rto + } else { + segment.rto += kcp.rx_rto / 2 + } + segment.fastack = 0 + segment.resendts = current + segment.rto + lostSegs++ + } + + if needsend { + current = currentMs() + segment.xmit++ + segment.ts = current + segment.wnd = seg.wnd + segment.una = seg.una + + need := IKCP_OVERHEAD + len(segment.data) + makeSpace(need) + ptr = segment.encode(ptr) + copy(ptr, segment.data) + ptr = ptr[len(segment.data):] + + if segment.xmit >= kcp.dead_link { + kcp.state = 0xFFFFFFFF + } + } + + // get the nearest rto + if rto := _itimediff(segment.resendts, current); rto > 0 && rto < minrto { + minrto = rto + } + } + + // flash remain segments + flushBuffer() + + // counter updates + sum := lostSegs + if lostSegs > 0 { + atomic.AddUint64(&DefaultSnmp.LostSegs, lostSegs) + } + if fastRetransSegs > 0 { + atomic.AddUint64(&DefaultSnmp.FastRetransSegs, fastRetransSegs) + sum += fastRetransSegs + } + if earlyRetransSegs > 0 { + atomic.AddUint64(&DefaultSnmp.EarlyRetransSegs, earlyRetransSegs) + sum += earlyRetransSegs + } + if sum > 0 { + atomic.AddUint64(&DefaultSnmp.RetransSegs, sum) + } + + // cwnd update + if kcp.nocwnd == 0 { + // update ssthresh + // rate halving, https://tools.ietf.org/html/rfc6937 + if change > 0 { + inflight := kcp.snd_nxt - kcp.snd_una + kcp.ssthresh = inflight / 2 + if kcp.ssthresh < IKCP_THRESH_MIN { + kcp.ssthresh = IKCP_THRESH_MIN + } + kcp.cwnd = kcp.ssthresh + resent + kcp.incr = kcp.cwnd * kcp.mss + } + + // congestion control, https://tools.ietf.org/html/rfc5681 + if lostSegs > 0 { + kcp.ssthresh = cwnd / 2 + if kcp.ssthresh < IKCP_THRESH_MIN { + kcp.ssthresh = IKCP_THRESH_MIN + } + kcp.cwnd = 1 + kcp.incr = kcp.mss + } + + if kcp.cwnd < 1 { + kcp.cwnd = 1 + kcp.incr = kcp.mss + } + } + + return uint32(minrto) +} + +// (deprecated) +// +// Update updates state (call it repeatedly, every 10ms-100ms), or you can ask +// ikcp_check when to call it again (without ikcp_input/_send calling). +// 'current' - current timestamp in millisec. +func (kcp *KCP) Update() { + var slap int32 + + current := currentMs() + if kcp.updated == 0 { + kcp.updated = 1 + kcp.ts_flush = current + } + + slap = _itimediff(current, kcp.ts_flush) + + if slap >= 10000 || slap < -10000 { + kcp.ts_flush = current + slap = 0 + } + + if slap >= 0 { + kcp.ts_flush += kcp.interval + if _itimediff(current, kcp.ts_flush) >= 0 { + kcp.ts_flush = current + kcp.interval + } + kcp.flush(false) + } +} + +// (deprecated) +// +// Check determines when should you invoke ikcp_update: +// returns when you should invoke ikcp_update in millisec, if there +// is no ikcp_input/_send calling. you can call ikcp_update in that +// time, instead of call update repeatly. +// Important to reduce unnacessary ikcp_update invoking. use it to +// schedule ikcp_update (eg. implementing an epoll-like mechanism, +// or optimize ikcp_update when handling massive kcp connections) +func (kcp *KCP) Check() uint32 { + current := currentMs() + ts_flush := kcp.ts_flush + tm_flush := int32(0x7fffffff) + tm_packet := int32(0x7fffffff) + minimal := uint32(0) + if kcp.updated == 0 { + return current + } + + if _itimediff(current, ts_flush) >= 10000 || + _itimediff(current, ts_flush) < -10000 { + ts_flush = current + } + + if _itimediff(current, ts_flush) >= 0 { + return current + } + + tm_flush = _itimediff(ts_flush, current) + + for k := range kcp.snd_buf { + seg := &kcp.snd_buf[k] + diff := _itimediff(seg.resendts, current) + if diff <= 0 { + return current + } + if diff < tm_packet { + tm_packet = diff + } + } + + minimal = uint32(tm_packet) + if tm_packet >= tm_flush { + minimal = uint32(tm_flush) + } + if minimal >= kcp.interval { + minimal = kcp.interval + } + + return current + minimal +} + +// SetMtu changes MTU size, default is 1400 +func (kcp *KCP) SetMtu(mtu int) int { + if mtu < 50 || mtu < IKCP_OVERHEAD { + return -1 + } + if kcp.reserved >= int(kcp.mtu-IKCP_OVERHEAD) || kcp.reserved < 0 { + return -1 + } + + buffer := make([]byte, mtu) + if buffer == nil { + return -2 + } + kcp.mtu = uint32(mtu) + kcp.mss = kcp.mtu - IKCP_OVERHEAD - uint32(kcp.reserved) + kcp.buffer = buffer + return 0 +} + +// NoDelay options +// fastest: ikcp_nodelay(kcp, 1, 20, 2, 1) +// nodelay: 0:disable(default), 1:enable +// interval: internal update timer interval in millisec, default is 100ms +// resend: 0:disable fast resend(default), 1:enable fast resend +// nc: 0:normal congestion control(default), 1:disable congestion control +func (kcp *KCP) NoDelay(nodelay, interval, resend, nc int) int { + if nodelay >= 0 { + kcp.nodelay = uint32(nodelay) + if nodelay != 0 { + kcp.rx_minrto = IKCP_RTO_NDL + } else { + kcp.rx_minrto = IKCP_RTO_MIN + } + } + if interval >= 0 { + if interval > 5000 { + interval = 5000 + } else if interval < 10 { + interval = 10 + } + kcp.interval = uint32(interval) + } + if resend >= 0 { + kcp.fastresend = int32(resend) + } + if nc >= 0 { + kcp.nocwnd = int32(nc) + } + return 0 +} + +// WndSize sets maximum window size: sndwnd=32, rcvwnd=32 by default +func (kcp *KCP) WndSize(sndwnd, rcvwnd int) int { + if sndwnd > 0 { + kcp.snd_wnd = uint32(sndwnd) + } + if rcvwnd > 0 { + kcp.rcv_wnd = uint32(rcvwnd) + } + return 0 +} + +// WaitSnd gets how many packet is waiting to be sent +func (kcp *KCP) WaitSnd() int { + return len(kcp.snd_buf) + len(kcp.snd_queue) +} + +// remove front n elements from queue +// if the number of elements to remove is more than half of the size. +// just shift the rear elements to front, otherwise just reslice q to q[n:] +// then the cost of runtime.growslice can always be less than n/2 +func (kcp *KCP) remove_front(q []segment, n int) []segment { + if n > cap(q)/2 { + newn := copy(q, q[n:]) + return q[:newn] + } + return q[n:] +} + +// Release all cached outgoing segments +func (kcp *KCP) ReleaseTX() { + for k := range kcp.snd_queue { + if kcp.snd_queue[k].data != nil { + xmitBuf.Put(kcp.snd_queue[k].data) + } + } + for k := range kcp.snd_buf { + if kcp.snd_buf[k].data != nil { + xmitBuf.Put(kcp.snd_buf[k].data) + } + } + kcp.snd_queue = nil + kcp.snd_buf = nil +} diff --git a/vendor/github.com/xtaci/kcp-go/v5/readloop.go b/vendor/github.com/xtaci/kcp-go/v5/readloop.go new file mode 100644 index 0000000..697395a --- /dev/null +++ b/vendor/github.com/xtaci/kcp-go/v5/readloop.go @@ -0,0 +1,39 @@ +package kcp + +import ( + "sync/atomic" + + "github.com/pkg/errors" +) + +func (s *UDPSession) defaultReadLoop() { + buf := make([]byte, mtuLimit) + var src string + for { + if n, addr, err := s.conn.ReadFrom(buf); err == nil { + // make sure the packet is from the same source + if src == "" { // set source address + src = addr.String() + } else if addr.String() != src { + atomic.AddUint64(&DefaultSnmp.InErrs, 1) + continue + } + s.packetInput(buf[:n]) + } else { + s.notifyReadError(errors.WithStack(err)) + return + } + } +} + +func (l *Listener) defaultMonitor() { + buf := make([]byte, mtuLimit) + for { + if n, from, err := l.conn.ReadFrom(buf); err == nil { + l.packetInput(buf[:n], from) + } else { + l.notifyReadError(errors.WithStack(err)) + return + } + } +} diff --git a/vendor/github.com/xtaci/kcp-go/v5/readloop_generic.go b/vendor/github.com/xtaci/kcp-go/v5/readloop_generic.go new file mode 100644 index 0000000..5dbe4f4 --- /dev/null +++ b/vendor/github.com/xtaci/kcp-go/v5/readloop_generic.go @@ -0,0 +1,11 @@ +// +build !linux + +package kcp + +func (s *UDPSession) readLoop() { + s.defaultReadLoop() +} + +func (l *Listener) monitor() { + l.defaultMonitor() +} diff --git a/vendor/github.com/xtaci/kcp-go/v5/readloop_linux.go b/vendor/github.com/xtaci/kcp-go/v5/readloop_linux.go new file mode 100644 index 0000000..be194af --- /dev/null +++ b/vendor/github.com/xtaci/kcp-go/v5/readloop_linux.go @@ -0,0 +1,111 @@ +// +build linux + +package kcp + +import ( + "net" + "os" + "sync/atomic" + + "github.com/pkg/errors" + "golang.org/x/net/ipv4" + "golang.org/x/net/ipv6" +) + +// the read loop for a client session +func (s *UDPSession) readLoop() { + // default version + if s.xconn == nil { + s.defaultReadLoop() + return + } + + // x/net version + var src string + msgs := make([]ipv4.Message, batchSize) + for k := range msgs { + msgs[k].Buffers = [][]byte{make([]byte, mtuLimit)} + } + + for { + if count, err := s.xconn.ReadBatch(msgs, 0); err == nil { + for i := 0; i < count; i++ { + msg := &msgs[i] + // make sure the packet is from the same source + if src == "" { // set source address if nil + src = msg.Addr.String() + } else if msg.Addr.String() != src { + atomic.AddUint64(&DefaultSnmp.InErrs, 1) + continue + } + + // source and size has validated + s.packetInput(msg.Buffers[0][:msg.N]) + } + } else { + // compatibility issue: + // for linux kernel<=2.6.32, support for sendmmsg is not available + // an error of type os.SyscallError will be returned + if operr, ok := err.(*net.OpError); ok { + if se, ok := operr.Err.(*os.SyscallError); ok { + if se.Syscall == "recvmmsg" { + s.defaultReadLoop() + return + } + } + } + s.notifyReadError(errors.WithStack(err)) + return + } + } +} + +// monitor incoming data for all connections of server +func (l *Listener) monitor() { + var xconn batchConn + if _, ok := l.conn.(*net.UDPConn); ok { + addr, err := net.ResolveUDPAddr("udp", l.conn.LocalAddr().String()) + if err == nil { + if addr.IP.To4() != nil { + xconn = ipv4.NewPacketConn(l.conn) + } else { + xconn = ipv6.NewPacketConn(l.conn) + } + } + } + + // default version + if xconn == nil { + l.defaultMonitor() + return + } + + // x/net version + msgs := make([]ipv4.Message, batchSize) + for k := range msgs { + msgs[k].Buffers = [][]byte{make([]byte, mtuLimit)} + } + + for { + if count, err := xconn.ReadBatch(msgs, 0); err == nil { + for i := 0; i < count; i++ { + msg := &msgs[i] + l.packetInput(msg.Buffers[0][:msg.N], msg.Addr) + } + } else { + // compatibility issue: + // for linux kernel<=2.6.32, support for sendmmsg is not available + // an error of type os.SyscallError will be returned + if operr, ok := err.(*net.OpError); ok { + if se, ok := operr.Err.(*os.SyscallError); ok { + if se.Syscall == "recvmmsg" { + l.defaultMonitor() + return + } + } + } + l.notifyReadError(errors.WithStack(err)) + return + } + } +} diff --git a/vendor/github.com/xtaci/kcp-go/v5/sess.go b/vendor/github.com/xtaci/kcp-go/v5/sess.go new file mode 100644 index 0000000..2dedd74 --- /dev/null +++ b/vendor/github.com/xtaci/kcp-go/v5/sess.go @@ -0,0 +1,1075 @@ +// Package kcp-go is a Reliable-UDP library for golang. +// +// This library intents to provide a smooth, resilient, ordered, +// error-checked and anonymous delivery of streams over UDP packets. +// +// The interfaces of this package aims to be compatible with +// net.Conn in standard library, but offers powerful features for advanced users. +package kcp + +import ( + "crypto/rand" + "encoding/binary" + "hash/crc32" + "io" + "net" + "sync" + "sync/atomic" + "time" + + "github.com/pkg/errors" + "golang.org/x/net/ipv4" + "golang.org/x/net/ipv6" +) + +const ( + // 16-bytes nonce for each packet + nonceSize = 16 + + // 4-bytes packet checksum + crcSize = 4 + + // overall crypto header size + cryptHeaderSize = nonceSize + crcSize + + // maximum packet size + mtuLimit = 1500 + + // accept backlog + acceptBacklog = 128 +) + +var ( + errInvalidOperation = errors.New("invalid operation") + errTimeout = errors.New("timeout") +) + +var ( + // a system-wide packet buffer shared among sending, receiving and FEC + // to mitigate high-frequency memory allocation for packets, bytes from xmitBuf + // is aligned to 64bit + xmitBuf sync.Pool +) + +func init() { + xmitBuf.New = func() interface{} { + return make([]byte, mtuLimit) + } +} + +type ( + // UDPSession defines a KCP session implemented by UDP + UDPSession struct { + conn net.PacketConn // the underlying packet connection + ownConn bool // true if we created conn internally, false if provided by caller + kcp *KCP // KCP ARQ protocol + l *Listener // pointing to the Listener object if it's been accepted by a Listener + block BlockCrypt // block encryption object + + // kcp receiving is based on packets + // recvbuf turns packets into stream + recvbuf []byte + bufptr []byte + + // FEC codec + fecDecoder *fecDecoder + fecEncoder *fecEncoder + + // settings + remote net.Addr // remote peer address + rd time.Time // read deadline + wd time.Time // write deadline + headerSize int // the header size additional to a KCP frame + ackNoDelay bool // send ack immediately for each incoming packet(testing purpose) + writeDelay bool // delay kcp.flush() for Write() for bulk transfer + dup int // duplicate udp packets(testing purpose) + + // notifications + die chan struct{} // notify current session has Closed + dieOnce sync.Once + chReadEvent chan struct{} // notify Read() can be called without blocking + chWriteEvent chan struct{} // notify Write() can be called without blocking + + // socket error handling + socketReadError atomic.Value + socketWriteError atomic.Value + chSocketReadError chan struct{} + chSocketWriteError chan struct{} + socketReadErrorOnce sync.Once + socketWriteErrorOnce sync.Once + + // nonce generator + nonce Entropy + + // packets waiting to be sent on wire + txqueue []ipv4.Message + xconn batchConn // for x/net + xconnWriteError error + + mu sync.Mutex + } + + setReadBuffer interface { + SetReadBuffer(bytes int) error + } + + setWriteBuffer interface { + SetWriteBuffer(bytes int) error + } + + setDSCP interface { + SetDSCP(int) error + } +) + +// newUDPSession create a new udp session for client or server +func newUDPSession(conv uint32, dataShards, parityShards int, l *Listener, conn net.PacketConn, ownConn bool, remote net.Addr, block BlockCrypt) *UDPSession { + sess := new(UDPSession) + sess.die = make(chan struct{}) + sess.nonce = new(nonceAES128) + sess.nonce.Init() + sess.chReadEvent = make(chan struct{}, 1) + sess.chWriteEvent = make(chan struct{}, 1) + sess.chSocketReadError = make(chan struct{}) + sess.chSocketWriteError = make(chan struct{}) + sess.remote = remote + sess.conn = conn + sess.ownConn = ownConn + sess.l = l + sess.block = block + sess.recvbuf = make([]byte, mtuLimit) + + // cast to writebatch conn + if _, ok := conn.(*net.UDPConn); ok { + addr, err := net.ResolveUDPAddr("udp", conn.LocalAddr().String()) + if err == nil { + if addr.IP.To4() != nil { + sess.xconn = ipv4.NewPacketConn(conn) + } else { + sess.xconn = ipv6.NewPacketConn(conn) + } + } + } + + // FEC codec initialization + sess.fecDecoder = newFECDecoder(dataShards, parityShards) + if sess.block != nil { + sess.fecEncoder = newFECEncoder(dataShards, parityShards, cryptHeaderSize) + } else { + sess.fecEncoder = newFECEncoder(dataShards, parityShards, 0) + } + + // calculate additional header size introduced by FEC and encryption + if sess.block != nil { + sess.headerSize += cryptHeaderSize + } + if sess.fecEncoder != nil { + sess.headerSize += fecHeaderSizePlus2 + } + + sess.kcp = NewKCP(conv, func(buf []byte, size int) { + if size >= IKCP_OVERHEAD+sess.headerSize { + sess.output(buf[:size]) + } + }) + sess.kcp.ReserveBytes(sess.headerSize) + + if sess.l == nil { // it's a client connection + go sess.readLoop() + atomic.AddUint64(&DefaultSnmp.ActiveOpens, 1) + } else { + atomic.AddUint64(&DefaultSnmp.PassiveOpens, 1) + } + + // start per-session updater + SystemTimedSched.Put(sess.update, time.Now()) + + currestab := atomic.AddUint64(&DefaultSnmp.CurrEstab, 1) + maxconn := atomic.LoadUint64(&DefaultSnmp.MaxConn) + if currestab > maxconn { + atomic.CompareAndSwapUint64(&DefaultSnmp.MaxConn, maxconn, currestab) + } + + return sess +} + +// Read implements net.Conn +func (s *UDPSession) Read(b []byte) (n int, err error) { + for { + s.mu.Lock() + if len(s.bufptr) > 0 { // copy from buffer into b + n = copy(b, s.bufptr) + s.bufptr = s.bufptr[n:] + s.mu.Unlock() + atomic.AddUint64(&DefaultSnmp.BytesReceived, uint64(n)) + return n, nil + } + + if size := s.kcp.PeekSize(); size > 0 { // peek data size from kcp + if len(b) >= size { // receive data into 'b' directly + s.kcp.Recv(b) + s.mu.Unlock() + atomic.AddUint64(&DefaultSnmp.BytesReceived, uint64(size)) + return size, nil + } + + // if necessary resize the stream buffer to guarantee a sufficient buffer space + if cap(s.recvbuf) < size { + s.recvbuf = make([]byte, size) + } + + // resize the length of recvbuf to correspond to data size + s.recvbuf = s.recvbuf[:size] + s.kcp.Recv(s.recvbuf) + n = copy(b, s.recvbuf) // copy to 'b' + s.bufptr = s.recvbuf[n:] // pointer update + s.mu.Unlock() + atomic.AddUint64(&DefaultSnmp.BytesReceived, uint64(n)) + return n, nil + } + + // deadline for current reading operation + var timeout *time.Timer + var c <-chan time.Time + if !s.rd.IsZero() { + if time.Now().After(s.rd) { + s.mu.Unlock() + return 0, errors.WithStack(errTimeout) + } + + delay := time.Until(s.rd) + timeout = time.NewTimer(delay) + c = timeout.C + } + s.mu.Unlock() + + // wait for read event or timeout or error + select { + case <-s.chReadEvent: + if timeout != nil { + timeout.Stop() + } + case <-c: + return 0, errors.WithStack(errTimeout) + case <-s.chSocketReadError: + return 0, s.socketReadError.Load().(error) + case <-s.die: + return 0, errors.WithStack(io.ErrClosedPipe) + } + } +} + +// Write implements net.Conn +func (s *UDPSession) Write(b []byte) (n int, err error) { return s.WriteBuffers([][]byte{b}) } + +// WriteBuffers write a vector of byte slices to the underlying connection +func (s *UDPSession) WriteBuffers(v [][]byte) (n int, err error) { + for { + select { + case <-s.chSocketWriteError: + return 0, s.socketWriteError.Load().(error) + case <-s.die: + return 0, errors.WithStack(io.ErrClosedPipe) + default: + } + + s.mu.Lock() + + // make sure write do not overflow the max sliding window on both side + waitsnd := s.kcp.WaitSnd() + if waitsnd < int(s.kcp.snd_wnd) && waitsnd < int(s.kcp.rmt_wnd) { + for _, b := range v { + n += len(b) + for { + if len(b) <= int(s.kcp.mss) { + s.kcp.Send(b) + break + } else { + s.kcp.Send(b[:s.kcp.mss]) + b = b[s.kcp.mss:] + } + } + } + + waitsnd = s.kcp.WaitSnd() + if waitsnd >= int(s.kcp.snd_wnd) || waitsnd >= int(s.kcp.rmt_wnd) || !s.writeDelay { + s.kcp.flush(false) + s.uncork() + } + s.mu.Unlock() + atomic.AddUint64(&DefaultSnmp.BytesSent, uint64(n)) + return n, nil + } + + var timeout *time.Timer + var c <-chan time.Time + if !s.wd.IsZero() { + if time.Now().After(s.wd) { + s.mu.Unlock() + return 0, errors.WithStack(errTimeout) + } + delay := time.Until(s.wd) + timeout = time.NewTimer(delay) + c = timeout.C + } + s.mu.Unlock() + + select { + case <-s.chWriteEvent: + if timeout != nil { + timeout.Stop() + } + case <-c: + return 0, errors.WithStack(errTimeout) + case <-s.chSocketWriteError: + return 0, s.socketWriteError.Load().(error) + case <-s.die: + return 0, errors.WithStack(io.ErrClosedPipe) + } + } +} + +// uncork sends data in txqueue if there is any +func (s *UDPSession) uncork() { + if len(s.txqueue) > 0 { + s.tx(s.txqueue) + // recycle + for k := range s.txqueue { + xmitBuf.Put(s.txqueue[k].Buffers[0]) + s.txqueue[k].Buffers = nil + } + s.txqueue = s.txqueue[:0] + } +} + +// Close closes the connection. +func (s *UDPSession) Close() error { + var once bool + s.dieOnce.Do(func() { + close(s.die) + once = true + }) + + if once { + atomic.AddUint64(&DefaultSnmp.CurrEstab, ^uint64(0)) + + // try best to send all queued messages + s.mu.Lock() + s.kcp.flush(false) + s.uncork() + // release pending segments + s.kcp.ReleaseTX() + if s.fecDecoder != nil { + s.fecDecoder.release() + } + s.mu.Unlock() + + if s.l != nil { // belongs to listener + s.l.closeSession(s.remote) + return nil + } else if s.ownConn { // client socket close + return s.conn.Close() + } else { + return nil + } + } else { + return errors.WithStack(io.ErrClosedPipe) + } +} + +// LocalAddr returns the local network address. The Addr returned is shared by all invocations of LocalAddr, so do not modify it. +func (s *UDPSession) LocalAddr() net.Addr { return s.conn.LocalAddr() } + +// RemoteAddr returns the remote network address. The Addr returned is shared by all invocations of RemoteAddr, so do not modify it. +func (s *UDPSession) RemoteAddr() net.Addr { return s.remote } + +// SetDeadline sets the deadline associated with the listener. A zero time value disables the deadline. +func (s *UDPSession) SetDeadline(t time.Time) error { + s.mu.Lock() + defer s.mu.Unlock() + s.rd = t + s.wd = t + s.notifyReadEvent() + s.notifyWriteEvent() + return nil +} + +// SetReadDeadline implements the Conn SetReadDeadline method. +func (s *UDPSession) SetReadDeadline(t time.Time) error { + s.mu.Lock() + defer s.mu.Unlock() + s.rd = t + s.notifyReadEvent() + return nil +} + +// SetWriteDeadline implements the Conn SetWriteDeadline method. +func (s *UDPSession) SetWriteDeadline(t time.Time) error { + s.mu.Lock() + defer s.mu.Unlock() + s.wd = t + s.notifyWriteEvent() + return nil +} + +// SetWriteDelay delays write for bulk transfer until the next update interval +func (s *UDPSession) SetWriteDelay(delay bool) { + s.mu.Lock() + defer s.mu.Unlock() + s.writeDelay = delay +} + +// SetWindowSize set maximum window size +func (s *UDPSession) SetWindowSize(sndwnd, rcvwnd int) { + s.mu.Lock() + defer s.mu.Unlock() + s.kcp.WndSize(sndwnd, rcvwnd) +} + +// SetMtu sets the maximum transmission unit(not including UDP header) +func (s *UDPSession) SetMtu(mtu int) bool { + if mtu > mtuLimit { + return false + } + + s.mu.Lock() + defer s.mu.Unlock() + s.kcp.SetMtu(mtu) + return true +} + +// SetStreamMode toggles the stream mode on/off +func (s *UDPSession) SetStreamMode(enable bool) { + s.mu.Lock() + defer s.mu.Unlock() + if enable { + s.kcp.stream = 1 + } else { + s.kcp.stream = 0 + } +} + +// SetACKNoDelay changes ack flush option, set true to flush ack immediately, +func (s *UDPSession) SetACKNoDelay(nodelay bool) { + s.mu.Lock() + defer s.mu.Unlock() + s.ackNoDelay = nodelay +} + +// (deprecated) +// +// SetDUP duplicates udp packets for kcp output. +func (s *UDPSession) SetDUP(dup int) { + s.mu.Lock() + defer s.mu.Unlock() + s.dup = dup +} + +// SetNoDelay calls nodelay() of kcp +// https://github.com/skywind3000/kcp/blob/master/README.en.md#protocol-configuration +func (s *UDPSession) SetNoDelay(nodelay, interval, resend, nc int) { + s.mu.Lock() + defer s.mu.Unlock() + s.kcp.NoDelay(nodelay, interval, resend, nc) +} + +// SetDSCP sets the 6bit DSCP field in IPv4 header, or 8bit Traffic Class in IPv6 header. +// +// if the underlying connection has implemented `func SetDSCP(int) error`, SetDSCP() will invoke +// this function instead. +// +// It has no effect if it's accepted from Listener. +func (s *UDPSession) SetDSCP(dscp int) error { + s.mu.Lock() + defer s.mu.Unlock() + if s.l != nil { + return errInvalidOperation + } + + // interface enabled + if ts, ok := s.conn.(setDSCP); ok { + return ts.SetDSCP(dscp) + } + + if nc, ok := s.conn.(net.Conn); ok { + var succeed bool + if err := ipv4.NewConn(nc).SetTOS(dscp << 2); err == nil { + succeed = true + } + if err := ipv6.NewConn(nc).SetTrafficClass(dscp); err == nil { + succeed = true + } + + if succeed { + return nil + } + } + return errInvalidOperation +} + +// SetReadBuffer sets the socket read buffer, no effect if it's accepted from Listener +func (s *UDPSession) SetReadBuffer(bytes int) error { + s.mu.Lock() + defer s.mu.Unlock() + if s.l == nil { + if nc, ok := s.conn.(setReadBuffer); ok { + return nc.SetReadBuffer(bytes) + } + } + return errInvalidOperation +} + +// SetWriteBuffer sets the socket write buffer, no effect if it's accepted from Listener +func (s *UDPSession) SetWriteBuffer(bytes int) error { + s.mu.Lock() + defer s.mu.Unlock() + if s.l == nil { + if nc, ok := s.conn.(setWriteBuffer); ok { + return nc.SetWriteBuffer(bytes) + } + } + return errInvalidOperation +} + +// post-processing for sending a packet from kcp core +// steps: +// 1. FEC packet generation +// 2. CRC32 integrity +// 3. Encryption +// 4. TxQueue +func (s *UDPSession) output(buf []byte) { + var ecc [][]byte + + // 1. FEC encoding + if s.fecEncoder != nil { + ecc = s.fecEncoder.encode(buf) + } + + // 2&3. crc32 & encryption + if s.block != nil { + s.nonce.Fill(buf[:nonceSize]) + checksum := crc32.ChecksumIEEE(buf[cryptHeaderSize:]) + binary.LittleEndian.PutUint32(buf[nonceSize:], checksum) + s.block.Encrypt(buf, buf) + + for k := range ecc { + s.nonce.Fill(ecc[k][:nonceSize]) + checksum := crc32.ChecksumIEEE(ecc[k][cryptHeaderSize:]) + binary.LittleEndian.PutUint32(ecc[k][nonceSize:], checksum) + s.block.Encrypt(ecc[k], ecc[k]) + } + } + + // 4. TxQueue + var msg ipv4.Message + for i := 0; i < s.dup+1; i++ { + bts := xmitBuf.Get().([]byte)[:len(buf)] + copy(bts, buf) + msg.Buffers = [][]byte{bts} + msg.Addr = s.remote + s.txqueue = append(s.txqueue, msg) + } + + for k := range ecc { + bts := xmitBuf.Get().([]byte)[:len(ecc[k])] + copy(bts, ecc[k]) + msg.Buffers = [][]byte{bts} + msg.Addr = s.remote + s.txqueue = append(s.txqueue, msg) + } +} + +// sess update to trigger protocol +func (s *UDPSession) update() { + select { + case <-s.die: + default: + s.mu.Lock() + interval := s.kcp.flush(false) + waitsnd := s.kcp.WaitSnd() + if waitsnd < int(s.kcp.snd_wnd) && waitsnd < int(s.kcp.rmt_wnd) { + s.notifyWriteEvent() + } + s.uncork() + s.mu.Unlock() + // self-synchronized timed scheduling + SystemTimedSched.Put(s.update, time.Now().Add(time.Duration(interval)*time.Millisecond)) + } +} + +// GetConv gets conversation id of a session +func (s *UDPSession) GetConv() uint32 { return s.kcp.conv } + +// GetRTO gets current rto of the session +func (s *UDPSession) GetRTO() uint32 { + s.mu.Lock() + defer s.mu.Unlock() + return s.kcp.rx_rto +} + +// GetSRTT gets current srtt of the session +func (s *UDPSession) GetSRTT() int32 { + s.mu.Lock() + defer s.mu.Unlock() + return s.kcp.rx_srtt +} + +// GetRTTVar gets current rtt variance of the session +func (s *UDPSession) GetSRTTVar() int32 { + s.mu.Lock() + defer s.mu.Unlock() + return s.kcp.rx_rttvar +} + +func (s *UDPSession) notifyReadEvent() { + select { + case s.chReadEvent <- struct{}{}: + default: + } +} + +func (s *UDPSession) notifyWriteEvent() { + select { + case s.chWriteEvent <- struct{}{}: + default: + } +} + +func (s *UDPSession) notifyReadError(err error) { + s.socketReadErrorOnce.Do(func() { + s.socketReadError.Store(err) + close(s.chSocketReadError) + }) +} + +func (s *UDPSession) notifyWriteError(err error) { + s.socketWriteErrorOnce.Do(func() { + s.socketWriteError.Store(err) + close(s.chSocketWriteError) + }) +} + +// packet input stage +func (s *UDPSession) packetInput(data []byte) { + decrypted := false + if s.block != nil && len(data) >= cryptHeaderSize { + s.block.Decrypt(data, data) + data = data[nonceSize:] + checksum := crc32.ChecksumIEEE(data[crcSize:]) + if checksum == binary.LittleEndian.Uint32(data) { + data = data[crcSize:] + decrypted = true + } else { + atomic.AddUint64(&DefaultSnmp.InCsumErrors, 1) + } + } else if s.block == nil { + decrypted = true + } + + if decrypted && len(data) >= IKCP_OVERHEAD { + s.kcpInput(data) + } +} + +func (s *UDPSession) kcpInput(data []byte) { + var kcpInErrors, fecErrs, fecRecovered, fecParityShards uint64 + + fecFlag := binary.LittleEndian.Uint16(data[4:]) + if fecFlag == typeData || fecFlag == typeParity { // 16bit kcp cmd [81-84] and frg [0-255] will not overlap with FEC type 0x00f1 0x00f2 + if len(data) >= fecHeaderSizePlus2 { + f := fecPacket(data) + if f.flag() == typeParity { + fecParityShards++ + } + + // lock + s.mu.Lock() + // if fecDecoder is not initialized, create one with default parameter + if s.fecDecoder == nil { + s.fecDecoder = newFECDecoder(1, 1) + } + recovers := s.fecDecoder.decode(f) + if f.flag() == typeData { + if ret := s.kcp.Input(data[fecHeaderSizePlus2:], true, s.ackNoDelay); ret != 0 { + kcpInErrors++ + } + } + + for _, r := range recovers { + if len(r) >= 2 { // must be larger than 2bytes + sz := binary.LittleEndian.Uint16(r) + if int(sz) <= len(r) && sz >= 2 { + if ret := s.kcp.Input(r[2:sz], false, s.ackNoDelay); ret == 0 { + fecRecovered++ + } else { + kcpInErrors++ + } + } else { + fecErrs++ + } + } else { + fecErrs++ + } + // recycle the recovers + xmitBuf.Put(r) + } + + // to notify the readers to receive the data + if n := s.kcp.PeekSize(); n > 0 { + s.notifyReadEvent() + } + // to notify the writers + waitsnd := s.kcp.WaitSnd() + if waitsnd < int(s.kcp.snd_wnd) && waitsnd < int(s.kcp.rmt_wnd) { + s.notifyWriteEvent() + } + + s.uncork() + s.mu.Unlock() + } else { + atomic.AddUint64(&DefaultSnmp.InErrs, 1) + } + } else { + s.mu.Lock() + if ret := s.kcp.Input(data, true, s.ackNoDelay); ret != 0 { + kcpInErrors++ + } + if n := s.kcp.PeekSize(); n > 0 { + s.notifyReadEvent() + } + waitsnd := s.kcp.WaitSnd() + if waitsnd < int(s.kcp.snd_wnd) && waitsnd < int(s.kcp.rmt_wnd) { + s.notifyWriteEvent() + } + s.uncork() + s.mu.Unlock() + } + + atomic.AddUint64(&DefaultSnmp.InPkts, 1) + atomic.AddUint64(&DefaultSnmp.InBytes, uint64(len(data))) + if fecParityShards > 0 { + atomic.AddUint64(&DefaultSnmp.FECParityShards, fecParityShards) + } + if kcpInErrors > 0 { + atomic.AddUint64(&DefaultSnmp.KCPInErrors, kcpInErrors) + } + if fecErrs > 0 { + atomic.AddUint64(&DefaultSnmp.FECErrs, fecErrs) + } + if fecRecovered > 0 { + atomic.AddUint64(&DefaultSnmp.FECRecovered, fecRecovered) + } + +} + +type ( + // Listener defines a server which will be waiting to accept incoming connections + Listener struct { + block BlockCrypt // block encryption + dataShards int // FEC data shard + parityShards int // FEC parity shard + conn net.PacketConn // the underlying packet connection + ownConn bool // true if we created conn internally, false if provided by caller + + sessions map[string]*UDPSession // all sessions accepted by this Listener + sessionLock sync.RWMutex + chAccepts chan *UDPSession // Listen() backlog + chSessionClosed chan net.Addr // session close queue + + die chan struct{} // notify the listener has closed + dieOnce sync.Once + + // socket error handling + socketReadError atomic.Value + chSocketReadError chan struct{} + socketReadErrorOnce sync.Once + + rd atomic.Value // read deadline for Accept() + } +) + +// packet input stage +func (l *Listener) packetInput(data []byte, addr net.Addr) { + decrypted := false + if l.block != nil && len(data) >= cryptHeaderSize { + l.block.Decrypt(data, data) + data = data[nonceSize:] + checksum := crc32.ChecksumIEEE(data[crcSize:]) + if checksum == binary.LittleEndian.Uint32(data) { + data = data[crcSize:] + decrypted = true + } else { + atomic.AddUint64(&DefaultSnmp.InCsumErrors, 1) + } + } else if l.block == nil { + decrypted = true + } + + if decrypted && len(data) >= IKCP_OVERHEAD { + l.sessionLock.RLock() + s, ok := l.sessions[addr.String()] + l.sessionLock.RUnlock() + + var conv, sn uint32 + convRecovered := false + fecFlag := binary.LittleEndian.Uint16(data[4:]) + if fecFlag == typeData || fecFlag == typeParity { // 16bit kcp cmd [81-84] and frg [0-255] will not overlap with FEC type 0x00f1 0x00f2 + // packet with FEC + if fecFlag == typeData && len(data) >= fecHeaderSizePlus2+IKCP_OVERHEAD { + conv = binary.LittleEndian.Uint32(data[fecHeaderSizePlus2:]) + sn = binary.LittleEndian.Uint32(data[fecHeaderSizePlus2+IKCP_SN_OFFSET:]) + convRecovered = true + } + } else { + // packet without FEC + conv = binary.LittleEndian.Uint32(data) + sn = binary.LittleEndian.Uint32(data[IKCP_SN_OFFSET:]) + convRecovered = true + } + + if ok { // existing connection + if !convRecovered || conv == s.kcp.conv { // parity data or valid conversation + s.kcpInput(data) + } else if sn == 0 { // should replace current connection + s.Close() + s = nil + } + } + + if s == nil && convRecovered { // new session + if len(l.chAccepts) < cap(l.chAccepts) { // do not let the new sessions overwhelm accept queue + s := newUDPSession(conv, l.dataShards, l.parityShards, l, l.conn, false, addr, l.block) + s.kcpInput(data) + l.sessionLock.Lock() + l.sessions[addr.String()] = s + l.sessionLock.Unlock() + l.chAccepts <- s + } + } + } +} + +func (l *Listener) notifyReadError(err error) { + l.socketReadErrorOnce.Do(func() { + l.socketReadError.Store(err) + close(l.chSocketReadError) + + // propagate read error to all sessions + l.sessionLock.RLock() + for _, s := range l.sessions { + s.notifyReadError(err) + } + l.sessionLock.RUnlock() + }) +} + +// SetReadBuffer sets the socket read buffer for the Listener +func (l *Listener) SetReadBuffer(bytes int) error { + if nc, ok := l.conn.(setReadBuffer); ok { + return nc.SetReadBuffer(bytes) + } + return errInvalidOperation +} + +// SetWriteBuffer sets the socket write buffer for the Listener +func (l *Listener) SetWriteBuffer(bytes int) error { + if nc, ok := l.conn.(setWriteBuffer); ok { + return nc.SetWriteBuffer(bytes) + } + return errInvalidOperation +} + +// SetDSCP sets the 6bit DSCP field in IPv4 header, or 8bit Traffic Class in IPv6 header. +// +// if the underlying connection has implemented `func SetDSCP(int) error`, SetDSCP() will invoke +// this function instead. +func (l *Listener) SetDSCP(dscp int) error { + // interface enabled + if ts, ok := l.conn.(setDSCP); ok { + return ts.SetDSCP(dscp) + } + + if nc, ok := l.conn.(net.Conn); ok { + var succeed bool + if err := ipv4.NewConn(nc).SetTOS(dscp << 2); err == nil { + succeed = true + } + if err := ipv6.NewConn(nc).SetTrafficClass(dscp); err == nil { + succeed = true + } + + if succeed { + return nil + } + } + return errInvalidOperation +} + +// Accept implements the Accept method in the Listener interface; it waits for the next call and returns a generic Conn. +func (l *Listener) Accept() (net.Conn, error) { + return l.AcceptKCP() +} + +// AcceptKCP accepts a KCP connection +func (l *Listener) AcceptKCP() (*UDPSession, error) { + var timeout <-chan time.Time + if tdeadline, ok := l.rd.Load().(time.Time); ok && !tdeadline.IsZero() { + timeout = time.After(time.Until(tdeadline)) + } + + select { + case <-timeout: + return nil, errors.WithStack(errTimeout) + case c := <-l.chAccepts: + return c, nil + case <-l.chSocketReadError: + return nil, l.socketReadError.Load().(error) + case <-l.die: + return nil, errors.WithStack(io.ErrClosedPipe) + } +} + +// SetDeadline sets the deadline associated with the listener. A zero time value disables the deadline. +func (l *Listener) SetDeadline(t time.Time) error { + l.SetReadDeadline(t) + l.SetWriteDeadline(t) + return nil +} + +// SetReadDeadline implements the Conn SetReadDeadline method. +func (l *Listener) SetReadDeadline(t time.Time) error { + l.rd.Store(t) + return nil +} + +// SetWriteDeadline implements the Conn SetWriteDeadline method. +func (l *Listener) SetWriteDeadline(t time.Time) error { return errInvalidOperation } + +// Close stops listening on the UDP address, and closes the socket +func (l *Listener) Close() error { + var once bool + l.dieOnce.Do(func() { + close(l.die) + once = true + }) + + var err error + if once { + if l.ownConn { + err = l.conn.Close() + } + } else { + err = errors.WithStack(io.ErrClosedPipe) + } + return err +} + +// closeSession notify the listener that a session has closed +func (l *Listener) closeSession(remote net.Addr) (ret bool) { + l.sessionLock.Lock() + defer l.sessionLock.Unlock() + if _, ok := l.sessions[remote.String()]; ok { + delete(l.sessions, remote.String()) + return true + } + return false +} + +// Addr returns the listener's network address, The Addr returned is shared by all invocations of Addr, so do not modify it. +func (l *Listener) Addr() net.Addr { return l.conn.LocalAddr() } + +// Listen listens for incoming KCP packets addressed to the local address laddr on the network "udp", +func Listen(laddr string) (net.Listener, error) { return ListenWithOptions(laddr, nil, 0, 0) } + +// ListenWithOptions listens for incoming KCP packets addressed to the local address laddr on the network "udp" with packet encryption. +// +// 'block' is the block encryption algorithm to encrypt packets. +// +// 'dataShards', 'parityShards' specify how many parity packets will be generated following the data packets. +// +// Check https://github.com/klauspost/reedsolomon for details +func ListenWithOptions(laddr string, block BlockCrypt, dataShards, parityShards int) (*Listener, error) { + udpaddr, err := net.ResolveUDPAddr("udp", laddr) + if err != nil { + return nil, errors.WithStack(err) + } + conn, err := net.ListenUDP("udp", udpaddr) + if err != nil { + return nil, errors.WithStack(err) + } + + return serveConn(block, dataShards, parityShards, conn, true) +} + +// ServeConn serves KCP protocol for a single packet connection. +func ServeConn(block BlockCrypt, dataShards, parityShards int, conn net.PacketConn) (*Listener, error) { + return serveConn(block, dataShards, parityShards, conn, false) +} + +func serveConn(block BlockCrypt, dataShards, parityShards int, conn net.PacketConn, ownConn bool) (*Listener, error) { + l := new(Listener) + l.conn = conn + l.ownConn = ownConn + l.sessions = make(map[string]*UDPSession) + l.chAccepts = make(chan *UDPSession, acceptBacklog) + l.chSessionClosed = make(chan net.Addr) + l.die = make(chan struct{}) + l.dataShards = dataShards + l.parityShards = parityShards + l.block = block + l.chSocketReadError = make(chan struct{}) + go l.monitor() + return l, nil +} + +// Dial connects to the remote address "raddr" on the network "udp" without encryption and FEC +func Dial(raddr string) (net.Conn, error) { return DialWithOptions(raddr, nil, 0, 0) } + +// DialWithOptions connects to the remote address "raddr" on the network "udp" with packet encryption +// +// 'block' is the block encryption algorithm to encrypt packets. +// +// 'dataShards', 'parityShards' specify how many parity packets will be generated following the data packets. +// +// Check https://github.com/klauspost/reedsolomon for details +func DialWithOptions(raddr string, block BlockCrypt, dataShards, parityShards int) (*UDPSession, error) { + // network type detection + udpaddr, err := net.ResolveUDPAddr("udp", raddr) + if err != nil { + return nil, errors.WithStack(err) + } + network := "udp4" + if udpaddr.IP.To4() == nil { + network = "udp" + } + + conn, err := net.ListenUDP(network, nil) + if err != nil { + return nil, errors.WithStack(err) + } + + var convid uint32 + binary.Read(rand.Reader, binary.LittleEndian, &convid) + return newUDPSession(convid, dataShards, parityShards, nil, conn, true, udpaddr, block), nil +} + +// NewConn3 establishes a session and talks KCP protocol over a packet connection. +func NewConn3(convid uint32, raddr net.Addr, block BlockCrypt, dataShards, parityShards int, conn net.PacketConn) (*UDPSession, error) { + return newUDPSession(convid, dataShards, parityShards, nil, conn, false, raddr, block), nil +} + +// NewConn2 establishes a session and talks KCP protocol over a packet connection. +func NewConn2(raddr net.Addr, block BlockCrypt, dataShards, parityShards int, conn net.PacketConn) (*UDPSession, error) { + var convid uint32 + binary.Read(rand.Reader, binary.LittleEndian, &convid) + return NewConn3(convid, raddr, block, dataShards, parityShards, conn) +} + +// NewConn establishes a session and talks KCP protocol over a packet connection. +func NewConn(raddr string, block BlockCrypt, dataShards, parityShards int, conn net.PacketConn) (*UDPSession, error) { + udpaddr, err := net.ResolveUDPAddr("udp", raddr) + if err != nil { + return nil, errors.WithStack(err) + } + return NewConn2(udpaddr, block, dataShards, parityShards, conn) +} diff --git a/vendor/github.com/xtaci/kcp-go/v5/snmp.go b/vendor/github.com/xtaci/kcp-go/v5/snmp.go new file mode 100644 index 0000000..f961810 --- /dev/null +++ b/vendor/github.com/xtaci/kcp-go/v5/snmp.go @@ -0,0 +1,164 @@ +package kcp + +import ( + "fmt" + "sync/atomic" +) + +// Snmp defines network statistics indicator +type Snmp struct { + BytesSent uint64 // bytes sent from upper level + BytesReceived uint64 // bytes received to upper level + MaxConn uint64 // max number of connections ever reached + ActiveOpens uint64 // accumulated active open connections + PassiveOpens uint64 // accumulated passive open connections + CurrEstab uint64 // current number of established connections + InErrs uint64 // UDP read errors reported from net.PacketConn + InCsumErrors uint64 // checksum errors from CRC32 + KCPInErrors uint64 // packet iput errors reported from KCP + InPkts uint64 // incoming packets count + OutPkts uint64 // outgoing packets count + InSegs uint64 // incoming KCP segments + OutSegs uint64 // outgoing KCP segments + InBytes uint64 // UDP bytes received + OutBytes uint64 // UDP bytes sent + RetransSegs uint64 // accmulated retransmited segments + FastRetransSegs uint64 // accmulated fast retransmitted segments + EarlyRetransSegs uint64 // accmulated early retransmitted segments + LostSegs uint64 // number of segs inferred as lost + RepeatSegs uint64 // number of segs duplicated + FECRecovered uint64 // correct packets recovered from FEC + FECErrs uint64 // incorrect packets recovered from FEC + FECParityShards uint64 // FEC segments received + FECShortShards uint64 // number of data shards that's not enough for recovery +} + +func newSnmp() *Snmp { + return new(Snmp) +} + +// Header returns all field names +func (s *Snmp) Header() []string { + return []string{ + "BytesSent", + "BytesReceived", + "MaxConn", + "ActiveOpens", + "PassiveOpens", + "CurrEstab", + "InErrs", + "InCsumErrors", + "KCPInErrors", + "InPkts", + "OutPkts", + "InSegs", + "OutSegs", + "InBytes", + "OutBytes", + "RetransSegs", + "FastRetransSegs", + "EarlyRetransSegs", + "LostSegs", + "RepeatSegs", + "FECParityShards", + "FECErrs", + "FECRecovered", + "FECShortShards", + } +} + +// ToSlice returns current snmp info as slice +func (s *Snmp) ToSlice() []string { + snmp := s.Copy() + return []string{ + fmt.Sprint(snmp.BytesSent), + fmt.Sprint(snmp.BytesReceived), + fmt.Sprint(snmp.MaxConn), + fmt.Sprint(snmp.ActiveOpens), + fmt.Sprint(snmp.PassiveOpens), + fmt.Sprint(snmp.CurrEstab), + fmt.Sprint(snmp.InErrs), + fmt.Sprint(snmp.InCsumErrors), + fmt.Sprint(snmp.KCPInErrors), + fmt.Sprint(snmp.InPkts), + fmt.Sprint(snmp.OutPkts), + fmt.Sprint(snmp.InSegs), + fmt.Sprint(snmp.OutSegs), + fmt.Sprint(snmp.InBytes), + fmt.Sprint(snmp.OutBytes), + fmt.Sprint(snmp.RetransSegs), + fmt.Sprint(snmp.FastRetransSegs), + fmt.Sprint(snmp.EarlyRetransSegs), + fmt.Sprint(snmp.LostSegs), + fmt.Sprint(snmp.RepeatSegs), + fmt.Sprint(snmp.FECParityShards), + fmt.Sprint(snmp.FECErrs), + fmt.Sprint(snmp.FECRecovered), + fmt.Sprint(snmp.FECShortShards), + } +} + +// Copy make a copy of current snmp snapshot +func (s *Snmp) Copy() *Snmp { + d := newSnmp() + d.BytesSent = atomic.LoadUint64(&s.BytesSent) + d.BytesReceived = atomic.LoadUint64(&s.BytesReceived) + d.MaxConn = atomic.LoadUint64(&s.MaxConn) + d.ActiveOpens = atomic.LoadUint64(&s.ActiveOpens) + d.PassiveOpens = atomic.LoadUint64(&s.PassiveOpens) + d.CurrEstab = atomic.LoadUint64(&s.CurrEstab) + d.InErrs = atomic.LoadUint64(&s.InErrs) + d.InCsumErrors = atomic.LoadUint64(&s.InCsumErrors) + d.KCPInErrors = atomic.LoadUint64(&s.KCPInErrors) + d.InPkts = atomic.LoadUint64(&s.InPkts) + d.OutPkts = atomic.LoadUint64(&s.OutPkts) + d.InSegs = atomic.LoadUint64(&s.InSegs) + d.OutSegs = atomic.LoadUint64(&s.OutSegs) + d.InBytes = atomic.LoadUint64(&s.InBytes) + d.OutBytes = atomic.LoadUint64(&s.OutBytes) + d.RetransSegs = atomic.LoadUint64(&s.RetransSegs) + d.FastRetransSegs = atomic.LoadUint64(&s.FastRetransSegs) + d.EarlyRetransSegs = atomic.LoadUint64(&s.EarlyRetransSegs) + d.LostSegs = atomic.LoadUint64(&s.LostSegs) + d.RepeatSegs = atomic.LoadUint64(&s.RepeatSegs) + d.FECParityShards = atomic.LoadUint64(&s.FECParityShards) + d.FECErrs = atomic.LoadUint64(&s.FECErrs) + d.FECRecovered = atomic.LoadUint64(&s.FECRecovered) + d.FECShortShards = atomic.LoadUint64(&s.FECShortShards) + return d +} + +// Reset values to zero +func (s *Snmp) Reset() { + atomic.StoreUint64(&s.BytesSent, 0) + atomic.StoreUint64(&s.BytesReceived, 0) + atomic.StoreUint64(&s.MaxConn, 0) + atomic.StoreUint64(&s.ActiveOpens, 0) + atomic.StoreUint64(&s.PassiveOpens, 0) + atomic.StoreUint64(&s.CurrEstab, 0) + atomic.StoreUint64(&s.InErrs, 0) + atomic.StoreUint64(&s.InCsumErrors, 0) + atomic.StoreUint64(&s.KCPInErrors, 0) + atomic.StoreUint64(&s.InPkts, 0) + atomic.StoreUint64(&s.OutPkts, 0) + atomic.StoreUint64(&s.InSegs, 0) + atomic.StoreUint64(&s.OutSegs, 0) + atomic.StoreUint64(&s.InBytes, 0) + atomic.StoreUint64(&s.OutBytes, 0) + atomic.StoreUint64(&s.RetransSegs, 0) + atomic.StoreUint64(&s.FastRetransSegs, 0) + atomic.StoreUint64(&s.EarlyRetransSegs, 0) + atomic.StoreUint64(&s.LostSegs, 0) + atomic.StoreUint64(&s.RepeatSegs, 0) + atomic.StoreUint64(&s.FECParityShards, 0) + atomic.StoreUint64(&s.FECErrs, 0) + atomic.StoreUint64(&s.FECRecovered, 0) + atomic.StoreUint64(&s.FECShortShards, 0) +} + +// DefaultSnmp is the global KCP connection statistics collector +var DefaultSnmp *Snmp + +func init() { + DefaultSnmp = newSnmp() +} diff --git a/vendor/github.com/xtaci/kcp-go/v5/timedsched.go b/vendor/github.com/xtaci/kcp-go/v5/timedsched.go new file mode 100644 index 0000000..2db7c20 --- /dev/null +++ b/vendor/github.com/xtaci/kcp-go/v5/timedsched.go @@ -0,0 +1,146 @@ +package kcp + +import ( + "container/heap" + "runtime" + "sync" + "time" +) + +// SystemTimedSched is the library level timed-scheduler +var SystemTimedSched *TimedSched = NewTimedSched(runtime.NumCPU()) + +type timedFunc struct { + execute func() + ts time.Time +} + +// a heap for sorted timed function +type timedFuncHeap []timedFunc + +func (h timedFuncHeap) Len() int { return len(h) } +func (h timedFuncHeap) Less(i, j int) bool { return h[i].ts.Before(h[j].ts) } +func (h timedFuncHeap) Swap(i, j int) { h[i], h[j] = h[j], h[i] } +func (h *timedFuncHeap) Push(x interface{}) { *h = append(*h, x.(timedFunc)) } +func (h *timedFuncHeap) Pop() interface{} { + old := *h + n := len(old) + x := old[n-1] + old[n-1].execute = nil // avoid memory leak + *h = old[0 : n-1] + return x +} + +// TimedSched represents the control struct for timed parallel scheduler +type TimedSched struct { + // prepending tasks + prependTasks []timedFunc + prependLock sync.Mutex + chPrependNotify chan struct{} + + // tasks will be distributed through chTask + chTask chan timedFunc + + dieOnce sync.Once + die chan struct{} +} + +// NewTimedSched creates a parallel-scheduler with given parallelization +func NewTimedSched(parallel int) *TimedSched { + ts := new(TimedSched) + ts.chTask = make(chan timedFunc) + ts.die = make(chan struct{}) + ts.chPrependNotify = make(chan struct{}, 1) + + for i := 0; i < parallel; i++ { + go ts.sched() + } + go ts.prepend() + return ts +} + +func (ts *TimedSched) sched() { + var tasks timedFuncHeap + timer := time.NewTimer(0) + drained := false + for { + select { + case task := <-ts.chTask: + now := time.Now() + if now.After(task.ts) { + // already delayed! execute immediately + task.execute() + } else { + heap.Push(&tasks, task) + // properly reset timer to trigger based on the top element + stopped := timer.Stop() + if !stopped && !drained { + <-timer.C + } + timer.Reset(tasks[0].ts.Sub(now)) + drained = false + } + case now := <-timer.C: + drained = true + for tasks.Len() > 0 { + if now.After(tasks[0].ts) { + heap.Pop(&tasks).(timedFunc).execute() + } else { + timer.Reset(tasks[0].ts.Sub(now)) + drained = false + break + } + } + case <-ts.die: + return + } + } +} + +func (ts *TimedSched) prepend() { + var tasks []timedFunc + for { + select { + case <-ts.chPrependNotify: + ts.prependLock.Lock() + // keep cap to reuse slice + if cap(tasks) < cap(ts.prependTasks) { + tasks = make([]timedFunc, 0, cap(ts.prependTasks)) + } + tasks = tasks[:len(ts.prependTasks)] + copy(tasks, ts.prependTasks) + for k := range ts.prependTasks { + ts.prependTasks[k].execute = nil // avoid memory leak + } + ts.prependTasks = ts.prependTasks[:0] + ts.prependLock.Unlock() + + for k := range tasks { + select { + case ts.chTask <- tasks[k]: + tasks[k].execute = nil // avoid memory leak + case <-ts.die: + return + } + } + tasks = tasks[:0] + case <-ts.die: + return + } + } +} + +// Put a function 'f' awaiting to be executed at 'deadline' +func (ts *TimedSched) Put(f func(), deadline time.Time) { + ts.prependLock.Lock() + ts.prependTasks = append(ts.prependTasks, timedFunc{f, deadline}) + ts.prependLock.Unlock() + + select { + case ts.chPrependNotify <- struct{}{}: + default: + } +} + +// Close terminates this scheduler +func (ts *TimedSched) Close() { ts.dieOnce.Do(func() { close(ts.die) }) } diff --git a/vendor/github.com/xtaci/kcp-go/v5/tx.go b/vendor/github.com/xtaci/kcp-go/v5/tx.go new file mode 100644 index 0000000..3397b82 --- /dev/null +++ b/vendor/github.com/xtaci/kcp-go/v5/tx.go @@ -0,0 +1,24 @@ +package kcp + +import ( + "sync/atomic" + + "github.com/pkg/errors" + "golang.org/x/net/ipv4" +) + +func (s *UDPSession) defaultTx(txqueue []ipv4.Message) { + nbytes := 0 + npkts := 0 + for k := range txqueue { + if n, err := s.conn.WriteTo(txqueue[k].Buffers[0], txqueue[k].Addr); err == nil { + nbytes += n + npkts++ + } else { + s.notifyWriteError(errors.WithStack(err)) + break + } + } + atomic.AddUint64(&DefaultSnmp.OutPkts, uint64(npkts)) + atomic.AddUint64(&DefaultSnmp.OutBytes, uint64(nbytes)) +} diff --git a/vendor/github.com/xtaci/kcp-go/v5/tx_generic.go b/vendor/github.com/xtaci/kcp-go/v5/tx_generic.go new file mode 100644 index 0000000..0b4f349 --- /dev/null +++ b/vendor/github.com/xtaci/kcp-go/v5/tx_generic.go @@ -0,0 +1,11 @@ +// +build !linux + +package kcp + +import ( + "golang.org/x/net/ipv4" +) + +func (s *UDPSession) tx(txqueue []ipv4.Message) { + s.defaultTx(txqueue) +} diff --git a/vendor/github.com/xtaci/kcp-go/v5/tx_linux.go b/vendor/github.com/xtaci/kcp-go/v5/tx_linux.go new file mode 100644 index 0000000..4f19df5 --- /dev/null +++ b/vendor/github.com/xtaci/kcp-go/v5/tx_linux.go @@ -0,0 +1,51 @@ +// +build linux + +package kcp + +import ( + "net" + "os" + "sync/atomic" + + "github.com/pkg/errors" + "golang.org/x/net/ipv4" +) + +func (s *UDPSession) tx(txqueue []ipv4.Message) { + // default version + if s.xconn == nil || s.xconnWriteError != nil { + s.defaultTx(txqueue) + return + } + + // x/net version + nbytes := 0 + npkts := 0 + for len(txqueue) > 0 { + if n, err := s.xconn.WriteBatch(txqueue, 0); err == nil { + for k := range txqueue[:n] { + nbytes += len(txqueue[k].Buffers[0]) + } + npkts += n + txqueue = txqueue[n:] + } else { + // compatibility issue: + // for linux kernel<=2.6.32, support for sendmmsg is not available + // an error of type os.SyscallError will be returned + if operr, ok := err.(*net.OpError); ok { + if se, ok := operr.Err.(*os.SyscallError); ok { + if se.Syscall == "sendmmsg" { + s.xconnWriteError = se + s.defaultTx(txqueue) + return + } + } + } + s.notifyWriteError(errors.WithStack(err)) + break + } + } + + atomic.AddUint64(&DefaultSnmp.OutPkts, uint64(npkts)) + atomic.AddUint64(&DefaultSnmp.OutBytes, uint64(nbytes)) +} diff --git a/vendor/github.com/xtaci/kcp-go/v5/wechat_donate.jpg b/vendor/github.com/xtaci/kcp-go/v5/wechat_donate.jpg Binary files differnew file mode 100644 index 0000000..ad72505 --- /dev/null +++ b/vendor/github.com/xtaci/kcp-go/v5/wechat_donate.jpg diff --git a/vendor/github.com/xtaci/smux/.gitignore b/vendor/github.com/xtaci/smux/.gitignore new file mode 100644 index 0000000..daf913b --- /dev/null +++ b/vendor/github.com/xtaci/smux/.gitignore @@ -0,0 +1,24 @@ +# Compiled Object files, Static and Dynamic libs (Shared Objects) +*.o +*.a +*.so + +# Folders +_obj +_test + +# Architecture specific extensions/prefixes +*.[568vq] +[568vq].out + +*.cgo1.go +*.cgo2.c +_cgo_defun.c +_cgo_gotypes.go +_cgo_export.* + +_testmain.go + +*.exe +*.test +*.prof diff --git a/vendor/github.com/xtaci/smux/.travis.yml b/vendor/github.com/xtaci/smux/.travis.yml new file mode 100644 index 0000000..e1d30fa --- /dev/null +++ b/vendor/github.com/xtaci/smux/.travis.yml @@ -0,0 +1,17 @@ +language: go +go: + - 1.9.x + - 1.10.x + - 1.11.x + +before_install: + - go get -t -v ./... + +install: + - go get github.com/xtaci/smux + +script: + - go test -coverprofile=coverage.txt -covermode=atomic -bench . + +after_success: + - bash <(curl -s https://codecov.io/bash) diff --git a/vendor/github.com/xtaci/smux/LICENSE b/vendor/github.com/xtaci/smux/LICENSE new file mode 100644 index 0000000..eed41ac --- /dev/null +++ b/vendor/github.com/xtaci/smux/LICENSE @@ -0,0 +1,21 @@ +MIT License + +Copyright (c) 2016-2017 Daniel Fu + +Permission is hereby granted, free of charge, to any person obtaining a copy +of this software and associated documentation files (the "Software"), to deal +in the Software without restriction, including without limitation the rights +to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +copies of the Software, and to permit persons to whom the Software is +furnished to do so, subject to the following conditions: + +The above copyright notice and this permission notice shall be included in all +copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +SOFTWARE. diff --git a/vendor/github.com/xtaci/smux/README.md b/vendor/github.com/xtaci/smux/README.md new file mode 100644 index 0000000..a5c4680 --- /dev/null +++ b/vendor/github.com/xtaci/smux/README.md @@ -0,0 +1,136 @@ +<img src="smux.png" alt="smux" height="35px" /> + +[![GoDoc][1]][2] [![MIT licensed][3]][4] [![Build Status][5]][6] [![Go Report Card][7]][8] [![Coverage Statusd][9]][10] [![Sourcegraph][11]][12] + +<img src="mux.jpg" alt="smux" height="120px" /> + +[1]: https://godoc.org/github.com/xtaci/smux?status.svg +[2]: https://godoc.org/github.com/xtaci/smux +[3]: https://img.shields.io/badge/license-MIT-blue.svg +[4]: LICENSE +[5]: https://travis-ci.org/xtaci/smux.svg?branch=master +[6]: https://travis-ci.org/xtaci/smux +[7]: https://goreportcard.com/badge/github.com/xtaci/smux +[8]: https://goreportcard.com/report/github.com/xtaci/smux +[9]: https://codecov.io/gh/xtaci/smux/branch/master/graph/badge.svg +[10]: https://codecov.io/gh/xtaci/smux +[11]: https://sourcegraph.com/github.com/xtaci/smux/-/badge.svg +[12]: https://sourcegraph.com/github.com/xtaci/smux?badge + +## Introduction + +Smux ( **S**imple **MU**ltiple**X**ing) is a multiplexing library for Golang. It relies on an underlying connection to provide reliability and ordering, such as TCP or [KCP](https://github.com/xtaci/kcp-go), and provides stream-oriented multiplexing. The original intention of this library is to power the connection management for [kcp-go](https://github.com/xtaci/kcp-go). + +## Features + +1. ***Token bucket*** controlled receiving, which provides smoother bandwidth graph(see picture below). +2. Session-wide receive buffer, shared among streams, **fully controlled** overall memory usage. +3. Minimized header(8Bytes), maximized payload. +4. Well-tested on millions of devices in [kcptun](https://github.com/xtaci/kcptun). +5. Builtin fair queue traffic shaping. +6. Per-stream sliding window to control congestion.(protocol version 2+). + +![smooth bandwidth curve](curve.jpg) + +## Documentation + +For complete documentation, see the associated [Godoc](https://godoc.org/github.com/xtaci/smux). + +## Benchmark +``` +$ go test -v -run=^$ -bench . +goos: darwin +goarch: amd64 +pkg: github.com/xtaci/smux +BenchmarkMSB-4 30000000 51.8 ns/op +BenchmarkAcceptClose-4 50000 36783 ns/op +BenchmarkConnSmux-4 30000 58335 ns/op 2246.88 MB/s 1208 B/op 19 allocs/op +BenchmarkConnTCP-4 50000 25579 ns/op 5124.04 MB/s 0 B/op 0 allocs/op +PASS +ok github.com/xtaci/smux 7.811s +``` + +## Specification + +``` +VERSION(1B) | CMD(1B) | LENGTH(2B) | STREAMID(4B) | DATA(LENGTH) + +VALUES FOR LATEST VERSION: +VERSION: + 1/2 + +CMD: + cmdSYN(0) + cmdFIN(1) + cmdPSH(2) + cmdNOP(3) + cmdUPD(4) // only supported on version 2 + +STREAMID: + client use odd numbers starts from 1 + server use even numbers starts from 0 + +cmdUPD: + | CONSUMED(4B) | WINDOW(4B) | +``` + +## Usage + +```go + +func client() { + // Get a TCP connection + conn, err := net.Dial(...) + if err != nil { + panic(err) + } + + // Setup client side of smux + session, err := smux.Client(conn, nil) + if err != nil { + panic(err) + } + + // Open a new stream + stream, err := session.OpenStream() + if err != nil { + panic(err) + } + + // Stream implements io.ReadWriteCloser + stream.Write([]byte("ping")) + stream.Close() + session.Close() +} + +func server() { + // Accept a TCP connection + conn, err := listener.Accept() + if err != nil { + panic(err) + } + + // Setup server side of smux + session, err := smux.Server(conn, nil) + if err != nil { + panic(err) + } + + // Accept a stream + stream, err := session.AcceptStream() + if err != nil { + panic(err) + } + + // Listen for a message + buf := make([]byte, 4) + stream.Read(buf) + stream.Close() + session.Close() +} + +``` + +## Status + +Stable diff --git a/vendor/github.com/xtaci/smux/alloc.go b/vendor/github.com/xtaci/smux/alloc.go new file mode 100644 index 0000000..9e3acf3 --- /dev/null +++ b/vendor/github.com/xtaci/smux/alloc.go @@ -0,0 +1,72 @@ +package smux + +import ( + "errors" + "sync" +) + +var ( + defaultAllocator *Allocator + debruijinPos = [...]byte{0, 9, 1, 10, 13, 21, 2, 29, 11, 14, 16, 18, 22, 25, 3, 30, 8, 12, 20, 28, 15, 17, 24, 7, 19, 27, 23, 6, 26, 5, 4, 31} +) + +func init() { + defaultAllocator = NewAllocator() +} + +// Allocator for incoming frames, optimized to prevent overwriting after zeroing +type Allocator struct { + buffers []sync.Pool +} + +// NewAllocator initiates a []byte allocator for frames less than 65536 bytes, +// the waste(memory fragmentation) of space allocation is guaranteed to be +// no more than 50%. +func NewAllocator() *Allocator { + alloc := new(Allocator) + alloc.buffers = make([]sync.Pool, 17) // 1B -> 64K + for k := range alloc.buffers { + i := k + alloc.buffers[k].New = func() interface{} { + return make([]byte, 1<<uint32(i)) + } + } + return alloc +} + +// Get a []byte from pool with most appropriate cap +func (alloc *Allocator) Get(size int) []byte { + if size <= 0 || size > 65536 { + return nil + } + + bits := msb(size) + if size == 1<<bits { + return alloc.buffers[bits].Get().([]byte)[:size] + } else { + return alloc.buffers[bits+1].Get().([]byte)[:size] + } +} + +// Put returns a []byte to pool for future use, +// which the cap must be exactly 2^n +func (alloc *Allocator) Put(buf []byte) error { + bits := msb(cap(buf)) + if cap(buf) == 0 || cap(buf) > 65536 || cap(buf) != 1<<bits { + return errors.New("allocator Put() incorrect buffer size") + } + alloc.buffers[bits].Put(buf) + return nil +} + +// msb return the pos of most significiant bit +// http://supertech.csail.mit.edu/papers/debruijn.pdf +func msb(size int) byte { + v := uint32(size) + v |= v >> 1 + v |= v >> 2 + v |= v >> 4 + v |= v >> 8 + v |= v >> 16 + return debruijinPos[(v*0x07C4ACDD)>>27] +} diff --git a/vendor/github.com/xtaci/smux/curve.jpg b/vendor/github.com/xtaci/smux/curve.jpg Binary files differnew file mode 100644 index 0000000..3fc4863 --- /dev/null +++ b/vendor/github.com/xtaci/smux/curve.jpg diff --git a/vendor/github.com/xtaci/smux/frame.go b/vendor/github.com/xtaci/smux/frame.go new file mode 100644 index 0000000..467a058 --- /dev/null +++ b/vendor/github.com/xtaci/smux/frame.go @@ -0,0 +1,81 @@ +package smux + +import ( + "encoding/binary" + "fmt" +) + +const ( // cmds + // protocol version 1: + cmdSYN byte = iota // stream open + cmdFIN // stream close, a.k.a EOF mark + cmdPSH // data push + cmdNOP // no operation + + // protocol version 2 extra commands + // notify bytes consumed by remote peer-end + cmdUPD +) + +const ( + // data size of cmdUPD, format: + // |4B data consumed(ACK)| 4B window size(WINDOW) | + szCmdUPD = 8 +) + +const ( + // initial peer window guess, a slow-start + initialPeerWindow = 262144 +) + +const ( + sizeOfVer = 1 + sizeOfCmd = 1 + sizeOfLength = 2 + sizeOfSid = 4 + headerSize = sizeOfVer + sizeOfCmd + sizeOfSid + sizeOfLength +) + +// Frame defines a packet from or to be multiplexed into a single connection +type Frame struct { + ver byte + cmd byte + sid uint32 + data []byte +} + +func newFrame(version byte, cmd byte, sid uint32) Frame { + return Frame{ver: version, cmd: cmd, sid: sid} +} + +type rawHeader [headerSize]byte + +func (h rawHeader) Version() byte { + return h[0] +} + +func (h rawHeader) Cmd() byte { + return h[1] +} + +func (h rawHeader) Length() uint16 { + return binary.LittleEndian.Uint16(h[2:]) +} + +func (h rawHeader) StreamID() uint32 { + return binary.LittleEndian.Uint32(h[4:]) +} + +func (h rawHeader) String() string { + return fmt.Sprintf("Version:%d Cmd:%d StreamID:%d Length:%d", + h.Version(), h.Cmd(), h.StreamID(), h.Length()) +} + +type updHeader [szCmdUPD]byte + +func (h updHeader) Consumed() uint32 { + return binary.LittleEndian.Uint32(h[:]) +} +func (h updHeader) Window() uint32 { + return binary.LittleEndian.Uint32(h[4:]) +} diff --git a/vendor/github.com/xtaci/smux/go.mod b/vendor/github.com/xtaci/smux/go.mod new file mode 100644 index 0000000..9ddead6 --- /dev/null +++ b/vendor/github.com/xtaci/smux/go.mod @@ -0,0 +1,3 @@ +module github.com/xtaci/smux + +go 1.13 diff --git a/vendor/github.com/xtaci/smux/go.sum b/vendor/github.com/xtaci/smux/go.sum new file mode 100644 index 0000000..e69de29 --- /dev/null +++ b/vendor/github.com/xtaci/smux/go.sum diff --git a/vendor/github.com/xtaci/smux/mux.go b/vendor/github.com/xtaci/smux/mux.go new file mode 100644 index 0000000..c0b8ab8 --- /dev/null +++ b/vendor/github.com/xtaci/smux/mux.go @@ -0,0 +1,110 @@ +// Package smux is a multiplexing library for Golang. +// +// It relies on an underlying connection to provide reliability and ordering, such as TCP or KCP, +// and provides stream-oriented multiplexing over a single channel. +package smux + +import ( + "errors" + "fmt" + "io" + "math" + "time" +) + +// Config is used to tune the Smux session +type Config struct { + // SMUX Protocol version, support 1,2 + Version int + + // Disabled keepalive + KeepAliveDisabled bool + + // KeepAliveInterval is how often to send a NOP command to the remote + KeepAliveInterval time.Duration + + // KeepAliveTimeout is how long the session + // will be closed if no data has arrived + KeepAliveTimeout time.Duration + + // MaxFrameSize is used to control the maximum + // frame size to sent to the remote + MaxFrameSize int + + // MaxReceiveBuffer is used to control the maximum + // number of data in the buffer pool + MaxReceiveBuffer int + + // MaxStreamBuffer is used to control the maximum + // number of data per stream + MaxStreamBuffer int +} + +// DefaultConfig is used to return a default configuration +func DefaultConfig() *Config { + return &Config{ + Version: 1, + KeepAliveInterval: 10 * time.Second, + KeepAliveTimeout: 30 * time.Second, + MaxFrameSize: 32768, + MaxReceiveBuffer: 4194304, + MaxStreamBuffer: 65536, + } +} + +// VerifyConfig is used to verify the sanity of configuration +func VerifyConfig(config *Config) error { + if !(config.Version == 1 || config.Version == 2) { + return errors.New("unsupported protocol version") + } + if !config.KeepAliveDisabled { + if config.KeepAliveInterval == 0 { + return errors.New("keep-alive interval must be positive") + } + if config.KeepAliveTimeout < config.KeepAliveInterval { + return fmt.Errorf("keep-alive timeout must be larger than keep-alive interval") + } + } + if config.MaxFrameSize <= 0 { + return errors.New("max frame size must be positive") + } + if config.MaxFrameSize > 65535 { + return errors.New("max frame size must not be larger than 65535") + } + if config.MaxReceiveBuffer <= 0 { + return errors.New("max receive buffer must be positive") + } + if config.MaxStreamBuffer <= 0 { + return errors.New("max stream buffer must be positive") + } + if config.MaxStreamBuffer > config.MaxReceiveBuffer { + return errors.New("max stream buffer must not be larger than max receive buffer") + } + if config.MaxStreamBuffer > math.MaxInt32 { + return errors.New("max stream buffer cannot be larger than 2147483647") + } + return nil +} + +// Server is used to initialize a new server-side connection. +func Server(conn io.ReadWriteCloser, config *Config) (*Session, error) { + if config == nil { + config = DefaultConfig() + } + if err := VerifyConfig(config); err != nil { + return nil, err + } + return newSession(config, conn, false), nil +} + +// Client is used to initialize a new client-side connection. +func Client(conn io.ReadWriteCloser, config *Config) (*Session, error) { + if config == nil { + config = DefaultConfig() + } + + if err := VerifyConfig(config); err != nil { + return nil, err + } + return newSession(config, conn, true), nil +} diff --git a/vendor/github.com/xtaci/smux/mux.jpg b/vendor/github.com/xtaci/smux/mux.jpg Binary files differnew file mode 100644 index 0000000..dde2e11 --- /dev/null +++ b/vendor/github.com/xtaci/smux/mux.jpg diff --git a/vendor/github.com/xtaci/smux/session.go b/vendor/github.com/xtaci/smux/session.go new file mode 100644 index 0000000..bc56066 --- /dev/null +++ b/vendor/github.com/xtaci/smux/session.go @@ -0,0 +1,525 @@ +package smux + +import ( + "container/heap" + "encoding/binary" + "errors" + "io" + "net" + "sync" + "sync/atomic" + "time" +) + +const ( + defaultAcceptBacklog = 1024 +) + +var ( + ErrInvalidProtocol = errors.New("invalid protocol") + ErrConsumed = errors.New("peer consumed more than sent") + ErrGoAway = errors.New("stream id overflows, should start a new connection") + ErrTimeout = errors.New("timeout") + ErrWouldBlock = errors.New("operation would block on IO") +) + +type writeRequest struct { + prio uint64 + frame Frame + result chan writeResult +} + +type writeResult struct { + n int + err error +} + +type buffersWriter interface { + WriteBuffers(v [][]byte) (n int, err error) +} + +// Session defines a multiplexed connection for streams +type Session struct { + conn io.ReadWriteCloser + + config *Config + nextStreamID uint32 // next stream identifier + nextStreamIDLock sync.Mutex + + bucket int32 // token bucket + bucketNotify chan struct{} // used for waiting for tokens + + streams map[uint32]*Stream // all streams in this session + streamLock sync.Mutex // locks streams + + die chan struct{} // flag session has died + dieOnce sync.Once + + // socket error handling + socketReadError atomic.Value + socketWriteError atomic.Value + chSocketReadError chan struct{} + chSocketWriteError chan struct{} + socketReadErrorOnce sync.Once + socketWriteErrorOnce sync.Once + + // smux protocol errors + protoError atomic.Value + chProtoError chan struct{} + protoErrorOnce sync.Once + + chAccepts chan *Stream + + dataReady int32 // flag data has arrived + + goAway int32 // flag id exhausted + + deadline atomic.Value + + shaper chan writeRequest // a shaper for writing + writes chan writeRequest +} + +func newSession(config *Config, conn io.ReadWriteCloser, client bool) *Session { + s := new(Session) + s.die = make(chan struct{}) + s.conn = conn + s.config = config + s.streams = make(map[uint32]*Stream) + s.chAccepts = make(chan *Stream, defaultAcceptBacklog) + s.bucket = int32(config.MaxReceiveBuffer) + s.bucketNotify = make(chan struct{}, 1) + s.shaper = make(chan writeRequest) + s.writes = make(chan writeRequest) + s.chSocketReadError = make(chan struct{}) + s.chSocketWriteError = make(chan struct{}) + s.chProtoError = make(chan struct{}) + + if client { + s.nextStreamID = 1 + } else { + s.nextStreamID = 0 + } + + go s.shaperLoop() + go s.recvLoop() + go s.sendLoop() + if !config.KeepAliveDisabled { + go s.keepalive() + } + return s +} + +// OpenStream is used to create a new stream +func (s *Session) OpenStream() (*Stream, error) { + if s.IsClosed() { + return nil, io.ErrClosedPipe + } + + // generate stream id + s.nextStreamIDLock.Lock() + if s.goAway > 0 { + s.nextStreamIDLock.Unlock() + return nil, ErrGoAway + } + + s.nextStreamID += 2 + sid := s.nextStreamID + if sid == sid%2 { // stream-id overflows + s.goAway = 1 + s.nextStreamIDLock.Unlock() + return nil, ErrGoAway + } + s.nextStreamIDLock.Unlock() + + stream := newStream(sid, s.config.MaxFrameSize, s) + + if _, err := s.writeFrame(newFrame(byte(s.config.Version), cmdSYN, sid)); err != nil { + return nil, err + } + + s.streamLock.Lock() + defer s.streamLock.Unlock() + select { + case <-s.chSocketReadError: + return nil, s.socketReadError.Load().(error) + case <-s.chSocketWriteError: + return nil, s.socketWriteError.Load().(error) + case <-s.die: + return nil, io.ErrClosedPipe + default: + s.streams[sid] = stream + return stream, nil + } +} + +// Open returns a generic ReadWriteCloser +func (s *Session) Open() (io.ReadWriteCloser, error) { + return s.OpenStream() +} + +// AcceptStream is used to block until the next available stream +// is ready to be accepted. +func (s *Session) AcceptStream() (*Stream, error) { + var deadline <-chan time.Time + if d, ok := s.deadline.Load().(time.Time); ok && !d.IsZero() { + timer := time.NewTimer(time.Until(d)) + defer timer.Stop() + deadline = timer.C + } + + select { + case stream := <-s.chAccepts: + return stream, nil + case <-deadline: + return nil, ErrTimeout + case <-s.chSocketReadError: + return nil, s.socketReadError.Load().(error) + case <-s.chProtoError: + return nil, s.protoError.Load().(error) + case <-s.die: + return nil, io.ErrClosedPipe + } +} + +// Accept Returns a generic ReadWriteCloser instead of smux.Stream +func (s *Session) Accept() (io.ReadWriteCloser, error) { + return s.AcceptStream() +} + +// Close is used to close the session and all streams. +func (s *Session) Close() error { + var once bool + s.dieOnce.Do(func() { + close(s.die) + once = true + }) + + if once { + s.streamLock.Lock() + for k := range s.streams { + s.streams[k].sessionClose() + } + s.streamLock.Unlock() + return s.conn.Close() + } else { + return io.ErrClosedPipe + } +} + +// notifyBucket notifies recvLoop that bucket is available +func (s *Session) notifyBucket() { + select { + case s.bucketNotify <- struct{}{}: + default: + } +} + +func (s *Session) notifyReadError(err error) { + s.socketReadErrorOnce.Do(func() { + s.socketReadError.Store(err) + close(s.chSocketReadError) + }) +} + +func (s *Session) notifyWriteError(err error) { + s.socketWriteErrorOnce.Do(func() { + s.socketWriteError.Store(err) + close(s.chSocketWriteError) + }) +} + +func (s *Session) notifyProtoError(err error) { + s.protoErrorOnce.Do(func() { + s.protoError.Store(err) + close(s.chProtoError) + }) +} + +// IsClosed does a safe check to see if we have shutdown +func (s *Session) IsClosed() bool { + select { + case <-s.die: + return true + default: + return false + } +} + +// NumStreams returns the number of currently open streams +func (s *Session) NumStreams() int { + if s.IsClosed() { + return 0 + } + s.streamLock.Lock() + defer s.streamLock.Unlock() + return len(s.streams) +} + +// SetDeadline sets a deadline used by Accept* calls. +// A zero time value disables the deadline. +func (s *Session) SetDeadline(t time.Time) error { + s.deadline.Store(t) + return nil +} + +// LocalAddr satisfies net.Conn interface +func (s *Session) LocalAddr() net.Addr { + if ts, ok := s.conn.(interface { + LocalAddr() net.Addr + }); ok { + return ts.LocalAddr() + } + return nil +} + +// RemoteAddr satisfies net.Conn interface +func (s *Session) RemoteAddr() net.Addr { + if ts, ok := s.conn.(interface { + RemoteAddr() net.Addr + }); ok { + return ts.RemoteAddr() + } + return nil +} + +// notify the session that a stream has closed +func (s *Session) streamClosed(sid uint32) { + s.streamLock.Lock() + if n := s.streams[sid].recycleTokens(); n > 0 { // return remaining tokens to the bucket + if atomic.AddInt32(&s.bucket, int32(n)) > 0 { + s.notifyBucket() + } + } + delete(s.streams, sid) + s.streamLock.Unlock() +} + +// returnTokens is called by stream to return token after read +func (s *Session) returnTokens(n int) { + if atomic.AddInt32(&s.bucket, int32(n)) > 0 { + s.notifyBucket() + } +} + +// recvLoop keeps on reading from underlying connection if tokens are available +func (s *Session) recvLoop() { + var hdr rawHeader + var updHdr updHeader + + for { + for atomic.LoadInt32(&s.bucket) <= 0 && !s.IsClosed() { + select { + case <-s.bucketNotify: + case <-s.die: + return + } + } + + // read header first + if _, err := io.ReadFull(s.conn, hdr[:]); err == nil { + atomic.StoreInt32(&s.dataReady, 1) + if hdr.Version() != byte(s.config.Version) { + s.notifyProtoError(ErrInvalidProtocol) + return + } + sid := hdr.StreamID() + switch hdr.Cmd() { + case cmdNOP: + case cmdSYN: + s.streamLock.Lock() + if _, ok := s.streams[sid]; !ok { + stream := newStream(sid, s.config.MaxFrameSize, s) + s.streams[sid] = stream + select { + case s.chAccepts <- stream: + case <-s.die: + } + } + s.streamLock.Unlock() + case cmdFIN: + s.streamLock.Lock() + if stream, ok := s.streams[sid]; ok { + stream.fin() + stream.notifyReadEvent() + } + s.streamLock.Unlock() + case cmdPSH: + if hdr.Length() > 0 { + newbuf := defaultAllocator.Get(int(hdr.Length())) + if written, err := io.ReadFull(s.conn, newbuf); err == nil { + s.streamLock.Lock() + if stream, ok := s.streams[sid]; ok { + stream.pushBytes(newbuf) + atomic.AddInt32(&s.bucket, -int32(written)) + stream.notifyReadEvent() + } + s.streamLock.Unlock() + } else { + s.notifyReadError(err) + return + } + } + case cmdUPD: + if _, err := io.ReadFull(s.conn, updHdr[:]); err == nil { + s.streamLock.Lock() + if stream, ok := s.streams[sid]; ok { + stream.update(updHdr.Consumed(), updHdr.Window()) + } + s.streamLock.Unlock() + } else { + s.notifyReadError(err) + return + } + default: + s.notifyProtoError(ErrInvalidProtocol) + return + } + } else { + s.notifyReadError(err) + return + } + } +} + +func (s *Session) keepalive() { + tickerPing := time.NewTicker(s.config.KeepAliveInterval) + tickerTimeout := time.NewTicker(s.config.KeepAliveTimeout) + defer tickerPing.Stop() + defer tickerTimeout.Stop() + for { + select { + case <-tickerPing.C: + s.writeFrameInternal(newFrame(byte(s.config.Version), cmdNOP, 0), tickerPing.C, 0) + s.notifyBucket() // force a signal to the recvLoop + case <-tickerTimeout.C: + if !atomic.CompareAndSwapInt32(&s.dataReady, 1, 0) { + // recvLoop may block while bucket is 0, in this case, + // session should not be closed. + if atomic.LoadInt32(&s.bucket) > 0 { + s.Close() + return + } + } + case <-s.die: + return + } + } +} + +// shaper shapes the sending sequence among streams +func (s *Session) shaperLoop() { + var reqs shaperHeap + var next writeRequest + var chWrite chan writeRequest + + for { + if len(reqs) > 0 { + chWrite = s.writes + next = heap.Pop(&reqs).(writeRequest) + } else { + chWrite = nil + } + + select { + case <-s.die: + return + case r := <-s.shaper: + if chWrite != nil { // next is valid, reshape + heap.Push(&reqs, next) + } + heap.Push(&reqs, r) + case chWrite <- next: + } + } +} + +func (s *Session) sendLoop() { + var buf []byte + var n int + var err error + var vec [][]byte // vector for writeBuffers + + bw, ok := s.conn.(buffersWriter) + if ok { + buf = make([]byte, headerSize) + vec = make([][]byte, 2) + } else { + buf = make([]byte, (1<<16)+headerSize) + } + + for { + select { + case <-s.die: + return + case request := <-s.writes: + buf[0] = request.frame.ver + buf[1] = request.frame.cmd + binary.LittleEndian.PutUint16(buf[2:], uint16(len(request.frame.data))) + binary.LittleEndian.PutUint32(buf[4:], request.frame.sid) + + if len(vec) > 0 { + vec[0] = buf[:headerSize] + vec[1] = request.frame.data + n, err = bw.WriteBuffers(vec) + } else { + copy(buf[headerSize:], request.frame.data) + n, err = s.conn.Write(buf[:headerSize+len(request.frame.data)]) + } + + n -= headerSize + if n < 0 { + n = 0 + } + + result := writeResult{ + n: n, + err: err, + } + + request.result <- result + close(request.result) + + // store conn error + if err != nil { + s.notifyWriteError(err) + return + } + } + } +} + +// writeFrame writes the frame to the underlying connection +// and returns the number of bytes written if successful +func (s *Session) writeFrame(f Frame) (n int, err error) { + return s.writeFrameInternal(f, nil, 0) +} + +// internal writeFrame version to support deadline used in keepalive +func (s *Session) writeFrameInternal(f Frame, deadline <-chan time.Time, prio uint64) (int, error) { + req := writeRequest{ + prio: prio, + frame: f, + result: make(chan writeResult, 1), + } + select { + case s.shaper <- req: + case <-s.die: + return 0, io.ErrClosedPipe + case <-s.chSocketWriteError: + return 0, s.socketWriteError.Load().(error) + case <-deadline: + return 0, ErrTimeout + } + + select { + case result := <-req.result: + return result.n, result.err + case <-s.die: + return 0, io.ErrClosedPipe + case <-s.chSocketWriteError: + return 0, s.socketWriteError.Load().(error) + case <-deadline: + return 0, ErrTimeout + } +} diff --git a/vendor/github.com/xtaci/smux/shaper.go b/vendor/github.com/xtaci/smux/shaper.go new file mode 100644 index 0000000..be03406 --- /dev/null +++ b/vendor/github.com/xtaci/smux/shaper.go @@ -0,0 +1,16 @@ +package smux + +type shaperHeap []writeRequest + +func (h shaperHeap) Len() int { return len(h) } +func (h shaperHeap) Less(i, j int) bool { return h[i].prio < h[j].prio } +func (h shaperHeap) Swap(i, j int) { h[i], h[j] = h[j], h[i] } +func (h *shaperHeap) Push(x interface{}) { *h = append(*h, x.(writeRequest)) } + +func (h *shaperHeap) Pop() interface{} { + old := *h + n := len(old) + x := old[n-1] + *h = old[0 : n-1] + return x +} diff --git a/vendor/github.com/xtaci/smux/smux.png b/vendor/github.com/xtaci/smux/smux.png Binary files differnew file mode 100644 index 0000000..26aba3b --- /dev/null +++ b/vendor/github.com/xtaci/smux/smux.png diff --git a/vendor/github.com/xtaci/smux/stream.go b/vendor/github.com/xtaci/smux/stream.go new file mode 100644 index 0000000..6c9499c --- /dev/null +++ b/vendor/github.com/xtaci/smux/stream.go @@ -0,0 +1,549 @@ +package smux + +import ( + "encoding/binary" + "io" + "net" + "sync" + "sync/atomic" + "time" +) + +// Stream implements net.Conn +type Stream struct { + id uint32 + sess *Session + + buffers [][]byte + heads [][]byte // slice heads kept for recycle + + bufferLock sync.Mutex + frameSize int + + // notify a read event + chReadEvent chan struct{} + + // flag the stream has closed + die chan struct{} + dieOnce sync.Once + + // FIN command + chFinEvent chan struct{} + finEventOnce sync.Once + + // deadlines + readDeadline atomic.Value + writeDeadline atomic.Value + + // per stream sliding window control + numRead uint32 // number of consumed bytes + numWritten uint32 // count num of bytes written + incr uint32 // counting for sending + + // UPD command + peerConsumed uint32 // num of bytes the peer has consumed + peerWindow uint32 // peer window, initialized to 256KB, updated by peer + chUpdate chan struct{} // notify of remote data consuming and window update +} + +// newStream initiates a Stream struct +func newStream(id uint32, frameSize int, sess *Session) *Stream { + s := new(Stream) + s.id = id + s.chReadEvent = make(chan struct{}, 1) + s.chUpdate = make(chan struct{}, 1) + s.frameSize = frameSize + s.sess = sess + s.die = make(chan struct{}) + s.chFinEvent = make(chan struct{}) + s.peerWindow = initialPeerWindow // set to initial window size + return s +} + +// ID returns the unique stream ID. +func (s *Stream) ID() uint32 { + return s.id +} + +// Read implements net.Conn +func (s *Stream) Read(b []byte) (n int, err error) { + for { + n, err = s.tryRead(b) + if err == ErrWouldBlock { + if ew := s.waitRead(); ew != nil { + return 0, ew + } + } else { + return n, err + } + } +} + +// tryRead is the nonblocking version of Read +func (s *Stream) tryRead(b []byte) (n int, err error) { + if s.sess.config.Version == 2 { + return s.tryReadv2(b) + } + + if len(b) == 0 { + return 0, nil + } + + s.bufferLock.Lock() + if len(s.buffers) > 0 { + n = copy(b, s.buffers[0]) + s.buffers[0] = s.buffers[0][n:] + if len(s.buffers[0]) == 0 { + s.buffers[0] = nil + s.buffers = s.buffers[1:] + // full recycle + defaultAllocator.Put(s.heads[0]) + s.heads = s.heads[1:] + } + } + s.bufferLock.Unlock() + + if n > 0 { + s.sess.returnTokens(n) + return n, nil + } + + select { + case <-s.die: + return 0, io.EOF + default: + return 0, ErrWouldBlock + } +} + +func (s *Stream) tryReadv2(b []byte) (n int, err error) { + if len(b) == 0 { + return 0, nil + } + + var notifyConsumed uint32 + s.bufferLock.Lock() + if len(s.buffers) > 0 { + n = copy(b, s.buffers[0]) + s.buffers[0] = s.buffers[0][n:] + if len(s.buffers[0]) == 0 { + s.buffers[0] = nil + s.buffers = s.buffers[1:] + // full recycle + defaultAllocator.Put(s.heads[0]) + s.heads = s.heads[1:] + } + } + + // in an ideal environment: + // if more than half of buffer has consumed, send read ack to peer + // based on round-trip time of ACK, continous flowing data + // won't slow down because of waiting for ACK, as long as the + // consumer keeps on reading data + // s.numRead == n also notify window at the first read + s.numRead += uint32(n) + s.incr += uint32(n) + if s.incr >= uint32(s.sess.config.MaxStreamBuffer/2) || s.numRead == uint32(n) { + notifyConsumed = s.numRead + s.incr = 0 + } + s.bufferLock.Unlock() + + if n > 0 { + s.sess.returnTokens(n) + if notifyConsumed > 0 { + err := s.sendWindowUpdate(notifyConsumed) + return n, err + } else { + return n, nil + } + } + + select { + case <-s.die: + return 0, io.EOF + default: + return 0, ErrWouldBlock + } +} + +// WriteTo implements io.WriteTo +func (s *Stream) WriteTo(w io.Writer) (n int64, err error) { + if s.sess.config.Version == 2 { + return s.writeTov2(w) + } + + for { + var buf []byte + s.bufferLock.Lock() + if len(s.buffers) > 0 { + buf = s.buffers[0] + s.buffers = s.buffers[1:] + s.heads = s.heads[1:] + } + s.bufferLock.Unlock() + + if buf != nil { + nw, ew := w.Write(buf) + s.sess.returnTokens(len(buf)) + defaultAllocator.Put(buf) + if nw > 0 { + n += int64(nw) + } + + if ew != nil { + return n, ew + } + } else if ew := s.waitRead(); ew != nil { + return n, ew + } + } +} + +func (s *Stream) writeTov2(w io.Writer) (n int64, err error) { + for { + var notifyConsumed uint32 + var buf []byte + s.bufferLock.Lock() + if len(s.buffers) > 0 { + buf = s.buffers[0] + s.buffers = s.buffers[1:] + s.heads = s.heads[1:] + } + s.numRead += uint32(len(buf)) + s.incr += uint32(len(buf)) + if s.incr >= uint32(s.sess.config.MaxStreamBuffer/2) || s.numRead == uint32(len(buf)) { + notifyConsumed = s.numRead + s.incr = 0 + } + s.bufferLock.Unlock() + + if buf != nil { + nw, ew := w.Write(buf) + s.sess.returnTokens(len(buf)) + defaultAllocator.Put(buf) + if nw > 0 { + n += int64(nw) + } + + if ew != nil { + return n, ew + } + + if notifyConsumed > 0 { + if err := s.sendWindowUpdate(notifyConsumed); err != nil { + return n, err + } + } + } else if ew := s.waitRead(); ew != nil { + return n, ew + } + } +} + +func (s *Stream) sendWindowUpdate(consumed uint32) error { + var timer *time.Timer + var deadline <-chan time.Time + if d, ok := s.readDeadline.Load().(time.Time); ok && !d.IsZero() { + timer = time.NewTimer(time.Until(d)) + defer timer.Stop() + deadline = timer.C + } + + frame := newFrame(byte(s.sess.config.Version), cmdUPD, s.id) + var hdr updHeader + binary.LittleEndian.PutUint32(hdr[:], consumed) + binary.LittleEndian.PutUint32(hdr[4:], uint32(s.sess.config.MaxStreamBuffer)) + frame.data = hdr[:] + _, err := s.sess.writeFrameInternal(frame, deadline, 0) + return err +} + +func (s *Stream) waitRead() error { + var timer *time.Timer + var deadline <-chan time.Time + if d, ok := s.readDeadline.Load().(time.Time); ok && !d.IsZero() { + timer = time.NewTimer(time.Until(d)) + defer timer.Stop() + deadline = timer.C + } + + select { + case <-s.chReadEvent: + return nil + case <-s.chFinEvent: + // BUG(xtaci): Fix for https://github.com/xtaci/smux/issues/82 + s.bufferLock.Lock() + defer s.bufferLock.Unlock() + if len(s.buffers) > 0 { + return nil + } + return io.EOF + case <-s.sess.chSocketReadError: + return s.sess.socketReadError.Load().(error) + case <-s.sess.chProtoError: + return s.sess.protoError.Load().(error) + case <-deadline: + return ErrTimeout + case <-s.die: + return io.ErrClosedPipe + } + +} + +// Write implements net.Conn +// +// Note that the behavior when multiple goroutines write concurrently is not deterministic, +// frames may interleave in random way. +func (s *Stream) Write(b []byte) (n int, err error) { + if s.sess.config.Version == 2 { + return s.writeV2(b) + } + + var deadline <-chan time.Time + if d, ok := s.writeDeadline.Load().(time.Time); ok && !d.IsZero() { + timer := time.NewTimer(time.Until(d)) + defer timer.Stop() + deadline = timer.C + } + + // check if stream has closed + select { + case <-s.die: + return 0, io.ErrClosedPipe + default: + } + + // frame split and transmit + sent := 0 + frame := newFrame(byte(s.sess.config.Version), cmdPSH, s.id) + bts := b + for len(bts) > 0 { + sz := len(bts) + if sz > s.frameSize { + sz = s.frameSize + } + frame.data = bts[:sz] + bts = bts[sz:] + n, err := s.sess.writeFrameInternal(frame, deadline, uint64(s.numWritten)) + s.numWritten++ + sent += n + if err != nil { + return sent, err + } + } + + return sent, nil +} + +func (s *Stream) writeV2(b []byte) (n int, err error) { + // check empty input + if len(b) == 0 { + return 0, nil + } + + // check if stream has closed + select { + case <-s.die: + return 0, io.ErrClosedPipe + default: + } + + // create write deadline timer + var deadline <-chan time.Time + if d, ok := s.writeDeadline.Load().(time.Time); ok && !d.IsZero() { + timer := time.NewTimer(time.Until(d)) + defer timer.Stop() + deadline = timer.C + } + + // frame split and transmit process + sent := 0 + frame := newFrame(byte(s.sess.config.Version), cmdPSH, s.id) + + for { + // per stream sliding window control + // [.... [consumed... numWritten] ... win... ] + // [.... [consumed...................+rmtwnd]] + var bts []byte + // note: + // even if uint32 overflow, this math still works: + // eg1: uint32(0) - uint32(math.MaxUint32) = 1 + // eg2: int32(uint32(0) - uint32(1)) = -1 + // security check for misbehavior + inflight := int32(atomic.LoadUint32(&s.numWritten) - atomic.LoadUint32(&s.peerConsumed)) + if inflight < 0 { + return 0, ErrConsumed + } + + win := int32(atomic.LoadUint32(&s.peerWindow)) - inflight + if win > 0 { + if win > int32(len(b)) { + bts = b + b = nil + } else { + bts = b[:win] + b = b[win:] + } + + for len(bts) > 0 { + sz := len(bts) + if sz > s.frameSize { + sz = s.frameSize + } + frame.data = bts[:sz] + bts = bts[sz:] + n, err := s.sess.writeFrameInternal(frame, deadline, uint64(atomic.LoadUint32(&s.numWritten))) + atomic.AddUint32(&s.numWritten, uint32(sz)) + sent += n + if err != nil { + return sent, err + } + } + } + + // if there is any data remaining to be sent + // wait until stream closes, window changes or deadline reached + // this blocking behavior will inform upper layer to do flow control + if len(b) > 0 { + select { + case <-s.chFinEvent: // if fin arrived, future window update is impossible + return 0, io.EOF + case <-s.die: + return sent, io.ErrClosedPipe + case <-deadline: + return sent, ErrTimeout + case <-s.sess.chSocketWriteError: + return sent, s.sess.socketWriteError.Load().(error) + case <-s.chUpdate: + continue + } + } else { + return sent, nil + } + } +} + +// Close implements net.Conn +func (s *Stream) Close() error { + var once bool + var err error + s.dieOnce.Do(func() { + close(s.die) + once = true + }) + + if once { + _, err = s.sess.writeFrame(newFrame(byte(s.sess.config.Version), cmdFIN, s.id)) + s.sess.streamClosed(s.id) + return err + } else { + return io.ErrClosedPipe + } +} + +// GetDieCh returns a readonly chan which can be readable +// when the stream is to be closed. +func (s *Stream) GetDieCh() <-chan struct{} { + return s.die +} + +// SetReadDeadline sets the read deadline as defined by +// net.Conn.SetReadDeadline. +// A zero time value disables the deadline. +func (s *Stream) SetReadDeadline(t time.Time) error { + s.readDeadline.Store(t) + s.notifyReadEvent() + return nil +} + +// SetWriteDeadline sets the write deadline as defined by +// net.Conn.SetWriteDeadline. +// A zero time value disables the deadline. +func (s *Stream) SetWriteDeadline(t time.Time) error { + s.writeDeadline.Store(t) + return nil +} + +// SetDeadline sets both read and write deadlines as defined by +// net.Conn.SetDeadline. +// A zero time value disables the deadlines. +func (s *Stream) SetDeadline(t time.Time) error { + if err := s.SetReadDeadline(t); err != nil { + return err + } + if err := s.SetWriteDeadline(t); err != nil { + return err + } + return nil +} + +// session closes +func (s *Stream) sessionClose() { s.dieOnce.Do(func() { close(s.die) }) } + +// LocalAddr satisfies net.Conn interface +func (s *Stream) LocalAddr() net.Addr { + if ts, ok := s.sess.conn.(interface { + LocalAddr() net.Addr + }); ok { + return ts.LocalAddr() + } + return nil +} + +// RemoteAddr satisfies net.Conn interface +func (s *Stream) RemoteAddr() net.Addr { + if ts, ok := s.sess.conn.(interface { + RemoteAddr() net.Addr + }); ok { + return ts.RemoteAddr() + } + return nil +} + +// pushBytes append buf to buffers +func (s *Stream) pushBytes(buf []byte) (written int, err error) { + s.bufferLock.Lock() + s.buffers = append(s.buffers, buf) + s.heads = append(s.heads, buf) + s.bufferLock.Unlock() + return +} + +// recycleTokens transform remaining bytes to tokens(will truncate buffer) +func (s *Stream) recycleTokens() (n int) { + s.bufferLock.Lock() + for k := range s.buffers { + n += len(s.buffers[k]) + defaultAllocator.Put(s.heads[k]) + } + s.buffers = nil + s.heads = nil + s.bufferLock.Unlock() + return +} + +// notify read event +func (s *Stream) notifyReadEvent() { + select { + case s.chReadEvent <- struct{}{}: + default: + } +} + +// update command +func (s *Stream) update(consumed uint32, window uint32) { + atomic.StoreUint32(&s.peerConsumed, consumed) + atomic.StoreUint32(&s.peerWindow, window) + select { + case s.chUpdate <- struct{}{}: + default: + } +} + +// mark this stream has been closed in protocol +func (s *Stream) fin() { + s.finEventOnce.Do(func() { + close(s.chFinEvent) + }) +} |